diff --git a/src/backend/distributed/connection/connection_management.c b/src/backend/distributed/connection/connection_management.c index df2a96d9f..14e270aa6 100644 --- a/src/backend/distributed/connection/connection_management.c +++ b/src/backend/distributed/connection/connection_management.c @@ -308,6 +308,43 @@ GetConnectionFromPGconn(struct pg_conn *pqConn) } +/* + * CloseNodeConnections closes all the connections to a particular node. + * This is mainly used when a worker leaves the cluster + */ +void +CloseNodeConnections(char *nodeName, int nodePort) +{ + HASH_SEQ_STATUS status; + ConnectionHashEntry *entry; + + hash_seq_init(&status, ConnectionHash); + while ((entry = (ConnectionHashEntry *) hash_seq_search(&status)) != 0) + { + dlist_head *connections = entry->connections; + + if (strcmp(entry->key.hostname, nodeName) != 0 || entry->key.port != nodePort) + { + continue; + } + + while (!dlist_is_empty(connections)) + { + dlist_node *currentNode = dlist_pop_head_node(connections); + + MultiConnection *connection = + dlist_container(MultiConnection, connectionNode, currentNode); + + /* same for transaction state */ + CloseRemoteTransaction(connection); + + /* we leave the per-host entry alive */ + pfree(connection); + } + } +} + + /* * Close a previously established connection. */ diff --git a/src/backend/distributed/connection/remote_commands.c b/src/backend/distributed/connection/remote_commands.c index 6c5658988..d17a44d5a 100644 --- a/src/backend/distributed/connection/remote_commands.c +++ b/src/backend/distributed/connection/remote_commands.c @@ -187,6 +187,35 @@ LogRemoteCommand(MultiConnection *connection, const char *command) /* wrappers around libpq functions, with command logging support */ + +/* + * ExecuteCriticalRemoteCommand executes a remote command that is critical + * to the transaction. If the command fails then the transaction aborts. + */ +void +ExecuteCriticalRemoteCommand(MultiConnection *connection, const char *command) +{ + int querySent = 0; + PGresult *result = NULL; + bool raiseInterrupts = true; + + querySent = SendRemoteCommand(connection, command); + if (querySent == 0) + { + ReportConnectionError(connection, ERROR); + } + + result = GetRemoteCommandResult(connection, raiseInterrupts); + if (!IsResponseOK(result)) + { + ReportResultError(connection, result, ERROR); + } + + PQclear(result); + ForgetResults(connection); +} + + /* * SendRemoteCommand is a PQsendQuery wrapper that logs remote commands, and * accepts a MultiConnection instead of a plain PGconn. It makes sure it can @@ -194,7 +223,9 @@ LogRemoteCommand(MultiConnection *connection, const char *command) * an additional memory allocation). */ int -SendRemoteCommand(MultiConnection *connection, const char *command) +SendRemoteCommandParams(MultiConnection *connection, const char *command, + int parameterCount, const Oid *parameterTypes, + const char *const *parameterValues) { PGconn *pgConn = connection->pgConn; bool wasNonblocking = PQisnonblocking(pgConn); @@ -208,7 +239,8 @@ SendRemoteCommand(MultiConnection *connection, const char *command) PQsetnonblocking(pgConn, true); } - rc = PQsendQuery(pgConn, command); + rc = PQsendQueryParams(pgConn, command, parameterCount, parameterTypes, + parameterValues, NULL, NULL, 0); /* reset nonblocking connection to its original state */ if (!wasNonblocking) @@ -220,6 +252,19 @@ SendRemoteCommand(MultiConnection *connection, const char *command) } +/* + * SendRemoteCommand is a PQsendQuery wrapper that logs remote commands, and + * accepts a MultiConnection instead of a plain PGconn. It makes sure it can + * send commands asynchronously without blocking (at the potential expense of + * an additional memory allocation). + */ +int +SendRemoteCommand(MultiConnection *connection, const char *command) +{ + return SendRemoteCommandParams(connection, command, 0, NULL, NULL); +} + + /* * GetCommandResult is a wrapper around PQgetResult() that handles interrupts. * diff --git a/src/backend/distributed/executor/multi_router_executor.c b/src/backend/distributed/executor/multi_router_executor.c index 9d3f99541..39a3bdfca 100644 --- a/src/backend/distributed/executor/multi_router_executor.c +++ b/src/backend/distributed/executor/multi_router_executor.c @@ -244,7 +244,7 @@ InitTransactionStateForTask(Task *task) { ListCell *placementCell = NULL; - BeginCoordinatedTransaction(); + BeginOrContinueCoordinatedTransaction(); xactParticipantHash = CreateXactParticipantHash(); diff --git a/src/backend/distributed/transaction/remote_transaction.c b/src/backend/distributed/transaction/remote_transaction.c index e9f7d9af6..01bfe0eea 100644 --- a/src/backend/distributed/transaction/remote_transaction.c +++ b/src/backend/distributed/transaction/remote_transaction.c @@ -16,9 +16,12 @@ #include "access/xact.h" #include "distributed/connection_management.h" -#include "distributed/transaction_management.h" +#include "distributed/metadata_cache.h" #include "distributed/remote_commands.h" #include "distributed/remote_transaction.h" +#include "distributed/transaction_management.h" +#include "distributed/transaction_recovery.h" +#include "distributed/worker_manager.h" #include "utils/hsearch.h" @@ -376,6 +379,7 @@ StartRemoteTransactionPrepare(struct MultiConnection *connection) RemoteTransaction *transaction = &connection->remoteTransaction; StringInfoData command; const bool raiseErrors = true; + WorkerNode *workerNode = NULL; /* can't prepare a nonexistant transaction */ Assert(transaction->transactionState != REMOTE_TRANS_INVALID); @@ -388,6 +392,13 @@ StartRemoteTransactionPrepare(struct MultiConnection *connection) Assign2PCIdentifier(connection); + /* log transactions to workers in pg_dist_transaction */ + workerNode = FindWorkerNode(connection->hostname, connection->port); + if (workerNode != NULL) + { + LogTransactionRecord(workerNode->groupId, transaction->preparedName); + } + initStringInfo(&command); appendStringInfo(&command, "PREPARE TRANSACTION '%s'", transaction->preparedName); @@ -826,7 +837,7 @@ Assign2PCIdentifier(MultiConnection *connection) { static uint64 sequence = 0; snprintf(connection->remoteTransaction.preparedName, NAMEDATALEN, - "citus_%d_"UINT64_FORMAT, + "citus_%d_%d_"UINT64_FORMAT, GetLocalGroupId(), MyProcPid, sequence++); } diff --git a/src/backend/distributed/transaction/transaction_recovery.c b/src/backend/distributed/transaction/transaction_recovery.c index 9c59d57af..88e4e25d1 100644 --- a/src/backend/distributed/transaction/transaction_recovery.c +++ b/src/backend/distributed/transaction/transaction_recovery.c @@ -45,7 +45,6 @@ PG_FUNCTION_INFO_V1(recover_prepared_transactions); /* Local functions forward declarations */ -static void LogTransactionRecord(int groupId, char *transactionName); static int RecoverPreparedTransactions(void); static int RecoverWorkerTransactions(WorkerNode *workerNode); static List * NameListDifference(List *nameList, List *subtractList); @@ -106,7 +105,7 @@ LogPreparedTransactions(List *connectionList) * prepared on a worker. The presence of this record indicates that the * prepared transaction should be committed. */ -static void +void LogTransactionRecord(int groupId, char *transactionName) { Relation pgDistTransaction = NULL; diff --git a/src/backend/distributed/transaction/worker_transaction.c b/src/backend/distributed/transaction/worker_transaction.c index faca72802..8fd0e5395 100644 --- a/src/backend/distributed/transaction/worker_transaction.c +++ b/src/backend/distributed/transaction/worker_transaction.c @@ -20,9 +20,11 @@ #include "access/xact.h" #include "distributed/commit_protocol.h" #include "distributed/connection_cache.h" +#include "distributed/connection_management.h" #include "distributed/metadata_cache.h" #include "distributed/multi_shard_transaction.h" #include "distributed/resource_lock.h" +#include "distributed/remote_commands.h" #include "distributed/pg_dist_node.h" #include "distributed/pg_dist_transaction.h" #include "distributed/transaction_recovery.h" @@ -31,42 +33,6 @@ #include "utils/memutils.h" -/* Local functions forward declarations */ -static void EnableXactCallback(void); -static void CompleteWorkerTransactions(XactEvent event, void *arg); -static List * OpenWorkerTransactions(void); -static TransactionConnection * GetWorkerTransaction(char *nodeName, int32 nodePort); -static List * GetTargetWorkerTransactions(TargetWorkerSet targetWorkerSet); -static bool IsResponseOK(ExecStatusType resultStatus); - - -/* Global worker connection list */ -static List *workerConnectionList = NIL; -static bool isXactCallbackRegistered = false; - - -/* - * GetWorkerTransactions opens connections to all workers and starts - * a transaction block that is committed or aborted when the local - * transaction commits or aborts. Multiple invocations of - * GetWorkerTransactions within the same transaction will return - * the same list of connections. - */ -List * -GetWorkerTransactions(void) -{ - if (workerConnectionList == NIL) - { - InitializeDistributedTransaction(); - EnableXactCallback(); - - workerConnectionList = OpenWorkerTransactions(); - } - - return workerConnectionList; -} - - /* * SendCommandToWorker sends a command to a particular worker as part of the * 2PC. @@ -74,25 +40,19 @@ GetWorkerTransactions(void) void SendCommandToWorker(char *nodeName, int32 nodePort, char *command) { - TransactionConnection *transactionConnection = NULL; - PGresult *queryResult = NULL; - ExecStatusType resultStatus = PGRES_EMPTY_QUERY; + MultiConnection *transactionConnection = NULL; + char *nodeUser = CitusExtensionOwnerName(); + int connectionFlags = 0; - transactionConnection = GetWorkerTransaction(nodeName, nodePort); - if (transactionConnection == NULL) - { - ereport(ERROR, (errmsg("worker %s:%d is not part of current transaction", - nodeName, nodePort))); - } + BeginOrContinueCoordinatedTransaction(); + CoordinatedTransactionUse2PC(); - queryResult = PQexec(transactionConnection->connection, command); - resultStatus = PQresultStatus(queryResult); - if (resultStatus != PGRES_COMMAND_OK && resultStatus != PGRES_TUPLES_OK) - { - ReraiseRemoteError(transactionConnection->connection, queryResult); - } + transactionConnection = GetNodeUserDatabaseConnection(connectionFlags, nodeName, + nodePort, nodeUser, NULL); - PQclear(queryResult); + MarkRemoteTransactionCritical(transactionConnection); + RemoteTransactionBeginIfNecessary(transactionConnection); + ExecuteCriticalRemoteCommand(transactionConnection, command); } @@ -122,42 +82,74 @@ SendCommandToWorkersParams(TargetWorkerSet targetWorkerSet, char *command, int parameterCount, const Oid *parameterTypes, const char *const *parameterValues) { + List *connectionList = NIL; ListCell *connectionCell = NULL; - List *targetConnectionList = GetTargetWorkerTransactions(targetWorkerSet); + List *workerNodeList = WorkerNodeList(); + ListCell *workerNodeCell = NULL; + char *nodeUser = CitusExtensionOwnerName(); - foreach(connectionCell, targetConnectionList) + BeginOrContinueCoordinatedTransaction(); + CoordinatedTransactionUse2PC(); + + /* open connections in parallel */ + foreach(workerNodeCell, workerNodeList) { - TransactionConnection *transactionConnection = - (TransactionConnection *) lfirst(connectionCell); + WorkerNode *workerNode = (WorkerNode *) lfirst(workerNodeCell); + char *nodeName = workerNode->workerName; + int nodePort = workerNode->workerPort; + MultiConnection *connection = NULL; + int connectionFlags = 0; - PGconn *connection = transactionConnection->connection; + if (targetWorkerSet == WORKERS_WITH_METADATA && !workerNode->hasMetadata) + { + continue; + } - int querySent = PQsendQueryParams(connection, command, parameterCount, - parameterTypes, parameterValues, NULL, NULL, 0); + connection = StartNodeUserDatabaseConnection(connectionFlags, nodeName, nodePort, + nodeUser, NULL); + + MarkRemoteTransactionCritical(connection); + + connectionList = lappend(connectionList, connection); + } + + /* finish opening connections */ + foreach(connectionCell, connectionList) + { + MultiConnection *connection = (MultiConnection *) lfirst(connectionCell); + + FinishConnectionEstablishment(connection); + } + + RemoteTransactionsBeginIfNecessary(connectionList); + + /* send commands in parallel */ + foreach(connectionCell, connectionList) + { + MultiConnection *connection = (MultiConnection *) lfirst(connectionCell); + + int querySent = SendRemoteCommandParams(connection, command, parameterCount, + parameterTypes, parameterValues); if (querySent == 0) { - ReraiseRemoteError(connection, NULL); + ReportConnectionError(connection, ERROR); } } - foreach(connectionCell, targetConnectionList) + /* get results */ + foreach(connectionCell, connectionList) { - TransactionConnection *transactionConnection = - (TransactionConnection *) lfirst(connectionCell); + MultiConnection *connection = (MultiConnection *) lfirst(connectionCell); - PGconn *connection = transactionConnection->connection; - PGresult *result = PQgetResult(connection); - ExecStatusType resultStatus = PQresultStatus(result); - - if (!IsResponseOK(resultStatus)) + PGresult *result = GetRemoteCommandResult(connection, true); + if (!IsResponseOK(result)) { - ReraiseRemoteError(connection, result); + ReportResultError(connection, result, ERROR); } PQclear(result); - /* clear NULL result */ - PQgetResult(connection); + ForgetResults(connection); } } @@ -172,9 +164,9 @@ void SendCommandListToWorkerInSingleTransaction(char *nodeName, int32 nodePort, char *nodeUser, List *commandList) { - PGconn *workerConnection = NULL; - PGresult *queryResult = NULL; + MultiConnection *workerConnection = NULL; ListCell *commandCell = NULL; + int connectionFlags = FORCE_NEW_CONNECTION; if (XactModificationLevel > XACT_MODIFICATION_NONE) { @@ -183,376 +175,20 @@ SendCommandListToWorkerInSingleTransaction(char *nodeName, int32 nodePort, char "command within a transaction"))); } - workerConnection = ConnectToNode(nodeName, nodePort, nodeUser); - if (workerConnection == NULL) + workerConnection = GetNodeUserDatabaseConnection(connectionFlags, nodeName, nodePort, + nodeUser, NULL); + + MarkRemoteTransactionCritical(workerConnection); + RemoteTransactionBegin(workerConnection); + + /* iterate over the commands and execute them in the same connection */ + foreach(commandCell, commandList) { - ereport(ERROR, (errmsg("could not open connection to %s:%d as %s", - nodeName, nodePort, nodeUser))); + char *commandString = lfirst(commandCell); + + ExecuteCriticalRemoteCommand(workerConnection, commandString); } - PG_TRY(); - { - /* start the transaction on the worker node */ - queryResult = PQexec(workerConnection, "BEGIN"); - if (PQresultStatus(queryResult) != PGRES_COMMAND_OK) - { - ReraiseRemoteError(workerConnection, queryResult); - } - - PQclear(queryResult); - - /* iterate over the commands and execute them in the same connection */ - foreach(commandCell, commandList) - { - char *commandString = lfirst(commandCell); - ExecStatusType resultStatus = PGRES_EMPTY_QUERY; - - CHECK_FOR_INTERRUPTS(); - - queryResult = PQexec(workerConnection, commandString); - resultStatus = PQresultStatus(queryResult); - if (!(resultStatus == PGRES_SINGLE_TUPLE || resultStatus == PGRES_TUPLES_OK || - resultStatus == PGRES_COMMAND_OK)) - { - ReraiseRemoteError(workerConnection, queryResult); - } - - PQclear(queryResult); - } - - /* commit the transaction on the worker node */ - queryResult = PQexec(workerConnection, "COMMIT"); - if (PQresultStatus(queryResult) != PGRES_COMMAND_OK) - { - ReraiseRemoteError(workerConnection, queryResult); - } - - PQclear(queryResult); - - /* clear NULL result */ - PQgetResult(workerConnection); - - /* we no longer need this connection */ - CloseConnectionByPGconn(workerConnection); - } - PG_CATCH(); - { - /* close the connection */ - CloseConnectionByPGconn(workerConnection); - - PG_RE_THROW(); - } - PG_END_TRY(); -} - - -/* - * IsWorkerTransactionActive returns true if there exists any on going - * worker transactions. - */ -bool -IsWorkerTransactionActive(void) -{ - bool isWorkerTransactionActive = false; - - if (workerConnectionList != NIL) - { - isWorkerTransactionActive = true; - } - - return isWorkerTransactionActive; -} - - -/* - * RemoveWorkerTransaction removes the transaction connection to the specified node from - * the transaction connection list. - */ -void -RemoveWorkerTransaction(char *nodeName, int32 nodePort) -{ - TransactionConnection *transactionConnection = - GetWorkerTransaction(nodeName, nodePort); - - /* transactionConnection = NULL if the worker transactions have not opened before */ - if (transactionConnection != NULL) - { - PGconn *connection = transactionConnection->connection; - - /* closing the connection will rollback all uncommited transactions */ - CloseConnectionByPGconn(connection); - - workerConnectionList = list_delete(workerConnectionList, transactionConnection); - } -} - - -/* - * EnableXactCallback registers the CompleteWorkerTransactions function as the callback - * of the worker transactions. - */ -static void -EnableXactCallback(void) -{ - if (!isXactCallbackRegistered) - { - RegisterXactCallback(CompleteWorkerTransactions, NULL); - isXactCallbackRegistered = true; - } -} - - -/* - * CompleteWorkerTransaction commits or aborts pending worker transactions - * when the local transaction commits or aborts. - */ -static void -CompleteWorkerTransactions(XactEvent event, void *arg) -{ - if (workerConnectionList == NIL) - { - /* nothing to do */ - return; - } - else if (event == XACT_EVENT_PRE_COMMIT) - { - if (MultiShardCommitProtocol == COMMIT_PROTOCOL_2PC) - { - /* - * Any failure here will cause local changes to be rolled back, - * and may leave a prepared transaction on the remote node. - */ - - PrepareRemoteTransactions(workerConnectionList); - - /* - * We are now ready to commit the local transaction, followed - * by the remote transaction. As a final step, write commit - * records to a table. If there is a last-minute crash - * on the local machine, then the absence of these records - * will indicate that the remote transactions should be rolled - * back. Otherwise, the presence of these records indicates - * that the remote transactions should be committed. - */ - - LogPreparedTransactions(workerConnectionList); - } - - return; - } - else if (event == XACT_EVENT_COMMIT) - { - /* - * A failure here may cause some prepared transactions to be - * left pending. However, the local change have already been - * committed and a commit record exists to indicate that the - * remote transaction should be committed as well. - */ - - CommitRemoteTransactions(workerConnectionList, false); - - /* - * At this point, it is safe to remove the transaction records - * for all commits that have succeeded. However, we are no - * longer in a transaction and therefore cannot make changes - * to the metadata. - */ - } - else if (event == XACT_EVENT_ABORT) - { - /* - * A failure here may cause some prepared transactions to be - * left pending. The local changes have already been rolled - * back and the absence of a commit record indicates that - * the remote transaction should be rolled back as well. - */ - - AbortRemoteTransactions(workerConnectionList); - } - else if (event == XACT_EVENT_PREPARE || event == XACT_EVENT_PRE_PREPARE) - { - /* - * If we allow a prepare we might not get to the commit handler - * in this session. We could resolve that if we intercept - * COMMIT/ABORT PREPARED commands. For now, we just error out. - */ - ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("cannot prepare a transaction that modified " - "distributed tables"))); - } - else - { - return; - } - - CloseConnections(workerConnectionList); - - /* - * Memory allocated in workerConnectionList will be reclaimed when - * TopTransactionContext is released. - */ - - workerConnectionList = NIL; -} - - -/* - * OpenWorkerTransactions opens connections to all primary workers and sends - * BEGIN commands. The returned TransactionConnection's are allocated in the - * top transaction context, such that they can still be used in the commit - * handler. The connections are made as the extension owner, such that they - * have write access to the Citus metadata tables. - */ -static List * -OpenWorkerTransactions(void) -{ - ListCell *workerNodeCell = NULL; - List *connectionList = NIL; - MemoryContext oldContext = NULL; - List *workerList = NIL; - - /* - * A new node addition might be in progress which will invalidate the - * worker list. The following statement blocks until the node addition and - * metadata syncing finishes after which we reload the worker list. - * It also ensures that no new node addition and metadata synchronization - * will run until this transaction finishes. - */ - LockMetadataSnapshot(AccessShareLock); - - workerList = WorkerNodeList(); - - oldContext = MemoryContextSwitchTo(TopTransactionContext); - - foreach(workerNodeCell, workerList) - { - WorkerNode *workerNode = (WorkerNode *) lfirst(workerNodeCell); - char *nodeUser = CitusExtensionOwnerName(); - char *nodeName = workerNode->workerName; - int nodePort = workerNode->workerPort; - PGconn *connection = NULL; - - TransactionConnection *transactionConnection = NULL; - PGresult *result = NULL; - - connection = ConnectToNode(nodeName, nodePort, nodeUser); - if (connection == NULL) - { - ereport(ERROR, (errmsg("could not open connection to %s:%d as %s", - nodeName, nodePort, nodeUser))); - } - - result = PQexec(connection, "BEGIN"); - if (PQresultStatus(result) != PGRES_COMMAND_OK) - { - ReraiseRemoteError(connection, result); - } - - PQclear(result); - - transactionConnection = palloc0(sizeof(TransactionConnection)); - - transactionConnection->groupId = workerNode->groupId; - transactionConnection->connectionId = 0; - transactionConnection->transactionState = TRANSACTION_STATE_OPEN; - transactionConnection->connection = connection; - transactionConnection->nodeName = pstrdup(nodeName); - transactionConnection->nodePort = nodePort; - - connectionList = lappend(connectionList, transactionConnection); - } - - MemoryContextSwitchTo(oldContext); - - return connectionList; -} - - -/* - * GetNodeTransactionConnection finds the opened connection for the specified - * node. Note that it opens transaction connections to all workers, by - * calling GetWorkerTransactions therefore, it is suggested to use this - * function in operations that sends commands to all workers inside a - * distributed transaction. - * - * GetNodeTransactionConnection returns NULL, if the node with the specified - * nodeName and nodePort is not found. Note that this worker may open - * connections to all workers if there were not open already. - */ -static TransactionConnection * -GetWorkerTransaction(char *nodeName, int32 nodePort) -{ - List *connectionList = NIL; - ListCell *connectionCell = NULL; - TransactionConnection *workerTransaction = NULL; - - connectionList = GetWorkerTransactions(); - - foreach(connectionCell, connectionList) - { - TransactionConnection *transactionConnection = - (TransactionConnection *) lfirst(connectionCell); - - if (strcmp(transactionConnection->nodeName, nodeName) == 0 && - transactionConnection->nodePort == nodePort) - { - workerTransaction = transactionConnection; - break; - } - } - - return workerTransaction; -} - - -/* - * GetTargetWorkerTransactions returns a subset of all worker transactions - * matching the given target worker set. - */ -static List * -GetTargetWorkerTransactions(TargetWorkerSet targetWorkerSet) -{ - List *allWorkerConnectionsList = GetWorkerTransactions(); - List *targetConnectionList = NIL; - ListCell *connectionCell = NULL; - - if (targetWorkerSet == WORKERS_WITH_METADATA) - { - foreach(connectionCell, allWorkerConnectionsList) - { - TransactionConnection *transactionConnection = - (TransactionConnection *) lfirst(connectionCell); - char *nodeName = pstrdup(transactionConnection->nodeName); - int nodePort = transactionConnection->nodePort; - WorkerNode *workerNode = FindWorkerNode(nodeName, nodePort); - - if (workerNode->hasMetadata) - { - targetConnectionList = lappend(targetConnectionList, - transactionConnection); - } - } - } - else - { - targetConnectionList = allWorkerConnectionsList; - } - - return targetConnectionList; -} - - -/* - * IsResponseOK checks the resultStatus and returns true if the status is OK. - */ -static bool -IsResponseOK(ExecStatusType resultStatus) -{ - if (resultStatus == PGRES_SINGLE_TUPLE || resultStatus == PGRES_TUPLES_OK || - resultStatus == PGRES_COMMAND_OK) - { - return true; - } - - return false; + RemoteTransactionCommit(workerConnection); + CloseConnection(workerConnection); } diff --git a/src/backend/distributed/utils/node_metadata.c b/src/backend/distributed/utils/node_metadata.c index c8c5eaf07..0471503a8 100644 --- a/src/backend/distributed/utils/node_metadata.c +++ b/src/backend/distributed/utils/node_metadata.c @@ -23,6 +23,7 @@ #include "access/xact.h" #include "catalog/indexing.h" #include "commands/sequence.h" +#include "distributed/connection_management.h" #include "distributed/master_protocol.h" #include "distributed/master_metadata_utility.h" #include "distributed/metadata_cache.h" @@ -116,7 +117,8 @@ master_remove_node(PG_FUNCTION_ARGS) nodeDeleteCommand = NodeDeleteCommand(workerNode->nodeId); - RemoveWorkerTransaction(nodeNameString, nodePort); + /* make sure we don't have any open connections */ + CloseNodeConnections(nodeNameString, nodePort); SendCommandToWorkers(WORKERS_WITH_METADATA, nodeDeleteCommand); diff --git a/src/include/distributed/connection_management.h b/src/include/distributed/connection_management.h index 969b652c9..aa103be41 100644 --- a/src/include/distributed/connection_management.h +++ b/src/include/distributed/connection_management.h @@ -124,6 +124,7 @@ extern MultiConnection * StartNodeUserDatabaseConnection(uint32 flags, const char *user, const char *database); extern MultiConnection * GetConnectionFromPGconn(struct pg_conn *pqConn); +extern void CloseNodeConnections(char *nodeName, int nodePort); extern void CloseConnection(MultiConnection *connection); extern void CloseConnectionByPGconn(struct pg_conn *pqConn); diff --git a/src/include/distributed/remote_commands.h b/src/include/distributed/remote_commands.h index 66430ea64..e4b9ee78c 100644 --- a/src/include/distributed/remote_commands.h +++ b/src/include/distributed/remote_commands.h @@ -32,7 +32,12 @@ extern void ReportResultError(MultiConnection *connection, struct pg_result *res extern void LogRemoteCommand(MultiConnection *connection, const char *command); /* wrappers around libpq functions, with command logging support */ +extern void ExecuteCriticalRemoteCommand(MultiConnection *connection, + const char *command); extern int SendRemoteCommand(MultiConnection *connection, const char *command); +extern int SendRemoteCommandParams(MultiConnection *connection, const char *command, + int parameterCount, const Oid *parameterTypes, + const char *const *parameterValues); extern struct pg_result * GetRemoteCommandResult(MultiConnection *connection, bool raiseInterrupts); diff --git a/src/include/distributed/transaction_recovery.h b/src/include/distributed/transaction_recovery.h index 82c7d0dde..15bbecd08 100644 --- a/src/include/distributed/transaction_recovery.h +++ b/src/include/distributed/transaction_recovery.h @@ -17,6 +17,7 @@ /* Functions declarations for worker transactions */ extern void LogPreparedTransactions(List *connectionList); +extern void LogTransactionRecord(int groupId, char *transactionName); #endif /* TRANSACTION_RECOVERY_H */ diff --git a/src/test/regress/expected/multi_transaction_recovery.out b/src/test/regress/expected/multi_transaction_recovery.out index a0bc331bf..3ef82a7c0 100644 --- a/src/test/regress/expected/multi_transaction_recovery.out +++ b/src/test/regress/expected/multi_transaction_recovery.out @@ -60,3 +60,114 @@ SELECT count(*) FROM pg_tables WHERE tablename = 'should_commit'; 1 (1 row) +\c - - - :master_port +SET citus.shard_replication_factor TO 2; +SET citus.shard_count TO 2; +SET citus.multi_shard_commit_protocol TO '2pc'; +CREATE TABLE test_recovery (x text); +SELECT create_distributed_table('test_recovery', 'x'); + create_distributed_table +-------------------------- + +(1 row) + +INSERT INTO test_recovery VALUES ('hello'); +-- Committed DDL commands should write 4 transaction recovery records +BEGIN; +ALTER TABLE test_recovery ADD COLUMN y text; +ROLLBACK; +SELECT count(*) FROM pg_dist_transaction; + count +------- + 0 +(1 row) + +ALTER TABLE test_recovery ADD COLUMN y text; +SELECT count(*) FROM pg_dist_transaction; + count +------- + 4 +(1 row) + +SELECT recover_prepared_transactions(); + recover_prepared_transactions +------------------------------- + 0 +(1 row) + +SELECT count(*) FROM pg_dist_transaction; + count +------- + 0 +(1 row) + +-- Committed master_modify_multiple_shards should write 4 transaction recovery records +BEGIN; +SELECT master_modify_multiple_shards($$UPDATE test_recovery SET y = 'world'$$); + master_modify_multiple_shards +------------------------------- + 1 +(1 row) + +ROLLBACK; +SELECT count(*) FROM pg_dist_transaction; + count +------- + 0 +(1 row) + +SELECT master_modify_multiple_shards($$UPDATE test_recovery SET y = 'world'$$); + master_modify_multiple_shards +------------------------------- + 1 +(1 row) + +SELECT count(*) FROM pg_dist_transaction; + count +------- + 4 +(1 row) + +SELECT recover_prepared_transactions(); + recover_prepared_transactions +------------------------------- + 0 +(1 row) + +SELECT count(*) FROM pg_dist_transaction; + count +------- + 0 +(1 row) + +-- Committed INSERT..SELECT should write 4 transaction recovery records +BEGIN; +INSERT INTO test_recovery SELECT x, 'earth' FROM test_recovery; +ROLLBACK; +SELECT count(*) FROM pg_dist_transaction; + count +------- + 0 +(1 row) + +INSERT INTO test_recovery SELECT x, 'earth' FROM test_recovery; +SELECT count(*) FROM pg_dist_transaction; + count +------- + 4 +(1 row) + +SELECT recover_prepared_transactions(); + recover_prepared_transactions +------------------------------- + 0 +(1 row) + +SELECT count(*) FROM pg_dist_transaction; + count +------- + 0 +(1 row) + +\c - - - :master_port +DROP TABLE test_recovery; diff --git a/src/test/regress/sql/multi_transaction_recovery.sql b/src/test/regress/sql/multi_transaction_recovery.sql index b21e297f6..5f8f6bb3c 100644 --- a/src/test/regress/sql/multi_transaction_recovery.sql +++ b/src/test/regress/sql/multi_transaction_recovery.sql @@ -35,3 +35,51 @@ SELECT count(*) FROM pg_dist_transaction; \c - - - :worker_1_port SELECT count(*) FROM pg_tables WHERE tablename = 'should_abort'; SELECT count(*) FROM pg_tables WHERE tablename = 'should_commit'; + +\c - - - :master_port +SET citus.shard_replication_factor TO 2; +SET citus.shard_count TO 2; +SET citus.multi_shard_commit_protocol TO '2pc'; + +CREATE TABLE test_recovery (x text); +SELECT create_distributed_table('test_recovery', 'x'); +INSERT INTO test_recovery VALUES ('hello'); + +-- Committed DDL commands should write 4 transaction recovery records +BEGIN; +ALTER TABLE test_recovery ADD COLUMN y text; +ROLLBACK; +SELECT count(*) FROM pg_dist_transaction; + +ALTER TABLE test_recovery ADD COLUMN y text; + +SELECT count(*) FROM pg_dist_transaction; +SELECT recover_prepared_transactions(); +SELECT count(*) FROM pg_dist_transaction; + +-- Committed master_modify_multiple_shards should write 4 transaction recovery records +BEGIN; +SELECT master_modify_multiple_shards($$UPDATE test_recovery SET y = 'world'$$); +ROLLBACK; +SELECT count(*) FROM pg_dist_transaction; + +SELECT master_modify_multiple_shards($$UPDATE test_recovery SET y = 'world'$$); + +SELECT count(*) FROM pg_dist_transaction; +SELECT recover_prepared_transactions(); +SELECT count(*) FROM pg_dist_transaction; + +-- Committed INSERT..SELECT should write 4 transaction recovery records +BEGIN; +INSERT INTO test_recovery SELECT x, 'earth' FROM test_recovery; +ROLLBACK; +SELECT count(*) FROM pg_dist_transaction; + +INSERT INTO test_recovery SELECT x, 'earth' FROM test_recovery; + +SELECT count(*) FROM pg_dist_transaction; +SELECT recover_prepared_transactions(); +SELECT count(*) FROM pg_dist_transaction; + +\c - - - :master_port +DROP TABLE test_recovery;