diff --git a/src/backend/distributed/deparser/citus_ruleutils.c b/src/backend/distributed/deparser/citus_ruleutils.c index 6b865e061..f964eec84 100644 --- a/src/backend/distributed/deparser/citus_ruleutils.c +++ b/src/backend/distributed/deparser/citus_ruleutils.c @@ -1381,6 +1381,13 @@ contain_nextval_expression_walker(Node *node, void *context) return true; } } + + if (IsA(node, Query)) + { + return query_tree_walker((Query *) node, + contain_nextval_expression_walker, NULL, 0); + } + return expression_tree_walker(node, contain_nextval_expression_walker, context); } diff --git a/src/backend/distributed/planner/merge_planner.c b/src/backend/distributed/planner/merge_planner.c index 6a80a7c33..6dd4658b5 100644 --- a/src/backend/distributed/planner/merge_planner.c +++ b/src/backend/distributed/planner/merge_planner.c @@ -21,6 +21,7 @@ #include "distributed/citus_clauses.h" #include "distributed/citus_custom_scan.h" +#include "distributed/citus_ruleutils.h" #include "distributed/insert_select_planner.h" #include "distributed/listutils.h" #include "distributed/local_distributed_join_planner.h" @@ -701,13 +702,6 @@ ErrorIfRepartitionMergeNotSupported(Oid targetRelationId, Query *mergeQuery, "Postgres table is not yet supported"))); } - queryRteListProperties = GetRTEListPropertiesForQuery(sourceQuery); - if (!queryRteListProperties->hasCitusTable) - { - ereport(ERROR, (errmsg("To MERGE into a distributed table, source must " - "be Citus table(s)"))); - } - /* * Sub-queries and CTEs are not allowed in actions and ON clause */ @@ -722,6 +716,14 @@ ErrorIfRepartitionMergeNotSupported(Oid targetRelationId, Query *mergeQuery, "routable query"))); } + /* + * Sequences are not supported + */ + if (contain_nextval_expression_walker((Node *) mergeQuery, NULL)) + { + ereport(ERROR, (errmsg("Distributed MERGE doesn't support sequences yet"))); + } + MergeAction *action = NULL; foreach_ptr(action, mergeQuery->mergeActionList) { @@ -1076,6 +1078,17 @@ DeferErrorIfRoutableMergeNotSupported(Query *query, List *rangeTableList, "repartitioning"))); return deferredError; } + + /* + * Check for any volatile or stable functions + */ + if (contain_mutable_functions((Node *) query)) + { + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "non-IMMUTABLE functions are not routable, " + "try repartitioning", NULL, NULL); + } + return NULL; } @@ -1089,20 +1102,6 @@ DeferErrorIfRoutableMergeNotSupported(Query *query, List *rangeTableList, static void ErrorIfMergeQueryQualAndTargetListNotSupported(Oid targetRelationId, Query *originalQuery) { - /* - * TODO: For now, we are adding an exception where any volatile or stable - * functions are not allowed in the MERGE query, but this will become too - * restrictive as this will prevent many useful and simple cases, such as, - * INSERT VALUES(ts::timestamp), bigserial column inserts etc. But without - * this restriction, we have a potential danger of some of the function(s) - * getting executed at the worker which will result in incorrect behavior. - */ - if (contain_mutable_functions((Node *) originalQuery)) - { - ereport(ERROR, (errmsg("non-IMMUTABLE functions are not yet " - "supported in MERGE sql with distributed tables"))); - } - DeferredErrorMessage *deferredError = MergeQualAndTargetListFunctionsSupported(targetRelationId, originalQuery, @@ -1195,15 +1194,14 @@ SourceResultPartitionColumnIndex(Query *mergeQuery, List *sourceTargetList, Var *sourceRepartitionVar = NULL; OpExpr *validJoinClause = - SinglePartitionJoinClause(list_make1(targetColumn), mergeJoinConditionList); + SinglePartitionJoinClause(list_make1(targetColumn), mergeJoinConditionList, + false); if (!validJoinClause) { ereport(ERROR, (errmsg("The required join operation is missing between " "the target's distribution column and any " "expression originating from the source. The " - "issue may arise from either a non-equi-join or " - "a mismatch in the datatypes of the columns being " - "joined."), + "issue may arise from a non-equi-join."), errdetail("Without a equi-join condition on the target's " "distribution column, the source rows " "cannot be efficiently redistributed, and " diff --git a/src/backend/distributed/planner/multi_join_order.c b/src/backend/distributed/planner/multi_join_order.c index 79007b70d..d3fbdfd07 100644 --- a/src/backend/distributed/planner/multi_join_order.c +++ b/src/backend/distributed/planner/multi_join_order.c @@ -999,7 +999,8 @@ SinglePartitionJoin(JoinOrderNode *currentJoinNode, TableEntry *candidateTable, } OpExpr *joinClause = - SinglePartitionJoinClause(currentPartitionColumnList, applicableJoinClauses); + SinglePartitionJoinClause(currentPartitionColumnList, applicableJoinClauses, + true); if (joinClause != NULL) { if (currentPartitionMethod == DISTRIBUTE_BY_HASH) @@ -1037,7 +1038,8 @@ SinglePartitionJoin(JoinOrderNode *currentJoinNode, TableEntry *candidateTable, */ List *candidatePartitionColumnList = list_make1(candidatePartitionColumn); joinClause = SinglePartitionJoinClause(candidatePartitionColumnList, - applicableJoinClauses); + applicableJoinClauses, + true); if (joinClause != NULL) { if (candidatePartitionMethod == DISTRIBUTE_BY_HASH) @@ -1078,7 +1080,8 @@ SinglePartitionJoin(JoinOrderNode *currentJoinNode, TableEntry *candidateTable, * clause exists, the function returns NULL. */ OpExpr * -SinglePartitionJoinClause(List *partitionColumnList, List *applicableJoinClauses) +SinglePartitionJoinClause(List *partitionColumnList, List *applicableJoinClauses, bool + typeMismatchOk) { if (list_length(partitionColumnList) == 0) { @@ -1086,6 +1089,7 @@ SinglePartitionJoinClause(List *partitionColumnList, List *applicableJoinClauses } Var *partitionColumn = NULL; + bool foundTypeMismatch = false; foreach_ptr(partitionColumn, partitionColumnList) { Node *applicableJoinClause = NULL; @@ -1121,11 +1125,22 @@ SinglePartitionJoinClause(List *partitionColumnList, List *applicableJoinClauses { ereport(DEBUG1, (errmsg("single partition column types do not " "match"))); + foundTypeMismatch = true; } } } } + if (foundTypeMismatch && !typeMismatchOk) + { + ereport(ERROR, (errmsg( + "There is a datatype mismatch between target's distribution " + "column and the expression originating from the source."), + errdetail("If the types are different, Citus uses different hash " + "functions for the two column types, which might " + "lead to incorrect repartitioning of the result data"))); + } + return NULL; } diff --git a/src/backend/distributed/planner/multi_logical_planner.c b/src/backend/distributed/planner/multi_logical_planner.c index fa9e5bb61..0974019dc 100644 --- a/src/backend/distributed/planner/multi_logical_planner.c +++ b/src/backend/distributed/planner/multi_logical_planner.c @@ -2140,7 +2140,8 @@ ApplySinglePartitionJoin(MultiNode *leftNode, MultiNode *rightNode, * we introduce a (re-)partition operator for the other column. */ OpExpr *joinClause = SinglePartitionJoinClause(partitionColumnList, - applicableJoinClauses); + applicableJoinClauses, + true); Assert(joinClause != NULL); /* both are verified in SinglePartitionJoinClause to not be NULL, assert is to guard */ diff --git a/src/include/distributed/multi_join_order.h b/src/include/distributed/multi_join_order.h index 4e4ba1dd2..00553f105 100644 --- a/src/include/distributed/multi_join_order.h +++ b/src/include/distributed/multi_join_order.h @@ -99,7 +99,8 @@ extern bool NodeIsEqualsOpExpr(Node *node); extern bool IsSupportedReferenceJoin(JoinType joinType, bool leftIsReferenceTable, bool rightIsReferenceTable); extern OpExpr * SinglePartitionJoinClause(List *partitionColumnList, - List *applicableJoinClauses); + List *applicableJoinClauses, + bool typeMismatchOk); extern OpExpr * DualPartitionJoinClause(List *applicableJoinClauses); extern Var * LeftColumnOrNULL(OpExpr *joinClause); extern Var * RightColumnOrNULL(OpExpr *joinClause); diff --git a/src/test/regress/expected/merge.out b/src/test/regress/expected/merge.out index 3cb69936c..7568f8d75 100644 --- a/src/test/regress/expected/merge.out +++ b/src/test/regress/expected/merge.out @@ -2355,7 +2355,7 @@ USING source_serial sdn ON sda.id = sdn.id WHEN NOT matched THEN INSERT (id, z) VALUES (id, z); -ERROR: non-IMMUTABLE functions are not yet supported in MERGE sql with distributed tables +ERROR: functions used in MERGE actions on distributed tables must not be VOLATILE SELECT count(*) from source_serial; count --------------------------------------------------------------------- @@ -3065,7 +3065,7 @@ WHEN MATCHED AND t.customer_id = 200 THEN DELETE WHEN NOT MATCHED THEN INSERT VALUES(s.customer_id, s.order_id, s.order_center, 1, s.order_time); -ERROR: The required join operation is missing between the target's distribution column and any expression originating from the source. The issue may arise from either a non-equi-join or a mismatch in the datatypes of the columns being joined. +ERROR: The required join operation is missing between the target's distribution column and any expression originating from the source. The issue may arise from a non-equi-join. DETAIL: Without a equi-join condition on the target's distribution column, the source rows cannot be efficiently redistributed, and the NOT-MATCHED condition cannot be evaluated unambiguously. This can result in incorrect or unexpected results when attempting to merge tables in a distributed setting SELECT * FROM target_filter ORDER BY 1, 2; customer_id | last_order_id | order_center | order_count | last_order @@ -3131,9 +3131,106 @@ SELECT * FROM target_6785 ORDER BY 1; 1 | 5 | (1 row) +-- Get the result from Postgres tables +CREATE TABLE dest_tab(tid int, val int, ser bigserial, tm timestamp); +CREATE TABLE src_tab(sid int, val int); +SELECT setval('dest_tab_ser_seq', 1, true); + setval +--------------------------------------------------------------------- + 1 +(1 row) + +INSERT INTO src_tab SELECT i,i FROM generate_series(0,1000)i; +CREATE OR REPLACE FUNCTION src_tab_result() +RETURNS TABLE (id INT, val INT, bs bigint, tm timestamp) AS +$$ +BEGIN + RETURN QUERY SELECT src_tab.*, nextval('dest_tab_ser_seq'::regclass), now()::timestamp FROM src_tab; +END; +$$ +LANGUAGE plpgsql; +MERGE INTO dest_tab t +USING (SELECT * FROM src_tab_result()) s +ON (s.id = t.tid) + WHEN MATCHED THEN + UPDATE SET val = t.val + 1 + WHEN NOT MATCHED THEN + INSERT VALUES(s.id, s.val, s.bs, s.tm); +-- Save the postgres result +CREATE TABLE postgres_serial AS SELECT MIN(ser), MAX(ser), AVG(ser) FROM dest_tab ; +-- Cleanup the data and reset the sequence +TRUNCATE dest_tab; +TRUNCATE src_tab; +SELECT setval('dest_tab_ser_seq', 1, true); + setval +--------------------------------------------------------------------- + 1 +(1 row) + +INSERT INTO src_tab SELECT i,i FROM generate_series(0,1000)i; +-- Now, make them Citus tables +SELECT create_distributed_table('dest_tab', 'tid'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SELECT create_distributed_table('src_tab', 'sid', colocate_with => 'dest_tab'); +NOTICE: Copying data from local table... +NOTICE: copying the data has completed +DETAIL: The local data in the table is no longer visible, but is still on disk. +HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$merge_schema.src_tab$$) + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +MERGE INTO dest_tab t +USING (SELECT * FROM src_tab_result()) s +ON (s.id = t.tid) + WHEN MATCHED THEN + UPDATE SET val = t.val + 1 + WHEN NOT MATCHED THEN + INSERT VALUES(s.id, s.val, s.bs, s.tm); +-- Compare the sequence value +SELECT + ((SELECT MAX(max) FROM postgres_serial) = (SELECT MAX(ser) FROM dest_tab) + AND + (SELECT MIN(min) FROM postgres_serial) = (SELECT MIN(ser) FROM dest_tab) + AND + (SELECT AVG(avg) FROM postgres_serial) = (SELECT AVG(ser) FROM dest_tab)) AS all_equal; + all_equal +--------------------------------------------------------------------- + t +(1 row) + +-- +-- Source without a table +-- +MERGE INTO dest_tab t +USING (VALUES (1, -1), (2, -1), (3, -1)) as s (sid, val) +ON t.tid = s.sid +WHEN MATCHED THEN + UPDATE SET val = s.val +WHEN NOT MATCHED THEN + DO NOTHING; +-- Three rows with 'val = -1' +SELECT COUNT(*) FROM dest_tab where val = -1; + count +--------------------------------------------------------------------- + 3 +(1 row) + -- -- Error and Unsupported scenarios -- +-- Test sequences +MERGE INTO target_serial sda +USING (select id, z, nextval('target_serial_d_seq'::regclass) as ser from source_serial) sdn +ON sda.id = sdn.id +WHEN NOT matched THEN + INSERT (id, z, d) VALUES (id, z, ser); +ERROR: Distributed MERGE doesn't support sequences yet -- Test explain analyze with repartition EXPLAIN ANALYZE MERGE INTO demo_distributed t @@ -3144,15 +3241,6 @@ WHEN MATCHED THEN WHEN NOT MATCHED THEN INSERT VALUES(id2, s3); ERROR: EXPLAIN ANALYZE is currently not supported for MERGE INTO ... commands with repartitioning --- Source without a table -MERGE INTO target_cj t -USING (VALUES (1, 1), (2, 1), (3, 3)) as s (sid, val) -ON t.tid = s.sid AND t.tid = 2 -WHEN MATCHED THEN - UPDATE SET val = s.val -WHEN NOT MATCHED THEN - DO NOTHING; -ERROR: To MERGE into a distributed table, source must be Citus table(s) -- Incomplete source MERGE INTO target_cj t USING (source_cj1 s1 INNER JOIN source_cj2 s2 ON sid1 = val2) s @@ -3362,7 +3450,7 @@ MERGE INTO t1 USING (SELECT * FROM s1 WHERE true) s1 ON t1.id = s1.id AND s1.id = 2 WHEN matched THEN UPDATE SET id = s1.id, val = random(); -ERROR: non-IMMUTABLE functions are not yet supported in MERGE sql with distributed tables +ERROR: functions used in MERGE actions on distributed tables must not be VOLATILE -- Test STABLE function CREATE FUNCTION add_s(integer, integer) RETURNS integer AS 'select $1 + $2;' @@ -3372,14 +3460,14 @@ MERGE INTO t1 USING s1 ON t1.id = s1.id WHEN NOT MATCHED THEN INSERT VALUES(s1.id, add_s(s1.val, 2)); -ERROR: non-IMMUTABLE functions are not yet supported in MERGE sql with distributed tables +ERROR: STABLE functions used in MERGE queries cannot be called with column references -- Test preventing "ON" join condition from writing to the database BEGIN; MERGE INTO t1 USING s1 ON t1.id = s1.id AND t1.id = 2 AND (merge_when_and_write()) WHEN MATCHED THEN UPDATE SET val = t1.val + s1.val; -ERROR: non-IMMUTABLE functions are not yet supported in MERGE sql with distributed tables +ERROR: functions used in the ON clause of MERGE queries on distributed tables must not be VOLATILE ROLLBACK; -- Test preventing WHEN clause(s) from writing to the database BEGIN; @@ -3387,7 +3475,7 @@ MERGE INTO t1 USING s1 ON t1.id = s1.id AND t1.id = 2 WHEN MATCHED AND (merge_when_and_write()) THEN UPDATE SET val = t1.val + s1.val; -ERROR: non-IMMUTABLE functions are not yet supported in MERGE sql with distributed tables +ERROR: functions used in the WHEN clause of MERGE queries on distributed tables must not be VOLATILE ROLLBACK; -- Joining on non-partition columns with CTE source, but INSERT incorrect column WITH s1_res AS ( @@ -3414,7 +3502,7 @@ MERGE INTO t1 UPDATE SET val = t1.val + 1 WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s1_res.id, s1_res.val); -ERROR: The required join operation is missing between the target's distribution column and any expression originating from the source. The issue may arise from either a non-equi-join or a mismatch in the datatypes of the columns being joined. +ERROR: The required join operation is missing between the target's distribution column and any expression originating from the source. The issue may arise from a non-equi-join. DETAIL: Without a equi-join condition on the target's distribution column, the source rows cannot be efficiently redistributed, and the NOT-MATCHED condition cannot be evaluated unambiguously. This can result in incorrect or unexpected results when attempting to merge tables in a distributed setting -- Join condition without target distribution column WITH s1_res AS ( @@ -3424,7 +3512,7 @@ WITH s1_res AS ( WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s1_res.id, s1_res.val); -ERROR: The required join operation is missing between the target's distribution column and any expression originating from the source. The issue may arise from either a non-equi-join or a mismatch in the datatypes of the columns being joined. +ERROR: The required join operation is missing between the target's distribution column and any expression originating from the source. The issue may arise from a non-equi-join. DETAIL: Without a equi-join condition on the target's distribution column, the source rows cannot be efficiently redistributed, and the NOT-MATCHED condition cannot be evaluated unambiguously. This can result in incorrect or unexpected results when attempting to merge tables in a distributed setting -- -- Reference tables @@ -3633,13 +3721,13 @@ SELECT create_distributed_table('dist_source', 'id', colocate_with => 'none'); (1 row) MERGE INTO dist_target -USING (SELECT 100 id) AS source -ON dist_target.id = source.id AND dist_target.val = 'const' +USING (SELECT 100 id, 101 val) AS source +ON dist_target.id = source.val AND dist_target.val = 'const' WHEN MATCHED THEN UPDATE SET val = 'source' WHEN NOT MATCHED THEN INSERT VALUES(source.id, 'source'); -ERROR: To MERGE into a distributed table, source must be Citus table(s) +ERROR: MERGE INSERT must use the source's joining column for target's distribution column -- Non-hash distributed tables (append/range). CREATE VIEW show_tables AS SELECT logicalrelid, partmethod @@ -3883,21 +3971,21 @@ INSERT INTO postgres_local_table SELECT i, i FROM generate_series(5, 10) i; -- with a colocated table MERGE INTO nullkey_c1_t1 USING nullkey_c1_t2 ON (nullkey_c1_t1.a = nullkey_c1_t2.a) WHEN MATCHED THEN UPDATE SET b = nullkey_c1_t2.b; -DEBUG: +DEBUG: DEBUG: Creating MERGE router plan MERGE INTO nullkey_c1_t1 USING nullkey_c1_t2 ON (nullkey_c1_t1.a = nullkey_c1_t2.a) WHEN MATCHED THEN DELETE; -DEBUG: +DEBUG: DEBUG: Creating MERGE router plan MERGE INTO nullkey_c1_t1 USING nullkey_c1_t2 ON (nullkey_c1_t1.a = nullkey_c1_t2.a) WHEN MATCHED THEN UPDATE SET b = nullkey_c1_t2.b WHEN NOT MATCHED THEN INSERT VALUES (nullkey_c1_t2.a, nullkey_c1_t2.b); -DEBUG: +DEBUG: DEBUG: Creating MERGE router plan MERGE INTO nullkey_c1_t1 USING nullkey_c1_t2 ON (nullkey_c1_t1.a = nullkey_c1_t2.a) WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT VALUES (nullkey_c1_t2.a, nullkey_c1_t2.b); -DEBUG: +DEBUG: DEBUG: Creating MERGE router plan -- with non-colocated single-shard table MERGE INTO nullkey_c1_t1 USING nullkey_c2_t1 ON (nullkey_c1_t1.a = nullkey_c2_t1.a) @@ -3932,14 +4020,14 @@ DEBUG: Distributed planning for a fast-path router query DEBUG: Creating router plan DEBUG: Collect source query results on coordinator DEBUG: Create a MERGE task list that needs to be routed -DEBUG: -DEBUG: distributed statement: MERGE INTO query_single_shard_table.distributed_table_4000189 citus_table_alias USING (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('merge_into_XXX_4000189'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) nullkey_c1_t1 ON (nullkey_c1_t1.a OPERATOR(pg_catalog.=) citus_table_alias.a) WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (a, b) VALUES (nullkey_c1_t1.a, nullkey_c1_t1.b) -DEBUG: -DEBUG: distributed statement: MERGE INTO query_single_shard_table.distributed_table_4000190 citus_table_alias USING (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('merge_into_XXX_4000190'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) nullkey_c1_t1 ON (nullkey_c1_t1.a OPERATOR(pg_catalog.=) citus_table_alias.a) WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (a, b) VALUES (nullkey_c1_t1.a, nullkey_c1_t1.b) -DEBUG: -DEBUG: distributed statement: MERGE INTO query_single_shard_table.distributed_table_4000191 citus_table_alias USING (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('merge_into_XXX_4000191'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) nullkey_c1_t1 ON (nullkey_c1_t1.a OPERATOR(pg_catalog.=) citus_table_alias.a) WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (a, b) VALUES (nullkey_c1_t1.a, nullkey_c1_t1.b) -DEBUG: -DEBUG: distributed statement: MERGE INTO query_single_shard_table.distributed_table_4000192 citus_table_alias USING (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('merge_into_XXX_4000192'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) nullkey_c1_t1 ON (nullkey_c1_t1.a OPERATOR(pg_catalog.=) citus_table_alias.a) WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (a, b) VALUES (nullkey_c1_t1.a, nullkey_c1_t1.b) +DEBUG: +DEBUG: distributed statement: MERGE INTO query_single_shard_table.distributed_table_4000197 citus_table_alias USING (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('merge_into_XXX_4000197'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) nullkey_c1_t1 ON (nullkey_c1_t1.a OPERATOR(pg_catalog.=) citus_table_alias.a) WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (a, b) VALUES (nullkey_c1_t1.a, nullkey_c1_t1.b) +DEBUG: +DEBUG: distributed statement: MERGE INTO query_single_shard_table.distributed_table_4000198 citus_table_alias USING (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('merge_into_XXX_4000198'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) nullkey_c1_t1 ON (nullkey_c1_t1.a OPERATOR(pg_catalog.=) citus_table_alias.a) WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (a, b) VALUES (nullkey_c1_t1.a, nullkey_c1_t1.b) +DEBUG: +DEBUG: distributed statement: MERGE INTO query_single_shard_table.distributed_table_4000199 citus_table_alias USING (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('merge_into_XXX_4000199'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) nullkey_c1_t1 ON (nullkey_c1_t1.a OPERATOR(pg_catalog.=) citus_table_alias.a) WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (a, b) VALUES (nullkey_c1_t1.a, nullkey_c1_t1.b) +DEBUG: +DEBUG: distributed statement: MERGE INTO query_single_shard_table.distributed_table_4000200 citus_table_alias USING (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('merge_into_XXX_4000200'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) nullkey_c1_t1 ON (nullkey_c1_t1.a OPERATOR(pg_catalog.=) citus_table_alias.a) WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (a, b) VALUES (nullkey_c1_t1.a, nullkey_c1_t1.b) -- with a reference table MERGE INTO nullkey_c1_t1 USING reference_table ON (nullkey_c1_t1.a = reference_table.a) WHEN MATCHED THEN UPDATE SET b = reference_table.b; @@ -3982,7 +4070,7 @@ WITH cte AS ( ) MERGE INTO nullkey_c1_t1 USING cte ON (nullkey_c1_t1.a = cte.a) WHEN MATCHED THEN UPDATE SET b = cte.b; -DEBUG: +DEBUG: DEBUG: Creating MERGE router plan WITH cte AS ( SELECT * FROM distributed_table @@ -4031,7 +4119,7 @@ EXPLAIN MERGE INTO demo_distributed t USING demo_source_table s ON (s.id2 + 1 = t.id1) WHEN MATCHED THEN UPDATE SET val1 = 15; -ERROR: The required join operation is missing between the target's distribution column and any expression originating from the source. The issue may arise from either a non-equi-join or a mismatch in the datatypes of the columns being joined. +ERROR: The required join operation is missing between the target's distribution column and any expression originating from the source. The issue may arise from a non-equi-join. DETAIL: Without a equi-join condition on the target's distribution column, the source rows cannot be efficiently redistributed, and the NOT-MATCHED condition cannot be evaluated unambiguously. This can result in incorrect or unexpected results when attempting to merge tables in a distributed setting -- Sub-queries and CTEs are not allowed in actions and ON clause CREATE TABLE target_1 (a int, b int, c int); @@ -4136,6 +4224,14 @@ WHEN MATCHED THEN DELETE; ERROR: Sub-queries and CTEs are not allowed in ON clause for MERGE with repartitioning HINT: Consider making the source and target colocated and joined on the distribution column to make it a routable query +-- Datatype mismatch between target and source join column +WITH src AS (SELECT FLOOR(val) AS a FROM src_tab) +MERGE INTO dest_tab t +USING src +ON t.tid = src.a +WHEN MATCHED THEN DELETE; +ERROR: There is a datatype mismatch between target's distribution column and the expression originating from the source. +DETAIL: If the types are different, Citus uses different hash functions for the two column types, which might lead to incorrect repartitioning of the result data RESET client_min_messages; DROP SERVER foreign_server CASCADE; NOTICE: drop cascades to 3 other objects @@ -4147,7 +4243,7 @@ CONTEXT: SQL statement "SELECT citus_drop_all_shards(v_obj.objid, v_obj.schema_ PL/pgSQL function citus_drop_trigger() line XX at PERFORM DROP FUNCTION merge_when_and_write(); DROP SCHEMA merge_schema CASCADE; -NOTICE: drop cascades to 103 other objects +NOTICE: drop cascades to 107 other objects DETAIL: drop cascades to function insert_data() drop cascades to table local_local drop cascades to table target @@ -4239,13 +4335,13 @@ drop cascades to table target_filter drop cascades to function load_filter() drop cascades to table source_6785 drop cascades to table target_6785 +drop cascades to table dest_tab +drop cascades to table src_tab +drop cascades to function src_tab_result() +drop cascades to table postgres_serial drop cascades to function add_s(integer,integer) drop cascades to table pg -drop cascades to table t1_4000174 -drop cascades to table s1_4000175 +drop cascades to table t1_4000182 +drop cascades to table s1_4000183 drop cascades to table t1 -drop cascades to table s1 -drop cascades to table dist_target -drop cascades to table dist_source -drop cascades to view show_tables -and 3 other objects (see server log for list) +and 7 other objects (see server log for list) diff --git a/src/test/regress/expected/merge_repartition1.out b/src/test/regress/expected/merge_repartition1.out index 279358e30..7ee2c659f 100644 --- a/src/test/regress/expected/merge_repartition1.out +++ b/src/test/regress/expected/merge_repartition1.out @@ -88,6 +88,14 @@ CREATE OR REPLACE FUNCTION compare_data() RETURNS VOID SET search_path TO merge_ SELECT check_data('pg_target', 'val', 'citus_target', 'val'); $$ LANGUAGE SQL; +CREATE OR REPLACE FUNCTION source_result(table_name text) +RETURNS TABLE (id int, val int, const int) AS +$$ +BEGIN + RETURN QUERY EXECUTE format('SELECT * FROM %I', table_name); +END; +$$ +LANGUAGE plpgsql; -- -- Target and source are distributed, and non-colocated -- @@ -1336,13 +1344,71 @@ SQL function "compare_data" statement 2 (1 row) +-- +-- Target is distributed, and source is a function returning rows +-- +SET client_min_messages TO WARNING; +SELECT cleanup_data(); + cleanup_data +--------------------------------------------------------------------- + +(1 row) + +RESET client_min_messages; +SELECT setup_data(); + setup_data +--------------------------------------------------------------------- + +(1 row) + +SELECT create_distributed_table('citus_target', 'id'); +NOTICE: Copying data from local table... +NOTICE: copying the data has completed +DETAIL: The local data in the table is no longer visible, but is still on disk. +HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$merge_repartition1_schema.citus_target$$) + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +MERGE INTO pg_target t +USING (SELECT * FROM source_result('pg_source')) s +ON t.id = s.id +WHEN MATCHED AND t.id <= 7500 THEN + UPDATE SET val = s.val + 1 +WHEN MATCHED THEN + DELETE +WHEN NOT MATCHED THEN + INSERT VALUES(s.id, s.val); +MERGE INTO citus_target t +USING (SELECT * FROM source_result('citus_source')) s +ON t.id = s.id +WHEN MATCHED AND t.id <= 7500 THEN + UPDATE SET val = s.val + 1 +WHEN MATCHED THEN + DELETE +WHEN NOT MATCHED THEN + INSERT VALUES(s.id, s.val); +SELECT compare_data(); +NOTICE: The average of pg_target.id is equal to citus_target.id +CONTEXT: PL/pgSQL function check_data(text,text,text,text) line XX at RAISE +SQL function "compare_data" statement 1 +NOTICE: The average of pg_target.val is equal to citus_target.val +CONTEXT: PL/pgSQL function check_data(text,text,text,text) line XX at RAISE +SQL function "compare_data" statement 2 + compare_data +--------------------------------------------------------------------- + +(1 row) + DROP SCHEMA merge_repartition1_schema CASCADE; -NOTICE: drop cascades to 8 other objects +NOTICE: drop cascades to 9 other objects DETAIL: drop cascades to table pg_target drop cascades to table pg_source drop cascades to function cleanup_data() drop cascades to function setup_data() drop cascades to function check_data(text,text,text,text) drop cascades to function compare_data() +drop cascades to function source_result(text) drop cascades to table citus_target drop cascades to table citus_source diff --git a/src/test/regress/expected/pg15.out b/src/test/regress/expected/pg15.out index 036f8371f..cb346ff57 100644 --- a/src/test/regress/expected/pg15.out +++ b/src/test/regress/expected/pg15.out @@ -406,7 +406,7 @@ SELECT create_distributed_table('tbl2', 'x'); MERGE INTO tbl1 USING tbl2 ON (true) WHEN MATCHED THEN DELETE; -ERROR: The required join operation is missing between the target's distribution column and any expression originating from the source. The issue may arise from either a non-equi-join or a mismatch in the datatypes of the columns being joined. +ERROR: The required join operation is missing between the target's distribution column and any expression originating from the source. The issue may arise from a non-equi-join. DETAIL: Without a equi-join condition on the target's distribution column, the source rows cannot be efficiently redistributed, and the NOT-MATCHED condition cannot be evaluated unambiguously. This can result in incorrect or unexpected results when attempting to merge tables in a distributed setting -- also, inside subqueries & ctes WITH targq AS ( @@ -414,7 +414,7 @@ WITH targq AS ( ) MERGE INTO tbl1 USING targq ON (true) WHEN MATCHED THEN DELETE; -ERROR: The required join operation is missing between the target's distribution column and any expression originating from the source. The issue may arise from either a non-equi-join or a mismatch in the datatypes of the columns being joined. +ERROR: The required join operation is missing between the target's distribution column and any expression originating from the source. The issue may arise from a non-equi-join. DETAIL: Without a equi-join condition on the target's distribution column, the source rows cannot be efficiently redistributed, and the NOT-MATCHED condition cannot be evaluated unambiguously. This can result in incorrect or unexpected results when attempting to merge tables in a distributed setting WITH foo AS ( MERGE INTO tbl1 USING tbl2 ON (true) @@ -431,7 +431,7 @@ USING tbl2 ON (true) WHEN MATCHED THEN DO NOTHING; -ERROR: The required join operation is missing between the target's distribution column and any expression originating from the source. The issue may arise from either a non-equi-join or a mismatch in the datatypes of the columns being joined. +ERROR: The required join operation is missing between the target's distribution column and any expression originating from the source. The issue may arise from a non-equi-join. DETAIL: Without a equi-join condition on the target's distribution column, the source rows cannot be efficiently redistributed, and the NOT-MATCHED condition cannot be evaluated unambiguously. This can result in incorrect or unexpected results when attempting to merge tables in a distributed setting MERGE INTO tbl1 t USING tbl2 diff --git a/src/test/regress/expected/pgmerge.out b/src/test/regress/expected/pgmerge.out index 895bf0680..318448808 100644 --- a/src/test/regress/expected/pgmerge.out +++ b/src/test/regress/expected/pgmerge.out @@ -911,7 +911,7 @@ MERGE INTO wq_target t USING wq_source s ON t.tid = s.sid WHEN MATCHED AND (merge_when_and_write()) THEN UPDATE SET balance = t.balance + s.balance; -ERROR: non-IMMUTABLE functions are not yet supported in MERGE sql with distributed tables +ERROR: functions used in the WHEN clause of MERGE queries on distributed tables must not be VOLATILE ROLLBACK; -- Test preventing ON condition from writing to the database BEGIN; @@ -919,7 +919,7 @@ MERGE INTO wq_target t USING wq_source s ON t.tid = s.sid AND (merge_when_and_write()) WHEN MATCHED THEN UPDATE SET balance = t.balance + s.balance; -ERROR: non-IMMUTABLE functions are not yet supported in MERGE sql with distributed tables +ERROR: functions used in the ON clause of MERGE queries on distributed tables must not be VOLATILE ROLLBACK; drop function merge_when_and_write(); DROP TABLE wq_target, wq_source; diff --git a/src/test/regress/sql/merge.sql b/src/test/regress/sql/merge.sql index 4fb911736..b7f8331cb 100644 --- a/src/test/regress/sql/merge.sql +++ b/src/test/regress/sql/merge.sql @@ -1951,10 +1951,84 @@ RESET client_min_messages; SELECT * FROM target_6785 ORDER BY 1; +-- Get the result from Postgres tables +CREATE TABLE dest_tab(tid int, val int, ser bigserial, tm timestamp); +CREATE TABLE src_tab(sid int, val int); + +SELECT setval('dest_tab_ser_seq', 1, true); +INSERT INTO src_tab SELECT i,i FROM generate_series(0,1000)i; + +CREATE OR REPLACE FUNCTION src_tab_result() +RETURNS TABLE (id INT, val INT, bs bigint, tm timestamp) AS +$$ +BEGIN + RETURN QUERY SELECT src_tab.*, nextval('dest_tab_ser_seq'::regclass), now()::timestamp FROM src_tab; +END; +$$ +LANGUAGE plpgsql; + +MERGE INTO dest_tab t +USING (SELECT * FROM src_tab_result()) s +ON (s.id = t.tid) + WHEN MATCHED THEN + UPDATE SET val = t.val + 1 + WHEN NOT MATCHED THEN + INSERT VALUES(s.id, s.val, s.bs, s.tm); + +-- Save the postgres result +CREATE TABLE postgres_serial AS SELECT MIN(ser), MAX(ser), AVG(ser) FROM dest_tab ; + +-- Cleanup the data and reset the sequence +TRUNCATE dest_tab; +TRUNCATE src_tab; +SELECT setval('dest_tab_ser_seq', 1, true); + +INSERT INTO src_tab SELECT i,i FROM generate_series(0,1000)i; + +-- Now, make them Citus tables +SELECT create_distributed_table('dest_tab', 'tid'); +SELECT create_distributed_table('src_tab', 'sid', colocate_with => 'dest_tab'); + +MERGE INTO dest_tab t +USING (SELECT * FROM src_tab_result()) s +ON (s.id = t.tid) + WHEN MATCHED THEN + UPDATE SET val = t.val + 1 + WHEN NOT MATCHED THEN + INSERT VALUES(s.id, s.val, s.bs, s.tm); + +-- Compare the sequence value +SELECT + ((SELECT MAX(max) FROM postgres_serial) = (SELECT MAX(ser) FROM dest_tab) + AND + (SELECT MIN(min) FROM postgres_serial) = (SELECT MIN(ser) FROM dest_tab) + AND + (SELECT AVG(avg) FROM postgres_serial) = (SELECT AVG(ser) FROM dest_tab)) AS all_equal; + +-- +-- Source without a table +-- +MERGE INTO dest_tab t +USING (VALUES (1, -1), (2, -1), (3, -1)) as s (sid, val) +ON t.tid = s.sid +WHEN MATCHED THEN + UPDATE SET val = s.val +WHEN NOT MATCHED THEN + DO NOTHING; + +-- Three rows with 'val = -1' +SELECT COUNT(*) FROM dest_tab where val = -1; + -- -- Error and Unsupported scenarios -- +-- Test sequences +MERGE INTO target_serial sda +USING (select id, z, nextval('target_serial_d_seq'::regclass) as ser from source_serial) sdn +ON sda.id = sdn.id +WHEN NOT matched THEN + INSERT (id, z, d) VALUES (id, z, ser); -- Test explain analyze with repartition EXPLAIN ANALYZE @@ -1966,15 +2040,6 @@ WHEN MATCHED THEN WHEN NOT MATCHED THEN INSERT VALUES(id2, s3); --- Source without a table -MERGE INTO target_cj t -USING (VALUES (1, 1), (2, 1), (3, 3)) as s (sid, val) -ON t.tid = s.sid AND t.tid = 2 -WHEN MATCHED THEN - UPDATE SET val = s.val -WHEN NOT MATCHED THEN - DO NOTHING; - -- Incomplete source MERGE INTO target_cj t USING (source_cj1 s1 INNER JOIN source_cj2 s2 ON sid1 = val2) s @@ -2298,8 +2363,8 @@ CREATE TABLE dist_source(id int, val varchar); SELECT create_distributed_table('dist_source', 'id', colocate_with => 'none'); MERGE INTO dist_target -USING (SELECT 100 id) AS source -ON dist_target.id = source.id AND dist_target.val = 'const' +USING (SELECT 100 id, 101 val) AS source +ON dist_target.id = source.val AND dist_target.val = 'const' WHEN MATCHED THEN UPDATE SET val = 'source' WHEN NOT MATCHED THEN @@ -2586,6 +2651,13 @@ ON (t1.a = t2.a AND (SELECT max(a) > 55 FROM cte_2)) WHEN MATCHED THEN DELETE; +-- Datatype mismatch between target and source join column +WITH src AS (SELECT FLOOR(val) AS a FROM src_tab) +MERGE INTO dest_tab t +USING src +ON t.tid = src.a +WHEN MATCHED THEN DELETE; + RESET client_min_messages; DROP SERVER foreign_server CASCADE; DROP FUNCTION merge_when_and_write(); diff --git a/src/test/regress/sql/merge_repartition1.sql b/src/test/regress/sql/merge_repartition1.sql index 858f4710c..33215ccf3 100644 --- a/src/test/regress/sql/merge_repartition1.sql +++ b/src/test/regress/sql/merge_repartition1.sql @@ -80,6 +80,15 @@ CREATE OR REPLACE FUNCTION compare_data() RETURNS VOID SET search_path TO merge_ $$ LANGUAGE SQL; +CREATE OR REPLACE FUNCTION source_result(table_name text) +RETURNS TABLE (id int, val int, const int) AS +$$ +BEGIN + RETURN QUERY EXECUTE format('SELECT * FROM %I', table_name); +END; +$$ +LANGUAGE plpgsql; + -- -- Target and source are distributed, and non-colocated -- @@ -577,4 +586,35 @@ WHEN NOT MATCHED THEN SELECT compare_data(); +-- +-- Target is distributed, and source is a function returning rows +-- +SET client_min_messages TO WARNING; +SELECT cleanup_data(); +RESET client_min_messages; +SELECT setup_data(); +SELECT create_distributed_table('citus_target', 'id'); + +MERGE INTO pg_target t +USING (SELECT * FROM source_result('pg_source')) s +ON t.id = s.id +WHEN MATCHED AND t.id <= 7500 THEN + UPDATE SET val = s.val + 1 +WHEN MATCHED THEN + DELETE +WHEN NOT MATCHED THEN + INSERT VALUES(s.id, s.val); + +MERGE INTO citus_target t +USING (SELECT * FROM source_result('citus_source')) s +ON t.id = s.id +WHEN MATCHED AND t.id <= 7500 THEN + UPDATE SET val = s.val + 1 +WHEN MATCHED THEN + DELETE +WHEN NOT MATCHED THEN + INSERT VALUES(s.id, s.val); + +SELECT compare_data(); + DROP SCHEMA merge_repartition1_schema CASCADE;