-- MERGE on Citus tables: runs the same insert-then-update MERGE against six
-- source/target distribution layouts and records the plan chosen for each.
-- The script quits early (\q) when the server major version is below 15.
-- Verification queries name their columns and sort by (id, doc) so that the
-- recorded rows do not depend on physical row order.
-- NOTE(review): this reads like a psql regression transcript (statements
-- followed by their captured output) -- confirm, and mirror any statement or
-- comment change in the input script that produces it.
SHOW server_version \gset
SELECT substring(:'server_version', '\d+')::int >= 15 AS server_version_ge_15
\gset
\if :server_version_ge_15
\else
\q
\endif
-- MERGE command performs a join from data_source to target_table_name
DROP SCHEMA IF EXISTS merge_vcore_schema CASCADE;
NOTICE:  schema "merge_vcore_schema" does not exist, skipping
-- Clause shapes of interest:
--MERGE INTO target
--USING source
--WHEN NOT MATCHED
--WHEN MATCHED AND
--WHEN MATCHED
CREATE SCHEMA merge_vcore_schema;
SET search_path TO merge_vcore_schema;
-- Fixed shard count and starting shard id keep the shard names that appear
-- in the EXPLAIN output below stable.
SET citus.shard_count TO 4;
SET citus.next_shard_id TO 4000000;
SET citus.explain_all_tasks TO true;
SET citus.shard_replication_factor TO 1;
SET citus.max_adaptive_executor_pool_size TO 1;
-- Register the coordinator (groupid 0) as a node; warnings only while doing so.
SET client_min_messages = warning;
SELECT 1 FROM master_add_node('localhost', :master_port, groupid => 0);
 ?column?
---------------------------------------------------------------------
        1
(1 row)

RESET client_min_messages;
-- *************** CASE 1 : both are single sharded *******************************
-- Both tables are created with a NULL distribution column and no colocation.
CREATE TABLE source (
    id bigint,
    doc text
);
CREATE TABLE target (
    id bigint,
    doc text
);
SELECT create_distributed_table('source', null, colocate_with=>'none');
 create_distributed_table
---------------------------------------------------------------------

(1 row)

SELECT create_distributed_table('target', null, colocate_with=>'none');
 create_distributed_table
---------------------------------------------------------------------

(1 row)

INSERT INTO source (id, doc)
VALUES (1, '{"a" : 1}'),
       (1, '{"a" : 2}');
-- insert: target is empty, so both source rows take the NOT MATCHED branch
MERGE INTO ONLY target
USING (SELECT 2::bigint AS t_id, doc FROM source) src
    ON (src.t_id = target.id) AND src.doc = target.doc
WHEN MATCHED THEN
    UPDATE SET doc = '{"b" : 1}'
WHEN NOT MATCHED THEN
    INSERT (id, doc)
    VALUES (src.t_id, doc);
SELECT id, doc
FROM target
ORDER BY id, doc;
 id |    doc
---------------------------------------------------------------------
  2 | {"a" : 1}
  2 | {"a" : 2}
(2 rows)

-- update: the same statement now matches both rows on (id, doc)
MERGE INTO ONLY target
USING (SELECT 2::bigint AS t_id, doc FROM source) src
    ON (src.t_id = target.id) AND src.doc = target.doc
WHEN MATCHED THEN
    UPDATE SET doc = '{"b" : 1}'
WHEN NOT MATCHED THEN
    INSERT (id, doc)
    VALUES (src.t_id, doc);
SELECT id, doc
FROM target
ORDER BY id, doc;
 id |    doc
---------------------------------------------------------------------
  2 | {"b" : 1}
  2 | {"b" : 1}
(2 rows)

-- Explain
EXPLAIN (costs off, timing off, summary off)
MERGE INTO ONLY target
USING (SELECT 2::bigint AS t_id, doc FROM source) src
    ON (src.t_id = target.id)
WHEN MATCHED THEN
    DO NOTHING;
                            QUERY PLAN
---------------------------------------------------------------------
 Custom Scan (Citus MERGE INTO ...)
   MERGE INTO target method: pull to coordinator
   ->  Custom Scan (Citus Adaptive)
         Task Count: 1
         Tasks Shown: All
         ->  Task
               Node: host=localhost port=xxxxx dbname=regression
               ->  Seq Scan on source_4000000 source
(8 rows)

DROP TABLE IF EXISTS source;
DROP TABLE IF EXISTS target;
-- *************** CASE 2 : source is single sharded and target is distributed *******************************
CREATE TABLE source (
    id bigint,
    doc text
);
CREATE TABLE target (
    id bigint,
    doc text
);
SELECT create_distributed_table('source', null, colocate_with=>'none');
 create_distributed_table
