From 1e42cd3da03584d25aa0ace99574cbca05293189 Mon Sep 17 00:00:00 2001 From: Teja Mupparti Date: Wed, 21 Dec 2022 19:33:13 -0800 Subject: [PATCH] Support MERGE on distributed tables with restrictions MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This implements the phase - II of MERGE sql support Support routable query where all the tables in the merge-sql are distributed, co-located, and both the source and target relations are joined on the distribution column with a constant qual. This should be a Citus single-task query. Below is an example. SELECT create_distributed_table('t1', 'id'); SELECT create_distributed_table('s1', 'id', colocate_with => ‘t1’); MERGE INTO t1 USING s1 ON t1.id = s1.id AND t1.id = 100 WHEN MATCHED THEN UPDATE SET val = s1.val + 10 WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (id, val, src) VALUES (s1.id, s1.val, s1.src) Basically, MERGE checks to see if There are a minimum of two distributed tables (source and a target). All the distributed tables are indeed colocated. MERGE relations are joined on the distribution column MERGE .. USING .. ON target.dist_key = source.dist_key The query should touch only a single shard i.e. JOIN AND with a constant qual MERGE .. USING .. ON target.dist_key = source.dist_key AND target.dist_key = <> If any of the conditions are not met, it raises an exception. (cherry picked from commit 44c387b978a51b0c0e87c7f9aec154cfc3041da1) This implements MERGE phase3 Support pushdown query where all the tables in the merge-sql are Citus-distributed, co-located, and both the source and target relations are joined on the distribution column. This will generate multiple tasks which execute independently after pushdown. SELECT create_distributed_table('t1', 'id'); SELECT create_distributed_table('s1', 'id', colocate_with => ‘t1’); MERGE INTO t1 USING s1 ON t1.id = s1.id WHEN MATCHED THEN UPDATE SET val = s1.val + 10 WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (id, val, src) VALUES (s1.id, s1.val, s1.src) *The only exception for both the phases II and III is, UPDATEs and INSERTs must be done on the same shard-group as the joined key; for example, below scenarios are NOT supported as the key-value to be inserted/updated is not guaranteed to be on the same node as the id distribution-column. MERGE INTO target t USING source s ON (t.customer_id = s.customer_id) WHEN NOT MATCHED THEN - - INSERT(customer_id, …) VALUES (, ……); OR this scenario where we update the distribution column itself MERGE INTO target t USING source s On (t.customer_id = s.customer_id) WHEN MATCHED THEN UPDATE SET customer_id = 100; (cherry picked from commit fa7b8949a88bf614d5a07fc33f6159d9efa5d087) --- .../distributed/planner/distributed_planner.c | 194 +-- .../planner/fast_path_router_planner.c | 4 +- .../planner/multi_physical_planner.c | 17 +- .../planner/multi_router_planner.c | 353 ++++- .../relation_restriction_equivalence.c | 31 +- src/include/distributed/distributed_planner.h | 4 + .../relation_restriction_equivalence.h | 11 + src/test/regress/bin/normalize.sed | 4 + src/test/regress/expected/merge.out | 1306 ++++++++++++++++- src/test/regress/expected/pg15.out | 10 +- src/test/regress/expected/pgmerge.out | 14 +- src/test/regress/sql/merge.sql | 742 +++++++++- src/test/regress/sql/pgmerge.sql | 8 + 13 files changed, 2450 insertions(+), 248 deletions(-) diff --git a/src/backend/distributed/planner/distributed_planner.c b/src/backend/distributed/planner/distributed_planner.c index 701ae4ff5..262258d7f 100644 --- a/src/backend/distributed/planner/distributed_planner.c +++ b/src/backend/distributed/planner/distributed_planner.c @@ -75,9 +75,6 @@ static uint64 NextPlanId = 1; /* keep track of planner call stack levels */ int PlannerLevel = 0; -static void ErrorIfQueryHasUnsupportedMergeCommand(Query *queryTree, - List *rangeTableList); -static bool ContainsMergeCommandWalker(Node *node); static bool ListContainsDistributedTableRTE(List *rangeTableList, bool *maybeHasForeignDistributedTable); static bool IsUpdateOrDelete(Query *query); @@ -132,7 +129,7 @@ static PlannedStmt * PlanDistributedStmt(DistributedPlanningContext *planContext static RTEListProperties * GetRTEListProperties(List *rangeTableList); static List * TranslatedVars(PlannerInfo *root, int relationIndex); static void WarnIfListHasForeignDistributedTable(List *rangeTableList); -static void ErrorIfMergeHasUnsupportedTables(Query *parse, List *rangeTableList); + /* Distributed planner hook */ PlannedStmt * @@ -200,12 +197,6 @@ distributed_planner(Query *parse, if (!fastPathRouterQuery) { - /* - * Fast path queries cannot have merge command, and we - * prevent the remaining here. - */ - ErrorIfQueryHasUnsupportedMergeCommand(parse, rangeTableList); - /* * When there are partitioned tables (not applicable to fast path), * pretend that they are regular tables to avoid unnecessary work @@ -304,44 +295,11 @@ distributed_planner(Query *parse, } -/* - * ErrorIfQueryHasUnsupportedMergeCommand walks over the query tree and bails out - * if there is no Merge command (e.g., CMD_MERGE) in the query tree. For merge, - * looks for all supported combinations, throws an exception if any violations - * are seen. - */ -static void -ErrorIfQueryHasUnsupportedMergeCommand(Query *queryTree, List *rangeTableList) -{ - /* - * Postgres currently doesn't support Merge queries inside subqueries and - * ctes, but lets be defensive and do query tree walk anyway. - * - * We do not call this path for fast-path queries to avoid this additional - * overhead. - */ - if (!ContainsMergeCommandWalker((Node *) queryTree)) - { - /* No MERGE found */ - return; - } - - - /* - * In Citus we have limited support for MERGE, it's allowed - * only if all the tables(target, source or any CTE) tables - * are are local i.e. a combination of Citus local and Non-Citus - * tables (regular Postgres tables). - */ - ErrorIfMergeHasUnsupportedTables(queryTree, rangeTableList); -} - - /* * ContainsMergeCommandWalker walks over the node and finds if there are any * Merge command (e.g., CMD_MERGE) in the node. */ -static bool +bool ContainsMergeCommandWalker(Node *node) { #if PG_VERSION_NUM < PG_VERSION_15 @@ -676,7 +634,8 @@ bool IsUpdateOrDelete(Query *query) { return query->commandType == CMD_UPDATE || - query->commandType == CMD_DELETE; + query->commandType == CMD_DELETE || + query->commandType == CMD_MERGE; } @@ -2611,148 +2570,3 @@ WarnIfListHasForeignDistributedTable(List *rangeTableList) } } } - - -/* - * IsMergeAllowedOnRelation takes a relation entry and checks if MERGE command is - * permitted on special relations, such as materialized view, returns true only if - * it's a "source" relation. - */ -bool -IsMergeAllowedOnRelation(Query *parse, RangeTblEntry *rte) -{ - if (!IsMergeQuery(parse)) - { - return false; - } - - RangeTblEntry *targetRte = rt_fetch(parse->resultRelation, parse->rtable); - - /* Is it a target relation? */ - if (targetRte->relid == rte->relid) - { - return false; - } - - return true; -} - - -/* - * ErrorIfMergeHasUnsupportedTables checks if all the tables(target, source or any CTE - * present) in the MERGE command are local i.e. a combination of Citus local and Non-Citus - * tables (regular Postgres tables), raises an exception for all other combinations. - */ -static void -ErrorIfMergeHasUnsupportedTables(Query *parse, List *rangeTableList) -{ - ListCell *tableCell = NULL; - - foreach(tableCell, rangeTableList) - { - RangeTblEntry *rangeTableEntry = (RangeTblEntry *) lfirst(tableCell); - Oid relationId = rangeTableEntry->relid; - - switch (rangeTableEntry->rtekind) - { - case RTE_RELATION: - { - /* Check the relation type */ - break; - } - - case RTE_SUBQUERY: - case RTE_FUNCTION: - case RTE_TABLEFUNC: - case RTE_VALUES: - case RTE_JOIN: - case RTE_CTE: - { - /* Skip them as base table(s) will be checked */ - continue; - } - - /* - * RTE_NAMEDTUPLESTORE is typically used in ephmeral named relations, - * such as, trigger data; until we find a genuine use case, raise an - * exception. - * RTE_RESULT is a node added by the planner and we shouldn't - * encounter it in the parse tree. - */ - case RTE_NAMEDTUPLESTORE: - case RTE_RESULT: - { - ereport(ERROR, - (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("MERGE command is not supported with " - "Tuplestores and results"))); - break; - } - - default: - { - ereport(ERROR, - (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("MERGE command: Unrecognized range table entry."))); - } - } - - /* RTE Relation can be of various types, check them now */ - - /* skip the regular views as they are replaced with subqueries */ - if (rangeTableEntry->relkind == RELKIND_VIEW) - { - continue; - } - - if (rangeTableEntry->relkind == RELKIND_MATVIEW || - rangeTableEntry->relkind == RELKIND_FOREIGN_TABLE) - { - /* Materialized view or Foreign table as target is not allowed */ - if (IsMergeAllowedOnRelation(parse, rangeTableEntry)) - { - /* Non target relation is ok */ - continue; - } - else - { - ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("MERGE command is not allowed " - "on materialized view"))); - } - } - - if (rangeTableEntry->relkind != RELKIND_RELATION && - rangeTableEntry->relkind != RELKIND_PARTITIONED_TABLE) - { - ereport(ERROR, - (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("Unexpected relation type(relkind:%c) in MERGE command", - rangeTableEntry->relkind))); - } - - Assert(rangeTableEntry->relid != 0); - - /* Distributed tables and Reference tables are not supported yet */ - if (IsCitusTableType(relationId, REFERENCE_TABLE) || - IsCitusTableType(relationId, DISTRIBUTED_TABLE)) - { - ereport(ERROR, - (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("MERGE command is not supported on " - "distributed/reference tables yet"))); - } - - /* Regular Postgres tables and Citus local tables are allowed */ - if (!IsCitusTable(relationId) || - IsCitusTableType(relationId, CITUS_LOCAL_TABLE)) - { - continue; - } - - - /* Any other Citus table type missing ? */ - } - - /* All the tables are local, supported */ -} diff --git a/src/backend/distributed/planner/fast_path_router_planner.c b/src/backend/distributed/planner/fast_path_router_planner.c index aa029f3c0..e7d91a101 100644 --- a/src/backend/distributed/planner/fast_path_router_planner.c +++ b/src/backend/distributed/planner/fast_path_router_planner.c @@ -54,8 +54,6 @@ bool EnableFastPathRouterPlanner = true; static bool ColumnAppearsMultipleTimes(Node *quals, Var *distributionKey); -static bool ConjunctionContainsColumnFilter(Node *node, Var *column, - Node **distributionKeyValue); static bool DistKeyInSimpleOpExpression(Expr *clause, Var *distColumn, Node **distributionKeyValue); @@ -294,7 +292,7 @@ ColumnAppearsMultipleTimes(Node *quals, Var *distributionKey) * * If the conjuction contains column filter which is const, distributionKeyValue is set. */ -static bool +bool ConjunctionContainsColumnFilter(Node *node, Var *column, Node **distributionKeyValue) { if (node == NULL) diff --git a/src/backend/distributed/planner/multi_physical_planner.c b/src/backend/distributed/planner/multi_physical_planner.c index be6caf0e2..b30dddeb7 100644 --- a/src/backend/distributed/planner/multi_physical_planner.c +++ b/src/backend/distributed/planner/multi_physical_planner.c @@ -2225,14 +2225,17 @@ QueryPushdownSqlTaskList(Query *query, uint64 jobId, } /* - * For left joins we don't care about the shards pruned for the right hand side. - * If the right hand side would prune to a smaller set we should still send it to - * all tables of the left hand side. However if the right hand side is bigger than - * the left hand side we don't have to send the query to any shard that is not - * matching anything on the left hand side. + * For left joins we don't care about the shards pruned for + * the right hand side. If the right hand side would prune + * to a smaller set we should still send it to all tables + * of the left hand side. However if the right hand side is + * bigger than the left hand side we don't have to send the + * query to any shard that is not matching anything on the + * left hand side. * - * Instead we will simply skip any RelationRestriction if it is an OUTER join and - * the table is part of the non-outer side of the join. + * Instead we will simply skip any RelationRestriction if it + * is an OUTER join and the table is part of the non-outer + * side of the join. */ if (IsInnerTableOfOuterJoin(relationRestriction)) { diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c index f4591a770..99beff2c8 100644 --- a/src/backend/distributed/planner/multi_router_planner.c +++ b/src/backend/distributed/planner/multi_router_planner.c @@ -185,7 +185,6 @@ static DeferredErrorMessage * TargetlistAndFunctionsSupported(Oid resultRelation List *targetList, CmdType commandType, List *returningList); - /* * CreateRouterPlan attempts to create a router executor plan for the given * SELECT statement. ->planningError is set if planning fails. @@ -905,6 +904,85 @@ NodeIsFieldStore(Node *node) } +/* + * MergeQuerySupported does check for a MERGE command in the query, if it finds + * one, it will verify the below criteria + * - Supported tables and combinations in ErrorIfMergeHasUnsupportedTables + * - Distributed tables requirements in ErrorIfDistTablesNotColocated + * - Checks target-lists and functions-in-quals in TargetlistAndFunctionsSupported + */ +static DeferredErrorMessage * +MergeQuerySupported(Query *originalQuery, + PlannerRestrictionContext *plannerRestrictionContext) +{ + /* For non-MERGE commands it's a no-op */ + if (!QueryHasMergeCommand(originalQuery)) + { + return NULL; + } + + List *rangeTableList = ExtractRangeTableEntryList(originalQuery); + RangeTblEntry *resultRte = ExtractResultRelationRTE(originalQuery); + + /* + * Fast path queries cannot have merge command, and we prevent the remaining here. + * In Citus we have limited support for MERGE, it's allowed only if all + * the tables(target, source or any CTE) tables are are local i.e. a + * combination of Citus local and Non-Citus tables (regular Postgres tables) + * or distributed tables with some restrictions, please see header of routine + * ErrorIfDistTablesNotColocated for details. + */ + DeferredErrorMessage *deferredError = + ErrorIfMergeHasUnsupportedTables(originalQuery, + rangeTableList, + plannerRestrictionContext); + if (deferredError) + { + return deferredError; + } + + Oid resultRelationId = resultRte->relid; + deferredError = + TargetlistAndFunctionsSupported(resultRelationId, + originalQuery->jointree, + originalQuery->jointree->quals, + originalQuery->targetList, + originalQuery->commandType, + originalQuery->returningList); + if (deferredError) + { + return deferredError; + } + + #if PG_VERSION_NUM >= PG_VERSION_15 + + /* + * MERGE is a special case where we have multiple modify statements + * within itself. Check each INSERT/UPDATE/DELETE individually. + */ + MergeAction *action = NULL; + foreach_ptr(action, originalQuery->mergeActionList) + { + Assert(originalQuery->returningList == NULL); + deferredError = + TargetlistAndFunctionsSupported(resultRelationId, + originalQuery->jointree, + action->qual, + action->targetList, + action->commandType, + originalQuery->returningList); + if (deferredError) + { + return deferredError; + } + } + + #endif + + return NULL; +} + + /* * ModifyQuerySupported returns NULL if the query only contains supported * features, otherwise it returns an error description. @@ -920,8 +998,17 @@ ModifyQuerySupported(Query *queryTree, Query *originalQuery, bool multiShardQuer PlannerRestrictionContext *plannerRestrictionContext) { Oid distributedTableId = InvalidOid; - DeferredErrorMessage *error = ModifyPartialQuerySupported(queryTree, multiShardQuery, - &distributedTableId); + DeferredErrorMessage *error = MergeQuerySupported(originalQuery, + plannerRestrictionContext); + if (error) + { + /* + * For MERGE, we do not do recursive plannning, simply bail out. + */ + RaiseDeferredError(error, ERROR); + } + + error = ModifyPartialQuerySupported(queryTree, multiShardQuery, &distributedTableId); if (error) { return error; @@ -3969,3 +4056,263 @@ CompareInsertValuesByShardId(const void *leftElement, const void *rightElement) } } } + + +/* + * IsMergeAllowedOnRelation takes a relation entry and checks if MERGE command is + * permitted on special relations, such as materialized view, returns true only if + * it's a "source" relation. + */ +bool +IsMergeAllowedOnRelation(Query *parse, RangeTblEntry *rte) +{ + if (!IsMergeQuery(parse)) + { + return false; + } + + RangeTblEntry *targetRte = rt_fetch(parse->resultRelation, parse->rtable); + + /* Is it a target relation? */ + if (targetRte->relid == rte->relid) + { + return false; + } + + return true; +} + + +/* + * ErrorIfDistTablesNotColocated Checks to see if + * + * - There are a minimum of two distributed tables (source and a target). + * - All the distributed tables are indeed colocated. + * - MERGE relations are joined on the distribution column + * MERGE .. USING .. ON target.dist_key = source.dist_key + * + * If any of the conditions are not met, it raises an exception. + */ +static DeferredErrorMessage * +ErrorIfDistTablesNotColocated(Query *parse, List *distTablesList, + PlannerRestrictionContext *plannerRestrictionContext) +{ + /* All MERGE tables must be distributed */ + if (list_length(distTablesList) < 2) + { + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "For MERGE command, both the source and target " + "must be distributed", NULL, NULL); + } + + /* All distributed tables must be colocated */ + if (!AllRelationsInListColocated(distTablesList, RANGETABLE_ENTRY)) + { + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "For MERGE command, all the distributed tables " + "must be colocated", NULL, NULL); + } + + /* Are source and target tables joined on distribution column? */ + if (!RestrictionEquivalenceForPartitionKeys(plannerRestrictionContext)) + { + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "MERGE command is only supported when distributed " + "tables are joined on their distribution column", + NULL, NULL); + } + + return NULL; +} + + +/* + * ErrorIfMergeHasUnsupportedTables checks if all the tables(target, source or any CTE + * present) in the MERGE command are local i.e. a combination of Citus local and Non-Citus + * tables (regular Postgres tables), or distributed tables with some restrictions, please + * see header of routine ErrorIfDistTablesNotColocated for details, raises an exception + * for all other combinations. + */ +static DeferredErrorMessage * +ErrorIfMergeHasUnsupportedTables(Query *parse, List *rangeTableList, + PlannerRestrictionContext *restrictionContext) +{ + List *distTablesList = NIL; + bool foundLocalTables = false; + + RangeTblEntry *rangeTableEntry = NULL; + foreach_ptr(rangeTableEntry, rangeTableList) + { + Oid relationId = rangeTableEntry->relid; + + switch (rangeTableEntry->rtekind) + { + case RTE_RELATION: + { + /* Check the relation type */ + break; + } + + case RTE_SUBQUERY: + case RTE_FUNCTION: + case RTE_TABLEFUNC: + case RTE_VALUES: + case RTE_JOIN: + case RTE_CTE: + { + /* Skip them as base table(s) will be checked */ + continue; + } + + /* + * RTE_NAMEDTUPLESTORE is typically used in ephmeral named relations, + * such as, trigger data; until we find a genuine use case, raise an + * exception. + * RTE_RESULT is a node added by the planner and we shouldn't + * encounter it in the parse tree. + */ + case RTE_NAMEDTUPLESTORE: + case RTE_RESULT: + { + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "MERGE command is not supported with " + "Tuplestores and results", + NULL, NULL); + } + + default: + { + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "MERGE command: Unrecognized range table entry.", + NULL, NULL); + } + } + + /* RTE Relation can be of various types, check them now */ + + /* skip the regular views as they are replaced with subqueries */ + if (rangeTableEntry->relkind == RELKIND_VIEW) + { + continue; + } + + if (rangeTableEntry->relkind == RELKIND_MATVIEW || + rangeTableEntry->relkind == RELKIND_FOREIGN_TABLE) + { + /* Materialized view or Foreign table as target is not allowed */ + if (IsMergeAllowedOnRelation(parse, rangeTableEntry)) + { + /* Non target relation is ok */ + continue; + } + else + { + /* Usually we don't reach this exception as the Postgres parser catches it */ + StringInfo errorMessage = makeStringInfo(); + appendStringInfo(errorMessage, + "MERGE command is not allowed on " + "relation type(relkind:%c)", rangeTableEntry->relkind); + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, errorMessage->data, + NULL, NULL); + } + } + + if (rangeTableEntry->relkind != RELKIND_RELATION && + rangeTableEntry->relkind != RELKIND_PARTITIONED_TABLE) + { + StringInfo errorMessage = makeStringInfo(); + appendStringInfo(errorMessage, "Unexpected table type(relkind:%c) " + "in MERGE command", rangeTableEntry->relkind); + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, errorMessage->data, + NULL, NULL); + } + + Assert(rangeTableEntry->relid != 0); + + /* Reference tables are not supported yet */ + if (IsCitusTableType(relationId, REFERENCE_TABLE)) + { + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "MERGE command is not supported on reference " + "tables yet", NULL, NULL); + } + + /* Append/Range tables are not supported */ + if (IsCitusTableType(relationId, APPEND_DISTRIBUTED) || + IsCitusTableType(relationId, RANGE_DISTRIBUTED)) + { + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "For MERGE command, all the distributed tables " + "must be colocated, for append/range distribution, " + "colocation is not supported", NULL, + "Consider using hash distribution instead"); + } + + /* + * For now, save all distributed tables, later (below) we will + * check for supported combination(s). + */ + if (IsCitusTableType(relationId, DISTRIBUTED_TABLE)) + { + distTablesList = lappend(distTablesList, rangeTableEntry); + continue; + } + + /* Regular Postgres tables and Citus local tables are allowed */ + if (!IsCitusTable(relationId) || + IsCitusTableType(relationId, CITUS_LOCAL_TABLE)) + { + foundLocalTables = true; + continue; + } + + /* Any other Citus table type missing ? */ + } + + /* Ensure all tables are indeed local */ + if (foundLocalTables && list_length(distTablesList) == 0) + { + /* All the tables are local, supported */ + return NULL; + } + else if (foundLocalTables && list_length(distTablesList) > 0) + { + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "MERGE command is not supported with " + "combination of distributed/local tables yet", + NULL, NULL); + } + + /* Ensure all distributed tables are indeed co-located */ + return ErrorIfDistTablesNotColocated(parse, distTablesList, restrictionContext); +} + + +/* + * QueryHasMergeCommand walks over the query tree and returns false if there + * is no Merge command (e.g., CMD_MERGE), true otherwise. + */ +static bool +QueryHasMergeCommand(Query *queryTree) +{ + /* function is void for pre-15 versions of Postgres */ + #if PG_VERSION_NUM < PG_VERSION_15 + return false; + #else + + /* + * Postgres currently doesn't support Merge queries inside subqueries and + * ctes, but lets be defensive and do query tree walk anyway. + * + * We do not call this path for fast-path queries to avoid this additional + * overhead. + */ + if (!ContainsMergeCommandWalker((Node *) queryTree)) + { + /* No MERGE found */ + return false; + } + + return true; + #endif +} diff --git a/src/backend/distributed/planner/relation_restriction_equivalence.c b/src/backend/distributed/planner/relation_restriction_equivalence.c index 4d131899a..f76a95d26 100644 --- a/src/backend/distributed/planner/relation_restriction_equivalence.c +++ b/src/backend/distributed/planner/relation_restriction_equivalence.c @@ -151,8 +151,6 @@ static void ListConcatUniqueAttributeClassMemberLists(AttributeEquivalenceClass secondClass); static Var * PartitionKeyForRTEIdentityInQuery(Query *query, int targetRTEIndex, Index *partitionKeyIndex); -static bool AllRelationsInRestrictionContextColocated(RelationRestrictionContext * - restrictionContext); static bool IsNotSafeRestrictionToRecursivelyPlan(Node *node); static JoinRestrictionContext * FilterJoinRestrictionContext( JoinRestrictionContext *joinRestrictionContext, Relids @@ -383,7 +381,8 @@ SafeToPushdownUnionSubquery(Query *originalQuery, return false; } - if (!AllRelationsInRestrictionContextColocated(restrictionContext)) + if (!AllRelationsInListColocated(restrictionContext->relationRestrictionList, + RESTRICTION_CONTEXT)) { /* distribution columns are equal, but tables are not co-located */ return false; @@ -1919,19 +1918,33 @@ FindQueryContainingRTEIdentityInternal(Node *node, /* - * AllRelationsInRestrictionContextColocated determines whether all of the relations in the - * given relation restrictions list are co-located. + * AllRelationsInListColocated determines whether all of the relations in the + * given list are co-located. + * Note: The list can be of dofferent types, which is specified by ListEntryType */ -static bool -AllRelationsInRestrictionContextColocated(RelationRestrictionContext *restrictionContext) +bool +AllRelationsInListColocated(List *relationList, ListEntryType entryType) { + void *varPtr = NULL; + RangeTblEntry *rangeTableEntry = NULL; RelationRestriction *relationRestriction = NULL; int initialColocationId = INVALID_COLOCATION_ID; /* check whether all relations exists in the main restriction list */ - foreach_ptr(relationRestriction, restrictionContext->relationRestrictionList) + foreach_ptr(varPtr, relationList) { - Oid relationId = relationRestriction->relationId; + Oid relationId = InvalidOid; + + if (entryType == RANGETABLE_ENTRY) + { + rangeTableEntry = (RangeTblEntry *) varPtr; + relationId = rangeTableEntry->relid; + } + else if (entryType == RESTRICTION_CONTEXT) + { + relationRestriction = (RelationRestriction *) varPtr; + relationId = relationRestriction->relationId; + } if (IsCitusTable(relationId) && !HasDistributionKey(relationId)) { diff --git a/src/include/distributed/distributed_planner.h b/src/include/distributed/distributed_planner.h index 29c3c7154..19bd9f0c2 100644 --- a/src/include/distributed/distributed_planner.h +++ b/src/include/distributed/distributed_planner.h @@ -256,5 +256,9 @@ extern struct DistributedPlan * CreateDistributedPlan(uint64 planId, plannerRestrictionContext); extern bool IsMergeAllowedOnRelation(Query *parse, RangeTblEntry *rte); +extern bool ConjunctionContainsColumnFilter(Node *node, + Var *column, + Node **distributionKeyValue); +extern bool ContainsMergeCommandWalker(Node *node); #endif /* DISTRIBUTED_PLANNER_H */ diff --git a/src/include/distributed/relation_restriction_equivalence.h b/src/include/distributed/relation_restriction_equivalence.h index ccd50a6db..4fd9c7015 100644 --- a/src/include/distributed/relation_restriction_equivalence.h +++ b/src/include/distributed/relation_restriction_equivalence.h @@ -17,6 +17,15 @@ #define SINGLE_RTE_INDEX 1 +/* + * Represents the pointer type that's being passed in the list. + */ +typedef enum ListEntryType +{ + RANGETABLE_ENTRY, /* RangeTblEntry */ + RESTRICTION_CONTEXT /* RelationRestriction */ +} ListEntryType; + extern bool AllDistributionKeysInQueryAreEqual(Query *originalQuery, PlannerRestrictionContext * plannerRestrictionContext); @@ -54,4 +63,6 @@ extern RelationRestrictionContext * FilterRelationRestrictionContext( RelationRestrictionContext *relationRestrictionContext, Relids queryRteIdentities); +extern bool AllRelationsInListColocated(List *relationList, ListEntryType entryType); + #endif /* RELATION_RESTRICTION_EQUIVALENCE_H */ diff --git a/src/test/regress/bin/normalize.sed b/src/test/regress/bin/normalize.sed index df343a077..2ebb31f47 100644 --- a/src/test/regress/bin/normalize.sed +++ b/src/test/regress/bin/normalize.sed @@ -28,6 +28,10 @@ s/\(ref_id\)=\([0-9]+\)/(ref_id)=(X)/g # shard table names for multi_subtransactions s/"t2_[0-9]+"/"t2_xxxxxxx"/g +# shard table names for MERGE tests +s/merge_schema\.([_a-z0-9]+)_40[0-9]+ /merge_schema.\1_xxxxxxx /g +s/pgmerge_schema\.([_a-z0-9]+)_40[0-9]+ /pgmerge_schema.\1_xxxxxxx /g + # shard table names for multi_subquery s/ keyval(1|2|ref)_[0-9]+ / keyval\1_xxxxxxx /g diff --git a/src/test/regress/expected/merge.out b/src/test/regress/expected/merge.out index 6fc472b70..02671acd0 100644 --- a/src/test/regress/expected/merge.out +++ b/src/test/regress/expected/merge.out @@ -18,6 +18,7 @@ SET search_path TO merge_schema; SET citus.shard_count TO 4; SET citus.next_shard_id TO 4000000; SET citus.explain_all_tasks to true; +SET citus.shard_replication_factor TO 1; SELECT 1 FROM master_add_node('localhost', :master_port, groupid => 0); NOTICE: localhost:xxxxx is the coordinator and already contains metadata, skipping syncing the metadata ?column? @@ -214,9 +215,45 @@ HINT: To remove the local data, run: SELECT truncate_local_data_after_distribut (1 row) +-- Updates one of the row with customer_id = 30002 +SELECT * from target t WHERE t.customer_id = 30002; + customer_id | last_order_id | order_center | order_count | last_order +--------------------------------------------------------------------- + 30002 | 103 | AX | -1 | Sun Jan 17 19:53:00 2021 +(1 row) + +-- Turn on notice to print tasks sent to nodes +SET citus.log_remote_commands to true; MERGE INTO target t USING source s - ON (t.customer_id = s.customer_id) + ON (t.customer_id = s.customer_id) AND t.customer_id = 30002 + WHEN MATCHED AND t.order_center = 'XX' THEN + DELETE + WHEN MATCHED THEN + UPDATE SET -- Existing customer, update the order count and last_order_id + order_count = t.order_count + 1, + last_order_id = s.order_id + WHEN NOT MATCHED THEN + DO NOTHING; +NOTICE: issuing MERGE INTO merge_schema.target_xxxxxxx t USING merge_schema.source_xxxxxxx s ON ((t.customer_id OPERATOR(pg_catalog.=) s.customer_id) AND (t.customer_id OPERATOR(pg_catalog.=) 30002)) WHEN MATCHED AND ((t.order_center COLLATE "default") OPERATOR(pg_catalog.=) 'XX'::text) THEN DELETE WHEN MATCHED THEN UPDATE SET last_order_id = s.order_id, order_count = (t.order_count OPERATOR(pg_catalog.+) 1) WHEN NOT MATCHED THEN DO NOTHING +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +SET citus.log_remote_commands to false; +SELECT * from target t WHERE t.customer_id = 30002; + customer_id | last_order_id | order_center | order_count | last_order +--------------------------------------------------------------------- + 30002 | 103 | AX | 0 | Sun Jan 17 19:53:00 2021 +(1 row) + +-- Deletes one of the row with customer_id = 30004 +SELECT * from target t WHERE t.customer_id = 30004; + customer_id | last_order_id | order_center | order_count | last_order +--------------------------------------------------------------------- + 30004 | 99 | XX | -1 | Fri Sep 11 03:23:00 2020 +(1 row) + +MERGE INTO target t + USING source s + ON (t.customer_id = s.customer_id) AND t.customer_id = 30004 WHEN MATCHED AND t.order_center = 'XX' THEN DELETE WHEN MATCHED THEN @@ -226,7 +263,11 @@ MERGE INTO target t WHEN NOT MATCHED THEN -- New entry, record it. INSERT (customer_id, last_order_id, order_center, order_count, last_order) VALUES (customer_id, s.order_id, s.order_center, 123, s.order_time); -ERROR: MERGE command is not supported on distributed/reference tables yet +SELECT * from target t WHERE t.customer_id = 30004; + customer_id | last_order_id | order_center | order_count | last_order +--------------------------------------------------------------------- +(0 rows) + -- -- Test MERGE with CTE as source -- @@ -386,18 +427,61 @@ HINT: To remove the local data, run: SELECT truncate_local_data_after_distribut (1 row) +SELECT * FROM t1 order by id; + id | val +--------------------------------------------------------------------- + 1 | 0 + 2 | 0 + 5 | 0 +(3 rows) + +SET citus.log_remote_commands to true; WITH s1_res AS ( SELECT * FROM s1 ) MERGE INTO t1 - USING s1_res ON (s1_res.id = t1.id) + USING s1_res ON (s1_res.id = t1.id) AND t1.id = 6 WHEN MATCHED AND s1_res.val = 0 THEN DELETE WHEN MATCHED THEN UPDATE SET val = t1.val + 1 WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s1_res.id, s1_res.val); -ERROR: MERGE command is not supported on distributed/reference tables yet +NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx'); +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx'); +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing WITH s1_res AS (SELECT s1.id, s1.val FROM merge_schema.s1_xxxxxxx s1) MERGE INTO merge_schema.t1_xxxxxxx t1 USING s1_res ON ((s1_res.id OPERATOR(pg_catalog.=) t1.id) AND (t1.id OPERATOR(pg_catalog.=) 6)) WHEN MATCHED AND (s1_res.val OPERATOR(pg_catalog.=) 0) THEN DELETE WHEN MATCHED THEN UPDATE SET val = (t1.val OPERATOR(pg_catalog.+) 1) WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s1_res.id, s1_res.val) +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing WITH s1_res AS (SELECT s1.id, s1.val FROM merge_schema.s1_xxxxxxx s1) MERGE INTO merge_schema.t1_xxxxxxx t1 USING s1_res ON ((s1_res.id OPERATOR(pg_catalog.=) t1.id) AND (t1.id OPERATOR(pg_catalog.=) 6)) WHEN MATCHED AND (s1_res.val OPERATOR(pg_catalog.=) 0) THEN DELETE WHEN MATCHED THEN UPDATE SET val = (t1.val OPERATOR(pg_catalog.+) 1) WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s1_res.id, s1_res.val) +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing WITH s1_res AS (SELECT s1.id, s1.val FROM merge_schema.s1_xxxxxxx s1) MERGE INTO merge_schema.t1_xxxxxxx t1 USING s1_res ON ((s1_res.id OPERATOR(pg_catalog.=) t1.id) AND (t1.id OPERATOR(pg_catalog.=) 6)) WHEN MATCHED AND (s1_res.val OPERATOR(pg_catalog.=) 0) THEN DELETE WHEN MATCHED THEN UPDATE SET val = (t1.val OPERATOR(pg_catalog.+) 1) WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s1_res.id, s1_res.val) +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing WITH s1_res AS (SELECT s1.id, s1.val FROM merge_schema.s1_xxxxxxx s1) MERGE INTO merge_schema.t1_xxxxxxx t1 USING s1_res ON ((s1_res.id OPERATOR(pg_catalog.=) t1.id) AND (t1.id OPERATOR(pg_catalog.=) 6)) WHEN MATCHED AND (s1_res.val OPERATOR(pg_catalog.=) 0) THEN DELETE WHEN MATCHED THEN UPDATE SET val = (t1.val OPERATOR(pg_catalog.+) 1) WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s1_res.id, s1_res.val) +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing PREPARE TRANSACTION 'citus_xx_xx_xx_xx' +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing PREPARE TRANSACTION 'citus_xx_xx_xx_xx' +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing COMMIT PREPARED 'citus_xx_xx_xx_xx' +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing COMMIT PREPARED 'citus_xx_xx_xx_xx' +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +SET citus.log_remote_commands to false; +-- Other than id 6 everything else is a NO match, and should appear in target +SELECT * FROM t1 order by 1, 2; + id | val +--------------------------------------------------------------------- + 1 | 0 + 1 | 0 + 2 | 0 + 2 | 1 + 3 | 1 + 4 | 1 + 5 | 0 + 6 | 1 +(8 rows) + -- -- Test with multiple join conditions -- @@ -553,16 +637,38 @@ HINT: To remove the local data, run: SELECT truncate_local_data_after_distribut (1 row) +SELECT * FROM t2 ORDER BY 1; + id | val | src +--------------------------------------------------------------------- + 1 | 0 | target + 2 | 0 | target + 3 | 1 | match + 4 | 0 | match +(4 rows) + +SET citus.log_remote_commands to true; MERGE INTO t2 USING s2 -ON t2.id = s2.id AND t2.src = s2.src +ON t2.id = s2.id AND t2.src = s2.src AND t2.id = 4 WHEN MATCHED AND t2.val = 1 THEN UPDATE SET val = s2.val + 10 WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN - INSERT (id, val, src) VALUES (s2.id, s2.val, s2.src); -ERROR: MERGE command is not supported on distributed/reference tables yet + DO NOTHING; +NOTICE: issuing MERGE INTO merge_schema.t2_xxxxxxx t2 USING merge_schema.s2_xxxxxxx s2 ON ((t2.id OPERATOR(pg_catalog.=) s2.id) AND (t2.src OPERATOR(pg_catalog.=) s2.src) AND (t2.id OPERATOR(pg_catalog.=) 4)) WHEN MATCHED AND (t2.val OPERATOR(pg_catalog.=) 1) THEN UPDATE SET val = (s2.val OPERATOR(pg_catalog.+) 10) WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN DO NOTHING +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +SET citus.log_remote_commands to false; +-- Row with id = 4 is a match for delete clause, row should be deleted +-- Row with id = 3 is a NO match, row from source will be inserted +SELECT * FROM t2 ORDER BY 1; + id | val | src +--------------------------------------------------------------------- + 1 | 0 | target + 2 | 0 | target + 3 | 1 | match +(3 rows) + -- -- With sub-query as the MERGE source -- @@ -943,7 +1049,7 @@ WHEN MATCHED THEN UPDATE SET value = vl_source.value, id = vl_target.id + 1 WHEN NOT MATCHED THEN INSERT VALUES(vl_source.ID, vl_source.value); -DEBUG: +DEBUG: RESET client_min_messages; SELECT * INTO vl_local FROM vl_target ORDER BY 1 ; -- Should be equal @@ -996,7 +1102,7 @@ WHEN MATCHED THEN DO NOTHING WHEN NOT MATCHED THEN INSERT VALUES(rs_source.id); -DEBUG: +DEBUG: RESET client_min_messages; SELECT * INTO rs_local FROM rs_target ORDER BY 1 ; -- Should be equal @@ -1132,7 +1238,7 @@ DEBUG: function does not have co-located tables DEBUG: generating subplan XXX_1 for subquery SELECT id, source FROM merge_schema.f_dist() f(id integer, source character varying) DEBUG: DEBUG: Plan XXX query after replacing subqueries and CTEs: MERGE INTO merge_schema.fn_target USING (SELECT intermediate_result.id, intermediate_result.source FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(id integer, source character varying)) fn_source ON (fn_source.id OPERATOR(pg_catalog.=) fn_target.id) WHEN MATCHED THEN DO NOTHING WHEN NOT MATCHED THEN INSERT (id, data) VALUES (fn_source.id, fn_source.source) -DEBUG: +DEBUG: RESET client_min_messages; SELECT * INTO fn_local FROM fn_target ORDER BY 1 ; -- Should be equal @@ -1204,7 +1310,7 @@ MERGE INTO ft_target DELETE WHEN NOT MATCHED THEN INSERT (id, user_val) VALUES (foreign_table.id, foreign_table.user_val); -DEBUG: +DEBUG: RESET client_min_messages; SELECT * FROM ft_target; id | user_val @@ -1213,9 +1319,866 @@ SELECT * FROM ft_target; 3 | source (2 rows) +-- +-- complex joins on the source side +-- +-- source(join of two relations) relation is an unaliased join +CREATE TABLE target_cj(tid int, src text, val int); +CREATE TABLE source_cj1(sid1 int, src1 text, val1 int); +CREATE TABLE source_cj2(sid2 int, src2 text, val2 int); +INSERT INTO target_cj VALUES (1, 'target', 0); +INSERT INTO target_cj VALUES (2, 'target', 0); +INSERT INTO target_cj VALUES (2, 'target', 0); +INSERT INTO target_cj VALUES (3, 'target', 0); +INSERT INTO source_cj1 VALUES (2, 'source-1', 10); +INSERT INTO source_cj2 VALUES (2, 'source-2', 20); +BEGIN; +MERGE INTO target_cj t +USING source_cj1 s1 INNER JOIN source_cj2 s2 ON sid1 = sid2 +ON t.tid = sid1 AND t.tid = 2 +WHEN MATCHED THEN + UPDATE SET src = src2 +WHEN NOT MATCHED THEN + DO NOTHING; +-- Gold result to compare against +SELECT * FROM target_cj ORDER BY 1; + tid | src | val +--------------------------------------------------------------------- + 1 | target | 0 + 2 | source-2 | 0 + 2 | source-2 | 0 + 3 | target | 0 +(4 rows) + +ROLLBACK; +BEGIN; +-- try accessing columns from either side of the source join +MERGE INTO target_cj t +USING source_cj1 s2 + INNER JOIN source_cj2 s1 ON sid1 = sid2 AND val1 = 10 +ON t.tid = sid1 AND t.tid = 2 +WHEN MATCHED THEN + UPDATE SET tid = sid2, src = src1, val = val2 +WHEN NOT MATCHED THEN + DO NOTHING; +-- Gold result to compare against +SELECT * FROM target_cj ORDER BY 1; + tid | src | val +--------------------------------------------------------------------- + 1 | target | 0 + 2 | source-1 | 20 + 2 | source-1 | 20 + 3 | target | 0 +(4 rows) + +ROLLBACK; +-- Test the same scenarios with distributed tables +SELECT create_distributed_table('target_cj', 'tid'); +NOTICE: Copying data from local table... +NOTICE: copying the data has completed +DETAIL: The local data in the table is no longer visible, but is still on disk. +HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$merge_schema.target_cj$$) + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SELECT create_distributed_table('source_cj1', 'sid1'); +NOTICE: Copying data from local table... +NOTICE: copying the data has completed +DETAIL: The local data in the table is no longer visible, but is still on disk. +HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$merge_schema.source_cj1$$) + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SELECT create_distributed_table('source_cj2', 'sid2'); +NOTICE: Copying data from local table... +NOTICE: copying the data has completed +DETAIL: The local data in the table is no longer visible, but is still on disk. +HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$merge_schema.source_cj2$$) + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +BEGIN; +SET citus.log_remote_commands to true; +MERGE INTO target_cj t +USING source_cj1 s1 INNER JOIN source_cj2 s2 ON sid1 = sid2 +ON t.tid = sid1 AND t.tid = 2 +WHEN MATCHED THEN + UPDATE SET src = src2 +WHEN NOT MATCHED THEN + DO NOTHING; +NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx'); +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing MERGE INTO merge_schema.target_cj_xxxxxxx t USING (merge_schema.source_cj1_xxxxxxx s1 JOIN merge_schema.source_cj2_xxxxxxx s2 ON ((s1.sid1 OPERATOR(pg_catalog.=) s2.sid2))) ON ((t.tid OPERATOR(pg_catalog.=) s1.sid1) AND (t.tid OPERATOR(pg_catalog.=) 2)) WHEN MATCHED THEN UPDATE SET src = s2.src2 WHEN NOT MATCHED THEN DO NOTHING +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +SET citus.log_remote_commands to false; +SELECT * FROM target_cj ORDER BY 1; + tid | src | val +--------------------------------------------------------------------- + 1 | target | 0 + 2 | source-2 | 0 + 2 | source-2 | 0 + 3 | target | 0 +(4 rows) + +ROLLBACK; +BEGIN; +-- try accessing columns from either side of the source join +MERGE INTO target_cj t +USING source_cj1 s2 + INNER JOIN source_cj2 s1 ON sid1 = sid2 AND val1 = 10 +ON t.tid = sid1 AND t.tid = 2 +WHEN MATCHED THEN + UPDATE SET src = src1, val = val2 +WHEN NOT MATCHED THEN + DO NOTHING; +SELECT * FROM target_cj ORDER BY 1; + tid | src | val +--------------------------------------------------------------------- + 1 | target | 0 + 2 | source-1 | 20 + 2 | source-1 | 20 + 3 | target | 0 +(4 rows) + +ROLLBACK; +-- sub-query as a source +BEGIN; +MERGE INTO target_cj t +USING (SELECT * FROM source_cj1 WHERE sid1 = 2) sub +ON t.tid = sub.sid1 AND t.tid = 2 +WHEN MATCHED THEN + UPDATE SET src = sub.src1, val = val1 +WHEN NOT MATCHED THEN + DO NOTHING; +SELECT * FROM target_cj ORDER BY 1; + tid | src | val +--------------------------------------------------------------------- + 1 | target | 0 + 2 | source-1 | 10 + 2 | source-1 | 10 + 3 | target | 0 +(4 rows) + +ROLLBACK; +-- Test self-join +BEGIN; +SELECT * FROM target_cj ORDER BY 1; + tid | src | val +--------------------------------------------------------------------- + 1 | target | 0 + 2 | target | 0 + 2 | target | 0 + 3 | target | 0 +(4 rows) + +set citus.log_remote_commands to true; +MERGE INTO target_cj t1 +USING (SELECT * FROM target_cj) sub +ON t1.tid = sub.tid AND t1.tid = 3 +WHEN MATCHED THEN + UPDATE SET src = sub.src, val = sub.val + 100 +WHEN NOT MATCHED THEN + DO NOTHING; +NOTICE: issuing MERGE INTO merge_schema.target_cj_xxxxxxx t1 USING (SELECT target_cj.tid, target_cj.src, target_cj.val FROM merge_schema.target_cj_xxxxxxx target_cj) sub ON ((t1.tid OPERATOR(pg_catalog.=) sub.tid) AND (t1.tid OPERATOR(pg_catalog.=) 3)) WHEN MATCHED THEN UPDATE SET src = sub.src, val = (sub.val OPERATOR(pg_catalog.+) 100) WHEN NOT MATCHED THEN DO NOTHING +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +set citus.log_remote_commands to false; +SELECT * FROM target_cj ORDER BY 1; + tid | src | val +--------------------------------------------------------------------- + 1 | target | 0 + 2 | target | 0 + 2 | target | 0 + 3 | target | 100 +(4 rows) + +ROLLBACK; +-- Test PREPARE +PREPARE foo(int) AS +MERGE INTO target_cj target +USING (SELECT * FROM source_cj1) sub +ON target.tid = sub.sid1 AND target.tid = $1 +WHEN MATCHED THEN + UPDATE SET val = sub.val1 +WHEN NOT MATCHED THEN + DO NOTHING; +SELECT * FROM target_cj ORDER BY 1; + tid | src | val +--------------------------------------------------------------------- + 1 | target | 0 + 2 | target | 0 + 2 | target | 0 + 3 | target | 0 +(4 rows) + +BEGIN; +EXECUTE foo(2); +EXECUTE foo(2); +EXECUTE foo(2); +EXECUTE foo(2); +EXECUTE foo(2); +SELECT * FROM target_cj ORDER BY 1; + tid | src | val +--------------------------------------------------------------------- + 1 | target | 0 + 2 | target | 10 + 2 | target | 10 + 3 | target | 0 +(4 rows) + +ROLLBACK; +BEGIN; +SET citus.log_remote_commands to true; +SET client_min_messages TO DEBUG1; +EXECUTE foo(2); +DEBUG: +DEBUG: +DEBUG: +DEBUG: +DEBUG: +NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx'); +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing MERGE INTO merge_schema.target_cj_xxxxxxx target USING (SELECT source_cj1.sid1, source_cj1.src1, source_cj1.val1 FROM merge_schema.source_cj1_xxxxxxx source_cj1) sub ON ((target.tid OPERATOR(pg_catalog.=) sub.sid1) AND (target.tid OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = sub.val1 WHEN NOT MATCHED THEN DO NOTHING +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +RESET client_min_messages; +EXECUTE foo(2); +NOTICE: issuing MERGE INTO merge_schema.target_cj_xxxxxxx target USING (SELECT source_cj1.sid1, source_cj1.src1, source_cj1.val1 FROM merge_schema.source_cj1_xxxxxxx source_cj1) sub ON ((target.tid OPERATOR(pg_catalog.=) sub.sid1) AND (target.tid OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = sub.val1 WHEN NOT MATCHED THEN DO NOTHING +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +SET citus.log_remote_commands to false; +SELECT * FROM target_cj ORDER BY 1; + tid | src | val +--------------------------------------------------------------------- + 1 | target | 0 + 2 | target | 10 + 2 | target | 10 + 3 | target | 0 +(4 rows) + +ROLLBACK; +-- Test distributed tables, must be co-located and joined on distribution column. +-- +-- We create two sets of source and target tables, one set is Postgres and the other +-- is Citus distributed. Run the _exact_ MERGE SQL on both the sets and compare the +-- final results of target tables of Postgres and Citus, the result should match. +-- This is repeated for various MERGE SQL combinations +-- +CREATE TABLE pg_target(id int, val varchar); +CREATE TABLE pg_source(id int, val varchar); +CREATE TABLE citus_target(id int, val varchar); +CREATE TABLE citus_source(id int, val varchar); +-- Half of the source rows do not match +INSERT INTO pg_target SELECT i, 'target' FROM generate_series(250, 500) i; +INSERT INTO pg_source SELECT i, 'source' FROM generate_series(1, 500) i; +INSERT INTO citus_target SELECT i, 'target' FROM generate_series(250, 500) i; +INSERT INTO citus_source SELECT i, 'source' FROM generate_series(1, 500) i; +SELECT create_distributed_table('citus_target', 'id'); +NOTICE: Copying data from local table... +NOTICE: copying the data has completed +DETAIL: The local data in the table is no longer visible, but is still on disk. +HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$merge_schema.citus_target$$) + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SELECT create_distributed_table('citus_source', 'id'); +NOTICE: Copying data from local table... +NOTICE: copying the data has completed +DETAIL: The local data in the table is no longer visible, but is still on disk. +HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$merge_schema.citus_source$$) + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +-- +-- This routine compares the target tables of Postgres and Citus and +-- returns true if they match, false if the results do not match. +-- +CREATE OR REPLACE FUNCTION compare_tables() RETURNS BOOLEAN AS $$ +DECLARE ret BOOL; +BEGIN +SELECT count(1) = 0 INTO ret + FROM pg_target + FULL OUTER JOIN citus_target + USING (id, val) + WHERE pg_target.id IS NULL + OR citus_target.id IS NULL; +RETURN ret; +END +$$ LANGUAGE PLPGSQL; +-- Make sure we start with exact data in Postgres and Citus +SELECT compare_tables(); + compare_tables +--------------------------------------------------------------------- + t +(1 row) + +-- Run the MERGE on both Postgres and Citus, and compare the final target tables +BEGIN; +SET citus.log_remote_commands to true; +MERGE INTO pg_target t +USING pg_source s +ON t.id = s.id +WHEN MATCHED AND t.id > 400 THEN + UPDATE SET val = t.val || 'Updated by Merge' +WHEN MATCHED THEN + DELETE +WHEN NOT MATCHED THEN + INSERT VALUES(s.id, s.val); +MERGE INTO citus_target t +USING citus_source s +ON t.id = s.id +WHEN MATCHED AND t.id > 400 THEN + UPDATE SET val = t.val || 'Updated by Merge' +WHEN MATCHED THEN + DELETE +WHEN NOT MATCHED THEN + INSERT VALUES(s.id, s.val); +NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx'); +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx'); +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON (t.id OPERATOR(pg_catalog.=) s.id) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 400) THEN UPDATE SET val = (((t.val COLLATE "default") OPERATOR(pg_catalog.||) 'Updated by Merge'::text) COLLATE "default") WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s.id, s.val) +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON (t.id OPERATOR(pg_catalog.=) s.id) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 400) THEN UPDATE SET val = (((t.val COLLATE "default") OPERATOR(pg_catalog.||) 'Updated by Merge'::text) COLLATE "default") WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s.id, s.val) +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON (t.id OPERATOR(pg_catalog.=) s.id) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 400) THEN UPDATE SET val = (((t.val COLLATE "default") OPERATOR(pg_catalog.||) 'Updated by Merge'::text) COLLATE "default") WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s.id, s.val) +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON (t.id OPERATOR(pg_catalog.=) s.id) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 400) THEN UPDATE SET val = (((t.val COLLATE "default") OPERATOR(pg_catalog.||) 'Updated by Merge'::text) COLLATE "default") WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s.id, s.val) +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +SET citus.log_remote_commands to false; +SELECT compare_tables(); + compare_tables +--------------------------------------------------------------------- + t +(1 row) + +ROLLBACK; +-- +-- ON clause filter on source +-- +BEGIN; +SET citus.log_remote_commands to true; +MERGE INTO pg_target t +USING pg_source s +ON t.id = s.id AND s.id < 100 +WHEN MATCHED AND t.id > 400 THEN + UPDATE SET val = t.val || 'Updated by Merge' +WHEN MATCHED THEN + DELETE +WHEN NOT MATCHED THEN + INSERT VALUES(s.id, s.val); +MERGE INTO citus_target t +USING citus_source s +ON t.id = s.id AND s.id < 100 +WHEN MATCHED AND t.id > 400 THEN + UPDATE SET val = t.val || 'Updated by Merge' +WHEN MATCHED THEN + DELETE +WHEN NOT MATCHED THEN + INSERT VALUES(s.id, s.val); +NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx'); +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx'); +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON ((t.id OPERATOR(pg_catalog.=) s.id) AND (s.id OPERATOR(pg_catalog.<) 100)) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 400) THEN UPDATE SET val = (((t.val COLLATE "default") OPERATOR(pg_catalog.||) 'Updated by Merge'::text) COLLATE "default") WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s.id, s.val) +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON ((t.id OPERATOR(pg_catalog.=) s.id) AND (s.id OPERATOR(pg_catalog.<) 100)) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 400) THEN UPDATE SET val = (((t.val COLLATE "default") OPERATOR(pg_catalog.||) 'Updated by Merge'::text) COLLATE "default") WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s.id, s.val) +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON ((t.id OPERATOR(pg_catalog.=) s.id) AND (s.id OPERATOR(pg_catalog.<) 100)) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 400) THEN UPDATE SET val = (((t.val COLLATE "default") OPERATOR(pg_catalog.||) 'Updated by Merge'::text) COLLATE "default") WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s.id, s.val) +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON ((t.id OPERATOR(pg_catalog.=) s.id) AND (s.id OPERATOR(pg_catalog.<) 100)) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 400) THEN UPDATE SET val = (((t.val COLLATE "default") OPERATOR(pg_catalog.||) 'Updated by Merge'::text) COLLATE "default") WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s.id, s.val) +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +SET citus.log_remote_commands to false; +SELECT compare_tables(); + compare_tables +--------------------------------------------------------------------- + t +(1 row) + +ROLLBACK; +-- +-- ON clause filter on target +-- +BEGIN; +SET citus.log_remote_commands to true; +MERGE INTO pg_target t +USING pg_source s +ON t.id = s.id AND t.id < 100 +WHEN MATCHED AND t.id > 400 THEN + UPDATE SET val = t.val || 'Updated by Merge' +WHEN MATCHED THEN + DELETE +WHEN NOT MATCHED THEN + INSERT VALUES(s.id, s.val); +MERGE INTO citus_target t +USING citus_source s +ON t.id = s.id AND t.id < 100 +WHEN MATCHED AND t.id > 400 THEN + UPDATE SET val = t.val || 'Updated by Merge' +WHEN MATCHED THEN + DELETE +WHEN NOT MATCHED THEN + INSERT VALUES(s.id, s.val); +NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx'); +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx'); +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON ((t.id OPERATOR(pg_catalog.=) s.id) AND (t.id OPERATOR(pg_catalog.<) 100)) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 400) THEN UPDATE SET val = (((t.val COLLATE "default") OPERATOR(pg_catalog.||) 'Updated by Merge'::text) COLLATE "default") WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s.id, s.val) +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON ((t.id OPERATOR(pg_catalog.=) s.id) AND (t.id OPERATOR(pg_catalog.<) 100)) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 400) THEN UPDATE SET val = (((t.val COLLATE "default") OPERATOR(pg_catalog.||) 'Updated by Merge'::text) COLLATE "default") WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s.id, s.val) +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON ((t.id OPERATOR(pg_catalog.=) s.id) AND (t.id OPERATOR(pg_catalog.<) 100)) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 400) THEN UPDATE SET val = (((t.val COLLATE "default") OPERATOR(pg_catalog.||) 'Updated by Merge'::text) COLLATE "default") WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s.id, s.val) +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON ((t.id OPERATOR(pg_catalog.=) s.id) AND (t.id OPERATOR(pg_catalog.<) 100)) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 400) THEN UPDATE SET val = (((t.val COLLATE "default") OPERATOR(pg_catalog.||) 'Updated by Merge'::text) COLLATE "default") WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s.id, s.val) +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +SET citus.log_remote_commands to false; +SELECT compare_tables(); + compare_tables +--------------------------------------------------------------------- + t +(1 row) + +ROLLBACK; +-- +-- NOT MATCHED clause filter on source +-- +BEGIN; +SET citus.log_remote_commands to true; +MERGE INTO pg_target t +USING pg_source s +ON t.id = s.id +WHEN MATCHED THEN + DO NOTHING +WHEN NOT MATCHED AND s.id < 100 THEN + INSERT VALUES(s.id, s.val); +MERGE INTO citus_target t +USING citus_source s +ON t.id = s.id +WHEN MATCHED THEN + DO NOTHING +WHEN NOT MATCHED AND s.id < 100 THEN + INSERT VALUES(s.id, s.val); +NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx'); +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx'); +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON (t.id OPERATOR(pg_catalog.=) s.id) WHEN MATCHED THEN DO NOTHING WHEN NOT MATCHED AND (s.id OPERATOR(pg_catalog.<) 100) THEN INSERT (id, val) VALUES (s.id, s.val) +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON (t.id OPERATOR(pg_catalog.=) s.id) WHEN MATCHED THEN DO NOTHING WHEN NOT MATCHED AND (s.id OPERATOR(pg_catalog.<) 100) THEN INSERT (id, val) VALUES (s.id, s.val) +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON (t.id OPERATOR(pg_catalog.=) s.id) WHEN MATCHED THEN DO NOTHING WHEN NOT MATCHED AND (s.id OPERATOR(pg_catalog.<) 100) THEN INSERT (id, val) VALUES (s.id, s.val) +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON (t.id OPERATOR(pg_catalog.=) s.id) WHEN MATCHED THEN DO NOTHING WHEN NOT MATCHED AND (s.id OPERATOR(pg_catalog.<) 100) THEN INSERT (id, val) VALUES (s.id, s.val) +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +SET citus.log_remote_commands to false; +SELECT compare_tables(); + compare_tables +--------------------------------------------------------------------- + t +(1 row) + +ROLLBACK; +-- +-- Test constant filter in ON clause to check if shards are pruned +-- with restriction information +-- +-- +-- Though constant filter is present, this won't prune shards as +-- NOT MATCHED clause is present +-- +BEGIN; +SET citus.log_remote_commands to true; +MERGE INTO pg_target t +USING pg_source s +ON t.id = s.id AND s.id = 250 +WHEN MATCHED THEN + UPDATE SET val = t.val || 'Updated by Merge' +WHEN NOT MATCHED THEN + INSERT VALUES(s.id, s.val); +MERGE INTO citus_target t +USING citus_source s +ON t.id = s.id AND s.id = 250 +WHEN MATCHED THEN + UPDATE SET val = t.val || 'Updated by Merge' +WHEN NOT MATCHED THEN + INSERT VALUES(s.id, s.val); +NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx'); +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx'); +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON ((t.id OPERATOR(pg_catalog.=) s.id) AND (s.id OPERATOR(pg_catalog.=) 250)) WHEN MATCHED THEN UPDATE SET val = (((t.val COLLATE "default") OPERATOR(pg_catalog.||) 'Updated by Merge'::text) COLLATE "default") WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s.id, s.val) +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON ((t.id OPERATOR(pg_catalog.=) s.id) AND (s.id OPERATOR(pg_catalog.=) 250)) WHEN MATCHED THEN UPDATE SET val = (((t.val COLLATE "default") OPERATOR(pg_catalog.||) 'Updated by Merge'::text) COLLATE "default") WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s.id, s.val) +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON ((t.id OPERATOR(pg_catalog.=) s.id) AND (s.id OPERATOR(pg_catalog.=) 250)) WHEN MATCHED THEN UPDATE SET val = (((t.val COLLATE "default") OPERATOR(pg_catalog.||) 'Updated by Merge'::text) COLLATE "default") WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s.id, s.val) +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON ((t.id OPERATOR(pg_catalog.=) s.id) AND (s.id OPERATOR(pg_catalog.=) 250)) WHEN MATCHED THEN UPDATE SET val = (((t.val COLLATE "default") OPERATOR(pg_catalog.||) 'Updated by Merge'::text) COLLATE "default") WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s.id, s.val) +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +SET citus.log_remote_commands to false; +SELECT compare_tables(); + compare_tables +--------------------------------------------------------------------- + t +(1 row) + +ROLLBACK; +-- This will prune shards with restriction information as NOT MATCHED is void +BEGIN; +SET citus.log_remote_commands to true; +MERGE INTO pg_target t +USING pg_source s +ON t.id = s.id AND s.id = 250 +WHEN MATCHED THEN + UPDATE SET val = t.val || 'Updated by Merge' +WHEN NOT MATCHED THEN + DO NOTHING; +MERGE INTO citus_target t +USING citus_source s +ON t.id = s.id AND s.id = 250 +WHEN MATCHED THEN + UPDATE SET val = t.val || 'Updated by Merge' +WHEN NOT MATCHED THEN + DO NOTHING; +NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx'); +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING merge_schema.citus_source_xxxxxxx s ON ((t.id OPERATOR(pg_catalog.=) s.id) AND (s.id OPERATOR(pg_catalog.=) 250)) WHEN MATCHED THEN UPDATE SET val = (((t.val COLLATE "default") OPERATOR(pg_catalog.||) 'Updated by Merge'::text) COLLATE "default") WHEN NOT MATCHED THEN DO NOTHING +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +SET citus.log_remote_commands to false; +SELECT compare_tables(); + compare_tables +--------------------------------------------------------------------- + t +(1 row) + +ROLLBACK; +-- Test CTE with distributed tables +CREATE VIEW pg_source_view AS SELECT * FROM pg_source WHERE id < 400; +WARNING: "view pg_source_view" has dependency to "table pg_source" that is not in Citus' metadata +DETAIL: "view pg_source_view" will be created only locally +HINT: Distribute "table pg_source" first to distribute "view pg_source_view" +CREATE VIEW citus_source_view AS SELECT * FROM citus_source WHERE id < 400; +BEGIN; +SEt citus.log_remote_commands to true; +WITH cte AS ( + SELECT * FROM pg_source_view +) +MERGE INTO pg_target t +USING cte +ON cte.id = t.id +WHEN MATCHED AND t.id > 350 THEN + UPDATE SET val = t.val || 'Updated by CTE' +WHEN NOT MATCHED THEN + INSERT VALUES (cte.id, cte.val) +WHEN MATCHED AND t.id < 350 THEN + DELETE; +WITH cte AS ( + SELECT * FROM citus_source_view +) +MERGE INTO citus_target t +USING cte +ON cte.id = t.id +WHEN MATCHED AND t.id > 350 THEN + UPDATE SET val = t.val || 'Updated by CTE' +WHEN NOT MATCHED THEN + INSERT VALUES (cte.id, cte.val) +WHEN MATCHED AND t.id < 350 THEN + DELETE; +NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx'); +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx'); +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing WITH cte AS (SELECT citus_source_view.id, citus_source_view.val FROM (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source WHERE (citus_source.id OPERATOR(pg_catalog.<) 400)) citus_source_view) MERGE INTO merge_schema.citus_target_xxxxxxx t USING cte ON (cte.id OPERATOR(pg_catalog.=) t.id) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 350) THEN UPDATE SET val = (((t.val COLLATE "default") OPERATOR(pg_catalog.||) 'Updated by CTE'::text) COLLATE "default") WHEN NOT MATCHED THEN INSERT (id, val) VALUES (cte.id, cte.val) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.<) 350) THEN DELETE +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing WITH cte AS (SELECT citus_source_view.id, citus_source_view.val FROM (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source WHERE (citus_source.id OPERATOR(pg_catalog.<) 400)) citus_source_view) MERGE INTO merge_schema.citus_target_xxxxxxx t USING cte ON (cte.id OPERATOR(pg_catalog.=) t.id) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 350) THEN UPDATE SET val = (((t.val COLLATE "default") OPERATOR(pg_catalog.||) 'Updated by CTE'::text) COLLATE "default") WHEN NOT MATCHED THEN INSERT (id, val) VALUES (cte.id, cte.val) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.<) 350) THEN DELETE +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing WITH cte AS (SELECT citus_source_view.id, citus_source_view.val FROM (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source WHERE (citus_source.id OPERATOR(pg_catalog.<) 400)) citus_source_view) MERGE INTO merge_schema.citus_target_xxxxxxx t USING cte ON (cte.id OPERATOR(pg_catalog.=) t.id) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 350) THEN UPDATE SET val = (((t.val COLLATE "default") OPERATOR(pg_catalog.||) 'Updated by CTE'::text) COLLATE "default") WHEN NOT MATCHED THEN INSERT (id, val) VALUES (cte.id, cte.val) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.<) 350) THEN DELETE +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing WITH cte AS (SELECT citus_source_view.id, citus_source_view.val FROM (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source WHERE (citus_source.id OPERATOR(pg_catalog.<) 400)) citus_source_view) MERGE INTO merge_schema.citus_target_xxxxxxx t USING cte ON (cte.id OPERATOR(pg_catalog.=) t.id) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 350) THEN UPDATE SET val = (((t.val COLLATE "default") OPERATOR(pg_catalog.||) 'Updated by CTE'::text) COLLATE "default") WHEN NOT MATCHED THEN INSERT (id, val) VALUES (cte.id, cte.val) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.<) 350) THEN DELETE +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +SET citus.log_remote_commands to false; +SELECT compare_tables(); + compare_tables +--------------------------------------------------------------------- + t +(1 row) + +ROLLBACK; +-- Test sub-query with distributed tables +BEGIN; +SEt citus.log_remote_commands to true; +MERGE INTO pg_target t +USING (SELECT * FROM pg_source) subq +ON subq.id = t.id +WHEN MATCHED AND t.id > 350 THEN + UPDATE SET val = t.val || 'Updated by subquery' +WHEN NOT MATCHED THEN + INSERT VALUES (subq.id, subq.val) +WHEN MATCHED AND t.id < 350 THEN + DELETE; +MERGE INTO citus_target t +USING (SELECT * FROM citus_source) subq +ON subq.id = t.id +WHEN MATCHED AND t.id > 350 THEN + UPDATE SET val = t.val || 'Updated by subquery' +WHEN NOT MATCHED THEN + INSERT VALUES (subq.id, subq.val) +WHEN MATCHED AND t.id < 350 THEN + DELETE; +NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx'); +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx'); +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) subq ON (subq.id OPERATOR(pg_catalog.=) t.id) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 350) THEN UPDATE SET val = (((t.val COLLATE "default") OPERATOR(pg_catalog.||) 'Updated by subquery'::text) COLLATE "default") WHEN NOT MATCHED THEN INSERT (id, val) VALUES (subq.id, subq.val) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.<) 350) THEN DELETE +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) subq ON (subq.id OPERATOR(pg_catalog.=) t.id) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 350) THEN UPDATE SET val = (((t.val COLLATE "default") OPERATOR(pg_catalog.||) 'Updated by subquery'::text) COLLATE "default") WHEN NOT MATCHED THEN INSERT (id, val) VALUES (subq.id, subq.val) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.<) 350) THEN DELETE +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) subq ON (subq.id OPERATOR(pg_catalog.=) t.id) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 350) THEN UPDATE SET val = (((t.val COLLATE "default") OPERATOR(pg_catalog.||) 'Updated by subquery'::text) COLLATE "default") WHEN NOT MATCHED THEN INSERT (id, val) VALUES (subq.id, subq.val) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.<) 350) THEN DELETE +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx t USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) subq ON (subq.id OPERATOR(pg_catalog.=) t.id) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.>) 350) THEN UPDATE SET val = (((t.val COLLATE "default") OPERATOR(pg_catalog.||) 'Updated by subquery'::text) COLLATE "default") WHEN NOT MATCHED THEN INSERT (id, val) VALUES (subq.id, subq.val) WHEN MATCHED AND (t.id OPERATOR(pg_catalog.<) 350) THEN DELETE +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +SET citus.log_remote_commands to false; +SELECT compare_tables(); + compare_tables +--------------------------------------------------------------------- + t +(1 row) + +ROLLBACK; +-- Test PREPARE +PREPARE pg_prep(int) AS +MERGE INTO pg_target +USING (SELECT * FROM pg_source) sub +ON pg_target.id = sub.id AND pg_target.id = $1 +WHEN MATCHED THEN + UPDATE SET val = 'Updated by prepare using ' || sub.val +WHEN NOT MATCHED THEN + DO NOTHING; +PREPARE citus_prep(int) AS +MERGE INTO citus_target +USING (SELECT * FROM citus_source) sub +ON citus_target.id = sub.id AND citus_target.id = $1 +WHEN MATCHED THEN + UPDATE SET val = 'Updated by prepare using ' || sub.val +WHEN NOT MATCHED THEN + DO NOTHING; +BEGIN; +SET citus.log_remote_commands to true; +SELECT * FROM pg_target WHERE id = 500; -- before merge + id | val +--------------------------------------------------------------------- + 500 | target +(1 row) + +EXECUTE pg_prep(500); +SELECT * FROM pg_target WHERE id = 500; -- non-cached + id | val +--------------------------------------------------------------------- + 500 | Updated by prepare using source +(1 row) + +EXECUTE pg_prep(500); +EXECUTE pg_prep(500); +EXECUTE pg_prep(500); +EXECUTE pg_prep(500); +EXECUTE pg_prep(500); +SELECT * FROM pg_target WHERE id = 500; -- cached + id | val +--------------------------------------------------------------------- + 500 | Updated by prepare using source +(1 row) + +SELECT * FROM citus_target WHERE id = 500; -- before merge +NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx'); +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing SELECT id, val FROM merge_schema.citus_target_xxxxxxx citus_target WHERE (id OPERATOR(pg_catalog.=) 500) +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx + id | val +--------------------------------------------------------------------- + 500 | target +(1 row) + +EXECUTE citus_prep(500); +NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = (('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val COLLATE "default")) COLLATE "default") WHEN NOT MATCHED THEN DO NOTHING +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +SELECT * FROM citus_target WHERE id = 500; -- non-cached +NOTICE: issuing SELECT id, val FROM merge_schema.citus_target_xxxxxxx citus_target WHERE (id OPERATOR(pg_catalog.=) 500) +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx + id | val +--------------------------------------------------------------------- + 500 | Updated by prepare using source +(1 row) + +EXECUTE citus_prep(500); +NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = (('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val COLLATE "default")) COLLATE "default") WHEN NOT MATCHED THEN DO NOTHING +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +EXECUTE citus_prep(500); +NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = (('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val COLLATE "default")) COLLATE "default") WHEN NOT MATCHED THEN DO NOTHING +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +EXECUTE citus_prep(500); +NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = (('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val COLLATE "default")) COLLATE "default") WHEN NOT MATCHED THEN DO NOTHING +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +EXECUTE citus_prep(500); +NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = (('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val COLLATE "default")) COLLATE "default") WHEN NOT MATCHED THEN DO NOTHING +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +EXECUTE citus_prep(500); +NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = (('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val COLLATE "default")) COLLATE "default") WHEN NOT MATCHED THEN DO NOTHING +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +SELECT * FROM citus_target WHERE id = 500; -- cached +NOTICE: issuing SELECT id, val FROM merge_schema.citus_target_xxxxxxx citus_target WHERE (id OPERATOR(pg_catalog.=) 500) +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx + id | val +--------------------------------------------------------------------- + 500 | Updated by prepare using source +(1 row) + +SET citus.log_remote_commands to false; +SELECT compare_tables(); + compare_tables +--------------------------------------------------------------------- + t +(1 row) + +ROLLBACK; +-- Test partitions + distributed tables +CREATE TABLE pg_pa_target (tid integer, balance float, val text) + PARTITION BY LIST (tid); +CREATE TABLE citus_pa_target (tid integer, balance float, val text) + PARTITION BY LIST (tid); +CREATE TABLE part1 PARTITION OF pg_pa_target FOR VALUES IN (1,4) + WITH (autovacuum_enabled=off); +CREATE TABLE part2 PARTITION OF pg_pa_target FOR VALUES IN (2,5,6) + WITH (autovacuum_enabled=off); +CREATE TABLE part3 PARTITION OF pg_pa_target FOR VALUES IN (3,8,9) + WITH (autovacuum_enabled=off); +CREATE TABLE part4 PARTITION OF pg_pa_target DEFAULT + WITH (autovacuum_enabled=off); +CREATE TABLE part5 PARTITION OF citus_pa_target FOR VALUES IN (1,4) + WITH (autovacuum_enabled=off); +CREATE TABLE part6 PARTITION OF citus_pa_target FOR VALUES IN (2,5,6) + WITH (autovacuum_enabled=off); +CREATE TABLE part7 PARTITION OF citus_pa_target FOR VALUES IN (3,8,9) + WITH (autovacuum_enabled=off); +CREATE TABLE part8 PARTITION OF citus_pa_target DEFAULT + WITH (autovacuum_enabled=off); +CREATE TABLE pg_pa_source (sid integer, delta float); +CREATE TABLE citus_pa_source (sid integer, delta float); +-- insert many rows to the source table +INSERT INTO pg_pa_source SELECT id, id * 10 FROM generate_series(1,14) AS id; +INSERT INTO citus_pa_source SELECT id, id * 10 FROM generate_series(1,14) AS id; +-- insert a few rows in the target table (odd numbered tid) +INSERT INTO pg_pa_target SELECT id, id * 100, 'initial' FROM generate_series(1,14,2) AS id; +INSERT INTO citus_pa_target SELECT id, id * 100, 'initial' FROM generate_series(1,14,2) AS id; +SELECT create_distributed_table('citus_pa_target', 'tid'); +NOTICE: Copying data from local table... +NOTICE: copying the data has completed +DETAIL: The local data in the table is no longer visible, but is still on disk. +HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$merge_schema.part5$$) +NOTICE: Copying data from local table... +NOTICE: copying the data has completed +DETAIL: The local data in the table is no longer visible, but is still on disk. +HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$merge_schema.part6$$) +NOTICE: Copying data from local table... +NOTICE: copying the data has completed +DETAIL: The local data in the table is no longer visible, but is still on disk. +HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$merge_schema.part7$$) +NOTICE: Copying data from local table... +NOTICE: copying the data has completed +DETAIL: The local data in the table is no longer visible, but is still on disk. +HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$merge_schema.part8$$) + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SELECT create_distributed_table('citus_pa_source', 'sid'); +NOTICE: Copying data from local table... +NOTICE: copying the data has completed +DETAIL: The local data in the table is no longer visible, but is still on disk. +HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$merge_schema.citus_pa_source$$) + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +CREATE OR REPLACE FUNCTION pa_compare_tables() RETURNS BOOLEAN AS $$ +DECLARE ret BOOL; +BEGIN +SELECT count(1) = 0 INTO ret + FROM pg_pa_target + FULL OUTER JOIN citus_pa_target + USING (tid, balance, val) + WHERE pg_pa_target.tid IS NULL + OR citus_pa_target.tid IS NULL; +RETURN ret; +END +$$ LANGUAGE PLPGSQL; +-- try simple MERGE +BEGIN; +MERGE INTO pg_pa_target t + USING pg_pa_source s + ON t.tid = s.sid + WHEN MATCHED THEN + UPDATE SET balance = balance + delta, val = val || ' updated by merge' + WHEN NOT MATCHED THEN + INSERT VALUES (sid, delta, 'inserted by merge'); +MERGE INTO citus_pa_target t + USING citus_pa_source s + ON t.tid = s.sid + WHEN MATCHED THEN + UPDATE SET balance = balance + delta, val = val || ' updated by merge' + WHEN NOT MATCHED THEN + INSERT VALUES (sid, delta, 'inserted by merge'); +SELECT pa_compare_tables(); + pa_compare_tables +--------------------------------------------------------------------- + t +(1 row) + +ROLLBACK; +-- same with a constant qual +BEGIN; +MERGE INTO pg_pa_target t + USING pg_pa_source s + ON t.tid = s.sid AND tid = 1 + WHEN MATCHED THEN + UPDATE SET balance = balance + delta, val = val || ' updated by merge' + WHEN NOT MATCHED THEN + INSERT VALUES (sid, delta, 'inserted by merge'); +MERGE INTO citus_pa_target t + USING citus_pa_source s + ON t.tid = s.sid AND tid = 1 + WHEN MATCHED THEN + UPDATE SET balance = balance + delta, val = val || ' updated by merge' + WHEN NOT MATCHED THEN + INSERT VALUES (sid, delta, 'inserted by merge'); +SELECT pa_compare_tables(); + pa_compare_tables +--------------------------------------------------------------------- + t +(1 row) + +ROLLBACK; -- -- Error and Unsupported scenarios -- +-- try updating the distribution key column +BEGIN; +MERGE INTO target_cj t + USING source_cj1 s + ON t.tid = s.sid1 AND t.tid = 2 + WHEN MATCHED THEN + UPDATE SET tid = tid + 9, src = src || ' updated by merge' + WHEN NOT MATCHED THEN + INSERT VALUES (sid1, 'inserted by merge', val1); +ERROR: modifying the partition value of rows is not allowed +ROLLBACK; -- Foreign table as target MERGE INTO foreign_table USING ft_target ON (foreign_table.id = ft_target.id) @@ -1274,7 +2237,54 @@ MERGE INTO t1 UPDATE SET val = t1.val + 1 WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s1.id, s1.val); -ERROR: MERGE command is not supported on distributed/reference tables yet +ERROR: MERGE command is not supported with combination of distributed/local tables yet +-- Now both s1 and t1 are distributed tables +SELECT undistribute_table('t1'); +NOTICE: creating a new table for merge_schema.t1 +NOTICE: moving the data of merge_schema.t1 +NOTICE: dropping the old merge_schema.t1 +NOTICE: renaming the new table to merge_schema.t1 + undistribute_table +--------------------------------------------------------------------- + +(1 row) + +SELECT create_distributed_table('t1', 'id'); +NOTICE: Copying data from local table... +NOTICE: copying the data has completed +DETAIL: The local data in the table is no longer visible, but is still on disk. +HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$merge_schema.t1$$) + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +-- We have a potential pitfall where a function can be invoked in +-- the MERGE conditions which can insert/update to a random shard +CREATE OR REPLACE function merge_when_and_write() RETURNS BOOLEAN +LANGUAGE PLPGSQL AS +$$ +BEGIN + INSERT INTO t1 VALUES (100, 100); + RETURN TRUE; +END; +$$; +-- Test preventing "ON" join condition from writing to the database +BEGIN; +MERGE INTO t1 +USING s1 ON t1.id = s1.id AND t1.id = 2 AND (merge_when_and_write()) +WHEN MATCHED THEN + UPDATE SET val = t1.val + s1.val; +ERROR: functions used in the WHERE/ON/WHEN clause of modification queries on distributed tables must not be VOLATILE +ROLLBACK; +-- Test preventing WHEN clause(s) from writing to the database +BEGIN; +MERGE INTO t1 +USING s1 ON t1.id = s1.id AND t1.id = 2 +WHEN MATCHED AND (merge_when_and_write()) THEN + UPDATE SET val = t1.val + s1.val; +ERROR: functions used in the WHERE/ON/WHEN clause of modification queries on distributed tables must not be VOLATILE +ROLLBACK; -- Joining on partition columns with sub-query MERGE INTO t1 USING (SELECT * FROM s1) sub ON (sub.val = t1.id) -- sub.val is not a distribution column @@ -1284,7 +2294,7 @@ MERGE INTO t1 UPDATE SET val = t1.val + 1 WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val); -ERROR: MERGE command is not supported on distributed/reference tables yet +ERROR: MERGE command is only supported when distributed tables are joined on their distribution column -- Joining on partition columns with CTE WITH s1_res AS ( SELECT * FROM s1 @@ -1297,7 +2307,7 @@ MERGE INTO t1 UPDATE SET val = t1.val + 1 WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s1_res.id, s1_res.val); -ERROR: MERGE command is not supported on distributed/reference tables yet +ERROR: MERGE command is only supported when distributed tables are joined on their distribution column -- Constant Join condition WITH s1_res AS ( SELECT * FROM s1 @@ -1310,7 +2320,7 @@ MERGE INTO t1 UPDATE SET val = t1.val + 1 WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s1_res.id, s1_res.val); -ERROR: MERGE command is not supported on distributed/reference tables yet +ERROR: MERGE command is only supported when distributed tables are joined on their distribution column -- With a single WHEN clause, which causes a non-left join WITH s1_res AS ( SELECT * FROM s1 @@ -1319,7 +2329,7 @@ WITH s1_res AS ( WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s1_res.id, s1_res.val); -ERROR: MERGE command is not supported on distributed/reference tables yet +ERROR: MERGE command is only supported when distributed tables are joined on their distribution column -- -- Reference tables -- @@ -1371,7 +2381,7 @@ MERGE INTO t1 UPDATE SET val = t1.val + 1 WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s1.id, s1.val); -ERROR: MERGE command is not supported on distributed/reference tables yet +ERROR: MERGE command is not supported on reference tables yet -- -- Postgres + Citus-Distributed table -- @@ -1413,7 +2423,7 @@ MERGE INTO t1 UPDATE SET val = t1.val + 1 WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s1.id, s1.val); -ERROR: MERGE command is not supported on distributed/reference tables yet +ERROR: MERGE command is not supported with combination of distributed/local tables yet MERGE INTO t1 USING (SELECT * FROM s1) sub ON (sub.id = t1.id) WHEN MATCHED AND sub.val = 0 THEN @@ -1422,7 +2432,7 @@ MERGE INTO t1 UPDATE SET val = t1.val + 1 WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val); -ERROR: MERGE command is not supported on distributed/reference tables yet +ERROR: MERGE command is not supported with combination of distributed/local tables yet CREATE TABLE pg(val int); SELECT create_distributed_table('s1', 'id'); NOTICE: Copying data from local table... @@ -1443,7 +2453,7 @@ MERGE INTO t1 UPDATE SET val = t1.val + 1 WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val); -ERROR: MERGE command is not supported on distributed/reference tables yet +ERROR: MERGE command is not supported with combination of distributed/local tables yet -- Mix Postgres table in CTE WITH pg_res AS ( SELECT * FROM pg @@ -1456,7 +2466,7 @@ MERGE INTO t1 UPDATE SET val = t1.val + 1 WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val); -ERROR: MERGE command is not supported on distributed/reference tables yet +ERROR: MERGE command is not supported with combination of distributed/local tables yet -- Match more than one source row should fail same as Postgres behavior SELECT undistribute_table('t1'); NOTICE: creating a new table for merge_schema.t1 @@ -1511,6 +2521,234 @@ WHEN NOT MATCHED THEN INSERT VALUES(mv_source.id, mv_source.val); ERROR: cannot execute MERGE on relation "mv_source" DETAIL: This operation is not supported for materialized views. +-- Distributed tables *must* be colocated +CREATE TABLE dist_target(id int, val varchar); +SELECT create_distributed_table('dist_target', 'id'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +CREATE TABLE dist_source(id int, val varchar); +SELECT create_distributed_table('dist_source', 'id', colocate_with => 'none'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +MERGE INTO dist_target +USING dist_source +ON dist_target.id = dist_source.id +WHEN MATCHED THEN +UPDATE SET val = dist_source.val +WHEN NOT MATCHED THEN +INSERT VALUES(dist_source.id, dist_source.val); +ERROR: For MERGE command, all the distributed tables must be colocated +-- Distributed tables *must* be joined on distribution column +CREATE TABLE dist_colocated(id int, val int); +SELECT create_distributed_table('dist_colocated', 'id', colocate_with => 'dist_target'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +MERGE INTO dist_target +USING dist_colocated +ON dist_target.id = dist_colocated.val -- val is not the distribution column +WHEN MATCHED THEN +UPDATE SET val = dist_colocated.val +WHEN NOT MATCHED THEN +INSERT VALUES(dist_colocated.id, dist_colocated.val); +ERROR: MERGE command is only supported when distributed tables are joined on their distribution column +-- Both the source and target must be distributed +MERGE INTO dist_target +USING (SELECT 100 id) AS source +ON dist_target.id = source.id AND dist_target.val = 'const' +WHEN MATCHED THEN +UPDATE SET val = 'source' +WHEN NOT MATCHED THEN +INSERT VALUES(source.id, 'source'); +ERROR: For MERGE command, both the source and target must be distributed +-- Non-hash distributed tables (append/range). +CREATE VIEW show_tables AS +SELECT logicalrelid, partmethod +FROM pg_dist_partition +WHERE (logicalrelid = 'dist_target'::regclass) OR (logicalrelid = 'dist_source'::regclass) +ORDER BY 1; +SELECT undistribute_table('dist_source'); +NOTICE: creating a new table for merge_schema.dist_source +NOTICE: moving the data of merge_schema.dist_source +NOTICE: dropping the old merge_schema.dist_source +NOTICE: drop cascades to view show_tables +CONTEXT: SQL statement "DROP TABLE merge_schema.dist_source CASCADE" +NOTICE: renaming the new table to merge_schema.dist_source + undistribute_table +--------------------------------------------------------------------- + +(1 row) + +SELECT create_distributed_table('dist_source', 'id', 'append'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SELECT * FROM show_tables; + logicalrelid | partmethod +--------------------------------------------------------------------- + dist_target | h + dist_source | a +(2 rows) + +MERGE INTO dist_target +USING dist_source +ON dist_target.id = dist_source.id +WHEN MATCHED THEN +UPDATE SET val = dist_source.val +WHEN NOT MATCHED THEN +INSERT VALUES(dist_source.id, dist_source.val); +ERROR: For MERGE command, all the distributed tables must be colocated, for append/range distribution, colocation is not supported +HINT: Consider using hash distribution instead +SELECT undistribute_table('dist_source'); +NOTICE: creating a new table for merge_schema.dist_source +NOTICE: moving the data of merge_schema.dist_source +NOTICE: dropping the old merge_schema.dist_source +NOTICE: drop cascades to view show_tables +CONTEXT: SQL statement "DROP TABLE merge_schema.dist_source CASCADE" +NOTICE: renaming the new table to merge_schema.dist_source + undistribute_table +--------------------------------------------------------------------- + +(1 row) + +SELECT create_distributed_table('dist_source', 'id', 'range'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SELECT * FROM show_tables; + logicalrelid | partmethod +--------------------------------------------------------------------- + dist_target | h + dist_source | r +(2 rows) + +MERGE INTO dist_target +USING dist_source +ON dist_target.id = dist_source.id +WHEN MATCHED THEN +UPDATE SET val = dist_source.val +WHEN NOT MATCHED THEN +INSERT VALUES(dist_source.id, dist_source.val); +ERROR: For MERGE command, all the distributed tables must be colocated, for append/range distribution, colocation is not supported +HINT: Consider using hash distribution instead +-- Both are append tables +SELECT undistribute_table('dist_target'); +NOTICE: creating a new table for merge_schema.dist_target +NOTICE: moving the data of merge_schema.dist_target +NOTICE: dropping the old merge_schema.dist_target +NOTICE: drop cascades to view show_tables +CONTEXT: SQL statement "DROP TABLE merge_schema.dist_target CASCADE" +NOTICE: renaming the new table to merge_schema.dist_target + undistribute_table +--------------------------------------------------------------------- + +(1 row) + +SELECT undistribute_table('dist_source'); +NOTICE: creating a new table for merge_schema.dist_source +NOTICE: moving the data of merge_schema.dist_source +NOTICE: dropping the old merge_schema.dist_source +NOTICE: drop cascades to view show_tables +CONTEXT: SQL statement "DROP TABLE merge_schema.dist_source CASCADE" +NOTICE: renaming the new table to merge_schema.dist_source + undistribute_table +--------------------------------------------------------------------- + +(1 row) + +SELECT create_distributed_table('dist_target', 'id', 'append'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SELECT create_distributed_table('dist_source', 'id', 'append'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SELECT * FROM show_tables; + logicalrelid | partmethod +--------------------------------------------------------------------- + dist_target | a + dist_source | a +(2 rows) + +MERGE INTO dist_target +USING dist_source +ON dist_target.id = dist_source.id +WHEN MATCHED THEN +UPDATE SET val = dist_source.val +WHEN NOT MATCHED THEN +INSERT VALUES(dist_source.id, dist_source.val); +ERROR: For MERGE command, all the distributed tables must be colocated, for append/range distribution, colocation is not supported +HINT: Consider using hash distribution instead +-- Both are range tables +SELECT undistribute_table('dist_target'); +NOTICE: creating a new table for merge_schema.dist_target +NOTICE: moving the data of merge_schema.dist_target +NOTICE: dropping the old merge_schema.dist_target +NOTICE: drop cascades to view show_tables +CONTEXT: SQL statement "DROP TABLE merge_schema.dist_target CASCADE" +NOTICE: renaming the new table to merge_schema.dist_target + undistribute_table +--------------------------------------------------------------------- + +(1 row) + +SELECT undistribute_table('dist_source'); +NOTICE: creating a new table for merge_schema.dist_source +NOTICE: moving the data of merge_schema.dist_source +NOTICE: dropping the old merge_schema.dist_source +NOTICE: drop cascades to view show_tables +CONTEXT: SQL statement "DROP TABLE merge_schema.dist_source CASCADE" +NOTICE: renaming the new table to merge_schema.dist_source + undistribute_table +--------------------------------------------------------------------- + +(1 row) + +SELECT create_distributed_table('dist_target', 'id', 'range'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SELECT create_distributed_table('dist_source', 'id', 'range'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SELECT * FROM show_tables; + logicalrelid | partmethod +--------------------------------------------------------------------- + dist_target | r + dist_source | r +(2 rows) + +MERGE INTO dist_target +USING dist_source +ON dist_target.id = dist_source.id +WHEN MATCHED THEN +UPDATE SET val = dist_source.val +WHEN NOT MATCHED THEN +INSERT VALUES(dist_source.id, dist_source.val); +ERROR: For MERGE command, all the distributed tables must be colocated, for append/range distribution, colocation is not supported +HINT: Consider using hash distribution instead DROP SERVER foreign_server CASCADE; NOTICE: drop cascades to 3 other objects DETAIL: drop cascades to user mapping for postgres on server foreign_server @@ -1519,8 +2757,9 @@ drop cascades to foreign table foreign_table NOTICE: foreign table "foreign_table_4000046" does not exist, skipping CONTEXT: SQL statement "SELECT citus_drop_all_shards(v_obj.objid, v_obj.schema_name, v_obj.object_name, drop_shards_metadata_only := false)" PL/pgSQL function citus_drop_trigger() line XX at PERFORM +DROP FUNCTION merge_when_and_write(); DROP SCHEMA merge_schema CASCADE; -NOTICE: drop cascades to 56 other objects +NOTICE: drop cascades to 75 other objects DETAIL: drop cascades to function insert_data() drop cascades to table pg_result drop cascades to table local_local @@ -1572,11 +2811,30 @@ drop cascades to table ft_target drop cascades to table ft_source_4000045 drop cascades to table ft_source drop cascades to extension postgres_fdw +drop cascades to table target_cj +drop cascades to table source_cj1 +drop cascades to table source_cj2 +drop cascades to table pg_target +drop cascades to table pg_source +drop cascades to table citus_target +drop cascades to table citus_source +drop cascades to function compare_tables() +drop cascades to view pg_source_view +drop cascades to view citus_source_view +drop cascades to table pg_pa_target +drop cascades to table citus_pa_target +drop cascades to table pg_pa_source +drop cascades to table citus_pa_source +drop cascades to function pa_compare_tables() drop cascades to table pg -drop cascades to table t1_4000062 -drop cascades to table s1_4000063 +drop cascades to table t1_4000110 +drop cascades to table s1_4000111 drop cascades to table t1 drop cascades to table s1 +drop cascades to table dist_colocated +drop cascades to table dist_target +drop cascades to table dist_source +drop cascades to view show_tables SELECT 1 FROM master_remove_node('localhost', :master_port); ?column? --------------------------------------------------------------------- diff --git a/src/test/regress/expected/pg15.out b/src/test/regress/expected/pg15.out index 7a41b25ec..d92686b93 100644 --- a/src/test/regress/expected/pg15.out +++ b/src/test/regress/expected/pg15.out @@ -315,7 +315,7 @@ SELECT create_reference_table('tbl2'); MERGE INTO tbl1 USING tbl2 ON (true) WHEN MATCHED THEN DELETE; -ERROR: MERGE command is not supported on distributed/reference tables yet +ERROR: MERGE command is not supported on reference tables yet -- now, both are reference, still not supported SELECT create_reference_table('tbl1'); create_reference_table @@ -325,7 +325,7 @@ SELECT create_reference_table('tbl1'); MERGE INTO tbl1 USING tbl2 ON (true) WHEN MATCHED THEN DELETE; -ERROR: MERGE command is not supported on distributed/reference tables yet +ERROR: MERGE command is not supported on reference tables yet -- now, both distributed, not works SELECT undistribute_table('tbl1'); NOTICE: creating a new table for pg15.tbl1 @@ -419,14 +419,14 @@ SELECT create_distributed_table('tbl2', 'x'); MERGE INTO tbl1 USING tbl2 ON (true) WHEN MATCHED THEN DELETE; -ERROR: MERGE command is not supported on distributed/reference tables yet +ERROR: MERGE command is only supported when distributed tables are joined on their distribution column -- also, not inside subqueries & ctes WITH targq AS ( SELECT * FROM tbl2 ) MERGE INTO tbl1 USING targq ON (true) WHEN MATCHED THEN DELETE; -ERROR: MERGE command is not supported on distributed/reference tables yet +ERROR: MERGE command is only supported when distributed tables are joined on their distribution column -- crashes on beta3, fixed on 15 stable --WITH foo AS ( -- MERGE INTO tbl1 USING tbl2 ON (true) @@ -441,7 +441,7 @@ USING tbl2 ON (true) WHEN MATCHED THEN UPDATE SET x = (SELECT count(*) FROM tbl2); -ERROR: MERGE command is not supported on distributed/reference tables yet +ERROR: MERGE command is only supported when distributed tables are joined on their distribution column -- test numeric types with negative scale CREATE TABLE numeric_negative_scale(numeric_column numeric(3,-1), orig_value int); INSERT into numeric_negative_scale SELECT x,x FROM generate_series(111, 115) x; diff --git a/src/test/regress/expected/pgmerge.out b/src/test/regress/expected/pgmerge.out index b90760691..0bedf356f 100644 --- a/src/test/regress/expected/pgmerge.out +++ b/src/test/regress/expected/pgmerge.out @@ -910,7 +910,15 @@ MERGE INTO wq_target t USING wq_source s ON t.tid = s.sid WHEN MATCHED AND (merge_when_and_write()) THEN UPDATE SET balance = t.balance + s.balance; -ERROR: functions used in UPDATE queries on distributed tables must not be VOLATILE +ERROR: functions used in the WHERE/ON/WHEN clause of modification queries on distributed tables must not be VOLATILE +ROLLBACK; +-- Test preventing ON condition from writing to the database +BEGIN; +MERGE INTO wq_target t +USING wq_source s ON t.tid = s.sid AND (merge_when_and_write()) +WHEN MATCHED THEN + UPDATE SET balance = t.balance + s.balance; +ERROR: functions used in the WHERE/ON/WHEN clause of modification queries on distributed tables must not be VOLATILE ROLLBACK; drop function merge_when_and_write(); DROP TABLE wq_target, wq_source; @@ -1891,7 +1899,7 @@ MERGE INTO pa_target t UPDATE SET balance = balance + delta, val = val || ' updated by merge' WHEN NOT MATCHED THEN INSERT VALUES (slogts::timestamp, sid, delta, 'inserted by merge'); -DEBUG: +DEBUG: SELECT * FROM pa_target ORDER BY tid; logts | tid | balance | val --------------------------------------------------------------------- @@ -2083,7 +2091,7 @@ WHEN MATCHED THEN UPDATE WHEN NOT MATCHED THEN INSERT (city_id, logdate, peaktemp, unitsales) VALUES (city_id, logdate, peaktemp, unitsales); -DEBUG: +DEBUG: RESET client_min_messages; SELECT tableoid::regclass, * FROM measurement ORDER BY city_id, logdate; tableoid | city_id | logdate | peaktemp | unitsales diff --git a/src/test/regress/sql/merge.sql b/src/test/regress/sql/merge.sql index c266b5333..12294b2c9 100644 --- a/src/test/regress/sql/merge.sql +++ b/src/test/regress/sql/merge.sql @@ -19,6 +19,7 @@ SET search_path TO merge_schema; SET citus.shard_count TO 4; SET citus.next_shard_id TO 4000000; SET citus.explain_all_tasks to true; +SET citus.shard_replication_factor TO 1; SELECT 1 FROM master_add_node('localhost', :master_port, groupid => 0); CREATE TABLE source @@ -143,9 +144,33 @@ SELECT undistribute_table('source'); SELECT create_distributed_table('target', 'customer_id'); SELECT create_distributed_table('source', 'customer_id'); +-- Updates one of the row with customer_id = 30002 +SELECT * from target t WHERE t.customer_id = 30002; +-- Turn on notice to print tasks sent to nodes +SET citus.log_remote_commands to true; MERGE INTO target t USING source s - ON (t.customer_id = s.customer_id) + ON (t.customer_id = s.customer_id) AND t.customer_id = 30002 + + WHEN MATCHED AND t.order_center = 'XX' THEN + DELETE + + WHEN MATCHED THEN + UPDATE SET -- Existing customer, update the order count and last_order_id + order_count = t.order_count + 1, + last_order_id = s.order_id + + WHEN NOT MATCHED THEN + DO NOTHING; + +SET citus.log_remote_commands to false; +SELECT * from target t WHERE t.customer_id = 30002; + +-- Deletes one of the row with customer_id = 30004 +SELECT * from target t WHERE t.customer_id = 30004; +MERGE INTO target t + USING source s + ON (t.customer_id = s.customer_id) AND t.customer_id = 30004 WHEN MATCHED AND t.order_center = 'XX' THEN DELETE @@ -158,6 +183,7 @@ MERGE INTO target t WHEN NOT MATCHED THEN -- New entry, record it. INSERT (customer_id, last_order_id, order_center, order_count, last_order) VALUES (customer_id, s.order_id, s.order_center, 123, s.order_time); +SELECT * from target t WHERE t.customer_id = 30004; -- -- Test MERGE with CTE as source @@ -243,11 +269,13 @@ SELECT create_distributed_table('t1', 'id'); SELECT create_distributed_table('s1', 'id'); +SELECT * FROM t1 order by id; +SET citus.log_remote_commands to true; WITH s1_res AS ( SELECT * FROM s1 ) MERGE INTO t1 - USING s1_res ON (s1_res.id = t1.id) + USING s1_res ON (s1_res.id = t1.id) AND t1.id = 6 WHEN MATCHED AND s1_res.val = 0 THEN DELETE @@ -255,6 +283,9 @@ MERGE INTO t1 UPDATE SET val = t1.val + 1 WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s1_res.id, s1_res.val); +SET citus.log_remote_commands to false; +-- Other than id 6 everything else is a NO match, and should appear in target +SELECT * FROM t1 order by 1, 2; -- -- Test with multiple join conditions @@ -325,15 +356,21 @@ SELECT undistribute_table('s2'); SELECT create_distributed_table('t2', 'id'); SELECT create_distributed_table('s2', 'id'); +SELECT * FROM t2 ORDER BY 1; +SET citus.log_remote_commands to true; MERGE INTO t2 USING s2 -ON t2.id = s2.id AND t2.src = s2.src +ON t2.id = s2.id AND t2.src = s2.src AND t2.id = 4 WHEN MATCHED AND t2.val = 1 THEN UPDATE SET val = s2.val + 10 WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN - INSERT (id, val, src) VALUES (s2.id, s2.val, s2.src); + DO NOTHING; +SET citus.log_remote_commands to false; +-- Row with id = 4 is a match for delete clause, row should be deleted +-- Row with id = 3 is a NO match, row from source will be inserted +SELECT * FROM t2 ORDER BY 1; -- -- With sub-query as the MERGE source @@ -824,10 +861,577 @@ RESET client_min_messages; SELECT * FROM ft_target; +-- +-- complex joins on the source side +-- + +-- source(join of two relations) relation is an unaliased join + +CREATE TABLE target_cj(tid int, src text, val int); +CREATE TABLE source_cj1(sid1 int, src1 text, val1 int); +CREATE TABLE source_cj2(sid2 int, src2 text, val2 int); + +INSERT INTO target_cj VALUES (1, 'target', 0); +INSERT INTO target_cj VALUES (2, 'target', 0); +INSERT INTO target_cj VALUES (2, 'target', 0); +INSERT INTO target_cj VALUES (3, 'target', 0); + +INSERT INTO source_cj1 VALUES (2, 'source-1', 10); +INSERT INTO source_cj2 VALUES (2, 'source-2', 20); + +BEGIN; +MERGE INTO target_cj t +USING source_cj1 s1 INNER JOIN source_cj2 s2 ON sid1 = sid2 +ON t.tid = sid1 AND t.tid = 2 +WHEN MATCHED THEN + UPDATE SET src = src2 +WHEN NOT MATCHED THEN + DO NOTHING; +-- Gold result to compare against +SELECT * FROM target_cj ORDER BY 1; +ROLLBACK; + +BEGIN; +-- try accessing columns from either side of the source join +MERGE INTO target_cj t +USING source_cj1 s2 + INNER JOIN source_cj2 s1 ON sid1 = sid2 AND val1 = 10 +ON t.tid = sid1 AND t.tid = 2 +WHEN MATCHED THEN + UPDATE SET tid = sid2, src = src1, val = val2 +WHEN NOT MATCHED THEN + DO NOTHING; +-- Gold result to compare against +SELECT * FROM target_cj ORDER BY 1; +ROLLBACK; + +-- Test the same scenarios with distributed tables + +SELECT create_distributed_table('target_cj', 'tid'); +SELECT create_distributed_table('source_cj1', 'sid1'); +SELECT create_distributed_table('source_cj2', 'sid2'); + +BEGIN; +SET citus.log_remote_commands to true; +MERGE INTO target_cj t +USING source_cj1 s1 INNER JOIN source_cj2 s2 ON sid1 = sid2 +ON t.tid = sid1 AND t.tid = 2 +WHEN MATCHED THEN + UPDATE SET src = src2 +WHEN NOT MATCHED THEN + DO NOTHING; +SET citus.log_remote_commands to false; +SELECT * FROM target_cj ORDER BY 1; +ROLLBACK; + +BEGIN; +-- try accessing columns from either side of the source join +MERGE INTO target_cj t +USING source_cj1 s2 + INNER JOIN source_cj2 s1 ON sid1 = sid2 AND val1 = 10 +ON t.tid = sid1 AND t.tid = 2 +WHEN MATCHED THEN + UPDATE SET src = src1, val = val2 +WHEN NOT MATCHED THEN + DO NOTHING; +SELECT * FROM target_cj ORDER BY 1; +ROLLBACK; + +-- sub-query as a source +BEGIN; +MERGE INTO target_cj t +USING (SELECT * FROM source_cj1 WHERE sid1 = 2) sub +ON t.tid = sub.sid1 AND t.tid = 2 +WHEN MATCHED THEN + UPDATE SET src = sub.src1, val = val1 +WHEN NOT MATCHED THEN + DO NOTHING; +SELECT * FROM target_cj ORDER BY 1; +ROLLBACK; + +-- Test self-join +BEGIN; +SELECT * FROM target_cj ORDER BY 1; +set citus.log_remote_commands to true; +MERGE INTO target_cj t1 +USING (SELECT * FROM target_cj) sub +ON t1.tid = sub.tid AND t1.tid = 3 +WHEN MATCHED THEN + UPDATE SET src = sub.src, val = sub.val + 100 +WHEN NOT MATCHED THEN + DO NOTHING; +set citus.log_remote_commands to false; +SELECT * FROM target_cj ORDER BY 1; +ROLLBACK; + + +-- Test PREPARE +PREPARE foo(int) AS +MERGE INTO target_cj target +USING (SELECT * FROM source_cj1) sub +ON target.tid = sub.sid1 AND target.tid = $1 +WHEN MATCHED THEN + UPDATE SET val = sub.val1 +WHEN NOT MATCHED THEN + DO NOTHING; + +SELECT * FROM target_cj ORDER BY 1; + +BEGIN; +EXECUTE foo(2); +EXECUTE foo(2); +EXECUTE foo(2); +EXECUTE foo(2); +EXECUTE foo(2); +SELECT * FROM target_cj ORDER BY 1; +ROLLBACK; + +BEGIN; + +SET citus.log_remote_commands to true; +SET client_min_messages TO DEBUG1; +EXECUTE foo(2); +RESET client_min_messages; + +EXECUTE foo(2); +SET citus.log_remote_commands to false; + +SELECT * FROM target_cj ORDER BY 1; +ROLLBACK; + +-- Test distributed tables, must be co-located and joined on distribution column. + +-- +-- We create two sets of source and target tables, one set is Postgres and the other +-- is Citus distributed. Run the _exact_ MERGE SQL on both the sets and compare the +-- final results of target tables of Postgres and Citus, the result should match. +-- This is repeated for various MERGE SQL combinations +-- +CREATE TABLE pg_target(id int, val varchar); +CREATE TABLE pg_source(id int, val varchar); +CREATE TABLE citus_target(id int, val varchar); +CREATE TABLE citus_source(id int, val varchar); + +-- Half of the source rows do not match +INSERT INTO pg_target SELECT i, 'target' FROM generate_series(250, 500) i; +INSERT INTO pg_source SELECT i, 'source' FROM generate_series(1, 500) i; + +INSERT INTO citus_target SELECT i, 'target' FROM generate_series(250, 500) i; +INSERT INTO citus_source SELECT i, 'source' FROM generate_series(1, 500) i; + +SELECT create_distributed_table('citus_target', 'id'); +SELECT create_distributed_table('citus_source', 'id'); + +-- +-- This routine compares the target tables of Postgres and Citus and +-- returns true if they match, false if the results do not match. +-- +CREATE OR REPLACE FUNCTION compare_tables() RETURNS BOOLEAN AS $$ +DECLARE ret BOOL; +BEGIN +SELECT count(1) = 0 INTO ret + FROM pg_target + FULL OUTER JOIN citus_target + USING (id, val) + WHERE pg_target.id IS NULL + OR citus_target.id IS NULL; +RETURN ret; +END +$$ LANGUAGE PLPGSQL; + +-- Make sure we start with exact data in Postgres and Citus +SELECT compare_tables(); + +-- Run the MERGE on both Postgres and Citus, and compare the final target tables + +BEGIN; +SET citus.log_remote_commands to true; + +MERGE INTO pg_target t +USING pg_source s +ON t.id = s.id +WHEN MATCHED AND t.id > 400 THEN + UPDATE SET val = t.val || 'Updated by Merge' +WHEN MATCHED THEN + DELETE +WHEN NOT MATCHED THEN + INSERT VALUES(s.id, s.val); + +MERGE INTO citus_target t +USING citus_source s +ON t.id = s.id +WHEN MATCHED AND t.id > 400 THEN + UPDATE SET val = t.val || 'Updated by Merge' +WHEN MATCHED THEN + DELETE +WHEN NOT MATCHED THEN + INSERT VALUES(s.id, s.val); + +SET citus.log_remote_commands to false; +SELECT compare_tables(); +ROLLBACK; + +-- +-- ON clause filter on source +-- +BEGIN; +SET citus.log_remote_commands to true; + +MERGE INTO pg_target t +USING pg_source s +ON t.id = s.id AND s.id < 100 +WHEN MATCHED AND t.id > 400 THEN + UPDATE SET val = t.val || 'Updated by Merge' +WHEN MATCHED THEN + DELETE +WHEN NOT MATCHED THEN + INSERT VALUES(s.id, s.val); + +MERGE INTO citus_target t +USING citus_source s +ON t.id = s.id AND s.id < 100 +WHEN MATCHED AND t.id > 400 THEN + UPDATE SET val = t.val || 'Updated by Merge' +WHEN MATCHED THEN + DELETE +WHEN NOT MATCHED THEN + INSERT VALUES(s.id, s.val); + +SET citus.log_remote_commands to false; +SELECT compare_tables(); +ROLLBACK; + +-- +-- ON clause filter on target +-- +BEGIN; +SET citus.log_remote_commands to true; + +MERGE INTO pg_target t +USING pg_source s +ON t.id = s.id AND t.id < 100 +WHEN MATCHED AND t.id > 400 THEN + UPDATE SET val = t.val || 'Updated by Merge' +WHEN MATCHED THEN + DELETE +WHEN NOT MATCHED THEN + INSERT VALUES(s.id, s.val); + +MERGE INTO citus_target t +USING citus_source s +ON t.id = s.id AND t.id < 100 +WHEN MATCHED AND t.id > 400 THEN + UPDATE SET val = t.val || 'Updated by Merge' +WHEN MATCHED THEN + DELETE +WHEN NOT MATCHED THEN + INSERT VALUES(s.id, s.val); + +SET citus.log_remote_commands to false; +SELECT compare_tables(); +ROLLBACK; + +-- +-- NOT MATCHED clause filter on source +-- +BEGIN; +SET citus.log_remote_commands to true; + +MERGE INTO pg_target t +USING pg_source s +ON t.id = s.id +WHEN MATCHED THEN + DO NOTHING +WHEN NOT MATCHED AND s.id < 100 THEN + INSERT VALUES(s.id, s.val); + +MERGE INTO citus_target t +USING citus_source s +ON t.id = s.id +WHEN MATCHED THEN + DO NOTHING +WHEN NOT MATCHED AND s.id < 100 THEN + INSERT VALUES(s.id, s.val); + +SET citus.log_remote_commands to false; +SELECT compare_tables(); +ROLLBACK; + +-- +-- Test constant filter in ON clause to check if shards are pruned +-- with restriction information +-- + +-- +-- Though constant filter is present, this won't prune shards as +-- NOT MATCHED clause is present +-- +BEGIN; +SET citus.log_remote_commands to true; + +MERGE INTO pg_target t +USING pg_source s +ON t.id = s.id AND s.id = 250 +WHEN MATCHED THEN + UPDATE SET val = t.val || 'Updated by Merge' +WHEN NOT MATCHED THEN + INSERT VALUES(s.id, s.val); + +MERGE INTO citus_target t +USING citus_source s +ON t.id = s.id AND s.id = 250 +WHEN MATCHED THEN + UPDATE SET val = t.val || 'Updated by Merge' +WHEN NOT MATCHED THEN + INSERT VALUES(s.id, s.val); + +SET citus.log_remote_commands to false; +SELECT compare_tables(); +ROLLBACK; + +-- This will prune shards with restriction information as NOT MATCHED is void +BEGIN; +SET citus.log_remote_commands to true; + +MERGE INTO pg_target t +USING pg_source s +ON t.id = s.id AND s.id = 250 +WHEN MATCHED THEN + UPDATE SET val = t.val || 'Updated by Merge' +WHEN NOT MATCHED THEN + DO NOTHING; + +MERGE INTO citus_target t +USING citus_source s +ON t.id = s.id AND s.id = 250 +WHEN MATCHED THEN + UPDATE SET val = t.val || 'Updated by Merge' +WHEN NOT MATCHED THEN + DO NOTHING; + +SET citus.log_remote_commands to false; +SELECT compare_tables(); +ROLLBACK; + +-- Test CTE with distributed tables +CREATE VIEW pg_source_view AS SELECT * FROM pg_source WHERE id < 400; +CREATE VIEW citus_source_view AS SELECT * FROM citus_source WHERE id < 400; + +BEGIN; +SEt citus.log_remote_commands to true; + +WITH cte AS ( + SELECT * FROM pg_source_view +) +MERGE INTO pg_target t +USING cte +ON cte.id = t.id +WHEN MATCHED AND t.id > 350 THEN + UPDATE SET val = t.val || 'Updated by CTE' +WHEN NOT MATCHED THEN + INSERT VALUES (cte.id, cte.val) +WHEN MATCHED AND t.id < 350 THEN + DELETE; + +WITH cte AS ( + SELECT * FROM citus_source_view +) +MERGE INTO citus_target t +USING cte +ON cte.id = t.id +WHEN MATCHED AND t.id > 350 THEN + UPDATE SET val = t.val || 'Updated by CTE' +WHEN NOT MATCHED THEN + INSERT VALUES (cte.id, cte.val) +WHEN MATCHED AND t.id < 350 THEN + DELETE; + +SET citus.log_remote_commands to false; +SELECT compare_tables(); +ROLLBACK; + + +-- Test sub-query with distributed tables +BEGIN; +SEt citus.log_remote_commands to true; + +MERGE INTO pg_target t +USING (SELECT * FROM pg_source) subq +ON subq.id = t.id +WHEN MATCHED AND t.id > 350 THEN + UPDATE SET val = t.val || 'Updated by subquery' +WHEN NOT MATCHED THEN + INSERT VALUES (subq.id, subq.val) +WHEN MATCHED AND t.id < 350 THEN + DELETE; + +MERGE INTO citus_target t +USING (SELECT * FROM citus_source) subq +ON subq.id = t.id +WHEN MATCHED AND t.id > 350 THEN + UPDATE SET val = t.val || 'Updated by subquery' +WHEN NOT MATCHED THEN + INSERT VALUES (subq.id, subq.val) +WHEN MATCHED AND t.id < 350 THEN + DELETE; + +SET citus.log_remote_commands to false; +SELECT compare_tables(); +ROLLBACK; + +-- Test PREPARE +PREPARE pg_prep(int) AS +MERGE INTO pg_target +USING (SELECT * FROM pg_source) sub +ON pg_target.id = sub.id AND pg_target.id = $1 +WHEN MATCHED THEN + UPDATE SET val = 'Updated by prepare using ' || sub.val +WHEN NOT MATCHED THEN + DO NOTHING; + +PREPARE citus_prep(int) AS +MERGE INTO citus_target +USING (SELECT * FROM citus_source) sub +ON citus_target.id = sub.id AND citus_target.id = $1 +WHEN MATCHED THEN + UPDATE SET val = 'Updated by prepare using ' || sub.val +WHEN NOT MATCHED THEN + DO NOTHING; + +BEGIN; +SET citus.log_remote_commands to true; + +SELECT * FROM pg_target WHERE id = 500; -- before merge +EXECUTE pg_prep(500); +SELECT * FROM pg_target WHERE id = 500; -- non-cached +EXECUTE pg_prep(500); +EXECUTE pg_prep(500); +EXECUTE pg_prep(500); +EXECUTE pg_prep(500); +EXECUTE pg_prep(500); +SELECT * FROM pg_target WHERE id = 500; -- cached + +SELECT * FROM citus_target WHERE id = 500; -- before merge +EXECUTE citus_prep(500); +SELECT * FROM citus_target WHERE id = 500; -- non-cached +EXECUTE citus_prep(500); +EXECUTE citus_prep(500); +EXECUTE citus_prep(500); +EXECUTE citus_prep(500); +EXECUTE citus_prep(500); +SELECT * FROM citus_target WHERE id = 500; -- cached + +SET citus.log_remote_commands to false; +SELECT compare_tables(); +ROLLBACK; + +-- Test partitions + distributed tables + +CREATE TABLE pg_pa_target (tid integer, balance float, val text) + PARTITION BY LIST (tid); +CREATE TABLE citus_pa_target (tid integer, balance float, val text) + PARTITION BY LIST (tid); + +CREATE TABLE part1 PARTITION OF pg_pa_target FOR VALUES IN (1,4) + WITH (autovacuum_enabled=off); +CREATE TABLE part2 PARTITION OF pg_pa_target FOR VALUES IN (2,5,6) + WITH (autovacuum_enabled=off); +CREATE TABLE part3 PARTITION OF pg_pa_target FOR VALUES IN (3,8,9) + WITH (autovacuum_enabled=off); +CREATE TABLE part4 PARTITION OF pg_pa_target DEFAULT + WITH (autovacuum_enabled=off); +CREATE TABLE part5 PARTITION OF citus_pa_target FOR VALUES IN (1,4) + WITH (autovacuum_enabled=off); +CREATE TABLE part6 PARTITION OF citus_pa_target FOR VALUES IN (2,5,6) + WITH (autovacuum_enabled=off); +CREATE TABLE part7 PARTITION OF citus_pa_target FOR VALUES IN (3,8,9) + WITH (autovacuum_enabled=off); +CREATE TABLE part8 PARTITION OF citus_pa_target DEFAULT + WITH (autovacuum_enabled=off); + +CREATE TABLE pg_pa_source (sid integer, delta float); +CREATE TABLE citus_pa_source (sid integer, delta float); + +-- insert many rows to the source table +INSERT INTO pg_pa_source SELECT id, id * 10 FROM generate_series(1,14) AS id; +INSERT INTO citus_pa_source SELECT id, id * 10 FROM generate_series(1,14) AS id; +-- insert a few rows in the target table (odd numbered tid) +INSERT INTO pg_pa_target SELECT id, id * 100, 'initial' FROM generate_series(1,14,2) AS id; +INSERT INTO citus_pa_target SELECT id, id * 100, 'initial' FROM generate_series(1,14,2) AS id; + +SELECT create_distributed_table('citus_pa_target', 'tid'); +SELECT create_distributed_table('citus_pa_source', 'sid'); + +CREATE OR REPLACE FUNCTION pa_compare_tables() RETURNS BOOLEAN AS $$ +DECLARE ret BOOL; +BEGIN +SELECT count(1) = 0 INTO ret + FROM pg_pa_target + FULL OUTER JOIN citus_pa_target + USING (tid, balance, val) + WHERE pg_pa_target.tid IS NULL + OR citus_pa_target.tid IS NULL; +RETURN ret; +END +$$ LANGUAGE PLPGSQL; + +-- try simple MERGE +BEGIN; +MERGE INTO pg_pa_target t + USING pg_pa_source s + ON t.tid = s.sid + WHEN MATCHED THEN + UPDATE SET balance = balance + delta, val = val || ' updated by merge' + WHEN NOT MATCHED THEN + INSERT VALUES (sid, delta, 'inserted by merge'); + +MERGE INTO citus_pa_target t + USING citus_pa_source s + ON t.tid = s.sid + WHEN MATCHED THEN + UPDATE SET balance = balance + delta, val = val || ' updated by merge' + WHEN NOT MATCHED THEN + INSERT VALUES (sid, delta, 'inserted by merge'); + +SELECT pa_compare_tables(); +ROLLBACK; + +-- same with a constant qual +BEGIN; +MERGE INTO pg_pa_target t + USING pg_pa_source s + ON t.tid = s.sid AND tid = 1 + WHEN MATCHED THEN + UPDATE SET balance = balance + delta, val = val || ' updated by merge' + WHEN NOT MATCHED THEN + INSERT VALUES (sid, delta, 'inserted by merge'); + +MERGE INTO citus_pa_target t + USING citus_pa_source s + ON t.tid = s.sid AND tid = 1 + WHEN MATCHED THEN + UPDATE SET balance = balance + delta, val = val || ' updated by merge' + WHEN NOT MATCHED THEN + INSERT VALUES (sid, delta, 'inserted by merge'); + +SELECT pa_compare_tables(); +ROLLBACK; + -- -- Error and Unsupported scenarios -- +-- try updating the distribution key column +BEGIN; +MERGE INTO target_cj t + USING source_cj1 s + ON t.tid = s.sid1 AND t.tid = 2 + WHEN MATCHED THEN + UPDATE SET tid = tid + 9, src = src || ' updated by merge' + WHEN NOT MATCHED THEN + INSERT VALUES (sid1, 'inserted by merge', val1); +ROLLBACK; + -- Foreign table as target MERGE INTO foreign_table USING ft_target ON (foreign_table.id = ft_target.id) @@ -854,6 +1458,38 @@ MERGE INTO t1 WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s1.id, s1.val); +-- Now both s1 and t1 are distributed tables +SELECT undistribute_table('t1'); +SELECT create_distributed_table('t1', 'id'); + +-- We have a potential pitfall where a function can be invoked in +-- the MERGE conditions which can insert/update to a random shard +CREATE OR REPLACE function merge_when_and_write() RETURNS BOOLEAN +LANGUAGE PLPGSQL AS +$$ +BEGIN + INSERT INTO t1 VALUES (100, 100); + RETURN TRUE; +END; +$$; + +-- Test preventing "ON" join condition from writing to the database +BEGIN; +MERGE INTO t1 +USING s1 ON t1.id = s1.id AND t1.id = 2 AND (merge_when_and_write()) +WHEN MATCHED THEN + UPDATE SET val = t1.val + s1.val; +ROLLBACK; + +-- Test preventing WHEN clause(s) from writing to the database +BEGIN; +MERGE INTO t1 +USING s1 ON t1.id = s1.id AND t1.id = 2 +WHEN MATCHED AND (merge_when_and_write()) THEN + UPDATE SET val = t1.val + s1.val; +ROLLBACK; + + -- Joining on partition columns with sub-query MERGE INTO t1 USING (SELECT * FROM s1) sub ON (sub.val = t1.id) -- sub.val is not a distribution column @@ -997,6 +1633,104 @@ WHEN MATCHED THEN WHEN NOT MATCHED THEN INSERT VALUES(mv_source.id, mv_source.val); +-- Distributed tables *must* be colocated +CREATE TABLE dist_target(id int, val varchar); +SELECT create_distributed_table('dist_target', 'id'); +CREATE TABLE dist_source(id int, val varchar); +SELECT create_distributed_table('dist_source', 'id', colocate_with => 'none'); + +MERGE INTO dist_target +USING dist_source +ON dist_target.id = dist_source.id +WHEN MATCHED THEN +UPDATE SET val = dist_source.val +WHEN NOT MATCHED THEN +INSERT VALUES(dist_source.id, dist_source.val); + +-- Distributed tables *must* be joined on distribution column +CREATE TABLE dist_colocated(id int, val int); +SELECT create_distributed_table('dist_colocated', 'id', colocate_with => 'dist_target'); + +MERGE INTO dist_target +USING dist_colocated +ON dist_target.id = dist_colocated.val -- val is not the distribution column +WHEN MATCHED THEN +UPDATE SET val = dist_colocated.val +WHEN NOT MATCHED THEN +INSERT VALUES(dist_colocated.id, dist_colocated.val); + + +-- Both the source and target must be distributed +MERGE INTO dist_target +USING (SELECT 100 id) AS source +ON dist_target.id = source.id AND dist_target.val = 'const' +WHEN MATCHED THEN +UPDATE SET val = 'source' +WHEN NOT MATCHED THEN +INSERT VALUES(source.id, 'source'); + +-- Non-hash distributed tables (append/range). +CREATE VIEW show_tables AS +SELECT logicalrelid, partmethod +FROM pg_dist_partition +WHERE (logicalrelid = 'dist_target'::regclass) OR (logicalrelid = 'dist_source'::regclass) +ORDER BY 1; + +SELECT undistribute_table('dist_source'); +SELECT create_distributed_table('dist_source', 'id', 'append'); +SELECT * FROM show_tables; + +MERGE INTO dist_target +USING dist_source +ON dist_target.id = dist_source.id +WHEN MATCHED THEN +UPDATE SET val = dist_source.val +WHEN NOT MATCHED THEN +INSERT VALUES(dist_source.id, dist_source.val); + +SELECT undistribute_table('dist_source'); +SELECT create_distributed_table('dist_source', 'id', 'range'); +SELECT * FROM show_tables; + +MERGE INTO dist_target +USING dist_source +ON dist_target.id = dist_source.id +WHEN MATCHED THEN +UPDATE SET val = dist_source.val +WHEN NOT MATCHED THEN +INSERT VALUES(dist_source.id, dist_source.val); + +-- Both are append tables +SELECT undistribute_table('dist_target'); +SELECT undistribute_table('dist_source'); +SELECT create_distributed_table('dist_target', 'id', 'append'); +SELECT create_distributed_table('dist_source', 'id', 'append'); +SELECT * FROM show_tables; + +MERGE INTO dist_target +USING dist_source +ON dist_target.id = dist_source.id +WHEN MATCHED THEN +UPDATE SET val = dist_source.val +WHEN NOT MATCHED THEN +INSERT VALUES(dist_source.id, dist_source.val); + +-- Both are range tables +SELECT undistribute_table('dist_target'); +SELECT undistribute_table('dist_source'); +SELECT create_distributed_table('dist_target', 'id', 'range'); +SELECT create_distributed_table('dist_source', 'id', 'range'); +SELECT * FROM show_tables; + +MERGE INTO dist_target +USING dist_source +ON dist_target.id = dist_source.id +WHEN MATCHED THEN +UPDATE SET val = dist_source.val +WHEN NOT MATCHED THEN +INSERT VALUES(dist_source.id, dist_source.val); + DROP SERVER foreign_server CASCADE; +DROP FUNCTION merge_when_and_write(); DROP SCHEMA merge_schema CASCADE; SELECT 1 FROM master_remove_node('localhost', :master_port); diff --git a/src/test/regress/sql/pgmerge.sql b/src/test/regress/sql/pgmerge.sql index 6842f516a..83bf01a68 100644 --- a/src/test/regress/sql/pgmerge.sql +++ b/src/test/regress/sql/pgmerge.sql @@ -608,6 +608,14 @@ USING wq_source s ON t.tid = s.sid WHEN MATCHED AND (merge_when_and_write()) THEN UPDATE SET balance = t.balance + s.balance; ROLLBACK; + +-- Test preventing ON condition from writing to the database +BEGIN; +MERGE INTO wq_target t +USING wq_source s ON t.tid = s.sid AND (merge_when_and_write()) +WHEN MATCHED THEN + UPDATE SET balance = t.balance + s.balance; +ROLLBACK; drop function merge_when_and_write(); DROP TABLE wq_target, wq_source;