From 95d8d27c4fad73cce6ba8c32233a2b39f1afe201 Mon Sep 17 00:00:00 2001 From: Jason Petersen Date: Tue, 21 Mar 2017 16:47:53 -0600 Subject: [PATCH] Change IndexStmt to generate worker DDL on master Because we can't execute CREATE INDEX CONCURRENTLY during transactions, worker_apply_shard_ddl_command is insufficient. --- .../distributed/executor/multi_utility.c | 57 ++++++++++- .../distributed/utils/citus_ruleutils.c | 98 +++++++++++++++++++ src/include/distributed/citus_ruleutils.h | 2 + 3 files changed, 156 insertions(+), 1 deletion(-) diff --git a/src/backend/distributed/executor/multi_utility.c b/src/backend/distributed/executor/multi_utility.c index 745b440ef..c1f9a8027 100644 --- a/src/backend/distributed/executor/multi_utility.c +++ b/src/backend/distributed/executor/multi_utility.c @@ -134,6 +134,7 @@ static bool IsAlterTableRenameStmt(RenameStmt *renameStatement); static void ExecuteDistributedDDLJob(DDLJob *ddlJob); static void ShowNoticeIfNotUsing2PC(void); static List * DDLTaskList(Oid relationId, const char *commandString); +static List * IndexTaskList(Oid relationId, IndexStmt *indexStmt); static List * ForeignKeyTaskList(Oid leftRelationId, Oid rightRelationId, const char *commandString); static void RangeVarCallbackForDropIndex(const RangeVar *rel, Oid relOid, Oid oldRelOid, @@ -678,7 +679,7 @@ PlanIndexStmt(IndexStmt *createIndexStatement, const char *createIndexCommand) DDLJob *ddlJob = palloc0(sizeof(DDLJob)); ddlJob->targetRelationId = relationId; ddlJob->commandString = createIndexCommand; - ddlJob->taskList = DDLTaskList(relationId, createIndexCommand); + ddlJob->taskList = IndexTaskList(relationId, createIndexStatement); ddlJobs = list_make1(ddlJob); } @@ -2071,6 +2072,60 @@ DDLTaskList(Oid relationId, const char *commandString) } +/* + * IndexTaskList builds a list of tasks to execute a CREATE INDEX command on + * each shard of the given distributed relation. 
+ */ +static List * +IndexTaskList(Oid relationId, IndexStmt *indexStmt) +{ + List *taskList = NIL; + List *shardIntervalList = LoadShardIntervalList(relationId); + ListCell *shardIntervalCell = NULL; + Oid schemaId = get_rel_namespace(relationId); + char *schemaName = get_namespace_name(schemaId); + StringInfoData ddlString; + uint64 jobId = INVALID_JOB_ID; + int taskId = 1; + + initStringInfo(&ddlString); + + /* set statement's schema name if it is not set already */ + if (indexStmt->relation->schemaname == NULL) + { + indexStmt->relation->schemaname = schemaName; + } + + /* lock metadata before getting placement lists */ + LockShardListMetadata(shardIntervalList, ShareLock); + + foreach(shardIntervalCell, shardIntervalList) + { + ShardInterval *shardInterval = (ShardInterval *) lfirst(shardIntervalCell); + uint64 shardId = shardInterval->shardId; + Task *task = NULL; + + deparse_shard_index_statement(indexStmt, relationId, shardId, &ddlString); + + task = CitusMakeNode(Task); + task->jobId = jobId; + task->taskId = taskId++; + task->taskType = DDL_TASK; + task->queryString = pstrdup(ddlString.data); + task->replicationModel = REPLICATION_MODEL_INVALID; + task->dependedTaskList = NULL; + task->anchorShardId = shardId; + task->taskPlacementList = FinalizedShardPlacementList(shardId); + + taskList = lappend(taskList, task); + + resetStringInfo(&ddlString); + } + + return taskList; +} + + /* * ForeignKeyTaskList builds a list of tasks to execute a foreign key command on a * shards of given list of distributed table. 
diff --git a/src/backend/distributed/utils/citus_ruleutils.c b/src/backend/distributed/utils/citus_ruleutils.c index 3342d3394..df2ac1929 100644 --- a/src/backend/distributed/utils/citus_ruleutils.c +++ b/src/backend/distributed/utils/citus_ruleutils.c @@ -24,6 +24,7 @@ #include "access/tupdesc.h" #include "catalog/dependency.h" #include "catalog/indexing.h" +#include "catalog/namespace.h" #include "catalog/pg_attribute.h" #include "catalog/pg_authid.h" #include "catalog/pg_class.h" @@ -33,12 +34,14 @@ #include "commands/defrem.h" #include "commands/extension.h" #include "distributed/citus_ruleutils.h" +#include "distributed/relay_utility.h" #include "foreign/foreign.h" #include "lib/stringinfo.h" #include "nodes/nodes.h" #include "nodes/nodeFuncs.h" #include "nodes/parsenodes.h" #include "nodes/pg_list.h" +#include "parser/parse_utilcmd.h" #include "storage/lock.h" #include "utils/acl.h" #include "utils/array.h" @@ -587,6 +590,101 @@ pg_get_tablecolumnoptionsdef_string(Oid tableRelationId) } +char * +deparse_shard_index_statement(IndexStmt *origStmt, Oid distrelid, int64 shardid, + StringInfo buffer) +{ + IndexStmt *indexStmt = copyObject(origStmt); /* copy to avoid modifications */ + char *relationName = indexStmt->relation->relname; + char *indexName = indexStmt->idxname; + ListCell *indexParameterCell = NULL; + List *deparseContext = NULL; + + /* extend relation and index name using shard identifier */ + AppendShardIdToName(&relationName, shardid); + AppendShardIdToName(&indexName, shardid); + + /* use extended shard name and transformed stmt for deparsing */ + deparseContext = deparse_context_for(relationName, distrelid); + indexStmt = transformIndexStmt(distrelid, indexStmt, NULL); + + appendStringInfo(buffer, "CREATE %s INDEX %s %s %s ON %s USING %s ", + (indexStmt->unique ? "UNIQUE" : ""), + (indexStmt->concurrent ? "CONCURRENTLY" : ""), + (indexStmt->if_not_exists ? 
"IF NOT EXISTS" : ""), + quote_identifier(indexName), + quote_qualified_identifier(indexStmt->relation->schemaname, + relationName), + indexStmt->accessMethod); + + /* index column or expression list begins here */ + appendStringInfoChar(buffer, '('); + + foreach(indexParameterCell, indexStmt->indexParams) + { + IndexElem *indexElement = (IndexElem *) lfirst(indexParameterCell); + + /* use commas to separate subsequent elements */ + if (indexParameterCell != list_head(indexStmt->indexParams)) + { + appendStringInfoChar(buffer, ','); + } + + if (indexElement->name) + { + appendStringInfo(buffer, "%s ", quote_identifier(indexElement->name)); + } + else if (indexElement->expr) + { + appendStringInfo(buffer, "(%s)", deparse_expression(indexElement->expr, + deparseContext, false, + false)); + } + + if (indexElement->collation != NIL) + { + appendStringInfo(buffer, "COLLATE %s ", + NameListToQuotedString(indexElement->collation)); + } + + if (indexElement->opclass != NIL) + { + appendStringInfo(buffer, "%s ", + NameListToQuotedString(indexElement->opclass)); + } + + if (indexElement->ordering != SORTBY_DEFAULT) + { + bool sortAsc = (indexElement->ordering == SORTBY_ASC); + appendStringInfo(buffer, "%s ", (sortAsc ? "ASC" : "DESC")); + } + + if (indexElement->nulls_ordering != SORTBY_NULLS_DEFAULT) + { + bool nullsFirst = (indexElement->nulls_ordering == SORTBY_NULLS_FIRST); + appendStringInfo(buffer, "NULLS %s ", (nullsFirst ? "FIRST" : "LAST")); + } + } + + appendStringInfoString(buffer, ") "); + + if (indexStmt->options != NIL) + { + appendStringInfoString(buffer, "WITH "); + AppendOptionListToString(buffer, indexStmt->options); + } + + if (indexStmt->whereClause != NULL) + { + appendStringInfo(buffer, "WHERE %s", deparse_expression(indexStmt->whereClause, + deparseContext, false, + false)); + } + + return buffer->data; +} + + /* * pg_get_indexclusterdef_string returns the definition of a cluster statement * for given index. 
The function returns null if the table is not clustered on diff --git a/src/include/distributed/citus_ruleutils.h b/src/include/distributed/citus_ruleutils.h index b64e1a0e8..a6f7d2b6d 100644 --- a/src/include/distributed/citus_ruleutils.h +++ b/src/include/distributed/citus_ruleutils.h @@ -32,6 +32,8 @@ extern char * pg_get_sequencedef_string(Oid sequenceRelid); extern Form_pg_sequence pg_get_sequencedef(Oid sequenceRelationId); extern char * pg_get_tableschemadef_string(Oid tableRelationId, bool forShardCreation); extern char * pg_get_tablecolumnoptionsdef_string(Oid tableRelationId); +extern char * deparse_shard_index_statement(IndexStmt *origStmt, Oid distrelid, int64 + shardid, StringInfo buffer); extern char * pg_get_indexclusterdef_string(Oid indexRelationId); extern List * pg_get_table_grants(Oid relationId);