Redo #7620: Fix merge command when insert value does not have source distributed column (#7627)

Related to issue #7619, #7620
Merge command fails when source query is single sharded and source and
target are co-located and insert is not using distribution key of
source.

Example
```
CREATE TABLE source (id integer);
CREATE TABLE target (id integer );

-- let's distribute both table on id field
SELECT create_distributed_table('source', 'id');
SELECT create_distributed_table('target', 'id');

MERGE INTO target t
  USING ( SELECT 1 AS somekey
          FROM source
        WHERE source.id = 1) s
  ON t.id = s.somekey
  WHEN NOT MATCHED
  THEN INSERT (id)
    VALUES (s.somekey)

ERROR:  MERGE INSERT must use the source table distribution column value
HINT:  MERGE INSERT must use the source table distribution column value
```

Author's Opinion: If join is not between source and target distributed
column, we should not force user to use source distributed column while
inserting value of target distributed column.

Fix: If user is not using distributed key of source for insertion let's
not push down query to workers and don't force user to use source
distributed column if it is not part of join.

This reverts commit fa4fc0b372.

Co-authored-by: paragjain <paragjain@microsoft.com>
(cherry picked from commit aaaf637a6b)
pull/7658/head
Jelte Fennema-Nio 2024-06-17 16:07:25 +02:00 committed by Jelte Fennema-Nio
parent 3594bd7ac0
commit 4f0053ed6d
4 changed files with 470 additions and 32 deletions

View File

@ -182,14 +182,6 @@ CreateRouterMergePlan(Oid targetRelationId, Query *originalQuery, Query *query,
return distributedPlan; return distributedPlan;
} }
Var *insertVar =
FetchAndValidateInsertVarIfExists(targetRelationId, originalQuery);
if (insertVar &&
!IsDistributionColumnInMergeSource((Expr *) insertVar, originalQuery, true))
{
ereport(ERROR, (errmsg("MERGE INSERT must use the source table "
"distribution column value")));
}
Job *job = RouterJob(originalQuery, plannerRestrictionContext, Job *job = RouterJob(originalQuery, plannerRestrictionContext,
&distributedPlan->planningError); &distributedPlan->planningError);
@ -1124,6 +1116,27 @@ DeferErrorIfRoutableMergeNotSupported(Query *query, List *rangeTableList,
"repartitioning"))); "repartitioning")));
return deferredError; return deferredError;
} }
/*
* If execution has reached this point, it indicates that the query can be delegated to the worker.
* However, before proceeding with this delegation, we need to confirm that the user is utilizing
* the distribution column of the source table in the Insert variable.
* If this is not the case, we should refrain from pushing down the query.
* This is just a deffered error which will be handle by caller.
*/
Var *insertVar =
FetchAndValidateInsertVarIfExists(targetRelationId, query);
if (insertVar &&
!IsDistributionColumnInMergeSource((Expr *) insertVar, query, true))
{
ereport(DEBUG1, (errmsg(
"MERGE INSERT must use the source table distribution column value for push down to workers. Otherwise, repartitioning will be applied")));
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
"MERGE INSERT must use the source table distribution column value for push down to workers. Otherwise, repartitioning will be applied",
NULL, NULL);
}
return NULL; return NULL;
} }

View File

