Make citus_create_restore_point MX-safe by blocking 2PC commit decisions

Problem:
--------
In coordinator-only mode, citus_create_restore_point() creates consistent
restore points by blocking distributed writes at the coordinator level,
which is safe because all distributed transactions are coordinated through
the coordinator.

However, in MX mode (multi-writer), any worker with metadata can initiate
distributed transactions. The existing implementation only blocks writes
at the coordinator, allowing metadata workers to continue making 2PC commit
decisions. This can result in an inconsistent cluster state where restore
points on different nodes capture different sets of committed distributed
transactions.

Solution:
---------
Block distributed transaction commit decisions cluster-wide by acquiring
ExclusiveLock on pg_dist_transaction on all metadata nodes (coordinator
and MX workers). Additionally, on the coordinator only, lock pg_dist_node
and pg_dist_partition to prevent topology and schema changes.

This selective locking strategy is based on the MX mode architecture:
- DDL operations (topology changes, table creation) can ONLY be executed
  through the coordinator node, even in MX mode
- MX workers can only initiate distributed DML transactions (INSERT/UPDATE/
  DELETE) that use 2PC
- Therefore, locking pg_dist_transaction on remote metadata nodes is
  sufficient to block all distributed writes they can perform, while the
  coordinator's additional locks on pg_dist_node and pg_dist_partition
  provide cluster-wide protection against DDL changes (see the SQL sketch
  after this list)
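
In SQL terms, the locks amount to roughly the following (a sketch only: on
the coordinator the implementation takes these locks through
LockRelationOid() rather than by issuing LOCK TABLE):

    -- on the coordinator, within the local transaction
    LOCK TABLE pg_catalog.pg_dist_node IN EXCLUSIVE MODE;
    LOCK TABLE pg_catalog.pg_dist_partition IN EXCLUSIVE MODE;
    LOCK TABLE pg_catalog.pg_dist_transaction IN EXCLUSIVE MODE;

    -- on every remote metadata node, within an open remote transaction
    LOCK TABLE pg_catalog.pg_dist_transaction IN EXCLUSIVE MODE;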

The implementation:
-------------------
1. Opens connections to all nodes (metadata and non-metadata workers)
2. Begins coordinated transactions on all remote connections
3. Acquires ExclusiveLock on pg_dist_node, pg_dist_partition, and
   pg_dist_transaction locally on the coordinator via LockRelationOid()
4. Acquires ExclusiveLock on pg_dist_transaction on all remote metadata
   nodes via SQL LOCK TABLE command (executed in parallel)
5. Creates restore points on all nodes in parallel (both metadata and
   non-metadata nodes need WAL restore points)
6. Closes the remote connections, releasing all locks via the implicit
   ROLLBACK (the per-node command sequence is sketched after this list)
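
Put together, the conversation with a single remote metadata node looks
roughly like the sketch below ('before_upgrade' is just a placeholder name):

    BEGIN;
    -- metadata nodes only: block new 2PC commit decisions
    LOCK TABLE pg_catalog.pg_dist_transaction IN EXCLUSIVE MODE;
    -- all nodes: write the named restore point into the WAL
    SELECT pg_catalog.pg_create_restore_point('before_upgrade');
    -- closing the connection rolls the transaction back, releasing the lock;
    -- the restore point itself is not transactional and is not undone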

Key Insight - Why No Transaction Drainage Is Needed:
-----------------------------------------------------
The commit decision in Citus 2PC occurs when LogTransactionRecord() writes
to pg_dist_transaction (using RowExclusiveLock for the insert), which
happens BEFORE the writer's local commit (in the PRE_COMMIT callback).

By holding ExclusiveLock on pg_dist_transaction:
- Transactions that have already recorded their commit decision (already
  inserted their row) will complete normally
- Transactions that haven't recorded their commit decision yet will block
  on the ExclusiveLock (which conflicts with the RowExclusiveLock needed
  for inserts), preventing them from proceeding

This creates a clean cut point for consistency without requiring us to
drain in-flight transactions. The restore point captures the exact state
of committed transactions across the cluster.
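
For example, with the cluster-wide lock held, a writer that has not yet
logged its commit record blocks at that insert (the VALUES shown are purely
illustrative):

    -- session 1: citus_create_restore_point(), held for the whole operation
    LOCK TABLE pg_catalog.pg_dist_transaction IN EXCLUSIVE MODE;

    -- session 2: a distributed writer reaching its PRE_COMMIT callback;
    -- the insert needs RowExclusiveLock, which conflicts with ExclusiveLock,
    -- so the commit decision waits until session 1 is done
    INSERT INTO pg_catalog.pg_dist_transaction (groupid, gid)
        VALUES (2, 'citus_0_1234_5_0');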

Recovery Correctness:
---------------------
The maintenance daemon's recovery logic relies on the presence of
pg_dist_transaction records to determine whether to COMMIT PREPARED
or ROLLBACK PREPARED. Our blocking ensures that:
- Prepared transactions WITH commit records will be committed on recovery
- Prepared transactions WITHOUT commit records will be rolled back on recovery

Since we create restore points while holding these locks, all nodes capture
the same set of commit decisions, ensuring cluster-wide consistency.
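
Conceptually, recovery per prepared transaction boils down to the check
sketched below (a simplification: the daemon's real logic matches commit
records on the originating node against prepared transactions on the target
node, and the gid pattern shown is only indicative):

    SELECT p.gid,
           CASE WHEN t.gid IS NOT NULL
                THEN 'COMMIT PREPARED'   -- commit record was logged in time
                ELSE 'ROLLBACK PREPARED' -- no commit record, so roll back
           END AS recovery_action
    FROM pg_prepared_xacts p
    LEFT JOIN pg_dist_transaction t ON t.gid = p.gid
    WHERE p.gid LIKE 'citus_%';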

Backward Compatibility:
-----------------------
- Return type unchanged: still returns coordinator LSN (pg_lsn)
- Coordinator-only mode: unchanged behavior
- MX mode: automatic detection and enhanced safety (transparent)
- No SQL function signature changes required (a usage sketch follows)
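
For example, the call site is unchanged; an operator still runs, on the
coordinator:

    -- returns the coordinator's restore point LSN (pg_lsn), as before
    SELECT citus_create_restore_point('before_upgrade');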

Branch: muusama/consistent_rp
Author: Muhammad Usama, 2025-11-14 19:40:53 +03:00
Parent: 662b7248db
Commit: 0d32050145
1 changed file with 155 additions and 17 deletions

@@ -31,11 +31,14 @@
 
 
 #define CREATE_RESTORE_POINT_COMMAND "SELECT pg_catalog.pg_create_restore_point($1::text)"
+#define BLOCK_TRANSACTIONS_COMMAND \
+	"LOCK TABLE pg_catalog.pg_dist_transaction IN EXCLUSIVE MODE"
 
 
 /* local functions forward declarations */
 static List * OpenConnectionsToAllWorkerNodes(LOCKMODE lockMode);
 static void BlockDistributedTransactions(void);
 static void CreateRemoteRestorePoints(char *restoreName, List *connectionList);
+static void BlockDistributedTransactionsOnAllMetadataNodes(List *connectionList);
 
 /* exports for SQL callable functions */
@@ -43,10 +46,28 @@ PG_FUNCTION_INFO_V1(citus_create_restore_point);
 
 
 /*
- * citus_create_restore_point blocks writes to distributed tables and then
- * runs pg_create_restore_point on all nodes. This creates a consistent
- * restore point under the assumption that there are no other writers
- * than the coordinator.
+ * citus_create_restore_point creates a cluster-consistent restore point
+ * across all nodes in the Citus cluster.
+ *
+ * In coordinator-only mode, this function blocks new distributed writes
+ * at the coordinator and creates restore points on all worker nodes.
+ *
+ * In MX mode (multi-writer), this function blocks the 2PC commit decision
+ * point on all MX-enabled nodes by acquiring ExclusiveLock on the
+ * pg_dist_transaction catalog table across the cluster. This prevents new
+ * distributed transactions from recording commit decisions, ensuring that
+ * all restore points represent the same consistent cluster state.
+ *
+ * The function returns the LSN of the restore point on the coordinator,
+ * maintaining backward compatibility with the original implementation.
+ *
+ * Key insight: We do NOT need to drain in-flight transactions. The commit
+ * decision in Citus 2PC happens when LogTransactionRecord() writes to
+ * pg_dist_transaction, which occurs BEFORE the writer's local commit.
+ * By blocking writes to pg_dist_transaction, we prevent commit decisions
+ * from being made. Transactions that have already recorded their commit
+ * decision will complete normally, while those that haven't will
+ * be blocked. This creates a clean cut point for consistency.
  */
 Datum
 citus_create_restore_point(PG_FUNCTION_ARGS)
@@ -88,22 +109,56 @@ citus_create_restore_point(PG_FUNCTION_ARGS)
 	 * ShareLock prevents new nodes being added, rendering connectionList incomplete
 	 */
 	List *connectionList = OpenConnectionsToAllWorkerNodes(ShareLock);
+	XLogRecPtr localRestorePoint = InvalidXLogRecPtr;
 
-	/*
-	 * Send a BEGIN to bust through pgbouncer. We won't actually commit since
-	 * that takes time. Instead we just close the connections and roll back,
-	 * which doesn't undo pg_create_restore_point.
-	 */
-	RemoteTransactionListBegin(connectionList);
+	PG_TRY();
+	{
+		/*
+		 * Send a BEGIN to bust through pgbouncer. We won't actually commit since
+		 * that takes time. Instead we just close the connections and roll back,
+		 * which doesn't undo pg_create_restore_point.
+		 */
+		RemoteTransactionListBegin(connectionList);
 
-	/* DANGER: finish as quickly as possible after this */
-	BlockDistributedTransactions();
+		/* DANGER: finish as quickly as possible after this */
+		BlockDistributedTransactions();
 
-	/* do local restore point first to bail out early if something goes wrong */
-	XLogRecPtr localRestorePoint = XLogRestorePoint(restoreNameString);
+		BlockDistributedTransactionsOnAllMetadataNodes(connectionList);
 
-	/* run pg_create_restore_point on all nodes */
-	CreateRemoteRestorePoints(restoreNameString, connectionList);
+		/* do local restore point first to bail out early if something goes wrong */
+		localRestorePoint = XLogRestorePoint(restoreNameString);
+
+		/* run pg_create_restore_point on all nodes */
+		CreateRemoteRestorePoints(restoreNameString, connectionList);
+
+		/* close connections to all nodes;
+		 * all locks get released as part of the transaction rollback
+		 */
+		MultiConnection *conn = NULL;
+		foreach_declared_ptr(conn, connectionList)
+		{
+			ForgetResults(conn);
+			CloseConnection(conn);
+		}
+		connectionList = NIL;
+	}
+	PG_CATCH();
+	{
+		/*
+		 * On error, ensure we clean up connections and release locks.
+		 * Rolling back the metadata node transactions releases the
+		 * ExclusiveLocks on pg_dist_transaction cluster-wide.
+		 */
+		MultiConnection *conn = NULL;
+		foreach_declared_ptr(conn, connectionList)
+		{
+			ForgetResults(conn);
+			CloseConnection(conn);
+		}
+		connectionList = NIL;
+		PG_RE_THROW();
+	}
+	PG_END_TRY();
 
 	PG_RETURN_LSN(localRestorePoint);
 }
@@ -152,6 +207,90 @@ BlockDistributedTransactions(void)
 }
 
 
+/*
+ * BlockDistributedTransactionsOnAllMetadataNodes blocks distributed transactions
+ * on all metadata nodes by executing LOCK TABLE remotely.
+ *
+ * This is the MX-mode equivalent of BlockDistributedTransactions(), extended
+ * to all nodes capable of initiating distributed transactions. We must hold
+ * these locks across the cluster to prevent commit decisions from being made
+ * on any node.
+ *
+ * The function expects that connections are already in a transaction block
+ * (BEGIN has been sent). The locks will be held until the transaction is
+ * rolled back or committed.
+ */
+static void
+BlockDistributedTransactionsOnAllMetadataNodes(List *connectionList)
+{
+	/*
+	 * Send LOCK TABLE commands to all metadata nodes in parallel. We use
+	 * standard SQL LOCK TABLE syntax to acquire ExclusiveLock on catalog
+	 * tables, mirroring what BlockDistributedTransactions() does on the
+	 * coordinator via LockRelationOid().
+	 *
+	 * The BLOCK_TRANSACTIONS_COMMAND acquires:
+	 * 1. ExclusiveLock on pg_dist_transaction (blocks 2PC commit decisions)
+	 *
+	 * Note: Unlike the local coordinator lock which also locks pg_dist_node
+	 * and pg_dist_partition, we only lock pg_dist_transaction on remote nodes
+	 * because DDL and node management operations are coordinator-only even in
+	 * MX mode. This is sufficient to block distributed writes while allowing
+	 * the restore point operation to complete quickly.
+	 *
+	 * These locks naturally serialize concurrent restore point operations
+	 * cluster-wide, so no additional advisory lock is needed.
+	 */
+
+	/* Build list of remote metadata node connections */
+	List *metadataConnectionList = NIL;
+
+	MultiConnection *connection = NULL;
+	foreach_declared_ptr(connection, connectionList)
+	{
+		WorkerNode *workerNode = FindWorkerNode(connection->hostname, connection->port);
+		bool isRemoteMetadataNode = workerNode != NULL &&
+			NodeIsPrimaryAndRemote(workerNode);
+		if (isRemoteMetadataNode)
+		{
+			metadataConnectionList = lappend(metadataConnectionList, connection);
+		}
+	}
+
+	/* Send lock commands in parallel to all remote metadata nodes */
+	foreach_declared_ptr(connection, metadataConnectionList)
+	{
+		/*
+		 * We could use ExecuteCriticalRemoteCommand instead, but it would
+		 * not allow us to execute the commands in parallel. So for the sake of
+		 * performance, we use SendRemoteCommand and send lock commands in parallel
+		 * to all metadata nodes, and later wait for all lock acquisitions to complete.
+		 */
+		int querySent = SendRemoteCommand(connection, BLOCK_TRANSACTIONS_COMMAND);
+		if (querySent == 0)
+		{
+			ReportConnectionError(connection, ERROR);
+		}
+	}
+
+	/*
+	 * Wait for all lock acquisitions to complete. If any node fails to
+	 * acquire locks (e.g., due to a conflicting lock), this will error out.
+	 */
+	foreach_declared_ptr(connection, metadataConnectionList)
+	{
+		PGresult *result = GetRemoteCommandResult(connection, true);
+		if (!IsResponseOK(result))
+		{
+			ReportResultError(connection, result, ERROR);
+		}
+
+		PQclear(result);
+		ForgetResults(connection);
+	}
+}
+
+
 /*
  * CreateRemoteRestorePoints creates a restore point via each of the
  * connections in the list in parallel.
@@ -186,6 +325,5 @@ CreateRemoteRestorePoints(char *restoreName, List *connectionList)
 
 		PQclear(result);
 		ForgetResults(connection);
-		CloseConnection(connection);
 	}
 }