mirror of https://github.com/citusdata/citus.git
Add some missing locks.
parent
130e999ac7
commit
c582eb89c8
|
@ -48,9 +48,32 @@ static void AcquireDistributedLockOnRelations(List *relationIdList, LOCKMODE loc
|
||||||
void
|
void
|
||||||
ProcessTruncateStatement(TruncateStmt *truncateStatement)
|
ProcessTruncateStatement(TruncateStmt *truncateStatement)
|
||||||
{
|
{
|
||||||
|
ListCell *rangeVarCell = NULL;
|
||||||
|
ListCell *relationCell = NULL;
|
||||||
|
List *lockedRelations = NIL;
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Lock relations while calling next three functions, because they expect
|
||||||
|
* relations to be locked. We release them then to avoid distributed
|
||||||
|
* deadlock in MX.
|
||||||
|
*/
|
||||||
|
foreach(rangeVarCell, truncateStatement->relations)
|
||||||
|
{
|
||||||
|
RangeVar *rangeVar = (RangeVar *) lfirst(rangeVarCell);
|
||||||
|
Relation relation = heap_openrv(rangeVar, AccessShareLock);
|
||||||
|
lockedRelations = lappend(lockedRelations, relation);
|
||||||
|
}
|
||||||
|
|
||||||
ErrorIfUnsupportedTruncateStmt(truncateStatement);
|
ErrorIfUnsupportedTruncateStmt(truncateStatement);
|
||||||
EnsurePartitionTableNotReplicatedForTruncate(truncateStatement);
|
EnsurePartitionTableNotReplicatedForTruncate(truncateStatement);
|
||||||
ExecuteTruncateStmtSequentialIfNecessary(truncateStatement);
|
ExecuteTruncateStmtSequentialIfNecessary(truncateStatement);
|
||||||
|
|
||||||
|
foreach(relationCell, lockedRelations)
|
||||||
|
{
|
||||||
|
Relation relation = (Relation) lfirst(relationCell);
|
||||||
|
heap_close(relation, AccessShareLock);
|
||||||
|
}
|
||||||
|
|
||||||
LockTruncatedRelationMetadataInWorkers(truncateStatement);
|
LockTruncatedRelationMetadataInWorkers(truncateStatement);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -67,7 +90,7 @@ ErrorIfUnsupportedTruncateStmt(TruncateStmt *truncateStatement)
|
||||||
foreach(relationCell, relationList)
|
foreach(relationCell, relationList)
|
||||||
{
|
{
|
||||||
RangeVar *rangeVar = (RangeVar *) lfirst(relationCell);
|
RangeVar *rangeVar = (RangeVar *) lfirst(relationCell);
|
||||||
Oid relationId = RangeVarGetRelid(rangeVar, NoLock, true);
|
Oid relationId = RangeVarGetRelid(rangeVar, NoLock, false);
|
||||||
char relationKind = get_rel_relkind(relationId);
|
char relationKind = get_rel_relkind(relationId);
|
||||||
if (IsDistributedTable(relationId) &&
|
if (IsDistributedTable(relationId) &&
|
||||||
relationKind == RELKIND_FOREIGN_TABLE)
|
relationKind == RELKIND_FOREIGN_TABLE)
|
||||||
|
@ -93,19 +116,15 @@ EnsurePartitionTableNotReplicatedForTruncate(TruncateStmt *truncateStatement)
|
||||||
|
|
||||||
foreach(relationCell, truncateStatement->relations)
|
foreach(relationCell, truncateStatement->relations)
|
||||||
{
|
{
|
||||||
RangeVar *relationRV = (RangeVar *) lfirst(relationCell);
|
RangeVar *rangeVar = (RangeVar *) lfirst(relationCell);
|
||||||
Relation relation = heap_openrv(relationRV, NoLock);
|
Oid relationId = RangeVarGetRelid(rangeVar, NoLock, false);
|
||||||
Oid relationId = RelationGetRelid(relation);
|
|
||||||
|
|
||||||
if (!IsDistributedTable(relationId))
|
if (!IsDistributedTable(relationId))
|
||||||
{
|
{
|
||||||
heap_close(relation, NoLock);
|
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
EnsurePartitionTableNotReplicated(relationId);
|
EnsurePartitionTableNotReplicated(relationId);
|
||||||
|
|
||||||
heap_close(relation, NoLock);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -181,22 +200,19 @@ LockTruncatedRelationMetadataInWorkers(TruncateStmt *truncateStatement)
|
||||||
|
|
||||||
foreach(relationCell, truncateStatement->relations)
|
foreach(relationCell, truncateStatement->relations)
|
||||||
{
|
{
|
||||||
RangeVar *relationRV = (RangeVar *) lfirst(relationCell);
|
RangeVar *rangeVar = (RangeVar *) lfirst(relationCell);
|
||||||
Relation relation = heap_openrv(relationRV, NoLock);
|
Oid relationId = RangeVarGetRelid(rangeVar, NoLock, false);
|
||||||
Oid relationId = RelationGetRelid(relation);
|
|
||||||
DistTableCacheEntry *cacheEntry = NULL;
|
DistTableCacheEntry *cacheEntry = NULL;
|
||||||
List *referencingTableList = NIL;
|
List *referencingTableList = NIL;
|
||||||
ListCell *referencingTableCell = NULL;
|
ListCell *referencingTableCell = NULL;
|
||||||
|
|
||||||
if (!IsDistributedTable(relationId))
|
if (!IsDistributedTable(relationId))
|
||||||
{
|
{
|
||||||
heap_close(relation, NoLock);
|
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (list_member_oid(distributedRelationList, relationId))
|
if (list_member_oid(distributedRelationList, relationId))
|
||||||
{
|
{
|
||||||
heap_close(relation, NoLock);
|
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -212,8 +228,6 @@ LockTruncatedRelationMetadataInWorkers(TruncateStmt *truncateStatement)
|
||||||
distributedRelationList = list_append_unique_oid(distributedRelationList,
|
distributedRelationList = list_append_unique_oid(distributedRelationList,
|
||||||
referencingRelationId);
|
referencingRelationId);
|
||||||
}
|
}
|
||||||
|
|
||||||
heap_close(relation, NoLock);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (distributedRelationList != NIL)
|
if (distributedRelationList != NIL)
|
||||||
|
|
|
@ -259,6 +259,10 @@ master_append_table_to_shard(PG_FUNCTION_ARGS)
|
||||||
|
|
||||||
shardInterval = LoadShardInterval(shardId);
|
shardInterval = LoadShardInterval(shardId);
|
||||||
relationId = shardInterval->relationId;
|
relationId = shardInterval->relationId;
|
||||||
|
|
||||||
|
/* don't allow the table to be dropped */
|
||||||
|
LockRelationOid(relationId, AccessShareLock);
|
||||||
|
|
||||||
cstoreTable = CStoreTable(relationId);
|
cstoreTable = CStoreTable(relationId);
|
||||||
storageType = shardInterval->storageType;
|
storageType = shardInterval->storageType;
|
||||||
|
|
||||||
|
|
|
@ -222,46 +222,15 @@ PlacementAccessTypeToText(ShardPlacementAccessType accessType)
|
||||||
static void
|
static void
|
||||||
RecordRelationAccessBase(Oid relationId, ShardPlacementAccessType accessType)
|
RecordRelationAccessBase(Oid relationId, ShardPlacementAccessType accessType)
|
||||||
{
|
{
|
||||||
|
/*
|
||||||
|
* We call this only for reference tables, and we don't support partitioned
|
||||||
|
* reference tables.
|
||||||
|
*/
|
||||||
|
Assert(!PartitionedTable(relationId) && !PartitionTable(relationId));
|
||||||
|
|
||||||
/* make sure that this is not a conflicting access */
|
/* make sure that this is not a conflicting access */
|
||||||
CheckConflictingRelationAccesses(relationId, accessType);
|
CheckConflictingRelationAccesses(relationId, accessType);
|
||||||
|
|
||||||
/*
|
|
||||||
* If a relation is partitioned, record accesses to all of its partitions as well.
|
|
||||||
* We prefer to use PartitionedTableNoLock() because at this point the necessary
|
|
||||||
* locks on the relation has already been acquired.
|
|
||||||
*/
|
|
||||||
if (PartitionedTableNoLock(relationId))
|
|
||||||
{
|
|
||||||
List *partitionList = PartitionList(relationId);
|
|
||||||
ListCell *partitionCell = NULL;
|
|
||||||
|
|
||||||
foreach(partitionCell, partitionList)
|
|
||||||
{
|
|
||||||
Oid partitionOid = lfirst_oid(partitionCell);
|
|
||||||
|
|
||||||
/*
|
|
||||||
* During create_distributed_table, the partitions may not
|
|
||||||
* have been created yet and so there are no placements yet.
|
|
||||||
* We're already going to register them when we distribute
|
|
||||||
* the partitions.
|
|
||||||
*/
|
|
||||||
if (!IsDistributedTable(partitionOid))
|
|
||||||
{
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
/* recursively call the function to cover multi-level partitioned tables */
|
|
||||||
RecordRelationAccessBase(partitionOid, accessType);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
else if (PartitionTableNoLock(relationId))
|
|
||||||
{
|
|
||||||
Oid parentOid = PartitionParentOid(relationId);
|
|
||||||
|
|
||||||
/* record the parent */
|
|
||||||
RecordPlacementAccessToCache(parentOid, accessType);
|
|
||||||
}
|
|
||||||
|
|
||||||
/* always record the relation that is being considered */
|
/* always record the relation that is being considered */
|
||||||
RecordPlacementAccessToCache(relationId, accessType);
|
RecordPlacementAccessToCache(relationId, accessType);
|
||||||
}
|
}
|
||||||
|
@ -521,12 +490,8 @@ RecordParallelRelationAccess(Oid relationId, ShardPlacementAccessType placementA
|
||||||
/* act accordingly if it's a conflicting access */
|
/* act accordingly if it's a conflicting access */
|
||||||
CheckConflictingParallelRelationAccesses(relationId, placementAccess);
|
CheckConflictingParallelRelationAccesses(relationId, placementAccess);
|
||||||
|
|
||||||
/*
|
/* If a relation is partitioned, record accesses to all of its partitions as well. */
|
||||||
* If a relation is partitioned, record accesses to all of its partitions as well.
|
if (PartitionedTable(relationId))
|
||||||
* We prefer to use PartitionedTableNoLock() because at this point the necessary
|
|
||||||
* locks on the relation has already been acquired.
|
|
||||||
*/
|
|
||||||
if (PartitionedTableNoLock(relationId))
|
|
||||||
{
|
{
|
||||||
List *partitionList = PartitionList(relationId);
|
List *partitionList = PartitionList(relationId);
|
||||||
ListCell *partitionCell = NULL;
|
ListCell *partitionCell = NULL;
|
||||||
|
@ -539,7 +504,7 @@ RecordParallelRelationAccess(Oid relationId, ShardPlacementAccessType placementA
|
||||||
RecordParallelRelationAccess(partitionOid, placementAccess);
|
RecordParallelRelationAccess(partitionOid, placementAccess);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
else if (PartitionTableNoLock(relationId))
|
else if (PartitionTable(relationId))
|
||||||
{
|
{
|
||||||
Oid parentOid = PartitionParentOid(relationId);
|
Oid parentOid = PartitionParentOid(relationId);
|
||||||
|
|
||||||
|
|
|
@ -1059,10 +1059,18 @@ DeleteColocationGroup(uint32 colocationId)
|
||||||
heapTuple = systable_getnext(scanDescriptor);
|
heapTuple = systable_getnext(scanDescriptor);
|
||||||
if (HeapTupleIsValid(heapTuple))
|
if (HeapTupleIsValid(heapTuple))
|
||||||
{
|
{
|
||||||
|
/*
|
||||||
|
* simple_heap_delete() expects that the caller has at least an
|
||||||
|
* AccessShareLock on replica identity index.
|
||||||
|
*/
|
||||||
|
Relation replicaIndex =
|
||||||
|
index_open(RelationGetReplicaIndex(pgDistColocation),
|
||||||
|
AccessShareLock);
|
||||||
simple_heap_delete(pgDistColocation, &(heapTuple->t_self));
|
simple_heap_delete(pgDistColocation, &(heapTuple->t_self));
|
||||||
|
|
||||||
CitusInvalidateRelcacheByRelid(DistColocationRelationId());
|
CitusInvalidateRelcacheByRelid(DistColocationRelationId());
|
||||||
CommandCounterIncrement();
|
CommandCounterIncrement();
|
||||||
|
heap_close(replicaIndex, AccessShareLock);
|
||||||
}
|
}
|
||||||
|
|
||||||
systable_endscan(scanDescriptor);
|
systable_endscan(scanDescriptor);
|
||||||
|
|
|
@ -40,9 +40,15 @@ static char * PartitionBound(Oid partitionId);
|
||||||
bool
|
bool
|
||||||
PartitionedTable(Oid relationId)
|
PartitionedTable(Oid relationId)
|
||||||
{
|
{
|
||||||
Relation rel = heap_open(relationId, AccessShareLock);
|
Relation rel = try_relation_open(relationId, AccessShareLock);
|
||||||
bool partitionedTable = false;
|
bool partitionedTable = false;
|
||||||
|
|
||||||
|
/* don't error out for tables that are dropped */
|
||||||
|
if (rel == NULL)
|
||||||
|
{
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
|
if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
|
||||||
{
|
{
|
||||||
partitionedTable = true;
|
partitionedTable = true;
|
||||||
|
@ -90,9 +96,15 @@ PartitionedTableNoLock(Oid relationId)
|
||||||
bool
|
bool
|
||||||
PartitionTable(Oid relationId)
|
PartitionTable(Oid relationId)
|
||||||
{
|
{
|
||||||
Relation rel = heap_open(relationId, AccessShareLock);
|
Relation rel = try_relation_open(relationId, AccessShareLock);
|
||||||
bool partitionTable = false;
|
bool partitionTable = false;
|
||||||
|
|
||||||
|
/* don't error out for tables that are dropped */
|
||||||
|
if (rel == NULL)
|
||||||
|
{
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
partitionTable = rel->rd_rel->relispartition;
|
partitionTable = rel->rd_rel->relispartition;
|
||||||
|
|
||||||
/* keep the lock */
|
/* keep the lock */
|
||||||
|
|
|
@ -1321,9 +1321,15 @@ DeleteNodeRow(char *nodeName, int32 nodePort)
|
||||||
HeapTuple heapTuple = NULL;
|
HeapTuple heapTuple = NULL;
|
||||||
SysScanDesc heapScan = NULL;
|
SysScanDesc heapScan = NULL;
|
||||||
ScanKeyData scanKey[2];
|
ScanKeyData scanKey[2];
|
||||||
|
|
||||||
Relation pgDistNode = heap_open(DistNodeRelationId(), RowExclusiveLock);
|
Relation pgDistNode = heap_open(DistNodeRelationId(), RowExclusiveLock);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* simple_heap_delete() expects that the caller has at least an
|
||||||
|
* AccessShareLock on replica identity index.
|
||||||
|
*/
|
||||||
|
Relation replicaIndex = index_open(RelationGetReplicaIndex(pgDistNode),
|
||||||
|
AccessShareLock);
|
||||||
|
|
||||||
ScanKeyInit(&scanKey[0], Anum_pg_dist_node_nodename,
|
ScanKeyInit(&scanKey[0], Anum_pg_dist_node_nodename,
|
||||||
BTEqualStrategyNumber, F_TEXTEQ, CStringGetTextDatum(nodeName));
|
BTEqualStrategyNumber, F_TEXTEQ, CStringGetTextDatum(nodeName));
|
||||||
ScanKeyInit(&scanKey[1], Anum_pg_dist_node_nodeport,
|
ScanKeyInit(&scanKey[1], Anum_pg_dist_node_nodeport,
|
||||||
|
@ -1350,6 +1356,7 @@ DeleteNodeRow(char *nodeName, int32 nodePort)
|
||||||
/* increment the counter so that next command won't see the row */
|
/* increment the counter so that next command won't see the row */
|
||||||
CommandCounterIncrement();
|
CommandCounterIncrement();
|
||||||
|
|
||||||
|
heap_close(replicaIndex, AccessShareLock);
|
||||||
heap_close(pgDistNode, NoLock);
|
heap_close(pgDistNode, NoLock);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1409,8 +1409,10 @@ SELECT relation::regclass, locktype, mode FROM pg_locks WHERE relation::regclass
|
||||||
partitioning_locks | relation | AccessExclusiveLock
|
partitioning_locks | relation | AccessExclusiveLock
|
||||||
partitioning_locks | relation | AccessShareLock
|
partitioning_locks | relation | AccessShareLock
|
||||||
partitioning_locks_2009 | relation | AccessExclusiveLock
|
partitioning_locks_2009 | relation | AccessExclusiveLock
|
||||||
|
partitioning_locks_2009 | relation | AccessShareLock
|
||||||
partitioning_locks_2010 | relation | AccessExclusiveLock
|
partitioning_locks_2010 | relation | AccessExclusiveLock
|
||||||
(4 rows)
|
partitioning_locks_2010 | relation | AccessShareLock
|
||||||
|
(6 rows)
|
||||||
|
|
||||||
COMMIT;
|
COMMIT;
|
||||||
-- test locks on TRUNCATE
|
-- test locks on TRUNCATE
|
||||||
|
|
|
@ -1381,8 +1381,10 @@ SELECT relation::regclass, locktype, mode FROM pg_locks WHERE relation::regclass
|
||||||
partitioning_locks | relation | AccessExclusiveLock
|
partitioning_locks | relation | AccessExclusiveLock
|
||||||
partitioning_locks | relation | AccessShareLock
|
partitioning_locks | relation | AccessShareLock
|
||||||
partitioning_locks_2009 | relation | AccessExclusiveLock
|
partitioning_locks_2009 | relation | AccessExclusiveLock
|
||||||
|
partitioning_locks_2009 | relation | AccessShareLock
|
||||||
partitioning_locks_2010 | relation | AccessExclusiveLock
|
partitioning_locks_2010 | relation | AccessExclusiveLock
|
||||||
(4 rows)
|
partitioning_locks_2010 | relation | AccessShareLock
|
||||||
|
(6 rows)
|
||||||
|
|
||||||
COMMIT;
|
COMMIT;
|
||||||
-- test locks on TRUNCATE
|
-- test locks on TRUNCATE
|
||||||
|
@ -1394,10 +1396,12 @@ SELECT relation::regclass, locktype, mode FROM pg_locks WHERE relation::regclass
|
||||||
partitioning_locks | relation | AccessExclusiveLock
|
partitioning_locks | relation | AccessExclusiveLock
|
||||||
partitioning_locks | relation | AccessShareLock
|
partitioning_locks | relation | AccessShareLock
|
||||||
partitioning_locks_2009 | relation | AccessExclusiveLock
|
partitioning_locks_2009 | relation | AccessExclusiveLock
|
||||||
|
partitioning_locks_2009 | relation | AccessShareLock
|
||||||
partitioning_locks_2009 | relation | ShareLock
|
partitioning_locks_2009 | relation | ShareLock
|
||||||
partitioning_locks_2010 | relation | AccessExclusiveLock
|
partitioning_locks_2010 | relation | AccessExclusiveLock
|
||||||
|
partitioning_locks_2010 | relation | AccessShareLock
|
||||||
partitioning_locks_2010 | relation | ShareLock
|
partitioning_locks_2010 | relation | ShareLock
|
||||||
(6 rows)
|
(8 rows)
|
||||||
|
|
||||||
COMMIT;
|
COMMIT;
|
||||||
-- test shard resource locks with multi-shard UPDATE
|
-- test shard resource locks with multi-shard UPDATE
|
||||||
|
|
|
@ -1409,8 +1409,10 @@ SELECT relation::regclass, locktype, mode FROM pg_locks WHERE relation::regclass
|
||||||
partitioning_locks | relation | AccessExclusiveLock
|
partitioning_locks | relation | AccessExclusiveLock
|
||||||
partitioning_locks | relation | AccessShareLock
|
partitioning_locks | relation | AccessShareLock
|
||||||
partitioning_locks_2009 | relation | AccessExclusiveLock
|
partitioning_locks_2009 | relation | AccessExclusiveLock
|
||||||
|
partitioning_locks_2009 | relation | AccessShareLock
|
||||||
partitioning_locks_2010 | relation | AccessExclusiveLock
|
partitioning_locks_2010 | relation | AccessExclusiveLock
|
||||||
(4 rows)
|
partitioning_locks_2010 | relation | AccessShareLock
|
||||||
|
(6 rows)
|
||||||
|
|
||||||
COMMIT;
|
COMMIT;
|
||||||
-- test locks on TRUNCATE
|
-- test locks on TRUNCATE
|
||||||
|
@ -1422,10 +1424,12 @@ SELECT relation::regclass, locktype, mode FROM pg_locks WHERE relation::regclass
|
||||||
partitioning_locks | relation | AccessExclusiveLock
|
partitioning_locks | relation | AccessExclusiveLock
|
||||||
partitioning_locks | relation | AccessShareLock
|
partitioning_locks | relation | AccessShareLock
|
||||||
partitioning_locks_2009 | relation | AccessExclusiveLock
|
partitioning_locks_2009 | relation | AccessExclusiveLock
|
||||||
|
partitioning_locks_2009 | relation | AccessShareLock
|
||||||
partitioning_locks_2009 | relation | ShareLock
|
partitioning_locks_2009 | relation | ShareLock
|
||||||
partitioning_locks_2010 | relation | AccessExclusiveLock
|
partitioning_locks_2010 | relation | AccessExclusiveLock
|
||||||
|
partitioning_locks_2010 | relation | AccessShareLock
|
||||||
partitioning_locks_2010 | relation | ShareLock
|
partitioning_locks_2010 | relation | ShareLock
|
||||||
(6 rows)
|
(8 rows)
|
||||||
|
|
||||||
COMMIT;
|
COMMIT;
|
||||||
-- test shard resource locks with multi-shard UPDATE
|
-- test shard resource locks with multi-shard UPDATE
|
||||||
|
|
Loading…
Reference in New Issue