diff --git a/src/backend/distributed/planner/multi_physical_planner.c b/src/backend/distributed/planner/multi_physical_planner.c
index a2f41465b..b9a8bbc97 100644
--- a/src/backend/distributed/planner/multi_physical_planner.c
+++ b/src/backend/distributed/planner/multi_physical_planner.c
@@ -174,7 +174,6 @@ static bool HasMergeTaskDependencies(List *sqlTaskList);
 static List * GreedyAssignTaskList(List *taskList);
 static Task * GreedyAssignTask(WorkerNode *workerNode, List *taskList,
 							   List *activeShardPlacementLists);
-static List * RoundRobinReorder(Task *task, List *placementList);
 static List * ReorderAndAssignTaskList(List *taskList,
 									   List * (*reorderFunction)(Task *, List *));
 static int CompareTasksByShardId(const void *leftElement, const void *rightElement);
@@ -5088,7 +5087,7 @@ RoundRobinAssignTaskList(List *taskList)
  * Citus generates since the distributed transactionId is generated during the execution
  * whereas task-assignment happens during the planning.
  */
-static List *
+List *
 RoundRobinReorder(Task *task, List *placementList)
 {
 	TransactionId transactionId = GetMyProcLocalTransactionId();
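
The comment above is the crux of the change: RoundRobinReorder keys its rotation on the backend-local transaction id (via GetMyProcLocalTransactionId()), because the distributed transaction id only exists at execution time while task assignment happens during planning. A minimal, self-contained sketch of that modulo-rotation idea follows; the Placement struct and RoundRobinRotate function are illustrative stand-ins, not the actual implementation, which rotates a PostgreSQL List of ShardPlacement nodes:

#include <stdio.h>

/* Illustrative stand-in for a shard placement; the real code works on
 * Citus' ShardPlacement nodes kept in a PostgreSQL List. */
typedef struct Placement
{
	const char *nodeName;
	int nodePort;
} Placement;

/*
 * Rotate the placement array left by (transactionId % placementCount),
 * mirroring how RoundRobinReorder picks a different primary placement
 * for each backend-local transaction.
 */
static void
RoundRobinRotate(const Placement *placements, int placementCount,
				 unsigned int transactionId, Placement *reordered)
{
	int startIndex = transactionId % placementCount;

	for (int placementIndex = 0; placementIndex < placementCount; placementIndex++)
	{
		reordered[placementIndex] =
			placements[(startIndex + placementIndex) % placementCount];
	}
}

int
main(void)
{
	Placement placements[] = { { "worker-1", 5432 }, { "worker-2", 5432 } };
	Placement reordered[2];

	/* consecutive local transaction ids alternate the primary placement */
	for (unsigned int transactionId = 1; transactionId <= 3; transactionId++)
	{
		RoundRobinRotate(placements, 2, transactionId, reordered);
		printf("txn %u -> primary %s:%d\n", transactionId,
			   reordered[0].nodeName, reordered[0].nodePort);
	}

	return 0;
}

Because the local transaction id advances with every top-level transaction on a backend, consecutive single-shard queries run in separate transactions start the placement list at a different replica, which is what spreads router-query load.
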
diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c
index 02d14c58b..bcdc6dff2 100644
--- a/src/backend/distributed/planner/multi_router_planner.c
+++ b/src/backend/distributed/planner/multi_router_planner.c
@@ -155,8 +155,10 @@ static bool RowLocksOnRelations(Node *node, List **rtiLockList);
 static List * SingleShardModifyTaskList(Query *query, uint64 jobId,
 										List *relationShardList, List *placementList,
 										uint64 shardId);
-static void ReorderTaskPlacementsByTaskAssignmentPolicy(Job *job, TaskAssignmentPolicyType
-														taskAssignmentPolicy);
+static void ReorderTaskPlacementsByTaskAssignmentPolicy(Job *job,
+														TaskAssignmentPolicyType
+														taskAssignmentPolicy,
+														List *placementList);
 
 
 /*
@@ -1640,7 +1642,8 @@ RouterJob(Query *originalQuery, PlannerRestrictionContext *plannerRestrictionCon
 		 */
 		if (shardId != INVALID_SHARD_ID)
 		{
-			ReorderTaskPlacementsByTaskAssignmentPolicy(job, TaskAssignmentPolicy);
+			ReorderTaskPlacementsByTaskAssignmentPolicy(job, TaskAssignmentPolicy,
+														placementList);
 		}
 	}
 	else if (isMultiShardModifyQuery)
@@ -1673,12 +1676,31 @@ RouterJob(Query *originalQuery, PlannerRestrictionContext *plannerRestrictionCon
  * By default it does not reorder the task list, implying a first-replica strategy.
  */
 static void
-ReorderTaskPlacementsByTaskAssignmentPolicy(Job *job, TaskAssignmentPolicyType
-											taskAssignmentPolicy)
+ReorderTaskPlacementsByTaskAssignmentPolicy(Job *job,
+											TaskAssignmentPolicyType taskAssignmentPolicy,
+											List *placementList)
 {
 	if (taskAssignmentPolicy == TASK_ASSIGNMENT_ROUND_ROBIN)
 	{
-		job->taskList = RoundRobinAssignTaskList(job->taskList);
+		Task *task = NULL;
+		List *reorderedPlacementList = NIL;
+		ShardPlacement *primaryPlacement = NULL;
+
+		/*
+		 * We hit a single shard on router plans, and there should be only
+		 * one task in the task list
+		 */
+		Assert(list_length(job->taskList) == 1);
+		task = (Task *) linitial(job->taskList);
+
+		/* reorder the placement list */
+		reorderedPlacementList = RoundRobinReorder(task, placementList);
+		task->taskPlacementList = reorderedPlacementList;
+
+		primaryPlacement = (ShardPlacement *) linitial(reorderedPlacementList);
+		ereport(DEBUG3, (errmsg("assigned task %u to node %s:%u", task->taskId,
+								primaryPlacement->nodeName,
+								primaryPlacement->nodePort)));
 	}
 }
 
diff --git a/src/include/distributed/multi_physical_planner.h b/src/include/distributed/multi_physical_planner.h
index 68831294a..6fbab4915 100644
--- a/src/include/distributed/multi_physical_planner.h
+++ b/src/include/distributed/multi_physical_planner.h
@@ -351,7 +351,7 @@ extern List * TaskListDifference(const List *list1, const List *list2);
 extern List * AssignAnchorShardTaskList(List *taskList);
 extern List * FirstReplicaAssignTaskList(List *taskList);
 extern List * RoundRobinAssignTaskList(List *taskList);
-extern List * RoundRobinPerTransactionAssignTaskList(List *taskList);
+extern List * RoundRobinReorder(Task *task, List *placementList);
 extern int CompareTasksByTaskId(const void *leftElement, const void *rightElement);
 
 /* function declaration for creating Task */
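
With RoundRobinReorder now exported, the router planner applies it to the single task a router plan produces instead of going through RoundRobinAssignTaskList, which operates on whole task lists. A simplified model of that control flow, using hypothetical stand-in types rather than Citus' actual Job, Task, and ShardPlacement nodes, might look like this:

#include <assert.h>
#include <stdio.h>

typedef enum
{
	TASK_ASSIGNMENT_FIRST_REPLICA,
	TASK_ASSIGNMENT_ROUND_ROBIN
} AssignmentPolicy;

/* illustrative stand-ins for Citus' ShardPlacement and Task nodes */
typedef struct
{
	const char *nodeName;
	int nodePort;
} Placement;

typedef struct
{
	int taskId;
	Placement placements[2];
} Task;

/* swap the two placements; the real code rotates by txn id modulo count */
static void
RotatePlacements(Task *task)
{
	Placement first = task->placements[0];
	task->placements[0] = task->placements[1];
	task->placements[1] = first;
}

/* simplified model of ReorderTaskPlacementsByTaskAssignmentPolicy */
static void
ReorderByPolicy(Task *taskList, int taskCount, AssignmentPolicy policy)
{
	/* router plans hit a single shard, so exactly one task is expected */
	assert(taskCount == 1);

	if (policy == TASK_ASSIGNMENT_ROUND_ROBIN)
	{
		Task *task = &taskList[0];
		RotatePlacements(task);

		/* the first placement after reordering becomes the primary one */
		printf("assigned task %d to node %s:%d\n", task->taskId,
			   task->placements[0].nodeName, task->placements[0].nodePort);
	}
}

int
main(void)
{
	Task taskList[] = { { 1, { { "worker-1", 5432 }, { "worker-2", 5432 } } } };

	ReorderByPolicy(taskList, 1, TASK_ASSIGNMENT_ROUND_ROBIN);
	return 0;
}

Note that only the placement list of the one task is reordered; the task itself stays put, and the DEBUG3 message exposes which replica became primary, which is useful when debugging task assignment.
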
diff --git a/src/test/regress/expected/multi_task_assignment_policy.out b/src/test/regress/expected/multi_task_assignment_policy.out
index 70bae799b..6f8f841ea 100644
--- a/src/test/regress/expected/multi_task_assignment_policy.out
+++ b/src/test/regress/expected/multi_task_assignment_policy.out
@@ -19,17 +19,16 @@ DECLARE
     shardOfTheTask text;
 begin
     for r in execute qry loop
-       IF r LIKE '%port%' THEN
-         portOfTheTask = substring(r, '([0-9]{1,10})');
+        IF r LIKE '%port%' THEN
+            portOfTheTask = substring(r, '([0-9]{1,10})');
         END IF;
         IF r LIKE '%' || table_name || '%' THEN
-            shardOfTheTask = substring(r, '([0-9]{1,10})');
-            return QUERY SELECT shardOfTheTask || '@' || portOfTheTask ;
+            shardOfTheTask = substring(r, '([0-9]{5,10})');
         END IF;
     end loop;
-    return;
+    return QUERY SELECT shardOfTheTask || '@' || portOfTheTask;
 end;
 $$ language plpgsql;
 SET citus.explain_distributed_queries TO off;
 -- Check that our policies for assigning tasks to worker nodes run as expected.
@@ -300,11 +299,8 @@ SELECT count(DISTINCT value) FROM explain_outputs;
 (1 row)
 
 TRUNCATE explain_outputs;
-RESET citus.task_assignment_policy;
-RESET client_min_messages;
--- we should be able to use round-robin with router queries that
+-- We should be able to use round-robin with router queries that
 -- only contain intermediate results
-BEGIN;
 CREATE TABLE task_assignment_test_table_2 (test_id integer);
 SELECT create_distributed_table('task_assignment_test_table_2', 'test_id');
  create_distributed_table
@@ -312,17 +308,26 @@ SELECT create_distributed_table('task_assignment_test_table_2', 'test_id');
 (1 row)
 
-WITH q1 AS (SELECT * FROM task_assignment_test_table_2) SELECT * FROM q1;
- test_id
----------
-(0 rows)
-
+SET citus.task_assignment_policy TO 'round-robin';
+-- Run the query two times to make sure that it hits two different workers
+-- on consecutive runs
+INSERT INTO explain_outputs
+SELECT parse_explain_output($cmd$
+EXPLAIN WITH q1 AS (SELECT * FROM task_assignment_test_table_2) SELECT * FROM q1
+$cmd$, 'task_assignment_test_table_2');
+INSERT INTO explain_outputs
+SELECT parse_explain_output($cmd$
+EXPLAIN WITH q1 AS (SELECT * FROM task_assignment_test_table_2) SELECT * FROM q1
+$cmd$, 'task_assignment_test_table_2');
+-- The count should be 2 since the intermediate results are processed on
+-- different workers
+SELECT count(DISTINCT value) FROM explain_outputs;
+ count
+-------
+     2
+(1 row)
+
-SET LOCAL citus.task_assignment_policy TO 'round-robin';
-WITH q1 AS (SELECT * FROM task_assignment_test_table_2) SELECT * FROM q1;
- test_id
----------
-(0 rows)
-
-ROLLBACK;
-
+RESET citus.task_assignment_policy;
+RESET client_min_messages;
 DROP TABLE task_assignment_replicated_hash, task_assignment_nonreplicated_hash,
-       task_assignment_reference_table, explain_outputs;
+       task_assignment_reference_table, task_assignment_test_table_2, explain_outputs;
diff --git a/src/test/regress/sql/multi_task_assignment_policy.sql b/src/test/regress/sql/multi_task_assignment_policy.sql
index 67341b8cc..95a57f204 100644
--- a/src/test/regress/sql/multi_task_assignment_policy.sql
+++ b/src/test/regress/sql/multi_task_assignment_policy.sql
@@ -18,17 +18,16 @@ DECLARE
     shardOfTheTask text;
 begin
     for r in execute qry loop
-       IF r LIKE '%port%' THEN
-         portOfTheTask = substring(r, '([0-9]{1,10})');
+        IF r LIKE '%port%' THEN
+            portOfTheTask = substring(r, '([0-9]{1,10})');
         END IF;
         IF r LIKE '%' || table_name || '%' THEN
-            shardOfTheTask = substring(r, '([0-9]{1,10})');
-            return QUERY SELECT shardOfTheTask || '@' || portOfTheTask ;
+            shardOfTheTask = substring(r, '([0-9]{5,10})');
         END IF;
     end loop;
-    return;
+    return QUERY SELECT shardOfTheTask || '@' || portOfTheTask;
 end;
 $$ language plpgsql;
 
@@ -236,19 +235,31 @@ $cmd$, 'task_assignment_nonreplicated_hash');
 SELECT count(DISTINCT value) FROM explain_outputs;
 TRUNCATE explain_outputs;
 
-RESET citus.task_assignment_policy;
-RESET client_min_messages;
-
--- we should be able to use round-robin with router queries that
+-- We should be able to use round-robin with router queries that
 -- only contain intermediate results
-BEGIN;
 CREATE TABLE task_assignment_test_table_2 (test_id integer);
 SELECT create_distributed_table('task_assignment_test_table_2', 'test_id');
 
-WITH q1 AS (SELECT * FROM task_assignment_test_table_2) SELECT * FROM q1;
-SET LOCAL citus.task_assignment_policy TO 'round-robin';
-WITH q1 AS (SELECT * FROM task_assignment_test_table_2) SELECT * FROM q1;
-ROLLBACK;
+SET citus.task_assignment_policy TO 'round-robin';
+
+-- Run the query two times to make sure that it hits two different workers
+-- on consecutive runs
+INSERT INTO explain_outputs
+SELECT parse_explain_output($cmd$
+EXPLAIN WITH q1 AS (SELECT * FROM task_assignment_test_table_2) SELECT * FROM q1
+$cmd$, 'task_assignment_test_table_2');
+
+INSERT INTO explain_outputs
+SELECT parse_explain_output($cmd$
+EXPLAIN WITH q1 AS (SELECT * FROM task_assignment_test_table_2) SELECT * FROM q1
+$cmd$, 'task_assignment_test_table_2');
+
+-- The count should be 2 since the intermediate results are processed on
+-- different workers
+SELECT count(DISTINCT value) FROM explain_outputs;
+
+RESET citus.task_assignment_policy;
+RESET client_min_messages;
 
 DROP TABLE task_assignment_replicated_hash, task_assignment_nonreplicated_hash,
-       task_assignment_reference_table, explain_outputs;
+       task_assignment_reference_table, task_assignment_test_table_2, explain_outputs;