Disentangle MERGE planning code from the modify-planning code path

pull/6793/head
Teja Mupparti 2023-03-24 14:38:36 -07:00 committed by Teja Mupparti
parent 372a93b529
commit 9bab819f26
7 changed files with 82 additions and 57 deletions

View File

@ -89,7 +89,6 @@ int PlannerLevel = 0;
static bool ListContainsDistributedTableRTE(List *rangeTableList,
bool *maybeHasForeignDistributedTable);
static bool IsUpdateOrDeleteOrMerge(Query *query);
static PlannedStmt * CreateDistributedPlannedStmt(
DistributedPlanningContext *planContext);
static PlannedStmt * InlineCtesAndCreateDistributedPlannedStmt(uint64 planId,
@ -609,18 +608,6 @@ IsMultiTaskPlan(DistributedPlan *distributedPlan)
}
/*
* IsUpdateOrDelete returns true if the query performs an update or delete.
*/
bool
IsUpdateOrDeleteOrMerge(Query *query)
{
return query->commandType == CMD_UPDATE ||
query->commandType == CMD_DELETE ||
query->commandType == CMD_MERGE;
}
/*
* PlanFastPathDistributedStmt creates a distributed planned statement using
* the FastPathPlanner.
@ -791,7 +778,7 @@ CreateDistributedPlannedStmt(DistributedPlanningContext *planContext)
* if it is planned as a multi shard modify query.
*/
if ((distributedPlan->planningError ||
(IsUpdateOrDeleteOrMerge(planContext->originalQuery) && IsMultiTaskPlan(
(UpdateOrDeleteOrMergeQuery(planContext->originalQuery) && IsMultiTaskPlan(
distributedPlan))) &&
hasUnresolvedParams)
{

View File

@ -63,11 +63,43 @@ DistributedPlan *
CreateMergePlan(Query *originalQuery, Query *query,
PlannerRestrictionContext *plannerRestrictionContext)
{
/*
* For now, this is a place holder until we isolate the merge
* planning into it's own code-path.
*/
return CreateModifyPlan(originalQuery, query, plannerRestrictionContext);
DistributedPlan *distributedPlan = CitusMakeNode(DistributedPlan);
bool multiShardQuery = false;
Assert(originalQuery->commandType == CMD_MERGE);
distributedPlan->modLevel = RowModifyLevelForQuery(query);
distributedPlan->planningError = MergeQuerySupported(originalQuery,
multiShardQuery,
plannerRestrictionContext);
if (distributedPlan->planningError != NULL)
{
return distributedPlan;
}
Job *job = RouterJob(originalQuery, plannerRestrictionContext,
&distributedPlan->planningError);
if (distributedPlan->planningError != NULL)
{
return distributedPlan;
}
ereport(DEBUG1, (errmsg("Creating MERGE router plan")));
distributedPlan->workerJob = job;
distributedPlan->combineQuery = NULL;
/* MERGE doesn't support RETURNING clause */
distributedPlan->expectResults = false;
distributedPlan->targetRelationId = ResultRelationOidForQuery(query);
distributedPlan->fastPathRouterPlan =
plannerRestrictionContext->fastPathRestrictionContext->fastPathRouterQuery;
return distributedPlan;
}
@ -89,12 +121,6 @@ MergeQuerySupported(Query *originalQuery, bool multiShardQuery,
#else
/* For non-MERGE commands it's a no-op */
if (!IsMergeQuery(originalQuery))
{
return NULL;
}
/*
* TODO: For now, we are adding an exception where any volatile or stable
* functions are not allowed in the MERGE query, but this will become too
@ -596,12 +622,6 @@ MergeQualAndTargetListFunctionsSupported(Oid resultRelationId, FromExpr *joinTre
{
TargetEntry *targetEntry = (TargetEntry *) lfirst(targetEntryCell);
/* skip resjunk entries: UPDATE adds some for ctid, etc. */
if (targetEntry->resjunk)
{
continue;
}
bool targetEntryDistributionColumn = false;
AttrNumber targetColumnAttrNumber = InvalidAttrNumber;

View File

@ -140,9 +140,6 @@ static void ErrorIfNoShardsExist(CitusTableCacheEntry *cacheEntry);
static DeferredErrorMessage * DeferErrorIfModifyView(Query *queryTree);
static Job * CreateJob(Query *query);
static Task * CreateTask(TaskType taskType);
static Job * RouterJob(Query *originalQuery,
PlannerRestrictionContext *plannerRestrictionContext,
DeferredErrorMessage **planningError);
static bool RelationPrunesToMultipleShards(List *relationShardList);
static void NormalizeMultiRowInsertTargetList(Query *query);
static void AppendNextDummyColReference(Alias *expendedReferenceNames);
@ -910,14 +907,10 @@ ModifyQuerySupported(Query *queryTree, Query *originalQuery, bool multiShardQuer
PlannerRestrictionContext *plannerRestrictionContext)
{
Oid distributedTableId = InvalidOid;
DeferredErrorMessage *error = MergeQuerySupported(originalQuery, multiShardQuery,
plannerRestrictionContext);
if (error)
{
return error;
}
error = ModifyPartialQuerySupported(queryTree, multiShardQuery, &distributedTableId);
DeferredErrorMessage *error =
ModifyPartialQuerySupported(queryTree, multiShardQuery,
&distributedTableId);
if (error)
{
return error;
@ -982,17 +975,10 @@ ModifyQuerySupported(Query *queryTree, Query *originalQuery, bool multiShardQuer
}
else if (rangeTableEntry->relkind == RELKIND_MATVIEW)
{
if (IsMergeAllowedOnRelation(originalQuery, rangeTableEntry))
{
continue;
}
else
{
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
"materialized views in "
"modify queries are not supported",
NULL, NULL);
}
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
"materialized views in "
"modify queries are not supported",
NULL, NULL);
}
/* for other kinds of relations, check if it's distributed */
else
@ -1087,7 +1073,7 @@ ModifyQuerySupported(Query *queryTree, Query *originalQuery, bool multiShardQuer
}
}
if (commandType != CMD_INSERT && commandType != CMD_MERGE)
if (commandType != CMD_INSERT)
{
DeferredErrorMessage *errorMessage = NULL;
@ -1825,7 +1811,7 @@ ExtractFirstCitusTableId(Query *query)
* RouterJob builds a Job to represent a single shard select/update/delete and
* multiple shard update/delete queries.
*/
static Job *
Job *
RouterJob(Query *originalQuery, PlannerRestrictionContext *plannerRestrictionContext,
DeferredErrorMessage **planningError)
{
@ -2349,9 +2335,20 @@ PlanRouterQuery(Query *originalQuery,
}
Assert(UpdateOrDeleteOrMergeQuery(originalQuery));
planningError = ModifyQuerySupported(originalQuery, originalQuery,
isMultiShardQuery,
plannerRestrictionContext);
if (IsMergeQuery(originalQuery))
{
planningError = MergeQuerySupported(originalQuery,
isMultiShardQuery,
plannerRestrictionContext);
}
else
{
planningError = ModifyQuerySupported(originalQuery, originalQuery,
isMultiShardQuery,
plannerRestrictionContext);
}
if (planningError != NULL)
{
return planningError;

View File

@ -112,5 +112,8 @@ extern bool TargetEntryChangesValue(TargetEntry *targetEntry, Var *column,
extern bool MasterIrreducibleExpression(Node *expression, bool *varArgument,
bool *badCoalesce);
extern bool HasDangerousJoinUsing(List *rtableList, Node *jtnode);
extern Job * RouterJob(Query *originalQuery,
PlannerRestrictionContext *plannerRestrictionContext,
DeferredErrorMessage **planningError);
#endif /* MULTI_ROUTER_PLANNER_H */

View File

@ -1072,6 +1072,7 @@ WHEN MATCHED THEN
UPDATE SET value = vl_source.value, id = vl_target.id + 1
WHEN NOT MATCHED THEN
INSERT VALUES(vl_source.ID, vl_source.value);
DEBUG: Creating MERGE router plan
DEBUG: <Deparsed MERGE query: MERGE INTO merge_schema.vl_target_xxxxxxx vl_target USING (SELECT vl.id, vl.value FROM (VALUES (100,'source1'::text), (200,'source2'::text)) vl(id, value)) vl_source ON (vl_source.id OPERATOR(pg_catalog.=) vl_target.id) WHEN MATCHED THEN UPDATE SET id = (vl_target.id OPERATOR(pg_catalog.+) 1), value = (vl_source.value COLLATE "default") WHEN NOT MATCHED THEN INSERT (id, value) VALUES (vl_source.id, (vl_source.value COLLATE "default"))>
RESET client_min_messages;
SELECT * INTO vl_local FROM vl_target ORDER BY 1 ;
@ -1125,6 +1126,7 @@ WHEN MATCHED THEN
DO NOTHING
WHEN NOT MATCHED THEN
INSERT VALUES(rs_source.id);
DEBUG: Creating MERGE router plan
DEBUG: <Deparsed MERGE query: MERGE INTO merge_schema.rs_target_xxxxxxx rs_target USING (SELECT id.id FROM merge_schema.f_immutable(99) id(id) WHERE (id.id OPERATOR(pg_catalog.=) ANY (SELECT 99))) rs_source ON (rs_source.id OPERATOR(pg_catalog.=) rs_target.id) WHEN MATCHED THEN DO NOTHING WHEN NOT MATCHED THEN INSERT (id) VALUES (rs_source.id)>
RESET client_min_messages;
SELECT * INTO rs_local FROM rs_target ORDER BY 1 ;
@ -1255,6 +1257,7 @@ WHEN MATCHED THEN
DO NOTHING
WHEN NOT MATCHED THEN
INSERT VALUES(fn_source.id, fn_source.source);
DEBUG: Creating MERGE router plan
DEBUG: <Deparsed MERGE query: MERGE INTO merge_schema.fn_target_xxxxxxx fn_target USING (SELECT dist_table.id, dist_table.source FROM merge_schema.dist_table_xxxxxxx dist_table) fn_source ON (fn_source.id OPERATOR(pg_catalog.=) fn_target.id) WHEN MATCHED THEN DO NOTHING WHEN NOT MATCHED THEN INSERT (id, data) VALUES (fn_source.id, fn_source.source)>
RESET client_min_messages;
SELECT * INTO fn_local FROM fn_target ORDER BY 1 ;
@ -1327,6 +1330,7 @@ MERGE INTO ft_target
DELETE
WHEN NOT MATCHED THEN
INSERT (id, user_val) VALUES (foreign_table.id, foreign_table.user_val);
DEBUG: Creating MERGE router plan
DEBUG: <Deparsed MERGE query: MERGE INTO merge_schema.ft_target USING merge_schema.foreign_table_xxxxxxx foreign_table ON (foreign_table.id OPERATOR(pg_catalog.=) ft_target.id) WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (id, user_val) VALUES (foreign_table.id, (foreign_table.user_val COLLATE "default"))>
RESET client_min_messages;
SELECT * FROM ft_target;
@ -1557,7 +1561,9 @@ DEBUG: <Deparsed MERGE query: MERGE INTO merge_schema.target_cj_xxxxxxx target
DEBUG: <Deparsed MERGE query: MERGE INTO merge_schema.target_cj_xxxxxxx target USING (SELECT source_cj1.sid1, source_cj1.src1, source_cj1.val1 FROM merge_schema.source_cj1_xxxxxxx source_cj1) sub ON ((target.tid OPERATOR(pg_catalog.=) sub.sid1) AND (target.tid OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = sub.val1 WHEN NOT MATCHED THEN DO NOTHING >
DEBUG: <Deparsed MERGE query: MERGE INTO merge_schema.target_cj_xxxxxxx target USING (SELECT source_cj1.sid1, source_cj1.src1, source_cj1.val1 FROM merge_schema.source_cj1_xxxxxxx source_cj1) sub ON ((target.tid OPERATOR(pg_catalog.=) sub.sid1) AND (target.tid OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = sub.val1 WHEN NOT MATCHED THEN DO NOTHING >
DEBUG: <Deparsed MERGE query: MERGE INTO merge_schema.target_cj_xxxxxxx target USING (SELECT source_cj1.sid1, source_cj1.src1, source_cj1.val1 FROM merge_schema.source_cj1_xxxxxxx source_cj1) sub ON ((target.tid OPERATOR(pg_catalog.=) sub.sid1) AND (target.tid OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = sub.val1 WHEN NOT MATCHED THEN DO NOTHING >
DEBUG: Creating MERGE router plan
DEBUG: <Deparsed MERGE query: MERGE INTO merge_schema.target_cj_xxxxxxx target USING (SELECT source_cj1.sid1, source_cj1.src1, source_cj1.val1 FROM merge_schema.source_cj1_xxxxxxx source_cj1) sub ON ((target.tid OPERATOR(pg_catalog.=) sub.sid1) AND (target.tid OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = sub.val1 WHEN NOT MATCHED THEN DO NOTHING >
DEBUG: Creating MERGE router plan
NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx');
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing MERGE INTO merge_schema.target_cj_xxxxxxx target USING (SELECT source_cj1.sid1, source_cj1.src1, source_cj1.val1 FROM merge_schema.source_cj1_xxxxxxx source_cj1) sub ON ((target.tid OPERATOR(pg_catalog.=) sub.sid1) AND (target.tid OPERATOR(pg_catalog.=) $1)) WHEN MATCHED THEN UPDATE SET val = sub.val1 WHEN NOT MATCHED THEN DO NOTHING
@ -2414,6 +2420,11 @@ SELECT * FROM target_set ORDER BY 1, 2;
-- Error and Unsupported scenarios
--
MERGE INTO target_set
USING source_set AS foo ON target_set.t1 = foo.s1
WHEN MATCHED THEN
UPDATE SET ctid = '(0,100)';
ERROR: cannot assign to system column "ctid"
MERGE INTO target_set
USING (SELECT s1,s2 FROM source_set UNION SELECT s2,s1 FROM source_set) AS foo ON target_set.t1 = foo.s1
WHEN MATCHED THEN
UPDATE SET t2 = t2 + 1;

View File

@ -1900,6 +1900,7 @@ MERGE INTO pa_target t
UPDATE SET balance = balance + delta, val = val || ' updated by merge'
WHEN NOT MATCHED THEN
INSERT VALUES ('2017-01-15', sid, delta, 'inserted by merge');
DEBUG: Creating MERGE router plan
DEBUG: <Deparsed MERGE query: MERGE INTO pgmerge_schema.pa_target t USING (SELECT pa_source.sid, pa_source.delta FROM pgmerge_schema.pa_source_xxxxxxx pa_source WHERE (pa_source.sid OPERATOR(pg_catalog.<) 10)) s ON (t.tid OPERATOR(pg_catalog.=) s.sid) WHEN MATCHED THEN UPDATE SET balance = (t.balance OPERATOR(pg_catalog.+) s.delta), val = (t.val OPERATOR(pg_catalog.||) ' updated by merge'::text) WHEN NOT MATCHED THEN INSERT (logts, tid, balance, val) VALUES ('Sun Jan 15 00:00:00 2017'::timestamp without time zone, s.sid, s.delta, 'inserted by merge'::text)>
--INSERT VALUES (slogts::timestamp, sid, delta, 'inserted by merge');
SELECT * FROM pa_target ORDER BY tid;
@ -2093,6 +2094,7 @@ WHEN MATCHED THEN UPDATE
WHEN NOT MATCHED THEN INSERT
(city_id, logdate, peaktemp, unitsales)
VALUES (city_id, logdate, peaktemp, unitsales);
DEBUG: Creating MERGE router plan
DEBUG: <Deparsed MERGE query: MERGE INTO pgmerge_schema.measurement m USING pgmerge_schema.new_measurement_xxxxxxx nm ON ((m.city_id OPERATOR(pg_catalog.=) nm.city_id) AND (m.logdate OPERATOR(pg_catalog.=) nm.logdate)) WHEN MATCHED AND (nm.peaktemp IS NULL) THEN DELETE WHEN MATCHED THEN UPDATE SET peaktemp = GREATEST(m.peaktemp, nm.peaktemp), unitsales = (m.unitsales OPERATOR(pg_catalog.+) COALESCE(nm.unitsales, 0)) WHEN NOT MATCHED THEN INSERT (city_id, logdate, peaktemp, unitsales) VALUES (nm.city_id, nm.logdate, nm.peaktemp, nm.unitsales)>
RESET client_min_messages;
SELECT tableoid::regclass, * FROM measurement ORDER BY city_id, logdate;

View File

@ -1540,6 +1540,11 @@ SELECT * FROM target_set ORDER BY 1, 2;
-- Error and Unsupported scenarios
--
MERGE INTO target_set
USING source_set AS foo ON target_set.t1 = foo.s1
WHEN MATCHED THEN
UPDATE SET ctid = '(0,100)';
MERGE INTO target_set
USING (SELECT s1,s2 FROM source_set UNION SELECT s2,s1 FROM source_set) AS foo ON target_set.t1 = foo.s1
WHEN MATCHED THEN