merge command fix

pull/7626/head
paragjain 2024-06-07 16:35:11 +00:00 committed by Teja Mupparti
parent 8c9de08b76
commit eedb607cd5
3 changed files with 314 additions and 30 deletions


@@ -182,14 +182,6 @@ CreateRouterMergePlan(Oid targetRelationId, Query *originalQuery, Query *query,
        return distributedPlan;
    }

    Var *insertVar =
        FetchAndValidateInsertVarIfExists(targetRelationId, originalQuery);
    if (insertVar &&
        !IsDistributionColumnInMergeSource((Expr *) insertVar, originalQuery, true))
    {
        ereport(ERROR, (errmsg("MERGE INSERT must use the source table "
                               "distribution column value")));
    }

    Job *job = RouterJob(originalQuery, plannerRestrictionContext,
                         &distributedPlan->planningError);
@@ -1124,6 +1116,26 @@ DeferErrorIfRoutableMergeNotSupported(Query *query, List *rangeTableList,
"repartitioning")));
return deferredError;
}
/*
* If execution has reached this point, it indicates that the query can be delegated to the worker.
* However, before proceeding with this delegation, we need to confirm that the user is utilizing
* the distribution column of the source table in the Insert variable.
* If this is not the case, we should refrain from pushing down the query.
*/
Var *insertVar =
FetchAndValidateInsertVarIfExists(targetRelationId, query);
if (insertVar &&
!IsDistributionColumnInMergeSource((Expr *) insertVar, query, true))
{
ereport(DEBUG1, (errmsg(
"MERGE INSERT must use the source table distribution column value, try repartitioning")));
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
"MERGE INSERT must use the source table "
"distribution column value", NULL, NULL);
}
return NULL;
}
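In practical terms, moving this check from a hard ERROR in CreateRouterMergePlan to a DeferredError in DeferErrorIfRoutableMergeNotSupported means a MERGE whose INSERT does not use the source distribution column is no longer rejected outright; planning should instead fall back to the repartitioning path. A minimal SQL sketch of the behavior change, assuming two colocated hash-distributed tables; the table and column names below are illustrative and not part of this patch:

-- Illustrative only: src and tgt are hypothetical colocated distributed tables.
CREATE TABLE src (id int, other_val int);
CREATE TABLE tgt (id int);
SELECT create_distributed_table('src', 'id');
SELECT create_distributed_table('tgt', 'id');
-- The INSERT takes its value from src.other_val rather than the source
-- distribution column src.id, so the MERGE is not routable. Previously this
-- raised "MERGE INSERT must use the source table distribution column value";
-- with this change the same statement should instead be planned via repartitioning.
MERGE INTO tgt t
USING (SELECT id, other_val FROM src WHERE id = 1) s
ON t.id = s.other_val
WHEN NOT MATCHED THEN
  INSERT (id) VALUES (s.other_val);

This mirrors the new source_withdata test added to the regression suite below.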


@@ -1842,6 +1842,208 @@ SELECT compare_tables();
(1 row)
ROLLBACK;
-- let's create the source and target tables
CREATE TABLE source_pushdowntest (id integer);
CREATE TABLE target_pushdowntest (id integer);
-- let's distribute both tables on the id field
SELECT create_distributed_table('source_pushdowntest', 'id');
create_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT create_distributed_table('target_pushdowntest', 'id');
create_distributed_table
---------------------------------------------------------------------
(1 row)
-- we are running this on a single-node setup; let's check the colocation id of both tables.
-- both have the same colocation id, so they are colocated.
select colocationid,logicalrelid from pg_dist_partition where logicalrelid = 'source_pushdowntest'::regclass OR logicalrelid = 'target_pushdowntest'::regclass;
colocationid | logicalrelid
---------------------------------------------------------------------
1390004 | source_pushdowntest
1390004 | target_pushdowntest
(2 rows)
SET client_min_messages TO DEBUG1;
-- Test 1 : tables are colocated AND query is multi-sharded AND join is on the distribution column : should push down to workers.
EXPLAIN MERGE INTO target_pushdowntest t
USING source_pushdowntest s
ON t.id = s.id
WHEN NOT MATCHED THEN
INSERT (id)
VALUES (s.id);
DEBUG: <Deparsed MERGE query: MERGE INTO merge_schema.target_pushdowntest_xxxxxxx t USING merge_schema.source_pushdowntest_xxxxxxx s ON (t.id OPERATOR(pg_catalog.=) s.id) WHEN NOT MATCHED THEN INSERT (id) VALUES (s.id)>
DEBUG: <Deparsed MERGE query: MERGE INTO merge_schema.target_pushdowntest_xxxxxxx t USING merge_schema.source_pushdowntest_xxxxxxx s ON (t.id OPERATOR(pg_catalog.=) s.id) WHEN NOT MATCHED THEN INSERT (id) VALUES (s.id)>
DEBUG: <Deparsed MERGE query: MERGE INTO merge_schema.target_pushdowntest_xxxxxxx t USING merge_schema.source_pushdowntest_xxxxxxx s ON (t.id OPERATOR(pg_catalog.=) s.id) WHEN NOT MATCHED THEN INSERT (id) VALUES (s.id)>
DEBUG: <Deparsed MERGE query: MERGE INTO merge_schema.target_pushdowntest_xxxxxxx t USING merge_schema.source_pushdowntest_xxxxxxx s ON (t.id OPERATOR(pg_catalog.=) s.id) WHEN NOT MATCHED THEN INSERT (id) VALUES (s.id)>
DEBUG: Creating MERGE router plan
QUERY PLAN
---------------------------------------------------------------------
Custom Scan (Citus Adaptive) (cost=0.00..0.00 rows=0 width=0)
Task Count: 4
Tasks Shown: All
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> Merge on target_pushdowntest_4000068 t (cost=359.57..860.00 rows=0 width=0)
-> Merge Left Join (cost=359.57..860.00 rows=32512 width=16)
Merge Cond: (s.id = t.id)
-> Sort (cost=179.78..186.16 rows=2550 width=10)
Sort Key: s.id
-> Seq Scan on source_pushdowntest_4000064 s (cost=0.00..35.50 rows=2550 width=10)
-> Sort (cost=179.78..186.16 rows=2550 width=10)
Sort Key: t.id
-> Seq Scan on target_pushdowntest_4000068 t (cost=0.00..35.50 rows=2550 width=10)
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> Merge on target_pushdowntest_4000069 t (cost=359.57..860.00 rows=0 width=0)
-> Merge Left Join (cost=359.57..860.00 rows=32512 width=16)
Merge Cond: (s.id = t.id)
-> Sort (cost=179.78..186.16 rows=2550 width=10)
Sort Key: s.id
-> Seq Scan on source_pushdowntest_4000065 s (cost=0.00..35.50 rows=2550 width=10)
-> Sort (cost=179.78..186.16 rows=2550 width=10)
Sort Key: t.id
-> Seq Scan on target_pushdowntest_4000069 t (cost=0.00..35.50 rows=2550 width=10)
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> Merge on target_pushdowntest_4000070 t (cost=359.57..860.00 rows=0 width=0)
-> Merge Left Join (cost=359.57..860.00 rows=32512 width=16)
Merge Cond: (s.id = t.id)
-> Sort (cost=179.78..186.16 rows=2550 width=10)
Sort Key: s.id
-> Seq Scan on source_pushdowntest_4000066 s (cost=0.00..35.50 rows=2550 width=10)
-> Sort (cost=179.78..186.16 rows=2550 width=10)
Sort Key: t.id
-> Seq Scan on target_pushdowntest_4000070 t (cost=0.00..35.50 rows=2550 width=10)
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> Merge on target_pushdowntest_4000071 t (cost=359.57..860.00 rows=0 width=0)
-> Merge Left Join (cost=359.57..860.00 rows=32512 width=16)
Merge Cond: (s.id = t.id)
-> Sort (cost=179.78..186.16 rows=2550 width=10)
Sort Key: s.id
-> Seq Scan on source_pushdowntest_4000067 s (cost=0.00..35.50 rows=2550 width=10)
-> Sort (cost=179.78..186.16 rows=2550 width=10)
Sort Key: t.id
-> Seq Scan on target_pushdowntest_4000071 t (cost=0.00..35.50 rows=2550 width=10)
(47 rows)
-- Test 2 : tables are colocated AND source query is not multi-sharded : should push down to the worker.
EXPLAIN MERGE INTO target_pushdowntest t
USING (SELECT * from source_pushdowntest where id = 1) s
on t.id = s.id
WHEN NOT MATCHED THEN
INSERT (id)
VALUES (s.id);
DEBUG: <Deparsed MERGE query: MERGE INTO merge_schema.target_pushdowntest_xxxxxxx t USING (SELECT source_pushdowntest.id FROM merge_schema.source_pushdowntest_xxxxxxx source_pushdowntest WHERE (source_pushdowntest.id OPERATOR(pg_catalog.=) 1)) s ON (t.id OPERATOR(pg_catalog.=) s.id) WHEN NOT MATCHED THEN INSERT (id) VALUES (s.id)>
DEBUG: Creating MERGE router plan
QUERY PLAN
---------------------------------------------------------------------
Custom Scan (Citus Adaptive) (cost=0.00..0.00 rows=0 width=0)
Task Count: 1
Tasks Shown: All
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> Merge on target_pushdowntest_4000068 t (cost=0.00..85.89 rows=0 width=0)
-> Nested Loop Left Join (cost=0.00..85.89 rows=169 width=16)
-> Seq Scan on source_pushdowntest_4000064 source_pushdowntest (cost=0.00..41.88 rows=13 width=10)
Filter: (id = 1)
-> Materialize (cost=0.00..41.94 rows=13 width=10)
-> Seq Scan on target_pushdowntest_4000068 t (cost=0.00..41.88 rows=13 width=10)
Filter: (id = 1)
(12 rows)
-- Test 3 : tables are colocated AND source query is single-sharded, but the INSERT does not use the source distribution column : should not push down.
INSERT INTO source_pushdowntest (id) VALUES (3);
EXPLAIN MERGE INTO target_pushdowntest t
USING (SELECT 1 as somekey, id from source_pushdowntest where id = 1) s
on t.id = s.somekey
WHEN NOT MATCHED THEN
INSERT (id)
VALUES (s.somekey);
DEBUG: MERGE INSERT must use the source table distribution column value, try repartitioning
DEBUG: MERGE INSERT must use the source table distribution column value
DEBUG: Creating MERGE repartition plan
DEBUG: Using column - index:0 from the source list to redistribute
QUERY PLAN
---------------------------------------------------------------------
Custom Scan (Citus MERGE INTO ...) (cost=0.00..0.00 rows=0 width=0)
MERGE INTO target_pushdowntest method: pull to coordinator
-> Custom Scan (Citus Adaptive) (cost=0.00..0.00 rows=0 width=0)
Task Count: 1
Tasks Shown: All
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> Seq Scan on source_pushdowntest_4000064 source_pushdowntest (cost=0.00..41.88 rows=13 width=8)
Filter: (id = 1)
(9 rows)
-- let's verify that when some other source column supplies the value of the target's distribution column,
-- the row is inserted into the correct shard of the target.
CREATE TABLE source_withdata (id integer, some_number integer);
SELECT create_distributed_table('source_withdata', 'id');
create_distributed_table
---------------------------------------------------------------------
(1 row)
INSERT INTO source_withdata (id, some_number) VALUES (1, 3);
-- we will use the some_number column from source_withdata to populate the target's distribution column.
-- the value of some_number is 3; let's check which shard it should go to.
select worker_hash(3);
worker_hash
---------------------------------------------------------------------
-28094569
(1 row)
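As an aside, one way to confirm which shard's hash range contains that value is to inspect the shard intervals in pg_dist_shard. A minimal sketch, assuming the standard Citus catalog columns for hash-distributed tables; the query is illustrative and not part of the test file:

-- Find the target shard whose hash range [shardminvalue, shardmaxvalue]
-- contains worker_hash(3) = -28094569.
SELECT shardid, shardminvalue, shardmaxvalue
FROM pg_dist_shard
WHERE logicalrelid = 'target_pushdowntest'::regclass
  AND (-28094569) BETWEEN shardminvalue::integer AND shardmaxvalue::integer;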
-- it should go to the second shard of the target, since the target has 4 shards and hash "-28094569" falls in the second shard's range.
MERGE INTO target_pushdowntest t
USING (SELECT id, some_number from source_withdata where id = 1) s
on t.id = s.some_number
WHEN NOT MATCHED THEN
INSERT (id)
VALUES (s.some_number);
DEBUG: Sub-query is not pushable, try repartitioning
DEBUG: MERGE command is only supported when all distributed tables are co-located and joined on their distribution columns
DEBUG: Creating MERGE repartition plan
DEBUG: Using column - index:1 from the source list to redistribute
DEBUG: Collect source query results on coordinator
DEBUG: Create a MERGE task list that needs to be routed
DEBUG: <Deparsed MERGE query: MERGE INTO merge_schema.target_pushdowntest_xxxxxxx t USING (SELECT intermediate_result.id, intermediate_result.some_number FROM read_intermediate_result('merge_into_XXX_4000068'::text, 'binary'::citus_copy_format) intermediate_result(id integer, some_number integer)) s ON (t.id OPERATOR(pg_catalog.=) s.some_number) WHEN NOT MATCHED THEN INSERT (id) VALUES (s.some_number)>
DEBUG: <Deparsed MERGE query: MERGE INTO merge_schema.target_pushdowntest_xxxxxxx t USING (SELECT intermediate_result.id, intermediate_result.some_number FROM read_intermediate_result('merge_into_XXX_4000069'::text, 'binary'::citus_copy_format) intermediate_result(id integer, some_number integer)) s ON (t.id OPERATOR(pg_catalog.=) s.some_number) WHEN NOT MATCHED THEN INSERT (id) VALUES (s.some_number)>
DEBUG: <Deparsed MERGE query: MERGE INTO merge_schema.target_pushdowntest_xxxxxxx t USING (SELECT intermediate_result.id, intermediate_result.some_number FROM read_intermediate_result('merge_into_XXX_4000070'::text, 'binary'::citus_copy_format) intermediate_result(id integer, some_number integer)) s ON (t.id OPERATOR(pg_catalog.=) s.some_number) WHEN NOT MATCHED THEN INSERT (id) VALUES (s.some_number)>
DEBUG: <Deparsed MERGE query: MERGE INTO merge_schema.target_pushdowntest_xxxxxxx t USING (SELECT intermediate_result.id, intermediate_result.some_number FROM read_intermediate_result('merge_into_XXX_4000071'::text, 'binary'::citus_copy_format) intermediate_result(id integer, some_number integer)) s ON (t.id OPERATOR(pg_catalog.=) s.some_number) WHEN NOT MATCHED THEN INSERT (id) VALUES (s.some_number)>
DEBUG: Execute MERGE task list
-- let's verify that the data was inserted into the second shard of the target.
EXPLAIN (analyze on, costs off, timing off, summary off) SELECT * FROM target_pushdowntest;
QUERY PLAN
---------------------------------------------------------------------
Custom Scan (Citus Adaptive) (actual rows=1 loops=1)
Task Count: 4
Tuple data received from nodes: 4 bytes
Tasks Shown: All
-> Task
Tuple data received from node: 0 bytes
Node: host=localhost port=xxxxx dbname=regression
-> Seq Scan on target_pushdowntest_4000068 target_pushdowntest (actual rows=0 loops=1)
-> Task
Tuple data received from node: 4 bytes
Node: host=localhost port=xxxxx dbname=regression
-> Seq Scan on target_pushdowntest_4000069 target_pushdowntest (actual rows=1 loops=1)
-> Task
Tuple data received from node: 0 bytes
Node: host=localhost port=xxxxx dbname=regression
-> Seq Scan on target_pushdowntest_4000070 target_pushdowntest (actual rows=0 loops=1)
-> Task
Tuple data received from node: 0 bytes
Node: host=localhost port=xxxxx dbname=regression
-> Seq Scan on target_pushdowntest_4000071 target_pushdowntest (actual rows=0 loops=1)
(20 rows)
RESET client_min_messages;
-- This will prune shards with restriction information as NOT MATCHED is void
BEGIN;
SET citus.log_remote_commands to true;
@@ -2898,14 +3100,14 @@ WHEN NOT MATCHED THEN
-> Limit
-> Sort
Sort Key: id2
-> Seq Scan on demo_source_table_4000135 demo_source_table
-> Seq Scan on demo_source_table_4000147 demo_source_table
-> Distributed Subplan XXX_2
-> Custom Scan (Citus Adaptive)
Task Count: 4
Tasks Shown: One of 4
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> Seq Scan on demo_source_table_4000135 demo_source_table
-> Seq Scan on demo_source_table_4000147 demo_source_table
Task Count: 1
Tasks Shown: All
-> Task
@@ -3119,10 +3321,10 @@ DEBUG: Creating MERGE repartition plan
DEBUG: Using column - index:0 from the source list to redistribute
DEBUG: Collect source query results on coordinator
DEBUG: Create a MERGE task list that needs to be routed
DEBUG: <Deparsed MERGE query: MERGE INTO merge_schema.target_6785_xxxxxxx sda USING (SELECT intermediate_result.id, intermediate_result.z, intermediate_result.d FROM read_intermediate_result('merge_into_XXX_4000147'::text, 'binary'::citus_copy_format) intermediate_result(id integer, z integer, d jsonb)) sdn ON ((sda.id OPERATOR(pg_catalog.=) sdn.id) AND (sda.id OPERATOR(pg_catalog.=) 2)) WHEN NOT MATCHED THEN INSERT (id, z) VALUES (sdn.id, 5)>
DEBUG: <Deparsed MERGE query: MERGE INTO merge_schema.target_6785_xxxxxxx sda USING (SELECT intermediate_result.id, intermediate_result.z, intermediate_result.d FROM read_intermediate_result('merge_into_XXX_4000148'::text, 'binary'::citus_copy_format) intermediate_result(id integer, z integer, d jsonb)) sdn ON ((sda.id OPERATOR(pg_catalog.=) sdn.id) AND (sda.id OPERATOR(pg_catalog.=) 2)) WHEN NOT MATCHED THEN INSERT (id, z) VALUES (sdn.id, 5)>
DEBUG: <Deparsed MERGE query: MERGE INTO merge_schema.target_6785_xxxxxxx sda USING (SELECT intermediate_result.id, intermediate_result.z, intermediate_result.d FROM read_intermediate_result('merge_into_XXX_4000149'::text, 'binary'::citus_copy_format) intermediate_result(id integer, z integer, d jsonb)) sdn ON ((sda.id OPERATOR(pg_catalog.=) sdn.id) AND (sda.id OPERATOR(pg_catalog.=) 2)) WHEN NOT MATCHED THEN INSERT (id, z) VALUES (sdn.id, 5)>
DEBUG: <Deparsed MERGE query: MERGE INTO merge_schema.target_6785_xxxxxxx sda USING (SELECT intermediate_result.id, intermediate_result.z, intermediate_result.d FROM read_intermediate_result('merge_into_XXX_4000150'::text, 'binary'::citus_copy_format) intermediate_result(id integer, z integer, d jsonb)) sdn ON ((sda.id OPERATOR(pg_catalog.=) sdn.id) AND (sda.id OPERATOR(pg_catalog.=) 2)) WHEN NOT MATCHED THEN INSERT (id, z) VALUES (sdn.id, 5)>
DEBUG: <Deparsed MERGE query: MERGE INTO merge_schema.target_6785_xxxxxxx sda USING (SELECT intermediate_result.id, intermediate_result.z, intermediate_result.d FROM read_intermediate_result('merge_into_XXX_4000159'::text, 'binary'::citus_copy_format) intermediate_result(id integer, z integer, d jsonb)) sdn ON ((sda.id OPERATOR(pg_catalog.=) sdn.id) AND (sda.id OPERATOR(pg_catalog.=) 2)) WHEN NOT MATCHED THEN INSERT (id, z) VALUES (sdn.id, 5)>
DEBUG: <Deparsed MERGE query: MERGE INTO merge_schema.target_6785_xxxxxxx sda USING (SELECT intermediate_result.id, intermediate_result.z, intermediate_result.d FROM read_intermediate_result('merge_into_XXX_4000160'::text, 'binary'::citus_copy_format) intermediate_result(id integer, z integer, d jsonb)) sdn ON ((sda.id OPERATOR(pg_catalog.=) sdn.id) AND (sda.id OPERATOR(pg_catalog.=) 2)) WHEN NOT MATCHED THEN INSERT (id, z) VALUES (sdn.id, 5)>
DEBUG: <Deparsed MERGE query: MERGE INTO merge_schema.target_6785_xxxxxxx sda USING (SELECT intermediate_result.id, intermediate_result.z, intermediate_result.d FROM read_intermediate_result('merge_into_XXX_4000161'::text, 'binary'::citus_copy_format) intermediate_result(id integer, z integer, d jsonb)) sdn ON ((sda.id OPERATOR(pg_catalog.=) sdn.id) AND (sda.id OPERATOR(pg_catalog.=) 2)) WHEN NOT MATCHED THEN INSERT (id, z) VALUES (sdn.id, 5)>
DEBUG: <Deparsed MERGE query: MERGE INTO merge_schema.target_6785_xxxxxxx sda USING (SELECT intermediate_result.id, intermediate_result.z, intermediate_result.d FROM read_intermediate_result('merge_into_XXX_4000162'::text, 'binary'::citus_copy_format) intermediate_result(id integer, z integer, d jsonb)) sdn ON ((sda.id OPERATOR(pg_catalog.=) sdn.id) AND (sda.id OPERATOR(pg_catalog.=) 2)) WHEN NOT MATCHED THEN INSERT (id, z) VALUES (sdn.id, 5)>
DEBUG: Execute MERGE task list
RESET client_min_messages;
SELECT * FROM target_6785 ORDER BY 1;
@@ -3240,7 +3442,7 @@ USING s1 s
ON t.id = s.id
WHEN NOT MATCHED THEN
INSERT (id) VALUES(s.val);
ERROR: MERGE INSERT must use the source table distribution column value
ERROR: MERGE INSERT must use the source's joining column for target's distribution column
MERGE INTO t1 t
USING s1 s
ON t.id = s.id
@@ -3966,7 +4168,7 @@ CONTEXT: SQL statement "SELECT citus_drop_all_shards(v_obj.objid, v_obj.schema_
PL/pgSQL function citus_drop_trigger() line XX at PERFORM
DROP FUNCTION merge_when_and_write();
DROP SCHEMA merge_schema CASCADE;
NOTICE: drop cascades to 103 other objects
NOTICE: drop cascades to 106 other objects
DETAIL: drop cascades to function insert_data()
drop cascades to table local_local
drop cascades to table target
@@ -4026,6 +4228,9 @@ drop cascades to table pg_source
drop cascades to table citus_target
drop cascades to table citus_source
drop cascades to function compare_tables()
drop cascades to table source_pushdowntest
drop cascades to table target_pushdowntest
drop cascades to table source_withdata
drop cascades to view pg_source_view
drop cascades to view citus_source_view
drop cascades to table pg_pa_target
@@ -4042,7 +4247,7 @@ drop cascades to table target_set
drop cascades to table source_set
drop cascades to table refsource_ref
drop cascades to table pg_result
drop cascades to table refsource_ref_4000112
drop cascades to table refsource_ref_4000124
drop cascades to table pg_ref
drop cascades to table local_ref
drop cascades to table reftarget_local
@@ -4060,11 +4265,8 @@ drop cascades to table source_6785
drop cascades to table target_6785
drop cascades to function add_s(integer,integer)
drop cascades to table pg
drop cascades to table t1_4000174
drop cascades to table s1_4000175
drop cascades to table t1_4000186
drop cascades to table s1_4000187
drop cascades to table t1
drop cascades to table s1
drop cascades to table dist_target
drop cascades to table dist_source
drop cascades to view show_tables
and 3 other objects (see server log for list)
and 6 other objects (see server log for list)


@@ -1206,6 +1206,76 @@ SET citus.log_remote_commands to false;
SELECT compare_tables();
ROLLBACK;
-- let's create the source and target tables
CREATE TABLE source_pushdowntest (id integer);
CREATE TABLE target_pushdowntest (id integer);
-- let's distribute both tables on the id field
SELECT create_distributed_table('source_pushdowntest', 'id');
SELECT create_distributed_table('target_pushdowntest', 'id');
-- we are running this on a single-node setup; let's check the colocation id of both tables.
-- both have the same colocation id, so they are colocated.
select colocationid,logicalrelid from pg_dist_partition where logicalrelid = 'source_pushdowntest'::regclass OR logicalrelid = 'target_pushdowntest'::regclass;
SET client_min_messages TO DEBUG1;
-- Test 1 : tables are colocated AND query is multi-sharded AND join is on the distribution column : should push down to workers.
EXPLAIN MERGE INTO target_pushdowntest t
USING source_pushdowntest s
ON t.id = s.id
WHEN NOT MATCHED THEN
INSERT (id)
VALUES (s.id);
-- Test 2 : tables are colocated AND source query is not multi-sharded : should push down to the worker.
EXPLAIN MERGE INTO target_pushdowntest t
USING (SELECT * from source_pushdowntest where id = 1) s
on t.id = s.id
WHEN NOT MATCHED THEN
INSERT (id)
VALUES (s.id);
-- Test 3 : tables are colocated AND source query is single-sharded, but the INSERT does not use the source distribution column : should not push down.
INSERT INTO source_pushdowntest (id) VALUES (3);
EXPLAIN MERGE INTO target_pushdowntest t
USING (SELECT 1 as somekey, id from source_pushdowntest where id = 1) s
on t.id = s.somekey
WHEN NOT MATCHED THEN
INSERT (id)
VALUES (s.somekey);
-- let's verify that when some other source column supplies the value of the target's distribution column,
-- the row is inserted into the correct shard of the target.
CREATE TABLE source_withdata (id integer, some_number integer);
SELECT create_distributed_table('source_withdata', 'id');
INSERT INTO source_withdata (id, some_number) VALUES (1, 3);
-- we will use the some_number column from source_withdata to populate the target's distribution column.
-- the value of some_number is 3; let's check which shard it should go to.
select worker_hash(3);
-- it should go to the second shard of the target, since the target has 4 shards and hash "-28094569" falls in the second shard's range.
MERGE INTO target_pushdowntest t
USING (SELECT id, some_number from source_withdata where id = 1) s
on t.id = s.some_number
WHEN NOT MATCHED THEN
INSERT (id)
VALUES (s.some_number);
-- let's verify that the data was inserted into the second shard of the target.
EXPLAIN (analyze on, costs off, timing off, summary off) SELECT * FROM target_pushdowntest;
RESET client_min_messages;
-- This will prune shards with restriction information as NOT MATCHED is void
BEGIN;
SET citus.log_remote_commands to true;