From 214ea8b0bc0d6975e9181f189ab5d6a3799cb3f4 Mon Sep 17 00:00:00 2001 From: Onur Tirtir Date: Mon, 6 Nov 2023 14:37:19 +0300 Subject: [PATCH] Execute coordinator tasks immediately --- .../distributed/commands/utility_hook.c | 122 ++++++++++++++++++ 1 file changed, 122 insertions(+) diff --git a/src/backend/distributed/commands/utility_hook.c b/src/backend/distributed/commands/utility_hook.c index afc8fa9fd..14afc6c61 100644 --- a/src/backend/distributed/commands/utility_hook.c +++ b/src/backend/distributed/commands/utility_hook.c @@ -110,6 +110,11 @@ static void DecrementUtilityHookCountersIfNecessary(Node *parsetree); static bool IsDropSchemaOrDB(Node *parsetree); static bool ShouldCheckUndistributeCitusLocalTables(void); +static void SplitCoordinatorJob(DDLJob *ddlJob, DDLJob **coordinatorDDLJob, + DDLJob **nonCoordinatorNodesDDLJob); +static void SplitCoordinatorTask(Task *task, Task **coordinatorTask, + Task **nonCoordinatorNodesTask); + /* * ProcessUtilityParseTree is a convenience method to create a PlannedStmt out of * pieces of a utility statement before invoking ProcessUtility. @@ -632,6 +637,32 @@ citus_ProcessUtilityInternal(PlannedStmt *pstmt, { ddlJobs = ops->preprocess(parsetree, queryString, context); } + + if (ddlJobs != NIL) + { + List *nonCoordinatorNodeJobs = NIL; + + DDLJob *ddlJob = NULL; + foreach_ptr(ddlJob, ddlJobs) + { + DDLJob *coordinatorJob = NULL; + DDLJob *nonCoordinatorNodeJob = NULL; + SplitCoordinatorJob(ddlJob, &coordinatorJob, &nonCoordinatorNodeJob); + + if (coordinatorJob) + { + ExecuteDistributedDDLJob(coordinatorJob); + } + + if (nonCoordinatorNodeJob) + { + nonCoordinatorNodeJobs = lappend(nonCoordinatorNodeJobs, + nonCoordinatorNodeJob); + } + } + + ddlJobs = nonCoordinatorNodeJobs; + } } else { @@ -1533,6 +1564,97 @@ NodeDDLTaskList(TargetWorkerSet targets, List *commands) } +static void +SplitCoordinatorJob(DDLJob *ddlJob, DDLJob **coordinatorDDLJob, + DDLJob **nonCoordinatorNodesDDLJob) +{ + *coordinatorDDLJob = palloc(sizeof(DDLJob)); + *nonCoordinatorNodesDDLJob = palloc(sizeof(DDLJob)); + + **coordinatorDDLJob = *ddlJob; + **nonCoordinatorNodesDDLJob = *ddlJob; + + (*coordinatorDDLJob)->taskList = NIL; + (*nonCoordinatorNodesDDLJob)->taskList = NIL; + + Task *task = NULL; + foreach_ptr(task, ddlJob->taskList) + { + Task *coordinatorTask = NULL; + Task *nonCoordinatorNodesTask = NULL; + SplitCoordinatorTask(task, &coordinatorTask, &nonCoordinatorNodesTask); + + if (coordinatorTask != NULL) + { + (*coordinatorDDLJob)->taskList = + lappend((*coordinatorDDLJob)->taskList, coordinatorTask); + } + + if (nonCoordinatorNodesTask != NULL) + { + (*nonCoordinatorNodesDDLJob)->taskList = + lappend((*nonCoordinatorNodesDDLJob)->taskList, nonCoordinatorNodesTask); + } + } + + if ((*coordinatorDDLJob)->taskList == NIL) + { + *coordinatorDDLJob = NULL; + } + + if ((*nonCoordinatorNodesDDLJob)->taskList == NIL) + { + *nonCoordinatorNodesDDLJob = NULL; + } + + if (*coordinatorDDLJob && *nonCoordinatorNodesDDLJob) + { + /* defer the metadata sync to "other nodes" phase */ + (*coordinatorDDLJob)->metadataSyncCommand = NULL; + } + + Assert(*coordinatorDDLJob || *nonCoordinatorNodesDDLJob); +} + + +static void +SplitCoordinatorTask(Task *task, Task **coordinatorTask, Task **nonCoordinatorNodesTask) +{ + *coordinatorTask = copyObject(task); + *nonCoordinatorNodesTask = copyObject(task); + + (*coordinatorTask)->taskPlacementList = NIL; + (*nonCoordinatorNodesTask)->taskPlacementList = NIL; + + ShardPlacement *taskPlacement = NULL; + foreach_ptr(taskPlacement, task->taskPlacementList) + { + if (taskPlacement->groupId == COORDINATOR_GROUP_ID) + { + (*coordinatorTask)->taskPlacementList = + lappend((*coordinatorTask)->taskPlacementList, taskPlacement); + } + else + { + (*nonCoordinatorNodesTask)->taskPlacementList = + lappend((*nonCoordinatorNodesTask)->taskPlacementList, taskPlacement); + } + } + + if ((*coordinatorTask)->taskPlacementList == NIL) + { + *coordinatorTask = NULL; + } + + if ((*nonCoordinatorNodesTask)->taskPlacementList == NIL) + { + *nonCoordinatorNodesTask = NULL; + } + + Assert(*coordinatorTask || *nonCoordinatorNodesTask); +} + + /* * AlterTableInProgress returns true if we're processing an ALTER TABLE command * right now.