Remove obsoleted transaction infrastructure.

pull/775/head
Andres Freund 2016-10-07 17:09:55 -07:00
parent a8f9e983a0
commit 3b9826e6e9
5 changed files with 0 additions and 696 deletions

View File

@ -18,7 +18,6 @@
#include "commands/explain.h"
#include "executor/executor.h"
#include "distributed/commit_protocol.h"
#include "distributed/connection_management.h"
#include "distributed/master_protocol.h"
#include "distributed/multi_copy.h"
@ -30,7 +29,6 @@
#include "distributed/multi_router_executor.h"
#include "distributed/multi_router_planner.h"
#include "distributed/multi_server_executor.h"
#include "distributed/multi_shard_transaction.h"
#include "distributed/multi_utility.h"
#include "distributed/placement_connection.h"
#include "distributed/remote_commands.h"
@ -158,9 +156,6 @@ _PG_init(void)
InitializeTransactionManagement();
InitializeConnectionManagement();
InitPlacementConnectionManagement();
/* initialize transaction callbacks */
RegisterShardPlacementXactCallbacks();
}

View File

@ -1,260 +0,0 @@
/*-------------------------------------------------------------------------
*
* commit_protocol.c
* This file contains functions for managing 1PC or 2PC transactions
* across many shard placements.
*
* Copyright (c) 2016, Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "libpq-fe.h"
#include "miscadmin.h"
#include "distributed/commit_protocol.h"
#include "distributed/connection_cache.h"
#include "distributed/master_metadata_utility.h"
#include "distributed/multi_shard_transaction.h"
#include "lib/stringinfo.h"
#include "nodes/pg_list.h"
/* Local functions forward declarations */
static uint32 DistributedTransactionId = 0;
/* Local functions forward declarations */
static StringInfo BuildTransactionName(int connectionId);
/*
* InitializeDistributedTransaction prepares the distributed transaction ID
* used in transaction names.
*/
void
InitializeDistributedTransaction(void)
{
DistributedTransactionId++;
}
/*
* PrepareRemoteTransactions prepares all transactions on connections in
* connectionList for commit if the 2PC commit protocol is enabled.
* On failure, it reports an error and stops.
*/
void
PrepareRemoteTransactions(List *connectionList)
{
ListCell *connectionCell = NULL;
foreach(connectionCell, connectionList)
{
TransactionConnection *transactionConnection =
(TransactionConnection *) lfirst(connectionCell);
PGconn *connection = transactionConnection->connection;
int64 connectionId = transactionConnection->connectionId;
PGresult *result = NULL;
StringInfo command = makeStringInfo();
StringInfo transactionName = BuildTransactionName(connectionId);
appendStringInfo(command, "PREPARE TRANSACTION '%s'", transactionName->data);
result = PQexec(connection, command->data);
if (PQresultStatus(result) != PGRES_COMMAND_OK)
{
/* a failure to prepare is an implicit rollback */
transactionConnection->transactionState = TRANSACTION_STATE_CLOSED;
WarnRemoteError(connection, result);
PQclear(result);
ereport(ERROR, (errcode(ERRCODE_IO_ERROR),
errmsg("failed to prepare transaction")));
}
ereport(DEBUG2, (errmsg("sent PREPARE TRANSACTION over connection %ld",
connectionId)));
PQclear(result);
transactionConnection->transactionState = TRANSACTION_STATE_PREPARED;
}
}
/*
* AbortRemoteTransactions aborts all transactions on connections in connectionList.
* On failure, it reports a warning and continues to abort all of them.
*/
void
AbortRemoteTransactions(List *connectionList)
{
ListCell *connectionCell = NULL;
foreach(connectionCell, connectionList)
{
TransactionConnection *transactionConnection =
(TransactionConnection *) lfirst(connectionCell);
PGconn *connection = transactionConnection->connection;
int64 connectionId = transactionConnection->connectionId;
PGresult *result = NULL;
if (transactionConnection->transactionState == TRANSACTION_STATE_PREPARED)
{
StringInfo command = makeStringInfo();
StringInfo transactionName = BuildTransactionName(connectionId);
appendStringInfo(command, "ROLLBACK PREPARED '%s'", transactionName->data);
result = PQexec(connection, command->data);
if (PQresultStatus(result) != PGRES_COMMAND_OK)
{
char *nodeName = ConnectionGetOptionValue(connection, "host");
char *nodePort = ConnectionGetOptionValue(connection, "port");
/* log a warning so the user may abort the transaction later */
ereport(WARNING, (errmsg("failed to roll back prepared transaction '%s'",
transactionName->data),
errhint("Run \"%s\" on %s:%s",
command->data, nodeName, nodePort)));
}
ereport(DEBUG2, (errmsg("sent ROLLBACK over connection %ld", connectionId)));
PQclear(result);
}
else if (transactionConnection->transactionState == TRANSACTION_STATE_OPEN)
{
/* try to roll back cleanly, if it fails then we won't commit anyway */
result = PQexec(connection, "ROLLBACK");
PQclear(result);
}
transactionConnection->transactionState = TRANSACTION_STATE_CLOSED;
}
}
/*
* CommitRemoteTransactions commits all transactions on connections in connectionList.
* If stopOnFailure is true, then CommitRemoteTransactions reports an error on
* failure, otherwise it reports a warning.
* Note that if the caller of this function wants the transactions to roll back
* on a failing commit, stopOnFailure should be used as true. On the other hand,
* if the caller does not want the transactions to roll back on a failing commit,
* stopOnFailure should be used as false.
*/
void
CommitRemoteTransactions(List *connectionList, bool stopOnFailure)
{
ListCell *connectionCell = NULL;
foreach(connectionCell, connectionList)
{
TransactionConnection *transactionConnection =
(TransactionConnection *) lfirst(connectionCell);
PGconn *connection = transactionConnection->connection;
int64 connectionId = transactionConnection->connectionId;
PGresult *result = NULL;
if (transactionConnection->transactionState == TRANSACTION_STATE_PREPARED)
{
StringInfo command = makeStringInfo();
StringInfo transactionName = BuildTransactionName(connectionId);
/* we shouldn't be committing if any transactions are not prepared */
Assert(transactionConnection->transactionState == TRANSACTION_STATE_PREPARED);
appendStringInfo(command, "COMMIT PREPARED '%s'", transactionName->data);
result = PQexec(connection, command->data);
if (PQresultStatus(result) != PGRES_COMMAND_OK)
{
char *nodeName = ConnectionGetOptionValue(connection, "host");
char *nodePort = ConnectionGetOptionValue(connection, "port");
/*
* If stopOnFailure is false, log a warning so the user may
* commit the transaction later.
*/
if (stopOnFailure)
{
ereport(ERROR, (errmsg("failed to commit prepared transaction '%s'",
transactionName->data),
errhint("Run \"%s\" on %s:%s",
command->data, nodeName, nodePort)));
}
else
{
ereport(WARNING, (errmsg("failed to commit prepared transaction '%s'",
transactionName->data),
errhint("Run \"%s\" on %s:%s",
command->data, nodeName, nodePort)));
}
}
ereport(DEBUG2, (errmsg("sent COMMIT PREPARED over connection %ld",
connectionId)));
}
else
{
/* we shouldn't be committing if any transactions are not open */
Assert(transactionConnection->transactionState == TRANSACTION_STATE_OPEN);
/*
* Try to commit, if it fails and stopOnFailure is false then
* the user might lose data.
*/
result = PQexec(connection, "COMMIT");
if (PQresultStatus(result) != PGRES_COMMAND_OK)
{
char *nodeName = ConnectionGetOptionValue(connection, "host");
char *nodePort = ConnectionGetOptionValue(connection, "port");
if (stopOnFailure)
{
ereport(ERROR, (errmsg("failed to commit transaction on %s:%s",
nodeName, nodePort)));
}
else
{
ereport(WARNING, (errmsg("failed to commit transaction on %s:%s",
nodeName, nodePort)));
}
}
ereport(DEBUG2, (errmsg("sent COMMIT over connection %ld", connectionId)));
}
PQclear(result);
transactionConnection->transactionState = TRANSACTION_STATE_CLOSED;
}
}
/*
* BuildTransactionName constructs a transaction name that ensures there are no
* collisions with concurrent transactions by the same master node, subsequent
* transactions by the same backend, or transactions on a different shard.
*
* Collisions may occur over time if transactions fail to commit or abort and
* are left to linger. This would cause a PREPARE failure for the second
* transaction, which causes it to be rolled back. In general, the user
* should ensure that prepared transactions do not linger.
*/
static StringInfo
BuildTransactionName(int connectionId)
{
StringInfo commandString = makeStringInfo();
appendStringInfo(commandString, "citus_%d_%u_%d", MyProcPid,
DistributedTransactionId, connectionId);
return commandString;
}

