diff --git a/src/backend/distributed/transaction/transaction_recovery.c b/src/backend/distributed/transaction/transaction_recovery.c
index 8d28dd0b1..41f478a12 100644
--- a/src/backend/distributed/transaction/transaction_recovery.c
+++ b/src/backend/distributed/transaction/transaction_recovery.c
@@ -30,6 +30,7 @@
 #include "distributed/metadata_cache.h"
 #include "distributed/pg_dist_transaction.h"
 #include "distributed/remote_commands.h"
+#include "distributed/resource_lock.h"
 #include "distributed/transaction_recovery.h"
 #include "distributed/worker_manager.h"
 #include "distributed/version_compat.h"
@@ -119,6 +120,9 @@ RecoverTwoPhaseCommits(void)
 	ListCell *workerNodeCell = NULL;
 	int recoveredTransactionCount = 0;
 
+	/* take advisory lock first to avoid running concurrently */
+	LockTransactionRecovery(ShareUpdateExclusiveLock);
+
 	workerList = ActivePrimaryNodeList();
 
 	foreach(workerNodeCell, workerList)
@@ -185,8 +189,7 @@ RecoverWorkerTransactions(WorkerNode *workerNode)
 
 	oldContext = MemoryContextSwitchTo(localContext);
 
-	/* take table lock first to avoid running concurrently */
-	pgDistTransaction = heap_open(DistTransactionRelationId(), ShareUpdateExclusiveLock);
+	pgDistTransaction = heap_open(DistTransactionRelationId(), RowExclusiveLock);
 	tupleDescriptor = RelationGetDescr(pgDistTransaction);
 
 	/*
diff --git a/src/backend/distributed/utils/resource_lock.c b/src/backend/distributed/utils/resource_lock.c
index b04ecc66b..2ba7d3da8 100644
--- a/src/backend/distributed/utils/resource_lock.c
+++ b/src/backend/distributed/utils/resource_lock.c
@@ -435,6 +435,24 @@ UnlockShardResource(uint64 shardId, LOCKMODE lockmode)
 }
 
 
+/*
+ * LockTransactionRecovery acquires the advisory lock used for transaction
+ * recovery in the given lock mode. The lock is not taken as a session lock,
+ * and the call waits until the lock can be granted.
+ */
+void
+LockTransactionRecovery(LOCKMODE lockmode)
+{
+	LOCKTAG tag;
+	const bool sessionLock = false;
+	const bool dontWait = false;
+
+	SET_LOCKTAG_CITUS_OPERATION(tag, CITUS_TRANSACTION_RECOVERY);
+
+	(void) LockAcquire(&tag, lockmode, sessionLock, dontWait);
+}
+
+
 /*
  * LockJobResource acquires a lock for creating resources associated with the
  * given jobId. This resource is typically a job schema (namespace), and less
diff --git a/src/include/distributed/resource_lock.h b/src/include/distributed/resource_lock.h
index b38c3ec67..98773e936 100644
--- a/src/include/distributed/resource_lock.h
+++ b/src/include/distributed/resource_lock.h
@@ -35,9 +35,17 @@ typedef enum AdvisoryLocktagClass
 	/* Citus lock types */
 	ADV_LOCKTAG_CLASS_CITUS_SHARD_METADATA = 4,
 	ADV_LOCKTAG_CLASS_CITUS_SHARD = 5,
-	ADV_LOCKTAG_CLASS_CITUS_JOB = 6
+	ADV_LOCKTAG_CLASS_CITUS_JOB = 6,
+	ADV_LOCKTAG_CLASS_CITUS_REBALANCE_COLOCATION = 7,
+	ADV_LOCKTAG_CLASS_CITUS_COLOCATED_SHARDS_METADATA = 8,
+	ADV_LOCKTAG_CLASS_CITUS_OPERATIONS = 9
 } AdvisoryLocktagClass;
 
+/* CitusOperations has constants for citus operations */
+typedef enum CitusOperations
+{
+	CITUS_TRANSACTION_RECOVERY = 0
+} CitusOperations;
 
 /* reuse advisory lock, but with different, unused field 4 (4)*/
 #define SET_LOCKTAG_SHARD_METADATA_RESOURCE(tag, db, shardid) \
@@ -63,6 +71,14 @@ typedef enum AdvisoryLocktagClass
 						 (uint32) (jobid), \
 						 ADV_LOCKTAG_CLASS_CITUS_JOB)
 
+/* advisory lock for citus operations; the database is hardcoded to MyDatabaseId
+ * to ensure the locks are local to each database */
+#define SET_LOCKTAG_CITUS_OPERATION(tag, operationId) \
+	SET_LOCKTAG_ADVISORY(tag, \
+						 MyDatabaseId, \
+						 (uint32) 0, \
+						 (uint32) (operationId), \
+						 ADV_LOCKTAG_CLASS_CITUS_OPERATIONS)
 
 /* Lock shard/relation metadata for safe modifications */
 extern void LockShardDistributionMetadata(int64 shardId, LOCKMODE lockMode);
@@ -86,6 +102,9 @@ extern void UnlockJobResource(uint64 jobId, LOCKMODE lockmode);
 extern void LockShardListMetadata(List *shardIntervalList, LOCKMODE lockMode);
 extern void LockShardsInPlacementListMetadata(List *shardPlacementList,
 											  LOCKMODE lockMode);
+
+extern void LockTransactionRecovery(LOCKMODE lockMode);
+
 extern void SerializeNonCommutativeWrites(List *shardIntervalList, LOCKMODE lockMode);
 extern void LockRelationShardResources(List *relationShardList, LOCKMODE lockMode);
 extern List * GetSortedReferenceShardIntervals(List *relationList);