diff --git a/src/backend/distributed/connection/connection_management.c b/src/backend/distributed/connection/connection_management.c index df2a96d9f..14e270aa6 100644 --- a/src/backend/distributed/connection/connection_management.c +++ b/src/backend/distributed/connection/connection_management.c @@ -308,6 +308,43 @@ GetConnectionFromPGconn(struct pg_conn *pqConn) } +/* + * CloseNodeConnections drops all connections whose hash key matches nodeName/nodePort. + * This is mainly used when a worker leaves the cluster. + */ +void +CloseNodeConnections(char *nodeName, int nodePort) +{ + HASH_SEQ_STATUS status; + ConnectionHashEntry *entry; + + hash_seq_init(&status, ConnectionHash); + while ((entry = (ConnectionHashEntry *) hash_seq_search(&status)) != 0) + { + dlist_head *connections = entry->connections; + + if (strcmp(entry->key.hostname, nodeName) != 0 || entry->key.port != nodePort) + { + continue; + } + + while (!dlist_is_empty(connections)) + { + dlist_node *currentNode = dlist_pop_head_node(connections); + + MultiConnection *connection = + dlist_container(MultiConnection, connectionNode, currentNode); + + /* close the remote transaction state kept for this connection */ + CloseRemoteTransaction(connection); + + /* we leave the per-host entry alive; NOTE(review): no libpq-level close is visible before this pfree, confirm the socket is shut down elsewhere */ + pfree(connection); + } + } +} + + /* * Close a previously established connection.
*/ diff --git a/src/backend/distributed/connection/remote_commands.c b/src/backend/distributed/connection/remote_commands.c index e342c4403..d17a44d5a 100644 --- a/src/backend/distributed/connection/remote_commands.c +++ b/src/backend/distributed/connection/remote_commands.c @@ -202,7 +202,7 @@ ExecuteCriticalRemoteCommand(MultiConnection *connection, const char *command) querySent = SendRemoteCommand(connection, command); if (querySent == 0) { - ReportConnectionError(connection, WARNING); + ReportConnectionError(connection, ERROR); } result = GetRemoteCommandResult(connection, raiseInterrupts); diff --git a/src/backend/distributed/executor/multi_router_executor.c b/src/backend/distributed/executor/multi_router_executor.c index 9d3f99541..39a3bdfca 100644 --- a/src/backend/distributed/executor/multi_router_executor.c +++ b/src/backend/distributed/executor/multi_router_executor.c @@ -244,7 +244,7 @@ InitTransactionStateForTask(Task *task) { ListCell *placementCell = NULL; - BeginCoordinatedTransaction(); + BeginOrContinueCoordinatedTransaction(); xactParticipantHash = CreateXactParticipantHash(); diff --git a/src/backend/distributed/transaction/worker_transaction.c b/src/backend/distributed/transaction/worker_transaction.c index 775fb4c1d..8fd0e5395 100644 --- a/src/backend/distributed/transaction/worker_transaction.c +++ b/src/backend/distributed/transaction/worker_transaction.c @@ -33,41 +33,6 @@ #include "utils/memutils.h" -/* Local functions forward declarations */ -static void EnableXactCallback(void); -static void CompleteWorkerTransactions(XactEvent event, void *arg); -static List * OpenWorkerTransactions(void); -static TransactionConnection * GetWorkerTransaction(char *nodeName, int32 nodePort); -static List * GetTargetWorkerTransactions(TargetWorkerSet targetWorkerSet); - - -/* Global worker connection list */ -static List *workerConnectionList = NIL; -static bool isXactCallbackRegistered = false; - - -/* - * GetWorkerTransactions opens connections to all
workers and starts - * a transaction block that is committed or aborted when the local - * transaction commits or aborts. Multiple invocations of - * GetWorkerTransactions within the same transaction will return - * the same list of connections. - */ -List * -GetWorkerTransactions(void) -{ - if (workerConnectionList == NIL) - { - InitializeDistributedTransaction(); - EnableXactCallback(); - - workerConnectionList = OpenWorkerTransactions(); - } - - return workerConnectionList; -} - - /* * SendCommandToWorker sends a command to a particular worker as part of the * 2PC. @@ -75,25 +40,19 @@ GetWorkerTransactions(void) void SendCommandToWorker(char *nodeName, int32 nodePort, char *command) { - TransactionConnection *transactionConnection = NULL; - PGresult *queryResult = NULL; - ExecStatusType resultStatus = PGRES_EMPTY_QUERY; + MultiConnection *transactionConnection = NULL; + char *nodeUser = CitusExtensionOwnerName(); + int connectionFlags = 0; - transactionConnection = GetWorkerTransaction(nodeName, nodePort); - if (transactionConnection == NULL) - { - ereport(ERROR, (errmsg("worker %s:%d is not part of current transaction", - nodeName, nodePort))); - } + BeginOrContinueCoordinatedTransaction(); + CoordinatedTransactionUse2PC(); - queryResult = PQexec(transactionConnection->connection, command); - resultStatus = PQresultStatus(queryResult); - if (resultStatus != PGRES_COMMAND_OK && resultStatus != PGRES_TUPLES_OK) - { - ReraiseRemoteError(transactionConnection->connection, queryResult); - } + transactionConnection = GetNodeUserDatabaseConnection(connectionFlags, nodeName, + nodePort, nodeUser, NULL); - PQclear(queryResult); + MarkRemoteTransactionCritical(transactionConnection); + RemoteTransactionBeginIfNecessary(transactionConnection); + ExecuteCriticalRemoteCommand(transactionConnection, command); } @@ -123,41 +82,74 @@ SendCommandToWorkersParams(TargetWorkerSet targetWorkerSet, char *command, int parameterCount, const Oid *parameterTypes, const char *const
*parameterValues) { + List *connectionList = NIL; ListCell *connectionCell = NULL; - List *targetConnectionList = GetTargetWorkerTransactions(targetWorkerSet); + List *workerNodeList = WorkerNodeList(); + ListCell *workerNodeCell = NULL; + char *nodeUser = CitusExtensionOwnerName(); - foreach(connectionCell, targetConnectionList) + BeginOrContinueCoordinatedTransaction(); + CoordinatedTransactionUse2PC(); + + /* open connections in parallel: only start them here, establishment is finished in the next loop; NOTE(review): the previous implementation held LockMetadataSnapshot(AccessShareLock) while reading the worker list, confirm dropping it is intended */ + foreach(workerNodeCell, workerNodeList) { - TransactionConnection *transactionConnection = - (TransactionConnection *) lfirst(connectionCell); + WorkerNode *workerNode = (WorkerNode *) lfirst(workerNodeCell); + char *nodeName = workerNode->workerName; + int nodePort = workerNode->workerPort; + MultiConnection *connection = NULL; + int connectionFlags = 0; - PGconn *connection = transactionConnection->connection; + if (targetWorkerSet == WORKERS_WITH_METADATA && !workerNode->hasMetadata) + { + continue; + } - int querySent = PQsendQueryParams(connection, command, parameterCount, - parameterTypes, parameterValues, NULL, NULL, 0); + connection = StartNodeUserDatabaseConnection(connectionFlags, nodeName, nodePort, + nodeUser, NULL); + + MarkRemoteTransactionCritical(connection); + + connectionList = lappend(connectionList, connection); + } + + /* finish establishing every connection started above */ + foreach(connectionCell, connectionList) + { + MultiConnection *connection = (MultiConnection *) lfirst(connectionCell); + + FinishConnectionEstablishment(connection); + } + + RemoteTransactionsBeginIfNecessary(connectionList); + + /* send the command on every connection before reading any result; a failed send is reported at ERROR level */ + foreach(connectionCell, connectionList) + { + MultiConnection *connection = (MultiConnection *) lfirst(connectionCell); + + int querySent = SendRemoteCommandParams(connection, command, parameterCount, + parameterTypes, parameterValues); if (querySent == 0) { - ReraiseRemoteError(connection, NULL); + ReportConnectionError(connection, ERROR); } } - foreach(connectionCell, targetConnectionList) +
/* collect the result from each connection; a non-OK response is reported at ERROR level */ + foreach(connectionCell, connectionList) { - TransactionConnection *transactionConnection = - (TransactionConnection *) lfirst(connectionCell); - - PGconn *connection = transactionConnection->connection; - PGresult *result = PQgetResult(connection); + MultiConnection *connection = (MultiConnection *) lfirst(connectionCell); + PGresult *result = GetRemoteCommandResult(connection, true); if (!IsResponseOK(result)) { - ReraiseRemoteError(connection, result); + ReportResultError(connection, result, ERROR); } PQclear(result); - /* clear NULL result */ - PQgetResult(connection); + ForgetResults(connection); } } @@ -200,299 +192,3 @@ SendCommandListToWorkerInSingleTransaction(char *nodeName, int32 nodePort, char RemoteTransactionCommit(workerConnection); CloseConnection(workerConnection); } - - -/* - * IsWorkerTransactionActive returns true if there exists any on going - * worker transactions. - */ -bool -IsWorkerTransactionActive(void) -{ - bool isWorkerTransactionActive = false; - - if (workerConnectionList != NIL) - { - isWorkerTransactionActive = true; - } - - return isWorkerTransactionActive; -} - - -/* - * RemoveWorkerTransaction removes the transaction connection to the specified node from - * the transaction connection list. - */ -void -RemoveWorkerTransaction(char *nodeName, int32 nodePort) -{ - TransactionConnection *transactionConnection = - GetWorkerTransaction(nodeName, nodePort); - - /* transactionConnection = NULL if the worker transactions have not opened before */ - if (transactionConnection != NULL) - { - PGconn *connection = transactionConnection->connection; - - /* closing the connection will rollback all uncommited transactions */ - CloseConnectionByPGconn(connection); - - workerConnectionList = list_delete(workerConnectionList, transactionConnection); - } -} - - -/* - * EnableXactCallback registers the CompleteWorkerTransactions function as the callback - * of the worker transactions.
- */ -static void -EnableXactCallback(void) -{ - if (!isXactCallbackRegistered) - { - RegisterXactCallback(CompleteWorkerTransactions, NULL); - isXactCallbackRegistered = true; - } -} - - -/* - * CompleteWorkerTransaction commits or aborts pending worker transactions - * when the local transaction commits or aborts. - */ -static void -CompleteWorkerTransactions(XactEvent event, void *arg) -{ - if (workerConnectionList == NIL) - { - /* nothing to do */ - return; - } - else if (event == XACT_EVENT_PRE_COMMIT) - { - if (MultiShardCommitProtocol == COMMIT_PROTOCOL_2PC) - { - /* - * Any failure here will cause local changes to be rolled back, - * and may leave a prepared transaction on the remote node. - */ - - PrepareRemoteTransactions(workerConnectionList); - - /* - * We are now ready to commit the local transaction, followed - * by the remote transaction. As a final step, write commit - * records to a table. If there is a last-minute crash - * on the local machine, then the absence of these records - * will indicate that the remote transactions should be rolled - * back. Otherwise, the presence of these records indicates - * that the remote transactions should be committed. - */ - - LogPreparedTransactions(workerConnectionList); - } - - return; - } - else if (event == XACT_EVENT_COMMIT) - { - /* - * A failure here may cause some prepared transactions to be - * left pending. However, the local change have already been - * committed and a commit record exists to indicate that the - * remote transaction should be committed as well. - */ - - CommitRemoteTransactions(workerConnectionList, false); - - /* - * At this point, it is safe to remove the transaction records - * for all commits that have succeeded. However, we are no - * longer in a transaction and therefore cannot make changes - * to the metadata. - */ - } - else if (event == XACT_EVENT_ABORT) - { - /* - * A failure here may cause some prepared transactions to be - * left pending.
The local changes have already been rolled - * back and the absence of a commit record indicates that - * the remote transaction should be rolled back as well. - */ - - AbortRemoteTransactions(workerConnectionList); - } - else if (event == XACT_EVENT_PREPARE || event == XACT_EVENT_PRE_PREPARE) - { - /* - * If we allow a prepare we might not get to the commit handler - * in this session. We could resolve that if we intercept - * COMMIT/ABORT PREPARED commands. For now, we just error out. - */ - ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("cannot prepare a transaction that modified " - "distributed tables"))); - } - else - { - return; - } - - CloseConnections(workerConnectionList); - - /* - * Memory allocated in workerConnectionList will be reclaimed when - * TopTransactionContext is released. - */ - - workerConnectionList = NIL; -} - - -/* - * OpenWorkerTransactions opens connections to all primary workers and sends - * BEGIN commands. The returned TransactionConnection's are allocated in the - * top transaction context, such that they can still be used in the commit - * handler. The connections are made as the extension owner, such that they - * have write access to the Citus metadata tables. - */ -static List * -OpenWorkerTransactions(void) -{ - ListCell *workerNodeCell = NULL; - List *connectionList = NIL; - MemoryContext oldContext = NULL; - List *workerList = NIL; - - /* - * A new node addition might be in progress which will invalidate the - * worker list. The following statement blocks until the node addition and - * metadata syncing finishes after which we reload the worker list. - * It also ensures that no new node addition and metadata synchronization - * will run until this transaction finishes.
- */ - LockMetadataSnapshot(AccessShareLock); - - workerList = WorkerNodeList(); - - oldContext = MemoryContextSwitchTo(TopTransactionContext); - - foreach(workerNodeCell, workerList) - { - WorkerNode *workerNode = (WorkerNode *) lfirst(workerNodeCell); - char *nodeUser = CitusExtensionOwnerName(); - char *nodeName = workerNode->workerName; - int nodePort = workerNode->workerPort; - PGconn *connection = NULL; - - TransactionConnection *transactionConnection = NULL; - PGresult *result = NULL; - - connection = ConnectToNode(nodeName, nodePort, nodeUser); - if (connection == NULL) - { - ereport(ERROR, (errmsg("could not open connection to %s:%d as %s", - nodeName, nodePort, nodeUser))); - } - - result = PQexec(connection, "BEGIN"); - if (PQresultStatus(result) != PGRES_COMMAND_OK) - { - ReraiseRemoteError(connection, result); - } - - PQclear(result); - - transactionConnection = palloc0(sizeof(TransactionConnection)); - - transactionConnection->groupId = workerNode->groupId; - transactionConnection->connectionId = 0; - transactionConnection->transactionState = TRANSACTION_STATE_OPEN; - transactionConnection->connection = connection; - transactionConnection->nodeName = pstrdup(nodeName); - transactionConnection->nodePort = nodePort; - - connectionList = lappend(connectionList, transactionConnection); - } - - MemoryContextSwitchTo(oldContext); - - return connectionList; -} - - -/* - * GetNodeTransactionConnection finds the opened connection for the specified - * node. Note that it opens transaction connections to all workers, by - * calling GetWorkerTransactions therefore, it is suggested to use this - * function in operations that sends commands to all workers inside a - * distributed transaction. - * - * GetNodeTransactionConnection returns NULL, if the node with the specified - * nodeName and nodePort is not found. Note that this worker may open - * connections to all workers if there were not open already.
- */ -static TransactionConnection * -GetWorkerTransaction(char *nodeName, int32 nodePort) -{ - List *connectionList = NIL; - ListCell *connectionCell = NULL; - TransactionConnection *workerTransaction = NULL; - - connectionList = GetWorkerTransactions(); - - foreach(connectionCell, connectionList) - { - TransactionConnection *transactionConnection = - (TransactionConnection *) lfirst(connectionCell); - - if (strcmp(transactionConnection->nodeName, nodeName) == 0 && - transactionConnection->nodePort == nodePort) - { - workerTransaction = transactionConnection; - break; - } - } - - return workerTransaction; -} - - -/* - * GetTargetWorkerTransactions returns a subset of all worker transactions - * matching the given target worker set. - */ -static List * -GetTargetWorkerTransactions(TargetWorkerSet targetWorkerSet) -{ - List *allWorkerConnectionsList = GetWorkerTransactions(); - List *targetConnectionList = NIL; - ListCell *connectionCell = NULL; - - if (targetWorkerSet == WORKERS_WITH_METADATA) - { - foreach(connectionCell, allWorkerConnectionsList) - { - TransactionConnection *transactionConnection = - (TransactionConnection *) lfirst(connectionCell); - char *nodeName = pstrdup(transactionConnection->nodeName); - int nodePort = transactionConnection->nodePort; - WorkerNode *workerNode = FindWorkerNode(nodeName, nodePort); - - if (workerNode->hasMetadata) - { - targetConnectionList = lappend(targetConnectionList, - transactionConnection); - } - } - } - else - { - targetConnectionList = allWorkerConnectionsList; - } - - return targetConnectionList; -} diff --git a/src/backend/distributed/utils/node_metadata.c b/src/backend/distributed/utils/node_metadata.c index c8c5eaf07..0471503a8 100644 --- a/src/backend/distributed/utils/node_metadata.c +++ b/src/backend/distributed/utils/node_metadata.c @@ -23,6 +23,7 @@ #include "access/xact.h" #include "catalog/indexing.h" #include "commands/sequence.h" +#include "distributed/connection_management.h" #include
"distributed/master_protocol.h" #include "distributed/master_metadata_utility.h" #include "distributed/metadata_cache.h" @@ -116,7 +117,8 @@ master_remove_node(PG_FUNCTION_ARGS) nodeDeleteCommand = NodeDeleteCommand(workerNode->nodeId); - RemoveWorkerTransaction(nodeNameString, nodePort); + /* make sure we don't have any open connections to the node being removed */ + CloseNodeConnections(nodeNameString, nodePort); SendCommandToWorkers(WORKERS_WITH_METADATA, nodeDeleteCommand); diff --git a/src/include/distributed/connection_management.h b/src/include/distributed/connection_management.h index 969b652c9..aa103be41 100644 --- a/src/include/distributed/connection_management.h +++ b/src/include/distributed/connection_management.h @@ -124,6 +124,7 @@ extern MultiConnection * StartNodeUserDatabaseConnection(uint32 flags, const char *user, const char *database); extern MultiConnection * GetConnectionFromPGconn(struct pg_conn *pqConn); +extern void CloseNodeConnections(char *nodeName, int nodePort); extern void CloseConnection(MultiConnection *connection); extern void CloseConnectionByPGconn(struct pg_conn *pqConn);