From 41ec76a6ade9da3a79e368f7768986c2eb097e12 Mon Sep 17 00:00:00 2001 From: Sait Talha Nisanci Date: Thu, 9 Jul 2020 15:32:11 +0300 Subject: [PATCH] use ActiveReadableNodeList in JobExecutorType and task tracker The reason we should use ActiveReadableNodeList instead of ActiveReadableNonCoordinatorNodeList is that if coordinator is added to cluster as a worker, it should be counted as well. Otherwise if there is only coordinator in the cluster, the count will be 0, hence we get a warning. In MultiTaskTrackerExecute, we should connect to coordinator if it is added to the cluster because it will also be assigned tasks. --- .../executor/multi_server_executor.c | 3 +- .../executor/multi_task_tracker_executor.c | 2 +- .../operations/worker_node_manager.c | 1 + src/test/regress/expected/single_node.out | 45 +++++++++++++++++++ src/test/regress/sql/follower_single_node.sql | 4 +- src/test/regress/sql/single_node.sql | 29 ++++++++++++ 6 files changed, 79 insertions(+), 5 deletions(-) diff --git a/src/backend/distributed/executor/multi_server_executor.c b/src/backend/distributed/executor/multi_server_executor.c index 3ca1a2e40..6bbdc02de 100644 --- a/src/backend/distributed/executor/multi_server_executor.c +++ b/src/backend/distributed/executor/multi_server_executor.c @@ -118,8 +118,7 @@ JobExecutorType(DistributedPlan *distributedPlan) } else { - List *workerNodeList = ActiveReadableNonCoordinatorNodeList(); - int workerNodeCount = list_length(workerNodeList); + int workerNodeCount = list_length(ActiveReadableNodeList()); int taskCount = list_length(job->taskList); double tasksPerNode = taskCount / ((double) workerNodeCount); diff --git a/src/backend/distributed/executor/multi_task_tracker_executor.c b/src/backend/distributed/executor/multi_task_tracker_executor.c index d27fd2a9d..e804b0d8d 100644 --- a/src/backend/distributed/executor/multi_task_tracker_executor.c +++ b/src/backend/distributed/executor/multi_task_tracker_executor.c @@ -209,7 +209,7 @@ MultiTaskTrackerExecute(Job *job) * assigning and checking the status of tasks. The second (temporary) hash * helps us in fetching results data from worker nodes to the master node. */ - List *workerNodeList = ActivePrimaryNonCoordinatorNodeList(NoLock); + List *workerNodeList = ActiveReadableNodeList(); uint32 taskTrackerCount = (uint32) list_length(workerNodeList); /* connect as the current user for running queries */ diff --git a/src/backend/distributed/operations/worker_node_manager.c b/src/backend/distributed/operations/worker_node_manager.c index 8cd99adda..565215f15 100644 --- a/src/backend/distributed/operations/worker_node_manager.c +++ b/src/backend/distributed/operations/worker_node_manager.c @@ -319,6 +319,7 @@ ActiveReadableNonCoordinatorNodeCount(void) return liveWorkerCount; } + /* * NodeIsCoordinator returns true if the given node represents the coordinator. */ diff --git a/src/test/regress/expected/single_node.out b/src/test/regress/expected/single_node.out index a52699926..cb50b4917 100644 --- a/src/test/regress/expected/single_node.out +++ b/src/test/regress/expected/single_node.out @@ -231,6 +231,51 @@ SELECT count(*) FROM test WHERE false GROUP BY GROUPING SETS (x,y); (0 rows) RESET citus.task_assignment_policy; +-- single node task tracker tests: +SET citus.task_executor_type to 'task-tracker'; +SELECT count(*) FROM test; + count +--------------------------------------------------------------------- + 5 +(1 row) + +-- INSERT SELECT from distributed table to local table +BEGIN; +INSERT INTO ref(a, b) SELECT x, y FROM test; +SELECT count(*) from ref; +ERROR: cannot switch local execution status from local execution required to local execution disabled since it can cause visibility problems in the current transaction +ROLLBACK; +-- INSERT SELECT from distributed table to local table +BEGIN; +INSERT INTO ref(a, b) SELECT c, d FROM local; +SELECT count(*) from ref; + count +--------------------------------------------------------------------- + 6 +(1 row) + +ROLLBACK; +-- INSERT SELECT from distributed table to local table +BEGIN; +INSERT INTO local(c, d) SELECT x, y FROM test; +SELECT count(*) from local; + count +--------------------------------------------------------------------- + 8 +(1 row) + +ROLLBACK; +-- INSERT SELECT from distributed table to local table +BEGIN; +INSERT INTO local(c, d) SELECT a, b FROM ref; +SELECT count(*) from local; + count +--------------------------------------------------------------------- + 6 +(1 row) + +ROLLBACK; +RESET citus.task_executor_type; -- Cleanup SET client_min_messages TO WARNING; DROP SCHEMA single_node CASCADE; diff --git a/src/test/regress/sql/follower_single_node.sql b/src/test/regress/sql/follower_single_node.sql index 64467ddfb..91aa27abe 100644 --- a/src/test/regress/sql/follower_single_node.sql +++ b/src/test/regress/sql/follower_single_node.sql @@ -39,7 +39,7 @@ SELECT count(*) FROM local; SELECT * FROM local ORDER BY c; SELECT * FROM ref, local WHERE a = c ORDER BY a; -SET citus.enable_repartition_joins TO ON; +SET citus.enable_repartition_joins TO ON; SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x; -- Confirm that dummy placements work @@ -84,7 +84,7 @@ SELECT count(*) FROM local; SELECT * FROM local ORDER BY c; SELECT * FROM ref, local WHERE a = c ORDER BY a; -SET citus.enable_repartition_joins TO ON; +SET citus.enable_repartition_joins TO ON; SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x; -- Confirm that dummy placements work diff --git a/src/test/regress/sql/single_node.sql b/src/test/regress/sql/single_node.sql index 340f89882..3c2fb35ab 100644 --- a/src/test/regress/sql/single_node.sql +++ b/src/test/regress/sql/single_node.sql @@ -102,6 +102,35 @@ SELECT count(*) FROM test WHERE false; SELECT count(*) FROM test WHERE false GROUP BY GROUPING SETS (x,y); RESET citus.task_assignment_policy; +-- single node task tracker tests: +SET citus.task_executor_type to 'task-tracker'; +SELECT count(*) FROM test; + +-- INSERT SELECT from distributed table to local table +BEGIN; +INSERT INTO ref(a, b) SELECT x, y FROM test; +SELECT count(*) from ref; +ROLLBACK; + +-- INSERT SELECT from distributed table to local table +BEGIN; +INSERT INTO ref(a, b) SELECT c, d FROM local; +SELECT count(*) from ref; +ROLLBACK; + +-- INSERT SELECT from distributed table to local table +BEGIN; +INSERT INTO local(c, d) SELECT x, y FROM test; +SELECT count(*) from local; +ROLLBACK; + +-- INSERT SELECT from distributed table to local table +BEGIN; +INSERT INTO local(c, d) SELECT a, b FROM ref; +SELECT count(*) from local; +ROLLBACK; + +RESET citus.task_executor_type; -- Cleanup SET client_min_messages TO WARNING;