diff --git a/src/backend/distributed/executor/intermediate_results.c b/src/backend/distributed/executor/intermediate_results.c
index ca213e01e..d147370e9 100644
--- a/src/backend/distributed/executor/intermediate_results.c
+++ b/src/backend/distributed/executor/intermediate_results.c
@@ -76,8 +76,9 @@ typedef struct RemoteFileDestReceiver
 	CopyOutState copyOutState;
 	FmgrInfo *columnOutputFunctions;
 
-	/* number of tuples sent */
+	/* statistics, both updated in RemoteFileDestReceiverReceive */
 	uint64 tuplesSent;
+	uint64 bytesSent;
 } RemoteFileDestReceiver;
 
 
@@ -224,6 +225,17 @@ CreateRemoteFileDestReceiver(const char *resultId, EState *executorState,
 }
 
 
+/*
+ * RemoteFileDestReceiverBytesSent returns the number of bytes sent per remote worker.
+ */
+uint64
+RemoteFileDestReceiverBytesSent(DestReceiver *destReceiver)
+{
+	RemoteFileDestReceiver *remoteDestReceiver = (RemoteFileDestReceiver *) destReceiver;
+	return remoteDestReceiver->bytesSent;
+}
+
+
 /*
  * RemoteFileDestReceiverStartup implements the rStartup interface of
  * RemoteFileDestReceiver. It opens connections to the nodes in initialNodeList,
@@ -415,6 +427,7 @@ RemoteFileDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest)
 	MemoryContextSwitchTo(oldContext);
 
 	resultDest->tuplesSent++;
+	resultDest->bytesSent += copyData->len;
 
 	ResetPerTupleExprContext(executorState);
 
diff --git a/src/backend/distributed/executor/subplan_execution.c b/src/backend/distributed/executor/subplan_execution.c
index b82667368..a579ab8db 100644
--- a/src/backend/distributed/executor/subplan_execution.c
+++ b/src/backend/distributed/executor/subplan_execution.c
@@ -20,6 +20,7 @@
 #include "distributed/transaction_management.h"
 #include "distributed/worker_manager.h"
 #include "executor/executor.h"
+#include "utils/datetime.h"
 
 
 int MaxIntermediateResult = 1048576; /* maximum size in KB the intermediate result can grow to */
@@ -73,8 +74,24 @@ ExecuteSubPlans(DistributedPlan *distributedPlan)
 			CreateRemoteFileDestReceiver(resultId, estate, remoteWorkerNodeList,
 										 entry->writeLocalFile);
 
+		TimestampTz startTimestamp = GetCurrentTimestamp();
+
 		ExecutePlanIntoDestReceiver(plannedStmt, params, copyDest);
 
+		/*
+		 * EXPLAIN ANALYZE instrumentations. Calculating these is very light-weight,
+		 * so always populate them regardless of EXPLAIN ANALYZE or not.
+		 */
+		long durationSeconds = 0;
+		int durationMicrosecs = 0;
+		TimestampDifference(startTimestamp, GetCurrentTimestamp(), &durationSeconds,
+							&durationMicrosecs);
+		/* whole seconds -> ms, plus the leftover microseconds -> ms */
+		subPlan->durationMillisecs = durationSeconds * 1000.0 + durationMicrosecs * 1e-3;
+		subPlan->bytesSentPerWorker = RemoteFileDestReceiverBytesSent(copyDest);
+		subPlan->remoteWorkerCount = list_length(remoteWorkerNodeList);
+		subPlan->writeLocalFile = entry->writeLocalFile;
+
 		SubPlanLevel--;
 		FreeExecutorState(estate);
 	}
diff --git a/src/backend/distributed/planner/multi_explain.c b/src/backend/distributed/planner/multi_explain.c
index 08df2827f..155f8fd77 100644
--- a/src/backend/distributed/planner/multi_explain.c
+++ b/src/backend/distributed/planner/multi_explain.c
@@ -277,6 +277,37 @@ ExplainSubPlans(DistributedPlan *distributedPlan, ExplainState *es)
 			es->indent += 3;
 		}
 
+		/* ANALYZE only: report the execution statistics stored on the subplan */
+		if (es->analyze)
+		{
+			if (es->timing)
+			{
+				ExplainPropertyFloat("Subplan Duration", "ms", subPlan->durationMillisecs,
+									 2, es);
+			}
+
+			ExplainPropertyInteger("Intermediate Data Size", "bytes",
+								   subPlan->bytesSentPerWorker, es);
+
+			StringInfo destination = makeStringInfo();
+			if (subPlan->remoteWorkerCount && subPlan->writeLocalFile)
+			{
+				appendStringInfo(destination, "Send to %d nodes, write locally",
+								 subPlan->remoteWorkerCount);
+			}
+			else if (subPlan->writeLocalFile)
+			{
+				appendStringInfoString(destination, "Write locally");
+			}
+			else
+			{
+				appendStringInfo(destination, "Send to %d nodes",
+								 subPlan->remoteWorkerCount);
+			}
+
+			ExplainPropertyText("Result destination", destination->data, es);
+		}
+
 		INSTR_TIME_SET_ZERO(planduration);
 
 		ExplainOnePlan(plan, into, es, queryString, params, NULL, &planduration);
