From 6eeb43af15274019a527acdd777fcd4979312e71 Mon Sep 17 00:00:00 2001 From: Andres Freund Date: Thu, 8 Dec 2016 18:29:42 -0800 Subject: [PATCH 1/4] Add PQgetResult() wrapper handling interrupts. This makes it possible to implement cancelling queries blocked on communication with remote nodes. --- .../distributed/connection/remote_commands.c | 193 +++++++++++++++++- src/include/distributed/remote_commands.h | 4 + 2 files changed, 193 insertions(+), 4 deletions(-) diff --git a/src/backend/distributed/connection/remote_commands.c b/src/backend/distributed/connection/remote_commands.c index 22ba1c9f7..6c5658988 100644 --- a/src/backend/distributed/connection/remote_commands.c +++ b/src/backend/distributed/connection/remote_commands.c @@ -14,6 +14,8 @@ #include "distributed/connection_management.h" #include "distributed/remote_commands.h" +#include "miscadmin.h" +#include "storage/latch.h" /* GUC, determining whether statements sent to remote nodes are logged */ @@ -52,7 +54,9 @@ ForgetResults(MultiConnection *connection) while (true) { PGresult *result = NULL; - result = PQgetResult(connection->pgConn); + const bool dontRaiseErrors = false; + + result = GetRemoteCommandResult(connection, dontRaiseErrors); if (result == NULL) { break; @@ -184,12 +188,193 @@ LogRemoteCommand(MultiConnection *connection, const char *command) /* wrappers around libpq functions, with command logging support */ /* - * SendRemoteCommand is a tiny PQsendQuery wrapper that logs remote commands, - * and accepts a MultiConnection instead of a plain PGconn. + * SendRemoteCommand is a PQsendQuery wrapper that logs remote commands, and + * accepts a MultiConnection instead of a plain PGconn. It makes sure it can + * send commands asynchronously without blocking (at the potential expense of + * an additional memory allocation). 
*/ int SendRemoteCommand(MultiConnection *connection, const char *command) { + PGconn *pgConn = connection->pgConn; + bool wasNonblocking = PQisnonblocking(pgConn); + int rc = 0; + LogRemoteCommand(connection, command); - return PQsendQuery(connection->pgConn, command); + + /* make sure not to block anywhere */ + if (!wasNonblocking) + { + PQsetnonblocking(pgConn, true); + } + + rc = PQsendQuery(pgConn, command); + + /* reset nonblocking connection to its original state */ + if (!wasNonblocking) + { + PQsetnonblocking(pgConn, false); + } + + return rc; +} + + +/* + * GetRemoteCommandResult is a wrapper around PQgetResult() that handles interrupts. + * + * If raiseInterrupts is true and an interrupt arrives, e.g. the query is + * being cancelled, CHECK_FOR_INTERRUPTS() will be called, which then throws + * an error. + * + * If raiseInterrupts is false and an interrupt arrives that'd otherwise raise + * an error, GetRemoteCommandResult returns NULL, and the transaction is + * marked as having failed. While that's not a perfect way to signal failure, + * callers will usually treat that as an error, and it's easy to use. + * + * Handling of interrupts is important to allow queries being cancelled while + * waiting on remote nodes. In a distributed deadlock scenario cancelling + * might be the only way to resolve the deadlock. 
+ */ +PGresult * +GetRemoteCommandResult(MultiConnection *connection, bool raiseInterrupts) +{ + PGconn *pgConn = connection->pgConn; + int socket = 0; + int waitFlags = WL_POSTMASTER_DEATH | WL_LATCH_SET; + bool wasNonblocking = false; + PGresult *result = NULL; + bool failed = false; + + /* short circuit tests around the more expensive parts of this routine */ + if (!PQisBusy(pgConn)) + { + return PQgetResult(connection->pgConn); + } + + socket = PQsocket(pgConn); + wasNonblocking = PQisnonblocking(pgConn); + + /* make sure not to block anywhere */ + if (!wasNonblocking) + { + PQsetnonblocking(pgConn, true); + } + + if (raiseInterrupts) + { + CHECK_FOR_INTERRUPTS(); + } + + /* make sure command has been sent out */ + while (!failed) + { + int rc = 0; + + ResetLatch(MyLatch); + + /* try to send all the data */ + rc = PQflush(pgConn); + + /* stop writing if all data has been sent, or there was none to send */ + if (rc == 0) + { + break; + } + + /* if sending failed, there's nothing more we can do */ + if (rc == -1) + { + failed = true; + break; + } + + /* this means we have to wait for data to go out */ + Assert(rc == 1); + + rc = WaitLatchOrSocket(MyLatch, waitFlags | WL_SOCKET_WRITEABLE, socket, 0); + + if (rc & WL_POSTMASTER_DEATH) + { + ereport(ERROR, (errmsg("postmaster was shut down, exiting"))); + } + + if (rc & WL_LATCH_SET) + { + /* if allowed raise errors */ + if (raiseInterrupts) + { + CHECK_FOR_INTERRUPTS(); + } + + /* + * If raising errors allowed, or called within in a section with + * interrupts held, return NULL instead, and mark the transaction + * as failed. 
+ */ + if (InterruptHoldoffCount > 0 && (QueryCancelPending || ProcDiePending)) + { + connection->remoteTransaction.transactionFailed = true; + failed = true; + break; + } + } + } + + /* wait for the result of the command to come in */ + while (!failed) + { + int rc = 0; + + ResetLatch(MyLatch); + + /* if reading fails, there's not much we can do */ + if (PQconsumeInput(pgConn) == 0) + { + failed = true; + break; + } + + /* check if all the necessary data is now available */ + if (!PQisBusy(pgConn)) + { + result = PQgetResult(connection->pgConn); + break; + } + + rc = WaitLatchOrSocket(MyLatch, waitFlags | WL_SOCKET_READABLE, socket, 0); + + if (rc & WL_POSTMASTER_DEATH) + { + ereport(ERROR, (errmsg("postmaster was shut down, exiting"))); + } + + if (rc & WL_LATCH_SET) + { + /* if allowed raise errors */ + if (raiseInterrupts) + { + CHECK_FOR_INTERRUPTS(); + } + + /* + * If a cancel or die is pending but errors cannot be raised because + * we were called within a section with interrupts held, return NULL + * instead, and mark the transaction as failed. 
+ */ + if (InterruptHoldoffCount > 0 && (QueryCancelPending || ProcDiePending)) + { + connection->remoteTransaction.transactionFailed = true; + failed = true; + break; + } + } + } + + if (!wasNonblocking) + { + PQsetnonblocking(pgConn, false); + } + + return result; } diff --git a/src/include/distributed/remote_commands.h b/src/include/distributed/remote_commands.h index 6ce25ccfd..66430ea64 100644 --- a/src/include/distributed/remote_commands.h +++ b/src/include/distributed/remote_commands.h @@ -14,6 +14,8 @@ #include "distributed/connection_management.h" +struct pg_result; /* target of the PGresult typedef */ + /* GUC, determining whether statements sent to remote nodes are logged */ extern bool LogRemoteCommands; @@ -31,6 +33,8 @@ extern void LogRemoteCommand(MultiConnection *connection, const char *command); /* wrappers around libpq functions, with command logging support */ extern int SendRemoteCommand(MultiConnection *connection, const char *command); +extern struct pg_result * GetRemoteCommandResult(MultiConnection *connection, + bool raiseInterrupts); #endif /* REMOTE_COMMAND_H */ From fc298ec095eafe6501cc17c3fd7a7186b1ea6bad Mon Sep 17 00:00:00 2001 From: Andres Freund Date: Fri, 7 Oct 2016 15:26:26 -0700 Subject: [PATCH 2/4] Coordinated remote transaction management. 
--- .../connection/connection_management.c | 8 +- .../transaction/remote_transaction.c | 863 ++++++++++++++++++ .../transaction/transaction_management.c | 112 +++ .../distributed/connection_management.h | 7 + src/include/distributed/remote_transaction.h | 109 +++ .../distributed/transaction_management.h | 24 +- 6 files changed, 1120 insertions(+), 3 deletions(-) create mode 100644 src/backend/distributed/transaction/remote_transaction.c create mode 100644 src/include/distributed/remote_transaction.h diff --git a/src/backend/distributed/connection/connection_management.c b/src/backend/distributed/connection/connection_management.c index b023fcbec..6e9714101 100644 --- a/src/backend/distributed/connection/connection_management.c +++ b/src/backend/distributed/connection/connection_management.c @@ -329,9 +329,12 @@ CloseConnection(MultiConnection *connection) if (found) { - /* unlink from list */ + /* unlink from list of open connections */ dlist_delete(&connection->connectionNode); + /* same for transaction state */ + CloseRemoteTransaction(connection); + /* we leave the per-host entry alive */ pfree(connection); } @@ -632,6 +635,9 @@ AfterXactHostConnectionHandling(ConnectionHashEntry *entry, bool isCommit) } else { + /* reset per-transaction state */ + ResetRemoteTransaction(connection); + UnclaimConnection(connection); } } diff --git a/src/backend/distributed/transaction/remote_transaction.c b/src/backend/distributed/transaction/remote_transaction.c new file mode 100644 index 000000000..e9f7d9af6 --- /dev/null +++ b/src/backend/distributed/transaction/remote_transaction.c @@ -0,0 +1,863 @@ +/*------------------------------------------------------------------------- + * + * remote_transaction.c + * Management of transaction spanning more than one node. + * + * Copyright (c) 2016, Citus Data, Inc. 
+ * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "libpq-fe.h" + +#include "miscadmin.h" + +#include "access/xact.h" +#include "distributed/connection_management.h" +#include "distributed/transaction_management.h" +#include "distributed/remote_commands.h" +#include "distributed/remote_transaction.h" +#include "utils/hsearch.h" + + +static void CheckTransactionHealth(void); +static void Assign2PCIdentifier(MultiConnection *connection); +static void WarnAboutLeakedPreparedTransaction(MultiConnection *connection, bool commit); + + +/* + * StartRemoteTransactionBegin initiates beginning the remote transaction in + * a non-blocking manner. + */ +void +StartRemoteTransactionBegin(struct MultiConnection *connection) +{ + RemoteTransaction *transaction = &connection->remoteTransaction; + + Assert(transaction->transactionState == REMOTE_TRANS_INVALID); + + /* remember transaction as being in-progress */ + dlist_push_tail(&InProgressTransactions, &connection->transactionNode); + + transaction->transactionState = REMOTE_TRANS_STARTING; + + /* + * Explicitly specify READ COMMITTED, the default on the remote + * side might have been changed, and that would cause problematic + * behaviour. + */ + if (!SendRemoteCommand(connection, + "BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED")) + { + ReportConnectionError(connection, WARNING); + MarkRemoteTransactionFailed(connection, true); + } +} + + +/* + * FinishRemoteTransactionBegin finishes the work StartRemoteTransactionBegin + * initiated. It blocks if necessary (i.e. if PQisBusy() would return true). 
+ */ +void +FinishRemoteTransactionBegin(struct MultiConnection *connection) +{ + RemoteTransaction *transaction = &connection->remoteTransaction; + PGresult *result = NULL; + const bool raiseErrors = true; + + Assert(transaction->transactionState == REMOTE_TRANS_STARTING); + + result = GetRemoteCommandResult(connection, raiseErrors); + if (!IsResponseOK(result)) + { + ReportResultError(connection, result, WARNING); + MarkRemoteTransactionFailed(connection, raiseErrors); + } + else + { + transaction->transactionState = REMOTE_TRANS_STARTED; + } + + PQclear(result); + ForgetResults(connection); + + if (!transaction->transactionFailed) + { + Assert(PQtransactionStatus(connection->pgConn) == PQTRANS_INTRANS); + } +} + + +/* + * RemoteTransactionBegin begins a remote transaction in a blocking manner. + */ +void +RemoteTransactionBegin(struct MultiConnection *connection) +{ + StartRemoteTransactionBegin(connection); + FinishRemoteTransactionBegin(connection); +} + + +/* + * StartRemoteTransactionCommit initiates transaction commit in a non-blocking + * manner. If the transaction is in a failed state, it'll instead get rolled + * back. + */ +void +StartRemoteTransactionCommit(MultiConnection *connection) +{ + RemoteTransaction *transaction = &connection->remoteTransaction; + const bool dontRaiseError = false; + const bool isCommit = true; + + /* can only commit if transaction is in progress */ + Assert(transaction->transactionState != REMOTE_TRANS_INVALID); + + /* can't commit if we already started to commit or abort */ + Assert(transaction->transactionState < REMOTE_TRANS_1PC_ABORTING); + + if (transaction->transactionFailed) + { + /* abort the transaction if it failed */ + transaction->transactionState = REMOTE_TRANS_1PC_ABORTING; + + /* + * Try sending an ROLLBACK; Depending on the state that won't + * succeed, but let's try. Have to clear previous results + * first. 
+ */ + ForgetResults(connection); /* try to clear pending stuff */ + if (!SendRemoteCommand(connection, "ROLLBACK")) + { + /* no point in reporting a likely redundant message */ + } + } + else if (transaction->transactionState == REMOTE_TRANS_PREPARED) + { + /* commit the prepared transaction */ + StringInfoData command; + + initStringInfo(&command); + appendStringInfo(&command, "COMMIT PREPARED '%s'", + transaction->preparedName); + + transaction->transactionState = REMOTE_TRANS_2PC_COMMITTING; + + if (!SendRemoteCommand(connection, command.data)) + { + ReportConnectionError(connection, WARNING); + MarkRemoteTransactionFailed(connection, dontRaiseError); + + WarnAboutLeakedPreparedTransaction(connection, isCommit); + } + } + else + { + /* initiate remote transaction commit */ + transaction->transactionState = REMOTE_TRANS_1PC_COMMITTING; + + if (!SendRemoteCommand(connection, "COMMIT")) + { + /* + * For a moment there I thought we were in trouble. + * + * Failing in this state means that we don't know whether the + * commit has succeeded. + */ + ReportConnectionError(connection, WARNING); + MarkRemoteTransactionFailed(connection, dontRaiseError); + } + } +} + + +/* + * FinishRemoteTransactionCommit finishes the work + * StartRemoteTransactionCommit initiated. It blocks if necessary (i.e. if + * PQisBusy() would return true). 
+ */ +void +FinishRemoteTransactionCommit(MultiConnection *connection) +{ + RemoteTransaction *transaction = &connection->remoteTransaction; + PGresult *result = NULL; + const bool dontRaiseErrors = false; + const bool isCommit = true; + + Assert(transaction->transactionState == REMOTE_TRANS_1PC_ABORTING || + transaction->transactionState == REMOTE_TRANS_1PC_COMMITTING || + transaction->transactionState == REMOTE_TRANS_2PC_COMMITTING); + + result = GetRemoteCommandResult(connection, dontRaiseErrors); + + if (!IsResponseOK(result)) + { + ReportResultError(connection, result, WARNING); + MarkRemoteTransactionFailed(connection, dontRaiseErrors); + + /* + * Failing in this state means that we will often not know whether + * the commit has succeeded (particularly in case of network + * troubles). + * + * XXX: It might be worthwhile to discern cases where we got a + * proper error back from postgres (i.e. COMMIT was received but + * produced an error) from cases where the connection failed + * before getting a reply. 
+ */ + + if (transaction->transactionState == REMOTE_TRANS_1PC_COMMITTING) + { + if (transaction->transactionCritical) + { + ereport(WARNING, (errmsg("failed to commit critical transaction " + "on %s:%d, metadata is likely out of sync", + connection->hostname, connection->port))); + } + else + { + ereport(WARNING, (errmsg("failed to commit transaction on %s:%d", + connection->hostname, connection->port))); + } + } + else if (transaction->transactionState == REMOTE_TRANS_2PC_COMMITTING) + { + ereport(WARNING, (errmsg("failed to commit transaction on %s:%d", + connection->hostname, connection->port))); + WarnAboutLeakedPreparedTransaction(connection, isCommit); + } + } + else if (transaction->transactionState == REMOTE_TRANS_1PC_ABORTING || + transaction->transactionState == REMOTE_TRANS_2PC_ABORTING) + { + transaction->transactionState = REMOTE_TRANS_ABORTED; + } + else + { + transaction->transactionState = REMOTE_TRANS_COMMITTED; + } + + PQclear(result); + + ForgetResults(connection); +} + + +/* + * RemoteTransactionCommit commits (or aborts, if the transaction failed) a + * remote transaction in a blocking manner. + */ +void +RemoteTransactionCommit(MultiConnection *connection) +{ + StartRemoteTransactionCommit(connection); + FinishRemoteTransactionCommit(connection); +} + + +/* + * StartRemoteTransactionAbort initiates aborting the transaction in a + * non-blocking manner. 
+ */ +void +StartRemoteTransactionAbort(MultiConnection *connection) +{ + RemoteTransaction *transaction = &connection->remoteTransaction; + const bool dontRaiseErrors = false; + const bool isNotCommit = false; + + Assert(transaction->transactionState != REMOTE_TRANS_INVALID); + + /* + * Clear previous results, so we have a better chance to send + * ROLLBACK [PREPARED]; + */ + ForgetResults(connection); + + if (transaction->transactionState == REMOTE_TRANS_PREPARING || + transaction->transactionState == REMOTE_TRANS_PREPARED) + { + StringInfoData command; + + initStringInfo(&command); + appendStringInfo(&command, "ROLLBACK PREPARED '%s'", + transaction->preparedName); + + if (!SendRemoteCommand(connection, command.data)) + { + ReportConnectionError(connection, WARNING); + MarkRemoteTransactionFailed(connection, dontRaiseErrors); + + WarnAboutLeakedPreparedTransaction(connection, isNotCommit); + } + else + { + transaction->transactionState = REMOTE_TRANS_2PC_ABORTING; + } + } + else + { + if (!SendRemoteCommand(connection, "ROLLBACK")) + { + /* no point in reporting a likely redundant message */ + MarkRemoteTransactionFailed(connection, dontRaiseErrors); + } + else + { + transaction->transactionState = REMOTE_TRANS_1PC_ABORTING; + } + } +} + + +/* + * FinishRemoteTransactionAbort finishes the work StartRemoteTransactionAbort + * initiated. It blocks if necessary (i.e. if PQisBusy() would return true). 
+ */ +void +FinishRemoteTransactionAbort(MultiConnection *connection) +{ + RemoteTransaction *transaction = &connection->remoteTransaction; + PGresult *result = NULL; + const bool dontRaiseErrors = false; + const bool isNotCommit = false; + + result = GetRemoteCommandResult(connection, dontRaiseErrors); + + if (!IsResponseOK(result)) + { + ReportResultError(connection, result, WARNING); + MarkRemoteTransactionFailed(connection, dontRaiseErrors); + + if (transaction->transactionState == REMOTE_TRANS_1PC_ABORTING) + { + ereport(WARNING, + (errmsg("failed to abort 2PC transaction \"%s\" on %s:%d", + transaction->preparedName, connection->hostname, + connection->port))); + } + else + { + WarnAboutLeakedPreparedTransaction(connection, isNotCommit); + } + } + + PQclear(result); + + result = GetRemoteCommandResult(connection, dontRaiseErrors); + Assert(!result); + + transaction->transactionState = REMOTE_TRANS_ABORTED; +} + + +/* + * RemoteTransactionAbort aborts a remote transaction in a blocking manner. + */ +void +RemoteTransactionAbort(MultiConnection *connection) +{ + StartRemoteTransactionAbort(connection); + FinishRemoteTransactionAbort(connection); +} + + +/* + * StartRemoteTransactionPrepare initiates preparing the transaction in a + * non-blocking manner. 
+ */ +void +StartRemoteTransactionPrepare(struct MultiConnection *connection) +{ + RemoteTransaction *transaction = &connection->remoteTransaction; + StringInfoData command; + const bool raiseErrors = true; + + /* can't prepare a nonexistent transaction */ + Assert(transaction->transactionState != REMOTE_TRANS_INVALID); + + /* can't prepare in a failed transaction */ + Assert(!transaction->transactionFailed); + + /* can't prepare if already started to prepare/abort/commit */ + Assert(transaction->transactionState < REMOTE_TRANS_PREPARING); + + Assign2PCIdentifier(connection); + + initStringInfo(&command); + appendStringInfo(&command, "PREPARE TRANSACTION '%s'", + transaction->preparedName); + + if (!SendRemoteCommand(connection, command.data)) + { + ReportConnectionError(connection, WARNING); + MarkRemoteTransactionFailed(connection, raiseErrors); + } + else + { + transaction->transactionState = REMOTE_TRANS_PREPARING; + } +} + + +/* + * FinishRemoteTransactionPrepare finishes the work + * StartRemoteTransactionPrepare initiated. It blocks if necessary (i.e. if + * PQisBusy() would return true). + */ +void +FinishRemoteTransactionPrepare(struct MultiConnection *connection) +{ + RemoteTransaction *transaction = &connection->remoteTransaction; + PGresult *result = NULL; + const bool raiseErrors = true; + + Assert(transaction->transactionState == REMOTE_TRANS_PREPARING); + + result = GetRemoteCommandResult(connection, raiseErrors); + + if (!IsResponseOK(result)) + { + ReportResultError(connection, result, WARNING); + transaction->transactionState = REMOTE_TRANS_ABORTED; + MarkRemoteTransactionFailed(connection, raiseErrors); + } + else + { + transaction->transactionState = REMOTE_TRANS_PREPARED; + } + + result = GetRemoteCommandResult(connection, raiseErrors); + Assert(!result); +} + + +/* + * RemoteTransactionPrepare prepares a remote transaction in a blocking + * manner. 
+ */ +void +RemoteTransactionPrepare(struct MultiConnection *connection) +{ + StartRemoteTransactionPrepare(connection); + FinishRemoteTransactionPrepare(connection); +} + + +/* + * RemoteTransactionBeginIfNecessary is a convenience wrapper around + * RemoteTransactionsBeginIfNecessary(), for a single connection. + */ +void +RemoteTransactionBeginIfNecessary(MultiConnection *connection) +{ + /* just delegate */ + if (InCoordinatedTransaction()) + { + List *connectionList = list_make1(connection); + + RemoteTransactionsBeginIfNecessary(connectionList); + list_free(connectionList); + } +} + + +/* + * RemoteTransactionsBeginIfNecessary begins, if necessary according to this + * session's coordinated transaction state, and the remote transaction's + * state, an explicit transaction on all the connections. This is done in + * parallel, to lessen latency penalties. + */ +void +RemoteTransactionsBeginIfNecessary(List *connectionList) +{ + ListCell *connectionCell = NULL; + + /* + * Don't do anything if not in a coordinated transaction. That allows the + * same code to work both in situations that use transactions, and when + * not. + */ + if (!InCoordinatedTransaction()) + { + return; + } + + /* issue BEGIN to all connections needing it */ + foreach(connectionCell, connectionList) + { + MultiConnection *connection = (MultiConnection *) lfirst(connectionCell); + RemoteTransaction *transaction = &connection->remoteTransaction; + + /* can't send BEGIN if a command already is in progress */ + Assert(PQtransactionStatus(connection->pgConn) != PQTRANS_ACTIVE); + + /* + * If a transaction already is in progress (including having failed), + * don't start it again. That's quite normal if a piece of code allows + * cached connections. 
+ */ + if (transaction->transactionState != REMOTE_TRANS_INVALID) + { + continue; + } + + StartRemoteTransactionBegin(connection); + } + + /* XXX: Should perform network IO for all connections in a non-blocking manner */ + + /* get result of all the BEGINs */ + foreach(connectionCell, connectionList) + { + MultiConnection *connection = (MultiConnection *) lfirst(connectionCell); + RemoteTransaction *transaction = &connection->remoteTransaction; + + /* + * Only handle BEGIN results on connections that are in process of + * starting a transaction, and haven't already failed (e.g. by not + * being able to send BEGIN due to a network failure). + */ + if (transaction->transactionFailed || + transaction->transactionState != REMOTE_TRANS_STARTING) + { + continue; + } + + FinishRemoteTransactionBegin(connection); + } +} + + +/* + * MarkRemoteTransactionFailed records a transaction as having failed. + * + * If the connection is marked as critical, and allowErrorPromotion is true, + * this routine will ERROR out. The allowErrorPromotion case is primarily + * required for the transaction management code itself. Usually it is helpful + * to fail as soon as possible. If !allowErrorPromotion transaction commit + * will instead issue an error before committing on any node. + */ +void +MarkRemoteTransactionFailed(MultiConnection *connection, bool allowErrorPromotion) +{ + RemoteTransaction *transaction = &connection->remoteTransaction; + + transaction->transactionFailed = true; + + /* + * If the connection is marked as critical, fail the entire coordinated + * transaction. If allowed. + */ + if (transaction->transactionCritical && allowErrorPromotion) + { + ereport(ERROR, (errmsg("failure on connection marked as essential: %s:%d", + connection->hostname, connection->port))); + } +} + + +/* + * MarkRemoteTransactionCritical signals that failures on this remote + * transaction should fail the entire coordinated transaction. 
+ */ +void +MarkRemoteTransactionCritical(struct MultiConnection *connection) +{ + RemoteTransaction *transaction = &connection->remoteTransaction; + + transaction->transactionCritical = true; +} + + +/* + * CloseRemoteTransaction handles closing a connection that, potentially, is + * part of a coordinated transaction. This should only ever be called from + * connection_management.c, while closing a connection during a transaction. + */ +void +CloseRemoteTransaction(struct MultiConnection *connection) +{ + RemoteTransaction *transaction = &connection->remoteTransaction; + + /* unlink from list of open transactions, if necessary */ + if (transaction->transactionState != REMOTE_TRANS_INVALID) + { + /* XXX: Should we error out for a critical transaction? */ + + dlist_delete(&connection->transactionNode); + } +} + + +/* + * ResetRemoteTransaction resets the state of the transaction after the end of + * the main transaction, if the connection is being reused. + */ +void +ResetRemoteTransaction(struct MultiConnection *connection) +{ + RemoteTransaction *transaction = &connection->remoteTransaction; + + /* just reset the entire state, relying on 0 being invalid/false */ + memset(transaction, 0, sizeof(*transaction)); +} + + +/* + * CoordinatedRemoteTransactionsPrepare PREPAREs a 2PC transaction on all + * non-failed transactions participating in the coordinated transaction. 
+ */ +void +CoordinatedRemoteTransactionsPrepare(void) +{ + dlist_iter iter; + + /* issue PREPARE TRANSACTION; to all relevant remote nodes */ + + /* asynchronously send PREPARE */ + dlist_foreach(iter, &InProgressTransactions) + { + MultiConnection *connection = dlist_container(MultiConnection, transactionNode, + iter.cur); + RemoteTransaction *transaction = &connection->remoteTransaction; + + Assert(transaction->transactionState != REMOTE_TRANS_INVALID); + + /* can't PREPARE a transaction that failed */ + if (transaction->transactionFailed) + { + continue; + } + + StartRemoteTransactionPrepare(connection); + } + + /* XXX: Should perform network IO for all connections in a non-blocking manner */ + + /* Wait for result */ + dlist_foreach(iter, &InProgressTransactions) + { + MultiConnection *connection = dlist_container(MultiConnection, transactionNode, + iter.cur); + RemoteTransaction *transaction = &connection->remoteTransaction; + + if (transaction->transactionState != REMOTE_TRANS_PREPARING) + { + continue; + } + + FinishRemoteTransactionPrepare(connection); + } + + CurrentCoordinatedTransactionState = COORD_TRANS_PREPARED; +} + + +/* + * CoordinatedRemoteTransactionsCommit performs distributed transactions + * handling at commit time. This will be called at XACT_EVENT_PRE_COMMIT if + * 1PC commits are used - so shards can still be invalidated - and at + * XACT_EVENT_COMMIT if 2PC is being used. + * + * Note that this routine has to issue rollbacks for failed transactions. + */ +void +CoordinatedRemoteTransactionsCommit(void) +{ + dlist_iter iter; + + /* + * Before starting to commit on any of the nodes - after which we can't + * completely roll-back anymore - check that things are in a good state. + */ + CheckTransactionHealth(); + + /* + * Issue appropriate transaction commands to remote nodes. If everything + * went well that's going to be COMMIT or COMMIT PREPARED, if individual + * connections had errors, some or all of them might require a ROLLBACK. 
+ * + * First send the command asynchronously over all connections. + */ + dlist_foreach(iter, &InProgressTransactions) + { + MultiConnection *connection = dlist_container(MultiConnection, transactionNode, + iter.cur); + RemoteTransaction *transaction = &connection->remoteTransaction; + + if (transaction->transactionState == REMOTE_TRANS_INVALID || + transaction->transactionState == REMOTE_TRANS_1PC_COMMITTING || + transaction->transactionState == REMOTE_TRANS_2PC_COMMITTING || + transaction->transactionState == REMOTE_TRANS_COMMITTED) + { + continue; + } + + StartRemoteTransactionCommit(connection); + } + + /* XXX: Should perform network IO for all connections in a non-blocking manner */ + + /* wait for the replies to the commands to come in */ + dlist_foreach(iter, &InProgressTransactions) + { + MultiConnection *connection = dlist_container(MultiConnection, transactionNode, + iter.cur); + RemoteTransaction *transaction = &connection->remoteTransaction; + + /* nothing to do if not committing / aborting */ + if (transaction->transactionState != REMOTE_TRANS_1PC_COMMITTING && + transaction->transactionState != REMOTE_TRANS_2PC_COMMITTING && + transaction->transactionState != REMOTE_TRANS_1PC_ABORTING && + transaction->transactionState != REMOTE_TRANS_2PC_ABORTING) + { + continue; + } + + FinishRemoteTransactionCommit(connection); + } +} + + +/* + * CoordinatedRemoteTransactionsAbort performs distributed transactions + * handling at abort time. + * + * This issues ROLLBACKS and ROLLBACK PREPARED depending on whether the remote + * transaction has been prepared or not. 
+ */ +void +CoordinatedRemoteTransactionsAbort(void) +{ + dlist_iter iter; + + /* asynchronously send ROLLBACK [PREPARED] */ + dlist_foreach(iter, &InProgressTransactions) + { + MultiConnection *connection = dlist_container(MultiConnection, transactionNode, + iter.cur); + RemoteTransaction *transaction = &connection->remoteTransaction; + + if (transaction->transactionState == REMOTE_TRANS_INVALID || + transaction->transactionState == REMOTE_TRANS_1PC_ABORTING || + transaction->transactionState == REMOTE_TRANS_2PC_ABORTING || + transaction->transactionState == REMOTE_TRANS_ABORTED) + { + continue; + } + + StartRemoteTransactionAbort(connection); + } + + /* XXX: Should perform network IO for all connections in a non-blocking manner */ + + /* and wait for the results */ + dlist_foreach(iter, &InProgressTransactions) + { + MultiConnection *connection = dlist_container(MultiConnection, transactionNode, + iter.cur); + RemoteTransaction *transaction = &connection->remoteTransaction; + + if (transaction->transactionState != REMOTE_TRANS_1PC_ABORTING && + transaction->transactionState != REMOTE_TRANS_2PC_ABORTING) + { + continue; + } + + FinishRemoteTransactionAbort(connection); + } +} + + +/* + * CheckTransactionHealth checks if any of the participating transactions in a + * coordinated transaction failed, and what consequence that should have. + * This needs to be called before the coordinated transaction commits (but + * after they've been PREPAREd if 2PC is in use). 
+ */ +static void +CheckTransactionHealth(void) +{ + dlist_iter iter; + + dlist_foreach(iter, &InProgressTransactions) + { + MultiConnection *connection = dlist_container(MultiConnection, transactionNode, + iter.cur); + RemoteTransaction *transaction = &connection->remoteTransaction; + PGTransactionStatusType status = PQtransactionStatus(connection->pgConn); + + /* if the connection is in a bad state, so is the transaction's state */ + if (status == PQTRANS_INERROR || status == PQTRANS_UNKNOWN) + { + transaction->transactionFailed = true; + } + + /* + * If a critical connection is marked as failed (and no error has been + * raised yet) do so now. + */ + if (transaction->transactionFailed && transaction->transactionCritical) + { + ereport(ERROR, (errmsg("failure on connection marked as essential: %s:%d", + connection->hostname, connection->port))); + } + } +} + + +/* + * Assign2PCIdentifier computes the 2PC transaction name to use for a + * transaction. + * + * Every 2PC transaction should get a new name, i.e. this function will need + * to be called again. + * + * NB: we rely on the fact that we don't need to do full escaping on the names + * generated here. + */ +static void +Assign2PCIdentifier(MultiConnection *connection) +{ + static uint64 sequence = 0; + snprintf(connection->remoteTransaction.preparedName, NAMEDATALEN, + "citus_%d_"UINT64_FORMAT, + MyProcPid, sequence++); +} + + +/* + * WarnAboutLeakedPreparedTransaction issues a WARNING explaining that a + * prepared transaction could not be committed or rolled back, and explains + * how to perform cleanup. 
+ */ +static void +WarnAboutLeakedPreparedTransaction(MultiConnection *connection, bool commit) +{ + StringInfoData command; + RemoteTransaction *transaction = &connection->remoteTransaction; + + initStringInfo(&command); + + if (commit) + { + appendStringInfo(&command, "COMMIT PREPARED '%s'", + transaction->preparedName); + } + else + { + appendStringInfo(&command, "ROLLBACK PREPARED '%s'", + transaction->preparedName); + } + + /* warn, naming the failed operation, so the user can finish the transaction manually */ + ereport(WARNING, (errmsg(commit ? "failed to commit prepared transaction '%s'" : + "failed to roll back prepared transaction '%s'", transaction->preparedName), + errhint("Run \"%s\" on %s:%u", + command.data, connection->hostname, connection->port))); +} diff --git a/src/backend/distributed/transaction/transaction_management.c b/src/backend/distributed/transaction/transaction_management.c index 20d6227f4..a14d4b98f 100644 --- a/src/backend/distributed/transaction/transaction_management.c +++ b/src/backend/distributed/transaction/transaction_management.c @@ -34,9 +34,18 @@ int MultiShardCommitProtocol = COMMIT_PROTOCOL_1PC; /* state needed to keep track of operations used during a transaction */ XactModificationType XactModificationLevel = XACT_MODIFICATION_NONE; +/* list of connections that are part of the current coordinated transaction */ +dlist_head InProgressTransactions = DLIST_STATIC_INIT(InProgressTransactions); + static bool subXactAbortAttempted = false; +/* + * Should this coordinated transaction use 2PC? Set by + * CoordinatedTransactionUse2PC(), e.g. if DDL was issued and + * MultiShardCommitProtocol was set to 2PC. + */ +static bool CurrentTransactionUse2PC = false; /* transaction management functions */ static void CoordinatedTransactionCallback(XactEvent event, void *arg); @@ -47,6 +56,64 @@ static void CoordinatedSubTransactionCallback(SubXactEvent event, SubTransaction static void AdjustMaxPreparedTransactions(void); +/* + * BeginCoordinatedTransaction begins a coordinated transaction.
No + * pre-existing coordinated transaction may be in progress. + */ +void +BeginCoordinatedTransaction(void) +{ + if (CurrentCoordinatedTransactionState != COORD_TRANS_NONE && + CurrentCoordinatedTransactionState != COORD_TRANS_IDLE) + { + ereport(ERROR, (errmsg("starting transaction in wrong state"))); + } + + CurrentCoordinatedTransactionState = COORD_TRANS_STARTED; +} + + +/* + * BeginOrContinueCoordinatedTransaction starts a coordinated transaction, + * unless one already is in progress. + */ +void +BeginOrContinueCoordinatedTransaction(void) +{ + if (CurrentCoordinatedTransactionState == COORD_TRANS_STARTED) + { + return; + } + + BeginCoordinatedTransaction(); +} + + +/* + * InCoordinatedTransaction returns whether a coordinated transaction has been + * started. + */ +bool +InCoordinatedTransaction(void) +{ + return CurrentCoordinatedTransactionState != COORD_TRANS_NONE && + CurrentCoordinatedTransactionState != COORD_TRANS_IDLE; +} + + +/* + * CoordinatedTransactionUse2PC() signals that the current coordinated + * transaction should use 2PC to commit. + */ +void +CoordinatedTransactionUse2PC(void) +{ + Assert(InCoordinatedTransaction()); + + CurrentTransactionUse2PC = true; +} + + void InitializeTransactionManagement(void) { @@ -73,6 +140,12 @@ CoordinatedTransactionCallback(XactEvent event, void *arg) { case XACT_EVENT_COMMIT: { + if (CurrentCoordinatedTransactionState == COORD_TRANS_PREPARED) + { + /* handles both already prepared and open transactions */ + CoordinatedRemoteTransactionsCommit(); + } + /* close connections etc. 
*/ if (CurrentCoordinatedTransactionState != COORD_TRANS_NONE) { @@ -82,11 +155,25 @@ CoordinatedTransactionCallback(XactEvent event, void *arg) Assert(!subXactAbortAttempted); CurrentCoordinatedTransactionState = COORD_TRANS_NONE; XactModificationLevel = XACT_MODIFICATION_NONE; + dlist_init(&InProgressTransactions); + CurrentTransactionUse2PC = false; } break; case XACT_EVENT_ABORT: { + /* + * FIXME: Add warning for the COORD_TRANS_COMMITTED case. That + * can be reached if this backend fails after the + * XACT_EVENT_PRE_COMMIT state. + */ + + /* handles both already prepared and open transactions */ + if (CurrentCoordinatedTransactionState > COORD_TRANS_IDLE) + { + CoordinatedRemoteTransactionsAbort(); + } + /* close connections etc. */ if (CurrentCoordinatedTransactionState != COORD_TRANS_NONE) { @@ -95,6 +182,8 @@ CoordinatedTransactionCallback(XactEvent event, void *arg) CurrentCoordinatedTransactionState = COORD_TRANS_NONE; XactModificationLevel = XACT_MODIFICATION_NONE; + dlist_init(&InProgressTransactions); + CurrentTransactionUse2PC = false; subXactAbortAttempted = false; } break; @@ -118,6 +207,29 @@ CoordinatedTransactionCallback(XactEvent event, void *arg) "which modify distributed tables"))); } } + + /* nothing further to do if there's no managed remote xacts */ + if (CurrentCoordinatedTransactionState == COORD_TRANS_NONE) + { + break; + } + + + if (CurrentTransactionUse2PC) + { + CoordinatedRemoteTransactionsPrepare(); + CurrentCoordinatedTransactionState = COORD_TRANS_PREPARED; + } + else + { + /* + * Have to commit remote transactions in PRE_COMMIT, to allow + * us to mark failed placements as invalid. Better don't use + * this for anything important (i.e. DDL/metadata). 
+ */ + CoordinatedRemoteTransactionsCommit(); + CurrentCoordinatedTransactionState = COORD_TRANS_COMMITTED; + } } break; diff --git a/src/include/distributed/connection_management.h b/src/include/distributed/connection_management.h index be88c79a3..ddc0b26e1 100644 --- a/src/include/distributed/connection_management.h +++ b/src/include/distributed/connection_management.h @@ -12,6 +12,7 @@ #define CONNECTION_MANAGMENT_H #include "distributed/transaction_management.h" +#include "distributed/remote_transaction.h" #include "lib/ilist.h" #include "utils/hsearch.h" #include "utils/timestamp.h" @@ -63,6 +64,12 @@ typedef struct MultiConnection /* membership in list of list of connections in ConnectionHashEntry */ dlist_node connectionNode; + + /* information about the associated remote transaction */ + RemoteTransaction remoteTransaction; + + /* membership in list of in-progress transactions */ + dlist_node transactionNode; } MultiConnection; diff --git a/src/include/distributed/remote_transaction.h b/src/include/distributed/remote_transaction.h new file mode 100644 index 000000000..e0ffb5dcd --- /dev/null +++ b/src/include/distributed/remote_transaction.h @@ -0,0 +1,109 @@ +/*------------------------------------------------------------------------- + * remote_transaction.h + * + * Copyright (c) 2016, Citus Data, Inc. + * + *------------------------------------------------------------------------- + */ + + +#ifndef REMOTE_TRANSACTION_H +#define REMOTE_TRANSACTION_H + + +#include "nodes/pg_list.h" +#include "lib/ilist.h" + + +/* forward declare, to avoid recursive includes */ +struct MultiConnection; + +/* + * Enum that defines different remote transaction states, of a single remote + * transaction. 
+ */ +typedef enum +{ + /* no transaction active */ + REMOTE_TRANS_INVALID = 0, + + /* transaction start */ + REMOTE_TRANS_STARTING, + REMOTE_TRANS_STARTED, + + /* 2pc prepare */ + REMOTE_TRANS_PREPARING, + REMOTE_TRANS_PREPARED, + + /* transaction abort */ + REMOTE_TRANS_1PC_ABORTING, + REMOTE_TRANS_2PC_ABORTING, + REMOTE_TRANS_ABORTED, + + /* transaction commit */ + REMOTE_TRANS_1PC_COMMITTING, + REMOTE_TRANS_2PC_COMMITTING, + REMOTE_TRANS_COMMITTED +} RemoteTransactionState; + + +/* + * Transaction state associated with a single MultiConnection. + */ +typedef struct RemoteTransaction +{ + /* what state is the remote side transaction in */ + RemoteTransactionState transactionState; + + /* failures on this connection should abort entire coordinated transaction */ + bool transactionCritical; + + /* failed in current transaction */ + bool transactionFailed; + + /* 2PC transaction name currently associated with connection */ + char preparedName[NAMEDATALEN]; +} RemoteTransaction; + + +/* change an individual remote transaction's state */ +extern void StartRemoteTransactionBegin(struct MultiConnection *connection); +extern void FinishRemoteTransactionBegin(struct MultiConnection *connection); +extern void RemoteTransactionBegin(struct MultiConnection *connection); + +extern void StartRemoteTransactionPrepare(struct MultiConnection *connection); +extern void FinishRemoteTransactionPrepare(struct MultiConnection *connection); +extern void RemoteTransactionPrepare(struct MultiConnection *connection); + +extern void StartRemoteTransactionCommit(struct MultiConnection *connection); +extern void FinishRemoteTransactionCommit(struct MultiConnection *connection); +extern void RemoteTransactionCommit(struct MultiConnection *connection); + +extern void StartRemoteTransactionAbort(struct MultiConnection *connection); +extern void FinishRemoteTransactionAbort(struct MultiConnection *connection); +extern void RemoteTransactionAbort(struct MultiConnection *connection); +
+/* start transaction if necessary */ +extern void RemoteTransactionBeginIfNecessary(struct MultiConnection *connection); +extern void RemoteTransactionsBeginIfNecessary(List *connectionList); + +/* other public functionality */ +extern void MarkRemoteTransactionFailed(struct MultiConnection *connection, + bool allowErrorPromotion); +extern void MarkRemoteTransactionCritical(struct MultiConnection *connection); + + +/* + * The following functions should all only be called by connection / + * transaction management code. + */ + +extern void CloseRemoteTransaction(struct MultiConnection *connection); +extern void ResetRemoteTransaction(struct MultiConnection *connection); + +/* perform handling for all in-progress transactions */ +extern void CoordinatedRemoteTransactionsPrepare(void); +extern void CoordinatedRemoteTransactionsCommit(void); +extern void CoordinatedRemoteTransactionsAbort(void); + +#endif /* REMOTE_TRANSACTION_H */ diff --git a/src/include/distributed/transaction_management.h b/src/include/distributed/transaction_management.h index 91a5cd181..b2eda9af6 100644 --- a/src/include/distributed/transaction_management.h +++ b/src/include/distributed/transaction_management.h @@ -9,6 +9,8 @@ #ifndef TRANSACTION_MANAGMENT_H #define TRANSACTION_MANAGMENT_H +#include "lib/ilist.h" + /* describes what kind of modifications have occurred in the current transaction */ typedef enum { @@ -29,7 +31,16 @@ typedef enum CoordinatedTransactionState COORD_TRANS_NONE, /* no coordinated transaction in progress, but connections established */ - COORD_TRANS_IDLE + COORD_TRANS_IDLE, + + /* coordinated transaction in progress */ + COORD_TRANS_STARTED, + + /* coordinated transaction prepared on all workers */ + COORD_TRANS_PREPARED, + + /* coordinated transaction committed */ + COORD_TRANS_COMMITTED } CoordinatedTransactionState; @@ -46,9 +57,18 @@ extern int MultiShardCommitProtocol; /* state needed to prevent new connections during modifying transactions */ extern
XactModificationType XactModificationLevel; - extern CoordinatedTransactionState CurrentCoordinatedTransactionState; +/* list of connections that are part of the current coordinated transaction */ +extern dlist_head InProgressTransactions; + +/* + * Coordinated transaction management. + */ +extern void BeginCoordinatedTransaction(void); +extern void BeginOrContinueCoordinatedTransaction(void); +extern bool InCoordinatedTransaction(void); +extern void CoordinatedTransactionUse2PC(void); /* initialization function(s) */ extern void InitializeTransactionManagement(void); From fa5e202403867f5ac4d0733f1ba3765f9c213191 Mon Sep 17 00:00:00 2001 From: Andres Freund Date: Tue, 6 Dec 2016 20:03:47 -0800 Subject: [PATCH 3/4] Convert multi_shard_transaction.[ch] to new framework. --- src/backend/distributed/shared_library_init.c | 2 - .../transaction/multi_shard_transaction.c | 141 ++++---------- .../transaction/transaction_management.c | 15 ++ .../distributed/multi_shard_transaction.h | 2 +- .../regress/expected/multi_insert_select.out | 175 ------------------ .../expected/multi_join_order_additional.out | 2 - .../regress/expected/multi_shard_modify.out | 4 - .../multi_alter_table_statements.source | 7 +- 8 files changed, 54 insertions(+), 294 deletions(-) diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index 60c4f3656..3f092d3d3 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -31,7 +31,6 @@ #include "distributed/multi_router_executor.h" #include "distributed/multi_router_planner.h" #include "distributed/multi_server_executor.h" -#include "distributed/multi_shard_transaction.h" #include "distributed/multi_utility.h" #include "distributed/remote_commands.h" #include "distributed/task_tracker.h" @@ -163,7 +162,6 @@ _PG_init(void) /* initialize transaction callbacks */ RegisterRouterExecutorXactCallbacks(); - RegisterShardPlacementXactCallbacks(); /* 
enable modification of pg_catalog tables during pg_upgrade */ if (IsBinaryUpgrade) diff --git a/src/backend/distributed/transaction/multi_shard_transaction.c b/src/backend/distributed/transaction/multi_shard_transaction.c index 9f3d15c0f..f62507af8 100644 --- a/src/backend/distributed/transaction/multi_shard_transaction.c +++ b/src/backend/distributed/transaction/multi_shard_transaction.c @@ -30,14 +30,8 @@ #define INITIAL_CONNECTION_CACHE_SIZE 1001 -/* Global variables used in commit handler */ +/* per-transaction state */ static HTAB *shardConnectionHash = NULL; -static bool subXactAbortAttempted = false; - -/* functions needed by callbacks and hooks */ -static void CompleteShardPlacementTransactions(XactEvent event, void *arg); -static void MultiShardSubXactCallback(SubXactEvent event, SubTransactionId subId, - SubTransactionId parentSubid, void *arg); /* @@ -118,6 +112,12 @@ BeginTransactionOnShardPlacements(uint64 shardId, char *userName) UINT64_FORMAT, shardId))); } + BeginOrContinueCoordinatedTransaction(); + if (MultiShardCommitProtocol == COMMIT_PROTOCOL_2PC) + { + CoordinatedTransactionUse2PC(); + } + /* get existing connections to the shard placements, if any */ shardConnections = GetShardConnections(shardId, &shardConnectionsFound); if (shardConnectionsFound) @@ -129,11 +129,11 @@ BeginTransactionOnShardPlacements(uint64 shardId, char *userName) foreach(placementCell, shardPlacementList) { ShardPlacement *shardPlacement = (ShardPlacement *) lfirst(placementCell); - PGconn *connection = NULL; + MultiConnection *connection = NULL; TransactionConnection *transactionConnection = NULL; WorkerNode *workerNode = FindWorkerNode(shardPlacement->nodeName, shardPlacement->nodePort); - PGresult *result = NULL; + int connectionFlags = FORCE_NEW_CONNECTION; if (workerNode == NULL) { @@ -141,10 +141,13 @@ BeginTransactionOnShardPlacements(uint64 shardId, char *userName) shardPlacement->nodeName, shardPlacement->nodePort))); } - connection = 
ConnectToNode(shardPlacement->nodeName, shardPlacement->nodePort, - userName); - - if (connection == NULL) + /* XXX: It'd be nicer to establish connections asynchronously here */ + connection = GetNodeUserDatabaseConnection(connectionFlags, + shardPlacement->nodeName, + shardPlacement->nodePort, + userName, + NULL); + if (PQstatus(connection->pgConn) != CONNECTION_OK) { ereport(ERROR, (errmsg("could not establish a connection to all " "placements of shard %lu", shardId))); @@ -158,7 +161,7 @@ BeginTransactionOnShardPlacements(uint64 shardId, char *userName) transactionConnection->groupId = workerNode->groupId; transactionConnection->connectionId = shardConnections->shardId; transactionConnection->transactionState = TRANSACTION_STATE_OPEN; - transactionConnection->connection = connection; + transactionConnection->connection = connection->pgConn; transactionConnection->nodeName = shardPlacement->nodeName; transactionConnection->nodePort = shardPlacement->nodePort; @@ -167,12 +170,14 @@ BeginTransactionOnShardPlacements(uint64 shardId, char *userName) MemoryContextSwitchTo(oldContext); - /* now that connection is tracked, issue BEGIN */ - result = PQexec(connection, "BEGIN"); - if (PQresultStatus(result) != PGRES_COMMAND_OK) - { - ReraiseRemoteError(connection, result); - } + /* + * Every individual failure should cause entire distributed + * transaction to fail. + */ + MarkRemoteTransactionCritical(connection); + + /* issue BEGIN */ + RemoteTransactionBegin(connection); } } @@ -253,98 +258,18 @@ ConnectionList(HTAB *connectionHash) /* - * RegisterShardPlacementXactCallbacks registers transaction callbacks needed - * for multi-shard transactions. + * ResetShardPlacementTransactionState performs cleanup after the end of a + * transaction. 
*/ void -RegisterShardPlacementXactCallbacks(void) +ResetShardPlacementTransactionState(void) { - RegisterXactCallback(CompleteShardPlacementTransactions, NULL); - RegisterSubXactCallback(MultiShardSubXactCallback, NULL); -} - - -/* - * CompleteShardPlacementTransactions commits or aborts pending shard placement - * transactions when the local transaction commits or aborts. - */ -static void -CompleteShardPlacementTransactions(XactEvent event, void *arg) -{ - List *connectionList = ConnectionList(shardConnectionHash); - - if (shardConnectionHash == NULL) - { - /* nothing to do */ - return; - } - - if (event == XACT_EVENT_PRE_COMMIT) - { - if (subXactAbortAttempted) - { - subXactAbortAttempted = false; - - ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("cannot ROLLBACK TO SAVEPOINT in transactions " - "which modify distributed tables"))); - } - - /* - * Any failure here will cause local changes to be rolled back, - * and remote changes to either roll back (1PC) or, in case of - * connection or node failure, leave a prepared transaction - * (2PC). - */ - - if (MultiShardCommitProtocol == COMMIT_PROTOCOL_2PC) - { - PrepareRemoteTransactions(connectionList); - } - - return; - } - else if (event == XACT_EVENT_COMMIT) - { - /* - * A failure here will cause some remote changes to either - * roll back (1PC) or, in case of connection or node failure, - * leave a prepared transaction (2PC). However, the local - * changes have already been committed. - */ - - CommitRemoteTransactions(connectionList, false); - } - else if (event == XACT_EVENT_ABORT) - { - /* - * A failure here will cause some remote changes to either - * roll back (1PC) or, in case of connection or node failure, - * leave a prepared transaction (2PC). The local changes have - * already been rolled back. 
- */ - - AbortRemoteTransactions(connectionList); - } - else - { - return; - } - - CloseConnections(connectionList); + /* + * Now that transaction management does most of our work, nothing remains + * but to reset the connection hash, which wouldn't be valid next time + * round. + */ shardConnectionHash = NULL; - subXactAbortAttempted = false; -} - - -static void -MultiShardSubXactCallback(SubXactEvent event, SubTransactionId subId, - SubTransactionId parentSubid, void *arg) -{ - if ((shardConnectionHash != NULL) && (event == SUBXACT_EVENT_ABORT_SUB)) - { - subXactAbortAttempted = true; - } } diff --git a/src/backend/distributed/transaction/transaction_management.c b/src/backend/distributed/transaction/transaction_management.c index a14d4b98f..0dadbd0c9 100644 --- a/src/backend/distributed/transaction/transaction_management.c +++ b/src/backend/distributed/transaction/transaction_management.c @@ -21,6 +21,7 @@ #include "access/xact.h" #include "distributed/connection_management.h" #include "distributed/hash_helpers.h" +#include "distributed/multi_shard_transaction.h" #include "distributed/transaction_management.h" #include "utils/hsearch.h" #include "utils/guc.h" @@ -140,6 +141,13 @@ CoordinatedTransactionCallback(XactEvent event, void *arg) { case XACT_EVENT_COMMIT: { + /* + * Call other parts of citus that need to integrate into + * transaction management. Do so before doing other work, so the + * callbacks still can perform work if needed. + */ + ResetShardPlacementTransactionState(); + if (CurrentCoordinatedTransactionState == COORD_TRANS_PREPARED) { /* handles both already prepared and open transactions */ @@ -168,6 +176,13 @@ CoordinatedTransactionCallback(XactEvent event, void *arg) * XACT_EVENT_PRE_COMMIT state. */ + /* + * Call other parts of citus that need to integrate into + * transaction management. Do so before doing other work, so the + * callbacks still can perform work if needed. 
+ */ + ResetShardPlacementTransactionState(); + /* handles both already prepared and open transactions */ if (CurrentCoordinatedTransactionState > COORD_TRANS_IDLE) { diff --git a/src/include/distributed/multi_shard_transaction.h b/src/include/distributed/multi_shard_transaction.h index 4b5ec354a..95dd29d32 100644 --- a/src/include/distributed/multi_shard_transaction.h +++ b/src/include/distributed/multi_shard_transaction.h @@ -33,7 +33,7 @@ extern ShardConnections * GetShardHashConnections(HTAB *connectionHash, int64 sh bool *connectionsFound); extern List * ConnectionList(HTAB *connectionHash); extern void CloseConnections(List *connectionList); -extern void RegisterShardPlacementXactCallbacks(void); +extern void ResetShardPlacementTransactionState(void); #endif /* MULTI_SHARD_TRANSACTION_H */ diff --git a/src/test/regress/expected/multi_insert_select.out b/src/test/regress/expected/multi_insert_select.out index 4bcc20e00..7d675b60e 100644 --- a/src/test/regress/expected/multi_insert_select.out +++ b/src/test/regress/expected/multi_insert_select.out @@ -80,14 +80,6 @@ DEBUG: Plan is router executable DEBUG: CommitTransactionCommand DEBUG: CommitTransaction DEBUG: name: unnamed; blockState: STARTED; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children: -DEBUG: sent COMMIT over connection 13300001 -DEBUG: sent COMMIT over connection 13300001 -DEBUG: sent COMMIT over connection 13300000 -DEBUG: sent COMMIT over connection 13300000 -DEBUG: sent COMMIT over connection 13300002 -DEBUG: sent COMMIT over connection 13300002 -DEBUG: sent COMMIT over connection 13300003 -DEBUG: sent COMMIT over connection 13300003 -- see that our first multi shard INSERT...SELECT works expected SET client_min_messages TO INFO; DEBUG: StartTransactionCommand @@ -283,14 +275,6 @@ DEBUG: Plan is router executable DEBUG: CommitTransactionCommand DEBUG: CommitTransaction DEBUG: name: unnamed; blockState: STARTED; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children: -DEBUG: sent 
COMMIT over connection 13300001 -DEBUG: sent COMMIT over connection 13300001 -DEBUG: sent COMMIT over connection 13300000 -DEBUG: sent COMMIT over connection 13300000 -DEBUG: sent COMMIT over connection 13300002 -DEBUG: sent COMMIT over connection 13300002 -DEBUG: sent COMMIT over connection 13300003 -DEBUG: sent COMMIT over connection 13300003 user_id | time | value_1 | value_2 | value_3 | value_4 ---------+------+---------+---------+---------+--------- 9 | | 90 | | 9000 | @@ -363,14 +347,6 @@ DEBUG: Plan is router executable DEBUG: CommitTransactionCommand DEBUG: CommitTransaction DEBUG: name: unnamed; blockState: STARTED; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children: -DEBUG: sent COMMIT over connection 13300001 -DEBUG: sent COMMIT over connection 13300001 -DEBUG: sent COMMIT over connection 13300000 -DEBUG: sent COMMIT over connection 13300000 -DEBUG: sent COMMIT over connection 13300002 -DEBUG: sent COMMIT over connection 13300002 -DEBUG: sent COMMIT over connection 13300003 -DEBUG: sent COMMIT over connection 13300003 -- group by column not exists on the SELECT target list INSERT INTO agg_events (value_3_agg, value_4_agg, value_1_agg, user_id) SELECT @@ -574,14 +550,6 @@ DEBUG: Plan is router executable DEBUG: CommitTransactionCommand DEBUG: CommitTransaction DEBUG: name: unnamed; blockState: STARTED; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children: -DEBUG: sent COMMIT over connection 13300001 -DEBUG: sent COMMIT over connection 13300001 -DEBUG: sent COMMIT over connection 13300000 -DEBUG: sent COMMIT over connection 13300000 -DEBUG: sent COMMIT over connection 13300002 -DEBUG: sent COMMIT over connection 13300002 -DEBUG: sent COMMIT over connection 13300003 -DEBUG: sent COMMIT over connection 13300003 -- add one more level subqueris on top of subquery JOINs INSERT INTO agg_events (user_id, value_4_agg) @@ -659,14 +627,6 @@ DEBUG: Plan is router executable DEBUG: CommitTransactionCommand DEBUG: CommitTransaction DEBUG: name: unnamed; 
blockState: STARTED; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children: -DEBUG: sent COMMIT over connection 13300001 -DEBUG: sent COMMIT over connection 13300001 -DEBUG: sent COMMIT over connection 13300000 -DEBUG: sent COMMIT over connection 13300000 -DEBUG: sent COMMIT over connection 13300002 -DEBUG: sent COMMIT over connection 13300002 -DEBUG: sent COMMIT over connection 13300003 -DEBUG: sent COMMIT over connection 13300003 -- subqueries in WHERE clause INSERT INTO raw_events_second (user_id) @@ -711,14 +671,6 @@ DEBUG: Plan is router executable DEBUG: CommitTransactionCommand DEBUG: CommitTransaction DEBUG: name: unnamed; blockState: STARTED; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children: -DEBUG: sent COMMIT over connection 13300001 -DEBUG: sent COMMIT over connection 13300001 -DEBUG: sent COMMIT over connection 13300000 -DEBUG: sent COMMIT over connection 13300000 -DEBUG: sent COMMIT over connection 13300002 -DEBUG: sent COMMIT over connection 13300002 -DEBUG: sent COMMIT over connection 13300003 -DEBUG: sent COMMIT over connection 13300003 -- some UPSERTS INSERT INTO agg_events AS ae ( @@ -758,14 +710,6 @@ DEBUG: Plan is router executable DEBUG: CommitTransactionCommand DEBUG: CommitTransaction DEBUG: name: unnamed; blockState: STARTED; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children: -DEBUG: sent COMMIT over connection 13300001 -DEBUG: sent COMMIT over connection 13300001 -DEBUG: sent COMMIT over connection 13300000 -DEBUG: sent COMMIT over connection 13300000 -DEBUG: sent COMMIT over connection 13300002 -DEBUG: sent COMMIT over connection 13300002 -DEBUG: sent COMMIT over connection 13300003 -DEBUG: sent COMMIT over connection 13300003 -- upserts with returning INSERT INTO agg_events AS ae ( @@ -806,14 +750,6 @@ DEBUG: Plan is router executable DEBUG: CommitTransactionCommand DEBUG: CommitTransaction DEBUG: name: unnamed; blockState: STARTED; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children: -DEBUG: sent COMMIT 
over connection 13300001 -DEBUG: sent COMMIT over connection 13300001 -DEBUG: sent COMMIT over connection 13300000 -DEBUG: sent COMMIT over connection 13300000 -DEBUG: sent COMMIT over connection 13300002 -DEBUG: sent COMMIT over connection 13300002 -DEBUG: sent COMMIT over connection 13300003 -DEBUG: sent COMMIT over connection 13300003 user_id | value_1_agg ---------+------------- 7 | @@ -848,14 +784,6 @@ DEBUG: Plan is router executable DEBUG: CommitTransactionCommand DEBUG: CommitTransaction DEBUG: name: unnamed; blockState: STARTED; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children: -DEBUG: sent COMMIT over connection 13300001 -DEBUG: sent COMMIT over connection 13300001 -DEBUG: sent COMMIT over connection 13300000 -DEBUG: sent COMMIT over connection 13300000 -DEBUG: sent COMMIT over connection 13300002 -DEBUG: sent COMMIT over connection 13300002 -DEBUG: sent COMMIT over connection 13300003 -DEBUG: sent COMMIT over connection 13300003 -- FILTER CLAUSE INSERT INTO agg_events (user_id, value_1_agg) SELECT @@ -886,14 +814,6 @@ DEBUG: Plan is router executable DEBUG: CommitTransactionCommand DEBUG: CommitTransaction DEBUG: name: unnamed; blockState: STARTED; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children: -DEBUG: sent COMMIT over connection 13300001 -DEBUG: sent COMMIT over connection 13300001 -DEBUG: sent COMMIT over connection 13300000 -DEBUG: sent COMMIT over connection 13300000 -DEBUG: sent COMMIT over connection 13300002 -DEBUG: sent COMMIT over connection 13300002 -DEBUG: sent COMMIT over connection 13300003 -DEBUG: sent COMMIT over connection 13300003 -- a test with reference table JOINs INSERT INTO agg_events (user_id, value_1_agg) @@ -929,14 +849,6 @@ DEBUG: Plan is router executable DEBUG: CommitTransactionCommand DEBUG: CommitTransaction DEBUG: name: unnamed; blockState: STARTED; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children: -DEBUG: sent COMMIT over connection 13300001 -DEBUG: sent COMMIT over connection 13300001 
-DEBUG: sent COMMIT over connection 13300000 -DEBUG: sent COMMIT over connection 13300000 -DEBUG: sent COMMIT over connection 13300002 -DEBUG: sent COMMIT over connection 13300002 -DEBUG: sent COMMIT over connection 13300003 -DEBUG: sent COMMIT over connection 13300003 -- a note on the outer joins is that -- we filter out outer join results -- where partition column returns @@ -1024,14 +936,6 @@ DEBUG: Plan is router executable DEBUG: CommitTransactionCommand DEBUG: CommitTransaction DEBUG: name: unnamed; blockState: STARTED; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children: -DEBUG: sent COMMIT over connection 13300001 -DEBUG: sent COMMIT over connection 13300001 -DEBUG: sent COMMIT over connection 13300000 -DEBUG: sent COMMIT over connection 13300000 -DEBUG: sent COMMIT over connection 13300002 -DEBUG: sent COMMIT over connection 13300002 -DEBUG: sent COMMIT over connection 13300003 -DEBUG: sent COMMIT over connection 13300003 SET client_min_messages TO INFO; DEBUG: StartTransactionCommand DEBUG: StartTransaction @@ -1094,14 +998,6 @@ DEBUG: Plan is router executable DEBUG: CommitTransactionCommand DEBUG: CommitTransaction DEBUG: name: unnamed; blockState: STARTED; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children: -DEBUG: sent COMMIT over connection 13300001 -DEBUG: sent COMMIT over connection 13300001 -DEBUG: sent COMMIT over connection 13300000 -DEBUG: sent COMMIT over connection 13300000 -DEBUG: sent COMMIT over connection 13300002 -DEBUG: sent COMMIT over connection 13300002 -DEBUG: sent COMMIT over connection 13300003 -DEBUG: sent COMMIT over connection 13300003 -- we don't want to see constraint vialotions, so truncate first SET client_min_messages TO INFO; DEBUG: StartTransactionCommand @@ -1168,14 +1064,6 @@ DEBUG: Plan is router executable DEBUG: CommitTransactionCommand DEBUG: CommitTransaction DEBUG: name: unnamed; blockState: STARTED; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children: -DEBUG: sent COMMIT over connection 
13300001 -DEBUG: sent COMMIT over connection 13300001 -DEBUG: sent COMMIT over connection 13300000 -DEBUG: sent COMMIT over connection 13300000 -DEBUG: sent COMMIT over connection 13300002 -DEBUG: sent COMMIT over connection 13300002 -DEBUG: sent COMMIT over connection 13300003 -DEBUG: sent COMMIT over connection 13300003 -- We do not support any set operations INSERT INTO raw_events_first(user_id) @@ -1547,14 +1435,6 @@ DEBUG: Plan is router executable DEBUG: CommitTransactionCommand DEBUG: CommitTransaction DEBUG: name: unnamed; blockState: STARTED; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children: -DEBUG: sent COMMIT over connection 13300001 -DEBUG: sent COMMIT over connection 13300001 -DEBUG: sent COMMIT over connection 13300000 -DEBUG: sent COMMIT over connection 13300000 -DEBUG: sent COMMIT over connection 13300002 -DEBUG: sent COMMIT over connection 13300002 -DEBUG: sent COMMIT over connection 13300003 -DEBUG: sent COMMIT over connection 13300003 SET client_min_messages TO INFO; DEBUG: StartTransactionCommand DEBUG: StartTransaction @@ -1695,13 +1575,6 @@ DEBUG: Plan is router executable DEBUG: CommitTransactionCommand DEBUG: CommitTransaction DEBUG: name: unnamed; blockState: STARTED; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children: -DEBUG: sent COMMIT over connection 13300001 -DEBUG: sent COMMIT over connection 13300001 -DEBUG: sent COMMIT over connection 13300000 -DEBUG: sent COMMIT over connection 13300002 -DEBUG: sent COMMIT over connection 13300002 -DEBUG: sent COMMIT over connection 13300003 -DEBUG: sent COMMIT over connection 13300003 SET client_min_messages TO INFO; DEBUG: StartTransactionCommand DEBUG: StartTransaction @@ -1784,10 +1657,6 @@ DEBUG: Plan is router executable DEBUG: CommitTransactionCommand DEBUG: CommitTransaction DEBUG: name: unnamed; blockState: STARTED; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children: -DEBUG: sent COMMIT over connection 13300013 -DEBUG: sent COMMIT over connection 13300013 -DEBUG: 
sent COMMIT over connection 13300014 -DEBUG: sent COMMIT over connection 13300014 -- see that defaults are filled INSERT INTO table_with_defaults (store_id, first_name) SELECT @@ -1806,10 +1675,6 @@ DEBUG: Plan is router executable DEBUG: CommitTransactionCommand DEBUG: CommitTransaction DEBUG: name: unnamed; blockState: STARTED; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children: -DEBUG: sent COMMIT over connection 13300013 -DEBUG: sent COMMIT over connection 13300013 -DEBUG: sent COMMIT over connection 13300014 -DEBUG: sent COMMIT over connection 13300014 -- shuffle one of the defaults and skip the other INSERT INTO table_with_defaults (default_2, store_id, first_name) SELECT @@ -1828,10 +1693,6 @@ DEBUG: Plan is router executable DEBUG: CommitTransactionCommand DEBUG: CommitTransaction DEBUG: name: unnamed; blockState: STARTED; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children: -DEBUG: sent COMMIT over connection 13300013 -DEBUG: sent COMMIT over connection 13300013 -DEBUG: sent COMMIT over connection 13300014 -DEBUG: sent COMMIT over connection 13300014 -- shuffle both defaults INSERT INTO table_with_defaults (default_2, store_id, default_1, first_name) SELECT @@ -1850,10 +1711,6 @@ DEBUG: Plan is router executable DEBUG: CommitTransactionCommand DEBUG: CommitTransaction DEBUG: name: unnamed; blockState: STARTED; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children: -DEBUG: sent COMMIT over connection 13300013 -DEBUG: sent COMMIT over connection 13300013 -DEBUG: sent COMMIT over connection 13300014 -DEBUG: sent COMMIT over connection 13300014 -- use constants instead of non-default column INSERT INTO table_with_defaults (default_2, last_name, store_id, first_name) SELECT @@ -1872,10 +1729,6 @@ DEBUG: Plan is router executable DEBUG: CommitTransactionCommand DEBUG: CommitTransaction DEBUG: name: unnamed; blockState: STARTED; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children: -DEBUG: sent COMMIT over connection 13300013 -DEBUG: 
sent COMMIT over connection 13300013 -DEBUG: sent COMMIT over connection 13300014 -DEBUG: sent COMMIT over connection 13300014 -- use constants instead of non-default column and skip both defauls INSERT INTO table_with_defaults (last_name, store_id, first_name) SELECT @@ -1894,10 +1747,6 @@ DEBUG: Plan is router executable DEBUG: CommitTransactionCommand DEBUG: CommitTransaction DEBUG: name: unnamed; blockState: STARTED; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children: -DEBUG: sent COMMIT over connection 13300013 -DEBUG: sent COMMIT over connection 13300013 -DEBUG: sent COMMIT over connection 13300014 -DEBUG: sent COMMIT over connection 13300014 -- use constants instead of default columns INSERT INTO table_with_defaults (default_2, last_name, store_id, first_name, default_1) SELECT @@ -1916,10 +1765,6 @@ DEBUG: Plan is router executable DEBUG: CommitTransactionCommand DEBUG: CommitTransaction DEBUG: name: unnamed; blockState: STARTED; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children: -DEBUG: sent COMMIT over connection 13300013 -DEBUG: sent COMMIT over connection 13300013 -DEBUG: sent COMMIT over connection 13300014 -DEBUG: sent COMMIT over connection 13300014 -- use constants instead of both default columns and non-default columns INSERT INTO table_with_defaults (default_2, last_name, store_id, first_name, default_1) SELECT @@ -1938,10 +1783,6 @@ DEBUG: Plan is router executable DEBUG: CommitTransactionCommand DEBUG: CommitTransaction DEBUG: name: unnamed; blockState: STARTED; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children: -DEBUG: sent COMMIT over connection 13300013 -DEBUG: sent COMMIT over connection 13300013 -DEBUG: sent COMMIT over connection 13300014 -DEBUG: sent COMMIT over connection 13300014 -- some of the the ultimate queries where we have constants, -- defaults and group by entry is not on the target entry INSERT INTO table_with_defaults (default_2, store_id, first_name) @@ -1963,10 +1804,6 @@ DEBUG: Plan is router 
executable DEBUG: CommitTransactionCommand DEBUG: CommitTransaction DEBUG: name: unnamed; blockState: STARTED; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children: -DEBUG: sent COMMIT over connection 13300013 -DEBUG: sent COMMIT over connection 13300013 -DEBUG: sent COMMIT over connection 13300014 -DEBUG: sent COMMIT over connection 13300014 INSERT INTO table_with_defaults (default_1, store_id, first_name, default_2) SELECT 1000, store_id, 'Andres', '2000' @@ -1986,10 +1823,6 @@ DEBUG: Plan is router executable DEBUG: CommitTransactionCommand DEBUG: CommitTransaction DEBUG: name: unnamed; blockState: STARTED; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children: -DEBUG: sent COMMIT over connection 13300013 -DEBUG: sent COMMIT over connection 13300013 -DEBUG: sent COMMIT over connection 13300014 -DEBUG: sent COMMIT over connection 13300014 INSERT INTO table_with_defaults (default_1, store_id, first_name, default_2) SELECT 1000, store_id, 'Andres', '2000' @@ -2009,10 +1842,6 @@ DEBUG: Plan is router executable DEBUG: CommitTransactionCommand DEBUG: CommitTransaction DEBUG: name: unnamed; blockState: STARTED; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children: -DEBUG: sent COMMIT over connection 13300013 -DEBUG: sent COMMIT over connection 13300013 -DEBUG: sent COMMIT over connection 13300014 -DEBUG: sent COMMIT over connection 13300014 INSERT INTO table_with_defaults (default_1, store_id, first_name) SELECT 1000, store_id, 'Andres' @@ -2032,10 +1861,6 @@ DEBUG: Plan is router executable DEBUG: CommitTransactionCommand DEBUG: CommitTransaction DEBUG: name: unnamed; blockState: STARTED; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children: -DEBUG: sent COMMIT over connection 13300013 -DEBUG: sent COMMIT over connection 13300013 -DEBUG: sent COMMIT over connection 13300014 -DEBUG: sent COMMIT over connection 13300014 -- set back to the default SET citus.shard_count TO DEFAULT; DEBUG: StartTransactionCommand diff --git 
a/src/test/regress/expected/multi_join_order_additional.out b/src/test/regress/expected/multi_join_order_additional.out index 2d9090150..7cebe1ec9 100644 --- a/src/test/regress/expected/multi_join_order_additional.out +++ b/src/test/regress/expected/multi_join_order_additional.out @@ -45,8 +45,6 @@ CREATE INDEX lineitem_hash_time_index ON lineitem_hash (l_shipdate); NOTICE: using one-phase commit for distributed DDL commands HINT: You can enable two-phase commit for extra safety with: SET citus.multi_shard_commit_protocol TO '2pc' DEBUG: building index "lineitem_hash_time_index" on table "lineitem_hash" -DEBUG: sent COMMIT over connection 650000 -DEBUG: sent COMMIT over connection 650001 CREATE TABLE orders_hash ( o_orderkey bigint not null, o_custkey integer not null, diff --git a/src/test/regress/expected/multi_shard_modify.out b/src/test/regress/expected/multi_shard_modify.out index 328b6ce33..f0ae3bc1d 100644 --- a/src/test/regress/expected/multi_shard_modify.out +++ b/src/test/regress/expected/multi_shard_modify.out @@ -119,10 +119,6 @@ SELECT master_modify_multiple_shards('DELETE FROM multi_shard_modify_test WHERE DEBUG: predicate pruning for shardId 350001 DEBUG: predicate pruning for shardId 350002 DEBUG: predicate pruning for shardId 350003 -DEBUG: sent PREPARE TRANSACTION over connection 350000 -DEBUG: sent PREPARE TRANSACTION over connection 350000 -DEBUG: sent COMMIT PREPARED over connection 350000 -DEBUG: sent COMMIT PREPARED over connection 350000 master_modify_multiple_shards ------------------------------- 1 diff --git a/src/test/regress/output/multi_alter_table_statements.source b/src/test/regress/output/multi_alter_table_statements.source index 93a826c4a..4ecd9ab33 100644 --- a/src/test/regress/output/multi_alter_table_statements.source +++ b/src/test/regress/output/multi_alter_table_statements.source @@ -559,7 +559,7 @@ COMMIT; WARNING: duplicate key value violates unique constraint "ddl_commands_command_key" DETAIL: Key (command)=(CREATE INDEX) 
already exists. CONTEXT: while executing command on localhost:57638 -ERROR: failed to prepare transaction +ERROR: failure on connection marked as essential: localhost:57638 -- Nothing from the block should have committed SELECT indexname, tablename FROM pg_indexes WHERE tablename = 'single_shard_items'; indexname | tablename @@ -574,7 +574,10 @@ NOTICE: using one-phase commit for distributed DDL commands HINT: You can enable two-phase commit for extra safety with: SET citus.multi_shard_commit_protocol TO '2pc' CREATE INDEX single_index_3 ON single_shard_items(name); COMMIT; -WARNING: failed to commit transaction on localhost:57638 +WARNING: duplicate key value violates unique constraint "ddl_commands_command_key" +DETAIL: Key (command)=(CREATE INDEX) already exists. +CONTEXT: while executing command on localhost:57638 +WARNING: failed to commit critical transaction on localhost:57638, metadata is likely out of sync -- The block should have committed with a warning SELECT indexname, tablename FROM pg_indexes WHERE tablename = 'single_shard_items'; indexname | tablename From 80b34a5d6b3075c0c528e990eea1056d74b42074 Mon Sep 17 00:00:00 2001 From: Andres Freund Date: Thu, 8 Dec 2016 19:29:48 -0800 Subject: [PATCH 4/4] Integrate router executor into transaction management framework. One less place managing remote transactions. It also makes it fairly easy to use 2PC for certain modifications (e.g. reference tables). Just issue a CoordinatedTransactionUse2PC(). If every placement failure should cause the whole transaction to abort, additionally mark the relevant transactions as critical. 
--- .../executor/multi_router_executor.c | 230 ++++-------------- src/backend/distributed/shared_library_init.c | 3 - .../transaction/transaction_management.c | 11 + src/include/distributed/connection_cache.h | 2 +- .../distributed/multi_router_executor.h | 3 +- .../expected/multi_modifying_xacts.out | 4 + 6 files changed, 71 insertions(+), 182 deletions(-) diff --git a/src/backend/distributed/executor/multi_router_executor.c b/src/backend/distributed/executor/multi_router_executor.c index f2294fca0..ebb99c26c 100644 --- a/src/backend/distributed/executor/multi_router_executor.c +++ b/src/backend/distributed/executor/multi_router_executor.c @@ -41,6 +41,7 @@ #include "distributed/multi_shard_transaction.h" #include "distributed/relay_utility.h" #include "distributed/remote_commands.h" +#include "distributed/remote_transaction.h" #include "distributed/resource_lock.h" #include "executor/execdesc.h" #include "executor/executor.h" @@ -80,13 +81,9 @@ bool AllModificationsCommutative = false; * of XactShardConnSets, which map a shard identifier to a set of connection * hash entries. This list is walked by MarkRemainingInactivePlacements to * ensure we mark placements as failed if they reject a COMMIT. - * - * Beyond that, there's a backend hook to register xact callbacks and a flag to - * track when a user tries to roll back to a savepoint (not allowed). 
*/ static HTAB *xactParticipantHash = NULL; static List *xactShardConnSetList = NIL; -static bool subXactAbortAttempted = false; /* functions needed during start phase */ static void InitTransactionStateForTask(Task *task); @@ -124,11 +121,7 @@ static bool ConsumeQueryResult(PGconn *connection, bool failOnError, int64 *rows static void RecordShardIdParticipant(uint64 affectedShardId, NodeConnectionEntry *participantEntry); -/* functions needed by callbacks and hooks */ -static void RouterTransactionCallback(XactEvent event, void *arg); -static void RouterSubtransactionCallback(SubXactEvent event, SubTransactionId subId, - SubTransactionId parentSubid, void *arg); -static void ExecuteTransactionEnd(bool commit); +/* to verify the health of shards after a transactional modification command */ static void MarkRemainingInactivePlacements(void); @@ -249,6 +242,8 @@ InitTransactionStateForTask(Task *task) { ListCell *placementCell = NULL; + BeginCoordinatedTransaction(); + xactParticipantHash = CreateXactParticipantHash(); foreach(placementCell, task->taskPlacementList) @@ -257,8 +252,9 @@ InitTransactionStateForTask(Task *task) NodeConnectionKey participantKey; NodeConnectionEntry *participantEntry = NULL; bool entryFound = false; - - PGconn *connection = NULL; + int connectionFlags = SESSION_LIFESPAN; + MultiConnection *connection = + GetNodeConnection(connectionFlags, placement->nodeName, placement->nodePort); MemSet(&participantKey, 0, sizeof(participantKey)); strlcpy(participantKey.nodeName, placement->nodeName, @@ -269,21 +265,8 @@ InitTransactionStateForTask(Task *task) HASH_ENTER, &entryFound); Assert(!entryFound); - connection = GetOrEstablishConnection(placement->nodeName, - placement->nodePort); - if (connection != NULL) - { - PGresult *result = PQexec(connection, "BEGIN"); - if (PQresultStatus(result) != PGRES_COMMAND_OK) - { - WarnRemoteError(connection, result); - CloseConnectionByPGconn(connection); - - connection = NULL; - } - - PQclear(result); - } + /* 
issue BEGIN if necessary */ + RemoteTransactionBeginIfNecessary(connection); participantEntry->connection = connection; } @@ -1212,7 +1195,7 @@ GetConnectionForPlacement(ShardPlacement *placement, bool isModificationQuery) RecordShardIdParticipant(placement->shardId, participantEntry); } - return participantEntry->connection; + return participantEntry->connection->pgConn; } else { @@ -1721,173 +1704,52 @@ RouterExecutorEnd(QueryDesc *queryDesc) /* - * RegisterRouterExecutorXactCallbacks registers this executor's callbacks. + * RouterExecutorPreCommitCheck() gets called after remote transactions have + * committed, so it can invalidate failed shards and perform related checks. */ void -RegisterRouterExecutorXactCallbacks(void) +RouterExecutorPreCommitCheck(void) { - RegisterXactCallback(RouterTransactionCallback, NULL); - RegisterSubXactCallback(RouterSubtransactionCallback, NULL); -} - - -/* - * RouterTransactionCallback handles committing or aborting remote transactions - * after the local one has committed or aborted. It only sends COMMIT or ABORT - * commands to still-healthy remotes; the failed ones are marked as inactive if - * after a successful COMMIT (no need to mark on ABORTs). 
- */ -static void -RouterTransactionCallback(XactEvent event, void *arg) -{ - if (XactModificationLevel != XACT_MODIFICATION_DATA) - { - return; - } - - switch (event) - { - case XACT_EVENT_PARALLEL_COMMIT: - case XACT_EVENT_COMMIT: - { - break; - } - - case XACT_EVENT_PARALLEL_ABORT: - case XACT_EVENT_ABORT: - { - bool commit = false; - - ExecuteTransactionEnd(commit); - - break; - } - - /* no support for prepare with multi-statement transactions */ - case XACT_EVENT_PREPARE: - case XACT_EVENT_PRE_PREPARE: - { - ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("cannot prepare a transaction that modified " - "distributed tables"))); - - break; - } - - case XACT_EVENT_PARALLEL_PRE_COMMIT: - case XACT_EVENT_PRE_COMMIT: - { - bool commit = true; - - if (subXactAbortAttempted) - { - subXactAbortAttempted = false; - - ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("cannot ROLLBACK TO SAVEPOINT in transactions " - "which modify distributed tables"))); - } - - ExecuteTransactionEnd(commit); - MarkRemainingInactivePlacements(); - - /* leave early to avoid resetting transaction state */ - return; - } - } - - /* reset transaction state */ - xactParticipantHash = NULL; - xactShardConnSetList = NIL; - subXactAbortAttempted = false; -} - - -/* - * RouterSubtransactionCallback silently keeps track of any attempt to ROLLBACK - * TO SAVEPOINT, which is not permitted by this executor. At transaction end, - * the executor checks whether such a rollback was attempted and, if so, errors - * out entirely (with an appropriate message). - * - * This implementation permits savepoints so long as no rollbacks occur. 
- */ -static void -RouterSubtransactionCallback(SubXactEvent event, SubTransactionId subId, - SubTransactionId parentSubid, void *arg) -{ - if ((xactParticipantHash != NULL) && (event == SUBXACT_EVENT_ABORT_SUB)) - { - subXactAbortAttempted = true; - } -} - - -/* - * ExecuteTransactionEnd ends any remote transactions still taking place on - * remote nodes. It uses xactParticipantHash to know which nodes need any - * final COMMIT or ABORT commands. Nodes which fail a final COMMIT will have - * their connection field set to NULL to permit placement invalidation. - */ -static void -ExecuteTransactionEnd(bool commit) -{ - const char *sqlCommand = commit ? "COMMIT TRANSACTION" : "ABORT TRANSACTION"; - HASH_SEQ_STATUS scan; - NodeConnectionEntry *participant; - bool completed = !commit; /* aborts are assumed completed */ - + /* no transactional router modifications were issued, nothing to do */ if (xactParticipantHash == NULL) { return; } - hash_seq_init(&scan, xactParticipantHash); - while ((participant = (NodeConnectionEntry *) hash_seq_search(&scan))) - { - PGconn *connection = participant->connection; - PGresult *result = NULL; + MarkRemainingInactivePlacements(); +} - if (PQstatus(connection) != CONNECTION_OK) - { - continue; - } - result = PQexec(connection, sqlCommand); - if (PQresultStatus(result) == PGRES_COMMAND_OK) - { - completed = true; - } - else - { - WarnRemoteError(connection, result); - CloseConnectionByPGconn(participant->connection); - - participant->connection = NULL; - } - - PQclear(result); - } - - if (!completed) - { - ereport(ERROR, (errmsg("could not commit transaction on any active nodes"))); - } +/* + * Cleanup callback called after a transaction commits or aborts. 
+ */ +void +RouterExecutorPostCommit(void) +{ + /* reset transaction state */ + xactParticipantHash = NULL; + xactShardConnSetList = NIL; } /* * MarkRemainingInactivePlacements takes care of marking placements of a shard * inactive after some of the placements rejected the final COMMIT phase of a - * transaction. This step is skipped if all placements reject the COMMIT, since - * in that case no modifications to the placement have persisted. + * transaction. * - * Failures are detected by checking the connection field of the entries in the - * connection set for each shard: it is always set to NULL after errors. + * Failures are detected by checking the connection & transaction state for + * each of the entries in the connection set for each shard. */ static void MarkRemainingInactivePlacements(void) { ListCell *shardConnSetCell = NULL; + int totalSuccesses = 0; + + if (xactParticipantHash == NULL) + { + return; + } foreach(shardConnSetCell, xactShardConnSetList) { @@ -1899,11 +1761,16 @@ MarkRemainingInactivePlacements(void) /* determine how many actual successes there were: subtract failures */ foreach(participantCell, participantList) { - NodeConnectionEntry *participant = NULL; - participant = (NodeConnectionEntry *) lfirst(participantCell); + NodeConnectionEntry *participant = + (NodeConnectionEntry *) lfirst(participantCell); + MultiConnection *connection = participant->connection; - /* other codes sets connection to NULL after errors */ - if (participant->connection == NULL) + /* + * Fail if the connection has been set to NULL after an error, or + * if the transaction failed for other reasons (e.g. COMMIT + * failed). 
+ */ + if (connection == NULL || connection->remoteTransaction.transactionFailed) { successes--; } @@ -1921,7 +1788,8 @@ MarkRemainingInactivePlacements(void) NodeConnectionEntry *participant = NULL; participant = (NodeConnectionEntry *) lfirst(participantCell); - if (participant->connection == NULL) + if (participant->connection == NULL || + participant->connection->remoteTransaction.transactionFailed) { uint64 shardId = shardConnSet->shardId; NodeConnectionKey *nodeKey = &participant->cacheKey; @@ -1934,5 +1802,13 @@ MarkRemainingInactivePlacements(void) nodeKey->nodeName, nodeKey->nodePort); } } + + totalSuccesses++; + } + + /* If no shards could be modified at all, error out. */ + if (totalSuccesses == 0) + { + ereport(ERROR, (errmsg("could not commit transaction on any active nodes"))); } } diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index 3f092d3d3..093bc6e89 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -160,9 +160,6 @@ _PG_init(void) InitializeTransactionManagement(); InitializeConnectionManagement(); - /* initialize transaction callbacks */ - RegisterRouterExecutorXactCallbacks(); - /* enable modification of pg_catalog tables during pg_upgrade */ if (IsBinaryUpgrade) { diff --git a/src/backend/distributed/transaction/transaction_management.c b/src/backend/distributed/transaction/transaction_management.c index 0dadbd0c9..8f8cf8515 100644 --- a/src/backend/distributed/transaction/transaction_management.c +++ b/src/backend/distributed/transaction/transaction_management.c @@ -21,6 +21,7 @@ #include "access/xact.h" #include "distributed/connection_management.h" #include "distributed/hash_helpers.h" +#include "distributed/multi_router_executor.h" #include "distributed/multi_shard_transaction.h" #include "distributed/transaction_management.h" #include "utils/hsearch.h" @@ -147,6 +148,7 @@ CoordinatedTransactionCallback(XactEvent 
event, void *arg) * callbacks still can perform work if needed. */ ResetShardPlacementTransactionState(); + RouterExecutorPostCommit(); if (CurrentCoordinatedTransactionState == COORD_TRANS_PREPARED) { @@ -182,6 +184,7 @@ CoordinatedTransactionCallback(XactEvent event, void *arg) * callbacks still can perform work if needed. */ ResetShardPlacementTransactionState(); + RouterExecutorPostCommit(); /* handles both already prepared and open transactions */ if (CurrentCoordinatedTransactionState > COORD_TRANS_IDLE) @@ -245,6 +248,14 @@ CoordinatedTransactionCallback(XactEvent event, void *arg) CoordinatedRemoteTransactionsCommit(); CurrentCoordinatedTransactionState = COORD_TRANS_COMMITTED; } + + /* + * Call other parts of citus that need to integrate into + * transaction management. Call them *after* committing/preparing + * the remote transactions, to allow marking shards as invalid + * (e.g. if the remote commit failed). + */ + RouterExecutorPreCommitCheck(); } break; diff --git a/src/include/distributed/connection_cache.h b/src/include/distributed/connection_cache.h index 363042a94..9e24c8570 100644 --- a/src/include/distributed/connection_cache.h +++ b/src/include/distributed/connection_cache.h @@ -35,7 +35,7 @@ typedef struct NodeConnectionKey typedef struct NodeConnectionEntry { NodeConnectionKey cacheKey; /* hash entry key */ - PGconn *connection; /* connection to remote server, if any */ + MultiConnection *connection; /* connection to remote server, if any */ } NodeConnectionEntry; diff --git a/src/include/distributed/multi_router_executor.h b/src/include/distributed/multi_router_executor.h index 48f7f6499..f584abdc8 100644 --- a/src/include/distributed/multi_router_executor.h +++ b/src/include/distributed/multi_router_executor.h @@ -37,7 +37,8 @@ extern void RouterExecutorStart(QueryDesc *queryDesc, int eflags, List *taskList extern void RouterExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count); extern void RouterExecutorFinish(QueryDesc 
*queryDesc); extern void RouterExecutorEnd(QueryDesc *queryDesc); -extern void RegisterRouterExecutorXactCallbacks(void); +extern void RouterExecutorPreCommitCheck(void); +extern void RouterExecutorPostCommit(void); extern int64 ExecuteModifyTasksWithoutResults(List *taskList); diff --git a/src/test/regress/expected/multi_modifying_xacts.out b/src/test/regress/expected/multi_modifying_xacts.out index dbad2d1e4..44e683db3 100644 --- a/src/test/regress/expected/multi_modifying_xacts.out +++ b/src/test/regress/expected/multi_modifying_xacts.out @@ -442,6 +442,7 @@ INSERT INTO objects VALUES (2, 'BAD'); INSERT INTO labs VALUES (9, 'Umbrella Corporation'); COMMIT; WARNING: illegal value +WARNING: failed to commit transaction on localhost:57638 -- data should be persisted SELECT * FROM objects WHERE id = 2; id | name @@ -490,7 +491,9 @@ INSERT INTO labs VALUES (8, 'Aperture Science'); INSERT INTO labs VALUES (9, 'BAD'); COMMIT; WARNING: illegal value +WARNING: failed to commit transaction on localhost:57637 WARNING: illegal value +WARNING: failed to commit transaction on localhost:57638 ERROR: could not commit transaction on any active nodes -- data should NOT be persisted SELECT * FROM objects WHERE id = 1; @@ -526,6 +529,7 @@ INSERT INTO labs VALUES (8, 'Aperture Science'); INSERT INTO labs VALUES (9, 'BAD'); COMMIT; WARNING: illegal value +WARNING: failed to commit transaction on localhost:57637 \set VERBOSITY default -- data to objects should be persisted, but labs should not... SELECT * FROM objects WHERE id = 1;