diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c index 99b8edd01..ef6a789b6 100644 --- a/src/backend/distributed/executor/adaptive_executor.c +++ b/src/backend/distributed/executor/adaptive_executor.c @@ -3731,6 +3731,21 @@ ReceiveResults(WorkerSession *session, bool storeRows) { executionStats->totalIntermediateResultSize += valueLength; } + + /* + * Count the bytes received for this task so that EXPLAIN + * ANALYZE can report them. + * Only the EXPLAIN ANALYZE TupleDestination sets + * originalTask, so when it is not set nobody needs the + * count and we skip the bookkeeping. + * The same task also returns the worker plan itself as a + * result. We leave that out by only counting values that + * belong to the first query (queryIndex == 0). + */ + if (tupleDest->originalTask && queryIndex == 0) + { + tupleDest->originalTask->totalReceivedData += valueLength; + } } } diff --git a/src/backend/distributed/planner/multi_explain.c b/src/backend/distributed/planner/multi_explain.c index ac0aaa769..38b3d4726 100644 --- a/src/backend/distributed/planner/multi_explain.c +++ b/src/backend/distributed/planner/multi_explain.c @@ -116,13 +116,14 @@ typedef struct ExplainAnalyzeDestination /* Explain functions for distributed queries */ static void ExplainSubPlans(DistributedPlan *distributedPlan, ExplainState *es); -static void ExplainJob(Job *job, ExplainState *es); +static void ExplainJob(CitusScanState *scanState, Job *job, ExplainState *es); static void ExplainMapMergeJob(MapMergeJob *mapMergeJob, ExplainState *es); -static void ExplainTaskList(List *taskList, ExplainState *es); +static void ExplainTaskList(CitusScanState *scanState, List *taskList, ExplainState *es); static RemoteExplainPlan * RemoteExplain(Task *task, ExplainState *es); static RemoteExplainPlan * GetSavedRemoteExplain(Task *task, ExplainState *es); static RemoteExplainPlan * FetchRemoteExplainFromWorkers(Task *task, ExplainState *es); 
-static void ExplainTask(Task *task, int placementIndex, List *explainOutputList, +static void ExplainTask(CitusScanState *scanState, Task *task, int placementIndex, + List *explainOutputList, ExplainState *es); static void ExplainTaskPlacement(ShardPlacement *taskPlacement, List *explainOutputList, ExplainState *es); @@ -154,6 +155,9 @@ static void ExplainOneQuery(Query *query, int cursorOptions, const char *queryString, ParamListInfo params, QueryEnvironment *queryEnv); static double elapsed_time(instr_time *starttime); +static void ExplainPropertyBytes(const char *qlabel, int64 bytes, ExplainState *es); +static uint64 TaskReceivedData(Task *task); +static bool ShowReceivedData(CitusScanState *scanState, ExplainState *es); /* exports for SQL callable functions */ @@ -187,7 +191,7 @@ CitusExplainScan(CustomScanState *node, List *ancestors, struct ExplainState *es ExplainSubPlans(distributedPlan, es); } - ExplainJob(distributedPlan->workerJob, es); + ExplainJob(scanState, distributedPlan->workerJob, es); ExplainCloseGroup("Distributed Query", "Distributed Query", true, es); } @@ -287,8 +291,8 @@ ExplainSubPlans(DistributedPlan *distributedPlan, ExplainState *es) 2, es); } - ExplainPropertyInteger("Intermediate Data Size", "bytes", - subPlan->bytesSentPerWorker, es); + ExplainPropertyBytes("Intermediate Data Size", + subPlan->bytesSentPerWorker, es); StringInfo destination = makeStringInfo(); if (subPlan->remoteWorkerCount && subPlan->writeLocalFile) @@ -323,13 +327,37 @@ ExplainSubPlans(DistributedPlan *distributedPlan, ExplainState *es) } +/* + * ExplainPropertyBytes adds a text property named qlabel to the EXPLAIN + * output, with bytes rendered in human readable form by pg_size_pretty. + */ +static void +ExplainPropertyBytes(const char *qlabel, int64 bytes, ExplainState *es) +{ + Datum textDatum = DirectFunctionCall1(pg_size_pretty, Int64GetDatum(bytes)); + ExplainPropertyText(qlabel, text_to_cstring(DatumGetTextP(textDatum)), es); +} + + +/* + * ShowReceivedData returns true if EXPLAIN should show received data. 
This is + * the case for EXPLAIN ANALYZE when the query has result columns (natts > 0). + */ +static bool +ShowReceivedData(CitusScanState *scanState, ExplainState *es) +{ + TupleDesc tupDesc = ScanStateGetTupleDescriptor(scanState); + return es->analyze && tupDesc != NULL && tupDesc->natts > 0; +} + + /* * ExplainJob shows the EXPLAIN output for a Job in the physical plan of * a distributed query by showing the remote EXPLAIN for the first task, * or all tasks if citus.explain_all_tasks is on. */ static void -ExplainJob(Job *job, ExplainState *es) +ExplainJob(CitusScanState *scanState, Job *job, ExplainState *es) { List *dependentJobList = job->dependentJobList; int dependentJobCount = list_length(dependentJobList); @@ -340,6 +368,18 @@ ExplainJob(Job *job, ExplainState *es) ExplainOpenGroup("Job", "Job", true, es); ExplainPropertyInteger("Task Count", NULL, taskCount, es); + if (ShowReceivedData(scanState, es)) + { + Task *task = NULL; + uint64 totalReceivedDataForAllTasks = 0; + foreach_ptr(task, taskList) + { + totalReceivedDataForAllTasks += TaskReceivedData(task); + } + ExplainPropertyBytes("Data received from workers", + totalReceivedDataForAllTasks, + es); + } if (dependentJobCount > 0) { @@ -366,7 +406,7 @@ ExplainJob(Job *job, ExplainState *es) { ExplainOpenGroup("Tasks", "Tasks", false, es); - ExplainTaskList(taskList, es); + ExplainTaskList(scanState, taskList, es); ExplainCloseGroup("Tasks", "Tasks", false, es); } @@ -392,6 +432,23 @@ ExplainJob(Job *job, ExplainState *es) } +/* + * TaskReceivedData returns the number of bytes that the coordinator received + * for the task. For a MODIFY_TASK the value stored in totalReceivedData only + * counts the bytes of one placement, so it is multiplied by the number of + * placements of the task. 
+ */ +static uint64 +TaskReceivedData(Task *task) +{ + if (task->taskType == MODIFY_TASK) + { + return task->totalReceivedData * list_length(task->taskPlacementList); + } + return task->totalReceivedData; +} + + /* * ExplainMapMergeJob shows a very basic EXPLAIN plan for a MapMergeJob. It does * not yet show the EXPLAIN plan for the individual tasks, because this requires @@ -449,7 +506,7 @@ ExplainMapMergeJob(MapMergeJob *mapMergeJob, ExplainState *es) * or all tasks if citus.explain_all_tasks is on. */ static void -ExplainTaskList(List *taskList, ExplainState *es) +ExplainTaskList(CitusScanState *scanState, List *taskList, ExplainState *es) { ListCell *taskCell = NULL; ListCell *remoteExplainCell = NULL; @@ -477,7 +534,7 @@ ExplainTaskList(List *taskList, ExplainState *es) RemoteExplainPlan *remoteExplain = (RemoteExplainPlan *) lfirst(remoteExplainCell); - ExplainTask(task, remoteExplain->placementIndex, + ExplainTask(scanState, task, remoteExplain->placementIndex, remoteExplain->explainOutputList, es); } } @@ -623,7 +680,9 @@ FetchRemoteExplainFromWorkers(Task *task, ExplainState *es) * then the EXPLAIN output could not be fetched from any placement. 
*/ static void -ExplainTask(Task *task, int placementIndex, List *explainOutputList, ExplainState *es) +ExplainTask(CitusScanState *scanState, Task *task, int placementIndex, + List *explainOutputList, + ExplainState *es) { ExplainOpenGroup("Task", NULL, true, es); @@ -640,6 +699,13 @@ ExplainTask(Task *task, int placementIndex, List *explainOutputList, ExplainStat ExplainPropertyText("Query", queryText, es); } + if (ShowReceivedData(scanState, es)) + { + ExplainPropertyBytes("Data received from worker", + TaskReceivedData(task), + es); + } + if (explainOutputList != NIL) { List *taskPlacementList = task->taskPlacementList; @@ -1092,6 +1158,7 @@ CreateExplainAnlyzeDestination(Task *task, TupleDestination *taskDest) tupleDestination->pub.putTuple = ExplainAnalyzeDestPutTuple; tupleDestination->pub.tupleDescForQuery = ExplainAnalyzeDestTupleDescForQuery; + tupleDestination->pub.originalTask = task; return (TupleDestination *) tupleDestination; } diff --git a/src/backend/distributed/utils/citus_copyfuncs.c b/src/backend/distributed/utils/citus_copyfuncs.c index 18ca29582..0de85253b 100644 --- a/src/backend/distributed/utils/citus_copyfuncs.c +++ b/src/backend/distributed/utils/citus_copyfuncs.c @@ -329,6 +329,7 @@ CopyNodeTask(COPYFUNC_ARGS) COPY_SCALAR_FIELD(parametersInQueryStringResolved); COPY_SCALAR_FIELD(tupleDest); COPY_SCALAR_FIELD(queryCount); + COPY_SCALAR_FIELD(totalReceivedData); COPY_SCALAR_FIELD(fetchedExplainAnalyzePlacementIndex); COPY_STRING_FIELD(fetchedExplainAnalyzePlan); } diff --git a/src/include/distributed/multi_physical_planner.h b/src/include/distributed/multi_physical_planner.h index b37cb9ef6..e96931c1f 100644 --- a/src/include/distributed/multi_physical_planner.h +++ b/src/include/distributed/multi_physical_planner.h @@ -341,7 +341,11 @@ typedef struct Task /* * EXPLAIN ANALYZE output fetched from worker. This is saved to be used later * by RemoteExplain(). + * + * totalReceivedData is the number of bytes received for one placement. 
So for + * RETURNING DML it has to be multiplied by the number of placements. */ + uint64 totalReceivedData; char *fetchedExplainAnalyzePlan; int fetchedExplainAnalyzePlacementIndex; } Task; diff --git a/src/include/distributed/tuple_destination.h b/src/include/distributed/tuple_destination.h index c04d4f7f9..d57cdc3b7 100644 --- a/src/include/distributed/tuple_destination.h +++ b/src/include/distributed/tuple_destination.h @@ -36,6 +36,7 @@ struct TupleDestination /* tupleDescForQuery returns tuple descriptor for a query number. Can return NULL. */ TupleDesc (*tupleDescForQuery)(TupleDestination *self, int queryNumber); + Task *originalTask; /* set only by the EXPLAIN ANALYZE destination */ }; extern TupleDestination * CreateTupleStoreTupleDest(Tuplestorestate *tupleStore, TupleDesc diff --git a/src/test/regress/expected/local_shard_execution.out b/src/test/regress/expected/local_shard_execution.out index 177a7604e..21da46798 100644 --- a/src/test/regress/expected/local_shard_execution.out +++ b/src/test/regress/expected/local_shard_execution.out @@ -296,17 +296,19 @@ EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF) SELECT * FROM distribute --------------------------------------------------------------------- Custom Scan (Citus Adaptive) (actual rows=1 loops=1) Task Count: 1 + Data received from workers: 5 bytes Tasks Shown: All -> Task + Data received from worker: 5 bytes Node: host=localhost port=xxxxx dbname=regression -> Index Scan using distributed_table_pkey_1470001 on distributed_table_1470001 distributed_table (actual rows=1 loops=1) Index Cond: (key = 1) Filter: (age = 20) -(8 rows) +(10 rows) EXPLAIN (ANALYZE ON, COSTS OFF, SUMMARY OFF, TIMING OFF) WITH r AS ( SELECT random() z,* FROM distributed_table) -SELECT * FROM r WHERE z < 2; +SELECT 1 FROM r WHERE z < 2; QUERY PLAN --------------------------------------------------------------------- Custom Scan (Citus Adaptive) (actual rows=1 loops=1) @@ -315,17 +317,21 @@ SELECT * FROM r WHERE z < 2; Result destination: Write locally -> Custom Scan (Citus Adaptive) (actual rows=1 
loops=1) Task Count: 4 + Data received from workers: 22 bytes Tasks Shown: One of 4 -> Task + Data received from worker: 22 bytes Node: host=localhost port=xxxxx dbname=regression -> Seq Scan on distributed_table_1470001 distributed_table (actual rows=1 loops=1) Task Count: 1 + Data received from workers: 1 bytes Tasks Shown: All -> Task + Data received from worker: 1 bytes Node: host=localhost port=xxxxx dbname=regression -> Function Scan on read_intermediate_result intermediate_result (actual rows=1 loops=1) Filter: (z < '2'::double precision) -(16 rows) +(20 rows) EXPLAIN (COSTS OFF) DELETE FROM distributed_table WHERE key = 1 AND age = 20; QUERY PLAN diff --git a/src/test/regress/expected/multi_explain.out b/src/test/regress/expected/multi_explain.out index 2762a2a43..47f0121a6 100644 --- a/src/test/regress/expected/multi_explain.out +++ b/src/test/regress/expected/multi_explain.out @@ -283,8 +283,10 @@ Sort (actual rows=50 loops=1) Group Key: remote_scan.l_quantity -> Custom Scan (Citus Adaptive) (actual rows=100 loops=1) Task Count: 2 + Data received from workers: 780 bytes Tasks Shown: One of 2 -> Task + Data received from worker: 390 bytes Node: host=localhost port=xxxxx dbname=regression -> HashAggregate (actual rows=50 loops=1) Group Key: l_quantity @@ -301,6 +303,7 @@ EXPLAIN (COSTS off, ANALYZE on, TIMING off, SUMMARY off) SELECT count(*) FROM t1 Aggregate (actual rows=1 loops=1) -> Custom Scan (Citus Adaptive) (actual rows=4 loops=1) Task Count: 4 + Data received from workers: 4 bytes Tasks Shown: None, not supported for re-partition queries -> MapMergeJob Map Task Count: 3 @@ -324,9 +327,11 @@ Sort (actual rows=50 loops=1) -> Custom Scan (Citus Adaptive) (actual rows=100 loops=1) Output: remote_scan.l_quantity, remote_scan.count_quantity Task Count: 2 + Data received from workers: 780 bytes Tasks Shown: One of 2 -> Task Query: SELECT l_quantity, count(*) AS count_quantity FROM lineitem_290000 lineitem WHERE true GROUP BY l_quantity + Data received 
from worker: 390 bytes Node: host=localhost port=xxxxx dbname=regression -> HashAggregate (actual rows=50 loops=1) Output: l_quantity, count(*) @@ -966,13 +971,16 @@ Sort (actual rows=50 loops=1) Group Key: remote_scan.l_quantity -> Custom Scan (Citus Adaptive) (actual rows=100 loops=1) Task Count: 2 + Data received from workers: 780 bytes Tasks Shown: All -> Task + Data received from worker: 390 bytes Node: host=localhost port=xxxxx dbname=regression -> HashAggregate (actual rows=50 loops=1) Group Key: l_quantity -> Seq Scan on lineitem_290000 lineitem (actual rows=6000 loops=1) -> Task + Data received from worker: 390 bytes Node: host=localhost port=xxxxx dbname=regression -> HashAggregate (actual rows=50 loops=1) Group Key: l_quantity @@ -1239,8 +1247,10 @@ Custom Scan (Citus Adaptive) (cost=0.00..0.00 rows=100000 width=18) EXPLAIN (ANALYZE ON, COSTS OFF, TIMING OFF, SUMMARY OFF) EXECUTE router_executor_query_param(5); Custom Scan (Citus Adaptive) (actual rows=3 loops=1) Task Count: 1 + Data received from workers: 0 bytes Tasks Shown: All -> Task + Data received from worker: 0 bytes Node: host=localhost port=xxxxx dbname=regression -> Index Scan using lineitem_pkey_290000 on lineitem_290000 lineitem (actual rows=3 loops=1) Index Cond: (l_orderkey = 5) @@ -1796,8 +1806,10 @@ SELECT create_distributed_table('explain_analyze_test', 'a'); EXPLAIN :default_analyze_flags SELECT * FROM explain_analyze_test WHERE a = 1; Custom Scan (Citus Adaptive) (actual rows=1 loops=1) Task Count: 1 + Data received from workers: 8 bytes Tasks Shown: All -> Task + Data received from worker: 8 bytes Node: host=localhost port=xxxxx dbname=regression -> Seq Scan on explain_analyze_test_570009 explain_analyze_test (actual rows=1 loops=1) Filter: (a = 1) @@ -1806,11 +1818,37 @@ EXPLAIN :default_analyze_flags SELECT count(*) FROM explain_analyze_test; Aggregate (actual rows=1 loops=1) -> Custom Scan (Citus Adaptive) (actual rows=4 loops=1) Task Count: 4 + Data received from workers: 4 bytes 
Tasks Shown: One of 4 -> Task + Data received from worker: 1 bytes Node: host=localhost port=xxxxx dbname=regression -> Aggregate (actual rows=1 loops=1) -> Seq Scan on explain_analyze_test_570009 explain_analyze_test (actual rows=1 loops=1) +-- empty router SELECT +EXPLAIN :default_analyze_flags SELECT * FROM explain_analyze_test WHERE a = 10000; +Custom Scan (Citus Adaptive) (actual rows=0 loops=1) + Task Count: 1 + Data received from workers: 0 bytes + Tasks Shown: All + -> Task + Data received from worker: 0 bytes + Node: host=localhost port=xxxxx dbname=regression + -> Seq Scan on explain_analyze_test_570012 explain_analyze_test (actual rows=0 loops=1) + Filter: (a = 10000) + Rows Removed by Filter: 1 +-- empty multi-shard SELECT +EXPLAIN :default_analyze_flags SELECT * FROM explain_analyze_test WHERE b = 'does not exist'; +Custom Scan (Citus Adaptive) (actual rows=0 loops=1) + Task Count: 4 + Data received from workers: 0 bytes + Tasks Shown: One of 4 + -> Task + Data received from worker: 0 bytes + Node: host=localhost port=xxxxx dbname=regression + -> Seq Scan on explain_analyze_test_570009 explain_analyze_test (actual rows=0 loops=1) + Filter: (b = 'does not exist'::text) + Rows Removed by Filter: 1 -- router DML BEGIN; EXPLAIN :default_analyze_flags DELETE FROM explain_analyze_test WHERE a = 1; @@ -1857,6 +1895,32 @@ Custom Scan (Citus Adaptive) (actual rows=0 loops=1) -> Seq Scan on explain_analyze_test_570009 explain_analyze_test (actual rows=1 loops=1) SELECT * FROM explain_analyze_test ORDER BY a; ROLLBACK; +-- router DML with RETURNING with empty result +EXPLAIN :default_analyze_flags UPDATE explain_analyze_test SET b = 'something' WHERE a = 10000 RETURNING *; +Custom Scan (Citus Adaptive) (actual rows=0 loops=1) + Task Count: 1 + Data received from workers: 0 bytes + Tasks Shown: All + -> Task + Data received from worker: 0 bytes + Node: host=localhost port=xxxxx dbname=regression + -> Update on explain_analyze_test_570012 explain_analyze_test 
(actual rows=0 loops=1) + -> Seq Scan on explain_analyze_test_570012 explain_analyze_test (actual rows=0 loops=1) + Filter: (a = 10000) + Rows Removed by Filter: 1 +-- multi-shard DML with RETURNING with empty result +EXPLAIN :default_analyze_flags UPDATE explain_analyze_test SET b = 'something' WHERE b = 'does not exist' RETURNING *; +Custom Scan (Citus Adaptive) (actual rows=0 loops=1) + Task Count: 4 + Data received from workers: 0 bytes + Tasks Shown: One of 4 + -> Task + Data received from worker: 0 bytes + Node: host=localhost port=xxxxx dbname=regression + -> Update on explain_analyze_test_570009 explain_analyze_test (actual rows=0 loops=1) + -> Seq Scan on explain_analyze_test_570009 explain_analyze_test (actual rows=0 loops=1) + Filter: (b = 'does not exist'::text) + Rows Removed by Filter: 1 -- single-row insert BEGIN; EXPLAIN :default_analyze_flags INSERT INTO explain_analyze_test VALUES (5, 'value 5'); @@ -1964,6 +2028,50 @@ EXPLAIN (COSTS off, ANALYZE on, TIMING off, SUMMARY off, FORMAT JSON) INSERT INT } ] ROLLBACK; +EXPLAIN (COSTS off, ANALYZE on, TIMING off, SUMMARY off, FORMAT JSON) SELECT * FROM explain_pk; +[ + { + "Plan": { + "Node Type": "Custom Scan", + "Custom Plan Provider": "Citus Adaptive", + "Parallel Aware": false, + "Actual Rows": 0, + "Actual Loops": 1, + "Distributed Query": { + "Job": { + "Task Count": 4, + "Data received from workers": "0 bytes", + "Tasks Shown": "One of 4", + "Tasks": [ + { + "Data received from worker": "0 bytes", + "Node": "host=localhost port=xxxxx dbname=regression", + "Remote Plan": [ + [ + { + "Plan": { + "Node Type": "Seq Scan", + "Parallel Aware": false, + "Relation Name": "explain_pk_570013", + "Alias": "explain_pk", + "Actual Rows": 0, + "Actual Loops": 1 + }, + "Triggers": [ + ] + } + ] + + ] + } + ] + } + } + }, + "Triggers": [ + ] + } +] BEGIN; EXPLAIN (COSTS off, ANALYZE on, TIMING off, SUMMARY off, FORMAT XML) INSERT INTO explain_pk VALUES (1, 2), (2, 3); @@ -2017,6 +2125,49 @@ EXPLAIN (COSTS off, 
ANALYZE on, TIMING off, SUMMARY off, FORMAT XML) INSERT INTO ROLLBACK; +EXPLAIN (COSTS off, ANALYZE on, TIMING off, SUMMARY off, FORMAT XML) SELECT * FROM explain_pk; + + + + Custom Scan + Citus Adaptive + false + 0 + 1 + + + 4 + 0 bytes + One of 4 + + + 0 bytes + host=localhost port=xxxxx dbname=regression + + + + + Seq Scan + false + explain_pk_570013 + explain_pk + 0 + 1 + + + + + + + + + + + + + + + DROP TABLE explain_pk; -- test EXPLAIN ANALYZE with CTEs and subqueries CREATE TABLE dist_table(a int, b int); @@ -2029,7 +2180,7 @@ INSERT INTO dist_table SELECT i, i*i FROM generate_series(1, 10) i; INSERT INTO ref_table SELECT i FROM generate_series(1, 10) i; EXPLAIN :default_analyze_flags WITH r AS ( - SELECT random() r, a FROM dist_table + SELECT GREATEST(random(), 2) r, a FROM dist_table ) SELECT count(distinct a) from r NATURAL JOIN ref_table; Custom Scan (Citus Adaptive) (actual rows=1 loops=1) @@ -2038,13 +2189,17 @@ Custom Scan (Citus Adaptive) (actual rows=1 loops=1) Result destination: Send to 2 nodes -> Custom Scan (Citus Adaptive) (actual rows=10 loops=1) Task Count: 4 + Data received from workers: 21 bytes Tasks Shown: One of 4 -> Task + Data received from worker: 9 bytes Node: host=localhost port=xxxxx dbname=regression -> Seq Scan on dist_table_570017 dist_table (actual rows=4 loops=1) Task Count: 1 + Data received from workers: 2 bytes Tasks Shown: All -> Task + Data received from worker: 2 bytes Node: host=localhost port=xxxxx dbname=regression -> Aggregate (actual rows=1 loops=1) -> Hash Join (actual rows=10 loops=1) @@ -2054,12 +2209,14 @@ Custom Scan (Citus Adaptive) (actual rows=1 loops=1) Buckets: 1024 Batches: 1 Memory Usage: 9kB -> Function Scan on read_intermediate_result intermediate_result (actual rows=10 loops=1) EXPLAIN :default_analyze_flags -SELECT count(distinct a) FROM (SELECT random() r, a FROM dist_table) t NATURAL JOIN ref_table; +SELECT count(distinct a) FROM (SELECT GREATEST(random(), 2) r, a FROM dist_table) t NATURAL JOIN 
ref_table; Aggregate (actual rows=1 loops=1) -> Custom Scan (Citus Adaptive) (actual rows=10 loops=1) Task Count: 4 + Data received from workers: 11 bytes Tasks Shown: One of 4 -> Task + Data received from worker: 5 bytes Node: host=localhost port=xxxxx dbname=regression -> Group (actual rows=4 loops=1) Group Key: t.a @@ -2076,16 +2233,18 @@ Aggregate (actual rows=1 loops=1) -> Seq Scan on ref_table_570021 ref_table (actual rows=10 loops=1) EXPLAIN :default_analyze_flags SELECT count(distinct a) FROM dist_table -WHERE EXISTS(SELECT random() FROM dist_table NATURAL JOIN ref_table); +WHERE EXISTS(SELECT random() < 2 FROM dist_table NATURAL JOIN ref_table); Aggregate (actual rows=1 loops=1) -> Custom Scan (Citus Adaptive) (actual rows=10 loops=1) -> Distributed Subplan XXX_1 - Intermediate Data Size: 140 bytes + Intermediate Data Size: 70 bytes Result destination: Send to 2 nodes -> Custom Scan (Citus Adaptive) (actual rows=10 loops=1) Task Count: 4 + Data received from workers: 10 bytes Tasks Shown: One of 4 -> Task + Data received from worker: 4 bytes Node: host=localhost port=xxxxx dbname=regression -> Merge Join (actual rows=4 loops=1) Merge Cond: (dist_table.a = ref_table.a) @@ -2098,8 +2257,10 @@ Aggregate (actual rows=1 loops=1) Sort Method: quicksort Memory: 25kB -> Seq Scan on ref_table_570021 ref_table (actual rows=10 loops=1) Task Count: 4 + Data received from workers: 11 bytes Tasks Shown: One of 4 -> Task + Data received from worker: 5 bytes Node: host=localhost port=xxxxx dbname=regression -> HashAggregate (actual rows=4 loops=1) Group Key: dist_table.a @@ -2114,7 +2275,7 @@ WITH r AS ( INSERT INTO dist_table SELECT a, a * a FROM dist_table RETURNING a ), s AS ( - SELECT random(), a * a a2 FROM r + SELECT random() < 2, a * a a2 FROM r ) SELECT count(distinct a2) FROM s; Custom Scan (Citus Adaptive) (actual rows=1 loops=1) @@ -2123,26 +2284,81 @@ Custom Scan (Citus Adaptive) (actual rows=1 loops=1) Result destination: Write locally -> Custom Scan (Citus 
Adaptive) (actual rows=20 loops=1) Task Count: 4 + Data received from workers: 44 bytes Tasks Shown: One of 4 -> Task + Data received from worker: 20 bytes Node: host=localhost port=xxxxx dbname=regression -> Insert on dist_table_570017 citus_table_alias (actual rows=8 loops=1) -> Seq Scan on dist_table_570017 dist_table (actual rows=8 loops=1) Filter: ((worker_hash(a) >= '-2147483648'::integer) AND (worker_hash(a) <= '-1073741825'::integer)) -> Distributed Subplan XXX_2 - Intermediate Data Size: 220 bytes + Intermediate Data Size: 150 bytes Result destination: Write locally -> Custom Scan (Citus Adaptive) (actual rows=10 loops=1) Task Count: 1 + Data received from workers: 28 bytes Tasks Shown: All -> Task + Data received from worker: 28 bytes Node: host=localhost port=xxxxx dbname=regression -> Function Scan on read_intermediate_result intermediate_result (actual rows=10 loops=1) Task Count: 1 + Data received from workers: 2 bytes Tasks Shown: All -> Task + Data received from worker: 2 bytes Node: host=localhost port=xxxxx dbname=regression -> Aggregate (actual rows=1 loops=1) -> Function Scan on read_intermediate_result intermediate_result (actual rows=10 loops=1) ROLLBACK; DROP TABLE ref_table, dist_table; +-- test EXPLAIN ANALYZE with different replication factors +SET citus.shard_count = 2; +SET citus.shard_replication_factor = 1; +CREATE TABLE dist_table_rep1(a int); +SELECT create_distributed_table('dist_table_rep1', 'a'); + +SET citus.shard_replication_factor = 2; +CREATE TABLE dist_table_rep2(a int); +SELECT create_distributed_table('dist_table_rep2', 'a'); + +EXPLAIN :default_analyze_flags INSERT INTO dist_table_rep1 VALUES(1), (2), (3), (4) RETURNING *; +Custom Scan (Citus Adaptive) (actual rows=4 loops=1) + Task Count: 2 + Data received from workers: 4 bytes + Tasks Shown: One of 2 + -> Task + Data received from worker: 3 bytes + Node: host=localhost port=xxxxx dbname=regression + -> Insert on dist_table_rep1_570022 citus_table_alias (actual rows=3 
loops=1) + -> Values Scan on "*VALUES*" (actual rows=3 loops=1) +EXPLAIN :default_analyze_flags SELECT * from dist_table_rep1; +Custom Scan (Citus Adaptive) (actual rows=4 loops=1) + Task Count: 2 + Data received from workers: 4 bytes + Tasks Shown: One of 2 + -> Task + Data received from worker: 3 bytes + Node: host=localhost port=xxxxx dbname=regression + -> Seq Scan on dist_table_rep1_570022 dist_table_rep1 (actual rows=3 loops=1) +EXPLAIN :default_analyze_flags INSERT INTO dist_table_rep2 VALUES(1), (2), (3), (4) RETURNING *; +Custom Scan (Citus Adaptive) (actual rows=4 loops=1) + Task Count: 2 + Data received from workers: 8 bytes + Tasks Shown: One of 2 + -> Task + Data received from worker: 6 bytes + Node: host=localhost port=xxxxx dbname=regression + -> Insert on dist_table_rep2_570024 citus_table_alias (actual rows=3 loops=1) + -> Values Scan on "*VALUES*" (actual rows=3 loops=1) +EXPLAIN :default_analyze_flags SELECT * from dist_table_rep2; +Custom Scan (Citus Adaptive) (actual rows=4 loops=1) + Task Count: 2 + Data received from workers: 4 bytes + Tasks Shown: One of 2 + -> Task + Data received from worker: 3 bytes + Node: host=localhost port=xxxxx dbname=regression + -> Seq Scan on dist_table_rep2_570024 dist_table_rep2 (actual rows=3 loops=1) +DROP TABLE dist_table_rep1, dist_table_rep2; diff --git a/src/test/regress/sql/binary_protocol.sql b/src/test/regress/sql/binary_protocol.sql index 1c4fd858e..4fb2a0cac 100644 --- a/src/test/regress/sql/binary_protocol.sql +++ b/src/test/regress/sql/binary_protocol.sql @@ -1,4 +1,4 @@ -SET citus.shard_count = 4; +SET citus.shard_count = 2; SET citus.next_shard_id TO 4754000; CREATE SCHEMA binary_protocol; SET search_path TO binary_protocol; @@ -19,6 +19,12 @@ SELECT id, id, id, id, id, id, id, id, id, id FROM t ORDER BY id; +-- EXPLAIN ANALYZE is currently forced to use text protocol. Once that is +-- changed the numbers reported should change. 
+EXPLAIN (ANALYZE TRUE, TIMING FALSE, COSTS FALSE, SUMMARY FALSE) SELECT id FROM t ORDER BY 1; +SET citus.explain_all_tasks TO ON; +EXPLAIN (ANALYZE TRUE, TIMING FALSE, COSTS FALSE, SUMMARY FALSE) SELECT id FROM t ORDER BY 1; + INSERT INTO t SELECT count(*) from t; INSERT INTO t (SELECT id+1 from t); diff --git a/src/test/regress/sql/local_shard_execution.sql b/src/test/regress/sql/local_shard_execution.sql index bf6497617..24f3015bf 100644 --- a/src/test/regress/sql/local_shard_execution.sql +++ b/src/test/regress/sql/local_shard_execution.sql @@ -205,7 +205,7 @@ EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF) SELECT * FROM distribute EXPLAIN (ANALYZE ON, COSTS OFF, SUMMARY OFF, TIMING OFF) WITH r AS ( SELECT random() z,* FROM distributed_table) -SELECT * FROM r WHERE z < 2; +SELECT 1 FROM r WHERE z < 2; EXPLAIN (COSTS OFF) DELETE FROM distributed_table WHERE key = 1 AND age = 20; diff --git a/src/test/regress/sql/multi_explain.sql b/src/test/regress/sql/multi_explain.sql index 6c1caafbd..86b6e0fc2 100644 --- a/src/test/regress/sql/multi_explain.sql +++ b/src/test/regress/sql/multi_explain.sql @@ -769,6 +769,12 @@ EXPLAIN :default_analyze_flags SELECT * FROM explain_analyze_test WHERE a = 1; -- multi-shard SELECT EXPLAIN :default_analyze_flags SELECT count(*) FROM explain_analyze_test; +-- empty router SELECT +EXPLAIN :default_analyze_flags SELECT * FROM explain_analyze_test WHERE a = 10000; + +-- empty multi-shard SELECT +EXPLAIN :default_analyze_flags SELECT * FROM explain_analyze_test WHERE b = 'does not exist'; + -- router DML BEGIN; EXPLAIN :default_analyze_flags DELETE FROM explain_analyze_test WHERE a = 1; @@ -783,6 +789,12 @@ EXPLAIN :default_analyze_flags DELETE FROM explain_analyze_test; SELECT * FROM explain_analyze_test ORDER BY a; ROLLBACK; +-- router DML with RETURNING with empty result +EXPLAIN :default_analyze_flags UPDATE explain_analyze_test SET b = 'something' WHERE a = 10000 RETURNING *; +-- multi-shard DML with RETURNING with empty 
result +EXPLAIN :default_analyze_flags UPDATE explain_analyze_test SET b = 'something' WHERE b = 'does not exist' RETURNING *; + + -- single-row insert BEGIN; EXPLAIN :default_analyze_flags INSERT INTO explain_analyze_test VALUES (5, 'value 5'); @@ -814,10 +826,14 @@ BEGIN; EXPLAIN (COSTS off, ANALYZE on, TIMING off, SUMMARY off, FORMAT JSON) INSERT INTO explain_pk VALUES (1, 2), (2, 3); ROLLBACK; +EXPLAIN (COSTS off, ANALYZE on, TIMING off, SUMMARY off, FORMAT JSON) SELECT * FROM explain_pk; + BEGIN; EXPLAIN (COSTS off, ANALYZE on, TIMING off, SUMMARY off, FORMAT XML) INSERT INTO explain_pk VALUES (1, 2), (2, 3); ROLLBACK; +EXPLAIN (COSTS off, ANALYZE on, TIMING off, SUMMARY off, FORMAT XML) SELECT * FROM explain_pk; + DROP TABLE explain_pk; -- test EXPLAIN ANALYZE with CTEs and subqueries @@ -831,16 +847,16 @@ INSERT INTO ref_table SELECT i FROM generate_series(1, 10) i; EXPLAIN :default_analyze_flags WITH r AS ( - SELECT random() r, a FROM dist_table + SELECT GREATEST(random(), 2) r, a FROM dist_table ) SELECT count(distinct a) from r NATURAL JOIN ref_table; EXPLAIN :default_analyze_flags -SELECT count(distinct a) FROM (SELECT random() r, a FROM dist_table) t NATURAL JOIN ref_table; +SELECT count(distinct a) FROM (SELECT GREATEST(random(), 2) r, a FROM dist_table) t NATURAL JOIN ref_table; EXPLAIN :default_analyze_flags SELECT count(distinct a) FROM dist_table -WHERE EXISTS(SELECT random() FROM dist_table NATURAL JOIN ref_table); +WHERE EXISTS(SELECT random() < 2 FROM dist_table NATURAL JOIN ref_table); BEGIN; EXPLAIN :default_analyze_flags @@ -848,9 +864,27 @@ WITH r AS ( INSERT INTO dist_table SELECT a, a * a FROM dist_table RETURNING a ), s AS ( - SELECT random(), a * a a2 FROM r + SELECT random() < 2, a * a a2 FROM r ) SELECT count(distinct a2) FROM s; ROLLBACK; DROP TABLE ref_table, dist_table; + +-- test EXPLAIN ANALYZE with different replication factors +SET citus.shard_count = 2; +SET citus.shard_replication_factor = 1; +CREATE TABLE dist_table_rep1(a 
int); +SELECT create_distributed_table('dist_table_rep1', 'a'); + +SET citus.shard_replication_factor = 2; +CREATE TABLE dist_table_rep2(a int); +SELECT create_distributed_table('dist_table_rep2', 'a'); + +EXPLAIN :default_analyze_flags INSERT INTO dist_table_rep1 VALUES(1), (2), (3), (4) RETURNING *; +EXPLAIN :default_analyze_flags SELECT * from dist_table_rep1; + +EXPLAIN :default_analyze_flags INSERT INTO dist_table_rep2 VALUES(1), (2), (3), (4) RETURNING *; +EXPLAIN :default_analyze_flags SELECT * from dist_table_rep2; + +DROP TABLE dist_table_rep1, dist_table_rep2;