diff --git a/src/backend/distributed/executor/subplan_execution.c b/src/backend/distributed/executor/subplan_execution.c index 10e0fe470..033a21b2b 100644 --- a/src/backend/distributed/executor/subplan_execution.c +++ b/src/backend/distributed/executor/subplan_execution.c @@ -122,6 +122,8 @@ ExecuteSubPlans(DistributedPlan *distributedPlan, bool explainAnalyzeEnabled) subPlan->writeLocalFile = entry->writeLocalFile; SubPlanLevel--; + subPlan->numTasksOutput = NumTasksOutput; + NumTasksOutput = 0; FreeExecutorState(estate); } diff --git a/src/backend/distributed/planner/multi_explain.c b/src/backend/distributed/planner/multi_explain.c index f97887de4..93d1cbba7 100644 --- a/src/backend/distributed/planner/multi_explain.c +++ b/src/backend/distributed/planner/multi_explain.c @@ -301,7 +301,6 @@ CitusExplainScan(CustomScanState *node, List *ancestors, struct ExplainState *es if (distributedPlan->subPlanList != NIL) { ExplainSubPlans(distributedPlan, es); - NumTasksOutput = 0; } ExplainJob(scanState, distributedPlan->workerJob, es, params); @@ -556,7 +555,7 @@ ExplainSubPlans(DistributedPlan *distributedPlan, ExplainState *es) /* Inject the earlier executed results into the newly created tasks */ - if (NumTasksOutput && (queryDesc->planstate != NULL) && + if (subPlan->numTasksOutput && (queryDesc->planstate != NULL) && IsA(queryDesc->planstate, CustomScanState)) { DistributedPlan *newdistributedPlan = @@ -570,7 +569,7 @@ ExplainSubPlans(DistributedPlan *distributedPlan, ExplainState *es) foreach(lc, newdistributedPlan->workerJob->taskList) { if (subPlan->totalExplainOutput[idx].explainOutput && - idx < NumTasksOutput) + idx < subPlan->numTasksOutput) { /* * Now feed the earlier saved output, which will be used @@ -593,6 +592,7 @@ ExplainSubPlans(DistributedPlan *distributedPlan, ExplainState *es) idx++; } + instr.nloops = 1; queryDesc->planstate->instrument = &instr; } @@ -2332,104 +2332,90 @@ elapsed_time(instr_time *starttime) static void ParseExplainAnalyzeOutput(char *explainOutput, Instrumentation *instr) { - char *line = pstrdup(explainOutput); - char *token, - *saveptr; - bool in_wal = false; + char *token, *saveptr; - /* split on spaces, parentheses or newlines */ - for (token = strtok_r(line, " ()\n", &saveptr); - token != NULL; - token = strtok_r(NULL, " ()\n", &saveptr)) - { - if (strcmp(token, "WAL:") == 0) - { - in_wal = true; - continue; - } + /* Validate input */ + if (explainOutput == NULL || instr == NULL) + return; - if (in_wal) - { - if (strncmp(token, "records=", 8) == 0) - instr->walusage.wal_records = - strtoul(token + 8, NULL, 10); - else if (strncmp(token, "bytes=", 6) == 0) - { - instr->walusage.wal_bytes = - strtoul(token + 6, NULL, 10); - /* once we’ve seen bytes=, we can leave WAL mode */ - in_wal = false; - } - continue; - } + char *line = pstrdup(explainOutput); - if (strncmp(token, "time=", 5) == 0) - { - /* token is "time=X..Y" */ - char *p = token + 5; - char *dd = strstr(p, ".."); + bool inWal = false; + bool inResult = false; - if (dd) - { - *dd = '\0'; - instr->startup += strtod(p, NULL) / 1000.0; - instr->total += strtod(dd + 2, NULL) / 1000.0; - } - } - else if (strncmp(token, "rows=", 5) == 0) - { - instr->ntuples += strtol(token + 5, NULL, 10); - } - else if (strncmp(token, "loops=", 6) == 0) - { - instr->nloops = strtol(token + 6, NULL, 10); - } - } - - pfree(line); -} - -#if 0 -/* - * ParseExplainAnalyzeOutput - */ -static void -ParseExplainAnalyzeOutput(char *explainOutput, Instrumentation *instr) -{ - double start_ms = 0.0, end_ms = 0.0; - int rows = 0, loops = 0; - - /* 1) Extract “actual time=XXX..YYY rows=R loops=L” */ - if (sscanf(explainOutput, "%*[^=]=%lf..%lf rows=%d loops=%d", - &start_ms, &end_ms, &rows, &loops) == 4) + /* split on spaces, parentheses or newlines */ + for (token = strtok_r(line, " ()\n", &saveptr); + token != NULL; + token = strtok_r(NULL, " ()\n", &saveptr)) { - /* times in ms, convert to seconds */ - instr->startup += start_ms / 1000.0; - instr->total += end_ms / 1000.0; - instr->ntuples += (double) rows; - instr->nloops = (double) loops; - } - else if (sscanf(explainOutput, "%*[^(\n](actual rows=%d loops=%d)", &rows, &loops) == 2) - { - /* no timing present, just capture rows & loops */ - instr->ntuples += (double) rows; - instr->nloops = (double) loops; - } - - /* 2) Look for “WAL: records=X bytes=Y” */ - const char *wal = strstr(explainOutput, "WAL:"); - if (wal) - { - int recs = 0, bytes = 0; - if (sscanf(wal, "WAL: records=%d bytes=%d", &recs, &bytes) == 2) + if (strcmp(token, "WAL:") == 0) { - instr->walusage.wal_records += recs; - instr->walusage.wal_bytes += bytes; + inWal = true; + continue; + } + + if (strcmp(token, "Result") == 0) + { + inResult = true; + continue; + } + + /* Reset Result flag when we see "actual" - but only skip if we're in Result mode */ + if (strcmp(token, "actual") == 0) + { + /* If we were in Result mode, the next tokens should be skipped */ + /* If we weren't in Result mode, continue normally */ + continue; + } + + if (inWal) + { + if (strncmp(token, "records=", 8) == 0) + instr->walusage.wal_records += strtoul(token + 8, NULL, 10); + else if (strncmp(token, "bytes=", 6) == 0) + { + instr->walusage.wal_bytes += strtoul(token + 6, NULL, 10); + /* once we've seen bytes=, we can leave WAL mode */ + inWal = false; + } + continue; + } + + /* Skip Result node's actual timing data */ + if (inResult) + { + if (strncmp(token, "time=", 5) == 0 || + strncmp(token, "rows=", 5) == 0 || + strncmp(token, "loops=", 6) == 0) + { + /* If this is loops=, we've seen all Result data */ + if (strncmp(token, "loops=", 6) == 0) + inResult = false; + continue; + } + } + + if (strncmp(token, "time=", 5) == 0) + { + /* token is "time=X..Y" */ + char *timeStr = token + 5; + char *doubleDot = strstr(timeStr, ".."); + if (doubleDot) + { + *doubleDot = '\0'; + instr->startup += strtod(timeStr, NULL) / 1000.0; + instr->total += strtod(doubleDot + 2, NULL) / 1000.0; + } + } + else if (strncmp(token, "rows=", 5) == 0) + { + instr->ntuples += strtol(token + 5, NULL, 10); + break; /* We are done for this Task */ } } -} -#endif + pfree(line); +} #if PG_VERSION_NUM >= PG_VERSION_17 /* diff --git a/src/backend/distributed/utils/citus_copyfuncs.c b/src/backend/distributed/utils/citus_copyfuncs.c index 81775c6db..d0b97de53 100644 --- a/src/backend/distributed/utils/citus_copyfuncs.c +++ b/src/backend/distributed/utils/citus_copyfuncs.c @@ -166,6 +166,8 @@ CopyNodeDistributedSubPlan(COPYFUNC_ARGS) /* copy the totalReceivedTupleData (uint64) */ COPY_SCALAR_FIELD(totalExplainOutput[i].totalReceivedTupleData); } + + COPY_SCALAR_FIELD(numTasksOutput); } diff --git a/src/backend/distributed/utils/citus_outfuncs.c b/src/backend/distributed/utils/citus_outfuncs.c index 451bc279e..aac0c60e6 100644 --- a/src/backend/distributed/utils/citus_outfuncs.c +++ b/src/backend/distributed/utils/citus_outfuncs.c @@ -252,6 +252,8 @@ OutDistributedSubPlan(OUTFUNC_ARGS) appendStringInfoChar(str, ')'); } + WRITE_INT_FIELD(numTasksOutput); + appendStringInfoChar(str, ']'); } diff --git a/src/include/distributed/multi_physical_planner.h b/src/include/distributed/multi_physical_planner.h index fbb13fb71..d6b0cc36c 100644 --- a/src/include/distributed/multi_physical_planner.h +++ b/src/include/distributed/multi_physical_planner.h @@ -504,6 +504,7 @@ typedef struct DistributedSubPlan double durationMillisecs; bool writeLocalFile; SubPlanExplainOutput totalExplainOutput[MAX_ANALYZE_OUTPUT]; + uint32 numTasksOutput; } DistributedSubPlan; diff --git a/src/test/regress/expected/multi_explain.out b/src/test/regress/expected/multi_explain.out index bfcf29c4d..91ee0ebf3 100644 --- a/src/test/regress/expected/multi_explain.out +++ b/src/test/regress/expected/multi_explain.out @@ -2492,12 +2492,12 @@ Custom Scan (Citus Adaptive) (actual rows=1 loops=1) -> Distributed Subplan XXX_1 Intermediate Data Size: 100 bytes Result destination: Write locally - -> Custom Scan (Citus Adaptive) (actual rows=20 loops=1) + -> Custom Scan (Citus Adaptive) (actual rows=10 loops=1) Task Count: 4 - Tuple data received from nodes: 160 bytes + Tuple data received from nodes: 80 bytes Tasks Shown: One of 4 -> Task - Tuple data received from node: 64 bytes + Tuple data received from node: 32 bytes Node: host=localhost port=xxxxx dbname=regression -> Insert on dist_table_570017 citus_table_alias (actual rows=8 loops=1) -> Seq Scan on dist_table_570017 dist_table (actual rows=8 loops=1)