This implements MERGE phase3

Support pushdown query where all the tables in the merge-sql are Citus-distributed, co-located, and both
the source and target relations are joined on the distribution column. This will generate multiple tasks
which execute independently after pushdown.

SELECT create_distributed_table('t1', 'id');
SELECT create_distributed_table('s1', 'id', colocate_with => ‘t1’);

MERGE INTO t1
USING s1
ON t1.id = s1.id
        WHEN MATCHED THEN
                UPDATE SET val = s1.val + 10
        WHEN MATCHED THEN
                DELETE
        WHEN NOT MATCHED THEN
                INSERT (id, val, src) VALUES (s1.id, s1.val, s1.src)

*The only exception for both the phases II and III is, UPDATEs and INSERTs must be done on the same shard-group
as the joined key; for example, below scenarios are NOT supported as the key-value to be inserted/updated is not
guaranteed to be on the same node as the id distribution-column.

MERGE INTO target t
USING source s ON (t.customer_id = s.customer_id)
WHEN NOT MATCHED THEN - -
     INSERT(customer_id, …) VALUES (<non-local-constant-key-value>, ……);

OR this scenario where we update the distribution column itself

MERGE INTO target t
USING source s On (t.customer_id = s.customer_id)
WHEN MATCHED THEN
     UPDATE SET customer_id = 100;
merge_phase3
Teja Mupparti 2023-01-20 18:58:10 -08:00
parent 93fcc5c5d8
commit fa7b8949a8
6 changed files with 1109 additions and 186 deletions

View File

@ -164,7 +164,6 @@ static uint32 HashPartitionCount(void);
static Job * BuildJobTreeTaskList(Job *jobTree,
PlannerRestrictionContext *plannerRestrictionContext);
static bool IsInnerTableOfOuterJoin(RelationRestriction *relationRestriction);
static bool IsOuterTableOfOuterJoin(RelationRestriction *relationRestriction);
static void ErrorIfUnsupportedShardDistribution(Query *query);
static Task * QueryPushdownTaskCreate(Query *originalQuery, int shardIndex,
RelationRestrictionContext *restrictionContext,
@ -2226,34 +2225,22 @@ QueryPushdownSqlTaskList(Query *query, uint64 jobId,
}
/*
* Skip adding shards of non-target (outer)relations.
* Note: This is a stop-gap arrangement for phase-I where in sql
* generates a single task on the shard identified by constant
* qual(filter) on the target relation.
* For left joins we don't care about the shards pruned for
* the right hand side. If the right hand side would prune
* to a smaller set we should still send it to all tables
* of the left hand side. However if the right hand side is
* bigger than the left hand side we don't have to send the
* query to any shard that is not matching anything on the
* left hand side.
*
* Instead we will simply skip any RelationRestriction if it
* is an OUTER join and the table is part of the non-outer
* side of the join.
*/
if (IsMergeQuery(query) &&
IsOuterTableOfOuterJoin(relationRestriction))
if (IsInnerTableOfOuterJoin(relationRestriction))
{
continue;
}
else if (!IsMergeQuery(query) &&
IsInnerTableOfOuterJoin(relationRestriction))
{
/*
* For left joins we don't care about the shards pruned for
* the right hand side. If the right hand side would prune
* to a smaller set we should still send it to all tables
* of the left hand side. However if the right hand side is
* bigger than the left hand side we don't have to send the
* query to any shard that is not matching anything on the
* left hand side.
*
* Instead we will simply skip any RelationRestriction if it
* is an OUTER join and the table is part of the non-outer
* side of the join.
*/
continue;
}
ShardInterval *shardInterval = NULL;
foreach_ptr(shardInterval, prunedShardList)
@ -2318,45 +2305,6 @@ QueryPushdownSqlTaskList(Query *query, uint64 jobId,
}
/*
* IsOuterTableOfOuterJoin tests based on the join information envoded in a
* RelationRestriction if the table accessed for this relation is
* a) in an outer join
* b) on the outer part of said join
*
* The function returns true only if both conditions above hold true
*/
static bool
IsOuterTableOfOuterJoin(RelationRestriction *relationRestriction)
{
RestrictInfo *joinInfo = NULL;
foreach_ptr(joinInfo, relationRestriction->relOptInfo->joininfo)
{
if (joinInfo->outer_relids == NULL)
{
/* not an outer join */
continue;
}
/*
* This join restriction info describes an outer join, we need to figure out if
* our table is in the outer part of this join. If that is the case this is a
* outer table of an outer join.
*/
bool isInOuter = bms_is_member(relationRestriction->relOptInfo->relid,
joinInfo->outer_relids);
if (isInOuter)
{
/* this table is joined in the outer part of an outer join */
return true;
}
}
/* we have not found any join clause that satisfies both requirements */
return false;
}
/*
* IsInnerTableOfOuterJoin tests based on the join information envoded in a
* RelationRestriction if the table accessed for this relation is

View File

@ -4108,8 +4108,6 @@ IsMergeAllowedOnRelation(Query *parse, RangeTblEntry *rte)
* - All the distributed tables are indeed colocated.
* - MERGE relations are joined on the distribution column
* MERGE .. USING .. ON target.dist_key = source.dist_key
* - The query should touch only a single shard i.e. JOIN AND with a constant qual
* MERGE .. USING .. ON target.dist_key = source.dist_key AND target.dist_key = <>
*
* If any of the conditions are not met, it raises an exception.
*/
@ -4142,29 +4140,6 @@ ErrorIfDistTablesNotColocated(Query *parse, List *distTablesList,
NULL, NULL);
}
/* Look for a constant qual i.e. AND target.dist_key = <> */
Node *distributionKeyValue = NULL;
Oid targetRelId = ResultRelationOidForQuery(parse);
Var *distributionKey = PartitionColumn(targetRelId, 1);
Assert(distributionKey);
/* convert list of expressions into expression tree for further processing */
Node *quals = parse->jointree->quals;
if (quals && IsA(quals, List))
{
quals = (Node *) make_ands_explicit((List *) quals);
}
if (!ConjunctionContainsColumnFilter(quals, distributionKey, &distributionKeyValue))
{
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
"MERGE on a distributed table requires a constant filter "
"on the distribution column of the target table", NULL,
"Consider adding AND target.dist_key = <> to the ON clause");
}
return NULL;
}

View File

@ -28,6 +28,10 @@ s/\(ref_id\)=\([0-9]+\)/(ref_id)=(X)/g
# shard table names for multi_subtransactions
s/"t2_[0-9]+"/"t2_xxxxxxx"/g
# shard table names for MERGE tests
s/merge_schema\.([_a-z0-9]+)_40[0-9]+ /merge_schema.\1_xxxxxxx /g
s/pgmerge_schema\.([_a-z0-9]+)_40[0-9]+ /pgmerge_schema.\1_xxxxxxx /g
# shard table names for multi_subquery
s/ keyval(1|2|ref)_[0-9]+ / keyval\1_xxxxxxx /g

View File

