diff --git a/citus-tools b/citus-tools new file mode 160000 index 000000000..3376bd684 --- /dev/null +++ b/citus-tools @@ -0,0 +1 @@ +Subproject commit 3376bd6845f0614908ed304f5033bd644c82d3bf diff --git a/src/backend/distributed/planner/cte_inline.c b/src/backend/distributed/planner/cte_inline.c index d6f88525c..63be41723 100644 --- a/src/backend/distributed/planner/cte_inline.c +++ b/src/backend/distributed/planner/cte_inline.c @@ -21,6 +21,7 @@ #include "pg_version_constants.h" #include "distributed/cte_inline.h" +#include "distributed/distributed_planner.h" typedef struct inline_cte_walker_context { @@ -186,6 +187,14 @@ QueryTreeContainsInlinableCteWalker(Node *node, void *context) static bool PostgreSQLCTEInlineCondition(CommonTableExpr *cte, CmdType cmdType) { + /* Prevent inlining if the CTE's query references any distributed table */ + List *cteRTEs = ExtractRangeTableEntryList((Query *) cte->ctequery); + if (ListContainsDistributedTableRTE(cteRTEs, NULL)) + { + elog(DEBUG1, "CTE %s references a distributed table; skipping inlining", cte->ctename); + return false; + } + /* * Consider inlining the CTE (creating RTE_SUBQUERY RTE(s)) instead of * implementing it as a separately-planned CTE. diff --git a/src/backend/distributed/planner/distributed_planner.c b/src/backend/distributed/planner/distributed_planner.c index ac7754cb9..ce53f81bb 100644 --- a/src/backend/distributed/planner/distributed_planner.c +++ b/src/backend/distributed/planner/distributed_planner.c @@ -93,8 +93,6 @@ static uint64 NextPlanId = 1; /* keep track of planner call stack levels */ int PlannerLevel = 0; -static bool ListContainsDistributedTableRTE(List *rangeTableList, - bool *maybeHasForeignDistributedTable); static PlannedStmt * CreateDistributedPlannedStmt( DistributedPlanningContext *planContext); static PlannedStmt * InlineCtesAndCreateDistributedPlannedStmt(uint64 planId, @@ -156,6 +154,11 @@ static bool CheckPostPlanDistribution(bool isDistributedQuery, Query *origQuery, List *rangeTableList, Query *plannedQuery); +static bool +ListContainsDistributedTableRTERecursive(Query *outerQuery, List *rangeTableList, + bool *maybeHasForeignDistributedTable); +static Query * +FindCTEQueryByName(Query *outerQuery, const char *cteName); /* Distributed planner hook */ PlannedStmt * @@ -358,70 +361,144 @@ ExtractRangeTableEntryList(Query *query) bool NeedsDistributedPlanning(Query *query) { - if (!CitusHasBeenLoaded()) - { - return false; - } + if (!CitusHasBeenLoaded()) + return false; - CmdType commandType = query->commandType; + CmdType commandType = query->commandType; + if (commandType != CMD_SELECT && commandType != CMD_INSERT && + commandType != CMD_UPDATE && commandType != CMD_DELETE) + return false; - if (commandType != CMD_SELECT && commandType != CMD_INSERT && - commandType != CMD_UPDATE && commandType != CMD_DELETE) - { - return false; - } - - List *allRTEs = ExtractRangeTableEntryList(query); - - return ListContainsDistributedTableRTE(allRTEs, NULL); + List *allRTEs = ExtractRangeTableEntryList(query); + return ListContainsDistributedTableRTE(allRTEs, NULL); } /* - * ListContainsDistributedTableRTE gets a list of range table entries - * and returns true if there is at least one distributed relation range - * table entry in the list. The boolean maybeHasForeignDistributedTable - * variable is set to true if the list contains a foreign table. + * Wrapper for top-level calls. + */ +bool +ListContainsDistributedTableRTE(List *rangeTableList, bool *maybeHasForeignDistributedTable) +{ + return ListContainsDistributedTableRTERecursive(NULL, rangeTableList, maybeHasForeignDistributedTable); +} + + +/* + * FindCTEQueryByName + * + * Given an outer query and a CTE name, returns the corresponding + * CTE's query definition from outerQuery->cteList, or NULL if not found. + * (In PG17, ctelevelsup is removed so we match only on cte name.) + */ +static Query * +FindCTEQueryByName(Query *outerQuery, const char *cteName) +{ + ListCell *lc; + foreach(lc, outerQuery->cteList) + { + CommonTableExpr *cte = (CommonTableExpr *) lfirst(lc); + if (strcmp(cte->ctename, cteName) == 0) + { + return (Query *) cte->ctequery; + } + } + return NULL; +} + +/* + * ListContainsDistributedTableRTERecursive + * + * Recursively checks the given range table list for an RTE that represents a + * distributed table. For RTE_SUBQUERY and RTE_CTE, it extracts all RTEs from the + * subquery (or CTE definition, if the subquery pointer is NULL) and recurses. + * + * The outerQuery parameter is the top-level query; it is used to look up CTE + * definitions if an RTE_CTE does not have its subquery pointer set. */ static bool -ListContainsDistributedTableRTE(List *rangeTableList, - bool *maybeHasForeignDistributedTable) +ListContainsDistributedTableRTERecursive(Query *outerQuery, List *rangeTableList, + bool *maybeHasForeignDistributedTable) { - ListCell *rangeTableCell = NULL; + ListCell *rangeTableCell = NULL; - foreach(rangeTableCell, rangeTableList) - { - RangeTblEntry *rangeTableEntry = (RangeTblEntry *) lfirst(rangeTableCell); + foreach(rangeTableCell, rangeTableList) + { + RangeTblEntry *rte = (RangeTblEntry *) lfirst(rangeTableCell); - if (rangeTableEntry->rtekind != RTE_RELATION) - { - continue; - } + /* For base relations, check as before */ + if (rte->rtekind == RTE_RELATION) + { + if (HideCitusDependentObjects && + IsolationIsSerializable() && + IsPgLocksTable(rte)) + { + continue; + } + if (IsCitusTable(rte->relid)) + { + if (maybeHasForeignDistributedTable != NULL && + IsForeignTable(rte->relid)) + { + *maybeHasForeignDistributedTable = true; + } + return true; + } + } + /* For subqueries, check their internal range table */ + else if (rte->rtekind == RTE_SUBQUERY) + { + Query *subquery = rte->subquery; + if (subquery != NULL) + { + List *allRTEs = ExtractRangeTableEntryList(subquery); + if (ListContainsDistributedTableRTERecursive(outerQuery, allRTEs, + maybeHasForeignDistributedTable)) + { + return true; + } + /* Also check any CTEs defined inside the subquery */ + if (subquery->cteList) + { + ListCell *cteCell; + foreach(cteCell, subquery->cteList) + { + CommonTableExpr *cte = (CommonTableExpr *) lfirst(cteCell); + if (cte && cte->ctequery) + { + List *cteRTEs = ExtractRangeTableEntryList((Query *) cte->ctequery); + if (ListContainsDistributedTableRTERecursive(outerQuery, cteRTEs, + maybeHasForeignDistributedTable)) + { + return true; + } + } + } + } + } + } + /* For CTE entries, if rte->subquery is NULL then look up its definition */ + else if (rte->rtekind == RTE_CTE) + { + Query *cteQuery = rte->subquery; + if (cteQuery == NULL && outerQuery != NULL) + { + cteQuery = FindCTEQueryByName(outerQuery, rte->ctename); + } + if (cteQuery != NULL) + { + List *cteRTEs = ExtractRangeTableEntryList(cteQuery); + if (ListContainsDistributedTableRTERecursive(outerQuery, cteRTEs, + maybeHasForeignDistributedTable)) + { + return true; + } + } + } + /* Optionally: handle other RTE kinds if necessary */ + } - if (HideCitusDependentObjects && IsolationIsSerializable() && IsPgLocksTable( - rangeTableEntry)) - { - /* - * Postgres tidscan.sql test fails if we do not filter pg_locks table because - * test results, which show taken locks in serializable isolation mode, - * fails by showing extra lock taken by IsCitusTable below. - */ - continue; - } - - if (IsCitusTable(rangeTableEntry->relid)) - { - if (maybeHasForeignDistributedTable != NULL && - IsForeignTable(rangeTableEntry->relid)) - { - *maybeHasForeignDistributedTable = true; - } - - return true; - } - } - - return false; + return false; } diff --git a/src/backend/distributed/planner/insert_select_planner.c b/src/backend/distributed/planner/insert_select_planner.c index ce61bd0ae..893f16ed2 100644 --- a/src/backend/distributed/planner/insert_select_planner.c +++ b/src/backend/distributed/planner/insert_select_planner.c @@ -107,6 +107,10 @@ static void ResetTargetEntryResno(List *targetList); static void ProcessEntryPair(TargetEntry *insertEntry, TargetEntry *selectEntry, Form_pg_attribute attr, int targetEntryIndex, List **projectedEntries, List **nonProjectedEntries); + static List * + TargetEntryList(List *expressionList); + static int +FindRTEIndexInQuery(Query *query, RangeTblEntry *rte); /* depth of current insert/select planner. */ @@ -304,6 +308,17 @@ CreateDistributedInsertSelectPlan(Query *originalQuery, distributedPlan->modLevel = RowModifyLevelForQuery(originalQuery); + /* + * Check for volatile functions (e.g. nextval) in the target list. + * If present, call our rewrite function to "split" the query. + */ + if (contain_nextval_expression_walker((Node *) originalQuery->targetList, NULL)) + { + originalQuery = RewriteInsertSelectForPartialNextval(originalQuery); + } + + + /* * Error semantics for INSERT ... SELECT queries are different than regular * modify queries. Thus, handle separately. @@ -394,6 +409,199 @@ CreateDistributedInsertSelectPlan(Query *originalQuery, } +/* + * RewriteInsertSelectForPartialNextval + * + * Given an INSERT ... SELECT query that contains nextval() in its target list, + * rewrite the query so that the inner subquery (which is pushed down to workers) + * returns only the non-volatile (non-nextval) columns. The outer query then + * adds the nextval() calls in its target list. This forces nextval() to be + * evaluated on the coordinator. + * + * For example, this transforms: + * + * INSERT INTO target (col1, col2, col3) + * SELECT col1, nextval('seq'), col3 + * FROM distributed_table; + * + * into a plan roughly equivalent to: + * + * INSERT INTO target (col1, col2, col3) + * SELECT worker.col1, nextval('seq'), worker.col3 + * FROM ( + * SELECT col1, col3 + * FROM distributed_table + * ) AS worker; + */ +Query * +RewriteInsertSelectForPartialNextval(Query *originalQuery) +{ + /* 0) Basic check: should be an INSERT...SELECT query */ + if (originalQuery->commandType != CMD_INSERT) + return originalQuery; + + /* 1) Get the result relation and the SELECT side RTE */ + RangeTblEntry *insertRte = ExtractResultRelationRTEOrError(originalQuery); + RangeTblEntry *selectRte = ExtractSelectRangeTableEntry(originalQuery); + + /* 2) Ensure the SELECT side is an RTE_SUBQUERY. If not, wrap it. */ + if (selectRte->rtekind != RTE_SUBQUERY) + { + Query *fakeSelect = makeNode(Query); + fakeSelect->commandType = CMD_SELECT; + fakeSelect->targetList = copyObject(selectRte->subquery->targetList); + /* Additional fields (rtable, jointree) should be copied as needed */ + selectRte->rtekind = RTE_SUBQUERY; + selectRte->subquery = fakeSelect; + } + + /* Optionally, wrap the subquery to get a clean target list */ + selectRte->subquery = WrapSubquery(selectRte->subquery); + Query *subquery = selectRte->subquery; /* This is now our inner subquery */ + + /* + * 3) Partition the subquery’s target list into: + * - workerExprs: expressions that do NOT contain nextval() + * - volatileExprs: expressions that DO contain nextval() + * + * We'll use a helper function RemoveNextvalFromTargetList() that, + * for each TargetEntry in subquery->targetList, places its expr into one of + * two lists depending on whether contain_nextval_expression_walker() returns true. + */ + List *workerExprs = NIL; + List *volatileExprs = NIL; + ListCell *lc; + foreach(lc, subquery->targetList) + { + TargetEntry *tle = (TargetEntry *) lfirst(lc); + if (contain_nextval_expression_walker((Node *) tle->expr, NULL)) + volatileExprs = lappend(volatileExprs, tle->expr); + else + workerExprs = lappend(workerExprs, tle->expr); + } + + /* + * 4) Build new target lists: + * + * For the inner subquery: Use only the worker expressions. + * For the outer (coordinator) query: Build a target list that combines: + * - Vars that reference the worker subquery’s output columns (for each workerExpr) + * - The volatile expressions (e.g. nextval()) placed in their proper positions. + * + * Here we assume that the original target list order is preserved: + * e.g. if the original target list was: + * [ expr1, expr2 (volatile), expr3 ] + * then the new outer target list should be: + * [ Var(worker_col1), expr2, Var(worker_col2) ] + * + * This example assumes that the order of workerExprs and volatileExprs + * correspond to their positions in the original target list. + * + * A more robust implementation would walk the original target list and, for + * each entry, either replace it with a Var reference (if non-volatile) or + * keep the volatile expression. + */ + + /* Create new inner subquery target list from workerExprs */ + List *newWorkerTList = TargetEntryList(workerExprs); + subquery->targetList = newWorkerTList; /* inner subquery returns only worker columns */ + + /* Build the outer target list */ + List *newOuterTList = NIL; + int outerResno = 1; + int workerIndex = 1; + int volatileIndex = 1; + /* For each original target entry, check if it was volatile or not. + For simplicity, we assume the original order is preserved in the concatenated lists. + A robust solution would store position info. */ + foreach(lc, originalQuery->targetList) + { + TargetEntry *origTle = (TargetEntry *) lfirst(lc); + if (contain_nextval_expression_walker((Node *) origTle->expr, NULL)) + { + /* Keep the volatile expression as is. */ + TargetEntry *newTle = makeTargetEntry(copyObject(origTle->expr), + outerResno, + pstrdup(origTle->resname), + false); + newOuterTList = lappend(newOuterTList, newTle); + volatileIndex++; + } + else + { + /* b) Replace non-volatile expression with a Var reference to the subquery's output. + We need to get the RTE index of the subquery. + */ + int subqueryIndex = FindRTEIndexInQuery(originalQuery, selectRte); + TargetEntry *workerTle = (TargetEntry *) list_nth(newWorkerTList, workerIndex - 1); + Var *v = makeVar(subqueryIndex, /* the RTE index (1-based) */ + workerIndex, /* attribute number within subquery TList */ + exprType((Node *) workerTle->expr), + exprTypmod((Node *) workerTle->expr), + exprCollation((Node *) workerTle->expr), + 0); + TargetEntry *newTle = makeTargetEntry((Expr *) v, + outerResno, + pstrdup(origTle->resname), + false); + newOuterTList = lappend(newOuterTList, newTle); + workerIndex++; + } + outerResno++; + } + originalQuery->targetList = newOuterTList; + + /* Optionally, re-run any target list reordering to align with the physical table's column order */ + ReorderInsertSelectTargetLists(originalQuery, insertRte, selectRte); + + return originalQuery; +} + +/* + * TargetEntryList creates a new target list from a list of expressions. + * Each expression is wrapped in a TargetEntry with an automatically generated + * column name. + */ +static List * +TargetEntryList(List *expressionList) +{ + List *tlist = NIL; + ListCell *cell; + int colIndex = 1; + + foreach(cell, expressionList) + { + Expr *expr = (Expr *) lfirst(cell); + StringInfo colName = makeStringInfo(); + appendStringInfo(colName, "worker_col%d", colIndex); + TargetEntry *tle = makeTargetEntry(expr, colIndex, pstrdup(colName->data), false); + tlist = lappend(tlist, tle); + colIndex++; + } + return tlist; +} + +/* + * FindRTEIndexInQuery returns the 1-based index of the given RTE in query->rtable. + * If not found, returns 0. + */ +static int +FindRTEIndexInQuery(Query *query, RangeTblEntry *rte) +{ + ListCell *cell; + int index = 0; + foreach(cell, query->rtable) + { + index++; + if (lfirst(cell) == (void *) rte) + return index; + } + return 0; +} + + + + /* * InsertSelectHasRouterSelect is a helper function that returns true of the SELECT * part of the INSERT .. SELECT query is a router query. diff --git a/src/include/distributed/distributed_planner.h b/src/include/distributed/distributed_planner.h index 33a9c2fa8..5832fb7a2 100644 --- a/src/include/distributed/distributed_planner.h +++ b/src/include/distributed/distributed_planner.h @@ -251,6 +251,9 @@ extern PlannedStmt * FinalizePlan(PlannedStmt *localPlan, struct DistributedPlan *distributedPlan); extern bool ContainsSingleShardTable(Query *query); extern RTEListProperties * GetRTEListPropertiesForQuery(Query *query); +extern List *ExtractRangeTableEntryList(Query *query); +extern bool ListContainsDistributedTableRTE(List *rangeTableList, bool *maybeHasForeignDistributedTable); + extern struct DistributedPlan * CreateDistributedPlan(uint64 planId, diff --git a/src/include/distributed/insert_select_planner.h b/src/include/distributed/insert_select_planner.h index a9100b02d..4ba2d6027 100644 --- a/src/include/distributed/insert_select_planner.h +++ b/src/include/distributed/insert_select_planner.h @@ -46,6 +46,8 @@ extern DistributedPlan * CreateInsertSelectIntoLocalTablePlan(uint64 planId, extern char * InsertSelectResultIdPrefix(uint64 planId); extern bool PlanningInsertSelect(void); extern Query * WrapSubquery(Query *subquery); +extern Query * +RewriteInsertSelectForPartialNextval(Query *originalQuery); #endif /* INSERT_SELECT_PLANNER_H */