Merge pull request #3170 from citusdata/fix_round_robin

Do not include coordinator shards when round-robin is selected
pull/3188/head
Hadi Moshayedi 2019-11-15 06:11:41 -08:00 committed by GitHub
commit 4230b96247
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 129 additions and 2 deletions

View File

@ -558,7 +558,6 @@ static bool DistributedExecutionModifiesDatabase(DistributedExecution *execution
static bool TaskListModifiesDatabase(RowModifyLevel modLevel, List *taskList);
static bool DistributedExecutionRequiresRollback(DistributedExecution *execution);
static bool TaskListRequires2PC(List *taskList);
static bool ReadOnlyTask(TaskType taskType);
static bool SelectForUpdateOnReferenceTable(RowModifyLevel modLevel, List *taskList);
static void AssignTasksToConnections(DistributedExecution *execution);
static void UnclaimAllSessionConnections(List *sessionList);
@ -1167,7 +1166,7 @@ TaskListRequires2PC(List *taskList)
* ReadOnlyTask returns true if the input task does a read-only operation
* on the database.
*/
static bool
bool
ReadOnlyTask(TaskType taskType)
{
if (taskType == ROUTER_TASK || taskType == SQL_TASK)

View File

@ -30,6 +30,7 @@
#include "distributed/master_metadata_utility.h"
#include "distributed/master_protocol.h"
#include "distributed/metadata_cache.h"
#include "distributed/multi_executor.h"
#include "distributed/multi_join_order.h"
#include "distributed/multi_logical_planner.h"
#include "distributed/multi_logical_optimizer.h"
@ -167,6 +168,7 @@ static void ReorderTaskPlacementsByTaskAssignmentPolicy(Job *job,
TaskAssignmentPolicyType
taskAssignmentPolicy,
List *placementList);
static List * RemoveCoordinatorPlacement(List *placementList);
/*
@ -1779,6 +1781,20 @@ ReorderTaskPlacementsByTaskAssignmentPolicy(Job *job,
Assert(list_length(job->taskList) == 1);
task = (Task *) linitial(job->taskList);
/*
* For round-robin SELECT queries, we don't want to include the coordinator
* because the user is trying to distributed the load across nodes via
* round-robin policy. Otherwise, the local execution would prioritize
* executing the local tasks and especially for reference tables on the
* coordinator this would prevent load balancing accross nodes.
*
* For other worker nodes in Citus MX, we let the local execution to kick-in
* even for round-robin policy, that's because we expect the clients to evenly
* connect to the worker nodes.
*/
Assert(ReadOnlyTask(task->taskType));
placementList = RemoveCoordinatorPlacement(placementList);
/* reorder the placement list */
reorderedPlacementList = RoundRobinReorder(task, placementList);
task->taskPlacementList = reorderedPlacementList;
@ -1791,6 +1807,37 @@ ReorderTaskPlacementsByTaskAssignmentPolicy(Job *job,
}
/*
* RemoveCoordinatorPlacement gets a task placement list and returns the list
* by removing the placement belonging to the coordinator (if any).
*
* If the list has a single entry or no placements on the coordinator, the list
* is return unmodified.
*/
static List *
RemoveCoordinatorPlacement(List *placementList)
{
ListCell *placementCell = NULL;
if (list_length(placementList) < 2)
{
return placementList;
}
foreach(placementCell, placementList)
{
ShardPlacement *placement = (ShardPlacement *) lfirst(placementCell);
if (placement->groupId == COORDINATOR_GROUP_ID)
{
return list_delete_ptr(placementList, placement);
}
}
return placementList;
}
/*
* SingleShardSelectTaskList generates a task for single shard select query
* and returns it as a list.

View File

@ -62,6 +62,7 @@ extern void SetLocalMultiShardModifyModeToSequential(void);
extern void SetLocalForceMaxQueryParallelization(void);
extern void SortTupleStore(CitusScanState *scanState);
extern bool DistributedPlanModifiesDatabase(DistributedPlan *plan);
extern bool ReadOnlyTask(TaskType taskType);
extern void ExtractParametersFromParamList(ParamListInfo paramListInfo,
Oid **parameterTypes,
const char ***parameterValues, bool

View File

@ -33,6 +33,66 @@ SELECT create_reference_table('ref');
(1 row)
SET citus.log_local_commands TO ON;
SET client_min_messages TO DEBUG;
-- if the placement policy is not round-robin, SELECTs on the reference
-- tables use local execution
SELECT count(*) FROM ref;
DEBUG: Distributed planning for a fast-path router query
DEBUG: Creating router plan
DEBUG: Plan is router executable
LOG: executing the command locally: SELECT count(*) AS count FROM mx_add_coordinator.ref_7000000 ref
count
-------
0
(1 row)
SELECT count(*) FROM ref;
DEBUG: Distributed planning for a fast-path router query
DEBUG: Creating router plan
DEBUG: Plan is router executable
LOG: executing the command locally: SELECT count(*) AS count FROM mx_add_coordinator.ref_7000000 ref
count
-------
0
(1 row)
-- for round-robin policy, always go to workers
SET citus.task_assignment_policy TO "round-robin";
SELECT count(*) FROM ref;
DEBUG: Distributed planning for a fast-path router query
DEBUG: Creating router plan
DEBUG: Plan is router executable
count
-------
0
(1 row)
SELECT count(*) FROM ref;
DEBUG: Distributed planning for a fast-path router query
DEBUG: Creating router plan
DEBUG: Plan is router executable
count
-------
0
(1 row)
SELECT count(*) FROM ref;
DEBUG: Distributed planning for a fast-path router query
DEBUG: Creating router plan
DEBUG: Plan is router executable
count
-------
0
(1 row)
-- modifications always go through local shard as well as remote ones
INSERT INTO ref VALUES (1);
DEBUG: Creating router plan
DEBUG: Plan is router executable
LOG: executing the command locally: INSERT INTO mx_add_coordinator.ref_7000000 (a) VALUES (1)
-- get it ready for the next executions
TRUNCATE ref;
-- test that changes from a metadata node is reflected in the coordinator placement
\c - - - :worker_1_port
SET search_path TO mx_add_coordinator,public;

View File

@ -18,6 +18,26 @@ SELECT verify_metadata('localhost', :worker_1_port),
CREATE TABLE ref(a int);
SELECT create_reference_table('ref');
SET citus.log_local_commands TO ON;
SET client_min_messages TO DEBUG;
-- if the placement policy is not round-robin, SELECTs on the reference
-- tables use local execution
SELECT count(*) FROM ref;
SELECT count(*) FROM ref;
-- for round-robin policy, always go to workers
SET citus.task_assignment_policy TO "round-robin";
SELECT count(*) FROM ref;
SELECT count(*) FROM ref;
SELECT count(*) FROM ref;
-- modifications always go through local shard as well as remote ones
INSERT INTO ref VALUES (1);
-- get it ready for the next executions
TRUNCATE ref;
-- test that changes from a metadata node is reflected in the coordinator placement
\c - - - :worker_1_port
SET search_path TO mx_add_coordinator,public;