---------------------------------------------------------------------

(1 row)

SELECT create_distributed_table('target', 'id');
 create_distributed_table
---------------------------------------------------------------------

(1 row)

INSERT INTO source (id, doc)
VALUES (1, '{"a" : 1}'),
       (1, '{"a" : 2}');
-- insert
MERGE INTO ONLY target
USING (SELECT 2::bigint AS t_id, doc FROM source) src
    ON (src.t_id = target.id) AND src.doc = target.doc
WHEN MATCHED THEN
    UPDATE SET doc = '{"b" : 1}'
WHEN NOT MATCHED THEN
    INSERT (id, doc)
    VALUES (src.t_id, doc);
SELECT id, doc
FROM target
ORDER BY id, doc;
 id |    doc
---------------------------------------------------------------------
  2 | {"a" : 1}
  2 | {"a" : 2}
(2 rows)

-- update
MERGE INTO ONLY target
USING (SELECT 2::bigint AS t_id, doc FROM source) src
    ON (src.t_id = target.id) AND src.doc = target.doc
WHEN MATCHED THEN
    UPDATE SET doc = '{"b" : 1}'
WHEN NOT MATCHED THEN
    INSERT (id, doc)
    VALUES (src.t_id, doc);
SELECT id, doc
FROM target
ORDER BY id, doc;
 id |    doc
---------------------------------------------------------------------
  2 | {"b" : 1}
  2 | {"b" : 1}
(2 rows)

-- Explain
EXPLAIN (costs off, timing off, summary off)
MERGE INTO ONLY target
USING (SELECT 2::bigint AS t_id, doc FROM source) src
    ON (src.t_id = target.id)
WHEN MATCHED THEN
    DO NOTHING;
                            QUERY PLAN
---------------------------------------------------------------------
 Custom Scan (Citus MERGE INTO ...)
   MERGE INTO target method: pull to coordinator
   ->  Custom Scan (Citus Adaptive)
         Task Count: 1
         Tasks Shown: All
         ->  Task
               Node: host=localhost port=xxxxx dbname=regression
               ->  Seq Scan on source_4000002 source
(8 rows)

DROP TABLE IF EXISTS source;
DROP TABLE IF EXISTS target;
-- *************** CASE 3 : source is distributed and target is single sharded *******************************
CREATE TABLE source (
    id bigint,
    doc text
);
CREATE TABLE target (
    id bigint,
    doc text
);
SELECT create_distributed_table('source', 'id');
 create_distributed_table
---------------------------------------------------------------------

(1 row)

SELECT create_distributed_table('target', null);
 create_distributed_table
---------------------------------------------------------------------

(1 row)

INSERT INTO source (id, doc)
VALUES (1, '{"a" : 1}'),
       (1, '{"a" : 2}');
-- insert
MERGE INTO ONLY target
USING (SELECT 2::bigint AS t_id, doc FROM source) src
    ON (src.t_id = target.id) AND src.doc = target.doc
WHEN MATCHED THEN
    UPDATE SET doc = '{"b" : 1}'
WHEN NOT MATCHED THEN
    INSERT (id, doc)
    VALUES (src.t_id, doc);
SELECT id, doc
FROM target
ORDER BY id, doc;
 id |    doc
---------------------------------------------------------------------
  2 | {"a" : 1}
  2 | {"a" : 2}
(2 rows)

-- update
MERGE INTO ONLY target
USING (SELECT 2::bigint AS t_id, doc FROM source) src
    ON (src.t_id = target.id) AND src.doc = target.doc
WHEN MATCHED THEN
    UPDATE SET doc = '{"b" : 1}'
WHEN NOT MATCHED THEN
    INSERT (id, doc)
    VALUES (src.t_id, doc);
SELECT id, doc
FROM target
ORDER BY id, doc;
 id |    doc
---------------------------------------------------------------------
  2 | {"b" : 1}
  2 | {"b" : 1}
(2 rows)

-- Explain
EXPLAIN (costs off, timing off, summary off)
MERGE INTO ONLY target
USING (SELECT 2::bigint AS t_id, doc FROM source) src
    ON (src.t_id = target.id)
WHEN MATCHED THEN
    DO NOTHING;
                            QUERY PLAN
