mirror of https://github.com/citusdata/citus.git
Remove remnants of commit_protocol.[ch].
parent
fd717d6da9
commit
6ec34bed84
|
@ -28,7 +28,6 @@
|
|||
#include "catalog/pg_type.h"
|
||||
#include "distributed/citus_clauses.h"
|
||||
#include "distributed/citus_ruleutils.h"
|
||||
#include "distributed/commit_protocol.h"
|
||||
#include "distributed/connection_cache.h"
|
||||
#include "distributed/connection_management.h"
|
||||
#include "distributed/deparse_shard_query.h"
|
||||
|
|
|
@ -32,7 +32,6 @@
|
|||
#include "commands/prepare.h"
|
||||
#include "distributed/citus_ruleutils.h"
|
||||
#include "distributed/colocation_utils.h"
|
||||
#include "distributed/commit_protocol.h"
|
||||
#include "distributed/connection_cache.h"
|
||||
#include "distributed/master_metadata_utility.h"
|
||||
#include "distributed/master_protocol.h"
|
||||
|
|
|
@ -26,7 +26,6 @@
|
|||
#include "commands/event_trigger.h"
|
||||
#include "distributed/citus_clauses.h"
|
||||
#include "distributed/citus_ruleutils.h"
|
||||
#include "distributed/commit_protocol.h"
|
||||
#include "distributed/connection_cache.h"
|
||||
#include "distributed/listutils.h"
|
||||
#include "distributed/master_metadata_utility.h"
|
||||
|
|
|
@ -20,7 +20,6 @@
|
|||
#include "executor/executor.h"
|
||||
#include "distributed/citus_nodefuncs.h"
|
||||
#include "distributed/connection_management.h"
|
||||
#include "distributed/commit_protocol.h"
|
||||
#include "distributed/connection_management.h"
|
||||
#include "distributed/master_protocol.h"
|
||||
#include "distributed/multi_copy.h"
|
||||
|
|
|
@ -1,256 +0,0 @@
|
|||
/*-------------------------------------------------------------------------
|
||||
*
|
||||
* commit_protocol.c
|
||||
* This file contains functions for managing 1PC or 2PC transactions
|
||||
* across many shard placements.
|
||||
*
|
||||
* Copyright (c) 2016, Citus Data, Inc.
|
||||
*
|
||||
*-------------------------------------------------------------------------
|
||||
*/
|
||||
|
||||
|
||||
#include "postgres.h"
|
||||
#include "libpq-fe.h"
|
||||
#include "miscadmin.h"
|
||||
|
||||
#include "distributed/commit_protocol.h"
|
||||
#include "distributed/connection_cache.h"
|
||||
#include "distributed/master_metadata_utility.h"
|
||||
#include "distributed/multi_shard_transaction.h"
|
||||
#include "lib/stringinfo.h"
|
||||
#include "nodes/pg_list.h"
|
||||
|
||||
|
||||
/* Local functions forward declarations */
|
||||
static uint32 DistributedTransactionId = 0;
|
||||
|
||||
|
||||
/*
|
||||
* InitializeDistributedTransaction prepares the distributed transaction ID
|
||||
* used in transaction names.
|
||||
*/
|
||||
void
|
||||
InitializeDistributedTransaction(void)
|
||||
{
|
||||
DistributedTransactionId++;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* PrepareRemoteTransactions prepares all transactions on connections in
|
||||
* connectionList for commit if the 2PC commit protocol is enabled.
|
||||
* On failure, it reports an error and stops.
|
||||
*/
|
||||
void
|
||||
PrepareRemoteTransactions(List *connectionList)
|
||||
{
|
||||
ListCell *connectionCell = NULL;
|
||||
|
||||
foreach(connectionCell, connectionList)
|
||||
{
|
||||
TransactionConnection *transactionConnection =
|
||||
(TransactionConnection *) lfirst(connectionCell);
|
||||
PGconn *connection = transactionConnection->connection;
|
||||
int64 connectionId = transactionConnection->connectionId;
|
||||
|
||||
PGresult *result = NULL;
|
||||
StringInfo command = makeStringInfo();
|
||||
StringInfo transactionName = BuildTransactionName(connectionId);
|
||||
|
||||
appendStringInfo(command, "PREPARE TRANSACTION '%s'", transactionName->data);
|
||||
|
||||
result = PQexec(connection, command->data);
|
||||
if (PQresultStatus(result) != PGRES_COMMAND_OK)
|
||||
{
|
||||
/* a failure to prepare is an implicit rollback */
|
||||
transactionConnection->transactionState = TRANSACTION_STATE_CLOSED;
|
||||
|
||||
WarnRemoteError(connection, result);
|
||||
PQclear(result);
|
||||
|
||||
ereport(ERROR, (errcode(ERRCODE_IO_ERROR),
|
||||
errmsg("failed to prepare transaction")));
|
||||
}
|
||||
|
||||
ereport(DEBUG2, (errmsg("sent PREPARE TRANSACTION over connection %ld",
|
||||
connectionId)));
|
||||
|
||||
PQclear(result);
|
||||
|
||||
transactionConnection->transactionState = TRANSACTION_STATE_PREPARED;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* AbortRemoteTransactions aborts all transactions on connections in connectionList.
|
||||
* On failure, it reports a warning and continues to abort all of them.
|
||||
*/
|
||||
void
|
||||
AbortRemoteTransactions(List *connectionList)
|
||||
{
|
||||
ListCell *connectionCell = NULL;
|
||||
|
||||
foreach(connectionCell, connectionList)
|
||||
{
|
||||
TransactionConnection *transactionConnection =
|
||||
(TransactionConnection *) lfirst(connectionCell);
|
||||
PGconn *connection = transactionConnection->connection;
|
||||
int64 connectionId = transactionConnection->connectionId;
|
||||
PGresult *result = NULL;
|
||||
|
||||
if (transactionConnection->transactionState == TRANSACTION_STATE_PREPARED)
|
||||
{
|
||||
StringInfo command = makeStringInfo();
|
||||
StringInfo transactionName = BuildTransactionName(connectionId);
|
||||
|
||||
appendStringInfo(command, "ROLLBACK PREPARED '%s'", transactionName->data);
|
||||
|
||||
result = PQexec(connection, command->data);
|
||||
if (PQresultStatus(result) != PGRES_COMMAND_OK)
|
||||
{
|
||||
char *nodeName = ConnectionGetOptionValue(connection, "host");
|
||||
char *nodePort = ConnectionGetOptionValue(connection, "port");
|
||||
|
||||
/* log a warning so the user may abort the transaction later */
|
||||
ereport(WARNING, (errmsg("failed to roll back prepared transaction '%s'",
|
||||
transactionName->data),
|
||||
errhint("Run \"%s\" on %s:%s",
|
||||
command->data, nodeName, nodePort)));
|
||||
}
|
||||
|
||||
ereport(DEBUG2, (errmsg("sent ROLLBACK over connection %ld", connectionId)));
|
||||
|
||||
PQclear(result);
|
||||
}
|
||||
else if (transactionConnection->transactionState == TRANSACTION_STATE_OPEN)
|
||||
{
|
||||
/* try to roll back cleanly, if it fails then we won't commit anyway */
|
||||
result = PQexec(connection, "ROLLBACK");
|
||||
PQclear(result);
|
||||
}
|
||||
|
||||
transactionConnection->transactionState = TRANSACTION_STATE_CLOSED;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* CommitRemoteTransactions commits all transactions on connections in connectionList.
|
||||
* If stopOnFailure is true, then CommitRemoteTransactions reports an error on
|
||||
* failure, otherwise it reports a warning.
|
||||
* Note that if the caller of this function wants the transactions to roll back
|
||||
* on a failing commit, stopOnFailure should be used as true. On the other hand,
|
||||
* if the caller does not want the transactions to roll back on a failing commit,
|
||||
* stopOnFailure should be used as false.
|
||||
*/
|
||||
void
|
||||
CommitRemoteTransactions(List *connectionList, bool stopOnFailure)
|
||||
{
|
||||
ListCell *connectionCell = NULL;
|
||||
|
||||
foreach(connectionCell, connectionList)
|
||||
{
|
||||
TransactionConnection *transactionConnection =
|
||||
(TransactionConnection *) lfirst(connectionCell);
|
||||
PGconn *connection = transactionConnection->connection;
|
||||
int64 connectionId = transactionConnection->connectionId;
|
||||
PGresult *result = NULL;
|
||||
|
||||
if (transactionConnection->transactionState == TRANSACTION_STATE_PREPARED)
|
||||
{
|
||||
StringInfo command = makeStringInfo();
|
||||
StringInfo transactionName = BuildTransactionName(connectionId);
|
||||
|
||||
/* we shouldn't be committing if any transactions are not prepared */
|
||||
Assert(transactionConnection->transactionState == TRANSACTION_STATE_PREPARED);
|
||||
|
||||
appendStringInfo(command, "COMMIT PREPARED '%s'", transactionName->data);
|
||||
|
||||
result = PQexec(connection, command->data);
|
||||
if (PQresultStatus(result) != PGRES_COMMAND_OK)
|
||||
{
|
||||
char *nodeName = ConnectionGetOptionValue(connection, "host");
|
||||
char *nodePort = ConnectionGetOptionValue(connection, "port");
|
||||
|
||||
/*
|
||||
* If stopOnFailure is false, log a warning so the user may
|
||||
* commit the transaction later.
|
||||
*/
|
||||
if (stopOnFailure)
|
||||
{
|
||||
ereport(ERROR, (errmsg("failed to commit prepared transaction '%s'",
|
||||
transactionName->data),
|
||||
errhint("Run \"%s\" on %s:%s",
|
||||
command->data, nodeName, nodePort)));
|
||||
}
|
||||
else
|
||||
{
|
||||
ereport(WARNING, (errmsg("failed to commit prepared transaction '%s'",
|
||||
transactionName->data),
|
||||
errhint("Run \"%s\" on %s:%s",
|
||||
command->data, nodeName, nodePort)));
|
||||
}
|
||||
}
|
||||
|
||||
ereport(DEBUG2, (errmsg("sent COMMIT PREPARED over connection %ld",
|
||||
connectionId)));
|
||||
}
|
||||
else
|
||||
{
|
||||
/* we shouldn't be committing if any transactions are not open */
|
||||
Assert(transactionConnection->transactionState == TRANSACTION_STATE_OPEN);
|
||||
|
||||
/*
|
||||
* Try to commit, if it fails and stopOnFailure is false then
|
||||
* the user might lose data.
|
||||
*/
|
||||
result = PQexec(connection, "COMMIT");
|
||||
if (PQresultStatus(result) != PGRES_COMMAND_OK)
|
||||
{
|
||||
char *nodeName = ConnectionGetOptionValue(connection, "host");
|
||||
char *nodePort = ConnectionGetOptionValue(connection, "port");
|
||||
|
||||
if (stopOnFailure)
|
||||
{
|
||||
ereport(ERROR, (errmsg("failed to commit transaction on %s:%s",
|
||||
nodeName, nodePort)));
|
||||
}
|
||||
else
|
||||
{
|
||||
ereport(WARNING, (errmsg("failed to commit transaction on %s:%s",
|
||||
nodeName, nodePort)));
|
||||
}
|
||||
}
|
||||
|
||||
ereport(DEBUG2, (errmsg("sent COMMIT over connection %ld", connectionId)));
|
||||
}
|
||||
|
||||
PQclear(result);
|
||||
|
||||
transactionConnection->transactionState = TRANSACTION_STATE_CLOSED;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* BuildTransactionName constructs a transaction name that ensures there are no
|
||||
* collisions with concurrent transactions by the same master node, subsequent
|
||||
* transactions by the same backend, or transactions on a different shard.
|
||||
*
|
||||
* Collisions may occur over time if transactions fail to commit or abort and
|
||||
* are left to linger. This would cause a PREPARE failure for the second
|
||||
* transaction, which causes it to be rolled back. In general, the user
|
||||
* should ensure that prepared transactions do not linger.
|
||||
*/
|
||||
StringInfo
|
||||
BuildTransactionName(int connectionId)
|
||||
{
|
||||
StringInfo commandString = makeStringInfo();
|
||||
|
||||
appendStringInfo(commandString, "citus_%d_%u_%d", MyProcPid,
|
||||
DistributedTransactionId, connectionId);
|
||||
|
||||
return commandString;
|
||||
}
|
|
@ -14,7 +14,6 @@
|
|||
#include "postgres.h"
|
||||
|
||||
#include "distributed/colocation_utils.h"
|
||||
#include "distributed/commit_protocol.h"
|
||||
#include "distributed/connection_cache.h"
|
||||
#include "distributed/connection_management.h"
|
||||
#include "distributed/master_metadata_utility.h"
|
||||
|
@ -250,22 +249,3 @@ ResetShardPlacementTransactionState(void)
|
|||
SavedMultiShardCommitProtocol = COMMIT_PROTOCOL_BARE;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* CloseConnections closes all connections in connectionList.
|
||||
*/
|
||||
void
|
||||
CloseConnections(List *connectionList)
|
||||
{
|
||||
ListCell *connectionCell = NULL;
|
||||
|
||||
foreach(connectionCell, connectionList)
|
||||
{
|
||||
TransactionConnection *transactionConnection =
|
||||
(TransactionConnection *) lfirst(connectionCell);
|
||||
PGconn *connection = transactionConnection->connection;
|
||||
|
||||
CloseConnectionByPGconn(connection);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -18,7 +18,6 @@
|
|||
#include <unistd.h>
|
||||
|
||||
#include "access/xact.h"
|
||||
#include "distributed/commit_protocol.h"
|
||||
#include "distributed/connection_cache.h"
|
||||
#include "distributed/connection_management.h"
|
||||
#include "distributed/metadata_cache.h"
|
||||
|
|
|
@ -1,55 +0,0 @@
|
|||
/*-------------------------------------------------------------------------
|
||||
*
|
||||
* commit_protocol.h
|
||||
* Type and function declarations used in performing transactions across
|
||||
* shard placements.
|
||||
*
|
||||
* Copyright (c) 2016, Citus Data, Inc.
|
||||
*
|
||||
*-------------------------------------------------------------------------
|
||||
*/
|
||||
|
||||
#ifndef COMMIT_PROTOCOL_H
|
||||
#define COMMIT_PROTOCOL_H
|
||||
|
||||
|
||||
#include "access/xact.h"
|
||||
#include "distributed/connection_management.h"
|
||||
#include "libpq-fe.h"
|
||||
#include "lib/stringinfo.h"
|
||||
#include "nodes/pg_list.h"
|
||||
|
||||
|
||||
/* Enumeration that defines different remote transaction states */
|
||||
typedef enum
|
||||
{
|
||||
TRANSACTION_STATE_INVALID = 0,
|
||||
TRANSACTION_STATE_OPEN,
|
||||
TRANSACTION_STATE_COPY_STARTED,
|
||||
TRANSACTION_STATE_PREPARED,
|
||||
TRANSACTION_STATE_CLOSED
|
||||
} TransactionState;
|
||||
|
||||
/*
|
||||
* TransactionConnection represents a connection to a remote node which is
|
||||
* used to perform a transaction on shard placements.
|
||||
*/
|
||||
typedef struct TransactionConnection
|
||||
{
|
||||
int groupId;
|
||||
int64 connectionId;
|
||||
TransactionState transactionState;
|
||||
PGconn *connection;
|
||||
const char *nodeName;
|
||||
int nodePort;
|
||||
} TransactionConnection;
|
||||
|
||||
|
||||
/* Functions declarations for transaction and connection management */
|
||||
extern void InitializeDistributedTransaction(void);
|
||||
extern void PrepareRemoteTransactions(List *connectionList);
|
||||
extern void AbortRemoteTransactions(List *connectionList);
|
||||
extern void CommitRemoteTransactions(List *connectionList, bool stopOnFailure);
|
||||
extern StringInfo BuildTransactionName(int connectionId);
|
||||
|
||||
#endif /* COMMIT_PROTOCOL_H */
|
|
@ -33,7 +33,6 @@ extern ShardConnections * GetShardConnections(int64 shardId, bool *shardConnecti
|
|||
extern ShardConnections * GetShardHashConnections(HTAB *connectionHash, int64 shardId,
|
||||
bool *connectionsFound);
|
||||
extern List * ShardConnectionList(HTAB *connectionHash);
|
||||
extern void CloseConnections(List *connectionList);
|
||||
extern void ResetShardPlacementTransactionState(void);
|
||||
|
||||
|
||||
|
|
Loading…
Reference in New Issue