diff --git a/src/backend/distributed/commands/create_distributed_table.c b/src/backend/distributed/commands/create_distributed_table.c
index f7cff4197..793b0086a 100644
--- a/src/backend/distributed/commands/create_distributed_table.c
+++ b/src/backend/distributed/commands/create_distributed_table.c
@@ -59,6 +59,7 @@
 #include "distributed/reference_table_utils.h"
 #include "distributed/relation_access_tracking.h"
 #include "distributed/remote_commands.h"
+#include "distributed/resource_lock.h"
 #include "distributed/shared_library_init.h"
 #include "distributed/shard_rebalancer.h"
 #include "distributed/worker_protocol.h"
@@ -471,9 +472,22 @@ CreateDistributedTable(Oid relationId, char *distributionColumnName,
 	/*
 	 * Make sure that existing reference tables have been replicated to all the nodes
	 * such that we can create foreign keys and joins work immediately after creation.
+	 *
+	 * This will take a lock on the nodes to make sure no nodes are added after we have
+	 * verified and ensured the reference tables are copied everywhere.
+	 * Although copying reference tables here is only necessary when creating a new colocation
+	 * group, avoiding it in the other cases requires refactoring we do not want to perform now.
 	 */
 	EnsureReferenceTablesExistOnAllNodes();
 
+	/*
+	 * While adding tables to a colocation group we need to make sure no concurrent
+	 * mutations happen on the colocation group with regard to its placements. It is
+	 * important that we have already copied any reference tables before acquiring this
+	 * lock as these are competing operations.
+	 */
+	LockColocationId(colocationId, ShareLock);
+
 	/* we need to calculate these variables before creating distributed metadata */
 	bool localTableEmpty = TableEmpty(relationId);
 	Oid colocatedTableId = ColocatedTableId(colocationId);
diff --git a/src/backend/distributed/commands/table.c b/src/backend/distributed/commands/table.c
index 1cf1e54a5..f1d27aa22 100644
--- a/src/backend/distributed/commands/table.c
+++ b/src/backend/distributed/commands/table.c
@@ -153,11 +153,14 @@ PreprocessDropTableStmt(Node *node, const char *queryString,
 			continue;
 		}
 
-		if (IsCitusTableType(relationId, REFERENCE_TABLE))
+		/*
+		 * While changing the tables that are part of a colocation group we need to
+		 * prevent concurrent mutations to the placements of the shard groups.
+		 */
+		CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(relationId);
+		if (cacheEntry->colocationId != INVALID_COLOCATION_ID)
 		{
-			/* prevent concurrent EnsureReferenceTablesExistOnAllNodes */
-			int colocationId = CreateReferenceTableColocationId();
-			LockColocationId(colocationId, ExclusiveLock);
+			LockColocationId(cacheEntry->colocationId, ShareLock);
 		}
 
 		/* invalidate foreign key cache if the table involved in any foreign key */
diff --git a/src/backend/distributed/operations/create_shards.c b/src/backend/distributed/operations/create_shards.c
index b9841dabf..cc75c8171 100644
--- a/src/backend/distributed/operations/create_shards.c
+++ b/src/backend/distributed/operations/create_shards.c
@@ -365,7 +365,7 @@ CreateReferenceTableShard(Oid distributedTableId)
 	List *nodeList = ReferenceTablePlacementNodeList(ShareLock);
 	nodeList = SortList(nodeList, CompareWorkerNodes);
 
-	int replicationFactor = ReferenceTableReplicationFactor();
+	int replicationFactor = list_length(nodeList);
 
 	/* get the next shard id */
 	uint64 shardId = GetNextShardId();
diff --git a/src/backend/distributed/utils/reference_table_utils.c b/src/backend/distributed/utils/reference_table_utils.c
index 0ea02fe2c..43b89f7a7 100644
--- a/src/backend/distributed/utils/reference_table_utils.c
+++ b/src/backend/distributed/utils/reference_table_utils.c
@@ -64,8 +64,17 @@ replicate_reference_tables(PG_FUNCTION_ARGS)
 {
 	Oid shardReplicationModeOid = PG_GETARG_OID(0);
 	char shardReplicationMode = LookupShardTransferMode(shardReplicationModeOid);
+
+	/* to prevent concurrent node additions while copying reference tables */
+	LockRelationOid(DistNodeRelationId(), ShareLock);
 	EnsureReferenceTablesExistOnAllNodesExtended(shardReplicationMode);
 
+	/*
+	 * Given that the copying of reference tables and the metadata updates have been done
+	 * via a loopback connection, we do not have to retain the lock on pg_dist_node anymore.
+	 */
+	UnlockRelationOid(DistNodeRelationId(), ShareLock);
+
 	PG_RETURN_VOID();
 }
 
@@ -95,56 +104,85 @@ EnsureReferenceTablesExistOnAllNodes(void)
 void
 EnsureReferenceTablesExistOnAllNodesExtended(char transferMode)
 {
-	/*
-	 * Prevent this function from running concurrently with itself.
-	 *
-	 * It also prevents concurrent DROP TABLE or DROP SCHEMA. We need this
-	 * because through-out this function we assume values in referenceTableIdList
-	 * are still valid.
-	 *
-	 * We don't need to handle other kinds of reference table DML/DDL here, since
-	 * master_copy_shard_placement gets enough locks for that.
-	 *
-	 * We also don't need special handling for concurrent create_refernece_table.
-	 * Since that will trigger a call to this function from another backend,
-	 * which will block until our call is finished.
-	 */
+	List *referenceTableIdList = NIL;
+	uint64 shardId = INVALID_SHARD_ID;
+	List *newWorkersList = NIL;
+	const char *referenceTableName = NULL;
 	int colocationId = CreateReferenceTableColocationId();
-	LockColocationId(colocationId, ExclusiveLock);
-
-	List *referenceTableIdList = CitusTableTypeIdList(REFERENCE_TABLE);
-	if (referenceTableIdList == NIL)
-	{
-		/* no reference tables exist */
-		UnlockColocationId(colocationId, ExclusiveLock);
-		return;
-	}
-
-	Oid referenceTableId = linitial_oid(referenceTableIdList);
-	const char *referenceTableName = get_rel_name(referenceTableId);
-	List *shardIntervalList = LoadShardIntervalList(referenceTableId);
-	if (list_length(shardIntervalList) == 0)
-	{
-		/* check for corrupt metadata */
-		ereport(ERROR, (errmsg("reference table \"%s\" does not have a shard",
-							   referenceTableName)));
-	}
-
-	ShardInterval *shardInterval = (ShardInterval *) linitial(shardIntervalList);
-	uint64 shardId = shardInterval->shardId;
 
 	/*
-	 * We only take an access share lock, otherwise we'll hold up citus_add_node.
-	 * In case of create_reference_table() where we don't want concurrent writes
-	 * to pg_dist_node, we have already acquired ShareLock on pg_dist_node.
+	 * Most of the time this function should conclude that there are no reference
+	 * tables to copy. To prevent excessive locking in that common case we run our
+	 * precondition checks with a lower lock first. If, after checking with the lower
+	 * lock, it turns out we might need to copy reference tables, we check again with a
+	 * more aggressive, self-conflicting lock. It is important that the second lock is
+	 * self-conflicting, to make sure that two concurrent calls to this routine cannot
+	 * run concurrently after the initial check.
+	 *
+	 * If after two iterations of precondition checks we still find the need to copy
+	 * reference tables, we exit the loop with all locks held. This prevents concurrent
+	 * DROP TABLE and create_reference_table calls, so that the list of reference
+	 * tables we operate on is stable.
+	 *
+	 * Since the changes to the reference table placements are made via loopback
+	 * connections we release the locks held at the end of this function. Because Citus
+	 * only runs transactions in READ COMMITTED mode we can be sure that other
+	 * transactions correctly find the metadata entries.
 	 */
-	List *newWorkersList = WorkersWithoutReferenceTablePlacement(shardId,
-																 AccessShareLock);
-	if (list_length(newWorkersList) == 0)
+	LOCKMODE lockmodes[] = { AccessShareLock, ExclusiveLock };
+	for (int lockmodeIndex = 0; lockmodeIndex < lengthof(lockmodes); lockmodeIndex++)
 	{
-		/* nothing to do, no need for lock */
-		UnlockColocationId(colocationId, ExclusiveLock);
-		return;
+		LockColocationId(colocationId, lockmodes[lockmodeIndex]);
+
+		referenceTableIdList = CitusTableTypeIdList(REFERENCE_TABLE);
+		if (referenceTableIdList == NIL)
+		{
+			/*
+			 * No reference tables exist; make sure that any locks obtained earlier are
+			 * released. It will probably not matter, but we release the locks in the
+			 * reverse order we obtained them in.
+			 */
+			for (int releaseLockmodeIndex = lockmodeIndex; releaseLockmodeIndex >= 0;
+				 releaseLockmodeIndex--)
+			{
+				UnlockColocationId(colocationId, lockmodes[releaseLockmodeIndex]);
+			}
+			return;
+		}
+
+		Oid referenceTableId = linitial_oid(referenceTableIdList);
+		referenceTableName = get_rel_name(referenceTableId);
+		List *shardIntervalList = LoadShardIntervalList(referenceTableId);
+		if (list_length(shardIntervalList) == 0)
+		{
+			/* check for corrupt metadata */
+			ereport(ERROR, (errmsg("reference table \"%s\" does not have a shard",
+								   referenceTableName)));
+		}
+
+		ShardInterval *shardInterval = (ShardInterval *) linitial(shardIntervalList);
+		shardId = shardInterval->shardId;
+
+		/*
+		 * We only take an access share lock, otherwise we'll hold up citus_add_node.
+		 * In case of create_reference_table() where we don't want concurrent writes
+		 * to pg_dist_node, we have already acquired ShareLock on pg_dist_node.
+		 */
+		newWorkersList = WorkersWithoutReferenceTablePlacement(shardId, AccessShareLock);
+		if (list_length(newWorkersList) == 0)
+		{
+			/*
+			 * All workers already have a copy of the reference tables; make sure that
+			 * any locks obtained earlier are released. It will probably not matter, but
+			 * we release the locks in the reverse order we obtained them in.
+			 */
+			for (int releaseLockmodeIndex = lockmodeIndex; releaseLockmodeIndex >= 0;
+				 releaseLockmodeIndex--)
+			{
+				UnlockColocationId(colocationId, lockmodes[releaseLockmodeIndex]);
+			}
+			return;
+		}
 	}
 
 	/*
@@ -234,10 +272,17 @@ EnsureReferenceTablesExistOnAllNodesExtended(char transferMode)
 	}
 
 	/*
-	 * Unblock other backends, they will probably observe that there are no
-	 * more worker nodes without placements, unless nodes were added concurrently
+	 * Since the reference tables have been copied via a loopback connection we do not
+	 * have to retain our locks. Because Citus only runs transactions in READ COMMITTED
+	 * mode we can be sure that other transactions will find the reference tables copied.
+	 * We have obtained and held multiple locks; here we unlock them all in the reverse
+	 * order we obtained them in.
 	 */
-	UnlockColocationId(colocationId, ExclusiveLock);
+	for (int releaseLockmodeIndex = lengthof(lockmodes) - 1; releaseLockmodeIndex >= 0;
+		 releaseLockmodeIndex--)
+	{
+		UnlockColocationId(colocationId, lockmodes[releaseLockmodeIndex]);
+	}
 }
 
 
@@ -424,6 +469,28 @@ CreateReferenceTableColocationId()
 }
 
 
+uint32
+GetReferenceTableColocationId()
+{
+	int shardCount = 1;
+	Oid distributionColumnType = InvalidOid;
+	Oid distributionColumnCollation = InvalidOid;
+
+	/*
+	 * We don't maintain the replication factor of reference tables anymore and
+	 * just use -1 instead. We don't use this value anywhere.
+	 */
+	int replicationFactor = -1;
+
+	/* check for existing colocations */
+	uint32 colocationId =
+		ColocationId(shardCount, replicationFactor, distributionColumnType,
+					 distributionColumnCollation);
+
+	return colocationId;
+}
+
+
 /*
  * DeleteAllReplicatedTablePlacementsFromNodeGroup function iterates over
  * list of reference and replicated hash distributed tables and deletes
@@ -528,19 +595,6 @@ CompareOids(const void *leftElement, const void *rightElement)
 }
 
 
-/*
- * ReferenceTableReplicationFactor returns the replication factor for
- * reference tables.
- */
-int
-ReferenceTableReplicationFactor(void)
-{
-	List *nodeList = ReferenceTablePlacementNodeList(NoLock);
-	int replicationFactor = list_length(nodeList);
-	return replicationFactor;
-}
-
-
 /*
  * ReplicateAllReferenceTablesToNode function finds all reference tables and
  * replicates them to the given worker node. It also modifies pg_dist_colocation
@@ -551,6 +605,16 @@ ReferenceTableReplicationFactor(void)
 void
 ReplicateAllReferenceTablesToNode(WorkerNode *workerNode)
 {
+	int colocationId = GetReferenceTableColocationId();
+	if (colocationId == INVALID_COLOCATION_ID)
+	{
+		/* no reference tables in system */
+		return;
+	}
+
+	/* prevent changes in table set while replicating reference tables */
+	LockColocationId(colocationId, RowExclusiveLock);
+
 	List *referenceTableList = CitusTableTypeIdList(REFERENCE_TABLE);
 
 	/* if there is no reference table, we do not need to replicate anything */
diff --git a/src/include/distributed/reference_table_utils.h b/src/include/distributed/reference_table_utils.h
index e8969eec0..a26f16630 100644
--- a/src/include/distributed/reference_table_utils.h
+++ b/src/include/distributed/reference_table_utils.h
@@ -21,10 +21,10 @@
 extern void EnsureReferenceTablesExistOnAllNodes(void);
 extern void EnsureReferenceTablesExistOnAllNodesExtended(char transferMode);
 extern uint32 CreateReferenceTableColocationId(void);
+extern uint32 GetReferenceTableColocationId(void);
 extern void DeleteAllReplicatedTablePlacementsFromNodeGroup(int32 groupId,
 															bool localOnly);
 extern int CompareOids(const void *leftElement, const void *rightElement);
-extern int ReferenceTableReplicationFactor(void);
 extern void ReplicateAllReferenceTablesToNode(WorkerNode *workerNode);
 
 #endif /* REFERENCE_TABLE_UTILS_H_ */
diff --git a/src/test/regress/expected/isolation_reference_table.out b/src/test/regress/expected/isolation_reference_table.out
new file mode 100644
index 000000000..cbce57630
--- /dev/null
+++ b/src/test/regress/expected/isolation_reference_table.out
@@ -0,0 +1,137 @@
+Parsed test spec with 2 sessions
+
+starting permutation: s1-begin s1-create s2-create s1-commit
+create_reference_table
+---------------------------------------------------------------------
+
+(1 row)
+
+step s1-begin:
+    BEGIN;
+
+step s1-create:
+    CREATE TABLE reference_table_s1(a int);
+    SELECT create_reference_table('reference_table_s1');
+
+create_reference_table
+---------------------------------------------------------------------
+
+(1 row)
+
+step s2-create:
+    CREATE TABLE reference_table_s2(a int);
+    SELECT create_reference_table('reference_table_s2');
+
+create_reference_table
+---------------------------------------------------------------------
+
+(1 row)
+
+step s1-commit:
+    COMMIT;
+
+
+starting permutation: s1-create s2-create s1-begin s1-drop s2-drop s1-commit
+create_reference_table
+---------------------------------------------------------------------
+
+(1 row)
+
+step s1-create:
+    CREATE TABLE reference_table_s1(a int);
+    SELECT create_reference_table('reference_table_s1');
+
+create_reference_table
+---------------------------------------------------------------------
+
+(1 row)
+
+step s2-create:
+    CREATE TABLE reference_table_s2(a int);
+    SELECT create_reference_table('reference_table_s2');
+
+create_reference_table
+---------------------------------------------------------------------
+
+(1 row)
+
+step s1-begin:
+    BEGIN;
+
+step s1-drop:
+    DROP TABLE reference_table_s1;
+
+step s2-drop:
+    DROP TABLE reference_table_s2;
+
+step s1-commit:
+    COMMIT;
+
+
+starting permutation: s1-create s2-begin s2-create s1-drop s2-commit
+create_reference_table
+---------------------------------------------------------------------
+
+(1 row)
+
+step s1-create:
+    CREATE TABLE reference_table_s1(a int);
+    SELECT create_reference_table('reference_table_s1');
+
+create_reference_table
+---------------------------------------------------------------------
+
+(1 row)
+
+step s2-begin:
+    BEGIN;
+
+step s2-create:
+    CREATE TABLE reference_table_s2(a int);
+    SELECT create_reference_table('reference_table_s2');
+
+create_reference_table
+---------------------------------------------------------------------
+
+(1 row)
+
+step s1-drop:
+    DROP TABLE reference_table_s1;
+
+step s2-commit:
+    COMMIT;
+
+
+starting permutation: s2-create s2-begin s2-drop s1-create s2-commit
+create_reference_table
+---------------------------------------------------------------------
+
+(1 row)
+
+step s2-create:
+    CREATE TABLE reference_table_s2(a int);
+    SELECT create_reference_table('reference_table_s2');
+
+create_reference_table
+---------------------------------------------------------------------
+
+(1 row)
+
+step s2-begin:
+    BEGIN;
+
+step s2-drop:
+    DROP TABLE reference_table_s2;
+
+step s1-create:
+    CREATE TABLE reference_table_s1(a int);
+    SELECT create_reference_table('reference_table_s1');
+
+create_reference_table
+---------------------------------------------------------------------
+
+(1 row)
+
+step s2-commit:
+    COMMIT;
+
diff --git a/src/test/regress/isolation_schedule b/src/test/regress/isolation_schedule
index b9a0f8e88..94444c7ce 100644
--- a/src/test/regress/isolation_schedule
+++ b/src/test/regress/isolation_schedule
@@ -69,6 +69,7 @@ test: isolation_undistribute_table
 test: isolation_fix_partition_shard_index_names
 test: isolation_global_pid
 test: isolation_citus_locks
+test: isolation_reference_table
 
 # Rebalancer
 test: isolation_blocking_move_single_shard_commands
diff --git a/src/test/regress/spec/isolation_reference_table.spec b/src/test/regress/spec/isolation_reference_table.spec
new file mode 100644
index 000000000..854d54842
--- /dev/null
+++ b/src/test/regress/spec/isolation_reference_table.spec
@@ -0,0 +1,71 @@
+// reference tables _do_ lock on the first reference table in the shardgroup due to the lack of shardgroups in the
+// system. When we run the tests we want to make sure the tables we are testing against cannot be the first reference
+// table. For this purpose we create a reference table that we will _not_ interact with during the tests.
+setup
+{
+    CREATE TABLE first_reference_table(a int);
+    SELECT create_reference_table('first_reference_table');
+}
+
+teardown
+{
+    DROP TABLE first_reference_table;
+    DROP TABLE IF EXISTS reference_table_s1;
+    DROP TABLE IF EXISTS reference_table_s2;
+}
+
+session "s1"
+
+step "s1-begin"
+{
+    BEGIN;
+}
+
+step "s1-create"
+{
+    CREATE TABLE reference_table_s1(a int);
+    SELECT create_reference_table('reference_table_s1');
+}
+
+step "s1-drop"
+{
+    DROP TABLE reference_table_s1;
+}
+
+step "s1-commit"
+{
+    COMMIT;
+}
+
+session "s2"
+
+step "s2-begin"
+{
+    BEGIN;
+}
+
+step "s2-create"
+{
+    CREATE TABLE reference_table_s2(a int);
+    SELECT create_reference_table('reference_table_s2');
+}
+
+step "s2-drop"
+{
+    DROP TABLE reference_table_s2;
+}
+
+step "s2-commit"
+{
+    COMMIT;
+}
+
+// creates don't block each other
+permutation "s1-begin" "s1-create" "s2-create" "s1-commit"
+
+// drops don't block each other
+permutation "s1-create" "s2-create" "s1-begin" "s1-drop" "s2-drop" "s1-commit"
+
+// create and drop don't block each other
+permutation "s1-create" "s2-begin" "s2-create" "s1-drop" "s2-commit"
+permutation "s2-create" "s2-begin" "s2-drop" "s1-create" "s2-commit"
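
The lock-escalation pattern that EnsureReferenceTablesExistOnAllNodesExtended now uses can be hard to follow inside the hunks above, so here is a minimal standalone sketch of the same idea. It is not Citus code: acquire_lock, release_lock, and work_is_needed are hypothetical stand-ins for LockColocationId, UnlockColocationId, and the WorkersWithoutReferenceTablePlacement check, and the lock levels are plain enum values rather than PostgreSQL lock modes.

#include <stdbool.h>
#include <stdio.h>

/* Placeholder lock levels standing in for AccessShareLock / ExclusiveLock. */
typedef enum { WEAK_LOCK, STRONG_LOCK } LockMode;

/* Hypothetical helpers; the patch uses LockColocationId()/UnlockColocationId(). */
static void acquire_lock(LockMode mode) { printf("lock %d\n", (int) mode); }
static void release_lock(LockMode mode) { printf("unlock %d\n", (int) mode); }

/* Stand-in for the precondition check ("are any workers missing placements?"). */
static bool work_is_needed(void) { return true; }

/*
 * Check the precondition first under a weak lock, and only escalate to the
 * self-conflicting strong lock when work might be needed. On early exit,
 * release every lock obtained so far in reverse acquisition order.
 */
static void ensure_with_escalation(void)
{
	LockMode lockmodes[] = { WEAK_LOCK, STRONG_LOCK };
	int lockCount = (int) (sizeof(lockmodes) / sizeof(lockmodes[0]));

	for (int i = 0; i < lockCount; i++)
	{
		acquire_lock(lockmodes[i]);

		if (!work_is_needed())
		{
			/* nothing to do: release locks in reverse order and bail out */
			for (int j = i; j >= 0; j--)
			{
				release_lock(lockmodes[j]);
			}
			return;
		}
	}

	/* both locks are held here: do the actual work, then release in reverse order */
	printf("copying reference tables\n");
	for (int j = lockCount - 1; j >= 0; j--)
	{
		release_lock(lockmodes[j]);
	}
}

int main(void)
{
	ensure_with_escalation();
	return 0;
}

Checking twice means the common no-op case only ever takes the weak lock, while the self-conflicting strong lock serializes the callers that actually need to copy placements.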