@ -222,7 +222,7 @@ SELECT * from target t WHERE t.customer_id = 30002;
30002 | 103 | AX | -1 | Sun Jan 17 19:53:00 2021
(1 row)
-- Turn on notice to print tasks sent to nodes (it should be a single task)
-- Turn on notice to print tasks sent to nodes
SET citus.log_remote_commands to true;
MERGE INTO target t
USING source s
@ -233,10 +233,9 @@ MERGE INTO target t
UPDATE SET -- Existing customer, update the order count and last_order_id
order_count = t.order_count + 1,
last_order_id = s.order_id
WHEN NOT MATCHED THEN -- New entry, record it.
INSERT (customer_id, last_order_id, order_center, order_count, last_order)
VALUES (customer_id, s.order_id, s.order_center, 123, s.order_time);
NOTICE: issuing MERGE INTO merge_schema.target_4000002 t USING merge_schema.source_4000006 s ON ((t.customer_id OPERATOR(pg_catalog.=) s.customer_id) AND (t.customer_id OPERATOR(pg_catalog.=) 30002)) WHEN MATCHED AND ((t.order_center COLLATE "default") OPERATOR(pg_catalog.=) 'XX'::text) THEN DELETE WHEN MATCHED THEN UPDATE SET last_order_id = s.order_id, order_count = (t.order_count OPERATOR(pg_catalog.+) 1) WHEN NOT MATCHED THEN INSERT (customer_id, last_order_id, order_center, order_count, last_order) VALUES (s.customer_id, s.order_id, s.order_center, 123, s.order_time)
WHEN NOT MATCHED THEN
DO NOTHING;
NOTICE: issuing MERGE INTO merge_schema.target_xxxxxxx t USING merge_schema.source_xxxxxxx s ON ((t.customer_id OPERATOR(pg_catalog.=) s.customer_id) AND (t.customer_id OPERATOR(pg_catalog.=) 30002)) WHEN MATCHED AND ((t.order_center COLLATE "default") OPERATOR(pg_catalog.=) 'XX'::text) THEN DELETE WHEN MATCHED THEN UPDATE SET last_order_id = s.order_id, order_count = (t.order_count OPERATOR(pg_catalog.+) 1) WHEN NOT MATCHED THEN DO NOTHING
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
SET citus.log_remote_commands to false;
SELECT * from target t WHERE t.customer_id = 30002;
@ -448,18 +447,40 @@ MERGE INTO t1
UPDATE SET val = t1.val + 1
WHEN NOT MATCHED THEN
INSERT (id, val) VALUES (s1_res.id, s1_res.val);
NOTICE: issuing WITH s1_res AS (SELECT s1.id, s1.val FROM merge_schema.s1_4000018 s1) MERGE INTO merge_schema.t1_4000014 t1 USING s1_res ON ((s1_res.id OPERATOR(pg_catalog.=) t1.id) AND (t1.id OPERATOR(pg_catalog.=) 6)) WHEN MATCHED AND (s1_res.val OPERATOR(pg_catalog.=) 0) THEN DELETE WHEN MATCHED THEN UPDATE SET val = (t1.val OPERATOR(pg_catalog.+) 1) WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s1_res.id, s1_res.val)
NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx');
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx');
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing WITH s1_res AS (SELECT s1.id, s1.val FROM merge_schema.s1_xxxxxxx s1) MERGE INTO merge_schema.t1_xxxxxxx t1 USING s1_res ON ((s1_res.id OPERATOR(pg_catalog.=) t1.id) AND (t1.id OPERATOR(pg_catalog.=) 6)) WHEN MATCHED AND (s1_res.val OPERATOR(pg_catalog.=) 0) THEN DELETE WHEN MATCHED THEN UPDATE SET val = (t1.val OPERATOR(pg_catalog.+) 1) WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s1_res.id, s1_res.val)
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing WITH s1_res AS (SELECT s1.id, s1.val FROM merge_schema.s1_xxxxxxx s1) MERGE INTO merge_schema.t1_xxxxxxx t1 USING s1_res ON ((s1_res.id OPERATOR(pg_catalog.=) t1.id) AND (t1.id OPERATOR(pg_catalog.=) 6)) WHEN MATCHED AND (s1_res.val OPERATOR(pg_catalog.=) 0) THEN DELETE WHEN MATCHED THEN UPDATE SET val = (t1.val OPERATOR(pg_catalog.+) 1) WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s1_res.id, s1_res.val)
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing WITH s1_res AS (SELECT s1.id, s1.val FROM merge_schema.s1_xxxxxxx s1) MERGE INTO merge_schema.t1_xxxxxxx t1 USING s1_res ON ((s1_res.id OPERATOR(pg_catalog.=) t1.id) AND (t1.id OPERATOR(pg_catalog.=) 6)) WHEN MATCHED AND (s1_res.val OPERATOR(pg_catalog.=) 0) THEN DELETE WHEN MATCHED THEN UPDATE SET val = (t1.val OPERATOR(pg_catalog.+) 1) WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s1_res.id, s1_res.val)
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing WITH s1_res AS (SELECT s1.id, s1.val FROM merge_schema.s1_xxxxxxx s1) MERGE INTO merge_schema.t1_xxxxxxx t1 USING s1_res ON ((s1_res.id OPERATOR(pg_catalog.=) t1.id) AND (t1.id OPERATOR(pg_catalog.=) 6)) WHEN MATCHED AND (s1_res.val OPERATOR(pg_catalog.=) 0) THEN DELETE WHEN MATCHED THEN UPDATE SET val = (t1.val OPERATOR(pg_catalog.+) 1) WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s1_res.id, s1_res.val)
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing PREPARE TRANSACTION 'citus_xx_xx_xx_xx'
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing PREPARE TRANSACTION 'citus_xx_xx_xx_xx'
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing COMMIT PREPARED 'citus_xx_xx_xx_xx'
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing COMMIT PREPARED 'citus_xx_xx_xx_xx'
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
SET citus.log_remote_commands to false;
-- As the id 6 is NO match, VALUES(6, 1) should appear in target
SELECT * FROM t1 order by id;
-- Other than id 6 everything else is a NO match, and should appear in target
SELECT * FROM t1 order by 1, 2;
id | val
---------------------------------------------------------------------
1 | 0
1 | 0
2 | 0
2 | 1
3 | 1
4 | 1
5 | 0
6 | 1
(4 rows)
(8 rows)
--
-- Test with multiple join conditions
@ -634,8 +655,8 @@ ON t2.id = s2.id AND t2.src = s2.src AND t2.id = 4
WHEN MATCHED THEN
DELETE
WHEN NOT MATCHED THEN
INSERT (id, val, src) VALUES (s2.id, s2.val, s2.src);
NOTICE: issuing MERGE INTO merge_schema.t2_4000023 t2 USING merge_schema.s2_4000027 s2 ON ((t2.id OPERATOR(pg_catalog.=) s2.id) AND (t2.src OPERATOR(pg_catalog.=) s2.src) AND (t2.id OPERATOR(pg_catalog.=) 4)) WHEN MATCHED AND (t2.val OPERATOR(pg_catalog.=) 1) THEN UPDATE SET val = (s2.val OPERATOR(pg_catalog.+) 10) WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (id, val, src) VALUES (s2.id, s2.val, s2.src)
DO NOTHING;
NOTICE: issuing MERGE INTO merge_schema.t2_xxxxxxx t2 USING merge_schema.s2_xxxxxxx s2 ON ((t2.id OPERATOR(pg_catalog.=) s2.id) AND (t2.src OPERATOR(pg_catalog.=) s2.src) AND (t2.id OPERATOR(pg_catalog.=) 4)) WHEN MATCHED AND (t2.val OPERATOR(pg_catalog.=) 1) THEN UPDATE SET val = (s2.val OPERATOR(pg_catalog.+) 10) WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN DO NOTHING
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
SET citus.log_remote_commands to false;
-- Row with id = 4 is a match for delete clause, row should be deleted
@ -646,8 +667,7 @@ SELECT * FROM t2 ORDER BY 1;
1 | 0 | target
2 | 0 | target
3 | 1 | match
3 | 10 | match
(4 rows)
(3 rows)
--
-- With sub-query as the MERGE source
@ -1029,7 +1049,7 @@ WHEN MATCHED THEN
UPDATE SET value = vl_source.value, id = vl_target.id + 1
WHEN NOT MATCHED THEN
INSERT VALUES(vl_source.ID, vl_source.value);
DEBUG: <Deparsed MERGE query: MERGE INTO merge_schema.vl_target_4000036 vl_target USING (SELECT vl.id, vl.value FROM (VALUES (100,'source1'::text), (200,'source2'::text)) vl(id, value)) vl_source ON (vl_source.id OPERATOR(pg_catalog.=) vl_target.id) WHEN MATCHED THEN UPDATE SET id = (vl_target.id OPERATOR(pg_catalog.+) 1), value = (vl_source.value COLLATE "default") WHEN NOT MATCHED THEN INSERT (id, value) VALUES (vl_source.id, (vl_source.value COLLATE "default"))>
DEBUG: <Deparsed MERGE query: MERGE INTO merge_schema.vl_target_xxxxxxx vl_target USING (SELECT vl.id, vl.value FROM (VALUES (100,'source1'::text), (200,'source2'::text)) vl(id, value)) vl_source ON (vl_source.id OPERATOR(pg_catalog.=) vl_target.id) WHEN MATCHED THEN UPDATE SET id = (vl_target.id OPERATOR(pg_catalog.+) 1), value = (vl_source.value COLLATE "default") WHEN NOT MATCHED THEN INSERT (id, value) VALUES (vl_source.id, (vl_source.value COLLATE "default"))>
RESET client_min_messages;
SELECT * INTO vl_local FROM vl_target ORDER BY 1 ;
-- Should be equal
@ -1082,7 +1102,7 @@ WHEN MATCHED THEN
DO NOTHING
WHEN NOT MATCHED THEN
INSERT VALUES(rs_source.id);
DEBUG: <Deparsed MERGE query: MERGE INTO merge_schema.rs_target_4000037 rs_target USING (SELECT id.id FROM merge_schema.f_immutable(99) id(id) WHERE (id.id OPERATOR(pg_catalog.=) ANY (SELECT 99))) rs_source ON (rs_source.id OPERATOR(pg_catalog.=) rs_target.id) WHEN MATCHED THEN DO NOTHING WHEN NOT MATCHED THEN INSERT (id) VALUES (rs_source.id)>
DEBUG: <Deparsed MERGE query: MERGE INTO merge_schema.rs_target_xxxxxxx rs_target USING (SELECT id.id FROM merge_schema.f_immutable(99) id(id) WHERE (id.id OPERATOR(pg_catalog.=) ANY (SELECT 99))) rs_source ON (rs_source.id OPERATOR(pg_catalog.=) rs_target.id) WHEN MATCHED THEN DO NOTHING WHEN NOT MATCHED THEN INSERT (id) VALUES (rs_source.id)>
RESET client_min_messages;
SELECT * INTO rs_local FROM rs_target ORDER BY 1 ;
-- Should be equal
@ -1218,7 +1238,7 @@ DEBUG: function does not have co-located tables
DEBUG: generating subplan XXX_1 for subquery SELECT id, source FROM merge_schema.f_dist() f(id integer, source character varying)
DEBUG: <Deparsed MERGE query: MERGE INTO merge_schema.fn_target USING (SELECT intermediate_result.id, intermediate_result.source FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(id integer, source character varying)) fn_source ON (fn_source.id OPERATOR(pg_catalog.=) fn_target.id) WHEN MATCHED THEN DO NOTHING WHEN NOT MATCHED THEN INSERT (id, data) VALUES (fn_source.id, fn_source.source)>
DEBUG: Plan XXX query after replacing subqueries and CTEs: MERGE INTO merge_schema.fn_target USING (SELECT intermediate_result.id, intermediate_result.source FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(id integer, source character varying)) fn_source ON (fn_source.id OPERATOR(pg_catalog.=) fn_target.id) WHEN MATCHED THEN DO NOTHING WHEN NOT MATCHED THEN INSERT (id, data) VALUES (fn_source.id, fn_source.source)
DEBUG: <Deparsed MERGE query: MERGE INTO merge_schema.fn_target_4000040 fn_target USING (SELECT intermediate_result.id, intermediate_result.source FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(id integer, source character varying)) fn_source ON (fn_source.id OPERATOR(pg_catalog.=) fn_target.id) WHEN MATCHED THEN DO NOTHING WHEN NOT MATCHED THEN INSERT (id, data) VALUES (fn_source.id, fn_source.source)>
DEBUG: <Deparsed MERGE query: MERGE INTO merge_schema.fn_target_xxxxxxx fn_target USING (SELECT intermediate_result.id, intermediate_result.source FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(id integer, source character varying)) fn_source ON (fn_source.id OPERATOR(pg_catalog.=) fn_target.id) WHEN MATCHED THEN DO NOTHING WHEN NOT MATCHED THEN INSERT (id, data) VALUES (fn_source.id, fn_source.source)>
RESET client_min_messages;
SELECT * INTO fn_local FROM fn_target ORDER BY 1 ;
-- Should be equal
@ -1290,7 +1310,7 @@ MERGE INTO ft_target
DELETE
WHEN NOT MATCHED THEN
INSERT (id, user_val) VALUES (foreign_table.id, foreign_table.user_val);
DEBUG: <Deparsed MERGE query: MERGE INTO merge_schema.ft_target USING merge_schema.foreign_table_4000046 foreign_table ON (foreign_table.id OPERATOR(pg_catalog.=) ft_target.id) WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (id, user_val) VALUES (foreign_table.id, (foreign_table.user_val COLLATE "default"))>
DEBUG: <Deparsed MERGE query: MERGE INTO merge_schema.ft_target USING merge_schema.foreign_table_xxxxxxx foreign_table ON (foreign_table.id OPERATOR(pg_catalog.=) ft_target.id) WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (id, user_val) VALUES (foreign_table.id, (foreign_table.user_val COLLATE "default"))>
RESET client_min_messages;
SELECT * FROM ft_target;
id | user_val
@ -1394,7 +1414,7 @@ WHEN NOT MATCHED THEN
DO NOTHING;
NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx');
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing MERGE INTO merge_schema.target_cj_4000050 t USING (merge_schema.source_cj1_4000054 s1 JOIN merge_schema.source_cj2_4000058 s2 ON ((s1.sid1 OPERATOR(pg_catalog.=) s2.sid2))) ON ((t.tid OPERATOR(pg_catalog.=) s1.sid1) AND (t.tid OPERATOR(pg_catalog.=) 2)) WHEN MATCHED THEN UPDATE SET src = s2.src2 WHEN NOT MATCHED THEN DO NOTHING
NOTICE: issuing MERGE INTO merge_schema.target_cj_xxxxxxx t USING (merge_schema.source_cj1_xxxxxxx s1 JOIN merge_schema.source_cj2_xxxxxxx s2 ON ((s1.sid1 OPERATOR(pg_catalog.=) s2.sid2))) ON ((t.tid OPERATOR(pg_catalog.=) s1.sid1) AND (t.tid OPERATOR(pg_catalog.=) 2)) WHEN MATCHED THEN UPDATE SET src = s2.src2 WHEN NOT MATCHED THEN DO NOTHING
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
SET citus.log_remote_commands to false;
SELECT * FROM target_cj ORDER BY 1;
@ -1465,7 +1485,7 @@ WHEN MATCHED THEN
UPDATE SET src = sub.src, val = sub.val + 100
WHEN NOT MATCHED THEN
DO NOTHING;
NOTICE: issuing MERGE INTO merge_schema.target_cj_4000048 t1 USING (SELECT target_cj.tid, target_cj.src, target_cj.val FROM merge_schema.target_cj_4000048 target_cj) sub ON ((t1.tid OPERATOR(pg_catalog.=) sub.tid) AND (t1.tid OPERATOR(pg_catalog.=) 3)) WHEN MATCHED THEN UPDATE SET src = sub.src, val = (sub.val OPERATOR(pg_catalog.+) 100) WHEN NOT MATCHED THEN DO NOTHING
NOTICE: issuing MERGE INTO merge_schema.target_cj_xxxxxxx t1 USING (SELECT target_cj.tid, target_cj.src, target_cj.val FROM merge_schema.target_cj_xxxxxxx target_cj) sub ON ((t1.tid OPERATOR(pg_catalog.=) sub.tid) AND (t1.tid OPERATOR(pg_catalog.=) 3)) WHEN MATCHED THEN UPDATE SET src = sub.src, val = (sub.val OPERATOR(pg_catalog.+) 100) WHEN NOT MATCHED THEN DO NOTHING
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
set citus.log_remote_commands to false;
SELECT * FROM target_cj ORDER BY 1;
@ -1516,18 +1536,18 @@ BEGIN;
SET citus.log_remote_commands to true;
SET client_min_messages TO DEBUG1;
EXECUTE foo(2);
DEBUG: <Deparsed MERGE query: MERGE INTO merge_schema.target_cj_4000047 target USING (SELECT source_cj1.sid1, source_cj1.src1, source_cj1.val1 FROM merge_schema.source_cj1_4000051 source_cj1) sub ON ((target.tid OPERATOR(pg_catalog.=) sub.sid1) AND (target.tid OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = sub.val1 WHEN NOT MATCHED THEN DO NOTHING >
DEBUG: <Deparsed MERGE query: MERGE INTO merge_schema.target_cj_4000048 target USING (SELECT source_cj1.sid1, source_cj1.src1, source_cj1.val1 FROM merge_schema.source_cj1_4000052 source_cj1) sub ON ((target.tid OPERATOR(pg_catalog.=) sub.sid1) AND (target.tid OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = sub.val1 WHEN NOT MATCHED THEN DO NOTHING >
DEBUG: <Deparsed MERGE query: MERGE INTO merge_schema.target_cj_4000049 target USING (SELECT source_cj1.sid1, source_cj1.src1, source_cj1.val1 FROM merge_schema.source_cj1_4000053 source_cj1) sub ON ((target.tid OPERATOR(pg_catalog.=) sub.sid1) AND (target.tid OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = sub.val1 WHEN NOT MATCHED THEN DO NOTHING >
DEBUG: <Deparsed MERGE query: MERGE INTO merge_schema.target_cj_4000050 target USING (SELECT source_cj1.sid1, source_cj1.src1, source_cj1.val1 FROM merge_schema.source_cj1_4000054 source_cj1) sub ON ((target.tid OPERATOR(pg_catalog.=) sub.sid1) AND (target.tid OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = sub.val1 WHEN NOT MATCHED THEN DO NOTHING >
DEBUG: <Deparsed MERGE query: MERGE INTO merge_schema.target_cj_4000050 target USING (SELECT source_cj1.sid1, source_cj1.src1, source_cj1.val1 FROM merge_schema.source_cj1_4000054 source_cj1) sub ON ((target.tid OPERATOR(pg_catalog.=) sub.sid1) AND (target.tid OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = sub.val1 WHEN NOT MATCHED THEN DO NOTHING >
DEBUG: <Deparsed MERGE query: MERGE INTO merge_schema.target_cj_xxxxxxx target USING (SELECT source_cj1.sid1, source_cj1.src1, source_cj1.val1 FROM merge_schema.source_cj1_xxxxxxx source_cj1) sub ON ((target.tid OPERATOR(pg_catalog.=) sub.sid1) AND (target.tid OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = sub.val1 WHEN NOT MATCHED THEN DO NOTHING >
DEBUG: <Deparsed MERGE query: MERGE INTO merge_schema.target_cj_xxxxxxx target USING (SELECT source_cj1.sid1, source_cj1.src1, source_cj1.val1 FROM merge_schema.source_cj1_xxxxxxx source_cj1) sub ON ((target.tid OPERATOR(pg_catalog.=) sub.sid1) AND (target.tid OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = sub.val1 WHEN NOT MATCHED THEN DO NOTHING >
DEBUG: <Deparsed MERGE query: MERGE INTO merge_schema.target_cj_xxxxxxx target USING (SELECT source_cj1.sid1, source_cj1.src1, source_cj1.val1 FROM merge_schema.source_cj1_xxxxxxx source_cj1) sub ON ((target.tid OPERATOR(pg_catalog.=) sub.sid1) AND (target.tid OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = sub.val1 WHEN NOT MATCHED THEN DO NOTHING >
DEBUG: <Deparsed MERGE query: MERGE INTO merge_schema.target_cj_xxxxxxx target USING (SELECT source_cj1.sid1, source_cj1.src1, source_cj1.val1 FROM merge_schema.source_cj1_xxxxxxx source_cj1) sub ON ((target.tid OPERATOR(pg_catalog.=) sub.sid1) AND (target.tid OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = sub.val1 WHEN NOT MATCHED THEN DO NOTHING >
DEBUG: <Deparsed MERGE query: MERGE INTO merge_schema.target_cj_xxxxxxx target USING (SELECT source_cj1.sid1, source_cj1.src1, source_cj1.val1 FROM merge_schema.source_cj1_xxxxxxx source_cj1) sub ON ((target.tid OPERATOR(pg_catalog.=) sub.sid1) AND (target.tid OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = sub.val1 WHEN NOT MATCHED THEN DO NOTHING >
NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx');
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing MERGE INTO merge_schema.target_cj_4000050 target USING (SELECT source_cj1.sid1, source_cj1.src1, source_cj1.val1 FROM merge_schema.source_cj1_4000054 source_cj1) sub ON ((target.tid OPERATOR(pg_catalog.=) sub.sid1) AND (target.tid OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = sub.val1 WHEN NOT MATCHED THEN DO NOTHING
NOTICE: issuing MERGE INTO merge_schema.target_cj_xxxxxxx target USING (SELECT source_cj1.sid1, source_cj1.src1, source_cj1.val1 FROM merge_schema.source_cj1_xxxxxxx source_cj1) sub ON ((target.tid OPERATOR(pg_catalog.=) sub.sid1) AND (target.tid OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = sub.val1 WHEN NOT MATCHED THEN DO NOTHING
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
RESET client_min_messages;
EXECUTE foo(2);
NOTICE: issuing MERGE INTO merge_schema.target_cj_4000050 target USING (SELECT source_cj1.sid1, source_cj1.src1, source_cj1.val1 FROM merge_schema.source_cj1_4000054 source_cj1) sub ON ((target.tid OPERATOR(pg_catalog.=) sub.sid1) AND (target.tid OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = sub.val1 WHEN NOT MATCHED THEN DO NOTHING
NOTICE: issuing MERGE INTO merge_schema.target_cj_xxxxxxx target USING (SELECT source_cj1.sid1, source_cj1.src1, source_cj1.val1 FROM merge_schema.source_cj1_xxxxxxx source_cj1) sub ON ((target.tid OPERATOR(pg_catalog.=) sub.sid1) AND (target.tid OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = sub.val1 WHEN NOT MATCHED THEN DO NOTHING
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
SET citus.log_remote_commands to false;
SELECT * FROM target_cj ORDER BY 1;
@ -1539,6 +1559,611 @@ SELECT * FROM target_cj ORDER BY 1;
3 | target | 0
(4 rows)
ROLLBACK;
-- Test distributed tables, must be co-located and joined on distribution column.
--
-- We create two sets of source and target tables, one set is Postgres and the other
-- is Citus distributed. Run the _exact_ MERGE SQL on both the sets and compare the
-- final results of target tables of Postgres and Citus, the result should match.
-- This is repeated for various MERGE SQL combinations
--
CREATE TABLE pg_target(id int, val varchar);
CREATE TABLE pg_source(id int, val varchar);
CREATE TABLE citus_target(id int, val varchar);
CREATE TABLE citus_source(id int, val varchar);
-- Half of the source rows do not match
INSERT INTO pg_target SELECT i, 'target' FROM generate_series(250, 500) i;
INSERT INTO pg_source SELECT i, 'source' FROM generate_series(1, 500) i;
INSERT INTO citus_target SELECT i, 'target' FROM generate_series(250, 500) i;
INSERT INTO citus_source SELECT i, 'source' FROM generate_series(1, 500) i;
SELECT create_distributed_table('citus_target', 'id');
NOTICE: Copying data from local table...
NOTICE: copying the data has completed
DETAIL: The local data in the table is no longer visible, but is still on disk.
HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$merge_schema.citus_target$$)
create_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT create_distributed_table('citus_source', 'id');
NOTICE: Copying data from local table...
NOTICE: copying the data has completed
DETAIL: The local data in the table is no longer visible, but is still on disk.
HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$merge_schema.citus_source$$)
create_distributed_table
---------------------------------------------------------------------
(1 row)
--
-- This routine compares the target tables of Postgres and Citus and
-- returns true if they match, false if the results do not match.
--
CREATE OR REPLACE FUNCTION compare_tables() RETURNS BOOLEAN AS $$
DECLARE ret BOOL;
BEGIN
SELECT count(1) = 0 INTO ret
FROM pg_target
FULL OUTER JOIN citus_target
USING (id, val)
WHERE pg_target.id IS NULL
OR citus_target.id IS NULL;
RETURN ret;
END
$$ LANGUAGE PLPGSQL;
-- Make sure we start with exact data in Postgres and Citus
SELECT compare_tables();
compare_tables
---------------------------------------------------------------------
t
(1 row)
-- Run the MERGE on both Postgres and Citus, and compare the final target tables
BEGIN;
SET citus.log_remote_commands to true;
MERGE INTO pg_target t
USING pg_source s
ON t.id = s.id
WHEN MATCHED AND t.id > 400 THEN
UPDATE SET val = t.val || 'Updated by Merge'
WHEN MATCHED THEN
DELETE
WHEN NOT MATCHED THEN
INSERT VALUES(s.id, s.val);
MERGE INTO citus_target t
USING citus_source s
ON t.id = s.id
WHEN MATCHED AND t.id > 400 THEN
UPDATE SET val = t.val || 'Updated by Merge'
WHEN MATCHED THEN
DELETE
WHEN NOT MATCHED THEN
INSERT VALUES(s.id, s.val);
NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx');
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx');
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON (t.id OPERATOR(pg_catalog.=) s.id) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 400) THEN UPDATE SET val = (((t.val COLLATE "default") OPERATOR(pg_catalog.||) 'Updated by Merge'::text) COLLATE "default") WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s.id, s.val)
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON (t.id OPERATOR(pg_catalog.=) s.id) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 400) THEN UPDATE SET val = (((t.val COLLATE "default") OPERATOR(pg_catalog.||) 'Updated by Merge'::text) COLLATE "default") WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s.id, s.val)
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON (t.id OPERATOR(pg_catalog.=) s.id) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 400) THEN UPDATE SET val = (((t.val COLLATE "default") OPERATOR(pg_catalog.||) 'Updated by Merge'::text) COLLATE "default") WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s.id, s.val)
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON (t.id OPERATOR(pg_catalog.=) s.id) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 400) THEN UPDATE SET val = (((t.val COLLATE "default") OPERATOR(pg_catalog.||) 'Updated by Merge'::text) COLLATE "default") WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s.id, s.val)
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
SET citus.log_remote_commands to false;
SELECT compare_tables();
compare_tables
---------------------------------------------------------------------
t
(1 row)
ROLLBACK;
--
-- ON clause filter on source
--
BEGIN;
SET citus.log_remote_commands to true;
MERGE INTO pg_target t
USING pg_source s
ON t.id = s.id AND s.id < 100
WHEN MATCHED AND t.id > 400 THEN
UPDATE SET val = t.val || 'Updated by Merge'
WHEN MATCHED THEN
DELETE
WHEN NOT MATCHED THEN
INSERT VALUES(s.id, s.val);
MERGE INTO citus_target t
USING citus_source s
ON t.id = s.id AND s.id < 100
WHEN MATCHED AND t.id > 400 THEN
UPDATE SET val = t.val || 'Updated by Merge'
WHEN MATCHED THEN
DELETE
WHEN NOT MATCHED THEN
INSERT VALUES(s.id, s.val);
NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx');
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx');
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON ((t.id OPERATOR(pg_catalog.=) s.id) AND (s.id OPERATOR(pg_catalog.<) 100)) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 400) THEN UPDATE SET val = (((t.val COLLATE "default") OPERATOR(pg_catalog.||) 'Updated by Merge'::text) COLLATE "default") WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s.id, s.val)
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON ((t.id OPERATOR(pg_catalog.=) s.id) AND (s.id OPERATOR(pg_catalog.<) 100)) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 400) THEN UPDATE SET val = (((t.val COLLATE "default") OPERATOR(pg_catalog.||) 'Updated by Merge'::text) COLLATE "default") WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s.id, s.val)
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON ((t.id OPERATOR(pg_catalog.=) s.id) AND (s.id OPERATOR(pg_catalog.<) 100)) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 400) THEN UPDATE SET val = (((t.val COLLATE "default") OPERATOR(pg_catalog.||) 'Updated by Merge'::text) COLLATE "default") WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s.id, s.val)
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON ((t.id OPERATOR(pg_catalog.=) s.id) AND (s.id OPERATOR(pg_catalog.<) 100)) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 400) THEN UPDATE SET val = (((t.val COLLATE "default") OPERATOR(pg_catalog.||) 'Updated by Merge'::text) COLLATE "default") WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s.id, s.val)
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
SET citus.log_remote_commands to false;
SELECT compare_tables();
compare_tables
---------------------------------------------------------------------
t
(1 row)
ROLLBACK;
--
-- ON clause filter on target
--
BEGIN;
SET citus.log_remote_commands to true;
MERGE INTO pg_target t
USING pg_source s
ON t.id = s.id AND t.id < 100
WHEN MATCHED AND t.id > 400 THEN
UPDATE SET val = t.val || 'Updated by Merge'
WHEN MATCHED THEN
DELETE
WHEN NOT MATCHED THEN
INSERT VALUES(s.id, s.val);
MERGE INTO citus_target t
USING citus_source s
ON t.id = s.id AND t.id < 100
WHEN MATCHED AND t.id > 400 THEN
UPDATE SET val = t.val || 'Updated by Merge'
WHEN MATCHED THEN
DELETE
WHEN NOT MATCHED THEN
INSERT VALUES(s.id, s.val);
NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx');
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx');
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON ((t.id OPERATOR(pg_catalog.=) s.id) AND (t.id OPERATOR(pg_catalog.<) 100)) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 400) THEN UPDATE SET val = (((t.val COLLATE "default") OPERATOR(pg_catalog.||) 'Updated by Merge'::text) COLLATE "default") WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s.id, s.val)
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON ((t.id OPERATOR(pg_catalog.=) s.id) AND (t.id OPERATOR(pg_catalog.<) 100)) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 400) THEN UPDATE SET val = (((t.val COLLATE "default") OPERATOR(pg_catalog.||) 'Updated by Merge'::text) COLLATE "default") WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s.id, s.val)
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON ((t.id OPERATOR(pg_catalog.=) s.id) AND (t.id OPERATOR(pg_catalog.<) 100)) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 400) THEN UPDATE SET val = (((t.val COLLATE "default") OPERATOR(pg_catalog.||) 'Updated by Merge'::text) COLLATE "default") WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s.id, s.val)
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON ((t.id OPERATOR(pg_catalog.=) s.id) AND (t.id OPERATOR(pg_catalog.<) 100)) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 400) THEN UPDATE SET val = (((t.val COLLATE "default") OPERATOR(pg_catalog.||) 'Updated by Merge'::text) COLLATE "default") WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s.id, s.val)
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
SET citus.log_remote_commands to false;
SELECT compare_tables();
compare_tables
---------------------------------------------------------------------
t
(1 row)
ROLLBACK;
--
-- NOT MATCHED clause filter on source
--
BEGIN;
SET citus.log_remote_commands to true;
MERGE INTO pg_target t
USING pg_source s
ON t.id = s.id
WHEN MATCHED THEN
DO NOTHING
WHEN NOT MATCHED AND s.id < 100 THEN
INSERT VALUES(s.id, s.val);
MERGE INTO citus_target t
USING citus_source s
ON t.id = s.id
WHEN MATCHED THEN
DO NOTHING
WHEN NOT MATCHED AND s.id < 100 THEN
INSERT VALUES(s.id, s.val);
NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx');
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx');
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON (t.id OPERATOR(pg_catalog.=) s.id) WHEN MATCHED THEN DO NOTHING WHEN NOT MATCHED AND (s.id OPERATOR(pg_catalog.<) 100) THEN INSERT (id, val) VALUES (s.id, s.val)
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON (t.id OPERATOR(pg_catalog.=) s.id) WHEN MATCHED THEN DO NOTHING WHEN NOT MATCHED AND (s.id OPERATOR(pg_catalog.<) 100) THEN INSERT (id, val) VALUES (s.id, s.val)
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON (t.id OPERATOR(pg_catalog.=) s.id) WHEN MATCHED THEN DO NOTHING WHEN NOT MATCHED AND (s.id OPERATOR(pg_catalog.<) 100) THEN INSERT (id, val) VALUES (s.id, s.val)
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON (t.id OPERATOR(pg_catalog.=) s.id) WHEN MATCHED THEN DO NOTHING WHEN NOT MATCHED AND (s.id OPERATOR(pg_catalog.<) 100) THEN INSERT (id, val) VALUES (s.id, s.val)
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
SET citus.log_remote_commands to false;
SELECT compare_tables();
compare_tables
---------------------------------------------------------------------
t
(1 row)
ROLLBACK;
--
-- Test constant filter in ON clause to check if shards are pruned
-- with restriction information
--
--
-- Though constant filter is present, this won't prune shards as
-- NOT MATCHED clause is present
--
BEGIN;
SET citus.log_remote_commands to true;
MERGE INTO pg_target t
USING pg_source s
ON t.id = s.id AND s.id = 250
WHEN MATCHED THEN
UPDATE SET val = t.val || 'Updated by Merge'
WHEN NOT MATCHED THEN
INSERT VALUES(s.id, s.val);
MERGE INTO citus_target t
USING citus_source s
ON t.id = s.id AND s.id = 250
WHEN MATCHED THEN
UPDATE SET val = t.val || 'Updated by Merge'
WHEN NOT MATCHED THEN
INSERT VALUES(s.id, s.val);
NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx');
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx');
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON ((t.id OPERATOR(pg_catalog.=) s.id) AND (s.id OPERATOR(pg_catalog.=) 250)) WHEN MATCHED THEN UPDATE SET val = (((t.val COLLATE "default") OPERATOR(pg_catalog.||) 'Updated by Merge'::text) COLLATE "default") WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s.id, s.val)
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON ((t.id OPERATOR(pg_catalog.=) s.id) AND (s.id OPERATOR(pg_catalog.=) 250)) WHEN MATCHED THEN UPDATE SET val = (((t.val COLLATE "default") OPERATOR(pg_catalog.||) 'Updated by Merge'::text) COLLATE "default") WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s.id, s.val)
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON ((t.id OPERATOR(pg_catalog.=) s.id) AND (s.id OPERATOR(pg_catalog.=) 250)) WHEN MATCHED THEN UPDATE SET val = (((t.val COLLATE "default") OPERATOR(pg_catalog.||) 'Updated by Merge'::text) COLLATE "default") WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s.id, s.val)
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON ((t.id OPERATOR(pg_catalog.=) s.id) AND (s.id OPERATOR(pg_catalog.=) 250)) WHEN MATCHED THEN UPDATE SET val = (((t.val COLLATE "default") OPERATOR(pg_catalog.||) 'Updated by Merge'::text) COLLATE "default") WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s.id, s.val)
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
SET citus.log_remote_commands to false;
SELECT compare_tables();
compare_tables
---------------------------------------------------------------------
t
(1 row)
ROLLBACK;
-- This will prune shards with restriction information as NOT MATCHED is void
BEGIN;
SET citus.log_remote_commands to true;
MERGE INTO pg_target t
USING pg_source s
ON t.id = s.id AND s.id = 250
WHEN MATCHED THEN
UPDATE SET val = t.val || 'Updated by Merge'
WHEN NOT MATCHED THEN
DO NOTHING;
MERGE INTO citus_target t
USING citus_source s
ON t.id = s.id AND s.id = 250
WHEN MATCHED THEN
UPDATE SET val = t.val || 'Updated by Merge'
WHEN NOT MATCHED THEN
DO NOTHING;
NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx');
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON ((t.id OPERATOR(pg_catalog.=) s.id) AND (s.id OPERATOR(pg_catalog.=) 250)) WHEN MATCHED THEN UPDATE SET val = (((t.val COLLATE "default") OPERATOR(pg_catalog.||) 'Updated by Merge'::text) COLLATE "default") WHEN NOT MATCHED THEN DO NOTHING
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
SET citus.log_remote_commands to false;
SELECT compare_tables();
compare_tables
---------------------------------------------------------------------
t
(1 row)
ROLLBACK;
-- Test CTE with distributed tables
CREATE VIEW pg_source_view AS SELECT * FROM pg_source WHERE id < 400;
WARNING: "view pg_source_view" has dependency to "table pg_source" that is not in Citus' metadata
DETAIL: "view pg_source_view" will be created only locally
HINT: Distribute "table pg_source" first to distribute "view pg_source_view"
CREATE VIEW citus_source_view AS SELECT * FROM citus_source WHERE id < 400;
BEGIN;
SEt citus.log_remote_commands to true;
WITH cte AS (
SELECT * FROM pg_source_view
)
MERGE INTO pg_target t
USING cte
ON cte.id = t.id
WHEN MATCHED AND t.id > 350 THEN
UPDATE SET val = t.val || 'Updated by CTE'
WHEN NOT MATCHED THEN
INSERT VALUES (cte.id, cte.val)
WHEN MATCHED AND t.id < 350 THEN
DELETE;
WITH cte AS (
SELECT * FROM citus_source_view
)
MERGE INTO citus_target t
USING cte
ON cte.id = t.id
WHEN MATCHED AND t.id > 350 THEN
UPDATE SET val = t.val || 'Updated by CTE'
WHEN NOT MATCHED THEN
INSERT VALUES (cte.id, cte.val)
WHEN MATCHED AND t.id < 350 THEN
DELETE;
NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx');
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx');
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing WITH cte AS (SELECT citus_source_view.id, citus_source_view.val FROM (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source WHERE (citus_source.id OPERATOR(pg_catalog.<) 400)) citus_source_view) MERGE INTO merge_schema.citus_target_xxxxxxx t USING cte ON (cte.id OPERATOR(pg_catalog.=) t.id) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 350) THEN UPDATE SET val = (((t.val COLLATE "default") OPERATOR(pg_catalog.||) 'Updated by CTE'::text) COLLATE "default") WHEN NOT MATCHED THEN INSERT (id, val) VALUES (cte.id, cte.val) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.<) 350) THEN DELETE
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing WITH cte AS (SELECT citus_source_view.id, citus_source_view.val FROM (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source WHERE (citus_source.id OPERATOR(pg_catalog.<) 400)) citus_source_view) MERGE INTO merge_schema.citus_target_xxxxxxx t USING cte ON (cte.id OPERATOR(pg_catalog.=) t.id) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 350) THEN UPDATE SET val = (((t.val COLLATE "default") OPERATOR(pg_catalog.||) 'Updated by CTE'::text) COLLATE "default") WHEN NOT MATCHED THEN INSERT (id, val) VALUES (cte.id, cte.val) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.<) 350) THEN DELETE
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing WITH cte AS (SELECT citus_source_view.id, citus_source_view.val FROM (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source WHERE (citus_source.id OPERATOR(pg_catalog.<) 400)) citus_source_view) MERGE INTO merge_schema.citus_target_xxxxxxx t USING cte ON (cte.id OPERATOR(pg_catalog.=) t.id) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 350) THEN UPDATE SET val = (((t.val COLLATE "default") OPERATOR(pg_catalog.||) 'Updated by CTE'::text) COLLATE "default") WHEN NOT MATCHED THEN INSERT (id, val) VALUES (cte.id, cte.val) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.<) 350) THEN DELETE
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing WITH cte AS (SELECT citus_source_view.id, citus_source_view.val FROM (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source WHERE (citus_source.id OPERATOR(pg_catalog.<) 400)) citus_source_view) MERGE INTO merge_schema.citus_target_xxxxxxx t USING cte ON (cte.id OPERATOR(pg_catalog.=) t.id) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 350) THEN UPDATE SET val = (((t.val COLLATE "default") OPERATOR(pg_catalog.||) 'Updated by CTE'::text) COLLATE "default") WHEN NOT MATCHED THEN INSERT (id, val) VALUES (cte.id, cte.val) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.<) 350) THEN DELETE
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
SET citus.log_remote_commands to false;
SELECT compare_tables();
compare_tables
---------------------------------------------------------------------
t
(1 row)
ROLLBACK;
-- Test sub-query with distributed tables
BEGIN;
SEt citus.log_remote_commands to true;
MERGE INTO pg_target t
USING (SELECT * FROM pg_source) subq
ON subq.id = t.id
WHEN MATCHED AND t.id > 350 THEN
UPDATE SET val = t.val || 'Updated by subquery'
WHEN NOT MATCHED THEN
INSERT VALUES (subq.id, subq.val)
WHEN MATCHED AND t.id < 350 THEN
DELETE;
MERGE INTO citus_target t
USING (SELECT * FROM citus_source) subq
ON subq.id = t.id
WHEN MATCHED AND t.id > 350 THEN
UPDATE SET val = t.val || 'Updated by subquery'
WHEN NOT MATCHED THEN
INSERT VALUES (subq.id, subq.val)
WHEN MATCHED AND t.id < 350 THEN
DELETE;
NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx');
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx');
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) subq ON (subq.id OPERATOR(pg_catalog.=) t.id) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 350) THEN UPDATE SET val = (((t.val COLLATE "default") OPERATOR(pg_catalog.||) 'Updated by subquery'::text) COLLATE "default") WHEN NOT MATCHED THEN INSERT (id, val) VALUES (subq.id, subq.val) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.<) 350) THEN DELETE
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) subq ON (subq.id OPERATOR(pg_catalog.=) t.id) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 350) THEN UPDATE SET val = (((t.val COLLATE "default") OPERATOR(pg_catalog.||) 'Updated by subquery'::text) COLLATE "default") WHEN NOT MATCHED THEN INSERT (id, val) VALUES (subq.id, subq.val) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.<) 350) THEN DELETE
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) subq ON (subq.id OPERATOR(pg_catalog.=) t.id) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 350) THEN UPDATE SET val = (((t.val COLLATE "default") OPERATOR(pg_catalog.||) 'Updated by subquery'::text) COLLATE "default") WHEN NOT MATCHED THEN INSERT (id, val) VALUES (subq.id, subq.val) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.<) 350) THEN DELETE
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) subq ON (subq.id OPERATOR(pg_catalog.=) t.id) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 350) THEN UPDATE SET val = (((t.val COLLATE "default") OPERATOR(pg_catalog.||) 'Updated by subquery'::text) COLLATE "default") WHEN NOT MATCHED THEN INSERT (id, val) VALUES (subq.id, subq.val) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.<) 350) THEN DELETE
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
SET citus.log_remote_commands to false;
SELECT compare_tables();
compare_tables
---------------------------------------------------------------------
t
(1 row)
ROLLBACK;
-- Test PREPARE
PREPARE pg_prep(int) AS
MERGE INTO pg_target
USING (SELECT * FROM pg_source) sub
ON pg_target.id = sub.id AND pg_target.id = $1
WHEN MATCHED THEN
UPDATE SET val = 'Updated by prepare using ' || sub.val
WHEN NOT MATCHED THEN
DO NOTHING;
PREPARE citus_prep(int) AS
MERGE INTO citus_target
USING (SELECT * FROM citus_source) sub
ON citus_target.id = sub.id AND citus_target.id = $1
WHEN MATCHED THEN
UPDATE SET val = 'Updated by prepare using ' || sub.val
WHEN NOT MATCHED THEN
DO NOTHING;
BEGIN;
SET citus.log_remote_commands to true;
SELECT * FROM pg_target WHERE id = 500; -- before merge
id | val
---------------------------------------------------------------------
500 | target
(1 row)
EXECUTE pg_prep(500);
SELECT * FROM pg_target WHERE id = 500; -- non-cached
id | val
---------------------------------------------------------------------
500 | Updated by prepare using source
(1 row)
EXECUTE pg_prep(500);
EXECUTE pg_prep(500);
EXECUTE pg_prep(500);
EXECUTE pg_prep(500);
EXECUTE pg_prep(500);
SELECT * FROM pg_target WHERE id = 500; -- cached
id | val
---------------------------------------------------------------------
500 | Updated by prepare using source
(1 row)
SELECT * FROM citus_target WHERE id = 500; -- before merge
NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx');
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SELECT id, val FROM merge_schema.citus_target_xxxxxxx citus_target WHERE (id OPERATOR(pg_catalog.=) 500)
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
id | val
---------------------------------------------------------------------
500 | target
(1 row)
EXECUTE citus_prep(500);
NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = (('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val COLLATE "default")) COLLATE "default") WHEN NOT MATCHED THEN DO NOTHING
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
SELECT * FROM citus_target WHERE id = 500; -- non-cached
NOTICE: issuing SELECT id, val FROM merge_schema.citus_target_xxxxxxx citus_target WHERE (id OPERATOR(pg_catalog.=) 500)
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
id | val
---------------------------------------------------------------------
500 | Updated by prepare using source
(1 row)
EXECUTE citus_prep(500);
NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = (('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val COLLATE "default")) COLLATE "default") WHEN NOT MATCHED THEN DO NOTHING
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
EXECUTE citus_prep(500);
NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = (('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val COLLATE "default")) COLLATE "default") WHEN NOT MATCHED THEN DO NOTHING
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
EXECUTE citus_prep(500);
NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = (('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val COLLATE "default")) COLLATE "default") WHEN NOT MATCHED THEN DO NOTHING
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
EXECUTE citus_prep(500);
NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = (('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val COLLATE "default")) COLLATE "default") WHEN NOT MATCHED THEN DO NOTHING
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
EXECUTE citus_prep(500);
NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = (('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val COLLATE "default")) COLLATE "default") WHEN NOT MATCHED THEN DO NOTHING
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
SELECT * FROM citus_target WHERE id = 500; -- cached
NOTICE: issuing SELECT id, val FROM merge_schema.citus_target_xxxxxxx citus_target WHERE (id OPERATOR(pg_catalog.=) 500)
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
id | val
---------------------------------------------------------------------
500 | Updated by prepare using source
(1 row)
SET citus.log_remote_commands to false;
SELECT compare_tables();
compare_tables
---------------------------------------------------------------------
t
(1 row)
ROLLBACK;
-- Test partitions + distributed tables
CREATE TABLE pg_pa_target (tid integer, balance float, val text)
PARTITION BY LIST (tid);
CREATE TABLE citus_pa_target (tid integer, balance float, val text)
PARTITION BY LIST (tid);
CREATE TABLE part1 PARTITION OF pg_pa_target FOR VALUES IN (1,4)
WITH (autovacuum_enabled=off);
CREATE TABLE part2 PARTITION OF pg_pa_target FOR VALUES IN (2,5,6)
WITH (autovacuum_enabled=off);
CREATE TABLE part3 PARTITION OF pg_pa_target FOR VALUES IN (3,8,9)
WITH (autovacuum_enabled=off);
CREATE TABLE part4 PARTITION OF pg_pa_target DEFAULT
WITH (autovacuum_enabled=off);
CREATE TABLE part5 PARTITION OF citus_pa_target FOR VALUES IN (1,4)
WITH (autovacuum_enabled=off);
CREATE TABLE part6 PARTITION OF citus_pa_target FOR VALUES IN (2,5,6)
WITH (autovacuum_enabled=off);
CREATE TABLE part7 PARTITION OF citus_pa_target FOR VALUES IN (3,8,9)
WITH (autovacuum_enabled=off);
CREATE TABLE part8 PARTITION OF citus_pa_target DEFAULT
WITH (autovacuum_enabled=off);
CREATE TABLE pg_pa_source (sid integer, delta float);
CREATE TABLE citus_pa_source (sid integer, delta float);
-- insert many rows to the source table
INSERT INTO pg_pa_source SELECT id, id * 10 FROM generate_series(1,14) AS id;
INSERT INTO citus_pa_source SELECT id, id * 10 FROM generate_series(1,14) AS id;
-- insert a few rows in the target table (odd numbered tid)
INSERT INTO pg_pa_target SELECT id, id * 100, 'initial' FROM generate_series(1,14,2) AS id;
INSERT INTO citus_pa_target SELECT id, id * 100, 'initial' FROM generate_series(1,14,2) AS id;
SELECT create_distributed_table('citus_pa_target', 'tid');
NOTICE: Copying data from local table...
NOTICE: copying the data has completed
DETAIL: The local data in the table is no longer visible, but is still on disk.
HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$merge_schema.part5$$)
NOTICE: Copying data from local table...
NOTICE: copying the data has completed
DETAIL: The local data in the table is no longer visible, but is still on disk.
HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$merge_schema.part6$$)
NOTICE: Copying data from local table...
NOTICE: copying the data has completed
DETAIL: The local data in the table is no longer visible, but is still on disk.
HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$merge_schema.part7$$)
NOTICE: Copying data from local table...
NOTICE: copying the data has completed
DETAIL: The local data in the table is no longer visible, but is still on disk.
HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$merge_schema.part8$$)
create_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT create_distributed_table('citus_pa_source', 'sid');
NOTICE: Copying data from local table...
NOTICE: copying the data has completed
DETAIL: The local data in the table is no longer visible, but is still on disk.
HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$merge_schema.citus_pa_source$$)
create_distributed_table
---------------------------------------------------------------------
(1 row)
CREATE OR REPLACE FUNCTION pa_compare_tables() RETURNS BOOLEAN AS $$
DECLARE ret BOOL;
BEGIN
SELECT count(1) = 0 INTO ret
FROM pg_pa_target
FULL OUTER JOIN citus_pa_target
USING (tid, balance, val)
WHERE pg_pa_target.tid IS NULL
OR citus_pa_target.tid IS NULL;
RETURN ret;
END
$$ LANGUAGE PLPGSQL;
-- try simple MERGE
BEGIN;
MERGE INTO pg_pa_target t
USING pg_pa_source s
ON t.tid = s.sid
WHEN MATCHED THEN
UPDATE SET balance = balance + delta, val = val || ' updated by merge'
WHEN NOT MATCHED THEN
INSERT VALUES (sid, delta, 'inserted by merge');
MERGE INTO citus_pa_target t
USING citus_pa_source s
ON t.tid = s.sid
WHEN MATCHED THEN
UPDATE SET balance = balance + delta, val = val || ' updated by merge'
WHEN NOT MATCHED THEN
INSERT VALUES (sid, delta, 'inserted by merge');
SELECT pa_compare_tables();
pa_compare_tables
---------------------------------------------------------------------
t
(1 row)
ROLLBACK;
-- same with a constant qual
BEGIN;
MERGE INTO pg_pa_target t
USING pg_pa_source s
ON t.tid = s.sid AND tid = 1
WHEN MATCHED THEN
UPDATE SET balance = balance + delta, val = val || ' updated by merge'
WHEN NOT MATCHED THEN
INSERT VALUES (sid, delta, 'inserted by merge');
MERGE INTO citus_pa_target t
USING citus_pa_source s
ON t.tid = s.sid AND tid = 1
WHEN MATCHED THEN
UPDATE SET balance = balance + delta, val = val || ' updated by merge'
WHEN NOT MATCHED THEN
INSERT VALUES (sid, delta, 'inserted by merge');
SELECT pa_compare_tables();
pa_compare_tables
---------------------------------------------------------------------
t
(1 row)
ROLLBACK;
--
-- Error and Unsupported scenarios
@ -1935,37 +2560,6 @@ UPDATE SET val = dist_colocated.val
WHEN NOT MATCHED THEN
INSERT VALUES(dist_colocated.id, dist_colocated.val);
ERROR: MERGE command is only supported when distributed tables are joined on their distribution column
-- MERGE command must be joined with with a constant qual on target relation
-- AND clause is missing
MERGE INTO dist_target
USING dist_colocated
ON dist_target.id = dist_colocated.id
WHEN MATCHED THEN
UPDATE SET val = dist_colocated.val
WHEN NOT MATCHED THEN
INSERT VALUES(dist_colocated.id, dist_colocated.val);
ERROR: MERGE on a distributed table requires a constant filter on the distribution column of the target table
HINT: Consider adding AND target.dist_key = <> to the ON clause
-- AND clause incorrect table (must be target)
MERGE INTO dist_target
USING dist_colocated
ON dist_target.id = dist_colocated.id AND dist_colocated.id = 1
WHEN MATCHED THEN
UPDATE SET val = dist_colocated.val
WHEN NOT MATCHED THEN
INSERT VALUES(dist_colocated.id, dist_colocated.val);
ERROR: MERGE on a distributed table requires a constant filter on the distribution column of the target table
HINT: Consider adding AND target.dist_key = <> to the ON clause
-- AND clause incorrect column (must be distribution column)
MERGE INTO dist_target
USING dist_colocated
ON dist_target.id = dist_colocated.id AND dist_target.val = 'const'
WHEN MATCHED THEN
UPDATE SET val = dist_colocated.val
WHEN NOT MATCHED THEN
INSERT VALUES(dist_colocated.id, dist_colocated.val);
ERROR: MERGE on a distributed table requires a constant filter on the distribution column of the target table
HINT: Consider adding AND target.dist_key = <> to the ON clause
-- Both the source and target must be distributed
MERGE INTO dist_target
USING (SELECT 100 id) AS source
@ -2165,7 +2759,7 @@ CONTEXT: SQL statement "SELECT citus_drop_all_shards(v_obj.objid, v_obj.schema_
PL/pgSQL function citus_drop_trigger() line XX at PERFORM
DROP FUNCTION merge_when_and_write();
DROP SCHEMA merge_schema CASCADE;
NOTICE: drop cascades to 63 other objects
NOTICE: drop cascades to 75 other objects
DETAIL: drop cascades to function insert_data()
drop cascades to table pg_result
drop cascades to table local_local
@ -2220,9 +2814,21 @@ drop cascades to extension postgres_fdw
drop cascades to table target_cj
drop cascades to table source_cj1
drop cascades to table source_cj2
drop cascades to table pg_target
drop cascades to table pg_source
drop cascades to table citus_target
drop cascades to table citus_source
drop cascades to function compare_tables()
drop cascades to view pg_source_view
drop cascades to view citus_source_view
drop cascades to table pg_pa_target
drop cascades to table citus_pa_target
drop cascades to table pg_pa_source
drop cascades to table citus_pa_source
drop cascades to function pa_compare_tables()
drop cascades to table pg
drop cascades to table t1_4000078
drop cascades to table s1_4000079
drop cascades to table t1_4000110
drop cascades to table s1_4000111
drop cascades to table t1
drop cascades to table s1
drop cascades to table dist_colocated

View File

@ -1899,7 +1899,7 @@ MERGE INTO pa_target t
UPDATE SET balance = balance + delta, val = val || ' updated by merge'
WHEN NOT MATCHED THEN
INSERT VALUES (slogts::timestamp, sid, delta, 'inserted by merge');
DEBUG: <Deparsed MERGE query: MERGE INTO pgmerge_schema.pa_target t USING (SELECT '2017-01-15'::text AS slogts, pa_source.sid, pa_source.delta FROM pgmerge_schema.pa_source_4001021 pa_source WHERE (pa_source.sid OPERATOR(pg_catalog.<) 10)) s ON (t.tid OPERATOR(pg_catalog.=) s.sid) WHEN MATCHED THEN UPDATE SET balance = (t.balance OPERATOR(pg_catalog.+) s.delta), val = (t.val OPERATOR(pg_catalog.||) ' updated by merge'::text) WHEN NOT MATCHED THEN INSERT (logts, tid, balance, val) VALUES ((s.slogts)::timestamp without time zone, s.sid, s.delta, 'inserted by merge'::text)>
DEBUG: <Deparsed MERGE query: MERGE INTO pgmerge_schema.pa_target t USING (SELECT '2017-01-15'::text AS slogts, pa_source.sid, pa_source.delta FROM pgmerge_schema.pa_source_xxxxxxx pa_source WHERE (pa_source.sid OPERATOR(pg_catalog.<) 10)) s ON (t.tid OPERATOR(pg_catalog.=) s.sid) WHEN MATCHED THEN UPDATE SET balance = (t.balance OPERATOR(pg_catalog.+) s.delta), val = (t.val OPERATOR(pg_catalog.||) ' updated by merge'::text) WHEN NOT MATCHED THEN INSERT (logts, tid, balance, val) VALUES ((s.slogts)::timestamp without time zone, s.sid, s.delta, 'inserted by merge'::text)>
SELECT * FROM pa_target ORDER BY tid;
logts | tid | balance | val
---------------------------------------------------------------------
@ -2091,7 +2091,7 @@ WHEN MATCHED THEN UPDATE
WHEN NOT MATCHED THEN INSERT
(city_id, logdate, peaktemp, unitsales)
VALUES (city_id, logdate, peaktemp, unitsales);
DEBUG: <Deparsed MERGE query: MERGE INTO pgmerge_schema.measurement m USING pgmerge_schema.new_measurement_4001026 nm ON ((m.city_id OPERATOR(pg_catalog.=) nm.city_id) AND (m.logdate OPERATOR(pg_catalog.=) nm.logdate)) WHEN MATCHED AND (nm.peaktemp IS NULL) THEN DELETE WHEN MATCHED THEN UPDATE SET peaktemp = GREATEST(m.peaktemp, nm.peaktemp), unitsales = (m.unitsales OPERATOR(pg_catalog.+) COALESCE(nm.unitsales, 0)) WHEN NOT MATCHED THEN INSERT (city_id, logdate, peaktemp, unitsales) VALUES (nm.city_id, nm.logdate, nm.peaktemp, nm.unitsales)>
DEBUG: <Deparsed MERGE query: MERGE INTO pgmerge_schema.measurement m USING pgmerge_schema.new_measurement_xxxxxxx nm ON ((m.city_id OPERATOR(pg_catalog.=) nm.city_id) AND (m.logdate OPERATOR(pg_catalog.=) nm.logdate)) WHEN MATCHED AND (nm.peaktemp IS NULL) THEN DELETE WHEN MATCHED THEN UPDATE SET peaktemp = GREATEST(m.peaktemp, nm.peaktemp), unitsales = (m.unitsales OPERATOR(pg_catalog.+) COALESCE(nm.unitsales, 0)) WHEN NOT MATCHED THEN INSERT (city_id, logdate, peaktemp, unitsales) VALUES (nm.city_id, nm.logdate, nm.peaktemp, nm.unitsales)>
RESET client_min_messages;
SELECT tableoid::regclass, * FROM measurement ORDER BY city_id, logdate;
tableoid | city_id | logdate | peaktemp | unitsales

View File

@ -146,7 +146,7 @@ SELECT create_distributed_table('source', 'customer_id');
-- Updates one of the row with customer_id = 30002
SELECT * from target t WHERE t.customer_id = 30002;
-- Turn on notice to print tasks sent to nodes (it should be a single task)
-- Turn on notice to print tasks sent to nodes
SET citus.log_remote_commands to true;
MERGE INTO target t
USING source s
@ -160,9 +160,9 @@ MERGE INTO target t
order_count = t.order_count + 1,
last_order_id = s.order_id
WHEN NOT MATCHED THEN -- New entry, record it.
INSERT (customer_id, last_order_id, order_center, order_count, last_order)
VALUES (customer_id, s.order_id, s.order_center, 123, s.order_time);
WHEN NOT MATCHED THEN
DO NOTHING;
SET citus.log_remote_commands to false;
SELECT * from target t WHERE t.customer_id = 30002;
@ -284,8 +284,8 @@ MERGE INTO t1
WHEN NOT MATCHED THEN
INSERT (id, val) VALUES (s1_res.id, s1_res.val);
SET citus.log_remote_commands to false;
-- As the id 6 is NO match, VALUES(6, 1) should appear in target
SELECT * FROM t1 order by id;
-- Other than id 6 everything else is a NO match, and should appear in target
SELECT * FROM t1 order by 1, 2;
--
-- Test with multiple join conditions
@ -366,7 +366,7 @@ ON t2.id = s2.id AND t2.src = s2.src AND t2.id = 4
WHEN MATCHED THEN
DELETE
WHEN NOT MATCHED THEN
INSERT (id, val, src) VALUES (s2.id, s2.val, s2.src);
DO NOTHING;
SET citus.log_remote_commands to false;
-- Row with id = 4 is a match for delete clause, row should be deleted
-- Row with id = 3 is a NO match, row from source will be inserted
@ -999,6 +999,424 @@ SET citus.log_remote_commands to false;
SELECT * FROM target_cj ORDER BY 1;
ROLLBACK;
-- Test distributed tables, must be co-located and joined on distribution column.
--
-- We create two sets of source and target tables, one set is Postgres and the other
-- is Citus distributed. Run the _exact_ MERGE SQL on both the sets and compare the
-- final results of target tables of Postgres and Citus, the result should match.
-- This is repeated for various MERGE SQL combinations
--
CREATE TABLE pg_target(id int, val varchar);
CREATE TABLE pg_source(id int, val varchar);
CREATE TABLE citus_target(id int, val varchar);
CREATE TABLE citus_source(id int, val varchar);
-- Half of the source rows do not match
INSERT INTO pg_target SELECT i, 'target' FROM generate_series(250, 500) i;
INSERT INTO pg_source SELECT i, 'source' FROM generate_series(1, 500) i;
INSERT INTO citus_target SELECT i, 'target' FROM generate_series(250, 500) i;
INSERT INTO citus_source SELECT i, 'source' FROM generate_series(1, 500) i;
SELECT create_distributed_table('citus_target', 'id');
SELECT create_distributed_table('citus_source', 'id');
--
-- This routine compares the target tables of Postgres and Citus and
-- returns true if they match, false if the results do not match.
--
CREATE OR REPLACE FUNCTION compare_tables() RETURNS BOOLEAN AS $$
DECLARE ret BOOL;
BEGIN
SELECT count(1) = 0 INTO ret
FROM pg_target
FULL OUTER JOIN citus_target
USING (id, val)
WHERE pg_target.id IS NULL
OR citus_target.id IS NULL;
RETURN ret;
END
$$ LANGUAGE PLPGSQL;
-- Make sure we start with exact data in Postgres and Citus
SELECT compare_tables();
-- Run the MERGE on both Postgres and Citus, and compare the final target tables
BEGIN;
SET citus.log_remote_commands to true;
MERGE INTO pg_target t
USING pg_source s
ON t.id = s.id
WHEN MATCHED AND t.id > 400 THEN
UPDATE SET val = t.val || 'Updated by Merge'
WHEN MATCHED THEN
DELETE
WHEN NOT MATCHED THEN
INSERT VALUES(s.id, s.val);
MERGE INTO citus_target t
USING citus_source s
ON t.id = s.id
WHEN MATCHED AND t.id > 400 THEN
UPDATE SET val = t.val || 'Updated by Merge'
WHEN MATCHED THEN
DELETE
WHEN NOT MATCHED THEN
INSERT VALUES(s.id, s.val);
SET citus.log_remote_commands to false;
SELECT compare_tables();
ROLLBACK;
--
-- ON clause filter on source
--
BEGIN;
SET citus.log_remote_commands to true;
MERGE INTO pg_target t
USING pg_source s
ON t.id = s.id AND s.id < 100
WHEN MATCHED AND t.id > 400 THEN
UPDATE SET val = t.val || 'Updated by Merge'
WHEN MATCHED THEN
DELETE
WHEN NOT MATCHED THEN
INSERT VALUES(s.id, s.val);
MERGE INTO citus_target t
USING citus_source s
ON t.id = s.id AND s.id < 100
WHEN MATCHED AND t.id > 400 THEN
UPDATE SET val = t.val || 'Updated by Merge'
WHEN MATCHED THEN
DELETE
WHEN NOT MATCHED THEN
INSERT VALUES(s.id, s.val);
SET citus.log_remote_commands to false;
SELECT compare_tables();
ROLLBACK;
--
-- ON clause filter on target
--
BEGIN;
SET citus.log_remote_commands to true;
MERGE INTO pg_target t
USING pg_source s
ON t.id = s.id AND t.id < 100
WHEN MATCHED AND t.id > 400 THEN
UPDATE SET val = t.val || 'Updated by Merge'
WHEN MATCHED THEN
DELETE
WHEN NOT MATCHED THEN
INSERT VALUES(s.id, s.val);
MERGE INTO citus_target t
USING citus_source s
ON t.id = s.id AND t.id < 100
WHEN MATCHED AND t.id > 400 THEN
UPDATE SET val = t.val || 'Updated by Merge'
WHEN MATCHED THEN
DELETE
WHEN NOT MATCHED THEN
INSERT VALUES(s.id, s.val);
SET citus.log_remote_commands to false;
SELECT compare_tables();
ROLLBACK;
--
-- NOT MATCHED clause filter on source
--
BEGIN;
SET citus.log_remote_commands to true;
MERGE INTO pg_target t
USING pg_source s
ON t.id = s.id
WHEN MATCHED THEN
DO NOTHING
WHEN NOT MATCHED AND s.id < 100 THEN
INSERT VALUES(s.id, s.val);
MERGE INTO citus_target t
USING citus_source s
ON t.id = s.id
WHEN MATCHED THEN
DO NOTHING
WHEN NOT MATCHED AND s.id < 100 THEN
INSERT VALUES(s.id, s.val);
SET citus.log_remote_commands to false;
SELECT compare_tables();
ROLLBACK;
--
-- Test constant filter in ON clause to check if shards are pruned
-- with restriction information
--
--
-- Though constant filter is present, this won't prune shards as
-- NOT MATCHED clause is present
--
BEGIN;
SET citus.log_remote_commands to true;
MERGE INTO pg_target t
USING pg_source s
ON t.id = s.id AND s.id = 250
WHEN MATCHED THEN
UPDATE SET val = t.val || 'Updated by Merge'
WHEN NOT MATCHED THEN
INSERT VALUES(s.id, s.val);
MERGE INTO citus_target t
USING citus_source s
ON t.id = s.id AND s.id = 250
WHEN MATCHED THEN
UPDATE SET val = t.val || 'Updated by Merge'
WHEN NOT MATCHED THEN
INSERT VALUES(s.id, s.val);
SET citus.log_remote_commands to false;
SELECT compare_tables();
ROLLBACK;
-- This will prune shards with restriction information as NOT MATCHED is void
BEGIN;
SET citus.log_remote_commands to true;
MERGE INTO pg_target t
USING pg_source s
ON t.id = s.id AND s.id = 250
WHEN MATCHED THEN
UPDATE SET val = t.val || 'Updated by Merge'
WHEN NOT MATCHED THEN
DO NOTHING;
MERGE INTO citus_target t
USING citus_source s
ON t.id = s.id AND s.id = 250
WHEN MATCHED THEN
UPDATE SET val = t.val || 'Updated by Merge'
WHEN NOT MATCHED THEN
DO NOTHING;
SET citus.log_remote_commands to false;
SELECT compare_tables();
ROLLBACK;
-- Test CTE with distributed tables
CREATE VIEW pg_source_view AS SELECT * FROM pg_source WHERE id < 400;
CREATE VIEW citus_source_view AS SELECT * FROM citus_source WHERE id < 400;
BEGIN;
SEt citus.log_remote_commands to true;
WITH cte AS (
SELECT * FROM pg_source_view
)
MERGE INTO pg_target t
USING cte
ON cte.id = t.id
WHEN MATCHED AND t.id > 350 THEN
UPDATE SET val = t.val || 'Updated by CTE'
WHEN NOT MATCHED THEN
INSERT VALUES (cte.id, cte.val)
WHEN MATCHED AND t.id < 350 THEN
DELETE;
WITH cte AS (
SELECT * FROM citus_source_view
)
MERGE INTO citus_target t
USING cte
ON cte.id = t.id
WHEN MATCHED AND t.id > 350 THEN
UPDATE SET val = t.val || 'Updated by CTE'
WHEN NOT MATCHED THEN
INSERT VALUES (cte.id, cte.val)
WHEN MATCHED AND t.id < 350 THEN
DELETE;
SET citus.log_remote_commands to false;
SELECT compare_tables();
ROLLBACK;
-- Test sub-query with distributed tables
BEGIN;
SEt citus.log_remote_commands to true;
MERGE INTO pg_target t
USING (SELECT * FROM pg_source) subq
ON subq.id = t.id
WHEN MATCHED AND t.id > 350 THEN
UPDATE SET val = t.val || 'Updated by subquery'
WHEN NOT MATCHED THEN
INSERT VALUES (subq.id, subq.val)
WHEN MATCHED AND t.id < 350 THEN
DELETE;
MERGE INTO citus_target t
USING (SELECT * FROM citus_source) subq
ON subq.id = t.id
WHEN MATCHED AND t.id > 350 THEN
UPDATE SET val = t.val || 'Updated by subquery'
WHEN NOT MATCHED THEN
INSERT VALUES (subq.id, subq.val)
WHEN MATCHED AND t.id < 350 THEN
DELETE;
SET citus.log_remote_commands to false;
SELECT compare_tables();
ROLLBACK;
-- Test PREPARE
PREPARE pg_prep(int) AS
MERGE INTO pg_target
USING (SELECT * FROM pg_source) sub
ON pg_target.id = sub.id AND pg_target.id = $1
WHEN MATCHED THEN
UPDATE SET val = 'Updated by prepare using ' || sub.val
WHEN NOT MATCHED THEN
DO NOTHING;
PREPARE citus_prep(int) AS
MERGE INTO citus_target
USING (SELECT * FROM citus_source) sub
ON citus_target.id = sub.id AND citus_target.id = $1
WHEN MATCHED THEN
UPDATE SET val = 'Updated by prepare using ' || sub.val
WHEN NOT MATCHED THEN
DO NOTHING;
BEGIN;
SET citus.log_remote_commands to true;
SELECT * FROM pg_target WHERE id = 500; -- before merge
EXECUTE pg_prep(500);
SELECT * FROM pg_target WHERE id = 500; -- non-cached
EXECUTE pg_prep(500);
EXECUTE pg_prep(500);
EXECUTE pg_prep(500);
EXECUTE pg_prep(500);
EXECUTE pg_prep(500);
SELECT * FROM pg_target WHERE id = 500; -- cached
SELECT * FROM citus_target WHERE id = 500; -- before merge
EXECUTE citus_prep(500);
SELECT * FROM citus_target WHERE id = 500; -- non-cached
EXECUTE citus_prep(500);
EXECUTE citus_prep(500);
EXECUTE citus_prep(500);
EXECUTE citus_prep(500);
EXECUTE citus_prep(500);
SELECT * FROM citus_target WHERE id = 500; -- cached
SET citus.log_remote_commands to false;
SELECT compare_tables();
ROLLBACK;
-- Test partitions + distributed tables
CREATE TABLE pg_pa_target (tid integer, balance float, val text)
PARTITION BY LIST (tid);
CREATE TABLE citus_pa_target (tid integer, balance float, val text)
PARTITION BY LIST (tid);
CREATE TABLE part1 PARTITION OF pg_pa_target FOR VALUES IN (1,4)
WITH (autovacuum_enabled=off);
CREATE TABLE part2 PARTITION OF pg_pa_target FOR VALUES IN (2,5,6)
WITH (autovacuum_enabled=off);
CREATE TABLE part3 PARTITION OF pg_pa_target FOR VALUES IN (3,8,9)
WITH (autovacuum_enabled=off);
CREATE TABLE part4 PARTITION OF pg_pa_target DEFAULT
WITH (autovacuum_enabled=off);
CREATE TABLE part5 PARTITION OF citus_pa_target FOR VALUES IN (1,4)
WITH (autovacuum_enabled=off);
CREATE TABLE part6 PARTITION OF citus_pa_target FOR VALUES IN (2,5,6)
WITH (autovacuum_enabled=off);
CREATE TABLE part7 PARTITION OF citus_pa_target FOR VALUES IN (3,8,9)
WITH (autovacuum_enabled=off);
CREATE TABLE part8 PARTITION OF citus_pa_target DEFAULT
WITH (autovacuum_enabled=off);
CREATE TABLE pg_pa_source (sid integer, delta float);
CREATE TABLE citus_pa_source (sid integer, delta float);
-- insert many rows to the source table
INSERT INTO pg_pa_source SELECT id, id * 10 FROM generate_series(1,14) AS id;
INSERT INTO citus_pa_source SELECT id, id * 10 FROM generate_series(1,14) AS id;
-- insert a few rows in the target table (odd numbered tid)
INSERT INTO pg_pa_target SELECT id, id * 100, 'initial' FROM generate_series(1,14,2) AS id;
INSERT INTO citus_pa_target SELECT id, id * 100, 'initial' FROM generate_series(1,14,2) AS id;
SELECT create_distributed_table('citus_pa_target', 'tid');
SELECT create_distributed_table('citus_pa_source', 'sid');
CREATE OR REPLACE FUNCTION pa_compare_tables() RETURNS BOOLEAN AS $$
DECLARE ret BOOL;
BEGIN
SELECT count(1) = 0 INTO ret
FROM pg_pa_target
FULL OUTER JOIN citus_pa_target
USING (tid, balance, val)
WHERE pg_pa_target.tid IS NULL
OR citus_pa_target.tid IS NULL;
RETURN ret;
END
$$ LANGUAGE PLPGSQL;
-- try simple MERGE
BEGIN;
MERGE INTO pg_pa_target t
USING pg_pa_source s
ON t.tid = s.sid
WHEN MATCHED THEN
UPDATE SET balance = balance + delta, val = val || ' updated by merge'
WHEN NOT MATCHED THEN
INSERT VALUES (sid, delta, 'inserted by merge');
MERGE INTO citus_pa_target t
USING citus_pa_source s
ON t.tid = s.sid
WHEN MATCHED THEN
UPDATE SET balance = balance + delta, val = val || ' updated by merge'
WHEN NOT MATCHED THEN
INSERT VALUES (sid, delta, 'inserted by merge');
SELECT pa_compare_tables();
ROLLBACK;
-- same with a constant qual
BEGIN;
MERGE INTO pg_pa_target t
USING pg_pa_source s
ON t.tid = s.sid AND tid = 1
WHEN MATCHED THEN
UPDATE SET balance = balance + delta, val = val || ' updated by merge'
WHEN NOT MATCHED THEN
INSERT VALUES (sid, delta, 'inserted by merge');
MERGE INTO citus_pa_target t
USING citus_pa_source s
ON t.tid = s.sid AND tid = 1
WHEN MATCHED THEN
UPDATE SET balance = balance + delta, val = val || ' updated by merge'
WHEN NOT MATCHED THEN
INSERT VALUES (sid, delta, 'inserted by merge');
SELECT pa_compare_tables();
ROLLBACK;
--
-- Error and Unsupported scenarios
--
@ -1241,34 +1659,6 @@ UPDATE SET val = dist_colocated.val
WHEN NOT MATCHED THEN
INSERT VALUES(dist_colocated.id, dist_colocated.val);
-- MERGE command must be joined with with a constant qual on target relation
-- AND clause is missing
MERGE INTO dist_target
USING dist_colocated
ON dist_target.id = dist_colocated.id
WHEN MATCHED THEN
UPDATE SET val = dist_colocated.val
WHEN NOT MATCHED THEN
INSERT VALUES(dist_colocated.id, dist_colocated.val);
-- AND clause incorrect table (must be target)
MERGE INTO dist_target
USING dist_colocated
ON dist_target.id = dist_colocated.id AND dist_colocated.id = 1
WHEN MATCHED THEN
UPDATE SET val = dist_colocated.val
WHEN NOT MATCHED THEN
INSERT VALUES(dist_colocated.id, dist_colocated.val);
-- AND clause incorrect column (must be distribution column)
MERGE INTO dist_target
USING dist_colocated
ON dist_target.id = dist_colocated.id AND dist_target.val = 'const'
WHEN MATCHED THEN
UPDATE SET val = dist_colocated.val
WHEN NOT MATCHED THEN
INSERT VALUES(dist_colocated.id, dist_colocated.val);
-- Both the source and target must be distributed
MERGE INTO dist_target