From c0ad44f97588be2bb3b981168bd9eb07704284f0 Mon Sep 17 00:00:00 2001 From: Hanefi Onaldi Date: Tue, 3 Mar 2020 11:36:49 +0300 Subject: [PATCH] Fix early exit bug on intermediate result pruning There are 2 problems with our early exit strategy that this commit fixes: 1- When we decide that a subplan results are sent to all worker nodes, we used to skip traversing the whole distributed plan, instead of skipping only the subplan. 2- We used to consider all available nodes in the cluster (secondaries and inactive nodes as well as active primaries) when deciding on early exit strategy. This resulted in failures to early exit when there are secondaries or inactive nodes. --- .../planner/intermediate_result_pruning.c | 11 +- .../expected/intermediate_result_pruning.out | 101 +++++++++++++++++- .../expected/local_shard_execution.out | 65 ++++++++++- .../expected/multi_read_from_secondaries.out | 91 +++++++++++++++- .../sql/intermediate_result_pruning.sql | 60 ++++++++++- .../regress/sql/local_shard_execution.sql | 39 +++++++ .../sql/multi_read_from_secondaries.sql | 55 +++++++++- 7 files changed, 410 insertions(+), 12 deletions(-) diff --git a/src/backend/distributed/planner/intermediate_result_pruning.c b/src/backend/distributed/planner/intermediate_result_pruning.c index f01595d6b..f5809d59a 100644 --- a/src/backend/distributed/planner/intermediate_result_pruning.c +++ b/src/backend/distributed/planner/intermediate_result_pruning.c @@ -106,7 +106,7 @@ RecordSubplanExecutionsOnNodes(HTAB *intermediateResultsHash, List *usedSubPlanNodeList = distributedPlan->usedSubPlanNodeList; List *subPlanList = distributedPlan->subPlanList; ListCell *subPlanCell = NULL; - int workerNodeCount = GetWorkerNodeCount(); + int workerNodeCount = ActiveReadableWorkerNodeCount(); foreach(subPlanCell, usedSubPlanNodeList) { @@ -124,13 +124,14 @@ RecordSubplanExecutionsOnNodes(HTAB *intermediateResultsHash, } /* - * There is no need to traverse the whole plan if the intermediate result - * will be written to a local file and send to all nodes + * There is no need to traverse the subplan if the intermediate result + * will be written to a local file and sent to all nodes. Note that the + * remaining subplans in the distributed plan should still be traversed. */ if (list_length(entry->nodeIdList) == workerNodeCount && entry->writeLocalFile) { elog(DEBUG4, "Subplan %s is used in all workers", resultId); - break; + continue; } else if (usedPlan->locationMask & SUBPLAN_ACCESS_REMOTE) { @@ -139,7 +140,7 @@ RecordSubplanExecutionsOnNodes(HTAB *intermediateResultsHash, * * If we have reference tables in the distributed plan, all the * workers will be in the node list. We can improve intermediate result - * pruning by deciding which reference table shard will be accessed earlier + * pruning by deciding which reference table shard will be accessed earlier. */ AppendAllAccessedWorkerNodes(entry, distributedPlan, workerNodeCount); diff --git a/src/test/regress/expected/intermediate_result_pruning.out b/src/test/regress/expected/intermediate_result_pruning.out index 49c9955a4..e499e4893 100644 --- a/src/test/regress/expected/intermediate_result_pruning.out +++ b/src/test/regress/expected/intermediate_result_pruning.out @@ -933,6 +933,105 @@ DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx 0 (1 row) +-- test case for issue #3556 +CREATE TABLE accounts (id text PRIMARY KEY); +DEBUG: building index "pg_toast_xxxxx_index" on table "pg_toast_xxxxx" serially +DEBUG: CREATE TABLE / PRIMARY KEY will create implicit index "accounts_pkey" for table "accounts" +DEBUG: building index "accounts_pkey" on table "accounts" serially +CREATE TABLE stats (account_id text PRIMARY KEY, spent int); +DEBUG: building index "pg_toast_xxxxx_index" on table "pg_toast_xxxxx" serially +DEBUG: CREATE TABLE / PRIMARY KEY will create implicit index "stats_pkey" for table "stats" +DEBUG: building index "stats_pkey" on table "stats" serially +SELECT create_distributed_table('accounts', 'id', colocate_with => 'none'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SELECT create_distributed_table('stats', 'account_id', colocate_with => 'accounts'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO accounts (id) VALUES ('foo'); +INSERT INTO stats (account_id, spent) VALUES ('foo', 100); +SELECT * +FROM +( + WITH accounts_cte AS ( + SELECT id AS account_id + FROM accounts + ), + joined_stats_cte_1 AS ( + SELECT spent, account_id + FROM stats + INNER JOIN accounts_cte USING (account_id) + ), + joined_stats_cte_2 AS ( + SELECT spent, account_id + FROM joined_stats_cte_1 + INNER JOIN accounts_cte USING (account_id) + ) + SELECT SUM(spent) OVER (PARTITION BY coalesce(account_id, NULL)) + FROM accounts_cte + INNER JOIN joined_stats_cte_2 USING (account_id) +) inner_query; +DEBUG: generating subplan XXX_1 for CTE accounts_cte: SELECT id AS account_id FROM intermediate_result_pruning.accounts +DEBUG: generating subplan XXX_2 for CTE joined_stats_cte_1: SELECT stats.spent, stats.account_id FROM (intermediate_result_pruning.stats JOIN (SELECT intermediate_result.account_id FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(account_id text)) accounts_cte USING (account_id)) +DEBUG: generating subplan XXX_3 for CTE joined_stats_cte_2: SELECT joined_stats_cte_1.spent, joined_stats_cte_1.account_id FROM ((SELECT intermediate_result.spent, intermediate_result.account_id FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(spent integer, account_id text)) joined_stats_cte_1 JOIN (SELECT intermediate_result.account_id FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(account_id text)) accounts_cte USING (account_id)) +DEBUG: generating subplan XXX_4 for subquery SELECT sum(joined_stats_cte_2.spent) OVER (PARTITION BY COALESCE(accounts_cte.account_id, NULL::text)) AS sum FROM ((SELECT intermediate_result.account_id FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(account_id text)) accounts_cte JOIN (SELECT intermediate_result.spent, intermediate_result.account_id FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(spent integer, account_id text)) joined_stats_cte_2 USING (account_id)) +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT sum FROM (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_4'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint)) inner_query +DEBUG: Subplan XXX_1 will be written to local file +DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx +DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx +DEBUG: Subplan XXX_2 will be written to local file +DEBUG: Subplan XXX_3 will be written to local file +DEBUG: Subplan XXX_4 will be written to local file + sum +--------------------------------------------------------------------- + 100 +(1 row) + +-- confirm that the pruning works well when using round-robin as well +SET citus.task_assignment_policy to 'round-robin'; +SELECT * +FROM +( + WITH accounts_cte AS ( + SELECT id AS account_id + FROM accounts + ), + joined_stats_cte_1 AS ( + SELECT spent, account_id + FROM stats + INNER JOIN accounts_cte USING (account_id) + ), + joined_stats_cte_2 AS ( + SELECT spent, account_id + FROM joined_stats_cte_1 + INNER JOIN accounts_cte USING (account_id) + ) + SELECT SUM(spent) OVER (PARTITION BY coalesce(account_id, NULL)) + FROM accounts_cte + INNER JOIN joined_stats_cte_2 USING (account_id) +) inner_query; +DEBUG: generating subplan XXX_1 for CTE accounts_cte: SELECT id AS account_id FROM intermediate_result_pruning.accounts +DEBUG: generating subplan XXX_2 for CTE joined_stats_cte_1: SELECT stats.spent, stats.account_id FROM (intermediate_result_pruning.stats JOIN (SELECT intermediate_result.account_id FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(account_id text)) accounts_cte USING (account_id)) +DEBUG: generating subplan XXX_3 for CTE joined_stats_cte_2: SELECT joined_stats_cte_1.spent, joined_stats_cte_1.account_id FROM ((SELECT intermediate_result.spent, intermediate_result.account_id FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(spent integer, account_id text)) joined_stats_cte_1 JOIN (SELECT intermediate_result.account_id FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(account_id text)) accounts_cte USING (account_id)) +DEBUG: generating subplan XXX_4 for subquery SELECT sum(joined_stats_cte_2.spent) OVER (PARTITION BY COALESCE(accounts_cte.account_id, NULL::text)) AS sum FROM ((SELECT intermediate_result.account_id FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(account_id text)) accounts_cte JOIN (SELECT intermediate_result.spent, intermediate_result.account_id FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(spent integer, account_id text)) joined_stats_cte_2 USING (account_id)) +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT sum FROM (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_4'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint)) inner_query +DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx +DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx +DEBUG: Subplan XXX_2 will be sent to localhost:xxxxx +DEBUG: Subplan XXX_3 will be sent to localhost:xxxxx +DEBUG: Subplan XXX_4 will be sent to localhost:xxxxx + sum +--------------------------------------------------------------------- + 100 +(1 row) + +SET citus.task_assignment_policy to DEFAULT; SET client_min_messages TO DEFAULT; -DROP TABLE table_1, table_2, table_3, ref_table, range_partitioned; +DROP TABLE table_1, table_2, table_3, ref_table, accounts, stats, range_partitioned; DROP SCHEMA intermediate_result_pruning; diff --git a/src/test/regress/expected/local_shard_execution.out b/src/test/regress/expected/local_shard_execution.out index 22d79b0f7..c604a3c76 100644 --- a/src/test/regress/expected/local_shard_execution.out +++ b/src/test/regress/expected/local_shard_execution.out @@ -61,6 +61,23 @@ SELECT create_distributed_function('get_local_node_id_volatile()'); (1 row) +-- test case for issue #3556 +CREATE TABLE accounts (id text PRIMARY KEY); +CREATE TABLE stats (account_id text PRIMARY KEY, spent int); +SELECT create_distributed_table('accounts', 'id', colocate_with => 'none'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SELECT create_distributed_table('stats', 'account_id', colocate_with => 'accounts'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO accounts (id) VALUES ('foo'); +INSERT INTO stats (account_id, spent) VALUES ('foo', 100); -- connection worker and get ready for the tests \c - - - :worker_1_port SET search_path TO local_shard_execution; @@ -89,6 +106,50 @@ CREATE OR REPLACE FUNCTION shard_of_distribution_column_is_local(dist_key int) R RETURN shard_is_local; END; $$ LANGUAGE plpgsql; +-- test case for issue #3556 +SET citus.log_intermediate_results TO TRUE; +SET client_min_messages TO DEBUG1; +SELECT * +FROM +( + WITH accounts_cte AS ( + SELECT id AS account_id + FROM accounts + ), + joined_stats_cte_1 AS ( + SELECT spent, account_id + FROM stats + INNER JOIN accounts_cte USING (account_id) + ), + joined_stats_cte_2 AS ( + SELECT spent, account_id + FROM joined_stats_cte_1 + INNER JOIN accounts_cte USING (account_id) + ) + SELECT SUM(spent) OVER (PARTITION BY coalesce(account_id, NULL)) + FROM accounts_cte + INNER JOIN joined_stats_cte_2 USING (account_id) +) inner_query; +DEBUG: CTE joined_stats_cte_1 is going to be inlined via distributed planning +DEBUG: CTE joined_stats_cte_2 is going to be inlined via distributed planning +DEBUG: generating subplan XXX_1 for CTE accounts_cte: SELECT id AS account_id FROM local_shard_execution.accounts +DEBUG: generating subplan XXX_1 for CTE accounts_cte: SELECT id AS account_id FROM local_shard_execution.accounts +DEBUG: generating subplan XXX_2 for CTE joined_stats_cte_1: SELECT stats.spent, stats.account_id FROM (local_shard_execution.stats JOIN (SELECT intermediate_result.account_id FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(account_id text)) accounts_cte USING (account_id)) +DEBUG: generating subplan XXX_3 for CTE joined_stats_cte_2: SELECT joined_stats_cte_1.spent, joined_stats_cte_1.account_id FROM ((SELECT intermediate_result.spent, intermediate_result.account_id FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(spent integer, account_id text)) joined_stats_cte_1 JOIN (SELECT intermediate_result.account_id FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(account_id text)) accounts_cte USING (account_id)) +DEBUG: generating subplan XXX_4 for subquery SELECT sum(joined_stats_cte_2.spent) OVER (PARTITION BY COALESCE(accounts_cte.account_id, NULL::text)) AS sum FROM ((SELECT intermediate_result.account_id FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(account_id text)) accounts_cte JOIN (SELECT intermediate_result.spent, intermediate_result.account_id FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(spent integer, account_id text)) joined_stats_cte_2 USING (account_id)) +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT sum FROM (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_4'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint)) inner_query +DEBUG: Subplan XXX_1 will be written to local file +DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx +DEBUG: Subplan XXX_2 will be written to local file +DEBUG: Subplan XXX_3 will be written to local file +DEBUG: Subplan XXX_4 will be written to local file + sum +--------------------------------------------------------------------- + 100 +(1 row) + +SET citus.log_intermediate_results TO DEFAULT; +SET client_min_messages TO DEFAULT; -- pick some example values that reside on the shards locally and remote -- distribution key values of 1,6, 500 and 701 are LOCAL to shards, -- we'll use these values in the tests @@ -660,7 +721,7 @@ CREATE OR REPLACE PROCEDURE only_local_execution_with_function_evaluation() AS $ IF nodeId <= 0 THEN RAISE NOTICE 'unexpected node id'; END IF; - + -- regular router SELECT get_local_node_id_volatile() INTO nodeId FROM distributed_table d1 JOIN distributed_table d2 USING (key) WHERE d1.key = 1; IF nodeId <= 0 THEN @@ -701,7 +762,7 @@ CREATE OR REPLACE PROCEDURE only_local_execution_with_function_evaluation_param( IF nodeId <= 0 THEN RAISE NOTICE 'unexpected node id'; END IF; - + -- regular router SELECT get_local_node_id_volatile() INTO nodeId FROM distributed_table d1 JOIN distributed_table d2 USING (key) WHERE d1.key = $1; IF nodeId <= 0 THEN diff --git a/src/test/regress/expected/multi_read_from_secondaries.out b/src/test/regress/expected/multi_read_from_secondaries.out index cbab871e6..a7bc7c9a5 100644 --- a/src/test/regress/expected/multi_read_from_secondaries.out +++ b/src/test/regress/expected/multi_read_from_secondaries.out @@ -21,6 +21,7 @@ SELECT create_distributed_table('source_table', 'a'); INSERT INTO dest_table (a, b) VALUES (1, 1); INSERT INTO dest_table (a, b) VALUES (2, 1); +INSERT INTO source_table (a, b) VALUES (1, 5); INSERT INTO source_table (a, b) VALUES (10, 10); -- simluate actually having secondary nodes SELECT nodeid, groupid, nodename, nodeport, noderack, isactive, noderole, nodecluster FROM pg_dist_node; @@ -73,9 +74,95 @@ DEBUG: generating subplan XXX_1 for subquery SELECT a FROM (SELECT DISTINCT des DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT a FROM (SELECT intermediate_result.a FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer)) foo ORDER BY a a --------------------------------------------------------------------- -(0 rows) + 1 +(1 row) +-- intermediate result pruning should still work +SET citus.log_intermediate_results TO TRUE; +SELECT * +FROM +( + WITH dest_table_cte AS ( + SELECT a + FROM dest_table + ), + joined_source_table_cte_1 AS ( + SELECT b, a + FROM source_table + INNER JOIN dest_table_cte USING (a) + ), + joined_source_table_cte_2 AS ( + SELECT b, a + FROM joined_source_table_cte_1 + INNER JOIN dest_table_cte USING (a) + ) + SELECT SUM(b) OVER (PARTITION BY coalesce(a, NULL)) + FROM dest_table_cte + INNER JOIN joined_source_table_cte_2 USING (a) +) inner_query; +DEBUG: CTE joined_source_table_cte_1 is going to be inlined via distributed planning +DEBUG: CTE joined_source_table_cte_2 is going to be inlined via distributed planning +DEBUG: generating subplan XXX_1 for CTE dest_table_cte: SELECT a FROM public.dest_table +DEBUG: generating subplan XXX_1 for CTE dest_table_cte: SELECT a FROM public.dest_table +DEBUG: generating subplan XXX_2 for CTE joined_source_table_cte_1: SELECT source_table.b, source_table.a FROM (public.source_table JOIN (SELECT intermediate_result.a FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer)) dest_table_cte USING (a)) +DEBUG: generating subplan XXX_3 for CTE joined_source_table_cte_2: SELECT joined_source_table_cte_1.b, joined_source_table_cte_1.a FROM ((SELECT intermediate_result.b, intermediate_result.a FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(b integer, a integer)) joined_source_table_cte_1 JOIN (SELECT intermediate_result.a FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer)) dest_table_cte USING (a)) +DEBUG: generating subplan XXX_4 for subquery SELECT sum(joined_source_table_cte_2.b) OVER (PARTITION BY COALESCE(dest_table_cte.a, NULL::integer)) AS sum FROM ((SELECT intermediate_result.a FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer)) dest_table_cte JOIN (SELECT intermediate_result.b, intermediate_result.a FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(b integer, a integer)) joined_source_table_cte_2 USING (a)) +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT sum FROM (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_4'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint)) inner_query +DEBUG: Subplan XXX_1 will be written to local file +DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx +DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx +DEBUG: Subplan XXX_2 will be written to local file +DEBUG: Subplan XXX_3 will be written to local file +DEBUG: Subplan XXX_4 will be written to local file + sum +--------------------------------------------------------------------- + 5 +(1 row) + +-- confirm that the pruning works well when using round-robin as well +SET citus.task_assignment_policy to 'round-robin'; +SELECT * +FROM +( + WITH dest_table_cte AS ( + SELECT a + FROM dest_table + ), + joined_source_table_cte_1 AS ( + SELECT b, a + FROM source_table + INNER JOIN dest_table_cte USING (a) + ), + joined_source_table_cte_2 AS ( + SELECT b, a + FROM joined_source_table_cte_1 + INNER JOIN dest_table_cte USING (a) + ) + SELECT SUM(b) OVER (PARTITION BY coalesce(a, NULL)) + FROM dest_table_cte + INNER JOIN joined_source_table_cte_2 USING (a) +) inner_query; +DEBUG: CTE joined_source_table_cte_1 is going to be inlined via distributed planning +DEBUG: CTE joined_source_table_cte_2 is going to be inlined via distributed planning +DEBUG: generating subplan XXX_1 for CTE dest_table_cte: SELECT a FROM public.dest_table +DEBUG: generating subplan XXX_1 for CTE dest_table_cte: SELECT a FROM public.dest_table +DEBUG: generating subplan XXX_2 for CTE joined_source_table_cte_1: SELECT source_table.b, source_table.a FROM (public.source_table JOIN (SELECT intermediate_result.a FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer)) dest_table_cte USING (a)) +DEBUG: generating subplan XXX_3 for CTE joined_source_table_cte_2: SELECT joined_source_table_cte_1.b, joined_source_table_cte_1.a FROM ((SELECT intermediate_result.b, intermediate_result.a FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(b integer, a integer)) joined_source_table_cte_1 JOIN (SELECT intermediate_result.a FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer)) dest_table_cte USING (a)) +DEBUG: generating subplan XXX_4 for subquery SELECT sum(joined_source_table_cte_2.b) OVER (PARTITION BY COALESCE(dest_table_cte.a, NULL::integer)) AS sum FROM ((SELECT intermediate_result.a FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer)) dest_table_cte JOIN (SELECT intermediate_result.b, intermediate_result.a FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(b integer, a integer)) joined_source_table_cte_2 USING (a)) +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT sum FROM (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_4'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint)) inner_query +DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx +DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx +DEBUG: Subplan XXX_2 will be sent to localhost:xxxxx +DEBUG: Subplan XXX_3 will be sent to localhost:xxxxx +DEBUG: Subplan XXX_4 will be sent to localhost:xxxxx + sum +--------------------------------------------------------------------- + 5 +(1 row) + +SET citus.task_assignment_policy to DEFAULT; SET client_min_messages TO DEFAULT; +SET citus.log_intermediate_results TO DEFAULT; -- insert into is definitely not allowed INSERT INTO dest_table (a, b) SELECT a, b FROM source_table; @@ -83,4 +170,4 @@ ERROR: writing to worker nodes is not currently allowed DETAIL: citus.use_secondary_nodes is set to 'always' \c "dbname=regression options='-c\ citus.use_secondary_nodes=never'" UPDATE pg_dist_node SET noderole = 'primary'; -DROP TABLE dest_table; +DROP TABLE source_table, dest_table; diff --git a/src/test/regress/sql/intermediate_result_pruning.sql b/src/test/regress/sql/intermediate_result_pruning.sql index 3300f113f..a0fe32145 100644 --- a/src/test/regress/sql/intermediate_result_pruning.sql +++ b/src/test/regress/sql/intermediate_result_pruning.sql @@ -551,7 +551,65 @@ WHERE range_column IN ('A', 'E') AND range_partitioned.data IN (SELECT data FROM some_data); + +-- test case for issue #3556 +CREATE TABLE accounts (id text PRIMARY KEY); +CREATE TABLE stats (account_id text PRIMARY KEY, spent int); + +SELECT create_distributed_table('accounts', 'id', colocate_with => 'none'); +SELECT create_distributed_table('stats', 'account_id', colocate_with => 'accounts'); + +INSERT INTO accounts (id) VALUES ('foo'); +INSERT INTO stats (account_id, spent) VALUES ('foo', 100); + +SELECT * +FROM +( + WITH accounts_cte AS ( + SELECT id AS account_id + FROM accounts + ), + joined_stats_cte_1 AS ( + SELECT spent, account_id + FROM stats + INNER JOIN accounts_cte USING (account_id) + ), + joined_stats_cte_2 AS ( + SELECT spent, account_id + FROM joined_stats_cte_1 + INNER JOIN accounts_cte USING (account_id) + ) + SELECT SUM(spent) OVER (PARTITION BY coalesce(account_id, NULL)) + FROM accounts_cte + INNER JOIN joined_stats_cte_2 USING (account_id) +) inner_query; + +-- confirm that the pruning works well when using round-robin as well +SET citus.task_assignment_policy to 'round-robin'; +SELECT * +FROM +( + WITH accounts_cte AS ( + SELECT id AS account_id + FROM accounts + ), + joined_stats_cte_1 AS ( + SELECT spent, account_id + FROM stats + INNER JOIN accounts_cte USING (account_id) + ), + joined_stats_cte_2 AS ( + SELECT spent, account_id + FROM joined_stats_cte_1 + INNER JOIN accounts_cte USING (account_id) + ) + SELECT SUM(spent) OVER (PARTITION BY coalesce(account_id, NULL)) + FROM accounts_cte + INNER JOIN joined_stats_cte_2 USING (account_id) +) inner_query; + +SET citus.task_assignment_policy to DEFAULT; SET client_min_messages TO DEFAULT; -DROP TABLE table_1, table_2, table_3, ref_table, range_partitioned; +DROP TABLE table_1, table_2, table_3, ref_table, accounts, stats, range_partitioned; DROP SCHEMA intermediate_result_pruning; diff --git a/src/test/regress/sql/local_shard_execution.sql b/src/test/regress/sql/local_shard_execution.sql index 50fb07b06..78505463c 100644 --- a/src/test/regress/sql/local_shard_execution.sql +++ b/src/test/regress/sql/local_shard_execution.sql @@ -46,6 +46,16 @@ BEGIN END; $$ language plpgsql VOLATILE; SELECT create_distributed_function('get_local_node_id_volatile()'); +-- test case for issue #3556 +CREATE TABLE accounts (id text PRIMARY KEY); +CREATE TABLE stats (account_id text PRIMARY KEY, spent int); + +SELECT create_distributed_table('accounts', 'id', colocate_with => 'none'); +SELECT create_distributed_table('stats', 'account_id', colocate_with => 'accounts'); + +INSERT INTO accounts (id) VALUES ('foo'); +INSERT INTO stats (account_id, spent) VALUES ('foo', 100); + -- connection worker and get ready for the tests \c - - - :worker_1_port SET search_path TO local_shard_execution; @@ -76,6 +86,35 @@ CREATE OR REPLACE FUNCTION shard_of_distribution_column_is_local(dist_key int) R END; $$ LANGUAGE plpgsql; +-- test case for issue #3556 +SET citus.log_intermediate_results TO TRUE; +SET client_min_messages TO DEBUG1; + +SELECT * +FROM +( + WITH accounts_cte AS ( + SELECT id AS account_id + FROM accounts + ), + joined_stats_cte_1 AS ( + SELECT spent, account_id + FROM stats + INNER JOIN accounts_cte USING (account_id) + ), + joined_stats_cte_2 AS ( + SELECT spent, account_id + FROM joined_stats_cte_1 + INNER JOIN accounts_cte USING (account_id) + ) + SELECT SUM(spent) OVER (PARTITION BY coalesce(account_id, NULL)) + FROM accounts_cte + INNER JOIN joined_stats_cte_2 USING (account_id) +) inner_query; + +SET citus.log_intermediate_results TO DEFAULT; +SET client_min_messages TO DEFAULT; + -- pick some example values that reside on the shards locally and remote -- distribution key values of 1,6, 500 and 701 are LOCAL to shards, diff --git a/src/test/regress/sql/multi_read_from_secondaries.sql b/src/test/regress/sql/multi_read_from_secondaries.sql index 641ef9976..3f530561d 100644 --- a/src/test/regress/sql/multi_read_from_secondaries.sql +++ b/src/test/regress/sql/multi_read_from_secondaries.sql @@ -16,6 +16,7 @@ SELECT create_distributed_table('source_table', 'a'); INSERT INTO dest_table (a, b) VALUES (1, 1); INSERT INTO dest_table (a, b) VALUES (2, 1); +INSERT INTO source_table (a, b) VALUES (1, 5); INSERT INTO source_table (a, b) VALUES (10, 10); -- simluate actually having secondary nodes @@ -49,7 +50,59 @@ FROM dest_table.b IN (1,2,3,4) ) SELECT * FROM cte ORDER BY 1 DESC LIMIT 5 ) as foo ORDER BY 1; + +-- intermediate result pruning should still work +SET citus.log_intermediate_results TO TRUE; + +SELECT * +FROM +( + WITH dest_table_cte AS ( + SELECT a + FROM dest_table + ), + joined_source_table_cte_1 AS ( + SELECT b, a + FROM source_table + INNER JOIN dest_table_cte USING (a) + ), + joined_source_table_cte_2 AS ( + SELECT b, a + FROM joined_source_table_cte_1 + INNER JOIN dest_table_cte USING (a) + ) + SELECT SUM(b) OVER (PARTITION BY coalesce(a, NULL)) + FROM dest_table_cte + INNER JOIN joined_source_table_cte_2 USING (a) +) inner_query; + +-- confirm that the pruning works well when using round-robin as well +SET citus.task_assignment_policy to 'round-robin'; +SELECT * +FROM +( + WITH dest_table_cte AS ( + SELECT a + FROM dest_table + ), + joined_source_table_cte_1 AS ( + SELECT b, a + FROM source_table + INNER JOIN dest_table_cte USING (a) + ), + joined_source_table_cte_2 AS ( + SELECT b, a + FROM joined_source_table_cte_1 + INNER JOIN dest_table_cte USING (a) + ) + SELECT SUM(b) OVER (PARTITION BY coalesce(a, NULL)) + FROM dest_table_cte + INNER JOIN joined_source_table_cte_2 USING (a) +) inner_query; + +SET citus.task_assignment_policy to DEFAULT; SET client_min_messages TO DEFAULT; +SET citus.log_intermediate_results TO DEFAULT; -- insert into is definitely not allowed INSERT INTO dest_table (a, b) @@ -57,4 +110,4 @@ INSERT INTO dest_table (a, b) \c "dbname=regression options='-c\ citus.use_secondary_nodes=never'" UPDATE pg_dist_node SET noderole = 'primary'; -DROP TABLE dest_table; +DROP TABLE source_table, dest_table;