From 80b34a5d6b3075c0c528e990eea1056d74b42074 Mon Sep 17 00:00:00 2001 From: Andres Freund Date: Thu, 8 Dec 2016 19:29:48 -0800 Subject: [PATCH] Integrate router executor into transaction management framework. One less place managing remote transactions. It also makes it fairly easy to use 2PC for certain modifications (e.g. reference tables). Just issue a CoordinatedTransactionUse2PC(). If every placement failure should cause the whole transaction to abort, additionally mark the relevant transactions as critical. --- .../executor/multi_router_executor.c | 230 ++++-------------- src/backend/distributed/shared_library_init.c | 3 - .../transaction/transaction_management.c | 11 + src/include/distributed/connection_cache.h | 2 +- .../distributed/multi_router_executor.h | 3 +- .../expected/multi_modifying_xacts.out | 4 + 6 files changed, 71 insertions(+), 182 deletions(-) diff --git a/src/backend/distributed/executor/multi_router_executor.c b/src/backend/distributed/executor/multi_router_executor.c index f2294fca0..ebb99c26c 100644 --- a/src/backend/distributed/executor/multi_router_executor.c +++ b/src/backend/distributed/executor/multi_router_executor.c @@ -41,6 +41,7 @@ #include "distributed/multi_shard_transaction.h" #include "distributed/relay_utility.h" #include "distributed/remote_commands.h" +#include "distributed/remote_transaction.h" #include "distributed/resource_lock.h" #include "executor/execdesc.h" #include "executor/executor.h" @@ -80,13 +81,9 @@ bool AllModificationsCommutative = false; * of XactShardConnSets, which map a shard identifier to a set of connection * hash entries. This list is walked by MarkRemainingInactivePlacements to * ensure we mark placements as failed if they reject a COMMIT. - * - * Beyond that, there's a backend hook to register xact callbacks and a flag to - * track when a user tries to roll back to a savepoint (not allowed). */ static HTAB *xactParticipantHash = NULL; static List *xactShardConnSetList = NIL; -static bool subXactAbortAttempted = false; /* functions needed during start phase */ static void InitTransactionStateForTask(Task *task); @@ -124,11 +121,7 @@ static bool ConsumeQueryResult(PGconn *connection, bool failOnError, int64 *rows static void RecordShardIdParticipant(uint64 affectedShardId, NodeConnectionEntry *participantEntry); -/* functions needed by callbacks and hooks */ -static void RouterTransactionCallback(XactEvent event, void *arg); -static void RouterSubtransactionCallback(SubXactEvent event, SubTransactionId subId, - SubTransactionId parentSubid, void *arg); -static void ExecuteTransactionEnd(bool commit); +/* to verify the health of shards after a transactional modification command */ static void MarkRemainingInactivePlacements(void); @@ -249,6 +242,8 @@ InitTransactionStateForTask(Task *task) { ListCell *placementCell = NULL; + BeginCoordinatedTransaction(); + xactParticipantHash = CreateXactParticipantHash(); foreach(placementCell, task->taskPlacementList) @@ -257,8 +252,9 @@ InitTransactionStateForTask(Task *task) NodeConnectionKey participantKey; NodeConnectionEntry *participantEntry = NULL; bool entryFound = false; - - PGconn *connection = NULL; + int connectionFlags = SESSION_LIFESPAN; + MultiConnection *connection = + GetNodeConnection(connectionFlags, placement->nodeName, placement->nodePort); MemSet(&participantKey, 0, sizeof(participantKey)); strlcpy(participantKey.nodeName, placement->nodeName, @@ -269,21 +265,8 @@ InitTransactionStateForTask(Task *task) HASH_ENTER, &entryFound); Assert(!entryFound); - connection = GetOrEstablishConnection(placement->nodeName, - placement->nodePort); - if (connection != NULL) - { - PGresult *result = PQexec(connection, "BEGIN"); - if (PQresultStatus(result) != PGRES_COMMAND_OK) - { - WarnRemoteError(connection, result); - CloseConnectionByPGconn(connection); - - connection = NULL; - } - - PQclear(result); - } + /* issue BEGIN if necessary */ + RemoteTransactionBeginIfNecessary(connection); participantEntry->connection = connection; } @@ -1212,7 +1195,7 @@ GetConnectionForPlacement(ShardPlacement *placement, bool isModificationQuery) RecordShardIdParticipant(placement->shardId, participantEntry); } - return participantEntry->connection; + return participantEntry->connection->pgConn; } else { @@ -1721,173 +1704,52 @@ RouterExecutorEnd(QueryDesc *queryDesc) /* - * RegisterRouterExecutorXactCallbacks registers this executor's callbacks. + * RouterExecutorPreCommitCheck() gets called after remote transactions have + * committed, so it can invalidate failed shards and perform related checks. */ void -RegisterRouterExecutorXactCallbacks(void) +RouterExecutorPreCommitCheck(void) { - RegisterXactCallback(RouterTransactionCallback, NULL); - RegisterSubXactCallback(RouterSubtransactionCallback, NULL); -} - - -/* - * RouterTransactionCallback handles committing or aborting remote transactions - * after the local one has committed or aborted. It only sends COMMIT or ABORT - * commands to still-healthy remotes; the failed ones are marked as inactive if - * after a successful COMMIT (no need to mark on ABORTs). - */ -static void -RouterTransactionCallback(XactEvent event, void *arg) -{ - if (XactModificationLevel != XACT_MODIFICATION_DATA) - { - return; - } - - switch (event) - { - case XACT_EVENT_PARALLEL_COMMIT: - case XACT_EVENT_COMMIT: - { - break; - } - - case XACT_EVENT_PARALLEL_ABORT: - case XACT_EVENT_ABORT: - { - bool commit = false; - - ExecuteTransactionEnd(commit); - - break; - } - - /* no support for prepare with multi-statement transactions */ - case XACT_EVENT_PREPARE: - case XACT_EVENT_PRE_PREPARE: - { - ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("cannot prepare a transaction that modified " - "distributed tables"))); - - break; - } - - case XACT_EVENT_PARALLEL_PRE_COMMIT: - case XACT_EVENT_PRE_COMMIT: - { - bool commit = true; - - if (subXactAbortAttempted) - { - subXactAbortAttempted = false; - - ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("cannot ROLLBACK TO SAVEPOINT in transactions " - "which modify distributed tables"))); - } - - ExecuteTransactionEnd(commit); - MarkRemainingInactivePlacements(); - - /* leave early to avoid resetting transaction state */ - return; - } - } - - /* reset transaction state */ - xactParticipantHash = NULL; - xactShardConnSetList = NIL; - subXactAbortAttempted = false; -} - - -/* - * RouterSubtransactionCallback silently keeps track of any attempt to ROLLBACK - * TO SAVEPOINT, which is not permitted by this executor. At transaction end, - * the executor checks whether such a rollback was attempted and, if so, errors - * out entirely (with an appropriate message). - * - * This implementation permits savepoints so long as no rollbacks occur. - */ -static void -RouterSubtransactionCallback(SubXactEvent event, SubTransactionId subId, - SubTransactionId parentSubid, void *arg) -{ - if ((xactParticipantHash != NULL) && (event == SUBXACT_EVENT_ABORT_SUB)) - { - subXactAbortAttempted = true; - } -} - - -/* - * ExecuteTransactionEnd ends any remote transactions still taking place on - * remote nodes. It uses xactParticipantHash to know which nodes need any - * final COMMIT or ABORT commands. Nodes which fail a final COMMIT will have - * their connection field set to NULL to permit placement invalidation. - */ -static void -ExecuteTransactionEnd(bool commit) -{ - const char *sqlCommand = commit ? "COMMIT TRANSACTION" : "ABORT TRANSACTION"; - HASH_SEQ_STATUS scan; - NodeConnectionEntry *participant; - bool completed = !commit; /* aborts are assumed completed */ - + /* no transactional router modification were issued, nothing to do */ if (xactParticipantHash == NULL) { return; } - hash_seq_init(&scan, xactParticipantHash); - while ((participant = (NodeConnectionEntry *) hash_seq_search(&scan))) - { - PGconn *connection = participant->connection; - PGresult *result = NULL; + MarkRemainingInactivePlacements(); +} - if (PQstatus(connection) != CONNECTION_OK) - { - continue; - } - result = PQexec(connection, sqlCommand); - if (PQresultStatus(result) == PGRES_COMMAND_OK) - { - completed = true; - } - else - { - WarnRemoteError(connection, result); - CloseConnectionByPGconn(participant->connection); - - participant->connection = NULL; - } - - PQclear(result); - } - - if (!completed) - { - ereport(ERROR, (errmsg("could not commit transaction on any active nodes"))); - } +/* + * Cleanup callback called after a transaction commits or aborts. + */ +void +RouterExecutorPostCommit(void) +{ + /* reset transaction state */ + xactParticipantHash = NULL; + xactShardConnSetList = NIL; } /* * MarkRemainingInactivePlacements takes care of marking placements of a shard * inactive after some of the placements rejected the final COMMIT phase of a - * transaction. This step is skipped if all placements reject the COMMIT, since - * in that case no modifications to the placement have persisted. + * transaction. * - * Failures are detected by checking the connection field of the entries in the - * connection set for each shard: it is always set to NULL after errors. + * Failures are detected by checking the connection & transaction state for + * each of the entries in the connection set for each shard. */ static void MarkRemainingInactivePlacements(void) { ListCell *shardConnSetCell = NULL; + int totalSuccesses = 0; + + if (xactParticipantHash == NULL) + { + return; + } foreach(shardConnSetCell, xactShardConnSetList) { @@ -1899,11 +1761,16 @@ MarkRemainingInactivePlacements(void) /* determine how many actual successes there were: subtract failures */ foreach(participantCell, participantList) { - NodeConnectionEntry *participant = NULL; - participant = (NodeConnectionEntry *) lfirst(participantCell); + NodeConnectionEntry *participant = + (NodeConnectionEntry *) lfirst(participantCell); + MultiConnection *connection = participant->connection; - /* other codes sets connection to NULL after errors */ - if (participant->connection == NULL) + /* + * Fail if the connection has been set to NULL after an error, or + * if the transaction failed for other reasons (e.g. COMMIT + * failed). + */ + if (connection == NULL || connection->remoteTransaction.transactionFailed) { successes--; } @@ -1921,7 +1788,8 @@ MarkRemainingInactivePlacements(void) NodeConnectionEntry *participant = NULL; participant = (NodeConnectionEntry *) lfirst(participantCell); - if (participant->connection == NULL) + if (participant->connection == NULL || + participant->connection->remoteTransaction.transactionFailed) { uint64 shardId = shardConnSet->shardId; NodeConnectionKey *nodeKey = &participant->cacheKey; @@ -1934,5 +1802,13 @@ MarkRemainingInactivePlacements(void) nodeKey->nodeName, nodeKey->nodePort); } } + + totalSuccesses++; + } + + /* If no shards could be modified at all, error out. */ + if (totalSuccesses == 0) + { + ereport(ERROR, (errmsg("could not commit transaction on any active nodes"))); } } diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index 3f092d3d3..093bc6e89 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -160,9 +160,6 @@ _PG_init(void) InitializeTransactionManagement(); InitializeConnectionManagement(); - /* initialize transaction callbacks */ - RegisterRouterExecutorXactCallbacks(); - /* enable modification of pg_catalog tables during pg_upgrade */ if (IsBinaryUpgrade) { diff --git a/src/backend/distributed/transaction/transaction_management.c b/src/backend/distributed/transaction/transaction_management.c index 0dadbd0c9..8f8cf8515 100644 --- a/src/backend/distributed/transaction/transaction_management.c +++ b/src/backend/distributed/transaction/transaction_management.c @@ -21,6 +21,7 @@ #include "access/xact.h" #include "distributed/connection_management.h" #include "distributed/hash_helpers.h" +#include "distributed/multi_router_executor.h" #include "distributed/multi_shard_transaction.h" #include "distributed/transaction_management.h" #include "utils/hsearch.h" @@ -147,6 +148,7 @@ CoordinatedTransactionCallback(XactEvent event, void *arg) * callbacks still can perform work if needed. */ ResetShardPlacementTransactionState(); + RouterExecutorPostCommit(); if (CurrentCoordinatedTransactionState == COORD_TRANS_PREPARED) { @@ -182,6 +184,7 @@ CoordinatedTransactionCallback(XactEvent event, void *arg) * callbacks still can perform work if needed. */ ResetShardPlacementTransactionState(); + RouterExecutorPostCommit(); /* handles both already prepared and open transactions */ if (CurrentCoordinatedTransactionState > COORD_TRANS_IDLE) @@ -245,6 +248,14 @@ CoordinatedTransactionCallback(XactEvent event, void *arg) CoordinatedRemoteTransactionsCommit(); CurrentCoordinatedTransactionState = COORD_TRANS_COMMITTED; } + + /* + * Call other parts of citus that need to integrate into + * transaction management. Call them *after* committing/preparing + * the remote transactions, to allow marking shards as invalid + * (e.g. if the remote commit failed). + */ + RouterExecutorPreCommitCheck(); } break; diff --git a/src/include/distributed/connection_cache.h b/src/include/distributed/connection_cache.h index 363042a94..9e24c8570 100644 --- a/src/include/distributed/connection_cache.h +++ b/src/include/distributed/connection_cache.h @@ -35,7 +35,7 @@ typedef struct NodeConnectionKey typedef struct NodeConnectionEntry { NodeConnectionKey cacheKey; /* hash entry key */ - PGconn *connection; /* connection to remote server, if any */ + MultiConnection *connection; /* connection to remote server, if any */ } NodeConnectionEntry; diff --git a/src/include/distributed/multi_router_executor.h b/src/include/distributed/multi_router_executor.h index 48f7f6499..f584abdc8 100644 --- a/src/include/distributed/multi_router_executor.h +++ b/src/include/distributed/multi_router_executor.h @@ -37,7 +37,8 @@ extern void RouterExecutorStart(QueryDesc *queryDesc, int eflags, List *taskList extern void RouterExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count); extern void RouterExecutorFinish(QueryDesc *queryDesc); extern void RouterExecutorEnd(QueryDesc *queryDesc); -extern void RegisterRouterExecutorXactCallbacks(void); +extern void RouterExecutorPreCommitCheck(void); +extern void RouterExecutorPostCommit(void); extern int64 ExecuteModifyTasksWithoutResults(List *taskList); diff --git a/src/test/regress/expected/multi_modifying_xacts.out b/src/test/regress/expected/multi_modifying_xacts.out index dbad2d1e4..44e683db3 100644 --- a/src/test/regress/expected/multi_modifying_xacts.out +++ b/src/test/regress/expected/multi_modifying_xacts.out @@ -442,6 +442,7 @@ INSERT INTO objects VALUES (2, 'BAD'); INSERT INTO labs VALUES (9, 'Umbrella Corporation'); COMMIT; WARNING: illegal value +WARNING: failed to commit transaction on localhost:57638 -- data should be persisted SELECT * FROM objects WHERE id = 2; id | name @@ -490,7 +491,9 @@ INSERT INTO labs VALUES (8, 'Aperture Science'); INSERT INTO labs VALUES (9, 'BAD'); COMMIT; WARNING: illegal value +WARNING: failed to commit transaction on localhost:57637 WARNING: illegal value +WARNING: failed to commit transaction on localhost:57638 ERROR: could not commit transaction on any active nodes -- data should NOT be persisted SELECT * FROM objects WHERE id = 1; @@ -526,6 +529,7 @@ INSERT INTO labs VALUES (8, 'Aperture Science'); INSERT INTO labs VALUES (9, 'BAD'); COMMIT; WARNING: illegal value +WARNING: failed to commit transaction on localhost:57637 \set VERBOSITY default -- data to objects should be persisted, but labs should not... SELECT * FROM objects WHERE id = 1;