mirror of https://github.com/citusdata/citus.git
Convert router executor to use new connection API
This commit slightly changes router executor to use new connection API for getting the connections.pull/1053/head
parent
4ea4bfbf45
commit
c9e569bf3d
|
@ -106,8 +106,9 @@ static bool RequiresConsistentSnapshot(Task *task);
|
|||
static uint64 ReturnRowsFromTuplestore(uint64 tupleCount, TupleDesc tupleDescriptor,
|
||||
DestReceiver *destination,
|
||||
Tuplestorestate *tupleStore);
|
||||
static PGconn * GetConnectionForPlacement(ShardPlacement *placement,
|
||||
bool isModificationQuery);
|
||||
static void ManageShardIdParticipantForTransactionBlock(uint64 shardId,
|
||||
MultiConnection *connection,
|
||||
bool isModificationQuery);
|
||||
static void PurgeConnectionForPlacement(PGconn *connection, ShardPlacement *placement);
|
||||
static void RemoveXactConnection(PGconn *connection);
|
||||
static void ExtractParametersFromParamListInfo(ParamListInfo paramListInfo,
|
||||
|
@ -765,15 +766,24 @@ ExecuteSingleTask(QueryDesc *queryDesc, Task *task,
|
|||
bool queryOK = false;
|
||||
bool failOnError = false;
|
||||
int64 currentAffectedTupleCount = 0;
|
||||
PGconn *connection = GetConnectionForPlacement(taskPlacement,
|
||||
isModificationQuery);
|
||||
PGconn *connection = GetOrEstablishConnection(taskPlacement->nodeName,
|
||||
taskPlacement->nodePort);
|
||||
MultiConnection *multiConnection = NULL;
|
||||
|
||||
if (connection == NULL)
|
||||
{
|
||||
failedPlacementList = lappend(failedPlacementList, taskPlacement);
|
||||
|
||||
continue;
|
||||
}
|
||||
|
||||
multiConnection = GetConnectionFromPGconn(connection);
|
||||
|
||||
ManageShardIdParticipantForTransactionBlock(task->anchorShardId, multiConnection,
|
||||
isModificationQuery);
|
||||
|
||||
connection = multiConnection->pgConn;
|
||||
|
||||
queryOK = SendQueryInSingleRowMode(connection, queryString, paramListInfo);
|
||||
if (!queryOK)
|
||||
{
|
||||
|
@ -1156,34 +1166,50 @@ ReturnRowsFromTuplestore(uint64 tupleCount, TupleDesc tupleDescriptor,
|
|||
|
||||
|
||||
/*
|
||||
* GetConnectionForPlacement is the main entry point for acquiring a connection
|
||||
* within the router executor. By using placements (rather than node names and
|
||||
* ports) to identify connections, the router executor can keep track of shards
|
||||
* used by multi-statement transactions and error out if a transaction tries
|
||||
* to reach a new node altogether). In the single-statement commands context,
|
||||
* GetConnectionForPlacement simply falls through to GetOrEstablishConnection.
|
||||
* ManageShardIdParticipantForTransactionBlock for the given shardId and connection
|
||||
* does the following:
|
||||
*
|
||||
* i) if not in a transaction block, simply do nothing and return
|
||||
* ii) if the query is not modification and no modifications has happened
|
||||
* so far, do nothing
|
||||
* iii) if the node has already participated in a modification and the current
|
||||
* query is also a modification query, record the shard id participant
|
||||
* iv) if the query is attempting to expand the set of affected worker nodes
|
||||
* within the transaction block, error out
|
||||
*/
|
||||
static PGconn *
|
||||
GetConnectionForPlacement(ShardPlacement *placement, bool isModificationQuery)
|
||||
static void
|
||||
ManageShardIdParticipantForTransactionBlock(uint64 shardId, MultiConnection *connection,
|
||||
bool isModificationQuery)
|
||||
{
|
||||
NodeConnectionKey participantKey;
|
||||
NodeConnectionEntry *participantEntry = NULL;
|
||||
bool entryFound = false;
|
||||
|
||||
/* if not in a transaction, fall through to connection cache */
|
||||
if (xactParticipantHash == NULL)
|
||||
/* we only care about sessions that are in a transaction */
|
||||
if (!IsTransactionBlock())
|
||||
{
|
||||
PGconn *connection = GetOrEstablishConnection(placement->nodeName,
|
||||
placement->nodePort);
|
||||
|
||||
return connection;
|
||||
return;
|
||||
}
|
||||
|
||||
Assert(IsTransactionBlock());
|
||||
/* if a SELECT query executed before any modifications do not need to proceed */
|
||||
if (xactParticipantHash == NULL && !isModificationQuery)
|
||||
{
|
||||
return;
|
||||
}
|
||||
else if (xactParticipantHash == NULL && isModificationQuery)
|
||||
{
|
||||
/*
|
||||
* We should never get here given that modification queries
|
||||
* within a transaction should have already created the hash.
|
||||
* But still an error message is better than an assertion.
|
||||
*/
|
||||
ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
|
||||
errmsg("no transaction participant found")));
|
||||
}
|
||||
|
||||
MemSet(&participantKey, 0, sizeof(participantKey));
|
||||
strlcpy(participantKey.nodeName, placement->nodeName, MAX_NODE_LENGTH + 1);
|
||||
participantKey.nodePort = placement->nodePort;
|
||||
strlcpy(participantKey.nodeName, connection->hostname, MAX_NODE_LENGTH + 1);
|
||||
participantKey.nodePort = connection->port;
|
||||
|
||||
participantEntry = hash_search(xactParticipantHash, &participantKey, HASH_FIND,
|
||||
&entryFound);
|
||||
|
@ -1192,16 +1218,14 @@ GetConnectionForPlacement(ShardPlacement *placement, bool isModificationQuery)
|
|||
{
|
||||
if (isModificationQuery)
|
||||
{
|
||||
RecordShardIdParticipant(placement->shardId, participantEntry);
|
||||
RecordShardIdParticipant(shardId, participantEntry);
|
||||
}
|
||||
|
||||
return participantEntry->connection->pgConn;
|
||||
}
|
||||
else
|
||||
{
|
||||
ereport(ERROR, (errcode(ERRCODE_CONNECTION_DOES_NOT_EXIST),
|
||||
errmsg("no transaction participant matches %s:%d",
|
||||
placement->nodeName, placement->nodePort),
|
||||
connection->hostname, connection->port),
|
||||
errdetail("Transactions which modify distributed tables may only "
|
||||
"target nodes affected by the modification command "
|
||||
"which began the transaction.")));
|
||||
|
|
Loading…
Reference in New Issue