From 1f34bcbace482b3f6a3dc10b60c4a7fbb38b9f09 Mon Sep 17 00:00:00 2001 From: Mehmet Yilmaz Date: Fri, 28 Mar 2025 10:11:16 +0000 Subject: [PATCH] . --- .../distributed/planner/distributed_planner.c | 231 ++++++++++++++++++ .../planner/insert_select_planner.c | 10 +- 2 files changed, 236 insertions(+), 5 deletions(-) diff --git a/src/backend/distributed/planner/distributed_planner.c b/src/backend/distributed/planner/distributed_planner.c index 6d84e32ca..50e2128cb 100644 --- a/src/backend/distributed/planner/distributed_planner.c +++ b/src/backend/distributed/planner/distributed_planner.c @@ -155,6 +155,15 @@ static bool CheckPostPlanDistribution(bool isDistributedQuery, List *rangeTableList, Query *plannedQuery); +static Query * +RewriteInsertSelectForPartialNextval(Query *originalQuery); +static List * +TargetEntryList(List *expressionList); +static int +FindRTEIndexInQuery(Query *query, RangeTblEntry *rte); +RangeTblEntry * +ExtractResultRelationRTEOrNull(Query *query); + /* Distributed planner hook */ PlannedStmt * distributed_planner(Query *parse, @@ -848,6 +857,12 @@ InlineCtesAndCreateDistributedPlannedStmt(uint64 planId, /* after inlining, we shouldn't have any inlinable CTEs */ Assert(!QueryTreeContainsInlinableCTE(copyOfOriginalQuery)); + // if (QueryContainsNextval(copyOfOriginalQuery)) + // { + // /* rewrite the query to partial pushdown form */ + // copyOfOriginalQuery = RewriteInsertSelectForPartialNextval(copyOfOriginalQuery); + // } + /* simply recurse into CreateDistributedPlannedStmt() in a PG_TRY() block */ PlannedStmt *result = TryCreateDistributedPlannedStmt(planContext->plan, copyOfOriginalQuery, @@ -860,6 +875,214 @@ InlineCtesAndCreateDistributedPlannedStmt(uint64 planId, } +/* + * RewriteInsertSelectForPartialNextval + * + * Given an INSERT ... SELECT query that contains nextval() in its target list, + * rewrite the query so that the inner subquery (which is pushed down to workers) + * returns only the non-volatile (non-nextval) columns. The outer query then + * adds the nextval() calls in its target list. This forces nextval() to be + * evaluated on the coordinator. + * + * For example, this transforms: + * + * INSERT INTO target (col1, col2, col3) + * SELECT col1, nextval('seq'), col3 + * FROM distributed_table; + * + * into a plan roughly equivalent to: + * + * INSERT INTO target (col1, col2, col3) + * SELECT worker.col1, nextval('seq'), worker.col3 + * FROM ( + * SELECT col1, col3 + * FROM distributed_table + * ) AS worker; + */ +Query * +RewriteInsertSelectForPartialNextval(Query *originalQuery) +{ + /* 0) Basic check: should be an INSERT...SELECT query */ + RangeTblEntry *insertRte = ExtractResultRelationRTEOrNull(originalQuery); + if (insertRte == NULL) + { + /* not an INSERT...SELECT, or no result table => skip rewriting */ + return originalQuery; + } + + /* 1) Get the result relation and the SELECT side RTE */ + // RangeTblEntry *insertRte = ExtractResultRelationRTEOrError(originalQuery); + RangeTblEntry *selectRte = ExtractSelectRangeTableEntry(originalQuery); + + /* 2) Ensure the SELECT side is an RTE_SUBQUERY. If not, wrap it. */ + if (selectRte->rtekind != RTE_SUBQUERY) + { + Query *fakeSelect = makeNode(Query); + fakeSelect->commandType = CMD_SELECT; + fakeSelect->targetList = copyObject(selectRte->subquery->targetList); + /* Additional fields (rtable, jointree) should be copied as needed */ + selectRte->rtekind = RTE_SUBQUERY; + selectRte->subquery = fakeSelect; + } + + /* Optionally, wrap the subquery to get a clean target list */ + selectRte->subquery = WrapSubquery(selectRte->subquery); + Query *subquery = selectRte->subquery; /* This is now our inner subquery */ + + /* + * 3) Partition the subquery’s target list into: + * - workerExprs: expressions that do NOT contain nextval() + * - volatileExprs: expressions that DO contain nextval() + * + * We'll use a helper function RemoveNextvalFromTargetList() that, + * for each TargetEntry in subquery->targetList, places its expr into one of + * two lists depending on whether contain_nextval_expression_walker() returns true. + */ + List *workerExprs = NIL; + List *volatileExprs = NIL; + ListCell *lc; + foreach(lc, subquery->targetList) + { + TargetEntry *tle = (TargetEntry *) lfirst(lc); + if (contain_nextval_expression_walker((Node *) tle->expr, NULL)) + volatileExprs = lappend(volatileExprs, tle->expr); + else + workerExprs = lappend(workerExprs, tle->expr); + } + + /* + * 4) Build new target lists: + * + * For the inner subquery: Use only the worker expressions. + * For the outer (coordinator) query: Build a target list that combines: + * - Vars that reference the worker subquery’s output columns (for each workerExpr) + * - The volatile expressions (e.g. nextval()) placed in their proper positions. + * + * Here we assume that the original target list order is preserved: + * e.g. if the original target list was: + * [ expr1, expr2 (volatile), expr3 ] + * then the new outer target list should be: + * [ Var(worker_col1), expr2, Var(worker_col2) ] + * + * This example assumes that the order of workerExprs and volatileExprs + * correspond to their positions in the original target list. + * + * A more robust implementation would walk the original target list and, for + * each entry, either replace it with a Var reference (if non-volatile) or + * keep the volatile expression. + */ + + /* Create new inner subquery target list from workerExprs */ + List *newWorkerTList = TargetEntryList(workerExprs); + subquery->targetList = newWorkerTList; /* inner subquery returns only worker columns */ + + /* Build the outer target list */ + List *newOuterTList = NIL; + int outerResno = 1; + int workerIndex = 1; + int volatileIndex = 1; + /* For each original target entry, check if it was volatile or not. + For simplicity, we assume the original order is preserved in the concatenated lists. + A robust solution would store position info. */ + foreach(lc, originalQuery->targetList) + { + TargetEntry *origTle = (TargetEntry *) lfirst(lc); + if (contain_nextval_expression_walker((Node *) origTle->expr, NULL)) + { + /* Keep the volatile expression as is. */ + TargetEntry *newTle = makeTargetEntry(copyObject(origTle->expr), + outerResno, + pstrdup(origTle->resname), + false); + newOuterTList = lappend(newOuterTList, newTle); + volatileIndex++; + } + else + { + /* Replace non-volatile expression with a Var reference to the subquery's output. + We need to get the RTE index of the subquery. + */ + int subqueryIndex = FindRTEIndexInQuery(originalQuery, selectRte); + TargetEntry *workerTle = (TargetEntry *) list_nth(newWorkerTList, workerIndex - 1); + Var *v = makeVar(subqueryIndex, /* the RTE index (1-based) */ + workerIndex, /* attribute number within subquery TList */ + exprType((Node *) workerTle->expr), + exprTypmod((Node *) workerTle->expr), + exprCollation((Node *) workerTle->expr), + 0); + TargetEntry *newTle = makeTargetEntry((Expr *) v, + outerResno, + pstrdup(origTle->resname), + false); + newOuterTList = lappend(newOuterTList, newTle); + workerIndex++; + } + outerResno++; + } + originalQuery->targetList = newOuterTList; + + /* Optionally, re-run any target list reordering to align with the physical table's column order */ + ReorderInsertSelectTargetLists(originalQuery, insertRte, selectRte); + + return originalQuery; +} + + +RangeTblEntry * +ExtractResultRelationRTEOrNull(Query *query) +{ + if (query->resultRelation == 0) + return NULL; + + RangeTblEntry *rte = rt_fetch(query->resultRelation, query->rtable); + return rte; +} + + +/* + * TargetEntryList creates a new target list from a list of expressions. + * Each expression is wrapped in a TargetEntry with an automatically generated + * column name. + */ +static List * +TargetEntryList(List *expressionList) +{ + List *tlist = NIL; + ListCell *cell; + int colIndex = 1; + + foreach(cell, expressionList) + { + Expr *expr = (Expr *) lfirst(cell); + StringInfo colName = makeStringInfo(); + appendStringInfo(colName, "worker_col%d", colIndex); + TargetEntry *tle = makeTargetEntry(expr, colIndex, pstrdup(colName->data), false); + tlist = lappend(tlist, tle); + colIndex++; + } + return tlist; +} + +/* + * FindRTEIndexInQuery returns the 1-based index of the given RTE in query->rtable. + * If not found, returns 0. + */ +static int +FindRTEIndexInQuery(Query *query, RangeTblEntry *rte) +{ + ListCell *cell; + int index = 0; + foreach(cell, query->rtable) + { + index++; + if (lfirst(cell) == (void *) rte) + return index; + } + return 0; +} + + + /* * TryCreateDistributedPlannedStmt is a wrapper around CreateDistributedPlannedStmt, simply * calling it in PG_TRY()/PG_CATCH() block. The function returns a PlannedStmt if the input @@ -1003,6 +1226,14 @@ CreateDistributedPlan(uint64 planId, bool allowRecursivePlanning, Query *origina case INSERT_SELECT_INTO_LOCAL_TABLE: { + + if (QueryContainsNextval(originalQuery)) + { + /* rewrite the query to partial pushdown form */ + originalQuery = RewriteInsertSelectForPartialNextval(originalQuery); + } + + distributedPlan = CreateInsertSelectIntoLocalTablePlan(planId, originalQuery, diff --git a/src/backend/distributed/planner/insert_select_planner.c b/src/backend/distributed/planner/insert_select_planner.c index 7d80eabff..2b3c1950f 100644 --- a/src/backend/distributed/planner/insert_select_planner.c +++ b/src/backend/distributed/planner/insert_select_planner.c @@ -310,11 +310,11 @@ CreateDistributedInsertSelectPlan(Query *originalQuery, distributedPlan->modLevel = RowModifyLevelForQuery(originalQuery); - if (QueryContainsNextval(originalQuery)) - { - /* rewrite the query to partial pushdown form */ - originalQuery = RewriteInsertSelectForPartialNextval(originalQuery); - } + // if (QueryContainsNextval(originalQuery)) + // { + // /* rewrite the query to partial pushdown form */ + // originalQuery = RewriteInsertSelectForPartialNextval(originalQuery); + // } /* * Error semantics for INSERT ... SELECT queries are different than regular