From d953f084dbc3f18832182b0934677d545b46c271 Mon Sep 17 00:00:00 2001 From: Marco Slot Date: Fri, 12 Jun 2020 10:15:56 +0200 Subject: [PATCH] Rename FindRouterWorkerList to CreateTaskPlacementListForShardIntervals --- .../distributed/executor/citus_custom_scan.c | 3 +- .../planner/multi_router_planner.c | 48 ++++++++++++------- .../distributed/multi_router_planner.h | 8 ++-- 3 files changed, 38 insertions(+), 21 deletions(-) diff --git a/src/backend/distributed/executor/citus_custom_scan.c b/src/backend/distributed/executor/citus_custom_scan.c index ba4c4836a..537f1a6bd 100644 --- a/src/backend/distributed/executor/citus_custom_scan.c +++ b/src/backend/distributed/executor/citus_custom_scan.c @@ -545,7 +545,8 @@ RegenerateTaskForFasthPathQuery(Job *workerJob) bool hasLocalRelation = false; List *placementList = - FindRouterWorkerList(shardIntervalList, shardsPresent, true, hasLocalRelation); + CreateTaskPlacementListForShardIntervals(shardIntervalList, shardsPresent, true, + hasLocalRelation); uint64 shardId = INVALID_SHARD_ID; if (shardsPresent) diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c index c1ef5df8e..636515b37 100644 --- a/src/backend/distributed/planner/multi_router_planner.c +++ b/src/backend/distributed/planner/multi_router_planner.c @@ -2138,11 +2138,12 @@ PlanRouterQuery(Query *originalQuery, bool hasLocalRelation = relationRestrictionContext->hasLocalRelation; - List *workerList = - FindRouterWorkerList(*prunedShardIntervalListList, shardsPresent, - replacePrunedQueryWithDummy, hasLocalRelation); - - if (workerList == NIL) + List *taskPlacementList = + CreateTaskPlacementListForShardIntervals(*prunedShardIntervalListList, + shardsPresent, + replacePrunedQueryWithDummy, + hasLocalRelation); + if (taskPlacementList == NIL) { ereport(DEBUG2, (errmsg("Found no worker with all shard placements"))); @@ -2162,28 +2163,43 @@ PlanRouterQuery(Query *originalQuery, } *multiShardModifyQuery = false; - *placementList = workerList; + *placementList = taskPlacementList; *anchorShardId = shardId; return planningError; } +/* + * CreateTaskPlacementListForShardIntervals returns a list of shard placements + * on which it can access all shards in shardIntervalListList, which contains + * a list of shards for each relation in the query. + * + * If the query contains a local table then hasLocalRelation should be set to + * true. In that case, CreateTaskPlacementListForShardIntervals only returns + * a placement for the local node or an empty list if the shards cannot be + * accessed locally. + * + * If generateDummyPlacement is true and there are no shards that need to be + * accessed to answer the query (shardsPresent is false), then as single + * placement is returned that is either local or follows a round-robin policy. + * A typical example is a router query that only reads an intermediate result. + * This will happen on the coordinator, unless the user wants to balance the + * load by setting the citus.task_assignment_policy. + */ List * -FindRouterWorkerList(List *shardIntervalList, bool shardsPresent, - bool replacePrunedQueryWithDummy, bool hasLocalRelation) +CreateTaskPlacementListForShardIntervals(List *shardIntervalListList, bool shardsPresent, + bool generateDummyPlacement, + bool hasLocalRelation) { List *placementList = NIL; - /* - * Determine the worker that has all shard placements if a shard placement found. - * If no shard placement exists and replacePrunedQueryWithDummy flag is set, we will - * still run the query but the result will be empty. We create a dummy shard - * placement for the first active worker. - */ if (shardsPresent) { - List *workerList = WorkersContainingAllShards(shardIntervalList); + /* + * Determine the workers that have all shard placements, if any. + */ + List *workerList = WorkersContainingAllShards(shardIntervalListList); if (hasLocalRelation) { @@ -2207,7 +2223,7 @@ FindRouterWorkerList(List *shardIntervalList, bool shardsPresent, placementList = workerList; } } - else if (replacePrunedQueryWithDummy) + else if (generateDummyPlacement) { ShardPlacement *dummyPlacement = CreateDummyPlacement(hasLocalRelation); diff --git a/src/include/distributed/multi_router_planner.h b/src/include/distributed/multi_router_planner.h index 9dc5d9978..50fdd7d90 100644 --- a/src/include/distributed/multi_router_planner.h +++ b/src/include/distributed/multi_router_planner.h @@ -45,9 +45,10 @@ extern DeferredErrorMessage * PlanRouterQuery(Query *originalQuery, Const **partitionValueConst); extern List * RelationShardListForShardIntervalList(List *shardIntervalList, bool *shardsPresent); -extern List * FindRouterWorkerList(List *shardIntervalList, bool shardsPresent, - bool replacePrunedQueryWithDummy, - bool hasLocalRelation); +extern List * CreateTaskPlacementListForShardIntervals(List *shardIntervalList, + bool shardsPresent, + bool generateDummyPlacement, + bool hasLocalRelation); extern List * RouterInsertTaskList(Query *query, bool parametersInQueryResolved, DeferredErrorMessage **planningError); extern Const * ExtractInsertPartitionKeyValue(Query *query); @@ -75,7 +76,6 @@ extern bool IsMultiRowInsert(Query *query); extern void AddShardIntervalRestrictionToSelect(Query *subqery, ShardInterval *shardInterval); extern bool UpdateOrDeleteQuery(Query *query); -extern List * WorkersContainingAllShards(List *prunedShardIntervalsList); extern uint64 GetAnchorShardId(List *relationShardList); extern List * TargetShardIntervalForFastPathQuery(Query *query,