From 24e60b44a1d0ea95859ad7379b7a6557862f173e Mon Sep 17 00:00:00 2001 From: Sait Talha Nisanci Date: Thu, 14 Jan 2021 12:34:45 +0300 Subject: [PATCH] Consider coordinator in intermediate result optimization It seems that we were not considering the case where coordinator was added to the cluster as a worker in the optimization of intermediate results. This could lead to errors when coordinator was added as a worker. --- .../distributed/planner/distributed_planner.c | 1 - .../planner/intermediate_result_pruning.c | 4 +- .../expected/coordinator_shouldhaveshards.out | 103 +++++++++- .../regress/expected/local_shard_copy.out | 8 +- .../mx_coordinator_shouldhaveshards.out | 185 ++++++++++++++++++ src/test/regress/multi_mx_schedule | 1 + .../sql/coordinator_shouldhaveshards.sql | 44 +++++ src/test/regress/sql/local_shard_copy.sql | 2 +- .../sql/mx_coordinator_shouldhaveshards.sql | 94 +++++++++ 9 files changed, 433 insertions(+), 9 deletions(-) create mode 100644 src/test/regress/expected/mx_coordinator_shouldhaveshards.out create mode 100644 src/test/regress/sql/mx_coordinator_shouldhaveshards.sql diff --git a/src/backend/distributed/planner/distributed_planner.c b/src/backend/distributed/planner/distributed_planner.c index a0c27fc66..7d94939be 100644 --- a/src/backend/distributed/planner/distributed_planner.c +++ b/src/backend/distributed/planner/distributed_planner.c @@ -158,7 +158,6 @@ distributed_planner(Query *parse, } int rteIdCounter = 1; - DistributedPlanningContext planContext = { .query = parse, .cursorOptions = cursorOptions, diff --git a/src/backend/distributed/planner/intermediate_result_pruning.c b/src/backend/distributed/planner/intermediate_result_pruning.c index 1aa7fe7fd..6ec08aad0 100644 --- a/src/backend/distributed/planner/intermediate_result_pruning.c +++ b/src/backend/distributed/planner/intermediate_result_pruning.c @@ -154,7 +154,7 @@ RecordSubplanExecutionsOnNodes(HTAB *intermediateResultsHash, List *usedSubPlanNodeList = distributedPlan->usedSubPlanNodeList; List *subPlanList = distributedPlan->subPlanList; ListCell *subPlanCell = NULL; - int workerNodeCount = ActiveReadableNonCoordinatorNodeCount(); + int workerNodeCount = list_length(ActiveReadableNodeList()); foreach(subPlanCell, usedSubPlanNodeList) { @@ -272,7 +272,7 @@ AppendAllAccessedWorkerNodes(IntermediateResultsHashEntry *entry, static void AppendAllWorkerNodes(IntermediateResultsHashEntry *entry) { - List *workerNodeList = ActiveReadableNonCoordinatorNodeList(); + List *workerNodeList = ActiveReadableNodeList(); WorkerNode *workerNode = NULL; foreach_ptr(workerNode, workerNodeList) diff --git a/src/test/regress/expected/coordinator_shouldhaveshards.out b/src/test/regress/expected/coordinator_shouldhaveshards.out index 319bc0e93..7f8e03834 100644 --- a/src/test/regress/expected/coordinator_shouldhaveshards.out +++ b/src/test/regress/expected/coordinator_shouldhaveshards.out @@ -778,6 +778,107 @@ SELECT table_name, citus_table_type, distribution_column, shard_count FROM publi (1 row) SET client_min_messages TO DEFAULT; +-- issue 4508 table_1 and table_2 are used to test +-- some edge cases around intermediate result pruning +CREATE TABLE table_1 (key int, value text); +SELECT create_distributed_table('table_1', 'key'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +CREATE TABLE table_2 (key int, value text); +SELECT create_distributed_table('table_2', 'key'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO table_1 VALUES (1, '1'), (2, '2'), (3, '3'), (4, '4'); +NOTICE: executing the command locally: INSERT INTO coordinator_shouldhaveshards.table_1_1503102 AS citus_table_alias (key, value) VALUES (1,'1'::text) +NOTICE: executing the command locally: INSERT INTO coordinator_shouldhaveshards.table_1_1503105 AS citus_table_alias (key, value) VALUES (2,'2'::text) +INSERT INTO table_2 VALUES (1, '1'), (2, '2'), (3, '3'), (4, '4'), (5, '5'), (6, '6'); +NOTICE: executing the command locally: INSERT INTO coordinator_shouldhaveshards.table_2_1503106 AS citus_table_alias (key, value) VALUES (1,'1'::text), (5,'5'::text) +NOTICE: executing the command locally: INSERT INTO coordinator_shouldhaveshards.table_2_1503109 AS citus_table_alias (key, value) VALUES (2,'2'::text) +SET citus.log_intermediate_results TO ON; +SET client_min_messages to debug1; +WITH a AS (SELECT * FROM table_1 ORDER BY 1,2 DESC LIMIT 1) +SELECT count(*), +key +FROM a JOIN table_2 USING (key) +GROUP BY key +HAVING (max(table_2.value) >= (SELECT value FROM a)); +DEBUG: generating subplan XXX_1 for CTE a: SELECT key, value FROM coordinator_shouldhaveshards.table_1 ORDER BY key, value DESC LIMIT 1 +DEBUG: push down of limit count: 1 +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count, a.key FROM ((SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a JOIN coordinator_shouldhaveshards.table_2 USING (key)) GROUP BY a.key HAVING (max(table_2.value) OPERATOR(pg_catalog.>=) (SELECT a_1.value FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a_1)) +DEBUG: Subplan XXX_1 will be written to local file +DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx +DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx +NOTICE: executing the command locally: SELECT key, value FROM coordinator_shouldhaveshards.table_1_1503102 table_1 WHERE true ORDER BY key, value DESC LIMIT '1'::bigint +NOTICE: executing the command locally: SELECT key, value FROM coordinator_shouldhaveshards.table_1_1503105 table_1 WHERE true ORDER BY key, value DESC LIMIT '1'::bigint +NOTICE: executing the command locally: SELECT count(*) AS count, worker_column_1 AS key, max(worker_column_2) AS worker_column_3 FROM (SELECT a.key AS worker_column_1, table_2.value AS worker_column_2 FROM ((SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a JOIN coordinator_shouldhaveshards.table_2_1503106 table_2(key, value) USING (key))) worker_subquery GROUP BY worker_column_1 +NOTICE: executing the command locally: SELECT count(*) AS count, worker_column_1 AS key, max(worker_column_2) AS worker_column_3 FROM (SELECT a.key AS worker_column_1, table_2.value AS worker_column_2 FROM ((SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a JOIN coordinator_shouldhaveshards.table_2_1503109 table_2(key, value) USING (key))) worker_subquery GROUP BY worker_column_1 + count | key +--------------------------------------------------------------------- + 1 | 1 +(1 row) + +WITH a AS (SELECT * FROM table_1 ORDER BY 1,2 DESC LIMIT 1) +INSERT INTO table_1 SELECT count(*), +key +FROM a JOIN table_2 USING (key) +GROUP BY key +HAVING (max(table_2.value) >= (SELECT value FROM a)); +DEBUG: Group by list without distribution column is not allowed in distributed INSERT ... SELECT queries +DEBUG: generating subplan XXX_1 for CTE a: SELECT key, value FROM coordinator_shouldhaveshards.table_1 ORDER BY key, value DESC LIMIT 1 +DEBUG: push down of limit count: 1 +DEBUG: generating subplan XXX_2 for subquery SELECT count(*) AS count, a.key FROM ((SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a JOIN coordinator_shouldhaveshards.table_2 USING (key)) GROUP BY a.key HAVING (max(table_2.value) OPERATOR(pg_catalog.>=) (SELECT a_1.value FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a_1)) +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT int4(count) AS key, (key)::text AS value FROM (SELECT intermediate_result.count, intermediate_result.key FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(count bigint, key integer)) citus_insert_select_subquery +DEBUG: Collecting INSERT ... SELECT results on coordinator +DEBUG: Subplan XXX_1 will be written to local file +DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx +DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx +NOTICE: executing the command locally: SELECT key, value FROM coordinator_shouldhaveshards.table_1_1503102 table_1 WHERE true ORDER BY key, value DESC LIMIT '1'::bigint +NOTICE: executing the command locally: SELECT key, value FROM coordinator_shouldhaveshards.table_1_1503105 table_1 WHERE true ORDER BY key, value DESC LIMIT '1'::bigint +DEBUG: Subplan XXX_2 will be written to local file +NOTICE: executing the command locally: SELECT count(*) AS count, worker_column_1 AS key, max(worker_column_2) AS worker_column_3 FROM (SELECT a.key AS worker_column_1, table_2.value AS worker_column_2 FROM ((SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a JOIN coordinator_shouldhaveshards.table_2_1503106 table_2(key, value) USING (key))) worker_subquery GROUP BY worker_column_1 +NOTICE: executing the command locally: SELECT count(*) AS count, worker_column_1 AS key, max(worker_column_2) AS worker_column_3 FROM (SELECT a.key AS worker_column_1, table_2.value AS worker_column_2 FROM ((SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a JOIN coordinator_shouldhaveshards.table_2_1503109 table_2(key, value) USING (key))) worker_subquery GROUP BY worker_column_1 +NOTICE: executing the command locally: SELECT int4(count) AS key, (key)::text AS value FROM (SELECT intermediate_result.count, intermediate_result.key FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(count bigint, key integer)) citus_insert_select_subquery +NOTICE: executing the copy locally for shard xxxxx +WITH stats AS ( + SELECT count(key) m FROM table_1 +), +inserts AS ( + INSERT INTO table_2 + SELECT key, count(*) + FROM table_1 + WHERE key >= (SELECT m FROM stats) + GROUP BY key + HAVING count(*) <= (SELECT m FROM stats) + LIMIT 1 + RETURNING * +) SELECT count(*) FROM inserts; +DEBUG: generating subplan XXX_1 for CTE stats: SELECT count(key) AS m FROM coordinator_shouldhaveshards.table_1 +DEBUG: generating subplan XXX_2 for CTE inserts: INSERT INTO coordinator_shouldhaveshards.table_2 (key, value) SELECT key, count(*) AS count FROM coordinator_shouldhaveshards.table_1 WHERE (key OPERATOR(pg_catalog.>=) (SELECT stats.m FROM (SELECT intermediate_result.m FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(m bigint)) stats)) GROUP BY key HAVING (count(*) OPERATOR(pg_catalog.<=) (SELECT stats.m FROM (SELECT intermediate_result.m FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(m bigint)) stats)) LIMIT 1 RETURNING table_2.key, table_2.value +DEBUG: LIMIT clauses are not allowed in distributed INSERT ... SELECT queries +DEBUG: push down of limit count: 1 +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) inserts +DEBUG: Subplan XXX_1 will be written to local file +DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx +DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx +NOTICE: executing the command locally: SELECT count(key) AS m FROM coordinator_shouldhaveshards.table_1_1503102 table_1 WHERE true +NOTICE: executing the command locally: SELECT count(key) AS m FROM coordinator_shouldhaveshards.table_1_1503105 table_1 WHERE true +DEBUG: Subplan XXX_2 will be written to local file +DEBUG: Collecting INSERT ... SELECT results on coordinator +NOTICE: executing the command locally: SELECT worker_column_1 AS key, (count(*))::text AS value FROM (SELECT table_1.key AS worker_column_1 FROM coordinator_shouldhaveshards.table_1_1503102 table_1 WHERE (table_1.key OPERATOR(pg_catalog.>=) (SELECT stats.m FROM (SELECT intermediate_result.m FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(m bigint)) stats))) worker_subquery GROUP BY worker_column_1 HAVING (count(*) OPERATOR(pg_catalog.<=) (SELECT stats.m FROM (SELECT intermediate_result.m FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(m bigint)) stats)) LIMIT '1'::bigint +NOTICE: executing the command locally: SELECT worker_column_1 AS key, (count(*))::text AS value FROM (SELECT table_1.key AS worker_column_1 FROM coordinator_shouldhaveshards.table_1_1503105 table_1 WHERE (table_1.key OPERATOR(pg_catalog.>=) (SELECT stats.m FROM (SELECT intermediate_result.m FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(m bigint)) stats))) worker_subquery GROUP BY worker_column_1 HAVING (count(*) OPERATOR(pg_catalog.<=) (SELECT stats.m FROM (SELECT intermediate_result.m FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(m bigint)) stats)) LIMIT '1'::bigint +NOTICE: executing the command locally: SELECT count(*) AS count FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) inserts + count +--------------------------------------------------------------------- + 0 +(1 row) + +RESET client_min_messages; \set VERBOSITY terse DROP TABLE ref_table; NOTICE: executing the command locally: DROP TABLE IF EXISTS coordinator_shouldhaveshards.ref_table_xxxxx CASCADE @@ -788,7 +889,7 @@ DROP TABLE ref; NOTICE: executing the command locally: DROP TABLE IF EXISTS coordinator_shouldhaveshards.ref_xxxxx CASCADE DROP TABLE test_append_table; DROP SCHEMA coordinator_shouldhaveshards CASCADE; -NOTICE: drop cascades to 13 other objects +NOTICE: drop cascades to 19 other objects SELECT 1 FROM master_set_node_property('localhost', :master_port, 'shouldhaveshards', false); ?column? --------------------------------------------------------------------- diff --git a/src/test/regress/expected/local_shard_copy.out b/src/test/regress/expected/local_shard_copy.out index 6a29ce673..9d856653c 100644 --- a/src/test/regress/expected/local_shard_copy.out +++ b/src/test/regress/expected/local_shard_copy.out @@ -3,10 +3,10 @@ SET search_path TO local_shard_copy; SET client_min_messages TO DEBUG; SET citus.next_shard_id TO 1570000; SET citus.replicate_reference_tables_on_activate TO off; -SELECT * FROM master_add_node('localhost', :master_port, groupid := 0); - master_add_node +SELECT 1 FROM master_add_node('localhost', :master_port, groupid := 0); + ?column? --------------------------------------------------------------------- - 32 + 1 (1 row) SET citus.shard_count TO 4; @@ -485,7 +485,7 @@ NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shar -- shard creation should be done locally SELECT create_reference_table('ref_table'); -NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1330000, 'local_shard_copy', 'CREATE TABLE local_shard_copy.ref_table (a integer)');SELECT worker_apply_shard_ddl_command (1330000, 'local_shard_copy', 'ALTER TABLE local_shard_copy.ref_table OWNER TO postgres') +NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1330000, 'local_shard_copy', 'CREATE TABLE local_shard_copy.ref_table (a integer) ');SELECT worker_apply_shard_ddl_command (1330000, 'local_shard_copy', 'ALTER TABLE local_shard_copy.ref_table OWNER TO postgres') NOTICE: executing the copy locally for shard xxxxx NOTICE: Copying data from local table... NOTICE: copying the data has completed diff --git a/src/test/regress/expected/mx_coordinator_shouldhaveshards.out b/src/test/regress/expected/mx_coordinator_shouldhaveshards.out new file mode 100644 index 000000000..5b5a87f05 --- /dev/null +++ b/src/test/regress/expected/mx_coordinator_shouldhaveshards.out @@ -0,0 +1,185 @@ +CREATE SCHEMA mx_coordinator_shouldhaveshards; +SET search_path TO mx_coordinator_shouldhaveshards; +SET citus.shard_replication_factor to 1; +SET citus.replication_model TO streaming; +SET client_min_messages TO WARNING; +SELECT 1 FROM master_add_node('localhost', :master_port, groupid => 0); + ?column? +--------------------------------------------------------------------- + 1 +(1 row) + +RESET client_min_messages; +SELECT 1 FROM master_set_node_property('localhost', :master_port, 'shouldhaveshards', true); + ?column? +--------------------------------------------------------------------- + 1 +(1 row) + +-- issue 4508 table_1 and table_2 are used to test some edge cases +-- around intermediate result pruning +CREATE TABLE table_1 (key int, value text); +SELECT create_distributed_table('table_1', 'key', colocate_with := 'none'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +CREATE TABLE table_2 (key int, value text); +SELECT create_distributed_table('table_2', 'key', colocate_with := 'none'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO table_1 VALUES (1, '1'), (2, '2'), (3, '3'), (4, '4'); +INSERT INTO table_2 VALUES (1, '1'), (2, '2'), (3, '3'), (4, '4'), (5, '5'), (6, '6'); +set citus.log_intermediate_results TO ON; +set client_min_messages to debug1; +WITH a AS (SELECT * FROM table_1 ORDER BY 1,2 DESC LIMIT 1) +SELECT count(*), +key +FROM a JOIN table_2 USING (key) +GROUP BY key +HAVING (max(table_2.value) >= (SELECT value FROM a)); +DEBUG: generating subplan XXX_1 for CTE a: SELECT key, value FROM mx_coordinator_shouldhaveshards.table_1 ORDER BY key, value DESC LIMIT 1 +DEBUG: push down of limit count: 1 +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count, a.key FROM ((SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a JOIN mx_coordinator_shouldhaveshards.table_2 USING (key)) GROUP BY a.key HAVING (max(table_2.value) OPERATOR(pg_catalog.>=) (SELECT a_1.value FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a_1)) +DEBUG: Subplan XXX_1 will be written to local file +DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx +DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx + count | key +--------------------------------------------------------------------- + 1 | 1 +(1 row) + +WITH a AS (SELECT * FROM table_1 ORDER BY 1,2 DESC LIMIT 1) +INSERT INTO table_1 SELECT count(*), +key +FROM a JOIN table_2 USING (key) +GROUP BY key +HAVING (max(table_2.value) >= (SELECT value FROM a)); +DEBUG: Group by list without distribution column is not allowed in distributed INSERT ... SELECT queries +DEBUG: generating subplan XXX_1 for CTE a: SELECT key, value FROM mx_coordinator_shouldhaveshards.table_1 ORDER BY key, value DESC LIMIT 1 +DEBUG: push down of limit count: 1 +DEBUG: generating subplan XXX_2 for subquery SELECT count(*) AS count, a.key FROM ((SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a JOIN mx_coordinator_shouldhaveshards.table_2 USING (key)) GROUP BY a.key HAVING (max(table_2.value) OPERATOR(pg_catalog.>=) (SELECT a_1.value FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a_1)) +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT int4(count) AS key, (key)::text AS value FROM (SELECT intermediate_result.count, intermediate_result.key FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(count bigint, key integer)) citus_insert_select_subquery +DEBUG: Collecting INSERT ... SELECT results on coordinator +DEBUG: Subplan XXX_1 will be written to local file +DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx +DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx +DEBUG: Subplan XXX_2 will be written to local file +WITH stats AS ( + SELECT count(key) m FROM table_1 +), +inserts AS ( + INSERT INTO table_2 + SELECT key, count(*) + FROM table_1 + WHERE key >= (SELECT m FROM stats) + GROUP BY key + HAVING count(*) <= (SELECT m FROM stats) + LIMIT 1 + RETURNING * +) SELECT count(*) FROM inserts; +DEBUG: generating subplan XXX_1 for CTE stats: SELECT count(key) AS m FROM mx_coordinator_shouldhaveshards.table_1 +DEBUG: generating subplan XXX_2 for CTE inserts: INSERT INTO mx_coordinator_shouldhaveshards.table_2 (key, value) SELECT key, count(*) AS count FROM mx_coordinator_shouldhaveshards.table_1 WHERE (key OPERATOR(pg_catalog.>=) (SELECT stats.m FROM (SELECT intermediate_result.m FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(m bigint)) stats)) GROUP BY key HAVING (count(*) OPERATOR(pg_catalog.<=) (SELECT stats.m FROM (SELECT intermediate_result.m FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(m bigint)) stats)) LIMIT 1 RETURNING table_2.key, table_2.value +DEBUG: LIMIT clauses are not allowed in distributed INSERT ... SELECT queries +DEBUG: push down of limit count: 1 +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) inserts +DEBUG: Subplan XXX_1 will be written to local file +DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx +DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx +DEBUG: Subplan XXX_2 will be written to local file +DEBUG: Collecting INSERT ... SELECT results on coordinator + count +--------------------------------------------------------------------- + 0 +(1 row) + +\c - - - :worker_1_port +SET search_path TO mx_coordinator_shouldhaveshards; +set citus.log_intermediate_results TO ON; +set client_min_messages to debug1; +WITH a AS (SELECT * FROM table_1 ORDER BY 1,2 DESC LIMIT 1) +SELECT count(*), +key +FROM a JOIN table_2 USING (key) +GROUP BY key +HAVING (max(table_2.value) >= (SELECT value FROM a)); +DEBUG: generating subplan XXX_1 for CTE a: SELECT key, value FROM mx_coordinator_shouldhaveshards.table_1 ORDER BY key, value DESC LIMIT 1 +DEBUG: push down of limit count: 1 +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count, a.key FROM ((SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a JOIN mx_coordinator_shouldhaveshards.table_2 USING (key)) GROUP BY a.key HAVING (max(table_2.value) OPERATOR(pg_catalog.>=) (SELECT a_1.value FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a_1)) +DEBUG: Subplan XXX_1 will be written to local file +DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx +DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx + count | key +--------------------------------------------------------------------- + 1 | 1 +(1 row) + +WITH a AS (SELECT * FROM table_1 ORDER BY 1,2 DESC LIMIT 1) +INSERT INTO table_1 SELECT count(*), +key +FROM a JOIN table_2 USING (key) +GROUP BY key +HAVING (max(table_2.value) >= (SELECT value FROM a)); +DEBUG: Group by list without distribution column is not allowed in distributed INSERT ... SELECT queries +DEBUG: generating subplan XXX_1 for CTE a: SELECT key, value FROM mx_coordinator_shouldhaveshards.table_1 ORDER BY key, value DESC LIMIT 1 +DEBUG: push down of limit count: 1 +DEBUG: generating subplan XXX_2 for subquery SELECT count(*) AS count, a.key FROM ((SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a JOIN mx_coordinator_shouldhaveshards.table_2 USING (key)) GROUP BY a.key HAVING (max(table_2.value) OPERATOR(pg_catalog.>=) (SELECT a_1.value FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a_1)) +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT int4(count) AS key, (key)::text AS value FROM (SELECT intermediate_result.count, intermediate_result.key FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(count bigint, key integer)) citus_insert_select_subquery +DEBUG: Collecting INSERT ... SELECT results on coordinator +DEBUG: Subplan XXX_1 will be written to local file +DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx +DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx +DEBUG: Subplan XXX_2 will be written to local file +WITH stats AS ( + SELECT count(key) m FROM table_1 +), +inserts AS ( + INSERT INTO table_2 + SELECT key, count(*) + FROM table_1 + WHERE key >= (SELECT m FROM stats) + GROUP BY key + HAVING count(*) <= (SELECT m FROM stats) + LIMIT 1 + RETURNING * +) SELECT count(*) FROM inserts; +DEBUG: generating subplan XXX_1 for CTE stats: SELECT count(key) AS m FROM mx_coordinator_shouldhaveshards.table_1 +DEBUG: generating subplan XXX_2 for CTE inserts: INSERT INTO mx_coordinator_shouldhaveshards.table_2 (key, value) SELECT key, count(*) AS count FROM mx_coordinator_shouldhaveshards.table_1 WHERE (key OPERATOR(pg_catalog.>=) (SELECT stats.m FROM (SELECT intermediate_result.m FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(m bigint)) stats)) GROUP BY key HAVING (count(*) OPERATOR(pg_catalog.<=) (SELECT stats.m FROM (SELECT intermediate_result.m FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(m bigint)) stats)) LIMIT 1 RETURNING table_2.key, table_2.value +DEBUG: LIMIT clauses are not allowed in distributed INSERT ... SELECT queries +DEBUG: push down of limit count: 1 +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) inserts +DEBUG: Subplan XXX_1 will be written to local file +DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx +DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx +DEBUG: Subplan XXX_2 will be written to local file +DEBUG: Collecting INSERT ... SELECT results on coordinator + count +--------------------------------------------------------------------- + 0 +(1 row) + +\c - - - :master_port +SELECT 1 FROM master_set_node_property('localhost', :master_port, 'shouldhaveshards', false); + ?column? +--------------------------------------------------------------------- + 1 +(1 row) + +DROP SCHEMA mx_coordinator_shouldhaveshards CASCADE; +NOTICE: drop cascades to 6 other objects +DETAIL: drop cascades to table mx_coordinator_shouldhaveshards.table_1 +drop cascades to table mx_coordinator_shouldhaveshards.table_1_1130052 +drop cascades to table mx_coordinator_shouldhaveshards.table_1_1130055 +drop cascades to table mx_coordinator_shouldhaveshards.table_2 +drop cascades to table mx_coordinator_shouldhaveshards.table_2_1130056 +drop cascades to table mx_coordinator_shouldhaveshards.table_2_1130059 +SELECT master_remove_node('localhost', :master_port); + master_remove_node +--------------------------------------------------------------------- + +(1 row) + diff --git a/src/test/regress/multi_mx_schedule b/src/test/regress/multi_mx_schedule index 5a1f1f7e2..ac5206d4b 100644 --- a/src/test/regress/multi_mx_schedule +++ b/src/test/regress/multi_mx_schedule @@ -26,6 +26,7 @@ test: multi_mx_hide_shard_names test: multi_mx_add_coordinator test: multi_mx_modifications_to_reference_tables test: multi_mx_partitioning +test: mx_coordinator_shouldhaveshards test: multi_mx_copy_data multi_mx_router_planner test: multi_mx_schema_support multi_mx_tpch_query1 multi_mx_tpch_query10 test: multi_mx_tpch_query12 multi_mx_tpch_query14 multi_mx_tpch_query19 diff --git a/src/test/regress/sql/coordinator_shouldhaveshards.sql b/src/test/regress/sql/coordinator_shouldhaveshards.sql index 5362db9c1..7501be265 100644 --- a/src/test/regress/sql/coordinator_shouldhaveshards.sql +++ b/src/test/regress/sql/coordinator_shouldhaveshards.sql @@ -323,6 +323,50 @@ SELECT table_name, citus_table_type, distribution_column, shard_count FROM publi SET client_min_messages TO DEFAULT; +-- issue 4508 table_1 and table_2 are used to test +-- some edge cases around intermediate result pruning +CREATE TABLE table_1 (key int, value text); +SELECT create_distributed_table('table_1', 'key'); + +CREATE TABLE table_2 (key int, value text); +SELECT create_distributed_table('table_2', 'key'); + +INSERT INTO table_1 VALUES (1, '1'), (2, '2'), (3, '3'), (4, '4'); +INSERT INTO table_2 VALUES (1, '1'), (2, '2'), (3, '3'), (4, '4'), (5, '5'), (6, '6'); + +SET citus.log_intermediate_results TO ON; +SET client_min_messages to debug1; +WITH a AS (SELECT * FROM table_1 ORDER BY 1,2 DESC LIMIT 1) +SELECT count(*), +key +FROM a JOIN table_2 USING (key) +GROUP BY key +HAVING (max(table_2.value) >= (SELECT value FROM a)); + +WITH a AS (SELECT * FROM table_1 ORDER BY 1,2 DESC LIMIT 1) +INSERT INTO table_1 SELECT count(*), +key +FROM a JOIN table_2 USING (key) +GROUP BY key +HAVING (max(table_2.value) >= (SELECT value FROM a)); + +WITH stats AS ( + SELECT count(key) m FROM table_1 +), +inserts AS ( + INSERT INTO table_2 + SELECT key, count(*) + FROM table_1 + WHERE key >= (SELECT m FROM stats) + GROUP BY key + HAVING count(*) <= (SELECT m FROM stats) + LIMIT 1 + RETURNING * +) SELECT count(*) FROM inserts; + +RESET client_min_messages; + + \set VERBOSITY terse DROP TABLE ref_table; diff --git a/src/test/regress/sql/local_shard_copy.sql b/src/test/regress/sql/local_shard_copy.sql index ca29bff7f..0f2535c73 100644 --- a/src/test/regress/sql/local_shard_copy.sql +++ b/src/test/regress/sql/local_shard_copy.sql @@ -5,7 +5,7 @@ SET client_min_messages TO DEBUG; SET citus.next_shard_id TO 1570000; SET citus.replicate_reference_tables_on_activate TO off; -SELECT * FROM master_add_node('localhost', :master_port, groupid := 0); +SELECT 1 FROM master_add_node('localhost', :master_port, groupid := 0); SET citus.shard_count TO 4; SET citus.shard_replication_factor TO 1; diff --git a/src/test/regress/sql/mx_coordinator_shouldhaveshards.sql b/src/test/regress/sql/mx_coordinator_shouldhaveshards.sql new file mode 100644 index 000000000..377b6acbe --- /dev/null +++ b/src/test/regress/sql/mx_coordinator_shouldhaveshards.sql @@ -0,0 +1,94 @@ +CREATE SCHEMA mx_coordinator_shouldhaveshards; +SET search_path TO mx_coordinator_shouldhaveshards; + +SET citus.shard_replication_factor to 1; +SET citus.replication_model TO streaming; +SET client_min_messages TO WARNING; +SELECT 1 FROM master_add_node('localhost', :master_port, groupid => 0); +RESET client_min_messages; + +SELECT 1 FROM master_set_node_property('localhost', :master_port, 'shouldhaveshards', true); + +-- issue 4508 table_1 and table_2 are used to test some edge cases +-- around intermediate result pruning +CREATE TABLE table_1 (key int, value text); +SELECT create_distributed_table('table_1', 'key', colocate_with := 'none'); + +CREATE TABLE table_2 (key int, value text); +SELECT create_distributed_table('table_2', 'key', colocate_with := 'none'); + +INSERT INTO table_1 VALUES (1, '1'), (2, '2'), (3, '3'), (4, '4'); +INSERT INTO table_2 VALUES (1, '1'), (2, '2'), (3, '3'), (4, '4'), (5, '5'), (6, '6'); + +set citus.log_intermediate_results TO ON; +set client_min_messages to debug1; + +WITH a AS (SELECT * FROM table_1 ORDER BY 1,2 DESC LIMIT 1) +SELECT count(*), +key +FROM a JOIN table_2 USING (key) +GROUP BY key +HAVING (max(table_2.value) >= (SELECT value FROM a)); + +WITH a AS (SELECT * FROM table_1 ORDER BY 1,2 DESC LIMIT 1) +INSERT INTO table_1 SELECT count(*), +key +FROM a JOIN table_2 USING (key) +GROUP BY key +HAVING (max(table_2.value) >= (SELECT value FROM a)); + +WITH stats AS ( + SELECT count(key) m FROM table_1 +), +inserts AS ( + INSERT INTO table_2 + SELECT key, count(*) + FROM table_1 + WHERE key >= (SELECT m FROM stats) + GROUP BY key + HAVING count(*) <= (SELECT m FROM stats) + LIMIT 1 + RETURNING * +) SELECT count(*) FROM inserts; + +\c - - - :worker_1_port +SET search_path TO mx_coordinator_shouldhaveshards; + +set citus.log_intermediate_results TO ON; +set client_min_messages to debug1; + +WITH a AS (SELECT * FROM table_1 ORDER BY 1,2 DESC LIMIT 1) +SELECT count(*), +key +FROM a JOIN table_2 USING (key) +GROUP BY key +HAVING (max(table_2.value) >= (SELECT value FROM a)); + +WITH a AS (SELECT * FROM table_1 ORDER BY 1,2 DESC LIMIT 1) +INSERT INTO table_1 SELECT count(*), +key +FROM a JOIN table_2 USING (key) +GROUP BY key +HAVING (max(table_2.value) >= (SELECT value FROM a)); + +WITH stats AS ( + SELECT count(key) m FROM table_1 +), +inserts AS ( + INSERT INTO table_2 + SELECT key, count(*) + FROM table_1 + WHERE key >= (SELECT m FROM stats) + GROUP BY key + HAVING count(*) <= (SELECT m FROM stats) + LIMIT 1 + RETURNING * +) SELECT count(*) FROM inserts; + +\c - - - :master_port + +SELECT 1 FROM master_set_node_property('localhost', :master_port, 'shouldhaveshards', false); + +DROP SCHEMA mx_coordinator_shouldhaveshards CASCADE; + +SELECT master_remove_node('localhost', :master_port);