use ActivePrimaryNodeList to include coordinator

ActiveReadableWorkerNodeList doesn't include coordinator, however if
coordinator is added as a worker, we should also include that while
planning. The current methods are very easily misusable and this
requires a refactoring to make the distinction between methods that
include coordinator and that don't very explicit as they can introduce
subtle/major bugs pretty easily.
pull/4005/head
Sait Talha Nisanci 2020-07-08 21:55:59 +03:00
parent db1b78148c
commit d97d03ec65
8 changed files with 45 additions and 37 deletions

View File

@ -319,7 +319,6 @@ ActiveReadableNonCoordinatorNodeCount(void)
return liveWorkerCount; return liveWorkerCount;
} }
/* /*
* NodeIsCoordinator returns true if the given node represents the coordinator. * NodeIsCoordinator returns true if the given node represents the coordinator.
*/ */

View File

@ -2107,7 +2107,7 @@ BuildMapMergeJob(Query *jobQuery, List *dependentJobList, Var *partitionKey,
static uint32 static uint32
HashPartitionCount(void) HashPartitionCount(void)
{ {
uint32 groupCount = ActiveReadableNonCoordinatorNodeCount(); uint32 groupCount = list_length(ActiveReadableNodeList());
double maxReduceTasksPerNode = MaxRunningTasksPerNode / 2.0; double maxReduceTasksPerNode = MaxRunningTasksPerNode / 2.0;
uint32 partitionCount = (uint32) rint(groupCount * maxReduceTasksPerNode); uint32 partitionCount = (uint32) rint(groupCount * maxReduceTasksPerNode);
@ -5717,7 +5717,7 @@ AssignDualHashTaskList(List *taskList)
* if subsequent jobs have a small number of tasks, we won't allocate the * if subsequent jobs have a small number of tasks, we won't allocate the
* tasks to the same worker repeatedly. * tasks to the same worker repeatedly.
*/ */
List *workerNodeList = ActiveReadableNonCoordinatorNodeList(); List *workerNodeList = ActiveReadableNodeList();
uint32 workerNodeCount = (uint32) list_length(workerNodeList); uint32 workerNodeCount = (uint32) list_length(workerNodeList);
uint32 beginningNodeIndex = jobId % workerNodeCount; uint32 beginningNodeIndex = jobId % workerNodeCount;

View File

@ -456,6 +456,7 @@ BEGIN;
-- copying task -- copying task
INSERT INTO dist_table SELECT a + 1 FROM dist_table; INSERT INTO dist_table SELECT a + 1 FROM dist_table;
ROLLBACK; ROLLBACK;
SET citus.shard_replication_factor TO 1;
BEGIN; BEGIN;
SET citus.shard_replication_factor TO 2; SET citus.shard_replication_factor TO 2;
CREATE TABLE dist_table1(a int); CREATE TABLE dist_table1(a int);
@ -476,11 +477,12 @@ RESET citus.enable_cte_inlining;
DELETE FROM test; DELETE FROM test;
DROP TABLE test; DROP TABLE test;
DROP TABLE dist_table; DROP TABLE dist_table;
DROP TABLE ref;
NOTICE: executing the command locally: DROP TABLE IF EXISTS coordinator_shouldhaveshards.ref_xxxxx CASCADE
CONTEXT: SQL statement "SELECT master_drop_all_shards(v_obj.objid, v_obj.schema_name, v_obj.object_name)"
PL/pgSQL function citus_drop_trigger() line 19 at PERFORM
DROP SCHEMA coordinator_shouldhaveshards CASCADE; DROP SCHEMA coordinator_shouldhaveshards CASCADE;
NOTICE: drop cascades to 3 other objects NOTICE: drop cascades to table local
DETAIL: drop cascades to table ref
drop cascades to table ref_1503016
drop cascades to table local
SELECT 1 FROM master_set_node_property('localhost', :master_port, 'shouldhaveshards', false); SELECT 1 FROM master_set_node_property('localhost', :master_port, 'shouldhaveshards', false);
?column? ?column?
--------------------------------------------------------------------- ---------------------------------------------------------------------

View File

@ -34,6 +34,17 @@ CREATE TABLE local(c int, d int);
INSERT INTO test VALUES (1, 2), (3, 4), (5, 6), (2, 7), (4, 5); INSERT INTO test VALUES (1, 2), (3, 4), (5, 6), (2, 7), (4, 5);
INSERT INTO ref VALUES (1, 2), (5, 6), (7, 8); INSERT INTO ref VALUES (1, 2), (5, 6), (7, 8);
INSERT INTO local VALUES (1, 2), (3, 4), (7, 8); INSERT INTO local VALUES (1, 2), (3, 4), (7, 8);
-- Check repartion joins are supported
SET citus.enable_repartition_joins TO ON;
SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x;
x | y | x | y
---------------------------------------------------------------------
2 | 7 | 1 | 2
4 | 5 | 3 | 4
5 | 6 | 4 | 5
(3 rows)
RESET citus.enable_repartition_joins;
-- connect to the follower and check that a simple select query works, the follower -- connect to the follower and check that a simple select query works, the follower
-- is still in the default cluster and will send queries to the primary nodes -- is still in the default cluster and will send queries to the primary nodes
\c - - - :follower_master_port \c - - - :follower_master_port
@ -102,15 +113,10 @@ SELECT * FROM ref, local WHERE a = c ORDER BY a;
7 | 8 | 7 | 8 7 | 8 | 7 | 8
(2 rows) (2 rows)
-- Check repartion joins are support
SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x;
ERROR: floating-point exception
DETAIL: An invalid floating-point operation was signaled. This probably means an out-of-range result or an invalid operation, such as division by zero.
SET citus.enable_repartition_joins TO ON; SET citus.enable_repartition_joins TO ON;
SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x; SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x;
ERROR: floating-point exception ERROR: writing to worker nodes is not currently allowed
DETAIL: An invalid floating-point operation was signaled. This probably means an out-of-range result or an invalid operation, such as division by zero. DETAIL: the database is in recovery mode
RESET citus.enable_repartition_joins;
-- Confirm that dummy placements work -- Confirm that dummy placements work
SELECT count(*) FROM test WHERE false; SELECT count(*) FROM test WHERE false;
count count
@ -226,15 +232,10 @@ SELECT * FROM ref, local WHERE a = c ORDER BY a;
7 | 8 | 7 | 8 7 | 8 | 7 | 8
(2 rows) (2 rows)
-- Check repartion joins are support
SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x;
ERROR: floating-point exception
DETAIL: An invalid floating-point operation was signaled. This probably means an out-of-range result or an invalid operation, such as division by zero.
SET citus.enable_repartition_joins TO ON; SET citus.enable_repartition_joins TO ON;
SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x; SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x;
ERROR: floating-point exception ERROR: writing to worker nodes is not currently allowed
DETAIL: An invalid floating-point operation was signaled. This probably means an out-of-range result or an invalid operation, such as division by zero. DETAIL: the database is in recovery mode
RESET citus.enable_repartition_joins;
-- Confirm that dummy placements work -- Confirm that dummy placements work
SELECT count(*) FROM test WHERE false; SELECT count(*) FROM test WHERE false;
count count

View File

@ -101,14 +101,19 @@ SELECT * FROM ref, local WHERE a = c ORDER BY a;
7 | 8 | 7 | 8 7 | 8 | 7 | 8
(2 rows) (2 rows)
-- Check repartion joins are support -- Check repartion joins are supported
SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x; SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x;
ERROR: floating-point exception ERROR: the query contains a join that requires repartitioning
DETAIL: An invalid floating-point operation was signaled. This probably means an out-of-range result or an invalid operation, such as division by zero. HINT: Set citus.enable_repartition_joins to on to enable repartitioning
SET citus.enable_repartition_joins TO ON; SET citus.enable_repartition_joins TO ON;
SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x; SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x;
ERROR: floating-point exception x | y | x | y
DETAIL: An invalid floating-point operation was signaled. This probably means an out-of-range result or an invalid operation, such as division by zero. ---------------------------------------------------------------------
2 | 7 | 1 | 2
4 | 5 | 3 | 4
5 | 6 | 4 | 5
(3 rows)
RESET citus.enable_repartition_joins; RESET citus.enable_repartition_joins;
-- INSERT SELECT router -- INSERT SELECT router
BEGIN; BEGIN;

View File

@ -191,6 +191,7 @@ BEGIN;
-- copying task -- copying task
INSERT INTO dist_table SELECT a + 1 FROM dist_table; INSERT INTO dist_table SELECT a + 1 FROM dist_table;
ROLLBACK; ROLLBACK;
SET citus.shard_replication_factor TO 1;
BEGIN; BEGIN;
SET citus.shard_replication_factor TO 2; SET citus.shard_replication_factor TO 2;
@ -205,6 +206,7 @@ RESET citus.enable_cte_inlining;
DELETE FROM test; DELETE FROM test;
DROP TABLE test; DROP TABLE test;
DROP TABLE dist_table; DROP TABLE dist_table;
DROP TABLE ref;
DROP SCHEMA coordinator_shouldhaveshards CASCADE; DROP SCHEMA coordinator_shouldhaveshards CASCADE;

View File

@ -18,6 +18,11 @@ INSERT INTO test VALUES (1, 2), (3, 4), (5, 6), (2, 7), (4, 5);
INSERT INTO ref VALUES (1, 2), (5, 6), (7, 8); INSERT INTO ref VALUES (1, 2), (5, 6), (7, 8);
INSERT INTO local VALUES (1, 2), (3, 4), (7, 8); INSERT INTO local VALUES (1, 2), (3, 4), (7, 8);
-- Check repartion joins are supported
SET citus.enable_repartition_joins TO ON;
SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x;
RESET citus.enable_repartition_joins;
-- connect to the follower and check that a simple select query works, the follower -- connect to the follower and check that a simple select query works, the follower
-- is still in the default cluster and will send queries to the primary nodes -- is still in the default cluster and will send queries to the primary nodes
\c - - - :follower_master_port \c - - - :follower_master_port
@ -34,11 +39,8 @@ SELECT count(*) FROM local;
SELECT * FROM local ORDER BY c; SELECT * FROM local ORDER BY c;
SELECT * FROM ref, local WHERE a = c ORDER BY a; SELECT * FROM ref, local WHERE a = c ORDER BY a;
-- Check repartion joins are support
SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x;
SET citus.enable_repartition_joins TO ON; SET citus.enable_repartition_joins TO ON;
SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x; SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x;
RESET citus.enable_repartition_joins;
-- Confirm that dummy placements work -- Confirm that dummy placements work
SELECT count(*) FROM test WHERE false; SELECT count(*) FROM test WHERE false;
@ -82,11 +84,8 @@ SELECT count(*) FROM local;
SELECT * FROM local ORDER BY c; SELECT * FROM local ORDER BY c;
SELECT * FROM ref, local WHERE a = c ORDER BY a; SELECT * FROM ref, local WHERE a = c ORDER BY a;
-- Check repartion joins are support
SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x;
SET citus.enable_repartition_joins TO ON; SET citus.enable_repartition_joins TO ON;
SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x; SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x;
RESET citus.enable_repartition_joins;
-- Confirm that dummy placements work -- Confirm that dummy placements work
SELECT count(*) FROM test WHERE false; SELECT count(*) FROM test WHERE false;

View File

@ -33,7 +33,7 @@ SELECT count(*) FROM local;
SELECT * FROM local ORDER BY c; SELECT * FROM local ORDER BY c;
SELECT * FROM ref, local WHERE a = c ORDER BY a; SELECT * FROM ref, local WHERE a = c ORDER BY a;
-- Check repartion joins are support -- Check repartion joins are supported
SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x; SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x;
SET citus.enable_repartition_joins TO ON; SET citus.enable_repartition_joins TO ON;
SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x; SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x;