Support MERGE with two scenarios

-- All the tables are Citus local
-- All the tables are distributed, co-located and joined on the
   distribution column
teja_merge_poc
Teja Mupparti 2022-08-11 20:36:04 -07:00
parent 13fe89f018
commit 8b125dff9f
16 changed files with 5963 additions and 91 deletions

View File

@ -368,6 +368,8 @@ static void get_insert_query_def(Query *query, deparse_context *context,
bool colNamesVisible);
static void get_update_query_def(Query *query, deparse_context *context,
bool colNamesVisible);
static void get_merge_query_def(Query *query, deparse_context *context);
static void get_update_query_targetlist_def(Query *query, List *targetList,
deparse_context *context,
RangeTblEntry *rte);
@ -469,6 +471,7 @@ static void get_json_path_spec(Node *path_spec, deparse_context *context,
bool showimplicit);
static void get_json_table_columns(TableFunc *tf, JsonTableParent *node,
deparse_context *context, bool showimplicit);
static List *get_insert_column_names_list(List *targetList, StringInfo buf, deparse_context *context, RangeTblEntry *rte);
#define only_marker(rte) ((rte)->inh ? "" : "ONLY ")
@ -2106,6 +2109,10 @@ get_query_def_extended(Query *query, StringInfo buf, List *parentnamespace,
get_delete_query_def(query, &context, colNamesVisible);
break;
case CMD_MERGE:
get_merge_query_def(query, &context);
break;
case CMD_NOTHING:
appendStringInfoString(buf, "NOTHING");
break;
@ -3237,9 +3244,8 @@ get_insert_query_def(Query *query, deparse_context *context,
RangeTblEntry *select_rte = NULL;
RangeTblEntry *values_rte = NULL;
RangeTblEntry *rte;
char *sep;
ListCell *l;
List *strippedexprs;
List *strippedexprs = NIL;
/* Insert the WITH clause if given */
get_with_clause(query, context);
@ -3293,43 +3299,11 @@ get_insert_query_def(Query *query, deparse_context *context,
* Add the insert-column-names list. Any indirection decoration needed on
* the column names can be inferred from the top targetlist.
*/
strippedexprs = NIL;
sep = "";
if (query->targetList)
appendStringInfoChar(buf, '(');
foreach(l, query->targetList)
{
TargetEntry *tle = (TargetEntry *) lfirst(l);
if (tle->resjunk)
continue; /* ignore junk entries */
appendStringInfoString(buf, sep);
sep = ", ";
/*
* Put out name of target column; look in the catalogs, not at
* tle->resname, since resname will fail to track RENAME.
*/
appendStringInfoString(buf,
quote_identifier(get_attname(rte->relid,
tle->resno,
false)));
/*
* Print any indirection needed (subfields or subscripts), and strip
* off the top-level nodes representing the indirection assignments.
* Add the stripped expressions to strippedexprs. (If it's a
* single-VALUES statement, the stripped expressions are the VALUES to
* print below. Otherwise they're just Vars and not really
* interesting.)
*/
strippedexprs = lappend(strippedexprs,
processIndirection((Node *) tle->expr,
context));
strippedexprs = get_insert_column_names_list(query->targetList,
buf, context, rte);
}
if (query->targetList)
appendStringInfoString(buf, ") ");
if (query->override)
{
@ -3757,6 +3731,148 @@ get_delete_query_def(Query *query, deparse_context *context,
}
/* ----------
* get_merge_query_def - Parse back a MERGE parsetree
* ----------
*/
static void
get_merge_query_def(Query *query, deparse_context *context)
{
StringInfo buf = context->buf;
RangeTblEntry *targetRte;
/* Insert the WITH clause if given */
get_with_clause(query, context);
/*
* Start the query with MERGE INTO <target>
*/
targetRte = rt_fetch(query->resultRelation, query->rtable);
if (PRETTY_INDENT(context))
{
appendStringInfoChar(buf, ' ');
context->indentLevel += PRETTYINDENT_STD;
}
/* if it's a shard, do differently */
if (GetRangeTblKind(targetRte) == CITUS_RTE_SHARD)
{
char *fragmentSchemaName = NULL;
char *fragmentTableName = NULL;
ExtractRangeTblExtraData(targetRte, NULL, &fragmentSchemaName, &fragmentTableName, NULL);
/* use schema and table name from the remote alias */
appendStringInfo(buf, "MERGE INTO %s%s",
only_marker(targetRte),
generate_fragment_name(fragmentSchemaName, fragmentTableName));
if(targetRte->eref != NULL)
appendStringInfo(buf, " %s",
quote_identifier(get_rtable_name(query->resultRelation, context)));
}
else
{
appendStringInfo(buf, "MERGE INTO %s%s",
only_marker(targetRte),
generate_relation_or_shard_name(targetRte->relid,
context->distrelid,
context->shardid, NIL));
if (targetRte->alias != NULL)
appendStringInfo(buf, " %s",
quote_identifier(get_rtable_name(query->resultRelation, context)));
}
/*
* Add the MERGE source relation -- USING <source>
*/
get_from_clause(query, " USING ", context);
/*
* Add the MERGE ON condition
*/
Assert(query->jointree->quals != NULL);
{
appendContextKeyword(context, " ON ",
-PRETTYINDENT_STD, PRETTYINDENT_STD, 1);
get_rule_expr(query->jointree->quals, context, false);
}
ListCell *actionCell = NULL;
foreach(actionCell, query->mergeActionList)
{
MergeAction *action = (MergeAction *) lfirst(actionCell);
/* Add WHEN [NOT] MATCHED */
appendContextKeyword(context, " WHEN",
-PRETTYINDENT_STD, PRETTYINDENT_STD, 1);
appendStringInfo(buf, " %s", action->matched ? "MATCHED" : "NOT MATCHED");
/* Add optional AND <condition> */
if (action->qual)
{
appendContextKeyword(context, " AND ",
-PRETTYINDENT_STD, PRETTYINDENT_STD, 1);
get_rule_expr(action->qual, context, false);
}
appendContextKeyword(context, " THEN",
-PRETTYINDENT_STD, PRETTYINDENT_STD, 1);
switch (action->commandType)
{
case CMD_INSERT:
{
appendStringInfo(buf, " INSERT " );
List *strippedexprs = NIL;
if (action->targetList)
{
strippedexprs = get_insert_column_names_list(action->targetList,
buf, context, targetRte);
}
if (strippedexprs)
{
/* Add the single-VALUES expression list */
appendContextKeyword(context, "VALUES (",
-PRETTYINDENT_STD, PRETTYINDENT_STD, 2);
get_rule_list_toplevel(strippedexprs, context, false);
appendStringInfoChar(buf, ')');
}
else
{
/* No expressions, so it must be DEFAULT VALUES */
appendStringInfoString(buf, "DEFAULT VALUES");
}
}
break;
case CMD_UPDATE:
appendStringInfo(buf, " UPDATE SET " );
get_update_query_targetlist_def(query, action->targetList,
context, targetRte);
break;
case CMD_DELETE:
appendStringInfo(buf, " DELETE" );
break;
case CMD_NOTHING:
appendStringInfo(buf, " DO NOTHING " );
break;
default:
elog(ERROR, "unknown action in MERGE WHEN clause");
}
}
ereport(DEBUG1, (errmsg("<Deparsed MERGE query: %s>", buf->data)));
}
/* ----------
* get_utility_query_def - Parse back a UTILITY parsetree
* ----------
@ -9442,4 +9558,54 @@ getOwnedSequences_internal(Oid relid, AttrNumber attnum, char deptype)
return result;
}
/*
* get_insert_column_names_list Prepares the insert-column-names list. Any indirection
* decoration needed on the column names can be inferred from the top targetlist.
*/
static List *
get_insert_column_names_list(List *targetList, StringInfo buf,
deparse_context *context, RangeTblEntry *rte)
{
char *sep;
ListCell *l;
List *strippedexprs;
strippedexprs = NIL;
sep = "";
appendStringInfoChar(buf, '(');
foreach(l, targetList)
{
TargetEntry *tle = (TargetEntry *) lfirst(l);
if (tle->resjunk)
continue; /* ignore junk entries */
appendStringInfoString(buf, sep);
sep = ", ";
/*
* Put out name of target column; look in the catalogs, not at
* tle->resname, since resname will fail to track RENAME.
*/
appendStringInfoString(buf,
quote_identifier(get_attname(rte->relid,
tle->resno,
false)));
/*
* Print any indirection needed (subfields or subscripts), and strip
* off the top-level nodes representing the indirection assignments.
* Add the stripped expressions to strippedexprs. (If it's a
* single-VALUES statement, the stripped expressions are the VALUES to
* print below. Otherwise they're just Vars and not really
* interesting.)
*/
strippedexprs = lappend(strippedexprs,
processIndirection((Node *) tle->expr,
context));
}
appendStringInfoString(buf, ") ");
return strippedexprs;
}
#endif /* (PG_VERSION_NUM >= PG_VERSION_15) && (PG_VERSION_NUM < PG_VERSION_16) */

View File

@ -77,7 +77,7 @@ RebuildQueryStrings(Job *workerJob)
query = copyObject(originalQuery);
}
if (UpdateOrDeleteQuery(query))
if (UpdateOrDeleteOrMergeQuery(query))
{
/*
* For UPDATE and DELETE queries, we may have subqueries and joins, so

View File

@ -25,6 +25,7 @@
#include "distributed/citus_nodefuncs.h"
#include "distributed/citus_nodes.h"
#include "distributed/citus_ruleutils.h"
#include "distributed/colocation_utils.h"
#include "distributed/commands.h"
#include "distributed/cte_inline.h"
#include "distributed/function_call_delegation.h"
@ -74,7 +75,8 @@ static uint64 NextPlanId = 1;
/* keep track of planner call stack levels */
int PlannerLevel = 0;
static void ErrorIfQueryHasMergeCommand(Query *queryTree);
static void ErrorIfQueryHasUnsupportedMergeCommand(Query *queryTree,
List *rangeTableList);
static bool ContainsMergeCommandWalker(Node *node);
static bool ListContainsDistributedTableRTE(List *rangeTableList,
bool *maybeHasForeignDistributedTable);
@ -130,7 +132,7 @@ static PlannedStmt * PlanDistributedStmt(DistributedPlanningContext *planContext
static RTEListProperties * GetRTEListProperties(List *rangeTableList);
static List * TranslatedVars(PlannerInfo *root, int relationIndex);
static void WarnIfListHasForeignDistributedTable(List *rangeTableList);
static bool AreTablesMergeSqlCommandSupported(List *rangeTableList);
/* Distributed planner hook */
PlannedStmt *
@ -202,7 +204,7 @@ distributed_planner(Query *parse,
* Fast path queries cannot have merge command, and we
* prevent the remaining here.
*/
ErrorIfQueryHasMergeCommand(parse);
ErrorIfQueryHasUnsupportedMergeCommand(parse, rangeTableList);
/*
* When there are partitioned tables (not applicable to fast path),
@ -303,11 +305,13 @@ distributed_planner(Query *parse,
/*
* ErrorIfQueryHasMergeCommand walks over the query tree and throws error
* if there are any Merge command (e.g., CMD_MERGE) in the query tree.
* ErrorIfQueryHasMergeCommand walks over the query tree and bails out if
* there is no Merge command (e.g., CMD_MERGE) in the query tree. For merge,
* looks for all supported combinations, throws an exception if any violations
* are seen.
*/
static void
ErrorIfQueryHasMergeCommand(Query *queryTree)
ErrorIfQueryHasUnsupportedMergeCommand(Query *queryTree, List *rangeTableList)
{
/*
* Postgres currently doesn't support Merge queries inside subqueries and
@ -316,11 +320,29 @@ ErrorIfQueryHasMergeCommand(Query *queryTree)
* We do not call this path for fast-path queries to avoid this additional
* overhead.
*/
if (ContainsMergeCommandWalker((Node *) queryTree))
if (!ContainsMergeCommandWalker((Node *) queryTree))
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("MERGE command is not supported on Citus tables yet")));
/* No MERGE found */
return;
}
/*
* In Citus we have limited support for MERGE, it's allowed
* only if both the target and source relations are Citus-local
* or joined on colocated distributed tables.
*/
/* Check if all MERGE-relation(s)-combinations are supported */
if (AreTablesMergeSqlCommandSupported(rangeTableList))
{
/* supported MERGE scenario */
return;
}
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("MERGE command is not supported on Citus tables yet")));
}
@ -331,7 +353,10 @@ ErrorIfQueryHasMergeCommand(Query *queryTree)
static bool
ContainsMergeCommandWalker(Node *node)
{
#if PG_VERSION_NUM >= PG_VERSION_15
#if PG_VERSION_NUM < PG_VERSION_15
return false;
#endif
if (node == NULL)
{
return false;
@ -340,7 +365,7 @@ ContainsMergeCommandWalker(Node *node)
if (IsA(node, Query))
{
Query *query = (Query *) node;
if (query->commandType == CMD_MERGE)
if (IsMergeQuery(query))
{
return true;
}
@ -349,7 +374,6 @@ ContainsMergeCommandWalker(Node *node)
}
return expression_tree_walker(node, ContainsMergeCommandWalker, NULL);
#endif
return false;
}
@ -611,7 +635,7 @@ IsModifyCommand(Query *query)
CmdType commandType = query->commandType;
if (commandType == CMD_INSERT || commandType == CMD_UPDATE ||
commandType == CMD_DELETE)
commandType == CMD_DELETE || commandType == CMD_MERGE)
{
return true;
}
@ -2534,3 +2558,128 @@ WarnIfListHasForeignDistributedTable(List *rangeTableList)
}
}
}
/*
* AreTablesMergeSqlCommandSupported returns true if the tables in the MERGE
* command are supported, the two scenario supported are
* -- All the tables are Citus local
* -- All the tables are distributed and co-located
* For everything else, simply raise an exception.
*
* Note: We are deferring the expensive check of whether MERGE joined on the
* distribution column to the push-down planning phase.
*/
static bool
AreTablesMergeSqlCommandSupported(List *rangeTableList)
{
List *localList = NIL;
List *distList = NIL;
ListCell *tableCell = NULL;
foreach(tableCell, rangeTableList)
{
RangeTblEntry *rangeTableEntry = (RangeTblEntry *) lfirst(tableCell);
Oid relationId = rangeTableEntry->relid;
/* Regular Postgres tables and Citus local tables are allowed */
if (IsCitusTable(relationId) ||
IsCitusTableType(relationId, CITUS_LOCAL_TABLE))
{
continue;
}
/* Skip CTEs, sub-queries */
if (rangeTableEntry->rtekind == RTE_CTE ||
rangeTableEntry->rtekind == RTE_SUBQUERY)
{
continue;
}
if (rangeTableEntry->rtekind != RTE_RELATION)
{
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("Unsupported RTE type for MERGE command")));
}
#if 0
/* Combination of Citus distributed and local tables is not supported yet */
char relKind = get_rel_relkind(relationId);
if ((relKind == RELKIND_VIEW || relKind == RELKIND_MATVIEW) &&
IsViewDistributed(relationId))
{
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("MERGE command is not supported with combination "
"of Postgres and Citus tables")));
}
#endif
/* Reference tables are not supported */
if (IsCitusTableType(relationId, REFERENCE_TABLE))
{
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("MERGE command is not supported on Reference tables yet")));
}
if (IsCitusTableType(relationId, DISTRIBUTED_TABLE))
{
distList = lappend(distList, rangeTableEntry);
}
/* Any other Citus table type missing ? */
}
if (list_length(distList) == 0)
{
/* All the tables are Citus local, supported */
return true;
}
if (list_length(localList) != 0)
{
/* Combination of local and distributed tables is not supported */
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("MERGE command is not supported on combination "
"of Citus local and distributed tables")));
}
/* Check to see if all the distributed tables are indeed colocated */
uint32 colocationId = INVALID_COLOCATION_ID;
foreach(tableCell, distList)
{
RangeTblEntry *rangeTableEntry = (RangeTblEntry *) lfirst(tableCell);
Oid relationId = rangeTableEntry->relid;
uint32 curColocationId = TableColocationId(relationId);
if (curColocationId == INVALID_COLOCATION_ID)
{
/* Not supported */
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("For MERGE command all the tables must be colocated")));
}
if (colocationId == INVALID_COLOCATION_ID)
{
/* Save the first valid one */
colocationId = curColocationId;
}
if (curColocationId != colocationId)
{
/* All distributed tables must be colocated */
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("For MERGE command all the tables must be colocated")));
}
}
return true;
}

