diff --git a/src/backend/distributed/transaction/transaction_recovery.c b/src/backend/distributed/transaction/transaction_recovery.c index 4fc90f705..a3f13388a 100644 --- a/src/backend/distributed/transaction/transaction_recovery.c +++ b/src/backend/distributed/transaction/transaction_recovery.c @@ -36,6 +36,7 @@ #include "distributed/metadata_cache.h" #include "distributed/pg_dist_transaction.h" #include "distributed/remote_commands.h" +#include "distributed/resource_lock.h" #include "distributed/transaction_recovery.h" #include "distributed/worker_manager.h" #include "distributed/version_compat.h" @@ -119,6 +120,9 @@ RecoverTwoPhaseCommits(void) { int recoveredTransactionCount = 0; + /* take advisory lock first to avoid running concurrently */ + LockTransactionRecovery(ShareUpdateExclusiveLock); + List *workerList = ActivePrimaryNodeList(NoLock); WorkerNode *workerNode = NULL; foreach_ptr(workerNode, workerList) @@ -171,9 +175,8 @@ RecoverWorkerTransactions(WorkerNode *workerNode) MemoryContext oldContext = MemoryContextSwitchTo(localContext); - /* take table lock first to avoid running concurrently */ Relation pgDistTransaction = table_open(DistTransactionRelationId(), - ShareUpdateExclusiveLock); + RowExclusiveLock); TupleDesc tupleDescriptor = RelationGetDescr(pgDistTransaction); /* diff --git a/src/backend/distributed/utils/resource_lock.c b/src/backend/distributed/utils/resource_lock.c index 74f0db25b..231252e40 100644 --- a/src/backend/distributed/utils/resource_lock.c +++ b/src/backend/distributed/utils/resource_lock.c @@ -543,6 +543,20 @@ UnlockShardResource(uint64 shardId, LOCKMODE lockmode) } +/* LockTransactionRecovery acquires a lock for transaction recovery */ +void +LockTransactionRecovery(LOCKMODE lockmode) +{ + LOCKTAG tag; + const bool sessionLock = false; + const bool dontWait = false; + + SET_LOCKTAG_CITUS_OPERATION(tag, CITUS_TRANSACTION_RECOVERY); + + (void) LockAcquire(&tag, lockmode, sessionLock, dontWait); +} + + /* * LockJobResource acquires a lock for creating resources associated with the * given jobId. This resource is typically a job schema (namespace), and less diff --git a/src/include/distributed/resource_lock.h b/src/include/distributed/resource_lock.h index bb1e6ccdd..010e318ae 100644 --- a/src/include/distributed/resource_lock.h +++ b/src/include/distributed/resource_lock.h @@ -38,8 +38,14 @@ typedef enum AdvisoryLocktagClass ADV_LOCKTAG_CLASS_CITUS_JOB = 6, ADV_LOCKTAG_CLASS_CITUS_REBALANCE_COLOCATION = 7, ADV_LOCKTAG_CLASS_CITUS_COLOCATED_SHARDS_METADATA = 8, + ADV_LOCKTAG_CLASS_CITUS_OPERATIONS = 9 } AdvisoryLocktagClass; +/* CitusOperations has constants for citus operations */ +typedef enum CitusOperations +{ + CITUS_TRANSACTION_RECOVERY = 0 +} CitusOperations; /* reuse advisory lock, but with different, unused field 4 (4)*/ #define SET_LOCKTAG_SHARD_METADATA_RESOURCE(tag, db, shardid) \ @@ -83,6 +89,14 @@ typedef enum AdvisoryLocktagClass (uint32) (colocationOrTableId), \ ADV_LOCKTAG_CLASS_CITUS_REBALANCE_COLOCATION) +/* advisory lock for citus operations, also it has the database hardcoded to MyDatabaseId, + * to ensure the locks are local to each database */ +#define SET_LOCKTAG_CITUS_OPERATION(tag, operationId) \ + SET_LOCKTAG_ADVISORY(tag, \ + MyDatabaseId, \ + (uint32) 0, \ + (uint32) operationId, \ + ADV_LOCKTAG_CLASS_CITUS_OPERATIONS) /* Lock shard/relation metadata for safe modifications */ extern void LockShardDistributionMetadata(int64 shardId, LOCKMODE lockMode); @@ -110,6 +124,9 @@ extern void UnlockColocationId(int colocationId, LOCKMODE lockMode); extern void LockShardListMetadata(List *shardIntervalList, LOCKMODE lockMode); extern void LockShardsInPlacementListMetadata(List *shardPlacementList, LOCKMODE lockMode); + +extern void LockTransactionRecovery(LOCKMODE lockMode); + extern void SerializeNonCommutativeWrites(List *shardIntervalList, LOCKMODE lockMode); extern void LockRelationShardResources(List *relationShardList, LOCKMODE lockMode); extern List * GetSortedReferenceShardIntervals(List *relationList);