use ActiveReadableNodeList in JobExecutorType and task tracker

The reason we should use ActiveReadableNodeList instead of ActiveReadableNonCoordinatorNodeList is that if coordinator is added to cluster as a worker, it should be counted as well. Otherwise if there is only coordinator in the cluster, the count will be 0, hence we get a warning.

In MultiTaskTrackerExecute, we should connect to coordinator if it is
added to the cluster because it will also be assigned tasks.
pull/4005/head
Sait Talha Nisanci 2020-07-09 15:32:11 +03:00
parent d97d03ec65
commit 41ec76a6ad
6 changed files with 79 additions and 5 deletions

View File

@ -118,8 +118,7 @@ JobExecutorType(DistributedPlan *distributedPlan)
}
else
{
List *workerNodeList = ActiveReadableNonCoordinatorNodeList();
int workerNodeCount = list_length(workerNodeList);
int workerNodeCount = list_length(ActiveReadableNodeList());
int taskCount = list_length(job->taskList);
double tasksPerNode = taskCount / ((double) workerNodeCount);

View File

@ -209,7 +209,7 @@ MultiTaskTrackerExecute(Job *job)
* assigning and checking the status of tasks. The second (temporary) hash
* helps us in fetching results data from worker nodes to the master node.
*/
List *workerNodeList = ActivePrimaryNonCoordinatorNodeList(NoLock);
List *workerNodeList = ActiveReadableNodeList();
uint32 taskTrackerCount = (uint32) list_length(workerNodeList);
/* connect as the current user for running queries */

View File

@ -319,6 +319,7 @@ ActiveReadableNonCoordinatorNodeCount(void)
return liveWorkerCount;
}
/*
* NodeIsCoordinator returns true if the given node represents the coordinator.
*/

View File

@ -231,6 +231,51 @@ SELECT count(*) FROM test WHERE false GROUP BY GROUPING SETS (x,y);
(0 rows)
RESET citus.task_assignment_policy;
-- single node task tracker tests:
SET citus.task_executor_type to 'task-tracker';
SELECT count(*) FROM test;
count
---------------------------------------------------------------------
5
(1 row)
-- INSERT SELECT from distributed table to local table
BEGIN;
INSERT INTO ref(a, b) SELECT x, y FROM test;
SELECT count(*) from ref;
ERROR: cannot switch local execution status from local execution required to local execution disabled since it can cause visibility problems in the current transaction
ROLLBACK;
-- INSERT SELECT from distributed table to local table
BEGIN;
INSERT INTO ref(a, b) SELECT c, d FROM local;
SELECT count(*) from ref;
count
---------------------------------------------------------------------
6
(1 row)
ROLLBACK;
-- INSERT SELECT from distributed table to local table
BEGIN;
INSERT INTO local(c, d) SELECT x, y FROM test;
SELECT count(*) from local;
count
---------------------------------------------------------------------
8
(1 row)
ROLLBACK;
-- INSERT SELECT from distributed table to local table
BEGIN;
INSERT INTO local(c, d) SELECT a, b FROM ref;
SELECT count(*) from local;
count
---------------------------------------------------------------------
6
(1 row)
ROLLBACK;
RESET citus.task_executor_type;
-- Cleanup
SET client_min_messages TO WARNING;
DROP SCHEMA single_node CASCADE;

View File

@ -102,6 +102,35 @@ SELECT count(*) FROM test WHERE false;
SELECT count(*) FROM test WHERE false GROUP BY GROUPING SETS (x,y);
RESET citus.task_assignment_policy;
-- single node task tracker tests:
SET citus.task_executor_type to 'task-tracker';
SELECT count(*) FROM test;
-- INSERT SELECT from distributed table to local table
BEGIN;
INSERT INTO ref(a, b) SELECT x, y FROM test;
SELECT count(*) from ref;
ROLLBACK;
-- INSERT SELECT from distributed table to local table
BEGIN;
INSERT INTO ref(a, b) SELECT c, d FROM local;
SELECT count(*) from ref;
ROLLBACK;
-- INSERT SELECT from distributed table to local table
BEGIN;
INSERT INTO local(c, d) SELECT x, y FROM test;
SELECT count(*) from local;
ROLLBACK;
-- INSERT SELECT from distributed table to local table
BEGIN;
INSERT INTO local(c, d) SELECT a, b FROM ref;
SELECT count(*) from local;
ROLLBACK;
RESET citus.task_executor_type;
-- Cleanup
SET client_min_messages TO WARNING;