Remove scanf

new
Teja Mupparti 2025-06-12 11:51:26 -07:00
parent 368f5267e4
commit 86b6c46b2f
6 changed files with 88 additions and 95 deletions

View File

@ -122,6 +122,8 @@ ExecuteSubPlans(DistributedPlan *distributedPlan, bool explainAnalyzeEnabled)
subPlan->writeLocalFile = entry->writeLocalFile; subPlan->writeLocalFile = entry->writeLocalFile;
SubPlanLevel--; SubPlanLevel--;
subPlan->numTasksOutput = NumTasksOutput;
NumTasksOutput = 0;
FreeExecutorState(estate); FreeExecutorState(estate);
} }

View File

@ -301,7 +301,6 @@ CitusExplainScan(CustomScanState *node, List *ancestors, struct ExplainState *es
if (distributedPlan->subPlanList != NIL) if (distributedPlan->subPlanList != NIL)
{ {
ExplainSubPlans(distributedPlan, es); ExplainSubPlans(distributedPlan, es);
NumTasksOutput = 0;
} }
ExplainJob(scanState, distributedPlan->workerJob, es, params); ExplainJob(scanState, distributedPlan->workerJob, es, params);
@ -556,7 +555,7 @@ ExplainSubPlans(DistributedPlan *distributedPlan, ExplainState *es)
/* Inject the earlier executed results into the newly created tasks */ /* Inject the earlier executed results into the newly created tasks */
if (NumTasksOutput && (queryDesc->planstate != NULL) && if (subPlan->numTasksOutput && (queryDesc->planstate != NULL) &&
IsA(queryDesc->planstate, CustomScanState)) IsA(queryDesc->planstate, CustomScanState))
{ {
DistributedPlan *newdistributedPlan = DistributedPlan *newdistributedPlan =
@ -570,7 +569,7 @@ ExplainSubPlans(DistributedPlan *distributedPlan, ExplainState *es)
foreach(lc, newdistributedPlan->workerJob->taskList) foreach(lc, newdistributedPlan->workerJob->taskList)
{ {
if (subPlan->totalExplainOutput[idx].explainOutput && if (subPlan->totalExplainOutput[idx].explainOutput &&
idx < NumTasksOutput) idx < subPlan->numTasksOutput)
{ {
/* /*
* Now feed the earlier saved output, which will be used * Now feed the earlier saved output, which will be used
@ -593,6 +592,7 @@ ExplainSubPlans(DistributedPlan *distributedPlan, ExplainState *es)
idx++; idx++;
} }
instr.nloops = 1;
queryDesc->planstate->instrument = &instr; queryDesc->planstate->instrument = &instr;
} }
@ -2332,104 +2332,90 @@ elapsed_time(instr_time *starttime)
static void static void
ParseExplainAnalyzeOutput(char *explainOutput, Instrumentation *instr) ParseExplainAnalyzeOutput(char *explainOutput, Instrumentation *instr)
{ {
char *line = pstrdup(explainOutput); char *token, *saveptr;
char *token,
*saveptr;
bool in_wal = false;
/* split on spaces, parentheses or newlines */ /* Validate input */
for (token = strtok_r(line, " ()\n", &saveptr); if (explainOutput == NULL || instr == NULL)
token != NULL; return;
token = strtok_r(NULL, " ()\n", &saveptr))
{
if (strcmp(token, "WAL:") == 0)
{
in_wal = true;
continue;
}
if (in_wal) char *line = pstrdup(explainOutput);
{
if (strncmp(token, "records=", 8) == 0)
instr->walusage.wal_records =
strtoul(token + 8, NULL, 10);
else if (strncmp(token, "bytes=", 6) == 0)
{
instr->walusage.wal_bytes =
strtoul(token + 6, NULL, 10);
/* once weve seen bytes=, we can leave WAL mode */
in_wal = false;
}
continue;
}
if (strncmp(token, "time=", 5) == 0) bool inWal = false;
{ bool inResult = false;
/* token is "time=X..Y" */
char *p = token + 5;
char *dd = strstr(p, "..");
if (dd) /* split on spaces, parentheses or newlines */
{ for (token = strtok_r(line, " ()\n", &saveptr);
*dd = '\0'; token != NULL;
instr->startup += strtod(p, NULL) / 1000.0; token = strtok_r(NULL, " ()\n", &saveptr))
instr->total += strtod(dd + 2, NULL) / 1000.0;
}
}
else if (strncmp(token, "rows=", 5) == 0)
{
instr->ntuples += strtol(token + 5, NULL, 10);
}
else if (strncmp(token, "loops=", 6) == 0)
{
instr->nloops = strtol(token + 6, NULL, 10);
}
}
pfree(line);
}
#if 0
/*
* ParseExplainAnalyzeOutput
*/
static void
ParseExplainAnalyzeOutput(char *explainOutput, Instrumentation *instr)
{
double start_ms = 0.0, end_ms = 0.0;
int rows = 0, loops = 0;
/* 1) Extract “actual time=XXX..YYY rows=R loops=L” */
if (sscanf(explainOutput, "%*[^=]=%lf..%lf rows=%d loops=%d",
&start_ms, &end_ms, &rows, &loops) == 4)
{ {
/* times in ms, convert to seconds */ if (strcmp(token, "WAL:") == 0)
instr->startup += start_ms / 1000.0;
instr->total += end_ms / 1000.0;
instr->ntuples += (double) rows;
instr->nloops = (double) loops;
}
else if (sscanf(explainOutput, "%*[^(\n](actual rows=%d loops=%d)", &rows, &loops) == 2)
{
/* no timing present, just capture rows & loops */
instr->ntuples += (double) rows;
instr->nloops = (double) loops;
}
/* 2) Look for “WAL: records=X bytes=Y” */
const char *wal = strstr(explainOutput, "WAL:");
if (wal)
{
int recs = 0, bytes = 0;
if (sscanf(wal, "WAL: records=%d bytes=%d", &recs, &bytes) == 2)
{ {
instr->walusage.wal_records += recs; inWal = true;
instr->walusage.wal_bytes += bytes; continue;
}
if (strcmp(token, "Result") == 0)
{
inResult = true;
continue;
}
/* Reset Result flag when we see "actual" - but only skip if we're in Result mode */
if (strcmp(token, "actual") == 0)
{
/* If we were in Result mode, the next tokens should be skipped */
/* If we weren't in Result mode, continue normally */
continue;
}
if (inWal)
{
if (strncmp(token, "records=", 8) == 0)
instr->walusage.wal_records += strtoul(token + 8, NULL, 10);
else if (strncmp(token, "bytes=", 6) == 0)
{
instr->walusage.wal_bytes += strtoul(token + 6, NULL, 10);
/* once we've seen bytes=, we can leave WAL mode */
inWal = false;
}
continue;
}
/* Skip Result node's actual timing data */
if (inResult)
{
if (strncmp(token, "time=", 5) == 0 ||
strncmp(token, "rows=", 5) == 0 ||
strncmp(token, "loops=", 6) == 0)
{
/* If this is loops=, we've seen all Result data */
if (strncmp(token, "loops=", 6) == 0)
inResult = false;
continue;
}
}
if (strncmp(token, "time=", 5) == 0)
{
/* token is "time=X..Y" */
char *timeStr = token + 5;
char *doubleDot = strstr(timeStr, "..");
if (doubleDot)
{
*doubleDot = '\0';
instr->startup += strtod(timeStr, NULL) / 1000.0;
instr->total += strtod(doubleDot + 2, NULL) / 1000.0;
}
}
else if (strncmp(token, "rows=", 5) == 0)
{
instr->ntuples += strtol(token + 5, NULL, 10);
break; /* We are done for this Task */
} }
} }
}
#endif
pfree(line);
}
#if PG_VERSION_NUM >= PG_VERSION_17 #if PG_VERSION_NUM >= PG_VERSION_17
/* /*

View File

@ -166,6 +166,8 @@ CopyNodeDistributedSubPlan(COPYFUNC_ARGS)
/* copy the totalReceivedTupleData (uint64) */ /* copy the totalReceivedTupleData (uint64) */
COPY_SCALAR_FIELD(totalExplainOutput[i].totalReceivedTupleData); COPY_SCALAR_FIELD(totalExplainOutput[i].totalReceivedTupleData);
} }
COPY_SCALAR_FIELD(numTasksOutput);
} }

View File

@ -252,6 +252,8 @@ OutDistributedSubPlan(OUTFUNC_ARGS)
appendStringInfoChar(str, ')'); appendStringInfoChar(str, ')');
} }
WRITE_INT_FIELD(numTasksOutput);
appendStringInfoChar(str, ']'); appendStringInfoChar(str, ']');
} }

