mirror of https://github.com/citusdata/citus.git
Fix locking in create_distributed_table
parent
7c65001e23
commit
53584affa8
|
@ -311,14 +311,6 @@ CreateDistributedTable(Oid relationId, Var *distributionColumn, char distributio
|
||||||
/* we need to calculate these variables before creating distributed metadata */
|
/* we need to calculate these variables before creating distributed metadata */
|
||||||
localTableEmpty = LocalTableEmpty(relationId);
|
localTableEmpty = LocalTableEmpty(relationId);
|
||||||
colocatedTableId = ColocatedTableId(colocationId);
|
colocatedTableId = ColocatedTableId(colocationId);
|
||||||
if (colocatedTableId != InvalidOid)
|
|
||||||
{
|
|
||||||
/*
|
|
||||||
* We take lock on colocatedTableId, because we want to ensure that colocated
|
|
||||||
* table is not dropped until we create all colocated shards.
|
|
||||||
*/
|
|
||||||
colocatedRelation = relation_open(colocatedTableId, AccessShareLock);
|
|
||||||
}
|
|
||||||
|
|
||||||
/* create an entry for distributed table in pg_dist_partition */
|
/* create an entry for distributed table in pg_dist_partition */
|
||||||
InsertIntoPgDistPartition(relationId, distributionMethod, distributionColumn,
|
InsertIntoPgDistPartition(relationId, distributionMethod, distributionColumn,
|
||||||
|
@ -530,6 +522,7 @@ ColocationIdForNewTable(Oid relationId, Var *distributionColumn,
|
||||||
Relation pgDistColocation = heap_open(DistColocationRelationId(), ExclusiveLock);
|
Relation pgDistColocation = heap_open(DistColocationRelationId(), ExclusiveLock);
|
||||||
|
|
||||||
Oid distributionColumnType = distributionColumn->vartype;
|
Oid distributionColumnType = distributionColumn->vartype;
|
||||||
|
bool createdColocationGroup = false;
|
||||||
|
|
||||||
if (pg_strncasecmp(colocateWithTableName, "default", NAMEDATALEN) == 0)
|
if (pg_strncasecmp(colocateWithTableName, "default", NAMEDATALEN) == 0)
|
||||||
{
|
{
|
||||||
|
@ -541,11 +534,14 @@ ColocationIdForNewTable(Oid relationId, Var *distributionColumn,
|
||||||
{
|
{
|
||||||
colocationId = CreateColocationGroup(ShardCount, ShardReplicationFactor,
|
colocationId = CreateColocationGroup(ShardCount, ShardReplicationFactor,
|
||||||
distributionColumnType);
|
distributionColumnType);
|
||||||
|
createdColocationGroup = true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
else if (pg_strncasecmp(colocateWithTableName, "none", NAMEDATALEN) == 0)
|
else if (pg_strncasecmp(colocateWithTableName, "none", NAMEDATALEN) == 0)
|
||||||
{
|
{
|
||||||
colocationId = GetNextColocationId();
|
colocationId = GetNextColocationId();
|
||||||
|
|
||||||
|
createdColocationGroup = true;
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
|
@ -558,7 +554,22 @@ ColocationIdForNewTable(Oid relationId, Var *distributionColumn,
|
||||||
colocationId = TableColocationId(sourceRelationId);
|
colocationId = TableColocationId(sourceRelationId);
|
||||||
}
|
}
|
||||||
|
|
||||||
heap_close(pgDistColocation, NoLock);
|
/*
|
||||||
|
* If we created a new colocation group then we need to keep the lock to
|
||||||
|
* prevent a concurrent create_distributed_table call from creating another
|
||||||
|
* colocation group with the same parameters. If we're using an existing
|
||||||
|
* colocation group then other transactions will use the same one.
|
||||||
|
*/
|
||||||
|
if (createdColocationGroup)
|
||||||
|
{
|
||||||
|
/* keep the exclusive lock */
|
||||||
|
heap_close(pgDistColocation, NoLock);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
/* release the exclusive lock */
|
||||||
|
heap_close(pgDistColocation, ExclusiveLock);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return colocationId;
|
return colocationId;
|
||||||
|
|
|
@ -31,6 +31,7 @@
|
||||||
#include "distributed/shardinterval_utils.h"
|
#include "distributed/shardinterval_utils.h"
|
||||||
#include "distributed/worker_protocol.h"
|
#include "distributed/worker_protocol.h"
|
||||||
#include "distributed/worker_transaction.h"
|
#include "distributed/worker_transaction.h"
|
||||||
|
#include "storage/lmgr.h"
|
||||||
#include "utils/builtins.h"
|
#include "utils/builtins.h"
|
||||||
#include "utils/fmgroids.h"
|
#include "utils/fmgroids.h"
|
||||||
#include "utils/lsyscache.h"
|
#include "utils/lsyscache.h"
|
||||||
|
@ -929,6 +930,9 @@ ColocatedShardIntervalList(ShardInterval *shardInterval)
|
||||||
/*
|
/*
|
||||||
* ColocatedTableId returns an arbitrary table which belongs to given colocation
|
* ColocatedTableId returns an arbitrary table which belongs to given colocation
|
||||||
* group. If there is not such a colocation group, it returns invalid oid.
|
* group. If there is not such a colocation group, it returns invalid oid.
|
||||||
|
*
|
||||||
|
* This function also takes an AccessShareLock on the co-colocated table to
|
||||||
|
* guarantee that the table isn't dropped for the remainder of the transaction.
|
||||||
*/
|
*/
|
||||||
Oid
|
Oid
|
||||||
ColocatedTableId(Oid colocationId)
|
ColocatedTableId(Oid colocationId)
|
||||||
|
@ -955,7 +959,7 @@ ColocatedTableId(Oid colocationId)
|
||||||
ScanKeyInit(&scanKey[0], Anum_pg_dist_partition_colocationid,
|
ScanKeyInit(&scanKey[0], Anum_pg_dist_partition_colocationid,
|
||||||
BTEqualStrategyNumber, F_INT4EQ, ObjectIdGetDatum(colocationId));
|
BTEqualStrategyNumber, F_INT4EQ, ObjectIdGetDatum(colocationId));
|
||||||
|
|
||||||
/* prevent DELETE statements */
|
/* do not allow any tables to be dropped while we read from pg_dist_partition */
|
||||||
pgDistPartition = heap_open(DistPartitionRelationId(), ShareLock);
|
pgDistPartition = heap_open(DistPartitionRelationId(), ShareLock);
|
||||||
tupleDescriptor = RelationGetDescr(pgDistPartition);
|
tupleDescriptor = RelationGetDescr(pgDistPartition);
|
||||||
scanDescriptor = systable_beginscan(pgDistPartition,
|
scanDescriptor = systable_beginscan(pgDistPartition,
|
||||||
|
@ -967,6 +971,9 @@ ColocatedTableId(Oid colocationId)
|
||||||
{
|
{
|
||||||
colocatedTableId = heap_getattr(heapTuple, Anum_pg_dist_partition_logicalrelid,
|
colocatedTableId = heap_getattr(heapTuple, Anum_pg_dist_partition_logicalrelid,
|
||||||
tupleDescriptor, &isNull);
|
tupleDescriptor, &isNull);
|
||||||
|
|
||||||
|
/* make sure the table isn't dropped for the remainder of the transaction */
|
||||||
|
LockRelationOid(colocatedTableId, AccessShareLock);
|
||||||
}
|
}
|
||||||
|
|
||||||
systable_endscan(scanDescriptor);
|
systable_endscan(scanDescriptor);
|
||||||
|
|
Loading…
Reference in New Issue