From e7cd1ed0ee0b80d518e9e4cb2eb851e0b4458264 Mon Sep 17 00:00:00 2001 From: SaitTalhaNisanci Date: Mon, 21 Sep 2020 15:20:38 +0300 Subject: [PATCH] Not take ShareUpdateExclusiveLock on pg_dist_transaction (#4184) * Not take ShareUpdateExclusiveLock on pg_dist_transaction We were taking ShareUpdateExclusiveLock on pg_dist_transaction during recovery to prevent multiple recoveries from happening concurrently. VACUUM (not FULL) also takes ShareUpdateExclusiveLock, and the two can conflict. It seems that VACUUM will skip the table if a conflicting lock is already held, unless it is running to prevent id wraparound, in which case there can be a deadlock. I guess the deadlock happens if: - VACUUM takes a lock on pg_dist_transaction and is running to prevent the id wraparound problem - The transaction in the maintenance daemon tries to take a lock but cannot, as that conflicts with the lock acquired by VACUUM - The transaction in the maintenance daemon has a very old xid, hence VACUUM cannot proceed. If we take a row exclusive lock in transaction recovery, then it won't conflict with VACUUM, hence VACUUM can proceed and the deadlock is resolved. To prevent concurrent transaction recoveries from happening, an advisory lock is taken with ShareUpdateExclusiveLock, as before. 
* Use CITUS_OPERATIONS tag --- .../transaction/transaction_recovery.c | 7 +++++-- src/backend/distributed/utils/resource_lock.c | 14 ++++++++++++++ src/include/distributed/resource_lock.h | 17 +++++++++++++++++ 3 files changed, 36 insertions(+), 2 deletions(-) diff --git a/src/backend/distributed/transaction/transaction_recovery.c b/src/backend/distributed/transaction/transaction_recovery.c index 4fc90f705..a3f13388a 100644 --- a/src/backend/distributed/transaction/transaction_recovery.c +++ b/src/backend/distributed/transaction/transaction_recovery.c @@ -36,6 +36,7 @@ #include "distributed/metadata_cache.h" #include "distributed/pg_dist_transaction.h" #include "distributed/remote_commands.h" +#include "distributed/resource_lock.h" #include "distributed/transaction_recovery.h" #include "distributed/worker_manager.h" #include "distributed/version_compat.h" @@ -119,6 +120,9 @@ RecoverTwoPhaseCommits(void) { int recoveredTransactionCount = 0; + /* take advisory lock first to avoid running concurrently */ + LockTransactionRecovery(ShareUpdateExclusiveLock); + List *workerList = ActivePrimaryNodeList(NoLock); WorkerNode *workerNode = NULL; foreach_ptr(workerNode, workerList) @@ -171,9 +175,8 @@ RecoverWorkerTransactions(WorkerNode *workerNode) MemoryContext oldContext = MemoryContextSwitchTo(localContext); - /* take table lock first to avoid running concurrently */ Relation pgDistTransaction = table_open(DistTransactionRelationId(), - ShareUpdateExclusiveLock); + RowExclusiveLock); TupleDesc tupleDescriptor = RelationGetDescr(pgDistTransaction); /* diff --git a/src/backend/distributed/utils/resource_lock.c b/src/backend/distributed/utils/resource_lock.c index 74f0db25b..231252e40 100644 --- a/src/backend/distributed/utils/resource_lock.c +++ b/src/backend/distributed/utils/resource_lock.c @@ -543,6 +543,20 @@ UnlockShardResource(uint64 shardId, LOCKMODE lockmode) } +/* LockTransactionRecovery acquires a lock for transaction recovery */ +void 
+LockTransactionRecovery(LOCKMODE lockmode) +{ + LOCKTAG tag; + const bool sessionLock = false; + const bool dontWait = false; + + SET_LOCKTAG_CITUS_OPERATION(tag, CITUS_TRANSACTION_RECOVERY); + + (void) LockAcquire(&tag, lockmode, sessionLock, dontWait); +} + + /* * LockJobResource acquires a lock for creating resources associated with the * given jobId. This resource is typically a job schema (namespace), and less diff --git a/src/include/distributed/resource_lock.h b/src/include/distributed/resource_lock.h index bb1e6ccdd..010e318ae 100644 --- a/src/include/distributed/resource_lock.h +++ b/src/include/distributed/resource_lock.h @@ -38,8 +38,14 @@ typedef enum AdvisoryLocktagClass ADV_LOCKTAG_CLASS_CITUS_JOB = 6, ADV_LOCKTAG_CLASS_CITUS_REBALANCE_COLOCATION = 7, ADV_LOCKTAG_CLASS_CITUS_COLOCATED_SHARDS_METADATA = 8, + ADV_LOCKTAG_CLASS_CITUS_OPERATIONS = 9 } AdvisoryLocktagClass; +/* CitusOperations has constants for citus operations */ +typedef enum CitusOperations +{ + CITUS_TRANSACTION_RECOVERY = 0 +} CitusOperations; /* reuse advisory lock, but with different, unused field 4 (4)*/ #define SET_LOCKTAG_SHARD_METADATA_RESOURCE(tag, db, shardid) \ @@ -83,6 +89,14 @@ typedef enum AdvisoryLocktagClass (uint32) (colocationOrTableId), \ ADV_LOCKTAG_CLASS_CITUS_REBALANCE_COLOCATION) +/* advisory lock for citus operations, also it has the database hardcoded to MyDatabaseId, + * to ensure the locks are local to each database */ +#define SET_LOCKTAG_CITUS_OPERATION(tag, operationId) \ + SET_LOCKTAG_ADVISORY(tag, \ + MyDatabaseId, \ + (uint32) 0, \ + (uint32) operationId, \ + ADV_LOCKTAG_CLASS_CITUS_OPERATIONS) /* Lock shard/relation metadata for safe modifications */ extern void LockShardDistributionMetadata(int64 shardId, LOCKMODE lockMode); @@ -110,6 +124,9 @@ extern void UnlockColocationId(int colocationId, LOCKMODE lockMode); extern void LockShardListMetadata(List *shardIntervalList, LOCKMODE lockMode); extern void LockShardsInPlacementListMetadata(List 
*shardPlacementList, LOCKMODE lockMode); + +extern void LockTransactionRecovery(LOCKMODE lockMode); + extern void SerializeNonCommutativeWrites(List *shardIntervalList, LOCKMODE lockMode); extern void LockRelationShardResources(List *relationShardList, LOCKMODE lockMode); extern List * GetSortedReferenceShardIntervals(List *relationList);