From 90943a6ce6864244a1fbef8f648020ed0377be25 Mon Sep 17 00:00:00 2001
From: Onder Kalaci
Date: Fri, 8 Nov 2019 15:58:36 +0100
Subject: [PATCH] Do not include coordinator shards when round-robin is selected

When the user picks the "round-robin" task assignment policy, the aim is to
distribute the load across nodes. However, for reference tables with a
placement on the coordinator, local execution kicks in immediately and
round-robin is effectively ignored.

With this change, we exclude the coordinator's placement. Although the
approach is a little invasive because it modifies the placement list, that
seems acceptable. We could have done this in other ways, such as:

1) Add a field like "Task->roundRobinPlacement", set to the first element
after RoundRobinPolicy is applied. During execution, if that placement is
local to the coordinator, skip it and try the other, remote placements.

2) In TaskAccessesLocalNode()@local_execution.c, check
task_assignment_policy; if round-robin is selected and there is a local
placement on the coordinator, skip it. However, task assignment is decided
at planning time while this check would happen at execution time, which
could create weird edge cases.
---
 .../distributed/executor/adaptive_executor.c  |  3 +-
 .../planner/multi_router_planner.c            | 47 +++++++++++++++
 src/include/distributed/multi_executor.h      |  1 +
 .../expected/multi_mx_add_coordinator.out     | 60 +++++++++++++++++++
 .../regress/sql/multi_mx_add_coordinator.sql  | 20 +++++++
 5 files changed, 129 insertions(+), 2 deletions(-)

diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c
index ec724c9da..9a4d92d21 100644
--- a/src/backend/distributed/executor/adaptive_executor.c
+++ b/src/backend/distributed/executor/adaptive_executor.c
@@ -558,7 +558,6 @@ static bool DistributedExecutionModifiesDatabase(DistributedExecution *execution
 static bool TaskListModifiesDatabase(RowModifyLevel modLevel, List *taskList);
 static bool DistributedExecutionRequiresRollback(DistributedExecution *execution);
 static bool TaskListRequires2PC(List *taskList);
-static bool ReadOnlyTask(TaskType taskType);
 static bool SelectForUpdateOnReferenceTable(RowModifyLevel modLevel, List *taskList);
 static void AssignTasksToConnections(DistributedExecution *execution);
 static void UnclaimAllSessionConnections(List *sessionList);
@@ -1167,7 +1166,7 @@ TaskListRequires2PC(List *taskList)
  * ReadOnlyTask returns true if the input task does a read-only operation
  * on the database.
  */
-static bool
+bool
 ReadOnlyTask(TaskType taskType)
 {
 	if (taskType == ROUTER_TASK || taskType == SQL_TASK)
diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c
index 57ee6fe7f..abdb3dd58 100644
--- a/src/backend/distributed/planner/multi_router_planner.c
+++ b/src/backend/distributed/planner/multi_router_planner.c
@@ -30,6 +30,7 @@
 #include "distributed/master_metadata_utility.h"
 #include "distributed/master_protocol.h"
 #include "distributed/metadata_cache.h"
+#include "distributed/multi_executor.h"
 #include "distributed/multi_join_order.h"
 #include "distributed/multi_logical_planner.h"
 #include "distributed/multi_logical_optimizer.h"
@@ -167,6 +168,7 @@ static void ReorderTaskPlacementsByTaskAssignmentPolicy(Job *job,
 														TaskAssignmentPolicyType
 														taskAssignmentPolicy,
 														List *placementList);
+static List * RemoveCoordinatorPlacement(List *placementList);


 /*
@@ -1779,6 +1781,20 @@ ReorderTaskPlacementsByTaskAssignmentPolicy(Job *job,
 		Assert(list_length(job->taskList) == 1);
 		task = (Task *) linitial(job->taskList);

+		/*
+		 * For round-robin SELECT queries, we don't want to include the coordinator
+		 * because the user is trying to distribute the load across nodes via
+		 * the round-robin policy. Otherwise, local execution would prioritize
+		 * executing the local tasks, and especially for reference tables on
+		 * the coordinator this would prevent load balancing across nodes.
+		 *
+		 * For the other worker nodes in Citus MX, we let local execution kick
+		 * in even for the round-robin policy, because we expect clients to
+		 * connect evenly across the worker nodes.
+		 */
+		Assert(ReadOnlyTask(task->taskType));
+		placementList = RemoveCoordinatorPlacement(placementList);
+
 		/* reorder the placement list */
 		reorderedPlacementList = RoundRobinReorder(task, placementList);
 		task->taskPlacementList = reorderedPlacementList;
@@ -1791,6 +1807,37 @@ ReorderTaskPlacementsByTaskAssignmentPolicy(Job *job,
 }


+/*
+ * RemoveCoordinatorPlacement takes a task placement list and returns it with
+ * the placement belonging to the coordinator removed (if any).
+ *
+ * If the list has a single entry or no placements on the coordinator, the
+ * list is returned unmodified.
+ */
+static List *
+RemoveCoordinatorPlacement(List *placementList)
+{
+	ListCell *placementCell = NULL;
+
+	if (list_length(placementList) < 2)
+	{
+		return placementList;
+	}
+
+	foreach(placementCell, placementList)
+	{
+		ShardPlacement *placement = (ShardPlacement *) lfirst(placementCell);
+
+		if (placement->groupId == COORDINATOR_GROUP_ID)
+		{
+			return list_delete_ptr(placementList, placement);
+		}
+	}
+
+	return placementList;
+}
+
+
 /*
  * SingleShardSelectTaskList generates a task for single shard select query
  * and returns it as a list.
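Note: as an illustration of the filtering above, here is a small standalone
sketch (not part of the patch). It models the placement list with a plain C
array instead of the PostgreSQL List API, and assumes COORDINATOR_GROUP_ID
is 0 as in Citus; the group id values are made up for the example:

#include <stdio.h>

#define COORDINATOR_GROUP_ID 0

/*
 * Drop the coordinator's entry from an array of placement group ids, unless
 * it is the only placement; returns the new length. The "length < 2" guard
 * mirrors the "list_length(placementList) < 2" check in
 * RemoveCoordinatorPlacement.
 */
static int
RemoveCoordinatorGroup(int *groupIds, int length)
{
	if (length < 2)
	{
		/* a sole local placement must stay usable */
		return length;
	}

	for (int i = 0; i < length; i++)
	{
		if (groupIds[i] == COORDINATOR_GROUP_ID)
		{
			/* shift the remaining entries left by one */
			for (int j = i; j < length - 1; j++)
			{
				groupIds[j] = groupIds[j + 1];
			}
			return length - 1;
		}
	}

	return length;
}

int
main(void)
{
	int groupIds[] = { COORDINATOR_GROUP_ID, 1, 2 }; /* coordinator + two workers */
	int length = RemoveCoordinatorGroup(groupIds, 3);

	for (int i = 0; i < length; i++)
	{
		printf("placement group %d\n", groupIds[i]); /* prints groups 1 and 2 */
	}

	return 0;
}

The guard is what keeps a reference table that only has a coordinator
placement executable at all: with no remote placements available, removing
the local one would leave the task with nowhere to run.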
diff --git a/src/include/distributed/multi_executor.h b/src/include/distributed/multi_executor.h
index 27ef96d70..9ea5c4cbd 100644
--- a/src/include/distributed/multi_executor.h
+++ b/src/include/distributed/multi_executor.h
@@ -62,6 +62,7 @@ extern void SetLocalMultiShardModifyModeToSequential(void);
 extern void SetLocalForceMaxQueryParallelization(void);
 extern void SortTupleStore(CitusScanState *scanState);
 extern bool DistributedPlanModifiesDatabase(DistributedPlan *plan);
+extern bool ReadOnlyTask(TaskType taskType);
 extern void ExtractParametersFromParamList(ParamListInfo paramListInfo,
 										   Oid **parameterTypes,
 										   const char ***parameterValues, bool
diff --git a/src/test/regress/expected/multi_mx_add_coordinator.out b/src/test/regress/expected/multi_mx_add_coordinator.out
index bb347c2f6..2c700dc05 100644
--- a/src/test/regress/expected/multi_mx_add_coordinator.out
+++ b/src/test/regress/expected/multi_mx_add_coordinator.out
@@ -33,6 +33,66 @@ SELECT create_reference_table('ref');

 (1 row)

+SET citus.log_local_commands TO ON;
+SET client_min_messages TO DEBUG;
+-- if the placement policy is not round-robin, SELECTs on the reference
+-- tables use local execution
+SELECT count(*) FROM ref;
+DEBUG: Distributed planning for a fast-path router query
+DEBUG: Creating router plan
+DEBUG: Plan is router executable
+LOG: executing the command locally: SELECT count(*) AS count FROM mx_add_coordinator.ref_7000000 ref
+ count
+-------
+     0
+(1 row)
+
+SELECT count(*) FROM ref;
+DEBUG: Distributed planning for a fast-path router query
+DEBUG: Creating router plan
+DEBUG: Plan is router executable
+LOG: executing the command locally: SELECT count(*) AS count FROM mx_add_coordinator.ref_7000000 ref
+ count
+-------
+     0
+(1 row)
+
+-- for round-robin policy, always go to workers
+SET citus.task_assignment_policy TO "round-robin";
+SELECT count(*) FROM ref;
+DEBUG: Distributed planning for a fast-path router query
+DEBUG: Creating router plan
+DEBUG: Plan is router executable
+ count
+-------
+     0
+(1 row)
+
+SELECT count(*) FROM ref;
+DEBUG: Distributed planning for a fast-path router query
+DEBUG: Creating router plan
+DEBUG: Plan is router executable
+ count
+-------
+     0
+(1 row)
+
+SELECT count(*) FROM ref;
+DEBUG: Distributed planning for a fast-path router query
+DEBUG: Creating router plan
+DEBUG: Plan is router executable
+ count
+-------
+     0
+(1 row)
+
+-- modifications always go through local shard as well as remote ones
+INSERT INTO ref VALUES (1);
+DEBUG: Creating router plan
+DEBUG: Plan is router executable
+LOG: executing the command locally: INSERT INTO mx_add_coordinator.ref_7000000 (a) VALUES (1)
+-- get it ready for the next executions
+TRUNCATE ref;
 -- test that changes from a metadata node is reflected in the coordinator placement
 \c - - - :worker_1_port
 SET search_path TO mx_add_coordinator,public;
diff --git a/src/test/regress/sql/multi_mx_add_coordinator.sql b/src/test/regress/sql/multi_mx_add_coordinator.sql
index b35949ddb..3ac026ee6 100644
--- a/src/test/regress/sql/multi_mx_add_coordinator.sql
+++ b/src/test/regress/sql/multi_mx_add_coordinator.sql
@@ -18,6 +18,26 @@ SELECT verify_metadata('localhost', :worker_1_port),

 CREATE TABLE ref(a int);
 SELECT create_reference_table('ref');
+SET citus.log_local_commands TO ON;
+SET client_min_messages TO DEBUG;
+
+-- if the placement policy is not round-robin, SELECTs on the reference
+-- tables use local execution
+SELECT count(*) FROM ref;
+SELECT count(*) FROM ref;
+
+-- for round-robin policy, always go to workers
+SET citus.task_assignment_policy TO "round-robin";
+SELECT count(*) FROM ref;
+SELECT count(*) FROM ref;
+SELECT count(*) FROM ref;
+
+-- modifications always go through local shard as well as remote ones
+INSERT INTO ref VALUES (1);
+
+-- get it ready for the next executions
+TRUNCATE ref;
+
 -- test that changes from a metadata node is reflected in the coordinator placement
 \c - - - :worker_1_port
 SET search_path TO mx_add_coordinator,public;
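Note: RoundRobinReorder itself is untouched by this patch; it only sees the
already-filtered placement list. For intuition, here is a standalone sketch
(an approximation for illustration, not Citus' actual implementation) of
what round-robin reordering does: rotate the placements so that consecutive
jobs start from different nodes:

#include <stdio.h>

/*
 * Rotate the group ids left by (jobId % length) positions, so the placement
 * tried first cycles across jobs. Citus derives the rotation from the job,
 * which this sketch models with a plain jobId counter.
 */
static void
RoundRobinRotate(int *groupIds, int length, unsigned long jobId)
{
	int shift = (int) (jobId % (unsigned long) length);

	for (int s = 0; s < shift; s++)
	{
		int first = groupIds[0];

		for (int i = 0; i < length - 1; i++)
		{
			groupIds[i] = groupIds[i + 1];
		}
		groupIds[length - 1] = first;
	}
}

int
main(void)
{
	/* with the coordinator filtered out, only worker groups 1 and 2 remain */
	for (unsigned long jobId = 1; jobId <= 4; jobId++)
	{
		int groupIds[] = { 1, 2 };

		RoundRobinRotate(groupIds, 2, jobId);
		printf("job %lu starts at group %d\n", jobId, groupIds[0]);
	}

	return 0;
}

Because the coordinator's placement is removed before the rotation, every
position in the cycle is a remote node. That is exactly the observable
change in the regression test above: the round-robin SELECTs no longer log
"executing the command locally".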