Use GetPlacementListConnection for router SELECTs

pull/1455/head
Marco Slot 2017-06-21 13:25:49 +02:00
parent 63676f5d65
commit 01c9b1f921
5 changed files with 121 additions and 3 deletions

View File

@ -75,6 +75,8 @@ bool EnableDeadlockPrevention = true;
/* functions needed during run phase */
static void ReacquireMetadataLocks(List *taskList);
static void AssignInsertTaskShardId(Query *jobQuery, List *taskList);
static ShardPlacementAccess * CreatePlacementAccess(ShardPlacement *placement,
ShardPlacementAccessType accessType);
static void ExecuteSingleModifyTask(CitusScanState *scanState, Task *task,
bool expectResults);
static void ExecuteSingleSelectTask(CitusScanState *scanState, Task *task);
@ -601,6 +603,7 @@ ExecuteSingleSelectTask(CitusScanState *scanState, Task *task)
List *taskPlacementList = task->taskPlacementList;
ListCell *taskPlacementCell = NULL;
char *queryString = task->queryString;
List *relationShardList = task->relationShardList;
if (XactModificationLevel == XACT_MODIFICATION_MULTI_SHARD)
{
@ -620,8 +623,32 @@ ExecuteSingleSelectTask(CitusScanState *scanState, Task *task)
bool dontFailOnError = false;
int64 currentAffectedTupleCount = 0;
int connectionFlags = SESSION_LIFESPAN;
MultiConnection *connection =
GetPlacementConnection(connectionFlags, taskPlacement, NULL);
List *placementAccessList = NIL;
MultiConnection *connection = NULL;
if (list_length(relationShardList) > 0)
{
placementAccessList = BuildPlacementSelectList(taskPlacement->nodeName,
taskPlacement->nodePort,
relationShardList);
}
else
{
/*
* When the SELECT prunes down to 0 shards, just use the dummy placement.
*
* FIXME: it would be preferable to evaluate the SELECT locally since no
* data from the workers is required.
*/
ShardPlacementAccess *placementAccess =
CreatePlacementAccess(taskPlacement, PLACEMENT_ACCESS_SELECT);
placementAccessList = list_make1(placementAccess);
}
connection = GetPlacementListConnection(connectionFlags, placementAccessList,
NULL);
queryOK = SendQueryInSingleRowMode(connection, queryString, paramListInfo);
if (!queryOK)
@ -641,6 +668,56 @@ ExecuteSingleSelectTask(CitusScanState *scanState, Task *task)
}
/*
* BuildPlacementSelectList builds a list of SELECT placement accesses
* which can be used to call StartPlacementListConnection or
* GetPlacementListConnection.
*/
List *
BuildPlacementSelectList(char *nodeName, int nodePort, List *relationShardList)
{
ListCell *relationShardCell = NULL;
List *placementAccessList = NIL;
foreach(relationShardCell, relationShardList)
{
RelationShard *relationShard = (RelationShard *) lfirst(relationShardCell);
ShardPlacement *placement = NULL;
ShardPlacementAccess *placementAccess = NULL;
placement = FindShardPlacementOnNode(nodeName, nodePort, relationShard->shardId);
if (placement == NULL)
{
ereport(ERROR, (errmsg("no active placement of shard %ld found on node "
"%s:%d",
relationShard->shardId, nodeName, nodePort)));
}
placementAccess = CreatePlacementAccess(placement, PLACEMENT_ACCESS_SELECT);
placementAccessList = lappend(placementAccessList, placementAccess);
}
return placementAccessList;
}
/*
* CreatePlacementAccess returns a new ShardPlacementAccess for the given placement
* and access type.
*/
static ShardPlacementAccess *
CreatePlacementAccess(ShardPlacement *placement, ShardPlacementAccessType accessType)
{
ShardPlacementAccess *placementAccess = NULL;
placementAccess = (ShardPlacementAccess *) palloc0(sizeof(ShardPlacementAccess));
placementAccess->placement = placement;
placementAccess->accessType = accessType;
return placementAccess;
}
/*
* ExecuteSingleModifyTask executes the task on the remote node, retrieves the
* results and stores them, if RETURNING is used, in a tuple store.

View File

@ -356,6 +356,43 @@ LoadShardPlacement(uint64 shardId, uint64 placementId)
}
/*
* FindShardPlacementOnNode returns the shard placement for the given shard
* on the given node, or returns NULL of no placement for the shard exists
* on the node.
*/
ShardPlacement *
FindShardPlacementOnNode(char *nodeName, int nodePort, uint64 shardId)
{
ShardCacheEntry *shardEntry = NULL;
DistTableCacheEntry *tableEntry = NULL;
ShardPlacement *placementArray = NULL;
int numberOfPlacements = 0;
ShardPlacement *placementOnNode = NULL;
int placementIndex = 0;
shardEntry = LookupShardCacheEntry(shardId);
tableEntry = shardEntry->tableEntry;
placementArray = tableEntry->arrayOfPlacementArrays[shardEntry->shardIndex];
numberOfPlacements = tableEntry->arrayOfPlacementArrayLengths[shardEntry->shardIndex];
for (placementIndex = 0; placementIndex < numberOfPlacements; placementIndex++)
{
ShardPlacement *placement = &placementArray[placementIndex];
if (strncmp(nodeName, placement->nodeName, WORKER_LENGTH) == 0 &&
nodePort == placement->nodePort)
{
placementOnNode = CitusMakeNode(ShardPlacement);
CopyShardPlacement(placement, placementOnNode);
break;
}
}
return placementOnNode;
}
/*
* ShardPlacementList returns the list of placements for the given shard from
* the cache.

View File

@ -70,6 +70,8 @@ extern bool IsDistributedTable(Oid relationId);
extern List * DistributedTableList(void);
extern ShardInterval * LoadShardInterval(uint64 shardId);
extern ShardPlacement * LoadShardPlacement(uint64 shardId, uint64 placementId);
extern ShardPlacement * FindShardPlacementOnNode(char *nodeName, int nodePort,
uint64 shardId);
extern DistTableCacheEntry * DistributedTableCacheEntry(Oid distributedRelationId);
extern int GetLocalGroupId(void);
extern List * DistTableOidList(void);

View File

@ -186,7 +186,7 @@ typedef struct Task
char replicationModel; /* only applies to modify tasks */
bool insertSelectQuery;
List *relationShardList; /* only applies INSERT/SELECT tasks */
List *relationShardList;
} Task;

View File

@ -43,5 +43,7 @@ extern TupleTableSlot * RouterMultiModifyExecScan(CustomScanState *node);
extern int64 ExecuteModifyTasksWithoutResults(List *taskList);
extern void ExecuteTasksSequentiallyWithoutResults(List *taskList);
extern List * BuildPlacementSelectList(char *nodeName, int nodePort,
List *relationShardList);
#endif /* MULTI_ROUTER_EXECUTOR_H_ */