mirror of https://github.com/citusdata/citus.git
Merge pull request #4680 from citusdata/fix_deadlock
Don't include stripe reservation locks in lock graphpull/4679/head^2
commit
1d4b2e3fd0
|
@ -30,6 +30,7 @@
|
||||||
#include "commands/sequence.h"
|
#include "commands/sequence.h"
|
||||||
#include "commands/trigger.h"
|
#include "commands/trigger.h"
|
||||||
#include "distributed/metadata_cache.h"
|
#include "distributed/metadata_cache.h"
|
||||||
|
#include "distributed/resource_lock.h"
|
||||||
#include "executor/executor.h"
|
#include "executor/executor.h"
|
||||||
#include "executor/spi.h"
|
#include "executor/spi.h"
|
||||||
#include "miscadmin.h"
|
#include "miscadmin.h"
|
||||||
|
@ -79,6 +80,8 @@ static void InsertStripeMetadataRow(uint64 storageId, StripeMetadata *stripe);
|
||||||
static void GetHighestUsedAddressAndId(uint64 storageId,
|
static void GetHighestUsedAddressAndId(uint64 storageId,
|
||||||
uint64 *highestUsedAddress,
|
uint64 *highestUsedAddress,
|
||||||
uint64 *highestUsedId);
|
uint64 *highestUsedId);
|
||||||
|
static void LockForStripeReservation(Relation rel, LOCKMODE mode);
|
||||||
|
static void UnlockForStripeReservation(Relation rel, LOCKMODE mode);
|
||||||
static List * ReadDataFileStripeList(uint64 storageId, Snapshot snapshot);
|
static List * ReadDataFileStripeList(uint64 storageId, Snapshot snapshot);
|
||||||
static Oid ColumnarStorageIdSequenceRelationId(void);
|
static Oid ColumnarStorageIdSequenceRelationId(void);
|
||||||
static Oid ColumnarStripeRelationId(void);
|
static Oid ColumnarStripeRelationId(void);
|
||||||
|
@ -284,8 +287,8 @@ WriteColumnarOptions(Oid regclass, ColumnarOptions *options, bool overwrite)
|
||||||
}
|
}
|
||||||
|
|
||||||
systable_endscan_ordered(scanDescriptor);
|
systable_endscan_ordered(scanDescriptor);
|
||||||
index_close(index, NoLock);
|
index_close(index, AccessShareLock);
|
||||||
relation_close(columnarOptions, NoLock);
|
relation_close(columnarOptions, RowExclusiveLock);
|
||||||
|
|
||||||
return written;
|
return written;
|
||||||
}
|
}
|
||||||
|
@ -335,8 +338,8 @@ DeleteColumnarTableOptions(Oid regclass, bool missingOk)
|
||||||
}
|
}
|
||||||
|
|
||||||
systable_endscan_ordered(scanDescriptor);
|
systable_endscan_ordered(scanDescriptor);
|
||||||
index_close(index, NoLock);
|
index_close(index, AccessShareLock);
|
||||||
relation_close(columnarOptions, NoLock);
|
relation_close(columnarOptions, RowExclusiveLock);
|
||||||
|
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
@ -365,7 +368,7 @@ ReadColumnarOptions(Oid regclass, ColumnarOptions *options)
|
||||||
Relation index = try_relation_open(ColumnarOptionsIndexRegclass(), AccessShareLock);
|
Relation index = try_relation_open(ColumnarOptionsIndexRegclass(), AccessShareLock);
|
||||||
if (index == NULL)
|
if (index == NULL)
|
||||||
{
|
{
|
||||||
table_close(columnarOptions, NoLock);
|
table_close(columnarOptions, AccessShareLock);
|
||||||
|
|
||||||
/* extension has been dropped */
|
/* extension has been dropped */
|
||||||
return false;
|
return false;
|
||||||
|
@ -394,8 +397,8 @@ ReadColumnarOptions(Oid regclass, ColumnarOptions *options)
|
||||||
}
|
}
|
||||||
|
|
||||||
systable_endscan_ordered(scanDescriptor);
|
systable_endscan_ordered(scanDescriptor);
|
||||||
index_close(index, NoLock);
|
index_close(index, AccessShareLock);
|
||||||
relation_close(columnarOptions, NoLock);
|
relation_close(columnarOptions, AccessShareLock);
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
@ -464,7 +467,7 @@ SaveStripeSkipList(RelFileNode relfilenode, uint64 stripe, StripeSkipList *chunk
|
||||||
}
|
}
|
||||||
|
|
||||||
FinishModifyRelation(modifyState);
|
FinishModifyRelation(modifyState);
|
||||||
table_close(columnarChunk, NoLock);
|
table_close(columnarChunk, RowExclusiveLock);
|
||||||
|
|
||||||
CommandCounterIncrement();
|
CommandCounterIncrement();
|
||||||
}
|
}
|
||||||
|
@ -610,8 +613,8 @@ ReadStripeSkipList(RelFileNode relfilenode, uint64 stripe, TupleDesc tupleDescri
|
||||||
}
|
}
|
||||||
|
|
||||||
systable_endscan_ordered(scanDescriptor);
|
systable_endscan_ordered(scanDescriptor);
|
||||||
index_close(index, NoLock);
|
index_close(index, AccessShareLock);
|
||||||
table_close(columnarChunk, NoLock);
|
table_close(columnarChunk, AccessShareLock);
|
||||||
|
|
||||||
return chunkList;
|
return chunkList;
|
||||||
}
|
}
|
||||||
|
@ -646,7 +649,7 @@ InsertStripeMetadataRow(uint64 storageId, StripeMetadata *stripe)
|
||||||
|
|
||||||
CommandCounterIncrement();
|
CommandCounterIncrement();
|
||||||
|
|
||||||
table_close(columnarStripes, NoLock);
|
table_close(columnarStripes, RowExclusiveLock);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -727,6 +730,35 @@ GetHighestUsedAddressAndId(uint64 storageId,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* LockForStripeReservation acquires a lock for stripe reservation.
|
||||||
|
*/
|
||||||
|
static void
|
||||||
|
LockForStripeReservation(Relation rel, LOCKMODE mode)
|
||||||
|
{
|
||||||
|
/*
|
||||||
|
* We use an advisory lock here so we can easily detect these kind of
|
||||||
|
* locks in IsProcessWaitingForSafeOperations() and don't include them
|
||||||
|
* in the lock graph.
|
||||||
|
*/
|
||||||
|
LOCKTAG tag;
|
||||||
|
SET_LOCKTAG_COLUMNAR_STRIPE_RESERVATION(tag, rel);
|
||||||
|
LockAcquire(&tag, mode, false, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* UnlockForStripeReservation releases the stripe reservation lock.
|
||||||
|
*/
|
||||||
|
static void
|
||||||
|
UnlockForStripeReservation(Relation rel, LOCKMODE mode)
|
||||||
|
{
|
||||||
|
LOCKTAG tag;
|
||||||
|
SET_LOCKTAG_COLUMNAR_STRIPE_RESERVATION(tag, rel);
|
||||||
|
LockRelease(&tag, mode, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* ReserveStripe reserves and stripe of given size for the given relation,
|
* ReserveStripe reserves and stripe of given size for the given relation,
|
||||||
* and inserts it into columnar.stripe. It is guaranteed that concurrent
|
* and inserts it into columnar.stripe. It is guaranteed that concurrent
|
||||||
|
@ -742,16 +774,13 @@ ReserveStripe(Relation rel, uint64 sizeBytes,
|
||||||
uint64 highestId = 0;
|
uint64 highestId = 0;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* We take ShareUpdateExclusiveLock here, so two space
|
* We take ExclusiveLock here, so two space reservations conflict.
|
||||||
* reservations conflict, space reservation <-> vacuum
|
|
||||||
* conflict, but space reservation doesn't conflict with
|
|
||||||
* reads & writes.
|
|
||||||
*/
|
*/
|
||||||
LockRelation(rel, ShareUpdateExclusiveLock);
|
LOCKMODE lockMode = ExclusiveLock;
|
||||||
|
LockForStripeReservation(rel, lockMode);
|
||||||
|
|
||||||
RelFileNode relfilenode = rel->rd_node;
|
RelFileNode relfilenode = rel->rd_node;
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* If this is the first stripe for this relation, initialize the
|
* If this is the first stripe for this relation, initialize the
|
||||||
* metapage, otherwise use the previously initialized metapage.
|
* metapage, otherwise use the previously initialized metapage.
|
||||||
|
@ -793,7 +822,7 @@ ReserveStripe(Relation rel, uint64 sizeBytes,
|
||||||
|
|
||||||
InsertStripeMetadataRow(metapage->storageId, &stripe);
|
InsertStripeMetadataRow(metapage->storageId, &stripe);
|
||||||
|
|
||||||
UnlockRelation(rel, ShareUpdateExclusiveLock);
|
UnlockForStripeReservation(rel, lockMode);
|
||||||
|
|
||||||
return stripe;
|
return stripe;
|
||||||
}
|
}
|
||||||
|
@ -849,8 +878,8 @@ ReadDataFileStripeList(uint64 storageId, Snapshot snapshot)
|
||||||
}
|
}
|
||||||
|
|
||||||
systable_endscan_ordered(scanDescriptor);
|
systable_endscan_ordered(scanDescriptor);
|
||||||
index_close(index, NoLock);
|
index_close(index, AccessShareLock);
|
||||||
table_close(columnarStripes, NoLock);
|
table_close(columnarStripes, AccessShareLock);
|
||||||
|
|
||||||
return stripeMetadataList;
|
return stripeMetadataList;
|
||||||
}
|
}
|
||||||
|
@ -911,8 +940,8 @@ DeleteMetadataRows(RelFileNode relfilenode)
|
||||||
FinishModifyRelation(modifyState);
|
FinishModifyRelation(modifyState);
|
||||||
|
|
||||||
systable_endscan_ordered(scanDescriptor);
|
systable_endscan_ordered(scanDescriptor);
|
||||||
index_close(index, NoLock);
|
index_close(index, AccessShareLock);
|
||||||
table_close(columnarStripes, NoLock);
|
table_close(columnarStripes, AccessShareLock);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -25,6 +25,7 @@
|
||||||
#include "distributed/lock_graph.h"
|
#include "distributed/lock_graph.h"
|
||||||
#include "distributed/metadata_cache.h"
|
#include "distributed/metadata_cache.h"
|
||||||
#include "distributed/remote_commands.h"
|
#include "distributed/remote_commands.h"
|
||||||
|
#include "distributed/resource_lock.h"
|
||||||
#include "distributed/tuplestore.h"
|
#include "distributed/tuplestore.h"
|
||||||
#include "storage/proc.h"
|
#include "storage/proc.h"
|
||||||
#include "utils/builtins.h"
|
#include "utils/builtins.h"
|
||||||
|
@ -471,9 +472,18 @@ IsProcessWaitingForSafeOperations(PGPROC *proc)
|
||||||
PROCLOCK *waitProcLock = proc->waitProcLock;
|
PROCLOCK *waitProcLock = proc->waitProcLock;
|
||||||
LOCK *waitLock = waitProcLock->tag.myLock;
|
LOCK *waitLock = waitProcLock->tag.myLock;
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Stripe reservation locks are temporary & don't hold until end of
|
||||||
|
* transaction, so we shouldn't include them in the lock graph.
|
||||||
|
*/
|
||||||
|
bool stripeReservationLock =
|
||||||
|
waitLock->tag.locktag_type == LOCKTAG_ADVISORY &&
|
||||||
|
waitLock->tag.locktag_field4 == ADV_LOCKTAG_CLASS_COLUMNAR_STRIPE_RESERVATION;
|
||||||
|
|
||||||
return waitLock->tag.locktag_type == LOCKTAG_RELATION_EXTEND ||
|
return waitLock->tag.locktag_type == LOCKTAG_RELATION_EXTEND ||
|
||||||
waitLock->tag.locktag_type == LOCKTAG_PAGE ||
|
waitLock->tag.locktag_type == LOCKTAG_PAGE ||
|
||||||
waitLock->tag.locktag_type == LOCKTAG_SPECULATIVE_TOKEN;
|
waitLock->tag.locktag_type == LOCKTAG_SPECULATIVE_TOKEN ||
|
||||||
|
stripeReservationLock;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -38,7 +38,10 @@ typedef enum AdvisoryLocktagClass
|
||||||
ADV_LOCKTAG_CLASS_CITUS_JOB = 6,
|
ADV_LOCKTAG_CLASS_CITUS_JOB = 6,
|
||||||
ADV_LOCKTAG_CLASS_CITUS_REBALANCE_COLOCATION = 7,
|
ADV_LOCKTAG_CLASS_CITUS_REBALANCE_COLOCATION = 7,
|
||||||
ADV_LOCKTAG_CLASS_CITUS_COLOCATED_SHARDS_METADATA = 8,
|
ADV_LOCKTAG_CLASS_CITUS_COLOCATED_SHARDS_METADATA = 8,
|
||||||
ADV_LOCKTAG_CLASS_CITUS_OPERATIONS = 9
|
ADV_LOCKTAG_CLASS_CITUS_OPERATIONS = 9,
|
||||||
|
|
||||||
|
/* Columnar lock types */
|
||||||
|
ADV_LOCKTAG_CLASS_COLUMNAR_STRIPE_RESERVATION = 10
|
||||||
} AdvisoryLocktagClass;
|
} AdvisoryLocktagClass;
|
||||||
|
|
||||||
/* CitusOperations has constants for citus operations */
|
/* CitusOperations has constants for citus operations */
|
||||||
|
@ -98,6 +101,13 @@ typedef enum CitusOperations
|
||||||
(uint32) operationId, \
|
(uint32) operationId, \
|
||||||
ADV_LOCKTAG_CLASS_CITUS_OPERATIONS)
|
ADV_LOCKTAG_CLASS_CITUS_OPERATIONS)
|
||||||
|
|
||||||
|
#define SET_LOCKTAG_COLUMNAR_STRIPE_RESERVATION(tag, relation) \
|
||||||
|
SET_LOCKTAG_ADVISORY(tag, \
|
||||||
|
relation->rd_lockInfo.lockRelId.dbId, \
|
||||||
|
relation->rd_lockInfo.lockRelId.relId, \
|
||||||
|
0, \
|
||||||
|
ADV_LOCKTAG_CLASS_COLUMNAR_STRIPE_RESERVATION)
|
||||||
|
|
||||||
/* Lock shard/relation metadata for safe modifications */
|
/* Lock shard/relation metadata for safe modifications */
|
||||||
extern void LockShardDistributionMetadata(int64 shardId, LOCKMODE lockMode);
|
extern void LockShardDistributionMetadata(int64 shardId, LOCKMODE lockMode);
|
||||||
extern void LockShardListMetadataOnWorkers(LOCKMODE lockmode, List *shardIntervalList);
|
extern void LockShardListMetadataOnWorkers(LOCKMODE lockmode, List *shardIntervalList);
|
||||||
|
|
Loading…
Reference in New Issue