Change IndexStmt to generate worker DDL on master

Because we can't execute CREATE INDEX CONCURRENTLY during transactions,
worker_apply_shard_ddl_command is insufficient.
pull/1287/head
Jason Petersen 2017-03-21 16:47:53 -06:00
parent de034d2cab
commit 95d8d27c4f
No known key found for this signature in database
GPG Key ID: 9F1D3510D110ABA9
3 changed files with 156 additions and 1 deletions

View File

@ -134,6 +134,7 @@ static bool IsAlterTableRenameStmt(RenameStmt *renameStatement);
static void ExecuteDistributedDDLJob(DDLJob *ddlJob);
static void ShowNoticeIfNotUsing2PC(void);
static List * DDLTaskList(Oid relationId, const char *commandString);
static List * IndexTaskList(Oid relationId, IndexStmt *indexStmt);
static List * ForeignKeyTaskList(Oid leftRelationId, Oid rightRelationId,
const char *commandString);
static void RangeVarCallbackForDropIndex(const RangeVar *rel, Oid relOid, Oid oldRelOid,
@ -678,7 +679,7 @@ PlanIndexStmt(IndexStmt *createIndexStatement, const char *createIndexCommand)
DDLJob *ddlJob = palloc0(sizeof(DDLJob));
ddlJob->targetRelationId = relationId;
ddlJob->commandString = createIndexCommand;
ddlJob->taskList = DDLTaskList(relationId, createIndexCommand);
ddlJob->taskList = IndexTaskList(relationId, createIndexStatement);
ddlJobs = list_make1(ddlJob);
}
@ -2071,6 +2072,60 @@ DDLTaskList(Oid relationId, const char *commandString)
}
/*
* DDLTaskList builds a list of tasks to execute a DDL command on a
* given list of shards.
*/
static List *
IndexTaskList(Oid relationId, IndexStmt *indexStmt)
{
List *taskList = NIL;
List *shardIntervalList = LoadShardIntervalList(relationId);
ListCell *shardIntervalCell = NULL;
Oid schemaId = get_rel_namespace(relationId);
char *schemaName = get_namespace_name(schemaId);
StringInfoData ddlString;
uint64 jobId = INVALID_JOB_ID;
int taskId = 1;
initStringInfo(&ddlString);
/* set statement's schema name if it is not set already */
if (indexStmt->relation->schemaname == NULL)
{
indexStmt->relation->schemaname = schemaName;
}
/* lock metadata before getting placement lists */
LockShardListMetadata(shardIntervalList, ShareLock);
foreach(shardIntervalCell, shardIntervalList)
{
ShardInterval *shardInterval = (ShardInterval *) lfirst(shardIntervalCell);
uint64 shardId = shardInterval->shardId;
Task *task = NULL;
deparse_shard_index_statement(indexStmt, relationId, shardId, &ddlString);
task = CitusMakeNode(Task);
task->jobId = jobId;
task->taskId = taskId++;
task->taskType = DDL_TASK;
task->queryString = pstrdup(ddlString.data);
task->replicationModel = REPLICATION_MODEL_INVALID;
task->dependedTaskList = NULL;
task->anchorShardId = shardId;
task->taskPlacementList = FinalizedShardPlacementList(shardId);
taskList = lappend(taskList, task);
resetStringInfo(&ddlString);
}
return taskList;
}
/*
* ForeignKeyTaskList builds a list of tasks to execute a foreign key command on a
* shards of given list of distributed table.

View File

@ -24,6 +24,7 @@
#include "access/tupdesc.h"
#include "catalog/dependency.h"
#include "catalog/indexing.h"
#include "catalog/namespace.h"
#include "catalog/pg_attribute.h"
#include "catalog/pg_authid.h"
#include "catalog/pg_class.h"
@ -33,12 +34,14 @@
#include "commands/defrem.h"
#include "commands/extension.h"
#include "distributed/citus_ruleutils.h"
#include "distributed/relay_utility.h"
#include "foreign/foreign.h"
#include "lib/stringinfo.h"
#include "nodes/nodes.h"
#include "nodes/nodeFuncs.h"
#include "nodes/parsenodes.h"
#include "nodes/pg_list.h"
#include "parser/parse_utilcmd.h"
#include "storage/lock.h"
#include "utils/acl.h"
#include "utils/array.h"
@ -587,6 +590,101 @@ pg_get_tablecolumnoptionsdef_string(Oid tableRelationId)
}
char *
deparse_shard_index_statement(IndexStmt *origStmt, Oid distrelid, int64 shardid,
StringInfo buffer)
{
IndexStmt *indexStmt = copyObject(origStmt); /* copy to avoid modifications */
char *relationName = indexStmt->relation->relname;
char *indexName = indexStmt->idxname;
ListCell *indexParameterCell = NULL;
List *deparseContext = NULL;
/* extend relation and index name using shard identifier */
AppendShardIdToName(&relationName, shardid);
AppendShardIdToName(&indexName, shardid);
/* use extended shard name and transformed stmt for deparsing */
deparseContext = deparse_context_for(relationName, distrelid);
indexStmt = transformIndexStmt(distrelid, indexStmt, NULL);
appendStringInfo(buffer, "CREATE %s INDEX %s %s %s ON %s USING %s ",
(indexStmt->unique ? "UNIQUE" : ""),
(indexStmt->concurrent ? "CONCURRENTLY" : ""),
(indexStmt->if_not_exists ? "IF NOT EXISTS" : ""),
quote_identifier(indexName),
quote_qualified_identifier(indexStmt->relation->schemaname,
relationName),
indexStmt->accessMethod);
/* index column or expression list begins here */
appendStringInfoChar(buffer, '(');
foreach(indexParameterCell, indexStmt->indexParams)
{
IndexElem *indexElement = (IndexElem *) lfirst(indexParameterCell);
/* use commas to separate subsequent elements */
if (indexParameterCell != list_head(indexStmt->indexParams))
{
appendStringInfoChar(buffer, ',');
}
if (indexElement->name)
{
appendStringInfo(buffer, "%s ", quote_identifier(indexElement->name));
}
else if (indexElement->expr)
{
appendStringInfo(buffer, "(%s)", deparse_expression(indexElement->expr,
deparseContext, false,
false));
}
if (indexElement->collation != NIL)
{
appendStringInfo(buffer, "COLLATE %s ",
NameListToQuotedString(indexElement->collation));
}
if (indexElement->opclass != NIL)
{
appendStringInfo(buffer, "%s ",
NameListToQuotedString(indexElement->opclass));
}
if (indexElement->ordering != SORTBY_DEFAULT)
{
bool sortAsc = (indexElement->ordering == SORTBY_ASC);
appendStringInfo(buffer, "%s ", (sortAsc ? "ASC" : "DESC"));
}
if (indexElement->nulls_ordering != SORTBY_NULLS_DEFAULT)
{
bool nullsFirst = (indexElement->nulls_ordering == SORTBY_NULLS_FIRST);
appendStringInfo(buffer, "NULLS %s ", (nullsFirst ? "FIRST" : "LAST"));
}
}
appendStringInfoString(buffer, ") ");
if (indexStmt->options != NIL)
{
appendStringInfoString(buffer, "WITH ");
AppendOptionListToString(buffer, indexStmt->options);
}
if (indexStmt->whereClause != NULL)
{
appendStringInfo(buffer, "WHERE %s", deparse_expression(indexStmt->whereClause,
deparseContext, false,
false));
}
return buffer->data;
}
/*
* pg_get_indexclusterdef_string returns the definition of a cluster statement
* for given index. The function returns null if the table is not clustered on

View File

@ -32,6 +32,8 @@ extern char * pg_get_sequencedef_string(Oid sequenceRelid);
extern Form_pg_sequence pg_get_sequencedef(Oid sequenceRelationId);
extern char * pg_get_tableschemadef_string(Oid tableRelationId, bool forShardCreation);
extern char * pg_get_tablecolumnoptionsdef_string(Oid tableRelationId);
extern char * deparse_shard_index_statement(IndexStmt *origStmt, Oid distrelid, int64
shardid, StringInfo buffer);
extern char * pg_get_indexclusterdef_string(Oid indexRelationId);
extern List * pg_get_table_grants(Oid relationId);