Rename FindRouterWorkerList to CreateTaskPlacementListForShardIntervals

pull/3887/head
Marco Slot 2020-06-12 10:15:56 +02:00 committed by Hadi Moshayedi
parent 24feadc230
commit d953f084db
3 changed files with 38 additions and 21 deletions

View File

@ -545,7 +545,8 @@ RegenerateTaskForFasthPathQuery(Job *workerJob)
bool hasLocalRelation = false; bool hasLocalRelation = false;
List *placementList = List *placementList =
FindRouterWorkerList(shardIntervalList, shardsPresent, true, hasLocalRelation); CreateTaskPlacementListForShardIntervals(shardIntervalList, shardsPresent, true,
hasLocalRelation);
uint64 shardId = INVALID_SHARD_ID; uint64 shardId = INVALID_SHARD_ID;
if (shardsPresent) if (shardsPresent)

View File

@ -2138,11 +2138,12 @@ PlanRouterQuery(Query *originalQuery,
bool hasLocalRelation = relationRestrictionContext->hasLocalRelation; bool hasLocalRelation = relationRestrictionContext->hasLocalRelation;
List *workerList = List *taskPlacementList =
FindRouterWorkerList(*prunedShardIntervalListList, shardsPresent, CreateTaskPlacementListForShardIntervals(*prunedShardIntervalListList,
replacePrunedQueryWithDummy, hasLocalRelation); shardsPresent,
replacePrunedQueryWithDummy,
if (workerList == NIL) hasLocalRelation);
if (taskPlacementList == NIL)
{ {
ereport(DEBUG2, (errmsg("Found no worker with all shard placements"))); ereport(DEBUG2, (errmsg("Found no worker with all shard placements")));
@ -2162,28 +2163,43 @@ PlanRouterQuery(Query *originalQuery,
} }
*multiShardModifyQuery = false; *multiShardModifyQuery = false;
*placementList = workerList; *placementList = taskPlacementList;
*anchorShardId = shardId; *anchorShardId = shardId;
return planningError; return planningError;
} }
/*
* CreateTaskPlacementListForShardIntervals returns a list of shard placements
* on which it can access all shards in shardIntervalListList, which contains
* a list of shards for each relation in the query.
*
* If the query contains a local table then hasLocalRelation should be set to
* true. In that case, CreateTaskPlacementListForShardIntervals only returns
* a placement for the local node or an empty list if the shards cannot be
* accessed locally.
*
* If generateDummyPlacement is true and there are no shards that need to be
* accessed to answer the query (shardsPresent is false), then as single
* placement is returned that is either local or follows a round-robin policy.
* A typical example is a router query that only reads an intermediate result.
* This will happen on the coordinator, unless the user wants to balance the
* load by setting the citus.task_assignment_policy.
*/
List * List *
FindRouterWorkerList(List *shardIntervalList, bool shardsPresent, CreateTaskPlacementListForShardIntervals(List *shardIntervalListList, bool shardsPresent,
bool replacePrunedQueryWithDummy, bool hasLocalRelation) bool generateDummyPlacement,
bool hasLocalRelation)
{ {
List *placementList = NIL; List *placementList = NIL;
/*
* Determine the worker that has all shard placements if a shard placement found.
* If no shard placement exists and replacePrunedQueryWithDummy flag is set, we will
* still run the query but the result will be empty. We create a dummy shard
* placement for the first active worker.
*/
if (shardsPresent) if (shardsPresent)
{ {
List *workerList = WorkersContainingAllShards(shardIntervalList); /*
* Determine the workers that have all shard placements, if any.
*/
List *workerList = WorkersContainingAllShards(shardIntervalListList);
if (hasLocalRelation) if (hasLocalRelation)
{ {
@ -2207,7 +2223,7 @@ FindRouterWorkerList(List *shardIntervalList, bool shardsPresent,
placementList = workerList; placementList = workerList;
} }
} }
else if (replacePrunedQueryWithDummy) else if (generateDummyPlacement)
{ {
ShardPlacement *dummyPlacement = CreateDummyPlacement(hasLocalRelation); ShardPlacement *dummyPlacement = CreateDummyPlacement(hasLocalRelation);

View File

@ -45,8 +45,9 @@ extern DeferredErrorMessage * PlanRouterQuery(Query *originalQuery,
Const **partitionValueConst); Const **partitionValueConst);
extern List * RelationShardListForShardIntervalList(List *shardIntervalList, extern List * RelationShardListForShardIntervalList(List *shardIntervalList,
bool *shardsPresent); bool *shardsPresent);
extern List * FindRouterWorkerList(List *shardIntervalList, bool shardsPresent, extern List * CreateTaskPlacementListForShardIntervals(List *shardIntervalList,
bool replacePrunedQueryWithDummy, bool shardsPresent,
bool generateDummyPlacement,
bool hasLocalRelation); bool hasLocalRelation);
extern List * RouterInsertTaskList(Query *query, bool parametersInQueryResolved, extern List * RouterInsertTaskList(Query *query, bool parametersInQueryResolved,
DeferredErrorMessage **planningError); DeferredErrorMessage **planningError);
@ -75,7 +76,6 @@ extern bool IsMultiRowInsert(Query *query);
extern void AddShardIntervalRestrictionToSelect(Query *subqery, extern void AddShardIntervalRestrictionToSelect(Query *subqery,
ShardInterval *shardInterval); ShardInterval *shardInterval);
extern bool UpdateOrDeleteQuery(Query *query); extern bool UpdateOrDeleteQuery(Query *query);
extern List * WorkersContainingAllShards(List *prunedShardIntervalsList);
extern uint64 GetAnchorShardId(List *relationShardList); extern uint64 GetAnchorShardId(List *relationShardList);
extern List * TargetShardIntervalForFastPathQuery(Query *query, extern List * TargetShardIntervalForFastPathQuery(Query *query,