Make ConjunctionContainsColumnFilter() static again, and rearrange the code in MergeQuerySupported()

Minor: Restore the original format in the comments section.
merge_locks
Teja Mupparti 2023-02-24 17:00:47 -08:00
parent f7d838add0
commit b19566b7bb
19 changed files with 839 additions and 119 deletions

View File

@ -56,6 +56,9 @@ bool EnableFastPathRouterPlanner = true;
static bool ColumnAppearsMultipleTimes(Node *quals, Var *distributionKey);
static bool DistKeyInSimpleOpExpression(Expr *clause, Var *distColumn,
Node **distributionKeyValue);
static bool ConjunctionContainsColumnFilter(Node *node,
Var *column,
Node **distributionKeyValue);
/*
@ -292,7 +295,7 @@ ColumnAppearsMultipleTimes(Node *quals, Var *distributionKey)
*
* If the conjunction contains a column filter which is const, distributionKeyValue is set.
*/
bool
static bool
ConjunctionContainsColumnFilter(Node *node, Var *column, Node **distributionKeyValue)
{
if (node == NULL)

View File

@ -12,28 +12,41 @@
#include <stddef.h>
#include "postgres.h"
#include "nodes/makefuncs.h"
#include "optimizer/optimizer.h"
#include "parser/parsetree.h"
#include "utils/lsyscache.h"
#include "distributed/pg_version_constants.h"
#include "distributed/citus_clauses.h"
#include "distributed/listutils.h"
#include "distributed/merge_planner.h"
#include "distributed/multi_logical_optimizer.h"
#include "distributed/multi_router_planner.h"
#include "distributed/listutils.h"
#include "distributed/pg_version_constants.h"
#include "distributed/query_pushdown_planning.h"
#if PG_VERSION_NUM >= PG_VERSION_15
static DeferredErrorMessage * ErrorIfTablesNotColocatedAndJoinedOnDistColumn(Query *parse,
List *
distTablesList,
PlannerRestrictionContext
*
plannerRestrictionContext);
static bool QueryHasMergeCommand(Query *queryTree);
static DeferredErrorMessage * ErrorIfMergeHasUnsupportedTables(Query *parse,
List *rangeTableList,
PlannerRestrictionContext *
restrictionContext);
static DeferredErrorMessage * ErrorIfDistTablesNotColocated(Query *parse,
List *distTablesList,
PlannerRestrictionContext *
plannerRestrictionContext);
static bool IsPartitionColumnInMergeSource(Expr *columnExpression, Query *query, bool
skipOuterVars);
#if PG_VERSION_NUM >= PG_VERSION_15
static DeferredErrorMessage * InsertPartitionColumnMatchesSource(Query *query, RangeTblEntry *resultRte);
skipOuterVars);
static DeferredErrorMessage * InsertPartitionColumnMatchesSource(Query *query,
RangeTblEntry *resultRte);
static DeferredErrorMessage * MergeActionListSupported(Oid resultRelationId,
FromExpr *joinTree, Node *quals,
List *targetList,
CmdType commandType);
#endif
@ -45,9 +58,16 @@ static DeferredErrorMessage * InsertPartitionColumnMatchesSource(Query *query, R
* - Checks target-lists and functions-in-quals in TargetlistAndFunctionsSupported
*/
DeferredErrorMessage *
MergeQuerySupported(Query *originalQuery,
MergeQuerySupported(Query *originalQuery, bool multiShardQuery,
PlannerRestrictionContext *plannerRestrictionContext)
{
/* function is void for pre-15 versions of Postgres */
#if PG_VERSION_NUM < PG_VERSION_15
return NULL;
#else
/* For non-MERGE commands it's a no-op */
if (!QueryHasMergeCommand(originalQuery))
{
@ -71,24 +91,22 @@ MergeQuerySupported(Query *originalQuery,
plannerRestrictionContext);
if (deferredError)
{
return deferredError;
/* MERGE's unsupported combination, raise the exception */
RaiseDeferredError(deferredError, ERROR);
}
Oid resultRelationId = resultRte->relid;
deferredError =
TargetlistAndFunctionsSupported(resultRelationId,
originalQuery->jointree,
originalQuery->jointree->quals,
originalQuery->targetList,
originalQuery->commandType,
originalQuery->returningList);
MergeActionListSupported(resultRelationId,
originalQuery->jointree,
originalQuery->jointree->quals,
originalQuery->targetList,
originalQuery->commandType);
if (deferredError)
{
return deferredError;
}
#if PG_VERSION_NUM >= PG_VERSION_15
/*
* MERGE is a special case where we have multiple modify statements
* within itself. Check each INSERT/UPDATE/DELETE individually.
@ -98,15 +116,15 @@ MergeQuerySupported(Query *originalQuery,
{
Assert(originalQuery->returningList == NULL);
deferredError =
TargetlistAndFunctionsSupported(resultRelationId,
originalQuery->jointree,
action->qual,
action->targetList,
action->commandType,
originalQuery->returningList);
MergeActionListSupported(resultRelationId,
originalQuery->jointree,
action->qual,
action->targetList,
action->commandType);
if (deferredError)
{
return deferredError;
/* MERGE's unsupported scenario, raise the exception */
RaiseDeferredError(deferredError, ERROR);
}
}
@ -114,14 +132,33 @@ MergeQuerySupported(Query *originalQuery,
InsertPartitionColumnMatchesSource(originalQuery, resultRte);
if (deferredError)
{
return deferredError;
/* MERGE's unsupported scenario, raise the exception */
RaiseDeferredError(deferredError, ERROR);
}
#endif
if (multiShardQuery)
{
deferredError = DeferErrorIfUnsupportedSubqueryPushdown(originalQuery,
plannerRestrictionContext);
if (deferredError)
{
return deferredError;
}
}
if (HasDangerousJoinUsing(originalQuery->rtable, (Node *) originalQuery->jointree))
{
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
"a join with USING causes an internal naming "
"conflict, use ON instead", NULL, NULL);
}
return NULL;
#endif
}
/*
* IsMergeAllowedOnRelation takes a relation entry and checks if MERGE command is
* permitted on special relations, such as materialized view, returns true only if
@ -147,6 +184,8 @@ IsMergeAllowedOnRelation(Query *parse, RangeTblEntry *rte)
}
#if PG_VERSION_NUM >= PG_VERSION_15
/*
* ErrorIfTablesNotColocatedAndJoinedOnDistColumn Checks to see if
*
@ -158,8 +197,9 @@ IsMergeAllowedOnRelation(Query *parse, RangeTblEntry *rte)
* If any of the conditions are not met, it raises an exception.
*/
static DeferredErrorMessage *
ErrorIfDistTablesNotColocated(Query *parse, List *distTablesList,
PlannerRestrictionContext *plannerRestrictionContext)
ErrorIfTablesNotColocatedAndJoinedOnDistColumn(Query *parse, List *distTablesList,
PlannerRestrictionContext *
plannerRestrictionContext)
{
/* All MERGE tables must be distributed */
if (list_length(distTablesList) < 2)
@ -170,7 +210,7 @@ ErrorIfDistTablesNotColocated(Query *parse, List *distTablesList,
}
/* All distributed tables must be colocated */
if (!AllRelationsInListColocated(distTablesList, RANGETABLE_ENTRY))
if (!AllRelationsInRTEListColocated(distTablesList))
{
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
"For MERGE command, all the distributed tables "
@ -347,8 +387,9 @@ ErrorIfMergeHasUnsupportedTables(Query *parse, List *rangeTableList,
NULL, NULL);
}
/* Ensure all distributed tables are indeed co-located */
return ErrorIfDistTablesNotColocated(parse, distTablesList, restrictionContext);
/* Ensure all distributed tables are indeed co-located and joined on distribution column */
return ErrorIfTablesNotColocatedAndJoinedOnDistColumn(parse, distTablesList,
restrictionContext);
}
@ -359,11 +400,6 @@ ErrorIfMergeHasUnsupportedTables(Query *parse, List *rangeTableList,
static bool
QueryHasMergeCommand(Query *queryTree)
{
/* function is void for pre-15 versions of Postgres */
#if PG_VERSION_NUM < PG_VERSION_15
return false;
#else
/*
* Postgres currently doesn't support Merge queries inside subqueries and
* ctes, but lets be defensive and do query tree walk anyway.
@ -378,7 +414,6 @@ QueryHasMergeCommand(Query *queryTree)
}
return true;
#endif
}
@ -420,8 +455,6 @@ IsPartitionColumnInMergeSource(Expr *columnExpression, Query *query, bool skipOu
}
#if PG_VERSION_NUM >= PG_VERSION_15
/*
* InsertPartitionColumnMatchesSource check to see if MERGE is inserting a
* value into the target which is not from the source table, if so, it
@ -516,4 +549,153 @@ InsertPartitionColumnMatchesSource(Query *query, RangeTblEntry *resultRte)
return NULL;
}
/*
 * MergeActionListSupported validates one target list and qualifier of a
 * MERGE command, i.e., either the ON condition or a single WHEN clause action.
 *
 * A deferred error is returned when
 *  - an UPDATE action changes the value of the distribution column,
 *  - a non-INSERT target entry, or the qualifier, contains a VOLATILE function,
 *  - a target entry contains a FieldStore (composite type field assignment),
 *  - MasterIrreducibleExpression() reported a STABLE function called with a
 *    Var argument, or a mutable function inside CASE/COALESCE,
 *  - the qualifier is a CurrentOfExpr node (cursor).
 *
 * NULL is returned when none of the checks fail.
 */
static DeferredErrorMessage *
MergeActionListSupported(Oid resultRelationId, FromExpr *joinTree, Node *quals,
						 List *targetList, CmdType commandType)
{
	const uint32 resultRangeTableId = 1;
	const bool isInsertAction = (commandType == CMD_INSERT);

	/* stays NULL for non-Citus result relations */
	Var *distributionColumn = NULL;
	if (IsCitusTable(resultRelationId))
	{
		distributionColumn = PartitionColumn(resultRelationId, resultRangeTableId);
	}

	/*
	 * Both flags are accumulated over all target entries and the qualifier,
	 * and are only turned into errors after everything has been inspected.
	 */
	bool hasVarArgument = false; /* A STABLE function is passed a Var argument */
	bool hasBadCoalesce = false; /* CASE/COALESCE passed a mutable function */

	ListCell *entryCell = NULL;
	foreach(entryCell, targetList)
	{
		TargetEntry *entry = (TargetEntry *) lfirst(entryCell);

		/* skip resjunk entries: UPDATE adds some for ctid, etc. */
		if (entry->resjunk)
		{
			continue;
		}

		Node *entryExpression = (Node *) entry->expr;

		/*
		 * Only a named UPDATE target can refer to the distribution column;
		 * when there is no partition column the lookup is skipped entirely.
		 * Note that it is not possible to give an alias to
		 * UPDATE table SET ...
		 */
		bool assignsDistributionColumn =
			distributionColumn != NULL &&
			commandType == CMD_UPDATE &&
			entry->resname != NULL &&
			get_attnum(resultRelationId, entry->resname) ==
			distributionColumn->varattno;

		if (assignsDistributionColumn &&
			TargetEntryChangesValue(entry, distributionColumn, joinTree))
		{
			return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
								 "updating the distribution column is not "
								 "allowed in MERGE actions",
								 NULL, NULL);
		}

		/* the function volatility checks are not applied to INSERT actions */
		if (!isInsertAction)
		{
			if (FindNodeMatchingCheckFunction(entryExpression,
											  CitusIsVolatileFunction))
			{
				return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
									 "functions used in MERGE actions on distributed "
									 "tables must not be VOLATILE",
									 NULL, NULL);
			}

			if (MasterIrreducibleExpression(entryExpression,
											&hasVarArgument, &hasBadCoalesce))
			{
				Assert(hasVarArgument || hasBadCoalesce);
			}
		}

		if (FindNodeMatchingCheckFunction(entryExpression, NodeIsFieldStore))
		{
			/* DELETE cannot do field indirection already */
			Assert(commandType == CMD_UPDATE || commandType == CMD_INSERT);

			return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
								 "inserting or modifying composite type fields is not "
								 "supported", NULL,
								 "Use the column name to insert or update the composite "
								 "type as a single value");
		}
	}

	/* CMD_MERGE means we were handed the join (ON) condition, otherwise a WHEN clause */
	const char *clauseName = (commandType == CMD_MERGE) ? "ON" : "WHEN";
	StringInfo volatileQualMessage = makeStringInfo();
	appendStringInfo(volatileQualMessage, "functions used in the %s clause of MERGE "
										  "queries on distributed tables must not be VOLATILE",
					 clauseName);

	if (quals != NULL)
	{
		/* an implicit-AND list is converted into a single expression tree first */
		if (IsA(quals, List))
		{
			quals = (Node *) make_ands_explicit((List *) quals);
		}

		if (FindNodeMatchingCheckFunction(quals, CitusIsVolatileFunction))
		{
			return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
								 volatileQualMessage->data, NULL, NULL);
		}

		if (MasterIrreducibleExpression(quals, &hasVarArgument, &hasBadCoalesce))
		{
			Assert(hasVarArgument || hasBadCoalesce);
		}
	}

	/* report the accumulated findings, Var argument first */
	if (hasVarArgument)
	{
		return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
							 "STABLE functions used in MERGE queries "
							 "cannot be called with column references",
							 NULL, NULL);
	}

	if (hasBadCoalesce)
	{
		return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
							 "non-IMMUTABLE functions are not allowed in CASE or "
							 "COALESCE statements",
							 NULL, NULL);
	}

	if (quals != NULL && IsA(quals, CurrentOfExpr))
	{
		return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
							 "cannot run MERGE actions with cursors",
							 NULL, NULL);
	}

	return NULL;
}
#endif

