address feedback

pull/4005/head
Sait Talha Nisanci 2020-07-13 15:26:12 +03:00
parent 41ec76a6ad
commit 510535f558
8 changed files with 190 additions and 15 deletions

View File

@ -209,7 +209,7 @@ MultiTaskTrackerExecute(Job *job)
* assigning and checking the status of tasks. The second (temporary) hash * assigning and checking the status of tasks. The second (temporary) hash
* helps us in fetching results data from worker nodes to the master node. * helps us in fetching results data from worker nodes to the master node.
*/ */
List *workerNodeList = ActiveReadableNodeList(); List *workerNodeList = ActivePrimaryNodeList(ShareLock);
uint32 taskTrackerCount = (uint32) list_length(workerNodeList); uint32 taskTrackerCount = (uint32) list_length(workerNodeList);
/* connect as the current user for running queries */ /* connect as the current user for running queries */

View File

@ -104,7 +104,7 @@ CreateTemporarySchemasForMergeTasks(Job *topLeveLJob)
{ {
List *jobIds = ExtractJobsInJobTree(topLeveLJob); List *jobIds = ExtractJobsInJobTree(topLeveLJob);
char *createSchemasCommand = GenerateCreateSchemasCommand(jobIds, CurrentUserName()); char *createSchemasCommand = GenerateCreateSchemasCommand(jobIds, CurrentUserName());
SendCommandToWorkersInParallel(ALL_DATA_NODES, createSchemasCommand, SendCommandToWorkersInParallel(ALL_SHARD_NODES, createSchemasCommand,
CitusExtensionOwnerName()); CitusExtensionOwnerName());
return jobIds; return jobIds;
} }
@ -191,7 +191,7 @@ GenerateJobCommands(List *jobIds, char *templateCommand)
void void
DoRepartitionCleanup(List *jobIds) DoRepartitionCleanup(List *jobIds)
{ {
SendCommandToWorkersOptionalInParallel(ALL_DATA_NODES, GenerateDeleteJobsCommand( SendCommandToWorkersOptionalInParallel(ALL_SHARD_NODES, GenerateDeleteJobsCommand(
jobIds), jobIds),
CitusExtensionOwnerName()); CitusExtensionOwnerName());
} }

View File

