diff --git a/src/backend/distributed/commands/create_distributed_table.c b/src/backend/distributed/commands/create_distributed_table.c
index f7cff4197..f5ae0fd7e 100644
--- a/src/backend/distributed/commands/create_distributed_table.c
+++ b/src/backend/distributed/commands/create_distributed_table.c
@@ -59,6 +59,7 @@
 #include "distributed/reference_table_utils.h"
 #include "distributed/relation_access_tracking.h"
 #include "distributed/remote_commands.h"
+#include "distributed/resource_lock.h"
 #include "distributed/shared_library_init.h"
 #include "distributed/shard_rebalancer.h"
 #include "distributed/worker_protocol.h"
@@ -471,9 +472,22 @@ CreateDistributedTable(Oid relationId, char *distributionColumnName,
	/*
	 * Make sure that existing reference tables have been replicated to all the nodes
	 * such that we can create foreign keys and joins work immediately after creation.
+	 *
+	 * This takes a lock on the nodes to make sure no nodes are added after we have
+	 * verified and ensured that the reference tables are copied everywhere.
+	 * Although copying reference tables here is only needed when creating a new colocation
+	 * group, avoiding it otherwise requires significant refactoring we don't want to do now.
	 */
	EnsureReferenceTablesExistOnAllNodes();

+	/*
+	 * While adding tables to a colocation group we need to make sure no concurrent
+	 * mutations happen on the colocation group with regard to its placements. It is
+	 * important that we have already copied any reference tables before acquiring this
+	 * lock as these are competing operations.
+	 */
+	LockColocationId(colocationId, ShareLock);
+
	/* we need to calculate these variables before creating distributed metadata */
	bool localTableEmpty = TableEmpty(relationId);
	Oid colocatedTableId = ColocatedTableId(colocationId);
@@ -513,7 +527,7 @@ CreateDistributedTable(Oid relationId, char *distributionColumnName,
		 * This function does not expect to create Citus local table, so we blindly
		 * create reference table when the method is DISTRIBUTE_BY_NONE.
		 */
-		CreateReferenceTableShard(relationId);
+		CreateReferenceTableShard(relationId, colocatedTableId);
	}

	if (ShouldSyncTableMetadata(relationId))
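For context on the new LockColocationId(colocationId, ShareLock) call: the colocation lock is a heavyweight lock keyed on the colocation id rather than on any single relation, so it can coordinate operations that span every table of a colocation group. A minimal sketch of such a lock pair follows; the real definitions live in resource_lock.c, which this diff does not touch, so the SET_LOCKTAG_REBALANCE_COLOCATION macro name and the exact flag values are assumptions here, not the verified implementation.

#include "postgres.h"

#include "storage/lock.h"
#include "distributed/resource_lock.h"	/* assumed home of the locktag macro */

/* Acquire a heavyweight lock on a colocation group in the given mode. */
void
LockColocationId(int colocationId, LOCKMODE lockMode)
{
	LOCKTAG tag;
	const bool sessionLock = false;	/* transaction-scoped, released on abort */
	const bool dontWait = false;	/* block until the lock is granted */

	SET_LOCKTAG_REBALANCE_COLOCATION(tag, (int64) colocationId);
	(void) LockAcquire(&tag, lockMode, sessionLock, dontWait);
}

/* Release one previously acquired mode; other held modes stay held. */
void
UnlockColocationId(int colocationId, LOCKMODE lockMode)
{
	LOCKTAG tag;
	const bool sessionLock = false;

	SET_LOCKTAG_REBALANCE_COLOCATION(tag, (int64) colocationId);
	LockRelease(&tag, lockMode, sessionLock);
}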
diff --git a/src/backend/distributed/commands/table.c b/src/backend/distributed/commands/table.c
index 1cf1e54a5..e0ffa37f5 100644
--- a/src/backend/distributed/commands/table.c
+++ b/src/backend/distributed/commands/table.c
@@ -153,12 +153,12 @@ PreprocessDropTableStmt(Node *node, const char *queryString,
			continue;
		}

-		if (IsCitusTableType(relationId, REFERENCE_TABLE))
-		{
-			/* prevent concurrent EnsureReferenceTablesExistOnAllNodes */
-			int colocationId = CreateReferenceTableColocationId();
-			LockColocationId(colocationId, ExclusiveLock);
-		}
+		/*
+		 * While changing the tables that are part of a colocation group we need to
+		 * prevent concurrent mutations to the placements of its shard groups.
+		 */
+		CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(relationId);
+		LockColocationId(cacheEntry->colocationId, ShareLock);

		/* invalidate foreign key cache if the table involved in any foreign key */
		if ((TableReferenced(relationId) || TableReferencing(relationId)))
diff --git a/src/backend/distributed/operations/create_shards.c b/src/backend/distributed/operations/create_shards.c
index b9841dabf..53c43cc46 100644
--- a/src/backend/distributed/operations/create_shards.c
+++ b/src/backend/distributed/operations/create_shards.c
@@ -326,7 +326,7 @@ CreateColocatedShards(Oid targetRelationId, Oid sourceRelationId, bool
 * Also, the shard is replicated to the all active nodes in the cluster.
 */
void
-CreateReferenceTableShard(Oid distributedTableId)
+CreateReferenceTableShard(Oid distributedTableId, Oid colocatedTableId)
{
	int workerStartIndex = 0;
	text *shardMinValue = NULL;
@@ -365,7 +365,7 @@ CreateReferenceTableShard(Oid distributedTableId)
	List *nodeList = ReferenceTablePlacementNodeList(ShareLock);
	nodeList = SortList(nodeList, CompareWorkerNodes);

-	int replicationFactor = ReferenceTableReplicationFactor();
+	int replicationFactor = list_length(nodeList);

	/* get the next shard id */
	uint64 shardId = GetNextShardId();
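The create_shards.c change is subtle but deliberate: the removed ReferenceTableReplicationFactor() helper re-read pg_dist_node under NoLock (see its removal further down), so its answer could drift from the ShareLock-protected node list the placements are actually created on if a node was added in between. Deriving the factor from the already-locked list removes that window. A hypothetical distillation of the resulting flow, with the placement insertion deliberately elided since it is not shown in this diff:

	/* the node list is obtained under ShareLock and sorted deterministically */
	List *nodeList = ReferenceTablePlacementNodeList(ShareLock);
	nodeList = SortList(nodeList, CompareWorkerNodes);

	/* one placement per node in this locked snapshot; no second catalog read */
	int replicationFactor = list_length(nodeList);

	for (int nodeIndex = 0; nodeIndex < replicationFactor; nodeIndex++)
	{
		WorkerNode *workerNode = (WorkerNode *) list_nth(nodeList, nodeIndex);

		/* insert a placement row for workerNode->groupId here (elided) */
	}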
diff --git a/src/backend/distributed/utils/reference_table_utils.c b/src/backend/distributed/utils/reference_table_utils.c
index 0ea02fe2c..dd51c1992 100644
--- a/src/backend/distributed/utils/reference_table_utils.c
+++ b/src/backend/distributed/utils/reference_table_utils.c
@@ -95,56 +95,83 @@ EnsureReferenceTablesExistOnAllNodes(void)
void
EnsureReferenceTablesExistOnAllNodesExtended(char transferMode)
{
-	/*
-	 * Prevent this function from running concurrently with itself.
-	 *
-	 * It also prevents concurrent DROP TABLE or DROP SCHEMA. We need this
-	 * because through-out this function we assume values in referenceTableIdList
-	 * are still valid.
-	 *
-	 * We don't need to handle other kinds of reference table DML/DDL here, since
-	 * master_copy_shard_placement gets enough locks for that.
-	 *
-	 * We also don't need special handling for concurrent create_refernece_table.
-	 * Since that will trigger a call to this function from another backend,
-	 * which will block until our call is finished.
-	 */
-	int colocationId = CreateReferenceTableColocationId();
-	LockColocationId(colocationId, ExclusiveLock);
+	List *referenceTableIdList = NIL;
+	uint64 shardId = INVALID_SHARD_ID;
+	List *newWorkersList = NIL;
+	const char *referenceTableName = NULL;
+	int colocationId = GetReferenceTableColocationId();

-	List *referenceTableIdList = CitusTableTypeIdList(REFERENCE_TABLE);
-	if (referenceTableIdList == NIL)
+	if (colocationId == INVALID_COLOCATION_ID)
	{
-		/* no reference tables exist */
-		UnlockColocationId(colocationId, ExclusiveLock);
+		/* no colocation for reference tables available */
		return;
	}

-	Oid referenceTableId = linitial_oid(referenceTableIdList);
-	const char *referenceTableName = get_rel_name(referenceTableId);
-	List *shardIntervalList = LoadShardIntervalList(referenceTableId);
-	if (list_length(shardIntervalList) == 0)
-	{
-		/* check for corrupt metadata */
-		ereport(ERROR, (errmsg("reference table \"%s\" does not have a shard",
-							   referenceTableName)));
-	}
-
-	ShardInterval *shardInterval = (ShardInterval *) linitial(shardIntervalList);
-	uint64 shardId = shardInterval->shardId;
-
	/*
-	 * We only take an access share lock, otherwise we'll hold up citus_add_node.
-	 * In case of create_reference_table() where we don't want concurrent writes
-	 * to pg_dist_node, we have already acquired ShareLock on pg_dist_node.
+	 * Most of the time this function should conclude that no reference tables need to
+	 * be copied. To prevent excessive locking in the common case, we run our
+	 * precondition checks with a weaker lock first. If, after checking with the weaker
+	 * lock, it turns out we might need to copy reference tables, we check again with a
+	 * more aggressive, self-conflicting lock. The second lock must be self-conflicting
+	 * so that two concurrent calls to this routine cannot both proceed past the
+	 * initial check. Since the weaker lock is kept while the stronger one is acquired,
+	 * every unlock path below has to release all lock modes acquired so far.
+	 *
+	 * If after the two iterations of precondition checks we still find the need to
+	 * copy reference tables, we exit the for loop with the locks still held. This
+	 * prevents concurrent DROP TABLE and create_reference_table calls, so that the
+	 * list of reference tables we operate on is stable.
+	 *
+	 * Since the changes to the reference table placements are made via loopback
+	 * connections, we explicitly release all held locks at the end of this function.
	 */
-	List *newWorkersList = WorkersWithoutReferenceTablePlacement(shardId,
-																 AccessShareLock);
-	if (list_length(newWorkersList) == 0)
+	LOCKMODE lockmodes[] = { AccessShareLock, ExclusiveLock };
+	for (int l = 0; l < lengthof(lockmodes); l++)
	{
-		/* nothing to do, no need for lock */
-		UnlockColocationId(colocationId, ExclusiveLock);
-		return;
+		LockColocationId(colocationId, lockmodes[l]);
+
+		referenceTableIdList = CitusTableTypeIdList(REFERENCE_TABLE);
+		if (referenceTableIdList == NIL)
+		{
+			/* no reference tables exist */
+			for (int ll = l; ll >= 0; ll--)
+			{
+				UnlockColocationId(colocationId, lockmodes[ll]);
+			}
+			return;
+		}
+
+		Oid referenceTableId = linitial_oid(referenceTableIdList);
+		referenceTableName = get_rel_name(referenceTableId);
+		List *shardIntervalList = LoadShardIntervalList(referenceTableId);
+		if (list_length(shardIntervalList) == 0)
+		{
+			/* check for corrupt metadata */
+			ereport(ERROR, (errmsg("reference table \"%s\" does not have a shard",
+								   referenceTableName)));
+		}
+
+		ShardInterval *shardInterval = (ShardInterval *) linitial(shardIntervalList);
+		shardId = shardInterval->shardId;
+
+		/*
+		 * We only take an access share lock, otherwise we'll hold up citus_add_node.
+		 * In case of create_reference_table() where we don't want concurrent writes
+		 * to pg_dist_node, we have already acquired ShareLock on pg_dist_node.
+		 */
+		newWorkersList = WorkersWithoutReferenceTablePlacement(shardId, AccessShareLock);
+		if (list_length(newWorkersList) == 0)
+		{
+			/*
+			 * Nothing more to do, so the locks are not needed; we do however have
+			 * to release every lock mode acquired so far before returning.
+			 */
+			for (int ll = l; ll >= 0; ll--)
+			{
+				UnlockColocationId(colocationId, lockmodes[ll]);
+			}
+			return;
+		}
	}

	/*
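The loop above is worth distilling because of a PostgreSQL-specific detail: the lock manager tracks each (locktag, lock mode) pair independently, and LockRelease releases only the named mode. After the loop escalates, the backend therefore holds two locks on the same colocation id and must release both, which is what the inner `for (ll = l; ll >= 0; ll--)` loops do. The escalation is also deadlock-free only because AccessShareLock does not conflict with ExclusiveLock, so a backend holding the weak mode never blocks a concurrent backend's escalation. A minimal sketch of the pattern, with hypothetical WorkStillNeeded() and DoTheWork() helpers standing in for the precondition checks and the copy:

static void
EnsureWorkDoneWithLockEscalation(int colocationId)
{
	LOCKMODE lockmodes[] = { AccessShareLock, ExclusiveLock };

	for (int l = 0; l < lengthof(lockmodes); l++)
	{
		LockColocationId(colocationId, lockmodes[l]);

		/* re-check under every mode; the weak-mode answer may be stale */
		if (!WorkStillNeeded(colocationId))
		{
			/* release every mode acquired so far, strongest first */
			for (int ll = l; ll >= 0; ll--)
			{
				UnlockColocationId(colocationId, lockmodes[ll]);
			}
			return;
		}
	}

	/* reached only with both AccessShareLock and ExclusiveLock held */
	DoTheWork(colocationId);

	for (int ll = lengthof(lockmodes) - 1; ll >= 0; ll--)
	{
		UnlockColocationId(colocationId, lockmodes[ll]);
	}
}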
+ */ + for (int ll = l; ll >= 0; ll--) + { + UnlockColocationId(colocationId, lockmodes[ll]); + } + return; + } } /* @@ -233,11 +260,11 @@ EnsureReferenceTablesExistOnAllNodesExtended(char transferMode) CloseConnection(connection); } - /* - * Unblock other backends, they will probably observe that there are no - * more worker nodes without placements, unless nodes were added concurrently - */ - UnlockColocationId(colocationId, ExclusiveLock); + /* release all the locks we acquired for the above operations */ + for (int ll = lengthof(lockmodes) - 1; ll >= 0; ll--) + { + UnlockColocationId(colocationId, lockmodes[ll]); + } } @@ -424,6 +451,28 @@ CreateReferenceTableColocationId() } +uint32 +GetReferenceTableColocationId() +{ + int shardCount = 1; + Oid distributionColumnType = InvalidOid; + Oid distributionColumnCollation = InvalidOid; + + /* + * We don't maintain replication factor of reference tables anymore and + * just use -1 instead. We don't use this value in any places. + */ + int replicationFactor = -1; + + /* check for existing colocations */ + uint32 colocationId = + ColocationId(shardCount, replicationFactor, distributionColumnType, + distributionColumnCollation); + + return colocationId; +} + + /* * DeleteAllReplicatedTablePlacementsFromNodeGroup function iterates over * list of reference and replicated hash distributed tables and deletes @@ -528,19 +577,6 @@ CompareOids(const void *leftElement, const void *rightElement) } -/* - * ReferenceTableReplicationFactor returns the replication factor for - * reference tables. - */ -int -ReferenceTableReplicationFactor(void) -{ - List *nodeList = ReferenceTablePlacementNodeList(NoLock); - int replicationFactor = list_length(nodeList); - return replicationFactor; -} - - /* * ReplicateAllReferenceTablesToNode function finds all reference tables and * replicates them to the given worker node. 
diff --git a/src/include/distributed/coordinator_protocol.h b/src/include/distributed/coordinator_protocol.h
index 501f5c233..d41d16a69 100644
--- a/src/include/distributed/coordinator_protocol.h
+++ b/src/include/distributed/coordinator_protocol.h
@@ -249,7 +249,7 @@ extern void CreateShardsWithRoundRobinPolicy(Oid distributedTableId, int32 shard
											 bool useExclusiveConnections);
 extern void CreateColocatedShards(Oid targetRelationId, Oid sourceRelationId,
								  bool useExclusiveConnections);
-extern void CreateReferenceTableShard(Oid distributedTableId);
+extern void CreateReferenceTableShard(Oid distributedTableId, Oid colocatedTableId);
 extern List * WorkerCreateShardCommandList(Oid relationId, int shardIndex,
										   uint64 shardId, List *ddlCommandList,
										   List *foreignConstraintCommandList);
diff --git a/src/include/distributed/reference_table_utils.h b/src/include/distributed/reference_table_utils.h
index e8969eec0..a26f16630 100644
--- a/src/include/distributed/reference_table_utils.h
+++ b/src/include/distributed/reference_table_utils.h
@@ -21,10 +21,10 @@
 extern void EnsureReferenceTablesExistOnAllNodes(void);
 extern void EnsureReferenceTablesExistOnAllNodesExtended(char transferMode);
 extern uint32 CreateReferenceTableColocationId(void);
+extern uint32 GetReferenceTableColocationId(void);
 extern void DeleteAllReplicatedTablePlacementsFromNodeGroup(int32 groupId,
															bool localOnly);
 extern int CompareOids(const void *leftElement, const void *rightElement);
-extern int ReferenceTableReplicationFactor(void);
 extern void ReplicateAllReferenceTablesToNode(WorkerNode *workerNode);

 #endif /* REFERENCE_TABLE_UTILS_H_ */
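Taken together, the patch relies on a specific interplay of PostgreSQL heavyweight lock modes on the colocation id: create_distributed_table and DROP TABLE take ShareLock, ReplicateAllReferenceTablesToNode takes RowExclusiveLock, and EnsureReferenceTablesExistOnAllNodesExtended escalates from AccessShareLock to ExclusiveLock. The standalone program below encodes the relevant rows of PostgreSQL's lock conflict table and asserts those properties; it is an illustration of the mode semantics, not Citus code.

#include <assert.h>
#include <stdio.h>

/* Relevant subset of PostgreSQL's heavyweight lock modes. */
typedef enum
{
	AccessShare,
	RowExclusive,
	Share,
	Exclusive,
	NumModes
} LockModeSubset;

/* Mirrors the corresponding rows of PostgreSQL's lock conflict table. */
static const int conflicts[NumModes][NumModes] = {
	/*                  AccessShare  RowExclusive  Share  Exclusive */
	/* AccessShare  */ { 0,           0,            0,     0 },
	/* RowExclusive */ { 0,           0,            1,     1 },
	/* Share        */ { 0,           1,            0,     1 },
	/* Exclusive    */ { 0,           1,            1,     1 },
};

int
main(void)
{
	/* two create_distributed_table calls on one colocation group may overlap */
	assert(!conflicts[Share][Share]);

	/* DROP TABLE (ShareLock) and replication (RowExclusiveLock) serialize */
	assert(conflicts[Share][RowExclusive]);

	/* the copy routine's second pass (ExclusiveLock) is self-conflicting,
	 * so two concurrent copies cannot both pass the final check */
	assert(conflicts[Exclusive][Exclusive]);

	/* while its first pass (AccessShareLock) blocks nobody, so holding it
	 * during escalation cannot deadlock two concurrent callers */
	assert(!conflicts[AccessShare][Exclusive]);

	printf("lock mode interplay matches the patch's expectations\n");
	return 0;
}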