From 5bc8a81aa7001bf7e983d6ada84a1bef21108df0 Mon Sep 17 00:00:00 2001 From: Onder Kalaci Date: Thu, 21 Jul 2022 13:13:02 +0200 Subject: [PATCH] Add colocation checks for shard splits --- .../distributed/operations/shard_split.c | 4 + .../isolation_blocking_shard_split.out | 580 +++++++++--------- ...isolation_concurrent_move_create_table.out | 55 +- .../spec/isolation_blocking_shard_split.spec | 30 +- ...solation_concurrent_move_create_table.spec | 13 +- 5 files changed, 349 insertions(+), 333 deletions(-) diff --git a/src/backend/distributed/operations/shard_split.c b/src/backend/distributed/operations/shard_split.c index d39780e0d..76687434e 100644 --- a/src/backend/distributed/operations/shard_split.c +++ b/src/backend/distributed/operations/shard_split.c @@ -35,6 +35,7 @@ #include "distributed/metadata_sync.h" #include "distributed/multi_physical_planner.h" #include "distributed/deparse_shard_query.h" +#include "distributed/shard_rebalancer.h" /* * Entry for map that tracks ShardInterval -> Placement Node @@ -329,6 +330,9 @@ SplitShard(SplitMode splitMode, ShardInterval *shardIntervalToSplit = LoadShardInterval(shardIdToSplit); List *colocatedTableList = ColocatedTableList(shardIntervalToSplit->relationId); + Oid relationId = RelationIdForShard(shardIdToSplit); + AcquirePlacementColocationLock(relationId, ExclusiveLock, "split"); + /* sort the tables to avoid deadlocks */ colocatedTableList = SortList(colocatedTableList, CompareOids); Oid colocatedTableId = InvalidOid; diff --git a/src/test/regress/expected/isolation_blocking_shard_split.out b/src/test/regress/expected/isolation_blocking_shard_split.out index 02a23174e..d720f3a32 100644 --- a/src/test/regress/expected/isolation_blocking_shard_split.out +++ b/src/test/regress/expected/isolation_blocking_shard_split.out @@ -7,13 +7,13 @@ create_distributed_table (1 row) step s1-load-cache: - -- Indirect way to load cache. - TRUNCATE to_split_table; + -- Indirect way to load cache. + TRUNCATE to_split_table; step s1-insert: - -- Id '123456789' maps to shard xxxxx. - SELECT get_shard_id_for_distribution_column('to_split_table', 123456789); - INSERT INTO to_split_table VALUES (123456789, 1); + -- Id '123456789' maps to shard xxxxx. + SELECT get_shard_id_for_distribution_column('to_split_table', 123456789); + INSERT INTO to_split_table VALUES (123456789, 1); get_shard_id_for_distribution_column --------------------------------------------------------------------- @@ -27,7 +27,7 @@ step s1-begin: SET citus.select_opens_transaction_block TO false; step s1-select: - SELECT count(*) FROM to_split_table WHERE id = 123456789; + SELECT count(*) FROM to_split_table WHERE id = 123456789; count --------------------------------------------------------------------- @@ -35,14 +35,14 @@ count (1 row) step s2-begin: - BEGIN; + BEGIN; step s2-blocking-shard-split: - SELECT pg_catalog.citus_split_shard_by_split_points( - 1500002, - ARRAY['1073741824'], - ARRAY[1, 2], - 'block_writes'); + SELECT pg_catalog.citus_split_shard_by_split_points( + 1500002, + ARRAY['1073741824'], + ARRAY[1, 2], + 'block_writes'); citus_split_shard_by_split_points --------------------------------------------------------------------- @@ -50,26 +50,26 @@ citus_split_shard_by_split_points (1 row) step s1-update: - UPDATE to_split_table SET value = 111 WHERE id = 123456789; + UPDATE to_split_table SET value = 111 WHERE id = 123456789; step s2-commit: - COMMIT; + COMMIT; step s1-update: <... completed> ERROR: could not find valid entry for shard xxxxx step s1-commit: - COMMIT; + COMMIT; step s2-print-cluster: - -- row count per shard - SELECT - nodeport, shardid, success, result - FROM - run_command_on_placements('to_split_table', 'select count(*) from %s') - ORDER BY - nodeport, shardid; - -- rows - SELECT id, value FROM to_split_table ORDER BY id, value; + -- row count per shard + SELECT + nodeport, shardid, success, result + FROM + run_command_on_placements('to_split_table', 'select count(*) from %s') + ORDER BY + nodeport, shardid; + -- rows + SELECT id, value FROM to_split_table ORDER BY id, value; nodeport|shardid|success|result --------------------------------------------------------------------- @@ -91,13 +91,13 @@ create_distributed_table (1 row) step s1-load-cache: - -- Indirect way to load cache. - TRUNCATE to_split_table; + -- Indirect way to load cache. + TRUNCATE to_split_table; step s1-insert: - -- Id '123456789' maps to shard xxxxx. - SELECT get_shard_id_for_distribution_column('to_split_table', 123456789); - INSERT INTO to_split_table VALUES (123456789, 1); + -- Id '123456789' maps to shard xxxxx. + SELECT get_shard_id_for_distribution_column('to_split_table', 123456789); + INSERT INTO to_split_table VALUES (123456789, 1); get_shard_id_for_distribution_column --------------------------------------------------------------------- @@ -111,7 +111,7 @@ step s1-begin: SET citus.select_opens_transaction_block TO false; step s1-select: - SELECT count(*) FROM to_split_table WHERE id = 123456789; + SELECT count(*) FROM to_split_table WHERE id = 123456789; count --------------------------------------------------------------------- @@ -119,14 +119,14 @@ count (1 row) step s2-begin: - BEGIN; + BEGIN; step s2-blocking-shard-split: - SELECT pg_catalog.citus_split_shard_by_split_points( - 1500002, - ARRAY['1073741824'], - ARRAY[1, 2], - 'block_writes'); + SELECT pg_catalog.citus_split_shard_by_split_points( + 1500002, + ARRAY['1073741824'], + ARRAY[1, 2], + 'block_writes'); citus_split_shard_by_split_points --------------------------------------------------------------------- @@ -134,26 +134,26 @@ citus_split_shard_by_split_points (1 row) step s1-delete: - DELETE FROM to_split_table WHERE id = 123456789; + DELETE FROM to_split_table WHERE id = 123456789; step s2-commit: - COMMIT; + COMMIT; step s1-delete: <... completed> ERROR: could not find valid entry for shard xxxxx step s1-commit: - COMMIT; + COMMIT; step s2-print-cluster: - -- row count per shard - SELECT - nodeport, shardid, success, result - FROM - run_command_on_placements('to_split_table', 'select count(*) from %s') - ORDER BY - nodeport, shardid; - -- rows - SELECT id, value FROM to_split_table ORDER BY id, value; + -- row count per shard + SELECT + nodeport, shardid, success, result + FROM + run_command_on_placements('to_split_table', 'select count(*) from %s') + ORDER BY + nodeport, shardid; + -- rows + SELECT id, value FROM to_split_table ORDER BY id, value; nodeport|shardid|success|result --------------------------------------------------------------------- @@ -175,8 +175,8 @@ create_distributed_table (1 row) step s1-load-cache: - -- Indirect way to load cache. - TRUNCATE to_split_table; + -- Indirect way to load cache. + TRUNCATE to_split_table; step s1-begin: BEGIN; @@ -185,7 +185,7 @@ step s1-begin: SET citus.select_opens_transaction_block TO false; step s1-select: - SELECT count(*) FROM to_split_table WHERE id = 123456789; + SELECT count(*) FROM to_split_table WHERE id = 123456789; count --------------------------------------------------------------------- @@ -193,14 +193,14 @@ count (1 row) step s2-begin: - BEGIN; + BEGIN; step s2-blocking-shard-split: - SELECT pg_catalog.citus_split_shard_by_split_points( - 1500002, - ARRAY['1073741824'], - ARRAY[1, 2], - 'block_writes'); + SELECT pg_catalog.citus_split_shard_by_split_points( + 1500002, + ARRAY['1073741824'], + ARRAY[1, 2], + 'block_writes'); citus_split_shard_by_split_points --------------------------------------------------------------------- @@ -208,12 +208,12 @@ citus_split_shard_by_split_points (1 row) step s1-insert: - -- Id '123456789' maps to shard xxxxx. - SELECT get_shard_id_for_distribution_column('to_split_table', 123456789); - INSERT INTO to_split_table VALUES (123456789, 1); + -- Id '123456789' maps to shard xxxxx. + SELECT get_shard_id_for_distribution_column('to_split_table', 123456789); + INSERT INTO to_split_table VALUES (123456789, 1); step s2-commit: - COMMIT; + COMMIT; step s1-insert: <... completed> get_shard_id_for_distribution_column @@ -223,18 +223,18 @@ get_shard_id_for_distribution_column ERROR: could not find valid entry for shard xxxxx step s1-commit: - COMMIT; + COMMIT; step s2-print-cluster: - -- row count per shard - SELECT - nodeport, shardid, success, result - FROM - run_command_on_placements('to_split_table', 'select count(*) from %s') - ORDER BY - nodeport, shardid; - -- rows - SELECT id, value FROM to_split_table ORDER BY id, value; + -- row count per shard + SELECT + nodeport, shardid, success, result + FROM + run_command_on_placements('to_split_table', 'select count(*) from %s') + ORDER BY + nodeport, shardid; + -- rows + SELECT id, value FROM to_split_table ORDER BY id, value; nodeport|shardid|success|result --------------------------------------------------------------------- @@ -255,8 +255,8 @@ create_distributed_table (1 row) step s1-load-cache: - -- Indirect way to load cache. - TRUNCATE to_split_table; + -- Indirect way to load cache. + TRUNCATE to_split_table; step s1-begin: BEGIN; @@ -265,7 +265,7 @@ step s1-begin: SET citus.select_opens_transaction_block TO false; step s1-select: - SELECT count(*) FROM to_split_table WHERE id = 123456789; + SELECT count(*) FROM to_split_table WHERE id = 123456789; count --------------------------------------------------------------------- @@ -273,14 +273,14 @@ count (1 row) step s2-begin: - BEGIN; + BEGIN; step s2-blocking-shard-split: - SELECT pg_catalog.citus_split_shard_by_split_points( - 1500002, - ARRAY['1073741824'], - ARRAY[1, 2], - 'block_writes'); + SELECT pg_catalog.citus_split_shard_by_split_points( + 1500002, + ARRAY['1073741824'], + ARRAY[1, 2], + 'block_writes'); citus_split_shard_by_split_points --------------------------------------------------------------------- @@ -288,26 +288,26 @@ citus_split_shard_by_split_points (1 row) step s1-copy: - COPY to_split_table FROM PROGRAM 'echo "1,1\n2,2\n3,3\n4,4\n5,5"' WITH CSV; + COPY to_split_table FROM PROGRAM 'echo "1,1\n2,2\n3,3\n4,4\n5,5"' WITH CSV; step s2-commit: - COMMIT; + COMMIT; step s1-copy: <... completed> ERROR: could not find valid entry for shard xxxxx step s1-commit: - COMMIT; + COMMIT; step s2-print-cluster: - -- row count per shard - SELECT - nodeport, shardid, success, result - FROM - run_command_on_placements('to_split_table', 'select count(*) from %s') - ORDER BY - nodeport, shardid; - -- rows - SELECT id, value FROM to_split_table ORDER BY id, value; + -- row count per shard + SELECT + nodeport, shardid, success, result + FROM + run_command_on_placements('to_split_table', 'select count(*) from %s') + ORDER BY + nodeport, shardid; + -- rows + SELECT id, value FROM to_split_table ORDER BY id, value; nodeport|shardid|success|result --------------------------------------------------------------------- @@ -328,9 +328,9 @@ create_distributed_table (1 row) step s1-insert: - -- Id '123456789' maps to shard xxxxx. - SELECT get_shard_id_for_distribution_column('to_split_table', 123456789); - INSERT INTO to_split_table VALUES (123456789, 1); + -- Id '123456789' maps to shard xxxxx. + SELECT get_shard_id_for_distribution_column('to_split_table', 123456789); + INSERT INTO to_split_table VALUES (123456789, 1); get_shard_id_for_distribution_column --------------------------------------------------------------------- @@ -344,7 +344,7 @@ step s1-begin: SET citus.select_opens_transaction_block TO false; step s1-select: - SELECT count(*) FROM to_split_table WHERE id = 123456789; + SELECT count(*) FROM to_split_table WHERE id = 123456789; count --------------------------------------------------------------------- @@ -352,14 +352,14 @@ count (1 row) step s2-begin: - BEGIN; + BEGIN; step s2-blocking-shard-split: - SELECT pg_catalog.citus_split_shard_by_split_points( - 1500002, - ARRAY['1073741824'], - ARRAY[1, 2], - 'block_writes'); + SELECT pg_catalog.citus_split_shard_by_split_points( + 1500002, + ARRAY['1073741824'], + ARRAY[1, 2], + 'block_writes'); citus_split_shard_by_split_points --------------------------------------------------------------------- @@ -367,26 +367,26 @@ citus_split_shard_by_split_points (1 row) step s1-update: - UPDATE to_split_table SET value = 111 WHERE id = 123456789; + UPDATE to_split_table SET value = 111 WHERE id = 123456789; step s2-commit: - COMMIT; + COMMIT; step s1-update: <... completed> ERROR: could not find valid entry for shard xxxxx step s1-commit: - COMMIT; + COMMIT; step s2-print-cluster: - -- row count per shard - SELECT - nodeport, shardid, success, result - FROM - run_command_on_placements('to_split_table', 'select count(*) from %s') - ORDER BY - nodeport, shardid; - -- rows - SELECT id, value FROM to_split_table ORDER BY id, value; + -- row count per shard + SELECT + nodeport, shardid, success, result + FROM + run_command_on_placements('to_split_table', 'select count(*) from %s') + ORDER BY + nodeport, shardid; + -- rows + SELECT id, value FROM to_split_table ORDER BY id, value; nodeport|shardid|success|result --------------------------------------------------------------------- @@ -408,9 +408,9 @@ create_distributed_table (1 row) step s1-insert: - -- Id '123456789' maps to shard xxxxx. - SELECT get_shard_id_for_distribution_column('to_split_table', 123456789); - INSERT INTO to_split_table VALUES (123456789, 1); + -- Id '123456789' maps to shard xxxxx. + SELECT get_shard_id_for_distribution_column('to_split_table', 123456789); + INSERT INTO to_split_table VALUES (123456789, 1); get_shard_id_for_distribution_column --------------------------------------------------------------------- @@ -424,7 +424,7 @@ step s1-begin: SET citus.select_opens_transaction_block TO false; step s1-select: - SELECT count(*) FROM to_split_table WHERE id = 123456789; + SELECT count(*) FROM to_split_table WHERE id = 123456789; count --------------------------------------------------------------------- @@ -432,14 +432,14 @@ count (1 row) step s2-begin: - BEGIN; + BEGIN; step s2-blocking-shard-split: - SELECT pg_catalog.citus_split_shard_by_split_points( - 1500002, - ARRAY['1073741824'], - ARRAY[1, 2], - 'block_writes'); + SELECT pg_catalog.citus_split_shard_by_split_points( + 1500002, + ARRAY['1073741824'], + ARRAY[1, 2], + 'block_writes'); citus_split_shard_by_split_points --------------------------------------------------------------------- @@ -447,26 +447,26 @@ citus_split_shard_by_split_points (1 row) step s1-delete: - DELETE FROM to_split_table WHERE id = 123456789; + DELETE FROM to_split_table WHERE id = 123456789; step s2-commit: - COMMIT; + COMMIT; step s1-delete: <... completed> ERROR: could not find valid entry for shard xxxxx step s1-commit: - COMMIT; + COMMIT; step s2-print-cluster: - -- row count per shard - SELECT - nodeport, shardid, success, result - FROM - run_command_on_placements('to_split_table', 'select count(*) from %s') - ORDER BY - nodeport, shardid; - -- rows - SELECT id, value FROM to_split_table ORDER BY id, value; + -- row count per shard + SELECT + nodeport, shardid, success, result + FROM + run_command_on_placements('to_split_table', 'select count(*) from %s') + ORDER BY + nodeport, shardid; + -- rows + SELECT id, value FROM to_split_table ORDER BY id, value; nodeport|shardid|success|result --------------------------------------------------------------------- @@ -494,7 +494,7 @@ step s1-begin: SET citus.select_opens_transaction_block TO false; step s1-select: - SELECT count(*) FROM to_split_table WHERE id = 123456789; + SELECT count(*) FROM to_split_table WHERE id = 123456789; count --------------------------------------------------------------------- @@ -502,14 +502,14 @@ count (1 row) step s2-begin: - BEGIN; + BEGIN; step s2-blocking-shard-split: - SELECT pg_catalog.citus_split_shard_by_split_points( - 1500002, - ARRAY['1073741824'], - ARRAY[1, 2], - 'block_writes'); + SELECT pg_catalog.citus_split_shard_by_split_points( + 1500002, + ARRAY['1073741824'], + ARRAY[1, 2], + 'block_writes'); citus_split_shard_by_split_points --------------------------------------------------------------------- @@ -517,12 +517,12 @@ citus_split_shard_by_split_points (1 row) step s1-insert: - -- Id '123456789' maps to shard xxxxx. - SELECT get_shard_id_for_distribution_column('to_split_table', 123456789); - INSERT INTO to_split_table VALUES (123456789, 1); + -- Id '123456789' maps to shard xxxxx. + SELECT get_shard_id_for_distribution_column('to_split_table', 123456789); + INSERT INTO to_split_table VALUES (123456789, 1); step s2-commit: - COMMIT; + COMMIT; step s1-insert: <... completed> get_shard_id_for_distribution_column @@ -532,18 +532,18 @@ get_shard_id_for_distribution_column ERROR: could not find valid entry for shard xxxxx step s1-commit: - COMMIT; + COMMIT; step s2-print-cluster: - -- row count per shard - SELECT - nodeport, shardid, success, result - FROM - run_command_on_placements('to_split_table', 'select count(*) from %s') - ORDER BY - nodeport, shardid; - -- rows - SELECT id, value FROM to_split_table ORDER BY id, value; + -- row count per shard + SELECT + nodeport, shardid, success, result + FROM + run_command_on_placements('to_split_table', 'select count(*) from %s') + ORDER BY + nodeport, shardid; + -- rows + SELECT id, value FROM to_split_table ORDER BY id, value; nodeport|shardid|success|result --------------------------------------------------------------------- @@ -570,7 +570,7 @@ step s1-begin: SET citus.select_opens_transaction_block TO false; step s1-select: - SELECT count(*) FROM to_split_table WHERE id = 123456789; + SELECT count(*) FROM to_split_table WHERE id = 123456789; count --------------------------------------------------------------------- @@ -578,14 +578,14 @@ count (1 row) step s2-begin: - BEGIN; + BEGIN; step s2-blocking-shard-split: - SELECT pg_catalog.citus_split_shard_by_split_points( - 1500002, - ARRAY['1073741824'], - ARRAY[1, 2], - 'block_writes'); + SELECT pg_catalog.citus_split_shard_by_split_points( + 1500002, + ARRAY['1073741824'], + ARRAY[1, 2], + 'block_writes'); citus_split_shard_by_split_points --------------------------------------------------------------------- @@ -593,26 +593,26 @@ citus_split_shard_by_split_points (1 row) step s1-copy: - COPY to_split_table FROM PROGRAM 'echo "1,1\n2,2\n3,3\n4,4\n5,5"' WITH CSV; + COPY to_split_table FROM PROGRAM 'echo "1,1\n2,2\n3,3\n4,4\n5,5"' WITH CSV; step s2-commit: - COMMIT; + COMMIT; step s1-copy: <... completed> ERROR: could not find valid entry for shard xxxxx step s1-commit: - COMMIT; + COMMIT; step s2-print-cluster: - -- row count per shard - SELECT - nodeport, shardid, success, result - FROM - run_command_on_placements('to_split_table', 'select count(*) from %s') - ORDER BY - nodeport, shardid; - -- rows - SELECT id, value FROM to_split_table ORDER BY id, value; + -- row count per shard + SELECT + nodeport, shardid, success, result + FROM + run_command_on_placements('to_split_table', 'select count(*) from %s') + ORDER BY + nodeport, shardid; + -- rows + SELECT id, value FROM to_split_table ORDER BY id, value; nodeport|shardid|success|result --------------------------------------------------------------------- @@ -633,13 +633,13 @@ create_distributed_table (1 row) step s1-load-cache: - -- Indirect way to load cache. - TRUNCATE to_split_table; + -- Indirect way to load cache. + TRUNCATE to_split_table; step s1-insert: - -- Id '123456789' maps to shard xxxxx. - SELECT get_shard_id_for_distribution_column('to_split_table', 123456789); - INSERT INTO to_split_table VALUES (123456789, 1); + -- Id '123456789' maps to shard xxxxx. + SELECT get_shard_id_for_distribution_column('to_split_table', 123456789); + INSERT INTO to_split_table VALUES (123456789, 1); get_shard_id_for_distribution_column --------------------------------------------------------------------- @@ -653,11 +653,11 @@ step s1-begin: SET citus.select_opens_transaction_block TO false; step s1-blocking-shard-split: - SELECT pg_catalog.citus_split_shard_by_split_points( - 1500001, - ARRAY['-1073741824'], - ARRAY[1, 2], - 'block_writes'); + SELECT pg_catalog.citus_split_shard_by_split_points( + 1500001, + ARRAY['-1073741824'], + ARRAY[1, 2], + 'block_writes'); citus_split_shard_by_split_points --------------------------------------------------------------------- @@ -665,39 +665,33 @@ citus_split_shard_by_split_points (1 row) step s2-blocking-shard-split: - SELECT pg_catalog.citus_split_shard_by_split_points( - 1500002, - ARRAY['1073741824'], - ARRAY[1, 2], - 'block_writes'); - -step s1-commit: - COMMIT; + SELECT pg_catalog.citus_split_shard_by_split_points( + 1500002, + ARRAY['1073741824'], + ARRAY[1, 2], + 'block_writes'); -step s2-blocking-shard-split: <... completed> -citus_split_shard_by_split_points ---------------------------------------------------------------------- - -(1 row) +ERROR: could not acquire the lock required to split public.to_split_table +step s1-commit: + COMMIT; step s2-print-cluster: - -- row count per shard - SELECT - nodeport, shardid, success, result - FROM - run_command_on_placements('to_split_table', 'select count(*) from %s') - ORDER BY - nodeport, shardid; - -- rows - SELECT id, value FROM to_split_table ORDER BY id, value; + -- row count per shard + SELECT + nodeport, shardid, success, result + FROM + run_command_on_placements('to_split_table', 'select count(*) from %s') + ORDER BY + nodeport, shardid; + -- rows + SELECT id, value FROM to_split_table ORDER BY id, value; nodeport|shardid|success|result --------------------------------------------------------------------- 57637|1500003|t | 0 - 57637|1500005|t | 1 + 57638|1500002|t | 1 57638|1500004|t | 0 - 57638|1500006|t | 0 -(4 rows) +(3 rows) id|value --------------------------------------------------------------------- @@ -712,9 +706,9 @@ create_distributed_table (1 row) step s1-insert: - -- Id '123456789' maps to shard xxxxx. - SELECT get_shard_id_for_distribution_column('to_split_table', 123456789); - INSERT INTO to_split_table VALUES (123456789, 1); + -- Id '123456789' maps to shard xxxxx. + SELECT get_shard_id_for_distribution_column('to_split_table', 123456789); + INSERT INTO to_split_table VALUES (123456789, 1); get_shard_id_for_distribution_column --------------------------------------------------------------------- @@ -728,11 +722,11 @@ step s1-begin: SET citus.select_opens_transaction_block TO false; step s1-blocking-shard-split: - SELECT pg_catalog.citus_split_shard_by_split_points( - 1500001, - ARRAY['-1073741824'], - ARRAY[1, 2], - 'block_writes'); + SELECT pg_catalog.citus_split_shard_by_split_points( + 1500001, + ARRAY['-1073741824'], + ARRAY[1, 2], + 'block_writes'); citus_split_shard_by_split_points --------------------------------------------------------------------- @@ -740,39 +734,33 @@ citus_split_shard_by_split_points (1 row) step s2-blocking-shard-split: - SELECT pg_catalog.citus_split_shard_by_split_points( - 1500002, - ARRAY['1073741824'], - ARRAY[1, 2], - 'block_writes'); - -step s1-commit: - COMMIT; + SELECT pg_catalog.citus_split_shard_by_split_points( + 1500002, + ARRAY['1073741824'], + ARRAY[1, 2], + 'block_writes'); -step s2-blocking-shard-split: <... completed> -citus_split_shard_by_split_points ---------------------------------------------------------------------- - -(1 row) +ERROR: could not acquire the lock required to split public.to_split_table +step s1-commit: + COMMIT; step s2-print-cluster: - -- row count per shard - SELECT - nodeport, shardid, success, result - FROM - run_command_on_placements('to_split_table', 'select count(*) from %s') - ORDER BY - nodeport, shardid; - -- rows - SELECT id, value FROM to_split_table ORDER BY id, value; + -- row count per shard + SELECT + nodeport, shardid, success, result + FROM + run_command_on_placements('to_split_table', 'select count(*) from %s') + ORDER BY + nodeport, shardid; + -- rows + SELECT id, value FROM to_split_table ORDER BY id, value; nodeport|shardid|success|result --------------------------------------------------------------------- 57637|1500003|t | 0 - 57637|1500005|t | 1 + 57638|1500002|t | 1 57638|1500004|t | 0 - 57638|1500006|t | 0 -(4 rows) +(3 rows) id|value --------------------------------------------------------------------- @@ -787,8 +775,8 @@ create_distributed_table (1 row) step s1-load-cache: - -- Indirect way to load cache. - TRUNCATE to_split_table; + -- Indirect way to load cache. + TRUNCATE to_split_table; step s1-begin: BEGIN; @@ -797,7 +785,7 @@ step s1-begin: SET citus.select_opens_transaction_block TO false; step s1-select: - SELECT count(*) FROM to_split_table WHERE id = 123456789; + SELECT count(*) FROM to_split_table WHERE id = 123456789; count --------------------------------------------------------------------- @@ -805,14 +793,14 @@ count (1 row) step s2-begin: - BEGIN; + BEGIN; step s2-blocking-shard-split: - SELECT pg_catalog.citus_split_shard_by_split_points( - 1500002, - ARRAY['1073741824'], - ARRAY[1, 2], - 'block_writes'); + SELECT pg_catalog.citus_split_shard_by_split_points( + 1500002, + ARRAY['1073741824'], + ARRAY[1, 2], + 'block_writes'); citus_split_shard_by_split_points --------------------------------------------------------------------- @@ -820,25 +808,25 @@ citus_split_shard_by_split_points (1 row) step s1-ddl: - CREATE INDEX test_table_index ON to_split_table(id); + CREATE INDEX test_table_index ON to_split_table(id); step s2-commit: - COMMIT; + COMMIT; step s1-ddl: <... completed> step s1-commit: - COMMIT; + COMMIT; step s2-print-cluster: - -- row count per shard - SELECT - nodeport, shardid, success, result - FROM - run_command_on_placements('to_split_table', 'select count(*) from %s') - ORDER BY - nodeport, shardid; - -- rows - SELECT id, value FROM to_split_table ORDER BY id, value; + -- row count per shard + SELECT + nodeport, shardid, success, result + FROM + run_command_on_placements('to_split_table', 'select count(*) from %s') + ORDER BY + nodeport, shardid; + -- rows + SELECT id, value FROM to_split_table ORDER BY id, value; nodeport|shardid|success|result --------------------------------------------------------------------- @@ -852,12 +840,12 @@ id|value (0 rows) step s2-print-index-count: - SELECT - nodeport, success, result - FROM - run_command_on_placements('to_split_table', 'select count(*) from pg_indexes WHERE tablename = ''%s''') - ORDER BY - nodeport; + SELECT + nodeport, success, result + FROM + run_command_on_placements('to_split_table', 'select count(*) from pg_indexes WHERE tablename = ''%s''') + ORDER BY + nodeport; nodeport|success|result --------------------------------------------------------------------- @@ -880,7 +868,7 @@ step s1-begin: SET citus.select_opens_transaction_block TO false; step s1-select: - SELECT count(*) FROM to_split_table WHERE id = 123456789; + SELECT count(*) FROM to_split_table WHERE id = 123456789; count --------------------------------------------------------------------- @@ -888,14 +876,14 @@ count (1 row) step s2-begin: - BEGIN; + BEGIN; step s2-blocking-shard-split: - SELECT pg_catalog.citus_split_shard_by_split_points( - 1500002, - ARRAY['1073741824'], - ARRAY[1, 2], - 'block_writes'); + SELECT pg_catalog.citus_split_shard_by_split_points( + 1500002, + ARRAY['1073741824'], + ARRAY[1, 2], + 'block_writes'); citus_split_shard_by_split_points --------------------------------------------------------------------- @@ -903,25 +891,25 @@ citus_split_shard_by_split_points (1 row) step s1-ddl: - CREATE INDEX test_table_index ON to_split_table(id); + CREATE INDEX test_table_index ON to_split_table(id); step s2-commit: - COMMIT; + COMMIT; step s1-ddl: <... completed> step s1-commit: - COMMIT; + COMMIT; step s2-print-cluster: - -- row count per shard - SELECT - nodeport, shardid, success, result - FROM - run_command_on_placements('to_split_table', 'select count(*) from %s') - ORDER BY - nodeport, shardid; - -- rows - SELECT id, value FROM to_split_table ORDER BY id, value; + -- row count per shard + SELECT + nodeport, shardid, success, result + FROM + run_command_on_placements('to_split_table', 'select count(*) from %s') + ORDER BY + nodeport, shardid; + -- rows + SELECT id, value FROM to_split_table ORDER BY id, value; nodeport|shardid|success|result --------------------------------------------------------------------- @@ -935,12 +923,12 @@ id|value (0 rows) step s2-print-index-count: - SELECT - nodeport, success, result - FROM - run_command_on_placements('to_split_table', 'select count(*) from pg_indexes WHERE tablename = ''%s''') - ORDER BY - nodeport; + SELECT + nodeport, success, result + FROM + run_command_on_placements('to_split_table', 'select count(*) from pg_indexes WHERE tablename = ''%s''') + ORDER BY + nodeport; nodeport|success|result --------------------------------------------------------------------- diff --git a/src/test/regress/expected/isolation_concurrent_move_create_table.out b/src/test/regress/expected/isolation_concurrent_move_create_table.out index 343955968..4ee46db32 100644 --- a/src/test/regress/expected/isolation_concurrent_move_create_table.out +++ b/src/test/regress/expected/isolation_concurrent_move_create_table.out @@ -1,11 +1,6 @@ Parsed test spec with 5 sessions starting permutation: s2-begin s2-create_distributed_table s3-create_distributed_table s2-commit -create_distributed_table ---------------------------------------------------------------------- - -(1 row) - step s2-begin: BEGIN; @@ -30,11 +25,6 @@ step s2-commit: starting permutation: s2-begin s2-create_distributed_table s1-move-shard-logical s2-commit s3-sanity-check s3-sanity-check-2 -create_distributed_table ---------------------------------------------------------------------- - -(1 row) - step s2-begin: BEGIN; @@ -72,11 +62,6 @@ count starting permutation: s2-begin s2-create_distributed_table s1-move-shard-block s2-commit s3-sanity-check s3-sanity-check-2 -create_distributed_table ---------------------------------------------------------------------- - -(1 row) - step s2-begin: BEGIN; @@ -113,12 +98,45 @@ count (1 row) -starting permutation: s4-begin s4-move-shard-logical s5-setup-rep-factor s5-create_implicit_colocated_distributed_table s4-commit s3-sanity-check s3-sanity-check-3 s3-sanity-check-4 +starting permutation: s2-begin s2-create_distributed_table s1-split-block s2-commit s3-sanity-check s3-sanity-check-2 +step s2-begin: + BEGIN; + +step s2-create_distributed_table: + SELECT create_distributed_table('concurrent_table_2', 'id', colocate_with := 'concurrent_table_1'); + create_distributed_table --------------------------------------------------------------------- (1 row) +step s1-split-block: + WITH shardid AS (SELECT shardid FROM pg_dist_shard where logicalrelid = 'concurrent_table_1'::regclass ORDER BY shardid LIMIT 1) + SELECT citus_split_shard_by_split_points( + shardid.shardid, ARRAY['2113265921'], ARRAY[(SELECT * FROM first_node_id), (SELECT * FROM first_node_id)], 'block_writes') FROM shardid; + +ERROR: could not acquire the lock required to split public.concurrent_table_1 +step s2-commit: + COMMIT; + +step s3-sanity-check: + SELECT count(*) FROM pg_dist_shard LEFT JOIN pg_dist_shard_placement USING(shardid) WHERE nodename IS NULL; + +count +--------------------------------------------------------------------- + 0 +(1 row) + +step s3-sanity-check-2: + SELECT count(*) FROM concurrent_table_1 JOIN concurrent_table_2 USING (id); + +count +--------------------------------------------------------------------- + 0 +(1 row) + + +starting permutation: s4-begin s4-move-shard-logical s5-setup-rep-factor s5-create_implicit_colocated_distributed_table s4-commit s3-sanity-check s3-sanity-check-3 s3-sanity-check-4 step s4-begin: BEGIN; @@ -167,11 +185,6 @@ count starting permutation: s4-begin s4-move-shard-block s5-setup-rep-factor s5-create_implicit_colocated_distributed_table s4-commit s3-sanity-check s3-sanity-check-3 s3-sanity-check-4 -create_distributed_table ---------------------------------------------------------------------- - -(1 row) - step s4-begin: BEGIN; diff --git a/src/test/regress/spec/isolation_blocking_shard_split.spec b/src/test/regress/spec/isolation_blocking_shard_split.spec index ddac66f5b..bb2f93368 100644 --- a/src/test/regress/spec/isolation_blocking_shard_split.spec +++ b/src/test/regress/spec/isolation_blocking_shard_split.spec @@ -125,22 +125,22 @@ step "s2-print-index-count" // Run shard split while concurrently performing DML and index creation // We expect DML,Copy to fail because the shard they are waiting for is destroyed. - permutation "s1-load-cache" "s1-insert" "s1-begin" "s1-select" "s2-begin" "s2-blocking-shard-split" "s1-update" "s2-commit" "s1-commit" "s2-print-cluster" - permutation "s1-load-cache" "s1-insert" "s1-begin" "s1-select" "s2-begin" "s2-blocking-shard-split" "s1-delete" "s2-commit" "s1-commit" "s2-print-cluster" - permutation "s1-load-cache" "s1-begin" "s1-select" "s2-begin" "s2-blocking-shard-split" "s1-insert" "s2-commit" "s1-commit" "s2-print-cluster" - permutation "s1-load-cache" "s1-begin" "s1-select" "s2-begin" "s2-blocking-shard-split" "s1-copy" "s2-commit" "s1-commit" "s2-print-cluster" - // The same tests without loading the cache at first - permutation "s1-insert" "s1-begin" "s1-select" "s2-begin" "s2-blocking-shard-split" "s1-update" "s2-commit" "s1-commit" "s2-print-cluster" - permutation "s1-insert" "s1-begin" "s1-select" "s2-begin" "s2-blocking-shard-split" "s1-delete" "s2-commit" "s1-commit" "s2-print-cluster" - permutation "s1-begin" "s1-select" "s2-begin" "s2-blocking-shard-split" "s1-insert" "s2-commit" "s1-commit" "s2-print-cluster" - permutation "s1-begin" "s1-select" "s2-begin" "s2-blocking-shard-split" "s1-copy" "s2-commit" "s1-commit" "s2-print-cluster" +permutation "s1-load-cache" "s1-insert" "s1-begin" "s1-select" "s2-begin" "s2-blocking-shard-split" "s1-update" "s2-commit" "s1-commit" "s2-print-cluster" +permutation "s1-load-cache" "s1-insert" "s1-begin" "s1-select" "s2-begin" "s2-blocking-shard-split" "s1-delete" "s2-commit" "s1-commit" "s2-print-cluster" +permutation "s1-load-cache" "s1-begin" "s1-select" "s2-begin" "s2-blocking-shard-split" "s1-insert" "s2-commit" "s1-commit" "s2-print-cluster" +permutation "s1-load-cache" "s1-begin" "s1-select" "s2-begin" "s2-blocking-shard-split" "s1-copy" "s2-commit" "s1-commit" "s2-print-cluster" +// The same tests without loading the cache at first +permutation "s1-insert" "s1-begin" "s1-select" "s2-begin" "s2-blocking-shard-split" "s1-update" "s2-commit" "s1-commit" "s2-print-cluster" +permutation "s1-insert" "s1-begin" "s1-select" "s2-begin" "s2-blocking-shard-split" "s1-delete" "s2-commit" "s1-commit" "s2-print-cluster" +permutation "s1-begin" "s1-select" "s2-begin" "s2-blocking-shard-split" "s1-insert" "s2-commit" "s1-commit" "s2-print-cluster" +permutation "s1-begin" "s1-select" "s2-begin" "s2-blocking-shard-split" "s1-copy" "s2-commit" "s1-commit" "s2-print-cluster" // Concurrent shard split blocks on different shards of the same table (or any colocated table) - permutation "s1-load-cache" "s1-insert" "s1-begin" "s1-blocking-shard-split" "s2-blocking-shard-split" "s1-commit" "s2-print-cluster" - // The same test above without loading the cache at first - permutation "s1-insert" "s1-begin" "s1-blocking-shard-split" "s2-blocking-shard-split" "s1-commit" "s2-print-cluster" +permutation "s1-load-cache" "s1-insert" "s1-begin" "s1-blocking-shard-split" "s2-blocking-shard-split" "s1-commit" "s2-print-cluster" +// The same test above without loading the cache at first +permutation "s1-insert" "s1-begin" "s1-blocking-shard-split" "s2-blocking-shard-split" "s1-commit" "s2-print-cluster" // Concurrent DDL blocks on different shards of the same table (or any colocated table) - permutation "s1-load-cache" "s1-begin" "s1-select" "s2-begin" "s2-blocking-shard-split" "s1-ddl" "s2-commit" "s1-commit" "s2-print-cluster" "s2-print-index-count" - // The same tests without loading the cache at first - permutation "s1-begin" "s1-select" "s2-begin" "s2-blocking-shard-split" "s1-ddl" "s2-commit" "s1-commit" "s2-print-cluster" "s2-print-index-count" +permutation "s1-load-cache" "s1-begin" "s1-select" "s2-begin" "s2-blocking-shard-split" "s1-ddl" "s2-commit" "s1-commit" "s2-print-cluster" "s2-print-index-count" +// The same tests without loading the cache at first +permutation "s1-begin" "s1-select" "s2-begin" "s2-blocking-shard-split" "s1-ddl" "s2-commit" "s1-commit" "s2-print-cluster" "s2-print-index-count" diff --git a/src/test/regress/spec/isolation_concurrent_move_create_table.spec b/src/test/regress/spec/isolation_concurrent_move_create_table.spec index ae8fd0b95..48022425e 100644 --- a/src/test/regress/spec/isolation_concurrent_move_create_table.spec +++ b/src/test/regress/spec/isolation_concurrent_move_create_table.spec @@ -9,11 +9,13 @@ setup SET citus.shard_replication_factor TO 1; SELECT create_distributed_table('concurrent_table_1', 'id', colocate_with := 'none'); SELECT create_distributed_table('concurrent_table_4', 'id'); + + SELECT nodeid INTO first_node_id FROM pg_dist_node WHERE nodeport = 57637; } teardown { - DROP TABLE concurrent_table_1, concurrent_table_2, concurrent_table_3, concurrent_table_4, concurrent_table_5 CASCADE; + DROP TABLE concurrent_table_1, concurrent_table_2, concurrent_table_3, concurrent_table_4, concurrent_table_5, first_node_id CASCADE; } session "s1" @@ -31,6 +33,14 @@ step "s1-move-shard-block" SELECT citus_move_Shard_placement(shardid.shardid, 'localhost', 57637, 'localhost', 57638, 'block_writes') FROM shardid; } +step "s1-split-block" +{ + WITH shardid AS (SELECT shardid FROM pg_dist_shard where logicalrelid = 'concurrent_table_1'::regclass ORDER BY shardid LIMIT 1) + SELECT citus_split_shard_by_split_points( + shardid.shardid, ARRAY['2113265921'], ARRAY[(SELECT * FROM first_node_id), (SELECT * FROM first_node_id)], 'block_writes') FROM shardid; +} + + session "s2" step "s2-begin" @@ -119,6 +129,7 @@ permutation "s2-begin" "s2-create_distributed_table" "s3-create_distributed_tab // concurrent create colocated table and shard move properly block each other, and cluster is healthy permutation "s2-begin" "s2-create_distributed_table" "s1-move-shard-logical" "s2-commit" "s3-sanity-check" "s3-sanity-check-2" permutation "s2-begin" "s2-create_distributed_table" "s1-move-shard-block" "s2-commit" "s3-sanity-check" "s3-sanity-check-2" +permutation "s2-begin" "s2-create_distributed_table" "s1-split-block" "s2-commit" "s3-sanity-check" "s3-sanity-check-2" // same test above, but this time implicitly colocated tables permutation "s4-begin" "s4-move-shard-logical" "s5-setup-rep-factor" "s5-create_implicit_colocated_distributed_table" "s4-commit" "s3-sanity-check" "s3-sanity-check-3" "s3-sanity-check-4"