diff --git a/src/backend/distributed/commands/create_distributed_table.c b/src/backend/distributed/commands/create_distributed_table.c index f601237de..48b5b069d 100644 --- a/src/backend/distributed/commands/create_distributed_table.c +++ b/src/backend/distributed/commands/create_distributed_table.c @@ -311,14 +311,6 @@ CreateDistributedTable(Oid relationId, Var *distributionColumn, char distributio /* we need to calculate these variables before creating distributed metadata */ localTableEmpty = LocalTableEmpty(relationId); colocatedTableId = ColocatedTableId(colocationId); - if (colocatedTableId != InvalidOid) - { - /* - * We take lock on colocatedTableId, because we want to ensure that colocated - * table is not dropped until we create all colocated shards. - */ - colocatedRelation = relation_open(colocatedTableId, AccessShareLock); - } /* create an entry for distributed table in pg_dist_partition */ InsertIntoPgDistPartition(relationId, distributionMethod, distributionColumn, @@ -530,6 +522,7 @@ ColocationIdForNewTable(Oid relationId, Var *distributionColumn, Relation pgDistColocation = heap_open(DistColocationRelationId(), ExclusiveLock); Oid distributionColumnType = distributionColumn->vartype; + bool createdColocationGroup = false; if (pg_strncasecmp(colocateWithTableName, "default", NAMEDATALEN) == 0) { @@ -541,11 +534,14 @@ ColocationIdForNewTable(Oid relationId, Var *distributionColumn, { colocationId = CreateColocationGroup(ShardCount, ShardReplicationFactor, distributionColumnType); + createdColocationGroup = true; } } else if (pg_strncasecmp(colocateWithTableName, "none", NAMEDATALEN) == 0) { colocationId = GetNextColocationId(); + + createdColocationGroup = true; } else { @@ -558,7 +554,22 @@ ColocationIdForNewTable(Oid relationId, Var *distributionColumn, colocationId = TableColocationId(sourceRelationId); } - heap_close(pgDistColocation, NoLock); + /* + * If we created a new colocation group then we need to keep the lock to + * prevent a concurrent 
create_distributed_table call from creating another + * colocation group with the same parameters. If we're using an existing + * colocation group then other transactions will use the same one. + */ + if (createdColocationGroup) + { + /* keep the exclusive lock */ + heap_close(pgDistColocation, NoLock); + } + else + { + /* release the exclusive lock */ + heap_close(pgDistColocation, ExclusiveLock); + } } return colocationId; diff --git a/src/backend/distributed/utils/colocation_utils.c b/src/backend/distributed/utils/colocation_utils.c index 7a494f753..076c29d54 100644 --- a/src/backend/distributed/utils/colocation_utils.c +++ b/src/backend/distributed/utils/colocation_utils.c @@ -31,6 +31,7 @@ #include "distributed/shardinterval_utils.h" #include "distributed/worker_protocol.h" #include "distributed/worker_transaction.h" +#include "storage/lmgr.h" #include "utils/builtins.h" #include "utils/fmgroids.h" #include "utils/lsyscache.h" @@ -929,6 +930,9 @@ ColocatedShardIntervalList(ShardInterval *shardInterval) /* * ColocatedTableId returns an arbitrary table which belongs to given colocation * group. If there is not such a colocation group, it returns invalid oid. + * + * This function also takes an AccessShareLock on the colocated table to + * guarantee that the table isn't dropped for the remainder of the transaction. 
*/ Oid ColocatedTableId(Oid colocationId) @@ -955,7 +959,7 @@ ColocatedTableId(Oid colocationId) ScanKeyInit(&scanKey[0], Anum_pg_dist_partition_colocationid, BTEqualStrategyNumber, F_INT4EQ, ObjectIdGetDatum(colocationId)); - /* prevent DELETE statements */ + /* do not allow any tables to be dropped while we read from pg_dist_partition */ pgDistPartition = heap_open(DistPartitionRelationId(), ShareLock); tupleDescriptor = RelationGetDescr(pgDistPartition); scanDescriptor = systable_beginscan(pgDistPartition, @@ -967,6 +971,9 @@ ColocatedTableId(Oid colocationId) { colocatedTableId = heap_getattr(heapTuple, Anum_pg_dist_partition_logicalrelid, tupleDescriptor, &isNull); + + /* make sure the table isn't dropped for the remainder of the transaction */ + LockRelationOid(colocatedTableId, AccessShareLock); } systable_endscan(scanDescriptor);