From cf5513628151410d839a469f9fcafc1ead68f912 Mon Sep 17 00:00:00 2001 From: Teja Mupparti Date: Tue, 31 Jan 2023 18:23:44 -0800 Subject: [PATCH] 1) Restrict MERGE command INSERT to the source's distribution column Fixes #6672 2) Move all MERGE related routines to a new file merge_planner.c 3) Make ConjunctionContainsColumnFilter() static again, and rearrange the code in MergeQuerySupported() 4) Restore the original format in the comments section. 5) Add big serial test. Implement latest set of comments --- .../distributed/planner/distributed_planner.c | 41 +- .../planner/fast_path_router_planner.c | 5 +- .../distributed/planner/merge_planner.c | 701 ++++++++++++++++++ .../planner/multi_physical_planner.c | 17 +- .../planner/multi_router_planner.c | 398 +--------- .../planner/query_pushdown_planning.c | 12 +- .../relation_restriction_equivalence.c | 68 +- src/include/distributed/distributed_planner.h | 6 - src/include/distributed/merge_planner.h | 26 + .../distributed/multi_router_planner.h | 13 +- .../relation_restriction_equivalence.h | 12 +- src/test/regress/create_schedule | 1 + src/test/regress/expected/merge.out | 444 +++++++++-- src/test/regress/expected/merge_arbitrary.out | 150 ++++ .../regress/expected/merge_arbitrary_0.out | 6 + .../expected/merge_arbitrary_create.out | 72 ++ .../expected/merge_arbitrary_create_0.out | 6 + src/test/regress/expected/pg15.out | 31 +- src/test/regress/expected/pgmerge.out | 12 +- src/test/regress/sql/merge.sql | 235 +++++- src/test/regress/sql/merge_arbitrary.sql | 133 ++++ .../regress/sql/merge_arbitrary_create.sql | 50 ++ src/test/regress/sql/pg15.sql | 23 +- src/test/regress/sql/pgmerge.sql | 6 +- src/test/regress/sql_schedule | 1 + 25 files changed, 1920 insertions(+), 549 deletions(-) create mode 100644 src/backend/distributed/planner/merge_planner.c create mode 100644 src/include/distributed/merge_planner.h create mode 100644 src/test/regress/expected/merge_arbitrary.out create mode 100644 
src/test/regress/expected/merge_arbitrary_0.out create mode 100644 src/test/regress/expected/merge_arbitrary_create.out create mode 100644 src/test/regress/expected/merge_arbitrary_create_0.out create mode 100644 src/test/regress/sql/merge_arbitrary.sql create mode 100644 src/test/regress/sql/merge_arbitrary_create.sql diff --git a/src/backend/distributed/planner/distributed_planner.c b/src/backend/distributed/planner/distributed_planner.c index 262258d7f..17b63ee0a 100644 --- a/src/backend/distributed/planner/distributed_planner.c +++ b/src/backend/distributed/planner/distributed_planner.c @@ -77,7 +77,7 @@ int PlannerLevel = 0; static bool ListContainsDistributedTableRTE(List *rangeTableList, bool *maybeHasForeignDistributedTable); -static bool IsUpdateOrDelete(Query *query); +static bool IsUpdateOrDeleteOrMerge(Query *query); static PlannedStmt * CreateDistributedPlannedStmt( DistributedPlanningContext *planContext); static PlannedStmt * InlineCtesAndCreateDistributedPlannedStmt(uint64 planId, @@ -153,7 +153,7 @@ distributed_planner(Query *parse, * We cannot have merge command for this path as well because * there cannot be recursively planned merge command. */ - Assert(!ContainsMergeCommandWalker((Node *) parse)); + Assert(!IsMergeQuery(parse)); needsDistributedPlanning = true; } @@ -295,39 +295,6 @@ distributed_planner(Query *parse, } -/* - * ContainsMergeCommandWalker walks over the node and finds if there are any - * Merge command (e.g., CMD_MERGE) in the node. 
- */ -bool -ContainsMergeCommandWalker(Node *node) -{ - #if PG_VERSION_NUM < PG_VERSION_15 - return false; - #endif - - if (node == NULL) - { - return false; - } - - if (IsA(node, Query)) - { - Query *query = (Query *) node; - if (IsMergeQuery(query)) - { - return true; - } - - return query_tree_walker((Query *) node, ContainsMergeCommandWalker, NULL, 0); - } - - return expression_tree_walker(node, ContainsMergeCommandWalker, NULL); - - return false; -} - - /* * ExtractRangeTableEntryList is a wrapper around ExtractRangeTableEntryWalker. * The function traverses the input query and returns all the range table @@ -631,7 +598,7 @@ IsMultiTaskPlan(DistributedPlan *distributedPlan) * IsUpdateOrDelete returns true if the query performs an update or delete. */ bool -IsUpdateOrDelete(Query *query) +IsUpdateOrDeleteOrMerge(Query *query) { return query->commandType == CMD_UPDATE || query->commandType == CMD_DELETE || @@ -809,7 +776,7 @@ CreateDistributedPlannedStmt(DistributedPlanningContext *planContext) * if it is planned as a multi shard modify query. 
*/ if ((distributedPlan->planningError || - (IsUpdateOrDelete(planContext->originalQuery) && IsMultiTaskPlan( + (IsUpdateOrDeleteOrMerge(planContext->originalQuery) && IsMultiTaskPlan( distributedPlan))) && hasUnresolvedParams) { diff --git a/src/backend/distributed/planner/fast_path_router_planner.c b/src/backend/distributed/planner/fast_path_router_planner.c index e7d91a101..ecb62478a 100644 --- a/src/backend/distributed/planner/fast_path_router_planner.c +++ b/src/backend/distributed/planner/fast_path_router_planner.c @@ -56,6 +56,9 @@ bool EnableFastPathRouterPlanner = true; static bool ColumnAppearsMultipleTimes(Node *quals, Var *distributionKey); static bool DistKeyInSimpleOpExpression(Expr *clause, Var *distColumn, Node **distributionKeyValue); +static bool ConjunctionContainsColumnFilter(Node *node, + Var *column, + Node **distributionKeyValue); /* @@ -292,7 +295,7 @@ ColumnAppearsMultipleTimes(Node *quals, Var *distributionKey) * * If the conjuction contains column filter which is const, distributionKeyValue is set. */ -bool +static bool ConjunctionContainsColumnFilter(Node *node, Var *column, Node **distributionKeyValue) { if (node == NULL) diff --git a/src/backend/distributed/planner/merge_planner.c b/src/backend/distributed/planner/merge_planner.c new file mode 100644 index 000000000..03fd9e00d --- /dev/null +++ b/src/backend/distributed/planner/merge_planner.c @@ -0,0 +1,701 @@ +/*------------------------------------------------------------------------- + * + * merge_planner.c + * + * This file contains functions to help plan MERGE queries. + * + * Copyright (c) Citus Data, Inc. 
+ * + *------------------------------------------------------------------------- + */ + +#include <stddef.h> + +#include "postgres.h" +#include "nodes/makefuncs.h" +#include "optimizer/optimizer.h" +#include "parser/parsetree.h" +#include "utils/lsyscache.h" + +#include "distributed/citus_clauses.h" +#include "distributed/listutils.h" +#include "distributed/merge_planner.h" +#include "distributed/multi_logical_optimizer.h" +#include "distributed/multi_router_planner.h" +#include "distributed/pg_version_constants.h" +#include "distributed/query_pushdown_planning.h" + +#if PG_VERSION_NUM >= PG_VERSION_15 + +static DeferredErrorMessage * CheckIfRTETypeIsUnsupported(Query *parse, + RangeTblEntry *rangeTableEntry); +static DeferredErrorMessage * ErrorIfDistTablesNotColocated(Query *parse, + List * + distTablesList, + PlannerRestrictionContext + * + plannerRestrictionContext); +static DeferredErrorMessage * ErrorIfMergeHasUnsupportedTables(Query *parse, + List *rangeTableList, + PlannerRestrictionContext * + restrictionContext); +static bool IsDistributionColumnInMergeSource(Expr *columnExpression, Query *query, bool + skipOuterVars); +static DeferredErrorMessage * InsertDistributionColumnMatchesSource(Query *query, + RangeTblEntry * + resultRte); + +static DeferredErrorMessage * MergeQualAndTargetListFunctionsSupported(Oid + resultRelationId, + FromExpr *joinTree, + Node *quals, + List *targetList, + CmdType commandType); +#endif + + +/* + * MergeQuerySupported does check for a MERGE command in the query, if it finds + * one, it will verify the below criteria + * - Supported tables and combinations in ErrorIfMergeHasUnsupportedTables + * - Distributed tables requirements in ErrorIfDistTablesNotColocated + * - Checks target-lists and functions-in-quals in MergeQualAndTargetListFunctionsSupported + */ +DeferredErrorMessage * +MergeQuerySupported(Query *originalQuery, bool multiShardQuery, + PlannerRestrictionContext *plannerRestrictionContext) +{ + /* function is void for pre-15 versions of
Postgres */ + #if PG_VERSION_NUM < PG_VERSION_15 + + return NULL; + + #else + + /* For non-MERGE commands it's a no-op */ + if (!IsMergeQuery(originalQuery)) + { + return NULL; + } + + /* + * TODO: For now, we are adding an exception where any volatile or stable + * functions are not allowed in the MERGE query, but this will become too + * restrictive as this will prevent many useful and simple cases, such as, + * INSERT VALUES(ts::timestamp), bigserial column inserts etc. But without + * this restriction, we have a potential danger of some of the function(s) + * getting executed at the worker which will result in incorrect behavior. + */ + if (contain_mutable_functions((Node *) originalQuery)) + { + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "non-IMMUTABLE functions are not yet supported " + "in MERGE sql with distributed tables ", + NULL, NULL); + } + + List *rangeTableList = ExtractRangeTableEntryList(originalQuery); + RangeTblEntry *resultRte = ExtractResultRelationRTE(originalQuery); + + /* + * Fast path queries cannot have merge command, and we prevent the remaining here. + * In Citus we have limited support for MERGE, it's allowed only if all + * the tables (target, source or any CTE) are local i.e. a + * combination of Citus local and Non-Citus tables (regular Postgres tables) + * or distributed tables with some restrictions, please see header of routine + * ErrorIfDistTablesNotColocated for details.
+ */ + DeferredErrorMessage *deferredError = + ErrorIfMergeHasUnsupportedTables(originalQuery, + rangeTableList, + plannerRestrictionContext); + if (deferredError) + { + /* MERGE's unsupported combination, raise the exception */ + RaiseDeferredError(deferredError, ERROR); + } + + Oid resultRelationId = resultRte->relid; + deferredError = MergeQualAndTargetListFunctionsSupported(resultRelationId, + originalQuery->jointree, + originalQuery->jointree-> + quals, + originalQuery->targetList, + originalQuery->commandType); + if (deferredError) + { + return deferredError; + } + + /* + * MERGE is a special case where we have multiple modify statements + * within itself. Check each INSERT/UPDATE/DELETE individually. + */ + MergeAction *action = NULL; + foreach_ptr(action, originalQuery->mergeActionList) + { + Assert(originalQuery->returningList == NULL); + deferredError = MergeQualAndTargetListFunctionsSupported(resultRelationId, + originalQuery->jointree, + action->qual, + action->targetList, + action->commandType); + if (deferredError) + { + /* MERGE's unsupported scenario, raise the exception */ + RaiseDeferredError(deferredError, ERROR); + } + } + + deferredError = + InsertDistributionColumnMatchesSource(originalQuery, resultRte); + if (deferredError) + { + /* MERGE's unsupported scenario, raise the exception */ + RaiseDeferredError(deferredError, ERROR); + } + + if (multiShardQuery) + { + deferredError = + DeferErrorIfUnsupportedSubqueryPushdown(originalQuery, + plannerRestrictionContext); + if (deferredError) + { + return deferredError; + } + } + + if (HasDangerousJoinUsing(originalQuery->rtable, (Node *) originalQuery->jointree)) + { + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "a join with USING causes an internal naming " + "conflict, use ON instead", NULL, NULL); + } + + return NULL; + + #endif +} + + +/* + * IsMergeAllowedOnRelation takes a relation entry and checks if MERGE command is + * permitted on special relations, such as materialized view, 
returns true only if + * it's a "source" relation. + */ +bool +IsMergeAllowedOnRelation(Query *parse, RangeTblEntry *rte) +{ + if (!IsMergeQuery(parse)) + { + return false; + } + + /* Fetch the MERGE target relation */ + RangeTblEntry *targetRte = rt_fetch(parse->resultRelation, parse->rtable); + + /* Is it a target relation? */ + if (targetRte->relid == rte->relid) + { + return false; + } + + return true; +} + + +#if PG_VERSION_NUM >= PG_VERSION_15 + +/* + * ErrorIfDistTablesNotColocated Checks to see if + * + * - There are a minimum of two distributed tables (source and a target). + * - All the distributed tables are indeed colocated. + * + * If any of the conditions are not met, it returns a deferred error message. + */ +static DeferredErrorMessage * +ErrorIfDistTablesNotColocated(Query *parse, List *distTablesList, + PlannerRestrictionContext * + plannerRestrictionContext) +{ + /* All MERGE tables must be distributed */ + if (list_length(distTablesList) < 2) + { + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "For MERGE command, both the source and target " + "must be distributed", NULL, NULL); + } + + /* All distributed tables must be colocated */ + if (!AllRelationsInRTEListColocated(distTablesList)) + { + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "For MERGE command, all the distributed tables " + "must be colocated", NULL, NULL); + } + + return NULL; +} + + +/* + * CheckIfRTETypeIsUnsupported checks for types of tables that are not supported, such + * as, reference tables, append-distributed tables and materialized view as target relation. + * Routine returns NULL for the supported types, error message for everything else.
+ */ +static DeferredErrorMessage * +CheckIfRTETypeIsUnsupported(Query *parse, RangeTblEntry *rangeTableEntry) +{ + if (rangeTableEntry->relkind == RELKIND_MATVIEW || + rangeTableEntry->relkind == RELKIND_FOREIGN_TABLE) + { + /* Materialized view or Foreign table as target is not allowed */ + if (IsMergeAllowedOnRelation(parse, rangeTableEntry)) + { + /* Non target relation is ok */ + return NULL; + } + else + { + /* Usually we don't reach this exception as the Postgres parser catches it */ + StringInfo errorMessage = makeStringInfo(); + appendStringInfo(errorMessage, "MERGE command is not allowed on " + "relation type(relkind:%c)", + rangeTableEntry->relkind); + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + errorMessage->data, NULL, NULL); + } + } + + if (rangeTableEntry->relkind != RELKIND_RELATION && + rangeTableEntry->relkind != RELKIND_PARTITIONED_TABLE) + { + StringInfo errorMessage = makeStringInfo(); + appendStringInfo(errorMessage, "Unexpected table type(relkind:%c) " + "in MERGE command", rangeTableEntry->relkind); + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + errorMessage->data, NULL, NULL); + } + + Assert(rangeTableEntry->relid != 0); + + /* Reference tables are not supported yet */ + if (IsCitusTableType(rangeTableEntry->relid, REFERENCE_TABLE)) + { + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "MERGE command is not supported on reference " + "tables yet", NULL, NULL); + } + + /* Append/Range tables are not supported */ + if (IsCitusTableType(rangeTableEntry->relid, APPEND_DISTRIBUTED) || + IsCitusTableType(rangeTableEntry->relid, RANGE_DISTRIBUTED)) + { + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "For MERGE command, all the distributed tables " + "must be colocated, for append/range distribution, " + "colocation is not supported", NULL, + "Consider using hash distribution instead"); + } + + return NULL; +} + + +/* + * ErrorIfMergeHasUnsupportedTables checks if all the tables(target, source or any CTE + * 
present) in the MERGE command are local i.e. a combination of Citus local and Non-Citus + * tables (regular Postgres tables), or distributed tables with some restrictions, please + * see header of routine ErrorIfDistTablesNotColocated for details, raises an exception + * for all other combinations. + */ +static DeferredErrorMessage * +ErrorIfMergeHasUnsupportedTables(Query *parse, List *rangeTableList, + PlannerRestrictionContext *restrictionContext) +{ + List *distTablesList = NIL; + bool foundLocalTables = false; + + RangeTblEntry *rangeTableEntry = NULL; + foreach_ptr(rangeTableEntry, rangeTableList) + { + Oid relationId = rangeTableEntry->relid; + + switch (rangeTableEntry->rtekind) + { + case RTE_RELATION: + { + /* Check the relation type */ + break; + } + + case RTE_SUBQUERY: + case RTE_FUNCTION: + case RTE_TABLEFUNC: + case RTE_VALUES: + case RTE_JOIN: + case RTE_CTE: + { + /* Skip them as base table(s) will be checked */ + continue; + } + + /* + * RTE_NAMEDTUPLESTORE is typically used in ephemeral named relations, + * such as, trigger data; until we find a genuine use case, raise an + * exception. + * RTE_RESULT is a node added by the planner and we shouldn't + * encounter it in the parse tree.
+ */ + case RTE_NAMEDTUPLESTORE: + case RTE_RESULT: + { + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "MERGE command is not supported with " + "Tuplestores and results", + NULL, NULL); + } + + default: + { + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "MERGE command: Unrecognized range table entry.", + NULL, NULL); + } + } + + /* RTE Relation can be of various types, check them now */ + + /* skip the regular views as they are replaced with subqueries */ + if (rangeTableEntry->relkind == RELKIND_VIEW) + { + continue; + } + + DeferredErrorMessage *errorMessage = + CheckIfRTETypeIsUnsupported(parse, rangeTableEntry); + if (errorMessage) + { + return errorMessage; + } + + /* + * For now, save all distributed tables, later (below) we will + * check for supported combination(s). + */ + if (IsCitusTableType(relationId, DISTRIBUTED_TABLE)) + { + distTablesList = lappend(distTablesList, rangeTableEntry); + continue; + } + + /* Regular Postgres tables and Citus local tables are allowed */ + if (!IsCitusTable(relationId) || + IsCitusTableType(relationId, CITUS_LOCAL_TABLE)) + { + foundLocalTables = true; + continue; + } + + /* Any other Citus table type missing ? */ + } + + /* Ensure all tables are indeed local */ + if (foundLocalTables && list_length(distTablesList) == 0) + { + /* All the tables are local, supported */ + return NULL; + } + else if (foundLocalTables && list_length(distTablesList) > 0) + { + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "MERGE command is not supported with " + "combination of distributed/local tables yet", + NULL, NULL); + } + + /* Ensure all distributed tables are indeed co-located */ + return ErrorIfDistTablesNotColocated(parse, + distTablesList, + restrictionContext); +} + + +/* + * IsDistributionColumnInMergeSource returns true if the given column is a partition column. + * The function uses FindReferencedTableColumn to find the original relation + * id and column that the column expression refers to.
It then checks whether + * that column is a partition column of the relation. + * + * Also, the function always returns false for reference tables given that + * reference tables do not have partition column. + * + * If skipOuterVars is true, then it doesn't process the outervars. + */ +static bool +IsDistributionColumnInMergeSource(Expr *columnExpression, Query *query, bool + skipOuterVars) +{ + bool isDistributionColumn = false; + Var *column = NULL; + RangeTblEntry *relationRTE = NULL; + + /* ParentQueryList is same as the original query for MERGE */ + FindReferencedTableColumn(columnExpression, list_make1(query), query, &column, + &relationRTE, + skipOuterVars); + Oid relationId = relationRTE ? relationRTE->relid : InvalidOid; + if (relationId != InvalidOid && column != NULL) + { + Var *distributionColumn = DistPartitionKey(relationId); + + /* not all distributed tables have partition column */ + if (distributionColumn != NULL && column->varattno == + distributionColumn->varattno) + { + isDistributionColumn = true; + } + } + + return isDistributionColumn; +} + + +/* + * InsertDistributionColumnMatchesSource checks to see if MERGE is inserting a + * value into the target which is not from the source table, if so, it + * raises an exception. + * Note: Inserting random values other than the joined column values will + * result in unexpected behaviour of rows ending up in incorrect shards, to + * prevent such mishaps, we disallow such inserts here.
+ */ +static DeferredErrorMessage * +InsertDistributionColumnMatchesSource(Query *query, RangeTblEntry *resultRte) +{ + Assert(IsMergeQuery(query)); + + if (!IsCitusTableType(resultRte->relid, DISTRIBUTED_TABLE)) + { + return NULL; + } + + MergeAction *action = NULL; + foreach_ptr(action, query->mergeActionList) + { + /* Skip MATCHED clause as INSERTS are not allowed in it */ + if (action->matched) + { + continue; + } + + /* DO NOTHING inserts no row; later NOT MATCHED actions still need checking */ + if (action->commandType == CMD_NOTHING) + { + continue; + } + + if (action->targetList == NIL) + { + /* INSERT DEFAULT VALUES is not allowed */ + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "cannot perform MERGE INSERT with DEFAULTS", + NULL, NULL); + } + + Assert(action->commandType == CMD_INSERT); + Var *targetKey = PartitionColumn(resultRte->relid, 1); + /* tracked per action: a valid INSERT must not end the checks for later actions */ + bool foundDistributionColumn = false; + TargetEntry *targetEntry = NULL; + foreach_ptr(targetEntry, action->targetList) + { + AttrNumber originalAttrNo = targetEntry->resno; + + /* skip processing of target table non-partition columns */ + if (originalAttrNo != targetKey->varattno) + { + continue; + } + + foundDistributionColumn = true; + + if (IsA(targetEntry->expr, Var)) + { + if (IsDistributionColumnInMergeSource(targetEntry->expr, query, true)) + { + continue; + } + else + { + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "MERGE INSERT must use the source table " + "distribution column value", + NULL, NULL); + } + } + else + { + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "MERGE INSERT must refer a source column " + "for distribution column ", + NULL, NULL); + } + } + + if (!foundDistributionColumn) + { + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "MERGE INSERT must have distribution column as value", + NULL, NULL); + } + } + + return NULL; +} + + +/* + * MergeQualAndTargetListFunctionsSupported Checks WHEN/ON clause actions to see what functions + * are allowed, if we are updating
distribution column, etc. + */ +static DeferredErrorMessage * +MergeQualAndTargetListFunctionsSupported(Oid resultRelationId, FromExpr *joinTree, + Node *quals, + List *targetList, CmdType commandType) +{ + uint32 rangeTableId = 1; + Var *distributionColumn = NULL; + if (IsCitusTable(resultRelationId) && HasDistributionKey(resultRelationId)) + { + distributionColumn = PartitionColumn(resultRelationId, rangeTableId); + } + + ListCell *targetEntryCell = NULL; + bool hasVarArgument = false; /* A STABLE function is passed a Var argument */ + bool hasBadCoalesce = false; /* CASE/COALESCE passed a mutable function */ + foreach(targetEntryCell, targetList) + { + TargetEntry *targetEntry = (TargetEntry *) lfirst(targetEntryCell); + + /* skip resjunk entries: UPDATE adds some for ctid, etc. */ + if (targetEntry->resjunk) + { + continue; + } + + bool targetEntryDistributionColumn = false; + AttrNumber targetColumnAttrNumber = InvalidAttrNumber; + + if (distributionColumn) + { + if (commandType == CMD_UPDATE) + { + /* + * Note that it is not possible to give an alias to + * UPDATE table SET ... 
+ */ + if (targetEntry->resname) + { + targetColumnAttrNumber = get_attnum(resultRelationId, + targetEntry->resname); + if (targetColumnAttrNumber == distributionColumn->varattno) + { + targetEntryDistributionColumn = true; + } + } + } + } + + if (targetEntryDistributionColumn && + TargetEntryChangesValue(targetEntry, distributionColumn, joinTree)) + { + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "updating the distribution column is not " + "allowed in MERGE actions", + NULL, NULL); + } + + if (FindNodeMatchingCheckFunction((Node *) targetEntry->expr, + CitusIsVolatileFunction)) + { + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "functions used in MERGE actions on distributed " + "tables must not be VOLATILE", + NULL, NULL); + } + + if (MasterIrreducibleExpression((Node *) targetEntry->expr, + &hasVarArgument, &hasBadCoalesce)) + { + Assert(hasVarArgument || hasBadCoalesce); + } + + if (FindNodeMatchingCheckFunction((Node *) targetEntry->expr, + NodeIsFieldStore)) + { + /* DELETE cannot do field indirection already */ + Assert(commandType == CMD_UPDATE || commandType == CMD_INSERT); + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "inserting or modifying composite type fields is not " + "supported", NULL, + "Use the column name to insert or update the composite " + "type as a single value"); + } + } + + + /* + * Check the condition, convert list of expressions into expression tree for further processing + */ + if (quals) + { + if (IsA(quals, List)) + { + quals = (Node *) make_ands_explicit((List *) quals); + } + + if (FindNodeMatchingCheckFunction((Node *) quals, CitusIsVolatileFunction)) + { + StringInfo errorMessage = makeStringInfo(); + appendStringInfo(errorMessage, "functions used in the %s clause of MERGE " + "queries on distributed tables must not be VOLATILE", + (commandType == CMD_MERGE) ? 
"ON" : "WHEN"); + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + errorMessage->data, NULL, NULL); + } + else if (MasterIrreducibleExpression(quals, &hasVarArgument, &hasBadCoalesce)) + { + Assert(hasVarArgument || hasBadCoalesce); + } + } + + if (hasVarArgument) + { + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "STABLE functions used in MERGE queries " + "cannot be called with column references", + NULL, NULL); + } + + if (hasBadCoalesce) + { + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "non-IMMUTABLE functions are not allowed in CASE or " + "COALESCE statements", + NULL, NULL); + } + + if (quals != NULL && nodeTag(quals) == T_CurrentOfExpr) + { + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "cannot run MERGE actions with cursors", + NULL, NULL); + } + + return NULL; +} + + +#endif diff --git a/src/backend/distributed/planner/multi_physical_planner.c b/src/backend/distributed/planner/multi_physical_planner.c index b30dddeb7..be6caf0e2 100644 --- a/src/backend/distributed/planner/multi_physical_planner.c +++ b/src/backend/distributed/planner/multi_physical_planner.c @@ -2225,17 +2225,14 @@ QueryPushdownSqlTaskList(Query *query, uint64 jobId, } /* - * For left joins we don't care about the shards pruned for - * the right hand side. If the right hand side would prune - * to a smaller set we should still send it to all tables - * of the left hand side. However if the right hand side is - * bigger than the left hand side we don't have to send the - * query to any shard that is not matching anything on the - * left hand side. + * For left joins we don't care about the shards pruned for the right hand side. + * If the right hand side would prune to a smaller set we should still send it to + * all tables of the left hand side. However if the right hand side is bigger than + * the left hand side we don't have to send the query to any shard that is not + * matching anything on the left hand side. 
* - * Instead we will simply skip any RelationRestriction if it - * is an OUTER join and the table is part of the non-outer - * side of the join. + * Instead we will simply skip any RelationRestriction if it is an OUTER join and + * the table is part of the non-outer side of the join. */ if (IsInnerTableOfOuterJoin(relationRestriction)) { diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c index 99beff2c8..407aeaf65 100644 --- a/src/backend/distributed/planner/multi_router_planner.c +++ b/src/backend/distributed/planner/multi_router_planner.c @@ -33,6 +33,7 @@ #include "distributed/intermediate_result_pruning.h" #include "distributed/metadata_utility.h" #include "distributed/coordinator_protocol.h" +#include "distributed/merge_planner.h" #include "distributed/metadata_cache.h" #include "distributed/multi_executor.h" #include "distributed/multi_join_order.h" @@ -125,21 +126,15 @@ static bool IsTidColumn(Node *node); static DeferredErrorMessage * ModifyPartialQuerySupported(Query *queryTree, bool multiShardQuery, Oid *distributedTableId); -static bool NodeIsFieldStore(Node *node); -static DeferredErrorMessage * MultiShardUpdateDeleteMergeSupported(Query *originalQuery, - PlannerRestrictionContext - * - plannerRestrictionContext); +static DeferredErrorMessage * MultiShardUpdateDeleteSupported(Query *originalQuery, + PlannerRestrictionContext + * + plannerRestrictionContext); static DeferredErrorMessage * SingleShardUpdateDeleteSupported(Query *originalQuery, PlannerRestrictionContext * plannerRestrictionContext); -static bool HasDangerousJoinUsing(List *rtableList, Node *jtnode); -static bool MasterIrreducibleExpression(Node *expression, bool *varArgument, - bool *badCoalesce); static bool MasterIrreducibleExpressionWalker(Node *expression, WalkerState *state); static bool MasterIrreducibleExpressionFunctionChecker(Oid func_id, void *context); -static bool TargetEntryChangesValue(TargetEntry 
*targetEntry, Var *column, - FromExpr *joinTree); static Job * RouterInsertJob(Query *originalQuery); static void ErrorIfNoShardsExist(CitusTableCacheEntry *cacheEntry); static DeferredErrorMessage * DeferErrorIfModifyView(Query *queryTree); @@ -179,12 +174,8 @@ static void ReorderTaskPlacementsByTaskAssignmentPolicy(Job *job, static bool ModifiesLocalTableWithRemoteCitusLocalTable(List *rangeTableList); static DeferredErrorMessage * DeferErrorIfUnsupportedLocalTableJoin(List *rangeTableList); static bool IsLocallyAccessibleCitusLocalTable(Oid relationId); -static DeferredErrorMessage * TargetlistAndFunctionsSupported(Oid resultRelationId, - FromExpr *joinTree, - Node *quals, - List *targetList, - CmdType commandType, - List *returningList); + + /* * CreateRouterPlan attempts to create a router executor plan for the given * SELECT statement. ->planningError is set if planning fails. @@ -521,7 +512,7 @@ IsTidColumn(Node *node) * updating distribution column, etc. * Note: This subset of checks are repeated for each MERGE modify action. */ -static DeferredErrorMessage * +DeferredErrorMessage * TargetlistAndFunctionsSupported(Oid resultRelationId, FromExpr *joinTree, Node *quals, List *targetList, CmdType commandType, List *returningList) @@ -897,92 +888,13 @@ IsLocallyAccessibleCitusLocalTable(Oid relationId) /* * NodeIsFieldStore returns true if given Node is a FieldStore object. 
*/ -static bool +bool NodeIsFieldStore(Node *node) { return node && IsA(node, FieldStore); } -/* - * MergeQuerySupported does check for a MERGE command in the query, if it finds - * one, it will verify the below criteria - * - Supported tables and combinations in ErrorIfMergeHasUnsupportedTables - * - Distributed tables requirements in ErrorIfDistTablesNotColocated - * - Checks target-lists and functions-in-quals in TargetlistAndFunctionsSupported - */ -static DeferredErrorMessage * -MergeQuerySupported(Query *originalQuery, - PlannerRestrictionContext *plannerRestrictionContext) -{ - /* For non-MERGE commands it's a no-op */ - if (!QueryHasMergeCommand(originalQuery)) - { - return NULL; - } - - List *rangeTableList = ExtractRangeTableEntryList(originalQuery); - RangeTblEntry *resultRte = ExtractResultRelationRTE(originalQuery); - - /* - * Fast path queries cannot have merge command, and we prevent the remaining here. - * In Citus we have limited support for MERGE, it's allowed only if all - * the tables(target, source or any CTE) tables are are local i.e. a - * combination of Citus local and Non-Citus tables (regular Postgres tables) - * or distributed tables with some restrictions, please see header of routine - * ErrorIfDistTablesNotColocated for details. - */ - DeferredErrorMessage *deferredError = - ErrorIfMergeHasUnsupportedTables(originalQuery, - rangeTableList, - plannerRestrictionContext); - if (deferredError) - { - return deferredError; - } - - Oid resultRelationId = resultRte->relid; - deferredError = - TargetlistAndFunctionsSupported(resultRelationId, - originalQuery->jointree, - originalQuery->jointree->quals, - originalQuery->targetList, - originalQuery->commandType, - originalQuery->returningList); - if (deferredError) - { - return deferredError; - } - - #if PG_VERSION_NUM >= PG_VERSION_15 - - /* - * MERGE is a special case where we have multiple modify statements - * within itself. Check each INSERT/UPDATE/DELETE individually. 
- */ - MergeAction *action = NULL; - foreach_ptr(action, originalQuery->mergeActionList) - { - Assert(originalQuery->returningList == NULL); - deferredError = - TargetlistAndFunctionsSupported(resultRelationId, - originalQuery->jointree, - action->qual, - action->targetList, - action->commandType, - originalQuery->returningList); - if (deferredError) - { - return deferredError; - } - } - - #endif - - return NULL; -} - - /* * ModifyQuerySupported returns NULL if the query only contains supported * features, otherwise it returns an error description. @@ -998,14 +910,11 @@ ModifyQuerySupported(Query *queryTree, Query *originalQuery, bool multiShardQuer PlannerRestrictionContext *plannerRestrictionContext) { Oid distributedTableId = InvalidOid; - DeferredErrorMessage *error = MergeQuerySupported(originalQuery, + DeferredErrorMessage *error = MergeQuerySupported(originalQuery, multiShardQuery, plannerRestrictionContext); if (error) { - /* - * For MERGE, we do not do recursive plannning, simply bail out. - */ - RaiseDeferredError(error, ERROR); + return error; } error = ModifyPartialQuerySupported(queryTree, multiShardQuery, &distributedTableId); @@ -1178,13 +1087,13 @@ ModifyQuerySupported(Query *queryTree, Query *originalQuery, bool multiShardQuer } } - if (commandType != CMD_INSERT) + if (commandType != CMD_INSERT && commandType != CMD_MERGE) { DeferredErrorMessage *errorMessage = NULL; if (multiShardQuery) { - errorMessage = MultiShardUpdateDeleteMergeSupported( + errorMessage = MultiShardUpdateDeleteSupported( originalQuery, plannerRestrictionContext); } @@ -1365,12 +1274,12 @@ ErrorIfOnConflictNotSupported(Query *queryTree) /* - * MultiShardUpdateDeleteMergeSupported returns the error message if the update/delete is + * MultiShardUpdateDeleteSupported returns the error message if the update/delete is * not pushdownable, otherwise it returns NULL. 
*/ static DeferredErrorMessage * -MultiShardUpdateDeleteMergeSupported(Query *originalQuery, - PlannerRestrictionContext *plannerRestrictionContext) +MultiShardUpdateDeleteSupported(Query *originalQuery, + PlannerRestrictionContext *plannerRestrictionContext) { DeferredErrorMessage *errorMessage = NULL; RangeTblEntry *resultRangeTable = ExtractResultRelationRTE(originalQuery); @@ -1401,8 +1310,9 @@ MultiShardUpdateDeleteMergeSupported(Query *originalQuery, } else { - errorMessage = DeferErrorIfUnsupportedSubqueryPushdown(originalQuery, - plannerRestrictionContext); + errorMessage = DeferErrorIfUnsupportedSubqueryPushdown( + originalQuery, + plannerRestrictionContext); } return errorMessage; @@ -1442,7 +1352,7 @@ SingleShardUpdateDeleteSupported(Query *originalQuery, * HasDangerousJoinUsing search jointree for unnamed JOIN USING. Check the * implementation of has_dangerous_join_using in ruleutils. */ -static bool +bool HasDangerousJoinUsing(List *rtableList, Node *joinTreeNode) { if (IsA(joinTreeNode, RangeTblRef)) @@ -1546,7 +1456,7 @@ IsMergeQuery(Query *query) * which do, but for now we just error out. That makes both the code and user-education * easier. */ -static bool +bool MasterIrreducibleExpression(Node *expression, bool *varArgument, bool *badCoalesce) { WalkerState data; @@ -1694,7 +1604,7 @@ MasterIrreducibleExpressionFunctionChecker(Oid func_id, void *context) * expression is a value that is implied by the qualifiers of the join * tree, or the target entry sets a different column. 
*/ -static bool +bool TargetEntryChangesValue(TargetEntry *targetEntry, Var *column, FromExpr *joinTree) { bool isColumnValueChanged = true; @@ -1965,8 +1875,8 @@ RouterJob(Query *originalQuery, PlannerRestrictionContext *plannerRestrictionCon if (*planningError) { /* - * For MERGE, we do _not_ plan anything other than Router job, let's - * not continue further down the lane in distributed planning, simply + * For MERGE, we do _not_ plan any other router job than the MERGE job itself, + * let's not continue further down the lane in distributed planning, simply * bail out. */ if (IsMergeQuery(originalQuery)) @@ -4056,263 +3966,3 @@ CompareInsertValuesByShardId(const void *leftElement, const void *rightElement) } } } - - -/* - * IsMergeAllowedOnRelation takes a relation entry and checks if MERGE command is - * permitted on special relations, such as materialized view, returns true only if - * it's a "source" relation. - */ -bool -IsMergeAllowedOnRelation(Query *parse, RangeTblEntry *rte) -{ - if (!IsMergeQuery(parse)) - { - return false; - } - - RangeTblEntry *targetRte = rt_fetch(parse->resultRelation, parse->rtable); - - /* Is it a target relation? */ - if (targetRte->relid == rte->relid) - { - return false; - } - - return true; -} - - -/* - * ErrorIfDistTablesNotColocated Checks to see if - * - * - There are a minimum of two distributed tables (source and a target). - * - All the distributed tables are indeed colocated. - * - MERGE relations are joined on the distribution column - * MERGE .. USING .. ON target.dist_key = source.dist_key - * - * If any of the conditions are not met, it raises an exception. 
- */ -static DeferredErrorMessage * -ErrorIfDistTablesNotColocated(Query *parse, List *distTablesList, - PlannerRestrictionContext *plannerRestrictionContext) -{ - /* All MERGE tables must be distributed */ - if (list_length(distTablesList) < 2) - { - return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, - "For MERGE command, both the source and target " - "must be distributed", NULL, NULL); - } - - /* All distributed tables must be colocated */ - if (!AllRelationsInListColocated(distTablesList, RANGETABLE_ENTRY)) - { - return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, - "For MERGE command, all the distributed tables " - "must be colocated", NULL, NULL); - } - - /* Are source and target tables joined on distribution column? */ - if (!RestrictionEquivalenceForPartitionKeys(plannerRestrictionContext)) - { - return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, - "MERGE command is only supported when distributed " - "tables are joined on their distribution column", - NULL, NULL); - } - - return NULL; -} - - -/* - * ErrorIfMergeHasUnsupportedTables checks if all the tables(target, source or any CTE - * present) in the MERGE command are local i.e. a combination of Citus local and Non-Citus - * tables (regular Postgres tables), or distributed tables with some restrictions, please - * see header of routine ErrorIfDistTablesNotColocated for details, raises an exception - * for all other combinations. 
- */ -static DeferredErrorMessage * -ErrorIfMergeHasUnsupportedTables(Query *parse, List *rangeTableList, - PlannerRestrictionContext *restrictionContext) -{ - List *distTablesList = NIL; - bool foundLocalTables = false; - - RangeTblEntry *rangeTableEntry = NULL; - foreach_ptr(rangeTableEntry, rangeTableList) - { - Oid relationId = rangeTableEntry->relid; - - switch (rangeTableEntry->rtekind) - { - case RTE_RELATION: - { - /* Check the relation type */ - break; - } - - case RTE_SUBQUERY: - case RTE_FUNCTION: - case RTE_TABLEFUNC: - case RTE_VALUES: - case RTE_JOIN: - case RTE_CTE: - { - /* Skip them as base table(s) will be checked */ - continue; - } - - /* - * RTE_NAMEDTUPLESTORE is typically used in ephmeral named relations, - * such as, trigger data; until we find a genuine use case, raise an - * exception. - * RTE_RESULT is a node added by the planner and we shouldn't - * encounter it in the parse tree. - */ - case RTE_NAMEDTUPLESTORE: - case RTE_RESULT: - { - return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, - "MERGE command is not supported with " - "Tuplestores and results", - NULL, NULL); - } - - default: - { - return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, - "MERGE command: Unrecognized range table entry.", - NULL, NULL); - } - } - - /* RTE Relation can be of various types, check them now */ - - /* skip the regular views as they are replaced with subqueries */ - if (rangeTableEntry->relkind == RELKIND_VIEW) - { - continue; - } - - if (rangeTableEntry->relkind == RELKIND_MATVIEW || - rangeTableEntry->relkind == RELKIND_FOREIGN_TABLE) - { - /* Materialized view or Foreign table as target is not allowed */ - if (IsMergeAllowedOnRelation(parse, rangeTableEntry)) - { - /* Non target relation is ok */ - continue; - } - else - { - /* Usually we don't reach this exception as the Postgres parser catches it */ - StringInfo errorMessage = makeStringInfo(); - appendStringInfo(errorMessage, - "MERGE command is not allowed on " - "relation type(relkind:%c)", 
rangeTableEntry->relkind); - return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, errorMessage->data, - NULL, NULL); - } - } - - if (rangeTableEntry->relkind != RELKIND_RELATION && - rangeTableEntry->relkind != RELKIND_PARTITIONED_TABLE) - { - StringInfo errorMessage = makeStringInfo(); - appendStringInfo(errorMessage, "Unexpected table type(relkind:%c) " - "in MERGE command", rangeTableEntry->relkind); - return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, errorMessage->data, - NULL, NULL); - } - - Assert(rangeTableEntry->relid != 0); - - /* Reference tables are not supported yet */ - if (IsCitusTableType(relationId, REFERENCE_TABLE)) - { - return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, - "MERGE command is not supported on reference " - "tables yet", NULL, NULL); - } - - /* Append/Range tables are not supported */ - if (IsCitusTableType(relationId, APPEND_DISTRIBUTED) || - IsCitusTableType(relationId, RANGE_DISTRIBUTED)) - { - return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, - "For MERGE command, all the distributed tables " - "must be colocated, for append/range distribution, " - "colocation is not supported", NULL, - "Consider using hash distribution instead"); - } - - /* - * For now, save all distributed tables, later (below) we will - * check for supported combination(s). - */ - if (IsCitusTableType(relationId, DISTRIBUTED_TABLE)) - { - distTablesList = lappend(distTablesList, rangeTableEntry); - continue; - } - - /* Regular Postgres tables and Citus local tables are allowed */ - if (!IsCitusTable(relationId) || - IsCitusTableType(relationId, CITUS_LOCAL_TABLE)) - { - foundLocalTables = true; - continue; - } - - /* Any other Citus table type missing ? 
*/ - } - - /* Ensure all tables are indeed local */ - if (foundLocalTables && list_length(distTablesList) == 0) - { - /* All the tables are local, supported */ - return NULL; - } - else if (foundLocalTables && list_length(distTablesList) > 0) - { - return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, - "MERGE command is not supported with " - "combination of distributed/local tables yet", - NULL, NULL); - } - - /* Ensure all distributed tables are indeed co-located */ - return ErrorIfDistTablesNotColocated(parse, distTablesList, restrictionContext); -} - - -/* - * QueryHasMergeCommand walks over the query tree and returns false if there - * is no Merge command (e.g., CMD_MERGE), true otherwise. - */ -static bool -QueryHasMergeCommand(Query *queryTree) -{ - /* function is void for pre-15 versions of Postgres */ - #if PG_VERSION_NUM < PG_VERSION_15 - return false; - #else - - /* - * Postgres currently doesn't support Merge queries inside subqueries and - * ctes, but lets be defensive and do query tree walk anyway. - * - * We do not call this path for fast-path queries to avoid this additional - * overhead. - */ - if (!ContainsMergeCommandWalker((Node *) queryTree)) - { - /* No MERGE found */ - return false; - } - - return true; - #endif -} diff --git a/src/backend/distributed/planner/query_pushdown_planning.c b/src/backend/distributed/planner/query_pushdown_planning.c index 5cae19497..cbe6a3606 100644 --- a/src/backend/distributed/planner/query_pushdown_planning.c +++ b/src/backend/distributed/planner/query_pushdown_planning.c @@ -591,10 +591,16 @@ DeferErrorIfUnsupportedSubqueryPushdown(Query *originalQuery, } else if (!RestrictionEquivalenceForPartitionKeys(plannerRestrictionContext)) { + StringInfo errorMessage = makeStringInfo(); + bool isMergeCmd = IsMergeQuery(originalQuery); + appendStringInfo(errorMessage, + "%s" + "only supported when all distributed tables are " + "co-located and joined on their distribution columns", + isMergeCmd ? 
"MERGE command is " : "complex joins are "); + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, - "complex joins are only supported when all distributed tables are " - "co-located and joined on their distribution columns", - NULL, NULL); + errorMessage->data, NULL, NULL); } /* we shouldn't allow reference tables in the FROM clause when the query has sublinks */ diff --git a/src/backend/distributed/planner/relation_restriction_equivalence.c b/src/backend/distributed/planner/relation_restriction_equivalence.c index f76a95d26..5c91ee79c 100644 --- a/src/backend/distributed/planner/relation_restriction_equivalence.c +++ b/src/backend/distributed/planner/relation_restriction_equivalence.c @@ -151,6 +151,9 @@ static void ListConcatUniqueAttributeClassMemberLists(AttributeEquivalenceClass secondClass); static Var * PartitionKeyForRTEIdentityInQuery(Query *query, int targetRTEIndex, Index *partitionKeyIndex); +static bool AllRelationsInRestrictionContextColocated(RelationRestrictionContext * + restrictionContext); +static bool AllRelationsInListColocated(List *relationList); static bool IsNotSafeRestrictionToRecursivelyPlan(Node *node); static JoinRestrictionContext * FilterJoinRestrictionContext( JoinRestrictionContext *joinRestrictionContext, Relids @@ -381,8 +384,7 @@ SafeToPushdownUnionSubquery(Query *originalQuery, return false; } - if (!AllRelationsInListColocated(restrictionContext->relationRestrictionList, - RESTRICTION_CONTEXT)) + if (!AllRelationsInRestrictionContextColocated(restrictionContext)) { /* distribution columns are equal, but tables are not co-located */ return false; @@ -1918,34 +1920,56 @@ FindQueryContainingRTEIdentityInternal(Node *node, /* - * AllRelationsInListColocated determines whether all of the relations in the - * given list are co-located. 
- * Note: The list can be of dofferent types, which is specified by ListEntryType + * AllRelationsInRestrictionContextColocated determines whether all of the relations in the + * given relation restrictions list are co-located. */ -bool -AllRelationsInListColocated(List *relationList, ListEntryType entryType) +static bool +AllRelationsInRestrictionContextColocated(RelationRestrictionContext *restrictionContext) { - void *varPtr = NULL; - RangeTblEntry *rangeTableEntry = NULL; RelationRestriction *relationRestriction = NULL; - int initialColocationId = INVALID_COLOCATION_ID; + List *relationIdList = NIL; /* check whether all relations exists in the main restriction list */ - foreach_ptr(varPtr, relationList) + foreach_ptr(relationRestriction, restrictionContext->relationRestrictionList) { - Oid relationId = InvalidOid; + relationIdList = lappend_oid(relationIdList, relationRestriction->relationId); + } - if (entryType == RANGETABLE_ENTRY) - { - rangeTableEntry = (RangeTblEntry *) varPtr; - relationId = rangeTableEntry->relid; - } - else if (entryType == RESTRICTION_CONTEXT) - { - relationRestriction = (RelationRestriction *) varPtr; - relationId = relationRestriction->relationId; - } + return AllRelationsInListColocated(relationIdList); +} + +/* + * AllRelationsInRTEListColocated determines whether all of the relations in the + * given RangeTableEntry list are co-located. + */ +bool +AllRelationsInRTEListColocated(List *rangeTableEntryList) +{ + RangeTblEntry *rangeTableEntry = NULL; + List *relationIdList = NIL; + + foreach_ptr(rangeTableEntry, rangeTableEntryList) + { + relationIdList = lappend_oid(relationIdList, rangeTableEntry->relid); + } + + return AllRelationsInListColocated(relationIdList); +} + + +/* + * AllRelationsInListColocated determines whether all of the relations in the + * given list are co-located. 
+ */ +static bool +AllRelationsInListColocated(List *relationList) +{ + int initialColocationId = INVALID_COLOCATION_ID; + Oid relationId = InvalidOid; + + foreach_oid(relationId, relationList) + { if (IsCitusTable(relationId) && !HasDistributionKey(relationId)) { continue; diff --git a/src/include/distributed/distributed_planner.h b/src/include/distributed/distributed_planner.h index 19bd9f0c2..412859449 100644 --- a/src/include/distributed/distributed_planner.h +++ b/src/include/distributed/distributed_planner.h @@ -255,10 +255,4 @@ extern struct DistributedPlan * CreateDistributedPlan(uint64 planId, PlannerRestrictionContext * plannerRestrictionContext); -extern bool IsMergeAllowedOnRelation(Query *parse, RangeTblEntry *rte); -extern bool ConjunctionContainsColumnFilter(Node *node, - Var *column, - Node **distributionKeyValue); -extern bool ContainsMergeCommandWalker(Node *node); - #endif /* DISTRIBUTED_PLANNER_H */ diff --git a/src/include/distributed/merge_planner.h b/src/include/distributed/merge_planner.h new file mode 100644 index 000000000..243be14d0 --- /dev/null +++ b/src/include/distributed/merge_planner.h @@ -0,0 +1,26 @@ +/*------------------------------------------------------------------------- + * + * merge_planner.h + * + * Declarations for public functions and types related to MERGE planning. + * + * Copyright (c) Citus Data, Inc.
+ * + *------------------------------------------------------------------------- + */ + +#ifndef MERGE_PLANNER_H +#define MERGE_PLANNER_H + +#include "c.h" + +#include "nodes/parsenodes.h" +#include "distributed/distributed_planner.h" +#include "distributed/errormessage.h" + +extern bool IsMergeAllowedOnRelation(Query *parse, RangeTblEntry *rte); +extern DeferredErrorMessage * MergeQuerySupported(Query *originalQuery, + bool multiShardQuery, + PlannerRestrictionContext * + plannerRestrictionContext); +#endif /* MERGE_PLANNER_H */ diff --git a/src/include/distributed/multi_router_planner.h b/src/include/distributed/multi_router_planner.h index 07d160865..698a0fd60 100644 --- a/src/include/distributed/multi_router_planner.h +++ b/src/include/distributed/multi_router_planner.h @@ -100,6 +100,17 @@ extern PlannedStmt * FastPathPlanner(Query *originalQuery, Query *parse, ParamLi extern bool FastPathRouterQuery(Query *query, Node **distributionKeyValue); extern bool JoinConditionIsOnFalse(List *relOptInfo); extern Oid ResultRelationOidForQuery(Query *query); - +extern DeferredErrorMessage * TargetlistAndFunctionsSupported(Oid resultRelationId, + FromExpr *joinTree, + Node *quals, + List *targetList, + CmdType commandType, + List *returningList); +extern bool NodeIsFieldStore(Node *node); +extern bool TargetEntryChangesValue(TargetEntry *targetEntry, Var *column, + FromExpr *joinTree); +extern bool MasterIrreducibleExpression(Node *expression, bool *varArgument, + bool *badCoalesce); +extern bool HasDangerousJoinUsing(List *rtableList, Node *jtnode); #endif /* MULTI_ROUTER_PLANNER_H */ diff --git a/src/include/distributed/relation_restriction_equivalence.h b/src/include/distributed/relation_restriction_equivalence.h index 4fd9c7015..e0e716c7e 100644 --- a/src/include/distributed/relation_restriction_equivalence.h +++ b/src/include/distributed/relation_restriction_equivalence.h @@ -17,15 +17,6 @@ #define SINGLE_RTE_INDEX 1 -/* - * Represents the pointer type that's being 
passed in the list. - */ -typedef enum ListEntryType -{ - RANGETABLE_ENTRY, /* RangeTblEntry */ - RESTRICTION_CONTEXT /* RelationRestriction */ -} ListEntryType; - extern bool AllDistributionKeysInQueryAreEqual(Query *originalQuery, PlannerRestrictionContext * plannerRestrictionContext); @@ -63,6 +54,5 @@ extern RelationRestrictionContext * FilterRelationRestrictionContext( RelationRestrictionContext *relationRestrictionContext, Relids queryRteIdentities); -extern bool AllRelationsInListColocated(List *relationList, ListEntryType entryType); - +extern bool AllRelationsInRTEListColocated(List *rangeTableEntryList); #endif /* RELATION_RESTRICTION_EQUIVALENCE_H */ diff --git a/src/test/regress/create_schedule b/src/test/regress/create_schedule index 82dfa2475..db2ae92be 100644 --- a/src/test/regress/create_schedule +++ b/src/test/regress/create_schedule @@ -13,3 +13,4 @@ test: arbitrary_configs_truncate_create test: arbitrary_configs_truncate_cascade_create test: arbitrary_configs_truncate_partition_create test: arbitrary_configs_alter_table_add_constraint_without_name_create +test: merge_arbitrary_create diff --git a/src/test/regress/expected/merge.out b/src/test/regress/expected/merge.out index 02671acd0..e2b3aea65 100644 --- a/src/test/regress/expected/merge.out +++ b/src/test/regress/expected/merge.out @@ -17,8 +17,9 @@ CREATE SCHEMA merge_schema; SET search_path TO merge_schema; SET citus.shard_count TO 4; SET citus.next_shard_id TO 4000000; -SET citus.explain_all_tasks to true; +SET citus.explain_all_tasks TO true; SET citus.shard_replication_factor TO 1; +SET citus.max_adaptive_executor_pool_size TO 1; SELECT 1 FROM master_add_node('localhost', :master_port, groupid => 0); NOTICE: localhost:xxxxx is the coordinator and already contains metadata, skipping syncing the metadata ?column? 
@@ -268,6 +269,29 @@ SELECT * from target t WHERE t.customer_id = 30004; --------------------------------------------------------------------- (0 rows) +-- Updating distribution column is allowed if the operation is a no-op +SELECT * from target t WHERE t.customer_id = 30000; + customer_id | last_order_id | order_center | order_count | last_order +--------------------------------------------------------------------- + 30000 | 101 | WX | 123 | Sat Jan 01 00:00:00 2022 +(1 row) + +MERGE INTO target t +USING SOURCE s +ON (t.customer_id = s.customer_id AND t.customer_id = 30000) +WHEN MATCHED THEN + UPDATE SET customer_id = 30000; +MERGE INTO target t +USING SOURCE s +ON (t.customer_id = s.customer_id AND t.customer_id = 30000) +WHEN MATCHED THEN + UPDATE SET customer_id = t.customer_id; +SELECT * from target t WHERE t.customer_id = 30000; + customer_id | last_order_id | order_center | order_count | last_order +--------------------------------------------------------------------- + 30000 | 101 | WX | 123 | Sat Jan 01 00:00:00 2022 +(1 row) + -- -- Test MERGE with CTE as source -- @@ -310,7 +334,6 @@ MERGE INTO t1 UPDATE SET val = t1.val + 1 WHEN NOT MATCHED THEN INSERT (id, val) VALUES (pg_res.id, pg_res.val); --- Two rows with id 2 and val incremented, id 3, and id 1 is deleted SELECT * FROM t1 order by id; id | val --------------------------------------------------------------------- @@ -1200,7 +1223,8 @@ END; $$ language plpgsql volatile; CREATE TABLE fn_target(id int, data varchar); MERGE INTO fn_target -USING (SELECT * FROM f_dist() f(id integer, source varchar)) as fn_source +--USING (SELECT * FROM f_dist() f(id integer, source varchar)) as fn_source +USING (SELECT id, source FROM dist_table) as fn_source ON fn_source.id = fn_target.id WHEN MATCHED THEN DO NOTHING @@ -1216,29 +1240,22 @@ SELECT citus_add_local_table_to_metadata('fn_target'); (1 row) -SELECT create_distributed_table('dist_table', 'id'); -NOTICE: Copying data from local table... 
-NOTICE: copying the data has completed -DETAIL: The local data in the table is no longer visible, but is still on disk. -HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$merge_schema.dist_table$$) - create_distributed_table +SELECT citus_add_local_table_to_metadata('dist_table'); + citus_add_local_table_to_metadata --------------------------------------------------------------------- (1 row) SET client_min_messages TO DEBUG1; MERGE INTO fn_target -USING (SELECT * FROM f_dist() f(id integer, source varchar)) as fn_source +--USING (SELECT * FROM f_dist() f(id integer, source varchar)) as fn_source +USING (SELECT id, source FROM dist_table) as fn_source ON fn_source.id = fn_target.id WHEN MATCHED THEN DO NOTHING WHEN NOT MATCHED THEN INSERT VALUES(fn_source.id, fn_source.source); -DEBUG: function does not have co-located tables -DEBUG: generating subplan XXX_1 for subquery SELECT id, source FROM merge_schema.f_dist() f(id integer, source character varying) -DEBUG: -DEBUG: Plan XXX query after replacing subqueries and CTEs: MERGE INTO merge_schema.fn_target USING (SELECT intermediate_result.id, intermediate_result.source FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(id integer, source character varying)) fn_source ON (fn_source.id OPERATOR(pg_catalog.=) fn_target.id) WHEN MATCHED THEN DO NOTHING WHEN NOT MATCHED THEN INSERT (id, data) VALUES (fn_source.id, fn_source.source) -DEBUG: +DEBUG: RESET client_min_messages; SELECT * INTO fn_local FROM fn_target ORDER BY 1 ; -- Should be equal @@ -1959,7 +1976,7 @@ ON pg_target.id = sub.id AND pg_target.id = $1 WHEN MATCHED THEN UPDATE SET val = 'Updated by prepare using ' || sub.val WHEN NOT MATCHED THEN - DO NOTHING; + INSERT VALUES (sub.id, sub.val); PREPARE citus_prep(int) AS MERGE INTO citus_target USING (SELECT * FROM citus_source) sub @@ -1967,15 +1984,20 @@ ON citus_target.id = sub.id AND citus_target.id = $1 WHEN MATCHED THEN 
UPDATE SET val = 'Updated by prepare using ' || sub.val WHEN NOT MATCHED THEN - DO NOTHING; + INSERT VALUES (sub.id, sub.val); BEGIN; -SET citus.log_remote_commands to true; SELECT * FROM pg_target WHERE id = 500; -- before merge id | val --------------------------------------------------------------------- 500 | target (1 row) +SELECT count(*) FROM pg_target; -- before merge + count +--------------------------------------------------------------------- + 251 +(1 row) + EXECUTE pg_prep(500); SELECT * FROM pg_target WHERE id = 500; -- non-cached id | val @@ -1994,18 +2016,33 @@ SELECT * FROM pg_target WHERE id = 500; -- cached 500 | Updated by prepare using source (1 row) +SELECT count(*) FROM pg_target; -- cached + count +--------------------------------------------------------------------- + 3245 +(1 row) + SELECT * FROM citus_target WHERE id = 500; -- before merge -NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx'); -DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing SELECT id, val FROM merge_schema.citus_target_xxxxxxx citus_target WHERE (id OPERATOR(pg_catalog.=) 500) -DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx id | val --------------------------------------------------------------------- 500 | target (1 row) +SELECT count(*) FROM citus_target; -- before merge + count +--------------------------------------------------------------------- + 251 +(1 row) + +SET citus.log_remote_commands to true; EXECUTE citus_prep(500); -NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = (('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val COLLATE "default")) COLLATE "default") 
WHEN NOT MATCHED THEN DO NOTHING +NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = (('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val COLLATE "default")) COLLATE "default") WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val) +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = (('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val COLLATE "default")) COLLATE "default") WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val) +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = (('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val COLLATE "default")) COLLATE "default") WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val) +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN 
UPDATE SET val = (('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val COLLATE "default")) COLLATE "default") WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val) DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx SELECT * FROM citus_target WHERE id = 500; -- non-cached NOTICE: issuing SELECT id, val FROM merge_schema.citus_target_xxxxxxx citus_target WHERE (id OPERATOR(pg_catalog.=) 500) @@ -2016,29 +2053,63 @@ DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx (1 row) EXECUTE citus_prep(500); -NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = (('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val COLLATE "default")) COLLATE "default") WHEN NOT MATCHED THEN DO NOTHING +NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = (('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val COLLATE "default")) COLLATE "default") WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val) +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = (('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val COLLATE "default")) COLLATE "default") WHEN NOT 
MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val) +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = (('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val COLLATE "default")) COLLATE "default") WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val) +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = (('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val COLLATE "default")) COLLATE "default") WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val) DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx EXECUTE citus_prep(500); -NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = (('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val COLLATE "default")) COLLATE "default") WHEN NOT MATCHED THEN DO NOTHING +NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN 
MATCHED THEN UPDATE SET val = (('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val COLLATE "default")) COLLATE "default") WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val) +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = (('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val COLLATE "default")) COLLATE "default") WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val) +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = (('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val COLLATE "default")) COLLATE "default") WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val) +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = (('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val COLLATE "default")) COLLATE "default") WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val) DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx EXECUTE citus_prep(500); -NOTICE: issuing MERGE INTO 
merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = (('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val COLLATE "default")) COLLATE "default") WHEN NOT MATCHED THEN DO NOTHING +NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = (('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val COLLATE "default")) COLLATE "default") WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val) +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = (('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val COLLATE "default")) COLLATE "default") WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val) +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = (('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val COLLATE "default")) COLLATE "default") WHEN NOT MATCHED THEN INSERT (id, val) 
VALUES (sub.id, sub.val) +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = (('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val COLLATE "default")) COLLATE "default") WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val) DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx EXECUTE citus_prep(500); -NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = (('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val COLLATE "default")) COLLATE "default") WHEN NOT MATCHED THEN DO NOTHING +NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = (('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val COLLATE "default")) COLLATE "default") WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val) +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = 
(('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val COLLATE "default")) COLLATE "default") WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val) +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = (('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val COLLATE "default")) COLLATE "default") WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val) +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = (('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val COLLATE "default")) COLLATE "default") WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val) DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx EXECUTE citus_prep(500); -NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = (('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val COLLATE "default")) COLLATE "default") WHEN NOT MATCHED THEN DO NOTHING +NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx 
citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = (('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val COLLATE "default")) COLLATE "default") WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val) DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = (('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val COLLATE "default")) COLLATE "default") WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val) +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = (('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val COLLATE "default")) COLLATE "default") WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val) +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing MERGE INTO merge_schema.citus_target_xxxxxxx citus_target USING (SELECT citus_source.id, citus_source.val FROM merge_schema.citus_source_xxxxxxx citus_source) sub ON ((citus_target.id OPERATOR(pg_catalog.=) sub.id) AND (citus_target.id OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = (('Updated by prepare using '::text OPERATOR(pg_catalog.||) (sub.val COLLATE "default")) COLLATE "default") WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val) +DETAIL: on 
server postgres@localhost:xxxxx connectionId: xxxxxxx +SET citus.log_remote_commands to false; SELECT * FROM citus_target WHERE id = 500; -- cached -NOTICE: issuing SELECT id, val FROM merge_schema.citus_target_xxxxxxx citus_target WHERE (id OPERATOR(pg_catalog.=) 500) -DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx id | val --------------------------------------------------------------------- 500 | Updated by prepare using source (1 row) -SET citus.log_remote_commands to false; +SELECT count(*) FROM citus_target; -- cached + count +--------------------------------------------------------------------- + 3245 +(1 row) + SELECT compare_tables(); compare_tables --------------------------------------------------------------------- @@ -2165,9 +2236,263 @@ SELECT pa_compare_tables(); (1 row) ROLLBACK; +CREATE TABLE source_json( id integer, z int, d jsonb); +CREATE TABLE target_json( id integer, z int, d jsonb); +INSERT INTO source_json SELECT i,i FROM generate_series(0,5)i; +SELECT create_distributed_table('target_json','id'), create_distributed_table('source_json', 'id'); +NOTICE: Copying data from local table... +NOTICE: copying the data has completed +DETAIL: The local data in the table is no longer visible, but is still on disk. 
+HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$merge_schema.source_json$$) + create_distributed_table | create_distributed_table +--------------------------------------------------------------------- + | +(1 row) + +-- single shard query given source_json is filtered and Postgres is smart to pushdown +-- filter to the target_json as well +SELECT public.coordinator_plan($Q$ +EXPLAIN (ANALYZE ON, TIMING OFF) MERGE INTO target_json sda +USING (SELECT * FROM source_json WHERE id = 1) sdn +ON sda.id = sdn.id +WHEN NOT matched THEN + INSERT (id, z) VALUES (sdn.id, 5); +$Q$); + coordinator_plan +--------------------------------------------------------------------- + Custom Scan (Citus Adaptive) (cost=0.00..0.00 rows=0 width=0) (actual rows=0 loops=1) + Task Count: 1 +(2 rows) + +SELECT * FROM target_json ORDER BY 1; + id | z | d +--------------------------------------------------------------------- + 1 | 5 | +(1 row) + +-- zero shard query as filters do not match +--SELECT public.coordinator_plan($Q$ +--EXPLAIN (ANALYZE ON, TIMING OFF) MERGE INTO target_json sda +--USING (SELECT * FROM source_json WHERE id = 1) sdn +--ON sda.id = sdn.id AND sda.id = 2 +--WHEN NOT matched THEN +-- INSERT (id, z) VALUES (sdn.id, 5); +--$Q$); +--SELECT * FROM target_json ORDER BY 1; +-- join for source_json is happening at a different place +SELECT public.coordinator_plan($Q$ +EXPLAIN (ANALYZE ON, TIMING OFF) MERGE INTO target_json sda +USING source_json s1 LEFT JOIN (SELECT * FROM source_json) s2 USING(z) +ON sda.id = s1.id AND s1.id = s2.id +WHEN NOT matched THEN + INSERT (id, z) VALUES (s2.id, 5); +$Q$); + coordinator_plan +--------------------------------------------------------------------- + Custom Scan (Citus Adaptive) (cost=0.00..0.00 rows=0 width=0) (actual rows=0 loops=1) + Task Count: 4 +(2 rows) + +SELECT * FROM target_json ORDER BY 1; + id | z | d +--------------------------------------------------------------------- + 0 | 5 | + 1 | 5 
| + 2 | 5 | + 3 | 5 | + 4 | 5 | + 5 | 5 | +(6 rows) + +-- update JSON column +SELECT public.coordinator_plan($Q$ +EXPLAIN (ANALYZE ON, TIMING OFF) MERGE INTO target_json sda +USING source_json sdn +ON sda.id = sdn.id +WHEN matched THEN + UPDATE SET d = '{"a" : 5}'; +$Q$); + coordinator_plan +--------------------------------------------------------------------- + Custom Scan (Citus Adaptive) (cost=0.00..0.00 rows=0 width=0) (actual rows=0 loops=1) + Task Count: 4 +(2 rows) + +SELECT * FROM target_json ORDER BY 1; + id | z | d +--------------------------------------------------------------------- + 0 | 5 | {"a": 5} + 1 | 5 | {"a": 5} + 2 | 5 | {"a": 5} + 3 | 5 | {"a": 5} + 4 | 5 | {"a": 5} + 5 | 5 | {"a": 5} +(6 rows) + +CREATE FUNCTION immutable_hash(int) RETURNS int +AS 'SELECT hashtext( ($1 + $1)::text);' +LANGUAGE SQL +IMMUTABLE +RETURNS NULL ON NULL INPUT; +MERGE INTO target_json sda +USING source_json sdn +ON sda.id = sdn.id +WHEN matched THEN + UPDATE SET z = immutable_hash(sdn.z); +-- Test bigserial +CREATE TABLE source_serial (id integer, z int, d bigserial); +CREATE TABLE target_serial (id integer, z int, d bigserial); +INSERT INTO source_serial SELECT i,i FROM generate_series(0,100)i; +SELECT create_distributed_table('source_serial', 'id'), + create_distributed_table('target_serial', 'id'); +NOTICE: Copying data from local table... +NOTICE: copying the data has completed +DETAIL: The local data in the table is no longer visible, but is still on disk. 
+HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$merge_schema.source_serial$$) + create_distributed_table | create_distributed_table +--------------------------------------------------------------------- + | +(1 row) + +MERGE INTO target_serial sda +USING source_serial sdn +ON sda.id = sdn.id +WHEN NOT matched THEN + INSERT (id, z) VALUES (id, z); +ERROR: non-IMMUTABLE functions are not yet supported in MERGE sql with distributed tables +SELECT count(*) from source_serial; + count +--------------------------------------------------------------------- + 101 +(1 row) + +SELECT count(*) from target_serial; + count +--------------------------------------------------------------------- + 0 +(1 row) + +SELECT count(distinct d) from source_serial; + count +--------------------------------------------------------------------- + 101 +(1 row) + +SELECT count(distinct d) from target_serial; + count +--------------------------------------------------------------------- + 0 +(1 row) + +-- Test set operations +CREATE TABLE target_set(t1 int, t2 int); +CREATE TABLE source_set(s1 int, s2 int); +SELECT create_distributed_table('target_set', 't1'), + create_distributed_table('source_set', 's1'); + create_distributed_table | create_distributed_table +--------------------------------------------------------------------- + | +(1 row) + +INSERT INTO target_set VALUES(1, 0); +INSERT INTO source_set VALUES(1, 1); +INSERT INTO source_set VALUES(2, 2); +MERGE INTO target_set +USING (SELECT * FROM source_set UNION SELECT * FROM source_set) AS foo ON target_set.t1 = foo.s1 +WHEN MATCHED THEN + UPDATE SET t2 = t2 + 100 +WHEN NOT MATCHED THEN + INSERT VALUES(foo.s1); +SELECT * FROM target_set ORDER BY 1, 2; + t1 | t2 +--------------------------------------------------------------------- + 1 | 100 + 2 | +(2 rows) + -- -- Error and Unsupported scenarios -- +MERGE INTO target_set +USING (SELECT s1,s2 FROM source_set UNION SELECT s2,s1 FROM source_set) AS 
foo ON target_set.t1 = foo.s1 +WHEN MATCHED THEN + UPDATE SET t2 = t2 + 1; +ERROR: cannot pushdown the subquery since not all subqueries in the UNION have the partition column in the same position +DETAIL: Each leaf query of the UNION should return the partition column in the same position and all joins must be on the partition column +MERGE INTO target_set +USING (SELECT 2 as s3, source_set.* FROM (SELECT * FROM source_set LIMIT 1) as foo LEFT JOIN source_set USING( s1)) AS foo +ON target_set.t1 = foo.s1 +WHEN MATCHED THEN UPDATE SET t2 = t2 + 1 +WHEN NOT MATCHED THEN INSERT VALUES(s1, s3); +ERROR: cannot push down this subquery +DETAIL: Limit clause is currently unsupported when a subquery references a column from another query +-- modifying CTE not supported +EXPLAIN +WITH cte_1 AS (DELETE FROM target_json) +MERGE INTO target_json sda +USING source_json sdn +ON sda.id = sdn.id +WHEN NOT matched THEN + INSERT (id, z) VALUES (sdn.id, 5); +ERROR: MERGE command is only supported when all distributed tables are co-located and joined on their distribution columns +-- Grouping sets not supported +MERGE INTO citus_target t +USING (SELECT count(*), id FROM citus_source GROUP BY GROUPING SETS (id, val)) subq +ON subq.id = t.id +WHEN MATCHED AND t.id > 350 THEN + UPDATE SET val = t.val || 'Updated' +WHEN NOT MATCHED THEN + INSERT VALUES (subq.id, 99) +WHEN MATCHED AND t.id < 350 THEN + DELETE; +ERROR: cannot push down this subquery +DETAIL: could not run distributed query with GROUPING SETS, CUBE, or ROLLUP +WITH subq AS +( +SELECT count(*), id FROM citus_source GROUP BY GROUPING SETS (id, val) +) +MERGE INTO citus_target t +USING subq +ON subq.id = t.id +WHEN MATCHED AND t.id > 350 THEN + UPDATE SET val = t.val || 'Updated' +WHEN NOT MATCHED THEN + INSERT VALUES (subq.id, 99) +WHEN MATCHED AND t.id < 350 THEN + DELETE; +ERROR: cannot push down this subquery +DETAIL: could not run distributed query with GROUPING SETS, CUBE, or ROLLUP +-- try inserting unmatched 
distribution column value +MERGE INTO citus_target t +USING citus_source s +ON t.id = s.id +WHEN NOT MATCHED THEN + INSERT DEFAULT VALUES; +ERROR: cannot perform MERGE INSERT with DEFAULTS +MERGE INTO citus_target t +USING citus_source s +ON t.id = s.id +WHEN NOT MATCHED THEN + INSERT VALUES(10000); +ERROR: MERGE INSERT must refer a source column for distribution column +MERGE INTO citus_target t +USING citus_source s +ON t.id = s.id +WHEN NOT MATCHED THEN + INSERT (id) VALUES(1000); +ERROR: MERGE INSERT must refer a source column for distribution column +MERGE INTO t1 t +USING s1 s +ON t.id = s.id +WHEN NOT MATCHED THEN + INSERT (id) VALUES(s.val); +ERROR: MERGE INSERT must use the source table distribution column value +MERGE INTO t1 t +USING s1 s +ON t.id = s.id +WHEN NOT MATCHED THEN + INSERT (val) VALUES(s.val); +ERROR: MERGE INSERT must have distribution column as value -- try updating the distribution key column BEGIN; MERGE INTO target_cj t @@ -2177,7 +2502,7 @@ MERGE INTO target_cj t UPDATE SET tid = tid + 9, src = src || ' updated by merge' WHEN NOT MATCHED THEN INSERT VALUES (sid1, 'inserted by merge', val1); -ERROR: modifying the partition value of rows is not allowed +ERROR: updating the distribution column is not allowed in MERGE actions ROLLBACK; -- Foreign table as target MERGE INTO foreign_table @@ -2269,13 +2594,31 @@ BEGIN RETURN TRUE; END; $$; +-- Test functions executing in MERGE statement. This is to prevent the functions from +-- doing a random sql, which may be executed in a remote node or modifying the target +-- relation which will have unexpected/suprising results. 
+MERGE INTO t1 USING (SELECT * FROM s1 WHERE true) s1 ON + t1.id = s1.id AND s1.id = 2 + WHEN matched THEN + UPDATE SET id = s1.id, val = random(); +ERROR: non-IMMUTABLE functions are not yet supported in MERGE sql with distributed tables +-- Test STABLE function +CREATE FUNCTION add_s(integer, integer) RETURNS integer +AS 'select $1 + $2;' +LANGUAGE SQL +STABLE RETURNS NULL ON NULL INPUT; +MERGE INTO t1 +USING s1 ON t1.id = s1.id +WHEN NOT MATCHED THEN + INSERT VALUES(s1.id, add_s(s1.val, 2)); +ERROR: non-IMMUTABLE functions are not yet supported in MERGE sql with distributed tables -- Test preventing "ON" join condition from writing to the database BEGIN; MERGE INTO t1 USING s1 ON t1.id = s1.id AND t1.id = 2 AND (merge_when_and_write()) WHEN MATCHED THEN UPDATE SET val = t1.val + s1.val; -ERROR: functions used in the WHERE/ON/WHEN clause of modification queries on distributed tables must not be VOLATILE +ERROR: non-IMMUTABLE functions are not yet supported in MERGE sql with distributed tables ROLLBACK; -- Test preventing WHEN clause(s) from writing to the database BEGIN; @@ -2283,7 +2626,7 @@ MERGE INTO t1 USING s1 ON t1.id = s1.id AND t1.id = 2 WHEN MATCHED AND (merge_when_and_write()) THEN UPDATE SET val = t1.val + s1.val; -ERROR: functions used in the WHERE/ON/WHEN clause of modification queries on distributed tables must not be VOLATILE +ERROR: non-IMMUTABLE functions are not yet supported in MERGE sql with distributed tables ROLLBACK; -- Joining on partition columns with sub-query MERGE INTO t1 @@ -2294,7 +2637,7 @@ MERGE INTO t1 UPDATE SET val = t1.val + 1 WHEN NOT MATCHED THEN INSERT (id, val) VALUES (sub.id, sub.val); -ERROR: MERGE command is only supported when distributed tables are joined on their distribution column +ERROR: MERGE command is only supported when all distributed tables are co-located and joined on their distribution columns -- Joining on partition columns with CTE WITH s1_res AS ( SELECT * FROM s1 @@ -2307,7 +2650,7 @@ MERGE INTO t1 
UPDATE SET val = t1.val + 1 WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s1_res.id, s1_res.val); -ERROR: MERGE command is only supported when distributed tables are joined on their distribution column +ERROR: MERGE command is only supported when all distributed tables are co-located and joined on their distribution columns -- Constant Join condition WITH s1_res AS ( SELECT * FROM s1 @@ -2320,7 +2663,7 @@ MERGE INTO t1 UPDATE SET val = t1.val + 1 WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s1_res.id, s1_res.val); -ERROR: MERGE command is only supported when distributed tables are joined on their distribution column +ERROR: MERGE command is only supported when all distributed tables are co-located and joined on their distribution columns -- With a single WHEN clause, which causes a non-left join WITH s1_res AS ( SELECT * FROM s1 @@ -2329,7 +2672,7 @@ WITH s1_res AS ( WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s1_res.id, s1_res.val); -ERROR: MERGE command is only supported when distributed tables are joined on their distribution column +ERROR: MERGE command is only supported when all distributed tables are co-located and joined on their distribution columns -- -- Reference tables -- @@ -2559,7 +2902,7 @@ WHEN MATCHED THEN UPDATE SET val = dist_colocated.val WHEN NOT MATCHED THEN INSERT VALUES(dist_colocated.id, dist_colocated.val); -ERROR: MERGE command is only supported when distributed tables are joined on their distribution column +ERROR: MERGE command is only supported when all distributed tables are co-located and joined on their distribution columns -- Both the source and target must be distributed MERGE INTO dist_target USING (SELECT 100 id) AS source @@ -2752,14 +3095,14 @@ HINT: Consider using hash distribution instead DROP SERVER foreign_server CASCADE; NOTICE: drop cascades to 3 other objects DETAIL: drop cascades to user mapping for postgres on server foreign_server -drop cascades to foreign table foreign_table_4000046 
+drop cascades to foreign table foreign_table_4000043 drop cascades to foreign table foreign_table -NOTICE: foreign table "foreign_table_4000046" does not exist, skipping +NOTICE: foreign table "foreign_table_4000043" does not exist, skipping CONTEXT: SQL statement "SELECT citus_drop_all_shards(v_obj.objid, v_obj.schema_name, v_obj.object_name, drop_shards_metadata_only := false)" PL/pgSQL function citus_drop_trigger() line XX at PERFORM DROP FUNCTION merge_when_and_write(); DROP SCHEMA merge_schema CASCADE; -NOTICE: drop cascades to 75 other objects +NOTICE: drop cascades to 84 other objects DETAIL: drop cascades to function insert_data() drop cascades to table pg_result drop cascades to table local_local @@ -2801,14 +3144,15 @@ drop cascades to table mv_target drop cascades to table mv_source_table drop cascades to materialized view mv_source drop cascades to table mv_local -drop cascades to table dist_table +drop cascades to table dist_table_4000041 drop cascades to function f_dist() drop cascades to table fn_target_4000040 drop cascades to table fn_result drop cascades to table fn_target +drop cascades to table dist_table drop cascades to table fn_local drop cascades to table ft_target -drop cascades to table ft_source_4000045 +drop cascades to table ft_source_4000042 drop cascades to table ft_source drop cascades to extension postgres_fdw drop cascades to table target_cj @@ -2826,9 +3170,17 @@ drop cascades to table citus_pa_target drop cascades to table pg_pa_source drop cascades to table citus_pa_source drop cascades to function pa_compare_tables() +drop cascades to table source_json +drop cascades to table target_json +drop cascades to function immutable_hash(integer) +drop cascades to table source_serial +drop cascades to table target_serial +drop cascades to table target_set +drop cascades to table source_set +drop cascades to function add_s(integer,integer) drop cascades to table pg -drop cascades to table t1_4000110 -drop cascades to table s1_4000111 
+drop cascades to table t1_4000131 +drop cascades to table s1_4000132 drop cascades to table t1 drop cascades to table s1 drop cascades to table dist_colocated diff --git a/src/test/regress/expected/merge_arbitrary.out b/src/test/regress/expected/merge_arbitrary.out new file mode 100644 index 000000000..345ac1410 --- /dev/null +++ b/src/test/regress/expected/merge_arbitrary.out @@ -0,0 +1,150 @@ +SHOW server_version \gset +SELECT substring(:'server_version', '\d+')::int >= 15 AS server_version_ge_15 +\gset +\if :server_version_ge_15 +\else +\q +\endif +SET search_path TO merge_arbitrary_schema; +INSERT INTO target_cj VALUES (1, 'target', 0); +INSERT INTO target_cj VALUES (2, 'target', 0); +INSERT INTO target_cj VALUES (2, 'target', 0); +INSERT INTO target_cj VALUES (3, 'target', 0); +INSERT INTO source_cj1 VALUES (2, 'source-1', 10); +INSERT INTO source_cj2 VALUES (2, 'source-2', 20); +BEGIN; +MERGE INTO target_cj t +USING source_cj1 s1 INNER JOIN source_cj2 s2 ON sid1 = sid2 +ON t.tid = sid1 AND t.tid = 2 +WHEN MATCHED THEN + UPDATE SET src = src2 +WHEN NOT MATCHED THEN + DO NOTHING; +SELECT * FROM target_cj ORDER BY 1; + tid | src | val +--------------------------------------------------------------------- + 1 | target | 0 + 2 | source-2 | 0 + 2 | source-2 | 0 + 3 | target | 0 +(4 rows) + +ROLLBACK; +BEGIN; +-- try accessing columns from either side of the source join +MERGE INTO target_cj t +USING source_cj1 s2 + INNER JOIN source_cj2 s1 ON sid1 = sid2 AND val1 = 10 +ON t.tid = sid1 AND t.tid = 2 +WHEN MATCHED THEN + UPDATE SET src = src1, val = val2 +WHEN NOT MATCHED THEN + DO NOTHING; +SELECT * FROM target_cj ORDER BY 1; + tid | src | val +--------------------------------------------------------------------- + 1 | target | 0 + 2 | source-1 | 20 + 2 | source-1 | 20 + 3 | target | 0 +(4 rows) + +ROLLBACK; +-- Test PREPARE +PREPARE insert(int, int, int) AS +MERGE INTO prept +USING (SELECT $2, s1, s2 FROM preps WHERE s2 > $3) as foo +ON prept.t1 = foo.s1 +WHEN 
MATCHED THEN + UPDATE SET t2 = t2 + $1 +WHEN NOT MATCHED THEN + INSERT VALUES(s1, s2); +PREPARE delete(int) AS +MERGE INTO prept +USING preps +ON prept.t1 = preps.s1 +WHEN MATCHED AND prept.t2 = $1 THEN + DELETE +WHEN MATCHED THEN + UPDATE SET t2 = t2 + 1; +INSERT INTO prept VALUES(100, 0); +INSERT INTO preps VALUES(100, 0); +INSERT INTO preps VALUES(200, 0); +EXECUTE insert(1, 1, -1); EXECUTE delete(0); +EXECUTE insert(1, 1, -1); EXECUTE delete(0); +EXECUTE insert(1, 1, -1); EXECUTE delete(0); +EXECUTE insert(1, 1, -1); EXECUTE delete(0); +EXECUTE insert(1, 1, -1); EXECUTE delete(0); +-- sixth time +EXECUTE insert(1, 1, -1); EXECUTE delete(0); +EXECUTE insert(1, 1, -1); EXECUTE delete(0); +-- Should have the counter as 14 (7 * 2) +SELECT * FROM prept; + t1 | t2 +--------------------------------------------------------------------- + 100 | 14 +(1 row) + +-- Test local tables +INSERT INTO s1 VALUES(1, 0); -- Matches DELETE clause +INSERT INTO s1 VALUES(2, 1); -- Matches UPDATE clause +INSERT INTO s1 VALUES(3, 1); -- No Match INSERT clause +INSERT INTO s1 VALUES(4, 1); -- No Match INSERT clause +INSERT INTO s1 VALUES(6, 1); -- No Match INSERT clause +INSERT INTO t1 VALUES(1, 0); -- Will be deleted +INSERT INTO t1 VALUES(2, 0); -- Will be updated +INSERT INTO t1 VALUES(5, 0); -- Will be intact +PREPARE local(int, int) AS +WITH s1_res AS ( + SELECT * FROM s1 +) +MERGE INTO t1 + USING s1_res ON (s1_res.id = t1.id) + WHEN MATCHED AND s1_res.val = $1 THEN + DELETE + WHEN MATCHED THEN + UPDATE SET val = t1.val + $2 + WHEN NOT MATCHED THEN + INSERT (id, val) VALUES (s1_res.id, s1_res.val); +BEGIN; +EXECUTE local(0, 1); +SELECT * FROM t1 order by id; + id | val +--------------------------------------------------------------------- + 2 | 1 + 3 | 1 + 4 | 1 + 5 | 0 + 6 | 1 +(5 rows) + +ROLLBACK; +BEGIN; +EXECUTE local(0, 1); +ROLLBACK; +BEGIN; +EXECUTE local(0, 1); +ROLLBACK; +BEGIN; +EXECUTE local(0, 1); +ROLLBACK; +BEGIN; +EXECUTE local(0, 1); +ROLLBACK; +-- sixth time 
+BEGIN; +EXECUTE local(0, 1); +ROLLBACK; +BEGIN; +EXECUTE local(0, 1); +SELECT * FROM t1 order by id; + id | val +--------------------------------------------------------------------- + 2 | 1 + 3 | 1 + 4 | 1 + 5 | 0 + 6 | 1 +(5 rows) + +ROLLBACK; diff --git a/src/test/regress/expected/merge_arbitrary_0.out b/src/test/regress/expected/merge_arbitrary_0.out new file mode 100644 index 000000000..a7e3fbf20 --- /dev/null +++ b/src/test/regress/expected/merge_arbitrary_0.out @@ -0,0 +1,6 @@ +SHOW server_version \gset +SELECT substring(:'server_version', '\d+')::int >= 15 AS server_version_ge_15 +\gset +\if :server_version_ge_15 +\else +\q diff --git a/src/test/regress/expected/merge_arbitrary_create.out b/src/test/regress/expected/merge_arbitrary_create.out new file mode 100644 index 000000000..9b2444f17 --- /dev/null +++ b/src/test/regress/expected/merge_arbitrary_create.out @@ -0,0 +1,72 @@ +SHOW server_version \gset +SELECT substring(:'server_version', '\d+')::int >= 15 AS server_version_ge_15 +\gset +\if :server_version_ge_15 +\else +\q +\endif +DROP SCHEMA IF EXISTS merge_arbitrary_schema CASCADE; +CREATE SCHEMA merge_arbitrary_schema; +SET search_path TO merge_arbitrary_schema; +SET citus.shard_count TO 4; +SET citus.next_shard_id TO 6000000; +CREATE TABLE target_cj(tid int, src text, val int); +CREATE TABLE source_cj1(sid1 int, src1 text, val1 int); +CREATE TABLE source_cj2(sid2 int, src2 text, val2 int); +SELECT create_distributed_table('target_cj', 'tid'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SELECT create_distributed_table('source_cj1', 'sid1'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SELECT create_distributed_table('source_cj2', 'sid2'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +CREATE TABLE prept(t1 int, t2 int); +CREATE TABLE preps(s1 int, s2 
int); +SELECT create_distributed_table('prept', 't1'), create_distributed_table('preps', 's1'); + create_distributed_table | create_distributed_table +--------------------------------------------------------------------- + | +(1 row) + +PREPARE insert(int, int, int) AS +MERGE INTO prept +USING (SELECT $2, s1, s2 FROM preps WHERE s2 > $3) as foo +ON prept.t1 = foo.s1 +WHEN MATCHED THEN + UPDATE SET t2 = t2 + $1 +WHEN NOT MATCHED THEN + INSERT VALUES(s1, s2); +PREPARE delete(int) AS +MERGE INTO prept +USING preps +ON prept.t1 = preps.s1 +WHEN MATCHED AND prept.t2 = $1 THEN + DELETE +WHEN MATCHED THEN + UPDATE SET t2 = t2 + 1; +-- Citus local tables +CREATE TABLE t1(id int, val int); +CREATE TABLE s1(id int, val int); +SELECT citus_add_local_table_to_metadata('t1'); + citus_add_local_table_to_metadata +--------------------------------------------------------------------- + +(1 row) + +SELECT citus_add_local_table_to_metadata('s1'); + citus_add_local_table_to_metadata +--------------------------------------------------------------------- + +(1 row) + diff --git a/src/test/regress/expected/merge_arbitrary_create_0.out b/src/test/regress/expected/merge_arbitrary_create_0.out new file mode 100644 index 000000000..a7e3fbf20 --- /dev/null +++ b/src/test/regress/expected/merge_arbitrary_create_0.out @@ -0,0 +1,6 @@ +SHOW server_version \gset +SELECT substring(:'server_version', '\d+')::int >= 15 AS server_version_ge_15 +\gset +\if :server_version_ge_15 +\else +\q diff --git a/src/test/regress/expected/pg15.out b/src/test/regress/expected/pg15.out index d92686b93..7fc102dbb 100644 --- a/src/test/regress/expected/pg15.out +++ b/src/test/regress/expected/pg15.out @@ -419,29 +419,36 @@ SELECT create_distributed_table('tbl2', 'x'); MERGE INTO tbl1 USING tbl2 ON (true) WHEN MATCHED THEN DELETE; -ERROR: MERGE command is only supported when distributed tables are joined on their distribution column +ERROR: MERGE command is only supported when all distributed tables are co-located 
and joined on their distribution columns -- also, not inside subqueries & ctes WITH targq AS ( SELECT * FROM tbl2 ) MERGE INTO tbl1 USING targq ON (true) WHEN MATCHED THEN DELETE; -ERROR: MERGE command is only supported when distributed tables are joined on their distribution column --- crashes on beta3, fixed on 15 stable ---WITH foo AS ( --- MERGE INTO tbl1 USING tbl2 ON (true) --- WHEN MATCHED THEN DELETE ---) SELECT * FROM foo; ---COPY ( --- MERGE INTO tbl1 USING tbl2 ON (true) --- WHEN MATCHED THEN DELETE ---) TO stdout; +ERROR: MERGE command is only supported when all distributed tables are co-located and joined on their distribution columns +WITH foo AS ( + MERGE INTO tbl1 USING tbl2 ON (true) + WHEN MATCHED THEN DELETE +) SELECT * FROM foo; +ERROR: MERGE not supported in WITH query +COPY ( + MERGE INTO tbl1 USING tbl2 ON (true) + WHEN MATCHED THEN DELETE +) TO stdout; +ERROR: MERGE not supported in COPY +MERGE INTO tbl1 t +USING tbl2 +ON (true) +WHEN MATCHED THEN + DO NOTHING; +ERROR: MERGE command is only supported when all distributed tables are co-located and joined on their distribution columns MERGE INTO tbl1 t USING tbl2 ON (true) WHEN MATCHED THEN UPDATE SET x = (SELECT count(*) FROM tbl2); -ERROR: MERGE command is only supported when distributed tables are joined on their distribution column +ERROR: updating the distribution column is not allowed in MERGE actions -- test numeric types with negative scale CREATE TABLE numeric_negative_scale(numeric_column numeric(3,-1), orig_value int); INSERT into numeric_negative_scale SELECT x,x FROM generate_series(111, 115) x; diff --git a/src/test/regress/expected/pgmerge.out b/src/test/regress/expected/pgmerge.out index 0bedf356f..8a74336a0 100644 --- a/src/test/regress/expected/pgmerge.out +++ b/src/test/regress/expected/pgmerge.out @@ -910,7 +910,7 @@ MERGE INTO wq_target t USING wq_source s ON t.tid = s.sid WHEN MATCHED AND (merge_when_and_write()) THEN UPDATE SET balance = t.balance + s.balance; -ERROR: 
functions used in the WHERE/ON/WHEN clause of modification queries on distributed tables must not be VOLATILE +ERROR: non-IMMUTABLE functions are not yet supported in MERGE sql with distributed tables ROLLBACK; -- Test preventing ON condition from writing to the database BEGIN; @@ -918,7 +918,7 @@ MERGE INTO wq_target t USING wq_source s ON t.tid = s.sid AND (merge_when_and_write()) WHEN MATCHED THEN UPDATE SET balance = t.balance + s.balance; -ERROR: functions used in the WHERE/ON/WHEN clause of modification queries on distributed tables must not be VOLATILE +ERROR: non-IMMUTABLE functions are not yet supported in MERGE sql with distributed tables ROLLBACK; drop function merge_when_and_write(); DROP TABLE wq_target, wq_source; @@ -1893,13 +1893,15 @@ INSERT INTO pa_target SELECT '2017-02-28', id, id * 100, 'initial' FROM generate SET client_min_messages TO DEBUG1; BEGIN; MERGE INTO pa_target t - USING (SELECT '2017-01-15' AS slogts, * FROM pa_source WHERE sid < 10) s + USING (SELECT * FROM pa_source WHERE sid < 10) s + --USING (SELECT '2017-01-15' AS slogts, * FROM pa_source WHERE sid < 10) s ON t.tid = s.sid WHEN MATCHED THEN UPDATE SET balance = balance + delta, val = val || ' updated by merge' WHEN NOT MATCHED THEN - INSERT VALUES (slogts::timestamp, sid, delta, 'inserted by merge'); -DEBUG: + INSERT VALUES ('2017-01-15', sid, delta, 'inserted by merge'); +DEBUG: + --INSERT VALUES (slogts::timestamp, sid, delta, 'inserted by merge'); SELECT * FROM pa_target ORDER BY tid; logts | tid | balance | val --------------------------------------------------------------------- diff --git a/src/test/regress/sql/merge.sql b/src/test/regress/sql/merge.sql index 12294b2c9..ded90b69c 100644 --- a/src/test/regress/sql/merge.sql +++ b/src/test/regress/sql/merge.sql @@ -18,8 +18,9 @@ CREATE SCHEMA merge_schema; SET search_path TO merge_schema; SET citus.shard_count TO 4; SET citus.next_shard_id TO 4000000; -SET citus.explain_all_tasks to true; +SET citus.explain_all_tasks TO 
true; SET citus.shard_replication_factor TO 1; +SET citus.max_adaptive_executor_pool_size TO 1; SELECT 1 FROM master_add_node('localhost', :master_port, groupid => 0); CREATE TABLE source @@ -185,6 +186,21 @@ MERGE INTO target t VALUES (customer_id, s.order_id, s.order_center, 123, s.order_time); SELECT * from target t WHERE t.customer_id = 30004; +-- Updating distribution column is allowed if the operation is a no-op +SELECT * from target t WHERE t.customer_id = 30000; +MERGE INTO target t +USING SOURCE s +ON (t.customer_id = s.customer_id AND t.customer_id = 30000) +WHEN MATCHED THEN + UPDATE SET customer_id = 30000; + +MERGE INTO target t +USING SOURCE s +ON (t.customer_id = s.customer_id AND t.customer_id = 30000) +WHEN MATCHED THEN + UPDATE SET customer_id = t.customer_id; +SELECT * from target t WHERE t.customer_id = 30000; + -- -- Test MERGE with CTE as source -- @@ -223,7 +239,6 @@ MERGE INTO t1 WHEN NOT MATCHED THEN INSERT (id, val) VALUES (pg_res.id, pg_res.val); --- Two rows with id 2 and val incremented, id 3, and id 1 is deleted SELECT * FROM t1 order by id; SELECT * INTO merge_result FROM t1 order by id; @@ -777,7 +792,8 @@ $$ language plpgsql volatile; CREATE TABLE fn_target(id int, data varchar); MERGE INTO fn_target -USING (SELECT * FROM f_dist() f(id integer, source varchar)) as fn_source +--USING (SELECT * FROM f_dist() f(id integer, source varchar)) as fn_source +USING (SELECT id, source FROM dist_table) as fn_source ON fn_source.id = fn_target.id WHEN MATCHED THEN DO NOTHING @@ -790,11 +806,12 @@ SELECT * INTO fn_result FROM fn_target ORDER BY 1 ; -- Clean the slate TRUNCATE TABLE fn_target; SELECT citus_add_local_table_to_metadata('fn_target'); -SELECT create_distributed_table('dist_table', 'id'); +SELECT citus_add_local_table_to_metadata('dist_table'); SET client_min_messages TO DEBUG1; MERGE INTO fn_target -USING (SELECT * FROM f_dist() f(id integer, source varchar)) as fn_source +--USING (SELECT * FROM f_dist() f(id integer, source 
varchar)) as fn_source +USING (SELECT id, source FROM dist_table) as fn_source ON fn_source.id = fn_target.id WHEN MATCHED THEN DO NOTHING @@ -1287,7 +1304,7 @@ ON pg_target.id = sub.id AND pg_target.id = $1 WHEN MATCHED THEN UPDATE SET val = 'Updated by prepare using ' || sub.val WHEN NOT MATCHED THEN - DO NOTHING; + INSERT VALUES (sub.id, sub.val); PREPARE citus_prep(int) AS MERGE INTO citus_target @@ -1296,12 +1313,12 @@ ON citus_target.id = sub.id AND citus_target.id = $1 WHEN MATCHED THEN UPDATE SET val = 'Updated by prepare using ' || sub.val WHEN NOT MATCHED THEN - DO NOTHING; + INSERT VALUES (sub.id, sub.val); BEGIN; -SET citus.log_remote_commands to true; SELECT * FROM pg_target WHERE id = 500; -- before merge +SELECT count(*) FROM pg_target; -- before merge EXECUTE pg_prep(500); SELECT * FROM pg_target WHERE id = 500; -- non-cached EXECUTE pg_prep(500); @@ -1310,8 +1327,11 @@ EXECUTE pg_prep(500); EXECUTE pg_prep(500); EXECUTE pg_prep(500); SELECT * FROM pg_target WHERE id = 500; -- cached +SELECT count(*) FROM pg_target; -- cached SELECT * FROM citus_target WHERE id = 500; -- before merge +SELECT count(*) FROM citus_target; -- before merge +SET citus.log_remote_commands to true; EXECUTE citus_prep(500); SELECT * FROM citus_target WHERE id = 500; -- non-cached EXECUTE citus_prep(500); @@ -1319,9 +1339,10 @@ EXECUTE citus_prep(500); EXECUTE citus_prep(500); EXECUTE citus_prep(500); EXECUTE citus_prep(500); -SELECT * FROM citus_target WHERE id = 500; -- cached - SET citus.log_remote_commands to false; +SELECT * FROM citus_target WHERE id = 500; -- cached +SELECT count(*) FROM citus_target; -- cached + SELECT compare_tables(); ROLLBACK; @@ -1417,10 +1438,185 @@ MERGE INTO citus_pa_target t SELECT pa_compare_tables(); ROLLBACK; +CREATE TABLE source_json( id integer, z int, d jsonb); +CREATE TABLE target_json( id integer, z int, d jsonb); + +INSERT INTO source_json SELECT i,i FROM generate_series(0,5)i; + +SELECT create_distributed_table('target_json','id'), 
create_distributed_table('source_json', 'id'); + +-- single shard query given source_json is filtered and Postgres is smart to pushdown +-- filter to the target_json as well +SELECT public.coordinator_plan($Q$ +EXPLAIN (ANALYZE ON, TIMING OFF) MERGE INTO target_json sda +USING (SELECT * FROM source_json WHERE id = 1) sdn +ON sda.id = sdn.id +WHEN NOT matched THEN + INSERT (id, z) VALUES (sdn.id, 5); +$Q$); +SELECT * FROM target_json ORDER BY 1; + +-- zero shard query as filters do not match +--SELECT public.coordinator_plan($Q$ +--EXPLAIN (ANALYZE ON, TIMING OFF) MERGE INTO target_json sda +--USING (SELECT * FROM source_json WHERE id = 1) sdn +--ON sda.id = sdn.id AND sda.id = 2 +--WHEN NOT matched THEN +-- INSERT (id, z) VALUES (sdn.id, 5); +--$Q$); +--SELECT * FROM target_json ORDER BY 1; + +-- join for source_json is happening at a different place +SELECT public.coordinator_plan($Q$ +EXPLAIN (ANALYZE ON, TIMING OFF) MERGE INTO target_json sda +USING source_json s1 LEFT JOIN (SELECT * FROM source_json) s2 USING(z) +ON sda.id = s1.id AND s1.id = s2.id +WHEN NOT matched THEN + INSERT (id, z) VALUES (s2.id, 5); +$Q$); +SELECT * FROM target_json ORDER BY 1; + +-- update JSON column +SELECT public.coordinator_plan($Q$ +EXPLAIN (ANALYZE ON, TIMING OFF) MERGE INTO target_json sda +USING source_json sdn +ON sda.id = sdn.id +WHEN matched THEN + UPDATE SET d = '{"a" : 5}'; +$Q$); +SELECT * FROM target_json ORDER BY 1; + +CREATE FUNCTION immutable_hash(int) RETURNS int +AS 'SELECT hashtext( ($1 + $1)::text);' +LANGUAGE SQL +IMMUTABLE +RETURNS NULL ON NULL INPUT; + +MERGE INTO target_json sda +USING source_json sdn +ON sda.id = sdn.id +WHEN matched THEN + UPDATE SET z = immutable_hash(sdn.z); + +-- Test bigserial +CREATE TABLE source_serial (id integer, z int, d bigserial); +CREATE TABLE target_serial (id integer, z int, d bigserial); +INSERT INTO source_serial SELECT i,i FROM generate_series(0,100)i; +SELECT create_distributed_table('source_serial', 'id'), + 
create_distributed_table('target_serial', 'id'); + +MERGE INTO target_serial sda +USING source_serial sdn +ON sda.id = sdn.id +WHEN NOT matched THEN + INSERT (id, z) VALUES (id, z); + +SELECT count(*) from source_serial; +SELECT count(*) from target_serial; + +SELECT count(distinct d) from source_serial; +SELECT count(distinct d) from target_serial; + +-- Test set operations +CREATE TABLE target_set(t1 int, t2 int); +CREATE TABLE source_set(s1 int, s2 int); + +SELECT create_distributed_table('target_set', 't1'), + create_distributed_table('source_set', 's1'); + +INSERT INTO target_set VALUES(1, 0); +INSERT INTO source_set VALUES(1, 1); +INSERT INTO source_set VALUES(2, 2); + +MERGE INTO target_set +USING (SELECT * FROM source_set UNION SELECT * FROM source_set) AS foo ON target_set.t1 = foo.s1 +WHEN MATCHED THEN + UPDATE SET t2 = t2 + 100 +WHEN NOT MATCHED THEN + INSERT VALUES(foo.s1); +SELECT * FROM target_set ORDER BY 1, 2; + -- -- Error and Unsupported scenarios -- +MERGE INTO target_set +USING (SELECT s1,s2 FROM source_set UNION SELECT s2,s1 FROM source_set) AS foo ON target_set.t1 = foo.s1 +WHEN MATCHED THEN + UPDATE SET t2 = t2 + 1; + +MERGE INTO target_set +USING (SELECT 2 as s3, source_set.* FROM (SELECT * FROM source_set LIMIT 1) as foo LEFT JOIN source_set USING( s1)) AS foo +ON target_set.t1 = foo.s1 +WHEN MATCHED THEN UPDATE SET t2 = t2 + 1 +WHEN NOT MATCHED THEN INSERT VALUES(s1, s3); + + +-- modifying CTE not supported +EXPLAIN +WITH cte_1 AS (DELETE FROM target_json) +MERGE INTO target_json sda +USING source_json sdn +ON sda.id = sdn.id +WHEN NOT matched THEN + INSERT (id, z) VALUES (sdn.id, 5); + +-- Grouping sets not supported +MERGE INTO citus_target t +USING (SELECT count(*), id FROM citus_source GROUP BY GROUPING SETS (id, val)) subq +ON subq.id = t.id +WHEN MATCHED AND t.id > 350 THEN + UPDATE SET val = t.val || 'Updated' +WHEN NOT MATCHED THEN + INSERT VALUES (subq.id, 99) +WHEN MATCHED AND t.id < 350 THEN + DELETE; + +WITH subq AS +( +SELECT 
count(*), id FROM citus_source GROUP BY GROUPING SETS (id, val) +) +MERGE INTO citus_target t +USING subq +ON subq.id = t.id +WHEN MATCHED AND t.id > 350 THEN + UPDATE SET val = t.val || 'Updated' +WHEN NOT MATCHED THEN + INSERT VALUES (subq.id, 99) +WHEN MATCHED AND t.id < 350 THEN + DELETE; + +-- try inserting unmatched distribution column value +MERGE INTO citus_target t +USING citus_source s +ON t.id = s.id +WHEN NOT MATCHED THEN + INSERT DEFAULT VALUES; + +MERGE INTO citus_target t +USING citus_source s +ON t.id = s.id +WHEN NOT MATCHED THEN + INSERT VALUES(10000); + +MERGE INTO citus_target t +USING citus_source s +ON t.id = s.id +WHEN NOT MATCHED THEN + INSERT (id) VALUES(1000); + +MERGE INTO t1 t +USING s1 s +ON t.id = s.id +WHEN NOT MATCHED THEN + INSERT (id) VALUES(s.val); + +MERGE INTO t1 t +USING s1 s +ON t.id = s.id +WHEN NOT MATCHED THEN + INSERT (val) VALUES(s.val); + -- try updating the distribution key column BEGIN; MERGE INTO target_cj t @@ -1473,6 +1669,25 @@ BEGIN END; $$; +-- Test functions executing in MERGE statement. This is to prevent the functions from +-- doing a random sql, which may be executed in a remote node or modifying the target +-- relation which will have unexpected/suprising results. 
+MERGE INTO t1 USING (SELECT * FROM s1 WHERE true) s1 ON + t1.id = s1.id AND s1.id = 2 + WHEN matched THEN + UPDATE SET id = s1.id, val = random(); + +-- Test STABLE function +CREATE FUNCTION add_s(integer, integer) RETURNS integer +AS 'select $1 + $2;' +LANGUAGE SQL +STABLE RETURNS NULL ON NULL INPUT; + +MERGE INTO t1 +USING s1 ON t1.id = s1.id +WHEN NOT MATCHED THEN + INSERT VALUES(s1.id, add_s(s1.val, 2)); + -- Test preventing "ON" join condition from writing to the database BEGIN; MERGE INTO t1 diff --git a/src/test/regress/sql/merge_arbitrary.sql b/src/test/regress/sql/merge_arbitrary.sql new file mode 100644 index 000000000..17b7d4f90 --- /dev/null +++ b/src/test/regress/sql/merge_arbitrary.sql @@ -0,0 +1,133 @@ +SHOW server_version \gset +SELECT substring(:'server_version', '\d+')::int >= 15 AS server_version_ge_15 +\gset +\if :server_version_ge_15 +\else +\q +\endif + +SET search_path TO merge_arbitrary_schema; +INSERT INTO target_cj VALUES (1, 'target', 0); +INSERT INTO target_cj VALUES (2, 'target', 0); +INSERT INTO target_cj VALUES (2, 'target', 0); +INSERT INTO target_cj VALUES (3, 'target', 0); + +INSERT INTO source_cj1 VALUES (2, 'source-1', 10); +INSERT INTO source_cj2 VALUES (2, 'source-2', 20); + +BEGIN; +MERGE INTO target_cj t +USING source_cj1 s1 INNER JOIN source_cj2 s2 ON sid1 = sid2 +ON t.tid = sid1 AND t.tid = 2 +WHEN MATCHED THEN + UPDATE SET src = src2 +WHEN NOT MATCHED THEN + DO NOTHING; +SELECT * FROM target_cj ORDER BY 1; +ROLLBACK; + +BEGIN; +-- try accessing columns from either side of the source join +MERGE INTO target_cj t +USING source_cj1 s2 + INNER JOIN source_cj2 s1 ON sid1 = sid2 AND val1 = 10 +ON t.tid = sid1 AND t.tid = 2 +WHEN MATCHED THEN + UPDATE SET src = src1, val = val2 +WHEN NOT MATCHED THEN + DO NOTHING; +SELECT * FROM target_cj ORDER BY 1; +ROLLBACK; + +-- Test PREPARE +PREPARE insert(int, int, int) AS +MERGE INTO prept +USING (SELECT $2, s1, s2 FROM preps WHERE s2 > $3) as foo +ON prept.t1 = foo.s1 +WHEN MATCHED THEN 
+ UPDATE SET t2 = t2 + $1 +WHEN NOT MATCHED THEN + INSERT VALUES(s1, s2); + +PREPARE delete(int) AS +MERGE INTO prept +USING preps +ON prept.t1 = preps.s1 +WHEN MATCHED AND prept.t2 = $1 THEN + DELETE +WHEN MATCHED THEN + UPDATE SET t2 = t2 + 1; + +INSERT INTO prept VALUES(100, 0); + +INSERT INTO preps VALUES(100, 0); +INSERT INTO preps VALUES(200, 0); + +EXECUTE insert(1, 1, -1); EXECUTE delete(0); +EXECUTE insert(1, 1, -1); EXECUTE delete(0); +EXECUTE insert(1, 1, -1); EXECUTE delete(0); +EXECUTE insert(1, 1, -1); EXECUTE delete(0); +EXECUTE insert(1, 1, -1); EXECUTE delete(0); + +-- sixth time +EXECUTE insert(1, 1, -1); EXECUTE delete(0); +EXECUTE insert(1, 1, -1); EXECUTE delete(0); + +-- Should have the counter as 14 (7 * 2) +SELECT * FROM prept; + +-- Test local tables +INSERT INTO s1 VALUES(1, 0); -- Matches DELETE clause +INSERT INTO s1 VALUES(2, 1); -- Matches UPDATE clause +INSERT INTO s1 VALUES(3, 1); -- No Match INSERT clause +INSERT INTO s1 VALUES(4, 1); -- No Match INSERT clause +INSERT INTO s1 VALUES(6, 1); -- No Match INSERT clause + +INSERT INTO t1 VALUES(1, 0); -- Will be deleted +INSERT INTO t1 VALUES(2, 0); -- Will be updated +INSERT INTO t1 VALUES(5, 0); -- Will be intact + +PREPARE local(int, int) AS +WITH s1_res AS ( + SELECT * FROM s1 +) +MERGE INTO t1 + USING s1_res ON (s1_res.id = t1.id) + + WHEN MATCHED AND s1_res.val = $1 THEN + DELETE + WHEN MATCHED THEN + UPDATE SET val = t1.val + $2 + WHEN NOT MATCHED THEN + INSERT (id, val) VALUES (s1_res.id, s1_res.val); + +BEGIN; +EXECUTE local(0, 1); +SELECT * FROM t1 order by id; +ROLLBACK; + +BEGIN; +EXECUTE local(0, 1); +ROLLBACK; + +BEGIN; +EXECUTE local(0, 1); +ROLLBACK; + +BEGIN; +EXECUTE local(0, 1); +ROLLBACK; + +BEGIN; +EXECUTE local(0, 1); +ROLLBACK; + +-- sixth time +BEGIN; +EXECUTE local(0, 1); +ROLLBACK; + +BEGIN; +EXECUTE local(0, 1); +SELECT * FROM t1 order by id; +ROLLBACK; diff --git a/src/test/regress/sql/merge_arbitrary_create.sql 
b/src/test/regress/sql/merge_arbitrary_create.sql new file mode 100644 index 000000000..edf9b0d9d --- /dev/null +++ b/src/test/regress/sql/merge_arbitrary_create.sql @@ -0,0 +1,50 @@ +SHOW server_version \gset +SELECT substring(:'server_version', '\d+')::int >= 15 AS server_version_ge_15 +\gset +\if :server_version_ge_15 +\else +\q +\endif + +DROP SCHEMA IF EXISTS merge_arbitrary_schema CASCADE; +CREATE SCHEMA merge_arbitrary_schema; +SET search_path TO merge_arbitrary_schema; +SET citus.shard_count TO 4; +SET citus.next_shard_id TO 6000000; +CREATE TABLE target_cj(tid int, src text, val int); +CREATE TABLE source_cj1(sid1 int, src1 text, val1 int); +CREATE TABLE source_cj2(sid2 int, src2 text, val2 int); + +SELECT create_distributed_table('target_cj', 'tid'); +SELECT create_distributed_table('source_cj1', 'sid1'); +SELECT create_distributed_table('source_cj2', 'sid2'); + +CREATE TABLE prept(t1 int, t2 int); +CREATE TABLE preps(s1 int, s2 int); + +SELECT create_distributed_table('prept', 't1'), create_distributed_table('preps', 's1'); + +PREPARE insert(int, int, int) AS +MERGE INTO prept +USING (SELECT $2, s1, s2 FROM preps WHERE s2 > $3) as foo +ON prept.t1 = foo.s1 +WHEN MATCHED THEN + UPDATE SET t2 = t2 + $1 +WHEN NOT MATCHED THEN + INSERT VALUES(s1, s2); + +PREPARE delete(int) AS +MERGE INTO prept +USING preps +ON prept.t1 = preps.s1 +WHEN MATCHED AND prept.t2 = $1 THEN + DELETE +WHEN MATCHED THEN + UPDATE SET t2 = t2 + 1; + +-- Citus local tables +CREATE TABLE t1(id int, val int); +CREATE TABLE s1(id int, val int); + +SELECT citus_add_local_table_to_metadata('t1'); +SELECT citus_add_local_table_to_metadata('s1'); diff --git a/src/test/regress/sql/pg15.sql b/src/test/regress/sql/pg15.sql index 121b41f86..ac8062c65 100644 --- a/src/test/regress/sql/pg15.sql +++ b/src/test/regress/sql/pg15.sql @@ -269,16 +269,21 @@ WITH targq AS ( MERGE INTO tbl1 USING targq ON (true) WHEN MATCHED THEN DELETE; --- crashes on beta3, fixed on 15 stable ---WITH foo AS ( --- MERGE 
INTO tbl1 USING tbl2 ON (true) --- WHEN MATCHED THEN DELETE ---) SELECT * FROM foo; +WITH foo AS ( + MERGE INTO tbl1 USING tbl2 ON (true) + WHEN MATCHED THEN DELETE +) SELECT * FROM foo; ---COPY ( --- MERGE INTO tbl1 USING tbl2 ON (true) --- WHEN MATCHED THEN DELETE ---) TO stdout; +COPY ( + MERGE INTO tbl1 USING tbl2 ON (true) + WHEN MATCHED THEN DELETE +) TO stdout; + +MERGE INTO tbl1 t +USING tbl2 +ON (true) +WHEN MATCHED THEN + DO NOTHING; MERGE INTO tbl1 t USING tbl2 diff --git a/src/test/regress/sql/pgmerge.sql b/src/test/regress/sql/pgmerge.sql index 83bf01a68..9b828f27e 100644 --- a/src/test/regress/sql/pgmerge.sql +++ b/src/test/regress/sql/pgmerge.sql @@ -1172,12 +1172,14 @@ INSERT INTO pa_target SELECT '2017-02-28', id, id * 100, 'initial' FROM generate SET client_min_messages TO DEBUG1; BEGIN; MERGE INTO pa_target t - USING (SELECT '2017-01-15' AS slogts, * FROM pa_source WHERE sid < 10) s + USING (SELECT * FROM pa_source WHERE sid < 10) s + --USING (SELECT '2017-01-15' AS slogts, * FROM pa_source WHERE sid < 10) s ON t.tid = s.sid WHEN MATCHED THEN UPDATE SET balance = balance + delta, val = val || ' updated by merge' WHEN NOT MATCHED THEN - INSERT VALUES (slogts::timestamp, sid, delta, 'inserted by merge'); + INSERT VALUES ('2017-01-15', sid, delta, 'inserted by merge'); + --INSERT VALUES (slogts::timestamp, sid, delta, 'inserted by merge'); SELECT * FROM pa_target ORDER BY tid; ROLLBACK; RESET client_min_messages; diff --git a/src/test/regress/sql_schedule b/src/test/regress/sql_schedule index f07f7af9a..272a84eff 100644 --- a/src/test/regress/sql_schedule +++ b/src/test/regress/sql_schedule @@ -14,3 +14,4 @@ test: arbitrary_configs_truncate test: arbitrary_configs_truncate_cascade test: arbitrary_configs_truncate_partition test: arbitrary_configs_alter_table_add_constraint_without_name +test: merge_arbitrary