mirror of https://github.com/citusdata/citus.git
Related to issues #7619 and #7620.

The MERGE command fails when the source query is single-sharded, the source and
target are co-located, and the INSERT does not use the distribution key of the
source.
Example
```
CREATE TABLE source (id integer);
CREATE TABLE target (id integer );
-- let's distribute both table on id field
SELECT create_distributed_table('source', 'id');
SELECT create_distributed_table('target', 'id');
MERGE INTO target t
USING ( SELECT 1 AS somekey
FROM source
WHERE source.id = 1) s
ON t.id = s.somekey
WHEN NOT MATCHED
THEN INSERT (id)
VALUES (s.somekey);
ERROR: MERGE INSERT must use the source table distribution column value
HINT: MERGE INSERT must use the source table distribution column value
```
Author's opinion: if the join is not on the source and target distribution
columns, we should not force the user to use the source's distribution column
when inserting the value of the target's distribution column.

Fix: if the INSERT does not use the source's distribution key, do not push the
query down to the workers, and do not force the user to use the source's
distribution column when it is not part of the join.
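With this change, the query from the example above is expected to fall back to
the repartitioning path instead of erroring out. A minimal sketch of the
intended behavior (the DEBUG lines are taken from the new regression tests;
exact output may vary with settings):
```
SET client_min_messages TO DEBUG1;

MERGE INTO target t
USING ( SELECT 1 AS somekey
        FROM source
        WHERE source.id = 1) s
ON t.id = s.somekey
WHEN NOT MATCHED
THEN INSERT (id)
VALUES (s.somekey);

-- DEBUG: MERGE INSERT must use the source table distribution column value
--        for push down to workers. Otherwise, repartitioning will be applied
-- DEBUG: Creating MERGE repartition plan
```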
This reverts commit fa4fc0b372.
Co-authored-by: paragjain <paragjain@microsoft.com>
Branch: pull/7634/head
Parent: fa4fc0b372
Commit: aaaf637a6b
@@ -182,14 +182,6 @@ CreateRouterMergePlan(Oid targetRelationId, Query *originalQuery, Query *query,
 		return distributedPlan;
 	}
 
-	Var *insertVar =
-		FetchAndValidateInsertVarIfExists(targetRelationId, originalQuery);
-	if (insertVar &&
-		!IsDistributionColumnInMergeSource((Expr *) insertVar, originalQuery, true))
-	{
-		ereport(ERROR, (errmsg("MERGE INSERT must use the source table "
-							   "distribution column value")));
-	}
 
 	Job *job = RouterJob(originalQuery, plannerRestrictionContext,
 						 &distributedPlan->planningError);

@@ -1124,6 +1116,27 @@ DeferErrorIfRoutableMergeNotSupported(Query *query, List *rangeTableList,
 							 "repartitioning")));
 		return deferredError;
 	}
 
+
+	/*
+	 * If execution has reached this point, it indicates that the query can be delegated to the worker.
+	 * However, before proceeding with this delegation, we need to confirm that the user is utilizing
+	 * the distribution column of the source table in the Insert variable.
+	 * If this is not the case, we should refrain from pushing down the query.
+	 * This is just a deffered error which will be handle by caller.
+	 */
+
+	Var *insertVar =
+		FetchAndValidateInsertVarIfExists(targetRelationId, query);
+	if (insertVar &&
+		!IsDistributionColumnInMergeSource((Expr *) insertVar, query, true))
+	{
+		ereport(DEBUG1, (errmsg(
+							 "MERGE INSERT must use the source table distribution column value for push down to workers. Otherwise, repartitioning will be applied")));
+		return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
+							 "MERGE INSERT must use the source table distribution column value for push down to workers. Otherwise, repartitioning will be applied",
+							 NULL, NULL);
+	}
 	return NULL;
 }
@@ -1128,7 +1128,7 @@ DO NOTHING
 WHEN NOT MATCHED THEN
   INSERT VALUES(rs_source.id);
 DEBUG: Creating MERGE router plan
 DEBUG: <Deparsed MERGE query: MERGE INTO merge_schema.rs_target_xxxxxxx rs_target USING (SELECT id.id FROM merge_schema.f_immutable(99) id(id) WHERE (id.id OPERATOR(pg_catalog.=) ANY (SELECT 99))) rs_source ON (rs_source.id OPERATOR(pg_catalog.=) rs_target.id) WHEN MATCHED THEN DO NOTHING WHEN NOT MATCHED THEN INSERT (id) VALUES (rs_source.id)>
 RESET client_min_messages;
 SELECT * INTO rs_local FROM rs_target ORDER BY 1 ;
 -- Should be equal

@@ -1259,7 +1259,7 @@ DO NOTHING
 WHEN NOT MATCHED THEN
   INSERT VALUES(fn_source.id, fn_source.source);
 DEBUG: Creating MERGE router plan
 DEBUG: <Deparsed MERGE query: MERGE INTO merge_schema.fn_target_xxxxxxx fn_target USING (SELECT dist_table.id, dist_table.source FROM merge_schema.dist_table_xxxxxxx dist_table) fn_source ON (fn_source.id OPERATOR(pg_catalog.=) fn_target.id) WHEN MATCHED THEN DO NOTHING WHEN NOT MATCHED THEN INSERT (id, data) VALUES (fn_source.id, fn_source.source)>
 RESET client_min_messages;
 SELECT * INTO fn_local FROM fn_target ORDER BY 1 ;
 -- Should be equal

@@ -1552,7 +1552,7 @@ BEGIN;
 SET citus.log_remote_commands to true;
 SET client_min_messages TO DEBUG1;
 EXECUTE merge_prepare(2);
-DEBUG: <Deparsed MERGE query: MERGE INTO merge_schema.target_cj_xxxxxxx target USING (SELECT source_cj1.sid1, source_cj1.src1, source_cj1.val1 FROM merge_schema.source_cj1_xxxxxxx source_cj1) sub ON ((target.tid OPERATOR(pg_catalog.=) sub.sid1) AND (target.tid OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = sub.val1 WHEN NOT MATCHED THEN DO NOTHING >
+DEBUG: <Deparsed MERGE query: MERGE INTO merge_schema.target_cj_xxxxxxx target USING (SELECT source_cj1.sid1, source_cj1.src1, source_cj1.val1 FROM merge_schema.source_cj1_xxxxxxx source_cj1) sub ON ((target.tid OPERATOR(pg_catalog.=) sub.sid1) AND (target.tid OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = sub.val1 WHEN NOT MATCHED THEN DO NOTHING>
 DEBUG: Creating MERGE router plan
 NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx');
 DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx

@@ -1782,13 +1782,13 @@ NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_
 DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
 NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx');
 DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
 NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON (t.id OPERATOR(pg_catalog.=) s.id) WHEN MATCHED THEN DO NOTHING WHEN NOT MATCHED AND (s.id OPERATOR(pg_catalog.<) 100) THEN INSERT (id, val) VALUES (s.id, s.val)
 DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
 NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON (t.id OPERATOR(pg_catalog.=) s.id) WHEN MATCHED THEN DO NOTHING WHEN NOT MATCHED AND (s.id OPERATOR(pg_catalog.<) 100) THEN INSERT (id, val) VALUES (s.id, s.val)
 DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
 NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON (t.id OPERATOR(pg_catalog.=) s.id) WHEN MATCHED THEN DO NOTHING WHEN NOT MATCHED AND (s.id OPERATOR(pg_catalog.<) 100) THEN INSERT (id, val) VALUES (s.id, s.val)
 DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
 NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON (t.id OPERATOR(pg_catalog.=) s.id) WHEN MATCHED THEN DO NOTHING WHEN NOT MATCHED AND (s.id OPERATOR(pg_catalog.<) 100) THEN INSERT (id, val) VALUES (s.id, s.val)
 DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
 SET citus.log_remote_commands to false;
 SELECT compare_tables();
@@ -1842,6 +1842,297 @@ SELECT compare_tables();
 (1 row)
 
 ROLLBACK;
+-- let's create source and target table
+ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 13000;
+CREATE TABLE source_pushdowntest (id integer);
+CREATE TABLE target_pushdowntest (id integer );
+-- let's distribute both table on id field
+SELECT create_distributed_table('source_pushdowntest', 'id');
+ create_distributed_table
+---------------------------------------------------------------------
+
+(1 row)
+
+SELECT create_distributed_table('target_pushdowntest', 'id');
+ create_distributed_table
+---------------------------------------------------------------------
+
+(1 row)
+
+-- we are doing this operation on single node setup let's figure out colocation id of both tables
+-- both has same colocation id so both are colocated.
+WITH colocations AS (
+    SELECT colocationid
+    FROM pg_dist_partition
+    WHERE logicalrelid = 'source_pushdowntest'::regclass
+       OR logicalrelid = 'target_pushdowntest'::regclass
+)
+SELECT
+    CASE
+        WHEN COUNT(DISTINCT colocationid) = 1 THEN 'Same'
+        ELSE 'Different'
+    END AS colocation_status
+FROM colocations;
+ colocation_status
+---------------------------------------------------------------------
+ Same
+(1 row)
+
+SET client_min_messages TO DEBUG1;
+-- Test 1 : tables are colocated AND query is multisharded AND Join On distributed column : should push down to workers.
+EXPLAIN (costs off, timing off, summary off)
+MERGE INTO target_pushdowntest t
+USING source_pushdowntest s
+ON t.id = s.id
+WHEN NOT MATCHED THEN
+INSERT (id)
+VALUES (s.id);
+DEBUG: <Deparsed MERGE query: MERGE INTO merge_schema.target_pushdowntest_xxxxxxx t USING merge_schema.source_pushdowntest_xxxxxxx s ON (t.id OPERATOR(pg_catalog.=) s.id) WHEN NOT MATCHED THEN INSERT (id) VALUES (s.id)>
+DEBUG: <Deparsed MERGE query: MERGE INTO merge_schema.target_pushdowntest_xxxxxxx t USING merge_schema.source_pushdowntest_xxxxxxx s ON (t.id OPERATOR(pg_catalog.=) s.id) WHEN NOT MATCHED THEN INSERT (id) VALUES (s.id)>
+DEBUG: <Deparsed MERGE query: MERGE INTO merge_schema.target_pushdowntest_xxxxxxx t USING merge_schema.source_pushdowntest_xxxxxxx s ON (t.id OPERATOR(pg_catalog.=) s.id) WHEN NOT MATCHED THEN INSERT (id) VALUES (s.id)>
+DEBUG: <Deparsed MERGE query: MERGE INTO merge_schema.target_pushdowntest_xxxxxxx t USING merge_schema.source_pushdowntest_xxxxxxx s ON (t.id OPERATOR(pg_catalog.=) s.id) WHEN NOT MATCHED THEN INSERT (id) VALUES (s.id)>
+DEBUG: Creating MERGE router plan
+ QUERY PLAN
+---------------------------------------------------------------------
+ Custom Scan (Citus Adaptive)
+   Task Count: 4
+   Tasks Shown: All
+   ->  Task
+         Node: host=localhost port=xxxxx dbname=regression
+         ->  Merge on target_pushdowntest_4000068 t
+               ->  Merge Left Join
+                     Merge Cond: (s.id = t.id)
+                     ->  Sort
+                           Sort Key: s.id
+                           ->  Seq Scan on source_pushdowntest_4000064 s
+                     ->  Sort
+                           Sort Key: t.id
+                           ->  Seq Scan on target_pushdowntest_4000068 t
+   ->  Task
+         Node: host=localhost port=xxxxx dbname=regression
+         ->  Merge on target_pushdowntest_4000069 t
+               ->  Merge Left Join
+                     Merge Cond: (s.id = t.id)
+                     ->  Sort
+                           Sort Key: s.id
+                           ->  Seq Scan on source_pushdowntest_4000065 s
+                     ->  Sort
+                           Sort Key: t.id
+                           ->  Seq Scan on target_pushdowntest_4000069 t
+   ->  Task
+         Node: host=localhost port=xxxxx dbname=regression
+         ->  Merge on target_pushdowntest_4000070 t
+               ->  Merge Left Join
+                     Merge Cond: (s.id = t.id)
+                     ->  Sort
+                           Sort Key: s.id
+                           ->  Seq Scan on source_pushdowntest_4000066 s
+                     ->  Sort
+                           Sort Key: t.id
+                           ->  Seq Scan on target_pushdowntest_4000070 t
+   ->  Task
+         Node: host=localhost port=xxxxx dbname=regression
+         ->  Merge on target_pushdowntest_4000071 t
+               ->  Merge Left Join
+                     Merge Cond: (s.id = t.id)
+                     ->  Sort
+                           Sort Key: s.id
+                           ->  Seq Scan on source_pushdowntest_4000067 s
+                     ->  Sort
+                           Sort Key: t.id
+                           ->  Seq Scan on target_pushdowntest_4000071 t
+(47 rows)
+
+-- Test 2 : tables are colocated AND source query is not multisharded : should push down to worker.
+-- DEBUG LOGS show that query is getting pushed down
+MERGE INTO target_pushdowntest t
+USING (SELECT * from source_pushdowntest where id = 1) s
+on t.id = s.id
+WHEN NOT MATCHED THEN
+INSERT (id)
+VALUES (s.id);
+DEBUG: <Deparsed MERGE query: MERGE INTO merge_schema.target_pushdowntest_xxxxxxx t USING (SELECT source_pushdowntest.id FROM merge_schema.source_pushdowntest_xxxxxxx source_pushdowntest WHERE (source_pushdowntest.id OPERATOR(pg_catalog.=) 1)) s ON (t.id OPERATOR(pg_catalog.=) s.id) WHEN NOT MATCHED THEN INSERT (id) VALUES (s.id)>
+DEBUG: Creating MERGE router plan
+-- Test 3 : tables are colocated source query is single sharded but not using source distributed column in insertion. let's not pushdown.
+INSERT INTO source_pushdowntest (id) VALUES (3);
+EXPLAIN (costs off, timing off, summary off)
+MERGE INTO target_pushdowntest t
+USING (SELECT 1 as somekey, id from source_pushdowntest where id = 1) s
+on t.id = s.somekey
+WHEN NOT MATCHED THEN
+INSERT (id)
+VALUES (s.somekey);
+DEBUG: MERGE INSERT must use the source table distribution column value for push down to workers. Otherwise, repartitioning will be applied
+DEBUG: MERGE INSERT must use the source table distribution column value for push down to workers. Otherwise, repartitioning will be applied
+DEBUG: Creating MERGE repartition plan
+DEBUG: Using column - index:0 from the source list to redistribute
+ QUERY PLAN
+---------------------------------------------------------------------
+ Custom Scan (Citus MERGE INTO ...)
+   MERGE INTO target_pushdowntest method: pull to coordinator
+   ->  Custom Scan (Citus Adaptive)
+         Task Count: 1
+         Tasks Shown: All
+         ->  Task
+               Node: host=localhost port=xxxxx dbname=regression
+               ->  Seq Scan on source_pushdowntest_4000064 source_pushdowntest
+                     Filter: (id = 1)
+(9 rows)
+
+-- let's verify if we use some other column from source for value of distributed column in target.
+-- it should be inserted to correct shard of target.
+CREATE TABLE source_withdata (id integer, some_number integer);
+CREATE TABLE target_table (id integer, name text);
+SELECT create_distributed_table('source_withdata', 'id');
+ create_distributed_table
+---------------------------------------------------------------------
+
+(1 row)
+
+SELECT create_distributed_table('target_table', 'id');
+ create_distributed_table
+---------------------------------------------------------------------
+
+(1 row)
+
+INSERT INTO source_withdata (id, some_number) VALUES (1, 3);
+-- we will use some_number column from source_withdata to insert into distributed column of target.
+-- value of some_number is 3 let's verify what shard it should go to.
+select worker_hash(3);
+ worker_hash
+---------------------------------------------------------------------
+   -28094569
+(1 row)
+
+-- it should go to second shard of target as target has 4 shard and hash "-28094569" comes in range of second shard.
+MERGE INTO target_table t
+USING (SELECT id, some_number from source_withdata where id = 1) s
+on t.id = s.some_number
+WHEN NOT MATCHED THEN
+INSERT (id, name)
+VALUES (s.some_number, 'parag');
+DEBUG: Sub-query is not pushable, try repartitioning
+DEBUG: MERGE command is only supported when all distributed tables are co-located and joined on their distribution columns
+DEBUG: Creating MERGE repartition plan
+DEBUG: Using column - index:1 from the source list to redistribute
+DEBUG: Collect source query results on coordinator
+DEBUG: Create a MERGE task list that needs to be routed
+DEBUG: <Deparsed MERGE query: MERGE INTO merge_schema.target_table_xxxxxxx t USING (SELECT intermediate_result.id, intermediate_result.some_number FROM read_intermediate_result('merge_into_XXX_4000076'::text, 'binary'::citus_copy_format) intermediate_result(id integer, some_number integer)) s ON (t.id OPERATOR(pg_catalog.=) s.some_number) WHEN NOT MATCHED THEN INSERT (id, name) VALUES (s.some_number, 'parag'::text)>
+DEBUG: <Deparsed MERGE query: MERGE INTO merge_schema.target_table_xxxxxxx t USING (SELECT intermediate_result.id, intermediate_result.some_number FROM read_intermediate_result('merge_into_XXX_4000077'::text, 'binary'::citus_copy_format) intermediate_result(id integer, some_number integer)) s ON (t.id OPERATOR(pg_catalog.=) s.some_number) WHEN NOT MATCHED THEN INSERT (id, name) VALUES (s.some_number, 'parag'::text)>
+DEBUG: <Deparsed MERGE query: MERGE INTO merge_schema.target_table_xxxxxxx t USING (SELECT intermediate_result.id, intermediate_result.some_number FROM read_intermediate_result('merge_into_XXX_4000078'::text, 'binary'::citus_copy_format) intermediate_result(id integer, some_number integer)) s ON (t.id OPERATOR(pg_catalog.=) s.some_number) WHEN NOT MATCHED THEN INSERT (id, name) VALUES (s.some_number, 'parag'::text)>
+DEBUG: <Deparsed MERGE query: MERGE INTO merge_schema.target_table_xxxxxxx t USING (SELECT intermediate_result.id, intermediate_result.some_number FROM read_intermediate_result('merge_into_XXX_4000079'::text, 'binary'::citus_copy_format) intermediate_result(id integer, some_number integer)) s ON (t.id OPERATOR(pg_catalog.=) s.some_number) WHEN NOT MATCHED THEN INSERT (id, name) VALUES (s.some_number, 'parag'::text)>
+DEBUG: Execute MERGE task list
+-- let's verify if data inserted to second shard of target.
+EXPLAIN (analyze on, costs off, timing off, summary off) SELECT * FROM target_table;
+ QUERY PLAN
+---------------------------------------------------------------------
+ Custom Scan (Citus Adaptive) (actual rows=1 loops=1)
+   Task Count: 4
+   Tuple data received from nodes: 9 bytes
+   Tasks Shown: All
+   ->  Task
+         Tuple data received from node: 0 bytes
+         Node: host=localhost port=xxxxx dbname=regression
+         ->  Seq Scan on target_table_4000076 target_table (actual rows=0 loops=1)
+   ->  Task
+         Tuple data received from node: 9 bytes
+         Node: host=localhost port=xxxxx dbname=regression
+         ->  Seq Scan on target_table_4000077 target_table (actual rows=1 loops=1)
+   ->  Task
+         Tuple data received from node: 0 bytes
+         Node: host=localhost port=xxxxx dbname=regression
+         ->  Seq Scan on target_table_4000078 target_table (actual rows=0 loops=1)
+   ->  Task
+         Tuple data received from node: 0 bytes
+         Node: host=localhost port=xxxxx dbname=regression
+         ->  Seq Scan on target_table_4000079 target_table (actual rows=0 loops=1)
+(20 rows)
+
+-- let's verify target data too.
+SELECT * FROM target_table;
+ id | name
+---------------------------------------------------------------------
+  3 | parag
+(1 row)
+
+-- test UPDATE : when source is single sharded and table are colocated
+MERGE INTO target_table t
+USING (SELECT id, some_number from source_withdata where id = 1) s
+on t.id = s.some_number
+WHEN MATCHED THEN
+UPDATE SET name = 'parag jain';
+DEBUG: Sub-query is not pushable, try repartitioning
+DEBUG: MERGE command is only supported when all distributed tables are co-located and joined on their distribution columns
+DEBUG: Creating MERGE repartition plan
+DEBUG: Using column - index:1 from the source list to redistribute
+DEBUG: Collect source query results on coordinator
+DEBUG: Create a MERGE task list that needs to be routed
+DEBUG: <Deparsed MERGE query: MERGE INTO merge_schema.target_table_xxxxxxx t USING (SELECT intermediate_result.id, intermediate_result.some_number FROM read_intermediate_result('merge_into_XXX_4000076'::text, 'binary'::citus_copy_format) intermediate_result(id integer, some_number integer)) s ON (t.id OPERATOR(pg_catalog.=) s.some_number) WHEN MATCHED THEN UPDATE SET name = 'parag jain'::text>
+DEBUG: <Deparsed MERGE query: MERGE INTO merge_schema.target_table_xxxxxxx t USING (SELECT intermediate_result.id, intermediate_result.some_number FROM read_intermediate_result('merge_into_XXX_4000077'::text, 'binary'::citus_copy_format) intermediate_result(id integer, some_number integer)) s ON (t.id OPERATOR(pg_catalog.=) s.some_number) WHEN MATCHED THEN UPDATE SET name = 'parag jain'::text>
+DEBUG: <Deparsed MERGE query: MERGE INTO merge_schema.target_table_xxxxxxx t USING (SELECT intermediate_result.id, intermediate_result.some_number FROM read_intermediate_result('merge_into_XXX_4000078'::text, 'binary'::citus_copy_format) intermediate_result(id integer, some_number integer)) s ON (t.id OPERATOR(pg_catalog.=) s.some_number) WHEN MATCHED THEN UPDATE SET name = 'parag jain'::text>
+DEBUG: <Deparsed MERGE query: MERGE INTO merge_schema.target_table_xxxxxxx t USING (SELECT intermediate_result.id, intermediate_result.some_number FROM read_intermediate_result('merge_into_XXX_4000079'::text, 'binary'::citus_copy_format) intermediate_result(id integer, some_number integer)) s ON (t.id OPERATOR(pg_catalog.=) s.some_number) WHEN MATCHED THEN UPDATE SET name = 'parag jain'::text>
+DEBUG: Execute MERGE task list
+-- let's verify if data updated properly.
+SELECT * FROM target_table;
+ id | name
+---------------------------------------------------------------------
+  3 | parag jain
+(1 row)
+
+-- let's see what happend when we try to update distributed key of target table
+MERGE INTO target_table t
+USING (SELECT id, some_number from source_withdata where id = 1) s
+on t.id = s.some_number
+WHEN MATCHED THEN
+UPDATE SET id = 1500;
+ERROR: updating the distribution column is not allowed in MERGE actions
+SELECT * FROM target_table;
+ id | name
+---------------------------------------------------------------------
+  3 | parag jain
+(1 row)
+
+-- test DELETE : when source is single sharded and table are colocated
+MERGE INTO target_table t
+USING (SELECT id, some_number from source_withdata where id = 1) s
+on t.id = s.some_number
+WHEN MATCHED THEN
+DELETE;
+DEBUG: Sub-query is not pushable, try repartitioning
+DEBUG: MERGE command is only supported when all distributed tables are co-located and joined on their distribution columns
+DEBUG: Creating MERGE repartition plan
+DEBUG: Using column - index:1 from the source list to redistribute
+DEBUG: Collect source query results on coordinator
+DEBUG: Create a MERGE task list that needs to be routed
+DEBUG: <Deparsed MERGE query: MERGE INTO merge_schema.target_table_xxxxxxx t USING (SELECT intermediate_result.id, intermediate_result.some_number FROM read_intermediate_result('merge_into_XXX_4000076'::text, 'binary'::citus_copy_format) intermediate_result(id integer, some_number integer)) s ON (t.id OPERATOR(pg_catalog.=) s.some_number) WHEN MATCHED THEN DELETE>
+DEBUG: <Deparsed MERGE query: MERGE INTO merge_schema.target_table_xxxxxxx t USING (SELECT intermediate_result.id, intermediate_result.some_number FROM read_intermediate_result('merge_into_XXX_4000077'::text, 'binary'::citus_copy_format) intermediate_result(id integer, some_number integer)) s ON (t.id OPERATOR(pg_catalog.=) s.some_number) WHEN MATCHED THEN DELETE>
+DEBUG: <Deparsed MERGE query: MERGE INTO merge_schema.target_table_xxxxxxx t USING (SELECT intermediate_result.id, intermediate_result.some_number FROM read_intermediate_result('merge_into_XXX_4000078'::text, 'binary'::citus_copy_format) intermediate_result(id integer, some_number integer)) s ON (t.id OPERATOR(pg_catalog.=) s.some_number) WHEN MATCHED THEN DELETE>
+DEBUG: <Deparsed MERGE query: MERGE INTO merge_schema.target_table_xxxxxxx t USING (SELECT intermediate_result.id, intermediate_result.some_number FROM read_intermediate_result('merge_into_XXX_4000079'::text, 'binary'::citus_copy_format) intermediate_result(id integer, some_number integer)) s ON (t.id OPERATOR(pg_catalog.=) s.some_number) WHEN MATCHED THEN DELETE>
+DEBUG: Execute MERGE task list
+-- let's verify if data deleted properly.
+SELECT * FROM target_table;
+ id | name
+---------------------------------------------------------------------
+(0 rows)
+
+--
+DELETE FROM source_withdata;
+DELETE FROM target_table;
+INSERT INTO source VALUES (1,1);
+merge into target_table sda
+using source_withdata sdn
+on sda.id = sdn.id AND sda.id = 1
+when not matched then
+insert (id)
+values (10000);
+ERROR: MERGE INSERT is using unsupported expression type for distribution column
+DETAIL: Inserting arbitrary values that don't correspond to the joined column values can lead to unpredictable outcomes where rows are incorrectly distributed among different shards
+SELECT * FROM target_table WHERE id = 10000;
+ id | name
+---------------------------------------------------------------------
+(0 rows)
+
+RESET client_min_messages;
 -- This will prune shards with restriction information as NOT MATCHED is void
 BEGIN;
 SET citus.log_remote_commands to true;
@@ -2898,14 +3189,14 @@ WHEN NOT MATCHED THEN
 ->  Limit
 ->  Sort
 Sort Key: id2
--> Seq Scan on demo_source_table_4000135 demo_source_table
+-> Seq Scan on demo_source_table_4000151 demo_source_table
 ->  Distributed Subplan XXX_2
 ->  Custom Scan (Citus Adaptive)
 Task Count: 4
 Tasks Shown: One of 4
 ->  Task
 Node: host=localhost port=xxxxx dbname=regression
--> Seq Scan on demo_source_table_4000135 demo_source_table
+-> Seq Scan on demo_source_table_4000151 demo_source_table
 Task Count: 1
 Tasks Shown: All
 ->  Task
@@ -3119,10 +3410,10 @@ DEBUG: Creating MERGE repartition plan
 DEBUG: Using column - index:0 from the source list to redistribute
 DEBUG: Collect source query results on coordinator
 DEBUG: Create a MERGE task list that needs to be routed
-DEBUG: <Deparsed MERGE query: MERGE INTO merge_schema.target_6785_xxxxxxx sda USING (SELECT intermediate_result.id, intermediate_result.z, intermediate_result.d FROM read_intermediate_result('merge_into_XXX_4000147'::text, 'binary'::citus_copy_format) intermediate_result(id integer, z integer, d jsonb)) sdn ON ((sda.id OPERATOR(pg_catalog.=) sdn.id) AND (sda.id OPERATOR(pg_catalog.=) 2)) WHEN NOT MATCHED THEN INSERT (id, z) VALUES (sdn.id, 5)>
-DEBUG: <Deparsed MERGE query: MERGE INTO merge_schema.target_6785_xxxxxxx sda USING (SELECT intermediate_result.id, intermediate_result.z, intermediate_result.d FROM read_intermediate_result('merge_into_XXX_4000148'::text, 'binary'::citus_copy_format) intermediate_result(id integer, z integer, d jsonb)) sdn ON ((sda.id OPERATOR(pg_catalog.=) sdn.id) AND (sda.id OPERATOR(pg_catalog.=) 2)) WHEN NOT MATCHED THEN INSERT (id, z) VALUES (sdn.id, 5)>
-DEBUG: <Deparsed MERGE query: MERGE INTO merge_schema.target_6785_xxxxxxx sda USING (SELECT intermediate_result.id, intermediate_result.z, intermediate_result.d FROM read_intermediate_result('merge_into_XXX_4000149'::text, 'binary'::citus_copy_format) intermediate_result(id integer, z integer, d jsonb)) sdn ON ((sda.id OPERATOR(pg_catalog.=) sdn.id) AND (sda.id OPERATOR(pg_catalog.=) 2)) WHEN NOT MATCHED THEN INSERT (id, z) VALUES (sdn.id, 5)>
-DEBUG: <Deparsed MERGE query: MERGE INTO merge_schema.target_6785_xxxxxxx sda USING (SELECT intermediate_result.id, intermediate_result.z, intermediate_result.d FROM read_intermediate_result('merge_into_XXX_4000150'::text, 'binary'::citus_copy_format) intermediate_result(id integer, z integer, d jsonb)) sdn ON ((sda.id OPERATOR(pg_catalog.=) sdn.id) AND (sda.id OPERATOR(pg_catalog.=) 2)) WHEN NOT MATCHED THEN INSERT (id, z) VALUES (sdn.id, 5)>
+DEBUG: <Deparsed MERGE query: MERGE INTO merge_schema.target_6785_xxxxxxx sda USING (SELECT intermediate_result.id, intermediate_result.z, intermediate_result.d FROM read_intermediate_result('merge_into_XXX_4000163'::text, 'binary'::citus_copy_format) intermediate_result(id integer, z integer, d jsonb)) sdn ON ((sda.id OPERATOR(pg_catalog.=) sdn.id) AND (sda.id OPERATOR(pg_catalog.=) 2)) WHEN NOT MATCHED THEN INSERT (id, z) VALUES (sdn.id, 5)>
+DEBUG: <Deparsed MERGE query: MERGE INTO merge_schema.target_6785_xxxxxxx sda USING (SELECT intermediate_result.id, intermediate_result.z, intermediate_result.d FROM read_intermediate_result('merge_into_XXX_4000164'::text, 'binary'::citus_copy_format) intermediate_result(id integer, z integer, d jsonb)) sdn ON ((sda.id OPERATOR(pg_catalog.=) sdn.id) AND (sda.id OPERATOR(pg_catalog.=) 2)) WHEN NOT MATCHED THEN INSERT (id, z) VALUES (sdn.id, 5)>
+DEBUG: <Deparsed MERGE query: MERGE INTO merge_schema.target_6785_xxxxxxx sda USING (SELECT intermediate_result.id, intermediate_result.z, intermediate_result.d FROM read_intermediate_result('merge_into_XXX_4000165'::text, 'binary'::citus_copy_format) intermediate_result(id integer, z integer, d jsonb)) sdn ON ((sda.id OPERATOR(pg_catalog.=) sdn.id) AND (sda.id OPERATOR(pg_catalog.=) 2)) WHEN NOT MATCHED THEN INSERT (id, z) VALUES (sdn.id, 5)>
+DEBUG: <Deparsed MERGE query: MERGE INTO merge_schema.target_6785_xxxxxxx sda USING (SELECT intermediate_result.id, intermediate_result.z, intermediate_result.d FROM read_intermediate_result('merge_into_XXX_4000166'::text, 'binary'::citus_copy_format) intermediate_result(id integer, z integer, d jsonb)) sdn ON ((sda.id OPERATOR(pg_catalog.=) sdn.id) AND (sda.id OPERATOR(pg_catalog.=) 2)) WHEN NOT MATCHED THEN INSERT (id, z) VALUES (sdn.id, 5)>
 DEBUG: Execute MERGE task list
 RESET client_min_messages;
 SELECT * FROM target_6785 ORDER BY 1;
@@ -3240,7 +3531,7 @@ USING s1 s
 ON t.id = s.id
 WHEN NOT MATCHED THEN
   INSERT (id) VALUES(s.val);
-ERROR: MERGE INSERT must use the source table distribution column value
+ERROR: MERGE INSERT must use the source's joining column for target's distribution column
 MERGE INTO t1 t
 USING s1 s
 ON t.id = s.id
@@ -3966,7 +4257,7 @@ CONTEXT: SQL statement "SELECT citus_drop_all_shards(v_obj.objid, v_obj.schema_
 PL/pgSQL function citus_drop_trigger() line XX at PERFORM
 DROP FUNCTION merge_when_and_write();
 DROP SCHEMA merge_schema CASCADE;
-NOTICE: drop cascades to 103 other objects
+NOTICE: drop cascades to 107 other objects
 DETAIL: drop cascades to function insert_data()
 drop cascades to table local_local
 drop cascades to table target
@@ -4026,6 +4317,10 @@ drop cascades to table pg_source
 drop cascades to table citus_target
 drop cascades to table citus_source
 drop cascades to function compare_tables()
+drop cascades to table source_pushdowntest
+drop cascades to table target_pushdowntest
+drop cascades to table source_withdata
+drop cascades to table target_table
 drop cascades to view pg_source_view
 drop cascades to view citus_source_view
 drop cascades to table pg_pa_target
@@ -4042,7 +4337,7 @@ drop cascades to table target_set
 drop cascades to table source_set
 drop cascades to table refsource_ref
 drop cascades to table pg_result
-drop cascades to table refsource_ref_4000112
+drop cascades to table refsource_ref_4000128
 drop cascades to table pg_ref
 drop cascades to table local_ref
 drop cascades to table reftarget_local
@@ -4060,11 +4355,7 @@ drop cascades to table source_6785
 drop cascades to table target_6785
 drop cascades to function add_s(integer,integer)
 drop cascades to table pg
-drop cascades to table t1_4000174
-drop cascades to table s1_4000175
+drop cascades to table t1_4000190
+drop cascades to table s1_4000191
 drop cascades to table t1
-drop cascades to table s1
-drop cascades to table dist_target
-drop cascades to table dist_source
-drop cascades to view show_tables
-and 3 other objects (see server log for list)
+and 7 other objects (see server log for list)
@@ -116,7 +116,8 @@ test: function_with_case_when
 test: clock
 
 # MERGE tests
-test: merge pgmerge merge_repartition2
+test: merge pgmerge
+test: merge_repartition2
 test: merge_repartition1 merge_schema_sharding
 test: merge_partition_tables
 
@@ -1206,6 +1206,139 @@ SET citus.log_remote_commands to false;
 SELECT compare_tables();
 ROLLBACK;
 
+
+-- let's create source and target table
+ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 13000;
+CREATE TABLE source_pushdowntest (id integer);
+CREATE TABLE target_pushdowntest (id integer );
+
+-- let's distribute both table on id field
+SELECT create_distributed_table('source_pushdowntest', 'id');
+SELECT create_distributed_table('target_pushdowntest', 'id');
+
+-- we are doing this operation on single node setup let's figure out colocation id of both tables
+-- both has same colocation id so both are colocated.
+WITH colocations AS (
+    SELECT colocationid
+    FROM pg_dist_partition
+    WHERE logicalrelid = 'source_pushdowntest'::regclass
+       OR logicalrelid = 'target_pushdowntest'::regclass
+)
+SELECT
+    CASE
+        WHEN COUNT(DISTINCT colocationid) = 1 THEN 'Same'
+        ELSE 'Different'
+    END AS colocation_status
+FROM colocations;
+
+SET client_min_messages TO DEBUG1;
+-- Test 1 : tables are colocated AND query is multisharded AND Join On distributed column : should push down to workers.
+
+EXPLAIN (costs off, timing off, summary off)
+MERGE INTO target_pushdowntest t
+USING source_pushdowntest s
+ON t.id = s.id
+WHEN NOT MATCHED THEN
+INSERT (id)
+VALUES (s.id);
+
+-- Test 2 : tables are colocated AND source query is not multisharded : should push down to worker.
+-- DEBUG LOGS show that query is getting pushed down
+MERGE INTO target_pushdowntest t
+USING (SELECT * from source_pushdowntest where id = 1) s
+on t.id = s.id
+WHEN NOT MATCHED THEN
+INSERT (id)
+VALUES (s.id);
+
+
+-- Test 3 : tables are colocated source query is single sharded but not using source distributed column in insertion. let's not pushdown.
+INSERT INTO source_pushdowntest (id) VALUES (3);
+
+EXPLAIN (costs off, timing off, summary off)
+MERGE INTO target_pushdowntest t
+USING (SELECT 1 as somekey, id from source_pushdowntest where id = 1) s
+on t.id = s.somekey
+WHEN NOT MATCHED THEN
+INSERT (id)
+VALUES (s.somekey);
+
+
+-- let's verify if we use some other column from source for value of distributed column in target.
+-- it should be inserted to correct shard of target.
+CREATE TABLE source_withdata (id integer, some_number integer);
+CREATE TABLE target_table (id integer, name text);
+SELECT create_distributed_table('source_withdata', 'id');
+SELECT create_distributed_table('target_table', 'id');
+
+INSERT INTO source_withdata (id, some_number) VALUES (1, 3);
+
+-- we will use some_number column from source_withdata to insert into distributed column of target.
+-- value of some_number is 3 let's verify what shard it should go to.
+select worker_hash(3);
+
+-- it should go to second shard of target as target has 4 shard and hash "-28094569" comes in range of second shard.
+MERGE INTO target_table t
+USING (SELECT id, some_number from source_withdata where id = 1) s
+on t.id = s.some_number
+WHEN NOT MATCHED THEN
+INSERT (id, name)
+VALUES (s.some_number, 'parag');
+
+-- let's verify if data inserted to second shard of target.
+EXPLAIN (analyze on, costs off, timing off, summary off) SELECT * FROM target_table;
+
+-- let's verify target data too.
+SELECT * FROM target_table;
+
+
+-- test UPDATE : when source is single sharded and table are colocated
+MERGE INTO target_table t
+USING (SELECT id, some_number from source_withdata where id = 1) s
+on t.id = s.some_number
+WHEN MATCHED THEN
+UPDATE SET name = 'parag jain';
+
+-- let's verify if data updated properly.
+SELECT * FROM target_table;
+
+-- let's see what happend when we try to update distributed key of target table
+MERGE INTO target_table t
+USING (SELECT id, some_number from source_withdata where id = 1) s
+on t.id = s.some_number
+WHEN MATCHED THEN
+UPDATE SET id = 1500;
+
+SELECT * FROM target_table;
+
+-- test DELETE : when source is single sharded and table are colocated
+MERGE INTO target_table t
+USING (SELECT id, some_number from source_withdata where id = 1) s
+on t.id = s.some_number
+WHEN MATCHED THEN
+DELETE;
+
+-- let's verify if data deleted properly.
+SELECT * FROM target_table;
+
+--
+DELETE FROM source_withdata;
+DELETE FROM target_table;
+INSERT INTO source VALUES (1,1);
+
+merge into target_table sda
+using source_withdata sdn
+on sda.id = sdn.id AND sda.id = 1
+when not matched then
+insert (id)
+values (10000);
+
+SELECT * FROM target_table WHERE id = 10000;
+
+RESET client_min_messages;
+
+
+
 -- This will prune shards with restriction information as NOT MATCHED is void
 BEGIN;
 SET citus.log_remote_commands to true;