diff --git a/src/backend/distributed/commands/create_distributed_table.c b/src/backend/distributed/commands/create_distributed_table.c index 26a905f23..26e7e4b9e 100644 --- a/src/backend/distributed/commands/create_distributed_table.c +++ b/src/backend/distributed/commands/create_distributed_table.c @@ -60,6 +60,7 @@ #include "distributed/relation_access_tracking.h" #include "distributed/remote_commands.h" #include "distributed/shared_library_init.h" +#include "distributed/shard_rebalancer.h" #include "distributed/worker_protocol.h" #include "distributed/worker_shard_visibility.h" #include "distributed/worker_transaction.h" @@ -851,6 +852,13 @@ CreateHashDistributedTableShards(Oid relationId, int shardCount, if (colocatedTableId != InvalidOid) { + /* + * If there is a concurrent shard move on this + * colocation group fail this backend. But, allow + * concurrent colocated table creation. + */ + AcquireColocationLock(colocatedTableId, ShareLock, "colocate distributed table"); + CreateColocatedShards(relationId, colocatedTableId, useExclusiveConnection); } else diff --git a/src/backend/distributed/operations/repair_shards.c b/src/backend/distributed/operations/repair_shards.c index f29f0a75a..7e6d3d434 100644 --- a/src/backend/distributed/operations/repair_shards.c +++ b/src/backend/distributed/operations/repair_shards.c @@ -37,6 +37,7 @@ #include "distributed/reference_table_utils.h" #include "distributed/remote_commands.h" #include "distributed/resource_lock.h" +#include "distributed/shard_rebalancer.h" #include "distributed/worker_manager.h" #include "distributed/worker_protocol.h" #include "distributed/worker_transaction.h" @@ -318,11 +319,12 @@ citus_move_shard_placement(PG_FUNCTION_ARGS) ErrorIfMoveUnsupportedTableType(relationId); ErrorIfTargetNodeIsNotSafeToMove(targetNodeName, targetNodePort); + AcquireColocationLock(relationId, ExclusiveLock, "move placement"); + ShardInterval *shardInterval = LoadShardInterval(shardId); Oid distributedTableId = shardInterval->relationId; List *colocatedTableList = ColocatedTableList(distributedTableId); - List *colocatedShardList = ColocatedShardIntervalList(shardInterval); foreach(colocatedTableCell, colocatedTableList) { @@ -351,6 +353,7 @@ citus_move_shard_placement(PG_FUNCTION_ARGS) } /* we sort colocatedShardList so that lock operations will not cause any deadlocks */ + List *colocatedShardList = ColocatedShardIntervalList(shardInterval); colocatedShardList = SortList(colocatedShardList, CompareShardIntervalsById); foreach(colocatedShardCell, colocatedShardList) { diff --git a/src/backend/distributed/operations/shard_rebalancer.c b/src/backend/distributed/operations/shard_rebalancer.c index 60cb39e3f..75d4cc6c9 100644 --- a/src/backend/distributed/operations/shard_rebalancer.c +++ b/src/backend/distributed/operations/shard_rebalancer.c @@ -230,7 +230,6 @@ static float4 NodeCapacity(WorkerNode *workerNode, void *context); static ShardCost GetShardCost(uint64 shardId, void *context); static List * NonColocatedDistRelationIdList(void); static void RebalanceTableShards(RebalanceOptions *options, Oid shardReplicationModeOid); -static void AcquireColocationLock(Oid relationId, const char *operationName); static void ExecutePlacementUpdates(List *placementUpdateList, Oid shardReplicationModeOid, char *noticeOperation); static float4 CalculateUtilization(float4 totalCost, float4 capacity); @@ -621,11 +620,11 @@ GetColocatedRebalanceSteps(List *placementUpdateList) /* * AcquireColocationLock tries to acquire a lock for rebalance/replication. If * this is it not possible it fails instantly because this means another - * rebalance/replication is currently happening. This would really mess up - * planning. + * rebalance/replication/colocated table creation is currently happening. + * This would really mess up planning, could even cause lost placements. */ -static void -AcquireColocationLock(Oid relationId, const char *operationName) +void +AcquireColocationLock(Oid relationId, int lockmode, const char *operationName) { uint32 lockId = relationId; LOCKTAG tag; @@ -638,12 +637,17 @@ AcquireColocationLock(Oid relationId, const char *operationName) SET_LOCKTAG_REBALANCE_COLOCATION(tag, (int64) lockId); - LockAcquireResult lockAcquired = LockAcquire(&tag, ExclusiveLock, false, true); + LockAcquireResult lockAcquired = LockAcquire(&tag, lockmode, false, true); if (!lockAcquired) { ereport(ERROR, (errmsg("could not acquire the lock required to %s %s", operationName, generate_qualified_relation_name( - relationId)))); + relationId)), + errdetail("It means that either a concurrent shard move " + "or colocated distributed table creation is " + "happening."), + errhint("Make sure that the concurrent operation has " + "finished and re-run the command"))); } } @@ -945,7 +949,7 @@ replicate_table_shards(PG_FUNCTION_ARGS) char transferMode = LookupShardTransferMode(shardReplicationModeOid); EnsureReferenceTablesExistOnAllNodesExtended(transferMode); - AcquireColocationLock(relationId, "replicate"); + AcquireColocationLock(relationId, ExclusiveLock, "replicate"); List *activeWorkerList = SortedActiveWorkers(); List *shardPlacementList = FullShardPlacementList(relationId, excludedShardArray); @@ -1558,7 +1562,7 @@ RebalanceTableShards(RebalanceOptions *options, Oid shardReplicationModeOid) foreach_oid(relationId, options->relationIdList) { - AcquireColocationLock(relationId, operationName); + AcquireColocationLock(relationId, ExclusiveLock, operationName); } List *placementUpdateList = GetRebalanceSteps(options); diff --git a/src/include/distributed/shard_rebalancer.h b/src/include/distributed/shard_rebalancer.h index ed13248c3..eb1fb7684 100644 --- a/src/include/distributed/shard_rebalancer.h +++ b/src/include/distributed/shard_rebalancer.h @@ -194,6 +194,8 @@ extern List * RebalancePlacementUpdates(List *workerNodeList, extern List * ReplicationPlacementUpdates(List *workerNodeList, List *shardPlacementList, int shardReplicationFactor); extern void ExecuteRebalancerCommandInSeparateTransaction(char *command); +extern void AcquireColocationLock(Oid relationId, int lockmode, + const char *operationName); #endif /* SHARD_REBALANCER_H */