mirror of https://github.com/citusdata/citus.git
Because we want to track PR numbers and to make backporting easy we (pretty much always) use squash-merges when merging to master. We accidentally used a rebase merge for PR #7620. This reverts those changes so we can redo the merge using squash merge. This reverts all commits frompull/7627/headeedb607c
to9e71750fc
.
parent
9e71750fcd
commit
fa4fc0b372
|
@ -182,6 +182,14 @@ CreateRouterMergePlan(Oid targetRelationId, Query *originalQuery, Query *query,
|
||||||
return distributedPlan;
|
return distributedPlan;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Var *insertVar =
|
||||||
|
FetchAndValidateInsertVarIfExists(targetRelationId, originalQuery);
|
||||||
|
if (insertVar &&
|
||||||
|
!IsDistributionColumnInMergeSource((Expr *) insertVar, originalQuery, true))
|
||||||
|
{
|
||||||
|
ereport(ERROR, (errmsg("MERGE INSERT must use the source table "
|
||||||
|
"distribution column value")));
|
||||||
|
}
|
||||||
|
|
||||||
Job *job = RouterJob(originalQuery, plannerRestrictionContext,
|
Job *job = RouterJob(originalQuery, plannerRestrictionContext,
|
||||||
&distributedPlan->planningError);
|
&distributedPlan->planningError);
|
||||||
|
@ -1116,27 +1124,6 @@ DeferErrorIfRoutableMergeNotSupported(Query *query, List *rangeTableList,
|
||||||
"repartitioning")));
|
"repartitioning")));
|
||||||
return deferredError;
|
return deferredError;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
|
||||||
* If execution has reached this point, it indicates that the query can be delegated to the worker.
|
|
||||||
* However, before proceeding with this delegation, we need to confirm that the user is utilizing
|
|
||||||
* the distribution column of the source table in the Insert variable.
|
|
||||||
* If this is not the case, we should refrain from pushing down the query.
|
|
||||||
* This is just a deffered error which will be handle by caller.
|
|
||||||
*/
|
|
||||||
|
|
||||||
Var *insertVar =
|
|
||||||
FetchAndValidateInsertVarIfExists(targetRelationId, query);
|
|
||||||
if (insertVar &&
|
|
||||||
!IsDistributionColumnInMergeSource((Expr *) insertVar, query, true))
|
|
||||||
{
|
|
||||||
ereport(DEBUG1, (errmsg(
|
|
||||||
"MERGE INSERT must use the source table distribution column value for push down to workers. Otherwise, repartitioning will be applied")));
|
|
||||||
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
|
|
||||||
"MERGE INSERT must use the source table distribution column value for push down to workers. Otherwise, repartitioning will be applied",
|
|
||||||
NULL, NULL);
|
|
||||||
}
|
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1128,7 +1128,7 @@ DO NOTHING
|
||||||
WHEN NOT MATCHED THEN
|
WHEN NOT MATCHED THEN
|
||||||
INSERT VALUES(rs_source.id);
|
INSERT VALUES(rs_source.id);
|
||||||
DEBUG: Creating MERGE router plan
|
DEBUG: Creating MERGE router plan
|
||||||
DEBUG: <Deparsed MERGE query: MERGE INTO merge_schema.rs_target_xxxxxxx rs_target USING (SELECT id.id FROM merge_schema.f_immutable(99) id(id) WHERE (id.id OPERATOR(pg_catalog.=) ANY (SELECT 99))) rs_source ON (rs_source.id OPERATOR(pg_catalog.=) rs_target.id) WHEN MATCHED THEN DO NOTHING WHEN NOT MATCHED THEN INSERT (id) VALUES (rs_source.id)>
|
DEBUG: <Deparsed MERGE query: MERGE INTO merge_schema.rs_target_xxxxxxx rs_target USING (SELECT id.id FROM merge_schema.f_immutable(99) id(id) WHERE (id.id OPERATOR(pg_catalog.=) ANY (SELECT 99))) rs_source ON (rs_source.id OPERATOR(pg_catalog.=) rs_target.id) WHEN MATCHED THEN DO NOTHING WHEN NOT MATCHED THEN INSERT (id) VALUES (rs_source.id)>
|
||||||
RESET client_min_messages;
|
RESET client_min_messages;
|
||||||
SELECT * INTO rs_local FROM rs_target ORDER BY 1 ;
|
SELECT * INTO rs_local FROM rs_target ORDER BY 1 ;
|
||||||
-- Should be equal
|
-- Should be equal
|
||||||
|
@ -1259,7 +1259,7 @@ DO NOTHING
|
||||||
WHEN NOT MATCHED THEN
|
WHEN NOT MATCHED THEN
|
||||||
INSERT VALUES(fn_source.id, fn_source.source);
|
INSERT VALUES(fn_source.id, fn_source.source);
|
||||||
DEBUG: Creating MERGE router plan
|
DEBUG: Creating MERGE router plan
|
||||||
DEBUG: <Deparsed MERGE query: MERGE INTO merge_schema.fn_target_xxxxxxx fn_target USING (SELECT dist_table.id, dist_table.source FROM merge_schema.dist_table_xxxxxxx dist_table) fn_source ON (fn_source.id OPERATOR(pg_catalog.=) fn_target.id) WHEN MATCHED THEN DO NOTHING WHEN NOT MATCHED THEN INSERT (id, data) VALUES (fn_source.id, fn_source.source)>
|
DEBUG: <Deparsed MERGE query: MERGE INTO merge_schema.fn_target_xxxxxxx fn_target USING (SELECT dist_table.id, dist_table.source FROM merge_schema.dist_table_xxxxxxx dist_table) fn_source ON (fn_source.id OPERATOR(pg_catalog.=) fn_target.id) WHEN MATCHED THEN DO NOTHING WHEN NOT MATCHED THEN INSERT (id, data) VALUES (fn_source.id, fn_source.source)>
|
||||||
RESET client_min_messages;
|
RESET client_min_messages;
|
||||||
SELECT * INTO fn_local FROM fn_target ORDER BY 1 ;
|
SELECT * INTO fn_local FROM fn_target ORDER BY 1 ;
|
||||||
-- Should be equal
|
-- Should be equal
|
||||||
|
@ -1552,7 +1552,7 @@ BEGIN;
|
||||||
SET citus.log_remote_commands to true;
|
SET citus.log_remote_commands to true;
|
||||||
SET client_min_messages TO DEBUG1;
|
SET client_min_messages TO DEBUG1;
|
||||||
EXECUTE merge_prepare(2);
|
EXECUTE merge_prepare(2);
|
||||||
DEBUG: <Deparsed MERGE query: MERGE INTO merge_schema.target_cj_xxxxxxx target USING (SELECT source_cj1.sid1, source_cj1.src1, source_cj1.val1 FROM merge_schema.source_cj1_xxxxxxx source_cj1) sub ON ((target.tid OPERATOR(pg_catalog.=) sub.sid1) AND (target.tid OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = sub.val1 WHEN NOT MATCHED THEN DO NOTHING>
|
DEBUG: <Deparsed MERGE query: MERGE INTO merge_schema.target_cj_xxxxxxx target USING (SELECT source_cj1.sid1, source_cj1.src1, source_cj1.val1 FROM merge_schema.source_cj1_xxxxxxx source_cj1) sub ON ((target.tid OPERATOR(pg_catalog.=) sub.sid1) AND (target.tid OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = sub.val1 WHEN NOT MATCHED THEN DO NOTHING >
|
||||||
DEBUG: Creating MERGE router plan
|
DEBUG: Creating MERGE router plan
|
||||||
NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx');
|
NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx');
|
||||||
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||||
|
@ -1782,13 +1782,13 @@ NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_
|
||||||
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||||
NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx');
|
NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx');
|
||||||
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||||
NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON (t.id OPERATOR(pg_catalog.=) s.id) WHEN MATCHED THEN DO NOTHING WHEN NOT MATCHED AND (s.id OPERATOR(pg_catalog.<) 100) THEN INSERT (id, val) VALUES (s.id, s.val)
|
NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON (t.id OPERATOR(pg_catalog.=) s.id) WHEN MATCHED THEN DO NOTHING WHEN NOT MATCHED AND (s.id OPERATOR(pg_catalog.<) 100) THEN INSERT (id, val) VALUES (s.id, s.val)
|
||||||
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||||
NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON (t.id OPERATOR(pg_catalog.=) s.id) WHEN MATCHED THEN DO NOTHING WHEN NOT MATCHED AND (s.id OPERATOR(pg_catalog.<) 100) THEN INSERT (id, val) VALUES (s.id, s.val)
|
NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON (t.id OPERATOR(pg_catalog.=) s.id) WHEN MATCHED THEN DO NOTHING WHEN NOT MATCHED AND (s.id OPERATOR(pg_catalog.<) 100) THEN INSERT (id, val) VALUES (s.id, s.val)
|
||||||
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||||
NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON (t.id OPERATOR(pg_catalog.=) s.id) WHEN MATCHED THEN DO NOTHING WHEN NOT MATCHED AND (s.id OPERATOR(pg_catalog.<) 100) THEN INSERT (id, val) VALUES (s.id, s.val)
|
NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON (t.id OPERATOR(pg_catalog.=) s.id) WHEN MATCHED THEN DO NOTHING WHEN NOT MATCHED AND (s.id OPERATOR(pg_catalog.<) 100) THEN INSERT (id, val) VALUES (s.id, s.val)
|
||||||
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||||
NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON (t.id OPERATOR(pg_catalog.=) s.id) WHEN MATCHED THEN DO NOTHING WHEN NOT MATCHED AND (s.id OPERATOR(pg_catalog.<) 100) THEN INSERT (id, val) VALUES (s.id, s.val)
|
NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON (t.id OPERATOR(pg_catalog.=) s.id) WHEN MATCHED THEN DO NOTHING WHEN NOT MATCHED AND (s.id OPERATOR(pg_catalog.<) 100) THEN INSERT (id, val) VALUES (s.id, s.val)
|
||||||
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||||
SET citus.log_remote_commands to false;
|
SET citus.log_remote_commands to false;
|
||||||
SELECT compare_tables();
|
SELECT compare_tables();
|
||||||
|
@ -1842,297 +1842,6 @@ SELECT compare_tables();
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
ROLLBACK;
|
ROLLBACK;
|
||||||
-- let's create source and target table
|
|
||||||
ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 13000;
|
|
||||||
CREATE TABLE source_pushdowntest (id integer);
|
|
||||||
CREATE TABLE target_pushdowntest (id integer );
|
|
||||||
-- let's distribute both table on id field
|
|
||||||
SELECT create_distributed_table('source_pushdowntest', 'id');
|
|
||||||
create_distributed_table
|
|
||||||
---------------------------------------------------------------------
|
|
||||||
|
|
||||||
(1 row)
|
|
||||||
|
|
||||||
SELECT create_distributed_table('target_pushdowntest', 'id');
|
|
||||||
create_distributed_table
|
|
||||||
---------------------------------------------------------------------
|
|
||||||
|
|
||||||
(1 row)
|
|
||||||
|
|
||||||
-- we are doing this operation on single node setup let's figure out colocation id of both tables
|
|
||||||
-- both has same colocation id so both are colocated.
|
|
||||||
WITH colocations AS (
|
|
||||||
SELECT colocationid
|
|
||||||
FROM pg_dist_partition
|
|
||||||
WHERE logicalrelid = 'source_pushdowntest'::regclass
|
|
||||||
OR logicalrelid = 'target_pushdowntest'::regclass
|
|
||||||
)
|
|
||||||
SELECT
|
|
||||||
CASE
|
|
||||||
WHEN COUNT(DISTINCT colocationid) = 1 THEN 'Same'
|
|
||||||
ELSE 'Different'
|
|
||||||
END AS colocation_status
|
|
||||||
FROM colocations;
|
|
||||||
colocation_status
|
|
||||||
---------------------------------------------------------------------
|
|
||||||
Same
|
|
||||||
(1 row)
|
|
||||||
|
|
||||||
SET client_min_messages TO DEBUG1;
|
|
||||||
-- Test 1 : tables are colocated AND query is multisharded AND Join On distributed column : should push down to workers.
|
|
||||||
EXPLAIN (costs off, timing off, summary off)
|
|
||||||
MERGE INTO target_pushdowntest t
|
|
||||||
USING source_pushdowntest s
|
|
||||||
ON t.id = s.id
|
|
||||||
WHEN NOT MATCHED THEN
|
|
||||||
INSERT (id)
|
|
||||||
VALUES (s.id);
|
|
||||||
DEBUG: <Deparsed MERGE query: MERGE INTO merge_schema.target_pushdowntest_xxxxxxx t USING merge_schema.source_pushdowntest_xxxxxxx s ON (t.id OPERATOR(pg_catalog.=) s.id) WHEN NOT MATCHED THEN INSERT (id) VALUES (s.id)>
|
|
||||||
DEBUG: <Deparsed MERGE query: MERGE INTO merge_schema.target_pushdowntest_xxxxxxx t USING merge_schema.source_pushdowntest_xxxxxxx s ON (t.id OPERATOR(pg_catalog.=) s.id) WHEN NOT MATCHED THEN INSERT (id) VALUES (s.id)>
|
|
||||||
DEBUG: <Deparsed MERGE query: MERGE INTO merge_schema.target_pushdowntest_xxxxxxx t USING merge_schema.source_pushdowntest_xxxxxxx s ON (t.id OPERATOR(pg_catalog.=) s.id) WHEN NOT MATCHED THEN INSERT (id) VALUES (s.id)>
|
|
||||||
DEBUG: <Deparsed MERGE query: MERGE INTO merge_schema.target_pushdowntest_xxxxxxx t USING merge_schema.source_pushdowntest_xxxxxxx s ON (t.id OPERATOR(pg_catalog.=) s.id) WHEN NOT MATCHED THEN INSERT (id) VALUES (s.id)>
|
|
||||||
DEBUG: Creating MERGE router plan
|
|
||||||
QUERY PLAN
|
|
||||||
---------------------------------------------------------------------
|
|
||||||
Custom Scan (Citus Adaptive)
|
|
||||||
Task Count: 4
|
|
||||||
Tasks Shown: All
|
|
||||||
-> Task
|
|
||||||
Node: host=localhost port=xxxxx dbname=regression
|
|
||||||
-> Merge on target_pushdowntest_4000068 t
|
|
||||||
-> Merge Left Join
|
|
||||||
Merge Cond: (s.id = t.id)
|
|
||||||
-> Sort
|
|
||||||
Sort Key: s.id
|
|
||||||
-> Seq Scan on source_pushdowntest_4000064 s
|
|
||||||
-> Sort
|
|
||||||
Sort Key: t.id
|
|
||||||
-> Seq Scan on target_pushdowntest_4000068 t
|
|
||||||
-> Task
|
|
||||||
Node: host=localhost port=xxxxx dbname=regression
|
|
||||||
-> Merge on target_pushdowntest_4000069 t
|
|
||||||
-> Merge Left Join
|
|
||||||
Merge Cond: (s.id = t.id)
|
|
||||||
-> Sort
|
|
||||||
Sort Key: s.id
|
|
||||||
-> Seq Scan on source_pushdowntest_4000065 s
|
|
||||||
-> Sort
|
|
||||||
Sort Key: t.id
|
|
||||||
-> Seq Scan on target_pushdowntest_4000069 t
|
|
||||||
-> Task
|
|
||||||
Node: host=localhost port=xxxxx dbname=regression
|
|
||||||
-> Merge on target_pushdowntest_4000070 t
|
|
||||||
-> Merge Left Join
|
|
||||||
Merge Cond: (s.id = t.id)
|
|
||||||
-> Sort
|
|
||||||
Sort Key: s.id
|
|
||||||
-> Seq Scan on source_pushdowntest_4000066 s
|
|
||||||
-> Sort
|
|
||||||
Sort Key: t.id
|
|
||||||
-> Seq Scan on target_pushdowntest_4000070 t
|
|
||||||
-> Task
|
|
||||||
Node: host=localhost port=xxxxx dbname=regression
|
|
||||||
-> Merge on target_pushdowntest_4000071 t
|
|
||||||
-> Merge Left Join
|
|
||||||
Merge Cond: (s.id = t.id)
|
|
||||||
-> Sort
|
|
||||||
Sort Key: s.id
|
|
||||||
-> Seq Scan on source_pushdowntest_4000067 s
|
|
||||||
-> Sort
|
|
||||||
Sort Key: t.id
|
|
||||||
-> Seq Scan on target_pushdowntest_4000071 t
|
|
||||||
(47 rows)
|
|
||||||
|
|
||||||
-- Test 2 : tables are colocated AND source query is not multisharded : should push down to worker.
|
|
||||||
-- DEBUG LOGS show that query is getting pushed down
|
|
||||||
MERGE INTO target_pushdowntest t
|
|
||||||
USING (SELECT * from source_pushdowntest where id = 1) s
|
|
||||||
on t.id = s.id
|
|
||||||
WHEN NOT MATCHED THEN
|
|
||||||
INSERT (id)
|
|
||||||
VALUES (s.id);
|
|
||||||
DEBUG: <Deparsed MERGE query: MERGE INTO merge_schema.target_pushdowntest_xxxxxxx t USING (SELECT source_pushdowntest.id FROM merge_schema.source_pushdowntest_xxxxxxx source_pushdowntest WHERE (source_pushdowntest.id OPERATOR(pg_catalog.=) 1)) s ON (t.id OPERATOR(pg_catalog.=) s.id) WHEN NOT MATCHED THEN INSERT (id) VALUES (s.id)>
|
|
||||||
DEBUG: Creating MERGE router plan
|
|
||||||
-- Test 3 : tables are colocated source query is single sharded but not using source distributed column in insertion. let's not pushdown.
|
|
||||||
INSERT INTO source_pushdowntest (id) VALUES (3);
|
|
||||||
EXPLAIN (costs off, timing off, summary off)
|
|
||||||
MERGE INTO target_pushdowntest t
|
|
||||||
USING (SELECT 1 as somekey, id from source_pushdowntest where id = 1) s
|
|
||||||
on t.id = s.somekey
|
|
||||||
WHEN NOT MATCHED THEN
|
|
||||||
INSERT (id)
|
|
||||||
VALUES (s.somekey);
|
|
||||||
DEBUG: MERGE INSERT must use the source table distribution column value for push down to workers. Otherwise, repartitioning will be applied
|
|
||||||
DEBUG: MERGE INSERT must use the source table distribution column value for push down to workers. Otherwise, repartitioning will be applied
|
|
||||||
DEBUG: Creating MERGE repartition plan
|
|
||||||
DEBUG: Using column - index:0 from the source list to redistribute
|
|
||||||
QUERY PLAN
|
|
||||||
---------------------------------------------------------------------
|
|
||||||
Custom Scan (Citus MERGE INTO ...)
|
|
||||||
MERGE INTO target_pushdowntest method: pull to coordinator
|
|
||||||
-> Custom Scan (Citus Adaptive)
|
|
||||||
Task Count: 1
|
|
||||||
Tasks Shown: All
|
|
||||||
-> Task
|
|
||||||
Node: host=localhost port=xxxxx dbname=regression
|
|
||||||
-> Seq Scan on source_pushdowntest_4000064 source_pushdowntest
|
|
||||||
Filter: (id = 1)
|
|
||||||
(9 rows)
|
|
||||||
|
|
||||||
-- let's verify if we use some other column from source for value of distributed column in target.
|
|
||||||
-- it should be inserted to correct shard of target.
|
|
||||||
CREATE TABLE source_withdata (id integer, some_number integer);
|
|
||||||
CREATE TABLE target_table (id integer, name text);
|
|
||||||
SELECT create_distributed_table('source_withdata', 'id');
|
|
||||||
create_distributed_table
|
|
||||||
---------------------------------------------------------------------
|
|
||||||
|
|
||||||
(1 row)
|
|
||||||
|
|
||||||
SELECT create_distributed_table('target_table', 'id');
|
|
||||||
create_distributed_table
|
|
||||||
---------------------------------------------------------------------
|
|
||||||
|
|
||||||
(1 row)
|
|
||||||
|
|
||||||
INSERT INTO source_withdata (id, some_number) VALUES (1, 3);
|
|
||||||
-- we will use some_number column from source_withdata to insert into distributed column of target.
|
|
||||||
-- value of some_number is 3 let's verify what shard it should go to.
|
|
||||||
select worker_hash(3);
|
|
||||||
worker_hash
|
|
||||||
---------------------------------------------------------------------
|
|
||||||
-28094569
|
|
||||||
(1 row)
|
|
||||||
|
|
||||||
-- it should go to second shard of target as target has 4 shard and hash "-28094569" comes in range of second shard.
|
|
||||||
MERGE INTO target_table t
|
|
||||||
USING (SELECT id, some_number from source_withdata where id = 1) s
|
|
||||||
on t.id = s.some_number
|
|
||||||
WHEN NOT MATCHED THEN
|
|
||||||
INSERT (id, name)
|
|
||||||
VALUES (s.some_number, 'parag');
|
|
||||||
DEBUG: Sub-query is not pushable, try repartitioning
|
|
||||||
DEBUG: MERGE command is only supported when all distributed tables are co-located and joined on their distribution columns
|
|
||||||
DEBUG: Creating MERGE repartition plan
|
|
||||||
DEBUG: Using column - index:1 from the source list to redistribute
|
|
||||||
DEBUG: Collect source query results on coordinator
|
|
||||||
DEBUG: Create a MERGE task list that needs to be routed
|
|
||||||
DEBUG: <Deparsed MERGE query: MERGE INTO merge_schema.target_table_xxxxxxx t USING (SELECT intermediate_result.id, intermediate_result.some_number FROM read_intermediate_result('merge_into_XXX_4000076'::text, 'binary'::citus_copy_format) intermediate_result(id integer, some_number integer)) s ON (t.id OPERATOR(pg_catalog.=) s.some_number) WHEN NOT MATCHED THEN INSERT (id, name) VALUES (s.some_number, 'parag'::text)>
|
|
||||||
DEBUG: <Deparsed MERGE query: MERGE INTO merge_schema.target_table_xxxxxxx t USING (SELECT intermediate_result.id, intermediate_result.some_number FROM read_intermediate_result('merge_into_XXX_4000077'::text, 'binary'::citus_copy_format) intermediate_result(id integer, some_number integer)) s ON (t.id OPERATOR(pg_catalog.=) s.some_number) WHEN NOT MATCHED THEN INSERT (id, name) VALUES (s.some_number, 'parag'::text)>
|
|
||||||
DEBUG: <Deparsed MERGE query: MERGE INTO merge_schema.target_table_xxxxxxx t USING (SELECT intermediate_result.id, intermediate_result.some_number FROM read_intermediate_result('merge_into_XXX_4000078'::text, 'binary'::citus_copy_format) intermediate_result(id integer, some_number integer)) s ON (t.id OPERATOR(pg_catalog.=) s.some_number) WHEN NOT MATCHED THEN INSERT (id, name) VALUES (s.some_number, 'parag'::text)>
|
|
||||||
DEBUG: <Deparsed MERGE query: MERGE INTO merge_schema.target_table_xxxxxxx t USING (SELECT intermediate_result.id, intermediate_result.some_number FROM read_intermediate_result('merge_into_XXX_4000079'::text, 'binary'::citus_copy_format) intermediate_result(id integer, some_number integer)) s ON (t.id OPERATOR(pg_catalog.=) s.some_number) WHEN NOT MATCHED THEN INSERT (id, name) VALUES (s.some_number, 'parag'::text)>
|
|
||||||
DEBUG: Execute MERGE task list
|
|
||||||
-- let's verify if data inserted to second shard of target.
|
|
||||||
EXPLAIN (analyze on, costs off, timing off, summary off) SELECT * FROM target_table;
|
|
||||||
QUERY PLAN
|
|
||||||
---------------------------------------------------------------------
|
|
||||||
Custom Scan (Citus Adaptive) (actual rows=1 loops=1)
|
|
||||||
Task Count: 4
|
|
||||||
Tuple data received from nodes: 9 bytes
|
|
||||||
Tasks Shown: All
|
|
||||||
-> Task
|
|
||||||
Tuple data received from node: 0 bytes
|
|
||||||
Node: host=localhost port=xxxxx dbname=regression
|
|
||||||
-> Seq Scan on target_table_4000076 target_table (actual rows=0 loops=1)
|
|
||||||
-> Task
|
|
||||||
Tuple data received from node: 9 bytes
|
|
||||||
Node: host=localhost port=xxxxx dbname=regression
|
|
||||||
-> Seq Scan on target_table_4000077 target_table (actual rows=1 loops=1)
|
|
||||||
-> Task
|
|
||||||
Tuple data received from node: 0 bytes
|
|
||||||
Node: host=localhost port=xxxxx dbname=regression
|
|
||||||
-> Seq Scan on target_table_4000078 target_table (actual rows=0 loops=1)
|
|
||||||
-> Task
|
|
||||||
Tuple data received from node: 0 bytes
|
|
||||||
Node: host=localhost port=xxxxx dbname=regression
|
|
||||||
-> Seq Scan on target_table_4000079 target_table (actual rows=0 loops=1)
|
|
||||||
(20 rows)
|
|
||||||
|
|
||||||
-- let's verify target data too.
|
|
||||||
SELECT * FROM target_table;
|
|
||||||
id | name
|
|
||||||
---------------------------------------------------------------------
|
|
||||||
3 | parag
|
|
||||||
(1 row)
|
|
||||||
|
|
||||||
-- test UPDATE : when source is single sharded and table are colocated
|
|
||||||
MERGE INTO target_table t
|
|
||||||
USING (SELECT id, some_number from source_withdata where id = 1) s
|
|
||||||
on t.id = s.some_number
|
|
||||||
WHEN MATCHED THEN
|
|
||||||
UPDATE SET name = 'parag jain';
|
|
||||||
DEBUG: Sub-query is not pushable, try repartitioning
|
|
||||||
DEBUG: MERGE command is only supported when all distributed tables are co-located and joined on their distribution columns
|
|
||||||
DEBUG: Creating MERGE repartition plan
|
|
||||||
DEBUG: Using column - index:1 from the source list to redistribute
|
|
||||||
DEBUG: Collect source query results on coordinator
|
|
||||||
DEBUG: Create a MERGE task list that needs to be routed
|
|
||||||
DEBUG: <Deparsed MERGE query: MERGE INTO merge_schema.target_table_xxxxxxx t USING (SELECT intermediate_result.id, intermediate_result.some_number FROM read_intermediate_result('merge_into_XXX_4000076'::text, 'binary'::citus_copy_format) intermediate_result(id integer, some_number integer)) s ON (t.id OPERATOR(pg_catalog.=) s.some_number) WHEN MATCHED THEN UPDATE SET name = 'parag jain'::text>
|
|
||||||
DEBUG: <Deparsed MERGE query: MERGE INTO merge_schema.target_table_xxxxxxx t USING (SELECT intermediate_result.id, intermediate_result.some_number FROM read_intermediate_result('merge_into_XXX_4000077'::text, 'binary'::citus_copy_format) intermediate_result(id integer, some_number integer)) s ON (t.id OPERATOR(pg_catalog.=) s.some_number) WHEN MATCHED THEN UPDATE SET name = 'parag jain'::text>
|
|
||||||
DEBUG: <Deparsed MERGE query: MERGE INTO merge_schema.target_table_xxxxxxx t USING (SELECT intermediate_result.id, intermediate_result.some_number FROM read_intermediate_result('merge_into_XXX_4000078'::text, 'binary'::citus_copy_format) intermediate_result(id integer, some_number integer)) s ON (t.id OPERATOR(pg_catalog.=) s.some_number) WHEN MATCHED THEN UPDATE SET name = 'parag jain'::text>
|
|
||||||
DEBUG: <Deparsed MERGE query: MERGE INTO merge_schema.target_table_xxxxxxx t USING (SELECT intermediate_result.id, intermediate_result.some_number FROM read_intermediate_result('merge_into_XXX_4000079'::text, 'binary'::citus_copy_format) intermediate_result(id integer, some_number integer)) s ON (t.id OPERATOR(pg_catalog.=) s.some_number) WHEN MATCHED THEN UPDATE SET name = 'parag jain'::text>
|
|
||||||
DEBUG: Execute MERGE task list
|
|
||||||
-- let's verify if data updated properly.
|
|
||||||
SELECT * FROM target_table;
|
|
||||||
id | name
|
|
||||||
---------------------------------------------------------------------
|
|
||||||
3 | parag jain
|
|
||||||
(1 row)
|
|
||||||
|
|
||||||
-- let's see what happend when we try to update distributed key of target table
|
|
||||||
MERGE INTO target_table t
|
|
||||||
USING (SELECT id, some_number from source_withdata where id = 1) s
|
|
||||||
on t.id = s.some_number
|
|
||||||
WHEN MATCHED THEN
|
|
||||||
UPDATE SET id = 1500;
|
|
||||||
ERROR: updating the distribution column is not allowed in MERGE actions
|
|
||||||
SELECT * FROM target_table;
|
|
||||||
id | name
|
|
||||||
---------------------------------------------------------------------
|
|
||||||
3 | parag jain
|
|
||||||
(1 row)
|
|
||||||
|
|
||||||
-- test DELETE : when source is single sharded and table are colocated
|
|
||||||
MERGE INTO target_table t
|
|
||||||
USING (SELECT id, some_number from source_withdata where id = 1) s
|
|
||||||
on t.id = s.some_number
|
|
||||||
WHEN MATCHED THEN
|
|
||||||
DELETE;
|
|
||||||
DEBUG: Sub-query is not pushable, try repartitioning
|
|
||||||
DEBUG: MERGE command is only supported when all distributed tables are co-located and joined on their distribution columns
|
|
||||||
DEBUG: Creating MERGE repartition plan
|
|
||||||
DEBUG: Using column - index:1 from the source list to redistribute
|
|
||||||
DEBUG: Collect source query results on coordinator
|
|
||||||
DEBUG: Create a MERGE task list that needs to be routed
|
|
||||||
DEBUG: <Deparsed MERGE query: MERGE INTO merge_schema.target_table_xxxxxxx t USING (SELECT intermediate_result.id, intermediate_result.some_number FROM read_intermediate_result('merge_into_XXX_4000076'::text, 'binary'::citus_copy_format) intermediate_result(id integer, some_number integer)) s ON (t.id OPERATOR(pg_catalog.=) s.some_number) WHEN MATCHED THEN DELETE>
|
|
||||||
DEBUG: <Deparsed MERGE query: MERGE INTO merge_schema.target_table_xxxxxxx t USING (SELECT intermediate_result.id, intermediate_result.some_number FROM read_intermediate_result('merge_into_XXX_4000077'::text, 'binary'::citus_copy_format) intermediate_result(id integer, some_number integer)) s ON (t.id OPERATOR(pg_catalog.=) s.some_number) WHEN MATCHED THEN DELETE>
|
|
||||||
DEBUG: <Deparsed MERGE query: MERGE INTO merge_schema.target_table_xxxxxxx t USING (SELECT intermediate_result.id, intermediate_result.some_number FROM read_intermediate_result('merge_into_XXX_4000078'::text, 'binary'::citus_copy_format) intermediate_result(id integer, some_number integer)) s ON (t.id OPERATOR(pg_catalog.=) s.some_number) WHEN MATCHED THEN DELETE>
|
|
||||||
DEBUG: <Deparsed MERGE query: MERGE INTO merge_schema.target_table_xxxxxxx t USING (SELECT intermediate_result.id, intermediate_result.some_number FROM read_intermediate_result('merge_into_XXX_4000079'::text, 'binary'::citus_copy_format) intermediate_result(id integer, some_number integer)) s ON (t.id OPERATOR(pg_catalog.=) s.some_number) WHEN MATCHED THEN DELETE>
|
|
||||||
DEBUG: Execute MERGE task list
|
|
||||||
-- let's verify if data deleted properly.
|
|
||||||
SELECT * FROM target_table;
|
|
||||||
id | name
|
|
||||||
---------------------------------------------------------------------
|
|
||||||
(0 rows)
|
|
||||||
|
|
||||||
--
|
|
||||||
DELETE FROM source_withdata;
|
|
||||||
DELETE FROM target_table;
|
|
||||||
INSERT INTO source VALUES (1,1);
|
|
||||||
merge into target_table sda
|
|
||||||
using source_withdata sdn
|
|
||||||
on sda.id = sdn.id AND sda.id = 1
|
|
||||||
when not matched then
|
|
||||||
insert (id)
|
|
||||||
values (10000);
|
|
||||||
ERROR: MERGE INSERT is using unsupported expression type for distribution column
|
|
||||||
DETAIL: Inserting arbitrary values that don't correspond to the joined column values can lead to unpredictable outcomes where rows are incorrectly distributed among different shards
|
|
||||||
SELECT * FROM target_table WHERE id = 10000;
|
|
||||||
id | name
|
|
||||||
---------------------------------------------------------------------
|
|
||||||
(0 rows)
|
|
||||||
|
|
||||||
RESET client_min_messages;
|
|
||||||
-- This will prune shards with restriction information as NOT MATCHED is void
|
-- This will prune shards with restriction information as NOT MATCHED is void
|
||||||
BEGIN;
|
BEGIN;
|
||||||
SET citus.log_remote_commands to true;
|
SET citus.log_remote_commands to true;
|
||||||
|
@ -3189,14 +2898,14 @@ WHEN NOT MATCHED THEN
|
||||||
-> Limit
|
-> Limit
|
||||||
-> Sort
|
-> Sort
|
||||||
Sort Key: id2
|
Sort Key: id2
|
||||||
-> Seq Scan on demo_source_table_4000151 demo_source_table
|
-> Seq Scan on demo_source_table_4000135 demo_source_table
|
||||||
-> Distributed Subplan XXX_2
|
-> Distributed Subplan XXX_2
|
||||||
-> Custom Scan (Citus Adaptive)
|
-> Custom Scan (Citus Adaptive)
|
||||||
Task Count: 4
|
Task Count: 4
|
||||||
Tasks Shown: One of 4
|
Tasks Shown: One of 4
|
||||||
-> Task
|
-> Task
|
||||||
Node: host=localhost port=xxxxx dbname=regression
|
Node: host=localhost port=xxxxx dbname=regression
|
||||||
-> Seq Scan on demo_source_table_4000151 demo_source_table
|
-> Seq Scan on demo_source_table_4000135 demo_source_table
|
||||||
Task Count: 1
|
Task Count: 1
|
||||||
Tasks Shown: All
|
Tasks Shown: All
|
||||||
-> Task
|
-> Task
|
||||||
|
@ -3410,10 +3119,10 @@ DEBUG: Creating MERGE repartition plan
|
||||||
DEBUG: Using column - index:0 from the source list to redistribute
|
DEBUG: Using column - index:0 from the source list to redistribute
|
||||||
DEBUG: Collect source query results on coordinator
|
DEBUG: Collect source query results on coordinator
|
||||||
DEBUG: Create a MERGE task list that needs to be routed
|
DEBUG: Create a MERGE task list that needs to be routed
|
||||||
DEBUG: <Deparsed MERGE query: MERGE INTO merge_schema.target_6785_xxxxxxx sda USING (SELECT intermediate_result.id, intermediate_result.z, intermediate_result.d FROM read_intermediate_result('merge_into_XXX_4000163'::text, 'binary'::citus_copy_format) intermediate_result(id integer, z integer, d jsonb)) sdn ON ((sda.id OPERATOR(pg_catalog.=) sdn.id) AND (sda.id OPERATOR(pg_catalog.=) 2)) WHEN NOT MATCHED THEN INSERT (id, z) VALUES (sdn.id, 5)>
|
DEBUG: <Deparsed MERGE query: MERGE INTO merge_schema.target_6785_xxxxxxx sda USING (SELECT intermediate_result.id, intermediate_result.z, intermediate_result.d FROM read_intermediate_result('merge_into_XXX_4000147'::text, 'binary'::citus_copy_format) intermediate_result(id integer, z integer, d jsonb)) sdn ON ((sda.id OPERATOR(pg_catalog.=) sdn.id) AND (sda.id OPERATOR(pg_catalog.=) 2)) WHEN NOT MATCHED THEN INSERT (id, z) VALUES (sdn.id, 5)>
|
||||||
DEBUG: <Deparsed MERGE query: MERGE INTO merge_schema.target_6785_xxxxxxx sda USING (SELECT intermediate_result.id, intermediate_result.z, intermediate_result.d FROM read_intermediate_result('merge_into_XXX_4000164'::text, 'binary'::citus_copy_format) intermediate_result(id integer, z integer, d jsonb)) sdn ON ((sda.id OPERATOR(pg_catalog.=) sdn.id) AND (sda.id OPERATOR(pg_catalog.=) 2)) WHEN NOT MATCHED THEN INSERT (id, z) VALUES (sdn.id, 5)>
|
DEBUG: <Deparsed MERGE query: MERGE INTO merge_schema.target_6785_xxxxxxx sda USING (SELECT intermediate_result.id, intermediate_result.z, intermediate_result.d FROM read_intermediate_result('merge_into_XXX_4000148'::text, 'binary'::citus_copy_format) intermediate_result(id integer, z integer, d jsonb)) sdn ON ((sda.id OPERATOR(pg_catalog.=) sdn.id) AND (sda.id OPERATOR(pg_catalog.=) 2)) WHEN NOT MATCHED THEN INSERT (id, z) VALUES (sdn.id, 5)>
|
||||||
DEBUG: <Deparsed MERGE query: MERGE INTO merge_schema.target_6785_xxxxxxx sda USING (SELECT intermediate_result.id, intermediate_result.z, intermediate_result.d FROM read_intermediate_result('merge_into_XXX_4000165'::text, 'binary'::citus_copy_format) intermediate_result(id integer, z integer, d jsonb)) sdn ON ((sda.id OPERATOR(pg_catalog.=) sdn.id) AND (sda.id OPERATOR(pg_catalog.=) 2)) WHEN NOT MATCHED THEN INSERT (id, z) VALUES (sdn.id, 5)>
|
DEBUG: <Deparsed MERGE query: MERGE INTO merge_schema.target_6785_xxxxxxx sda USING (SELECT intermediate_result.id, intermediate_result.z, intermediate_result.d FROM read_intermediate_result('merge_into_XXX_4000149'::text, 'binary'::citus_copy_format) intermediate_result(id integer, z integer, d jsonb)) sdn ON ((sda.id OPERATOR(pg_catalog.=) sdn.id) AND (sda.id OPERATOR(pg_catalog.=) 2)) WHEN NOT MATCHED THEN INSERT (id, z) VALUES (sdn.id, 5)>
|
||||||
DEBUG: <Deparsed MERGE query: MERGE INTO merge_schema.target_6785_xxxxxxx sda USING (SELECT intermediate_result.id, intermediate_result.z, intermediate_result.d FROM read_intermediate_result('merge_into_XXX_4000166'::text, 'binary'::citus_copy_format) intermediate_result(id integer, z integer, d jsonb)) sdn ON ((sda.id OPERATOR(pg_catalog.=) sdn.id) AND (sda.id OPERATOR(pg_catalog.=) 2)) WHEN NOT MATCHED THEN INSERT (id, z) VALUES (sdn.id, 5)>
|
DEBUG: <Deparsed MERGE query: MERGE INTO merge_schema.target_6785_xxxxxxx sda USING (SELECT intermediate_result.id, intermediate_result.z, intermediate_result.d FROM read_intermediate_result('merge_into_XXX_4000150'::text, 'binary'::citus_copy_format) intermediate_result(id integer, z integer, d jsonb)) sdn ON ((sda.id OPERATOR(pg_catalog.=) sdn.id) AND (sda.id OPERATOR(pg_catalog.=) 2)) WHEN NOT MATCHED THEN INSERT (id, z) VALUES (sdn.id, 5)>
|
||||||
DEBUG: Execute MERGE task list
|
DEBUG: Execute MERGE task list
|
||||||
RESET client_min_messages;
|
RESET client_min_messages;
|
||||||
SELECT * FROM target_6785 ORDER BY 1;
|
SELECT * FROM target_6785 ORDER BY 1;
|
||||||
|
@ -3531,7 +3240,7 @@ USING s1 s
|
||||||
ON t.id = s.id
|
ON t.id = s.id
|
||||||
WHEN NOT MATCHED THEN
|
WHEN NOT MATCHED THEN
|
||||||
INSERT (id) VALUES(s.val);
|
INSERT (id) VALUES(s.val);
|
||||||
ERROR: MERGE INSERT must use the source's joining column for target's distribution column
|
ERROR: MERGE INSERT must use the source table distribution column value
|
||||||
MERGE INTO t1 t
|
MERGE INTO t1 t
|
||||||
USING s1 s
|
USING s1 s
|
||||||
ON t.id = s.id
|
ON t.id = s.id
|
||||||
|
@ -4257,7 +3966,7 @@ CONTEXT: SQL statement "SELECT citus_drop_all_shards(v_obj.objid, v_obj.schema_
|
||||||
PL/pgSQL function citus_drop_trigger() line XX at PERFORM
|
PL/pgSQL function citus_drop_trigger() line XX at PERFORM
|
||||||
DROP FUNCTION merge_when_and_write();
|
DROP FUNCTION merge_when_and_write();
|
||||||
DROP SCHEMA merge_schema CASCADE;
|
DROP SCHEMA merge_schema CASCADE;
|
||||||
NOTICE: drop cascades to 107 other objects
|
NOTICE: drop cascades to 103 other objects
|
||||||
DETAIL: drop cascades to function insert_data()
|
DETAIL: drop cascades to function insert_data()
|
||||||
drop cascades to table local_local
|
drop cascades to table local_local
|
||||||
drop cascades to table target
|
drop cascades to table target
|
||||||
|
@ -4317,10 +4026,6 @@ drop cascades to table pg_source
|
||||||
drop cascades to table citus_target
|
drop cascades to table citus_target
|
||||||
drop cascades to table citus_source
|
drop cascades to table citus_source
|
||||||
drop cascades to function compare_tables()
|
drop cascades to function compare_tables()
|
||||||
drop cascades to table source_pushdowntest
|
|
||||||
drop cascades to table target_pushdowntest
|
|
||||||
drop cascades to table source_withdata
|
|
||||||
drop cascades to table target_table
|
|
||||||
drop cascades to view pg_source_view
|
drop cascades to view pg_source_view
|
||||||
drop cascades to view citus_source_view
|
drop cascades to view citus_source_view
|
||||||
drop cascades to table pg_pa_target
|
drop cascades to table pg_pa_target
|
||||||
|
@ -4337,7 +4042,7 @@ drop cascades to table target_set
|
||||||
drop cascades to table source_set
|
drop cascades to table source_set
|
||||||
drop cascades to table refsource_ref
|
drop cascades to table refsource_ref
|
||||||
drop cascades to table pg_result
|
drop cascades to table pg_result
|
||||||
drop cascades to table refsource_ref_4000128
|
drop cascades to table refsource_ref_4000112
|
||||||
drop cascades to table pg_ref
|
drop cascades to table pg_ref
|
||||||
drop cascades to table local_ref
|
drop cascades to table local_ref
|
||||||
drop cascades to table reftarget_local
|
drop cascades to table reftarget_local
|
||||||
|
@ -4355,7 +4060,11 @@ drop cascades to table source_6785
|
||||||
drop cascades to table target_6785
|
drop cascades to table target_6785
|
||||||
drop cascades to function add_s(integer,integer)
|
drop cascades to function add_s(integer,integer)
|
||||||
drop cascades to table pg
|
drop cascades to table pg
|
||||||
drop cascades to table t1_4000190
|
drop cascades to table t1_4000174
|
||||||
drop cascades to table s1_4000191
|
drop cascades to table s1_4000175
|
||||||
drop cascades to table t1
|
drop cascades to table t1
|
||||||
and 7 other objects (see server log for list)
|
drop cascades to table s1
|
||||||
|
drop cascades to table dist_target
|
||||||
|
drop cascades to table dist_source
|
||||||
|
drop cascades to view show_tables
|
||||||
|
and 3 other objects (see server log for list)
|
||||||
|
|
|
@ -116,8 +116,7 @@ test: function_with_case_when
|
||||||
test: clock
|
test: clock
|
||||||
|
|
||||||
# MERGE tests
|
# MERGE tests
|
||||||
test: merge pgmerge
|
test: merge pgmerge merge_repartition2
|
||||||
test: merge_repartition2
|
|
||||||
test: merge_repartition1 merge_schema_sharding
|
test: merge_repartition1 merge_schema_sharding
|
||||||
test: merge_partition_tables
|
test: merge_partition_tables
|
||||||
|
|
||||||
|
|
|
@ -1206,139 +1206,6 @@ SET citus.log_remote_commands to false;
|
||||||
SELECT compare_tables();
|
SELECT compare_tables();
|
||||||
ROLLBACK;
|
ROLLBACK;
|
||||||
|
|
||||||
|
|
||||||
-- let's create source and target table
|
|
||||||
ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 13000;
|
|
||||||
CREATE TABLE source_pushdowntest (id integer);
|
|
||||||
CREATE TABLE target_pushdowntest (id integer );
|
|
||||||
|
|
||||||
-- let's distribute both table on id field
|
|
||||||
SELECT create_distributed_table('source_pushdowntest', 'id');
|
|
||||||
SELECT create_distributed_table('target_pushdowntest', 'id');
|
|
||||||
|
|
||||||
-- we are doing this operation on single node setup let's figure out colocation id of both tables
|
|
||||||
-- both has same colocation id so both are colocated.
|
|
||||||
WITH colocations AS (
|
|
||||||
SELECT colocationid
|
|
||||||
FROM pg_dist_partition
|
|
||||||
WHERE logicalrelid = 'source_pushdowntest'::regclass
|
|
||||||
OR logicalrelid = 'target_pushdowntest'::regclass
|
|
||||||
)
|
|
||||||
SELECT
|
|
||||||
CASE
|
|
||||||
WHEN COUNT(DISTINCT colocationid) = 1 THEN 'Same'
|
|
||||||
ELSE 'Different'
|
|
||||||
END AS colocation_status
|
|
||||||
FROM colocations;
|
|
||||||
|
|
||||||
SET client_min_messages TO DEBUG1;
|
|
||||||
-- Test 1 : tables are colocated AND query is multisharded AND Join On distributed column : should push down to workers.
|
|
||||||
|
|
||||||
EXPLAIN (costs off, timing off, summary off)
|
|
||||||
MERGE INTO target_pushdowntest t
|
|
||||||
USING source_pushdowntest s
|
|
||||||
ON t.id = s.id
|
|
||||||
WHEN NOT MATCHED THEN
|
|
||||||
INSERT (id)
|
|
||||||
VALUES (s.id);
|
|
||||||
|
|
||||||
-- Test 2 : tables are colocated AND source query is not multisharded : should push down to worker.
|
|
||||||
-- DEBUG LOGS show that query is getting pushed down
|
|
||||||
MERGE INTO target_pushdowntest t
|
|
||||||
USING (SELECT * from source_pushdowntest where id = 1) s
|
|
||||||
on t.id = s.id
|
|
||||||
WHEN NOT MATCHED THEN
|
|
||||||
INSERT (id)
|
|
||||||
VALUES (s.id);
|
|
||||||
|
|
||||||
|
|
||||||
-- Test 3 : tables are colocated source query is single sharded but not using source distributed column in insertion. let's not pushdown.
|
|
||||||
INSERT INTO source_pushdowntest (id) VALUES (3);
|
|
||||||
|
|
||||||
EXPLAIN (costs off, timing off, summary off)
|
|
||||||
MERGE INTO target_pushdowntest t
|
|
||||||
USING (SELECT 1 as somekey, id from source_pushdowntest where id = 1) s
|
|
||||||
on t.id = s.somekey
|
|
||||||
WHEN NOT MATCHED THEN
|
|
||||||
INSERT (id)
|
|
||||||
VALUES (s.somekey);
|
|
||||||
|
|
||||||
|
|
||||||
-- let's verify if we use some other column from source for value of distributed column in target.
|
|
||||||
-- it should be inserted to correct shard of target.
|
|
||||||
CREATE TABLE source_withdata (id integer, some_number integer);
|
|
||||||
CREATE TABLE target_table (id integer, name text);
|
|
||||||
SELECT create_distributed_table('source_withdata', 'id');
|
|
||||||
SELECT create_distributed_table('target_table', 'id');
|
|
||||||
|
|
||||||
INSERT INTO source_withdata (id, some_number) VALUES (1, 3);
|
|
||||||
|
|
||||||
-- we will use some_number column from source_withdata to insert into distributed column of target.
|
|
||||||
-- value of some_number is 3 let's verify what shard it should go to.
|
|
||||||
select worker_hash(3);
|
|
||||||
|
|
||||||
-- it should go to second shard of target as target has 4 shard and hash "-28094569" comes in range of second shard.
|
|
||||||
MERGE INTO target_table t
|
|
||||||
USING (SELECT id, some_number from source_withdata where id = 1) s
|
|
||||||
on t.id = s.some_number
|
|
||||||
WHEN NOT MATCHED THEN
|
|
||||||
INSERT (id, name)
|
|
||||||
VALUES (s.some_number, 'parag');
|
|
||||||
|
|
||||||
-- let's verify if data inserted to second shard of target.
|
|
||||||
EXPLAIN (analyze on, costs off, timing off, summary off) SELECT * FROM target_table;
|
|
||||||
|
|
||||||
-- let's verify target data too.
|
|
||||||
SELECT * FROM target_table;
|
|
||||||
|
|
||||||
|
|
||||||
-- test UPDATE : when source is single sharded and table are colocated
|
|
||||||
MERGE INTO target_table t
|
|
||||||
USING (SELECT id, some_number from source_withdata where id = 1) s
|
|
||||||
on t.id = s.some_number
|
|
||||||
WHEN MATCHED THEN
|
|
||||||
UPDATE SET name = 'parag jain';
|
|
||||||
|
|
||||||
-- let's verify if data updated properly.
|
|
||||||
SELECT * FROM target_table;
|
|
||||||
|
|
||||||
-- let's see what happend when we try to update distributed key of target table
|
|
||||||
MERGE INTO target_table t
|
|
||||||
USING (SELECT id, some_number from source_withdata where id = 1) s
|
|
||||||
on t.id = s.some_number
|
|
||||||
WHEN MATCHED THEN
|
|
||||||
UPDATE SET id = 1500;
|
|
||||||
|
|
||||||
SELECT * FROM target_table;
|
|
||||||
|
|
||||||
-- test DELETE : when source is single sharded and table are colocated
|
|
||||||
MERGE INTO target_table t
|
|
||||||
USING (SELECT id, some_number from source_withdata where id = 1) s
|
|
||||||
on t.id = s.some_number
|
|
||||||
WHEN MATCHED THEN
|
|
||||||
DELETE;
|
|
||||||
|
|
||||||
-- let's verify if data deleted properly.
|
|
||||||
SELECT * FROM target_table;
|
|
||||||
|
|
||||||
--
|
|
||||||
DELETE FROM source_withdata;
|
|
||||||
DELETE FROM target_table;
|
|
||||||
INSERT INTO source VALUES (1,1);
|
|
||||||
|
|
||||||
merge into target_table sda
|
|
||||||
using source_withdata sdn
|
|
||||||
on sda.id = sdn.id AND sda.id = 1
|
|
||||||
when not matched then
|
|
||||||
insert (id)
|
|
||||||
values (10000);
|
|
||||||
|
|
||||||
SELECT * FROM target_table WHERE id = 10000;
|
|
||||||
|
|
||||||
RESET client_min_messages;
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
-- This will prune shards with restriction information as NOT MATCHED is void
|
-- This will prune shards with restriction information as NOT MATCHED is void
|
||||||
BEGIN;
|
BEGIN;
|
||||||
SET citus.log_remote_commands to true;
|
SET citus.log_remote_commands to true;
|
||||||
|
|
Loading…
Reference in New Issue