SHOW server_version \gset SELECT substring(:'server_version', '\d+')::int >= 15 AS server_version_ge_15 \gset \if :server_version_ge_15 \else \q \endif -- MERGE command performs a join from data_source to target_table_name DROP SCHEMA IF EXISTS merge_vcore_schema CASCADE; NOTICE: schema "merge_vcore_schema" does not exist, skipping --MERGE INTO target --USING source --WHEN NOT MATCHED --WHEN MATCHED AND --WHEN MATCHED CREATE SCHEMA merge_vcore_schema; SET search_path TO merge_vcore_schema; SET citus.shard_count TO 4; SET citus.next_shard_id TO 4000000; SET citus.explain_all_tasks TO true; SET citus.shard_replication_factor TO 1; SET citus.max_adaptive_executor_pool_size TO 1; SET client_min_messages = warning; SELECT 1 FROM master_add_node('localhost', :master_port, groupid => 0); ?column? --------------------------------------------------------------------- 1 (1 row) RESET client_min_messages; -- ****************************************** CASE 1 : Both are singleSharded*************************************** CREATE TABLE source ( id bigint, doc text ); CREATE TABLE target ( id bigint, doc text ); SELECT create_distributed_table('source', null, colocate_with=>'none'); create_distributed_table --------------------------------------------------------------------- (1 row) SELECT create_distributed_table('target', null, colocate_with=>'none'); create_distributed_table --------------------------------------------------------------------- (1 row) INSERT INTO source (id, doc) VALUES (1, '{"a" : 1}'), (1, '{"a" : 2}'); -- insert MERGE INTO ONLY target USING (SELECT 2::bigint AS t_id, doc FROM source) src ON (src.t_id = target.id) AND src.doc = target.doc WHEN MATCHED THEN UPDATE SET doc = '{"b" : 1}' WHEN NOT MATCHED THEN INSERT (id, doc) VALUES (src.t_id, doc); SELECT * FROM target; id | doc --------------------------------------------------------------------- 2 | {"a" : 1} 2 | {"a" : 2} (2 rows) -- update MERGE INTO ONLY target USING (SELECT 2::bigint AS t_id, doc FROM source) src ON (src.t_id = target.id) AND src.doc = target.doc WHEN MATCHED THEN UPDATE SET doc = '{"b" : 1}' WHEN NOT MATCHED THEN INSERT (id, doc) VALUES (src.t_id, doc); SELECT * FROM target; id | doc --------------------------------------------------------------------- 2 | {"b" : 1} 2 | {"b" : 1} (2 rows) -- Explain EXPLAIN (costs off, timing off, summary off) MERGE INTO ONLY target USING (SELECT 2::bigint AS t_id, doc FROM source) src ON (src.t_id = target.id) WHEN MATCHED THEN DO NOTHING; QUERY PLAN --------------------------------------------------------------------- Custom Scan (Citus MERGE INTO ...) MERGE INTO target method: pull to coordinator -> Custom Scan (Citus Adaptive) Task Count: 1 Tasks Shown: All -> Task Node: host=localhost port=xxxxx dbname=regression -> Seq Scan on source_4000000 source (8 rows) DROP TABLE IF EXISTS source; DROP TABLE IF EXISTS target; -- *************** CASE 2 : source is single sharded and target is distributed ******************************* CREATE TABLE source ( id bigint, doc text ); CREATE TABLE target ( id bigint, doc text ); SELECT create_distributed_table('source', null, colocate_with=>'none'); create_distributed_table --------------------------------------------------------------------- (1 row) SELECT create_distributed_table('target', 'id'); create_distributed_table --------------------------------------------------------------------- (1 row) INSERT INTO source (id, doc) VALUES (1, '{"a" : 1}'), (1, '{"a" : 2}'); -- insert MERGE INTO ONLY target USING (SELECT 2::bigint AS t_id, doc FROM source) src ON (src.t_id = target.id) AND src.doc = target.doc WHEN MATCHED THEN UPDATE SET doc = '{"b" : 1}' WHEN NOT MATCHED THEN INSERT (id, doc) VALUES (src.t_id, doc); SELECT * FROM target; id | doc --------------------------------------------------------------------- 2 | {"a" : 1} 2 | {"a" : 2} (2 rows) -- update MERGE INTO ONLY target USING (SELECT 2::bigint AS t_id, doc FROM source) src ON (src.t_id = target.id) AND src.doc = target.doc WHEN MATCHED THEN UPDATE SET doc = '{"b" : 1}' WHEN NOT MATCHED THEN INSERT (id, doc) VALUES (src.t_id, doc); SELECT * FROM target; id | doc --------------------------------------------------------------------- 2 | {"b" : 1} 2 | {"b" : 1} (2 rows) -- Explain EXPLAIN (costs off, timing off, summary off) MERGE INTO ONLY target USING (SELECT 2::bigint AS t_id, doc FROM source) src ON (src.t_id = target.id) WHEN MATCHED THEN DO NOTHING; QUERY PLAN --------------------------------------------------------------------- Custom Scan (Citus MERGE INTO ...) MERGE INTO target method: pull to coordinator -> Custom Scan (Citus Adaptive) Task Count: 1 Tasks Shown: All -> Task Node: host=localhost port=xxxxx dbname=regression -> Seq Scan on source_4000002 source (8 rows) DROP TABLE IF EXISTS source; DROP TABLE IF EXISTS target; -- *************** CASE 3 : source is distributed and target is single sharded ******************************* CREATE TABLE source ( id bigint, doc text ); CREATE TABLE target ( id bigint, doc text ); SELECT create_distributed_table('source', 'id'); create_distributed_table --------------------------------------------------------------------- (1 row) SELECT create_distributed_table('target', null); create_distributed_table --------------------------------------------------------------------- (1 row) INSERT INTO source (id, doc) VALUES (1, '{"a" : 1}'), (1, '{"a" : 2}'); -- insert MERGE INTO ONLY target USING (SELECT 2::bigint AS t_id, doc FROM source) src ON (src.t_id = target.id) AND src.doc = target.doc WHEN MATCHED THEN UPDATE SET doc = '{"b" : 1}' WHEN NOT MATCHED THEN INSERT (id, doc) VALUES (src.t_id, doc); SELECT * FROM target; id | doc --------------------------------------------------------------------- 2 | {"a" : 1} 2 | {"a" : 2} (2 rows) -- update MERGE INTO ONLY target USING (SELECT 2::bigint AS t_id, doc FROM source) src ON (src.t_id = target.id) AND src.doc = target.doc WHEN MATCHED THEN UPDATE SET doc = '{"b" : 1}' WHEN NOT MATCHED THEN INSERT (id, doc) VALUES (src.t_id, doc); SELECT * FROM target; id | doc --------------------------------------------------------------------- 2 | {"b" : 1} 2 | {"b" : 1} (2 rows) -- Explain EXPLAIN (costs off, timing off, summary off) MERGE INTO ONLY target USING (SELECT 2::bigint AS t_id, doc FROM source) src ON (src.t_id = target.id) WHEN MATCHED THEN DO NOTHING; QUERY PLAN --------------------------------------------------------------------- Custom Scan (Citus MERGE INTO ...) MERGE INTO target method: pull to coordinator -> Custom Scan (Citus Adaptive) Task Count: 4 Tasks Shown: All -> Task Node: host=localhost port=xxxxx dbname=regression -> Seq Scan on source_4000007 source -> Task Node: host=localhost port=xxxxx dbname=regression -> Seq Scan on source_4000008 source -> Task Node: host=localhost port=xxxxx dbname=regression -> Seq Scan on source_4000009 source -> Task Node: host=localhost port=xxxxx dbname=regression -> Seq Scan on source_4000010 source (17 rows) DROP TABLE IF EXISTS source; DROP TABLE IF EXISTS target; -- *************** CASE 4 : both are distributed ******************************* CREATE TABLE source ( id bigint, doc text ); CREATE TABLE target ( id bigint, doc text ); SELECT create_distributed_table('source', 'id'); create_distributed_table --------------------------------------------------------------------- (1 row) SELECT create_distributed_table('target', 'id'); create_distributed_table --------------------------------------------------------------------- (1 row) INSERT INTO source (id, doc) VALUES (1, '{"a" : 1}'), (1, '{"a" : 2}'); -- insert MERGE INTO ONLY target USING (SELECT 2::bigint AS t_id, doc FROM source) src ON (src.t_id = target.id) AND src.doc = target.doc WHEN MATCHED THEN UPDATE SET doc = '{"b" : 1}' WHEN NOT MATCHED THEN INSERT (id, doc) VALUES (src.t_id, doc); SELECT * FROM target; id | doc --------------------------------------------------------------------- 2 | {"a" : 1} 2 | {"a" : 2} (2 rows) -- update MERGE INTO ONLY target USING (SELECT 2::bigint AS t_id, doc FROM source) src ON (src.t_id = target.id) AND src.doc = target.doc WHEN MATCHED THEN UPDATE SET doc = '{"b" : 1}' WHEN NOT MATCHED THEN INSERT (id, doc) VALUES (src.t_id, doc); SELECT * FROM target; id | doc --------------------------------------------------------------------- 2 | {"b" : 1} 2 | {"b" : 1} (2 rows) -- Explain EXPLAIN (costs off, timing off, summary off) MERGE INTO ONLY target USING (SELECT 2::bigint AS t_id, doc FROM source) src ON (src.t_id = target.id) WHEN MATCHED THEN DO NOTHING; QUERY PLAN --------------------------------------------------------------------- Custom Scan (Citus MERGE INTO ...) MERGE INTO target method: repartition -> Custom Scan (Citus Adaptive) Task Count: 4 Tasks Shown: All -> Task Node: host=localhost port=xxxxx dbname=regression -> Seq Scan on source_4000012 source -> Task Node: host=localhost port=xxxxx dbname=regression -> Seq Scan on source_4000013 source -> Task Node: host=localhost port=xxxxx dbname=regression -> Seq Scan on source_4000014 source -> Task Node: host=localhost port=xxxxx dbname=regression -> Seq Scan on source_4000015 source (17 rows) DROP TABLE IF EXISTS source; DROP TABLE IF EXISTS target; -- *************** CASE 5 : both are distributed & colocated ******************************* CREATE TABLE source ( id bigint, doc text ); CREATE TABLE target ( id bigint, doc text ); SELECT create_distributed_table('source', 'id'); create_distributed_table --------------------------------------------------------------------- (1 row) SELECT create_distributed_table('target', 'id', colocate_with=>'source'); create_distributed_table --------------------------------------------------------------------- (1 row) INSERT INTO source (id, doc) VALUES (1, '{"a" : 1}'), (1, '{"a" : 2}'); -- insert MERGE INTO ONLY target USING (SELECT 2::bigint AS t_id, doc FROM source) src ON (src.t_id = target.id) AND src.doc = target.doc WHEN MATCHED THEN UPDATE SET doc = '{"b" : 1}' WHEN NOT MATCHED THEN INSERT (id, doc) VALUES (src.t_id, doc); SELECT * FROM target; id | doc --------------------------------------------------------------------- 2 | {"a" : 1} 2 | {"a" : 2} (2 rows) -- update MERGE INTO ONLY target USING (SELECT 2::bigint AS t_id, doc FROM source) src ON (src.t_id = target.id) AND src.doc = target.doc WHEN MATCHED THEN UPDATE SET doc = '{"b" : 1}' WHEN NOT MATCHED THEN INSERT (id, doc) VALUES (src.t_id, doc); SELECT * FROM target; id | doc --------------------------------------------------------------------- 2 | {"b" : 1} 2 | {"b" : 1} (2 rows) -- Explain EXPLAIN (costs off, timing off, summary off) MERGE INTO ONLY target USING (SELECT 2::bigint AS t_id, doc FROM source) src ON (src.t_id = target.id) WHEN MATCHED THEN DO NOTHING; QUERY PLAN --------------------------------------------------------------------- Custom Scan (Citus MERGE INTO ...) MERGE INTO target method: repartition -> Custom Scan (Citus Adaptive) Task Count: 4 Tasks Shown: All -> Task Node: host=localhost port=xxxxx dbname=regression -> Seq Scan on source_4000020 source -> Task Node: host=localhost port=xxxxx dbname=regression -> Seq Scan on source_4000021 source -> Task Node: host=localhost port=xxxxx dbname=regression -> Seq Scan on source_4000022 source -> Task Node: host=localhost port=xxxxx dbname=regression -> Seq Scan on source_4000023 source (17 rows) DROP TABLE IF EXISTS source; DROP TABLE IF EXISTS target; -- *************** CASE 6 : both are singlesharded & colocated ******************************* CREATE TABLE source ( id bigint, doc text ); CREATE TABLE target ( id bigint, doc text ); SELECT create_distributed_table('source', null); create_distributed_table --------------------------------------------------------------------- (1 row) SELECT create_distributed_table('target', null, colocate_with=>'source'); create_distributed_table --------------------------------------------------------------------- (1 row) INSERT INTO source (id, doc) VALUES (1, '{"a" : 1}'), (1, '{"a" : 2}'); -- insert MERGE INTO ONLY target USING (SELECT 2::bigint AS t_id, doc FROM source) src ON (src.t_id = target.id) AND src.doc = target.doc WHEN MATCHED THEN UPDATE SET doc = '{"b" : 1}' WHEN NOT MATCHED THEN INSERT (id, doc) VALUES (src.t_id, doc); SELECT * FROM target; id | doc --------------------------------------------------------------------- 2 | {"a" : 1} 2 | {"a" : 2} (2 rows) -- update MERGE INTO ONLY target USING (SELECT 2::bigint AS t_id, doc FROM source) src ON (src.t_id = target.id) AND src.doc = target.doc WHEN MATCHED THEN UPDATE SET doc = '{"b" : 1}' WHEN NOT MATCHED THEN INSERT (id, doc) VALUES (src.t_id, doc); SELECT * FROM target; id | doc --------------------------------------------------------------------- 2 | {"b" : 1} 2 | {"b" : 1} (2 rows) -- Explain EXPLAIN (costs off, timing off, summary off) MERGE INTO ONLY target USING (SELECT 2::bigint AS t_id, doc FROM source) src ON (src.t_id = target.id) WHEN MATCHED THEN DO NOTHING; QUERY PLAN --------------------------------------------------------------------- Custom Scan (Citus Adaptive) Task Count: 1 Tasks Shown: All -> Task Node: host=localhost port=xxxxx dbname=regression -> Merge on target_4000029 target -> Nested Loop -> Seq Scan on source_4000028 source -> Materialize -> Seq Scan on target_4000029 target Filter: ('2'::bigint = id) (11 rows) DROP TABLE IF EXISTS source; DROP TABLE IF EXISTS target; -- Bug Fix Test as part of this PR -- Test 1 CREATE TABLE source ( id int, age int, salary int ); CREATE TABLE target ( id int, age int, salary int ); SELECT create_distributed_table('source', 'id', colocate_with=>'none'); create_distributed_table --------------------------------------------------------------------- (1 row) SELECT create_distributed_table('target', 'id', colocate_with=>'none'); create_distributed_table --------------------------------------------------------------------- (1 row) INSERT INTO source (id, age, salary) VALUES (1,30, 100000); MERGE INTO ONLY target USING source ON (source.id = target.id) WHEN NOT MATCHED THEN INSERT (id, salary) VALUES (source.id, source.salary); SELECT * FROM TARGET; id | age | salary --------------------------------------------------------------------- 1 | | 100000 (1 row) DROP TABLE IF EXISTS source; DROP TABLE IF EXISTS target; -- Test 2 CREATE TABLE source ( id int, age int, salary int ); CREATE TABLE target ( id int, age int, salary int ); SELECT create_distributed_table('source', 'id', colocate_with=>'none'); create_distributed_table --------------------------------------------------------------------- (1 row) SELECT create_distributed_table('target', 'id', colocate_with=>'none'); create_distributed_table --------------------------------------------------------------------- (1 row) INSERT INTO source (id, age, salary) VALUES (1,30, 100000); MERGE INTO ONLY target USING source ON (source.id = target.id) WHEN NOT MATCHED THEN INSERT (salary, id) VALUES (source.salary, source.id); SELECT * FROM TARGET; id | age | salary --------------------------------------------------------------------- 1 | | 100000 (1 row) DROP TABLE IF EXISTS source; DROP TABLE IF EXISTS target; -- Test 3 CREATE TABLE source ( id int, age int, salary int ); CREATE TABLE target ( id int, age int, salary int ); SELECT create_distributed_table('source', 'id', colocate_with=>'none'); create_distributed_table --------------------------------------------------------------------- (1 row) SELECT create_distributed_table('target', 'id', colocate_with=>'none'); create_distributed_table --------------------------------------------------------------------- (1 row) INSERT INTO source (id, age, salary) VALUES (1,30, 100000); MERGE INTO ONLY target USING source ON (source.id = target.id) WHEN NOT MATCHED THEN INSERT (salary, id, age) VALUES (source.age, source.id, source.salary); SELECT * FROM TARGET; id | age | salary --------------------------------------------------------------------- 1 | 100000 | 30 (1 row) DROP TABLE IF EXISTS source; DROP TABLE IF EXISTS target; DROP SCHEMA IF EXISTS merge_vcore_schema CASCADE;