diff --git a/src/backend/distributed/planner/multi_physical_planner.c b/src/backend/distributed/planner/multi_physical_planner.c index a2f41465b..b9a8bbc97 100644 --- a/src/backend/distributed/planner/multi_physical_planner.c +++ b/src/backend/distributed/planner/multi_physical_planner.c @@ -174,7 +174,6 @@ static bool HasMergeTaskDependencies(List *sqlTaskList); static List * GreedyAssignTaskList(List *taskList); static Task * GreedyAssignTask(WorkerNode *workerNode, List *taskList, List *activeShardPlacementLists); -static List * RoundRobinReorder(Task *task, List *placementList); static List * ReorderAndAssignTaskList(List *taskList, List * (*reorderFunction)(Task *, List *)); static int CompareTasksByShardId(const void *leftElement, const void *rightElement); @@ -5088,7 +5087,7 @@ RoundRobinAssignTaskList(List *taskList) * Citus generates since the distributed transactionId is generated during the execution * where as task-assignment happens duing the planning. */ -static List * +List * RoundRobinReorder(Task *task, List *placementList) { TransactionId transactionId = GetMyProcLocalTransactionId(); diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c index 02d14c58b..bcdc6dff2 100644 --- a/src/backend/distributed/planner/multi_router_planner.c +++ b/src/backend/distributed/planner/multi_router_planner.c @@ -155,8 +155,10 @@ static bool RowLocksOnRelations(Node *node, List **rtiLockList); static List * SingleShardModifyTaskList(Query *query, uint64 jobId, List *relationShardList, List *placementList, uint64 shardId); -static void ReorderTaskPlacementsByTaskAssignmentPolicy(Job *job, TaskAssignmentPolicyType - taskAssignmentPolicy); +static void ReorderTaskPlacementsByTaskAssignmentPolicy(Job *job, + TaskAssignmentPolicyType + taskAssignmentPolicy, + List *placementList); /* @@ -1640,7 +1642,8 @@ RouterJob(Query *originalQuery, PlannerRestrictionContext *plannerRestrictionCon */ if (shardId != INVALID_SHARD_ID) { - ReorderTaskPlacementsByTaskAssignmentPolicy(job, TaskAssignmentPolicy); + ReorderTaskPlacementsByTaskAssignmentPolicy(job, TaskAssignmentPolicy, + placementList); } } else if (isMultiShardModifyQuery) @@ -1673,12 +1676,31 @@ RouterJob(Query *originalQuery, PlannerRestrictionContext *plannerRestrictionCon * By default it does not reorder the task list, implying a first-replica strategy. */ static void -ReorderTaskPlacementsByTaskAssignmentPolicy(Job *job, TaskAssignmentPolicyType - taskAssignmentPolicy) +ReorderTaskPlacementsByTaskAssignmentPolicy(Job *job, + TaskAssignmentPolicyType taskAssignmentPolicy, + List *placementList) { if (taskAssignmentPolicy == TASK_ASSIGNMENT_ROUND_ROBIN) { - job->taskList = RoundRobinAssignTaskList(job->taskList); + Task *task = NULL; + List *reorderedPlacementList = NIL; + ShardPlacement *primaryPlacement = NULL; + + /* + * We hit a single shard on router plans, and there should be only + * one task in the task list + */ + Assert(list_length(job->taskList) == 1); + task = (Task *) linitial(job->taskList); + + /* reorder the placement list */ + reorderedPlacementList = RoundRobinReorder(task, placementList); + task->taskPlacementList = reorderedPlacementList; + + primaryPlacement = (ShardPlacement *) linitial(reorderedPlacementList); + ereport(DEBUG3, (errmsg("assigned task %u to node %s:%u", task->taskId, + primaryPlacement->nodeName, + primaryPlacement->nodePort))); } } diff --git a/src/include/distributed/multi_physical_planner.h b/src/include/distributed/multi_physical_planner.h index c4d662d5f..6fbab4915 100644 --- a/src/include/distributed/multi_physical_planner.h +++ b/src/include/distributed/multi_physical_planner.h @@ -351,6 +351,7 @@ extern List * TaskListDifference(const List *list1, const List *list2); extern List * AssignAnchorShardTaskList(List *taskList); extern List * FirstReplicaAssignTaskList(List *taskList); extern List * RoundRobinAssignTaskList(List *taskList); +extern List * RoundRobinReorder(Task *task, List *placementList); extern int CompareTasksByTaskId(const void *leftElement, const void *rightElement); /* function declaration for creating Task */