mirror of https://github.com/citusdata/citus.git
Not take ShareUpdateExlusiveLock on pg_dist_transaction (#4184)
* Not take ShareUpdateExlusiveLock on pg_dist_transaction We were taking ShareUpdateExlusiveLock on pg_dist_transaction during recovery to prevent multiple recoveries happening concurrenly. VACUUM( not FULL) also takes ShareUpdateExclusiveLock, and they can conflict. It seems that VACUUM will skip the table if there is a conflicting lock already taken unless it is doing the vacuum to prevent id wraparound, in which case there can be a deadlock. I guess the deadlock happens if: - VACUUM takes a lock on pg_dist_transaction and is done for id wraparound problem - The transaction in the maintenance tries to take a lock but cannot as that conflicts with the lock acquired by VACUUM - The transaction in the maintenance daemon has a very old xid hence VACUUM cannot proceed. If we take a row exclusive lock in transaction recovery then it wouldn't conflict with VACUUM hence it could proceed so the deadlock would be resolved. To prevent concurrent transaction recoveries happening, an advisory lock is taken with ShareUpdateExlusiveLock as before. * Use CITUS_OPERATIONS tagpull/4167/head
parent
e69ee407e1
commit
e7cd1ed0ee
|
@ -36,6 +36,7 @@
|
|||
#include "distributed/metadata_cache.h"
|
||||
#include "distributed/pg_dist_transaction.h"
|
||||
#include "distributed/remote_commands.h"
|
||||
#include "distributed/resource_lock.h"
|
||||
#include "distributed/transaction_recovery.h"
|
||||
#include "distributed/worker_manager.h"
|
||||
#include "distributed/version_compat.h"
|
||||
|
@ -119,6 +120,9 @@ RecoverTwoPhaseCommits(void)
|
|||
{
|
||||
int recoveredTransactionCount = 0;
|
||||
|
||||
/* take advisory lock first to avoid running concurrently */
|
||||
LockTransactionRecovery(ShareUpdateExclusiveLock);
|
||||
|
||||
List *workerList = ActivePrimaryNodeList(NoLock);
|
||||
WorkerNode *workerNode = NULL;
|
||||
foreach_ptr(workerNode, workerList)
|
||||
|
@ -171,9 +175,8 @@ RecoverWorkerTransactions(WorkerNode *workerNode)
|
|||
|
||||
MemoryContext oldContext = MemoryContextSwitchTo(localContext);
|
||||
|
||||
/* take table lock first to avoid running concurrently */
|
||||
Relation pgDistTransaction = table_open(DistTransactionRelationId(),
|
||||
ShareUpdateExclusiveLock);
|
||||
RowExclusiveLock);
|
||||
TupleDesc tupleDescriptor = RelationGetDescr(pgDistTransaction);
|
||||
|
||||
/*
|
||||
|
|
|
@ -543,6 +543,20 @@ UnlockShardResource(uint64 shardId, LOCKMODE lockmode)
|
|||
}
|
||||
|
||||
|
||||
/* LockTransactionRecovery acquires a lock for transaction recovery */
|
||||
void
|
||||
LockTransactionRecovery(LOCKMODE lockmode)
|
||||
{
|
||||
LOCKTAG tag;
|
||||
const bool sessionLock = false;
|
||||
const bool dontWait = false;
|
||||
|
||||
SET_LOCKTAG_CITUS_OPERATION(tag, CITUS_TRANSACTION_RECOVERY);
|
||||
|
||||
(void) LockAcquire(&tag, lockmode, sessionLock, dontWait);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* LockJobResource acquires a lock for creating resources associated with the
|
||||
* given jobId. This resource is typically a job schema (namespace), and less
|
||||
|
|
|
@ -38,8 +38,14 @@ typedef enum AdvisoryLocktagClass
|
|||
ADV_LOCKTAG_CLASS_CITUS_JOB = 6,
|
||||
ADV_LOCKTAG_CLASS_CITUS_REBALANCE_COLOCATION = 7,
|
||||
ADV_LOCKTAG_CLASS_CITUS_COLOCATED_SHARDS_METADATA = 8,
|
||||
ADV_LOCKTAG_CLASS_CITUS_OPERATIONS = 9
|
||||
} AdvisoryLocktagClass;
|
||||
|
||||
/* CitusOperations has constants for citus operations */
|
||||
typedef enum CitusOperations
|
||||
{
|
||||
CITUS_TRANSACTION_RECOVERY = 0
|
||||
} CitusOperations;
|
||||
|
||||
/* reuse advisory lock, but with different, unused field 4 (4)*/
|
||||
#define SET_LOCKTAG_SHARD_METADATA_RESOURCE(tag, db, shardid) \
|
||||
|
@ -83,6 +89,14 @@ typedef enum AdvisoryLocktagClass
|
|||
(uint32) (colocationOrTableId), \
|
||||
ADV_LOCKTAG_CLASS_CITUS_REBALANCE_COLOCATION)
|
||||
|
||||
/* advisory lock for citus operations, also it has the database hardcoded to MyDatabaseId,
|
||||
* to ensure the locks are local to each database */
|
||||
#define SET_LOCKTAG_CITUS_OPERATION(tag, operationId) \
|
||||
SET_LOCKTAG_ADVISORY(tag, \
|
||||
MyDatabaseId, \
|
||||
(uint32) 0, \
|
||||
(uint32) operationId, \
|
||||
ADV_LOCKTAG_CLASS_CITUS_OPERATIONS)
|
||||
|
||||
/* Lock shard/relation metadata for safe modifications */
|
||||
extern void LockShardDistributionMetadata(int64 shardId, LOCKMODE lockMode);
|
||||
|
@ -110,6 +124,9 @@ extern void UnlockColocationId(int colocationId, LOCKMODE lockMode);
|
|||
extern void LockShardListMetadata(List *shardIntervalList, LOCKMODE lockMode);
|
||||
extern void LockShardsInPlacementListMetadata(List *shardPlacementList,
|
||||
LOCKMODE lockMode);
|
||||
|
||||
extern void LockTransactionRecovery(LOCKMODE lockMode);
|
||||
|
||||
extern void SerializeNonCommutativeWrites(List *shardIntervalList, LOCKMODE lockMode);
|
||||
extern void LockRelationShardResources(List *relationShardList, LOCKMODE lockMode);
|
||||
extern List * GetSortedReferenceShardIntervals(List *relationList);
|
||||
|
|
Loading…
Reference in New Issue