View File

@ -1,341 +0,0 @@
/*-------------------------------------------------------------------------
*
* multi_shard_transaction.c
* This file contains functions for managing 1PC or 2PC transactions
* across many shard placements.
*
* Copyright (c) 2016, Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#include "libpq-fe.h"
#include "postgres.h"
#include "distributed/commit_protocol.h"
#include "distributed/connection_cache.h"
#include "distributed/master_metadata_utility.h"
#include "distributed/multi_shard_transaction.h"
#include "nodes/pg_list.h"
#include "storage/ipc.h"
#include "utils/memutils.h"
#define INITIAL_CONNECTION_CACHE_SIZE 1001
/* Global variables used in commit handler */
static HTAB *shardConnectionHash = NULL;
static bool subXactAbortAttempted = false;
/* functions needed by callbacks and hooks */
static void CompleteShardPlacementTransactions(XactEvent event, void *arg);
static void MultiShardSubXactCallback(SubXactEvent event, SubTransactionId subId,
SubTransactionId parentSubid, void *arg);
/*
* OpenTransactionsToAllShardPlacements opens connections to all placements
* using the provided shard identifier list. Connections accumulate in a global
* shardConnectionHash variable for use (and re-use) within this transaction.
*/
void
OpenTransactionsToAllShardPlacements(List *shardIntervalList, char *userName)
{
ListCell *shardIntervalCell = NULL;
if (shardConnectionHash == NULL)
{
shardConnectionHash = CreateShardConnectionHash(TopTransactionContext);
}
foreach(shardIntervalCell, shardIntervalList)
{
ShardInterval *shardInterval = (ShardInterval *) lfirst(shardIntervalCell);
uint64 shardId = shardInterval->shardId;
BeginTransactionOnShardPlacements(shardId, userName);
}
}
/*
* CreateShardConnectionHash constructs a hash table which maps from shard
* identifier to connection lists, passing the provided MemoryContext to
* hash_create for hash allocations.
*/
HTAB *
CreateShardConnectionHash(MemoryContext memoryContext)
{
HTAB *shardConnectionsHash = NULL;
int hashFlags = 0;
HASHCTL info;
memset(&info, 0, sizeof(info));
info.keysize = sizeof(int64);
info.entrysize = sizeof(ShardConnections);
info.hcxt = memoryContext;
hashFlags = (HASH_ELEM | HASH_CONTEXT | HASH_BLOBS);
shardConnectionsHash = hash_create("Shard Connections Hash",
INITIAL_CONNECTION_CACHE_SIZE, &info,
hashFlags);
return shardConnectionsHash;
}
/*
* BeginTransactionOnShardPlacements opens new connections (if necessary) to
* all placements of a shard (specified by shard identifier). After sending a
* BEGIN command on all connections, they are added to shardConnectionHash for
* use within this transaction. Exits early if connections already exist for
* the specified shard, and errors if no placements can be found, a connection
* cannot be made, or if the BEGIN command fails.
*/
void
BeginTransactionOnShardPlacements(uint64 shardId, char *userName)
{
List *shardPlacementList = NIL;
ListCell *placementCell = NULL;
ShardConnections *shardConnections = NULL;
bool shardConnectionsFound = false;
MemoryContext oldContext = NULL;
shardPlacementList = FinalizedShardPlacementList(shardId);
if (shardPlacementList == NIL)
{
/* going to have to have some placements to do any work */
ereport(ERROR, (errmsg("could not find any shard placements for the shard "
UINT64_FORMAT, shardId)));
}
/* get existing connections to the shard placements, if any */
shardConnections = GetShardConnections(shardId, &shardConnectionsFound);
if (shardConnectionsFound)
{
/* exit early if we've already established shard transactions */
return;
}
foreach(placementCell, shardPlacementList)
{
ShardPlacement *shardPlacement = (ShardPlacement *) lfirst(placementCell);
PGconn *connection = NULL;
TransactionConnection *transactionConnection = NULL;
PGresult *result = NULL;
connection = ConnectToNode(shardPlacement->nodeName, shardPlacement->nodePort,
userName);
if (connection == NULL)
{
ereport(ERROR, (errmsg("could not establish a connection to all "
"placements of shard %lu", shardId)));
}
/* entries must last through the whole top-level transaction */
oldContext = MemoryContextSwitchTo(TopTransactionContext);
transactionConnection = palloc0(sizeof(TransactionConnection));
transactionConnection->connectionId = shardConnections->shardId;
transactionConnection->transactionState = TRANSACTION_STATE_INVALID;
transactionConnection->connection = connection;
shardConnections->connectionList = lappend(shardConnections->connectionList,
transactionConnection);
MemoryContextSwitchTo(oldContext);
/* now that connection is tracked, issue BEGIN */
result = PQexec(connection, "BEGIN");
if (PQresultStatus(result) != PGRES_COMMAND_OK)
{
ReraiseRemoteError(connection, result);
}
}
}
/*
* GetShardConnections finds existing connections for a shard in the global
* connection hash. If not found, then a ShardConnections structure with empty
* connectionList is returned and the shardConnectionsFound output parameter
* will be set to false.
*/
ShardConnections *
GetShardConnections(int64 shardId, bool *shardConnectionsFound)
{
return GetShardHashConnections(shardConnectionHash, shardId, shardConnectionsFound);
}
/*
* GetShardHashConnections finds existing connections for a shard in the
* provided hash. If not found, then a ShardConnections structure with empty
* connectionList is returned.
*/
ShardConnections *
GetShardHashConnections(HTAB *connectionHash, int64 shardId, bool *connectionsFound)
{
ShardConnections *shardConnections = NULL;
shardConnections = (ShardConnections *) hash_search(connectionHash, &shardId,
HASH_ENTER, connectionsFound);
if (!*connectionsFound)
{
shardConnections->shardId = shardId;
shardConnections->connectionList = NIL;
}
return shardConnections;
}
/*
* ConnectionList flattens the connection hash to a list of placement connections.
*/
List *
ConnectionList(HTAB *connectionHash)
{
List *connectionList = NIL;
HASH_SEQ_STATUS status;
ShardConnections *shardConnections = NULL;
if (connectionHash == NULL)
{
return NIL;
}
hash_seq_init(&status, connectionHash);
shardConnections = (ShardConnections *) hash_seq_search(&status);
while (shardConnections != NULL)
{
List *shardConnectionsList = list_copy(shardConnections->connectionList);
connectionList = list_concat(connectionList, shardConnectionsList);
shardConnections = (ShardConnections *) hash_seq_search(&status);
}
return connectionList;
}
/*
* RegisterShardPlacementXactCallbacks registers transaction callbacks needed
* for multi-shard transactions.
*/
void
RegisterShardPlacementXactCallbacks(void)
{
RegisterXactCallback(CompleteShardPlacementTransactions, NULL);
RegisterSubXactCallback(MultiShardSubXactCallback, NULL);
}
/*
* CompleteShardPlacementTransactions commits or aborts pending shard placement
* transactions when the local transaction commits or aborts.
*/
static void
CompleteShardPlacementTransactions(XactEvent event, void *arg)
{
List *connectionList = ConnectionList(shardConnectionHash);
if (shardConnectionHash == NULL)
{
/* nothing to do */
return;
}
if (event == XACT_EVENT_PRE_COMMIT)
{
if (subXactAbortAttempted)
{
subXactAbortAttempted = false;
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot ROLLBACK TO SAVEPOINT in transactions "
"which modify distributed tables")));
}
/*
* Any failure here will cause local changes to be rolled back,
* and remote changes to either roll back (1PC) or, in case of
* connection or node failure, leave a prepared transaction
* (2PC).
*/
if (MultiShardCommitProtocol == COMMIT_PROTOCOL_2PC)
{
PrepareRemoteTransactions(connectionList);
}
return;
}
else if (event == XACT_EVENT_COMMIT)
{
/*
* A failure here will cause some remote changes to either
* roll back (1PC) or, in case of connection or node failure,
* leave a prepared transaction (2PC). However, the local
* changes have already been committed.
*/
CommitRemoteTransactions(connectionList, false);
}
else if (event == XACT_EVENT_ABORT)
{
/*
* A failure here will cause some remote changes to either
* roll back (1PC) or, in case of connection or node failure,
* leave a prepared transaction (2PC). The local changes have
* already been rolled back.
*/
AbortRemoteTransactions(connectionList);
}
else
{
return;
}
CloseConnections(connectionList);
shardConnectionHash = NULL;
subXactAbortAttempted = false;
}
static void
MultiShardSubXactCallback(SubXactEvent event, SubTransactionId subId,
SubTransactionId parentSubid, void *arg)
{
if ((shardConnectionHash != NULL) && (event == SUBXACT_EVENT_ABORT_SUB))
{
subXactAbortAttempted = true;
}
}
/*
* CloseConnections closes all connections in connectionList.
*/
void
CloseConnections(List *connectionList)
{
ListCell *connectionCell = NULL;
foreach(connectionCell, connectionList)
{
TransactionConnection *transactionConnection =
(TransactionConnection *) lfirst(connectionCell);
PGconn *connection = transactionConnection->connection;
PQfinish(connection);
}
}