---------------------------------------------------------------------
 Custom Scan (Citus MERGE INTO ...)
   MERGE INTO target method: pull to coordinator
   ->  Custom Scan (Citus Adaptive)
         Task Count: 4
         Tasks Shown: All
         ->  Task
               Node: host=localhost port=xxxxx dbname=regression
               ->  Seq Scan on source_4000007 source
         ->  Task
               Node: host=localhost port=xxxxx dbname=regression
               ->  Seq Scan on source_4000008 source
         ->  Task
               Node: host=localhost port=xxxxx dbname=regression
               ->  Seq Scan on source_4000009 source
         ->  Task
               Node: host=localhost port=xxxxx dbname=regression
               ->  Seq Scan on source_4000010 source
(17 rows)

DROP TABLE IF EXISTS source;
DROP TABLE IF EXISTS target;
-- *************** CASE 4 : both are distributed *******************************
CREATE TABLE source (
    id bigint,
    doc text
);
CREATE TABLE target (
    id bigint,
    doc text
);
SELECT create_distributed_table('source', 'id');
 create_distributed_table
---------------------------------------------------------------------

(1 row)

SELECT create_distributed_table('target', 'id');
 create_distributed_table
---------------------------------------------------------------------

(1 row)

INSERT INTO source (id, doc)
VALUES (1, '{"a" : 1}'),
       (1, '{"a" : 2}');
-- insert
MERGE INTO ONLY target
USING (SELECT 2::bigint AS t_id, doc FROM source) src
    ON (src.t_id = target.id) AND src.doc = target.doc
WHEN MATCHED THEN
    UPDATE SET doc = '{"b" : 1}'
WHEN NOT MATCHED THEN
    INSERT (id, doc)
    VALUES (src.t_id, doc);
SELECT id, doc
FROM target
ORDER BY id, doc;
 id |    doc
---------------------------------------------------------------------
  2 | {"a" : 1}
  2 | {"a" : 2}
(2 rows)

-- update
MERGE INTO ONLY target
USING (SELECT 2::bigint AS t_id, doc FROM source) src
    ON (src.t_id = target.id) AND src.doc = target.doc
WHEN MATCHED THEN
    UPDATE SET doc = '{"b" : 1}'
WHEN NOT MATCHED THEN
    INSERT (id, doc)
    VALUES (src.t_id, doc);
SELECT id, doc
FROM target
ORDER BY id, doc;
 id |    doc
---------------------------------------------------------------------
  2 | {"b" : 1}
  2 | {"b" : 1}
(2 rows)

-- Explain
EXPLAIN (costs off, timing off, summary off)
MERGE INTO ONLY target
USING (SELECT 2::bigint AS t_id, doc FROM source) src
    ON (src.t_id = target.id)
WHEN MATCHED THEN
    DO NOTHING;
                            QUERY PLAN
---------------------------------------------------------------------
 Custom Scan (Citus MERGE INTO ...)
   MERGE INTO target method: repartition
   ->  Custom Scan (Citus Adaptive)
         Task Count: 4
         Tasks Shown: All
         ->  Task
               Node: host=localhost port=xxxxx dbname=regression
               ->  Seq Scan on source_4000012 source
         ->  Task
               Node: host=localhost port=xxxxx dbname=regression
               ->  Seq Scan on source_4000013 source
         ->  Task
               Node: host=localhost port=xxxxx dbname=regression
               ->  Seq Scan on source_4000014 source
         ->  Task
               Node: host=localhost port=xxxxx dbname=regression
               ->  Seq Scan on source_4000015 source
(17 rows)

DROP TABLE IF EXISTS source;
DROP TABLE IF EXISTS target;
-- *************** CASE 5 : both are distributed & colocated *******************************
CREATE TABLE source (
    id bigint,
    doc text
);
CREATE TABLE target (
    id bigint,
    doc text
);
SELECT create_distributed_table('source', 'id');
 create_distributed_table
---------------------------------------------------------------------

(1 row)

SELECT create_distributed_table('target', 'id', colocate_with=>'source');
 create_distributed_table
---------------------------------------------------------------------

(1 row)

INSERT INTO source (id, doc)
VALUES (1, '{"a" : 1}'),
       (1, '{"a" : 2}');
-- insert
MERGE INTO ONLY target
USING (SELECT 2::bigint AS t_id, doc FROM source) src
    ON (src.t_id = target.id) AND src.doc = target.doc
WHEN MATCHED THEN
    UPDATE SET doc = '{"b" : 1}'
WHEN NOT MATCHED THEN
    INSERT (id, doc)
    VALUES (src.t_id, doc);
SELECT id, doc
FROM target
ORDER BY id, doc;
 id |    doc
