From 510535f558fb2194da09579161b77bf683acdf07 Mon Sep 17 00:00:00 2001 From: Sait Talha Nisanci Date: Mon, 13 Jul 2020 15:26:12 +0300 Subject: [PATCH] address feedback --- .../executor/multi_task_tracker_executor.c | 2 +- .../executor/repartition_join_execution.c | 4 +- .../transaction/worker_transaction.c | 2 +- src/include/distributed/worker_transaction.h | 5 +- .../regress/expected/follower_single_node.out | 95 ++++++++++++++++++- src/test/regress/expected/single_node.out | 41 +++++++- src/test/regress/sql/follower_single_node.sql | 41 ++++++++ src/test/regress/sql/single_node.sql | 15 ++- 8 files changed, 190 insertions(+), 15 deletions(-) diff --git a/src/backend/distributed/executor/multi_task_tracker_executor.c b/src/backend/distributed/executor/multi_task_tracker_executor.c index e804b0d8d..54ad1e8a6 100644 --- a/src/backend/distributed/executor/multi_task_tracker_executor.c +++ b/src/backend/distributed/executor/multi_task_tracker_executor.c @@ -209,7 +209,7 @@ MultiTaskTrackerExecute(Job *job) * assigning and checking the status of tasks. The second (temporary) hash * helps us in fetching results data from worker nodes to the master node. */ - List *workerNodeList = ActiveReadableNodeList(); + List *workerNodeList = ActivePrimaryNodeList(ShareLock); uint32 taskTrackerCount = (uint32) list_length(workerNodeList); /* connect as the current user for running queries */ diff --git a/src/backend/distributed/executor/repartition_join_execution.c b/src/backend/distributed/executor/repartition_join_execution.c index 7809008de..045f4b4f5 100644 --- a/src/backend/distributed/executor/repartition_join_execution.c +++ b/src/backend/distributed/executor/repartition_join_execution.c @@ -104,7 +104,7 @@ CreateTemporarySchemasForMergeTasks(Job *topLeveLJob) { List *jobIds = ExtractJobsInJobTree(topLeveLJob); char *createSchemasCommand = GenerateCreateSchemasCommand(jobIds, CurrentUserName()); - SendCommandToWorkersInParallel(ALL_DATA_NODES, createSchemasCommand, + SendCommandToWorkersInParallel(ALL_SHARD_NODES, createSchemasCommand, CitusExtensionOwnerName()); return jobIds; } @@ -191,7 +191,7 @@ GenerateJobCommands(List *jobIds, char *templateCommand) void DoRepartitionCleanup(List *jobIds) { - SendCommandToWorkersOptionalInParallel(ALL_DATA_NODES, GenerateDeleteJobsCommand( + SendCommandToWorkersOptionalInParallel(ALL_SHARD_NODES, GenerateDeleteJobsCommand( jobIds), CitusExtensionOwnerName()); } diff --git a/src/backend/distributed/transaction/worker_transaction.c b/src/backend/distributed/transaction/worker_transaction.c index 31a8e9073..bf65d1e04 100644 --- a/src/backend/distributed/transaction/worker_transaction.c +++ b/src/backend/distributed/transaction/worker_transaction.c @@ -131,7 +131,7 @@ List * TargetWorkerSetNodeList(TargetWorkerSet targetWorkerSet, LOCKMODE lockMode) { List *workerNodeList = NIL; - if (targetWorkerSet == ALL_DATA_NODES) + if (targetWorkerSet == ALL_SHARD_NODES) { workerNodeList = ActivePrimaryNodeList(lockMode); } diff --git a/src/include/distributed/worker_transaction.h b/src/include/distributed/worker_transaction.h index 21aa47e34..3322596af 100644 --- a/src/include/distributed/worker_transaction.h +++ b/src/include/distributed/worker_transaction.h @@ -18,14 +18,13 @@ /* * TargetWorkerSet is used for determining the type of workers that a command - * is targeted to. Currently it doesn't include coordinator even if it is added - * as a worker. + * is targeted to. */ typedef enum TargetWorkerSet { NON_COORDINATOR_METADATA_NODES, NON_COORDINATOR_NODES, - ALL_DATA_NODES + ALL_SHARD_NODES } TargetWorkerSet; diff --git a/src/test/regress/expected/follower_single_node.out b/src/test/regress/expected/follower_single_node.out index 9c5ae8a1b..a4cd51561 100644 --- a/src/test/regress/expected/follower_single_node.out +++ b/src/test/regress/expected/follower_single_node.out @@ -44,6 +44,44 @@ SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x; 5 | 6 | 4 | 5 (3 rows) +SET citus.enable_single_hash_repartition_joins TO ON; +SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x; + x | y | x | y +--------------------------------------------------------------------- + 2 | 7 | 1 | 2 + 4 | 5 | 3 | 4 + 5 | 6 | 4 | 5 +(3 rows) + +RESET citus.enable_single_hash_repartition_joins; +SET citus.task_assignment_policy TO 'round-robin'; +SET citus.enable_single_hash_repartition_joins TO ON; +SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x; + x | y | x | y +--------------------------------------------------------------------- + 2 | 7 | 1 | 2 + 4 | 5 | 3 | 4 + 5 | 6 | 4 | 5 +(3 rows) + +SET citus.task_assignment_policy TO 'greedy'; +SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x; + x | y | x | y +--------------------------------------------------------------------- + 2 | 7 | 1 | 2 + 4 | 5 | 3 | 4 + 5 | 6 | 4 | 5 +(3 rows) + +SET citus.task_assignment_policy TO 'first-replica'; +SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x; + x | y | x | y +--------------------------------------------------------------------- + 2 | 7 | 1 | 2 + 4 | 5 | 3 | 4 + 5 | 6 | 4 | 5 +(3 rows) + RESET citus.enable_repartition_joins; -- connect to the follower and check that a simple select query works, the follower -- is still in the default cluster and will send queries to the primary nodes @@ -113,10 +151,29 @@ SELECT * FROM ref, local WHERE a = c ORDER BY a; 7 | 8 | 7 | 8 (2 rows) -SET citus.enable_repartition_joins TO ON; +SET citus.enable_repartition_joins TO ON; SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x; ERROR: writing to worker nodes is not currently allowed -DETAIL: the database is in recovery mode +DETAIL: the database is read-only +SET citus.enable_single_hash_repartition_joins TO ON; +SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x; +ERROR: writing to worker nodes is not currently allowed +DETAIL: the database is read-only +SET citus.task_assignment_policy TO 'round-robin'; +SET citus.enable_single_hash_repartition_joins TO ON; +SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x; +ERROR: writing to worker nodes is not currently allowed +DETAIL: the database is read-only +SET citus.task_assignment_policy TO 'greedy'; +SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x; +ERROR: writing to worker nodes is not currently allowed +DETAIL: the database is read-only +SET citus.task_assignment_policy TO 'first-replica'; +SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x; +ERROR: writing to worker nodes is not currently allowed +DETAIL: the database is read-only +RESET citus.enable_repartition_joins; +RESET citus.enable_single_hash_repartition_joins; -- Confirm that dummy placements work SELECT count(*) FROM test WHERE false; count @@ -142,6 +199,30 @@ SELECT count(*) FROM test WHERE false GROUP BY GROUPING SETS (x,y); --------------------------------------------------------------------- (0 rows) +SET citus.task_assignment_policy TO 'greedy'; +SELECT count(*) FROM test WHERE false; + count +--------------------------------------------------------------------- + 0 +(1 row) + +SELECT count(*) FROM test WHERE false GROUP BY GROUPING SETS (x,y); + count +--------------------------------------------------------------------- +(0 rows) + +SET citus.task_assignment_policy TO 'first-replica'; +SELECT count(*) FROM test WHERE false; + count +--------------------------------------------------------------------- + 0 +(1 row) + +SELECT count(*) FROM test WHERE false GROUP BY GROUPING SETS (x,y); + count +--------------------------------------------------------------------- +(0 rows) + RESET citus.task_assignment_policy; -- now, connect to the follower but tell it to use secondary nodes. There are no -- secondary nodes so this should fail. @@ -232,10 +313,16 @@ SELECT * FROM ref, local WHERE a = c ORDER BY a; 7 | 8 | 7 | 8 (2 rows) -SET citus.enable_repartition_joins TO ON; +SET citus.enable_repartition_joins TO ON; SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x; ERROR: writing to worker nodes is not currently allowed -DETAIL: the database is in recovery mode +DETAIL: the database is read-only +SET citus.enable_single_hash_repartition_joins TO ON; +SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x; +ERROR: writing to worker nodes is not currently allowed +DETAIL: the database is read-only +RESET citus.enable_repartition_joins; +RESET citus.enable_single_hash_repartition_joins; -- Confirm that dummy placements work SELECT count(*) FROM test WHERE false; count diff --git a/src/test/regress/expected/single_node.out b/src/test/regress/expected/single_node.out index cb50b4917..e71c3ad79 100644 --- a/src/test/regress/expected/single_node.out +++ b/src/test/regress/expected/single_node.out @@ -102,9 +102,6 @@ SELECT * FROM ref, local WHERE a = c ORDER BY a; (2 rows) -- Check repartion joins are supported -SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x; -ERROR: the query contains a join that requires repartitioning -HINT: Set citus.enable_repartition_joins to on to enable repartitioning SET citus.enable_repartition_joins TO ON; SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x; x | y | x | y @@ -114,7 +111,45 @@ SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x; 5 | 6 | 4 | 5 (3 rows) +SET citus.enable_single_hash_repartition_joins TO ON; +SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x; + x | y | x | y +--------------------------------------------------------------------- + 2 | 7 | 1 | 2 + 4 | 5 | 3 | 4 + 5 | 6 | 4 | 5 +(3 rows) + +SET citus.task_assignment_policy TO 'round-robin'; +SET citus.enable_single_hash_repartition_joins TO ON; +SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x; + x | y | x | y +--------------------------------------------------------------------- + 2 | 7 | 1 | 2 + 4 | 5 | 3 | 4 + 5 | 6 | 4 | 5 +(3 rows) + +SET citus.task_assignment_policy TO 'greedy'; +SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x; + x | y | x | y +--------------------------------------------------------------------- + 2 | 7 | 1 | 2 + 4 | 5 | 3 | 4 + 5 | 6 | 4 | 5 +(3 rows) + +SET citus.task_assignment_policy TO 'first-replica'; +SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x; + x | y | x | y +--------------------------------------------------------------------- + 2 | 7 | 1 | 2 + 4 | 5 | 3 | 4 + 5 | 6 | 4 | 5 +(3 rows) + RESET citus.enable_repartition_joins; +RESET citus.enable_single_hash_repartition_joins; -- INSERT SELECT router BEGIN; INSERT INTO test(x, y) SELECT x, y FROM test WHERE x = 1; diff --git a/src/test/regress/sql/follower_single_node.sql b/src/test/regress/sql/follower_single_node.sql index 91aa27abe..9d0d38db1 100644 --- a/src/test/regress/sql/follower_single_node.sql +++ b/src/test/regress/sql/follower_single_node.sql @@ -21,8 +21,24 @@ INSERT INTO local VALUES (1, 2), (3, 4), (7, 8); -- Check repartion joins are supported SET citus.enable_repartition_joins TO ON; SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x; +SET citus.enable_single_hash_repartition_joins TO ON; +SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x; +RESET citus.enable_single_hash_repartition_joins; + +SET citus.task_assignment_policy TO 'round-robin'; +SET citus.enable_single_hash_repartition_joins TO ON; +SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x; + +SET citus.task_assignment_policy TO 'greedy'; +SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x; + +SET citus.task_assignment_policy TO 'first-replica'; +SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x; + RESET citus.enable_repartition_joins; + + -- connect to the follower and check that a simple select query works, the follower -- is still in the default cluster and will send queries to the primary nodes \c - - - :follower_master_port @@ -41,6 +57,21 @@ SELECT * FROM ref, local WHERE a = c ORDER BY a; SET citus.enable_repartition_joins TO ON; SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x; +SET citus.enable_single_hash_repartition_joins TO ON; +SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x; + +SET citus.task_assignment_policy TO 'round-robin'; +SET citus.enable_single_hash_repartition_joins TO ON; +SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x; + +SET citus.task_assignment_policy TO 'greedy'; +SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x; + +SET citus.task_assignment_policy TO 'first-replica'; +SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x; + +RESET citus.enable_repartition_joins; +RESET citus.enable_single_hash_repartition_joins; -- Confirm that dummy placements work SELECT count(*) FROM test WHERE false; @@ -49,6 +80,12 @@ SELECT count(*) FROM test WHERE false GROUP BY GROUPING SETS (x,y); SET citus.task_assignment_policy TO 'round-robin'; SELECT count(*) FROM test WHERE false; SELECT count(*) FROM test WHERE false GROUP BY GROUPING SETS (x,y); +SET citus.task_assignment_policy TO 'greedy'; +SELECT count(*) FROM test WHERE false; +SELECT count(*) FROM test WHERE false GROUP BY GROUPING SETS (x,y); +SET citus.task_assignment_policy TO 'first-replica'; +SELECT count(*) FROM test WHERE false; +SELECT count(*) FROM test WHERE false GROUP BY GROUPING SETS (x,y); RESET citus.task_assignment_policy; @@ -86,6 +123,10 @@ SELECT * FROM ref, local WHERE a = c ORDER BY a; SET citus.enable_repartition_joins TO ON; SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x; +SET citus.enable_single_hash_repartition_joins TO ON; +SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x; +RESET citus.enable_repartition_joins; +RESET citus.enable_single_hash_repartition_joins; -- Confirm that dummy placements work SELECT count(*) FROM test WHERE false; diff --git a/src/test/regress/sql/single_node.sql b/src/test/regress/sql/single_node.sql index 3c2fb35ab..e11701ef5 100644 --- a/src/test/regress/sql/single_node.sql +++ b/src/test/regress/sql/single_node.sql @@ -34,10 +34,23 @@ SELECT * FROM local ORDER BY c; SELECT * FROM ref, local WHERE a = c ORDER BY a; -- Check repartion joins are supported -SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x; SET citus.enable_repartition_joins TO ON; SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x; +SET citus.enable_single_hash_repartition_joins TO ON; +SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x; + +SET citus.task_assignment_policy TO 'round-robin'; +SET citus.enable_single_hash_repartition_joins TO ON; +SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x; + +SET citus.task_assignment_policy TO 'greedy'; +SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x; + +SET citus.task_assignment_policy TO 'first-replica'; +SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x; + RESET citus.enable_repartition_joins; +RESET citus.enable_single_hash_repartition_joins; -- INSERT SELECT router BEGIN;