mirror of https://github.com/citusdata/citus.git
Add appropriate locks for MERGE to run in parallel
parent
85b8a2c7a1
commit
37500806d6
|
@ -4573,7 +4573,8 @@ RowModifyLevelForQuery(Query *query)
|
|||
}
|
||||
|
||||
if (commandType == CMD_UPDATE ||
|
||||
commandType == CMD_DELETE)
|
||||
commandType == CMD_DELETE ||
|
||||
commandType == CMD_MERGE)
|
||||
{
|
||||
return ROW_MODIFY_NONCOMMUTATIVE;
|
||||
}
|
||||
|
|
|
@ -0,0 +1,147 @@
|
|||
Parsed test spec with 2 sessions
|
||||
|
||||
starting permutation: s1-begin s1-upd-ins s2-result s1-commit s2-result
|
||||
step s1-begin: BEGIN;
|
||||
step s1-upd-ins: MERGE INTO prept USING preps ON prept.t1 = preps.s1
|
||||
WHEN MATCHED THEN UPDATE SET t2 = t2 + 1
|
||||
WHEN NOT MATCHED THEN INSERT VALUES(s1, s2);
|
||||
|
||||
step s2-result: SELECT * FROM prept ORDER BY 1;
|
||||
t1|t2
|
||||
---------------------------------------------------------------------
|
||||
100| 0
|
||||
(1 row)
|
||||
|
||||
step s1-commit: COMMIT;
|
||||
step s2-result: SELECT * FROM prept ORDER BY 1;
|
||||
t1|t2
|
||||
---------------------------------------------------------------------
|
||||
100| 1
|
||||
200| 0
|
||||
(2 rows)
|
||||
|
||||
|
||||
starting permutation: s1-begin s1-upd-ins s2-begin s2-upd-del s1-commit s2-commit s2-result
|
||||
step s1-begin: BEGIN;
|
||||
step s1-upd-ins: MERGE INTO prept USING preps ON prept.t1 = preps.s1
|
||||
WHEN MATCHED THEN UPDATE SET t2 = t2 + 1
|
||||
WHEN NOT MATCHED THEN INSERT VALUES(s1, s2);
|
||||
|
||||
step s2-begin: BEGIN;
|
||||
step s2-upd-del: MERGE INTO prept USING preps ON prept.t1 = preps.s1
|
||||
WHEN MATCHED AND prept.t2 = 0 THEN DELETE
|
||||
WHEN MATCHED THEN UPDATE SET t2 = t2 + 1;
|
||||
<waiting ...>
|
||||
step s1-commit: COMMIT;
|
||||
step s2-upd-del: <... completed>
|
||||
step s2-commit: COMMIT;
|
||||
step s2-result: SELECT * FROM prept ORDER BY 1;
|
||||
t1|t2
|
||||
---------------------------------------------------------------------
|
||||
100| 2
|
||||
(1 row)
|
||||
|
||||
|
||||
starting permutation: s2-begin s2-upd-del s1-begin s1-upd-ins s2-commit s1-commit s2-result
|
||||
step s2-begin: BEGIN;
|
||||
step s2-upd-del: MERGE INTO prept USING preps ON prept.t1 = preps.s1
|
||||
WHEN MATCHED AND prept.t2 = 0 THEN DELETE
|
||||
WHEN MATCHED THEN UPDATE SET t2 = t2 + 1;
|
||||
|
||||
step s1-begin: BEGIN;
|
||||
step s1-upd-ins: MERGE INTO prept USING preps ON prept.t1 = preps.s1
|
||||
WHEN MATCHED THEN UPDATE SET t2 = t2 + 1
|
||||
WHEN NOT MATCHED THEN INSERT VALUES(s1, s2);
|
||||
<waiting ...>
|
||||
step s2-commit: COMMIT;
|
||||
step s1-upd-ins: <... completed>
|
||||
step s1-commit: COMMIT;
|
||||
step s2-result: SELECT * FROM prept ORDER BY 1;
|
||||
t1|t2
|
||||
---------------------------------------------------------------------
|
||||
100| 0
|
||||
200| 0
|
||||
(2 rows)
|
||||
|
||||
|
||||
starting permutation: s1-begin s1-upd-ins s2-begin s2-upd s1-commit s2-commit s2-result
|
||||
step s1-begin: BEGIN;
|
||||
step s1-upd-ins: MERGE INTO prept USING preps ON prept.t1 = preps.s1
|
||||
WHEN MATCHED THEN UPDATE SET t2 = t2 + 1
|
||||
WHEN NOT MATCHED THEN INSERT VALUES(s1, s2);
|
||||
|
||||
step s2-begin: BEGIN;
|
||||
step s2-upd: MERGE INTO prept USING preps ON prept.t1 = preps.s1
|
||||
WHEN MATCHED THEN UPDATE SET t2 = t2 + 1;
|
||||
<waiting ...>
|
||||
step s1-commit: COMMIT;
|
||||
step s2-upd: <... completed>
|
||||
step s2-commit: COMMIT;
|
||||
step s2-result: SELECT * FROM prept ORDER BY 1;
|
||||
t1|t2
|
||||
---------------------------------------------------------------------
|
||||
100| 2
|
||||
200| 1
|
||||
(2 rows)
|
||||
|
||||
|
||||
starting permutation: s2-begin s2-ins s1-begin s1-del s2-upd s2-result s2-commit s1-commit s2-result
|
||||
step s2-begin: BEGIN;
|
||||
step s2-ins: MERGE INTO prept USING preps ON prept.t1 = preps.s1
|
||||
WHEN NOT MATCHED THEN INSERT VALUES(s1, s2);
|
||||
|
||||
step s1-begin: BEGIN;
|
||||
step s1-del: MERGE INTO prept USING preps ON prept.t1 = preps.s1
|
||||
WHEN MATCHED THEN DELETE;
|
||||
<waiting ...>
|
||||
step s2-upd: MERGE INTO prept USING preps ON prept.t1 = preps.s1
|
||||
WHEN MATCHED THEN UPDATE SET t2 = t2 + 1;
|
||||
|
||||
step s2-result: SELECT * FROM prept ORDER BY 1;
|
||||
t1|t2
|
||||
---------------------------------------------------------------------
|
||||
100| 1
|
||||
200| 1
|
||||
(2 rows)
|
||||
|
||||
step s2-commit: COMMIT;
|
||||
step s1-del: <... completed>
|
||||
step s1-commit: COMMIT;
|
||||
step s2-result: SELECT * FROM prept ORDER BY 1;
|
||||
t1|t2
|
||||
---------------------------------------------------------------------
|
||||
(0 rows)
|
||||
|
||||
|
||||
starting permutation: s1-begin s1-del-ins s2-begin s2-upd s1-result s1-ins s1-commit s2-upd s2-commit s2-result
|
||||
step s1-begin: BEGIN;
|
||||
step s1-del-ins: MERGE INTO prept USING preps ON prept.t1 = preps.s1
|
||||
WHEN MATCHED THEN DELETE
|
||||
WHEN NOT MATCHED THEN INSERT VALUES(s1, s2);
|
||||
|
||||
step s2-begin: BEGIN;
|
||||
step s2-upd: MERGE INTO prept USING preps ON prept.t1 = preps.s1
|
||||
WHEN MATCHED THEN UPDATE SET t2 = t2 + 1;
|
||||
<waiting ...>
|
||||
step s1-result: SELECT * FROM prept ORDER BY 1;
|
||||
t1|t2
|
||||
---------------------------------------------------------------------
|
||||
200| 0
|
||||
(1 row)
|
||||
|
||||
step s1-ins: MERGE INTO prept USING preps ON prept.t1 = preps.s1
|
||||
WHEN NOT MATCHED THEN INSERT VALUES(s1, s2);
|
||||
|
||||
step s1-commit: COMMIT;
|
||||
step s2-upd: <... completed>
|
||||
step s2-upd: MERGE INTO prept USING preps ON prept.t1 = preps.s1
|
||||
WHEN MATCHED THEN UPDATE SET t2 = t2 + 1;
|
||||
|
||||
step s2-commit: COMMIT;
|
||||
step s2-result: SELECT * FROM prept ORDER BY 1;
|
||||
t1|t2
|
||||
---------------------------------------------------------------------
|
||||
100| 2
|
||||
200| 2
|
||||
(2 rows)
|
||||
|
|
@ -0,0 +1,5 @@
|
|||
Parsed test spec with 2 sessions
|
||||
|
||||
starting permutation: s1-begin s1-upd-ins s2-result s1-commit s2-result
|
||||
setup failed: ERROR: MERGE is not supported on PG versions below 15
|
||||
CONTEXT: PL/pgSQL function inline_code_block line XX at RAISE
|
|
@ -0,0 +1,26 @@
|
|||
Parsed test spec with 2 sessions
|
||||
|
||||
starting permutation: s1-begin s1-upd-ins s2-begin s2-update s1-commit s2-commit s1-result s2-result
|
||||
step s1-begin: BEGIN;
|
||||
step s1-upd-ins: MERGE INTO prept USING preps ON prept.t1 = preps.s1
|
||||
WHEN MATCHED THEN UPDATE SET t2 = t2 + 1
|
||||
WHEN NOT MATCHED THEN INSERT VALUES(s1, s2);
|
||||
step s2-begin: BEGIN;
|
||||
step s2-update: UPDATE preps SET s2 = s2 + 1; <waiting ...>
|
||||
step s1-commit: COMMIT;
|
||||
step s2-update: <... completed>
|
||||
step s2-commit: COMMIT;
|
||||
step s1-result: SELECT * FROM preps ORDER BY 1;
|
||||
s1|s2
|
||||
---------------------------------------------------------------------
|
||||
100| 1
|
||||
200| 1
|
||||
(2 rows)
|
||||
|
||||
step s2-result: SELECT * FROM prept ORDER BY 1;
|
||||
t1|t2
|
||||
---------------------------------------------------------------------
|
||||
100| 1
|
||||
200| 0
|
||||
(2 rows)
|
||||
|
|
@ -0,0 +1,5 @@
|
|||
Parsed test spec with 2 sessions
|
||||
|
||||
starting permutation: s1-begin s1-upd-ins s2-begin s2-update s1-commit s2-commit s1-result s2-result
|
||||
setup failed: ERROR: MERGE is not supported on PG versions below 15
|
||||
CONTEXT: PL/pgSQL function inline_code_block line XX at RAISE
|
|
@ -107,4 +107,10 @@ test: isolation_multiuser_locking
|
|||
test: isolation_acquire_distributed_locks
|
||||
test: isolation_concurrent_move_create_table
|
||||
|
||||
# MERGE
|
||||
test: isolation_merge
|
||||
test: isolation_merge_replicated
|
||||
|
||||
|
||||
# Note: Always keep this test at the end
|
||||
test: isolation_check_mx
|
||||
|
|
|
@ -0,0 +1,92 @@
|
|||
//
|
||||
// How we organize this isolation test spec, is explained at README.md file in this directory.
|
||||
//
|
||||
|
||||
// create distributed tables to test behavior of MERGE in concurrent operations
|
||||
setup
|
||||
{
|
||||
DO
|
||||
$do$
|
||||
DECLARE ver int;
|
||||
BEGIN
|
||||
SELECT substring(version(), '\d+')::int into ver;
|
||||
IF (ver < 15)
|
||||
THEN
|
||||
RAISE EXCEPTION 'MERGE is not supported on PG versions below 15';
|
||||
END IF;
|
||||
END
|
||||
$do$;
|
||||
|
||||
SET citus.shard_replication_factor TO 1;
|
||||
SELECT 1 FROM master_add_node('localhost', 57637);
|
||||
SELECT 1 FROM master_add_node('localhost', 57638);
|
||||
|
||||
CREATE TABLE prept(t1 int, t2 int);
|
||||
CREATE TABLE preps(s1 int, s2 int);
|
||||
SELECT create_distributed_table('prept', 't1'), create_distributed_table('preps', 's1');
|
||||
INSERT INTO prept VALUES(100, 0);
|
||||
INSERT INTO preps VALUES(100, 0);
|
||||
INSERT INTO preps VALUES(200, 0);
|
||||
}
|
||||
|
||||
// drop distributed tables
|
||||
teardown
|
||||
{
|
||||
DROP TABLE IF EXISTS prept CASCADE;
|
||||
DROP TABLE IF EXISTS preps CASCADE;
|
||||
}
|
||||
|
||||
// session 1
|
||||
session "s1"
|
||||
|
||||
step "s1-begin" { BEGIN; }
|
||||
|
||||
step "s1-upd-ins" { MERGE INTO prept USING preps ON prept.t1 = preps.s1
|
||||
WHEN MATCHED THEN UPDATE SET t2 = t2 + 1
|
||||
WHEN NOT MATCHED THEN INSERT VALUES(s1, s2);
|
||||
}
|
||||
|
||||
step "s1-del-ins" { MERGE INTO prept USING preps ON prept.t1 = preps.s1
|
||||
WHEN MATCHED THEN DELETE
|
||||
WHEN NOT MATCHED THEN INSERT VALUES(s1, s2);
|
||||
}
|
||||
|
||||
step "s1-del" { MERGE INTO prept USING preps ON prept.t1 = preps.s1
|
||||
WHEN MATCHED THEN DELETE;
|
||||
}
|
||||
|
||||
step "s1-ins" { MERGE INTO prept USING preps ON prept.t1 = preps.s1
|
||||
WHEN NOT MATCHED THEN INSERT VALUES(s1, s2);
|
||||
}
|
||||
|
||||
step "s1-commit" { COMMIT; }
|
||||
step "s1-result" { SELECT * FROM prept ORDER BY 1; }
|
||||
|
||||
// session 2
|
||||
session "s2"
|
||||
|
||||
step "s2-begin" { BEGIN; }
|
||||
|
||||
step "s2-upd-del" { MERGE INTO prept USING preps ON prept.t1 = preps.s1
|
||||
WHEN MATCHED AND prept.t2 = 0 THEN DELETE
|
||||
WHEN MATCHED THEN UPDATE SET t2 = t2 + 1;
|
||||
}
|
||||
|
||||
step "s2-upd" { MERGE INTO prept USING preps ON prept.t1 = preps.s1
|
||||
WHEN MATCHED THEN UPDATE SET t2 = t2 + 1;
|
||||
}
|
||||
|
||||
step "s2-ins" { MERGE INTO prept USING preps ON prept.t1 = preps.s1
|
||||
WHEN NOT MATCHED THEN INSERT VALUES(s1, s2);
|
||||
}
|
||||
|
||||
step "s2-commit" { COMMIT; }
|
||||
step "s2-result" { SELECT * FROM prept ORDER BY 1; }
|
||||
|
||||
// permutations - MERGE vs MERGE
|
||||
permutation "s1-begin" "s1-upd-ins" "s2-result" "s1-commit" "s2-result"
|
||||
permutation "s1-begin" "s1-upd-ins" "s2-begin" "s2-upd-del" "s1-commit" "s2-commit" "s2-result"
|
||||
permutation "s2-begin" "s2-upd-del" "s1-begin" "s1-upd-ins" "s2-commit" "s1-commit" "s2-result"
|
||||
permutation "s1-begin" "s1-upd-ins" "s2-begin" "s2-upd" "s1-commit" "s2-commit" "s2-result"
|
||||
permutation "s2-begin" "s2-ins" "s1-begin" "s1-del" "s2-upd" "s2-result" "s2-commit" "s1-commit" "s2-result"
|
||||
permutation "s1-begin" "s1-del-ins" "s2-begin" "s2-upd" "s1-result" "s1-ins" "s1-commit" "s2-upd" "s2-commit" "s2-result"
|
|
@ -0,0 +1,58 @@
|
|||
//
|
||||
// How we organize this isolation test spec, is explained at README.md file in this directory.
|
||||
//
|
||||
|
||||
// create distributed tables to test behavior of MERGE in concurrent operations
|
||||
setup
|
||||
{
|
||||
DO
|
||||
$do$
|
||||
DECLARE ver int;
|
||||
BEGIN
|
||||
SELECT substring(version(), '\d+')::int into ver;
|
||||
IF (ver < 15)
|
||||
THEN
|
||||
RAISE EXCEPTION 'MERGE is not supported on PG versions below 15';
|
||||
END IF;
|
||||
END
|
||||
$do$;
|
||||
|
||||
SET citus.shard_replication_factor TO 2;
|
||||
SELECT 1 FROM master_add_node('localhost', 57637);
|
||||
SELECT 1 FROM master_add_node('localhost', 57638);
|
||||
|
||||
CREATE TABLE prept(t1 int, t2 int);
|
||||
CREATE TABLE preps(s1 int, s2 int);
|
||||
SELECT create_distributed_table('prept', 't1'), create_distributed_table('preps', 's1');
|
||||
INSERT INTO prept VALUES(100, 0);
|
||||
INSERT INTO preps VALUES(100, 0);
|
||||
INSERT INTO preps VALUES(200, 0);
|
||||
}
|
||||
|
||||
// drop distributed tables
|
||||
teardown
|
||||
{
|
||||
DROP TABLE IF EXISTS prept CASCADE;
|
||||
DROP TABLE IF EXISTS preps CASCADE;
|
||||
}
|
||||
|
||||
// session 1
|
||||
session "s1"
|
||||
|
||||
step "s1-begin" { BEGIN; }
|
||||
step "s1-upd-ins" { MERGE INTO prept USING preps ON prept.t1 = preps.s1
|
||||
WHEN MATCHED THEN UPDATE SET t2 = t2 + 1
|
||||
WHEN NOT MATCHED THEN INSERT VALUES(s1, s2); }
|
||||
step "s1-result" { SELECT * FROM preps ORDER BY 1; }
|
||||
step "s1-commit" { COMMIT; }
|
||||
|
||||
// session 2
|
||||
session "s2"
|
||||
|
||||
step "s2-begin" { BEGIN; }
|
||||
step "s2-update" { UPDATE preps SET s2 = s2 + 1; }
|
||||
step "s2-commit" { COMMIT; }
|
||||
step "s2-result" { SELECT * FROM prept ORDER BY 1; }
|
||||
|
||||
// permutations - MERGE vs UPDATE (on source)
|
||||
permutation "s1-begin" "s1-upd-ins" "s2-begin" "s2-update" "s1-commit" "s2-commit" "s1-result" "s2-result"
|
Loading…
Reference in New Issue