From a02a2a90c7521cd26fc8510764382c1fc1995713 Mon Sep 17 00:00:00 2001 From: Jason Petersen Date: Sat, 11 Mar 2017 13:21:28 -0700 Subject: [PATCH] Refactor ExecuteDistDDLCommand to expect struct Will let us separate out the determination of what to execute from its actual execution. --- .../distributed/executor/multi_utility.c | 110 +++++++----------- src/include/distributed/multi_utility.h | 7 ++ 2 files changed, 47 insertions(+), 70 deletions(-) diff --git a/src/backend/distributed/executor/multi_utility.c b/src/backend/distributed/executor/multi_utility.c index b0c980dbe..0c2a5a49d 100644 --- a/src/backend/distributed/executor/multi_utility.c +++ b/src/backend/distributed/executor/multi_utility.c @@ -132,11 +132,7 @@ static void ErrorIfDistributedRenameStmt(RenameStmt *renameStatement); /* Local functions forward declarations for helper functions */ static void CreateLocalTable(RangeVar *relation, char *nodeName, int32 nodePort); static bool IsAlterTableRenameStmt(RenameStmt *renameStatement); -static void ExecuteDistributedDDLCommand(Oid relationId, const char *ddlCommandString, - bool isTopLevel); -static void ExecuteDistributedForeignKeyCommand(Oid leftRelationId, Oid rightRelationId, - const char *ddlCommandString, - bool isTopLevel); +static void ExecuteDistributedDDLJob(DDLJob *ddlJob); static void ShowNoticeIfNotUsing2PC(void); static List * DDLTaskList(Oid relationId, const char *commandString); static List * ForeignKeyTaskList(Oid leftRelationId, Oid rightRelationId, @@ -673,7 +669,12 @@ ProcessIndexStmt(IndexStmt *createIndexStatement, const char *createIndexCommand /* if index does not exist, send the command to workers */ if (!OidIsValid(indexRelationId)) { - ExecuteDistributedDDLCommand(relationId, createIndexCommand, isTopLevel); + DDLJob *ddlJob = palloc0(sizeof(DDLJob)); + ddlJob->targetRelationId = relationId; + ddlJob->commandString = createIndexCommand; + ddlJob->taskList = DDLTaskList(relationId, createIndexCommand); + + ExecuteDistributedDDLJob(ddlJob); } else if (!createIndexStatement->if_not_exists) { @@ -764,10 +765,16 @@ ProcessDropIndexStmt(DropStmt *dropIndexStatement, const char *dropIndexCommand, if (OidIsValid(distributedIndexId)) { + DDLJob *ddlJob = palloc0(sizeof(DDLJob)); + ErrorIfUnsupportedDropIndexStmt(dropIndexStatement); /* if it is supported, go ahead and execute the command */ - ExecuteDistributedDDLCommand(distributedRelationId, dropIndexCommand, isTopLevel); + ddlJob->targetRelationId = distributedRelationId; + ddlJob->commandString = dropIndexCommand; + ddlJob->taskList = DDLTaskList(distributedRelationId, dropIndexCommand); + + ExecuteDistributedDDLJob(ddlJob); } return (Node *) dropIndexStatement; @@ -856,12 +863,22 @@ ProcessAlterTableStmt(AlterTableStmt *alterTableStatement, const char *alterTabl if (rightRelationId) { - ExecuteDistributedForeignKeyCommand(leftRelationId, rightRelationId, - alterTableCommand, isTopLevel); + DDLJob *ddlJob = palloc0(sizeof(DDLJob)); + ddlJob->targetRelationId = leftRelationId; + ddlJob->commandString = alterTableCommand; + ddlJob->taskList = ForeignKeyTaskList(leftRelationId, rightRelationId, + alterTableCommand); + + ExecuteDistributedDDLJob(ddlJob); } else { - ExecuteDistributedDDLCommand(leftRelationId, alterTableCommand, isTopLevel); + DDLJob *ddlJob = palloc0(sizeof(DDLJob)); + ddlJob->targetRelationId = leftRelationId; + ddlJob->commandString = alterTableCommand; + ddlJob->taskList = DDLTaskList(leftRelationId, alterTableCommand); + + ExecuteDistributedDDLJob(ddlJob); } return (Node *) alterTableStatement; @@ -1951,19 +1968,17 @@ IsAlterTableRenameStmt(RenameStmt *renameStmt) /* - * ExecuteDistributedDDLCommand applies a given DDL command to the given - * distributed table in a distributed transaction. If the multi shard commit protocol is + * ExecuteDistributedDDLJob simply executes a provided DDLJob in a distributed + * transaction, including metadata sync if needed. If the multi shard commit protocol is * in its default value of '1pc', then a notice message indicating that '2pc' might be * used for extra safety. In the commit protocol, a BEGIN is sent after connection to * each shard placement and COMMIT/ROLLBACK is handled by * CompleteShardPlacementTransactions function. */ static void -ExecuteDistributedDDLCommand(Oid relationId, const char *ddlCommandString, - bool isTopLevel) +ExecuteDistributedDDLJob(DDLJob *ddlJob) { - List *taskList = NIL; - bool shouldSyncMetadata = ShouldSyncTableMetadata(relationId); + bool shouldSyncMetadata = ShouldSyncTableMetadata(ddlJob->targetRelationId); if (XactModificationLevel == XACT_MODIFICATION_DATA) { @@ -1979,60 +1994,10 @@ ExecuteDistributedDDLCommand(Oid relationId, const char *ddlCommandString, if (shouldSyncMetadata) { SendCommandToWorkers(WORKERS_WITH_METADATA, DISABLE_DDL_PROPAGATION); - SendCommandToWorkers(WORKERS_WITH_METADATA, (char *) ddlCommandString); + SendCommandToWorkers(WORKERS_WITH_METADATA, (char *) ddlJob->commandString); } - taskList = DDLTaskList(relationId, ddlCommandString); - - ExecuteModifyTasksWithoutResults(taskList); -} - - -/* - * ExecuteDistributedForeignKeyCommand applies a given foreign key command to the given - * distributed table in a distributed transaction. If the multi shard commit protocol is - * in its default value of '1pc', then a notice message indicating that '2pc' might be - * used for extra safety. In the commit protocol, a BEGIN is sent after connection to - * each shard placement and COMMIT/ROLLBACK is handled by - * CompleteShardPlacementTransactions function. - * - * leftRelationId is the relation id of actual distributed table which given foreign key - * command is applied. rightRelationId is the relation id of distributed table which - * foreign key refers to. - */ -static void -ExecuteDistributedForeignKeyCommand(Oid leftRelationId, Oid rightRelationId, - const char *ddlCommandString, bool isTopLevel) -{ - List *taskList = NIL; - bool shouldSyncMetadata = false; - - if (XactModificationLevel == XACT_MODIFICATION_DATA) - { - ereport(ERROR, (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION), - errmsg("distributed DDL commands must not appear within " - "transaction blocks containing single-shard data " - "modifications"))); - } - - EnsureCoordinator(); - ShowNoticeIfNotUsing2PC(); - - /* - * It is sufficient to check only one of the tables for metadata syncing on workers, - * since the colocation of two tables implies that either both or none of them have - * metadata on workers. - */ - shouldSyncMetadata = ShouldSyncTableMetadata(leftRelationId); - if (shouldSyncMetadata) - { - SendCommandToWorkers(WORKERS_WITH_METADATA, DISABLE_DDL_PROPAGATION); - SendCommandToWorkers(WORKERS_WITH_METADATA, (char *) ddlCommandString); - } - - taskList = ForeignKeyTaskList(leftRelationId, rightRelationId, ddlCommandString); - - ExecuteModifyTasksWithoutResults(taskList); + ExecuteModifyTasksWithoutResults(ddlJob->taskList); } @@ -2409,6 +2374,7 @@ ReplicateGrantStmt(Node *parsetree) ListCell *granteeCell = NULL; ListCell *objectCell = NULL; bool isFirst = true; + DDLJob *ddlJob = palloc(sizeof(DDLJob)); initStringInfo(&privsString); initStringInfo(&granteesString); @@ -2493,7 +2459,6 @@ ReplicateGrantStmt(Node *parsetree) RangeVar *relvar = (RangeVar *) lfirst(objectCell); Oid relOid = RangeVarGetRelid(relvar, NoLock, false); const char *grantOption = ""; - bool isTopLevel = true; if (!IsDistributedTable(relOid)) { @@ -2526,7 +2491,12 @@ ReplicateGrantStmt(Node *parsetree) granteesString.data); } - ExecuteDistributedDDLCommand(relOid, ddlString.data, isTopLevel); + ddlJob->targetRelationId = relOid; + ddlJob->commandString = ddlString.data; + ddlJob->taskList = DDLTaskList(relOid, ddlString.data); + + ExecuteDistributedDDLJob(ddlJob); + resetStringInfo(&ddlString); } } diff --git a/src/include/distributed/multi_utility.h b/src/include/distributed/multi_utility.h index 2b3a7cdf1..6a4d7b42c 100644 --- a/src/include/distributed/multi_utility.h +++ b/src/include/distributed/multi_utility.h @@ -14,6 +14,13 @@ extern bool EnableDDLPropagation; +typedef struct DDLJob +{ + Oid targetRelationId; + const char *commandString; + List *taskList; +} DDLJob; + extern void multi_ProcessUtility(Node *parsetree, const char *queryString, ProcessUtilityContext context, ParamListInfo params, DestReceiver *dest, char *completionTag);