mirror of https://github.com/citusdata/citus.git
Merge 86b6c46b2f
into 55a0d1f730
commit
d7c7bff5c8
|
@ -630,6 +630,7 @@ typedef struct TaskPlacementExecution
|
|||
instr_time endTime;
|
||||
} TaskPlacementExecution;
|
||||
|
||||
extern MemoryContext SubPlanExplainAnalyzeContext;
|
||||
|
||||
/* local functions */
|
||||
static DistributedExecution * CreateDistributedExecution(RowModifyLevel modLevel,
|
||||
|
@ -760,7 +761,7 @@ AdaptiveExecutorPreExecutorRun(CitusScanState *scanState)
|
|||
*/
|
||||
LockPartitionsForDistributedPlan(distributedPlan);
|
||||
|
||||
ExecuteSubPlans(distributedPlan);
|
||||
ExecuteSubPlans(distributedPlan, RequestedForExplainAnalyze(scanState));
|
||||
|
||||
scanState->finishedPreScan = true;
|
||||
}
|
||||
|
@ -808,7 +809,13 @@ AdaptiveExecutor(CitusScanState *scanState)
|
|||
|
||||
bool localExecutionSupported = true;
|
||||
|
||||
if (RequestedForExplainAnalyze(scanState))
|
||||
/*
|
||||
* When running a distributed plan—either the root plan or a subplan’s
|
||||
* distributed fragment—we need to know if we’re under EXPLAIN ANALYZE.
|
||||
* Subplans can’t receive the EXPLAIN ANALYZE flag directly, so we use
|
||||
* SubPlanExplainAnalyzeContext as a flag to indicate that context.
|
||||
*/
|
||||
if (RequestedForExplainAnalyze(scanState) || SubPlanExplainAnalyzeContext)
|
||||
{
|
||||
/*
|
||||
* We use multiple queries per task in EXPLAIN ANALYZE which need to
|
||||
|
|
|
@ -121,7 +121,7 @@ NonPushableInsertSelectExecScan(CustomScanState *node)
|
|||
bool binaryFormat =
|
||||
CanUseBinaryCopyFormatForTargetList(selectQuery->targetList);
|
||||
|
||||
ExecuteSubPlans(distSelectPlan);
|
||||
ExecuteSubPlans(distSelectPlan, false);
|
||||
|
||||
/*
|
||||
* We have a separate directory for each transaction, so choosing
|
||||
|
|
|
@ -132,7 +132,7 @@ ExecuteSourceAtWorkerAndRepartition(CitusScanState *scanState)
|
|||
ereport(DEBUG1, (errmsg("Executing subplans of the source query and "
|
||||
"storing the results at the respective node(s)")));
|
||||
|
||||
ExecuteSubPlans(distSourcePlan);
|
||||
ExecuteSubPlans(distSourcePlan, false);
|
||||
|
||||
/*
|
||||
* We have a separate directory for each transaction, so choosing
|
||||
|
|
|
@ -30,13 +30,21 @@ int MaxIntermediateResult = 1048576; /* maximum size in KB the intermediate resu
|
|||
/* when this is true, we enforce intermediate result size limit in all executors */
|
||||
int SubPlanLevel = 0;
|
||||
|
||||
/*
|
||||
* SubPlanExplainAnalyzeContext is both a memory context for storing
|
||||
* subplans’ EXPLAIN ANALYZE output and a flag indicating that execution
|
||||
* is running under EXPLAIN ANALYZE for subplans.
|
||||
*/
|
||||
MemoryContext SubPlanExplainAnalyzeContext = NULL;
|
||||
SubPlanExplainOutput *SubPlanExplainAnalyzeOutput;
|
||||
extern int NumTasksOutput;
|
||||
|
||||
/*
|
||||
* ExecuteSubPlans executes a list of subplans from a distributed plan
|
||||
* by sequentially executing each plan from the top.
|
||||
*/
|
||||
void
|
||||
ExecuteSubPlans(DistributedPlan *distributedPlan)
|
||||
ExecuteSubPlans(DistributedPlan *distributedPlan, bool explainAnalyzeEnabled)
|
||||
{
|
||||
uint64 planId = distributedPlan->planId;
|
||||
List *subPlanList = distributedPlan->subPlanList;
|
||||
|
@ -47,6 +55,15 @@ ExecuteSubPlans(DistributedPlan *distributedPlan)
|
|||
return;
|
||||
}
|
||||
|
||||
/*
|
||||
* If the root DistributedPlan has EXPLAIN ANALYZE enabled,
|
||||
* its subplans should also have EXPLAIN ANALYZE enabled.
|
||||
*/
|
||||
if (explainAnalyzeEnabled)
|
||||
{
|
||||
SubPlanExplainAnalyzeContext = GetMemoryChunkContext(distributedPlan);
|
||||
}
|
||||
|
||||
HTAB *intermediateResultsHash = MakeIntermediateResultHTAB();
|
||||
RecordSubplanExecutionsOnNodes(intermediateResultsHash, distributedPlan);
|
||||
|
||||
|
@ -61,6 +78,13 @@ ExecuteSubPlans(DistributedPlan *distributedPlan)
|
|||
DistributedSubPlan *subPlan = NULL;
|
||||
foreach_declared_ptr(subPlan, subPlanList)
|
||||
{
|
||||
/*
|
||||
* Save the EXPLAIN ANALYZE output(s) to be extracted later
|
||||
* in ExplainSubPlans()
|
||||
*/
|
||||
MemSet(subPlan->totalExplainOutput, 0, sizeof(subPlan->totalExplainOutput));
|
||||
SubPlanExplainAnalyzeOutput = subPlan->totalExplainOutput;
|
||||
|
||||
PlannedStmt *plannedStmt = subPlan->plan;
|
||||
uint32 subPlanId = subPlan->subPlanId;
|
||||
ParamListInfo params = NULL;
|
||||
|
@ -98,6 +122,13 @@ ExecuteSubPlans(DistributedPlan *distributedPlan)
|
|||
subPlan->writeLocalFile = entry->writeLocalFile;
|
||||
|
||||
SubPlanLevel--;
|
||||
subPlan->numTasksOutput = NumTasksOutput;
|
||||
NumTasksOutput = 0;
|
||||
FreeExecutorState(estate);
|
||||
}
|
||||
|
||||
if (explainAnalyzeEnabled)
|
||||
{
|
||||
SubPlanExplainAnalyzeContext = NULL;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -78,6 +78,7 @@
|
|||
bool ExplainDistributedQueries = true;
|
||||
bool ExplainAllTasks = false;
|
||||
int ExplainAnalyzeSortMethod = EXPLAIN_ANALYZE_SORT_BY_TIME;
|
||||
extern MemoryContext SubPlanExplainAnalyzeContext;
|
||||
|
||||
/*
|
||||
* If enabled, EXPLAIN ANALYZE output & other statistics of last worker task
|
||||
|
@ -85,6 +86,8 @@ int ExplainAnalyzeSortMethod = EXPLAIN_ANALYZE_SORT_BY_TIME;
|
|||
*/
|
||||
static char *SavedExplainPlan = NULL;
|
||||
static double SavedExecutionDurationMillisec = 0.0;
|
||||
extern SubPlanExplainOutput *SubPlanExplainAnalyzeOutput;
|
||||
int NumTasksOutput = 0;
|
||||
|
||||
/* struct to save explain flags */
|
||||
typedef struct
|
||||
|
@ -251,6 +254,7 @@ static double elapsed_time(instr_time *starttime);
|
|||
static void ExplainPropertyBytes(const char *qlabel, int64 bytes, ExplainState *es);
|
||||
static uint64 TaskReceivedTupleData(Task *task);
|
||||
static bool ShowReceivedTupleData(CitusScanState *scanState, ExplainState *es);
|
||||
static void ParseExplainAnalyzeOutput(char *explainOutput, Instrumentation *instr);
|
||||
|
||||
|
||||
/* exports for SQL callable functions */
|
||||
|
@ -453,25 +457,9 @@ ExplainSubPlans(DistributedPlan *distributedPlan, ExplainState *es)
|
|||
* for now we put an empty string, which is valid according to postgres.
|
||||
*/
|
||||
char *queryString = pstrdup("");
|
||||
instr_time planduration;
|
||||
BufferUsage bufusage_start,
|
||||
bufusage;
|
||||
|
||||
#if PG_VERSION_NUM >= PG_VERSION_17
|
||||
MemoryContextCounters mem_counters;
|
||||
MemoryContext planner_ctx = NULL;
|
||||
MemoryContext saved_ctx = NULL;
|
||||
|
||||
if (es->memory)
|
||||
{
|
||||
/* copy paste from postgres code */
|
||||
planner_ctx = AllocSetContextCreate(CurrentMemoryContext,
|
||||
"explain analyze planner context",
|
||||
ALLOCSET_DEFAULT_SIZES);
|
||||
saved_ctx = MemoryContextSwitchTo(planner_ctx);
|
||||
}
|
||||
#endif
|
||||
|
||||
if (es->buffers)
|
||||
{
|
||||
bufusage_start = pgBufferUsage;
|
||||
|
@ -518,8 +506,6 @@ ExplainSubPlans(DistributedPlan *distributedPlan, ExplainState *es)
|
|||
ExplainPropertyText("Result destination", destination->data, es);
|
||||
}
|
||||
|
||||
INSTR_TIME_SET_ZERO(planduration);
|
||||
|
||||
/* calc differences of buffer counters. */
|
||||
if (es->buffers)
|
||||
{
|
||||
|
@ -529,21 +515,100 @@ ExplainSubPlans(DistributedPlan *distributedPlan, ExplainState *es)
|
|||
|
||||
ExplainOpenGroup("PlannedStmt", "PlannedStmt", false, es);
|
||||
|
||||
#if PG_VERSION_NUM >= PG_VERSION_17
|
||||
if (es->memory)
|
||||
/* Print only and not execute */
|
||||
DestReceiver *dest;
|
||||
if (into)
|
||||
{
|
||||
MemoryContextSwitchTo(saved_ctx);
|
||||
MemoryContextMemConsumed(planner_ctx, &mem_counters);
|
||||
dest = CreateIntoRelDestReceiver(into);
|
||||
}
|
||||
else
|
||||
{
|
||||
dest = None_Receiver;
|
||||
}
|
||||
|
||||
ExplainOnePlan(plan, into, es, queryString, params, NULL, &planduration,
|
||||
(es->buffers ? &bufusage : NULL),
|
||||
(es->memory ? &mem_counters : NULL));
|
||||
#else
|
||||
ExplainOnePlan(plan, into, es, queryString, params, NULL, &planduration,
|
||||
(es->buffers ? &bufusage : NULL));
|
||||
#endif
|
||||
int instrument_option = 0;
|
||||
|
||||
if (es->analyze && es->timing)
|
||||
{
|
||||
instrument_option |= INSTRUMENT_TIMER;
|
||||
}
|
||||
else if (es->analyze)
|
||||
{
|
||||
instrument_option |= INSTRUMENT_ROWS;
|
||||
}
|
||||
|
||||
if (es->buffers)
|
||||
{
|
||||
instrument_option |= INSTRUMENT_BUFFERS;
|
||||
}
|
||||
if (es->wal)
|
||||
{
|
||||
instrument_option |= INSTRUMENT_WAL;
|
||||
}
|
||||
|
||||
/* Create a QueryDesc for the query */
|
||||
QueryDesc *queryDesc =
|
||||
CreateQueryDesc(plan, queryString, GetActiveSnapshot(),
|
||||
InvalidSnapshot, dest, params, NULL, instrument_option);
|
||||
|
||||
ExecutorStart(queryDesc, EXEC_FLAG_EXPLAIN_ONLY);
|
||||
|
||||
/* Inject the earlier executed results into the newly created tasks */
|
||||
|
||||
if (subPlan->numTasksOutput && (queryDesc->planstate != NULL) &&
|
||||
IsA(queryDesc->planstate, CustomScanState))
|
||||
{
|
||||
DistributedPlan *newdistributedPlan =
|
||||
((CitusScanState *) queryDesc->planstate)->distributedPlan;
|
||||
|
||||
ListCell *lc;
|
||||
int idx = 0;
|
||||
|
||||
/* We need to extract this from the explain output of workers */
|
||||
Instrumentation instr = { 0 };
|
||||
foreach(lc, newdistributedPlan->workerJob->taskList)
|
||||
{
|
||||
if (subPlan->totalExplainOutput[idx].explainOutput &&
|
||||
idx < subPlan->numTasksOutput)
|
||||
{
|
||||
/*
|
||||
* Now feed the earlier saved output, which will be used
|
||||
* by RemoteExplain() when printing tasks
|
||||
*/
|
||||
Task *task = (Task *) lfirst(lc);
|
||||
MemoryContext taskContext = GetMemoryChunkContext(task);
|
||||
|
||||
task->totalReceivedTupleData =
|
||||
subPlan->totalExplainOutput[idx].totalReceivedTupleData;
|
||||
task->fetchedExplainAnalyzeExecutionDuration =
|
||||
subPlan->totalExplainOutput[idx].executionDuration;
|
||||
task->fetchedExplainAnalyzePlan =
|
||||
MemoryContextStrdup(taskContext,
|
||||
subPlan->totalExplainOutput[idx].explainOutput);
|
||||
ParseExplainAnalyzeOutput(task->fetchedExplainAnalyzePlan, &instr);
|
||||
|
||||
subPlan->totalExplainOutput[idx].explainOutput = NULL;
|
||||
}
|
||||
|
||||
idx++;
|
||||
}
|
||||
instr.nloops = 1;
|
||||
queryDesc->planstate->instrument = &instr;
|
||||
}
|
||||
|
||||
ExplainOpenGroup("Query", NULL, true, es);
|
||||
|
||||
ExplainPrintPlan(es, queryDesc);
|
||||
|
||||
if (es->analyze)
|
||||
{
|
||||
ExplainPrintTriggers(es, queryDesc);
|
||||
}
|
||||
|
||||
ExecutorEnd(queryDesc);
|
||||
FreeQueryDesc(queryDesc);
|
||||
|
||||
ExplainCloseGroup("Query", NULL, true, es);
|
||||
ExplainCloseGroup("PlannedStmt", "PlannedStmt", false, es);
|
||||
ExplainCloseGroup("Subplan", NULL, true, es);
|
||||
|
||||
|
@ -1621,6 +1686,11 @@ ExplainAnalyzeDestPutTuple(TupleDestination *self, Task *task,
|
|||
originalTupDest->putTuple(originalTupDest, task, placementIndex, 0, heapTuple,
|
||||
tupleLibpqSize);
|
||||
tupleDestination->originalTask->totalReceivedTupleData += tupleLibpqSize;
|
||||
if (SubPlanExplainAnalyzeContext && NumTasksOutput < MAX_ANALYZE_OUTPUT)
|
||||
{
|
||||
SubPlanExplainAnalyzeOutput[NumTasksOutput].totalReceivedTupleData =
|
||||
tupleDestination->originalTask->totalReceivedTupleData;
|
||||
}
|
||||
}
|
||||
else if (queryNumber == 1)
|
||||
{
|
||||
|
@ -1670,6 +1740,17 @@ ExplainAnalyzeDestPutTuple(TupleDestination *self, Task *task,
|
|||
placementIndex;
|
||||
tupleDestination->originalTask->fetchedExplainAnalyzeExecutionDuration =
|
||||
fetchedExplainAnalyzeExecutionDuration;
|
||||
|
||||
/* We should build tupleDestination in subPlan similar to the above */
|
||||
if (SubPlanExplainAnalyzeContext && NumTasksOutput < MAX_ANALYZE_OUTPUT)
|
||||
{
|
||||
SubPlanExplainAnalyzeOutput[NumTasksOutput].explainOutput =
|
||||
MemoryContextStrdup(SubPlanExplainAnalyzeContext,
|
||||
fetchedExplainAnalyzePlan);
|
||||
SubPlanExplainAnalyzeOutput[NumTasksOutput].executionDuration =
|
||||
fetchedExplainAnalyzeExecutionDuration;
|
||||
NumTasksOutput++;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
|
@ -2248,6 +2329,94 @@ elapsed_time(instr_time *starttime)
|
|||
}
|
||||
|
||||
|
||||
static void
|
||||
ParseExplainAnalyzeOutput(char *explainOutput, Instrumentation *instr)
|
||||
{
|
||||
char *token, *saveptr;
|
||||
|
||||
/* Validate input */
|
||||
if (explainOutput == NULL || instr == NULL)
|
||||
return;
|
||||
|
||||
char *line = pstrdup(explainOutput);
|
||||
|
||||
bool inWal = false;
|
||||
bool inResult = false;
|
||||
|
||||
/* split on spaces, parentheses or newlines */
|
||||
for (token = strtok_r(line, " ()\n", &saveptr);
|
||||
token != NULL;
|
||||
token = strtok_r(NULL, " ()\n", &saveptr))
|
||||
{
|
||||
if (strcmp(token, "WAL:") == 0)
|
||||
{
|
||||
inWal = true;
|
||||
continue;
|
||||
}
|
||||
|
||||
if (strcmp(token, "Result") == 0)
|
||||
{
|
||||
inResult = true;
|
||||
continue;
|
||||
}
|
||||
|
||||
/* Reset Result flag when we see "actual" - but only skip if we're in Result mode */
|
||||
if (strcmp(token, "actual") == 0)
|
||||
{
|
||||
/* If we were in Result mode, the next tokens should be skipped */
|
||||
/* If we weren't in Result mode, continue normally */
|
||||
continue;
|
||||
}
|
||||
|
||||
if (inWal)
|
||||
{
|
||||
if (strncmp(token, "records=", 8) == 0)
|
||||
instr->walusage.wal_records += strtoul(token + 8, NULL, 10);
|
||||
else if (strncmp(token, "bytes=", 6) == 0)
|
||||
{
|
||||
instr->walusage.wal_bytes += strtoul(token + 6, NULL, 10);
|
||||
/* once we've seen bytes=, we can leave WAL mode */
|
||||
inWal = false;
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
/* Skip Result node's actual timing data */
|
||||
if (inResult)
|
||||
{
|
||||
if (strncmp(token, "time=", 5) == 0 ||
|
||||
strncmp(token, "rows=", 5) == 0 ||
|
||||
strncmp(token, "loops=", 6) == 0)
|
||||
{
|
||||
/* If this is loops=, we've seen all Result data */
|
||||
if (strncmp(token, "loops=", 6) == 0)
|
||||
inResult = false;
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
if (strncmp(token, "time=", 5) == 0)
|
||||
{
|
||||
/* token is "time=X..Y" */
|
||||
char *timeStr = token + 5;
|
||||
char *doubleDot = strstr(timeStr, "..");
|
||||
if (doubleDot)
|
||||
{
|
||||
*doubleDot = '\0';
|
||||
instr->startup += strtod(timeStr, NULL) / 1000.0;
|
||||
instr->total += strtod(doubleDot + 2, NULL) / 1000.0;
|
||||
}
|
||||
}
|
||||
else if (strncmp(token, "rows=", 5) == 0)
|
||||
{
|
||||
instr->ntuples += strtol(token + 5, NULL, 10);
|
||||
break; /* We are done for this Task */
|
||||
}
|
||||
}
|
||||
|
||||
pfree(line);
|
||||
}
|
||||
|
||||
#if PG_VERSION_NUM >= PG_VERSION_17
|
||||
/*
|
||||
* Return whether show_buffer_usage would have anything to print, if given
|
||||
|
|
|
@ -147,6 +147,27 @@ CopyNodeDistributedSubPlan(COPYFUNC_ARGS)
|
|||
|
||||
COPY_SCALAR_FIELD(subPlanId);
|
||||
COPY_NODE_FIELD(plan);
|
||||
COPY_SCALAR_FIELD(bytesSentPerWorker);
|
||||
COPY_SCALAR_FIELD(remoteWorkerCount);
|
||||
COPY_SCALAR_FIELD(durationMillisecs);
|
||||
COPY_SCALAR_FIELD(writeLocalFile);
|
||||
|
||||
MemSet(newnode->totalExplainOutput, 0, sizeof(newnode->totalExplainOutput));
|
||||
|
||||
/* copy each SubPlanExplainOutput element */
|
||||
for (int i = 0; i < MAX_ANALYZE_OUTPUT; i++)
|
||||
{
|
||||
/* copy the explainOutput string pointer */
|
||||
COPY_STRING_FIELD(totalExplainOutput[i].explainOutput);
|
||||
|
||||
/* copy the executionDuration (double) */
|
||||
COPY_SCALAR_FIELD(totalExplainOutput[i].executionDuration);
|
||||
|
||||
/* copy the totalReceivedTupleData (uint64) */
|
||||
COPY_SCALAR_FIELD(totalExplainOutput[i].totalReceivedTupleData);
|
||||
}
|
||||
|
||||
COPY_SCALAR_FIELD(numTasksOutput);
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -215,6 +215,46 @@ OutDistributedSubPlan(OUTFUNC_ARGS)
|
|||
|
||||
WRITE_UINT_FIELD(subPlanId);
|
||||
WRITE_NODE_FIELD(plan);
|
||||
WRITE_UINT64_FIELD(bytesSentPerWorker);
|
||||
WRITE_INT_FIELD(remoteWorkerCount);
|
||||
WRITE_FLOAT_FIELD(durationMillisecs, "%.2f");
|
||||
WRITE_BOOL_FIELD(writeLocalFile);
|
||||
|
||||
appendStringInfoString(str, " totalExplainOutput [");
|
||||
for (int i = 0; i < MAX_ANALYZE_OUTPUT; i++)
|
||||
{
|
||||
const SubPlanExplainOutput *e = &node->totalExplainOutput[i];
|
||||
|
||||
/* skip empty slots */
|
||||
if (e->explainOutput == NULL &&
|
||||
e->executionDuration == 0
|
||||
&& e->totalReceivedTupleData == 0)
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
if (i > 0)
|
||||
{
|
||||
appendStringInfoChar(str, ' ');
|
||||
}
|
||||
|
||||
appendStringInfoChar(str, '(');
|
||||
|
||||
/* string pointer – prints quoted or NULL */
|
||||
WRITE_STRING_FIELD(totalExplainOutput[i].explainOutput);
|
||||
|
||||
/* double field */
|
||||
WRITE_FLOAT_FIELD(totalExplainOutput[i].executionDuration, "%.2f");
|
||||
|
||||
/* 64-bit unsigned – use the uint64 macro */
|
||||
WRITE_UINT64_FIELD(totalExplainOutput[i].totalReceivedTupleData);
|
||||
|
||||
appendStringInfoChar(str, ')');
|
||||
}
|
||||
|
||||
WRITE_INT_FIELD(numTasksOutput);
|
||||
|
||||
appendStringInfoChar(str, ']');
|
||||
}
|
||||
|
||||
void
|
||||
|
|
|
@ -28,6 +28,7 @@
|
|||
|
||||
#define CURSOR_OPT_FORCE_DISTRIBUTED 0x080000
|
||||
|
||||
#define MAX_ANALYZE_OUTPUT 32
|
||||
|
||||
/* level of planner calls */
|
||||
extern int PlannerLevel;
|
||||
|
|
|
@ -474,6 +474,17 @@ typedef struct DistributedPlan
|
|||
} DistributedPlan;
|
||||
|
||||
|
||||
/*
|
||||
*
|
||||
*/
|
||||
typedef struct SubPlanExplainOutput
|
||||
{
|
||||
char *explainOutput;
|
||||
double executionDuration;
|
||||
uint64 totalReceivedTupleData;
|
||||
} SubPlanExplainOutput;
|
||||
|
||||
|
||||
/*
|
||||
* DistributedSubPlan contains a subplan of a distributed plan. Subplans are
|
||||
* executed before the distributed query and their results are written to
|
||||
|
@ -492,6 +503,8 @@ typedef struct DistributedSubPlan
|
|||
uint32 remoteWorkerCount;
|
||||
double durationMillisecs;
|
||||
bool writeLocalFile;
|
||||
SubPlanExplainOutput totalExplainOutput[MAX_ANALYZE_OUTPUT];
|
||||
uint32 numTasksOutput;
|
||||
} DistributedSubPlan;
|
||||
|
||||
|
||||
|
|
|
@ -17,7 +17,7 @@
|
|||
extern int MaxIntermediateResult;
|
||||
extern int SubPlanLevel;
|
||||
|
||||
extern void ExecuteSubPlans(DistributedPlan *distributedPlan);
|
||||
extern void ExecuteSubPlans(DistributedPlan *distributedPlan, bool explainAnalyzeEnabled);
|
||||
|
||||
/**
|
||||
* IntermediateResultsHashEntry is used to store which nodes need to receive
|
||||
|
|
|
@ -2492,12 +2492,12 @@ Custom Scan (Citus Adaptive) (actual rows=1 loops=1)
|
|||
-> Distributed Subplan XXX_1
|
||||
Intermediate Data Size: 100 bytes
|
||||
Result destination: Write locally
|
||||
-> Custom Scan (Citus Adaptive) (actual rows=20 loops=1)
|
||||
-> Custom Scan (Citus Adaptive) (actual rows=10 loops=1)
|
||||
Task Count: 4
|
||||
Tuple data received from nodes: 160 bytes
|
||||
Tuple data received from nodes: 80 bytes
|
||||
Tasks Shown: One of 4
|
||||
-> Task
|
||||
Tuple data received from node: 64 bytes
|
||||
Tuple data received from node: 32 bytes
|
||||
Node: host=localhost port=xxxxx dbname=regression
|
||||
-> Insert on dist_table_570017 citus_table_alias (actual rows=8 loops=1)
|
||||
-> Seq Scan on dist_table_570017 dist_table (actual rows=8 loops=1)
|
||||
|
|
|
@ -721,13 +721,11 @@ CALL exec_query_and_check_query_counters($$
|
|||
0, 0
|
||||
);
|
||||
-- same with explain analyze
|
||||
--
|
||||
-- this time, query_execution_multi_shard is incremented twice because of #4212
|
||||
CALL exec_query_and_check_query_counters($$
|
||||
EXPLAIN (ANALYZE)
|
||||
SELECT * FROM (SELECT * FROM dist_table OFFSET 0) q
|
||||
$$,
|
||||
1, 2
|
||||
1, 1
|
||||
);
|
||||
CALL exec_query_and_check_query_counters($$
|
||||
DELETE FROM dist_table WHERE a = 1
|
||||
|
@ -1041,9 +1039,6 @@ PL/pgSQL function exec_query_and_check_query_counters(text,bigint,bigint) line X
|
|||
-- A similar one but without the insert, so we would normally expect 2 increments
|
||||
-- for query_execution_single_shard and 2 for query_execution_multi_shard instead
|
||||
-- of 3 since the insert is not there anymore.
|
||||
--
|
||||
-- But this time we observe more counter increments because we execute the subplans
|
||||
-- twice because of #4212.
|
||||
CALL exec_query_and_check_query_counters($$
|
||||
EXPLAIN (ANALYZE)
|
||||
-- single-shard subplan (whole cte)
|
||||
|
@ -1057,7 +1052,7 @@ CALL exec_query_and_check_query_counters($$
|
|||
FROM (SELECT * FROM dist_table_1 ORDER BY a LIMIT 16) q -- multi-shard subplan (subquery q)
|
||||
JOIN cte ON q.a = cte.a
|
||||
$$,
|
||||
3, 4
|
||||
2, 2
|
||||
);
|
||||
-- safe to push-down
|
||||
CALL exec_query_and_check_query_counters($$
|
||||
|
|
|
@ -476,13 +476,11 @@ CALL exec_query_and_check_query_counters($$
|
|||
);
|
||||
|
||||
-- same with explain analyze
|
||||
--
|
||||
-- this time, query_execution_multi_shard is incremented twice because of #4212
|
||||
CALL exec_query_and_check_query_counters($$
|
||||
EXPLAIN (ANALYZE)
|
||||
SELECT * FROM (SELECT * FROM dist_table OFFSET 0) q
|
||||
$$,
|
||||
1, 2
|
||||
1, 1
|
||||
);
|
||||
|
||||
CALL exec_query_and_check_query_counters($$
|
||||
|
@ -807,9 +805,6 @@ CALL exec_query_and_check_query_counters($$
|
|||
-- A similar one but without the insert, so we would normally expect 2 increments
|
||||
-- for query_execution_single_shard and 2 for query_execution_multi_shard instead
|
||||
-- of 3 since the insert is not there anymore.
|
||||
--
|
||||
-- But this time we observe more counter increments because we execute the subplans
|
||||
-- twice because of #4212.
|
||||
CALL exec_query_and_check_query_counters($$
|
||||
EXPLAIN (ANALYZE)
|
||||
-- single-shard subplan (whole cte)
|
||||
|
@ -823,7 +818,7 @@ CALL exec_query_and_check_query_counters($$
|
|||
FROM (SELECT * FROM dist_table_1 ORDER BY a LIMIT 16) q -- multi-shard subplan (subquery q)
|
||||
JOIN cte ON q.a = cte.a
|
||||
$$,
|
||||
3, 4
|
||||
2, 2
|
||||
);
|
||||
|
||||
-- safe to push-down
|
||||
|
|
Loading…
Reference in New Issue