Merge pull request #1057 from citusdata/bugfix/add_node_failure

Convert worker_transactions to new connection API
pull/1031/head
Marco Slot 2016-12-23 16:30:40 +01:00 committed by GitHub
commit 25e28763b2
12 changed files with 346 additions and 450 deletions

View File

@ -308,6 +308,43 @@ GetConnectionFromPGconn(struct pg_conn *pqConn)
} }
/*
* CloseNodeConnections closes all the connections to a particular node.
* This is mainly used when a worker leaves the cluster
*/
void
CloseNodeConnections(char *nodeName, int nodePort)
{
HASH_SEQ_STATUS status;
ConnectionHashEntry *entry;
hash_seq_init(&status, ConnectionHash);
while ((entry = (ConnectionHashEntry *) hash_seq_search(&status)) != 0)
{
dlist_head *connections = entry->connections;
if (strcmp(entry->key.hostname, nodeName) != 0 || entry->key.port != nodePort)
{
continue;
}
while (!dlist_is_empty(connections))
{
dlist_node *currentNode = dlist_pop_head_node(connections);
MultiConnection *connection =
dlist_container(MultiConnection, connectionNode, currentNode);
/* same for transaction state */
CloseRemoteTransaction(connection);
/* we leave the per-host entry alive */
pfree(connection);
}
}
}
/* /*
* Close a previously established connection. * Close a previously established connection.
*/ */

View File

@ -187,6 +187,35 @@ LogRemoteCommand(MultiConnection *connection, const char *command)
/* wrappers around libpq functions, with command logging support */ /* wrappers around libpq functions, with command logging support */
/*
* ExecuteCriticalRemoteCommand executes a remote command that is critical
* to the transaction. If the command fails then the transaction aborts.
*/
void
ExecuteCriticalRemoteCommand(MultiConnection *connection, const char *command)
{
int querySent = 0;
PGresult *result = NULL;
bool raiseInterrupts = true;
querySent = SendRemoteCommand(connection, command);
if (querySent == 0)
{
ReportConnectionError(connection, ERROR);
}
result = GetRemoteCommandResult(connection, raiseInterrupts);
if (!IsResponseOK(result))
{
ReportResultError(connection, result, ERROR);
}
PQclear(result);
ForgetResults(connection);
}
/* /*
* SendRemoteCommand is a PQsendQuery wrapper that logs remote commands, and * SendRemoteCommand is a PQsendQuery wrapper that logs remote commands, and
* accepts a MultiConnection instead of a plain PGconn. It makes sure it can * accepts a MultiConnection instead of a plain PGconn. It makes sure it can
@ -194,7 +223,9 @@ LogRemoteCommand(MultiConnection *connection, const char *command)
* an additional memory allocation). * an additional memory allocation).
*/ */
int int
SendRemoteCommand(MultiConnection *connection, const char *command) SendRemoteCommandParams(MultiConnection *connection, const char *command,
int parameterCount, const Oid *parameterTypes,
const char *const *parameterValues)
{ {
PGconn *pgConn = connection->pgConn; PGconn *pgConn = connection->pgConn;
bool wasNonblocking = PQisnonblocking(pgConn); bool wasNonblocking = PQisnonblocking(pgConn);
@ -208,7 +239,8 @@ SendRemoteCommand(MultiConnection *connection, const char *command)
PQsetnonblocking(pgConn, true); PQsetnonblocking(pgConn, true);
} }
rc = PQsendQuery(pgConn, command); rc = PQsendQueryParams(pgConn, command, parameterCount, parameterTypes,
parameterValues, NULL, NULL, 0);
/* reset nonblocking connection to its original state */ /* reset nonblocking connection to its original state */
if (!wasNonblocking) if (!wasNonblocking)
@ -220,6 +252,19 @@ SendRemoteCommand(MultiConnection *connection, const char *command)
} }
/*
* SendRemoteCommand is a PQsendQuery wrapper that logs remote commands, and
* accepts a MultiConnection instead of a plain PGconn. It makes sure it can
* send commands asynchronously without blocking (at the potential expense of
* an additional memory allocation).
*/
int
SendRemoteCommand(MultiConnection *connection, const char *command)
{
return SendRemoteCommandParams(connection, command, 0, NULL, NULL);
}
/* /*
* GetCommandResult is a wrapper around PQgetResult() that handles interrupts. * GetCommandResult is a wrapper around PQgetResult() that handles interrupts.
* *

View File

@ -244,7 +244,7 @@ InitTransactionStateForTask(Task *task)
{ {
ListCell *placementCell = NULL; ListCell *placementCell = NULL;
BeginCoordinatedTransaction(); BeginOrContinueCoordinatedTransaction();
xactParticipantHash = CreateXactParticipantHash(); xactParticipantHash = CreateXactParticipantHash();

View File

@ -16,9 +16,12 @@
#include "access/xact.h" #include "access/xact.h"
#include "distributed/connection_management.h" #include "distributed/connection_management.h"
#include "distributed/transaction_management.h" #include "distributed/metadata_cache.h"
#include "distributed/remote_commands.h" #include "distributed/remote_commands.h"
#include "distributed/remote_transaction.h" #include "distributed/remote_transaction.h"
#include "distributed/transaction_management.h"
#include "distributed/transaction_recovery.h"
#include "distributed/worker_manager.h"
#include "utils/hsearch.h" #include "utils/hsearch.h"
@ -376,6 +379,7 @@ StartRemoteTransactionPrepare(struct MultiConnection *connection)
RemoteTransaction *transaction = &connection->remoteTransaction; RemoteTransaction *transaction = &connection->remoteTransaction;
StringInfoData command; StringInfoData command;
const bool raiseErrors = true; const bool raiseErrors = true;
WorkerNode *workerNode = NULL;
/* can't prepare a nonexistant transaction */ /* can't prepare a nonexistant transaction */
Assert(transaction->transactionState != REMOTE_TRANS_INVALID); Assert(transaction->transactionState != REMOTE_TRANS_INVALID);
@ -388,6 +392,13 @@ StartRemoteTransactionPrepare(struct MultiConnection *connection)
Assign2PCIdentifier(connection); Assign2PCIdentifier(connection);
/* log transactions to workers in pg_dist_transaction */
workerNode = FindWorkerNode(connection->hostname, connection->port);
if (workerNode != NULL)
{
LogTransactionRecord(workerNode->groupId, transaction->preparedName);
}
initStringInfo(&command); initStringInfo(&command);
appendStringInfo(&command, "PREPARE TRANSACTION '%s'", appendStringInfo(&command, "PREPARE TRANSACTION '%s'",
transaction->preparedName); transaction->preparedName);
@ -826,7 +837,7 @@ Assign2PCIdentifier(MultiConnection *connection)
{ {
static uint64 sequence = 0; static uint64 sequence = 0;
snprintf(connection->remoteTransaction.preparedName, NAMEDATALEN, snprintf(connection->remoteTransaction.preparedName, NAMEDATALEN,
"citus_%d_"UINT64_FORMAT, "citus_%d_%d_"UINT64_FORMAT, GetLocalGroupId(),
MyProcPid, sequence++); MyProcPid, sequence++);
} }

