From f9520be01161909b1bd50e8a1399d6c38e74ae23 Mon Sep 17 00:00:00 2001 From: Nils Dijk Date: Thu, 15 Nov 2018 15:11:15 +0100 Subject: [PATCH] Round robin queries to reference tables with task_assignment_policy set to `round-robin` (#2472) Description: Support round-robin `task_assignment_policy` for queries to reference tables. This PR allows users to query multiple placements of shards in a round-robin fashion. When `citus.task_assignment_policy` is set to `'round-robin'`, the planner will use a round-robin scheduling strategy when multiple shard placements are available. The primary use case is spreading the load of reference table queries to all the nodes in the cluster instead of hammering only the first placement of the reference table. Since reference tables share the same path for selecting the shards as single-shard queries that have multiple placements (`citus.shard_replication_factor > 1`), this setting also allows users to spread the query load on these shards. For modifying queries we do not apply a round-robin strategy, as it would be negated by an extra reordering step in the executor for such queries, where a `first-replica` strategy is enforced. 
--- .../planner/multi_physical_planner.c | 6 +- .../planner/multi_router_planner.c | 64 ++++++++-- .../distributed/multi_physical_planner.h | 4 + .../expected/multi_task_assignment_policy.out | 72 ++++++++++++ .../multi_task_assignment_policy_0.out | 111 ++++++++++++++++++ .../sql/multi_task_assignment_policy.sql | 25 ++++ 6 files changed, 265 insertions(+), 17 deletions(-) diff --git a/src/backend/distributed/planner/multi_physical_planner.c b/src/backend/distributed/planner/multi_physical_planner.c index 35766de01..b1d8d2e64 100644 --- a/src/backend/distributed/planner/multi_physical_planner.c +++ b/src/backend/distributed/planner/multi_physical_planner.c @@ -114,7 +114,6 @@ static AttrNumber NewColumnId(Index originalTableId, AttrNumber originalColumnId static Job * JobForRangeTable(List *jobList, RangeTblEntry *rangeTableEntry); static Job * JobForTableIdList(List *jobList, List *searchedTableIdList); static List * ChildNodeList(MultiNode *multiNode); -static uint64 UniqueJobId(void); static Job * BuildJob(Query *jobQuery, List *dependedJobList); static MapMergeJob * BuildMapMergeJob(Query *jobQuery, List *dependedJobList, Var *partitionKey, PartitionType partitionType, @@ -172,7 +171,6 @@ static bool HasMergeTaskDependencies(List *sqlTaskList); static List * GreedyAssignTaskList(List *taskList); static Task * GreedyAssignTask(WorkerNode *workerNode, List *taskList, List *activeShardPlacementLists); -static List * RoundRobinAssignTaskList(List *taskList); static List * RoundRobinReorder(Task *task, List *placementList); static List * ReorderAndAssignTaskList(List *taskList, List * (*reorderFunction)(Task *, List *)); @@ -1757,7 +1755,7 @@ ChildNodeList(MultiNode *multiNode) * When citus.enable_unique_job_ids is off then only the local counter is * included to get repeatable results. 
*/ -static uint64 +uint64 UniqueJobId(void) { static uint32 jobIdCounter = 0; @@ -5099,7 +5097,7 @@ FirstReplicaAssignTaskList(List *taskList) * by the number of active shard placements, and ensure that we rotate between * these placements across subsequent queries. */ -static List * +List * RoundRobinAssignTaskList(List *taskList) { taskList = ReorderAndAssignTaskList(taskList, RoundRobinReorder); diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c index a48d02fcf..bf5ded72e 100644 --- a/src/backend/distributed/planner/multi_router_planner.c +++ b/src/backend/distributed/planner/multi_router_planner.c @@ -148,11 +148,15 @@ static List * get_all_actual_clauses(List *restrictinfo_list); static int CompareInsertValuesByShardId(const void *leftElement, const void *rightElement); static uint64 GetInitialShardId(List *relationShardList); -static List * SingleShardSelectTaskList(Query *query, List *relationShardList, - List *placementList, uint64 shardId); +static List * SingleShardSelectTaskList(Query *query, uint64 jobId, + List *relationShardList, List *placementList, + uint64 shardId); static bool RowLocksOnRelations(Node *node, List **rtiLockList); -static List * SingleShardModifyTaskList(Query *query, List *relationShardList, - List *placementList, uint64 shardId); +static List * SingleShardModifyTaskList(Query *query, uint64 jobId, + List *relationShardList, List *placementList, + uint64 shardId); +static void ReorderTaskPlacementsByTaskAssignmentPolicy(Job *job, TaskAssignmentPolicyType + taskAssignmentPolicy); /* @@ -1388,7 +1392,7 @@ CreateJob(Query *query) Job *job = NULL; job = CitusMakeNode(Job); - job->jobId = INVALID_JOB_ID; + job->jobId = UniqueJobId(); job->jobQuery = query; job->taskList = NIL; job->dependedJobList = NIL; @@ -1625,12 +1629,22 @@ RouterJob(Query *originalQuery, PlannerRestrictionContext *plannerRestrictionCon if (originalQuery->commandType == CMD_SELECT) { - 
job->taskList = SingleShardSelectTaskList(originalQuery, relationShardList, - placementList, shardId); + job->taskList = SingleShardSelectTaskList(originalQuery, job->jobId, + relationShardList, placementList, + shardId); + + /* + * Queries to reference tables, or distributed tables with multiple replica's have + * their task placements reordered according to the configured + * task_assignment_policy. This is only applicable to select queries as the modify + * queries will be reordered to _always_ use the first-replica policy during + * execution. + */ + ReorderTaskPlacementsByTaskAssignmentPolicy(job, TaskAssignmentPolicy); } else if (isMultiShardModifyQuery) { - job->taskList = QueryPushdownSqlTaskList(originalQuery, 0, + job->taskList = QueryPushdownSqlTaskList(originalQuery, job->jobId, plannerRestrictionContext-> relationRestrictionContext, relationShardList, MODIFY_TASK, @@ -1638,8 +1652,9 @@ RouterJob(Query *originalQuery, PlannerRestrictionContext *plannerRestrictionCon } else { - job->taskList = SingleShardModifyTaskList(originalQuery, relationShardList, - placementList, shardId); + job->taskList = SingleShardModifyTaskList(originalQuery, job->jobId, + relationShardList, placementList, + shardId); } job->requiresMasterEvaluation = requiresMasterEvaluation; @@ -1647,12 +1662,33 @@ RouterJob(Query *originalQuery, PlannerRestrictionContext *plannerRestrictionCon } +/* + * ReorderTaskPlacementsByTaskAssignmentPolicy applies selective reordering for supported + * TaskAssignmentPolicyTypes. + * + * Supported Types + * - TASK_ASSIGNMENT_ROUND_ROBIN round robin schedule queries among placements + * + * By default it does not reorder the task list, implying a first-replica strategy. 
+ */ +static void +ReorderTaskPlacementsByTaskAssignmentPolicy(Job *job, TaskAssignmentPolicyType + taskAssignmentPolicy) +{ + if (taskAssignmentPolicy == TASK_ASSIGNMENT_ROUND_ROBIN) + { + job->taskList = RoundRobinAssignTaskList(job->taskList); + } +} + + /* * SingleShardSelectTaskList generates a task for single shard select query * and returns it as a list. */ static List * -SingleShardSelectTaskList(Query *query, List *relationShardList, List *placementList, +SingleShardSelectTaskList(Query *query, uint64 jobId, List *relationShardList, + List *placementList, uint64 shardId) { Task *task = CreateTask(ROUTER_TASK); @@ -1664,6 +1700,7 @@ SingleShardSelectTaskList(Query *query, List *relationShardList, List *placement task->queryString = queryString->data; task->anchorShardId = shardId; + task->jobId = jobId; task->taskPlacementList = placementList; task->relationShardList = relationShardList; task->relationRowLockList = relationRowLockList; @@ -1718,8 +1755,8 @@ RowLocksOnRelations(Node *node, List **relationRowLockList) * and returns it as a list. 
*/ static List * -SingleShardModifyTaskList(Query *query, List *relationShardList, List *placementList, - uint64 shardId) +SingleShardModifyTaskList(Query *query, uint64 jobId, List *relationShardList, + List *placementList, uint64 shardId) { Task *task = CreateTask(MODIFY_TASK); StringInfo queryString = makeStringInfo(); @@ -1746,6 +1783,7 @@ SingleShardModifyTaskList(Query *query, List *relationShardList, List *placement task->queryString = queryString->data; task->anchorShardId = shardId; + task->jobId = jobId; task->taskPlacementList = placementList; task->relationShardList = relationShardList; task->replicationModel = modificationTableCacheEntry->replicationModel; diff --git a/src/include/distributed/multi_physical_planner.h b/src/include/distributed/multi_physical_planner.h index e8e091209..23b636e20 100644 --- a/src/include/distributed/multi_physical_planner.h +++ b/src/include/distributed/multi_physical_planner.h @@ -340,6 +340,7 @@ extern bool TaskListMember(const List *taskList, const Task *task); extern List * TaskListDifference(const List *list1, const List *list2); extern List * AssignAnchorShardTaskList(List *taskList); extern List * FirstReplicaAssignTaskList(List *taskList); +extern List * RoundRobinAssignTaskList(List *taskList); /* function declaration for creating Task */ extern List * QueryPushdownSqlTaskList(Query *query, uint64 jobId, @@ -348,5 +349,8 @@ extern List * QueryPushdownSqlTaskList(Query *query, uint64 jobId, List *prunedRelationShardList, TaskType taskType, bool modifyRequiresMasterEvaluation); +/* function declarations for managing jobs */ +extern uint64 UniqueJobId(void); + #endif /* MULTI_PHYSICAL_PLANNER_H */ diff --git a/src/test/regress/expected/multi_task_assignment_policy.out b/src/test/regress/expected/multi_task_assignment_policy.out index a73267bc6..8879ae02a 100644 --- a/src/test/regress/expected/multi_task_assignment_policy.out +++ b/src/test/regress/expected/multi_task_assignment_policy.out @@ -140,3 +140,75 @@ DEBUG: 
assigned task 1 to node localhost:57637 RESET citus.task_assignment_policy; RESET client_min_messages; COMMIT; +BEGIN; +SET LOCAL client_min_messages TO DEBUG3; +SET LOCAL citus.explain_distributed_queries TO off; +-- Check how task_assignment_policy impact planning decisions for reference tables +CREATE TABLE task_assignment_reference_table (test_id integer); +SELECT create_reference_table('task_assignment_reference_table'); + create_reference_table +------------------------ + +(1 row) + +SET LOCAL citus.task_assignment_policy TO 'greedy'; +EXPLAIN (COSTS FALSE) SELECT * FROM task_assignment_reference_table; +DEBUG: Creating router plan +DEBUG: Plan is router executable + QUERY PLAN +-------------------------------------------------------------- + Custom Scan (Citus Router) + explain statements for distributed queries are not enabled +(2 rows) + +EXPLAIN (COSTS FALSE) SELECT * FROM task_assignment_reference_table; +DEBUG: Creating router plan +DEBUG: Plan is router executable + QUERY PLAN +-------------------------------------------------------------- + Custom Scan (Citus Router) + explain statements for distributed queries are not enabled +(2 rows) + +SET LOCAL citus.task_assignment_policy TO 'first-replica'; +EXPLAIN (COSTS FALSE) SELECT * FROM task_assignment_reference_table; +DEBUG: Creating router plan +DEBUG: Plan is router executable + QUERY PLAN +-------------------------------------------------------------- + Custom Scan (Citus Router) + explain statements for distributed queries are not enabled +(2 rows) + +EXPLAIN (COSTS FALSE) SELECT * FROM task_assignment_reference_table; +DEBUG: Creating router plan +DEBUG: Plan is router executable + QUERY PLAN +-------------------------------------------------------------- + Custom Scan (Citus Router) + explain statements for distributed queries are not enabled +(2 rows) + +-- here we expect debug output showing two different hosts for subsequent queries +SET LOCAL citus.task_assignment_policy TO 'round-robin'; 
+EXPLAIN (COSTS FALSE) SELECT * FROM task_assignment_reference_table; +DEBUG: assigned task 0 to node localhost:57637 +DEBUG: Creating router plan +DEBUG: Plan is router executable + QUERY PLAN +-------------------------------------------------------------- + Custom Scan (Citus Router) + explain statements for distributed queries are not enabled +(2 rows) + +EXPLAIN (COSTS FALSE) SELECT * FROM task_assignment_reference_table; +DEBUG: assigned task 0 to node localhost:57638 +DEBUG: Creating router plan +DEBUG: Plan is router executable + QUERY PLAN +-------------------------------------------------------------- + Custom Scan (Citus Router) + explain statements for distributed queries are not enabled +(2 rows) + +ROLLBACK; diff --git a/src/test/regress/expected/multi_task_assignment_policy_0.out b/src/test/regress/expected/multi_task_assignment_policy_0.out index 36b52ed8c..3aeb624e2 100644 --- a/src/test/regress/expected/multi_task_assignment_policy_0.out +++ b/src/test/regress/expected/multi_task_assignment_policy_0.out @@ -176,3 +176,114 @@ RESET client_min_messages; DEBUG: StartTransactionCommand DEBUG: ProcessUtility COMMIT; +BEGIN; +SET LOCAL client_min_messages TO DEBUG3; +DEBUG: CommitTransactionCommand +SET LOCAL citus.explain_distributed_queries TO off; +DEBUG: StartTransactionCommand +DEBUG: ProcessUtility +DEBUG: CommitTransactionCommand +-- Check how task_assignment_policy impact planning decisions for reference tables +CREATE TABLE task_assignment_reference_table (test_id integer); +DEBUG: StartTransactionCommand +DEBUG: ProcessUtility +DEBUG: CommitTransactionCommand +SELECT create_reference_table('task_assignment_reference_table'); +DEBUG: StartTransactionCommand +DEBUG: CommitTransactionCommand + create_reference_table +------------------------ + +(1 row) + +SET LOCAL citus.task_assignment_policy TO 'greedy'; +DEBUG: StartTransactionCommand +DEBUG: ProcessUtility +DEBUG: CommitTransactionCommand +EXPLAIN (COSTS FALSE) SELECT * FROM 
task_assignment_reference_table; +DEBUG: StartTransactionCommand +DEBUG: ProcessUtility +DEBUG: Creating router plan +DEBUG: Plan is router executable +DEBUG: CommitTransactionCommand + QUERY PLAN +-------------------------------------------------------------- + Custom Scan (Citus Router) + explain statements for distributed queries are not enabled +(2 rows) + +EXPLAIN (COSTS FALSE) SELECT * FROM task_assignment_reference_table; +DEBUG: StartTransactionCommand +DEBUG: ProcessUtility +DEBUG: Creating router plan +DEBUG: Plan is router executable +DEBUG: CommitTransactionCommand + QUERY PLAN +-------------------------------------------------------------- + Custom Scan (Citus Router) + explain statements for distributed queries are not enabled +(2 rows) + +SET LOCAL citus.task_assignment_policy TO 'first-replica'; +DEBUG: StartTransactionCommand +DEBUG: ProcessUtility +DEBUG: CommitTransactionCommand +EXPLAIN (COSTS FALSE) SELECT * FROM task_assignment_reference_table; +DEBUG: StartTransactionCommand +DEBUG: ProcessUtility +DEBUG: Creating router plan +DEBUG: Plan is router executable +DEBUG: CommitTransactionCommand + QUERY PLAN +-------------------------------------------------------------- + Custom Scan (Citus Router) + explain statements for distributed queries are not enabled +(2 rows) + +EXPLAIN (COSTS FALSE) SELECT * FROM task_assignment_reference_table; +DEBUG: StartTransactionCommand +DEBUG: ProcessUtility +DEBUG: Creating router plan +DEBUG: Plan is router executable +DEBUG: CommitTransactionCommand + QUERY PLAN +-------------------------------------------------------------- + Custom Scan (Citus Router) + explain statements for distributed queries are not enabled +(2 rows) + +-- here we expect debug output showing two different hosts for subsequent queries +SET LOCAL citus.task_assignment_policy TO 'round-robin'; +DEBUG: StartTransactionCommand +DEBUG: ProcessUtility +DEBUG: CommitTransactionCommand +EXPLAIN (COSTS FALSE) SELECT * FROM 
task_assignment_reference_table; +DEBUG: StartTransactionCommand +DEBUG: ProcessUtility +DEBUG: assigned task 0 to node localhost:57637 +DEBUG: Creating router plan +DEBUG: Plan is router executable +DEBUG: CommitTransactionCommand + QUERY PLAN +-------------------------------------------------------------- + Custom Scan (Citus Router) + explain statements for distributed queries are not enabled +(2 rows) + +EXPLAIN (COSTS FALSE) SELECT * FROM task_assignment_reference_table; +DEBUG: StartTransactionCommand +DEBUG: ProcessUtility +DEBUG: assigned task 0 to node localhost:57638 +DEBUG: Creating router plan +DEBUG: Plan is router executable +DEBUG: CommitTransactionCommand + QUERY PLAN +-------------------------------------------------------------- + Custom Scan (Citus Router) + explain statements for distributed queries are not enabled +(2 rows) + +ROLLBACK; +DEBUG: StartTransactionCommand +DEBUG: ProcessUtility +DEBUG: CommitTransactionCommand diff --git a/src/test/regress/sql/multi_task_assignment_policy.sql b/src/test/regress/sql/multi_task_assignment_policy.sql index 341a4bb76..5dce8c727 100644 --- a/src/test/regress/sql/multi_task_assignment_policy.sql +++ b/src/test/regress/sql/multi_task_assignment_policy.sql @@ -94,3 +94,28 @@ RESET citus.task_assignment_policy; RESET client_min_messages; COMMIT; + +BEGIN; + +SET LOCAL client_min_messages TO DEBUG3; +SET LOCAL citus.explain_distributed_queries TO off; + +-- Check how task_assignment_policy impact planning decisions for reference tables + +CREATE TABLE task_assignment_reference_table (test_id integer); +SELECT create_reference_table('task_assignment_reference_table'); + +SET LOCAL citus.task_assignment_policy TO 'greedy'; +EXPLAIN (COSTS FALSE) SELECT * FROM task_assignment_reference_table; +EXPLAIN (COSTS FALSE) SELECT * FROM task_assignment_reference_table; + +SET LOCAL citus.task_assignment_policy TO 'first-replica'; +EXPLAIN (COSTS FALSE) SELECT * FROM task_assignment_reference_table; +EXPLAIN (COSTS 
FALSE) SELECT * FROM task_assignment_reference_table; + +-- here we expect debug output showing two different hosts for subsequent queries +SET LOCAL citus.task_assignment_policy TO 'round-robin'; +EXPLAIN (COSTS FALSE) SELECT * FROM task_assignment_reference_table; +EXPLAIN (COSTS FALSE) SELECT * FROM task_assignment_reference_table; + +ROLLBACK; \ No newline at end of file