From 6b9962c0c03da7b5ee24945c9c1ef24b01f52d7e Mon Sep 17 00:00:00 2001 From: ibrahim halatci Date: Tue, 29 Jul 2025 13:24:42 +0300 Subject: [PATCH 1/5] [doc] wrong code comments for function PopUnassignedPlacementExecution (#8079) Fixes #7621 DESCRIPTION: function comment correction --- src/backend/distributed/executor/adaptive_executor.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c index 895f01ae7..846ba6427 100644 --- a/src/backend/distributed/executor/adaptive_executor.c +++ b/src/backend/distributed/executor/adaptive_executor.c @@ -3804,7 +3804,7 @@ PopAssignedPlacementExecution(WorkerSession *session) /* - * PopAssignedPlacementExecution finds an executable task from the queue of assigned tasks. + * PopUnassignedPlacementExecution finds an executable task from the queue of unassigned tasks. */ static TaskPlacementExecution * PopUnassignedPlacementExecution(WorkerPool *workerPool)
From f31bcb42199116cfdbb8be86ed4de0c07c82a576 Mon Sep 17 00:00:00 2001 From: Mehmet YILMAZ Date: Tue, 29 Jul 2025 15:52:36 +0300 Subject: [PATCH 2/5] PG18 - Assert("HaveRegisteredOrActiveSnapshot()") fix for cluster creation (#8073) fixes #8072 fixes #8055 https://github.com/postgres/postgres/commit/706054b11b959c865c0c7935c34d92370d7168d4 Before the fix, trying to create a cluster with assertions enabled (`citus_dev make test1 --destroy`) crashed with: ``` TRAP: failed Assert("HaveRegisteredOrActiveSnapshot()"), File: "heapam.c", Line: 232, PID: 75572 postgres: citus citus [local] SELECT(ExceptionalCondition+0x6e)[0x5585e16123e6] postgres: citus citus [local] SELECT(heap_insert+0x220)[0x5585e10709af] postgres: citus citus [local] SELECT(simple_heap_insert+0x33)[0x5585e1071a20] postgres: citus citus [local] SELECT(CatalogTupleInsert+0x32)[0x5585e1135843] /home/citus/.pgenv/pgsql-18beta2/lib/citus.so(+0x11e0aa)[0x7fa26f1ca0aa] /home/citus/.pgenv/pgsql-18beta2/lib/citus.so(+0x11b607)[0x7fa26f1c7607] /home/citus/.pgenv/pgsql-18beta2/lib/citus.so(+0x11bf25)[0x7fa26f1c7f25] /home/citus/.pgenv/pgsql-18beta2/lib/citus.so(+0x11d4e2)[0x7fa26f1c94e2] postgres: citus citus [local] SELECT(+0x1c267d)[0x5585e10e967d] postgres: citus citus [local] SELECT(+0x1c6ba0)[0x5585e10edba0] postgres: citus citus [local] SELECT(+0x1c7b80)[0x5585e10eeb80] postgres: citus citus [local] SELECT(CommitTransactionCommand+0xd)[0x5585e10eef0a] postgres: citus citus [local] SELECT(+0x575b3d)[0x5585e149cb3d] postgres: citus citus [local] SELECT(+0x5788ce)[0x5585e149f8ce] postgres: citus citus [local] SELECT(PostgresMain+0xae7)[0x5585e14a2088] postgres: citus citus [local] SELECT(BackendMain+0x51)[0x5585e149ab36] postgres: citus citus [local] SELECT(postmaster_child_launch+0x101)[0x5585e13d6b32] postgres: citus citus [local] SELECT(+0x4b273f)[0x5585e13d973f] postgres: citus citus [local] SELECT(+0x4b49f3)[0x5585e13db9f3] postgres: citus citus [local] SELECT(PostmasterMain+0x1089)[0x5585e13dcee2] postgres: citus citus [local] SELECT(main+0x1d7)[0x5585e12e3428] /lib/x86_64-linux-gnu/libc.so.6(+0x29d90)[0x7fa271421d90] /lib/x86_64-linux-gnu/libc.so.6(__libc_start_main+0x80)[0x7fa271421e40] ```
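On PG 18 (the PostgreSQL commit linked above), `heap_insert()` asserts `HaveRegisteredOrActiveSnapshot()`, so catalog writes that used to run without an active snapshot now trip the assertion. The fix routes the affected `CatalogTupleInsert()` calls through a new `CATALOG_INSERT_WITH_SNAPSHOT(rel, tup)` compat macro; on PG 18 the macro expands to the following pattern, and on older versions it is a plain `CatalogTupleInsert()`:

```c
/* PG 18 asserts that an active or registered snapshot exists before a heap
 * insert, so push one for the duration of the catalog insert. */
Snapshot snapshot = GetTransactionSnapshot();
PushActiveSnapshot(snapshot);
CatalogTupleInsert(rel, tup);   /* rel and tup are the macro arguments */
PopActiveSnapshot();
```

--- .../distributed/metadata/node_metadata.c | 2 +- .../transaction/transaction_recovery.c | 2 +- src/include/pg_version_compat.h | 20 +++++++++++++++++++ 3 files changed, 22 insertions(+), 2 deletions(-)
diff --git a/src/backend/distributed/metadata/node_metadata.c b/src/backend/distributed/metadata/node_metadata.c index 6c7a98587..2412a88a2 100644 ---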
a/src/backend/distributed/metadata/node_metadata.c +++ b/src/backend/distributed/metadata/node_metadata.c @@ -2930,7 +2930,7 @@ InsertNodeRow(int nodeid, char *nodeName, int32 nodePort, NodeMetadata *nodeMeta TupleDesc tupleDescriptor = RelationGetDescr(pgDistNode); HeapTuple heapTuple = heap_form_tuple(tupleDescriptor, values, isNulls); - CatalogTupleInsert(pgDistNode, heapTuple); + CATALOG_INSERT_WITH_SNAPSHOT(pgDistNode, heapTuple); CitusInvalidateRelcacheByRelid(DistNodeRelationId()); diff --git a/src/backend/distributed/transaction/transaction_recovery.c b/src/backend/distributed/transaction/transaction_recovery.c index fb5509def..a4ad3e094 100644 --- a/src/backend/distributed/transaction/transaction_recovery.c +++ b/src/backend/distributed/transaction/transaction_recovery.c @@ -106,7 +106,7 @@ LogTransactionRecord(int32 groupId, char *transactionName, FullTransactionId out TupleDesc tupleDescriptor = RelationGetDescr(pgDistTransaction); HeapTuple heapTuple = heap_form_tuple(tupleDescriptor, values, isNulls); - CatalogTupleInsert(pgDistTransaction, heapTuple); + CATALOG_INSERT_WITH_SNAPSHOT(pgDistTransaction, heapTuple); CommandCounterIncrement(); diff --git a/src/include/pg_version_compat.h b/src/include/pg_version_compat.h index 385aecd38..997ad4b58 100644 --- a/src/include/pg_version_compat.h +++ b/src/include/pg_version_compat.h @@ -13,6 +13,10 @@ #include "pg_version_constants.h" +/* we need these for PG-18’s PushActiveSnapshot/PopActiveSnapshot APIs */ +#include "access/xact.h" +#include "utils/snapmgr.h" + #if PG_VERSION_NUM >= PG_VERSION_18 #define create_foreignscan_path_compat(a, b, c, d, e, f, g, h, i, j, k) \ create_foreignscan_path( \ @@ -36,6 +40,14 @@ /* PG-18 unified row-compare operator codes under COMPARE_* */ #define ROWCOMPARE_NE COMPARE_NE +#define CATALOG_INSERT_WITH_SNAPSHOT(rel, tup) \ + do { \ + Snapshot __snap = GetTransactionSnapshot(); \ + PushActiveSnapshot(__snap); \ + CatalogTupleInsert((rel), (tup)); \ + PopActiveSnapshot(); \ + } while (0) + #elif PG_VERSION_NUM >= PG_VERSION_17 #define create_foreignscan_path_compat(a, b, c, d, e, f, g, h, i, j, k) \ create_foreignscan_path( \ @@ -43,6 +55,10 @@ (e), (f), \ (g), (h), (i), (j), (k) \ ) + +/* no-op wrapper on older PGs */ +#define CATALOG_INSERT_WITH_SNAPSHOT(rel, tup) \ + CatalogTupleInsert((rel), (tup)) #endif #if PG_VERSION_NUM >= PG_VERSION_17 @@ -453,6 +469,10 @@ getStxstattarget_compat(HeapTuple tup) k) create_foreignscan_path(a, b, c, d, e, f, g, h, \ i, k) +/* no-op wrapper on older PGs */ +#define CATALOG_INSERT_WITH_SNAPSHOT(rel, tup) \ + CatalogTupleInsert((rel), (tup)) + #define getProcNo_compat(a) (a->pgprocno) #define getLxid_compat(a) (a->lxid) From 889aa92ac019c539d404a0958098904e99f3ee62 Mon Sep 17 00:00:00 2001 From: Teja Mupparti <44680808+tejeswarm@users.noreply.github.com> Date: Wed, 30 Jul 2025 11:29:50 -0700 Subject: [PATCH 3/5] EXPLAIN ANALYZE - Prevent execution of the plan during the plan-print (#8017) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit DESCRIPTION: Fixed a bug in EXPLAIN ANALYZE to prevent unintended (duplicate) execution of the (sub)plans during the explain phase. Fixes #4212 ### 🐞 Bug #4212 : Redundant (Subplan) Execution in `EXPLAIN ANALYZE` codepath #### 🔍 Background In the standard PostgreSQL execution path, `ExplainOnePlan()` is responsible for two distinct operations depending on whether `EXPLAIN ANALYZE` is requested: 1. **Execute the plan** ```c if (es->analyze) ExecutorRun(queryDesc, direction, 0L, true); ``` 2. 
**Print the plan tree** ```c ExplainPrintPlan(es, queryDesc); ``` When printing the plan, the executor should **not run the plan again**. Execution is only expected to happen once—at the top level when `es->analyze = true`. --- #### ⚠️ Issue in Citus In Citus, `CustomScanMethods.ExplainCustomScan = CitusExplainScan`, the custom scan explain callback used to print the explain output of a Citus plan, incorrectly performs **redundant execution** inside the explain path of `ExplainPrintPlan()`: ```c ExplainOnePlan() ExplainPrintPlan() ExplainNode() CitusExplainScan() if (distributedPlan->subPlanList != NIL) { ExplainSubPlans(distributedPlan, es); { PlannedStmt *plan = subPlan->plan; ExplainOnePlan(plan, ...); // ⚠️ May re-execute subplan if es->analyze is true } } ``` This causes the subplans to be **executed again**, even though they have already been executed during the top-level plan execution. This behavior violates the expectation in PostgreSQL that `EXPLAIN ANALYZE` should **execute each node exactly once** for analysis. --- #### ✅ Fix (proposed) Save the output of the subplans during `ExecuteSubPlans()`, and later use it in `ExplainSubPlans()`.
--- .../distributed/executor/adaptive_executor.c | 2 +- .../executor/insert_select_executor.c | 3 +- .../distributed/executor/merge_executor.c | 3 +- .../distributed/executor/multi_executor.c | 10 +- .../distributed/executor/subplan_execution.c | 56 +++- .../distributed/planner/multi_explain.c | 295 +++++++++++++++--- .../distributed/sql/citus--13.1-1--13.2-1.sql | 1 + .../sql/downgrades/citus--13.2-1--13.1-1.sql | 1 + .../13.2-1.sql | 10 + .../9.4-1.sql | 2 + .../latest.sql | 5 +- .../distributed/utils/citus_copyfuncs.c | 25 ++ .../distributed/utils/citus_outfuncs.c | 42 +++ src/include/distributed/multi_executor.h | 4 +- .../distributed/multi_physical_planner.h | 21 ++ src/include/distributed/subplan_execution.h | 2 +- src/test/regress/expected/multi_explain.out | 163 +++++++++- src/test/regress/expected/multi_explain_0.out | 163 +++++++++- src/test/regress/expected/multi_extension.out | 4 +- src/test/regress/expected/stat_counters.out | 9 +- src/test/regress/sql/multi_explain.sql | 26 ++ src/test/regress/sql/stat_counters.sql | 9 +- 22 files changed, 775 insertions(+), 81 deletions(-) create mode 100644 src/backend/distributed/sql/udfs/worker_last_saved_explain_analyze/13.2-1.sql
diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c index 846ba6427..677535591 100644 --- a/src/backend/distributed/executor/adaptive_executor.c +++ b/src/backend/distributed/executor/adaptive_executor.c @@ -760,7 +760,7 @@ AdaptiveExecutorPreExecutorRun(CitusScanState *scanState) */ LockPartitionsForDistributedPlan(distributedPlan); - ExecuteSubPlans(distributedPlan); + ExecuteSubPlans(distributedPlan, RequestedForExplainAnalyze(scanState)); scanState->finishedPreScan = true; }
diff --git a/src/backend/distributed/executor/insert_select_executor.c b/src/backend/distributed/executor/insert_select_executor.c index 9ed1962fa..58c172c66 100644 --- a/src/backend/distributed/executor/insert_select_executor.c +++ b/src/backend/distributed/executor/insert_select_executor.c @@ -42,6 +42,7 @@ #include "distributed/merge_planner.h" #include "distributed/metadata_cache.h" #include "distributed/multi_executor.h" +#include "distributed/multi_explain.h" #include "distributed/multi_partitioning_utils.h" #include "distributed/multi_physical_planner.h" #include
"distributed/multi_router_planner.h" @@ -121,7 +122,7 @@ NonPushableInsertSelectExecScan(CustomScanState *node) bool binaryFormat = CanUseBinaryCopyFormatForTargetList(selectQuery->targetList); - ExecuteSubPlans(distSelectPlan); + ExecuteSubPlans(distSelectPlan, RequestedForExplainAnalyze(scanState)); /* * We have a separate directory for each transaction, so choosing diff --git a/src/backend/distributed/executor/merge_executor.c b/src/backend/distributed/executor/merge_executor.c index d0f01dcf2..56bde62bc 100644 --- a/src/backend/distributed/executor/merge_executor.c +++ b/src/backend/distributed/executor/merge_executor.c @@ -23,6 +23,7 @@ #include "distributed/merge_executor.h" #include "distributed/merge_planner.h" #include "distributed/multi_executor.h" +#include "distributed/multi_explain.h" #include "distributed/multi_partitioning_utils.h" #include "distributed/multi_router_planner.h" #include "distributed/repartition_executor.h" @@ -132,7 +133,7 @@ ExecuteSourceAtWorkerAndRepartition(CitusScanState *scanState) ereport(DEBUG1, (errmsg("Executing subplans of the source query and " "storing the results at the respective node(s)"))); - ExecuteSubPlans(distSourcePlan); + ExecuteSubPlans(distSourcePlan, RequestedForExplainAnalyze(scanState)); /* * We have a separate directory for each transaction, so choosing diff --git a/src/backend/distributed/executor/multi_executor.c b/src/backend/distributed/executor/multi_executor.c index dba302e7c..eb6bdf111 100644 --- a/src/backend/distributed/executor/multi_executor.c +++ b/src/backend/distributed/executor/multi_executor.c @@ -688,7 +688,7 @@ ExecuteQueryIntoDestReceiver(Query *query, ParamListInfo params, DestReceiver *d * ExecutePlanIntoDestReceiver executes a query plan and sends results to the given * DestReceiver. */ -void +uint64 ExecutePlanIntoDestReceiver(PlannedStmt *queryPlan, ParamListInfo params, DestReceiver *dest) { @@ -713,6 +713,8 @@ ExecutePlanIntoDestReceiver(PlannedStmt *queryPlan, ParamListInfo params, PortalStart(portal, params, eflags, GetActiveSnapshot()); + QueryCompletion qc = { 0 }; + #if PG_VERSION_NUM >= PG_VERSION_18 /* PG 18+: six-arg signature (drop the run_once bool) */ @@ -721,7 +723,7 @@ ExecutePlanIntoDestReceiver(PlannedStmt *queryPlan, ParamListInfo params, false, /* isTopLevel */ dest, /* DestReceiver *dest */ dest, /* DestReceiver *altdest */ - NULL); /* QueryCompletion *qc */ + &qc); /* QueryCompletion *qc */ #else /* PG 17-: original seven-arg signature */ @@ -731,10 +733,12 @@ ExecutePlanIntoDestReceiver(PlannedStmt *queryPlan, ParamListInfo params, true, /* run_once */ dest, /* DestReceiver *dest */ dest, /* DestReceiver *altdest */ - NULL); /* QueryCompletion *qc */ + &qc); /* QueryCompletion *qc */ #endif PortalDrop(portal, false); + + return qc.nprocessed; } diff --git a/src/backend/distributed/executor/subplan_execution.c b/src/backend/distributed/executor/subplan_execution.c index ef2838343..108d130ec 100644 --- a/src/backend/distributed/executor/subplan_execution.c +++ b/src/backend/distributed/executor/subplan_execution.c @@ -30,13 +30,22 @@ int MaxIntermediateResult = 1048576; /* maximum size in KB the intermediate resu /* when this is true, we enforce intermediate result size limit in all executors */ int SubPlanLevel = 0; +/* + * SubPlanExplainAnalyzeContext is both a memory context for storing + * subplans’ EXPLAIN ANALYZE output and a flag indicating that execution + * is running under EXPLAIN ANALYZE for subplans. 
+ */ +MemoryContext SubPlanExplainAnalyzeContext = NULL; +SubPlanExplainOutputData *SubPlanExplainOutput; +extern uint8 TotalExplainOutputCapacity; +extern uint8 NumTasksOutput; /* * ExecuteSubPlans executes a list of subplans from a distributed plan * by sequentially executing each plan from the top. */ void -ExecuteSubPlans(DistributedPlan *distributedPlan) +ExecuteSubPlans(DistributedPlan *distributedPlan, bool explainAnalyzeEnabled) { uint64 planId = distributedPlan->planId; List *subPlanList = distributedPlan->subPlanList; @@ -47,6 +56,19 @@ ExecuteSubPlans(DistributedPlan *distributedPlan) return; } + /* + * If the root DistributedPlan has EXPLAIN ANALYZE enabled, + * its subplans should also have EXPLAIN ANALYZE enabled. + */ + if (explainAnalyzeEnabled) + { + SubPlanExplainAnalyzeContext = GetMemoryChunkContext(distributedPlan); + } + else + { + SubPlanExplainAnalyzeContext = NULL; + } + HTAB *intermediateResultsHash = MakeIntermediateResultHTAB(); RecordSubplanExecutionsOnNodes(intermediateResultsHash, distributedPlan); @@ -79,7 +101,23 @@ ExecuteSubPlans(DistributedPlan *distributedPlan) TimestampTz startTimestamp = GetCurrentTimestamp(); - ExecutePlanIntoDestReceiver(plannedStmt, params, copyDest); + uint64 nprocessed; + + PG_TRY(); + { + nprocessed = + ExecutePlanIntoDestReceiver(plannedStmt, params, copyDest); + } + PG_CATCH(); + { + SubPlanExplainAnalyzeContext = NULL; + SubPlanExplainOutput = NULL; + TotalExplainOutputCapacity = 0; + NumTasksOutput = 0; + PG_RE_THROW(); + } + PG_END_TRY(); + /* * EXPLAIN ANALYZE instrumentations. Calculating these are very light-weight, @@ -94,10 +132,24 @@ ExecuteSubPlans(DistributedPlan *distributedPlan) subPlan->durationMillisecs += durationMicrosecs * MICRO_TO_MILLI_SECOND; subPlan->bytesSentPerWorker = RemoteFileDestReceiverBytesSent(copyDest); + subPlan->ntuples = nprocessed; subPlan->remoteWorkerCount = list_length(remoteWorkerNodeList); subPlan->writeLocalFile = entry->writeLocalFile; SubPlanLevel--; + + /* + * Save the EXPLAIN ANALYZE output(s) for later extraction in ExplainSubPlans(). + * Because the SubPlan context isn’t available during distributed execution, + * pass the pointer as a global variable in SubPlanExplainOutput. 
+ */ + subPlan->totalExplainOutput = SubPlanExplainOutput; + subPlan->numTasksOutput = NumTasksOutput; + SubPlanExplainOutput = NULL; + TotalExplainOutputCapacity = 0; + NumTasksOutput = 0; FreeExecutorState(estate); } + + SubPlanExplainAnalyzeContext = NULL; } diff --git a/src/backend/distributed/planner/multi_explain.c b/src/backend/distributed/planner/multi_explain.c index f357663a6..4d27939f7 100644 --- a/src/backend/distributed/planner/multi_explain.c +++ b/src/backend/distributed/planner/multi_explain.c @@ -26,6 +26,7 @@ #include "commands/tablecmds.h" #include "executor/tstoreReceiver.h" #include "lib/stringinfo.h" +#include "nodes/nodeFuncs.h" #include "nodes/plannodes.h" #include "nodes/primnodes.h" #include "nodes/print.h" @@ -73,6 +74,7 @@ #include "distributed/placement_connection.h" #include "distributed/recursive_planning.h" #include "distributed/remote_commands.h" +#include "distributed/subplan_execution.h" #include "distributed/tuple_destination.h" #include "distributed/tuplestore.h" #include "distributed/version_compat.h" @@ -83,6 +85,7 @@ bool ExplainDistributedQueries = true; bool ExplainAllTasks = false; int ExplainAnalyzeSortMethod = EXPLAIN_ANALYZE_SORT_BY_TIME; +extern MemoryContext SubPlanExplainAnalyzeContext; /* * If enabled, EXPLAIN ANALYZE output & other statistics of last worker task @@ -90,6 +93,11 @@ int ExplainAnalyzeSortMethod = EXPLAIN_ANALYZE_SORT_BY_TIME; */ static char *SavedExplainPlan = NULL; static double SavedExecutionDurationMillisec = 0.0; +static double SavedExplainPlanNtuples = 0; +static double SavedExplainPlanNloops = 0; +extern SubPlanExplainOutputData *SubPlanExplainOutput; +uint8 TotalExplainOutputCapacity = 0; +uint8 NumTasksOutput = 0; /* struct to save explain flags */ typedef struct @@ -215,7 +223,8 @@ static const char * ExplainFormatStr(ExplainFormat format); #if PG_VERSION_NUM >= PG_VERSION_17 static const char * ExplainSerializeStr(ExplainSerializeOption serializeOption); #endif -static void ExplainWorkerPlan(PlannedStmt *plannedStmt, DestReceiver *dest, +static void ExplainWorkerPlan(PlannedStmt *plannedStmt, DistributedSubPlan *subPlan, + DestReceiver *dest, ExplainState *es, const char *queryString, ParamListInfo params, QueryEnvironment *queryEnv, @@ -224,7 +233,9 @@ static void ExplainWorkerPlan(PlannedStmt *plannedStmt, DestReceiver *dest, const BufferUsage *bufusage, const MemoryContextCounters *mem_counters, #endif - double *executionDurationMillisec); + double *executionDurationMillisec, + double *executionTuples, + double *executionLoops); static ExplainFormat ExtractFieldExplainFormat(Datum jsonbDoc, const char *fieldName, ExplainFormat defaultValue); #if PG_VERSION_NUM >= PG_VERSION_17 @@ -256,7 +267,8 @@ static double elapsed_time(instr_time *starttime); static void ExplainPropertyBytes(const char *qlabel, int64 bytes, ExplainState *es); static uint64 TaskReceivedTupleData(Task *task); static bool ShowReceivedTupleData(CitusScanState *scanState, ExplainState *es); - +static bool PlanStateAnalyzeWalker(PlanState *planState, void *ctx); +static void ExtractAnalyzeStats(DistributedSubPlan *subPlan, PlanState *planState); /* exports for SQL callable functions */ PG_FUNCTION_INFO_V1(worker_last_saved_explain_analyze); @@ -432,6 +444,84 @@ NonPushableMergeCommandExplainScan(CustomScanState *node, List *ancestors, } +/* + * ExtractAnalyzeStats parses the EXPLAIN ANALYZE output of the pre-executed + * subplans and injects the parsed statistics into queryDesc->planstate->instrument. 
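+ * It runs once for the root PlanState and, via PlanStateAnalyzeWalker(), for every child node of the plan tree.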
+ */ +static void +ExtractAnalyzeStats(DistributedSubPlan *subPlan, PlanState *planState) +{ + if (!planState) + { + return; + } + + Instrumentation *instr = planState->instrument; + if (!IsA(planState, CustomScanState)) + { + instr->ntuples = subPlan->ntuples; + instr->nloops = 1; /* subplan nodes are executed only once */ + return; + } + + Assert(IsA(planState, CustomScanState)); + + if (subPlan->numTasksOutput <= 0) + { + return; + } + + ListCell *lc; + int tasksOutput = 0; + double tasksNtuples = 0; + double tasksNloops = 0; + memset(instr, 0, sizeof(Instrumentation)); + DistributedPlan *newdistributedPlan = + ((CitusScanState *) planState)->distributedPlan; + + /* + * Inject the earlier executed results—extracted from the workers' EXPLAIN output— + * into the newly created tasks. + */ + foreach(lc, newdistributedPlan->workerJob->taskList) + { + Task *task = (Task *) lfirst(lc); + uint32 taskId = task->taskId; + + if (tasksOutput > subPlan->numTasksOutput) + { + break; + } + + if (!subPlan->totalExplainOutput[taskId].explainOutput) + { + continue; + } + + /* + * Now feed the earlier saved output, which will be used + * by RemoteExplain() when printing tasks + */ + MemoryContext taskContext = GetMemoryChunkContext(task); + task->totalReceivedTupleData = + subPlan->totalExplainOutput[taskId].totalReceivedTupleData; + task->fetchedExplainAnalyzeExecutionDuration = + subPlan->totalExplainOutput[taskId].executionDuration; + task->fetchedExplainAnalyzePlan = + MemoryContextStrdup(taskContext, + subPlan->totalExplainOutput[taskId].explainOutput); + tasksNtuples += subPlan->totalExplainOutput[taskId].executionNtuples; + tasksNloops = subPlan->totalExplainOutput[taskId].executionNloops; + + subPlan->totalExplainOutput[taskId].explainOutput = NULL; + tasksOutput++; + } + + instr->ntuples = tasksNtuples; + instr->nloops = tasksNloops; +} + + /* * ExplainSubPlans generates EXPLAIN output for subplans for CTEs * and complex subqueries. Because the planning for these queries @@ -450,7 +540,6 @@ ExplainSubPlans(DistributedPlan *distributedPlan, ExplainState *es) { DistributedSubPlan *subPlan = (DistributedSubPlan *) lfirst(subPlanCell); PlannedStmt *plan = subPlan->plan; - IntoClause *into = NULL; ParamListInfo params = NULL; /* @@ -534,6 +623,11 @@ ExplainSubPlans(DistributedPlan *distributedPlan, ExplainState *es) ExplainOpenGroup("PlannedStmt", "PlannedStmt", false, es); + DestReceiver *dest = None_Receiver; /* No query execution */ + double executionDurationMillisec = 0.0; + double executionTuples = 0; + double executionLoops = 0; + /* Capture memory stats on PG17+ */ #if PG_VERSION_NUM >= PG_VERSION_17 if (es->memory) @@ -541,31 +635,21 @@ ExplainSubPlans(DistributedPlan *distributedPlan, ExplainState *es) MemoryContextSwitchTo(saved_ctx); MemoryContextMemConsumed(planner_ctx, &mem_counters); } -#endif -#if PG_VERSION_NUM >= PG_VERSION_17 - ExplainOnePlan( - plan, - into, - es, - queryString, - params, - NULL, /* QueryEnvironment *queryEnv */ - &planduration, - (es->buffers ? &bufusage : NULL), - (es->memory ? &mem_counters : NULL) - ); + /* Execute EXPLAIN without ANALYZE */ + ExplainWorkerPlan(plan, subPlan, dest, es, queryString, params, NULL, + &planduration, + (es->buffers ? &bufusage : NULL), + (es->memory ? &mem_counters : NULL), + &executionDurationMillisec, + &executionTuples, + &executionLoops); #else - ExplainOnePlan( - plan, - into, - es, - queryString, - params, - NULL, /* QueryEnvironment *queryEnv */ - &planduration, - (es->buffers ? 
&bufusage : NULL) - ); + + /* Execute EXPLAIN without ANALYZE */ + ExplainWorkerPlan(plan, subPlan, dest, es, queryString, params, NULL, + &planduration, &executionDurationMillisec, + &executionTuples, &executionLoops); #endif ExplainCloseGroup("PlannedStmt", "PlannedStmt", false, es); @@ -1236,17 +1320,19 @@ worker_last_saved_explain_analyze(PG_FUNCTION_ARGS) if (SavedExplainPlan != NULL) { int columnCount = tupleDescriptor->natts; - if (columnCount != 2) + if (columnCount != 4) { - ereport(ERROR, (errmsg("expected 3 output columns in definition of " + ereport(ERROR, (errmsg("expected 4 output columns in definition of " "worker_last_saved_explain_analyze, but got %d", columnCount))); } - bool columnNulls[2] = { false }; - Datum columnValues[2] = { + bool columnNulls[4] = { false }; + Datum columnValues[4] = { CStringGetTextDatum(SavedExplainPlan), - Float8GetDatum(SavedExecutionDurationMillisec) + Float8GetDatum(SavedExecutionDurationMillisec), + Float8GetDatum(SavedExplainPlanNtuples), + Float8GetDatum(SavedExplainPlanNloops) }; tuplestore_putvalues(tupleStore, tupleDescriptor, columnValues, columnNulls); @@ -1267,6 +1353,8 @@ worker_save_query_explain_analyze(PG_FUNCTION_ARGS) text *queryText = PG_GETARG_TEXT_P(0); char *queryString = text_to_cstring(queryText); double executionDurationMillisec = 0.0; + double executionTuples = 0; + double executionLoops = 0; Datum explainOptions = PG_GETARG_DATUM(1); ExplainState *es = NewExplainState(); @@ -1383,16 +1471,19 @@ worker_save_query_explain_analyze(PG_FUNCTION_ARGS) } /* do the actual EXPLAIN ANALYZE */ - ExplainWorkerPlan(plan, tupleStoreDest, es, queryString, boundParams, NULL, + ExplainWorkerPlan(plan, NULL, tupleStoreDest, es, queryString, boundParams, NULL, &planDuration, (es->buffers ? &bufusage : NULL), (es->memory ? &mem_counters : NULL), - &executionDurationMillisec); + &executionDurationMillisec, + &executionTuples, + &executionLoops); #else /* do the actual EXPLAIN ANALYZE */ - ExplainWorkerPlan(plan, tupleStoreDest, es, queryString, boundParams, NULL, - &planDuration, &executionDurationMillisec); + ExplainWorkerPlan(plan, NULL, tupleStoreDest, es, queryString, boundParams, NULL, + &planDuration, &executionDurationMillisec, + &executionTuples, &executionLoops); #endif ExplainEndOutput(es); @@ -1403,6 +1494,8 @@ worker_save_query_explain_analyze(PG_FUNCTION_ARGS) SavedExplainPlan = pstrdup(es->str->data); SavedExecutionDurationMillisec = executionDurationMillisec; + SavedExplainPlanNtuples = executionTuples; + SavedExplainPlanNloops = executionLoops; MemoryContextSwitchTo(oldContext); @@ -1632,11 +1725,13 @@ CreateExplainAnlyzeDestination(Task *task, TupleDestination *taskDest) tupleDestination->originalTask = task; tupleDestination->originalTaskDestination = taskDest; - TupleDesc lastSavedExplainAnalyzeTupDesc = CreateTemplateTupleDesc(2); + TupleDesc lastSavedExplainAnalyzeTupDesc = CreateTemplateTupleDesc(4); TupleDescInitEntry(lastSavedExplainAnalyzeTupDesc, 1, "explain analyze", TEXTOID, 0, 0); TupleDescInitEntry(lastSavedExplainAnalyzeTupDesc, 2, "duration", FLOAT8OID, 0, 0); + TupleDescInitEntry(lastSavedExplainAnalyzeTupDesc, 3, "ntuples", FLOAT8OID, 0, 0); + TupleDescInitEntry(lastSavedExplainAnalyzeTupDesc, 4, "nloops", FLOAT8OID, 0, 0); tupleDestination->lastSavedExplainAnalyzeTupDesc = lastSavedExplainAnalyzeTupDesc; @@ -1647,6 +1742,51 @@ CreateExplainAnlyzeDestination(Task *task, TupleDestination *taskDest) } +/* + * EnsureExplainOutputCapacity is to ensure capacity for new entries. 
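The backing array starts at 32 slots and doubles until the required size fits, so repeated calls stay cheap.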
Input + * parameter requiredSize is minimum number of elements needed. + */ +static void +EnsureExplainOutputCapacity(int requiredSize) +{ + if (requiredSize < TotalExplainOutputCapacity) + { + return; + } + + int newCapacity = + (TotalExplainOutputCapacity == 0) ? 32 : TotalExplainOutputCapacity * 2; + + while (newCapacity <= requiredSize) + { + newCapacity *= 2; + } + + if (SubPlanExplainOutput == NULL) + { + SubPlanExplainOutput = + (SubPlanExplainOutputData *) MemoryContextAllocZero( + SubPlanExplainAnalyzeContext, + newCapacity * + sizeof(SubPlanExplainOutputData)); + } + else + { + /* Use repalloc and manually zero the new memory */ + int oldSize = TotalExplainOutputCapacity * sizeof(SubPlanExplainOutputData); + int newSize = newCapacity * sizeof(SubPlanExplainOutputData); + + SubPlanExplainOutput = + (SubPlanExplainOutputData *) repalloc(SubPlanExplainOutput, newSize); + + /* Zero out the newly allocated memory */ + MemSet((char *) SubPlanExplainOutput + oldSize, 0, newSize - oldSize); + } + + TotalExplainOutputCapacity = newCapacity; +} + + /* * ExplainAnalyzeDestPutTuple implements TupleDestination->putTuple * for ExplainAnalyzeDestination. @@ -1656,6 +1796,8 @@ ExplainAnalyzeDestPutTuple(TupleDestination *self, Task *task, int placementIndex, int queryNumber, HeapTuple heapTuple, uint64 tupleLibpqSize) { + uint32 taskId = task->taskId; + ExplainAnalyzeDestination *tupleDestination = (ExplainAnalyzeDestination *) self; if (queryNumber == 0) { @@ -1663,6 +1805,13 @@ ExplainAnalyzeDestPutTuple(TupleDestination *self, Task *task, originalTupDest->putTuple(originalTupDest, task, placementIndex, 0, heapTuple, tupleLibpqSize); tupleDestination->originalTask->totalReceivedTupleData += tupleLibpqSize; + + if (SubPlanExplainAnalyzeContext) + { + EnsureExplainOutputCapacity(taskId + 1); + SubPlanExplainOutput[taskId].totalReceivedTupleData = + tupleDestination->originalTask->totalReceivedTupleData; + } } else if (queryNumber == 1) { @@ -1678,6 +1827,8 @@ ExplainAnalyzeDestPutTuple(TupleDestination *self, Task *task, } Datum executionDuration = heap_getattr(heapTuple, 2, tupDesc, &isNull); + Datum executionTuples = heap_getattr(heapTuple, 3, tupDesc, &isNull); + Datum executionLoops = heap_getattr(heapTuple, 4, tupDesc, &isNull); if (isNull) { @@ -1687,6 +1838,8 @@ ExplainAnalyzeDestPutTuple(TupleDestination *self, Task *task, char *fetchedExplainAnalyzePlan = TextDatumGetCString(explainAnalyze); double fetchedExplainAnalyzeExecutionDuration = DatumGetFloat8(executionDuration); + double fetchedExplainAnalyzeTuples = DatumGetFloat8(executionTuples); + double fetchedExplainAnalyzeLoops = DatumGetFloat8(executionLoops); /* * Allocate fetchedExplainAnalyzePlan in the same context as the Task, since we are @@ -1712,6 +1865,20 @@ ExplainAnalyzeDestPutTuple(TupleDestination *self, Task *task, placementIndex; tupleDestination->originalTask->fetchedExplainAnalyzeExecutionDuration = fetchedExplainAnalyzeExecutionDuration; + + /* We should build tupleDestination in subPlan similar to the above */ + if (SubPlanExplainAnalyzeContext) + { + EnsureExplainOutputCapacity(taskId + 1); + SubPlanExplainOutput[taskId].explainOutput = + MemoryContextStrdup(SubPlanExplainAnalyzeContext, + fetchedExplainAnalyzePlan); + SubPlanExplainOutput[taskId].executionDuration = + fetchedExplainAnalyzeExecutionDuration; + SubPlanExplainOutput[taskId].executionNtuples = fetchedExplainAnalyzeTuples; + SubPlanExplainOutput[taskId].executionNloops = fetchedExplainAnalyzeLoops; + NumTasksOutput++; + } } else { @@ -1774,7 +1941,14 
@@ ExplainAnalyzeDestTupleDescForQuery(TupleDestination *self, int queryNumber) bool RequestedForExplainAnalyze(CitusScanState *node) { - return (node->customScanState.ss.ps.state->es_instrument != 0); + /* + * When running a distributed plan—either the root plan or a subplan’s + * distributed fragment—we need to know if we’re under EXPLAIN ANALYZE. + * Subplans can’t receive the EXPLAIN ANALYZE flag directly, so we use + * SubPlanExplainAnalyzeContext as a flag to indicate that context. + */ + return (node->customScanState.ss.ps.state->es_instrument != 0) || + (SubPlanLevel > 0 && SubPlanExplainAnalyzeContext); } @@ -1933,7 +2107,8 @@ FetchPlanQueryForExplainAnalyze(const char *queryString, ParamListInfo params) } appendStringInfoString(fetchQuery, - "SELECT explain_analyze_output, execution_duration " + "SELECT explain_analyze_output, execution_duration, " + "execution_ntuples, execution_nloops " "FROM worker_last_saved_explain_analyze()"); return fetchQuery->data; @@ -2105,6 +2280,20 @@ ExplainOneQuery(Query *query, int cursorOptions, } +/* + * PlanStateAnalyzeWalker Tree walker callback that visits each PlanState node in the + * plan tree and extracts analyze statistics from CustomScanState tasks using + * ExtractAnalyzeStats. Always returns false to recurse into all children. + */ +static bool +PlanStateAnalyzeWalker(PlanState *planState, void *ctx) +{ + DistributedSubPlan *subplan = (DistributedSubPlan *) ctx; + ExtractAnalyzeStats(subplan, planState); + return false; +} + + /* * ExplainWorkerPlan produces explain output into es. If es->analyze, it also executes * the given plannedStmt and sends the results to dest. It puts total time to execute in @@ -2119,20 +2308,25 @@ ExplainOneQuery(Query *query, int cursorOptions, * destination. */ static void -ExplainWorkerPlan(PlannedStmt *plannedstmt, DestReceiver *dest, ExplainState *es, +ExplainWorkerPlan(PlannedStmt *plannedstmt, DistributedSubPlan *subPlan, DestReceiver *dest, ExplainState *es, const char *queryString, ParamListInfo params, QueryEnvironment *queryEnv, const instr_time *planduration, #if PG_VERSION_NUM >= PG_VERSION_17 const BufferUsage *bufusage, const MemoryContextCounters *mem_counters, #endif - double *executionDurationMillisec) + double *executionDurationMillisec, + double *executionTuples, + double *executionLoops) { QueryDesc *queryDesc; instr_time starttime; double totaltime = 0; int eflags; int instrument_option = 0; + /* Sub-plan already executed; skipping execution */ + bool executeQuery = (es->analyze && !subPlan); + bool executeSubplan = (es->analyze && subPlan); Assert(plannedstmt->commandType != CMD_UTILITY); @@ -2174,7 +2368,7 @@ ExplainWorkerPlan(PlannedStmt *plannedstmt, DestReceiver *dest, ExplainState *es ); /* Select execution options */ - if (es->analyze) + if (executeQuery) eflags = 0; /* default run-to-completion flags */ else eflags = EXEC_FLAG_EXPLAIN_ONLY; @@ -2183,7 +2377,7 @@ ExplainWorkerPlan(PlannedStmt *plannedstmt, DestReceiver *dest, ExplainState *es ExecutorStart(queryDesc, eflags); /* Execute the plan for statistics if asked for */ - if (es->analyze) + if (executeQuery) { ScanDirection dir = ForwardScanDirection; @@ -2206,6 +2400,12 @@ ExplainWorkerPlan(PlannedStmt *plannedstmt, DestReceiver *dest, ExplainState *es ExplainOpenGroup("Query", NULL, true, es); + if (executeSubplan) + { + ExtractAnalyzeStats(subPlan, queryDesc->planstate); + planstate_tree_walker(queryDesc->planstate, PlanStateAnalyzeWalker, (void *) subPlan); + } + /* Create textual dump of plan tree */ 
ExplainPrintPlan(es, queryDesc); @@ -2278,6 +2478,13 @@ ExplainWorkerPlan(PlannedStmt *plannedstmt, DestReceiver *dest, ExplainState *es */ INSTR_TIME_SET_CURRENT(starttime); + if (executeQuery) + { + Instrumentation *instr = queryDesc->planstate->instrument; + *executionTuples = instr->ntuples; + *executionLoops = instr->nloops; + } + ExecutorEnd(queryDesc); FreeQueryDesc(queryDesc); @@ -2285,7 +2492,7 @@ ExplainWorkerPlan(PlannedStmt *plannedstmt, DestReceiver *dest, ExplainState *es PopActiveSnapshot(); /* We need a CCI just in case query expanded to multiple plans */ - if (es->analyze) + if (executeQuery) CommandCounterIncrement(); totaltime += elapsed_time(&starttime); diff --git a/src/backend/distributed/sql/citus--13.1-1--13.2-1.sql b/src/backend/distributed/sql/citus--13.1-1--13.2-1.sql index 0373d3c40..2f507eb24 100644 --- a/src/backend/distributed/sql/citus--13.1-1--13.2-1.sql +++ b/src/backend/distributed/sql/citus--13.1-1--13.2-1.sql @@ -1,2 +1,3 @@ -- citus--13.1-1--13.2-1 -- bump version to 13.2-1 +#include "udfs/worker_last_saved_explain_analyze/13.2-1.sql" diff --git a/src/backend/distributed/sql/downgrades/citus--13.2-1--13.1-1.sql b/src/backend/distributed/sql/downgrades/citus--13.2-1--13.1-1.sql index 6f4ecd1ef..2212600f4 100644 --- a/src/backend/distributed/sql/downgrades/citus--13.2-1--13.1-1.sql +++ b/src/backend/distributed/sql/downgrades/citus--13.2-1--13.1-1.sql @@ -1,2 +1,3 @@ -- citus--13.2-1--13.1-1 -- downgrade version to 13.1-1 +#include "../udfs/worker_last_saved_explain_analyze/9.4-1.sql" diff --git a/src/backend/distributed/sql/udfs/worker_last_saved_explain_analyze/13.2-1.sql b/src/backend/distributed/sql/udfs/worker_last_saved_explain_analyze/13.2-1.sql new file mode 100644 index 000000000..805dc83cc --- /dev/null +++ b/src/backend/distributed/sql/udfs/worker_last_saved_explain_analyze/13.2-1.sql @@ -0,0 +1,10 @@ + +DROP FUNCTION pg_catalog.worker_last_saved_explain_analyze(); + +CREATE OR REPLACE FUNCTION pg_catalog.worker_last_saved_explain_analyze() + RETURNS TABLE(explain_analyze_output TEXT, execution_duration DOUBLE PRECISION, + execution_ntuples DOUBLE PRECISION, execution_nloops DOUBLE PRECISION) + LANGUAGE C STRICT + AS 'citus'; +COMMENT ON FUNCTION pg_catalog.worker_last_saved_explain_analyze() IS + 'Returns the saved explain analyze output for the last run query'; diff --git a/src/backend/distributed/sql/udfs/worker_last_saved_explain_analyze/9.4-1.sql b/src/backend/distributed/sql/udfs/worker_last_saved_explain_analyze/9.4-1.sql index 17a5a15c5..037a17b92 100644 --- a/src/backend/distributed/sql/udfs/worker_last_saved_explain_analyze/9.4-1.sql +++ b/src/backend/distributed/sql/udfs/worker_last_saved_explain_analyze/9.4-1.sql @@ -1,4 +1,6 @@ +DROP FUNCTION IF EXISTS pg_catalog.worker_last_saved_explain_analyze(); + CREATE OR REPLACE FUNCTION pg_catalog.worker_last_saved_explain_analyze() RETURNS TABLE(explain_analyze_output TEXT, execution_duration DOUBLE PRECISION) LANGUAGE C STRICT diff --git a/src/backend/distributed/sql/udfs/worker_last_saved_explain_analyze/latest.sql b/src/backend/distributed/sql/udfs/worker_last_saved_explain_analyze/latest.sql index 17a5a15c5..805dc83cc 100644 --- a/src/backend/distributed/sql/udfs/worker_last_saved_explain_analyze/latest.sql +++ b/src/backend/distributed/sql/udfs/worker_last_saved_explain_analyze/latest.sql @@ -1,6 +1,9 @@ +DROP FUNCTION pg_catalog.worker_last_saved_explain_analyze(); + CREATE OR REPLACE FUNCTION pg_catalog.worker_last_saved_explain_analyze() - RETURNS TABLE(explain_analyze_output 
TEXT, execution_duration DOUBLE PRECISION) + RETURNS TABLE(explain_analyze_output TEXT, execution_duration DOUBLE PRECISION, + execution_ntuples DOUBLE PRECISION, execution_nloops DOUBLE PRECISION) LANGUAGE C STRICT AS 'citus'; COMMENT ON FUNCTION pg_catalog.worker_last_saved_explain_analyze() IS diff --git a/src/backend/distributed/utils/citus_copyfuncs.c b/src/backend/distributed/utils/citus_copyfuncs.c index 51716cff3..aca376df9 100644 --- a/src/backend/distributed/utils/citus_copyfuncs.c +++ b/src/backend/distributed/utils/citus_copyfuncs.c @@ -147,6 +147,31 @@ CopyNodeDistributedSubPlan(COPYFUNC_ARGS) COPY_SCALAR_FIELD(subPlanId); COPY_NODE_FIELD(plan); + COPY_SCALAR_FIELD(bytesSentPerWorker); + COPY_SCALAR_FIELD(remoteWorkerCount); + COPY_SCALAR_FIELD(durationMillisecs); + COPY_SCALAR_FIELD(writeLocalFile); + + if (newnode->totalExplainOutput) + { + MemSet(newnode->totalExplainOutput, 0, sizeof(newnode->totalExplainOutput)); + } + + /* copy each SubPlanExplainOutput element */ + for (int i = 0; i < from->numTasksOutput; i++) + { + /* copy the explainOutput string pointer */ + COPY_STRING_FIELD(totalExplainOutput[i].explainOutput); + + /* copy the executionDuration (double) */ + COPY_SCALAR_FIELD(totalExplainOutput[i].executionDuration); + + /* copy the totalReceivedTupleData (uint64) */ + COPY_SCALAR_FIELD(totalExplainOutput[i].totalReceivedTupleData); + } + + COPY_SCALAR_FIELD(numTasksOutput); + COPY_SCALAR_FIELD(ntuples); } diff --git a/src/backend/distributed/utils/citus_outfuncs.c b/src/backend/distributed/utils/citus_outfuncs.c index 751063789..c19b0c3d4 100644 --- a/src/backend/distributed/utils/citus_outfuncs.c +++ b/src/backend/distributed/utils/citus_outfuncs.c @@ -215,6 +215,48 @@ OutDistributedSubPlan(OUTFUNC_ARGS) WRITE_UINT_FIELD(subPlanId); WRITE_NODE_FIELD(plan); + WRITE_UINT64_FIELD(bytesSentPerWorker); + WRITE_INT_FIELD(remoteWorkerCount); + WRITE_FLOAT_FIELD(durationMillisecs, "%.2f"); + WRITE_BOOL_FIELD(writeLocalFile); + + appendStringInfoString(str, " totalExplainOutput ["); + for (int i = 0; i < node->numTasksOutput; i++) + { + const SubPlanExplainOutputData *e = &node->totalExplainOutput[i]; + + /* skip empty slots */ + if (e->explainOutput == NULL && + e->executionDuration == 0 + && e->totalReceivedTupleData == 0) + { + continue; + } + + if (i > 0) + { + appendStringInfoChar(str, ' '); + } + + appendStringInfoChar(str, '('); + + /* string pointer – prints quoted or NULL */ + WRITE_STRING_FIELD(totalExplainOutput[i].explainOutput); + + /* double field */ + WRITE_FLOAT_FIELD(totalExplainOutput[i].executionDuration, "%.2f"); + + /* 64-bit unsigned – use the uint64 macro */ + WRITE_UINT64_FIELD(totalExplainOutput[i].totalReceivedTupleData); + + appendStringInfoChar(str, ')'); + } + + appendStringInfoChar(str, ']'); + + WRITE_INT_FIELD(numTasksOutput); + WRITE_FLOAT_FIELD(ntuples, "%.2f"); + } void diff --git a/src/include/distributed/multi_executor.h b/src/include/distributed/multi_executor.h index 6708d9a64..b0b0288de 100644 --- a/src/include/distributed/multi_executor.h +++ b/src/include/distributed/multi_executor.h @@ -146,8 +146,8 @@ extern void ExecuteQueryStringIntoDestReceiver(const char *queryString, ParamLis DestReceiver *dest); extern void ExecuteQueryIntoDestReceiver(Query *query, ParamListInfo params, DestReceiver *dest); -extern void ExecutePlanIntoDestReceiver(PlannedStmt *queryPlan, ParamListInfo params, - DestReceiver *dest); +extern uint64 ExecutePlanIntoDestReceiver(PlannedStmt *queryPlan, ParamListInfo params, + DestReceiver *dest); extern void 
SetLocalMultiShardModifyModeToSequential(void); extern void EnsureSequentialMode(ObjectType objType); extern void SetLocalForceMaxQueryParallelization(void);
diff --git a/src/include/distributed/multi_physical_planner.h b/src/include/distributed/multi_physical_planner.h index e5ec2205d..1040b4149 100644 --- a/src/include/distributed/multi_physical_planner.h +++ b/src/include/distributed/multi_physical_planner.h @@ -490,6 +490,24 @@ typedef struct DistributedPlan } DistributedPlan; +/* + * SubPlanExplainOutputData holds the EXPLAIN ANALYZE output and collected + * statistics for a single task executed by a worker during distributed + * query execution. + * explainOutput - raw EXPLAIN ANALYZE output for the task + * executionDuration - wall-clock time taken to run the task + * executionNtuples - number of tuples the task produced, per its EXPLAIN output + * executionNloops - loop count reported for the task, per its EXPLAIN output + * totalReceivedTupleData - total bytes of tuple data received from the worker + */ +typedef struct SubPlanExplainOutputData +{ + char *explainOutput; + double executionDuration; + double executionNtuples; + double executionNloops; + uint64 totalReceivedTupleData; +} SubPlanExplainOutputData; + + /* * DistributedSubPlan contains a subplan of a distributed plan. Subplans are * executed before the distributed query and their results are written to @@ -508,6 +526,9 @@ typedef struct DistributedSubPlan uint32 remoteWorkerCount; double durationMillisecs; bool writeLocalFile; + SubPlanExplainOutputData *totalExplainOutput; + uint32 numTasksOutput; /* actual size of the above array */ + double ntuples; /* total tuples produced */ } DistributedSubPlan;
diff --git a/src/include/distributed/subplan_execution.h b/src/include/distributed/subplan_execution.h index d68db43ce..045e77bc6 100644 --- a/src/include/distributed/subplan_execution.h +++ b/src/include/distributed/subplan_execution.h @@ -17,7 +17,7 @@ extern int MaxIntermediateResult; extern int SubPlanLevel; -extern void ExecuteSubPlans(DistributedPlan *distributedPlan); +extern void ExecuteSubPlans(DistributedPlan *distributedPlan, bool explainAnalyzeEnabled); /** * IntermediateResultsHashEntry is used to store which nodes need to receive
diff --git a/src/test/regress/expected/multi_explain.out b/src/test/regress/expected/multi_explain.out index bfcf29c4d..49027b217 100644 --- a/src/test/regress/expected/multi_explain.out +++ b/src/test/regress/expected/multi_explain.out @@ -2492,15 +2492,15 @@ Custom Scan (Citus Adaptive) (actual rows=1 loops=1) -> Distributed Subplan XXX_1 Intermediate Data Size: 100 bytes Result destination: Write locally - -> Custom Scan (Citus Adaptive) (actual rows=20 loops=1) + -> Custom Scan (Citus Adaptive) (actual rows=10 loops=1) Task Count: 4 - Tuple data received from nodes: 160 bytes + Tuple data received from nodes: 80 bytes Tasks Shown: One of 4 -> Task - Tuple data received from node: 64 bytes + Tuple data received from node: 32 bytes Node: host=localhost port=xxxxx dbname=regression - -> Insert on dist_table_570017 citus_table_alias (actual rows=8 loops=1) - -> Seq Scan on dist_table_570017 dist_table (actual rows=8 loops=1) + -> Insert on dist_table_570017 citus_table_alias (actual rows=4 loops=1) + -> Seq Scan on dist_table_570017 dist_table (actual rows=4 loops=1) Filter: (a IS NOT NULL) -> Distributed Subplan XXX_2 Intermediate Data Size: 150 bytes @@ -3228,6 +3228,159 @@ Custom Scan (Citus Adaptive) (actual rows=0 loops=1) -> Update on tbl_570036 tbl (actual rows=0 loops=1) -> Seq Scan on tbl_570036 tbl (actual rows=0 loops=1) Filter: (a = 1) +-- EXPLAIN ANALYZE shouldn't execute SubPlans twice (bug #4212) +SET search_path TO
multi_explain; +CREATE TABLE test_subplans (x int primary key, y int); +SELECT create_distributed_table('test_subplans','x'); + +EXPLAIN (COSTS off, ANALYZE on, TIMING off, SUMMARY off) +WITH a AS (INSERT INTO test_subplans VALUES (1,2) RETURNING *) +SELECT * FROM a; +Custom Scan (Citus Adaptive) (actual rows=1 loops=1) + -> Distributed Subplan XXX_1 + Intermediate Data Size: 18 bytes + Result destination: Write locally + -> Custom Scan (Citus Adaptive) (actual rows=1 loops=1) + Task Count: 1 + Tuple data received from nodes: 16 bytes + Tasks Shown: All + -> Task + Tuple data received from node: 16 bytes + Node: host=localhost port=xxxxx dbname=regression + -> Insert on test_subplans_570038 (actual rows=1 loops=1) + -> Result (actual rows=1 loops=1) + Task Count: 1 + Tuple data received from nodes: 8 bytes + Tasks Shown: All + -> Task + Tuple data received from node: 8 bytes + Node: host=localhost port=xxxxx dbname=regression + -> Function Scan on read_intermediate_result intermediate_result (actual rows=1 loops=1) +-- Only one row must exist +SELECT * FROM test_subplans; +1|2 +-- Will fail with duplicate pk +EXPLAIN (COSTS off, ANALYZE on, TIMING off, SUMMARY off) +WITH a AS (INSERT INTO test_subplans VALUES (1,2) RETURNING *) +SELECT * FROM a; +ERROR: duplicate key value violates unique constraint "test_subplans_pkey_570038" +DETAIL: Key (x)=(1) already exists. +CONTEXT: while executing command on localhost:xxxxx +-- Test JSON format +TRUNCATE test_subplans; +EXPLAIN (FORMAT JSON, COSTS off, ANALYZE on, TIMING off, SUMMARY off) +WITH a AS (INSERT INTO test_subplans VALUES (1,2) RETURNING *) +SELECT * FROM a; +[ + { + "Plan": { + "Node Type": "Custom Scan", + "Custom Plan Provider": "Citus Adaptive", + "Parallel Aware": false, + "Async Capable": false, + "Actual Rows": 1, + "Actual Loops": 1, + "Distributed Query": { + "Subplans": [ + { + "Intermediate Data Size": "18 bytes", + "Result destination": "Write locally", + "PlannedStmt": [ + { + "Plan": { + "Node Type": "Custom Scan", + "Custom Plan Provider": "Citus Adaptive", + "Parallel Aware": false, + "Async Capable": false, + "Actual Rows": 1, + "Actual Loops": 1, + "Distributed Query": { + "Job": { + "Task Count": 1, + "Tuple data received from nodes": "16 bytes", + "Tasks Shown": "All", + "Tasks": [ + { + "Tuple data received from node": "16 bytes", + "Node": "host=localhost port=xxxxx dbname=regression", + "Remote Plan": [ + [ + { + "Plan": { + "Node Type": "ModifyTable", + "Operation": "Insert", + "Parallel Aware": false, + "Async Capable": false, + "Relation Name": "test_subplans_570038", + "Alias": "test_subplans_570038", + "Actual Rows": 1, + "Actual Loops": 1, + "Plans": [ + { + "Node Type": "Result", + "Parent Relationship": "Outer", + "Parallel Aware": false, + "Async Capable": false, + "Actual Rows": 1, + "Actual Loops": 1 + } + ] + }, + "Triggers": [ + ] + } + ] + + ] + } + ] + } + } + }, + "Triggers": [ + ] + } + ] + } + ], + "Job": { + "Task Count": 1, + "Tuple data received from nodes": "8 bytes", + "Tasks Shown": "All", + "Tasks": [ + { + "Tuple data received from node": "8 bytes", + "Node": "host=localhost port=xxxxx dbname=regression", + "Remote Plan": [ + [ + { + "Plan": { + "Node Type": "Function Scan", + "Parallel Aware": false, + "Async Capable": false, + "Function Name": "read_intermediate_result", + "Alias": "intermediate_result", + "Actual Rows": 1, + "Actual Loops": 1 + }, + "Triggers": [ + ] + } + ] + + ] + } + ] + } + } + }, + "Triggers": [ + ] + } +] +-- Only one row must exist +SELECT * FROM test_subplans; 
+1|2 -- check when auto explain + analyze is enabled, we do not allow local execution. CREATE SCHEMA test_auto_explain; SET search_path TO 'test_auto_explain'; diff --git a/src/test/regress/expected/multi_explain_0.out b/src/test/regress/expected/multi_explain_0.out index 4d3acd14d..00a8309a9 100644 --- a/src/test/regress/expected/multi_explain_0.out +++ b/src/test/regress/expected/multi_explain_0.out @@ -2484,15 +2484,15 @@ Custom Scan (Citus Adaptive) (actual rows=1 loops=1) -> Distributed Subplan XXX_1 Intermediate Data Size: 100 bytes Result destination: Write locally - -> Custom Scan (Citus Adaptive) (actual rows=20 loops=1) + -> Custom Scan (Citus Adaptive) (actual rows=10 loops=1) Task Count: 4 - Tuple data received from nodes: 160 bytes + Tuple data received from nodes: 80 bytes Tasks Shown: One of 4 -> Task - Tuple data received from node: 64 bytes + Tuple data received from node: 32 bytes Node: host=localhost port=xxxxx dbname=regression - -> Insert on dist_table_570017 citus_table_alias (actual rows=8 loops=1) - -> Seq Scan on dist_table_570017 dist_table (actual rows=8 loops=1) + -> Insert on dist_table_570017 citus_table_alias (actual rows=4 loops=1) + -> Seq Scan on dist_table_570017 dist_table (actual rows=4 loops=1) Filter: (a IS NOT NULL) -> Distributed Subplan XXX_2 Intermediate Data Size: 150 bytes @@ -3217,6 +3217,159 @@ Custom Scan (Citus Adaptive) (actual rows=0 loops=1) -> Update on tbl_570036 tbl (actual rows=0 loops=1) -> Seq Scan on tbl_570036 tbl (actual rows=0 loops=1) Filter: (a = 1) +-- EXPLAIN ANALYZE shouldn't execute SubPlans twice (bug #4212) +SET search_path TO multi_explain; +CREATE TABLE test_subplans (x int primary key, y int); +SELECT create_distributed_table('test_subplans','x'); + +EXPLAIN (COSTS off, ANALYZE on, TIMING off, SUMMARY off) +WITH a AS (INSERT INTO test_subplans VALUES (1,2) RETURNING *) +SELECT * FROM a; +Custom Scan (Citus Adaptive) (actual rows=1 loops=1) + -> Distributed Subplan XXX_1 + Intermediate Data Size: 18 bytes + Result destination: Write locally + -> Custom Scan (Citus Adaptive) (actual rows=1 loops=1) + Task Count: 1 + Tuple data received from nodes: 16 bytes + Tasks Shown: All + -> Task + Tuple data received from node: 16 bytes + Node: host=localhost port=xxxxx dbname=regression + -> Insert on test_subplans_570038 (actual rows=1 loops=1) + -> Result (actual rows=1 loops=1) + Task Count: 1 + Tuple data received from nodes: 8 bytes + Tasks Shown: All + -> Task + Tuple data received from node: 8 bytes + Node: host=localhost port=xxxxx dbname=regression + -> Function Scan on read_intermediate_result intermediate_result (actual rows=1 loops=1) +-- Only one row must exist +SELECT * FROM test_subplans; +1|2 +-- Will fail with duplicate pk +EXPLAIN (COSTS off, ANALYZE on, TIMING off, SUMMARY off) +WITH a AS (INSERT INTO test_subplans VALUES (1,2) RETURNING *) +SELECT * FROM a; +ERROR: duplicate key value violates unique constraint "test_subplans_pkey_570038" +DETAIL: Key (x)=(1) already exists. 
+CONTEXT: while executing command on localhost:xxxxx +-- Test JSON format +TRUNCATE test_subplans; +EXPLAIN (FORMAT JSON, COSTS off, ANALYZE on, TIMING off, SUMMARY off) +WITH a AS (INSERT INTO test_subplans VALUES (1,2) RETURNING *) +SELECT * FROM a; +[ + { + "Plan": { + "Node Type": "Custom Scan", + "Custom Plan Provider": "Citus Adaptive", + "Parallel Aware": false, + "Async Capable": false, + "Actual Rows": 1, + "Actual Loops": 1, + "Distributed Query": { + "Subplans": [ + { + "Intermediate Data Size": "18 bytes", + "Result destination": "Write locally", + "PlannedStmt": [ + { + "Plan": { + "Node Type": "Custom Scan", + "Custom Plan Provider": "Citus Adaptive", + "Parallel Aware": false, + "Async Capable": false, + "Actual Rows": 1, + "Actual Loops": 1, + "Distributed Query": { + "Job": { + "Task Count": 1, + "Tuple data received from nodes": "16 bytes", + "Tasks Shown": "All", + "Tasks": [ + { + "Tuple data received from node": "16 bytes", + "Node": "host=localhost port=xxxxx dbname=regression", + "Remote Plan": [ + [ + { + "Plan": { + "Node Type": "ModifyTable", + "Operation": "Insert", + "Parallel Aware": false, + "Async Capable": false, + "Relation Name": "test_subplans_570038", + "Alias": "test_subplans_570038", + "Actual Rows": 1, + "Actual Loops": 1, + "Plans": [ + { + "Node Type": "Result", + "Parent Relationship": "Outer", + "Parallel Aware": false, + "Async Capable": false, + "Actual Rows": 1, + "Actual Loops": 1 + } + ] + }, + "Triggers": [ + ] + } + ] + + ] + } + ] + } + } + }, + "Triggers": [ + ] + } + ] + } + ], + "Job": { + "Task Count": 1, + "Tuple data received from nodes": "8 bytes", + "Tasks Shown": "All", + "Tasks": [ + { + "Tuple data received from node": "8 bytes", + "Node": "host=localhost port=xxxxx dbname=regression", + "Remote Plan": [ + [ + { + "Plan": { + "Node Type": "Function Scan", + "Parallel Aware": false, + "Async Capable": false, + "Function Name": "read_intermediate_result", + "Alias": "intermediate_result", + "Actual Rows": 1, + "Actual Loops": 1 + }, + "Triggers": [ + ] + } + ] + + ] + } + ] + } + } + }, + "Triggers": [ + ] + } +] +-- Only one row must exist +SELECT * FROM test_subplans; +1|2 -- check when auto explain + analyze is enabled, we do not allow local execution. 
CREATE SCHEMA test_auto_explain; SET search_path TO 'test_auto_explain'; diff --git a/src/test/regress/expected/multi_extension.out b/src/test/regress/expected/multi_extension.out index 4e8e927f4..defe41f0d 100644 --- a/src/test/regress/expected/multi_extension.out +++ b/src/test/regress/expected/multi_extension.out @@ -1503,7 +1503,9 @@ ALTER EXTENSION citus UPDATE TO '13.2-1'; SELECT * FROM multi_extension.print_extension_changes(); previous_object | current_object --------------------------------------------------------------------- -(0 rows) + function worker_last_saved_explain_analyze() TABLE(explain_analyze_output text, execution_duration double precision) | + | function worker_last_saved_explain_analyze() TABLE(explain_analyze_output text, execution_duration double precision, execution_ntuples double precision, execution_nloops double precision) +(2 rows) DROP TABLE multi_extension.prev_objects, multi_extension.extension_diff; -- show running version diff --git a/src/test/regress/expected/stat_counters.out b/src/test/regress/expected/stat_counters.out index a27eb3241..25327d4f7 100644 --- a/src/test/regress/expected/stat_counters.out +++ b/src/test/regress/expected/stat_counters.out @@ -721,13 +721,11 @@ CALL exec_query_and_check_query_counters($$ 0, 0 ); -- same with explain analyze --- --- this time, query_execution_multi_shard is incremented twice because of #4212 CALL exec_query_and_check_query_counters($$ EXPLAIN (ANALYZE) SELECT * FROM (SELECT * FROM dist_table OFFSET 0) q $$, - 1, 2 + 1, 1 ); CALL exec_query_and_check_query_counters($$ DELETE FROM dist_table WHERE a = 1 @@ -1041,9 +1039,6 @@ PL/pgSQL function exec_query_and_check_query_counters(text,bigint,bigint) line X -- A similar one but without the insert, so we would normally expect 2 increments -- for query_execution_single_shard and 2 for query_execution_multi_shard instead -- of 3 since the insert is not there anymore. --- --- But this time we observe more counter increments because we execute the subplans --- twice because of #4212. 
CALL exec_query_and_check_query_counters($$ EXPLAIN (ANALYZE) -- single-shard subplan (whole cte) -- multi-shard subplan (subquery q) -- pushable main query WITH cte AS ( SELECT a, b FROM dist_table_1 WHERE b = 1 ) SELECT cte.a, cte.b, q.a FROM (SELECT * FROM dist_table_1 ORDER BY a LIMIT 16) q -- multi-shard subplan (subquery q) JOIN cte ON q.a = cte.a $$, - 3, 4 + 2, 2 ); -- safe to push-down
From c183634207e522818f00f4a257f83a0f1b439f5e Mon Sep 17 00:00:00 2001 From: Onur Tirtir Date: Thu, 31 Jul 2025 13:30:12 +0300 Subject: [PATCH 4/5] Move "DROP FUNCTION" for older version of UDF to correct file (#8085) For consistency across release tags, we never update an older version of a SQL object, so this commit moves the "DROP FUNCTION .." statement for the older version of "pg_catalog.worker_last_saved_explain_analyze()" to the appropriate migration script.
See https://github.com/citusdata/citus/pull/8017. --- .../distributed/sql/downgrades/citus--13.2-1--13.1-1.sql | 2 ++ .../sql/udfs/worker_last_saved_explain_analyze/9.4-1.sql | 2 -- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/backend/distributed/sql/downgrades/citus--13.2-1--13.1-1.sql b/src/backend/distributed/sql/downgrades/citus--13.2-1--13.1-1.sql index 2212600f4..de26b790a 100644 --- a/src/backend/distributed/sql/downgrades/citus--13.2-1--13.1-1.sql +++ b/src/backend/distributed/sql/downgrades/citus--13.2-1--13.1-1.sql @@ -1,3 +1,5 @@ -- citus--13.2-1--13.1-1 -- downgrade version to 13.1-1 + +DROP FUNCTION IF EXISTS pg_catalog.worker_last_saved_explain_analyze(); #include "../udfs/worker_last_saved_explain_analyze/9.4-1.sql" diff --git a/src/backend/distributed/sql/udfs/worker_last_saved_explain_analyze/9.4-1.sql b/src/backend/distributed/sql/udfs/worker_last_saved_explain_analyze/9.4-1.sql index 037a17b92..17a5a15c5 100644 --- a/src/backend/distributed/sql/udfs/worker_last_saved_explain_analyze/9.4-1.sql +++ b/src/backend/distributed/sql/udfs/worker_last_saved_explain_analyze/9.4-1.sql @@ -1,6 +1,4 @@ -DROP FUNCTION IF EXISTS pg_catalog.worker_last_saved_explain_analyze(); - CREATE OR REPLACE FUNCTION pg_catalog.worker_last_saved_explain_analyze() RETURNS TABLE(explain_analyze_output TEXT, execution_duration DOUBLE PRECISION) LANGUAGE C STRICT From f0789bd388be193962b32161989c48e8cd3afa8c Mon Sep 17 00:00:00 2001 From: manaldush Date: Tue, 5 Aug 2025 13:03:35 +0300 Subject: [PATCH 5/5] Fix memory corruptions that could happen when a Citus downgrade is followed by an upgrade (#7950) DESCRIPTION: Fixes potential memory corruptions that could happen when a Citus downgrade is followed by a Citus upgrade. After a Citus downgrade followed by an upgrade, Citus could crash with a core dump. The reason is that Citus hardcoded the number of columns in the pg_dist_partition table, but after a downgrade and a subsequent upgrade the table can have more columns, some of which may be marked as dropped. The patch fixes this by sizing the tuple-deform arrays from tupleDescriptor->natts (PostgreSQL's internal approach) instead of the compile-time constant. Fixes #7933.
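To make the mechanism concrete, below is a minimal sketch of the two fixes, compressed into one hypothetical call site (the actual patch applies them across several functions; `partitionTuple` stands in for a pg_dist_partition tuple fetched earlier, and the NULL checks are omitted for brevity):

```c
Relation pgDistPartition = table_open(DistPartitionRelationId(), AccessShareLock);
TupleDesc tupleDescriptor = RelationGetDescr(pgDistPartition);

/*
 * Fix 1: size the deform arrays from the live tuple descriptor rather than
 * the compile-time Natts_pg_dist_partition, so the extra (dropped) columns
 * left behind by a downgrade + upgrade no longer make heap_deform_tuple()
 * write past the end of the arrays.
 */
Datum *datumArray = (Datum *) palloc(tupleDescriptor->natts * sizeof(Datum));
bool *isNullArray = (bool *) palloc(tupleDescriptor->natts * sizeof(bool));

heap_deform_tuple(partitionTuple, tupleDescriptor, datumArray, isNullArray);

/*
 * Fix 2: autoconverted is the only column added since downgrades became
 * supported, so when extra columns are present it is always the last
 * attribute; resolve its index at runtime instead of hardcoding
 * Anum_pg_dist_partition_autoconverted - 1.
 */
bool autoConverted = DatumGetBool(
	datumArray[GetAutoConvertedAttrIndexInPgDistPartition(tupleDescriptor)]);

pfree(datumArray);
pfree(isNullArray);
table_close(pgDistPartition, NoLock);
```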
--------- Co-authored-by: Onur Tirtir --- .../distributed/cdc/cdc_decoder_utils.c | 10 +- .../distributed/metadata/metadata_cache.c | 54 +++++--- .../distributed/metadata/metadata_sync.c | 8 +- .../distributed/metadata/metadata_utility.c | 115 ++++++++++++------ .../distributed/utils/colocation_utils.c | 31 +++-- src/include/distributed/metadata_utility.h | 1 + 6 files changed, 149 insertions(+), 70 deletions(-) diff --git a/src/backend/distributed/cdc/cdc_decoder_utils.c b/src/backend/distributed/cdc/cdc_decoder_utils.c index b571d18b9..9053d1b68 100644 --- a/src/backend/distributed/cdc/cdc_decoder_utils.c +++ b/src/backend/distributed/cdc/cdc_decoder_utils.c @@ -346,12 +346,12 @@ CdcIsReferenceTableViaCatalog(Oid relationId) return false; } - Datum datumArray[Natts_pg_dist_partition]; - bool isNullArray[Natts_pg_dist_partition]; - Relation pgDistPartition = table_open(DistPartitionRelationId(), AccessShareLock); TupleDesc tupleDescriptor = RelationGetDescr(pgDistPartition); + Datum *datumArray = (Datum *) palloc(tupleDescriptor->natts * sizeof(Datum)); + bool *isNullArray = (bool *) palloc(tupleDescriptor->natts * sizeof(bool)); + heap_deform_tuple(partitionTuple, tupleDescriptor, datumArray, isNullArray); if (isNullArray[Anum_pg_dist_partition_partmethod - 1] || @@ -363,6 +363,8 @@ CdcIsReferenceTableViaCatalog(Oid relationId) */ heap_freetuple(partitionTuple); table_close(pgDistPartition, NoLock); + pfree(datumArray); + pfree(isNullArray); return false; } @@ -374,6 +376,8 @@ CdcIsReferenceTableViaCatalog(Oid relationId) heap_freetuple(partitionTuple); table_close(pgDistPartition, NoLock); + pfree(datumArray); + pfree(isNullArray); /* * A table is a reference table when its partition method is 'none' diff --git a/src/backend/distributed/metadata/metadata_cache.c b/src/backend/distributed/metadata/metadata_cache.c index 79cc61092..8fd39d3b7 100644 --- a/src/backend/distributed/metadata/metadata_cache.c +++ b/src/backend/distributed/metadata/metadata_cache.c @@ -729,12 +729,13 @@ PartitionMethodViaCatalog(Oid relationId) return DISTRIBUTE_BY_INVALID; } - Datum datumArray[Natts_pg_dist_partition]; - bool isNullArray[Natts_pg_dist_partition]; - Relation pgDistPartition = table_open(DistPartitionRelationId(), AccessShareLock); TupleDesc tupleDescriptor = RelationGetDescr(pgDistPartition); + + Datum *datumArray = (Datum *) palloc(tupleDescriptor->natts * sizeof(Datum)); + bool *isNullArray = (bool *) palloc(tupleDescriptor->natts * sizeof(bool)); + heap_deform_tuple(partitionTuple, tupleDescriptor, datumArray, isNullArray); if (isNullArray[Anum_pg_dist_partition_partmethod - 1]) @@ -742,6 +743,8 @@ PartitionMethodViaCatalog(Oid relationId) /* partition method cannot be NULL, still let's make sure */ heap_freetuple(partitionTuple); table_close(pgDistPartition, NoLock); + pfree(datumArray); + pfree(isNullArray); return DISTRIBUTE_BY_INVALID; } @@ -750,6 +753,8 @@ PartitionMethodViaCatalog(Oid relationId) heap_freetuple(partitionTuple); table_close(pgDistPartition, NoLock); + pfree(datumArray); + pfree(isNullArray); return partitionMethodChar; } @@ -768,12 +773,12 @@ PartitionColumnViaCatalog(Oid relationId) return NULL; } - Datum datumArray[Natts_pg_dist_partition]; - bool isNullArray[Natts_pg_dist_partition]; - Relation pgDistPartition = table_open(DistPartitionRelationId(), AccessShareLock); TupleDesc tupleDescriptor = RelationGetDescr(pgDistPartition); + Datum *datumArray = (Datum *) palloc(tupleDescriptor->natts * sizeof(Datum)); + bool *isNullArray = (bool *) palloc(tupleDescriptor->natts 
* sizeof(bool)); + heap_deform_tuple(partitionTuple, tupleDescriptor, datumArray, isNullArray); if (isNullArray[Anum_pg_dist_partition_partkey - 1]) @@ -781,6 +786,8 @@ PartitionColumnViaCatalog(Oid relationId) /* partition key cannot be NULL, still let's make sure */ heap_freetuple(partitionTuple); table_close(pgDistPartition, NoLock); + pfree(datumArray); + pfree(isNullArray); return NULL; } @@ -795,6 +802,8 @@ PartitionColumnViaCatalog(Oid relationId) heap_freetuple(partitionTuple); table_close(pgDistPartition, NoLock); + pfree(datumArray); + pfree(isNullArray); return partitionColumn; } @@ -813,12 +822,13 @@ ColocationIdViaCatalog(Oid relationId) return INVALID_COLOCATION_ID; } - Datum datumArray[Natts_pg_dist_partition]; - bool isNullArray[Natts_pg_dist_partition]; - Relation pgDistPartition = table_open(DistPartitionRelationId(), AccessShareLock); TupleDesc tupleDescriptor = RelationGetDescr(pgDistPartition); + + Datum *datumArray = (Datum *) palloc(tupleDescriptor->natts * sizeof(Datum)); + bool *isNullArray = (bool *) palloc(tupleDescriptor->natts * sizeof(bool)); + heap_deform_tuple(partitionTuple, tupleDescriptor, datumArray, isNullArray); if (isNullArray[Anum_pg_dist_partition_colocationid - 1]) @@ -826,6 +836,8 @@ ColocationIdViaCatalog(Oid relationId) /* colocation id cannot be NULL, still let's make sure */ heap_freetuple(partitionTuple); table_close(pgDistPartition, NoLock); + pfree(datumArray); + pfree(isNullArray); return INVALID_COLOCATION_ID; } @@ -834,6 +846,8 @@ ColocationIdViaCatalog(Oid relationId) heap_freetuple(partitionTuple); table_close(pgDistPartition, NoLock); + pfree(datumArray); + pfree(isNullArray); return colocationId; } @@ -1741,10 +1755,11 @@ BuildCitusTableCacheEntry(Oid relationId) } MemoryContext oldContext = NULL; - Datum datumArray[Natts_pg_dist_partition]; - bool isNullArray[Natts_pg_dist_partition]; TupleDesc tupleDescriptor = RelationGetDescr(pgDistPartition); + Datum *datumArray = (Datum *) palloc(tupleDescriptor->natts * sizeof(Datum)); + bool *isNullArray = (bool *) palloc(tupleDescriptor->natts * sizeof(bool)); + heap_deform_tuple(distPartitionTuple, tupleDescriptor, datumArray, isNullArray); CitusTableCacheEntry *cacheEntry = @@ -1797,7 +1812,7 @@ BuildCitusTableCacheEntry(Oid relationId) cacheEntry->replicationModel = DatumGetChar(replicationModelDatum); } - if (isNullArray[Anum_pg_dist_partition_autoconverted - 1]) + if (isNullArray[GetAutoConvertedAttrIndexInPgDistPartition(tupleDescriptor)]) { /* * We don't expect this to happen, but set it to false (the default value) @@ -1808,7 +1823,7 @@ BuildCitusTableCacheEntry(Oid relationId) else { cacheEntry->autoConverted = DatumGetBool( - datumArray[Anum_pg_dist_partition_autoconverted - 1]); + datumArray[GetAutoConvertedAttrIndexInPgDistPartition(tupleDescriptor)]); } heap_freetuple(distPartitionTuple); @@ -1852,6 +1867,9 @@ BuildCitusTableCacheEntry(Oid relationId) table_close(pgDistPartition, NoLock); + pfree(datumArray); + pfree(isNullArray); + cacheEntry->isValid = true; return cacheEntry; @@ -5011,10 +5029,13 @@ CitusTableTypeIdList(CitusTableType citusTableType) TupleDesc tupleDescriptor = RelationGetDescr(pgDistPartition); HeapTuple heapTuple = systable_getnext(scanDescriptor); + Datum *datumArray = (Datum *) palloc(tupleDescriptor->natts * sizeof(Datum)); + bool *isNullArray = (bool *) palloc(tupleDescriptor->natts * sizeof(bool)); while (HeapTupleIsValid(heapTuple)) { - bool isNullArray[Natts_pg_dist_partition]; - Datum datumArray[Natts_pg_dist_partition]; + memset(datumArray, 0, 
tupleDescriptor->natts * sizeof(Datum)); + memset(isNullArray, 0, tupleDescriptor->natts * sizeof(bool)); + heap_deform_tuple(heapTuple, tupleDescriptor, datumArray, isNullArray); Datum partMethodDatum = datumArray[Anum_pg_dist_partition_partmethod - 1]; @@ -5038,6 +5059,9 @@ CitusTableTypeIdList(CitusTableType citusTableType) heapTuple = systable_getnext(scanDescriptor); } + pfree(datumArray); + pfree(isNullArray); + systable_endscan(scanDescriptor); table_close(pgDistPartition, AccessShareLock); diff --git a/src/backend/distributed/metadata/metadata_sync.c b/src/backend/distributed/metadata/metadata_sync.c index f73856169..e3b655ab0 100644 --- a/src/backend/distributed/metadata/metadata_sync.c +++ b/src/backend/distributed/metadata/metadata_sync.c @@ -573,13 +573,17 @@ FetchRelationIdFromPgPartitionHeapTuple(HeapTuple heapTuple, TupleDesc tupleDesc { Assert(heapTuple->t_tableOid == DistPartitionRelationId()); - bool isNullArray[Natts_pg_dist_partition]; - Datum datumArray[Natts_pg_dist_partition]; + Datum *datumArray = (Datum *) palloc(tupleDesc->natts * sizeof(Datum)); + bool *isNullArray = (bool *) palloc(tupleDesc->natts * sizeof(bool)); + heap_deform_tuple(heapTuple, tupleDesc, datumArray, isNullArray); Datum relationIdDatum = datumArray[Anum_pg_dist_partition_logicalrelid - 1]; Oid relationId = DatumGetObjectId(relationIdDatum); + pfree(datumArray); + pfree(isNullArray); + return relationId; } diff --git a/src/backend/distributed/metadata/metadata_utility.c b/src/backend/distributed/metadata/metadata_utility.c index 0c3dbbda3..2b8bd0d1c 100644 --- a/src/backend/distributed/metadata/metadata_utility.c +++ b/src/backend/distributed/metadata/metadata_utility.c @@ -812,6 +812,7 @@ GenerateSizeQueryOnMultiplePlacements(List *shardIntervalList, { partitionedShardNames = lappend(partitionedShardNames, quotedShardName); } + /* for non-partitioned tables, we will use Postgres' size functions */ else { @@ -1919,23 +1920,22 @@ InsertIntoPgDistPartition(Oid relationId, char distributionMethod, { char *distributionColumnString = NULL; - Datum newValues[Natts_pg_dist_partition]; - bool newNulls[Natts_pg_dist_partition]; - /* open system catalog and insert new tuple */ Relation pgDistPartition = table_open(DistPartitionRelationId(), RowExclusiveLock); + TupleDesc tupleDescriptor = RelationGetDescr(pgDistPartition); + + Datum *newValues = (Datum *) palloc0(tupleDescriptor->natts * sizeof(Datum)); + bool *newNulls = (bool *) palloc0(tupleDescriptor->natts * sizeof(bool)); /* form new tuple for pg_dist_partition */ - memset(newValues, 0, sizeof(newValues)); - memset(newNulls, false, sizeof(newNulls)); - newValues[Anum_pg_dist_partition_logicalrelid - 1] = ObjectIdGetDatum(relationId); newValues[Anum_pg_dist_partition_partmethod - 1] = CharGetDatum(distributionMethod); newValues[Anum_pg_dist_partition_colocationid - 1] = UInt32GetDatum(colocationId); newValues[Anum_pg_dist_partition_repmodel - 1] = CharGetDatum(replicationModel); - newValues[Anum_pg_dist_partition_autoconverted - 1] = BoolGetDatum(autoConverted); + newValues[GetAutoConvertedAttrIndexInPgDistPartition(tupleDescriptor)] = + BoolGetDatum(autoConverted); /* set partkey column to NULL for reference tables */ if (distributionMethod != DISTRIBUTE_BY_NONE) @@ -1951,7 +1951,7 @@ InsertIntoPgDistPartition(Oid relationId, char distributionMethod, newNulls[Anum_pg_dist_partition_partkey - 1] = true; } - HeapTuple newTuple = heap_form_tuple(RelationGetDescr(pgDistPartition), newValues, + HeapTuple newTuple = heap_form_tuple(tupleDescriptor, 
newValues, newNulls); /* finally insert tuple, build index entries & register cache invalidation */ @@ -1963,6 +1963,9 @@ InsertIntoPgDistPartition(Oid relationId, char distributionMethod, CommandCounterIncrement(); table_close(pgDistPartition, NoLock); + + pfree(newValues); + pfree(newNulls); } @@ -2154,13 +2157,13 @@ UpdatePlacementGroupId(uint64 placementId, int groupId) ScanKeyData scanKey[1]; int scanKeyCount = 1; bool indexOK = true; - Datum values[Natts_pg_dist_placement]; - bool isnull[Natts_pg_dist_placement]; - bool replace[Natts_pg_dist_placement]; bool colIsNull = false; Relation pgDistPlacement = table_open(DistPlacementRelationId(), RowExclusiveLock); TupleDesc tupleDescriptor = RelationGetDescr(pgDistPlacement); + Datum *values = (Datum *) palloc0(tupleDescriptor->natts * sizeof(Datum)); + bool *isnull = (bool *) palloc0(tupleDescriptor->natts * sizeof(bool)); + bool *replace = (bool *) palloc0(tupleDescriptor->natts * sizeof(bool)); ScanKeyInit(&scanKey[0], Anum_pg_dist_placement_placementid, BTEqualStrategyNumber, F_INT8EQ, Int64GetDatum(placementId)); @@ -2177,8 +2180,6 @@ UpdatePlacementGroupId(uint64 placementId, int groupId) placementId))); } - memset(replace, 0, sizeof(replace)); - values[Anum_pg_dist_placement_groupid - 1] = Int32GetDatum(groupId); isnull[Anum_pg_dist_placement_groupid - 1] = false; replace[Anum_pg_dist_placement_groupid - 1] = true; @@ -2197,6 +2198,10 @@ UpdatePlacementGroupId(uint64 placementId, int groupId) systable_endscan(scanDescriptor); table_close(pgDistPlacement, NoLock); + + pfree(values); + pfree(isnull); + pfree(replace); } @@ -2210,12 +2215,13 @@ UpdatePgDistPartitionAutoConverted(Oid citusTableId, bool autoConverted) ScanKeyData scanKey[1]; int scanKeyCount = 1; bool indexOK = true; - Datum values[Natts_pg_dist_partition]; - bool isnull[Natts_pg_dist_partition]; - bool replace[Natts_pg_dist_partition]; Relation pgDistPartition = table_open(DistPartitionRelationId(), RowExclusiveLock); TupleDesc tupleDescriptor = RelationGetDescr(pgDistPartition); + Datum *values = (Datum *) palloc0(tupleDescriptor->natts * sizeof(Datum)); + bool *isnull = (bool *) palloc0(tupleDescriptor->natts * sizeof(bool)); + bool *replace = (bool *) palloc0(tupleDescriptor->natts * sizeof(bool)); + ScanKeyInit(&scanKey[0], Anum_pg_dist_partition_logicalrelid, BTEqualStrategyNumber, F_OIDEQ, ObjectIdGetDatum(citusTableId)); @@ -2231,11 +2237,10 @@ UpdatePgDistPartitionAutoConverted(Oid citusTableId, bool autoConverted) citusTableId))); } - memset(replace, 0, sizeof(replace)); - - values[Anum_pg_dist_partition_autoconverted - 1] = BoolGetDatum(autoConverted); - isnull[Anum_pg_dist_partition_autoconverted - 1] = false; - replace[Anum_pg_dist_partition_autoconverted - 1] = true; + int autoconvertedindex = GetAutoConvertedAttrIndexInPgDistPartition(tupleDescriptor); + values[autoconvertedindex] = BoolGetDatum(autoConverted); + isnull[autoconvertedindex] = false; + replace[autoconvertedindex] = true; heapTuple = heap_modify_tuple(heapTuple, tupleDescriptor, values, isnull, replace); @@ -2247,6 +2252,10 @@ UpdatePgDistPartitionAutoConverted(Oid citusTableId, bool autoConverted) systable_endscan(scanDescriptor); table_close(pgDistPartition, NoLock); + + pfree(values); + pfree(isnull); + pfree(replace); } @@ -2286,12 +2295,13 @@ UpdateDistributionColumn(Oid relationId, char distributionMethod, Var *distribut ScanKeyData scanKey[1]; int scanKeyCount = 1; bool indexOK = true; - Datum values[Natts_pg_dist_partition]; - bool isnull[Natts_pg_dist_partition]; - bool 
replace[Natts_pg_dist_partition]; Relation pgDistPartition = table_open(DistPartitionRelationId(), RowExclusiveLock); TupleDesc tupleDescriptor = RelationGetDescr(pgDistPartition); + Datum *values = (Datum *) palloc0(tupleDescriptor->natts * sizeof(Datum)); + bool *isnull = (bool *) palloc0(tupleDescriptor->natts * sizeof(bool)); + bool *replace = (bool *) palloc0(tupleDescriptor->natts * sizeof(bool)); + ScanKeyInit(&scanKey[0], Anum_pg_dist_partition_logicalrelid, BTEqualStrategyNumber, F_OIDEQ, ObjectIdGetDatum(relationId)); @@ -2307,8 +2317,6 @@ UpdateDistributionColumn(Oid relationId, char distributionMethod, Var *distribut relationId))); } - memset(replace, 0, sizeof(replace)); - replace[Anum_pg_dist_partition_partmethod - 1] = true; values[Anum_pg_dist_partition_partmethod - 1] = CharGetDatum(distributionMethod); isnull[Anum_pg_dist_partition_partmethod - 1] = false; @@ -2317,9 +2325,10 @@ UpdateDistributionColumn(Oid relationId, char distributionMethod, Var *distribut values[Anum_pg_dist_partition_colocationid - 1] = UInt32GetDatum(colocationId); isnull[Anum_pg_dist_partition_colocationid - 1] = false; - replace[Anum_pg_dist_partition_autoconverted - 1] = true; - values[Anum_pg_dist_partition_autoconverted - 1] = BoolGetDatum(false); - isnull[Anum_pg_dist_partition_autoconverted - 1] = false; + int autoconvertedindex = GetAutoConvertedAttrIndexInPgDistPartition(tupleDescriptor); + replace[autoconvertedindex] = true; + values[autoconvertedindex] = BoolGetDatum(false); + isnull[autoconvertedindex] = false; char *distributionColumnString = nodeToString((Node *) distributionColumn); @@ -2337,6 +2346,10 @@ UpdateDistributionColumn(Oid relationId, char distributionMethod, Var *distribut systable_endscan(scanDescriptor); table_close(pgDistPartition, NoLock); + + pfree(values); + pfree(isnull); + pfree(replace); } @@ -2380,12 +2393,13 @@ UpdateNoneDistTableMetadata(Oid relationId, char replicationModel, uint32 coloca ScanKeyData scanKey[1]; int scanKeyCount = 1; bool indexOK = true; - Datum values[Natts_pg_dist_partition]; - bool isnull[Natts_pg_dist_partition]; - bool replace[Natts_pg_dist_partition]; Relation pgDistPartition = table_open(DistPartitionRelationId(), RowExclusiveLock); TupleDesc tupleDescriptor = RelationGetDescr(pgDistPartition); + Datum *values = (Datum *) palloc0(tupleDescriptor->natts * sizeof(Datum)); + bool *isnull = (bool *) palloc0(tupleDescriptor->natts * sizeof(bool)); + bool *replace = (bool *) palloc0(tupleDescriptor->natts * sizeof(bool)); + ScanKeyInit(&scanKey[0], Anum_pg_dist_partition_logicalrelid, BTEqualStrategyNumber, F_OIDEQ, ObjectIdGetDatum(relationId)); @@ -2401,8 +2415,6 @@ UpdateNoneDistTableMetadata(Oid relationId, char replicationModel, uint32 coloca relationId))); } - memset(replace, 0, sizeof(replace)); - values[Anum_pg_dist_partition_colocationid - 1] = UInt32GetDatum(colocationId); isnull[Anum_pg_dist_partition_colocationid - 1] = false; replace[Anum_pg_dist_partition_colocationid - 1] = true; @@ -2411,9 +2423,10 @@ UpdateNoneDistTableMetadata(Oid relationId, char replicationModel, uint32 coloca isnull[Anum_pg_dist_partition_repmodel - 1] = false; replace[Anum_pg_dist_partition_repmodel - 1] = true; - values[Anum_pg_dist_partition_autoconverted - 1] = BoolGetDatum(autoConverted); - isnull[Anum_pg_dist_partition_autoconverted - 1] = false; - replace[Anum_pg_dist_partition_autoconverted - 1] = true; + int autoconvertedindex = GetAutoConvertedAttrIndexInPgDistPartition(tupleDescriptor); + values[autoconvertedindex] = BoolGetDatum(autoConverted); 
+ isnull[autoconvertedindex] = false; + replace[autoconvertedindex] = true; heapTuple = heap_modify_tuple(heapTuple, tupleDescriptor, values, isnull, replace); @@ -2424,6 +2437,10 @@ UpdateNoneDistTableMetadata(Oid relationId, char replicationModel, uint32 coloca systable_endscan(scanDescriptor); table_close(pgDistPartition, NoLock); + + pfree(values); + pfree(isnull); + pfree(replace); } @@ -3149,8 +3166,8 @@ ScheduleBackgroundTask(int64 jobId, Oid owner, char *command, int dependingTaskC values[Anum_pg_dist_background_task_nodes_involved - 1] = IntArrayToDatum(nodesInvolvedCount, nodesInvolved); - nulls[Anum_pg_dist_background_task_nodes_involved - 1] = (nodesInvolvedCount == - 0); + nulls[Anum_pg_dist_background_task_nodes_involved - 1] = + (nodesInvolvedCount == 0); HeapTuple newTuple = heap_form_tuple(RelationGetDescr(pgDistBackgroundTask), values, nulls); @@ -4420,3 +4437,23 @@ UnblockDependingBackgroundTasks(BackgroundTask *task) table_close(pgDistBackgroundTasksDepend, NoLock); } + + +/* + * GetAutoConvertedAttrIndexInPgDistPartition returns the 0-based index of the autoconverted attr. + * + * The autoconverted attr was added to pg_dist_partition via an ALTER operation after + * the version where Citus started supporting downgrades, and it is the only column + * we've introduced to pg_dist_partition since then. + * + * In case of a downgrade + upgrade, tupleDesc->natts becomes greater than + * Natts_pg_dist_partition, and when this happens the autoconverted attr is no longer + * at index Anum_pg_dist_partition_autoconverted - 1 but at tupleDesc->natts - 1. + */ +int +GetAutoConvertedAttrIndexInPgDistPartition(TupleDesc tupleDesc) +{ + return tupleDesc->natts == Natts_pg_dist_partition + ? (Anum_pg_dist_partition_autoconverted - 1) + : tupleDesc->natts - 1; +} diff --git a/src/backend/distributed/utils/colocation_utils.c b/src/backend/distributed/utils/colocation_utils.c index 5f031b2b5..af507d5b9 100644 --- a/src/backend/distributed/utils/colocation_utils.c +++ b/src/backend/distributed/utils/colocation_utils.c @@ -815,13 +815,14 @@ UpdateRelationColocationGroup(Oid distributedRelationId, uint32 colocationId, bool indexOK = true; int scanKeyCount = 1; ScanKeyData scanKey[1]; - Datum values[Natts_pg_dist_partition]; - bool isNull[Natts_pg_dist_partition]; - bool replace[Natts_pg_dist_partition]; Relation pgDistPartition = table_open(DistPartitionRelationId(), RowExclusiveLock); TupleDesc tupleDescriptor = RelationGetDescr(pgDistPartition); + Datum *values = (Datum *) palloc0(tupleDescriptor->natts * sizeof(Datum)); + bool *isNull = (bool *) palloc0(tupleDescriptor->natts * sizeof(bool)); + bool *replace = (bool *) palloc0(tupleDescriptor->natts * sizeof(bool)); + ScanKeyInit(&scanKey[0], Anum_pg_dist_partition_logicalrelid, BTEqualStrategyNumber, F_OIDEQ, ObjectIdGetDatum(distributedRelationId)); @@ -838,10 +839,6 @@ UpdateRelationColocationGroup(Oid distributedRelationId, uint32 colocationId, distributedRelationName))); } - memset(values, 0, sizeof(values)); - memset(isNull, false, sizeof(isNull)); - memset(replace, false, sizeof(replace)); - values[Anum_pg_dist_partition_colocationid - 1] = UInt32GetDatum(colocationId); isNull[Anum_pg_dist_partition_colocationid - 1] = false; replace[Anum_pg_dist_partition_colocationid - 1] = true; @@ -858,6 +855,10 @@ UpdateRelationColocationGroup(Oid distributedRelationId, uint32 colocationId, systable_endscan(scanDescriptor); table_close(pgDistPartition, NoLock); + pfree(values); + pfree(isNull); + pfree(replace); + bool shouldSyncMetadata =
ShouldSyncTableMetadata(distributedRelationId); if (shouldSyncMetadata && !localOnly) { @@ -998,10 +999,12 @@ ColocationGroupTableList(uint32 colocationId, uint32 count) indexOK, NULL, scanKeyCount, scanKey); HeapTuple heapTuple = systable_getnext(scanDescriptor); + Datum *datumArray = (Datum *) palloc(tupleDescriptor->natts * sizeof(Datum)); + bool *isNullArray = (bool *) palloc(tupleDescriptor->natts * sizeof(bool)); while (HeapTupleIsValid(heapTuple)) { - bool isNullArray[Natts_pg_dist_partition]; - Datum datumArray[Natts_pg_dist_partition]; + memset(datumArray, 0, tupleDescriptor->natts * sizeof(Datum)); + memset(isNullArray, 0, tupleDescriptor->natts * sizeof(bool)); heap_deform_tuple(heapTuple, tupleDescriptor, datumArray, isNullArray); Oid colocatedTableId = DatumGetObjectId( datumArray[Anum_pg_dist_partition_logicalrelid - 1]); @@ -1020,6 +1023,8 @@ ColocationGroupTableList(uint32 colocationId, uint32 count) break; } } + pfree(datumArray); + pfree(isNullArray); systable_endscan(scanDescriptor); table_close(pgDistPartition, AccessShareLock); @@ -1192,10 +1197,12 @@ ColocatedTableId(int32 colocationId) indexOK, NULL, scanKeyCount, scanKey); HeapTuple heapTuple = systable_getnext(scanDescriptor); + Datum *datumArray = (Datum *) palloc(tupleDescriptor->natts * sizeof(Datum)); + bool *isNullArray = (bool *) palloc(tupleDescriptor->natts * sizeof(bool)); while (HeapTupleIsValid(heapTuple)) { - bool isNullArray[Natts_pg_dist_partition]; - Datum datumArray[Natts_pg_dist_partition]; + memset(datumArray, 0, tupleDescriptor->natts * sizeof(Datum)); + memset(isNullArray, 0, tupleDescriptor->natts * sizeof(bool)); heap_deform_tuple(heapTuple, tupleDescriptor, datumArray, isNullArray); colocatedTableId = DatumGetObjectId( datumArray[Anum_pg_dist_partition_logicalrelid - 1]); @@ -1223,6 +1230,8 @@ ColocatedTableId(int32 colocationId) heapTuple = systable_getnext(scanDescriptor); } + pfree(datumArray); + pfree(isNullArray); systable_endscan(scanDescriptor); table_close(pgDistPartition, AccessShareLock); diff --git a/src/include/distributed/metadata_utility.h b/src/include/distributed/metadata_utility.h index 38c13eb51..a507138d2 100644 --- a/src/include/distributed/metadata_utility.h +++ b/src/include/distributed/metadata_utility.h @@ -466,4 +466,5 @@ extern bool IsBackgroundJobStatusTerminal(BackgroundJobStatus status); extern bool IsBackgroundTaskStatusTerminal(BackgroundTaskStatus status); extern Oid BackgroundJobStatusOid(BackgroundJobStatus status); extern Oid BackgroundTaskStatusOid(BackgroundTaskStatus status); +extern int GetAutoConvertedAttrIndexInPgDistPartition(TupleDesc tupleDesc); #endif /* METADATA_UTILITY_H */