@ -1128,7 +1128,7 @@ DO NOTHING
WHEN NOT MATCHED THEN WHEN NOT MATCHED THEN
INSERT VALUES(rs_source.id); INSERT VALUES(rs_source.id);
DEBUG: Creating MERGE router plan DEBUG: Creating MERGE router plan
DEBUG: <Deparsed MERGE query: MERGE INTO merge_schema.rs_target_xxxxxxx rs_target USING (SELECT id.id FROM merge_schema.f_immutable(99) id(id) WHERE (id.id OPERATOR(pg_catalog.=) ANY (SELECT 99))) rs_source ON (rs_source.id OPERATOR(pg_catalog.=) rs_target.id) WHEN MATCHED THEN DO NOTHING WHEN NOT MATCHED THEN INSERT (id) VALUES (rs_source.id)> DEBUG: <Deparsed MERGE query: MERGE INTO merge_schema.rs_target_xxxxxxx rs_target USING (SELECT id.id FROM merge_schema.f_immutable(99) id(id) WHERE (id.id OPERATOR(pg_catalog.=) ANY (SELECT 99))) rs_source ON (rs_source.id OPERATOR(pg_catalog.=) rs_target.id) WHEN MATCHED THEN DO NOTHING WHEN NOT MATCHED THEN INSERT (id) VALUES (rs_source.id)>
RESET client_min_messages; RESET client_min_messages;
SELECT * INTO rs_local FROM rs_target ORDER BY 1 ; SELECT * INTO rs_local FROM rs_target ORDER BY 1 ;
-- Should be equal -- Should be equal
@ -1259,7 +1259,7 @@ DO NOTHING
WHEN NOT MATCHED THEN WHEN NOT MATCHED THEN
INSERT VALUES(fn_source.id, fn_source.source); INSERT VALUES(fn_source.id, fn_source.source);
DEBUG: Creating MERGE router plan DEBUG: Creating MERGE router plan
DEBUG: <Deparsed MERGE query: MERGE INTO merge_schema.fn_target_xxxxxxx fn_target USING (SELECT dist_table.id, dist_table.source FROM merge_schema.dist_table_xxxxxxx dist_table) fn_source ON (fn_source.id OPERATOR(pg_catalog.=) fn_target.id) WHEN MATCHED THEN DO NOTHING WHEN NOT MATCHED THEN INSERT (id, data) VALUES (fn_source.id, fn_source.source)> DEBUG: <Deparsed MERGE query: MERGE INTO merge_schema.fn_target_xxxxxxx fn_target USING (SELECT dist_table.id, dist_table.source FROM merge_schema.dist_table_xxxxxxx dist_table) fn_source ON (fn_source.id OPERATOR(pg_catalog.=) fn_target.id) WHEN MATCHED THEN DO NOTHING WHEN NOT MATCHED THEN INSERT (id, data) VALUES (fn_source.id, fn_source.source)>
RESET client_min_messages; RESET client_min_messages;
SELECT * INTO fn_local FROM fn_target ORDER BY 1 ; SELECT * INTO fn_local FROM fn_target ORDER BY 1 ;
-- Should be equal -- Should be equal
@ -1552,7 +1552,7 @@ BEGIN;
SET citus.log_remote_commands to true; SET citus.log_remote_commands to true;
SET client_min_messages TO DEBUG1; SET client_min_messages TO DEBUG1;
EXECUTE merge_prepare(2); EXECUTE merge_prepare(2);
DEBUG: <Deparsed MERGE query: MERGE INTO merge_schema.target_cj_xxxxxxx target USING (SELECT source_cj1.sid1, source_cj1.src1, source_cj1.val1 FROM merge_schema.source_cj1_xxxxxxx source_cj1) sub ON ((target.tid OPERATOR(pg_catalog.=) sub.sid1) AND (target.tid OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = sub.val1 WHEN NOT MATCHED THEN DO NOTHING > DEBUG: <Deparsed MERGE query: MERGE INTO merge_schema.target_cj_xxxxxxx target USING (SELECT source_cj1.sid1, source_cj1.src1, source_cj1.val1 FROM merge_schema.source_cj1_xxxxxxx source_cj1) sub ON ((target.tid OPERATOR(pg_catalog.=) sub.sid1) AND (target.tid OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = sub.val1 WHEN NOT MATCHED THEN DO NOTHING>
DEBUG: Creating MERGE router plan DEBUG: Creating MERGE router plan
NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx'); NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx');
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
@ -1782,13 +1782,13 @@ NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx'); NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx');
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON (t.id OPERATOR(pg_catalog.=) s.id) WHEN MATCHED THEN DO NOTHING WHEN NOT MATCHED AND (s.id OPERATOR(pg_catalog.<) 100) THEN INSERT (id, val) VALUES (s.id, s.val) NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON (t.id OPERATOR(pg_catalog.=) s.id) WHEN MATCHED THEN DO NOTHING WHEN NOT MATCHED AND (s.id OPERATOR(pg_catalog.<) 100) THEN INSERT (id, val) VALUES (s.id, s.val)
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON (t.id OPERATOR(pg_catalog.=) s.id) WHEN MATCHED THEN DO NOTHING WHEN NOT MATCHED AND (s.id OPERATOR(pg_catalog.<) 100) THEN INSERT (id, val) VALUES (s.id, s.val) NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON (t.id OPERATOR(pg_catalog.=) s.id) WHEN MATCHED THEN DO NOTHING WHEN NOT MATCHED AND (s.id OPERATOR(pg_catalog.<) 100) THEN INSERT (id, val) VALUES (s.id, s.val)
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON (t.id OPERATOR(pg_catalog.=) s.id) WHEN MATCHED THEN DO NOTHING WHEN NOT MATCHED AND (s.id OPERATOR(pg_catalog.<) 100) THEN INSERT (id, val) VALUES (s.id, s.val) NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON (t.id OPERATOR(pg_catalog.=) s.id) WHEN MATCHED THEN DO NOTHING WHEN NOT MATCHED AND (s.id OPERATOR(pg_catalog.<) 100) THEN INSERT (id, val) VALUES (s.id, s.val)
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON (t.id OPERATOR(pg_catalog.=) s.id) WHEN MATCHED THEN DO NOTHING WHEN NOT MATCHED AND (s.id OPERATOR(pg_catalog.<) 100) THEN INSERT (id, val) VALUES (s.id, s.val) NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON (t.id OPERATOR(pg_catalog.=) s.id) WHEN MATCHED THEN DO NOTHING WHEN NOT MATCHED AND (s.id OPERATOR(pg_catalog.<) 100) THEN INSERT (id, val) VALUES (s.id, s.val)
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
SET citus.log_remote_commands to false; SET citus.log_remote_commands to false;
SELECT compare_tables(); SELECT compare_tables();
@ -1842,6 +1842,297 @@ SELECT compare_tables();
(1 row) (1 row)
ROLLBACK; ROLLBACK;
-- let's create source and target table
ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 13000;
CREATE TABLE source_pushdowntest (id integer);
CREATE TABLE target_pushdowntest (id integer );
-- let's distribute both table on id field
SELECT create_distributed_table('source_pushdowntest', 'id');
create_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT create_distributed_table('target_pushdowntest', 'id');
create_distributed_table
---------------------------------------------------------------------
(1 row)
-- we are doing this operation on single node setup let's figure out colocation id of both tables
-- both has same colocation id so both are colocated.
WITH colocations AS (
SELECT colocationid
FROM pg_dist_partition
WHERE logicalrelid = 'source_pushdowntest'::regclass
OR logicalrelid = 'target_pushdowntest'::regclass
)
SELECT
CASE
WHEN COUNT(DISTINCT colocationid) = 1 THEN 'Same'
ELSE 'Different'
END AS colocation_status
FROM colocations;
colocation_status
---------------------------------------------------------------------
Same
(1 row)
SET client_min_messages TO DEBUG1;
-- Test 1 : tables are colocated AND query is multisharded AND Join On distributed column : should push down to workers.
EXPLAIN (costs off, timing off, summary off)
MERGE INTO target_pushdowntest t
USING source_pushdowntest s
ON t.id = s.id
WHEN NOT MATCHED THEN
INSERT (id)
VALUES (s.id);
DEBUG: <Deparsed MERGE query: MERGE INTO merge_schema.target_pushdowntest_xxxxxxx t USING merge_schema.source_pushdowntest_xxxxxxx s ON (t.id OPERATOR(pg_catalog.=) s.id) WHEN NOT MATCHED THEN INSERT (id) VALUES (s.id)>
DEBUG: <Deparsed MERGE query: MERGE INTO merge_schema.target_pushdowntest_xxxxxxx t USING merge_schema.source_pushdowntest_xxxxxxx s ON (t.id OPERATOR(pg_catalog.=) s.id) WHEN NOT MATCHED THEN INSERT (id) VALUES (s.id)>
DEBUG: <Deparsed MERGE query: MERGE INTO merge_schema.target_pushdowntest_xxxxxxx t USING merge_schema.source_pushdowntest_xxxxxxx s ON (t.id OPERATOR(pg_catalog.=) s.id) WHEN NOT MATCHED THEN INSERT (id) VALUES (s.id)>
DEBUG: <Deparsed MERGE query: MERGE INTO merge_schema.target_pushdowntest_xxxxxxx t USING merge_schema.source_pushdowntest_xxxxxxx s ON (t.id OPERATOR(pg_catalog.=) s.id) WHEN NOT MATCHED THEN INSERT (id) VALUES (s.id)>
DEBUG: Creating MERGE router plan
QUERY PLAN
---------------------------------------------------------------------
Custom Scan (Citus Adaptive)
Task Count: 4
Tasks Shown: All
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> Merge on target_pushdowntest_4000068 t
-> Merge Left Join
Merge Cond: (s.id = t.id)
-> Sort
Sort Key: s.id
-> Seq Scan on source_pushdowntest_4000064 s
-> Sort
Sort Key: t.id
-> Seq Scan on target_pushdowntest_4000068 t
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> Merge on target_pushdowntest_4000069 t
-> Merge Left Join
Merge Cond: (s.id = t.id)
-> Sort
Sort Key: s.id
-> Seq Scan on source_pushdowntest_4000065 s
-> Sort
Sort Key: t.id
-> Seq Scan on target_pushdowntest_4000069 t
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> Merge on target_pushdowntest_4000070 t
-> Merge Left Join
Merge Cond: (s.id = t.id)
-> Sort
Sort Key: s.id
-> Seq Scan on source_pushdowntest_4000066 s
-> Sort
Sort Key: t.id
-> Seq Scan on target_pushdowntest_4000070 t
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> Merge on target_pushdowntest_4000071 t
-> Merge Left Join
Merge Cond: (s.id = t.id)
-> Sort
Sort Key: s.id
-> Seq Scan on source_pushdowntest_4000067 s
-> Sort
Sort Key: t.id
-> Seq Scan on target_pushdowntest_4000071 t
(47 rows)
-- Test 2 : tables are colocated AND source query is not multisharded : should push down to worker.
-- DEBUG LOGS show that query is getting pushed down
MERGE INTO target_pushdowntest t
USING (SELECT * from source_pushdowntest where id = 1) s
on t.id = s.id
WHEN NOT MATCHED THEN
INSERT (id)
VALUES (s.id);
DEBUG: <Deparsed MERGE query: MERGE INTO merge_schema.target_pushdowntest_xxxxxxx t USING (SELECT source_pushdowntest.id FROM merge_schema.source_pushdowntest_xxxxxxx source_pushdowntest WHERE (source_pushdowntest.id OPERATOR(pg_catalog.=) 1)) s ON (t.id OPERATOR(pg_catalog.=) s.id) WHEN NOT MATCHED THEN INSERT (id) VALUES (s.id)>
DEBUG: Creating MERGE router plan
-- Test 3 : tables are colocated source query is single sharded but not using source distributed column in insertion. let's not pushdown.
INSERT INTO source_pushdowntest (id) VALUES (3);
EXPLAIN (costs off, timing off, summary off)
MERGE INTO target_pushdowntest t
USING (SELECT 1 as somekey, id from source_pushdowntest where id = 1) s
on t.id = s.somekey
WHEN NOT MATCHED THEN
INSERT (id)
VALUES (s.somekey);
DEBUG: MERGE INSERT must use the source table distribution column value for push down to workers. Otherwise, repartitioning will be applied
DEBUG: MERGE INSERT must use the source table distribution column value for push down to workers. Otherwise, repartitioning will be applied
DEBUG: Creating MERGE repartition plan
DEBUG: Using column - index:0 from the source list to redistribute
QUERY PLAN
---------------------------------------------------------------------
Custom Scan (Citus MERGE INTO ...)
MERGE INTO target_pushdowntest method: pull to coordinator
-> Custom Scan (Citus Adaptive)
Task Count: 1
Tasks Shown: All
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> Seq Scan on source_pushdowntest_4000064 source_pushdowntest
Filter: (id = 1)
(9 rows)
-- let's verify if we use some other column from source for value of distributed column in target.
-- it should be inserted to correct shard of target.
CREATE TABLE source_withdata (id integer, some_number integer);
CREATE TABLE target_table (id integer, name text);
SELECT create_distributed_table('source_withdata', 'id');
create_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT create_distributed_table('target_table', 'id');
create_distributed_table
---------------------------------------------------------------------
(1 row)
INSERT INTO source_withdata (id, some_number) VALUES (1, 3);
-- we will use some_number column from source_withdata to insert into distributed column of target.
-- value of some_number is 3 let's verify what shard it should go to.
select worker_hash(3);
worker_hash
---------------------------------------------------------------------
-28094569
(1 row)
-- it should go to second shard of target as target has 4 shard and hash "-28094569" comes in range of second shard.
MERGE INTO target_table t
USING (SELECT id, some_number from source_withdata where id = 1) s
on t.id = s.some_number
WHEN NOT MATCHED THEN
INSERT (id, name)
VALUES (s.some_number, 'parag');
DEBUG: Sub-query is not pushable, try repartitioning
DEBUG: MERGE command is only supported when all distributed tables are co-located and joined on their distribution columns
DEBUG: Creating MERGE repartition plan
DEBUG: Using column - index:1 from the source list to redistribute
DEBUG: Collect source query results on coordinator
DEBUG: Create a MERGE task list that needs to be routed
DEBUG: <Deparsed MERGE query: MERGE INTO merge_schema.target_table_xxxxxxx t USING (SELECT intermediate_result.id, intermediate_result.some_number FROM read_intermediate_result('merge_into_XXX_4000076'::text, 'binary'::citus_copy_format) intermediate_result(id integer, some_number integer)) s ON (t.id OPERATOR(pg_catalog.=) s.some_number) WHEN NOT MATCHED THEN INSERT (id, name) VALUES (s.some_number, 'parag'::text)>
DEBUG: <Deparsed MERGE query: MERGE INTO merge_schema.target_table_xxxxxxx t USING (SELECT intermediate_result.id, intermediate_result.some_number FROM read_intermediate_result('merge_into_XXX_4000077'::text, 'binary'::citus_copy_format) intermediate_result(id integer, some_number integer)) s ON (t.id OPERATOR(pg_catalog.=) s.some_number) WHEN NOT MATCHED THEN INSERT (id, name) VALUES (s.some_number, 'parag'::text)>
DEBUG: <Deparsed MERGE query: MERGE INTO merge_schema.target_table_xxxxxxx t USING (SELECT intermediate_result.id, intermediate_result.some_number FROM read_intermediate_result('merge_into_XXX_4000078'::text, 'binary'::citus_copy_format) intermediate_result(id integer, some_number integer)) s ON (t.id OPERATOR(pg_catalog.=) s.some_number) WHEN NOT MATCHED THEN INSERT (id, name) VALUES (s.some_number, 'parag'::text)>
DEBUG: <Deparsed MERGE query: MERGE INTO merge_schema.target_table_xxxxxxx t USING (SELECT intermediate_result.id, intermediate_result.some_number FROM read_intermediate_result('merge_into_XXX_4000079'::text, 'binary'::citus_copy_format) intermediate_result(id integer, some_number integer)) s ON (t.id OPERATOR(pg_catalog.=) s.some_number) WHEN NOT MATCHED THEN INSERT (id, name) VALUES (s.some_number, 'parag'::text)>
DEBUG: Execute MERGE task list
-- let's verify if data inserted to second shard of target.
EXPLAIN (analyze on, costs off, timing off, summary off) SELECT * FROM target_table;
QUERY PLAN
---------------------------------------------------------------------
Custom Scan (Citus Adaptive) (actual rows=1 loops=1)
Task Count: 4
Tuple data received from nodes: 9 bytes
Tasks Shown: All
-> Task
Tuple data received from node: 0 bytes
Node: host=localhost port=xxxxx dbname=regression
-> Seq Scan on target_table_4000076 target_table (actual rows=0 loops=1)
-> Task
Tuple data received from node: 9 bytes
Node: host=localhost port=xxxxx dbname=regression
-> Seq Scan on target_table_4000077 target_table (actual rows=1 loops=1)
-> Task
Tuple data received from node: 0 bytes
Node: host=localhost port=xxxxx dbname=regression
-> Seq Scan on target_table_4000078 target_table (actual rows=0 loops=1)
-> Task
Tuple data received from node: 0 bytes
Node: host=localhost port=xxxxx dbname=regression
-> Seq Scan on target_table_4000079 target_table (actual rows=0 loops=1)
(20 rows)
-- let's verify target data too.
SELECT * FROM target_table;
id | name
---------------------------------------------------------------------
3 | parag
(1 row)
-- test UPDATE : when source is single sharded and table are colocated
MERGE INTO target_table t
USING (SELECT id, some_number from source_withdata where id = 1) s
on t.id = s.some_number
WHEN MATCHED THEN
UPDATE SET name = 'parag jain';
DEBUG: Sub-query is not pushable, try repartitioning
DEBUG: MERGE command is only supported when all distributed tables are co-located and joined on their distribution columns
DEBUG: Creating MERGE repartition plan
DEBUG: Using column - index:1 from the source list to redistribute
DEBUG: Collect source query results on coordinator
DEBUG: Create a MERGE task list that needs to be routed
DEBUG: <Deparsed MERGE query: MERGE INTO merge_schema.target_table_xxxxxxx t USING (SELECT intermediate_result.id, intermediate_result.some_number FROM read_intermediate_result('merge_into_XXX_4000076'::text, 'binary'::citus_copy_format) intermediate_result(id integer, some_number integer)) s ON (t.id OPERATOR(pg_catalog.=) s.some_number) WHEN MATCHED THEN UPDATE SET name = 'parag jain'::text>
DEBUG: <Deparsed MERGE query: MERGE INTO merge_schema.target_table_xxxxxxx t USING (SELECT intermediate_result.id, intermediate_result.some_number FROM read_intermediate_result('merge_into_XXX_4000077'::text, 'binary'::citus_copy_format) intermediate_result(id integer, some_number integer)) s ON (t.id OPERATOR(pg_catalog.=) s.some_number) WHEN MATCHED THEN UPDATE SET name = 'parag jain'::text>
DEBUG: <Deparsed MERGE query: MERGE INTO merge_schema.target_table_xxxxxxx t USING (SELECT intermediate_result.id, intermediate_result.some_number FROM read_intermediate_result('merge_into_XXX_4000078'::text, 'binary'::citus_copy_format) intermediate_result(id integer, some_number integer)) s ON (t.id OPERATOR(pg_catalog.=) s.some_number) WHEN MATCHED THEN UPDATE SET name = 'parag jain'::text>
DEBUG: <Deparsed MERGE query: MERGE INTO merge_schema.target_table_xxxxxxx t USING (SELECT intermediate_result.id, intermediate_result.some_number FROM read_intermediate_result('merge_into_XXX_4000079'::text, 'binary'::citus_copy_format) intermediate_result(id integer, some_number integer)) s ON (t.id OPERATOR(pg_catalog.=) s.some_number) WHEN MATCHED THEN UPDATE SET name = 'parag jain'::text>
DEBUG: Execute MERGE task list
-- let's verify if data updated properly.
SELECT * FROM target_table;
id | name
---------------------------------------------------------------------
3 | parag jain
(1 row)
-- let's see what happend when we try to update distributed key of target table
MERGE INTO target_table t
USING (SELECT id, some_number from source_withdata where id = 1) s
on t.id = s.some_number
WHEN MATCHED THEN
UPDATE SET id = 1500;
ERROR: updating the distribution column is not allowed in MERGE actions
SELECT * FROM target_table;
id | name
---------------------------------------------------------------------
3 | parag jain
(1 row)
-- test DELETE : when source is single sharded and table are colocated
MERGE INTO target_table t
USING (SELECT id, some_number from source_withdata where id = 1) s
on t.id = s.some_number
WHEN MATCHED THEN
DELETE;
DEBUG: Sub-query is not pushable, try repartitioning
DEBUG: MERGE command is only supported when all distributed tables are co-located and joined on their distribution columns
DEBUG: Creating MERGE repartition plan
DEBUG: Using column - index:1 from the source list to redistribute
DEBUG: Collect source query results on coordinator
DEBUG: Create a MERGE task list that needs to be routed
DEBUG: <Deparsed MERGE query: MERGE INTO merge_schema.target_table_xxxxxxx t USING (SELECT intermediate_result.id, intermediate_result.some_number FROM read_intermediate_result('merge_into_XXX_4000076'::text, 'binary'::citus_copy_format) intermediate_result(id integer, some_number integer)) s ON (t.id OPERATOR(pg_catalog.=) s.some_number) WHEN MATCHED THEN DELETE>
DEBUG: <Deparsed MERGE query: MERGE INTO merge_schema.target_table_xxxxxxx t USING (SELECT intermediate_result.id, intermediate_result.some_number FROM read_intermediate_result('merge_into_XXX_4000077'::text, 'binary'::citus_copy_format) intermediate_result(id integer, some_number integer)) s ON (t.id OPERATOR(pg_catalog.=) s.some_number) WHEN MATCHED THEN DELETE>
DEBUG: <Deparsed MERGE query: MERGE INTO merge_schema.target_table_xxxxxxx t USING (SELECT intermediate_result.id, intermediate_result.some_number FROM read_intermediate_result('merge_into_XXX_4000078'::text, 'binary'::citus_copy_format) intermediate_result(id integer, some_number integer)) s ON (t.id OPERATOR(pg_catalog.=) s.some_number) WHEN MATCHED THEN DELETE>
DEBUG: <Deparsed MERGE query: MERGE INTO merge_schema.target_table_xxxxxxx t USING (SELECT intermediate_result.id, intermediate_result.some_number FROM read_intermediate_result('merge_into_XXX_4000079'::text, 'binary'::citus_copy_format) intermediate_result(id integer, some_number integer)) s ON (t.id OPERATOR(pg_catalog.=) s.some_number) WHEN MATCHED THEN DELETE>
DEBUG: Execute MERGE task list
-- let's verify if data deleted properly.
SELECT * FROM target_table;
id | name
---------------------------------------------------------------------
(0 rows)
--
DELETE FROM source_withdata;
DELETE FROM target_table;
INSERT INTO source VALUES (1,1);
merge into target_table sda
using source_withdata sdn
on sda.id = sdn.id AND sda.id = 1
when not matched then
insert (id)
values (10000);
ERROR: MERGE INSERT is using unsupported expression type for distribution column
DETAIL: Inserting arbitrary values that don't correspond to the joined column values can lead to unpredictable outcomes where rows are incorrectly distributed among different shards
SELECT * FROM target_table WHERE id = 10000;
id | name
---------------------------------------------------------------------
(0 rows)
RESET client_min_messages;
-- This will prune shards with restriction information as NOT MATCHED is void -- This will prune shards with restriction information as NOT MATCHED is void
BEGIN; BEGIN;
SET citus.log_remote_commands to true; SET citus.log_remote_commands to true;
@ -2898,14 +3189,14 @@ WHEN NOT MATCHED THEN
-> Limit -> Limit
-> Sort -> Sort
Sort Key: id2 Sort Key: id2
-> Seq Scan on demo_source_table_4000135 demo_source_table -> Seq Scan on demo_source_table_4000151 demo_source_table
-> Distributed Subplan XXX_2 -> Distributed Subplan XXX_2
-> Custom Scan (Citus Adaptive) -> Custom Scan (Citus Adaptive)
Task Count: 4 Task Count: 4
Tasks Shown: One of 4 Tasks Shown: One of 4
-> Task -> Task
Node: host=localhost port=xxxxx dbname=regression Node: host=localhost port=xxxxx dbname=regression
-> Seq Scan on demo_source_table_4000135 demo_source_table -> Seq Scan on demo_source_table_4000151 demo_source_table
Task Count: 1 Task Count: 1
Tasks Shown: All Tasks Shown: All
-> Task -> Task
@ -3119,10 +3410,10 @@ DEBUG: Creating MERGE repartition plan
DEBUG: Using column - index:0 from the source list to redistribute DEBUG: Using column - index:0 from the source list to redistribute
DEBUG: Collect source query results on coordinator DEBUG: Collect source query results on coordinator
DEBUG: Create a MERGE task list that needs to be routed DEBUG: Create a MERGE task list that needs to be routed
DEBUG: <Deparsed MERGE query: MERGE INTO merge_schema.target_6785_xxxxxxx sda USING (SELECT intermediate_result.id, intermediate_result.z, intermediate_result.d FROM read_intermediate_result('merge_into_XXX_4000147'::text, 'binary'::citus_copy_format) intermediate_result(id integer, z integer, d jsonb)) sdn ON ((sda.id OPERATOR(pg_catalog.=) sdn.id) AND (sda.id OPERATOR(pg_catalog.=) 2)) WHEN NOT MATCHED THEN INSERT (id, z) VALUES (sdn.id, 5)> DEBUG: <Deparsed MERGE query: MERGE INTO merge_schema.target_6785_xxxxxxx sda USING (SELECT intermediate_result.id, intermediate_result.z, intermediate_result.d FROM read_intermediate_result('merge_into_XXX_4000163'::text, 'binary'::citus_copy_format) intermediate_result(id integer, z integer, d jsonb)) sdn ON ((sda.id OPERATOR(pg_catalog.=) sdn.id) AND (sda.id OPERATOR(pg_catalog.=) 2)) WHEN NOT MATCHED THEN INSERT (id, z) VALUES (sdn.id, 5)>
DEBUG: <Deparsed MERGE query: MERGE INTO merge_schema.target_6785_xxxxxxx sda USING (SELECT intermediate_result.id, intermediate_result.z, intermediate_result.d FROM read_intermediate_result('merge_into_XXX_4000148'::text, 'binary'::citus_copy_format) intermediate_result(id integer, z integer, d jsonb)) sdn ON ((sda.id OPERATOR(pg_catalog.=) sdn.id) AND (sda.id OPERATOR(pg_catalog.=) 2)) WHEN NOT MATCHED THEN INSERT (id, z) VALUES (sdn.id, 5)> DEBUG: <Deparsed MERGE query: MERGE INTO merge_schema.target_6785_xxxxxxx sda USING (SELECT intermediate_result.id, intermediate_result.z, intermediate_result.d FROM read_intermediate_result('merge_into_XXX_4000164'::text, 'binary'::citus_copy_format) intermediate_result(id integer, z integer, d jsonb)) sdn ON ((sda.id OPERATOR(pg_catalog.=) sdn.id) AND (sda.id OPERATOR(pg_catalog.=) 2)) WHEN NOT MATCHED THEN INSERT (id, z) VALUES (sdn.id, 5)>
DEBUG: <Deparsed MERGE query: MERGE INTO merge_schema.target_6785_xxxxxxx sda USING (SELECT intermediate_result.id, intermediate_result.z, intermediate_result.d FROM read_intermediate_result('merge_into_XXX_4000149'::text, 'binary'::citus_copy_format) intermediate_result(id integer, z integer, d jsonb)) sdn ON ((sda.id OPERATOR(pg_catalog.=) sdn.id) AND (sda.id OPERATOR(pg_catalog.=) 2)) WHEN NOT MATCHED THEN INSERT (id, z) VALUES (sdn.id, 5)> DEBUG: <Deparsed MERGE query: MERGE INTO merge_schema.target_6785_xxxxxxx sda USING (SELECT intermediate_result.id, intermediate_result.z, intermediate_result.d FROM read_intermediate_result('merge_into_XXX_4000165'::text, 'binary'::citus_copy_format) intermediate_result(id integer, z integer, d jsonb)) sdn ON ((sda.id OPERATOR(pg_catalog.=) sdn.id) AND (sda.id OPERATOR(pg_catalog.=) 2)) WHEN NOT MATCHED THEN INSERT (id, z) VALUES (sdn.id, 5)>
DEBUG: <Deparsed MERGE query: MERGE INTO merge_schema.target_6785_xxxxxxx sda USING (SELECT intermediate_result.id, intermediate_result.z, intermediate_result.d FROM read_intermediate_result('merge_into_XXX_4000150'::text, 'binary'::citus_copy_format) intermediate_result(id integer, z integer, d jsonb)) sdn ON ((sda.id OPERATOR(pg_catalog.=) sdn.id) AND (sda.id OPERATOR(pg_catalog.=) 2)) WHEN NOT MATCHED THEN INSERT (id, z) VALUES (sdn.id, 5)> DEBUG: <Deparsed MERGE query: MERGE INTO merge_schema.target_6785_xxxxxxx sda USING (SELECT intermediate_result.id, intermediate_result.z, intermediate_result.d FROM read_intermediate_result('merge_into_XXX_4000166'::text, 'binary'::citus_copy_format) intermediate_result(id integer, z integer, d jsonb)) sdn ON ((sda.id OPERATOR(pg_catalog.=) sdn.id) AND (sda.id OPERATOR(pg_catalog.=) 2)) WHEN NOT MATCHED THEN INSERT (id, z) VALUES (sdn.id, 5)>
DEBUG: Execute MERGE task list DEBUG: Execute MERGE task list
RESET client_min_messages; RESET client_min_messages;
SELECT * FROM target_6785 ORDER BY 1; SELECT * FROM target_6785 ORDER BY 1;
@ -3240,7 +3531,7 @@ USING s1 s
ON t.id = s.id ON t.id = s.id
WHEN NOT MATCHED THEN WHEN NOT MATCHED THEN
INSERT (id) VALUES(s.val); INSERT (id) VALUES(s.val);
ERROR: MERGE INSERT must use the source table distribution column value ERROR: MERGE INSERT must use the source's joining column for target's distribution column
MERGE INTO t1 t MERGE INTO t1 t
USING s1 s USING s1 s
ON t.id = s.id ON t.id = s.id
@ -3966,7 +4257,7 @@ CONTEXT: SQL statement "SELECT citus_drop_all_shards(v_obj.objid, v_obj.schema_
PL/pgSQL function citus_drop_trigger() line XX at PERFORM PL/pgSQL function citus_drop_trigger() line XX at PERFORM
DROP FUNCTION merge_when_and_write(); DROP FUNCTION merge_when_and_write();
DROP SCHEMA merge_schema CASCADE; DROP SCHEMA merge_schema CASCADE;
NOTICE: drop cascades to 103 other objects NOTICE: drop cascades to 107 other objects
DETAIL: drop cascades to function insert_data() DETAIL: drop cascades to function insert_data()
drop cascades to table local_local drop cascades to table local_local
drop cascades to table target drop cascades to table target
@ -4026,6 +4317,10 @@ drop cascades to table pg_source
drop cascades to table citus_target drop cascades to table citus_target
drop cascades to table citus_source drop cascades to table citus_source
drop cascades to function compare_tables() drop cascades to function compare_tables()
drop cascades to table source_pushdowntest
drop cascades to table target_pushdowntest
drop cascades to table source_withdata
drop cascades to table target_table
drop cascades to view pg_source_view drop cascades to view pg_source_view
drop cascades to view citus_source_view drop cascades to view citus_source_view
drop cascades to table pg_pa_target drop cascades to table pg_pa_target
@ -4042,7 +4337,7 @@ drop cascades to table target_set
drop cascades to table source_set drop cascades to table source_set
drop cascades to table refsource_ref drop cascades to table refsource_ref
drop cascades to table pg_result drop cascades to table pg_result
drop cascades to table refsource_ref_4000112 drop cascades to table refsource_ref_4000128
drop cascades to table pg_ref drop cascades to table pg_ref
drop cascades to table local_ref drop cascades to table local_ref
drop cascades to table reftarget_local drop cascades to table reftarget_local
@ -4060,11 +4355,7 @@ drop cascades to table source_6785
drop cascades to table target_6785 drop cascades to table target_6785
drop cascades to function add_s(integer,integer) drop cascades to function add_s(integer,integer)
drop cascades to table pg drop cascades to table pg
drop cascades to table t1_4000174 drop cascades to table t1_4000190
drop cascades to table s1_4000175 drop cascades to table s1_4000191
drop cascades to table t1 drop cascades to table t1
drop cascades to table s1 and 7 other objects (see server log for list)
drop cascades to table dist_target
drop cascades to table dist_source
drop cascades to view show_tables
and 3 other objects (see server log for list)

View File

@ -113,7 +113,8 @@ test: function_with_case_when
test: clock test: clock
# MERGE tests # MERGE tests
test: merge pgmerge merge_repartition2 test: merge pgmerge
test: merge_repartition2
test: merge_repartition1 merge_schema_sharding test: merge_repartition1 merge_schema_sharding
test: merge_partition_tables test: merge_partition_tables

View File

@ -1206,6 +1206,139 @@ SET citus.log_remote_commands to false;
SELECT compare_tables(); SELECT compare_tables();
ROLLBACK; ROLLBACK;
-- let's create source and target table
ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 13000;
CREATE TABLE source_pushdowntest (id integer);
CREATE TABLE target_pushdowntest (id integer );
-- let's distribute both table on id field
SELECT create_distributed_table('source_pushdowntest', 'id');
SELECT create_distributed_table('target_pushdowntest', 'id');
-- we are doing this operation on single node setup let's figure out colocation id of both tables
-- both has same colocation id so both are colocated.
WITH colocations AS (
SELECT colocationid
FROM pg_dist_partition
WHERE logicalrelid = 'source_pushdowntest'::regclass
OR logicalrelid = 'target_pushdowntest'::regclass
)
SELECT
CASE
WHEN COUNT(DISTINCT colocationid) = 1 THEN 'Same'
ELSE 'Different'
END AS colocation_status
FROM colocations;
SET client_min_messages TO DEBUG1;
-- Test 1 : tables are colocated AND query is multisharded AND Join On distributed column : should push down to workers.
EXPLAIN (costs off, timing off, summary off)
MERGE INTO target_pushdowntest t
USING source_pushdowntest s
ON t.id = s.id
WHEN NOT MATCHED THEN
INSERT (id)
VALUES (s.id);
-- Test 2 : tables are colocated AND source query is not multisharded : should push down to worker.
-- DEBUG LOGS show that query is getting pushed down
MERGE INTO target_pushdowntest t
USING (SELECT * from source_pushdowntest where id = 1) s
on t.id = s.id
WHEN NOT MATCHED THEN
INSERT (id)
VALUES (s.id);
-- Test 3 : tables are colocated source query is single sharded but not using source distributed column in insertion. let's not pushdown.
INSERT INTO source_pushdowntest (id) VALUES (3);
EXPLAIN (costs off, timing off, summary off)
MERGE INTO target_pushdowntest t
USING (SELECT 1 as somekey, id from source_pushdowntest where id = 1) s
on t.id = s.somekey
WHEN NOT MATCHED THEN
INSERT (id)
VALUES (s.somekey);
-- let's verify if we use some other column from source for value of distributed column in target.
-- it should be inserted to correct shard of target.
CREATE TABLE source_withdata (id integer, some_number integer);
CREATE TABLE target_table (id integer, name text);
SELECT create_distributed_table('source_withdata', 'id');
SELECT create_distributed_table('target_table', 'id');
INSERT INTO source_withdata (id, some_number) VALUES (1, 3);
-- we will use some_number column from source_withdata to insert into distributed column of target.
-- value of some_number is 3 let's verify what shard it should go to.
select worker_hash(3);
-- it should go to second shard of target as target has 4 shard and hash "-28094569" comes in range of second shard.
MERGE INTO target_table t
USING (SELECT id, some_number from source_withdata where id = 1) s
on t.id = s.some_number
WHEN NOT MATCHED THEN
INSERT (id, name)
VALUES (s.some_number, 'parag');
-- let's verify if data inserted to second shard of target.
EXPLAIN (analyze on, costs off, timing off, summary off) SELECT * FROM target_table;
-- let's verify target data too.
SELECT * FROM target_table;
-- test UPDATE : when source is single sharded and table are colocated
MERGE INTO target_table t
USING (SELECT id, some_number from source_withdata where id = 1) s
on t.id = s.some_number
WHEN MATCHED THEN
UPDATE SET name = 'parag jain';
-- let's verify if data updated properly.
SELECT * FROM target_table;
-- let's see what happend when we try to update distributed key of target table
MERGE INTO target_table t
USING (SELECT id, some_number from source_withdata where id = 1) s
on t.id = s.some_number
WHEN MATCHED THEN
UPDATE SET id = 1500;
SELECT * FROM target_table;
-- test DELETE : when source is single sharded and table are colocated
MERGE INTO target_table t
USING (SELECT id, some_number from source_withdata where id = 1) s
on t.id = s.some_number
WHEN MATCHED THEN
DELETE;
-- let's verify if data deleted properly.
SELECT * FROM target_table;
--
DELETE FROM source_withdata;
DELETE FROM target_table;
INSERT INTO source VALUES (1,1);
merge into target_table sda
using source_withdata sdn
on sda.id = sdn.id AND sda.id = 1
when not matched then
insert (id)
values (10000);
SELECT * FROM target_table WHERE id = 10000;
RESET client_min_messages;
-- This will prune shards with restriction information as NOT MATCHED is void -- This will prune shards with restriction information as NOT MATCHED is void
BEGIN; BEGIN;
SET citus.log_remote_commands to true; SET citus.log_remote_commands to true;