View File

@ -2225,17 +2225,14 @@ QueryPushdownSqlTaskList(Query *query, uint64 jobId,
}
/*
* For left joins we don't care about the shards pruned for
* the right hand side. If the right hand side would prune
* to a smaller set we should still send it to all tables
* of the left hand side. However if the right hand side is
* bigger than the left hand side we don't have to send the
* query to any shard that is not matching anything on the
* left hand side.
* For left joins we don't care about the shards pruned for the right hand side.
* If the right hand side would prune to a smaller set we should still send it to
* all tables of the left hand side. However if the right hand side is bigger than
* the left hand side we don't have to send the query to any shard that is not
* matching anything on the left hand side.
*
* Instead we will simply skip any RelationRestriction if it
* is an OUTER join and the table is part of the non-outer
* side of the join.
* Instead we will simply skip any RelationRestriction if it is an OUTER join and
* the table is part of the non-outer side of the join.
*/
if (IsInnerTableOfOuterJoin(relationRestriction))
{

View File

@ -126,21 +126,15 @@ static bool IsTidColumn(Node *node);
static DeferredErrorMessage * ModifyPartialQuerySupported(Query *queryTree, bool
multiShardQuery,
Oid *distributedTableId);
static bool NodeIsFieldStore(Node *node);
static DeferredErrorMessage * MultiShardUpdateDeleteMergeSupported(Query *originalQuery,
static DeferredErrorMessage * MultiShardUpdateDeleteSupported(Query *originalQuery,
PlannerRestrictionContext
*
plannerRestrictionContext);
static DeferredErrorMessage * SingleShardUpdateDeleteSupported(Query *originalQuery,
PlannerRestrictionContext *
plannerRestrictionContext);
static bool HasDangerousJoinUsing(List *rtableList, Node *jtnode);
static bool MasterIrreducibleExpression(Node *expression, bool *varArgument,
bool *badCoalesce);
static bool MasterIrreducibleExpressionWalker(Node *expression, WalkerState *state);
static bool MasterIrreducibleExpressionFunctionChecker(Oid func_id, void *context);
static bool TargetEntryChangesValue(TargetEntry *targetEntry, Var *column,
FromExpr *joinTree);
static Job * RouterInsertJob(Query *originalQuery);
static void ErrorIfNoShardsExist(CitusTableCacheEntry *cacheEntry);
static DeferredErrorMessage * DeferErrorIfModifyView(Query *queryTree);
@ -894,7 +888,7 @@ IsLocallyAccessibleCitusLocalTable(Oid relationId)
/*
* NodeIsFieldStore returns true if given Node is a FieldStore object.
*/
static bool
bool
NodeIsFieldStore(Node *node)
{
return node && IsA(node, FieldStore);
@ -916,14 +910,11 @@ ModifyQuerySupported(Query *queryTree, Query *originalQuery, bool multiShardQuer
PlannerRestrictionContext *plannerRestrictionContext)
{
Oid distributedTableId = InvalidOid;
DeferredErrorMessage *error = MergeQuerySupported(originalQuery,
DeferredErrorMessage *error = MergeQuerySupported(originalQuery, multiShardQuery,
plannerRestrictionContext);
if (error)
{
/*
* For MERGE, we do not do recursive plannning, simply bail out.
*/
RaiseDeferredError(error, ERROR);
return error;
}
error = ModifyPartialQuerySupported(queryTree, multiShardQuery, &distributedTableId);
@ -1096,13 +1087,13 @@ ModifyQuerySupported(Query *queryTree, Query *originalQuery, bool multiShardQuer
}
}
if (commandType != CMD_INSERT)
if (commandType != CMD_INSERT && commandType != CMD_MERGE)
{
DeferredErrorMessage *errorMessage = NULL;
if (multiShardQuery)
{
errorMessage = MultiShardUpdateDeleteMergeSupported(
errorMessage = MultiShardUpdateDeleteSupported(
originalQuery,
plannerRestrictionContext);
}
@ -1283,11 +1274,11 @@ ErrorIfOnConflictNotSupported(Query *queryTree)
/*
* MultiShardUpdateDeleteMergeSupported returns the error message if the update/delete is
* MultiShardUpdateDeleteSupported returns the error message if the update/delete is
* not pushdownable, otherwise it returns NULL.
*/
static DeferredErrorMessage *
MultiShardUpdateDeleteMergeSupported(Query *originalQuery,
MultiShardUpdateDeleteSupported(Query *originalQuery,
PlannerRestrictionContext *plannerRestrictionContext)
{
DeferredErrorMessage *errorMessage = NULL;
@ -1360,7 +1351,7 @@ SingleShardUpdateDeleteSupported(Query *originalQuery,
* HasDangerousJoinUsing search jointree for unnamed JOIN USING. Check the
* implementation of has_dangerous_join_using in ruleutils.
*/
static bool
bool
HasDangerousJoinUsing(List *rtableList, Node *joinTreeNode)
{
if (IsA(joinTreeNode, RangeTblRef))
@ -1464,7 +1455,7 @@ IsMergeQuery(Query *query)
* which do, but for now we just error out. That makes both the code and user-education
* easier.
*/
static bool
bool
MasterIrreducibleExpression(Node *expression, bool *varArgument, bool *badCoalesce)
{
WalkerState data;
@ -1612,7 +1603,7 @@ MasterIrreducibleExpressionFunctionChecker(Oid func_id, void *context)
* expression is a value that is implied by the qualifiers of the join
* tree, or the target entry sets a different column.
*/
static bool
bool
TargetEntryChangesValue(TargetEntry *targetEntry, Var *column, FromExpr *joinTree)
{
bool isColumnValueChanged = true;

View File

@ -151,6 +151,9 @@ static void ListConcatUniqueAttributeClassMemberLists(AttributeEquivalenceClass
secondClass);
static Var * PartitionKeyForRTEIdentityInQuery(Query *query, int targetRTEIndex,
Index *partitionKeyIndex);
static bool AllRelationsInRestrictionContextColocated(RelationRestrictionContext *
restrictionContext);
static bool AllRelationsInListColocated(List *relationList);
static bool IsNotSafeRestrictionToRecursivelyPlan(Node *node);
static JoinRestrictionContext * FilterJoinRestrictionContext(
JoinRestrictionContext *joinRestrictionContext, Relids
@ -381,8 +384,7 @@ SafeToPushdownUnionSubquery(Query *originalQuery,
return false;
}
if (!AllRelationsInListColocated(restrictionContext->relationRestrictionList,
RESTRICTION_CONTEXT))
if (!AllRelationsInRestrictionContextColocated(restrictionContext))
{
/* distribution columns are equal, but tables are not co-located */
return false;
@ -1918,34 +1920,56 @@ FindQueryContainingRTEIdentityInternal(Node *node,
/*
* AllRelationsInListColocated determines whether all of the relations in the
* given list are co-located.
* Note: The list can be of dofferent types, which is specified by ListEntryType
* AllRelationsInRestrictionContextColocated determines whether all of the relations in the
* given relation restrictions list are co-located.
*/
bool
AllRelationsInListColocated(List *relationList, ListEntryType entryType)
static bool
AllRelationsInRestrictionContextColocated(RelationRestrictionContext *restrictionContext)
{
void *varPtr = NULL;
RangeTblEntry *rangeTableEntry = NULL;
RelationRestriction *relationRestriction = NULL;
int initialColocationId = INVALID_COLOCATION_ID;
List *relationIdList = NIL;
/* check whether all relations exists in the main restriction list */
foreach_ptr(varPtr, relationList)
foreach_ptr(relationRestriction, restrictionContext->relationRestrictionList)
{
Oid relationId = InvalidOid;
relationIdList = lappend_oid(relationIdList, relationRestriction->relationId);
}
if (entryType == RANGETABLE_ENTRY)
{
rangeTableEntry = (RangeTblEntry *) varPtr;
relationId = rangeTableEntry->relid;
}
else if (entryType == RESTRICTION_CONTEXT)
{
relationRestriction = (RelationRestriction *) varPtr;
relationId = relationRestriction->relationId;
}
return AllRelationsInListColocated(relationIdList);
}
/*
 * AllRelationsInRTEListColocated collects the relation OID (relid) of every
 * RangeTblEntry in the given list and returns the result of
 * AllRelationsInListColocated() on those OIDs.
 */
bool
AllRelationsInRTEListColocated(List *rangeTableEntryList)
{
	List *relidList = NIL;
	ListCell *rteCell = NULL;

	foreach(rteCell, rangeTableEntryList)
	{
		RangeTblEntry *rte = (RangeTblEntry *) lfirst(rteCell);

		relidList = lappend_oid(relidList, rte->relid);
	}

	return AllRelationsInListColocated(relidList);
}
/*
* AllRelationsInListColocated determines whether all of the relations in the
* given list are co-located.
*/
static bool
AllRelationsInListColocated(List *relationList)
{
int initialColocationId = INVALID_COLOCATION_ID;
Oid relationId = InvalidOid;
foreach_oid(relationId, relationList)
{
if (IsCitusTableType(relationId, CITUS_TABLE_WITH_NO_DIST_KEY))
{
continue;

View File

@ -254,9 +254,6 @@ extern struct DistributedPlan * CreateDistributedPlan(uint64 planId,
bool hasUnresolvedParams,
PlannerRestrictionContext *
plannerRestrictionContext);
extern bool ConjunctionContainsColumnFilter(Node *node,
Var *column,
Node **distributionKeyValue);
extern bool ContainsMergeCommandWalker(Node *node);
#endif /* DISTRIBUTED_PLANNER_H */

View File

@ -19,7 +19,7 @@
#include "distributed/errormessage.h"
extern bool IsMergeAllowedOnRelation(Query *parse, RangeTblEntry *rte);
extern DeferredErrorMessage * MergeQuerySupported(Query *originalQuery,
extern DeferredErrorMessage * MergeQuerySupported(Query *originalQuery, bool multiShardQuery,
PlannerRestrictionContext *
plannerRestrictionContext);
#endif /* MERGE_PLANNER_H */

View File

@ -106,7 +106,11 @@ extern DeferredErrorMessage * TargetlistAndFunctionsSupported(Oid resultRelation
List *targetList,
CmdType commandType,
List *returningList);
extern bool NodeIsFieldStore(Node *node);
extern bool TargetEntryChangesValue(TargetEntry *targetEntry, Var *column,
FromExpr *joinTree);
extern bool MasterIrreducibleExpression(Node *expression, bool *varArgument,
bool *badCoalesce);
extern bool HasDangerousJoinUsing(List *rtableList, Node *jtnode);
#endif /* MULTI_ROUTER_PLANNER_H */

View File

@ -17,15 +17,6 @@
#define SINGLE_RTE_INDEX 1
/*
* Represents the pointer type that's being passed in the list.
*/
typedef enum ListEntryType
{
RANGETABLE_ENTRY, /* RangeTblEntry */
RESTRICTION_CONTEXT /* RelationRestriction */
} ListEntryType;
extern bool AllDistributionKeysInQueryAreEqual(Query *originalQuery,
PlannerRestrictionContext *
plannerRestrictionContext);
@ -63,6 +54,5 @@ extern RelationRestrictionContext * FilterRelationRestrictionContext(
RelationRestrictionContext *relationRestrictionContext,
Relids
queryRteIdentities);
extern bool AllRelationsInListColocated(List *relationList, ListEntryType entryType);
extern bool AllRelationsInRTEListColocated(List *rangeTableEntryList);
#endif /* RELATION_RESTRICTION_EQUIVALENCE_H */

View File

@ -13,3 +13,4 @@ test: arbitrary_configs_truncate_create
test: arbitrary_configs_truncate_cascade_create
test: arbitrary_configs_truncate_partition_create
test: arbitrary_configs_alter_table_add_constraint_without_name_create
test: merge_arbitrary_create

View File

@ -2220,9 +2220,231 @@ SELECT pa_compare_tables();
(1 row)
ROLLBACK;
CREATE TABLE source_json( id integer, z int, d jsonb);
CREATE TABLE target_json( id integer, z int, d jsonb);
INSERT INTO source_json SELECT i,i FROM generate_series(0,100)i;
SELECT create_distributed_table('target_json','id'), create_distributed_table('source_json', 'id');
NOTICE: Copying data from local table...
NOTICE: copying the data has completed
DETAIL: The local data in the table is no longer visible, but is still on disk.
HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$merge_schema.source_json$$)
create_distributed_table | create_distributed_table
---------------------------------------------------------------------
|
(1 row)
-- single shard query given source_json is filtered and Postgres is smart to pushdown
-- filter to the target_json as well
EXPLAIN MERGE INTO target_json sda
USING (SELECT * FROM source_json WHERE id = 1) sdn
ON sda.id = sdn.id
WHEN NOT matched THEN
INSERT (id, z) VALUES (sdn.id, 5);
QUERY PLAN
---------------------------------------------------------------------
Custom Scan (Citus Adaptive) (cost=0.00..0.00 rows=0 width=0)
Task Count: 1
Tasks Shown: All
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> Merge on target_json_4000091 sda (cost=0.00..50.56 rows=0 width=0)
-> Nested Loop Left Join (cost=0.00..50.56 rows=36 width=10)
Join Filter: (sda.id = source_json.id)
-> Seq Scan on source_json_4000095 source_json (cost=0.00..25.00 rows=6 width=4)
Filter: (id = 1)
-> Materialize (cost=0.00..25.03 rows=6 width=10)
-> Seq Scan on target_json_4000091 sda (cost=0.00..25.00 rows=6 width=10)
Filter: (id = 1)
(13 rows)
-- zero shard query as filters do not match
EXPLAIN MERGE INTO target_json sda
USING (SELECT * FROM source_json WHERE id = 1) sdn
ON sda.id = sdn.id AND sda.id = 2
WHEN NOT matched THEN
INSERT (id, z) VALUES (sdn.id, 5);
QUERY PLAN
---------------------------------------------------------------------
Custom Scan (Citus Adaptive) (cost=0.00..0.00 rows=0 width=0)
Task Count: 0
Tasks Shown: All
(3 rows)
-- join for source_json is happening at at different place
EXPLAIN MERGE INTO target_json sda
USING source_json s1 LEFT JOIN (SELECT * FROM source_json) s2 USING(z)
ON sda.id = s1.id AND s1.id = s2.id
WHEN NOT matched THEN
INSERT (id, z) VALUES (s2.id, 5);
QUERY PLAN
---------------------------------------------------------------------
Custom Scan (Citus Adaptive) (cost=0.00..0.00 rows=0 width=0)
Task Count: 4
Tasks Shown: All
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> Merge on target_json_4000091 sda (cost=825.41..1617.41 rows=0 width=0)
-> Merge Left Join (cost=825.41..1617.41 rows=7200 width=10)
Merge Cond: (s1.id = sda.id)
Join Filter: (s1.id = source_json.id)
-> Sort (cost=742.04..760.04 rows=7200 width=8)
Sort Key: s1.id
-> Merge Left Join (cost=166.75..280.75 rows=7200 width=8)
Merge Cond: (s1.z = source_json.z)
-> Sort (cost=83.37..86.37 rows=1200 width=8)
Sort Key: s1.z
-> Seq Scan on source_json_4000095 s1 (cost=0.00..22.00 rows=1200 width=8)
-> Sort (cost=83.37..86.37 rows=1200 width=8)
Sort Key: source_json.z
-> Seq Scan on source_json_4000095 source_json (cost=0.00..22.00 rows=1200 width=8)
-> Sort (cost=83.37..86.37 rows=1200 width=10)
Sort Key: sda.id
-> Seq Scan on target_json_4000091 sda (cost=0.00..22.00 rows=1200 width=10)
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> Merge on target_json_4000092 sda (cost=825.41..1617.41 rows=0 width=0)
-> Merge Left Join (cost=825.41..1617.41 rows=7200 width=10)
Merge Cond: (s1.id = sda.id)
Join Filter: (s1.id = source_json.id)
-> Sort (cost=742.04..760.04 rows=7200 width=8)
Sort Key: s1.id
-> Merge Left Join (cost=166.75..280.75 rows=7200 width=8)
Merge Cond: (s1.z = source_json.z)
-> Sort (cost=83.37..86.37 rows=1200 width=8)
Sort Key: s1.z
-> Seq Scan on source_json_4000096 s1 (cost=0.00..22.00 rows=1200 width=8)
-> Sort (cost=83.37..86.37 rows=1200 width=8)
Sort Key: source_json.z
-> Seq Scan on source_json_4000096 source_json (cost=0.00..22.00 rows=1200 width=8)
-> Sort (cost=83.37..86.37 rows=1200 width=10)
Sort Key: sda.id
-> Seq Scan on target_json_4000092 sda (cost=0.00..22.00 rows=1200 width=10)
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> Merge on target_json_4000093 sda (cost=825.41..1617.41 rows=0 width=0)
-> Merge Left Join (cost=825.41..1617.41 rows=7200 width=10)
Merge Cond: (s1.id = sda.id)
Join Filter: (s1.id = source_json.id)
-> Sort (cost=742.04..760.04 rows=7200 width=8)
Sort Key: s1.id
-> Merge Left Join (cost=166.75..280.75 rows=7200 width=8)
Merge Cond: (s1.z = source_json.z)
-> Sort (cost=83.37..86.37 rows=1200 width=8)
Sort Key: s1.z
-> Seq Scan on source_json_4000097 s1 (cost=0.00..22.00 rows=1200 width=8)
-> Sort (cost=83.37..86.37 rows=1200 width=8)
Sort Key: source_json.z
-> Seq Scan on source_json_4000097 source_json (cost=0.00..22.00 rows=1200 width=8)
-> Sort (cost=83.37..86.37 rows=1200 width=10)
Sort Key: sda.id
-> Seq Scan on target_json_4000093 sda (cost=0.00..22.00 rows=1200 width=10)
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> Merge on target_json_4000094 sda (cost=825.41..1617.41 rows=0 width=0)
-> Merge Left Join (cost=825.41..1617.41 rows=7200 width=10)
Merge Cond: (s1.id = sda.id)
Join Filter: (s1.id = source_json.id)
-> Sort (cost=742.04..760.04 rows=7200 width=8)
Sort Key: s1.id
-> Merge Left Join (cost=166.75..280.75 rows=7200 width=8)
Merge Cond: (s1.z = source_json.z)
-> Sort (cost=83.37..86.37 rows=1200 width=8)
Sort Key: s1.z
-> Seq Scan on source_json_4000098 s1 (cost=0.00..22.00 rows=1200 width=8)
-> Sort (cost=83.37..86.37 rows=1200 width=8)
Sort Key: source_json.z
-> Seq Scan on source_json_4000098 source_json (cost=0.00..22.00 rows=1200 width=8)
-> Sort (cost=83.37..86.37 rows=1200 width=10)
Sort Key: sda.id
-> Seq Scan on target_json_4000094 sda (cost=0.00..22.00 rows=1200 width=10)
(79 rows)
-- update JSON column
EXPLAIN MERGE INTO target_json sda
USING source_json sdn
ON sda.id = sdn.id
WHEN matched THEN
UPDATE SET d = '{"a" : 5}';
QUERY PLAN
---------------------------------------------------------------------
Custom Scan (Citus Adaptive) (cost=0.00..0.00 rows=0 width=0)
Task Count: 4
Tasks Shown: All
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> Merge on target_json_4000091 sda (cost=166.75..280.75 rows=0 width=0)
-> Merge Join (cost=166.75..280.75 rows=7200 width=6)
Merge Cond: (sda.id = sdn.id)
-> Sort (cost=83.37..86.37 rows=1200 width=10)
Sort Key: sda.id
-> Seq Scan on target_json_4000091 sda (cost=0.00..22.00 rows=1200 width=10)
-> Sort (cost=83.37..86.37 rows=1200 width=4)
Sort Key: sdn.id
-> Seq Scan on source_json_4000095 sdn (cost=0.00..22.00 rows=1200 width=4)
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> Merge on target_json_4000092 sda (cost=166.75..280.75 rows=0 width=0)
-> Merge Join (cost=166.75..280.75 rows=7200 width=6)
Merge Cond: (sda.id = sdn.id)
-> Sort (cost=83.37..86.37 rows=1200 width=10)
Sort Key: sda.id
-> Seq Scan on target_json_4000092 sda (cost=0.00..22.00 rows=1200 width=10)
-> Sort (cost=83.37..86.37 rows=1200 width=4)
Sort Key: sdn.id
-> Seq Scan on source_json_4000096 sdn (cost=0.00..22.00 rows=1200 width=4)
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> Merge on target_json_4000093 sda (cost=166.75..280.75 rows=0 width=0)
-> Merge Join (cost=166.75..280.75 rows=7200 width=6)
Merge Cond: (sda.id = sdn.id)
-> Sort (cost=83.37..86.37 rows=1200 width=10)
Sort Key: sda.id
-> Seq Scan on target_json_4000093 sda (cost=0.00..22.00 rows=1200 width=10)
-> Sort (cost=83.37..86.37 rows=1200 width=4)
Sort Key: sdn.id
-> Seq Scan on source_json_4000097 sdn (cost=0.00..22.00 rows=1200 width=4)
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> Merge on target_json_4000094 sda (cost=166.75..280.75 rows=0 width=0)
-> Merge Join (cost=166.75..280.75 rows=7200 width=6)
Merge Cond: (sda.id = sdn.id)
-> Sort (cost=83.37..86.37 rows=1200 width=10)
Sort Key: sda.id
-> Seq Scan on target_json_4000094 sda (cost=0.00..22.00 rows=1200 width=10)
-> Sort (cost=83.37..86.37 rows=1200 width=4)
Sort Key: sdn.id
-> Seq Scan on source_json_4000098 sdn (cost=0.00..22.00 rows=1200 width=4)
(47 rows)
CREATE FUNCTION immutable_hash(int) RETURNS int
AS 'SELECT hashtext( ($1 + $1)::text);'
LANGUAGE SQL
IMMUTABLE
RETURNS NULL ON NULL INPUT;
MERGE INTO target_json sda
USING source_json sdn
ON sda.id = sdn.id
WHEN matched THEN
UPDATE SET z = immutable_hash(sdn.z);
--
-- Error and Unsupported scenarios
--
-- zero shard query as source_json is zero shard
EXPLAIN MERGE INTO target_json sda
USING (SELECT * FROM source_json WHERE false) sdn
ON sda.id = sdn.id AND sda.id = 2
WHEN NOT matched THEN
INSERT (id, z) VALUES (sdn.id, 5);
ERROR: MERGE command is only supported when distributed tables are joined on their distribution column
-- modifying CTE not supported
EXPLAIN
WITH cte_1 AS (DELETE FROM target_json)
MERGE INTO target_json sda
USING source_json sdn
ON sda.id = sdn.id
WHEN NOT matched THEN
INSERT (id, z) VALUES (sdn.id, 5);
ERROR: MERGE command is only supported when distributed tables are joined on their distribution column
-- Grouping sets not supported
MERGE INTO citus_target t
USING (SELECT count(*), id FROM citus_source GROUP BY GROUPING SETS (id, val)) subq
@ -2290,7 +2512,7 @@ MERGE INTO target_cj t
UPDATE SET tid = tid + 9, src = src || ' updated by merge'
WHEN NOT MATCHED THEN
INSERT VALUES (sid1, 'inserted by merge', val1);
ERROR: modifying the partition value of rows is not allowed
ERROR: updating the distribution column is not allowed in MERGE actions
ROLLBACK;
-- Foreign table as target
MERGE INTO foreign_table
@ -2382,13 +2604,22 @@ BEGIN
RETURN TRUE;
END;
$$;
-- Test functions executing in MERGE statement. This is to prevent the functions from
-- doing a random sql, which may be executed in a remote node or modifying the target
-- relation which will have unexpected/surprising results.
MERGE INTO t1 USING (SELECT * FROM s1 WHERE true) s1 ON
t1.id = s1.id AND s1.id = 2
WHEN NOT matched THEN
INSERT (id, val)
VALUES (s1.id , random());
ERROR: functions used in MERGE actions on distributed tables must not be VOLATILE
-- Test preventing "ON" join condition from writing to the database
BEGIN;
MERGE INTO t1
USING s1 ON t1.id = s1.id AND t1.id = 2 AND (merge_when_and_write())
WHEN MATCHED THEN
UPDATE SET val = t1.val + s1.val;
ERROR: functions used in the WHERE/ON/WHEN clause of modification queries on distributed tables must not be VOLATILE
ERROR: functions used in the ON clause of MERGE queries on distributed tables must not be VOLATILE
ROLLBACK;
-- Test preventing WHEN clause(s) from writing to the database
BEGIN;
@ -2396,7 +2627,7 @@ MERGE INTO t1
USING s1 ON t1.id = s1.id AND t1.id = 2
WHEN MATCHED AND (merge_when_and_write()) THEN
UPDATE SET val = t1.val + s1.val;
ERROR: functions used in the WHERE/ON/WHEN clause of modification queries on distributed tables must not be VOLATILE
ERROR: functions used in the WHEN clause of MERGE queries on distributed tables must not be VOLATILE
ROLLBACK;
-- Joining on partition columns with sub-query
MERGE INTO t1
@ -2872,7 +3103,7 @@ CONTEXT: SQL statement "SELECT citus_drop_all_shards(v_obj.objid, v_obj.schema_
PL/pgSQL function citus_drop_trigger() line XX at PERFORM
DROP FUNCTION merge_when_and_write();
DROP SCHEMA merge_schema CASCADE;
NOTICE: drop cascades to 75 other objects
NOTICE: drop cascades to 78 other objects
DETAIL: drop cascades to function insert_data()
drop cascades to table pg_result
drop cascades to table local_local
@ -2939,9 +3170,12 @@ drop cascades to table citus_pa_target
drop cascades to table pg_pa_source
drop cascades to table citus_pa_source
drop cascades to function pa_compare_tables()
drop cascades to table source_json
drop cascades to table target_json
drop cascades to function immutable_hash(integer)
drop cascades to table pg
drop cascades to table t1_4000110
drop cascades to table s1_4000111
drop cascades to table t1_4000118
drop cascades to table s1_4000119
drop cascades to table t1
drop cascades to table s1
drop cascades to table dist_colocated

View File

@ -0,0 +1,88 @@
SHOW server_version \gset
SELECT substring(:'server_version', '\d+')::int >= 15 AS server_version_ge_15
\gset
\if :server_version_ge_15
\else
\q
\endif
SET search_path TO merge_arbitrary_schema;
INSERT INTO target_cj VALUES (1, 'target', 0);
INSERT INTO target_cj VALUES (2, 'target', 0);
INSERT INTO target_cj VALUES (2, 'target', 0);
INSERT INTO target_cj VALUES (3, 'target', 0);
INSERT INTO source_cj1 VALUES (2, 'source-1', 10);
INSERT INTO source_cj2 VALUES (2, 'source-2', 20);
BEGIN;
MERGE INTO target_cj t
USING source_cj1 s1 INNER JOIN source_cj2 s2 ON sid1 = sid2
ON t.tid = sid1 AND t.tid = 2
WHEN MATCHED THEN
UPDATE SET src = src2
WHEN NOT MATCHED THEN
DO NOTHING;
SELECT * FROM target_cj ORDER BY 1;
tid | src | val
---------------------------------------------------------------------
1 | target | 0
2 | source-2 | 0
2 | source-2 | 0
3 | target | 0
(4 rows)
ROLLBACK;
BEGIN;
-- try accessing columns from either side of the source join
MERGE INTO target_cj t
USING source_cj1 s2
INNER JOIN source_cj2 s1 ON sid1 = sid2 AND val1 = 10
ON t.tid = sid1 AND t.tid = 2
WHEN MATCHED THEN
UPDATE SET src = src1, val = val2
WHEN NOT MATCHED THEN
DO NOTHING;
SELECT * FROM target_cj ORDER BY 1;
tid | src | val
---------------------------------------------------------------------
1 | target | 0
2 | source-1 | 20
2 | source-1 | 20
3 | target | 0
(4 rows)
ROLLBACK;
-- Test PREPARE
CREATE TABLE prept(t1 int, t2 int);
CREATE TABLE preps(s1 int, s2 int);
INSERT INTO prept VALUES(100, 0);
INSERT INTO preps VALUES(100, 0);
INSERT INTO preps VALUES(200, 0);
PREPARE insert(int) AS
MERGE INTO prept
USING preps
ON prept.t1 = preps.s1
WHEN MATCHED THEN
UPDATE SET t2 = t2 + $1
WHEN NOT MATCHED THEN
INSERT VALUES(s1, s2);
PREPARE delete(int) AS
MERGE INTO prept
USING preps
ON prept.t1 = preps.s1
WHEN MATCHED AND prept.t2 = $1 THEN
DELETE
WHEN MATCHED THEN
UPDATE SET t2 = t2 + 1;
EXECUTE insert(1); EXECUTE delete(0);
EXECUTE insert(1); EXECUTE delete(0);
EXECUTE insert(1); EXECUTE delete(0);
EXECUTE insert(1); EXECUTE delete(0);
EXECUTE insert(1); EXECUTE delete(0);
-- sixth time
EXECUTE insert(1); EXECUTE delete(0);
-- Should have the counter as 12 (6 * 2)
SELECT * FROM prept;
t1 | t2
---------------------------------------------------------------------
100 | 12
(1 row)

View File

@ -0,0 +1,6 @@
SHOW server_version \gset
SELECT substring(:'server_version', '\d+')::int >= 15 AS server_version_ge_15
\gset
\if :server_version_ge_15
\else
\q

View File

@ -0,0 +1,26 @@
DROP SCHEMA IF EXISTS merge_arbitrary_schema CASCADE;
CREATE SCHEMA merge_arbitrary_schema;
SET search_path TO merge_arbitrary_schema;
SET citus.shard_count TO 4;
SET citus.next_shard_id TO 6000000;
CREATE TABLE target_cj(tid int, src text, val int);
CREATE TABLE source_cj1(sid1 int, src1 text, val1 int);
CREATE TABLE source_cj2(sid2 int, src2 text, val2 int);
SELECT create_distributed_table('target_cj', 'tid');
create_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT create_distributed_table('source_cj1', 'sid1');
create_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT create_distributed_table('source_cj2', 'sid2');
create_distributed_table
---------------------------------------------------------------------
(1 row)

View File

@ -910,7 +910,7 @@ MERGE INTO wq_target t
USING wq_source s ON t.tid = s.sid
WHEN MATCHED AND (merge_when_and_write()) THEN
UPDATE SET balance = t.balance + s.balance;
ERROR: functions used in the WHERE/ON/WHEN clause of modification queries on distributed tables must not be VOLATILE
ERROR: functions used in the WHEN clause of MERGE queries on distributed tables must not be VOLATILE
ROLLBACK;
-- Test preventing ON condition from writing to the database
BEGIN;
@ -918,7 +918,7 @@ MERGE INTO wq_target t
USING wq_source s ON t.tid = s.sid AND (merge_when_and_write())
WHEN MATCHED THEN
UPDATE SET balance = t.balance + s.balance;
ERROR: functions used in the WHERE/ON/WHEN clause of modification queries on distributed tables must not be VOLATILE
ERROR: functions used in the ON clause of MERGE queries on distributed tables must not be VOLATILE
ROLLBACK;
drop function merge_when_and_write();
DROP TABLE wq_target, wq_source;

View File

@ -1422,10 +1422,86 @@ MERGE INTO citus_pa_target t
SELECT pa_compare_tables();
ROLLBACK;
CREATE TABLE source_json( id integer, z int, d jsonb);
CREATE TABLE target_json( id integer, z int, d jsonb);
INSERT INTO source_json SELECT i,i FROM generate_series(0,100)i;
SELECT create_distributed_table('target_json','id'), create_distributed_table('source_json', 'id');
-- single shard query given source_json is filtered and Postgres is smart enough to push down
-- the filter to target_json as well
EXPLAIN MERGE INTO target_json sda
USING (SELECT * FROM source_json WHERE id = 1) sdn
ON sda.id = sdn.id
WHEN NOT matched THEN
INSERT (id, z) VALUES (sdn.id, 5);
-- zero shard query as filters do not match
EXPLAIN MERGE INTO target_json sda
USING (SELECT * FROM source_json WHERE id = 1) sdn
ON sda.id = sdn.id AND sda.id = 2
WHEN NOT matched THEN
INSERT (id, z) VALUES (sdn.id, 5);
-- join for source_json is happening at a different place
EXPLAIN MERGE INTO target_json sda
USING source_json s1 LEFT JOIN (SELECT * FROM source_json) s2 USING(z)
ON sda.id = s1.id AND s1.id = s2.id
WHEN NOT matched THEN
INSERT (id, z) VALUES (s2.id, 5);
-- update JSON column
EXPLAIN MERGE INTO target_json sda
USING source_json sdn
ON sda.id = sdn.id
WHEN matched THEN
UPDATE SET d = '{"a" : 5}';
CREATE FUNCTION immutable_hash(int) RETURNS int
AS 'SELECT hashtext( ($1 + $1)::text);'
LANGUAGE SQL
IMMUTABLE
RETURNS NULL ON NULL INPUT;
MERGE INTO target_json sda
USING source_json sdn
ON sda.id = sdn.id
WHEN matched THEN
UPDATE SET z = immutable_hash(sdn.z);
-- Test bigserial
CREATE TABLE source_serial (id integer, z int, d bigserial);
CREATE TABLE target_serial (id integer, z int, d bigserial);
INSERT INTO source_serial SELECT i,i FROM generate_series(0,100)i;
SELECT create_distributed_table('source_serial', 'id'), create_distributed_table('target_serial', 'id');
MERGE INTO target_serial sda
USING source_serial sdn
ON sda.id = sdn.id
WHEN NOT matched THEN
INSERT (id, z) VALUES (id, z);
--
-- Error and Unsupported scenarios
--
-- zero shard query as source_json is zero shard
EXPLAIN MERGE INTO target_json sda
USING (SELECT * FROM source_json WHERE false) sdn
ON sda.id = sdn.id AND sda.id = 2
WHEN NOT matched THEN
INSERT (id, z) VALUES (sdn.id, 5);
-- modifying CTE not supported
EXPLAIN
WITH cte_1 AS (DELETE FROM target_json)
MERGE INTO target_json sda
USING source_json sdn
ON sda.id = sdn.id
WHEN NOT matched THEN
INSERT (id, z) VALUES (sdn.id, 5);
-- Grouping sets not supported
MERGE INTO citus_target t
USING (SELECT count(*), id FROM citus_source GROUP BY GROUPING SETS (id, val)) subq
@ -1534,6 +1610,15 @@ BEGIN
END;
$$;
-- Test functions executing in MERGE statement. This is to prevent the functions from
-- doing a random sql, which may be executed in a remote node or modifying the target
-- relation which will have unexpected/surprising results.
MERGE INTO t1 USING (SELECT * FROM s1 WHERE true) s1 ON
t1.id = s1.id AND s1.id = 2
WHEN NOT matched THEN
INSERT (id, val)
VALUES (s1.id , random());
-- Test preventing "ON" join condition from writing to the database
BEGIN;
MERGE INTO t1

View File

@ -0,0 +1,79 @@
SHOW server_version \gset
SELECT substring(:'server_version', '\d+')::int >= 15 AS server_version_ge_15
\gset
\if :server_version_ge_15
\else
\q
\endif
SET search_path TO merge_arbitrary_schema;
INSERT INTO target_cj VALUES (1, 'target', 0);
INSERT INTO target_cj VALUES (2, 'target', 0);
INSERT INTO target_cj VALUES (2, 'target', 0);
INSERT INTO target_cj VALUES (3, 'target', 0);
INSERT INTO source_cj1 VALUES (2, 'source-1', 10);
INSERT INTO source_cj2 VALUES (2, 'source-2', 20);
BEGIN;
MERGE INTO target_cj t
USING source_cj1 s1 INNER JOIN source_cj2 s2 ON sid1 = sid2
ON t.tid = sid1 AND t.tid = 2
WHEN MATCHED THEN
UPDATE SET src = src2
WHEN NOT MATCHED THEN
DO NOTHING;
SELECT * FROM target_cj ORDER BY 1;
ROLLBACK;
BEGIN;
-- try accessing columns from either side of the source join
MERGE INTO target_cj t
USING source_cj1 s2
INNER JOIN source_cj2 s1 ON sid1 = sid2 AND val1 = 10
ON t.tid = sid1 AND t.tid = 2
WHEN MATCHED THEN
UPDATE SET src = src1, val = val2
WHEN NOT MATCHED THEN
DO NOTHING;
SELECT * FROM target_cj ORDER BY 1;
ROLLBACK;
-- Test PREPARE
CREATE TABLE prept(t1 int, t2 int);
CREATE TABLE preps(s1 int, s2 int);
INSERT INTO prept VALUES(100, 0);
INSERT INTO preps VALUES(100, 0);
INSERT INTO preps VALUES(200, 0);
PREPARE insert(int) AS
MERGE INTO prept
USING preps
ON prept.t1 = preps.s1
WHEN MATCHED THEN
UPDATE SET t2 = t2 + $1
WHEN NOT MATCHED THEN
INSERT VALUES(s1, s2);
PREPARE delete(int) AS
MERGE INTO prept
USING preps
ON prept.t1 = preps.s1
WHEN MATCHED AND prept.t2 = $1 THEN
DELETE
WHEN MATCHED THEN
UPDATE SET t2 = t2 + 1;
EXECUTE insert(1); EXECUTE delete(0);
EXECUTE insert(1); EXECUTE delete(0);
EXECUTE insert(1); EXECUTE delete(0);
EXECUTE insert(1); EXECUTE delete(0);
EXECUTE insert(1); EXECUTE delete(0);
-- sixth time
EXECUTE insert(1); EXECUTE delete(0);
-- Should have the counter as 12 (6 * 2)
SELECT * FROM prept;

View File

@ -0,0 +1,12 @@
DROP SCHEMA IF EXISTS merge_arbitrary_schema CASCADE;
CREATE SCHEMA merge_arbitrary_schema;
SET search_path TO merge_arbitrary_schema;
SET citus.shard_count TO 4;
SET citus.next_shard_id TO 6000000;
CREATE TABLE target_cj(tid int, src text, val int);
CREATE TABLE source_cj1(sid1 int, src1 text, val1 int);
CREATE TABLE source_cj2(sid2 int, src2 text, val2 int);
SELECT create_distributed_table('target_cj', 'tid');
SELECT create_distributed_table('source_cj1', 'sid1');
SELECT create_distributed_table('source_cj2', 'sid2');

View File

@ -14,3 +14,4 @@ test: arbitrary_configs_truncate
test: arbitrary_configs_truncate_cascade
test: arbitrary_configs_truncate_partition
test: arbitrary_configs_alter_table_add_constraint_without_name
test: merge_arbitrary