mirror of https://github.com/citusdata/citus.git
Prevent concurrent colocated table creation vs shard moves
Before this commit, there were no restrictions on creating a colocated table while a shard move was running concurrently. With this commit, we acquire ShareLock on the colocated relation so that concurrent shard moves -- which acquire ShareUpdateExclusiveLock on all colocated tables -- block until the transaction creating the table commits.
branch fix_concurrent_shard_move_create_table
parent ae58ca5783
commit 466ce05f6c
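For context, a minimal sketch of the two operations that this commit makes conflict (table names and ports mirror the isolation test below; the shard id is made up for illustration):

-- session A: create a table colocated with an existing distributed table;
-- create_distributed_table() now takes ShareLock on concurrent_table_1 until commit
BEGIN;
SELECT create_distributed_table('concurrent_table_2', 'id', colocate_with := 'concurrent_table_1');

-- session B: move a shard of the same colocation group; citus_move_shard_placement()
-- requests ShareUpdateExclusiveLock on all colocated tables, so it waits for session A
SELECT citus_move_shard_placement(102008, 'localhost', 57637, 'localhost', 57638);  -- 102008 is a hypothetical shard id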
@@ -193,13 +193,20 @@ create_distributed_table(PG_FUNCTION_ARGS)
 	Oid distributionMethodOid = PG_GETARG_OID(2);
 	text *colocateWithTableNameText = PG_GETARG_TEXT_P(3);
 	char *colocateWithTableName = text_to_cstring(colocateWithTableNameText);
 
+	Oid colocatedRelationId = InvalidOid;
+
+	/* resolve the colocated table name, if user specified */
+	if (pg_strncasecmp(colocateWithTableName, "default", NAMEDATALEN) != 0 &&
+		!IsColocateWithNone(colocateWithTableName))
+	{
+		colocatedRelationId = ResolveRelationId(colocateWithTableNameText, false);
+	}
+
 	bool shardCountIsStrict = false;
 	int shardCount = ShardCount;
 	if (!PG_ARGISNULL(4))
 	{
-		if (pg_strncasecmp(colocateWithTableName, "default", NAMEDATALEN) != 0 &&
-			pg_strncasecmp(colocateWithTableName, "none", NAMEDATALEN) != 0)
+		if (OidIsValid(colocatedRelationId))
 		{
 			ereport(ERROR, (errmsg("Cannot use colocate_with with a table "
 								   "and shard_count at the same time")));
@@ -230,9 +237,29 @@ create_distributed_table(PG_FUNCTION_ARGS)
 		ereport(ERROR, (errmsg("could not create distributed table: "
 							   "relation does not exist")));
 	}
 
 	relation_close(relation, NoLock);
 
+	/*
+	 * If the user specified a colocated relation, we should acquire a lock
+	 * such that no shard moves happen concurrently. However, we should
+	 * allow creating concurrent distributed tables that are colocated
+	 * with the same table. citus_move_shard_placement acquires
+	 * ShareUpdateExclusiveLock on all colocated relations, hence we use
+	 * ShareLock here.
+	 */
+	if (OidIsValid(colocatedRelationId))
+	{
+		Relation colocatedRelation =
+			try_relation_open(colocatedRelationId, ShareLock);
+		if (colocatedRelation == NULL)
+		{
+			ereport(ERROR, (errmsg("colocated relation %s does not exist",
+								   colocateWithTableName)));
+		}
+
+		relation_close(colocatedRelation, NoLock);
+	}
+
 	char *distributionColumnName = text_to_cstring(distributionColumnText);
 	Assert(distributionColumnName != NULL);
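The comment added above relies on PostgreSQL's lock conflict rules: ShareLock conflicts with ShareUpdateExclusiveLock but not with another ShareLock, so two concurrent create_distributed_table calls colocating with the same table do not block each other, while a shard move does block. One way to observe the lock from a third session while the creating transaction is still open (illustrative query, not part of this change):

SELECT locktype, mode, granted
FROM pg_locks
WHERE relation = 'concurrent_table_1'::regclass;
-- expected to list a granted ShareLock held by the open create_distributed_table() transaction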
@@ -322,7 +322,6 @@ citus_move_shard_placement(PG_FUNCTION_ARGS)
 	Oid distributedTableId = shardInterval->relationId;
 
 	List *colocatedTableList = ColocatedTableList(distributedTableId);
-	List *colocatedShardList = ColocatedShardIntervalList(shardInterval);
 
 	foreach(colocatedTableCell, colocatedTableList)
 	{
@@ -351,6 +350,7 @@ citus_move_shard_placement(PG_FUNCTION_ARGS)
 	}
 
 	/* we sort colocatedShardList so that lock operations will not cause any deadlocks */
+	List *colocatedShardList = ColocatedShardIntervalList(shardInterval);
 	colocatedShardList = SortList(colocatedShardList, CompareShardIntervalsById);
 	foreach(colocatedShardCell, colocatedShardList)
 	{
@@ -0,0 +1,124 @@
Parsed test spec with 3 sessions

starting permutation: s2-begin s2-create_distributed_table s3-create_distributed_table s2-commit
create_distributed_table
---------------------------------------------------------------------

(1 row)

step s2-begin:
    BEGIN;

step s2-create_distributed_table:
    SELECT create_distributed_table('concurrent_table_2', 'id', colocate_with := 'concurrent_table_1');

create_distributed_table
---------------------------------------------------------------------

(1 row)

step s3-create_distributed_table:
    SELECT create_distributed_table('concurrent_table_3', 'id', colocate_with := 'concurrent_table_1');

create_distributed_table
---------------------------------------------------------------------

(1 row)

step s2-commit:
    COMMIT;


starting permutation: s2-begin s2-create_distributed_table s1-move-shard-logical s2-commit s3-sanity-check s3-sanity-check-2
create_distributed_table
---------------------------------------------------------------------

(1 row)

step s2-begin:
    BEGIN;

step s2-create_distributed_table:
    SELECT create_distributed_table('concurrent_table_2', 'id', colocate_with := 'concurrent_table_1');

create_distributed_table
---------------------------------------------------------------------

(1 row)

step s1-move-shard-logical:
    WITH shardid AS (SELECT shardid FROM pg_dist_shard where logicalrelid = 'concurrent_table_1'::regclass ORDER BY shardid LIMIT 1)
    SELECT citus_move_Shard_placement(shardid.shardid, 'localhost', 57637, 'localhost', 57638) FROM shardid;
 <waiting ...>
step s2-commit:
    COMMIT;

step s1-move-shard-logical: <... completed>
citus_move_shard_placement
---------------------------------------------------------------------

(1 row)

step s3-sanity-check:
    SELECT count(*) FROM pg_dist_shard LEFT JOIN pg_dist_shard_placement USING(shardid) WHERE nodename IS NULL;

count
---------------------------------------------------------------------
    0
(1 row)

step s3-sanity-check-2:
    SELECT count(*) FROM concurrent_table_1 JOIN concurrent_table_2 USING (id);

count
---------------------------------------------------------------------
    0
(1 row)


starting permutation: s2-begin s2-create_distributed_table s1-move-shard-block s2-commit s3-sanity-check s3-sanity-check-2
create_distributed_table
---------------------------------------------------------------------

(1 row)

step s2-begin:
    BEGIN;

step s2-create_distributed_table:
    SELECT create_distributed_table('concurrent_table_2', 'id', colocate_with := 'concurrent_table_1');

create_distributed_table
---------------------------------------------------------------------

(1 row)

step s1-move-shard-block:
    WITH shardid AS (SELECT shardid FROM pg_dist_shard where logicalrelid = 'concurrent_table_1'::regclass ORDER BY shardid LIMIT 1)
    SELECT citus_move_Shard_placement(shardid.shardid, 'localhost', 57637, 'localhost', 57638, 'block_writes') FROM shardid;
 <waiting ...>
step s2-commit:
    COMMIT;

step s1-move-shard-block: <... completed>
citus_move_shard_placement
---------------------------------------------------------------------

(1 row)

step s3-sanity-check:
    SELECT count(*) FROM pg_dist_shard LEFT JOIN pg_dist_shard_placement USING(shardid) WHERE nodename IS NULL;

count
---------------------------------------------------------------------
    0
(1 row)

step s3-sanity-check-2:
    SELECT count(*) FROM concurrent_table_1 JOIN concurrent_table_2 USING (id);

count
---------------------------------------------------------------------
    0
(1 row)
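The two move steps exercised above differ only in the transfer mode: s1-move-shard-logical uses the default mode, while s1-move-shard-block passes 'block_writes' as the optional last argument. As a reference sketch (the shard id is again hypothetical), the same call with each mode spelled out:

-- let Citus choose; with a primary key on the table this typically means logical replication
SELECT citus_move_shard_placement(102008, 'localhost', 57637, 'localhost', 57638, 'auto');
-- copy the shard while blocking writes to it for the duration of the move
SELECT citus_move_shard_placement(102008, 'localhost', 57637, 'localhost', 57638, 'block_writes');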
@@ -371,7 +371,7 @@ ERROR: -100 is outside the valid range for parameter "shard_count" (1 .. 64000)
 SELECT create_distributed_table('shard_count_table_2', 'a', shard_count:=64001);
 ERROR: 64001 is outside the valid range for parameter "shard_count" (1 .. 64000)
 -- shard count with colocate with table should error
-SELECT create_distributed_table('shard_count_table_2', 'a', shard_count:=12, colocate_with:='shard_count');
+SELECT create_distributed_table('shard_count_table_2', 'a', shard_count:=12, colocate_with:='shard_count_table');
 ERROR: Cannot use colocate_with with a table and shard_count at the same time
 -- none should not error
 SELECT create_distributed_table('shard_count_table_2', 'a', shard_count:=12, colocate_with:='none');
@@ -96,5 +96,6 @@ test: isolation_replicated_dist_on_mx
 test: isolation_replicate_reference_tables_to_coordinator
 test: isolation_multiuser_locking
 test: isolation_acquire_distributed_locks
+test: isolation_create_distributed_table_concurrency
 
 test: isolation_check_mx
@@ -0,0 +1,71 @@
setup
{
	CREATE TABLE concurrent_table_1(id int PRIMARY KEY);
	CREATE TABLE concurrent_table_2(id int PRIMARY KEY);
	CREATE TABLE concurrent_table_3(id int PRIMARY KEY);

	SET citus.shard_replication_factor TO 1;
	SELECT create_distributed_table('concurrent_table_1', 'id', colocate_with := 'none');
}

teardown
{
	DROP TABLE concurrent_table_1, concurrent_table_2, concurrent_table_3 CASCADE;
}

session "s1"

step "s1-move-shard-logical"
{
	WITH shardid AS (SELECT shardid FROM pg_dist_shard where logicalrelid = 'concurrent_table_1'::regclass ORDER BY shardid LIMIT 1)
	SELECT citus_move_Shard_placement(shardid.shardid, 'localhost', 57637, 'localhost', 57638) FROM shardid;
}

step "s1-move-shard-block"
{
	WITH shardid AS (SELECT shardid FROM pg_dist_shard where logicalrelid = 'concurrent_table_1'::regclass ORDER BY shardid LIMIT 1)
	SELECT citus_move_Shard_placement(shardid.shardid, 'localhost', 57637, 'localhost', 57638, 'block_writes') FROM shardid;
}

session "s2"

step "s2-begin"
{
	BEGIN;
}

step "s2-create_distributed_table"
{
	SELECT create_distributed_table('concurrent_table_2', 'id', colocate_with := 'concurrent_table_1');
}

step "s2-commit"
{
	COMMIT;
}

session "s3"

step "s3-create_distributed_table"
{
	SELECT create_distributed_table('concurrent_table_3', 'id', colocate_with := 'concurrent_table_1');
}

step "s3-sanity-check"
{
	SELECT count(*) FROM pg_dist_shard LEFT JOIN pg_dist_shard_placement USING(shardid) WHERE nodename IS NULL;
}

step "s3-sanity-check-2"
{
	SELECT count(*) FROM concurrent_table_1 JOIN concurrent_table_2 USING (id);
}

// concurrent create_distributed_table with the same colocation should not block each other
permutation "s2-begin" "s2-create_distributed_table" "s3-create_distributed_table" "s2-commit"

// concurrent create colocated table and shard move properly block each other, and cluster is healthy
permutation "s2-begin" "s2-create_distributed_table" "s1-move-shard-logical" "s2-commit" "s3-sanity-check" "s3-sanity-check-2"
permutation "s2-begin" "s2-create_distributed_table" "s1-move-shard-block" "s2-commit" "s3-sanity-check" "s3-sanity-check-2"
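The permutation comments above state the intended behavior: creates that colocate with the same table proceed concurrently, while a create and a shard move serialize. Beyond the two sanity-check steps, one could also verify by hand that the tables ended up in the same colocation group (a supplementary query, not part of this spec):

SELECT logicalrelid, colocationid
FROM pg_dist_partition
WHERE logicalrelid IN ('concurrent_table_1'::regclass, 'concurrent_table_2'::regclass);
-- both rows should report the same colocationid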
@@ -245,7 +245,7 @@ SELECT create_distributed_table('shard_count_table_2', 'a', shard_count:=-100);
 SELECT create_distributed_table('shard_count_table_2', 'a', shard_count:=64001);
 -- shard count with colocate with table should error
-SELECT create_distributed_table('shard_count_table_2', 'a', shard_count:=12, colocate_with:='shard_count');
+SELECT create_distributed_table('shard_count_table_2', 'a', shard_count:=12, colocate_with:='shard_count_table');
 -- none should not error
 SELECT create_distributed_table('shard_count_table_2', 'a', shard_count:=12, colocate_with:='none');