diff --git a/src/backend/distributed/cdc/cdc_decoder_utils.c b/src/backend/distributed/cdc/cdc_decoder_utils.c index b571d18b9..9053d1b68 100644 --- a/src/backend/distributed/cdc/cdc_decoder_utils.c +++ b/src/backend/distributed/cdc/cdc_decoder_utils.c @@ -346,12 +346,12 @@ CdcIsReferenceTableViaCatalog(Oid relationId) return false; } - Datum datumArray[Natts_pg_dist_partition]; - bool isNullArray[Natts_pg_dist_partition]; - Relation pgDistPartition = table_open(DistPartitionRelationId(), AccessShareLock); TupleDesc tupleDescriptor = RelationGetDescr(pgDistPartition); + Datum *datumArray = (Datum *) palloc(tupleDescriptor->natts * sizeof(Datum)); + bool *isNullArray = (bool *) palloc(tupleDescriptor->natts * sizeof(bool)); + heap_deform_tuple(partitionTuple, tupleDescriptor, datumArray, isNullArray); if (isNullArray[Anum_pg_dist_partition_partmethod - 1] || @@ -363,6 +363,8 @@ CdcIsReferenceTableViaCatalog(Oid relationId) */ heap_freetuple(partitionTuple); table_close(pgDistPartition, NoLock); + pfree(datumArray); + pfree(isNullArray); return false; } @@ -374,6 +376,8 @@ CdcIsReferenceTableViaCatalog(Oid relationId) heap_freetuple(partitionTuple); table_close(pgDistPartition, NoLock); + pfree(datumArray); + pfree(isNullArray); /* * A table is a reference table when its partition method is 'none' diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c index 895f01ae7..677535591 100644 --- a/src/backend/distributed/executor/adaptive_executor.c +++ b/src/backend/distributed/executor/adaptive_executor.c @@ -760,7 +760,7 @@ AdaptiveExecutorPreExecutorRun(CitusScanState *scanState) */ LockPartitionsForDistributedPlan(distributedPlan); - ExecuteSubPlans(distributedPlan); + ExecuteSubPlans(distributedPlan, RequestedForExplainAnalyze(scanState)); scanState->finishedPreScan = true; } @@ -3804,7 +3804,7 @@ PopAssignedPlacementExecution(WorkerSession *session) /* - * PopAssignedPlacementExecution finds an executable task from the queue of assigned tasks. + * PopUnassignedPlacementExecution finds an executable task from the queue of unassigned tasks. 
*/ static TaskPlacementExecution * PopUnassignedPlacementExecution(WorkerPool *workerPool) diff --git a/src/backend/distributed/executor/insert_select_executor.c b/src/backend/distributed/executor/insert_select_executor.c index 9ed1962fa..58c172c66 100644 --- a/src/backend/distributed/executor/insert_select_executor.c +++ b/src/backend/distributed/executor/insert_select_executor.c @@ -42,6 +42,7 @@ #include "distributed/merge_planner.h" #include "distributed/metadata_cache.h" #include "distributed/multi_executor.h" +#include "distributed/multi_explain.h" #include "distributed/multi_partitioning_utils.h" #include "distributed/multi_physical_planner.h" #include "distributed/multi_router_planner.h" @@ -121,7 +122,7 @@ NonPushableInsertSelectExecScan(CustomScanState *node) bool binaryFormat = CanUseBinaryCopyFormatForTargetList(selectQuery->targetList); - ExecuteSubPlans(distSelectPlan); + ExecuteSubPlans(distSelectPlan, RequestedForExplainAnalyze(scanState)); /* * We have a separate directory for each transaction, so choosing diff --git a/src/backend/distributed/executor/merge_executor.c b/src/backend/distributed/executor/merge_executor.c index d0f01dcf2..56bde62bc 100644 --- a/src/backend/distributed/executor/merge_executor.c +++ b/src/backend/distributed/executor/merge_executor.c @@ -23,6 +23,7 @@ #include "distributed/merge_executor.h" #include "distributed/merge_planner.h" #include "distributed/multi_executor.h" +#include "distributed/multi_explain.h" #include "distributed/multi_partitioning_utils.h" #include "distributed/multi_router_planner.h" #include "distributed/repartition_executor.h" @@ -132,7 +133,7 @@ ExecuteSourceAtWorkerAndRepartition(CitusScanState *scanState) ereport(DEBUG1, (errmsg("Executing subplans of the source query and " "storing the results at the respective node(s)"))); - ExecuteSubPlans(distSourcePlan); + ExecuteSubPlans(distSourcePlan, RequestedForExplainAnalyze(scanState)); /* * We have a separate directory for each transaction, so choosing diff --git a/src/backend/distributed/executor/multi_executor.c b/src/backend/distributed/executor/multi_executor.c index dba302e7c..eb6bdf111 100644 --- a/src/backend/distributed/executor/multi_executor.c +++ b/src/backend/distributed/executor/multi_executor.c @@ -688,7 +688,7 @@ ExecuteQueryIntoDestReceiver(Query *query, ParamListInfo params, DestReceiver *d * ExecutePlanIntoDestReceiver executes a query plan and sends results to the given * DestReceiver. 
*/ -void +uint64 ExecutePlanIntoDestReceiver(PlannedStmt *queryPlan, ParamListInfo params, DestReceiver *dest) { @@ -713,6 +713,8 @@ ExecutePlanIntoDestReceiver(PlannedStmt *queryPlan, ParamListInfo params, PortalStart(portal, params, eflags, GetActiveSnapshot()); + QueryCompletion qc = { 0 }; + #if PG_VERSION_NUM >= PG_VERSION_18 /* PG 18+: six-arg signature (drop the run_once bool) */ @@ -721,7 +723,7 @@ ExecutePlanIntoDestReceiver(PlannedStmt *queryPlan, ParamListInfo params, false, /* isTopLevel */ dest, /* DestReceiver *dest */ dest, /* DestReceiver *altdest */ - NULL); /* QueryCompletion *qc */ + &qc); /* QueryCompletion *qc */ #else /* PG 17-: original seven-arg signature */ @@ -731,10 +733,12 @@ ExecutePlanIntoDestReceiver(PlannedStmt *queryPlan, ParamListInfo params, true, /* run_once */ dest, /* DestReceiver *dest */ dest, /* DestReceiver *altdest */ - NULL); /* QueryCompletion *qc */ + &qc); /* QueryCompletion *qc */ #endif PortalDrop(portal, false); + + return qc.nprocessed; } diff --git a/src/backend/distributed/executor/subplan_execution.c b/src/backend/distributed/executor/subplan_execution.c index ef2838343..108d130ec 100644 --- a/src/backend/distributed/executor/subplan_execution.c +++ b/src/backend/distributed/executor/subplan_execution.c @@ -30,13 +30,22 @@ int MaxIntermediateResult = 1048576; /* maximum size in KB the intermediate resu /* when this is true, we enforce intermediate result size limit in all executors */ int SubPlanLevel = 0; +/* + * SubPlanExplainAnalyzeContext is both a memory context for storing + * subplans' EXPLAIN ANALYZE output and a flag indicating that execution + * is running under EXPLAIN ANALYZE for subplans. + */ +MemoryContext SubPlanExplainAnalyzeContext = NULL; +SubPlanExplainOutputData *SubPlanExplainOutput; +extern uint32 TotalExplainOutputCapacity; +extern uint32 NumTasksOutput; /* * ExecuteSubPlans executes a list of subplans from a distributed plan * by sequentially executing each plan from the top. */ void -ExecuteSubPlans(DistributedPlan *distributedPlan) +ExecuteSubPlans(DistributedPlan *distributedPlan, bool explainAnalyzeEnabled) { uint64 planId = distributedPlan->planId; List *subPlanList = distributedPlan->subPlanList; @@ -47,6 +56,19 @@ ExecuteSubPlans(DistributedPlan *distributedPlan) return; } + /* + * If the root DistributedPlan has EXPLAIN ANALYZE enabled, + * its subplans should also have EXPLAIN ANALYZE enabled. + */ + if (explainAnalyzeEnabled) + { + SubPlanExplainAnalyzeContext = GetMemoryChunkContext(distributedPlan); + } + else + { + SubPlanExplainAnalyzeContext = NULL; + } + HTAB *intermediateResultsHash = MakeIntermediateResultHTAB(); RecordSubplanExecutionsOnNodes(intermediateResultsHash, distributedPlan); @@ -79,7 +101,23 @@ ExecuteSubPlans(DistributedPlan *distributedPlan) TimestampTz startTimestamp = GetCurrentTimestamp(); - ExecutePlanIntoDestReceiver(plannedStmt, params, copyDest); + uint64 nprocessed = 0; + + PG_TRY(); + { + nprocessed = + ExecutePlanIntoDestReceiver(plannedStmt, params, copyDest); + } + PG_CATCH(); + { + SubPlanExplainAnalyzeContext = NULL; + SubPlanExplainOutput = NULL; + TotalExplainOutputCapacity = 0; + NumTasksOutput = 0; + PG_RE_THROW(); + } + PG_END_TRY(); + /* * EXPLAIN ANALYZE instrumentations. 
Calculating these are very light-weight, @@ -94,10 +132,24 @@ ExecuteSubPlans(DistributedPlan *distributedPlan) subPlan->durationMillisecs += durationMicrosecs * MICRO_TO_MILLI_SECOND; subPlan->bytesSentPerWorker = RemoteFileDestReceiverBytesSent(copyDest); + subPlan->ntuples = nprocessed; subPlan->remoteWorkerCount = list_length(remoteWorkerNodeList); subPlan->writeLocalFile = entry->writeLocalFile; SubPlanLevel--; + + /* + * Save the EXPLAIN ANALYZE output(s) for later extraction in ExplainSubPlans(). + * Because the SubPlan context isn't available during distributed execution, + * pass the pointer in the global variable SubPlanExplainOutput. + */ + subPlan->totalExplainOutput = SubPlanExplainOutput; + subPlan->numTasksOutput = NumTasksOutput; + SubPlanExplainOutput = NULL; + TotalExplainOutputCapacity = 0; + NumTasksOutput = 0; FreeExecutorState(estate); } + + SubPlanExplainAnalyzeContext = NULL; } diff --git a/src/backend/distributed/metadata/metadata_cache.c b/src/backend/distributed/metadata/metadata_cache.c index 79cc61092..8fd39d3b7 100644 --- a/src/backend/distributed/metadata/metadata_cache.c +++ b/src/backend/distributed/metadata/metadata_cache.c @@ -729,12 +729,13 @@ PartitionMethodViaCatalog(Oid relationId) return DISTRIBUTE_BY_INVALID; } - Datum datumArray[Natts_pg_dist_partition]; - bool isNullArray[Natts_pg_dist_partition]; - Relation pgDistPartition = table_open(DistPartitionRelationId(), AccessShareLock); TupleDesc tupleDescriptor = RelationGetDescr(pgDistPartition); + + Datum *datumArray = (Datum *) palloc(tupleDescriptor->natts * sizeof(Datum)); + bool *isNullArray = (bool *) palloc(tupleDescriptor->natts * sizeof(bool)); + heap_deform_tuple(partitionTuple, tupleDescriptor, datumArray, isNullArray); if (isNullArray[Anum_pg_dist_partition_partmethod - 1]) @@ -742,6 +743,8 @@ { /* partition method cannot be NULL, still let's make sure */ heap_freetuple(partitionTuple); table_close(pgDistPartition, NoLock); + pfree(datumArray); + pfree(isNullArray); return DISTRIBUTE_BY_INVALID; } @@ -750,6 +753,8 @@ heap_freetuple(partitionTuple); table_close(pgDistPartition, NoLock); + pfree(datumArray); + pfree(isNullArray); return partitionMethodChar; } @@ -768,12 +773,12 @@ PartitionColumnViaCatalog(Oid relationId) return NULL; } - Datum datumArray[Natts_pg_dist_partition]; - bool isNullArray[Natts_pg_dist_partition]; - Relation pgDistPartition = table_open(DistPartitionRelationId(), AccessShareLock); TupleDesc tupleDescriptor = RelationGetDescr(pgDistPartition); + Datum *datumArray = (Datum *) palloc(tupleDescriptor->natts * sizeof(Datum)); + bool *isNullArray = (bool *) palloc(tupleDescriptor->natts * sizeof(bool)); + heap_deform_tuple(partitionTuple, tupleDescriptor, datumArray, isNullArray); if (isNullArray[Anum_pg_dist_partition_partkey - 1]) @@ -781,6 +786,8 @@ { /* partition key cannot be NULL, still let's make sure */ heap_freetuple(partitionTuple); table_close(pgDistPartition, NoLock); + pfree(datumArray); + pfree(isNullArray); return NULL; } @@ -795,6 +802,8 @@ heap_freetuple(partitionTuple); table_close(pgDistPartition, NoLock); + pfree(datumArray); + pfree(isNullArray); return partitionColumn; } @@ -813,12 +822,13 @@ ColocationIdViaCatalog(Oid relationId) return INVALID_COLOCATION_ID; } - Datum datumArray[Natts_pg_dist_partition]; - bool isNullArray[Natts_pg_dist_partition]; - Relation pgDistPartition = 
table_open(DistPartitionRelationId(), AccessShareLock); TupleDesc tupleDescriptor = RelationGetDescr(pgDistPartition); + + Datum *datumArray = (Datum *) palloc(tupleDescriptor->natts * sizeof(Datum)); + bool *isNullArray = (bool *) palloc(tupleDescriptor->natts * sizeof(bool)); + heap_deform_tuple(partitionTuple, tupleDescriptor, datumArray, isNullArray); if (isNullArray[Anum_pg_dist_partition_colocationid - 1]) @@ -826,6 +836,8 @@ ColocationIdViaCatalog(Oid relationId) /* colocation id cannot be NULL, still let's make sure */ heap_freetuple(partitionTuple); table_close(pgDistPartition, NoLock); + pfree(datumArray); + pfree(isNullArray); return INVALID_COLOCATION_ID; } @@ -834,6 +846,8 @@ ColocationIdViaCatalog(Oid relationId) heap_freetuple(partitionTuple); table_close(pgDistPartition, NoLock); + pfree(datumArray); + pfree(isNullArray); return colocationId; } @@ -1741,10 +1755,11 @@ BuildCitusTableCacheEntry(Oid relationId) } MemoryContext oldContext = NULL; - Datum datumArray[Natts_pg_dist_partition]; - bool isNullArray[Natts_pg_dist_partition]; TupleDesc tupleDescriptor = RelationGetDescr(pgDistPartition); + Datum *datumArray = (Datum *) palloc(tupleDescriptor->natts * sizeof(Datum)); + bool *isNullArray = (bool *) palloc(tupleDescriptor->natts * sizeof(bool)); + heap_deform_tuple(distPartitionTuple, tupleDescriptor, datumArray, isNullArray); CitusTableCacheEntry *cacheEntry = @@ -1797,7 +1812,7 @@ BuildCitusTableCacheEntry(Oid relationId) cacheEntry->replicationModel = DatumGetChar(replicationModelDatum); } - if (isNullArray[Anum_pg_dist_partition_autoconverted - 1]) + if (isNullArray[GetAutoConvertedAttrIndexInPgDistPartition(tupleDescriptor)]) { /* * We don't expect this to happen, but set it to false (the default value) @@ -1808,7 +1823,7 @@ BuildCitusTableCacheEntry(Oid relationId) else { cacheEntry->autoConverted = DatumGetBool( - datumArray[Anum_pg_dist_partition_autoconverted - 1]); + datumArray[GetAutoConvertedAttrIndexInPgDistPartition(tupleDescriptor)]); } heap_freetuple(distPartitionTuple); @@ -1852,6 +1867,9 @@ BuildCitusTableCacheEntry(Oid relationId) table_close(pgDistPartition, NoLock); + pfree(datumArray); + pfree(isNullArray); + cacheEntry->isValid = true; return cacheEntry; @@ -5011,10 +5029,13 @@ CitusTableTypeIdList(CitusTableType citusTableType) TupleDesc tupleDescriptor = RelationGetDescr(pgDistPartition); HeapTuple heapTuple = systable_getnext(scanDescriptor); + Datum *datumArray = (Datum *) palloc(tupleDescriptor->natts * sizeof(Datum)); + bool *isNullArray = (bool *) palloc(tupleDescriptor->natts * sizeof(bool)); while (HeapTupleIsValid(heapTuple)) { - bool isNullArray[Natts_pg_dist_partition]; - Datum datumArray[Natts_pg_dist_partition]; + memset(datumArray, 0, tupleDescriptor->natts * sizeof(Datum)); + memset(isNullArray, 0, tupleDescriptor->natts * sizeof(bool)); + heap_deform_tuple(heapTuple, tupleDescriptor, datumArray, isNullArray); Datum partMethodDatum = datumArray[Anum_pg_dist_partition_partmethod - 1]; @@ -5038,6 +5059,9 @@ CitusTableTypeIdList(CitusTableType citusTableType) heapTuple = systable_getnext(scanDescriptor); } + pfree(datumArray); + pfree(isNullArray); + systable_endscan(scanDescriptor); table_close(pgDistPartition, AccessShareLock); diff --git a/src/backend/distributed/metadata/metadata_sync.c b/src/backend/distributed/metadata/metadata_sync.c index f73856169..e3b655ab0 100644 --- a/src/backend/distributed/metadata/metadata_sync.c +++ b/src/backend/distributed/metadata/metadata_sync.c @@ -573,13 +573,17 @@ 
FetchRelationIdFromPgPartitionHeapTuple(HeapTuple heapTuple, TupleDesc tupleDesc { Assert(heapTuple->t_tableOid == DistPartitionRelationId()); - bool isNullArray[Natts_pg_dist_partition]; - Datum datumArray[Natts_pg_dist_partition]; + Datum *datumArray = (Datum *) palloc(tupleDesc->natts * sizeof(Datum)); + bool *isNullArray = (bool *) palloc(tupleDesc->natts * sizeof(bool)); + heap_deform_tuple(heapTuple, tupleDesc, datumArray, isNullArray); Datum relationIdDatum = datumArray[Anum_pg_dist_partition_logicalrelid - 1]; Oid relationId = DatumGetObjectId(relationIdDatum); + pfree(datumArray); + pfree(isNullArray); + return relationId; } diff --git a/src/backend/distributed/metadata/metadata_utility.c b/src/backend/distributed/metadata/metadata_utility.c index 0c3dbbda3..2b8bd0d1c 100644 --- a/src/backend/distributed/metadata/metadata_utility.c +++ b/src/backend/distributed/metadata/metadata_utility.c @@ -812,6 +812,7 @@ GenerateSizeQueryOnMultiplePlacements(List *shardIntervalList, { partitionedShardNames = lappend(partitionedShardNames, quotedShardName); } + /* for non-partitioned tables, we will use Postgres' size functions */ else { @@ -1919,23 +1920,22 @@ InsertIntoPgDistPartition(Oid relationId, char distributionMethod, { char *distributionColumnString = NULL; - Datum newValues[Natts_pg_dist_partition]; - bool newNulls[Natts_pg_dist_partition]; - /* open system catalog and insert new tuple */ Relation pgDistPartition = table_open(DistPartitionRelationId(), RowExclusiveLock); + TupleDesc tupleDescriptor = RelationGetDescr(pgDistPartition); + + Datum *newValues = (Datum *) palloc0(tupleDescriptor->natts * sizeof(Datum)); + bool *newNulls = (bool *) palloc0(tupleDescriptor->natts * sizeof(bool)); /* form new tuple for pg_dist_partition */ - memset(newValues, 0, sizeof(newValues)); - memset(newNulls, false, sizeof(newNulls)); - newValues[Anum_pg_dist_partition_logicalrelid - 1] = ObjectIdGetDatum(relationId); newValues[Anum_pg_dist_partition_partmethod - 1] = CharGetDatum(distributionMethod); newValues[Anum_pg_dist_partition_colocationid - 1] = UInt32GetDatum(colocationId); newValues[Anum_pg_dist_partition_repmodel - 1] = CharGetDatum(replicationModel); - newValues[Anum_pg_dist_partition_autoconverted - 1] = BoolGetDatum(autoConverted); + newValues[GetAutoConvertedAttrIndexInPgDistPartition(tupleDescriptor)] = + BoolGetDatum(autoConverted); /* set partkey column to NULL for reference tables */ if (distributionMethod != DISTRIBUTE_BY_NONE) @@ -1951,7 +1951,7 @@ InsertIntoPgDistPartition(Oid relationId, char distributionMethod, newNulls[Anum_pg_dist_partition_partkey - 1] = true; } - HeapTuple newTuple = heap_form_tuple(RelationGetDescr(pgDistPartition), newValues, + HeapTuple newTuple = heap_form_tuple(tupleDescriptor, newValues, newNulls); /* finally insert tuple, build index entries & register cache invalidation */ @@ -1963,6 +1963,9 @@ InsertIntoPgDistPartition(Oid relationId, char distributionMethod, CommandCounterIncrement(); table_close(pgDistPartition, NoLock); + + pfree(newValues); + pfree(newNulls); } @@ -2154,13 +2157,13 @@ UpdatePlacementGroupId(uint64 placementId, int groupId) ScanKeyData scanKey[1]; int scanKeyCount = 1; bool indexOK = true; - Datum values[Natts_pg_dist_placement]; - bool isnull[Natts_pg_dist_placement]; - bool replace[Natts_pg_dist_placement]; bool colIsNull = false; Relation pgDistPlacement = table_open(DistPlacementRelationId(), RowExclusiveLock); TupleDesc tupleDescriptor = RelationGetDescr(pgDistPlacement); + Datum *values = (Datum *) 
palloc0(tupleDescriptor->natts * sizeof(Datum)); + bool *isnull = (bool *) palloc0(tupleDescriptor->natts * sizeof(bool)); + bool *replace = (bool *) palloc0(tupleDescriptor->natts * sizeof(bool)); ScanKeyInit(&scanKey[0], Anum_pg_dist_placement_placementid, BTEqualStrategyNumber, F_INT8EQ, Int64GetDatum(placementId)); @@ -2177,8 +2180,6 @@ UpdatePlacementGroupId(uint64 placementId, int groupId) placementId))); } - memset(replace, 0, sizeof(replace)); - values[Anum_pg_dist_placement_groupid - 1] = Int32GetDatum(groupId); isnull[Anum_pg_dist_placement_groupid - 1] = false; replace[Anum_pg_dist_placement_groupid - 1] = true; @@ -2197,6 +2198,10 @@ UpdatePlacementGroupId(uint64 placementId, int groupId) systable_endscan(scanDescriptor); table_close(pgDistPlacement, NoLock); + + pfree(values); + pfree(isnull); + pfree(replace); } @@ -2210,12 +2215,13 @@ UpdatePgDistPartitionAutoConverted(Oid citusTableId, bool autoConverted) ScanKeyData scanKey[1]; int scanKeyCount = 1; bool indexOK = true; - Datum values[Natts_pg_dist_partition]; - bool isnull[Natts_pg_dist_partition]; - bool replace[Natts_pg_dist_partition]; Relation pgDistPartition = table_open(DistPartitionRelationId(), RowExclusiveLock); TupleDesc tupleDescriptor = RelationGetDescr(pgDistPartition); + Datum *values = (Datum *) palloc0(tupleDescriptor->natts * sizeof(Datum)); + bool *isnull = (bool *) palloc0(tupleDescriptor->natts * sizeof(bool)); + bool *replace = (bool *) palloc0(tupleDescriptor->natts * sizeof(bool)); + ScanKeyInit(&scanKey[0], Anum_pg_dist_partition_logicalrelid, BTEqualStrategyNumber, F_OIDEQ, ObjectIdGetDatum(citusTableId)); @@ -2231,11 +2237,10 @@ UpdatePgDistPartitionAutoConverted(Oid citusTableId, bool autoConverted) citusTableId))); } - memset(replace, 0, sizeof(replace)); - - values[Anum_pg_dist_partition_autoconverted - 1] = BoolGetDatum(autoConverted); - isnull[Anum_pg_dist_partition_autoconverted - 1] = false; - replace[Anum_pg_dist_partition_autoconverted - 1] = true; + int autoconvertedindex = GetAutoConvertedAttrIndexInPgDistPartition(tupleDescriptor); + values[autoconvertedindex] = BoolGetDatum(autoConverted); + isnull[autoconvertedindex] = false; + replace[autoconvertedindex] = true; heapTuple = heap_modify_tuple(heapTuple, tupleDescriptor, values, isnull, replace); @@ -2247,6 +2252,10 @@ UpdatePgDistPartitionAutoConverted(Oid citusTableId, bool autoConverted) systable_endscan(scanDescriptor); table_close(pgDistPartition, NoLock); + + pfree(values); + pfree(isnull); + pfree(replace); } @@ -2286,12 +2295,13 @@ UpdateDistributionColumn(Oid relationId, char distributionMethod, Var *distribut ScanKeyData scanKey[1]; int scanKeyCount = 1; bool indexOK = true; - Datum values[Natts_pg_dist_partition]; - bool isnull[Natts_pg_dist_partition]; - bool replace[Natts_pg_dist_partition]; Relation pgDistPartition = table_open(DistPartitionRelationId(), RowExclusiveLock); TupleDesc tupleDescriptor = RelationGetDescr(pgDistPartition); + Datum *values = (Datum *) palloc0(tupleDescriptor->natts * sizeof(Datum)); + bool *isnull = (bool *) palloc0(tupleDescriptor->natts * sizeof(bool)); + bool *replace = (bool *) palloc0(tupleDescriptor->natts * sizeof(bool)); + ScanKeyInit(&scanKey[0], Anum_pg_dist_partition_logicalrelid, BTEqualStrategyNumber, F_OIDEQ, ObjectIdGetDatum(relationId)); @@ -2307,8 +2317,6 @@ UpdateDistributionColumn(Oid relationId, char distributionMethod, Var *distribut relationId))); } - memset(replace, 0, sizeof(replace)); - replace[Anum_pg_dist_partition_partmethod - 1] = true; 
values[Anum_pg_dist_partition_partmethod - 1] = CharGetDatum(distributionMethod); isnull[Anum_pg_dist_partition_partmethod - 1] = false; @@ -2317,9 +2325,10 @@ UpdateDistributionColumn(Oid relationId, char distributionMethod, Var *distribut values[Anum_pg_dist_partition_colocationid - 1] = UInt32GetDatum(colocationId); isnull[Anum_pg_dist_partition_colocationid - 1] = false; - replace[Anum_pg_dist_partition_autoconverted - 1] = true; - values[Anum_pg_dist_partition_autoconverted - 1] = BoolGetDatum(false); - isnull[Anum_pg_dist_partition_autoconverted - 1] = false; + int autoconvertedindex = GetAutoConvertedAttrIndexInPgDistPartition(tupleDescriptor); + replace[autoconvertedindex] = true; + values[autoconvertedindex] = BoolGetDatum(false); + isnull[autoconvertedindex] = false; char *distributionColumnString = nodeToString((Node *) distributionColumn); @@ -2337,6 +2346,10 @@ UpdateDistributionColumn(Oid relationId, char distributionMethod, Var *distribut systable_endscan(scanDescriptor); table_close(pgDistPartition, NoLock); + + pfree(values); + pfree(isnull); + pfree(replace); } @@ -2380,12 +2393,13 @@ UpdateNoneDistTableMetadata(Oid relationId, char replicationModel, uint32 coloca ScanKeyData scanKey[1]; int scanKeyCount = 1; bool indexOK = true; - Datum values[Natts_pg_dist_partition]; - bool isnull[Natts_pg_dist_partition]; - bool replace[Natts_pg_dist_partition]; Relation pgDistPartition = table_open(DistPartitionRelationId(), RowExclusiveLock); TupleDesc tupleDescriptor = RelationGetDescr(pgDistPartition); + Datum *values = (Datum *) palloc0(tupleDescriptor->natts * sizeof(Datum)); + bool *isnull = (bool *) palloc0(tupleDescriptor->natts * sizeof(bool)); + bool *replace = (bool *) palloc0(tupleDescriptor->natts * sizeof(bool)); + ScanKeyInit(&scanKey[0], Anum_pg_dist_partition_logicalrelid, BTEqualStrategyNumber, F_OIDEQ, ObjectIdGetDatum(relationId)); @@ -2401,8 +2415,6 @@ UpdateNoneDistTableMetadata(Oid relationId, char replicationModel, uint32 coloca relationId))); } - memset(replace, 0, sizeof(replace)); - values[Anum_pg_dist_partition_colocationid - 1] = UInt32GetDatum(colocationId); isnull[Anum_pg_dist_partition_colocationid - 1] = false; replace[Anum_pg_dist_partition_colocationid - 1] = true; @@ -2411,9 +2423,10 @@ UpdateNoneDistTableMetadata(Oid relationId, char replicationModel, uint32 coloca isnull[Anum_pg_dist_partition_repmodel - 1] = false; replace[Anum_pg_dist_partition_repmodel - 1] = true; - values[Anum_pg_dist_partition_autoconverted - 1] = BoolGetDatum(autoConverted); - isnull[Anum_pg_dist_partition_autoconverted - 1] = false; - replace[Anum_pg_dist_partition_autoconverted - 1] = true; + int autoconvertedindex = GetAutoConvertedAttrIndexInPgDistPartition(tupleDescriptor); + values[autoconvertedindex] = BoolGetDatum(autoConverted); + isnull[autoconvertedindex] = false; + replace[autoconvertedindex] = true; heapTuple = heap_modify_tuple(heapTuple, tupleDescriptor, values, isnull, replace); @@ -2424,6 +2437,10 @@ UpdateNoneDistTableMetadata(Oid relationId, char replicationModel, uint32 coloca systable_endscan(scanDescriptor); table_close(pgDistPartition, NoLock); + + pfree(values); + pfree(isnull); + pfree(replace); } @@ -3149,8 +3166,8 @@ ScheduleBackgroundTask(int64 jobId, Oid owner, char *command, int dependingTaskC values[Anum_pg_dist_background_task_nodes_involved - 1] = IntArrayToDatum(nodesInvolvedCount, nodesInvolved); - nulls[Anum_pg_dist_background_task_nodes_involved - 1] = (nodesInvolvedCount == - 0); + nulls[Anum_pg_dist_background_task_nodes_involved - 1] 
= + (nodesInvolvedCount == 0); HeapTuple newTuple = heap_form_tuple(RelationGetDescr(pgDistBackgroundTask), values, nulls); @@ -4420,3 +4437,23 @@ UnblockDependingBackgroundTasks(BackgroundTask *task) table_close(pgDistBackgroundTasksDepend, NoLock); } + + +/* + * GetAutoConvertedAttrIndexInPgDistPartition returns the zero-based attribute + * index of the autoconverted column in pg_dist_partition. + * + * The autoconverted column was added to pg_dist_partition via an ALTER TABLE + * after the version in which Citus started supporting downgrades, and it is the + * only column we have introduced to pg_dist_partition since then. + * + * After a downgrade + upgrade cycle, tupleDesc->natts becomes greater than + * Natts_pg_dist_partition; when this happens, the autoconverted attribute is no + * longer at Anum_pg_dist_partition_autoconverted but at tupleDesc->natts - 1. + */ +int +GetAutoConvertedAttrIndexInPgDistPartition(TupleDesc tupleDesc) +{ + return tupleDesc->natts == Natts_pg_dist_partition + ? (Anum_pg_dist_partition_autoconverted - 1) + : tupleDesc->natts - 1; +} diff --git a/src/backend/distributed/metadata/node_metadata.c b/src/backend/distributed/metadata/node_metadata.c index 6c7a98587..2412a88a2 100644 --- a/src/backend/distributed/metadata/node_metadata.c +++ b/src/backend/distributed/metadata/node_metadata.c @@ -2930,7 +2930,7 @@ InsertNodeRow(int nodeid, char *nodeName, int32 nodePort, NodeMetadata *nodeMeta TupleDesc tupleDescriptor = RelationGetDescr(pgDistNode); HeapTuple heapTuple = heap_form_tuple(tupleDescriptor, values, isNulls); - CatalogTupleInsert(pgDistNode, heapTuple); + CATALOG_INSERT_WITH_SNAPSHOT(pgDistNode, heapTuple); CitusInvalidateRelcacheByRelid(DistNodeRelationId()); diff --git a/src/backend/distributed/planner/multi_explain.c b/src/backend/distributed/planner/multi_explain.c index f357663a6..4d27939f7 100644 --- a/src/backend/distributed/planner/multi_explain.c +++ b/src/backend/distributed/planner/multi_explain.c @@ -26,6 +26,7 @@ #include "commands/tablecmds.h" #include "executor/tstoreReceiver.h" #include "lib/stringinfo.h" +#include "nodes/nodeFuncs.h" #include "nodes/plannodes.h" #include "nodes/primnodes.h" #include "nodes/print.h" @@ -73,6 +74,7 @@ #include "distributed/placement_connection.h" #include "distributed/recursive_planning.h" #include "distributed/remote_commands.h" +#include "distributed/subplan_execution.h" #include "distributed/tuple_destination.h" #include "distributed/tuplestore.h" #include "distributed/version_compat.h" @@ -83,6 +85,7 @@ bool ExplainDistributedQueries = true; bool ExplainAllTasks = false; int ExplainAnalyzeSortMethod = EXPLAIN_ANALYZE_SORT_BY_TIME; +extern MemoryContext SubPlanExplainAnalyzeContext; /* * If enabled, EXPLAIN ANALYZE output & other statistics of last worker task @@ -90,6 +93,11 @@ int ExplainAnalyzeSortMethod = EXPLAIN_ANALYZE_SORT_BY_TIME; */ static char *SavedExplainPlan = NULL; static double SavedExecutionDurationMillisec = 0.0; +static double SavedExplainPlanNtuples = 0; +static double SavedExplainPlanNloops = 0; +extern SubPlanExplainOutputData *SubPlanExplainOutput; +uint32 TotalExplainOutputCapacity = 0; +uint32 NumTasksOutput = 0; /* struct to save explain flags */ typedef struct { @@ -215,7 +223,8 @@ static const char * ExplainFormatStr(ExplainFormat format); #if PG_VERSION_NUM >= PG_VERSION_17 static const char * ExplainSerializeStr(ExplainSerializeOption serializeOption); #endif -static void ExplainWorkerPlan(PlannedStmt *plannedStmt, 
DistributedSubPlan *subPlan, + DestReceiver *dest, ExplainState *es, const char *queryString, ParamListInfo params, QueryEnvironment *queryEnv, @@ -224,7 +233,9 @@ static void ExplainWorkerPlan(PlannedStmt *plannedStmt, DestReceiver *dest, const BufferUsage *bufusage, const MemoryContextCounters *mem_counters, #endif - double *executionDurationMillisec); + double *executionDurationMillisec, + double *executionTuples, + double *executionLoops); static ExplainFormat ExtractFieldExplainFormat(Datum jsonbDoc, const char *fieldName, ExplainFormat defaultValue); #if PG_VERSION_NUM >= PG_VERSION_17 @@ -256,7 +267,8 @@ static double elapsed_time(instr_time *starttime); static void ExplainPropertyBytes(const char *qlabel, int64 bytes, ExplainState *es); static uint64 TaskReceivedTupleData(Task *task); static bool ShowReceivedTupleData(CitusScanState *scanState, ExplainState *es); - +static bool PlanStateAnalyzeWalker(PlanState *planState, void *ctx); +static void ExtractAnalyzeStats(DistributedSubPlan *subPlan, PlanState *planState); /* exports for SQL callable functions */ PG_FUNCTION_INFO_V1(worker_last_saved_explain_analyze); @@ -432,6 +444,84 @@ NonPushableMergeCommandExplainScan(CustomScanState *node, List *ancestors, } +/* + * ExtractAnalyzeStats parses the EXPLAIN ANALYZE output of the pre-executed + * subplans and injects the parsed statistics into queryDesc->planstate->instrument. + */ +static void +ExtractAnalyzeStats(DistributedSubPlan *subPlan, PlanState *planState) +{ + if (!planState || !planState->instrument) + { + /* nothing to fill in; also guards against plans without instrumentation */ + return; + } + + Instrumentation *instr = planState->instrument; + if (!IsA(planState, CustomScanState)) + { + instr->ntuples = subPlan->ntuples; + instr->nloops = 1; /* subplan nodes are executed only once */ + return; + } + + Assert(IsA(planState, CustomScanState)); + + if (subPlan->numTasksOutput == 0) + { + return; + } + + ListCell *lc; + int tasksOutput = 0; + double tasksNtuples = 0; + double tasksNloops = 0; + memset(instr, 0, sizeof(Instrumentation)); + DistributedPlan *newdistributedPlan = + ((CitusScanState *) planState)->distributedPlan; + + /* + * Inject the earlier execution results, extracted from the workers' + * EXPLAIN output, into the newly created tasks. + */ + foreach(lc, newdistributedPlan->workerJob->taskList) + { + Task *task = (Task *) lfirst(lc); + uint32 taskId = task->taskId; + + if (tasksOutput >= subPlan->numTasksOutput) + { + break; + } + + if (!subPlan->totalExplainOutput[taskId].explainOutput) + { + continue; + } + + /* + * Now feed the earlier saved output, which will be used + * by RemoteExplain() when printing tasks + */ + MemoryContext taskContext = GetMemoryChunkContext(task); + task->totalReceivedTupleData = + subPlan->totalExplainOutput[taskId].totalReceivedTupleData; + task->fetchedExplainAnalyzeExecutionDuration = + subPlan->totalExplainOutput[taskId].executionDuration; + task->fetchedExplainAnalyzePlan = + MemoryContextStrdup(taskContext, + subPlan->totalExplainOutput[taskId].explainOutput); + tasksNtuples += subPlan->totalExplainOutput[taskId].executionNtuples; + tasksNloops = subPlan->totalExplainOutput[taskId].executionNloops; + + subPlan->totalExplainOutput[taskId].explainOutput = NULL; + tasksOutput++; + } + + instr->ntuples = tasksNtuples; + instr->nloops = tasksNloops; +} + + /* * ExplainSubPlans generates EXPLAIN output for subplans for CTEs * and complex subqueries. 
Because the planning for these queries @@ -450,7 +540,6 @@ ExplainSubPlans(DistributedPlan *distributedPlan, ExplainState *es) { DistributedSubPlan *subPlan = (DistributedSubPlan *) lfirst(subPlanCell); PlannedStmt *plan = subPlan->plan; - IntoClause *into = NULL; ParamListInfo params = NULL; /* @@ -534,6 +623,11 @@ ExplainSubPlans(DistributedPlan *distributedPlan, ExplainState *es) ExplainOpenGroup("PlannedStmt", "PlannedStmt", false, es); + DestReceiver *dest = None_Receiver; /* No query execution */ + double executionDurationMillisec = 0.0; + double executionTuples = 0; + double executionLoops = 0; + /* Capture memory stats on PG17+ */ #if PG_VERSION_NUM >= PG_VERSION_17 if (es->memory) @@ -541,31 +635,21 @@ ExplainSubPlans(DistributedPlan *distributedPlan, ExplainState *es) MemoryContextSwitchTo(saved_ctx); MemoryContextMemConsumed(planner_ctx, &mem_counters); } -#endif -#if PG_VERSION_NUM >= PG_VERSION_17 - ExplainOnePlan( - plan, - into, - es, - queryString, - params, - NULL, /* QueryEnvironment *queryEnv */ - &planduration, - (es->buffers ? &bufusage : NULL), - (es->memory ? &mem_counters : NULL) - ); + /* Execute EXPLAIN without ANALYZE */ + ExplainWorkerPlan(plan, subPlan, dest, es, queryString, params, NULL, + &planduration, + (es->buffers ? &bufusage : NULL), + (es->memory ? &mem_counters : NULL), + &executionDurationMillisec, + &executionTuples, + &executionLoops); #else - ExplainOnePlan( - plan, - into, - es, - queryString, - params, - NULL, /* QueryEnvironment *queryEnv */ - &planduration, - (es->buffers ? &bufusage : NULL) - ); + + /* Execute EXPLAIN without ANALYZE */ + ExplainWorkerPlan(plan, subPlan, dest, es, queryString, params, NULL, + &planduration, &executionDurationMillisec, + &executionTuples, &executionLoops); #endif ExplainCloseGroup("PlannedStmt", "PlannedStmt", false, es); @@ -1236,17 +1320,19 @@ worker_last_saved_explain_analyze(PG_FUNCTION_ARGS) if (SavedExplainPlan != NULL) { int columnCount = tupleDescriptor->natts; - if (columnCount != 2) + if (columnCount != 4) { - ereport(ERROR, (errmsg("expected 3 output columns in definition of " + ereport(ERROR, (errmsg("expected 4 output columns in definition of " "worker_last_saved_explain_analyze, but got %d", columnCount))); } - bool columnNulls[2] = { false }; - Datum columnValues[2] = { + bool columnNulls[4] = { false }; + Datum columnValues[4] = { CStringGetTextDatum(SavedExplainPlan), - Float8GetDatum(SavedExecutionDurationMillisec) + Float8GetDatum(SavedExecutionDurationMillisec), + Float8GetDatum(SavedExplainPlanNtuples), + Float8GetDatum(SavedExplainPlanNloops) }; tuplestore_putvalues(tupleStore, tupleDescriptor, columnValues, columnNulls); @@ -1267,6 +1353,8 @@ worker_save_query_explain_analyze(PG_FUNCTION_ARGS) text *queryText = PG_GETARG_TEXT_P(0); char *queryString = text_to_cstring(queryText); double executionDurationMillisec = 0.0; + double executionTuples = 0; + double executionLoops = 0; Datum explainOptions = PG_GETARG_DATUM(1); ExplainState *es = NewExplainState(); @@ -1383,16 +1471,19 @@ worker_save_query_explain_analyze(PG_FUNCTION_ARGS) } /* do the actual EXPLAIN ANALYZE */ - ExplainWorkerPlan(plan, tupleStoreDest, es, queryString, boundParams, NULL, + ExplainWorkerPlan(plan, NULL, tupleStoreDest, es, queryString, boundParams, NULL, &planDuration, (es->buffers ? &bufusage : NULL), (es->memory ? 
&mem_counters : NULL), - &executionDurationMillisec); + &executionDurationMillisec, + &executionTuples, + &executionLoops); #else /* do the actual EXPLAIN ANALYZE */ - ExplainWorkerPlan(plan, tupleStoreDest, es, queryString, boundParams, NULL, - &planDuration, &executionDurationMillisec); + ExplainWorkerPlan(plan, NULL, tupleStoreDest, es, queryString, boundParams, NULL, + &planDuration, &executionDurationMillisec, + &executionTuples, &executionLoops); #endif ExplainEndOutput(es); @@ -1403,6 +1494,8 @@ SavedExplainPlan = pstrdup(es->str->data); SavedExecutionDurationMillisec = executionDurationMillisec; + SavedExplainPlanNtuples = executionTuples; + SavedExplainPlanNloops = executionLoops; MemoryContextSwitchTo(oldContext); @@ -1632,11 +1725,13 @@ CreateExplainAnlyzeDestination(Task *task, TupleDestination *taskDest) tupleDestination->originalTask = task; tupleDestination->originalTaskDestination = taskDest; - TupleDesc lastSavedExplainAnalyzeTupDesc = CreateTemplateTupleDesc(2); + TupleDesc lastSavedExplainAnalyzeTupDesc = CreateTemplateTupleDesc(4); TupleDescInitEntry(lastSavedExplainAnalyzeTupDesc, 1, "explain analyze", TEXTOID, 0, 0); TupleDescInitEntry(lastSavedExplainAnalyzeTupDesc, 2, "duration", FLOAT8OID, 0, 0); + TupleDescInitEntry(lastSavedExplainAnalyzeTupDesc, 3, "ntuples", FLOAT8OID, 0, 0); + TupleDescInitEntry(lastSavedExplainAnalyzeTupDesc, 4, "nloops", FLOAT8OID, 0, 0); tupleDestination->lastSavedExplainAnalyzeTupDesc = lastSavedExplainAnalyzeTupDesc; @@ -1647,6 +1742,51 @@ } +/* + * EnsureExplainOutputCapacity grows the SubPlanExplainOutput array, when + * needed, so that it can hold at least requiredSize entries. + */ +static void +EnsureExplainOutputCapacity(int requiredSize) +{ + if (requiredSize <= TotalExplainOutputCapacity) + { + return; + } + + int newCapacity = + (TotalExplainOutputCapacity == 0) ? 32 : TotalExplainOutputCapacity * 2; + + while (newCapacity < requiredSize) + { + newCapacity *= 2; + } + + if (SubPlanExplainOutput == NULL) + { + SubPlanExplainOutput = + (SubPlanExplainOutputData *) MemoryContextAllocZero( + SubPlanExplainAnalyzeContext, + newCapacity * + sizeof(SubPlanExplainOutputData)); + } + else + { + /* Use repalloc and manually zero the new memory */ + int oldSize = TotalExplainOutputCapacity * sizeof(SubPlanExplainOutputData); + int newSize = newCapacity * sizeof(SubPlanExplainOutputData); + + SubPlanExplainOutput = + (SubPlanExplainOutputData *) repalloc(SubPlanExplainOutput, newSize); + + /* Zero out the newly allocated memory */ + MemSet((char *) SubPlanExplainOutput + oldSize, 0, newSize - oldSize); + } + + TotalExplainOutputCapacity = newCapacity; +} + + /* * ExplainAnalyzeDestPutTuple implements TupleDestination->putTuple * for ExplainAnalyzeDestination. 
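A minimal standalone sketch of the grow-by-doubling strategy that EnsureExplainOutputCapacity above relies on, written with plain realloc/memset instead of the PostgreSQL memory-context allocators; the helper name EnsureCapacitySketch and its parameters are invented for this illustration and are not part of the patch:

#include <stdlib.h>
#include <string.h>

/* Grow *array so it can hold at least requiredSize elements of elemSize bytes. */
static int
EnsureCapacitySketch(void **array, int *capacity, int requiredSize, size_t elemSize)
{
	if (requiredSize <= *capacity)
	{
		return 0;               /* already large enough */
	}

	/* start at 32 entries and keep doubling until requiredSize fits */
	int newCapacity = (*capacity == 0) ? 32 : *capacity * 2;
	while (newCapacity < requiredSize)
	{
		newCapacity *= 2;
	}

	void *grown = realloc(*array, (size_t) newCapacity * elemSize);
	if (grown == NULL)
	{
		return -1;              /* out of memory; caller decides how to react */
	}

	/* zero only the newly added tail, mirroring the MemSet above */
	memset((char *) grown + (size_t) *capacity * elemSize, 0,
		   (size_t) (newCapacity - *capacity) * elemSize);

	*array = grown;
	*capacity = newCapacity;
	return 0;
}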
@@ -1656,6 +1796,8 @@ ExplainAnalyzeDestPutTuple(TupleDestination *self, Task *task, int placementIndex, int queryNumber, HeapTuple heapTuple, uint64 tupleLibpqSize) { + uint32 taskId = task->taskId; + ExplainAnalyzeDestination *tupleDestination = (ExplainAnalyzeDestination *) self; if (queryNumber == 0) { @@ -1663,6 +1805,13 @@ originalTupDest->putTuple(originalTupDest, task, placementIndex, 0, heapTuple, tupleLibpqSize); tupleDestination->originalTask->totalReceivedTupleData += tupleLibpqSize; + + if (SubPlanExplainAnalyzeContext) + { + EnsureExplainOutputCapacity(taskId + 1); + SubPlanExplainOutput[taskId].totalReceivedTupleData = + tupleDestination->originalTask->totalReceivedTupleData; + } } else if (queryNumber == 1) { @@ -1678,6 +1827,8 @@ } Datum executionDuration = heap_getattr(heapTuple, 2, tupDesc, &isNull); + + /* use a separate flag so isNull still refers to executionDuration below */ + bool isNullOther = false; + Datum executionTuples = heap_getattr(heapTuple, 3, tupDesc, &isNullOther); + Datum executionLoops = heap_getattr(heapTuple, 4, tupDesc, &isNullOther); if (isNull) { @@ -1687,6 +1838,8 @@ } char *fetchedExplainAnalyzePlan = TextDatumGetCString(explainAnalyze); double fetchedExplainAnalyzeExecutionDuration = DatumGetFloat8(executionDuration); + double fetchedExplainAnalyzeTuples = DatumGetFloat8(executionTuples); + double fetchedExplainAnalyzeLoops = DatumGetFloat8(executionLoops); /* * Allocate fetchedExplainAnalyzePlan in the same context as the Task, since we are @@ -1712,6 +1865,20 @@ placementIndex; tupleDestination->originalTask->fetchedExplainAnalyzeExecutionDuration = fetchedExplainAnalyzeExecutionDuration; + + /* as above, save the task's EXPLAIN ANALYZE output for the subplan */ + if (SubPlanExplainAnalyzeContext) + { + EnsureExplainOutputCapacity(taskId + 1); + SubPlanExplainOutput[taskId].explainOutput = + MemoryContextStrdup(SubPlanExplainAnalyzeContext, + fetchedExplainAnalyzePlan); + SubPlanExplainOutput[taskId].executionDuration = + fetchedExplainAnalyzeExecutionDuration; + SubPlanExplainOutput[taskId].executionNtuples = fetchedExplainAnalyzeTuples; + SubPlanExplainOutput[taskId].executionNloops = fetchedExplainAnalyzeLoops; + NumTasksOutput++; + } } else { @@ -1774,7 +1941,14 @@ ExplainAnalyzeDestTupleDescForQuery(TupleDestination *self, int queryNumber) bool RequestedForExplainAnalyze(CitusScanState *node) { - return (node->customScanState.ss.ps.state->es_instrument != 0); + /* + * When running a distributed plan, either the root plan or a subplan's + * distributed fragment, we need to know whether we are under EXPLAIN + * ANALYZE. Subplans cannot receive the EXPLAIN ANALYZE flag directly, so + * we use SubPlanExplainAnalyzeContext as a flag to indicate that context. + */ + return (node->customScanState.ss.ps.state->es_instrument != 0) || + (SubPlanLevel > 0 && SubPlanExplainAnalyzeContext != NULL); } @@ -1933,7 +2107,8 @@ FetchPlanQueryForExplainAnalyze(const char *queryString, ParamListInfo params) } appendStringInfoString(fetchQuery, - "SELECT explain_analyze_output, execution_duration " + "SELECT explain_analyze_output, execution_duration, " + "execution_ntuples, execution_nloops " "FROM worker_last_saved_explain_analyze()"); return fetchQuery->data; } @@ -2105,6 +2280,20 @@ ExplainOneQuery(Query *query, int cursorOptions, } +/* + * PlanStateAnalyzeWalker is a tree-walker callback that visits each PlanState node + * in the plan tree and extracts analyze statistics from CustomScanState tasks using + * ExtractAnalyzeStats. It always returns false so that all children are visited. + */ +static bool +PlanStateAnalyzeWalker(PlanState *planState, void *ctx) +{ + DistributedSubPlan *subplan = (DistributedSubPlan *) ctx; + ExtractAnalyzeStats(subplan, planState); + return false; +} + + /* * ExplainWorkerPlan produces explain output into es. If es->analyze, it also executes * the given plannedStmt and sends the results to dest. It puts total time to execute in * @@ -2119,20 +2308,25 @@ ExplainOneQuery(Query *query, int cursorOptions, * destination. */ static void -ExplainWorkerPlan(PlannedStmt *plannedstmt, DestReceiver *dest, ExplainState *es, +ExplainWorkerPlan(PlannedStmt *plannedstmt, DistributedSubPlan *subPlan, DestReceiver *dest, ExplainState *es, const char *queryString, ParamListInfo params, QueryEnvironment *queryEnv, const instr_time *planduration, #if PG_VERSION_NUM >= PG_VERSION_17 const BufferUsage *bufusage, const MemoryContextCounters *mem_counters, #endif - double *executionDurationMillisec) + double *executionDurationMillisec, + double *executionTuples, + double *executionLoops) { QueryDesc *queryDesc; instr_time starttime; double totaltime = 0; int eflags; int instrument_option = 0; + /* when a subplan is given, it was already executed; don't execute it again */ + bool executeQuery = (es->analyze && !subPlan); + bool executeSubplan = (es->analyze && subPlan != NULL); Assert(plannedstmt->commandType != CMD_UTILITY); @@ -2174,7 +2368,7 @@ ExplainWorkerPlan(PlannedStmt *plannedstmt, DestReceiver *dest, ExplainState *es ); /* Select execution options */ - if (es->analyze) + if (executeQuery) eflags = 0; /* default run-to-completion flags */ else eflags = EXEC_FLAG_EXPLAIN_ONLY; @@ -2183,7 +2377,7 @@ ExplainWorkerPlan(PlannedStmt *plannedstmt, DestReceiver *dest, ExplainState *es ExecutorStart(queryDesc, eflags); /* Execute the plan for statistics if asked for */ - if (es->analyze) + if (executeQuery) { ScanDirection dir = ForwardScanDirection; @@ -2206,6 +2400,12 @@ ExplainWorkerPlan(PlannedStmt *plannedstmt, DestReceiver *dest, ExplainState *es ExplainOpenGroup("Query", NULL, true, es); + if (executeSubplan) + { + ExtractAnalyzeStats(subPlan, queryDesc->planstate); + planstate_tree_walker(queryDesc->planstate, PlanStateAnalyzeWalker, (void *) subPlan); + } + /* Create textual dump of plan tree */ ExplainPrintPlan(es, queryDesc); @@ -2278,6 +2478,13 @@ ExplainWorkerPlan(PlannedStmt *plannedstmt, DestReceiver *dest, ExplainState *es */ INSTR_TIME_SET_CURRENT(starttime); + if (executeQuery) + { + Instrumentation *instr = queryDesc->planstate->instrument; + *executionTuples = instr->ntuples; + *executionLoops = instr->nloops; + } + ExecutorEnd(queryDesc); FreeQueryDesc(queryDesc); @@ -2285,7 +2492,7 @@ ExplainWorkerPlan(PlannedStmt *plannedstmt, DestReceiver *dest, ExplainState *es PopActiveSnapshot(); 
/* We need a CCI just in case query expanded to multiple plans */ - if (es->analyze) + if (executeQuery) CommandCounterIncrement(); totaltime += elapsed_time(&starttime); diff --git a/src/backend/distributed/sql/citus--13.1-1--13.2-1.sql b/src/backend/distributed/sql/citus--13.1-1--13.2-1.sql index 0373d3c40..2f507eb24 100644 --- a/src/backend/distributed/sql/citus--13.1-1--13.2-1.sql +++ b/src/backend/distributed/sql/citus--13.1-1--13.2-1.sql @@ -1,2 +1,3 @@ -- citus--13.1-1--13.2-1 -- bump version to 13.2-1 +#include "udfs/worker_last_saved_explain_analyze/13.2-1.sql" diff --git a/src/backend/distributed/sql/downgrades/citus--13.2-1--13.1-1.sql b/src/backend/distributed/sql/downgrades/citus--13.2-1--13.1-1.sql index 6f4ecd1ef..de26b790a 100644 --- a/src/backend/distributed/sql/downgrades/citus--13.2-1--13.1-1.sql +++ b/src/backend/distributed/sql/downgrades/citus--13.2-1--13.1-1.sql @@ -1,2 +1,5 @@ -- citus--13.2-1--13.1-1 -- downgrade version to 13.1-1 + +DROP FUNCTION IF EXISTS pg_catalog.worker_last_saved_explain_analyze(); +#include "../udfs/worker_last_saved_explain_analyze/9.4-1.sql" diff --git a/src/backend/distributed/sql/udfs/worker_last_saved_explain_analyze/13.2-1.sql b/src/backend/distributed/sql/udfs/worker_last_saved_explain_analyze/13.2-1.sql new file mode 100644 index 000000000..805dc83cc --- /dev/null +++ b/src/backend/distributed/sql/udfs/worker_last_saved_explain_analyze/13.2-1.sql @@ -0,0 +1,10 @@ + +DROP FUNCTION pg_catalog.worker_last_saved_explain_analyze(); + +CREATE OR REPLACE FUNCTION pg_catalog.worker_last_saved_explain_analyze() + RETURNS TABLE(explain_analyze_output TEXT, execution_duration DOUBLE PRECISION, + execution_ntuples DOUBLE PRECISION, execution_nloops DOUBLE PRECISION) + LANGUAGE C STRICT + AS 'citus'; +COMMENT ON FUNCTION pg_catalog.worker_last_saved_explain_analyze() IS + 'Returns the saved explain analyze output for the last run query'; diff --git a/src/backend/distributed/sql/udfs/worker_last_saved_explain_analyze/latest.sql b/src/backend/distributed/sql/udfs/worker_last_saved_explain_analyze/latest.sql index 17a5a15c5..805dc83cc 100644 --- a/src/backend/distributed/sql/udfs/worker_last_saved_explain_analyze/latest.sql +++ b/src/backend/distributed/sql/udfs/worker_last_saved_explain_analyze/latest.sql @@ -1,6 +1,9 @@ +DROP FUNCTION pg_catalog.worker_last_saved_explain_analyze(); + CREATE OR REPLACE FUNCTION pg_catalog.worker_last_saved_explain_analyze() - RETURNS TABLE(explain_analyze_output TEXT, execution_duration DOUBLE PRECISION) + RETURNS TABLE(explain_analyze_output TEXT, execution_duration DOUBLE PRECISION, + execution_ntuples DOUBLE PRECISION, execution_nloops DOUBLE PRECISION) LANGUAGE C STRICT AS 'citus'; COMMENT ON FUNCTION pg_catalog.worker_last_saved_explain_analyze() IS diff --git a/src/backend/distributed/transaction/transaction_recovery.c b/src/backend/distributed/transaction/transaction_recovery.c index fb5509def..a4ad3e094 100644 --- a/src/backend/distributed/transaction/transaction_recovery.c +++ b/src/backend/distributed/transaction/transaction_recovery.c @@ -106,7 +106,7 @@ LogTransactionRecord(int32 groupId, char *transactionName, FullTransactionId out TupleDesc tupleDescriptor = RelationGetDescr(pgDistTransaction); HeapTuple heapTuple = heap_form_tuple(tupleDescriptor, values, isNulls); - CatalogTupleInsert(pgDistTransaction, heapTuple); + CATALOG_INSERT_WITH_SNAPSHOT(pgDistTransaction, heapTuple); CommandCounterIncrement(); diff --git a/src/backend/distributed/utils/citus_copyfuncs.c 
b/src/backend/distributed/utils/citus_copyfuncs.c index 51716cff3..aca376df9 100644 --- a/src/backend/distributed/utils/citus_copyfuncs.c +++ b/src/backend/distributed/utils/citus_copyfuncs.c @@ -147,6 +147,31 @@ CopyNodeDistributedSubPlan(COPYFUNC_ARGS) COPY_SCALAR_FIELD(subPlanId); COPY_NODE_FIELD(plan); + COPY_SCALAR_FIELD(bytesSentPerWorker); + COPY_SCALAR_FIELD(remoteWorkerCount); + COPY_SCALAR_FIELD(durationMillisecs); + COPY_SCALAR_FIELD(writeLocalFile); + + /* + * Deep-copy the per-task EXPLAIN ANALYZE output array; the target array + * must be allocated before the field macros can write into its elements. + */ + if (from->totalExplainOutput != NULL && from->numTasksOutput > 0) + { + newnode->totalExplainOutput = (SubPlanExplainOutputData *) + palloc0(from->numTasksOutput * sizeof(SubPlanExplainOutputData)); + + /* copy each SubPlanExplainOutput element */ + for (int i = 0; i < from->numTasksOutput; i++) + { + /* copy the explainOutput string (COPY_STRING_FIELD handles NULL) */ + COPY_STRING_FIELD(totalExplainOutput[i].explainOutput); + + /* copy the collected per-task statistics */ + COPY_SCALAR_FIELD(totalExplainOutput[i].executionDuration); + COPY_SCALAR_FIELD(totalExplainOutput[i].executionNtuples); + COPY_SCALAR_FIELD(totalExplainOutput[i].executionNloops); + COPY_SCALAR_FIELD(totalExplainOutput[i].totalReceivedTupleData); + } + } + + COPY_SCALAR_FIELD(numTasksOutput); + COPY_SCALAR_FIELD(ntuples); } diff --git a/src/backend/distributed/utils/citus_outfuncs.c b/src/backend/distributed/utils/citus_outfuncs.c index 751063789..c19b0c3d4 100644 --- a/src/backend/distributed/utils/citus_outfuncs.c +++ b/src/backend/distributed/utils/citus_outfuncs.c @@ -215,6 +215,48 @@ OutDistributedSubPlan(OUTFUNC_ARGS) WRITE_UINT_FIELD(subPlanId); WRITE_NODE_FIELD(plan); + WRITE_UINT64_FIELD(bytesSentPerWorker); + WRITE_INT_FIELD(remoteWorkerCount); + WRITE_FLOAT_FIELD(durationMillisecs, "%.2f"); + WRITE_BOOL_FIELD(writeLocalFile); + + appendStringInfoString(str, " totalExplainOutput ["); + for (int i = 0; i < node->numTasksOutput; i++) + { + const SubPlanExplainOutputData *e = &node->totalExplainOutput[i]; + + /* skip empty slots */ + if (e->explainOutput == NULL && + e->executionDuration == 0 + && e->totalReceivedTupleData == 0) + { + continue; + } + + if (i > 0) + { + appendStringInfoChar(str, ' '); + } + + appendStringInfoChar(str, '('); + + /* string pointer - prints quoted or NULL */ + WRITE_STRING_FIELD(totalExplainOutput[i].explainOutput); + + /* double fields */ + WRITE_FLOAT_FIELD(totalExplainOutput[i].executionDuration, "%.2f"); + WRITE_FLOAT_FIELD(totalExplainOutput[i].executionNtuples, "%.2f"); + WRITE_FLOAT_FIELD(totalExplainOutput[i].executionNloops, "%.2f"); + + /* 64-bit unsigned - use the uint64 macro */ + WRITE_UINT64_FIELD(totalExplainOutput[i].totalReceivedTupleData); + + appendStringInfoChar(str, ')'); + } + + appendStringInfoChar(str, ']'); + + WRITE_INT_FIELD(numTasksOutput); + WRITE_FLOAT_FIELD(ntuples, "%.2f"); + } void diff --git a/src/backend/distributed/utils/colocation_utils.c b/src/backend/distributed/utils/colocation_utils.c index 5f031b2b5..af507d5b9 100644 --- a/src/backend/distributed/utils/colocation_utils.c +++ b/src/backend/distributed/utils/colocation_utils.c @@ -815,13 +815,14 @@ UpdateRelationColocationGroup(Oid distributedRelationId, uint32 colocationId, bool indexOK = true; int scanKeyCount = 1; ScanKeyData scanKey[1]; - Datum values[Natts_pg_dist_partition]; - bool isNull[Natts_pg_dist_partition]; - bool replace[Natts_pg_dist_partition]; Relation pgDistPartition = table_open(DistPartitionRelationId(), RowExclusiveLock); TupleDesc tupleDescriptor = RelationGetDescr(pgDistPartition); + Datum *values = (Datum *) palloc0(tupleDescriptor->natts * sizeof(Datum)); + bool *isNull = (bool *) palloc0(tupleDescriptor->natts * sizeof(bool)); + bool *replace = (bool *) palloc0(tupleDescriptor->natts * sizeof(bool)); + ScanKeyInit(&scanKey[0], Anum_pg_dist_partition_logicalrelid, BTEqualStrategyNumber, F_OIDEQ, 
ObjectIdGetDatum(distributedRelationId)); @@ -838,10 +839,6 @@ UpdateRelationColocationGroup(Oid distributedRelationId, uint32 colocationId, distributedRelationName))); } - memset(values, 0, sizeof(values)); - memset(isNull, false, sizeof(isNull)); - memset(replace, false, sizeof(replace)); - values[Anum_pg_dist_partition_colocationid - 1] = UInt32GetDatum(colocationId); isNull[Anum_pg_dist_partition_colocationid - 1] = false; replace[Anum_pg_dist_partition_colocationid - 1] = true; @@ -858,6 +855,10 @@ UpdateRelationColocationGroup(Oid distributedRelationId, uint32 colocationId, systable_endscan(scanDescriptor); table_close(pgDistPartition, NoLock); + pfree(values); + pfree(isNull); + pfree(replace); + bool shouldSyncMetadata = ShouldSyncTableMetadata(distributedRelationId); if (shouldSyncMetadata && !localOnly) { @@ -998,10 +999,12 @@ ColocationGroupTableList(uint32 colocationId, uint32 count) indexOK, NULL, scanKeyCount, scanKey); HeapTuple heapTuple = systable_getnext(scanDescriptor); + Datum *datumArray = (Datum *) palloc(tupleDescriptor->natts * sizeof(Datum)); + bool *isNullArray = (bool *) palloc(tupleDescriptor->natts * sizeof(bool)); while (HeapTupleIsValid(heapTuple)) { - bool isNullArray[Natts_pg_dist_partition]; - Datum datumArray[Natts_pg_dist_partition]; + memset(datumArray, 0, tupleDescriptor->natts * sizeof(Datum)); + memset(isNullArray, 0, tupleDescriptor->natts * sizeof(bool)); heap_deform_tuple(heapTuple, tupleDescriptor, datumArray, isNullArray); Oid colocatedTableId = DatumGetObjectId( datumArray[Anum_pg_dist_partition_logicalrelid - 1]); @@ -1020,6 +1023,8 @@ ColocationGroupTableList(uint32 colocationId, uint32 count) break; } } + pfree(datumArray); + pfree(isNullArray); systable_endscan(scanDescriptor); table_close(pgDistPartition, AccessShareLock); @@ -1192,10 +1197,12 @@ ColocatedTableId(int32 colocationId) indexOK, NULL, scanKeyCount, scanKey); HeapTuple heapTuple = systable_getnext(scanDescriptor); + Datum *datumArray = (Datum *) palloc(tupleDescriptor->natts * sizeof(Datum)); + bool *isNullArray = (bool *) palloc(tupleDescriptor->natts * sizeof(bool)); while (HeapTupleIsValid(heapTuple)) { - bool isNullArray[Natts_pg_dist_partition]; - Datum datumArray[Natts_pg_dist_partition]; + memset(datumArray, 0, tupleDescriptor->natts * sizeof(Datum)); + memset(isNullArray, 0, tupleDescriptor->natts * sizeof(bool)); heap_deform_tuple(heapTuple, tupleDescriptor, datumArray, isNullArray); colocatedTableId = DatumGetObjectId( datumArray[Anum_pg_dist_partition_logicalrelid - 1]); @@ -1223,6 +1230,8 @@ heapTuple = systable_getnext(scanDescriptor); } + pfree(datumArray); + pfree(isNullArray); systable_endscan(scanDescriptor); table_close(pgDistPartition, AccessShareLock); diff --git a/src/include/distributed/metadata_utility.h b/src/include/distributed/metadata_utility.h index 38c13eb51..a507138d2 100644 --- a/src/include/distributed/metadata_utility.h +++ b/src/include/distributed/metadata_utility.h @@ -466,4 +466,5 @@ extern bool IsBackgroundJobStatusTerminal(BackgroundJobStatus status); extern bool IsBackgroundTaskStatusTerminal(BackgroundTaskStatus status); extern Oid BackgroundJobStatusOid(BackgroundJobStatus status); extern Oid BackgroundTaskStatusOid(BackgroundTaskStatus status); +extern int GetAutoConvertedAttrIndexInPgDistPartition(TupleDesc tupleDesc); #endif /* METADATA_UTILITY_H */ diff --git a/src/include/distributed/multi_executor.h b/src/include/distributed/multi_executor.h index 6708d9a64..b0b0288de 100644 --- 
a/src/include/distributed/multi_executor.h +++ b/src/include/distributed/multi_executor.h @@ -146,8 +146,8 @@ extern void ExecuteQueryStringIntoDestReceiver(const char *queryString, ParamLis DestReceiver *dest); extern void ExecuteQueryIntoDestReceiver(Query *query, ParamListInfo params, DestReceiver *dest); -extern void ExecutePlanIntoDestReceiver(PlannedStmt *queryPlan, ParamListInfo params, - DestReceiver *dest); +extern uint64 ExecutePlanIntoDestReceiver(PlannedStmt *queryPlan, ParamListInfo params, + DestReceiver *dest); extern void SetLocalMultiShardModifyModeToSequential(void); extern void EnsureSequentialMode(ObjectType objType); extern void SetLocalForceMaxQueryParallelization(void); diff --git a/src/include/distributed/multi_physical_planner.h b/src/include/distributed/multi_physical_planner.h index e5ec2205d..1040b4149 100644 --- a/src/include/distributed/multi_physical_planner.h +++ b/src/include/distributed/multi_physical_planner.h @@ -490,6 +490,24 @@ typedef struct DistributedPlan } DistributedPlan; +/* + * SubPlanExplainOutputData holds the EXPLAIN ANALYZE output and collected + * statistics for a single task executed by a worker during distributed + * query execution. + * explainOutput - raw EXPLAIN ANALYZE output for the task + * executionDuration - wall-clock time taken to run the task + * executionNtuples - number of tuples the task produced + * executionNloops - number of times the task's plan was executed + * totalReceivedTupleData - total bytes of tuple data received from the worker + */ +typedef struct SubPlanExplainOutputData +{ + char *explainOutput; + double executionDuration; + double executionNtuples; + double executionNloops; + uint64 totalReceivedTupleData; +} SubPlanExplainOutputData; + + /* * DistributedSubPlan contains a subplan of a distributed plan. Subplans are * executed before the distributed query and their results are written to @@ -508,6 +526,9 @@ typedef struct DistributedSubPlan uint32 remoteWorkerCount; double durationMillisecs; bool writeLocalFile; + SubPlanExplainOutputData *totalExplainOutput; + uint32 numTasksOutput; /* number of populated entries in the above array */ + double ntuples; /* total tuples produced */ } DistributedSubPlan; diff --git a/src/include/distributed/subplan_execution.h b/src/include/distributed/subplan_execution.h index d68db43ce..045e77bc6 100644 --- a/src/include/distributed/subplan_execution.h +++ b/src/include/distributed/subplan_execution.h @@ -17,7 +17,7 @@ extern int MaxIntermediateResult; extern int SubPlanLevel; -extern void ExecuteSubPlans(DistributedPlan *distributedPlan); +extern void ExecuteSubPlans(DistributedPlan *distributedPlan, bool explainAnalyzeEnabled); /** * IntermediateResultsHashEntry is used to store which nodes need to receive diff --git a/src/include/pg_version_compat.h b/src/include/pg_version_compat.h index 385aecd38..997ad4b58 100644 --- a/src/include/pg_version_compat.h +++ b/src/include/pg_version_compat.h @@ -13,6 +13,10 @@ #include "pg_version_constants.h" +/* we need these for the PushActiveSnapshot/PopActiveSnapshot calls used by CATALOG_INSERT_WITH_SNAPSHOT on PG 18 */ +#include "access/xact.h" +#include "utils/snapmgr.h" + #if PG_VERSION_NUM >= PG_VERSION_18 #define create_foreignscan_path_compat(a, b, c, d, e, f, g, h, i, j, k) \ create_foreignscan_path( \ @@ -36,6 +40,14 @@ /* PG-18 unified row-compare operator codes under COMPARE_* */ #define ROWCOMPARE_NE COMPARE_NE +#define CATALOG_INSERT_WITH_SNAPSHOT(rel, tup) \ + do { \ + Snapshot __snap = GetTransactionSnapshot(); \ + PushActiveSnapshot(__snap); \ + CatalogTupleInsert((rel), (tup)); \ + PopActiveSnapshot(); \ + } while (0) + #elif PG_VERSION_NUM >= PG_VERSION_17 #define create_foreignscan_path_compat(a, 
b, c, d, e, f, g, h, i, j, k) \ create_foreignscan_path( \ @@ -43,6 +55,10 @@ (e), (f), \ (g), (h), (i), (j), (k) \ ) + +/* no-op wrapper on older PGs */ +#define CATALOG_INSERT_WITH_SNAPSHOT(rel, tup) \ + CatalogTupleInsert((rel), (tup)) #endif #if PG_VERSION_NUM >= PG_VERSION_17 @@ -453,6 +469,10 @@ getStxstattarget_compat(HeapTuple tup) k) create_foreignscan_path(a, b, c, d, e, f, g, h, \ i, k) +/* no-op wrapper on older PGs */ +#define CATALOG_INSERT_WITH_SNAPSHOT(rel, tup) \ + CatalogTupleInsert((rel), (tup)) + #define getProcNo_compat(a) (a->pgprocno) #define getLxid_compat(a) (a->lxid) diff --git a/src/test/regress/expected/multi_explain.out b/src/test/regress/expected/multi_explain.out index bfcf29c4d..49027b217 100644 --- a/src/test/regress/expected/multi_explain.out +++ b/src/test/regress/expected/multi_explain.out @@ -2492,15 +2492,15 @@ Custom Scan (Citus Adaptive) (actual rows=1 loops=1) -> Distributed Subplan XXX_1 Intermediate Data Size: 100 bytes Result destination: Write locally - -> Custom Scan (Citus Adaptive) (actual rows=20 loops=1) + -> Custom Scan (Citus Adaptive) (actual rows=10 loops=1) Task Count: 4 - Tuple data received from nodes: 160 bytes + Tuple data received from nodes: 80 bytes Tasks Shown: One of 4 -> Task - Tuple data received from node: 64 bytes + Tuple data received from node: 32 bytes Node: host=localhost port=xxxxx dbname=regression - -> Insert on dist_table_570017 citus_table_alias (actual rows=8 loops=1) - -> Seq Scan on dist_table_570017 dist_table (actual rows=8 loops=1) + -> Insert on dist_table_570017 citus_table_alias (actual rows=4 loops=1) + -> Seq Scan on dist_table_570017 dist_table (actual rows=4 loops=1) Filter: (a IS NOT NULL) -> Distributed Subplan XXX_2 Intermediate Data Size: 150 bytes @@ -3228,6 +3228,159 @@ Custom Scan (Citus Adaptive) (actual rows=0 loops=1) -> Update on tbl_570036 tbl (actual rows=0 loops=1) -> Seq Scan on tbl_570036 tbl (actual rows=0 loops=1) Filter: (a = 1) +-- EXPLAIN ANALYZE shouldn't execute SubPlans twice (bug #4212) +SET search_path TO multi_explain; +CREATE TABLE test_subplans (x int primary key, y int); +SELECT create_distributed_table('test_subplans','x'); + +EXPLAIN (COSTS off, ANALYZE on, TIMING off, SUMMARY off) +WITH a AS (INSERT INTO test_subplans VALUES (1,2) RETURNING *) +SELECT * FROM a; +Custom Scan (Citus Adaptive) (actual rows=1 loops=1) + -> Distributed Subplan XXX_1 + Intermediate Data Size: 18 bytes + Result destination: Write locally + -> Custom Scan (Citus Adaptive) (actual rows=1 loops=1) + Task Count: 1 + Tuple data received from nodes: 16 bytes + Tasks Shown: All + -> Task + Tuple data received from node: 16 bytes + Node: host=localhost port=xxxxx dbname=regression + -> Insert on test_subplans_570038 (actual rows=1 loops=1) + -> Result (actual rows=1 loops=1) + Task Count: 1 + Tuple data received from nodes: 8 bytes + Tasks Shown: All + -> Task + Tuple data received from node: 8 bytes + Node: host=localhost port=xxxxx dbname=regression + -> Function Scan on read_intermediate_result intermediate_result (actual rows=1 loops=1) +-- Only one row must exist +SELECT * FROM test_subplans; +1|2 +-- Will fail with duplicate pk +EXPLAIN (COSTS off, ANALYZE on, TIMING off, SUMMARY off) +WITH a AS (INSERT INTO test_subplans VALUES (1,2) RETURNING *) +SELECT * FROM a; +ERROR: duplicate key value violates unique constraint "test_subplans_pkey_570038" +DETAIL: Key (x)=(1) already exists. 
+CONTEXT: while executing command on localhost:xxxxx +-- Test JSON format +TRUNCATE test_subplans; +EXPLAIN (FORMAT JSON, COSTS off, ANALYZE on, TIMING off, SUMMARY off) +WITH a AS (INSERT INTO test_subplans VALUES (1,2) RETURNING *) +SELECT * FROM a; +[ + { + "Plan": { + "Node Type": "Custom Scan", + "Custom Plan Provider": "Citus Adaptive", + "Parallel Aware": false, + "Async Capable": false, + "Actual Rows": 1, + "Actual Loops": 1, + "Distributed Query": { + "Subplans": [ + { + "Intermediate Data Size": "18 bytes", + "Result destination": "Write locally", + "PlannedStmt": [ + { + "Plan": { + "Node Type": "Custom Scan", + "Custom Plan Provider": "Citus Adaptive", + "Parallel Aware": false, + "Async Capable": false, + "Actual Rows": 1, + "Actual Loops": 1, + "Distributed Query": { + "Job": { + "Task Count": 1, + "Tuple data received from nodes": "16 bytes", + "Tasks Shown": "All", + "Tasks": [ + { + "Tuple data received from node": "16 bytes", + "Node": "host=localhost port=xxxxx dbname=regression", + "Remote Plan": [ + [ + { + "Plan": { + "Node Type": "ModifyTable", + "Operation": "Insert", + "Parallel Aware": false, + "Async Capable": false, + "Relation Name": "test_subplans_570038", + "Alias": "test_subplans_570038", + "Actual Rows": 1, + "Actual Loops": 1, + "Plans": [ + { + "Node Type": "Result", + "Parent Relationship": "Outer", + "Parallel Aware": false, + "Async Capable": false, + "Actual Rows": 1, + "Actual Loops": 1 + } + ] + }, + "Triggers": [ + ] + } + ] + + ] + } + ] + } + } + }, + "Triggers": [ + ] + } + ] + } + ], + "Job": { + "Task Count": 1, + "Tuple data received from nodes": "8 bytes", + "Tasks Shown": "All", + "Tasks": [ + { + "Tuple data received from node": "8 bytes", + "Node": "host=localhost port=xxxxx dbname=regression", + "Remote Plan": [ + [ + { + "Plan": { + "Node Type": "Function Scan", + "Parallel Aware": false, + "Async Capable": false, + "Function Name": "read_intermediate_result", + "Alias": "intermediate_result", + "Actual Rows": 1, + "Actual Loops": 1 + }, + "Triggers": [ + ] + } + ] + + ] + } + ] + } + } + }, + "Triggers": [ + ] + } +] +-- Only one row must exist +SELECT * FROM test_subplans; +1|2 -- check when auto explain + analyze is enabled, we do not allow local execution. 
CREATE SCHEMA test_auto_explain; SET search_path TO 'test_auto_explain'; diff --git a/src/test/regress/expected/multi_explain_0.out b/src/test/regress/expected/multi_explain_0.out index 4d3acd14d..00a8309a9 100644 --- a/src/test/regress/expected/multi_explain_0.out +++ b/src/test/regress/expected/multi_explain_0.out @@ -2484,15 +2484,15 @@ Custom Scan (Citus Adaptive) (actual rows=1 loops=1) -> Distributed Subplan XXX_1 Intermediate Data Size: 100 bytes Result destination: Write locally - -> Custom Scan (Citus Adaptive) (actual rows=20 loops=1) + -> Custom Scan (Citus Adaptive) (actual rows=10 loops=1) Task Count: 4 - Tuple data received from nodes: 160 bytes + Tuple data received from nodes: 80 bytes Tasks Shown: One of 4 -> Task - Tuple data received from node: 64 bytes + Tuple data received from node: 32 bytes Node: host=localhost port=xxxxx dbname=regression - -> Insert on dist_table_570017 citus_table_alias (actual rows=8 loops=1) - -> Seq Scan on dist_table_570017 dist_table (actual rows=8 loops=1) + -> Insert on dist_table_570017 citus_table_alias (actual rows=4 loops=1) + -> Seq Scan on dist_table_570017 dist_table (actual rows=4 loops=1) Filter: (a IS NOT NULL) -> Distributed Subplan XXX_2 Intermediate Data Size: 150 bytes @@ -3217,6 +3217,159 @@ Custom Scan (Citus Adaptive) (actual rows=0 loops=1) -> Update on tbl_570036 tbl (actual rows=0 loops=1) -> Seq Scan on tbl_570036 tbl (actual rows=0 loops=1) Filter: (a = 1) +-- EXPLAIN ANALYZE shouldn't execute SubPlans twice (bug #4212) +SET search_path TO multi_explain; +CREATE TABLE test_subplans (x int primary key, y int); +SELECT create_distributed_table('test_subplans','x'); + +EXPLAIN (COSTS off, ANALYZE on, TIMING off, SUMMARY off) +WITH a AS (INSERT INTO test_subplans VALUES (1,2) RETURNING *) +SELECT * FROM a; +Custom Scan (Citus Adaptive) (actual rows=1 loops=1) + -> Distributed Subplan XXX_1 + Intermediate Data Size: 18 bytes + Result destination: Write locally + -> Custom Scan (Citus Adaptive) (actual rows=1 loops=1) + Task Count: 1 + Tuple data received from nodes: 16 bytes + Tasks Shown: All + -> Task + Tuple data received from node: 16 bytes + Node: host=localhost port=xxxxx dbname=regression + -> Insert on test_subplans_570038 (actual rows=1 loops=1) + -> Result (actual rows=1 loops=1) + Task Count: 1 + Tuple data received from nodes: 8 bytes + Tasks Shown: All + -> Task + Tuple data received from node: 8 bytes + Node: host=localhost port=xxxxx dbname=regression + -> Function Scan on read_intermediate_result intermediate_result (actual rows=1 loops=1) +-- Only one row must exist +SELECT * FROM test_subplans; +1|2 +-- Will fail with duplicate pk +EXPLAIN (COSTS off, ANALYZE on, TIMING off, SUMMARY off) +WITH a AS (INSERT INTO test_subplans VALUES (1,2) RETURNING *) +SELECT * FROM a; +ERROR: duplicate key value violates unique constraint "test_subplans_pkey_570038" +DETAIL: Key (x)=(1) already exists. 
+CONTEXT: while executing command on localhost:xxxxx +-- Test JSON format +TRUNCATE test_subplans; +EXPLAIN (FORMAT JSON, COSTS off, ANALYZE on, TIMING off, SUMMARY off) +WITH a AS (INSERT INTO test_subplans VALUES (1,2) RETURNING *) +SELECT * FROM a; +[ + { + "Plan": { + "Node Type": "Custom Scan", + "Custom Plan Provider": "Citus Adaptive", + "Parallel Aware": false, + "Async Capable": false, + "Actual Rows": 1, + "Actual Loops": 1, + "Distributed Query": { + "Subplans": [ + { + "Intermediate Data Size": "18 bytes", + "Result destination": "Write locally", + "PlannedStmt": [ + { + "Plan": { + "Node Type": "Custom Scan", + "Custom Plan Provider": "Citus Adaptive", + "Parallel Aware": false, + "Async Capable": false, + "Actual Rows": 1, + "Actual Loops": 1, + "Distributed Query": { + "Job": { + "Task Count": 1, + "Tuple data received from nodes": "16 bytes", + "Tasks Shown": "All", + "Tasks": [ + { + "Tuple data received from node": "16 bytes", + "Node": "host=localhost port=xxxxx dbname=regression", + "Remote Plan": [ + [ + { + "Plan": { + "Node Type": "ModifyTable", + "Operation": "Insert", + "Parallel Aware": false, + "Async Capable": false, + "Relation Name": "test_subplans_570038", + "Alias": "test_subplans_570038", + "Actual Rows": 1, + "Actual Loops": 1, + "Plans": [ + { + "Node Type": "Result", + "Parent Relationship": "Outer", + "Parallel Aware": false, + "Async Capable": false, + "Actual Rows": 1, + "Actual Loops": 1 + } + ] + }, + "Triggers": [ + ] + } + ] + + ] + } + ] + } + } + }, + "Triggers": [ + ] + } + ] + } + ], + "Job": { + "Task Count": 1, + "Tuple data received from nodes": "8 bytes", + "Tasks Shown": "All", + "Tasks": [ + { + "Tuple data received from node": "8 bytes", + "Node": "host=localhost port=xxxxx dbname=regression", + "Remote Plan": [ + [ + { + "Plan": { + "Node Type": "Function Scan", + "Parallel Aware": false, + "Async Capable": false, + "Function Name": "read_intermediate_result", + "Alias": "intermediate_result", + "Actual Rows": 1, + "Actual Loops": 1 + }, + "Triggers": [ + ] + } + ] + + ] + } + ] + } + } + }, + "Triggers": [ + ] + } +] +-- Only one row must exist +SELECT * FROM test_subplans; +1|2 -- check when auto explain + analyze is enabled, we do not allow local execution. 
CREATE SCHEMA test_auto_explain; SET search_path TO 'test_auto_explain'; diff --git a/src/test/regress/expected/multi_extension.out b/src/test/regress/expected/multi_extension.out index 4e8e927f4..defe41f0d 100644 --- a/src/test/regress/expected/multi_extension.out +++ b/src/test/regress/expected/multi_extension.out @@ -1503,7 +1503,9 @@ ALTER EXTENSION citus UPDATE TO '13.2-1'; SELECT * FROM multi_extension.print_extension_changes(); previous_object | current_object --------------------------------------------------------------------- -(0 rows) + function worker_last_saved_explain_analyze() TABLE(explain_analyze_output text, execution_duration double precision) | + | function worker_last_saved_explain_analyze() TABLE(explain_analyze_output text, execution_duration double precision, execution_ntuples double precision, execution_nloops double precision) +(2 rows) DROP TABLE multi_extension.prev_objects, multi_extension.extension_diff; -- show running version diff --git a/src/test/regress/expected/stat_counters.out b/src/test/regress/expected/stat_counters.out index a27eb3241..25327d4f7 100644 --- a/src/test/regress/expected/stat_counters.out +++ b/src/test/regress/expected/stat_counters.out @@ -721,13 +721,11 @@ CALL exec_query_and_check_query_counters($$ 0, 0 ); -- same with explain analyze --- --- this time, query_execution_multi_shard is incremented twice because of #4212 CALL exec_query_and_check_query_counters($$ EXPLAIN (ANALYZE) SELECT * FROM (SELECT * FROM dist_table OFFSET 0) q $$, - 1, 2 + 1, 1 ); CALL exec_query_and_check_query_counters($$ DELETE FROM dist_table WHERE a = 1 @@ -1041,9 +1039,6 @@ PL/pgSQL function exec_query_and_check_query_counters(text,bigint,bigint) line X -- A similar one but without the insert, so we would normally expect 2 increments -- for query_execution_single_shard and 2 for query_execution_multi_shard instead -- of 3 since the insert is not there anymore. --- --- But this time we observe more counter increments because we execute the subplans --- twice because of #4212. 
CALL exec_query_and_check_query_counters($$ EXPLAIN (ANALYZE) -- single-shard subplan (whole cte) @@ -1057,7 +1052,7 @@ CALL exec_query_and_check_query_counters($$ FROM (SELECT * FROM dist_table_1 ORDER BY a LIMIT 16) q -- multi-shard subplan (subquery q) JOIN cte ON q.a = cte.a $$, - 3, 4 + 2, 2 ); -- safe to push-down CALL exec_query_and_check_query_counters($$ diff --git a/src/test/regress/sql/multi_explain.sql b/src/test/regress/sql/multi_explain.sql index 65ca6f5da..c6502fec8 100644 --- a/src/test/regress/sql/multi_explain.sql +++ b/src/test/regress/sql/multi_explain.sql @@ -1166,6 +1166,32 @@ PREPARE q2(int_wrapper_type) AS WITH a AS (UPDATE tbl SET b = $1 WHERE a = 1 RET EXPLAIN (COSTS false) EXECUTE q2('(1)'); EXPLAIN :default_analyze_flags EXECUTE q2('(1)'); +-- EXPLAIN ANALYZE shouldn't execute SubPlans twice (bug #4212) +SET search_path TO multi_explain; +CREATE TABLE test_subplans (x int primary key, y int); +SELECT create_distributed_table('test_subplans','x'); + +EXPLAIN (COSTS off, ANALYZE on, TIMING off, SUMMARY off) +WITH a AS (INSERT INTO test_subplans VALUES (1,2) RETURNING *) +SELECT * FROM a; + +-- Only one row must exist +SELECT * FROM test_subplans; + +-- Will fail with duplicate pk +EXPLAIN (COSTS off, ANALYZE on, TIMING off, SUMMARY off) +WITH a AS (INSERT INTO test_subplans VALUES (1,2) RETURNING *) +SELECT * FROM a; + +-- Test JSON format +TRUNCATE test_subplans; +EXPLAIN (FORMAT JSON, COSTS off, ANALYZE on, TIMING off, SUMMARY off) +WITH a AS (INSERT INTO test_subplans VALUES (1,2) RETURNING *) +SELECT * FROM a; + +-- Only one row must exist +SELECT * FROM test_subplans; + -- check when auto explain + analyze is enabled, we do not allow local execution. CREATE SCHEMA test_auto_explain; SET search_path TO 'test_auto_explain'; diff --git a/src/test/regress/sql/stat_counters.sql b/src/test/regress/sql/stat_counters.sql index 3376ba6c7..18f4b8aac 100644 --- a/src/test/regress/sql/stat_counters.sql +++ b/src/test/regress/sql/stat_counters.sql @@ -476,13 +476,11 @@ CALL exec_query_and_check_query_counters($$ ); -- same with explain analyze --- --- this time, query_execution_multi_shard is incremented twice because of #4212 CALL exec_query_and_check_query_counters($$ EXPLAIN (ANALYZE) SELECT * FROM (SELECT * FROM dist_table OFFSET 0) q $$, - 1, 2 + 1, 1 ); CALL exec_query_and_check_query_counters($$ @@ -807,9 +805,6 @@ CALL exec_query_and_check_query_counters($$ -- A similar one but without the insert, so we would normally expect 2 increments -- for query_execution_single_shard and 2 for query_execution_multi_shard instead -- of 3 since the insert is not there anymore. --- --- But this time we observe more counter increments because we execute the subplans --- twice because of #4212. CALL exec_query_and_check_query_counters($$ EXPLAIN (ANALYZE) -- single-shard subplan (whole cte) @@ -823,7 +818,7 @@ CALL exec_query_and_check_query_counters($$ FROM (SELECT * FROM dist_table_1 ORDER BY a LIMIT 16) q -- multi-shard subplan (subquery q) JOIN cte ON q.a = cte.a $$, - 3, 4 + 2, 2 ); -- safe to push-down
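
Note on the new subplan-execution API: a minimal sketch of how the two-argument
ExecuteSubPlans() is driven and how the per-task statistics collected in
DistributedSubPlan could be read back afterwards. This is illustrative only, not
part of the patch: ExecuteAndReportSubPlans() is a hypothetical helper, and it
assumes the subPlanList field of DistributedPlan and the foreach_declared_ptr()
macro from the Citus tree.

static void
ExecuteAndReportSubPlans(CitusScanState *scanState, DistributedPlan *distributedPlan)
{
	/* run the subplans exactly once, passing down whether EXPLAIN ANALYZE is active */
	ExecuteSubPlans(distributedPlan, RequestedForExplainAnalyze(scanState));

	/* walk the per-task EXPLAIN ANALYZE output captured for each subplan */
	DistributedSubPlan *subPlan = NULL;
	foreach_declared_ptr(subPlan, distributedPlan->subPlanList)
	{
		for (uint32 taskIndex = 0; taskIndex < subPlan->numTasksOutput; taskIndex++)
		{
			SubPlanExplainOutputData *taskOutput =
				&subPlan->totalExplainOutput[taskIndex];

			elog(DEBUG1, "subplan task %u: rows=%.0f loops=%.0f, "
						 UINT64_FORMAT " bytes received",
				 taskIndex, taskOutput->executionNtuples,
				 taskOutput->executionNloops, taskOutput->totalReceivedTupleData);
		}
	}
}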
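
Likewise, a hedged sketch of a call site for the CATALOG_INSERT_WITH_SNAPSHOT()
compat macro added in pg_version_compat.h: on PG 18 it wraps CatalogTupleInsert()
in PushActiveSnapshot()/PopActiveSnapshot(), while on earlier versions it expands
to a bare CatalogTupleInsert(). The function and argument names below are
hypothetical, not taken from the patch.

static void
InsertCatalogTupleWithSnapshot(Relation catalogRelation, HeapTuple newTuple)
{
	/* snapshot push + CatalogTupleInsert() + pop on PG 18; plain insert otherwise */
	CATALOG_INSERT_WITH_SNAPSHOT(catalogRelation, newTuple);

	/* make the inserted tuple visible to later catalog lookups in this command */
	CommandCounterIncrement();
}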