diff --git a/src/backend/distributed/planner/multi_physical_planner.c b/src/backend/distributed/planner/multi_physical_planner.c index 35766de01..b1d8d2e64 100644 --- a/src/backend/distributed/planner/multi_physical_planner.c +++ b/src/backend/distributed/planner/multi_physical_planner.c @@ -114,7 +114,6 @@ static AttrNumber NewColumnId(Index originalTableId, AttrNumber originalColumnId static Job * JobForRangeTable(List *jobList, RangeTblEntry *rangeTableEntry); static Job * JobForTableIdList(List *jobList, List *searchedTableIdList); static List * ChildNodeList(MultiNode *multiNode); -static uint64 UniqueJobId(void); static Job * BuildJob(Query *jobQuery, List *dependedJobList); static MapMergeJob * BuildMapMergeJob(Query *jobQuery, List *dependedJobList, Var *partitionKey, PartitionType partitionType, @@ -172,7 +171,6 @@ static bool HasMergeTaskDependencies(List *sqlTaskList); static List * GreedyAssignTaskList(List *taskList); static Task * GreedyAssignTask(WorkerNode *workerNode, List *taskList, List *activeShardPlacementLists); -static List * RoundRobinAssignTaskList(List *taskList); static List * RoundRobinReorder(Task *task, List *placementList); static List * ReorderAndAssignTaskList(List *taskList, List * (*reorderFunction)(Task *, List *)); @@ -1757,7 +1755,7 @@ ChildNodeList(MultiNode *multiNode) * When citus.enable_unique_job_ids is off then only the local counter is * included to get repeatable results. */ -static uint64 +uint64 UniqueJobId(void) { static uint32 jobIdCounter = 0; @@ -5099,7 +5097,7 @@ FirstReplicaAssignTaskList(List *taskList) * by the number of active shard placements, and ensure that we rotate between * these placements across subsequent queries. 
*/ -static List * +List * RoundRobinAssignTaskList(List *taskList) { taskList = ReorderAndAssignTaskList(taskList, RoundRobinReorder); diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c index a48d02fcf..bf5ded72e 100644 --- a/src/backend/distributed/planner/multi_router_planner.c +++ b/src/backend/distributed/planner/multi_router_planner.c @@ -148,11 +148,15 @@ static List * get_all_actual_clauses(List *restrictinfo_list); static int CompareInsertValuesByShardId(const void *leftElement, const void *rightElement); static uint64 GetInitialShardId(List *relationShardList); -static List * SingleShardSelectTaskList(Query *query, List *relationShardList, - List *placementList, uint64 shardId); +static List * SingleShardSelectTaskList(Query *query, uint64 jobId, + List *relationShardList, List *placementList, + uint64 shardId); static bool RowLocksOnRelations(Node *node, List **rtiLockList); -static List * SingleShardModifyTaskList(Query *query, List *relationShardList, - List *placementList, uint64 shardId); +static List * SingleShardModifyTaskList(Query *query, uint64 jobId, + List *relationShardList, List *placementList, + uint64 shardId); +static void ReorderTaskPlacementsByTaskAssignmentPolicy(Job *job, TaskAssignmentPolicyType + taskAssignmentPolicy); /* @@ -1388,7 +1392,7 @@ CreateJob(Query *query) Job *job = NULL; job = CitusMakeNode(Job); - job->jobId = INVALID_JOB_ID; + job->jobId = UniqueJobId(); job->jobQuery = query; job->taskList = NIL; job->dependedJobList = NIL; @@ -1625,12 +1629,22 @@ RouterJob(Query *originalQuery, PlannerRestrictionContext *plannerRestrictionCon if (originalQuery->commandType == CMD_SELECT) { - job->taskList = SingleShardSelectTaskList(originalQuery, relationShardList, - placementList, shardId); + job->taskList = SingleShardSelectTaskList(originalQuery, job->jobId, + relationShardList, placementList, + shardId); + + /* + * Queries to reference tables, or 
distributed tables with multiple replicas have + * their task placements reordered according to the configured + * task_assignment_policy. This is only applicable to select queries as the modify + * queries will be reordered to _always_ use the first-replica policy during + * execution. + */ + ReorderTaskPlacementsByTaskAssignmentPolicy(job, TaskAssignmentPolicy); } else if (isMultiShardModifyQuery) { - job->taskList = QueryPushdownSqlTaskList(originalQuery, 0, + job->taskList = QueryPushdownSqlTaskList(originalQuery, job->jobId, plannerRestrictionContext-> relationRestrictionContext, relationShardList, MODIFY_TASK, @@ -1638,8 +1652,9 @@ RouterJob(Query *originalQuery, PlannerRestrictionContext *plannerRestrictionCon } else { - job->taskList = SingleShardModifyTaskList(originalQuery, relationShardList, - placementList, shardId); + job->taskList = SingleShardModifyTaskList(originalQuery, job->jobId, + relationShardList, placementList, + shardId); } job->requiresMasterEvaluation = requiresMasterEvaluation; @@ -1647,12 +1662,33 @@ RouterJob(Query *originalQuery, PlannerRestrictionContext *plannerRestrictionCon } +/* + * ReorderTaskPlacementsByTaskAssignmentPolicy applies selective reordering for supported + * TaskAssignmentPolicyTypes. + * + * Supported Types + * - TASK_ASSIGNMENT_ROUND_ROBIN round robin schedule queries among placements + * + * By default it does not reorder the task list, implying a first-replica strategy. + */ +static void +ReorderTaskPlacementsByTaskAssignmentPolicy(Job *job, TaskAssignmentPolicyType + taskAssignmentPolicy) +{ + if (taskAssignmentPolicy == TASK_ASSIGNMENT_ROUND_ROBIN) + { + job->taskList = RoundRobinAssignTaskList(job->taskList); + } +} + + /* * SingleShardSelectTaskList generates a task for single shard select query * and returns it as a list. 
*/ static List * -SingleShardSelectTaskList(Query *query, List *relationShardList, List *placementList, +SingleShardSelectTaskList(Query *query, uint64 jobId, List *relationShardList, + List *placementList, uint64 shardId) { Task *task = CreateTask(ROUTER_TASK); @@ -1664,6 +1700,7 @@ SingleShardSelectTaskList(Query *query, List *relationShardList, List *placement task->queryString = queryString->data; task->anchorShardId = shardId; + task->jobId = jobId; task->taskPlacementList = placementList; task->relationShardList = relationShardList; task->relationRowLockList = relationRowLockList; @@ -1718,8 +1755,8 @@ RowLocksOnRelations(Node *node, List **relationRowLockList) * and returns it as a list. */ static List * -SingleShardModifyTaskList(Query *query, List *relationShardList, List *placementList, - uint64 shardId) +SingleShardModifyTaskList(Query *query, uint64 jobId, List *relationShardList, + List *placementList, uint64 shardId) { Task *task = CreateTask(MODIFY_TASK); StringInfo queryString = makeStringInfo(); @@ -1746,6 +1783,7 @@ SingleShardModifyTaskList(Query *query, List *relationShardList, List *placement task->queryString = queryString->data; task->anchorShardId = shardId; + task->jobId = jobId; task->taskPlacementList = placementList; task->relationShardList = relationShardList; task->replicationModel = modificationTableCacheEntry->replicationModel; diff --git a/src/include/distributed/multi_physical_planner.h b/src/include/distributed/multi_physical_planner.h index e8e091209..23b636e20 100644 --- a/src/include/distributed/multi_physical_planner.h +++ b/src/include/distributed/multi_physical_planner.h @@ -340,6 +340,7 @@ extern bool TaskListMember(const List *taskList, const Task *task); extern List * TaskListDifference(const List *list1, const List *list2); extern List * AssignAnchorShardTaskList(List *taskList); extern List * FirstReplicaAssignTaskList(List *taskList); +extern List * RoundRobinAssignTaskList(List *taskList); /* function declaration for 
creating Task */ extern List * QueryPushdownSqlTaskList(Query *query, uint64 jobId, @@ -348,5 +349,8 @@ extern List * QueryPushdownSqlTaskList(Query *query, uint64 jobId, List *prunedRelationShardList, TaskType taskType, bool modifyRequiresMasterEvaluation); +/* function declarations for managing jobs */ +extern uint64 UniqueJobId(void); + #endif /* MULTI_PHYSICAL_PLANNER_H */ diff --git a/src/test/regress/expected/multi_task_assignment_policy.out b/src/test/regress/expected/multi_task_assignment_policy.out index a73267bc6..8879ae02a 100644 --- a/src/test/regress/expected/multi_task_assignment_policy.out +++ b/src/test/regress/expected/multi_task_assignment_policy.out @@ -140,3 +140,75 @@ DEBUG: assigned task 1 to node localhost:57637 RESET citus.task_assignment_policy; RESET client_min_messages; COMMIT; +BEGIN; +SET LOCAL client_min_messages TO DEBUG3; +SET LOCAL citus.explain_distributed_queries TO off; +-- Check how task_assignment_policy impact planning decisions for reference tables +CREATE TABLE task_assignment_reference_table (test_id integer); +SELECT create_reference_table('task_assignment_reference_table'); + create_reference_table +------------------------ + +(1 row) + +SET LOCAL citus.task_assignment_policy TO 'greedy'; +EXPLAIN (COSTS FALSE) SELECT * FROM task_assignment_reference_table; +DEBUG: Creating router plan +DEBUG: Plan is router executable + QUERY PLAN +-------------------------------------------------------------- + Custom Scan (Citus Router) + explain statements for distributed queries are not enabled +(2 rows) + +EXPLAIN (COSTS FALSE) SELECT * FROM task_assignment_reference_table; +DEBUG: Creating router plan +DEBUG: Plan is router executable + QUERY PLAN +-------------------------------------------------------------- + Custom Scan (Citus Router) + explain statements for distributed queries are not enabled +(2 rows) + +SET LOCAL citus.task_assignment_policy TO 'first-replica'; +EXPLAIN (COSTS FALSE) SELECT * FROM 
task_assignment_reference_table; +DEBUG: Creating router plan +DEBUG: Plan is router executable + QUERY PLAN +-------------------------------------------------------------- + Custom Scan (Citus Router) + explain statements for distributed queries are not enabled +(2 rows) + +EXPLAIN (COSTS FALSE) SELECT * FROM task_assignment_reference_table; +DEBUG: Creating router plan +DEBUG: Plan is router executable + QUERY PLAN +-------------------------------------------------------------- + Custom Scan (Citus Router) + explain statements for distributed queries are not enabled +(2 rows) + +-- here we expect debug output showing two different hosts for subsequent queries +SET LOCAL citus.task_assignment_policy TO 'round-robin'; +EXPLAIN (COSTS FALSE) SELECT * FROM task_assignment_reference_table; +DEBUG: assigned task 0 to node localhost:57637 +DEBUG: Creating router plan +DEBUG: Plan is router executable + QUERY PLAN +-------------------------------------------------------------- + Custom Scan (Citus Router) + explain statements for distributed queries are not enabled +(2 rows) + +EXPLAIN (COSTS FALSE) SELECT * FROM task_assignment_reference_table; +DEBUG: assigned task 0 to node localhost:57638 +DEBUG: Creating router plan +DEBUG: Plan is router executable + QUERY PLAN +-------------------------------------------------------------- + Custom Scan (Citus Router) + explain statements for distributed queries are not enabled +(2 rows) + +ROLLBACK; diff --git a/src/test/regress/expected/multi_task_assignment_policy_0.out b/src/test/regress/expected/multi_task_assignment_policy_0.out index 36b52ed8c..3aeb624e2 100644 --- a/src/test/regress/expected/multi_task_assignment_policy_0.out +++ b/src/test/regress/expected/multi_task_assignment_policy_0.out @@ -176,3 +176,114 @@ RESET client_min_messages; DEBUG: StartTransactionCommand DEBUG: ProcessUtility COMMIT; +BEGIN; +SET LOCAL client_min_messages TO DEBUG3; +DEBUG: CommitTransactionCommand +SET LOCAL 
citus.explain_distributed_queries TO off; +DEBUG: StartTransactionCommand +DEBUG: ProcessUtility +DEBUG: CommitTransactionCommand +-- Check how task_assignment_policy impact planning decisions for reference tables +CREATE TABLE task_assignment_reference_table (test_id integer); +DEBUG: StartTransactionCommand +DEBUG: ProcessUtility +DEBUG: CommitTransactionCommand +SELECT create_reference_table('task_assignment_reference_table'); +DEBUG: StartTransactionCommand +DEBUG: CommitTransactionCommand + create_reference_table +------------------------ + +(1 row) + +SET LOCAL citus.task_assignment_policy TO 'greedy'; +DEBUG: StartTransactionCommand +DEBUG: ProcessUtility +DEBUG: CommitTransactionCommand +EXPLAIN (COSTS FALSE) SELECT * FROM task_assignment_reference_table; +DEBUG: StartTransactionCommand +DEBUG: ProcessUtility +DEBUG: Creating router plan +DEBUG: Plan is router executable +DEBUG: CommitTransactionCommand + QUERY PLAN +-------------------------------------------------------------- + Custom Scan (Citus Router) + explain statements for distributed queries are not enabled +(2 rows) + +EXPLAIN (COSTS FALSE) SELECT * FROM task_assignment_reference_table; +DEBUG: StartTransactionCommand +DEBUG: ProcessUtility +DEBUG: Creating router plan +DEBUG: Plan is router executable +DEBUG: CommitTransactionCommand + QUERY PLAN +-------------------------------------------------------------- + Custom Scan (Citus Router) + explain statements for distributed queries are not enabled +(2 rows) + +SET LOCAL citus.task_assignment_policy TO 'first-replica'; +DEBUG: StartTransactionCommand +DEBUG: ProcessUtility +DEBUG: CommitTransactionCommand +EXPLAIN (COSTS FALSE) SELECT * FROM task_assignment_reference_table; +DEBUG: StartTransactionCommand +DEBUG: ProcessUtility +DEBUG: Creating router plan +DEBUG: Plan is router executable +DEBUG: CommitTransactionCommand + QUERY PLAN +-------------------------------------------------------------- + Custom Scan (Citus Router) + explain statements 
for distributed queries are not enabled +(2 rows) + +EXPLAIN (COSTS FALSE) SELECT * FROM task_assignment_reference_table; +DEBUG: StartTransactionCommand +DEBUG: ProcessUtility +DEBUG: Creating router plan +DEBUG: Plan is router executable +DEBUG: CommitTransactionCommand + QUERY PLAN +-------------------------------------------------------------- + Custom Scan (Citus Router) + explain statements for distributed queries are not enabled +(2 rows) + +-- here we expect debug output showing two different hosts for subsequent queries +SET LOCAL citus.task_assignment_policy TO 'round-robin'; +DEBUG: StartTransactionCommand +DEBUG: ProcessUtility +DEBUG: CommitTransactionCommand +EXPLAIN (COSTS FALSE) SELECT * FROM task_assignment_reference_table; +DEBUG: StartTransactionCommand +DEBUG: ProcessUtility +DEBUG: assigned task 0 to node localhost:57637 +DEBUG: Creating router plan +DEBUG: Plan is router executable +DEBUG: CommitTransactionCommand + QUERY PLAN +-------------------------------------------------------------- + Custom Scan (Citus Router) + explain statements for distributed queries are not enabled +(2 rows) + +EXPLAIN (COSTS FALSE) SELECT * FROM task_assignment_reference_table; +DEBUG: StartTransactionCommand +DEBUG: ProcessUtility +DEBUG: assigned task 0 to node localhost:57638 +DEBUG: Creating router plan +DEBUG: Plan is router executable +DEBUG: CommitTransactionCommand + QUERY PLAN +-------------------------------------------------------------- + Custom Scan (Citus Router) + explain statements for distributed queries are not enabled +(2 rows) + +ROLLBACK; +DEBUG: StartTransactionCommand +DEBUG: ProcessUtility +DEBUG: CommitTransactionCommand diff --git a/src/test/regress/sql/multi_task_assignment_policy.sql b/src/test/regress/sql/multi_task_assignment_policy.sql index 341a4bb76..5dce8c727 100644 --- a/src/test/regress/sql/multi_task_assignment_policy.sql +++ b/src/test/regress/sql/multi_task_assignment_policy.sql @@ -94,3 +94,28 @@ RESET 
citus.task_assignment_policy; RESET client_min_messages; COMMIT; + +BEGIN; + +SET LOCAL client_min_messages TO DEBUG3; +SET LOCAL citus.explain_distributed_queries TO off; + +-- Check how task_assignment_policy impact planning decisions for reference tables + +CREATE TABLE task_assignment_reference_table (test_id integer); +SELECT create_reference_table('task_assignment_reference_table'); + +SET LOCAL citus.task_assignment_policy TO 'greedy'; +EXPLAIN (COSTS FALSE) SELECT * FROM task_assignment_reference_table; +EXPLAIN (COSTS FALSE) SELECT * FROM task_assignment_reference_table; + +SET LOCAL citus.task_assignment_policy TO 'first-replica'; +EXPLAIN (COSTS FALSE) SELECT * FROM task_assignment_reference_table; +EXPLAIN (COSTS FALSE) SELECT * FROM task_assignment_reference_table; + +-- here we expect debug output showing two different hosts for subsequent queries +SET LOCAL citus.task_assignment_policy TO 'round-robin'; +EXPLAIN (COSTS FALSE) SELECT * FROM task_assignment_reference_table; +EXPLAIN (COSTS FALSE) SELECT * FROM task_assignment_reference_table; + +ROLLBACK; \ No newline at end of file