View File

@ -45,7 +45,6 @@ PG_FUNCTION_INFO_V1(recover_prepared_transactions);
/* Local functions forward declarations */ /* Local functions forward declarations */
static void LogTransactionRecord(int groupId, char *transactionName);
static int RecoverPreparedTransactions(void); static int RecoverPreparedTransactions(void);
static int RecoverWorkerTransactions(WorkerNode *workerNode); static int RecoverWorkerTransactions(WorkerNode *workerNode);
static List * NameListDifference(List *nameList, List *subtractList); static List * NameListDifference(List *nameList, List *subtractList);
@ -106,7 +105,7 @@ LogPreparedTransactions(List *connectionList)
* prepared on a worker. The presence of this record indicates that the * prepared on a worker. The presence of this record indicates that the
* prepared transaction should be committed. * prepared transaction should be committed.
*/ */
static void void
LogTransactionRecord(int groupId, char *transactionName) LogTransactionRecord(int groupId, char *transactionName)
{ {
Relation pgDistTransaction = NULL; Relation pgDistTransaction = NULL;

View File

@ -20,9 +20,11 @@
#include "access/xact.h" #include "access/xact.h"
#include "distributed/commit_protocol.h" #include "distributed/commit_protocol.h"
#include "distributed/connection_cache.h" #include "distributed/connection_cache.h"
#include "distributed/connection_management.h"
#include "distributed/metadata_cache.h" #include "distributed/metadata_cache.h"
#include "distributed/multi_shard_transaction.h" #include "distributed/multi_shard_transaction.h"
#include "distributed/resource_lock.h" #include "distributed/resource_lock.h"
#include "distributed/remote_commands.h"
#include "distributed/pg_dist_node.h" #include "distributed/pg_dist_node.h"
#include "distributed/pg_dist_transaction.h" #include "distributed/pg_dist_transaction.h"
#include "distributed/transaction_recovery.h" #include "distributed/transaction_recovery.h"
@ -31,42 +33,6 @@
#include "utils/memutils.h" #include "utils/memutils.h"
/* Local functions forward declarations */
static void EnableXactCallback(void);
static void CompleteWorkerTransactions(XactEvent event, void *arg);
static List * OpenWorkerTransactions(void);
static TransactionConnection * GetWorkerTransaction(char *nodeName, int32 nodePort);
static List * GetTargetWorkerTransactions(TargetWorkerSet targetWorkerSet);
static bool IsResponseOK(ExecStatusType resultStatus);
/* Global worker connection list */
static List *workerConnectionList = NIL;
static bool isXactCallbackRegistered = false;
/*
* GetWorkerTransactions opens connections to all workers and starts
* a transaction block that is committed or aborted when the local
* transaction commits or aborts. Multiple invocations of
* GetWorkerTransactions within the same transaction will return
* the same list of connections.
*/
List *
GetWorkerTransactions(void)
{
if (workerConnectionList == NIL)
{
InitializeDistributedTransaction();
EnableXactCallback();
workerConnectionList = OpenWorkerTransactions();
}
return workerConnectionList;
}
/* /*
* SendCommandToWorker sends a command to a particular worker as part of the * SendCommandToWorker sends a command to a particular worker as part of the
* 2PC. * 2PC.
@ -74,25 +40,19 @@ GetWorkerTransactions(void)
void void
SendCommandToWorker(char *nodeName, int32 nodePort, char *command) SendCommandToWorker(char *nodeName, int32 nodePort, char *command)
{ {
TransactionConnection *transactionConnection = NULL; MultiConnection *transactionConnection = NULL;
PGresult *queryResult = NULL; char *nodeUser = CitusExtensionOwnerName();
ExecStatusType resultStatus = PGRES_EMPTY_QUERY; int connectionFlags = 0;
transactionConnection = GetWorkerTransaction(nodeName, nodePort); BeginOrContinueCoordinatedTransaction();
if (transactionConnection == NULL) CoordinatedTransactionUse2PC();
{
ereport(ERROR, (errmsg("worker %s:%d is not part of current transaction",
nodeName, nodePort)));
}
queryResult = PQexec(transactionConnection->connection, command); transactionConnection = GetNodeUserDatabaseConnection(connectionFlags, nodeName,
resultStatus = PQresultStatus(queryResult); nodePort, nodeUser, NULL);
if (resultStatus != PGRES_COMMAND_OK && resultStatus != PGRES_TUPLES_OK)
{
ReraiseRemoteError(transactionConnection->connection, queryResult);
}
PQclear(queryResult); MarkRemoteTransactionCritical(transactionConnection);
RemoteTransactionBeginIfNecessary(transactionConnection);
ExecuteCriticalRemoteCommand(transactionConnection, command);
} }
@ -122,42 +82,74 @@ SendCommandToWorkersParams(TargetWorkerSet targetWorkerSet, char *command,
int parameterCount, const Oid *parameterTypes, int parameterCount, const Oid *parameterTypes,
const char *const *parameterValues) const char *const *parameterValues)
{ {
List *connectionList = NIL;
ListCell *connectionCell = NULL; ListCell *connectionCell = NULL;
List *targetConnectionList = GetTargetWorkerTransactions(targetWorkerSet); List *workerNodeList = WorkerNodeList();
ListCell *workerNodeCell = NULL;
char *nodeUser = CitusExtensionOwnerName();
foreach(connectionCell, targetConnectionList) BeginOrContinueCoordinatedTransaction();
CoordinatedTransactionUse2PC();
/* open connections in parallel */
foreach(workerNodeCell, workerNodeList)
{ {
TransactionConnection *transactionConnection = WorkerNode *workerNode = (WorkerNode *) lfirst(workerNodeCell);
(TransactionConnection *) lfirst(connectionCell); char *nodeName = workerNode->workerName;
int nodePort = workerNode->workerPort;
MultiConnection *connection = NULL;
int connectionFlags = 0;
PGconn *connection = transactionConnection->connection; if (targetWorkerSet == WORKERS_WITH_METADATA && !workerNode->hasMetadata)
{
continue;
}
int querySent = PQsendQueryParams(connection, command, parameterCount, connection = StartNodeUserDatabaseConnection(connectionFlags, nodeName, nodePort,
parameterTypes, parameterValues, NULL, NULL, 0); nodeUser, NULL);
MarkRemoteTransactionCritical(connection);
connectionList = lappend(connectionList, connection);
}
/* finish opening connections */
foreach(connectionCell, connectionList)
{
MultiConnection *connection = (MultiConnection *) lfirst(connectionCell);
FinishConnectionEstablishment(connection);
}
RemoteTransactionsBeginIfNecessary(connectionList);
/* send commands in parallel */
foreach(connectionCell, connectionList)
{
MultiConnection *connection = (MultiConnection *) lfirst(connectionCell);
int querySent = SendRemoteCommandParams(connection, command, parameterCount,
parameterTypes, parameterValues);
if (querySent == 0) if (querySent == 0)
{ {
ReraiseRemoteError(connection, NULL); ReportConnectionError(connection, ERROR);
} }
} }
foreach(connectionCell, targetConnectionList) /* get results */
foreach(connectionCell, connectionList)
{ {
TransactionConnection *transactionConnection = MultiConnection *connection = (MultiConnection *) lfirst(connectionCell);
(TransactionConnection *) lfirst(connectionCell);
PGconn *connection = transactionConnection->connection; PGresult *result = GetRemoteCommandResult(connection, true);
PGresult *result = PQgetResult(connection); if (!IsResponseOK(result))
ExecStatusType resultStatus = PQresultStatus(result);
if (!IsResponseOK(resultStatus))
{ {
ReraiseRemoteError(connection, result); ReportResultError(connection, result, ERROR);
} }
PQclear(result); PQclear(result);
/* clear NULL result */ ForgetResults(connection);
PQgetResult(connection);
} }
} }
@ -172,9 +164,9 @@ void
SendCommandListToWorkerInSingleTransaction(char *nodeName, int32 nodePort, char *nodeUser, SendCommandListToWorkerInSingleTransaction(char *nodeName, int32 nodePort, char *nodeUser,
List *commandList) List *commandList)
{ {
PGconn *workerConnection = NULL; MultiConnection *workerConnection = NULL;
PGresult *queryResult = NULL;
ListCell *commandCell = NULL; ListCell *commandCell = NULL;
int connectionFlags = FORCE_NEW_CONNECTION;
if (XactModificationLevel > XACT_MODIFICATION_NONE) if (XactModificationLevel > XACT_MODIFICATION_NONE)
{ {
@ -183,376 +175,20 @@ SendCommandListToWorkerInSingleTransaction(char *nodeName, int32 nodePort, char
"command within a transaction"))); "command within a transaction")));
} }
workerConnection = ConnectToNode(nodeName, nodePort, nodeUser); workerConnection = GetNodeUserDatabaseConnection(connectionFlags, nodeName, nodePort,
if (workerConnection == NULL) nodeUser, NULL);
MarkRemoteTransactionCritical(workerConnection);
RemoteTransactionBegin(workerConnection);
/* iterate over the commands and execute them in the same connection */
foreach(commandCell, commandList)
{ {
ereport(ERROR, (errmsg("could not open connection to %s:%d as %s", char *commandString = lfirst(commandCell);
nodeName, nodePort, nodeUser)));
ExecuteCriticalRemoteCommand(workerConnection, commandString);
} }
PG_TRY(); RemoteTransactionCommit(workerConnection);
{ CloseConnection(workerConnection);
/* start the transaction on the worker node */
queryResult = PQexec(workerConnection, "BEGIN");
if (PQresultStatus(queryResult) != PGRES_COMMAND_OK)
{
ReraiseRemoteError(workerConnection, queryResult);
}
PQclear(queryResult);
/* iterate over the commands and execute them in the same connection */
foreach(commandCell, commandList)
{
char *commandString = lfirst(commandCell);
ExecStatusType resultStatus = PGRES_EMPTY_QUERY;
CHECK_FOR_INTERRUPTS();
queryResult = PQexec(workerConnection, commandString);
resultStatus = PQresultStatus(queryResult);
if (!(resultStatus == PGRES_SINGLE_TUPLE || resultStatus == PGRES_TUPLES_OK ||
resultStatus == PGRES_COMMAND_OK))
{
ReraiseRemoteError(workerConnection, queryResult);
}
PQclear(queryResult);
}
/* commit the transaction on the worker node */
queryResult = PQexec(workerConnection, "COMMIT");
if (PQresultStatus(queryResult) != PGRES_COMMAND_OK)
{
ReraiseRemoteError(workerConnection, queryResult);
}
PQclear(queryResult);
/* clear NULL result */
PQgetResult(workerConnection);
/* we no longer need this connection */
CloseConnectionByPGconn(workerConnection);
}
PG_CATCH();
{
/* close the connection */
CloseConnectionByPGconn(workerConnection);
PG_RE_THROW();
}
PG_END_TRY();
}
/*
* IsWorkerTransactionActive returns true if there exists any on going
* worker transactions.
*/
bool
IsWorkerTransactionActive(void)
{
bool isWorkerTransactionActive = false;
if (workerConnectionList != NIL)
{
isWorkerTransactionActive = true;
}
return isWorkerTransactionActive;
}
/*
* RemoveWorkerTransaction removes the transaction connection to the specified node from
* the transaction connection list.
*/
void
RemoveWorkerTransaction(char *nodeName, int32 nodePort)
{
TransactionConnection *transactionConnection =
GetWorkerTransaction(nodeName, nodePort);
/* transactionConnection = NULL if the worker transactions have not opened before */
if (transactionConnection != NULL)
{
PGconn *connection = transactionConnection->connection;
/* closing the connection will rollback all uncommited transactions */
CloseConnectionByPGconn(connection);
workerConnectionList = list_delete(workerConnectionList, transactionConnection);
}
}
/*
* EnableXactCallback registers the CompleteWorkerTransactions function as the callback
* of the worker transactions.
*/
static void
EnableXactCallback(void)
{
if (!isXactCallbackRegistered)
{
RegisterXactCallback(CompleteWorkerTransactions, NULL);
isXactCallbackRegistered = true;
}
}
/*
* CompleteWorkerTransaction commits or aborts pending worker transactions
* when the local transaction commits or aborts.
*/
static void
CompleteWorkerTransactions(XactEvent event, void *arg)
{
if (workerConnectionList == NIL)
{
/* nothing to do */
return;
}
else if (event == XACT_EVENT_PRE_COMMIT)
{
if (MultiShardCommitProtocol == COMMIT_PROTOCOL_2PC)
{
/*
* Any failure here will cause local changes to be rolled back,
* and may leave a prepared transaction on the remote node.
*/
PrepareRemoteTransactions(workerConnectionList);
/*
* We are now ready to commit the local transaction, followed
* by the remote transaction. As a final step, write commit
* records to a table. If there is a last-minute crash
* on the local machine, then the absence of these records
* will indicate that the remote transactions should be rolled
* back. Otherwise, the presence of these records indicates
* that the remote transactions should be committed.
*/
LogPreparedTransactions(workerConnectionList);
}
return;
}
else if (event == XACT_EVENT_COMMIT)
{
/*
* A failure here may cause some prepared transactions to be
* left pending. However, the local change have already been
* committed and a commit record exists to indicate that the
* remote transaction should be committed as well.
*/
CommitRemoteTransactions(workerConnectionList, false);
/*
* At this point, it is safe to remove the transaction records
* for all commits that have succeeded. However, we are no
* longer in a transaction and therefore cannot make changes
* to the metadata.
*/
}
else if (event == XACT_EVENT_ABORT)
{
/*
* A failure here may cause some prepared transactions to be
* left pending. The local changes have already been rolled
* back and the absence of a commit record indicates that
* the remote transaction should be rolled back as well.
*/
AbortRemoteTransactions(workerConnectionList);
}
else if (event == XACT_EVENT_PREPARE || event == XACT_EVENT_PRE_PREPARE)
{
/*
* If we allow a prepare we might not get to the commit handler
* in this session. We could resolve that if we intercept
* COMMIT/ABORT PREPARED commands. For now, we just error out.
*/
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot prepare a transaction that modified "
"distributed tables")));
}
else
{
return;
}
CloseConnections(workerConnectionList);
/*
* Memory allocated in workerConnectionList will be reclaimed when
* TopTransactionContext is released.
*/
workerConnectionList = NIL;
}
/*
* OpenWorkerTransactions opens connections to all primary workers and sends
* BEGIN commands. The returned TransactionConnection's are allocated in the
* top transaction context, such that they can still be used in the commit
* handler. The connections are made as the extension owner, such that they
* have write access to the Citus metadata tables.
*/
static List *
OpenWorkerTransactions(void)
{
ListCell *workerNodeCell = NULL;
List *connectionList = NIL;
MemoryContext oldContext = NULL;
List *workerList = NIL;
/*
* A new node addition might be in progress which will invalidate the
* worker list. The following statement blocks until the node addition and
* metadata syncing finishes after which we reload the worker list.
* It also ensures that no new node addition and metadata synchronization
* will run until this transaction finishes.
*/
LockMetadataSnapshot(AccessShareLock);
workerList = WorkerNodeList();
oldContext = MemoryContextSwitchTo(TopTransactionContext);
foreach(workerNodeCell, workerList)
{
WorkerNode *workerNode = (WorkerNode *) lfirst(workerNodeCell);
char *nodeUser = CitusExtensionOwnerName();
char *nodeName = workerNode->workerName;
int nodePort = workerNode->workerPort;
PGconn *connection = NULL;
TransactionConnection *transactionConnection = NULL;
PGresult *result = NULL;
connection = ConnectToNode(nodeName, nodePort, nodeUser);
if (connection == NULL)
{
ereport(ERROR, (errmsg("could not open connection to %s:%d as %s",
nodeName, nodePort, nodeUser)));
}
result = PQexec(connection, "BEGIN");
if (PQresultStatus(result) != PGRES_COMMAND_OK)
{
ReraiseRemoteError(connection, result);
}
PQclear(result);
transactionConnection = palloc0(sizeof(TransactionConnection));
transactionConnection->groupId = workerNode->groupId;
transactionConnection->connectionId = 0;
transactionConnection->transactionState = TRANSACTION_STATE_OPEN;
transactionConnection->connection = connection;
transactionConnection->nodeName = pstrdup(nodeName);
transactionConnection->nodePort = nodePort;
connectionList = lappend(connectionList, transactionConnection);
}
MemoryContextSwitchTo(oldContext);
return connectionList;
}
/*
* GetNodeTransactionConnection finds the opened connection for the specified
* node. Note that it opens transaction connections to all workers, by
* calling GetWorkerTransactions therefore, it is suggested to use this
* function in operations that sends commands to all workers inside a
* distributed transaction.
*
* GetNodeTransactionConnection returns NULL, if the node with the specified
* nodeName and nodePort is not found. Note that this worker may open
* connections to all workers if there were not open already.
*/
static TransactionConnection *
GetWorkerTransaction(char *nodeName, int32 nodePort)
{
List *connectionList = NIL;
ListCell *connectionCell = NULL;
TransactionConnection *workerTransaction = NULL;
connectionList = GetWorkerTransactions();
foreach(connectionCell, connectionList)
{
TransactionConnection *transactionConnection =
(TransactionConnection *) lfirst(connectionCell);
if (strcmp(transactionConnection->nodeName, nodeName) == 0 &&
transactionConnection->nodePort == nodePort)
{
workerTransaction = transactionConnection;
break;
}
}
return workerTransaction;
}
/*
* GetTargetWorkerTransactions returns a subset of all worker transactions
* matching the given target worker set.
*/
static List *
GetTargetWorkerTransactions(TargetWorkerSet targetWorkerSet)
{
List *allWorkerConnectionsList = GetWorkerTransactions();
List *targetConnectionList = NIL;
ListCell *connectionCell = NULL;
if (targetWorkerSet == WORKERS_WITH_METADATA)
{
foreach(connectionCell, allWorkerConnectionsList)
{
TransactionConnection *transactionConnection =
(TransactionConnection *) lfirst(connectionCell);
char *nodeName = pstrdup(transactionConnection->nodeName);
int nodePort = transactionConnection->nodePort;
WorkerNode *workerNode = FindWorkerNode(nodeName, nodePort);
if (workerNode->hasMetadata)
{
targetConnectionList = lappend(targetConnectionList,
transactionConnection);
}
}
}
else
{
targetConnectionList = allWorkerConnectionsList;
}
return targetConnectionList;
}
/*
* IsResponseOK checks the resultStatus and returns true if the status is OK.
*/
static bool
IsResponseOK(ExecStatusType resultStatus)
{
if (resultStatus == PGRES_SINGLE_TUPLE || resultStatus == PGRES_TUPLES_OK ||
resultStatus == PGRES_COMMAND_OK)
{
return true;
}
return false;
} }

