diff --git a/src/backend/distributed/planner/merge_planner.c b/src/backend/distributed/planner/merge_planner.c index 13d0b84d6..6a80a7c33 100644 --- a/src/backend/distributed/planner/merge_planner.c +++ b/src/backend/distributed/planner/merge_planner.c @@ -33,6 +33,7 @@ #include "distributed/query_colocation_checker.h" #include "distributed/repartition_executor.h" #include "distributed/shared_library_init.h" +#include "distributed/shard_pruning.h" #if PG_VERSION_NUM >= PG_VERSION_15 @@ -40,6 +41,9 @@ static int SourceResultPartitionColumnIndex(Query *mergeQuery, List *sourceTargetList, CitusTableCacheEntry *targetRelation); static Var * ValidateAndReturnVarIfSupported(Node *entryExpr); +static DeferredErrorMessage * DeferErrorIfTargetHasFalseClause(Oid targetRelationId, + PlannerRestrictionContext * + plannerRestrictionContext); static void ErrorIfMergeQueryQualAndTargetListNotSupported(Oid targetRelationId, Query *originalQuery); static void ErrorIfMergeNotSupported(Query *query, Oid targetRelationId, @@ -51,7 +55,8 @@ static DeferredErrorMessage * DeferErrorIfRoutableMergeNotSupported(Query *query List *rangeTableList, PlannerRestrictionContext * - plannerRestrictionContext); + plannerRestrictionContext, + Oid targetRelationId); static DeferredErrorMessage * MergeQualAndTargetListFunctionsSupported(Oid resultRelationId, Query *query, @@ -164,7 +169,8 @@ CreateRouterMergePlan(Oid targetRelationId, Query *originalQuery, Query *query, distributedPlan->planningError = DeferErrorIfRoutableMergeNotSupported(originalQuery, rangeTableList, - plannerRestrictionContext); + plannerRestrictionContext, + targetRelationId); if (distributedPlan->planningError != NULL) { return distributedPlan; @@ -926,13 +932,52 @@ ErrorIfMergeNotSupported(Query *query, Oid targetRelationId, List *rangeTableLis } +/* + * DeferErrorIfTargetHasFalseClause checks for the presence of a false clause in the + * target relation and throws an exception if found. Router planner prunes all the shards + * for relations with such clauses, resulting in no task generation for the job. However, + * in the case of a MERGE query, tasks still need to be generated for the shards of the + * source relation. + */ +static DeferredErrorMessage * +DeferErrorIfTargetHasFalseClause(Oid targetRelationId, + PlannerRestrictionContext *plannerRestrictionContext) +{ + ListCell *restrictionCell = NULL; + foreach(restrictionCell, + plannerRestrictionContext->relationRestrictionContext->relationRestrictionList) + { + RelationRestriction *relationRestriction = + (RelationRestriction *) lfirst(restrictionCell); + Oid relationId = relationRestriction->relationId; + + /* Check only for target relation */ + if (relationId != targetRelationId) + { + continue; + } + + List *baseRestrictionList = relationRestriction->relOptInfo->baserestrictinfo; + List *restrictClauseList = get_all_actual_clauses(baseRestrictionList); + if (ContainsFalseClause(restrictClauseList)) + { + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "Routing query is not possible with " + "no shards for target", NULL, NULL); + } + } + return NULL; +} + + /* * DeferErrorIfRoutableMergeNotSupported Checks for conditions that prevent pushable planning, if * found, raises a deferred error, which then continues to try repartitioning strategy. */ static DeferredErrorMessage * DeferErrorIfRoutableMergeNotSupported(Query *query, List *rangeTableList, - PlannerRestrictionContext *plannerRestrictionContext) + PlannerRestrictionContext *plannerRestrictionContext, + Oid targetRelationId) { List *distTablesList = NIL; List *refTablesList = NIL; @@ -1020,6 +1065,17 @@ DeferErrorIfRoutableMergeNotSupported(Query *query, List *rangeTableList, "conflict, use ON instead", NULL, NULL); } + deferredError = DeferErrorIfTargetHasFalseClause(targetRelationId, + plannerRestrictionContext); + if (deferredError) + { + ereport(DEBUG1, (errmsg("Target relation has a filter of the " + "form: false (AND ..), which results " + "in empty shards, but we still need " + "to evaluate NOT-MATCHED clause, try " + "repartitioning"))); + return deferredError; + } return NULL; } diff --git a/src/test/regress/expected/merge.out b/src/test/regress/expected/merge.out index 882a22091..3cb69936c 100644 --- a/src/test/regress/expected/merge.out +++ b/src/test/regress/expected/merge.out @@ -2973,6 +2973,164 @@ SELECT COUNT(*) FROM demo_distributed where id1 = 2; 7 (1 row) +-- +-- Test FALSE filters +-- +CREATE TABLE source_filter(order_id INT, customer_id INT, order_center VARCHAR, order_time timestamp); +CREATE TABLE target_filter(customer_id INT, last_order_id INT, order_center VARCHAR, order_count INT, last_order timestamp); +SELECT create_distributed_table('source_filter', 'customer_id'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SELECT create_distributed_table('target_filter', 'customer_id', colocate_with => 'source_filter'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +CREATE FUNCTION load_filter() RETURNS VOID AS $$ + +TRUNCATE target_filter; +TRUNCATE source_filter; + +INSERT INTO target_filter VALUES(100, 11, 'trg', -1, '2022-01-01 00:00:00'); -- Match UPDATE +INSERT INTO target_filter VALUES(200, 11, 'trg', -1, '2022-01-01 00:00:00'); -- Match DELETE + +INSERT INTO source_filter VALUES(12, 100, 'src', '2022-01-01 00:00:00'); +INSERT INTO source_filter VALUES(12, 200, 'src', '2022-01-01 00:00:00'); +INSERT INTO source_filter VALUES(12, 300, 'src', '2022-01-01 00:00:00'); + +$$ +LANGUAGE SQL; +--WHEN MATCH and FALSE +SELECT load_filter(); + load_filter +--------------------------------------------------------------------- + +(1 row) + +MERGE INTO target_filter t +USING source_filter s +ON s.customer_id = t.customer_id +WHEN MATCHED AND t.customer_id = 100 AND (FALSE) THEN + UPDATE SET order_count = 999 +WHEN MATCHED AND t.customer_id = 200 THEN + DELETE +WHEN NOT MATCHED THEN + INSERT VALUES(s.customer_id, s.order_id, s.order_center, 1, s.order_time); +SELECT * FROM target_filter ORDER BY 1, 2; + customer_id | last_order_id | order_center | order_count | last_order +--------------------------------------------------------------------- + 100 | 11 | trg | -1 | Sat Jan 01 00:00:00 2022 + 300 | 12 | src | 1 | Sat Jan 01 00:00:00 2022 +(2 rows) + +--WHEN NOT MATCH and 1=0 +SELECT load_filter(); + load_filter +--------------------------------------------------------------------- + +(1 row) + +MERGE INTO target_filter t +USING source_filter s +ON s.customer_id = t.customer_id +WHEN MATCHED AND t.customer_id = 100 THEN + UPDATE SET order_count = 999 +WHEN MATCHED AND t.customer_id = 200 THEN + DELETE +WHEN NOT MATCHED AND (1=0) THEN + INSERT VALUES(s.customer_id, s.order_id, s.order_center, 1, s.order_time); +SELECT * FROM target_filter ORDER BY 1, 2; + customer_id | last_order_id | order_center | order_count | last_order +--------------------------------------------------------------------- + 100 | 11 | trg | 999 | Sat Jan 01 00:00:00 2022 +(1 row) + +--ON t.key = s.key AND 1 < 0 +SELECT load_filter(); + load_filter +--------------------------------------------------------------------- + +(1 row) + +MERGE INTO target_filter t +USING source_filter s +ON s.customer_id = t.customer_id AND 1 < 0 +WHEN MATCHED AND t.customer_id = 100 THEN + UPDATE SET order_count = 999 +WHEN MATCHED AND t.customer_id = 200 THEN + DELETE +WHEN NOT MATCHED THEN + INSERT VALUES(s.customer_id, s.order_id, s.order_center, 1, s.order_time); +ERROR: The required join operation is missing between the target's distribution column and any expression originating from the source. The issue may arise from either a non-equi-join or a mismatch in the datatypes of the columns being joined. +DETAIL: Without a equi-join condition on the target's distribution column, the source rows cannot be efficiently redistributed, and the NOT-MATCHED condition cannot be evaluated unambiguously. This can result in incorrect or unexpected results when attempting to merge tables in a distributed setting +SELECT * FROM target_filter ORDER BY 1, 2; + customer_id | last_order_id | order_center | order_count | last_order +--------------------------------------------------------------------- + 100 | 11 | trg | -1 | Sat Jan 01 00:00:00 2022 + 200 | 11 | trg | -1 | Sat Jan 01 00:00:00 2022 +(2 rows) + +--(SELECT * FROM source_filter WHERE false) as source_filter +SELECT load_filter(); + load_filter +--------------------------------------------------------------------- + +(1 row) + +MERGE INTO target_filter t +USING (SELECT * FROM source_filter WHERE false) s +ON s.customer_id = t.customer_id +WHEN MATCHED AND t.customer_id = 100 THEN + UPDATE SET order_count = 999 +WHEN MATCHED AND t.customer_id = 200 THEN + DELETE +WHEN NOT MATCHED THEN + INSERT VALUES(s.customer_id, s.order_id, s.order_center, 1, s.order_time); +SELECT * FROM target_filter ORDER BY 1, 2; + customer_id | last_order_id | order_center | order_count | last_order +--------------------------------------------------------------------- + 100 | 11 | trg | -1 | Sat Jan 01 00:00:00 2022 + 200 | 11 | trg | -1 | Sat Jan 01 00:00:00 2022 +(2 rows) + +-- Bug 6785 +CREATE TABLE source_6785( id integer, z int, d jsonb); +CREATE TABLE target_6785( id integer, z int, d jsonb); +SELECT create_distributed_table('target_6785','id'), create_distributed_table('source_6785', 'id'); + create_distributed_table | create_distributed_table +--------------------------------------------------------------------- + | +(1 row) + +INSERT INTO source_6785 SELECT i,i FROM generate_series(0,5)i; +SET client_min_messages TO DEBUG1; +MERGE INTO target_6785 sda +USING (SELECT * FROM source_6785 WHERE id = 1) sdn +ON sda.id = sdn.id AND sda.id = 2 +WHEN NOT matched THEN + INSERT (id, z) VALUES (sdn.id, 5); +DEBUG: Target relation has a filter of the form: false (AND ..), which results in empty shards, but we still need to evaluate NOT-MATCHED clause, try repartitioning +DEBUG: Routing query is not possible with no shards for target +DEBUG: Creating MERGE repartition plan +DEBUG: Using column - index:0 from the source list to redistribute +DEBUG: Collect source query results on coordinator +DEBUG: Create a MERGE task list that needs to be routed +DEBUG: +DEBUG: +DEBUG: +DEBUG: +DEBUG: Execute MERGE task list +RESET client_min_messages; +SELECT * FROM target_6785 ORDER BY 1; + id | z | d +--------------------------------------------------------------------- + 1 | 5 | +(1 row) + -- -- Error and Unsupported scenarios -- @@ -3725,21 +3883,21 @@ INSERT INTO postgres_local_table SELECT i, i FROM generate_series(5, 10) i; -- with a colocated table MERGE INTO nullkey_c1_t1 USING nullkey_c1_t2 ON (nullkey_c1_t1.a = nullkey_c1_t2.a) WHEN MATCHED THEN UPDATE SET b = nullkey_c1_t2.b; -DEBUG: +DEBUG: DEBUG: Creating MERGE router plan MERGE INTO nullkey_c1_t1 USING nullkey_c1_t2 ON (nullkey_c1_t1.a = nullkey_c1_t2.a) WHEN MATCHED THEN DELETE; -DEBUG: +DEBUG: DEBUG: Creating MERGE router plan MERGE INTO nullkey_c1_t1 USING nullkey_c1_t2 ON (nullkey_c1_t1.a = nullkey_c1_t2.a) WHEN MATCHED THEN UPDATE SET b = nullkey_c1_t2.b WHEN NOT MATCHED THEN INSERT VALUES (nullkey_c1_t2.a, nullkey_c1_t2.b); -DEBUG: +DEBUG: DEBUG: Creating MERGE router plan MERGE INTO nullkey_c1_t1 USING nullkey_c1_t2 ON (nullkey_c1_t1.a = nullkey_c1_t2.a) WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT VALUES (nullkey_c1_t2.a, nullkey_c1_t2.b); -DEBUG: +DEBUG: DEBUG: Creating MERGE router plan -- with non-colocated single-shard table MERGE INTO nullkey_c1_t1 USING nullkey_c2_t1 ON (nullkey_c1_t1.a = nullkey_c2_t1.a) @@ -3774,14 +3932,14 @@ DEBUG: Distributed planning for a fast-path router query DEBUG: Creating router plan DEBUG: Collect source query results on coordinator DEBUG: Create a MERGE task list that needs to be routed -DEBUG: -DEBUG: distributed statement: MERGE INTO query_single_shard_table.distributed_table_4000173 citus_table_alias USING (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('merge_into_XXX_4000173'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) nullkey_c1_t1 ON (nullkey_c1_t1.a OPERATOR(pg_catalog.=) citus_table_alias.a) WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (a, b) VALUES (nullkey_c1_t1.a, nullkey_c1_t1.b) -DEBUG: -DEBUG: distributed statement: MERGE INTO query_single_shard_table.distributed_table_4000174 citus_table_alias USING (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('merge_into_XXX_4000174'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) nullkey_c1_t1 ON (nullkey_c1_t1.a OPERATOR(pg_catalog.=) citus_table_alias.a) WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (a, b) VALUES (nullkey_c1_t1.a, nullkey_c1_t1.b) -DEBUG: -DEBUG: distributed statement: MERGE INTO query_single_shard_table.distributed_table_4000175 citus_table_alias USING (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('merge_into_XXX_4000175'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) nullkey_c1_t1 ON (nullkey_c1_t1.a OPERATOR(pg_catalog.=) citus_table_alias.a) WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (a, b) VALUES (nullkey_c1_t1.a, nullkey_c1_t1.b) -DEBUG: -DEBUG: distributed statement: MERGE INTO query_single_shard_table.distributed_table_4000176 citus_table_alias USING (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('merge_into_XXX_4000176'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) nullkey_c1_t1 ON (nullkey_c1_t1.a OPERATOR(pg_catalog.=) citus_table_alias.a) WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (a, b) VALUES (nullkey_c1_t1.a, nullkey_c1_t1.b) +DEBUG: +DEBUG: distributed statement: MERGE INTO query_single_shard_table.distributed_table_4000189 citus_table_alias USING (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('merge_into_XXX_4000189'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) nullkey_c1_t1 ON (nullkey_c1_t1.a OPERATOR(pg_catalog.=) citus_table_alias.a) WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (a, b) VALUES (nullkey_c1_t1.a, nullkey_c1_t1.b) +DEBUG: +DEBUG: distributed statement: MERGE INTO query_single_shard_table.distributed_table_4000190 citus_table_alias USING (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('merge_into_XXX_4000190'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) nullkey_c1_t1 ON (nullkey_c1_t1.a OPERATOR(pg_catalog.=) citus_table_alias.a) WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (a, b) VALUES (nullkey_c1_t1.a, nullkey_c1_t1.b) +DEBUG: +DEBUG: distributed statement: MERGE INTO query_single_shard_table.distributed_table_4000191 citus_table_alias USING (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('merge_into_XXX_4000191'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) nullkey_c1_t1 ON (nullkey_c1_t1.a OPERATOR(pg_catalog.=) citus_table_alias.a) WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (a, b) VALUES (nullkey_c1_t1.a, nullkey_c1_t1.b) +DEBUG: +DEBUG: distributed statement: MERGE INTO query_single_shard_table.distributed_table_4000192 citus_table_alias USING (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('merge_into_XXX_4000192'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) nullkey_c1_t1 ON (nullkey_c1_t1.a OPERATOR(pg_catalog.=) citus_table_alias.a) WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (a, b) VALUES (nullkey_c1_t1.a, nullkey_c1_t1.b) -- with a reference table MERGE INTO nullkey_c1_t1 USING reference_table ON (nullkey_c1_t1.a = reference_table.a) WHEN MATCHED THEN UPDATE SET b = reference_table.b; @@ -3824,7 +3982,7 @@ WITH cte AS ( ) MERGE INTO nullkey_c1_t1 USING cte ON (nullkey_c1_t1.a = cte.a) WHEN MATCHED THEN UPDATE SET b = cte.b; -DEBUG: +DEBUG: DEBUG: Creating MERGE router plan WITH cte AS ( SELECT * FROM distributed_table @@ -3989,7 +4147,7 @@ CONTEXT: SQL statement "SELECT citus_drop_all_shards(v_obj.objid, v_obj.schema_ PL/pgSQL function citus_drop_trigger() line XX at PERFORM DROP FUNCTION merge_when_and_write(); DROP SCHEMA merge_schema CASCADE; -NOTICE: drop cascades to 98 other objects +NOTICE: drop cascades to 103 other objects DETAIL: drop cascades to function insert_data() drop cascades to table local_local drop cascades to table target @@ -4076,15 +4234,18 @@ drop cascades to table demo_distributed drop cascades to table demo_source_table drop cascades to table pg_demo_result drop cascades to table dist_demo_result +drop cascades to table source_filter +drop cascades to table target_filter +drop cascades to function load_filter() +drop cascades to table source_6785 +drop cascades to table target_6785 drop cascades to function add_s(integer,integer) drop cascades to table pg -drop cascades to table t1_4000158 -drop cascades to table s1_4000159 +drop cascades to table t1_4000174 +drop cascades to table s1_4000175 drop cascades to table t1 drop cascades to table s1 drop cascades to table dist_target drop cascades to table dist_source drop cascades to view show_tables -drop cascades to table target_columnar -drop cascades to table target_1 -drop cascades to table source_2 +and 3 other objects (see server log for list) diff --git a/src/test/regress/expected/merge_repartition1.out b/src/test/regress/expected/merge_repartition1.out index 0c3c47389..279358e30 100644 --- a/src/test/regress/expected/merge_repartition1.out +++ b/src/test/regress/expected/merge_repartition1.out @@ -1000,28 +1000,14 @@ SQL function "compare_data" statement 2 (1 row) -- Test source-query that requires repartitioning on top of MERGE repartitioning +SET client_min_messages TO WARNING; SELECT cleanup_data(); -NOTICE: creating a new table for merge_repartition1_schema.citus_target -CONTEXT: SQL function "cleanup_data" statement 5 -NOTICE: moving the data of merge_repartition1_schema.citus_target -CONTEXT: SQL function "cleanup_data" statement 5 -NOTICE: dropping the old merge_repartition1_schema.citus_target -CONTEXT: SQL function "cleanup_data" statement 5 -NOTICE: renaming the new table to merge_repartition1_schema.citus_target -CONTEXT: SQL function "cleanup_data" statement 5 -NOTICE: creating a new table for merge_repartition1_schema.citus_source -CONTEXT: SQL function "cleanup_data" statement 6 -NOTICE: moving the data of merge_repartition1_schema.citus_source -CONTEXT: SQL function "cleanup_data" statement 6 -NOTICE: dropping the old merge_repartition1_schema.citus_source -CONTEXT: SQL function "cleanup_data" statement 6 -NOTICE: renaming the new table to merge_repartition1_schema.citus_source -CONTEXT: SQL function "cleanup_data" statement 6 cleanup_data --------------------------------------------------------------------- (1 row) +RESET client_min_messages; SELECT setup_data(); setup_data --------------------------------------------------------------------- @@ -1159,28 +1145,14 @@ NOTICE: renaming the new table to merge_repartition1_schema.citus_source (1 row) -- Test CTE/Subquery in merge-actions (works only for router query) +SET client_min_messages TO WARNING; SELECT cleanup_data(); -NOTICE: creating a new table for merge_repartition1_schema.citus_target -CONTEXT: SQL function "cleanup_data" statement 5 -NOTICE: moving the data of merge_repartition1_schema.citus_target -CONTEXT: SQL function "cleanup_data" statement 5 -NOTICE: dropping the old merge_repartition1_schema.citus_target -CONTEXT: SQL function "cleanup_data" statement 5 -NOTICE: renaming the new table to merge_repartition1_schema.citus_target -CONTEXT: SQL function "cleanup_data" statement 5 -NOTICE: creating a new table for merge_repartition1_schema.citus_source -CONTEXT: SQL function "cleanup_data" statement 6 -NOTICE: moving the data of merge_repartition1_schema.citus_source -CONTEXT: SQL function "cleanup_data" statement 6 -NOTICE: dropping the old merge_repartition1_schema.citus_source -CONTEXT: SQL function "cleanup_data" statement 6 -NOTICE: renaming the new table to merge_repartition1_schema.citus_source -CONTEXT: SQL function "cleanup_data" statement 6 cleanup_data --------------------------------------------------------------------- (1 row) +RESET client_min_messages; SELECT setup_data(); setup_data --------------------------------------------------------------------- @@ -1233,6 +1205,137 @@ SQL function "compare_data" statement 2 (1 row) +-- +-- Test target with false clause +-- +SET client_min_messages TO WARNING; +SELECT cleanup_data(); + cleanup_data +--------------------------------------------------------------------- + +(1 row) + +RESET client_min_messages; +SELECT setup_data(); + setup_data +--------------------------------------------------------------------- + +(1 row) + +SELECT create_distributed_table('citus_target', 'id'); +NOTICE: Copying data from local table... +NOTICE: copying the data has completed +DETAIL: The local data in the table is no longer visible, but is still on disk. +HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$merge_repartition1_schema.citus_target$$) + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SELECT create_distributed_table('citus_source', 'id', colocate_with => 'citus_target'); +NOTICE: Copying data from local table... +NOTICE: copying the data has completed +DETAIL: The local data in the table is no longer visible, but is still on disk. +HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$merge_repartition1_schema.citus_source$$) + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +MERGE INTO pg_target t +USING (SELECT * FROM pg_source WHERE id > 2500) AS s +ON t.id = s.id AND t.id < 2500 +WHEN MATCHED AND t.id <= 5500 THEN + UPDATE SET val = s.val + 1 +WHEN MATCHED THEN + DELETE +WHEN NOT MATCHED THEN + INSERT VALUES(s.id, s.val); +MERGE INTO citus_target t +USING (SELECT * FROM citus_source WHERE id > 2500) AS s +ON t.id = s.id AND t.id < 2500 +WHEN MATCHED AND t.id <= 5500 THEN + UPDATE SET val = s.val + 1 +WHEN MATCHED THEN + DELETE +WHEN NOT MATCHED THEN + INSERT VALUES(s.id, s.val); +SELECT compare_data(); +NOTICE: The average of pg_target.id is equal to citus_target.id +CONTEXT: PL/pgSQL function check_data(text,text,text,text) line XX at RAISE +SQL function "compare_data" statement 1 +NOTICE: The average of pg_target.val is equal to citus_target.val +CONTEXT: PL/pgSQL function check_data(text,text,text,text) line XX at RAISE +SQL function "compare_data" statement 2 + compare_data +--------------------------------------------------------------------- + +(1 row) + +SET client_min_messages TO WARNING; +SELECT cleanup_data(); + cleanup_data +--------------------------------------------------------------------- + +(1 row) + +RESET client_min_messages; +SELECT setup_data(); + setup_data +--------------------------------------------------------------------- + +(1 row) + +SELECT create_distributed_table('citus_target', 'id'); +NOTICE: Copying data from local table... +NOTICE: copying the data has completed +DETAIL: The local data in the table is no longer visible, but is still on disk. +HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$merge_repartition1_schema.citus_target$$) + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SELECT create_distributed_table('citus_source', 'id', colocate_with => 'citus_target'); +NOTICE: Copying data from local table... +NOTICE: copying the data has completed +DETAIL: The local data in the table is no longer visible, but is still on disk. +HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$merge_repartition1_schema.citus_source$$) + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +MERGE INTO pg_target t +USING (SELECT * FROM pg_source WHERE id = 2500) AS s +ON t.id = s.id AND t.id = 5000 +WHEN MATCHED AND t.id <= 5500 THEN + UPDATE SET val = s.val + 1 +WHEN MATCHED THEN + DELETE +WHEN NOT MATCHED THEN + INSERT VALUES(s.id, s.val); +MERGE INTO citus_target t +USING (SELECT * FROM citus_source WHERE id = 2500) AS s +ON t.id = s.id AND t.id = 5000 +WHEN MATCHED AND t.id <= 5500 THEN + UPDATE SET val = s.val + 1 +WHEN MATCHED THEN + DELETE +WHEN NOT MATCHED THEN + INSERT VALUES(s.id, s.val); +SELECT compare_data(); +NOTICE: The average of pg_target.id is equal to citus_target.id +CONTEXT: PL/pgSQL function check_data(text,text,text,text) line XX at RAISE +SQL function "compare_data" statement 1 +NOTICE: The average of pg_target.val is equal to citus_target.val +CONTEXT: PL/pgSQL function check_data(text,text,text,text) line XX at RAISE +SQL function "compare_data" statement 2 + compare_data +--------------------------------------------------------------------- + +(1 row) + DROP SCHEMA merge_repartition1_schema CASCADE; NOTICE: drop cascades to 8 other objects DETAIL: drop cascades to table pg_target diff --git a/src/test/regress/sql/merge.sql b/src/test/regress/sql/merge.sql index db3a76fb6..4fb911736 100644 --- a/src/test/regress/sql/merge.sql +++ b/src/test/regress/sql/merge.sql @@ -1855,6 +1855,102 @@ WHEN MATCHED THEN UPDATE SET val1 = 150; SELECT COUNT(*) FROM demo_distributed where val1 = 150; SELECT COUNT(*) FROM demo_distributed where id1 = 2; +-- +-- Test FALSE filters +-- +CREATE TABLE source_filter(order_id INT, customer_id INT, order_center VARCHAR, order_time timestamp); +CREATE TABLE target_filter(customer_id INT, last_order_id INT, order_center VARCHAR, order_count INT, last_order timestamp); + +SELECT create_distributed_table('source_filter', 'customer_id'); +SELECT create_distributed_table('target_filter', 'customer_id', colocate_with => 'source_filter'); + +CREATE FUNCTION load_filter() RETURNS VOID AS $$ + +TRUNCATE target_filter; +TRUNCATE source_filter; + +INSERT INTO target_filter VALUES(100, 11, 'trg', -1, '2022-01-01 00:00:00'); -- Match UPDATE +INSERT INTO target_filter VALUES(200, 11, 'trg', -1, '2022-01-01 00:00:00'); -- Match DELETE + +INSERT INTO source_filter VALUES(12, 100, 'src', '2022-01-01 00:00:00'); +INSERT INTO source_filter VALUES(12, 200, 'src', '2022-01-01 00:00:00'); +INSERT INTO source_filter VALUES(12, 300, 'src', '2022-01-01 00:00:00'); + +$$ +LANGUAGE SQL; + +--WHEN MATCH and FALSE +SELECT load_filter(); +MERGE INTO target_filter t +USING source_filter s +ON s.customer_id = t.customer_id +WHEN MATCHED AND t.customer_id = 100 AND (FALSE) THEN + UPDATE SET order_count = 999 +WHEN MATCHED AND t.customer_id = 200 THEN + DELETE +WHEN NOT MATCHED THEN + INSERT VALUES(s.customer_id, s.order_id, s.order_center, 1, s.order_time); + +SELECT * FROM target_filter ORDER BY 1, 2; + +--WHEN NOT MATCH and 1=0 +SELECT load_filter(); +MERGE INTO target_filter t +USING source_filter s +ON s.customer_id = t.customer_id +WHEN MATCHED AND t.customer_id = 100 THEN + UPDATE SET order_count = 999 +WHEN MATCHED AND t.customer_id = 200 THEN + DELETE +WHEN NOT MATCHED AND (1=0) THEN + INSERT VALUES(s.customer_id, s.order_id, s.order_center, 1, s.order_time); + +SELECT * FROM target_filter ORDER BY 1, 2; + +--ON t.key = s.key AND 1 < 0 +SELECT load_filter(); +MERGE INTO target_filter t +USING source_filter s +ON s.customer_id = t.customer_id AND 1 < 0 +WHEN MATCHED AND t.customer_id = 100 THEN + UPDATE SET order_count = 999 +WHEN MATCHED AND t.customer_id = 200 THEN + DELETE +WHEN NOT MATCHED THEN + INSERT VALUES(s.customer_id, s.order_id, s.order_center, 1, s.order_time); + +SELECT * FROM target_filter ORDER BY 1, 2; + +--(SELECT * FROM source_filter WHERE false) as source_filter +SELECT load_filter(); +MERGE INTO target_filter t +USING (SELECT * FROM source_filter WHERE false) s +ON s.customer_id = t.customer_id +WHEN MATCHED AND t.customer_id = 100 THEN + UPDATE SET order_count = 999 +WHEN MATCHED AND t.customer_id = 200 THEN + DELETE +WHEN NOT MATCHED THEN + INSERT VALUES(s.customer_id, s.order_id, s.order_center, 1, s.order_time); + +SELECT * FROM target_filter ORDER BY 1, 2; + +-- Bug 6785 +CREATE TABLE source_6785( id integer, z int, d jsonb); +CREATE TABLE target_6785( id integer, z int, d jsonb); +SELECT create_distributed_table('target_6785','id'), create_distributed_table('source_6785', 'id'); +INSERT INTO source_6785 SELECT i,i FROM generate_series(0,5)i; + +SET client_min_messages TO DEBUG1; +MERGE INTO target_6785 sda +USING (SELECT * FROM source_6785 WHERE id = 1) sdn +ON sda.id = sdn.id AND sda.id = 2 +WHEN NOT matched THEN + INSERT (id, z) VALUES (sdn.id, 5); +RESET client_min_messages; + +SELECT * FROM target_6785 ORDER BY 1; + -- -- Error and Unsupported scenarios -- diff --git a/src/test/regress/sql/merge_repartition1.sql b/src/test/regress/sql/merge_repartition1.sql index 4d73e999d..858f4710c 100644 --- a/src/test/regress/sql/merge_repartition1.sql +++ b/src/test/regress/sql/merge_repartition1.sql @@ -434,7 +434,9 @@ WHEN NOT MATCHED THEN SELECT compare_data(); -- Test source-query that requires repartitioning on top of MERGE repartitioning +SET client_min_messages TO WARNING; SELECT cleanup_data(); +RESET client_min_messages; SELECT setup_data(); SELECT create_distributed_table('citus_target', 'id'); SELECT create_distributed_table('citus_source', 'id', colocate_with=>'none'); @@ -489,7 +491,9 @@ SELECT compare_data(); SELECT alter_table_set_access_method('citus_source', 'heap'); -- Test CTE/Subquery in merge-actions (works only for router query) +SET client_min_messages TO WARNING; SELECT cleanup_data(); +RESET client_min_messages; SELECT setup_data(); SELECT create_distributed_table('citus_target', 'id'); SELECT create_distributed_table('citus_source', 'id', colocate_with=>'citus_target'); @@ -512,4 +516,65 @@ WHEN NOT MATCHED AND (SELECT max_a < 5001 FROM (SELECT max(id) as max_a, max(val SELECT compare_data(); +-- +-- Test target with false clause +-- +SET client_min_messages TO WARNING; +SELECT cleanup_data(); +RESET client_min_messages; +SELECT setup_data(); +SELECT create_distributed_table('citus_target', 'id'); +SELECT create_distributed_table('citus_source', 'id', colocate_with => 'citus_target'); + +MERGE INTO pg_target t +USING (SELECT * FROM pg_source WHERE id > 2500) AS s +ON t.id = s.id AND t.id < 2500 +WHEN MATCHED AND t.id <= 5500 THEN + UPDATE SET val = s.val + 1 +WHEN MATCHED THEN + DELETE +WHEN NOT MATCHED THEN + INSERT VALUES(s.id, s.val); + +MERGE INTO citus_target t +USING (SELECT * FROM citus_source WHERE id > 2500) AS s +ON t.id = s.id AND t.id < 2500 +WHEN MATCHED AND t.id <= 5500 THEN + UPDATE SET val = s.val + 1 +WHEN MATCHED THEN + DELETE +WHEN NOT MATCHED THEN + INSERT VALUES(s.id, s.val); + +SELECT compare_data(); + +SET client_min_messages TO WARNING; +SELECT cleanup_data(); +RESET client_min_messages; +SELECT setup_data(); +SELECT create_distributed_table('citus_target', 'id'); +SELECT create_distributed_table('citus_source', 'id', colocate_with => 'citus_target'); + +MERGE INTO pg_target t +USING (SELECT * FROM pg_source WHERE id = 2500) AS s +ON t.id = s.id AND t.id = 5000 +WHEN MATCHED AND t.id <= 5500 THEN + UPDATE SET val = s.val + 1 +WHEN MATCHED THEN + DELETE +WHEN NOT MATCHED THEN + INSERT VALUES(s.id, s.val); + +MERGE INTO citus_target t +USING (SELECT * FROM citus_source WHERE id = 2500) AS s +ON t.id = s.id AND t.id = 5000 +WHEN MATCHED AND t.id <= 5500 THEN + UPDATE SET val = s.val + 1 +WHEN MATCHED THEN + DELETE +WHEN NOT MATCHED THEN + INSERT VALUES(s.id, s.val); + +SELECT compare_data(); + DROP SCHEMA merge_repartition1_schema CASCADE;