diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c index ec724c9da..9a4d92d21 100644 --- a/src/backend/distributed/executor/adaptive_executor.c +++ b/src/backend/distributed/executor/adaptive_executor.c @@ -558,7 +558,6 @@ static bool DistributedExecutionModifiesDatabase(DistributedExecution *execution static bool TaskListModifiesDatabase(RowModifyLevel modLevel, List *taskList); static bool DistributedExecutionRequiresRollback(DistributedExecution *execution); static bool TaskListRequires2PC(List *taskList); -static bool ReadOnlyTask(TaskType taskType); static bool SelectForUpdateOnReferenceTable(RowModifyLevel modLevel, List *taskList); static void AssignTasksToConnections(DistributedExecution *execution); static void UnclaimAllSessionConnections(List *sessionList); @@ -1167,7 +1166,7 @@ TaskListRequires2PC(List *taskList) * ReadOnlyTask returns true if the input task does a read-only operation * on the database. */ -static bool +bool ReadOnlyTask(TaskType taskType) { if (taskType == ROUTER_TASK || taskType == SQL_TASK) diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c index 57ee6fe7f..abdb3dd58 100644 --- a/src/backend/distributed/planner/multi_router_planner.c +++ b/src/backend/distributed/planner/multi_router_planner.c @@ -30,6 +30,7 @@ #include "distributed/master_metadata_utility.h" #include "distributed/master_protocol.h" #include "distributed/metadata_cache.h" +#include "distributed/multi_executor.h" #include "distributed/multi_join_order.h" #include "distributed/multi_logical_planner.h" #include "distributed/multi_logical_optimizer.h" @@ -167,6 +168,7 @@ static void ReorderTaskPlacementsByTaskAssignmentPolicy(Job *job, TaskAssignmentPolicyType taskAssignmentPolicy, List *placementList); +static List * RemoveCoordinatorPlacement(List *placementList); /* @@ -1779,6 +1781,20 @@ 
ReorderTaskPlacementsByTaskAssignmentPolicy(Job *job, Assert(list_length(job->taskList) == 1); task = (Task *) linitial(job->taskList); + /* + * For round-robin SELECT queries, we don't want to include the coordinator + * because the user is trying to distribute the load across nodes via + * round-robin policy. Otherwise, the local execution would prioritize + * executing the local tasks and especially for reference tables on the + * coordinator this would prevent load balancing across nodes. + * + * For other worker nodes in Citus MX, we let the local execution kick in + * even for round-robin policy, that's because we expect the clients to evenly + * connect to the worker nodes. + */ + Assert(ReadOnlyTask(task->taskType)); + placementList = RemoveCoordinatorPlacement(placementList); + /* reorder the placement list */ reorderedPlacementList = RoundRobinReorder(task, placementList); task->taskPlacementList = reorderedPlacementList; @@ -1791,6 +1807,37 @@ ReorderTaskPlacementsByTaskAssignmentPolicy(Job *job, } +/* + * RemoveCoordinatorPlacement gets a task placement list and returns the list + * by removing the placement belonging to the coordinator (if any). + * + * If the list has a single entry or no placements on the coordinator, the list + * is returned unmodified. + */ +static List * +RemoveCoordinatorPlacement(List *placementList) +{ + ListCell *placementCell = NULL; + + if (list_length(placementList) < 2) + { + return placementList; + } + + foreach(placementCell, placementList) + { + ShardPlacement *placement = (ShardPlacement *) lfirst(placementCell); + + if (placement->groupId == COORDINATOR_GROUP_ID) + { + return list_delete_ptr(placementList, placement); + } + } + + return placementList; +} + + /* + * SingleShardSelectTaskList generates a task for single shard select query + * and returns it as a list. 
diff --git a/src/include/distributed/multi_executor.h b/src/include/distributed/multi_executor.h index 27ef96d70..9ea5c4cbd 100644 --- a/src/include/distributed/multi_executor.h +++ b/src/include/distributed/multi_executor.h @@ -62,6 +62,7 @@ extern void SetLocalMultiShardModifyModeToSequential(void); extern void SetLocalForceMaxQueryParallelization(void); extern void SortTupleStore(CitusScanState *scanState); extern bool DistributedPlanModifiesDatabase(DistributedPlan *plan); +extern bool ReadOnlyTask(TaskType taskType); extern void ExtractParametersFromParamList(ParamListInfo paramListInfo, Oid **parameterTypes, const char ***parameterValues, bool diff --git a/src/test/regress/expected/multi_mx_add_coordinator.out b/src/test/regress/expected/multi_mx_add_coordinator.out index bb347c2f6..2c700dc05 100644 --- a/src/test/regress/expected/multi_mx_add_coordinator.out +++ b/src/test/regress/expected/multi_mx_add_coordinator.out @@ -33,6 +33,66 @@ SELECT create_reference_table('ref'); (1 row) +SET citus.log_local_commands TO ON; +SET client_min_messages TO DEBUG; +-- if the placement policy is not round-robin, SELECTs on the reference +-- tables use local execution +SELECT count(*) FROM ref; +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan +DEBUG: Plan is router executable +LOG: executing the command locally: SELECT count(*) AS count FROM mx_add_coordinator.ref_7000000 ref + count +------- + 0 +(1 row) + +SELECT count(*) FROM ref; +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan +DEBUG: Plan is router executable +LOG: executing the command locally: SELECT count(*) AS count FROM mx_add_coordinator.ref_7000000 ref + count +------- + 0 +(1 row) + +-- for round-robin policy, always go to workers +SET citus.task_assignment_policy TO "round-robin"; +SELECT count(*) FROM ref; +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan +DEBUG: Plan is router executable + 
count +------- + 0 +(1 row) + +SELECT count(*) FROM ref; +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan +DEBUG: Plan is router executable + count +------- + 0 +(1 row) + +SELECT count(*) FROM ref; +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan +DEBUG: Plan is router executable + count +------- + 0 +(1 row) + +-- modifications always go through local shard as well as remote ones +INSERT INTO ref VALUES (1); +DEBUG: Creating router plan +DEBUG: Plan is router executable +LOG: executing the command locally: INSERT INTO mx_add_coordinator.ref_7000000 (a) VALUES (1) +-- get it ready for the next executions +TRUNCATE ref; -- test that changes from a metadata node is reflected in the coordinator placement \c - - - :worker_1_port SET search_path TO mx_add_coordinator,public; diff --git a/src/test/regress/sql/multi_mx_add_coordinator.sql b/src/test/regress/sql/multi_mx_add_coordinator.sql index b35949ddb..3ac026ee6 100644 --- a/src/test/regress/sql/multi_mx_add_coordinator.sql +++ b/src/test/regress/sql/multi_mx_add_coordinator.sql @@ -18,6 +18,26 @@ SELECT verify_metadata('localhost', :worker_1_port), CREATE TABLE ref(a int); SELECT create_reference_table('ref'); +SET citus.log_local_commands TO ON; +SET client_min_messages TO DEBUG; + +-- if the placement policy is not round-robin, SELECTs on the reference +-- tables use local execution +SELECT count(*) FROM ref; +SELECT count(*) FROM ref; + +-- for round-robin policy, always go to workers +SET citus.task_assignment_policy TO "round-robin"; +SELECT count(*) FROM ref; +SELECT count(*) FROM ref; +SELECT count(*) FROM ref; + +-- modifications always go through local shard as well as remote ones +INSERT INTO ref VALUES (1); + +-- get it ready for the next executions +TRUNCATE ref; + -- test that changes from a metadata node is reflected in the coordinator placement \c - - - :worker_1_port SET search_path TO mx_add_coordinator,public;