View File

@ -1,51 +0,0 @@
/*-------------------------------------------------------------------------
*
* commit_protocol.h
* Type and function declarations used in performing transactions across
* shard placements.
*
* Copyright (c) 2016, Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#ifndef COMMIT_PROTOCOL_H
#define COMMIT_PROTOCOL_H
#include "access/xact.h"
#include "distributed/connection_management.h"
#include "libpq-fe.h"
#include "lib/stringinfo.h"
#include "nodes/pg_list.h"
/* Enumeration that defines different remote transaction states */
typedef enum
{
TRANSACTION_STATE_INVALID = 0,
TRANSACTION_STATE_OPEN,
TRANSACTION_STATE_COPY_STARTED,
TRANSACTION_STATE_PREPARED,
TRANSACTION_STATE_CLOSED
} TransactionState;
/*
* TransactionConnection represents a connection to a remote node which is
* used to perform a transaction on shard placements.
*/
typedef struct TransactionConnection
{
int64 connectionId;
TransactionState transactionState;
PGconn *connection;
} TransactionConnection;
/* Functions declarations for transaction and connection management */
extern void InitializeDistributedTransaction(void);
extern void PrepareRemoteTransactions(List *connectionList);
extern void AbortRemoteTransactions(List *connectionList);
extern void CommitRemoteTransactions(List *connectionList, bool stopOnFailure);
#endif /* COMMIT_PROTOCOL_H */

View File

@ -1,39 +0,0 @@
/*-------------------------------------------------------------------------
*
* multi_shard_transaction.h
* Type and function declarations used in performing transactions across
* shard placements.
*
* Copyright (c) 2016, Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#ifndef MULTI_SHARD_TRANSACTION_H
#define MULTI_SHARD_TRANSACTION_H
#include "utils/hsearch.h"
#include "nodes/pg_list.h"
/* ShardConnections represents a set of connections for each placement of a shard */
typedef struct ShardConnections
{
int64 shardId;
List *connectionList;
} ShardConnections;
extern void OpenTransactionsToAllShardPlacements(List *shardIdList, char *relationOwner);
extern HTAB * CreateShardConnectionHash(MemoryContext memoryContext);
extern void BeginTransactionOnShardPlacements(uint64 shardId, char *nodeUser);
extern ShardConnections * GetShardConnections(int64 shardId, bool *shardConnectionsFound);
extern ShardConnections * GetShardHashConnections(HTAB *connectionHash, int64 shardId,
bool *connectionsFound);
extern List * ConnectionList(HTAB *connectionHash);
extern void CloseConnections(List *connectionList);
extern void RegisterShardPlacementXactCallbacks(void);
#endif /* MULTI_SHARD_TRANSACTION_H */