From 74d945f5ae1c0c70375196e4ec0b9a1a51fd08d9 Mon Sep 17 00:00:00 2001
From: Naisila Puka <37271756+naisila@users.noreply.github.com>
Date: Thu, 2 Jan 2025 12:32:36 +0300
Subject: [PATCH] PG17 - Propagate EXPLAIN options: MEMORY and SERIALIZE (#7802)

DESCRIPTION: Propagates MEMORY and SERIALIZE options of EXPLAIN

The `MEMORY` option can be true or false; the default is false.
The `SERIALIZE` option can be none, text, or binary; the default is none.

I followed the approach we used when adding support for the WAL option in
[Support EXPLAIN(ANALYZE, WAL)](https://github.com/citusdata/citus/pull/4196).
For the tests, however, I did not reuse the tests from the WAL PR; instead I
used exactly the same tests as Postgres, simply distributing the table
beforehand. See the relevant Postgres commits below, which also show the
tests that were added:

- [Add EXPLAIN (MEMORY)](https://github.com/postgres/postgres/commit/5de890e36)
- [Invent SERIALIZE option for EXPLAIN.](https://github.com/postgres/postgres/commit/06286709e)

This PR required copying a number of Postgres static functions that implement
the `MEMORY` and `SERIALIZE` behavior of `EXPLAIN`. Specifically, these
copy-pastes were needed to update the `ExplainWorkerPlan()` function, which is
based on Postgres' `ExplainOnePlan()`:

```C
/* copied from explain.c to update ExplainWorkerPlan() in citus according to ExplainOnePlan() in postgres */
#define BYTES_TO_KILOBYTES(b)
typedef struct SerializeMetrics
static bool peek_buffer_usage(ExplainState *es, const BufferUsage *usage);
static void show_buffer_usage(ExplainState *es, const BufferUsage *usage);
static void show_memory_counters(ExplainState *es,
                                 const MemoryContextCounters *mem_counters);
static void ExplainIndentText(ExplainState *es);
static void ExplainPrintSerialize(ExplainState *es,
                                  SerializeMetrics *metrics);
static SerializeMetrics GetSerializationMetrics(DestReceiver *dest);
```

_Note_: it looks like we were also missing some details of the `buffers`
option. I kept those together with the memory option, matching the code in
Postgres' explain.c, because I did not want to modify the copied code. I
tested locally and the impact on previous Citus versions is negligible, and
the existing Citus tests with `buffers true` did not change. Therefore, I
prefer not to backport the "buffers" changes to previous versions.
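For illustration, here is a minimal usage sketch of the new options against a
distributed table. It mirrors the regression tests added in this PR (the
table, column, and distribution setup are taken from `pg17.sql`); it is not
part of the patch itself:

```sql
-- Distribute a table first; the EXPLAIN options are then propagated to the
-- per-shard EXPLAIN commands that Citus sends to the workers.
CREATE TABLE int8_tbl(q1 int8, q2 int8);
SELECT create_distributed_table('int8_tbl', 'q1');

-- MEMORY reports planner memory usage; with a distributed table the output
-- includes a "Memory:" line for the coordinator plan and for each shown task.
EXPLAIN (MEMORY) SELECT * FROM int8_tbl;

-- SERIALIZE measures the cost of converting result tuples to text or binary
-- wire format; it requires ANALYZE because the query must actually run.
EXPLAIN (ANALYZE, SERIALIZE binary) SELECT * FROM int8_tbl;
```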
--- .../distributed/planner/multi_explain.c | 637 +++++++++++++++++- .../regress/expected/multi_test_helpers.out | 26 + src/test/regress/expected/pg17.out | 416 ++++++++++++ src/test/regress/sql/multi_test_helpers.sql | 28 + src/test/regress/sql/pg17.sql | 42 ++ 5 files changed, 1147 insertions(+), 2 deletions(-) diff --git a/src/backend/distributed/planner/multi_explain.c b/src/backend/distributed/planner/multi_explain.c index 39e0b16d6..8b57b5a12 100644 --- a/src/backend/distributed/planner/multi_explain.c +++ b/src/backend/distributed/planner/multi_explain.c @@ -95,14 +95,24 @@ typedef struct bool wal; bool timing; bool summary; +#if PG_VERSION_NUM >= PG_VERSION_17 + bool memory; + ExplainSerializeOption serialize; +#endif ExplainFormat format; } ExplainOptions; /* EXPLAIN flags of current distributed explain */ +#if PG_VERSION_NUM >= PG_VERSION_17 +static ExplainOptions CurrentDistributedQueryExplainOptions = { + 0, 0, 0, 0, 0, 0, 0, EXPLAIN_SERIALIZE_NONE, EXPLAIN_FORMAT_TEXT +}; +#else static ExplainOptions CurrentDistributedQueryExplainOptions = { 0, 0, 0, 0, 0, 0, EXPLAIN_FORMAT_TEXT }; +#endif /* Result for a single remote EXPLAIN command */ typedef struct RemoteExplainPlan @@ -124,6 +134,59 @@ typedef struct ExplainAnalyzeDestination TupleDesc lastSavedExplainAnalyzeTupDesc; } ExplainAnalyzeDestination; +#if PG_VERSION_NUM >= PG_VERSION_17 + +/* + * Various places within need to convert bytes to kilobytes. Round these up + * to the next whole kilobyte. + * copied from explain.c + */ +#define BYTES_TO_KILOBYTES(b) (((b) + 1023) / 1024) + +/* copied from explain.c */ +/* Instrumentation data for SERIALIZE option */ +typedef struct SerializeMetrics +{ + uint64 bytesSent; /* # of bytes serialized */ + instr_time timeSpent; /* time spent serializing */ + BufferUsage bufferUsage; /* buffers accessed during serialization */ +} SerializeMetrics; + +/* copied from explain.c */ +static bool peek_buffer_usage(ExplainState *es, const BufferUsage *usage); +static void show_buffer_usage(ExplainState *es, const BufferUsage *usage); +static void show_memory_counters(ExplainState *es, + const MemoryContextCounters *mem_counters); +static void ExplainIndentText(ExplainState *es); +static void ExplainPrintSerialize(ExplainState *es, + SerializeMetrics *metrics); +static SerializeMetrics GetSerializationMetrics(DestReceiver *dest); + +/* + * DestReceiver functions for SERIALIZE option + * + * A DestReceiver for query tuples, that serializes passed rows into RowData + * messages while measuring the resources expended and total serialized size, + * while never sending the data to the client. This allows measuring the + * overhead of deTOASTing and datatype out/sendfuncs, which are not otherwise + * exercisable without actually hitting the network. 
+ * + * copied from explain.c + */ +typedef struct SerializeDestReceiver +{ + DestReceiver pub; + ExplainState *es; /* this EXPLAIN statement's ExplainState */ + int8 format; /* text or binary, like pq wire protocol */ + TupleDesc attrinfo; /* the output tuple desc */ + int nattrs; /* current number of columns */ + FmgrInfo *finfos; /* precomputed call info for output fns */ + MemoryContext tmpcontext; /* per-row temporary memory context */ + StringInfoData buf; /* buffer to hold the constructed message */ + SerializeMetrics metrics; /* collected metrics */ +} SerializeDestReceiver; +#endif + /* Explain functions for distributed queries */ static void ExplainSubPlans(DistributedPlan *distributedPlan, ExplainState *es); @@ -144,14 +207,27 @@ static void ExplainTaskPlacement(ShardPlacement *taskPlacement, List *explainOut ExplainState *es); static StringInfo BuildRemoteExplainQuery(char *queryString, ExplainState *es); static const char * ExplainFormatStr(ExplainFormat format); +#if PG_VERSION_NUM >= PG_VERSION_17 +static const char * ExplainSerializeStr(ExplainSerializeOption serializeOption); +#endif static void ExplainWorkerPlan(PlannedStmt *plannedStmt, DestReceiver *dest, ExplainState *es, const char *queryString, ParamListInfo params, QueryEnvironment *queryEnv, const instr_time *planduration, +#if PG_VERSION_NUM >= PG_VERSION_17 + const BufferUsage *bufusage, + const MemoryContextCounters *mem_counters, +#endif double *executionDurationMillisec); static ExplainFormat ExtractFieldExplainFormat(Datum jsonbDoc, const char *fieldName, ExplainFormat defaultValue); +#if PG_VERSION_NUM >= PG_VERSION_17 +static ExplainSerializeOption ExtractFieldExplainSerialize(Datum jsonbDoc, + const char *fieldName, + ExplainSerializeOption + defaultValue); +#endif static TupleDestination * CreateExplainAnlyzeDestination(Task *task, TupleDestination *taskDest); static void ExplainAnalyzeDestPutTuple(TupleDestination *self, Task *task, @@ -1023,11 +1099,19 @@ BuildRemoteExplainQuery(char *queryString, ExplainState *es) { StringInfo explainQuery = makeStringInfo(); const char *formatStr = ExplainFormatStr(es->format); +#if PG_VERSION_NUM >= PG_VERSION_17 + const char *serializeStr = ExplainSerializeStr(es->serialize); +#endif + appendStringInfo(explainQuery, "EXPLAIN (ANALYZE %s, VERBOSE %s, " "COSTS %s, BUFFERS %s, WAL %s, " - "TIMING %s, SUMMARY %s, FORMAT %s) %s", + "TIMING %s, SUMMARY %s, " +#if PG_VERSION_NUM >= PG_VERSION_17 + "MEMORY %s, SERIALIZE %s, " +#endif + "FORMAT %s) %s", es->analyze ? "TRUE" : "FALSE", es->verbose ? "TRUE" : "FALSE", es->costs ? "TRUE" : "FALSE", @@ -1035,6 +1119,10 @@ BuildRemoteExplainQuery(char *queryString, ExplainState *es) es->wal ? "TRUE" : "FALSE", es->timing ? "TRUE" : "FALSE", es->summary ? "TRUE" : "FALSE", +#if PG_VERSION_NUM >= PG_VERSION_17 + es->memory ? "TRUE" : "FALSE", + serializeStr, +#endif formatStr, queryString); @@ -1073,6 +1161,42 @@ ExplainFormatStr(ExplainFormat format) } +#if PG_VERSION_NUM >= PG_VERSION_17 + +/* + * ExplainSerializeStr converts the given explain serialize option to string. 
+ */ +static const char * +ExplainSerializeStr(ExplainSerializeOption serializeOption) +{ + switch (serializeOption) + { + case EXPLAIN_SERIALIZE_NONE: + { + return "none"; + } + + case EXPLAIN_SERIALIZE_TEXT: + { + return "text"; + } + + case EXPLAIN_SERIALIZE_BINARY: + { + return "binary"; + } + + default: + { + return "none"; + } + } +} + + +#endif + + /* * worker_last_saved_explain_analyze returns the last saved EXPLAIN ANALYZE output of * a worker task query. It returns NULL if nothing has been saved yet. @@ -1132,6 +1256,11 @@ worker_save_query_explain_analyze(PG_FUNCTION_ARGS) es->verbose = ExtractFieldBoolean(explainOptions, "verbose", es->verbose); es->timing = ExtractFieldBoolean(explainOptions, "timing", es->timing); es->format = ExtractFieldExplainFormat(explainOptions, "format", es->format); +#if PG_VERSION_NUM >= PG_VERSION_17 + es->memory = ExtractFieldBoolean(explainOptions, "memory", es->memory); + es->serialize = ExtractFieldExplainSerialize(explainOptions, "serialize", + es->serialize); +#endif TupleDesc tupleDescriptor = NULL; Tuplestorestate *tupleStore = SetupTuplestore(fcinfo, &tupleDescriptor); @@ -1177,6 +1306,36 @@ worker_save_query_explain_analyze(PG_FUNCTION_ARGS) /* plan query and record planning stats */ instr_time planStart; instr_time planDuration; +#if PG_VERSION_NUM >= PG_VERSION_17 + BufferUsage bufusage_start, + bufusage; + MemoryContextCounters mem_counters; + MemoryContext planner_ctx = NULL; + MemoryContext saved_ctx = NULL; + + if (es->memory) + { + /* + * Create a new memory context to measure planner's memory consumption + * accurately. Note that if the planner were to be modified to use a + * different memory context type, here we would be changing that to + * AllocSet, which might be undesirable. However, we don't have a way + * to create a context of the same type as another, so we pray and + * hope that this is OK. + * + * copied from explain.c + */ + planner_ctx = AllocSetContextCreate(CurrentMemoryContext, + "explain analyze planner context", + ALLOCSET_DEFAULT_SIZES); + saved_ctx = MemoryContextSwitchTo(planner_ctx); + } + + if (es->buffers) + { + bufusage_start = pgBufferUsage; + } +#endif INSTR_TIME_SET_CURRENT(planStart); @@ -1185,9 +1344,32 @@ worker_save_query_explain_analyze(PG_FUNCTION_ARGS) INSTR_TIME_SET_CURRENT(planDuration); INSTR_TIME_SUBTRACT(planDuration, planStart); +#if PG_VERSION_NUM >= PG_VERSION_17 + if (es->memory) + { + MemoryContextSwitchTo(saved_ctx); + MemoryContextMemConsumed(planner_ctx, &mem_counters); + } + + /* calc differences of buffer counters. */ + if (es->buffers) + { + memset(&bufusage, 0, sizeof(BufferUsage)); + BufferUsageAccumDiff(&bufusage, &pgBufferUsage, &bufusage_start); + } + + /* do the actual EXPLAIN ANALYZE */ + ExplainWorkerPlan(plan, tupleStoreDest, es, queryString, boundParams, NULL, + &planDuration, + (es->buffers ? &bufusage : NULL), + (es->memory ? &mem_counters : NULL), + &executionDurationMillisec); +#else + /* do the actual EXPLAIN ANALYZE */ ExplainWorkerPlan(plan, tupleStoreDest, es, queryString, boundParams, NULL, &planDuration, &executionDurationMillisec); +#endif ExplainEndOutput(es); @@ -1256,6 +1438,50 @@ ExtractFieldExplainFormat(Datum jsonbDoc, const char *fieldName, ExplainFormat } +#if PG_VERSION_NUM >= PG_VERSION_17 + +/* + * ExtractFieldExplainSerialize gets value of fieldName from jsonbDoc, or returns + * defaultValue if it doesn't exist. 
+ */ +static ExplainSerializeOption +ExtractFieldExplainSerialize(Datum jsonbDoc, const char *fieldName, ExplainSerializeOption + defaultValue) +{ + Datum jsonbDatum = 0; + bool found = ExtractFieldJsonbDatum(jsonbDoc, fieldName, &jsonbDatum); + if (!found) + { + return defaultValue; + } + + const char *serializeStr = DatumGetCString(DirectFunctionCall1(jsonb_out, + jsonbDatum)); + if (pg_strcasecmp(serializeStr, "\"none\"") == 0) + { + return EXPLAIN_SERIALIZE_NONE; + } + else if (pg_strcasecmp(serializeStr, "\"off\"") == 0) + { + return EXPLAIN_SERIALIZE_NONE; + } + else if (pg_strcasecmp(serializeStr, "\"text\"") == 0) + { + return EXPLAIN_SERIALIZE_TEXT; + } + else if (pg_strcasecmp(serializeStr, "\"binary\"") == 0) + { + return EXPLAIN_SERIALIZE_BINARY; + } + + ereport(ERROR, (errmsg("Invalid explain analyze serialize: %s", serializeStr))); + return 0; +} + + +#endif + + /* * CitusExplainOneQuery is the executor hook that is called when * postgres wants to explain a query. @@ -1273,6 +1499,10 @@ CitusExplainOneQuery(Query *query, int cursorOptions, IntoClause *into, CurrentDistributedQueryExplainOptions.summary = es->summary; CurrentDistributedQueryExplainOptions.timing = es->timing; CurrentDistributedQueryExplainOptions.format = es->format; +#if PG_VERSION_NUM >= PG_VERSION_17 + CurrentDistributedQueryExplainOptions.memory = es->memory; + CurrentDistributedQueryExplainOptions.serialize = es->serialize; +#endif /* rest is copied from ExplainOneQuery() */ instr_time planstart, @@ -1595,11 +1825,18 @@ WrapQueryForExplainAnalyze(const char *queryString, TupleDesc tupleDesc, StringInfo explainOptions = makeStringInfo(); appendStringInfo(explainOptions, "{\"verbose\": %s, \"costs\": %s, \"buffers\": %s, \"wal\": %s, " +#if PG_VERSION_NUM >= PG_VERSION_17 + "\"memory\": %s, \"serialize\": \"%s\", " +#endif "\"timing\": %s, \"summary\": %s, \"format\": \"%s\"}", CurrentDistributedQueryExplainOptions.verbose ? "true" : "false", CurrentDistributedQueryExplainOptions.costs ? "true" : "false", CurrentDistributedQueryExplainOptions.buffers ? "true" : "false", CurrentDistributedQueryExplainOptions.wal ? "true" : "false", +#if PG_VERSION_NUM >= PG_VERSION_17 + CurrentDistributedQueryExplainOptions.memory ? "true" : "false", + ExplainSerializeStr(CurrentDistributedQueryExplainOptions.serialize), +#endif CurrentDistributedQueryExplainOptions.timing ? "true" : "false", CurrentDistributedQueryExplainOptions.summary ? 
"true" : "false", ExplainFormatStr(CurrentDistributedQueryExplainOptions.format)); @@ -1824,7 +2061,12 @@ ExplainOneQuery(Query *query, int cursorOptions, static void ExplainWorkerPlan(PlannedStmt *plannedstmt, DestReceiver *dest, ExplainState *es, const char *queryString, ParamListInfo params, QueryEnvironment *queryEnv, - const instr_time *planduration, double *executionDurationMillisec) + const instr_time *planduration, +#if PG_VERSION_NUM >= PG_VERSION_17 + const BufferUsage *bufusage, + const MemoryContextCounters *mem_counters, +#endif + double *executionDurationMillisec) { QueryDesc *queryDesc; instr_time starttime; @@ -1893,6 +2135,32 @@ ExplainWorkerPlan(PlannedStmt *plannedstmt, DestReceiver *dest, ExplainState *es /* Create textual dump of plan tree */ ExplainPrintPlan(es, queryDesc); +#if PG_VERSION_NUM >= PG_VERSION_17 + /* Show buffer and/or memory usage in planning */ + if (peek_buffer_usage(es, bufusage) || mem_counters) + { + ExplainOpenGroup("Planning", "Planning", true, es); + + if (es->format == EXPLAIN_FORMAT_TEXT) + { + ExplainIndentText(es); + appendStringInfoString(es->str, "Planning:\n"); + es->indent++; + } + + if (bufusage) + show_buffer_usage(es, bufusage); + + if (mem_counters) + show_memory_counters(es, mem_counters); + + if (es->format == EXPLAIN_FORMAT_TEXT) + es->indent--; + + ExplainCloseGroup("Planning", "Planning", true, es); + } +#endif + if (es->summary && planduration) { double plantime = INSTR_TIME_GET_DOUBLE(*planduration); @@ -1913,6 +2181,23 @@ ExplainWorkerPlan(PlannedStmt *plannedstmt, DestReceiver *dest, ExplainState *es if (es->costs) ExplainPrintJITSummary(es, queryDesc); +#if PG_VERSION_NUM >= PG_VERSION_17 + if (es->serialize != EXPLAIN_SERIALIZE_NONE) + { + /* the SERIALIZE option requires its own tuple receiver */ + DestReceiver *dest_serialize = CreateExplainSerializeDestReceiver(es); + + /* grab serialization metrics before we destroy the DestReceiver */ + SerializeMetrics serializeMetrics = GetSerializationMetrics(dest_serialize); + + /* call the DestReceiver's destroy method even during explain */ + dest_serialize->rDestroy(dest_serialize); + + /* Print info about serialization of output */ + ExplainPrintSerialize(es, &serializeMetrics); + } +#endif + /* * Close down the query and free resources. Include time for this in the * total execution time (although it should be pretty minimal). @@ -1961,3 +2246,351 @@ elapsed_time(instr_time *starttime) INSTR_TIME_SUBTRACT(endtime, *starttime); return INSTR_TIME_GET_DOUBLE(endtime); } + + +#if PG_VERSION_NUM >= PG_VERSION_17 +/* + * Return whether show_buffer_usage would have anything to print, if given + * the same 'usage' data. Note that when the format is anything other than + * text, we print even if the counters are all zeroes. + * + * Copied from explain.c. 
+ */ +static bool +peek_buffer_usage(ExplainState *es, const BufferUsage *usage) +{ + bool has_shared; + bool has_local; + bool has_temp; + bool has_shared_timing; + bool has_local_timing; + bool has_temp_timing; + + if (usage == NULL) + return false; + + if (es->format != EXPLAIN_FORMAT_TEXT) + return true; + + has_shared = (usage->shared_blks_hit > 0 || + usage->shared_blks_read > 0 || + usage->shared_blks_dirtied > 0 || + usage->shared_blks_written > 0); + has_local = (usage->local_blks_hit > 0 || + usage->local_blks_read > 0 || + usage->local_blks_dirtied > 0 || + usage->local_blks_written > 0); + has_temp = (usage->temp_blks_read > 0 || + usage->temp_blks_written > 0); + has_shared_timing = (!INSTR_TIME_IS_ZERO(usage->shared_blk_read_time) || + !INSTR_TIME_IS_ZERO(usage->shared_blk_write_time)); + has_local_timing = (!INSTR_TIME_IS_ZERO(usage->local_blk_read_time) || + !INSTR_TIME_IS_ZERO(usage->local_blk_write_time)); + has_temp_timing = (!INSTR_TIME_IS_ZERO(usage->temp_blk_read_time) || + !INSTR_TIME_IS_ZERO(usage->temp_blk_write_time)); + + return has_shared || has_local || has_temp || has_shared_timing || + has_local_timing || has_temp_timing; +} + + +/* + * Show buffer usage details. This better be sync with peek_buffer_usage. + * + * Copied from explain.c. + */ +static void +show_buffer_usage(ExplainState *es, const BufferUsage *usage) +{ + if (es->format == EXPLAIN_FORMAT_TEXT) + { + bool has_shared = (usage->shared_blks_hit > 0 || + usage->shared_blks_read > 0 || + usage->shared_blks_dirtied > 0 || + usage->shared_blks_written > 0); + bool has_local = (usage->local_blks_hit > 0 || + usage->local_blks_read > 0 || + usage->local_blks_dirtied > 0 || + usage->local_blks_written > 0); + bool has_temp = (usage->temp_blks_read > 0 || + usage->temp_blks_written > 0); + bool has_shared_timing = (!INSTR_TIME_IS_ZERO(usage->shared_blk_read_time) || + !INSTR_TIME_IS_ZERO(usage->shared_blk_write_time)); + bool has_local_timing = (!INSTR_TIME_IS_ZERO(usage->local_blk_read_time) || + !INSTR_TIME_IS_ZERO(usage->local_blk_write_time)); + bool has_temp_timing = (!INSTR_TIME_IS_ZERO(usage->temp_blk_read_time) || + !INSTR_TIME_IS_ZERO(usage->temp_blk_write_time)); + + /* Show only positive counter values. 
*/ + if (has_shared || has_local || has_temp) + { + ExplainIndentText(es); + appendStringInfoString(es->str, "Buffers:"); + + if (has_shared) + { + appendStringInfoString(es->str, " shared"); + if (usage->shared_blks_hit > 0) + appendStringInfo(es->str, " hit=%lld", + (long long) usage->shared_blks_hit); + if (usage->shared_blks_read > 0) + appendStringInfo(es->str, " read=%lld", + (long long) usage->shared_blks_read); + if (usage->shared_blks_dirtied > 0) + appendStringInfo(es->str, " dirtied=%lld", + (long long) usage->shared_blks_dirtied); + if (usage->shared_blks_written > 0) + appendStringInfo(es->str, " written=%lld", + (long long) usage->shared_blks_written); + if (has_local || has_temp) + appendStringInfoChar(es->str, ','); + } + if (has_local) + { + appendStringInfoString(es->str, " local"); + if (usage->local_blks_hit > 0) + appendStringInfo(es->str, " hit=%lld", + (long long) usage->local_blks_hit); + if (usage->local_blks_read > 0) + appendStringInfo(es->str, " read=%lld", + (long long) usage->local_blks_read); + if (usage->local_blks_dirtied > 0) + appendStringInfo(es->str, " dirtied=%lld", + (long long) usage->local_blks_dirtied); + if (usage->local_blks_written > 0) + appendStringInfo(es->str, " written=%lld", + (long long) usage->local_blks_written); + if (has_temp) + appendStringInfoChar(es->str, ','); + } + if (has_temp) + { + appendStringInfoString(es->str, " temp"); + if (usage->temp_blks_read > 0) + appendStringInfo(es->str, " read=%lld", + (long long) usage->temp_blks_read); + if (usage->temp_blks_written > 0) + appendStringInfo(es->str, " written=%lld", + (long long) usage->temp_blks_written); + } + appendStringInfoChar(es->str, '\n'); + } + + /* As above, show only positive counter values. */ + if (has_shared_timing || has_local_timing || has_temp_timing) + { + ExplainIndentText(es); + appendStringInfoString(es->str, "I/O Timings:"); + + if (has_shared_timing) + { + appendStringInfoString(es->str, " shared"); + if (!INSTR_TIME_IS_ZERO(usage->shared_blk_read_time)) + appendStringInfo(es->str, " read=%0.3f", + INSTR_TIME_GET_MILLISEC(usage->shared_blk_read_time)); + if (!INSTR_TIME_IS_ZERO(usage->shared_blk_write_time)) + appendStringInfo(es->str, " write=%0.3f", + INSTR_TIME_GET_MILLISEC(usage->shared_blk_write_time)); + if (has_local_timing || has_temp_timing) + appendStringInfoChar(es->str, ','); + } + if (has_local_timing) + { + appendStringInfoString(es->str, " local"); + if (!INSTR_TIME_IS_ZERO(usage->local_blk_read_time)) + appendStringInfo(es->str, " read=%0.3f", + INSTR_TIME_GET_MILLISEC(usage->local_blk_read_time)); + if (!INSTR_TIME_IS_ZERO(usage->local_blk_write_time)) + appendStringInfo(es->str, " write=%0.3f", + INSTR_TIME_GET_MILLISEC(usage->local_blk_write_time)); + if (has_temp_timing) + appendStringInfoChar(es->str, ','); + } + if (has_temp_timing) + { + appendStringInfoString(es->str, " temp"); + if (!INSTR_TIME_IS_ZERO(usage->temp_blk_read_time)) + appendStringInfo(es->str, " read=%0.3f", + INSTR_TIME_GET_MILLISEC(usage->temp_blk_read_time)); + if (!INSTR_TIME_IS_ZERO(usage->temp_blk_write_time)) + appendStringInfo(es->str, " write=%0.3f", + INSTR_TIME_GET_MILLISEC(usage->temp_blk_write_time)); + } + appendStringInfoChar(es->str, '\n'); + } + } + else + { + ExplainPropertyInteger("Shared Hit Blocks", NULL, + usage->shared_blks_hit, es); + ExplainPropertyInteger("Shared Read Blocks", NULL, + usage->shared_blks_read, es); + ExplainPropertyInteger("Shared Dirtied Blocks", NULL, + usage->shared_blks_dirtied, es); + ExplainPropertyInteger("Shared 
Written Blocks", NULL, + usage->shared_blks_written, es); + ExplainPropertyInteger("Local Hit Blocks", NULL, + usage->local_blks_hit, es); + ExplainPropertyInteger("Local Read Blocks", NULL, + usage->local_blks_read, es); + ExplainPropertyInteger("Local Dirtied Blocks", NULL, + usage->local_blks_dirtied, es); + ExplainPropertyInteger("Local Written Blocks", NULL, + usage->local_blks_written, es); + ExplainPropertyInteger("Temp Read Blocks", NULL, + usage->temp_blks_read, es); + ExplainPropertyInteger("Temp Written Blocks", NULL, + usage->temp_blks_written, es); + if (track_io_timing) + { + ExplainPropertyFloat("Shared I/O Read Time", "ms", + INSTR_TIME_GET_MILLISEC(usage->shared_blk_read_time), + 3, es); + ExplainPropertyFloat("Shared I/O Write Time", "ms", + INSTR_TIME_GET_MILLISEC(usage->shared_blk_write_time), + 3, es); + ExplainPropertyFloat("Local I/O Read Time", "ms", + INSTR_TIME_GET_MILLISEC(usage->local_blk_read_time), + 3, es); + ExplainPropertyFloat("Local I/O Write Time", "ms", + INSTR_TIME_GET_MILLISEC(usage->local_blk_write_time), + 3, es); + ExplainPropertyFloat("Temp I/O Read Time", "ms", + INSTR_TIME_GET_MILLISEC(usage->temp_blk_read_time), + 3, es); + ExplainPropertyFloat("Temp I/O Write Time", "ms", + INSTR_TIME_GET_MILLISEC(usage->temp_blk_write_time), + 3, es); + } + } +} + + +/* + * Indent a text-format line. + * + * We indent by two spaces per indentation level. However, when emitting + * data for a parallel worker there might already be data on the current line + * (cf. ExplainOpenWorker); in that case, don't indent any more. + * + * Copied from explain.c. + */ +static void +ExplainIndentText(ExplainState *es) +{ + Assert(es->format == EXPLAIN_FORMAT_TEXT); + if (es->str->len == 0 || es->str->data[es->str->len - 1] == '\n') + appendStringInfoSpaces(es->str, es->indent * 2); +} + + +/* + * Show memory usage details. + * + * Copied from explain.c. + */ +static void +show_memory_counters(ExplainState *es, const MemoryContextCounters *mem_counters) +{ + int64 memUsedkB = BYTES_TO_KILOBYTES(mem_counters->totalspace - + mem_counters->freespace); + int64 memAllocatedkB = BYTES_TO_KILOBYTES(mem_counters->totalspace); + + if (es->format == EXPLAIN_FORMAT_TEXT) + { + ExplainIndentText(es); + appendStringInfo(es->str, + "Memory: used=" INT64_FORMAT "kB allocated=" INT64_FORMAT "kB", + memUsedkB, memAllocatedkB); + appendStringInfoChar(es->str, '\n'); + } + else + { + ExplainPropertyInteger("Memory Used", "kB", memUsedkB, es); + ExplainPropertyInteger("Memory Allocated", "kB", memAllocatedkB, es); + } +} + + +/* + * ExplainPrintSerialize - + * Append information about query output volume to es->str. + * + * Copied from explain.c. 
+ */ +static void +ExplainPrintSerialize(ExplainState *es, SerializeMetrics *metrics) +{ + const char *format; + + /* We shouldn't get called for EXPLAIN_SERIALIZE_NONE */ + if (es->serialize == EXPLAIN_SERIALIZE_TEXT) + format = "text"; + else + { + Assert(es->serialize == EXPLAIN_SERIALIZE_BINARY); + format = "binary"; + } + + ExplainOpenGroup("Serialization", "Serialization", true, es); + + if (es->format == EXPLAIN_FORMAT_TEXT) + { + ExplainIndentText(es); + if (es->timing) + appendStringInfo(es->str, "Serialization: time=%.3f ms output=" UINT64_FORMAT "kB format=%s\n", + 1000.0 * INSTR_TIME_GET_DOUBLE(metrics->timeSpent), + BYTES_TO_KILOBYTES(metrics->bytesSent), + format); + else + appendStringInfo(es->str, "Serialization: output=" UINT64_FORMAT "kB format=%s\n", + BYTES_TO_KILOBYTES(metrics->bytesSent), + format); + + if (es->buffers && peek_buffer_usage(es, &metrics->bufferUsage)) + { + es->indent++; + show_buffer_usage(es, &metrics->bufferUsage); + es->indent--; + } + } + else + { + if (es->timing) + ExplainPropertyFloat("Time", "ms", + 1000.0 * INSTR_TIME_GET_DOUBLE(metrics->timeSpent), + 3, es); + ExplainPropertyUInteger("Output Volume", "kB", + BYTES_TO_KILOBYTES(metrics->bytesSent), es); + ExplainPropertyText("Format", format, es); + if (es->buffers) + show_buffer_usage(es, &metrics->bufferUsage); + } + + ExplainCloseGroup("Serialization", "Serialization", true, es); +} + + +/* + * GetSerializationMetrics - collect metrics + * + * We have to be careful here since the receiver could be an IntoRel + * receiver if the subject statement is CREATE TABLE AS. In that + * case, return all-zeroes stats. + * + * Copied from explain.c. + */ +static SerializeMetrics +GetSerializationMetrics(DestReceiver *dest) +{ + SerializeMetrics empty; + + if (dest->mydest == DestExplainSerialize) + return ((SerializeDestReceiver *) dest)->metrics; + + memset(&empty, 0, sizeof(SerializeMetrics)); + INSTR_TIME_SET_ZERO(empty.timeSpent); + + return empty; +} +#endif diff --git a/src/test/regress/expected/multi_test_helpers.out b/src/test/regress/expected/multi_test_helpers.out index 4b74070d1..b8aee4dc0 100644 --- a/src/test/regress/expected/multi_test_helpers.out +++ b/src/test/regress/expected/multi_test_helpers.out @@ -718,3 +718,29 @@ BEGIN RETURN NEXT; END LOOP; END; $$ language plpgsql; +-- To produce stable regression test output, it's usually necessary to +-- ignore details such as exact costs or row counts. These filter +-- functions replace changeable output details with fixed strings. 
+-- Copied from PG explain.sql +create function explain_filter(text) returns setof text +language plpgsql as +$$ +declare + ln text; +begin + for ln in execute $1 + loop + -- Replace any numeric word with just 'N' + ln := regexp_replace(ln, '-?\m\d+\M', 'N', 'g'); + -- In sort output, the above won't match units-suffixed numbers + ln := regexp_replace(ln, '\m\d+kB', 'NkB', 'g'); + -- Ignore text-mode buffers output because it varies depending + -- on the system state + CONTINUE WHEN (ln ~ ' +Buffers: .*'); + -- Ignore text-mode "Planning:" line because whether it's output + -- varies depending on the system state + CONTINUE WHEN (ln = 'Planning:'); + return next ln; + end loop; +end; +$$; diff --git a/src/test/regress/expected/pg17.out b/src/test/regress/expected/pg17.out index 1010c0d4b..dfd88e30e 100644 --- a/src/test/regress/expected/pg17.out +++ b/src/test/regress/expected/pg17.out @@ -1779,6 +1779,422 @@ DROP EVENT TRIGGER reindex_event_trigger; DROP EVENT TRIGGER reindex_event_trigger_end; DROP TABLE reindex_test CASCADE; -- End of test for REINDEX support in event triggers for Citus-related objects +-- Propagate EXPLAIN MEMORY +-- Relevant PG commit: https://github.com/postgres/postgres/commit/5de890e36 +-- Propagate EXPLAIN SERIALIZE +-- Relevant PG commit: https://github.com/postgres/postgres/commit/06286709e +SET citus.next_shard_id TO 12242024; +CREATE TABLE int8_tbl(q1 int8, q2 int8); +SELECT create_distributed_table('int8_tbl', 'q1'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO int8_tbl VALUES + (' 123 ',' 456'), + ('123 ','4567890123456789'), + ('4567890123456789','123'), + (+4567890123456789,'4567890123456789'), + ('+4567890123456789','-4567890123456789'); +-- memory tests, same as postgres tests, we just distributed the table +-- we can see the memory used separately per each task in worker nodes +SET citus.log_remote_commands TO true; +-- for explain analyze, we run worker_save_query_explain_analyze query +-- for regular explain, we run EXPLAIN query +-- therefore let's grep the commands based on the shard id +SET citus.grep_remote_commands TO '%12242024%'; +select public.explain_filter('explain (memory) select * from int8_tbl i8'); +NOTICE: issuing EXPLAIN (ANALYZE FALSE, VERBOSE FALSE, COSTS TRUE, BUFFERS FALSE, WAL FALSE, TIMING FALSE, SUMMARY FALSE, MEMORY TRUE, SERIALIZE none, FORMAT TEXT) SELECT q1, q2 FROM pg17.int8_tbl_12242024 i8 WHERE true +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +CONTEXT: PL/pgSQL function public.explain_filter(text) line XX at FOR over EXECUTE statement + explain_filter +--------------------------------------------------------------------- + Custom Scan (Citus Adaptive) (cost=N.N..N.N rows=N width=N) + Task Count: N + Tasks Shown: One of N + -> Task + Node: host=localhost port=N dbname=regression + -> Seq Scan on int8_tbl_12242024 i8 (cost=N.N..N.N rows=N width=N) + Planning: + Memory: used=NkB allocated=NkB + Memory: used=NkB allocated=NkB +(9 rows) + +select public.explain_filter('explain (memory, analyze) select * from int8_tbl i8'); +NOTICE: issuing SELECT * FROM worker_save_query_explain_analyze('SELECT q1, q2 FROM pg17.int8_tbl_12242024 i8 WHERE true', '{"verbose": false, "costs": true, "buffers": false, "wal": false, "memory": true, "serialize": "none", "timing": true, "summary": true, "format": "TEXT"}') AS (field_0 bigint, field_1 bigint) +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +CONTEXT: PL/pgSQL function 
public.explain_filter(text) line XX at FOR over EXECUTE statement + explain_filter +--------------------------------------------------------------------- + Custom Scan (Citus Adaptive) (cost=N.N..N.N rows=N width=N) (actual time=N.N..N.N rows=N loops=N) + Task Count: N + Tuple data received from nodes: N bytes + Tasks Shown: One of N + -> Task + Tuple data received from node: N bytes + Node: host=localhost port=N dbname=regression + -> Seq Scan on int8_tbl_12242024 i8 (cost=N.N..N.N rows=N width=N) (actual time=N.N..N.N rows=N loops=N) + Planning: + Memory: used=NkB allocated=NkB + Planning Time: N.N ms + Execution Time: N.N ms + Memory: used=NkB allocated=NkB + Planning Time: N.N ms + Execution Time: N.N ms +(15 rows) + +select public.explain_filter('explain (memory, summary, format yaml) select * from int8_tbl i8'); +NOTICE: issuing EXPLAIN (ANALYZE FALSE, VERBOSE FALSE, COSTS TRUE, BUFFERS FALSE, WAL FALSE, TIMING FALSE, SUMMARY TRUE, MEMORY TRUE, SERIALIZE none, FORMAT YAML) SELECT q1, q2 FROM pg17.int8_tbl_12242024 i8 WHERE true +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +CONTEXT: PL/pgSQL function public.explain_filter(text) line XX at FOR over EXECUTE statement + explain_filter +--------------------------------------------------------------------- + - Plan: + + Node Type: "Custom Scan" + + Custom Plan Provider: "Citus Adaptive" + + Parallel Aware: false + + Async Capable: false + + Startup Cost: N.N + + Total Cost: N.N + + Plan Rows: N + + Plan Width: N + + Distributed Query: + + Job: + + Task Count: N + + Tasks Shown: "One of N" + + Tasks: + + - Node: "host=localhost port=N dbname=regression"+ + Remote Plan: + + - Plan: + + Node Type: "Seq Scan" + + Parallel Aware: false + + Async Capable: false + + Relation Name: "int8_tbl_12242024" + + Alias: "i8" + + Startup Cost: N.N + + Total Cost: N.N + + Plan Rows: N + + Plan Width: N + + Planning: + + Memory Used: N + + Memory Allocated: N + + Planning Time: N.N + + + + Planning: + + Memory Used: N + + Memory Allocated: N + + Planning Time: N.N +(1 row) + +select public.explain_filter('explain (memory, analyze, format json) select * from int8_tbl i8'); +NOTICE: issuing SELECT * FROM worker_save_query_explain_analyze('SELECT q1, q2 FROM pg17.int8_tbl_12242024 i8 WHERE true', '{"verbose": false, "costs": true, "buffers": false, "wal": false, "memory": true, "serialize": "none", "timing": true, "summary": true, "format": "JSON"}') AS (field_0 bigint, field_1 bigint) +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +CONTEXT: PL/pgSQL function public.explain_filter(text) line XX at FOR over EXECUTE statement + explain_filter +--------------------------------------------------------------------- + [ + + { + + "Plan": { + + "Node Type": "Custom Scan", + + "Custom Plan Provider": "Citus Adaptive", + + "Parallel Aware": false, + + "Async Capable": false, + + "Startup Cost": N.N, + + "Total Cost": N.N, + + "Plan Rows": N, + + "Plan Width": N, + + "Actual Startup Time": N.N, + + "Actual Total Time": N.N, + + "Actual Rows": N, + + "Actual Loops": N, + + "Distributed Query": { + + "Job": { + + "Task Count": N, + + "Tuple data received from nodes": "N bytes", + + "Tasks Shown": "One of N", + + "Tasks": [ + + { + + "Tuple data received from node": "N bytes", + + "Node": "host=localhost port=N dbname=regression",+ + "Remote Plan": [ + + [ + + { + + "Plan": { + + "Node Type": "Seq Scan", + + "Parallel Aware": false, + + "Async Capable": false, + + "Relation Name": "int8_tbl_12242024", + + "Alias": "i8", + + "Startup 
Cost": N.N, + + "Total Cost": N.N, + + "Plan Rows": N, + + "Plan Width": N, + + "Actual Startup Time": N.N, + + "Actual Total Time": N.N, + + "Actual Rows": N, + + "Actual Loops": N + + }, + + "Planning": { + + "Memory Used": N, + + "Memory Allocated": N + + }, + + "Planning Time": N.N, + + "Triggers": [ + + ], + + "Execution Time": N.N + + } + + ] + + + + ] + + } + + ] + + } + + } + + }, + + "Planning": { + + "Memory Used": N, + + "Memory Allocated": N + + }, + + "Planning Time": N.N, + + "Triggers": [ + + ], + + "Execution Time": N.N + + } + + ] +(1 row) + +prepare int8_query as select * from int8_tbl i8; +select public.explain_filter('explain (memory) execute int8_query'); +NOTICE: issuing EXPLAIN (ANALYZE FALSE, VERBOSE FALSE, COSTS TRUE, BUFFERS FALSE, WAL FALSE, TIMING FALSE, SUMMARY FALSE, MEMORY TRUE, SERIALIZE none, FORMAT TEXT) SELECT q1, q2 FROM pg17.int8_tbl_12242024 i8 WHERE true +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +CONTEXT: PL/pgSQL function public.explain_filter(text) line XX at FOR over EXECUTE statement + explain_filter +--------------------------------------------------------------------- + Custom Scan (Citus Adaptive) (cost=N.N..N.N rows=N width=N) + Task Count: N + Tasks Shown: One of N + -> Task + Node: host=localhost port=N dbname=regression + -> Seq Scan on int8_tbl_12242024 i8 (cost=N.N..N.N rows=N width=N) + Planning: + Memory: used=NkB allocated=NkB + Memory: used=NkB allocated=NkB +(9 rows) + +-- serialize tests, same as postgres tests, we just distributed the table +select public.explain_filter('explain (analyze, serialize, buffers, format yaml) select * from int8_tbl i8'); +NOTICE: issuing SELECT * FROM worker_save_query_explain_analyze('SELECT q1, q2 FROM pg17.int8_tbl_12242024 i8 WHERE true', '{"verbose": false, "costs": true, "buffers": true, "wal": false, "memory": false, "serialize": "text", "timing": true, "summary": true, "format": "YAML"}') AS (field_0 bigint, field_1 bigint) +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +CONTEXT: PL/pgSQL function public.explain_filter(text) line XX at FOR over EXECUTE statement + explain_filter +--------------------------------------------------------------------- + - Plan: + + Node Type: "Custom Scan" + + Custom Plan Provider: "Citus Adaptive" + + Parallel Aware: false + + Async Capable: false + + Startup Cost: N.N + + Total Cost: N.N + + Plan Rows: N + + Plan Width: N + + Actual Startup Time: N.N + + Actual Total Time: N.N + + Actual Rows: N + + Actual Loops: N + + Distributed Query: + + Job: + + Task Count: N + + Tuple data received from nodes: "N bytes" + + Tasks Shown: "One of N" + + Tasks: + + - Tuple data received from node: "N bytes" + + Node: "host=localhost port=N dbname=regression"+ + Remote Plan: + + - Plan: + + Node Type: "Seq Scan" + + Parallel Aware: false + + Async Capable: false + + Relation Name: "int8_tbl_12242024" + + Alias: "i8" + + Startup Cost: N.N + + Total Cost: N.N + + Plan Rows: N + + Plan Width: N + + Actual Startup Time: N.N + + Actual Total Time: N.N + + Actual Rows: N + + Actual Loops: N + + Shared Hit Blocks: N + + Shared Read Blocks: N + + Shared Dirtied Blocks: N + + Shared Written Blocks: N + + Local Hit Blocks: N + + Local Read Blocks: N + + Local Dirtied Blocks: N + + Local Written Blocks: N + + Temp Read Blocks: N + + Temp Written Blocks: N + + Planning: + + Shared Hit Blocks: N + + Shared Read Blocks: N + + Shared Dirtied Blocks: N + + Shared Written Blocks: N + + Local Hit Blocks: N + + Local Read Blocks: N + + Local Dirtied 
Blocks: N + + Local Written Blocks: N + + Temp Read Blocks: N + + Temp Written Blocks: N + + Planning Time: N.N + + Triggers: + + Serialization: + + Time: N.N + + Output Volume: N + + Format: "text" + + Shared Hit Blocks: N + + Shared Read Blocks: N + + Shared Dirtied Blocks: N + + Shared Written Blocks: N + + Local Hit Blocks: N + + Local Read Blocks: N + + Local Dirtied Blocks: N + + Local Written Blocks: N + + Temp Read Blocks: N + + Temp Written Blocks: N + + Execution Time: N.N + + + + Shared Hit Blocks: N + + Shared Read Blocks: N + + Shared Dirtied Blocks: N + + Shared Written Blocks: N + + Local Hit Blocks: N + + Local Read Blocks: N + + Local Dirtied Blocks: N + + Local Written Blocks: N + + Temp Read Blocks: N + + Temp Written Blocks: N + + Planning: + + Shared Hit Blocks: N + + Shared Read Blocks: N + + Shared Dirtied Blocks: N + + Shared Written Blocks: N + + Local Hit Blocks: N + + Local Read Blocks: N + + Local Dirtied Blocks: N + + Local Written Blocks: N + + Temp Read Blocks: N + + Temp Written Blocks: N + + Planning Time: N.N + + Triggers: + + Serialization: + + Time: N.N + + Output Volume: N + + Format: "text" + + Shared Hit Blocks: N + + Shared Read Blocks: N + + Shared Dirtied Blocks: N + + Shared Written Blocks: N + + Local Hit Blocks: N + + Local Read Blocks: N + + Local Dirtied Blocks: N + + Local Written Blocks: N + + Temp Read Blocks: N + + Temp Written Blocks: N + + Execution Time: N.N +(1 row) + +select public.explain_filter('explain (analyze,serialize) select * from int8_tbl i8'); +NOTICE: issuing SELECT * FROM worker_save_query_explain_analyze('SELECT q1, q2 FROM pg17.int8_tbl_12242024 i8 WHERE true', '{"verbose": false, "costs": true, "buffers": false, "wal": false, "memory": false, "serialize": "text", "timing": true, "summary": true, "format": "TEXT"}') AS (field_0 bigint, field_1 bigint) +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +CONTEXT: PL/pgSQL function public.explain_filter(text) line XX at FOR over EXECUTE statement + explain_filter +--------------------------------------------------------------------- + Custom Scan (Citus Adaptive) (cost=N.N..N.N rows=N width=N) (actual time=N.N..N.N rows=N loops=N) + Task Count: N + Tuple data received from nodes: N bytes + Tasks Shown: One of N + -> Task + Tuple data received from node: N bytes + Node: host=localhost port=N dbname=regression + -> Seq Scan on int8_tbl_12242024 i8 (cost=N.N..N.N rows=N width=N) (actual time=N.N..N.N rows=N loops=N) + Planning Time: N.N ms + Serialization: time=N.N ms output=NkB format=text + Execution Time: N.N ms + Planning Time: N.N ms + Serialization: time=N.N ms output=NkB format=text + Execution Time: N.N ms +(14 rows) + +select public.explain_filter('explain (analyze,serialize text,buffers,timing off) select * from int8_tbl i8'); +NOTICE: issuing SELECT * FROM worker_save_query_explain_analyze('SELECT q1, q2 FROM pg17.int8_tbl_12242024 i8 WHERE true', '{"verbose": false, "costs": true, "buffers": true, "wal": false, "memory": false, "serialize": "text", "timing": false, "summary": true, "format": "TEXT"}') AS (field_0 bigint, field_1 bigint) +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +CONTEXT: PL/pgSQL function public.explain_filter(text) line XX at FOR over EXECUTE statement + explain_filter +--------------------------------------------------------------------- + Custom Scan (Citus Adaptive) (cost=N.N..N.N rows=N width=N) (actual rows=N loops=N) + Task Count: N + Tuple data received from nodes: N bytes + Tasks Shown: One of N + -> 
Task + Tuple data received from node: N bytes + Node: host=localhost port=N dbname=regression + -> Seq Scan on int8_tbl_12242024 i8 (cost=N.N..N.N rows=N width=N) (actual rows=N loops=N) + Planning Time: N.N ms + Serialization: output=NkB format=text + Execution Time: N.N ms + Planning Time: N.N ms + Serialization: output=NkB format=text + Execution Time: N.N ms +(14 rows) + +select public.explain_filter('explain (analyze,serialize binary,buffers,timing) select * from int8_tbl i8'); +NOTICE: issuing SELECT * FROM worker_save_query_explain_analyze('SELECT q1, q2 FROM pg17.int8_tbl_12242024 i8 WHERE true', '{"verbose": false, "costs": true, "buffers": true, "wal": false, "memory": false, "serialize": "binary", "timing": true, "summary": true, "format": "TEXT"}') AS (field_0 bigint, field_1 bigint) +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +CONTEXT: PL/pgSQL function public.explain_filter(text) line XX at FOR over EXECUTE statement + explain_filter +--------------------------------------------------------------------- + Custom Scan (Citus Adaptive) (cost=N.N..N.N rows=N width=N) (actual time=N.N..N.N rows=N loops=N) + Task Count: N + Tuple data received from nodes: N bytes + Tasks Shown: One of N + -> Task + Tuple data received from node: N bytes + Node: host=localhost port=N dbname=regression + -> Seq Scan on int8_tbl_12242024 i8 (cost=N.N..N.N rows=N width=N) (actual time=N.N..N.N rows=N loops=N) + Planning Time: N.N ms + Serialization: time=N.N ms output=NkB format=binary + Execution Time: N.N ms + Planning Time: N.N ms + Serialization: time=N.N ms output=NkB format=binary + Execution Time: N.N ms +(14 rows) + +-- this tests an edge case where we have no data to return +select public.explain_filter('explain (analyze,serialize) create temp table explain_temp as select * from int8_tbl i8'); +NOTICE: issuing SELECT * FROM worker_save_query_explain_analyze('SELECT q1, q2 FROM pg17.int8_tbl_12242024 i8 WHERE true', '{"verbose": false, "costs": true, "buffers": false, "wal": false, "memory": false, "serialize": "text", "timing": true, "summary": true, "format": "TEXT"}') AS (field_0 bigint, field_1 bigint) +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +CONTEXT: PL/pgSQL function public.explain_filter(text) line XX at FOR over EXECUTE statement + explain_filter +--------------------------------------------------------------------- + Custom Scan (Citus Adaptive) (cost=N.N..N.N rows=N width=N) (actual time=N.N..N.N rows=N loops=N) + Task Count: N + Tuple data received from nodes: N bytes + Tasks Shown: One of N + -> Task + Tuple data received from node: N bytes + Node: host=localhost port=N dbname=regression + -> Seq Scan on int8_tbl_12242024 i8 (cost=N.N..N.N rows=N width=N) (actual time=N.N..N.N rows=N loops=N) + Planning Time: N.N ms + Serialization: time=N.N ms output=NkB format=text + Execution Time: N.N ms + Planning Time: N.N ms + Serialization: time=N.N ms output=NkB format=text + Execution Time: N.N ms +(14 rows) + +RESET citus.log_remote_commands; +-- End of EXPLAIN MEMORY SERIALIZE tests \set VERBOSITY terse SET client_min_messages TO WARNING; DROP SCHEMA pg17 CASCADE; diff --git a/src/test/regress/sql/multi_test_helpers.sql b/src/test/regress/sql/multi_test_helpers.sql index e349081c2..9ac0db64d 100644 --- a/src/test/regress/sql/multi_test_helpers.sql +++ b/src/test/regress/sql/multi_test_helpers.sql @@ -747,3 +747,31 @@ BEGIN RETURN NEXT; END LOOP; END; $$ language plpgsql; + +-- To produce stable regression test output, it's usually 
necessary to +-- ignore details such as exact costs or row counts. These filter +-- functions replace changeable output details with fixed strings. +-- Copied from PG explain.sql + +create function explain_filter(text) returns setof text +language plpgsql as +$$ +declare + ln text; +begin + for ln in execute $1 + loop + -- Replace any numeric word with just 'N' + ln := regexp_replace(ln, '-?\m\d+\M', 'N', 'g'); + -- In sort output, the above won't match units-suffixed numbers + ln := regexp_replace(ln, '\m\d+kB', 'NkB', 'g'); + -- Ignore text-mode buffers output because it varies depending + -- on the system state + CONTINUE WHEN (ln ~ ' +Buffers: .*'); + -- Ignore text-mode "Planning:" line because whether it's output + -- varies depending on the system state + CONTINUE WHEN (ln = 'Planning:'); + return next ln; + end loop; +end; +$$; diff --git a/src/test/regress/sql/pg17.sql b/src/test/regress/sql/pg17.sql index 88d0eab0c..70d5f68a8 100644 --- a/src/test/regress/sql/pg17.sql +++ b/src/test/regress/sql/pg17.sql @@ -1038,6 +1038,48 @@ DROP EVENT TRIGGER reindex_event_trigger_end; DROP TABLE reindex_test CASCADE; -- End of test for REINDEX support in event triggers for Citus-related objects +-- Propagate EXPLAIN MEMORY +-- Relevant PG commit: https://github.com/postgres/postgres/commit/5de890e36 +-- Propagate EXPLAIN SERIALIZE +-- Relevant PG commit: https://github.com/postgres/postgres/commit/06286709e + +SET citus.next_shard_id TO 12242024; +CREATE TABLE int8_tbl(q1 int8, q2 int8); +SELECT create_distributed_table('int8_tbl', 'q1'); +INSERT INTO int8_tbl VALUES + (' 123 ',' 456'), + ('123 ','4567890123456789'), + ('4567890123456789','123'), + (+4567890123456789,'4567890123456789'), + ('+4567890123456789','-4567890123456789'); + +-- memory tests, same as postgres tests, we just distributed the table +-- we can see the memory used separately per each task in worker nodes + +SET citus.log_remote_commands TO true; + +-- for explain analyze, we run worker_save_query_explain_analyze query +-- for regular explain, we run EXPLAIN query +-- therefore let's grep the commands based on the shard id +SET citus.grep_remote_commands TO '%12242024%'; + +select public.explain_filter('explain (memory) select * from int8_tbl i8'); +select public.explain_filter('explain (memory, analyze) select * from int8_tbl i8'); +select public.explain_filter('explain (memory, summary, format yaml) select * from int8_tbl i8'); +select public.explain_filter('explain (memory, analyze, format json) select * from int8_tbl i8'); +prepare int8_query as select * from int8_tbl i8; +select public.explain_filter('explain (memory) execute int8_query'); + +-- serialize tests, same as postgres tests, we just distributed the table +select public.explain_filter('explain (analyze, serialize, buffers, format yaml) select * from int8_tbl i8'); +select public.explain_filter('explain (analyze,serialize) select * from int8_tbl i8'); +select public.explain_filter('explain (analyze,serialize text,buffers,timing off) select * from int8_tbl i8'); +select public.explain_filter('explain (analyze,serialize binary,buffers,timing) select * from int8_tbl i8'); +-- this tests an edge case where we have no data to return +select public.explain_filter('explain (analyze,serialize) create temp table explain_temp as select * from int8_tbl i8'); + +RESET citus.log_remote_commands; +-- End of EXPLAIN MEMORY SERIALIZE tests \set VERBOSITY terse SET client_min_messages TO WARNING;