diff --git a/src/backend/distributed/operations/citus_create_restore_point.c b/src/backend/distributed/operations/citus_create_restore_point.c
index 18081b6e4..9da5d51b4 100644
--- a/src/backend/distributed/operations/citus_create_restore_point.c
+++ b/src/backend/distributed/operations/citus_create_restore_point.c
@@ -31,11 +31,14 @@
 
 #define CREATE_RESTORE_POINT_COMMAND "SELECT pg_catalog.pg_create_restore_point($1::text)"
+#define BLOCK_TRANSACTIONS_COMMAND \
+	"LOCK TABLE pg_catalog.pg_dist_transaction IN EXCLUSIVE MODE"
 
 
 /* local functions forward declarations */
 static List * OpenConnectionsToAllWorkerNodes(LOCKMODE lockMode);
 static void BlockDistributedTransactions(void);
 static void CreateRemoteRestorePoints(char *restoreName, List *connectionList);
+static void BlockDistributedTransactionsOnAllMetadataNodes(List *connectionList);
 
 
 /* exports for SQL callable functions */
@@ -43,10 +46,28 @@ PG_FUNCTION_INFO_V1(citus_create_restore_point);
 
 
 /*
- * citus_create_restore_point blocks writes to distributed tables and then
- * runs pg_create_restore_point on all nodes. This creates a consistent
- * restore point under the assumption that there are no other writers
- * than the coordinator.
+ * citus_create_restore_point creates a cluster-consistent restore point
+ * across all nodes in the Citus cluster.
+ *
+ * In coordinator-only mode, this function blocks new distributed writes
+ * at the coordinator and creates restore points on all worker nodes.
+ *
+ * In MX mode (multi-writer), this function blocks the 2PC commit decision
+ * point on all MX-enabled nodes by acquiring ExclusiveLock on the
+ * pg_dist_transaction catalog table across the cluster. This prevents new
+ * distributed transactions from recording commit decisions, ensuring that
+ * all restore points represent the same consistent cluster state.
+ *
+ * The function returns the LSN of the restore point on the coordinator,
+ * maintaining backward compatibility with the original implementation.
+ *
+ * Key insight: we do NOT need to drain in-flight transactions. The commit
+ * decision in Citus 2PC happens when LogTransactionRecord() writes to
+ * pg_dist_transaction, which occurs BEFORE the writer's local commit.
+ * By blocking writes to pg_dist_transaction, we prevent new commit
+ * decisions from being made. Transactions that have already recorded their
+ * commit decision complete normally, while those that have not are
+ * blocked. This creates a clean cut point for consistency.
  */
 Datum
 citus_create_restore_point(PG_FUNCTION_ARGS)
@@ -88,22 +109,56 @@ citus_create_restore_point(PG_FUNCTION_ARGS)
 	 * ShareLock prevents new nodes being added, rendering connectionList incomplete
 	 */
 	List *connectionList = OpenConnectionsToAllWorkerNodes(ShareLock);
+	XLogRecPtr localRestorePoint = InvalidXLogRecPtr;
 
-	/*
-	 * Send a BEGIN to bust through pgbouncer. We won't actually commit since
-	 * that takes time. Instead we just close the connections and roll back,
-	 * which doesn't undo pg_create_restore_point.
-	 */
-	RemoteTransactionListBegin(connectionList);
+	PG_TRY();
+	{
+		/*
+		 * Send a BEGIN to bust through pgbouncer. We won't actually commit since
+		 * that takes time. Instead we just close the connections and roll back,
+		 * which doesn't undo pg_create_restore_point.
+		 */
+		RemoteTransactionListBegin(connectionList);
 
-	/* DANGER: finish as quickly as possible after this */
-	BlockDistributedTransactions();
+		/* DANGER: finish as quickly as possible after this */
+		BlockDistributedTransactions();
 
-	/* do local restore point first to bail out early if something goes wrong */
-	XLogRecPtr localRestorePoint = XLogRestorePoint(restoreNameString);
+		BlockDistributedTransactionsOnAllMetadataNodes(connectionList);
 
-	/* run pg_create_restore_point on all nodes */
-	CreateRemoteRestorePoints(restoreNameString, connectionList);
+		/* do local restore point first to bail out early if something goes wrong */
+		localRestorePoint = XLogRestorePoint(restoreNameString);
+
+		/* run pg_create_restore_point on all nodes */
+		CreateRemoteRestorePoints(restoreNameString, connectionList);
+
+		/* close connections to all nodes and
+		 * all locks get released as part of the transaction rollback
+		 */
+		MultiConnection *conn = NULL;
+		foreach_declared_ptr(conn, connectionList)
+		{
+			ForgetResults(conn);
+			CloseConnection(conn);
+		}
+		connectionList = NIL;
+	}
+	PG_CATCH();
+	{
+		/*
+		 * On error, ensure we clean up connections and release locks.
+		 * Rolling back the metadata node transactions releases the
+		 * ExclusiveLocks on pg_dist_transaction cluster-wide.
+		 */
+		MultiConnection *conn = NULL;
+		foreach_declared_ptr(conn, connectionList)
+		{
+			ForgetResults(conn);
+			CloseConnection(conn);
+		}
+		connectionList = NIL;
+		PG_RE_THROW();
+	}
+	PG_END_TRY();
 
 	PG_RETURN_LSN(localRestorePoint);
 }
@@ -152,6 +207,90 @@ BlockDistributedTransactions(void)
 }
 
 
+/*
+ * BlockDistributedTransactionsOnAllMetadataNodes blocks distributed transactions
+ * on all metadata nodes by executing LOCK TABLE remotely.
+ *
+ * This is the MX-mode equivalent of BlockDistributedTransactions(), extended
+ * to all nodes capable of initiating distributed transactions. We must hold
+ * these locks across the cluster to prevent commit decisions from being made
+ * on any node.
+ *
+ * The function expects that connections are already in a transaction block
+ * (BEGIN has been sent). The locks will be held until the transaction is
+ * rolled back or committed.
+ */
+static void
+BlockDistributedTransactionsOnAllMetadataNodes(List *connectionList)
+{
+	/*
+	 * Send LOCK TABLE commands to all metadata nodes in parallel. We use
+	 * standard SQL LOCK TABLE syntax to acquire ExclusiveLock on catalog
+	 * tables, mirroring what BlockDistributedTransactions() does on the
+	 * coordinator via LockRelationOid().
+	 *
+	 * The BLOCK_TRANSACTIONS_COMMAND acquires ExclusiveLock on
+	 * pg_dist_transaction, which blocks 2PC commit decisions.
+	 *
+	 * Note: unlike the local coordinator lock, which also locks pg_dist_node
+	 * and pg_dist_partition, we only lock pg_dist_transaction on remote nodes
+	 * because DDL and node management operations are coordinator-only even in
+	 * MX mode. This is sufficient to block distributed writes while allowing
+	 * the restore point operation to complete quickly.
+	 *
+	 * These locks naturally serialize concurrent restore point operations
+	 * cluster-wide, so no additional advisory lock is needed.
+	 */
+
+	/* Build list of remote metadata node connections */
+	List *metadataConnectionList = NIL;
+	MultiConnection *connection = NULL;
+	foreach_declared_ptr(connection, connectionList)
+	{
+		WorkerNode *workerNode = FindWorkerNode(connection->hostname, connection->port);
+		bool isRemoteMetadataNode = workerNode != NULL &&
+									NodeIsPrimaryAndRemote(workerNode);
+
+		if (isRemoteMetadataNode)
+		{
+			metadataConnectionList = lappend(metadataConnectionList, connection);
+		}
+	}
+
+	/* Send lock commands in parallel to all remote metadata nodes */
+	foreach_declared_ptr(connection, metadataConnectionList)
+	{
+		/*
+		 * We could use ExecuteCriticalRemoteCommand instead, but it would not
+		 * allow us to execute the commands in parallel. So for the sake of
+		 * performance, we use SendRemoteCommand to send the lock commands in
+		 * parallel to all metadata nodes, then wait for all lock acquisitions.
+		 */
+		int querySent = SendRemoteCommand(connection, BLOCK_TRANSACTIONS_COMMAND);
+		if (querySent == 0)
+		{
+			ReportConnectionError(connection, ERROR);
+		}
+	}
+
+	/*
+	 * Wait for all lock acquisitions to complete. If any node fails to
+	 * acquire the lock (e.g., due to a conflicting lock), this errors out.
+	 */
+	foreach_declared_ptr(connection, metadataConnectionList)
+	{
+		PGresult *result = GetRemoteCommandResult(connection, true);
+		if (!IsResponseOK(result))
+		{
+			ReportResultError(connection, result, ERROR);
+		}
+
+		PQclear(result);
+		ForgetResults(connection);
+	}
+}
+
+
 /*
  * CreateRemoteRestorePoints creates a restore point via each of the
  * connections in the list in parallel.
@@ -186,6 +325,5 @@ CreateRemoteRestorePoints(char *restoreName, List *connectionList)
 
 		PQclear(result);
 		ForgetResults(connection);
-		CloseConnection(connection);
 	}
 }
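The comment inside BlockDistributedTransactionsOnAllMetadataNodes motivates sending the lock
commands with SendRemoteCommand and collecting the results afterwards, rather than running
ExecuteCriticalRemoteCommand node by node. A minimal standalone sketch of that send-then-collect
pattern, written against plain libpq instead of Citus's connection wrappers (the worker host names
and the fixed two-node list are placeholders, not part of the patch), could look like this:

/*
 * Sketch of the parallel send-then-collect pattern: dispatch the lock
 * command to every node first, then wait for each node to confirm it.
 */
#include <stdio.h>
#include <stdlib.h>
#include <libpq-fe.h>

#define LOCK_COMMAND "LOCK TABLE pg_catalog.pg_dist_transaction IN EXCLUSIVE MODE"
#define NODE_COUNT 2

int
main(void)
{
	const char *conninfos[NODE_COUNT] = {
		"host=worker-1 dbname=postgres",	/* placeholder node names */
		"host=worker-2 dbname=postgres"
	};
	PGconn *conns[NODE_COUNT];

	/* open a connection per node and start a transaction so the lock persists */
	for (int i = 0; i < NODE_COUNT; i++)
	{
		conns[i] = PQconnectdb(conninfos[i]);
		if (PQstatus(conns[i]) != CONNECTION_OK)
		{
			fprintf(stderr, "connection failed: %s", PQerrorMessage(conns[i]));
			exit(1);
		}

		PGresult *begin = PQexec(conns[i], "BEGIN");
		if (PQresultStatus(begin) != PGRES_COMMAND_OK)
		{
			fprintf(stderr, "BEGIN failed: %s", PQerrorMessage(conns[i]));
			exit(1);
		}
		PQclear(begin);
	}

	/* phase 1: dispatch the LOCK TABLE command to every node without waiting */
	for (int i = 0; i < NODE_COUNT; i++)
	{
		if (PQsendQuery(conns[i], LOCK_COMMAND) == 0)
		{
			fprintf(stderr, "send failed: %s", PQerrorMessage(conns[i]));
			exit(1);
		}
	}

	/* phase 2: wait until every node reports that the lock was acquired */
	for (int i = 0; i < NODE_COUNT; i++)
	{
		PGresult *result;
		while ((result = PQgetResult(conns[i])) != NULL)
		{
			if (PQresultStatus(result) != PGRES_COMMAND_OK)
			{
				fprintf(stderr, "lock failed: %s", PQerrorMessage(conns[i]));
				exit(1);
			}
			PQclear(result);
		}
	}

	/* the cluster-wide cut point holds here; roll back to release the locks */
	for (int i = 0; i < NODE_COUNT; i++)
	{
		PQclear(PQexec(conns[i], "ROLLBACK"));
		PQfinish(conns[i]);
	}

	return 0;
}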
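For reference, a small client-side sketch that exercises the UDF end to end; the coordinator
connection string and the restore point name are placeholders, and only the existing
citus_create_restore_point(text) signature returning the coordinator's pg_lsn is assumed:

/*
 * Sketch: call citus_create_restore_point from a libpq client and print
 * the LSN of the coordinator's restore point.
 */
#include <stdio.h>
#include <libpq-fe.h>

int
main(void)
{
	PGconn *conn = PQconnectdb("host=coordinator dbname=postgres");	/* placeholder */
	if (PQstatus(conn) != CONNECTION_OK)
	{
		fprintf(stderr, "connection failed: %s", PQerrorMessage(conn));
		return 1;
	}

	const char *paramValues[1] = { "before_upgrade" };	/* placeholder name */
	PGresult *result = PQexecParams(conn,
									"SELECT citus_create_restore_point($1)",
									1, NULL, paramValues, NULL, NULL, 0);

	if (PQresultStatus(result) == PGRES_TUPLES_OK)
	{
		printf("restore point created at LSN %s\n", PQgetvalue(result, 0, 0));
	}
	else
	{
		fprintf(stderr, "citus_create_restore_point failed: %s",
				PQerrorMessage(conn));
	}

	PQclear(result);
	PQfinish(conn);
	return 0;
}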