From db1b78148c877f0d1ad61d93a106694f494728f3 Mon Sep 17 00:00:00 2001
From: Sait Talha Nisanci
Date: Wed, 8 Jul 2020 21:05:58 +0300
Subject: [PATCH 1/5] send schema creation/cleanup to coordinator in repartitions

We were using the ALL_WORKERS TargetWorkerSet while sending temporary schema
creation and cleanup. We (well, mostly I) thought that ALL_WORKERS would also
include the coordinator when it is added as a worker. It turns out that it was
FILTERING OUT the coordinator even if it is added as a worker to the cluster.

For some context: in repartitions, for each jobId we create (or at least were
supposed to create) a schema on each worker node in the cluster. Then we
partition each shard table into some intermediate files; this is the PARTITION
step. After this step each node holds some intermediate files containing
tuples. Then we fetch the partition files to the worker nodes that need them;
this is the FETCH step. Then, from those files, we create intermediate tables
in the temporarily created schemas; this is the MERGE step. After evaluating
the result, we remove the temporary schemas (one per job ID on each node) and
the files.

If node 1 has file1 and node 2 has file2 after the PARTITION step, it is
enough to either move file1 from node1 to node2 or vice versa, so we prune one
of those fetches.

In the MERGE step, if the schema for a given jobId doesn't exist, the node
tries to use the `public` schema if it is a superuser; that fallback was
actually added for testing in the past.

So when we were not sending the schema creation commands for each job ID to
the coordinator (because we were using the ALL_WORKERS flag, which doesn't
include the coordinator), we would basically not have any schemas for
repartitions on the coordinator. The PARTITION step would still be executed on
the coordinator (because the tasks are generated in the planner part) and it
wouldn't give us any error, because it doesn't have anything to do with the
temporary schemas (that we didn't create). But later two things could happen:

- If by chance the fetch is pruned on the coordinator side, the other nodes
  would fetch the partitioned files from the coordinator and execute the query
  as expected, because they have all the information.
- If the fetch tasks are not pruned on the coordinator, then in the MERGE step
  the coordinator would either error out saying that the necessary schema
  doesn't exist, or it would try to create the temporary tables under the
  public schema (if it is a superuser). But then, if we had the same task ID
  with a different jobID, it would fail saying that the table already exists,
  which is an error we were getting.

In the first case the query would work okay, but it would still not do the
cleanup, so we would leave the partitioned files from the PARTITION step
behind, and ensure_no_intermediate_data_leak would fail.

To make things more explicit and prevent such bugs in the future, ALL_WORKERS
is renamed to ALL_NON_COORD_WORKERS, and a new flag, ALL_DATA_NODES, is added
to return all the active nodes. For the repartition case we don't strictly
need the nodes that only hold reference tables, but this version keeps the
code simpler and there shouldn't be any significant performance issue with
that.
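
For reference, the gist of the change on the sending side is the new branch in
TargetWorkerSetNodeList. This is only a condensed sketch of the hunk in
worker_transaction.c further down; the helpers ActivePrimaryNodeList and
ActivePrimaryNonCoordinatorNodeList come from the existing node manager code,
and the real function goes on to do further filtering on the returned list:

    /*
     * Sketch: only the new ALL_DATA_NODES target returns the coordinator
     * (when it is added as a worker); the NON_COORDINATOR_* targets keep
     * filtering it out as before.
     */
    List *
    TargetWorkerSetNodeList(TargetWorkerSet targetWorkerSet, LOCKMODE lockMode)
    {
        List *workerNodeList = NIL;

        if (targetWorkerSet == ALL_DATA_NODES)
        {
            /* all active primary nodes, coordinator included */
            workerNodeList = ActivePrimaryNodeList(lockMode);
        }
        else
        {
            /* coordinator is excluded for the other targets */
            workerNodeList = ActivePrimaryNonCoordinatorNodeList(lockMode);
        }

        return workerNodeList;
    }

With this, the temporary-schema creation and the repartition cleanup commands
simply pass ALL_DATA_NODES instead of the non-coordinator target, so the
coordinator gets (and later drops) a schema per job ID like every other node.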
--- .../executor/repartition_join_execution.c | 6 ++--- .../transaction/worker_transaction.c | 10 +++++++- src/include/distributed/worker_transaction.h | 3 ++- .../expected/coordinator_shouldhaveshards.out | 24 +++++++++++++------ .../sql/coordinator_shouldhaveshards.sql | 15 ++++++------ 5 files changed, 39 insertions(+), 19 deletions(-) diff --git a/src/backend/distributed/executor/repartition_join_execution.c b/src/backend/distributed/executor/repartition_join_execution.c index 696c1ad9f..7809008de 100644 --- a/src/backend/distributed/executor/repartition_join_execution.c +++ b/src/backend/distributed/executor/repartition_join_execution.c @@ -104,7 +104,7 @@ CreateTemporarySchemasForMergeTasks(Job *topLeveLJob) { List *jobIds = ExtractJobsInJobTree(topLeveLJob); char *createSchemasCommand = GenerateCreateSchemasCommand(jobIds, CurrentUserName()); - SendCommandToWorkersInParallel(NON_COORDINATOR_NODES, createSchemasCommand, + SendCommandToWorkersInParallel(ALL_DATA_NODES, createSchemasCommand, CitusExtensionOwnerName()); return jobIds; } @@ -191,8 +191,8 @@ GenerateJobCommands(List *jobIds, char *templateCommand) void DoRepartitionCleanup(List *jobIds) { - SendCommandToWorkersOptionalInParallel(NON_COORDINATOR_NODES, - GenerateDeleteJobsCommand(jobIds), + SendCommandToWorkersOptionalInParallel(ALL_DATA_NODES, GenerateDeleteJobsCommand( + jobIds), CitusExtensionOwnerName()); } diff --git a/src/backend/distributed/transaction/worker_transaction.c b/src/backend/distributed/transaction/worker_transaction.c index 1415e1d4a..31a8e9073 100644 --- a/src/backend/distributed/transaction/worker_transaction.c +++ b/src/backend/distributed/transaction/worker_transaction.c @@ -130,7 +130,15 @@ SendCommandToWorkersWithMetadata(const char *command) List * TargetWorkerSetNodeList(TargetWorkerSet targetWorkerSet, LOCKMODE lockMode) { - List *workerNodeList = ActivePrimaryNonCoordinatorNodeList(lockMode); + List *workerNodeList = NIL; + if (targetWorkerSet == ALL_DATA_NODES) + { + workerNodeList = ActivePrimaryNodeList(lockMode); + } + else + { + workerNodeList = ActivePrimaryNonCoordinatorNodeList(lockMode); + } List *result = NIL; WorkerNode *workerNode = NULL; diff --git a/src/include/distributed/worker_transaction.h b/src/include/distributed/worker_transaction.h index 7ea24fd97..21aa47e34 100644 --- a/src/include/distributed/worker_transaction.h +++ b/src/include/distributed/worker_transaction.h @@ -24,7 +24,8 @@ typedef enum TargetWorkerSet { NON_COORDINATOR_METADATA_NODES, - NON_COORDINATOR_NODES + NON_COORDINATOR_NODES, + ALL_DATA_NODES } TargetWorkerSet; diff --git a/src/test/regress/expected/coordinator_shouldhaveshards.out b/src/test/regress/expected/coordinator_shouldhaveshards.out index 58ad5cb09..40f239547 100644 --- a/src/test/regress/expected/coordinator_shouldhaveshards.out +++ b/src/test/regress/expected/coordinator_shouldhaveshards.out @@ -198,13 +198,23 @@ NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinato (1 row) ROLLBACK; --- Commented out since it currently does not clean up task files on the --- coordinator --- See this issue for details: https://github.com/citusdata/citus/issues/3996 --- BEGIN; --- SET citus.enable_repartition_joins TO ON; --- SELECT count(*) FROM test t1, test t2 WHERE t1.x = t2.y; --- ROLLBACK; +-- repartition queries should work fine +SET citus.enable_repartition_joins TO ON; +SELECT count(*) FROM test t1, test t2 WHERE t1.x = t2.y; + count +--------------------------------------------------------------------- + 100 +(1 row) + +BEGIN; 
+SET citus.enable_repartition_joins TO ON; +SELECT count(*) FROM test t1, test t2 WHERE t1.x = t2.y; + count +--------------------------------------------------------------------- + 100 +(1 row) + +END; BEGIN; SET citus.enable_repartition_joins TO ON; -- trigger local execution diff --git a/src/test/regress/sql/coordinator_shouldhaveshards.sql b/src/test/regress/sql/coordinator_shouldhaveshards.sql index 8816579d2..0707be471 100644 --- a/src/test/regress/sql/coordinator_shouldhaveshards.sql +++ b/src/test/regress/sql/coordinator_shouldhaveshards.sql @@ -87,13 +87,14 @@ SELECT create_distributed_table('dist_table', 'a', colocate_with := 'none'); SELECT count(*) FROM dist_table; ROLLBACK; --- Commented out since it currently does not clean up task files on the --- coordinator --- See this issue for details: https://github.com/citusdata/citus/issues/3996 --- BEGIN; --- SET citus.enable_repartition_joins TO ON; --- SELECT count(*) FROM test t1, test t2 WHERE t1.x = t2.y; --- ROLLBACK; +-- repartition queries should work fine +SET citus.enable_repartition_joins TO ON; +SELECT count(*) FROM test t1, test t2 WHERE t1.x = t2.y; + +BEGIN; +SET citus.enable_repartition_joins TO ON; +SELECT count(*) FROM test t1, test t2 WHERE t1.x = t2.y; +END; BEGIN; SET citus.enable_repartition_joins TO ON; From d97d03ec65a01aa1e24a9261116aee48cf1a89df Mon Sep 17 00:00:00 2001 From: Sait Talha Nisanci Date: Wed, 8 Jul 2020 21:55:59 +0300 Subject: [PATCH 2/5] use ActivePrimaryNodeList to include coordinator ActiveReadableWorkerNodeList doesn't include coordinator, however if coordinator is added as a worker, we should also include that while planning. The current methods are very easily misusable and this requires a refactoring to make the distinction between methods that include coordinator and that don't very explicit as they can introduce subtle/major bugs pretty easily. --- .../operations/worker_node_manager.c | 1 - .../planner/multi_physical_planner.c | 4 +-- .../expected/coordinator_shouldhaveshards.out | 10 +++--- .../regress/expected/follower_single_node.out | 33 ++++++++++--------- src/test/regress/expected/single_node.out | 15 ++++++--- .../sql/coordinator_shouldhaveshards.sql | 2 ++ src/test/regress/sql/follower_single_node.sql | 15 ++++----- src/test/regress/sql/single_node.sql | 2 +- 8 files changed, 45 insertions(+), 37 deletions(-) diff --git a/src/backend/distributed/operations/worker_node_manager.c b/src/backend/distributed/operations/worker_node_manager.c index 565215f15..8cd99adda 100644 --- a/src/backend/distributed/operations/worker_node_manager.c +++ b/src/backend/distributed/operations/worker_node_manager.c @@ -319,7 +319,6 @@ ActiveReadableNonCoordinatorNodeCount(void) return liveWorkerCount; } - /* * NodeIsCoordinator returns true if the given node represents the coordinator. 
*/ diff --git a/src/backend/distributed/planner/multi_physical_planner.c b/src/backend/distributed/planner/multi_physical_planner.c index 3e389e8a9..df8c4ec9b 100644 --- a/src/backend/distributed/planner/multi_physical_planner.c +++ b/src/backend/distributed/planner/multi_physical_planner.c @@ -2107,7 +2107,7 @@ BuildMapMergeJob(Query *jobQuery, List *dependentJobList, Var *partitionKey, static uint32 HashPartitionCount(void) { - uint32 groupCount = ActiveReadableNonCoordinatorNodeCount(); + uint32 groupCount = list_length(ActiveReadableNodeList()); double maxReduceTasksPerNode = MaxRunningTasksPerNode / 2.0; uint32 partitionCount = (uint32) rint(groupCount * maxReduceTasksPerNode); @@ -5717,7 +5717,7 @@ AssignDualHashTaskList(List *taskList) * if subsequent jobs have a small number of tasks, we won't allocate the * tasks to the same worker repeatedly. */ - List *workerNodeList = ActiveReadableNonCoordinatorNodeList(); + List *workerNodeList = ActiveReadableNodeList(); uint32 workerNodeCount = (uint32) list_length(workerNodeList); uint32 beginningNodeIndex = jobId % workerNodeCount; diff --git a/src/test/regress/expected/coordinator_shouldhaveshards.out b/src/test/regress/expected/coordinator_shouldhaveshards.out index 40f239547..da38c3108 100644 --- a/src/test/regress/expected/coordinator_shouldhaveshards.out +++ b/src/test/regress/expected/coordinator_shouldhaveshards.out @@ -456,6 +456,7 @@ BEGIN; -- copying task INSERT INTO dist_table SELECT a + 1 FROM dist_table; ROLLBACK; +SET citus.shard_replication_factor TO 1; BEGIN; SET citus.shard_replication_factor TO 2; CREATE TABLE dist_table1(a int); @@ -476,11 +477,12 @@ RESET citus.enable_cte_inlining; DELETE FROM test; DROP TABLE test; DROP TABLE dist_table; +DROP TABLE ref; +NOTICE: executing the command locally: DROP TABLE IF EXISTS coordinator_shouldhaveshards.ref_xxxxx CASCADE +CONTEXT: SQL statement "SELECT master_drop_all_shards(v_obj.objid, v_obj.schema_name, v_obj.object_name)" +PL/pgSQL function citus_drop_trigger() line 19 at PERFORM DROP SCHEMA coordinator_shouldhaveshards CASCADE; -NOTICE: drop cascades to 3 other objects -DETAIL: drop cascades to table ref -drop cascades to table ref_1503016 -drop cascades to table local +NOTICE: drop cascades to table local SELECT 1 FROM master_set_node_property('localhost', :master_port, 'shouldhaveshards', false); ?column? 
--------------------------------------------------------------------- diff --git a/src/test/regress/expected/follower_single_node.out b/src/test/regress/expected/follower_single_node.out index 785206709..9c5ae8a1b 100644 --- a/src/test/regress/expected/follower_single_node.out +++ b/src/test/regress/expected/follower_single_node.out @@ -34,6 +34,17 @@ CREATE TABLE local(c int, d int); INSERT INTO test VALUES (1, 2), (3, 4), (5, 6), (2, 7), (4, 5); INSERT INTO ref VALUES (1, 2), (5, 6), (7, 8); INSERT INTO local VALUES (1, 2), (3, 4), (7, 8); +-- Check repartion joins are supported +SET citus.enable_repartition_joins TO ON; +SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x; + x | y | x | y +--------------------------------------------------------------------- + 2 | 7 | 1 | 2 + 4 | 5 | 3 | 4 + 5 | 6 | 4 | 5 +(3 rows) + +RESET citus.enable_repartition_joins; -- connect to the follower and check that a simple select query works, the follower -- is still in the default cluster and will send queries to the primary nodes \c - - - :follower_master_port @@ -102,15 +113,10 @@ SELECT * FROM ref, local WHERE a = c ORDER BY a; 7 | 8 | 7 | 8 (2 rows) --- Check repartion joins are support +SET citus.enable_repartition_joins TO ON; SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x; -ERROR: floating-point exception -DETAIL: An invalid floating-point operation was signaled. This probably means an out-of-range result or an invalid operation, such as division by zero. -SET citus.enable_repartition_joins TO ON; -SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x; -ERROR: floating-point exception -DETAIL: An invalid floating-point operation was signaled. This probably means an out-of-range result or an invalid operation, such as division by zero. -RESET citus.enable_repartition_joins; +ERROR: writing to worker nodes is not currently allowed +DETAIL: the database is in recovery mode -- Confirm that dummy placements work SELECT count(*) FROM test WHERE false; count @@ -226,15 +232,10 @@ SELECT * FROM ref, local WHERE a = c ORDER BY a; 7 | 8 | 7 | 8 (2 rows) --- Check repartion joins are support +SET citus.enable_repartition_joins TO ON; SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x; -ERROR: floating-point exception -DETAIL: An invalid floating-point operation was signaled. This probably means an out-of-range result or an invalid operation, such as division by zero. -SET citus.enable_repartition_joins TO ON; -SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x; -ERROR: floating-point exception -DETAIL: An invalid floating-point operation was signaled. This probably means an out-of-range result or an invalid operation, such as division by zero. -RESET citus.enable_repartition_joins; +ERROR: writing to worker nodes is not currently allowed +DETAIL: the database is in recovery mode -- Confirm that dummy placements work SELECT count(*) FROM test WHERE false; count diff --git a/src/test/regress/expected/single_node.out b/src/test/regress/expected/single_node.out index 31561e4fb..a52699926 100644 --- a/src/test/regress/expected/single_node.out +++ b/src/test/regress/expected/single_node.out @@ -101,14 +101,19 @@ SELECT * FROM ref, local WHERE a = c ORDER BY a; 7 | 8 | 7 | 8 (2 rows) --- Check repartion joins are support +-- Check repartion joins are supported SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x; -ERROR: floating-point exception -DETAIL: An invalid floating-point operation was signaled. 
This probably means an out-of-range result or an invalid operation, such as division by zero. +ERROR: the query contains a join that requires repartitioning +HINT: Set citus.enable_repartition_joins to on to enable repartitioning SET citus.enable_repartition_joins TO ON; SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x; -ERROR: floating-point exception -DETAIL: An invalid floating-point operation was signaled. This probably means an out-of-range result or an invalid operation, such as division by zero. + x | y | x | y +--------------------------------------------------------------------- + 2 | 7 | 1 | 2 + 4 | 5 | 3 | 4 + 5 | 6 | 4 | 5 +(3 rows) + RESET citus.enable_repartition_joins; -- INSERT SELECT router BEGIN; diff --git a/src/test/regress/sql/coordinator_shouldhaveshards.sql b/src/test/regress/sql/coordinator_shouldhaveshards.sql index 0707be471..18ebcb55a 100644 --- a/src/test/regress/sql/coordinator_shouldhaveshards.sql +++ b/src/test/regress/sql/coordinator_shouldhaveshards.sql @@ -191,6 +191,7 @@ BEGIN; -- copying task INSERT INTO dist_table SELECT a + 1 FROM dist_table; ROLLBACK; +SET citus.shard_replication_factor TO 1; BEGIN; SET citus.shard_replication_factor TO 2; @@ -205,6 +206,7 @@ RESET citus.enable_cte_inlining; DELETE FROM test; DROP TABLE test; DROP TABLE dist_table; +DROP TABLE ref; DROP SCHEMA coordinator_shouldhaveshards CASCADE; diff --git a/src/test/regress/sql/follower_single_node.sql b/src/test/regress/sql/follower_single_node.sql index 8fc26346d..64467ddfb 100644 --- a/src/test/regress/sql/follower_single_node.sql +++ b/src/test/regress/sql/follower_single_node.sql @@ -18,6 +18,11 @@ INSERT INTO test VALUES (1, 2), (3, 4), (5, 6), (2, 7), (4, 5); INSERT INTO ref VALUES (1, 2), (5, 6), (7, 8); INSERT INTO local VALUES (1, 2), (3, 4), (7, 8); +-- Check repartion joins are supported +SET citus.enable_repartition_joins TO ON; +SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x; +RESET citus.enable_repartition_joins; + -- connect to the follower and check that a simple select query works, the follower -- is still in the default cluster and will send queries to the primary nodes \c - - - :follower_master_port @@ -34,11 +39,8 @@ SELECT count(*) FROM local; SELECT * FROM local ORDER BY c; SELECT * FROM ref, local WHERE a = c ORDER BY a; --- Check repartion joins are support +SET citus.enable_repartition_joins TO ON; SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x; -SET citus.enable_repartition_joins TO ON; -SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x; -RESET citus.enable_repartition_joins; -- Confirm that dummy placements work SELECT count(*) FROM test WHERE false; @@ -82,11 +84,8 @@ SELECT count(*) FROM local; SELECT * FROM local ORDER BY c; SELECT * FROM ref, local WHERE a = c ORDER BY a; --- Check repartion joins are support +SET citus.enable_repartition_joins TO ON; SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x; -SET citus.enable_repartition_joins TO ON; -SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x; -RESET citus.enable_repartition_joins; -- Confirm that dummy placements work SELECT count(*) FROM test WHERE false; diff --git a/src/test/regress/sql/single_node.sql b/src/test/regress/sql/single_node.sql index ce0e36285..340f89882 100644 --- a/src/test/regress/sql/single_node.sql +++ b/src/test/regress/sql/single_node.sql @@ -33,7 +33,7 @@ SELECT count(*) FROM local; SELECT * FROM local ORDER BY c; SELECT * FROM ref, local WHERE a = c ORDER BY a; --- Check repartion joins are 
support +-- Check repartion joins are supported SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x; SET citus.enable_repartition_joins TO ON; SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x; From 41ec76a6ade9da3a79e368f7768986c2eb097e12 Mon Sep 17 00:00:00 2001 From: Sait Talha Nisanci Date: Thu, 9 Jul 2020 15:32:11 +0300 Subject: [PATCH 3/5] use ActiveReadableNodeList in JobExecutorType and task tracker The reason we should use ActiveReadableNodeList instead of ActiveReadableNonCoordinatorNodeList is that if coordinator is added to cluster as a worker, it should be counted as well. Otherwise if there is only coordinator in the cluster, the count will be 0, hence we get a warning. In MultiTaskTrackerExecute, we should connect to coordinator if it is added to the cluster because it will also be assigned tasks. --- .../executor/multi_server_executor.c | 3 +- .../executor/multi_task_tracker_executor.c | 2 +- .../operations/worker_node_manager.c | 1 + src/test/regress/expected/single_node.out | 45 +++++++++++++++++++ src/test/regress/sql/follower_single_node.sql | 4 +- src/test/regress/sql/single_node.sql | 29 ++++++++++++ 6 files changed, 79 insertions(+), 5 deletions(-) diff --git a/src/backend/distributed/executor/multi_server_executor.c b/src/backend/distributed/executor/multi_server_executor.c index 3ca1a2e40..6bbdc02de 100644 --- a/src/backend/distributed/executor/multi_server_executor.c +++ b/src/backend/distributed/executor/multi_server_executor.c @@ -118,8 +118,7 @@ JobExecutorType(DistributedPlan *distributedPlan) } else { - List *workerNodeList = ActiveReadableNonCoordinatorNodeList(); - int workerNodeCount = list_length(workerNodeList); + int workerNodeCount = list_length(ActiveReadableNodeList()); int taskCount = list_length(job->taskList); double tasksPerNode = taskCount / ((double) workerNodeCount); diff --git a/src/backend/distributed/executor/multi_task_tracker_executor.c b/src/backend/distributed/executor/multi_task_tracker_executor.c index d27fd2a9d..e804b0d8d 100644 --- a/src/backend/distributed/executor/multi_task_tracker_executor.c +++ b/src/backend/distributed/executor/multi_task_tracker_executor.c @@ -209,7 +209,7 @@ MultiTaskTrackerExecute(Job *job) * assigning and checking the status of tasks. The second (temporary) hash * helps us in fetching results data from worker nodes to the master node. */ - List *workerNodeList = ActivePrimaryNonCoordinatorNodeList(NoLock); + List *workerNodeList = ActiveReadableNodeList(); uint32 taskTrackerCount = (uint32) list_length(workerNodeList); /* connect as the current user for running queries */ diff --git a/src/backend/distributed/operations/worker_node_manager.c b/src/backend/distributed/operations/worker_node_manager.c index 8cd99adda..565215f15 100644 --- a/src/backend/distributed/operations/worker_node_manager.c +++ b/src/backend/distributed/operations/worker_node_manager.c @@ -319,6 +319,7 @@ ActiveReadableNonCoordinatorNodeCount(void) return liveWorkerCount; } + /* * NodeIsCoordinator returns true if the given node represents the coordinator. 
*/ diff --git a/src/test/regress/expected/single_node.out b/src/test/regress/expected/single_node.out index a52699926..cb50b4917 100644 --- a/src/test/regress/expected/single_node.out +++ b/src/test/regress/expected/single_node.out @@ -231,6 +231,51 @@ SELECT count(*) FROM test WHERE false GROUP BY GROUPING SETS (x,y); (0 rows) RESET citus.task_assignment_policy; +-- single node task tracker tests: +SET citus.task_executor_type to 'task-tracker'; +SELECT count(*) FROM test; + count +--------------------------------------------------------------------- + 5 +(1 row) + +-- INSERT SELECT from distributed table to local table +BEGIN; +INSERT INTO ref(a, b) SELECT x, y FROM test; +SELECT count(*) from ref; +ERROR: cannot switch local execution status from local execution required to local execution disabled since it can cause visibility problems in the current transaction +ROLLBACK; +-- INSERT SELECT from distributed table to local table +BEGIN; +INSERT INTO ref(a, b) SELECT c, d FROM local; +SELECT count(*) from ref; + count +--------------------------------------------------------------------- + 6 +(1 row) + +ROLLBACK; +-- INSERT SELECT from distributed table to local table +BEGIN; +INSERT INTO local(c, d) SELECT x, y FROM test; +SELECT count(*) from local; + count +--------------------------------------------------------------------- + 8 +(1 row) + +ROLLBACK; +-- INSERT SELECT from distributed table to local table +BEGIN; +INSERT INTO local(c, d) SELECT a, b FROM ref; +SELECT count(*) from local; + count +--------------------------------------------------------------------- + 6 +(1 row) + +ROLLBACK; +RESET citus.task_executor_type; -- Cleanup SET client_min_messages TO WARNING; DROP SCHEMA single_node CASCADE; diff --git a/src/test/regress/sql/follower_single_node.sql b/src/test/regress/sql/follower_single_node.sql index 64467ddfb..91aa27abe 100644 --- a/src/test/regress/sql/follower_single_node.sql +++ b/src/test/regress/sql/follower_single_node.sql @@ -39,7 +39,7 @@ SELECT count(*) FROM local; SELECT * FROM local ORDER BY c; SELECT * FROM ref, local WHERE a = c ORDER BY a; -SET citus.enable_repartition_joins TO ON; +SET citus.enable_repartition_joins TO ON; SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x; -- Confirm that dummy placements work @@ -84,7 +84,7 @@ SELECT count(*) FROM local; SELECT * FROM local ORDER BY c; SELECT * FROM ref, local WHERE a = c ORDER BY a; -SET citus.enable_repartition_joins TO ON; +SET citus.enable_repartition_joins TO ON; SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x; -- Confirm that dummy placements work diff --git a/src/test/regress/sql/single_node.sql b/src/test/regress/sql/single_node.sql index 340f89882..3c2fb35ab 100644 --- a/src/test/regress/sql/single_node.sql +++ b/src/test/regress/sql/single_node.sql @@ -102,6 +102,35 @@ SELECT count(*) FROM test WHERE false; SELECT count(*) FROM test WHERE false GROUP BY GROUPING SETS (x,y); RESET citus.task_assignment_policy; +-- single node task tracker tests: +SET citus.task_executor_type to 'task-tracker'; +SELECT count(*) FROM test; + +-- INSERT SELECT from distributed table to local table +BEGIN; +INSERT INTO ref(a, b) SELECT x, y FROM test; +SELECT count(*) from ref; +ROLLBACK; + +-- INSERT SELECT from distributed table to local table +BEGIN; +INSERT INTO ref(a, b) SELECT c, d FROM local; +SELECT count(*) from ref; +ROLLBACK; + +-- INSERT SELECT from distributed table to local table +BEGIN; +INSERT INTO local(c, d) SELECT x, y FROM test; +SELECT count(*) from local; +ROLLBACK; + 
+-- INSERT SELECT from distributed table to local table +BEGIN; +INSERT INTO local(c, d) SELECT a, b FROM ref; +SELECT count(*) from local; +ROLLBACK; + +RESET citus.task_executor_type; -- Cleanup SET client_min_messages TO WARNING; From 510535f558fb2194da09579161b77bf683acdf07 Mon Sep 17 00:00:00 2001 From: Sait Talha Nisanci Date: Mon, 13 Jul 2020 15:26:12 +0300 Subject: [PATCH 4/5] address feedback --- .../executor/multi_task_tracker_executor.c | 2 +- .../executor/repartition_join_execution.c | 4 +- .../transaction/worker_transaction.c | 2 +- src/include/distributed/worker_transaction.h | 5 +- .../regress/expected/follower_single_node.out | 95 ++++++++++++++++++- src/test/regress/expected/single_node.out | 41 +++++++- src/test/regress/sql/follower_single_node.sql | 41 ++++++++ src/test/regress/sql/single_node.sql | 15 ++- 8 files changed, 190 insertions(+), 15 deletions(-) diff --git a/src/backend/distributed/executor/multi_task_tracker_executor.c b/src/backend/distributed/executor/multi_task_tracker_executor.c index e804b0d8d..54ad1e8a6 100644 --- a/src/backend/distributed/executor/multi_task_tracker_executor.c +++ b/src/backend/distributed/executor/multi_task_tracker_executor.c @@ -209,7 +209,7 @@ MultiTaskTrackerExecute(Job *job) * assigning and checking the status of tasks. The second (temporary) hash * helps us in fetching results data from worker nodes to the master node. */ - List *workerNodeList = ActiveReadableNodeList(); + List *workerNodeList = ActivePrimaryNodeList(ShareLock); uint32 taskTrackerCount = (uint32) list_length(workerNodeList); /* connect as the current user for running queries */ diff --git a/src/backend/distributed/executor/repartition_join_execution.c b/src/backend/distributed/executor/repartition_join_execution.c index 7809008de..045f4b4f5 100644 --- a/src/backend/distributed/executor/repartition_join_execution.c +++ b/src/backend/distributed/executor/repartition_join_execution.c @@ -104,7 +104,7 @@ CreateTemporarySchemasForMergeTasks(Job *topLeveLJob) { List *jobIds = ExtractJobsInJobTree(topLeveLJob); char *createSchemasCommand = GenerateCreateSchemasCommand(jobIds, CurrentUserName()); - SendCommandToWorkersInParallel(ALL_DATA_NODES, createSchemasCommand, + SendCommandToWorkersInParallel(ALL_SHARD_NODES, createSchemasCommand, CitusExtensionOwnerName()); return jobIds; } @@ -191,7 +191,7 @@ GenerateJobCommands(List *jobIds, char *templateCommand) void DoRepartitionCleanup(List *jobIds) { - SendCommandToWorkersOptionalInParallel(ALL_DATA_NODES, GenerateDeleteJobsCommand( + SendCommandToWorkersOptionalInParallel(ALL_SHARD_NODES, GenerateDeleteJobsCommand( jobIds), CitusExtensionOwnerName()); } diff --git a/src/backend/distributed/transaction/worker_transaction.c b/src/backend/distributed/transaction/worker_transaction.c index 31a8e9073..bf65d1e04 100644 --- a/src/backend/distributed/transaction/worker_transaction.c +++ b/src/backend/distributed/transaction/worker_transaction.c @@ -131,7 +131,7 @@ List * TargetWorkerSetNodeList(TargetWorkerSet targetWorkerSet, LOCKMODE lockMode) { List *workerNodeList = NIL; - if (targetWorkerSet == ALL_DATA_NODES) + if (targetWorkerSet == ALL_SHARD_NODES) { workerNodeList = ActivePrimaryNodeList(lockMode); } diff --git a/src/include/distributed/worker_transaction.h b/src/include/distributed/worker_transaction.h index 21aa47e34..3322596af 100644 --- a/src/include/distributed/worker_transaction.h +++ b/src/include/distributed/worker_transaction.h @@ -18,14 +18,13 @@ /* * TargetWorkerSet is used for determining the type of 
workers that a command - * is targeted to. Currently it doesn't include coordinator even if it is added - * as a worker. + * is targeted to. */ typedef enum TargetWorkerSet { NON_COORDINATOR_METADATA_NODES, NON_COORDINATOR_NODES, - ALL_DATA_NODES + ALL_SHARD_NODES } TargetWorkerSet; diff --git a/src/test/regress/expected/follower_single_node.out b/src/test/regress/expected/follower_single_node.out index 9c5ae8a1b..a4cd51561 100644 --- a/src/test/regress/expected/follower_single_node.out +++ b/src/test/regress/expected/follower_single_node.out @@ -44,6 +44,44 @@ SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x; 5 | 6 | 4 | 5 (3 rows) +SET citus.enable_single_hash_repartition_joins TO ON; +SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x; + x | y | x | y +--------------------------------------------------------------------- + 2 | 7 | 1 | 2 + 4 | 5 | 3 | 4 + 5 | 6 | 4 | 5 +(3 rows) + +RESET citus.enable_single_hash_repartition_joins; +SET citus.task_assignment_policy TO 'round-robin'; +SET citus.enable_single_hash_repartition_joins TO ON; +SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x; + x | y | x | y +--------------------------------------------------------------------- + 2 | 7 | 1 | 2 + 4 | 5 | 3 | 4 + 5 | 6 | 4 | 5 +(3 rows) + +SET citus.task_assignment_policy TO 'greedy'; +SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x; + x | y | x | y +--------------------------------------------------------------------- + 2 | 7 | 1 | 2 + 4 | 5 | 3 | 4 + 5 | 6 | 4 | 5 +(3 rows) + +SET citus.task_assignment_policy TO 'first-replica'; +SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x; + x | y | x | y +--------------------------------------------------------------------- + 2 | 7 | 1 | 2 + 4 | 5 | 3 | 4 + 5 | 6 | 4 | 5 +(3 rows) + RESET citus.enable_repartition_joins; -- connect to the follower and check that a simple select query works, the follower -- is still in the default cluster and will send queries to the primary nodes @@ -113,10 +151,29 @@ SELECT * FROM ref, local WHERE a = c ORDER BY a; 7 | 8 | 7 | 8 (2 rows) -SET citus.enable_repartition_joins TO ON; +SET citus.enable_repartition_joins TO ON; SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x; ERROR: writing to worker nodes is not currently allowed -DETAIL: the database is in recovery mode +DETAIL: the database is read-only +SET citus.enable_single_hash_repartition_joins TO ON; +SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x; +ERROR: writing to worker nodes is not currently allowed +DETAIL: the database is read-only +SET citus.task_assignment_policy TO 'round-robin'; +SET citus.enable_single_hash_repartition_joins TO ON; +SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x; +ERROR: writing to worker nodes is not currently allowed +DETAIL: the database is read-only +SET citus.task_assignment_policy TO 'greedy'; +SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x; +ERROR: writing to worker nodes is not currently allowed +DETAIL: the database is read-only +SET citus.task_assignment_policy TO 'first-replica'; +SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x; +ERROR: writing to worker nodes is not currently allowed +DETAIL: the database is read-only +RESET citus.enable_repartition_joins; +RESET citus.enable_single_hash_repartition_joins; -- Confirm that dummy placements work SELECT count(*) FROM test WHERE false; count @@ -142,6 +199,30 @@ SELECT count(*) FROM test WHERE false GROUP BY GROUPING SETS (x,y); 
--------------------------------------------------------------------- (0 rows) +SET citus.task_assignment_policy TO 'greedy'; +SELECT count(*) FROM test WHERE false; + count +--------------------------------------------------------------------- + 0 +(1 row) + +SELECT count(*) FROM test WHERE false GROUP BY GROUPING SETS (x,y); + count +--------------------------------------------------------------------- +(0 rows) + +SET citus.task_assignment_policy TO 'first-replica'; +SELECT count(*) FROM test WHERE false; + count +--------------------------------------------------------------------- + 0 +(1 row) + +SELECT count(*) FROM test WHERE false GROUP BY GROUPING SETS (x,y); + count +--------------------------------------------------------------------- +(0 rows) + RESET citus.task_assignment_policy; -- now, connect to the follower but tell it to use secondary nodes. There are no -- secondary nodes so this should fail. @@ -232,10 +313,16 @@ SELECT * FROM ref, local WHERE a = c ORDER BY a; 7 | 8 | 7 | 8 (2 rows) -SET citus.enable_repartition_joins TO ON; +SET citus.enable_repartition_joins TO ON; SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x; ERROR: writing to worker nodes is not currently allowed -DETAIL: the database is in recovery mode +DETAIL: the database is read-only +SET citus.enable_single_hash_repartition_joins TO ON; +SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x; +ERROR: writing to worker nodes is not currently allowed +DETAIL: the database is read-only +RESET citus.enable_repartition_joins; +RESET citus.enable_single_hash_repartition_joins; -- Confirm that dummy placements work SELECT count(*) FROM test WHERE false; count diff --git a/src/test/regress/expected/single_node.out b/src/test/regress/expected/single_node.out index cb50b4917..e71c3ad79 100644 --- a/src/test/regress/expected/single_node.out +++ b/src/test/regress/expected/single_node.out @@ -102,9 +102,6 @@ SELECT * FROM ref, local WHERE a = c ORDER BY a; (2 rows) -- Check repartion joins are supported -SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x; -ERROR: the query contains a join that requires repartitioning -HINT: Set citus.enable_repartition_joins to on to enable repartitioning SET citus.enable_repartition_joins TO ON; SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x; x | y | x | y @@ -114,7 +111,45 @@ SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x; 5 | 6 | 4 | 5 (3 rows) +SET citus.enable_single_hash_repartition_joins TO ON; +SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x; + x | y | x | y +--------------------------------------------------------------------- + 2 | 7 | 1 | 2 + 4 | 5 | 3 | 4 + 5 | 6 | 4 | 5 +(3 rows) + +SET citus.task_assignment_policy TO 'round-robin'; +SET citus.enable_single_hash_repartition_joins TO ON; +SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x; + x | y | x | y +--------------------------------------------------------------------- + 2 | 7 | 1 | 2 + 4 | 5 | 3 | 4 + 5 | 6 | 4 | 5 +(3 rows) + +SET citus.task_assignment_policy TO 'greedy'; +SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x; + x | y | x | y +--------------------------------------------------------------------- + 2 | 7 | 1 | 2 + 4 | 5 | 3 | 4 + 5 | 6 | 4 | 5 +(3 rows) + +SET citus.task_assignment_policy TO 'first-replica'; +SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x; + x | y | x | y +--------------------------------------------------------------------- + 2 | 7 | 1 | 2 + 4 | 5 | 3 | 4 + 5 | 6 | 
4 | 5 +(3 rows) + RESET citus.enable_repartition_joins; +RESET citus.enable_single_hash_repartition_joins; -- INSERT SELECT router BEGIN; INSERT INTO test(x, y) SELECT x, y FROM test WHERE x = 1; diff --git a/src/test/regress/sql/follower_single_node.sql b/src/test/regress/sql/follower_single_node.sql index 91aa27abe..9d0d38db1 100644 --- a/src/test/regress/sql/follower_single_node.sql +++ b/src/test/regress/sql/follower_single_node.sql @@ -21,8 +21,24 @@ INSERT INTO local VALUES (1, 2), (3, 4), (7, 8); -- Check repartion joins are supported SET citus.enable_repartition_joins TO ON; SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x; +SET citus.enable_single_hash_repartition_joins TO ON; +SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x; +RESET citus.enable_single_hash_repartition_joins; + +SET citus.task_assignment_policy TO 'round-robin'; +SET citus.enable_single_hash_repartition_joins TO ON; +SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x; + +SET citus.task_assignment_policy TO 'greedy'; +SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x; + +SET citus.task_assignment_policy TO 'first-replica'; +SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x; + RESET citus.enable_repartition_joins; + + -- connect to the follower and check that a simple select query works, the follower -- is still in the default cluster and will send queries to the primary nodes \c - - - :follower_master_port @@ -41,6 +57,21 @@ SELECT * FROM ref, local WHERE a = c ORDER BY a; SET citus.enable_repartition_joins TO ON; SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x; +SET citus.enable_single_hash_repartition_joins TO ON; +SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x; + +SET citus.task_assignment_policy TO 'round-robin'; +SET citus.enable_single_hash_repartition_joins TO ON; +SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x; + +SET citus.task_assignment_policy TO 'greedy'; +SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x; + +SET citus.task_assignment_policy TO 'first-replica'; +SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x; + +RESET citus.enable_repartition_joins; +RESET citus.enable_single_hash_repartition_joins; -- Confirm that dummy placements work SELECT count(*) FROM test WHERE false; @@ -49,6 +80,12 @@ SELECT count(*) FROM test WHERE false GROUP BY GROUPING SETS (x,y); SET citus.task_assignment_policy TO 'round-robin'; SELECT count(*) FROM test WHERE false; SELECT count(*) FROM test WHERE false GROUP BY GROUPING SETS (x,y); +SET citus.task_assignment_policy TO 'greedy'; +SELECT count(*) FROM test WHERE false; +SELECT count(*) FROM test WHERE false GROUP BY GROUPING SETS (x,y); +SET citus.task_assignment_policy TO 'first-replica'; +SELECT count(*) FROM test WHERE false; +SELECT count(*) FROM test WHERE false GROUP BY GROUPING SETS (x,y); RESET citus.task_assignment_policy; @@ -86,6 +123,10 @@ SELECT * FROM ref, local WHERE a = c ORDER BY a; SET citus.enable_repartition_joins TO ON; SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x; +SET citus.enable_single_hash_repartition_joins TO ON; +SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x; +RESET citus.enable_repartition_joins; +RESET citus.enable_single_hash_repartition_joins; -- Confirm that dummy placements work SELECT count(*) FROM test WHERE false; diff --git a/src/test/regress/sql/single_node.sql b/src/test/regress/sql/single_node.sql index 3c2fb35ab..e11701ef5 100644 --- 
a/src/test/regress/sql/single_node.sql +++ b/src/test/regress/sql/single_node.sql @@ -34,10 +34,23 @@ SELECT * FROM local ORDER BY c; SELECT * FROM ref, local WHERE a = c ORDER BY a; -- Check repartion joins are supported -SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x; SET citus.enable_repartition_joins TO ON; SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x; +SET citus.enable_single_hash_repartition_joins TO ON; +SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x; + +SET citus.task_assignment_policy TO 'round-robin'; +SET citus.enable_single_hash_repartition_joins TO ON; +SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x; + +SET citus.task_assignment_policy TO 'greedy'; +SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x; + +SET citus.task_assignment_policy TO 'first-replica'; +SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x; + RESET citus.enable_repartition_joins; +RESET citus.enable_single_hash_repartition_joins; -- INSERT SELECT router BEGIN; From 1b5ed45a58ec350d1be9635e46619fa6a5af17f7 Mon Sep 17 00:00:00 2001 From: Sait Talha Nisanci Date: Mon, 13 Jul 2020 19:37:17 +0300 Subject: [PATCH 5/5] add multi follower repartition tests --- .../multi_follower_select_statements.out | 23 +++++++++++++++++++ .../sql/multi_follower_select_statements.sql | 15 ++++++++++++ 2 files changed, 38 insertions(+) diff --git a/src/test/regress/expected/multi_follower_select_statements.out b/src/test/regress/expected/multi_follower_select_statements.out index 3f4340d61..73ce25895 100644 --- a/src/test/regress/expected/multi_follower_select_statements.out +++ b/src/test/regress/expected/multi_follower_select_statements.out @@ -33,6 +33,21 @@ SELECT create_distributed_table('stock','s_w_id'); (1 row) INSERT INTO stock SELECT c, c, c FROM generate_series(1, 5) as c; +SET citus.enable_repartition_joins TO ON; +SELECT count(*) FROM the_table t1 JOIN the_table t2 USING(b); + count +--------------------------------------------------------------------- + 2 +(1 row) + +SET citus.enable_single_hash_repartition_joins TO ON; +SELECT count(*) FROM the_table t1 , the_table t2 WHERE t1.a = t2.b; + count +--------------------------------------------------------------------- + 2 +(1 row) + +RESET citus.enable_repartition_joins; -- connect to the follower and check that a simple select query works, the follower -- is still in the default cluster and will send queries to the primary nodes \c - - - :follower_master_port @@ -100,6 +115,14 @@ order by s_i_id; 5 | 5 (3 rows) +SET citus.enable_repartition_joins TO ON; +SELECT count(*) FROM the_table t1 JOIN the_table t2 USING(b); +ERROR: writing to worker nodes is not currently allowed +DETAIL: the database is read-only +SET citus.enable_single_hash_repartition_joins TO ON; +SELECT count(*) FROM the_table t1 , the_table t2 WHERE t1.a = t2.b; +ERROR: writing to worker nodes is not currently allowed +DETAIL: the database is read-only SELECT node_name, node_port FROM diff --git a/src/test/regress/sql/multi_follower_select_statements.sql b/src/test/regress/sql/multi_follower_select_statements.sql index 9fa8dc8ba..436b08cda 100644 --- a/src/test/regress/sql/multi_follower_select_statements.sql +++ b/src/test/regress/sql/multi_follower_select_statements.sql @@ -21,6 +21,14 @@ SELECT create_distributed_table('stock','s_w_id'); INSERT INTO stock SELECT c, c, c FROM generate_series(1, 5) as c; +SET citus.enable_repartition_joins TO ON; +SELECT count(*) FROM the_table t1 JOIN the_table t2 USING(b); + +SET 
citus.enable_single_hash_repartition_joins TO ON; + +SELECT count(*) FROM the_table t1 , the_table t2 WHERE t1.a = t2.b; +RESET citus.enable_repartition_joins; + -- connect to the follower and check that a simple select query works, the follower -- is still in the default cluster and will send queries to the primary nodes @@ -66,6 +74,13 @@ group by s_i_id having sum(s_order_cnt) > (select max(s_order_cnt) - 3 as having_query from stock) order by s_i_id; +SET citus.enable_repartition_joins TO ON; +SELECT count(*) FROM the_table t1 JOIN the_table t2 USING(b); + +SET citus.enable_single_hash_repartition_joins TO ON; + +SELECT count(*) FROM the_table t1 , the_table t2 WHERE t1.a = t2.b; + SELECT node_name, node_port