From 1a20c4fe6d2a92674dd2b99f0d23350bef921ee1 Mon Sep 17 00:00:00 2001 From: Teja Mupparti Date: Fri, 6 Jun 2025 17:43:47 -0700 Subject: [PATCH] Bakc --- src/backend/distributed/executor/adaptive_executor.c | 1 + src/backend/distributed/executor/subplan_execution.c | 5 +++++ src/backend/distributed/planner/distributed_planner.c | 1 + src/backend/distributed/planner/multi_explain.c | 2 +- src/backend/distributed/planner/multi_router_planner.c | 2 ++ src/include/distributed/multi_physical_planner.h | 3 +++ 6 files changed, 13 insertions(+), 1 deletion(-) diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c index 895f01ae7..1c5d4f3d1 100644 --- a/src/backend/distributed/executor/adaptive_executor.c +++ b/src/backend/distributed/executor/adaptive_executor.c @@ -897,6 +897,7 @@ AdaptiveExecutor(CitusScanState *scanState) FinishDistributedExecution(execution); + job->jobExecuted = true; if (SortReturning && distributedPlan->expectResults && commandType != CMD_SELECT) { SortTupleStore(scanState); diff --git a/src/backend/distributed/executor/subplan_execution.c b/src/backend/distributed/executor/subplan_execution.c index ef2838343..6cc87105f 100644 --- a/src/backend/distributed/executor/subplan_execution.c +++ b/src/backend/distributed/executor/subplan_execution.c @@ -41,12 +41,17 @@ ExecuteSubPlans(DistributedPlan *distributedPlan) uint64 planId = distributedPlan->planId; List *subPlanList = distributedPlan->subPlanList; + if (distributedPlan->workerJob) + distributedPlan->workerJob->jobExecuted = true; + distributedPlan->subPlansExecuted = true; + if (subPlanList == NIL) { /* no subplans to execute */ return; } + HTAB *intermediateResultsHash = MakeIntermediateResultHTAB(); RecordSubplanExecutionsOnNodes(intermediateResultsHash, distributedPlan); diff --git a/src/backend/distributed/planner/distributed_planner.c b/src/backend/distributed/planner/distributed_planner.c index 193e2f250..8c372735a 100644 --- a/src/backend/distributed/planner/distributed_planner.c +++ b/src/backend/distributed/planner/distributed_planner.c @@ -1395,6 +1395,7 @@ FinalizePlan(PlannedStmt *localPlan, DistributedPlan *distributedPlan) PlannedStmt *finalPlan = NULL; CustomScan *customScan = makeNode(CustomScan); MultiExecutorType executorType = MULTI_EXECUTOR_INVALID_FIRST; + distributedPlan->subPlansExecuted = false; /* this field is used in JobExecutorType */ distributedPlan->relationIdList = localPlan->relationOids; diff --git a/src/backend/distributed/planner/multi_explain.c b/src/backend/distributed/planner/multi_explain.c index d1e6be1a8..708225bec 100644 --- a/src/backend/distributed/planner/multi_explain.c +++ b/src/backend/distributed/planner/multi_explain.c @@ -625,7 +625,7 @@ ExplainJob(CitusScanState *scanState, Job *job, ExplainState *es, ExplainOpenGroup("Job", "Job", true, es); ExplainPropertyInteger("Task Count", NULL, taskCount, es); - if (ShowReceivedTupleData(scanState, es)) + if (ShowReceivedTupleData(scanState, es) || job->jobExecuted) { Task *task = NULL; uint64 totalReceivedTupleDataForAllTasks = 0; diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c index 19d386343..5a81c9b08 100644 --- a/src/backend/distributed/planner/multi_router_planner.c +++ b/src/backend/distributed/planner/multi_router_planner.c @@ -1692,6 +1692,7 @@ CreateJob(Query *query) job->subqueryPushdown = false; job->requiresCoordinatorEvaluation = false; job->deferredPruning = false; + job->jobExecuted = false; return job; } @@ -1791,6 +1792,7 @@ CreateTask(TaskType taskType) task->modifyWithSubquery = false; task->partiallyLocalOrRemote = false; task->relationShardList = NIL; + task->taskCompleted = false; return task; } diff --git a/src/include/distributed/multi_physical_planner.h b/src/include/distributed/multi_physical_planner.h index 475a41b37..521aacc39 100644 --- a/src/include/distributed/multi_physical_planner.h +++ b/src/include/distributed/multi_physical_planner.h @@ -153,6 +153,7 @@ typedef struct Job */ bool parametersInJobQueryResolved; uint32 colocationId; /* common colocation group ID of the relations */ + bool jobExecuted; } Job; @@ -334,6 +335,7 @@ typedef struct Task Const *partitionKeyValue; int colocationId; + bool taskCompleted; } Task; @@ -471,6 +473,7 @@ typedef struct DistributedPlan * of source rows to be repartitioned for colocation with the target. */ int sourceResultRepartitionColumnIndex; + bool subPlansExecuted; } DistributedPlan;