mirror of https://github.com/citusdata/citus.git
Add tests for MERGE ... WHEN NOT MATCHED BY SOURCE
parent
32c058e448
commit
0c4c001ad7
|
@ -2195,6 +2195,472 @@ CONTEXT: PL/pgSQL function public.explain_filter(text) line XX at FOR over EXEC
|
||||||
|
|
||||||
RESET citus.log_remote_commands;
|
RESET citus.log_remote_commands;
|
||||||
-- End of EXPLAIN MEMORY SERIALIZE tests
|
-- End of EXPLAIN MEMORY SERIALIZE tests
|
||||||
|
-- Add support for MERGE ... WHEN NOT MATCHED BY SOURCE.
|
||||||
|
-- Relevant PG commit:
|
||||||
|
-- https://github.com/postgres/postgres/commit/0294df2f1
|
||||||
|
SET citus.next_shard_id TO 25122024;
|
||||||
|
-- Citus local tables
|
||||||
|
CREATE TABLE citus_local_target (tid integer, balance float, val text);
|
||||||
|
CREATE TABLE citus_local_source (sid integer, delta float);
|
||||||
|
SELECT citus_add_local_table_to_metadata('citus_local_target');
|
||||||
|
ERROR: duplicate key value violates unique constraint "pg_dist_shard_shardid_index"
|
||||||
|
DETAIL: Key (shardid)=(25122024) already exists.
|
||||||
|
SELECT citus_add_local_table_to_metadata('citus_local_source');
|
||||||
|
ERROR: duplicate key value violates unique constraint "pg_dist_shard_shardid_index"
|
||||||
|
DETAIL: Key (shardid)=(25122025) already exists.
|
||||||
|
INSERT INTO citus_local_target SELECT id, id * 100, 'initial' FROM generate_series(1,15,2) AS id;
|
||||||
|
INSERT INTO citus_local_source SELECT id, id * 10 FROM generate_series(1,14) AS id;
|
||||||
|
-- Citus distributed tables
|
||||||
|
CREATE TABLE citus_distributed_target (tid integer, balance float, val text);
|
||||||
|
CREATE TABLE citus_distributed_source (sid integer, delta float);
|
||||||
|
SELECT create_distributed_table('citus_distributed_target', 'tid');
|
||||||
|
ERROR: duplicate key value violates unique constraint "pg_dist_shard_shardid_index"
|
||||||
|
DETAIL: Key (shardid)=(25122026) already exists.
|
||||||
|
SELECT create_distributed_table('citus_distributed_source', 'sid');
|
||||||
|
ERROR: duplicate key value violates unique constraint "pg_dist_shard_shardid_index"
|
||||||
|
DETAIL: Key (shardid)=(25122027) already exists.
|
||||||
|
INSERT INTO citus_distributed_target SELECT id, id * 100, 'initial' FROM generate_series(1,15,2) AS id;
|
||||||
|
INSERT INTO citus_distributed_source SELECT id, id * 10 FROM generate_series(1,14) AS id;
|
||||||
|
-- Citus reference tables
|
||||||
|
CREATE TABLE citus_reference_target (tid integer, balance float, val text);
|
||||||
|
CREATE TABLE citus_reference_source (sid integer, delta float);
|
||||||
|
SELECT create_reference_table('citus_reference_target');
|
||||||
|
ERROR: duplicate key value violates unique constraint "pg_dist_shard_shardid_index"
|
||||||
|
DETAIL: Key (shardid)=(25122028) already exists.
|
||||||
|
SELECT create_reference_table('citus_reference_source');
|
||||||
|
ERROR: duplicate key value violates unique constraint "pg_dist_shard_shardid_index"
|
||||||
|
DETAIL: Key (shardid)=(25122029) already exists.
|
||||||
|
INSERT INTO citus_reference_target SELECT id, id * 100, 'initial' FROM generate_series(1,15,2) AS id;
|
||||||
|
INSERT INTO citus_reference_source SELECT id, id * 10 FROM generate_series(1,14) AS id;
|
||||||
|
-- Try all combinations of tables with two queries:
|
||||||
|
-- 1: Simple Merge
|
||||||
|
-- 2: Merge with a constant qual
|
||||||
|
--
|
||||||
|
-- Simple Merge expected output
|
||||||
|
-- tid | balance | val
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
-- 1 | 110 | initial updated by merge
|
||||||
|
-- 2 | 20 | inserted by merge
|
||||||
|
-- 3 | 330 | initial updated by merge
|
||||||
|
-- 4 | 40 | inserted by merge
|
||||||
|
-- 5 | 550 | initial updated by merge
|
||||||
|
-- 6 | 60 | inserted by merge
|
||||||
|
-- 7 | 770 | initial updated by merge
|
||||||
|
-- 8 | 80 | inserted by merge
|
||||||
|
-- 9 | 990 | initial updated by merge
|
||||||
|
-- 10 | 100 | inserted by merge
|
||||||
|
-- 11 | 1210 | initial updated by merge
|
||||||
|
-- 12 | 120 | inserted by merge
|
||||||
|
-- 13 | 1430 | initial updated by merge
|
||||||
|
-- 14 | 140 | inserted by merge
|
||||||
|
-- 15 | 1500 | initial not matched by source
|
||||||
|
-- (15 rows)
|
||||||
|
--
|
||||||
|
-- Merge with a constant qual expected output
|
||||||
|
-- tid | balance | val
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
-- 1 | 110 | initial updated by merge
|
||||||
|
-- 2 | 20 | inserted by merge
|
||||||
|
-- 3 | 300 | initial not matched by source
|
||||||
|
-- 3 | 30 | inserted by merge
|
||||||
|
-- 4 | 40 | inserted by merge
|
||||||
|
-- 5 | 500 | initial not matched by source
|
||||||
|
-- 5 | 50 | inserted by merge
|
||||||
|
-- 6 | 60 | inserted by merge
|
||||||
|
-- 7 | 700 | initial not matched by source
|
||||||
|
-- 7 | 70 | inserted by merge
|
||||||
|
-- 8 | 80 | inserted by merge
|
||||||
|
-- 9 | 900 | initial not matched by source
|
||||||
|
-- 9 | 90 | inserted by merge
|
||||||
|
-- 10 | 100 | inserted by merge
|
||||||
|
-- 11 | 1100 | initial not matched by source
|
||||||
|
-- 11 | 110 | inserted by merge
|
||||||
|
-- 12 | 120 | inserted by merge
|
||||||
|
-- 13 | 1300 | initial not matched by source
|
||||||
|
-- 13 | 130 | inserted by merge
|
||||||
|
-- 14 | 140 | inserted by merge
|
||||||
|
-- 15 | 1500 | initial not matched by source
|
||||||
|
-- (21 rows)
|
||||||
|
-- Local-Local
|
||||||
|
-- Let's also print the command here
|
||||||
|
-- try simple MERGE
|
||||||
|
BEGIN;
|
||||||
|
SET citus.log_local_commands TO on;
|
||||||
|
MERGE INTO citus_local_target t
|
||||||
|
USING citus_local_source s
|
||||||
|
ON t.tid = s.sid
|
||||||
|
WHEN MATCHED THEN
|
||||||
|
UPDATE SET balance = balance + delta, val = val || ' updated by merge'
|
||||||
|
WHEN NOT MATCHED THEN
|
||||||
|
INSERT VALUES (sid, delta, 'inserted by merge')
|
||||||
|
WHEN NOT MATCHED BY SOURCE THEN
|
||||||
|
UPDATE SET val = val || ' not matched by source';
|
||||||
|
SELECT * FROM citus_local_target ORDER BY tid, val;
|
||||||
|
tid | balance | val
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
1 | 110 | initial updated by merge
|
||||||
|
2 | 20 | inserted by merge
|
||||||
|
3 | 330 | initial updated by merge
|
||||||
|
4 | 40 | inserted by merge
|
||||||
|
5 | 550 | initial updated by merge
|
||||||
|
6 | 60 | inserted by merge
|
||||||
|
7 | 770 | initial updated by merge
|
||||||
|
8 | 80 | inserted by merge
|
||||||
|
9 | 990 | initial updated by merge
|
||||||
|
10 | 100 | inserted by merge
|
||||||
|
11 | 1210 | initial updated by merge
|
||||||
|
12 | 120 | inserted by merge
|
||||||
|
13 | 1430 | initial updated by merge
|
||||||
|
14 | 140 | inserted by merge
|
||||||
|
15 | 1500 | initial not matched by source
|
||||||
|
(15 rows)
|
||||||
|
|
||||||
|
ROLLBACK;
|
||||||
|
-- same with a constant qual
|
||||||
|
BEGIN;
|
||||||
|
MERGE INTO citus_local_target t
|
||||||
|
USING citus_local_source s
|
||||||
|
ON t.tid = s.sid AND tid = 1
|
||||||
|
WHEN MATCHED THEN
|
||||||
|
UPDATE SET balance = balance + delta, val = val || ' updated by merge'
|
||||||
|
WHEN NOT MATCHED THEN
|
||||||
|
INSERT VALUES (sid, delta, 'inserted by merge')
|
||||||
|
WHEN NOT MATCHED BY SOURCE THEN
|
||||||
|
UPDATE SET val = val || ' not matched by source';
|
||||||
|
SELECT * FROM citus_local_target ORDER BY tid, val;
|
||||||
|
tid | balance | val
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
1 | 110 | initial updated by merge
|
||||||
|
2 | 20 | inserted by merge
|
||||||
|
3 | 300 | initial not matched by source
|
||||||
|
3 | 30 | inserted by merge
|
||||||
|
4 | 40 | inserted by merge
|
||||||
|
5 | 500 | initial not matched by source
|
||||||
|
5 | 50 | inserted by merge
|
||||||
|
6 | 60 | inserted by merge
|
||||||
|
7 | 700 | initial not matched by source
|
||||||
|
7 | 70 | inserted by merge
|
||||||
|
8 | 80 | inserted by merge
|
||||||
|
9 | 900 | initial not matched by source
|
||||||
|
9 | 90 | inserted by merge
|
||||||
|
10 | 100 | inserted by merge
|
||||||
|
11 | 1100 | initial not matched by source
|
||||||
|
11 | 110 | inserted by merge
|
||||||
|
12 | 120 | inserted by merge
|
||||||
|
13 | 1300 | initial not matched by source
|
||||||
|
13 | 130 | inserted by merge
|
||||||
|
14 | 140 | inserted by merge
|
||||||
|
15 | 1500 | initial not matched by source
|
||||||
|
(21 rows)
|
||||||
|
|
||||||
|
ROLLBACK;
|
||||||
|
-- Local-Reference
|
||||||
|
-- try simple MERGE
|
||||||
|
BEGIN;
|
||||||
|
MERGE INTO citus_local_target t
|
||||||
|
USING citus_reference_source s
|
||||||
|
ON t.tid = s.sid
|
||||||
|
WHEN MATCHED THEN
|
||||||
|
UPDATE SET balance = balance + delta, val = val || ' updated by merge'
|
||||||
|
WHEN NOT MATCHED THEN
|
||||||
|
INSERT VALUES (sid, delta, 'inserted by merge')
|
||||||
|
WHEN NOT MATCHED BY SOURCE THEN
|
||||||
|
UPDATE SET val = val || ' not matched by source';
|
||||||
|
SELECT * FROM citus_local_target ORDER BY tid, val;
|
||||||
|
tid | balance | val
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
1 | 110 | initial updated by merge
|
||||||
|
2 | 20 | inserted by merge
|
||||||
|
3 | 330 | initial updated by merge
|
||||||
|
4 | 40 | inserted by merge
|
||||||
|
5 | 550 | initial updated by merge
|
||||||
|
6 | 60 | inserted by merge
|
||||||
|
7 | 770 | initial updated by merge
|
||||||
|
8 | 80 | inserted by merge
|
||||||
|
9 | 990 | initial updated by merge
|
||||||
|
10 | 100 | inserted by merge
|
||||||
|
11 | 1210 | initial updated by merge
|
||||||
|
12 | 120 | inserted by merge
|
||||||
|
13 | 1430 | initial updated by merge
|
||||||
|
14 | 140 | inserted by merge
|
||||||
|
15 | 1500 | initial not matched by source
|
||||||
|
(15 rows)
|
||||||
|
|
||||||
|
ROLLBACK;
|
||||||
|
-- same with a constant qual
|
||||||
|
BEGIN;
|
||||||
|
MERGE INTO citus_local_target t
|
||||||
|
USING citus_reference_source s
|
||||||
|
ON t.tid = s.sid AND tid = 1
|
||||||
|
WHEN MATCHED THEN
|
||||||
|
UPDATE SET balance = balance + delta, val = val || ' updated by merge'
|
||||||
|
WHEN NOT MATCHED THEN
|
||||||
|
INSERT VALUES (sid, delta, 'inserted by merge')
|
||||||
|
WHEN NOT MATCHED BY SOURCE THEN
|
||||||
|
UPDATE SET val = val || ' not matched by source';
|
||||||
|
SELECT * FROM citus_local_target ORDER BY tid, val;
|
||||||
|
tid | balance | val
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
1 | 110 | initial updated by merge
|
||||||
|
2 | 20 | inserted by merge
|
||||||
|
3 | 300 | initial not matched by source
|
||||||
|
3 | 30 | inserted by merge
|
||||||
|
4 | 40 | inserted by merge
|
||||||
|
5 | 500 | initial not matched by source
|
||||||
|
5 | 50 | inserted by merge
|
||||||
|
6 | 60 | inserted by merge
|
||||||
|
7 | 700 | initial not matched by source
|
||||||
|
7 | 70 | inserted by merge
|
||||||
|
8 | 80 | inserted by merge
|
||||||
|
9 | 900 | initial not matched by source
|
||||||
|
9 | 90 | inserted by merge
|
||||||
|
10 | 100 | inserted by merge
|
||||||
|
11 | 1100 | initial not matched by source
|
||||||
|
11 | 110 | inserted by merge
|
||||||
|
12 | 120 | inserted by merge
|
||||||
|
13 | 1300 | initial not matched by source
|
||||||
|
13 | 130 | inserted by merge
|
||||||
|
14 | 140 | inserted by merge
|
||||||
|
15 | 1500 | initial not matched by source
|
||||||
|
(21 rows)
|
||||||
|
|
||||||
|
ROLLBACK;
|
||||||
|
-- Local-Distributed - Merge currently not supported, Feature in development.
|
||||||
|
-- try simple MERGE
|
||||||
|
MERGE INTO citus_local_target t
|
||||||
|
USING citus_distributed_source s
|
||||||
|
ON t.tid = s.sid
|
||||||
|
WHEN MATCHED THEN
|
||||||
|
UPDATE SET balance = balance + delta, val = val || ' updated by merge'
|
||||||
|
WHEN NOT MATCHED THEN
|
||||||
|
INSERT VALUES (sid, delta, 'inserted by merge')
|
||||||
|
WHEN NOT MATCHED BY SOURCE THEN
|
||||||
|
UPDATE SET val = val || ' not matched by source';
|
||||||
|
-- Distributed-Local
|
||||||
|
-- try simple MERGE
|
||||||
|
BEGIN;
|
||||||
|
MERGE INTO citus_distributed_target t
|
||||||
|
USING citus_local_source s
|
||||||
|
ON t.tid = s.sid
|
||||||
|
WHEN MATCHED THEN
|
||||||
|
UPDATE SET balance = balance + delta, val = val || ' updated by merge'
|
||||||
|
WHEN NOT MATCHED THEN
|
||||||
|
INSERT VALUES (sid, delta, 'inserted by merge')
|
||||||
|
WHEN NOT MATCHED BY SOURCE THEN
|
||||||
|
UPDATE SET val = val || ' not matched by source';
|
||||||
|
SELECT * FROM citus_distributed_target ORDER BY tid, val;
|
||||||
|
tid | balance | val
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
1 | 110 | initial updated by merge
|
||||||
|
2 | 20 | inserted by merge
|
||||||
|
3 | 330 | initial updated by merge
|
||||||
|
4 | 40 | inserted by merge
|
||||||
|
5 | 550 | initial updated by merge
|
||||||
|
6 | 60 | inserted by merge
|
||||||
|
7 | 770 | initial updated by merge
|
||||||
|
8 | 80 | inserted by merge
|
||||||
|
9 | 990 | initial updated by merge
|
||||||
|
10 | 100 | inserted by merge
|
||||||
|
11 | 1210 | initial updated by merge
|
||||||
|
12 | 120 | inserted by merge
|
||||||
|
13 | 1430 | initial updated by merge
|
||||||
|
14 | 140 | inserted by merge
|
||||||
|
15 | 1500 | initial not matched by source
|
||||||
|
(15 rows)
|
||||||
|
|
||||||
|
ROLLBACK;
|
||||||
|
-- same with a constant qual
|
||||||
|
BEGIN;
|
||||||
|
MERGE INTO citus_distributed_target t
|
||||||
|
USING citus_local_source s
|
||||||
|
ON t.tid = s.sid AND tid = 1
|
||||||
|
WHEN MATCHED THEN
|
||||||
|
UPDATE SET balance = balance + delta, val = val || ' updated by merge'
|
||||||
|
WHEN NOT MATCHED THEN
|
||||||
|
INSERT VALUES (sid, delta, 'inserted by merge')
|
||||||
|
WHEN NOT MATCHED BY SOURCE THEN
|
||||||
|
UPDATE SET val = val || ' not matched by source';
|
||||||
|
SELECT * FROM citus_distributed_target ORDER BY tid, val;
|
||||||
|
tid | balance | val
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
1 | 110 | initial updated by merge
|
||||||
|
2 | 20 | inserted by merge
|
||||||
|
3 | 300 | initial not matched by source
|
||||||
|
3 | 30 | inserted by merge
|
||||||
|
4 | 40 | inserted by merge
|
||||||
|
5 | 500 | initial not matched by source
|
||||||
|
5 | 50 | inserted by merge
|
||||||
|
6 | 60 | inserted by merge
|
||||||
|
7 | 700 | initial not matched by source
|
||||||
|
7 | 70 | inserted by merge
|
||||||
|
8 | 80 | inserted by merge
|
||||||
|
9 | 900 | initial not matched by source
|
||||||
|
9 | 90 | inserted by merge
|
||||||
|
10 | 100 | inserted by merge
|
||||||
|
11 | 1100 | initial not matched by source
|
||||||
|
11 | 110 | inserted by merge
|
||||||
|
12 | 120 | inserted by merge
|
||||||
|
13 | 1300 | initial not matched by source
|
||||||
|
13 | 130 | inserted by merge
|
||||||
|
14 | 140 | inserted by merge
|
||||||
|
15 | 1500 | initial not matched by source
|
||||||
|
(21 rows)
|
||||||
|
|
||||||
|
ROLLBACK;
|
||||||
|
-- Distributed-Distributed
|
||||||
|
-- try simple MERGE
|
||||||
|
BEGIN;
|
||||||
|
MERGE INTO citus_distributed_target t
|
||||||
|
USING citus_distributed_source s
|
||||||
|
ON t.tid = s.sid
|
||||||
|
WHEN MATCHED THEN
|
||||||
|
UPDATE SET balance = balance + delta, val = val || ' updated by merge'
|
||||||
|
WHEN NOT MATCHED THEN
|
||||||
|
INSERT VALUES (sid, delta, 'inserted by merge')
|
||||||
|
WHEN NOT MATCHED BY SOURCE THEN
|
||||||
|
UPDATE SET val = val || ' not matched by source';
|
||||||
|
SELECT * FROM citus_distributed_target ORDER BY tid, val;
|
||||||
|
tid | balance | val
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
1 | 110 | initial updated by merge
|
||||||
|
2 | 20 | inserted by merge
|
||||||
|
3 | 330 | initial updated by merge
|
||||||
|
4 | 40 | inserted by merge
|
||||||
|
5 | 550 | initial updated by merge
|
||||||
|
6 | 60 | inserted by merge
|
||||||
|
7 | 770 | initial updated by merge
|
||||||
|
8 | 80 | inserted by merge
|
||||||
|
9 | 990 | initial updated by merge
|
||||||
|
10 | 100 | inserted by merge
|
||||||
|
11 | 1210 | initial updated by merge
|
||||||
|
12 | 120 | inserted by merge
|
||||||
|
13 | 1430 | initial updated by merge
|
||||||
|
14 | 140 | inserted by merge
|
||||||
|
15 | 1500 | initial not matched by source
|
||||||
|
(15 rows)
|
||||||
|
|
||||||
|
ROLLBACK;
|
||||||
|
-- same with a constant qual
|
||||||
|
BEGIN;
|
||||||
|
MERGE INTO citus_distributed_target t
|
||||||
|
USING citus_distributed_source s
|
||||||
|
ON t.tid = s.sid AND tid = 1
|
||||||
|
WHEN MATCHED THEN
|
||||||
|
UPDATE SET balance = balance + delta, val = val || ' updated by merge'
|
||||||
|
WHEN NOT MATCHED THEN
|
||||||
|
INSERT VALUES (sid, delta, 'inserted by merge')
|
||||||
|
WHEN NOT MATCHED BY SOURCE THEN
|
||||||
|
UPDATE SET val = val || ' not matched by source';
|
||||||
|
SELECT * FROM citus_distributed_target ORDER BY tid, val;
|
||||||
|
tid | balance | val
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
1 | 110 | initial updated by merge
|
||||||
|
2 | 20 | inserted by merge
|
||||||
|
3 | 300 | initial not matched by source
|
||||||
|
3 | 30 | inserted by merge
|
||||||
|
4 | 40 | inserted by merge
|
||||||
|
5 | 500 | initial not matched by source
|
||||||
|
5 | 50 | inserted by merge
|
||||||
|
6 | 60 | inserted by merge
|
||||||
|
7 | 700 | initial not matched by source
|
||||||
|
7 | 70 | inserted by merge
|
||||||
|
8 | 80 | inserted by merge
|
||||||
|
9 | 900 | initial not matched by source
|
||||||
|
9 | 90 | inserted by merge
|
||||||
|
10 | 100 | inserted by merge
|
||||||
|
11 | 1100 | initial not matched by source
|
||||||
|
11 | 110 | inserted by merge
|
||||||
|
12 | 120 | inserted by merge
|
||||||
|
13 | 1300 | initial not matched by source
|
||||||
|
13 | 130 | inserted by merge
|
||||||
|
14 | 140 | inserted by merge
|
||||||
|
15 | 1500 | initial not matched by source
|
||||||
|
(21 rows)
|
||||||
|
|
||||||
|
ROLLBACK;
|
||||||
|
-- Distributed-Reference
|
||||||
|
-- try simple MERGE
|
||||||
|
BEGIN;
|
||||||
|
MERGE INTO citus_distributed_target t
|
||||||
|
USING citus_reference_source s
|
||||||
|
ON t.tid = s.sid
|
||||||
|
WHEN MATCHED THEN
|
||||||
|
UPDATE SET balance = balance + delta, val = val || ' updated by merge'
|
||||||
|
WHEN NOT MATCHED THEN
|
||||||
|
INSERT VALUES (sid, delta, 'inserted by merge')
|
||||||
|
WHEN NOT MATCHED BY SOURCE THEN
|
||||||
|
UPDATE SET val = val || ' not matched by source';
|
||||||
|
SELECT * FROM citus_distributed_target ORDER BY tid, val;
|
||||||
|
tid | balance | val
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
1 | 110 | initial updated by merge
|
||||||
|
2 | 20 | inserted by merge
|
||||||
|
3 | 330 | initial updated by merge
|
||||||
|
4 | 40 | inserted by merge
|
||||||
|
5 | 550 | initial updated by merge
|
||||||
|
6 | 60 | inserted by merge
|
||||||
|
7 | 770 | initial updated by merge
|
||||||
|
8 | 80 | inserted by merge
|
||||||
|
9 | 990 | initial updated by merge
|
||||||
|
10 | 100 | inserted by merge
|
||||||
|
11 | 1210 | initial updated by merge
|
||||||
|
12 | 120 | inserted by merge
|
||||||
|
13 | 1430 | initial updated by merge
|
||||||
|
14 | 140 | inserted by merge
|
||||||
|
15 | 1500 | initial not matched by source
|
||||||
|
(15 rows)
|
||||||
|
|
||||||
|
ROLLBACK;
|
||||||
|
-- same with a constant qual
|
||||||
|
BEGIN;
|
||||||
|
MERGE INTO citus_distributed_target t
|
||||||
|
USING citus_reference_source s
|
||||||
|
ON t.tid = s.sid AND tid = 1
|
||||||
|
WHEN MATCHED THEN
|
||||||
|
UPDATE SET balance = balance + delta, val = val || ' updated by merge'
|
||||||
|
WHEN NOT MATCHED THEN
|
||||||
|
INSERT VALUES (sid, delta, 'inserted by merge')
|
||||||
|
WHEN NOT MATCHED BY SOURCE THEN
|
||||||
|
UPDATE SET val = val || ' not matched by source';
|
||||||
|
SELECT * FROM citus_distributed_target ORDER BY tid, val;
|
||||||
|
tid | balance | val
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
1 | 110 | initial updated by merge
|
||||||
|
2 | 20 | inserted by merge
|
||||||
|
3 | 300 | initial not matched by source
|
||||||
|
3 | 30 | inserted by merge
|
||||||
|
4 | 40 | inserted by merge
|
||||||
|
5 | 500 | initial not matched by source
|
||||||
|
5 | 50 | inserted by merge
|
||||||
|
6 | 60 | inserted by merge
|
||||||
|
7 | 700 | initial not matched by source
|
||||||
|
7 | 70 | inserted by merge
|
||||||
|
8 | 80 | inserted by merge
|
||||||
|
9 | 900 | initial not matched by source
|
||||||
|
9 | 90 | inserted by merge
|
||||||
|
10 | 100 | inserted by merge
|
||||||
|
11 | 1100 | initial not matched by source
|
||||||
|
11 | 110 | inserted by merge
|
||||||
|
12 | 120 | inserted by merge
|
||||||
|
13 | 1300 | initial not matched by source
|
||||||
|
13 | 130 | inserted by merge
|
||||||
|
14 | 140 | inserted by merge
|
||||||
|
15 | 1500 | initial not matched by source
|
||||||
|
(21 rows)
|
||||||
|
|
||||||
|
ROLLBACK;
|
||||||
|
-- Reference-N/A - Reference table as target is not allowed in Merge
|
||||||
|
-- try simple MERGE
|
||||||
|
MERGE INTO citus_reference_target t
|
||||||
|
USING citus_distributed_source s
|
||||||
|
ON t.tid = s.sid
|
||||||
|
WHEN MATCHED THEN
|
||||||
|
UPDATE SET balance = balance + delta, val = val || ' updated by merge'
|
||||||
|
WHEN NOT MATCHED THEN
|
||||||
|
INSERT VALUES (sid, delta, 'inserted by merge')
|
||||||
|
WHEN NOT MATCHED BY SOURCE THEN
|
||||||
|
UPDATE SET val = val || ' not matched by source';
|
||||||
|
-- End of MERGE ... WHEN NOT MATCHED BY SOURCE tests
|
||||||
\set VERBOSITY terse
|
\set VERBOSITY terse
|
||||||
SET client_min_messages TO WARNING;
|
SET client_min_messages TO WARNING;
|
||||||
DROP SCHEMA pg17 CASCADE;
|
DROP SCHEMA pg17 CASCADE;
|
||||||
|
|
|
@ -1080,6 +1080,256 @@ select public.explain_filter('explain (analyze,serialize) create temp table expl
|
||||||
|
|
||||||
RESET citus.log_remote_commands;
|
RESET citus.log_remote_commands;
|
||||||
-- End of EXPLAIN MEMORY SERIALIZE tests
|
-- End of EXPLAIN MEMORY SERIALIZE tests
|
||||||
|
-- Add support for MERGE ... WHEN NOT MATCHED BY SOURCE.
|
||||||
|
-- Relevant PG commit:
|
||||||
|
-- https://github.com/postgres/postgres/commit/0294df2f1
|
||||||
|
|
||||||
|
SET citus.next_shard_id TO 25122024;
|
||||||
|
|
||||||
|
-- Citus local tables
|
||||||
|
CREATE TABLE citus_local_target (tid integer, balance float, val text);
|
||||||
|
CREATE TABLE citus_local_source (sid integer, delta float);
|
||||||
|
SELECT citus_add_local_table_to_metadata('citus_local_target');
|
||||||
|
SELECT citus_add_local_table_to_metadata('citus_local_source');
|
||||||
|
INSERT INTO citus_local_target SELECT id, id * 100, 'initial' FROM generate_series(1,15,2) AS id;
|
||||||
|
INSERT INTO citus_local_source SELECT id, id * 10 FROM generate_series(1,14) AS id;
|
||||||
|
-- Citus distributed tables
|
||||||
|
CREATE TABLE citus_distributed_target (tid integer, balance float, val text);
|
||||||
|
CREATE TABLE citus_distributed_source (sid integer, delta float);
|
||||||
|
SELECT create_distributed_table('citus_distributed_target', 'tid');
|
||||||
|
SELECT create_distributed_table('citus_distributed_source', 'sid');
|
||||||
|
INSERT INTO citus_distributed_target SELECT id, id * 100, 'initial' FROM generate_series(1,15,2) AS id;
|
||||||
|
INSERT INTO citus_distributed_source SELECT id, id * 10 FROM generate_series(1,14) AS id;
|
||||||
|
-- Citus reference tables
|
||||||
|
CREATE TABLE citus_reference_target (tid integer, balance float, val text);
|
||||||
|
CREATE TABLE citus_reference_source (sid integer, delta float);
|
||||||
|
SELECT create_reference_table('citus_reference_target');
|
||||||
|
SELECT create_reference_table('citus_reference_source');
|
||||||
|
INSERT INTO citus_reference_target SELECT id, id * 100, 'initial' FROM generate_series(1,15,2) AS id;
|
||||||
|
INSERT INTO citus_reference_source SELECT id, id * 10 FROM generate_series(1,14) AS id;
|
||||||
|
|
||||||
|
-- Try all combinations of tables with two queries:
|
||||||
|
-- 1: Simple Merge
|
||||||
|
-- 2: Merge with a constant qual
|
||||||
|
--
|
||||||
|
-- Simple Merge expected output
|
||||||
|
-- tid | balance | val
|
||||||
|
-----+---------+-------------------------------
|
||||||
|
-- 1 | 110 | initial updated by merge
|
||||||
|
-- 2 | 20 | inserted by merge
|
||||||
|
-- 3 | 330 | initial updated by merge
|
||||||
|
-- 4 | 40 | inserted by merge
|
||||||
|
-- 5 | 550 | initial updated by merge
|
||||||
|
-- 6 | 60 | inserted by merge
|
||||||
|
-- 7 | 770 | initial updated by merge
|
||||||
|
-- 8 | 80 | inserted by merge
|
||||||
|
-- 9 | 990 | initial updated by merge
|
||||||
|
-- 10 | 100 | inserted by merge
|
||||||
|
-- 11 | 1210 | initial updated by merge
|
||||||
|
-- 12 | 120 | inserted by merge
|
||||||
|
-- 13 | 1430 | initial updated by merge
|
||||||
|
-- 14 | 140 | inserted by merge
|
||||||
|
-- 15 | 1500 | initial not matched by source
|
||||||
|
-- (15 rows)
|
||||||
|
--
|
||||||
|
-- Merge with a constant qual expected output
|
||||||
|
-- tid | balance | val
|
||||||
|
-----+---------+-------------------------------
|
||||||
|
-- 1 | 110 | initial updated by merge
|
||||||
|
-- 2 | 20 | inserted by merge
|
||||||
|
-- 3 | 300 | initial not matched by source
|
||||||
|
-- 3 | 30 | inserted by merge
|
||||||
|
-- 4 | 40 | inserted by merge
|
||||||
|
-- 5 | 500 | initial not matched by source
|
||||||
|
-- 5 | 50 | inserted by merge
|
||||||
|
-- 6 | 60 | inserted by merge
|
||||||
|
-- 7 | 700 | initial not matched by source
|
||||||
|
-- 7 | 70 | inserted by merge
|
||||||
|
-- 8 | 80 | inserted by merge
|
||||||
|
-- 9 | 900 | initial not matched by source
|
||||||
|
-- 9 | 90 | inserted by merge
|
||||||
|
-- 10 | 100 | inserted by merge
|
||||||
|
-- 11 | 1100 | initial not matched by source
|
||||||
|
-- 11 | 110 | inserted by merge
|
||||||
|
-- 12 | 120 | inserted by merge
|
||||||
|
-- 13 | 1300 | initial not matched by source
|
||||||
|
-- 13 | 130 | inserted by merge
|
||||||
|
-- 14 | 140 | inserted by merge
|
||||||
|
-- 15 | 1500 | initial not matched by source
|
||||||
|
-- (21 rows)
|
||||||
|
|
||||||
|
-- Local-Local
|
||||||
|
-- Let's also print the command here
|
||||||
|
-- try simple MERGE
|
||||||
|
BEGIN;
|
||||||
|
SET citus.log_local_commands TO on;
|
||||||
|
MERGE INTO citus_local_target t
|
||||||
|
USING citus_local_source s
|
||||||
|
ON t.tid = s.sid
|
||||||
|
WHEN MATCHED THEN
|
||||||
|
UPDATE SET balance = balance + delta, val = val || ' updated by merge'
|
||||||
|
WHEN NOT MATCHED THEN
|
||||||
|
INSERT VALUES (sid, delta, 'inserted by merge')
|
||||||
|
WHEN NOT MATCHED BY SOURCE THEN
|
||||||
|
UPDATE SET val = val || ' not matched by source';
|
||||||
|
SELECT * FROM citus_local_target ORDER BY tid, val;
|
||||||
|
ROLLBACK;
|
||||||
|
|
||||||
|
-- same with a constant qual
|
||||||
|
BEGIN;
|
||||||
|
MERGE INTO citus_local_target t
|
||||||
|
USING citus_local_source s
|
||||||
|
ON t.tid = s.sid AND tid = 1
|
||||||
|
WHEN MATCHED THEN
|
||||||
|
UPDATE SET balance = balance + delta, val = val || ' updated by merge'
|
||||||
|
WHEN NOT MATCHED THEN
|
||||||
|
INSERT VALUES (sid, delta, 'inserted by merge')
|
||||||
|
WHEN NOT MATCHED BY SOURCE THEN
|
||||||
|
UPDATE SET val = val || ' not matched by source';
|
||||||
|
SELECT * FROM citus_local_target ORDER BY tid, val;
|
||||||
|
ROLLBACK;
|
||||||
|
|
||||||
|
-- Local-Reference
|
||||||
|
-- try simple MERGE
|
||||||
|
BEGIN;
|
||||||
|
MERGE INTO citus_local_target t
|
||||||
|
USING citus_reference_source s
|
||||||
|
ON t.tid = s.sid
|
||||||
|
WHEN MATCHED THEN
|
||||||
|
UPDATE SET balance = balance + delta, val = val || ' updated by merge'
|
||||||
|
WHEN NOT MATCHED THEN
|
||||||
|
INSERT VALUES (sid, delta, 'inserted by merge')
|
||||||
|
WHEN NOT MATCHED BY SOURCE THEN
|
||||||
|
UPDATE SET val = val || ' not matched by source';
|
||||||
|
SELECT * FROM citus_local_target ORDER BY tid, val;
|
||||||
|
ROLLBACK;
|
||||||
|
|
||||||
|
-- same with a constant qual
|
||||||
|
BEGIN;
|
||||||
|
MERGE INTO citus_local_target t
|
||||||
|
USING citus_reference_source s
|
||||||
|
ON t.tid = s.sid AND tid = 1
|
||||||
|
WHEN MATCHED THEN
|
||||||
|
UPDATE SET balance = balance + delta, val = val || ' updated by merge'
|
||||||
|
WHEN NOT MATCHED THEN
|
||||||
|
INSERT VALUES (sid, delta, 'inserted by merge')
|
||||||
|
WHEN NOT MATCHED BY SOURCE THEN
|
||||||
|
UPDATE SET val = val || ' not matched by source';
|
||||||
|
SELECT * FROM citus_local_target ORDER BY tid, val;
|
||||||
|
ROLLBACK;
|
||||||
|
|
||||||
|
-- Local-Distributed - Merge currently not supported, Feature in development.
|
||||||
|
-- try simple MERGE
|
||||||
|
MERGE INTO citus_local_target t
|
||||||
|
USING citus_distributed_source s
|
||||||
|
ON t.tid = s.sid
|
||||||
|
WHEN MATCHED THEN
|
||||||
|
UPDATE SET balance = balance + delta, val = val || ' updated by merge'
|
||||||
|
WHEN NOT MATCHED THEN
|
||||||
|
INSERT VALUES (sid, delta, 'inserted by merge')
|
||||||
|
WHEN NOT MATCHED BY SOURCE THEN
|
||||||
|
UPDATE SET val = val || ' not matched by source';
|
||||||
|
|
||||||
|
-- Distributed-Local
|
||||||
|
-- try simple MERGE
|
||||||
|
BEGIN;
|
||||||
|
MERGE INTO citus_distributed_target t
|
||||||
|
USING citus_local_source s
|
||||||
|
ON t.tid = s.sid
|
||||||
|
WHEN MATCHED THEN
|
||||||
|
UPDATE SET balance = balance + delta, val = val || ' updated by merge'
|
||||||
|
WHEN NOT MATCHED THEN
|
||||||
|
INSERT VALUES (sid, delta, 'inserted by merge')
|
||||||
|
WHEN NOT MATCHED BY SOURCE THEN
|
||||||
|
UPDATE SET val = val || ' not matched by source';
|
||||||
|
SELECT * FROM citus_distributed_target ORDER BY tid, val;
|
||||||
|
ROLLBACK;
|
||||||
|
|
||||||
|
-- same with a constant qual
|
||||||
|
BEGIN;
|
||||||
|
MERGE INTO citus_distributed_target t
|
||||||
|
USING citus_local_source s
|
||||||
|
ON t.tid = s.sid AND tid = 1
|
||||||
|
WHEN MATCHED THEN
|
||||||
|
UPDATE SET balance = balance + delta, val = val || ' updated by merge'
|
||||||
|
WHEN NOT MATCHED THEN
|
||||||
|
INSERT VALUES (sid, delta, 'inserted by merge')
|
||||||
|
WHEN NOT MATCHED BY SOURCE THEN
|
||||||
|
UPDATE SET val = val || ' not matched by source';
|
||||||
|
SELECT * FROM citus_distributed_target ORDER BY tid, val;
|
||||||
|
ROLLBACK;
|
||||||
|
|
||||||
|
-- Distributed-Distributed
|
||||||
|
-- try simple MERGE
|
||||||
|
BEGIN;
|
||||||
|
MERGE INTO citus_distributed_target t
|
||||||
|
USING citus_distributed_source s
|
||||||
|
ON t.tid = s.sid
|
||||||
|
WHEN MATCHED THEN
|
||||||
|
UPDATE SET balance = balance + delta, val = val || ' updated by merge'
|
||||||
|
WHEN NOT MATCHED THEN
|
||||||
|
INSERT VALUES (sid, delta, 'inserted by merge')
|
||||||
|
WHEN NOT MATCHED BY SOURCE THEN
|
||||||
|
UPDATE SET val = val || ' not matched by source';
|
||||||
|
SELECT * FROM citus_distributed_target ORDER BY tid, val;
|
||||||
|
ROLLBACK;
|
||||||
|
|
||||||
|
-- same with a constant qual
|
||||||
|
BEGIN;
|
||||||
|
MERGE INTO citus_distributed_target t
|
||||||
|
USING citus_distributed_source s
|
||||||
|
ON t.tid = s.sid AND tid = 1
|
||||||
|
WHEN MATCHED THEN
|
||||||
|
UPDATE SET balance = balance + delta, val = val || ' updated by merge'
|
||||||
|
WHEN NOT MATCHED THEN
|
||||||
|
INSERT VALUES (sid, delta, 'inserted by merge')
|
||||||
|
WHEN NOT MATCHED BY SOURCE THEN
|
||||||
|
UPDATE SET val = val || ' not matched by source';
|
||||||
|
SELECT * FROM citus_distributed_target ORDER BY tid, val;
|
||||||
|
ROLLBACK;
|
||||||
|
|
||||||
|
-- Distributed-Reference
|
||||||
|
-- try simple MERGE
|
||||||
|
BEGIN;
|
||||||
|
MERGE INTO citus_distributed_target t
|
||||||
|
USING citus_reference_source s
|
||||||
|
ON t.tid = s.sid
|
||||||
|
WHEN MATCHED THEN
|
||||||
|
UPDATE SET balance = balance + delta, val = val || ' updated by merge'
|
||||||
|
WHEN NOT MATCHED THEN
|
||||||
|
INSERT VALUES (sid, delta, 'inserted by merge')
|
||||||
|
WHEN NOT MATCHED BY SOURCE THEN
|
||||||
|
UPDATE SET val = val || ' not matched by source';
|
||||||
|
SELECT * FROM citus_distributed_target ORDER BY tid, val;
|
||||||
|
ROLLBACK;
|
||||||
|
|
||||||
|
-- same with a constant qual
|
||||||
|
BEGIN;
|
||||||
|
MERGE INTO citus_distributed_target t
|
||||||
|
USING citus_reference_source s
|
||||||
|
ON t.tid = s.sid AND tid = 1
|
||||||
|
WHEN MATCHED THEN
|
||||||
|
UPDATE SET balance = balance + delta, val = val || ' updated by merge'
|
||||||
|
WHEN NOT MATCHED THEN
|
||||||
|
INSERT VALUES (sid, delta, 'inserted by merge')
|
||||||
|
WHEN NOT MATCHED BY SOURCE THEN
|
||||||
|
UPDATE SET val = val || ' not matched by source';
|
||||||
|
SELECT * FROM citus_distributed_target ORDER BY tid, val;
|
||||||
|
ROLLBACK;
|
||||||
|
|
||||||
|
-- Reference-N/A - Reference table as target is not allowed in Merge
|
||||||
|
-- try simple MERGE
|
||||||
|
MERGE INTO citus_reference_target t
|
||||||
|
USING citus_distributed_source s
|
||||||
|
ON t.tid = s.sid
|
||||||
|
WHEN MATCHED THEN
|
||||||
|
UPDATE SET balance = balance + delta, val = val || ' updated by merge'
|
||||||
|
WHEN NOT MATCHED THEN
|
||||||
|
INSERT VALUES (sid, delta, 'inserted by merge')
|
||||||
|
WHEN NOT MATCHED BY SOURCE THEN
|
||||||
|
UPDATE SET val = val || ' not matched by source';
|
||||||
|
|
||||||
|
-- End of MERGE ... WHEN NOT MATCHED BY SOURCE tests
|
||||||
|
|
||||||
\set VERBOSITY terse
|
\set VERBOSITY terse
|
||||||
SET client_min_messages TO WARNING;
|
SET client_min_messages TO WARNING;
|
||||||
|
|
Loading…
Reference in New Issue