fix-4212
Teja Mupparti 2025-05-20 16:36:48 -07:00
parent 5e37fe0c46
commit 369d636b22
8 changed files with 151 additions and 101 deletions

View File

@@ -189,7 +189,8 @@ typedef struct SerializeDestReceiver
/* Explain functions for distributed queries */
static void ExplainSubPlans(DistributedPlan *distributedPlan, ExplainState *es);
static void ExplainSubPlans(DistributedPlan *distributedPlan, ExplainState *es,
bool queryExecuted);
static void ExplainJob(CitusScanState *scanState, Job *job, ExplainState *es,
ParamListInfo params);
static void ExplainMapMergeJob(MapMergeJob *mapMergeJob, ExplainState *es);
@@ -296,7 +297,7 @@ CitusExplainScan(CustomScanState *node, List *ancestors, struct ExplainState *es
if (distributedPlan->subPlanList != NIL)
{
ExplainSubPlans(distributedPlan, es);
ExplainSubPlans(distributedPlan, es, scanState->finishedRemoteScan);
}
ExplainJob(scanState, distributedPlan->workerJob, es, params);
@@ -434,13 +435,28 @@ NonPushableMergeCommandExplainScan(CustomScanState *node, List *ancestors,
* planning time and set it to 0.
*/
static void
ExplainSubPlans(DistributedPlan *distributedPlan, ExplainState *es)
ExplainSubPlans(DistributedPlan *distributedPlan, ExplainState *es, bool queryExecuted)
{
ListCell *subPlanCell = NULL;
uint64 planId = distributedPlan->planId;
bool analyzeEnabled = es->analyze;
bool timingEnabled = es->timing;
bool walEnabled = es->wal;
ExplainOpenGroup("Subplans", "Subplans", false, es);
if (queryExecuted)
{
/*
* Subplans are already executed recursively when
* executing the top-level of the plan. Here, we just
* need to explain them but not execute them again.
*/
es->analyze = false;
es->timing = false;
es->wal = false;
}
foreach(subPlanCell, distributedPlan->subPlanList)
{
DistributedSubPlan *subPlan = (DistributedSubPlan *) lfirst(subPlanCell);
@@ -488,9 +504,9 @@ ExplainSubPlans(DistributedPlan *distributedPlan, ExplainState *es)
ExplainOpenGroup("Subplan", NULL, true, es);
if (es->analyze)
if (analyzeEnabled)
{
if (es->timing)
if (timingEnabled)
{
ExplainPropertyFloat("Subplan Duration", "ms", subPlan->durationMillisecs,
2, es);
@@ -553,6 +569,14 @@ ExplainSubPlans(DistributedPlan *distributedPlan, ExplainState *es)
}
}
/* Restore the settings */
if (queryExecuted)
{
es->analyze = analyzeEnabled;
es->timing = timingEnabled;
es->wal = walEnabled;
}
ExplainCloseGroup("Subplans", "Subplans", false, es);
}

View File

@ -330,20 +330,18 @@ EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF) SELECT * FROM distribute
EXPLAIN (ANALYZE ON, COSTS OFF, SUMMARY OFF, TIMING OFF)
WITH r AS ( SELECT GREATEST(random(), 2) z,* FROM distributed_table)
SELECT 1 FROM r WHERE z < 3;
QUERY PLAN
QUERY PLAN
---------------------------------------------------------------------
Custom Scan (Citus Adaptive) (actual rows=1 loops=1)
-> Distributed Subplan XXX_1
Intermediate Data Size: 40 bytes
Result destination: Write locally
-> Custom Scan (Citus Adaptive) (actual rows=1 loops=1)
-> Custom Scan (Citus Adaptive)
Task Count: 4
Tuple data received from nodes: 22 bytes
Tasks Shown: One of 4
-> Task
Tuple data received from node: 22 bytes
Node: host=localhost port=xxxxx dbname=regression
-> Seq Scan on distributed_table_1470001 distributed_table (actual rows=1 loops=1)
-> Seq Scan on distributed_table_1470001 distributed_table
Task Count: 1
Tuple data received from nodes: 4 bytes
Tasks Shown: All
@ -352,7 +350,7 @@ SELECT 1 FROM r WHERE z < 3;
Node: host=localhost port=xxxxx dbname=regression
-> Function Scan on read_intermediate_result intermediate_result (actual rows=1 loops=1)
Filter: (z < '3'::double precision)
(20 rows)
(18 rows)
EXPLAIN (COSTS OFF) DELETE FROM distributed_table WHERE key = 1 AND age = 20;
QUERY PLAN

View File

@ -268,20 +268,18 @@ EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF) SELECT * FROM distribute
EXPLAIN (ANALYZE ON, COSTS OFF, SUMMARY OFF, TIMING OFF)
WITH r AS ( SELECT GREATEST(random(), 2) z,* FROM distributed_table)
SELECT 1 FROM r WHERE z < 3;
QUERY PLAN
QUERY PLAN
---------------------------------------------------------------------
Custom Scan (Citus Adaptive) (actual rows=1 loops=1)
-> Distributed Subplan XXX_1
Intermediate Data Size: 40 bytes
Result destination: Write locally
-> Custom Scan (Citus Adaptive) (actual rows=1 loops=1)
-> Custom Scan (Citus Adaptive)
Task Count: 4
Tuple data received from nodes: 22 bytes
Tasks Shown: One of 4
-> Task
Tuple data received from node: 22 bytes
Node: host=localhost port=xxxxx dbname=regression
-> Seq Scan on distributed_table_1500001 distributed_table (actual rows=1 loops=1)
-> Seq Scan on distributed_table_1500001 distributed_table
Task Count: 1
Tuple data received from nodes: 4 bytes
Tasks Shown: All
@ -290,7 +288,7 @@ SELECT 1 FROM r WHERE z < 3;
Node: host=localhost port=xxxxx dbname=regression
-> Function Scan on read_intermediate_result intermediate_result (actual rows=1 loops=1)
Filter: (z < '3'::double precision)
(20 rows)
(18 rows)
EXPLAIN (COSTS OFF) DELETE FROM distributed_table WHERE key = 1 AND age = 20;
QUERY PLAN

View File

@ -385,10 +385,9 @@ Custom Scan (Citus Adaptive) (actual rows=1 loops=1)
-> Distributed Subplan XXX_1
Intermediate Data Size: 14 bytes
Result destination: Write locally
-> Aggregate (actual rows=1 loops=1)
-> Custom Scan (Citus Adaptive) (actual rows=6 loops=1)
-> Aggregate
-> Custom Scan (Citus Adaptive)
Task Count: 6
Tuple data received from nodes: 48 bytes
Tasks Shown: None, not supported for re-partition queries
-> MapMergeJob
Map Task Count: 3
@ -2390,14 +2389,12 @@ Custom Scan (Citus Adaptive) (actual rows=1 loops=1)
-> Distributed Subplan XXX_1
Intermediate Data Size: 220 bytes
Result destination: Send to 3 nodes
-> Custom Scan (Citus Adaptive) (actual rows=10 loops=1)
-> Custom Scan (Citus Adaptive)
Task Count: 4
Tuple data received from nodes: 120 bytes
Tasks Shown: One of 4
-> Task
Tuple data received from node: 48 bytes
Node: host=localhost port=xxxxx dbname=regression
-> Seq Scan on dist_table_570017 dist_table (actual rows=4 loops=1)
-> Seq Scan on dist_table_570017 dist_table
Task Count: 1
Tuple data received from nodes: 8 bytes
Tasks Shown: All
@ -2447,23 +2444,19 @@ Aggregate (actual rows=1 loops=1)
-> Distributed Subplan XXX_1
Intermediate Data Size: 70 bytes
Result destination: Send to 2 nodes
-> Custom Scan (Citus Adaptive) (actual rows=10 loops=1)
-> Custom Scan (Citus Adaptive)
Task Count: 4
Tuple data received from nodes: 10 bytes
Tasks Shown: One of 4
-> Task
Tuple data received from node: 4 bytes
Node: host=localhost port=xxxxx dbname=regression
-> Merge Join (actual rows=4 loops=1)
-> Merge Join
Merge Cond: (dist_table.a = ref_table.a)
-> Sort (actual rows=4 loops=1)
-> Sort
Sort Key: dist_table.a
Sort Method: quicksort Memory: 25kB
-> Seq Scan on dist_table_570017 dist_table (actual rows=4 loops=1)
-> Sort (actual rows=10 loops=1)
-> Seq Scan on dist_table_570017 dist_table
-> Sort
Sort Key: ref_table.a
Sort Method: quicksort Memory: 25kB
-> Seq Scan on ref_table_570021 ref_table (actual rows=10 loops=1)
-> Seq Scan on ref_table_570021 ref_table
Task Count: 4
Tuple data received from nodes: 32 bytes
Tasks Shown: One of 4
@ -2492,27 +2485,23 @@ Custom Scan (Citus Adaptive) (actual rows=1 loops=1)
-> Distributed Subplan XXX_1
Intermediate Data Size: 100 bytes
Result destination: Write locally
-> Custom Scan (Citus Adaptive) (actual rows=20 loops=1)
-> Custom Scan (Citus Adaptive)
Task Count: 4
Tuple data received from nodes: 160 bytes
Tasks Shown: One of 4
-> Task
Tuple data received from node: 64 bytes
Node: host=localhost port=xxxxx dbname=regression
-> Insert on dist_table_570017 citus_table_alias (actual rows=8 loops=1)
-> Seq Scan on dist_table_570017 dist_table (actual rows=8 loops=1)
-> Insert on dist_table_570017 citus_table_alias
-> Seq Scan on dist_table_570017 dist_table
Filter: (a IS NOT NULL)
-> Distributed Subplan XXX_2
Intermediate Data Size: 150 bytes
Result destination: Write locally
-> Custom Scan (Citus Adaptive) (actual rows=10 loops=1)
-> Custom Scan (Citus Adaptive)
Task Count: 1
Tuple data received from nodes: 50 bytes
Tasks Shown: All
-> Task
Tuple data received from node: 50 bytes
Node: host=localhost port=xxxxx dbname=regression
-> Function Scan on read_intermediate_result intermediate_result (actual rows=10 loops=1)
-> Function Scan on read_intermediate_result intermediate_result
Task Count: 1
Tuple data received from nodes: 8 bytes
Tasks Shown: All
@ -3006,17 +2995,14 @@ Custom Scan (Citus Adaptive) (actual rows=10 loops=1)
-> Distributed Subplan XXX_1
Intermediate Data Size: 0 bytes
Result destination: Send to 0 nodes
-> Custom Scan (Citus Adaptive) (actual rows=0 loops=1)
-> Custom Scan (Citus Adaptive)
Task Count: 4
Tuple data received from nodes: 0 bytes
Tasks Shown: One of 4
-> Task
Tuple data received from node: 0 bytes
Node: host=localhost port=xxxxx dbname=regression
-> Delete on lineitem_hash_part_360041 lineitem_hash_part (actual rows=0 loops=1)
-> Seq Scan on lineitem_hash_part_360041 lineitem_hash_part (actual rows=0 loops=1)
-> Delete on lineitem_hash_part_360041 lineitem_hash_part
-> Seq Scan on lineitem_hash_part_360041 lineitem_hash_part
Filter: (l_quantity < '-1'::numeric)
Rows Removed by Filter: 2885
Task Count: 1
Tuple data received from nodes: 40 bytes
Tasks Shown: All
@ -3138,13 +3124,13 @@ Limit (actual rows=1 loops=1)
-> Distributed Subplan XXX_1
Intermediate Data Size: 14 bytes
Result destination: Send to 2 nodes
-> WindowAgg (actual rows=1 loops=1)
-> Custom Scan (Citus Adaptive) (actual rows=1 loops=1)
-> WindowAgg
-> Custom Scan (Citus Adaptive)
Task Count: 2
Tasks Shown: One of 2
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> Seq Scan on distributed_table_1_570032 distributed_table_1 (actual rows=1 loops=1)
-> Seq Scan on distributed_table_1_570032 distributed_table_1
Task Count: 2
Tuple data received from nodes: 16 bytes
Tasks Shown: One of 2
@ -3240,5 +3226,34 @@ set auto_explain.log_analyze to true;
-- the following should not be locally executed since explain analyze is on
select * from test_ref_table;
DROP SCHEMA test_auto_explain CASCADE;
SET search_path TO multi_explain;
-- EXPLAIN ANALYZE shouldn't execute SubPlans twice (bug #4212)
CREATE TABLE test_subplans (x int primary key, y int);
SELECT create_distributed_table('test_subplans','x');
EXPLAIN (COSTS off, ANALYZE on, TIMING off, SUMMARY off)
WITH a AS (INSERT INTO test_subplans VALUES (1,2) RETURNING *)
SELECT * FROM a;
Custom Scan (Citus Adaptive) (actual rows=1 loops=1)
-> Distributed Subplan XXX_1
Intermediate Data Size: 18 bytes
Result destination: Write locally
-> Custom Scan (Citus Adaptive)
Task Count: 1
Tasks Shown: All
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> Insert on test_subplans_570039
-> Result
Task Count: 1
Tuple data received from nodes: 8 bytes
Tasks Shown: All
-> Task
Tuple data received from node: 8 bytes
Node: host=localhost port=xxxxx dbname=regression
-> Function Scan on read_intermediate_result intermediate_result (actual rows=1 loops=1)
-- Only one row must exist
SELECT * FROM test_subplans;
1|2
SET client_min_messages TO ERROR;
DROP SCHEMA multi_explain CASCADE;

View File

@ -385,10 +385,9 @@ Custom Scan (Citus Adaptive) (actual rows=1 loops=1)
-> Distributed Subplan XXX_1
Intermediate Data Size: 14 bytes
Result destination: Write locally
-> Aggregate (actual rows=1 loops=1)
-> Custom Scan (Citus Adaptive) (actual rows=6 loops=1)
-> Aggregate
-> Custom Scan (Citus Adaptive)
Task Count: 6
Tuple data received from nodes: 48 bytes
Tasks Shown: None, not supported for re-partition queries
-> MapMergeJob
Map Task Count: 3
@ -2390,14 +2389,12 @@ Custom Scan (Citus Adaptive) (actual rows=1 loops=1)
-> Distributed Subplan XXX_1
Intermediate Data Size: 220 bytes
Result destination: Send to 3 nodes
-> Custom Scan (Citus Adaptive) (actual rows=10 loops=1)
-> Custom Scan (Citus Adaptive)
Task Count: 4
Tuple data received from nodes: 120 bytes
Tasks Shown: One of 4
-> Task
Tuple data received from node: 48 bytes
Node: host=localhost port=xxxxx dbname=regression
-> Seq Scan on dist_table_570017 dist_table (actual rows=4 loops=1)
-> Seq Scan on dist_table_570017 dist_table
Task Count: 1
Tuple data received from nodes: 8 bytes
Tasks Shown: All
@ -2442,23 +2439,19 @@ Aggregate (actual rows=1 loops=1)
-> Distributed Subplan XXX_1
Intermediate Data Size: 70 bytes
Result destination: Send to 2 nodes
-> Custom Scan (Citus Adaptive) (actual rows=10 loops=1)
-> Custom Scan (Citus Adaptive)
Task Count: 4
Tuple data received from nodes: 10 bytes
Tasks Shown: One of 4
-> Task
Tuple data received from node: 4 bytes
Node: host=localhost port=xxxxx dbname=regression
-> Merge Join (actual rows=4 loops=1)
-> Merge Join
Merge Cond: (dist_table.a = ref_table.a)
-> Sort (actual rows=4 loops=1)
-> Sort
Sort Key: dist_table.a
Sort Method: quicksort Memory: 25kB
-> Seq Scan on dist_table_570017 dist_table (actual rows=4 loops=1)
-> Sort (actual rows=10 loops=1)
-> Seq Scan on dist_table_570017 dist_table
-> Sort
Sort Key: ref_table.a
Sort Method: quicksort Memory: 25kB
-> Seq Scan on ref_table_570021 ref_table (actual rows=10 loops=1)
-> Seq Scan on ref_table_570021 ref_table
Task Count: 4
Tuple data received from nodes: 32 bytes
Tasks Shown: One of 4
@ -2484,27 +2477,23 @@ Custom Scan (Citus Adaptive) (actual rows=1 loops=1)
-> Distributed Subplan XXX_1
Intermediate Data Size: 100 bytes
Result destination: Write locally
-> Custom Scan (Citus Adaptive) (actual rows=20 loops=1)
-> Custom Scan (Citus Adaptive)
Task Count: 4
Tuple data received from nodes: 160 bytes
Tasks Shown: One of 4
-> Task
Tuple data received from node: 64 bytes
Node: host=localhost port=xxxxx dbname=regression
-> Insert on dist_table_570017 citus_table_alias (actual rows=8 loops=1)
-> Seq Scan on dist_table_570017 dist_table (actual rows=8 loops=1)
-> Insert on dist_table_570017 citus_table_alias
-> Seq Scan on dist_table_570017 dist_table
Filter: (a IS NOT NULL)
-> Distributed Subplan XXX_2
Intermediate Data Size: 150 bytes
Result destination: Write locally
-> Custom Scan (Citus Adaptive) (actual rows=10 loops=1)
-> Custom Scan (Citus Adaptive)
Task Count: 1
Tuple data received from nodes: 50 bytes
Tasks Shown: All
-> Task
Tuple data received from node: 50 bytes
Node: host=localhost port=xxxxx dbname=regression
-> Function Scan on read_intermediate_result intermediate_result (actual rows=10 loops=1)
-> Function Scan on read_intermediate_result intermediate_result
Task Count: 1
Tuple data received from nodes: 8 bytes
Tasks Shown: All
@ -2995,17 +2984,14 @@ Custom Scan (Citus Adaptive) (actual rows=10 loops=1)
-> Distributed Subplan XXX_1
Intermediate Data Size: 0 bytes
Result destination: Send to 0 nodes
-> Custom Scan (Citus Adaptive) (actual rows=0 loops=1)
-> Custom Scan (Citus Adaptive)
Task Count: 4
Tuple data received from nodes: 0 bytes
Tasks Shown: One of 4
-> Task
Tuple data received from node: 0 bytes
Node: host=localhost port=xxxxx dbname=regression
-> Delete on lineitem_hash_part_360041 lineitem_hash_part (actual rows=0 loops=1)
-> Seq Scan on lineitem_hash_part_360041 lineitem_hash_part (actual rows=0 loops=1)
-> Delete on lineitem_hash_part_360041 lineitem_hash_part
-> Seq Scan on lineitem_hash_part_360041 lineitem_hash_part
Filter: (l_quantity < '-1'::numeric)
Rows Removed by Filter: 2885
Task Count: 1
Tuple data received from nodes: 40 bytes
Tasks Shown: All
@ -3127,13 +3113,13 @@ Limit (actual rows=1 loops=1)
-> Distributed Subplan XXX_1
Intermediate Data Size: 14 bytes
Result destination: Send to 2 nodes
-> WindowAgg (actual rows=1 loops=1)
-> Custom Scan (Citus Adaptive) (actual rows=1 loops=1)
-> WindowAgg
-> Custom Scan (Citus Adaptive)
Task Count: 2
Tasks Shown: One of 2
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> Seq Scan on distributed_table_1_570032 distributed_table_1 (actual rows=1 loops=1)
-> Seq Scan on distributed_table_1_570032 distributed_table_1
Task Count: 2
Tuple data received from nodes: 16 bytes
Tasks Shown: One of 2
@ -3229,5 +3215,34 @@ set auto_explain.log_analyze to true;
-- the following should not be locally executed since explain analyze is on
select * from test_ref_table;
DROP SCHEMA test_auto_explain CASCADE;
SET search_path TO multi_explain;
-- EXPLAIN ANALYZE shouldn't execute SubPlans twice (bug #4212)
CREATE TABLE test_subplans (x int primary key, y int);
SELECT create_distributed_table('test_subplans','x');
EXPLAIN (COSTS off, ANALYZE on, TIMING off, SUMMARY off)
WITH a AS (INSERT INTO test_subplans VALUES (1,2) RETURNING *)
SELECT * FROM a;
Custom Scan (Citus Adaptive) (actual rows=1 loops=1)
-> Distributed Subplan XXX_1
Intermediate Data Size: 18 bytes
Result destination: Write locally
-> Custom Scan (Citus Adaptive)
Task Count: 1
Tasks Shown: All
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> Insert on test_subplans_570039
-> Result
Task Count: 1
Tuple data received from nodes: 8 bytes
Tasks Shown: All
-> Task
Tuple data received from node: 8 bytes
Node: host=localhost port=xxxxx dbname=regression
-> Function Scan on read_intermediate_result intermediate_result (actual rows=1 loops=1)
-- Only one row must exist
SELECT * FROM test_subplans;
1|2
SET client_min_messages TO ERROR;
DROP SCHEMA multi_explain CASCADE;

View File

@ -721,13 +721,11 @@ CALL exec_query_and_check_query_counters($$
0, 0
);
-- same with explain analyze
--
-- this time, query_execution_multi_shard is incremented twice because of #4212
CALL exec_query_and_check_query_counters($$
EXPLAIN (ANALYZE)
SELECT * FROM (SELECT * FROM dist_table OFFSET 0) q
$$,
1, 2
1, 1
);
CALL exec_query_and_check_query_counters($$
DELETE FROM dist_table WHERE a = 1
@ -1041,9 +1039,6 @@ PL/pgSQL function exec_query_and_check_query_counters(text,bigint,bigint) line X
-- A similar one but without the insert, so we would normally expect 2 increments
-- for query_execution_single_shard and 2 for query_execution_multi_shard instead
-- of 3 since the insert is not there anymore.
--
-- But this time we observe more counter increments because we execute the subplans
-- twice because of #4212.
CALL exec_query_and_check_query_counters($$
EXPLAIN (ANALYZE)
-- single-shard subplan (whole cte)
@ -1057,7 +1052,7 @@ CALL exec_query_and_check_query_counters($$
FROM (SELECT * FROM dist_table_1 ORDER BY a LIMIT 16) q -- multi-shard subplan (subquery q)
JOIN cte ON q.a = cte.a
$$,
3, 4
2, 2
);
-- safe to push-down
CALL exec_query_and_check_query_counters($$

View File

@ -1182,5 +1182,15 @@ select * from test_ref_table;
DROP SCHEMA test_auto_explain CASCADE;
SET search_path TO multi_explain;
-- EXPLAIN ANALYZE shouldn't execute SubPlans twice (bug #4212)
CREATE TABLE test_subplans (x int primary key, y int);
SELECT create_distributed_table('test_subplans','x');
EXPLAIN (COSTS off, ANALYZE on, TIMING off, SUMMARY off)
WITH a AS (INSERT INTO test_subplans VALUES (1,2) RETURNING *)
SELECT * FROM a;
-- Only one row must exist
SELECT * FROM test_subplans;
SET client_min_messages TO ERROR;
DROP SCHEMA multi_explain CASCADE;

View File

@ -476,13 +476,11 @@ CALL exec_query_and_check_query_counters($$
);
-- same with explain analyze
--
-- this time, query_execution_multi_shard is incremented twice because of #4212
CALL exec_query_and_check_query_counters($$
EXPLAIN (ANALYZE)
SELECT * FROM (SELECT * FROM dist_table OFFSET 0) q
$$,
1, 2
1, 1
);
CALL exec_query_and_check_query_counters($$
@ -807,9 +805,6 @@ CALL exec_query_and_check_query_counters($$
-- A similar one but without the insert, so we would normally expect 2 increments
-- for query_execution_single_shard and 2 for query_execution_multi_shard instead
-- of 3 since the insert is not there anymore.
--
-- But this time we observe more counter increments because we execute the subplans
-- twice because of #4212.
CALL exec_query_and_check_query_counters($$
EXPLAIN (ANALYZE)
-- single-shard subplan (whole cte)
@ -823,7 +818,7 @@ CALL exec_query_and_check_query_counters($$
FROM (SELECT * FROM dist_table_1 ORDER BY a LIMIT 16) q -- multi-shard subplan (subquery q)
JOIN cte ON q.a = cte.a
$$,
3, 4
2, 2
);
-- safe to push-down