Round robin queries to reference tables with task_assignment_policy set to `round-robin` (#2472)

Description: Support round-robin `task_assignment_policy` for queries to reference tables.

This PR allows users to query multiple placements of shards in a round-robin fashion. When `citus.task_assignment_policy` is set to `'round-robin'`, the planner uses round-robin scheduling whenever multiple shard placements are available.

The primary use-case is spreading the load of reference table queries across all the nodes in the cluster instead of hammering only the first placement of the reference table. Since reference tables share the same shard-selection path as single-shard queries that have multiple placements (`citus.shard_replication_factor > 1`), this setting also allows users to spread the query load on these shards.

For modifying queries we do not apply a round-robin strategy: it would be negated by an extra reordering step in the executor, which enforces a `first-replica` strategy for such queries.
pull/2475/head
Nils Dijk 2018-11-15 15:11:15 +01:00 committed by GitHub
parent 5f821e6f64
commit f9520be011
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 265 additions and 17 deletions

View File

@ -114,7 +114,6 @@ static AttrNumber NewColumnId(Index originalTableId, AttrNumber originalColumnId
static Job * JobForRangeTable(List *jobList, RangeTblEntry *rangeTableEntry); static Job * JobForRangeTable(List *jobList, RangeTblEntry *rangeTableEntry);
static Job * JobForTableIdList(List *jobList, List *searchedTableIdList); static Job * JobForTableIdList(List *jobList, List *searchedTableIdList);
static List * ChildNodeList(MultiNode *multiNode); static List * ChildNodeList(MultiNode *multiNode);
static uint64 UniqueJobId(void);
static Job * BuildJob(Query *jobQuery, List *dependedJobList); static Job * BuildJob(Query *jobQuery, List *dependedJobList);
static MapMergeJob * BuildMapMergeJob(Query *jobQuery, List *dependedJobList, static MapMergeJob * BuildMapMergeJob(Query *jobQuery, List *dependedJobList,
Var *partitionKey, PartitionType partitionType, Var *partitionKey, PartitionType partitionType,
@ -172,7 +171,6 @@ static bool HasMergeTaskDependencies(List *sqlTaskList);
static List * GreedyAssignTaskList(List *taskList); static List * GreedyAssignTaskList(List *taskList);
static Task * GreedyAssignTask(WorkerNode *workerNode, List *taskList, static Task * GreedyAssignTask(WorkerNode *workerNode, List *taskList,
List *activeShardPlacementLists); List *activeShardPlacementLists);
static List * RoundRobinAssignTaskList(List *taskList);
static List * RoundRobinReorder(Task *task, List *placementList); static List * RoundRobinReorder(Task *task, List *placementList);
static List * ReorderAndAssignTaskList(List *taskList, static List * ReorderAndAssignTaskList(List *taskList,
List * (*reorderFunction)(Task *, List *)); List * (*reorderFunction)(Task *, List *));
@ -1757,7 +1755,7 @@ ChildNodeList(MultiNode *multiNode)
* When citus.enable_unique_job_ids is off then only the local counter is * When citus.enable_unique_job_ids is off then only the local counter is
* included to get repeatable results. * included to get repeatable results.
*/ */
static uint64 uint64
UniqueJobId(void) UniqueJobId(void)
{ {
static uint32 jobIdCounter = 0; static uint32 jobIdCounter = 0;
@ -5099,7 +5097,7 @@ FirstReplicaAssignTaskList(List *taskList)
* by the number of active shard placements, and ensure that we rotate between * by the number of active shard placements, and ensure that we rotate between
* these placements across subsequent queries. * these placements across subsequent queries.
*/ */
static List * List *
RoundRobinAssignTaskList(List *taskList) RoundRobinAssignTaskList(List *taskList)
{ {
taskList = ReorderAndAssignTaskList(taskList, RoundRobinReorder); taskList = ReorderAndAssignTaskList(taskList, RoundRobinReorder);

View File

@ -148,11 +148,15 @@ static List * get_all_actual_clauses(List *restrictinfo_list);
static int CompareInsertValuesByShardId(const void *leftElement, static int CompareInsertValuesByShardId(const void *leftElement,
const void *rightElement); const void *rightElement);
static uint64 GetInitialShardId(List *relationShardList); static uint64 GetInitialShardId(List *relationShardList);
static List * SingleShardSelectTaskList(Query *query, List *relationShardList, static List * SingleShardSelectTaskList(Query *query, uint64 jobId,
List *placementList, uint64 shardId); List *relationShardList, List *placementList,
uint64 shardId);
static bool RowLocksOnRelations(Node *node, List **rtiLockList); static bool RowLocksOnRelations(Node *node, List **rtiLockList);
static List * SingleShardModifyTaskList(Query *query, List *relationShardList, static List * SingleShardModifyTaskList(Query *query, uint64 jobId,
List *placementList, uint64 shardId); List *relationShardList, List *placementList,
uint64 shardId);
static void ReorderTaskPlacementsByTaskAssignmentPolicy(Job *job, TaskAssignmentPolicyType
taskAssignmentPolicy);
/* /*
@ -1388,7 +1392,7 @@ CreateJob(Query *query)
Job *job = NULL; Job *job = NULL;
job = CitusMakeNode(Job); job = CitusMakeNode(Job);
job->jobId = INVALID_JOB_ID; job->jobId = UniqueJobId();
job->jobQuery = query; job->jobQuery = query;
job->taskList = NIL; job->taskList = NIL;
job->dependedJobList = NIL; job->dependedJobList = NIL;
@ -1625,12 +1629,22 @@ RouterJob(Query *originalQuery, PlannerRestrictionContext *plannerRestrictionCon
if (originalQuery->commandType == CMD_SELECT) if (originalQuery->commandType == CMD_SELECT)
{ {
job->taskList = SingleShardSelectTaskList(originalQuery, relationShardList, job->taskList = SingleShardSelectTaskList(originalQuery, job->jobId,
placementList, shardId); relationShardList, placementList,
shardId);
/*
* Queries to reference tables, or distributed tables with multiple replicas, have
* their task placements reordered according to the configured
* task_assignment_policy. This is only applicable to select queries as the modify
* queries will be reordered to _always_ use the first-replica policy during
* execution.
*/
ReorderTaskPlacementsByTaskAssignmentPolicy(job, TaskAssignmentPolicy);
} }
else if (isMultiShardModifyQuery) else if (isMultiShardModifyQuery)
{ {
job->taskList = QueryPushdownSqlTaskList(originalQuery, 0, job->taskList = QueryPushdownSqlTaskList(originalQuery, job->jobId,
plannerRestrictionContext-> plannerRestrictionContext->
relationRestrictionContext, relationRestrictionContext,
relationShardList, MODIFY_TASK, relationShardList, MODIFY_TASK,
@ -1638,8 +1652,9 @@ RouterJob(Query *originalQuery, PlannerRestrictionContext *plannerRestrictionCon
} }
else else
{ {
job->taskList = SingleShardModifyTaskList(originalQuery, relationShardList, job->taskList = SingleShardModifyTaskList(originalQuery, job->jobId,
placementList, shardId); relationShardList, placementList,
shardId);
} }
job->requiresMasterEvaluation = requiresMasterEvaluation; job->requiresMasterEvaluation = requiresMasterEvaluation;
@ -1647,12 +1662,33 @@ RouterJob(Query *originalQuery, PlannerRestrictionContext *plannerRestrictionCon
} }
/*
 * ReorderTaskPlacementsByTaskAssignmentPolicy rewrites job->taskList according
 * to the given task assignment policy, for the policies that support it.
 *
 * Currently handled policies:
 * - TASK_ASSIGNMENT_ROUND_ROBIN: the task list is passed through
 *   RoundRobinAssignTaskList and the result is stored back on the job.
 *
 * For every other policy the job is left untouched, so the task list keeps
 * the placement order it already had (i.e. a first-replica strategy).
 */
static void
ReorderTaskPlacementsByTaskAssignmentPolicy(Job *job, TaskAssignmentPolicyType
											taskAssignmentPolicy)
{
	/* anything but round-robin keeps the existing placement order */
	if (taskAssignmentPolicy != TASK_ASSIGNMENT_ROUND_ROBIN)
	{
		return;
	}

	job->taskList = RoundRobinAssignTaskList(job->taskList);
}
/* /*
* SingleShardSelectTaskList generates a task for single shard select query * SingleShardSelectTaskList generates a task for single shard select query
* and returns it as a list. * and returns it as a list.
*/ */
static List * static List *
SingleShardSelectTaskList(Query *query, List *relationShardList, List *placementList, SingleShardSelectTaskList(Query *query, uint64 jobId, List *relationShardList,
List *placementList,
uint64 shardId) uint64 shardId)
{ {
Task *task = CreateTask(ROUTER_TASK); Task *task = CreateTask(ROUTER_TASK);
@ -1664,6 +1700,7 @@ SingleShardSelectTaskList(Query *query, List *relationShardList, List *placement
task->queryString = queryString->data; task->queryString = queryString->data;
task->anchorShardId = shardId; task->anchorShardId = shardId;
task->jobId = jobId;
task->taskPlacementList = placementList; task->taskPlacementList = placementList;
task->relationShardList = relationShardList; task->relationShardList = relationShardList;
task->relationRowLockList = relationRowLockList; task->relationRowLockList = relationRowLockList;
@ -1718,8 +1755,8 @@ RowLocksOnRelations(Node *node, List **relationRowLockList)
* and returns it as a list. * and returns it as a list.
*/ */
static List * static List *
SingleShardModifyTaskList(Query *query, List *relationShardList, List *placementList, SingleShardModifyTaskList(Query *query, uint64 jobId, List *relationShardList,
uint64 shardId) List *placementList, uint64 shardId)
{ {
Task *task = CreateTask(MODIFY_TASK); Task *task = CreateTask(MODIFY_TASK);
StringInfo queryString = makeStringInfo(); StringInfo queryString = makeStringInfo();
@ -1746,6 +1783,7 @@ SingleShardModifyTaskList(Query *query, List *relationShardList, List *placement
task->queryString = queryString->data; task->queryString = queryString->data;
task->anchorShardId = shardId; task->anchorShardId = shardId;
task->jobId = jobId;
task->taskPlacementList = placementList; task->taskPlacementList = placementList;
task->relationShardList = relationShardList; task->relationShardList = relationShardList;
task->replicationModel = modificationTableCacheEntry->replicationModel; task->replicationModel = modificationTableCacheEntry->replicationModel;

View File

@ -340,6 +340,7 @@ extern bool TaskListMember(const List *taskList, const Task *task);
extern List * TaskListDifference(const List *list1, const List *list2); extern List * TaskListDifference(const List *list1, const List *list2);
extern List * AssignAnchorShardTaskList(List *taskList); extern List * AssignAnchorShardTaskList(List *taskList);
extern List * FirstReplicaAssignTaskList(List *taskList); extern List * FirstReplicaAssignTaskList(List *taskList);
extern List * RoundRobinAssignTaskList(List *taskList);
/* function declaration for creating Task */ /* function declaration for creating Task */
extern List * QueryPushdownSqlTaskList(Query *query, uint64 jobId, extern List * QueryPushdownSqlTaskList(Query *query, uint64 jobId,
@ -348,5 +349,8 @@ extern List * QueryPushdownSqlTaskList(Query *query, uint64 jobId,
List *prunedRelationShardList, TaskType taskType, List *prunedRelationShardList, TaskType taskType,
bool modifyRequiresMasterEvaluation); bool modifyRequiresMasterEvaluation);
/* function declarations for managing jobs */
extern uint64 UniqueJobId(void);
#endif /* MULTI_PHYSICAL_PLANNER_H */ #endif /* MULTI_PHYSICAL_PLANNER_H */

View File

@ -140,3 +140,75 @@ DEBUG: assigned task 1 to node localhost:57637
RESET citus.task_assignment_policy; RESET citus.task_assignment_policy;
RESET client_min_messages; RESET client_min_messages;
COMMIT; COMMIT;
BEGIN;
SET LOCAL client_min_messages TO DEBUG3;
SET LOCAL citus.explain_distributed_queries TO off;
-- Check how task_assignment_policy impact planning decisions for reference tables
CREATE TABLE task_assignment_reference_table (test_id integer);
SELECT create_reference_table('task_assignment_reference_table');
create_reference_table
------------------------
(1 row)
SET LOCAL citus.task_assignment_policy TO 'greedy';
EXPLAIN (COSTS FALSE) SELECT * FROM task_assignment_reference_table;
DEBUG: Creating router plan
DEBUG: Plan is router executable
QUERY PLAN
--------------------------------------------------------------
Custom Scan (Citus Router)
explain statements for distributed queries are not enabled
(2 rows)
EXPLAIN (COSTS FALSE) SELECT * FROM task_assignment_reference_table;
DEBUG: Creating router plan
DEBUG: Plan is router executable
QUERY PLAN
--------------------------------------------------------------
Custom Scan (Citus Router)
explain statements for distributed queries are not enabled
(2 rows)
SET LOCAL citus.task_assignment_policy TO 'first-replica';
EXPLAIN (COSTS FALSE) SELECT * FROM task_assignment_reference_table;
DEBUG: Creating router plan
DEBUG: Plan is router executable
QUERY PLAN
--------------------------------------------------------------
Custom Scan (Citus Router)
explain statements for distributed queries are not enabled
(2 rows)
EXPLAIN (COSTS FALSE) SELECT * FROM task_assignment_reference_table;
DEBUG: Creating router plan
DEBUG: Plan is router executable
QUERY PLAN
--------------------------------------------------------------
Custom Scan (Citus Router)
explain statements for distributed queries are not enabled
(2 rows)
-- here we expect debug output showing two different hosts for subsequent queries
SET LOCAL citus.task_assignment_policy TO 'round-robin';
EXPLAIN (COSTS FALSE) SELECT * FROM task_assignment_reference_table;
DEBUG: assigned task 0 to node localhost:57637
DEBUG: Creating router plan
DEBUG: Plan is router executable
QUERY PLAN
--------------------------------------------------------------
Custom Scan (Citus Router)
explain statements for distributed queries are not enabled
(2 rows)
EXPLAIN (COSTS FALSE) SELECT * FROM task_assignment_reference_table;
DEBUG: assigned task 0 to node localhost:57638
DEBUG: Creating router plan
DEBUG: Plan is router executable
QUERY PLAN
--------------------------------------------------------------
Custom Scan (Citus Router)
explain statements for distributed queries are not enabled
(2 rows)
ROLLBACK;

View File

@ -176,3 +176,114 @@ RESET client_min_messages;
DEBUG: StartTransactionCommand DEBUG: StartTransactionCommand
DEBUG: ProcessUtility DEBUG: ProcessUtility
COMMIT; COMMIT;
BEGIN;
SET LOCAL client_min_messages TO DEBUG3;
DEBUG: CommitTransactionCommand
SET LOCAL citus.explain_distributed_queries TO off;
DEBUG: StartTransactionCommand
DEBUG: ProcessUtility
DEBUG: CommitTransactionCommand
-- Check how task_assignment_policy impact planning decisions for reference tables
CREATE TABLE task_assignment_reference_table (test_id integer);
DEBUG: StartTransactionCommand
DEBUG: ProcessUtility
DEBUG: CommitTransactionCommand
SELECT create_reference_table('task_assignment_reference_table');
DEBUG: StartTransactionCommand
DEBUG: CommitTransactionCommand
create_reference_table
------------------------
(1 row)
SET LOCAL citus.task_assignment_policy TO 'greedy';
DEBUG: StartTransactionCommand
DEBUG: ProcessUtility
DEBUG: CommitTransactionCommand
EXPLAIN (COSTS FALSE) SELECT * FROM task_assignment_reference_table;
DEBUG: StartTransactionCommand
DEBUG: ProcessUtility
DEBUG: Creating router plan
DEBUG: Plan is router executable
DEBUG: CommitTransactionCommand
QUERY PLAN
--------------------------------------------------------------
Custom Scan (Citus Router)
explain statements for distributed queries are not enabled
(2 rows)
EXPLAIN (COSTS FALSE) SELECT * FROM task_assignment_reference_table;
DEBUG: StartTransactionCommand
DEBUG: ProcessUtility
DEBUG: Creating router plan
DEBUG: Plan is router executable
DEBUG: CommitTransactionCommand
QUERY PLAN
--------------------------------------------------------------
Custom Scan (Citus Router)
explain statements for distributed queries are not enabled
(2 rows)
SET LOCAL citus.task_assignment_policy TO 'first-replica';
DEBUG: StartTransactionCommand
DEBUG: ProcessUtility
DEBUG: CommitTransactionCommand
EXPLAIN (COSTS FALSE) SELECT * FROM task_assignment_reference_table;
DEBUG: StartTransactionCommand
DEBUG: ProcessUtility
DEBUG: Creating router plan
DEBUG: Plan is router executable
DEBUG: CommitTransactionCommand
QUERY PLAN
--------------------------------------------------------------
Custom Scan (Citus Router)
explain statements for distributed queries are not enabled
(2 rows)
EXPLAIN (COSTS FALSE) SELECT * FROM task_assignment_reference_table;
DEBUG: StartTransactionCommand
DEBUG: ProcessUtility
DEBUG: Creating router plan
DEBUG: Plan is router executable
DEBUG: CommitTransactionCommand
QUERY PLAN
--------------------------------------------------------------
Custom Scan (Citus Router)
explain statements for distributed queries are not enabled
(2 rows)
-- here we expect debug output showing two different hosts for subsequent queries
SET LOCAL citus.task_assignment_policy TO 'round-robin';
DEBUG: StartTransactionCommand
DEBUG: ProcessUtility
DEBUG: CommitTransactionCommand
EXPLAIN (COSTS FALSE) SELECT * FROM task_assignment_reference_table;
DEBUG: StartTransactionCommand
DEBUG: ProcessUtility
DEBUG: assigned task 0 to node localhost:57637
DEBUG: Creating router plan
DEBUG: Plan is router executable
DEBUG: CommitTransactionCommand
QUERY PLAN
--------------------------------------------------------------
Custom Scan (Citus Router)
explain statements for distributed queries are not enabled
(2 rows)
EXPLAIN (COSTS FALSE) SELECT * FROM task_assignment_reference_table;
DEBUG: StartTransactionCommand
DEBUG: ProcessUtility
DEBUG: assigned task 0 to node localhost:57638
DEBUG: Creating router plan
DEBUG: Plan is router executable
DEBUG: CommitTransactionCommand
QUERY PLAN
--------------------------------------------------------------
Custom Scan (Citus Router)
explain statements for distributed queries are not enabled
(2 rows)
ROLLBACK;
DEBUG: StartTransactionCommand
DEBUG: ProcessUtility
DEBUG: CommitTransactionCommand

View File

@ -94,3 +94,28 @@ RESET citus.task_assignment_policy;
RESET client_min_messages; RESET client_min_messages;
COMMIT; COMMIT;
BEGIN;
SET LOCAL client_min_messages TO DEBUG3;
SET LOCAL citus.explain_distributed_queries TO off;
-- Check how task_assignment_policy impact planning decisions for reference tables
CREATE TABLE task_assignment_reference_table (test_id integer);
SELECT create_reference_table('task_assignment_reference_table');
SET LOCAL citus.task_assignment_policy TO 'greedy';
EXPLAIN (COSTS FALSE) SELECT * FROM task_assignment_reference_table;
EXPLAIN (COSTS FALSE) SELECT * FROM task_assignment_reference_table;
SET LOCAL citus.task_assignment_policy TO 'first-replica';
EXPLAIN (COSTS FALSE) SELECT * FROM task_assignment_reference_table;
EXPLAIN (COSTS FALSE) SELECT * FROM task_assignment_reference_table;
-- here we expect debug output showing two different hosts for subsequent queries
SET LOCAL citus.task_assignment_policy TO 'round-robin';
EXPLAIN (COSTS FALSE) SELECT * FROM task_assignment_reference_table;
EXPLAIN (COSTS FALSE) SELECT * FROM task_assignment_reference_table;
ROLLBACK;