View File

@ -23,6 +23,7 @@
#include "access/xact.h" #include "access/xact.h"
#include "catalog/indexing.h" #include "catalog/indexing.h"
#include "commands/sequence.h" #include "commands/sequence.h"
#include "distributed/connection_management.h"
#include "distributed/master_protocol.h" #include "distributed/master_protocol.h"
#include "distributed/master_metadata_utility.h" #include "distributed/master_metadata_utility.h"
#include "distributed/metadata_cache.h" #include "distributed/metadata_cache.h"
@ -116,7 +117,8 @@ master_remove_node(PG_FUNCTION_ARGS)
nodeDeleteCommand = NodeDeleteCommand(workerNode->nodeId); nodeDeleteCommand = NodeDeleteCommand(workerNode->nodeId);
RemoveWorkerTransaction(nodeNameString, nodePort); /* make sure we don't have any open connections */
CloseNodeConnections(nodeNameString, nodePort);
SendCommandToWorkers(WORKERS_WITH_METADATA, nodeDeleteCommand); SendCommandToWorkers(WORKERS_WITH_METADATA, nodeDeleteCommand);

View File

@ -124,6 +124,7 @@ extern MultiConnection * StartNodeUserDatabaseConnection(uint32 flags,
const char *user, const char *user,
const char *database); const char *database);
extern MultiConnection * GetConnectionFromPGconn(struct pg_conn *pqConn); extern MultiConnection * GetConnectionFromPGconn(struct pg_conn *pqConn);
extern void CloseNodeConnections(char *nodeName, int nodePort);
extern void CloseConnection(MultiConnection *connection); extern void CloseConnection(MultiConnection *connection);
extern void CloseConnectionByPGconn(struct pg_conn *pqConn); extern void CloseConnectionByPGconn(struct pg_conn *pqConn);