View File

@ -504,6 +504,7 @@ typedef struct DistributedSubPlan
double durationMillisecs; double durationMillisecs;
bool writeLocalFile; bool writeLocalFile;
SubPlanExplainOutput totalExplainOutput[MAX_ANALYZE_OUTPUT]; SubPlanExplainOutput totalExplainOutput[MAX_ANALYZE_OUTPUT];
uint32 numTasksOutput;
} DistributedSubPlan; } DistributedSubPlan;

View File

@ -2492,12 +2492,12 @@ Custom Scan (Citus Adaptive) (actual rows=1 loops=1)
-> Distributed Subplan XXX_1 -> Distributed Subplan XXX_1
Intermediate Data Size: 100 bytes Intermediate Data Size: 100 bytes
Result destination: Write locally Result destination: Write locally
-> Custom Scan (Citus Adaptive) (actual rows=20 loops=1) -> Custom Scan (Citus Adaptive) (actual rows=10 loops=1)
Task Count: 4 Task Count: 4
Tuple data received from nodes: 160 bytes Tuple data received from nodes: 80 bytes
Tasks Shown: One of 4 Tasks Shown: One of 4
-> Task -> Task
Tuple data received from node: 64 bytes Tuple data received from node: 32 bytes
Node: host=localhost port=xxxxx dbname=regression Node: host=localhost port=xxxxx dbname=regression
-> Insert on dist_table_570017 citus_table_alias (actual rows=8 loops=1) -> Insert on dist_table_570017 citus_table_alias (actual rows=8 loops=1)
-> Seq Scan on dist_table_570017 dist_table (actual rows=8 loops=1) -> Seq Scan on dist_table_570017 dist_table (actual rows=8 loops=1)