Support MERGE Phase – I

All the tables (target, source or any CTE present) in the SQL statement are local i.e. a merge-sql with a combination of Citus local and
Non-Citus tables (regular Postgres tables) should work and give the same result as Postgres MERGE on regular tables. Catch and throw an
exception (not-yet-supported) for all other scenarios during Citus-planning phase.
pull/6567/head
Teja Mupparti 2022-08-11 20:36:04 -07:00 committed by Teja Mupparti
parent 5268d0a6cb
commit 9a9989fc15
17 changed files with 6558 additions and 95 deletions

View File

@ -368,6 +368,8 @@ static void get_insert_query_def(Query *query, deparse_context *context,
bool colNamesVisible);
static void get_update_query_def(Query *query, deparse_context *context,
bool colNamesVisible);
static void get_merge_query_def(Query *query, deparse_context *context);
static void get_update_query_targetlist_def(Query *query, List *targetList,
deparse_context *context,
RangeTblEntry *rte);
@ -459,6 +461,7 @@ static char *generate_function_name(Oid funcid, int nargs,
List *argnames, Oid *argtypes,
bool has_variadic, bool *use_variadic_p,
ParseExprKind special_exprkind);
static List *get_insert_column_names_list(List *targetList, StringInfo buf, deparse_context *context, RangeTblEntry *rte);
#define only_marker(rte) ((rte)->inh ? "" : "ONLY ")
@ -2095,6 +2098,10 @@ get_query_def_extended(Query *query, StringInfo buf, List *parentnamespace,
get_delete_query_def(query, &context, colNamesVisible);
break;
case CMD_MERGE:
get_merge_query_def(query, &context);
break;
case CMD_NOTHING:
appendStringInfoString(buf, "NOTHING");
break;
@ -3225,9 +3232,8 @@ get_insert_query_def(Query *query, deparse_context *context,
RangeTblEntry *select_rte = NULL;
RangeTblEntry *values_rte = NULL;
RangeTblEntry *rte;
char *sep;
ListCell *l;
List *strippedexprs;
List *strippedexprs = NIL;
/* Insert the WITH clause if given */
get_with_clause(query, context);
@ -3281,43 +3287,11 @@ get_insert_query_def(Query *query, deparse_context *context,
* Add the insert-column-names list. Any indirection decoration needed on
* the column names can be inferred from the top targetlist.
*/
strippedexprs = NIL;
sep = "";
if (query->targetList)
appendStringInfoChar(buf, '(');
foreach(l, query->targetList)
{
TargetEntry *tle = (TargetEntry *) lfirst(l);
if (tle->resjunk)
continue; /* ignore junk entries */
appendStringInfoString(buf, sep);
sep = ", ";
/*
* Put out name of target column; look in the catalogs, not at
* tle->resname, since resname will fail to track RENAME.
*/
appendStringInfoString(buf,
quote_identifier(get_attname(rte->relid,
tle->resno,
false)));
/*
* Print any indirection needed (subfields or subscripts), and strip
* off the top-level nodes representing the indirection assignments.
* Add the stripped expressions to strippedexprs. (If it's a
* single-VALUES statement, the stripped expressions are the VALUES to
* print below. Otherwise they're just Vars and not really
* interesting.)
*/
strippedexprs = lappend(strippedexprs,
processIndirection((Node *) tle->expr,
context));
strippedexprs = get_insert_column_names_list(query->targetList,
buf, context, rte);
}
if (query->targetList)
appendStringInfoString(buf, ") ");
if (query->override)
{
@ -3741,6 +3715,148 @@ get_delete_query_def(Query *query, deparse_context *context,
}
}
/* ----------
* get_merge_query_def - Parse back a MERGE parsetree
* ----------
*/
static void
get_merge_query_def(Query *query, deparse_context *context)
{
StringInfo buf = context->buf;
RangeTblEntry *targetRte;
/* Insert the WITH clause if given */
get_with_clause(query, context);
/*
* Start the query with MERGE INTO <target>
*/
targetRte = rt_fetch(query->resultRelation, query->rtable);
if (PRETTY_INDENT(context))
{
appendStringInfoChar(buf, ' ');
context->indentLevel += PRETTYINDENT_STD;
}
/* if it's a shard, do differently */
if (GetRangeTblKind(targetRte) == CITUS_RTE_SHARD)
{
char *fragmentSchemaName = NULL;
char *fragmentTableName = NULL;
ExtractRangeTblExtraData(targetRte, NULL, &fragmentSchemaName, &fragmentTableName, NULL);
/* use schema and table name from the remote alias */
appendStringInfo(buf, "MERGE INTO %s%s",
only_marker(targetRte),
generate_fragment_name(fragmentSchemaName, fragmentTableName));
if(targetRte->eref != NULL)
appendStringInfo(buf, " %s",
quote_identifier(get_rtable_name(query->resultRelation, context)));
}
else
{
appendStringInfo(buf, "MERGE INTO %s%s",
only_marker(targetRte),
generate_relation_or_shard_name(targetRte->relid,
context->distrelid,
context->shardid, NIL));
if (targetRte->alias != NULL)
appendStringInfo(buf, " %s",
quote_identifier(get_rtable_name(query->resultRelation, context)));
}
/*
* Add the MERGE source relation -- USING <source>
*/
get_from_clause(query, " USING ", context);
/*
* Add the MERGE ON condition
*/
Assert(query->jointree->quals != NULL);
{
appendContextKeyword(context, " ON ",
-PRETTYINDENT_STD, PRETTYINDENT_STD, 1);
get_rule_expr(query->jointree->quals, context, false);
}
ListCell *actionCell = NULL;
foreach(actionCell, query->mergeActionList)
{
MergeAction *action = (MergeAction *) lfirst(actionCell);
/* Add WHEN [NOT] MATCHED */
appendContextKeyword(context, " WHEN",
-PRETTYINDENT_STD, PRETTYINDENT_STD, 1);
appendStringInfo(buf, " %s", action->matched ? "MATCHED" : "NOT MATCHED");
/* Add optional AND <condition> */
if (action->qual)
{
appendContextKeyword(context, " AND ",
-PRETTYINDENT_STD, PRETTYINDENT_STD, 1);
get_rule_expr(action->qual, context, false);
}
appendContextKeyword(context, " THEN",
-PRETTYINDENT_STD, PRETTYINDENT_STD, 1);
switch (action->commandType)
{
case CMD_INSERT:
{
appendStringInfo(buf, " INSERT " );
List *strippedexprs = NIL;
if (action->targetList)
{
strippedexprs = get_insert_column_names_list(action->targetList,
buf, context, targetRte);
}
if (strippedexprs)
{
/* Add the single-VALUES expression list */
appendContextKeyword(context, "VALUES (",
-PRETTYINDENT_STD, PRETTYINDENT_STD, 2);
get_rule_list_toplevel(strippedexprs, context, false);
appendStringInfoChar(buf, ')');
}
else
{
/* No expressions, so it must be DEFAULT VALUES */
appendStringInfoString(buf, "DEFAULT VALUES");
}
}
break;
case CMD_UPDATE:
appendStringInfo(buf, " UPDATE SET " );
get_update_query_targetlist_def(query, action->targetList,
context, targetRte);
break;
case CMD_DELETE:
appendStringInfo(buf, " DELETE" );
break;
case CMD_NOTHING:
appendStringInfo(buf, " DO NOTHING " );
break;
default:
elog(ERROR, "unknown action in MERGE WHEN clause");
}
}
ereport(DEBUG1, (errmsg("<Deparsed MERGE query: %s>", buf->data)));
}
/* ----------
* get_utility_query_def - Parse back a UTILITY parsetree
* ----------
@ -8761,4 +8877,54 @@ getOwnedSequences_internal(Oid relid, AttrNumber attnum, char deptype)
return result;
}
/*
* get_insert_column_names_list Prepares the insert-column-names list. Any indirection
* decoration needed on the column names can be inferred from the top targetlist.
*/
static List *
get_insert_column_names_list(List *targetList, StringInfo buf,
deparse_context *context, RangeTblEntry *rte)
{
char *sep;
ListCell *l;
List *strippedexprs;
strippedexprs = NIL;
sep = "";
appendStringInfoChar(buf, '(');
foreach(l, targetList)
{
TargetEntry *tle = (TargetEntry *) lfirst(l);
if (tle->resjunk)
continue; /* ignore junk entries */
appendStringInfoString(buf, sep);
sep = ", ";
/*
* Put out name of target column; look in the catalogs, not at
* tle->resname, since resname will fail to track RENAME.
*/
appendStringInfoString(buf,
quote_identifier(get_attname(rte->relid,
tle->resno,
false)));
/*
* Print any indirection needed (subfields or subscripts), and strip
* off the top-level nodes representing the indirection assignments.
* Add the stripped expressions to strippedexprs. (If it's a
* single-VALUES statement, the stripped expressions are the VALUES to
* print below. Otherwise they're just Vars and not really
* interesting.)
*/
strippedexprs = lappend(strippedexprs,
processIndirection((Node *) tle->expr,
context));
}
appendStringInfoString(buf, ") ");
return strippedexprs;
}
#endif /* (PG_VERSION_NUM >= PG_VERSION_15) && (PG_VERSION_NUM < PG_VERSION_16) */

View File

@ -78,7 +78,7 @@ RebuildQueryStrings(Job *workerJob)
query = copyObject(originalQuery);
}
if (UpdateOrDeleteQuery(query))
if (UpdateOrDeleteOrMergeQuery(query))
{
List *relationShardList = task->relationShardList;

View File

@ -25,6 +25,7 @@
#include "distributed/citus_nodefuncs.h"
#include "distributed/citus_nodes.h"
#include "distributed/citus_ruleutils.h"
#include "distributed/colocation_utils.h"
#include "distributed/commands.h"
#include "distributed/cte_inline.h"
#include "distributed/function_call_delegation.h"
@ -74,7 +75,8 @@ static uint64 NextPlanId = 1;
/* keep track of planner call stack levels */
int PlannerLevel = 0;
static void ErrorIfQueryHasMergeCommand(Query *queryTree);
static void ErrorIfQueryHasUnsupportedMergeCommand(Query *queryTree,
List *rangeTableList);
static bool ContainsMergeCommandWalker(Node *node);
static bool ListContainsDistributedTableRTE(List *rangeTableList,
bool *maybeHasForeignDistributedTable);
@ -130,7 +132,7 @@ static PlannedStmt * PlanDistributedStmt(DistributedPlanningContext *planContext
static RTEListProperties * GetRTEListProperties(List *rangeTableList);
static List * TranslatedVars(PlannerInfo *root, int relationIndex);
static void WarnIfListHasForeignDistributedTable(List *rangeTableList);
static void ErrorIfMergeHasUnsupportedTables(Query *parse, List *rangeTableList);
/* Distributed planner hook */
PlannedStmt *
@ -202,7 +204,7 @@ distributed_planner(Query *parse,
* Fast path queries cannot have merge command, and we
* prevent the remaining here.
*/
ErrorIfQueryHasMergeCommand(parse);
ErrorIfQueryHasUnsupportedMergeCommand(parse, rangeTableList);
/*
* When there are partitioned tables (not applicable to fast path),
@ -303,11 +305,13 @@ distributed_planner(Query *parse,
/*
* ErrorIfQueryHasMergeCommand walks over the query tree and throws error
* if there are any Merge command (e.g., CMD_MERGE) in the query tree.
* ErrorIfQueryHasUnsupportedMergeCommand walks over the query tree and bails out
* if there is no Merge command (e.g., CMD_MERGE) in the query tree. For merge,
* looks for all supported combinations, throws an exception if any violations
* are seen.
*/
static void
ErrorIfQueryHasMergeCommand(Query *queryTree)
ErrorIfQueryHasUnsupportedMergeCommand(Query *queryTree, List *rangeTableList)
{
/*
* Postgres currently doesn't support Merge queries inside subqueries and
@ -316,11 +320,20 @@ ErrorIfQueryHasMergeCommand(Query *queryTree)
* We do not call this path for fast-path queries to avoid this additional
* overhead.
*/
if (ContainsMergeCommandWalker((Node *) queryTree))
if (!ContainsMergeCommandWalker((Node *) queryTree))
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("MERGE command is not supported on Citus tables yet")));
/* No MERGE found */
return;
}
/*
* In Citus we have limited support for MERGE, it's allowed
* only if all the tables(target, source or any CTE) tables
* are are local i.e. a combination of Citus local and Non-Citus
* tables (regular Postgres tables).
*/
ErrorIfMergeHasUnsupportedTables(queryTree, rangeTableList);
}
@ -331,7 +344,10 @@ ErrorIfQueryHasMergeCommand(Query *queryTree)
static bool
ContainsMergeCommandWalker(Node *node)
{
#if PG_VERSION_NUM >= PG_VERSION_15
#if PG_VERSION_NUM < PG_VERSION_15
return false;
#endif
if (node == NULL)
{
return false;
@ -340,7 +356,7 @@ ContainsMergeCommandWalker(Node *node)
if (IsA(node, Query))
{
Query *query = (Query *) node;
if (query->commandType == CMD_MERGE)
if (IsMergeQuery(query))
{
return true;
}
@ -349,7 +365,6 @@ ContainsMergeCommandWalker(Node *node)
}
return expression_tree_walker(node, ContainsMergeCommandWalker, NULL);
#endif
return false;
}
@ -628,7 +643,7 @@ IsModifyCommand(Query *query)
CmdType commandType = query->commandType;
if (commandType == CMD_INSERT || commandType == CMD_UPDATE ||
commandType == CMD_DELETE)
commandType == CMD_DELETE || commandType == CMD_MERGE)
{
return true;
}
@ -2592,3 +2607,148 @@ WarnIfListHasForeignDistributedTable(List *rangeTableList)
}
}
}
/*
* IsMergeAllowedOnRelation takes a relation entry and checks if MERGE command is
* permitted on special relations, such as materialized view, returns true only if
* it's a "source" relation.
*/
bool
IsMergeAllowedOnRelation(Query *parse, RangeTblEntry *rte)
{
if (!IsMergeQuery(parse))
{
return false;
}
RangeTblEntry *targetRte = rt_fetch(parse->resultRelation, parse->rtable);
/* Is it a target relation? */
if (targetRte->relid == rte->relid)
{
return false;
}
return true;
}
/*
* ErrorIfMergeHasUnsupportedTables checks if all the tables(target, source or any CTE
* present) in the MERGE command are local i.e. a combination of Citus local and Non-Citus
* tables (regular Postgres tables), raises an exception for all other combinations.
*/
static void
ErrorIfMergeHasUnsupportedTables(Query *parse, List *rangeTableList)
{
ListCell *tableCell = NULL;
foreach(tableCell, rangeTableList)
{
RangeTblEntry *rangeTableEntry = (RangeTblEntry *) lfirst(tableCell);
Oid relationId = rangeTableEntry->relid;
switch (rangeTableEntry->rtekind)
{
case RTE_RELATION:
{
/* Check the relation type */
break;
}
case RTE_SUBQUERY:
case RTE_FUNCTION:
case RTE_TABLEFUNC:
case RTE_VALUES:
case RTE_JOIN:
case RTE_CTE:
{
/* Skip them as base table(s) will be checked */
continue;
}
/*
* RTE_NAMEDTUPLESTORE is typically used in ephmeral named relations,
* such as, trigger data; until we find a genuine use case, raise an
* exception.
* RTE_RESULT is a node added by the planner and we shouldn't
* encounter it in the parse tree.
*/
case RTE_NAMEDTUPLESTORE:
case RTE_RESULT:
{
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("MERGE command is not supported with "
"Tuplestores and results")));
break;
}
default:
{
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("MERGE command: Unrecognized range table entry.")));
}
}
/* RTE Relation can be of various types, check them now */
/* skip the regular views as they are replaced with subqueries */
if (rangeTableEntry->relkind == RELKIND_VIEW)
{
continue;
}
if (rangeTableEntry->relkind == RELKIND_MATVIEW ||
rangeTableEntry->relkind == RELKIND_FOREIGN_TABLE)
{
/* Materialized view or Foreign table as target is not allowed */
if (IsMergeAllowedOnRelation(parse, rangeTableEntry))
{
/* Non target relation is ok */
continue;
}
else
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("MERGE command is not allowed "
"on materialized view")));
}
}
if (rangeTableEntry->relkind != RELKIND_RELATION &&
rangeTableEntry->relkind != RELKIND_PARTITIONED_TABLE)
{
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("Unexpected relation type(relkind:%c) in MERGE command",
rangeTableEntry->relkind)));
}
Assert(rangeTableEntry->relid != 0);
/* Distributed tables and Reference tables are not supported yet */
if (IsCitusTableType(relationId, REFERENCE_TABLE) ||
IsCitusTableType(relationId, DISTRIBUTED_TABLE))
{
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("MERGE command is not supported on "
"distributed/reference tables yet")));
}
/* Regular Postgres tables and Citus local tables are allowed */
if (!IsCitusTable(relationId) ||
IsCitusTableType(relationId, CITUS_LOCAL_TABLE))
{
continue;
}
/* Any other Citus table type missing ? */
}
/* All the tables are local, supported */
}

