Avoid deadlock in ColocatedTableId

pull/1856/head
Marco Slot 2017-12-05 13:10:54 +01:00
parent 75eff340e1
commit eab15aa035
1 changed files with 26 additions and 5 deletions

View File

@ -963,25 +963,46 @@ ColocatedTableId(Oid colocationId)
ScanKeyInit(&scanKey[0], Anum_pg_dist_partition_colocationid,
BTEqualStrategyNumber, F_INT4EQ, ObjectIdGetDatum(colocationId));
/* do not allow any tables to be dropped while we read from pg_dist_partition */
pgDistPartition = heap_open(DistPartitionRelationId(), ShareLock);
pgDistPartition = heap_open(DistPartitionRelationId(), AccessShareLock);
tupleDescriptor = RelationGetDescr(pgDistPartition);
scanDescriptor = systable_beginscan(pgDistPartition,
DistPartitionColocationidIndexId(),
indexOK, NULL, scanKeyCount, scanKey);
heapTuple = systable_getnext(scanDescriptor);
if (HeapTupleIsValid(heapTuple))
while (HeapTupleIsValid(heapTuple))
{
Relation colocatedRelation = NULL;
colocatedTableId = heap_getattr(heapTuple, Anum_pg_dist_partition_logicalrelid,
tupleDescriptor, &isNull);
/* make sure the table isn't dropped for the remainder of the transaction */
/*
* Make sure the relation isn't dropped for the remainder of
* the transaction.
*/
LockRelationOid(colocatedTableId, AccessShareLock);
/*
* The relation might have been dropped just before we locked it.
* Let's look it up.
*/
colocatedRelation = RelationIdGetRelation(colocatedTableId);
if (RelationIsValid(colocatedRelation))
{
/* relation still exists, we can use it */
RelationClose(colocatedRelation);
break;
}
/* relation was dropped, try the next one */
colocatedTableId = InvalidOid;
heapTuple = systable_getnext(scanDescriptor);
}
systable_endscan(scanDescriptor);
heap_close(pgDistPartition, ShareLock);
heap_close(pgDistPartition, AccessShareLock);
return colocatedTableId;
}