diff --git a/src/test/regress/enterprise_isolation_logicalrep_1_schedule b/src/test/regress/enterprise_isolation_logicalrep_1_schedule index 0c0a782d7..8a0e27c4a 100644 --- a/src/test/regress/enterprise_isolation_logicalrep_1_schedule +++ b/src/test/regress/enterprise_isolation_logicalrep_1_schedule @@ -8,3 +8,4 @@ test: isolation_cluster_management test: isolation_logical_replication_single_shard_commands test: isolation_logical_replication_multi_shard_commands test: isolation_non_blocking_shard_split +test: isolation_non_blocking_shard_split_with_index_as_replicaIdentity diff --git a/src/test/regress/expected/isolation_non_blocking_shard_split_with_index_as_replicaIdentity.out b/src/test/regress/expected/isolation_non_blocking_shard_split_with_index_as_replicaIdentity.out new file mode 100644 index 000000000..df959baea --- /dev/null +++ b/src/test/regress/expected/isolation_non_blocking_shard_split_with_index_as_replicaIdentity.out @@ -0,0 +1,647 @@ +unused step name: s2-insert-2 +unused step name: s2-select +Parsed test spec with 3 sessions + +starting permutation: s1-load-cache s2-print-cluster s3-acquire-advisory-lock s1-begin s2-begin s1-non-blocking-shard-split s2-insert s2-end s2-print-cluster s3-release-advisory-lock s1-end s2-print-cluster +create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +step s1-load-cache: + -- Indirect way to load cache. + TRUNCATE to_split_table; + +step s2-print-cluster: + -- row count per shard + SELECT + nodeport, shardid, success, result + FROM + run_command_on_placements('to_split_table', 'select count(*) from %s') + ORDER BY + nodeport, shardid; + -- rows + SELECT id, value FROM to_split_table ORDER BY id, value; + +nodeport|shardid|success|result +--------------------------------------------------------------------- + 57637|1500001|t | 0 +(1 row) + +id|value +--------------------------------------------------------------------- +(0 rows) + +step s3-acquire-advisory-lock: + SELECT pg_advisory_lock(44000, 55152); + +pg_advisory_lock +--------------------------------------------------------------------- + +(1 row) + +step s1-begin: + BEGIN; + +step s2-begin: + BEGIN; + +s1: WARNING: replication slot "citus_shard_split_template_slot_1500001" does not exist +step s1-non-blocking-shard-split: + SELECT pg_catalog.citus_split_shard_by_split_points( + 1500001, + ARRAY['-1073741824'], + ARRAY[2, 2], + 'force_logical'); + +step s2-insert: + SELECT get_shard_id_for_distribution_column('to_split_table', 123456789); + INSERT INTO to_split_table VALUES (123456789, 1); + +get_shard_id_for_distribution_column +--------------------------------------------------------------------- + 1500001 +(1 row) + +step s2-end: + COMMIT; + +step s2-print-cluster: + -- row count per shard + SELECT + nodeport, shardid, success, result + FROM + run_command_on_placements('to_split_table', 'select count(*) from %s') + ORDER BY + nodeport, shardid; + -- rows + SELECT id, value FROM to_split_table ORDER BY id, value; + +nodeport|shardid|success|result +--------------------------------------------------------------------- + 57637|1500001|t | 1 +(1 row) + + id|value +--------------------------------------------------------------------- +123456789| 1 +(1 row) + +step s3-release-advisory-lock: + SELECT pg_advisory_unlock(44000, 55152); + +pg_advisory_unlock +--------------------------------------------------------------------- +t +(1 row) + +step s1-non-blocking-shard-split: <... completed> +citus_split_shard_by_split_points +--------------------------------------------------------------------- + +(1 row) + +step s1-end: + COMMIT; + +step s2-print-cluster: + -- row count per shard + SELECT + nodeport, shardid, success, result + FROM + run_command_on_placements('to_split_table', 'select count(*) from %s') + ORDER BY + nodeport, shardid; + -- rows + SELECT id, value FROM to_split_table ORDER BY id, value; + +nodeport|shardid|success|result +--------------------------------------------------------------------- + 57638|1500002|t | 0 + 57638|1500003|t | 1 +(2 rows) + + id|value +--------------------------------------------------------------------- +123456789| 1 +(1 row) + + +starting permutation: s1-load-cache s2-insert s2-print-cluster s3-acquire-advisory-lock s1-begin s1-non-blocking-shard-split s2-update s3-release-advisory-lock s1-end s2-print-cluster +create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +step s1-load-cache: + -- Indirect way to load cache. + TRUNCATE to_split_table; + +step s2-insert: + SELECT get_shard_id_for_distribution_column('to_split_table', 123456789); + INSERT INTO to_split_table VALUES (123456789, 1); + +get_shard_id_for_distribution_column +--------------------------------------------------------------------- + 1500001 +(1 row) + +step s2-print-cluster: + -- row count per shard + SELECT + nodeport, shardid, success, result + FROM + run_command_on_placements('to_split_table', 'select count(*) from %s') + ORDER BY + nodeport, shardid; + -- rows + SELECT id, value FROM to_split_table ORDER BY id, value; + +nodeport|shardid|success|result +--------------------------------------------------------------------- + 57637|1500001|t | 1 +(1 row) + + id|value +--------------------------------------------------------------------- +123456789| 1 +(1 row) + +step s3-acquire-advisory-lock: + SELECT pg_advisory_lock(44000, 55152); + +pg_advisory_lock +--------------------------------------------------------------------- + +(1 row) + +step s1-begin: + BEGIN; + +s1: WARNING: replication slot "citus_shard_split_template_slot_1500001" does not exist +step s1-non-blocking-shard-split: + SELECT pg_catalog.citus_split_shard_by_split_points( + 1500001, + ARRAY['-1073741824'], + ARRAY[2, 2], + 'force_logical'); + +step s2-update: + UPDATE to_split_table SET value = 111 WHERE id = 123456789; + +step s3-release-advisory-lock: + SELECT pg_advisory_unlock(44000, 55152); + +pg_advisory_unlock +--------------------------------------------------------------------- +t +(1 row) + +step s1-non-blocking-shard-split: <... completed> +citus_split_shard_by_split_points +--------------------------------------------------------------------- + +(1 row) + +step s1-end: + COMMIT; + +step s2-print-cluster: + -- row count per shard + SELECT + nodeport, shardid, success, result + FROM + run_command_on_placements('to_split_table', 'select count(*) from %s') + ORDER BY + nodeport, shardid; + -- rows + SELECT id, value FROM to_split_table ORDER BY id, value; + +nodeport|shardid|success|result +--------------------------------------------------------------------- + 57638|1500002|t | 0 + 57638|1500003|t | 1 +(2 rows) + + id|value +--------------------------------------------------------------------- +123456789| 111 +(1 row) + + +starting permutation: s1-load-cache s2-insert s2-print-cluster s3-acquire-advisory-lock s1-begin s1-non-blocking-shard-split s2-delete s3-release-advisory-lock s1-end s2-print-cluster +create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +step s1-load-cache: + -- Indirect way to load cache. + TRUNCATE to_split_table; + +step s2-insert: + SELECT get_shard_id_for_distribution_column('to_split_table', 123456789); + INSERT INTO to_split_table VALUES (123456789, 1); + +get_shard_id_for_distribution_column +--------------------------------------------------------------------- + 1500001 +(1 row) + +step s2-print-cluster: + -- row count per shard + SELECT + nodeport, shardid, success, result + FROM + run_command_on_placements('to_split_table', 'select count(*) from %s') + ORDER BY + nodeport, shardid; + -- rows + SELECT id, value FROM to_split_table ORDER BY id, value; + +nodeport|shardid|success|result +--------------------------------------------------------------------- + 57637|1500001|t | 1 +(1 row) + + id|value +--------------------------------------------------------------------- +123456789| 1 +(1 row) + +step s3-acquire-advisory-lock: + SELECT pg_advisory_lock(44000, 55152); + +pg_advisory_lock +--------------------------------------------------------------------- + +(1 row) + +step s1-begin: + BEGIN; + +s1: WARNING: replication slot "citus_shard_split_template_slot_1500001" does not exist +step s1-non-blocking-shard-split: + SELECT pg_catalog.citus_split_shard_by_split_points( + 1500001, + ARRAY['-1073741824'], + ARRAY[2, 2], + 'force_logical'); + +step s2-delete: + DELETE FROM to_split_table WHERE id = 123456789; + +step s3-release-advisory-lock: + SELECT pg_advisory_unlock(44000, 55152); + +pg_advisory_unlock +--------------------------------------------------------------------- +t +(1 row) + +step s1-non-blocking-shard-split: <... completed> +citus_split_shard_by_split_points +--------------------------------------------------------------------- + +(1 row) + +step s1-end: + COMMIT; + +step s2-print-cluster: + -- row count per shard + SELECT + nodeport, shardid, success, result + FROM + run_command_on_placements('to_split_table', 'select count(*) from %s') + ORDER BY + nodeport, shardid; + -- rows + SELECT id, value FROM to_split_table ORDER BY id, value; + +nodeport|shardid|success|result +--------------------------------------------------------------------- + 57638|1500002|t | 0 + 57638|1500003|t | 0 +(2 rows) + +id|value +--------------------------------------------------------------------- +(0 rows) + + +starting permutation: s2-print-cluster s3-acquire-advisory-lock s1-begin s2-begin s1-non-blocking-shard-split s2-insert s2-end s2-print-cluster s3-release-advisory-lock s1-end s2-print-cluster +create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +step s2-print-cluster: + -- row count per shard + SELECT + nodeport, shardid, success, result + FROM + run_command_on_placements('to_split_table', 'select count(*) from %s') + ORDER BY + nodeport, shardid; + -- rows + SELECT id, value FROM to_split_table ORDER BY id, value; + +nodeport|shardid|success|result +--------------------------------------------------------------------- + 57637|1500001|t | 0 +(1 row) + +id|value +--------------------------------------------------------------------- +(0 rows) + +step s3-acquire-advisory-lock: + SELECT pg_advisory_lock(44000, 55152); + +pg_advisory_lock +--------------------------------------------------------------------- + +(1 row) + +step s1-begin: + BEGIN; + +step s2-begin: + BEGIN; + +s1: WARNING: replication slot "citus_shard_split_template_slot_1500001" does not exist +step s1-non-blocking-shard-split: + SELECT pg_catalog.citus_split_shard_by_split_points( + 1500001, + ARRAY['-1073741824'], + ARRAY[2, 2], + 'force_logical'); + +step s2-insert: + SELECT get_shard_id_for_distribution_column('to_split_table', 123456789); + INSERT INTO to_split_table VALUES (123456789, 1); + +get_shard_id_for_distribution_column +--------------------------------------------------------------------- + 1500001 +(1 row) + +step s2-end: + COMMIT; + +step s2-print-cluster: + -- row count per shard + SELECT + nodeport, shardid, success, result + FROM + run_command_on_placements('to_split_table', 'select count(*) from %s') + ORDER BY + nodeport, shardid; + -- rows + SELECT id, value FROM to_split_table ORDER BY id, value; + +nodeport|shardid|success|result +--------------------------------------------------------------------- + 57637|1500001|t | 1 +(1 row) + + id|value +--------------------------------------------------------------------- +123456789| 1 +(1 row) + +step s3-release-advisory-lock: + SELECT pg_advisory_unlock(44000, 55152); + +pg_advisory_unlock +--------------------------------------------------------------------- +t +(1 row) + +step s1-non-blocking-shard-split: <... completed> +citus_split_shard_by_split_points +--------------------------------------------------------------------- + +(1 row) + +step s1-end: + COMMIT; + +step s2-print-cluster: + -- row count per shard + SELECT + nodeport, shardid, success, result + FROM + run_command_on_placements('to_split_table', 'select count(*) from %s') + ORDER BY + nodeport, shardid; + -- rows + SELECT id, value FROM to_split_table ORDER BY id, value; + +nodeport|shardid|success|result +--------------------------------------------------------------------- + 57638|1500002|t | 0 + 57638|1500003|t | 1 +(2 rows) + + id|value +--------------------------------------------------------------------- +123456789| 1 +(1 row) + + +starting permutation: s2-insert s2-print-cluster s3-acquire-advisory-lock s1-begin s1-non-blocking-shard-split s2-update s3-release-advisory-lock s1-end s2-print-cluster +create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +step s2-insert: + SELECT get_shard_id_for_distribution_column('to_split_table', 123456789); + INSERT INTO to_split_table VALUES (123456789, 1); + +get_shard_id_for_distribution_column +--------------------------------------------------------------------- + 1500001 +(1 row) + +step s2-print-cluster: + -- row count per shard + SELECT + nodeport, shardid, success, result + FROM + run_command_on_placements('to_split_table', 'select count(*) from %s') + ORDER BY + nodeport, shardid; + -- rows + SELECT id, value FROM to_split_table ORDER BY id, value; + +nodeport|shardid|success|result +--------------------------------------------------------------------- + 57637|1500001|t | 1 +(1 row) + + id|value +--------------------------------------------------------------------- +123456789| 1 +(1 row) + +step s3-acquire-advisory-lock: + SELECT pg_advisory_lock(44000, 55152); + +pg_advisory_lock +--------------------------------------------------------------------- + +(1 row) + +step s1-begin: + BEGIN; + +s1: WARNING: replication slot "citus_shard_split_template_slot_1500001" does not exist +step s1-non-blocking-shard-split: + SELECT pg_catalog.citus_split_shard_by_split_points( + 1500001, + ARRAY['-1073741824'], + ARRAY[2, 2], + 'force_logical'); + +step s2-update: + UPDATE to_split_table SET value = 111 WHERE id = 123456789; + +step s3-release-advisory-lock: + SELECT pg_advisory_unlock(44000, 55152); + +pg_advisory_unlock +--------------------------------------------------------------------- +t +(1 row) + +step s1-non-blocking-shard-split: <... completed> +citus_split_shard_by_split_points +--------------------------------------------------------------------- + +(1 row) + +step s1-end: + COMMIT; + +step s2-print-cluster: + -- row count per shard + SELECT + nodeport, shardid, success, result + FROM + run_command_on_placements('to_split_table', 'select count(*) from %s') + ORDER BY + nodeport, shardid; + -- rows + SELECT id, value FROM to_split_table ORDER BY id, value; + +nodeport|shardid|success|result +--------------------------------------------------------------------- + 57638|1500002|t | 0 + 57638|1500003|t | 1 +(2 rows) + + id|value +--------------------------------------------------------------------- +123456789| 111 +(1 row) + + +starting permutation: s2-insert s2-print-cluster s3-acquire-advisory-lock s1-begin s1-non-blocking-shard-split s2-delete s3-release-advisory-lock s1-end s2-print-cluster +create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +step s2-insert: + SELECT get_shard_id_for_distribution_column('to_split_table', 123456789); + INSERT INTO to_split_table VALUES (123456789, 1); + +get_shard_id_for_distribution_column +--------------------------------------------------------------------- + 1500001 +(1 row) + +step s2-print-cluster: + -- row count per shard + SELECT + nodeport, shardid, success, result + FROM + run_command_on_placements('to_split_table', 'select count(*) from %s') + ORDER BY + nodeport, shardid; + -- rows + SELECT id, value FROM to_split_table ORDER BY id, value; + +nodeport|shardid|success|result +--------------------------------------------------------------------- + 57637|1500001|t | 1 +(1 row) + + id|value +--------------------------------------------------------------------- +123456789| 1 +(1 row) + +step s3-acquire-advisory-lock: + SELECT pg_advisory_lock(44000, 55152); + +pg_advisory_lock +--------------------------------------------------------------------- + +(1 row) + +step s1-begin: + BEGIN; + +s1: WARNING: replication slot "citus_shard_split_template_slot_1500001" does not exist +step s1-non-blocking-shard-split: + SELECT pg_catalog.citus_split_shard_by_split_points( + 1500001, + ARRAY['-1073741824'], + ARRAY[2, 2], + 'force_logical'); + +step s2-delete: + DELETE FROM to_split_table WHERE id = 123456789; + +step s3-release-advisory-lock: + SELECT pg_advisory_unlock(44000, 55152); + +pg_advisory_unlock +--------------------------------------------------------------------- +t +(1 row) + +step s1-non-blocking-shard-split: <... completed> +citus_split_shard_by_split_points +--------------------------------------------------------------------- + +(1 row) + +step s1-end: + COMMIT; + +step s2-print-cluster: + -- row count per shard + SELECT + nodeport, shardid, success, result + FROM + run_command_on_placements('to_split_table', 'select count(*) from %s') + ORDER BY + nodeport, shardid; + -- rows + SELECT id, value FROM to_split_table ORDER BY id, value; + +nodeport|shardid|success|result +--------------------------------------------------------------------- + 57638|1500002|t | 0 + 57638|1500003|t | 0 +(2 rows) + +id|value +--------------------------------------------------------------------- +(0 rows) + diff --git a/src/test/regress/spec/isolation_non_blocking_shard_split_with_index_as_replicaIdentity.spec b/src/test/regress/spec/isolation_non_blocking_shard_split_with_index_as_replicaIdentity.spec new file mode 100644 index 000000000..64316f641 --- /dev/null +++ b/src/test/regress/spec/isolation_non_blocking_shard_split_with_index_as_replicaIdentity.spec @@ -0,0 +1,132 @@ +setup +{ + SET citus.shard_count to 1; + SET citus.shard_replication_factor to 1; + SELECT setval('pg_dist_shardid_seq', 1500000); + + CREATE TABLE to_split_table (id int NOT NULL, value int); + CREATE UNIQUE INDEX split_table_index ON to_split_table(id); + ALTER TABLE to_split_table REPLICA IDENTITY USING INDEX split_table_index; + + SELECT create_distributed_table('to_split_table', 'id'); +} + +teardown +{ + DROP TABLE to_split_table CASCADE; +} + + +session "s1" + +step "s1-begin" +{ + BEGIN; +} + +// cache all placements +step "s1-load-cache" +{ + -- Indirect way to load cache. + TRUNCATE to_split_table; +} + +step "s1-non-blocking-shard-split" +{ + SELECT pg_catalog.citus_split_shard_by_split_points( + 1500001, + ARRAY['-1073741824'], + ARRAY[2, 2], + 'force_logical'); +} + +step "s1-end" +{ + COMMIT; +} + +session "s2" + +step "s2-begin" +{ + BEGIN; +} + +step "s2-insert" +{ + SELECT get_shard_id_for_distribution_column('to_split_table', 123456789); + INSERT INTO to_split_table VALUES (123456789, 1); +} + +step "s2-insert-2" +{ + SELECT get_shard_id_for_distribution_column('to_split_table', 123456789); + INSERT INTO to_split_table VALUES (1234567819, 1); +} + +step "s2-update" +{ + UPDATE to_split_table SET value = 111 WHERE id = 123456789; +} + +step "s2-delete" +{ + DELETE FROM to_split_table WHERE id = 123456789; +} + +step "s2-select" +{ + SELECT count(*) FROM to_split_table WHERE id = 123456789; +} + +step "s2-end" +{ + COMMIT; +} + +step "s2-print-cluster" +{ + -- row count per shard + SELECT + nodeport, shardid, success, result + FROM + run_command_on_placements('to_split_table', 'select count(*) from %s') + ORDER BY + nodeport, shardid; + -- rows + SELECT id, value FROM to_split_table ORDER BY id, value; +} + + +session "s3" + +// this advisory lock with (almost) random values are only used +// for testing purposes. For details, check Citus' logical replication +// source code +step "s3-acquire-advisory-lock" +{ + SELECT pg_advisory_lock(44000, 55152); +} + +step "s3-release-advisory-lock" +{ + SELECT pg_advisory_unlock(44000, 55152); +} + +##// nonblocking tests lie below ### + +// move placement first +// the following tests show the non-blocking modifications while shard is being moved +// in fact, the shard move blocks the writes for a very short duration of time +// by using an advisory and allowing the other commands continue to run, we prevent +// the modifications to block on that blocking duration +//permutation "s3-acquire-advisory-lock" "s1-begin" "s1-non-blocking-shard-split" "s2-insert" "s3-release-advisory-lock" "s1-end" "s1-select" "s1-get-shard-distribution" + + +permutation "s1-load-cache" "s2-print-cluster" "s3-acquire-advisory-lock" "s1-begin" "s2-begin" "s1-non-blocking-shard-split" "s2-insert" "s2-end" "s2-print-cluster" "s3-release-advisory-lock" "s1-end" "s2-print-cluster" +permutation "s1-load-cache" "s2-insert" "s2-print-cluster" "s3-acquire-advisory-lock" "s1-begin" "s1-non-blocking-shard-split" "s2-update" "s3-release-advisory-lock" "s1-end" "s2-print-cluster" +permutation "s1-load-cache" "s2-insert" "s2-print-cluster" "s3-acquire-advisory-lock" "s1-begin" "s1-non-blocking-shard-split" "s2-delete" "s3-release-advisory-lock" "s1-end" "s2-print-cluster" + +permutation "s2-print-cluster" "s3-acquire-advisory-lock" "s1-begin" "s2-begin" "s1-non-blocking-shard-split" "s2-insert" "s2-end" "s2-print-cluster" "s3-release-advisory-lock" "s1-end" "s2-print-cluster" +permutation "s2-insert" "s2-print-cluster" "s3-acquire-advisory-lock" "s1-begin" "s1-non-blocking-shard-split" "s2-update" "s3-release-advisory-lock" "s1-end" "s2-print-cluster" +permutation "s2-insert" "s2-print-cluster" "s3-acquire-advisory-lock" "s1-begin" "s1-non-blocking-shard-split" "s2-delete" "s3-release-advisory-lock" "s1-end" "s2-print-cluster"