diff --git a/src/backend/distributed/transaction/remote_transaction.c b/src/backend/distributed/transaction/remote_transaction.c new file mode 100644 index 000000000..ae9deab03 --- /dev/null +++ b/src/backend/distributed/transaction/remote_transaction.c @@ -0,0 +1,546 @@ +/*------------------------------------------------------------------------- + * + * remote_transaction.c + * Management of transaction spanning more than one node. + * + * Copyright (c) 2016, Citus Data, Inc. + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "libpq-fe.h" + +#include "miscadmin.h" + +#include "access/xact.h" +#include "distributed/connection_management.h" +#include "distributed/transaction_management.h" +#include "distributed/remote_commands.h" +#include "distributed/remote_transaction.h" +#include "utils/hsearch.h" + + +static void Assign2PCIdentifier(MultiConnection *connection); +static void WarnAboutLeakedPreparedTransaction(MultiConnection *connection, bool commit); + + +/* + * Begin, if necessary according to this session's coordinated transaction + * state, and the connection's state, an explicit transaction on all the + * connections. This is done in parallel, to lessen latency penalties. + */ +void +AdjustRemoteTransactionStates(List *connectionList) +{ + ListCell *connectionCell = NULL; + + if (!InCoordinatedTransaction()) + { + return; + } + + /* issue BEGIN to all connections needing it */ + foreach(connectionCell, connectionList) + { + MultiConnection *connection = (MultiConnection *) lfirst(connectionCell); + RemoteTransaction *transaction = &connection->remoteTransaction; + + Assert(PQtransactionStatus(connection->conn) != PQTRANS_ACTIVE); + + if (transaction->transactionFailed || + transaction->transactionState != REMOTE_TRANS_INVALID) + { + continue; + } + + if (PQtransactionStatus(connection->conn) != PQTRANS_INTRANS) + { + /* + * Check whether we're right now allowed to start new client + * transaction. FIXME: This likely can be removed soon. + */ + if (XactModificationLevel > XACT_MODIFICATION_NONE) + { + ereport(ERROR, (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION), + errmsg("cannot open new connections after the first " + "modification command within a transaction"))); + } + + /* + * Explicitly specify READ COMMITTED, the default on the remote + * side might have been changed, and that would cause problematic + * behaviour. + */ + if (!SendRemoteCommand(connection, "BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;")) + { + ReportConnectionError(connection, WARNING); + MarkRemoteTransactionFailed(connection, true); + } + else + { + transaction->transactionState = REMOTE_TRANS_STARTING; + } + } + } + + + /* get result of all the BEGINs */ + foreach(connectionCell, connectionList) + { + MultiConnection *connection = (MultiConnection *) lfirst(connectionCell); + RemoteTransaction *transaction = &connection->remoteTransaction; + PGresult *result = NULL; + + if (transaction->transactionFailed) + { + continue; + } + + if (!(transaction->transactionState == REMOTE_TRANS_STARTING)) + { + continue; + } + + result = PQgetResult(connection->conn); + if (!IsResponseOK(result)) + { + ReportResultError(connection, result, WARNING); + MarkRemoteTransactionFailed(connection, true); + } + else + { + transaction->transactionState = REMOTE_TRANS_STARTED; + } + + PQclear(result); + + ForgetResults(connection); + + Assert(PQtransactionStatus(connection->conn) == PQTRANS_INTRANS); + } +} + + +/* + * Begin, if necessary according to this session's coordinated transaction + * state, and the connection's state, an explicit transaction on the + * connections. + */ +void +AdjustRemoteTransactionState(MultiConnection *connection) +{ + /* just delegate */ + if (InCoordinatedTransaction()) + { + List *connectionList = list_make1(connection); + + AdjustRemoteTransactionStates(connectionList); + list_free(connectionList); + } +} + + +/* + * Record a connection as being failed. That'll, if a coordinated transaction + * is in progress, mean coordinated transactions will take appropriate action + * to handle with the failure. + * + * If the connection is marked as critical, and allowErrorPromotion is true, + * this routine will ERROR out. The allowErrorPromotion case is primarily + * required for the transaction management code itself. Usually it is helpful + * to fail as soon as possible. + */ +void +MarkRemoteTransactionFailed(MultiConnection *connection, bool allowErrorPromotion) +{ + RemoteTransaction *transaction = &connection->remoteTransaction; + + transaction->transactionFailed = true; + + /* + * If the connection is marked as critical, fail the entire coordinated + * transaction. If allowed. + */ + if (transaction->criticalTransaction && allowErrorPromotion) + { + ereport(ERROR, (errmsg("failure on connection marked as essential: %s:%d", + connection->hostname, connection->port))); + } +} + + +/* + * Perform distributed transactions handling at commit time. This will be + * called at XACT_EVENT_PRE_COMMIT if 1PC commits are used - so shards can + * still be invalidated - and at XACT_EVENT_COMMIT if 2PC is being used. + * + * Note that this routine has to issue rollbacks for failed transactions. In + * that case affected placements will be marked as invalid (via + * CheckForFailedPlacements()). + */ +void +CoordinatedRemoteTransactionsCommit(void) +{ + HASH_SEQ_STATUS status; + ConnectionHashEntry *entry; + ListCell *connectionCell; + + /* + * Issue appropriate transaction commands to remote nodes. If everything + * went well that's going to be COMMIT or COMMIT PREPARED, if individual + * connections had errors, some or all of them might require a ROLLBACK. + */ + hash_seq_init(&status, ConnectionHash); + while ((entry = (ConnectionHashEntry *) hash_seq_search(&status)) != 0) + { + foreach(connectionCell, entry->connections) + { + MultiConnection *connection = (MultiConnection *) lfirst(connectionCell); + RemoteTransaction *transaction = &connection->remoteTransaction; + + /* nothing to do if no transaction active */ + if (transaction->transactionState == REMOTE_TRANS_INVALID) + { + continue; + } + + if (transaction->transactionFailed) + { + /* + * Try sending an ROLLBACK; Depending on the state that won't + * succeed, but let's try. Have to clear previous results + * first. + */ + ForgetResults(connection); /* try to clear pending stuff */ + if (!SendRemoteCommand(connection, "ROLLBACK;")) + { + /* no point in reporting a likely redundant message */ + MarkRemoteTransactionFailed(connection, false); + } + else + { + transaction->transactionState = REMOTE_TRANS_1PC_ABORTING; + } + } + else if (transaction->transactionState == REMOTE_TRANS_PREPARED) + { + StringInfoData command; + + initStringInfo(&command); + appendStringInfo(&command, "COMMIT PREPARED '%s';", + transaction->preparedName); + + transaction->transactionState = REMOTE_TRANS_2PC_COMMITTING; + + if (!SendRemoteCommand(connection, command.data)) + { + ReportConnectionError(connection, WARNING); + MarkRemoteTransactionFailed(connection, false); + + WarnAboutLeakedPreparedTransaction(connection, true); + } + } + else + { + transaction->transactionState = REMOTE_TRANS_1PC_COMMITTING; + + if (!SendRemoteCommand(connection, "COMMIT;")) + { + /* for a moment there I thought we were in trouble */ + ReportConnectionError(connection, WARNING); + MarkRemoteTransactionFailed(connection, false); + } + } + } + } + + /* Wait for result */ + hash_seq_init(&status, ConnectionHash); + while ((entry = (ConnectionHashEntry *) hash_seq_search(&status)) != 0) + { + foreach(connectionCell, entry->connections) + { + MultiConnection *connection = (MultiConnection *) lfirst(connectionCell); + RemoteTransaction *transaction = &connection->remoteTransaction; + PGresult *result = NULL; + + if (transaction->transactionState != REMOTE_TRANS_1PC_COMMITTING && + transaction->transactionState != REMOTE_TRANS_2PC_COMMITTING && + transaction->transactionState != REMOTE_TRANS_1PC_ABORTING && + transaction->transactionState != REMOTE_TRANS_2PC_ABORTING) + { + continue; + } + + result = PQgetResult(connection->conn); + + if (!IsResponseOK(result)) + { + ReportResultError(connection, result, WARNING); + MarkRemoteTransactionFailed(connection, false); + + if (transaction->transactionState == REMOTE_TRANS_2PC_COMMITTING) + { + WarnAboutLeakedPreparedTransaction(connection, true); + } + } + else if (transaction->transactionState == REMOTE_TRANS_1PC_ABORTING || + transaction->transactionState == REMOTE_TRANS_2PC_ABORTING) + { + transaction->transactionState = REMOTE_TRANS_ABORTED; + } + else + { + transaction->transactionState = REMOTE_TRANS_COMMITTED; + } + + PQclear(result); + + ForgetResults(connection); + } + } +} + + +/* + * Perform distributed transactions handling at abort time. This issues + * ROLLBACKS and ROLLBACK PREPARED depending on whether the remote transaction + * has been prepared or not. + */ +void +CoordinatedRemoteTransactionsAbort(void) +{ + HASH_SEQ_STATUS status; + ConnectionHashEntry *entry; + ListCell *connectionCell; + + /* issue ROLLBACK; to all relevant remote nodes */ + hash_seq_init(&status, ConnectionHash); + while ((entry = (ConnectionHashEntry *) hash_seq_search(&status)) != 0) + { + foreach(connectionCell, entry->connections) + { + MultiConnection *connection = (MultiConnection *) lfirst(connectionCell); + RemoteTransaction *transaction = &connection->remoteTransaction; + + if (transaction->transactionState == REMOTE_TRANS_INVALID) + { + continue; + } + + if (transaction->transactionState == REMOTE_TRANS_PREPARED) + { + StringInfoData command; + + initStringInfo(&command); + appendStringInfo(&command, "ROLLBACK PREPARED '%s';", + transaction->preparedName); + + if (!SendRemoteCommand(connection, command.data)) + { + ReportConnectionError(connection, WARNING); + MarkRemoteTransactionFailed(connection, false); + + WarnAboutLeakedPreparedTransaction(connection, false); + } + else + { + transaction->transactionState = REMOTE_TRANS_2PC_ABORTING; + } + } + else + { + /* + * Try sending an ROLLBACK; Depending on the state + * that won't have success, but let's try. Have + * to clear previous results first. + */ + ForgetResults(connection); + if (!SendRemoteCommand(connection, "ROLLBACK;")) + { + /* no point in reporting a likely redundant message */ + MarkRemoteTransactionFailed(connection, false); + } + else + { + transaction->transactionState = REMOTE_TRANS_1PC_ABORTING; + } + } + } + } + + /* Wait for result */ + hash_seq_init(&status, ConnectionHash); + while ((entry = (ConnectionHashEntry *) hash_seq_search(&status)) != 0) + { + foreach(connectionCell, entry->connections) + { + MultiConnection *connection = (MultiConnection *) lfirst(connectionCell); + RemoteTransaction *transaction = &connection->remoteTransaction; + PGresult *result = NULL; + + if (transaction->transactionState != REMOTE_TRANS_1PC_ABORTING && + transaction->transactionState != REMOTE_TRANS_2PC_ABORTING) + { + continue; + } + + result = PQgetResult(connection->conn); + + if (!IsResponseOK(result)) + { + ReportResultError(connection, result, WARNING); + MarkRemoteTransactionFailed(connection, false); + + if (transaction->transactionState == REMOTE_TRANS_1PC_ABORTING) + { + ereport(WARNING, + (errmsg("failed to abort 2PC transaction \"%s\" on %s:%d", + transaction->preparedName, connection->hostname, + connection->port))); + } + else + { + WarnAboutLeakedPreparedTransaction(connection, false); + } + } + + PQclear(result); + + result = PQgetResult(connection->conn); + Assert(!result); + + transaction->transactionState = REMOTE_TRANS_ABORTED; + } + } +} + + +/* + * Perform 2PC prepare on all non-failed transactions participating in the + * coordinated transaction. + */ +void +CoordinatedRemoteTransactionsPrepare(void) +{ + HASH_SEQ_STATUS status; + ConnectionHashEntry *entry; + ListCell *connectionCell; + + /* issue PREPARE TRANSACTION; to all relevant remote nodes */ + + /* TODO: skip connections that haven't done any DML/DDL */ + hash_seq_init(&status, ConnectionHash); + while ((entry = (ConnectionHashEntry *) hash_seq_search(&status)) != 0) + { + foreach(connectionCell, entry->connections) + { + MultiConnection *connection = (MultiConnection *) lfirst(connectionCell); + RemoteTransaction *transaction = &connection->remoteTransaction; + + if (transaction->transactionState == REMOTE_TRANS_INVALID) + { + continue; + } + + if (!(transaction->transactionFailed)) + { + StringInfoData command; + + initStringInfo(&command); + + Assign2PCIdentifier(connection); + + appendStringInfo(&command, "PREPARE TRANSACTION '%s'", + transaction->preparedName); + + if (!SendRemoteCommand(connection, command.data)) + { + ReportConnectionError(connection, WARNING); + MarkRemoteTransactionFailed(connection, false); + } + else + { + transaction->transactionState = REMOTE_TRANS_PREPARING; + } + } + } + } + + /* Wait for result */ + hash_seq_init(&status, ConnectionHash); + while ((entry = (ConnectionHashEntry *) hash_seq_search(&status)) != 0) + { + foreach(connectionCell, entry->connections) + { + MultiConnection *connection = (MultiConnection *) lfirst(connectionCell); + RemoteTransaction *transaction = &connection->remoteTransaction; + PGresult *result = NULL; + + if (transaction->transactionState != REMOTE_TRANS_PREPARING) + { + continue; + } + + result = PQgetResult(connection->conn); + + if (!IsResponseOK(result)) + { + ReportResultError(connection, result, WARNING); + MarkRemoteTransactionFailed(connection, false); + } + else + { + transaction->transactionState = REMOTE_TRANS_PREPARED; + } + + result = PQgetResult(connection->conn); + Assert(!result); + } + } + + CurrentCoordinatedTransactionState = COORD_TRANS_PREPARED; +} + + +/* + * Compute the 2PC transaction name to use. Every 2PC transaction should get a + * new name, i.e. this function will need to be called again. + * + * NB: we rely on the fact that we don't need to do full escaping on the names + * generated here. + */ +static void +Assign2PCIdentifier(MultiConnection *connection) +{ + static uint64 sequence = 0; + snprintf(connection->remoteTransaction.preparedName, NAMEDATALEN, + "citus_%d_"UINT64_FORMAT, + MyProcPid, sequence++); +} + + +static void +WarnAboutLeakedPreparedTransaction(MultiConnection *connection, bool commit) +{ + StringInfoData command; + RemoteTransaction *transaction = &connection->remoteTransaction; + + initStringInfo(&command); + + if (commit) + appendStringInfo(&command, "COMMIT PREPARED '%s';", + transaction->preparedName); + else + appendStringInfo(&command, "ROLLBACK PREPARED '%s';", + transaction->preparedName); + + /* log a warning so the user may abort the transaction later */ + ereport(WARNING, (errmsg("failed to roll back prepared transaction '%s'", + transaction->preparedName), + errhint("Run \"%s\" on %s:%u", + command.data, connection->hostname, connection->port))); + +} diff --git a/src/backend/distributed/transaction/transaction_management.c b/src/backend/distributed/transaction/transaction_management.c index e03f1476b..269aebd57 100644 --- a/src/backend/distributed/transaction/transaction_management.c +++ b/src/backend/distributed/transaction/transaction_management.c @@ -42,6 +42,51 @@ static void CoordinatedSubTransactionCallback(SubXactEvent event, SubTransaction SubTransactionId parentSubid, void *arg); +/* + * BeginCoordinatedTransaction begins a coordinated transaction. No + * pre-existing coordinated transaction may be in progress. + */ +void +BeginCoordinatedTransaction(void) +{ + if (CurrentCoordinatedTransactionState != COORD_TRANS_NONE && + CurrentCoordinatedTransactionState != COORD_TRANS_IDLE) + { + ereport(ERROR, (errmsg("starting transaction in wrong state"))); + } + + CurrentCoordinatedTransactionState = COORD_TRANS_STARTED; +} + + +/* + * BeginOrContinueCoordinatedTransaction starts a coordinated transaction, + * unless one already is in progress. + */ +void +BeginOrContinueCoordinatedTransaction(void) +{ + if (CurrentCoordinatedTransactionState == COORD_TRANS_STARTED) + { + return; + } + + BeginCoordinatedTransaction(); +} + + +/* + * InCoordinatedTransaction returns whether a coordinated transaction has been + * started. + */ +bool +InCoordinatedTransaction(void) +{ + return CurrentCoordinatedTransactionState != COORD_TRANS_NONE && + CurrentCoordinatedTransactionState != COORD_TRANS_IDLE; +} + + void InitializeTransactionManagement(void) { @@ -66,6 +111,12 @@ CoordinatedTransactionCallback(XactEvent event, void *arg) { case XACT_EVENT_COMMIT: { + if (CurrentCoordinatedTransactionState == COORD_TRANS_PREPARED) + { + /* handles both already prepared and open transactions */ + CoordinatedRemoteTransactionsCommit(); + } + /* close connections etc. */ if (CurrentCoordinatedTransactionState != COORD_TRANS_NONE) { @@ -80,6 +131,18 @@ CoordinatedTransactionCallback(XactEvent event, void *arg) case XACT_EVENT_ABORT: { + /* + * FIXME: Add warning for the COORD_TRANS_COMMITTED case. That + * can be reached if this backend fails after the + * XACT_EVENT_PRE_COMMIT state. + */ + + /* handles both already prepared and open transactions */ + if (CurrentCoordinatedTransactionState > COORD_TRANS_IDLE) + { + CoordinatedRemoteTransactionsAbort(); + } + /* close connections etc. */ if (CurrentCoordinatedTransactionState != COORD_TRANS_NONE) { @@ -108,6 +171,29 @@ CoordinatedTransactionCallback(XactEvent event, void *arg) errmsg("cannot ROLLBACK TO SAVEPOINT in transactions " "which modify distributed tables"))); } + + /* nothing further to do if there's no managed remote xacts */ + if (CurrentCoordinatedTransactionState == COORD_TRANS_NONE) + { + break; + } + + + if (MultiShardCommitProtocol == COMMIT_PROTOCOL_2PC) + { + CoordinatedRemoteTransactionsPrepare(); + CurrentCoordinatedTransactionState = COORD_TRANS_PREPARED; + } + else + { + /* + * Have to commit remote transactions in PRE_COMMIT, to allow + * us to mark failed placements as invalid. Better don't use + * this for anything important (i.e. DDL/metadata). + */ + CoordinatedRemoteTransactionsCommit(); + CurrentCoordinatedTransactionState = COORD_TRANS_COMMITTED; + } } break; diff --git a/src/backend/distributed/utils/connection_management.c b/src/backend/distributed/utils/connection_management.c index 462cc5061..e4828021f 100644 --- a/src/backend/distributed/utils/connection_management.c +++ b/src/backend/distributed/utils/connection_management.c @@ -138,6 +138,8 @@ AtEOXact_Connections(bool isCommit) { /* reset per-transaction state */ connection->activeInTransaction = false; + connection->remoteTransaction.transactionState = REMOTE_TRANS_INVALID; + connection->remoteTransaction.transactionFailed = false; UnclaimConnection(connection); diff --git a/src/include/distributed/connection_management.h b/src/include/distributed/connection_management.h index cc22b7794..0eba1b1e2 100644 --- a/src/include/distributed/connection_management.h +++ b/src/include/distributed/connection_management.h @@ -12,6 +12,7 @@ #define CONNECTION_MANAGMENT_H #include "distributed/transaction_management.h" +#include "distributed/remote_transaction.h" #include "nodes/pg_list.h" #include "utils/hsearch.h" @@ -71,6 +72,9 @@ typedef struct MultiConnection /* has the connection been used in the current coordinated transaction? */ bool activeInTransaction; + + /* information about the associated remote transaction */ + RemoteTransaction remoteTransaction; } MultiConnection; diff --git a/src/include/distributed/remote_transaction.h b/src/include/distributed/remote_transaction.h new file mode 100644 index 000000000..e16e0d898 --- /dev/null +++ b/src/include/distributed/remote_transaction.h @@ -0,0 +1,78 @@ +/*------------------------------------------------------------------------- + * remote_transaction.h + * + * Copyright (c) 2016, Citus Data, Inc. + * + *------------------------------------------------------------------------- + */ + + +#ifndef REMOTE_TRANSACTION_H +#define REMOTE_TRANSACTION_H + + +#include "nodes/pg_list.h" + + +/* forward declare, to avoid recursive includes */ +struct MultiConnection; + +/* + * Enum that defines different remote transaction states, of a single remote + * transaction. + */ +typedef enum +{ + /* no transaction active */ + REMOTE_TRANS_INVALID = 0, + + /* transaction start */ + REMOTE_TRANS_STARTING, + REMOTE_TRANS_STARTED, + + /* 2pc prepare */ + REMOTE_TRANS_PREPARING, + REMOTE_TRANS_PREPARED, + + /* transaction abort */ + REMOTE_TRANS_1PC_ABORTING, + REMOTE_TRANS_2PC_ABORTING, + REMOTE_TRANS_ABORTED, + + /* transaction commit */ + REMOTE_TRANS_1PC_COMMITTING, + REMOTE_TRANS_2PC_COMMITTING, + REMOTE_TRANS_COMMITTED +} RemoteTransactionState; + + +/* + * Transaction state associated associated with a single MultiConnection. + */ +typedef struct RemoteTransaction +{ + /* what state is the remote side transaction in */ + RemoteTransactionState transactionState; + + /* failures on this connection should abort entire coordinated transaction */ + bool criticalTransaction; + + /* failed in current transaction */ + bool transactionFailed; + + /* 2PC transaction name currently associated with connection */ + char preparedName[NAMEDATALEN]; +} RemoteTransaction; + + +/* change remote transaction state */ +extern void AdjustRemoteTransactionState(struct MultiConnection *connection); +extern void AdjustRemoteTransactionStates(List *connectionList); +extern void MarkRemoteTransactionFailed(struct MultiConnection *connection, + bool allowErrorPromotion); + +extern void CoordinatedRemoteTransactionsCommit(void); +extern void CoordinatedRemoteTransactionsAbort(void); +extern void CoordinatedRemoteTransactionsPrepare(void); + +#endif /* REMOTE_TRANSACTION_H */ diff --git a/src/include/distributed/transaction_management.h b/src/include/distributed/transaction_management.h index 77f4cfa37..14d5cf970 100644 --- a/src/include/distributed/transaction_management.h +++ b/src/include/distributed/transaction_management.h @@ -29,7 +29,16 @@ typedef enum CoordinatedTransactionState COORD_TRANS_NONE, /* no coordinated transaction in progress, but connections established */ - COORD_TRANS_IDLE + COORD_TRANS_IDLE, + + /* coordinated transaction in progress */ + COORD_TRANS_STARTED, + + /* coordinated transaction prepared on all workers */ + COORD_TRANS_PREPARED, + + /* coordinated transaction committed */ + COORD_TRANS_COMMITTED } CoordinatedTransactionState; @@ -50,6 +59,13 @@ extern XactModificationType XactModificationLevel; extern CoordinatedTransactionState CurrentCoordinatedTransactionState; +/* + * Coordinated transaction management. + */ +extern void BeginCoordinatedTransaction(void); +extern void BeginOrContinueCoordinatedTransaction(void); +extern bool InCoordinatedTransaction(void); + /* * Initialization. */