Merge branch 'main' into ihalatci-dependency-updates

ihalatci-dependency-updates
ibrahim halatci 2025-08-06 15:11:55 +03:00 committed by GitHub
commit 80838b1413
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
30 changed files with 947 additions and 154 deletions

View File

@ -346,12 +346,12 @@ CdcIsReferenceTableViaCatalog(Oid relationId)
return false;
}
Datum datumArray[Natts_pg_dist_partition];
bool isNullArray[Natts_pg_dist_partition];
Relation pgDistPartition = table_open(DistPartitionRelationId(), AccessShareLock);
TupleDesc tupleDescriptor = RelationGetDescr(pgDistPartition);
Datum *datumArray = (Datum *) palloc(tupleDescriptor->natts * sizeof(Datum));
bool *isNullArray = (bool *) palloc(tupleDescriptor->natts * sizeof(bool));
heap_deform_tuple(partitionTuple, tupleDescriptor, datumArray, isNullArray);
if (isNullArray[Anum_pg_dist_partition_partmethod - 1] ||
@ -363,6 +363,8 @@ CdcIsReferenceTableViaCatalog(Oid relationId)
*/
heap_freetuple(partitionTuple);
table_close(pgDistPartition, NoLock);
pfree(datumArray);
pfree(isNullArray);
return false;
}
@ -374,6 +376,8 @@ CdcIsReferenceTableViaCatalog(Oid relationId)
heap_freetuple(partitionTuple);
table_close(pgDistPartition, NoLock);
pfree(datumArray);
pfree(isNullArray);
/*
* A table is a reference table when its partition method is 'none'

View File

@ -760,7 +760,7 @@ AdaptiveExecutorPreExecutorRun(CitusScanState *scanState)
*/
LockPartitionsForDistributedPlan(distributedPlan);
ExecuteSubPlans(distributedPlan);
ExecuteSubPlans(distributedPlan, RequestedForExplainAnalyze(scanState));
scanState->finishedPreScan = true;
}
@ -3804,7 +3804,7 @@ PopAssignedPlacementExecution(WorkerSession *session)
/*
* PopAssignedPlacementExecution finds an executable task from the queue of assigned tasks.
 * PopUnassignedPlacementExecution finds an executable task from the queue of unassigned tasks.
*/
static TaskPlacementExecution *
PopUnassignedPlacementExecution(WorkerPool *workerPool)

View File

@ -42,6 +42,7 @@
#include "distributed/merge_planner.h"
#include "distributed/metadata_cache.h"
#include "distributed/multi_executor.h"
#include "distributed/multi_explain.h"
#include "distributed/multi_partitioning_utils.h"
#include "distributed/multi_physical_planner.h"
#include "distributed/multi_router_planner.h"
@ -121,7 +122,7 @@ NonPushableInsertSelectExecScan(CustomScanState *node)
bool binaryFormat =
CanUseBinaryCopyFormatForTargetList(selectQuery->targetList);
ExecuteSubPlans(distSelectPlan);
ExecuteSubPlans(distSelectPlan, RequestedForExplainAnalyze(scanState));
/*
* We have a separate directory for each transaction, so choosing

View File

@ -23,6 +23,7 @@
#include "distributed/merge_executor.h"
#include "distributed/merge_planner.h"
#include "distributed/multi_executor.h"
#include "distributed/multi_explain.h"
#include "distributed/multi_partitioning_utils.h"
#include "distributed/multi_router_planner.h"
#include "distributed/repartition_executor.h"
@ -132,7 +133,7 @@ ExecuteSourceAtWorkerAndRepartition(CitusScanState *scanState)
ereport(DEBUG1, (errmsg("Executing subplans of the source query and "
"storing the results at the respective node(s)")));
ExecuteSubPlans(distSourcePlan);
ExecuteSubPlans(distSourcePlan, RequestedForExplainAnalyze(scanState));
/*
* We have a separate directory for each transaction, so choosing

View File

@ -688,7 +688,7 @@ ExecuteQueryIntoDestReceiver(Query *query, ParamListInfo params, DestReceiver *d
* ExecutePlanIntoDestReceiver executes a query plan and sends results to the given
* DestReceiver.
*/
void
uint64
ExecutePlanIntoDestReceiver(PlannedStmt *queryPlan, ParamListInfo params,
DestReceiver *dest)
{
@ -713,6 +713,8 @@ ExecutePlanIntoDestReceiver(PlannedStmt *queryPlan, ParamListInfo params,
PortalStart(portal, params, eflags, GetActiveSnapshot());
QueryCompletion qc = { 0 };
#if PG_VERSION_NUM >= PG_VERSION_18
/* PG 18+: six-arg signature (drop the run_once bool) */
@ -721,7 +723,7 @@ ExecutePlanIntoDestReceiver(PlannedStmt *queryPlan, ParamListInfo params,
false, /* isTopLevel */
dest, /* DestReceiver *dest */
dest, /* DestReceiver *altdest */
NULL); /* QueryCompletion *qc */
&qc); /* QueryCompletion *qc */
#else
/* PG 17-: original seven-arg signature */
@ -731,10 +733,12 @@ ExecutePlanIntoDestReceiver(PlannedStmt *queryPlan, ParamListInfo params,
true, /* run_once */
dest, /* DestReceiver *dest */
dest, /* DestReceiver *altdest */
NULL); /* QueryCompletion *qc */
&qc); /* QueryCompletion *qc */
#endif
PortalDrop(portal, false);
return qc.nprocessed;
}

View File

@ -30,13 +30,22 @@ int MaxIntermediateResult = 1048576; /* maximum size in KB the intermediate resu
/* when this is true, we enforce intermediate result size limit in all executors */
int SubPlanLevel = 0;
/*
* SubPlanExplainAnalyzeContext is both a memory context for storing
* subplans EXPLAIN ANALYZE output and a flag indicating that execution
* is running under EXPLAIN ANALYZE for subplans.
*/
MemoryContext SubPlanExplainAnalyzeContext = NULL;
SubPlanExplainOutputData *SubPlanExplainOutput;
extern uint8 TotalExplainOutputCapacity;
extern uint8 NumTasksOutput;
/*
* ExecuteSubPlans executes a list of subplans from a distributed plan
* by sequentially executing each plan from the top.
*/
void
ExecuteSubPlans(DistributedPlan *distributedPlan)
ExecuteSubPlans(DistributedPlan *distributedPlan, bool explainAnalyzeEnabled)
{
uint64 planId = distributedPlan->planId;
List *subPlanList = distributedPlan->subPlanList;
@ -47,6 +56,19 @@ ExecuteSubPlans(DistributedPlan *distributedPlan)
return;
}
/*
* If the root DistributedPlan has EXPLAIN ANALYZE enabled,
* its subplans should also have EXPLAIN ANALYZE enabled.
*/
if (explainAnalyzeEnabled)
{
SubPlanExplainAnalyzeContext = GetMemoryChunkContext(distributedPlan);
}
else
{
SubPlanExplainAnalyzeContext = NULL;
}
HTAB *intermediateResultsHash = MakeIntermediateResultHTAB();
RecordSubplanExecutionsOnNodes(intermediateResultsHash, distributedPlan);
@ -79,7 +101,23 @@ ExecuteSubPlans(DistributedPlan *distributedPlan)
TimestampTz startTimestamp = GetCurrentTimestamp();
ExecutePlanIntoDestReceiver(plannedStmt, params, copyDest);
uint64 nprocessed;
PG_TRY();
{
nprocessed =
ExecutePlanIntoDestReceiver(plannedStmt, params, copyDest);
}
PG_CATCH();
{
SubPlanExplainAnalyzeContext = NULL;
SubPlanExplainOutput = NULL;
TotalExplainOutputCapacity = 0;
NumTasksOutput = 0;
PG_RE_THROW();
}
PG_END_TRY();
/*
 * EXPLAIN ANALYZE instrumentations. Calculating these is very light-weight,
@ -94,10 +132,24 @@ ExecuteSubPlans(DistributedPlan *distributedPlan)
subPlan->durationMillisecs += durationMicrosecs * MICRO_TO_MILLI_SECOND;
subPlan->bytesSentPerWorker = RemoteFileDestReceiverBytesSent(copyDest);
subPlan->ntuples = nprocessed;
subPlan->remoteWorkerCount = list_length(remoteWorkerNodeList);
subPlan->writeLocalFile = entry->writeLocalFile;
SubPlanLevel--;
/*
* Save the EXPLAIN ANALYZE output(s) for later extraction in ExplainSubPlans().
 * Because the SubPlan context isn't available during distributed execution,
* pass the pointer as a global variable in SubPlanExplainOutput.
*/
subPlan->totalExplainOutput = SubPlanExplainOutput;
subPlan->numTasksOutput = NumTasksOutput;
SubPlanExplainOutput = NULL;
TotalExplainOutputCapacity = 0;
NumTasksOutput = 0;
FreeExecutorState(estate);
}
SubPlanExplainAnalyzeContext = NULL;
}

View File

