Execute coordinator tasks immediately

pull/7325/head
Onur Tirtir 2023-11-06 14:37:19 +03:00
parent 240313e286
commit 214ea8b0bc
1 changed files with 122 additions and 0 deletions

View File

@ -110,6 +110,11 @@ static void DecrementUtilityHookCountersIfNecessary(Node *parsetree);
static bool IsDropSchemaOrDB(Node *parsetree); static bool IsDropSchemaOrDB(Node *parsetree);
static bool ShouldCheckUndistributeCitusLocalTables(void); static bool ShouldCheckUndistributeCitusLocalTables(void);
static void SplitCoordinatorJob(DDLJob *ddlJob, DDLJob **coordinatorDDLJob,
DDLJob **nonCoordinatorNodesDDLJob);
static void SplitCoordinatorTask(Task *task, Task **coordinatorTask,
Task **nonCoordinatorNodesTask);
/* /*
* ProcessUtilityParseTree is a convenience method to create a PlannedStmt out of * ProcessUtilityParseTree is a convenience method to create a PlannedStmt out of
* pieces of a utility statement before invoking ProcessUtility. * pieces of a utility statement before invoking ProcessUtility.
@ -632,6 +637,32 @@ citus_ProcessUtilityInternal(PlannedStmt *pstmt,
{ {
ddlJobs = ops->preprocess(parsetree, queryString, context); ddlJobs = ops->preprocess(parsetree, queryString, context);
} }
if (ddlJobs != NIL)
{
List *nonCoordinatorNodeJobs = NIL;
DDLJob *ddlJob = NULL;
foreach_ptr(ddlJob, ddlJobs)
{
DDLJob *coordinatorJob = NULL;
DDLJob *nonCoordinatorNodeJob = NULL;
SplitCoordinatorJob(ddlJob, &coordinatorJob, &nonCoordinatorNodeJob);
if (coordinatorJob)
{
ExecuteDistributedDDLJob(coordinatorJob);
}
if (nonCoordinatorNodeJob)
{
nonCoordinatorNodeJobs = lappend(nonCoordinatorNodeJobs,
nonCoordinatorNodeJob);
}
}
ddlJobs = nonCoordinatorNodeJobs;
}
} }
else else
{ {
@ -1533,6 +1564,97 @@ NodeDDLTaskList(TargetWorkerSet targets, List *commands)
} }
static void
SplitCoordinatorJob(DDLJob *ddlJob, DDLJob **coordinatorDDLJob,
DDLJob **nonCoordinatorNodesDDLJob)
{
*coordinatorDDLJob = palloc(sizeof(DDLJob));
*nonCoordinatorNodesDDLJob = palloc(sizeof(DDLJob));
**coordinatorDDLJob = *ddlJob;
**nonCoordinatorNodesDDLJob = *ddlJob;
(*coordinatorDDLJob)->taskList = NIL;
(*nonCoordinatorNodesDDLJob)->taskList = NIL;
Task *task = NULL;
foreach_ptr(task, ddlJob->taskList)
{
Task *coordinatorTask = NULL;
Task *nonCoordinatorNodesTask = NULL;
SplitCoordinatorTask(task, &coordinatorTask, &nonCoordinatorNodesTask);
if (coordinatorTask != NULL)
{
(*coordinatorDDLJob)->taskList =
lappend((*coordinatorDDLJob)->taskList, coordinatorTask);
}
if (nonCoordinatorNodesTask != NULL)
{
(*nonCoordinatorNodesDDLJob)->taskList =
lappend((*nonCoordinatorNodesDDLJob)->taskList, nonCoordinatorNodesTask);
}
}
if ((*coordinatorDDLJob)->taskList == NIL)
{
*coordinatorDDLJob = NULL;
}
if ((*nonCoordinatorNodesDDLJob)->taskList == NIL)
{
*nonCoordinatorNodesDDLJob = NULL;
}
if (*coordinatorDDLJob && *nonCoordinatorNodesDDLJob)
{
/* defer the metadata sync to "other nodes" phase */
(*coordinatorDDLJob)->metadataSyncCommand = NULL;
}
Assert(*coordinatorDDLJob || *nonCoordinatorNodesDDLJob);
}
static void
SplitCoordinatorTask(Task *task, Task **coordinatorTask, Task **nonCoordinatorNodesTask)
{
*coordinatorTask = copyObject(task);
*nonCoordinatorNodesTask = copyObject(task);
(*coordinatorTask)->taskPlacementList = NIL;
(*nonCoordinatorNodesTask)->taskPlacementList = NIL;
ShardPlacement *taskPlacement = NULL;
foreach_ptr(taskPlacement, task->taskPlacementList)
{
if (taskPlacement->groupId == COORDINATOR_GROUP_ID)
{
(*coordinatorTask)->taskPlacementList =
lappend((*coordinatorTask)->taskPlacementList, taskPlacement);
}
else
{
(*nonCoordinatorNodesTask)->taskPlacementList =
lappend((*nonCoordinatorNodesTask)->taskPlacementList, taskPlacement);
}
}
if ((*coordinatorTask)->taskPlacementList == NIL)
{
*coordinatorTask = NULL;
}
if ((*nonCoordinatorNodesTask)->taskPlacementList == NIL)
{
*nonCoordinatorNodesTask = NULL;
}
Assert(*coordinatorTask || *nonCoordinatorNodesTask);
}
/* /*
* AlterTableInProgress returns true if we're processing an ALTER TABLE command * AlterTableInProgress returns true if we're processing an ALTER TABLE command
* right now. * right now.