Implement review comments

pull/7781/head
Teja Mupparti 2024-12-12 21:03:27 -08:00 committed by naisila
parent fd1b475542
commit 0fdfc5e244
9 changed files with 154 additions and 72 deletions

View File

@ -346,9 +346,7 @@ static LocalCopyStatus GetLocalCopyStatus(void);
static bool ShardIntervalListHasLocalPlacements(List *shardIntervalList); static bool ShardIntervalListHasLocalPlacements(List *shardIntervalList);
static void LogLocalCopyToRelationExecution(uint64 shardId); static void LogLocalCopyToRelationExecution(uint64 shardId);
static void LogLocalCopyToFileExecution(uint64 shardId); static void LogLocalCopyToFileExecution(uint64 shardId);
#if PG_VERSION_NUM >= PG_VERSION_15
static void ErrorIfMergeInCopy(CopyStmt *copyStatement); static void ErrorIfMergeInCopy(CopyStmt *copyStatement);
#endif
/* exports for SQL callable functions */ /* exports for SQL callable functions */
@ -2831,23 +2829,22 @@ CopyStatementHasFormat(CopyStmt *copyStatement, char *formatName)
} }
#if PG_VERSION_NUM >= PG_VERSION_15
/* /*
* ErrorIfMergeInCopy Raises an exception if the MERGE is called in the COPY. * ErrorIfMergeInCopy Raises an exception if the MERGE is called in the COPY.
*/ */
static void static void
ErrorIfMergeInCopy(CopyStmt *copyStatement) ErrorIfMergeInCopy(CopyStmt *copyStatement)
{ {
#if PG_VERSION_NUM < 150000
return;
#else
if (!copyStatement->relation && (IsA(copyStatement->query, MergeStmt))) if (!copyStatement->relation && (IsA(copyStatement->query, MergeStmt)))
{ {
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("MERGE not supported in COPY"))); errmsg("MERGE not supported in COPY")));
} }
}
#endif #endif
}
/* /*
@ -2860,9 +2857,7 @@ Node *
ProcessCopyStmt(CopyStmt *copyStatement, QueryCompletion *completionTag, const ProcessCopyStmt(CopyStmt *copyStatement, QueryCompletion *completionTag, const
char *queryString) char *queryString)
{ {
#if PG_VERSION_NUM >= PG_VERSION_15
ErrorIfMergeInCopy(copyStatement); ErrorIfMergeInCopy(copyStatement);
#endif
/* /*
* Handle special COPY "resultid" FROM STDIN WITH (format result) commands * Handle special COPY "resultid" FROM STDIN WITH (format result) commands

View File

@ -98,6 +98,7 @@ static DistributedPlan * CreateNonPushableMergePlan(Oid targetRelationId, uint64
ParamListInfo boundParams); ParamListInfo boundParams);
static char * MergeCommandResultIdPrefix(uint64 planId); static char * MergeCommandResultIdPrefix(uint64 planId);
static void ErrorIfMergeHasReturningList(Query *query); static void ErrorIfMergeHasReturningList(Query *query);
static Node * GetMergeJoinCondition(Query *mergeQuery);
#endif #endif
@ -158,33 +159,49 @@ CreateMergePlan(uint64 planId, Query *originalQuery, Query *query,
/* /*
* GetMergeJoinConditionList returns all the Join conditions from the ON clause * GetMergeJoinTree constructs and returns the jointree for a MERGE query.
*/ */
List * FromExpr *
GetMergeJoinConditionList(Query *mergeQuery) GetMergeJoinTree(Query *mergeQuery)
{ {
FromExpr *mergeJointree = NULL;
#if PG_VERSION_NUM >= PG_VERSION_17 #if PG_VERSION_NUM >= PG_VERSION_17
List *mergeJoinConditionList = NIL;
if (IsA(mergeQuery->mergeJoinCondition, List)) /*
{ * In Postgres 17, the query tree has a specific field for the merge condition.
mergeJoinConditionList = (List *) mergeQuery->mergeJoinCondition; * For deriving the WhereClauseList from the merge condition, we construct a dummy
} * jointree with an empty fromlist. This works because the fromlist of a merge query
else * join tree consists of range table references only, and range table references are
{ * disregarded by the WhereClauseList() walker.
Node *joinClause = */
eval_const_expressions(NULL, mergeQuery->mergeJoinCondition); mergeJointree = makeFromExpr(NIL, mergeQuery->mergeJoinCondition);
joinClause = (Node *) canonicalize_qual((Expr *) joinClause, false);
mergeJoinConditionList = make_ands_implicit((Expr *) joinClause);
}
#else #else
List *mergeJoinConditionList = WhereClauseList(mergeQuery->jointree); mergeJointree = mergeQuery->jointree;
#endif #endif
return mergeJoinConditionList;
return mergeJointree;
} }
#if PG_VERSION_NUM >= PG_VERSION_15 #if PG_VERSION_NUM >= PG_VERSION_15
/*
* GetMergeJoinCondition returns the quals of the ON condition
*/
static Node *
GetMergeJoinCondition(Query *mergeQuery)
{
Node *joinCondition = NULL;
#if PG_VERSION_NUM >= PG_VERSION_17
joinCondition = (Node *) mergeQuery->mergeJoinCondition;
#else
joinCondition = (Node *) mergeQuery->jointree->quals;
#endif
return joinCondition;
}
/* /*
* CreateRouterMergePlan attempts to create a pushable plan for the given MERGE * CreateRouterMergePlan attempts to create a pushable plan for the given MERGE
* SQL statement. If the planning fails, the ->planningError is set to a description * SQL statement. If the planning fails, the ->planningError is set to a description
@ -589,6 +606,7 @@ MergeQualAndTargetListFunctionsSupported(Oid resultRelationId, Query *query,
List *targetList, CmdType commandType) List *targetList, CmdType commandType)
{ {
uint32 targetRangeTableIndex = query->resultRelation; uint32 targetRangeTableIndex = query->resultRelation;
FromExpr *joinTree = GetMergeJoinTree(query);
Var *distributionColumn = NULL; Var *distributionColumn = NULL;
if (IsCitusTable(resultRelationId) && HasDistributionKey(resultRelationId)) if (IsCitusTable(resultRelationId) && HasDistributionKey(resultRelationId))
{ {
@ -626,8 +644,7 @@ MergeQualAndTargetListFunctionsSupported(Oid resultRelationId, Query *query,
} }
if (targetEntryDistributionColumn && if (targetEntryDistributionColumn &&
TargetEntryChangesValue(targetEntry, distributionColumn, (Node *) query, TargetEntryChangesValue(targetEntry, distributionColumn, joinTree))
CMD_MERGE))
{ {
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
"updating the distribution column is not " "updating the distribution column is not "
@ -749,11 +766,7 @@ ErrorIfRepartitionMergeNotSupported(Oid targetRelationId, Query *mergeQuery,
/* /*
* Sub-queries and CTEs are not allowed in actions and ON clause * Sub-queries and CTEs are not allowed in actions and ON clause
*/ */
#if PG_VERSION_NUM >= PG_VERSION_17 Node *joinCondition = GetMergeJoinCondition(mergeQuery);
Node *joinCondition = (Node *) mergeQuery->mergeJoinCondition;
#else
Node *joinCondition = (Node *) mergeQuery->jointree->quals;
#endif
if (FindNodeMatchingCheckFunction(joinCondition, IsNodeSubquery)) if (FindNodeMatchingCheckFunction(joinCondition, IsNodeSubquery))
{ {
@ -1255,11 +1268,7 @@ ErrorIfMergeQueryQualAndTargetListNotSupported(Oid targetRelationId, Query *orig
"supported in MERGE sql with distributed tables"))); "supported in MERGE sql with distributed tables")));
} }
#if PG_VERSION_NUM >= PG_VERSION_17 Node *joinCondition = GetMergeJoinCondition(originalQuery);
Node *joinCondition = (Node *) originalQuery->mergeJoinCondition;
#else
Node *joinCondition = (Node *) originalQuery->jointree->quals;
#endif
DeferredErrorMessage *deferredError = DeferredErrorMessage *deferredError =
MergeQualAndTargetListFunctionsSupported( MergeQualAndTargetListFunctionsSupported(
@ -1341,7 +1350,7 @@ static int
SourceResultPartitionColumnIndex(Query *mergeQuery, List *sourceTargetList, SourceResultPartitionColumnIndex(Query *mergeQuery, List *sourceTargetList,
CitusTableCacheEntry *targetRelation) CitusTableCacheEntry *targetRelation)
{ {
List *mergeJoinConditionList = GetMergeJoinConditionList(mergeQuery); List *mergeJoinConditionList = WhereClauseList(GetMergeJoinTree(mergeQuery));
Var *targetColumn = targetRelation->partitionColumn; Var *targetColumn = targetRelation->partitionColumn;
Var *sourceRepartitionVar = NULL; Var *sourceRepartitionVar = NULL;
bool foundTypeMismatch = false; bool foundTypeMismatch = false;

View File

@ -597,9 +597,8 @@ TargetlistAndFunctionsSupported(Oid resultRelationId, FromExpr *joinTree, Node *
NULL, NULL); NULL, NULL);
} }
if (targetEntryPartitionColumn && if (commandType == CMD_UPDATE && targetEntryPartitionColumn &&
TargetEntryChangesValue(targetEntry, partitionColumn, TargetEntryChangesValue(targetEntry, partitionColumn, joinTree))
(Node *) joinTree, commandType))
{ {
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
"modifying the partition value of rows is not " "modifying the partition value of rows is not "
@ -1611,15 +1610,8 @@ MasterIrreducibleExpressionFunctionChecker(Oid func_id, void *context)
* tree, or the target entry sets a different column. * tree, or the target entry sets a different column.
*/ */
bool bool
TargetEntryChangesValue(TargetEntry *targetEntry, Var *column, Node *node, CmdType TargetEntryChangesValue(TargetEntry *targetEntry, Var *column, FromExpr *joinTree)
commandType)
{ {
if (commandType != CMD_UPDATE && commandType != CMD_MERGE)
{
/* no-op */
return false;
}
bool isColumnValueChanged = true; bool isColumnValueChanged = true;
Expr *setExpr = targetEntry->expr; Expr *setExpr = targetEntry->expr;
@ -1635,6 +1627,7 @@ TargetEntryChangesValue(TargetEntry *targetEntry, Var *column, Node *node, CmdTy
else if (IsA(setExpr, Const)) else if (IsA(setExpr, Const))
{ {
Const *newValue = (Const *) setExpr; Const *newValue = (Const *) setExpr;
List *restrictClauseList = WhereClauseList(joinTree);
OpExpr *equalityExpr = MakeOpExpression(column, BTEqualStrategyNumber); OpExpr *equalityExpr = MakeOpExpression(column, BTEqualStrategyNumber);
Node *rightOp = get_rightop((Expr *) equalityExpr); Node *rightOp = get_rightop((Expr *) equalityExpr);
@ -1646,16 +1639,6 @@ TargetEntryChangesValue(TargetEntry *targetEntry, Var *column, Node *node, CmdTy
rightConst->constisnull = newValue->constisnull; rightConst->constisnull = newValue->constisnull;
rightConst->constbyval = newValue->constbyval; rightConst->constbyval = newValue->constbyval;
List *restrictClauseList = NIL;
if (commandType == CMD_UPDATE)
{
restrictClauseList = WhereClauseList((FromExpr *) node);
}
else
{
restrictClauseList = GetMergeJoinConditionList((Query *) node);
}
bool predicateIsImplied = predicate_implied_by(list_make1(equalityExpr), bool predicateIsImplied = predicate_implied_by(list_make1(equalityExpr),
restrictClauseList, false); restrictClauseList, false);
if (predicateIsImplied) if (predicateIsImplied)

View File

@ -32,7 +32,7 @@ extern void NonPushableMergeCommandExplainScan(CustomScanState *node, List *ance
struct ExplainState *es); struct ExplainState *es);
extern Var * FetchAndValidateInsertVarIfExists(Oid targetRelationId, Query *query); extern Var * FetchAndValidateInsertVarIfExists(Oid targetRelationId, Query *query);
extern RangeTblEntry * ExtractMergeSourceRangeTableEntry(Query *query, bool joinSourceOk); extern RangeTblEntry * ExtractMergeSourceRangeTableEntry(Query *query, bool joinSourceOk);
extern List * GetMergeJoinConditionList(Query *mergeQuery); extern FromExpr * GetMergeJoinTree(Query *mergeQuery);
#endif /* MERGE_PLANNER_H */ #endif /* MERGE_PLANNER_H */

View File

@ -111,7 +111,7 @@ extern DeferredErrorMessage * TargetlistAndFunctionsSupported(Oid resultRelation
List *returningList); List *returningList);
extern bool NodeIsFieldStore(Node *node); extern bool NodeIsFieldStore(Node *node);
extern bool TargetEntryChangesValue(TargetEntry *targetEntry, Var *column, extern bool TargetEntryChangesValue(TargetEntry *targetEntry, Var *column,
Node *node, CmdType commandType); FromExpr *joinTree);
extern bool MasterIrreducibleExpression(Node *expression, bool *varArgument, extern bool MasterIrreducibleExpression(Node *expression, bool *varArgument,
bool *badCoalesce); bool *badCoalesce);
extern bool HasDangerousJoinUsing(List *rtableList, Node *jtnode); extern bool HasDangerousJoinUsing(List *rtableList, Node *jtnode);

View File

@ -4,3 +4,102 @@ SELECT substring(:'server_version', '\d+')::int >= 15 AS server_version_ge_15
\if :server_version_ge_15 \if :server_version_ge_15
\else \else
\q \q
\endif
--
-- MERGE test from PG community (adapted to Citus by converting all tables to Citus local)
--
DROP SCHEMA IF EXISTS pgmerge_schema CASCADE;
NOTICE: schema "pgmerge_schema" does not exist, skipping
CREATE SCHEMA pgmerge_schema;
SET search_path TO pgmerge_schema;
SET citus.use_citus_managed_tables to true;
DROP TABLE IF EXISTS target;
NOTICE: table "target" does not exist, skipping
DROP TABLE IF EXISTS source;
NOTICE: table "source" does not exist, skipping
CREATE TABLE target (tid integer, balance integer)
WITH (autovacuum_enabled=off);
CREATE TABLE source (sid integer, delta integer) -- no index
WITH (autovacuum_enabled=off);
SELECT citus_add_local_table_to_metadata('target');
citus_add_local_table_to_metadata
---------------------------------------------------------------------
(1 row)
SELECT citus_add_local_table_to_metadata('source');
citus_add_local_table_to_metadata
---------------------------------------------------------------------
(1 row)
\set SHOW_CONTEXT errors
-- used in a CTE
WITH foo AS (
MERGE INTO target USING source ON (true)
WHEN MATCHED THEN DELETE
) SELECT * FROM foo;
ERROR: MERGE not supported in WITH query
-- used in COPY
COPY (
MERGE INTO target USING source ON (true)
WHEN MATCHED THEN DELETE
) TO stdout;
ERROR: MERGE not supported in COPY
-- used in a CTE with RETURNING
WITH foo AS (
MERGE INTO target USING source ON (true)
WHEN MATCHED THEN DELETE RETURNING target.*
) SELECT * FROM foo;
ERROR: syntax error at or near "RETURNING"
-- used in COPY with RETURNING
COPY (
MERGE INTO target USING source ON (true)
WHEN MATCHED THEN DELETE RETURNING target.*
) TO stdout;
ERROR: syntax error at or near "RETURNING"
-- unsupported relation types
-- view
CREATE VIEW tv AS SELECT count(tid) AS tid FROM target;
MERGE INTO tv t
USING source s
ON t.tid = s.sid
WHEN NOT MATCHED THEN
INSERT DEFAULT VALUES;
ERROR: cannot execute MERGE on relation "tv"
DETAIL: This operation is not supported for views.
DROP VIEW tv;
CREATE TABLE sq_target (tid integer NOT NULL, balance integer)
WITH (autovacuum_enabled=off);
CREATE TABLE sq_source (delta integer, sid integer, balance integer DEFAULT 0)
WITH (autovacuum_enabled=off);
SELECT citus_add_local_table_to_metadata('sq_target');
citus_add_local_table_to_metadata
---------------------------------------------------------------------
(1 row)
SELECT citus_add_local_table_to_metadata('sq_source');
citus_add_local_table_to_metadata
---------------------------------------------------------------------
(1 row)
INSERT INTO sq_target(tid, balance) VALUES (1,100), (2,200), (3,300);
INSERT INTO sq_source(sid, delta) VALUES (1,10), (2,20), (4,40);
CREATE VIEW v AS SELECT * FROM sq_source WHERE sid < 2;
-- RETURNING
BEGIN;
INSERT INTO sq_source (sid, balance, delta) VALUES (-1, -1, -10);
MERGE INTO sq_target t
USING v
ON tid = sid
WHEN MATCHED AND tid > 2 THEN
UPDATE SET balance = t.balance + delta
WHEN NOT MATCHED THEN
INSERT (balance, tid) VALUES (balance + delta, sid)
WHEN MATCHED AND tid < 2 THEN
DELETE
RETURNING *;
ERROR: syntax error at or near "RETURNING"
ROLLBACK;

View File

@ -0,0 +1,6 @@
SHOW server_version \gset
SELECT substring(:'server_version', '\d+')::int >= 15 AS server_version_ge_15
\gset
\if :server_version_ge_15
\else
\q

View File

@ -437,11 +437,6 @@ MERGE INTO tbl1 USING targq ON (true)
WHEN MATCHED THEN DELETE; WHEN MATCHED THEN DELETE;
ERROR: The required join operation is missing between the target's distribution column and any expression originating from the source. The issue may arise from a non-equi-join. ERROR: The required join operation is missing between the target's distribution column and any expression originating from the source. The issue may arise from a non-equi-join.
DETAIL: Without a equi-join condition on the target's distribution column, the source rows cannot be efficiently redistributed, and the NOT-MATCHED condition cannot be evaluated unambiguously. This can result in incorrect or unexpected results when attempting to merge tables in a distributed setting DETAIL: Without a equi-join condition on the target's distribution column, the source rows cannot be efficiently redistributed, and the NOT-MATCHED condition cannot be evaluated unambiguously. This can result in incorrect or unexpected results when attempting to merge tables in a distributed setting
WITH foo AS (
MERGE INTO tbl1 USING tbl2 ON (true)
WHEN MATCHED THEN DELETE
) SELECT * FROM foo;
ERROR: MERGE not supported in WITH query
COPY ( COPY (
MERGE INTO tbl1 USING tbl2 ON (true) MERGE INTO tbl1 USING tbl2 ON (true)
WHEN MATCHED THEN DELETE WHEN MATCHED THEN DELETE

View File

@ -287,11 +287,6 @@ WITH targq AS (
MERGE INTO tbl1 USING targq ON (true) MERGE INTO tbl1 USING targq ON (true)
WHEN MATCHED THEN DELETE; WHEN MATCHED THEN DELETE;
WITH foo AS (
MERGE INTO tbl1 USING tbl2 ON (true)
WHEN MATCHED THEN DELETE
) SELECT * FROM foo;
COPY ( COPY (
MERGE INTO tbl1 USING tbl2 ON (true) MERGE INTO tbl1 USING tbl2 ON (true)
WHEN MATCHED THEN DELETE WHEN MATCHED THEN DELETE