@ -729,12 +729,13 @@ PartitionMethodViaCatalog(Oid relationId)
return DISTRIBUTE_BY_INVALID;
}
Datum datumArray[Natts_pg_dist_partition];
bool isNullArray[Natts_pg_dist_partition];
Relation pgDistPartition = table_open(DistPartitionRelationId(), AccessShareLock);
TupleDesc tupleDescriptor = RelationGetDescr(pgDistPartition);
Datum *datumArray = (Datum *) palloc(tupleDescriptor->natts * sizeof(Datum));
bool *isNullArray = (bool *) palloc(tupleDescriptor->natts * sizeof(bool));
heap_deform_tuple(partitionTuple, tupleDescriptor, datumArray, isNullArray);
if (isNullArray[Anum_pg_dist_partition_partmethod - 1])
@ -742,6 +743,8 @@ PartitionMethodViaCatalog(Oid relationId)
/* partition method cannot be NULL, still let's make sure */
heap_freetuple(partitionTuple);
table_close(pgDistPartition, NoLock);
pfree(datumArray);
pfree(isNullArray);
return DISTRIBUTE_BY_INVALID;
}
@ -750,6 +753,8 @@ PartitionMethodViaCatalog(Oid relationId)
heap_freetuple(partitionTuple);
table_close(pgDistPartition, NoLock);
pfree(datumArray);
pfree(isNullArray);
return partitionMethodChar;
}
@ -768,12 +773,12 @@ PartitionColumnViaCatalog(Oid relationId)
return NULL;
}
Datum datumArray[Natts_pg_dist_partition];
bool isNullArray[Natts_pg_dist_partition];
Relation pgDistPartition = table_open(DistPartitionRelationId(), AccessShareLock);
TupleDesc tupleDescriptor = RelationGetDescr(pgDistPartition);
Datum *datumArray = (Datum *) palloc(tupleDescriptor->natts * sizeof(Datum));
bool *isNullArray = (bool *) palloc(tupleDescriptor->natts * sizeof(bool));
heap_deform_tuple(partitionTuple, tupleDescriptor, datumArray, isNullArray);
if (isNullArray[Anum_pg_dist_partition_partkey - 1])
@ -781,6 +786,8 @@ PartitionColumnViaCatalog(Oid relationId)
/* partition key cannot be NULL, still let's make sure */
heap_freetuple(partitionTuple);
table_close(pgDistPartition, NoLock);
pfree(datumArray);
pfree(isNullArray);
return NULL;
}
@ -795,6 +802,8 @@ PartitionColumnViaCatalog(Oid relationId)
heap_freetuple(partitionTuple);
table_close(pgDistPartition, NoLock);
pfree(datumArray);
pfree(isNullArray);
return partitionColumn;
}
@ -813,12 +822,13 @@ ColocationIdViaCatalog(Oid relationId)
return INVALID_COLOCATION_ID;
}
Datum datumArray[Natts_pg_dist_partition];
bool isNullArray[Natts_pg_dist_partition];
Relation pgDistPartition = table_open(DistPartitionRelationId(), AccessShareLock);
TupleDesc tupleDescriptor = RelationGetDescr(pgDistPartition);
Datum *datumArray = (Datum *) palloc(tupleDescriptor->natts * sizeof(Datum));
bool *isNullArray = (bool *) palloc(tupleDescriptor->natts * sizeof(bool));
heap_deform_tuple(partitionTuple, tupleDescriptor, datumArray, isNullArray);
if (isNullArray[Anum_pg_dist_partition_colocationid - 1])
@ -826,6 +836,8 @@ ColocationIdViaCatalog(Oid relationId)
/* colocation id cannot be NULL, still let's make sure */
heap_freetuple(partitionTuple);
table_close(pgDistPartition, NoLock);
pfree(datumArray);
pfree(isNullArray);
return INVALID_COLOCATION_ID;
}
@ -834,6 +846,8 @@ ColocationIdViaCatalog(Oid relationId)
heap_freetuple(partitionTuple);
table_close(pgDistPartition, NoLock);
pfree(datumArray);
pfree(isNullArray);
return colocationId;
}
@ -1741,10 +1755,11 @@ BuildCitusTableCacheEntry(Oid relationId)
}
MemoryContext oldContext = NULL;
Datum datumArray[Natts_pg_dist_partition];
bool isNullArray[Natts_pg_dist_partition];
TupleDesc tupleDescriptor = RelationGetDescr(pgDistPartition);
Datum *datumArray = (Datum *) palloc(tupleDescriptor->natts * sizeof(Datum));
bool *isNullArray = (bool *) palloc(tupleDescriptor->natts * sizeof(bool));
heap_deform_tuple(distPartitionTuple, tupleDescriptor, datumArray, isNullArray);
CitusTableCacheEntry *cacheEntry =
@ -1797,7 +1812,7 @@ BuildCitusTableCacheEntry(Oid relationId)
cacheEntry->replicationModel = DatumGetChar(replicationModelDatum);
}
if (isNullArray[Anum_pg_dist_partition_autoconverted - 1])
if (isNullArray[GetAutoConvertedAttrIndexInPgDistPartition(tupleDescriptor)])
{
/*
* We don't expect this to happen, but set it to false (the default value)
@ -1808,7 +1823,7 @@ BuildCitusTableCacheEntry(Oid relationId)
else
{
cacheEntry->autoConverted = DatumGetBool(
datumArray[Anum_pg_dist_partition_autoconverted - 1]);
datumArray[GetAutoConvertedAttrIndexInPgDistPartition(tupleDescriptor)]);
}
heap_freetuple(distPartitionTuple);
@ -1852,6 +1867,9 @@ BuildCitusTableCacheEntry(Oid relationId)
table_close(pgDistPartition, NoLock);
pfree(datumArray);
pfree(isNullArray);
cacheEntry->isValid = true;
return cacheEntry;
@ -5011,10 +5029,13 @@ CitusTableTypeIdList(CitusTableType citusTableType)
TupleDesc tupleDescriptor = RelationGetDescr(pgDistPartition);
HeapTuple heapTuple = systable_getnext(scanDescriptor);
Datum *datumArray = (Datum *) palloc(tupleDescriptor->natts * sizeof(Datum));
bool *isNullArray = (bool *) palloc(tupleDescriptor->natts * sizeof(bool));
while (HeapTupleIsValid(heapTuple))
{
bool isNullArray[Natts_pg_dist_partition];
Datum datumArray[Natts_pg_dist_partition];
memset(datumArray, 0, tupleDescriptor->natts * sizeof(Datum));
memset(isNullArray, 0, tupleDescriptor->natts * sizeof(bool));
heap_deform_tuple(heapTuple, tupleDescriptor, datumArray, isNullArray);
Datum partMethodDatum = datumArray[Anum_pg_dist_partition_partmethod - 1];
@ -5038,6 +5059,9 @@ CitusTableTypeIdList(CitusTableType citusTableType)
heapTuple = systable_getnext(scanDescriptor);
}
pfree(datumArray);
pfree(isNullArray);
systable_endscan(scanDescriptor);
table_close(pgDistPartition, AccessShareLock);

View File

@ -573,13 +573,17 @@ FetchRelationIdFromPgPartitionHeapTuple(HeapTuple heapTuple, TupleDesc tupleDesc
{
Assert(heapTuple->t_tableOid == DistPartitionRelationId());
bool isNullArray[Natts_pg_dist_partition];
Datum datumArray[Natts_pg_dist_partition];
Datum *datumArray = (Datum *) palloc(tupleDesc->natts * sizeof(Datum));
bool *isNullArray = (bool *) palloc(tupleDesc->natts * sizeof(bool));
heap_deform_tuple(heapTuple, tupleDesc, datumArray, isNullArray);
Datum relationIdDatum = datumArray[Anum_pg_dist_partition_logicalrelid - 1];
Oid relationId = DatumGetObjectId(relationIdDatum);
pfree(datumArray);
pfree(isNullArray);
return relationId;
}

View File

@ -812,6 +812,7 @@ GenerateSizeQueryOnMultiplePlacements(List *shardIntervalList,
{
partitionedShardNames = lappend(partitionedShardNames, quotedShardName);
}
/* for non-partitioned tables, we will use Postgres' size functions */
else
{
@ -1919,23 +1920,22 @@ InsertIntoPgDistPartition(Oid relationId, char distributionMethod,
{
char *distributionColumnString = NULL;
Datum newValues[Natts_pg_dist_partition];
bool newNulls[Natts_pg_dist_partition];
/* open system catalog and insert new tuple */
Relation pgDistPartition = table_open(DistPartitionRelationId(), RowExclusiveLock);
TupleDesc tupleDescriptor = RelationGetDescr(pgDistPartition);
Datum *newValues = (Datum *) palloc0(tupleDescriptor->natts * sizeof(Datum));
bool *newNulls = (bool *) palloc0(tupleDescriptor->natts * sizeof(bool));
/* form new tuple for pg_dist_partition */
memset(newValues, 0, sizeof(newValues));
memset(newNulls, false, sizeof(newNulls));
newValues[Anum_pg_dist_partition_logicalrelid - 1] =
ObjectIdGetDatum(relationId);
newValues[Anum_pg_dist_partition_partmethod - 1] =
CharGetDatum(distributionMethod);
newValues[Anum_pg_dist_partition_colocationid - 1] = UInt32GetDatum(colocationId);
newValues[Anum_pg_dist_partition_repmodel - 1] = CharGetDatum(replicationModel);
newValues[Anum_pg_dist_partition_autoconverted - 1] = BoolGetDatum(autoConverted);
newValues[GetAutoConvertedAttrIndexInPgDistPartition(tupleDescriptor)] =
BoolGetDatum(autoConverted);
/* set partkey column to NULL for reference tables */
if (distributionMethod != DISTRIBUTE_BY_NONE)
@ -1951,7 +1951,7 @@ InsertIntoPgDistPartition(Oid relationId, char distributionMethod,
newNulls[Anum_pg_dist_partition_partkey - 1] = true;
}
HeapTuple newTuple = heap_form_tuple(RelationGetDescr(pgDistPartition), newValues,
HeapTuple newTuple = heap_form_tuple(tupleDescriptor, newValues,
newNulls);
/* finally insert tuple, build index entries & register cache invalidation */
@ -1963,6 +1963,9 @@ InsertIntoPgDistPartition(Oid relationId, char distributionMethod,
CommandCounterIncrement();
table_close(pgDistPartition, NoLock);
pfree(newValues);
pfree(newNulls);
}
@ -2154,13 +2157,13 @@ UpdatePlacementGroupId(uint64 placementId, int groupId)
ScanKeyData scanKey[1];
int scanKeyCount = 1;
bool indexOK = true;
Datum values[Natts_pg_dist_placement];
bool isnull[Natts_pg_dist_placement];
bool replace[Natts_pg_dist_placement];
bool colIsNull = false;
Relation pgDistPlacement = table_open(DistPlacementRelationId(), RowExclusiveLock);
TupleDesc tupleDescriptor = RelationGetDescr(pgDistPlacement);
Datum *values = (Datum *) palloc0(tupleDescriptor->natts * sizeof(Datum));
bool *isnull = (bool *) palloc0(tupleDescriptor->natts * sizeof(bool));
bool *replace = (bool *) palloc0(tupleDescriptor->natts * sizeof(bool));
ScanKeyInit(&scanKey[0], Anum_pg_dist_placement_placementid,
BTEqualStrategyNumber, F_INT8EQ, Int64GetDatum(placementId));
@ -2177,8 +2180,6 @@ UpdatePlacementGroupId(uint64 placementId, int groupId)
placementId)));
}
memset(replace, 0, sizeof(replace));
values[Anum_pg_dist_placement_groupid - 1] = Int32GetDatum(groupId);
isnull[Anum_pg_dist_placement_groupid - 1] = false;
replace[Anum_pg_dist_placement_groupid - 1] = true;
@ -2197,6 +2198,10 @@ UpdatePlacementGroupId(uint64 placementId, int groupId)
systable_endscan(scanDescriptor);
table_close(pgDistPlacement, NoLock);
pfree(values);
pfree(isnull);
pfree(replace);
}
@ -2210,12 +2215,13 @@ UpdatePgDistPartitionAutoConverted(Oid citusTableId, bool autoConverted)
ScanKeyData scanKey[1];
int scanKeyCount = 1;
bool indexOK = true;
Datum values[Natts_pg_dist_partition];
bool isnull[Natts_pg_dist_partition];
bool replace[Natts_pg_dist_partition];
Relation pgDistPartition = table_open(DistPartitionRelationId(), RowExclusiveLock);
TupleDesc tupleDescriptor = RelationGetDescr(pgDistPartition);
Datum *values = (Datum *) palloc0(tupleDescriptor->natts * sizeof(Datum));
bool *isnull = (bool *) palloc0(tupleDescriptor->natts * sizeof(bool));
bool *replace = (bool *) palloc0(tupleDescriptor->natts * sizeof(bool));
ScanKeyInit(&scanKey[0], Anum_pg_dist_partition_logicalrelid,
BTEqualStrategyNumber, F_OIDEQ, ObjectIdGetDatum(citusTableId));
@ -2231,11 +2237,10 @@ UpdatePgDistPartitionAutoConverted(Oid citusTableId, bool autoConverted)
citusTableId)));
}
memset(replace, 0, sizeof(replace));
values[Anum_pg_dist_partition_autoconverted - 1] = BoolGetDatum(autoConverted);
isnull[Anum_pg_dist_partition_autoconverted - 1] = false;
replace[Anum_pg_dist_partition_autoconverted - 1] = true;
int autoconvertedindex = GetAutoConvertedAttrIndexInPgDistPartition(tupleDescriptor);
values[autoconvertedindex] = BoolGetDatum(autoConverted);
isnull[autoconvertedindex] = false;
replace[autoconvertedindex] = true;
heapTuple = heap_modify_tuple(heapTuple, tupleDescriptor, values, isnull, replace);
@ -2247,6 +2252,10 @@ UpdatePgDistPartitionAutoConverted(Oid citusTableId, bool autoConverted)
systable_endscan(scanDescriptor);
table_close(pgDistPartition, NoLock);
pfree(values);
pfree(isnull);
pfree(replace);
}
@ -2286,12 +2295,13 @@ UpdateDistributionColumn(Oid relationId, char distributionMethod, Var *distribut
ScanKeyData scanKey[1];
int scanKeyCount = 1;
bool indexOK = true;
Datum values[Natts_pg_dist_partition];
bool isnull[Natts_pg_dist_partition];
bool replace[Natts_pg_dist_partition];
Relation pgDistPartition = table_open(DistPartitionRelationId(), RowExclusiveLock);
TupleDesc tupleDescriptor = RelationGetDescr(pgDistPartition);
Datum *values = (Datum *) palloc0(tupleDescriptor->natts * sizeof(Datum));
bool *isnull = (bool *) palloc0(tupleDescriptor->natts * sizeof(bool));
bool *replace = (bool *) palloc0(tupleDescriptor->natts * sizeof(bool));
ScanKeyInit(&scanKey[0], Anum_pg_dist_partition_logicalrelid,
BTEqualStrategyNumber, F_OIDEQ, ObjectIdGetDatum(relationId));
@ -2307,8 +2317,6 @@ UpdateDistributionColumn(Oid relationId, char distributionMethod, Var *distribut
relationId)));
}
memset(replace, 0, sizeof(replace));
replace[Anum_pg_dist_partition_partmethod - 1] = true;
values[Anum_pg_dist_partition_partmethod - 1] = CharGetDatum(distributionMethod);
isnull[Anum_pg_dist_partition_partmethod - 1] = false;
@ -2317,9 +2325,10 @@ UpdateDistributionColumn(Oid relationId, char distributionMethod, Var *distribut
values[Anum_pg_dist_partition_colocationid - 1] = UInt32GetDatum(colocationId);
isnull[Anum_pg_dist_partition_colocationid - 1] = false;
replace[Anum_pg_dist_partition_autoconverted - 1] = true;
values[Anum_pg_dist_partition_autoconverted - 1] = BoolGetDatum(false);
isnull[Anum_pg_dist_partition_autoconverted - 1] = false;
int autoconvertedindex = GetAutoConvertedAttrIndexInPgDistPartition(tupleDescriptor);
replace[autoconvertedindex] = true;
values[autoconvertedindex] = BoolGetDatum(false);
isnull[autoconvertedindex] = false;
char *distributionColumnString = nodeToString((Node *) distributionColumn);
@ -2337,6 +2346,10 @@ UpdateDistributionColumn(Oid relationId, char distributionMethod, Var *distribut
systable_endscan(scanDescriptor);
table_close(pgDistPartition, NoLock);
pfree(values);
pfree(isnull);
pfree(replace);
}
@ -2380,12 +2393,13 @@ UpdateNoneDistTableMetadata(Oid relationId, char replicationModel, uint32 coloca
ScanKeyData scanKey[1];
int scanKeyCount = 1;
bool indexOK = true;
Datum values[Natts_pg_dist_partition];
bool isnull[Natts_pg_dist_partition];
bool replace[Natts_pg_dist_partition];
Relation pgDistPartition = table_open(DistPartitionRelationId(), RowExclusiveLock);
TupleDesc tupleDescriptor = RelationGetDescr(pgDistPartition);
Datum *values = (Datum *) palloc0(tupleDescriptor->natts * sizeof(Datum));
bool *isnull = (bool *) palloc0(tupleDescriptor->natts * sizeof(bool));
bool *replace = (bool *) palloc0(tupleDescriptor->natts * sizeof(bool));
ScanKeyInit(&scanKey[0], Anum_pg_dist_partition_logicalrelid,
BTEqualStrategyNumber, F_OIDEQ, ObjectIdGetDatum(relationId));
@ -2401,8 +2415,6 @@ UpdateNoneDistTableMetadata(Oid relationId, char replicationModel, uint32 coloca
relationId)));
}
memset(replace, 0, sizeof(replace));
values[Anum_pg_dist_partition_colocationid - 1] = UInt32GetDatum(colocationId);
isnull[Anum_pg_dist_partition_colocationid - 1] = false;
replace[Anum_pg_dist_partition_colocationid - 1] = true;
@ -2411,9 +2423,10 @@ UpdateNoneDistTableMetadata(Oid relationId, char replicationModel, uint32 coloca
isnull[Anum_pg_dist_partition_repmodel - 1] = false;
replace[Anum_pg_dist_partition_repmodel - 1] = true;
values[Anum_pg_dist_partition_autoconverted - 1] = BoolGetDatum(autoConverted);
isnull[Anum_pg_dist_partition_autoconverted - 1] = false;
replace[Anum_pg_dist_partition_autoconverted - 1] = true;
int autoconvertedindex = GetAutoConvertedAttrIndexInPgDistPartition(tupleDescriptor);
values[autoconvertedindex] = BoolGetDatum(autoConverted);
isnull[autoconvertedindex] = false;
replace[autoconvertedindex] = true;
heapTuple = heap_modify_tuple(heapTuple, tupleDescriptor, values, isnull, replace);
@ -2424,6 +2437,10 @@ UpdateNoneDistTableMetadata(Oid relationId, char replicationModel, uint32 coloca
systable_endscan(scanDescriptor);
table_close(pgDistPartition, NoLock);
pfree(values);
pfree(isnull);
pfree(replace);
}
@ -3149,8 +3166,8 @@ ScheduleBackgroundTask(int64 jobId, Oid owner, char *command, int dependingTaskC
values[Anum_pg_dist_background_task_nodes_involved - 1] =
IntArrayToDatum(nodesInvolvedCount, nodesInvolved);
nulls[Anum_pg_dist_background_task_nodes_involved - 1] = (nodesInvolvedCount ==
0);
nulls[Anum_pg_dist_background_task_nodes_involved - 1] =
(nodesInvolvedCount == 0);
HeapTuple newTuple = heap_form_tuple(RelationGetDescr(pgDistBackgroundTask),
values, nulls);
@ -4420,3 +4437,23 @@ UnblockDependingBackgroundTasks(BackgroundTask *task)
table_close(pgDistBackgroundTasksDepend, NoLock);
}
/*
 * GetAutoConvertedAttrIndexInPgDistPartition returns the 0-based index of the
 * autoconverted attribute, suitable for indexing the values / nulls / replace
 * arrays that are built for a pg_dist_partition tuple. Note that this is an
 * array index, not a 1-based attribute number.
 *
 * autoconverted attr was added to table pg_dist_partition using alter operation
 * after the version where Citus started supporting downgrades, and it's the only
 * column that we've introduced to pg_dist_partition since then.
 *
 * In case of a downgrade + upgrade, tupleDesc->natts becomes greater than
 * Natts_pg_dist_partition, and when this happens we know that the index of
 * autoconverted is not Anum_pg_dist_partition_autoconverted - 1 anymore but
 * tupleDesc->natts - 1.
 *
 * The attribute count is read from tupleDesc->natts on purpose: TupleDescSize()
 * yields the size of the descriptor in bytes, so it must not be compared
 * against Natts_pg_dist_partition.
 */
int
GetAutoConvertedAttrIndexInPgDistPartition(TupleDesc tupleDesc)
{
	return tupleDesc->natts == Natts_pg_dist_partition
		   ? (Anum_pg_dist_partition_autoconverted - 1)
		   : tupleDesc->natts - 1;
}

View File

@ -2930,7 +2930,7 @@ InsertNodeRow(int nodeid, char *nodeName, int32 nodePort, NodeMetadata *nodeMeta
TupleDesc tupleDescriptor = RelationGetDescr(pgDistNode);
HeapTuple heapTuple = heap_form_tuple(tupleDescriptor, values, isNulls);
CatalogTupleInsert(pgDistNode, heapTuple);
CATALOG_INSERT_WITH_SNAPSHOT(pgDistNode, heapTuple);
CitusInvalidateRelcacheByRelid(DistNodeRelationId());

View File

@ -26,6 +26,7 @@
#include "commands/tablecmds.h"
#include "executor/tstoreReceiver.h"
#include "lib/stringinfo.h"
#include "nodes/nodeFuncs.h"
#include "nodes/plannodes.h"
#include "nodes/primnodes.h"
#include "nodes/print.h"
@ -73,6 +74,7 @@
#include "distributed/placement_connection.h"
#include "distributed/recursive_planning.h"
#include "distributed/remote_commands.h"
#include "distributed/subplan_execution.h"
#include "distributed/tuple_destination.h"
#include "distributed/tuplestore.h"
#include "distributed/version_compat.h"
@ -83,6 +85,7 @@
bool ExplainDistributedQueries = true;
bool ExplainAllTasks = false;
int ExplainAnalyzeSortMethod = EXPLAIN_ANALYZE_SORT_BY_TIME;
extern MemoryContext SubPlanExplainAnalyzeContext;
/*
* If enabled, EXPLAIN ANALYZE output & other statistics of last worker task
@ -90,6 +93,11 @@ int ExplainAnalyzeSortMethod = EXPLAIN_ANALYZE_SORT_BY_TIME;
*/
static char *SavedExplainPlan = NULL;
static double SavedExecutionDurationMillisec = 0.0;
static double SavedExplainPlanNtuples = 0;
static double SavedExplainPlanNloops = 0;
extern SubPlanExplainOutputData *SubPlanExplainOutput;
uint8 TotalExplainOutputCapacity = 0;
uint8 NumTasksOutput = 0;
/* struct to save explain flags */
typedef struct
@ -215,7 +223,8 @@ static const char * ExplainFormatStr(ExplainFormat format);
#if PG_VERSION_NUM >= PG_VERSION_17
static const char * ExplainSerializeStr(ExplainSerializeOption serializeOption);
#endif
static void ExplainWorkerPlan(PlannedStmt *plannedStmt, DestReceiver *dest,
static void ExplainWorkerPlan(PlannedStmt *plannedStmt, DistributedSubPlan *subPlan,
DestReceiver *dest,
ExplainState *es,
const char *queryString, ParamListInfo params,
QueryEnvironment *queryEnv,
@ -224,7 +233,9 @@ static void ExplainWorkerPlan(PlannedStmt *plannedStmt, DestReceiver *dest,
const BufferUsage *bufusage,
const MemoryContextCounters *mem_counters,
#endif
double *executionDurationMillisec);
double *executionDurationMillisec,
double *executionTuples,
double *executionLoops);
static ExplainFormat ExtractFieldExplainFormat(Datum jsonbDoc, const char *fieldName,
ExplainFormat defaultValue);
#if PG_VERSION_NUM >= PG_VERSION_17
@ -256,7 +267,8 @@ static double elapsed_time(instr_time *starttime);
static void ExplainPropertyBytes(const char *qlabel, int64 bytes, ExplainState *es);
static uint64 TaskReceivedTupleData(Task *task);
static bool ShowReceivedTupleData(CitusScanState *scanState, ExplainState *es);
static bool PlanStateAnalyzeWalker(PlanState *planState, void *ctx);
static void ExtractAnalyzeStats(DistributedSubPlan *subPlan, PlanState *planState);
/* exports for SQL callable functions */
PG_FUNCTION_INFO_V1(worker_last_saved_explain_analyze);
@ -432,6 +444,84 @@ NonPushableMergeCommandExplainScan(CustomScanState *node, List *ancestors,
}
/*
 * ExtractAnalyzeStats takes the statistics that were saved on the subplan when
 * it was pre-executed and injects them into planState->instrument.
 *
 * subPlan   - the pre-executed subplan; its ntuples, numTasksOutput and
 *             totalExplainOutput (indexed by task id) fields are read
 * planState - plan state whose instrumentation is overwritten; nothing is
 *             done when it is NULL
 *
 * For any node other than a CustomScanState only ntuples and nloops are set.
 * For a CustomScanState the instrumentation is zeroed and rebuilt from the
 * saved per-task outputs, and every task of the scan's distributed plan that
 * has a saved output receives its EXPLAIN ANALYZE plan text, execution
 * duration and received-tuple-data size.
 */
static void
ExtractAnalyzeStats(DistributedSubPlan *subPlan, PlanState *planState)
{
	if (!planState)
	{
		return;
	}

	Instrumentation *instr = planState->instrument;

	if (!IsA(planState, CustomScanState))
	{
		instr->ntuples = subPlan->ntuples;
		instr->nloops = 1; /* subplan nodes are executed only once */
		return;
	}

	Assert(IsA(planState, CustomScanState));

	/* nothing was saved for this subplan, leave the instrumentation untouched */
	if (subPlan->numTasksOutput <= 0)
	{
		return;
	}

	ListCell *lc;
	int tasksOutput = 0;
	double tasksNtuples = 0;
	double tasksNloops = 0;

	memset(instr, 0, sizeof(Instrumentation));
	DistributedPlan *newdistributedPlan =
		((CitusScanState *) planState)->distributedPlan;

	/*
	 * Inject the earlier executed results, extracted from the workers' EXPLAIN
	 * output, into the newly created tasks.
	 */
	foreach(lc, newdistributedPlan->workerJob->taskList)
	{
		Task *task = (Task *) lfirst(lc);
		uint32 taskId = task->taskId;

		/*
		 * Stop as soon as every saved output has been consumed, so that we do
		 * not keep indexing totalExplainOutput with the ids of further tasks.
		 * NOTE(review): assumes numTasksOutput is the number of saved entries
		 * -- confirm against the code that fills SubPlanExplainOutput.
		 */
		if (tasksOutput >= subPlan->numTasksOutput)
		{
			break;
		}

		/* no output was saved for this task id */
		if (!subPlan->totalExplainOutput[taskId].explainOutput)
		{
			continue;
		}

		/*
		 * Now feed the earlier saved output, which will be used
		 * by RemoteExplain() when printing tasks
		 */
		MemoryContext taskContext = GetMemoryChunkContext(task);

		task->totalReceivedTupleData =
			subPlan->totalExplainOutput[taskId].totalReceivedTupleData;
		task->fetchedExplainAnalyzeExecutionDuration =
			subPlan->totalExplainOutput[taskId].executionDuration;

		/* copy the plan text into the memory context the task lives in */
		task->fetchedExplainAnalyzePlan =
			MemoryContextStrdup(taskContext,
								subPlan->totalExplainOutput[taskId].explainOutput);

		/*
		 * Tuples are summed over the tasks, while nloops keeps the value of
		 * the last task that had a saved output.
		 * NOTE(review): presumably all tasks report the same loop count --
		 * confirm that overwriting instead of accumulating is intended.
		 */
		tasksNtuples += subPlan->totalExplainOutput[taskId].executionNtuples;
		tasksNloops = subPlan->totalExplainOutput[taskId].executionNloops;

		/*
		 * Clear the saved pointer so this entry is skipped if it is visited
		 * again; the saved string itself is not freed here.
		 */
		subPlan->totalExplainOutput[taskId].explainOutput = NULL;
		tasksOutput++;
	}

	instr->ntuples = tasksNtuples;
	instr->nloops = tasksNloops;
}
/*
* ExplainSubPlans generates EXPLAIN output for subplans for CTEs
* and complex subqueries. Because the planning for these queries
@ -450,7 +540,6 @@ ExplainSubPlans(DistributedPlan *distributedPlan, ExplainState *es)
{
DistributedSubPlan *subPlan = (DistributedSubPlan *) lfirst(subPlanCell);
PlannedStmt *plan = subPlan->plan;
IntoClause *into = NULL;
ParamListInfo params = NULL;
/*
@ -534,6 +623,11 @@ ExplainSubPlans(DistributedPlan *distributedPlan, ExplainState *es)
ExplainOpenGroup("PlannedStmt", "PlannedStmt", false, es);
DestReceiver *dest = None_Receiver; /* No query execution */
double executionDurationMillisec = 0.0;
double executionTuples = 0;
double executionLoops = 0;
/* Capture memory stats on PG17+ */
#if PG_VERSION_NUM >= PG_VERSION_17
if (es->memory)
@ -541,31 +635,21 @@ ExplainSubPlans(DistributedPlan *distributedPlan, ExplainState *es)
MemoryContextSwitchTo(saved_ctx);
MemoryContextMemConsumed(planner_ctx, &mem_counters);
}
#endif
#if PG_VERSION_NUM >= PG_VERSION_17
ExplainOnePlan(
plan,
into,
es,
queryString,
params,
NULL, /* QueryEnvironment *queryEnv */
&planduration,
(es->buffers ? &bufusage : NULL),
(es->memory ? &mem_counters : NULL)
);
/* Execute EXPLAIN without ANALYZE */
ExplainWorkerPlan(plan, subPlan, dest, es, queryString, params, NULL,
&planduration,
(es->buffers ? &bufusage : NULL),
(es->memory ? &mem_counters : NULL),
&executionDurationMillisec,
&executionTuples,
&executionLoops);
#else
ExplainOnePlan(
plan,
into,
es,
queryString,
params,
NULL, /* QueryEnvironment *queryEnv */
&planduration,
(es->buffers ? &bufusage : NULL)
);
/* Execute EXPLAIN without ANALYZE */
ExplainWorkerPlan(plan, subPlan, dest, es, queryString, params, NULL,
&planduration, &executionDurationMillisec,
&executionTuples, &executionLoops);
#endif
ExplainCloseGroup("PlannedStmt", "PlannedStmt", false, es);
@ -1236,17 +1320,19 @@ worker_last_saved_explain_analyze(PG_FUNCTION_ARGS)
if (SavedExplainPlan != NULL)
{
int columnCount = tupleDescriptor->natts;
if (columnCount != 2)
if (columnCount != 4)
{
ereport(ERROR, (errmsg("expected 3 output columns in definition of "
ereport(ERROR, (errmsg("expected 4 output columns in definition of "
"worker_last_saved_explain_analyze, but got %d",
columnCount)));
}
bool columnNulls[2] = { false };
Datum columnValues[2] = {
bool columnNulls[4] = { false };
Datum columnValues[4] = {
CStringGetTextDatum(SavedExplainPlan),
Float8GetDatum(SavedExecutionDurationMillisec)
Float8GetDatum(SavedExecutionDurationMillisec),
Float8GetDatum(SavedExplainPlanNtuples),
Float8GetDatum(SavedExplainPlanNloops)
};
tuplestore_putvalues(tupleStore, tupleDescriptor, columnValues, columnNulls);
@ -1267,6 +1353,8 @@ worker_save_query_explain_analyze(PG_FUNCTION_ARGS)
text *queryText = PG_GETARG_TEXT_P(0);
char *queryString = text_to_cstring(queryText);
double executionDurationMillisec = 0.0;
double executionTuples = 0;
double executionLoops = 0;
Datum explainOptions = PG_GETARG_DATUM(1);
ExplainState *es = NewExplainState();
@ -1383,16 +1471,19 @@ worker_save_query_explain_analyze(PG_FUNCTION_ARGS)
}
/* do the actual EXPLAIN ANALYZE */
ExplainWorkerPlan(plan, tupleStoreDest, es, queryString, boundParams, NULL,
ExplainWorkerPlan(plan, NULL, tupleStoreDest, es, queryString, boundParams, NULL,
&planDuration,
(es->buffers ? &bufusage : NULL),
(es->memory ? &mem_counters : NULL),
&executionDurationMillisec);
&executionDurationMillisec,
&executionTuples,
&executionLoops);
#else
/* do the actual EXPLAIN ANALYZE */
ExplainWorkerPlan(plan, tupleStoreDest, es, queryString, boundParams, NULL,
&planDuration, &executionDurationMillisec);
ExplainWorkerPlan(plan, NULL, tupleStoreDest, es, queryString, boundParams, NULL,
&planDuration, &executionDurationMillisec,
&executionTuples, &executionLoops);
#endif
ExplainEndOutput(es);
@ -1403,6 +1494,8 @@ worker_save_query_explain_analyze(PG_FUNCTION_ARGS)
SavedExplainPlan = pstrdup(es->str->data);
SavedExecutionDurationMillisec = executionDurationMillisec;
SavedExplainPlanNtuples = executionTuples;
SavedExplainPlanNloops = executionLoops;
MemoryContextSwitchTo(oldContext);
@ -1632,11 +1725,13 @@ CreateExplainAnlyzeDestination(Task *task, TupleDestination *taskDest)
tupleDestination->originalTask = task;
tupleDestination->originalTaskDestination = taskDest;
TupleDesc lastSavedExplainAnalyzeTupDesc = CreateTemplateTupleDesc(2);
TupleDesc lastSavedExplainAnalyzeTupDesc = CreateTemplateTupleDesc(4);
TupleDescInitEntry(lastSavedExplainAnalyzeTupDesc, 1, "explain analyze", TEXTOID, 0,
0);
TupleDescInitEntry(lastSavedExplainAnalyzeTupDesc, 2, "duration", FLOAT8OID, 0, 0);
TupleDescInitEntry(lastSavedExplainAnalyzeTupDesc, 3, "ntuples", FLOAT8OID, 0, 0);
TupleDescInitEntry(lastSavedExplainAnalyzeTupDesc, 4, "nloops", FLOAT8OID, 0, 0);
tupleDestination->lastSavedExplainAnalyzeTupDesc = lastSavedExplainAnalyzeTupDesc;
@ -1647,6 +1742,51 @@ CreateExplainAnlyzeDestination(Task *task, TupleDestination *taskDest)
}
/*
* EnsureExplainOutputCapacity is to ensure capacity for new entries. Input
* parameter requiredSize is minimum number of elements needed.
*/
static void
EnsureExplainOutputCapacity(int requiredSize)
{
if (requiredSize < TotalExplainOutputCapacity)
{
return;
}
int newCapacity =
(TotalExplainOutputCapacity == 0) ? 32 : TotalExplainOutputCapacity * 2;
while (newCapacity <= requiredSize)
{
newCapacity *= 2;
}
if (SubPlanExplainOutput == NULL)
{
SubPlanExplainOutput =
(SubPlanExplainOutputData *) MemoryContextAllocZero(
SubPlanExplainAnalyzeContext,
newCapacity *
sizeof(SubPlanExplainOutputData));
}
else
{
/* Use repalloc and manually zero the new memory */
int oldSize = TotalExplainOutputCapacity * sizeof(SubPlanExplainOutputData);
int newSize = newCapacity * sizeof(SubPlanExplainOutputData);
SubPlanExplainOutput =
(SubPlanExplainOutputData *) repalloc(SubPlanExplainOutput, newSize);
/* Zero out the newly allocated memory */
MemSet((char *) SubPlanExplainOutput + oldSize, 0, newSize - oldSize);
}
TotalExplainOutputCapacity = newCapacity;
}
/*
* ExplainAnalyzeDestPutTuple implements TupleDestination->putTuple
* for ExplainAnalyzeDestination.
@ -1656,6 +1796,8 @@ ExplainAnalyzeDestPutTuple(TupleDestination *self, Task *task,
int placementIndex, int queryNumber,
HeapTuple heapTuple, uint64 tupleLibpqSize)
{
uint32 taskId = task->taskId;
ExplainAnalyzeDestination *tupleDestination = (ExplainAnalyzeDestination *) self;
if (queryNumber == 0)
{
@ -1663,6 +1805,13 @@ ExplainAnalyzeDestPutTuple(TupleDestination *self, Task *task,
originalTupDest->putTuple(originalTupDest, task, placementIndex, 0, heapTuple,
tupleLibpqSize);
tupleDestination->originalTask->totalReceivedTupleData += tupleLibpqSize;
if (SubPlanExplainAnalyzeContext)
{
EnsureExplainOutputCapacity(taskId + 1);
SubPlanExplainOutput[taskId].totalReceivedTupleData =
tupleDestination->originalTask->totalReceivedTupleData;
}
}
else if (queryNumber == 1)
{
@ -1678,6 +1827,8 @@ ExplainAnalyzeDestPutTuple(TupleDestination *self, Task *task,
}
Datum executionDuration = heap_getattr(heapTuple, 2, tupDesc, &isNull);
Datum executionTuples = heap_getattr(heapTuple, 3, tupDesc, &isNull);
Datum executionLoops = heap_getattr(heapTuple, 4, tupDesc, &isNull);
if (isNull)
{
@ -1687,6 +1838,8 @@ ExplainAnalyzeDestPutTuple(TupleDestination *self, Task *task,
char *fetchedExplainAnalyzePlan = TextDatumGetCString(explainAnalyze);
double fetchedExplainAnalyzeExecutionDuration = DatumGetFloat8(executionDuration);
double fetchedExplainAnalyzeTuples = DatumGetFloat8(executionTuples);
double fetchedExplainAnalyzeLoops = DatumGetFloat8(executionLoops);
/*
* Allocate fetchedExplainAnalyzePlan in the same context as the Task, since we are
@ -1712,6 +1865,20 @@ ExplainAnalyzeDestPutTuple(TupleDestination *self, Task *task,
placementIndex;
tupleDestination->originalTask->fetchedExplainAnalyzeExecutionDuration =
fetchedExplainAnalyzeExecutionDuration;
/* We should build tupleDestination in subPlan similar to the above */
if (SubPlanExplainAnalyzeContext)
{
EnsureExplainOutputCapacity(taskId + 1);
SubPlanExplainOutput[taskId].explainOutput =
MemoryContextStrdup(SubPlanExplainAnalyzeContext,
fetchedExplainAnalyzePlan);
SubPlanExplainOutput[taskId].executionDuration =
fetchedExplainAnalyzeExecutionDuration;
SubPlanExplainOutput[taskId].executionNtuples = fetchedExplainAnalyzeTuples;
SubPlanExplainOutput[taskId].executionNloops = fetchedExplainAnalyzeLoops;
NumTasksOutput++;
}
}
else
{
@ -1774,7 +1941,14 @@ ExplainAnalyzeDestTupleDescForQuery(TupleDestination *self, int queryNumber)
bool
RequestedForExplainAnalyze(CitusScanState *node)
{
return (node->customScanState.ss.ps.state->es_instrument != 0);
/*
* When running a distributed plan (either the root plan or a subplan's
* distributed fragment), we need to know if we're under EXPLAIN ANALYZE.
* Subplans can't receive the EXPLAIN ANALYZE flag directly, so we use
* SubPlanExplainAnalyzeContext as a flag to indicate that context.
*/
return (node->customScanState.ss.ps.state->es_instrument != 0) ||
(SubPlanLevel > 0 && SubPlanExplainAnalyzeContext);
}
@ -1933,7 +2107,8 @@ FetchPlanQueryForExplainAnalyze(const char *queryString, ParamListInfo params)
}
appendStringInfoString(fetchQuery,
"SELECT explain_analyze_output, execution_duration "
"SELECT explain_analyze_output, execution_duration, "
"execution_ntuples, execution_nloops "
"FROM worker_last_saved_explain_analyze()");
return fetchQuery->data;
@ -2105,6 +2280,20 @@ ExplainOneQuery(Query *query, int cursorOptions,
}
/*
 * PlanStateAnalyzeWalker - tree walker callback that hands the visited
 * PlanState node to ExtractAnalyzeStats, together with the DistributedSubPlan
 * passed in ctx, and then continues the walk below that node.
 *
 * planstate_tree_walker() only invokes the callback on the direct children of
 * the node it is given, so the callback has to re-enter the walker itself to
 * reach deeper levels of the plan tree. No node ever aborts the walk, so the
 * overall result is always false.
 */
static bool
PlanStateAnalyzeWalker(PlanState *planState, void *ctx)
{
	DistributedSubPlan *subplan = (DistributedSubPlan *) ctx;

	ExtractAnalyzeStats(subplan, planState);

	/* descend into this node's children; otherwise only one level is visited */
	return planstate_tree_walker(planState, PlanStateAnalyzeWalker, ctx);
}
/*
* ExplainWorkerPlan produces explain output into es. If es->analyze, it also executes
* the given plannedStmt and sends the results to dest. It puts total time to execute in
@ -2119,20 +2308,25 @@ ExplainOneQuery(Query *query, int cursorOptions,
* destination.
*/
static void
ExplainWorkerPlan(PlannedStmt *plannedstmt, DestReceiver *dest, ExplainState *es,
ExplainWorkerPlan(PlannedStmt *plannedstmt, DistributedSubPlan *subPlan, DestReceiver *dest, ExplainState *es,
const char *queryString, ParamListInfo params, QueryEnvironment *queryEnv,
const instr_time *planduration,
#if PG_VERSION_NUM >= PG_VERSION_17
const BufferUsage *bufusage,
const MemoryContextCounters *mem_counters,
#endif
double *executionDurationMillisec)
double *executionDurationMillisec,
double *executionTuples,
double *executionLoops)
{
QueryDesc *queryDesc;
instr_time starttime;
double totaltime = 0;
int eflags;
int instrument_option = 0;
/* Sub-plan already executed; skipping execution */
bool executeQuery = (es->analyze && !subPlan);
bool executeSubplan = (es->analyze && subPlan);
Assert(plannedstmt->commandType != CMD_UTILITY);
@ -2174,7 +2368,7 @@ ExplainWorkerPlan(PlannedStmt *plannedstmt, DestReceiver *dest, ExplainState *es
);
/* Select execution options */
if (es->analyze)
if (executeQuery)
eflags = 0; /* default run-to-completion flags */
else
eflags = EXEC_FLAG_EXPLAIN_ONLY;
@ -2183,7 +2377,7 @@ ExplainWorkerPlan(PlannedStmt *plannedstmt, DestReceiver *dest, ExplainState *es
ExecutorStart(queryDesc, eflags);
/* Execute the plan for statistics if asked for */
if (es->analyze)
if (executeQuery)
{
ScanDirection dir = ForwardScanDirection;
@ -2206,6 +2400,12 @@ ExplainWorkerPlan(PlannedStmt *plannedstmt, DestReceiver *dest, ExplainState *es
ExplainOpenGroup("Query", NULL, true, es);
if (executeSubplan)
{
ExtractAnalyzeStats(subPlan, queryDesc->planstate);
planstate_tree_walker(queryDesc->planstate, PlanStateAnalyzeWalker, (void *) subPlan);
}
/* Create textual dump of plan tree */
ExplainPrintPlan(es, queryDesc);
@ -2278,6 +2478,13 @@ ExplainWorkerPlan(PlannedStmt *plannedstmt, DestReceiver *dest, ExplainState *es
*/
INSTR_TIME_SET_CURRENT(starttime);
if (executeQuery)
{
Instrumentation *instr = queryDesc->planstate->instrument;
*executionTuples = instr->ntuples;
*executionLoops = instr->nloops;
}
ExecutorEnd(queryDesc);
FreeQueryDesc(queryDesc);
@ -2285,7 +2492,7 @@ ExplainWorkerPlan(PlannedStmt *plannedstmt, DestReceiver *dest, ExplainState *es
PopActiveSnapshot();
/* We need a CCI just in case query expanded to multiple plans */
if (es->analyze)
if (executeQuery)
CommandCounterIncrement();
totaltime += elapsed_time(&starttime);

View File

@ -1,2 +1,3 @@
-- citus--13.1-1--13.2-1
-- bump version to 13.2-1
#include "udfs/worker_last_saved_explain_analyze/13.2-1.sql"

View File

@ -1,2 +1,5 @@
-- citus--13.2-1--13.1-1
-- downgrade version to 13.1-1
DROP FUNCTION IF EXISTS pg_catalog.worker_last_saved_explain_analyze();
#include "../udfs/worker_last_saved_explain_analyze/9.4-1.sql"

View File

@ -0,0 +1,10 @@
-- worker_last_saved_explain_analyze() returns four columns in this version:
-- explain_analyze_output, execution_duration, execution_ntuples and
-- execution_nloops.
--
-- PostgreSQL does not allow CREATE OR REPLACE FUNCTION to change the return
-- type (including the OUT columns of RETURNS TABLE) of an existing function,
-- so the previous definition is dropped before the new one is created.
DROP FUNCTION pg_catalog.worker_last_saved_explain_analyze();
CREATE OR REPLACE FUNCTION pg_catalog.worker_last_saved_explain_analyze()
RETURNS TABLE(explain_analyze_output TEXT, execution_duration DOUBLE PRECISION,
execution_ntuples DOUBLE PRECISION, execution_nloops DOUBLE PRECISION)
LANGUAGE C STRICT
AS 'citus';
COMMENT ON FUNCTION pg_catalog.worker_last_saved_explain_analyze() IS
'Returns the saved explain analyze output for the last run query';

View File

@ -1,6 +1,9 @@
DROP FUNCTION pg_catalog.worker_last_saved_explain_analyze();
CREATE OR REPLACE FUNCTION pg_catalog.worker_last_saved_explain_analyze()
RETURNS TABLE(explain_analyze_output TEXT, execution_duration DOUBLE PRECISION)
RETURNS TABLE(explain_analyze_output TEXT, execution_duration DOUBLE PRECISION,
execution_ntuples DOUBLE PRECISION, execution_nloops DOUBLE PRECISION)
LANGUAGE C STRICT
AS 'citus';
COMMENT ON FUNCTION pg_catalog.worker_last_saved_explain_analyze() IS

View File

@ -106,7 +106,7 @@ LogTransactionRecord(int32 groupId, char *transactionName, FullTransactionId out
TupleDesc tupleDescriptor = RelationGetDescr(pgDistTransaction);
HeapTuple heapTuple = heap_form_tuple(tupleDescriptor, values, isNulls);
CatalogTupleInsert(pgDistTransaction, heapTuple);
CATALOG_INSERT_WITH_SNAPSHOT(pgDistTransaction, heapTuple);
CommandCounterIncrement();

View File

@ -147,6 +147,31 @@ CopyNodeDistributedSubPlan(COPYFUNC_ARGS)
COPY_SCALAR_FIELD(subPlanId);
COPY_NODE_FIELD(plan);
COPY_SCALAR_FIELD(bytesSentPerWorker);
COPY_SCALAR_FIELD(remoteWorkerCount);
COPY_SCALAR_FIELD(durationMillisecs);
COPY_SCALAR_FIELD(writeLocalFile);
if (newnode->totalExplainOutput)
{
MemSet(newnode->totalExplainOutput, 0, sizeof(newnode->totalExplainOutput));
}
/* copy each SubPlanExplainOutput element */
for (int i = 0; i < from->numTasksOutput; i++)
{
/* copy the explainOutput string pointer */
COPY_STRING_FIELD(totalExplainOutput[i].explainOutput);
/* copy the executionDuration (double) */
COPY_SCALAR_FIELD(totalExplainOutput[i].executionDuration);
/* copy the totalReceivedTupleData (uint64) */
COPY_SCALAR_FIELD(totalExplainOutput[i].totalReceivedTupleData);
}
COPY_SCALAR_FIELD(numTasksOutput);
COPY_SCALAR_FIELD(ntuples);
}

View File

@ -215,6 +215,48 @@ OutDistributedSubPlan(OUTFUNC_ARGS)
WRITE_UINT_FIELD(subPlanId);
WRITE_NODE_FIELD(plan);
WRITE_UINT64_FIELD(bytesSentPerWorker);
WRITE_INT_FIELD(remoteWorkerCount);
WRITE_FLOAT_FIELD(durationMillisecs, "%.2f");
WRITE_BOOL_FIELD(writeLocalFile);
appendStringInfoString(str, " totalExplainOutput [");
for (int i = 0; i < node->numTasksOutput; i++)
{
const SubPlanExplainOutputData *e = &node->totalExplainOutput[i];
/* skip empty slots */
if (e->explainOutput == NULL &&
e->executionDuration == 0
&& e->totalReceivedTupleData == 0)
{
continue;
}
if (i > 0)
{
appendStringInfoChar(str, ' ');
}
appendStringInfoChar(str, '(');
/* string pointer prints quoted or NULL */
WRITE_STRING_FIELD(totalExplainOutput[i].explainOutput);
/* double field */
WRITE_FLOAT_FIELD(totalExplainOutput[i].executionDuration, "%.2f");
/* 64-bit unsigned: use the uint64 macro */
WRITE_UINT64_FIELD(totalExplainOutput[i].totalReceivedTupleData);
appendStringInfoChar(str, ')');
}
appendStringInfoChar(str, ']');
WRITE_INT_FIELD(numTasksOutput);
WRITE_FLOAT_FIELD(ntuples, "%.2f");
}
void

View File

@ -815,13 +815,14 @@ UpdateRelationColocationGroup(Oid distributedRelationId, uint32 colocationId,
bool indexOK = true;
int scanKeyCount = 1;
ScanKeyData scanKey[1];
Datum values[Natts_pg_dist_partition];
bool isNull[Natts_pg_dist_partition];
bool replace[Natts_pg_dist_partition];
Relation pgDistPartition = table_open(DistPartitionRelationId(), RowExclusiveLock);
TupleDesc tupleDescriptor = RelationGetDescr(pgDistPartition);
Datum *values = (Datum *) palloc0(tupleDescriptor->natts * sizeof(Datum));
bool *isNull = (bool *) palloc0(tupleDescriptor->natts * sizeof(bool));
bool *replace = (bool *) palloc0(tupleDescriptor->natts * sizeof(bool));
ScanKeyInit(&scanKey[0], Anum_pg_dist_partition_logicalrelid,
BTEqualStrategyNumber, F_OIDEQ, ObjectIdGetDatum(distributedRelationId));
@ -838,10 +839,6 @@ UpdateRelationColocationGroup(Oid distributedRelationId, uint32 colocationId,
distributedRelationName)));
}
memset(values, 0, sizeof(values));
memset(isNull, false, sizeof(isNull));
memset(replace, false, sizeof(replace));
values[Anum_pg_dist_partition_colocationid - 1] = UInt32GetDatum(colocationId);
isNull[Anum_pg_dist_partition_colocationid - 1] = false;
replace[Anum_pg_dist_partition_colocationid - 1] = true;
@ -858,6 +855,10 @@ UpdateRelationColocationGroup(Oid distributedRelationId, uint32 colocationId,
systable_endscan(scanDescriptor);
table_close(pgDistPartition, NoLock);
pfree(values);
pfree(isNull);
pfree(replace);
bool shouldSyncMetadata = ShouldSyncTableMetadata(distributedRelationId);
if (shouldSyncMetadata && !localOnly)
{
@ -998,10 +999,12 @@ ColocationGroupTableList(uint32 colocationId, uint32 count)
indexOK, NULL, scanKeyCount, scanKey);
HeapTuple heapTuple = systable_getnext(scanDescriptor);
Datum *datumArray = (Datum *) palloc(tupleDescriptor->natts * sizeof(Datum));
bool *isNullArray = (bool *) palloc(tupleDescriptor->natts * sizeof(bool));
while (HeapTupleIsValid(heapTuple))
{
bool isNullArray[Natts_pg_dist_partition];
Datum datumArray[Natts_pg_dist_partition];
memset(datumArray, 0, tupleDescriptor->natts * sizeof(Datum));
memset(isNullArray, 0, tupleDescriptor->natts * sizeof(bool));
heap_deform_tuple(heapTuple, tupleDescriptor, datumArray, isNullArray);
Oid colocatedTableId = DatumGetObjectId(
datumArray[Anum_pg_dist_partition_logicalrelid - 1]);
@ -1020,6 +1023,8 @@ ColocationGroupTableList(uint32 colocationId, uint32 count)
break;
}
}
pfree(datumArray);
pfree(isNullArray);
systable_endscan(scanDescriptor);
table_close(pgDistPartition, AccessShareLock);
@ -1192,10 +1197,12 @@ ColocatedTableId(int32 colocationId)
indexOK, NULL, scanKeyCount, scanKey);
HeapTuple heapTuple = systable_getnext(scanDescriptor);
Datum *datumArray = (Datum *) palloc(tupleDescriptor->natts * sizeof(Datum));
bool *isNullArray = (bool *) palloc(tupleDescriptor->natts * sizeof(bool));
while (HeapTupleIsValid(heapTuple))
{
bool isNullArray[Natts_pg_dist_partition];
Datum datumArray[Natts_pg_dist_partition];
memset(datumArray, 0, tupleDescriptor->natts * sizeof(Datum));
memset(isNullArray, 0, tupleDescriptor->natts * sizeof(bool));
heap_deform_tuple(heapTuple, tupleDescriptor, datumArray, isNullArray);
colocatedTableId = DatumGetObjectId(
datumArray[Anum_pg_dist_partition_logicalrelid - 1]);
@ -1223,6 +1230,8 @@ ColocatedTableId(int32 colocationId)
heapTuple = systable_getnext(scanDescriptor);
}
pfree(datumArray);
pfree(isNullArray);
systable_endscan(scanDescriptor);
table_close(pgDistPartition, AccessShareLock);

View File

@ -466,4 +466,5 @@ extern bool IsBackgroundJobStatusTerminal(BackgroundJobStatus status);
extern bool IsBackgroundTaskStatusTerminal(BackgroundTaskStatus status);
extern Oid BackgroundJobStatusOid(BackgroundJobStatus status);
extern Oid BackgroundTaskStatusOid(BackgroundTaskStatus status);
extern int GetAutoConvertedAttrIndexInPgDistPartition(TupleDesc tupleDesc);
#endif /* METADATA_UTILITY_H */

View File

@ -146,8 +146,8 @@ extern void ExecuteQueryStringIntoDestReceiver(const char *queryString, ParamLis
DestReceiver *dest);
extern void ExecuteQueryIntoDestReceiver(Query *query, ParamListInfo params,
DestReceiver *dest);
extern void ExecutePlanIntoDestReceiver(PlannedStmt *queryPlan, ParamListInfo params,
DestReceiver *dest);
extern uint64 ExecutePlanIntoDestReceiver(PlannedStmt *queryPlan, ParamListInfo params,
DestReceiver *dest);
extern void SetLocalMultiShardModifyModeToSequential(void);
extern void EnsureSequentialMode(ObjectType objType);
extern void SetLocalForceMaxQueryParallelization(void);

View File

@ -490,6 +490,24 @@ typedef struct DistributedPlan
} DistributedPlan;
/*
 * SubPlanExplainOutputData - holds the EXPLAIN ANALYZE output and collected
 * statistics for a single task executed by a worker during distributed
 * query execution.
 *
 *   explainOutput          - raw EXPLAIN ANALYZE output for the task
 *   executionDuration      - wall-clock time taken to run the task, in
 *                            milliseconds
 *   executionNtuples       - tuple count the worker reported for the task
 *                            (the execution_ntuples column)
 *   executionNloops        - loop count the worker reported for the task
 *                            (the execution_nloops column)
 *   totalReceivedTupleData - total bytes of tuple data received from the
 *                            worker
 */
typedef struct SubPlanExplainOutputData
{
char *explainOutput;
double executionDuration;
double executionNtuples;
double executionNloops;
uint64 totalReceivedTupleData;
} SubPlanExplainOutputData;
/*
* DistributedSubPlan contains a subplan of a distributed plan. Subplans are
* executed before the distributed query and their results are written to
@ -508,6 +526,9 @@ typedef struct DistributedSubPlan
uint32 remoteWorkerCount;
double durationMillisecs;
bool writeLocalFile;
SubPlanExplainOutputData *totalExplainOutput;
uint32 numTasksOutput; /* actual size of the above array */
double ntuples; /* total tuples produced */
} DistributedSubPlan;

View File

@ -17,7 +17,7 @@
extern int MaxIntermediateResult;
extern int SubPlanLevel;
extern void ExecuteSubPlans(DistributedPlan *distributedPlan);
extern void ExecuteSubPlans(DistributedPlan *distributedPlan, bool explainAnalyzeEnabled);
/**
* IntermediateResultsHashEntry is used to store which nodes need to receive

View File

@ -13,6 +13,10 @@
#include "pg_version_constants.h"
/* we need these for PG-18's PushActiveSnapshot/PopActiveSnapshot APIs */
#include "access/xact.h"
#include "utils/snapmgr.h"
#if PG_VERSION_NUM >= PG_VERSION_18
#define create_foreignscan_path_compat(a, b, c, d, e, f, g, h, i, j, k) \
create_foreignscan_path( \
@ -36,6 +40,14 @@
/* PG-18 unified row-compare operator codes under COMPARE_* */
#define ROWCOMPARE_NE COMPARE_NE
/*
 * CATALOG_INSERT_WITH_SNAPSHOT inserts tup into the catalog relation rel via
 * CatalogTupleInsert() while the transaction snapshot is pushed as the active
 * snapshot; the snapshot is popped again right after the insert.
 *
 * The local variable deliberately avoids a leading double underscore, since
 * such identifiers are reserved for the C implementation.
 */
#define CATALOG_INSERT_WITH_SNAPSHOT(rel, tup) \
	do { \
		Snapshot catalogInsertSnapshot_ = GetTransactionSnapshot(); \
		PushActiveSnapshot(catalogInsertSnapshot_); \
		CatalogTupleInsert((rel), (tup)); \
		PopActiveSnapshot(); \
	} while (0)
#elif PG_VERSION_NUM >= PG_VERSION_17
#define create_foreignscan_path_compat(a, b, c, d, e, f, g, h, i, j, k) \
create_foreignscan_path( \
@ -43,6 +55,10 @@
(e), (f), \
(g), (h), (i), (j), (k) \
)
/* no-op wrapper on older PGs */
#define CATALOG_INSERT_WITH_SNAPSHOT(rel, tup) \
CatalogTupleInsert((rel), (tup))
#endif
#if PG_VERSION_NUM >= PG_VERSION_17
@ -453,6 +469,10 @@ getStxstattarget_compat(HeapTuple tup)
k) create_foreignscan_path(a, b, c, d, e, f, g, h, \
i, k)
/* no-op wrapper on older PGs */
#define CATALOG_INSERT_WITH_SNAPSHOT(rel, tup) \
CatalogTupleInsert((rel), (tup))
#define getProcNo_compat(a) (a->pgprocno)
#define getLxid_compat(a) (a->lxid)

View File

@ -2492,15 +2492,15 @@ Custom Scan (Citus Adaptive) (actual rows=1 loops=1)
-> Distributed Subplan XXX_1
Intermediate Data Size: 100 bytes
Result destination: Write locally
-> Custom Scan (Citus Adaptive) (actual rows=20 loops=1)
-> Custom Scan (Citus Adaptive) (actual rows=10 loops=1)
Task Count: 4
Tuple data received from nodes: 160 bytes
Tuple data received from nodes: 80 bytes
Tasks Shown: One of 4
-> Task
Tuple data received from node: 64 bytes
Tuple data received from node: 32 bytes
Node: host=localhost port=xxxxx dbname=regression
-> Insert on dist_table_570017 citus_table_alias (actual rows=8 loops=1)
-> Seq Scan on dist_table_570017 dist_table (actual rows=8 loops=1)
-> Insert on dist_table_570017 citus_table_alias (actual rows=4 loops=1)
-> Seq Scan on dist_table_570017 dist_table (actual rows=4 loops=1)
Filter: (a IS NOT NULL)
-> Distributed Subplan XXX_2
Intermediate Data Size: 150 bytes
@ -3228,6 +3228,159 @@ Custom Scan (Citus Adaptive) (actual rows=0 loops=1)
-> Update on tbl_570036 tbl (actual rows=0 loops=1)
-> Seq Scan on tbl_570036 tbl (actual rows=0 loops=1)
Filter: (a = 1)
-- EXPLAIN ANALYZE shouldn't execute SubPlans twice (bug #4212)
SET search_path TO multi_explain;
CREATE TABLE test_subplans (x int primary key, y int);
SELECT create_distributed_table('test_subplans','x');
EXPLAIN (COSTS off, ANALYZE on, TIMING off, SUMMARY off)
WITH a AS (INSERT INTO test_subplans VALUES (1,2) RETURNING *)
SELECT * FROM a;
Custom Scan (Citus Adaptive) (actual rows=1 loops=1)
-> Distributed Subplan XXX_1
Intermediate Data Size: 18 bytes
Result destination: Write locally
-> Custom Scan (Citus Adaptive) (actual rows=1 loops=1)
Task Count: 1
Tuple data received from nodes: 16 bytes
Tasks Shown: All
-> Task
Tuple data received from node: 16 bytes
Node: host=localhost port=xxxxx dbname=regression
-> Insert on test_subplans_570038 (actual rows=1 loops=1)
-> Result (actual rows=1 loops=1)
Task Count: 1
Tuple data received from nodes: 8 bytes
Tasks Shown: All
-> Task
Tuple data received from node: 8 bytes
Node: host=localhost port=xxxxx dbname=regression
-> Function Scan on read_intermediate_result intermediate_result (actual rows=1 loops=1)
-- Only one row must exist
SELECT * FROM test_subplans;
1|2
-- Will fail with duplicate pk
EXPLAIN (COSTS off, ANALYZE on, TIMING off, SUMMARY off)
WITH a AS (INSERT INTO test_subplans VALUES (1,2) RETURNING *)
SELECT * FROM a;
ERROR: duplicate key value violates unique constraint "test_subplans_pkey_570038"
DETAIL: Key (x)=(1) already exists.
CONTEXT: while executing command on localhost:xxxxx
-- Test JSON format
TRUNCATE test_subplans;
EXPLAIN (FORMAT JSON, COSTS off, ANALYZE on, TIMING off, SUMMARY off)
WITH a AS (INSERT INTO test_subplans VALUES (1,2) RETURNING *)
SELECT * FROM a;
[
{
"Plan": {
"Node Type": "Custom Scan",
"Custom Plan Provider": "Citus Adaptive",
"Parallel Aware": false,
"Async Capable": false,
"Actual Rows": 1,
"Actual Loops": 1,
"Distributed Query": {
"Subplans": [
{
"Intermediate Data Size": "18 bytes",
"Result destination": "Write locally",
"PlannedStmt": [
{
"Plan": {
"Node Type": "Custom Scan",
"Custom Plan Provider": "Citus Adaptive",
"Parallel Aware": false,
"Async Capable": false,
"Actual Rows": 1,
"Actual Loops": 1,
"Distributed Query": {
"Job": {
"Task Count": 1,
"Tuple data received from nodes": "16 bytes",
"Tasks Shown": "All",
"Tasks": [
{
"Tuple data received from node": "16 bytes",
"Node": "host=localhost port=xxxxx dbname=regression",
"Remote Plan": [
[
{
"Plan": {
"Node Type": "ModifyTable",
"Operation": "Insert",
"Parallel Aware": false,
"Async Capable": false,
"Relation Name": "test_subplans_570038",
"Alias": "test_subplans_570038",
"Actual Rows": 1,
"Actual Loops": 1,
"Plans": [
{
"Node Type": "Result",
"Parent Relationship": "Outer",
"Parallel Aware": false,
"Async Capable": false,
"Actual Rows": 1,
"Actual Loops": 1
}
]
},
"Triggers": [
]
}
]
]
}
]
}
}
},
"Triggers": [
]
}
]
}
],
"Job": {
"Task Count": 1,
"Tuple data received from nodes": "8 bytes",
"Tasks Shown": "All",
"Tasks": [
{
"Tuple data received from node": "8 bytes",
"Node": "host=localhost port=xxxxx dbname=regression",
"Remote Plan": [
[
{
"Plan": {
"Node Type": "Function Scan",
"Parallel Aware": false,
"Async Capable": false,
"Function Name": "read_intermediate_result",
"Alias": "intermediate_result",
"Actual Rows": 1,
"Actual Loops": 1
},
"Triggers": [
]
}
]
]
}
]
}
}
},
"Triggers": [
]
}
]
-- Only one row must exist
SELECT * FROM test_subplans;
1|2
-- check when auto explain + analyze is enabled, we do not allow local execution.
CREATE SCHEMA test_auto_explain;
SET search_path TO 'test_auto_explain';

View File

@ -2484,15 +2484,15 @@ Custom Scan (Citus Adaptive) (actual rows=1 loops=1)
-> Distributed Subplan XXX_1
Intermediate Data Size: 100 bytes
Result destination: Write locally
-> Custom Scan (Citus Adaptive) (actual rows=20 loops=1)
-> Custom Scan (Citus Adaptive) (actual rows=10 loops=1)
Task Count: 4
Tuple data received from nodes: 160 bytes
Tuple data received from nodes: 80 bytes
Tasks Shown: One of 4
-> Task
Tuple data received from node: 64 bytes
Tuple data received from node: 32 bytes
Node: host=localhost port=xxxxx dbname=regression
-> Insert on dist_table_570017 citus_table_alias (actual rows=8 loops=1)
-> Seq Scan on dist_table_570017 dist_table (actual rows=8 loops=1)
-> Insert on dist_table_570017 citus_table_alias (actual rows=4 loops=1)
-> Seq Scan on dist_table_570017 dist_table (actual rows=4 loops=1)
Filter: (a IS NOT NULL)
-> Distributed Subplan XXX_2
Intermediate Data Size: 150 bytes
@ -3217,6 +3217,159 @@ Custom Scan (Citus Adaptive) (actual rows=0 loops=1)
-> Update on tbl_570036 tbl (actual rows=0 loops=1)
-> Seq Scan on tbl_570036 tbl (actual rows=0 loops=1)
Filter: (a = 1)
-- EXPLAIN ANALYZE shouldn't execute SubPlans twice (bug #4212)
SET search_path TO multi_explain;
CREATE TABLE test_subplans (x int primary key, y int);
SELECT create_distributed_table('test_subplans','x');
EXPLAIN (COSTS off, ANALYZE on, TIMING off, SUMMARY off)
WITH a AS (INSERT INTO test_subplans VALUES (1,2) RETURNING *)
SELECT * FROM a;
Custom Scan (Citus Adaptive) (actual rows=1 loops=1)
-> Distributed Subplan XXX_1
Intermediate Data Size: 18 bytes
Result destination: Write locally
-> Custom Scan (Citus Adaptive) (actual rows=1 loops=1)
Task Count: 1
Tuple data received from nodes: 16 bytes
Tasks Shown: All
-> Task
Tuple data received from node: 16 bytes
Node: host=localhost port=xxxxx dbname=regression
-> Insert on test_subplans_570038 (actual rows=1 loops=1)
-> Result (actual rows=1 loops=1)
Task Count: 1
Tuple data received from nodes: 8 bytes
Tasks Shown: All
-> Task
Tuple data received from node: 8 bytes
Node: host=localhost port=xxxxx dbname=regression
-> Function Scan on read_intermediate_result intermediate_result (actual rows=1 loops=1)
-- Only one row must exist
SELECT * FROM test_subplans;
1|2
-- Will fail with duplicate pk
EXPLAIN (COSTS off, ANALYZE on, TIMING off, SUMMARY off)
WITH a AS (INSERT INTO test_subplans VALUES (1,2) RETURNING *)
SELECT * FROM a;
ERROR: duplicate key value violates unique constraint "test_subplans_pkey_570038"
DETAIL: Key (x)=(1) already exists.
CONTEXT: while executing command on localhost:xxxxx
-- Test JSON format
TRUNCATE test_subplans;
EXPLAIN (FORMAT JSON, COSTS off, ANALYZE on, TIMING off, SUMMARY off)
WITH a AS (INSERT INTO test_subplans VALUES (1,2) RETURNING *)
SELECT * FROM a;
[
{
"Plan": {
"Node Type": "Custom Scan",
"Custom Plan Provider": "Citus Adaptive",
"Parallel Aware": false,
"Async Capable": false,
"Actual Rows": 1,
"Actual Loops": 1,
"Distributed Query": {
"Subplans": [
{
"Intermediate Data Size": "18 bytes",
"Result destination": "Write locally",
"PlannedStmt": [
{
"Plan": {
"Node Type": "Custom Scan",
"Custom Plan Provider": "Citus Adaptive",
"Parallel Aware": false,
"Async Capable": false,
"Actual Rows": 1,
"Actual Loops": 1,
"Distributed Query": {
"Job": {
"Task Count": 1,
"Tuple data received from nodes": "16 bytes",
"Tasks Shown": "All",
"Tasks": [
{
"Tuple data received from node": "16 bytes",
"Node": "host=localhost port=xxxxx dbname=regression",
"Remote Plan": [
[
{
"Plan": {
"Node Type": "ModifyTable",
"Operation": "Insert",
"Parallel Aware": false,
"Async Capable": false,
"Relation Name": "test_subplans_570038",
"Alias": "test_subplans_570038",
"Actual Rows": 1,
"Actual Loops": 1,
"Plans": [
{
"Node Type": "Result",
"Parent Relationship": "Outer",
"Parallel Aware": false,
"Async Capable": false,
"Actual Rows": 1,
"Actual Loops": 1
}
]
},
"Triggers": [
]
}
]
]
}
]
}
}
},
"Triggers": [
]
}
]
}
],
"Job": {
"Task Count": 1,
"Tuple data received from nodes": "8 bytes",
"Tasks Shown": "All",
"Tasks": [
{
"Tuple data received from node": "8 bytes",
"Node": "host=localhost port=xxxxx dbname=regression",
"Remote Plan": [
[
{
"Plan": {
"Node Type": "Function Scan",
"Parallel Aware": false,
"Async Capable": false,
"Function Name": "read_intermediate_result",
"Alias": "intermediate_result",
"Actual Rows": 1,
"Actual Loops": 1
},
"Triggers": [
]
}
]
]
}
]
}
}
},
"Triggers": [
]
}
]
-- Only one row must exist
SELECT * FROM test_subplans;
1|2
-- check when auto explain + analyze is enabled, we do not allow local execution.
CREATE SCHEMA test_auto_explain;
SET search_path TO 'test_auto_explain';

View File

@ -1503,7 +1503,9 @@ ALTER EXTENSION citus UPDATE TO '13.2-1';
SELECT * FROM multi_extension.print_extension_changes();
previous_object | current_object
---------------------------------------------------------------------
(0 rows)
function worker_last_saved_explain_analyze() TABLE(explain_analyze_output text, execution_duration double precision) |
| function worker_last_saved_explain_analyze() TABLE(explain_analyze_output text, execution_duration double precision, execution_ntuples double precision, execution_nloops double precision)
(2 rows)
DROP TABLE multi_extension.prev_objects, multi_extension.extension_diff;
-- show running version

View File

@ -721,13 +721,11 @@ CALL exec_query_and_check_query_counters($$
0, 0
);
-- same with explain analyze
--
-- this time, query_execution_multi_shard is incremented twice because of #4212
CALL exec_query_and_check_query_counters($$
EXPLAIN (ANALYZE)
SELECT * FROM (SELECT * FROM dist_table OFFSET 0) q
$$,
1, 2
1, 1
);
CALL exec_query_and_check_query_counters($$
DELETE FROM dist_table WHERE a = 1
@ -1041,9 +1039,6 @@ PL/pgSQL function exec_query_and_check_query_counters(text,bigint,bigint) line X
-- A similar one but without the insert, so we would normally expect 2 increments
-- for query_execution_single_shard and 2 for query_execution_multi_shard instead
-- of 3 since the insert is not there anymore.
--
-- But this time we observe more counter increments because we execute the subplans
-- twice because of #4212.
CALL exec_query_and_check_query_counters($$
EXPLAIN (ANALYZE)
-- single-shard subplan (whole cte)
@ -1057,7 +1052,7 @@ CALL exec_query_and_check_query_counters($$
FROM (SELECT * FROM dist_table_1 ORDER BY a LIMIT 16) q -- multi-shard subplan (subquery q)
JOIN cte ON q.a = cte.a
$$,
3, 4
2, 2
);
-- safe to push-down
CALL exec_query_and_check_query_counters($$

View File

@ -1166,6 +1166,32 @@ PREPARE q2(int_wrapper_type) AS WITH a AS (UPDATE tbl SET b = $1 WHERE a = 1 RET
EXPLAIN (COSTS false) EXECUTE q2('(1)');
EXPLAIN :default_analyze_flags EXECUTE q2('(1)');
-- EXPLAIN ANALYZE shouldn't execute SubPlans twice (bug #4212)
SET search_path TO multi_explain;
CREATE TABLE test_subplans (x int primary key, y int);
SELECT create_distributed_table('test_subplans','x');
EXPLAIN (COSTS off, ANALYZE on, TIMING off, SUMMARY off)
WITH a AS (INSERT INTO test_subplans VALUES (1,2) RETURNING *)
SELECT * FROM a;
-- Only one row must exist
SELECT * FROM test_subplans;
-- Will fail with duplicate pk
EXPLAIN (COSTS off, ANALYZE on, TIMING off, SUMMARY off)
WITH a AS (INSERT INTO test_subplans VALUES (1,2) RETURNING *)
SELECT * FROM a;
-- Test JSON format
TRUNCATE test_subplans;
EXPLAIN (FORMAT JSON, COSTS off, ANALYZE on, TIMING off, SUMMARY off)
WITH a AS (INSERT INTO test_subplans VALUES (1,2) RETURNING *)
SELECT * FROM a;
-- Only one row must exist
SELECT * FROM test_subplans;
-- check when auto explain + analyze is enabled, we do not allow local execution.
CREATE SCHEMA test_auto_explain;
SET search_path TO 'test_auto_explain';

View File

@ -476,13 +476,11 @@ CALL exec_query_and_check_query_counters($$
);
-- same with explain analyze
--
-- this time, query_execution_multi_shard is incremented twice because of #4212
CALL exec_query_and_check_query_counters($$
EXPLAIN (ANALYZE)
SELECT * FROM (SELECT * FROM dist_table OFFSET 0) q
$$,
1, 2
1, 1
);
CALL exec_query_and_check_query_counters($$
@ -807,9 +805,6 @@ CALL exec_query_and_check_query_counters($$
-- A similar one but without the insert, so we would normally expect 2 increments
-- for query_execution_single_shard and 2 for query_execution_multi_shard instead
-- of 3 since the insert is not there anymore.
--
-- But this time we observe more counter increments because we execute the subplans
-- twice because of #4212.
CALL exec_query_and_check_query_counters($$
EXPLAIN (ANALYZE)
-- single-shard subplan (whole cte)
@ -823,7 +818,7 @@ CALL exec_query_and_check_query_counters($$
FROM (SELECT * FROM dist_table_1 ORDER BY a LIMIT 16) q -- multi-shard subplan (subquery q)
JOIN cte ON q.a = cte.a
$$,
3, 4
2, 2
);
-- safe to push-down