diff --git a/src/backend/distributed/planner/merge_planner.c b/src/backend/distributed/planner/merge_planner.c index 4d64b8f56..b0ebe774b 100644 --- a/src/backend/distributed/planner/merge_planner.c +++ b/src/backend/distributed/planner/merge_planner.c @@ -182,14 +182,6 @@ CreateRouterMergePlan(Oid targetRelationId, Query *originalQuery, Query *query, return distributedPlan; } - Var *insertVar = - FetchAndValidateInsertVarIfExists(targetRelationId, originalQuery); - if (insertVar && - !IsDistributionColumnInMergeSource((Expr *) insertVar, originalQuery, true)) - { - ereport(ERROR, (errmsg("MERGE INSERT must use the source table " - "distribution column value"))); - } Job *job = RouterJob(originalQuery, plannerRestrictionContext, &distributedPlan->planningError); @@ -1124,6 +1116,26 @@ DeferErrorIfRoutableMergeNotSupported(Query *query, List *rangeTableList, "repartitioning"))); return deferredError; } + + + /* + * If execution has reached this point, it indicates that the query can be delegated to the worker. + * However, before proceeding with this delegation, we need to confirm that the user is utilizing + * the distribution column of the source table in the Insert variable. + * If this is not the case, we should refrain from pushing down the query. + */ + + Var *insertVar = + FetchAndValidateInsertVarIfExists(targetRelationId, query); + if (insertVar && + !IsDistributionColumnInMergeSource((Expr *) insertVar, query, true)) + { + ereport(DEBUG1, (errmsg( + "MERGE INSERT must use the source table distribution column value, try repartitioning"))); + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "MERGE INSERT must use the source table " + "distribution column value", NULL, NULL); + } return NULL; } diff --git a/src/test/regress/expected/merge.out b/src/test/regress/expected/merge.out index a73467e81..42ded746e 100644 --- a/src/test/regress/expected/merge.out +++ b/src/test/regress/expected/merge.out @@ -1128,7 +1128,7 @@ DO NOTHING WHEN NOT MATCHED THEN INSERT VALUES(rs_source.id); DEBUG: Creating MERGE router plan -DEBUG: +DEBUG: RESET client_min_messages; SELECT * INTO rs_local FROM rs_target ORDER BY 1 ; -- Should be equal @@ -1259,7 +1259,7 @@ DO NOTHING WHEN NOT MATCHED THEN INSERT VALUES(fn_source.id, fn_source.source); DEBUG: Creating MERGE router plan -DEBUG: +DEBUG: RESET client_min_messages; SELECT * INTO fn_local FROM fn_target ORDER BY 1 ; -- Should be equal @@ -1552,7 +1552,7 @@ BEGIN; SET citus.log_remote_commands to true; SET client_min_messages TO DEBUG1; EXECUTE merge_prepare(2); -DEBUG: +DEBUG: DEBUG: Creating MERGE router plan NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx'); DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx @@ -1782,13 +1782,13 @@ NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_ DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx'); DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON (t.id OPERATOR(pg_catalog.=) s.id) WHEN MATCHED THEN DO NOTHING WHEN NOT MATCHED AND (s.id OPERATOR(pg_catalog.<) 100) THEN INSERT (id, val) VALUES (s.id, s.val) +NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON (t.id 
OPERATOR(pg_catalog.=) s.id) WHEN MATCHED THEN DO NOTHING WHEN NOT MATCHED AND (s.id OPERATOR(pg_catalog.<) 100) THEN INSERT (id, val) VALUES (s.id, s.val) DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON (t.id OPERATOR(pg_catalog.=) s.id) WHEN MATCHED THEN DO NOTHING WHEN NOT MATCHED AND (s.id OPERATOR(pg_catalog.<) 100) THEN INSERT (id, val) VALUES (s.id, s.val) +NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON (t.id OPERATOR(pg_catalog.=) s.id) WHEN MATCHED THEN DO NOTHING WHEN NOT MATCHED AND (s.id OPERATOR(pg_catalog.<) 100) THEN INSERT (id, val) VALUES (s.id, s.val) DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON (t.id OPERATOR(pg_catalog.=) s.id) WHEN MATCHED THEN DO NOTHING WHEN NOT MATCHED AND (s.id OPERATOR(pg_catalog.<) 100) THEN INSERT (id, val) VALUES (s.id, s.val) +NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON (t.id OPERATOR(pg_catalog.=) s.id) WHEN MATCHED THEN DO NOTHING WHEN NOT MATCHED AND (s.id OPERATOR(pg_catalog.<) 100) THEN INSERT (id, val) VALUES (s.id, s.val) DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON (t.id OPERATOR(pg_catalog.=) s.id) WHEN MATCHED THEN DO NOTHING WHEN NOT MATCHED AND (s.id OPERATOR(pg_catalog.<) 100) THEN INSERT (id, val) VALUES (s.id, s.val) +NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON (t.id OPERATOR(pg_catalog.=) s.id) WHEN MATCHED THEN DO NOTHING WHEN NOT MATCHED AND (s.id OPERATOR(pg_catalog.<) 100) THEN INSERT (id, val) VALUES (s.id, s.val) DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx SET citus.log_remote_commands to false; SELECT compare_tables(); @@ -1842,6 +1842,208 @@ SELECT compare_tables(); (1 row) ROLLBACK; +-- let's create source and target table +CREATE TABLE source_pushdowntest (id integer); +CREATE TABLE target_pushdowntest (id integer ); +-- let's distribute both table on id field +SELECT create_distributed_table('source_pushdowntest', 'id'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SELECT create_distributed_table('target_pushdowntest', 'id'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +-- we are doing this operation on single node setup let's figure out colocation id of both tables +-- both has same colocation id so both are colocated. +select colocationid,logicalrelid from pg_dist_partition where logicalrelid = 'source_pushdowntest'::regclass OR logicalrelid = 'target_pushdowntest'::regclass; + colocationid | logicalrelid +--------------------------------------------------------------------- + 1390004 | source_pushdowntest + 1390004 | target_pushdowntest +(2 rows) + +SET client_min_messages TO DEBUG1; +-- Test 1 : tables are colocated AND query is multisharded AND Join On distributed column : should push down to workers. 
+EXPLAIN MERGE INTO target_pushdowntest t +USING source_pushdowntest s +ON t.id = s.id +WHEN NOT MATCHED THEN + INSERT (id) + VALUES (s.id); +DEBUG: +DEBUG: +DEBUG: +DEBUG: +DEBUG: Creating MERGE router plan + QUERY PLAN +--------------------------------------------------------------------- + Custom Scan (Citus Adaptive) (cost=0.00..0.00 rows=0 width=0) + Task Count: 4 + Tasks Shown: All + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> Merge on target_pushdowntest_4000068 t (cost=359.57..860.00 rows=0 width=0) + -> Merge Left Join (cost=359.57..860.00 rows=32512 width=16) + Merge Cond: (s.id = t.id) + -> Sort (cost=179.78..186.16 rows=2550 width=10) + Sort Key: s.id + -> Seq Scan on source_pushdowntest_4000064 s (cost=0.00..35.50 rows=2550 width=10) + -> Sort (cost=179.78..186.16 rows=2550 width=10) + Sort Key: t.id + -> Seq Scan on target_pushdowntest_4000068 t (cost=0.00..35.50 rows=2550 width=10) + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> Merge on target_pushdowntest_4000069 t (cost=359.57..860.00 rows=0 width=0) + -> Merge Left Join (cost=359.57..860.00 rows=32512 width=16) + Merge Cond: (s.id = t.id) + -> Sort (cost=179.78..186.16 rows=2550 width=10) + Sort Key: s.id + -> Seq Scan on source_pushdowntest_4000065 s (cost=0.00..35.50 rows=2550 width=10) + -> Sort (cost=179.78..186.16 rows=2550 width=10) + Sort Key: t.id + -> Seq Scan on target_pushdowntest_4000069 t (cost=0.00..35.50 rows=2550 width=10) + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> Merge on target_pushdowntest_4000070 t (cost=359.57..860.00 rows=0 width=0) + -> Merge Left Join (cost=359.57..860.00 rows=32512 width=16) + Merge Cond: (s.id = t.id) + -> Sort (cost=179.78..186.16 rows=2550 width=10) + Sort Key: s.id + -> Seq Scan on source_pushdowntest_4000066 s (cost=0.00..35.50 rows=2550 width=10) + -> Sort (cost=179.78..186.16 rows=2550 width=10) + Sort Key: t.id + -> Seq Scan on target_pushdowntest_4000070 t (cost=0.00..35.50 rows=2550 width=10) + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> Merge on target_pushdowntest_4000071 t (cost=359.57..860.00 rows=0 width=0) + -> Merge Left Join (cost=359.57..860.00 rows=32512 width=16) + Merge Cond: (s.id = t.id) + -> Sort (cost=179.78..186.16 rows=2550 width=10) + Sort Key: s.id + -> Seq Scan on source_pushdowntest_4000067 s (cost=0.00..35.50 rows=2550 width=10) + -> Sort (cost=179.78..186.16 rows=2550 width=10) + Sort Key: t.id + -> Seq Scan on target_pushdowntest_4000071 t (cost=0.00..35.50 rows=2550 width=10) +(47 rows) + +-- Test 2 : tables are colocated AND source query is not multisharded : should push down to worker. 
+EXPLAIN MERGE INTO target_pushdowntest t +USING (SELECT * from source_pushdowntest where id = 1) s +on t.id = s.id +WHEN NOT MATCHED THEN + INSERT (id) + VALUES (s.id); +DEBUG: +DEBUG: Creating MERGE router plan + QUERY PLAN +--------------------------------------------------------------------- + Custom Scan (Citus Adaptive) (cost=0.00..0.00 rows=0 width=0) + Task Count: 1 + Tasks Shown: All + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> Merge on target_pushdowntest_4000068 t (cost=0.00..85.89 rows=0 width=0) + -> Nested Loop Left Join (cost=0.00..85.89 rows=169 width=16) + -> Seq Scan on source_pushdowntest_4000064 source_pushdowntest (cost=0.00..41.88 rows=13 width=10) + Filter: (id = 1) + -> Materialize (cost=0.00..41.94 rows=13 width=10) + -> Seq Scan on target_pushdowntest_4000068 t (cost=0.00..41.88 rows=13 width=10) + Filter: (id = 1) +(12 rows) + +-- Test 3 : tables are colocated source query is single sharded but not using source distributed column in insertion. let's not pushdown. +INSERT INTO source_pushdowntest (id) VALUES (3); +EXPLAIN MERGE INTO target_pushdowntest t +USING (SELECT 1 as somekey, id from source_pushdowntest where id = 1) s +on t.id = s.somekey +WHEN NOT MATCHED THEN + INSERT (id) + VALUES (s.somekey); +DEBUG: MERGE INSERT must use the source table distribution column value, try repartitioning +DEBUG: MERGE INSERT must use the source table distribution column value +DEBUG: Creating MERGE repartition plan +DEBUG: Using column - index:0 from the source list to redistribute + QUERY PLAN +--------------------------------------------------------------------- + Custom Scan (Citus MERGE INTO ...) (cost=0.00..0.00 rows=0 width=0) + MERGE INTO target_pushdowntest method: pull to coordinator + -> Custom Scan (Citus Adaptive) (cost=0.00..0.00 rows=0 width=0) + Task Count: 1 + Tasks Shown: All + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> Seq Scan on source_pushdowntest_4000064 source_pushdowntest (cost=0.00..41.88 rows=13 width=8) + Filter: (id = 1) +(9 rows) + +-- let's verify if we use some other column from source for value of distributed column in target. +-- it should be inserted to correct shard of target. +CREATE TABLE source_withdata (id integer, some_number integer); +SELECT create_distributed_table('source_withdata', 'id'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO source_withdata (id, some_number) VALUES (1, 3); +-- we will use some_number column from source_withdata to insert into distributed column of target. +-- value of some_number is 3 let's verify what shard it should go to. +select worker_hash(3); + worker_hash +--------------------------------------------------------------------- + -28094569 +(1 row) + +-- it should go to second shard of target as target has 4 shard and hash "-28094569" comes in range of second shard. 
+MERGE INTO target_pushdowntest t +USING (SELECT id, some_number from source_withdata where id = 1) s +on t.id = s.some_number +WHEN NOT MATCHED THEN + INSERT (id) + VALUES (s.some_number); +DEBUG: Sub-query is not pushable, try repartitioning +DEBUG: MERGE command is only supported when all distributed tables are co-located and joined on their distribution columns +DEBUG: Creating MERGE repartition plan +DEBUG: Using column - index:1 from the source list to redistribute +DEBUG: Collect source query results on coordinator +DEBUG: Create a MERGE task list that needs to be routed +DEBUG: +DEBUG: +DEBUG: +DEBUG: +DEBUG: Execute MERGE task list +-- let's verify if data inserted to second shard of target. +EXPLAIN (analyze on, costs off, timing off, summary off) SELECT * FROM target_pushdowntest; + QUERY PLAN +--------------------------------------------------------------------- + Custom Scan (Citus Adaptive) (actual rows=1 loops=1) + Task Count: 4 + Tuple data received from nodes: 4 bytes + Tasks Shown: All + -> Task + Tuple data received from node: 0 bytes + Node: host=localhost port=xxxxx dbname=regression + -> Seq Scan on target_pushdowntest_4000068 target_pushdowntest (actual rows=0 loops=1) + -> Task + Tuple data received from node: 4 bytes + Node: host=localhost port=xxxxx dbname=regression + -> Seq Scan on target_pushdowntest_4000069 target_pushdowntest (actual rows=1 loops=1) + -> Task + Tuple data received from node: 0 bytes + Node: host=localhost port=xxxxx dbname=regression + -> Seq Scan on target_pushdowntest_4000070 target_pushdowntest (actual rows=0 loops=1) + -> Task + Tuple data received from node: 0 bytes + Node: host=localhost port=xxxxx dbname=regression + -> Seq Scan on target_pushdowntest_4000071 target_pushdowntest (actual rows=0 loops=1) +(20 rows) + +RESET client_min_messages; -- This will prune shards with restriction information as NOT MATCHED is void BEGIN; SET citus.log_remote_commands to true; @@ -2898,14 +3100,14 @@ WHEN NOT MATCHED THEN -> Limit -> Sort Sort Key: id2 - -> Seq Scan on demo_source_table_4000135 demo_source_table + -> Seq Scan on demo_source_table_4000147 demo_source_table -> Distributed Subplan XXX_2 -> Custom Scan (Citus Adaptive) Task Count: 4 Tasks Shown: One of 4 -> Task Node: host=localhost port=xxxxx dbname=regression - -> Seq Scan on demo_source_table_4000135 demo_source_table + -> Seq Scan on demo_source_table_4000147 demo_source_table Task Count: 1 Tasks Shown: All -> Task @@ -3119,10 +3321,10 @@ DEBUG: Creating MERGE repartition plan DEBUG: Using column - index:0 from the source list to redistribute DEBUG: Collect source query results on coordinator DEBUG: Create a MERGE task list that needs to be routed -DEBUG: -DEBUG: -DEBUG: -DEBUG: +DEBUG: +DEBUG: +DEBUG: +DEBUG: DEBUG: Execute MERGE task list RESET client_min_messages; SELECT * FROM target_6785 ORDER BY 1; @@ -3240,7 +3442,7 @@ USING s1 s ON t.id = s.id WHEN NOT MATCHED THEN INSERT (id) VALUES(s.val); -ERROR: MERGE INSERT must use the source table distribution column value +ERROR: MERGE INSERT must use the source's joining column for target's distribution column MERGE INTO t1 t USING s1 s ON t.id = s.id @@ -3966,7 +4168,7 @@ CONTEXT: SQL statement "SELECT citus_drop_all_shards(v_obj.objid, v_obj.schema_ PL/pgSQL function citus_drop_trigger() line XX at PERFORM DROP FUNCTION merge_when_and_write(); DROP SCHEMA merge_schema CASCADE; -NOTICE: drop cascades to 103 other objects +NOTICE: drop cascades to 106 other objects DETAIL: drop cascades to function insert_data() drop cascades to 
table local_local drop cascades to table target @@ -4026,6 +4228,9 @@ drop cascades to table pg_source drop cascades to table citus_target drop cascades to table citus_source drop cascades to function compare_tables() +drop cascades to table source_pushdowntest +drop cascades to table target_pushdowntest +drop cascades to table source_withdata drop cascades to view pg_source_view drop cascades to view citus_source_view drop cascades to table pg_pa_target @@ -4042,7 +4247,7 @@ drop cascades to table target_set drop cascades to table source_set drop cascades to table refsource_ref drop cascades to table pg_result -drop cascades to table refsource_ref_4000112 +drop cascades to table refsource_ref_4000124 drop cascades to table pg_ref drop cascades to table local_ref drop cascades to table reftarget_local @@ -4060,11 +4265,8 @@ drop cascades to table source_6785 drop cascades to table target_6785 drop cascades to function add_s(integer,integer) drop cascades to table pg -drop cascades to table t1_4000174 -drop cascades to table s1_4000175 +drop cascades to table t1_4000186 +drop cascades to table s1_4000187 drop cascades to table t1 drop cascades to table s1 -drop cascades to table dist_target -drop cascades to table dist_source -drop cascades to view show_tables -and 3 other objects (see server log for list) +and 6 other objects (see server log for list) diff --git a/src/test/regress/sql/merge.sql b/src/test/regress/sql/merge.sql index a41e80841..61bb06cea 100644 --- a/src/test/regress/sql/merge.sql +++ b/src/test/regress/sql/merge.sql @@ -1206,6 +1206,76 @@ SET citus.log_remote_commands to false; SELECT compare_tables(); ROLLBACK; + +-- let's create source and target table +CREATE TABLE source_pushdowntest (id integer); +CREATE TABLE target_pushdowntest (id integer ); + +-- let's distribute both table on id field +SELECT create_distributed_table('source_pushdowntest', 'id'); +SELECT create_distributed_table('target_pushdowntest', 'id'); + +-- we are doing this operation on single node setup let's figure out colocation id of both tables +-- both has same colocation id so both are colocated. +select colocationid,logicalrelid from pg_dist_partition where logicalrelid = 'source_pushdowntest'::regclass OR logicalrelid = 'target_pushdowntest'::regclass; + +SET client_min_messages TO DEBUG1; +-- Test 1 : tables are colocated AND query is multisharded AND Join On distributed column : should push down to workers. + +EXPLAIN MERGE INTO target_pushdowntest t +USING source_pushdowntest s +ON t.id = s.id +WHEN NOT MATCHED THEN + INSERT (id) + VALUES (s.id); + +-- Test 2 : tables are colocated AND source query is not multisharded : should push down to worker. + +EXPLAIN MERGE INTO target_pushdowntest t +USING (SELECT * from source_pushdowntest where id = 1) s +on t.id = s.id +WHEN NOT MATCHED THEN + INSERT (id) + VALUES (s.id); + + +-- Test 3 : tables are colocated source query is single sharded but not using source distributed column in insertion. let's not pushdown. +INSERT INTO source_pushdowntest (id) VALUES (3); + +EXPLAIN MERGE INTO target_pushdowntest t +USING (SELECT 1 as somekey, id from source_pushdowntest where id = 1) s +on t.id = s.somekey +WHEN NOT MATCHED THEN + INSERT (id) + VALUES (s.somekey); + + +-- let's verify if we use some other column from source for value of distributed column in target. +-- it should be inserted to correct shard of target. 
+CREATE TABLE source_withdata (id integer, some_number integer); +SELECT create_distributed_table('source_withdata', 'id'); + +INSERT INTO source_withdata (id, some_number) VALUES (1, 3); + +-- we will use some_number column from source_withdata to insert into distributed column of target. +-- value of some_number is 3 let's verify what shard it should go to. +select worker_hash(3); + +-- it should go to second shard of target as target has 4 shard and hash "-28094569" comes in range of second shard. +MERGE INTO target_pushdowntest t +USING (SELECT id, some_number from source_withdata where id = 1) s +on t.id = s.some_number +WHEN NOT MATCHED THEN + INSERT (id) + VALUES (s.some_number); + +-- let's verify if data inserted to second shard of target. +EXPLAIN (analyze on, costs off, timing off, summary off) SELECT * FROM target_pushdowntest; + +RESET client_min_messages; + + + -- This will prune shards with restriction information as NOT MATCHED is void BEGIN; SET citus.log_remote_commands to true;
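
-- For illustration only, not part of the patch: a minimal SQL sketch of the
-- planner behavior the new tests above exercise. It assumes two co-located
-- hash-distributed tables; the table names below are hypothetical. It
-- contrasts the case that stays routable (the INSERT uses the source
-- distribution column), the case that now falls back to a repartition plan
-- instead of erroring during router planning, and the case that is still
-- rejected.
CREATE TABLE tgt_sketch (id int, val int);
CREATE TABLE src_sketch (id int, val int);
SELECT create_distributed_table('tgt_sketch', 'id');
SELECT create_distributed_table('src_sketch', 'id', colocate_with => 'tgt_sketch');

-- Pushed down: the value inserted into the target's distribution column is the
-- source distribution column, so the MERGE stays routable.
MERGE INTO tgt_sketch t USING src_sketch s ON t.id = s.id
WHEN NOT MATCHED THEN INSERT (id) VALUES (s.id);

-- No longer a hard error during router planning: the inserted value is not the
-- source distribution column, but it is the column joined to the target's
-- distribution column, so the planner defers and creates a MERGE repartition plan.
MERGE INTO tgt_sketch t USING src_sketch s ON t.id = s.val
WHEN NOT MATCHED THEN INSERT (id) VALUES (s.val);

-- Still rejected, per the updated error message in the tests above: the inserted
-- value is neither the source distribution column nor the column joined to the
-- target's distribution column.
-- MERGE INTO tgt_sketch t USING src_sketch s ON t.id = s.id
-- WHEN NOT MATCHED THEN INSERT (id) VALUES (s.val);
-- ERROR:  MERGE INSERT must use the source's joining column for target's distribution column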