From 466ce05f6c27037ab5d60023e575c17bf28d32a1 Mon Sep 17 00:00:00 2001
From: Onder Kalaci
Date: Fri, 15 Jul 2022 14:36:14 +0200
Subject: [PATCH] Prevent concurrent colocated table creation vs shard moves

Before this commit, there were no restrictions on creating a colocated
distributed table while a shard move on the same colocation group was
running. With this commit, we acquire ShareLock on the colocated relation
so that concurrent shard moves -- which acquire ShareUpdateExclusiveLock
on all colocated tables -- block until the creation commits, while other
concurrent create_distributed_table calls against the same colocation
group remain unblocked.
---
 .../commands/create_distributed_table.c          |  33 ++++-
 .../distributed/operations/repair_shards.c       |   2 +-
 ...n_create_distributed_table_concurrency.out    | 124 ++++++++++++++++++
 src/test/regress/expected/multi_create_table.out |   2 +-
 src/test/regress/isolation_schedule              |   1 +
 ..._create_distributed_table_concurrency.spec    |  71 ++++++++++
 src/test/regress/sql/multi_create_table.sql      |   2 +-
 7 files changed, 229 insertions(+), 6 deletions(-)
 create mode 100644 src/test/regress/expected/isolation_create_distributed_table_concurrency.out
 create mode 100644 src/test/regress/spec/isolation_create_distributed_table_concurrency.spec

diff --git a/src/backend/distributed/commands/create_distributed_table.c b/src/backend/distributed/commands/create_distributed_table.c
index 26a905f23..890491b14 100644
--- a/src/backend/distributed/commands/create_distributed_table.c
+++ b/src/backend/distributed/commands/create_distributed_table.c
@@ -193,13 +193,20 @@ create_distributed_table(PG_FUNCTION_ARGS)
 	Oid distributionMethodOid = PG_GETARG_OID(2);
 	text *colocateWithTableNameText = PG_GETARG_TEXT_P(3);
 	char *colocateWithTableName = text_to_cstring(colocateWithTableNameText);
+	Oid colocatedRelationId = InvalidOid;
+
+	/* resolve the colocated table name, if the user specified one */
+	if (pg_strncasecmp(colocateWithTableName, "default", NAMEDATALEN) != 0 &&
+		!IsColocateWithNone(colocateWithTableName))
+	{
+		colocatedRelationId = ResolveRelationId(colocateWithTableNameText, false);
+	}
 
 	bool shardCountIsStrict = false;
 	int shardCount = ShardCount;
 	if (!PG_ARGISNULL(4))
 	{
-		if (pg_strncasecmp(colocateWithTableName, "default", NAMEDATALEN) != 0 &&
-			pg_strncasecmp(colocateWithTableName, "none", NAMEDATALEN) != 0)
+		if (OidIsValid(colocatedRelationId))
 		{
 			ereport(ERROR, (errmsg("Cannot use colocate_with with a table "
 								   "and shard_count at the same time")));
@@ -230,9 +237,29 @@ create_distributed_table(PG_FUNCTION_ARGS)
 		ereport(ERROR, (errmsg("could not create distributed table: "
 							   "relation does not exist")));
 	}
-
 	relation_close(relation, NoLock);
 
+	/*
+	 * If the user specified a colocated relation, we should acquire a lock
+	 * such that no shard moves happen concurrently. However, we should
+	 * allow creating concurrent distributed tables that are colocated
+	 * with the same table. citus_move_shard_placement acquires
+	 * ShareUpdateExclusiveLock on all colocated relations, hence we use
+	 * ShareLock here.
+	 */
+	if (OidIsValid(colocatedRelationId))
+	{
+		Relation colocatedRelation =
+			try_relation_open(colocatedRelationId, ShareLock);
+		if (colocatedRelation == NULL)
+		{
+			ereport(ERROR, (errmsg("colocated relation %s does not exist",
+								   colocateWithTableName)));
+		}
+
+		relation_close(colocatedRelation, NoLock);
+	}
+
 	char *distributionColumnName = text_to_cstring(distributionColumnText);
 	Assert(distributionColumnName != NULL);
 
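Why ShareLock is the right strength here: in PostgreSQL's lock conflict
table, SHARE is compatible with itself but conflicts with SHARE UPDATE
EXCLUSIVE. The psql sketch below mirrors that interaction with explicit
LOCK TABLE statements (illustrative only; the table name is hypothetical,
and create_distributed_table / citus_move_shard_placement take these
locks internally):

    -- Session A: stands in for create_distributed_table(..., colocate_with := 'some_table')
    BEGIN;
    LOCK TABLE some_table IN SHARE MODE;                   -- granted

    -- Session B: a second colocated create is not blocked,
    -- because SHARE does not conflict with SHARE
    BEGIN;
    LOCK TABLE some_table IN SHARE MODE;                   -- granted immediately

    -- Session C: stands in for a shard move, which takes SHARE UPDATE
    -- EXCLUSIVE on all colocated tables; that mode conflicts with SHARE
    BEGIN;
    LOCK TABLE some_table IN SHARE UPDATE EXCLUSIVE MODE;  -- waits for A and B
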
diff --git a/src/backend/distributed/operations/repair_shards.c b/src/backend/distributed/operations/repair_shards.c
index f29f0a75a..03ee3c7d0 100644
--- a/src/backend/distributed/operations/repair_shards.c
+++ b/src/backend/distributed/operations/repair_shards.c
@@ -322,7 +322,6 @@ citus_move_shard_placement(PG_FUNCTION_ARGS)
 	Oid distributedTableId = shardInterval->relationId;
 
 	List *colocatedTableList = ColocatedTableList(distributedTableId);
-	List *colocatedShardList = ColocatedShardIntervalList(shardInterval);
 
 	foreach(colocatedTableCell, colocatedTableList)
 	{
@@ -351,6 +350,7 @@ citus_move_shard_placement(PG_FUNCTION_ARGS)
 	}
 
 	/* we sort colocatedShardList so that lock operations will not cause any deadlocks */
+	List *colocatedShardList = ColocatedShardIntervalList(shardInterval);
 	colocatedShardList = SortList(colocatedShardList, CompareShardIntervalsById);
 	foreach(colocatedShardCell, colocatedShardList)
 	{
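The repair_shards.c change defers ColocatedShardIntervalList() until
after the loop that walks the colocated tables -- the point at which,
per the commit message, the move has taken ShareUpdateExclusiveLock on
all of them -- so the shard list is enumerated under those locks and
reflects any colocated create that committed first. While a colocated
create and a shard move contend, the competing locks can be observed
from a third session; a sketch, assuming the current database (the rows
and the granted flag depend on timing):

    SELECT relation::regclass AS relname, mode, granted
    FROM pg_locks
    WHERE relation IS NOT NULL
      AND mode IN ('ShareLock', 'ShareUpdateExclusiveLock')
    ORDER BY relname;
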
diff --git a/src/test/regress/expected/isolation_create_distributed_table_concurrency.out b/src/test/regress/expected/isolation_create_distributed_table_concurrency.out
new file mode 100644
index 000000000..2f8657df9
--- /dev/null
+++ b/src/test/regress/expected/isolation_create_distributed_table_concurrency.out
@@ -0,0 +1,124 @@
+Parsed test spec with 3 sessions
+
+starting permutation: s2-begin s2-create_distributed_table s3-create_distributed_table s2-commit
+create_distributed_table
+---------------------------------------------------------------------
+
+(1 row)
+
+step s2-begin:
+    BEGIN;
+
+step s2-create_distributed_table:
+    SELECT create_distributed_table('concurrent_table_2', 'id', colocate_with := 'concurrent_table_1');
+
+create_distributed_table
+---------------------------------------------------------------------
+
+(1 row)
+
+step s3-create_distributed_table:
+    SELECT create_distributed_table('concurrent_table_3', 'id', colocate_with := 'concurrent_table_1');
+
+create_distributed_table
+---------------------------------------------------------------------
+
+(1 row)
+
+step s2-commit:
+    COMMIT;
+
+
+starting permutation: s2-begin s2-create_distributed_table s1-move-shard-logical s2-commit s3-sanity-check s3-sanity-check-2
+create_distributed_table
+---------------------------------------------------------------------
+
+(1 row)
+
+step s2-begin:
+    BEGIN;
+
+step s2-create_distributed_table:
+    SELECT create_distributed_table('concurrent_table_2', 'id', colocate_with := 'concurrent_table_1');
+
+create_distributed_table
+---------------------------------------------------------------------
+
+(1 row)
+
+step s1-move-shard-logical:
+    WITH shardid AS (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'concurrent_table_1'::regclass ORDER BY shardid LIMIT 1)
+    SELECT citus_move_shard_placement(shardid.shardid, 'localhost', 57637, 'localhost', 57638) FROM shardid;
+
+step s2-commit:
+    COMMIT;
+
+step s1-move-shard-logical: <... completed>
+citus_move_shard_placement
+---------------------------------------------------------------------
+
+(1 row)
+
+step s3-sanity-check:
+    SELECT count(*) FROM pg_dist_shard LEFT JOIN pg_dist_shard_placement USING(shardid) WHERE nodename IS NULL;
+
+count
+---------------------------------------------------------------------
+    0
+(1 row)
+
+step s3-sanity-check-2:
+    SELECT count(*) FROM concurrent_table_1 JOIN concurrent_table_2 USING (id);
+
+count
+---------------------------------------------------------------------
+    0
+(1 row)
+
+
+starting permutation: s2-begin s2-create_distributed_table s1-move-shard-block s2-commit s3-sanity-check s3-sanity-check-2
+create_distributed_table
+---------------------------------------------------------------------
+
+(1 row)
+
+step s2-begin:
+    BEGIN;
+
+step s2-create_distributed_table:
+    SELECT create_distributed_table('concurrent_table_2', 'id', colocate_with := 'concurrent_table_1');
+
+create_distributed_table
+---------------------------------------------------------------------
+
+(1 row)
+
+step s1-move-shard-block:
+    WITH shardid AS (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'concurrent_table_1'::regclass ORDER BY shardid LIMIT 1)
+    SELECT citus_move_shard_placement(shardid.shardid, 'localhost', 57637, 'localhost', 57638, 'block_writes') FROM shardid;
+
+step s2-commit:
+    COMMIT;
+
+step s1-move-shard-block: <... completed>
+citus_move_shard_placement
+---------------------------------------------------------------------
+
+(1 row)
+
+step s3-sanity-check:
+    SELECT count(*) FROM pg_dist_shard LEFT JOIN pg_dist_shard_placement USING(shardid) WHERE nodename IS NULL;
+
+count
+---------------------------------------------------------------------
+    0
+(1 row)
+
+step s3-sanity-check-2:
+    SELECT count(*) FROM concurrent_table_1 JOIN concurrent_table_2 USING (id);
+
+count
+---------------------------------------------------------------------
+    0
+(1 row)
+
diff --git a/src/test/regress/expected/multi_create_table.out b/src/test/regress/expected/multi_create_table.out
index ee83adff0..3e1ae791e 100644
--- a/src/test/regress/expected/multi_create_table.out
+++ b/src/test/regress/expected/multi_create_table.out
@@ -371,7 +371,7 @@ ERROR: -100 is outside the valid range for parameter "shard_count" (1 .. 64000)
 SELECT create_distributed_table('shard_count_table_2', 'a', shard_count:=64001);
 ERROR: 64001 is outside the valid range for parameter "shard_count" (1 .. 64000)
 -- shard count with colocate with table should error
-SELECT create_distributed_table('shard_count_table_2', 'a', shard_count:=12, colocate_with:='shard_count');
+SELECT create_distributed_table('shard_count_table_2', 'a', shard_count:=12, colocate_with:='shard_count_table');
 ERROR: Cannot use colocate_with with a table and shard_count at the same time
 -- none should not error
 SELECT create_distributed_table('shard_count_table_2', 'a', shard_count:=12, colocate_with:='none');
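The expected-output change above follows from the new early resolution:
colocate_with is now resolved via ResolveRelationId() before the
shard_count conflict check fires, so the test has to pass a table name
that actually exists ('shard_count_table' instead of the stale
'shard_count'). A sketch of the new failure mode for a nonexistent name
(hypothetical table name; the error comes from PostgreSQL's standard
relation lookup, so the exact text may differ):

    SELECT create_distributed_table('shard_count_table_2', 'a',
                                    shard_count := 12,
                                    colocate_with := 'no_such_table');
    -- ERROR:  relation "no_such_table" does not exist
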
diff --git a/src/test/regress/isolation_schedule b/src/test/regress/isolation_schedule
index c0fbfcfa3..f6cb0a4b6 100644
--- a/src/test/regress/isolation_schedule
+++ b/src/test/regress/isolation_schedule
@@ -96,5 +96,6 @@ test: isolation_replicated_dist_on_mx
 test: isolation_replicate_reference_tables_to_coordinator
 test: isolation_multiuser_locking
 test: isolation_acquire_distributed_locks
+test: isolation_create_distributed_table_concurrency
 
 test: isolation_check_mx
diff --git a/src/test/regress/spec/isolation_create_distributed_table_concurrency.spec b/src/test/regress/spec/isolation_create_distributed_table_concurrency.spec
new file mode 100644
index 000000000..5432234d4
--- /dev/null
+++ b/src/test/regress/spec/isolation_create_distributed_table_concurrency.spec
@@ -0,0 +1,71 @@
+setup
+{
+    CREATE TABLE concurrent_table_1(id int PRIMARY KEY);
+    CREATE TABLE concurrent_table_2(id int PRIMARY KEY);
+    CREATE TABLE concurrent_table_3(id int PRIMARY KEY);
+
+    SET citus.shard_replication_factor TO 1;
+    SELECT create_distributed_table('concurrent_table_1', 'id', colocate_with := 'none');
+}
+
+teardown
+{
+    DROP TABLE concurrent_table_1, concurrent_table_2, concurrent_table_3 CASCADE;
+}
+
+session "s1"
+
+step "s1-move-shard-logical"
+{
+    WITH shardid AS (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'concurrent_table_1'::regclass ORDER BY shardid LIMIT 1)
+    SELECT citus_move_shard_placement(shardid.shardid, 'localhost', 57637, 'localhost', 57638) FROM shardid;
+}
+
+step "s1-move-shard-block"
+{
+    WITH shardid AS (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'concurrent_table_1'::regclass ORDER BY shardid LIMIT 1)
+    SELECT citus_move_shard_placement(shardid.shardid, 'localhost', 57637, 'localhost', 57638, 'block_writes') FROM shardid;
+}
+
+session "s2"
+
+step "s2-begin"
+{
+    BEGIN;
+}
+
+step "s2-create_distributed_table"
+{
+    SELECT create_distributed_table('concurrent_table_2', 'id', colocate_with := 'concurrent_table_1');
+}
+
+step "s2-commit"
+{
+    COMMIT;
+}
+
+session "s3"
+
+step "s3-create_distributed_table"
+{
+    SELECT create_distributed_table('concurrent_table_3', 'id', colocate_with := 'concurrent_table_1');
+}
+
+step "s3-sanity-check"
+{
+    SELECT count(*) FROM pg_dist_shard LEFT JOIN pg_dist_shard_placement USING(shardid) WHERE nodename IS NULL;
+}
+
+step "s3-sanity-check-2"
+{
+    SELECT count(*) FROM concurrent_table_1 JOIN concurrent_table_2 USING (id);
+}
+
+// concurrent create_distributed_table calls with the same colocation target should not block each other
+permutation "s2-begin" "s2-create_distributed_table" "s3-create_distributed_table" "s2-commit"
+
+// a concurrent colocated table creation and a shard move properly block each other, and the cluster stays healthy
+permutation "s2-begin" "s2-create_distributed_table" "s1-move-shard-logical" "s2-commit" "s3-sanity-check" "s3-sanity-check-2"
+permutation "s2-begin" "s2-create_distributed_table" "s1-move-shard-block" "s2-commit" "s3-sanity-check" "s3-sanity-check-2"
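Beyond the sanity-check steps, colocation of the tables created in the
blocking permutations can be confirmed directly from the Citus metadata;
a sketch, assuming the test tables still exist:

    SELECT logicalrelid, colocationid
    FROM pg_dist_partition
    WHERE logicalrelid::text LIKE 'concurrent_table%'
    ORDER BY logicalrelid;

Both concurrent_table_1 and concurrent_table_2 should report the same
colocationid.
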
diff --git a/src/test/regress/sql/multi_create_table.sql b/src/test/regress/sql/multi_create_table.sql
index 82febb0fa..384a9be5d 100644
--- a/src/test/regress/sql/multi_create_table.sql
+++ b/src/test/regress/sql/multi_create_table.sql
@@ -245,7 +245,7 @@ SELECT create_distributed_table('shard_count_table_2', 'a', shard_count:=-100);
 SELECT create_distributed_table('shard_count_table_2', 'a', shard_count:=64001);
 
 -- shard count with colocate with table should error
-SELECT create_distributed_table('shard_count_table_2', 'a', shard_count:=12, colocate_with:='shard_count');
+SELECT create_distributed_table('shard_count_table_2', 'a', shard_count:=12, colocate_with:='shard_count_table');
 
 -- none should not error
 SELECT create_distributed_table('shard_count_table_2', 'a', shard_count:=12, colocate_with:='none');