---------------------------------------------------------------------
  2 | {"a" : 1}
  2 | {"a" : 2}
(2 rows)

-- update
MERGE INTO ONLY target
USING (SELECT 2::bigint AS t_id, doc FROM source) src
    ON (src.t_id = target.id) AND src.doc = target.doc
WHEN MATCHED THEN
    UPDATE SET doc = '{"b" : 1}'
WHEN NOT MATCHED THEN
    INSERT (id, doc)
    VALUES (src.t_id, doc);
SELECT id, doc
FROM target
ORDER BY id, doc;
 id |    doc
---------------------------------------------------------------------
  2 | {"b" : 1}
  2 | {"b" : 1}
(2 rows)

-- Explain
EXPLAIN (costs off, timing off, summary off)
MERGE INTO ONLY target
USING (SELECT 2::bigint AS t_id, doc FROM source) src
    ON (src.t_id = target.id)
WHEN MATCHED THEN
    DO NOTHING;
                            QUERY PLAN
---------------------------------------------------------------------
 Custom Scan (Citus MERGE INTO ...)
   MERGE INTO target method: repartition
   ->  Custom Scan (Citus Adaptive)
         Task Count: 4
         Tasks Shown: All
         ->  Task
               Node: host=localhost port=xxxxx dbname=regression
               ->  Seq Scan on source_4000020 source
         ->  Task
               Node: host=localhost port=xxxxx dbname=regression
               ->  Seq Scan on source_4000021 source
         ->  Task
               Node: host=localhost port=xxxxx dbname=regression
               ->  Seq Scan on source_4000022 source
         ->  Task
               Node: host=localhost port=xxxxx dbname=regression
               ->  Seq Scan on source_4000023 source
(17 rows)

DROP TABLE IF EXISTS source;
DROP TABLE IF EXISTS target;
-- *************** CASE 6 : both are single sharded & colocated *******************************
-- The plan recorded below has a single task that contains the Merge node itself.
CREATE TABLE source (
    id bigint,
    doc text
);
CREATE TABLE target (
    id bigint,
    doc text
);
SELECT create_distributed_table('source', null);
 create_distributed_table
---------------------------------------------------------------------

(1 row)

SELECT create_distributed_table('target', null, colocate_with=>'source');
 create_distributed_table
---------------------------------------------------------------------

(1 row)

INSERT INTO source (id, doc)
VALUES (1, '{"a" : 1}'),
       (1, '{"a" : 2}');
-- insert
MERGE INTO ONLY target
USING (SELECT 2::bigint AS t_id, doc FROM source) src
    ON (src.t_id = target.id) AND src.doc = target.doc
WHEN MATCHED THEN
    UPDATE SET doc = '{"b" : 1}'
WHEN NOT MATCHED THEN
    INSERT (id, doc)
    VALUES (src.t_id, doc);
SELECT id, doc
FROM target
ORDER BY id, doc;
 id |    doc
---------------------------------------------------------------------
  2 | {"a" : 1}
  2 | {"a" : 2}
(2 rows)

-- update
MERGE INTO ONLY target
USING (SELECT 2::bigint AS t_id, doc FROM source) src
    ON (src.t_id = target.id) AND src.doc = target.doc
WHEN MATCHED THEN
    UPDATE SET doc = '{"b" : 1}'
WHEN NOT MATCHED THEN
    INSERT (id, doc)
    VALUES (src.t_id, doc);
SELECT id, doc
FROM target
ORDER BY id, doc;
 id |    doc
---------------------------------------------------------------------
  2 | {"b" : 1}
  2 | {"b" : 1}
(2 rows)

-- Explain
EXPLAIN (costs off, timing off, summary off)
MERGE INTO ONLY target
USING (SELECT 2::bigint AS t_id, doc FROM source) src
    ON (src.t_id = target.id)
WHEN MATCHED THEN
    DO NOTHING;
                            QUERY PLAN
---------------------------------------------------------------------
 Custom Scan (Citus Adaptive)
   Task Count: 1
   Tasks Shown: All
   ->  Task
         Node: host=localhost port=xxxxx dbname=regression
         ->  Merge on target_4000029 target
               ->  Nested Loop
                     ->  Seq Scan on source_4000028 source
                     ->  Materialize
                           ->  Seq Scan on target_4000029 target
                                 Filter: ('2'::bigint = id)
(11 rows)

DROP TABLE IF EXISTS source;
DROP TABLE IF EXISTS target;
DROP SCHEMA IF EXISTS merge_vcore_schema CASCADE;