Do not include coordinator shards when round-robin is selected

When the user picks the "round-robin" task assignment policy, the aim is
to distribute the load across nodes. However, for reference tables with a
placement on the coordinator, local execution kicks in immediately and the
round-robin policy is effectively ignored.

With this change, we exclude the coordinator placement from the candidate
list. Modifying the placement list may look a little invasive, but it seems
acceptable.
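
As a usage sketch of the intended behavior (mirroring the regression test
added below; it assumes the coordinator has been added to the metadata so
that it holds a reference table placement):

-- a reference table has a placement on every node, including the coordinator
CREATE TABLE ref(a int);
SELECT create_reference_table('ref');

-- default policy: local execution on the coordinator answers the query
SELECT count(*) FROM ref;

-- round-robin: the coordinator placement is excluded, so consecutive
-- SELECTs are routed to the worker placements instead of running locally
SET citus.task_assignment_policy TO "round-robin";
SELECT count(*) FROM ref;
SELECT count(*) FROM ref;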

We could have done this in other ways, such as:

1) Add a field such as "Task->roundRobinPlacement", set to the first
element after the round-robin policy is applied. During execution, if that
placement is local to the coordinator, skip it and try the other remote
placements.

2) In TaskAccessesLocalNode()@local_execution.c, check
task_assignment_policy; if round-robin is selected and there is a local
placement on the coordinator, skip it. However, task assignment happens at
planning time while this decision would be made at execution time, which
could create weird edge cases.
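
Note that the exclusion only applies to read-only tasks (the reordering
path now asserts ReadOnlyTask(task->taskType)); modifications keep using
the coordinator placement through local execution, as the regression test
below also covers. A minimal sketch:

SET citus.task_assignment_policy TO "round-robin";

-- writes are unaffected: the local coordinator placement is still updated
-- (logged as "executing the command locally: ...") alongside the remote ones
INSERT INTO ref VALUES (1);
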
Branch: pull/3170/head
Author: Onder Kalaci, 2019-11-08 15:58:36 +01:00 (committed by Hadi Moshayedi)
Parent: f8459f81a8
Commit: 90943a6ce6
5 changed files with 129 additions and 2 deletions


@@ -558,7 +558,6 @@ static bool DistributedExecutionModifiesDatabase(DistributedExecution *execution
static bool TaskListModifiesDatabase(RowModifyLevel modLevel, List *taskList);
static bool DistributedExecutionRequiresRollback(DistributedExecution *execution);
static bool TaskListRequires2PC(List *taskList);
static bool ReadOnlyTask(TaskType taskType);
static bool SelectForUpdateOnReferenceTable(RowModifyLevel modLevel, List *taskList);
static void AssignTasksToConnections(DistributedExecution *execution);
static void UnclaimAllSessionConnections(List *sessionList);
@@ -1167,7 +1166,7 @@ TaskListRequires2PC(List *taskList)
* ReadOnlyTask returns true if the input task does a read-only operation
* on the database.
*/
static bool
bool
ReadOnlyTask(TaskType taskType)
{
if (taskType == ROUTER_TASK || taskType == SQL_TASK)


@@ -30,6 +30,7 @@
#include "distributed/master_metadata_utility.h"
#include "distributed/master_protocol.h"
#include "distributed/metadata_cache.h"
#include "distributed/multi_executor.h"
#include "distributed/multi_join_order.h"
#include "distributed/multi_logical_planner.h"
#include "distributed/multi_logical_optimizer.h"
@@ -167,6 +168,7 @@ static void ReorderTaskPlacementsByTaskAssignmentPolicy(Job *job,
TaskAssignmentPolicyType
taskAssignmentPolicy,
List *placementList);
static List * RemoveCoordinatorPlacement(List *placementList);
/*
@@ -1779,6 +1781,20 @@ ReorderTaskPlacementsByTaskAssignmentPolicy(Job *job,
Assert(list_length(job->taskList) == 1);
task = (Task *) linitial(job->taskList);
/*
* For round-robin SELECT queries, we don't want to include the coordinator
* because the user is trying to distribute the load across nodes via the
* round-robin policy. Otherwise, local execution would prioritize executing
* the local tasks, and especially for reference tables on the coordinator
* this would prevent load balancing across nodes.
*
* For other worker nodes in Citus MX, we let local execution kick in even
* for the round-robin policy, because we expect the clients to connect
* evenly to the worker nodes.
*/
Assert(ReadOnlyTask(task->taskType));
placementList = RemoveCoordinatorPlacement(placementList);
/* reorder the placement list */
reorderedPlacementList = RoundRobinReorder(task, placementList);
task->taskPlacementList = reorderedPlacementList;
@@ -1791,6 +1807,37 @@ ReorderTaskPlacementsByTaskAssignmentPolicy(Job *job,
}
/*
* RemoveCoordinatorPlacement gets a task placement list and returns the list
* with the placement belonging to the coordinator (if any) removed.
*
* If the list has a single entry or no placement on the coordinator, the
* list is returned unmodified.
*/
static List *
RemoveCoordinatorPlacement(List *placementList)
{
ListCell *placementCell = NULL;
if (list_length(placementList) < 2)
{
return placementList;
}
foreach(placementCell, placementList)
{
ShardPlacement *placement = (ShardPlacement *) lfirst(placementCell);
if (placement->groupId == COORDINATOR_GROUP_ID)
{
return list_delete_ptr(placementList, placement);
}
}
return placementList;
}
/*
* SingleShardSelectTaskList generates a task for single shard select query
* and returns it as a list.


@@ -62,6 +62,7 @@ extern void SetLocalMultiShardModifyModeToSequential(void);
extern void SetLocalForceMaxQueryParallelization(void);
extern void SortTupleStore(CitusScanState *scanState);
extern bool DistributedPlanModifiesDatabase(DistributedPlan *plan);
extern bool ReadOnlyTask(TaskType taskType);
extern void ExtractParametersFromParamList(ParamListInfo paramListInfo,
Oid **parameterTypes,
const char ***parameterValues, bool


@@ -33,6 +33,66 @@ SELECT create_reference_table('ref');
(1 row)
SET citus.log_local_commands TO ON;
SET client_min_messages TO DEBUG;
-- if the placement policy is not round-robin, SELECTs on the reference
-- tables use local execution
SELECT count(*) FROM ref;
DEBUG: Distributed planning for a fast-path router query
DEBUG: Creating router plan
DEBUG: Plan is router executable
LOG: executing the command locally: SELECT count(*) AS count FROM mx_add_coordinator.ref_7000000 ref
count
-------
0
(1 row)
SELECT count(*) FROM ref;
DEBUG: Distributed planning for a fast-path router query
DEBUG: Creating router plan
DEBUG: Plan is router executable
LOG: executing the command locally: SELECT count(*) AS count FROM mx_add_coordinator.ref_7000000 ref
count
-------
0
(1 row)
-- for round-robin policy, always go to workers
SET citus.task_assignment_policy TO "round-robin";
SELECT count(*) FROM ref;
DEBUG: Distributed planning for a fast-path router query
DEBUG: Creating router plan
DEBUG: Plan is router executable
count
-------
0
(1 row)
SELECT count(*) FROM ref;
DEBUG: Distributed planning for a fast-path router query
DEBUG: Creating router plan
DEBUG: Plan is router executable
count
-------
0
(1 row)
SELECT count(*) FROM ref;
DEBUG: Distributed planning for a fast-path router query
DEBUG: Creating router plan
DEBUG: Plan is router executable
count
-------
0
(1 row)
-- modifications always go through local shard as well as remote ones
INSERT INTO ref VALUES (1);
DEBUG: Creating router plan
DEBUG: Plan is router executable
LOG: executing the command locally: INSERT INTO mx_add_coordinator.ref_7000000 (a) VALUES (1)
-- get it ready for the next executions
TRUNCATE ref;
-- test that changes from a metadata node are reflected in the coordinator placement
\c - - - :worker_1_port
SET search_path TO mx_add_coordinator,public;


@@ -18,6 +18,26 @@ SELECT verify_metadata('localhost', :worker_1_port),
CREATE TABLE ref(a int);
SELECT create_reference_table('ref');
SET citus.log_local_commands TO ON;
SET client_min_messages TO DEBUG;
-- if the placement policy is not round-robin, SELECTs on the reference
-- tables use local execution
SELECT count(*) FROM ref;
SELECT count(*) FROM ref;
-- for round-robin policy, always go to workers
SET citus.task_assignment_policy TO "round-robin";
SELECT count(*) FROM ref;
SELECT count(*) FROM ref;
SELECT count(*) FROM ref;
-- modifications always go through local shard as well as remote ones
INSERT INTO ref VALUES (1);
-- get it ready for the next executions
TRUNCATE ref;
-- test that changes from a metadata node are reflected in the coordinator placement
\c - - - :worker_1_port
SET search_path TO mx_add_coordinator,public;