diff --git a/src/backend/distributed/commands/multi_copy.c b/src/backend/distributed/commands/multi_copy.c index e79223816..e8dadb35d 100644 --- a/src/backend/distributed/commands/multi_copy.c +++ b/src/backend/distributed/commands/multi_copy.c @@ -346,9 +346,7 @@ static LocalCopyStatus GetLocalCopyStatus(void); static bool ShardIntervalListHasLocalPlacements(List *shardIntervalList); static void LogLocalCopyToRelationExecution(uint64 shardId); static void LogLocalCopyToFileExecution(uint64 shardId); -#if PG_VERSION_NUM >= PG_VERSION_15 static void ErrorIfMergeInCopy(CopyStmt *copyStatement); -#endif /* exports for SQL callable functions */ @@ -2831,23 +2829,22 @@ CopyStatementHasFormat(CopyStmt *copyStatement, char *formatName) } -#if PG_VERSION_NUM >= PG_VERSION_15 - /* * ErrorIfMergeInCopy Raises an exception if the MERGE is called in the COPY. */ static void ErrorIfMergeInCopy(CopyStmt *copyStatement) { +#if PG_VERSION_NUM < 150000 + return; +#else if (!copyStatement->relation && (IsA(copyStatement->query, MergeStmt))) { ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("MERGE not supported in COPY"))); } -} - - #endif +} /* @@ -2860,9 +2857,7 @@ Node * ProcessCopyStmt(CopyStmt *copyStatement, QueryCompletion *completionTag, const char *queryString) { - #if PG_VERSION_NUM >= PG_VERSION_15 ErrorIfMergeInCopy(copyStatement); - #endif /* * Handle special COPY "resultid" FROM STDIN WITH (format result) commands diff --git a/src/backend/distributed/planner/merge_planner.c b/src/backend/distributed/planner/merge_planner.c index 730c53d39..cf3f13eab 100644 --- a/src/backend/distributed/planner/merge_planner.c +++ b/src/backend/distributed/planner/merge_planner.c @@ -98,6 +98,7 @@ static DistributedPlan * CreateNonPushableMergePlan(Oid targetRelationId, uint64 ParamListInfo boundParams); static char * MergeCommandResultIdPrefix(uint64 planId); static void ErrorIfMergeHasReturningList(Query *query); +static Node * GetMergeJoinCondition(Query *mergeQuery); #endif @@ -158,33 +159,49 @@ CreateMergePlan(uint64 planId, Query *originalQuery, Query *query, /* - * GetMergeJoinConditionList returns all the Join conditions from the ON clause + * GetMergeJoinTree constructs and returns the jointree for a MERGE query. */ -List * -GetMergeJoinConditionList(Query *mergeQuery) +FromExpr * +GetMergeJoinTree(Query *mergeQuery) { + FromExpr *mergeJointree = NULL; #if PG_VERSION_NUM >= PG_VERSION_17 - List *mergeJoinConditionList = NIL; - if (IsA(mergeQuery->mergeJoinCondition, List)) - { - mergeJoinConditionList = (List *) mergeQuery->mergeJoinCondition; - } - else - { - Node *joinClause = - eval_const_expressions(NULL, mergeQuery->mergeJoinCondition); - joinClause = (Node *) canonicalize_qual((Expr *) joinClause, false); - mergeJoinConditionList = make_ands_implicit((Expr *) joinClause); - } + + /* + * In Postgres 17, the query tree has a specific field for the merge condition. + * For deriving the WhereClauseList from the merge condition, we construct a dummy + * jointree with an empty fromlist. This works because the fromlist of a merge query + * join tree consists of range table references only, and range table references are + * disregarded by the WhereClauseList() walker. + */ + mergeJointree = makeFromExpr(NIL, mergeQuery->mergeJoinCondition); #else - List *mergeJoinConditionList = WhereClauseList(mergeQuery->jointree); + mergeJointree = mergeQuery->jointree; #endif - return mergeJoinConditionList; + + return mergeJointree; } #if PG_VERSION_NUM >= PG_VERSION_15 + +/* + * GetMergeJoinCondition returns the quals of the ON condition + */ +static Node * +GetMergeJoinCondition(Query *mergeQuery) +{ + Node *joinCondition = NULL; +#if PG_VERSION_NUM >= PG_VERSION_17 + joinCondition = (Node *) mergeQuery->mergeJoinCondition; +#else + joinCondition = (Node *) mergeQuery->jointree->quals; +#endif + return joinCondition; +} + + /* * CreateRouterMergePlan attempts to create a pushable plan for the given MERGE * SQL statement. If the planning fails, the ->planningError is set to a description @@ -589,6 +606,7 @@ MergeQualAndTargetListFunctionsSupported(Oid resultRelationId, Query *query, List *targetList, CmdType commandType) { uint32 targetRangeTableIndex = query->resultRelation; + FromExpr *joinTree = GetMergeJoinTree(query); Var *distributionColumn = NULL; if (IsCitusTable(resultRelationId) && HasDistributionKey(resultRelationId)) { @@ -626,8 +644,7 @@ MergeQualAndTargetListFunctionsSupported(Oid resultRelationId, Query *query, } if (targetEntryDistributionColumn && - TargetEntryChangesValue(targetEntry, distributionColumn, (Node *) query, - CMD_MERGE)) + TargetEntryChangesValue(targetEntry, distributionColumn, joinTree)) { return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, "updating the distribution column is not " @@ -749,11 +766,7 @@ ErrorIfRepartitionMergeNotSupported(Oid targetRelationId, Query *mergeQuery, /* * Sub-queries and CTEs are not allowed in actions and ON clause */ -#if PG_VERSION_NUM >= PG_VERSION_17 - Node *joinCondition = (Node *) mergeQuery->mergeJoinCondition; -#else - Node *joinCondition = (Node *) mergeQuery->jointree->quals; -#endif + Node *joinCondition = GetMergeJoinCondition(mergeQuery); if (FindNodeMatchingCheckFunction(joinCondition, IsNodeSubquery)) { @@ -1255,11 +1268,7 @@ ErrorIfMergeQueryQualAndTargetListNotSupported(Oid targetRelationId, Query *orig "supported in MERGE sql with distributed tables"))); } -#if PG_VERSION_NUM >= PG_VERSION_17 - Node *joinCondition = (Node *) originalQuery->mergeJoinCondition; -#else - Node *joinCondition = (Node *) originalQuery->jointree->quals; -#endif + Node *joinCondition = GetMergeJoinCondition(originalQuery); DeferredErrorMessage *deferredError = MergeQualAndTargetListFunctionsSupported( @@ -1341,7 +1350,7 @@ static int SourceResultPartitionColumnIndex(Query *mergeQuery, List *sourceTargetList, CitusTableCacheEntry *targetRelation) { - List *mergeJoinConditionList = GetMergeJoinConditionList(mergeQuery); + List *mergeJoinConditionList = WhereClauseList(GetMergeJoinTree(mergeQuery)); Var *targetColumn = targetRelation->partitionColumn; Var *sourceRepartitionVar = NULL; bool foundTypeMismatch = false; diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c index ee88bb551..72d1dddd1 100644 --- a/src/backend/distributed/planner/multi_router_planner.c +++ b/src/backend/distributed/planner/multi_router_planner.c @@ -597,9 +597,8 @@ TargetlistAndFunctionsSupported(Oid resultRelationId, FromExpr *joinTree, Node * NULL, NULL); } - if (targetEntryPartitionColumn && - TargetEntryChangesValue(targetEntry, partitionColumn, - (Node *) joinTree, commandType)) + if (commandType == CMD_UPDATE && targetEntryPartitionColumn && + TargetEntryChangesValue(targetEntry, partitionColumn, joinTree)) { return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, "modifying the partition value of rows is not " @@ -1611,15 +1610,8 @@ MasterIrreducibleExpressionFunctionChecker(Oid func_id, void *context) * tree, or the target entry sets a different column. */ bool -TargetEntryChangesValue(TargetEntry *targetEntry, Var *column, Node *node, CmdType - commandType) +TargetEntryChangesValue(TargetEntry *targetEntry, Var *column, FromExpr *joinTree) { - if (commandType != CMD_UPDATE && commandType != CMD_MERGE) - { - /* no-op */ - return false; - } - bool isColumnValueChanged = true; Expr *setExpr = targetEntry->expr; @@ -1635,6 +1627,7 @@ TargetEntryChangesValue(TargetEntry *targetEntry, Var *column, Node *node, CmdTy else if (IsA(setExpr, Const)) { Const *newValue = (Const *) setExpr; + List *restrictClauseList = WhereClauseList(joinTree); OpExpr *equalityExpr = MakeOpExpression(column, BTEqualStrategyNumber); Node *rightOp = get_rightop((Expr *) equalityExpr); @@ -1646,16 +1639,6 @@ TargetEntryChangesValue(TargetEntry *targetEntry, Var *column, Node *node, CmdTy rightConst->constisnull = newValue->constisnull; rightConst->constbyval = newValue->constbyval; - List *restrictClauseList = NIL; - if (commandType == CMD_UPDATE) - { - restrictClauseList = WhereClauseList((FromExpr *) node); - } - else - { - restrictClauseList = GetMergeJoinConditionList((Query *) node); - } - bool predicateIsImplied = predicate_implied_by(list_make1(equalityExpr), restrictClauseList, false); if (predicateIsImplied) diff --git a/src/include/distributed/merge_planner.h b/src/include/distributed/merge_planner.h index a1470caa7..53d451ea6 100644 --- a/src/include/distributed/merge_planner.h +++ b/src/include/distributed/merge_planner.h @@ -32,7 +32,7 @@ extern void NonPushableMergeCommandExplainScan(CustomScanState *node, List *ance struct ExplainState *es); extern Var * FetchAndValidateInsertVarIfExists(Oid targetRelationId, Query *query); extern RangeTblEntry * ExtractMergeSourceRangeTableEntry(Query *query, bool joinSourceOk); -extern List * GetMergeJoinConditionList(Query *mergeQuery); +extern FromExpr * GetMergeJoinTree(Query *mergeQuery); #endif /* MERGE_PLANNER_H */ diff --git a/src/include/distributed/multi_router_planner.h b/src/include/distributed/multi_router_planner.h index b80b10530..ae75ee631 100644 --- a/src/include/distributed/multi_router_planner.h +++ b/src/include/distributed/multi_router_planner.h @@ -111,7 +111,7 @@ extern DeferredErrorMessage * TargetlistAndFunctionsSupported(Oid resultRelation List *returningList); extern bool NodeIsFieldStore(Node *node); extern bool TargetEntryChangesValue(TargetEntry *targetEntry, Var *column, - Node *node, CmdType commandType); + FromExpr *joinTree); extern bool MasterIrreducibleExpression(Node *expression, bool *varArgument, bool *badCoalesce); extern bool HasDangerousJoinUsing(List *rtableList, Node *jtnode); diff --git a/src/test/regress/expected/merge_unsupported_0.out b/src/test/regress/expected/merge_unsupported_0.out index a7e3fbf20..8b26f4ae9 100644 --- a/src/test/regress/expected/merge_unsupported_0.out +++ b/src/test/regress/expected/merge_unsupported_0.out @@ -4,3 +4,102 @@ SELECT substring(:'server_version', '\d+')::int >= 15 AS server_version_ge_15 \if :server_version_ge_15 \else \q +\endif +-- +-- MERGE test from PG community (adapted to Citus by converting all tables to Citus local) +-- +DROP SCHEMA IF EXISTS pgmerge_schema CASCADE; +NOTICE: schema "pgmerge_schema" does not exist, skipping +CREATE SCHEMA pgmerge_schema; +SET search_path TO pgmerge_schema; +SET citus.use_citus_managed_tables to true; +DROP TABLE IF EXISTS target; +NOTICE: table "target" does not exist, skipping +DROP TABLE IF EXISTS source; +NOTICE: table "source" does not exist, skipping +CREATE TABLE target (tid integer, balance integer) + WITH (autovacuum_enabled=off); +CREATE TABLE source (sid integer, delta integer) -- no index + WITH (autovacuum_enabled=off); +SELECT citus_add_local_table_to_metadata('target'); + citus_add_local_table_to_metadata +--------------------------------------------------------------------- + +(1 row) + +SELECT citus_add_local_table_to_metadata('source'); + citus_add_local_table_to_metadata +--------------------------------------------------------------------- + +(1 row) + +\set SHOW_CONTEXT errors +-- used in a CTE +WITH foo AS ( + MERGE INTO target USING source ON (true) + WHEN MATCHED THEN DELETE +) SELECT * FROM foo; +ERROR: MERGE not supported in WITH query +-- used in COPY +COPY ( + MERGE INTO target USING source ON (true) + WHEN MATCHED THEN DELETE +) TO stdout; +ERROR: MERGE not supported in COPY +-- used in a CTE with RETURNING +WITH foo AS ( + MERGE INTO target USING source ON (true) + WHEN MATCHED THEN DELETE RETURNING target.* +) SELECT * FROM foo; +ERROR: syntax error at or near "RETURNING" +-- used in COPY with RETURNING +COPY ( + MERGE INTO target USING source ON (true) + WHEN MATCHED THEN DELETE RETURNING target.* +) TO stdout; +ERROR: syntax error at or near "RETURNING" +-- unsupported relation types +-- view +CREATE VIEW tv AS SELECT count(tid) AS tid FROM target; +MERGE INTO tv t +USING source s +ON t.tid = s.sid +WHEN NOT MATCHED THEN + INSERT DEFAULT VALUES; +ERROR: cannot execute MERGE on relation "tv" +DETAIL: This operation is not supported for views. +DROP VIEW tv; +CREATE TABLE sq_target (tid integer NOT NULL, balance integer) + WITH (autovacuum_enabled=off); +CREATE TABLE sq_source (delta integer, sid integer, balance integer DEFAULT 0) + WITH (autovacuum_enabled=off); +SELECT citus_add_local_table_to_metadata('sq_target'); + citus_add_local_table_to_metadata +--------------------------------------------------------------------- + +(1 row) + +SELECT citus_add_local_table_to_metadata('sq_source'); + citus_add_local_table_to_metadata +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO sq_target(tid, balance) VALUES (1,100), (2,200), (3,300); +INSERT INTO sq_source(sid, delta) VALUES (1,10), (2,20), (4,40); +CREATE VIEW v AS SELECT * FROM sq_source WHERE sid < 2; +-- RETURNING +BEGIN; +INSERT INTO sq_source (sid, balance, delta) VALUES (-1, -1, -10); +MERGE INTO sq_target t +USING v +ON tid = sid +WHEN MATCHED AND tid > 2 THEN + UPDATE SET balance = t.balance + delta +WHEN NOT MATCHED THEN + INSERT (balance, tid) VALUES (balance + delta, sid) +WHEN MATCHED AND tid < 2 THEN + DELETE +RETURNING *; +ERROR: syntax error at or near "RETURNING" +ROLLBACK; diff --git a/src/test/regress/expected/merge_unsupported_1.out b/src/test/regress/expected/merge_unsupported_1.out new file mode 100644 index 000000000..a7e3fbf20 --- /dev/null +++ b/src/test/regress/expected/merge_unsupported_1.out @@ -0,0 +1,6 @@ +SHOW server_version \gset +SELECT substring(:'server_version', '\d+')::int >= 15 AS server_version_ge_15 +\gset +\if :server_version_ge_15 +\else +\q diff --git a/src/test/regress/expected/pg15.out b/src/test/regress/expected/pg15.out index c06142671..9f571e40a 100644 --- a/src/test/regress/expected/pg15.out +++ b/src/test/regress/expected/pg15.out @@ -437,11 +437,6 @@ MERGE INTO tbl1 USING targq ON (true) WHEN MATCHED THEN DELETE; ERROR: The required join operation is missing between the target's distribution column and any expression originating from the source. The issue may arise from a non-equi-join. DETAIL: Without a equi-join condition on the target's distribution column, the source rows cannot be efficiently redistributed, and the NOT-MATCHED condition cannot be evaluated unambiguously. This can result in incorrect or unexpected results when attempting to merge tables in a distributed setting -WITH foo AS ( - MERGE INTO tbl1 USING tbl2 ON (true) - WHEN MATCHED THEN DELETE -) SELECT * FROM foo; -ERROR: MERGE not supported in WITH query COPY ( MERGE INTO tbl1 USING tbl2 ON (true) WHEN MATCHED THEN DELETE diff --git a/src/test/regress/sql/pg15.sql b/src/test/regress/sql/pg15.sql index 0d53bc9dd..25ada1cc2 100644 --- a/src/test/regress/sql/pg15.sql +++ b/src/test/regress/sql/pg15.sql @@ -287,11 +287,6 @@ WITH targq AS ( MERGE INTO tbl1 USING targq ON (true) WHEN MATCHED THEN DELETE; -WITH foo AS ( - MERGE INTO tbl1 USING tbl2 ON (true) - WHEN MATCHED THEN DELETE -) SELECT * FROM foo; - COPY ( MERGE INTO tbl1 USING tbl2 ON (true) WHEN MATCHED THEN DELETE