From d97d03ec65a01aa1e24a9261116aee48cf1a89df Mon Sep 17 00:00:00 2001 From: Sait Talha Nisanci Date: Wed, 8 Jul 2020 21:55:59 +0300 Subject: [PATCH] use ActivePrimaryNodeList to include coordinator ActiveReadableWorkerNodeList doesn't include coordinator, however if coordinator is added as a worker, we should also include that while planning. The current methods are very easily misusable and this requires a refactoring to make the distinction between methods that include coordinator and that don't very explicit as they can introduce subtle/major bugs pretty easily. --- .../operations/worker_node_manager.c | 1 - .../planner/multi_physical_planner.c | 4 +-- .../expected/coordinator_shouldhaveshards.out | 10 +++--- .../regress/expected/follower_single_node.out | 33 ++++++++++--------- src/test/regress/expected/single_node.out | 15 ++++++--- .../sql/coordinator_shouldhaveshards.sql | 2 ++ src/test/regress/sql/follower_single_node.sql | 15 ++++----- src/test/regress/sql/single_node.sql | 2 +- 8 files changed, 45 insertions(+), 37 deletions(-) diff --git a/src/backend/distributed/operations/worker_node_manager.c b/src/backend/distributed/operations/worker_node_manager.c index 565215f15..8cd99adda 100644 --- a/src/backend/distributed/operations/worker_node_manager.c +++ b/src/backend/distributed/operations/worker_node_manager.c @@ -319,7 +319,6 @@ ActiveReadableNonCoordinatorNodeCount(void) return liveWorkerCount; } - /* * NodeIsCoordinator returns true if the given node represents the coordinator. */ diff --git a/src/backend/distributed/planner/multi_physical_planner.c b/src/backend/distributed/planner/multi_physical_planner.c index 3e389e8a9..df8c4ec9b 100644 --- a/src/backend/distributed/planner/multi_physical_planner.c +++ b/src/backend/distributed/planner/multi_physical_planner.c @@ -2107,7 +2107,7 @@ BuildMapMergeJob(Query *jobQuery, List *dependentJobList, Var *partitionKey, static uint32 HashPartitionCount(void) { - uint32 groupCount = ActiveReadableNonCoordinatorNodeCount(); + uint32 groupCount = list_length(ActiveReadableNodeList()); double maxReduceTasksPerNode = MaxRunningTasksPerNode / 2.0; uint32 partitionCount = (uint32) rint(groupCount * maxReduceTasksPerNode); @@ -5717,7 +5717,7 @@ AssignDualHashTaskList(List *taskList) * if subsequent jobs have a small number of tasks, we won't allocate the * tasks to the same worker repeatedly. */ - List *workerNodeList = ActiveReadableNonCoordinatorNodeList(); + List *workerNodeList = ActiveReadableNodeList(); uint32 workerNodeCount = (uint32) list_length(workerNodeList); uint32 beginningNodeIndex = jobId % workerNodeCount; diff --git a/src/test/regress/expected/coordinator_shouldhaveshards.out b/src/test/regress/expected/coordinator_shouldhaveshards.out index 40f239547..da38c3108 100644 --- a/src/test/regress/expected/coordinator_shouldhaveshards.out +++ b/src/test/regress/expected/coordinator_shouldhaveshards.out @@ -456,6 +456,7 @@ BEGIN; -- copying task INSERT INTO dist_table SELECT a + 1 FROM dist_table; ROLLBACK; +SET citus.shard_replication_factor TO 1; BEGIN; SET citus.shard_replication_factor TO 2; CREATE TABLE dist_table1(a int); @@ -476,11 +477,12 @@ RESET citus.enable_cte_inlining; DELETE FROM test; DROP TABLE test; DROP TABLE dist_table; +DROP TABLE ref; +NOTICE: executing the command locally: DROP TABLE IF EXISTS coordinator_shouldhaveshards.ref_xxxxx CASCADE +CONTEXT: SQL statement "SELECT master_drop_all_shards(v_obj.objid, v_obj.schema_name, v_obj.object_name)" +PL/pgSQL function citus_drop_trigger() line 19 at PERFORM DROP SCHEMA coordinator_shouldhaveshards CASCADE; -NOTICE: drop cascades to 3 other objects -DETAIL: drop cascades to table ref -drop cascades to table ref_1503016 -drop cascades to table local +NOTICE: drop cascades to table local SELECT 1 FROM master_set_node_property('localhost', :master_port, 'shouldhaveshards', false); ?column? --------------------------------------------------------------------- diff --git a/src/test/regress/expected/follower_single_node.out b/src/test/regress/expected/follower_single_node.out index 785206709..9c5ae8a1b 100644 --- a/src/test/regress/expected/follower_single_node.out +++ b/src/test/regress/expected/follower_single_node.out @@ -34,6 +34,17 @@ CREATE TABLE local(c int, d int); INSERT INTO test VALUES (1, 2), (3, 4), (5, 6), (2, 7), (4, 5); INSERT INTO ref VALUES (1, 2), (5, 6), (7, 8); INSERT INTO local VALUES (1, 2), (3, 4), (7, 8); +-- Check repartion joins are supported +SET citus.enable_repartition_joins TO ON; +SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x; + x | y | x | y +--------------------------------------------------------------------- + 2 | 7 | 1 | 2 + 4 | 5 | 3 | 4 + 5 | 6 | 4 | 5 +(3 rows) + +RESET citus.enable_repartition_joins; -- connect to the follower and check that a simple select query works, the follower -- is still in the default cluster and will send queries to the primary nodes \c - - - :follower_master_port @@ -102,15 +113,10 @@ SELECT * FROM ref, local WHERE a = c ORDER BY a; 7 | 8 | 7 | 8 (2 rows) --- Check repartion joins are support +SET citus.enable_repartition_joins TO ON; SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x; -ERROR: floating-point exception -DETAIL: An invalid floating-point operation was signaled. This probably means an out-of-range result or an invalid operation, such as division by zero. -SET citus.enable_repartition_joins TO ON; -SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x; -ERROR: floating-point exception -DETAIL: An invalid floating-point operation was signaled. This probably means an out-of-range result or an invalid operation, such as division by zero. -RESET citus.enable_repartition_joins; +ERROR: writing to worker nodes is not currently allowed +DETAIL: the database is in recovery mode -- Confirm that dummy placements work SELECT count(*) FROM test WHERE false; count @@ -226,15 +232,10 @@ SELECT * FROM ref, local WHERE a = c ORDER BY a; 7 | 8 | 7 | 8 (2 rows) --- Check repartion joins are support +SET citus.enable_repartition_joins TO ON; SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x; -ERROR: floating-point exception -DETAIL: An invalid floating-point operation was signaled. This probably means an out-of-range result or an invalid operation, such as division by zero. -SET citus.enable_repartition_joins TO ON; -SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x; -ERROR: floating-point exception -DETAIL: An invalid floating-point operation was signaled. This probably means an out-of-range result or an invalid operation, such as division by zero. -RESET citus.enable_repartition_joins; +ERROR: writing to worker nodes is not currently allowed +DETAIL: the database is in recovery mode -- Confirm that dummy placements work SELECT count(*) FROM test WHERE false; count diff --git a/src/test/regress/expected/single_node.out b/src/test/regress/expected/single_node.out index 31561e4fb..a52699926 100644 --- a/src/test/regress/expected/single_node.out +++ b/src/test/regress/expected/single_node.out @@ -101,14 +101,19 @@ SELECT * FROM ref, local WHERE a = c ORDER BY a; 7 | 8 | 7 | 8 (2 rows) --- Check repartion joins are support +-- Check repartion joins are supported SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x; -ERROR: floating-point exception -DETAIL: An invalid floating-point operation was signaled. This probably means an out-of-range result or an invalid operation, such as division by zero. +ERROR: the query contains a join that requires repartitioning +HINT: Set citus.enable_repartition_joins to on to enable repartitioning SET citus.enable_repartition_joins TO ON; SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x; -ERROR: floating-point exception -DETAIL: An invalid floating-point operation was signaled. This probably means an out-of-range result or an invalid operation, such as division by zero. + x | y | x | y +--------------------------------------------------------------------- + 2 | 7 | 1 | 2 + 4 | 5 | 3 | 4 + 5 | 6 | 4 | 5 +(3 rows) + RESET citus.enable_repartition_joins; -- INSERT SELECT router BEGIN; diff --git a/src/test/regress/sql/coordinator_shouldhaveshards.sql b/src/test/regress/sql/coordinator_shouldhaveshards.sql index 0707be471..18ebcb55a 100644 --- a/src/test/regress/sql/coordinator_shouldhaveshards.sql +++ b/src/test/regress/sql/coordinator_shouldhaveshards.sql @@ -191,6 +191,7 @@ BEGIN; -- copying task INSERT INTO dist_table SELECT a + 1 FROM dist_table; ROLLBACK; +SET citus.shard_replication_factor TO 1; BEGIN; SET citus.shard_replication_factor TO 2; @@ -205,6 +206,7 @@ RESET citus.enable_cte_inlining; DELETE FROM test; DROP TABLE test; DROP TABLE dist_table; +DROP TABLE ref; DROP SCHEMA coordinator_shouldhaveshards CASCADE; diff --git a/src/test/regress/sql/follower_single_node.sql b/src/test/regress/sql/follower_single_node.sql index 8fc26346d..64467ddfb 100644 --- a/src/test/regress/sql/follower_single_node.sql +++ b/src/test/regress/sql/follower_single_node.sql @@ -18,6 +18,11 @@ INSERT INTO test VALUES (1, 2), (3, 4), (5, 6), (2, 7), (4, 5); INSERT INTO ref VALUES (1, 2), (5, 6), (7, 8); INSERT INTO local VALUES (1, 2), (3, 4), (7, 8); +-- Check repartion joins are supported +SET citus.enable_repartition_joins TO ON; +SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x; +RESET citus.enable_repartition_joins; + -- connect to the follower and check that a simple select query works, the follower -- is still in the default cluster and will send queries to the primary nodes \c - - - :follower_master_port @@ -34,11 +39,8 @@ SELECT count(*) FROM local; SELECT * FROM local ORDER BY c; SELECT * FROM ref, local WHERE a = c ORDER BY a; --- Check repartion joins are support +SET citus.enable_repartition_joins TO ON; SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x; -SET citus.enable_repartition_joins TO ON; -SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x; -RESET citus.enable_repartition_joins; -- Confirm that dummy placements work SELECT count(*) FROM test WHERE false; @@ -82,11 +84,8 @@ SELECT count(*) FROM local; SELECT * FROM local ORDER BY c; SELECT * FROM ref, local WHERE a = c ORDER BY a; --- Check repartion joins are support +SET citus.enable_repartition_joins TO ON; SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x; -SET citus.enable_repartition_joins TO ON; -SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x; -RESET citus.enable_repartition_joins; -- Confirm that dummy placements work SELECT count(*) FROM test WHERE false; diff --git a/src/test/regress/sql/single_node.sql b/src/test/regress/sql/single_node.sql index ce0e36285..340f89882 100644 --- a/src/test/regress/sql/single_node.sql +++ b/src/test/regress/sql/single_node.sql @@ -33,7 +33,7 @@ SELECT count(*) FROM local; SELECT * FROM local ORDER BY c; SELECT * FROM ref, local WHERE a = c ORDER BY a; --- Check repartion joins are support +-- Check repartion joins are supported SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x; SET citus.enable_repartition_joins TO ON; SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x;