diff --git a/src/backend/distributed/executor/multi_utility.c b/src/backend/distributed/executor/multi_utility.c index 93e76f18a..03f5e1349 100644 --- a/src/backend/distributed/executor/multi_utility.c +++ b/src/backend/distributed/executor/multi_utility.c @@ -101,17 +101,16 @@ static void VerifyTransmitStmt(CopyStmt *copyStatement); /* Local functions forward declarations for processing distributed table commands */ static Node * ProcessCopyStmt(CopyStmt *copyStatement, char *completionTag, bool *commandMustRunAsOwner); -static Node * PlanIndexStmt(IndexStmt *createIndexStatement, - const char *createIndexCommand, bool isTopLevel); -static Node * PlanDropIndexStmt(DropStmt *dropIndexStatement, - const char *dropIndexCommand, bool isTopLevel); -static Node * PlanAlterTableStmt(AlterTableStmt *alterTableStatement, - const char *alterTableCommand, bool isTopLevel); +static DDLJob * PlanIndexStmt(IndexStmt *createIndexStatement, + const char *createIndexCommand); +static DDLJob * PlanDropIndexStmt(DropStmt *dropIndexStatement, + const char *dropIndexCommand); +static DDLJob * PlanAlterTableStmt(AlterTableStmt *alterTableStatement, + const char *alterTableCommand); static Node * WorkerProcessAlterTableStmt(AlterTableStmt *alterTableStatement, const char *alterTableCommand); -static Node * PlanAlterObjectSchemaStmt(AlterObjectSchemaStmt *alterObjectSchemaStmt, - const char *alterObjectSchemaCommand, - bool isTopLevel); +static DDLJob * PlanAlterObjectSchemaStmt(AlterObjectSchemaStmt *alterObjectSchemaStmt, + const char *alterObjectSchemaCommand); static void ProcessVacuumStmt(VacuumStmt *vacuumStmt, const char *vacuumCommand); static bool IsSupportedDistributedVacuumStmt(Oid relationId, VacuumStmt *vacuumStmt); static List * VacuumTaskList(Oid relationId, VacuumStmt *vacuumStmt); @@ -253,12 +252,11 @@ multi_ProcessUtility(Node *parsetree, */ if (EnableDDLPropagation) { - bool isTopLevel = (context == PROCESS_UTILITY_TOPLEVEL); + DDLJob *ddlJob = NULL; if (IsA(parsetree, IndexStmt)) { - parsetree = PlanIndexStmt((IndexStmt *) parsetree, queryString, - isTopLevel); + ddlJob = PlanIndexStmt((IndexStmt *) parsetree, queryString); } if (IsA(parsetree, DropStmt)) @@ -266,7 +264,7 @@ multi_ProcessUtility(Node *parsetree, DropStmt *dropStatement = (DropStmt *) parsetree; if (dropStatement->removeType == OBJECT_INDEX) { - parsetree = PlanDropIndexStmt(dropStatement, queryString, isTopLevel); + ddlJob = PlanDropIndexStmt(dropStatement, queryString); } } @@ -275,8 +273,7 @@ multi_ProcessUtility(Node *parsetree, AlterTableStmt *alterTableStmt = (AlterTableStmt *) parsetree; if (alterTableStmt->relkind == OBJECT_TABLE) { - parsetree = PlanAlterTableStmt(alterTableStmt, queryString, - isTopLevel); + ddlJob = PlanAlterTableStmt(alterTableStmt, queryString); } } @@ -300,8 +297,7 @@ multi_ProcessUtility(Node *parsetree, if (IsA(parsetree, AlterObjectSchemaStmt)) { AlterObjectSchemaStmt *setSchemaStmt = (AlterObjectSchemaStmt *) parsetree; - parsetree = PlanAlterObjectSchemaStmt(setSchemaStmt, queryString, - isTopLevel); + ddlJob = PlanAlterObjectSchemaStmt(setSchemaStmt, queryString); } /* @@ -316,6 +312,11 @@ multi_ProcessUtility(Node *parsetree, errhint("Connect to worker nodes directly to manually " "move all tables."))); } + + if (ddlJob != NULL) + { + ExecuteDistributedDDLJob(ddlJob); + } } else { @@ -611,10 +612,11 @@ ProcessCopyStmt(CopyStmt *copyStatement, char *completionTag, bool *commandMustR * The function returns the IndexStmt node for the command to be executed on the * master node table. */ -static Node * -PlanIndexStmt(IndexStmt *createIndexStatement, const char *createIndexCommand, - bool isTopLevel) +static DDLJob * +PlanIndexStmt(IndexStmt *createIndexStatement, const char *createIndexCommand) { + DDLJob *ddlJob = NULL; + /* * We first check whether a distributed relation is affected. For that, we need to * open the relation. To prevent race conditions with later lookups, lock the table, @@ -669,12 +671,10 @@ PlanIndexStmt(IndexStmt *createIndexStatement, const char *createIndexCommand, /* if index does not exist, send the command to workers */ if (!OidIsValid(indexRelationId)) { - DDLJob *ddlJob = palloc0(sizeof(DDLJob)); + ddlJob = palloc0(sizeof(DDLJob)); ddlJob->targetRelationId = relationId; ddlJob->commandString = createIndexCommand; ddlJob->taskList = DDLTaskList(relationId, createIndexCommand); - - ExecuteDistributedDDLJob(ddlJob); } else if (!createIndexStatement->if_not_exists) { @@ -685,7 +685,7 @@ PlanIndexStmt(IndexStmt *createIndexStatement, const char *createIndexCommand, } } - return (Node *) createIndexStatement; + return ddlJob; } @@ -697,10 +697,10 @@ PlanIndexStmt(IndexStmt *createIndexStatement, const char *createIndexCommand, * The function returns the DropStmt node for the command to be executed on the * master node table. */ -static Node * -PlanDropIndexStmt(DropStmt *dropIndexStatement, const char *dropIndexCommand, - bool isTopLevel) +static DDLJob * +PlanDropIndexStmt(DropStmt *dropIndexStatement, const char *dropIndexCommand) { + DDLJob *ddlJob = NULL; ListCell *dropObjectCell = NULL; Oid distributedIndexId = InvalidOid; Oid distributedRelationId = InvalidOid; @@ -765,7 +765,7 @@ PlanDropIndexStmt(DropStmt *dropIndexStatement, const char *dropIndexCommand, if (OidIsValid(distributedIndexId)) { - DDLJob *ddlJob = palloc0(sizeof(DDLJob)); + ddlJob = palloc0(sizeof(DDLJob)); ErrorIfUnsupportedDropIndexStmt(dropIndexStatement); @@ -773,11 +773,9 @@ PlanDropIndexStmt(DropStmt *dropIndexStatement, const char *dropIndexCommand, ddlJob->targetRelationId = distributedRelationId; ddlJob->commandString = dropIndexCommand; ddlJob->taskList = DDLTaskList(distributedRelationId, dropIndexCommand); - - ExecuteDistributedDDLJob(ddlJob); } - return (Node *) dropIndexStatement; + return ddlJob; } @@ -789,10 +787,10 @@ PlanDropIndexStmt(DropStmt *dropIndexStatement, const char *dropIndexCommand, * The function returns the AlterTableStmt node for the command to be executed on the * master node table. */ -static Node * -PlanAlterTableStmt(AlterTableStmt *alterTableStatement, const char *alterTableCommand, - bool isTopLevel) +static DDLJob * +PlanAlterTableStmt(AlterTableStmt *alterTableStatement, const char *alterTableCommand) { + DDLJob *ddlJob = NULL; LOCKMODE lockmode = 0; Oid leftRelationId = InvalidOid; Oid rightRelationId = InvalidOid; @@ -803,20 +801,20 @@ PlanAlterTableStmt(AlterTableStmt *alterTableStatement, const char *alterTableCo /* first check whether a distributed relation is affected */ if (alterTableStatement->relation == NULL) { - return (Node *) alterTableStatement; + return NULL; } lockmode = AlterTableGetLockLevel(alterTableStatement->cmds); leftRelationId = AlterTableLookupRelation(alterTableStatement, lockmode); if (!OidIsValid(leftRelationId)) { - return (Node *) alterTableStatement; + return NULL; } isDistributedRelation = IsDistributedTable(leftRelationId); if (!isDistributedRelation) { - return (Node *) alterTableStatement; + return NULL; } ErrorIfUnsupportedAlterTableStmt(alterTableStatement); @@ -861,27 +859,21 @@ PlanAlterTableStmt(AlterTableStmt *alterTableStatement, const char *alterTableCo } } + ddlJob = palloc0(sizeof(DDLJob)); + ddlJob->targetRelationId = leftRelationId; + ddlJob->commandString = alterTableCommand; + if (rightRelationId) { - DDLJob *ddlJob = palloc0(sizeof(DDLJob)); - ddlJob->targetRelationId = leftRelationId; - ddlJob->commandString = alterTableCommand; ddlJob->taskList = ForeignKeyTaskList(leftRelationId, rightRelationId, alterTableCommand); - - ExecuteDistributedDDLJob(ddlJob); } else { - DDLJob *ddlJob = palloc0(sizeof(DDLJob)); - ddlJob->targetRelationId = leftRelationId; - ddlJob->commandString = alterTableCommand; ddlJob->taskList = DDLTaskList(leftRelationId, alterTableCommand); - - ExecuteDistributedDDLJob(ddlJob); } - return (Node *) alterTableStatement; + return ddlJob; } @@ -953,16 +945,16 @@ WorkerProcessAlterTableStmt(AlterTableStmt *alterTableStatement, * or not. If it does, then it checks whether given object is a table. If it is, we warn * out, since we do not support ALTER ... SET SCHEMA */ -static Node * +static DDLJob * PlanAlterObjectSchemaStmt(AlterObjectSchemaStmt *alterObjectSchemaStmt, - const char *alterObjectSchemaCommand, bool isTopLevel) + const char *alterObjectSchemaCommand) { Oid relationId = InvalidOid; bool noWait = false; if (alterObjectSchemaStmt->relation == NULL) { - return (Node *) alterObjectSchemaStmt; + return NULL; } relationId = RangeVarGetRelidExtended(alterObjectSchemaStmt->relation, @@ -973,7 +965,7 @@ PlanAlterObjectSchemaStmt(AlterObjectSchemaStmt *alterObjectSchemaStmt, /* first check whether a distributed relation is affected */ if (!OidIsValid(relationId) || !IsDistributedTable(relationId)) { - return (Node *) alterObjectSchemaStmt; + return NULL; } /* warn out if a distributed relation is affected */ @@ -982,7 +974,7 @@ PlanAlterObjectSchemaStmt(AlterObjectSchemaStmt *alterObjectSchemaStmt, errhint("Connect to worker nodes directly to manually " "change schemas of affected objects."))); - return (Node *) alterObjectSchemaStmt; + return NULL; }