From 9bab819f26361c272427f05b2b396a3e756a5a34 Mon Sep 17 00:00:00 2001 From: Teja Mupparti Date: Fri, 24 Mar 2023 14:38:36 -0700 Subject: [PATCH] Disentangle MERGE planning code from the modify-planning code path --- .../distributed/planner/distributed_planner.c | 15 +----- .../distributed/planner/merge_planner.c | 54 +++++++++++++------ .../planner/multi_router_planner.c | 49 ++++++++--------- .../distributed/multi_router_planner.h | 3 ++ src/test/regress/expected/merge.out | 11 ++++ src/test/regress/expected/pgmerge.out | 2 + src/test/regress/sql/merge.sql | 5 ++ 7 files changed, 82 insertions(+), 57 deletions(-) diff --git a/src/backend/distributed/planner/distributed_planner.c b/src/backend/distributed/planner/distributed_planner.c index eb9e21786..1fcc45585 100644 --- a/src/backend/distributed/planner/distributed_planner.c +++ b/src/backend/distributed/planner/distributed_planner.c @@ -89,7 +89,6 @@ int PlannerLevel = 0; static bool ListContainsDistributedTableRTE(List *rangeTableList, bool *maybeHasForeignDistributedTable); -static bool IsUpdateOrDeleteOrMerge(Query *query); static PlannedStmt * CreateDistributedPlannedStmt( DistributedPlanningContext *planContext); static PlannedStmt * InlineCtesAndCreateDistributedPlannedStmt(uint64 planId, @@ -609,18 +608,6 @@ IsMultiTaskPlan(DistributedPlan *distributedPlan) } -/* - * IsUpdateOrDelete returns true if the query performs an update or delete. - */ -bool -IsUpdateOrDeleteOrMerge(Query *query) -{ - return query->commandType == CMD_UPDATE || - query->commandType == CMD_DELETE || - query->commandType == CMD_MERGE; -} - - /* * PlanFastPathDistributedStmt creates a distributed planned statement using * the FastPathPlanner. @@ -791,7 +778,7 @@ CreateDistributedPlannedStmt(DistributedPlanningContext *planContext) * if it is planned as a multi shard modify query. */ if ((distributedPlan->planningError || - (IsUpdateOrDeleteOrMerge(planContext->originalQuery) && IsMultiTaskPlan( + (UpdateOrDeleteOrMergeQuery(planContext->originalQuery) && IsMultiTaskPlan( distributedPlan))) && hasUnresolvedParams) { diff --git a/src/backend/distributed/planner/merge_planner.c b/src/backend/distributed/planner/merge_planner.c index 46a2484bd..c67095624 100644 --- a/src/backend/distributed/planner/merge_planner.c +++ b/src/backend/distributed/planner/merge_planner.c @@ -63,11 +63,43 @@ DistributedPlan * CreateMergePlan(Query *originalQuery, Query *query, PlannerRestrictionContext *plannerRestrictionContext) { - /* - * For now, this is a place holder until we isolate the merge - * planning into it's own code-path. - */ - return CreateModifyPlan(originalQuery, query, plannerRestrictionContext); + DistributedPlan *distributedPlan = CitusMakeNode(DistributedPlan); + bool multiShardQuery = false; + + Assert(originalQuery->commandType == CMD_MERGE); + + distributedPlan->modLevel = RowModifyLevelForQuery(query); + + distributedPlan->planningError = MergeQuerySupported(originalQuery, + multiShardQuery, + plannerRestrictionContext); + + if (distributedPlan->planningError != NULL) + { + return distributedPlan; + } + + Job *job = RouterJob(originalQuery, plannerRestrictionContext, + &distributedPlan->planningError); + + if (distributedPlan->planningError != NULL) + { + return distributedPlan; + } + + ereport(DEBUG1, (errmsg("Creating MERGE router plan"))); + + distributedPlan->workerJob = job; + distributedPlan->combineQuery = NULL; + + /* MERGE doesn't support RETURNING clause */ + distributedPlan->expectResults = false; + distributedPlan->targetRelationId = ResultRelationOidForQuery(query); + + distributedPlan->fastPathRouterPlan = + plannerRestrictionContext->fastPathRestrictionContext->fastPathRouterQuery; + + return distributedPlan; } @@ -89,12 +121,6 @@ MergeQuerySupported(Query *originalQuery, bool multiShardQuery, #else - /* For non-MERGE commands it's a no-op */ - if (!IsMergeQuery(originalQuery)) - { - return NULL; - } - /* * TODO: For now, we are adding an exception where any volatile or stable * functions are not allowed in the MERGE query, but this will become too @@ -596,12 +622,6 @@ MergeQualAndTargetListFunctionsSupported(Oid resultRelationId, FromExpr *joinTre { TargetEntry *targetEntry = (TargetEntry *) lfirst(targetEntryCell); - /* skip resjunk entries: UPDATE adds some for ctid, etc. */ - if (targetEntry->resjunk) - { - continue; - } - bool targetEntryDistributionColumn = false; AttrNumber targetColumnAttrNumber = InvalidAttrNumber; diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c index 407aeaf65..d216bb8b4 100644 --- a/src/backend/distributed/planner/multi_router_planner.c +++ b/src/backend/distributed/planner/multi_router_planner.c @@ -140,9 +140,6 @@ static void ErrorIfNoShardsExist(CitusTableCacheEntry *cacheEntry); static DeferredErrorMessage * DeferErrorIfModifyView(Query *queryTree); static Job * CreateJob(Query *query); static Task * CreateTask(TaskType taskType); -static Job * RouterJob(Query *originalQuery, - PlannerRestrictionContext *plannerRestrictionContext, - DeferredErrorMessage **planningError); static bool RelationPrunesToMultipleShards(List *relationShardList); static void NormalizeMultiRowInsertTargetList(Query *query); static void AppendNextDummyColReference(Alias *expendedReferenceNames); @@ -910,14 +907,10 @@ ModifyQuerySupported(Query *queryTree, Query *originalQuery, bool multiShardQuer PlannerRestrictionContext *plannerRestrictionContext) { Oid distributedTableId = InvalidOid; - DeferredErrorMessage *error = MergeQuerySupported(originalQuery, multiShardQuery, - plannerRestrictionContext); - if (error) - { - return error; - } - error = ModifyPartialQuerySupported(queryTree, multiShardQuery, &distributedTableId); + DeferredErrorMessage *error = + ModifyPartialQuerySupported(queryTree, multiShardQuery, + &distributedTableId); if (error) { return error; @@ -982,17 +975,10 @@ ModifyQuerySupported(Query *queryTree, Query *originalQuery, bool multiShardQuer } else if (rangeTableEntry->relkind == RELKIND_MATVIEW) { - if (IsMergeAllowedOnRelation(originalQuery, rangeTableEntry)) - { - continue; - } - else - { - return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, - "materialized views in " - "modify queries are not supported", - NULL, NULL); - } + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "materialized views in " + "modify queries are not supported", + NULL, NULL); } /* for other kinds of relations, check if it's distributed */ else @@ -1087,7 +1073,7 @@ ModifyQuerySupported(Query *queryTree, Query *originalQuery, bool multiShardQuer } } - if (commandType != CMD_INSERT && commandType != CMD_MERGE) + if (commandType != CMD_INSERT) { DeferredErrorMessage *errorMessage = NULL; @@ -1825,7 +1811,7 @@ ExtractFirstCitusTableId(Query *query) * RouterJob builds a Job to represent a single shard select/update/delete and * multiple shard update/delete queries. */ -static Job * +Job * RouterJob(Query *originalQuery, PlannerRestrictionContext *plannerRestrictionContext, DeferredErrorMessage **planningError) { @@ -2349,9 +2335,20 @@ PlanRouterQuery(Query *originalQuery, } Assert(UpdateOrDeleteOrMergeQuery(originalQuery)); - planningError = ModifyQuerySupported(originalQuery, originalQuery, - isMultiShardQuery, - plannerRestrictionContext); + + if (IsMergeQuery(originalQuery)) + { + planningError = MergeQuerySupported(originalQuery, + isMultiShardQuery, + plannerRestrictionContext); + } + else + { + planningError = ModifyQuerySupported(originalQuery, originalQuery, + isMultiShardQuery, + plannerRestrictionContext); + } + if (planningError != NULL) { return planningError; diff --git a/src/include/distributed/multi_router_planner.h b/src/include/distributed/multi_router_planner.h index 698a0fd60..89134415b 100644 --- a/src/include/distributed/multi_router_planner.h +++ b/src/include/distributed/multi_router_planner.h @@ -112,5 +112,8 @@ extern bool TargetEntryChangesValue(TargetEntry *targetEntry, Var *column, extern bool MasterIrreducibleExpression(Node *expression, bool *varArgument, bool *badCoalesce); extern bool HasDangerousJoinUsing(List *rtableList, Node *jtnode); +extern Job * RouterJob(Query *originalQuery, + PlannerRestrictionContext *plannerRestrictionContext, + DeferredErrorMessage **planningError); #endif /* MULTI_ROUTER_PLANNER_H */ diff --git a/src/test/regress/expected/merge.out b/src/test/regress/expected/merge.out index e2b3aea65..412667037 100644 --- a/src/test/regress/expected/merge.out +++ b/src/test/regress/expected/merge.out @@ -1072,6 +1072,7 @@ WHEN MATCHED THEN UPDATE SET value = vl_source.value, id = vl_target.id + 1 WHEN NOT MATCHED THEN INSERT VALUES(vl_source.ID, vl_source.value); +DEBUG: Creating MERGE router plan DEBUG: RESET client_min_messages; SELECT * INTO vl_local FROM vl_target ORDER BY 1 ; @@ -1125,6 +1126,7 @@ WHEN MATCHED THEN DO NOTHING WHEN NOT MATCHED THEN INSERT VALUES(rs_source.id); +DEBUG: Creating MERGE router plan DEBUG: RESET client_min_messages; SELECT * INTO rs_local FROM rs_target ORDER BY 1 ; @@ -1255,6 +1257,7 @@ WHEN MATCHED THEN DO NOTHING WHEN NOT MATCHED THEN INSERT VALUES(fn_source.id, fn_source.source); +DEBUG: Creating MERGE router plan DEBUG: RESET client_min_messages; SELECT * INTO fn_local FROM fn_target ORDER BY 1 ; @@ -1327,6 +1330,7 @@ MERGE INTO ft_target DELETE WHEN NOT MATCHED THEN INSERT (id, user_val) VALUES (foreign_table.id, foreign_table.user_val); +DEBUG: Creating MERGE router plan DEBUG: RESET client_min_messages; SELECT * FROM ft_target; @@ -1557,7 +1561,9 @@ DEBUG: DEBUG: DEBUG: +DEBUG: Creating MERGE router plan DEBUG: +DEBUG: Creating MERGE router plan NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx'); DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx NOTICE: issuing MERGE INTO merge_schema.target_cj_xxxxxxx target USING (SELECT source_cj1.sid1, source_cj1.src1, source_cj1.val1 FROM merge_schema.source_cj1_xxxxxxx source_cj1) sub ON ((target.tid OPERATOR(pg_catalog.=) sub.sid1) AND (target.tid OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = sub.val1 WHEN NOT MATCHED THEN DO NOTHING @@ -2414,6 +2420,11 @@ SELECT * FROM target_set ORDER BY 1, 2; -- Error and Unsupported scenarios -- MERGE INTO target_set +USING source_set AS foo ON target_set.t1 = foo.s1 +WHEN MATCHED THEN + UPDATE SET ctid = '(0,100)'; +ERROR: cannot assign to system column "ctid" +MERGE INTO target_set USING (SELECT s1,s2 FROM source_set UNION SELECT s2,s1 FROM source_set) AS foo ON target_set.t1 = foo.s1 WHEN MATCHED THEN UPDATE SET t2 = t2 + 1; diff --git a/src/test/regress/expected/pgmerge.out b/src/test/regress/expected/pgmerge.out index 8a74336a0..6bdb7f771 100644 --- a/src/test/regress/expected/pgmerge.out +++ b/src/test/regress/expected/pgmerge.out @@ -1900,6 +1900,7 @@ MERGE INTO pa_target t UPDATE SET balance = balance + delta, val = val || ' updated by merge' WHEN NOT MATCHED THEN INSERT VALUES ('2017-01-15', sid, delta, 'inserted by merge'); +DEBUG: Creating MERGE router plan DEBUG: --INSERT VALUES (slogts::timestamp, sid, delta, 'inserted by merge'); SELECT * FROM pa_target ORDER BY tid; @@ -2093,6 +2094,7 @@ WHEN MATCHED THEN UPDATE WHEN NOT MATCHED THEN INSERT (city_id, logdate, peaktemp, unitsales) VALUES (city_id, logdate, peaktemp, unitsales); +DEBUG: Creating MERGE router plan DEBUG: RESET client_min_messages; SELECT tableoid::regclass, * FROM measurement ORDER BY city_id, logdate; diff --git a/src/test/regress/sql/merge.sql b/src/test/regress/sql/merge.sql index ded90b69c..d663491ae 100644 --- a/src/test/regress/sql/merge.sql +++ b/src/test/regress/sql/merge.sql @@ -1540,6 +1540,11 @@ SELECT * FROM target_set ORDER BY 1, 2; -- Error and Unsupported scenarios -- +MERGE INTO target_set +USING source_set AS foo ON target_set.t1 = foo.s1 +WHEN MATCHED THEN + UPDATE SET ctid = '(0,100)'; + MERGE INTO target_set USING (SELECT s1,s2 FROM source_set UNION SELECT s2,s1 FROM source_set) AS foo ON target_set.t1 = foo.s1 WHEN MATCHED THEN