View File

@ -162,16 +162,11 @@ FastPathRouterQuery(Query *query, Node **distributionKeyValue)
return false;
}
#if PG_VERSION_NUM >= PG_VERSION_15
if (query->commandType == CMD_MERGE)
if (IsMergeQuery(query))
{
/*
* Citus doesn't support MERGE command, lets return
* early and explicitly for fast-path queries.
*/
/* MERGE command is not a fast path query */
return false;
}
#endif
/*
* We want to deal with only very simple queries. Some of the

View File

@ -2472,7 +2472,7 @@ QueryPushdownTaskCreate(Query *originalQuery, int shardIndex,
* If it is a modify query with sub-select, we need to set result relation shard's id
* as anchor shard id.
*/
if (UpdateOrDeleteQuery(originalQuery))
if (UpdateOrDeleteOrMergeQuery(originalQuery))
{
resultRangeTable = rt_fetch(originalQuery->resultRelation, originalQuery->rtable);
resultRelationOid = resultRangeTable->relid;
@ -2501,7 +2501,7 @@ QueryPushdownTaskCreate(Query *originalQuery, int shardIndex,
anchorShardId = shardInterval->shardId;
}
}
else if (UpdateOrDeleteQuery(originalQuery))
else if (UpdateOrDeleteOrMergeQuery(originalQuery))
{
shardInterval = cacheEntry->sortedShardIntervalArray[shardIndex];
if (!modifyWithSubselect || relationId == resultRelationOid)

View File

@ -127,9 +127,10 @@ static DeferredErrorMessage * ModifyPartialQuerySupported(Query *queryTree, bool
multiShardQuery,
Oid *distributedTableId);
static bool NodeIsFieldStore(Node *node);
static DeferredErrorMessage * MultiShardUpdateDeleteSupported(Query *originalQuery,
PlannerRestrictionContext *
plannerRestrictionContext);
static DeferredErrorMessage * MultiShardUpdateDeleteMergeSupported(Query *originalQuery,
PlannerRestrictionContext
*
plannerRestrictionContext);
static DeferredErrorMessage * SingleShardUpdateDeleteSupported(Query *originalQuery,
PlannerRestrictionContext *
plannerRestrictionContext);
@ -233,7 +234,7 @@ CreateModifyPlan(Query *originalQuery, Query *query,
return distributedPlan;
}
if (UpdateOrDeleteQuery(query))
if (UpdateOrDeleteOrMergeQuery(query))
{
job = RouterJob(originalQuery, plannerRestrictionContext,
&distributedPlan->planningError);
@ -539,7 +540,7 @@ ModifyPartialQuerySupported(Query *queryTree, bool multiShardQuery,
if (queryTree->hasSubLinks == true)
{
/* we support subqueries for INSERTs only via INSERT INTO ... SELECT */
if (!UpdateOrDeleteQuery(queryTree))
if (!UpdateOrDeleteOrMergeQuery(queryTree))
{
Assert(queryTree->commandType == CMD_INSERT);
@ -998,7 +999,7 @@ ModifyQuerySupported(Query *queryTree, Query *originalQuery, bool multiShardQuer
* We support UPDATE and DELETE with subqueries and joins unless
* they are multi shard queries.
*/
if (UpdateOrDeleteQuery(queryTree))
if (UpdateOrDeleteOrMergeQuery(queryTree))
{
continue;
}
@ -1059,8 +1060,9 @@ ModifyQuerySupported(Query *queryTree, Query *originalQuery, bool multiShardQuer
if (multiShardQuery)
{
errorMessage = MultiShardUpdateDeleteSupported(originalQuery,
plannerRestrictionContext);
errorMessage = MultiShardUpdateDeleteMergeSupported(
originalQuery,
plannerRestrictionContext);
}
else
{
@ -1243,8 +1245,8 @@ ErrorIfOnConflictNotSupported(Query *queryTree)
* not pushdownable, otherwise it returns NULL.
*/
static DeferredErrorMessage *
MultiShardUpdateDeleteSupported(Query *originalQuery,
PlannerRestrictionContext *plannerRestrictionContext)
MultiShardUpdateDeleteMergeSupported(Query *originalQuery,
PlannerRestrictionContext *plannerRestrictionContext)
{
DeferredErrorMessage *errorMessage = NULL;
RangeTblEntry *resultRangeTable = ExtractResultRelationRTE(originalQuery);
@ -1382,14 +1384,25 @@ HasDangerousJoinUsing(List *rtableList, Node *joinTreeNode)
/*
* UpdateOrDeleteQuery checks if the given query is an UPDATE or DELETE command.
* UpdateOrDeleteOrMergeQuery checks if the given query is an UPDATE or DELETE command.
* If it is, it returns true otherwise it returns false.
*/
bool
UpdateOrDeleteQuery(Query *query)
UpdateOrDeleteOrMergeQuery(Query *query)
{
return query->commandType == CMD_UPDATE ||
query->commandType == CMD_DELETE;
return (query->commandType == CMD_UPDATE ||
query->commandType == CMD_DELETE ||
query->commandType == CMD_MERGE);
}
/*
* IsMergeQuery checks if the given query is a MERGE SQL command.
*/
bool
IsMergeQuery(Query *query)
{
return (query->commandType == CMD_MERGE);
}
@ -1827,7 +1840,19 @@ RouterJob(Query *originalQuery, PlannerRestrictionContext *plannerRestrictionCon
if (*planningError)
{
return NULL;
/*
* For MERGE, we do _not_ plan anything other than Router job, let's
* not continue further down the lane in distributed planning, simply
* bail out.
*/
if (IsMergeQuery(originalQuery))
{
RaiseDeferredError(*planningError, ERROR);
}
else
{
return NULL;
}
}
Job *job = CreateJob(originalQuery);
@ -1835,7 +1860,7 @@ RouterJob(Query *originalQuery, PlannerRestrictionContext *plannerRestrictionCon
if (originalQuery->resultRelation > 0)
{
RangeTblEntry *updateOrDeleteRTE = ExtractResultRelationRTE(originalQuery);
RangeTblEntry *updateOrDeleteOrMergeRTE = ExtractResultRelationRTE(originalQuery);
/*
* If all of the shards are pruned, we replace the relation RTE into
@ -1843,9 +1868,11 @@ RouterJob(Query *originalQuery, PlannerRestrictionContext *plannerRestrictionCon
* for UPDATE and DELETE queries. Therefore, if we detect a UPDATE or
* DELETE RTE with subquery type, we just set task list to empty and return
* the job.
* Note: This scenario is invalid for MERGE query.
*/
if (updateOrDeleteRTE->rtekind == RTE_SUBQUERY)
if (updateOrDeleteOrMergeRTE->rtekind == RTE_SUBQUERY)
{
Assert(!IsMergeQuery(originalQuery));
job->taskList = NIL;
return job;
}
@ -2250,7 +2277,7 @@ PlanRouterQuery(Query *originalQuery,
* We defer error here, later the planner is forced to use a generic plan
* by assigning arbitrarily high cost to the plan.
*/
if (UpdateOrDeleteQuery(originalQuery) && isMultiShardQuery)
if (UpdateOrDeleteOrMergeQuery(originalQuery) && isMultiShardQuery)
{
planningError = DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
"Router planner cannot handle multi-shard "
@ -2289,7 +2316,7 @@ PlanRouterQuery(Query *originalQuery,
NULL, NULL);
}
Assert(UpdateOrDeleteQuery(originalQuery));
Assert(UpdateOrDeleteOrMergeQuery(originalQuery));
planningError = ModifyQuerySupported(originalQuery, originalQuery,
isMultiShardQuery,
plannerRestrictionContext);
@ -2361,7 +2388,7 @@ PlanRouterQuery(Query *originalQuery,
* If this is an UPDATE or DELETE query which requires coordinator evaluation,
* don't try update shard names, and postpone that to execution phase.
*/
bool isUpdateOrDelete = UpdateOrDeleteQuery(originalQuery);
bool isUpdateOrDelete = UpdateOrDeleteOrMergeQuery(originalQuery);
if (!(isUpdateOrDelete && RequiresCoordinatorEvaluation(originalQuery)))
{
UpdateRelationToShardNames((Node *) originalQuery, *relationShardList);

View File

@ -618,10 +618,20 @@ DeferErrorIfUnsupportedSubqueryPushdown(Query *originalQuery,
}
else if (!RestrictionEquivalenceForPartitionKeys(plannerRestrictionContext))
{
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
"complex joins are only supported when all distributed tables are "
"co-located and joined on their distribution columns",
NULL, NULL);
if (IsMergeQuery(originalQuery))
{
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
"MERGE ON join condition is supported when all distributed tables are "
"co-located and joined on their distribution columns",
NULL, NULL);
}
else
{
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
"complex joins are only supported when all distributed tables are "
"co-located and joined on their distribution columns",
NULL, NULL);
}
}
/* we shouldn't allow reference tables in the FROM clause when the query has sublinks */

View File

@ -29,6 +29,11 @@
#define CURSOR_OPT_FORCE_DISTRIBUTED 0x080000
/* Hack to compile Citus on pre-MERGE Postgres versions */
#if PG_VERSION_NUM < PG_VERSION_15
#define CMD_MERGE CMD_UNKNOWN
#endif
/* level of planner calls */
extern int PlannerLevel;

View File

@ -76,7 +76,8 @@ extern RangeTblEntry * ExtractResultRelationRTEOrError(Query *query);
extern RangeTblEntry * ExtractDistributedInsertValuesRTE(Query *query);
extern bool IsMultiRowInsert(Query *query);
extern void AddPartitionKeyNotNullFilterToSelect(Query *subqery);
extern bool UpdateOrDeleteQuery(Query *query);
extern bool UpdateOrDeleteOrMergeQuery(Query *query);
extern bool IsMergeQuery(Query *query);
extern uint64 GetAnchorShardId(List *relationShardList);
extern List * TargetShardIntervalForFastPathQuery(Query *query,

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,6 @@
SHOW server_version \gset
SELECT substring(:'server_version', '\d+')::int >= 15 AS server_version_ge_15
\gset
\if :server_version_ge_15
\else
\q

View File

@ -283,7 +283,7 @@ SELECT citus_add_local_table_to_metadata('tbl1');
MERGE INTO tbl1 USING tbl2 ON (true)
WHEN MATCHED THEN DELETE;
ERROR: MERGE command is not supported on Citus tables yet
ERROR: MERGE command is not supported with combination of Postgres and Citus tables
SELECT undistribute_table('tbl1');
NOTICE: creating a new table for pg15.tbl1
NOTICE: moving the data of pg15.tbl1
@ -303,7 +303,7 @@ SELECT citus_add_local_table_to_metadata('tbl2');
MERGE INTO tbl1 USING tbl2 ON (true)
WHEN MATCHED THEN DELETE;
ERROR: MERGE command is not supported on Citus tables yet
ERROR: MERGE command is not supported with combination of Postgres and Citus tables
-- one table is reference, the other local, not supported
SELECT create_reference_table('tbl2');
create_reference_table
@ -313,7 +313,7 @@ SELECT create_reference_table('tbl2');
MERGE INTO tbl1 USING tbl2 ON (true)
WHEN MATCHED THEN DELETE;
ERROR: MERGE command is not supported on Citus tables yet
ERROR: MERGE command is not supported with combination of Postgres and Citus tables
-- now, both are reference, still not supported
SELECT create_reference_table('tbl1');
create_reference_table
@ -323,7 +323,7 @@ SELECT create_reference_table('tbl1');
MERGE INTO tbl1 USING tbl2 ON (true)
WHEN MATCHED THEN DELETE;
ERROR: MERGE command is not supported on Citus tables yet
ERROR: MERGE command is not supported on Reference tables yet
-- now, both distributed, not works
SELECT undistribute_table('tbl1');
NOTICE: creating a new table for pg15.tbl1
@ -365,14 +365,14 @@ SELECT create_distributed_table('tbl2', 'x');
MERGE INTO tbl1 USING tbl2 ON (true)
WHEN MATCHED THEN DELETE;
ERROR: MERGE command is not supported on Citus tables yet
ERROR: MERGE ON join condition is supported when all distributed tables are co-located and joined on their distribution columns
-- also, not inside subqueries & ctes
WITH targq AS (
SELECT * FROM tbl2
)
MERGE INTO tbl1 USING targq ON (true)
WHEN MATCHED THEN DELETE;
ERROR: MERGE command is not supported on Citus tables yet
ERROR: MERGE ON join condition is supported when all distributed tables are co-located and joined on their distribution columns
-- crashes on beta3, fixed on 15 stable
--WITH foo AS (
-- MERGE INTO tbl1 USING tbl2 ON (true)
@ -387,7 +387,7 @@ USING tbl2
ON (true)
WHEN MATCHED THEN
UPDATE SET x = (SELECT count(*) FROM tbl2);
ERROR: MERGE command is not supported on Citus tables yet
ERROR: MERGE ON join condition is supported when all distributed tables are co-located and joined on their distribution columns
-- Clean up
DROP SCHEMA pg15 CASCADE;
NOTICE: drop cascades to 9 other objects

File diff suppressed because it is too large Load Diff

View File

@ -111,3 +111,5 @@ test: ensure_no_intermediate_data_leak
# --------
test: ensure_no_shared_connection_leak
test: check_mx
test: merge

View File

@ -0,0 +1,710 @@
SHOW server_version \gset
SELECT substring(:'server_version', '\d+')::int >= 15 AS server_version_ge_15
\gset
\if :server_version_ge_15
\else
\q
\endif
-- MERGE command performs a join from data_source to target_table_name
DROP SCHEMA IF EXISTS merge_schema CASCADE;
--MERGE INTO target
--USING source
--WHEN NOT MATCHED
--WHEN MATCHED AND <condition>
--WHEN MATCHED
CREATE SCHEMA merge_schema;
SET search_path TO merge_schema;
SET citus.shard_count TO 4;
SET citus.next_shard_id TO 4000000;
SET citus.explain_all_tasks to true;
SELECT 1 FROM master_add_node('localhost', :master_port, groupid => 0);
SELECT 1 FROM master_set_node_property('localhost', :master_port, 'shouldhaveshards', true);
CREATE TABLE source
(
order_id INT,
customer_id INT,
order_center VARCHAR,
order_time timestamp
);
CREATE TABLE target
(
customer_id INT,
last_order_id INT,
order_center VARCHAR,
order_count INT,
last_order timestamp
);
CREATE FUNCTION insert_data() RETURNS VOID AS $$
INSERT INTO source (order_id, customer_id, order_center, order_time)
VALUES (101, 30000, 'WX', '2022-01-01 00:00:00'); -- Do not match
INSERT INTO source (order_id, customer_id, order_center, order_time)
VALUES (102, 30001, 'CX', '2022-01-01 00:00:00'); -- Do not match
INSERT INTO source (order_id, customer_id, order_center, order_time)
VALUES (103, 30002, 'AX', '2022-01-01 00:00:00'); -- Does match
INSERT INTO source (order_id, customer_id, order_center, order_time)
VALUES (104, 30003, 'JX','2022-01-01 00:00:00' ); -- Does match
INSERT INTO source (order_id, customer_id, order_center, order_time)
VALUES (105, 30004, 'JX','2022-01-01 00:00:00' ); -- Does match
INSERT INTO target (customer_id, last_order_id, order_center, order_count, last_order)
VALUES (40000, 097, 'MK', -1, '2019-09-15 08:13:00');
INSERT INTO target (customer_id, last_order_id, order_center, order_count, last_order)
VALUES (40001, 098, 'NU', -1, '2020-07-12 01:05:00');
INSERT INTO target (customer_id, last_order_id, order_center, order_count, last_order)
VALUES (40002, 100, 'DS', -1, '2022-05-21 04:12:00');
INSERT INTO target (customer_id, last_order_id, order_center, order_count, last_order)
VALUES (30002, 103, 'AX', -1, '2021-01-17 19:53:00'); -- Matches the source
INSERT INTO target (customer_id, last_order_id, order_center, order_count, last_order)
VALUES (30003, 099, 'JX', -1, '2020-09-11 03:23:00'); -- Matches the source
INSERT INTO target (customer_id, last_order_id, order_center, order_count, last_order)
VALUES (30004, 099, 'XX', -1, '2020-09-11 03:23:00'); -- Matches the source id AND the condition.
$$
LANGUAGE SQL;
SELECT insert_data();
SELECT 'Testing PG tables';
MERGE INTO target t
USING source s
ON (t.customer_id = s.customer_id)
WHEN MATCHED AND t.order_center = 'XX' THEN
DELETE
WHEN MATCHED THEN
UPDATE SET -- Existing customer, update the order count and last_order_id
order_count = t.order_count + 1,
last_order_id = s.order_id
WHEN NOT MATCHED THEN -- New entry, record it.
INSERT (customer_id, last_order_id, order_center, order_count, last_order)
VALUES (customer_id, s.order_id, s.order_center, 123, s.order_time);
-- Our gold result to compare against
SELECT * INTO pg_result FROM target ORDER BY 1 ;
-- Clean the slate
TRUNCATE source;
TRUNCATE target;
SELECT insert_data();
-- Test with both target and source as Citus local
SELECT 'local - local';
SELECT citus_add_local_table_to_metadata('target');
SELECT citus_add_local_table_to_metadata('source');
MERGE INTO target t
USING source s
ON (t.customer_id = s.customer_id)
WHEN MATCHED AND t.order_center = 'XX' THEN
DELETE
WHEN MATCHED THEN
UPDATE SET -- Existing customer, update the order count and last_order_id
order_count = t.order_count + 1,
last_order_id = s.order_id
WHEN NOT MATCHED THEN -- New entry, record it.
INSERT (customer_id, last_order_id, order_center, order_count, last_order)
VALUES (customer_id, s.order_id, s.order_center, 123, s.order_time);
SELECT * INTO local_local FROM target ORDER BY 1 ;
-- Should be equal
SELECT c.*, p.*
FROM local_local c, pg_result p
WHERE c.customer_id = p.customer_id
ORDER BY 1,2;
-- Must return zero rows
SELECT *
FROM pg_result p
WHERE NOT EXISTS (SELECT FROM local_local c WHERE c.customer_id = p.customer_id);
SELECT 'Testing Dist - Dist';
-- Clean the slate
TRUNCATE source;
TRUNCATE target;
SELECT insert_data();
SELECT undistribute_table('target');
SELECT undistribute_table('source');
SELECT create_distributed_table('target', 'customer_id');
SELECT create_distributed_table('source', 'customer_id');
MERGE INTO target t
USING source s
ON (t.customer_id = s.customer_id)
WHEN MATCHED AND t.order_center = 'XX' THEN
DELETE
WHEN MATCHED THEN
UPDATE SET -- Existing customer, update the order count and last_order_id
order_count = t.order_count + 1,
last_order_id = s.order_id
WHEN NOT MATCHED THEN -- New entry, record it.
INSERT (customer_id, last_order_id, order_center, order_count, last_order)
VALUES (customer_id, s.order_id, s.order_center, 123, s.order_time);
SELECT * INTO dist_dist FROM target ORDER BY 1 ;
-- Should be equal
SELECT c.*, p.*
FROM dist_dist c, pg_result p
WHERE c.customer_id = p.customer_id
ORDER BY 1,2;
-- Must return zero rows
SELECT *
FROM pg_result p
WHERE NOT EXISTS (SELECT FROM dist_dist c WHERE c.customer_id = p.customer_id);
-- Test EXPLAIN
EXPLAIN
MERGE INTO target t
USING source s
ON (t.customer_id = s.customer_id)
WHEN MATCHED AND t.order_center = 'XX' THEN
DELETE
WHEN MATCHED THEN
UPDATE SET -- Existing customer, update the order count and last_order_id
order_count = t.order_count + 1,
last_order_id = s.order_id
WHEN NOT MATCHED THEN -- New entry, record it.
INSERT (customer_id, last_order_id, order_center, order_count, last_order)
VALUES (customer_id, s.order_id, s.order_center, 123, s.order_time);
--
-- Test MERGE with CTE as source
--
CREATE TABLE t1(id int, val int);
CREATE TABLE s1(id int, val int);
CREATE FUNCTION load() RETURNS VOID AS $$
INSERT INTO s1 VALUES(1, 0); -- Matches DELETE clause
INSERT INTO s1 VALUES(2, 1); -- Matches UPDATE clause
INSERT INTO s1 VALUES(3, 1); -- No Match INSERT clause
INSERT INTO s1 VALUES(4, 1); -- No Match INSERT clause
INSERT INTO s1 VALUES(6, 1); -- No Match INSERT clause
INSERT INTO t1 VALUES(1, 0); -- Will be deleted
INSERT INTO t1 VALUES(2, 0); -- Will be updated
INSERT INTO t1 VALUES(5, 0); -- Will be intact
$$
LANGUAGE SQL;
SELECT 'Testing PG tables';
SELECT load();
WITH pg_res AS (
SELECT * FROM s1
)
MERGE INTO t1
USING pg_res ON (pg_res.id = t1.id)
WHEN MATCHED AND pg_res.val = 0 THEN
DELETE
WHEN MATCHED THEN
UPDATE SET val = t1.val + 1
WHEN NOT MATCHED THEN
INSERT (id, val) VALUES (pg_res.id, pg_res.val);
-- Two rows with id 2 and val incremented, id 3, and id 1 is deleted
SELECT * FROM t1 order by id;
SELECT * INTO merge_result FROM t1 order by id;
-- Test Citus local tables
TRUNCATE t1;
TRUNCATE s1;
SELECT load();
SELECT citus_add_local_table_to_metadata('t1');
SELECT citus_add_local_table_to_metadata('s1');
WITH s1_res AS (
SELECT * FROM s1
)
MERGE INTO t1
USING s1_res ON (s1_res.id = t1.id)
WHEN MATCHED AND s1_res.val = 0 THEN
DELETE
WHEN MATCHED THEN
UPDATE SET val = t1.val + 1
WHEN NOT MATCHED THEN
INSERT (id, val) VALUES (s1_res.id, s1_res.val);
-- Two rows with id 2 and val incremented, id 3, and id 1 is deleted
SELECT * FROM t1 order by id;
-- Should be empty
SELECT *
FROM merge_result p
WHERE NOT EXISTS (SELECT 1 FROM t1 c WHERE c.id = p.id);
SELECT 'Testing dist - dist';
SELECT undistribute_table('t1');
SELECT undistribute_table('s1');
TRUNCATE t1;
TRUNCATE s1;
SELECT load();
SELECT create_distributed_table('t1', 'id');
SELECT create_distributed_table('s1', 'id');
WITH s1_res AS (
SELECT * FROM s1
)
MERGE INTO t1
USING s1_res ON (s1_res.id = t1.id)
WHEN MATCHED AND s1_res.val = 0 THEN
DELETE
WHEN MATCHED THEN
UPDATE SET val = t1.val + 1
WHEN NOT MATCHED THEN
INSERT (id, val) VALUES (s1_res.id, s1_res.val);
-- Two rows with id 2 and val incremented, id 3, and id 1 is deleted
SELECT * FROM t1 order by id;
-- Both the results should be equal
SELECT c.id, c.val, p.id, p.val
FROM t1 c, merge_result p
WHERE c.id = p.id ORDER BY c.id;
-- Test EXPLAIN with CTE
EXPLAIN
WITH s1_res AS (
SELECT * FROM s1
)
MERGE INTO t1
USING s1_res ON (s1_res.id = t1.id)
WHEN MATCHED AND s1_res.val = 0 THEN
DELETE
WHEN MATCHED THEN
UPDATE SET val = t1.val + 1
WHEN NOT MATCHED THEN
INSERT (id, val) VALUES (s1_res.id, s1_res.val);
--
-- Test with multiple join conditions
--
CREATE TABLE t2(id int, val int, src text);
CREATE TABLE s2(id int, val int, src text);
CREATE OR REPLACE FUNCTION insert_data() RETURNS VOID AS $$
INSERT INTO t2 VALUES(1, 0, 'target');
INSERT INTO t2 VALUES(2, 0, 'target');
INSERT INTO t2 VALUES(3, 1, 'match');
INSERT INTO t2 VALUES(4, 0, 'match');
INSERT INTO s2 VALUES(2, 0, 'source'); -- No match insert
INSERT INTO s2 VALUES(4, 0, 'match'); -- Match delete
INSERT INTO s2 VALUES(3, 10, 'match'); -- Match update
$$
LANGUAGE SQL;
SELECT 'Testing PG tables';
SELECT insert_data();
MERGE INTO t2
USING s2
ON t2.id = s2.id AND t2.src = s2.src
WHEN MATCHED AND t2.val = 1 THEN
UPDATE SET val = s2.val + 10
WHEN MATCHED THEN
DELETE
WHEN NOT MATCHED THEN
INSERT (id, val, src) VALUES (s2.id, s2.val, s2.src);
SELECT * FROM t2 ORDER BY 1;
SELECT * INTO pg_t2 FROM t2;
SELECT 'Testing Citus local tables';
TRUNCATE t2;
TRUNCATE s2;
SELECT insert_data();
SELECT citus_add_local_table_to_metadata('t2');
SELECT citus_add_local_table_to_metadata('s2');
MERGE INTO t2
USING s2
ON t2.id = s2.id AND t2.src = s2.src
WHEN MATCHED AND t2.val = 1 THEN
UPDATE SET val = s2.val + 10
WHEN MATCHED THEN
DELETE
WHEN NOT MATCHED THEN
INSERT (id, val, src) VALUES (s2.id, s2.val, s2.src);
SELECT * FROM t2 ORDER BY 1;
-- Should be empty
SELECT *
FROM pg_t2 p
WHERE NOT EXISTS (SELECT 1 FROM t2 c WHERE c.id = p.id);
SELECT 'Testing Dist - Dist';
-- Clean the slate
TRUNCATE t2;
TRUNCATE s2;
SELECT insert_data();
SELECT undistribute_table('t2');
SELECT undistribute_table('s2');
SELECT create_distributed_table('t2', 'id');
SELECT create_distributed_table('s2', 'id');
MERGE INTO t2
USING s2
ON t2.id = s2.id AND t2.src = s2.src
WHEN MATCHED AND t2.val = 1 THEN
UPDATE SET val = s2.val + 10
WHEN MATCHED THEN
DELETE
WHEN NOT MATCHED THEN
INSERT (id, val, src) VALUES (s2.id, s2.val, s2.src);
SELECT * FROM t2 ORDER BY 1;
SELECT * INTO dist_t2 FROM t2 ORDER BY 1;
-- Should be equal
SELECT c.*, p.*
FROM t2 c,pg_t2 p
WHERE c.id = p.id AND c.src = p.src
ORDER BY 1,2;
-- Should be empty
SELECT *
FROM pg_t2 p
WHERE NOT EXISTS (SELECT 1 FROM dist_t2 c WHERE c.id = p.id);
--
-- With sub-query as the MERGE source
--
TRUNCATE t2;
TRUNCATE s2;
SELECT insert_data();
MERGE INTO t2 t
USING (SELECT * FROM s2) s
ON t.id = s.id AND t.src = s.src
WHEN MATCHED AND t.val = 1 THEN
UPDATE SET val = s.val + 10
WHEN MATCHED THEN
DELETE
WHEN NOT MATCHED THEN
INSERT (id, val, src) VALUES (s.id, s.val, s.src);
SELECT * FROM t2 ORDER BY 1;
SELECT * INTO dist_res FROM t2 ORDER BY 1;
-- Should be equal
SELECT c.*, p.*
FROM t2 c, pg_t2 p
WHERE c.id = p.id AND c.src = p.src
ORDER BY 1,2;
-- Should be empty
SELECT *
FROM pg_t2 p
WHERE NOT EXISTS (SELECT 1 FROM dist_res c WHERE c.id = p.id);
--
-- Using two source tables
--
CREATE TABLE t3(id int, val int, src text);
CREATE TABLE s3_1(id int, val int, src text);
CREATE TABLE s3_2(id int, val int, src text);
CREATE OR REPLACE FUNCTION insert_data() RETURNS VOID AS $$
INSERT INTO t3 VALUES(1, 0, 'target'); -- Intact
INSERT INTO t3 VALUES(2, 0, 'target');
INSERT INTO t3 VALUES(3, 0, 'target');
INSERT INTO t3 VALUES(5, 0, 'target'); -- Intact
INSERT INTO s3_1 VALUES(2, 0, 'source1');
INSERT INTO s3_1 VALUES(3, 0, 'source1');
INSERT INTO s3_1 VALUES(4, 0, 'source1');
INSERT INTO s3_2 VALUES(2, 1, 'source2'); -- Match update
INSERT INTO s3_2 VALUES(3, 0, 'source2'); -- Match delete
INSERT INTO s3_2 VALUES(4, 0, 'source2'); -- No match insert
INSERT INTO s3_2 VALUES(6, 0, 'source2'); -- Will miss the source-subquery-join
$$
LANGUAGE SQL;
SELECT insert_data();
MERGE INTO t3
USING (SELECT s3_1.id, s3_2.val, s3_2.src FROM s3_1, s3_2 WHERE s3_1.id = s3_2.id) sub
ON (t3.id = sub.id)
WHEN MATCHED AND sub.val = 1 THEN
UPDATE SET val = t3.val + 10
WHEN MATCHED THEN
DELETE
WHEN NOT MATCHED THEN
INSERT (id, val, src) VALUES (sub.id, sub.val, sub.src);
SELECT * FROM t3 ORDER BY 1;
SELECT * INTO pg_t3 FROM t3 ORDER BY 1;
SELECT 'Testing dist - dist';
TRUNCATE t3;
TRUNCATE s3_1;
TRUNCATE s3_2;
SELECT insert_data();
SELECT create_distributed_table('t3', 'id');
SELECT create_distributed_table('s3_1', 'id');
SELECT create_distributed_table('s3_2', 'id');
MERGE INTO t3
USING (SELECT s3_1.id, s3_2.val, s3_2.src FROM s3_1, s3_2 WHERE s3_1.id = s3_2.id) sub
ON (t3.id = sub.id)
WHEN MATCHED AND sub.val = 1 THEN
UPDATE SET val = t3.val + 10
WHEN MATCHED THEN
DELETE
WHEN NOT MATCHED THEN
INSERT (id, val, src) VALUES (sub.id, sub.val, sub.src);
SELECT * FROM t3 ORDER BY 1;
SELECT * INTO dist_t3 FROM t3 ORDER BY 1;
-- Should be equal
SELECT c.*, p.*
FROM t3 c, pg_t3 p
WHERE c.id = p.id
ORDER BY 1,2;
-- Should be empty
SELECT *
FROM pg_t3 p
WHERE NOT EXISTS (SELECT 1 FROM dist_t3 c WHERE c.id = p.id);
--
-- Error and Unsupported scenarios
--
TRUNCATE t1;
TRUNCATE s1;
SELECT load();
-- Not joining on partition columns
MERGE INTO t1
USING s1 ON (s1.id = t1.val) -- val is not a distribution column
WHEN MATCHED AND s1.val = 0 THEN
DELETE
WHEN MATCHED THEN
UPDATE SET val = t1.val + 1
WHEN NOT MATCHED THEN
INSERT (id, val) VALUES (s1.id, s1.val);
-- Not joining on partition columns with sub-query
MERGE INTO t1
USING (SELECT * FROM s1) sub ON (sub.val = t1.id) -- sub.val is not a distribution column
WHEN MATCHED AND sub.val = 0 THEN
DELETE
WHEN MATCHED THEN
UPDATE SET val = t1.val + 1
WHEN NOT MATCHED THEN
INSERT (id, val) VALUES (sub.id, sub.val);
-- Not joining on partition columns with CTE
WITH s1_res AS (
SELECT * FROM s1
)
MERGE INTO t1
USING s1_res ON (s1_res.val = t1.id)
WHEN MATCHED AND s1_res.val = 0 THEN
DELETE
WHEN MATCHED THEN
UPDATE SET val = t1.val + 1
WHEN NOT MATCHED THEN
INSERT (id, val) VALUES (s1_res.id, s1_res.val);
-- Not joining on partition columns inside the sub-query
MERGE INTO t3
-- s3_2.val is not a distribution column
USING (SELECT s3_1.id, s3_2.val, s3_2.src FROM s3_1, s3_2 WHERE s3_1.id = s3_2.val) sub
ON (t3.id = sub.id)
WHEN MATCHED AND sub.val = 1 THEN
UPDATE SET val = t3.val + 10
WHEN MATCHED THEN
DELETE
WHEN NOT MATCHED THEN
INSERT (id, val, src) VALUES (sub.id, sub.val, sub.src);
-- Constant Join condition
WITH s1_res AS (
SELECT * FROM s1
)
MERGE INTO t1
USING s1_res ON (TRUE)
WHEN MATCHED AND s1_res.val = 0 THEN
DELETE
WHEN MATCHED THEN
UPDATE SET val = t1.val + 1
WHEN NOT MATCHED THEN
INSERT (id, val) VALUES (s1_res.id, s1_res.val);
-- With a single WHEN clause, which causes a non-left join
WITH s1_res AS (
SELECT * FROM s1
)
MERGE INTO t1 USING s1_res ON (s1_res.id = t1.val)
WHEN MATCHED THEN DELETE
WHEN NOT MATCHED THEN
INSERT (id, val) VALUES (s1_res.id, s1_res.val);
--
-- Tables not co-located
--
SELECT undistribute_table('t1');
SELECT undistribute_table('s1');
SELECT create_distributed_table('t1', 'id');
SELECT create_distributed_table('s1', 'id', colocate_with => 'none');
MERGE INTO t1
USING s1 ON (s1.id = t1.id)
WHEN MATCHED AND s1.val = 0 THEN
DELETE
WHEN MATCHED THEN
UPDATE SET val = t1.val + 1
WHEN NOT MATCHED THEN
INSERT (id, val) VALUES (s1.id, s1.val);
MERGE INTO t1
USING (SELECT * FROM s1) sub ON (sub.id = t1.id) -- Non co-located table in subquery
WHEN MATCHED AND sub.val = 0 THEN
DELETE
WHEN MATCHED THEN
UPDATE SET val = t1.val + 1
WHEN NOT MATCHED THEN
INSERT (id, val) VALUES (sub.id, sub.val);
--
-- Reference tables
--
SELECT undistribute_table('t1');
SELECT undistribute_table('s1');
SELECT create_reference_table('t1');
SELECT create_reference_table('s1');
MERGE INTO t1
USING s1 ON (s1.id = t1.id)
WHEN MATCHED AND s1.val = 0 THEN
DELETE
WHEN MATCHED THEN
UPDATE SET val = t1.val + 1
WHEN NOT MATCHED THEN
INSERT (id, val) VALUES (s1.id, s1.val);
--
-- Citus-Local + Citus-Distributed table
--
SELECT undistribute_table('t1');
SELECT undistribute_table('s1');
SELECT create_distributed_table('t1', 'id');
SELECT citus_add_local_table_to_metadata('s1');
MERGE INTO t1
USING s1 ON (s1.id = t1.id)
WHEN MATCHED AND s1.val = 0 THEN
DELETE
WHEN MATCHED THEN
UPDATE SET val = t1.val + 1
WHEN NOT MATCHED THEN
INSERT (id, val) VALUES (s1.id, s1.val);
MERGE INTO t1
USING (SELECT * FROM s1) sub ON (sub.id = t1.id)
WHEN MATCHED AND sub.val = 0 THEN
DELETE
WHEN MATCHED THEN
UPDATE SET val = t1.val + 1
WHEN NOT MATCHED THEN
INSERT (id, val) VALUES (sub.id, sub.val);
CREATE TABLE pg(val int);
SELECT undistribute_table('s1');
SELECT create_distributed_table('s1', 'id');
-- Both t1 and s1 are citus distributed tables now, mix Postgres table in sub-query
MERGE INTO t1
USING (SELECT s1.id, pg.val FROM s1, pg) sub ON (sub.id = t1.id)
WHEN MATCHED AND sub.val = 0 THEN
DELETE
WHEN MATCHED THEN
UPDATE SET val = t1.val + 1
WHEN NOT MATCHED THEN
INSERT (id, val) VALUES (sub.id, sub.val);
-- Mix Postgres table in CTE
WITH pg_res AS (
SELECT * FROM pg
)
MERGE INTO t1
USING (SELECT s1.id, pg_res.val FROM s1, pg_res) sub ON (sub.id = t1.id)
WHEN MATCHED AND sub.val = 0 THEN
DELETE
WHEN MATCHED THEN
UPDATE SET val = t1.val + 1
WHEN NOT MATCHED THEN
INSERT (id, val) VALUES (sub.id, sub.val);
-- Bad Join condition
MERGE INTO t1 t
USING s1 s
ON t.id != s.id
WHEN NOT MATCHED THEN
INSERT (id, val) VALUES (s.id, s.val);
-- Match more than one source row should fail same as Postgres behavior
SELECT undistribute_table('t1');
SELECT undistribute_table('s1');
SELECT create_distributed_table('t1', 'id');
SELECT create_distributed_table('s1', 'id');
INSERT INTO s1 VALUES(1, 1); -- From load(), we already have row with id = 1
MERGE INTO t1
USING s1 ON (s1.id = t1.id)
WHEN MATCHED AND s1.val = 0 THEN
DELETE
WHEN MATCHED THEN
UPDATE SET val = t1.val + 1
WHEN NOT MATCHED THEN
INSERT (id, val) VALUES (s1.id, s1.val);
DROP SCHEMA merge_schema CASCADE;
SELECT 1 FROM master_set_node_property('localhost', :master_port, 'shouldhaveshards', false);
SELECT 1 FROM master_remove_node('localhost', :master_port);

File diff suppressed because it is too large Load Diff