PG17 - Propagate EXPLAIN options: MEMORY and SERIALIZE (#7802)

DESCRIPTION: Propagates MEMORY and SERIALIZE options of EXPLAIN

The `MEMORY` option can be `true` or `false`; the default is `false`.
The `SERIALIZE` option can be `none`, `text` or `binary`; the default is
`none`.

I referred to how we added support for WAL option in this PR [Support
EXPLAIN(ANALYZE, WAL)](https://github.com/citusdata/citus/pull/4196).
For the tests, however, instead of following the approach of the WAL PR,
I reused the Postgres tests verbatim, simply distributing the table
beforehand. The relevant Postgres commits, which also show where those
tests were added, are:
- [Add EXPLAIN
(MEMORY)](https://github.com/postgres/postgres/commit/5de890e36)
- [Invent SERIALIZE option for
EXPLAIN.](https://github.com/postgres/postgres/commit/06286709e)

This PR required a lot of copying of Postgres static functions regarding
how `EXPLAIN` works for `MEMORY` and `SERIALIZE` options. Specifically,
these copy-pastes were required for updating `ExplainWorkerPlan()`
function, which is in fact based on postgres' `ExplainOnePlan()`:
```C
/* copied from explain.c to update ExplainWorkerPlan() in citus according to ExplainOnePlan() in postgres */
#define BYTES_TO_KILOBYTES(b)
typedef struct SerializeMetrics
static bool peek_buffer_usage(ExplainState *es, const BufferUsage *usage);
static void show_buffer_usage(ExplainState *es, const BufferUsage *usage);
static void show_memory_counters(ExplainState *es, const MemoryContextCounters *mem_counters);
static void ExplainIndentText(ExplainState *es);
static void ExplainPrintSerialize(ExplainState *es, SerializeMetrics *metrics);
static SerializeMetrics GetSerializationMetrics(DestReceiver *dest);
```

_Note_: it looks like we were also missing some `buffers` option
details. I put them together with the memory option, matching the code
in Postgres's explain.c, as I didn't want to modify the copied code.
I tested locally and found no significant problem in previous Citus
versions, and you can also see that existing Citus tests with
`buffers true` are unchanged. Therefore, I prefer not to backport the
`buffers` changes to previous versions.
pull/7837/head
Naisila Puka 2025-01-02 12:32:36 +03:00 committed by GitHub
parent 48849ff3c2
commit 9bad33f89d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 1147 additions and 2 deletions

View File

@ -95,14 +95,24 @@ typedef struct
bool wal;
bool timing;
bool summary;
#if PG_VERSION_NUM >= PG_VERSION_17
bool memory;
ExplainSerializeOption serialize;
#endif
ExplainFormat format;
} ExplainOptions;
/* EXPLAIN flags of current distributed explain */
#if PG_VERSION_NUM >= PG_VERSION_17
static ExplainOptions CurrentDistributedQueryExplainOptions = {
0, 0, 0, 0, 0, 0, 0, EXPLAIN_SERIALIZE_NONE, EXPLAIN_FORMAT_TEXT
};
#else
static ExplainOptions CurrentDistributedQueryExplainOptions = {
0, 0, 0, 0, 0, 0, EXPLAIN_FORMAT_TEXT
};
#endif
/* Result for a single remote EXPLAIN command */
typedef struct RemoteExplainPlan
@ -124,6 +134,59 @@ typedef struct ExplainAnalyzeDestination
TupleDesc lastSavedExplainAnalyzeTupDesc;
} ExplainAnalyzeDestination;
#if PG_VERSION_NUM >= PG_VERSION_17
/*
* Various places within need to convert bytes to kilobytes. Round these up
* to the next whole kilobyte.
* copied from explain.c
*/
#define BYTES_TO_KILOBYTES(b) (((b) + 1023) / 1024)
/* copied from explain.c */

/* Instrumentation data collected for the EXPLAIN SERIALIZE option */
typedef struct SerializeMetrics
{
	uint64 bytesSent;			/* # of bytes serialized */
	instr_time timeSpent;		/* time spent serializing */
	BufferUsage bufferUsage;	/* buffers accessed during serialization */
} SerializeMetrics;
/* copied from explain.c */
static bool peek_buffer_usage(ExplainState *es, const BufferUsage *usage);
static void show_buffer_usage(ExplainState *es, const BufferUsage *usage);
static void show_memory_counters(ExplainState *es,
const MemoryContextCounters *mem_counters);
static void ExplainIndentText(ExplainState *es);
static void ExplainPrintSerialize(ExplainState *es,
SerializeMetrics *metrics);
static SerializeMetrics GetSerializationMetrics(DestReceiver *dest);
/*
 * DestReceiver functions for SERIALIZE option
 *
 * A DestReceiver for query tuples, that serializes passed rows into RowData
 * messages while measuring the resources expended and total serialized size,
 * while never sending the data to the client. This allows measuring the
 * overhead of deTOASTing and datatype out/sendfuncs, which are not otherwise
 * exercisable without actually hitting the network.
 *
 * copied from explain.c
 */
typedef struct SerializeDestReceiver
{
	DestReceiver pub;			/* publicly-known function pointers */
	ExplainState *es;			/* this EXPLAIN statement's ExplainState */
	int8 format;				/* text or binary, like pq wire protocol */
	TupleDesc attrinfo;			/* the output tuple desc */
	int nattrs;					/* current number of columns */
	FmgrInfo *finfos;			/* precomputed call info for output fns */
	MemoryContext tmpcontext;	/* per-row temporary memory context */
	StringInfoData buf;			/* buffer to hold the constructed message */
	SerializeMetrics metrics;	/* collected metrics */
} SerializeDestReceiver;
#endif
/* Explain functions for distributed queries */
static void ExplainSubPlans(DistributedPlan *distributedPlan, ExplainState *es);
@ -144,14 +207,27 @@ static void ExplainTaskPlacement(ShardPlacement *taskPlacement, List *explainOut
ExplainState *es);
static StringInfo BuildRemoteExplainQuery(char *queryString, ExplainState *es);
static const char * ExplainFormatStr(ExplainFormat format);
#if PG_VERSION_NUM >= PG_VERSION_17
static const char * ExplainSerializeStr(ExplainSerializeOption serializeOption);
#endif
static void ExplainWorkerPlan(PlannedStmt *plannedStmt, DestReceiver *dest,
ExplainState *es,
const char *queryString, ParamListInfo params,
QueryEnvironment *queryEnv,
const instr_time *planduration,
#if PG_VERSION_NUM >= PG_VERSION_17
const BufferUsage *bufusage,
const MemoryContextCounters *mem_counters,
#endif
double *executionDurationMillisec);
static ExplainFormat ExtractFieldExplainFormat(Datum jsonbDoc, const char *fieldName,
ExplainFormat defaultValue);
#if PG_VERSION_NUM >= PG_VERSION_17
static ExplainSerializeOption ExtractFieldExplainSerialize(Datum jsonbDoc,
const char *fieldName,
ExplainSerializeOption
defaultValue);
#endif
static TupleDestination * CreateExplainAnlyzeDestination(Task *task,
TupleDestination *taskDest);
static void ExplainAnalyzeDestPutTuple(TupleDestination *self, Task *task,
@ -1025,11 +1101,19 @@ BuildRemoteExplainQuery(char *queryString, ExplainState *es)
{
StringInfo explainQuery = makeStringInfo();
const char *formatStr = ExplainFormatStr(es->format);
#if PG_VERSION_NUM >= PG_VERSION_17
const char *serializeStr = ExplainSerializeStr(es->serialize);
#endif
appendStringInfo(explainQuery,
"EXPLAIN (ANALYZE %s, VERBOSE %s, "
"COSTS %s, BUFFERS %s, WAL %s, "
"TIMING %s, SUMMARY %s, FORMAT %s) %s",
"TIMING %s, SUMMARY %s, "
#if PG_VERSION_NUM >= PG_VERSION_17
"MEMORY %s, SERIALIZE %s, "
#endif
"FORMAT %s) %s",
es->analyze ? "TRUE" : "FALSE",
es->verbose ? "TRUE" : "FALSE",
es->costs ? "TRUE" : "FALSE",
@ -1037,6 +1121,10 @@ BuildRemoteExplainQuery(char *queryString, ExplainState *es)
es->wal ? "TRUE" : "FALSE",
es->timing ? "TRUE" : "FALSE",
es->summary ? "TRUE" : "FALSE",
#if PG_VERSION_NUM >= PG_VERSION_17
es->memory ? "TRUE" : "FALSE",
serializeStr,
#endif
formatStr,
queryString);
@ -1075,6 +1163,42 @@ ExplainFormatStr(ExplainFormat format)
}
#if PG_VERSION_NUM >= PG_VERSION_17
/*
 * ExplainSerializeStr returns the EXPLAIN SERIALIZE option keyword
 * ("none", "text" or "binary") for the given enum value, suitable for
 * embedding in a remote EXPLAIN command.
 */
static const char *
ExplainSerializeStr(ExplainSerializeOption serializeOption)
{
	if (serializeOption == EXPLAIN_SERIALIZE_TEXT)
	{
		return "text";
	}
	else if (serializeOption == EXPLAIN_SERIALIZE_BINARY)
	{
		return "binary";
	}

	/* EXPLAIN_SERIALIZE_NONE, and defensively any unexpected value */
	return "none";
}
#endif
/*
* worker_last_saved_explain_analyze returns the last saved EXPLAIN ANALYZE output of
* a worker task query. It returns NULL if nothing has been saved yet.
@ -1134,6 +1258,11 @@ worker_save_query_explain_analyze(PG_FUNCTION_ARGS)
es->verbose = ExtractFieldBoolean(explainOptions, "verbose", es->verbose);
es->timing = ExtractFieldBoolean(explainOptions, "timing", es->timing);
es->format = ExtractFieldExplainFormat(explainOptions, "format", es->format);
#if PG_VERSION_NUM >= PG_VERSION_17
es->memory = ExtractFieldBoolean(explainOptions, "memory", es->memory);
es->serialize = ExtractFieldExplainSerialize(explainOptions, "serialize",
es->serialize);
#endif
TupleDesc tupleDescriptor = NULL;
Tuplestorestate *tupleStore = SetupTuplestore(fcinfo, &tupleDescriptor);
@ -1179,6 +1308,36 @@ worker_save_query_explain_analyze(PG_FUNCTION_ARGS)
/* plan query and record planning stats */
instr_time planStart;
instr_time planDuration;
#if PG_VERSION_NUM >= PG_VERSION_17
BufferUsage bufusage_start,
bufusage;
MemoryContextCounters mem_counters;
MemoryContext planner_ctx = NULL;
MemoryContext saved_ctx = NULL;
if (es->memory)
{
/*
* Create a new memory context to measure planner's memory consumption
* accurately. Note that if the planner were to be modified to use a
* different memory context type, here we would be changing that to
* AllocSet, which might be undesirable. However, we don't have a way
* to create a context of the same type as another, so we pray and
* hope that this is OK.
*
* copied from explain.c
*/
planner_ctx = AllocSetContextCreate(CurrentMemoryContext,
"explain analyze planner context",
ALLOCSET_DEFAULT_SIZES);
saved_ctx = MemoryContextSwitchTo(planner_ctx);
}
if (es->buffers)
{
bufusage_start = pgBufferUsage;
}
#endif
INSTR_TIME_SET_CURRENT(planStart);
@ -1187,9 +1346,32 @@ worker_save_query_explain_analyze(PG_FUNCTION_ARGS)
INSTR_TIME_SET_CURRENT(planDuration);
INSTR_TIME_SUBTRACT(planDuration, planStart);
#if PG_VERSION_NUM >= PG_VERSION_17
if (es->memory)
{
MemoryContextSwitchTo(saved_ctx);
MemoryContextMemConsumed(planner_ctx, &mem_counters);
}
/* calc differences of buffer counters. */
if (es->buffers)
{
memset(&bufusage, 0, sizeof(BufferUsage));
BufferUsageAccumDiff(&bufusage, &pgBufferUsage, &bufusage_start);
}
/* do the actual EXPLAIN ANALYZE */
ExplainWorkerPlan(plan, tupleStoreDest, es, queryString, boundParams, NULL,
&planDuration,
(es->buffers ? &bufusage : NULL),
(es->memory ? &mem_counters : NULL),
&executionDurationMillisec);
#else
/* do the actual EXPLAIN ANALYZE */
ExplainWorkerPlan(plan, tupleStoreDest, es, queryString, boundParams, NULL,
&planDuration, &executionDurationMillisec);
#endif
ExplainEndOutput(es);
@ -1258,6 +1440,50 @@ ExtractFieldExplainFormat(Datum jsonbDoc, const char *fieldName, ExplainFormat
}
#if PG_VERSION_NUM >= PG_VERSION_17
/*
 * ExtractFieldExplainSerialize gets value of fieldName from jsonbDoc, or returns
 * defaultValue if it doesn't exist.
 *
 * The field's jsonb value is a JSON string, so jsonb_out renders it with
 * surrounding double quotes; the comparisons below therefore include the
 * quotes. "off" is accepted as an alias for "none" (presumably to mirror
 * PostgreSQL's own SERIALIZE option parsing — TODO confirm).
 */
static ExplainSerializeOption
ExtractFieldExplainSerialize(Datum jsonbDoc, const char *fieldName, ExplainSerializeOption
							 defaultValue)
{
	Datum jsonbDatum = 0;
	bool found = ExtractFieldJsonbDatum(jsonbDoc, fieldName, &jsonbDatum);
	if (!found)
	{
		return defaultValue;
	}

	const char *serializeStr = DatumGetCString(DirectFunctionCall1(jsonb_out,
																   jsonbDatum));

	if (pg_strcasecmp(serializeStr, "\"none\"") == 0)
	{
		return EXPLAIN_SERIALIZE_NONE;
	}
	else if (pg_strcasecmp(serializeStr, "\"off\"") == 0)
	{
		return EXPLAIN_SERIALIZE_NONE;
	}
	else if (pg_strcasecmp(serializeStr, "\"text\"") == 0)
	{
		return EXPLAIN_SERIALIZE_TEXT;
	}
	else if (pg_strcasecmp(serializeStr, "\"binary\"") == 0)
	{
		return EXPLAIN_SERIALIZE_BINARY;
	}

	ereport(ERROR, (errmsg("Invalid explain analyze serialize: %s", serializeStr)));

	/* not reached; ereport(ERROR) does not return */
	return 0;
}
#endif
/*
* CitusExplainOneQuery is the executor hook that is called when
* postgres wants to explain a query.
@ -1275,6 +1501,10 @@ CitusExplainOneQuery(Query *query, int cursorOptions, IntoClause *into,
CurrentDistributedQueryExplainOptions.summary = es->summary;
CurrentDistributedQueryExplainOptions.timing = es->timing;
CurrentDistributedQueryExplainOptions.format = es->format;
#if PG_VERSION_NUM >= PG_VERSION_17
CurrentDistributedQueryExplainOptions.memory = es->memory;
CurrentDistributedQueryExplainOptions.serialize = es->serialize;
#endif
/* rest is copied from ExplainOneQuery() */
instr_time planstart,
@ -1597,11 +1827,18 @@ WrapQueryForExplainAnalyze(const char *queryString, TupleDesc tupleDesc,
StringInfo explainOptions = makeStringInfo();
appendStringInfo(explainOptions,
"{\"verbose\": %s, \"costs\": %s, \"buffers\": %s, \"wal\": %s, "
#if PG_VERSION_NUM >= PG_VERSION_17
"\"memory\": %s, \"serialize\": \"%s\", "
#endif
"\"timing\": %s, \"summary\": %s, \"format\": \"%s\"}",
CurrentDistributedQueryExplainOptions.verbose ? "true" : "false",
CurrentDistributedQueryExplainOptions.costs ? "true" : "false",
CurrentDistributedQueryExplainOptions.buffers ? "true" : "false",
CurrentDistributedQueryExplainOptions.wal ? "true" : "false",
#if PG_VERSION_NUM >= PG_VERSION_17
CurrentDistributedQueryExplainOptions.memory ? "true" : "false",
ExplainSerializeStr(CurrentDistributedQueryExplainOptions.serialize),
#endif
CurrentDistributedQueryExplainOptions.timing ? "true" : "false",
CurrentDistributedQueryExplainOptions.summary ? "true" : "false",
ExplainFormatStr(CurrentDistributedQueryExplainOptions.format));
@ -1826,7 +2063,12 @@ ExplainOneQuery(Query *query, int cursorOptions,
static void
ExplainWorkerPlan(PlannedStmt *plannedstmt, DestReceiver *dest, ExplainState *es,
const char *queryString, ParamListInfo params, QueryEnvironment *queryEnv,
const instr_time *planduration, double *executionDurationMillisec)
const instr_time *planduration,
#if PG_VERSION_NUM >= PG_VERSION_17
const BufferUsage *bufusage,
const MemoryContextCounters *mem_counters,
#endif
double *executionDurationMillisec)
{
QueryDesc *queryDesc;
instr_time starttime;
@ -1895,6 +2137,32 @@ ExplainWorkerPlan(PlannedStmt *plannedstmt, DestReceiver *dest, ExplainState *es
/* Create textual dump of plan tree */
ExplainPrintPlan(es, queryDesc);
#if PG_VERSION_NUM >= PG_VERSION_17
/* Show buffer and/or memory usage in planning */
if (peek_buffer_usage(es, bufusage) || mem_counters)
{
ExplainOpenGroup("Planning", "Planning", true, es);
if (es->format == EXPLAIN_FORMAT_TEXT)
{
ExplainIndentText(es);
appendStringInfoString(es->str, "Planning:\n");
es->indent++;
}
if (bufusage)
show_buffer_usage(es, bufusage);
if (mem_counters)
show_memory_counters(es, mem_counters);
if (es->format == EXPLAIN_FORMAT_TEXT)
es->indent--;
ExplainCloseGroup("Planning", "Planning", true, es);
}
#endif
if (es->summary && planduration)
{
double plantime = INSTR_TIME_GET_DOUBLE(*planduration);
@ -1915,6 +2183,23 @@ ExplainWorkerPlan(PlannedStmt *plannedstmt, DestReceiver *dest, ExplainState *es
if (es->costs)
ExplainPrintJITSummary(es, queryDesc);
#if PG_VERSION_NUM >= PG_VERSION_17
if (es->serialize != EXPLAIN_SERIALIZE_NONE)
{
/* the SERIALIZE option requires its own tuple receiver */
DestReceiver *dest_serialize = CreateExplainSerializeDestReceiver(es);
/* grab serialization metrics before we destroy the DestReceiver */
SerializeMetrics serializeMetrics = GetSerializationMetrics(dest_serialize);
/* call the DestReceiver's destroy method even during explain */
dest_serialize->rDestroy(dest_serialize);
/* Print info about serialization of output */
ExplainPrintSerialize(es, &serializeMetrics);
}
#endif
/*
* Close down the query and free resources. Include time for this in the
* total execution time (although it should be pretty minimal).
@ -1963,3 +2248,351 @@ elapsed_time(instr_time *starttime)
INSTR_TIME_SUBTRACT(endtime, *starttime);
return INSTR_TIME_GET_DOUBLE(endtime);
}
#if PG_VERSION_NUM >= PG_VERSION_17
/*
 * Return whether show_buffer_usage would have anything to print, if given
 * the same 'usage' data. Note that when the format is anything other than
 * text, we print even if the counters are all zeroes.
 *
 * Copied from explain.c.
 */
static bool
peek_buffer_usage(ExplainState *es, const BufferUsage *usage)
{
	bool has_shared;
	bool has_local;
	bool has_temp;
	bool has_shared_timing;
	bool has_local_timing;
	bool has_temp_timing;

	/* NULL means no buffer usage was collected for this phase */
	if (usage == NULL)
		return false;

	/* structured formats always print the counters, even all-zero ones */
	if (es->format != EXPLAIN_FORMAT_TEXT)
		return true;

	has_shared = (usage->shared_blks_hit > 0 ||
				  usage->shared_blks_read > 0 ||
				  usage->shared_blks_dirtied > 0 ||
				  usage->shared_blks_written > 0);
	has_local = (usage->local_blks_hit > 0 ||
				 usage->local_blks_read > 0 ||
				 usage->local_blks_dirtied > 0 ||
				 usage->local_blks_written > 0);
	has_temp = (usage->temp_blks_read > 0 ||
				usage->temp_blks_written > 0);
	has_shared_timing = (!INSTR_TIME_IS_ZERO(usage->shared_blk_read_time) ||
						 !INSTR_TIME_IS_ZERO(usage->shared_blk_write_time));
	has_local_timing = (!INSTR_TIME_IS_ZERO(usage->local_blk_read_time) ||
						!INSTR_TIME_IS_ZERO(usage->local_blk_write_time));
	has_temp_timing = (!INSTR_TIME_IS_ZERO(usage->temp_blk_read_time) ||
					   !INSTR_TIME_IS_ZERO(usage->temp_blk_write_time));

	/* in text format, print only if at least one counter is nonzero */
	return has_shared || has_local || has_temp || has_shared_timing ||
		   has_local_timing || has_temp_timing;
}
/*
 * Show buffer usage details. This better be sync with peek_buffer_usage.
 *
 * Copied from explain.c.
 */
static void
show_buffer_usage(ExplainState *es, const BufferUsage *usage)
{
	if (es->format == EXPLAIN_FORMAT_TEXT)
	{
		/* recompute the same "anything to show" flags as peek_buffer_usage */
		bool has_shared = (usage->shared_blks_hit > 0 ||
						   usage->shared_blks_read > 0 ||
						   usage->shared_blks_dirtied > 0 ||
						   usage->shared_blks_written > 0);
		bool has_local = (usage->local_blks_hit > 0 ||
						  usage->local_blks_read > 0 ||
						  usage->local_blks_dirtied > 0 ||
						  usage->local_blks_written > 0);
		bool has_temp = (usage->temp_blks_read > 0 ||
						 usage->temp_blks_written > 0);
		bool has_shared_timing = (!INSTR_TIME_IS_ZERO(usage->shared_blk_read_time) ||
								  !INSTR_TIME_IS_ZERO(usage->shared_blk_write_time));
		bool has_local_timing = (!INSTR_TIME_IS_ZERO(usage->local_blk_read_time) ||
								 !INSTR_TIME_IS_ZERO(usage->local_blk_write_time));
		bool has_temp_timing = (!INSTR_TIME_IS_ZERO(usage->temp_blk_read_time) ||
								!INSTR_TIME_IS_ZERO(usage->temp_blk_write_time));

		/* Show only positive counter values. */
		if (has_shared || has_local || has_temp)
		{
			ExplainIndentText(es);
			appendStringInfoString(es->str, "Buffers:");

			if (has_shared)
			{
				appendStringInfoString(es->str, " shared");
				if (usage->shared_blks_hit > 0)
					appendStringInfo(es->str, " hit=%lld",
									 (long long) usage->shared_blks_hit);
				if (usage->shared_blks_read > 0)
					appendStringInfo(es->str, " read=%lld",
									 (long long) usage->shared_blks_read);
				if (usage->shared_blks_dirtied > 0)
					appendStringInfo(es->str, " dirtied=%lld",
									 (long long) usage->shared_blks_dirtied);
				if (usage->shared_blks_written > 0)
					appendStringInfo(es->str, " written=%lld",
									 (long long) usage->shared_blks_written);
				/* comma-separate the sections that follow, if any */
				if (has_local || has_temp)
					appendStringInfoChar(es->str, ',');
			}
			if (has_local)
			{
				appendStringInfoString(es->str, " local");
				if (usage->local_blks_hit > 0)
					appendStringInfo(es->str, " hit=%lld",
									 (long long) usage->local_blks_hit);
				if (usage->local_blks_read > 0)
					appendStringInfo(es->str, " read=%lld",
									 (long long) usage->local_blks_read);
				if (usage->local_blks_dirtied > 0)
					appendStringInfo(es->str, " dirtied=%lld",
									 (long long) usage->local_blks_dirtied);
				if (usage->local_blks_written > 0)
					appendStringInfo(es->str, " written=%lld",
									 (long long) usage->local_blks_written);
				if (has_temp)
					appendStringInfoChar(es->str, ',');
			}
			if (has_temp)
			{
				appendStringInfoString(es->str, " temp");
				if (usage->temp_blks_read > 0)
					appendStringInfo(es->str, " read=%lld",
									 (long long) usage->temp_blks_read);
				if (usage->temp_blks_written > 0)
					appendStringInfo(es->str, " written=%lld",
									 (long long) usage->temp_blks_written);
			}
			appendStringInfoChar(es->str, '\n');
		}

		/* As above, show only positive counter values. */
		if (has_shared_timing || has_local_timing || has_temp_timing)
		{
			ExplainIndentText(es);
			appendStringInfoString(es->str, "I/O Timings:");

			if (has_shared_timing)
			{
				appendStringInfoString(es->str, " shared");
				if (!INSTR_TIME_IS_ZERO(usage->shared_blk_read_time))
					appendStringInfo(es->str, " read=%0.3f",
									 INSTR_TIME_GET_MILLISEC(usage->shared_blk_read_time));
				if (!INSTR_TIME_IS_ZERO(usage->shared_blk_write_time))
					appendStringInfo(es->str, " write=%0.3f",
									 INSTR_TIME_GET_MILLISEC(usage->shared_blk_write_time));
				if (has_local_timing || has_temp_timing)
					appendStringInfoChar(es->str, ',');
			}
			if (has_local_timing)
			{
				appendStringInfoString(es->str, " local");
				if (!INSTR_TIME_IS_ZERO(usage->local_blk_read_time))
					appendStringInfo(es->str, " read=%0.3f",
									 INSTR_TIME_GET_MILLISEC(usage->local_blk_read_time));
				if (!INSTR_TIME_IS_ZERO(usage->local_blk_write_time))
					appendStringInfo(es->str, " write=%0.3f",
									 INSTR_TIME_GET_MILLISEC(usage->local_blk_write_time));
				if (has_temp_timing)
					appendStringInfoChar(es->str, ',');
			}
			if (has_temp_timing)
			{
				appendStringInfoString(es->str, " temp");
				if (!INSTR_TIME_IS_ZERO(usage->temp_blk_read_time))
					appendStringInfo(es->str, " read=%0.3f",
									 INSTR_TIME_GET_MILLISEC(usage->temp_blk_read_time));
				if (!INSTR_TIME_IS_ZERO(usage->temp_blk_write_time))
					appendStringInfo(es->str, " write=%0.3f",
									 INSTR_TIME_GET_MILLISEC(usage->temp_blk_write_time));
			}
			appendStringInfoChar(es->str, '\n');
		}
	}
	else
	{
		/* structured formats print every counter unconditionally */
		ExplainPropertyInteger("Shared Hit Blocks", NULL,
							   usage->shared_blks_hit, es);
		ExplainPropertyInteger("Shared Read Blocks", NULL,
							   usage->shared_blks_read, es);
		ExplainPropertyInteger("Shared Dirtied Blocks", NULL,
							   usage->shared_blks_dirtied, es);
		ExplainPropertyInteger("Shared Written Blocks", NULL,
							   usage->shared_blks_written, es);
		ExplainPropertyInteger("Local Hit Blocks", NULL,
							   usage->local_blks_hit, es);
		ExplainPropertyInteger("Local Read Blocks", NULL,
							   usage->local_blks_read, es);
		ExplainPropertyInteger("Local Dirtied Blocks", NULL,
							   usage->local_blks_dirtied, es);
		ExplainPropertyInteger("Local Written Blocks", NULL,
							   usage->local_blks_written, es);
		ExplainPropertyInteger("Temp Read Blocks", NULL,
							   usage->temp_blks_read, es);
		ExplainPropertyInteger("Temp Written Blocks", NULL,
							   usage->temp_blks_written, es);
		if (track_io_timing)
		{
			ExplainPropertyFloat("Shared I/O Read Time", "ms",
								 INSTR_TIME_GET_MILLISEC(usage->shared_blk_read_time),
								 3, es);
			ExplainPropertyFloat("Shared I/O Write Time", "ms",
								 INSTR_TIME_GET_MILLISEC(usage->shared_blk_write_time),
								 3, es);
			ExplainPropertyFloat("Local I/O Read Time", "ms",
								 INSTR_TIME_GET_MILLISEC(usage->local_blk_read_time),
								 3, es);
			ExplainPropertyFloat("Local I/O Write Time", "ms",
								 INSTR_TIME_GET_MILLISEC(usage->local_blk_write_time),
								 3, es);
			ExplainPropertyFloat("Temp I/O Read Time", "ms",
								 INSTR_TIME_GET_MILLISEC(usage->temp_blk_read_time),
								 3, es);
			ExplainPropertyFloat("Temp I/O Write Time", "ms",
								 INSTR_TIME_GET_MILLISEC(usage->temp_blk_write_time),
								 3, es);
		}
	}
}
/*
 * Indent a text-format line.
 *
 * We indent by two spaces per indentation level. However, when emitting
 * data for a parallel worker there might already be data on the current line
 * (cf. ExplainOpenWorker); in that case, don't indent any more.
 *
 * Copied from explain.c.
 */
static void
ExplainIndentText(ExplainState *es)
{
	Assert(es->format == EXPLAIN_FORMAT_TEXT);

	/* indent only when starting a fresh output line */
	if (es->str->len == 0 || es->str->data[es->str->len - 1] == '\n')
		appendStringInfoSpaces(es->str, es->indent * 2);
}
/*
 * Show memory usage details.
 *
 * Copied from explain.c.
 */
static void
show_memory_counters(ExplainState *es, const MemoryContextCounters *mem_counters)
{
	/* BYTES_TO_KILOBYTES rounds up to the next whole kilobyte */
	int64 memUsedkB = BYTES_TO_KILOBYTES(mem_counters->totalspace -
										 mem_counters->freespace);
	int64 memAllocatedkB = BYTES_TO_KILOBYTES(mem_counters->totalspace);

	if (es->format == EXPLAIN_FORMAT_TEXT)
	{
		ExplainIndentText(es);
		appendStringInfo(es->str,
						 "Memory: used=" INT64_FORMAT "kB allocated=" INT64_FORMAT "kB",
						 memUsedkB, memAllocatedkB);
		appendStringInfoChar(es->str, '\n');
	}
	else
	{
		ExplainPropertyInteger("Memory Used", "kB", memUsedkB, es);
		ExplainPropertyInteger("Memory Allocated", "kB", memAllocatedkB, es);
	}
}
/*
 * ExplainPrintSerialize -
 *	  Append information about query output volume to es->str.
 *
 * Copied from explain.c.
 */
static void
ExplainPrintSerialize(ExplainState *es, SerializeMetrics *metrics)
{
	const char *format;

	/* We shouldn't get called for EXPLAIN_SERIALIZE_NONE */
	if (es->serialize == EXPLAIN_SERIALIZE_TEXT)
		format = "text";
	else
	{
		Assert(es->serialize == EXPLAIN_SERIALIZE_BINARY);
		format = "binary";
	}

	ExplainOpenGroup("Serialization", "Serialization", true, es);

	if (es->format == EXPLAIN_FORMAT_TEXT)
	{
		ExplainIndentText(es);
		if (es->timing)
			appendStringInfo(es->str, "Serialization: time=%.3f ms output=" UINT64_FORMAT "kB format=%s\n",
							 1000.0 * INSTR_TIME_GET_DOUBLE(metrics->timeSpent),
							 BYTES_TO_KILOBYTES(metrics->bytesSent),
							 format);
		else
			appendStringInfo(es->str, "Serialization: output=" UINT64_FORMAT "kB format=%s\n",
							 BYTES_TO_KILOBYTES(metrics->bytesSent),
							 format);

		/* nest serialization-phase buffer usage under the Serialization line */
		if (es->buffers && peek_buffer_usage(es, &metrics->bufferUsage))
		{
			es->indent++;
			show_buffer_usage(es, &metrics->bufferUsage);
			es->indent--;
		}
	}
	else
	{
		if (es->timing)
			ExplainPropertyFloat("Time", "ms",
								 1000.0 * INSTR_TIME_GET_DOUBLE(metrics->timeSpent),
								 3, es);
		ExplainPropertyUInteger("Output Volume", "kB",
								BYTES_TO_KILOBYTES(metrics->bytesSent), es);
		ExplainPropertyText("Format", format, es);
		if (es->buffers)
			show_buffer_usage(es, &metrics->bufferUsage);
	}

	ExplainCloseGroup("Serialization", "Serialization", true, es);
}
/*
 * GetSerializationMetrics - collect metrics
 *
 * We have to be careful here since the receiver could be an IntoRel
 * receiver if the subject statement is CREATE TABLE AS. In that
 * case, return all-zeroes stats.
 *
 * Copied from explain.c.
 */
static SerializeMetrics
GetSerializationMetrics(DestReceiver *dest)
{
	/* only a SERIALIZE DestReceiver actually carries metrics */
	SerializeMetrics empty;

	if (dest->mydest == DestExplainSerialize)
		return ((SerializeDestReceiver *) dest)->metrics;

	memset(&empty, 0, sizeof(SerializeMetrics));
	INSTR_TIME_SET_ZERO(empty.timeSpent);

	return empty;
}
#endif

View File

@ -605,3 +605,29 @@ BEGIN
RETURN NEXT;
END LOOP;
END; $$ language plpgsql;
-- To produce stable regression test output, it's usually necessary to
-- ignore details such as exact costs or row counts. These filter
-- functions replace changeable output details with fixed strings.
-- Copied from PG explain.sql
--
-- explain_filter: runs the given query text via EXECUTE and returns each
-- output line with volatile details (numbers, buffer stats, bare
-- "Planning:" lines) masked or dropped.
create function explain_filter(text) returns setof text
language plpgsql as
$$
declare
    ln text;
begin
    for ln in execute $1
    loop
        -- Replace any numeric word with just 'N'
        ln := regexp_replace(ln, '-?\m\d+\M', 'N', 'g');
        -- In sort output, the above won't match units-suffixed numbers
        ln := regexp_replace(ln, '\m\d+kB', 'NkB', 'g');
        -- Ignore text-mode buffers output because it varies depending
        -- on the system state
        CONTINUE WHEN (ln ~ ' +Buffers: .*');
        -- Ignore text-mode "Planning:" line because whether it's output
        -- varies depending on the system state
        CONTINUE WHEN (ln = 'Planning:');
        return next ln;
    end loop;
end;
$$;

View File

@ -1779,6 +1779,422 @@ DROP EVENT TRIGGER reindex_event_trigger;
DROP EVENT TRIGGER reindex_event_trigger_end;
DROP TABLE reindex_test CASCADE;
-- End of test for REINDEX support in event triggers for Citus-related objects
-- Propagate EXPLAIN MEMORY
-- Relevant PG commit: https://github.com/postgres/postgres/commit/5de890e36
-- Propagate EXPLAIN SERIALIZE
-- Relevant PG commit: https://github.com/postgres/postgres/commit/06286709e
SET citus.next_shard_id TO 12242024;
CREATE TABLE int8_tbl(q1 int8, q2 int8);
SELECT create_distributed_table('int8_tbl', 'q1');
create_distributed_table
---------------------------------------------------------------------
(1 row)
INSERT INTO int8_tbl VALUES
(' 123 ',' 456'),
('123 ','4567890123456789'),
('4567890123456789','123'),
(+4567890123456789,'4567890123456789'),
('+4567890123456789','-4567890123456789');
-- memory tests, same as postgres tests, we just distributed the table
-- we can see the memory used separately per each task in worker nodes
SET citus.log_remote_commands TO true;
-- for explain analyze, we run worker_save_query_explain_analyze query
-- for regular explain, we run EXPLAIN query
-- therefore let's grep the commands based on the shard id
SET citus.grep_remote_commands TO '%12242024%';
select public.explain_filter('explain (memory) select * from int8_tbl i8');
NOTICE: issuing EXPLAIN (ANALYZE FALSE, VERBOSE FALSE, COSTS TRUE, BUFFERS FALSE, WAL FALSE, TIMING FALSE, SUMMARY FALSE, MEMORY TRUE, SERIALIZE none, FORMAT TEXT) SELECT q1, q2 FROM pg17.int8_tbl_12242024 i8 WHERE true
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
CONTEXT: PL/pgSQL function public.explain_filter(text) line XX at FOR over EXECUTE statement
explain_filter
---------------------------------------------------------------------
Custom Scan (Citus Adaptive) (cost=N.N..N.N rows=N width=N)
Task Count: N
Tasks Shown: One of N
-> Task
Node: host=localhost port=N dbname=regression
-> Seq Scan on int8_tbl_12242024 i8 (cost=N.N..N.N rows=N width=N)
Planning:
Memory: used=NkB allocated=NkB
Memory: used=NkB allocated=NkB
(9 rows)
select public.explain_filter('explain (memory, analyze) select * from int8_tbl i8');
NOTICE: issuing SELECT * FROM worker_save_query_explain_analyze('SELECT q1, q2 FROM pg17.int8_tbl_12242024 i8 WHERE true', '{"verbose": false, "costs": true, "buffers": false, "wal": false, "memory": true, "serialize": "none", "timing": true, "summary": true, "format": "TEXT"}') AS (field_0 bigint, field_1 bigint)
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
CONTEXT: PL/pgSQL function public.explain_filter(text) line XX at FOR over EXECUTE statement
explain_filter
---------------------------------------------------------------------
Custom Scan (Citus Adaptive) (cost=N.N..N.N rows=N width=N) (actual time=N.N..N.N rows=N loops=N)
Task Count: N
Tuple data received from nodes: N bytes
Tasks Shown: One of N
-> Task
Tuple data received from node: N bytes
Node: host=localhost port=N dbname=regression
-> Seq Scan on int8_tbl_12242024 i8 (cost=N.N..N.N rows=N width=N) (actual time=N.N..N.N rows=N loops=N)
Planning:
Memory: used=NkB allocated=NkB
Planning Time: N.N ms
Execution Time: N.N ms
Memory: used=NkB allocated=NkB
Planning Time: N.N ms
Execution Time: N.N ms
(15 rows)
select public.explain_filter('explain (memory, summary, format yaml) select * from int8_tbl i8');
NOTICE: issuing EXPLAIN (ANALYZE FALSE, VERBOSE FALSE, COSTS TRUE, BUFFERS FALSE, WAL FALSE, TIMING FALSE, SUMMARY TRUE, MEMORY TRUE, SERIALIZE none, FORMAT YAML) SELECT q1, q2 FROM pg17.int8_tbl_12242024 i8 WHERE true
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
CONTEXT: PL/pgSQL function public.explain_filter(text) line XX at FOR over EXECUTE statement
explain_filter
---------------------------------------------------------------------
- Plan: +
Node Type: "Custom Scan" +
Custom Plan Provider: "Citus Adaptive" +
Parallel Aware: false +
Async Capable: false +
Startup Cost: N.N +
Total Cost: N.N +
Plan Rows: N +
Plan Width: N +
Distributed Query: +
Job: +
Task Count: N +
Tasks Shown: "One of N" +
Tasks: +
- Node: "host=localhost port=N dbname=regression"+
Remote Plan: +
- Plan: +
Node Type: "Seq Scan" +
Parallel Aware: false +
Async Capable: false +
Relation Name: "int8_tbl_12242024" +
Alias: "i8" +
Startup Cost: N.N +
Total Cost: N.N +
Plan Rows: N +
Plan Width: N +
Planning: +
Memory Used: N +
Memory Allocated: N +
Planning Time: N.N +
+
Planning: +
Memory Used: N +
Memory Allocated: N +
Planning Time: N.N
(1 row)
select public.explain_filter('explain (memory, analyze, format json) select * from int8_tbl i8');
NOTICE: issuing SELECT * FROM worker_save_query_explain_analyze('SELECT q1, q2 FROM pg17.int8_tbl_12242024 i8 WHERE true', '{"verbose": false, "costs": true, "buffers": false, "wal": false, "memory": true, "serialize": "none", "timing": true, "summary": true, "format": "JSON"}') AS (field_0 bigint, field_1 bigint)
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
CONTEXT: PL/pgSQL function public.explain_filter(text) line XX at FOR over EXECUTE statement
explain_filter
---------------------------------------------------------------------
[ +
{ +
"Plan": { +
"Node Type": "Custom Scan", +
"Custom Plan Provider": "Citus Adaptive", +
"Parallel Aware": false, +
"Async Capable": false, +
"Startup Cost": N.N, +
"Total Cost": N.N, +
"Plan Rows": N, +
"Plan Width": N, +
"Actual Startup Time": N.N, +
"Actual Total Time": N.N, +
"Actual Rows": N, +
"Actual Loops": N, +
"Distributed Query": { +
"Job": { +
"Task Count": N, +
"Tuple data received from nodes": "N bytes", +
"Tasks Shown": "One of N", +
"Tasks": [ +
{ +
"Tuple data received from node": "N bytes", +
"Node": "host=localhost port=N dbname=regression",+
"Remote Plan": [ +
[ +
{ +
"Plan": { +
"Node Type": "Seq Scan", +
"Parallel Aware": false, +
"Async Capable": false, +
"Relation Name": "int8_tbl_12242024", +
"Alias": "i8", +
"Startup Cost": N.N, +
"Total Cost": N.N, +
"Plan Rows": N, +
"Plan Width": N, +
"Actual Startup Time": N.N, +
"Actual Total Time": N.N, +
"Actual Rows": N, +
"Actual Loops": N +
}, +
"Planning": { +
"Memory Used": N, +
"Memory Allocated": N +
}, +
"Planning Time": N.N, +
"Triggers": [ +
], +
"Execution Time": N.N +
} +
] +
+
] +
} +
] +
} +
} +
}, +
"Planning": { +
"Memory Used": N, +
"Memory Allocated": N +
}, +
"Planning Time": N.N, +
"Triggers": [ +
], +
"Execution Time": N.N +
} +
]
(1 row)
prepare int8_query as select * from int8_tbl i8;
select public.explain_filter('explain (memory) execute int8_query');
NOTICE: issuing EXPLAIN (ANALYZE FALSE, VERBOSE FALSE, COSTS TRUE, BUFFERS FALSE, WAL FALSE, TIMING FALSE, SUMMARY FALSE, MEMORY TRUE, SERIALIZE none, FORMAT TEXT) SELECT q1, q2 FROM pg17.int8_tbl_12242024 i8 WHERE true
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
CONTEXT: PL/pgSQL function public.explain_filter(text) line XX at FOR over EXECUTE statement
explain_filter
---------------------------------------------------------------------
Custom Scan (Citus Adaptive) (cost=N.N..N.N rows=N width=N)
Task Count: N
Tasks Shown: One of N
-> Task
Node: host=localhost port=N dbname=regression
-> Seq Scan on int8_tbl_12242024 i8 (cost=N.N..N.N rows=N width=N)
Planning:
Memory: used=NkB allocated=NkB
Memory: used=NkB allocated=NkB
(9 rows)
-- serialize tests, same as postgres tests, we just distributed the table
select public.explain_filter('explain (analyze, serialize, buffers, format yaml) select * from int8_tbl i8');
NOTICE: issuing SELECT * FROM worker_save_query_explain_analyze('SELECT q1, q2 FROM pg17.int8_tbl_12242024 i8 WHERE true', '{"verbose": false, "costs": true, "buffers": true, "wal": false, "memory": false, "serialize": "text", "timing": true, "summary": true, "format": "YAML"}') AS (field_0 bigint, field_1 bigint)
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
CONTEXT: PL/pgSQL function public.explain_filter(text) line XX at FOR over EXECUTE statement
explain_filter
---------------------------------------------------------------------
- Plan: +
Node Type: "Custom Scan" +
Custom Plan Provider: "Citus Adaptive" +
Parallel Aware: false +
Async Capable: false +
Startup Cost: N.N +
Total Cost: N.N +
Plan Rows: N +
Plan Width: N +
Actual Startup Time: N.N +
Actual Total Time: N.N +
Actual Rows: N +
Actual Loops: N +
Distributed Query: +
Job: +
Task Count: N +
Tuple data received from nodes: "N bytes" +
Tasks Shown: "One of N" +
Tasks: +
- Tuple data received from node: "N bytes" +
Node: "host=localhost port=N dbname=regression"+
Remote Plan: +
- Plan: +
Node Type: "Seq Scan" +
Parallel Aware: false +
Async Capable: false +
Relation Name: "int8_tbl_12242024" +
Alias: "i8" +
Startup Cost: N.N +
Total Cost: N.N +
Plan Rows: N +
Plan Width: N +
Actual Startup Time: N.N +
Actual Total Time: N.N +
Actual Rows: N +
Actual Loops: N +
Shared Hit Blocks: N +
Shared Read Blocks: N +
Shared Dirtied Blocks: N +
Shared Written Blocks: N +
Local Hit Blocks: N +
Local Read Blocks: N +
Local Dirtied Blocks: N +
Local Written Blocks: N +
Temp Read Blocks: N +
Temp Written Blocks: N +
Planning: +
Shared Hit Blocks: N +
Shared Read Blocks: N +
Shared Dirtied Blocks: N +
Shared Written Blocks: N +
Local Hit Blocks: N +
Local Read Blocks: N +
Local Dirtied Blocks: N +
Local Written Blocks: N +
Temp Read Blocks: N +
Temp Written Blocks: N +
Planning Time: N.N +
Triggers: +
Serialization: +
Time: N.N +
Output Volume: N +
Format: "text" +
Shared Hit Blocks: N +
Shared Read Blocks: N +
Shared Dirtied Blocks: N +
Shared Written Blocks: N +
Local Hit Blocks: N +
Local Read Blocks: N +
Local Dirtied Blocks: N +
Local Written Blocks: N +
Temp Read Blocks: N +
Temp Written Blocks: N +
Execution Time: N.N +
+
Shared Hit Blocks: N +
Shared Read Blocks: N +
Shared Dirtied Blocks: N +
Shared Written Blocks: N +
Local Hit Blocks: N +
Local Read Blocks: N +
Local Dirtied Blocks: N +
Local Written Blocks: N +
Temp Read Blocks: N +
Temp Written Blocks: N +
Planning: +
Shared Hit Blocks: N +
Shared Read Blocks: N +
Shared Dirtied Blocks: N +
Shared Written Blocks: N +
Local Hit Blocks: N +
Local Read Blocks: N +
Local Dirtied Blocks: N +
Local Written Blocks: N +
Temp Read Blocks: N +
Temp Written Blocks: N +
Planning Time: N.N +
Triggers: +
Serialization: +
Time: N.N +
Output Volume: N +
Format: "text" +
Shared Hit Blocks: N +
Shared Read Blocks: N +
Shared Dirtied Blocks: N +
Shared Written Blocks: N +
Local Hit Blocks: N +
Local Read Blocks: N +
Local Dirtied Blocks: N +
Local Written Blocks: N +
Temp Read Blocks: N +
Temp Written Blocks: N +
Execution Time: N.N
(1 row)
select public.explain_filter('explain (analyze,serialize) select * from int8_tbl i8');
NOTICE: issuing SELECT * FROM worker_save_query_explain_analyze('SELECT q1, q2 FROM pg17.int8_tbl_12242024 i8 WHERE true', '{"verbose": false, "costs": true, "buffers": false, "wal": false, "memory": false, "serialize": "text", "timing": true, "summary": true, "format": "TEXT"}') AS (field_0 bigint, field_1 bigint)
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
CONTEXT: PL/pgSQL function public.explain_filter(text) line XX at FOR over EXECUTE statement
explain_filter
---------------------------------------------------------------------
Custom Scan (Citus Adaptive) (cost=N.N..N.N rows=N width=N) (actual time=N.N..N.N rows=N loops=N)
Task Count: N
Tuple data received from nodes: N bytes
Tasks Shown: One of N
-> Task
Tuple data received from node: N bytes
Node: host=localhost port=N dbname=regression
-> Seq Scan on int8_tbl_12242024 i8 (cost=N.N..N.N rows=N width=N) (actual time=N.N..N.N rows=N loops=N)
Planning Time: N.N ms
Serialization: time=N.N ms output=NkB format=text
Execution Time: N.N ms
Planning Time: N.N ms
Serialization: time=N.N ms output=NkB format=text
Execution Time: N.N ms
(14 rows)
select public.explain_filter('explain (analyze,serialize text,buffers,timing off) select * from int8_tbl i8');
NOTICE: issuing SELECT * FROM worker_save_query_explain_analyze('SELECT q1, q2 FROM pg17.int8_tbl_12242024 i8 WHERE true', '{"verbose": false, "costs": true, "buffers": true, "wal": false, "memory": false, "serialize": "text", "timing": false, "summary": true, "format": "TEXT"}') AS (field_0 bigint, field_1 bigint)
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
CONTEXT: PL/pgSQL function public.explain_filter(text) line XX at FOR over EXECUTE statement
explain_filter
---------------------------------------------------------------------
Custom Scan (Citus Adaptive) (cost=N.N..N.N rows=N width=N) (actual rows=N loops=N)
Task Count: N
Tuple data received from nodes: N bytes
Tasks Shown: One of N
-> Task
Tuple data received from node: N bytes
Node: host=localhost port=N dbname=regression
-> Seq Scan on int8_tbl_12242024 i8 (cost=N.N..N.N rows=N width=N) (actual rows=N loops=N)
Planning Time: N.N ms
Serialization: output=NkB format=text
Execution Time: N.N ms
Planning Time: N.N ms
Serialization: output=NkB format=text
Execution Time: N.N ms
(14 rows)
select public.explain_filter('explain (analyze,serialize binary,buffers,timing) select * from int8_tbl i8');
NOTICE: issuing SELECT * FROM worker_save_query_explain_analyze('SELECT q1, q2 FROM pg17.int8_tbl_12242024 i8 WHERE true', '{"verbose": false, "costs": true, "buffers": true, "wal": false, "memory": false, "serialize": "binary", "timing": true, "summary": true, "format": "TEXT"}') AS (field_0 bigint, field_1 bigint)
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
CONTEXT: PL/pgSQL function public.explain_filter(text) line XX at FOR over EXECUTE statement
explain_filter
---------------------------------------------------------------------
Custom Scan (Citus Adaptive) (cost=N.N..N.N rows=N width=N) (actual time=N.N..N.N rows=N loops=N)
Task Count: N
Tuple data received from nodes: N bytes
Tasks Shown: One of N
-> Task
Tuple data received from node: N bytes
Node: host=localhost port=N dbname=regression
-> Seq Scan on int8_tbl_12242024 i8 (cost=N.N..N.N rows=N width=N) (actual time=N.N..N.N rows=N loops=N)
Planning Time: N.N ms
Serialization: time=N.N ms output=NkB format=binary
Execution Time: N.N ms
Planning Time: N.N ms
Serialization: time=N.N ms output=NkB format=binary
Execution Time: N.N ms
(14 rows)
-- this tests an edge case where we have no data to return
select public.explain_filter('explain (analyze,serialize) create temp table explain_temp as select * from int8_tbl i8');
NOTICE: issuing SELECT * FROM worker_save_query_explain_analyze('SELECT q1, q2 FROM pg17.int8_tbl_12242024 i8 WHERE true', '{"verbose": false, "costs": true, "buffers": false, "wal": false, "memory": false, "serialize": "text", "timing": true, "summary": true, "format": "TEXT"}') AS (field_0 bigint, field_1 bigint)
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
CONTEXT: PL/pgSQL function public.explain_filter(text) line XX at FOR over EXECUTE statement
explain_filter
---------------------------------------------------------------------
Custom Scan (Citus Adaptive) (cost=N.N..N.N rows=N width=N) (actual time=N.N..N.N rows=N loops=N)
Task Count: N
Tuple data received from nodes: N bytes
Tasks Shown: One of N
-> Task
Tuple data received from node: N bytes
Node: host=localhost port=N dbname=regression
-> Seq Scan on int8_tbl_12242024 i8 (cost=N.N..N.N rows=N width=N) (actual time=N.N..N.N rows=N loops=N)
Planning Time: N.N ms
Serialization: time=N.N ms output=NkB format=text
Execution Time: N.N ms
Planning Time: N.N ms
Serialization: time=N.N ms output=NkB format=text
Execution Time: N.N ms
(14 rows)
RESET citus.log_remote_commands;
-- End of EXPLAIN MEMORY SERIALIZE tests
\set VERBOSITY terse
SET client_min_messages TO WARNING;
DROP SCHEMA pg17 CASCADE;

View File

@ -632,3 +632,31 @@ BEGIN
RETURN NEXT;
END LOOP;
END; $$ language plpgsql;
-- To produce stable regression test output, it's usually necessary to
-- ignore details such as exact costs or row counts. These filter
-- functions replace changeable output details with fixed strings.
-- Copied from PG explain.sql
create function explain_filter(text) returns setof text
language plpgsql as
$$
declare
ln text;
begin
for ln in execute $1
loop
-- Replace any numeric word with just 'N'
ln := regexp_replace(ln, '-?\m\d+\M', 'N', 'g');
-- In sort output, the above won't match units-suffixed numbers
ln := regexp_replace(ln, '\m\d+kB', 'NkB', 'g');
-- Ignore text-mode buffers output because it varies depending
-- on the system state
CONTINUE WHEN (ln ~ ' +Buffers: .*');
-- Ignore text-mode "Planning:" line because whether it's output
-- varies depending on the system state
CONTINUE WHEN (ln = 'Planning:');
return next ln;
end loop;
end;
$$;

View File

@ -1038,6 +1038,48 @@ DROP EVENT TRIGGER reindex_event_trigger_end;
DROP TABLE reindex_test CASCADE;
-- End of test for REINDEX support in event triggers for Citus-related objects
-- Propagate EXPLAIN MEMORY
-- Relevant PG commit: https://github.com/postgres/postgres/commit/5de890e36
-- Propagate EXPLAIN SERIALIZE
-- Relevant PG commit: https://github.com/postgres/postgres/commit/06286709e
SET citus.next_shard_id TO 12242024;
CREATE TABLE int8_tbl(q1 int8, q2 int8);
SELECT create_distributed_table('int8_tbl', 'q1');
INSERT INTO int8_tbl VALUES
(' 123 ',' 456'),
('123 ','4567890123456789'),
('4567890123456789','123'),
(+4567890123456789,'4567890123456789'),
('+4567890123456789','-4567890123456789');
-- memory tests, same as postgres tests, we just distributed the table
-- we can see the memory used separately per each task in worker nodes
SET citus.log_remote_commands TO true;
-- for explain analyze, we run worker_save_query_explain_analyze query
-- for regular explain, we run EXPLAIN query
-- therefore let's grep the commands based on the shard id
SET citus.grep_remote_commands TO '%12242024%';
select public.explain_filter('explain (memory) select * from int8_tbl i8');
select public.explain_filter('explain (memory, analyze) select * from int8_tbl i8');
select public.explain_filter('explain (memory, summary, format yaml) select * from int8_tbl i8');
select public.explain_filter('explain (memory, analyze, format json) select * from int8_tbl i8');
prepare int8_query as select * from int8_tbl i8;
select public.explain_filter('explain (memory) execute int8_query');
-- serialize tests, same as postgres tests, we just distributed the table
select public.explain_filter('explain (analyze, serialize, buffers, format yaml) select * from int8_tbl i8');
select public.explain_filter('explain (analyze,serialize) select * from int8_tbl i8');
select public.explain_filter('explain (analyze,serialize text,buffers,timing off) select * from int8_tbl i8');
select public.explain_filter('explain (analyze,serialize binary,buffers,timing) select * from int8_tbl i8');
-- this tests an edge case where we have no data to return
select public.explain_filter('explain (analyze,serialize) create temp table explain_temp as select * from int8_tbl i8');
RESET citus.log_remote_commands;
-- End of EXPLAIN MEMORY SERIALIZE tests
\set VERBOSITY terse
SET client_min_messages TO WARNING;