From a02a2a90c7521cd26fc8510764382c1fc1995713 Mon Sep 17 00:00:00 2001 From: Jason Petersen Date: Sat, 11 Mar 2017 13:21:28 -0700 Subject: [PATCH 1/8] Refactor ExecuteDistDDLCommand to expect struct Will let us separate out the determination of what to execute from its actual execution. --- .../distributed/executor/multi_utility.c | 110 +++++++----------- src/include/distributed/multi_utility.h | 7 ++ 2 files changed, 47 insertions(+), 70 deletions(-) diff --git a/src/backend/distributed/executor/multi_utility.c b/src/backend/distributed/executor/multi_utility.c index b0c980dbe..0c2a5a49d 100644 --- a/src/backend/distributed/executor/multi_utility.c +++ b/src/backend/distributed/executor/multi_utility.c @@ -132,11 +132,7 @@ static void ErrorIfDistributedRenameStmt(RenameStmt *renameStatement); /* Local functions forward declarations for helper functions */ static void CreateLocalTable(RangeVar *relation, char *nodeName, int32 nodePort); static bool IsAlterTableRenameStmt(RenameStmt *renameStatement); -static void ExecuteDistributedDDLCommand(Oid relationId, const char *ddlCommandString, - bool isTopLevel); -static void ExecuteDistributedForeignKeyCommand(Oid leftRelationId, Oid rightRelationId, - const char *ddlCommandString, - bool isTopLevel); +static void ExecuteDistributedDDLJob(DDLJob *ddlJob); static void ShowNoticeIfNotUsing2PC(void); static List * DDLTaskList(Oid relationId, const char *commandString); static List * ForeignKeyTaskList(Oid leftRelationId, Oid rightRelationId, @@ -673,7 +669,12 @@ ProcessIndexStmt(IndexStmt *createIndexStatement, const char *createIndexCommand /* if index does not exist, send the command to workers */ if (!OidIsValid(indexRelationId)) { - ExecuteDistributedDDLCommand(relationId, createIndexCommand, isTopLevel); + DDLJob *ddlJob = palloc0(sizeof(DDLJob)); + ddlJob->targetRelationId = relationId; + ddlJob->commandString = createIndexCommand; + ddlJob->taskList = DDLTaskList(relationId, createIndexCommand); + + 
ExecuteDistributedDDLJob(ddlJob); } else if (!createIndexStatement->if_not_exists) { @@ -764,10 +765,16 @@ ProcessDropIndexStmt(DropStmt *dropIndexStatement, const char *dropIndexCommand, if (OidIsValid(distributedIndexId)) { + DDLJob *ddlJob = palloc0(sizeof(DDLJob)); + ErrorIfUnsupportedDropIndexStmt(dropIndexStatement); /* if it is supported, go ahead and execute the command */ - ExecuteDistributedDDLCommand(distributedRelationId, dropIndexCommand, isTopLevel); + ddlJob->targetRelationId = distributedRelationId; + ddlJob->commandString = dropIndexCommand; + ddlJob->taskList = DDLTaskList(distributedRelationId, dropIndexCommand); + + ExecuteDistributedDDLJob(ddlJob); } return (Node *) dropIndexStatement; @@ -856,12 +863,22 @@ ProcessAlterTableStmt(AlterTableStmt *alterTableStatement, const char *alterTabl if (rightRelationId) { - ExecuteDistributedForeignKeyCommand(leftRelationId, rightRelationId, - alterTableCommand, isTopLevel); + DDLJob *ddlJob = palloc0(sizeof(DDLJob)); + ddlJob->targetRelationId = leftRelationId; + ddlJob->commandString = alterTableCommand; + ddlJob->taskList = ForeignKeyTaskList(leftRelationId, rightRelationId, + alterTableCommand); + + ExecuteDistributedDDLJob(ddlJob); } else { - ExecuteDistributedDDLCommand(leftRelationId, alterTableCommand, isTopLevel); + DDLJob *ddlJob = palloc0(sizeof(DDLJob)); + ddlJob->targetRelationId = leftRelationId; + ddlJob->commandString = alterTableCommand; + ddlJob->taskList = DDLTaskList(leftRelationId, alterTableCommand); + + ExecuteDistributedDDLJob(ddlJob); } return (Node *) alterTableStatement; @@ -1951,19 +1968,17 @@ IsAlterTableRenameStmt(RenameStmt *renameStmt) /* - * ExecuteDistributedDDLCommand applies a given DDL command to the given - * distributed table in a distributed transaction. If the multi shard commit protocol is + * ExecuteDistributedDDLJob simply executes a provided DDLJob in a distributed + * transaction, including metadata sync if needed. 
If the multi shard commit protocol is * in its default value of '1pc', then a notice message indicating that '2pc' might be * used for extra safety. In the commit protocol, a BEGIN is sent after connection to * each shard placement and COMMIT/ROLLBACK is handled by * CompleteShardPlacementTransactions function. */ static void -ExecuteDistributedDDLCommand(Oid relationId, const char *ddlCommandString, - bool isTopLevel) +ExecuteDistributedDDLJob(DDLJob *ddlJob) { - List *taskList = NIL; - bool shouldSyncMetadata = ShouldSyncTableMetadata(relationId); + bool shouldSyncMetadata = ShouldSyncTableMetadata(ddlJob->targetRelationId); if (XactModificationLevel == XACT_MODIFICATION_DATA) { @@ -1979,60 +1994,10 @@ ExecuteDistributedDDLCommand(Oid relationId, const char *ddlCommandString, if (shouldSyncMetadata) { SendCommandToWorkers(WORKERS_WITH_METADATA, DISABLE_DDL_PROPAGATION); - SendCommandToWorkers(WORKERS_WITH_METADATA, (char *) ddlCommandString); + SendCommandToWorkers(WORKERS_WITH_METADATA, (char *) ddlJob->commandString); } - taskList = DDLTaskList(relationId, ddlCommandString); - - ExecuteModifyTasksWithoutResults(taskList); -} - - -/* - * ExecuteDistributedForeignKeyCommand applies a given foreign key command to the given - * distributed table in a distributed transaction. If the multi shard commit protocol is - * in its default value of '1pc', then a notice message indicating that '2pc' might be - * used for extra safety. In the commit protocol, a BEGIN is sent after connection to - * each shard placement and COMMIT/ROLLBACK is handled by - * CompleteShardPlacementTransactions function. - * - * leftRelationId is the relation id of actual distributed table which given foreign key - * command is applied. rightRelationId is the relation id of distributed table which - * foreign key refers to. 
- */ -static void -ExecuteDistributedForeignKeyCommand(Oid leftRelationId, Oid rightRelationId, - const char *ddlCommandString, bool isTopLevel) -{ - List *taskList = NIL; - bool shouldSyncMetadata = false; - - if (XactModificationLevel == XACT_MODIFICATION_DATA) - { - ereport(ERROR, (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION), - errmsg("distributed DDL commands must not appear within " - "transaction blocks containing single-shard data " - "modifications"))); - } - - EnsureCoordinator(); - ShowNoticeIfNotUsing2PC(); - - /* - * It is sufficient to check only one of the tables for metadata syncing on workers, - * since the colocation of two tables implies that either both or none of them have - * metadata on workers. - */ - shouldSyncMetadata = ShouldSyncTableMetadata(leftRelationId); - if (shouldSyncMetadata) - { - SendCommandToWorkers(WORKERS_WITH_METADATA, DISABLE_DDL_PROPAGATION); - SendCommandToWorkers(WORKERS_WITH_METADATA, (char *) ddlCommandString); - } - - taskList = ForeignKeyTaskList(leftRelationId, rightRelationId, ddlCommandString); - - ExecuteModifyTasksWithoutResults(taskList); + ExecuteModifyTasksWithoutResults(ddlJob->taskList); } @@ -2409,6 +2374,7 @@ ReplicateGrantStmt(Node *parsetree) ListCell *granteeCell = NULL; ListCell *objectCell = NULL; bool isFirst = true; + DDLJob *ddlJob = palloc(sizeof(DDLJob)); initStringInfo(&privsString); initStringInfo(&granteesString); @@ -2493,7 +2459,6 @@ ReplicateGrantStmt(Node *parsetree) RangeVar *relvar = (RangeVar *) lfirst(objectCell); Oid relOid = RangeVarGetRelid(relvar, NoLock, false); const char *grantOption = ""; - bool isTopLevel = true; if (!IsDistributedTable(relOid)) { @@ -2526,7 +2491,12 @@ ReplicateGrantStmt(Node *parsetree) granteesString.data); } - ExecuteDistributedDDLCommand(relOid, ddlString.data, isTopLevel); + ddlJob->targetRelationId = relOid; + ddlJob->commandString = ddlString.data; + ddlJob->taskList = DDLTaskList(relOid, ddlString.data); + + ExecuteDistributedDDLJob(ddlJob); + 
resetStringInfo(&ddlString); } } diff --git a/src/include/distributed/multi_utility.h b/src/include/distributed/multi_utility.h index 2b3a7cdf1..6a4d7b42c 100644 --- a/src/include/distributed/multi_utility.h +++ b/src/include/distributed/multi_utility.h @@ -14,6 +14,13 @@ extern bool EnableDDLPropagation; +typedef struct DDLJob +{ + Oid targetRelationId; + const char *commandString; + List *taskList; +} DDLJob; + extern void multi_ProcessUtility(Node *parsetree, const char *queryString, ProcessUtilityContext context, ParamListInfo params, DestReceiver *dest, char *completionTag); From a64165767d39abd9eead39f3702048666cc8d9b4 Mon Sep 17 00:00:00 2001 From: Jason Petersen Date: Sat, 11 Mar 2017 13:29:47 -0700 Subject: [PATCH 2/8] Rename Process*Stmt functions to Plan*Stmt To reflect their new purpose planning a DDLJob rather than fully processing a distributed DDL statement. --- .../distributed/executor/multi_utility.c | 32 +++++++++---------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/src/backend/distributed/executor/multi_utility.c b/src/backend/distributed/executor/multi_utility.c index 0c2a5a49d..93e76f18a 100644 --- a/src/backend/distributed/executor/multi_utility.c +++ b/src/backend/distributed/executor/multi_utility.c @@ -101,15 +101,15 @@ static void VerifyTransmitStmt(CopyStmt *copyStatement); /* Local functions forward declarations for processing distributed table commands */ static Node * ProcessCopyStmt(CopyStmt *copyStatement, char *completionTag, bool *commandMustRunAsOwner); -static Node * ProcessIndexStmt(IndexStmt *createIndexStatement, +static Node * PlanIndexStmt(IndexStmt *createIndexStatement, const char *createIndexCommand, bool isTopLevel); -static Node * ProcessDropIndexStmt(DropStmt *dropIndexStatement, +static Node * PlanDropIndexStmt(DropStmt *dropIndexStatement, const char *dropIndexCommand, bool isTopLevel); -static Node * ProcessAlterTableStmt(AlterTableStmt *alterTableStatement, +static Node * 
PlanAlterTableStmt(AlterTableStmt *alterTableStatement, const char *alterTableCommand, bool isTopLevel); static Node * WorkerProcessAlterTableStmt(AlterTableStmt *alterTableStatement, const char *alterTableCommand); -static Node * ProcessAlterObjectSchemaStmt(AlterObjectSchemaStmt *alterObjectSchemaStmt, +static Node * PlanAlterObjectSchemaStmt(AlterObjectSchemaStmt *alterObjectSchemaStmt, const char *alterObjectSchemaCommand, bool isTopLevel); static void ProcessVacuumStmt(VacuumStmt *vacuumStmt, const char *vacuumCommand); @@ -257,7 +257,7 @@ multi_ProcessUtility(Node *parsetree, if (IsA(parsetree, IndexStmt)) { - parsetree = ProcessIndexStmt((IndexStmt *) parsetree, queryString, + parsetree = PlanIndexStmt((IndexStmt *) parsetree, queryString, isTopLevel); } @@ -266,7 +266,7 @@ multi_ProcessUtility(Node *parsetree, DropStmt *dropStatement = (DropStmt *) parsetree; if (dropStatement->removeType == OBJECT_INDEX) { - parsetree = ProcessDropIndexStmt(dropStatement, queryString, isTopLevel); + parsetree = PlanDropIndexStmt(dropStatement, queryString, isTopLevel); } } @@ -275,7 +275,7 @@ multi_ProcessUtility(Node *parsetree, AlterTableStmt *alterTableStmt = (AlterTableStmt *) parsetree; if (alterTableStmt->relkind == OBJECT_TABLE) { - parsetree = ProcessAlterTableStmt(alterTableStmt, queryString, + parsetree = PlanAlterTableStmt(alterTableStmt, queryString, isTopLevel); } } @@ -300,7 +300,7 @@ multi_ProcessUtility(Node *parsetree, if (IsA(parsetree, AlterObjectSchemaStmt)) { AlterObjectSchemaStmt *setSchemaStmt = (AlterObjectSchemaStmt *) parsetree; - parsetree = ProcessAlterObjectSchemaStmt(setSchemaStmt, queryString, + parsetree = PlanAlterObjectSchemaStmt(setSchemaStmt, queryString, isTopLevel); } @@ -604,7 +604,7 @@ ProcessCopyStmt(CopyStmt *copyStatement, char *completionTag, bool *commandMustR /* - * ProcessIndexStmt processes create index statements for distributed tables. + * PlanIndexStmt processes create index statements for distributed tables. 
* The function first checks if the statement belongs to a distributed table * or not. If it does, then it executes distributed logic for the command. * @@ -612,7 +612,7 @@ ProcessCopyStmt(CopyStmt *copyStatement, char *completionTag, bool *commandMustR * master node table. */ static Node * -ProcessIndexStmt(IndexStmt *createIndexStatement, const char *createIndexCommand, +PlanIndexStmt(IndexStmt *createIndexStatement, const char *createIndexCommand, bool isTopLevel) { /* @@ -690,7 +690,7 @@ ProcessIndexStmt(IndexStmt *createIndexStatement, const char *createIndexCommand /* - * ProcessDropIndexStmt processes drop index statements for distributed tables. + * PlanDropIndexStmt processes drop index statements for distributed tables. * The function first checks if the statement belongs to a distributed table * or not. If it does, then it executes distributed logic for the command. * @@ -698,7 +698,7 @@ ProcessIndexStmt(IndexStmt *createIndexStatement, const char *createIndexCommand * master node table. */ static Node * -ProcessDropIndexStmt(DropStmt *dropIndexStatement, const char *dropIndexCommand, +PlanDropIndexStmt(DropStmt *dropIndexStatement, const char *dropIndexCommand, bool isTopLevel) { ListCell *dropObjectCell = NULL; @@ -782,7 +782,7 @@ ProcessDropIndexStmt(DropStmt *dropIndexStatement, const char *dropIndexCommand, /* - * ProcessAlterTableStmt processes alter table statements for distributed tables. + * PlanAlterTableStmt processes alter table statements for distributed tables. * The function first checks if the statement belongs to a distributed table * or not. If it does, then it executes distributed logic for the command. * @@ -790,7 +790,7 @@ ProcessDropIndexStmt(DropStmt *dropIndexStatement, const char *dropIndexCommand, * master node table. 
*/ static Node * -ProcessAlterTableStmt(AlterTableStmt *alterTableStatement, const char *alterTableCommand, +PlanAlterTableStmt(AlterTableStmt *alterTableStatement, const char *alterTableCommand, bool isTopLevel) { LOCKMODE lockmode = 0; @@ -948,13 +948,13 @@ WorkerProcessAlterTableStmt(AlterTableStmt *alterTableStatement, /* - * ProcessAlterObjectSchemaStmt processes ALTER ... SET SCHEMA statements for distributed + * PlanAlterObjectSchemaStmt processes ALTER ... SET SCHEMA statements for distributed * objects. The function first checks if the statement belongs to a distributed objects * or not. If it does, then it checks whether given object is a table. If it is, we warn * out, since we do not support ALTER ... SET SCHEMA */ static Node * -ProcessAlterObjectSchemaStmt(AlterObjectSchemaStmt *alterObjectSchemaStmt, +PlanAlterObjectSchemaStmt(AlterObjectSchemaStmt *alterObjectSchemaStmt, const char *alterObjectSchemaCommand, bool isTopLevel) { Oid relationId = InvalidOid; From 419a4c3745260374ed6d9abaaeb72fdafebd2ca7 Mon Sep 17 00:00:00 2001 From: Jason Petersen Date: Sat, 11 Mar 2017 13:45:31 -0700 Subject: [PATCH 3/8] Remove execution from stmt-specific util functions Now have a single Execute call in the main body. 
--- .../distributed/executor/multi_utility.c | 98 +++++++++---------- 1 file changed, 45 insertions(+), 53 deletions(-) diff --git a/src/backend/distributed/executor/multi_utility.c b/src/backend/distributed/executor/multi_utility.c index 93e76f18a..03f5e1349 100644 --- a/src/backend/distributed/executor/multi_utility.c +++ b/src/backend/distributed/executor/multi_utility.c @@ -101,17 +101,16 @@ static void VerifyTransmitStmt(CopyStmt *copyStatement); /* Local functions forward declarations for processing distributed table commands */ static Node * ProcessCopyStmt(CopyStmt *copyStatement, char *completionTag, bool *commandMustRunAsOwner); -static Node * PlanIndexStmt(IndexStmt *createIndexStatement, - const char *createIndexCommand, bool isTopLevel); -static Node * PlanDropIndexStmt(DropStmt *dropIndexStatement, - const char *dropIndexCommand, bool isTopLevel); -static Node * PlanAlterTableStmt(AlterTableStmt *alterTableStatement, - const char *alterTableCommand, bool isTopLevel); +static DDLJob * PlanIndexStmt(IndexStmt *createIndexStatement, + const char *createIndexCommand); +static DDLJob * PlanDropIndexStmt(DropStmt *dropIndexStatement, + const char *dropIndexCommand); +static DDLJob * PlanAlterTableStmt(AlterTableStmt *alterTableStatement, + const char *alterTableCommand); static Node * WorkerProcessAlterTableStmt(AlterTableStmt *alterTableStatement, const char *alterTableCommand); -static Node * PlanAlterObjectSchemaStmt(AlterObjectSchemaStmt *alterObjectSchemaStmt, - const char *alterObjectSchemaCommand, - bool isTopLevel); +static DDLJob * PlanAlterObjectSchemaStmt(AlterObjectSchemaStmt *alterObjectSchemaStmt, + const char *alterObjectSchemaCommand); static void ProcessVacuumStmt(VacuumStmt *vacuumStmt, const char *vacuumCommand); static bool IsSupportedDistributedVacuumStmt(Oid relationId, VacuumStmt *vacuumStmt); static List * VacuumTaskList(Oid relationId, VacuumStmt *vacuumStmt); @@ -253,12 +252,11 @@ multi_ProcessUtility(Node *parsetree, */ if 
(EnableDDLPropagation) { - bool isTopLevel = (context == PROCESS_UTILITY_TOPLEVEL); + DDLJob *ddlJob = NULL; if (IsA(parsetree, IndexStmt)) { - parsetree = PlanIndexStmt((IndexStmt *) parsetree, queryString, - isTopLevel); + ddlJob = PlanIndexStmt((IndexStmt *) parsetree, queryString); } if (IsA(parsetree, DropStmt)) @@ -266,7 +264,7 @@ multi_ProcessUtility(Node *parsetree, DropStmt *dropStatement = (DropStmt *) parsetree; if (dropStatement->removeType == OBJECT_INDEX) { - parsetree = PlanDropIndexStmt(dropStatement, queryString, isTopLevel); + ddlJob = PlanDropIndexStmt(dropStatement, queryString); } } @@ -275,8 +273,7 @@ multi_ProcessUtility(Node *parsetree, AlterTableStmt *alterTableStmt = (AlterTableStmt *) parsetree; if (alterTableStmt->relkind == OBJECT_TABLE) { - parsetree = PlanAlterTableStmt(alterTableStmt, queryString, - isTopLevel); + ddlJob = PlanAlterTableStmt(alterTableStmt, queryString); } } @@ -300,8 +297,7 @@ multi_ProcessUtility(Node *parsetree, if (IsA(parsetree, AlterObjectSchemaStmt)) { AlterObjectSchemaStmt *setSchemaStmt = (AlterObjectSchemaStmt *) parsetree; - parsetree = PlanAlterObjectSchemaStmt(setSchemaStmt, queryString, - isTopLevel); + ddlJob = PlanAlterObjectSchemaStmt(setSchemaStmt, queryString); } /* @@ -316,6 +312,11 @@ multi_ProcessUtility(Node *parsetree, errhint("Connect to worker nodes directly to manually " "move all tables."))); } + + if (ddlJob != NULL) + { + ExecuteDistributedDDLJob(ddlJob); + } } else { @@ -611,10 +612,11 @@ ProcessCopyStmt(CopyStmt *copyStatement, char *completionTag, bool *commandMustR * The function returns the IndexStmt node for the command to be executed on the * master node table. */ -static Node * -PlanIndexStmt(IndexStmt *createIndexStatement, const char *createIndexCommand, - bool isTopLevel) +static DDLJob * +PlanIndexStmt(IndexStmt *createIndexStatement, const char *createIndexCommand) { + DDLJob *ddlJob = NULL; + /* * We first check whether a distributed relation is affected. 
For that, we need to * open the relation. To prevent race conditions with later lookups, lock the table, @@ -669,12 +671,10 @@ PlanIndexStmt(IndexStmt *createIndexStatement, const char *createIndexCommand, /* if index does not exist, send the command to workers */ if (!OidIsValid(indexRelationId)) { - DDLJob *ddlJob = palloc0(sizeof(DDLJob)); + ddlJob = palloc0(sizeof(DDLJob)); ddlJob->targetRelationId = relationId; ddlJob->commandString = createIndexCommand; ddlJob->taskList = DDLTaskList(relationId, createIndexCommand); - - ExecuteDistributedDDLJob(ddlJob); } else if (!createIndexStatement->if_not_exists) { @@ -685,7 +685,7 @@ PlanIndexStmt(IndexStmt *createIndexStatement, const char *createIndexCommand, } } - return (Node *) createIndexStatement; + return ddlJob; } @@ -697,10 +697,10 @@ PlanIndexStmt(IndexStmt *createIndexStatement, const char *createIndexCommand, * The function returns the DropStmt node for the command to be executed on the * master node table. */ -static Node * -PlanDropIndexStmt(DropStmt *dropIndexStatement, const char *dropIndexCommand, - bool isTopLevel) +static DDLJob * +PlanDropIndexStmt(DropStmt *dropIndexStatement, const char *dropIndexCommand) { + DDLJob *ddlJob = NULL; ListCell *dropObjectCell = NULL; Oid distributedIndexId = InvalidOid; Oid distributedRelationId = InvalidOid; @@ -765,7 +765,7 @@ PlanDropIndexStmt(DropStmt *dropIndexStatement, const char *dropIndexCommand, if (OidIsValid(distributedIndexId)) { - DDLJob *ddlJob = palloc0(sizeof(DDLJob)); + ddlJob = palloc0(sizeof(DDLJob)); ErrorIfUnsupportedDropIndexStmt(dropIndexStatement); @@ -773,11 +773,9 @@ PlanDropIndexStmt(DropStmt *dropIndexStatement, const char *dropIndexCommand, ddlJob->targetRelationId = distributedRelationId; ddlJob->commandString = dropIndexCommand; ddlJob->taskList = DDLTaskList(distributedRelationId, dropIndexCommand); - - ExecuteDistributedDDLJob(ddlJob); } - return (Node *) dropIndexStatement; + return ddlJob; } @@ -789,10 +787,10 @@ 
PlanDropIndexStmt(DropStmt *dropIndexStatement, const char *dropIndexCommand, * The function returns the AlterTableStmt node for the command to be executed on the * master node table. */ -static Node * -PlanAlterTableStmt(AlterTableStmt *alterTableStatement, const char *alterTableCommand, - bool isTopLevel) +static DDLJob * +PlanAlterTableStmt(AlterTableStmt *alterTableStatement, const char *alterTableCommand) { + DDLJob *ddlJob = NULL; LOCKMODE lockmode = 0; Oid leftRelationId = InvalidOid; Oid rightRelationId = InvalidOid; @@ -803,20 +801,20 @@ PlanAlterTableStmt(AlterTableStmt *alterTableStatement, const char *alterTableCo /* first check whether a distributed relation is affected */ if (alterTableStatement->relation == NULL) { - return (Node *) alterTableStatement; + return NULL; } lockmode = AlterTableGetLockLevel(alterTableStatement->cmds); leftRelationId = AlterTableLookupRelation(alterTableStatement, lockmode); if (!OidIsValid(leftRelationId)) { - return (Node *) alterTableStatement; + return NULL; } isDistributedRelation = IsDistributedTable(leftRelationId); if (!isDistributedRelation) { - return (Node *) alterTableStatement; + return NULL; } ErrorIfUnsupportedAlterTableStmt(alterTableStatement); @@ -861,27 +859,21 @@ PlanAlterTableStmt(AlterTableStmt *alterTableStatement, const char *alterTableCo } } + ddlJob = palloc0(sizeof(DDLJob)); + ddlJob->targetRelationId = leftRelationId; + ddlJob->commandString = alterTableCommand; + if (rightRelationId) { - DDLJob *ddlJob = palloc0(sizeof(DDLJob)); - ddlJob->targetRelationId = leftRelationId; - ddlJob->commandString = alterTableCommand; ddlJob->taskList = ForeignKeyTaskList(leftRelationId, rightRelationId, alterTableCommand); - - ExecuteDistributedDDLJob(ddlJob); } else { - DDLJob *ddlJob = palloc0(sizeof(DDLJob)); - ddlJob->targetRelationId = leftRelationId; - ddlJob->commandString = alterTableCommand; ddlJob->taskList = DDLTaskList(leftRelationId, alterTableCommand); - - ExecuteDistributedDDLJob(ddlJob); } - 
return (Node *) alterTableStatement; + return ddlJob; } @@ -953,16 +945,16 @@ WorkerProcessAlterTableStmt(AlterTableStmt *alterTableStatement, * or not. If it does, then it checks whether given object is a table. If it is, we warn * out, since we do not support ALTER ... SET SCHEMA */ -static Node * +static DDLJob * PlanAlterObjectSchemaStmt(AlterObjectSchemaStmt *alterObjectSchemaStmt, - const char *alterObjectSchemaCommand, bool isTopLevel) + const char *alterObjectSchemaCommand) { Oid relationId = InvalidOid; bool noWait = false; if (alterObjectSchemaStmt->relation == NULL) { - return (Node *) alterObjectSchemaStmt; + return NULL; } relationId = RangeVarGetRelidExtended(alterObjectSchemaStmt->relation, @@ -973,7 +965,7 @@ PlanAlterObjectSchemaStmt(AlterObjectSchemaStmt *alterObjectSchemaStmt, /* first check whether a distributed relation is affected */ if (!OidIsValid(relationId) || !IsDistributedTable(relationId)) { - return (Node *) alterObjectSchemaStmt; + return NULL; } /* warn out if a distributed relation is affected */ @@ -982,7 +974,7 @@ PlanAlterObjectSchemaStmt(AlterObjectSchemaStmt *alterObjectSchemaStmt, errhint("Connect to worker nodes directly to manually " "change schemas of affected objects."))); - return (Node *) alterObjectSchemaStmt; + return NULL; } From f181b248599afb3d37dfd27a1d6d1a53b0ca9f22 Mon Sep 17 00:00:00 2001 From: Jason Petersen Date: Sat, 11 Mar 2017 16:58:41 -0700 Subject: [PATCH 4/8] Move worker execution to after master, fix tests Some tests relied on worker errors though local commands were invalid. Fixed those by ensuring preconditions were met to have command work correctly. Otherwise most test changes are related to slight changes in local/remote error ordering. 
--- src/backend/distributed/executor/multi_utility.c | 13 ++++++------- .../regress/expected/multi_index_statements.out | 6 ++---- .../expected/multi_join_order_additional.out | 2 +- src/test/regress/expected/multi_schema_support.out | 4 ++-- .../multi_unsupported_worker_operations.out | 12 +++++++++++- .../output/multi_alter_table_statements.source | 7 +++---- .../sql/multi_unsupported_worker_operations.sql | 4 +++- 7 files changed, 28 insertions(+), 20 deletions(-) diff --git a/src/backend/distributed/executor/multi_utility.c b/src/backend/distributed/executor/multi_utility.c index 03f5e1349..b87809859 100644 --- a/src/backend/distributed/executor/multi_utility.c +++ b/src/backend/distributed/executor/multi_utility.c @@ -168,6 +168,7 @@ multi_ProcessUtility(Node *parsetree, bool commandMustRunAsOwner = false; Oid savedUserId = InvalidOid; int savedSecurityContext = 0; + DDLJob *ddlJob = NULL; if (IsA(parsetree, TransactionStmt)) { @@ -252,8 +253,6 @@ multi_ProcessUtility(Node *parsetree, */ if (EnableDDLPropagation) { - DDLJob *ddlJob = NULL; - if (IsA(parsetree, IndexStmt)) { ddlJob = PlanIndexStmt((IndexStmt *) parsetree, queryString); @@ -312,11 +311,6 @@ multi_ProcessUtility(Node *parsetree, errhint("Connect to worker nodes directly to manually " "move all tables."))); } - - if (ddlJob != NULL) - { - ExecuteDistributedDDLJob(ddlJob); - } } else { @@ -381,6 +375,11 @@ multi_ProcessUtility(Node *parsetree, SetUserIdAndSecContext(savedUserId, savedSecurityContext); } + if (ddlJob != NULL) + { + ExecuteDistributedDDLJob(ddlJob); + } + /* we run VacuumStmt after standard hook to benefit from its checks and locking */ if (IsA(parsetree, VacuumStmt)) { diff --git a/src/test/regress/expected/multi_index_statements.out b/src/test/regress/expected/multi_index_statements.out index 100ce2d05..d8c1f58ed 100644 --- a/src/test/regress/expected/multi_index_statements.out +++ b/src/test/regress/expected/multi_index_statements.out @@ -163,14 +163,10 @@ ERROR: creating unique 
indexes on append-partitioned tables is currently unsupp CREATE INDEX lineitem_orderkey_index ON lineitem (l_orderkey); ERROR: relation "lineitem_orderkey_index" already exists CREATE INDEX try_index ON lineitem USING gist (l_orderkey); -NOTICE: using one-phase commit for distributed DDL commands -HINT: You can enable two-phase commit for extra safety with: SET citus.multi_shard_commit_protocol TO '2pc' ERROR: data type bigint has no default operator class for access method "gist" HINT: You must specify an operator class for the index or define a default operator class for the data type. -CONTEXT: while executing command on localhost:57638 CREATE INDEX try_index ON lineitem (non_existent_column); ERROR: column "non_existent_column" does not exist -CONTEXT: while executing command on localhost:57638 CREATE INDEX ON lineitem (l_orderkey); ERROR: creating index without a name on a distributed table is currently unsupported -- Verify that none of failed indexes got created on the master node @@ -205,6 +201,8 @@ DROP INDEX CONCURRENTLY lineitem_orderkey_index; ERROR: dropping indexes concurrently on distributed tables is currently unsupported -- Verify that we can succesfully drop indexes DROP INDEX lineitem_orderkey_index; +NOTICE: using one-phase commit for distributed DDL commands +HINT: You can enable two-phase commit for extra safety with: SET citus.multi_shard_commit_protocol TO '2pc' DROP INDEX lineitem_orderkey_index_new; DROP INDEX lineitem_partkey_desc_index; DROP INDEX lineitem_partial_index; diff --git a/src/test/regress/expected/multi_join_order_additional.out b/src/test/regress/expected/multi_join_order_additional.out index da8ddd2f5..1d96eb681 100644 --- a/src/test/regress/expected/multi_join_order_additional.out +++ b/src/test/regress/expected/multi_join_order_additional.out @@ -43,9 +43,9 @@ SELECT master_create_worker_shards('lineitem_hash', 2, 1); (1 row) CREATE INDEX lineitem_hash_time_index ON lineitem_hash (l_shipdate); +DEBUG: building index 
"lineitem_hash_time_index" on table "lineitem_hash" NOTICE: using one-phase commit for distributed DDL commands HINT: You can enable two-phase commit for extra safety with: SET citus.multi_shard_commit_protocol TO '2pc' -DEBUG: building index "lineitem_hash_time_index" on table "lineitem_hash" CREATE TABLE orders_hash ( o_orderkey bigint not null, o_custkey integer not null, diff --git a/src/test/regress/expected/multi_schema_support.out b/src/test/regress/expected/multi_schema_support.out index b476242b3..1cdc897ab 100644 --- a/src/test/regress/expected/multi_schema_support.out +++ b/src/test/regress/expected/multi_schema_support.out @@ -610,9 +610,9 @@ HINT: You can enable two-phase commit for extra safety with: SET citus.multi_sh \c - - - :master_port ALTER TABLE test_schema_support.nation_hash DROP COLUMN IF EXISTS non_existent_column; +NOTICE: column "non_existent_column" of relation "nation_hash" does not exist, skipping NOTICE: using one-phase commit for distributed DDL commands HINT: You can enable two-phase commit for extra safety with: SET citus.multi_shard_commit_protocol TO '2pc' -NOTICE: column "non_existent_column" of relation "nation_hash" does not exist, skipping ALTER TABLE test_schema_support.nation_hash DROP COLUMN IF EXISTS new_col; -- verify column is dropped \d test_schema_support.nation_hash; @@ -665,9 +665,9 @@ HINT: You can enable two-phase commit for extra safety with: SET citus.multi_sh \c - - - :master_port SET search_path TO test_schema_support; ALTER TABLE nation_hash DROP COLUMN IF EXISTS non_existent_column; +NOTICE: column "non_existent_column" of relation "nation_hash" does not exist, skipping NOTICE: using one-phase commit for distributed DDL commands HINT: You can enable two-phase commit for extra safety with: SET citus.multi_shard_commit_protocol TO '2pc' -NOTICE: column "non_existent_column" of relation "nation_hash" does not exist, skipping ALTER TABLE nation_hash DROP COLUMN IF EXISTS new_col; -- verify column is dropped \d 
test_schema_support.nation_hash; diff --git a/src/test/regress/expected/multi_unsupported_worker_operations.out b/src/test/regress/expected/multi_unsupported_worker_operations.out index a5165c40a..41a1474ee 100644 --- a/src/test/regress/expected/multi_unsupported_worker_operations.out +++ b/src/test/regress/expected/multi_unsupported_worker_operations.out @@ -140,6 +140,9 @@ SELECT * FROM mx_ref_table ORDER BY col_1; \c - - - :master_port DROP TABLE mx_ref_table; +CREATE UNIQUE INDEX mx_test_uniq_index ON mx_table(col_1); +NOTICE: using one-phase commit for distributed DDL commands +HINT: You can enable two-phase commit for extra safety with: SET citus.multi_shard_commit_protocol TO '2pc' \c - - - :worker_1_port -- DDL commands \d mx_table @@ -149,8 +152,10 @@ DROP TABLE mx_ref_table; col_1 | integer | col_2 | text | col_3 | bigint | not null default nextval('mx_table_col_3_seq'::regclass) +Indexes: + "mx_test_uniq_index" UNIQUE, btree (col_1) -CREATE INDEX mx_test_index ON mx_table(col_1); +CREATE INDEX mx_test_index ON mx_table(col_2); ERROR: operation is not allowed on this node HINT: Connect to the coordinator and run it again. ALTER TABLE mx_table ADD COLUMN col_4 int; @@ -166,6 +171,8 @@ HINT: Connect to the coordinator and run it again. 
col_1 | integer | col_2 | text | col_3 | bigint | not null default nextval('mx_table_col_3_seq'::regclass) +Indexes: + "mx_test_uniq_index" UNIQUE, btree (col_1) -- master_modify_multiple_shards SELECT master_modify_multiple_shards('UPDATE mx_table SET col_2=''none'''); @@ -223,6 +230,9 @@ SELECT * FROM pg_dist_node WHERE nodename='localhost' AND nodeport=5432; -- master_remove_node \c - - - :master_port +DROP INDEX mx_test_uniq_index; +NOTICE: using one-phase commit for distributed DDL commands +HINT: You can enable two-phase commit for extra safety with: SET citus.multi_shard_commit_protocol TO '2pc' SELECT master_add_node('localhost', 5432); master_add_node -------------------------------------------- diff --git a/src/test/regress/output/multi_alter_table_statements.source b/src/test/regress/output/multi_alter_table_statements.source index b16ab3310..4f9395709 100644 --- a/src/test/regress/output/multi_alter_table_statements.source +++ b/src/test/regress/output/multi_alter_table_statements.source @@ -261,8 +261,7 @@ ALTER TABLE IF EXISTS non_existent_table ADD COLUMN new_column INTEGER; NOTICE: relation "non_existent_table" does not exist, skipping ALTER TABLE IF EXISTS lineitem_alter ALTER COLUMN int_column2 SET DATA TYPE INTEGER; ALTER TABLE lineitem_alter DROP COLUMN non_existent_column; -ERROR: column "non_existent_column" of relation "lineitem_alter_220000" does not exist -CONTEXT: while executing command on localhost:57638 +ERROR: column "non_existent_column" of relation "lineitem_alter" does not exist ALTER TABLE lineitem_alter DROP COLUMN IF EXISTS non_existent_column; NOTICE: column "non_existent_column" of relation "lineitem_alter" does not exist, skipping ALTER TABLE lineitem_alter DROP COLUMN IF EXISTS int_column2; @@ -360,13 +359,13 @@ DETAIL: Only ADD|DROP COLUMN, SET|DROP NOT NULL, SET|DROP DEFAULT, ADD|DROP CON -- types ALTER TABLE lineitem_alter ADD COLUMN new_column non_existent_type; ERROR: type "non_existent_type" does not exist -CONTEXT: 
while executing command on localhost:57638 +LINE 1: ALTER TABLE lineitem_alter ADD COLUMN new_column non_existen... + ^ ALTER TABLE lineitem_alter ALTER COLUMN null_column SET NOT NULL; ERROR: column "null_column" contains null values CONTEXT: while executing command on localhost:57638 ALTER TABLE lineitem_alter ALTER COLUMN l_partkey SET DEFAULT 'a'; ERROR: invalid input syntax for integer: "a" -CONTEXT: while executing command on localhost:57638 -- Verify that we error out on statements involving RENAME ALTER TABLE lineitem_alter RENAME TO lineitem_renamed; ERROR: renaming distributed tables or their objects is currently unsupported diff --git a/src/test/regress/sql/multi_unsupported_worker_operations.sql b/src/test/regress/sql/multi_unsupported_worker_operations.sql index e890354b1..3860e80e0 100644 --- a/src/test/regress/sql/multi_unsupported_worker_operations.sql +++ b/src/test/regress/sql/multi_unsupported_worker_operations.sql @@ -91,11 +91,12 @@ SELECT * FROM mx_ref_table ORDER BY col_1; \c - - - :master_port DROP TABLE mx_ref_table; +CREATE UNIQUE INDEX mx_test_uniq_index ON mx_table(col_1); \c - - - :worker_1_port -- DDL commands \d mx_table -CREATE INDEX mx_test_index ON mx_table(col_1); +CREATE INDEX mx_test_index ON mx_table(col_2); ALTER TABLE mx_table ADD COLUMN col_4 int; ALTER TABLE mx_table_2 ADD CONSTRAINT mx_fk_constraint FOREIGN KEY(col_1) REFERENCES mx_table(col_1); \d mx_table @@ -122,6 +123,7 @@ SELECT * FROM pg_dist_node WHERE nodename='localhost' AND nodeport=5432; -- master_remove_node \c - - - :master_port +DROP INDEX mx_test_uniq_index; SELECT master_add_node('localhost', 5432); \c - - - :worker_1_port From 42c799faee022165da0a338fec785acf1155295b Mon Sep 17 00:00:00 2001 From: Jason Petersen Date: Mon, 13 Mar 2017 15:24:12 -0600 Subject: [PATCH 5/8] Fix MX tests Missed some of these. One had a bad DDL statement to begin with (mixed up column type and column name) and other was just master/worker order. 
--- src/test/regress/expected/multi_mx_ddl.out | 2 +- src/test/regress/expected/multi_mx_modifying_xacts.out | 2 +- src/test/regress/sql/multi_mx_modifying_xacts.sql | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/test/regress/expected/multi_mx_ddl.out b/src/test/regress/expected/multi_mx_ddl.out index 132ed3e72..df9d775ae 100644 --- a/src/test/regress/expected/multi_mx_ddl.out +++ b/src/test/regress/expected/multi_mx_ddl.out @@ -142,9 +142,9 @@ Indexes: \c - - - :master_port SET client_min_messages TO debug2; CREATE INDEX ddl_test_index ON mx_ddl_table(value); +DEBUG: building index "ddl_test_index" on table "mx_ddl_table" NOTICE: using one-phase commit for distributed DDL commands HINT: You can enable two-phase commit for extra safety with: SET citus.multi_shard_commit_protocol TO '2pc' -DEBUG: building index "ddl_test_index" on table "mx_ddl_table" RESET client_min_messages; DROP INDEX ddl_test_index; -- show that sequences owned by mx tables result in unique values diff --git a/src/test/regress/expected/multi_mx_modifying_xacts.out b/src/test/regress/expected/multi_mx_modifying_xacts.out index cf45bb9e0..afac72f54 100644 --- a/src/test/regress/expected/multi_mx_modifying_xacts.out +++ b/src/test/regress/expected/multi_mx_modifying_xacts.out @@ -183,7 +183,7 @@ ABORT; -- applies to DDL BEGIN; INSERT INTO labs_mx VALUES (6, 'Bell labs_mx'); -ALTER TABLE labs_mx ADD COLUMN text motto; +ALTER TABLE labs_mx ADD COLUMN motto text; ERROR: distributed DDL commands must not appear within transaction blocks containing single-shard data modifications COMMIT; -- doesn't apply to COPY after modifications diff --git a/src/test/regress/sql/multi_mx_modifying_xacts.sql b/src/test/regress/sql/multi_mx_modifying_xacts.sql index aaf25c722..b7b482095 100644 --- a/src/test/regress/sql/multi_mx_modifying_xacts.sql +++ b/src/test/regress/sql/multi_mx_modifying_xacts.sql @@ -154,7 +154,7 @@ ABORT; -- applies to DDL BEGIN; INSERT INTO labs_mx VALUES (6, 'Bell 
labs_mx'); -ALTER TABLE labs_mx ADD COLUMN text motto; +ALTER TABLE labs_mx ADD COLUMN motto text; COMMIT; -- doesn't apply to COPY after modifications From 23f5e4282dd10adf7291fa38a5060f1916744885 Mon Sep 17 00:00:00 2001 From: Jason Petersen Date: Mon, 13 Mar 2017 17:18:02 -0600 Subject: [PATCH 6/8] Change DDLJob usage to be wrapped in lists To prepare for GRANT fixes. --- .../distributed/executor/multi_utility.c | 77 +++++++++++-------- 1 file changed, 45 insertions(+), 32 deletions(-) diff --git a/src/backend/distributed/executor/multi_utility.c b/src/backend/distributed/executor/multi_utility.c index b87809859..dc4c78a05 100644 --- a/src/backend/distributed/executor/multi_utility.c +++ b/src/backend/distributed/executor/multi_utility.c @@ -101,16 +101,16 @@ static void VerifyTransmitStmt(CopyStmt *copyStatement); /* Local functions forward declarations for processing distributed table commands */ static Node * ProcessCopyStmt(CopyStmt *copyStatement, char *completionTag, bool *commandMustRunAsOwner); -static DDLJob * PlanIndexStmt(IndexStmt *createIndexStatement, - const char *createIndexCommand); -static DDLJob * PlanDropIndexStmt(DropStmt *dropIndexStatement, - const char *dropIndexCommand); -static DDLJob * PlanAlterTableStmt(AlterTableStmt *alterTableStatement, - const char *alterTableCommand); +static List * PlanIndexStmt(IndexStmt *createIndexStatement, + const char *createIndexCommand); +static List * PlanDropIndexStmt(DropStmt *dropIndexStatement, + const char *dropIndexCommand); +static List * PlanAlterTableStmt(AlterTableStmt *alterTableStatement, + const char *alterTableCommand); static Node * WorkerProcessAlterTableStmt(AlterTableStmt *alterTableStatement, const char *alterTableCommand); -static DDLJob * PlanAlterObjectSchemaStmt(AlterObjectSchemaStmt *alterObjectSchemaStmt, - const char *alterObjectSchemaCommand); +static List * PlanAlterObjectSchemaStmt(AlterObjectSchemaStmt *alterObjectSchemaStmt, + const char *alterObjectSchemaCommand); static 
void ProcessVacuumStmt(VacuumStmt *vacuumStmt, const char *vacuumCommand); static bool IsSupportedDistributedVacuumStmt(Oid relationId, VacuumStmt *vacuumStmt); static List * VacuumTaskList(Oid relationId, VacuumStmt *vacuumStmt); @@ -168,7 +168,7 @@ multi_ProcessUtility(Node *parsetree, bool commandMustRunAsOwner = false; Oid savedUserId = InvalidOid; int savedSecurityContext = 0; - DDLJob *ddlJob = NULL; + List *ddlJobs = NIL; if (IsA(parsetree, TransactionStmt)) { @@ -255,7 +255,7 @@ multi_ProcessUtility(Node *parsetree, { if (IsA(parsetree, IndexStmt)) { - ddlJob = PlanIndexStmt((IndexStmt *) parsetree, queryString); + ddlJobs = PlanIndexStmt((IndexStmt *) parsetree, queryString); } if (IsA(parsetree, DropStmt)) @@ -263,7 +263,7 @@ multi_ProcessUtility(Node *parsetree, DropStmt *dropStatement = (DropStmt *) parsetree; if (dropStatement->removeType == OBJECT_INDEX) { - ddlJob = PlanDropIndexStmt(dropStatement, queryString); + ddlJobs = PlanDropIndexStmt(dropStatement, queryString); } } @@ -272,7 +272,7 @@ multi_ProcessUtility(Node *parsetree, AlterTableStmt *alterTableStmt = (AlterTableStmt *) parsetree; if (alterTableStmt->relkind == OBJECT_TABLE) { - ddlJob = PlanAlterTableStmt(alterTableStmt, queryString); + ddlJobs = PlanAlterTableStmt(alterTableStmt, queryString); } } @@ -296,7 +296,7 @@ multi_ProcessUtility(Node *parsetree, if (IsA(parsetree, AlterObjectSchemaStmt)) { AlterObjectSchemaStmt *setSchemaStmt = (AlterObjectSchemaStmt *) parsetree; - ddlJob = PlanAlterObjectSchemaStmt(setSchemaStmt, queryString); + ddlJobs = PlanAlterObjectSchemaStmt(setSchemaStmt, queryString); } /* @@ -375,9 +375,15 @@ multi_ProcessUtility(Node *parsetree, SetUserIdAndSecContext(savedUserId, savedSecurityContext); } - if (ddlJob != NULL) + if (ddlJobs != NIL) { - ExecuteDistributedDDLJob(ddlJob); + ListCell *ddlJobCell = NULL; + + foreach(ddlJobCell, ddlJobs) + { + DDLJob *ddlJob = (DDLJob *) lfirst(ddlJobCell); + ExecuteDistributedDDLJob(ddlJob); + } } /* we run VacuumStmt 
after standard hook to benefit from its checks and locking */ @@ -611,10 +617,10 @@ ProcessCopyStmt(CopyStmt *copyStatement, char *completionTag, bool *commandMustR * The function returns the IndexStmt node for the command to be executed on the * master node table. */ -static DDLJob * +static List * PlanIndexStmt(IndexStmt *createIndexStatement, const char *createIndexCommand) { - DDLJob *ddlJob = NULL; + List *ddlJobs = NIL; /* * We first check whether a distributed relation is affected. For that, we need to @@ -670,10 +676,12 @@ PlanIndexStmt(IndexStmt *createIndexStatement, const char *createIndexCommand) /* if index does not exist, send the command to workers */ if (!OidIsValid(indexRelationId)) { - ddlJob = palloc0(sizeof(DDLJob)); + DDLJob *ddlJob = palloc0(sizeof(DDLJob)); ddlJob->targetRelationId = relationId; ddlJob->commandString = createIndexCommand; ddlJob->taskList = DDLTaskList(relationId, createIndexCommand); + + ddlJobs = list_make1(ddlJob); } else if (!createIndexStatement->if_not_exists) { @@ -684,7 +692,7 @@ PlanIndexStmt(IndexStmt *createIndexStatement, const char *createIndexCommand) } } - return ddlJob; + return ddlJobs; } @@ -696,10 +704,10 @@ PlanIndexStmt(IndexStmt *createIndexStatement, const char *createIndexCommand) * The function returns the DropStmt node for the command to be executed on the * master node table. 
*/ -static DDLJob * +static List * PlanDropIndexStmt(DropStmt *dropIndexStatement, const char *dropIndexCommand) { - DDLJob *ddlJob = NULL; + List *ddlJobs = NIL; ListCell *dropObjectCell = NULL; Oid distributedIndexId = InvalidOid; Oid distributedRelationId = InvalidOid; @@ -764,7 +772,7 @@ PlanDropIndexStmt(DropStmt *dropIndexStatement, const char *dropIndexCommand) if (OidIsValid(distributedIndexId)) { - ddlJob = palloc0(sizeof(DDLJob)); + DDLJob *ddlJob = palloc0(sizeof(DDLJob)); ErrorIfUnsupportedDropIndexStmt(dropIndexStatement); @@ -772,9 +780,11 @@ PlanDropIndexStmt(DropStmt *dropIndexStatement, const char *dropIndexCommand) ddlJob->targetRelationId = distributedRelationId; ddlJob->commandString = dropIndexCommand; ddlJob->taskList = DDLTaskList(distributedRelationId, dropIndexCommand); + + ddlJobs = list_make1(ddlJob); } - return ddlJob; + return ddlJobs; } @@ -786,9 +796,10 @@ PlanDropIndexStmt(DropStmt *dropIndexStatement, const char *dropIndexCommand) * The function returns the AlterTableStmt node for the command to be executed on the * master node table. 
*/ -static DDLJob * +static List * PlanAlterTableStmt(AlterTableStmt *alterTableStatement, const char *alterTableCommand) { + List *ddlJobs = NIL; DDLJob *ddlJob = NULL; LOCKMODE lockmode = 0; Oid leftRelationId = InvalidOid; @@ -800,20 +811,20 @@ PlanAlterTableStmt(AlterTableStmt *alterTableStatement, const char *alterTableCo /* first check whether a distributed relation is affected */ if (alterTableStatement->relation == NULL) { - return NULL; + return NIL; } lockmode = AlterTableGetLockLevel(alterTableStatement->cmds); leftRelationId = AlterTableLookupRelation(alterTableStatement, lockmode); if (!OidIsValid(leftRelationId)) { - return NULL; + return NIL; } isDistributedRelation = IsDistributedTable(leftRelationId); if (!isDistributedRelation) { - return NULL; + return NIL; } ErrorIfUnsupportedAlterTableStmt(alterTableStatement); @@ -872,7 +883,9 @@ PlanAlterTableStmt(AlterTableStmt *alterTableStatement, const char *alterTableCo ddlJob->taskList = DDLTaskList(leftRelationId, alterTableCommand); } - return ddlJob; + ddlJobs = list_make1(ddlJob); + + return ddlJobs; } @@ -944,7 +957,7 @@ WorkerProcessAlterTableStmt(AlterTableStmt *alterTableStatement, * or not. If it does, then it checks whether given object is a table. If it is, we warn * out, since we do not support ALTER ... 
SET SCHEMA */ -static DDLJob * +static List * PlanAlterObjectSchemaStmt(AlterObjectSchemaStmt *alterObjectSchemaStmt, const char *alterObjectSchemaCommand) { @@ -953,7 +966,7 @@ PlanAlterObjectSchemaStmt(AlterObjectSchemaStmt *alterObjectSchemaStmt, if (alterObjectSchemaStmt->relation == NULL) { - return NULL; + return NIL; } relationId = RangeVarGetRelidExtended(alterObjectSchemaStmt->relation, @@ -964,7 +977,7 @@ PlanAlterObjectSchemaStmt(AlterObjectSchemaStmt *alterObjectSchemaStmt, /* first check whether a distributed relation is affected */ if (!OidIsValid(relationId) || !IsDistributedTable(relationId)) { - return NULL; + return NIL; } /* warn out if a distributed relation is affected */ @@ -973,7 +986,7 @@ PlanAlterObjectSchemaStmt(AlterObjectSchemaStmt *alterObjectSchemaStmt, errhint("Connect to worker nodes directly to manually " "change schemas of affected objects."))); - return NULL; + return NIL; } From d95b5bbad35b70e2274b5eafca5ee6f7cfee7a7e Mon Sep 17 00:00:00 2001 From: Jason Petersen Date: Wed, 15 Mar 2017 19:07:26 -0600 Subject: [PATCH 7/8] Rework ReplicateGrantStmt to use new flow This was the impetus for the previous commit that changed from using a DDLJob * to a List * of them. 
--- .../distributed/executor/multi_utility.c | 20 +++++++++++-------- src/include/distributed/multi_utility.h | 2 +- 2 files changed, 13 insertions(+), 9 deletions(-) diff --git a/src/backend/distributed/executor/multi_utility.c b/src/backend/distributed/executor/multi_utility.c index dc4c78a05..5037e754e 100644 --- a/src/backend/distributed/executor/multi_utility.c +++ b/src/backend/distributed/executor/multi_utility.c @@ -382,6 +382,7 @@ multi_ProcessUtility(Node *parsetree, foreach(ddlJobCell, ddlJobs) { DDLJob *ddlJob = (DDLJob *) lfirst(ddlJobCell); + ExecuteDistributedDDLJob(ddlJob); } } @@ -2362,15 +2363,14 @@ CopyGetAttnums(TupleDesc tupDesc, Relation rel, List *attnamelist) /* - * ReplicateGrantStmt replicates GRANT/REVOKE command to worker nodes if the + * PlanGrantStmt replicates GRANT/REVOKE command to worker nodes if the * the statement affects distributed tables. * * NB: So far column level privileges are not supported. */ -void -ReplicateGrantStmt(Node *parsetree) +List * +PlanGrantStmt(GrantStmt *grantStmt) { - GrantStmt *grantStmt = (GrantStmt *) parsetree; StringInfoData privsString; StringInfoData granteesString; StringInfoData targetString; @@ -2378,7 +2378,7 @@ ReplicateGrantStmt(Node *parsetree) ListCell *granteeCell = NULL; ListCell *objectCell = NULL; bool isFirst = true; - DDLJob *ddlJob = palloc(sizeof(DDLJob)); + List *ddlJobs = NIL; initStringInfo(&privsString); initStringInfo(&granteesString); @@ -2392,7 +2392,7 @@ ReplicateGrantStmt(Node *parsetree) if (grantStmt->targtype != ACL_TARGET_OBJECT || grantStmt->objtype != ACL_OBJECT_RELATION) { - return; + return NIL; } /* deparse the privileges */ @@ -2463,6 +2463,7 @@ ReplicateGrantStmt(Node *parsetree) RangeVar *relvar = (RangeVar *) lfirst(objectCell); Oid relOid = RangeVarGetRelid(relvar, NoLock, false); const char *grantOption = ""; + DDLJob *ddlJob = NULL; if (!IsDistributedTable(relOid)) { @@ -2495,12 +2496,15 @@ ReplicateGrantStmt(Node *parsetree) granteesString.data); } + ddlJob 
= palloc0(sizeof(DDLJob)); ddlJob->targetRelationId = relOid; - ddlJob->commandString = ddlString.data; + ddlJob->commandString = pstrdup(ddlString.data); ddlJob->taskList = DDLTaskList(relOid, ddlString.data); - ExecuteDistributedDDLJob(ddlJob); + ddlJobs = lappend(ddlJobs, ddlJob); resetStringInfo(&ddlString); } + + return ddlJobs; } diff --git a/src/include/distributed/multi_utility.h b/src/include/distributed/multi_utility.h index 6a4d7b42c..4f0b111ca 100644 --- a/src/include/distributed/multi_utility.h +++ b/src/include/distributed/multi_utility.h @@ -24,7 +24,7 @@ typedef struct DDLJob extern void multi_ProcessUtility(Node *parsetree, const char *queryString, ProcessUtilityContext context, ParamListInfo params, DestReceiver *dest, char *completionTag); -extern void ReplicateGrantStmt(Node *parsetree); +extern List * PlanGrantStmt(GrantStmt *grantStmt); #endif /* MULTI_UTILITY_H */ From 34a62abb7d80bd1cff701eb231b1dcb3893e571b Mon Sep 17 00:00:00 2001 From: Jason Petersen Date: Wed, 22 Mar 2017 15:01:15 -0600 Subject: [PATCH 8/8] Address code review comments --- .../distributed/executor/multi_utility.c | 97 +++++++++---------- src/include/distributed/multi_utility.h | 12 ++- 2 files changed, 55 insertions(+), 54 deletions(-) diff --git a/src/backend/distributed/executor/multi_utility.c b/src/backend/distributed/executor/multi_utility.c index 5037e754e..e569232aa 100644 --- a/src/backend/distributed/executor/multi_utility.c +++ b/src/backend/distributed/executor/multi_utility.c @@ -146,16 +146,13 @@ static bool warnedUserAbout2PC = false; /* - * Utility for handling citus specific concerns around utility statements. - * - * There's two basic types of concerns here: - * 1) Intercept utility statements that run after distributed query - * execution. At this stage, the Create Table command for the master node's - * temporary table has been executed, and this table's relationId is - * visible to us. 
We can therefore update the relationId in master node's - * select query. - * 2) Handle utility statements on distributed tables that the core code can't - * handle. + * multi_ProcessUtility is the main entry hook for implementing Citus-specific + * utility behavior. Its primary responsibilities are intercepting COPY and DDL + * commands and augmenting the coordinator's command with corresponding tasks + * to be run on worker nodes, after suitably ensuring said commands' options + * are fully supported by Citus. Much of the DDL behavior is toggled by Citus' + * enable_ddl_propagation GUC. In addition to DDL and COPY, utilities such as + * TRUNCATE and VACUUM are also supported. */ void multi_ProcessUtility(Node *parsetree, @@ -232,6 +229,8 @@ multi_ProcessUtility(Node *parsetree, } } + /* we're mostly in DDL (and VACUUM/TRUNCATE) territory at this point... */ + if (IsA(parsetree, CreateSeqStmt)) { ErrorIfUnsupportedSeqStmt((CreateSeqStmt *) parsetree); @@ -247,10 +246,7 @@ multi_ProcessUtility(Node *parsetree, ErrorIfUnsupportedTruncateStmt((TruncateStmt *) parsetree); } - /* - * DDL commands are propagated to workers only if EnableDDLPropagation is - * set to true and the current node is the coordinator - */ + /* only generate worker DDLJobs if propagation is enabled */ if (EnableDDLPropagation) { if (IsA(parsetree, IndexStmt)) @@ -335,7 +331,8 @@ multi_ProcessUtility(Node *parsetree, * Therefore, we check whether the given ALTER TABLE statement is a * FOREIGN KEY constraint and if so disable the validation step. * Note that validation is done on the shard level when DDL - * propagation is enabled. + * propagation is enabled. Unlike the preceding Plan* calls, the + * following eagerly executes some tasks on workers.
*/ parsetree = WorkerProcessAlterTableStmt(alterTableStmt, queryString); } @@ -360,13 +357,13 @@ multi_ProcessUtility(Node *parsetree, " necessary users and roles."))); } + /* set user if needed and go ahead and run local utility using standard hook */ if (commandMustRunAsOwner) { GetUserIdAndSecContext(&savedUserId, &savedSecurityContext); SetUserIdAndSecContext(CitusExtensionOwner(), SECURITY_LOCAL_USERID_CHANGE); } - /* now drop into standard process utility */ standard_ProcessUtility(parsetree, queryString, context, params, dest, completionTag); @@ -375,6 +372,7 @@ multi_ProcessUtility(Node *parsetree, SetUserIdAndSecContext(savedUserId, savedSecurityContext); } + /* after local command has completed, finish by executing worker DDLJobs, if any */ if (ddlJobs != NIL) { ListCell *ddlJobCell = NULL; @@ -387,7 +385,7 @@ multi_ProcessUtility(Node *parsetree, } } - /* we run VacuumStmt after standard hook to benefit from its checks and locking */ + /* TODO: fold VACUUM's processing into the above block */ if (IsA(parsetree, VacuumStmt)) { VacuumStmt *vacuumStmt = (VacuumStmt *) parsetree; @@ -611,12 +609,12 @@ ProcessCopyStmt(CopyStmt *copyStatement, char *completionTag, bool *commandMustR /* - * PlanIndexStmt processes create index statements for distributed tables. - * The function first checks if the statement belongs to a distributed table - * or not. If it does, then it executes distributed logic for the command. - * - * The function returns the IndexStmt node for the command to be executed on the - * master node table. + * PlanIndexStmt determines whether a given CREATE INDEX statement involves + * a distributed table. If so (and if the statement does not use unsupported + * options), it modifies the input statement to ensure proper execution against + * the master node table and creates a DDLJob to encapsulate information needed + * during the worker node portion of DDL execution before returning that DDLJob + * in a List. 
If no distributed table is involved, this function returns NIL. */ static List * PlanIndexStmt(IndexStmt *createIndexStatement, const char *createIndexCommand) @@ -684,12 +682,6 @@ PlanIndexStmt(IndexStmt *createIndexStatement, const char *createIndexCommand) ddlJobs = list_make1(ddlJob); } - else if (!createIndexStatement->if_not_exists) - { - /* if the index exists and there is no IF NOT EXISTS clause, error */ - ereport(ERROR, (errcode(ERRCODE_DUPLICATE_TABLE), - errmsg("relation \"%s\" already exists", indexName))); - } } } @@ -698,12 +690,12 @@ PlanIndexStmt(IndexStmt *createIndexStatement, const char *createIndexCommand) /* - * PlanDropIndexStmt processes drop index statements for distributed tables. - * The function first checks if the statement belongs to a distributed table - * or not. If it does, then it executes distributed logic for the command. - * - * The function returns the DropStmt node for the command to be executed on the - * master node table. + * PlanDropIndexStmt determines whether a given DROP INDEX statement involves + * a distributed table. If so (and if the statement does not use unsupported + * options), it modifies the input statement to ensure proper execution against + * the master node table and creates a DDLJob to encapsulate information needed + * during the worker node portion of DDL execution before returning that DDLJob + * in a List. If no distributed table is involved, this function returns NIL. 
*/ static List * PlanDropIndexStmt(DropStmt *dropIndexStatement, const char *dropIndexCommand) @@ -777,7 +769,6 @@ PlanDropIndexStmt(DropStmt *dropIndexStatement, const char *dropIndexCommand) ErrorIfUnsupportedDropIndexStmt(dropIndexStatement); - /* if it is supported, go ahead and execute the command */ ddlJob->targetRelationId = distributedRelationId; ddlJob->commandString = dropIndexCommand; ddlJob->taskList = DDLTaskList(distributedRelationId, dropIndexCommand); @@ -790,12 +781,12 @@ PlanDropIndexStmt(DropStmt *dropIndexStatement, const char *dropIndexCommand) /* - * PlanAlterTableStmt processes alter table statements for distributed tables. - * The function first checks if the statement belongs to a distributed table - * or not. If it does, then it executes distributed logic for the command. - * - * The function returns the AlterTableStmt node for the command to be executed on the - * master node table. + * PlanAlterTableStmt determines whether a given ALTER TABLE statement involves + * a distributed table. If so (and if the statement does not use unsupported + * options), it modifies the input statement to ensure proper execution against + * the master node table and creates a DDLJob to encapsulate information needed + * during the worker node portion of DDL execution before returning that DDLJob + * in a List. If no distributed table is involved, this function returns NIL. */ static List * PlanAlterTableStmt(AlterTableStmt *alterTableStatement, const char *alterTableCommand) @@ -876,11 +867,13 @@ PlanAlterTableStmt(AlterTableStmt *alterTableStatement, const char *alterTableCo if (rightRelationId) { + /* if foreign key related, use specialized task list function ... */ ddlJob->taskList = ForeignKeyTaskList(leftRelationId, rightRelationId, alterTableCommand); } else { + /* ... 
otherwise use standard DDL task list function */ ddlJob->taskList = DDLTaskList(leftRelationId, alterTableCommand); } @@ -953,10 +946,10 @@ WorkerProcessAlterTableStmt(AlterTableStmt *alterTableStatement, /* - * PlanAlterObjectSchemaStmt processes ALTER ... SET SCHEMA statements for distributed - * objects. The function first checks if the statement belongs to a distributed objects - * or not. If it does, then it checks whether given object is a table. If it is, we warn - * out, since we do not support ALTER ... SET SCHEMA + * PlanAlterObjectSchemaStmt determines whether a given ALTER ... SET SCHEMA + * statement involves a distributed table and issues a warning if so. Because + * we do not support distributed ALTER ... SET SCHEMA, this function always + * returns NIL. */ static List * PlanAlterObjectSchemaStmt(AlterObjectSchemaStmt *alterObjectSchemaStmt, @@ -981,7 +974,7 @@ PlanAlterObjectSchemaStmt(AlterObjectSchemaStmt *alterObjectSchemaStmt, return NIL; } - /* warn out if a distributed relation is affected */ + /* emit a warning if a distributed relation is affected */ ereport(WARNING, (errmsg("not propagating ALTER ... SET SCHEMA commands to " "worker nodes"), errhint("Connect to worker nodes directly to manually " @@ -1973,8 +1966,8 @@ IsAlterTableRenameStmt(RenameStmt *renameStmt) /* - * ExecuteDistributedDDLJob simply executes a provided DDLJob in a distributed - * transaction, including metadata sync if needed. If the multi shard commit protocol is + * ExecuteDistributedDDLJob simply executes a provided DDLJob in a distributed trans- + * action, including metadata sync if needed. If the multi shard commit protocol is * in its default value of '1pc', then a notice message indicating that '2pc' might be * used for extra safety. 
In the commit protocol, a BEGIN is sent after connection to * each shard placement and COMMIT/ROLLBACK is handled by @@ -2363,8 +2356,10 @@ CopyGetAttnums(TupleDesc tupDesc, Relation rel, List *attnamelist) /* - * PlanGrantStmt replicates GRANT/REVOKE command to worker nodes if the - * the statement affects distributed tables. + * PlanGrantStmt determines whether a given GRANT/REVOKE statement involves + * a distributed table. If so, it creates DDLJobs to encapsulate information + * needed during the worker node portion of DDL execution before returning the + * DDLJobs in a List. If no distributed table is involved, this returns NIL. * * NB: So far column level privileges are not supported. */ diff --git a/src/include/distributed/multi_utility.h b/src/include/distributed/multi_utility.h index 4f0b111ca..143df5d65 100644 --- a/src/include/distributed/multi_utility.h +++ b/src/include/distributed/multi_utility.h @@ -14,11 +14,17 @@ extern bool EnableDDLPropagation; +/* + * A DDLJob encapsulates the remote tasks and commands needed to process all or + * part of a distributed DDL command. It holds the distributed relation's oid, + * the original DDL command string (for MX DDL propagation), and a task list of + * DDL_TASK-type Tasks to be executed. + */ typedef struct DDLJob { - Oid targetRelationId; - const char *commandString; - List *taskList; + Oid targetRelationId; /* oid of the target distributed relation */ + const char *commandString; /* initial (coordinator) DDL command string */ + List *taskList; /* worker DDL tasks to execute */ } DDLJob; extern void multi_ProcessUtility(Node *parsetree, const char *queryString,