diff --git a/src/backend/distributed/operations/shard_split.c b/src/backend/distributed/operations/shard_split.c index 1e617b4c5..282e07518 100644 --- a/src/backend/distributed/operations/shard_split.c +++ b/src/backend/distributed/operations/shard_split.c @@ -253,10 +253,13 @@ ErrorIfCannotSplitShardExtended(SplitOperation splitOperation, { int32 shardSplitPointValue = DatumGetInt32(shardSplitPoint); - /* All Split points should lie within the shard interval range. */ - int splitPointShardIndex = FindShardIntervalIndex(shardSplitPoint, - cachedTableEntry); - if (shardIntervalToSplit->shardIndex != splitPointShardIndex) + /* + * 1) All Split points should lie within the shard interval range. + * 2) Given our split points inclusive, you cannot specify the max value in a range as a split point. + * Example: Shard 81060002 range is from (0,1073741823). '1073741823' as split point is invalid. + * '1073741822' is correct and will split shard to: (0, 1073741822) and (1073741823, 1073741823). + */ + if (shardSplitPointValue < minValue || shardSplitPointValue > maxValue) { ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), @@ -267,6 +270,16 @@ ErrorIfCannotSplitShardExtended(SplitOperation splitOperation, DatumGetInt32(shardIntervalToSplit->maxValue), shardIntervalToSplit->shardId))); } + else if (maxValue == shardSplitPointValue) + { + int32 validSplitPoint = shardIntervalToSplit->maxValue - 1; + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg( + "Invalid split point %d, as split points should be inclusive. Please use %d instead.", + maxValue, + validSplitPoint))); + } /* Split points should be in strictly increasing order */ int32 lastShardSplitPointValue = DatumGetInt32(lastShardSplitPoint.value); @@ -282,22 +295,6 @@ ErrorIfCannotSplitShardExtended(SplitOperation splitOperation, shardSplitPointValue))); } - /* - * Given our split points inclusive, you cannot specify the max value in a range as a split point. - * Example: Shard 81060002 range is from (0,1073741823). '1073741823' as split point is invalid. - * '1073741822' is correct and will split shard to: (0, 1073741822) and (1073741823, 1073741823). - */ - if (maxValue == shardSplitPointValue) - { - int32 validSplitPoint = shardIntervalToSplit->maxValue - 1; - ereport(ERROR, - (errcode(ERRCODE_SYNTAX_ERROR), - errmsg( - "Invalid split point %d, as split points should be inclusive. Please use %d instead.", - maxValue, - validSplitPoint))); - } - lastShardSplitPoint = (NullableDatum) { shardSplitPoint, false }; diff --git a/src/test/regress/enterprise_isolation_schedule b/src/test/regress/enterprise_isolation_schedule index ef64eff92..0afef6f57 100644 --- a/src/test/regress/enterprise_isolation_schedule +++ b/src/test/regress/enterprise_isolation_schedule @@ -11,3 +11,4 @@ test: isolation_ref2ref_foreign_keys_enterprise test: isolation_pg_send_cancellation test: isolation_shard_move_vs_start_metadata_sync test: isolation_tenant_isolation +test: isolation_blocking_shard_split diff --git a/src/test/regress/expected/isolation_blocking_shard_split.out b/src/test/regress/expected/isolation_blocking_shard_split.out new file mode 100644 index 000000000..ff3c250fd --- /dev/null +++ b/src/test/regress/expected/isolation_blocking_shard_split.out @@ -0,0 +1,951 @@ +Parsed test spec with 2 sessions + +starting permutation: s1-load-cache s1-insert s1-begin s1-select s2-begin s2-blocking-shard-split s1-update s2-commit s1-commit s2-print-cluster +create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +step s1-load-cache: + -- Indirect way to load cache. + TRUNCATE to_split_table; + +step s1-insert: + -- Id '123456789' maps to shard xxxxx. + SELECT get_shard_id_for_distribution_column('to_split_table', 123456789); + INSERT INTO to_split_table VALUES (123456789, 1); + +get_shard_id_for_distribution_column +--------------------------------------------------------------------- + 1500002 +(1 row) + +step s1-begin: + BEGIN; + -- the tests are written with the logic where single shard SELECTs + -- do not to open transaction blocks + SET citus.select_opens_transaction_block TO false; + +step s1-select: + SELECT count(*) FROM to_split_table WHERE id = 123456789; + +count +--------------------------------------------------------------------- + 1 +(1 row) + +step s2-begin: + BEGIN; + +step s2-blocking-shard-split: + SELECT pg_catalog.citus_split_shard_by_split_points( + 1500002, + ARRAY['1073741824'], + ARRAY[1, 2], + 'blocking'); + +citus_split_shard_by_split_points +--------------------------------------------------------------------- + +(1 row) + +step s1-update: + UPDATE to_split_table SET value = 111 WHERE id = 123456789; + +step s2-commit: + COMMIT; + +step s1-update: <... completed> +ERROR: could not find valid entry for shard xxxxx +step s1-commit: + COMMIT; + +step s2-print-cluster: + -- row count per shard + SELECT + nodeport, shardid, success, result + FROM + run_command_on_placements('to_split_table', 'select count(*) from %s') + ORDER BY + nodeport, shardid; + -- rows + SELECT id, value FROM to_split_table ORDER BY id, value; + +nodeport|shardid|success|result +--------------------------------------------------------------------- + 57637|1500001|t | 0 + 57637|1500003|t | 1 + 57638|1500004|t | 0 +(3 rows) + + id|value +--------------------------------------------------------------------- +123456789| 1 +(1 row) + + +starting permutation: s1-load-cache s1-insert s1-begin s1-select s2-begin s2-blocking-shard-split s1-delete s2-commit s1-commit s2-print-cluster +create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +step s1-load-cache: + -- Indirect way to load cache. + TRUNCATE to_split_table; + +step s1-insert: + -- Id '123456789' maps to shard xxxxx. + SELECT get_shard_id_for_distribution_column('to_split_table', 123456789); + INSERT INTO to_split_table VALUES (123456789, 1); + +get_shard_id_for_distribution_column +--------------------------------------------------------------------- + 1500002 +(1 row) + +step s1-begin: + BEGIN; + -- the tests are written with the logic where single shard SELECTs + -- do not to open transaction blocks + SET citus.select_opens_transaction_block TO false; + +step s1-select: + SELECT count(*) FROM to_split_table WHERE id = 123456789; + +count +--------------------------------------------------------------------- + 1 +(1 row) + +step s2-begin: + BEGIN; + +step s2-blocking-shard-split: + SELECT pg_catalog.citus_split_shard_by_split_points( + 1500002, + ARRAY['1073741824'], + ARRAY[1, 2], + 'blocking'); + +citus_split_shard_by_split_points +--------------------------------------------------------------------- + +(1 row) + +step s1-delete: + DELETE FROM to_split_table WHERE id = 123456789; + +step s2-commit: + COMMIT; + +step s1-delete: <... completed> +ERROR: could not find valid entry for shard xxxxx +step s1-commit: + COMMIT; + +step s2-print-cluster: + -- row count per shard + SELECT + nodeport, shardid, success, result + FROM + run_command_on_placements('to_split_table', 'select count(*) from %s') + ORDER BY + nodeport, shardid; + -- rows + SELECT id, value FROM to_split_table ORDER BY id, value; + +nodeport|shardid|success|result +--------------------------------------------------------------------- + 57637|1500001|t | 0 + 57637|1500003|t | 1 + 57638|1500004|t | 0 +(3 rows) + + id|value +--------------------------------------------------------------------- +123456789| 1 +(1 row) + + +starting permutation: s1-load-cache s1-begin s1-select s2-begin s2-blocking-shard-split s1-insert s2-commit s1-commit s2-print-cluster +create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +step s1-load-cache: + -- Indirect way to load cache. + TRUNCATE to_split_table; + +step s1-begin: + BEGIN; + -- the tests are written with the logic where single shard SELECTs + -- do not to open transaction blocks + SET citus.select_opens_transaction_block TO false; + +step s1-select: + SELECT count(*) FROM to_split_table WHERE id = 123456789; + +count +--------------------------------------------------------------------- + 0 +(1 row) + +step s2-begin: + BEGIN; + +step s2-blocking-shard-split: + SELECT pg_catalog.citus_split_shard_by_split_points( + 1500002, + ARRAY['1073741824'], + ARRAY[1, 2], + 'blocking'); + +citus_split_shard_by_split_points +--------------------------------------------------------------------- + +(1 row) + +step s1-insert: + -- Id '123456789' maps to shard xxxxx. + SELECT get_shard_id_for_distribution_column('to_split_table', 123456789); + INSERT INTO to_split_table VALUES (123456789, 1); + +step s2-commit: + COMMIT; + +step s1-insert: <... completed> +get_shard_id_for_distribution_column +--------------------------------------------------------------------- + 1500002 +(1 row) + +ERROR: could not find valid entry for shard xxxxx +step s1-commit: + COMMIT; + +step s2-print-cluster: + -- row count per shard + SELECT + nodeport, shardid, success, result + FROM + run_command_on_placements('to_split_table', 'select count(*) from %s') + ORDER BY + nodeport, shardid; + -- rows + SELECT id, value FROM to_split_table ORDER BY id, value; + +nodeport|shardid|success|result +--------------------------------------------------------------------- + 57637|1500001|t | 0 + 57637|1500003|t | 0 + 57638|1500004|t | 0 +(3 rows) + +id|value +--------------------------------------------------------------------- +(0 rows) + + +starting permutation: s1-load-cache s1-begin s1-select s2-begin s2-blocking-shard-split s1-copy s2-commit s1-commit s2-print-cluster +create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +step s1-load-cache: + -- Indirect way to load cache. + TRUNCATE to_split_table; + +step s1-begin: + BEGIN; + -- the tests are written with the logic where single shard SELECTs + -- do not to open transaction blocks + SET citus.select_opens_transaction_block TO false; + +step s1-select: + SELECT count(*) FROM to_split_table WHERE id = 123456789; + +count +--------------------------------------------------------------------- + 0 +(1 row) + +step s2-begin: + BEGIN; + +step s2-blocking-shard-split: + SELECT pg_catalog.citus_split_shard_by_split_points( + 1500002, + ARRAY['1073741824'], + ARRAY[1, 2], + 'blocking'); + +citus_split_shard_by_split_points +--------------------------------------------------------------------- + +(1 row) + +step s1-copy: + COPY to_split_table FROM PROGRAM 'echo "1,1\n2,2\n3,3\n4,4\n5,5"' WITH CSV; + +step s2-commit: + COMMIT; + +step s1-copy: <... completed> +ERROR: could not find valid entry for shard xxxxx +step s1-commit: + COMMIT; + +step s2-print-cluster: + -- row count per shard + SELECT + nodeport, shardid, success, result + FROM + run_command_on_placements('to_split_table', 'select count(*) from %s') + ORDER BY + nodeport, shardid; + -- rows + SELECT id, value FROM to_split_table ORDER BY id, value; + +nodeport|shardid|success|result +--------------------------------------------------------------------- + 57637|1500001|t | 0 + 57637|1500003|t | 0 + 57638|1500004|t | 0 +(3 rows) + +id|value +--------------------------------------------------------------------- +(0 rows) + + +starting permutation: s1-insert s1-begin s1-select s2-begin s2-blocking-shard-split s1-update s2-commit s1-commit s2-print-cluster +create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +step s1-insert: + -- Id '123456789' maps to shard xxxxx. + SELECT get_shard_id_for_distribution_column('to_split_table', 123456789); + INSERT INTO to_split_table VALUES (123456789, 1); + +get_shard_id_for_distribution_column +--------------------------------------------------------------------- + 1500002 +(1 row) + +step s1-begin: + BEGIN; + -- the tests are written with the logic where single shard SELECTs + -- do not to open transaction blocks + SET citus.select_opens_transaction_block TO false; + +step s1-select: + SELECT count(*) FROM to_split_table WHERE id = 123456789; + +count +--------------------------------------------------------------------- + 1 +(1 row) + +step s2-begin: + BEGIN; + +step s2-blocking-shard-split: + SELECT pg_catalog.citus_split_shard_by_split_points( + 1500002, + ARRAY['1073741824'], + ARRAY[1, 2], + 'blocking'); + +citus_split_shard_by_split_points +--------------------------------------------------------------------- + +(1 row) + +step s1-update: + UPDATE to_split_table SET value = 111 WHERE id = 123456789; + +step s2-commit: + COMMIT; + +step s1-update: <... completed> +ERROR: could not find valid entry for shard xxxxx +step s1-commit: + COMMIT; + +step s2-print-cluster: + -- row count per shard + SELECT + nodeport, shardid, success, result + FROM + run_command_on_placements('to_split_table', 'select count(*) from %s') + ORDER BY + nodeport, shardid; + -- rows + SELECT id, value FROM to_split_table ORDER BY id, value; + +nodeport|shardid|success|result +--------------------------------------------------------------------- + 57637|1500001|t | 0 + 57637|1500003|t | 1 + 57638|1500004|t | 0 +(3 rows) + + id|value +--------------------------------------------------------------------- +123456789| 1 +(1 row) + + +starting permutation: s1-insert s1-begin s1-select s2-begin s2-blocking-shard-split s1-delete s2-commit s1-commit s2-print-cluster +create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +step s1-insert: + -- Id '123456789' maps to shard xxxxx. + SELECT get_shard_id_for_distribution_column('to_split_table', 123456789); + INSERT INTO to_split_table VALUES (123456789, 1); + +get_shard_id_for_distribution_column +--------------------------------------------------------------------- + 1500002 +(1 row) + +step s1-begin: + BEGIN; + -- the tests are written with the logic where single shard SELECTs + -- do not to open transaction blocks + SET citus.select_opens_transaction_block TO false; + +step s1-select: + SELECT count(*) FROM to_split_table WHERE id = 123456789; + +count +--------------------------------------------------------------------- + 1 +(1 row) + +step s2-begin: + BEGIN; + +step s2-blocking-shard-split: + SELECT pg_catalog.citus_split_shard_by_split_points( + 1500002, + ARRAY['1073741824'], + ARRAY[1, 2], + 'blocking'); + +citus_split_shard_by_split_points +--------------------------------------------------------------------- + +(1 row) + +step s1-delete: + DELETE FROM to_split_table WHERE id = 123456789; + +step s2-commit: + COMMIT; + +step s1-delete: <... completed> +ERROR: could not find valid entry for shard xxxxx +step s1-commit: + COMMIT; + +step s2-print-cluster: + -- row count per shard + SELECT + nodeport, shardid, success, result + FROM + run_command_on_placements('to_split_table', 'select count(*) from %s') + ORDER BY + nodeport, shardid; + -- rows + SELECT id, value FROM to_split_table ORDER BY id, value; + +nodeport|shardid|success|result +--------------------------------------------------------------------- + 57637|1500001|t | 0 + 57637|1500003|t | 1 + 57638|1500004|t | 0 +(3 rows) + + id|value +--------------------------------------------------------------------- +123456789| 1 +(1 row) + + +starting permutation: s1-begin s1-select s2-begin s2-blocking-shard-split s1-insert s2-commit s1-commit s2-print-cluster +create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +step s1-begin: + BEGIN; + -- the tests are written with the logic where single shard SELECTs + -- do not to open transaction blocks + SET citus.select_opens_transaction_block TO false; + +step s1-select: + SELECT count(*) FROM to_split_table WHERE id = 123456789; + +count +--------------------------------------------------------------------- + 0 +(1 row) + +step s2-begin: + BEGIN; + +step s2-blocking-shard-split: + SELECT pg_catalog.citus_split_shard_by_split_points( + 1500002, + ARRAY['1073741824'], + ARRAY[1, 2], + 'blocking'); + +citus_split_shard_by_split_points +--------------------------------------------------------------------- + +(1 row) + +step s1-insert: + -- Id '123456789' maps to shard xxxxx. + SELECT get_shard_id_for_distribution_column('to_split_table', 123456789); + INSERT INTO to_split_table VALUES (123456789, 1); + +step s2-commit: + COMMIT; + +step s1-insert: <... completed> +get_shard_id_for_distribution_column +--------------------------------------------------------------------- + 1500002 +(1 row) + +ERROR: could not find valid entry for shard xxxxx +step s1-commit: + COMMIT; + +step s2-print-cluster: + -- row count per shard + SELECT + nodeport, shardid, success, result + FROM + run_command_on_placements('to_split_table', 'select count(*) from %s') + ORDER BY + nodeport, shardid; + -- rows + SELECT id, value FROM to_split_table ORDER BY id, value; + +nodeport|shardid|success|result +--------------------------------------------------------------------- + 57637|1500001|t | 0 + 57637|1500003|t | 0 + 57638|1500004|t | 0 +(3 rows) + +id|value +--------------------------------------------------------------------- +(0 rows) + + +starting permutation: s1-begin s1-select s2-begin s2-blocking-shard-split s1-copy s2-commit s1-commit s2-print-cluster +create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +step s1-begin: + BEGIN; + -- the tests are written with the logic where single shard SELECTs + -- do not to open transaction blocks + SET citus.select_opens_transaction_block TO false; + +step s1-select: + SELECT count(*) FROM to_split_table WHERE id = 123456789; + +count +--------------------------------------------------------------------- + 0 +(1 row) + +step s2-begin: + BEGIN; + +step s2-blocking-shard-split: + SELECT pg_catalog.citus_split_shard_by_split_points( + 1500002, + ARRAY['1073741824'], + ARRAY[1, 2], + 'blocking'); + +citus_split_shard_by_split_points +--------------------------------------------------------------------- + +(1 row) + +step s1-copy: + COPY to_split_table FROM PROGRAM 'echo "1,1\n2,2\n3,3\n4,4\n5,5"' WITH CSV; + +step s2-commit: + COMMIT; + +step s1-copy: <... completed> +ERROR: could not find valid entry for shard xxxxx +step s1-commit: + COMMIT; + +step s2-print-cluster: + -- row count per shard + SELECT + nodeport, shardid, success, result + FROM + run_command_on_placements('to_split_table', 'select count(*) from %s') + ORDER BY + nodeport, shardid; + -- rows + SELECT id, value FROM to_split_table ORDER BY id, value; + +nodeport|shardid|success|result +--------------------------------------------------------------------- + 57637|1500001|t | 0 + 57637|1500003|t | 0 + 57638|1500004|t | 0 +(3 rows) + +id|value +--------------------------------------------------------------------- +(0 rows) + + +starting permutation: s1-load-cache s1-insert s1-begin s1-blocking-shard-split s2-blocking-shard-split s1-commit s2-print-cluster +create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +step s1-load-cache: + -- Indirect way to load cache. + TRUNCATE to_split_table; + +step s1-insert: + -- Id '123456789' maps to shard xxxxx. + SELECT get_shard_id_for_distribution_column('to_split_table', 123456789); + INSERT INTO to_split_table VALUES (123456789, 1); + +get_shard_id_for_distribution_column +--------------------------------------------------------------------- + 1500002 +(1 row) + +step s1-begin: + BEGIN; + -- the tests are written with the logic where single shard SELECTs + -- do not to open transaction blocks + SET citus.select_opens_transaction_block TO false; + +step s1-blocking-shard-split: + SELECT pg_catalog.citus_split_shard_by_split_points( + 1500001, + ARRAY['-1073741824'], + ARRAY[1, 2], + 'blocking'); + +citus_split_shard_by_split_points +--------------------------------------------------------------------- + +(1 row) + +step s2-blocking-shard-split: + SELECT pg_catalog.citus_split_shard_by_split_points( + 1500002, + ARRAY['1073741824'], + ARRAY[1, 2], + 'blocking'); + +step s1-commit: + COMMIT; + +step s2-blocking-shard-split: <... completed> +citus_split_shard_by_split_points +--------------------------------------------------------------------- + +(1 row) + +step s2-print-cluster: + -- row count per shard + SELECT + nodeport, shardid, success, result + FROM + run_command_on_placements('to_split_table', 'select count(*) from %s') + ORDER BY + nodeport, shardid; + -- rows + SELECT id, value FROM to_split_table ORDER BY id, value; + +nodeport|shardid|success|result +--------------------------------------------------------------------- + 57637|1500003|t | 0 + 57637|1500005|t | 1 + 57638|1500004|t | 0 + 57638|1500006|t | 0 +(4 rows) + + id|value +--------------------------------------------------------------------- +123456789| 1 +(1 row) + + +starting permutation: s1-insert s1-begin s1-blocking-shard-split s2-blocking-shard-split s1-commit s2-print-cluster +create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +step s1-insert: + -- Id '123456789' maps to shard xxxxx. + SELECT get_shard_id_for_distribution_column('to_split_table', 123456789); + INSERT INTO to_split_table VALUES (123456789, 1); + +get_shard_id_for_distribution_column +--------------------------------------------------------------------- + 1500002 +(1 row) + +step s1-begin: + BEGIN; + -- the tests are written with the logic where single shard SELECTs + -- do not to open transaction blocks + SET citus.select_opens_transaction_block TO false; + +step s1-blocking-shard-split: + SELECT pg_catalog.citus_split_shard_by_split_points( + 1500001, + ARRAY['-1073741824'], + ARRAY[1, 2], + 'blocking'); + +citus_split_shard_by_split_points +--------------------------------------------------------------------- + +(1 row) + +step s2-blocking-shard-split: + SELECT pg_catalog.citus_split_shard_by_split_points( + 1500002, + ARRAY['1073741824'], + ARRAY[1, 2], + 'blocking'); + +step s1-commit: + COMMIT; + +step s2-blocking-shard-split: <... completed> +citus_split_shard_by_split_points +--------------------------------------------------------------------- + +(1 row) + +step s2-print-cluster: + -- row count per shard + SELECT + nodeport, shardid, success, result + FROM + run_command_on_placements('to_split_table', 'select count(*) from %s') + ORDER BY + nodeport, shardid; + -- rows + SELECT id, value FROM to_split_table ORDER BY id, value; + +nodeport|shardid|success|result +--------------------------------------------------------------------- + 57637|1500003|t | 0 + 57637|1500005|t | 1 + 57638|1500004|t | 0 + 57638|1500006|t | 0 +(4 rows) + + id|value +--------------------------------------------------------------------- +123456789| 1 +(1 row) + + +starting permutation: s1-load-cache s1-begin s1-select s2-begin s2-blocking-shard-split s1-ddl s2-commit s1-commit s2-print-cluster s2-print-index-count +create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +step s1-load-cache: + -- Indirect way to load cache. + TRUNCATE to_split_table; + +step s1-begin: + BEGIN; + -- the tests are written with the logic where single shard SELECTs + -- do not to open transaction blocks + SET citus.select_opens_transaction_block TO false; + +step s1-select: + SELECT count(*) FROM to_split_table WHERE id = 123456789; + +count +--------------------------------------------------------------------- + 0 +(1 row) + +step s2-begin: + BEGIN; + +step s2-blocking-shard-split: + SELECT pg_catalog.citus_split_shard_by_split_points( + 1500002, + ARRAY['1073741824'], + ARRAY[1, 2], + 'blocking'); + +citus_split_shard_by_split_points +--------------------------------------------------------------------- + +(1 row) + +step s1-ddl: + CREATE INDEX test_table_index ON to_split_table(id); + +step s2-commit: + COMMIT; + +step s1-ddl: <... completed> +step s1-commit: + COMMIT; + +step s2-print-cluster: + -- row count per shard + SELECT + nodeport, shardid, success, result + FROM + run_command_on_placements('to_split_table', 'select count(*) from %s') + ORDER BY + nodeport, shardid; + -- rows + SELECT id, value FROM to_split_table ORDER BY id, value; + +nodeport|shardid|success|result +--------------------------------------------------------------------- + 57637|1500001|t | 0 + 57637|1500003|t | 0 + 57638|1500004|t | 0 +(3 rows) + +id|value +--------------------------------------------------------------------- +(0 rows) + +step s2-print-index-count: + SELECT + nodeport, success, result + FROM + run_command_on_placements('to_split_table', 'select count(*) from pg_indexes WHERE tablename = ''%s''') + ORDER BY + nodeport; + +nodeport|success|result +--------------------------------------------------------------------- + 57637|t | 1 + 57637|t | 1 + 57638|t | 1 +(3 rows) + + +starting permutation: s1-begin s1-select s2-begin s2-blocking-shard-split s1-ddl s2-commit s1-commit s2-print-cluster s2-print-index-count +create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +step s1-begin: + BEGIN; + -- the tests are written with the logic where single shard SELECTs + -- do not to open transaction blocks + SET citus.select_opens_transaction_block TO false; + +step s1-select: + SELECT count(*) FROM to_split_table WHERE id = 123456789; + +count +--------------------------------------------------------------------- + 0 +(1 row) + +step s2-begin: + BEGIN; + +step s2-blocking-shard-split: + SELECT pg_catalog.citus_split_shard_by_split_points( + 1500002, + ARRAY['1073741824'], + ARRAY[1, 2], + 'blocking'); + +citus_split_shard_by_split_points +--------------------------------------------------------------------- + +(1 row) + +step s1-ddl: + CREATE INDEX test_table_index ON to_split_table(id); + +step s2-commit: + COMMIT; + +step s1-ddl: <... completed> +step s1-commit: + COMMIT; + +step s2-print-cluster: + -- row count per shard + SELECT + nodeport, shardid, success, result + FROM + run_command_on_placements('to_split_table', 'select count(*) from %s') + ORDER BY + nodeport, shardid; + -- rows + SELECT id, value FROM to_split_table ORDER BY id, value; + +nodeport|shardid|success|result +--------------------------------------------------------------------- + 57637|1500001|t | 0 + 57637|1500003|t | 0 + 57638|1500004|t | 0 +(3 rows) + +id|value +--------------------------------------------------------------------- +(0 rows) + +step s2-print-index-count: + SELECT + nodeport, success, result + FROM + run_command_on_placements('to_split_table', 'select count(*) from pg_indexes WHERE tablename = ''%s''') + ORDER BY + nodeport; + +nodeport|success|result +--------------------------------------------------------------------- + 57637|t | 1 + 57637|t | 1 + 57638|t | 1 +(3 rows) + diff --git a/src/test/regress/spec/isolation_blocking_shard_split.spec b/src/test/regress/spec/isolation_blocking_shard_split.spec new file mode 100644 index 000000000..815bd2385 --- /dev/null +++ b/src/test/regress/spec/isolation_blocking_shard_split.spec @@ -0,0 +1,146 @@ +setup +{ + SET citus.shard_count to 2; + SET citus.shard_replication_factor to 1; + select setval('pg_dist_shardid_seq', 1500000); + + CREATE TABLE to_split_table (id int, value int); + SELECT create_distributed_table('to_split_table', 'id'); +} + +teardown +{ + DROP TABLE to_split_table; +} + +session "s1" + +step "s1-begin" +{ + BEGIN; + + -- the tests are written with the logic where single shard SELECTs + -- do not to open transaction blocks + SET citus.select_opens_transaction_block TO false; +} + +// cache all placements +step "s1-load-cache" +{ + -- Indirect way to load cache. + TRUNCATE to_split_table; +} + +step "s1-insert" +{ + -- Id '123456789' maps to shard 1500002. + SELECT get_shard_id_for_distribution_column('to_split_table', 123456789); + + INSERT INTO to_split_table VALUES (123456789, 1); +} + +step "s1-update" +{ + UPDATE to_split_table SET value = 111 WHERE id = 123456789; +} + +step "s1-delete" +{ + DELETE FROM to_split_table WHERE id = 123456789; +} + +step "s1-select" +{ + SELECT count(*) FROM to_split_table WHERE id = 123456789; +} + +step "s1-ddl" +{ + CREATE INDEX test_table_index ON to_split_table(id); +} + +step "s1-copy" +{ + COPY to_split_table FROM PROGRAM 'echo "1,1\n2,2\n3,3\n4,4\n5,5"' WITH CSV; +} + +step "s1-blocking-shard-split" +{ + SELECT pg_catalog.citus_split_shard_by_split_points( + 1500001, + ARRAY['-1073741824'], + ARRAY[1, 2], + 'blocking'); +} + +step "s1-commit" +{ + COMMIT; +} + +session "s2" + +step "s2-begin" +{ + BEGIN; +} + +step "s2-blocking-shard-split" +{ + SELECT pg_catalog.citus_split_shard_by_split_points( + 1500002, + ARRAY['1073741824'], + ARRAY[1, 2], + 'blocking'); +} + +step "s2-commit" +{ + COMMIT; +} + +step "s2-print-cluster" +{ + -- row count per shard + SELECT + nodeport, shardid, success, result + FROM + run_command_on_placements('to_split_table', 'select count(*) from %s') + ORDER BY + nodeport, shardid; + + -- rows + SELECT id, value FROM to_split_table ORDER BY id, value; +} + +step "s2-print-index-count" +{ + SELECT + nodeport, success, result + FROM + run_command_on_placements('to_split_table', 'select count(*) from pg_indexes WHERE tablename = ''%s''') + ORDER BY + nodeport; +} + +// Run shard split while concurrently performing DML and index creation +// We expect DML,Copy to fail because the shard they are waiting for is destroyed. + permutation "s1-load-cache" "s1-insert" "s1-begin" "s1-select" "s2-begin" "s2-blocking-shard-split" "s1-update" "s2-commit" "s1-commit" "s2-print-cluster" + permutation "s1-load-cache" "s1-insert" "s1-begin" "s1-select" "s2-begin" "s2-blocking-shard-split" "s1-delete" "s2-commit" "s1-commit" "s2-print-cluster" + permutation "s1-load-cache" "s1-begin" "s1-select" "s2-begin" "s2-blocking-shard-split" "s1-insert" "s2-commit" "s1-commit" "s2-print-cluster" + permutation "s1-load-cache" "s1-begin" "s1-select" "s2-begin" "s2-blocking-shard-split" "s1-copy" "s2-commit" "s1-commit" "s2-print-cluster" + // The same tests without loading the cache at first + permutation "s1-insert" "s1-begin" "s1-select" "s2-begin" "s2-blocking-shard-split" "s1-update" "s2-commit" "s1-commit" "s2-print-cluster" + permutation "s1-insert" "s1-begin" "s1-select" "s2-begin" "s2-blocking-shard-split" "s1-delete" "s2-commit" "s1-commit" "s2-print-cluster" + permutation "s1-begin" "s1-select" "s2-begin" "s2-blocking-shard-split" "s1-insert" "s2-commit" "s1-commit" "s2-print-cluster" + permutation "s1-begin" "s1-select" "s2-begin" "s2-blocking-shard-split" "s1-copy" "s2-commit" "s1-commit" "s2-print-cluster" + +// Concurrent shard split blocks on different shards of the same table (or any colocated table) + permutation "s1-load-cache" "s1-insert" "s1-begin" "s1-blocking-shard-split" "s2-blocking-shard-split" "s1-commit" "s2-print-cluster" + // The same test above without loading the cache at first + permutation "s1-insert" "s1-begin" "s1-blocking-shard-split" "s2-blocking-shard-split" "s1-commit" "s2-print-cluster" + +// Concurrent DDL blocks on different shards of the same table (or any colocated table) + permutation "s1-load-cache" "s1-begin" "s1-select" "s2-begin" "s2-blocking-shard-split" "s1-ddl" "s2-commit" "s1-commit" "s2-print-cluster" "s2-print-index-count" + // The same tests without loading the cache at first + permutation "s1-begin" "s1-select" "s2-begin" "s2-blocking-shard-split" "s1-ddl" "s2-commit" "s1-commit" "s2-print-cluster" "s2-print-index-count"