Refactor INSERT ... SELECT handling to support partial nextval pushdown and improve query rewriting logic

issue_7784_rewrite
Mehmet Yilmaz 2025-03-28 08:55:32 +00:00
parent e02d49c220
commit 9d809610d0
4 changed files with 219 additions and 10 deletions

View File

@ -187,13 +187,13 @@ QueryTreeContainsInlinableCteWalker(Node *node, void *context)
static bool
PostgreSQLCTEInlineCondition(CommonTableExpr *cte, CmdType cmdType)
{
/* Prevent inlining if the CTE's query references any distributed table */
List *cteRTEs = ExtractRangeTableEntryList((Query *) cte->ctequery);
if (ListContainsDistributedTableRTE(cteRTEs, NULL))
{
elog(DEBUG1, "CTE %s references a distributed table; skipping inlining", cte->ctename);
return false;
}
// /* Prevent inlining if the CTE's query references any distributed table */
// List *cteRTEs = ExtractRangeTableEntryList((Query *) cte->ctequery);
// if (ListContainsDistributedTableRTE(cteRTEs, NULL))
// {
// elog(DEBUG1, "CTE %s references a distributed table; skipping inlining", cte->ctename);
// return false;
// }
/*
* Consider inlining the CTE (creating RTE_SUBQUERY RTE(s)) instead of

View File

@ -108,6 +108,12 @@ static void ProcessEntryPair(TargetEntry *insertEntry, TargetEntry *selectEntry,
Form_pg_attribute attr, int targetEntryIndex,
List **projectedEntries, List **nonProjectedEntries);
/* moves nextval() target entries of an INSERT ... SELECT into the outer query */
static Query *
RewriteInsertSelectForPartialNextval(Query *originalQuery);
/* wraps each expression of a list in a TargetEntry named worker_col<N> */
static List *
TargetEntryList(List *expressionList);
/* returns the 1-based position of rte in query->rtable, or 0 when absent */
static int
FindRTEIndexInQuery(Query *query, RangeTblEntry *rte);
/* depth of current insert/select planner. */
static int insertSelectPlannerLevel = 0;
@ -304,6 +310,12 @@ CreateDistributedInsertSelectPlan(Query *originalQuery,
distributedPlan->modLevel = RowModifyLevelForQuery(originalQuery);
if (QueryContainsNextval(originalQuery))
{
/* rewrite the query to partial pushdown form */
originalQuery = RewriteInsertSelectForPartialNextval(originalQuery);
}
/*
* Error semantics for INSERT ... SELECT queries are different than regular
* modify queries. Thus, handle separately.
@ -320,6 +332,7 @@ CreateDistributedInsertSelectPlan(Query *originalQuery,
}
/*
* if the query goes to a single node ("router" in Citus' parlance),
* we don't need to go through AllDistributionKeysInQueryAreEqual checks.
@ -394,6 +407,199 @@ CreateDistributedInsertSelectPlan(Query *originalQuery,
}
/*
 * RewriteInsertSelectForPartialNextval
 *
 * Given an INSERT ... SELECT query that contains nextval() in its target list,
 * rewrite the query so that the inner subquery (which is pushed down to workers)
 * returns only the non-nextval columns. The outer query then carries the
 * nextval() calls in its own target list, which forces nextval() to be
 * evaluated on the coordinator.
 *
 * For example, this transforms:
 *
 *   INSERT INTO target (col1, col2, col3)
 *   SELECT col1, nextval('seq'), col3
 *   FROM distributed_table;
 *
 * into a plan roughly equivalent to:
 *
 *   INSERT INTO target (col1, col2, col3)
 *   SELECT worker.col1, nextval('seq'), worker.col3
 *   FROM (
 *       SELECT col1, col3
 *       FROM distributed_table
 *   ) AS worker;
 *
 * Non-nextval entries of the outer target list are mapped to worker columns
 * purely by position: the k-th non-nextval entry becomes a Var that points at
 * the k-th column of the rewritten subquery. The original expression of such
 * an entry is not consulted.
 *
 * The query is modified in place and returned. The rewrite is skipped, and
 * originalQuery is returned without any assignment made by this function,
 * when:
 *   - the query is not an INSERT,
 *   - the SELECT side is missing or is not a subquery RTE,
 *   - the SELECT RTE cannot be located in the query's range table, or
 *   - the outer target list has more non-nextval entries than the subquery
 *     has non-nextval columns (the positional mapping would run off the end).
 *
 * NOTE(review): the "untouched on bail-out" guarantee assumes WrapSubquery()
 * returns a new Query and does not modify its argument -- confirm.
 */
static Query *
RewriteInsertSelectForPartialNextval(Query *originalQuery)
{
	/* only INSERT ... SELECT queries are candidates for the rewrite */
	if (originalQuery->commandType != CMD_INSERT)
	{
		return originalQuery;
	}

	RangeTblEntry *insertRte = ExtractResultRelationRTEOrError(originalQuery);
	RangeTblEntry *selectRte = ExtractSelectRangeTableEntry(originalQuery);

	/*
	 * Only a subquery RTE carries a ->subquery to rewrite. Anything else
	 * cannot be split into a worker part and a coordinator part here, so
	 * leave the query alone instead of dereferencing a NULL subquery.
	 */
	if (selectRte == NULL ||
		selectRte->rtekind != RTE_SUBQUERY ||
		selectRte->subquery == NULL)
	{
		return originalQuery;
	}

	/*
	 * The Vars built below need the range table index of the SELECT RTE.
	 * Resolve it once, up front; 0 means "not found" and is not a valid varno.
	 */
	int subqueryIndex = FindRTEIndexInQuery(originalQuery, selectRte);
	if (subqueryIndex == 0)
	{
		return originalQuery;
	}

	/*
	 * Wrap the subquery to get a clean target list. The wrapped query is kept
	 * in a local until every check has passed, so bailing out below does not
	 * leave selectRte half-rewritten.
	 */
	Query *subquery = WrapSubquery(selectRte->subquery);

	/* collect the subquery expressions that do NOT contain nextval() */
	List *workerExprs = NIL;
	int workerCount = 0;
	ListCell *lc = NULL;
	foreach(lc, subquery->targetList)
	{
		TargetEntry *tle = (TargetEntry *) lfirst(lc);

		if (!contain_nextval_expression_walker((Node *) tle->expr, NULL))
		{
			workerExprs = lappend(workerExprs, tle->expr);
			workerCount++;
		}
	}

	/* the inner subquery will return only the worker columns */
	List *newWorkerTList = TargetEntryList(workerExprs);

	/*
	 * Build the outer target list, preserving the order of the original one:
	 * nextval() entries are copied as they are, every other entry is replaced
	 * by a Var that references the next worker column.
	 */
	List *newOuterTList = NIL;
	int outerResno = 1;
	int workerIndex = 1;
	foreach(lc, originalQuery->targetList)
	{
		TargetEntry *origTle = (TargetEntry *) lfirst(lc);
		Expr *newExpr = NULL;

		if (contain_nextval_expression_walker((Node *) origTle->expr, NULL))
		{
			/* keep the nextval() expression in the coordinator-side query */
			newExpr = copyObject(origTle->expr);
		}
		else
		{
			/* no worker column left to map to; give up before touching the query */
			if (workerIndex > workerCount)
			{
				return originalQuery;
			}

			TargetEntry *workerTle =
				(TargetEntry *) list_nth(newWorkerTList, workerIndex - 1);

			newExpr = (Expr *) makeVar(subqueryIndex,	/* RTE index (1-based) */
									   workerIndex,		/* column of the subquery */
									   exprType((Node *) workerTle->expr),
									   exprTypmod((Node *) workerTle->expr),
									   exprCollation((Node *) workerTle->expr),
									   0);
			workerIndex++;
		}

		/* resname may legitimately be NULL; pstrdup() must not see that */
		char *resname = (origTle->resname != NULL) ? pstrdup(origTle->resname) : NULL;

		newOuterTList = lappend(newOuterTList,
								makeTargetEntry(newExpr, outerResno, resname, false));
		outerResno++;
	}

	/* all checks passed: commit the rewrite */
	subquery->targetList = newWorkerTList;
	selectRte->subquery = subquery;
	originalQuery->targetList = newOuterTList;

	/* re-align both target lists with the physical column order of the table */
	ReorderInsertSelectTargetLists(originalQuery, insertRte, selectRte);

	return originalQuery;
}
/*
 * TargetEntryList builds a target list out of a list of expressions.
 *
 * The n-th expression (counting from 1) becomes a non-junk TargetEntry with
 * resno n and the generated column name "worker_col<n>". The expressions are
 * referenced by the new entries, not copied.
 */
static List *
TargetEntryList(List *expressionList)
{
	List *targetList = NIL;
	int resno = 0;
	ListCell *exprCell;

	foreach(exprCell, expressionList)
	{
		Expr *columnExpr = (Expr *) lfirst(exprCell);
		StringInfo nameBuffer = makeStringInfo();

		resno++;
		appendStringInfo(nameBuffer, "worker_col%d", resno);

		targetList = lappend(targetList,
							 makeTargetEntry(columnExpr, resno,
											 pstrdup(nameBuffer->data), false));
	}

	return targetList;
}
/*
 * FindRTEIndexInQuery searches query->rtable for the given range table entry,
 * comparing by pointer identity, and returns its 1-based position. Returns 0
 * when the entry is not part of the range table.
 */
static int
FindRTEIndexInQuery(Query *query, RangeTblEntry *rte)
{
	int position = 1;
	ListCell *rteCell;

	foreach(rteCell, query->rtable)
	{
		if (lfirst(rteCell) == (void *) rte)
		{
			return position;
		}

		position++;
	}

	return 0;
}
/*
* InsertSelectHasRouterSelect is a helper function that returns true if the SELECT
* part of the INSERT .. SELECT query is a router query.

View File

@ -173,8 +173,8 @@ static void ReorderTaskPlacementsByTaskAssignmentPolicy(Job *job,
static bool ModifiesLocalTableWithRemoteCitusLocalTable(List *rangeTableList);
static DeferredErrorMessage * DeferErrorIfUnsupportedLocalTableJoin(List *rangeTableList);
static bool IsLocallyAccessibleCitusLocalTable(Oid relationId);
static bool
QueryContainsNextval(Query *query);
// static bool
// QueryContainsNextval(Query *query);
/*
@ -3812,7 +3812,8 @@ DeferErrorIfUnsupportedRouterPlannableSelectQuery(Query *query)
* then the query will anyway happen on the coordinator, so we can
* allow nextval.
*/
if (QueryContainsNextval(query) &&
// if (QueryContainsNextval(query) &&
if (contain_nextval_expression_walker((Node *) query->targetList, NULL) &&
(hasDistributedTable || hasReferenceTable))
{
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,

View File

@ -121,4 +121,6 @@ extern Job * RouterJob(Query *originalQuery,
extern bool ContainsOnlyLocalOrReferenceTables(RTEListProperties *rteProperties);
extern RangeTblEntry * ExtractSourceResultRangeTableEntry(Query *query);
extern bool
QueryContainsNextval(Query *query);
#endif /* MULTI_ROUTER_PLANNER_H */