diff --git a/src/backend/distributed/executor/multi_server_executor.c b/src/backend/distributed/executor/multi_server_executor.c
index 3ca1a2e40..6bbdc02de 100644
--- a/src/backend/distributed/executor/multi_server_executor.c
+++ b/src/backend/distributed/executor/multi_server_executor.c
@@ -118,8 +118,7 @@ JobExecutorType(DistributedPlan *distributedPlan)
 	}
 	else
 	{
-		List *workerNodeList = ActiveReadableNonCoordinatorNodeList();
-		int workerNodeCount = list_length(workerNodeList);
+		int workerNodeCount = list_length(ActiveReadableNodeList());
 		int taskCount = list_length(job->taskList);
 
 		double tasksPerNode = taskCount / ((double) workerNodeCount);
diff --git a/src/backend/distributed/executor/multi_task_tracker_executor.c b/src/backend/distributed/executor/multi_task_tracker_executor.c
index d27fd2a9d..54ad1e8a6 100644
--- a/src/backend/distributed/executor/multi_task_tracker_executor.c
+++ b/src/backend/distributed/executor/multi_task_tracker_executor.c
@@ -209,7 +209,7 @@ MultiTaskTrackerExecute(Job *job)
 	 * assigning and checking the status of tasks. The second (temporary) hash
 	 * helps us in fetching results data from worker nodes to the master node.
 	 */
-	List *workerNodeList = ActivePrimaryNonCoordinatorNodeList(NoLock);
+	List *workerNodeList = ActivePrimaryNodeList(ShareLock);
 	uint32 taskTrackerCount = (uint32) list_length(workerNodeList);
 
 	/* connect as the current user for running queries */
diff --git a/src/backend/distributed/executor/repartition_join_execution.c b/src/backend/distributed/executor/repartition_join_execution.c
index 696c1ad9f..045f4b4f5 100644
--- a/src/backend/distributed/executor/repartition_join_execution.c
+++ b/src/backend/distributed/executor/repartition_join_execution.c
@@ -104,7 +104,7 @@ CreateTemporarySchemasForMergeTasks(Job *topLeveLJob)
 {
 	List *jobIds = ExtractJobsInJobTree(topLeveLJob);
 	char *createSchemasCommand = GenerateCreateSchemasCommand(jobIds, CurrentUserName());
-	SendCommandToWorkersInParallel(NON_COORDINATOR_NODES, createSchemasCommand,
+	SendCommandToWorkersInParallel(ALL_SHARD_NODES, createSchemasCommand,
 								   CitusExtensionOwnerName());
 	return jobIds;
 }
@@ -191,8 +191,8 @@ GenerateJobCommands(List *jobIds, char *templateCommand)
 void
 DoRepartitionCleanup(List *jobIds)
 {
-	SendCommandToWorkersOptionalInParallel(NON_COORDINATOR_NODES,
-										   GenerateDeleteJobsCommand(jobIds),
+	SendCommandToWorkersOptionalInParallel(ALL_SHARD_NODES, GenerateDeleteJobsCommand(
+											   jobIds),
 										   CitusExtensionOwnerName());
 }
diff --git a/src/backend/distributed/planner/multi_physical_planner.c b/src/backend/distributed/planner/multi_physical_planner.c
index 3e389e8a9..df8c4ec9b 100644
--- a/src/backend/distributed/planner/multi_physical_planner.c
+++ b/src/backend/distributed/planner/multi_physical_planner.c
@@ -2107,7 +2107,7 @@ BuildMapMergeJob(Query *jobQuery, List *dependentJobList, Var *partitionKey,
 static uint32
 HashPartitionCount(void)
 {
-	uint32 groupCount = ActiveReadableNonCoordinatorNodeCount();
+	uint32 groupCount = list_length(ActiveReadableNodeList());
 	double maxReduceTasksPerNode = MaxRunningTasksPerNode / 2.0;
 
 	uint32 partitionCount = (uint32) rint(groupCount * maxReduceTasksPerNode);
@@ -5717,7 +5717,7 @@ AssignDualHashTaskList(List *taskList)
 	 * if subsequent jobs have a small number of tasks, we won't allocate the
 	 * tasks to the same worker repeatedly.
 	 */
-	List *workerNodeList = ActiveReadableNonCoordinatorNodeList();
+	List *workerNodeList = ActiveReadableNodeList();
 	uint32 workerNodeCount = (uint32) list_length(workerNodeList);
 	uint32 beginningNodeIndex = jobId % workerNodeCount;
 
diff --git a/src/backend/distributed/transaction/worker_transaction.c b/src/backend/distributed/transaction/worker_transaction.c
index 1415e1d4a..bf65d1e04 100644
--- a/src/backend/distributed/transaction/worker_transaction.c
+++ b/src/backend/distributed/transaction/worker_transaction.c
@@ -130,7 +130,15 @@ SendCommandToWorkersWithMetadata(const char *command)
 List *
 TargetWorkerSetNodeList(TargetWorkerSet targetWorkerSet, LOCKMODE lockMode)
 {
-	List *workerNodeList = ActivePrimaryNonCoordinatorNodeList(lockMode);
+	List *workerNodeList = NIL;
+	if (targetWorkerSet == ALL_SHARD_NODES)
+	{
+		workerNodeList = ActivePrimaryNodeList(lockMode);
+	}
+	else
+	{
+		workerNodeList = ActivePrimaryNonCoordinatorNodeList(lockMode);
+	}
 	List *result = NIL;
 	WorkerNode *workerNode = NULL;
 
diff --git a/src/include/distributed/worker_transaction.h b/src/include/distributed/worker_transaction.h
index 7ea24fd97..3322596af 100644
--- a/src/include/distributed/worker_transaction.h
+++ b/src/include/distributed/worker_transaction.h
@@ -18,13 +18,13 @@
 /*
  * TargetWorkerSet is used for determining the type of workers that a command
- * is targeted to. Currently it doesn't include coordinator even if it is added
- * as a worker.
+ * is targeted to.
  */
 typedef enum TargetWorkerSet
 {
 	NON_COORDINATOR_METADATA_NODES,
-	NON_COORDINATOR_NODES
+	NON_COORDINATOR_NODES,
+	ALL_SHARD_NODES
 } TargetWorkerSet;
 
diff --git a/src/test/regress/expected/coordinator_shouldhaveshards.out b/src/test/regress/expected/coordinator_shouldhaveshards.out
index 58ad5cb09..da38c3108 100644
--- a/src/test/regress/expected/coordinator_shouldhaveshards.out
+++ b/src/test/regress/expected/coordinator_shouldhaveshards.out
@@ -198,13 +198,23 @@ NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinato
 (1 row)
 
 ROLLBACK;
--- Commented out since it currently does not clean up task files on the
--- coordinator
--- See this issue for details: https://github.com/citusdata/citus/issues/3996
--- BEGIN;
--- SET citus.enable_repartition_joins TO ON;
--- SELECT count(*) FROM test t1, test t2 WHERE t1.x = t2.y;
--- ROLLBACK;
+-- repartition queries should work fine
+SET citus.enable_repartition_joins TO ON;
+SELECT count(*) FROM test t1, test t2 WHERE t1.x = t2.y;
+ count
+---------------------------------------------------------------------
+   100
+(1 row)
+
+BEGIN;
+SET citus.enable_repartition_joins TO ON;
+SELECT count(*) FROM test t1, test t2 WHERE t1.x = t2.y;
+ count
+---------------------------------------------------------------------
+   100
+(1 row)
+
+END;
 BEGIN;
 SET citus.enable_repartition_joins TO ON;
 -- trigger local execution
@@ -446,6 +456,7 @@ BEGIN;
 -- copying task
 INSERT INTO dist_table SELECT a + 1 FROM dist_table;
 ROLLBACK;
+SET citus.shard_replication_factor TO 1;
 BEGIN;
 SET citus.shard_replication_factor TO 2;
 CREATE TABLE dist_table1(a int);
@@ -466,11 +477,12 @@ RESET citus.enable_cte_inlining;
 DELETE FROM test;
 DROP TABLE test;
 DROP TABLE dist_table;
+DROP TABLE ref;
+NOTICE: executing the command locally: DROP TABLE IF EXISTS coordinator_shouldhaveshards.ref_xxxxx CASCADE
+CONTEXT: SQL statement "SELECT master_drop_all_shards(v_obj.objid, v_obj.schema_name, v_obj.object_name)"
+PL/pgSQL function citus_drop_trigger() line 19 at PERFORM
 DROP SCHEMA coordinator_shouldhaveshards CASCADE;
-NOTICE: drop cascades to 3 other objects
-DETAIL: drop cascades to table ref
-drop cascades to table ref_1503016
-drop cascades to table local
+NOTICE: drop cascades to table local
 SELECT 1 FROM master_set_node_property('localhost', :master_port, 'shouldhaveshards', false);
  ?column?
 ---------------------------------------------------------------------
diff --git a/src/test/regress/expected/follower_single_node.out b/src/test/regress/expected/follower_single_node.out
index 785206709..a4cd51561 100644
--- a/src/test/regress/expected/follower_single_node.out
+++ b/src/test/regress/expected/follower_single_node.out
@@ -34,6 +34,55 @@ CREATE TABLE local(c int, d int);
 INSERT INTO test VALUES (1, 2), (3, 4), (5, 6), (2, 7), (4, 5);
 INSERT INTO ref VALUES (1, 2), (5, 6), (7, 8);
 INSERT INTO local VALUES (1, 2), (3, 4), (7, 8);
+-- Check repartion joins are supported
+SET citus.enable_repartition_joins TO ON;
+SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x;
+ x | y | x | y
+---------------------------------------------------------------------
+ 2 | 7 | 1 | 2
+ 4 | 5 | 3 | 4
+ 5 | 6 | 4 | 5
+(3 rows)
+
+SET citus.enable_single_hash_repartition_joins TO ON;
+SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x;
+ x | y | x | y
+---------------------------------------------------------------------
+ 2 | 7 | 1 | 2
+ 4 | 5 | 3 | 4
+ 5 | 6 | 4 | 5
+(3 rows)
+
+RESET citus.enable_single_hash_repartition_joins;
+SET citus.task_assignment_policy TO 'round-robin';
+SET citus.enable_single_hash_repartition_joins TO ON;
+SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x;
+ x | y | x | y
+---------------------------------------------------------------------
+ 2 | 7 | 1 | 2
+ 4 | 5 | 3 | 4
+ 5 | 6 | 4 | 5
+(3 rows)
+
+SET citus.task_assignment_policy TO 'greedy';
+SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x;
+ x | y | x | y
+---------------------------------------------------------------------
+ 2 | 7 | 1 | 2
+ 4 | 5 | 3 | 4
+ 5 | 6 | 4 | 5
+(3 rows)
+
+SET citus.task_assignment_policy TO 'first-replica';
+SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x;
+ x | y | x | y
+---------------------------------------------------------------------
+ 2 | 7 | 1 | 2
+ 4 | 5 | 3 | 4
+ 5 | 6 | 4 | 5
+(3 rows)
+
+RESET citus.enable_repartition_joins;
 -- connect to the follower and check that a simple select query works, the follower
 -- is still in the default cluster and will send queries to the primary nodes
 \c - - - :follower_master_port
@@ -102,15 +151,29 @@ SELECT * FROM ref, local WHERE a = c ORDER BY a;
  7 | 8 | 7 | 8
 (2 rows)
 
--- Check repartion joins are support
-SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x;
-ERROR: floating-point exception
-DETAIL: An invalid floating-point operation was signaled. This probably means an out-of-range result or an invalid operation, such as division by zero.
 SET citus.enable_repartition_joins TO ON;
 SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x;
-ERROR: floating-point exception
-DETAIL: An invalid floating-point operation was signaled. This probably means an out-of-range result or an invalid operation, such as division by zero.
+ERROR: writing to worker nodes is not currently allowed
+DETAIL: the database is read-only
+SET citus.enable_single_hash_repartition_joins TO ON;
+SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x;
+ERROR: writing to worker nodes is not currently allowed
+DETAIL: the database is read-only
+SET citus.task_assignment_policy TO 'round-robin';
+SET citus.enable_single_hash_repartition_joins TO ON;
+SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x;
+ERROR: writing to worker nodes is not currently allowed
+DETAIL: the database is read-only
+SET citus.task_assignment_policy TO 'greedy';
+SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x;
+ERROR: writing to worker nodes is not currently allowed
+DETAIL: the database is read-only
+SET citus.task_assignment_policy TO 'first-replica';
+SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x;
+ERROR: writing to worker nodes is not currently allowed
+DETAIL: the database is read-only
 RESET citus.enable_repartition_joins;
+RESET citus.enable_single_hash_repartition_joins;
 -- Confirm that dummy placements work
 SELECT count(*) FROM test WHERE false;
  count
 ---------------------------------------------------------------------
      0
 (1 row)
@@ -136,6 +199,30 @@ SELECT count(*) FROM test WHERE false GROUP BY GROUPING SETS (x,y);
 ---------------------------------------------------------------------
 (0 rows)
 
+SET citus.task_assignment_policy TO 'greedy';
+SELECT count(*) FROM test WHERE false;
+ count
+---------------------------------------------------------------------
+     0
+(1 row)
+
+SELECT count(*) FROM test WHERE false GROUP BY GROUPING SETS (x,y);
+ count
+---------------------------------------------------------------------
+(0 rows)
+
+SET citus.task_assignment_policy TO 'first-replica';
+SELECT count(*) FROM test WHERE false;
+ count
+---------------------------------------------------------------------
+     0
+(1 row)
+
+SELECT count(*) FROM test WHERE false GROUP BY GROUPING SETS (x,y);
+ count
+---------------------------------------------------------------------
+(0 rows)
+
 RESET citus.task_assignment_policy;
 -- now, connect to the follower but tell it to use secondary nodes. There are no
 -- secondary nodes so this should fail.
@@ -226,15 +313,16 @@ SELECT * FROM ref, local WHERE a = c ORDER BY a;
  7 | 8 | 7 | 8
 (2 rows)
 
--- Check repartion joins are support
-SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x;
-ERROR: floating-point exception
-DETAIL: An invalid floating-point operation was signaled. This probably means an out-of-range result or an invalid operation, such as division by zero.
 SET citus.enable_repartition_joins TO ON;
 SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x;
-ERROR: floating-point exception
-DETAIL: An invalid floating-point operation was signaled. This probably means an out-of-range result or an invalid operation, such as division by zero.
+ERROR: writing to worker nodes is not currently allowed
+DETAIL: the database is read-only
+SET citus.enable_single_hash_repartition_joins TO ON;
+SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x;
+ERROR: writing to worker nodes is not currently allowed
+DETAIL: the database is read-only
 RESET citus.enable_repartition_joins;
+RESET citus.enable_single_hash_repartition_joins;
 -- Confirm that dummy placements work
 SELECT count(*) FROM test WHERE false;
  count
diff --git a/src/test/regress/expected/multi_follower_select_statements.out b/src/test/regress/expected/multi_follower_select_statements.out
index 3f4340d61..73ce25895 100644
--- a/src/test/regress/expected/multi_follower_select_statements.out
+++ b/src/test/regress/expected/multi_follower_select_statements.out
@@ -33,6 +33,21 @@ SELECT create_distributed_table('stock','s_w_id');
 (1 row)
 
 INSERT INTO stock SELECT c, c, c FROM generate_series(1, 5) as c;
+SET citus.enable_repartition_joins TO ON;
+SELECT count(*) FROM the_table t1 JOIN the_table t2 USING(b);
+ count
+---------------------------------------------------------------------
+     2
+(1 row)
+
+SET citus.enable_single_hash_repartition_joins TO ON;
+SELECT count(*) FROM the_table t1 , the_table t2 WHERE t1.a = t2.b;
+ count
+---------------------------------------------------------------------
+     2
+(1 row)
+
+RESET citus.enable_repartition_joins;
 -- connect to the follower and check that a simple select query works, the follower
 -- is still in the default cluster and will send queries to the primary nodes
 \c - - - :follower_master_port
@@ -100,6 +115,14 @@ order by s_i_id;
  5 | 5
 (3 rows)
 
+SET citus.enable_repartition_joins TO ON;
+SELECT count(*) FROM the_table t1 JOIN the_table t2 USING(b);
+ERROR: writing to worker nodes is not currently allowed
+DETAIL: the database is read-only
+SET citus.enable_single_hash_repartition_joins TO ON;
+SELECT count(*) FROM the_table t1 , the_table t2 WHERE t1.a = t2.b;
+ERROR: writing to worker nodes is not currently allowed
+DETAIL: the database is read-only
 SELECT
 node_name, node_port
 FROM
diff --git a/src/test/regress/expected/single_node.out b/src/test/regress/expected/single_node.out
index 31561e4fb..e71c3ad79 100644
--- a/src/test/regress/expected/single_node.out
+++ b/src/test/regress/expected/single_node.out
@@ -101,15 +101,55 @@ SELECT * FROM ref, local WHERE a = c ORDER BY a;
  7 | 8 | 7 | 8
 (2 rows)
 
--- Check repartion joins are support
-SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x;
-ERROR: floating-point exception
-DETAIL: An invalid floating-point operation was signaled. This probably means an out-of-range result or an invalid operation, such as division by zero.
+-- Check repartion joins are supported
 SET citus.enable_repartition_joins TO ON;
 SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x;
-ERROR: floating-point exception
-DETAIL: An invalid floating-point operation was signaled. This probably means an out-of-range result or an invalid operation, such as division by zero.
+ x | y | x | y
+---------------------------------------------------------------------
+ 2 | 7 | 1 | 2
+ 4 | 5 | 3 | 4
+ 5 | 6 | 4 | 5
+(3 rows)
+
+SET citus.enable_single_hash_repartition_joins TO ON;
+SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x;
+ x | y | x | y
+---------------------------------------------------------------------
+ 2 | 7 | 1 | 2
+ 4 | 5 | 3 | 4
+ 5 | 6 | 4 | 5
+(3 rows)
+
+SET citus.task_assignment_policy TO 'round-robin';
+SET citus.enable_single_hash_repartition_joins TO ON;
+SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x;
+ x | y | x | y
+---------------------------------------------------------------------
+ 2 | 7 | 1 | 2
+ 4 | 5 | 3 | 4
+ 5 | 6 | 4 | 5
+(3 rows)
+
+SET citus.task_assignment_policy TO 'greedy';
+SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x;
+ x | y | x | y
+---------------------------------------------------------------------
+ 2 | 7 | 1 | 2
+ 4 | 5 | 3 | 4
+ 5 | 6 | 4 | 5
+(3 rows)
+
+SET citus.task_assignment_policy TO 'first-replica';
+SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x;
+ x | y | x | y
+---------------------------------------------------------------------
+ 2 | 7 | 1 | 2
+ 4 | 5 | 3 | 4
+ 5 | 6 | 4 | 5
+(3 rows)
+
 RESET citus.enable_repartition_joins;
+RESET citus.enable_single_hash_repartition_joins;
 -- INSERT SELECT router
 BEGIN;
 INSERT INTO test(x, y) SELECT x, y FROM test WHERE x = 1;
@@ -226,6 +266,51 @@ SELECT count(*) FROM test WHERE false GROUP BY GROUPING SETS (x,y);
 (0 rows)
 
 RESET citus.task_assignment_policy;
+-- single node task tracker tests:
+SET citus.task_executor_type to 'task-tracker';
+SELECT count(*) FROM test;
+ count
+---------------------------------------------------------------------
+     5
+(1 row)
+
+-- INSERT SELECT from distributed table to local table
+BEGIN;
+INSERT INTO ref(a, b) SELECT x, y FROM test;
+SELECT count(*) from ref;
+ERROR: cannot switch local execution status from local execution required to local execution disabled since it can cause visibility problems in the current transaction
+ROLLBACK;
+-- INSERT SELECT from distributed table to local table
+BEGIN;
+INSERT INTO ref(a, b) SELECT c, d FROM local;
+SELECT count(*) from ref;
+ count
+---------------------------------------------------------------------
+     6
+(1 row)
+
+ROLLBACK;
+-- INSERT SELECT from distributed table to local table
+BEGIN;
+INSERT INTO local(c, d) SELECT x, y FROM test;
+SELECT count(*) from local;
+ count
+---------------------------------------------------------------------
+     8
+(1 row)
+
+ROLLBACK;
+-- INSERT SELECT from distributed table to local table
+BEGIN;
+INSERT INTO local(c, d) SELECT a, b FROM ref;
+SELECT count(*) from local;
+ count
+---------------------------------------------------------------------
+     6
+(1 row)
+
+ROLLBACK;
+RESET citus.task_executor_type;
 -- Cleanup
 SET client_min_messages TO WARNING;
 DROP SCHEMA single_node CASCADE;
diff --git a/src/test/regress/sql/coordinator_shouldhaveshards.sql b/src/test/regress/sql/coordinator_shouldhaveshards.sql
index 8816579d2..18ebcb55a 100644
--- a/src/test/regress/sql/coordinator_shouldhaveshards.sql
+++ b/src/test/regress/sql/coordinator_shouldhaveshards.sql
@@ -87,13 +87,14 @@ SELECT create_distributed_table('dist_table', 'a', colocate_with := 'none');
 SELECT count(*) FROM dist_table;
 ROLLBACK;
 
--- Commented out since it currently does not clean up task files on the
--- coordinator
--- See this issue for details: https://github.com/citusdata/citus/issues/3996
--- BEGIN;
--- SET citus.enable_repartition_joins TO ON;
--- SELECT count(*) FROM test t1, test t2 WHERE t1.x = t2.y;
--- ROLLBACK;
+-- repartition queries should work fine
+SET citus.enable_repartition_joins TO ON;
+SELECT count(*) FROM test t1, test t2 WHERE t1.x = t2.y;
+
+BEGIN;
+SET citus.enable_repartition_joins TO ON;
+SELECT count(*) FROM test t1, test t2 WHERE t1.x = t2.y;
+END;
 
 BEGIN;
 SET citus.enable_repartition_joins TO ON;
@@ -190,6 +191,7 @@ BEGIN;
 -- copying task
 INSERT INTO dist_table SELECT a + 1 FROM dist_table;
 ROLLBACK;
+SET citus.shard_replication_factor TO 1;
 
 BEGIN;
 SET citus.shard_replication_factor TO 2;
@@ -204,6 +206,7 @@ RESET citus.enable_cte_inlining;
 DELETE FROM test;
 DROP TABLE test;
 DROP TABLE dist_table;
+DROP TABLE ref;
 
 DROP SCHEMA coordinator_shouldhaveshards CASCADE;
 
diff --git a/src/test/regress/sql/follower_single_node.sql b/src/test/regress/sql/follower_single_node.sql
index 8fc26346d..9d0d38db1 100644
--- a/src/test/regress/sql/follower_single_node.sql
+++ b/src/test/regress/sql/follower_single_node.sql
@@ -18,6 +18,27 @@ INSERT INTO test VALUES (1, 2), (3, 4), (5, 6), (2, 7), (4, 5);
 INSERT INTO ref VALUES (1, 2), (5, 6), (7, 8);
 INSERT INTO local VALUES (1, 2), (3, 4), (7, 8);
 
+-- Check repartion joins are supported
+SET citus.enable_repartition_joins TO ON;
+SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x;
+SET citus.enable_single_hash_repartition_joins TO ON;
+SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x;
+RESET citus.enable_single_hash_repartition_joins;
+
+SET citus.task_assignment_policy TO 'round-robin';
+SET citus.enable_single_hash_repartition_joins TO ON;
+SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x;
+
+SET citus.task_assignment_policy TO 'greedy';
+SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x;
+
+SET citus.task_assignment_policy TO 'first-replica';
+SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x;
+
+RESET citus.enable_repartition_joins;
+
+
+
 -- connect to the follower and check that a simple select query works, the follower
 -- is still in the default cluster and will send queries to the primary nodes
 \c - - - :follower_master_port
@@ -34,11 +55,23 @@ SELECT count(*) FROM local;
 SELECT * FROM local ORDER BY c;
 SELECT * FROM ref, local WHERE a = c ORDER BY a;
 
--- Check repartion joins are support
-SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x;
 SET citus.enable_repartition_joins TO ON;
 SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x;
+SET citus.enable_single_hash_repartition_joins TO ON;
+SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x;
+
+SET citus.task_assignment_policy TO 'round-robin';
+SET citus.enable_single_hash_repartition_joins TO ON;
+SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x;
+
+SET citus.task_assignment_policy TO 'greedy';
+SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x;
+
+SET citus.task_assignment_policy TO 'first-replica';
+SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x;
+
 RESET citus.enable_repartition_joins;
+RESET citus.enable_single_hash_repartition_joins;
 
 -- Confirm that dummy placements work
 SELECT count(*) FROM test WHERE false;
@@ -47,6 +80,12 @@ SELECT count(*) FROM test WHERE false GROUP BY GROUPING SETS (x,y);
 SET citus.task_assignment_policy TO 'round-robin';
 SELECT count(*) FROM test WHERE false;
 SELECT count(*) FROM test WHERE false GROUP BY GROUPING SETS (x,y);
+SET citus.task_assignment_policy TO 'greedy';
+SELECT count(*) FROM test WHERE false;
+SELECT count(*) FROM test WHERE false GROUP BY GROUPING SETS (x,y);
+SET citus.task_assignment_policy TO 'first-replica';
+SELECT count(*) FROM test WHERE false;
+SELECT count(*) FROM test WHERE false GROUP BY GROUPING SETS (x,y);
 
 RESET citus.task_assignment_policy;
 
@@ -82,11 +121,12 @@ SELECT count(*) FROM local;
 SELECT * FROM local ORDER BY c;
 SELECT * FROM ref, local WHERE a = c ORDER BY a;
 
--- Check repartion joins are support
-SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x;
 SET citus.enable_repartition_joins TO ON;
 SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x;
+SET citus.enable_single_hash_repartition_joins TO ON;
+SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x;
 RESET citus.enable_repartition_joins;
+RESET citus.enable_single_hash_repartition_joins;
 
 -- Confirm that dummy placements work
 SELECT count(*) FROM test WHERE false;
diff --git a/src/test/regress/sql/multi_follower_select_statements.sql b/src/test/regress/sql/multi_follower_select_statements.sql
index 9fa8dc8ba..436b08cda 100644
--- a/src/test/regress/sql/multi_follower_select_statements.sql
+++ b/src/test/regress/sql/multi_follower_select_statements.sql
@@ -21,6 +21,14 @@ SELECT create_distributed_table('stock','s_w_id');
 
 INSERT INTO stock SELECT c, c, c FROM generate_series(1, 5) as c;
 
+SET citus.enable_repartition_joins TO ON;
+SELECT count(*) FROM the_table t1 JOIN the_table t2 USING(b);
+
+SET citus.enable_single_hash_repartition_joins TO ON;
+
+SELECT count(*) FROM the_table t1 , the_table t2 WHERE t1.a = t2.b;
+RESET citus.enable_repartition_joins;
+
 -- connect to the follower and check that a simple select query works, the follower
 -- is still in the default cluster and will send queries to the primary nodes
 \c - - - :follower_master_port
@@ -66,6 +74,13 @@ group by s_i_id
 having sum(s_order_cnt) > (select max(s_order_cnt) - 3 as having_query from stock)
 order by s_i_id;
 
+SET citus.enable_repartition_joins TO ON;
+SELECT count(*) FROM the_table t1 JOIN the_table t2 USING(b);
+
+SET citus.enable_single_hash_repartition_joins TO ON;
+
+SELECT count(*) FROM the_table t1 , the_table t2 WHERE t1.a = t2.b;
+
 SELECT
 node_name, node_port
diff --git a/src/test/regress/sql/single_node.sql b/src/test/regress/sql/single_node.sql
index ce0e36285..e11701ef5 100644
--- a/src/test/regress/sql/single_node.sql
+++ b/src/test/regress/sql/single_node.sql
@@ -33,11 +33,24 @@ SELECT count(*) FROM local;
 SELECT * FROM local ORDER BY c;
 SELECT * FROM ref, local WHERE a = c ORDER BY a;
 
--- Check repartion joins are support
-SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x;
+-- Check repartion joins are supported
 SET citus.enable_repartition_joins TO ON;
 SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x;
+SET citus.enable_single_hash_repartition_joins TO ON;
+SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x;
+
+SET citus.task_assignment_policy TO 'round-robin';
+SET citus.enable_single_hash_repartition_joins TO ON;
+SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x;
+
+SET citus.task_assignment_policy TO 'greedy';
+SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x;
+
+SET citus.task_assignment_policy TO 'first-replica';
+SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x;
+
 RESET citus.enable_repartition_joins;
+RESET citus.enable_single_hash_repartition_joins;
 
 -- INSERT SELECT router
 BEGIN;
@@ -102,6 +115,35 @@ SELECT count(*) FROM test WHERE false;
 SELECT count(*) FROM test WHERE false GROUP BY GROUPING SETS (x,y);
 
 RESET citus.task_assignment_policy;
+-- single node task tracker tests:
+SET citus.task_executor_type to 'task-tracker';
+SELECT count(*) FROM test;
+
+-- INSERT SELECT from distributed table to local table
+BEGIN;
+INSERT INTO ref(a, b) SELECT x, y FROM test;
+SELECT count(*) from ref;
+ROLLBACK;
+
+-- INSERT SELECT from distributed table to local table
+BEGIN;
+INSERT INTO ref(a, b) SELECT c, d FROM local;
+SELECT count(*) from ref;
+ROLLBACK;
+
+-- INSERT SELECT from distributed table to local table
+BEGIN;
+INSERT INTO local(c, d) SELECT x, y FROM test;
+SELECT count(*) from local;
+ROLLBACK;
+
+-- INSERT SELECT from distributed table to local table
+BEGIN;
+INSERT INTO local(c, d) SELECT a, b FROM ref;
+SELECT count(*) from local;
+ROLLBACK;
+
+RESET citus.task_executor_type;
 
 -- Cleanup
 SET client_min_messages TO WARNING;
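
Note on the executor changes above (not part of the patch): before this change, a cluster whose only node is the coordinator had an empty ActiveReadableNonCoordinatorNodeList(), so the task-assignment arithmetic in JobExecutorType() and AssignDualHashTaskList() divided and took a modulo by a zero node count; that is the "floating-point exception ... division by zero" error removed from the expected output files, and switching to the coordinator-inclusive node lists is what makes the divisor at least one. The sketch below is a minimal, self-contained illustration of that arithmetic with made-up counts; it is not Citus code, and the variable names only mirror the functions touched in the diff.

```c
#include <stdio.h>

int
main(void)
{
	/* made-up counts for illustration only */
	int taskCount = 4;
	int jobId = 7;
	int nonCoordinatorNodeCount = 0; /* old list on a coordinator-only cluster */
	int allNodeCount = 1;            /* new list: the coordinator is included */

	/*
	 * Old behaviour: with zero nodes in the list, integer expressions such as
	 * "jobId % workerNodeCount" trap with SIGFPE, which PostgreSQL reports as
	 * a "floating-point exception".
	 */
	if (nonCoordinatorNodeCount == 0)
	{
		printf("old node list is empty: jobId %% workerNodeCount would divide by zero\n");
	}

	/* New behaviour: the divisor is at least 1, so the math is well defined. */
	printf("tasks per node: %.2f\n", taskCount / (double) allNodeCount);
	printf("beginning node index: %d\n", jobId % allNodeCount);

	return 0;
}
```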