@ -131,7 +131,7 @@ List *
TargetWorkerSetNodeList(TargetWorkerSet targetWorkerSet, LOCKMODE lockMode) TargetWorkerSetNodeList(TargetWorkerSet targetWorkerSet, LOCKMODE lockMode)
{ {
List *workerNodeList = NIL; List *workerNodeList = NIL;
if (targetWorkerSet == ALL_DATA_NODES) if (targetWorkerSet == ALL_SHARD_NODES)
{ {
workerNodeList = ActivePrimaryNodeList(lockMode); workerNodeList = ActivePrimaryNodeList(lockMode);
} }

View File

@ -18,14 +18,13 @@
/* /*
* TargetWorkerSet is used for determining the type of workers that a command * TargetWorkerSet is used for determining the type of workers that a command
* is targeted to. Currently it doesn't include coordinator even if it is added * is targeted to.
* as a worker.
*/ */
typedef enum TargetWorkerSet typedef enum TargetWorkerSet
{ {
NON_COORDINATOR_METADATA_NODES, NON_COORDINATOR_METADATA_NODES,
NON_COORDINATOR_NODES, NON_COORDINATOR_NODES,
ALL_DATA_NODES ALL_SHARD_NODES
} TargetWorkerSet; } TargetWorkerSet;

View File

@ -44,6 +44,44 @@ SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x;
5 | 6 | 4 | 5 5 | 6 | 4 | 5
(3 rows) (3 rows)
SET citus.enable_single_hash_repartition_joins TO ON;
SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x;
x | y | x | y
---------------------------------------------------------------------
2 | 7 | 1 | 2
4 | 5 | 3 | 4
5 | 6 | 4 | 5
(3 rows)
RESET citus.enable_single_hash_repartition_joins;
SET citus.task_assignment_policy TO 'round-robin';
SET citus.enable_single_hash_repartition_joins TO ON;
SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x;
x | y | x | y
---------------------------------------------------------------------
2 | 7 | 1 | 2
4 | 5 | 3 | 4
5 | 6 | 4 | 5
(3 rows)
SET citus.task_assignment_policy TO 'greedy';
SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x;
x | y | x | y
---------------------------------------------------------------------
2 | 7 | 1 | 2
4 | 5 | 3 | 4
5 | 6 | 4 | 5
(3 rows)
SET citus.task_assignment_policy TO 'first-replica';
SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x;
x | y | x | y
---------------------------------------------------------------------
2 | 7 | 1 | 2
4 | 5 | 3 | 4
5 | 6 | 4 | 5
(3 rows)
RESET citus.enable_repartition_joins; RESET citus.enable_repartition_joins;
-- connect to the follower and check that a simple select query works, the follower -- connect to the follower and check that a simple select query works, the follower
-- is still in the default cluster and will send queries to the primary nodes -- is still in the default cluster and will send queries to the primary nodes
@ -116,7 +154,26 @@ SELECT * FROM ref, local WHERE a = c ORDER BY a;
SET citus.enable_repartition_joins TO ON; SET citus.enable_repartition_joins TO ON;
SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x; SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x;
ERROR: writing to worker nodes is not currently allowed ERROR: writing to worker nodes is not currently allowed
DETAIL: the database is in recovery mode DETAIL: the database is read-only
SET citus.enable_single_hash_repartition_joins TO ON;
SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x;
ERROR: writing to worker nodes is not currently allowed
DETAIL: the database is read-only
SET citus.task_assignment_policy TO 'round-robin';
SET citus.enable_single_hash_repartition_joins TO ON;
SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x;
ERROR: writing to worker nodes is not currently allowed
DETAIL: the database is read-only
SET citus.task_assignment_policy TO 'greedy';
SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x;
ERROR: writing to worker nodes is not currently allowed
DETAIL: the database is read-only
SET citus.task_assignment_policy TO 'first-replica';
SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x;
ERROR: writing to worker nodes is not currently allowed
DETAIL: the database is read-only
RESET citus.enable_repartition_joins;
RESET citus.enable_single_hash_repartition_joins;
-- Confirm that dummy placements work -- Confirm that dummy placements work
SELECT count(*) FROM test WHERE false; SELECT count(*) FROM test WHERE false;
count count
@ -142,6 +199,30 @@ SELECT count(*) FROM test WHERE false GROUP BY GROUPING SETS (x,y);
--------------------------------------------------------------------- ---------------------------------------------------------------------
(0 rows) (0 rows)
SET citus.task_assignment_policy TO 'greedy';
SELECT count(*) FROM test WHERE false;
count
---------------------------------------------------------------------
0
(1 row)
SELECT count(*) FROM test WHERE false GROUP BY GROUPING SETS (x,y);
count
---------------------------------------------------------------------
(0 rows)
SET citus.task_assignment_policy TO 'first-replica';
SELECT count(*) FROM test WHERE false;
count
---------------------------------------------------------------------
0
(1 row)
SELECT count(*) FROM test WHERE false GROUP BY GROUPING SETS (x,y);
count
---------------------------------------------------------------------
(0 rows)
RESET citus.task_assignment_policy; RESET citus.task_assignment_policy;
-- now, connect to the follower but tell it to use secondary nodes. There are no -- now, connect to the follower but tell it to use secondary nodes. There are no
-- secondary nodes so this should fail. -- secondary nodes so this should fail.
@ -235,7 +316,13 @@ SELECT * FROM ref, local WHERE a = c ORDER BY a;
SET citus.enable_repartition_joins TO ON; SET citus.enable_repartition_joins TO ON;
SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x; SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x;
ERROR: writing to worker nodes is not currently allowed ERROR: writing to worker nodes is not currently allowed
DETAIL: the database is in recovery mode DETAIL: the database is read-only
SET citus.enable_single_hash_repartition_joins TO ON;
SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x;
ERROR: writing to worker nodes is not currently allowed
DETAIL: the database is read-only
RESET citus.enable_repartition_joins;
RESET citus.enable_single_hash_repartition_joins;
-- Confirm that dummy placements work -- Confirm that dummy placements work
SELECT count(*) FROM test WHERE false; SELECT count(*) FROM test WHERE false;
count count

View File

@ -102,9 +102,6 @@ SELECT * FROM ref, local WHERE a = c ORDER BY a;
(2 rows) (2 rows)
-- Check repartion joins are supported -- Check repartion joins are supported
SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x;
ERROR: the query contains a join that requires repartitioning
HINT: Set citus.enable_repartition_joins to on to enable repartitioning
SET citus.enable_repartition_joins TO ON; SET citus.enable_repartition_joins TO ON;
SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x; SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x;
x | y | x | y x | y | x | y
@ -114,7 +111,45 @@ SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x;
5 | 6 | 4 | 5 5 | 6 | 4 | 5
(3 rows) (3 rows)
SET citus.enable_single_hash_repartition_joins TO ON;
SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x;
x | y | x | y
---------------------------------------------------------------------
2 | 7 | 1 | 2
4 | 5 | 3 | 4
5 | 6 | 4 | 5
(3 rows)
SET citus.task_assignment_policy TO 'round-robin';
SET citus.enable_single_hash_repartition_joins TO ON;
SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x;
x | y | x | y
---------------------------------------------------------------------
2 | 7 | 1 | 2
4 | 5 | 3 | 4
5 | 6 | 4 | 5
(3 rows)
SET citus.task_assignment_policy TO 'greedy';
SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x;
x | y | x | y
---------------------------------------------------------------------
2 | 7 | 1 | 2
4 | 5 | 3 | 4
5 | 6 | 4 | 5
(3 rows)
SET citus.task_assignment_policy TO 'first-replica';
SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x;
x | y | x | y
---------------------------------------------------------------------
2 | 7 | 1 | 2
4 | 5 | 3 | 4
5 | 6 | 4 | 5
(3 rows)
RESET citus.enable_repartition_joins; RESET citus.enable_repartition_joins;
RESET citus.enable_single_hash_repartition_joins;
-- INSERT SELECT router -- INSERT SELECT router
BEGIN; BEGIN;
INSERT INTO test(x, y) SELECT x, y FROM test WHERE x = 1; INSERT INTO test(x, y) SELECT x, y FROM test WHERE x = 1;

View File

@ -21,8 +21,24 @@ INSERT INTO local VALUES (1, 2), (3, 4), (7, 8);
-- Check repartion joins are supported -- Check repartion joins are supported
SET citus.enable_repartition_joins TO ON; SET citus.enable_repartition_joins TO ON;
SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x; SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x;
SET citus.enable_single_hash_repartition_joins TO ON;
SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x;
RESET citus.enable_single_hash_repartition_joins;
SET citus.task_assignment_policy TO 'round-robin';
SET citus.enable_single_hash_repartition_joins TO ON;
SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x;
SET citus.task_assignment_policy TO 'greedy';
SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x;
SET citus.task_assignment_policy TO 'first-replica';
SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x;
RESET citus.enable_repartition_joins; RESET citus.enable_repartition_joins;
-- connect to the follower and check that a simple select query works, the follower -- connect to the follower and check that a simple select query works, the follower
-- is still in the default cluster and will send queries to the primary nodes -- is still in the default cluster and will send queries to the primary nodes
\c - - - :follower_master_port \c - - - :follower_master_port
@ -41,6 +57,21 @@ SELECT * FROM ref, local WHERE a = c ORDER BY a;
SET citus.enable_repartition_joins TO ON; SET citus.enable_repartition_joins TO ON;
SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x; SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x;
SET citus.enable_single_hash_repartition_joins TO ON;
SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x;
SET citus.task_assignment_policy TO 'round-robin';
SET citus.enable_single_hash_repartition_joins TO ON;
SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x;
SET citus.task_assignment_policy TO 'greedy';
SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x;
SET citus.task_assignment_policy TO 'first-replica';
SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x;
RESET citus.enable_repartition_joins;
RESET citus.enable_single_hash_repartition_joins;
-- Confirm that dummy placements work -- Confirm that dummy placements work
SELECT count(*) FROM test WHERE false; SELECT count(*) FROM test WHERE false;
@ -49,6 +80,12 @@ SELECT count(*) FROM test WHERE false GROUP BY GROUPING SETS (x,y);
SET citus.task_assignment_policy TO 'round-robin'; SET citus.task_assignment_policy TO 'round-robin';
SELECT count(*) FROM test WHERE false; SELECT count(*) FROM test WHERE false;
SELECT count(*) FROM test WHERE false GROUP BY GROUPING SETS (x,y); SELECT count(*) FROM test WHERE false GROUP BY GROUPING SETS (x,y);
SET citus.task_assignment_policy TO 'greedy';
SELECT count(*) FROM test WHERE false;
SELECT count(*) FROM test WHERE false GROUP BY GROUPING SETS (x,y);
SET citus.task_assignment_policy TO 'first-replica';
SELECT count(*) FROM test WHERE false;
SELECT count(*) FROM test WHERE false GROUP BY GROUPING SETS (x,y);
RESET citus.task_assignment_policy; RESET citus.task_assignment_policy;
@ -86,6 +123,10 @@ SELECT * FROM ref, local WHERE a = c ORDER BY a;
SET citus.enable_repartition_joins TO ON; SET citus.enable_repartition_joins TO ON;
SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x; SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x;
SET citus.enable_single_hash_repartition_joins TO ON;
SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x;
RESET citus.enable_repartition_joins;
RESET citus.enable_single_hash_repartition_joins;
-- Confirm that dummy placements work -- Confirm that dummy placements work
SELECT count(*) FROM test WHERE false; SELECT count(*) FROM test WHERE false;

View File

@ -34,10 +34,23 @@ SELECT * FROM local ORDER BY c;
SELECT * FROM ref, local WHERE a = c ORDER BY a; SELECT * FROM ref, local WHERE a = c ORDER BY a;
-- Check repartion joins are supported -- Check repartion joins are supported
SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x;
SET citus.enable_repartition_joins TO ON; SET citus.enable_repartition_joins TO ON;
SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x; SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x;
SET citus.enable_single_hash_repartition_joins TO ON;
SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x;
SET citus.task_assignment_policy TO 'round-robin';
SET citus.enable_single_hash_repartition_joins TO ON;
SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x;
SET citus.task_assignment_policy TO 'greedy';
SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x;
SET citus.task_assignment_policy TO 'first-replica';
SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x;
RESET citus.enable_repartition_joins; RESET citus.enable_repartition_joins;
RESET citus.enable_single_hash_repartition_joins;
-- INSERT SELECT router -- INSERT SELECT router
BEGIN; BEGIN;