diff --git a/src/backend/distributed/planner/multi_physical_planner.c b/src/backend/distributed/planner/multi_physical_planner.c index 6e237b546..0af512b2f 100644 --- a/src/backend/distributed/planner/multi_physical_planner.c +++ b/src/backend/distributed/planner/multi_physical_planner.c @@ -164,7 +164,6 @@ static uint32 HashPartitionCount(void); static Job * BuildJobTreeTaskList(Job *jobTree, PlannerRestrictionContext *plannerRestrictionContext); static bool IsInnerTableOfOuterJoin(RelationRestriction *relationRestriction); -static bool IsOuterTableOfOuterJoin(RelationRestriction *relationRestriction); static void ErrorIfUnsupportedShardDistribution(Query *query); static Task * QueryPushdownTaskCreate(Query *originalQuery, int shardIndex, RelationRestrictionContext *restrictionContext, @@ -2226,34 +2225,22 @@ QueryPushdownSqlTaskList(Query *query, uint64 jobId, } /* - * Skip adding shards of non-target (outer)relations. - * Note: This is a stop-gap arrangement for phase-I where in sql - * generates a single task on the shard identified by constant - * qual(filter) on the target relation. + * For left joins we don't care about the shards pruned for + * the right hand side. If the right hand side would prune + * to a smaller set we should still send it to all tables + * of the left hand side. However if the right hand side is + * bigger than the left hand side we don't have to send the + * query to any shard that is not matching anything on the + * left hand side. + * + * Instead we will simply skip any RelationRestriction if it + * is an OUTER join and the table is part of the non-outer + * side of the join. */ - if (IsMergeQuery(query) && - IsOuterTableOfOuterJoin(relationRestriction)) + if (IsInnerTableOfOuterJoin(relationRestriction)) { continue; } - else if (!IsMergeQuery(query) && - IsInnerTableOfOuterJoin(relationRestriction)) - { - /* - * For left joins we don't care about the shards pruned for - * the right hand side. If the right hand side would prune - * to a smaller set we should still send it to all tables - * of the left hand side. However if the right hand side is - * bigger than the left hand side we don't have to send the - * query to any shard that is not matching anything on the - * left hand side. - * - * Instead we will simply skip any RelationRestriction if it - * is an OUTER join and the table is part of the non-outer - * side of the join. - */ - continue; - } ShardInterval *shardInterval = NULL; foreach_ptr(shardInterval, prunedShardList) @@ -2318,45 +2305,6 @@ QueryPushdownSqlTaskList(Query *query, uint64 jobId, } -/* - * IsOuterTableOfOuterJoin tests based on the join information envoded in a - * RelationRestriction if the table accessed for this relation is - * a) in an outer join - * b) on the outer part of said join - * - * The function returns true only if both conditions above hold true - */ -static bool -IsOuterTableOfOuterJoin(RelationRestriction *relationRestriction) -{ - RestrictInfo *joinInfo = NULL; - foreach_ptr(joinInfo, relationRestriction->relOptInfo->joininfo) - { - if (joinInfo->outer_relids == NULL) - { - /* not an outer join */ - continue; - } - - /* - * This join restriction info describes an outer join, we need to figure out if - * our table is in the outer part of this join. If that is the case this is a - * outer table of an outer join. - */ - bool isInOuter = bms_is_member(relationRestriction->relOptInfo->relid, - joinInfo->outer_relids); - if (isInOuter) - { - /* this table is joined in the outer part of an outer join */ - return true; - } - } - - /* we have not found any join clause that satisfies both requirements */ - return false; -} - - /* * IsInnerTableOfOuterJoin tests based on the join information envoded in a * RelationRestriction if the table accessed for this relation is diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c index 16cf7926b..756a398ce 100644 --- a/src/backend/distributed/planner/multi_router_planner.c +++ b/src/backend/distributed/planner/multi_router_planner.c @@ -4108,8 +4108,6 @@ IsMergeAllowedOnRelation(Query *parse, RangeTblEntry *rte) * - All the distributed tables are indeed colocated. * - MERGE relations are joined on the distribution column * MERGE .. USING .. ON target.dist_key = source.dist_key - * - The query should touch only a single shard i.e. JOIN AND with a constant qual - * MERGE .. USING .. ON target.dist_key = source.dist_key AND target.dist_key = <> * * If any of the conditions are not met, it raises an exception. */ @@ -4142,29 +4140,6 @@ ErrorIfDistTablesNotColocated(Query *parse, List *distTablesList, NULL, NULL); } - /* Look for a constant qual i.e. AND target.dist_key = <> */ - Node *distributionKeyValue = NULL; - Oid targetRelId = ResultRelationOidForQuery(parse); - Var *distributionKey = PartitionColumn(targetRelId, 1); - - Assert(distributionKey); - - /* convert list of expressions into expression tree for further processing */ - Node *quals = parse->jointree->quals; - - if (quals && IsA(quals, List)) - { - quals = (Node *) make_ands_explicit((List *) quals); - } - - if (!ConjunctionContainsColumnFilter(quals, distributionKey, &distributionKeyValue)) - { - return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, - "MERGE on a distributed table requires a constant filter " - "on the distribution column of the target table", NULL, - "Consider adding AND target.dist_key = <> to the ON clause"); - } - return NULL; } diff --git a/src/test/regress/bin/normalize.sed b/src/test/regress/bin/normalize.sed index df343a077..2ebb31f47 100644 --- a/src/test/regress/bin/normalize.sed +++ b/src/test/regress/bin/normalize.sed @@ -28,6 +28,10 @@ s/\(ref_id\)=\([0-9]+\)/(ref_id)=(X)/g # shard table names for multi_subtransactions s/"t2_[0-9]+"/"t2_xxxxxxx"/g +# shard table names for MERGE tests +s/merge_schema\.([_a-z0-9]+)_40[0-9]+ /merge_schema.\1_xxxxxxx /g +s/pgmerge_schema\.([_a-z0-9]+)_40[0-9]+ /pgmerge_schema.\1_xxxxxxx /g + # shard table names for multi_subquery s/ keyval(1|2|ref)_[0-9]+ / keyval\1_xxxxxxx /g diff --git a/src/test/regress/expected/merge.out b/src/test/regress/expected/merge.out index cc87193a0..02671acd0 100644 --- a/src/test/regress/expected/merge.out +++ b/src/test/regress/expected/merge.out @@ -222,7 +222,7 @@ SELECT * from target t WHERE t.customer_id = 30002; 30002 | 103 | AX | -1 | Sun Jan 17 19:53:00 2021 (1 row) --- Turn on notice to print tasks sent to nodes (it should be a single task) +-- Turn on notice to print tasks sent to nodes SET citus.log_remote_commands to true; MERGE INTO target t USING source s @@ -233,10 +233,9 @@ MERGE INTO target t UPDATE SET -- Existing customer, update the order count and last_order_id order_count = t.order_count + 1, last_order_id = s.order_id - WHEN NOT MATCHED THEN -- New entry, record it. - INSERT (customer_id, last_order_id, order_center, order_count, last_order) - VALUES (customer_id, s.order_id, s.order_center, 123, s.order_time); -NOTICE: issuing MERGE INTO merge_schema.target_4000002 t USING merge_schema.source_4000006 s ON ((t.customer_id OPERATOR(pg_catalog.=) s.customer_id) AND (t.customer_id OPERATOR(pg_catalog.=) 30002)) WHEN MATCHED AND ((t.order_center COLLATE "default") OPERATOR(pg_catalog.=) 'XX'::text) THEN DELETE WHEN MATCHED THEN UPDATE SET last_order_id = s.order_id, order_count = (t.order_count OPERATOR(pg_catalog.+) 1) WHEN NOT MATCHED THEN INSERT (customer_id, last_order_id, order_center, order_count, last_order) VALUES (s.customer_id, s.order_id, s.order_center, 123, s.order_time) + WHEN NOT MATCHED THEN + DO NOTHING; +NOTICE: issuing MERGE INTO merge_schema.target_xxxxxxx t USING merge_schema.source_xxxxxxx s ON ((t.customer_id OPERATOR(pg_catalog.=) s.customer_id) AND (t.customer_id OPERATOR(pg_catalog.=) 30002)) WHEN MATCHED AND ((t.order_center COLLATE "default") OPERATOR(pg_catalog.=) 'XX'::text) THEN DELETE WHEN MATCHED THEN UPDATE SET last_order_id = s.order_id, order_count = (t.order_count OPERATOR(pg_catalog.+) 1) WHEN NOT MATCHED THEN DO NOTHING DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx SET citus.log_remote_commands to false; SELECT * from target t WHERE t.customer_id = 30002; @@ -448,18 +447,40 @@ MERGE INTO t1 UPDATE SET val = t1.val + 1 WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s1_res.id, s1_res.val); -NOTICE: issuing WITH s1_res AS (SELECT s1.id, s1.val FROM merge_schema.s1_4000018 s1) MERGE INTO merge_schema.t1_4000014 t1 USING s1_res ON ((s1_res.id OPERATOR(pg_catalog.=) t1.id) AND (t1.id OPERATOR(pg_catalog.=) 6)) WHEN MATCHED AND (s1_res.val OPERATOR(pg_catalog.=) 0) THEN DELETE WHEN MATCHED THEN UPDATE SET val = (t1.val OPERATOR(pg_catalog.+) 1) WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s1_res.id, s1_res.val) +NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx'); +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx'); +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing WITH s1_res AS (SELECT s1.id, s1.val FROM merge_schema.s1_xxxxxxx s1) MERGE INTO merge_schema.t1_xxxxxxx t1 USING s1_res ON ((s1_res.id OPERATOR(pg_catalog.=) t1.id) AND (t1.id OPERATOR(pg_catalog.=) 6)) WHEN MATCHED AND (s1_res.val OPERATOR(pg_catalog.=) 0) THEN DELETE WHEN MATCHED THEN UPDATE SET val = (t1.val OPERATOR(pg_catalog.+) 1) WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s1_res.id, s1_res.val) +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing WITH s1_res AS (SELECT s1.id, s1.val FROM merge_schema.s1_xxxxxxx s1) MERGE INTO merge_schema.t1_xxxxxxx t1 USING s1_res ON ((s1_res.id OPERATOR(pg_catalog.=) t1.id) AND (t1.id OPERATOR(pg_catalog.=) 6)) WHEN MATCHED AND (s1_res.val OPERATOR(pg_catalog.=) 0) THEN DELETE WHEN MATCHED THEN UPDATE SET val = (t1.val OPERATOR(pg_catalog.+) 1) WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s1_res.id, s1_res.val) +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing WITH s1_res AS (SELECT s1.id, s1.val FROM merge_schema.s1_xxxxxxx s1) MERGE INTO merge_schema.t1_xxxxxxx t1 USING s1_res ON ((s1_res.id OPERATOR(pg_catalog.=) t1.id) AND (t1.id OPERATOR(pg_catalog.=) 6)) WHEN MATCHED AND (s1_res.val OPERATOR(pg_catalog.=) 0) THEN DELETE WHEN MATCHED THEN UPDATE SET val = (t1.val OPERATOR(pg_catalog.+) 1) WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s1_res.id, s1_res.val) +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing WITH s1_res AS (SELECT s1.id, s1.val FROM merge_schema.s1_xxxxxxx s1) MERGE INTO merge_schema.t1_xxxxxxx t1 USING s1_res ON ((s1_res.id OPERATOR(pg_catalog.=) t1.id) AND (t1.id OPERATOR(pg_catalog.=) 6)) WHEN MATCHED AND (s1_res.val OPERATOR(pg_catalog.=) 0) THEN DELETE WHEN MATCHED THEN UPDATE SET val = (t1.val OPERATOR(pg_catalog.+) 1) WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s1_res.id, s1_res.val) +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing PREPARE TRANSACTION 'citus_xx_xx_xx_xx' +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing PREPARE TRANSACTION 'citus_xx_xx_xx_xx' +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing COMMIT PREPARED 'citus_xx_xx_xx_xx' +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing COMMIT PREPARED 'citus_xx_xx_xx_xx' DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx SET citus.log_remote_commands to false; --- As the id 6 is NO match, VALUES(6, 1) should appear in target -SELECT * FROM t1 order by id; +-- Other than id 6 everything else is a NO match, and should appear in target +SELECT * FROM t1 order by 1, 2; id | val --------------------------------------------------------------------- + 1 | 0 1 | 0 2 | 0 + 2 | 1 + 3 | 1 + 4 | 1 5 | 0 6 | 1 -(4 rows) +(8 rows) -- -- Test with multiple join conditions @@ -634,8 +655,8 @@ ON t2.id = s2.id AND t2.src = s2.src AND t2.id = 4 WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN - INSERT (id, val, src) VALUES (s2.id, s2.val, s2.src); -NOTICE: issuing MERGE INTO merge_schema.t2_4000023 t2 USING merge_schema.s2_4000027 s2 ON ((t2.id OPERATOR(pg_catalog.=) s2.id) AND (t2.src OPERATOR(pg_catalog.=) s2.src) AND (t2.id OPERATOR(pg_catalog.=) 4)) WHEN MATCHED AND (t2.val OPERATOR(pg_catalog.=) 1) THEN UPDATE SET val = (s2.val OPERATOR(pg_catalog.+) 10) WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (id, val, src) VALUES (s2.id, s2.val, s2.src) + DO NOTHING; +NOTICE: issuing MERGE INTO merge_schema.t2_xxxxxxx t2 USING merge_schema.s2_xxxxxxx s2 ON ((t2.id OPERATOR(pg_catalog.=) s2.id) AND (t2.src OPERATOR(pg_catalog.=) s2.src) AND (t2.id OPERATOR(pg_catalog.=) 4)) WHEN MATCHED AND (t2.val OPERATOR(pg_catalog.=) 1) THEN UPDATE SET val = (s2.val OPERATOR(pg_catalog.+) 10) WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN DO NOTHING DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx SET citus.log_remote_commands to false; -- Row with id = 4 is a match for delete clause, row should be deleted @@ -646,8 +667,7 @@ SELECT * FROM t2 ORDER BY 1; 1 | 0 | target 2 | 0 | target 3 | 1 | match - 3 | 10 | match -(4 rows) +(3 rows) -- -- With sub-query as the MERGE source @@ -1029,7 +1049,7 @@ WHEN MATCHED THEN UPDATE SET value = vl_source.value, id = vl_target.id + 1 WHEN NOT MATCHED THEN INSERT VALUES(vl_source.ID, vl_source.value); -DEBUG: +DEBUG: RESET client_min_messages; SELECT * INTO vl_local FROM vl_target ORDER BY 1 ; -- Should be equal @@ -1082,7 +1102,7 @@ WHEN MATCHED THEN DO NOTHING WHEN NOT MATCHED THEN INSERT VALUES(rs_source.id); -DEBUG: +DEBUG: RESET client_min_messages; SELECT * INTO rs_local FROM rs_target ORDER BY 1 ; -- Should be equal @@ -1218,7 +1238,7 @@ DEBUG: function does not have co-located tables DEBUG: generating subplan XXX_1 for subquery SELECT id, source FROM merge_schema.f_dist() f(id integer, source character varying) DEBUG: DEBUG: Plan XXX query after replacing subqueries and CTEs: MERGE INTO merge_schema.fn_target USING (SELECT intermediate_result.id, intermediate_result.source FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(id integer, source character varying)) fn_source ON (fn_source.id OPERATOR(pg_catalog.=) fn_target.id) WHEN MATCHED THEN DO NOTHING WHEN NOT MATCHED THEN INSERT (id, data) VALUES (fn_source.id, fn_source.source) -DEBUG: +DEBUG: RESET client_min_messages; SELECT * INTO fn_local FROM fn_target ORDER BY 1 ; -- Should be equal @@ -1290,7 +1310,7 @@ MERGE INTO ft_target DELETE WHEN NOT MATCHED THEN INSERT (id, user_val) VALUES (foreign_table.id, foreign_table.user_val); -DEBUG: +DEBUG: RESET client_min_messages; SELECT * FROM ft_target; id | user_val @@ -1394,7 +1414,7 @@ WHEN NOT MATCHED THEN DO NOTHING; NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx'); DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing MERGE INTO merge_schema.target_cj_4000050 t USING (merge_schema.source_cj1_4000054 s1 JOIN merge_schema.source_cj2_4000058 s2 ON ((s1.sid1 OPERATOR(pg_catalog.=) s2.sid2))) ON ((t.tid OPERATOR(pg_catalog.=) s1.sid1) AND (t.tid OPERATOR(pg_catalog.=) 2)) WHEN MATCHED THEN UPDATE SET src = s2.src2 WHEN NOT MATCHED THEN DO NOTHING +NOTICE: issuing MERGE INTO merge_schema.target_cj_xxxxxxx t USING (merge_schema.source_cj1_xxxxxxx s1 JOIN merge_schema.source_cj2_xxxxxxx s2 ON ((s1.sid1 OPERATOR(pg_catalog.=) s2.sid2))) ON ((t.tid OPERATOR(pg_catalog.=) s1.sid1) AND (t.tid OPERATOR(pg_catalog.=) 2)) WHEN MATCHED THEN UPDATE SET src = s2.src2 WHEN NOT MATCHED THEN DO NOTHING DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx SET citus.log_remote_commands to false; SELECT * FROM target_cj ORDER BY 1; @@ -1465,7 +1485,7 @@ WHEN MATCHED THEN UPDATE SET src = sub.src, val = sub.val + 100 WHEN NOT MATCHED THEN DO NOTHING; -NOTICE: issuing MERGE INTO merge_schema.target_cj_4000048 t1 USING (SELECT target_cj.tid, target_cj.src, target_cj.val FROM merge_schema.target_cj_4000048 target_cj) sub ON ((t1.tid OPERATOR(pg_catalog.=) sub.tid) AND (t1.tid OPERATOR(pg_catalog.=) 3)) WHEN MATCHED THEN UPDATE SET src = sub.src, val = (sub.val OPERATOR(pg_catalog.+) 100) WHEN NOT MATCHED THEN DO NOTHING +NOTICE: issuing MERGE INTO merge_schema.target_cj_xxxxxxx t1 USING (SELECT target_cj.tid, target_cj.src, target_cj.val FROM merge_schema.target_cj_xxxxxxx target_cj) sub ON ((t1.tid OPERATOR(pg_catalog.=) sub.tid) AND (t1.tid OPERATOR(pg_catalog.=) 3)) WHEN MATCHED THEN UPDATE SET src = sub.src, val = (sub.val OPERATOR(pg_catalog.+) 100) WHEN NOT MATCHED THEN DO NOTHING DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx set citus.log_remote_commands to false; SELECT * FROM target_cj ORDER BY 1; @@ -1516,18 +1536,18 @@ BEGIN; SET citus.log_remote_commands to true; SET client_min_messages TO DEBUG1; EXECUTE foo(2); -DEBUG: -DEBUG: -DEBUG: -DEBUG: -DEBUG: +DEBUG: +DEBUG: +DEBUG: +DEBUG: +DEBUG: NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx'); DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing MERGE INTO merge_schema.target_cj_4000050 target USING (SELECT source_cj1.sid1, source_cj1.src1, source_cj1.val1 FROM merge_schema.source_cj1_4000054 source_cj1) sub ON ((target.tid OPERATOR(pg_catalog.=) sub.sid1) AND (target.tid OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = sub.val1 WHEN NOT MATCHED THEN DO NOTHING +NOTICE: issuing MERGE INTO merge_schema.target_cj_xxxxxxx target USING (SELECT source_cj1.sid1, source_cj1.src1, source_cj1.val1 FROM merge_schema.source_cj1_xxxxxxx source_cj1) sub ON ((target.tid OPERATOR(pg_catalog.=) sub.sid1) AND (target.tid OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = sub.val1 WHEN NOT MATCHED THEN DO NOTHING DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx RESET client_min_messages; EXECUTE foo(2); -NOTICE: issuing MERGE INTO merge_schema.target_cj_4000050 target USING (SELECT source_cj1.sid1, source_cj1.src1, source_cj1.val1 FROM merge_schema.source_cj1_4000054 source_cj1) sub ON ((target.tid OPERATOR(pg_catalog.=) sub.sid1) AND (target.tid OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = sub.val1 WHEN NOT MATCHED THEN DO NOTHING +NOTICE: issuing MERGE INTO merge_schema.target_cj_xxxxxxx target USING (SELECT source_cj1.sid1, source_cj1.src1, source_cj1.val1 FROM merge_schema.source_cj1_xxxxxxx source_cj1) sub ON ((target.tid OPERATOR(pg_catalog.=) sub.sid1) AND (target.tid OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = sub.val1 WHEN NOT MATCHED THEN DO NOTHING DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx SET citus.log_remote_commands to false; SELECT * FROM target_cj ORDER BY 1; @@ -1539,6 +1559,611 @@ SELECT * FROM target_cj ORDER BY 1; 3 | target | 0 (4 rows) +ROLLBACK; +-- Test distributed tables, must be co-located and joined on distribution column. +-- +-- We create two sets of source and target tables, one set is Postgres and the other +-- is Citus distributed. Run the _exact_ MERGE SQL on both the sets and compare the +-- final results of target tables of Postgres and Citus, the result should match. +-- This is repeated for various MERGE SQL combinations +-- +CREATE TABLE pg_target(id int, val varchar); +CREATE TABLE pg_source(id int, val varchar); +CREATE TABLE citus_target(id int, val varchar); +CREATE TABLE citus_source(id int, val varchar); +-- Half of the source rows do not match +INSERT INTO pg_target SELECT i, 'target' FROM generate_series(250, 500) i; +INSERT INTO pg_source SELECT i, 'source' FROM generate_series(1, 500) i; +INSERT INTO citus_target SELECT i, 'target' FROM generate_series(250, 500) i; +INSERT INTO citus_source SELECT i, 'source' FROM generate_series(1, 500) i; +SELECT create_distributed_table('citus_target', 'id'); +NOTICE: Copying data from local table... +NOTICE: copying the data has completed +DETAIL: The local data in the table is no longer visible, but is still on disk. +HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$merge_schema.citus_target$$) + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SELECT create_distributed_table('citus_source', 'id'); +NOTICE: Copying data from local table... +NOTICE: copying the data has completed +DETAIL: The local data in the table is no longer visible, but is still on disk. +HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$merge_schema.citus_source$$) + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +-- +-- This routine compares the target tables of Postgres and Citus and +-- returns true if they match, false if the results do not match. +-- +CREATE OR REPLACE FUNCTION compare_tables() RETURNS BOOLEAN AS $$ +DECLARE ret BOOL; +BEGIN +SELECT count(1) = 0 INTO ret + FROM pg_target + FULL OUTER JOIN citus_target + USING (id, val) + WHERE pg_target.id IS NULL + OR citus_target.id IS NULL; +RETURN ret; +END +$$ LANGUAGE PLPGSQL; +-- Make sure we start with exact data in Postgres and Citus +SELECT compare_tables(); + compare_tables +--------------------------------------------------------------------- + t +(1 row) + +-- Run the MERGE on both Postgres and Citus, and compare the final target tables +BEGIN; +SET citus.log_remote_commands to true; +MERGE INTO pg_target t +USING pg_source s +ON t.id = s.id +WHEN MATCHED AND t.id > 400 THEN + UPDATE SET val = t.val || 'Updated by Merge' +WHEN MATCHED THEN + DELETE +WHEN NOT MATCHED THEN + INSERT VALUES(s.id, s.val); +MERGE INTO citus_target t +USING citus_source s +ON t.id = s.id +WHEN MATCHED AND t.id > 400 THEN + UPDATE SET val = t.val || 'Updated by Merge' +WHEN MATCHED THEN + DELETE +WHEN NOT MATCHED THEN + INSERT VALUES(s.id, s.val); +NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx'); +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx'); +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON (t.id OPERATOR(pg_catalog.=) s.id) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 400) THEN UPDATE SET val = (((t.val COLLATE "default") OPERATOR(pg_catalog.||) 'Updated by Merge'::text) COLLATE "default") WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s.id, s.val) +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON (t.id OPERATOR(pg_catalog.=) s.id) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 400) THEN UPDATE SET val = (((t.val COLLATE "default") OPERATOR(pg_catalog.||) 'Updated by Merge'::text) COLLATE "default") WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s.id, s.val) +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON (t.id OPERATOR(pg_catalog.=) s.id) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 400) THEN UPDATE SET val = (((t.val COLLATE "default") OPERATOR(pg_catalog.||) 'Updated by Merge'::text) COLLATE "default") WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s.id, s.val) +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON (t.id OPERATOR(pg_catalog.=) s.id) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 400) THEN UPDATE SET val = (((t.val COLLATE "default") OPERATOR(pg_catalog.||) 'Updated by Merge'::text) COLLATE "default") WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s.id, s.val) +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +SET citus.log_remote_commands to false; +SELECT compare_tables(); + compare_tables +--------------------------------------------------------------------- + t +(1 row) + +ROLLBACK; +-- +-- ON clause filter on source +-- +BEGIN; +SET citus.log_remote_commands to true; +MERGE INTO pg_target t +USING pg_source s +ON t.id = s.id AND s.id < 100 +WHEN MATCHED AND t.id > 400 THEN + UPDATE SET val = t.val || 'Updated by Merge' +WHEN MATCHED THEN + DELETE +WHEN NOT MATCHED THEN + INSERT VALUES(s.id, s.val); +MERGE INTO citus_target t +USING citus_source s +ON t.id = s.id AND s.id < 100 +WHEN MATCHED AND t.id > 400 THEN + UPDATE SET val = t.val || 'Updated by Merge' +WHEN MATCHED THEN + DELETE +WHEN NOT MATCHED THEN + INSERT VALUES(s.id, s.val); +NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx'); +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx'); +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON ((t.id OPERATOR(pg_catalog.=) s.id) AND (s.id OPERATOR(pg_catalog.<) 100)) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 400) THEN UPDATE SET val = (((t.val COLLATE "default") OPERATOR(pg_catalog.||) 'Updated by Merge'::text) COLLATE "default") WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s.id, s.val) +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON ((t.id OPERATOR(pg_catalog.=) s.id) AND (s.id OPERATOR(pg_catalog.<) 100)) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 400) THEN UPDATE SET val = (((t.val COLLATE "default") OPERATOR(pg_catalog.||) 'Updated by Merge'::text) COLLATE "default") WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s.id, s.val) +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON ((t.id OPERATOR(pg_catalog.=) s.id) AND (s.id OPERATOR(pg_catalog.<) 100)) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 400) THEN UPDATE SET val = (((t.val COLLATE "default") OPERATOR(pg_catalog.||) 'Updated by Merge'::text) COLLATE "default") WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s.id, s.val) +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON ((t.id OPERATOR(pg_catalog.=) s.id) AND (s.id OPERATOR(pg_catalog.<) 100)) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 400) THEN UPDATE SET val = (((t.val COLLATE "default") OPERATOR(pg_catalog.||) 'Updated by Merge'::text) COLLATE "default") WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s.id, s.val) +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +SET citus.log_remote_commands to false; +SELECT compare_tables(); + compare_tables +--------------------------------------------------------------------- + t +(1 row) + +ROLLBACK; +-- +-- ON clause filter on target +-- +BEGIN; +SET citus.log_remote_commands to true; +MERGE INTO pg_target t +USING pg_source s +ON t.id = s.id AND t.id < 100 +WHEN MATCHED AND t.id > 400 THEN + UPDATE SET val = t.val || 'Updated by Merge' +WHEN MATCHED THEN + DELETE +WHEN NOT MATCHED THEN + INSERT VALUES(s.id, s.val); +MERGE INTO citus_target t +USING citus_source s +ON t.id = s.id AND t.id < 100 +WHEN MATCHED AND t.id > 400 THEN + UPDATE SET val = t.val || 'Updated by Merge' +WHEN MATCHED THEN + DELETE +WHEN NOT MATCHED THEN + INSERT VALUES(s.id, s.val); +NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx'); +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx'); +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON ((t.id OPERATOR(pg_catalog.=) s.id) AND (t.id OPERATOR(pg_catalog.<) 100)) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 400) THEN UPDATE SET val = (((t.val COLLATE "default") OPERATOR(pg_catalog.||) 'Updated by Merge'::text) COLLATE "default") WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s.id, s.val) +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON ((t.id OPERATOR(pg_catalog.=) s.id) AND (t.id OPERATOR(pg_catalog.<) 100)) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 400) THEN UPDATE SET val = (((t.val COLLATE "default") OPERATOR(pg_catalog.||) 'Updated by Merge'::text) COLLATE "default") WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s.id, s.val) +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON ((t.id OPERATOR(pg_catalog.=) s.id) AND (t.id OPERATOR(pg_catalog.<) 100)) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 400) THEN UPDATE SET val = (((t.val COLLATE "default") OPERATOR(pg_catalog.||) 'Updated by Merge'::text) COLLATE "default") WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s.id, s.val) +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON ((t.id OPERATOR(pg_catalog.=) s.id) AND (t.id OPERATOR(pg_catalog.<) 100)) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 400) THEN UPDATE SET val = (((t.val COLLATE "default") OPERATOR(pg_catalog.||) 'Updated by Merge'::text) COLLATE "default") WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s.id, s.val) +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +SET citus.log_remote_commands to false; +SELECT compare_tables(); + compare_tables +--------------------------------------------------------------------- + t +(1 row) + +ROLLBACK; +-- +-- NOT MATCHED clause filter on source +-- +BEGIN; +SET citus.log_remote_commands to true; +MERGE INTO pg_target t +USING pg_source s +ON t.id = s.id +WHEN MATCHED THEN + DO NOTHING +WHEN NOT MATCHED AND s.id < 100 THEN + INSERT VALUES(s.id, s.val); +MERGE INTO citus_target t +USING citus_source s +ON t.id = s.id +WHEN MATCHED THEN + DO NOTHING +WHEN NOT MATCHED AND s.id < 100 THEN + INSERT VALUES(s.id, s.val); +NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx'); +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx'); +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON (t.id OPERATOR(pg_catalog.=) s.id) WHEN MATCHED THEN DO NOTHING WHEN NOT MATCHED AND (s.id OPERATOR(pg_catalog.<) 100) THEN INSERT (id, val) VALUES (s.id, s.val) +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON (t.id OPERATOR(pg_catalog.=) s.id) WHEN MATCHED THEN DO NOTHING WHEN NOT MATCHED AND (s.id OPERATOR(pg_catalog.<) 100) THEN INSERT (id, val) VALUES (s.id, s.val) +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON (t.id OPERATOR(pg_catalog.=) s.id) WHEN MATCHED THEN DO NOTHING WHEN NOT MATCHED AND (s.id OPERATOR(pg_catalog.<) 100) THEN INSERT (id, val) VALUES (s.id, s.val) +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON (t.id OPERATOR(pg_catalog.=) s.id) WHEN MATCHED THEN DO NOTHING WHEN NOT MATCHED AND (s.id OPERATOR(pg_catalog.<) 100) THEN INSERT (id, val) VALUES (s.id, s.val) +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +SET citus.log_remote_commands to false; +SELECT compare_tables(); + compare_tables +--------------------------------------------------------------------- + t +(1 row) + +ROLLBACK; +-- +-- Test constant filter in ON clause to check if shards are pruned +-- with restriction information +-- +-- +-- Though constant filter is present, this won't prune shards as +-- NOT MATCHED clause is present +-- +BEGIN; +SET citus.log_remote_commands to true; +MERGE INTO pg_target t +USING pg_source s +ON t.id = s.id AND s.id = 250 +WHEN MATCHED THEN + UPDATE SET val = t.val || 'Updated by Merge' +WHEN NOT MATCHED THEN + INSERT VALUES(s.id, s.val); +MERGE INTO citus_target t +USING citus_source s +ON t.id = s.id AND s.id = 250 +WHEN MATCHED THEN + UPDATE SET val = t.val || 'Updated by Merge' +WHEN NOT MATCHED THEN + INSERT VALUES(s.id, s.val); +NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx'); +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx'); +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON ((t.id OPERATOR(pg_catalog.=) s.id) AND (s.id OPERATOR(pg_catalog.=) 250)) WHEN MATCHED THEN UPDATE SET val = (((t.val COLLATE "default") OPERATOR(pg_catalog.||) 'Updated by Merge'::text) COLLATE "default") WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s.id, s.val) +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON ((t.id OPERATOR(pg_catalog.=) s.id) AND (s.id OPERATOR(pg_catalog.=) 250)) WHEN MATCHED THEN UPDATE SET val = (((t.val COLLATE "default") OPERATOR(pg_catalog.||) 'Updated by Merge'::text) COLLATE "default") WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s.id, s.val) +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON ((t.id OPERATOR(pg_catalog.=) s.id) AND (s.id OPERATOR(pg_catalog.=) 250)) WHEN MATCHED THEN UPDATE SET val = (((t.val COLLATE "default") OPERATOR(pg_catalog.||) 'Updated by Merge'::text) COLLATE "default") WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s.id, s.val) +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON ((t.id OPERATOR(pg_catalog.=) s.id) AND (s.id OPERATOR(pg_catalog.=) 250)) WHEN MATCHED THEN UPDATE SET val = (((t.val COLLATE "default") OPERATOR(pg_catalog.||) 'Updated by Merge'::text) COLLATE "default") WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s.id, s.val) +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +SET citus.log_remote_commands to false; +SELECT compare_tables(); + compare_tables +--------------------------------------------------------------------- + t +(1 row) + +ROLLBACK; +-- This will prune shards with restriction information as NOT MATCHED is void +BEGIN; +SET citus.log_remote_commands to true; +MERGE INTO pg_target t +USING pg_source s +ON t.id = s.id AND s.id = 250 +WHEN MATCHED THEN + UPDATE SET val = t.val || 'Updated by Merge' +WHEN NOT MATCHED THEN + DO NOTHING; +MERGE INTO citus_target t +USING citus_source s +ON t.id = s.id AND s.id = 250 +WHEN MATCHED THEN + UPDATE SET val = t.val || 'Updated by Merge' +WHEN NOT MATCHED THEN + DO NOTHING; +NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx'); +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON ((t.id OPERATOR(pg_catalog.=) s.id) AND (s.id OPERATOR(pg_catalog.=) 250)) WHEN MATCHED THEN UPDATE SET val = (((t.val COLLATE "default") OPERATOR(pg_catalog.||) 'Updated by Merge'::text) COLLATE "default") WHEN NOT MATCHED THEN DO NOTHING +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +SET citus.log_remote_commands to false; +SELECT compare_tables(); + compare_tables +--------------------------------------------------------------------- + t +(1 row) + +ROLLBACK; +-- Test CTE with distributed tables +CREATE VIEW pg_source_view AS SELECT * FROM pg_source WHERE id < 400; +WARNING: "view pg_source_view" has dependency to "table pg_source" that is not in Citus' metadata +DETAIL: "view pg_source_view" will be created only locally +HINT: Distribute "table pg_source" first to distribute "view pg_source_view" +CREATE VIEW citus_source_view AS SELECT * FROM citus_source WHERE id < 400; +BEGIN; +SEt citus.log_remote_commands to true; +WITH cte AS ( + SELECT * FROM pg_source_view +) +MERGE INTO pg_target t +USING cte +ON cte.id = t.id +WHEN MATCHED AND t.id > 350 THEN + UPDATE SET val = t.val || 'Updated by CTE' +WHEN NOT MATCHED THEN + INSERT VALUES (cte.id, cte.val) +WHEN MATCHED AND t.id < 350 THEN + DELETE; +WITH cte AS ( + SELECT * FROM citus_source_view +) +MERGE INTO citus_target t +USING cte +ON cte.id = t.id +WHEN MATCHED AND t.id > 350 THEN + UPDATE SET val = t.val || 'Updated by CTE' +WHEN NOT MATCHED THEN + INSERT VALUES (cte.id, cte.val) +WHEN MATCHED AND t.id < 350 THEN + DELETE; +NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx'); +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx'); +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing WITH cte AS (SELECT citus_source_view.id, citus_source_view.val FROM (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source WHERE (citus_source.id OPERATOR(pg_catalog.<) 400)) citus_source_view) MERGE INTO merge_schema.citus_target_xxxxxxx t USING cte ON (cte.id OPERATOR(pg_catalog.=) t.id) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 350) THEN UPDATE SET val = (((t.val COLLATE "default") OPERATOR(pg_catalog.||) 'Updated by CTE'::text) COLLATE "default") WHEN NOT MATCHED THEN INSERT (id, val) VALUES (cte.id, cte.val) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.<) 350) THEN DELETE +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing WITH cte AS (SELECT citus_source_view.id, citus_source_view.val FROM (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source WHERE (citus_source.id OPERATOR(pg_catalog.<) 400)) citus_source_view) MERGE INTO merge_schema.citus_target_xxxxxxx t USING cte ON (cte.id OPERATOR(pg_catalog.=) t.id) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 350) THEN UPDATE SET val = (((t.val COLLATE "default") OPERATOR(pg_catalog.||) 'Updated by CTE'::text) COLLATE "default") WHEN NOT MATCHED THEN INSERT (id, val) VALUES (cte.id, cte.val) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.<) 350) THEN DELETE +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing WITH cte AS (SELECT citus_source_view.id, citus_source_view.val FROM (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source WHERE (citus_source.id OPERATOR(pg_catalog.<) 400)) citus_source_view) MERGE INTO merge_schema.citus_target_xxxxxxx t USING cte ON (cte.id OPERATOR(pg_catalog.=) t.id) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 350) THEN UPDATE SET val = (((t.val COLLATE "default") OPERATOR(pg_catalog.||) 'Updated by CTE'::text) COLLATE "default") WHEN NOT MATCHED THEN INSERT (id, val) VALUES (cte.id, cte.val) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.<) 350) THEN DELETE +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing WITH cte AS (SELECT citus_source_view.id, citus_source_view.val FROM (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source WHERE (citus_source.id OPERATOR(pg_catalog.<) 400)) citus_source_view) MERGE INTO merge_schema.citus_target_xxxxxxx t USING cte ON (cte.id OPERATOR(pg_catalog.=) t.id) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 350) THEN UPDATE SET val = (((t.val COLLATE "default") OPERATOR(pg_catalog.||) 'Updated by CTE'::text) COLLATE "default") WHEN NOT MATCHED THEN INSERT (id, val) VALUES (cte.id, cte.val) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.<) 350) THEN DELETE +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +SET citus.log_remote_commands to false; +SELECT compare_tables(); + compare_tables +--------------------------------------------------------------------- + t +(1 row) + +ROLLBACK; +-- Test sub-query with distributed tables +BEGIN; +SEt citus.log_remote_commands to true; +MERGE INTO pg_target t +USING (SELECT * FROM pg_source) subq +ON subq.id = t.id +WHEN MATCHED AND t.id > 350 THEN + UPDATE SET val = t.val || 'Updated by subquery' +WHEN NOT MATCHED THEN + INSERT VALUES (subq.id, subq.val) +WHEN MATCHED AND t.id < 350 THEN + DELETE; +MERGE INTO citus_target t +USING (SELECT * FROM citus_source) subq +ON subq.id = t.id +WHEN MATCHED AND t.id > 350 THEN + UPDATE SET val = t.val || 'Updated by subquery' +WHEN NOT MATCHED THEN + INSERT VALUES (subq.id, subq.val) +WHEN MATCHED AND t.id < 350 THEN + DELETE; +NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx'); +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx'); +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) subq ON (subq.id OPERATOR(pg_catalog.=) t.id) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 350) THEN UPDATE SET val = (((t.val COLLATE "default") OPERATOR(pg_catalog.||) 'Updated by subquery'::text) COLLATE "default") WHEN NOT MATCHED THEN INSERT (id, val) VALUES (subq.id, subq.val) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.<) 350) THEN DELETE +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) subq ON (subq.id OPERATOR(pg_catalog.=) t.id) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 350) THEN UPDATE SET val = (((t.val COLLATE "default") OPERATOR(pg_catalog.||) 'Updated by subquery'::text) COLLATE "default") WHEN NOT MATCHED THEN INSERT (id, val) VALUES (subq.id, subq.val) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.<) 350) THEN DELETE +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) subq ON (subq.id OPERATOR(pg_catalog.=) t.id) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 350) THEN UPDATE SET val = (((t.val COLLATE "default") OPERATOR(pg_catalog.||) 'Updated by subquery'::text) COLLATE "default") WHEN NOT MATCHED THEN INSERT (id, val) VALUES (subq.id, subq.val) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.<) 350) THEN DELETE +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) subq ON (subq.id OPERATOR(pg_catalog.=) t.id) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 350) THEN UPDATE SET val = (((t.val COLLATE "default") OPERATOR(pg_catalog.||) 'Updated by subquery'::text) COLLATE "default") WHEN NOT MATCHED THEN INSERT (id, val) VALUES (subq.id, subq.val) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.<) 350) THEN DELETE +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +SET citus.log_remote_commands to false; +SELECT compare_tables(); + compare_tables +--------------------------------------------------------------------- + t +(1 row) + +ROLLBACK; +-- Test PREPARE +PREPARE pg_prep(int) AS +MERGE INTO pg_target +USING (SELECT * FROM pg_source) sub +ON pg_target.id = sub.id AND pg_target.id = $1 +WHEN MATCHED THEN + UPDATE SET val = 'Updated by prepare using ' || sub.val +WHEN NOT MATCHED THEN + DO NOTHING; +PREPARE citus_prep(int) AS +MERGE INTO citus_target +USING (SELECT * FROM citus_source) sub +ON citus_target.id = sub.id AND citus_target.id = $1 +WHEN MATCHED THEN + UPDATE SET val = 'Updated by prepare using ' || sub.val +WHEN NOT MATCHED THEN + DO NOTHING; +BEGIN; +SET citus.log_remote_commands to true; +SELECT * FROM pg_target WHERE id = 500; -- before merge + id | val +--------------------------------------------------------------------- + 500 | target +(1 row) + +EXECUTE pg_prep(500); +SELECT * FROM pg_target WHERE id = 500; -- non-cached + id | val +--------------------------------------------------------------------- + 500 | Updated by prepare using source +(1 row) + +EXECUTE pg_prep(500); +EXECUTE pg_prep(500); +EXECUTE pg_prep(500); +EXECUTE pg_prep(500); +EXECUTE pg_prep(500); +SELECT * FROM pg_target WHERE id = 500; -- cached + id | val +--------------------------------------------------------------------- + 500 | Updated by prepare using source +(1 row) + +SELECT * FROM citus_target WHERE id = 500; -- before merge +NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx'); +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing SELECT id, val FROM merge_schema.citus_target_xxxxxxx citus_target WHERE (id OPERATOR(pg_catalog.=) 500) +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx + id | val +--------------------------------------------------------------------- + 500 | target +(1 row) + +EXECUTE citus_prep(500); +NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = (('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val COLLATE "default")) COLLATE "default") WHEN NOT MATCHED THEN DO NOTHING +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +SELECT * FROM citus_target WHERE id = 500; -- non-cached +NOTICE: issuing SELECT id, val FROM merge_schema.citus_target_xxxxxxx citus_target WHERE (id OPERATOR(pg_catalog.=) 500) +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx + id | val +--------------------------------------------------------------------- + 500 | Updated by prepare using source +(1 row) + +EXECUTE citus_prep(500); +NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = (('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val COLLATE "default")) COLLATE "default") WHEN NOT MATCHED THEN DO NOTHING +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +EXECUTE citus_prep(500); +NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = (('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val COLLATE "default")) COLLATE "default") WHEN NOT MATCHED THEN DO NOTHING +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +EXECUTE citus_prep(500); +NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = (('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val COLLATE "default")) COLLATE "default") WHEN NOT MATCHED THEN DO NOTHING +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +EXECUTE citus_prep(500); +NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = (('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val COLLATE "default")) COLLATE "default") WHEN NOT MATCHED THEN DO NOTHING +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +EXECUTE citus_prep(500); +NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = (('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val COLLATE "default")) COLLATE "default") WHEN NOT MATCHED THEN DO NOTHING +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +SELECT * FROM citus_target WHERE id = 500; -- cached +NOTICE: issuing SELECT id, val FROM merge_schema.citus_target_xxxxxxx citus_target WHERE (id OPERATOR(pg_catalog.=) 500) +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx + id | val +--------------------------------------------------------------------- + 500 | Updated by prepare using source +(1 row) + +SET citus.log_remote_commands to false; +SELECT compare_tables(); + compare_tables +--------------------------------------------------------------------- + t +(1 row) + +ROLLBACK; +-- Test partitions + distributed tables +CREATE TABLE pg_pa_target (tid integer, balance float, val text) + PARTITION BY LIST (tid); +CREATE TABLE citus_pa_target (tid integer, balance float, val text) + PARTITION BY LIST (tid); +CREATE TABLE part1 PARTITION OF pg_pa_target FOR VALUES IN (1,4) + WITH (autovacuum_enabled=off); +CREATE TABLE part2 PARTITION OF pg_pa_target FOR VALUES IN (2,5,6) + WITH (autovacuum_enabled=off); +CREATE TABLE part3 PARTITION OF pg_pa_target FOR VALUES IN (3,8,9) + WITH (autovacuum_enabled=off); +CREATE TABLE part4 PARTITION OF pg_pa_target DEFAULT + WITH (autovacuum_enabled=off); +CREATE TABLE part5 PARTITION OF citus_pa_target FOR VALUES IN (1,4) + WITH (autovacuum_enabled=off); +CREATE TABLE part6 PARTITION OF citus_pa_target FOR VALUES IN (2,5,6) + WITH (autovacuum_enabled=off); +CREATE TABLE part7 PARTITION OF citus_pa_target FOR VALUES IN (3,8,9) + WITH (autovacuum_enabled=off); +CREATE TABLE part8 PARTITION OF citus_pa_target DEFAULT + WITH (autovacuum_enabled=off); +CREATE TABLE pg_pa_source (sid integer, delta float); +CREATE TABLE citus_pa_source (sid integer, delta float); +-- insert many rows to the source table +INSERT INTO pg_pa_source SELECT id, id * 10 FROM generate_series(1,14) AS id; +INSERT INTO citus_pa_source SELECT id, id * 10 FROM generate_series(1,14) AS id; +-- insert a few rows in the target table (odd numbered tid) +INSERT INTO pg_pa_target SELECT id, id * 100, 'initial' FROM generate_series(1,14,2) AS id; +INSERT INTO citus_pa_target SELECT id, id * 100, 'initial' FROM generate_series(1,14,2) AS id; +SELECT create_distributed_table('citus_pa_target', 'tid'); +NOTICE: Copying data from local table... +NOTICE: copying the data has completed +DETAIL: The local data in the table is no longer visible, but is still on disk. +HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$merge_schema.part5$$) +NOTICE: Copying data from local table... +NOTICE: copying the data has completed +DETAIL: The local data in the table is no longer visible, but is still on disk. +HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$merge_schema.part6$$) +NOTICE: Copying data from local table... +NOTICE: copying the data has completed +DETAIL: The local data in the table is no longer visible, but is still on disk. +HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$merge_schema.part7$$) +NOTICE: Copying data from local table... +NOTICE: copying the data has completed +DETAIL: The local data in the table is no longer visible, but is still on disk. +HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$merge_schema.part8$$) + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SELECT create_distributed_table('citus_pa_source', 'sid'); +NOTICE: Copying data from local table... +NOTICE: copying the data has completed +DETAIL: The local data in the table is no longer visible, but is still on disk. +HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$merge_schema.citus_pa_source$$) + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +CREATE OR REPLACE FUNCTION pa_compare_tables() RETURNS BOOLEAN AS $$ +DECLARE ret BOOL; +BEGIN +SELECT count(1) = 0 INTO ret + FROM pg_pa_target + FULL OUTER JOIN citus_pa_target + USING (tid, balance, val) + WHERE pg_pa_target.tid IS NULL + OR citus_pa_target.tid IS NULL; +RETURN ret; +END +$$ LANGUAGE PLPGSQL; +-- try simple MERGE +BEGIN; +MERGE INTO pg_pa_target t + USING pg_pa_source s + ON t.tid = s.sid + WHEN MATCHED THEN + UPDATE SET balance = balance + delta, val = val || ' updated by merge' + WHEN NOT MATCHED THEN + INSERT VALUES (sid, delta, 'inserted by merge'); +MERGE INTO citus_pa_target t + USING citus_pa_source s + ON t.tid = s.sid + WHEN MATCHED THEN + UPDATE SET balance = balance + delta, val = val || ' updated by merge' + WHEN NOT MATCHED THEN + INSERT VALUES (sid, delta, 'inserted by merge'); +SELECT pa_compare_tables(); + pa_compare_tables +--------------------------------------------------------------------- + t +(1 row) + +ROLLBACK; +-- same with a constant qual +BEGIN; +MERGE INTO pg_pa_target t + USING pg_pa_source s + ON t.tid = s.sid AND tid = 1 + WHEN MATCHED THEN + UPDATE SET balance = balance + delta, val = val || ' updated by merge' + WHEN NOT MATCHED THEN + INSERT VALUES (sid, delta, 'inserted by merge'); +MERGE INTO citus_pa_target t + USING citus_pa_source s + ON t.tid = s.sid AND tid = 1 + WHEN MATCHED THEN + UPDATE SET balance = balance + delta, val = val || ' updated by merge' + WHEN NOT MATCHED THEN + INSERT VALUES (sid, delta, 'inserted by merge'); +SELECT pa_compare_tables(); + pa_compare_tables +--------------------------------------------------------------------- + t +(1 row) + ROLLBACK; -- -- Error and Unsupported scenarios @@ -1935,37 +2560,6 @@ UPDATE SET val = dist_colocated.val WHEN NOT MATCHED THEN INSERT VALUES(dist_colocated.id, dist_colocated.val); ERROR: MERGE command is only supported when distributed tables are joined on their distribution column --- MERGE command must be joined with with a constant qual on target relation --- AND clause is missing -MERGE INTO dist_target -USING dist_colocated -ON dist_target.id = dist_colocated.id -WHEN MATCHED THEN -UPDATE SET val = dist_colocated.val -WHEN NOT MATCHED THEN -INSERT VALUES(dist_colocated.id, dist_colocated.val); -ERROR: MERGE on a distributed table requires a constant filter on the distribution column of the target table -HINT: Consider adding AND target.dist_key = <> to the ON clause --- AND clause incorrect table (must be target) -MERGE INTO dist_target -USING dist_colocated -ON dist_target.id = dist_colocated.id AND dist_colocated.id = 1 -WHEN MATCHED THEN -UPDATE SET val = dist_colocated.val -WHEN NOT MATCHED THEN -INSERT VALUES(dist_colocated.id, dist_colocated.val); -ERROR: MERGE on a distributed table requires a constant filter on the distribution column of the target table -HINT: Consider adding AND target.dist_key = <> to the ON clause --- AND clause incorrect column (must be distribution column) -MERGE INTO dist_target -USING dist_colocated -ON dist_target.id = dist_colocated.id AND dist_target.val = 'const' -WHEN MATCHED THEN -UPDATE SET val = dist_colocated.val -WHEN NOT MATCHED THEN -INSERT VALUES(dist_colocated.id, dist_colocated.val); -ERROR: MERGE on a distributed table requires a constant filter on the distribution column of the target table -HINT: Consider adding AND target.dist_key = <> to the ON clause -- Both the source and target must be distributed MERGE INTO dist_target USING (SELECT 100 id) AS source @@ -2165,7 +2759,7 @@ CONTEXT: SQL statement "SELECT citus_drop_all_shards(v_obj.objid, v_obj.schema_ PL/pgSQL function citus_drop_trigger() line XX at PERFORM DROP FUNCTION merge_when_and_write(); DROP SCHEMA merge_schema CASCADE; -NOTICE: drop cascades to 63 other objects +NOTICE: drop cascades to 75 other objects DETAIL: drop cascades to function insert_data() drop cascades to table pg_result drop cascades to table local_local @@ -2220,9 +2814,21 @@ drop cascades to extension postgres_fdw drop cascades to table target_cj drop cascades to table source_cj1 drop cascades to table source_cj2 +drop cascades to table pg_target +drop cascades to table pg_source +drop cascades to table citus_target +drop cascades to table citus_source +drop cascades to function compare_tables() +drop cascades to view pg_source_view +drop cascades to view citus_source_view +drop cascades to table pg_pa_target +drop cascades to table citus_pa_target +drop cascades to table pg_pa_source +drop cascades to table citus_pa_source +drop cascades to function pa_compare_tables() drop cascades to table pg -drop cascades to table t1_4000078 -drop cascades to table s1_4000079 +drop cascades to table t1_4000110 +drop cascades to table s1_4000111 drop cascades to table t1 drop cascades to table s1 drop cascades to table dist_colocated diff --git a/src/test/regress/expected/pgmerge.out b/src/test/regress/expected/pgmerge.out index 89c3f85ca..0bedf356f 100644 --- a/src/test/regress/expected/pgmerge.out +++ b/src/test/regress/expected/pgmerge.out @@ -1899,7 +1899,7 @@ MERGE INTO pa_target t UPDATE SET balance = balance + delta, val = val || ' updated by merge' WHEN NOT MATCHED THEN INSERT VALUES (slogts::timestamp, sid, delta, 'inserted by merge'); -DEBUG: +DEBUG: SELECT * FROM pa_target ORDER BY tid; logts | tid | balance | val --------------------------------------------------------------------- @@ -2091,7 +2091,7 @@ WHEN MATCHED THEN UPDATE WHEN NOT MATCHED THEN INSERT (city_id, logdate, peaktemp, unitsales) VALUES (city_id, logdate, peaktemp, unitsales); -DEBUG: +DEBUG: RESET client_min_messages; SELECT tableoid::regclass, * FROM measurement ORDER BY city_id, logdate; tableoid | city_id | logdate | peaktemp | unitsales diff --git a/src/test/regress/sql/merge.sql b/src/test/regress/sql/merge.sql index 539e9814d..12294b2c9 100644 --- a/src/test/regress/sql/merge.sql +++ b/src/test/regress/sql/merge.sql @@ -146,7 +146,7 @@ SELECT create_distributed_table('source', 'customer_id'); -- Updates one of the row with customer_id = 30002 SELECT * from target t WHERE t.customer_id = 30002; --- Turn on notice to print tasks sent to nodes (it should be a single task) +-- Turn on notice to print tasks sent to nodes SET citus.log_remote_commands to true; MERGE INTO target t USING source s @@ -160,9 +160,9 @@ MERGE INTO target t order_count = t.order_count + 1, last_order_id = s.order_id - WHEN NOT MATCHED THEN -- New entry, record it. - INSERT (customer_id, last_order_id, order_center, order_count, last_order) - VALUES (customer_id, s.order_id, s.order_center, 123, s.order_time); + WHEN NOT MATCHED THEN + DO NOTHING; + SET citus.log_remote_commands to false; SELECT * from target t WHERE t.customer_id = 30002; @@ -284,8 +284,8 @@ MERGE INTO t1 WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s1_res.id, s1_res.val); SET citus.log_remote_commands to false; --- As the id 6 is NO match, VALUES(6, 1) should appear in target -SELECT * FROM t1 order by id; +-- Other than id 6 everything else is a NO match, and should appear in target +SELECT * FROM t1 order by 1, 2; -- -- Test with multiple join conditions @@ -366,7 +366,7 @@ ON t2.id = s2.id AND t2.src = s2.src AND t2.id = 4 WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN - INSERT (id, val, src) VALUES (s2.id, s2.val, s2.src); + DO NOTHING; SET citus.log_remote_commands to false; -- Row with id = 4 is a match for delete clause, row should be deleted -- Row with id = 3 is a NO match, row from source will be inserted @@ -999,6 +999,424 @@ SET citus.log_remote_commands to false; SELECT * FROM target_cj ORDER BY 1; ROLLBACK; +-- Test distributed tables, must be co-located and joined on distribution column. + +-- +-- We create two sets of source and target tables, one set is Postgres and the other +-- is Citus distributed. Run the _exact_ MERGE SQL on both the sets and compare the +-- final results of target tables of Postgres and Citus, the result should match. +-- This is repeated for various MERGE SQL combinations +-- +CREATE TABLE pg_target(id int, val varchar); +CREATE TABLE pg_source(id int, val varchar); +CREATE TABLE citus_target(id int, val varchar); +CREATE TABLE citus_source(id int, val varchar); + +-- Half of the source rows do not match +INSERT INTO pg_target SELECT i, 'target' FROM generate_series(250, 500) i; +INSERT INTO pg_source SELECT i, 'source' FROM generate_series(1, 500) i; + +INSERT INTO citus_target SELECT i, 'target' FROM generate_series(250, 500) i; +INSERT INTO citus_source SELECT i, 'source' FROM generate_series(1, 500) i; + +SELECT create_distributed_table('citus_target', 'id'); +SELECT create_distributed_table('citus_source', 'id'); + +-- +-- This routine compares the target tables of Postgres and Citus and +-- returns true if they match, false if the results do not match. +-- +CREATE OR REPLACE FUNCTION compare_tables() RETURNS BOOLEAN AS $$ +DECLARE ret BOOL; +BEGIN +SELECT count(1) = 0 INTO ret + FROM pg_target + FULL OUTER JOIN citus_target + USING (id, val) + WHERE pg_target.id IS NULL + OR citus_target.id IS NULL; +RETURN ret; +END +$$ LANGUAGE PLPGSQL; + +-- Make sure we start with exact data in Postgres and Citus +SELECT compare_tables(); + +-- Run the MERGE on both Postgres and Citus, and compare the final target tables + +BEGIN; +SET citus.log_remote_commands to true; + +MERGE INTO pg_target t +USING pg_source s +ON t.id = s.id +WHEN MATCHED AND t.id > 400 THEN + UPDATE SET val = t.val || 'Updated by Merge' +WHEN MATCHED THEN + DELETE +WHEN NOT MATCHED THEN + INSERT VALUES(s.id, s.val); + +MERGE INTO citus_target t +USING citus_source s +ON t.id = s.id +WHEN MATCHED AND t.id > 400 THEN + UPDATE SET val = t.val || 'Updated by Merge' +WHEN MATCHED THEN + DELETE +WHEN NOT MATCHED THEN + INSERT VALUES(s.id, s.val); + +SET citus.log_remote_commands to false; +SELECT compare_tables(); +ROLLBACK; + +-- +-- ON clause filter on source +-- +BEGIN; +SET citus.log_remote_commands to true; + +MERGE INTO pg_target t +USING pg_source s +ON t.id = s.id AND s.id < 100 +WHEN MATCHED AND t.id > 400 THEN + UPDATE SET val = t.val || 'Updated by Merge' +WHEN MATCHED THEN + DELETE +WHEN NOT MATCHED THEN + INSERT VALUES(s.id, s.val); + +MERGE INTO citus_target t +USING citus_source s +ON t.id = s.id AND s.id < 100 +WHEN MATCHED AND t.id > 400 THEN + UPDATE SET val = t.val || 'Updated by Merge' +WHEN MATCHED THEN + DELETE +WHEN NOT MATCHED THEN + INSERT VALUES(s.id, s.val); + +SET citus.log_remote_commands to false; +SELECT compare_tables(); +ROLLBACK; + +-- +-- ON clause filter on target +-- +BEGIN; +SET citus.log_remote_commands to true; + +MERGE INTO pg_target t +USING pg_source s +ON t.id = s.id AND t.id < 100 +WHEN MATCHED AND t.id > 400 THEN + UPDATE SET val = t.val || 'Updated by Merge' +WHEN MATCHED THEN + DELETE +WHEN NOT MATCHED THEN + INSERT VALUES(s.id, s.val); + +MERGE INTO citus_target t +USING citus_source s +ON t.id = s.id AND t.id < 100 +WHEN MATCHED AND t.id > 400 THEN + UPDATE SET val = t.val || 'Updated by Merge' +WHEN MATCHED THEN + DELETE +WHEN NOT MATCHED THEN + INSERT VALUES(s.id, s.val); + +SET citus.log_remote_commands to false; +SELECT compare_tables(); +ROLLBACK; + +-- +-- NOT MATCHED clause filter on source +-- +BEGIN; +SET citus.log_remote_commands to true; + +MERGE INTO pg_target t +USING pg_source s +ON t.id = s.id +WHEN MATCHED THEN + DO NOTHING +WHEN NOT MATCHED AND s.id < 100 THEN + INSERT VALUES(s.id, s.val); + +MERGE INTO citus_target t +USING citus_source s +ON t.id = s.id +WHEN MATCHED THEN + DO NOTHING +WHEN NOT MATCHED AND s.id < 100 THEN + INSERT VALUES(s.id, s.val); + +SET citus.log_remote_commands to false; +SELECT compare_tables(); +ROLLBACK; + +-- +-- Test constant filter in ON clause to check if shards are pruned +-- with restriction information +-- + +-- +-- Though constant filter is present, this won't prune shards as +-- NOT MATCHED clause is present +-- +BEGIN; +SET citus.log_remote_commands to true; + +MERGE INTO pg_target t +USING pg_source s +ON t.id = s.id AND s.id = 250 +WHEN MATCHED THEN + UPDATE SET val = t.val || 'Updated by Merge' +WHEN NOT MATCHED THEN + INSERT VALUES(s.id, s.val); + +MERGE INTO citus_target t +USING citus_source s +ON t.id = s.id AND s.id = 250 +WHEN MATCHED THEN + UPDATE SET val = t.val || 'Updated by Merge' +WHEN NOT MATCHED THEN + INSERT VALUES(s.id, s.val); + +SET citus.log_remote_commands to false; +SELECT compare_tables(); +ROLLBACK; + +-- This will prune shards with restriction information as NOT MATCHED is void +BEGIN; +SET citus.log_remote_commands to true; + +MERGE INTO pg_target t +USING pg_source s +ON t.id = s.id AND s.id = 250 +WHEN MATCHED THEN + UPDATE SET val = t.val || 'Updated by Merge' +WHEN NOT MATCHED THEN + DO NOTHING; + +MERGE INTO citus_target t +USING citus_source s +ON t.id = s.id AND s.id = 250 +WHEN MATCHED THEN + UPDATE SET val = t.val || 'Updated by Merge' +WHEN NOT MATCHED THEN + DO NOTHING; + +SET citus.log_remote_commands to false; +SELECT compare_tables(); +ROLLBACK; + +-- Test CTE with distributed tables +CREATE VIEW pg_source_view AS SELECT * FROM pg_source WHERE id < 400; +CREATE VIEW citus_source_view AS SELECT * FROM citus_source WHERE id < 400; + +BEGIN; +SEt citus.log_remote_commands to true; + +WITH cte AS ( + SELECT * FROM pg_source_view +) +MERGE INTO pg_target t +USING cte +ON cte.id = t.id +WHEN MATCHED AND t.id > 350 THEN + UPDATE SET val = t.val || 'Updated by CTE' +WHEN NOT MATCHED THEN + INSERT VALUES (cte.id, cte.val) +WHEN MATCHED AND t.id < 350 THEN + DELETE; + +WITH cte AS ( + SELECT * FROM citus_source_view +) +MERGE INTO citus_target t +USING cte +ON cte.id = t.id +WHEN MATCHED AND t.id > 350 THEN + UPDATE SET val = t.val || 'Updated by CTE' +WHEN NOT MATCHED THEN + INSERT VALUES (cte.id, cte.val) +WHEN MATCHED AND t.id < 350 THEN + DELETE; + +SET citus.log_remote_commands to false; +SELECT compare_tables(); +ROLLBACK; + + +-- Test sub-query with distributed tables +BEGIN; +SEt citus.log_remote_commands to true; + +MERGE INTO pg_target t +USING (SELECT * FROM pg_source) subq +ON subq.id = t.id +WHEN MATCHED AND t.id > 350 THEN + UPDATE SET val = t.val || 'Updated by subquery' +WHEN NOT MATCHED THEN + INSERT VALUES (subq.id, subq.val) +WHEN MATCHED AND t.id < 350 THEN + DELETE; + +MERGE INTO citus_target t +USING (SELECT * FROM citus_source) subq +ON subq.id = t.id +WHEN MATCHED AND t.id > 350 THEN + UPDATE SET val = t.val || 'Updated by subquery' +WHEN NOT MATCHED THEN + INSERT VALUES (subq.id, subq.val) +WHEN MATCHED AND t.id < 350 THEN + DELETE; + +SET citus.log_remote_commands to false; +SELECT compare_tables(); +ROLLBACK; + +-- Test PREPARE +PREPARE pg_prep(int) AS +MERGE INTO pg_target +USING (SELECT * FROM pg_source) sub +ON pg_target.id = sub.id AND pg_target.id = $1 +WHEN MATCHED THEN + UPDATE SET val = 'Updated by prepare using ' || sub.val +WHEN NOT MATCHED THEN + DO NOTHING; + +PREPARE citus_prep(int) AS +MERGE INTO citus_target +USING (SELECT * FROM citus_source) sub +ON citus_target.id = sub.id AND citus_target.id = $1 +WHEN MATCHED THEN + UPDATE SET val = 'Updated by prepare using ' || sub.val +WHEN NOT MATCHED THEN + DO NOTHING; + +BEGIN; +SET citus.log_remote_commands to true; + +SELECT * FROM pg_target WHERE id = 500; -- before merge +EXECUTE pg_prep(500); +SELECT * FROM pg_target WHERE id = 500; -- non-cached +EXECUTE pg_prep(500); +EXECUTE pg_prep(500); +EXECUTE pg_prep(500); +EXECUTE pg_prep(500); +EXECUTE pg_prep(500); +SELECT * FROM pg_target WHERE id = 500; -- cached + +SELECT * FROM citus_target WHERE id = 500; -- before merge +EXECUTE citus_prep(500); +SELECT * FROM citus_target WHERE id = 500; -- non-cached +EXECUTE citus_prep(500); +EXECUTE citus_prep(500); +EXECUTE citus_prep(500); +EXECUTE citus_prep(500); +EXECUTE citus_prep(500); +SELECT * FROM citus_target WHERE id = 500; -- cached + +SET citus.log_remote_commands to false; +SELECT compare_tables(); +ROLLBACK; + +-- Test partitions + distributed tables + +CREATE TABLE pg_pa_target (tid integer, balance float, val text) + PARTITION BY LIST (tid); +CREATE TABLE citus_pa_target (tid integer, balance float, val text) + PARTITION BY LIST (tid); + +CREATE TABLE part1 PARTITION OF pg_pa_target FOR VALUES IN (1,4) + WITH (autovacuum_enabled=off); +CREATE TABLE part2 PARTITION OF pg_pa_target FOR VALUES IN (2,5,6) + WITH (autovacuum_enabled=off); +CREATE TABLE part3 PARTITION OF pg_pa_target FOR VALUES IN (3,8,9) + WITH (autovacuum_enabled=off); +CREATE TABLE part4 PARTITION OF pg_pa_target DEFAULT + WITH (autovacuum_enabled=off); +CREATE TABLE part5 PARTITION OF citus_pa_target FOR VALUES IN (1,4) + WITH (autovacuum_enabled=off); +CREATE TABLE part6 PARTITION OF citus_pa_target FOR VALUES IN (2,5,6) + WITH (autovacuum_enabled=off); +CREATE TABLE part7 PARTITION OF citus_pa_target FOR VALUES IN (3,8,9) + WITH (autovacuum_enabled=off); +CREATE TABLE part8 PARTITION OF citus_pa_target DEFAULT + WITH (autovacuum_enabled=off); + +CREATE TABLE pg_pa_source (sid integer, delta float); +CREATE TABLE citus_pa_source (sid integer, delta float); + +-- insert many rows to the source table +INSERT INTO pg_pa_source SELECT id, id * 10 FROM generate_series(1,14) AS id; +INSERT INTO citus_pa_source SELECT id, id * 10 FROM generate_series(1,14) AS id; +-- insert a few rows in the target table (odd numbered tid) +INSERT INTO pg_pa_target SELECT id, id * 100, 'initial' FROM generate_series(1,14,2) AS id; +INSERT INTO citus_pa_target SELECT id, id * 100, 'initial' FROM generate_series(1,14,2) AS id; + +SELECT create_distributed_table('citus_pa_target', 'tid'); +SELECT create_distributed_table('citus_pa_source', 'sid'); + +CREATE OR REPLACE FUNCTION pa_compare_tables() RETURNS BOOLEAN AS $$ +DECLARE ret BOOL; +BEGIN +SELECT count(1) = 0 INTO ret + FROM pg_pa_target + FULL OUTER JOIN citus_pa_target + USING (tid, balance, val) + WHERE pg_pa_target.tid IS NULL + OR citus_pa_target.tid IS NULL; +RETURN ret; +END +$$ LANGUAGE PLPGSQL; + +-- try simple MERGE +BEGIN; +MERGE INTO pg_pa_target t + USING pg_pa_source s + ON t.tid = s.sid + WHEN MATCHED THEN + UPDATE SET balance = balance + delta, val = val || ' updated by merge' + WHEN NOT MATCHED THEN + INSERT VALUES (sid, delta, 'inserted by merge'); + +MERGE INTO citus_pa_target t + USING citus_pa_source s + ON t.tid = s.sid + WHEN MATCHED THEN + UPDATE SET balance = balance + delta, val = val || ' updated by merge' + WHEN NOT MATCHED THEN + INSERT VALUES (sid, delta, 'inserted by merge'); + +SELECT pa_compare_tables(); +ROLLBACK; + +-- same with a constant qual +BEGIN; +MERGE INTO pg_pa_target t + USING pg_pa_source s + ON t.tid = s.sid AND tid = 1 + WHEN MATCHED THEN + UPDATE SET balance = balance + delta, val = val || ' updated by merge' + WHEN NOT MATCHED THEN + INSERT VALUES (sid, delta, 'inserted by merge'); + +MERGE INTO citus_pa_target t + USING citus_pa_source s + ON t.tid = s.sid AND tid = 1 + WHEN MATCHED THEN + UPDATE SET balance = balance + delta, val = val || ' updated by merge' + WHEN NOT MATCHED THEN + INSERT VALUES (sid, delta, 'inserted by merge'); + +SELECT pa_compare_tables(); +ROLLBACK; + -- -- Error and Unsupported scenarios -- @@ -1241,34 +1659,6 @@ UPDATE SET val = dist_colocated.val WHEN NOT MATCHED THEN INSERT VALUES(dist_colocated.id, dist_colocated.val); --- MERGE command must be joined with with a constant qual on target relation - --- AND clause is missing -MERGE INTO dist_target -USING dist_colocated -ON dist_target.id = dist_colocated.id -WHEN MATCHED THEN -UPDATE SET val = dist_colocated.val -WHEN NOT MATCHED THEN -INSERT VALUES(dist_colocated.id, dist_colocated.val); - --- AND clause incorrect table (must be target) -MERGE INTO dist_target -USING dist_colocated -ON dist_target.id = dist_colocated.id AND dist_colocated.id = 1 -WHEN MATCHED THEN -UPDATE SET val = dist_colocated.val -WHEN NOT MATCHED THEN -INSERT VALUES(dist_colocated.id, dist_colocated.val); - --- AND clause incorrect column (must be distribution column) -MERGE INTO dist_target -USING dist_colocated -ON dist_target.id = dist_colocated.id AND dist_target.val = 'const' -WHEN MATCHED THEN -UPDATE SET val = dist_colocated.val -WHEN NOT MATCHED THEN -INSERT VALUES(dist_colocated.id, dist_colocated.val); -- Both the source and target must be distributed MERGE INTO dist_target