diff --git a/src/backend/distributed/executor/multi_router_executor.c b/src/backend/distributed/executor/multi_router_executor.c index ed7e1a2f5..6a4beb1c8 100644 --- a/src/backend/distributed/executor/multi_router_executor.c +++ b/src/backend/distributed/executor/multi_router_executor.c @@ -75,6 +75,8 @@ bool EnableDeadlockPrevention = true; /* functions needed during run phase */ static void ReacquireMetadataLocks(List *taskList); static void AssignInsertTaskShardId(Query *jobQuery, List *taskList); +static ShardPlacementAccess * CreatePlacementAccess(ShardPlacement *placement, + ShardPlacementAccessType accessType); static void ExecuteSingleModifyTask(CitusScanState *scanState, Task *task, bool expectResults); static void ExecuteSingleSelectTask(CitusScanState *scanState, Task *task); @@ -601,6 +603,7 @@ ExecuteSingleSelectTask(CitusScanState *scanState, Task *task) List *taskPlacementList = task->taskPlacementList; ListCell *taskPlacementCell = NULL; char *queryString = task->queryString; + List *relationShardList = task->relationShardList; if (XactModificationLevel == XACT_MODIFICATION_MULTI_SHARD) { @@ -620,8 +623,32 @@ ExecuteSingleSelectTask(CitusScanState *scanState, Task *task) bool dontFailOnError = false; int64 currentAffectedTupleCount = 0; int connectionFlags = SESSION_LIFESPAN; - MultiConnection *connection = - GetPlacementConnection(connectionFlags, taskPlacement, NULL); + List *placementAccessList = NIL; + MultiConnection *connection = NULL; + + if (list_length(relationShardList) > 0) + { + placementAccessList = BuildPlacementSelectList(taskPlacement->nodeName, + taskPlacement->nodePort, + relationShardList); + } + else + { + /* + * When the SELECT prunes down to 0 shards, just use the dummy placement. + * + * FIXME: it would be preferable to evaluate the SELECT locally since no + * data from the workers is required. + */ + + ShardPlacementAccess *placementAccess = + CreatePlacementAccess(taskPlacement, PLACEMENT_ACCESS_SELECT); + + placementAccessList = list_make1(placementAccess); + } + + connection = GetPlacementListConnection(connectionFlags, placementAccessList, + NULL); queryOK = SendQueryInSingleRowMode(connection, queryString, paramListInfo); if (!queryOK) @@ -641,6 +668,56 @@ ExecuteSingleSelectTask(CitusScanState *scanState, Task *task) } +/* + * BuildPlacementSelectList builds a list of SELECT placement accesses + * which can be used to call StartPlacementListConnection or + * GetPlacementListConnection. + */ +List * +BuildPlacementSelectList(char *nodeName, int nodePort, List *relationShardList) +{ + ListCell *relationShardCell = NULL; + List *placementAccessList = NIL; + + foreach(relationShardCell, relationShardList) + { + RelationShard *relationShard = (RelationShard *) lfirst(relationShardCell); + ShardPlacement *placement = NULL; + ShardPlacementAccess *placementAccess = NULL; + + placement = FindShardPlacementOnNode(nodeName, nodePort, relationShard->shardId); + if (placement == NULL) + { + ereport(ERROR, (errmsg("no active placement of shard %ld found on node " + "%s:%d", + relationShard->shardId, nodeName, nodePort))); + } + + placementAccess = CreatePlacementAccess(placement, PLACEMENT_ACCESS_SELECT); + placementAccessList = lappend(placementAccessList, placementAccess); + } + + return placementAccessList; +} + + +/* + * CreatePlacementAccess returns a new ShardPlacementAccess for the given placement + * and access type. + */ +static ShardPlacementAccess * +CreatePlacementAccess(ShardPlacement *placement, ShardPlacementAccessType accessType) +{ + ShardPlacementAccess *placementAccess = NULL; + + placementAccess = (ShardPlacementAccess *) palloc0(sizeof(ShardPlacementAccess)); + placementAccess->placement = placement; + placementAccess->accessType = accessType; + + return placementAccess; +} + + /* * ExecuteSingleModifyTask executes the task on the remote node, retrieves the * results and stores them, if RETURNING is used, in a tuple store. diff --git a/src/backend/distributed/utils/metadata_cache.c b/src/backend/distributed/utils/metadata_cache.c index 9dd62087e..a9428568c 100644 --- a/src/backend/distributed/utils/metadata_cache.c +++ b/src/backend/distributed/utils/metadata_cache.c @@ -356,6 +356,43 @@ LoadShardPlacement(uint64 shardId, uint64 placementId) } +/* + * FindShardPlacementOnNode returns the shard placement for the given shard + * on the given node, or returns NULL of no placement for the shard exists + * on the node. + */ +ShardPlacement * +FindShardPlacementOnNode(char *nodeName, int nodePort, uint64 shardId) +{ + ShardCacheEntry *shardEntry = NULL; + DistTableCacheEntry *tableEntry = NULL; + ShardPlacement *placementArray = NULL; + int numberOfPlacements = 0; + ShardPlacement *placementOnNode = NULL; + int placementIndex = 0; + + shardEntry = LookupShardCacheEntry(shardId); + tableEntry = shardEntry->tableEntry; + placementArray = tableEntry->arrayOfPlacementArrays[shardEntry->shardIndex]; + numberOfPlacements = tableEntry->arrayOfPlacementArrayLengths[shardEntry->shardIndex]; + + for (placementIndex = 0; placementIndex < numberOfPlacements; placementIndex++) + { + ShardPlacement *placement = &placementArray[placementIndex]; + + if (strncmp(nodeName, placement->nodeName, WORKER_LENGTH) == 0 && + nodePort == placement->nodePort) + { + placementOnNode = CitusMakeNode(ShardPlacement); + CopyShardPlacement(placement, placementOnNode); + break; + } + } + + return placementOnNode; +} + + /* * ShardPlacementList returns the list of placements for the given shard from * the cache. diff --git a/src/include/distributed/metadata_cache.h b/src/include/distributed/metadata_cache.h index 48b03672e..ce3ac71f7 100644 --- a/src/include/distributed/metadata_cache.h +++ b/src/include/distributed/metadata_cache.h @@ -70,6 +70,8 @@ extern bool IsDistributedTable(Oid relationId); extern List * DistributedTableList(void); extern ShardInterval * LoadShardInterval(uint64 shardId); extern ShardPlacement * LoadShardPlacement(uint64 shardId, uint64 placementId); +extern ShardPlacement * FindShardPlacementOnNode(char *nodeName, int nodePort, + uint64 shardId); extern DistTableCacheEntry * DistributedTableCacheEntry(Oid distributedRelationId); extern int GetLocalGroupId(void); extern List * DistTableOidList(void); diff --git a/src/include/distributed/multi_physical_planner.h b/src/include/distributed/multi_physical_planner.h index 4593da4da..1f43851f3 100644 --- a/src/include/distributed/multi_physical_planner.h +++ b/src/include/distributed/multi_physical_planner.h @@ -186,7 +186,7 @@ typedef struct Task char replicationModel; /* only applies to modify tasks */ bool insertSelectQuery; - List *relationShardList; /* only applies INSERT/SELECT tasks */ + List *relationShardList; } Task; diff --git a/src/include/distributed/multi_router_executor.h b/src/include/distributed/multi_router_executor.h index b5db9ddf9..5aae0190d 100644 --- a/src/include/distributed/multi_router_executor.h +++ b/src/include/distributed/multi_router_executor.h @@ -43,5 +43,7 @@ extern TupleTableSlot * RouterMultiModifyExecScan(CustomScanState *node); extern int64 ExecuteModifyTasksWithoutResults(List *taskList); extern void ExecuteTasksSequentiallyWithoutResults(List *taskList); +extern List * BuildPlacementSelectList(char *nodeName, int nodePort, + List *relationShardList); #endif /* MULTI_ROUTER_EXECUTOR_H_ */