diff --git a/src/backend/distributed/executor/multi_router_executor.c b/src/backend/distributed/executor/multi_router_executor.c index 39a3bdfca..d993b660e 100644 --- a/src/backend/distributed/executor/multi_router_executor.c +++ b/src/backend/distributed/executor/multi_router_executor.c @@ -950,7 +950,7 @@ ExecuteModifyTasks(List *taskList, bool expectResults, ParamListInfo paramListIn bool shardConnectionsFound = false; ShardConnections *shardConnections = NULL; List *connectionList = NIL; - TransactionConnection *transactionConnection = NULL; + MultiConnection *multiConnection = NULL; PGconn *connection = NULL; bool queryOK = false; @@ -963,9 +963,9 @@ ExecuteModifyTasks(List *taskList, bool expectResults, ParamListInfo paramListIn continue; } - transactionConnection = - (TransactionConnection *) list_nth(connectionList, placementIndex); - connection = transactionConnection->connection; + multiConnection = + (MultiConnection *) list_nth(connectionList, placementIndex); + connection = multiConnection->pgConn; queryOK = SendQueryInSingleRowMode(connection, queryString, paramListInfo); if (!queryOK) @@ -982,7 +982,7 @@ ExecuteModifyTasks(List *taskList, bool expectResults, ParamListInfo paramListIn bool shardConnectionsFound = false; ShardConnections *shardConnections = NULL; List *connectionList = NIL; - TransactionConnection *transactionConnection = NULL; + MultiConnection *multiConnection = NULL; PGconn *connection = NULL; int64 currentAffectedTupleCount = 0; bool failOnError = true; @@ -1001,9 +1001,9 @@ ExecuteModifyTasks(List *taskList, bool expectResults, ParamListInfo paramListIn continue; } - transactionConnection = - (TransactionConnection *) list_nth(connectionList, placementIndex); - connection = transactionConnection->connection; + multiConnection = + (MultiConnection *) list_nth(connectionList, placementIndex); + connection = multiConnection->pgConn; /* * If caller is interested, store query results the first time diff --git 
a/src/backend/distributed/transaction/multi_shard_transaction.c b/src/backend/distributed/transaction/multi_shard_transaction.c index f5ebd1351..e743d6405 100644 --- a/src/backend/distributed/transaction/multi_shard_transaction.c +++ b/src/backend/distributed/transaction/multi_shard_transaction.c @@ -43,18 +43,95 @@ void OpenTransactionsToAllShardPlacements(List *shardIntervalList, char *userName) { ListCell *shardIntervalCell = NULL; + List *newConnectionList = NIL; + ListCell *connectionCell = NULL; if (shardConnectionHash == NULL) { shardConnectionHash = CreateShardConnectionHash(TopTransactionContext); } + BeginOrContinueCoordinatedTransaction(); + if (MultiShardCommitProtocol == COMMIT_PROTOCOL_2PC) + { + CoordinatedTransactionUse2PC(); + } + + /* open connections to shards which don't have connections yet */ foreach(shardIntervalCell, shardIntervalList) { ShardInterval *shardInterval = (ShardInterval *) lfirst(shardIntervalCell); uint64 shardId = shardInterval->shardId; + ShardConnections *shardConnections = NULL; + bool shardConnectionsFound = false; + List *shardPlacementList = NIL; + ListCell *placementCell = NULL; - BeginTransactionOnShardPlacements(shardId, userName); + shardConnections = GetShardConnections(shardId, &shardConnectionsFound); + if (shardConnectionsFound) + { + continue; + } + + shardPlacementList = FinalizedShardPlacementList(shardId); + if (shardPlacementList == NIL) + { + /* going to have to have some placements to do any work */ + ereport(ERROR, (errmsg("could not find any shard placements for the shard " + UINT64_FORMAT, shardId))); + } + + foreach(placementCell, shardPlacementList) + { + ShardPlacement *shardPlacement = (ShardPlacement *) lfirst(placementCell); + MultiConnection *connection = NULL; + MemoryContext oldContext = NULL; + + WorkerNode *workerNode = FindWorkerNode(shardPlacement->nodeName, + shardPlacement->nodePort); + if (workerNode == NULL) + { + ereport(ERROR, (errmsg("could not find worker node %s:%d", + 
shardPlacement->nodeName, + shardPlacement->nodePort))); + } + + connection = StartNodeUserDatabaseConnection(FORCE_NEW_CONNECTION, + shardPlacement->nodeName, + shardPlacement->nodePort, + userName, + NULL); + + /* we need to preserve the connection list for the next statement */ + oldContext = MemoryContextSwitchTo(TopTransactionContext); + + shardConnections->connectionList = lappend(shardConnections->connectionList, + connection); + + MemoryContextSwitchTo(oldContext); + + newConnectionList = lappend(newConnectionList, connection); + + /* + * Every individual failure should cause entire distributed + * transaction to fail. + */ + MarkRemoteTransactionCritical(connection); + } + } + + /* finish connection establishment for newly opened connections */ + foreach(connectionCell, newConnectionList) + { + MultiConnection *connection = (MultiConnection *) lfirst(connectionCell); + + FinishConnectionEstablishment(connection); + } + + /* the special BARE mode (for e.g. VACUUM/ANALYZE) skips BEGIN */ + if (MultiShardCommitProtocol > COMMIT_PROTOCOL_BARE) + { + RemoteTransactionsBeginIfNecessary(newConnectionList); } } @@ -85,107 +162,6 @@ CreateShardConnectionHash(MemoryContext memoryContext) } -/* - * BeginTransactionOnShardPlacements opens new connections (if necessary) to - * all placements of a shard (specified by shard identifier). After sending a - * BEGIN command on all connections, they are added to shardConnectionHash for - * use within this transaction. Exits early if connections already exist for - * the specified shard, and errors if no placements can be found, a connection - * cannot be made, or if the BEGIN command fails. 
- */ -void -BeginTransactionOnShardPlacements(uint64 shardId, char *userName) -{ - List *shardPlacementList = NIL; - ListCell *placementCell = NULL; - - ShardConnections *shardConnections = NULL; - bool shardConnectionsFound = false; - - MemoryContext oldContext = NULL; - shardPlacementList = FinalizedShardPlacementList(shardId); - - if (shardPlacementList == NIL) - { - /* going to have to have some placements to do any work */ - ereport(ERROR, (errmsg("could not find any shard placements for the shard " - UINT64_FORMAT, shardId))); - } - - BeginOrContinueCoordinatedTransaction(); - if (MultiShardCommitProtocol == COMMIT_PROTOCOL_2PC) - { - CoordinatedTransactionUse2PC(); - } - - /* get existing connections to the shard placements, if any */ - shardConnections = GetShardConnections(shardId, &shardConnectionsFound); - if (shardConnectionsFound) - { - /* exit early if we've already established shard transactions */ - return; - } - - foreach(placementCell, shardPlacementList) - { - ShardPlacement *shardPlacement = (ShardPlacement *) lfirst(placementCell); - MultiConnection *connection = NULL; - TransactionConnection *transactionConnection = NULL; - WorkerNode *workerNode = FindWorkerNode(shardPlacement->nodeName, - shardPlacement->nodePort); - int connectionFlags = FORCE_NEW_CONNECTION; - - if (workerNode == NULL) - { - ereport(ERROR, (errmsg("could not find worker node %s:%d", - shardPlacement->nodeName, shardPlacement->nodePort))); - } - - /* XXX: It'd be nicer to establish connections asynchronously here */ - connection = GetNodeUserDatabaseConnection(connectionFlags, - shardPlacement->nodeName, - shardPlacement->nodePort, - userName, - NULL); - if (PQstatus(connection->pgConn) != CONNECTION_OK) - { - ereport(ERROR, (errmsg("could not establish a connection to all " - "placements of shard %lu", shardId))); - } - - /* entries must last through the whole top-level transaction */ - oldContext = MemoryContextSwitchTo(TopTransactionContext); - - transactionConnection = 
palloc0(sizeof(TransactionConnection)); - - transactionConnection->groupId = workerNode->groupId; - transactionConnection->connectionId = shardConnections->shardId; - transactionConnection->transactionState = TRANSACTION_STATE_OPEN; - transactionConnection->connection = connection->pgConn; - transactionConnection->nodeName = shardPlacement->nodeName; - transactionConnection->nodePort = shardPlacement->nodePort; - - shardConnections->connectionList = lappend(shardConnections->connectionList, - transactionConnection); - - MemoryContextSwitchTo(oldContext); - - /* - * Every individual failure should cause entire distributed - * transaction to fail. - */ - MarkRemoteTransactionCritical(connection); - - /* the special BARE mode (for e.g. VACUUM/ANALYZE) skips BEGIN */ - if (MultiShardCommitProtocol > COMMIT_PROTOCOL_BARE) - { - /* issue BEGIN */ - RemoteTransactionBegin(connection); - } - } -} - - /* * GetShardConnections finds existing connections for a shard in the global * connection hash. If not found, then a ShardConnections structure with empty diff --git a/src/include/distributed/multi_shard_transaction.h b/src/include/distributed/multi_shard_transaction.h index 95dd29d32..b79e0e4f9 100644 --- a/src/include/distributed/multi_shard_transaction.h +++ b/src/include/distributed/multi_shard_transaction.h @@ -21,13 +21,18 @@ typedef struct ShardConnections { int64 shardId; + + /* + * XXX: this list contains MultiConnection for multi-shard transactions + * or TransactionConnection for COPY, the latter should be converted to + * use MultiConnection as well. 
+ */ List *connectionList; } ShardConnections; extern void OpenTransactionsToAllShardPlacements(List *shardIdList, char *relationOwner); extern HTAB * CreateShardConnectionHash(MemoryContext memoryContext); -extern void BeginTransactionOnShardPlacements(uint64 shardId, char *nodeUser); extern ShardConnections * GetShardConnections(int64 shardId, bool *shardConnectionsFound); extern ShardConnections * GetShardHashConnections(HTAB *connectionHash, int64 shardId, bool *connectionsFound);