From 8c9de08b76332308deb9fd082d0d00f4afba8cd3 Mon Sep 17 00:00:00 2001 From: Jelte Fennema-Nio Date: Fri, 14 Jun 2024 16:20:23 +0200 Subject: [PATCH 01/13] Fix CI issues after Github Actions networking changes (#7624) For some reason using localhost in our hba file doesn't have the intended effect anymore in our Github Actions runners. Probably because of some networking change (IPv6 maybe) or some change in the `/etc/hosts` file. Replacing localhost with the equivalent loopback IPv4 and IPv6 addresses resolved this issue. --- src/test/regress/pg_regress_multi.pl | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/test/regress/pg_regress_multi.pl b/src/test/regress/pg_regress_multi.pl index 01e57c469..35671ad26 100755 --- a/src/test/regress/pg_regress_multi.pl +++ b/src/test/regress/pg_regress_multi.pl @@ -296,10 +296,12 @@ sub generate_hba open(my $fh, ">", catfile($TMP_CHECKDIR, $nodename, "data", "pg_hba.conf")) or die "could not open pg_hba.conf"; - print $fh "host all alice,bob localhost md5\n"; + print $fh "host all alice,bob 127.0.0.1/32 md5\n"; + print $fh "host all alice,bob ::1/128 md5\n"; print $fh "host all all 127.0.0.1/32 trust\n"; print $fh "host all all ::1/128 trust\n"; - print $fh "host replication postgres localhost trust\n"; + print $fh "host replication postgres 127.0.0.1/32 trust\n"; + print $fh "host replication postgres ::1/128 trust\n"; close $fh; } From eedb607cd547a1daac1be32ab6e6fc11b4bfbed9 Mon Sep 17 00:00:00 2001 From: paragjain Date: Fri, 7 Jun 2024 16:35:11 +0000 Subject: [PATCH 02/13] merge command fix --- .../distributed/planner/merge_planner.c | 28 +- src/test/regress/expected/merge.out | 246 ++++++++++++++++-- src/test/regress/sql/merge.sql | 70 +++++ 3 files changed, 314 insertions(+), 30 deletions(-) diff --git a/src/backend/distributed/planner/merge_planner.c b/src/backend/distributed/planner/merge_planner.c index 4d64b8f56..b0ebe774b 100644 --- a/src/backend/distributed/planner/merge_planner.c +++ b/src/backend/distributed/planner/merge_planner.c @@ -182,14 +182,6 @@ CreateRouterMergePlan(Oid targetRelationId, Query *originalQuery, Query *query, return distributedPlan; } - Var *insertVar = - FetchAndValidateInsertVarIfExists(targetRelationId, originalQuery); - if (insertVar && - !IsDistributionColumnInMergeSource((Expr *) insertVar, originalQuery, true)) - { - ereport(ERROR, (errmsg("MERGE INSERT must use the source table " - "distribution column value"))); - } Job *job = RouterJob(originalQuery, plannerRestrictionContext, &distributedPlan->planningError); @@ -1124,6 +1116,26 @@ DeferErrorIfRoutableMergeNotSupported(Query *query, List *rangeTableList, "repartitioning"))); return deferredError; } + + + /* + * If execution has reached this point, it indicates that the query can be delegated to the worker. + * However, before proceeding with this delegation, we need to confirm that the user is utilizing + * the distribution column of the source table in the Insert variable. + * If this is not the case, we should refrain from pushing down the query. 
+ */ + + Var *insertVar = + FetchAndValidateInsertVarIfExists(targetRelationId, query); + if (insertVar && + !IsDistributionColumnInMergeSource((Expr *) insertVar, query, true)) + { + ereport(DEBUG1, (errmsg( + "MERGE INSERT must use the source table distribution column value, try repartitioning"))); + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "MERGE INSERT must use the source table " + "distribution column value", NULL, NULL); + } return NULL; } diff --git a/src/test/regress/expected/merge.out b/src/test/regress/expected/merge.out index a73467e81..42ded746e 100644 --- a/src/test/regress/expected/merge.out +++ b/src/test/regress/expected/merge.out @@ -1128,7 +1128,7 @@ DO NOTHING WHEN NOT MATCHED THEN INSERT VALUES(rs_source.id); DEBUG: Creating MERGE router plan -DEBUG: +DEBUG: RESET client_min_messages; SELECT * INTO rs_local FROM rs_target ORDER BY 1 ; -- Should be equal @@ -1259,7 +1259,7 @@ DO NOTHING WHEN NOT MATCHED THEN INSERT VALUES(fn_source.id, fn_source.source); DEBUG: Creating MERGE router plan -DEBUG: +DEBUG: RESET client_min_messages; SELECT * INTO fn_local FROM fn_target ORDER BY 1 ; -- Should be equal @@ -1552,7 +1552,7 @@ BEGIN; SET citus.log_remote_commands to true; SET client_min_messages TO DEBUG1; EXECUTE merge_prepare(2); -DEBUG: +DEBUG: DEBUG: Creating MERGE router plan NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx'); DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx @@ -1782,13 +1782,13 @@ NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_ DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx'); DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON (t.id OPERATOR(pg_catalog.=) s.id) WHEN MATCHED THEN DO NOTHING WHEN NOT MATCHED AND (s.id OPERATOR(pg_catalog.<) 100) THEN INSERT (id, val) VALUES (s.id, s.val) +NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON (t.id OPERATOR(pg_catalog.=) s.id) WHEN MATCHED THEN DO NOTHING WHEN NOT MATCHED AND (s.id OPERATOR(pg_catalog.<) 100) THEN INSERT (id, val) VALUES (s.id, s.val) DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON (t.id OPERATOR(pg_catalog.=) s.id) WHEN MATCHED THEN DO NOTHING WHEN NOT MATCHED AND (s.id OPERATOR(pg_catalog.<) 100) THEN INSERT (id, val) VALUES (s.id, s.val) +NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON (t.id OPERATOR(pg_catalog.=) s.id) WHEN MATCHED THEN DO NOTHING WHEN NOT MATCHED AND (s.id OPERATOR(pg_catalog.<) 100) THEN INSERT (id, val) VALUES (s.id, s.val) DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON (t.id OPERATOR(pg_catalog.=) s.id) WHEN MATCHED THEN DO NOTHING WHEN NOT MATCHED AND (s.id OPERATOR(pg_catalog.<) 100) THEN INSERT (id, val) VALUES (s.id, s.val) +NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON (t.id OPERATOR(pg_catalog.=) s.id) WHEN MATCHED THEN DO NOTHING WHEN NOT MATCHED 
AND (s.id OPERATOR(pg_catalog.<) 100) THEN INSERT (id, val) VALUES (s.id, s.val) DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON (t.id OPERATOR(pg_catalog.=) s.id) WHEN MATCHED THEN DO NOTHING WHEN NOT MATCHED AND (s.id OPERATOR(pg_catalog.<) 100) THEN INSERT (id, val) VALUES (s.id, s.val) +NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON (t.id OPERATOR(pg_catalog.=) s.id) WHEN MATCHED THEN DO NOTHING WHEN NOT MATCHED AND (s.id OPERATOR(pg_catalog.<) 100) THEN INSERT (id, val) VALUES (s.id, s.val) DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx SET citus.log_remote_commands to false; SELECT compare_tables(); @@ -1842,6 +1842,208 @@ SELECT compare_tables(); (1 row) ROLLBACK; +-- let's create source and target table +CREATE TABLE source_pushdowntest (id integer); +CREATE TABLE target_pushdowntest (id integer ); +-- let's distribute both table on id field +SELECT create_distributed_table('source_pushdowntest', 'id'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SELECT create_distributed_table('target_pushdowntest', 'id'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +-- we are doing this operation on single node setup let's figure out colocation id of both tables +-- both has same colocation id so both are colocated. +select colocationid,logicalrelid from pg_dist_partition where logicalrelid = 'source_pushdowntest'::regclass OR logicalrelid = 'target_pushdowntest'::regclass; + colocationid | logicalrelid +--------------------------------------------------------------------- + 1390004 | source_pushdowntest + 1390004 | target_pushdowntest +(2 rows) + +SET client_min_messages TO DEBUG1; +-- Test 1 : tables are colocated AND query is multisharded AND Join On distributed column : should push down to workers. 
+EXPLAIN MERGE INTO target_pushdowntest t +USING source_pushdowntest s +ON t.id = s.id +WHEN NOT MATCHED THEN + INSERT (id) + VALUES (s.id); +DEBUG: +DEBUG: +DEBUG: +DEBUG: +DEBUG: Creating MERGE router plan + QUERY PLAN +--------------------------------------------------------------------- + Custom Scan (Citus Adaptive) (cost=0.00..0.00 rows=0 width=0) + Task Count: 4 + Tasks Shown: All + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> Merge on target_pushdowntest_4000068 t (cost=359.57..860.00 rows=0 width=0) + -> Merge Left Join (cost=359.57..860.00 rows=32512 width=16) + Merge Cond: (s.id = t.id) + -> Sort (cost=179.78..186.16 rows=2550 width=10) + Sort Key: s.id + -> Seq Scan on source_pushdowntest_4000064 s (cost=0.00..35.50 rows=2550 width=10) + -> Sort (cost=179.78..186.16 rows=2550 width=10) + Sort Key: t.id + -> Seq Scan on target_pushdowntest_4000068 t (cost=0.00..35.50 rows=2550 width=10) + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> Merge on target_pushdowntest_4000069 t (cost=359.57..860.00 rows=0 width=0) + -> Merge Left Join (cost=359.57..860.00 rows=32512 width=16) + Merge Cond: (s.id = t.id) + -> Sort (cost=179.78..186.16 rows=2550 width=10) + Sort Key: s.id + -> Seq Scan on source_pushdowntest_4000065 s (cost=0.00..35.50 rows=2550 width=10) + -> Sort (cost=179.78..186.16 rows=2550 width=10) + Sort Key: t.id + -> Seq Scan on target_pushdowntest_4000069 t (cost=0.00..35.50 rows=2550 width=10) + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> Merge on target_pushdowntest_4000070 t (cost=359.57..860.00 rows=0 width=0) + -> Merge Left Join (cost=359.57..860.00 rows=32512 width=16) + Merge Cond: (s.id = t.id) + -> Sort (cost=179.78..186.16 rows=2550 width=10) + Sort Key: s.id + -> Seq Scan on source_pushdowntest_4000066 s (cost=0.00..35.50 rows=2550 width=10) + -> Sort (cost=179.78..186.16 rows=2550 width=10) + Sort Key: t.id + -> Seq Scan on target_pushdowntest_4000070 t (cost=0.00..35.50 rows=2550 width=10) + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> Merge on target_pushdowntest_4000071 t (cost=359.57..860.00 rows=0 width=0) + -> Merge Left Join (cost=359.57..860.00 rows=32512 width=16) + Merge Cond: (s.id = t.id) + -> Sort (cost=179.78..186.16 rows=2550 width=10) + Sort Key: s.id + -> Seq Scan on source_pushdowntest_4000067 s (cost=0.00..35.50 rows=2550 width=10) + -> Sort (cost=179.78..186.16 rows=2550 width=10) + Sort Key: t.id + -> Seq Scan on target_pushdowntest_4000071 t (cost=0.00..35.50 rows=2550 width=10) +(47 rows) + +-- Test 2 : tables are colocated AND source query is not multisharded : should push down to worker. 
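Since the subquery filters the source on its distribution column (id = 1), the router plan below only needs to touch the single matching shard pair. As an illustrative aside (not part of the original test, and assuming the standard Citus helper get_shard_id_for_distribution_column() is available), the colocated shards involved can be inspected directly:

-- illustrative check only (assumes get_shard_id_for_distribution_column() exists in this build)
SELECT get_shard_id_for_distribution_column('source_pushdowntest', 1) AS source_shard,
       get_shard_id_for_distribution_column('target_pushdowntest', 1) AS target_shard;

Because the two tables share a colocation group, both calls should report shards covering the same hash range, which is what allows the single-task plan shown in the EXPLAIN output that follows.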
+EXPLAIN MERGE INTO target_pushdowntest t +USING (SELECT * from source_pushdowntest where id = 1) s +on t.id = s.id +WHEN NOT MATCHED THEN + INSERT (id) + VALUES (s.id); +DEBUG: +DEBUG: Creating MERGE router plan + QUERY PLAN +--------------------------------------------------------------------- + Custom Scan (Citus Adaptive) (cost=0.00..0.00 rows=0 width=0) + Task Count: 1 + Tasks Shown: All + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> Merge on target_pushdowntest_4000068 t (cost=0.00..85.89 rows=0 width=0) + -> Nested Loop Left Join (cost=0.00..85.89 rows=169 width=16) + -> Seq Scan on source_pushdowntest_4000064 source_pushdowntest (cost=0.00..41.88 rows=13 width=10) + Filter: (id = 1) + -> Materialize (cost=0.00..41.94 rows=13 width=10) + -> Seq Scan on target_pushdowntest_4000068 t (cost=0.00..41.88 rows=13 width=10) + Filter: (id = 1) +(12 rows) + +-- Test 3 : tables are colocated source query is single sharded but not using source distributed column in insertion. let's not pushdown. +INSERT INTO source_pushdowntest (id) VALUES (3); +EXPLAIN MERGE INTO target_pushdowntest t +USING (SELECT 1 as somekey, id from source_pushdowntest where id = 1) s +on t.id = s.somekey +WHEN NOT MATCHED THEN + INSERT (id) + VALUES (s.somekey); +DEBUG: MERGE INSERT must use the source table distribution column value, try repartitioning +DEBUG: MERGE INSERT must use the source table distribution column value +DEBUG: Creating MERGE repartition plan +DEBUG: Using column - index:0 from the source list to redistribute + QUERY PLAN +--------------------------------------------------------------------- + Custom Scan (Citus MERGE INTO ...) (cost=0.00..0.00 rows=0 width=0) + MERGE INTO target_pushdowntest method: pull to coordinator + -> Custom Scan (Citus Adaptive) (cost=0.00..0.00 rows=0 width=0) + Task Count: 1 + Tasks Shown: All + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> Seq Scan on source_pushdowntest_4000064 source_pushdowntest (cost=0.00..41.88 rows=13 width=8) + Filter: (id = 1) +(9 rows) + +-- let's verify if we use some other column from source for value of distributed column in target. +-- it should be inserted to correct shard of target. +CREATE TABLE source_withdata (id integer, some_number integer); +SELECT create_distributed_table('source_withdata', 'id'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO source_withdata (id, some_number) VALUES (1, 3); +-- we will use some_number column from source_withdata to insert into distributed column of target. +-- value of some_number is 3 let's verify what shard it should go to. +select worker_hash(3); + worker_hash +--------------------------------------------------------------------- + -28094569 +(1 row) + +-- it should go to second shard of target as target has 4 shard and hash "-28094569" comes in range of second shard. 
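The shard boundaries behind that claim can be cross-checked against the Citus metadata. A minimal sketch (not part of the original test, assuming a hash-distributed table whose pg_dist_shard min/max values are integer hash bounds):

-- illustrative cross-check: which shard's hash range covers worker_hash(3)?
SELECT shardid, shardminvalue, shardmaxvalue
FROM pg_dist_shard
WHERE logicalrelid = 'target_pushdowntest'::regclass
  AND worker_hash(3) BETWEEN shardminvalue::integer AND shardmaxvalue::integer;

With four shards the hash space is split into four equal ranges, and -28094569 falls in the second one, matching the comment above.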
+MERGE INTO target_pushdowntest t +USING (SELECT id, some_number from source_withdata where id = 1) s +on t.id = s.some_number +WHEN NOT MATCHED THEN + INSERT (id) + VALUES (s.some_number); +DEBUG: Sub-query is not pushable, try repartitioning +DEBUG: MERGE command is only supported when all distributed tables are co-located and joined on their distribution columns +DEBUG: Creating MERGE repartition plan +DEBUG: Using column - index:1 from the source list to redistribute +DEBUG: Collect source query results on coordinator +DEBUG: Create a MERGE task list that needs to be routed +DEBUG: +DEBUG: +DEBUG: +DEBUG: +DEBUG: Execute MERGE task list +-- let's verify if data inserted to second shard of target. +EXPLAIN (analyze on, costs off, timing off, summary off) SELECT * FROM target_pushdowntest; + QUERY PLAN +--------------------------------------------------------------------- + Custom Scan (Citus Adaptive) (actual rows=1 loops=1) + Task Count: 4 + Tuple data received from nodes: 4 bytes + Tasks Shown: All + -> Task + Tuple data received from node: 0 bytes + Node: host=localhost port=xxxxx dbname=regression + -> Seq Scan on target_pushdowntest_4000068 target_pushdowntest (actual rows=0 loops=1) + -> Task + Tuple data received from node: 4 bytes + Node: host=localhost port=xxxxx dbname=regression + -> Seq Scan on target_pushdowntest_4000069 target_pushdowntest (actual rows=1 loops=1) + -> Task + Tuple data received from node: 0 bytes + Node: host=localhost port=xxxxx dbname=regression + -> Seq Scan on target_pushdowntest_4000070 target_pushdowntest (actual rows=0 loops=1) + -> Task + Tuple data received from node: 0 bytes + Node: host=localhost port=xxxxx dbname=regression + -> Seq Scan on target_pushdowntest_4000071 target_pushdowntest (actual rows=0 loops=1) +(20 rows) + +RESET client_min_messages; -- This will prune shards with restriction information as NOT MATCHED is void BEGIN; SET citus.log_remote_commands to true; @@ -2898,14 +3100,14 @@ WHEN NOT MATCHED THEN -> Limit -> Sort Sort Key: id2 - -> Seq Scan on demo_source_table_4000135 demo_source_table + -> Seq Scan on demo_source_table_4000147 demo_source_table -> Distributed Subplan XXX_2 -> Custom Scan (Citus Adaptive) Task Count: 4 Tasks Shown: One of 4 -> Task Node: host=localhost port=xxxxx dbname=regression - -> Seq Scan on demo_source_table_4000135 demo_source_table + -> Seq Scan on demo_source_table_4000147 demo_source_table Task Count: 1 Tasks Shown: All -> Task @@ -3119,10 +3321,10 @@ DEBUG: Creating MERGE repartition plan DEBUG: Using column - index:0 from the source list to redistribute DEBUG: Collect source query results on coordinator DEBUG: Create a MERGE task list that needs to be routed -DEBUG: -DEBUG: -DEBUG: -DEBUG: +DEBUG: +DEBUG: +DEBUG: +DEBUG: DEBUG: Execute MERGE task list RESET client_min_messages; SELECT * FROM target_6785 ORDER BY 1; @@ -3240,7 +3442,7 @@ USING s1 s ON t.id = s.id WHEN NOT MATCHED THEN INSERT (id) VALUES(s.val); -ERROR: MERGE INSERT must use the source table distribution column value +ERROR: MERGE INSERT must use the source's joining column for target's distribution column MERGE INTO t1 t USING s1 s ON t.id = s.id @@ -3966,7 +4168,7 @@ CONTEXT: SQL statement "SELECT citus_drop_all_shards(v_obj.objid, v_obj.schema_ PL/pgSQL function citus_drop_trigger() line XX at PERFORM DROP FUNCTION merge_when_and_write(); DROP SCHEMA merge_schema CASCADE; -NOTICE: drop cascades to 103 other objects +NOTICE: drop cascades to 106 other objects DETAIL: drop cascades to function insert_data() drop cascades to 
table local_local drop cascades to table target @@ -4026,6 +4228,9 @@ drop cascades to table pg_source drop cascades to table citus_target drop cascades to table citus_source drop cascades to function compare_tables() +drop cascades to table source_pushdowntest +drop cascades to table target_pushdowntest +drop cascades to table source_withdata drop cascades to view pg_source_view drop cascades to view citus_source_view drop cascades to table pg_pa_target @@ -4042,7 +4247,7 @@ drop cascades to table target_set drop cascades to table source_set drop cascades to table refsource_ref drop cascades to table pg_result -drop cascades to table refsource_ref_4000112 +drop cascades to table refsource_ref_4000124 drop cascades to table pg_ref drop cascades to table local_ref drop cascades to table reftarget_local @@ -4060,11 +4265,8 @@ drop cascades to table source_6785 drop cascades to table target_6785 drop cascades to function add_s(integer,integer) drop cascades to table pg -drop cascades to table t1_4000174 -drop cascades to table s1_4000175 +drop cascades to table t1_4000186 +drop cascades to table s1_4000187 drop cascades to table t1 drop cascades to table s1 -drop cascades to table dist_target -drop cascades to table dist_source -drop cascades to view show_tables -and 3 other objects (see server log for list) +and 6 other objects (see server log for list) diff --git a/src/test/regress/sql/merge.sql b/src/test/regress/sql/merge.sql index a41e80841..61bb06cea 100644 --- a/src/test/regress/sql/merge.sql +++ b/src/test/regress/sql/merge.sql @@ -1206,6 +1206,76 @@ SET citus.log_remote_commands to false; SELECT compare_tables(); ROLLBACK; + +-- let's create source and target table +CREATE TABLE source_pushdowntest (id integer); +CREATE TABLE target_pushdowntest (id integer ); + +-- let's distribute both table on id field +SELECT create_distributed_table('source_pushdowntest', 'id'); +SELECT create_distributed_table('target_pushdowntest', 'id'); + +-- we are doing this operation on single node setup let's figure out colocation id of both tables +-- both has same colocation id so both are colocated. +select colocationid,logicalrelid from pg_dist_partition where logicalrelid = 'source_pushdowntest'::regclass OR logicalrelid = 'target_pushdowntest'::regclass; + +SET client_min_messages TO DEBUG1; +-- Test 1 : tables are colocated AND query is multisharded AND Join On distributed column : should push down to workers. + +EXPLAIN MERGE INTO target_pushdowntest t +USING source_pushdowntest s +ON t.id = s.id +WHEN NOT MATCHED THEN + INSERT (id) + VALUES (s.id); + +-- Test 2 : tables are colocated AND source query is not multisharded : should push down to worker. + +EXPLAIN MERGE INTO target_pushdowntest t +USING (SELECT * from source_pushdowntest where id = 1) s +on t.id = s.id +WHEN NOT MATCHED THEN + INSERT (id) + VALUES (s.id); + + +-- Test 3 : tables are colocated source query is single sharded but not using source distributed column in insertion. let's not pushdown. +INSERT INTO source_pushdowntest (id) VALUES (3); + +EXPLAIN MERGE INTO target_pushdowntest t +USING (SELECT 1 as somekey, id from source_pushdowntest where id = 1) s +on t.id = s.somekey +WHEN NOT MATCHED THEN + INSERT (id) + VALUES (s.somekey); + + +-- let's verify if we use some other column from source for value of distributed column in target. +-- it should be inserted to correct shard of target. 
+CREATE TABLE source_withdata (id integer, some_number integer); +SELECT create_distributed_table('source_withdata', 'id'); + +INSERT INTO source_withdata (id, some_number) VALUES (1, 3); + +-- we will use some_number column from source_withdata to insert into distributed column of target. +-- value of some_number is 3 let's verify what shard it should go to. +select worker_hash(3); + +-- it should go to second shard of target as target has 4 shard and hash "-28094569" comes in range of second shard. +MERGE INTO target_pushdowntest t +USING (SELECT id, some_number from source_withdata where id = 1) s +on t.id = s.some_number +WHEN NOT MATCHED THEN + INSERT (id) + VALUES (s.some_number); + +-- let's verify if data inserted to second shard of target. +EXPLAIN (analyze on, costs off, timing off, summary off) SELECT * FROM target_pushdowntest; + +RESET client_min_messages; + + + -- This will prune shards with restriction information as NOT MATCHED is void BEGIN; SET citus.log_remote_commands to true; From ec25b433d402bea25264c733ec9f64831b39d51b Mon Sep 17 00:00:00 2001 From: paragjain Date: Mon, 10 Jun 2024 06:58:36 +0000 Subject: [PATCH 03/13] adding update and delete tests --- src/test/regress/expected/merge.out | 128 ++++++++++++++++++++++------ src/test/regress/sql/merge.sql | 43 +++++++++- 2 files changed, 140 insertions(+), 31 deletions(-) diff --git a/src/test/regress/expected/merge.out b/src/test/regress/expected/merge.out index 42ded746e..0fef8342f 100644 --- a/src/test/regress/expected/merge.out +++ b/src/test/regress/expected/merge.out @@ -1984,12 +1984,19 @@ DEBUG: Using column - index:0 from the source list to redistribute -- let's verify if we use some other column from source for value of distributed column in target. -- it should be inserted to correct shard of target. CREATE TABLE source_withdata (id integer, some_number integer); +CREATE TABLE target_table (id integer, name text); SELECT create_distributed_table('source_withdata', 'id'); create_distributed_table --------------------------------------------------------------------- (1 row) +SELECT create_distributed_table('target_table', 'id'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + INSERT INTO source_withdata (id, some_number) VALUES (1, 3); -- we will use some_number column from source_withdata to insert into distributed column of target. -- value of some_number is 3 let's verify what shard it should go to. @@ -2000,49 +2007,116 @@ select worker_hash(3); (1 row) -- it should go to second shard of target as target has 4 shard and hash "-28094569" comes in range of second shard. -MERGE INTO target_pushdowntest t +MERGE INTO target_table t USING (SELECT id, some_number from source_withdata where id = 1) s on t.id = s.some_number WHEN NOT MATCHED THEN - INSERT (id) - VALUES (s.some_number); + INSERT (id, name) + VALUES (s.some_number, 'parag'); DEBUG: Sub-query is not pushable, try repartitioning DEBUG: MERGE command is only supported when all distributed tables are co-located and joined on their distribution columns DEBUG: Creating MERGE repartition plan DEBUG: Using column - index:1 from the source list to redistribute DEBUG: Collect source query results on coordinator DEBUG: Create a MERGE task list that needs to be routed -DEBUG: -DEBUG: -DEBUG: -DEBUG: +DEBUG: +DEBUG: +DEBUG: +DEBUG: DEBUG: Execute MERGE task list -- let's verify if data inserted to second shard of target. 
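Besides the EXPLAIN ANALYZE used below, per-shard placement can also be confirmed by counting rows in each shard directly. A hedged sketch (not in the original test, assuming the Citus helper run_command_on_shards() is available in this build):

-- illustrative check only (assumes run_command_on_shards() exists): count rows per shard
SELECT shardid, result AS row_count
FROM run_command_on_shards('target_table', 'SELECT count(*) FROM %s');

Only the shard whose hash range covers worker_hash(3) should report a non-zero count, which is consistent with the per-task row counts in the EXPLAIN ANALYZE output that follows.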
-EXPLAIN (analyze on, costs off, timing off, summary off) SELECT * FROM target_pushdowntest; - QUERY PLAN +EXPLAIN (analyze on, costs off, timing off, summary off) SELECT * FROM target_table; + QUERY PLAN --------------------------------------------------------------------- Custom Scan (Citus Adaptive) (actual rows=1 loops=1) Task Count: 4 - Tuple data received from nodes: 4 bytes + Tuple data received from nodes: 9 bytes Tasks Shown: All -> Task Tuple data received from node: 0 bytes Node: host=localhost port=xxxxx dbname=regression - -> Seq Scan on target_pushdowntest_4000068 target_pushdowntest (actual rows=0 loops=1) + -> Seq Scan on target_table_4000076 target_table (actual rows=0 loops=1) -> Task - Tuple data received from node: 4 bytes + Tuple data received from node: 9 bytes Node: host=localhost port=xxxxx dbname=regression - -> Seq Scan on target_pushdowntest_4000069 target_pushdowntest (actual rows=1 loops=1) + -> Seq Scan on target_table_4000077 target_table (actual rows=1 loops=1) -> Task Tuple data received from node: 0 bytes Node: host=localhost port=xxxxx dbname=regression - -> Seq Scan on target_pushdowntest_4000070 target_pushdowntest (actual rows=0 loops=1) + -> Seq Scan on target_table_4000078 target_table (actual rows=0 loops=1) -> Task Tuple data received from node: 0 bytes Node: host=localhost port=xxxxx dbname=regression - -> Seq Scan on target_pushdowntest_4000071 target_pushdowntest (actual rows=0 loops=1) + -> Seq Scan on target_table_4000079 target_table (actual rows=0 loops=1) (20 rows) +-- let's verify target data too. +SELECT * FROM target_table; + id | name +--------------------------------------------------------------------- + 3 | parag +(1 row) + +-- test UPDATE : when source is single sharded and table are colocated +MERGE INTO target_table t +USING (SELECT id, some_number from source_withdata where id = 1) s +on t.id = s.some_number +WHEN MATCHED THEN + UPDATE SET name = 'parag jain'; +DEBUG: Sub-query is not pushable, try repartitioning +DEBUG: MERGE command is only supported when all distributed tables are co-located and joined on their distribution columns +DEBUG: Creating MERGE repartition plan +DEBUG: Using column - index:1 from the source list to redistribute +DEBUG: Collect source query results on coordinator +DEBUG: Create a MERGE task list that needs to be routed +DEBUG: +DEBUG: +DEBUG: +DEBUG: +DEBUG: Execute MERGE task list +-- let's verify if data updated properly. 
+SELECT * FROM target_table; + id | name +--------------------------------------------------------------------- + 3 | parag jain +(1 row) + +-- let's see what happend when we try to update distributed key of target table +MERGE INTO target_table t +USING (SELECT id, some_number from source_withdata where id = 1) s +on t.id = s.some_number +WHEN MATCHED THEN + UPDATE SET id = 1500; +ERROR: updating the distribution column is not allowed in MERGE actions +SELECT * FROM target_table; + id | name +--------------------------------------------------------------------- + 3 | parag jain +(1 row) + +-- test DELETE : when source is single sharded and table are colocated +MERGE INTO target_table t +USING (SELECT id, some_number from source_withdata where id = 1) s +on t.id = s.some_number +WHEN MATCHED THEN + DELETE; +DEBUG: Sub-query is not pushable, try repartitioning +DEBUG: MERGE command is only supported when all distributed tables are co-located and joined on their distribution columns +DEBUG: Creating MERGE repartition plan +DEBUG: Using column - index:1 from the source list to redistribute +DEBUG: Collect source query results on coordinator +DEBUG: Create a MERGE task list that needs to be routed +DEBUG: +DEBUG: +DEBUG: +DEBUG: +DEBUG: Execute MERGE task list +-- let's verify if data deleted properly. +SELECT * FROM target_table; + id | name +--------------------------------------------------------------------- +(0 rows) + RESET client_min_messages; -- This will prune shards with restriction information as NOT MATCHED is void BEGIN; @@ -3100,14 +3174,14 @@ WHEN NOT MATCHED THEN -> Limit -> Sort Sort Key: id2 - -> Seq Scan on demo_source_table_4000147 demo_source_table + -> Seq Scan on demo_source_table_4000151 demo_source_table -> Distributed Subplan XXX_2 -> Custom Scan (Citus Adaptive) Task Count: 4 Tasks Shown: One of 4 -> Task Node: host=localhost port=xxxxx dbname=regression - -> Seq Scan on demo_source_table_4000147 demo_source_table + -> Seq Scan on demo_source_table_4000151 demo_source_table Task Count: 1 Tasks Shown: All -> Task @@ -3321,10 +3395,10 @@ DEBUG: Creating MERGE repartition plan DEBUG: Using column - index:0 from the source list to redistribute DEBUG: Collect source query results on coordinator DEBUG: Create a MERGE task list that needs to be routed -DEBUG: -DEBUG: -DEBUG: -DEBUG: +DEBUG: +DEBUG: +DEBUG: +DEBUG: DEBUG: Execute MERGE task list RESET client_min_messages; SELECT * FROM target_6785 ORDER BY 1; @@ -4168,7 +4242,7 @@ CONTEXT: SQL statement "SELECT citus_drop_all_shards(v_obj.objid, v_obj.schema_ PL/pgSQL function citus_drop_trigger() line XX at PERFORM DROP FUNCTION merge_when_and_write(); DROP SCHEMA merge_schema CASCADE; -NOTICE: drop cascades to 106 other objects +NOTICE: drop cascades to 107 other objects DETAIL: drop cascades to function insert_data() drop cascades to table local_local drop cascades to table target @@ -4231,6 +4305,7 @@ drop cascades to function compare_tables() drop cascades to table source_pushdowntest drop cascades to table target_pushdowntest drop cascades to table source_withdata +drop cascades to table target_table drop cascades to view pg_source_view drop cascades to view citus_source_view drop cascades to table pg_pa_target @@ -4247,7 +4322,7 @@ drop cascades to table target_set drop cascades to table source_set drop cascades to table refsource_ref drop cascades to table pg_result -drop cascades to table refsource_ref_4000124 +drop cascades to table refsource_ref_4000128 drop cascades to table pg_ref drop cascades to table 
local_ref drop cascades to table reftarget_local @@ -4265,8 +4340,7 @@ drop cascades to table source_6785 drop cascades to table target_6785 drop cascades to function add_s(integer,integer) drop cascades to table pg -drop cascades to table t1_4000186 -drop cascades to table s1_4000187 +drop cascades to table t1_4000190 +drop cascades to table s1_4000191 drop cascades to table t1 -drop cascades to table s1 -and 6 other objects (see server log for list) +and 7 other objects (see server log for list) diff --git a/src/test/regress/sql/merge.sql b/src/test/regress/sql/merge.sql index 61bb06cea..c2206cf52 100644 --- a/src/test/regress/sql/merge.sql +++ b/src/test/regress/sql/merge.sql @@ -1253,7 +1253,9 @@ WHEN NOT MATCHED THEN -- let's verify if we use some other column from source for value of distributed column in target. -- it should be inserted to correct shard of target. CREATE TABLE source_withdata (id integer, some_number integer); +CREATE TABLE target_table (id integer, name text); SELECT create_distributed_table('source_withdata', 'id'); +SELECT create_distributed_table('target_table', 'id'); INSERT INTO source_withdata (id, some_number) VALUES (1, 3); @@ -1262,15 +1264,48 @@ INSERT INTO source_withdata (id, some_number) VALUES (1, 3); select worker_hash(3); -- it should go to second shard of target as target has 4 shard and hash "-28094569" comes in range of second shard. -MERGE INTO target_pushdowntest t +MERGE INTO target_table t USING (SELECT id, some_number from source_withdata where id = 1) s on t.id = s.some_number WHEN NOT MATCHED THEN - INSERT (id) - VALUES (s.some_number); + INSERT (id, name) + VALUES (s.some_number, 'parag'); -- let's verify if data inserted to second shard of target. -EXPLAIN (analyze on, costs off, timing off, summary off) SELECT * FROM target_pushdowntest; +EXPLAIN (analyze on, costs off, timing off, summary off) SELECT * FROM target_table; + +-- let's verify target data too. +SELECT * FROM target_table; + + +-- test UPDATE : when source is single sharded and table are colocated +MERGE INTO target_table t +USING (SELECT id, some_number from source_withdata where id = 1) s +on t.id = s.some_number +WHEN MATCHED THEN + UPDATE SET name = 'parag jain'; + +-- let's verify if data updated properly. +SELECT * FROM target_table; + +-- let's see what happend when we try to update distributed key of target table +MERGE INTO target_table t +USING (SELECT id, some_number from source_withdata where id = 1) s +on t.id = s.some_number +WHEN MATCHED THEN + UPDATE SET id = 1500; + +SELECT * FROM target_table; + +-- test DELETE : when source is single sharded and table are colocated +MERGE INTO target_table t +USING (SELECT id, some_number from source_withdata where id = 1) s +on t.id = s.some_number +WHEN MATCHED THEN + DELETE; + +-- let's verify if data deleted properly. 
+SELECT * FROM target_table; RESET client_min_messages; From 493140287a704e8a5faca937134330451bae29b9 Mon Sep 17 00:00:00 2001 From: paragjain Date: Thu, 13 Jun 2024 11:50:30 +0000 Subject: [PATCH 04/13] fix some indent --- src/test/regress/expected/merge.out | 5 +++-- src/test/regress/sql/merge.sql | 17 +++++++++-------- 2 files changed, 12 insertions(+), 10 deletions(-) diff --git a/src/test/regress/expected/merge.out b/src/test/regress/expected/merge.out index 0fef8342f..188ed897e 100644 --- a/src/test/regress/expected/merge.out +++ b/src/test/regress/expected/merge.out @@ -27,6 +27,7 @@ SELECT 1 FROM master_add_node('localhost', :master_port, groupid => 0); 1 (1 row) +ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 13000; RESET client_min_messages; CREATE TABLE source ( @@ -1863,8 +1864,8 @@ SELECT create_distributed_table('target_pushdowntest', 'id'); select colocationid,logicalrelid from pg_dist_partition where logicalrelid = 'source_pushdowntest'::regclass OR logicalrelid = 'target_pushdowntest'::regclass; colocationid | logicalrelid --------------------------------------------------------------------- - 1390004 | source_pushdowntest - 1390004 | target_pushdowntest + 13000 | source_pushdowntest + 13000 | target_pushdowntest (2 rows) SET client_min_messages TO DEBUG1; diff --git a/src/test/regress/sql/merge.sql b/src/test/regress/sql/merge.sql index c2206cf52..cc7f0d9a6 100644 --- a/src/test/regress/sql/merge.sql +++ b/src/test/regress/sql/merge.sql @@ -23,6 +23,7 @@ SET citus.shard_replication_factor TO 1; SET citus.max_adaptive_executor_pool_size TO 1; SET client_min_messages = warning; SELECT 1 FROM master_add_node('localhost', :master_port, groupid => 0); +ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 13000; RESET client_min_messages; CREATE TABLE source @@ -1224,7 +1225,7 @@ SET client_min_messages TO DEBUG1; EXPLAIN MERGE INTO target_pushdowntest t USING source_pushdowntest s -ON t.id = s.id +ON t.id = s.id WHEN NOT MATCHED THEN INSERT (id) VALUES (s.id); @@ -1233,7 +1234,7 @@ WHEN NOT MATCHED THEN EXPLAIN MERGE INTO target_pushdowntest t USING (SELECT * from source_pushdowntest where id = 1) s -on t.id = s.id +on t.id = s.id WHEN NOT MATCHED THEN INSERT (id) VALUES (s.id); @@ -1244,7 +1245,7 @@ INSERT INTO source_pushdowntest (id) VALUES (3); EXPLAIN MERGE INTO target_pushdowntest t USING (SELECT 1 as somekey, id from source_pushdowntest where id = 1) s -on t.id = s.somekey +on t.id = s.somekey WHEN NOT MATCHED THEN INSERT (id) VALUES (s.somekey); @@ -1266,7 +1267,7 @@ select worker_hash(3); -- it should go to second shard of target as target has 4 shard and hash "-28094569" comes in range of second shard. 
MERGE INTO target_table t USING (SELECT id, some_number from source_withdata where id = 1) s -on t.id = s.some_number +on t.id = s.some_number WHEN NOT MATCHED THEN INSERT (id, name) VALUES (s.some_number, 'parag'); @@ -1281,7 +1282,7 @@ SELECT * FROM target_table; -- test UPDATE : when source is single sharded and table are colocated MERGE INTO target_table t USING (SELECT id, some_number from source_withdata where id = 1) s -on t.id = s.some_number +on t.id = s.some_number WHEN MATCHED THEN UPDATE SET name = 'parag jain'; @@ -1291,7 +1292,7 @@ SELECT * FROM target_table; -- let's see what happend when we try to update distributed key of target table MERGE INTO target_table t USING (SELECT id, some_number from source_withdata where id = 1) s -on t.id = s.some_number +on t.id = s.some_number WHEN MATCHED THEN UPDATE SET id = 1500; @@ -1300,14 +1301,14 @@ SELECT * FROM target_table; -- test DELETE : when source is single sharded and table are colocated MERGE INTO target_table t USING (SELECT id, some_number from source_withdata where id = 1) s -on t.id = s.some_number +on t.id = s.some_number WHEN MATCHED THEN DELETE; -- let's verify if data deleted properly. SELECT * FROM target_table; -RESET client_min_messages; +RESET client_min_messages; From 06e9c299502af5163fa848a89766fe50cb13371c Mon Sep 17 00:00:00 2001 From: paragjain Date: Fri, 14 Jun 2024 04:35:58 +0000 Subject: [PATCH 05/13] some more --- src/backend/distributed/planner/merge_planner.c | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/backend/distributed/planner/merge_planner.c b/src/backend/distributed/planner/merge_planner.c index b0ebe774b..09d2d90ac 100644 --- a/src/backend/distributed/planner/merge_planner.c +++ b/src/backend/distributed/planner/merge_planner.c @@ -1123,6 +1123,7 @@ DeferErrorIfRoutableMergeNotSupported(Query *query, List *rangeTableList, * However, before proceeding with this delegation, we need to confirm that the user is utilizing * the distribution column of the source table in the Insert variable. * If this is not the case, we should refrain from pushing down the query. + * This is just a deffered error which will be handle by caller. */ Var *insertVar = @@ -1131,10 +1132,10 @@ DeferErrorIfRoutableMergeNotSupported(Query *query, List *rangeTableList, !IsDistributionColumnInMergeSource((Expr *) insertVar, query, true)) { ereport(DEBUG1, (errmsg( - "MERGE INSERT must use the source table distribution column value, try repartitioning"))); + "MERGE INSERT must use the source table distribution column value for push down to workers. Otherwise, repartitioning will be applied"))); return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, - "MERGE INSERT must use the source table " - "distribution column value", NULL, NULL); + "MERGE INSERT must use the source table distribution column value for push down to workers. 
Otherwise, repartitioning will be applied", + NULL, NULL); } return NULL; } From 7c8a366ba2cf18e7ab8cb273e3716cb94b3c6376 Mon Sep 17 00:00:00 2001 From: paragjain Date: Fri, 14 Jun 2024 04:39:17 +0000 Subject: [PATCH 06/13] some more --- src/test/regress/expected/merge.out | 21 +++++++++++++++++++-- src/test/regress/multi_schedule | 3 ++- src/test/regress/sql/merge.sql | 14 ++++++++++++++ 3 files changed, 35 insertions(+), 3 deletions(-) diff --git a/src/test/regress/expected/merge.out b/src/test/regress/expected/merge.out index 188ed897e..1a7e58f36 100644 --- a/src/test/regress/expected/merge.out +++ b/src/test/regress/expected/merge.out @@ -1965,8 +1965,8 @@ on t.id = s.somekey WHEN NOT MATCHED THEN INSERT (id) VALUES (s.somekey); -DEBUG: MERGE INSERT must use the source table distribution column value, try repartitioning -DEBUG: MERGE INSERT must use the source table distribution column value +DEBUG: MERGE INSERT must use the source table distribution column value for push down to workers. Otherwise, repartitioning will be applied +DEBUG: MERGE INSERT must use the source table distribution column value for push down to workers. Otherwise, repartitioning will be applied DEBUG: Creating MERGE repartition plan DEBUG: Using column - index:0 from the source list to redistribute QUERY PLAN @@ -2118,6 +2118,23 @@ SELECT * FROM target_table; --------------------------------------------------------------------- (0 rows) +-- +DELETE FROM source_withdata; +DELETE FROM target_table; +INSERT INTO source VALUES (1,1); +merge into target_table sda +using source_withdata sdn +on sda.id = sdn.id AND sda.id = 1 +when not matched then + insert (id) + values (10000); +ERROR: MERGE INSERT is using unsupported expression type for distribution column +DETAIL: Inserting arbitrary values that don't correspond to the joined column values can lead to unpredictable outcomes where rows are incorrectly distributed among different shards +SELECT * FROM target_table WHERE id = 10000; + id | name +--------------------------------------------------------------------- +(0 rows) + RESET client_min_messages; -- This will prune shards with restriction information as NOT MATCHED is void BEGIN; diff --git a/src/test/regress/multi_schedule b/src/test/regress/multi_schedule index fca36f5ab..7f0c7ca57 100644 --- a/src/test/regress/multi_schedule +++ b/src/test/regress/multi_schedule @@ -116,7 +116,8 @@ test: function_with_case_when test: clock # MERGE tests -test: merge pgmerge merge_repartition2 +test: merge pgmerge +test: merge_repartition2 test: merge_repartition1 merge_schema_sharding test: merge_partition_tables diff --git a/src/test/regress/sql/merge.sql b/src/test/regress/sql/merge.sql index cc7f0d9a6..4538d8d81 100644 --- a/src/test/regress/sql/merge.sql +++ b/src/test/regress/sql/merge.sql @@ -1308,6 +1308,20 @@ WHEN MATCHED THEN -- let's verify if data deleted properly. 
SELECT * FROM target_table; +-- +DELETE FROM source_withdata; +DELETE FROM target_table; +INSERT INTO source VALUES (1,1); + +merge into target_table sda +using source_withdata sdn +on sda.id = sdn.id AND sda.id = 1 +when not matched then + insert (id) + values (10000); + +SELECT * FROM target_table WHERE id = 10000; + RESET client_min_messages; From f883cfdd779a84fa930857795624aa6b4fa5c04f Mon Sep 17 00:00:00 2001 From: Jelte Fennema-Nio Date: Fri, 14 Jun 2024 14:15:04 +0200 Subject: [PATCH 07/13] Try to fix failure --- src/test/regress/expected/multi_multiuser_auth.out | 2 +- src/test/regress/sql/multi_multiuser_auth.sql | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/test/regress/expected/multi_multiuser_auth.out b/src/test/regress/expected/multi_multiuser_auth.out index 6b0e85b67..05310ce7b 100644 --- a/src/test/regress/expected/multi_multiuser_auth.out +++ b/src/test/regress/expected/multi_multiuser_auth.out @@ -71,7 +71,7 @@ SELECT format('user=%s host=localhost port=%s password=%s dbname=regression', GRANT ALL ON TABLE lineitem, orders, lineitem, customer, nation, part, supplier TO alice, bob; \c :alice_conninfo -- router query (should break because of bad password) -INSERT INTO customer VALUES (12345, 'name', NULL, 5, 'phone', 123.45, 'segment', 'comment'); +INSERT INTO customer VALUES (12345, 'name', 'aaaa', 5, 'phone', 123.45, 'segment', 'comment'); ERROR: connection to the remote node alice@localhost:xxxxx failed with the following error: FATAL: password authentication failed for user "alice" -- fix alice's worker1 password ... UPDATE pg_dist_authinfo diff --git a/src/test/regress/sql/multi_multiuser_auth.sql b/src/test/regress/sql/multi_multiuser_auth.sql index 1cd566b50..80fdebcef 100644 --- a/src/test/regress/sql/multi_multiuser_auth.sql +++ b/src/test/regress/sql/multi_multiuser_auth.sql @@ -75,7 +75,7 @@ GRANT ALL ON TABLE lineitem, orders, lineitem, customer, nation, part, supplier \c :alice_conninfo -- router query (should break because of bad password) -INSERT INTO customer VALUES (12345, 'name', NULL, 5, 'phone', 123.45, 'segment', 'comment'); +INSERT INTO customer VALUES (12345, 'name', 'aaaa', 5, 'phone', 123.45, 'segment', 'comment'); -- fix alice's worker1 password ... UPDATE pg_dist_authinfo From d5231c34ab36c5e1377e231719962e1f7533f0b9 Mon Sep 17 00:00:00 2001 From: Jelte Fennema-Nio Date: Fri, 14 Jun 2024 14:20:03 +0200 Subject: [PATCH 08/13] Revert "Try to fix failure" This reverts commit 89f721766059d99e6b406e875e79af038930fb1f. 
--- src/test/regress/expected/multi_multiuser_auth.out | 2 +- src/test/regress/sql/multi_multiuser_auth.sql | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/test/regress/expected/multi_multiuser_auth.out b/src/test/regress/expected/multi_multiuser_auth.out index 05310ce7b..6b0e85b67 100644 --- a/src/test/regress/expected/multi_multiuser_auth.out +++ b/src/test/regress/expected/multi_multiuser_auth.out @@ -71,7 +71,7 @@ SELECT format('user=%s host=localhost port=%s password=%s dbname=regression', GRANT ALL ON TABLE lineitem, orders, lineitem, customer, nation, part, supplier TO alice, bob; \c :alice_conninfo -- router query (should break because of bad password) -INSERT INTO customer VALUES (12345, 'name', 'aaaa', 5, 'phone', 123.45, 'segment', 'comment'); +INSERT INTO customer VALUES (12345, 'name', NULL, 5, 'phone', 123.45, 'segment', 'comment'); ERROR: connection to the remote node alice@localhost:xxxxx failed with the following error: FATAL: password authentication failed for user "alice" -- fix alice's worker1 password ... UPDATE pg_dist_authinfo diff --git a/src/test/regress/sql/multi_multiuser_auth.sql b/src/test/regress/sql/multi_multiuser_auth.sql index 80fdebcef..1cd566b50 100644 --- a/src/test/regress/sql/multi_multiuser_auth.sql +++ b/src/test/regress/sql/multi_multiuser_auth.sql @@ -75,7 +75,7 @@ GRANT ALL ON TABLE lineitem, orders, lineitem, customer, nation, part, supplier \c :alice_conninfo -- router query (should break because of bad password) -INSERT INTO customer VALUES (12345, 'name', 'aaaa', 5, 'phone', 123.45, 'segment', 'comment'); +INSERT INTO customer VALUES (12345, 'name', NULL, 5, 'phone', 123.45, 'segment', 'comment'); -- fix alice's worker1 password ... UPDATE pg_dist_authinfo From 76f68f47c4ee242d8751c06585c537c1eb15b42f Mon Sep 17 00:00:00 2001 From: paragjain Date: Fri, 14 Jun 2024 16:15:52 +0000 Subject: [PATCH 09/13] removing flakyness from test --- src/test/regress/expected/merge.out | 83 +++++++++++++++-------------- src/test/regress/sql/merge.sql | 9 ++-- 2 files changed, 49 insertions(+), 43 deletions(-) diff --git a/src/test/regress/expected/merge.out b/src/test/regress/expected/merge.out index 1a7e58f36..47f122072 100644 --- a/src/test/regress/expected/merge.out +++ b/src/test/regress/expected/merge.out @@ -1870,7 +1870,8 @@ select colocationid,logicalrelid from pg_dist_partition where logicalrelid = 'so SET client_min_messages TO DEBUG1; -- Test 1 : tables are colocated AND query is multisharded AND Join On distributed column : should push down to workers. 
-EXPLAIN MERGE INTO target_pushdowntest t +EXPLAIN (costs off, timing off, summary off) +MERGE INTO target_pushdowntest t USING source_pushdowntest s ON t.id = s.id WHEN NOT MATCHED THEN @@ -1881,59 +1882,60 @@ DEBUG: DEBUG: DEBUG: Creating MERGE router plan - QUERY PLAN + QUERY PLAN --------------------------------------------------------------------- - Custom Scan (Citus Adaptive) (cost=0.00..0.00 rows=0 width=0) + Custom Scan (Citus Adaptive) Task Count: 4 Tasks Shown: All -> Task Node: host=localhost port=xxxxx dbname=regression - -> Merge on target_pushdowntest_4000068 t (cost=359.57..860.00 rows=0 width=0) - -> Merge Left Join (cost=359.57..860.00 rows=32512 width=16) + -> Merge on target_pushdowntest_4000068 t + -> Merge Left Join Merge Cond: (s.id = t.id) - -> Sort (cost=179.78..186.16 rows=2550 width=10) + -> Sort Sort Key: s.id - -> Seq Scan on source_pushdowntest_4000064 s (cost=0.00..35.50 rows=2550 width=10) - -> Sort (cost=179.78..186.16 rows=2550 width=10) + -> Seq Scan on source_pushdowntest_4000064 s + -> Sort Sort Key: t.id - -> Seq Scan on target_pushdowntest_4000068 t (cost=0.00..35.50 rows=2550 width=10) + -> Seq Scan on target_pushdowntest_4000068 t -> Task Node: host=localhost port=xxxxx dbname=regression - -> Merge on target_pushdowntest_4000069 t (cost=359.57..860.00 rows=0 width=0) - -> Merge Left Join (cost=359.57..860.00 rows=32512 width=16) + -> Merge on target_pushdowntest_4000069 t + -> Merge Left Join Merge Cond: (s.id = t.id) - -> Sort (cost=179.78..186.16 rows=2550 width=10) + -> Sort Sort Key: s.id - -> Seq Scan on source_pushdowntest_4000065 s (cost=0.00..35.50 rows=2550 width=10) - -> Sort (cost=179.78..186.16 rows=2550 width=10) + -> Seq Scan on source_pushdowntest_4000065 s + -> Sort Sort Key: t.id - -> Seq Scan on target_pushdowntest_4000069 t (cost=0.00..35.50 rows=2550 width=10) + -> Seq Scan on target_pushdowntest_4000069 t -> Task Node: host=localhost port=xxxxx dbname=regression - -> Merge on target_pushdowntest_4000070 t (cost=359.57..860.00 rows=0 width=0) - -> Merge Left Join (cost=359.57..860.00 rows=32512 width=16) + -> Merge on target_pushdowntest_4000070 t + -> Merge Left Join Merge Cond: (s.id = t.id) - -> Sort (cost=179.78..186.16 rows=2550 width=10) + -> Sort Sort Key: s.id - -> Seq Scan on source_pushdowntest_4000066 s (cost=0.00..35.50 rows=2550 width=10) - -> Sort (cost=179.78..186.16 rows=2550 width=10) + -> Seq Scan on source_pushdowntest_4000066 s + -> Sort Sort Key: t.id - -> Seq Scan on target_pushdowntest_4000070 t (cost=0.00..35.50 rows=2550 width=10) + -> Seq Scan on target_pushdowntest_4000070 t -> Task Node: host=localhost port=xxxxx dbname=regression - -> Merge on target_pushdowntest_4000071 t (cost=359.57..860.00 rows=0 width=0) - -> Merge Left Join (cost=359.57..860.00 rows=32512 width=16) + -> Merge on target_pushdowntest_4000071 t + -> Merge Left Join Merge Cond: (s.id = t.id) - -> Sort (cost=179.78..186.16 rows=2550 width=10) + -> Sort Sort Key: s.id - -> Seq Scan on source_pushdowntest_4000067 s (cost=0.00..35.50 rows=2550 width=10) - -> Sort (cost=179.78..186.16 rows=2550 width=10) + -> Seq Scan on source_pushdowntest_4000067 s + -> Sort Sort Key: t.id - -> Seq Scan on target_pushdowntest_4000071 t (cost=0.00..35.50 rows=2550 width=10) + -> Seq Scan on target_pushdowntest_4000071 t (47 rows) -- Test 2 : tables are colocated AND source query is not multisharded : should push down to worker. 
-EXPLAIN MERGE INTO target_pushdowntest t +EXPLAIN (costs off, timing off, summary off) +MERGE INTO target_pushdowntest t USING (SELECT * from source_pushdowntest where id = 1) s on t.id = s.id WHEN NOT MATCHED THEN @@ -1941,25 +1943,26 @@ WHEN NOT MATCHED THEN VALUES (s.id); DEBUG: DEBUG: Creating MERGE router plan - QUERY PLAN + QUERY PLAN --------------------------------------------------------------------- - Custom Scan (Citus Adaptive) (cost=0.00..0.00 rows=0 width=0) + Custom Scan (Citus Adaptive) Task Count: 1 Tasks Shown: All -> Task Node: host=localhost port=xxxxx dbname=regression - -> Merge on target_pushdowntest_4000068 t (cost=0.00..85.89 rows=0 width=0) - -> Nested Loop Left Join (cost=0.00..85.89 rows=169 width=16) - -> Seq Scan on source_pushdowntest_4000064 source_pushdowntest (cost=0.00..41.88 rows=13 width=10) + -> Merge on target_pushdowntest_4000068 t + -> Nested Loop Left Join + -> Seq Scan on source_pushdowntest_4000064 source_pushdowntest Filter: (id = 1) - -> Materialize (cost=0.00..41.94 rows=13 width=10) - -> Seq Scan on target_pushdowntest_4000068 t (cost=0.00..41.88 rows=13 width=10) + -> Materialize + -> Seq Scan on target_pushdowntest_4000068 t Filter: (id = 1) (12 rows) -- Test 3 : tables are colocated source query is single sharded but not using source distributed column in insertion. let's not pushdown. INSERT INTO source_pushdowntest (id) VALUES (3); -EXPLAIN MERGE INTO target_pushdowntest t +EXPLAIN (costs off, timing off, summary off) +MERGE INTO target_pushdowntest t USING (SELECT 1 as somekey, id from source_pushdowntest where id = 1) s on t.id = s.somekey WHEN NOT MATCHED THEN @@ -1969,16 +1972,16 @@ DEBUG: MERGE INSERT must use the source table distribution column value for pus DEBUG: MERGE INSERT must use the source table distribution column value for push down to workers. Otherwise, repartitioning will be applied DEBUG: Creating MERGE repartition plan DEBUG: Using column - index:0 from the source list to redistribute - QUERY PLAN + QUERY PLAN --------------------------------------------------------------------- - Custom Scan (Citus MERGE INTO ...) (cost=0.00..0.00 rows=0 width=0) + Custom Scan (Citus MERGE INTO ...) MERGE INTO target_pushdowntest method: pull to coordinator - -> Custom Scan (Citus Adaptive) (cost=0.00..0.00 rows=0 width=0) + -> Custom Scan (Citus Adaptive) Task Count: 1 Tasks Shown: All -> Task Node: host=localhost port=xxxxx dbname=regression - -> Seq Scan on source_pushdowntest_4000064 source_pushdowntest (cost=0.00..41.88 rows=13 width=8) + -> Seq Scan on source_pushdowntest_4000064 source_pushdowntest Filter: (id = 1) (9 rows) diff --git a/src/test/regress/sql/merge.sql b/src/test/regress/sql/merge.sql index 4538d8d81..7b76957a1 100644 --- a/src/test/regress/sql/merge.sql +++ b/src/test/regress/sql/merge.sql @@ -1223,7 +1223,8 @@ select colocationid,logicalrelid from pg_dist_partition where logicalrelid = 'so SET client_min_messages TO DEBUG1; -- Test 1 : tables are colocated AND query is multisharded AND Join On distributed column : should push down to workers. -EXPLAIN MERGE INTO target_pushdowntest t +EXPLAIN (costs off, timing off, summary off) +MERGE INTO target_pushdowntest t USING source_pushdowntest s ON t.id = s.id WHEN NOT MATCHED THEN @@ -1232,7 +1233,8 @@ WHEN NOT MATCHED THEN -- Test 2 : tables are colocated AND source query is not multisharded : should push down to worker. 
-EXPLAIN MERGE INTO target_pushdowntest t +EXPLAIN (costs off, timing off, summary off) +MERGE INTO target_pushdowntest t USING (SELECT * from source_pushdowntest where id = 1) s on t.id = s.id WHEN NOT MATCHED THEN @@ -1243,7 +1245,8 @@ WHEN NOT MATCHED THEN -- Test 3 : tables are colocated source query is single sharded but not using source distributed column in insertion. let's not pushdown. INSERT INTO source_pushdowntest (id) VALUES (3); -EXPLAIN MERGE INTO target_pushdowntest t +EXPLAIN (costs off, timing off, summary off) +MERGE INTO target_pushdowntest t USING (SELECT 1 as somekey, id from source_pushdowntest where id = 1) s on t.id = s.somekey WHEN NOT MATCHED THEN From e62ae64d004f7b1514cc3e1320878df5df76fc2b Mon Sep 17 00:00:00 2001 From: paragjain Date: Fri, 14 Jun 2024 17:02:13 +0000 Subject: [PATCH 10/13] some more --- src/test/regress/expected/merge.out | 22 ++++++++++++++++------ src/test/regress/sql/merge.sql | 16 ++++++++++++++-- 2 files changed, 30 insertions(+), 8 deletions(-) diff --git a/src/test/regress/expected/merge.out b/src/test/regress/expected/merge.out index 47f122072..5f2bb0522 100644 --- a/src/test/regress/expected/merge.out +++ b/src/test/regress/expected/merge.out @@ -27,7 +27,6 @@ SELECT 1 FROM master_add_node('localhost', :master_port, groupid => 0); 1 (1 row) -ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 13000; RESET client_min_messages; CREATE TABLE source ( @@ -1844,6 +1843,7 @@ SELECT compare_tables(); ROLLBACK; -- let's create source and target table +ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 13000; CREATE TABLE source_pushdowntest (id integer); CREATE TABLE target_pushdowntest (id integer ); -- let's distribute both table on id field @@ -1861,12 +1861,22 @@ SELECT create_distributed_table('target_pushdowntest', 'id'); -- we are doing this operation on single node setup let's figure out colocation id of both tables -- both has same colocation id so both are colocated. -select colocationid,logicalrelid from pg_dist_partition where logicalrelid = 'source_pushdowntest'::regclass OR logicalrelid = 'target_pushdowntest'::regclass; - colocationid | logicalrelid +WITH colocations AS ( + SELECT colocationid + FROM pg_dist_partition + WHERE logicalrelid = 'source_pushdowntest'::regclass + OR logicalrelid = 'target_pushdowntest'::regclass +) +SELECT + CASE + WHEN COUNT(DISTINCT colocationid) = 1 THEN 'Same' + ELSE 'Different' + END AS colocation_status +FROM colocations; + colocation_status --------------------------------------------------------------------- - 13000 | source_pushdowntest - 13000 | target_pushdowntest -(2 rows) + Same +(1 row) SET client_min_messages TO DEBUG1; -- Test 1 : tables are colocated AND query is multisharded AND Join On distributed column : should push down to workers. 
diff --git a/src/test/regress/sql/merge.sql b/src/test/regress/sql/merge.sql index 7b76957a1..583925d8a 100644 --- a/src/test/regress/sql/merge.sql +++ b/src/test/regress/sql/merge.sql @@ -23,7 +23,6 @@ SET citus.shard_replication_factor TO 1; SET citus.max_adaptive_executor_pool_size TO 1; SET client_min_messages = warning; SELECT 1 FROM master_add_node('localhost', :master_port, groupid => 0); -ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 13000; RESET client_min_messages; CREATE TABLE source @@ -1209,6 +1208,7 @@ ROLLBACK; -- let's create source and target table +ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 13000; CREATE TABLE source_pushdowntest (id integer); CREATE TABLE target_pushdowntest (id integer ); @@ -1218,7 +1218,19 @@ SELECT create_distributed_table('target_pushdowntest', 'id'); -- we are doing this operation on single node setup let's figure out colocation id of both tables -- both has same colocation id so both are colocated. -select colocationid,logicalrelid from pg_dist_partition where logicalrelid = 'source_pushdowntest'::regclass OR logicalrelid = 'target_pushdowntest'::regclass; +WITH colocations AS ( + SELECT colocationid + FROM pg_dist_partition + WHERE logicalrelid = 'source_pushdowntest'::regclass + OR logicalrelid = 'target_pushdowntest'::regclass +) +SELECT + CASE + WHEN COUNT(DISTINCT colocationid) = 1 THEN 'Same' + ELSE 'Different' + END AS colocation_status +FROM colocations; + SET client_min_messages TO DEBUG1; -- Test 1 : tables are colocated AND query is multisharded AND Join On distributed column : should push down to workers. From 9e71750fcd83c0d392c4b42bc9fb55ef4367b346 Mon Sep 17 00:00:00 2001 From: paragjain Date: Fri, 14 Jun 2024 18:06:21 +0000 Subject: [PATCH 11/13] fixing flakyness in test --- src/test/regress/expected/merge.out | 18 +----------------- src/test/regress/sql/merge.sql | 6 ++---- 2 files changed, 3 insertions(+), 21 deletions(-) diff --git a/src/test/regress/expected/merge.out b/src/test/regress/expected/merge.out index 5f2bb0522..5056ba543 100644 --- a/src/test/regress/expected/merge.out +++ b/src/test/regress/expected/merge.out @@ -1944,7 +1944,7 @@ DEBUG: Creating MERGE router plan (47 rows) -- Test 2 : tables are colocated AND source query is not multisharded : should push down to worker. -EXPLAIN (costs off, timing off, summary off) +-- DEBUG LOGS show that query is getting pushed down MERGE INTO target_pushdowntest t USING (SELECT * from source_pushdowntest where id = 1) s on t.id = s.id @@ -1953,22 +1953,6 @@ WHEN NOT MATCHED THEN VALUES (s.id); DEBUG: DEBUG: Creating MERGE router plan - QUERY PLAN ---------------------------------------------------------------------- - Custom Scan (Citus Adaptive) - Task Count: 1 - Tasks Shown: All - -> Task - Node: host=localhost port=xxxxx dbname=regression - -> Merge on target_pushdowntest_4000068 t - -> Nested Loop Left Join - -> Seq Scan on source_pushdowntest_4000064 source_pushdowntest - Filter: (id = 1) - -> Materialize - -> Seq Scan on target_pushdowntest_4000068 t - Filter: (id = 1) -(12 rows) - -- Test 3 : tables are colocated source query is single sharded but not using source distributed column in insertion. let's not pushdown. 
INSERT INTO source_pushdowntest (id) VALUES (3); EXPLAIN (costs off, timing off, summary off) diff --git a/src/test/regress/sql/merge.sql b/src/test/regress/sql/merge.sql index 583925d8a..5316b5233 100644 --- a/src/test/regress/sql/merge.sql +++ b/src/test/regress/sql/merge.sql @@ -1224,14 +1224,13 @@ WITH colocations AS ( WHERE logicalrelid = 'source_pushdowntest'::regclass OR logicalrelid = 'target_pushdowntest'::regclass ) -SELECT +SELECT CASE WHEN COUNT(DISTINCT colocationid) = 1 THEN 'Same' ELSE 'Different' END AS colocation_status FROM colocations; - SET client_min_messages TO DEBUG1; -- Test 1 : tables are colocated AND query is multisharded AND Join On distributed column : should push down to workers. @@ -1244,8 +1243,7 @@ WHEN NOT MATCHED THEN VALUES (s.id); -- Test 2 : tables are colocated AND source query is not multisharded : should push down to worker. - -EXPLAIN (costs off, timing off, summary off) +-- DEBUG LOGS show that query is getting pushed down MERGE INTO target_pushdowntest t USING (SELECT * from source_pushdowntest where id = 1) s on t.id = s.id From fa4fc0b372e4068e069946e3fdf454137736bcc7 Mon Sep 17 00:00:00 2001 From: Jelte Fennema-Nio Date: Mon, 17 Jun 2024 15:46:00 +0200 Subject: [PATCH 12/13] Revert rebase merge of #7620 (#7626) Because we want to track PR numbers and to make backporting easy we (pretty much always) use squash-merges when merging to master. We accidentally used a rebase merge for PR #7620. This reverts those changes so we can redo the merge using squash merge. This reverts all commits from eedb607c to 9e71750fc. --- .../distributed/planner/merge_planner.c | 29 +- src/test/regress/expected/merge.out | 337 ++---------------- src/test/regress/multi_schedule | 3 +- src/test/regress/sql/merge.sql | 133 ------- 4 files changed, 32 insertions(+), 470 deletions(-) diff --git a/src/backend/distributed/planner/merge_planner.c b/src/backend/distributed/planner/merge_planner.c index 09d2d90ac..4d64b8f56 100644 --- a/src/backend/distributed/planner/merge_planner.c +++ b/src/backend/distributed/planner/merge_planner.c @@ -182,6 +182,14 @@ CreateRouterMergePlan(Oid targetRelationId, Query *originalQuery, Query *query, return distributedPlan; } + Var *insertVar = + FetchAndValidateInsertVarIfExists(targetRelationId, originalQuery); + if (insertVar && + !IsDistributionColumnInMergeSource((Expr *) insertVar, originalQuery, true)) + { + ereport(ERROR, (errmsg("MERGE INSERT must use the source table " + "distribution column value"))); + } Job *job = RouterJob(originalQuery, plannerRestrictionContext, &distributedPlan->planningError); @@ -1116,27 +1124,6 @@ DeferErrorIfRoutableMergeNotSupported(Query *query, List *rangeTableList, "repartitioning"))); return deferredError; } - - - /* - * If execution has reached this point, it indicates that the query can be delegated to the worker. - * However, before proceeding with this delegation, we need to confirm that the user is utilizing - * the distribution column of the source table in the Insert variable. - * If this is not the case, we should refrain from pushing down the query. - * This is just a deffered error which will be handle by caller. - */ - - Var *insertVar = - FetchAndValidateInsertVarIfExists(targetRelationId, query); - if (insertVar && - !IsDistributionColumnInMergeSource((Expr *) insertVar, query, true)) - { - ereport(DEBUG1, (errmsg( - "MERGE INSERT must use the source table distribution column value for push down to workers. 
Otherwise, repartitioning will be applied"))); - return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, - "MERGE INSERT must use the source table distribution column value for push down to workers. Otherwise, repartitioning will be applied", - NULL, NULL); - } return NULL; } diff --git a/src/test/regress/expected/merge.out b/src/test/regress/expected/merge.out index 5056ba543..a73467e81 100644 --- a/src/test/regress/expected/merge.out +++ b/src/test/regress/expected/merge.out @@ -1128,7 +1128,7 @@ DO NOTHING WHEN NOT MATCHED THEN INSERT VALUES(rs_source.id); DEBUG: Creating MERGE router plan -DEBUG: +DEBUG: RESET client_min_messages; SELECT * INTO rs_local FROM rs_target ORDER BY 1 ; -- Should be equal @@ -1259,7 +1259,7 @@ DO NOTHING WHEN NOT MATCHED THEN INSERT VALUES(fn_source.id, fn_source.source); DEBUG: Creating MERGE router plan -DEBUG: +DEBUG: RESET client_min_messages; SELECT * INTO fn_local FROM fn_target ORDER BY 1 ; -- Should be equal @@ -1552,7 +1552,7 @@ BEGIN; SET citus.log_remote_commands to true; SET client_min_messages TO DEBUG1; EXECUTE merge_prepare(2); -DEBUG: +DEBUG: DEBUG: Creating MERGE router plan NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx'); DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx @@ -1782,13 +1782,13 @@ NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_ DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx'); DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON (t.id OPERATOR(pg_catalog.=) s.id) WHEN MATCHED THEN DO NOTHING WHEN NOT MATCHED AND (s.id OPERATOR(pg_catalog.<) 100) THEN INSERT (id, val) VALUES (s.id, s.val) +NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON (t.id OPERATOR(pg_catalog.=) s.id) WHEN MATCHED THEN DO NOTHING WHEN NOT MATCHED AND (s.id OPERATOR(pg_catalog.<) 100) THEN INSERT (id, val) VALUES (s.id, s.val) DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON (t.id OPERATOR(pg_catalog.=) s.id) WHEN MATCHED THEN DO NOTHING WHEN NOT MATCHED AND (s.id OPERATOR(pg_catalog.<) 100) THEN INSERT (id, val) VALUES (s.id, s.val) +NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON (t.id OPERATOR(pg_catalog.=) s.id) WHEN MATCHED THEN DO NOTHING WHEN NOT MATCHED AND (s.id OPERATOR(pg_catalog.<) 100) THEN INSERT (id, val) VALUES (s.id, s.val) DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON (t.id OPERATOR(pg_catalog.=) s.id) WHEN MATCHED THEN DO NOTHING WHEN NOT MATCHED AND (s.id OPERATOR(pg_catalog.<) 100) THEN INSERT (id, val) VALUES (s.id, s.val) +NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON (t.id OPERATOR(pg_catalog.=) s.id) WHEN MATCHED THEN DO NOTHING WHEN NOT MATCHED AND (s.id OPERATOR(pg_catalog.<) 100) THEN INSERT (id, val) VALUES (s.id, s.val) DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing MERGE INTO 
merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON (t.id OPERATOR(pg_catalog.=) s.id) WHEN MATCHED THEN DO NOTHING WHEN NOT MATCHED AND (s.id OPERATOR(pg_catalog.<) 100) THEN INSERT (id, val) VALUES (s.id, s.val) +NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON (t.id OPERATOR(pg_catalog.=) s.id) WHEN MATCHED THEN DO NOTHING WHEN NOT MATCHED AND (s.id OPERATOR(pg_catalog.<) 100) THEN INSERT (id, val) VALUES (s.id, s.val) DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx SET citus.log_remote_commands to false; SELECT compare_tables(); @@ -1842,297 +1842,6 @@ SELECT compare_tables(); (1 row) ROLLBACK; --- let's create source and target table -ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 13000; -CREATE TABLE source_pushdowntest (id integer); -CREATE TABLE target_pushdowntest (id integer ); --- let's distribute both table on id field -SELECT create_distributed_table('source_pushdowntest', 'id'); - create_distributed_table ---------------------------------------------------------------------- - -(1 row) - -SELECT create_distributed_table('target_pushdowntest', 'id'); - create_distributed_table ---------------------------------------------------------------------- - -(1 row) - --- we are doing this operation on single node setup let's figure out colocation id of both tables --- both has same colocation id so both are colocated. -WITH colocations AS ( - SELECT colocationid - FROM pg_dist_partition - WHERE logicalrelid = 'source_pushdowntest'::regclass - OR logicalrelid = 'target_pushdowntest'::regclass -) -SELECT - CASE - WHEN COUNT(DISTINCT colocationid) = 1 THEN 'Same' - ELSE 'Different' - END AS colocation_status -FROM colocations; - colocation_status ---------------------------------------------------------------------- - Same -(1 row) - -SET client_min_messages TO DEBUG1; --- Test 1 : tables are colocated AND query is multisharded AND Join On distributed column : should push down to workers. 
-EXPLAIN (costs off, timing off, summary off) -MERGE INTO target_pushdowntest t -USING source_pushdowntest s -ON t.id = s.id -WHEN NOT MATCHED THEN - INSERT (id) - VALUES (s.id); -DEBUG: -DEBUG: -DEBUG: -DEBUG: -DEBUG: Creating MERGE router plan - QUERY PLAN ---------------------------------------------------------------------- - Custom Scan (Citus Adaptive) - Task Count: 4 - Tasks Shown: All - -> Task - Node: host=localhost port=xxxxx dbname=regression - -> Merge on target_pushdowntest_4000068 t - -> Merge Left Join - Merge Cond: (s.id = t.id) - -> Sort - Sort Key: s.id - -> Seq Scan on source_pushdowntest_4000064 s - -> Sort - Sort Key: t.id - -> Seq Scan on target_pushdowntest_4000068 t - -> Task - Node: host=localhost port=xxxxx dbname=regression - -> Merge on target_pushdowntest_4000069 t - -> Merge Left Join - Merge Cond: (s.id = t.id) - -> Sort - Sort Key: s.id - -> Seq Scan on source_pushdowntest_4000065 s - -> Sort - Sort Key: t.id - -> Seq Scan on target_pushdowntest_4000069 t - -> Task - Node: host=localhost port=xxxxx dbname=regression - -> Merge on target_pushdowntest_4000070 t - -> Merge Left Join - Merge Cond: (s.id = t.id) - -> Sort - Sort Key: s.id - -> Seq Scan on source_pushdowntest_4000066 s - -> Sort - Sort Key: t.id - -> Seq Scan on target_pushdowntest_4000070 t - -> Task - Node: host=localhost port=xxxxx dbname=regression - -> Merge on target_pushdowntest_4000071 t - -> Merge Left Join - Merge Cond: (s.id = t.id) - -> Sort - Sort Key: s.id - -> Seq Scan on source_pushdowntest_4000067 s - -> Sort - Sort Key: t.id - -> Seq Scan on target_pushdowntest_4000071 t -(47 rows) - --- Test 2 : tables are colocated AND source query is not multisharded : should push down to worker. --- DEBUG LOGS show that query is getting pushed down -MERGE INTO target_pushdowntest t -USING (SELECT * from source_pushdowntest where id = 1) s -on t.id = s.id -WHEN NOT MATCHED THEN - INSERT (id) - VALUES (s.id); -DEBUG: -DEBUG: Creating MERGE router plan --- Test 3 : tables are colocated source query is single sharded but not using source distributed column in insertion. let's not pushdown. -INSERT INTO source_pushdowntest (id) VALUES (3); -EXPLAIN (costs off, timing off, summary off) -MERGE INTO target_pushdowntest t -USING (SELECT 1 as somekey, id from source_pushdowntest where id = 1) s -on t.id = s.somekey -WHEN NOT MATCHED THEN - INSERT (id) - VALUES (s.somekey); -DEBUG: MERGE INSERT must use the source table distribution column value for push down to workers. Otherwise, repartitioning will be applied -DEBUG: MERGE INSERT must use the source table distribution column value for push down to workers. Otherwise, repartitioning will be applied -DEBUG: Creating MERGE repartition plan -DEBUG: Using column - index:0 from the source list to redistribute - QUERY PLAN ---------------------------------------------------------------------- - Custom Scan (Citus MERGE INTO ...) - MERGE INTO target_pushdowntest method: pull to coordinator - -> Custom Scan (Citus Adaptive) - Task Count: 1 - Tasks Shown: All - -> Task - Node: host=localhost port=xxxxx dbname=regression - -> Seq Scan on source_pushdowntest_4000064 source_pushdowntest - Filter: (id = 1) -(9 rows) - --- let's verify if we use some other column from source for value of distributed column in target. --- it should be inserted to correct shard of target. 
-CREATE TABLE source_withdata (id integer, some_number integer); -CREATE TABLE target_table (id integer, name text); -SELECT create_distributed_table('source_withdata', 'id'); - create_distributed_table ---------------------------------------------------------------------- - -(1 row) - -SELECT create_distributed_table('target_table', 'id'); - create_distributed_table ---------------------------------------------------------------------- - -(1 row) - -INSERT INTO source_withdata (id, some_number) VALUES (1, 3); --- we will use some_number column from source_withdata to insert into distributed column of target. --- value of some_number is 3 let's verify what shard it should go to. -select worker_hash(3); - worker_hash ---------------------------------------------------------------------- - -28094569 -(1 row) - --- it should go to second shard of target as target has 4 shard and hash "-28094569" comes in range of second shard. -MERGE INTO target_table t -USING (SELECT id, some_number from source_withdata where id = 1) s -on t.id = s.some_number -WHEN NOT MATCHED THEN - INSERT (id, name) - VALUES (s.some_number, 'parag'); -DEBUG: Sub-query is not pushable, try repartitioning -DEBUG: MERGE command is only supported when all distributed tables are co-located and joined on their distribution columns -DEBUG: Creating MERGE repartition plan -DEBUG: Using column - index:1 from the source list to redistribute -DEBUG: Collect source query results on coordinator -DEBUG: Create a MERGE task list that needs to be routed -DEBUG: -DEBUG: -DEBUG: -DEBUG: -DEBUG: Execute MERGE task list --- let's verify if data inserted to second shard of target. -EXPLAIN (analyze on, costs off, timing off, summary off) SELECT * FROM target_table; - QUERY PLAN ---------------------------------------------------------------------- - Custom Scan (Citus Adaptive) (actual rows=1 loops=1) - Task Count: 4 - Tuple data received from nodes: 9 bytes - Tasks Shown: All - -> Task - Tuple data received from node: 0 bytes - Node: host=localhost port=xxxxx dbname=regression - -> Seq Scan on target_table_4000076 target_table (actual rows=0 loops=1) - -> Task - Tuple data received from node: 9 bytes - Node: host=localhost port=xxxxx dbname=regression - -> Seq Scan on target_table_4000077 target_table (actual rows=1 loops=1) - -> Task - Tuple data received from node: 0 bytes - Node: host=localhost port=xxxxx dbname=regression - -> Seq Scan on target_table_4000078 target_table (actual rows=0 loops=1) - -> Task - Tuple data received from node: 0 bytes - Node: host=localhost port=xxxxx dbname=regression - -> Seq Scan on target_table_4000079 target_table (actual rows=0 loops=1) -(20 rows) - --- let's verify target data too. 
-SELECT * FROM target_table; - id | name ---------------------------------------------------------------------- - 3 | parag -(1 row) - --- test UPDATE : when source is single sharded and table are colocated -MERGE INTO target_table t -USING (SELECT id, some_number from source_withdata where id = 1) s -on t.id = s.some_number -WHEN MATCHED THEN - UPDATE SET name = 'parag jain'; -DEBUG: Sub-query is not pushable, try repartitioning -DEBUG: MERGE command is only supported when all distributed tables are co-located and joined on their distribution columns -DEBUG: Creating MERGE repartition plan -DEBUG: Using column - index:1 from the source list to redistribute -DEBUG: Collect source query results on coordinator -DEBUG: Create a MERGE task list that needs to be routed -DEBUG: -DEBUG: -DEBUG: -DEBUG: -DEBUG: Execute MERGE task list --- let's verify if data updated properly. -SELECT * FROM target_table; - id | name ---------------------------------------------------------------------- - 3 | parag jain -(1 row) - --- let's see what happend when we try to update distributed key of target table -MERGE INTO target_table t -USING (SELECT id, some_number from source_withdata where id = 1) s -on t.id = s.some_number -WHEN MATCHED THEN - UPDATE SET id = 1500; -ERROR: updating the distribution column is not allowed in MERGE actions -SELECT * FROM target_table; - id | name ---------------------------------------------------------------------- - 3 | parag jain -(1 row) - --- test DELETE : when source is single sharded and table are colocated -MERGE INTO target_table t -USING (SELECT id, some_number from source_withdata where id = 1) s -on t.id = s.some_number -WHEN MATCHED THEN - DELETE; -DEBUG: Sub-query is not pushable, try repartitioning -DEBUG: MERGE command is only supported when all distributed tables are co-located and joined on their distribution columns -DEBUG: Creating MERGE repartition plan -DEBUG: Using column - index:1 from the source list to redistribute -DEBUG: Collect source query results on coordinator -DEBUG: Create a MERGE task list that needs to be routed -DEBUG: -DEBUG: -DEBUG: -DEBUG: -DEBUG: Execute MERGE task list --- let's verify if data deleted properly. 
-SELECT * FROM target_table; - id | name ---------------------------------------------------------------------- -(0 rows) - --- -DELETE FROM source_withdata; -DELETE FROM target_table; -INSERT INTO source VALUES (1,1); -merge into target_table sda -using source_withdata sdn -on sda.id = sdn.id AND sda.id = 1 -when not matched then - insert (id) - values (10000); -ERROR: MERGE INSERT is using unsupported expression type for distribution column -DETAIL: Inserting arbitrary values that don't correspond to the joined column values can lead to unpredictable outcomes where rows are incorrectly distributed among different shards -SELECT * FROM target_table WHERE id = 10000; - id | name ---------------------------------------------------------------------- -(0 rows) - -RESET client_min_messages; -- This will prune shards with restriction information as NOT MATCHED is void BEGIN; SET citus.log_remote_commands to true; @@ -3189,14 +2898,14 @@ WHEN NOT MATCHED THEN -> Limit -> Sort Sort Key: id2 - -> Seq Scan on demo_source_table_4000151 demo_source_table + -> Seq Scan on demo_source_table_4000135 demo_source_table -> Distributed Subplan XXX_2 -> Custom Scan (Citus Adaptive) Task Count: 4 Tasks Shown: One of 4 -> Task Node: host=localhost port=xxxxx dbname=regression - -> Seq Scan on demo_source_table_4000151 demo_source_table + -> Seq Scan on demo_source_table_4000135 demo_source_table Task Count: 1 Tasks Shown: All -> Task @@ -3410,10 +3119,10 @@ DEBUG: Creating MERGE repartition plan DEBUG: Using column - index:0 from the source list to redistribute DEBUG: Collect source query results on coordinator DEBUG: Create a MERGE task list that needs to be routed -DEBUG: -DEBUG: -DEBUG: -DEBUG: +DEBUG: +DEBUG: +DEBUG: +DEBUG: DEBUG: Execute MERGE task list RESET client_min_messages; SELECT * FROM target_6785 ORDER BY 1; @@ -3531,7 +3240,7 @@ USING s1 s ON t.id = s.id WHEN NOT MATCHED THEN INSERT (id) VALUES(s.val); -ERROR: MERGE INSERT must use the source's joining column for target's distribution column +ERROR: MERGE INSERT must use the source table distribution column value MERGE INTO t1 t USING s1 s ON t.id = s.id @@ -4257,7 +3966,7 @@ CONTEXT: SQL statement "SELECT citus_drop_all_shards(v_obj.objid, v_obj.schema_ PL/pgSQL function citus_drop_trigger() line XX at PERFORM DROP FUNCTION merge_when_and_write(); DROP SCHEMA merge_schema CASCADE; -NOTICE: drop cascades to 107 other objects +NOTICE: drop cascades to 103 other objects DETAIL: drop cascades to function insert_data() drop cascades to table local_local drop cascades to table target @@ -4317,10 +4026,6 @@ drop cascades to table pg_source drop cascades to table citus_target drop cascades to table citus_source drop cascades to function compare_tables() -drop cascades to table source_pushdowntest -drop cascades to table target_pushdowntest -drop cascades to table source_withdata -drop cascades to table target_table drop cascades to view pg_source_view drop cascades to view citus_source_view drop cascades to table pg_pa_target @@ -4337,7 +4042,7 @@ drop cascades to table target_set drop cascades to table source_set drop cascades to table refsource_ref drop cascades to table pg_result -drop cascades to table refsource_ref_4000128 +drop cascades to table refsource_ref_4000112 drop cascades to table pg_ref drop cascades to table local_ref drop cascades to table reftarget_local @@ -4355,7 +4060,11 @@ drop cascades to table source_6785 drop cascades to table target_6785 drop cascades to function add_s(integer,integer) drop cascades to table pg -drop 
cascades to table t1_4000190 -drop cascades to table s1_4000191 +drop cascades to table t1_4000174 +drop cascades to table s1_4000175 drop cascades to table t1 -and 7 other objects (see server log for list) +drop cascades to table s1 +drop cascades to table dist_target +drop cascades to table dist_source +drop cascades to view show_tables +and 3 other objects (see server log for list) diff --git a/src/test/regress/multi_schedule b/src/test/regress/multi_schedule index 7f0c7ca57..fca36f5ab 100644 --- a/src/test/regress/multi_schedule +++ b/src/test/regress/multi_schedule @@ -116,8 +116,7 @@ test: function_with_case_when test: clock # MERGE tests -test: merge pgmerge -test: merge_repartition2 +test: merge pgmerge merge_repartition2 test: merge_repartition1 merge_schema_sharding test: merge_partition_tables diff --git a/src/test/regress/sql/merge.sql b/src/test/regress/sql/merge.sql index 5316b5233..a41e80841 100644 --- a/src/test/regress/sql/merge.sql +++ b/src/test/regress/sql/merge.sql @@ -1206,139 +1206,6 @@ SET citus.log_remote_commands to false; SELECT compare_tables(); ROLLBACK; - --- let's create source and target table -ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 13000; -CREATE TABLE source_pushdowntest (id integer); -CREATE TABLE target_pushdowntest (id integer ); - --- let's distribute both table on id field -SELECT create_distributed_table('source_pushdowntest', 'id'); -SELECT create_distributed_table('target_pushdowntest', 'id'); - --- we are doing this operation on single node setup let's figure out colocation id of both tables --- both has same colocation id so both are colocated. -WITH colocations AS ( - SELECT colocationid - FROM pg_dist_partition - WHERE logicalrelid = 'source_pushdowntest'::regclass - OR logicalrelid = 'target_pushdowntest'::regclass -) -SELECT - CASE - WHEN COUNT(DISTINCT colocationid) = 1 THEN 'Same' - ELSE 'Different' - END AS colocation_status -FROM colocations; - -SET client_min_messages TO DEBUG1; --- Test 1 : tables are colocated AND query is multisharded AND Join On distributed column : should push down to workers. - -EXPLAIN (costs off, timing off, summary off) -MERGE INTO target_pushdowntest t -USING source_pushdowntest s -ON t.id = s.id -WHEN NOT MATCHED THEN - INSERT (id) - VALUES (s.id); - --- Test 2 : tables are colocated AND source query is not multisharded : should push down to worker. --- DEBUG LOGS show that query is getting pushed down -MERGE INTO target_pushdowntest t -USING (SELECT * from source_pushdowntest where id = 1) s -on t.id = s.id -WHEN NOT MATCHED THEN - INSERT (id) - VALUES (s.id); - - --- Test 3 : tables are colocated source query is single sharded but not using source distributed column in insertion. let's not pushdown. -INSERT INTO source_pushdowntest (id) VALUES (3); - -EXPLAIN (costs off, timing off, summary off) -MERGE INTO target_pushdowntest t -USING (SELECT 1 as somekey, id from source_pushdowntest where id = 1) s -on t.id = s.somekey -WHEN NOT MATCHED THEN - INSERT (id) - VALUES (s.somekey); - - --- let's verify if we use some other column from source for value of distributed column in target. --- it should be inserted to correct shard of target. 
-CREATE TABLE source_withdata (id integer, some_number integer); -CREATE TABLE target_table (id integer, name text); -SELECT create_distributed_table('source_withdata', 'id'); -SELECT create_distributed_table('target_table', 'id'); - -INSERT INTO source_withdata (id, some_number) VALUES (1, 3); - --- we will use some_number column from source_withdata to insert into distributed column of target. --- value of some_number is 3 let's verify what shard it should go to. -select worker_hash(3); - --- it should go to second shard of target as target has 4 shard and hash "-28094569" comes in range of second shard. -MERGE INTO target_table t -USING (SELECT id, some_number from source_withdata where id = 1) s -on t.id = s.some_number -WHEN NOT MATCHED THEN - INSERT (id, name) - VALUES (s.some_number, 'parag'); - --- let's verify if data inserted to second shard of target. -EXPLAIN (analyze on, costs off, timing off, summary off) SELECT * FROM target_table; - --- let's verify target data too. -SELECT * FROM target_table; - - --- test UPDATE : when source is single sharded and table are colocated -MERGE INTO target_table t -USING (SELECT id, some_number from source_withdata where id = 1) s -on t.id = s.some_number -WHEN MATCHED THEN - UPDATE SET name = 'parag jain'; - --- let's verify if data updated properly. -SELECT * FROM target_table; - --- let's see what happend when we try to update distributed key of target table -MERGE INTO target_table t -USING (SELECT id, some_number from source_withdata where id = 1) s -on t.id = s.some_number -WHEN MATCHED THEN - UPDATE SET id = 1500; - -SELECT * FROM target_table; - --- test DELETE : when source is single sharded and table are colocated -MERGE INTO target_table t -USING (SELECT id, some_number from source_withdata where id = 1) s -on t.id = s.some_number -WHEN MATCHED THEN - DELETE; - --- let's verify if data deleted properly. -SELECT * FROM target_table; - --- -DELETE FROM source_withdata; -DELETE FROM target_table; -INSERT INTO source VALUES (1,1); - -merge into target_table sda -using source_withdata sdn -on sda.id = sdn.id AND sda.id = 1 -when not matched then - insert (id) - values (10000); - -SELECT * FROM target_table WHERE id = 10000; - -RESET client_min_messages; - - - -- This will prune shards with restriction information as NOT MATCHED is void BEGIN; SET citus.log_remote_commands to true; From aaaf637a6babebc9d9fa181e3a94b68825e2816f Mon Sep 17 00:00:00 2001 From: Jelte Fennema-Nio Date: Mon, 17 Jun 2024 16:07:25 +0200 Subject: [PATCH 13/13] Redo #7620: Fix merge command when insert value does not have source distributed column (#7627) Related to issue #7619, #7620 Merge command fails when source query is single sharded and source and target are co-located and insert is not using distribution key of source. Example ``` CREATE TABLE source (id integer); CREATE TABLE target (id integer ); -- let's distribute both table on id field SELECT create_distributed_table('source', 'id'); SELECT create_distributed_table('target', 'id'); MERGE INTO target t USING ( SELECT 1 AS somekey FROM source WHERE source.id = 1) s ON t.id = s.somekey WHEN NOT MATCHED THEN INSERT (id) VALUES (s.somekey) ERROR: MERGE INSERT must use the source table distribution column value HINT: MERGE INSERT must use the source table distribution column value ``` Author's Opinion: If join is not between source and target distributed column, we should not force user to use source distributed column while inserting value of target distributed column. 
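With this change the example above is expected to plan through repartitioning instead of erroring out. A sketch of the behavior, based on the regression output added in this patch (DEBUG lines abbreviated, and table names reuse the example above rather than the test's actual tables):

```
SET client_min_messages TO DEBUG1;

MERGE INTO target t
USING (SELECT 1 AS somekey FROM source WHERE source.id = 1) s
ON t.id = s.somekey
WHEN NOT MATCHED THEN
  INSERT (id) VALUES (s.somekey);
-- DEBUG:  MERGE INSERT must use the source table distribution column value
--         for push down to workers. Otherwise, repartitioning will be applied
-- DEBUG:  Creating MERGE repartition plan
```
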
Fix: If user is not using distributed key of source for insertion let's not push down query to workers and don't force user to use source distributed column if it is not part of join. This reverts commit fa4fc0b372e4068e069946e3fdf454137736bcc7. Co-authored-by: paragjain --- .../distributed/planner/merge_planner.c | 29 +- src/test/regress/expected/merge.out | 337 ++++++++++++++++-- src/test/regress/multi_schedule | 3 +- src/test/regress/sql/merge.sql | 133 +++++++ 4 files changed, 470 insertions(+), 32 deletions(-) diff --git a/src/backend/distributed/planner/merge_planner.c b/src/backend/distributed/planner/merge_planner.c index 4d64b8f56..09d2d90ac 100644 --- a/src/backend/distributed/planner/merge_planner.c +++ b/src/backend/distributed/planner/merge_planner.c @@ -182,14 +182,6 @@ CreateRouterMergePlan(Oid targetRelationId, Query *originalQuery, Query *query, return distributedPlan; } - Var *insertVar = - FetchAndValidateInsertVarIfExists(targetRelationId, originalQuery); - if (insertVar && - !IsDistributionColumnInMergeSource((Expr *) insertVar, originalQuery, true)) - { - ereport(ERROR, (errmsg("MERGE INSERT must use the source table " - "distribution column value"))); - } Job *job = RouterJob(originalQuery, plannerRestrictionContext, &distributedPlan->planningError); @@ -1124,6 +1116,27 @@ DeferErrorIfRoutableMergeNotSupported(Query *query, List *rangeTableList, "repartitioning"))); return deferredError; } + + + /* + * If execution has reached this point, it indicates that the query can be delegated to the worker. + * However, before proceeding with this delegation, we need to confirm that the user is utilizing + * the distribution column of the source table in the Insert variable. + * If this is not the case, we should refrain from pushing down the query. + * This is just a deffered error which will be handle by caller. + */ + + Var *insertVar = + FetchAndValidateInsertVarIfExists(targetRelationId, query); + if (insertVar && + !IsDistributionColumnInMergeSource((Expr *) insertVar, query, true)) + { + ereport(DEBUG1, (errmsg( + "MERGE INSERT must use the source table distribution column value for push down to workers. Otherwise, repartitioning will be applied"))); + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "MERGE INSERT must use the source table distribution column value for push down to workers. 
Otherwise, repartitioning will be applied", + NULL, NULL); + } return NULL; } diff --git a/src/test/regress/expected/merge.out b/src/test/regress/expected/merge.out index a73467e81..5056ba543 100644 --- a/src/test/regress/expected/merge.out +++ b/src/test/regress/expected/merge.out @@ -1128,7 +1128,7 @@ DO NOTHING WHEN NOT MATCHED THEN INSERT VALUES(rs_source.id); DEBUG: Creating MERGE router plan -DEBUG: +DEBUG: RESET client_min_messages; SELECT * INTO rs_local FROM rs_target ORDER BY 1 ; -- Should be equal @@ -1259,7 +1259,7 @@ DO NOTHING WHEN NOT MATCHED THEN INSERT VALUES(fn_source.id, fn_source.source); DEBUG: Creating MERGE router plan -DEBUG: +DEBUG: RESET client_min_messages; SELECT * INTO fn_local FROM fn_target ORDER BY 1 ; -- Should be equal @@ -1552,7 +1552,7 @@ BEGIN; SET citus.log_remote_commands to true; SET client_min_messages TO DEBUG1; EXECUTE merge_prepare(2); -DEBUG: +DEBUG: DEBUG: Creating MERGE router plan NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx'); DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx @@ -1782,13 +1782,13 @@ NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_ DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx'); DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON (t.id OPERATOR(pg_catalog.=) s.id) WHEN MATCHED THEN DO NOTHING WHEN NOT MATCHED AND (s.id OPERATOR(pg_catalog.<) 100) THEN INSERT (id, val) VALUES (s.id, s.val) +NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON (t.id OPERATOR(pg_catalog.=) s.id) WHEN MATCHED THEN DO NOTHING WHEN NOT MATCHED AND (s.id OPERATOR(pg_catalog.<) 100) THEN INSERT (id, val) VALUES (s.id, s.val) DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON (t.id OPERATOR(pg_catalog.=) s.id) WHEN MATCHED THEN DO NOTHING WHEN NOT MATCHED AND (s.id OPERATOR(pg_catalog.<) 100) THEN INSERT (id, val) VALUES (s.id, s.val) +NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON (t.id OPERATOR(pg_catalog.=) s.id) WHEN MATCHED THEN DO NOTHING WHEN NOT MATCHED AND (s.id OPERATOR(pg_catalog.<) 100) THEN INSERT (id, val) VALUES (s.id, s.val) DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON (t.id OPERATOR(pg_catalog.=) s.id) WHEN MATCHED THEN DO NOTHING WHEN NOT MATCHED AND (s.id OPERATOR(pg_catalog.<) 100) THEN INSERT (id, val) VALUES (s.id, s.val) +NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON (t.id OPERATOR(pg_catalog.=) s.id) WHEN MATCHED THEN DO NOTHING WHEN NOT MATCHED AND (s.id OPERATOR(pg_catalog.<) 100) THEN INSERT (id, val) VALUES (s.id, s.val) DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON (t.id OPERATOR(pg_catalog.=) s.id) WHEN MATCHED THEN DO NOTHING WHEN NOT MATCHED AND (s.id OPERATOR(pg_catalog.<) 100) THEN INSERT 
(id, val) VALUES (s.id, s.val) +NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON (t.id OPERATOR(pg_catalog.=) s.id) WHEN MATCHED THEN DO NOTHING WHEN NOT MATCHED AND (s.id OPERATOR(pg_catalog.<) 100) THEN INSERT (id, val) VALUES (s.id, s.val) DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx SET citus.log_remote_commands to false; SELECT compare_tables(); @@ -1842,6 +1842,297 @@ SELECT compare_tables(); (1 row) ROLLBACK; +-- let's create source and target table +ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 13000; +CREATE TABLE source_pushdowntest (id integer); +CREATE TABLE target_pushdowntest (id integer ); +-- let's distribute both table on id field +SELECT create_distributed_table('source_pushdowntest', 'id'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SELECT create_distributed_table('target_pushdowntest', 'id'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +-- we are doing this operation on single node setup let's figure out colocation id of both tables +-- both has same colocation id so both are colocated. +WITH colocations AS ( + SELECT colocationid + FROM pg_dist_partition + WHERE logicalrelid = 'source_pushdowntest'::regclass + OR logicalrelid = 'target_pushdowntest'::regclass +) +SELECT + CASE + WHEN COUNT(DISTINCT colocationid) = 1 THEN 'Same' + ELSE 'Different' + END AS colocation_status +FROM colocations; + colocation_status +--------------------------------------------------------------------- + Same +(1 row) + +SET client_min_messages TO DEBUG1; +-- Test 1 : tables are colocated AND query is multisharded AND Join On distributed column : should push down to workers. +EXPLAIN (costs off, timing off, summary off) +MERGE INTO target_pushdowntest t +USING source_pushdowntest s +ON t.id = s.id +WHEN NOT MATCHED THEN + INSERT (id) + VALUES (s.id); +DEBUG: +DEBUG: +DEBUG: +DEBUG: +DEBUG: Creating MERGE router plan + QUERY PLAN +--------------------------------------------------------------------- + Custom Scan (Citus Adaptive) + Task Count: 4 + Tasks Shown: All + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> Merge on target_pushdowntest_4000068 t + -> Merge Left Join + Merge Cond: (s.id = t.id) + -> Sort + Sort Key: s.id + -> Seq Scan on source_pushdowntest_4000064 s + -> Sort + Sort Key: t.id + -> Seq Scan on target_pushdowntest_4000068 t + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> Merge on target_pushdowntest_4000069 t + -> Merge Left Join + Merge Cond: (s.id = t.id) + -> Sort + Sort Key: s.id + -> Seq Scan on source_pushdowntest_4000065 s + -> Sort + Sort Key: t.id + -> Seq Scan on target_pushdowntest_4000069 t + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> Merge on target_pushdowntest_4000070 t + -> Merge Left Join + Merge Cond: (s.id = t.id) + -> Sort + Sort Key: s.id + -> Seq Scan on source_pushdowntest_4000066 s + -> Sort + Sort Key: t.id + -> Seq Scan on target_pushdowntest_4000070 t + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> Merge on target_pushdowntest_4000071 t + -> Merge Left Join + Merge Cond: (s.id = t.id) + -> Sort + Sort Key: s.id + -> Seq Scan on source_pushdowntest_4000067 s + -> Sort + Sort Key: t.id + -> Seq Scan on target_pushdowntest_4000071 t +(47 rows) + +-- Test 2 : tables are colocated AND source query is not multisharded : should push down to worker. 
+-- DEBUG LOGS show that query is getting pushed down +MERGE INTO target_pushdowntest t +USING (SELECT * from source_pushdowntest where id = 1) s +on t.id = s.id +WHEN NOT MATCHED THEN + INSERT (id) + VALUES (s.id); +DEBUG: +DEBUG: Creating MERGE router plan +-- Test 3 : tables are colocated source query is single sharded but not using source distributed column in insertion. let's not pushdown. +INSERT INTO source_pushdowntest (id) VALUES (3); +EXPLAIN (costs off, timing off, summary off) +MERGE INTO target_pushdowntest t +USING (SELECT 1 as somekey, id from source_pushdowntest where id = 1) s +on t.id = s.somekey +WHEN NOT MATCHED THEN + INSERT (id) + VALUES (s.somekey); +DEBUG: MERGE INSERT must use the source table distribution column value for push down to workers. Otherwise, repartitioning will be applied +DEBUG: MERGE INSERT must use the source table distribution column value for push down to workers. Otherwise, repartitioning will be applied +DEBUG: Creating MERGE repartition plan +DEBUG: Using column - index:0 from the source list to redistribute + QUERY PLAN +--------------------------------------------------------------------- + Custom Scan (Citus MERGE INTO ...) + MERGE INTO target_pushdowntest method: pull to coordinator + -> Custom Scan (Citus Adaptive) + Task Count: 1 + Tasks Shown: All + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> Seq Scan on source_pushdowntest_4000064 source_pushdowntest + Filter: (id = 1) +(9 rows) + +-- let's verify if we use some other column from source for value of distributed column in target. +-- it should be inserted to correct shard of target. +CREATE TABLE source_withdata (id integer, some_number integer); +CREATE TABLE target_table (id integer, name text); +SELECT create_distributed_table('source_withdata', 'id'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SELECT create_distributed_table('target_table', 'id'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO source_withdata (id, some_number) VALUES (1, 3); +-- we will use some_number column from source_withdata to insert into distributed column of target. +-- value of some_number is 3 let's verify what shard it should go to. +select worker_hash(3); + worker_hash +--------------------------------------------------------------------- + -28094569 +(1 row) + +-- it should go to second shard of target as target has 4 shard and hash "-28094569" comes in range of second shard. +MERGE INTO target_table t +USING (SELECT id, some_number from source_withdata where id = 1) s +on t.id = s.some_number +WHEN NOT MATCHED THEN + INSERT (id, name) + VALUES (s.some_number, 'parag'); +DEBUG: Sub-query is not pushable, try repartitioning +DEBUG: MERGE command is only supported when all distributed tables are co-located and joined on their distribution columns +DEBUG: Creating MERGE repartition plan +DEBUG: Using column - index:1 from the source list to redistribute +DEBUG: Collect source query results on coordinator +DEBUG: Create a MERGE task list that needs to be routed +DEBUG: +DEBUG: +DEBUG: +DEBUG: +DEBUG: Execute MERGE task list +-- let's verify if data inserted to second shard of target. 
+EXPLAIN (analyze on, costs off, timing off, summary off) SELECT * FROM target_table; + QUERY PLAN +--------------------------------------------------------------------- + Custom Scan (Citus Adaptive) (actual rows=1 loops=1) + Task Count: 4 + Tuple data received from nodes: 9 bytes + Tasks Shown: All + -> Task + Tuple data received from node: 0 bytes + Node: host=localhost port=xxxxx dbname=regression + -> Seq Scan on target_table_4000076 target_table (actual rows=0 loops=1) + -> Task + Tuple data received from node: 9 bytes + Node: host=localhost port=xxxxx dbname=regression + -> Seq Scan on target_table_4000077 target_table (actual rows=1 loops=1) + -> Task + Tuple data received from node: 0 bytes + Node: host=localhost port=xxxxx dbname=regression + -> Seq Scan on target_table_4000078 target_table (actual rows=0 loops=1) + -> Task + Tuple data received from node: 0 bytes + Node: host=localhost port=xxxxx dbname=regression + -> Seq Scan on target_table_4000079 target_table (actual rows=0 loops=1) +(20 rows) + +-- let's verify target data too. +SELECT * FROM target_table; + id | name +--------------------------------------------------------------------- + 3 | parag +(1 row) + +-- test UPDATE : when source is single sharded and table are colocated +MERGE INTO target_table t +USING (SELECT id, some_number from source_withdata where id = 1) s +on t.id = s.some_number +WHEN MATCHED THEN + UPDATE SET name = 'parag jain'; +DEBUG: Sub-query is not pushable, try repartitioning +DEBUG: MERGE command is only supported when all distributed tables are co-located and joined on their distribution columns +DEBUG: Creating MERGE repartition plan +DEBUG: Using column - index:1 from the source list to redistribute +DEBUG: Collect source query results on coordinator +DEBUG: Create a MERGE task list that needs to be routed +DEBUG: +DEBUG: +DEBUG: +DEBUG: +DEBUG: Execute MERGE task list +-- let's verify if data updated properly. +SELECT * FROM target_table; + id | name +--------------------------------------------------------------------- + 3 | parag jain +(1 row) + +-- let's see what happend when we try to update distributed key of target table +MERGE INTO target_table t +USING (SELECT id, some_number from source_withdata where id = 1) s +on t.id = s.some_number +WHEN MATCHED THEN + UPDATE SET id = 1500; +ERROR: updating the distribution column is not allowed in MERGE actions +SELECT * FROM target_table; + id | name +--------------------------------------------------------------------- + 3 | parag jain +(1 row) + +-- test DELETE : when source is single sharded and table are colocated +MERGE INTO target_table t +USING (SELECT id, some_number from source_withdata where id = 1) s +on t.id = s.some_number +WHEN MATCHED THEN + DELETE; +DEBUG: Sub-query is not pushable, try repartitioning +DEBUG: MERGE command is only supported when all distributed tables are co-located and joined on their distribution columns +DEBUG: Creating MERGE repartition plan +DEBUG: Using column - index:1 from the source list to redistribute +DEBUG: Collect source query results on coordinator +DEBUG: Create a MERGE task list that needs to be routed +DEBUG: +DEBUG: +DEBUG: +DEBUG: +DEBUG: Execute MERGE task list +-- let's verify if data deleted properly. 
+SELECT * FROM target_table; + id | name +--------------------------------------------------------------------- +(0 rows) + +-- +DELETE FROM source_withdata; +DELETE FROM target_table; +INSERT INTO source VALUES (1,1); +merge into target_table sda +using source_withdata sdn +on sda.id = sdn.id AND sda.id = 1 +when not matched then + insert (id) + values (10000); +ERROR: MERGE INSERT is using unsupported expression type for distribution column +DETAIL: Inserting arbitrary values that don't correspond to the joined column values can lead to unpredictable outcomes where rows are incorrectly distributed among different shards +SELECT * FROM target_table WHERE id = 10000; + id | name +--------------------------------------------------------------------- +(0 rows) + +RESET client_min_messages; -- This will prune shards with restriction information as NOT MATCHED is void BEGIN; SET citus.log_remote_commands to true; @@ -2898,14 +3189,14 @@ WHEN NOT MATCHED THEN -> Limit -> Sort Sort Key: id2 - -> Seq Scan on demo_source_table_4000135 demo_source_table + -> Seq Scan on demo_source_table_4000151 demo_source_table -> Distributed Subplan XXX_2 -> Custom Scan (Citus Adaptive) Task Count: 4 Tasks Shown: One of 4 -> Task Node: host=localhost port=xxxxx dbname=regression - -> Seq Scan on demo_source_table_4000135 demo_source_table + -> Seq Scan on demo_source_table_4000151 demo_source_table Task Count: 1 Tasks Shown: All -> Task @@ -3119,10 +3410,10 @@ DEBUG: Creating MERGE repartition plan DEBUG: Using column - index:0 from the source list to redistribute DEBUG: Collect source query results on coordinator DEBUG: Create a MERGE task list that needs to be routed -DEBUG: -DEBUG: -DEBUG: -DEBUG: +DEBUG: +DEBUG: +DEBUG: +DEBUG: DEBUG: Execute MERGE task list RESET client_min_messages; SELECT * FROM target_6785 ORDER BY 1; @@ -3240,7 +3531,7 @@ USING s1 s ON t.id = s.id WHEN NOT MATCHED THEN INSERT (id) VALUES(s.val); -ERROR: MERGE INSERT must use the source table distribution column value +ERROR: MERGE INSERT must use the source's joining column for target's distribution column MERGE INTO t1 t USING s1 s ON t.id = s.id @@ -3966,7 +4257,7 @@ CONTEXT: SQL statement "SELECT citus_drop_all_shards(v_obj.objid, v_obj.schema_ PL/pgSQL function citus_drop_trigger() line XX at PERFORM DROP FUNCTION merge_when_and_write(); DROP SCHEMA merge_schema CASCADE; -NOTICE: drop cascades to 103 other objects +NOTICE: drop cascades to 107 other objects DETAIL: drop cascades to function insert_data() drop cascades to table local_local drop cascades to table target @@ -4026,6 +4317,10 @@ drop cascades to table pg_source drop cascades to table citus_target drop cascades to table citus_source drop cascades to function compare_tables() +drop cascades to table source_pushdowntest +drop cascades to table target_pushdowntest +drop cascades to table source_withdata +drop cascades to table target_table drop cascades to view pg_source_view drop cascades to view citus_source_view drop cascades to table pg_pa_target @@ -4042,7 +4337,7 @@ drop cascades to table target_set drop cascades to table source_set drop cascades to table refsource_ref drop cascades to table pg_result -drop cascades to table refsource_ref_4000112 +drop cascades to table refsource_ref_4000128 drop cascades to table pg_ref drop cascades to table local_ref drop cascades to table reftarget_local @@ -4060,11 +4355,7 @@ drop cascades to table source_6785 drop cascades to table target_6785 drop cascades to function add_s(integer,integer) drop cascades to table pg -drop 
cascades to table t1_4000174 -drop cascades to table s1_4000175 +drop cascades to table t1_4000190 +drop cascades to table s1_4000191 drop cascades to table t1 -drop cascades to table s1 -drop cascades to table dist_target -drop cascades to table dist_source -drop cascades to view show_tables -and 3 other objects (see server log for list) +and 7 other objects (see server log for list) diff --git a/src/test/regress/multi_schedule b/src/test/regress/multi_schedule index fca36f5ab..7f0c7ca57 100644 --- a/src/test/regress/multi_schedule +++ b/src/test/regress/multi_schedule @@ -116,7 +116,8 @@ test: function_with_case_when test: clock # MERGE tests -test: merge pgmerge merge_repartition2 +test: merge pgmerge +test: merge_repartition2 test: merge_repartition1 merge_schema_sharding test: merge_partition_tables diff --git a/src/test/regress/sql/merge.sql b/src/test/regress/sql/merge.sql index a41e80841..5316b5233 100644 --- a/src/test/regress/sql/merge.sql +++ b/src/test/regress/sql/merge.sql @@ -1206,6 +1206,139 @@ SET citus.log_remote_commands to false; SELECT compare_tables(); ROLLBACK; + +-- let's create source and target table +ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 13000; +CREATE TABLE source_pushdowntest (id integer); +CREATE TABLE target_pushdowntest (id integer ); + +-- let's distribute both table on id field +SELECT create_distributed_table('source_pushdowntest', 'id'); +SELECT create_distributed_table('target_pushdowntest', 'id'); + +-- we are doing this operation on single node setup let's figure out colocation id of both tables +-- both has same colocation id so both are colocated. +WITH colocations AS ( + SELECT colocationid + FROM pg_dist_partition + WHERE logicalrelid = 'source_pushdowntest'::regclass + OR logicalrelid = 'target_pushdowntest'::regclass +) +SELECT + CASE + WHEN COUNT(DISTINCT colocationid) = 1 THEN 'Same' + ELSE 'Different' + END AS colocation_status +FROM colocations; + +SET client_min_messages TO DEBUG1; +-- Test 1 : tables are colocated AND query is multisharded AND Join On distributed column : should push down to workers. + +EXPLAIN (costs off, timing off, summary off) +MERGE INTO target_pushdowntest t +USING source_pushdowntest s +ON t.id = s.id +WHEN NOT MATCHED THEN + INSERT (id) + VALUES (s.id); + +-- Test 2 : tables are colocated AND source query is not multisharded : should push down to worker. +-- DEBUG LOGS show that query is getting pushed down +MERGE INTO target_pushdowntest t +USING (SELECT * from source_pushdowntest where id = 1) s +on t.id = s.id +WHEN NOT MATCHED THEN + INSERT (id) + VALUES (s.id); + + +-- Test 3 : tables are colocated source query is single sharded but not using source distributed column in insertion. let's not pushdown. +INSERT INTO source_pushdowntest (id) VALUES (3); + +EXPLAIN (costs off, timing off, summary off) +MERGE INTO target_pushdowntest t +USING (SELECT 1 as somekey, id from source_pushdowntest where id = 1) s +on t.id = s.somekey +WHEN NOT MATCHED THEN + INSERT (id) + VALUES (s.somekey); + + +-- let's verify if we use some other column from source for value of distributed column in target. +-- it should be inserted to correct shard of target. 
+CREATE TABLE source_withdata (id integer, some_number integer); +CREATE TABLE target_table (id integer, name text); +SELECT create_distributed_table('source_withdata', 'id'); +SELECT create_distributed_table('target_table', 'id'); + +INSERT INTO source_withdata (id, some_number) VALUES (1, 3); + +-- we will use some_number column from source_withdata to insert into distributed column of target. +-- value of some_number is 3 let's verify what shard it should go to. +select worker_hash(3); + +-- it should go to second shard of target as target has 4 shard and hash "-28094569" comes in range of second shard. +MERGE INTO target_table t +USING (SELECT id, some_number from source_withdata where id = 1) s +on t.id = s.some_number +WHEN NOT MATCHED THEN + INSERT (id, name) + VALUES (s.some_number, 'parag'); + +-- let's verify if data inserted to second shard of target. +EXPLAIN (analyze on, costs off, timing off, summary off) SELECT * FROM target_table; + +-- let's verify target data too. +SELECT * FROM target_table; + + +-- test UPDATE : when source is single sharded and table are colocated +MERGE INTO target_table t +USING (SELECT id, some_number from source_withdata where id = 1) s +on t.id = s.some_number +WHEN MATCHED THEN + UPDATE SET name = 'parag jain'; + +-- let's verify if data updated properly. +SELECT * FROM target_table; + +-- let's see what happend when we try to update distributed key of target table +MERGE INTO target_table t +USING (SELECT id, some_number from source_withdata where id = 1) s +on t.id = s.some_number +WHEN MATCHED THEN + UPDATE SET id = 1500; + +SELECT * FROM target_table; + +-- test DELETE : when source is single sharded and table are colocated +MERGE INTO target_table t +USING (SELECT id, some_number from source_withdata where id = 1) s +on t.id = s.some_number +WHEN MATCHED THEN + DELETE; + +-- let's verify if data deleted properly. +SELECT * FROM target_table; + +-- +DELETE FROM source_withdata; +DELETE FROM target_table; +INSERT INTO source VALUES (1,1); + +merge into target_table sda +using source_withdata sdn +on sda.id = sdn.id AND sda.id = 1 +when not matched then + insert (id) + values (10000); + +SELECT * FROM target_table WHERE id = 10000; + +RESET client_min_messages; + + + -- This will prune shards with restriction information as NOT MATCHED is void BEGIN; SET citus.log_remote_commands to true;