From b935dfb8c844d6f8b68272b2ddf45c986448edf6 Mon Sep 17 00:00:00 2001
From: Hanefi Onaldi
Date: Tue, 21 May 2019 11:50:36 +0300
Subject: [PATCH 1/3] Clean up deleted function declaration

---
 src/include/distributed/multi_physical_planner.h | 1 -
 1 file changed, 1 deletion(-)

diff --git a/src/include/distributed/multi_physical_planner.h b/src/include/distributed/multi_physical_planner.h
index 68831294a..c4d662d5f 100644
--- a/src/include/distributed/multi_physical_planner.h
+++ b/src/include/distributed/multi_physical_planner.h
@@ -351,7 +351,6 @@ extern List * TaskListDifference(const List *list1, const List *list2);
 extern List * AssignAnchorShardTaskList(List *taskList);
 extern List * FirstReplicaAssignTaskList(List *taskList);
 extern List * RoundRobinAssignTaskList(List *taskList);
-extern List * RoundRobinPerTransactionAssignTaskList(List *taskList);
 extern int CompareTasksByTaskId(const void *leftElement, const void *rightElement);
 
 /* function declaration for creating Task */

From 4d737177e69a6d596eff0c623cf9a151286312d2 Mon Sep 17 00:00:00 2001
From: Hanefi Onaldi
Date: Wed, 22 May 2019 14:54:52 +0300
Subject: [PATCH 2/3] Remove redundant active placement filters and unneeded
 sort operations

If a query is router executable, it hits a single shard and therefore
has a single task associated with it, so there is no need to sort the
single-element task list.

We also already have a list of active shard placements, so we pass it
as a parameter and reuse it instead of recomputing it.
---
 .../planner/multi_physical_planner.c     |  3 +-
 .../planner/multi_router_planner.c       | 34 +++++++++++++++----
 .../distributed/multi_physical_planner.h |  1 +
 3 files changed, 30 insertions(+), 8 deletions(-)

diff --git a/src/backend/distributed/planner/multi_physical_planner.c b/src/backend/distributed/planner/multi_physical_planner.c
index a2f41465b..b9a8bbc97 100644
--- a/src/backend/distributed/planner/multi_physical_planner.c
+++ b/src/backend/distributed/planner/multi_physical_planner.c
@@ -174,7 +174,6 @@ static bool HasMergeTaskDependencies(List *sqlTaskList);
 static List * GreedyAssignTaskList(List *taskList);
 static Task * GreedyAssignTask(WorkerNode *workerNode, List *taskList,
 							   List *activeShardPlacementLists);
-static List * RoundRobinReorder(Task *task, List *placementList);
 static List * ReorderAndAssignTaskList(List *taskList,
 									   List * (*reorderFunction)(Task *, List *));
 static int CompareTasksByShardId(const void *leftElement, const void *rightElement);
@@ -5088,7 +5087,7 @@ RoundRobinAssignTaskList(List *taskList)
  * Citus generates since the distributed transactionId is generated during the execution
  * whereas task-assignment happens during the planning.
  */
-static List *
+List *
 RoundRobinReorder(Task *task, List *placementList)
 {
 	TransactionId transactionId = GetMyProcLocalTransactionId();

diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c
index 02d14c58b..bcdc6dff2 100644
--- a/src/backend/distributed/planner/multi_router_planner.c
+++ b/src/backend/distributed/planner/multi_router_planner.c
@@ -155,8 +155,10 @@ static bool RowLocksOnRelations(Node *node, List **rtiLockList);
 static List * SingleShardModifyTaskList(Query *query, uint64 jobId,
 										List *relationShardList, List *placementList,
 										uint64 shardId);
-static void ReorderTaskPlacementsByTaskAssignmentPolicy(Job *job, TaskAssignmentPolicyType
-														taskAssignmentPolicy);
+static void ReorderTaskPlacementsByTaskAssignmentPolicy(Job *job,
+														TaskAssignmentPolicyType
+														taskAssignmentPolicy,
+														List *placementList);
 
 
 /*
@@ -1640,7 +1642,8 @@ RouterJob(Query *originalQuery, PlannerRestrictionContext *plannerRestrictionCon
 		 */
 		if (shardId != INVALID_SHARD_ID)
 		{
-			ReorderTaskPlacementsByTaskAssignmentPolicy(job, TaskAssignmentPolicy);
+			ReorderTaskPlacementsByTaskAssignmentPolicy(job, TaskAssignmentPolicy,
+														placementList);
 		}
 	}
 	else if (isMultiShardModifyQuery)
@@ -1673,12 +1676,31 @@ RouterJob(Query *originalQuery, PlannerRestrictionContext *plannerRestrictionCon
  * By default it does not reorder the task list, implying a first-replica strategy.
  */
 static void
-ReorderTaskPlacementsByTaskAssignmentPolicy(Job *job, TaskAssignmentPolicyType
-											taskAssignmentPolicy)
+ReorderTaskPlacementsByTaskAssignmentPolicy(Job *job,
+											TaskAssignmentPolicyType taskAssignmentPolicy,
+											List *placementList)
 {
 	if (taskAssignmentPolicy == TASK_ASSIGNMENT_ROUND_ROBIN)
 	{
-		job->taskList = RoundRobinAssignTaskList(job->taskList);
+		Task *task = NULL;
+		List *reorderedPlacementList = NIL;
+		ShardPlacement *primaryPlacement = NULL;
+
+		/*
+		 * We hit a single shard on router plans, and there should be only
+		 * one task in the task list
+		 */
+		Assert(list_length(job->taskList) == 1);
+		task = (Task *) linitial(job->taskList);
+
+		/* reorder the placement list */
+		reorderedPlacementList = RoundRobinReorder(task, placementList);
+		task->taskPlacementList = reorderedPlacementList;
+
+		primaryPlacement = (ShardPlacement *) linitial(reorderedPlacementList);
+		ereport(DEBUG3, (errmsg("assigned task %u to node %s:%u", task->taskId,
+								primaryPlacement->nodeName,
+								primaryPlacement->nodePort)));
 	}
 }

diff --git a/src/include/distributed/multi_physical_planner.h b/src/include/distributed/multi_physical_planner.h
index c4d662d5f..6fbab4915 100644
--- a/src/include/distributed/multi_physical_planner.h
+++ b/src/include/distributed/multi_physical_planner.h
@@ -351,6 +351,7 @@ extern List * TaskListDifference(const List *list1, const List *list2);
 extern List * AssignAnchorShardTaskList(List *taskList);
 extern List * FirstReplicaAssignTaskList(List *taskList);
 extern List * RoundRobinAssignTaskList(List *taskList);
+extern List * RoundRobinReorder(Task *task, List *placementList);
 extern int CompareTasksByTaskId(const void *leftElement, const void *rightElement);
 
 /* function declaration for creating Task */

From 7443191397381153e85c24ef696b44ff70b52907 Mon Sep 17 00:00:00 2001
From: Hanefi Onaldi
Date: Wed, 22 May 2019 18:30:48 +0300
Subject: [PATCH 3/3] Improve tests for round-robin & router queries

---
 .../expected/multi_task_assignment_policy.out | 47 ++++++++++---------
 .../sql/multi_task_assignment_policy.sql      | 41 ++++++++++------
 2 files changed, 52 insertions(+), 36 deletions(-)

diff --git a/src/test/regress/expected/multi_task_assignment_policy.out b/src/test/regress/expected/multi_task_assignment_policy.out
index 70bae799b..6f8f841ea 100644
--- a/src/test/regress/expected/multi_task_assignment_policy.out
+++ b/src/test/regress/expected/multi_task_assignment_policy.out
@@ -19,17 +19,16 @@ DECLARE
     shardOfTheTask text;
 begin
     for r in execute qry loop
-        IF r LIKE '%port%' THEN
-            portOfTheTask = substring(r, '([0-9]{1,10})');
+        IF r LIKE '%port%' THEN
+            portOfTheTask = substring(r, '([0-9]{1,10})');
         END IF;
         IF r LIKE '%' || table_name || '%' THEN
-            shardOfTheTask = substring(r, '([0-9]{1,10})');
-            return QUERY SELECT shardOfTheTask || '@' || portOfTheTask ;
+            shardOfTheTask = substring(r, '([0-9]{5,10})');
         END IF;
     end loop;
-    return;
+    return QUERY SELECT shardOfTheTask || '@' || portOfTheTask;
 end;
 $$ language plpgsql;
 SET citus.explain_distributed_queries TO off;
 -- Check that our policies for assigning tasks to worker nodes run as expected.
@@ -300,11 +299,8 @@ SELECT count(DISTINCT value) FROM explain_outputs;
 (1 row)
 
 TRUNCATE explain_outputs;
-RESET citus.task_assignment_policy;
-RESET client_min_messages;
--- we should be able to use round-robin with router queries that
+-- We should be able to use round-robin with router queries that
 -- only contain intermediate results
-BEGIN;
 CREATE TABLE task_assignment_test_table_2 (test_id integer);
 SELECT create_distributed_table('task_assignment_test_table_2', 'test_id');
  create_distributed_table
---------------------------
 
 (1 row)
 
-WITH q1 AS (SELECT * FROM task_assignment_test_table_2) SELECT * FROM q1;
- test_id
----------
-(0 rows)
-
+SET citus.task_assignment_policy TO 'round-robin';
+-- Run the query two times to make sure that it hits two different workers
+-- on consecutive runs
+INSERT INTO explain_outputs
+SELECT parse_explain_output($cmd$
+EXPLAIN WITH q1 AS (SELECT * FROM task_assignment_test_table_2) SELECT * FROM q1
+$cmd$, 'task_assignment_test_table_2');
+INSERT INTO explain_outputs
+SELECT parse_explain_output($cmd$
+EXPLAIN WITH q1 AS (SELECT * FROM task_assignment_test_table_2) SELECT * FROM q1
+$cmd$, 'task_assignment_test_table_2');
+-- The count should be 2 since the intermediate results are processed on
+-- different workers
+SELECT count(DISTINCT value) FROM explain_outputs;
+ count
+-------
+     2
+(1 row)
+
-SET LOCAL citus.task_assignment_policy TO 'round-robin';
-WITH q1 AS (SELECT * FROM task_assignment_test_table_2) SELECT * FROM q1;
- test_id
----------
-(0 rows)
-
-ROLLBACK;
+RESET citus.task_assignment_policy;
+RESET client_min_messages;
 DROP TABLE task_assignment_replicated_hash, task_assignment_nonreplicated_hash,
-           task_assignment_reference_table, explain_outputs;
+           task_assignment_reference_table, task_assignment_test_table_2, explain_outputs;

diff --git a/src/test/regress/sql/multi_task_assignment_policy.sql b/src/test/regress/sql/multi_task_assignment_policy.sql
index 67341b8cc..95a57f204 100644
--- a/src/test/regress/sql/multi_task_assignment_policy.sql
+++ b/src/test/regress/sql/multi_task_assignment_policy.sql
@@ -18,17 +18,16 @@ DECLARE
     shardOfTheTask text;
 begin
     for r in execute qry loop
-        IF r LIKE '%port%' THEN
-            portOfTheTask = substring(r, '([0-9]{1,10})');
+        IF r LIKE '%port%' THEN
+            portOfTheTask = substring(r, '([0-9]{1,10})');
        END IF;
        IF r LIKE '%' || table_name || '%' THEN
-            shardOfTheTask = substring(r, '([0-9]{1,10})');
-            return QUERY SELECT shardOfTheTask || '@' || portOfTheTask ;
+            shardOfTheTask = substring(r, '([0-9]{5,10})');
        END IF;
     end loop;
-    return;
+    return QUERY SELECT shardOfTheTask || '@' || portOfTheTask;
 end;
 $$ language plpgsql;
@@ -236,19 +235,31 @@ $cmd$, 'task_assignment_nonreplicated_hash');
 SELECT count(DISTINCT value) FROM explain_outputs;
 
 TRUNCATE explain_outputs;
-RESET citus.task_assignment_policy;
-RESET client_min_messages;
-
--- we should be able to use round-robin with router queries that
+-- We should be able to use round-robin with router queries that
 -- only contain intermediate results
-BEGIN;
 CREATE TABLE task_assignment_test_table_2 (test_id integer);
 SELECT create_distributed_table('task_assignment_test_table_2', 'test_id');
-WITH q1 AS (SELECT * FROM task_assignment_test_table_2) SELECT * FROM q1;
-SET LOCAL citus.task_assignment_policy TO 'round-robin';
-WITH q1 AS (SELECT * FROM task_assignment_test_table_2) SELECT * FROM q1;
-ROLLBACK;
+SET citus.task_assignment_policy TO 'round-robin';
+
+-- Run the query two times to make sure that it hits two different workers
+-- on consecutive runs
+INSERT INTO explain_outputs
+SELECT parse_explain_output($cmd$
+EXPLAIN WITH q1 AS (SELECT * FROM task_assignment_test_table_2) SELECT * FROM q1
+$cmd$, 'task_assignment_test_table_2');
+
+INSERT INTO explain_outputs
+SELECT parse_explain_output($cmd$
+EXPLAIN WITH q1 AS (SELECT * FROM task_assignment_test_table_2) SELECT * FROM q1
+$cmd$, 'task_assignment_test_table_2');
+
+-- The count should be 2 since the intermediate results are processed on
+-- different workers
+SELECT count(DISTINCT value) FROM explain_outputs;
+
+RESET citus.task_assignment_policy;
+RESET client_min_messages;
 
 DROP TABLE task_assignment_replicated_hash, task_assignment_nonreplicated_hash,
-           task_assignment_reference_table, explain_outputs;
+           task_assignment_reference_table, task_assignment_test_table_2, explain_outputs;
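
Reviewer note (patch 2): RoundRobinReorder picks a new primary placement for each local transaction by rotating the shard placement list with the local transaction id (see GetMyProcLocalTransactionId() in the hunk above). Below is a minimal, self-contained sketch of that rotation in plain C; PlacementStub and RoundRobinReorderStub are illustrative stand-ins, not Citus API, and a plain unsigned counter replaces PostgreSQL's TransactionId.

#include <stdio.h>

#define PLACEMENT_COUNT 2

typedef struct PlacementStub
{
	const char *nodeName;
	unsigned int nodePort;
} PlacementStub;

/*
 * Rotate the placement array left by (transactionId % placementCount) so
 * that a different placement becomes the primary (first) one for each
 * consecutive local transaction.
 */
static void
RoundRobinReorderStub(const PlacementStub *placements, int placementCount,
					  unsigned int transactionId, PlacementStub *reordered)
{
	int rotation = (int) (transactionId % (unsigned int) placementCount);

	for (int placementIndex = 0; placementIndex < placementCount; placementIndex++)
	{
		reordered[placementIndex] =
			placements[(rotation + placementIndex) % placementCount];
	}
}

int
main(void)
{
	PlacementStub placements[PLACEMENT_COUNT] = {
		{ "worker-1", 9700 }, { "worker-2", 9701 }
	};
	PlacementStub reordered[PLACEMENT_COUNT];

	/* two consecutive "transactions": the primary placement alternates */
	for (unsigned int transactionId = 1; transactionId <= 2; transactionId++)
	{
		RoundRobinReorderStub(placements, PLACEMENT_COUNT, transactionId, reordered);
		printf("transaction %u -> primary placement %s:%u\n", transactionId,
			   reordered[0].nodeName, reordered[0].nodePort);
	}

	return 0;
}

With two placements, transaction ids 1 and 2 put worker-2 and then worker-1 first, which is the alternation the new regression test asserts by expecting two distinct shard@port values across consecutive runs.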
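
Reviewer note (patch 3): the shard-id pattern in parse_explain_output is tightened from '([0-9]{1,10})' to '([0-9]{5,10})'. A likely reason is that the new test table's name, task_assignment_test_table_2, itself contains a digit, so a one-digit match would capture that "2" instead of the shard id suffix on the scanned relation. A standalone POSIX-regex sketch of the difference; the EXPLAIN line and the shard id 1250005 are made up for illustration.

#include <stdio.h>
#include <regex.h>

static void
extract_first_match(const char *line, const char *pattern)
{
	regex_t regex;
	regmatch_t match;

	/* REG_EXTENDED is needed for the {m,n} interval syntax */
	if (regcomp(&regex, pattern, REG_EXTENDED) != 0)
	{
		fprintf(stderr, "could not compile pattern: %s\n", pattern);
		return;
	}

	if (regexec(&regex, line, 1, &match, 0) == 0)
	{
		printf("%-12s matches \"%.*s\"\n", pattern,
			   (int) (match.rm_eo - match.rm_so), line + match.rm_so);
	}

	regfree(&regex);
}

int
main(void)
{
	/* a made-up EXPLAIN line; 1250005 is a hypothetical shard id suffix */
	const char *line = "->  Seq Scan on task_assignment_test_table_2_1250005";

	extract_first_match(line, "[0-9]{1,10}"); /* grabs the "2" in the table name */
	extract_first_match(line, "[0-9]{5,10}"); /* grabs the shard id "1250005" */

	return 0;
}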