diff --git a/src/backend/distributed/planner/multi_physical_planner.c b/src/backend/distributed/planner/multi_physical_planner.c index be6caf0e2..f488a1cd5 100644 --- a/src/backend/distributed/planner/multi_physical_planner.c +++ b/src/backend/distributed/planner/multi_physical_planner.c @@ -4573,7 +4573,8 @@ RowModifyLevelForQuery(Query *query) } if (commandType == CMD_UPDATE || - commandType == CMD_DELETE) + commandType == CMD_DELETE || + commandType == CMD_MERGE) { return ROW_MODIFY_NONCOMMUTATIVE; } diff --git a/src/test/regress/expected/isolation_merge.out b/src/test/regress/expected/isolation_merge.out new file mode 100644 index 000000000..d78c46c64 --- /dev/null +++ b/src/test/regress/expected/isolation_merge.out @@ -0,0 +1,147 @@ +Parsed test spec with 2 sessions + +starting permutation: s1-begin s1-upd-ins s2-result s1-commit s2-result +step s1-begin: BEGIN; +step s1-upd-ins: MERGE INTO prept USING preps ON prept.t1 = preps.s1 + WHEN MATCHED THEN UPDATE SET t2 = t2 + 1 + WHEN NOT MATCHED THEN INSERT VALUES(s1, s2); + +step s2-result: SELECT * FROM prept ORDER BY 1; + t1|t2 +--------------------------------------------------------------------- +100| 0 +(1 row) + +step s1-commit: COMMIT; +step s2-result: SELECT * FROM prept ORDER BY 1; + t1|t2 +--------------------------------------------------------------------- +100| 1 +200| 0 +(2 rows) + + +starting permutation: s1-begin s1-upd-ins s2-begin s2-upd-del s1-commit s2-commit s2-result +step s1-begin: BEGIN; +step s1-upd-ins: MERGE INTO prept USING preps ON prept.t1 = preps.s1 + WHEN MATCHED THEN UPDATE SET t2 = t2 + 1 + WHEN NOT MATCHED THEN INSERT VALUES(s1, s2); + +step s2-begin: BEGIN; +step s2-upd-del: MERGE INTO prept USING preps ON prept.t1 = preps.s1 + WHEN MATCHED AND prept.t2 = 0 THEN DELETE + WHEN MATCHED THEN UPDATE SET t2 = t2 + 1; + +step s1-commit: COMMIT; +step s2-upd-del: <... completed> +step s2-commit: COMMIT; +step s2-result: SELECT * FROM prept ORDER BY 1; + t1|t2 +--------------------------------------------------------------------- +100| 2 +(1 row) + + +starting permutation: s2-begin s2-upd-del s1-begin s1-upd-ins s2-commit s1-commit s2-result +step s2-begin: BEGIN; +step s2-upd-del: MERGE INTO prept USING preps ON prept.t1 = preps.s1 + WHEN MATCHED AND prept.t2 = 0 THEN DELETE + WHEN MATCHED THEN UPDATE SET t2 = t2 + 1; + +step s1-begin: BEGIN; +step s1-upd-ins: MERGE INTO prept USING preps ON prept.t1 = preps.s1 + WHEN MATCHED THEN UPDATE SET t2 = t2 + 1 + WHEN NOT MATCHED THEN INSERT VALUES(s1, s2); + +step s2-commit: COMMIT; +step s1-upd-ins: <... completed> +step s1-commit: COMMIT; +step s2-result: SELECT * FROM prept ORDER BY 1; + t1|t2 +--------------------------------------------------------------------- +100| 0 +200| 0 +(2 rows) + + +starting permutation: s1-begin s1-upd-ins s2-begin s2-upd s1-commit s2-commit s2-result +step s1-begin: BEGIN; +step s1-upd-ins: MERGE INTO prept USING preps ON prept.t1 = preps.s1 + WHEN MATCHED THEN UPDATE SET t2 = t2 + 1 + WHEN NOT MATCHED THEN INSERT VALUES(s1, s2); + +step s2-begin: BEGIN; +step s2-upd: MERGE INTO prept USING preps ON prept.t1 = preps.s1 + WHEN MATCHED THEN UPDATE SET t2 = t2 + 1; + +step s1-commit: COMMIT; +step s2-upd: <... completed> +step s2-commit: COMMIT; +step s2-result: SELECT * FROM prept ORDER BY 1; + t1|t2 +--------------------------------------------------------------------- +100| 2 +200| 1 +(2 rows) + + +starting permutation: s2-begin s2-ins s1-begin s1-del s2-upd s2-result s2-commit s1-commit s2-result +step s2-begin: BEGIN; +step s2-ins: MERGE INTO prept USING preps ON prept.t1 = preps.s1 + WHEN NOT MATCHED THEN INSERT VALUES(s1, s2); + +step s1-begin: BEGIN; +step s1-del: MERGE INTO prept USING preps ON prept.t1 = preps.s1 + WHEN MATCHED THEN DELETE; + +step s2-upd: MERGE INTO prept USING preps ON prept.t1 = preps.s1 + WHEN MATCHED THEN UPDATE SET t2 = t2 + 1; + +step s2-result: SELECT * FROM prept ORDER BY 1; + t1|t2 +--------------------------------------------------------------------- +100| 1 +200| 1 +(2 rows) + +step s2-commit: COMMIT; +step s1-del: <... completed> +step s1-commit: COMMIT; +step s2-result: SELECT * FROM prept ORDER BY 1; +t1|t2 +--------------------------------------------------------------------- +(0 rows) + + +starting permutation: s1-begin s1-del-ins s2-begin s2-upd s1-result s1-ins s1-commit s2-upd s2-commit s2-result +step s1-begin: BEGIN; +step s1-del-ins: MERGE INTO prept USING preps ON prept.t1 = preps.s1 + WHEN MATCHED THEN DELETE + WHEN NOT MATCHED THEN INSERT VALUES(s1, s2); + +step s2-begin: BEGIN; +step s2-upd: MERGE INTO prept USING preps ON prept.t1 = preps.s1 + WHEN MATCHED THEN UPDATE SET t2 = t2 + 1; + +step s1-result: SELECT * FROM prept ORDER BY 1; + t1|t2 +--------------------------------------------------------------------- +200| 0 +(1 row) + +step s1-ins: MERGE INTO prept USING preps ON prept.t1 = preps.s1 + WHEN NOT MATCHED THEN INSERT VALUES(s1, s2); + +step s1-commit: COMMIT; +step s2-upd: <... completed> +step s2-upd: MERGE INTO prept USING preps ON prept.t1 = preps.s1 + WHEN MATCHED THEN UPDATE SET t2 = t2 + 1; + +step s2-commit: COMMIT; +step s2-result: SELECT * FROM prept ORDER BY 1; + t1|t2 +--------------------------------------------------------------------- +100| 2 +200| 2 +(2 rows) + diff --git a/src/test/regress/expected/isolation_merge_0.out b/src/test/regress/expected/isolation_merge_0.out new file mode 100644 index 000000000..3b43a25e6 --- /dev/null +++ b/src/test/regress/expected/isolation_merge_0.out @@ -0,0 +1,5 @@ +Parsed test spec with 2 sessions + +starting permutation: s1-begin s1-upd-ins s2-result s1-commit s2-result +setup failed: ERROR: MERGE is not supported on PG versions below 15 +CONTEXT: PL/pgSQL function inline_code_block line XX at RAISE diff --git a/src/test/regress/expected/isolation_merge_replicated.out b/src/test/regress/expected/isolation_merge_replicated.out new file mode 100644 index 000000000..e7e8b36ba --- /dev/null +++ b/src/test/regress/expected/isolation_merge_replicated.out @@ -0,0 +1,26 @@ +Parsed test spec with 2 sessions + +starting permutation: s1-begin s1-upd-ins s2-begin s2-update s1-commit s2-commit s1-result s2-result +step s1-begin: BEGIN; +step s1-upd-ins: MERGE INTO prept USING preps ON prept.t1 = preps.s1 + WHEN MATCHED THEN UPDATE SET t2 = t2 + 1 + WHEN NOT MATCHED THEN INSERT VALUES(s1, s2); +step s2-begin: BEGIN; +step s2-update: UPDATE preps SET s2 = s2 + 1; +step s1-commit: COMMIT; +step s2-update: <... completed> +step s2-commit: COMMIT; +step s1-result: SELECT * FROM preps ORDER BY 1; + s1|s2 +--------------------------------------------------------------------- +100| 1 +200| 1 +(2 rows) + +step s2-result: SELECT * FROM prept ORDER BY 1; + t1|t2 +--------------------------------------------------------------------- +100| 1 +200| 0 +(2 rows) + diff --git a/src/test/regress/expected/isolation_merge_replicated_0.out b/src/test/regress/expected/isolation_merge_replicated_0.out new file mode 100644 index 000000000..51161dfb7 --- /dev/null +++ b/src/test/regress/expected/isolation_merge_replicated_0.out @@ -0,0 +1,5 @@ +Parsed test spec with 2 sessions + +starting permutation: s1-begin s1-upd-ins s2-begin s2-update s1-commit s2-commit s1-result s2-result +setup failed: ERROR: MERGE is not supported on PG versions below 15 +CONTEXT: PL/pgSQL function inline_code_block line XX at RAISE diff --git a/src/test/regress/isolation_schedule b/src/test/regress/isolation_schedule index 90d1463ad..ca893ba8d 100644 --- a/src/test/regress/isolation_schedule +++ b/src/test/regress/isolation_schedule @@ -107,4 +107,10 @@ test: isolation_multiuser_locking test: isolation_acquire_distributed_locks test: isolation_concurrent_move_create_table +# MERGE +test: isolation_merge +test: isolation_merge_replicated + + +# Note: Always keep this test at the end test: isolation_check_mx diff --git a/src/test/regress/spec/isolation_merge.spec b/src/test/regress/spec/isolation_merge.spec new file mode 100644 index 000000000..042ce9155 --- /dev/null +++ b/src/test/regress/spec/isolation_merge.spec @@ -0,0 +1,92 @@ +// +// How we organize this isolation test spec, is explained at README.md file in this directory. +// + +// create distributed tables to test behavior of MERGE in concurrent operations +setup +{ + DO + $do$ + DECLARE ver int; + BEGIN + SELECT substring(version(), '\d+')::int into ver; + IF (ver < 15) + THEN + RAISE EXCEPTION 'MERGE is not supported on PG versions below 15'; + END IF; + END + $do$; + + SET citus.shard_replication_factor TO 1; + SELECT 1 FROM master_add_node('localhost', 57637); + SELECT 1 FROM master_add_node('localhost', 57638); + + CREATE TABLE prept(t1 int, t2 int); + CREATE TABLE preps(s1 int, s2 int); + SELECT create_distributed_table('prept', 't1'), create_distributed_table('preps', 's1'); + INSERT INTO prept VALUES(100, 0); + INSERT INTO preps VALUES(100, 0); + INSERT INTO preps VALUES(200, 0); +} + +// drop distributed tables +teardown +{ + DROP TABLE IF EXISTS prept CASCADE; + DROP TABLE IF EXISTS preps CASCADE; +} + +// session 1 +session "s1" + +step "s1-begin" { BEGIN; } + +step "s1-upd-ins" { MERGE INTO prept USING preps ON prept.t1 = preps.s1 + WHEN MATCHED THEN UPDATE SET t2 = t2 + 1 + WHEN NOT MATCHED THEN INSERT VALUES(s1, s2); + } + +step "s1-del-ins" { MERGE INTO prept USING preps ON prept.t1 = preps.s1 + WHEN MATCHED THEN DELETE + WHEN NOT MATCHED THEN INSERT VALUES(s1, s2); + } + +step "s1-del" { MERGE INTO prept USING preps ON prept.t1 = preps.s1 + WHEN MATCHED THEN DELETE; + } + +step "s1-ins" { MERGE INTO prept USING preps ON prept.t1 = preps.s1 + WHEN NOT MATCHED THEN INSERT VALUES(s1, s2); + } + +step "s1-commit" { COMMIT; } +step "s1-result" { SELECT * FROM prept ORDER BY 1; } + +// session 2 +session "s2" + +step "s2-begin" { BEGIN; } + +step "s2-upd-del" { MERGE INTO prept USING preps ON prept.t1 = preps.s1 + WHEN MATCHED AND prept.t2 = 0 THEN DELETE + WHEN MATCHED THEN UPDATE SET t2 = t2 + 1; + } + +step "s2-upd" { MERGE INTO prept USING preps ON prept.t1 = preps.s1 + WHEN MATCHED THEN UPDATE SET t2 = t2 + 1; + } + +step "s2-ins" { MERGE INTO prept USING preps ON prept.t1 = preps.s1 + WHEN NOT MATCHED THEN INSERT VALUES(s1, s2); + } + +step "s2-commit" { COMMIT; } +step "s2-result" { SELECT * FROM prept ORDER BY 1; } + +// permutations - MERGE vs MERGE +permutation "s1-begin" "s1-upd-ins" "s2-result" "s1-commit" "s2-result" +permutation "s1-begin" "s1-upd-ins" "s2-begin" "s2-upd-del" "s1-commit" "s2-commit" "s2-result" +permutation "s2-begin" "s2-upd-del" "s1-begin" "s1-upd-ins" "s2-commit" "s1-commit" "s2-result" +permutation "s1-begin" "s1-upd-ins" "s2-begin" "s2-upd" "s1-commit" "s2-commit" "s2-result" +permutation "s2-begin" "s2-ins" "s1-begin" "s1-del" "s2-upd" "s2-result" "s2-commit" "s1-commit" "s2-result" +permutation "s1-begin" "s1-del-ins" "s2-begin" "s2-upd" "s1-result" "s1-ins" "s1-commit" "s2-upd" "s2-commit" "s2-result" diff --git a/src/test/regress/spec/isolation_merge_replicated.spec b/src/test/regress/spec/isolation_merge_replicated.spec new file mode 100644 index 000000000..a586bdfa1 --- /dev/null +++ b/src/test/regress/spec/isolation_merge_replicated.spec @@ -0,0 +1,58 @@ +// +// How we organize this isolation test spec, is explained at README.md file in this directory. +// + +// create distributed tables to test behavior of MERGE in concurrent operations +setup +{ + DO + $do$ + DECLARE ver int; + BEGIN + SELECT substring(version(), '\d+')::int into ver; + IF (ver < 15) + THEN + RAISE EXCEPTION 'MERGE is not supported on PG versions below 15'; + END IF; + END + $do$; + + SET citus.shard_replication_factor TO 2; + SELECT 1 FROM master_add_node('localhost', 57637); + SELECT 1 FROM master_add_node('localhost', 57638); + + CREATE TABLE prept(t1 int, t2 int); + CREATE TABLE preps(s1 int, s2 int); + SELECT create_distributed_table('prept', 't1'), create_distributed_table('preps', 's1'); + INSERT INTO prept VALUES(100, 0); + INSERT INTO preps VALUES(100, 0); + INSERT INTO preps VALUES(200, 0); +} + +// drop distributed tables +teardown +{ + DROP TABLE IF EXISTS prept CASCADE; + DROP TABLE IF EXISTS preps CASCADE; +} + +// session 1 +session "s1" + +step "s1-begin" { BEGIN; } +step "s1-upd-ins" { MERGE INTO prept USING preps ON prept.t1 = preps.s1 + WHEN MATCHED THEN UPDATE SET t2 = t2 + 1 + WHEN NOT MATCHED THEN INSERT VALUES(s1, s2); } +step "s1-result" { SELECT * FROM preps ORDER BY 1; } +step "s1-commit" { COMMIT; } + +// session 2 +session "s2" + +step "s2-begin" { BEGIN; } +step "s2-update" { UPDATE preps SET s2 = s2 + 1; } +step "s2-commit" { COMMIT; } +step "s2-result" { SELECT * FROM prept ORDER BY 1; } + +// permutations - MERGE vs UPDATE (on source) +permutation "s1-begin" "s1-upd-ins" "s2-begin" "s2-update" "s1-commit" "s2-commit" "s1-result" "s2-result"