Merge pull request #2723 from citusdata/simplify-round-robin-on-router-queries

Simplify round robin logic on router queries
pull/2724/head
Hanefi Onaldi 2019-05-24 14:24:05 +03:00 committed by GitHub
commit b31fbcb28d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 82 additions and 45 deletions

View File

@ -174,7 +174,6 @@ static bool HasMergeTaskDependencies(List *sqlTaskList);
static List * GreedyAssignTaskList(List *taskList); static List * GreedyAssignTaskList(List *taskList);
static Task * GreedyAssignTask(WorkerNode *workerNode, List *taskList, static Task * GreedyAssignTask(WorkerNode *workerNode, List *taskList,
List *activeShardPlacementLists); List *activeShardPlacementLists);
static List * RoundRobinReorder(Task *task, List *placementList);
static List * ReorderAndAssignTaskList(List *taskList, static List * ReorderAndAssignTaskList(List *taskList,
List * (*reorderFunction)(Task *, List *)); List * (*reorderFunction)(Task *, List *));
static int CompareTasksByShardId(const void *leftElement, const void *rightElement); static int CompareTasksByShardId(const void *leftElement, const void *rightElement);
@ -5088,7 +5087,7 @@ RoundRobinAssignTaskList(List *taskList)
* Citus generates since the distributed transactionId is generated during the execution * Citus generates since the distributed transactionId is generated during the execution
* where as task-assignment happens duing the planning. * where as task-assignment happens duing the planning.
*/ */
static List * List *
RoundRobinReorder(Task *task, List *placementList) RoundRobinReorder(Task *task, List *placementList)
{ {
TransactionId transactionId = GetMyProcLocalTransactionId(); TransactionId transactionId = GetMyProcLocalTransactionId();

View File

@ -155,8 +155,10 @@ static bool RowLocksOnRelations(Node *node, List **rtiLockList);
static List * SingleShardModifyTaskList(Query *query, uint64 jobId, static List * SingleShardModifyTaskList(Query *query, uint64 jobId,
List *relationShardList, List *placementList, List *relationShardList, List *placementList,
uint64 shardId); uint64 shardId);
static void ReorderTaskPlacementsByTaskAssignmentPolicy(Job *job, TaskAssignmentPolicyType static void ReorderTaskPlacementsByTaskAssignmentPolicy(Job *job,
taskAssignmentPolicy); TaskAssignmentPolicyType
taskAssignmentPolicy,
List *placementList);
/* /*
@ -1640,7 +1642,8 @@ RouterJob(Query *originalQuery, PlannerRestrictionContext *plannerRestrictionCon
*/ */
if (shardId != INVALID_SHARD_ID) if (shardId != INVALID_SHARD_ID)
{ {
ReorderTaskPlacementsByTaskAssignmentPolicy(job, TaskAssignmentPolicy); ReorderTaskPlacementsByTaskAssignmentPolicy(job, TaskAssignmentPolicy,
placementList);
} }
} }
else if (isMultiShardModifyQuery) else if (isMultiShardModifyQuery)
@ -1673,12 +1676,31 @@ RouterJob(Query *originalQuery, PlannerRestrictionContext *plannerRestrictionCon
* By default it does not reorder the task list, implying a first-replica strategy. * By default it does not reorder the task list, implying a first-replica strategy.
*/ */
static void static void
ReorderTaskPlacementsByTaskAssignmentPolicy(Job *job, TaskAssignmentPolicyType ReorderTaskPlacementsByTaskAssignmentPolicy(Job *job,
taskAssignmentPolicy) TaskAssignmentPolicyType taskAssignmentPolicy,
List *placementList)
{ {
if (taskAssignmentPolicy == TASK_ASSIGNMENT_ROUND_ROBIN) if (taskAssignmentPolicy == TASK_ASSIGNMENT_ROUND_ROBIN)
{ {
job->taskList = RoundRobinAssignTaskList(job->taskList); Task *task = NULL;
List *reorderedPlacementList = NIL;
ShardPlacement *primaryPlacement = NULL;
/*
* We hit a single shard on router plans, and there should be only
* one task in the task list
*/
Assert(list_length(job->taskList) == 1);
task = (Task *) linitial(job->taskList);
/* reorder the placement list */
reorderedPlacementList = RoundRobinReorder(task, placementList);
task->taskPlacementList = reorderedPlacementList;
primaryPlacement = (ShardPlacement *) linitial(reorderedPlacementList);
ereport(DEBUG3, (errmsg("assigned task %u to node %s:%u", task->taskId,
primaryPlacement->nodeName,
primaryPlacement->nodePort)));
} }
} }

View File

@ -351,7 +351,7 @@ extern List * TaskListDifference(const List *list1, const List *list2);
extern List * AssignAnchorShardTaskList(List *taskList); extern List * AssignAnchorShardTaskList(List *taskList);
extern List * FirstReplicaAssignTaskList(List *taskList); extern List * FirstReplicaAssignTaskList(List *taskList);
extern List * RoundRobinAssignTaskList(List *taskList); extern List * RoundRobinAssignTaskList(List *taskList);
extern List * RoundRobinPerTransactionAssignTaskList(List *taskList); extern List * RoundRobinReorder(Task *task, List *placementList);
extern int CompareTasksByTaskId(const void *leftElement, const void *rightElement); extern int CompareTasksByTaskId(const void *leftElement, const void *rightElement);
/* function declaration for creating Task */ /* function declaration for creating Task */

View File

@ -24,12 +24,11 @@ begin
END IF; END IF;
IF r LIKE '%' || table_name || '%' THEN IF r LIKE '%' || table_name || '%' THEN
shardOfTheTask = substring(r, '([0-9]{1,10})'); shardOfTheTask = substring(r, '([0-9]{5,10})');
return QUERY SELECT shardOfTheTask || '@' || portOfTheTask ;
END IF; END IF;
end loop; end loop;
return; return QUERY SELECT shardOfTheTask || '@' || portOfTheTask;
end; $$ language plpgsql; end; $$ language plpgsql;
SET citus.explain_distributed_queries TO off; SET citus.explain_distributed_queries TO off;
-- Check that our policies for assigning tasks to worker nodes run as expected. -- Check that our policies for assigning tasks to worker nodes run as expected.
@ -300,11 +299,8 @@ SELECT count(DISTINCT value) FROM explain_outputs;
(1 row) (1 row)
TRUNCATE explain_outputs; TRUNCATE explain_outputs;
RESET citus.task_assignment_policy; -- We should be able to use round-robin with router queries that
RESET client_min_messages;
-- we should be able to use round-robin with router queries that
-- only contains intermediate results -- only contains intermediate results
BEGIN;
CREATE TABLE task_assignment_test_table_2 (test_id integer); CREATE TABLE task_assignment_test_table_2 (test_id integer);
SELECT create_distributed_table('task_assignment_test_table_2', 'test_id'); SELECT create_distributed_table('task_assignment_test_table_2', 'test_id');
create_distributed_table create_distributed_table
@ -312,17 +308,26 @@ SELECT create_distributed_table('task_assignment_test_table_2', 'test_id');
(1 row) (1 row)
WITH q1 AS (SELECT * FROM task_assignment_test_table_2) SELECT * FROM q1; SET citus.task_assignment_policy TO 'round-robin';
test_id -- Run the query two times to make sure that it hits two different workers
--------- -- on consecutive runs
(0 rows) INSERT INTO explain_outputs
SELECT parse_explain_output($cmd$
EXPLAIN WITH q1 AS (SELECT * FROM task_assignment_test_table_2) SELECT * FROM q1
$cmd$, 'task_assignment_test_table_2');
INSERT INTO explain_outputs
SELECT parse_explain_output($cmd$
EXPLAIN WITH q1 AS (SELECT * FROM task_assignment_test_table_2) SELECT * FROM q1
$cmd$, 'task_assignment_test_table_2');
-- The count should be 2 since the intermediate results are processed on
-- different workers
SELECT count(DISTINCT value) FROM explain_outputs;
count
-------
2
(1 row)
SET LOCAL citus.task_assignment_policy TO 'round-robin'; RESET citus.task_assignment_policy;
WITH q1 AS (SELECT * FROM task_assignment_test_table_2) SELECT * FROM q1; RESET client_min_messages;
test_id
---------
(0 rows)
ROLLBACK;
DROP TABLE task_assignment_replicated_hash, task_assignment_nonreplicated_hash, DROP TABLE task_assignment_replicated_hash, task_assignment_nonreplicated_hash,
task_assignment_reference_table, explain_outputs; task_assignment_reference_table, task_assignment_test_table_2, explain_outputs;

View File

@ -23,12 +23,11 @@ begin
END IF; END IF;
IF r LIKE '%' || table_name || '%' THEN IF r LIKE '%' || table_name || '%' THEN
shardOfTheTask = substring(r, '([0-9]{1,10})'); shardOfTheTask = substring(r, '([0-9]{5,10})');
return QUERY SELECT shardOfTheTask || '@' || portOfTheTask ;
END IF; END IF;
end loop; end loop;
return; return QUERY SELECT shardOfTheTask || '@' || portOfTheTask;
end; $$ language plpgsql; end; $$ language plpgsql;
@ -236,19 +235,31 @@ $cmd$, 'task_assignment_nonreplicated_hash');
SELECT count(DISTINCT value) FROM explain_outputs; SELECT count(DISTINCT value) FROM explain_outputs;
TRUNCATE explain_outputs; TRUNCATE explain_outputs;
RESET citus.task_assignment_policy; -- We should be able to use round-robin with router queries that
RESET client_min_messages;
-- we should be able to use round-robin with router queries that
-- only contains intermediate results -- only contains intermediate results
BEGIN;
CREATE TABLE task_assignment_test_table_2 (test_id integer); CREATE TABLE task_assignment_test_table_2 (test_id integer);
SELECT create_distributed_table('task_assignment_test_table_2', 'test_id'); SELECT create_distributed_table('task_assignment_test_table_2', 'test_id');
WITH q1 AS (SELECT * FROM task_assignment_test_table_2) SELECT * FROM q1; SET citus.task_assignment_policy TO 'round-robin';
SET LOCAL citus.task_assignment_policy TO 'round-robin';
WITH q1 AS (SELECT * FROM task_assignment_test_table_2) SELECT * FROM q1; -- Run the query two times to make sure that it hits two different workers
ROLLBACK; -- on consecutive runs
INSERT INTO explain_outputs
SELECT parse_explain_output($cmd$
EXPLAIN WITH q1 AS (SELECT * FROM task_assignment_test_table_2) SELECT * FROM q1
$cmd$, 'task_assignment_test_table_2');
INSERT INTO explain_outputs
SELECT parse_explain_output($cmd$
EXPLAIN WITH q1 AS (SELECT * FROM task_assignment_test_table_2) SELECT * FROM q1
$cmd$, 'task_assignment_test_table_2');
-- The count should be 2 since the intermediate results are processed on
-- different workers
SELECT count(DISTINCT value) FROM explain_outputs;
RESET citus.task_assignment_policy;
RESET client_min_messages;
DROP TABLE task_assignment_replicated_hash, task_assignment_nonreplicated_hash, DROP TABLE task_assignment_replicated_hash, task_assignment_nonreplicated_hash,
task_assignment_reference_table, explain_outputs; task_assignment_reference_table, task_assignment_test_table_2, explain_outputs;