From 9d809610d06cb45f2dd5c548de25bcafbb8a298c Mon Sep 17 00:00:00 2001 From: Mehmet Yilmaz Date: Fri, 28 Mar 2025 08:55:32 +0000 Subject: [PATCH] Refactor INSERT ... SELECT handling to support partial nextval pushdown and improve query rewriting logic --- src/backend/distributed/planner/cte_inline.c | 14 +- .../planner/insert_select_planner.c | 206 ++++++++++++++++++ .../planner/multi_router_planner.c | 7 +- .../distributed/multi_router_planner.h | 2 + 4 files changed, 219 insertions(+), 10 deletions(-) diff --git a/src/backend/distributed/planner/cte_inline.c b/src/backend/distributed/planner/cte_inline.c index 63be41723..5ed991ebd 100644 --- a/src/backend/distributed/planner/cte_inline.c +++ b/src/backend/distributed/planner/cte_inline.c @@ -187,13 +187,13 @@ QueryTreeContainsInlinableCteWalker(Node *node, void *context) static bool PostgreSQLCTEInlineCondition(CommonTableExpr *cte, CmdType cmdType) { - /* Prevent inlining if the CTE's query references any distributed table */ - List *cteRTEs = ExtractRangeTableEntryList((Query *) cte->ctequery); - if (ListContainsDistributedTableRTE(cteRTEs, NULL)) - { - elog(DEBUG1, "CTE %s references a distributed table; skipping inlining", cte->ctename); - return false; - } + // /* Prevent inlining if the CTE's query references any distributed table */ + // List *cteRTEs = ExtractRangeTableEntryList((Query *) cte->ctequery); + // if (ListContainsDistributedTableRTE(cteRTEs, NULL)) + // { + // elog(DEBUG1, "CTE %s references a distributed table; skipping inlining", cte->ctename); + // return false; + // } /* * Consider inlining the CTE (creating RTE_SUBQUERY RTE(s)) instead of diff --git a/src/backend/distributed/planner/insert_select_planner.c b/src/backend/distributed/planner/insert_select_planner.c index ce61bd0ae..7d80eabff 100644 --- a/src/backend/distributed/planner/insert_select_planner.c +++ b/src/backend/distributed/planner/insert_select_planner.c @@ -108,6 +108,12 @@ static void ProcessEntryPair(TargetEntry *insertEntry, TargetEntry *selectEntry, Form_pg_attribute attr, int targetEntryIndex, List **projectedEntries, List **nonProjectedEntries); +static Query * +RewriteInsertSelectForPartialNextval(Query *originalQuery); +static List * +TargetEntryList(List *expressionList); +static int +FindRTEIndexInQuery(Query *query, RangeTblEntry *rte); /* depth of current insert/select planner. */ static int insertSelectPlannerLevel = 0; @@ -304,6 +310,12 @@ CreateDistributedInsertSelectPlan(Query *originalQuery, distributedPlan->modLevel = RowModifyLevelForQuery(originalQuery); + if (QueryContainsNextval(originalQuery)) + { + /* rewrite the query to partial pushdown form */ + originalQuery = RewriteInsertSelectForPartialNextval(originalQuery); + } + /* * Error semantics for INSERT ... SELECT queries are different than regular * modify queries. Thus, handle separately. @@ -320,6 +332,7 @@ CreateDistributedInsertSelectPlan(Query *originalQuery, } + /* * if the query goes to a single node ("router" in Citus' parlance), * we don't need to go through AllDistributionKeysInQueryAreEqual checks. @@ -394,6 +407,199 @@ CreateDistributedInsertSelectPlan(Query *originalQuery, } +/* + * RewriteInsertSelectForPartialNextval + * + * Given an INSERT ... SELECT query that contains nextval() in its target list, + * rewrite the query so that the inner subquery (which is pushed down to workers) + * returns only the non-volatile (non-nextval) columns. The outer query then + * adds the nextval() calls in its target list. This forces nextval() to be + * evaluated on the coordinator. + * + * For example, this transforms: + * + * INSERT INTO target (col1, col2, col3) + * SELECT col1, nextval('seq'), col3 + * FROM distributed_table; + * + * into a plan roughly equivalent to: + * + * INSERT INTO target (col1, col2, col3) + * SELECT worker.col1, nextval('seq'), worker.col3 + * FROM ( + * SELECT col1, col3 + * FROM distributed_table + * ) AS worker; + */ +Query * +RewriteInsertSelectForPartialNextval(Query *originalQuery) +{ + /* 0) Basic check: should be an INSERT...SELECT query */ + if (originalQuery->commandType != CMD_INSERT) + return originalQuery; + + /* 1) Get the result relation and the SELECT side RTE */ + RangeTblEntry *insertRte = ExtractResultRelationRTEOrError(originalQuery); + RangeTblEntry *selectRte = ExtractSelectRangeTableEntry(originalQuery); + + /* 2) Ensure the SELECT side is an RTE_SUBQUERY. If not, wrap it. */ + if (selectRte->rtekind != RTE_SUBQUERY) + { + Query *fakeSelect = makeNode(Query); + fakeSelect->commandType = CMD_SELECT; + fakeSelect->targetList = copyObject(selectRte->subquery->targetList); + /* Additional fields (rtable, jointree) should be copied as needed */ + selectRte->rtekind = RTE_SUBQUERY; + selectRte->subquery = fakeSelect; + } + + /* Optionally, wrap the subquery to get a clean target list */ + selectRte->subquery = WrapSubquery(selectRte->subquery); + Query *subquery = selectRte->subquery; /* This is now our inner subquery */ + + /* + * 3) Partition the subquery’s target list into: + * - workerExprs: expressions that do NOT contain nextval() + * - volatileExprs: expressions that DO contain nextval() + * + * We'll use a helper function RemoveNextvalFromTargetList() that, + * for each TargetEntry in subquery->targetList, places its expr into one of + * two lists depending on whether contain_nextval_expression_walker() returns true. + */ + List *workerExprs = NIL; + List *volatileExprs = NIL; + ListCell *lc; + foreach(lc, subquery->targetList) + { + TargetEntry *tle = (TargetEntry *) lfirst(lc); + if (contain_nextval_expression_walker((Node *) tle->expr, NULL)) + volatileExprs = lappend(volatileExprs, tle->expr); + else + workerExprs = lappend(workerExprs, tle->expr); + } + + /* + * 4) Build new target lists: + * + * For the inner subquery: Use only the worker expressions. + * For the outer (coordinator) query: Build a target list that combines: + * - Vars that reference the worker subquery’s output columns (for each workerExpr) + * - The volatile expressions (e.g. nextval()) placed in their proper positions. + * + * Here we assume that the original target list order is preserved: + * e.g. if the original target list was: + * [ expr1, expr2 (volatile), expr3 ] + * then the new outer target list should be: + * [ Var(worker_col1), expr2, Var(worker_col2) ] + * + * This example assumes that the order of workerExprs and volatileExprs + * correspond to their positions in the original target list. + * + * A more robust implementation would walk the original target list and, for + * each entry, either replace it with a Var reference (if non-volatile) or + * keep the volatile expression. + */ + + /* Create new inner subquery target list from workerExprs */ + List *newWorkerTList = TargetEntryList(workerExprs); + subquery->targetList = newWorkerTList; /* inner subquery returns only worker columns */ + + /* Build the outer target list */ + List *newOuterTList = NIL; + int outerResno = 1; + int workerIndex = 1; + int volatileIndex = 1; + /* For each original target entry, check if it was volatile or not. + For simplicity, we assume the original order is preserved in the concatenated lists. + A robust solution would store position info. */ + foreach(lc, originalQuery->targetList) + { + TargetEntry *origTle = (TargetEntry *) lfirst(lc); + if (contain_nextval_expression_walker((Node *) origTle->expr, NULL)) + { + /* Keep the volatile expression as is. */ + TargetEntry *newTle = makeTargetEntry(copyObject(origTle->expr), + outerResno, + pstrdup(origTle->resname), + false); + newOuterTList = lappend(newOuterTList, newTle); + volatileIndex++; + } + else + { + /* Replace non-volatile expression with a Var reference to the subquery's output. + We need to get the RTE index of the subquery. + */ + int subqueryIndex = FindRTEIndexInQuery(originalQuery, selectRte); + TargetEntry *workerTle = (TargetEntry *) list_nth(newWorkerTList, workerIndex - 1); + Var *v = makeVar(subqueryIndex, /* the RTE index (1-based) */ + workerIndex, /* attribute number within subquery TList */ + exprType((Node *) workerTle->expr), + exprTypmod((Node *) workerTle->expr), + exprCollation((Node *) workerTle->expr), + 0); + TargetEntry *newTle = makeTargetEntry((Expr *) v, + outerResno, + pstrdup(origTle->resname), + false); + newOuterTList = lappend(newOuterTList, newTle); + workerIndex++; + } + outerResno++; + } + originalQuery->targetList = newOuterTList; + + /* Optionally, re-run any target list reordering to align with the physical table's column order */ + ReorderInsertSelectTargetLists(originalQuery, insertRte, selectRte); + + return originalQuery; +} + +/* + * TargetEntryList creates a new target list from a list of expressions. + * Each expression is wrapped in a TargetEntry with an automatically generated + * column name. + */ +static List * +TargetEntryList(List *expressionList) +{ + List *tlist = NIL; + ListCell *cell; + int colIndex = 1; + + foreach(cell, expressionList) + { + Expr *expr = (Expr *) lfirst(cell); + StringInfo colName = makeStringInfo(); + appendStringInfo(colName, "worker_col%d", colIndex); + TargetEntry *tle = makeTargetEntry(expr, colIndex, pstrdup(colName->data), false); + tlist = lappend(tlist, tle); + colIndex++; + } + return tlist; +} + +/* + * FindRTEIndexInQuery returns the 1-based index of the given RTE in query->rtable. + * If not found, returns 0. + */ +static int +FindRTEIndexInQuery(Query *query, RangeTblEntry *rte) +{ + ListCell *cell; + int index = 0; + foreach(cell, query->rtable) + { + index++; + if (lfirst(cell) == (void *) rte) + return index; + } + return 0; +} + + + + /* * InsertSelectHasRouterSelect is a helper function that returns true of the SELECT * part of the INSERT .. SELECT query is a router query. diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c index a6815fbcd..29bccd4fd 100644 --- a/src/backend/distributed/planner/multi_router_planner.c +++ b/src/backend/distributed/planner/multi_router_planner.c @@ -173,8 +173,8 @@ static void ReorderTaskPlacementsByTaskAssignmentPolicy(Job *job, static bool ModifiesLocalTableWithRemoteCitusLocalTable(List *rangeTableList); static DeferredErrorMessage * DeferErrorIfUnsupportedLocalTableJoin(List *rangeTableList); static bool IsLocallyAccessibleCitusLocalTable(Oid relationId); -static bool -QueryContainsNextval(Query *query); +// static bool +// QueryContainsNextval(Query *query); /* @@ -3812,7 +3812,8 @@ DeferErrorIfUnsupportedRouterPlannableSelectQuery(Query *query) * then the query will anyway happen on the coordinator, so we can * allow nextval. */ - if (QueryContainsNextval(query) && + // if (QueryContainsNextval(query) && + if (contain_nextval_expression_walker((Node *) query->targetList, NULL) && (hasDistributedTable || hasReferenceTable)) { return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, diff --git a/src/include/distributed/multi_router_planner.h b/src/include/distributed/multi_router_planner.h index 44be2736e..adc9ff368 100644 --- a/src/include/distributed/multi_router_planner.h +++ b/src/include/distributed/multi_router_planner.h @@ -121,4 +121,6 @@ extern Job * RouterJob(Query *originalQuery, extern bool ContainsOnlyLocalOrReferenceTables(RTEListProperties *rteProperties); extern RangeTblEntry * ExtractSourceResultRangeTableEntry(Query *query); +extern bool +QueryContainsNextval(Query *query); #endif /* MULTI_ROUTER_PLANNER_H */