View File

@ -32,7 +32,12 @@ extern void ReportResultError(MultiConnection *connection, struct pg_result *res
extern void LogRemoteCommand(MultiConnection *connection, const char *command); extern void LogRemoteCommand(MultiConnection *connection, const char *command);
/* wrappers around libpq functions, with command logging support */ /* wrappers around libpq functions, with command logging support */
extern void ExecuteCriticalRemoteCommand(MultiConnection *connection,
const char *command);
extern int SendRemoteCommand(MultiConnection *connection, const char *command); extern int SendRemoteCommand(MultiConnection *connection, const char *command);
extern int SendRemoteCommandParams(MultiConnection *connection, const char *command,
int parameterCount, const Oid *parameterTypes,
const char *const *parameterValues);
extern struct pg_result * GetRemoteCommandResult(MultiConnection *connection, extern struct pg_result * GetRemoteCommandResult(MultiConnection *connection,
bool raiseInterrupts); bool raiseInterrupts);

View File

@ -17,6 +17,7 @@
/* Functions declarations for worker transactions */ /* Functions declarations for worker transactions */
extern void LogPreparedTransactions(List *connectionList); extern void LogPreparedTransactions(List *connectionList);
extern void LogTransactionRecord(int groupId, char *transactionName);
#endif /* TRANSACTION_RECOVERY_H */ #endif /* TRANSACTION_RECOVERY_H */

View File

@ -60,3 +60,114 @@ SELECT count(*) FROM pg_tables WHERE tablename = 'should_commit';
1 1
(1 row) (1 row)
\c - - - :master_port
SET citus.shard_replication_factor TO 2;
SET citus.shard_count TO 2;
SET citus.multi_shard_commit_protocol TO '2pc';
CREATE TABLE test_recovery (x text);
SELECT create_distributed_table('test_recovery', 'x');
create_distributed_table
--------------------------
(1 row)
INSERT INTO test_recovery VALUES ('hello');
-- Committed DDL commands should write 4 transaction recovery records
BEGIN;
ALTER TABLE test_recovery ADD COLUMN y text;
ROLLBACK;
SELECT count(*) FROM pg_dist_transaction;
count
-------
0
(1 row)
ALTER TABLE test_recovery ADD COLUMN y text;
SELECT count(*) FROM pg_dist_transaction;
count
-------
4
(1 row)
SELECT recover_prepared_transactions();
recover_prepared_transactions
-------------------------------
0
(1 row)
SELECT count(*) FROM pg_dist_transaction;
count
-------
0
(1 row)
-- Committed master_modify_multiple_shards should write 4 transaction recovery records
BEGIN;
SELECT master_modify_multiple_shards($$UPDATE test_recovery SET y = 'world'$$);
master_modify_multiple_shards
-------------------------------
1
(1 row)
ROLLBACK;
SELECT count(*) FROM pg_dist_transaction;
count
-------
0
(1 row)
SELECT master_modify_multiple_shards($$UPDATE test_recovery SET y = 'world'$$);
master_modify_multiple_shards
-------------------------------
1
(1 row)
SELECT count(*) FROM pg_dist_transaction;
count
-------
4
(1 row)
SELECT recover_prepared_transactions();
recover_prepared_transactions
-------------------------------
0
(1 row)
SELECT count(*) FROM pg_dist_transaction;
count
-------
0
(1 row)
-- Committed INSERT..SELECT should write 4 transaction recovery records
BEGIN;
INSERT INTO test_recovery SELECT x, 'earth' FROM test_recovery;
ROLLBACK;
SELECT count(*) FROM pg_dist_transaction;
count
-------
0
(1 row)
INSERT INTO test_recovery SELECT x, 'earth' FROM test_recovery;
SELECT count(*) FROM pg_dist_transaction;
count
-------
4
(1 row)
SELECT recover_prepared_transactions();
recover_prepared_transactions
-------------------------------
0
(1 row)
SELECT count(*) FROM pg_dist_transaction;
count
-------
0
(1 row)
\c - - - :master_port
DROP TABLE test_recovery;

View File

@ -35,3 +35,51 @@ SELECT count(*) FROM pg_dist_transaction;
\c - - - :worker_1_port \c - - - :worker_1_port
SELECT count(*) FROM pg_tables WHERE tablename = 'should_abort'; SELECT count(*) FROM pg_tables WHERE tablename = 'should_abort';
SELECT count(*) FROM pg_tables WHERE tablename = 'should_commit'; SELECT count(*) FROM pg_tables WHERE tablename = 'should_commit';
\c - - - :master_port
SET citus.shard_replication_factor TO 2;
SET citus.shard_count TO 2;
SET citus.multi_shard_commit_protocol TO '2pc';
CREATE TABLE test_recovery (x text);
SELECT create_distributed_table('test_recovery', 'x');
INSERT INTO test_recovery VALUES ('hello');
-- Committed DDL commands should write 4 transaction recovery records
BEGIN;
ALTER TABLE test_recovery ADD COLUMN y text;
ROLLBACK;
SELECT count(*) FROM pg_dist_transaction;
ALTER TABLE test_recovery ADD COLUMN y text;
SELECT count(*) FROM pg_dist_transaction;
SELECT recover_prepared_transactions();
SELECT count(*) FROM pg_dist_transaction;
-- Committed master_modify_multiple_shards should write 4 transaction recovery records
BEGIN;
SELECT master_modify_multiple_shards($$UPDATE test_recovery SET y = 'world'$$);
ROLLBACK;
SELECT count(*) FROM pg_dist_transaction;
SELECT master_modify_multiple_shards($$UPDATE test_recovery SET y = 'world'$$);
SELECT count(*) FROM pg_dist_transaction;
SELECT recover_prepared_transactions();
SELECT count(*) FROM pg_dist_transaction;
-- Committed INSERT..SELECT should write 4 transaction recovery records
BEGIN;
INSERT INTO test_recovery SELECT x, 'earth' FROM test_recovery;
ROLLBACK;
SELECT count(*) FROM pg_dist_transaction;
INSERT INTO test_recovery SELECT x, 'earth' FROM test_recovery;
SELECT count(*) FROM pg_dist_transaction;
SELECT recover_prepared_transactions();
SELECT count(*) FROM pg_dist_transaction;
\c - - - :master_port
DROP TABLE test_recovery;