diff --git a/src/backend/distributed/planner/intermediate_result_pruning.c b/src/backend/distributed/planner/intermediate_result_pruning.c index f01595d6b..f5809d59a 100644 --- a/src/backend/distributed/planner/intermediate_result_pruning.c +++ b/src/backend/distributed/planner/intermediate_result_pruning.c @@ -106,7 +106,7 @@ RecordSubplanExecutionsOnNodes(HTAB *intermediateResultsHash, List *usedSubPlanNodeList = distributedPlan->usedSubPlanNodeList; List *subPlanList = distributedPlan->subPlanList; ListCell *subPlanCell = NULL; - int workerNodeCount = GetWorkerNodeCount(); + int workerNodeCount = ActiveReadableWorkerNodeCount(); foreach(subPlanCell, usedSubPlanNodeList) { @@ -124,13 +124,14 @@ RecordSubplanExecutionsOnNodes(HTAB *intermediateResultsHash, } /* - * There is no need to traverse the whole plan if the intermediate result - * will be written to a local file and send to all nodes + * There is no need to traverse the subplan if the intermediate result + * will be written to a local file and sent to all nodes. Note that the + * remaining subplans in the distributed plan should still be traversed. */ if (list_length(entry->nodeIdList) == workerNodeCount && entry->writeLocalFile) { elog(DEBUG4, "Subplan %s is used in all workers", resultId); - break; + continue; } else if (usedPlan->locationMask & SUBPLAN_ACCESS_REMOTE) { @@ -139,7 +140,7 @@ RecordSubplanExecutionsOnNodes(HTAB *intermediateResultsHash, * * If we have reference tables in the distributed plan, all the * workers will be in the node list. We can improve intermediate result - * pruning by deciding which reference table shard will be accessed earlier + * pruning by deciding which reference table shard will be accessed earlier. */ AppendAllAccessedWorkerNodes(entry, distributedPlan, workerNodeCount); diff --git a/src/test/regress/expected/intermediate_result_pruning.out b/src/test/regress/expected/intermediate_result_pruning.out index 49c9955a4..e499e4893 100644 --- a/src/test/regress/expected/intermediate_result_pruning.out +++ b/src/test/regress/expected/intermediate_result_pruning.out @@ -933,6 +933,105 @@ DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx 0 (1 row) +-- test case for issue #3556 +CREATE TABLE accounts (id text PRIMARY KEY); +DEBUG: building index "pg_toast_xxxxx_index" on table "pg_toast_xxxxx" serially +DEBUG: CREATE TABLE / PRIMARY KEY will create implicit index "accounts_pkey" for table "accounts" +DEBUG: building index "accounts_pkey" on table "accounts" serially +CREATE TABLE stats (account_id text PRIMARY KEY, spent int); +DEBUG: building index "pg_toast_xxxxx_index" on table "pg_toast_xxxxx" serially +DEBUG: CREATE TABLE / PRIMARY KEY will create implicit index "stats_pkey" for table "stats" +DEBUG: building index "stats_pkey" on table "stats" serially +SELECT create_distributed_table('accounts', 'id', colocate_with => 'none'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SELECT create_distributed_table('stats', 'account_id', colocate_with => 'accounts'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO accounts (id) VALUES ('foo'); +INSERT INTO stats (account_id, spent) VALUES ('foo', 100); +SELECT * +FROM +( + WITH accounts_cte AS ( + SELECT id AS account_id + FROM accounts + ), + joined_stats_cte_1 AS ( + SELECT spent, account_id + FROM stats + INNER JOIN accounts_cte USING (account_id) + ), + joined_stats_cte_2 AS ( + SELECT spent, account_id + FROM joined_stats_cte_1 + INNER JOIN accounts_cte USING (account_id) + ) + SELECT SUM(spent) OVER (PARTITION BY coalesce(account_id, NULL)) + FROM accounts_cte + INNER JOIN joined_stats_cte_2 USING (account_id) +) inner_query; +DEBUG: generating subplan XXX_1 for CTE accounts_cte: SELECT id AS account_id FROM intermediate_result_pruning.accounts +DEBUG: generating subplan XXX_2 for CTE joined_stats_cte_1: SELECT stats.spent, stats.account_id FROM (intermediate_result_pruning.stats JOIN (SELECT intermediate_result.account_id FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(account_id text)) accounts_cte USING (account_id)) +DEBUG: generating subplan XXX_3 for CTE joined_stats_cte_2: SELECT joined_stats_cte_1.spent, joined_stats_cte_1.account_id FROM ((SELECT intermediate_result.spent, intermediate_result.account_id FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(spent integer, account_id text)) joined_stats_cte_1 JOIN (SELECT intermediate_result.account_id FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(account_id text)) accounts_cte USING (account_id)) +DEBUG: generating subplan XXX_4 for subquery SELECT sum(joined_stats_cte_2.spent) OVER (PARTITION BY COALESCE(accounts_cte.account_id, NULL::text)) AS sum FROM ((SELECT intermediate_result.account_id FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(account_id text)) accounts_cte JOIN (SELECT intermediate_result.spent, intermediate_result.account_id FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(spent integer, account_id text)) joined_stats_cte_2 USING (account_id)) +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT sum FROM (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_4'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint)) inner_query +DEBUG: Subplan XXX_1 will be written to local file +DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx +DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx +DEBUG: Subplan XXX_2 will be written to local file +DEBUG: Subplan XXX_3 will be written to local file +DEBUG: Subplan XXX_4 will be written to local file + sum +--------------------------------------------------------------------- + 100 +(1 row) + +-- confirm that the pruning works well when using round-robin as well +SET citus.task_assignment_policy to 'round-robin'; +SELECT * +FROM +( + WITH accounts_cte AS ( + SELECT id AS account_id + FROM accounts + ), + joined_stats_cte_1 AS ( + SELECT spent, account_id + FROM stats + INNER JOIN accounts_cte USING (account_id) + ), + joined_stats_cte_2 AS ( + SELECT spent, account_id + FROM joined_stats_cte_1 + INNER JOIN accounts_cte USING (account_id) + ) + SELECT SUM(spent) OVER (PARTITION BY coalesce(account_id, NULL)) + FROM accounts_cte + INNER JOIN joined_stats_cte_2 USING (account_id) +) inner_query; +DEBUG: generating subplan XXX_1 for CTE accounts_cte: SELECT id AS account_id FROM intermediate_result_pruning.accounts +DEBUG: generating subplan XXX_2 for CTE joined_stats_cte_1: SELECT stats.spent, stats.account_id FROM (intermediate_result_pruning.stats JOIN (SELECT intermediate_result.account_id FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(account_id text)) accounts_cte USING (account_id)) +DEBUG: generating subplan XXX_3 for CTE joined_stats_cte_2: SELECT joined_stats_cte_1.spent, joined_stats_cte_1.account_id FROM ((SELECT intermediate_result.spent, intermediate_result.account_id FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(spent integer, account_id text)) joined_stats_cte_1 JOIN (SELECT intermediate_result.account_id FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(account_id text)) accounts_cte USING (account_id)) +DEBUG: generating subplan XXX_4 for subquery SELECT sum(joined_stats_cte_2.spent) OVER (PARTITION BY COALESCE(accounts_cte.account_id, NULL::text)) AS sum FROM ((SELECT intermediate_result.account_id FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(account_id text)) accounts_cte JOIN (SELECT intermediate_result.spent, intermediate_result.account_id FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(spent integer, account_id text)) joined_stats_cte_2 USING (account_id)) +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT sum FROM (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_4'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint)) inner_query +DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx +DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx +DEBUG: Subplan XXX_2 will be sent to localhost:xxxxx +DEBUG: Subplan XXX_3 will be sent to localhost:xxxxx +DEBUG: Subplan XXX_4 will be sent to localhost:xxxxx + sum +--------------------------------------------------------------------- + 100 +(1 row) + +SET citus.task_assignment_policy to DEFAULT; SET client_min_messages TO DEFAULT; -DROP TABLE table_1, table_2, table_3, ref_table, range_partitioned; +DROP TABLE table_1, table_2, table_3, ref_table, accounts, stats, range_partitioned; DROP SCHEMA intermediate_result_pruning; diff --git a/src/test/regress/expected/local_shard_execution.out b/src/test/regress/expected/local_shard_execution.out index 22d79b0f7..c604a3c76 100644 --- a/src/test/regress/expected/local_shard_execution.out +++ b/src/test/regress/expected/local_shard_execution.out @@ -61,6 +61,23 @@ SELECT create_distributed_function('get_local_node_id_volatile()'); (1 row) +-- test case for issue #3556 +CREATE TABLE accounts (id text PRIMARY KEY); +CREATE TABLE stats (account_id text PRIMARY KEY, spent int); +SELECT create_distributed_table('accounts', 'id', colocate_with => 'none'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SELECT create_distributed_table('stats', 'account_id', colocate_with => 'accounts'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO accounts (id) VALUES ('foo'); +INSERT INTO stats (account_id, spent) VALUES ('foo', 100); -- connection worker and get ready for the tests \c - - - :worker_1_port SET search_path TO local_shard_execution; @@ -89,6 +106,50 @@ CREATE OR REPLACE FUNCTION shard_of_distribution_column_is_local(dist_key int) R RETURN shard_is_local; END; $$ LANGUAGE plpgsql; +-- test case for issue #3556 +SET citus.log_intermediate_results TO TRUE; +SET client_min_messages TO DEBUG1; +SELECT * +FROM +( + WITH accounts_cte AS ( + SELECT id AS account_id + FROM accounts + ), + joined_stats_cte_1 AS ( + SELECT spent, account_id + FROM stats + INNER JOIN accounts_cte USING (account_id) + ), + joined_stats_cte_2 AS ( + SELECT spent, account_id + FROM joined_stats_cte_1 + INNER JOIN accounts_cte USING (account_id) + ) + SELECT SUM(spent) OVER (PARTITION BY coalesce(account_id, NULL)) + FROM accounts_cte + INNER JOIN joined_stats_cte_2 USING (account_id) +) inner_query; +DEBUG: CTE joined_stats_cte_1 is going to be inlined via distributed planning +DEBUG: CTE joined_stats_cte_2 is going to be inlined via distributed planning +DEBUG: generating subplan XXX_1 for CTE accounts_cte: SELECT id AS account_id FROM local_shard_execution.accounts +DEBUG: generating subplan XXX_1 for CTE accounts_cte: SELECT id AS account_id FROM local_shard_execution.accounts +DEBUG: generating subplan XXX_2 for CTE joined_stats_cte_1: SELECT stats.spent, stats.account_id FROM (local_shard_execution.stats JOIN (SELECT intermediate_result.account_id FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(account_id text)) accounts_cte USING (account_id)) +DEBUG: generating subplan XXX_3 for CTE joined_stats_cte_2: SELECT joined_stats_cte_1.spent, joined_stats_cte_1.account_id FROM ((SELECT intermediate_result.spent, intermediate_result.account_id FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(spent integer, account_id text)) joined_stats_cte_1 JOIN (SELECT intermediate_result.account_id FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(account_id text)) accounts_cte USING (account_id)) +DEBUG: generating subplan XXX_4 for subquery SELECT sum(joined_stats_cte_2.spent) OVER (PARTITION BY COALESCE(accounts_cte.account_id, NULL::text)) AS sum FROM ((SELECT intermediate_result.account_id FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(account_id text)) accounts_cte JOIN (SELECT intermediate_result.spent, intermediate_result.account_id FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(spent integer, account_id text)) joined_stats_cte_2 USING (account_id)) +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT sum FROM (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_4'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint)) inner_query +DEBUG: Subplan XXX_1 will be written to local file +DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx +DEBUG: Subplan XXX_2 will be written to local file +DEBUG: Subplan XXX_3 will be written to local file +DEBUG: Subplan XXX_4 will be written to local file + sum +--------------------------------------------------------------------- + 100 +(1 row) + +SET citus.log_intermediate_results TO DEFAULT; +SET client_min_messages TO DEFAULT; -- pick some example values that reside on the shards locally and remote -- distribution key values of 1,6, 500 and 701 are LOCAL to shards, -- we'll use these values in the tests @@ -660,7 +721,7 @@ CREATE OR REPLACE PROCEDURE only_local_execution_with_function_evaluation() AS $ IF nodeId <= 0 THEN RAISE NOTICE 'unexpected node id'; END IF; - + -- regular router SELECT get_local_node_id_volatile() INTO nodeId FROM distributed_table d1 JOIN distributed_table d2 USING (key) WHERE d1.key = 1; IF nodeId <= 0 THEN @@ -701,7 +762,7 @@ CREATE OR REPLACE PROCEDURE only_local_execution_with_function_evaluation_param( IF nodeId <= 0 THEN RAISE NOTICE 'unexpected node id'; END IF; - + -- regular router SELECT get_local_node_id_volatile() INTO nodeId FROM distributed_table d1 JOIN distributed_table d2 USING (key) WHERE d1.key = $1; IF nodeId <= 0 THEN diff --git a/src/test/regress/expected/multi_read_from_secondaries.out b/src/test/regress/expected/multi_read_from_secondaries.out index cbab871e6..a7bc7c9a5 100644 --- a/src/test/regress/expected/multi_read_from_secondaries.out +++ b/src/test/regress/expected/multi_read_from_secondaries.out @@ -21,6 +21,7 @@ SELECT create_distributed_table('source_table', 'a'); INSERT INTO dest_table (a, b) VALUES (1, 1); INSERT INTO dest_table (a, b) VALUES (2, 1); +INSERT INTO source_table (a, b) VALUES (1, 5); INSERT INTO source_table (a, b) VALUES (10, 10); -- simluate actually having secondary nodes SELECT nodeid, groupid, nodename, nodeport, noderack, isactive, noderole, nodecluster FROM pg_dist_node; @@ -73,9 +74,95 @@ DEBUG: generating subplan XXX_1 for subquery SELECT a FROM (SELECT DISTINCT des DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT a FROM (SELECT intermediate_result.a FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer)) foo ORDER BY a a --------------------------------------------------------------------- -(0 rows) + 1 +(1 row) +-- intermediate result pruning should still work +SET citus.log_intermediate_results TO TRUE; +SELECT * +FROM +( + WITH dest_table_cte AS ( + SELECT a + FROM dest_table + ), + joined_source_table_cte_1 AS ( + SELECT b, a + FROM source_table + INNER JOIN dest_table_cte USING (a) + ), + joined_source_table_cte_2 AS ( + SELECT b, a + FROM joined_source_table_cte_1 + INNER JOIN dest_table_cte USING (a) + ) + SELECT SUM(b) OVER (PARTITION BY coalesce(a, NULL)) + FROM dest_table_cte + INNER JOIN joined_source_table_cte_2 USING (a) +) inner_query; +DEBUG: CTE joined_source_table_cte_1 is going to be inlined via distributed planning +DEBUG: CTE joined_source_table_cte_2 is going to be inlined via distributed planning +DEBUG: generating subplan XXX_1 for CTE dest_table_cte: SELECT a FROM public.dest_table +DEBUG: generating subplan XXX_1 for CTE dest_table_cte: SELECT a FROM public.dest_table +DEBUG: generating subplan XXX_2 for CTE joined_source_table_cte_1: SELECT source_table.b, source_table.a FROM (public.source_table JOIN (SELECT intermediate_result.a FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer)) dest_table_cte USING (a)) +DEBUG: generating subplan XXX_3 for CTE joined_source_table_cte_2: SELECT joined_source_table_cte_1.b, joined_source_table_cte_1.a FROM ((SELECT intermediate_result.b, intermediate_result.a FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(b integer, a integer)) joined_source_table_cte_1 JOIN (SELECT intermediate_result.a FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer)) dest_table_cte USING (a)) +DEBUG: generating subplan XXX_4 for subquery SELECT sum(joined_source_table_cte_2.b) OVER (PARTITION BY COALESCE(dest_table_cte.a, NULL::integer)) AS sum FROM ((SELECT intermediate_result.a FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer)) dest_table_cte JOIN (SELECT intermediate_result.b, intermediate_result.a FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(b integer, a integer)) joined_source_table_cte_2 USING (a)) +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT sum FROM (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_4'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint)) inner_query +DEBUG: Subplan XXX_1 will be written to local file +DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx +DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx +DEBUG: Subplan XXX_2 will be written to local file +DEBUG: Subplan XXX_3 will be written to local file +DEBUG: Subplan XXX_4 will be written to local file + sum +--------------------------------------------------------------------- + 5 +(1 row) + +-- confirm that the pruning works well when using round-robin as well +SET citus.task_assignment_policy to 'round-robin'; +SELECT * +FROM +( + WITH dest_table_cte AS ( + SELECT a + FROM dest_table + ), + joined_source_table_cte_1 AS ( + SELECT b, a + FROM source_table + INNER JOIN dest_table_cte USING (a) + ), + joined_source_table_cte_2 AS ( + SELECT b, a + FROM joined_source_table_cte_1 + INNER JOIN dest_table_cte USING (a) + ) + SELECT SUM(b) OVER (PARTITION BY coalesce(a, NULL)) + FROM dest_table_cte + INNER JOIN joined_source_table_cte_2 USING (a) +) inner_query; +DEBUG: CTE joined_source_table_cte_1 is going to be inlined via distributed planning +DEBUG: CTE joined_source_table_cte_2 is going to be inlined via distributed planning +DEBUG: generating subplan XXX_1 for CTE dest_table_cte: SELECT a FROM public.dest_table +DEBUG: generating subplan XXX_1 for CTE dest_table_cte: SELECT a FROM public.dest_table +DEBUG: generating subplan XXX_2 for CTE joined_source_table_cte_1: SELECT source_table.b, source_table.a FROM (public.source_table JOIN (SELECT intermediate_result.a FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer)) dest_table_cte USING (a)) +DEBUG: generating subplan XXX_3 for CTE joined_source_table_cte_2: SELECT joined_source_table_cte_1.b, joined_source_table_cte_1.a FROM ((SELECT intermediate_result.b, intermediate_result.a FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(b integer, a integer)) joined_source_table_cte_1 JOIN (SELECT intermediate_result.a FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer)) dest_table_cte USING (a)) +DEBUG: generating subplan XXX_4 for subquery SELECT sum(joined_source_table_cte_2.b) OVER (PARTITION BY COALESCE(dest_table_cte.a, NULL::integer)) AS sum FROM ((SELECT intermediate_result.a FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer)) dest_table_cte JOIN (SELECT intermediate_result.b, intermediate_result.a FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(b integer, a integer)) joined_source_table_cte_2 USING (a)) +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT sum FROM (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_4'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint)) inner_query +DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx +DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx +DEBUG: Subplan XXX_2 will be sent to localhost:xxxxx +DEBUG: Subplan XXX_3 will be sent to localhost:xxxxx +DEBUG: Subplan XXX_4 will be sent to localhost:xxxxx + sum +--------------------------------------------------------------------- + 5 +(1 row) + +SET citus.task_assignment_policy to DEFAULT; SET client_min_messages TO DEFAULT; +SET citus.log_intermediate_results TO DEFAULT; -- insert into is definitely not allowed INSERT INTO dest_table (a, b) SELECT a, b FROM source_table; @@ -83,4 +170,4 @@ ERROR: writing to worker nodes is not currently allowed DETAIL: citus.use_secondary_nodes is set to 'always' \c "dbname=regression options='-c\ citus.use_secondary_nodes=never'" UPDATE pg_dist_node SET noderole = 'primary'; -DROP TABLE dest_table; +DROP TABLE source_table, dest_table; diff --git a/src/test/regress/sql/intermediate_result_pruning.sql b/src/test/regress/sql/intermediate_result_pruning.sql index 3300f113f..a0fe32145 100644 --- a/src/test/regress/sql/intermediate_result_pruning.sql +++ b/src/test/regress/sql/intermediate_result_pruning.sql @@ -551,7 +551,65 @@ WHERE range_column IN ('A', 'E') AND range_partitioned.data IN (SELECT data FROM some_data); + +-- test case for issue #3556 +CREATE TABLE accounts (id text PRIMARY KEY); +CREATE TABLE stats (account_id text PRIMARY KEY, spent int); + +SELECT create_distributed_table('accounts', 'id', colocate_with => 'none'); +SELECT create_distributed_table('stats', 'account_id', colocate_with => 'accounts'); + +INSERT INTO accounts (id) VALUES ('foo'); +INSERT INTO stats (account_id, spent) VALUES ('foo', 100); + +SELECT * +FROM +( + WITH accounts_cte AS ( + SELECT id AS account_id + FROM accounts + ), + joined_stats_cte_1 AS ( + SELECT spent, account_id + FROM stats + INNER JOIN accounts_cte USING (account_id) + ), + joined_stats_cte_2 AS ( + SELECT spent, account_id + FROM joined_stats_cte_1 + INNER JOIN accounts_cte USING (account_id) + ) + SELECT SUM(spent) OVER (PARTITION BY coalesce(account_id, NULL)) + FROM accounts_cte + INNER JOIN joined_stats_cte_2 USING (account_id) +) inner_query; + +-- confirm that the pruning works well when using round-robin as well +SET citus.task_assignment_policy to 'round-robin'; +SELECT * +FROM +( + WITH accounts_cte AS ( + SELECT id AS account_id + FROM accounts + ), + joined_stats_cte_1 AS ( + SELECT spent, account_id + FROM stats + INNER JOIN accounts_cte USING (account_id) + ), + joined_stats_cte_2 AS ( + SELECT spent, account_id + FROM joined_stats_cte_1 + INNER JOIN accounts_cte USING (account_id) + ) + SELECT SUM(spent) OVER (PARTITION BY coalesce(account_id, NULL)) + FROM accounts_cte + INNER JOIN joined_stats_cte_2 USING (account_id) +) inner_query; + +SET citus.task_assignment_policy to DEFAULT; SET client_min_messages TO DEFAULT; -DROP TABLE table_1, table_2, table_3, ref_table, range_partitioned; +DROP TABLE table_1, table_2, table_3, ref_table, accounts, stats, range_partitioned; DROP SCHEMA intermediate_result_pruning; diff --git a/src/test/regress/sql/local_shard_execution.sql b/src/test/regress/sql/local_shard_execution.sql index 50fb07b06..78505463c 100644 --- a/src/test/regress/sql/local_shard_execution.sql +++ b/src/test/regress/sql/local_shard_execution.sql @@ -46,6 +46,16 @@ BEGIN END; $$ language plpgsql VOLATILE; SELECT create_distributed_function('get_local_node_id_volatile()'); +-- test case for issue #3556 +CREATE TABLE accounts (id text PRIMARY KEY); +CREATE TABLE stats (account_id text PRIMARY KEY, spent int); + +SELECT create_distributed_table('accounts', 'id', colocate_with => 'none'); +SELECT create_distributed_table('stats', 'account_id', colocate_with => 'accounts'); + +INSERT INTO accounts (id) VALUES ('foo'); +INSERT INTO stats (account_id, spent) VALUES ('foo', 100); + -- connection worker and get ready for the tests \c - - - :worker_1_port SET search_path TO local_shard_execution; @@ -76,6 +86,35 @@ CREATE OR REPLACE FUNCTION shard_of_distribution_column_is_local(dist_key int) R END; $$ LANGUAGE plpgsql; +-- test case for issue #3556 +SET citus.log_intermediate_results TO TRUE; +SET client_min_messages TO DEBUG1; + +SELECT * +FROM +( + WITH accounts_cte AS ( + SELECT id AS account_id + FROM accounts + ), + joined_stats_cte_1 AS ( + SELECT spent, account_id + FROM stats + INNER JOIN accounts_cte USING (account_id) + ), + joined_stats_cte_2 AS ( + SELECT spent, account_id + FROM joined_stats_cte_1 + INNER JOIN accounts_cte USING (account_id) + ) + SELECT SUM(spent) OVER (PARTITION BY coalesce(account_id, NULL)) + FROM accounts_cte + INNER JOIN joined_stats_cte_2 USING (account_id) +) inner_query; + +SET citus.log_intermediate_results TO DEFAULT; +SET client_min_messages TO DEFAULT; + -- pick some example values that reside on the shards locally and remote -- distribution key values of 1,6, 500 and 701 are LOCAL to shards, diff --git a/src/test/regress/sql/multi_read_from_secondaries.sql b/src/test/regress/sql/multi_read_from_secondaries.sql index 641ef9976..3f530561d 100644 --- a/src/test/regress/sql/multi_read_from_secondaries.sql +++ b/src/test/regress/sql/multi_read_from_secondaries.sql @@ -16,6 +16,7 @@ SELECT create_distributed_table('source_table', 'a'); INSERT INTO dest_table (a, b) VALUES (1, 1); INSERT INTO dest_table (a, b) VALUES (2, 1); +INSERT INTO source_table (a, b) VALUES (1, 5); INSERT INTO source_table (a, b) VALUES (10, 10); -- simluate actually having secondary nodes @@ -49,7 +50,59 @@ FROM dest_table.b IN (1,2,3,4) ) SELECT * FROM cte ORDER BY 1 DESC LIMIT 5 ) as foo ORDER BY 1; + +-- intermediate result pruning should still work +SET citus.log_intermediate_results TO TRUE; + +SELECT * +FROM +( + WITH dest_table_cte AS ( + SELECT a + FROM dest_table + ), + joined_source_table_cte_1 AS ( + SELECT b, a + FROM source_table + INNER JOIN dest_table_cte USING (a) + ), + joined_source_table_cte_2 AS ( + SELECT b, a + FROM joined_source_table_cte_1 + INNER JOIN dest_table_cte USING (a) + ) + SELECT SUM(b) OVER (PARTITION BY coalesce(a, NULL)) + FROM dest_table_cte + INNER JOIN joined_source_table_cte_2 USING (a) +) inner_query; + +-- confirm that the pruning works well when using round-robin as well +SET citus.task_assignment_policy to 'round-robin'; +SELECT * +FROM +( + WITH dest_table_cte AS ( + SELECT a + FROM dest_table + ), + joined_source_table_cte_1 AS ( + SELECT b, a + FROM source_table + INNER JOIN dest_table_cte USING (a) + ), + joined_source_table_cte_2 AS ( + SELECT b, a + FROM joined_source_table_cte_1 + INNER JOIN dest_table_cte USING (a) + ) + SELECT SUM(b) OVER (PARTITION BY coalesce(a, NULL)) + FROM dest_table_cte + INNER JOIN joined_source_table_cte_2 USING (a) +) inner_query; + +SET citus.task_assignment_policy to DEFAULT; SET client_min_messages TO DEFAULT; +SET citus.log_intermediate_results TO DEFAULT; -- insert into is definitely not allowed INSERT INTO dest_table (a, b) @@ -57,4 +110,4 @@ INSERT INTO dest_table (a, b) \c "dbname=regression options='-c\ citus.use_secondary_nodes=never'" UPDATE pg_dist_node SET noderole = 'primary'; -DROP TABLE dest_table; +DROP TABLE source_table, dest_table;