diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index cde09697e..e39a4b039 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -18,7 +18,6 @@ #include "commands/explain.h" #include "executor/executor.h" -#include "distributed/commit_protocol.h" #include "distributed/connection_management.h" #include "distributed/master_protocol.h" #include "distributed/multi_copy.h" @@ -30,7 +29,6 @@ #include "distributed/multi_router_executor.h" #include "distributed/multi_router_planner.h" #include "distributed/multi_server_executor.h" -#include "distributed/multi_shard_transaction.h" #include "distributed/multi_utility.h" #include "distributed/placement_connection.h" #include "distributed/remote_commands.h" @@ -158,9 +156,6 @@ _PG_init(void) InitializeTransactionManagement(); InitializeConnectionManagement(); InitPlacementConnectionManagement(); - - /* initialize transaction callbacks */ - RegisterShardPlacementXactCallbacks(); } diff --git a/src/backend/distributed/transaction/commit_protocol.c b/src/backend/distributed/transaction/commit_protocol.c deleted file mode 100644 index 329512749..000000000 --- a/src/backend/distributed/transaction/commit_protocol.c +++ /dev/null @@ -1,260 +0,0 @@ -/*------------------------------------------------------------------------- - * - * commit_protocol.c - * This file contains functions for managing 1PC or 2PC transactions - * across many shard placements. - * - * Copyright (c) 2016, Citus Data, Inc. - * - *------------------------------------------------------------------------- - */ - - -#include "postgres.h" -#include "libpq-fe.h" -#include "miscadmin.h" - -#include "distributed/commit_protocol.h" -#include "distributed/connection_cache.h" -#include "distributed/master_metadata_utility.h" -#include "distributed/multi_shard_transaction.h" -#include "lib/stringinfo.h" -#include "nodes/pg_list.h" - - -/* Local functions forward declarations */ -static uint32 DistributedTransactionId = 0; - - -/* Local functions forward declarations */ -static StringInfo BuildTransactionName(int connectionId); - - -/* - * InitializeDistributedTransaction prepares the distributed transaction ID - * used in transaction names. - */ -void -InitializeDistributedTransaction(void) -{ - DistributedTransactionId++; -} - - -/* - * PrepareRemoteTransactions prepares all transactions on connections in - * connectionList for commit if the 2PC commit protocol is enabled. - * On failure, it reports an error and stops. - */ -void -PrepareRemoteTransactions(List *connectionList) -{ - ListCell *connectionCell = NULL; - - foreach(connectionCell, connectionList) - { - TransactionConnection *transactionConnection = - (TransactionConnection *) lfirst(connectionCell); - PGconn *connection = transactionConnection->connection; - int64 connectionId = transactionConnection->connectionId; - - PGresult *result = NULL; - StringInfo command = makeStringInfo(); - StringInfo transactionName = BuildTransactionName(connectionId); - - appendStringInfo(command, "PREPARE TRANSACTION '%s'", transactionName->data); - - result = PQexec(connection, command->data); - if (PQresultStatus(result) != PGRES_COMMAND_OK) - { - /* a failure to prepare is an implicit rollback */ - transactionConnection->transactionState = TRANSACTION_STATE_CLOSED; - - WarnRemoteError(connection, result); - PQclear(result); - - ereport(ERROR, (errcode(ERRCODE_IO_ERROR), - errmsg("failed to prepare transaction"))); - } - - ereport(DEBUG2, (errmsg("sent PREPARE TRANSACTION over connection %ld", - connectionId))); - - PQclear(result); - - transactionConnection->transactionState = TRANSACTION_STATE_PREPARED; - } -} - - -/* - * AbortRemoteTransactions aborts all transactions on connections in connectionList. - * On failure, it reports a warning and continues to abort all of them. - */ -void -AbortRemoteTransactions(List *connectionList) -{ - ListCell *connectionCell = NULL; - - foreach(connectionCell, connectionList) - { - TransactionConnection *transactionConnection = - (TransactionConnection *) lfirst(connectionCell); - PGconn *connection = transactionConnection->connection; - int64 connectionId = transactionConnection->connectionId; - PGresult *result = NULL; - - if (transactionConnection->transactionState == TRANSACTION_STATE_PREPARED) - { - StringInfo command = makeStringInfo(); - StringInfo transactionName = BuildTransactionName(connectionId); - - appendStringInfo(command, "ROLLBACK PREPARED '%s'", transactionName->data); - - result = PQexec(connection, command->data); - if (PQresultStatus(result) != PGRES_COMMAND_OK) - { - char *nodeName = ConnectionGetOptionValue(connection, "host"); - char *nodePort = ConnectionGetOptionValue(connection, "port"); - - /* log a warning so the user may abort the transaction later */ - ereport(WARNING, (errmsg("failed to roll back prepared transaction '%s'", - transactionName->data), - errhint("Run \"%s\" on %s:%s", - command->data, nodeName, nodePort))); - } - - ereport(DEBUG2, (errmsg("sent ROLLBACK over connection %ld", connectionId))); - - PQclear(result); - } - else if (transactionConnection->transactionState == TRANSACTION_STATE_OPEN) - { - /* try to roll back cleanly, if it fails then we won't commit anyway */ - result = PQexec(connection, "ROLLBACK"); - PQclear(result); - } - - transactionConnection->transactionState = TRANSACTION_STATE_CLOSED; - } -} - - -/* - * CommitRemoteTransactions commits all transactions on connections in connectionList. - * If stopOnFailure is true, then CommitRemoteTransactions reports an error on - * failure, otherwise it reports a warning. - * Note that if the caller of this function wants the transactions to roll back - * on a failing commit, stopOnFailure should be used as true. On the other hand, - * if the caller does not want the transactions to roll back on a failing commit, - * stopOnFailure should be used as false. - */ -void -CommitRemoteTransactions(List *connectionList, bool stopOnFailure) -{ - ListCell *connectionCell = NULL; - - foreach(connectionCell, connectionList) - { - TransactionConnection *transactionConnection = - (TransactionConnection *) lfirst(connectionCell); - PGconn *connection = transactionConnection->connection; - int64 connectionId = transactionConnection->connectionId; - PGresult *result = NULL; - - if (transactionConnection->transactionState == TRANSACTION_STATE_PREPARED) - { - StringInfo command = makeStringInfo(); - StringInfo transactionName = BuildTransactionName(connectionId); - - /* we shouldn't be committing if any transactions are not prepared */ - Assert(transactionConnection->transactionState == TRANSACTION_STATE_PREPARED); - - appendStringInfo(command, "COMMIT PREPARED '%s'", transactionName->data); - - result = PQexec(connection, command->data); - if (PQresultStatus(result) != PGRES_COMMAND_OK) - { - char *nodeName = ConnectionGetOptionValue(connection, "host"); - char *nodePort = ConnectionGetOptionValue(connection, "port"); - - /* - * If stopOnFailure is false, log a warning so the user may - * commit the transaction later. - */ - if (stopOnFailure) - { - ereport(ERROR, (errmsg("failed to commit prepared transaction '%s'", - transactionName->data), - errhint("Run \"%s\" on %s:%s", - command->data, nodeName, nodePort))); - } - else - { - ereport(WARNING, (errmsg("failed to commit prepared transaction '%s'", - transactionName->data), - errhint("Run \"%s\" on %s:%s", - command->data, nodeName, nodePort))); - } - } - - ereport(DEBUG2, (errmsg("sent COMMIT PREPARED over connection %ld", - connectionId))); - } - else - { - /* we shouldn't be committing if any transactions are not open */ - Assert(transactionConnection->transactionState == TRANSACTION_STATE_OPEN); - - /* - * Try to commit, if it fails and stopOnFailure is false then - * the user might lose data. - */ - result = PQexec(connection, "COMMIT"); - if (PQresultStatus(result) != PGRES_COMMAND_OK) - { - char *nodeName = ConnectionGetOptionValue(connection, "host"); - char *nodePort = ConnectionGetOptionValue(connection, "port"); - - if (stopOnFailure) - { - ereport(ERROR, (errmsg("failed to commit transaction on %s:%s", - nodeName, nodePort))); - } - else - { - ereport(WARNING, (errmsg("failed to commit transaction on %s:%s", - nodeName, nodePort))); - } - } - - ereport(DEBUG2, (errmsg("sent COMMIT over connection %ld", connectionId))); - } - - PQclear(result); - - transactionConnection->transactionState = TRANSACTION_STATE_CLOSED; - } -} - - -/* - * BuildTransactionName constructs a transaction name that ensures there are no - * collisions with concurrent transactions by the same master node, subsequent - * transactions by the same backend, or transactions on a different shard. - * - * Collisions may occur over time if transactions fail to commit or abort and - * are left to linger. This would cause a PREPARE failure for the second - * transaction, which causes it to be rolled back. In general, the user - * should ensure that prepared transactions do not linger. - */ -static StringInfo -BuildTransactionName(int connectionId) -{ - StringInfo commandString = makeStringInfo(); - - appendStringInfo(commandString, "citus_%d_%u_%d", MyProcPid, - DistributedTransactionId, connectionId); - - return commandString; -} diff --git a/src/backend/distributed/transaction/multi_shard_transaction.c b/src/backend/distributed/transaction/multi_shard_transaction.c deleted file mode 100644 index cfbb44ec3..000000000 --- a/src/backend/distributed/transaction/multi_shard_transaction.c +++ /dev/null @@ -1,341 +0,0 @@ -/*------------------------------------------------------------------------- - * - * multi_shard_transaction.c - * This file contains functions for managing 1PC or 2PC transactions - * across many shard placements. - * - * Copyright (c) 2016, Citus Data, Inc. - * - *------------------------------------------------------------------------- - */ - - -#include "libpq-fe.h" -#include "postgres.h" - -#include "distributed/commit_protocol.h" -#include "distributed/connection_cache.h" -#include "distributed/master_metadata_utility.h" -#include "distributed/multi_shard_transaction.h" -#include "nodes/pg_list.h" -#include "storage/ipc.h" -#include "utils/memutils.h" - - -#define INITIAL_CONNECTION_CACHE_SIZE 1001 - - -/* Global variables used in commit handler */ -static HTAB *shardConnectionHash = NULL; -static bool subXactAbortAttempted = false; - -/* functions needed by callbacks and hooks */ -static void CompleteShardPlacementTransactions(XactEvent event, void *arg); -static void MultiShardSubXactCallback(SubXactEvent event, SubTransactionId subId, - SubTransactionId parentSubid, void *arg); - - -/* - * OpenTransactionsToAllShardPlacements opens connections to all placements - * using the provided shard identifier list. Connections accumulate in a global - * shardConnectionHash variable for use (and re-use) within this transaction. - */ -void -OpenTransactionsToAllShardPlacements(List *shardIntervalList, char *userName) -{ - ListCell *shardIntervalCell = NULL; - - if (shardConnectionHash == NULL) - { - shardConnectionHash = CreateShardConnectionHash(TopTransactionContext); - } - - foreach(shardIntervalCell, shardIntervalList) - { - ShardInterval *shardInterval = (ShardInterval *) lfirst(shardIntervalCell); - uint64 shardId = shardInterval->shardId; - - BeginTransactionOnShardPlacements(shardId, userName); - } -} - - -/* - * CreateShardConnectionHash constructs a hash table which maps from shard - * identifier to connection lists, passing the provided MemoryContext to - * hash_create for hash allocations. - */ -HTAB * -CreateShardConnectionHash(MemoryContext memoryContext) -{ - HTAB *shardConnectionsHash = NULL; - int hashFlags = 0; - HASHCTL info; - - memset(&info, 0, sizeof(info)); - info.keysize = sizeof(int64); - info.entrysize = sizeof(ShardConnections); - info.hcxt = memoryContext; - hashFlags = (HASH_ELEM | HASH_CONTEXT | HASH_BLOBS); - - shardConnectionsHash = hash_create("Shard Connections Hash", - INITIAL_CONNECTION_CACHE_SIZE, &info, - hashFlags); - - return shardConnectionsHash; -} - - -/* - * BeginTransactionOnShardPlacements opens new connections (if necessary) to - * all placements of a shard (specified by shard identifier). After sending a - * BEGIN command on all connections, they are added to shardConnectionHash for - * use within this transaction. Exits early if connections already exist for - * the specified shard, and errors if no placements can be found, a connection - * cannot be made, or if the BEGIN command fails. - */ -void -BeginTransactionOnShardPlacements(uint64 shardId, char *userName) -{ - List *shardPlacementList = NIL; - ListCell *placementCell = NULL; - - ShardConnections *shardConnections = NULL; - bool shardConnectionsFound = false; - - MemoryContext oldContext = NULL; - shardPlacementList = FinalizedShardPlacementList(shardId); - - if (shardPlacementList == NIL) - { - /* going to have to have some placements to do any work */ - ereport(ERROR, (errmsg("could not find any shard placements for the shard " - UINT64_FORMAT, shardId))); - } - - /* get existing connections to the shard placements, if any */ - shardConnections = GetShardConnections(shardId, &shardConnectionsFound); - if (shardConnectionsFound) - { - /* exit early if we've already established shard transactions */ - return; - } - - foreach(placementCell, shardPlacementList) - { - ShardPlacement *shardPlacement = (ShardPlacement *) lfirst(placementCell); - PGconn *connection = NULL; - TransactionConnection *transactionConnection = NULL; - PGresult *result = NULL; - - connection = ConnectToNode(shardPlacement->nodeName, shardPlacement->nodePort, - userName); - - if (connection == NULL) - { - ereport(ERROR, (errmsg("could not establish a connection to all " - "placements of shard %lu", shardId))); - } - - /* entries must last through the whole top-level transaction */ - oldContext = MemoryContextSwitchTo(TopTransactionContext); - - transactionConnection = palloc0(sizeof(TransactionConnection)); - - transactionConnection->connectionId = shardConnections->shardId; - transactionConnection->transactionState = TRANSACTION_STATE_INVALID; - transactionConnection->connection = connection; - - shardConnections->connectionList = lappend(shardConnections->connectionList, - transactionConnection); - - MemoryContextSwitchTo(oldContext); - - /* now that connection is tracked, issue BEGIN */ - result = PQexec(connection, "BEGIN"); - if (PQresultStatus(result) != PGRES_COMMAND_OK) - { - ReraiseRemoteError(connection, result); - } - } -} - - -/* - * GetShardConnections finds existing connections for a shard in the global - * connection hash. If not found, then a ShardConnections structure with empty - * connectionList is returned and the shardConnectionsFound output parameter - * will be set to false. - */ -ShardConnections * -GetShardConnections(int64 shardId, bool *shardConnectionsFound) -{ - return GetShardHashConnections(shardConnectionHash, shardId, shardConnectionsFound); -} - - -/* - * GetShardHashConnections finds existing connections for a shard in the - * provided hash. If not found, then a ShardConnections structure with empty - * connectionList is returned. - */ -ShardConnections * -GetShardHashConnections(HTAB *connectionHash, int64 shardId, bool *connectionsFound) -{ - ShardConnections *shardConnections = NULL; - - shardConnections = (ShardConnections *) hash_search(connectionHash, &shardId, - HASH_ENTER, connectionsFound); - if (!*connectionsFound) - { - shardConnections->shardId = shardId; - shardConnections->connectionList = NIL; - } - - return shardConnections; -} - - -/* - * ConnectionList flattens the connection hash to a list of placement connections. - */ -List * -ConnectionList(HTAB *connectionHash) -{ - List *connectionList = NIL; - HASH_SEQ_STATUS status; - ShardConnections *shardConnections = NULL; - - if (connectionHash == NULL) - { - return NIL; - } - - hash_seq_init(&status, connectionHash); - - shardConnections = (ShardConnections *) hash_seq_search(&status); - while (shardConnections != NULL) - { - List *shardConnectionsList = list_copy(shardConnections->connectionList); - connectionList = list_concat(connectionList, shardConnectionsList); - - shardConnections = (ShardConnections *) hash_seq_search(&status); - } - - return connectionList; -} - - -/* - * RegisterShardPlacementXactCallbacks registers transaction callbacks needed - * for multi-shard transactions. - */ -void -RegisterShardPlacementXactCallbacks(void) -{ - RegisterXactCallback(CompleteShardPlacementTransactions, NULL); - RegisterSubXactCallback(MultiShardSubXactCallback, NULL); -} - - -/* - * CompleteShardPlacementTransactions commits or aborts pending shard placement - * transactions when the local transaction commits or aborts. - */ -static void -CompleteShardPlacementTransactions(XactEvent event, void *arg) -{ - List *connectionList = ConnectionList(shardConnectionHash); - - if (shardConnectionHash == NULL) - { - /* nothing to do */ - return; - } - - if (event == XACT_EVENT_PRE_COMMIT) - { - if (subXactAbortAttempted) - { - subXactAbortAttempted = false; - - ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("cannot ROLLBACK TO SAVEPOINT in transactions " - "which modify distributed tables"))); - } - - /* - * Any failure here will cause local changes to be rolled back, - * and remote changes to either roll back (1PC) or, in case of - * connection or node failure, leave a prepared transaction - * (2PC). - */ - - if (MultiShardCommitProtocol == COMMIT_PROTOCOL_2PC) - { - PrepareRemoteTransactions(connectionList); - } - - return; - } - else if (event == XACT_EVENT_COMMIT) - { - /* - * A failure here will cause some remote changes to either - * roll back (1PC) or, in case of connection or node failure, - * leave a prepared transaction (2PC). However, the local - * changes have already been committed. - */ - - CommitRemoteTransactions(connectionList, false); - } - else if (event == XACT_EVENT_ABORT) - { - /* - * A failure here will cause some remote changes to either - * roll back (1PC) or, in case of connection or node failure, - * leave a prepared transaction (2PC). The local changes have - * already been rolled back. - */ - - AbortRemoteTransactions(connectionList); - } - else - { - return; - } - - CloseConnections(connectionList); - shardConnectionHash = NULL; - subXactAbortAttempted = false; -} - - -static void -MultiShardSubXactCallback(SubXactEvent event, SubTransactionId subId, - SubTransactionId parentSubid, void *arg) -{ - if ((shardConnectionHash != NULL) && (event == SUBXACT_EVENT_ABORT_SUB)) - { - subXactAbortAttempted = true; - } -} - - -/* - * CloseConnections closes all connections in connectionList. - */ -void -CloseConnections(List *connectionList) -{ - ListCell *connectionCell = NULL; - - foreach(connectionCell, connectionList) - { - TransactionConnection *transactionConnection = - (TransactionConnection *) lfirst(connectionCell); - PGconn *connection = transactionConnection->connection; - - PQfinish(connection); - } -} diff --git a/src/include/distributed/commit_protocol.h b/src/include/distributed/commit_protocol.h deleted file mode 100644 index 43fbb6328..000000000 --- a/src/include/distributed/commit_protocol.h +++ /dev/null @@ -1,51 +0,0 @@ -/*------------------------------------------------------------------------- - * - * commit_protocol.h - * Type and function declarations used in performing transactions across - * shard placements. - * - * Copyright (c) 2016, Citus Data, Inc. - * - *------------------------------------------------------------------------- - */ - -#ifndef COMMIT_PROTOCOL_H -#define COMMIT_PROTOCOL_H - - -#include "access/xact.h" -#include "distributed/connection_management.h" -#include "libpq-fe.h" -#include "lib/stringinfo.h" -#include "nodes/pg_list.h" - - -/* Enumeration that defines different remote transaction states */ -typedef enum -{ - TRANSACTION_STATE_INVALID = 0, - TRANSACTION_STATE_OPEN, - TRANSACTION_STATE_COPY_STARTED, - TRANSACTION_STATE_PREPARED, - TRANSACTION_STATE_CLOSED -} TransactionState; - -/* - * TransactionConnection represents a connection to a remote node which is - * used to perform a transaction on shard placements. - */ -typedef struct TransactionConnection -{ - int64 connectionId; - TransactionState transactionState; - PGconn *connection; -} TransactionConnection; - - -/* Functions declarations for transaction and connection management */ -extern void InitializeDistributedTransaction(void); -extern void PrepareRemoteTransactions(List *connectionList); -extern void AbortRemoteTransactions(List *connectionList); -extern void CommitRemoteTransactions(List *connectionList, bool stopOnFailure); - -#endif /* COMMIT_PROTOCOL_H */ diff --git a/src/include/distributed/multi_shard_transaction.h b/src/include/distributed/multi_shard_transaction.h deleted file mode 100644 index 4b5ec354a..000000000 --- a/src/include/distributed/multi_shard_transaction.h +++ /dev/null @@ -1,39 +0,0 @@ -/*------------------------------------------------------------------------- - * - * multi_shard_transaction.h - * Type and function declarations used in performing transactions across - * shard placements. - * - * Copyright (c) 2016, Citus Data, Inc. - * - *------------------------------------------------------------------------- - */ - -#ifndef MULTI_SHARD_TRANSACTION_H -#define MULTI_SHARD_TRANSACTION_H - - -#include "utils/hsearch.h" -#include "nodes/pg_list.h" - - -/* ShardConnections represents a set of connections for each placement of a shard */ -typedef struct ShardConnections -{ - int64 shardId; - List *connectionList; -} ShardConnections; - - -extern void OpenTransactionsToAllShardPlacements(List *shardIdList, char *relationOwner); -extern HTAB * CreateShardConnectionHash(MemoryContext memoryContext); -extern void BeginTransactionOnShardPlacements(uint64 shardId, char *nodeUser); -extern ShardConnections * GetShardConnections(int64 shardId, bool *shardConnectionsFound); -extern ShardConnections * GetShardHashConnections(HTAB *connectionHash, int64 shardId, - bool *connectionsFound); -extern List * ConnectionList(HTAB *connectionHash); -extern void CloseConnections(List *connectionList); -extern void RegisterShardPlacementXactCallbacks(void); - - -#endif /* MULTI_SHARD_TRANSACTION_H */