From 6ec34bed84b42653e05cbe7249c0c81876d885a9 Mon Sep 17 00:00:00 2001 From: Andres Freund Date: Fri, 20 Jan 2017 22:27:33 -0800 Subject: [PATCH] Remove remnants of commit_protocol.[ch]. --- .../executor/multi_router_executor.c | 1 - .../distributed/executor/multi_utility.c | 1 - .../master/master_modify_multiple_shards.c | 1 - src/backend/distributed/shared_library_init.c | 1 - .../distributed/transaction/commit_protocol.c | 256 ------------------ .../transaction/multi_shard_transaction.c | 20 -- .../transaction/worker_transaction.c | 1 - src/include/distributed/commit_protocol.h | 55 ---- .../distributed/multi_shard_transaction.h | 1 - 9 files changed, 337 deletions(-) delete mode 100644 src/backend/distributed/transaction/commit_protocol.c delete mode 100644 src/include/distributed/commit_protocol.h diff --git a/src/backend/distributed/executor/multi_router_executor.c b/src/backend/distributed/executor/multi_router_executor.c index 9067d748f..337494d3d 100644 --- a/src/backend/distributed/executor/multi_router_executor.c +++ b/src/backend/distributed/executor/multi_router_executor.c @@ -28,7 +28,6 @@ #include "catalog/pg_type.h" #include "distributed/citus_clauses.h" #include "distributed/citus_ruleutils.h" -#include "distributed/commit_protocol.h" #include "distributed/connection_cache.h" #include "distributed/connection_management.h" #include "distributed/deparse_shard_query.h" diff --git a/src/backend/distributed/executor/multi_utility.c b/src/backend/distributed/executor/multi_utility.c index 613df3adc..f0b81e40a 100644 --- a/src/backend/distributed/executor/multi_utility.c +++ b/src/backend/distributed/executor/multi_utility.c @@ -32,7 +32,6 @@ #include "commands/prepare.h" #include "distributed/citus_ruleutils.h" #include "distributed/colocation_utils.h" -#include "distributed/commit_protocol.h" #include "distributed/connection_cache.h" #include "distributed/master_metadata_utility.h" #include "distributed/master_protocol.h" diff --git a/src/backend/distributed/master/master_modify_multiple_shards.c b/src/backend/distributed/master/master_modify_multiple_shards.c index 2c7378466..8fc1abe0b 100644 --- a/src/backend/distributed/master/master_modify_multiple_shards.c +++ b/src/backend/distributed/master/master_modify_multiple_shards.c @@ -26,7 +26,6 @@ #include "commands/event_trigger.h" #include "distributed/citus_clauses.h" #include "distributed/citus_ruleutils.h" -#include "distributed/commit_protocol.h" #include "distributed/connection_cache.h" #include "distributed/listutils.h" #include "distributed/master_metadata_utility.h" diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index c1452392c..fae01da04 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -20,7 +20,6 @@ #include "executor/executor.h" #include "distributed/citus_nodefuncs.h" #include "distributed/connection_management.h" -#include "distributed/commit_protocol.h" #include "distributed/connection_management.h" #include "distributed/master_protocol.h" #include "distributed/multi_copy.h" diff --git a/src/backend/distributed/transaction/commit_protocol.c b/src/backend/distributed/transaction/commit_protocol.c deleted file mode 100644 index dd81d6f5c..000000000 --- a/src/backend/distributed/transaction/commit_protocol.c +++ /dev/null @@ -1,256 +0,0 @@ -/*------------------------------------------------------------------------- - * - * commit_protocol.c - * This file contains functions for managing 1PC or 2PC transactions - * across many shard placements. - * - * Copyright (c) 2016, Citus Data, Inc. - * - *------------------------------------------------------------------------- - */ - - -#include "postgres.h" -#include "libpq-fe.h" -#include "miscadmin.h" - -#include "distributed/commit_protocol.h" -#include "distributed/connection_cache.h" -#include "distributed/master_metadata_utility.h" -#include "distributed/multi_shard_transaction.h" -#include "lib/stringinfo.h" -#include "nodes/pg_list.h" - - -/* Local functions forward declarations */ -static uint32 DistributedTransactionId = 0; - - -/* - * InitializeDistributedTransaction prepares the distributed transaction ID - * used in transaction names. - */ -void -InitializeDistributedTransaction(void) -{ - DistributedTransactionId++; -} - - -/* - * PrepareRemoteTransactions prepares all transactions on connections in - * connectionList for commit if the 2PC commit protocol is enabled. - * On failure, it reports an error and stops. - */ -void -PrepareRemoteTransactions(List *connectionList) -{ - ListCell *connectionCell = NULL; - - foreach(connectionCell, connectionList) - { - TransactionConnection *transactionConnection = - (TransactionConnection *) lfirst(connectionCell); - PGconn *connection = transactionConnection->connection; - int64 connectionId = transactionConnection->connectionId; - - PGresult *result = NULL; - StringInfo command = makeStringInfo(); - StringInfo transactionName = BuildTransactionName(connectionId); - - appendStringInfo(command, "PREPARE TRANSACTION '%s'", transactionName->data); - - result = PQexec(connection, command->data); - if (PQresultStatus(result) != PGRES_COMMAND_OK) - { - /* a failure to prepare is an implicit rollback */ - transactionConnection->transactionState = TRANSACTION_STATE_CLOSED; - - WarnRemoteError(connection, result); - PQclear(result); - - ereport(ERROR, (errcode(ERRCODE_IO_ERROR), - errmsg("failed to prepare transaction"))); - } - - ereport(DEBUG2, (errmsg("sent PREPARE TRANSACTION over connection %ld", - connectionId))); - - PQclear(result); - - transactionConnection->transactionState = TRANSACTION_STATE_PREPARED; - } -} - - -/* - * AbortRemoteTransactions aborts all transactions on connections in connectionList. - * On failure, it reports a warning and continues to abort all of them. - */ -void -AbortRemoteTransactions(List *connectionList) -{ - ListCell *connectionCell = NULL; - - foreach(connectionCell, connectionList) - { - TransactionConnection *transactionConnection = - (TransactionConnection *) lfirst(connectionCell); - PGconn *connection = transactionConnection->connection; - int64 connectionId = transactionConnection->connectionId; - PGresult *result = NULL; - - if (transactionConnection->transactionState == TRANSACTION_STATE_PREPARED) - { - StringInfo command = makeStringInfo(); - StringInfo transactionName = BuildTransactionName(connectionId); - - appendStringInfo(command, "ROLLBACK PREPARED '%s'", transactionName->data); - - result = PQexec(connection, command->data); - if (PQresultStatus(result) != PGRES_COMMAND_OK) - { - char *nodeName = ConnectionGetOptionValue(connection, "host"); - char *nodePort = ConnectionGetOptionValue(connection, "port"); - - /* log a warning so the user may abort the transaction later */ - ereport(WARNING, (errmsg("failed to roll back prepared transaction '%s'", - transactionName->data), - errhint("Run \"%s\" on %s:%s", - command->data, nodeName, nodePort))); - } - - ereport(DEBUG2, (errmsg("sent ROLLBACK over connection %ld", connectionId))); - - PQclear(result); - } - else if (transactionConnection->transactionState == TRANSACTION_STATE_OPEN) - { - /* try to roll back cleanly, if it fails then we won't commit anyway */ - result = PQexec(connection, "ROLLBACK"); - PQclear(result); - } - - transactionConnection->transactionState = TRANSACTION_STATE_CLOSED; - } -} - - -/* - * CommitRemoteTransactions commits all transactions on connections in connectionList. - * If stopOnFailure is true, then CommitRemoteTransactions reports an error on - * failure, otherwise it reports a warning. - * Note that if the caller of this function wants the transactions to roll back - * on a failing commit, stopOnFailure should be used as true. On the other hand, - * if the caller does not want the transactions to roll back on a failing commit, - * stopOnFailure should be used as false. - */ -void -CommitRemoteTransactions(List *connectionList, bool stopOnFailure) -{ - ListCell *connectionCell = NULL; - - foreach(connectionCell, connectionList) - { - TransactionConnection *transactionConnection = - (TransactionConnection *) lfirst(connectionCell); - PGconn *connection = transactionConnection->connection; - int64 connectionId = transactionConnection->connectionId; - PGresult *result = NULL; - - if (transactionConnection->transactionState == TRANSACTION_STATE_PREPARED) - { - StringInfo command = makeStringInfo(); - StringInfo transactionName = BuildTransactionName(connectionId); - - /* we shouldn't be committing if any transactions are not prepared */ - Assert(transactionConnection->transactionState == TRANSACTION_STATE_PREPARED); - - appendStringInfo(command, "COMMIT PREPARED '%s'", transactionName->data); - - result = PQexec(connection, command->data); - if (PQresultStatus(result) != PGRES_COMMAND_OK) - { - char *nodeName = ConnectionGetOptionValue(connection, "host"); - char *nodePort = ConnectionGetOptionValue(connection, "port"); - - /* - * If stopOnFailure is false, log a warning so the user may - * commit the transaction later. - */ - if (stopOnFailure) - { - ereport(ERROR, (errmsg("failed to commit prepared transaction '%s'", - transactionName->data), - errhint("Run \"%s\" on %s:%s", - command->data, nodeName, nodePort))); - } - else - { - ereport(WARNING, (errmsg("failed to commit prepared transaction '%s'", - transactionName->data), - errhint("Run \"%s\" on %s:%s", - command->data, nodeName, nodePort))); - } - } - - ereport(DEBUG2, (errmsg("sent COMMIT PREPARED over connection %ld", - connectionId))); - } - else - { - /* we shouldn't be committing if any transactions are not open */ - Assert(transactionConnection->transactionState == TRANSACTION_STATE_OPEN); - - /* - * Try to commit, if it fails and stopOnFailure is false then - * the user might lose data. - */ - result = PQexec(connection, "COMMIT"); - if (PQresultStatus(result) != PGRES_COMMAND_OK) - { - char *nodeName = ConnectionGetOptionValue(connection, "host"); - char *nodePort = ConnectionGetOptionValue(connection, "port"); - - if (stopOnFailure) - { - ereport(ERROR, (errmsg("failed to commit transaction on %s:%s", - nodeName, nodePort))); - } - else - { - ereport(WARNING, (errmsg("failed to commit transaction on %s:%s", - nodeName, nodePort))); - } - } - - ereport(DEBUG2, (errmsg("sent COMMIT over connection %ld", connectionId))); - } - - PQclear(result); - - transactionConnection->transactionState = TRANSACTION_STATE_CLOSED; - } -} - - -/* - * BuildTransactionName constructs a transaction name that ensures there are no - * collisions with concurrent transactions by the same master node, subsequent - * transactions by the same backend, or transactions on a different shard. - * - * Collisions may occur over time if transactions fail to commit or abort and - * are left to linger. This would cause a PREPARE failure for the second - * transaction, which causes it to be rolled back. In general, the user - * should ensure that prepared transactions do not linger. - */ -StringInfo -BuildTransactionName(int connectionId) -{ - StringInfo commandString = makeStringInfo(); - - appendStringInfo(commandString, "citus_%d_%u_%d", MyProcPid, - DistributedTransactionId, connectionId); - - return commandString; -} diff --git a/src/backend/distributed/transaction/multi_shard_transaction.c b/src/backend/distributed/transaction/multi_shard_transaction.c index 7e43ac595..3f18ecad3 100644 --- a/src/backend/distributed/transaction/multi_shard_transaction.c +++ b/src/backend/distributed/transaction/multi_shard_transaction.c @@ -14,7 +14,6 @@ #include "postgres.h" #include "distributed/colocation_utils.h" -#include "distributed/commit_protocol.h" #include "distributed/connection_cache.h" #include "distributed/connection_management.h" #include "distributed/master_metadata_utility.h" @@ -250,22 +249,3 @@ ResetShardPlacementTransactionState(void) SavedMultiShardCommitProtocol = COMMIT_PROTOCOL_BARE; } } - - -/* - * CloseConnections closes all connections in connectionList. - */ -void -CloseConnections(List *connectionList) -{ - ListCell *connectionCell = NULL; - - foreach(connectionCell, connectionList) - { - TransactionConnection *transactionConnection = - (TransactionConnection *) lfirst(connectionCell); - PGconn *connection = transactionConnection->connection; - - CloseConnectionByPGconn(connection); - } -} diff --git a/src/backend/distributed/transaction/worker_transaction.c b/src/backend/distributed/transaction/worker_transaction.c index 8fd0e5395..8a9ea8822 100644 --- a/src/backend/distributed/transaction/worker_transaction.c +++ b/src/backend/distributed/transaction/worker_transaction.c @@ -18,7 +18,6 @@ #include #include "access/xact.h" -#include "distributed/commit_protocol.h" #include "distributed/connection_cache.h" #include "distributed/connection_management.h" #include "distributed/metadata_cache.h" diff --git a/src/include/distributed/commit_protocol.h b/src/include/distributed/commit_protocol.h deleted file mode 100644 index 5805dfe9f..000000000 --- a/src/include/distributed/commit_protocol.h +++ /dev/null @@ -1,55 +0,0 @@ -/*------------------------------------------------------------------------- - * - * commit_protocol.h - * Type and function declarations used in performing transactions across - * shard placements. - * - * Copyright (c) 2016, Citus Data, Inc. - * - *------------------------------------------------------------------------- - */ - -#ifndef COMMIT_PROTOCOL_H -#define COMMIT_PROTOCOL_H - - -#include "access/xact.h" -#include "distributed/connection_management.h" -#include "libpq-fe.h" -#include "lib/stringinfo.h" -#include "nodes/pg_list.h" - - -/* Enumeration that defines different remote transaction states */ -typedef enum -{ - TRANSACTION_STATE_INVALID = 0, - TRANSACTION_STATE_OPEN, - TRANSACTION_STATE_COPY_STARTED, - TRANSACTION_STATE_PREPARED, - TRANSACTION_STATE_CLOSED -} TransactionState; - -/* - * TransactionConnection represents a connection to a remote node which is - * used to perform a transaction on shard placements. - */ -typedef struct TransactionConnection -{ - int groupId; - int64 connectionId; - TransactionState transactionState; - PGconn *connection; - const char *nodeName; - int nodePort; -} TransactionConnection; - - -/* Functions declarations for transaction and connection management */ -extern void InitializeDistributedTransaction(void); -extern void PrepareRemoteTransactions(List *connectionList); -extern void AbortRemoteTransactions(List *connectionList); -extern void CommitRemoteTransactions(List *connectionList, bool stopOnFailure); -extern StringInfo BuildTransactionName(int connectionId); - -#endif /* COMMIT_PROTOCOL_H */ diff --git a/src/include/distributed/multi_shard_transaction.h b/src/include/distributed/multi_shard_transaction.h index c6c66bc46..36c6b51cc 100644 --- a/src/include/distributed/multi_shard_transaction.h +++ b/src/include/distributed/multi_shard_transaction.h @@ -33,7 +33,6 @@ extern ShardConnections * GetShardConnections(int64 shardId, bool *shardConnecti extern ShardConnections * GetShardHashConnections(HTAB *connectionHash, int64 shardId, bool *connectionsFound); extern List * ShardConnectionList(HTAB *connectionHash); -extern void CloseConnections(List *connectionList); extern void ResetShardPlacementTransactionState(void);