From 44c387b978a51b0c0e87c7f9aec154cfc3041da1 Mon Sep 17 00:00:00 2001 From: Teja Mupparti Date: Wed, 21 Dec 2022 19:33:13 -0800 Subject: [PATCH] Support MERGE on distributed tables with restrictions MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This implements phase II of MERGE SQL support. Support routable queries where all the tables in the MERGE statement are distributed, co-located, and both the source and target relations are joined on the distribution column with a constant qual. This should be a Citus single-task query. Below is an example. SELECT create_distributed_table('t1', 'id'); SELECT create_distributed_table('s1', 'id', colocate_with => 't1'); MERGE INTO t1 USING s1 ON t1.id = s1.id AND t1.id = 100 WHEN MATCHED THEN UPDATE SET val = s1.val + 10 WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (id, val, src) VALUES (s1.id, s1.val, s1.src) Basically, MERGE checks that: - There are a minimum of two distributed tables (a source and a target). - All the distributed tables are indeed colocated. - The MERGE relations are joined on the distribution column: MERGE .. USING .. ON target.dist_key = source.dist_key - The query touches only a single shard, i.e. the JOIN is ANDed with a constant qual: MERGE .. USING .. ON target.dist_key = source.dist_key AND target.dist_key = <> If any of the conditions are not met, it raises an exception. 
--- .../distributed/deparser/ruleutils_15.c | 13 +- .../distributed/planner/distributed_planner.c | 194 +---- .../planner/fast_path_router_planner.c | 4 +- .../planner/multi_physical_planner.c | 73 +- .../planner/multi_router_planner.c | 698 ++++++++++++++---- .../relation_restriction_equivalence.c | 31 +- src/include/distributed/distributed_planner.h | 4 + .../distributed/multi_router_planner.h | 1 + .../relation_restriction_equivalence.h | 11 + src/test/regress/expected/merge.out | 690 ++++++++++++++++- .../regress/expected/multi_modifications.out | 2 +- .../expected/multi_mx_modifications.out | 2 +- .../expected/multi_shard_update_delete.out | 2 +- src/test/regress/expected/pg15.out | 10 +- src/test/regress/expected/pgmerge.out | 10 +- src/test/regress/sql/merge.sql | 350 ++++++++- src/test/regress/sql/pgmerge.sql | 8 + 17 files changed, 1719 insertions(+), 384 deletions(-) diff --git a/src/backend/distributed/deparser/ruleutils_15.c b/src/backend/distributed/deparser/ruleutils_15.c index 6dabacd49..139b2a3fd 100644 --- a/src/backend/distributed/deparser/ruleutils_15.c +++ b/src/backend/distributed/deparser/ruleutils_15.c @@ -53,6 +53,7 @@ #include "common/keywords.h" #include "distributed/citus_nodefuncs.h" #include "distributed/citus_ruleutils.h" +#include "distributed/multi_router_planner.h" #include "executor/spi.h" #include "foreign/foreign.h" #include "funcapi.h" @@ -3723,7 +3724,6 @@ static void get_merge_query_def(Query *query, deparse_context *context) { StringInfo buf = context->buf; - RangeTblEntry *targetRte; /* Insert the WITH clause if given */ get_with_clause(query, context); @@ -3731,7 +3731,7 @@ get_merge_query_def(Query *query, deparse_context *context) /* * Start the query with MERGE INTO */ - targetRte = rt_fetch(query->resultRelation, query->rtable); + RangeTblEntry *targetRte = ExtractResultRelationRTE(query); if (PRETTY_INDENT(context)) { @@ -3853,6 +3853,15 @@ get_merge_query_def(Query *query, deparse_context *context) } } + /* + * 
RETURNING is not supported in MERGE, so it must be NULL, but if PG adds it later + * we might miss it, let's raise an exception to investigate. + */ + if (unlikely(query->returningList)) + { + elog(ERROR, "Unexpected RETURNING clause in MERGE"); + } + ereport(DEBUG1, (errmsg("", buf->data))); } diff --git a/src/backend/distributed/planner/distributed_planner.c b/src/backend/distributed/planner/distributed_planner.c index cca86730c..1f768fb5d 100644 --- a/src/backend/distributed/planner/distributed_planner.c +++ b/src/backend/distributed/planner/distributed_planner.c @@ -75,9 +75,6 @@ static uint64 NextPlanId = 1; /* keep track of planner call stack levels */ int PlannerLevel = 0; -static void ErrorIfQueryHasUnsupportedMergeCommand(Query *queryTree, - List *rangeTableList); -static bool ContainsMergeCommandWalker(Node *node); static bool ListContainsDistributedTableRTE(List *rangeTableList, bool *maybeHasForeignDistributedTable); static bool IsUpdateOrDelete(Query *query); @@ -132,7 +129,7 @@ static PlannedStmt * PlanDistributedStmt(DistributedPlanningContext *planContext static RTEListProperties * GetRTEListProperties(List *rangeTableList); static List * TranslatedVars(PlannerInfo *root, int relationIndex); static void WarnIfListHasForeignDistributedTable(List *rangeTableList); -static void ErrorIfMergeHasUnsupportedTables(Query *parse, List *rangeTableList); + /* Distributed planner hook */ PlannedStmt * @@ -200,12 +197,6 @@ distributed_planner(Query *parse, if (!fastPathRouterQuery) { - /* - * Fast path queries cannot have merge command, and we - * prevent the remaining here. 
- */ - ErrorIfQueryHasUnsupportedMergeCommand(parse, rangeTableList); - /* * When there are partitioned tables (not applicable to fast path), * pretend that they are regular tables to avoid unnecessary work @@ -304,44 +295,11 @@ distributed_planner(Query *parse, } -/* - * ErrorIfQueryHasUnsupportedMergeCommand walks over the query tree and bails out - * if there is no Merge command (e.g., CMD_MERGE) in the query tree. For merge, - * looks for all supported combinations, throws an exception if any violations - * are seen. - */ -static void -ErrorIfQueryHasUnsupportedMergeCommand(Query *queryTree, List *rangeTableList) -{ - /* - * Postgres currently doesn't support Merge queries inside subqueries and - * ctes, but lets be defensive and do query tree walk anyway. - * - * We do not call this path for fast-path queries to avoid this additional - * overhead. - */ - if (!ContainsMergeCommandWalker((Node *) queryTree)) - { - /* No MERGE found */ - return; - } - - - /* - * In Citus we have limited support for MERGE, it's allowed - * only if all the tables(target, source or any CTE) tables - * are are local i.e. a combination of Citus local and Non-Citus - * tables (regular Postgres tables). - */ - ErrorIfMergeHasUnsupportedTables(queryTree, rangeTableList); -} - - /* * ContainsMergeCommandWalker walks over the node and finds if there are any * Merge command (e.g., CMD_MERGE) in the node. 
*/ -static bool +bool ContainsMergeCommandWalker(Node *node) { #if PG_VERSION_NUM < PG_VERSION_15 @@ -676,7 +634,8 @@ bool IsUpdateOrDelete(Query *query) { return query->commandType == CMD_UPDATE || - query->commandType == CMD_DELETE; + query->commandType == CMD_DELETE || + query->commandType == CMD_MERGE; } @@ -2587,148 +2546,3 @@ WarnIfListHasForeignDistributedTable(List *rangeTableList) } } } - - -/* - * IsMergeAllowedOnRelation takes a relation entry and checks if MERGE command is - * permitted on special relations, such as materialized view, returns true only if - * it's a "source" relation. - */ -bool -IsMergeAllowedOnRelation(Query *parse, RangeTblEntry *rte) -{ - if (!IsMergeQuery(parse)) - { - return false; - } - - RangeTblEntry *targetRte = rt_fetch(parse->resultRelation, parse->rtable); - - /* Is it a target relation? */ - if (targetRte->relid == rte->relid) - { - return false; - } - - return true; -} - - -/* - * ErrorIfMergeHasUnsupportedTables checks if all the tables(target, source or any CTE - * present) in the MERGE command are local i.e. a combination of Citus local and Non-Citus - * tables (regular Postgres tables), raises an exception for all other combinations. - */ -static void -ErrorIfMergeHasUnsupportedTables(Query *parse, List *rangeTableList) -{ - ListCell *tableCell = NULL; - - foreach(tableCell, rangeTableList) - { - RangeTblEntry *rangeTableEntry = (RangeTblEntry *) lfirst(tableCell); - Oid relationId = rangeTableEntry->relid; - - switch (rangeTableEntry->rtekind) - { - case RTE_RELATION: - { - /* Check the relation type */ - break; - } - - case RTE_SUBQUERY: - case RTE_FUNCTION: - case RTE_TABLEFUNC: - case RTE_VALUES: - case RTE_JOIN: - case RTE_CTE: - { - /* Skip them as base table(s) will be checked */ - continue; - } - - /* - * RTE_NAMEDTUPLESTORE is typically used in ephmeral named relations, - * such as, trigger data; until we find a genuine use case, raise an - * exception. 
- * RTE_RESULT is a node added by the planner and we shouldn't - * encounter it in the parse tree. - */ - case RTE_NAMEDTUPLESTORE: - case RTE_RESULT: - { - ereport(ERROR, - (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("MERGE command is not supported with " - "Tuplestores and results"))); - break; - } - - default: - { - ereport(ERROR, - (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("MERGE command: Unrecognized range table entry."))); - } - } - - /* RTE Relation can be of various types, check them now */ - - /* skip the regular views as they are replaced with subqueries */ - if (rangeTableEntry->relkind == RELKIND_VIEW) - { - continue; - } - - if (rangeTableEntry->relkind == RELKIND_MATVIEW || - rangeTableEntry->relkind == RELKIND_FOREIGN_TABLE) - { - /* Materialized view or Foreign table as target is not allowed */ - if (IsMergeAllowedOnRelation(parse, rangeTableEntry)) - { - /* Non target relation is ok */ - continue; - } - else - { - ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("MERGE command is not allowed " - "on materialized view"))); - } - } - - if (rangeTableEntry->relkind != RELKIND_RELATION && - rangeTableEntry->relkind != RELKIND_PARTITIONED_TABLE) - { - ereport(ERROR, - (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("Unexpected relation type(relkind:%c) in MERGE command", - rangeTableEntry->relkind))); - } - - Assert(rangeTableEntry->relid != 0); - - /* Distributed tables and Reference tables are not supported yet */ - if (IsCitusTableType(relationId, REFERENCE_TABLE) || - IsCitusTableType(relationId, DISTRIBUTED_TABLE)) - { - ereport(ERROR, - (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("MERGE command is not supported on " - "distributed/reference tables yet"))); - } - - /* Regular Postgres tables and Citus local tables are allowed */ - if (!IsCitusTable(relationId) || - IsCitusTableType(relationId, CITUS_LOCAL_TABLE)) - { - continue; - } - - - /* Any other Citus table type missing ? 
*/ - } - - /* All the tables are local, supported */ -} diff --git a/src/backend/distributed/planner/fast_path_router_planner.c b/src/backend/distributed/planner/fast_path_router_planner.c index b947c036f..f585e2494 100644 --- a/src/backend/distributed/planner/fast_path_router_planner.c +++ b/src/backend/distributed/planner/fast_path_router_planner.c @@ -54,8 +54,6 @@ bool EnableFastPathRouterPlanner = true; static bool ColumnAppearsMultipleTimes(Node *quals, Var *distributionKey); -static bool ConjunctionContainsColumnFilter(Node *node, Var *column, - Node **distributionKeyValue); static bool DistKeyInSimpleOpExpression(Expr *clause, Var *distColumn, Node **distributionKeyValue); @@ -294,7 +292,7 @@ ColumnAppearsMultipleTimes(Node *quals, Var *distributionKey) * * If the conjuction contains column filter which is const, distributionKeyValue is set. */ -static bool +bool ConjunctionContainsColumnFilter(Node *node, Var *column, Node **distributionKeyValue) { if (node == NULL) diff --git a/src/backend/distributed/planner/multi_physical_planner.c b/src/backend/distributed/planner/multi_physical_planner.c index a2590d48d..6e237b546 100644 --- a/src/backend/distributed/planner/multi_physical_planner.c +++ b/src/backend/distributed/planner/multi_physical_planner.c @@ -164,6 +164,7 @@ static uint32 HashPartitionCount(void); static Job * BuildJobTreeTaskList(Job *jobTree, PlannerRestrictionContext *plannerRestrictionContext); static bool IsInnerTableOfOuterJoin(RelationRestriction *relationRestriction); +static bool IsOuterTableOfOuterJoin(RelationRestriction *relationRestriction); static void ErrorIfUnsupportedShardDistribution(Query *query); static Task * QueryPushdownTaskCreate(Query *originalQuery, int shardIndex, RelationRestrictionContext *restrictionContext, @@ -2225,19 +2226,34 @@ QueryPushdownSqlTaskList(Query *query, uint64 jobId, } /* - * For left joins we don't care about the shards pruned for the right hand side. 
- * If the right hand side would prune to a smaller set we should still send it to - * all tables of the left hand side. However if the right hand side is bigger than - * the left hand side we don't have to send the query to any shard that is not - * matching anything on the left hand side. - * - * Instead we will simply skip any RelationRestriction if it is an OUTER join and - * the table is part of the non-outer side of the join. + * Skip adding shards of non-target (outer)relations. + * Note: This is a stop-gap arrangement for phase-I where in sql + * generates a single task on the shard identified by constant + * qual(filter) on the target relation. */ - if (IsInnerTableOfOuterJoin(relationRestriction)) + if (IsMergeQuery(query) && + IsOuterTableOfOuterJoin(relationRestriction)) { continue; } + else if (!IsMergeQuery(query) && + IsInnerTableOfOuterJoin(relationRestriction)) + { + /* + * For left joins we don't care about the shards pruned for + * the right hand side. If the right hand side would prune + * to a smaller set we should still send it to all tables + * of the left hand side. However if the right hand side is + * bigger than the left hand side we don't have to send the + * query to any shard that is not matching anything on the + * left hand side. + * + * Instead we will simply skip any RelationRestriction if it + * is an OUTER join and the table is part of the non-outer + * side of the join. 
+ */ + continue; + } ShardInterval *shardInterval = NULL; foreach_ptr(shardInterval, prunedShardList) @@ -2302,6 +2318,45 @@ QueryPushdownSqlTaskList(Query *query, uint64 jobId, } +/* + * IsOuterTableOfOuterJoin tests based on the join information envoded in a + * RelationRestriction if the table accessed for this relation is + * a) in an outer join + * b) on the outer part of said join + * + * The function returns true only if both conditions above hold true + */ +static bool +IsOuterTableOfOuterJoin(RelationRestriction *relationRestriction) +{ + RestrictInfo *joinInfo = NULL; + foreach_ptr(joinInfo, relationRestriction->relOptInfo->joininfo) + { + if (joinInfo->outer_relids == NULL) + { + /* not an outer join */ + continue; + } + + /* + * This join restriction info describes an outer join, we need to figure out if + * our table is in the outer part of this join. If that is the case this is a + * outer table of an outer join. + */ + bool isInOuter = bms_is_member(relationRestriction->relOptInfo->relid, + joinInfo->outer_relids); + if (isInOuter) + { + /* this table is joined in the outer part of an outer join */ + return true; + } + } + + /* we have not found any join clause that satisfies both requirements */ + return false; +} + + /* * IsInnerTableOfOuterJoin tests based on the join information envoded in a * RelationRestriction if the table accessed for this relation is diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c index 631322e80..16cf7926b 100644 --- a/src/backend/distributed/planner/multi_router_planner.c +++ b/src/backend/distributed/planner/multi_router_planner.c @@ -121,7 +121,6 @@ static void CreateSingleTaskRouterSelectPlan(DistributedPlan *distributedPlan, Query *query, PlannerRestrictionContext * plannerRestrictionContext); -static Oid ResultRelationOidForQuery(Query *query); static bool IsTidColumn(Node *node); static DeferredErrorMessage * 
ModifyPartialQuerySupported(Query *queryTree, bool multiShardQuery, @@ -180,6 +179,24 @@ static void ReorderTaskPlacementsByTaskAssignmentPolicy(Job *job, static bool ModifiesLocalTableWithRemoteCitusLocalTable(List *rangeTableList); static DeferredErrorMessage * DeferErrorIfUnsupportedLocalTableJoin(List *rangeTableList); static bool IsLocallyAccessibleCitusLocalTable(Oid relationId); +static bool QueryHasMergeCommand(Query *queryTree); +static DeferredErrorMessage * MergeQuerySupported(Query *originalQuery, + PlannerRestrictionContext * + plannerRestrictionContext); +static DeferredErrorMessage * ErrorIfMergeHasUnsupportedTables(Query *parse, + List *rangeTableList, + PlannerRestrictionContext * + restrictionContext); +static DeferredErrorMessage * ErrorIfDistTablesNotColocated(Query *parse, + List *distTablesList, + PlannerRestrictionContext * + plannerRestrictionContext); +static DeferredErrorMessage * TargetlistAndFunctionsSupported(Oid resultRelationId, + FromExpr *joinTree, + Node *quals, + List *targetList, + CmdType commandType, + List *returningList); /* @@ -445,7 +462,7 @@ ModifyQueryResultRelationId(Query *query) * ResultRelationOidForQuery returns the OID of the relation this is modified * by a given query. */ -static Oid +Oid ResultRelationOidForQuery(Query *query) { RangeTblEntry *resultRTE = rt_fetch(query->resultRelation, query->rtable); @@ -512,6 +529,161 @@ IsTidColumn(Node *node) } +/* + * TargetlistAndFunctionsSupported implements a subset of what ModifyPartialQuerySupported + * checks, that subset being checking what functions are allowed, if we are + * updating distribution column, etc. + * Note: This subset of checks are repeated for each MERGE modify action. 
+ */ +static DeferredErrorMessage * +TargetlistAndFunctionsSupported(Oid resultRelationId, FromExpr *joinTree, Node *quals, + List *targetList, + CmdType commandType, List *returningList) +{ + uint32 rangeTableId = 1; + Var *partitionColumn = NULL; + + if (IsCitusTable(resultRelationId)) + { + partitionColumn = PartitionColumn(resultRelationId, rangeTableId); + } + + bool hasVarArgument = false; /* A STABLE function is passed a Var argument */ + bool hasBadCoalesce = false; /* CASE/COALESCE passed a mutable function */ + ListCell *targetEntryCell = NULL; + + foreach(targetEntryCell, targetList) + { + TargetEntry *targetEntry = (TargetEntry *) lfirst(targetEntryCell); + + /* skip resjunk entries: UPDATE adds some for ctid, etc. */ + if (targetEntry->resjunk) + { + continue; + } + + bool targetEntryPartitionColumn = false; + AttrNumber targetColumnAttrNumber = InvalidAttrNumber; + + /* reference tables do not have partition column */ + if (partitionColumn == NULL) + { + targetEntryPartitionColumn = false; + } + else + { + if (commandType == CMD_UPDATE) + { + /* + * Note that it is not possible to give an alias to + * UPDATE table SET ... 
+ */ + if (targetEntry->resname) + { + targetColumnAttrNumber = get_attnum(resultRelationId, + targetEntry->resname); + if (targetColumnAttrNumber == partitionColumn->varattno) + { + targetEntryPartitionColumn = true; + } + } + } + } + + + if (commandType == CMD_UPDATE && + FindNodeMatchingCheckFunction((Node *) targetEntry->expr, + CitusIsVolatileFunction)) + { + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "functions used in UPDATE queries on distributed " + "tables must not be VOLATILE", + NULL, NULL); + } + + if (commandType == CMD_UPDATE && targetEntryPartitionColumn && + TargetEntryChangesValue(targetEntry, partitionColumn, + joinTree)) + { + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "modifying the partition value of rows is not " + "allowed", + NULL, NULL); + } + + if (commandType == CMD_UPDATE && + MasterIrreducibleExpression((Node *) targetEntry->expr, + &hasVarArgument, &hasBadCoalesce)) + { + Assert(hasVarArgument || hasBadCoalesce); + } + + if (FindNodeMatchingCheckFunction((Node *) targetEntry->expr, + NodeIsFieldStore)) + { + /* DELETE cannot do field indirection already */ + Assert(commandType == CMD_UPDATE || commandType == CMD_INSERT); + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "inserting or modifying composite type fields is not " + "supported", NULL, + "Use the column name to insert or update the composite " + "type as a single value"); + } + } + + if (joinTree != NULL) + { + if (FindNodeMatchingCheckFunction((Node *) quals, + CitusIsVolatileFunction)) + { + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "functions used in the WHERE/ON/WHEN clause of modification " + "queries on distributed tables must not be VOLATILE", + NULL, NULL); + } + else if (MasterIrreducibleExpression(quals, &hasVarArgument, + &hasBadCoalesce)) + { + Assert(hasVarArgument || hasBadCoalesce); + } + } + + if (hasVarArgument) + { + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "STABLE functions used in UPDATE queries " + 
"cannot be called with column references", + NULL, NULL); + } + + if (hasBadCoalesce) + { + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "non-IMMUTABLE functions are not allowed in CASE or " + "COALESCE statements", + NULL, NULL); + } + + if (contain_mutable_functions((Node *) returningList)) + { + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "non-IMMUTABLE functions are not allowed in the " + "RETURNING clause", + NULL, NULL); + } + + if (quals != NULL && + nodeTag(quals) == T_CurrentOfExpr) + { + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "cannot run DML queries with cursors", NULL, + NULL); + } + + return NULL; +} + + /* * ModifyPartialQuerySupported implements a subset of what ModifyQuerySupported checks, * that subset being what's necessary to check modifying CTEs for. @@ -620,148 +792,21 @@ ModifyPartialQuerySupported(Query *queryTree, bool multiShardQuery, Oid resultRelationId = ModifyQueryResultRelationId(queryTree); *distributedTableIdOutput = resultRelationId; - uint32 rangeTableId = 1; - Var *partitionColumn = NULL; - if (IsCitusTable(resultRelationId)) - { - partitionColumn = PartitionColumn(resultRelationId, rangeTableId); - } commandType = queryTree->commandType; if (commandType == CMD_INSERT || commandType == CMD_UPDATE || commandType == CMD_DELETE) { - bool hasVarArgument = false; /* A STABLE function is passed a Var argument */ - bool hasBadCoalesce = false; /* CASE/COALESCE passed a mutable function */ - FromExpr *joinTree = queryTree->jointree; - ListCell *targetEntryCell = NULL; - - foreach(targetEntryCell, queryTree->targetList) + deferredError = + TargetlistAndFunctionsSupported(resultRelationId, + queryTree->jointree, + queryTree->jointree->quals, + queryTree->targetList, + commandType, + queryTree->returningList); + if (deferredError) { - TargetEntry *targetEntry = (TargetEntry *) lfirst(targetEntryCell); - - /* skip resjunk entries: UPDATE adds some for ctid, etc. 
*/ - if (targetEntry->resjunk) - { - continue; - } - - bool targetEntryPartitionColumn = false; - AttrNumber targetColumnAttrNumber = InvalidAttrNumber; - - /* reference tables do not have partition column */ - if (partitionColumn == NULL) - { - targetEntryPartitionColumn = false; - } - else - { - if (commandType == CMD_UPDATE) - { - /* - * Note that it is not possible to give an alias to - * UPDATE table SET ... - */ - if (targetEntry->resname) - { - targetColumnAttrNumber = get_attnum(resultRelationId, - targetEntry->resname); - if (targetColumnAttrNumber == partitionColumn->varattno) - { - targetEntryPartitionColumn = true; - } - } - } - } - - - if (commandType == CMD_UPDATE && - FindNodeMatchingCheckFunction((Node *) targetEntry->expr, - CitusIsVolatileFunction)) - { - return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, - "functions used in UPDATE queries on distributed " - "tables must not be VOLATILE", - NULL, NULL); - } - - if (commandType == CMD_UPDATE && targetEntryPartitionColumn && - TargetEntryChangesValue(targetEntry, partitionColumn, - queryTree->jointree)) - { - return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, - "modifying the partition value of rows is not " - "allowed", - NULL, NULL); - } - - if (commandType == CMD_UPDATE && - MasterIrreducibleExpression((Node *) targetEntry->expr, - &hasVarArgument, &hasBadCoalesce)) - { - Assert(hasVarArgument || hasBadCoalesce); - } - - if (FindNodeMatchingCheckFunction((Node *) targetEntry->expr, - NodeIsFieldStore)) - { - /* DELETE cannot do field indirection already */ - Assert(commandType == CMD_UPDATE || commandType == CMD_INSERT); - return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, - "inserting or modifying composite type fields is not " - "supported", NULL, - "Use the column name to insert or update the composite " - "type as a single value"); - } - } - - if (joinTree != NULL) - { - if (FindNodeMatchingCheckFunction((Node *) joinTree->quals, - CitusIsVolatileFunction)) - { - return 
DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, - "functions used in the WHERE clause of modification " - "queries on distributed tables must not be VOLATILE", - NULL, NULL); - } - else if (MasterIrreducibleExpression(joinTree->quals, &hasVarArgument, - &hasBadCoalesce)) - { - Assert(hasVarArgument || hasBadCoalesce); - } - } - - if (hasVarArgument) - { - return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, - "STABLE functions used in UPDATE queries " - "cannot be called with column references", - NULL, NULL); - } - - if (hasBadCoalesce) - { - return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, - "non-IMMUTABLE functions are not allowed in CASE or " - "COALESCE statements", - NULL, NULL); - } - - if (contain_mutable_functions((Node *) queryTree->returningList)) - { - return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, - "non-IMMUTABLE functions are not allowed in the " - "RETURNING clause", - NULL, NULL); - } - - if (queryTree->jointree->quals != NULL && - nodeTag(queryTree->jointree->quals) == T_CurrentOfExpr) - { - return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, - "cannot run DML queries with cursors", NULL, - NULL); + return deferredError; } } @@ -873,6 +918,85 @@ NodeIsFieldStore(Node *node) } +/* + * MergeQuerySupported does check for a MERGE command in the query, if it finds + * one, it will verify the below criteria + * - Supported tables and combinations in ErrorIfMergeHasUnsupportedTables + * - Distributed tables requirements in ErrorIfDistTablesNotColocated + * - Checks target-lists and functions-in-quals in TargetlistAndFunctionsSupported + */ +static DeferredErrorMessage * +MergeQuerySupported(Query *originalQuery, + PlannerRestrictionContext *plannerRestrictionContext) +{ + /* For non-MERGE commands it's a no-op */ + if (!QueryHasMergeCommand(originalQuery)) + { + return NULL; + } + + List *rangeTableList = ExtractRangeTableEntryList(originalQuery); + RangeTblEntry *resultRte = ExtractResultRelationRTE(originalQuery); + + /* + * Fast path queries 
cannot have merge command, and we prevent the remaining here. + * In Citus we have limited support for MERGE, it's allowed only if all + * the tables(target, source or any CTE) tables are are local i.e. a + * combination of Citus local and Non-Citus tables (regular Postgres tables) + * or distributed tables with some restrictions, please see header of routine + * ErrorIfDistTablesNotColocated for details. + */ + DeferredErrorMessage *deferredError = + ErrorIfMergeHasUnsupportedTables(originalQuery, + rangeTableList, + plannerRestrictionContext); + if (deferredError) + { + return deferredError; + } + + Oid resultRelationId = resultRte->relid; + deferredError = + TargetlistAndFunctionsSupported(resultRelationId, + originalQuery->jointree, + originalQuery->jointree->quals, + originalQuery->targetList, + originalQuery->commandType, + originalQuery->returningList); + if (deferredError) + { + return deferredError; + } + + #if PG_VERSION_NUM >= PG_VERSION_15 + + /* + * MERGE is a special case where we have multiple modify statements + * within itself. Check each INSERT/UPDATE/DELETE individually. + */ + MergeAction *action = NULL; + foreach_ptr(action, originalQuery->mergeActionList) + { + Assert(originalQuery->returningList == NULL); + deferredError = + TargetlistAndFunctionsSupported(resultRelationId, + originalQuery->jointree, + action->qual, + action->targetList, + action->commandType, + originalQuery->returningList); + if (deferredError) + { + return deferredError; + } + } + + #endif + + return NULL; +} + + /* * ModifyQuerySupported returns NULL if the query only contains supported * features, otherwise it returns an error description. 
@@ -888,8 +1012,17 @@ ModifyQuerySupported(Query *queryTree, Query *originalQuery, bool multiShardQuer PlannerRestrictionContext *plannerRestrictionContext) { Oid distributedTableId = InvalidOid; - DeferredErrorMessage *error = ModifyPartialQuerySupported(queryTree, multiShardQuery, - &distributedTableId); + DeferredErrorMessage *error = MergeQuerySupported(originalQuery, + plannerRestrictionContext); + if (error) + { + /* + * For MERGE, we do not do recursive plannning, simply bail out. + */ + RaiseDeferredError(error, ERROR); + } + + error = ModifyPartialQuerySupported(queryTree, multiShardQuery, &distributedTableId); if (error) { return error; @@ -3941,3 +4074,288 @@ CompareInsertValuesByShardId(const void *leftElement, const void *rightElement) } } } + + +/* + * IsMergeAllowedOnRelation takes a relation entry and checks if MERGE command is + * permitted on special relations, such as materialized view, returns true only if + * it's a "source" relation. + */ +bool +IsMergeAllowedOnRelation(Query *parse, RangeTblEntry *rte) +{ + if (!IsMergeQuery(parse)) + { + return false; + } + + RangeTblEntry *targetRte = rt_fetch(parse->resultRelation, parse->rtable); + + /* Is it a target relation? */ + if (targetRte->relid == rte->relid) + { + return false; + } + + return true; +} + + +/* + * ErrorIfDistTablesNotColocated Checks to see if + * + * - There are a minimum of two distributed tables (source and a target). + * - All the distributed tables are indeed colocated. + * - MERGE relations are joined on the distribution column + * MERGE .. USING .. ON target.dist_key = source.dist_key + * - The query should touch only a single shard i.e. JOIN AND with a constant qual + * MERGE .. USING .. ON target.dist_key = source.dist_key AND target.dist_key = <> + * + * If any of the conditions are not met, it raises an exception. 
+ */ +static DeferredErrorMessage * +ErrorIfDistTablesNotColocated(Query *parse, List *distTablesList, + PlannerRestrictionContext *plannerRestrictionContext) +{ + /* All MERGE tables must be distributed */ + if (list_length(distTablesList) < 2) + { + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "For MERGE command, both the source and target " + "must be distributed", NULL, NULL); + } + + /* All distributed tables must be colocated */ + if (!AllRelationsInListColocated(distTablesList, RANGETABLE_ENTRY)) + { + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "For MERGE command, all the distributed tables " + "must be colocated", NULL, NULL); + } + + /* Are source and target tables joined on distribution column? */ + if (!RestrictionEquivalenceForPartitionKeys(plannerRestrictionContext)) + { + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "MERGE command is only supported when distributed " + "tables are joined on their distribution column", + NULL, NULL); + } + + /* Look for a constant qual i.e. AND target.dist_key = <> */ + Node *distributionKeyValue = NULL; + Oid targetRelId = ResultRelationOidForQuery(parse); + Var *distributionKey = PartitionColumn(targetRelId, 1); + + Assert(distributionKey); + + /* convert list of expressions into expression tree for further processing */ + Node *quals = parse->jointree->quals; + + if (quals && IsA(quals, List)) + { + quals = (Node *) make_ands_explicit((List *) quals); + } + + if (!ConjunctionContainsColumnFilter(quals, distributionKey, &distributionKeyValue)) + { + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "MERGE on a distributed table requires a constant filter " + "on the distribution column of the target table", NULL, + "Consider adding AND target.dist_key = <> to the ON clause"); + } + + return NULL; +} + + +/* + * ErrorIfMergeHasUnsupportedTables checks if all the tables(target, source or any CTE + * present) in the MERGE command are local i.e. 
a combination of Citus local and Non-Citus + * tables (regular Postgres tables), or distributed tables with some restrictions, please + * see header of routine ErrorIfDistTablesNotColocated for details, raises an exception + * for all other combinations. + */ +static DeferredErrorMessage * +ErrorIfMergeHasUnsupportedTables(Query *parse, List *rangeTableList, + PlannerRestrictionContext *restrictionContext) +{ + List *distTablesList = NIL; + bool foundLocalTables = false; + + RangeTblEntry *rangeTableEntry = NULL; + foreach_ptr(rangeTableEntry, rangeTableList) + { + Oid relationId = rangeTableEntry->relid; + + switch (rangeTableEntry->rtekind) + { + case RTE_RELATION: + { + /* Check the relation type */ + break; + } + + case RTE_SUBQUERY: + case RTE_FUNCTION: + case RTE_TABLEFUNC: + case RTE_VALUES: + case RTE_JOIN: + case RTE_CTE: + { + /* Skip them as base table(s) will be checked */ + continue; + } + + /* + * RTE_NAMEDTUPLESTORE is typically used in ephmeral named relations, + * such as, trigger data; until we find a genuine use case, raise an + * exception. + * RTE_RESULT is a node added by the planner and we shouldn't + * encounter it in the parse tree. 
+ */ + case RTE_NAMEDTUPLESTORE: + case RTE_RESULT: + { + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "MERGE command is not supported with " + "Tuplestores and results", + NULL, NULL); + } + + default: + { + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "MERGE command: Unrecognized range table entry.", + NULL, NULL); + } + } + + /* RTE Relation can be of various types, check them now */ + + /* skip the regular views as they are replaced with subqueries */ + if (rangeTableEntry->relkind == RELKIND_VIEW) + { + continue; + } + + if (rangeTableEntry->relkind == RELKIND_MATVIEW || + rangeTableEntry->relkind == RELKIND_FOREIGN_TABLE) + { + /* Materialized view or Foreign table as target is not allowed */ + if (IsMergeAllowedOnRelation(parse, rangeTableEntry)) + { + /* Non target relation is ok */ + continue; + } + else + { + /* Usually we don't reach this exception as the Postgres parser catches it */ + StringInfo errorMessage = makeStringInfo(); + appendStringInfo(errorMessage, + "MERGE command is not allowed on " + "relation type(relkind:%c)", rangeTableEntry->relkind); + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, errorMessage->data, + NULL, NULL); + } + } + + if (rangeTableEntry->relkind != RELKIND_RELATION && + rangeTableEntry->relkind != RELKIND_PARTITIONED_TABLE) + { + StringInfo errorMessage = makeStringInfo(); + appendStringInfo(errorMessage, "Unexpected table type(relkind:%c) " + "in MERGE command", rangeTableEntry->relkind); + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, errorMessage->data, + NULL, NULL); + } + + Assert(rangeTableEntry->relid != 0); + + /* Reference tables are not supported yet */ + if (IsCitusTableType(relationId, REFERENCE_TABLE)) + { + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "MERGE command is not supported on reference " + "tables yet", NULL, NULL); + } + + /* Append/Range tables are not supported */ + if (IsCitusTableType(relationId, APPEND_DISTRIBUTED) || + IsCitusTableType(relationId, 
RANGE_DISTRIBUTED)) + { + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "For MERGE command, all the distributed tables " + "must be colocated, for append/range distribution, " + "colocation is not supported", NULL, + "Consider using hash distribution instead"); + } + + /* + * For now, save all distributed tables, later (below) we will + * check for supported combination(s). + */ + if (IsCitusTableType(relationId, DISTRIBUTED_TABLE)) + { + distTablesList = lappend(distTablesList, rangeTableEntry); + continue; + } + + /* Regular Postgres tables and Citus local tables are allowed */ + if (!IsCitusTable(relationId) || + IsCitusTableType(relationId, CITUS_LOCAL_TABLE)) + { + foundLocalTables = true; + continue; + } + + /* Any other Citus table type missing ? */ + } + + /* Ensure all tables are indeed local */ + if (foundLocalTables && list_length(distTablesList) == 0) + { + /* All the tables are local, supported */ + return NULL; + } + else if (foundLocalTables && list_length(distTablesList) > 0) + { + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "MERGE command is not supported with " + "combination of distributed/local tables yet", + NULL, NULL); + } + + /* Ensure all distributed tables are indeed co-located */ + return ErrorIfDistTablesNotColocated(parse, distTablesList, restrictionContext); +} + + +/* + * QueryHasMergeCommand walks over the query tree and returns false if there + * is no Merge command (e.g., CMD_MERGE), true otherwise. + */ +static bool +QueryHasMergeCommand(Query *queryTree) +{ + /* function is void for pre-15 versions of Postgres */ + #if PG_VERSION_NUM < PG_VERSION_15 + return false; + #else + + /* + * Postgres currently doesn't support Merge queries inside subqueries and + * ctes, but lets be defensive and do query tree walk anyway. + * + * We do not call this path for fast-path queries to avoid this additional + * overhead. 
+ */ + if (!ContainsMergeCommandWalker((Node *) queryTree)) + { + /* No MERGE found */ + return false; + } + + return true; + #endif +} diff --git a/src/backend/distributed/planner/relation_restriction_equivalence.c b/src/backend/distributed/planner/relation_restriction_equivalence.c index 15c08f8b8..11b335768 100644 --- a/src/backend/distributed/planner/relation_restriction_equivalence.c +++ b/src/backend/distributed/planner/relation_restriction_equivalence.c @@ -151,8 +151,6 @@ static void ListConcatUniqueAttributeClassMemberLists(AttributeEquivalenceClass secondClass); static Var * PartitionKeyForRTEIdentityInQuery(Query *query, int targetRTEIndex, Index *partitionKeyIndex); -static bool AllRelationsInRestrictionContextColocated(RelationRestrictionContext * - restrictionContext); static bool IsNotSafeRestrictionToRecursivelyPlan(Node *node); static JoinRestrictionContext * FilterJoinRestrictionContext( JoinRestrictionContext *joinRestrictionContext, Relids @@ -383,7 +381,8 @@ SafeToPushdownUnionSubquery(Query *originalQuery, return false; } - if (!AllRelationsInRestrictionContextColocated(restrictionContext)) + if (!AllRelationsInListColocated(restrictionContext->relationRestrictionList, + RESTRICTION_CONTEXT)) { /* distribution columns are equal, but tables are not co-located */ return false; @@ -1905,19 +1904,33 @@ FindQueryContainingRTEIdentityInternal(Node *node, /* - * AllRelationsInRestrictionContextColocated determines whether all of the relations in the - * given relation restrictions list are co-located. + * AllRelationsInListColocated determines whether all of the relations in the + * given list are co-located. 
+ * Note: The list can be of dofferent types, which is specified by ListEntryType */ -static bool -AllRelationsInRestrictionContextColocated(RelationRestrictionContext *restrictionContext) +bool +AllRelationsInListColocated(List *relationList, ListEntryType entryType) { + void *varPtr = NULL; + RangeTblEntry *rangeTableEntry = NULL; RelationRestriction *relationRestriction = NULL; int initialColocationId = INVALID_COLOCATION_ID; /* check whether all relations exists in the main restriction list */ - foreach_ptr(relationRestriction, restrictionContext->relationRestrictionList) + foreach_ptr(varPtr, relationList) { - Oid relationId = relationRestriction->relationId; + Oid relationId = InvalidOid; + + if (entryType == RANGETABLE_ENTRY) + { + rangeTableEntry = (RangeTblEntry *) varPtr; + relationId = rangeTableEntry->relid; + } + else if (entryType == RESTRICTION_CONTEXT) + { + relationRestriction = (RelationRestriction *) varPtr; + relationId = relationRestriction->relationId; + } if (IsCitusTableType(relationId, CITUS_TABLE_WITH_NO_DIST_KEY)) { diff --git a/src/include/distributed/distributed_planner.h b/src/include/distributed/distributed_planner.h index 31eceea40..588efd254 100644 --- a/src/include/distributed/distributed_planner.h +++ b/src/include/distributed/distributed_planner.h @@ -254,5 +254,9 @@ extern struct DistributedPlan * CreateDistributedPlan(uint64 planId, Query *orig plannerRestrictionContext); extern bool IsMergeAllowedOnRelation(Query *parse, RangeTblEntry *rte); +extern bool ConjunctionContainsColumnFilter(Node *node, + Var *column, + Node **distributionKeyValue); +extern bool ContainsMergeCommandWalker(Node *node); #endif /* DISTRIBUTED_PLANNER_H */ diff --git a/src/include/distributed/multi_router_planner.h b/src/include/distributed/multi_router_planner.h index 62d698b51..07d160865 100644 --- a/src/include/distributed/multi_router_planner.h +++ b/src/include/distributed/multi_router_planner.h @@ -99,6 +99,7 @@ extern PlannedStmt * 
FastPathPlanner(Query *originalQuery, Query *parse, ParamLi boundParams); extern bool FastPathRouterQuery(Query *query, Node **distributionKeyValue); extern bool JoinConditionIsOnFalse(List *relOptInfo); +extern Oid ResultRelationOidForQuery(Query *query); #endif /* MULTI_ROUTER_PLANNER_H */ diff --git a/src/include/distributed/relation_restriction_equivalence.h b/src/include/distributed/relation_restriction_equivalence.h index ccd50a6db..4fd9c7015 100644 --- a/src/include/distributed/relation_restriction_equivalence.h +++ b/src/include/distributed/relation_restriction_equivalence.h @@ -17,6 +17,15 @@ #define SINGLE_RTE_INDEX 1 +/* + * Represents the pointer type that's being passed in the list. + */ +typedef enum ListEntryType +{ + RANGETABLE_ENTRY, /* RangeTblEntry */ + RESTRICTION_CONTEXT /* RelationRestriction */ +} ListEntryType; + extern bool AllDistributionKeysInQueryAreEqual(Query *originalQuery, PlannerRestrictionContext * plannerRestrictionContext); @@ -54,4 +63,6 @@ extern RelationRestrictionContext * FilterRelationRestrictionContext( RelationRestrictionContext *relationRestrictionContext, Relids queryRteIdentities); +extern bool AllRelationsInListColocated(List *relationList, ListEntryType entryType); + #endif /* RELATION_RESTRICTION_EQUIVALENCE_H */ diff --git a/src/test/regress/expected/merge.out b/src/test/regress/expected/merge.out index 6fc472b70..cc87193a0 100644 --- a/src/test/regress/expected/merge.out +++ b/src/test/regress/expected/merge.out @@ -18,6 +18,7 @@ SET search_path TO merge_schema; SET citus.shard_count TO 4; SET citus.next_shard_id TO 4000000; SET citus.explain_all_tasks to true; +SET citus.shard_replication_factor TO 1; SELECT 1 FROM master_add_node('localhost', :master_port, groupid => 0); NOTICE: localhost:xxxxx is the coordinator and already contains metadata, skipping syncing the metadata ?column? 
@@ -214,9 +215,18 @@ HINT: To remove the local data, run: SELECT truncate_local_data_after_distribut (1 row) +-- Updates one of the row with customer_id = 30002 +SELECT * from target t WHERE t.customer_id = 30002; + customer_id | last_order_id | order_center | order_count | last_order +--------------------------------------------------------------------- + 30002 | 103 | AX | -1 | Sun Jan 17 19:53:00 2021 +(1 row) + +-- Turn on notice to print tasks sent to nodes (it should be a single task) +SET citus.log_remote_commands to true; MERGE INTO target t USING source s - ON (t.customer_id = s.customer_id) + ON (t.customer_id = s.customer_id) AND t.customer_id = 30002 WHEN MATCHED AND t.order_center = 'XX' THEN DELETE WHEN MATCHED THEN @@ -226,7 +236,39 @@ MERGE INTO target t WHEN NOT MATCHED THEN -- New entry, record it. INSERT (customer_id, last_order_id, order_center, order_count, last_order) VALUES (customer_id, s.order_id, s.order_center, 123, s.order_time); -ERROR: MERGE command is not supported on distributed/reference tables yet +NOTICE: issuing MERGE INTO merge_schema.target_4000002 t USING merge_schema.source_4000006 s ON ((t.customer_id OPERATOR(pg_catalog.=) s.customer_id) AND (t.customer_id OPERATOR(pg_catalog.=) 30002)) WHEN MATCHED AND ((t.order_center COLLATE "default") OPERATOR(pg_catalog.=) 'XX'::text) THEN DELETE WHEN MATCHED THEN UPDATE SET last_order_id = s.order_id, order_count = (t.order_count OPERATOR(pg_catalog.+) 1) WHEN NOT MATCHED THEN INSERT (customer_id, last_order_id, order_center, order_count, last_order) VALUES (s.customer_id, s.order_id, s.order_center, 123, s.order_time) +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +SET citus.log_remote_commands to false; +SELECT * from target t WHERE t.customer_id = 30002; + customer_id | last_order_id | order_center | order_count | last_order +--------------------------------------------------------------------- + 30002 | 103 | AX | 0 | Sun Jan 17 19:53:00 2021 +(1 row) + +-- 
Deletes one of the row with customer_id = 30004 +SELECT * from target t WHERE t.customer_id = 30004; + customer_id | last_order_id | order_center | order_count | last_order +--------------------------------------------------------------------- + 30004 | 99 | XX | -1 | Fri Sep 11 03:23:00 2020 +(1 row) + +MERGE INTO target t + USING source s + ON (t.customer_id = s.customer_id) AND t.customer_id = 30004 + WHEN MATCHED AND t.order_center = 'XX' THEN + DELETE + WHEN MATCHED THEN + UPDATE SET -- Existing customer, update the order count and last_order_id + order_count = t.order_count + 1, + last_order_id = s.order_id + WHEN NOT MATCHED THEN -- New entry, record it. + INSERT (customer_id, last_order_id, order_center, order_count, last_order) + VALUES (customer_id, s.order_id, s.order_center, 123, s.order_time); +SELECT * from target t WHERE t.customer_id = 30004; + customer_id | last_order_id | order_center | order_count | last_order +--------------------------------------------------------------------- +(0 rows) + -- -- Test MERGE with CTE as source -- @@ -386,18 +428,39 @@ HINT: To remove the local data, run: SELECT truncate_local_data_after_distribut (1 row) +SELECT * FROM t1 order by id; + id | val +--------------------------------------------------------------------- + 1 | 0 + 2 | 0 + 5 | 0 +(3 rows) + +SET citus.log_remote_commands to true; WITH s1_res AS ( SELECT * FROM s1 ) MERGE INTO t1 - USING s1_res ON (s1_res.id = t1.id) + USING s1_res ON (s1_res.id = t1.id) AND t1.id = 6 WHEN MATCHED AND s1_res.val = 0 THEN DELETE WHEN MATCHED THEN UPDATE SET val = t1.val + 1 WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s1_res.id, s1_res.val); -ERROR: MERGE command is not supported on distributed/reference tables yet +NOTICE: issuing WITH s1_res AS (SELECT s1.id, s1.val FROM merge_schema.s1_4000018 s1) MERGE INTO merge_schema.t1_4000014 t1 USING s1_res ON ((s1_res.id OPERATOR(pg_catalog.=) t1.id) AND (t1.id OPERATOR(pg_catalog.=) 6)) WHEN MATCHED AND (s1_res.val 
OPERATOR(pg_catalog.=) 0) THEN DELETE WHEN MATCHED THEN UPDATE SET val = (t1.val OPERATOR(pg_catalog.+) 1) WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s1_res.id, s1_res.val) +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +SET citus.log_remote_commands to false; +-- As the id 6 is NO match, VALUES(6, 1) should appear in target +SELECT * FROM t1 order by id; + id | val +--------------------------------------------------------------------- + 1 | 0 + 2 | 0 + 5 | 0 + 6 | 1 +(4 rows) + -- -- Test with multiple join conditions -- @@ -553,16 +616,39 @@ HINT: To remove the local data, run: SELECT truncate_local_data_after_distribut (1 row) +SELECT * FROM t2 ORDER BY 1; + id | val | src +--------------------------------------------------------------------- + 1 | 0 | target + 2 | 0 | target + 3 | 1 | match + 4 | 0 | match +(4 rows) + +SET citus.log_remote_commands to true; MERGE INTO t2 USING s2 -ON t2.id = s2.id AND t2.src = s2.src +ON t2.id = s2.id AND t2.src = s2.src AND t2.id = 4 WHEN MATCHED AND t2.val = 1 THEN UPDATE SET val = s2.val + 10 WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (id, val, src) VALUES (s2.id, s2.val, s2.src); -ERROR: MERGE command is not supported on distributed/reference tables yet +NOTICE: issuing MERGE INTO merge_schema.t2_4000023 t2 USING merge_schema.s2_4000027 s2 ON ((t2.id OPERATOR(pg_catalog.=) s2.id) AND (t2.src OPERATOR(pg_catalog.=) s2.src) AND (t2.id OPERATOR(pg_catalog.=) 4)) WHEN MATCHED AND (t2.val OPERATOR(pg_catalog.=) 1) THEN UPDATE SET val = (s2.val OPERATOR(pg_catalog.+) 10) WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (id, val, src) VALUES (s2.id, s2.val, s2.src) +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +SET citus.log_remote_commands to false; +-- Row with id = 4 is a match for delete clause, row should be deleted +-- Row with id = 3 is a NO match, row from source will be inserted +SELECT * FROM t2 ORDER BY 1; + id | val | src 
+--------------------------------------------------------------------- + 1 | 0 | target + 2 | 0 | target + 3 | 1 | match + 3 | 10 | match +(4 rows) + -- -- With sub-query as the MERGE source -- @@ -1213,9 +1299,261 @@ SELECT * FROM ft_target; 3 | source (2 rows) +-- +-- complex joins on the source side +-- +-- source(join of two relations) relation is an unaliased join +CREATE TABLE target_cj(tid int, src text, val int); +CREATE TABLE source_cj1(sid1 int, src1 text, val1 int); +CREATE TABLE source_cj2(sid2 int, src2 text, val2 int); +INSERT INTO target_cj VALUES (1, 'target', 0); +INSERT INTO target_cj VALUES (2, 'target', 0); +INSERT INTO target_cj VALUES (2, 'target', 0); +INSERT INTO target_cj VALUES (3, 'target', 0); +INSERT INTO source_cj1 VALUES (2, 'source-1', 10); +INSERT INTO source_cj2 VALUES (2, 'source-2', 20); +BEGIN; +MERGE INTO target_cj t +USING source_cj1 s1 INNER JOIN source_cj2 s2 ON sid1 = sid2 +ON t.tid = sid1 AND t.tid = 2 +WHEN MATCHED THEN + UPDATE SET src = src2 +WHEN NOT MATCHED THEN + DO NOTHING; +-- Gold result to compare against +SELECT * FROM target_cj ORDER BY 1; + tid | src | val +--------------------------------------------------------------------- + 1 | target | 0 + 2 | source-2 | 0 + 2 | source-2 | 0 + 3 | target | 0 +(4 rows) + +ROLLBACK; +BEGIN; +-- try accessing columns from either side of the source join +MERGE INTO target_cj t +USING source_cj1 s2 + INNER JOIN source_cj2 s1 ON sid1 = sid2 AND val1 = 10 +ON t.tid = sid1 AND t.tid = 2 +WHEN MATCHED THEN + UPDATE SET tid = sid2, src = src1, val = val2 +WHEN NOT MATCHED THEN + DO NOTHING; +-- Gold result to compare against +SELECT * FROM target_cj ORDER BY 1; + tid | src | val +--------------------------------------------------------------------- + 1 | target | 0 + 2 | source-1 | 20 + 2 | source-1 | 20 + 3 | target | 0 +(4 rows) + +ROLLBACK; +-- Test the same scenarios with distributed tables +SELECT create_distributed_table('target_cj', 'tid'); +NOTICE: Copying data from local 
table... +NOTICE: copying the data has completed +DETAIL: The local data in the table is no longer visible, but is still on disk. +HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$merge_schema.target_cj$$) + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SELECT create_distributed_table('source_cj1', 'sid1'); +NOTICE: Copying data from local table... +NOTICE: copying the data has completed +DETAIL: The local data in the table is no longer visible, but is still on disk. +HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$merge_schema.source_cj1$$) + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SELECT create_distributed_table('source_cj2', 'sid2'); +NOTICE: Copying data from local table... +NOTICE: copying the data has completed +DETAIL: The local data in the table is no longer visible, but is still on disk. 
+HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$merge_schema.source_cj2$$) + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +BEGIN; +SET citus.log_remote_commands to true; +MERGE INTO target_cj t +USING source_cj1 s1 INNER JOIN source_cj2 s2 ON sid1 = sid2 +ON t.tid = sid1 AND t.tid = 2 +WHEN MATCHED THEN + UPDATE SET src = src2 +WHEN NOT MATCHED THEN + DO NOTHING; +NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx'); +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing MERGE INTO merge_schema.target_cj_4000050 t USING (merge_schema.source_cj1_4000054 s1 JOIN merge_schema.source_cj2_4000058 s2 ON ((s1.sid1 OPERATOR(pg_catalog.=) s2.sid2))) ON ((t.tid OPERATOR(pg_catalog.=) s1.sid1) AND (t.tid OPERATOR(pg_catalog.=) 2)) WHEN MATCHED THEN UPDATE SET src = s2.src2 WHEN NOT MATCHED THEN DO NOTHING +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +SET citus.log_remote_commands to false; +SELECT * FROM target_cj ORDER BY 1; + tid | src | val +--------------------------------------------------------------------- + 1 | target | 0 + 2 | source-2 | 0 + 2 | source-2 | 0 + 3 | target | 0 +(4 rows) + +ROLLBACK; +BEGIN; +-- try accessing columns from either side of the source join +MERGE INTO target_cj t +USING source_cj1 s2 + INNER JOIN source_cj2 s1 ON sid1 = sid2 AND val1 = 10 +ON t.tid = sid1 AND t.tid = 2 +WHEN MATCHED THEN + UPDATE SET src = src1, val = val2 +WHEN NOT MATCHED THEN + DO NOTHING; +SELECT * FROM target_cj ORDER BY 1; + tid | src | val +--------------------------------------------------------------------- + 1 | target | 0 + 2 | source-1 | 20 + 2 | source-1 | 20 + 3 | target | 0 +(4 rows) + +ROLLBACK; +-- sub-query as a source +BEGIN; +MERGE INTO target_cj t +USING (SELECT * FROM source_cj1 WHERE sid1 = 2) sub +ON t.tid = sub.sid1 AND 
t.tid = 2 +WHEN MATCHED THEN + UPDATE SET src = sub.src1, val = val1 +WHEN NOT MATCHED THEN + DO NOTHING; +SELECT * FROM target_cj ORDER BY 1; + tid | src | val +--------------------------------------------------------------------- + 1 | target | 0 + 2 | source-1 | 10 + 2 | source-1 | 10 + 3 | target | 0 +(4 rows) + +ROLLBACK; +-- Test self-join +BEGIN; +SELECT * FROM target_cj ORDER BY 1; + tid | src | val +--------------------------------------------------------------------- + 1 | target | 0 + 2 | target | 0 + 2 | target | 0 + 3 | target | 0 +(4 rows) + +set citus.log_remote_commands to true; +MERGE INTO target_cj t1 +USING (SELECT * FROM target_cj) sub +ON t1.tid = sub.tid AND t1.tid = 3 +WHEN MATCHED THEN + UPDATE SET src = sub.src, val = sub.val + 100 +WHEN NOT MATCHED THEN + DO NOTHING; +NOTICE: issuing MERGE INTO merge_schema.target_cj_4000048 t1 USING (SELECT target_cj.tid, target_cj.src, target_cj.val FROM merge_schema.target_cj_4000048 target_cj) sub ON ((t1.tid OPERATOR(pg_catalog.=) sub.tid) AND (t1.tid OPERATOR(pg_catalog.=) 3)) WHEN MATCHED THEN UPDATE SET src = sub.src, val = (sub.val OPERATOR(pg_catalog.+) 100) WHEN NOT MATCHED THEN DO NOTHING +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +set citus.log_remote_commands to false; +SELECT * FROM target_cj ORDER BY 1; + tid | src | val +--------------------------------------------------------------------- + 1 | target | 0 + 2 | target | 0 + 2 | target | 0 + 3 | target | 100 +(4 rows) + +ROLLBACK; +-- Test PREPARE +PREPARE foo(int) AS +MERGE INTO target_cj target +USING (SELECT * FROM source_cj1) sub +ON target.tid = sub.sid1 AND target.tid = $1 +WHEN MATCHED THEN + UPDATE SET val = sub.val1 +WHEN NOT MATCHED THEN + DO NOTHING; +SELECT * FROM target_cj ORDER BY 1; + tid | src | val +--------------------------------------------------------------------- + 1 | target | 0 + 2 | target | 0 + 2 | target | 0 + 3 | target | 0 +(4 rows) + +BEGIN; +EXECUTE foo(2); +EXECUTE foo(2); +EXECUTE 
foo(2); +EXECUTE foo(2); +EXECUTE foo(2); +SELECT * FROM target_cj ORDER BY 1; + tid | src | val +--------------------------------------------------------------------- + 1 | target | 0 + 2 | target | 10 + 2 | target | 10 + 3 | target | 0 +(4 rows) + +ROLLBACK; +BEGIN; +SET citus.log_remote_commands to true; +SET client_min_messages TO DEBUG1; +EXECUTE foo(2); +DEBUG: +DEBUG: +DEBUG: +DEBUG: +DEBUG: +NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx'); +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing MERGE INTO merge_schema.target_cj_4000050 target USING (SELECT source_cj1.sid1, source_cj1.src1, source_cj1.val1 FROM merge_schema.source_cj1_4000054 source_cj1) sub ON ((target.tid OPERATOR(pg_catalog.=) sub.sid1) AND (target.tid OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = sub.val1 WHEN NOT MATCHED THEN DO NOTHING +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +RESET client_min_messages; +EXECUTE foo(2); +NOTICE: issuing MERGE INTO merge_schema.target_cj_4000050 target USING (SELECT source_cj1.sid1, source_cj1.src1, source_cj1.val1 FROM merge_schema.source_cj1_4000054 source_cj1) sub ON ((target.tid OPERATOR(pg_catalog.=) sub.sid1) AND (target.tid OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = sub.val1 WHEN NOT MATCHED THEN DO NOTHING +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +SET citus.log_remote_commands to false; +SELECT * FROM target_cj ORDER BY 1; + tid | src | val +--------------------------------------------------------------------- + 1 | target | 0 + 2 | target | 10 + 2 | target | 10 + 3 | target | 0 +(4 rows) + +ROLLBACK; -- -- Error and Unsupported scenarios -- +-- try updating the distribution key column +BEGIN; +MERGE INTO target_cj t + USING source_cj1 s + ON t.tid = s.sid1 AND t.tid = 2 + WHEN MATCHED THEN + UPDATE SET tid = tid + 9, src = src || ' updated by merge' + WHEN 
NOT MATCHED THEN + INSERT VALUES (sid1, 'inserted by merge', val1); +ERROR: modifying the partition value of rows is not allowed +ROLLBACK; -- Foreign table as target MERGE INTO foreign_table USING ft_target ON (foreign_table.id = ft_target.id) @@ -1274,7 +1612,54 @@ MERGE INTO t1 UPDATE SET val = t1.val + 1 WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s1.id, s1.val); -ERROR: MERGE command is not supported on distributed/reference tables yet +ERROR: MERGE command is not supported with combination of distributed/local tables yet +-- Now both s1 and t1 are distributed tables +SELECT undistribute_table('t1'); +NOTICE: creating a new table for merge_schema.t1 +NOTICE: moving the data of merge_schema.t1 +NOTICE: dropping the old merge_schema.t1 +NOTICE: renaming the new table to merge_schema.t1 + undistribute_table +--------------------------------------------------------------------- + +(1 row) + +SELECT create_distributed_table('t1', 'id'); +NOTICE: Copying data from local table... +NOTICE: copying the data has completed +DETAIL: The local data in the table is no longer visible, but is still on disk. 
+HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$merge_schema.t1$$) + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +-- We have a potential pitfall where a function can be invoked in +-- the MERGE conditions which can insert/update to a random shard +CREATE OR REPLACE function merge_when_and_write() RETURNS BOOLEAN +LANGUAGE PLPGSQL AS +$$ +BEGIN + INSERT INTO t1 VALUES (100, 100); + RETURN TRUE; +END; +$$; +-- Test preventing "ON" join condition from writing to the database +BEGIN; +MERGE INTO t1 +USING s1 ON t1.id = s1.id AND t1.id = 2 AND (merge_when_and_write()) +WHEN MATCHED THEN + UPDATE SET val = t1.val + s1.val; +ERROR: functions used in the WHERE/ON/WHEN clause of modification queries on distributed tables must not be VOLATILE +ROLLBACK; +-- Test preventing WHEN clause(s) from writing to the database +BEGIN; +MERGE INTO t1 +USING s1 ON t1.id = s1.id AND t1.id = 2 +WHEN MATCHED AND (merge_when_and_write()) THEN + UPDATE SET val = t1.val + s1.val; +ERROR: functions used in the WHERE/ON/WHEN clause of modification queries on distributed tables must not be VOLATILE +ROLLBACK; -- Joining on partition columns with sub-query MERGE INTO t1 USING (SELECT * FROM s1) sub ON (sub.val = t1.id) -- sub.val is not a distribution column @@ -1284,7 +1669,7 @@ MERGE INTO t1 UPDATE SET val = t1.val + 1 WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val); -ERROR: MERGE command is not supported on distributed/reference tables yet +ERROR: MERGE command is only supported when distributed tables are joined on their distribution column -- Joining on partition columns with CTE WITH s1_res AS ( SELECT * FROM s1 @@ -1297,7 +1682,7 @@ MERGE INTO t1 UPDATE SET val = t1.val + 1 WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s1_res.id, s1_res.val); -ERROR: MERGE command is not supported on distributed/reference tables yet +ERROR: MERGE command is only supported when 
distributed tables are joined on their distribution column -- Constant Join condition WITH s1_res AS ( SELECT * FROM s1 @@ -1310,7 +1695,7 @@ MERGE INTO t1 UPDATE SET val = t1.val + 1 WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s1_res.id, s1_res.val); -ERROR: MERGE command is not supported on distributed/reference tables yet +ERROR: MERGE command is only supported when distributed tables are joined on their distribution column -- With a single WHEN clause, which causes a non-left join WITH s1_res AS ( SELECT * FROM s1 @@ -1319,7 +1704,7 @@ WITH s1_res AS ( WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s1_res.id, s1_res.val); -ERROR: MERGE command is not supported on distributed/reference tables yet +ERROR: MERGE command is only supported when distributed tables are joined on their distribution column -- -- Reference tables -- @@ -1371,7 +1756,7 @@ MERGE INTO t1 UPDATE SET val = t1.val + 1 WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s1.id, s1.val); -ERROR: MERGE command is not supported on distributed/reference tables yet +ERROR: MERGE command is not supported on reference tables yet -- -- Postgres + Citus-Distributed table -- @@ -1413,7 +1798,7 @@ MERGE INTO t1 UPDATE SET val = t1.val + 1 WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s1.id, s1.val); -ERROR: MERGE command is not supported on distributed/reference tables yet +ERROR: MERGE command is not supported with combination of distributed/local tables yet MERGE INTO t1 USING (SELECT * FROM s1) sub ON (sub.id = t1.id) WHEN MATCHED AND sub.val = 0 THEN @@ -1422,7 +1807,7 @@ MERGE INTO t1 UPDATE SET val = t1.val + 1 WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val); -ERROR: MERGE command is not supported on distributed/reference tables yet +ERROR: MERGE command is not supported with combination of distributed/local tables yet CREATE TABLE pg(val int); SELECT create_distributed_table('s1', 'id'); NOTICE: Copying data from local table... 
@@ -1443,7 +1828,7 @@ MERGE INTO t1 UPDATE SET val = t1.val + 1 WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val); -ERROR: MERGE command is not supported on distributed/reference tables yet +ERROR: MERGE command is not supported with combination of distributed/local tables yet -- Mix Postgres table in CTE WITH pg_res AS ( SELECT * FROM pg @@ -1456,7 +1841,7 @@ MERGE INTO t1 UPDATE SET val = t1.val + 1 WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val); -ERROR: MERGE command is not supported on distributed/reference tables yet +ERROR: MERGE command is not supported with combination of distributed/local tables yet -- Match more than one source row should fail same as Postgres behavior SELECT undistribute_table('t1'); NOTICE: creating a new table for merge_schema.t1 @@ -1511,6 +1896,265 @@ WHEN NOT MATCHED THEN INSERT VALUES(mv_source.id, mv_source.val); ERROR: cannot execute MERGE on relation "mv_source" DETAIL: This operation is not supported for materialized views. 
+-- Distributed tables *must* be colocated +CREATE TABLE dist_target(id int, val varchar); +SELECT create_distributed_table('dist_target', 'id'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +CREATE TABLE dist_source(id int, val varchar); +SELECT create_distributed_table('dist_source', 'id', colocate_with => 'none'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +MERGE INTO dist_target +USING dist_source +ON dist_target.id = dist_source.id +WHEN MATCHED THEN +UPDATE SET val = dist_source.val +WHEN NOT MATCHED THEN +INSERT VALUES(dist_source.id, dist_source.val); +ERROR: For MERGE command, all the distributed tables must be colocated +-- Distributed tables *must* be joined on distribution column +CREATE TABLE dist_colocated(id int, val int); +SELECT create_distributed_table('dist_colocated', 'id', colocate_with => 'dist_target'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +MERGE INTO dist_target +USING dist_colocated +ON dist_target.id = dist_colocated.val -- val is not the distribution column +WHEN MATCHED THEN +UPDATE SET val = dist_colocated.val +WHEN NOT MATCHED THEN +INSERT VALUES(dist_colocated.id, dist_colocated.val); +ERROR: MERGE command is only supported when distributed tables are joined on their distribution column +-- MERGE command must be joined with with a constant qual on target relation +-- AND clause is missing +MERGE INTO dist_target +USING dist_colocated +ON dist_target.id = dist_colocated.id +WHEN MATCHED THEN +UPDATE SET val = dist_colocated.val +WHEN NOT MATCHED THEN +INSERT VALUES(dist_colocated.id, dist_colocated.val); +ERROR: MERGE on a distributed table requires a constant filter on the distribution column of the target table +HINT: Consider adding AND target.dist_key = <> to the ON clause +-- AND clause incorrect table (must be target) +MERGE 
INTO dist_target +USING dist_colocated +ON dist_target.id = dist_colocated.id AND dist_colocated.id = 1 +WHEN MATCHED THEN +UPDATE SET val = dist_colocated.val +WHEN NOT MATCHED THEN +INSERT VALUES(dist_colocated.id, dist_colocated.val); +ERROR: MERGE on a distributed table requires a constant filter on the distribution column of the target table +HINT: Consider adding AND target.dist_key = <> to the ON clause +-- AND clause incorrect column (must be distribution column) +MERGE INTO dist_target +USING dist_colocated +ON dist_target.id = dist_colocated.id AND dist_target.val = 'const' +WHEN MATCHED THEN +UPDATE SET val = dist_colocated.val +WHEN NOT MATCHED THEN +INSERT VALUES(dist_colocated.id, dist_colocated.val); +ERROR: MERGE on a distributed table requires a constant filter on the distribution column of the target table +HINT: Consider adding AND target.dist_key = <> to the ON clause +-- Both the source and target must be distributed +MERGE INTO dist_target +USING (SELECT 100 id) AS source +ON dist_target.id = source.id AND dist_target.val = 'const' +WHEN MATCHED THEN +UPDATE SET val = 'source' +WHEN NOT MATCHED THEN +INSERT VALUES(source.id, 'source'); +ERROR: For MERGE command, both the source and target must be distributed +-- Non-hash distributed tables (append/range). 
+CREATE VIEW show_tables AS +SELECT logicalrelid, partmethod +FROM pg_dist_partition +WHERE (logicalrelid = 'dist_target'::regclass) OR (logicalrelid = 'dist_source'::regclass) +ORDER BY 1; +SELECT undistribute_table('dist_source'); +NOTICE: creating a new table for merge_schema.dist_source +NOTICE: moving the data of merge_schema.dist_source +NOTICE: dropping the old merge_schema.dist_source +NOTICE: drop cascades to view show_tables +CONTEXT: SQL statement "DROP TABLE merge_schema.dist_source CASCADE" +NOTICE: renaming the new table to merge_schema.dist_source + undistribute_table +--------------------------------------------------------------------- + +(1 row) + +SELECT create_distributed_table('dist_source', 'id', 'append'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SELECT * FROM show_tables; + logicalrelid | partmethod +--------------------------------------------------------------------- + dist_target | h + dist_source | a +(2 rows) + +MERGE INTO dist_target +USING dist_source +ON dist_target.id = dist_source.id +WHEN MATCHED THEN +UPDATE SET val = dist_source.val +WHEN NOT MATCHED THEN +INSERT VALUES(dist_source.id, dist_source.val); +ERROR: For MERGE command, all the distributed tables must be colocated, for append/range distribution, colocation is not supported +HINT: Consider using hash distribution instead +SELECT undistribute_table('dist_source'); +NOTICE: creating a new table for merge_schema.dist_source +NOTICE: moving the data of merge_schema.dist_source +NOTICE: dropping the old merge_schema.dist_source +NOTICE: drop cascades to view show_tables +CONTEXT: SQL statement "DROP TABLE merge_schema.dist_source CASCADE" +NOTICE: renaming the new table to merge_schema.dist_source + undistribute_table +--------------------------------------------------------------------- + +(1 row) + +SELECT create_distributed_table('dist_source', 'id', 'range'); + create_distributed_table 
+--------------------------------------------------------------------- + +(1 row) + +SELECT * FROM show_tables; + logicalrelid | partmethod +--------------------------------------------------------------------- + dist_target | h + dist_source | r +(2 rows) + +MERGE INTO dist_target +USING dist_source +ON dist_target.id = dist_source.id +WHEN MATCHED THEN +UPDATE SET val = dist_source.val +WHEN NOT MATCHED THEN +INSERT VALUES(dist_source.id, dist_source.val); +ERROR: For MERGE command, all the distributed tables must be colocated, for append/range distribution, colocation is not supported +HINT: Consider using hash distribution instead +-- Both are append tables +SELECT undistribute_table('dist_target'); +NOTICE: creating a new table for merge_schema.dist_target +NOTICE: moving the data of merge_schema.dist_target +NOTICE: dropping the old merge_schema.dist_target +NOTICE: drop cascades to view show_tables +CONTEXT: SQL statement "DROP TABLE merge_schema.dist_target CASCADE" +NOTICE: renaming the new table to merge_schema.dist_target + undistribute_table +--------------------------------------------------------------------- + +(1 row) + +SELECT undistribute_table('dist_source'); +NOTICE: creating a new table for merge_schema.dist_source +NOTICE: moving the data of merge_schema.dist_source +NOTICE: dropping the old merge_schema.dist_source +NOTICE: drop cascades to view show_tables +CONTEXT: SQL statement "DROP TABLE merge_schema.dist_source CASCADE" +NOTICE: renaming the new table to merge_schema.dist_source + undistribute_table +--------------------------------------------------------------------- + +(1 row) + +SELECT create_distributed_table('dist_target', 'id', 'append'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SELECT create_distributed_table('dist_source', 'id', 'append'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SELECT * 
FROM show_tables; + logicalrelid | partmethod +--------------------------------------------------------------------- + dist_target | a + dist_source | a +(2 rows) + +MERGE INTO dist_target +USING dist_source +ON dist_target.id = dist_source.id +WHEN MATCHED THEN +UPDATE SET val = dist_source.val +WHEN NOT MATCHED THEN +INSERT VALUES(dist_source.id, dist_source.val); +ERROR: For MERGE command, all the distributed tables must be colocated, for append/range distribution, colocation is not supported +HINT: Consider using hash distribution instead +-- Both are range tables +SELECT undistribute_table('dist_target'); +NOTICE: creating a new table for merge_schema.dist_target +NOTICE: moving the data of merge_schema.dist_target +NOTICE: dropping the old merge_schema.dist_target +NOTICE: drop cascades to view show_tables +CONTEXT: SQL statement "DROP TABLE merge_schema.dist_target CASCADE" +NOTICE: renaming the new table to merge_schema.dist_target + undistribute_table +--------------------------------------------------------------------- + +(1 row) + +SELECT undistribute_table('dist_source'); +NOTICE: creating a new table for merge_schema.dist_source +NOTICE: moving the data of merge_schema.dist_source +NOTICE: dropping the old merge_schema.dist_source +NOTICE: drop cascades to view show_tables +CONTEXT: SQL statement "DROP TABLE merge_schema.dist_source CASCADE" +NOTICE: renaming the new table to merge_schema.dist_source + undistribute_table +--------------------------------------------------------------------- + +(1 row) + +SELECT create_distributed_table('dist_target', 'id', 'range'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SELECT create_distributed_table('dist_source', 'id', 'range'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SELECT * FROM show_tables; + logicalrelid | partmethod 
+--------------------------------------------------------------------- + dist_target | r + dist_source | r +(2 rows) + +MERGE INTO dist_target +USING dist_source +ON dist_target.id = dist_source.id +WHEN MATCHED THEN +UPDATE SET val = dist_source.val +WHEN NOT MATCHED THEN +INSERT VALUES(dist_source.id, dist_source.val); +ERROR: For MERGE command, all the distributed tables must be colocated, for append/range distribution, colocation is not supported +HINT: Consider using hash distribution instead DROP SERVER foreign_server CASCADE; NOTICE: drop cascades to 3 other objects DETAIL: drop cascades to user mapping for postgres on server foreign_server @@ -1519,8 +2163,9 @@ drop cascades to foreign table foreign_table NOTICE: foreign table "foreign_table_4000046" does not exist, skipping CONTEXT: SQL statement "SELECT citus_drop_all_shards(v_obj.objid, v_obj.schema_name, v_obj.object_name, drop_shards_metadata_only := false)" PL/pgSQL function citus_drop_trigger() line XX at PERFORM +DROP FUNCTION merge_when_and_write(); DROP SCHEMA merge_schema CASCADE; -NOTICE: drop cascades to 56 other objects +NOTICE: drop cascades to 63 other objects DETAIL: drop cascades to function insert_data() drop cascades to table pg_result drop cascades to table local_local @@ -1572,11 +2217,18 @@ drop cascades to table ft_target drop cascades to table ft_source_4000045 drop cascades to table ft_source drop cascades to extension postgres_fdw +drop cascades to table target_cj +drop cascades to table source_cj1 +drop cascades to table source_cj2 drop cascades to table pg -drop cascades to table t1_4000062 -drop cascades to table s1_4000063 +drop cascades to table t1_4000078 +drop cascades to table s1_4000079 drop cascades to table t1 drop cascades to table s1 +drop cascades to table dist_colocated +drop cascades to table dist_target +drop cascades to table dist_source +drop cascades to view show_tables SELECT 1 FROM master_remove_node('localhost', :master_port); ?column? 
--------------------------------------------------------------------- diff --git a/src/test/regress/expected/multi_modifications.out b/src/test/regress/expected/multi_modifications.out index 5b5764593..887003a97 100644 --- a/src/test/regress/expected/multi_modifications.out +++ b/src/test/regress/expected/multi_modifications.out @@ -177,7 +177,7 @@ INSERT INTO limit_orders VALUES (random() * 100, 'ORCL', 152, '2011-08-25 11:50: INSERT INTO limit_orders VALUES (2036, 'GOOG', 5634, now(), 'buy', random()); -- commands with mutable functions in their quals DELETE FROM limit_orders WHERE id = 246 AND bidder_id = (random() * 1000); -ERROR: functions used in the WHERE clause of modification queries on distributed tables must not be VOLATILE +ERROR: functions used in the WHERE/ON/WHEN clause of modification queries on distributed tables must not be VOLATILE -- commands with mutable but non-volatile functions(ie: stable func.) in their quals -- (the cast to timestamp is because the timestamp_eq_timestamptz operator is stable) DELETE FROM limit_orders WHERE id = 246 AND placed_at = current_timestamp::timestamp; diff --git a/src/test/regress/expected/multi_mx_modifications.out b/src/test/regress/expected/multi_mx_modifications.out index 276766c30..9e053d3f2 100644 --- a/src/test/regress/expected/multi_mx_modifications.out +++ b/src/test/regress/expected/multi_mx_modifications.out @@ -95,7 +95,7 @@ INSERT INTO limit_orders_mx VALUES (random() * 100, 'ORCL', 152, '2011-08-25 11: INSERT INTO limit_orders_mx VALUES (2036, 'GOOG', 5634, now(), 'buy', random()); -- commands with mutable functions in their quals DELETE FROM limit_orders_mx WHERE id = 246 AND bidder_id = (random() * 1000); -ERROR: functions used in the WHERE clause of modification queries on distributed tables must not be VOLATILE +ERROR: functions used in the WHERE/ON/WHEN clause of modification queries on distributed tables must not be VOLATILE -- commands with mutable but non-volatile functions(ie: stable func.) 
in their quals -- (the cast to timestamp is because the timestamp_eq_timestamptz operator is stable) DELETE FROM limit_orders_mx WHERE id = 246 AND placed_at = current_timestamp::timestamp; diff --git a/src/test/regress/expected/multi_shard_update_delete.out b/src/test/regress/expected/multi_shard_update_delete.out index 916db808e..ee572324d 100644 --- a/src/test/regress/expected/multi_shard_update_delete.out +++ b/src/test/regress/expected/multi_shard_update_delete.out @@ -691,7 +691,7 @@ UPDATE users_test_table SET value_2 = 5 FROM events_test_table WHERE users_test_table.user_id = events_test_table.user_id * random(); -ERROR: functions used in the WHERE clause of modification queries on distributed tables must not be VOLATILE +ERROR: functions used in the WHERE/ON/WHEN clause of modification queries on distributed tables must not be VOLATILE UPDATE users_test_table SET value_2 = 5 * random() FROM events_test_table diff --git a/src/test/regress/expected/pg15.out b/src/test/regress/expected/pg15.out index 7a41b25ec..d92686b93 100644 --- a/src/test/regress/expected/pg15.out +++ b/src/test/regress/expected/pg15.out @@ -315,7 +315,7 @@ SELECT create_reference_table('tbl2'); MERGE INTO tbl1 USING tbl2 ON (true) WHEN MATCHED THEN DELETE; -ERROR: MERGE command is not supported on distributed/reference tables yet +ERROR: MERGE command is not supported on reference tables yet -- now, both are reference, still not supported SELECT create_reference_table('tbl1'); create_reference_table @@ -325,7 +325,7 @@ SELECT create_reference_table('tbl1'); MERGE INTO tbl1 USING tbl2 ON (true) WHEN MATCHED THEN DELETE; -ERROR: MERGE command is not supported on distributed/reference tables yet +ERROR: MERGE command is not supported on reference tables yet -- now, both distributed, not works SELECT undistribute_table('tbl1'); NOTICE: creating a new table for pg15.tbl1 @@ -419,14 +419,14 @@ SELECT create_distributed_table('tbl2', 'x'); MERGE INTO tbl1 USING tbl2 ON (true) WHEN MATCHED THEN 
DELETE; -ERROR: MERGE command is not supported on distributed/reference tables yet +ERROR: MERGE command is only supported when distributed tables are joined on their distribution column -- also, not inside subqueries & ctes WITH targq AS ( SELECT * FROM tbl2 ) MERGE INTO tbl1 USING targq ON (true) WHEN MATCHED THEN DELETE; -ERROR: MERGE command is not supported on distributed/reference tables yet +ERROR: MERGE command is only supported when distributed tables are joined on their distribution column -- crashes on beta3, fixed on 15 stable --WITH foo AS ( -- MERGE INTO tbl1 USING tbl2 ON (true) @@ -441,7 +441,7 @@ USING tbl2 ON (true) WHEN MATCHED THEN UPDATE SET x = (SELECT count(*) FROM tbl2); -ERROR: MERGE command is not supported on distributed/reference tables yet +ERROR: MERGE command is only supported when distributed tables are joined on their distribution column -- test numeric types with negative scale CREATE TABLE numeric_negative_scale(numeric_column numeric(3,-1), orig_value int); INSERT into numeric_negative_scale SELECT x,x FROM generate_series(111, 115) x; diff --git a/src/test/regress/expected/pgmerge.out b/src/test/regress/expected/pgmerge.out index b90760691..89c3f85ca 100644 --- a/src/test/regress/expected/pgmerge.out +++ b/src/test/regress/expected/pgmerge.out @@ -910,7 +910,15 @@ MERGE INTO wq_target t USING wq_source s ON t.tid = s.sid WHEN MATCHED AND (merge_when_and_write()) THEN UPDATE SET balance = t.balance + s.balance; -ERROR: functions used in UPDATE queries on distributed tables must not be VOLATILE +ERROR: functions used in the WHERE/ON/WHEN clause of modification queries on distributed tables must not be VOLATILE +ROLLBACK; +-- Test preventing ON condition from writing to the database +BEGIN; +MERGE INTO wq_target t +USING wq_source s ON t.tid = s.sid AND (merge_when_and_write()) +WHEN MATCHED THEN + UPDATE SET balance = t.balance + s.balance; +ERROR: functions used in the WHERE/ON/WHEN clause of modification queries on distributed 
tables must not be VOLATILE ROLLBACK; drop function merge_when_and_write(); DROP TABLE wq_target, wq_source; diff --git a/src/test/regress/sql/merge.sql b/src/test/regress/sql/merge.sql index c266b5333..539e9814d 100644 --- a/src/test/regress/sql/merge.sql +++ b/src/test/regress/sql/merge.sql @@ -19,6 +19,7 @@ SET search_path TO merge_schema; SET citus.shard_count TO 4; SET citus.next_shard_id TO 4000000; SET citus.explain_all_tasks to true; +SET citus.shard_replication_factor TO 1; SELECT 1 FROM master_add_node('localhost', :master_port, groupid => 0); CREATE TABLE source @@ -143,9 +144,13 @@ SELECT undistribute_table('source'); SELECT create_distributed_table('target', 'customer_id'); SELECT create_distributed_table('source', 'customer_id'); +-- Updates one of the row with customer_id = 30002 +SELECT * from target t WHERE t.customer_id = 30002; +-- Turn on notice to print tasks sent to nodes (it should be a single task) +SET citus.log_remote_commands to true; MERGE INTO target t USING source s - ON (t.customer_id = s.customer_id) + ON (t.customer_id = s.customer_id) AND t.customer_id = 30002 WHEN MATCHED AND t.order_center = 'XX' THEN DELETE @@ -158,6 +163,27 @@ MERGE INTO target t WHEN NOT MATCHED THEN -- New entry, record it. INSERT (customer_id, last_order_id, order_center, order_count, last_order) VALUES (customer_id, s.order_id, s.order_center, 123, s.order_time); +SET citus.log_remote_commands to false; +SELECT * from target t WHERE t.customer_id = 30002; + +-- Deletes one of the row with customer_id = 30004 +SELECT * from target t WHERE t.customer_id = 30004; +MERGE INTO target t + USING source s + ON (t.customer_id = s.customer_id) AND t.customer_id = 30004 + + WHEN MATCHED AND t.order_center = 'XX' THEN + DELETE + + WHEN MATCHED THEN + UPDATE SET -- Existing customer, update the order count and last_order_id + order_count = t.order_count + 1, + last_order_id = s.order_id + + WHEN NOT MATCHED THEN -- New entry, record it. 
+ INSERT (customer_id, last_order_id, order_center, order_count, last_order) + VALUES (customer_id, s.order_id, s.order_center, 123, s.order_time); +SELECT * from target t WHERE t.customer_id = 30004; -- -- Test MERGE with CTE as source @@ -243,11 +269,13 @@ SELECT create_distributed_table('t1', 'id'); SELECT create_distributed_table('s1', 'id'); +SELECT * FROM t1 order by id; +SET citus.log_remote_commands to true; WITH s1_res AS ( SELECT * FROM s1 ) MERGE INTO t1 - USING s1_res ON (s1_res.id = t1.id) + USING s1_res ON (s1_res.id = t1.id) AND t1.id = 6 WHEN MATCHED AND s1_res.val = 0 THEN DELETE @@ -255,6 +283,9 @@ MERGE INTO t1 UPDATE SET val = t1.val + 1 WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s1_res.id, s1_res.val); +SET citus.log_remote_commands to false; +-- As the id 6 is NO match, VALUES(6, 1) should appear in target +SELECT * FROM t1 order by id; -- -- Test with multiple join conditions @@ -325,15 +356,21 @@ SELECT undistribute_table('s2'); SELECT create_distributed_table('t2', 'id'); SELECT create_distributed_table('s2', 'id'); +SELECT * FROM t2 ORDER BY 1; +SET citus.log_remote_commands to true; MERGE INTO t2 USING s2 -ON t2.id = s2.id AND t2.src = s2.src +ON t2.id = s2.id AND t2.src = s2.src AND t2.id = 4 WHEN MATCHED AND t2.val = 1 THEN UPDATE SET val = s2.val + 10 WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (id, val, src) VALUES (s2.id, s2.val, s2.src); +SET citus.log_remote_commands to false; +-- Row with id = 4 is a match for delete clause, row should be deleted +-- Row with id = 3 is a NO match, row from source will be inserted +SELECT * FROM t2 ORDER BY 1; -- -- With sub-query as the MERGE source @@ -824,10 +861,159 @@ RESET client_min_messages; SELECT * FROM ft_target; +-- +-- complex joins on the source side +-- + +-- source(join of two relations) relation is an unaliased join + +CREATE TABLE target_cj(tid int, src text, val int); +CREATE TABLE source_cj1(sid1 int, src1 text, val1 int); +CREATE TABLE source_cj2(sid2 int, src2 
text, val2 int); + +INSERT INTO target_cj VALUES (1, 'target', 0); +INSERT INTO target_cj VALUES (2, 'target', 0); +INSERT INTO target_cj VALUES (2, 'target', 0); +INSERT INTO target_cj VALUES (3, 'target', 0); + +INSERT INTO source_cj1 VALUES (2, 'source-1', 10); +INSERT INTO source_cj2 VALUES (2, 'source-2', 20); + +BEGIN; +MERGE INTO target_cj t +USING source_cj1 s1 INNER JOIN source_cj2 s2 ON sid1 = sid2 +ON t.tid = sid1 AND t.tid = 2 +WHEN MATCHED THEN + UPDATE SET src = src2 +WHEN NOT MATCHED THEN + DO NOTHING; +-- Gold result to compare against +SELECT * FROM target_cj ORDER BY 1; +ROLLBACK; + +BEGIN; +-- try accessing columns from either side of the source join +MERGE INTO target_cj t +USING source_cj1 s2 + INNER JOIN source_cj2 s1 ON sid1 = sid2 AND val1 = 10 +ON t.tid = sid1 AND t.tid = 2 +WHEN MATCHED THEN + UPDATE SET tid = sid2, src = src1, val = val2 +WHEN NOT MATCHED THEN + DO NOTHING; +-- Gold result to compare against +SELECT * FROM target_cj ORDER BY 1; +ROLLBACK; + +-- Test the same scenarios with distributed tables + +SELECT create_distributed_table('target_cj', 'tid'); +SELECT create_distributed_table('source_cj1', 'sid1'); +SELECT create_distributed_table('source_cj2', 'sid2'); + +BEGIN; +SET citus.log_remote_commands to true; +MERGE INTO target_cj t +USING source_cj1 s1 INNER JOIN source_cj2 s2 ON sid1 = sid2 +ON t.tid = sid1 AND t.tid = 2 +WHEN MATCHED THEN + UPDATE SET src = src2 +WHEN NOT MATCHED THEN + DO NOTHING; +SET citus.log_remote_commands to false; +SELECT * FROM target_cj ORDER BY 1; +ROLLBACK; + +BEGIN; +-- try accessing columns from either side of the source join +MERGE INTO target_cj t +USING source_cj1 s2 + INNER JOIN source_cj2 s1 ON sid1 = sid2 AND val1 = 10 +ON t.tid = sid1 AND t.tid = 2 +WHEN MATCHED THEN + UPDATE SET src = src1, val = val2 +WHEN NOT MATCHED THEN + DO NOTHING; +SELECT * FROM target_cj ORDER BY 1; +ROLLBACK; + +-- sub-query as a source +BEGIN; +MERGE INTO target_cj t +USING (SELECT * FROM source_cj1 WHERE 
sid1 = 2) sub +ON t.tid = sub.sid1 AND t.tid = 2 +WHEN MATCHED THEN + UPDATE SET src = sub.src1, val = val1 +WHEN NOT MATCHED THEN + DO NOTHING; +SELECT * FROM target_cj ORDER BY 1; +ROLLBACK; + +-- Test self-join +BEGIN; +SELECT * FROM target_cj ORDER BY 1; +set citus.log_remote_commands to true; +MERGE INTO target_cj t1 +USING (SELECT * FROM target_cj) sub +ON t1.tid = sub.tid AND t1.tid = 3 +WHEN MATCHED THEN + UPDATE SET src = sub.src, val = sub.val + 100 +WHEN NOT MATCHED THEN + DO NOTHING; +set citus.log_remote_commands to false; +SELECT * FROM target_cj ORDER BY 1; +ROLLBACK; + + +-- Test PREPARE +PREPARE foo(int) AS +MERGE INTO target_cj target +USING (SELECT * FROM source_cj1) sub +ON target.tid = sub.sid1 AND target.tid = $1 +WHEN MATCHED THEN + UPDATE SET val = sub.val1 +WHEN NOT MATCHED THEN + DO NOTHING; + +SELECT * FROM target_cj ORDER BY 1; + +BEGIN; +EXECUTE foo(2); +EXECUTE foo(2); +EXECUTE foo(2); +EXECUTE foo(2); +EXECUTE foo(2); +SELECT * FROM target_cj ORDER BY 1; +ROLLBACK; + +BEGIN; + +SET citus.log_remote_commands to true; +SET client_min_messages TO DEBUG1; +EXECUTE foo(2); +RESET client_min_messages; + +EXECUTE foo(2); +SET citus.log_remote_commands to false; + +SELECT * FROM target_cj ORDER BY 1; +ROLLBACK; + -- -- Error and Unsupported scenarios -- +-- try updating the distribution key column +BEGIN; +MERGE INTO target_cj t + USING source_cj1 s + ON t.tid = s.sid1 AND t.tid = 2 + WHEN MATCHED THEN + UPDATE SET tid = tid + 9, src = src || ' updated by merge' + WHEN NOT MATCHED THEN + INSERT VALUES (sid1, 'inserted by merge', val1); +ROLLBACK; + -- Foreign table as target MERGE INTO foreign_table USING ft_target ON (foreign_table.id = ft_target.id) @@ -854,6 +1040,38 @@ MERGE INTO t1 WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s1.id, s1.val); +-- Now both s1 and t1 are distributed tables +SELECT undistribute_table('t1'); +SELECT create_distributed_table('t1', 'id'); + +-- We have a potential pitfall where a function can be invoked in 
+-- the MERGE conditions which can insert/update to a random shard +CREATE OR REPLACE function merge_when_and_write() RETURNS BOOLEAN +LANGUAGE PLPGSQL AS +$$ +BEGIN + INSERT INTO t1 VALUES (100, 100); + RETURN TRUE; +END; +$$; + +-- Test preventing "ON" join condition from writing to the database +BEGIN; +MERGE INTO t1 +USING s1 ON t1.id = s1.id AND t1.id = 2 AND (merge_when_and_write()) +WHEN MATCHED THEN + UPDATE SET val = t1.val + s1.val; +ROLLBACK; + +-- Test preventing WHEN clause(s) from writing to the database +BEGIN; +MERGE INTO t1 +USING s1 ON t1.id = s1.id AND t1.id = 2 +WHEN MATCHED AND (merge_when_and_write()) THEN + UPDATE SET val = t1.val + s1.val; +ROLLBACK; + + -- Joining on partition columns with sub-query MERGE INTO t1 USING (SELECT * FROM s1) sub ON (sub.val = t1.id) -- sub.val is not a distribution column @@ -997,6 +1215,132 @@ WHEN MATCHED THEN WHEN NOT MATCHED THEN INSERT VALUES(mv_source.id, mv_source.val); +-- Distributed tables *must* be colocated +CREATE TABLE dist_target(id int, val varchar); +SELECT create_distributed_table('dist_target', 'id'); +CREATE TABLE dist_source(id int, val varchar); +SELECT create_distributed_table('dist_source', 'id', colocate_with => 'none'); + +MERGE INTO dist_target +USING dist_source +ON dist_target.id = dist_source.id +WHEN MATCHED THEN +UPDATE SET val = dist_source.val +WHEN NOT MATCHED THEN +INSERT VALUES(dist_source.id, dist_source.val); + +-- Distributed tables *must* be joined on distribution column +CREATE TABLE dist_colocated(id int, val int); +SELECT create_distributed_table('dist_colocated', 'id', colocate_with => 'dist_target'); + +MERGE INTO dist_target +USING dist_colocated +ON dist_target.id = dist_colocated.val -- val is not the distribution column +WHEN MATCHED THEN +UPDATE SET val = dist_colocated.val +WHEN NOT MATCHED THEN +INSERT VALUES(dist_colocated.id, dist_colocated.val); + +-- MERGE command must be joined with with a constant qual on target relation + +-- AND clause is missing 
+MERGE INTO dist_target +USING dist_colocated +ON dist_target.id = dist_colocated.id +WHEN MATCHED THEN +UPDATE SET val = dist_colocated.val +WHEN NOT MATCHED THEN +INSERT VALUES(dist_colocated.id, dist_colocated.val); + +-- AND clause incorrect table (must be target) +MERGE INTO dist_target +USING dist_colocated +ON dist_target.id = dist_colocated.id AND dist_colocated.id = 1 +WHEN MATCHED THEN +UPDATE SET val = dist_colocated.val +WHEN NOT MATCHED THEN +INSERT VALUES(dist_colocated.id, dist_colocated.val); + +-- AND clause incorrect column (must be distribution column) +MERGE INTO dist_target +USING dist_colocated +ON dist_target.id = dist_colocated.id AND dist_target.val = 'const' +WHEN MATCHED THEN +UPDATE SET val = dist_colocated.val +WHEN NOT MATCHED THEN +INSERT VALUES(dist_colocated.id, dist_colocated.val); + +-- Both the source and target must be distributed +MERGE INTO dist_target +USING (SELECT 100 id) AS source +ON dist_target.id = source.id AND dist_target.val = 'const' +WHEN MATCHED THEN +UPDATE SET val = 'source' +WHEN NOT MATCHED THEN +INSERT VALUES(source.id, 'source'); + +-- Non-hash distributed tables (append/range). 
+CREATE VIEW show_tables AS +SELECT logicalrelid, partmethod +FROM pg_dist_partition +WHERE (logicalrelid = 'dist_target'::regclass) OR (logicalrelid = 'dist_source'::regclass) +ORDER BY 1; + +SELECT undistribute_table('dist_source'); +SELECT create_distributed_table('dist_source', 'id', 'append'); +SELECT * FROM show_tables; + +MERGE INTO dist_target +USING dist_source +ON dist_target.id = dist_source.id +WHEN MATCHED THEN +UPDATE SET val = dist_source.val +WHEN NOT MATCHED THEN +INSERT VALUES(dist_source.id, dist_source.val); + +SELECT undistribute_table('dist_source'); +SELECT create_distributed_table('dist_source', 'id', 'range'); +SELECT * FROM show_tables; + +MERGE INTO dist_target +USING dist_source +ON dist_target.id = dist_source.id +WHEN MATCHED THEN +UPDATE SET val = dist_source.val +WHEN NOT MATCHED THEN +INSERT VALUES(dist_source.id, dist_source.val); + +-- Both are append tables +SELECT undistribute_table('dist_target'); +SELECT undistribute_table('dist_source'); +SELECT create_distributed_table('dist_target', 'id', 'append'); +SELECT create_distributed_table('dist_source', 'id', 'append'); +SELECT * FROM show_tables; + +MERGE INTO dist_target +USING dist_source +ON dist_target.id = dist_source.id +WHEN MATCHED THEN +UPDATE SET val = dist_source.val +WHEN NOT MATCHED THEN +INSERT VALUES(dist_source.id, dist_source.val); + +-- Both are range tables +SELECT undistribute_table('dist_target'); +SELECT undistribute_table('dist_source'); +SELECT create_distributed_table('dist_target', 'id', 'range'); +SELECT create_distributed_table('dist_source', 'id', 'range'); +SELECT * FROM show_tables; + +MERGE INTO dist_target +USING dist_source +ON dist_target.id = dist_source.id +WHEN MATCHED THEN +UPDATE SET val = dist_source.val +WHEN NOT MATCHED THEN +INSERT VALUES(dist_source.id, dist_source.val); + DROP SERVER foreign_server CASCADE; +DROP FUNCTION merge_when_and_write(); DROP SCHEMA merge_schema CASCADE; SELECT 1 FROM master_remove_node('localhost', 
:master_port); diff --git a/src/test/regress/sql/pgmerge.sql b/src/test/regress/sql/pgmerge.sql index 6842f516a..83bf01a68 100644 --- a/src/test/regress/sql/pgmerge.sql +++ b/src/test/regress/sql/pgmerge.sql @@ -608,6 +608,14 @@ USING wq_source s ON t.tid = s.sid WHEN MATCHED AND (merge_when_and_write()) THEN UPDATE SET balance = t.balance + s.balance; ROLLBACK; + +-- Test preventing ON condition from writing to the database +BEGIN; +MERGE INTO wq_target t +USING wq_source s ON t.tid = s.sid AND (merge_when_and_write()) +WHEN MATCHED THEN + UPDATE SET balance = t.balance + s.balance; +ROLLBACK; drop function merge_when_and_write(); DROP TABLE wq_target, wq_source;