diff --git a/src/include/distributed/intermediate_results.h b/src/include/distributed/intermediate_results.h
index 73b25460e..d7ebf15bf 100644
--- a/src/include/distributed/intermediate_results.h
+++ b/src/include/distributed/intermediate_results.h
@@ -53,6 +53,7 @@ extern DestReceiver *
CreateRemoteFileDestReceiver(const char *resultId, EState *executorState, List *initialNodeList, bool writeLocalFile); +extern uint64 RemoteFileDestReceiverBytesSent(DestReceiver *destReceiver); extern void SendQueryResultViaCopy(const char *resultId); extern void ReceiveQueryResultViaCopy(const char *resultId); extern void RemoveIntermediateResultsDirectory(void); diff --git a/src/include/distributed/multi_physical_planner.h b/src/include/distributed/multi_physical_planner.h index 77e1e5e94..63f5e42e3 100644 --- a/src/include/distributed/multi_physical_planner.h +++ b/src/include/distributed/multi_physical_planner.h @@ -466,6 +466,12 @@ typedef struct DistributedSubPlan uint32 subPlanId; PlannedStmt *plan; + + /* EXPLAIN ANALYZE instrumentations */ + uint64 bytesSentPerWorker; + uint32 remoteWorkerCount; + double durationMillisecs; + bool writeLocalFile; } DistributedSubPlan; diff --git a/src/test/regress/expected/local_shard_execution.out b/src/test/regress/expected/local_shard_execution.out index bc0014aeb..177a7604e 100644 --- a/src/test/regress/expected/local_shard_execution.out +++ b/src/test/regress/expected/local_shard_execution.out @@ -304,6 +304,29 @@ EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF) SELECT * FROM distribute Filter: (age = 20) (8 rows) +EXPLAIN (ANALYZE ON, COSTS OFF, SUMMARY OFF, TIMING OFF) +WITH r AS ( SELECT random() z,* FROM distributed_table) +SELECT * FROM r WHERE z < 2; + QUERY PLAN +--------------------------------------------------------------------- + Custom Scan (Citus Adaptive) (actual rows=1 loops=1) + -> Distributed Subplan XXX_1 + Intermediate Data Size: 40 bytes + Result destination: Write locally + -> Custom Scan (Citus Adaptive) (actual rows=1 loops=1) + Task Count: 4 + Tasks Shown: One of 4 + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> Seq Scan on distributed_table_1470001 distributed_table (actual rows=1 loops=1) + Task Count: 1 + Tasks Shown: All + -> Task + Node: host=localhost port=xxxxx 
dbname=regression + -> Function Scan on read_intermediate_result intermediate_result (actual rows=1 loops=1) + Filter: (z < '2'::double precision) +(16 rows) + EXPLAIN (COSTS OFF) DELETE FROM distributed_table WHERE key = 1 AND age = 20; QUERY PLAN --------------------------------------------------------------------- diff --git a/src/test/regress/expected/multi_explain.out b/src/test/regress/expected/multi_explain.out index 4abce570c..30845aea3 100644 --- a/src/test/regress/expected/multi_explain.out +++ b/src/test/regress/expected/multi_explain.out @@ -2019,6 +2019,8 @@ WITH r AS ( SELECT count(distinct a) from r NATURAL JOIN ref_table; Custom Scan (Citus Adaptive) (actual rows=1 loops=1) -> Distributed Subplan XXX_1 + Intermediate Data Size: 220 bytes + Result destination: Send to 2 nodes -> Custom Scan (Citus Adaptive) (actual rows=10 loops=1) Task Count: 4 Tasks Shown: One of 4 @@ -2063,6 +2065,8 @@ WHERE EXISTS(SELECT random() FROM dist_table NATURAL JOIN ref_table); Aggregate (actual rows=1 loops=1) -> Custom Scan (Citus Adaptive) (actual rows=10 loops=1) -> Distributed Subplan XXX_1 + Intermediate Data Size: 140 bytes + Result destination: Send to 2 nodes -> Custom Scan (Citus Adaptive) (actual rows=10 loops=1) Task Count: 4 Tasks Shown: One of 4 @@ -2100,6 +2104,8 @@ WITH r AS ( SELECT count(distinct a2) FROM s; Custom Scan (Citus Adaptive) (actual rows=1 loops=1) -> Distributed Subplan XXX_1 + Intermediate Data Size: 100 bytes + Result destination: Write locally -> Custom Scan (Citus Adaptive) (actual rows=20 loops=1) Task Count: 4 Tasks Shown: One of 4 @@ -2109,6 +2115,8 @@ Custom Scan (Citus Adaptive) (actual rows=1 loops=1) -> Seq Scan on dist_table_570017 dist_table (actual rows=8 loops=1) Filter: ((worker_hash(a) >= '-2147483648'::integer) AND (worker_hash(a) <= '-1073741825'::integer)) -> Distributed Subplan XXX_2 + Intermediate Data Size: 220 bytes + Result destination: Write locally -> Custom Scan (Citus Adaptive) (actual rows=10 loops=1) Task 
Count: 1 Tasks Shown: All diff --git a/src/test/regress/sql/local_shard_execution.sql b/src/test/regress/sql/local_shard_execution.sql index b55d01668..bf6497617 100644 --- a/src/test/regress/sql/local_shard_execution.sql +++ b/src/test/regress/sql/local_shard_execution.sql @@ -203,6 +203,10 @@ EXPLAIN (COSTS OFF) SELECT * FROM distributed_table WHERE key = 1 AND age = 20; EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF) SELECT * FROM distributed_table WHERE key = 1 AND age = 20; +EXPLAIN (ANALYZE ON, COSTS OFF, SUMMARY OFF, TIMING OFF) +WITH r AS ( SELECT random() z,* FROM distributed_table) +SELECT * FROM r WHERE z < 2; + EXPLAIN (COSTS OFF) DELETE FROM distributed_table WHERE key = 1 AND age = 20; EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF) DELETE FROM distributed_table WHERE key = 1 AND age = 20;