From c9e569bf3dbf0084ee10d62297bf1820a6ff23d0 Mon Sep 17 00:00:00 2001 From: Onder Kalaci Date: Mon, 19 Dec 2016 15:28:19 +0200 Subject: [PATCH] Convert router executor to use new connection API This commit slightly changes router executor to use new connection API for getting the connections. --- .../executor/multi_router_executor.c | 74 ++++++++++++------- 1 file changed, 49 insertions(+), 25 deletions(-) diff --git a/src/backend/distributed/executor/multi_router_executor.c b/src/backend/distributed/executor/multi_router_executor.c index ebb99c26c..ca95c9199 100644 --- a/src/backend/distributed/executor/multi_router_executor.c +++ b/src/backend/distributed/executor/multi_router_executor.c @@ -106,8 +106,9 @@ static bool RequiresConsistentSnapshot(Task *task); static uint64 ReturnRowsFromTuplestore(uint64 tupleCount, TupleDesc tupleDescriptor, DestReceiver *destination, Tuplestorestate *tupleStore); -static PGconn * GetConnectionForPlacement(ShardPlacement *placement, - bool isModificationQuery); +static void ManageShardIdParticipantForTransactionBlock(uint64 shardId, + MultiConnection *connection, + bool isModificationQuery); static void PurgeConnectionForPlacement(PGconn *connection, ShardPlacement *placement); static void RemoveXactConnection(PGconn *connection); static void ExtractParametersFromParamListInfo(ParamListInfo paramListInfo, @@ -765,15 +766,24 @@ ExecuteSingleTask(QueryDesc *queryDesc, Task *task, bool queryOK = false; bool failOnError = false; int64 currentAffectedTupleCount = 0; - PGconn *connection = GetConnectionForPlacement(taskPlacement, - isModificationQuery); + PGconn *connection = GetOrEstablishConnection(taskPlacement->nodeName, + taskPlacement->nodePort); + MultiConnection *multiConnection = NULL; if (connection == NULL) { failedPlacementList = lappend(failedPlacementList, taskPlacement); + continue; } + multiConnection = GetConnectionFromPGconn(connection); + + ManageShardIdParticipantForTransactionBlock(task->anchorShardId, 
multiConnection, + isModificationQuery); + + connection = multiConnection->pgConn; + queryOK = SendQueryInSingleRowMode(connection, queryString, paramListInfo); if (!queryOK) { @@ -1156,34 +1166,50 @@ ReturnRowsFromTuplestore(uint64 tupleCount, TupleDesc tupleDescriptor, /* - * GetConnectionForPlacement is the main entry point for acquiring a connection - * within the router executor. By using placements (rather than node names and - * ports) to identify connections, the router executor can keep track of shards - * used by multi-statement transactions and error out if a transaction tries - * to reach a new node altogether). In the single-statement commands context, - * GetConnectionForPlacement simply falls through to GetOrEstablishConnection. + * ManageShardIdParticipantForTransactionBlock for the given shardId and connection + * does the following: + * + * i) if not in a transaction block, simply do nothing and return + * ii) if the query is not a modification and no modifications have happened + * so far, do nothing + * iii) if the node has already participated in a modification and the current + * query is also a modification query, record the shard id participant + * iv) if the query is attempting to expand the set of affected worker nodes + * within the transaction block, error out */ -static PGconn * -GetConnectionForPlacement(ShardPlacement *placement, bool isModificationQuery) +static void +ManageShardIdParticipantForTransactionBlock(uint64 shardId, MultiConnection *connection, + bool isModificationQuery) { NodeConnectionKey participantKey; NodeConnectionEntry *participantEntry = NULL; bool entryFound = false; - /* if not in a transaction, fall through to connection cache */ - if (xactParticipantHash == NULL) + /* we only care about sessions that are in a transaction */ + if (!IsTransactionBlock()) { - PGconn *connection = GetOrEstablishConnection(placement->nodeName, - placement->nodePort); - - return connection; + return; } - Assert(IsTransactionBlock()); + 
/* a SELECT query executed before any modifications does not need to proceed */ + if (xactParticipantHash == NULL && !isModificationQuery) + { + return; + } + else if (xactParticipantHash == NULL && isModificationQuery) + { + /* + * We should never get here given that modification queries + * within a transaction should have already created the hash. + * But still an error message is better than an assertion. + */ + ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("no transaction participant found"))); + } MemSet(&participantKey, 0, sizeof(participantKey)); - strlcpy(participantKey.nodeName, placement->nodeName, MAX_NODE_LENGTH + 1); - participantKey.nodePort = placement->nodePort; + strlcpy(participantKey.nodeName, connection->hostname, MAX_NODE_LENGTH + 1); + participantKey.nodePort = connection->port; participantEntry = hash_search(xactParticipantHash, &participantKey, HASH_FIND, &entryFound); @@ -1192,16 +1218,14 @@ GetConnectionForPlacement(ShardPlacement *placement, bool isModificationQuery) { if (isModificationQuery) { - RecordShardIdParticipant(placement->shardId, participantEntry); + RecordShardIdParticipant(shardId, participantEntry); } - - return participantEntry->connection->pgConn; } else { ereport(ERROR, (errcode(ERRCODE_CONNECTION_DOES_NOT_EXIST), errmsg("no transaction participant matches %s:%d", - placement->nodeName, placement->nodePort), + connection->hostname, connection->port), errdetail("Transactions which modify distributed tables may only " "target nodes affected by the modification command " "which began the transaction.")));