From a482b36760d6ca60ec8ed33b3d82dfa7d532c9af Mon Sep 17 00:00:00 2001 From: Marco Slot Date: Mon, 30 Jan 2023 17:01:59 +0300 Subject: [PATCH] Revert "Support MERGE on distributed tables with restrictions" (#6675) Co-authored-by: Marco Slot --- .../distributed/deparser/ruleutils_15.c | 13 +- .../distributed/planner/distributed_planner.c | 194 ++++- .../planner/fast_path_router_planner.c | 4 +- .../planner/multi_physical_planner.c | 73 +- .../planner/multi_router_planner.c | 698 ++++-------------- .../relation_restriction_equivalence.c | 31 +- src/include/distributed/distributed_planner.h | 4 - .../distributed/multi_router_planner.h | 1 - .../relation_restriction_equivalence.h | 11 - src/test/regress/expected/merge.out | 690 +---------------- .../regress/expected/multi_modifications.out | 2 +- .../expected/multi_mx_modifications.out | 2 +- .../expected/multi_shard_update_delete.out | 2 +- src/test/regress/expected/pg15.out | 10 +- src/test/regress/expected/pgmerge.out | 10 +- src/test/regress/sql/merge.sql | 350 +-------- src/test/regress/sql/pgmerge.sql | 8 - 17 files changed, 384 insertions(+), 1719 deletions(-) diff --git a/src/backend/distributed/deparser/ruleutils_15.c b/src/backend/distributed/deparser/ruleutils_15.c index 139b2a3fd..6dabacd49 100644 --- a/src/backend/distributed/deparser/ruleutils_15.c +++ b/src/backend/distributed/deparser/ruleutils_15.c @@ -53,7 +53,6 @@ #include "common/keywords.h" #include "distributed/citus_nodefuncs.h" #include "distributed/citus_ruleutils.h" -#include "distributed/multi_router_planner.h" #include "executor/spi.h" #include "foreign/foreign.h" #include "funcapi.h" @@ -3724,6 +3723,7 @@ static void get_merge_query_def(Query *query, deparse_context *context) { StringInfo buf = context->buf; + RangeTblEntry *targetRte; /* Insert the WITH clause if given */ get_with_clause(query, context); @@ -3731,7 +3731,7 @@ get_merge_query_def(Query *query, deparse_context *context) /* * Start the query with MERGE INTO */ - RangeTblEntry *targetRte = ExtractResultRelationRTE(query); + targetRte = rt_fetch(query->resultRelation, query->rtable); if (PRETTY_INDENT(context)) { @@ -3853,15 +3853,6 @@ get_merge_query_def(Query *query, deparse_context *context) } } - /* - * RETURNING is not supported in MERGE, so it must be NULL, but if PG adds it later - * we might miss it, let's raise an exception to investigate. - */ - if (unlikely(query->returningList)) - { - elog(ERROR, "Unexpected RETURNING clause in MERGE"); - } - ereport(DEBUG1, (errmsg("", buf->data))); } diff --git a/src/backend/distributed/planner/distributed_planner.c b/src/backend/distributed/planner/distributed_planner.c index 262258d7f..701ae4ff5 100644 --- a/src/backend/distributed/planner/distributed_planner.c +++ b/src/backend/distributed/planner/distributed_planner.c @@ -75,6 +75,9 @@ static uint64 NextPlanId = 1; /* keep track of planner call stack levels */ int PlannerLevel = 0; +static void ErrorIfQueryHasUnsupportedMergeCommand(Query *queryTree, + List *rangeTableList); +static bool ContainsMergeCommandWalker(Node *node); static bool ListContainsDistributedTableRTE(List *rangeTableList, bool *maybeHasForeignDistributedTable); static bool IsUpdateOrDelete(Query *query); @@ -129,7 +132,7 @@ static PlannedStmt * PlanDistributedStmt(DistributedPlanningContext *planContext static RTEListProperties * GetRTEListProperties(List *rangeTableList); static List * TranslatedVars(PlannerInfo *root, int relationIndex); static void WarnIfListHasForeignDistributedTable(List *rangeTableList); - +static void ErrorIfMergeHasUnsupportedTables(Query *parse, List *rangeTableList); /* Distributed planner hook */ PlannedStmt * @@ -197,6 +200,12 @@ distributed_planner(Query *parse, if (!fastPathRouterQuery) { + /* + * Fast path queries cannot have merge command, and we + * prevent the remaining here. + */ + ErrorIfQueryHasUnsupportedMergeCommand(parse, rangeTableList); + /* * When there are partitioned tables (not applicable to fast path), * pretend that they are regular tables to avoid unnecessary work @@ -295,11 +304,44 @@ distributed_planner(Query *parse, } +/* + * ErrorIfQueryHasUnsupportedMergeCommand walks over the query tree and bails out + * if there is no Merge command (e.g., CMD_MERGE) in the query tree. For merge, + * looks for all supported combinations, throws an exception if any violations + * are seen. + */ +static void +ErrorIfQueryHasUnsupportedMergeCommand(Query *queryTree, List *rangeTableList) +{ + /* + * Postgres currently doesn't support Merge queries inside subqueries and + * ctes, but lets be defensive and do query tree walk anyway. + * + * We do not call this path for fast-path queries to avoid this additional + * overhead. + */ + if (!ContainsMergeCommandWalker((Node *) queryTree)) + { + /* No MERGE found */ + return; + } + + + /* + * In Citus we have limited support for MERGE, it's allowed + * only if all the tables(target, source or any CTE) tables + * are are local i.e. a combination of Citus local and Non-Citus + * tables (regular Postgres tables). + */ + ErrorIfMergeHasUnsupportedTables(queryTree, rangeTableList); +} + + /* * ContainsMergeCommandWalker walks over the node and finds if there are any * Merge command (e.g., CMD_MERGE) in the node. */ -bool +static bool ContainsMergeCommandWalker(Node *node) { #if PG_VERSION_NUM < PG_VERSION_15 @@ -634,8 +676,7 @@ bool IsUpdateOrDelete(Query *query) { return query->commandType == CMD_UPDATE || - query->commandType == CMD_DELETE || - query->commandType == CMD_MERGE; + query->commandType == CMD_DELETE; } @@ -2570,3 +2611,148 @@ WarnIfListHasForeignDistributedTable(List *rangeTableList) } } } + + +/* + * IsMergeAllowedOnRelation takes a relation entry and checks if MERGE command is + * permitted on special relations, such as materialized view, returns true only if + * it's a "source" relation. + */ +bool +IsMergeAllowedOnRelation(Query *parse, RangeTblEntry *rte) +{ + if (!IsMergeQuery(parse)) + { + return false; + } + + RangeTblEntry *targetRte = rt_fetch(parse->resultRelation, parse->rtable); + + /* Is it a target relation? */ + if (targetRte->relid == rte->relid) + { + return false; + } + + return true; +} + + +/* + * ErrorIfMergeHasUnsupportedTables checks if all the tables(target, source or any CTE + * present) in the MERGE command are local i.e. a combination of Citus local and Non-Citus + * tables (regular Postgres tables), raises an exception for all other combinations. + */ +static void +ErrorIfMergeHasUnsupportedTables(Query *parse, List *rangeTableList) +{ + ListCell *tableCell = NULL; + + foreach(tableCell, rangeTableList) + { + RangeTblEntry *rangeTableEntry = (RangeTblEntry *) lfirst(tableCell); + Oid relationId = rangeTableEntry->relid; + + switch (rangeTableEntry->rtekind) + { + case RTE_RELATION: + { + /* Check the relation type */ + break; + } + + case RTE_SUBQUERY: + case RTE_FUNCTION: + case RTE_TABLEFUNC: + case RTE_VALUES: + case RTE_JOIN: + case RTE_CTE: + { + /* Skip them as base table(s) will be checked */ + continue; + } + + /* + * RTE_NAMEDTUPLESTORE is typically used in ephmeral named relations, + * such as, trigger data; until we find a genuine use case, raise an + * exception. + * RTE_RESULT is a node added by the planner and we shouldn't + * encounter it in the parse tree. + */ + case RTE_NAMEDTUPLESTORE: + case RTE_RESULT: + { + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("MERGE command is not supported with " + "Tuplestores and results"))); + break; + } + + default: + { + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("MERGE command: Unrecognized range table entry."))); + } + } + + /* RTE Relation can be of various types, check them now */ + + /* skip the regular views as they are replaced with subqueries */ + if (rangeTableEntry->relkind == RELKIND_VIEW) + { + continue; + } + + if (rangeTableEntry->relkind == RELKIND_MATVIEW || + rangeTableEntry->relkind == RELKIND_FOREIGN_TABLE) + { + /* Materialized view or Foreign table as target is not allowed */ + if (IsMergeAllowedOnRelation(parse, rangeTableEntry)) + { + /* Non target relation is ok */ + continue; + } + else + { + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("MERGE command is not allowed " + "on materialized view"))); + } + } + + if (rangeTableEntry->relkind != RELKIND_RELATION && + rangeTableEntry->relkind != RELKIND_PARTITIONED_TABLE) + { + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("Unexpected relation type(relkind:%c) in MERGE command", + rangeTableEntry->relkind))); + } + + Assert(rangeTableEntry->relid != 0); + + /* Distributed tables and Reference tables are not supported yet */ + if (IsCitusTableType(relationId, REFERENCE_TABLE) || + IsCitusTableType(relationId, DISTRIBUTED_TABLE)) + { + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("MERGE command is not supported on " + "distributed/reference tables yet"))); + } + + /* Regular Postgres tables and Citus local tables are allowed */ + if (!IsCitusTable(relationId) || + IsCitusTableType(relationId, CITUS_LOCAL_TABLE)) + { + continue; + } + + + /* Any other Citus table type missing ? */ + } + + /* All the tables are local, supported */ +} diff --git a/src/backend/distributed/planner/fast_path_router_planner.c b/src/backend/distributed/planner/fast_path_router_planner.c index f585e2494..b947c036f 100644 --- a/src/backend/distributed/planner/fast_path_router_planner.c +++ b/src/backend/distributed/planner/fast_path_router_planner.c @@ -54,6 +54,8 @@ bool EnableFastPathRouterPlanner = true; static bool ColumnAppearsMultipleTimes(Node *quals, Var *distributionKey); +static bool ConjunctionContainsColumnFilter(Node *node, Var *column, + Node **distributionKeyValue); static bool DistKeyInSimpleOpExpression(Expr *clause, Var *distColumn, Node **distributionKeyValue); @@ -292,7 +294,7 @@ ColumnAppearsMultipleTimes(Node *quals, Var *distributionKey) * * If the conjuction contains column filter which is const, distributionKeyValue is set. */ -bool +static bool ConjunctionContainsColumnFilter(Node *node, Var *column, Node **distributionKeyValue) { if (node == NULL) diff --git a/src/backend/distributed/planner/multi_physical_planner.c b/src/backend/distributed/planner/multi_physical_planner.c index 6e237b546..a2590d48d 100644 --- a/src/backend/distributed/planner/multi_physical_planner.c +++ b/src/backend/distributed/planner/multi_physical_planner.c @@ -164,7 +164,6 @@ static uint32 HashPartitionCount(void); static Job * BuildJobTreeTaskList(Job *jobTree, PlannerRestrictionContext *plannerRestrictionContext); static bool IsInnerTableOfOuterJoin(RelationRestriction *relationRestriction); -static bool IsOuterTableOfOuterJoin(RelationRestriction *relationRestriction); static void ErrorIfUnsupportedShardDistribution(Query *query); static Task * QueryPushdownTaskCreate(Query *originalQuery, int shardIndex, RelationRestrictionContext *restrictionContext, @@ -2226,34 +2225,19 @@ QueryPushdownSqlTaskList(Query *query, uint64 jobId, } /* - * Skip adding shards of non-target (outer)relations. - * Note: This is a stop-gap arrangement for phase-I where in sql - * generates a single task on the shard identified by constant - * qual(filter) on the target relation. + * For left joins we don't care about the shards pruned for the right hand side. + * If the right hand side would prune to a smaller set we should still send it to + * all tables of the left hand side. However if the right hand side is bigger than + * the left hand side we don't have to send the query to any shard that is not + * matching anything on the left hand side. + * + * Instead we will simply skip any RelationRestriction if it is an OUTER join and + * the table is part of the non-outer side of the join. */ - if (IsMergeQuery(query) && - IsOuterTableOfOuterJoin(relationRestriction)) + if (IsInnerTableOfOuterJoin(relationRestriction)) { continue; } - else if (!IsMergeQuery(query) && - IsInnerTableOfOuterJoin(relationRestriction)) - { - /* - * For left joins we don't care about the shards pruned for - * the right hand side. If the right hand side would prune - * to a smaller set we should still send it to all tables - * of the left hand side. However if the right hand side is - * bigger than the left hand side we don't have to send the - * query to any shard that is not matching anything on the - * left hand side. - * - * Instead we will simply skip any RelationRestriction if it - * is an OUTER join and the table is part of the non-outer - * side of the join. - */ - continue; - } ShardInterval *shardInterval = NULL; foreach_ptr(shardInterval, prunedShardList) @@ -2318,45 +2302,6 @@ QueryPushdownSqlTaskList(Query *query, uint64 jobId, } -/* - * IsOuterTableOfOuterJoin tests based on the join information envoded in a - * RelationRestriction if the table accessed for this relation is - * a) in an outer join - * b) on the outer part of said join - * - * The function returns true only if both conditions above hold true - */ -static bool -IsOuterTableOfOuterJoin(RelationRestriction *relationRestriction) -{ - RestrictInfo *joinInfo = NULL; - foreach_ptr(joinInfo, relationRestriction->relOptInfo->joininfo) - { - if (joinInfo->outer_relids == NULL) - { - /* not an outer join */ - continue; - } - - /* - * This join restriction info describes an outer join, we need to figure out if - * our table is in the outer part of this join. If that is the case this is a - * outer table of an outer join. - */ - bool isInOuter = bms_is_member(relationRestriction->relOptInfo->relid, - joinInfo->outer_relids); - if (isInOuter) - { - /* this table is joined in the outer part of an outer join */ - return true; - } - } - - /* we have not found any join clause that satisfies both requirements */ - return false; -} - - /* * IsInnerTableOfOuterJoin tests based on the join information envoded in a * RelationRestriction if the table accessed for this relation is diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c index 16cf7926b..631322e80 100644 --- a/src/backend/distributed/planner/multi_router_planner.c +++ b/src/backend/distributed/planner/multi_router_planner.c @@ -121,6 +121,7 @@ static void CreateSingleTaskRouterSelectPlan(DistributedPlan *distributedPlan, Query *query, PlannerRestrictionContext * plannerRestrictionContext); +static Oid ResultRelationOidForQuery(Query *query); static bool IsTidColumn(Node *node); static DeferredErrorMessage * ModifyPartialQuerySupported(Query *queryTree, bool multiShardQuery, @@ -179,24 +180,6 @@ static void ReorderTaskPlacementsByTaskAssignmentPolicy(Job *job, static bool ModifiesLocalTableWithRemoteCitusLocalTable(List *rangeTableList); static DeferredErrorMessage * DeferErrorIfUnsupportedLocalTableJoin(List *rangeTableList); static bool IsLocallyAccessibleCitusLocalTable(Oid relationId); -static bool QueryHasMergeCommand(Query *queryTree); -static DeferredErrorMessage * MergeQuerySupported(Query *originalQuery, - PlannerRestrictionContext * - plannerRestrictionContext); -static DeferredErrorMessage * ErrorIfMergeHasUnsupportedTables(Query *parse, - List *rangeTableList, - PlannerRestrictionContext * - restrictionContext); -static DeferredErrorMessage * ErrorIfDistTablesNotColocated(Query *parse, - List *distTablesList, - PlannerRestrictionContext * - plannerRestrictionContext); -static DeferredErrorMessage * TargetlistAndFunctionsSupported(Oid resultRelationId, - FromExpr *joinTree, - Node *quals, - List *targetList, - CmdType commandType, - List *returningList); /* @@ -462,7 +445,7 @@ ModifyQueryResultRelationId(Query *query) * ResultRelationOidForQuery returns the OID of the relation this is modified * by a given query. */ -Oid +static Oid ResultRelationOidForQuery(Query *query) { RangeTblEntry *resultRTE = rt_fetch(query->resultRelation, query->rtable); @@ -529,161 +512,6 @@ IsTidColumn(Node *node) } -/* - * TargetlistAndFunctionsSupported implements a subset of what ModifyPartialQuerySupported - * checks, that subset being checking what functions are allowed, if we are - * updating distribution column, etc. - * Note: This subset of checks are repeated for each MERGE modify action. - */ -static DeferredErrorMessage * -TargetlistAndFunctionsSupported(Oid resultRelationId, FromExpr *joinTree, Node *quals, - List *targetList, - CmdType commandType, List *returningList) -{ - uint32 rangeTableId = 1; - Var *partitionColumn = NULL; - - if (IsCitusTable(resultRelationId)) - { - partitionColumn = PartitionColumn(resultRelationId, rangeTableId); - } - - bool hasVarArgument = false; /* A STABLE function is passed a Var argument */ - bool hasBadCoalesce = false; /* CASE/COALESCE passed a mutable function */ - ListCell *targetEntryCell = NULL; - - foreach(targetEntryCell, targetList) - { - TargetEntry *targetEntry = (TargetEntry *) lfirst(targetEntryCell); - - /* skip resjunk entries: UPDATE adds some for ctid, etc. */ - if (targetEntry->resjunk) - { - continue; - } - - bool targetEntryPartitionColumn = false; - AttrNumber targetColumnAttrNumber = InvalidAttrNumber; - - /* reference tables do not have partition column */ - if (partitionColumn == NULL) - { - targetEntryPartitionColumn = false; - } - else - { - if (commandType == CMD_UPDATE) - { - /* - * Note that it is not possible to give an alias to - * UPDATE table SET ... - */ - if (targetEntry->resname) - { - targetColumnAttrNumber = get_attnum(resultRelationId, - targetEntry->resname); - if (targetColumnAttrNumber == partitionColumn->varattno) - { - targetEntryPartitionColumn = true; - } - } - } - } - - - if (commandType == CMD_UPDATE && - FindNodeMatchingCheckFunction((Node *) targetEntry->expr, - CitusIsVolatileFunction)) - { - return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, - "functions used in UPDATE queries on distributed " - "tables must not be VOLATILE", - NULL, NULL); - } - - if (commandType == CMD_UPDATE && targetEntryPartitionColumn && - TargetEntryChangesValue(targetEntry, partitionColumn, - joinTree)) - { - return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, - "modifying the partition value of rows is not " - "allowed", - NULL, NULL); - } - - if (commandType == CMD_UPDATE && - MasterIrreducibleExpression((Node *) targetEntry->expr, - &hasVarArgument, &hasBadCoalesce)) - { - Assert(hasVarArgument || hasBadCoalesce); - } - - if (FindNodeMatchingCheckFunction((Node *) targetEntry->expr, - NodeIsFieldStore)) - { - /* DELETE cannot do field indirection already */ - Assert(commandType == CMD_UPDATE || commandType == CMD_INSERT); - return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, - "inserting or modifying composite type fields is not " - "supported", NULL, - "Use the column name to insert or update the composite " - "type as a single value"); - } - } - - if (joinTree != NULL) - { - if (FindNodeMatchingCheckFunction((Node *) quals, - CitusIsVolatileFunction)) - { - return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, - "functions used in the WHERE/ON/WHEN clause of modification " - "queries on distributed tables must not be VOLATILE", - NULL, NULL); - } - else if (MasterIrreducibleExpression(quals, &hasVarArgument, - &hasBadCoalesce)) - { - Assert(hasVarArgument || hasBadCoalesce); - } - } - - if (hasVarArgument) - { - return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, - "STABLE functions used in UPDATE queries " - "cannot be called with column references", - NULL, NULL); - } - - if (hasBadCoalesce) - { - return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, - "non-IMMUTABLE functions are not allowed in CASE or " - "COALESCE statements", - NULL, NULL); - } - - if (contain_mutable_functions((Node *) returningList)) - { - return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, - "non-IMMUTABLE functions are not allowed in the " - "RETURNING clause", - NULL, NULL); - } - - if (quals != NULL && - nodeTag(quals) == T_CurrentOfExpr) - { - return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, - "cannot run DML queries with cursors", NULL, - NULL); - } - - return NULL; -} - - /* * ModifyPartialQuerySupported implements a subset of what ModifyQuerySupported checks, * that subset being what's necessary to check modifying CTEs for. @@ -792,21 +620,148 @@ ModifyPartialQuerySupported(Query *queryTree, bool multiShardQuery, Oid resultRelationId = ModifyQueryResultRelationId(queryTree); *distributedTableIdOutput = resultRelationId; + uint32 rangeTableId = 1; + Var *partitionColumn = NULL; + if (IsCitusTable(resultRelationId)) + { + partitionColumn = PartitionColumn(resultRelationId, rangeTableId); + } commandType = queryTree->commandType; if (commandType == CMD_INSERT || commandType == CMD_UPDATE || commandType == CMD_DELETE) { - deferredError = - TargetlistAndFunctionsSupported(resultRelationId, - queryTree->jointree, - queryTree->jointree->quals, - queryTree->targetList, - commandType, - queryTree->returningList); - if (deferredError) + bool hasVarArgument = false; /* A STABLE function is passed a Var argument */ + bool hasBadCoalesce = false; /* CASE/COALESCE passed a mutable function */ + FromExpr *joinTree = queryTree->jointree; + ListCell *targetEntryCell = NULL; + + foreach(targetEntryCell, queryTree->targetList) { - return deferredError; + TargetEntry *targetEntry = (TargetEntry *) lfirst(targetEntryCell); + + /* skip resjunk entries: UPDATE adds some for ctid, etc. */ + if (targetEntry->resjunk) + { + continue; + } + + bool targetEntryPartitionColumn = false; + AttrNumber targetColumnAttrNumber = InvalidAttrNumber; + + /* reference tables do not have partition column */ + if (partitionColumn == NULL) + { + targetEntryPartitionColumn = false; + } + else + { + if (commandType == CMD_UPDATE) + { + /* + * Note that it is not possible to give an alias to + * UPDATE table SET ... + */ + if (targetEntry->resname) + { + targetColumnAttrNumber = get_attnum(resultRelationId, + targetEntry->resname); + if (targetColumnAttrNumber == partitionColumn->varattno) + { + targetEntryPartitionColumn = true; + } + } + } + } + + + if (commandType == CMD_UPDATE && + FindNodeMatchingCheckFunction((Node *) targetEntry->expr, + CitusIsVolatileFunction)) + { + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "functions used in UPDATE queries on distributed " + "tables must not be VOLATILE", + NULL, NULL); + } + + if (commandType == CMD_UPDATE && targetEntryPartitionColumn && + TargetEntryChangesValue(targetEntry, partitionColumn, + queryTree->jointree)) + { + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "modifying the partition value of rows is not " + "allowed", + NULL, NULL); + } + + if (commandType == CMD_UPDATE && + MasterIrreducibleExpression((Node *) targetEntry->expr, + &hasVarArgument, &hasBadCoalesce)) + { + Assert(hasVarArgument || hasBadCoalesce); + } + + if (FindNodeMatchingCheckFunction((Node *) targetEntry->expr, + NodeIsFieldStore)) + { + /* DELETE cannot do field indirection already */ + Assert(commandType == CMD_UPDATE || commandType == CMD_INSERT); + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "inserting or modifying composite type fields is not " + "supported", NULL, + "Use the column name to insert or update the composite " + "type as a single value"); + } + } + + if (joinTree != NULL) + { + if (FindNodeMatchingCheckFunction((Node *) joinTree->quals, + CitusIsVolatileFunction)) + { + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "functions used in the WHERE clause of modification " + "queries on distributed tables must not be VOLATILE", + NULL, NULL); + } + else if (MasterIrreducibleExpression(joinTree->quals, &hasVarArgument, + &hasBadCoalesce)) + { + Assert(hasVarArgument || hasBadCoalesce); + } + } + + if (hasVarArgument) + { + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "STABLE functions used in UPDATE queries " + "cannot be called with column references", + NULL, NULL); + } + + if (hasBadCoalesce) + { + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "non-IMMUTABLE functions are not allowed in CASE or " + "COALESCE statements", + NULL, NULL); + } + + if (contain_mutable_functions((Node *) queryTree->returningList)) + { + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "non-IMMUTABLE functions are not allowed in the " + "RETURNING clause", + NULL, NULL); + } + + if (queryTree->jointree->quals != NULL && + nodeTag(queryTree->jointree->quals) == T_CurrentOfExpr) + { + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "cannot run DML queries with cursors", NULL, + NULL); } } @@ -918,85 +873,6 @@ NodeIsFieldStore(Node *node) } -/* - * MergeQuerySupported does check for a MERGE command in the query, if it finds - * one, it will verify the below criteria - * - Supported tables and combinations in ErrorIfMergeHasUnsupportedTables - * - Distributed tables requirements in ErrorIfDistTablesNotColocated - * - Checks target-lists and functions-in-quals in TargetlistAndFunctionsSupported - */ -static DeferredErrorMessage * -MergeQuerySupported(Query *originalQuery, - PlannerRestrictionContext *plannerRestrictionContext) -{ - /* For non-MERGE commands it's a no-op */ - if (!QueryHasMergeCommand(originalQuery)) - { - return NULL; - } - - List *rangeTableList = ExtractRangeTableEntryList(originalQuery); - RangeTblEntry *resultRte = ExtractResultRelationRTE(originalQuery); - - /* - * Fast path queries cannot have merge command, and we prevent the remaining here. - * In Citus we have limited support for MERGE, it's allowed only if all - * the tables(target, source or any CTE) tables are are local i.e. a - * combination of Citus local and Non-Citus tables (regular Postgres tables) - * or distributed tables with some restrictions, please see header of routine - * ErrorIfDistTablesNotColocated for details. - */ - DeferredErrorMessage *deferredError = - ErrorIfMergeHasUnsupportedTables(originalQuery, - rangeTableList, - plannerRestrictionContext); - if (deferredError) - { - return deferredError; - } - - Oid resultRelationId = resultRte->relid; - deferredError = - TargetlistAndFunctionsSupported(resultRelationId, - originalQuery->jointree, - originalQuery->jointree->quals, - originalQuery->targetList, - originalQuery->commandType, - originalQuery->returningList); - if (deferredError) - { - return deferredError; - } - - #if PG_VERSION_NUM >= PG_VERSION_15 - - /* - * MERGE is a special case where we have multiple modify statements - * within itself. Check each INSERT/UPDATE/DELETE individually. - */ - MergeAction *action = NULL; - foreach_ptr(action, originalQuery->mergeActionList) - { - Assert(originalQuery->returningList == NULL); - deferredError = - TargetlistAndFunctionsSupported(resultRelationId, - originalQuery->jointree, - action->qual, - action->targetList, - action->commandType, - originalQuery->returningList); - if (deferredError) - { - return deferredError; - } - } - - #endif - - return NULL; -} - - /* * ModifyQuerySupported returns NULL if the query only contains supported * features, otherwise it returns an error description. @@ -1012,17 +888,8 @@ ModifyQuerySupported(Query *queryTree, Query *originalQuery, bool multiShardQuer PlannerRestrictionContext *plannerRestrictionContext) { Oid distributedTableId = InvalidOid; - DeferredErrorMessage *error = MergeQuerySupported(originalQuery, - plannerRestrictionContext); - if (error) - { - /* - * For MERGE, we do not do recursive plannning, simply bail out. - */ - RaiseDeferredError(error, ERROR); - } - - error = ModifyPartialQuerySupported(queryTree, multiShardQuery, &distributedTableId); + DeferredErrorMessage *error = ModifyPartialQuerySupported(queryTree, multiShardQuery, + &distributedTableId); if (error) { return error; @@ -4074,288 +3941,3 @@ CompareInsertValuesByShardId(const void *leftElement, const void *rightElement) } } } - - -/* - * IsMergeAllowedOnRelation takes a relation entry and checks if MERGE command is - * permitted on special relations, such as materialized view, returns true only if - * it's a "source" relation. - */ -bool -IsMergeAllowedOnRelation(Query *parse, RangeTblEntry *rte) -{ - if (!IsMergeQuery(parse)) - { - return false; - } - - RangeTblEntry *targetRte = rt_fetch(parse->resultRelation, parse->rtable); - - /* Is it a target relation? */ - if (targetRte->relid == rte->relid) - { - return false; - } - - return true; -} - - -/* - * ErrorIfDistTablesNotColocated Checks to see if - * - * - There are a minimum of two distributed tables (source and a target). - * - All the distributed tables are indeed colocated. - * - MERGE relations are joined on the distribution column - * MERGE .. USING .. ON target.dist_key = source.dist_key - * - The query should touch only a single shard i.e. JOIN AND with a constant qual - * MERGE .. USING .. ON target.dist_key = source.dist_key AND target.dist_key = <> - * - * If any of the conditions are not met, it raises an exception. - */ -static DeferredErrorMessage * -ErrorIfDistTablesNotColocated(Query *parse, List *distTablesList, - PlannerRestrictionContext *plannerRestrictionContext) -{ - /* All MERGE tables must be distributed */ - if (list_length(distTablesList) < 2) - { - return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, - "For MERGE command, both the source and target " - "must be distributed", NULL, NULL); - } - - /* All distributed tables must be colocated */ - if (!AllRelationsInListColocated(distTablesList, RANGETABLE_ENTRY)) - { - return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, - "For MERGE command, all the distributed tables " - "must be colocated", NULL, NULL); - } - - /* Are source and target tables joined on distribution column? */ - if (!RestrictionEquivalenceForPartitionKeys(plannerRestrictionContext)) - { - return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, - "MERGE command is only supported when distributed " - "tables are joined on their distribution column", - NULL, NULL); - } - - /* Look for a constant qual i.e. AND target.dist_key = <> */ - Node *distributionKeyValue = NULL; - Oid targetRelId = ResultRelationOidForQuery(parse); - Var *distributionKey = PartitionColumn(targetRelId, 1); - - Assert(distributionKey); - - /* convert list of expressions into expression tree for further processing */ - Node *quals = parse->jointree->quals; - - if (quals && IsA(quals, List)) - { - quals = (Node *) make_ands_explicit((List *) quals); - } - - if (!ConjunctionContainsColumnFilter(quals, distributionKey, &distributionKeyValue)) - { - return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, - "MERGE on a distributed table requires a constant filter " - "on the distribution column of the target table", NULL, - "Consider adding AND target.dist_key = <> to the ON clause"); - } - - return NULL; -} - - -/* - * ErrorIfMergeHasUnsupportedTables checks if all the tables(target, source or any CTE - * present) in the MERGE command are local i.e. a combination of Citus local and Non-Citus - * tables (regular Postgres tables), or distributed tables with some restrictions, please - * see header of routine ErrorIfDistTablesNotColocated for details, raises an exception - * for all other combinations. - */ -static DeferredErrorMessage * -ErrorIfMergeHasUnsupportedTables(Query *parse, List *rangeTableList, - PlannerRestrictionContext *restrictionContext) -{ - List *distTablesList = NIL; - bool foundLocalTables = false; - - RangeTblEntry *rangeTableEntry = NULL; - foreach_ptr(rangeTableEntry, rangeTableList) - { - Oid relationId = rangeTableEntry->relid; - - switch (rangeTableEntry->rtekind) - { - case RTE_RELATION: - { - /* Check the relation type */ - break; - } - - case RTE_SUBQUERY: - case RTE_FUNCTION: - case RTE_TABLEFUNC: - case RTE_VALUES: - case RTE_JOIN: - case RTE_CTE: - { - /* Skip them as base table(s) will be checked */ - continue; - } - - /* - * RTE_NAMEDTUPLESTORE is typically used in ephmeral named relations, - * such as, trigger data; until we find a genuine use case, raise an - * exception. - * RTE_RESULT is a node added by the planner and we shouldn't - * encounter it in the parse tree. - */ - case RTE_NAMEDTUPLESTORE: - case RTE_RESULT: - { - return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, - "MERGE command is not supported with " - "Tuplestores and results", - NULL, NULL); - } - - default: - { - return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, - "MERGE command: Unrecognized range table entry.", - NULL, NULL); - } - } - - /* RTE Relation can be of various types, check them now */ - - /* skip the regular views as they are replaced with subqueries */ - if (rangeTableEntry->relkind == RELKIND_VIEW) - { - continue; - } - - if (rangeTableEntry->relkind == RELKIND_MATVIEW || - rangeTableEntry->relkind == RELKIND_FOREIGN_TABLE) - { - /* Materialized view or Foreign table as target is not allowed */ - if (IsMergeAllowedOnRelation(parse, rangeTableEntry)) - { - /* Non target relation is ok */ - continue; - } - else - { - /* Usually we don't reach this exception as the Postgres parser catches it */ - StringInfo errorMessage = makeStringInfo(); - appendStringInfo(errorMessage, - "MERGE command is not allowed on " - "relation type(relkind:%c)", rangeTableEntry->relkind); - return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, errorMessage->data, - NULL, NULL); - } - } - - if (rangeTableEntry->relkind != RELKIND_RELATION && - rangeTableEntry->relkind != RELKIND_PARTITIONED_TABLE) - { - StringInfo errorMessage = makeStringInfo(); - appendStringInfo(errorMessage, "Unexpected table type(relkind:%c) " - "in MERGE command", rangeTableEntry->relkind); - return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, errorMessage->data, - NULL, NULL); - } - - Assert(rangeTableEntry->relid != 0); - - /* Reference tables are not supported yet */ - if (IsCitusTableType(relationId, REFERENCE_TABLE)) - { - return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, - "MERGE command is not supported on reference " - "tables yet", NULL, NULL); - } - - /* Append/Range tables are not supported */ - if (IsCitusTableType(relationId, APPEND_DISTRIBUTED) || - IsCitusTableType(relationId, RANGE_DISTRIBUTED)) - { - return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, - "For MERGE command, all the distributed tables " - "must be colocated, for append/range distribution, " - "colocation is not supported", NULL, - "Consider using hash distribution instead"); - } - - /* - * For now, save all distributed tables, later (below) we will - * check for supported combination(s). - */ - if (IsCitusTableType(relationId, DISTRIBUTED_TABLE)) - { - distTablesList = lappend(distTablesList, rangeTableEntry); - continue; - } - - /* Regular Postgres tables and Citus local tables are allowed */ - if (!IsCitusTable(relationId) || - IsCitusTableType(relationId, CITUS_LOCAL_TABLE)) - { - foundLocalTables = true; - continue; - } - - /* Any other Citus table type missing ? */ - } - - /* Ensure all tables are indeed local */ - if (foundLocalTables && list_length(distTablesList) == 0) - { - /* All the tables are local, supported */ - return NULL; - } - else if (foundLocalTables && list_length(distTablesList) > 0) - { - return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, - "MERGE command is not supported with " - "combination of distributed/local tables yet", - NULL, NULL); - } - - /* Ensure all distributed tables are indeed co-located */ - return ErrorIfDistTablesNotColocated(parse, distTablesList, restrictionContext); -} - - -/* - * QueryHasMergeCommand walks over the query tree and returns false if there - * is no Merge command (e.g., CMD_MERGE), true otherwise. - */ -static bool -QueryHasMergeCommand(Query *queryTree) -{ - /* function is void for pre-15 versions of Postgres */ - #if PG_VERSION_NUM < PG_VERSION_15 - return false; - #else - - /* - * Postgres currently doesn't support Merge queries inside subqueries and - * ctes, but lets be defensive and do query tree walk anyway. - * - * We do not call this path for fast-path queries to avoid this additional - * overhead. - */ - if (!ContainsMergeCommandWalker((Node *) queryTree)) - { - /* No MERGE found */ - return false; - } - - return true; - #endif -} diff --git a/src/backend/distributed/planner/relation_restriction_equivalence.c b/src/backend/distributed/planner/relation_restriction_equivalence.c index f92f79da6..713f1f4f2 100644 --- a/src/backend/distributed/planner/relation_restriction_equivalence.c +++ b/src/backend/distributed/planner/relation_restriction_equivalence.c @@ -151,6 +151,8 @@ static void ListConcatUniqueAttributeClassMemberLists(AttributeEquivalenceClass secondClass); static Var * PartitionKeyForRTEIdentityInQuery(Query *query, int targetRTEIndex, Index *partitionKeyIndex); +static bool AllRelationsInRestrictionContextColocated(RelationRestrictionContext * + restrictionContext); static bool IsNotSafeRestrictionToRecursivelyPlan(Node *node); static JoinRestrictionContext * FilterJoinRestrictionContext( JoinRestrictionContext *joinRestrictionContext, Relids @@ -381,8 +383,7 @@ SafeToPushdownUnionSubquery(Query *originalQuery, return false; } - if (!AllRelationsInListColocated(restrictionContext->relationRestrictionList, - RESTRICTION_CONTEXT)) + if (!AllRelationsInRestrictionContextColocated(restrictionContext)) { /* distribution columns are equal, but tables are not co-located */ return false; @@ -1918,33 +1919,19 @@ FindQueryContainingRTEIdentityInternal(Node *node, /* - * AllRelationsInListColocated determines whether all of the relations in the - * given list are co-located. - * Note: The list can be of dofferent types, which is specified by ListEntryType + * AllRelationsInRestrictionContextColocated determines whether all of the relations in the + * given relation restrictions list are co-located. */ -bool -AllRelationsInListColocated(List *relationList, ListEntryType entryType) +static bool +AllRelationsInRestrictionContextColocated(RelationRestrictionContext *restrictionContext) { - void *varPtr = NULL; - RangeTblEntry *rangeTableEntry = NULL; RelationRestriction *relationRestriction = NULL; int initialColocationId = INVALID_COLOCATION_ID; /* check whether all relations exists in the main restriction list */ - foreach_ptr(varPtr, relationList) + foreach_ptr(relationRestriction, restrictionContext->relationRestrictionList) { - Oid relationId = InvalidOid; - - if (entryType == RANGETABLE_ENTRY) - { - rangeTableEntry = (RangeTblEntry *) varPtr; - relationId = rangeTableEntry->relid; - } - else if (entryType == RESTRICTION_CONTEXT) - { - relationRestriction = (RelationRestriction *) varPtr; - relationId = relationRestriction->relationId; - } + Oid relationId = relationRestriction->relationId; if (IsCitusTableType(relationId, CITUS_TABLE_WITH_NO_DIST_KEY)) { diff --git a/src/include/distributed/distributed_planner.h b/src/include/distributed/distributed_planner.h index 19bd9f0c2..29c3c7154 100644 --- a/src/include/distributed/distributed_planner.h +++ b/src/include/distributed/distributed_planner.h @@ -256,9 +256,5 @@ extern struct DistributedPlan * CreateDistributedPlan(uint64 planId, plannerRestrictionContext); extern bool IsMergeAllowedOnRelation(Query *parse, RangeTblEntry *rte); -extern bool ConjunctionContainsColumnFilter(Node *node, - Var *column, - Node **distributionKeyValue); -extern bool ContainsMergeCommandWalker(Node *node); #endif /* DISTRIBUTED_PLANNER_H */ diff --git a/src/include/distributed/multi_router_planner.h b/src/include/distributed/multi_router_planner.h index 07d160865..62d698b51 100644 --- a/src/include/distributed/multi_router_planner.h +++ b/src/include/distributed/multi_router_planner.h @@ -99,7 +99,6 @@ extern PlannedStmt * FastPathPlanner(Query *originalQuery, Query *parse, ParamLi boundParams); extern bool FastPathRouterQuery(Query *query, Node **distributionKeyValue); extern bool JoinConditionIsOnFalse(List *relOptInfo); -extern Oid ResultRelationOidForQuery(Query *query); #endif /* MULTI_ROUTER_PLANNER_H */ diff --git a/src/include/distributed/relation_restriction_equivalence.h b/src/include/distributed/relation_restriction_equivalence.h index 4fd9c7015..ccd50a6db 100644 --- a/src/include/distributed/relation_restriction_equivalence.h +++ b/src/include/distributed/relation_restriction_equivalence.h @@ -17,15 +17,6 @@ #define SINGLE_RTE_INDEX 1 -/* - * Represents the pointer type that's being passed in the list. - */ -typedef enum ListEntryType -{ - RANGETABLE_ENTRY, /* RangeTblEntry */ - RESTRICTION_CONTEXT /* RelationRestriction */ -} ListEntryType; - extern bool AllDistributionKeysInQueryAreEqual(Query *originalQuery, PlannerRestrictionContext * plannerRestrictionContext); @@ -63,6 +54,4 @@ extern RelationRestrictionContext * FilterRelationRestrictionContext( RelationRestrictionContext *relationRestrictionContext, Relids queryRteIdentities); -extern bool AllRelationsInListColocated(List *relationList, ListEntryType entryType); - #endif /* RELATION_RESTRICTION_EQUIVALENCE_H */ diff --git a/src/test/regress/expected/merge.out b/src/test/regress/expected/merge.out index cc87193a0..6fc472b70 100644 --- a/src/test/regress/expected/merge.out +++ b/src/test/regress/expected/merge.out @@ -18,7 +18,6 @@ SET search_path TO merge_schema; SET citus.shard_count TO 4; SET citus.next_shard_id TO 4000000; SET citus.explain_all_tasks to true; -SET citus.shard_replication_factor TO 1; SELECT 1 FROM master_add_node('localhost', :master_port, groupid => 0); NOTICE: localhost:xxxxx is the coordinator and already contains metadata, skipping syncing the metadata ?column? @@ -215,18 +214,9 @@ HINT: To remove the local data, run: SELECT truncate_local_data_after_distribut (1 row) --- Updates one of the row with customer_id = 30002 -SELECT * from target t WHERE t.customer_id = 30002; - customer_id | last_order_id | order_center | order_count | last_order ---------------------------------------------------------------------- - 30002 | 103 | AX | -1 | Sun Jan 17 19:53:00 2021 -(1 row) - --- Turn on notice to print tasks sent to nodes (it should be a single task) -SET citus.log_remote_commands to true; MERGE INTO target t USING source s - ON (t.customer_id = s.customer_id) AND t.customer_id = 30002 + ON (t.customer_id = s.customer_id) WHEN MATCHED AND t.order_center = 'XX' THEN DELETE WHEN MATCHED THEN @@ -236,39 +226,7 @@ MERGE INTO target t WHEN NOT MATCHED THEN -- New entry, record it. INSERT (customer_id, last_order_id, order_center, order_count, last_order) VALUES (customer_id, s.order_id, s.order_center, 123, s.order_time); -NOTICE: issuing MERGE INTO merge_schema.target_4000002 t USING merge_schema.source_4000006 s ON ((t.customer_id OPERATOR(pg_catalog.=) s.customer_id) AND (t.customer_id OPERATOR(pg_catalog.=) 30002)) WHEN MATCHED AND ((t.order_center COLLATE "default") OPERATOR(pg_catalog.=) 'XX'::text) THEN DELETE WHEN MATCHED THEN UPDATE SET last_order_id = s.order_id, order_count = (t.order_count OPERATOR(pg_catalog.+) 1) WHEN NOT MATCHED THEN INSERT (customer_id, last_order_id, order_center, order_count, last_order) VALUES (s.customer_id, s.order_id, s.order_center, 123, s.order_time) -DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -SET citus.log_remote_commands to false; -SELECT * from target t WHERE t.customer_id = 30002; - customer_id | last_order_id | order_center | order_count | last_order ---------------------------------------------------------------------- - 30002 | 103 | AX | 0 | Sun Jan 17 19:53:00 2021 -(1 row) - --- Deletes one of the row with customer_id = 30004 -SELECT * from target t WHERE t.customer_id = 30004; - customer_id | last_order_id | order_center | order_count | last_order ---------------------------------------------------------------------- - 30004 | 99 | XX | -1 | Fri Sep 11 03:23:00 2020 -(1 row) - -MERGE INTO target t - USING source s - ON (t.customer_id = s.customer_id) AND t.customer_id = 30004 - WHEN MATCHED AND t.order_center = 'XX' THEN - DELETE - WHEN MATCHED THEN - UPDATE SET -- Existing customer, update the order count and last_order_id - order_count = t.order_count + 1, - last_order_id = s.order_id - WHEN NOT MATCHED THEN -- New entry, record it. - INSERT (customer_id, last_order_id, order_center, order_count, last_order) - VALUES (customer_id, s.order_id, s.order_center, 123, s.order_time); -SELECT * from target t WHERE t.customer_id = 30004; - customer_id | last_order_id | order_center | order_count | last_order ---------------------------------------------------------------------- -(0 rows) - +ERROR: MERGE command is not supported on distributed/reference tables yet -- -- Test MERGE with CTE as source -- @@ -428,39 +386,18 @@ HINT: To remove the local data, run: SELECT truncate_local_data_after_distribut (1 row) -SELECT * FROM t1 order by id; - id | val ---------------------------------------------------------------------- - 1 | 0 - 2 | 0 - 5 | 0 -(3 rows) - -SET citus.log_remote_commands to true; WITH s1_res AS ( SELECT * FROM s1 ) MERGE INTO t1 - USING s1_res ON (s1_res.id = t1.id) AND t1.id = 6 + USING s1_res ON (s1_res.id = t1.id) WHEN MATCHED AND s1_res.val = 0 THEN DELETE WHEN MATCHED THEN UPDATE SET val = t1.val + 1 WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s1_res.id, s1_res.val); -NOTICE: issuing WITH s1_res AS (SELECT s1.id, s1.val FROM merge_schema.s1_4000018 s1) MERGE INTO merge_schema.t1_4000014 t1 USING s1_res ON ((s1_res.id OPERATOR(pg_catalog.=) t1.id) AND (t1.id OPERATOR(pg_catalog.=) 6)) WHEN MATCHED AND (s1_res.val OPERATOR(pg_catalog.=) 0) THEN DELETE WHEN MATCHED THEN UPDATE SET val = (t1.val OPERATOR(pg_catalog.+) 1) WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s1_res.id, s1_res.val) -DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -SET citus.log_remote_commands to false; --- As the id 6 is NO match, VALUES(6, 1) should appear in target -SELECT * FROM t1 order by id; - id | val ---------------------------------------------------------------------- - 1 | 0 - 2 | 0 - 5 | 0 - 6 | 1 -(4 rows) - +ERROR: MERGE command is not supported on distributed/reference tables yet -- -- Test with multiple join conditions -- @@ -616,39 +553,16 @@ HINT: To remove the local data, run: SELECT truncate_local_data_after_distribut (1 row) -SELECT * FROM t2 ORDER BY 1; - id | val | src ---------------------------------------------------------------------- - 1 | 0 | target - 2 | 0 | target - 3 | 1 | match - 4 | 0 | match -(4 rows) - -SET citus.log_remote_commands to true; MERGE INTO t2 USING s2 -ON t2.id = s2.id AND t2.src = s2.src AND t2.id = 4 +ON t2.id = s2.id AND t2.src = s2.src WHEN MATCHED AND t2.val = 1 THEN UPDATE SET val = s2.val + 10 WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (id, val, src) VALUES (s2.id, s2.val, s2.src); -NOTICE: issuing MERGE INTO merge_schema.t2_4000023 t2 USING merge_schema.s2_4000027 s2 ON ((t2.id OPERATOR(pg_catalog.=) s2.id) AND (t2.src OPERATOR(pg_catalog.=) s2.src) AND (t2.id OPERATOR(pg_catalog.=) 4)) WHEN MATCHED AND (t2.val OPERATOR(pg_catalog.=) 1) THEN UPDATE SET val = (s2.val OPERATOR(pg_catalog.+) 10) WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (id, val, src) VALUES (s2.id, s2.val, s2.src) -DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -SET citus.log_remote_commands to false; --- Row with id = 4 is a match for delete clause, row should be deleted --- Row with id = 3 is a NO match, row from source will be inserted -SELECT * FROM t2 ORDER BY 1; - id | val | src ---------------------------------------------------------------------- - 1 | 0 | target - 2 | 0 | target - 3 | 1 | match - 3 | 10 | match -(4 rows) - +ERROR: MERGE command is not supported on distributed/reference tables yet -- -- With sub-query as the MERGE source -- @@ -1299,261 +1213,9 @@ SELECT * FROM ft_target; 3 | source (2 rows) --- --- complex joins on the source side --- --- source(join of two relations) relation is an unaliased join -CREATE TABLE target_cj(tid int, src text, val int); -CREATE TABLE source_cj1(sid1 int, src1 text, val1 int); -CREATE TABLE source_cj2(sid2 int, src2 text, val2 int); -INSERT INTO target_cj VALUES (1, 'target', 0); -INSERT INTO target_cj VALUES (2, 'target', 0); -INSERT INTO target_cj VALUES (2, 'target', 0); -INSERT INTO target_cj VALUES (3, 'target', 0); -INSERT INTO source_cj1 VALUES (2, 'source-1', 10); -INSERT INTO source_cj2 VALUES (2, 'source-2', 20); -BEGIN; -MERGE INTO target_cj t -USING source_cj1 s1 INNER JOIN source_cj2 s2 ON sid1 = sid2 -ON t.tid = sid1 AND t.tid = 2 -WHEN MATCHED THEN - UPDATE SET src = src2 -WHEN NOT MATCHED THEN - DO NOTHING; --- Gold result to compare against -SELECT * FROM target_cj ORDER BY 1; - tid | src | val ---------------------------------------------------------------------- - 1 | target | 0 - 2 | source-2 | 0 - 2 | source-2 | 0 - 3 | target | 0 -(4 rows) - -ROLLBACK; -BEGIN; --- try accessing columns from either side of the source join -MERGE INTO target_cj t -USING source_cj1 s2 - INNER JOIN source_cj2 s1 ON sid1 = sid2 AND val1 = 10 -ON t.tid = sid1 AND t.tid = 2 -WHEN MATCHED THEN - UPDATE SET tid = sid2, src = src1, val = val2 -WHEN NOT MATCHED THEN - DO NOTHING; --- Gold result to compare against -SELECT * FROM target_cj ORDER BY 1; - tid | src | val ---------------------------------------------------------------------- - 1 | target | 0 - 2 | source-1 | 20 - 2 | source-1 | 20 - 3 | target | 0 -(4 rows) - -ROLLBACK; --- Test the same scenarios with distributed tables -SELECT create_distributed_table('target_cj', 'tid'); -NOTICE: Copying data from local table... -NOTICE: copying the data has completed -DETAIL: The local data in the table is no longer visible, but is still on disk. -HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$merge_schema.target_cj$$) - create_distributed_table ---------------------------------------------------------------------- - -(1 row) - -SELECT create_distributed_table('source_cj1', 'sid1'); -NOTICE: Copying data from local table... -NOTICE: copying the data has completed -DETAIL: The local data in the table is no longer visible, but is still on disk. -HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$merge_schema.source_cj1$$) - create_distributed_table ---------------------------------------------------------------------- - -(1 row) - -SELECT create_distributed_table('source_cj2', 'sid2'); -NOTICE: Copying data from local table... -NOTICE: copying the data has completed -DETAIL: The local data in the table is no longer visible, but is still on disk. -HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$merge_schema.source_cj2$$) - create_distributed_table ---------------------------------------------------------------------- - -(1 row) - -BEGIN; -SET citus.log_remote_commands to true; -MERGE INTO target_cj t -USING source_cj1 s1 INNER JOIN source_cj2 s2 ON sid1 = sid2 -ON t.tid = sid1 AND t.tid = 2 -WHEN MATCHED THEN - UPDATE SET src = src2 -WHEN NOT MATCHED THEN - DO NOTHING; -NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx'); -DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing MERGE INTO merge_schema.target_cj_4000050 t USING (merge_schema.source_cj1_4000054 s1 JOIN merge_schema.source_cj2_4000058 s2 ON ((s1.sid1 OPERATOR(pg_catalog.=) s2.sid2))) ON ((t.tid OPERATOR(pg_catalog.=) s1.sid1) AND (t.tid OPERATOR(pg_catalog.=) 2)) WHEN MATCHED THEN UPDATE SET src = s2.src2 WHEN NOT MATCHED THEN DO NOTHING -DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -SET citus.log_remote_commands to false; -SELECT * FROM target_cj ORDER BY 1; - tid | src | val ---------------------------------------------------------------------- - 1 | target | 0 - 2 | source-2 | 0 - 2 | source-2 | 0 - 3 | target | 0 -(4 rows) - -ROLLBACK; -BEGIN; --- try accessing columns from either side of the source join -MERGE INTO target_cj t -USING source_cj1 s2 - INNER JOIN source_cj2 s1 ON sid1 = sid2 AND val1 = 10 -ON t.tid = sid1 AND t.tid = 2 -WHEN MATCHED THEN - UPDATE SET src = src1, val = val2 -WHEN NOT MATCHED THEN - DO NOTHING; -SELECT * FROM target_cj ORDER BY 1; - tid | src | val ---------------------------------------------------------------------- - 1 | target | 0 - 2 | source-1 | 20 - 2 | source-1 | 20 - 3 | target | 0 -(4 rows) - -ROLLBACK; --- sub-query as a source -BEGIN; -MERGE INTO target_cj t -USING (SELECT * FROM source_cj1 WHERE sid1 = 2) sub -ON t.tid = sub.sid1 AND t.tid = 2 -WHEN MATCHED THEN - UPDATE SET src = sub.src1, val = val1 -WHEN NOT MATCHED THEN - DO NOTHING; -SELECT * FROM target_cj ORDER BY 1; - tid | src | val ---------------------------------------------------------------------- - 1 | target | 0 - 2 | source-1 | 10 - 2 | source-1 | 10 - 3 | target | 0 -(4 rows) - -ROLLBACK; --- Test self-join -BEGIN; -SELECT * FROM target_cj ORDER BY 1; - tid | src | val ---------------------------------------------------------------------- - 1 | target | 0 - 2 | target | 0 - 2 | target | 0 - 3 | target | 0 -(4 rows) - -set citus.log_remote_commands to true; -MERGE INTO target_cj t1 -USING (SELECT * FROM target_cj) sub -ON t1.tid = sub.tid AND t1.tid = 3 -WHEN MATCHED THEN - UPDATE SET src = sub.src, val = sub.val + 100 -WHEN NOT MATCHED THEN - DO NOTHING; -NOTICE: issuing MERGE INTO merge_schema.target_cj_4000048 t1 USING (SELECT target_cj.tid, target_cj.src, target_cj.val FROM merge_schema.target_cj_4000048 target_cj) sub ON ((t1.tid OPERATOR(pg_catalog.=) sub.tid) AND (t1.tid OPERATOR(pg_catalog.=) 3)) WHEN MATCHED THEN UPDATE SET src = sub.src, val = (sub.val OPERATOR(pg_catalog.+) 100) WHEN NOT MATCHED THEN DO NOTHING -DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -set citus.log_remote_commands to false; -SELECT * FROM target_cj ORDER BY 1; - tid | src | val ---------------------------------------------------------------------- - 1 | target | 0 - 2 | target | 0 - 2 | target | 0 - 3 | target | 100 -(4 rows) - -ROLLBACK; --- Test PREPARE -PREPARE foo(int) AS -MERGE INTO target_cj target -USING (SELECT * FROM source_cj1) sub -ON target.tid = sub.sid1 AND target.tid = $1 -WHEN MATCHED THEN - UPDATE SET val = sub.val1 -WHEN NOT MATCHED THEN - DO NOTHING; -SELECT * FROM target_cj ORDER BY 1; - tid | src | val ---------------------------------------------------------------------- - 1 | target | 0 - 2 | target | 0 - 2 | target | 0 - 3 | target | 0 -(4 rows) - -BEGIN; -EXECUTE foo(2); -EXECUTE foo(2); -EXECUTE foo(2); -EXECUTE foo(2); -EXECUTE foo(2); -SELECT * FROM target_cj ORDER BY 1; - tid | src | val ---------------------------------------------------------------------- - 1 | target | 0 - 2 | target | 10 - 2 | target | 10 - 3 | target | 0 -(4 rows) - -ROLLBACK; -BEGIN; -SET citus.log_remote_commands to true; -SET client_min_messages TO DEBUG1; -EXECUTE foo(2); -DEBUG: -DEBUG: -DEBUG: -DEBUG: -DEBUG: -NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx'); -DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing MERGE INTO merge_schema.target_cj_4000050 target USING (SELECT source_cj1.sid1, source_cj1.src1, source_cj1.val1 FROM merge_schema.source_cj1_4000054 source_cj1) sub ON ((target.tid OPERATOR(pg_catalog.=) sub.sid1) AND (target.tid OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = sub.val1 WHEN NOT MATCHED THEN DO NOTHING -DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -RESET client_min_messages; -EXECUTE foo(2); -NOTICE: issuing MERGE INTO merge_schema.target_cj_4000050 target USING (SELECT source_cj1.sid1, source_cj1.src1, source_cj1.val1 FROM merge_schema.source_cj1_4000054 source_cj1) sub ON ((target.tid OPERATOR(pg_catalog.=) sub.sid1) AND (target.tid OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = sub.val1 WHEN NOT MATCHED THEN DO NOTHING -DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -SET citus.log_remote_commands to false; -SELECT * FROM target_cj ORDER BY 1; - tid | src | val ---------------------------------------------------------------------- - 1 | target | 0 - 2 | target | 10 - 2 | target | 10 - 3 | target | 0 -(4 rows) - -ROLLBACK; -- -- Error and Unsupported scenarios -- --- try updating the distribution key column -BEGIN; -MERGE INTO target_cj t - USING source_cj1 s - ON t.tid = s.sid1 AND t.tid = 2 - WHEN MATCHED THEN - UPDATE SET tid = tid + 9, src = src || ' updated by merge' - WHEN NOT MATCHED THEN - INSERT VALUES (sid1, 'inserted by merge', val1); -ERROR: modifying the partition value of rows is not allowed -ROLLBACK; -- Foreign table as target MERGE INTO foreign_table USING ft_target ON (foreign_table.id = ft_target.id) @@ -1612,54 +1274,7 @@ MERGE INTO t1 UPDATE SET val = t1.val + 1 WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s1.id, s1.val); -ERROR: MERGE command is not supported with combination of distributed/local tables yet --- Now both s1 and t1 are distributed tables -SELECT undistribute_table('t1'); -NOTICE: creating a new table for merge_schema.t1 -NOTICE: moving the data of merge_schema.t1 -NOTICE: dropping the old merge_schema.t1 -NOTICE: renaming the new table to merge_schema.t1 - undistribute_table ---------------------------------------------------------------------- - -(1 row) - -SELECT create_distributed_table('t1', 'id'); -NOTICE: Copying data from local table... -NOTICE: copying the data has completed -DETAIL: The local data in the table is no longer visible, but is still on disk. -HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$merge_schema.t1$$) - create_distributed_table ---------------------------------------------------------------------- - -(1 row) - --- We have a potential pitfall where a function can be invoked in --- the MERGE conditions which can insert/update to a random shard -CREATE OR REPLACE function merge_when_and_write() RETURNS BOOLEAN -LANGUAGE PLPGSQL AS -$$ -BEGIN - INSERT INTO t1 VALUES (100, 100); - RETURN TRUE; -END; -$$; --- Test preventing "ON" join condition from writing to the database -BEGIN; -MERGE INTO t1 -USING s1 ON t1.id = s1.id AND t1.id = 2 AND (merge_when_and_write()) -WHEN MATCHED THEN - UPDATE SET val = t1.val + s1.val; -ERROR: functions used in the WHERE/ON/WHEN clause of modification queries on distributed tables must not be VOLATILE -ROLLBACK; --- Test preventing WHEN clause(s) from writing to the database -BEGIN; -MERGE INTO t1 -USING s1 ON t1.id = s1.id AND t1.id = 2 -WHEN MATCHED AND (merge_when_and_write()) THEN - UPDATE SET val = t1.val + s1.val; -ERROR: functions used in the WHERE/ON/WHEN clause of modification queries on distributed tables must not be VOLATILE -ROLLBACK; +ERROR: MERGE command is not supported on distributed/reference tables yet -- Joining on partition columns with sub-query MERGE INTO t1 USING (SELECT * FROM s1) sub ON (sub.val = t1.id) -- sub.val is not a distribution column @@ -1669,7 +1284,7 @@ MERGE INTO t1 UPDATE SET val = t1.val + 1 WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val); -ERROR: MERGE command is only supported when distributed tables are joined on their distribution column +ERROR: MERGE command is not supported on distributed/reference tables yet -- Joining on partition columns with CTE WITH s1_res AS ( SELECT * FROM s1 @@ -1682,7 +1297,7 @@ MERGE INTO t1 UPDATE SET val = t1.val + 1 WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s1_res.id, s1_res.val); -ERROR: MERGE command is only supported when distributed tables are joined on their distribution column +ERROR: MERGE command is not supported on distributed/reference tables yet -- Constant Join condition WITH s1_res AS ( SELECT * FROM s1 @@ -1695,7 +1310,7 @@ MERGE INTO t1 UPDATE SET val = t1.val + 1 WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s1_res.id, s1_res.val); -ERROR: MERGE command is only supported when distributed tables are joined on their distribution column +ERROR: MERGE command is not supported on distributed/reference tables yet -- With a single WHEN clause, which causes a non-left join WITH s1_res AS ( SELECT * FROM s1 @@ -1704,7 +1319,7 @@ WITH s1_res AS ( WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s1_res.id, s1_res.val); -ERROR: MERGE command is only supported when distributed tables are joined on their distribution column +ERROR: MERGE command is not supported on distributed/reference tables yet -- -- Reference tables -- @@ -1756,7 +1371,7 @@ MERGE INTO t1 UPDATE SET val = t1.val + 1 WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s1.id, s1.val); -ERROR: MERGE command is not supported on reference tables yet +ERROR: MERGE command is not supported on distributed/reference tables yet -- -- Postgres + Citus-Distributed table -- @@ -1798,7 +1413,7 @@ MERGE INTO t1 UPDATE SET val = t1.val + 1 WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s1.id, s1.val); -ERROR: MERGE command is not supported with combination of distributed/local tables yet +ERROR: MERGE command is not supported on distributed/reference tables yet MERGE INTO t1 USING (SELECT * FROM s1) sub ON (sub.id = t1.id) WHEN MATCHED AND sub.val = 0 THEN @@ -1807,7 +1422,7 @@ MERGE INTO t1 UPDATE SET val = t1.val + 1 WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val); -ERROR: MERGE command is not supported with combination of distributed/local tables yet +ERROR: MERGE command is not supported on distributed/reference tables yet CREATE TABLE pg(val int); SELECT create_distributed_table('s1', 'id'); NOTICE: Copying data from local table... @@ -1828,7 +1443,7 @@ MERGE INTO t1 UPDATE SET val = t1.val + 1 WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val); -ERROR: MERGE command is not supported with combination of distributed/local tables yet +ERROR: MERGE command is not supported on distributed/reference tables yet -- Mix Postgres table in CTE WITH pg_res AS ( SELECT * FROM pg @@ -1841,7 +1456,7 @@ MERGE INTO t1 UPDATE SET val = t1.val + 1 WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val); -ERROR: MERGE command is not supported with combination of distributed/local tables yet +ERROR: MERGE command is not supported on distributed/reference tables yet -- Match more than one source row should fail same as Postgres behavior SELECT undistribute_table('t1'); NOTICE: creating a new table for merge_schema.t1 @@ -1896,265 +1511,6 @@ WHEN NOT MATCHED THEN INSERT VALUES(mv_source.id, mv_source.val); ERROR: cannot execute MERGE on relation "mv_source" DETAIL: This operation is not supported for materialized views. --- Distributed tables *must* be colocated -CREATE TABLE dist_target(id int, val varchar); -SELECT create_distributed_table('dist_target', 'id'); - create_distributed_table ---------------------------------------------------------------------- - -(1 row) - -CREATE TABLE dist_source(id int, val varchar); -SELECT create_distributed_table('dist_source', 'id', colocate_with => 'none'); - create_distributed_table ---------------------------------------------------------------------- - -(1 row) - -MERGE INTO dist_target -USING dist_source -ON dist_target.id = dist_source.id -WHEN MATCHED THEN -UPDATE SET val = dist_source.val -WHEN NOT MATCHED THEN -INSERT VALUES(dist_source.id, dist_source.val); -ERROR: For MERGE command, all the distributed tables must be colocated --- Distributed tables *must* be joined on distribution column -CREATE TABLE dist_colocated(id int, val int); -SELECT create_distributed_table('dist_colocated', 'id', colocate_with => 'dist_target'); - create_distributed_table ---------------------------------------------------------------------- - -(1 row) - -MERGE INTO dist_target -USING dist_colocated -ON dist_target.id = dist_colocated.val -- val is not the distribution column -WHEN MATCHED THEN -UPDATE SET val = dist_colocated.val -WHEN NOT MATCHED THEN -INSERT VALUES(dist_colocated.id, dist_colocated.val); -ERROR: MERGE command is only supported when distributed tables are joined on their distribution column --- MERGE command must be joined with with a constant qual on target relation --- AND clause is missing -MERGE INTO dist_target -USING dist_colocated -ON dist_target.id = dist_colocated.id -WHEN MATCHED THEN -UPDATE SET val = dist_colocated.val -WHEN NOT MATCHED THEN -INSERT VALUES(dist_colocated.id, dist_colocated.val); -ERROR: MERGE on a distributed table requires a constant filter on the distribution column of the target table -HINT: Consider adding AND target.dist_key = <> to the ON clause --- AND clause incorrect table (must be target) -MERGE INTO dist_target -USING dist_colocated -ON dist_target.id = dist_colocated.id AND dist_colocated.id = 1 -WHEN MATCHED THEN -UPDATE SET val = dist_colocated.val -WHEN NOT MATCHED THEN -INSERT VALUES(dist_colocated.id, dist_colocated.val); -ERROR: MERGE on a distributed table requires a constant filter on the distribution column of the target table -HINT: Consider adding AND target.dist_key = <> to the ON clause --- AND clause incorrect column (must be distribution column) -MERGE INTO dist_target -USING dist_colocated -ON dist_target.id = dist_colocated.id AND dist_target.val = 'const' -WHEN MATCHED THEN -UPDATE SET val = dist_colocated.val -WHEN NOT MATCHED THEN -INSERT VALUES(dist_colocated.id, dist_colocated.val); -ERROR: MERGE on a distributed table requires a constant filter on the distribution column of the target table -HINT: Consider adding AND target.dist_key = <> to the ON clause --- Both the source and target must be distributed -MERGE INTO dist_target -USING (SELECT 100 id) AS source -ON dist_target.id = source.id AND dist_target.val = 'const' -WHEN MATCHED THEN -UPDATE SET val = 'source' -WHEN NOT MATCHED THEN -INSERT VALUES(source.id, 'source'); -ERROR: For MERGE command, both the source and target must be distributed --- Non-hash distributed tables (append/range). -CREATE VIEW show_tables AS -SELECT logicalrelid, partmethod -FROM pg_dist_partition -WHERE (logicalrelid = 'dist_target'::regclass) OR (logicalrelid = 'dist_source'::regclass) -ORDER BY 1; -SELECT undistribute_table('dist_source'); -NOTICE: creating a new table for merge_schema.dist_source -NOTICE: moving the data of merge_schema.dist_source -NOTICE: dropping the old merge_schema.dist_source -NOTICE: drop cascades to view show_tables -CONTEXT: SQL statement "DROP TABLE merge_schema.dist_source CASCADE" -NOTICE: renaming the new table to merge_schema.dist_source - undistribute_table ---------------------------------------------------------------------- - -(1 row) - -SELECT create_distributed_table('dist_source', 'id', 'append'); - create_distributed_table ---------------------------------------------------------------------- - -(1 row) - -SELECT * FROM show_tables; - logicalrelid | partmethod ---------------------------------------------------------------------- - dist_target | h - dist_source | a -(2 rows) - -MERGE INTO dist_target -USING dist_source -ON dist_target.id = dist_source.id -WHEN MATCHED THEN -UPDATE SET val = dist_source.val -WHEN NOT MATCHED THEN -INSERT VALUES(dist_source.id, dist_source.val); -ERROR: For MERGE command, all the distributed tables must be colocated, for append/range distribution, colocation is not supported -HINT: Consider using hash distribution instead -SELECT undistribute_table('dist_source'); -NOTICE: creating a new table for merge_schema.dist_source -NOTICE: moving the data of merge_schema.dist_source -NOTICE: dropping the old merge_schema.dist_source -NOTICE: drop cascades to view show_tables -CONTEXT: SQL statement "DROP TABLE merge_schema.dist_source CASCADE" -NOTICE: renaming the new table to merge_schema.dist_source - undistribute_table ---------------------------------------------------------------------- - -(1 row) - -SELECT create_distributed_table('dist_source', 'id', 'range'); - create_distributed_table ---------------------------------------------------------------------- - -(1 row) - -SELECT * FROM show_tables; - logicalrelid | partmethod ---------------------------------------------------------------------- - dist_target | h - dist_source | r -(2 rows) - -MERGE INTO dist_target -USING dist_source -ON dist_target.id = dist_source.id -WHEN MATCHED THEN -UPDATE SET val = dist_source.val -WHEN NOT MATCHED THEN -INSERT VALUES(dist_source.id, dist_source.val); -ERROR: For MERGE command, all the distributed tables must be colocated, for append/range distribution, colocation is not supported -HINT: Consider using hash distribution instead --- Both are append tables -SELECT undistribute_table('dist_target'); -NOTICE: creating a new table for merge_schema.dist_target -NOTICE: moving the data of merge_schema.dist_target -NOTICE: dropping the old merge_schema.dist_target -NOTICE: drop cascades to view show_tables -CONTEXT: SQL statement "DROP TABLE merge_schema.dist_target CASCADE" -NOTICE: renaming the new table to merge_schema.dist_target - undistribute_table ---------------------------------------------------------------------- - -(1 row) - -SELECT undistribute_table('dist_source'); -NOTICE: creating a new table for merge_schema.dist_source -NOTICE: moving the data of merge_schema.dist_source -NOTICE: dropping the old merge_schema.dist_source -NOTICE: drop cascades to view show_tables -CONTEXT: SQL statement "DROP TABLE merge_schema.dist_source CASCADE" -NOTICE: renaming the new table to merge_schema.dist_source - undistribute_table ---------------------------------------------------------------------- - -(1 row) - -SELECT create_distributed_table('dist_target', 'id', 'append'); - create_distributed_table ---------------------------------------------------------------------- - -(1 row) - -SELECT create_distributed_table('dist_source', 'id', 'append'); - create_distributed_table ---------------------------------------------------------------------- - -(1 row) - -SELECT * FROM show_tables; - logicalrelid | partmethod ---------------------------------------------------------------------- - dist_target | a - dist_source | a -(2 rows) - -MERGE INTO dist_target -USING dist_source -ON dist_target.id = dist_source.id -WHEN MATCHED THEN -UPDATE SET val = dist_source.val -WHEN NOT MATCHED THEN -INSERT VALUES(dist_source.id, dist_source.val); -ERROR: For MERGE command, all the distributed tables must be colocated, for append/range distribution, colocation is not supported -HINT: Consider using hash distribution instead --- Both are range tables -SELECT undistribute_table('dist_target'); -NOTICE: creating a new table for merge_schema.dist_target -NOTICE: moving the data of merge_schema.dist_target -NOTICE: dropping the old merge_schema.dist_target -NOTICE: drop cascades to view show_tables -CONTEXT: SQL statement "DROP TABLE merge_schema.dist_target CASCADE" -NOTICE: renaming the new table to merge_schema.dist_target - undistribute_table ---------------------------------------------------------------------- - -(1 row) - -SELECT undistribute_table('dist_source'); -NOTICE: creating a new table for merge_schema.dist_source -NOTICE: moving the data of merge_schema.dist_source -NOTICE: dropping the old merge_schema.dist_source -NOTICE: drop cascades to view show_tables -CONTEXT: SQL statement "DROP TABLE merge_schema.dist_source CASCADE" -NOTICE: renaming the new table to merge_schema.dist_source - undistribute_table ---------------------------------------------------------------------- - -(1 row) - -SELECT create_distributed_table('dist_target', 'id', 'range'); - create_distributed_table ---------------------------------------------------------------------- - -(1 row) - -SELECT create_distributed_table('dist_source', 'id', 'range'); - create_distributed_table ---------------------------------------------------------------------- - -(1 row) - -SELECT * FROM show_tables; - logicalrelid | partmethod ---------------------------------------------------------------------- - dist_target | r - dist_source | r -(2 rows) - -MERGE INTO dist_target -USING dist_source -ON dist_target.id = dist_source.id -WHEN MATCHED THEN -UPDATE SET val = dist_source.val -WHEN NOT MATCHED THEN -INSERT VALUES(dist_source.id, dist_source.val); -ERROR: For MERGE command, all the distributed tables must be colocated, for append/range distribution, colocation is not supported -HINT: Consider using hash distribution instead DROP SERVER foreign_server CASCADE; NOTICE: drop cascades to 3 other objects DETAIL: drop cascades to user mapping for postgres on server foreign_server @@ -2163,9 +1519,8 @@ drop cascades to foreign table foreign_table NOTICE: foreign table "foreign_table_4000046" does not exist, skipping CONTEXT: SQL statement "SELECT citus_drop_all_shards(v_obj.objid, v_obj.schema_name, v_obj.object_name, drop_shards_metadata_only := false)" PL/pgSQL function citus_drop_trigger() line XX at PERFORM -DROP FUNCTION merge_when_and_write(); DROP SCHEMA merge_schema CASCADE; -NOTICE: drop cascades to 63 other objects +NOTICE: drop cascades to 56 other objects DETAIL: drop cascades to function insert_data() drop cascades to table pg_result drop cascades to table local_local @@ -2217,18 +1572,11 @@ drop cascades to table ft_target drop cascades to table ft_source_4000045 drop cascades to table ft_source drop cascades to extension postgres_fdw -drop cascades to table target_cj -drop cascades to table source_cj1 -drop cascades to table source_cj2 drop cascades to table pg -drop cascades to table t1_4000078 -drop cascades to table s1_4000079 +drop cascades to table t1_4000062 +drop cascades to table s1_4000063 drop cascades to table t1 drop cascades to table s1 -drop cascades to table dist_colocated -drop cascades to table dist_target -drop cascades to table dist_source -drop cascades to view show_tables SELECT 1 FROM master_remove_node('localhost', :master_port); ?column? --------------------------------------------------------------------- diff --git a/src/test/regress/expected/multi_modifications.out b/src/test/regress/expected/multi_modifications.out index 887003a97..5b5764593 100644 --- a/src/test/regress/expected/multi_modifications.out +++ b/src/test/regress/expected/multi_modifications.out @@ -177,7 +177,7 @@ INSERT INTO limit_orders VALUES (random() * 100, 'ORCL', 152, '2011-08-25 11:50: INSERT INTO limit_orders VALUES (2036, 'GOOG', 5634, now(), 'buy', random()); -- commands with mutable functions in their quals DELETE FROM limit_orders WHERE id = 246 AND bidder_id = (random() * 1000); -ERROR: functions used in the WHERE/ON/WHEN clause of modification queries on distributed tables must not be VOLATILE +ERROR: functions used in the WHERE clause of modification queries on distributed tables must not be VOLATILE -- commands with mutable but non-volatile functions(ie: stable func.) in their quals -- (the cast to timestamp is because the timestamp_eq_timestamptz operator is stable) DELETE FROM limit_orders WHERE id = 246 AND placed_at = current_timestamp::timestamp; diff --git a/src/test/regress/expected/multi_mx_modifications.out b/src/test/regress/expected/multi_mx_modifications.out index 9e053d3f2..276766c30 100644 --- a/src/test/regress/expected/multi_mx_modifications.out +++ b/src/test/regress/expected/multi_mx_modifications.out @@ -95,7 +95,7 @@ INSERT INTO limit_orders_mx VALUES (random() * 100, 'ORCL', 152, '2011-08-25 11: INSERT INTO limit_orders_mx VALUES (2036, 'GOOG', 5634, now(), 'buy', random()); -- commands with mutable functions in their quals DELETE FROM limit_orders_mx WHERE id = 246 AND bidder_id = (random() * 1000); -ERROR: functions used in the WHERE/ON/WHEN clause of modification queries on distributed tables must not be VOLATILE +ERROR: functions used in the WHERE clause of modification queries on distributed tables must not be VOLATILE -- commands with mutable but non-volatile functions(ie: stable func.) in their quals -- (the cast to timestamp is because the timestamp_eq_timestamptz operator is stable) DELETE FROM limit_orders_mx WHERE id = 246 AND placed_at = current_timestamp::timestamp; diff --git a/src/test/regress/expected/multi_shard_update_delete.out b/src/test/regress/expected/multi_shard_update_delete.out index af8ddfb2d..016801d26 100644 --- a/src/test/regress/expected/multi_shard_update_delete.out +++ b/src/test/regress/expected/multi_shard_update_delete.out @@ -674,7 +674,7 @@ UPDATE users_test_table SET value_2 = 5 FROM events_test_table WHERE users_test_table.user_id = events_test_table.user_id * random(); -ERROR: functions used in the WHERE/ON/WHEN clause of modification queries on distributed tables must not be VOLATILE +ERROR: functions used in the WHERE clause of modification queries on distributed tables must not be VOLATILE UPDATE users_test_table SET value_2 = 5 * random() FROM events_test_table diff --git a/src/test/regress/expected/pg15.out b/src/test/regress/expected/pg15.out index d92686b93..7a41b25ec 100644 --- a/src/test/regress/expected/pg15.out +++ b/src/test/regress/expected/pg15.out @@ -315,7 +315,7 @@ SELECT create_reference_table('tbl2'); MERGE INTO tbl1 USING tbl2 ON (true) WHEN MATCHED THEN DELETE; -ERROR: MERGE command is not supported on reference tables yet +ERROR: MERGE command is not supported on distributed/reference tables yet -- now, both are reference, still not supported SELECT create_reference_table('tbl1'); create_reference_table @@ -325,7 +325,7 @@ SELECT create_reference_table('tbl1'); MERGE INTO tbl1 USING tbl2 ON (true) WHEN MATCHED THEN DELETE; -ERROR: MERGE command is not supported on reference tables yet +ERROR: MERGE command is not supported on distributed/reference tables yet -- now, both distributed, not works SELECT undistribute_table('tbl1'); NOTICE: creating a new table for pg15.tbl1 @@ -419,14 +419,14 @@ SELECT create_distributed_table('tbl2', 'x'); MERGE INTO tbl1 USING tbl2 ON (true) WHEN MATCHED THEN DELETE; -ERROR: MERGE command is only supported when distributed tables are joined on their distribution column +ERROR: MERGE command is not supported on distributed/reference tables yet -- also, not inside subqueries & ctes WITH targq AS ( SELECT * FROM tbl2 ) MERGE INTO tbl1 USING targq ON (true) WHEN MATCHED THEN DELETE; -ERROR: MERGE command is only supported when distributed tables are joined on their distribution column +ERROR: MERGE command is not supported on distributed/reference tables yet -- crashes on beta3, fixed on 15 stable --WITH foo AS ( -- MERGE INTO tbl1 USING tbl2 ON (true) @@ -441,7 +441,7 @@ USING tbl2 ON (true) WHEN MATCHED THEN UPDATE SET x = (SELECT count(*) FROM tbl2); -ERROR: MERGE command is only supported when distributed tables are joined on their distribution column +ERROR: MERGE command is not supported on distributed/reference tables yet -- test numeric types with negative scale CREATE TABLE numeric_negative_scale(numeric_column numeric(3,-1), orig_value int); INSERT into numeric_negative_scale SELECT x,x FROM generate_series(111, 115) x; diff --git a/src/test/regress/expected/pgmerge.out b/src/test/regress/expected/pgmerge.out index 89c3f85ca..b90760691 100644 --- a/src/test/regress/expected/pgmerge.out +++ b/src/test/regress/expected/pgmerge.out @@ -910,15 +910,7 @@ MERGE INTO wq_target t USING wq_source s ON t.tid = s.sid WHEN MATCHED AND (merge_when_and_write()) THEN UPDATE SET balance = t.balance + s.balance; -ERROR: functions used in the WHERE/ON/WHEN clause of modification queries on distributed tables must not be VOLATILE -ROLLBACK; --- Test preventing ON condition from writing to the database -BEGIN; -MERGE INTO wq_target t -USING wq_source s ON t.tid = s.sid AND (merge_when_and_write()) -WHEN MATCHED THEN - UPDATE SET balance = t.balance + s.balance; -ERROR: functions used in the WHERE/ON/WHEN clause of modification queries on distributed tables must not be VOLATILE +ERROR: functions used in UPDATE queries on distributed tables must not be VOLATILE ROLLBACK; drop function merge_when_and_write(); DROP TABLE wq_target, wq_source; diff --git a/src/test/regress/sql/merge.sql b/src/test/regress/sql/merge.sql index 539e9814d..c266b5333 100644 --- a/src/test/regress/sql/merge.sql +++ b/src/test/regress/sql/merge.sql @@ -19,7 +19,6 @@ SET search_path TO merge_schema; SET citus.shard_count TO 4; SET citus.next_shard_id TO 4000000; SET citus.explain_all_tasks to true; -SET citus.shard_replication_factor TO 1; SELECT 1 FROM master_add_node('localhost', :master_port, groupid => 0); CREATE TABLE source @@ -144,13 +143,9 @@ SELECT undistribute_table('source'); SELECT create_distributed_table('target', 'customer_id'); SELECT create_distributed_table('source', 'customer_id'); --- Updates one of the row with customer_id = 30002 -SELECT * from target t WHERE t.customer_id = 30002; --- Turn on notice to print tasks sent to nodes (it should be a single task) -SET citus.log_remote_commands to true; MERGE INTO target t USING source s - ON (t.customer_id = s.customer_id) AND t.customer_id = 30002 + ON (t.customer_id = s.customer_id) WHEN MATCHED AND t.order_center = 'XX' THEN DELETE @@ -163,27 +158,6 @@ MERGE INTO target t WHEN NOT MATCHED THEN -- New entry, record it. INSERT (customer_id, last_order_id, order_center, order_count, last_order) VALUES (customer_id, s.order_id, s.order_center, 123, s.order_time); -SET citus.log_remote_commands to false; -SELECT * from target t WHERE t.customer_id = 30002; - --- Deletes one of the row with customer_id = 30004 -SELECT * from target t WHERE t.customer_id = 30004; -MERGE INTO target t - USING source s - ON (t.customer_id = s.customer_id) AND t.customer_id = 30004 - - WHEN MATCHED AND t.order_center = 'XX' THEN - DELETE - - WHEN MATCHED THEN - UPDATE SET -- Existing customer, update the order count and last_order_id - order_count = t.order_count + 1, - last_order_id = s.order_id - - WHEN NOT MATCHED THEN -- New entry, record it. - INSERT (customer_id, last_order_id, order_center, order_count, last_order) - VALUES (customer_id, s.order_id, s.order_center, 123, s.order_time); -SELECT * from target t WHERE t.customer_id = 30004; -- -- Test MERGE with CTE as source @@ -269,13 +243,11 @@ SELECT create_distributed_table('t1', 'id'); SELECT create_distributed_table('s1', 'id'); -SELECT * FROM t1 order by id; -SET citus.log_remote_commands to true; WITH s1_res AS ( SELECT * FROM s1 ) MERGE INTO t1 - USING s1_res ON (s1_res.id = t1.id) AND t1.id = 6 + USING s1_res ON (s1_res.id = t1.id) WHEN MATCHED AND s1_res.val = 0 THEN DELETE @@ -283,9 +255,6 @@ MERGE INTO t1 UPDATE SET val = t1.val + 1 WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s1_res.id, s1_res.val); -SET citus.log_remote_commands to false; --- As the id 6 is NO match, VALUES(6, 1) should appear in target -SELECT * FROM t1 order by id; -- -- Test with multiple join conditions @@ -356,21 +325,15 @@ SELECT undistribute_table('s2'); SELECT create_distributed_table('t2', 'id'); SELECT create_distributed_table('s2', 'id'); -SELECT * FROM t2 ORDER BY 1; -SET citus.log_remote_commands to true; MERGE INTO t2 USING s2 -ON t2.id = s2.id AND t2.src = s2.src AND t2.id = 4 +ON t2.id = s2.id AND t2.src = s2.src WHEN MATCHED AND t2.val = 1 THEN UPDATE SET val = s2.val + 10 WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (id, val, src) VALUES (s2.id, s2.val, s2.src); -SET citus.log_remote_commands to false; --- Row with id = 4 is a match for delete clause, row should be deleted --- Row with id = 3 is a NO match, row from source will be inserted -SELECT * FROM t2 ORDER BY 1; -- -- With sub-query as the MERGE source @@ -861,159 +824,10 @@ RESET client_min_messages; SELECT * FROM ft_target; --- --- complex joins on the source side --- - --- source(join of two relations) relation is an unaliased join - -CREATE TABLE target_cj(tid int, src text, val int); -CREATE TABLE source_cj1(sid1 int, src1 text, val1 int); -CREATE TABLE source_cj2(sid2 int, src2 text, val2 int); - -INSERT INTO target_cj VALUES (1, 'target', 0); -INSERT INTO target_cj VALUES (2, 'target', 0); -INSERT INTO target_cj VALUES (2, 'target', 0); -INSERT INTO target_cj VALUES (3, 'target', 0); - -INSERT INTO source_cj1 VALUES (2, 'source-1', 10); -INSERT INTO source_cj2 VALUES (2, 'source-2', 20); - -BEGIN; -MERGE INTO target_cj t -USING source_cj1 s1 INNER JOIN source_cj2 s2 ON sid1 = sid2 -ON t.tid = sid1 AND t.tid = 2 -WHEN MATCHED THEN - UPDATE SET src = src2 -WHEN NOT MATCHED THEN - DO NOTHING; --- Gold result to compare against -SELECT * FROM target_cj ORDER BY 1; -ROLLBACK; - -BEGIN; --- try accessing columns from either side of the source join -MERGE INTO target_cj t -USING source_cj1 s2 - INNER JOIN source_cj2 s1 ON sid1 = sid2 AND val1 = 10 -ON t.tid = sid1 AND t.tid = 2 -WHEN MATCHED THEN - UPDATE SET tid = sid2, src = src1, val = val2 -WHEN NOT MATCHED THEN - DO NOTHING; --- Gold result to compare against -SELECT * FROM target_cj ORDER BY 1; -ROLLBACK; - --- Test the same scenarios with distributed tables - -SELECT create_distributed_table('target_cj', 'tid'); -SELECT create_distributed_table('source_cj1', 'sid1'); -SELECT create_distributed_table('source_cj2', 'sid2'); - -BEGIN; -SET citus.log_remote_commands to true; -MERGE INTO target_cj t -USING source_cj1 s1 INNER JOIN source_cj2 s2 ON sid1 = sid2 -ON t.tid = sid1 AND t.tid = 2 -WHEN MATCHED THEN - UPDATE SET src = src2 -WHEN NOT MATCHED THEN - DO NOTHING; -SET citus.log_remote_commands to false; -SELECT * FROM target_cj ORDER BY 1; -ROLLBACK; - -BEGIN; --- try accessing columns from either side of the source join -MERGE INTO target_cj t -USING source_cj1 s2 - INNER JOIN source_cj2 s1 ON sid1 = sid2 AND val1 = 10 -ON t.tid = sid1 AND t.tid = 2 -WHEN MATCHED THEN - UPDATE SET src = src1, val = val2 -WHEN NOT MATCHED THEN - DO NOTHING; -SELECT * FROM target_cj ORDER BY 1; -ROLLBACK; - --- sub-query as a source -BEGIN; -MERGE INTO target_cj t -USING (SELECT * FROM source_cj1 WHERE sid1 = 2) sub -ON t.tid = sub.sid1 AND t.tid = 2 -WHEN MATCHED THEN - UPDATE SET src = sub.src1, val = val1 -WHEN NOT MATCHED THEN - DO NOTHING; -SELECT * FROM target_cj ORDER BY 1; -ROLLBACK; - --- Test self-join -BEGIN; -SELECT * FROM target_cj ORDER BY 1; -set citus.log_remote_commands to true; -MERGE INTO target_cj t1 -USING (SELECT * FROM target_cj) sub -ON t1.tid = sub.tid AND t1.tid = 3 -WHEN MATCHED THEN - UPDATE SET src = sub.src, val = sub.val + 100 -WHEN NOT MATCHED THEN - DO NOTHING; -set citus.log_remote_commands to false; -SELECT * FROM target_cj ORDER BY 1; -ROLLBACK; - - --- Test PREPARE -PREPARE foo(int) AS -MERGE INTO target_cj target -USING (SELECT * FROM source_cj1) sub -ON target.tid = sub.sid1 AND target.tid = $1 -WHEN MATCHED THEN - UPDATE SET val = sub.val1 -WHEN NOT MATCHED THEN - DO NOTHING; - -SELECT * FROM target_cj ORDER BY 1; - -BEGIN; -EXECUTE foo(2); -EXECUTE foo(2); -EXECUTE foo(2); -EXECUTE foo(2); -EXECUTE foo(2); -SELECT * FROM target_cj ORDER BY 1; -ROLLBACK; - -BEGIN; - -SET citus.log_remote_commands to true; -SET client_min_messages TO DEBUG1; -EXECUTE foo(2); -RESET client_min_messages; - -EXECUTE foo(2); -SET citus.log_remote_commands to false; - -SELECT * FROM target_cj ORDER BY 1; -ROLLBACK; - -- -- Error and Unsupported scenarios -- --- try updating the distribution key column -BEGIN; -MERGE INTO target_cj t - USING source_cj1 s - ON t.tid = s.sid1 AND t.tid = 2 - WHEN MATCHED THEN - UPDATE SET tid = tid + 9, src = src || ' updated by merge' - WHEN NOT MATCHED THEN - INSERT VALUES (sid1, 'inserted by merge', val1); -ROLLBACK; - -- Foreign table as target MERGE INTO foreign_table USING ft_target ON (foreign_table.id = ft_target.id) @@ -1040,38 +854,6 @@ MERGE INTO t1 WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s1.id, s1.val); --- Now both s1 and t1 are distributed tables -SELECT undistribute_table('t1'); -SELECT create_distributed_table('t1', 'id'); - --- We have a potential pitfall where a function can be invoked in --- the MERGE conditions which can insert/update to a random shard -CREATE OR REPLACE function merge_when_and_write() RETURNS BOOLEAN -LANGUAGE PLPGSQL AS -$$ -BEGIN - INSERT INTO t1 VALUES (100, 100); - RETURN TRUE; -END; -$$; - --- Test preventing "ON" join condition from writing to the database -BEGIN; -MERGE INTO t1 -USING s1 ON t1.id = s1.id AND t1.id = 2 AND (merge_when_and_write()) -WHEN MATCHED THEN - UPDATE SET val = t1.val + s1.val; -ROLLBACK; - --- Test preventing WHEN clause(s) from writing to the database -BEGIN; -MERGE INTO t1 -USING s1 ON t1.id = s1.id AND t1.id = 2 -WHEN MATCHED AND (merge_when_and_write()) THEN - UPDATE SET val = t1.val + s1.val; -ROLLBACK; - - -- Joining on partition columns with sub-query MERGE INTO t1 USING (SELECT * FROM s1) sub ON (sub.val = t1.id) -- sub.val is not a distribution column @@ -1215,132 +997,6 @@ WHEN MATCHED THEN WHEN NOT MATCHED THEN INSERT VALUES(mv_source.id, mv_source.val); --- Distributed tables *must* be colocated -CREATE TABLE dist_target(id int, val varchar); -SELECT create_distributed_table('dist_target', 'id'); -CREATE TABLE dist_source(id int, val varchar); -SELECT create_distributed_table('dist_source', 'id', colocate_with => 'none'); - -MERGE INTO dist_target -USING dist_source -ON dist_target.id = dist_source.id -WHEN MATCHED THEN -UPDATE SET val = dist_source.val -WHEN NOT MATCHED THEN -INSERT VALUES(dist_source.id, dist_source.val); - --- Distributed tables *must* be joined on distribution column -CREATE TABLE dist_colocated(id int, val int); -SELECT create_distributed_table('dist_colocated', 'id', colocate_with => 'dist_target'); - -MERGE INTO dist_target -USING dist_colocated -ON dist_target.id = dist_colocated.val -- val is not the distribution column -WHEN MATCHED THEN -UPDATE SET val = dist_colocated.val -WHEN NOT MATCHED THEN -INSERT VALUES(dist_colocated.id, dist_colocated.val); - --- MERGE command must be joined with with a constant qual on target relation - --- AND clause is missing -MERGE INTO dist_target -USING dist_colocated -ON dist_target.id = dist_colocated.id -WHEN MATCHED THEN -UPDATE SET val = dist_colocated.val -WHEN NOT MATCHED THEN -INSERT VALUES(dist_colocated.id, dist_colocated.val); - --- AND clause incorrect table (must be target) -MERGE INTO dist_target -USING dist_colocated -ON dist_target.id = dist_colocated.id AND dist_colocated.id = 1 -WHEN MATCHED THEN -UPDATE SET val = dist_colocated.val -WHEN NOT MATCHED THEN -INSERT VALUES(dist_colocated.id, dist_colocated.val); - --- AND clause incorrect column (must be distribution column) -MERGE INTO dist_target -USING dist_colocated -ON dist_target.id = dist_colocated.id AND dist_target.val = 'const' -WHEN MATCHED THEN -UPDATE SET val = dist_colocated.val -WHEN NOT MATCHED THEN -INSERT VALUES(dist_colocated.id, dist_colocated.val); - --- Both the source and target must be distributed -MERGE INTO dist_target -USING (SELECT 100 id) AS source -ON dist_target.id = source.id AND dist_target.val = 'const' -WHEN MATCHED THEN -UPDATE SET val = 'source' -WHEN NOT MATCHED THEN -INSERT VALUES(source.id, 'source'); - --- Non-hash distributed tables (append/range). -CREATE VIEW show_tables AS -SELECT logicalrelid, partmethod -FROM pg_dist_partition -WHERE (logicalrelid = 'dist_target'::regclass) OR (logicalrelid = 'dist_source'::regclass) -ORDER BY 1; - -SELECT undistribute_table('dist_source'); -SELECT create_distributed_table('dist_source', 'id', 'append'); -SELECT * FROM show_tables; - -MERGE INTO dist_target -USING dist_source -ON dist_target.id = dist_source.id -WHEN MATCHED THEN -UPDATE SET val = dist_source.val -WHEN NOT MATCHED THEN -INSERT VALUES(dist_source.id, dist_source.val); - -SELECT undistribute_table('dist_source'); -SELECT create_distributed_table('dist_source', 'id', 'range'); -SELECT * FROM show_tables; - -MERGE INTO dist_target -USING dist_source -ON dist_target.id = dist_source.id -WHEN MATCHED THEN -UPDATE SET val = dist_source.val -WHEN NOT MATCHED THEN -INSERT VALUES(dist_source.id, dist_source.val); - --- Both are append tables -SELECT undistribute_table('dist_target'); -SELECT undistribute_table('dist_source'); -SELECT create_distributed_table('dist_target', 'id', 'append'); -SELECT create_distributed_table('dist_source', 'id', 'append'); -SELECT * FROM show_tables; - -MERGE INTO dist_target -USING dist_source -ON dist_target.id = dist_source.id -WHEN MATCHED THEN -UPDATE SET val = dist_source.val -WHEN NOT MATCHED THEN -INSERT VALUES(dist_source.id, dist_source.val); - --- Both are range tables -SELECT undistribute_table('dist_target'); -SELECT undistribute_table('dist_source'); -SELECT create_distributed_table('dist_target', 'id', 'range'); -SELECT create_distributed_table('dist_source', 'id', 'range'); -SELECT * FROM show_tables; - -MERGE INTO dist_target -USING dist_source -ON dist_target.id = dist_source.id -WHEN MATCHED THEN -UPDATE SET val = dist_source.val -WHEN NOT MATCHED THEN -INSERT VALUES(dist_source.id, dist_source.val); - DROP SERVER foreign_server CASCADE; -DROP FUNCTION merge_when_and_write(); DROP SCHEMA merge_schema CASCADE; SELECT 1 FROM master_remove_node('localhost', :master_port); diff --git a/src/test/regress/sql/pgmerge.sql b/src/test/regress/sql/pgmerge.sql index 83bf01a68..6842f516a 100644 --- a/src/test/regress/sql/pgmerge.sql +++ b/src/test/regress/sql/pgmerge.sql @@ -608,14 +608,6 @@ USING wq_source s ON t.tid = s.sid WHEN MATCHED AND (merge_when_and_write()) THEN UPDATE SET balance = t.balance + s.balance; ROLLBACK; - --- Test preventing ON condition from writing to the database -BEGIN; -MERGE INTO wq_target t -USING wq_source s ON t.tid = s.sid AND (merge_when_and_write()) -WHEN MATCHED THEN - UPDATE SET balance = t.balance + s.balance; -ROLLBACK; drop function merge_when_and_write(); DROP TABLE wq_target, wq_source;