View File

@ -162,16 +162,11 @@ FastPathRouterQuery(Query *query, Node **distributionKeyValue)
return false;
}
#if PG_VERSION_NUM >= PG_VERSION_15
if (query->commandType == CMD_MERGE)
if (IsMergeQuery(query))
{
/*
* Citus doesn't support MERGE command, lets return
* early and explicitly for fast-path queries.
*/
/* MERGE command is not a fast path query */
return false;
}
#endif
/*
* We want to deal with only very simple queries. Some of the

View File

@ -2464,7 +2464,7 @@ QueryPushdownTaskCreate(Query *originalQuery, int shardIndex,
* If it is a modify query with sub-select, we need to set result relation shard's id
* as anchor shard id.
*/
if (UpdateOrDeleteQuery(originalQuery))
if (UpdateOrDeleteOrMergeQuery(originalQuery))
{
resultRangeTable = rt_fetch(originalQuery->resultRelation, originalQuery->rtable);
resultRelationOid = resultRangeTable->relid;
@ -2493,7 +2493,7 @@ QueryPushdownTaskCreate(Query *originalQuery, int shardIndex,
anchorShardId = shardInterval->shardId;
}
}
else if (UpdateOrDeleteQuery(originalQuery))
else if (UpdateOrDeleteOrMergeQuery(originalQuery))
{
shardInterval = cacheEntry->sortedShardIntervalArray[shardIndex];
if (!modifyWithSubselect || relationId == resultRelationOid)

View File

@ -127,9 +127,10 @@ static DeferredErrorMessage * ModifyPartialQuerySupported(Query *queryTree, bool
multiShardQuery,
Oid *distributedTableId);
static bool NodeIsFieldStore(Node *node);
static DeferredErrorMessage * MultiShardUpdateDeleteSupported(Query *originalQuery,
PlannerRestrictionContext *
plannerRestrictionContext);
static DeferredErrorMessage * MultiShardUpdateDeleteMergeSupported(Query *originalQuery,
PlannerRestrictionContext
*
plannerRestrictionContext);
static DeferredErrorMessage * SingleShardUpdateDeleteSupported(Query *originalQuery,
PlannerRestrictionContext *
plannerRestrictionContext);
@ -233,7 +234,7 @@ CreateModifyPlan(Query *originalQuery, Query *query,
return distributedPlan;
}
if (UpdateOrDeleteQuery(query))
if (UpdateOrDeleteOrMergeQuery(query))
{
job = RouterJob(originalQuery, plannerRestrictionContext,
&distributedPlan->planningError);
@ -539,7 +540,7 @@ ModifyPartialQuerySupported(Query *queryTree, bool multiShardQuery,
if (queryTree->hasSubLinks == true)
{
/* we support subqueries for INSERTs only via INSERT INTO ... SELECT */
if (!UpdateOrDeleteQuery(queryTree))
if (!UpdateOrDeleteOrMergeQuery(queryTree))
{
Assert(queryTree->commandType == CMD_INSERT);
@ -954,9 +955,17 @@ ModifyQuerySupported(Query *queryTree, Query *originalQuery, bool multiShardQuer
}
else if (rangeTableEntry->relkind == RELKIND_MATVIEW)
{
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
"materialized views in modify queries are not supported",
NULL, NULL);
if (IsMergeAllowedOnRelation(originalQuery, rangeTableEntry))
{
continue;
}
else
{
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
"materialized views in "
"modify queries are not supported",
NULL, NULL);
}
}
/* for other kinds of relations, check if its distributed */
else
@ -995,10 +1004,10 @@ ModifyQuerySupported(Query *queryTree, Query *originalQuery, bool multiShardQuer
char *rangeTableEntryErrorDetail = NULL;
/*
* We support UPDATE and DELETE with subqueries and joins unless
* We support UPDATE, DELETE and MERGE with subqueries and joins unless
* they are multi shard queries.
*/
if (UpdateOrDeleteQuery(queryTree))
if (UpdateOrDeleteOrMergeQuery(queryTree))
{
continue;
}
@ -1059,8 +1068,9 @@ ModifyQuerySupported(Query *queryTree, Query *originalQuery, bool multiShardQuer
if (multiShardQuery)
{
errorMessage = MultiShardUpdateDeleteSupported(originalQuery,
plannerRestrictionContext);
errorMessage = MultiShardUpdateDeleteMergeSupported(
originalQuery,
plannerRestrictionContext);
}
else
{
@ -1239,12 +1249,12 @@ ErrorIfOnConflictNotSupported(Query *queryTree)
/*
* MultiShardUpdateDeleteSupported returns the error message if the update/delete is
* MultiShardUpdateDeleteMergeSupported returns the error message if the update/delete is
* not pushdownable, otherwise it returns NULL.
*/
static DeferredErrorMessage *
MultiShardUpdateDeleteSupported(Query *originalQuery,
PlannerRestrictionContext *plannerRestrictionContext)
MultiShardUpdateDeleteMergeSupported(Query *originalQuery,
PlannerRestrictionContext *plannerRestrictionContext)
{
DeferredErrorMessage *errorMessage = NULL;
RangeTblEntry *resultRangeTable = ExtractResultRelationRTE(originalQuery);
@ -1382,14 +1392,25 @@ HasDangerousJoinUsing(List *rtableList, Node *joinTreeNode)
/*
* UpdateOrDeleteQuery checks if the given query is an UPDATE or DELETE command.
* If it is, it returns true otherwise it returns false.
* UpdateOrDeleteOrMergeQuery checks if the given query is an UPDATE or DELETE or
* MERGE command. If it is, it returns true otherwise it returns false.
*/
bool
UpdateOrDeleteQuery(Query *query)
UpdateOrDeleteOrMergeQuery(Query *query)
{
return query->commandType == CMD_UPDATE ||
query->commandType == CMD_DELETE;
return (query->commandType == CMD_UPDATE ||
query->commandType == CMD_DELETE ||
query->commandType == CMD_MERGE);
}
/*
* IsMergeQuery checks if the given query is a MERGE SQL command.
*/
bool
IsMergeQuery(Query *query)
{
return (query->commandType == CMD_MERGE);
}
@ -1827,7 +1848,19 @@ RouterJob(Query *originalQuery, PlannerRestrictionContext *plannerRestrictionCon
if (*planningError)
{
return NULL;
/*
* For MERGE, we do _not_ plan anything other than Router job, let's
* not continue further down the lane in distributed planning, simply
* bail out.
*/
if (IsMergeQuery(originalQuery))
{
RaiseDeferredError(*planningError, ERROR);
}
else
{
return NULL;
}
}
Job *job = CreateJob(originalQuery);
@ -1835,7 +1868,7 @@ RouterJob(Query *originalQuery, PlannerRestrictionContext *plannerRestrictionCon
if (originalQuery->resultRelation > 0)
{
RangeTblEntry *updateOrDeleteRTE = ExtractResultRelationRTE(originalQuery);
RangeTblEntry *updateOrDeleteOrMergeRTE = ExtractResultRelationRTE(originalQuery);
/*
* If all of the shards are pruned, we replace the relation RTE into
@ -1844,7 +1877,7 @@ RouterJob(Query *originalQuery, PlannerRestrictionContext *plannerRestrictionCon
* DELETE RTE with subquery type, we just set task list to empty and return
* the job.
*/
if (updateOrDeleteRTE->rtekind == RTE_SUBQUERY)
if (updateOrDeleteOrMergeRTE->rtekind == RTE_SUBQUERY)
{
job->taskList = NIL;
return job;
@ -2250,7 +2283,7 @@ PlanRouterQuery(Query *originalQuery,
* We defer error here, later the planner is forced to use a generic plan
* by assigning arbitrarily high cost to the plan.
*/
if (UpdateOrDeleteQuery(originalQuery) && isMultiShardQuery)
if (UpdateOrDeleteOrMergeQuery(originalQuery) && isMultiShardQuery)
{
planningError = DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
"Router planner cannot handle multi-shard "
@ -2289,7 +2322,7 @@ PlanRouterQuery(Query *originalQuery,
NULL, NULL);
}
Assert(UpdateOrDeleteQuery(originalQuery));
Assert(UpdateOrDeleteOrMergeQuery(originalQuery));
planningError = ModifyQuerySupported(originalQuery, originalQuery,
isMultiShardQuery,
plannerRestrictionContext);
@ -2361,7 +2394,7 @@ PlanRouterQuery(Query *originalQuery,
* If this is an UPDATE or DELETE query which requires coordinator evaluation,
* don't try update shard names, and postpone that to execution phase.
*/
bool isUpdateOrDelete = UpdateOrDeleteQuery(originalQuery);
bool isUpdateOrDelete = UpdateOrDeleteOrMergeQuery(originalQuery);
if (!(isUpdateOrDelete && RequiresCoordinatorEvaluation(originalQuery)))
{
UpdateRelationToShardNames((Node *) originalQuery, *relationShardList);

View File

@ -29,6 +29,11 @@
#define CURSOR_OPT_FORCE_DISTRIBUTED 0x080000
/* Hack to compile Citus on pre-MERGE Postgres versions */
#if PG_VERSION_NUM < PG_VERSION_15
#define CMD_MERGE CMD_UNKNOWN
#endif
/* level of planner calls */
extern int PlannerLevel;
@ -248,4 +253,6 @@ extern struct DistributedPlan * CreateDistributedPlan(uint64 planId, Query *orig
PlannerRestrictionContext *
plannerRestrictionContext);
extern bool IsMergeAllowedOnRelation(Query *parse, RangeTblEntry *rte);
#endif /* DISTRIBUTED_PLANNER_H */

View File

@ -76,7 +76,8 @@ extern RangeTblEntry * ExtractResultRelationRTEOrError(Query *query);
extern RangeTblEntry * ExtractDistributedInsertValuesRTE(Query *query);
extern bool IsMultiRowInsert(Query *query);
extern void AddPartitionKeyNotNullFilterToSelect(Query *subqery);
extern bool UpdateOrDeleteQuery(Query *query);
extern bool UpdateOrDeleteOrMergeQuery(Query *query);
extern bool IsMergeQuery(Query *query);
extern uint64 GetAnchorShardId(List *relationShardList);
extern List * TargetShardIntervalForFastPathQuery(Query *query,

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,6 @@
SHOW server_version \gset
SELECT substring(:'server_version', '\d+')::int >= 15 AS server_version_ge_15
\gset
\if :server_version_ge_15
\else
\q

View File

@ -254,7 +254,7 @@ NOTICE: renaming the new table to pg15.generated_stored_ref
--
-- In PG15, there is a new command called MERGE
-- It is currently not supported for Citus tables
-- It is currently not supported for Citus non-local tables
-- Test the behavior with various commands with Citus table types
-- Relevant PG Commit: 7103ebb7aae8ab8076b7e85f335ceb8fe799097c
--
@ -287,7 +287,6 @@ SELECT citus_add_local_table_to_metadata('tbl1');
MERGE INTO tbl1 USING tbl2 ON (true)
WHEN MATCHED THEN DELETE;
ERROR: MERGE command is not supported on Citus tables yet
SELECT undistribute_table('tbl1');
NOTICE: creating a new table for pg15.tbl1
NOTICE: moving the data of pg15.tbl1
@ -307,7 +306,6 @@ SELECT citus_add_local_table_to_metadata('tbl2');
MERGE INTO tbl1 USING tbl2 ON (true)
WHEN MATCHED THEN DELETE;
ERROR: MERGE command is not supported on Citus tables yet
-- one table is reference, the other local, not supported
SELECT create_reference_table('tbl2');
create_reference_table
@ -317,7 +315,7 @@ SELECT create_reference_table('tbl2');
MERGE INTO tbl1 USING tbl2 ON (true)
WHEN MATCHED THEN DELETE;
ERROR: MERGE command is not supported on Citus tables yet
ERROR: MERGE command is not supported on distributed/reference tables yet
-- now, both are reference, still not supported
SELECT create_reference_table('tbl1');
create_reference_table
@ -327,7 +325,7 @@ SELECT create_reference_table('tbl1');
MERGE INTO tbl1 USING tbl2 ON (true)
WHEN MATCHED THEN DELETE;
ERROR: MERGE command is not supported on Citus tables yet
ERROR: MERGE command is not supported on distributed/reference tables yet
-- now, both distributed, not works
SELECT undistribute_table('tbl1');
NOTICE: creating a new table for pg15.tbl1
@ -421,14 +419,14 @@ SELECT create_distributed_table('tbl2', 'x');
MERGE INTO tbl1 USING tbl2 ON (true)
WHEN MATCHED THEN DELETE;
ERROR: MERGE command is not supported on Citus tables yet
ERROR: MERGE command is not supported on distributed/reference tables yet
-- also, not inside subqueries & ctes
WITH targq AS (
SELECT * FROM tbl2
)
MERGE INTO tbl1 USING targq ON (true)
WHEN MATCHED THEN DELETE;
ERROR: MERGE command is not supported on Citus tables yet
ERROR: MERGE command is not supported on distributed/reference tables yet
-- crashes on beta3, fixed on 15 stable
--WITH foo AS (
-- MERGE INTO tbl1 USING tbl2 ON (true)
@ -443,7 +441,7 @@ USING tbl2
ON (true)
WHEN MATCHED THEN
UPDATE SET x = (SELECT count(*) FROM tbl2);
ERROR: MERGE command is not supported on Citus tables yet
ERROR: MERGE command is not supported on distributed/reference tables yet
-- test numeric types with negative scale
CREATE TABLE numeric_negative_scale(numeric_column numeric(3,-1), orig_value int);
INSERT into numeric_negative_scale SELECT x,x FROM generate_series(111, 115) x;
@ -1490,3 +1488,5 @@ SELECT run_command_on_workers($$DROP ACCESS METHOD heap2$$);
\set VERBOSITY terse
SET client_min_messages TO ERROR;
DROP SCHEMA pg15 CASCADE;
DROP ROLE rls_tenant_1;
DROP ROLE rls_tenant_2;

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,6 @@
SHOW server_version \gset
SELECT substring(:'server_version', '\d+')::int >= 15 AS server_version_ge_15
\gset
\if :server_version_ge_15
\else
\q

View File

@ -104,6 +104,10 @@ test: background_task_queue_monitor
# Causal clock test
test: clock
# MERGE tests
test: merge
test: pgmerge
# ---------
# test that no tests leaked intermediate results. This should always be last
# ---------

File diff suppressed because it is too large Load Diff

View File

@ -160,7 +160,7 @@ SELECT undistribute_table('generated_stored_ref');
--
-- In PG15, there is a new command called MERGE
-- It is currently not supported for Citus tables
-- It is currently not supported for Citus non-local tables
-- Test the behavior with various commands with Citus table types
-- Relevant PG Commit: 7103ebb7aae8ab8076b7e85f335ceb8fe799097c
--
@ -943,3 +943,5 @@ SELECT run_command_on_workers($$DROP ACCESS METHOD heap2$$);
\set VERBOSITY terse
SET client_min_messages TO ERROR;
DROP SCHEMA pg15 CASCADE;
DROP ROLE rls_tenant_1;
DROP ROLE rls_tenant_2;

File diff suppressed because it is too large Load Diff