From fc298ec095eafe6501cc17c3fd7a7186b1ea6bad Mon Sep 17 00:00:00 2001 From: Andres Freund Date: Fri, 7 Oct 2016 15:26:26 -0700 Subject: [PATCH] Coordinated remote transaction management. --- .../connection/connection_management.c | 8 +- .../transaction/remote_transaction.c | 863 ++++++++++++++++++ .../transaction/transaction_management.c | 112 +++ .../distributed/connection_management.h | 7 + src/include/distributed/remote_transaction.h | 109 +++ .../distributed/transaction_management.h | 24 +- 6 files changed, 1120 insertions(+), 3 deletions(-) create mode 100644 src/backend/distributed/transaction/remote_transaction.c create mode 100644 src/include/distributed/remote_transaction.h diff --git a/src/backend/distributed/connection/connection_management.c b/src/backend/distributed/connection/connection_management.c index b023fcbec..6e9714101 100644 --- a/src/backend/distributed/connection/connection_management.c +++ b/src/backend/distributed/connection/connection_management.c @@ -329,9 +329,12 @@ CloseConnection(MultiConnection *connection) if (found) { - /* unlink from list */ + /* unlink from list of open connections */ dlist_delete(&connection->connectionNode); + /* same for transaction state */ + CloseRemoteTransaction(connection); + /* we leave the per-host entry alive */ pfree(connection); } @@ -632,6 +635,9 @@ AfterXactHostConnectionHandling(ConnectionHashEntry *entry, bool isCommit) } else { + /* reset per-transaction state */ + ResetRemoteTransaction(connection); + UnclaimConnection(connection); } } diff --git a/src/backend/distributed/transaction/remote_transaction.c b/src/backend/distributed/transaction/remote_transaction.c new file mode 100644 index 000000000..e9f7d9af6 --- /dev/null +++ b/src/backend/distributed/transaction/remote_transaction.c @@ -0,0 +1,863 @@ +/*------------------------------------------------------------------------- + * + * remote_transaction.c + * Management of transaction spanning more than one node. + * + * Copyright (c) 2016, Citus Data, Inc. + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "libpq-fe.h" + +#include "miscadmin.h" + +#include "access/xact.h" +#include "distributed/connection_management.h" +#include "distributed/transaction_management.h" +#include "distributed/remote_commands.h" +#include "distributed/remote_transaction.h" +#include "utils/hsearch.h" + + +static void CheckTransactionHealth(void); +static void Assign2PCIdentifier(MultiConnection *connection); +static void WarnAboutLeakedPreparedTransaction(MultiConnection *connection, bool commit); + + +/* + * StartRemoteTransactionBeging initiates beginning the remote transaction in + * a non-blocking manner. + */ +void +StartRemoteTransactionBegin(struct MultiConnection *connection) +{ + RemoteTransaction *transaction = &connection->remoteTransaction; + + Assert(transaction->transactionState == REMOTE_TRANS_INVALID); + + /* remember transaction as being in-progress */ + dlist_push_tail(&InProgressTransactions, &connection->transactionNode); + + transaction->transactionState = REMOTE_TRANS_STARTING; + + /* + * Explicitly specify READ COMMITTED, the default on the remote + * side might have been changed, and that would cause problematic + * behaviour. + */ + if (!SendRemoteCommand(connection, + "BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED")) + { + ReportConnectionError(connection, WARNING); + MarkRemoteTransactionFailed(connection, true); + } +} + + +/* + * FinishRemoteTransactionBegin finishes the work StartRemoteTransactionBegin + * initiated. It blocks if necessary (i.e. if PQisBusy() would return true). + */ +void +FinishRemoteTransactionBegin(struct MultiConnection *connection) +{ + RemoteTransaction *transaction = &connection->remoteTransaction; + PGresult *result = NULL; + const bool raiseErrors = true; + + Assert(transaction->transactionState == REMOTE_TRANS_STARTING); + + result = GetRemoteCommandResult(connection, raiseErrors); + if (!IsResponseOK(result)) + { + ReportResultError(connection, result, WARNING); + MarkRemoteTransactionFailed(connection, raiseErrors); + } + else + { + transaction->transactionState = REMOTE_TRANS_STARTED; + } + + PQclear(result); + ForgetResults(connection); + + if (!transaction->transactionFailed) + { + Assert(PQtransactionStatus(connection->pgConn) == PQTRANS_INTRANS); + } +} + + +/* + * RemoteTransactionBegin begins a remote transaction in a blocking manner. + */ +void +RemoteTransactionBegin(struct MultiConnection *connection) +{ + StartRemoteTransactionBegin(connection); + FinishRemoteTransactionBegin(connection); +} + + +/* + * StartRemoteTransactionCommit initiates transaction commit in a non-blocking + * manner. If the transaction is in a failed state, it'll instead get rolled + * back. + */ +void +StartRemoteTransactionCommit(MultiConnection *connection) +{ + RemoteTransaction *transaction = &connection->remoteTransaction; + const bool dontRaiseError = false; + const bool isCommit = true; + + /* can only commit if transaction is in progress */ + Assert(transaction->transactionState != REMOTE_TRANS_INVALID); + + /* can't commit if we already started to commit or abort */ + Assert(transaction->transactionState < REMOTE_TRANS_1PC_ABORTING); + + if (transaction->transactionFailed) + { + /* abort the transaction if it failed */ + transaction->transactionState = REMOTE_TRANS_1PC_ABORTING; + + /* + * Try sending an ROLLBACK; Depending on the state that won't + * succeed, but let's try. Have to clear previous results + * first. + */ + ForgetResults(connection); /* try to clear pending stuff */ + if (!SendRemoteCommand(connection, "ROLLBACK")) + { + /* no point in reporting a likely redundant message */ + } + } + else if (transaction->transactionState == REMOTE_TRANS_PREPARED) + { + /* commit the prepared transaction */ + StringInfoData command; + + initStringInfo(&command); + appendStringInfo(&command, "COMMIT PREPARED '%s'", + transaction->preparedName); + + transaction->transactionState = REMOTE_TRANS_2PC_COMMITTING; + + if (!SendRemoteCommand(connection, command.data)) + { + ReportConnectionError(connection, WARNING); + MarkRemoteTransactionFailed(connection, dontRaiseError); + + WarnAboutLeakedPreparedTransaction(connection, isCommit); + } + } + else + { + /* initiate remote transaction commit */ + transaction->transactionState = REMOTE_TRANS_1PC_COMMITTING; + + if (!SendRemoteCommand(connection, "COMMIT")) + { + /* + * For a moment there I thought we were in trouble. + * + * Failing in this state means that we don't know whether the the + * commit has succeeded. + */ + ReportConnectionError(connection, WARNING); + MarkRemoteTransactionFailed(connection, dontRaiseError); + } + } +} + + +/* + * FinishRemoteTransactionCommit finishes the work + * StartRemoteTransactionCommit initiated. It blocks if necessary (i.e. if + * PQisBusy() would return true). + */ +void +FinishRemoteTransactionCommit(MultiConnection *connection) +{ + RemoteTransaction *transaction = &connection->remoteTransaction; + PGresult *result = NULL; + const bool dontRaiseErrors = false; + const bool isCommit = true; + + Assert(transaction->transactionState == REMOTE_TRANS_1PC_ABORTING || + transaction->transactionState == REMOTE_TRANS_1PC_COMMITTING || + transaction->transactionState == REMOTE_TRANS_2PC_COMMITTING); + + result = GetRemoteCommandResult(connection, dontRaiseErrors); + + if (!IsResponseOK(result)) + { + ReportResultError(connection, result, WARNING); + MarkRemoteTransactionFailed(connection, dontRaiseErrors); + + /* + * Failing in this state means that we will often not know whether + * the the commit has succeeded (particularly in case of network + * troubles). + * + * XXX: It might be worthwhile to discern cases where we got a + * proper error back from postgres (i.e. COMMIT was received but + * produced an error) from cases where the connection failed + * before getting a reply. + */ + + if (transaction->transactionState == REMOTE_TRANS_1PC_COMMITTING) + { + if (transaction->transactionCritical) + { + ereport(WARNING, (errmsg("failed to commit critical transaction " + "on %s:%d, metadata is likely out of sync", + connection->hostname, connection->port))); + } + else + { + ereport(WARNING, (errmsg("failed to commit transaction on %s:%d", + connection->hostname, connection->port))); + } + } + else if (transaction->transactionState == REMOTE_TRANS_2PC_COMMITTING) + { + ereport(WARNING, (errmsg("failed to commit transaction on %s:%d", + connection->hostname, connection->port))); + WarnAboutLeakedPreparedTransaction(connection, isCommit); + } + } + else if (transaction->transactionState == REMOTE_TRANS_1PC_ABORTING || + transaction->transactionState == REMOTE_TRANS_2PC_ABORTING) + { + transaction->transactionState = REMOTE_TRANS_ABORTED; + } + else + { + transaction->transactionState = REMOTE_TRANS_COMMITTED; + } + + PQclear(result); + + ForgetResults(connection); +} + + +/* + * RemoteTransactionCommit commits (or aborts, if the transaction failed) a + * remote transaction in a blocking manner. + */ +void +RemoteTransactionCommit(MultiConnection *connection) +{ + StartRemoteTransactionCommit(connection); + FinishRemoteTransactionCommit(connection); +} + + +/* + * StartRemoteTransactionAbort initiates abortin the transaction in a + * non-blocking manner. + */ +void +StartRemoteTransactionAbort(MultiConnection *connection) +{ + RemoteTransaction *transaction = &connection->remoteTransaction; + const bool dontRaiseErrors = false; + const bool isNotCommit = false; + + Assert(transaction->transactionState != REMOTE_TRANS_INVALID); + + /* + * Clear previous results, so we have a better chance to send + * ROLLBACK [PREPARED]; + */ + ForgetResults(connection); + + if (transaction->transactionState == REMOTE_TRANS_PREPARING || + transaction->transactionState == REMOTE_TRANS_PREPARED) + { + StringInfoData command; + + initStringInfo(&command); + appendStringInfo(&command, "ROLLBACK PREPARED '%s'", + transaction->preparedName); + + if (!SendRemoteCommand(connection, command.data)) + { + ReportConnectionError(connection, WARNING); + MarkRemoteTransactionFailed(connection, dontRaiseErrors); + + WarnAboutLeakedPreparedTransaction(connection, isNotCommit); + } + else + { + transaction->transactionState = REMOTE_TRANS_2PC_ABORTING; + } + } + else + { + if (!SendRemoteCommand(connection, "ROLLBACK")) + { + /* no point in reporting a likely redundant message */ + MarkRemoteTransactionFailed(connection, dontRaiseErrors); + } + else + { + transaction->transactionState = REMOTE_TRANS_1PC_ABORTING; + } + } +} + + +/* + * FinishRemoteTransactionAbort finishes the work StartRemoteTransactionAbort + * initiated. It blocks if necessary (i.e. if PQisBusy() would return true). + */ +void +FinishRemoteTransactionAbort(MultiConnection *connection) +{ + RemoteTransaction *transaction = &connection->remoteTransaction; + PGresult *result = NULL; + const bool dontRaiseErrors = false; + const bool isNotCommit = false; + + result = GetRemoteCommandResult(connection, dontRaiseErrors); + + if (!IsResponseOK(result)) + { + ReportResultError(connection, result, WARNING); + MarkRemoteTransactionFailed(connection, dontRaiseErrors); + + if (transaction->transactionState == REMOTE_TRANS_1PC_ABORTING) + { + ereport(WARNING, + (errmsg("failed to abort 2PC transaction \"%s\" on %s:%d", + transaction->preparedName, connection->hostname, + connection->port))); + } + else + { + WarnAboutLeakedPreparedTransaction(connection, isNotCommit); + } + } + + PQclear(result); + + result = GetRemoteCommandResult(connection, dontRaiseErrors); + Assert(!result); + + transaction->transactionState = REMOTE_TRANS_ABORTED; +} + + +/* + * RemoteTransactionAbort aborts a remote transaction in a blocking manner. + */ +void +RemoteTransactionAbort(MultiConnection *connection) +{ + StartRemoteTransactionAbort(connection); + FinishRemoteTransactionAbort(connection); +} + + +/* + * StartRemoteTransactionPrepare initiates preparing the transaction in a + * non-blocking manner. + */ +void +StartRemoteTransactionPrepare(struct MultiConnection *connection) +{ + RemoteTransaction *transaction = &connection->remoteTransaction; + StringInfoData command; + const bool raiseErrors = true; + + /* can't prepare a nonexistant transaction */ + Assert(transaction->transactionState != REMOTE_TRANS_INVALID); + + /* can't prepare in a failed transaction */ + Assert(!transaction->transactionFailed); + + /* can't prepare if already started to prepare/abort/commit */ + Assert(transaction->transactionState < REMOTE_TRANS_PREPARING); + + Assign2PCIdentifier(connection); + + initStringInfo(&command); + appendStringInfo(&command, "PREPARE TRANSACTION '%s'", + transaction->preparedName); + + if (!SendRemoteCommand(connection, command.data)) + { + ReportConnectionError(connection, WARNING); + MarkRemoteTransactionFailed(connection, raiseErrors); + } + else + { + transaction->transactionState = REMOTE_TRANS_PREPARING; + } +} + + +/* + * FinishRemoteTransactionPrepare finishes the work + * StartRemoteTransactionPrepare initiated. It blocks if necessary (i.e. if + * PQisBusy() would return true). + */ +void +FinishRemoteTransactionPrepare(struct MultiConnection *connection) +{ + RemoteTransaction *transaction = &connection->remoteTransaction; + PGresult *result = NULL; + const bool raiseErrors = true; + + Assert(transaction->transactionState == REMOTE_TRANS_PREPARING); + + result = GetRemoteCommandResult(connection, raiseErrors); + + if (!IsResponseOK(result)) + { + ReportResultError(connection, result, WARNING); + transaction->transactionState = REMOTE_TRANS_ABORTED; + MarkRemoteTransactionFailed(connection, raiseErrors); + } + else + { + transaction->transactionState = REMOTE_TRANS_PREPARED; + } + + result = GetRemoteCommandResult(connection, raiseErrors); + Assert(!result); +} + + +/* + * RemoteTransactionPrepare prepares a remote transaction in a blocking + * manner. + */ +void +RemoteTransactionPrepare(struct MultiConnection *connection) +{ + StartRemoteTransactionPrepare(connection); + FinishRemoteTransactionPrepare(connection); +} + + +/* + * RemoteTransactionBeginIfNecessary is a convenience wrapper around + * RemoteTransactionsBeginIfNecessary(), for a single connection. + */ +void +RemoteTransactionBeginIfNecessary(MultiConnection *connection) +{ + /* just delegate */ + if (InCoordinatedTransaction()) + { + List *connectionList = list_make1(connection); + + RemoteTransactionsBeginIfNecessary(connectionList); + list_free(connectionList); + } +} + + +/* + * RemoteTransactionsBeginIfNecessary begins, if necessary according to this + * session's coordinated transaction state, and the remote transaction's + * state, an explicit transaction on all the connections. This is done in + * parallel, to lessen latency penalties. + */ +void +RemoteTransactionsBeginIfNecessary(List *connectionList) +{ + ListCell *connectionCell = NULL; + + /* + * Don't do anything if not in a coordinated transaction. That allows the + * same code to work both in situations that uses transactions, and when + * not. + */ + if (!InCoordinatedTransaction()) + { + return; + } + + /* issue BEGIN to all connections needing it */ + foreach(connectionCell, connectionList) + { + MultiConnection *connection = (MultiConnection *) lfirst(connectionCell); + RemoteTransaction *transaction = &connection->remoteTransaction; + + /* can't send BEGIN if a command already is in progress */ + Assert(PQtransactionStatus(connection->pgConn) != PQTRANS_ACTIVE); + + /* + * If a transaction already is in progress (including having failed), + * don't start it again. Thats quite normal if a piece of code allows + * cached connections. + */ + if (transaction->transactionState != REMOTE_TRANS_INVALID) + { + continue; + } + + StartRemoteTransactionBegin(connection); + } + + /* XXX: Should perform network IO for all connections in a non-blocking manner */ + + /* get result of all the BEGINs */ + foreach(connectionCell, connectionList) + { + MultiConnection *connection = (MultiConnection *) lfirst(connectionCell); + RemoteTransaction *transaction = &connection->remoteTransaction; + + /* + * Only handle BEGIN results on connections that are in process of + * starting a transaction, and haven't already failed (e.g. by not + * being able to send BEGIN due to a network failure). + */ + if (transaction->transactionFailed || + transaction->transactionState != REMOTE_TRANS_STARTING) + { + continue; + } + + FinishRemoteTransactionBegin(connection); + } +} + + +/* + * MarkRemoteTransactionFailed records a transaction as having failed. + * + * If the connection is marked as critical, and allowErrorPromotion is true, + * this routine will ERROR out. The allowErrorPromotion case is primarily + * required for the transaction management code itself. Usually it is helpful + * to fail as soon as possible. If !allowErrorPromotion transaction commit + * will instead issue an error before committing on any node. + */ +void +MarkRemoteTransactionFailed(MultiConnection *connection, bool allowErrorPromotion) +{ + RemoteTransaction *transaction = &connection->remoteTransaction; + + transaction->transactionFailed = true; + + /* + * If the connection is marked as critical, fail the entire coordinated + * transaction. If allowed. + */ + if (transaction->transactionCritical && allowErrorPromotion) + { + ereport(ERROR, (errmsg("failure on connection marked as essential: %s:%d", + connection->hostname, connection->port))); + } +} + + +/* + * MarkRemoteTransactionCritical signals that failures on this remote + * transaction should fail the entire coordinated transaction. + */ +void +MarkRemoteTransactionCritical(struct MultiConnection *connection) +{ + RemoteTransaction *transaction = &connection->remoteTransaction; + + transaction->transactionCritical = true; +} + + +/* + * CloseRemoteTransaction handles closing a connection that, potentially, is + * part of a coordinated transaction. This should only ever be called from + * connection_management.c, while closing a connection during a transaction. + */ +void +CloseRemoteTransaction(struct MultiConnection *connection) +{ + RemoteTransaction *transaction = &connection->remoteTransaction; + + /* unlink from list of open transactions, if necessary */ + if (transaction->transactionState != REMOTE_TRANS_INVALID) + { + /* XXX: Should we error out for a critical transaction? */ + + dlist_delete(&connection->transactionNode); + } +} + + +/* + * ResetRemoteTransaction resets the state of the transaction after the end of + * the main transaction, if the connection is being reused. + */ +void +ResetRemoteTransaction(struct MultiConnection *connection) +{ + RemoteTransaction *transaction = &connection->remoteTransaction; + + /* just reset the entire state, relying on 0 being invalid/false */ + memset(transaction, 0, sizeof(*transaction)); +} + + +/* + * CoordinatedRemoteTransactionsPrepare PREPAREs a 2PC transaction on all + * non-failed transactions participating in the coordinated transaction. + */ +void +CoordinatedRemoteTransactionsPrepare(void) +{ + dlist_iter iter; + + /* issue PREPARE TRANSACTION; to all relevant remote nodes */ + + /* asynchronously send PREPARE */ + dlist_foreach(iter, &InProgressTransactions) + { + MultiConnection *connection = dlist_container(MultiConnection, transactionNode, + iter.cur); + RemoteTransaction *transaction = &connection->remoteTransaction; + + Assert(transaction->transactionState != REMOTE_TRANS_INVALID); + + /* can't PREPARE a transaction that failed */ + if (transaction->transactionFailed) + { + continue; + } + + StartRemoteTransactionPrepare(connection); + } + + /* XXX: Should perform network IO for all connections in a non-blocking manner */ + + /* Wait for result */ + dlist_foreach(iter, &InProgressTransactions) + { + MultiConnection *connection = dlist_container(MultiConnection, transactionNode, + iter.cur); + RemoteTransaction *transaction = &connection->remoteTransaction; + + if (transaction->transactionState != REMOTE_TRANS_PREPARING) + { + continue; + } + + FinishRemoteTransactionPrepare(connection); + } + + CurrentCoordinatedTransactionState = COORD_TRANS_PREPARED; +} + + +/* + * CoordinatedRemoteTransactionsCommit performs distributed transactions + * handling at commit time. This will be called at XACT_EVENT_PRE_COMMIT if + * 1PC commits are used - so shards can still be invalidated - and at + * XACT_EVENT_COMMIT if 2PC is being used. + * + * Note that this routine has to issue rollbacks for failed transactions. + */ +void +CoordinatedRemoteTransactionsCommit(void) +{ + dlist_iter iter; + + /* + * Before starting to commit on any of the nodes - after which we can't + * completely roll-back anymore - check that things are in a good state. + */ + CheckTransactionHealth(); + + /* + * Issue appropriate transaction commands to remote nodes. If everything + * went well that's going to be COMMIT or COMMIT PREPARED, if individual + * connections had errors, some or all of them might require a ROLLBACK. + * + * First send the command asynchronously over all connections. + */ + dlist_foreach(iter, &InProgressTransactions) + { + MultiConnection *connection = dlist_container(MultiConnection, transactionNode, + iter.cur); + RemoteTransaction *transaction = &connection->remoteTransaction; + + if (transaction->transactionState == REMOTE_TRANS_INVALID || + transaction->transactionState == REMOTE_TRANS_1PC_COMMITTING || + transaction->transactionState == REMOTE_TRANS_2PC_COMMITTING || + transaction->transactionState == REMOTE_TRANS_COMMITTED) + { + continue; + } + + StartRemoteTransactionCommit(connection); + } + + /* XXX: Should perform network IO for all connections in a non-blocking manner */ + + /* wait for the replies to the commands to come in */ + dlist_foreach(iter, &InProgressTransactions) + { + MultiConnection *connection = dlist_container(MultiConnection, transactionNode, + iter.cur); + RemoteTransaction *transaction = &connection->remoteTransaction; + + /* nothing to do if not committing / aborting */ + if (transaction->transactionState != REMOTE_TRANS_1PC_COMMITTING && + transaction->transactionState != REMOTE_TRANS_2PC_COMMITTING && + transaction->transactionState != REMOTE_TRANS_1PC_ABORTING && + transaction->transactionState != REMOTE_TRANS_2PC_ABORTING) + { + continue; + } + + FinishRemoteTransactionCommit(connection); + } +} + + +/* + * CoordinatedRemoteTransactionsAbort performs distributed transactions + * handling at abort time. + * + * This issues ROLLBACKS and ROLLBACK PREPARED depending on whether the remote + * transaction has been prepared or not. + */ +void +CoordinatedRemoteTransactionsAbort(void) +{ + dlist_iter iter; + + /* asynchronously send ROLLBACK [PREPARED] */ + dlist_foreach(iter, &InProgressTransactions) + { + MultiConnection *connection = dlist_container(MultiConnection, transactionNode, + iter.cur); + RemoteTransaction *transaction = &connection->remoteTransaction; + + if (transaction->transactionState == REMOTE_TRANS_INVALID || + transaction->transactionState == REMOTE_TRANS_1PC_ABORTING || + transaction->transactionState == REMOTE_TRANS_2PC_ABORTING || + transaction->transactionState == REMOTE_TRANS_ABORTED) + { + continue; + } + + StartRemoteTransactionAbort(connection); + } + + /* XXX: Should perform network IO for all connections in a non-blocking manner */ + + /* and wait for the results */ + dlist_foreach(iter, &InProgressTransactions) + { + MultiConnection *connection = dlist_container(MultiConnection, transactionNode, + iter.cur); + RemoteTransaction *transaction = &connection->remoteTransaction; + + if (transaction->transactionState != REMOTE_TRANS_1PC_ABORTING && + transaction->transactionState != REMOTE_TRANS_2PC_ABORTING) + { + continue; + } + + FinishRemoteTransactionAbort(connection); + } +} + + +/* + * CheckTransactionHealth checks if any of the participating transactions in a + * coordinated transaction failed, and what consequence that should have. + * This needs to be called before the coordinated transaction commits (but + * after they've been PREPAREd if 2PC is in use). + */ +static void +CheckTransactionHealth(void) +{ + dlist_iter iter; + + dlist_foreach(iter, &InProgressTransactions) + { + MultiConnection *connection = dlist_container(MultiConnection, transactionNode, + iter.cur); + RemoteTransaction *transaction = &connection->remoteTransaction; + PGTransactionStatusType status = PQtransactionStatus(connection->pgConn); + + /* if the connection is in a bad state, so is the transaction's state */ + if (status == PQTRANS_INERROR || status == PQTRANS_UNKNOWN) + { + transaction->transactionFailed = true; + } + + /* + * If a critical connection is marked as failed (and no error has been + * raised yet) do so now. + */ + if (transaction->transactionFailed && transaction->transactionCritical) + { + ereport(ERROR, (errmsg("failure on connection marked as essential: %s:%d", + connection->hostname, connection->port))); + } + } +} + + +/* + * Assign2PCIdentifier compute the 2PC transaction name to use for a + * transaction. + * + * Every 2PC transaction should get a new name, i.e. this function will need + * to be called again. + * + * NB: we rely on the fact that we don't need to do full escaping on the names + * generated here. + */ +static void +Assign2PCIdentifier(MultiConnection *connection) +{ + static uint64 sequence = 0; + snprintf(connection->remoteTransaction.preparedName, NAMEDATALEN, + "citus_%d_"UINT64_FORMAT, + MyProcPid, sequence++); +} + + +/* + * WarnAboutLeakedPreparedTransaction issues a WARNING explaining that a + * prepared transaction could not be committed or rolled back, and explains + * how to perform cleanup. + */ +static void +WarnAboutLeakedPreparedTransaction(MultiConnection *connection, bool commit) +{ + StringInfoData command; + RemoteTransaction *transaction = &connection->remoteTransaction; + + initStringInfo(&command); + + if (commit) + { + appendStringInfo(&command, "COMMIT PREPARED '%s'", + transaction->preparedName); + } + else + { + appendStringInfo(&command, "ROLLBACK PREPARED '%s'", + transaction->preparedName); + } + + /* log a warning so the user may abort the transaction later */ + ereport(WARNING, (errmsg("failed to roll back prepared transaction '%s'", + transaction->preparedName), + errhint("Run \"%s\" on %s:%u", + command.data, connection->hostname, connection->port))); +} diff --git a/src/backend/distributed/transaction/transaction_management.c b/src/backend/distributed/transaction/transaction_management.c index 20d6227f4..a14d4b98f 100644 --- a/src/backend/distributed/transaction/transaction_management.c +++ b/src/backend/distributed/transaction/transaction_management.c @@ -34,9 +34,18 @@ int MultiShardCommitProtocol = COMMIT_PROTOCOL_1PC; /* state needed to keep track of operations used during a transaction */ XactModificationType XactModificationLevel = XACT_MODIFICATION_NONE; +/* list of connections that are part of the current coordinated transaction */ +dlist_head InProgressTransactions = DLIST_STATIC_INIT(InProgressTransactions); + static bool subXactAbortAttempted = false; +/* + * Should this coordinated transaction use 2PC? Set by + * CoordinatedTransactionUse2PC(), e.g. if DDL was issued and + * MultiShardCommitProtocol was set to 2PC. + */ +static bool CurrentTransactionUse2PC = false; /* transaction management functions */ static void CoordinatedTransactionCallback(XactEvent event, void *arg); @@ -47,6 +56,64 @@ static void CoordinatedSubTransactionCallback(SubXactEvent event, SubTransaction static void AdjustMaxPreparedTransactions(void); +/* + * BeginCoordinatedTransaction begins a coordinated transaction. No + * pre-existing coordinated transaction may be in progress. + */ +void +BeginCoordinatedTransaction(void) +{ + if (CurrentCoordinatedTransactionState != COORD_TRANS_NONE && + CurrentCoordinatedTransactionState != COORD_TRANS_IDLE) + { + ereport(ERROR, (errmsg("starting transaction in wrong state"))); + } + + CurrentCoordinatedTransactionState = COORD_TRANS_STARTED; +} + + +/* + * BeginOrContinueCoordinatedTransaction starts a coordinated transaction, + * unless one already is in progress. + */ +void +BeginOrContinueCoordinatedTransaction(void) +{ + if (CurrentCoordinatedTransactionState == COORD_TRANS_STARTED) + { + return; + } + + BeginCoordinatedTransaction(); +} + + +/* + * InCoordinatedTransaction returns whether a coordinated transaction has been + * started. + */ +bool +InCoordinatedTransaction(void) +{ + return CurrentCoordinatedTransactionState != COORD_TRANS_NONE && + CurrentCoordinatedTransactionState != COORD_TRANS_IDLE; +} + + +/* + * CoordinatedTransactionUse2PC() signals that the current coordinated + * transaction should use 2PC to commit. + */ +void +CoordinatedTransactionUse2PC(void) +{ + Assert(InCoordinatedTransaction()); + + CurrentTransactionUse2PC = true; +} + + void InitializeTransactionManagement(void) { @@ -73,6 +140,12 @@ CoordinatedTransactionCallback(XactEvent event, void *arg) { case XACT_EVENT_COMMIT: { + if (CurrentCoordinatedTransactionState == COORD_TRANS_PREPARED) + { + /* handles both already prepared and open transactions */ + CoordinatedRemoteTransactionsCommit(); + } + /* close connections etc. */ if (CurrentCoordinatedTransactionState != COORD_TRANS_NONE) { @@ -82,11 +155,25 @@ CoordinatedTransactionCallback(XactEvent event, void *arg) Assert(!subXactAbortAttempted); CurrentCoordinatedTransactionState = COORD_TRANS_NONE; XactModificationLevel = XACT_MODIFICATION_NONE; + dlist_init(&InProgressTransactions); + CurrentTransactionUse2PC = false; } break; case XACT_EVENT_ABORT: { + /* + * FIXME: Add warning for the COORD_TRANS_COMMITTED case. That + * can be reached if this backend fails after the + * XACT_EVENT_PRE_COMMIT state. + */ + + /* handles both already prepared and open transactions */ + if (CurrentCoordinatedTransactionState > COORD_TRANS_IDLE) + { + CoordinatedRemoteTransactionsAbort(); + } + /* close connections etc. */ if (CurrentCoordinatedTransactionState != COORD_TRANS_NONE) { @@ -95,6 +182,8 @@ CoordinatedTransactionCallback(XactEvent event, void *arg) CurrentCoordinatedTransactionState = COORD_TRANS_NONE; XactModificationLevel = XACT_MODIFICATION_NONE; + dlist_init(&InProgressTransactions); + CurrentTransactionUse2PC = false; subXactAbortAttempted = false; } break; @@ -118,6 +207,29 @@ CoordinatedTransactionCallback(XactEvent event, void *arg) "which modify distributed tables"))); } } + + /* nothing further to do if there's no managed remote xacts */ + if (CurrentCoordinatedTransactionState == COORD_TRANS_NONE) + { + break; + } + + + if (CurrentTransactionUse2PC) + { + CoordinatedRemoteTransactionsPrepare(); + CurrentCoordinatedTransactionState = COORD_TRANS_PREPARED; + } + else + { + /* + * Have to commit remote transactions in PRE_COMMIT, to allow + * us to mark failed placements as invalid. Better don't use + * this for anything important (i.e. DDL/metadata). + */ + CoordinatedRemoteTransactionsCommit(); + CurrentCoordinatedTransactionState = COORD_TRANS_COMMITTED; + } } break; diff --git a/src/include/distributed/connection_management.h b/src/include/distributed/connection_management.h index be88c79a3..ddc0b26e1 100644 --- a/src/include/distributed/connection_management.h +++ b/src/include/distributed/connection_management.h @@ -12,6 +12,7 @@ #define CONNECTION_MANAGMENT_H #include "distributed/transaction_management.h" +#include "distributed/remote_transaction.h" #include "lib/ilist.h" #include "utils/hsearch.h" #include "utils/timestamp.h" @@ -63,6 +64,12 @@ typedef struct MultiConnection /* membership in list of list of connections in ConnectionHashEntry */ dlist_node connectionNode; + + /* information about the associated remote transaction */ + RemoteTransaction remoteTransaction; + + /* membership in list of in-progress transactions */ + dlist_node transactionNode; } MultiConnection; diff --git a/src/include/distributed/remote_transaction.h b/src/include/distributed/remote_transaction.h new file mode 100644 index 000000000..e0ffb5dcd --- /dev/null +++ b/src/include/distributed/remote_transaction.h @@ -0,0 +1,109 @@ +/*------------------------------------------------------------------------- + * remote_transaction.h + * + * Copyright (c) 2016, Citus Data, Inc. + * + *------------------------------------------------------------------------- + */ + + +#ifndef REMOTE_TRANSACTION_H +#define REMOTE_TRANSACTION_H + + +#include "nodes/pg_list.h" +#include "lib/ilist.h" + + +/* forward declare, to avoid recursive includes */ +struct MultiConnection; + +/* + * Enum that defines different remote transaction states, of a single remote + * transaction. + */ +typedef enum +{ + /* no transaction active */ + REMOTE_TRANS_INVALID = 0, + + /* transaction start */ + REMOTE_TRANS_STARTING, + REMOTE_TRANS_STARTED, + + /* 2pc prepare */ + REMOTE_TRANS_PREPARING, + REMOTE_TRANS_PREPARED, + + /* transaction abort */ + REMOTE_TRANS_1PC_ABORTING, + REMOTE_TRANS_2PC_ABORTING, + REMOTE_TRANS_ABORTED, + + /* transaction commit */ + REMOTE_TRANS_1PC_COMMITTING, + REMOTE_TRANS_2PC_COMMITTING, + REMOTE_TRANS_COMMITTED +} RemoteTransactionState; + + +/* + * Transaction state associated associated with a single MultiConnection. + */ +typedef struct RemoteTransaction +{ + /* what state is the remote side transaction in */ + RemoteTransactionState transactionState; + + /* failures on this connection should abort entire coordinated transaction */ + bool transactionCritical; + + /* failed in current transaction */ + bool transactionFailed; + + /* 2PC transaction name currently associated with connection */ + char preparedName[NAMEDATALEN]; +} RemoteTransaction; + + +/* change an individual remote transaction's state */ +extern void StartRemoteTransactionBegin(struct MultiConnection *connection); +extern void FinishRemoteTransactionBegin(struct MultiConnection *connection); +extern void RemoteTransactionBegin(struct MultiConnection *connection); + +extern void StartRemoteTransactionPrepare(struct MultiConnection *connection); +extern void FinishRemoteTransactionPrepare(struct MultiConnection *connection); +extern void RemoteTransactionPrepare(struct MultiConnection *connection); + +extern void StartRemoteTransactionCommit(struct MultiConnection *connection); +extern void FinishRemoteTransactionCommit(struct MultiConnection *connection); +extern void RemoteTransactionCommit(struct MultiConnection *connection); + +extern void StartRemoteTransactionAbort(struct MultiConnection *connection); +extern void FinishRemoteTransactionAbort(struct MultiConnection *connection); +extern void RemoteTransactionAbort(struct MultiConnection *connection); + +/* start transaction if necessary */ +extern void RemoteTransactionBeginIfNecessary(struct MultiConnection *connection); +extern void RemoteTransactionsBeginIfNecessary(List *connectionList); + +/* other public functionality */ +extern void MarkRemoteTransactionFailed(struct MultiConnection *connection, + bool allowErrorPromotion); +extern void MarkRemoteTransactionCritical(struct MultiConnection *connection); + + +/* + * The following functions should all only be called by connection / + * transaction managment code. + */ + +extern void CloseRemoteTransaction(struct MultiConnection *connection); +extern void ResetRemoteTransaction(struct MultiConnection *connection); + +/* perform handling for all in-progress transactions */ +extern void CoordinatedRemoteTransactionsPrepare(void); +extern void CoordinatedRemoteTransactionsCommit(void); +extern void CoordinatedRemoteTransactionsAbort(void); + +#endif /* REMOTE_TRANSACTION_H */ diff --git a/src/include/distributed/transaction_management.h b/src/include/distributed/transaction_management.h index 91a5cd181..b2eda9af6 100644 --- a/src/include/distributed/transaction_management.h +++ b/src/include/distributed/transaction_management.h @@ -9,6 +9,8 @@ #ifndef TRANSACTION_MANAGMENT_H #define TRANSACTION_MANAGMENT_H +#include "lib/ilist.h" + /* describes what kind of modifications have occurred in the current transaction */ typedef enum { @@ -29,7 +31,16 @@ typedef enum CoordinatedTransactionState COORD_TRANS_NONE, /* no coordinated transaction in progress, but connections established */ - COORD_TRANS_IDLE + COORD_TRANS_IDLE, + + /* coordinated transaction in progress */ + COORD_TRANS_STARTED, + + /* coordinated transaction prepared on all workers */ + COORD_TRANS_PREPARED, + + /* coordinated transaction committed */ + COORD_TRANS_COMMITTED } CoordinatedTransactionState; @@ -46,9 +57,18 @@ extern int MultiShardCommitProtocol; /* state needed to prevent new connections during modifying transactions */ extern XactModificationType XactModificationLevel; - extern CoordinatedTransactionState CurrentCoordinatedTransactionState; +/* list of connections that are part of the current coordinated transaction */ +extern dlist_head InProgressTransactions; + +/* + * Coordinated transaction management. + */ +extern void BeginCoordinatedTransaction(void); +extern void BeginOrContinueCoordinatedTransaction(void); +extern bool InCoordinatedTransaction(void); +extern void CoordinatedTransactionUse2PC(void); /* initialization function(s) */ extern void InitializeTransactionManagement(void);