mirror of https://github.com/citusdata/citus.git
Support MERGE on distributed tables with restrictions
This implements the phase - II of MERGE sql support Support routable query where all the tables in the merge-sql are distributed, co-located, and both the source and target relations are joined on the distribution column with a constant qual. This should be a Citus single-task query. Below is an example. SELECT create_distributed_table('t1', 'id'); SELECT create_distributed_table('s1', 'id', colocate_with => ‘t1’); MERGE INTO t1 USING s1 ON t1.id = s1.id AND t1.id = 100 WHEN MATCHED THEN UPDATE SET val = s1.val + 10 WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (id, val, src) VALUES (s1.id, s1.val, s1.src) Basically, MERGE checks to see if There are a minimum of two distributed tables (source and a target). All the distributed tables are indeed colocated. MERGE relations are joined on the distribution column MERGE .. USING .. ON target.dist_key = source.dist_key The query should touch only a single shard i.e. JOIN AND with a constant qual MERGE .. USING .. ON target.dist_key = source.dist_key AND target.dist_key = <> If any of the conditions are not met, it raises an exception. (cherry picked from commitpull/6772/head44c387b978
) This implements MERGE phase3 Support pushdown query where all the tables in the merge-sql are Citus-distributed, co-located, and both the source and target relations are joined on the distribution column. This will generate multiple tasks which execute independently after pushdown. SELECT create_distributed_table('t1', 'id'); SELECT create_distributed_table('s1', 'id', colocate_with => ‘t1’); MERGE INTO t1 USING s1 ON t1.id = s1.id WHEN MATCHED THEN UPDATE SET val = s1.val + 10 WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (id, val, src) VALUES (s1.id, s1.val, s1.src) *The only exception for both the phases II and III is, UPDATEs and INSERTs must be done on the same shard-group as the joined key; for example, below scenarios are NOT supported as the key-value to be inserted/updated is not guaranteed to be on the same node as the id distribution-column. MERGE INTO target t USING source s ON (t.customer_id = s.customer_id) WHEN NOT MATCHED THEN - - INSERT(customer_id, …) VALUES (<non-local-constant-key-value>, ……); OR this scenario where we update the distribution column itself MERGE INTO target t USING source s On (t.customer_id = s.customer_id) WHEN MATCHED THEN UPDATE SET customer_id = 100; (cherry picked from commitfa7b8949a8
)
parent
b8b85072d6
commit
1e42cd3da0
|
@ -75,9 +75,6 @@ static uint64 NextPlanId = 1;
|
|||
/* keep track of planner call stack levels */
|
||||
int PlannerLevel = 0;
|
||||
|
||||
static void ErrorIfQueryHasUnsupportedMergeCommand(Query *queryTree,
|
||||
List *rangeTableList);
|
||||
static bool ContainsMergeCommandWalker(Node *node);
|
||||
static bool ListContainsDistributedTableRTE(List *rangeTableList,
|
||||
bool *maybeHasForeignDistributedTable);
|
||||
static bool IsUpdateOrDelete(Query *query);
|
||||
|
@ -132,7 +129,7 @@ static PlannedStmt * PlanDistributedStmt(DistributedPlanningContext *planContext
|
|||
static RTEListProperties * GetRTEListProperties(List *rangeTableList);
|
||||
static List * TranslatedVars(PlannerInfo *root, int relationIndex);
|
||||
static void WarnIfListHasForeignDistributedTable(List *rangeTableList);
|
||||
static void ErrorIfMergeHasUnsupportedTables(Query *parse, List *rangeTableList);
|
||||
|
||||
|
||||
/* Distributed planner hook */
|
||||
PlannedStmt *
|
||||
|
@ -200,12 +197,6 @@ distributed_planner(Query *parse,
|
|||
|
||||
if (!fastPathRouterQuery)
|
||||
{
|
||||
/*
|
||||
* Fast path queries cannot have merge command, and we
|
||||
* prevent the remaining here.
|
||||
*/
|
||||
ErrorIfQueryHasUnsupportedMergeCommand(parse, rangeTableList);
|
||||
|
||||
/*
|
||||
* When there are partitioned tables (not applicable to fast path),
|
||||
* pretend that they are regular tables to avoid unnecessary work
|
||||
|
@ -304,44 +295,11 @@ distributed_planner(Query *parse,
|
|||
}
|
||||
|
||||
|
||||
/*
|
||||
* ErrorIfQueryHasUnsupportedMergeCommand walks over the query tree and bails out
|
||||
* if there is no Merge command (e.g., CMD_MERGE) in the query tree. For merge,
|
||||
* looks for all supported combinations, throws an exception if any violations
|
||||
* are seen.
|
||||
*/
|
||||
static void
|
||||
ErrorIfQueryHasUnsupportedMergeCommand(Query *queryTree, List *rangeTableList)
|
||||
{
|
||||
/*
|
||||
* Postgres currently doesn't support Merge queries inside subqueries and
|
||||
* ctes, but lets be defensive and do query tree walk anyway.
|
||||
*
|
||||
* We do not call this path for fast-path queries to avoid this additional
|
||||
* overhead.
|
||||
*/
|
||||
if (!ContainsMergeCommandWalker((Node *) queryTree))
|
||||
{
|
||||
/* No MERGE found */
|
||||
return;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* In Citus we have limited support for MERGE, it's allowed
|
||||
* only if all the tables(target, source or any CTE) tables
|
||||
* are are local i.e. a combination of Citus local and Non-Citus
|
||||
* tables (regular Postgres tables).
|
||||
*/
|
||||
ErrorIfMergeHasUnsupportedTables(queryTree, rangeTableList);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* ContainsMergeCommandWalker walks over the node and finds if there are any
|
||||
* Merge command (e.g., CMD_MERGE) in the node.
|
||||
*/
|
||||
static bool
|
||||
bool
|
||||
ContainsMergeCommandWalker(Node *node)
|
||||
{
|
||||
#if PG_VERSION_NUM < PG_VERSION_15
|
||||
|
@ -676,7 +634,8 @@ bool
|
|||
IsUpdateOrDelete(Query *query)
|
||||
{
|
||||
return query->commandType == CMD_UPDATE ||
|
||||
query->commandType == CMD_DELETE;
|
||||
query->commandType == CMD_DELETE ||
|
||||
query->commandType == CMD_MERGE;
|
||||
}
|
||||
|
||||
|
||||
|
@ -2611,148 +2570,3 @@ WarnIfListHasForeignDistributedTable(List *rangeTableList)
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* IsMergeAllowedOnRelation takes a relation entry and checks if MERGE command is
|
||||
* permitted on special relations, such as materialized view, returns true only if
|
||||
* it's a "source" relation.
|
||||
*/
|
||||
bool
|
||||
IsMergeAllowedOnRelation(Query *parse, RangeTblEntry *rte)
|
||||
{
|
||||
if (!IsMergeQuery(parse))
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
RangeTblEntry *targetRte = rt_fetch(parse->resultRelation, parse->rtable);
|
||||
|
||||
/* Is it a target relation? */
|
||||
if (targetRte->relid == rte->relid)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* ErrorIfMergeHasUnsupportedTables checks if all the tables(target, source or any CTE
|
||||
* present) in the MERGE command are local i.e. a combination of Citus local and Non-Citus
|
||||
* tables (regular Postgres tables), raises an exception for all other combinations.
|
||||
*/
|
||||
static void
|
||||
ErrorIfMergeHasUnsupportedTables(Query *parse, List *rangeTableList)
|
||||
{
|
||||
ListCell *tableCell = NULL;
|
||||
|
||||
foreach(tableCell, rangeTableList)
|
||||
{
|
||||
RangeTblEntry *rangeTableEntry = (RangeTblEntry *) lfirst(tableCell);
|
||||
Oid relationId = rangeTableEntry->relid;
|
||||
|
||||
switch (rangeTableEntry->rtekind)
|
||||
{
|
||||
case RTE_RELATION:
|
||||
{
|
||||
/* Check the relation type */
|
||||
break;
|
||||
}
|
||||
|
||||
case RTE_SUBQUERY:
|
||||
case RTE_FUNCTION:
|
||||
case RTE_TABLEFUNC:
|
||||
case RTE_VALUES:
|
||||
case RTE_JOIN:
|
||||
case RTE_CTE:
|
||||
{
|
||||
/* Skip them as base table(s) will be checked */
|
||||
continue;
|
||||
}
|
||||
|
||||
/*
|
||||
* RTE_NAMEDTUPLESTORE is typically used in ephmeral named relations,
|
||||
* such as, trigger data; until we find a genuine use case, raise an
|
||||
* exception.
|
||||
* RTE_RESULT is a node added by the planner and we shouldn't
|
||||
* encounter it in the parse tree.
|
||||
*/
|
||||
case RTE_NAMEDTUPLESTORE:
|
||||
case RTE_RESULT:
|
||||
{
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||
errmsg("MERGE command is not supported with "
|
||||
"Tuplestores and results")));
|
||||
break;
|
||||
}
|
||||
|
||||
default:
|
||||
{
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||
errmsg("MERGE command: Unrecognized range table entry.")));
|
||||
}
|
||||
}
|
||||
|
||||
/* RTE Relation can be of various types, check them now */
|
||||
|
||||
/* skip the regular views as they are replaced with subqueries */
|
||||
if (rangeTableEntry->relkind == RELKIND_VIEW)
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
if (rangeTableEntry->relkind == RELKIND_MATVIEW ||
|
||||
rangeTableEntry->relkind == RELKIND_FOREIGN_TABLE)
|
||||
{
|
||||
/* Materialized view or Foreign table as target is not allowed */
|
||||
if (IsMergeAllowedOnRelation(parse, rangeTableEntry))
|
||||
{
|
||||
/* Non target relation is ok */
|
||||
continue;
|
||||
}
|
||||
else
|
||||
{
|
||||
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||
errmsg("MERGE command is not allowed "
|
||||
"on materialized view")));
|
||||
}
|
||||
}
|
||||
|
||||
if (rangeTableEntry->relkind != RELKIND_RELATION &&
|
||||
rangeTableEntry->relkind != RELKIND_PARTITIONED_TABLE)
|
||||
{
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||
errmsg("Unexpected relation type(relkind:%c) in MERGE command",
|
||||
rangeTableEntry->relkind)));
|
||||
}
|
||||
|
||||
Assert(rangeTableEntry->relid != 0);
|
||||
|
||||
/* Distributed tables and Reference tables are not supported yet */
|
||||
if (IsCitusTableType(relationId, REFERENCE_TABLE) ||
|
||||
IsCitusTableType(relationId, DISTRIBUTED_TABLE))
|
||||
{
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||
errmsg("MERGE command is not supported on "
|
||||
"distributed/reference tables yet")));
|
||||
}
|
||||
|
||||
/* Regular Postgres tables and Citus local tables are allowed */
|
||||
if (!IsCitusTable(relationId) ||
|
||||
IsCitusTableType(relationId, CITUS_LOCAL_TABLE))
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
|
||||
/* Any other Citus table type missing ? */
|
||||
}
|
||||
|
||||
/* All the tables are local, supported */
|
||||
}
|
||||
|
|
|
@ -54,8 +54,6 @@
|
|||
bool EnableFastPathRouterPlanner = true;
|
||||
|
||||
static bool ColumnAppearsMultipleTimes(Node *quals, Var *distributionKey);
|
||||
static bool ConjunctionContainsColumnFilter(Node *node, Var *column,
|
||||
Node **distributionKeyValue);
|
||||
static bool DistKeyInSimpleOpExpression(Expr *clause, Var *distColumn,
|
||||
Node **distributionKeyValue);
|
||||
|
||||
|
@ -294,7 +292,7 @@ ColumnAppearsMultipleTimes(Node *quals, Var *distributionKey)
|
|||
*
|
||||
* If the conjuction contains column filter which is const, distributionKeyValue is set.
|
||||
*/
|
||||
static bool
|
||||
bool
|
||||
ConjunctionContainsColumnFilter(Node *node, Var *column, Node **distributionKeyValue)
|
||||
{
|
||||
if (node == NULL)
|
||||
|
|
|
@ -2225,14 +2225,17 @@ QueryPushdownSqlTaskList(Query *query, uint64 jobId,
|
|||
}
|
||||
|
||||
/*
|
||||
* For left joins we don't care about the shards pruned for the right hand side.
|
||||
* If the right hand side would prune to a smaller set we should still send it to
|
||||
* all tables of the left hand side. However if the right hand side is bigger than
|
||||
* the left hand side we don't have to send the query to any shard that is not
|
||||
* matching anything on the left hand side.
|
||||
* For left joins we don't care about the shards pruned for
|
||||
* the right hand side. If the right hand side would prune
|
||||
* to a smaller set we should still send it to all tables
|
||||
* of the left hand side. However if the right hand side is
|
||||
* bigger than the left hand side we don't have to send the
|
||||
* query to any shard that is not matching anything on the
|
||||
* left hand side.
|
||||
*
|
||||
* Instead we will simply skip any RelationRestriction if it is an OUTER join and
|
||||
* the table is part of the non-outer side of the join.
|
||||
* Instead we will simply skip any RelationRestriction if it
|
||||
* is an OUTER join and the table is part of the non-outer
|
||||
* side of the join.
|
||||
*/
|
||||
if (IsInnerTableOfOuterJoin(relationRestriction))
|
||||
{
|
||||
|
|
|
@ -185,7 +185,6 @@ static DeferredErrorMessage * TargetlistAndFunctionsSupported(Oid resultRelation
|
|||
List *targetList,
|
||||
CmdType commandType,
|
||||
List *returningList);
|
||||
|
||||
/*
|
||||
* CreateRouterPlan attempts to create a router executor plan for the given
|
||||
* SELECT statement. ->planningError is set if planning fails.
|
||||
|
@ -905,6 +904,85 @@ NodeIsFieldStore(Node *node)
|
|||
}
|
||||
|
||||
|
||||
/*
|
||||
* MergeQuerySupported does check for a MERGE command in the query, if it finds
|
||||
* one, it will verify the below criteria
|
||||
* - Supported tables and combinations in ErrorIfMergeHasUnsupportedTables
|
||||
* - Distributed tables requirements in ErrorIfDistTablesNotColocated
|
||||
* - Checks target-lists and functions-in-quals in TargetlistAndFunctionsSupported
|
||||
*/
|
||||
static DeferredErrorMessage *
|
||||
MergeQuerySupported(Query *originalQuery,
|
||||
PlannerRestrictionContext *plannerRestrictionContext)
|
||||
{
|
||||
/* For non-MERGE commands it's a no-op */
|
||||
if (!QueryHasMergeCommand(originalQuery))
|
||||
{
|
||||
return NULL;
|
||||
}
|
||||
|
||||
List *rangeTableList = ExtractRangeTableEntryList(originalQuery);
|
||||
RangeTblEntry *resultRte = ExtractResultRelationRTE(originalQuery);
|
||||
|
||||
/*
|
||||
* Fast path queries cannot have merge command, and we prevent the remaining here.
|
||||
* In Citus we have limited support for MERGE, it's allowed only if all
|
||||
* the tables(target, source or any CTE) tables are are local i.e. a
|
||||
* combination of Citus local and Non-Citus tables (regular Postgres tables)
|
||||
* or distributed tables with some restrictions, please see header of routine
|
||||
* ErrorIfDistTablesNotColocated for details.
|
||||
*/
|
||||
DeferredErrorMessage *deferredError =
|
||||
ErrorIfMergeHasUnsupportedTables(originalQuery,
|
||||
rangeTableList,
|
||||
plannerRestrictionContext);
|
||||
if (deferredError)
|
||||
{
|
||||
return deferredError;
|
||||
}
|
||||
|
||||
Oid resultRelationId = resultRte->relid;
|
||||
deferredError =
|
||||
TargetlistAndFunctionsSupported(resultRelationId,
|
||||
originalQuery->jointree,
|
||||
originalQuery->jointree->quals,
|
||||
originalQuery->targetList,
|
||||
originalQuery->commandType,
|
||||
originalQuery->returningList);
|
||||
if (deferredError)
|
||||
{
|
||||
return deferredError;
|
||||
}
|
||||
|
||||
#if PG_VERSION_NUM >= PG_VERSION_15
|
||||
|
||||
/*
|
||||
* MERGE is a special case where we have multiple modify statements
|
||||
* within itself. Check each INSERT/UPDATE/DELETE individually.
|
||||
*/
|
||||
MergeAction *action = NULL;
|
||||
foreach_ptr(action, originalQuery->mergeActionList)
|
||||
{
|
||||
Assert(originalQuery->returningList == NULL);
|
||||
deferredError =
|
||||
TargetlistAndFunctionsSupported(resultRelationId,
|
||||
originalQuery->jointree,
|
||||
action->qual,
|
||||
action->targetList,
|
||||
action->commandType,
|
||||
originalQuery->returningList);
|
||||
if (deferredError)
|
||||
{
|
||||
return deferredError;
|
||||
}
|
||||
}
|
||||
|
||||
#endif
|
||||
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* ModifyQuerySupported returns NULL if the query only contains supported
|
||||
* features, otherwise it returns an error description.
|
||||
|
@ -920,8 +998,17 @@ ModifyQuerySupported(Query *queryTree, Query *originalQuery, bool multiShardQuer
|
|||
PlannerRestrictionContext *plannerRestrictionContext)
|
||||
{
|
||||
Oid distributedTableId = InvalidOid;
|
||||
DeferredErrorMessage *error = ModifyPartialQuerySupported(queryTree, multiShardQuery,
|
||||
&distributedTableId);
|
||||
DeferredErrorMessage *error = MergeQuerySupported(originalQuery,
|
||||
plannerRestrictionContext);
|
||||
if (error)
|
||||
{
|
||||
/*
|
||||
* For MERGE, we do not do recursive plannning, simply bail out.
|
||||
*/
|
||||
RaiseDeferredError(error, ERROR);
|
||||
}
|
||||
|
||||
error = ModifyPartialQuerySupported(queryTree, multiShardQuery, &distributedTableId);
|
||||
if (error)
|
||||
{
|
||||
return error;
|
||||
|
@ -3969,3 +4056,263 @@ CompareInsertValuesByShardId(const void *leftElement, const void *rightElement)
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* IsMergeAllowedOnRelation takes a relation entry and checks if MERGE command is
|
||||
* permitted on special relations, such as materialized view, returns true only if
|
||||
* it's a "source" relation.
|
||||
*/
|
||||
bool
|
||||
IsMergeAllowedOnRelation(Query *parse, RangeTblEntry *rte)
|
||||
{
|
||||
if (!IsMergeQuery(parse))
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
RangeTblEntry *targetRte = rt_fetch(parse->resultRelation, parse->rtable);
|
||||
|
||||
/* Is it a target relation? */
|
||||
if (targetRte->relid == rte->relid)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* ErrorIfDistTablesNotColocated Checks to see if
|
||||
*
|
||||
* - There are a minimum of two distributed tables (source and a target).
|
||||
* - All the distributed tables are indeed colocated.
|
||||
* - MERGE relations are joined on the distribution column
|
||||
* MERGE .. USING .. ON target.dist_key = source.dist_key
|
||||
*
|
||||
* If any of the conditions are not met, it raises an exception.
|
||||
*/
|
||||
static DeferredErrorMessage *
|
||||
ErrorIfDistTablesNotColocated(Query *parse, List *distTablesList,
|
||||
PlannerRestrictionContext *plannerRestrictionContext)
|
||||
{
|
||||
/* All MERGE tables must be distributed */
|
||||
if (list_length(distTablesList) < 2)
|
||||
{
|
||||
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
|
||||
"For MERGE command, both the source and target "
|
||||
"must be distributed", NULL, NULL);
|
||||
}
|
||||
|
||||
/* All distributed tables must be colocated */
|
||||
if (!AllRelationsInListColocated(distTablesList, RANGETABLE_ENTRY))
|
||||
{
|
||||
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
|
||||
"For MERGE command, all the distributed tables "
|
||||
"must be colocated", NULL, NULL);
|
||||
}
|
||||
|
||||
/* Are source and target tables joined on distribution column? */
|
||||
if (!RestrictionEquivalenceForPartitionKeys(plannerRestrictionContext))
|
||||
{
|
||||
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
|
||||
"MERGE command is only supported when distributed "
|
||||
"tables are joined on their distribution column",
|
||||
NULL, NULL);
|
||||
}
|
||||
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* ErrorIfMergeHasUnsupportedTables checks if all the tables(target, source or any CTE
|
||||
* present) in the MERGE command are local i.e. a combination of Citus local and Non-Citus
|
||||
* tables (regular Postgres tables), or distributed tables with some restrictions, please
|
||||
* see header of routine ErrorIfDistTablesNotColocated for details, raises an exception
|
||||
* for all other combinations.
|
||||
*/
|
||||
static DeferredErrorMessage *
|
||||
ErrorIfMergeHasUnsupportedTables(Query *parse, List *rangeTableList,
|
||||
PlannerRestrictionContext *restrictionContext)
|
||||
{
|
||||
List *distTablesList = NIL;
|
||||
bool foundLocalTables = false;
|
||||
|
||||
RangeTblEntry *rangeTableEntry = NULL;
|
||||
foreach_ptr(rangeTableEntry, rangeTableList)
|
||||
{
|
||||
Oid relationId = rangeTableEntry->relid;
|
||||
|
||||
switch (rangeTableEntry->rtekind)
|
||||
{
|
||||
case RTE_RELATION:
|
||||
{
|
||||
/* Check the relation type */
|
||||
break;
|
||||
}
|
||||
|
||||
case RTE_SUBQUERY:
|
||||
case RTE_FUNCTION:
|
||||
case RTE_TABLEFUNC:
|
||||
case RTE_VALUES:
|
||||
case RTE_JOIN:
|
||||
case RTE_CTE:
|
||||
{
|
||||
/* Skip them as base table(s) will be checked */
|
||||
continue;
|
||||
}
|
||||
|
||||
/*
|
||||
* RTE_NAMEDTUPLESTORE is typically used in ephmeral named relations,
|
||||
* such as, trigger data; until we find a genuine use case, raise an
|
||||
* exception.
|
||||
* RTE_RESULT is a node added by the planner and we shouldn't
|
||||
* encounter it in the parse tree.
|
||||
*/
|
||||
case RTE_NAMEDTUPLESTORE:
|
||||
case RTE_RESULT:
|
||||
{
|
||||
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
|
||||
"MERGE command is not supported with "
|
||||
"Tuplestores and results",
|
||||
NULL, NULL);
|
||||
}
|
||||
|
||||
default:
|
||||
{
|
||||
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
|
||||
"MERGE command: Unrecognized range table entry.",
|
||||
NULL, NULL);
|
||||
}
|
||||
}
|
||||
|
||||
/* RTE Relation can be of various types, check them now */
|
||||
|
||||
/* skip the regular views as they are replaced with subqueries */
|
||||
if (rangeTableEntry->relkind == RELKIND_VIEW)
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
if (rangeTableEntry->relkind == RELKIND_MATVIEW ||
|
||||
rangeTableEntry->relkind == RELKIND_FOREIGN_TABLE)
|
||||
{
|
||||
/* Materialized view or Foreign table as target is not allowed */
|
||||
if (IsMergeAllowedOnRelation(parse, rangeTableEntry))
|
||||
{
|
||||
/* Non target relation is ok */
|
||||
continue;
|
||||
}
|
||||
else
|
||||
{
|
||||
/* Usually we don't reach this exception as the Postgres parser catches it */
|
||||
StringInfo errorMessage = makeStringInfo();
|
||||
appendStringInfo(errorMessage,
|
||||
"MERGE command is not allowed on "
|
||||
"relation type(relkind:%c)", rangeTableEntry->relkind);
|
||||
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, errorMessage->data,
|
||||
NULL, NULL);
|
||||
}
|
||||
}
|
||||
|
||||
if (rangeTableEntry->relkind != RELKIND_RELATION &&
|
||||
rangeTableEntry->relkind != RELKIND_PARTITIONED_TABLE)
|
||||
{
|
||||
StringInfo errorMessage = makeStringInfo();
|
||||
appendStringInfo(errorMessage, "Unexpected table type(relkind:%c) "
|
||||
"in MERGE command", rangeTableEntry->relkind);
|
||||
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, errorMessage->data,
|
||||
NULL, NULL);
|
||||
}
|
||||
|
||||
Assert(rangeTableEntry->relid != 0);
|
||||
|
||||
/* Reference tables are not supported yet */
|
||||
if (IsCitusTableType(relationId, REFERENCE_TABLE))
|
||||
{
|
||||
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
|
||||
"MERGE command is not supported on reference "
|
||||
"tables yet", NULL, NULL);
|
||||
}
|
||||
|
||||
/* Append/Range tables are not supported */
|
||||
if (IsCitusTableType(relationId, APPEND_DISTRIBUTED) ||
|
||||
IsCitusTableType(relationId, RANGE_DISTRIBUTED))
|
||||
{
|
||||
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
|
||||
"For MERGE command, all the distributed tables "
|
||||
"must be colocated, for append/range distribution, "
|
||||
"colocation is not supported", NULL,
|
||||
"Consider using hash distribution instead");
|
||||
}
|
||||
|
||||
/*
|
||||
* For now, save all distributed tables, later (below) we will
|
||||
* check for supported combination(s).
|
||||
*/
|
||||
if (IsCitusTableType(relationId, DISTRIBUTED_TABLE))
|
||||
{
|
||||
distTablesList = lappend(distTablesList, rangeTableEntry);
|
||||
continue;
|
||||
}
|
||||
|
||||
/* Regular Postgres tables and Citus local tables are allowed */
|
||||
if (!IsCitusTable(relationId) ||
|
||||
IsCitusTableType(relationId, CITUS_LOCAL_TABLE))
|
||||
{
|
||||
foundLocalTables = true;
|
||||
continue;
|
||||
}
|
||||
|
||||
/* Any other Citus table type missing ? */
|
||||
}
|
||||
|
||||
/* Ensure all tables are indeed local */
|
||||
if (foundLocalTables && list_length(distTablesList) == 0)
|
||||
{
|
||||
/* All the tables are local, supported */
|
||||
return NULL;
|
||||
}
|
||||
else if (foundLocalTables && list_length(distTablesList) > 0)
|
||||
{
|
||||
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
|
||||
"MERGE command is not supported with "
|
||||
"combination of distributed/local tables yet",
|
||||
NULL, NULL);
|
||||
}
|
||||
|
||||
/* Ensure all distributed tables are indeed co-located */
|
||||
return ErrorIfDistTablesNotColocated(parse, distTablesList, restrictionContext);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* QueryHasMergeCommand walks over the query tree and returns false if there
|
||||
* is no Merge command (e.g., CMD_MERGE), true otherwise.
|
||||
*/
|
||||
static bool
|
||||
QueryHasMergeCommand(Query *queryTree)
|
||||
{
|
||||
/* function is void for pre-15 versions of Postgres */
|
||||
#if PG_VERSION_NUM < PG_VERSION_15
|
||||
return false;
|
||||
#else
|
||||
|
||||
/*
|
||||
* Postgres currently doesn't support Merge queries inside subqueries and
|
||||
* ctes, but lets be defensive and do query tree walk anyway.
|
||||
*
|
||||
* We do not call this path for fast-path queries to avoid this additional
|
||||
* overhead.
|
||||
*/
|
||||
if (!ContainsMergeCommandWalker((Node *) queryTree))
|
||||
{
|
||||
/* No MERGE found */
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
#endif
|
||||
}
|
||||
|
|
|
@ -151,8 +151,6 @@ static void ListConcatUniqueAttributeClassMemberLists(AttributeEquivalenceClass
|
|||
secondClass);
|
||||
static Var * PartitionKeyForRTEIdentityInQuery(Query *query, int targetRTEIndex,
|
||||
Index *partitionKeyIndex);
|
||||
static bool AllRelationsInRestrictionContextColocated(RelationRestrictionContext *
|
||||
restrictionContext);
|
||||
static bool IsNotSafeRestrictionToRecursivelyPlan(Node *node);
|
||||
static JoinRestrictionContext * FilterJoinRestrictionContext(
|
||||
JoinRestrictionContext *joinRestrictionContext, Relids
|
||||
|
@ -383,7 +381,8 @@ SafeToPushdownUnionSubquery(Query *originalQuery,
|
|||
return false;
|
||||
}
|
||||
|
||||
if (!AllRelationsInRestrictionContextColocated(restrictionContext))
|
||||
if (!AllRelationsInListColocated(restrictionContext->relationRestrictionList,
|
||||
RESTRICTION_CONTEXT))
|
||||
{
|
||||
/* distribution columns are equal, but tables are not co-located */
|
||||
return false;
|
||||
|
@ -1919,19 +1918,33 @@ FindQueryContainingRTEIdentityInternal(Node *node,
|
|||
|
||||
|
||||
/*
|
||||
* AllRelationsInRestrictionContextColocated determines whether all of the relations in the
|
||||
* given relation restrictions list are co-located.
|
||||
* AllRelationsInListColocated determines whether all of the relations in the
|
||||
* given list are co-located.
|
||||
* Note: The list can be of dofferent types, which is specified by ListEntryType
|
||||
*/
|
||||
static bool
|
||||
AllRelationsInRestrictionContextColocated(RelationRestrictionContext *restrictionContext)
|
||||
bool
|
||||
AllRelationsInListColocated(List *relationList, ListEntryType entryType)
|
||||
{
|
||||
void *varPtr = NULL;
|
||||
RangeTblEntry *rangeTableEntry = NULL;
|
||||
RelationRestriction *relationRestriction = NULL;
|
||||
int initialColocationId = INVALID_COLOCATION_ID;
|
||||
|
||||
/* check whether all relations exists in the main restriction list */
|
||||
foreach_ptr(relationRestriction, restrictionContext->relationRestrictionList)
|
||||
foreach_ptr(varPtr, relationList)
|
||||
{
|
||||
Oid relationId = relationRestriction->relationId;
|
||||
Oid relationId = InvalidOid;
|
||||
|
||||
if (entryType == RANGETABLE_ENTRY)
|
||||
{
|
||||
rangeTableEntry = (RangeTblEntry *) varPtr;
|
||||
relationId = rangeTableEntry->relid;
|
||||
}
|
||||
else if (entryType == RESTRICTION_CONTEXT)
|
||||
{
|
||||
relationRestriction = (RelationRestriction *) varPtr;
|
||||
relationId = relationRestriction->relationId;
|
||||
}
|
||||
|
||||
if (IsCitusTable(relationId) && !HasDistributionKey(relationId))
|
||||
{
|
||||
|
|
|
@ -256,5 +256,9 @@ extern struct DistributedPlan * CreateDistributedPlan(uint64 planId,
|
|||
plannerRestrictionContext);
|
||||
|
||||
extern bool IsMergeAllowedOnRelation(Query *parse, RangeTblEntry *rte);
|
||||
extern bool ConjunctionContainsColumnFilter(Node *node,
|
||||
Var *column,
|
||||
Node **distributionKeyValue);
|
||||
extern bool ContainsMergeCommandWalker(Node *node);
|
||||
|
||||
#endif /* DISTRIBUTED_PLANNER_H */
|
||||
|
|
|
@ -17,6 +17,15 @@
|
|||
|
||||
#define SINGLE_RTE_INDEX 1
|
||||
|
||||
/*
|
||||
* Represents the pointer type that's being passed in the list.
|
||||
*/
|
||||
typedef enum ListEntryType
|
||||
{
|
||||
RANGETABLE_ENTRY, /* RangeTblEntry */
|
||||
RESTRICTION_CONTEXT /* RelationRestriction */
|
||||
} ListEntryType;
|
||||
|
||||
extern bool AllDistributionKeysInQueryAreEqual(Query *originalQuery,
|
||||
PlannerRestrictionContext *
|
||||
plannerRestrictionContext);
|
||||
|
@ -54,4 +63,6 @@ extern RelationRestrictionContext * FilterRelationRestrictionContext(
|
|||
RelationRestrictionContext *relationRestrictionContext,
|
||||
Relids
|
||||
queryRteIdentities);
|
||||
extern bool AllRelationsInListColocated(List *relationList, ListEntryType entryType);
|
||||
|
||||
#endif /* RELATION_RESTRICTION_EQUIVALENCE_H */
|
||||
|
|
|
@ -28,6 +28,10 @@ s/\(ref_id\)=\([0-9]+\)/(ref_id)=(X)/g
|
|||
# shard table names for multi_subtransactions
|
||||
s/"t2_[0-9]+"/"t2_xxxxxxx"/g
|
||||
|
||||
# shard table names for MERGE tests
|
||||
s/merge_schema\.([_a-z0-9]+)_40[0-9]+ /merge_schema.\1_xxxxxxx /g
|
||||
s/pgmerge_schema\.([_a-z0-9]+)_40[0-9]+ /pgmerge_schema.\1_xxxxxxx /g
|
||||
|
||||
# shard table names for multi_subquery
|
||||
s/ keyval(1|2|ref)_[0-9]+ / keyval\1_xxxxxxx /g
|
||||
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -315,7 +315,7 @@ SELECT create_reference_table('tbl2');
|
|||
|
||||
MERGE INTO tbl1 USING tbl2 ON (true)
|
||||
WHEN MATCHED THEN DELETE;
|
||||
ERROR: MERGE command is not supported on distributed/reference tables yet
|
||||
ERROR: MERGE command is not supported on reference tables yet
|
||||
-- now, both are reference, still not supported
|
||||
SELECT create_reference_table('tbl1');
|
||||
create_reference_table
|
||||
|
@ -325,7 +325,7 @@ SELECT create_reference_table('tbl1');
|
|||
|
||||
MERGE INTO tbl1 USING tbl2 ON (true)
|
||||
WHEN MATCHED THEN DELETE;
|
||||
ERROR: MERGE command is not supported on distributed/reference tables yet
|
||||
ERROR: MERGE command is not supported on reference tables yet
|
||||
-- now, both distributed, not works
|
||||
SELECT undistribute_table('tbl1');
|
||||
NOTICE: creating a new table for pg15.tbl1
|
||||
|
@ -419,14 +419,14 @@ SELECT create_distributed_table('tbl2', 'x');
|
|||
|
||||
MERGE INTO tbl1 USING tbl2 ON (true)
|
||||
WHEN MATCHED THEN DELETE;
|
||||
ERROR: MERGE command is not supported on distributed/reference tables yet
|
||||
ERROR: MERGE command is only supported when distributed tables are joined on their distribution column
|
||||
-- also, not inside subqueries & ctes
|
||||
WITH targq AS (
|
||||
SELECT * FROM tbl2
|
||||
)
|
||||
MERGE INTO tbl1 USING targq ON (true)
|
||||
WHEN MATCHED THEN DELETE;
|
||||
ERROR: MERGE command is not supported on distributed/reference tables yet
|
||||
ERROR: MERGE command is only supported when distributed tables are joined on their distribution column
|
||||
-- crashes on beta3, fixed on 15 stable
|
||||
--WITH foo AS (
|
||||
-- MERGE INTO tbl1 USING tbl2 ON (true)
|
||||
|
@ -441,7 +441,7 @@ USING tbl2
|
|||
ON (true)
|
||||
WHEN MATCHED THEN
|
||||
UPDATE SET x = (SELECT count(*) FROM tbl2);
|
||||
ERROR: MERGE command is not supported on distributed/reference tables yet
|
||||
ERROR: MERGE command is only supported when distributed tables are joined on their distribution column
|
||||
-- test numeric types with negative scale
|
||||
CREATE TABLE numeric_negative_scale(numeric_column numeric(3,-1), orig_value int);
|
||||
INSERT into numeric_negative_scale SELECT x,x FROM generate_series(111, 115) x;
|
||||
|
|
|
@ -910,7 +910,15 @@ MERGE INTO wq_target t
|
|||
USING wq_source s ON t.tid = s.sid
|
||||
WHEN MATCHED AND (merge_when_and_write()) THEN
|
||||
UPDATE SET balance = t.balance + s.balance;
|
||||
ERROR: functions used in UPDATE queries on distributed tables must not be VOLATILE
|
||||
ERROR: functions used in the WHERE/ON/WHEN clause of modification queries on distributed tables must not be VOLATILE
|
||||
ROLLBACK;
|
||||
-- Test preventing ON condition from writing to the database
|
||||
BEGIN;
|
||||
MERGE INTO wq_target t
|
||||
USING wq_source s ON t.tid = s.sid AND (merge_when_and_write())
|
||||
WHEN MATCHED THEN
|
||||
UPDATE SET balance = t.balance + s.balance;
|
||||
ERROR: functions used in the WHERE/ON/WHEN clause of modification queries on distributed tables must not be VOLATILE
|
||||
ROLLBACK;
|
||||
drop function merge_when_and_write();
|
||||
DROP TABLE wq_target, wq_source;
|
||||
|
@ -1891,7 +1899,7 @@ MERGE INTO pa_target t
|
|||
UPDATE SET balance = balance + delta, val = val || ' updated by merge'
|
||||
WHEN NOT MATCHED THEN
|
||||
INSERT VALUES (slogts::timestamp, sid, delta, 'inserted by merge');
|
||||
DEBUG: <Deparsed MERGE query: MERGE INTO pgmerge_schema.pa_target t USING (SELECT '2017-01-15'::text AS slogts, pa_source.sid, pa_source.delta FROM pgmerge_schema.pa_source_4001021 pa_source WHERE (pa_source.sid OPERATOR(pg_catalog.<) 10)) s ON (t.tid OPERATOR(pg_catalog.=) s.sid) WHEN MATCHED THEN UPDATE SET balance = (t.balance OPERATOR(pg_catalog.+) s.delta), val = (t.val OPERATOR(pg_catalog.||) ' updated by merge'::text) WHEN NOT MATCHED THEN INSERT (logts, tid, balance, val) VALUES ((s.slogts)::timestamp without time zone, s.sid, s.delta, 'inserted by merge'::text)>
|
||||
DEBUG: <Deparsed MERGE query: MERGE INTO pgmerge_schema.pa_target t USING (SELECT '2017-01-15'::text AS slogts, pa_source.sid, pa_source.delta FROM pgmerge_schema.pa_source_xxxxxxx pa_source WHERE (pa_source.sid OPERATOR(pg_catalog.<) 10)) s ON (t.tid OPERATOR(pg_catalog.=) s.sid) WHEN MATCHED THEN UPDATE SET balance = (t.balance OPERATOR(pg_catalog.+) s.delta), val = (t.val OPERATOR(pg_catalog.||) ' updated by merge'::text) WHEN NOT MATCHED THEN INSERT (logts, tid, balance, val) VALUES ((s.slogts)::timestamp without time zone, s.sid, s.delta, 'inserted by merge'::text)>
|
||||
SELECT * FROM pa_target ORDER BY tid;
|
||||
logts | tid | balance | val
|
||||
---------------------------------------------------------------------
|
||||
|
@ -2083,7 +2091,7 @@ WHEN MATCHED THEN UPDATE
|
|||
WHEN NOT MATCHED THEN INSERT
|
||||
(city_id, logdate, peaktemp, unitsales)
|
||||
VALUES (city_id, logdate, peaktemp, unitsales);
|
||||
DEBUG: <Deparsed MERGE query: MERGE INTO pgmerge_schema.measurement m USING pgmerge_schema.new_measurement_4001026 nm ON ((m.city_id OPERATOR(pg_catalog.=) nm.city_id) AND (m.logdate OPERATOR(pg_catalog.=) nm.logdate)) WHEN MATCHED AND (nm.peaktemp IS NULL) THEN DELETE WHEN MATCHED THEN UPDATE SET peaktemp = GREATEST(m.peaktemp, nm.peaktemp), unitsales = (m.unitsales OPERATOR(pg_catalog.+) COALESCE(nm.unitsales, 0)) WHEN NOT MATCHED THEN INSERT (city_id, logdate, peaktemp, unitsales) VALUES (nm.city_id, nm.logdate, nm.peaktemp, nm.unitsales)>
|
||||
DEBUG: <Deparsed MERGE query: MERGE INTO pgmerge_schema.measurement m USING pgmerge_schema.new_measurement_xxxxxxx nm ON ((m.city_id OPERATOR(pg_catalog.=) nm.city_id) AND (m.logdate OPERATOR(pg_catalog.=) nm.logdate)) WHEN MATCHED AND (nm.peaktemp IS NULL) THEN DELETE WHEN MATCHED THEN UPDATE SET peaktemp = GREATEST(m.peaktemp, nm.peaktemp), unitsales = (m.unitsales OPERATOR(pg_catalog.+) COALESCE(nm.unitsales, 0)) WHEN NOT MATCHED THEN INSERT (city_id, logdate, peaktemp, unitsales) VALUES (nm.city_id, nm.logdate, nm.peaktemp, nm.unitsales)>
|
||||
RESET client_min_messages;
|
||||
SELECT tableoid::regclass, * FROM measurement ORDER BY city_id, logdate;
|
||||
tableoid | city_id | logdate | peaktemp | unitsales
|
||||
|
|
|
@ -19,6 +19,7 @@ SET search_path TO merge_schema;
|
|||
SET citus.shard_count TO 4;
|
||||
SET citus.next_shard_id TO 4000000;
|
||||
SET citus.explain_all_tasks to true;
|
||||
SET citus.shard_replication_factor TO 1;
|
||||
SELECT 1 FROM master_add_node('localhost', :master_port, groupid => 0);
|
||||
|
||||
CREATE TABLE source
|
||||
|
@ -143,9 +144,33 @@ SELECT undistribute_table('source');
|
|||
SELECT create_distributed_table('target', 'customer_id');
|
||||
SELECT create_distributed_table('source', 'customer_id');
|
||||
|
||||
-- Updates one of the row with customer_id = 30002
|
||||
SELECT * from target t WHERE t.customer_id = 30002;
|
||||
-- Turn on notice to print tasks sent to nodes
|
||||
SET citus.log_remote_commands to true;
|
||||
MERGE INTO target t
|
||||
USING source s
|
||||
ON (t.customer_id = s.customer_id)
|
||||
ON (t.customer_id = s.customer_id) AND t.customer_id = 30002
|
||||
|
||||
WHEN MATCHED AND t.order_center = 'XX' THEN
|
||||
DELETE
|
||||
|
||||
WHEN MATCHED THEN
|
||||
UPDATE SET -- Existing customer, update the order count and last_order_id
|
||||
order_count = t.order_count + 1,
|
||||
last_order_id = s.order_id
|
||||
|
||||
WHEN NOT MATCHED THEN
|
||||
DO NOTHING;
|
||||
|
||||
SET citus.log_remote_commands to false;
|
||||
SELECT * from target t WHERE t.customer_id = 30002;
|
||||
|
||||
-- Deletes one of the row with customer_id = 30004
|
||||
SELECT * from target t WHERE t.customer_id = 30004;
|
||||
MERGE INTO target t
|
||||
USING source s
|
||||
ON (t.customer_id = s.customer_id) AND t.customer_id = 30004
|
||||
|
||||
WHEN MATCHED AND t.order_center = 'XX' THEN
|
||||
DELETE
|
||||
|
@ -158,6 +183,7 @@ MERGE INTO target t
|
|||
WHEN NOT MATCHED THEN -- New entry, record it.
|
||||
INSERT (customer_id, last_order_id, order_center, order_count, last_order)
|
||||
VALUES (customer_id, s.order_id, s.order_center, 123, s.order_time);
|
||||
SELECT * from target t WHERE t.customer_id = 30004;
|
||||
|
||||
--
|
||||
-- Test MERGE with CTE as source
|
||||
|
@ -243,11 +269,13 @@ SELECT create_distributed_table('t1', 'id');
|
|||
SELECT create_distributed_table('s1', 'id');
|
||||
|
||||
|
||||
SELECT * FROM t1 order by id;
|
||||
SET citus.log_remote_commands to true;
|
||||
WITH s1_res AS (
|
||||
SELECT * FROM s1
|
||||
)
|
||||
MERGE INTO t1
|
||||
USING s1_res ON (s1_res.id = t1.id)
|
||||
USING s1_res ON (s1_res.id = t1.id) AND t1.id = 6
|
||||
|
||||
WHEN MATCHED AND s1_res.val = 0 THEN
|
||||
DELETE
|
||||
|
@ -255,6 +283,9 @@ MERGE INTO t1
|
|||
UPDATE SET val = t1.val + 1
|
||||
WHEN NOT MATCHED THEN
|
||||
INSERT (id, val) VALUES (s1_res.id, s1_res.val);
|
||||
SET citus.log_remote_commands to false;
|
||||
-- Other than id 6 everything else is a NO match, and should appear in target
|
||||
SELECT * FROM t1 order by 1, 2;
|
||||
|
||||
--
|
||||
-- Test with multiple join conditions
|
||||
|
@ -325,15 +356,21 @@ SELECT undistribute_table('s2');
|
|||
SELECT create_distributed_table('t2', 'id');
|
||||
SELECT create_distributed_table('s2', 'id');
|
||||
|
||||
SELECT * FROM t2 ORDER BY 1;
|
||||
SET citus.log_remote_commands to true;
|
||||
MERGE INTO t2
|
||||
USING s2
|
||||
ON t2.id = s2.id AND t2.src = s2.src
|
||||
ON t2.id = s2.id AND t2.src = s2.src AND t2.id = 4
|
||||
WHEN MATCHED AND t2.val = 1 THEN
|
||||
UPDATE SET val = s2.val + 10
|
||||
WHEN MATCHED THEN
|
||||
DELETE
|
||||
WHEN NOT MATCHED THEN
|
||||
INSERT (id, val, src) VALUES (s2.id, s2.val, s2.src);
|
||||
DO NOTHING;
|
||||
SET citus.log_remote_commands to false;
|
||||
-- Row with id = 4 is a match for delete clause, row should be deleted
|
||||
-- Row with id = 3 is a NO match, row from source will be inserted
|
||||
SELECT * FROM t2 ORDER BY 1;
|
||||
|
||||
--
|
||||
-- With sub-query as the MERGE source
|
||||
|
@ -824,10 +861,577 @@ RESET client_min_messages;
|
|||
|
||||
SELECT * FROM ft_target;
|
||||
|
||||
--
|
||||
-- complex joins on the source side
|
||||
--
|
||||
|
||||
-- source(join of two relations) relation is an unaliased join
|
||||
|
||||
CREATE TABLE target_cj(tid int, src text, val int);
|
||||
CREATE TABLE source_cj1(sid1 int, src1 text, val1 int);
|
||||
CREATE TABLE source_cj2(sid2 int, src2 text, val2 int);
|
||||
|
||||
INSERT INTO target_cj VALUES (1, 'target', 0);
|
||||
INSERT INTO target_cj VALUES (2, 'target', 0);
|
||||
INSERT INTO target_cj VALUES (2, 'target', 0);
|
||||
INSERT INTO target_cj VALUES (3, 'target', 0);
|
||||
|
||||
INSERT INTO source_cj1 VALUES (2, 'source-1', 10);
|
||||
INSERT INTO source_cj2 VALUES (2, 'source-2', 20);
|
||||
|
||||
BEGIN;
|
||||
MERGE INTO target_cj t
|
||||
USING source_cj1 s1 INNER JOIN source_cj2 s2 ON sid1 = sid2
|
||||
ON t.tid = sid1 AND t.tid = 2
|
||||
WHEN MATCHED THEN
|
||||
UPDATE SET src = src2
|
||||
WHEN NOT MATCHED THEN
|
||||
DO NOTHING;
|
||||
-- Gold result to compare against
|
||||
SELECT * FROM target_cj ORDER BY 1;
|
||||
ROLLBACK;
|
||||
|
||||
BEGIN;
|
||||
-- try accessing columns from either side of the source join
|
||||
MERGE INTO target_cj t
|
||||
USING source_cj1 s2
|
||||
INNER JOIN source_cj2 s1 ON sid1 = sid2 AND val1 = 10
|
||||
ON t.tid = sid1 AND t.tid = 2
|
||||
WHEN MATCHED THEN
|
||||
UPDATE SET tid = sid2, src = src1, val = val2
|
||||
WHEN NOT MATCHED THEN
|
||||
DO NOTHING;
|
||||
-- Gold result to compare against
|
||||
SELECT * FROM target_cj ORDER BY 1;
|
||||
ROLLBACK;
|
||||
|
||||
-- Test the same scenarios with distributed tables
|
||||
|
||||
SELECT create_distributed_table('target_cj', 'tid');
|
||||
SELECT create_distributed_table('source_cj1', 'sid1');
|
||||
SELECT create_distributed_table('source_cj2', 'sid2');
|
||||
|
||||
BEGIN;
|
||||
SET citus.log_remote_commands to true;
|
||||
MERGE INTO target_cj t
|
||||
USING source_cj1 s1 INNER JOIN source_cj2 s2 ON sid1 = sid2
|
||||
ON t.tid = sid1 AND t.tid = 2
|
||||
WHEN MATCHED THEN
|
||||
UPDATE SET src = src2
|
||||
WHEN NOT MATCHED THEN
|
||||
DO NOTHING;
|
||||
SET citus.log_remote_commands to false;
|
||||
SELECT * FROM target_cj ORDER BY 1;
|
||||
ROLLBACK;
|
||||
|
||||
BEGIN;
|
||||
-- try accessing columns from either side of the source join
|
||||
MERGE INTO target_cj t
|
||||
USING source_cj1 s2
|
||||
INNER JOIN source_cj2 s1 ON sid1 = sid2 AND val1 = 10
|
||||
ON t.tid = sid1 AND t.tid = 2
|
||||
WHEN MATCHED THEN
|
||||
UPDATE SET src = src1, val = val2
|
||||
WHEN NOT MATCHED THEN
|
||||
DO NOTHING;
|
||||
SELECT * FROM target_cj ORDER BY 1;
|
||||
ROLLBACK;
|
||||
|
||||
-- sub-query as a source
|
||||
BEGIN;
|
||||
MERGE INTO target_cj t
|
||||
USING (SELECT * FROM source_cj1 WHERE sid1 = 2) sub
|
||||
ON t.tid = sub.sid1 AND t.tid = 2
|
||||
WHEN MATCHED THEN
|
||||
UPDATE SET src = sub.src1, val = val1
|
||||
WHEN NOT MATCHED THEN
|
||||
DO NOTHING;
|
||||
SELECT * FROM target_cj ORDER BY 1;
|
||||
ROLLBACK;
|
||||
|
||||
-- Test self-join
|
||||
BEGIN;
|
||||
SELECT * FROM target_cj ORDER BY 1;
|
||||
set citus.log_remote_commands to true;
|
||||
MERGE INTO target_cj t1
|
||||
USING (SELECT * FROM target_cj) sub
|
||||
ON t1.tid = sub.tid AND t1.tid = 3
|
||||
WHEN MATCHED THEN
|
||||
UPDATE SET src = sub.src, val = sub.val + 100
|
||||
WHEN NOT MATCHED THEN
|
||||
DO NOTHING;
|
||||
set citus.log_remote_commands to false;
|
||||
SELECT * FROM target_cj ORDER BY 1;
|
||||
ROLLBACK;
|
||||
|
||||
|
||||
-- Test PREPARE
|
||||
PREPARE foo(int) AS
|
||||
MERGE INTO target_cj target
|
||||
USING (SELECT * FROM source_cj1) sub
|
||||
ON target.tid = sub.sid1 AND target.tid = $1
|
||||
WHEN MATCHED THEN
|
||||
UPDATE SET val = sub.val1
|
||||
WHEN NOT MATCHED THEN
|
||||
DO NOTHING;
|
||||
|
||||
SELECT * FROM target_cj ORDER BY 1;
|
||||
|
||||
BEGIN;
|
||||
EXECUTE foo(2);
|
||||
EXECUTE foo(2);
|
||||
EXECUTE foo(2);
|
||||
EXECUTE foo(2);
|
||||
EXECUTE foo(2);
|
||||
SELECT * FROM target_cj ORDER BY 1;
|
||||
ROLLBACK;
|
||||
|
||||
BEGIN;
|
||||
|
||||
SET citus.log_remote_commands to true;
|
||||
SET client_min_messages TO DEBUG1;
|
||||
EXECUTE foo(2);
|
||||
RESET client_min_messages;
|
||||
|
||||
EXECUTE foo(2);
|
||||
SET citus.log_remote_commands to false;
|
||||
|
||||
SELECT * FROM target_cj ORDER BY 1;
|
||||
ROLLBACK;
|
||||
|
||||
-- Test distributed tables, must be co-located and joined on distribution column.
|
||||
|
||||
--
|
||||
-- We create two sets of source and target tables, one set is Postgres and the other
|
||||
-- is Citus distributed. Run the _exact_ MERGE SQL on both the sets and compare the
|
||||
-- final results of target tables of Postgres and Citus, the result should match.
|
||||
-- This is repeated for various MERGE SQL combinations
|
||||
--
|
||||
CREATE TABLE pg_target(id int, val varchar);
|
||||
CREATE TABLE pg_source(id int, val varchar);
|
||||
CREATE TABLE citus_target(id int, val varchar);
|
||||
CREATE TABLE citus_source(id int, val varchar);
|
||||
|
||||
-- Half of the source rows do not match
|
||||
INSERT INTO pg_target SELECT i, 'target' FROM generate_series(250, 500) i;
|
||||
INSERT INTO pg_source SELECT i, 'source' FROM generate_series(1, 500) i;
|
||||
|
||||
INSERT INTO citus_target SELECT i, 'target' FROM generate_series(250, 500) i;
|
||||
INSERT INTO citus_source SELECT i, 'source' FROM generate_series(1, 500) i;
|
||||
|
||||
SELECT create_distributed_table('citus_target', 'id');
|
||||
SELECT create_distributed_table('citus_source', 'id');
|
||||
|
||||
--
|
||||
-- This routine compares the target tables of Postgres and Citus and
|
||||
-- returns true if they match, false if the results do not match.
|
||||
--
|
||||
CREATE OR REPLACE FUNCTION compare_tables() RETURNS BOOLEAN AS $$
|
||||
DECLARE ret BOOL;
|
||||
BEGIN
|
||||
SELECT count(1) = 0 INTO ret
|
||||
FROM pg_target
|
||||
FULL OUTER JOIN citus_target
|
||||
USING (id, val)
|
||||
WHERE pg_target.id IS NULL
|
||||
OR citus_target.id IS NULL;
|
||||
RETURN ret;
|
||||
END
|
||||
$$ LANGUAGE PLPGSQL;
|
||||
|
||||
-- Make sure we start with exact data in Postgres and Citus
|
||||
SELECT compare_tables();
|
||||
|
||||
-- Run the MERGE on both Postgres and Citus, and compare the final target tables
|
||||
|
||||
BEGIN;
|
||||
SET citus.log_remote_commands to true;
|
||||
|
||||
MERGE INTO pg_target t
|
||||
USING pg_source s
|
||||
ON t.id = s.id
|
||||
WHEN MATCHED AND t.id > 400 THEN
|
||||
UPDATE SET val = t.val || 'Updated by Merge'
|
||||
WHEN MATCHED THEN
|
||||
DELETE
|
||||
WHEN NOT MATCHED THEN
|
||||
INSERT VALUES(s.id, s.val);
|
||||
|
||||
MERGE INTO citus_target t
|
||||
USING citus_source s
|
||||
ON t.id = s.id
|
||||
WHEN MATCHED AND t.id > 400 THEN
|
||||
UPDATE SET val = t.val || 'Updated by Merge'
|
||||
WHEN MATCHED THEN
|
||||
DELETE
|
||||
WHEN NOT MATCHED THEN
|
||||
INSERT VALUES(s.id, s.val);
|
||||
|
||||
SET citus.log_remote_commands to false;
|
||||
SELECT compare_tables();
|
||||
ROLLBACK;
|
||||
|
||||
--
|
||||
-- ON clause filter on source
|
||||
--
|
||||
BEGIN;
|
||||
SET citus.log_remote_commands to true;
|
||||
|
||||
MERGE INTO pg_target t
|
||||
USING pg_source s
|
||||
ON t.id = s.id AND s.id < 100
|
||||
WHEN MATCHED AND t.id > 400 THEN
|
||||
UPDATE SET val = t.val || 'Updated by Merge'
|
||||
WHEN MATCHED THEN
|
||||
DELETE
|
||||
WHEN NOT MATCHED THEN
|
||||
INSERT VALUES(s.id, s.val);
|
||||
|
||||
MERGE INTO citus_target t
|
||||
USING citus_source s
|
||||
ON t.id = s.id AND s.id < 100
|
||||
WHEN MATCHED AND t.id > 400 THEN
|
||||
UPDATE SET val = t.val || 'Updated by Merge'
|
||||
WHEN MATCHED THEN
|
||||
DELETE
|
||||
WHEN NOT MATCHED THEN
|
||||
INSERT VALUES(s.id, s.val);
|
||||
|
||||
SET citus.log_remote_commands to false;
|
||||
SELECT compare_tables();
|
||||
ROLLBACK;
|
||||
|
||||
--
|
||||
-- ON clause filter on target
|
||||
--
|
||||
BEGIN;
|
||||
SET citus.log_remote_commands to true;
|
||||
|
||||
MERGE INTO pg_target t
|
||||
USING pg_source s
|
||||
ON t.id = s.id AND t.id < 100
|
||||
WHEN MATCHED AND t.id > 400 THEN
|
||||
UPDATE SET val = t.val || 'Updated by Merge'
|
||||
WHEN MATCHED THEN
|
||||
DELETE
|
||||
WHEN NOT MATCHED THEN
|
||||
INSERT VALUES(s.id, s.val);
|
||||
|
||||
MERGE INTO citus_target t
|
||||
USING citus_source s
|
||||
ON t.id = s.id AND t.id < 100
|
||||
WHEN MATCHED AND t.id > 400 THEN
|
||||
UPDATE SET val = t.val || 'Updated by Merge'
|
||||
WHEN MATCHED THEN
|
||||
DELETE
|
||||
WHEN NOT MATCHED THEN
|
||||
INSERT VALUES(s.id, s.val);
|
||||
|
||||
SET citus.log_remote_commands to false;
|
||||
SELECT compare_tables();
|
||||
ROLLBACK;
|
||||
|
||||
--
|
||||
-- NOT MATCHED clause filter on source
|
||||
--
|
||||
BEGIN;
|
||||
SET citus.log_remote_commands to true;
|
||||
|
||||
MERGE INTO pg_target t
|
||||
USING pg_source s
|
||||
ON t.id = s.id
|
||||
WHEN MATCHED THEN
|
||||
DO NOTHING
|
||||
WHEN NOT MATCHED AND s.id < 100 THEN
|
||||
INSERT VALUES(s.id, s.val);
|
||||
|
||||
MERGE INTO citus_target t
|
||||
USING citus_source s
|
||||
ON t.id = s.id
|
||||
WHEN MATCHED THEN
|
||||
DO NOTHING
|
||||
WHEN NOT MATCHED AND s.id < 100 THEN
|
||||
INSERT VALUES(s.id, s.val);
|
||||
|
||||
SET citus.log_remote_commands to false;
|
||||
SELECT compare_tables();
|
||||
ROLLBACK;
|
||||
|
||||
--
|
||||
-- Test constant filter in ON clause to check if shards are pruned
|
||||
-- with restriction information
|
||||
--
|
||||
|
||||
--
|
||||
-- Though constant filter is present, this won't prune shards as
|
||||
-- NOT MATCHED clause is present
|
||||
--
|
||||
BEGIN;
|
||||
SET citus.log_remote_commands to true;
|
||||
|
||||
MERGE INTO pg_target t
|
||||
USING pg_source s
|
||||
ON t.id = s.id AND s.id = 250
|
||||
WHEN MATCHED THEN
|
||||
UPDATE SET val = t.val || 'Updated by Merge'
|
||||
WHEN NOT MATCHED THEN
|
||||
INSERT VALUES(s.id, s.val);
|
||||
|
||||
MERGE INTO citus_target t
|
||||
USING citus_source s
|
||||
ON t.id = s.id AND s.id = 250
|
||||
WHEN MATCHED THEN
|
||||
UPDATE SET val = t.val || 'Updated by Merge'
|
||||
WHEN NOT MATCHED THEN
|
||||
INSERT VALUES(s.id, s.val);
|
||||
|
||||
SET citus.log_remote_commands to false;
|
||||
SELECT compare_tables();
|
||||
ROLLBACK;
|
||||
|
||||
-- This will prune shards with restriction information as NOT MATCHED is void
|
||||
BEGIN;
|
||||
SET citus.log_remote_commands to true;
|
||||
|
||||
MERGE INTO pg_target t
|
||||
USING pg_source s
|
||||
ON t.id = s.id AND s.id = 250
|
||||
WHEN MATCHED THEN
|
||||
UPDATE SET val = t.val || 'Updated by Merge'
|
||||
WHEN NOT MATCHED THEN
|
||||
DO NOTHING;
|
||||
|
||||
MERGE INTO citus_target t
|
||||
USING citus_source s
|
||||
ON t.id = s.id AND s.id = 250
|
||||
WHEN MATCHED THEN
|
||||
UPDATE SET val = t.val || 'Updated by Merge'
|
||||
WHEN NOT MATCHED THEN
|
||||
DO NOTHING;
|
||||
|
||||
SET citus.log_remote_commands to false;
|
||||
SELECT compare_tables();
|
||||
ROLLBACK;
|
||||
|
||||
-- Test CTE with distributed tables
|
||||
CREATE VIEW pg_source_view AS SELECT * FROM pg_source WHERE id < 400;
|
||||
CREATE VIEW citus_source_view AS SELECT * FROM citus_source WHERE id < 400;
|
||||
|
||||
BEGIN;
|
||||
SEt citus.log_remote_commands to true;
|
||||
|
||||
WITH cte AS (
|
||||
SELECT * FROM pg_source_view
|
||||
)
|
||||
MERGE INTO pg_target t
|
||||
USING cte
|
||||
ON cte.id = t.id
|
||||
WHEN MATCHED AND t.id > 350 THEN
|
||||
UPDATE SET val = t.val || 'Updated by CTE'
|
||||
WHEN NOT MATCHED THEN
|
||||
INSERT VALUES (cte.id, cte.val)
|
||||
WHEN MATCHED AND t.id < 350 THEN
|
||||
DELETE;
|
||||
|
||||
WITH cte AS (
|
||||
SELECT * FROM citus_source_view
|
||||
)
|
||||
MERGE INTO citus_target t
|
||||
USING cte
|
||||
ON cte.id = t.id
|
||||
WHEN MATCHED AND t.id > 350 THEN
|
||||
UPDATE SET val = t.val || 'Updated by CTE'
|
||||
WHEN NOT MATCHED THEN
|
||||
INSERT VALUES (cte.id, cte.val)
|
||||
WHEN MATCHED AND t.id < 350 THEN
|
||||
DELETE;
|
||||
|
||||
SET citus.log_remote_commands to false;
|
||||
SELECT compare_tables();
|
||||
ROLLBACK;
|
||||
|
||||
|
||||
-- Test sub-query with distributed tables
|
||||
BEGIN;
|
||||
SEt citus.log_remote_commands to true;
|
||||
|
||||
MERGE INTO pg_target t
|
||||
USING (SELECT * FROM pg_source) subq
|
||||
ON subq.id = t.id
|
||||
WHEN MATCHED AND t.id > 350 THEN
|
||||
UPDATE SET val = t.val || 'Updated by subquery'
|
||||
WHEN NOT MATCHED THEN
|
||||
INSERT VALUES (subq.id, subq.val)
|
||||
WHEN MATCHED AND t.id < 350 THEN
|
||||
DELETE;
|
||||
|
||||
MERGE INTO citus_target t
|
||||
USING (SELECT * FROM citus_source) subq
|
||||
ON subq.id = t.id
|
||||
WHEN MATCHED AND t.id > 350 THEN
|
||||
UPDATE SET val = t.val || 'Updated by subquery'
|
||||
WHEN NOT MATCHED THEN
|
||||
INSERT VALUES (subq.id, subq.val)
|
||||
WHEN MATCHED AND t.id < 350 THEN
|
||||
DELETE;
|
||||
|
||||
SET citus.log_remote_commands to false;
|
||||
SELECT compare_tables();
|
||||
ROLLBACK;
|
||||
|
||||
-- Test PREPARE
|
||||
PREPARE pg_prep(int) AS
|
||||
MERGE INTO pg_target
|
||||
USING (SELECT * FROM pg_source) sub
|
||||
ON pg_target.id = sub.id AND pg_target.id = $1
|
||||
WHEN MATCHED THEN
|
||||
UPDATE SET val = 'Updated by prepare using ' || sub.val
|
||||
WHEN NOT MATCHED THEN
|
||||
DO NOTHING;
|
||||
|
||||
PREPARE citus_prep(int) AS
|
||||
MERGE INTO citus_target
|
||||
USING (SELECT * FROM citus_source) sub
|
||||
ON citus_target.id = sub.id AND citus_target.id = $1
|
||||
WHEN MATCHED THEN
|
||||
UPDATE SET val = 'Updated by prepare using ' || sub.val
|
||||
WHEN NOT MATCHED THEN
|
||||
DO NOTHING;
|
||||
|
||||
BEGIN;
|
||||
SET citus.log_remote_commands to true;
|
||||
|
||||
SELECT * FROM pg_target WHERE id = 500; -- before merge
|
||||
EXECUTE pg_prep(500);
|
||||
SELECT * FROM pg_target WHERE id = 500; -- non-cached
|
||||
EXECUTE pg_prep(500);
|
||||
EXECUTE pg_prep(500);
|
||||
EXECUTE pg_prep(500);
|
||||
EXECUTE pg_prep(500);
|
||||
EXECUTE pg_prep(500);
|
||||
SELECT * FROM pg_target WHERE id = 500; -- cached
|
||||
|
||||
SELECT * FROM citus_target WHERE id = 500; -- before merge
|
||||
EXECUTE citus_prep(500);
|
||||
SELECT * FROM citus_target WHERE id = 500; -- non-cached
|
||||
EXECUTE citus_prep(500);
|
||||
EXECUTE citus_prep(500);
|
||||
EXECUTE citus_prep(500);
|
||||
EXECUTE citus_prep(500);
|
||||
EXECUTE citus_prep(500);
|
||||
SELECT * FROM citus_target WHERE id = 500; -- cached
|
||||
|
||||
SET citus.log_remote_commands to false;
|
||||
SELECT compare_tables();
|
||||
ROLLBACK;
|
||||
|
||||
-- Test partitions + distributed tables
|
||||
|
||||
CREATE TABLE pg_pa_target (tid integer, balance float, val text)
|
||||
PARTITION BY LIST (tid);
|
||||
CREATE TABLE citus_pa_target (tid integer, balance float, val text)
|
||||
PARTITION BY LIST (tid);
|
||||
|
||||
CREATE TABLE part1 PARTITION OF pg_pa_target FOR VALUES IN (1,4)
|
||||
WITH (autovacuum_enabled=off);
|
||||
CREATE TABLE part2 PARTITION OF pg_pa_target FOR VALUES IN (2,5,6)
|
||||
WITH (autovacuum_enabled=off);
|
||||
CREATE TABLE part3 PARTITION OF pg_pa_target FOR VALUES IN (3,8,9)
|
||||
WITH (autovacuum_enabled=off);
|
||||
CREATE TABLE part4 PARTITION OF pg_pa_target DEFAULT
|
||||
WITH (autovacuum_enabled=off);
|
||||
CREATE TABLE part5 PARTITION OF citus_pa_target FOR VALUES IN (1,4)
|
||||
WITH (autovacuum_enabled=off);
|
||||
CREATE TABLE part6 PARTITION OF citus_pa_target FOR VALUES IN (2,5,6)
|
||||
WITH (autovacuum_enabled=off);
|
||||
CREATE TABLE part7 PARTITION OF citus_pa_target FOR VALUES IN (3,8,9)
|
||||
WITH (autovacuum_enabled=off);
|
||||
CREATE TABLE part8 PARTITION OF citus_pa_target DEFAULT
|
||||
WITH (autovacuum_enabled=off);
|
||||
|
||||
CREATE TABLE pg_pa_source (sid integer, delta float);
|
||||
CREATE TABLE citus_pa_source (sid integer, delta float);
|
||||
|
||||
-- insert many rows to the source table
|
||||
INSERT INTO pg_pa_source SELECT id, id * 10 FROM generate_series(1,14) AS id;
|
||||
INSERT INTO citus_pa_source SELECT id, id * 10 FROM generate_series(1,14) AS id;
|
||||
-- insert a few rows in the target table (odd numbered tid)
|
||||
INSERT INTO pg_pa_target SELECT id, id * 100, 'initial' FROM generate_series(1,14,2) AS id;
|
||||
INSERT INTO citus_pa_target SELECT id, id * 100, 'initial' FROM generate_series(1,14,2) AS id;
|
||||
|
||||
SELECT create_distributed_table('citus_pa_target', 'tid');
|
||||
SELECT create_distributed_table('citus_pa_source', 'sid');
|
||||
|
||||
CREATE OR REPLACE FUNCTION pa_compare_tables() RETURNS BOOLEAN AS $$
|
||||
DECLARE ret BOOL;
|
||||
BEGIN
|
||||
SELECT count(1) = 0 INTO ret
|
||||
FROM pg_pa_target
|
||||
FULL OUTER JOIN citus_pa_target
|
||||
USING (tid, balance, val)
|
||||
WHERE pg_pa_target.tid IS NULL
|
||||
OR citus_pa_target.tid IS NULL;
|
||||
RETURN ret;
|
||||
END
|
||||
$$ LANGUAGE PLPGSQL;
|
||||
|
||||
-- try simple MERGE
|
||||
BEGIN;
|
||||
MERGE INTO pg_pa_target t
|
||||
USING pg_pa_source s
|
||||
ON t.tid = s.sid
|
||||
WHEN MATCHED THEN
|
||||
UPDATE SET balance = balance + delta, val = val || ' updated by merge'
|
||||
WHEN NOT MATCHED THEN
|
||||
INSERT VALUES (sid, delta, 'inserted by merge');
|
||||
|
||||
MERGE INTO citus_pa_target t
|
||||
USING citus_pa_source s
|
||||
ON t.tid = s.sid
|
||||
WHEN MATCHED THEN
|
||||
UPDATE SET balance = balance + delta, val = val || ' updated by merge'
|
||||
WHEN NOT MATCHED THEN
|
||||
INSERT VALUES (sid, delta, 'inserted by merge');
|
||||
|
||||
SELECT pa_compare_tables();
|
||||
ROLLBACK;
|
||||
|
||||
-- same with a constant qual
|
||||
BEGIN;
|
||||
MERGE INTO pg_pa_target t
|
||||
USING pg_pa_source s
|
||||
ON t.tid = s.sid AND tid = 1
|
||||
WHEN MATCHED THEN
|
||||
UPDATE SET balance = balance + delta, val = val || ' updated by merge'
|
||||
WHEN NOT MATCHED THEN
|
||||
INSERT VALUES (sid, delta, 'inserted by merge');
|
||||
|
||||
MERGE INTO citus_pa_target t
|
||||
USING citus_pa_source s
|
||||
ON t.tid = s.sid AND tid = 1
|
||||
WHEN MATCHED THEN
|
||||
UPDATE SET balance = balance + delta, val = val || ' updated by merge'
|
||||
WHEN NOT MATCHED THEN
|
||||
INSERT VALUES (sid, delta, 'inserted by merge');
|
||||
|
||||
SELECT pa_compare_tables();
|
||||
ROLLBACK;
|
||||
|
||||
--
|
||||
-- Error and Unsupported scenarios
|
||||
--
|
||||
|
||||
-- try updating the distribution key column
|
||||
BEGIN;
|
||||
MERGE INTO target_cj t
|
||||
USING source_cj1 s
|
||||
ON t.tid = s.sid1 AND t.tid = 2
|
||||
WHEN MATCHED THEN
|
||||
UPDATE SET tid = tid + 9, src = src || ' updated by merge'
|
||||
WHEN NOT MATCHED THEN
|
||||
INSERT VALUES (sid1, 'inserted by merge', val1);
|
||||
ROLLBACK;
|
||||
|
||||
-- Foreign table as target
|
||||
MERGE INTO foreign_table
|
||||
USING ft_target ON (foreign_table.id = ft_target.id)
|
||||
|
@ -854,6 +1458,38 @@ MERGE INTO t1
|
|||
WHEN NOT MATCHED THEN
|
||||
INSERT (id, val) VALUES (s1.id, s1.val);
|
||||
|
||||
-- Now both s1 and t1 are distributed tables
|
||||
SELECT undistribute_table('t1');
|
||||
SELECT create_distributed_table('t1', 'id');
|
||||
|
||||
-- We have a potential pitfall where a function can be invoked in
|
||||
-- the MERGE conditions which can insert/update to a random shard
|
||||
CREATE OR REPLACE function merge_when_and_write() RETURNS BOOLEAN
|
||||
LANGUAGE PLPGSQL AS
|
||||
$$
|
||||
BEGIN
|
||||
INSERT INTO t1 VALUES (100, 100);
|
||||
RETURN TRUE;
|
||||
END;
|
||||
$$;
|
||||
|
||||
-- Test preventing "ON" join condition from writing to the database
|
||||
BEGIN;
|
||||
MERGE INTO t1
|
||||
USING s1 ON t1.id = s1.id AND t1.id = 2 AND (merge_when_and_write())
|
||||
WHEN MATCHED THEN
|
||||
UPDATE SET val = t1.val + s1.val;
|
||||
ROLLBACK;
|
||||
|
||||
-- Test preventing WHEN clause(s) from writing to the database
|
||||
BEGIN;
|
||||
MERGE INTO t1
|
||||
USING s1 ON t1.id = s1.id AND t1.id = 2
|
||||
WHEN MATCHED AND (merge_when_and_write()) THEN
|
||||
UPDATE SET val = t1.val + s1.val;
|
||||
ROLLBACK;
|
||||
|
||||
|
||||
-- Joining on partition columns with sub-query
|
||||
MERGE INTO t1
|
||||
USING (SELECT * FROM s1) sub ON (sub.val = t1.id) -- sub.val is not a distribution column
|
||||
|
@ -997,6 +1633,104 @@ WHEN MATCHED THEN
|
|||
WHEN NOT MATCHED THEN
|
||||
INSERT VALUES(mv_source.id, mv_source.val);
|
||||
|
||||
-- Distributed tables *must* be colocated
|
||||
CREATE TABLE dist_target(id int, val varchar);
|
||||
SELECT create_distributed_table('dist_target', 'id');
|
||||
CREATE TABLE dist_source(id int, val varchar);
|
||||
SELECT create_distributed_table('dist_source', 'id', colocate_with => 'none');
|
||||
|
||||
MERGE INTO dist_target
|
||||
USING dist_source
|
||||
ON dist_target.id = dist_source.id
|
||||
WHEN MATCHED THEN
|
||||
UPDATE SET val = dist_source.val
|
||||
WHEN NOT MATCHED THEN
|
||||
INSERT VALUES(dist_source.id, dist_source.val);
|
||||
|
||||
-- Distributed tables *must* be joined on distribution column
|
||||
CREATE TABLE dist_colocated(id int, val int);
|
||||
SELECT create_distributed_table('dist_colocated', 'id', colocate_with => 'dist_target');
|
||||
|
||||
MERGE INTO dist_target
|
||||
USING dist_colocated
|
||||
ON dist_target.id = dist_colocated.val -- val is not the distribution column
|
||||
WHEN MATCHED THEN
|
||||
UPDATE SET val = dist_colocated.val
|
||||
WHEN NOT MATCHED THEN
|
||||
INSERT VALUES(dist_colocated.id, dist_colocated.val);
|
||||
|
||||
|
||||
-- Both the source and target must be distributed
|
||||
MERGE INTO dist_target
|
||||
USING (SELECT 100 id) AS source
|
||||
ON dist_target.id = source.id AND dist_target.val = 'const'
|
||||
WHEN MATCHED THEN
|
||||
UPDATE SET val = 'source'
|
||||
WHEN NOT MATCHED THEN
|
||||
INSERT VALUES(source.id, 'source');
|
||||
|
||||
-- Non-hash distributed tables (append/range).
|
||||
CREATE VIEW show_tables AS
|
||||
SELECT logicalrelid, partmethod
|
||||
FROM pg_dist_partition
|
||||
WHERE (logicalrelid = 'dist_target'::regclass) OR (logicalrelid = 'dist_source'::regclass)
|
||||
ORDER BY 1;
|
||||
|
||||
SELECT undistribute_table('dist_source');
|
||||
SELECT create_distributed_table('dist_source', 'id', 'append');
|
||||
SELECT * FROM show_tables;
|
||||
|
||||
MERGE INTO dist_target
|
||||
USING dist_source
|
||||
ON dist_target.id = dist_source.id
|
||||
WHEN MATCHED THEN
|
||||
UPDATE SET val = dist_source.val
|
||||
WHEN NOT MATCHED THEN
|
||||
INSERT VALUES(dist_source.id, dist_source.val);
|
||||
|
||||
SELECT undistribute_table('dist_source');
|
||||
SELECT create_distributed_table('dist_source', 'id', 'range');
|
||||
SELECT * FROM show_tables;
|
||||
|
||||
MERGE INTO dist_target
|
||||
USING dist_source
|
||||
ON dist_target.id = dist_source.id
|
||||
WHEN MATCHED THEN
|
||||
UPDATE SET val = dist_source.val
|
||||
WHEN NOT MATCHED THEN
|
||||
INSERT VALUES(dist_source.id, dist_source.val);
|
||||
|
||||
-- Both are append tables
|
||||
SELECT undistribute_table('dist_target');
|
||||
SELECT undistribute_table('dist_source');
|
||||
SELECT create_distributed_table('dist_target', 'id', 'append');
|
||||
SELECT create_distributed_table('dist_source', 'id', 'append');
|
||||
SELECT * FROM show_tables;
|
||||
|
||||
MERGE INTO dist_target
|
||||
USING dist_source
|
||||
ON dist_target.id = dist_source.id
|
||||
WHEN MATCHED THEN
|
||||
UPDATE SET val = dist_source.val
|
||||
WHEN NOT MATCHED THEN
|
||||
INSERT VALUES(dist_source.id, dist_source.val);
|
||||
|
||||
-- Both are range tables
|
||||
SELECT undistribute_table('dist_target');
|
||||
SELECT undistribute_table('dist_source');
|
||||
SELECT create_distributed_table('dist_target', 'id', 'range');
|
||||
SELECT create_distributed_table('dist_source', 'id', 'range');
|
||||
SELECT * FROM show_tables;
|
||||
|
||||
MERGE INTO dist_target
|
||||
USING dist_source
|
||||
ON dist_target.id = dist_source.id
|
||||
WHEN MATCHED THEN
|
||||
UPDATE SET val = dist_source.val
|
||||
WHEN NOT MATCHED THEN
|
||||
INSERT VALUES(dist_source.id, dist_source.val);
|
||||
|
||||
DROP SERVER foreign_server CASCADE;
|
||||
DROP FUNCTION merge_when_and_write();
|
||||
DROP SCHEMA merge_schema CASCADE;
|
||||
SELECT 1 FROM master_remove_node('localhost', :master_port);
|
||||
|
|
|
@ -608,6 +608,14 @@ USING wq_source s ON t.tid = s.sid
|
|||
WHEN MATCHED AND (merge_when_and_write()) THEN
|
||||
UPDATE SET balance = t.balance + s.balance;
|
||||
ROLLBACK;
|
||||
|
||||
-- Test preventing ON condition from writing to the database
|
||||
BEGIN;
|
||||
MERGE INTO wq_target t
|
||||
USING wq_source s ON t.tid = s.sid AND (merge_when_and_write())
|
||||
WHEN MATCHED THEN
|
||||
UPDATE SET balance = t.balance + s.balance;
|
||||
ROLLBACK;
|
||||
drop function merge_when_and_write();
|
||||
|
||||
DROP TABLE wq_target, wq_source;
|
||||
|
|
Loading…
Reference in New Issue