mirror of https://github.com/citusdata/citus.git
Add functions to support distributed table handling in query planning
parent
680b870d45
commit
f0fbfb486d
|
@ -0,0 +1 @@
|
|||
Subproject commit 3376bd6845f0614908ed304f5033bd644c82d3bf
|
|
@ -21,6 +21,7 @@
|
|||
#include "pg_version_constants.h"
|
||||
|
||||
#include "distributed/cte_inline.h"
|
||||
#include "distributed/distributed_planner.h"
|
||||
|
||||
typedef struct inline_cte_walker_context
|
||||
{
|
||||
|
@ -186,6 +187,14 @@ QueryTreeContainsInlinableCteWalker(Node *node, void *context)
|
|||
static bool
|
||||
PostgreSQLCTEInlineCondition(CommonTableExpr *cte, CmdType cmdType)
|
||||
{
|
||||
/* Prevent inlining if the CTE's query references any distributed table */
|
||||
List *cteRTEs = ExtractRangeTableEntryList((Query *) cte->ctequery);
|
||||
if (ListContainsDistributedTableRTE(cteRTEs, NULL))
|
||||
{
|
||||
elog(DEBUG1, "CTE %s references a distributed table; skipping inlining", cte->ctename);
|
||||
return false;
|
||||
}
|
||||
|
||||
/*
|
||||
* Consider inlining the CTE (creating RTE_SUBQUERY RTE(s)) instead of
|
||||
* implementing it as a separately-planned CTE.
|
||||
|
|
|
@ -93,8 +93,6 @@ static uint64 NextPlanId = 1;
|
|||
/* keep track of planner call stack levels */
|
||||
int PlannerLevel = 0;
|
||||
|
||||
static bool ListContainsDistributedTableRTE(List *rangeTableList,
|
||||
bool *maybeHasForeignDistributedTable);
|
||||
static PlannedStmt * CreateDistributedPlannedStmt(
|
||||
DistributedPlanningContext *planContext);
|
||||
static PlannedStmt * InlineCtesAndCreateDistributedPlannedStmt(uint64 planId,
|
||||
|
@ -156,6 +154,11 @@ static bool CheckPostPlanDistribution(bool isDistributedQuery,
|
|||
Query *origQuery,
|
||||
List *rangeTableList,
|
||||
Query *plannedQuery);
|
||||
static bool
|
||||
ListContainsDistributedTableRTERecursive(Query *outerQuery, List *rangeTableList,
|
||||
bool *maybeHasForeignDistributedTable);
|
||||
static Query *
|
||||
FindCTEQueryByName(Query *outerQuery, const char *cteName);
|
||||
|
||||
/* Distributed planner hook */
|
||||
PlannedStmt *
|
||||
|
@ -358,70 +361,144 @@ ExtractRangeTableEntryList(Query *query)
|
|||
bool
|
||||
NeedsDistributedPlanning(Query *query)
|
||||
{
|
||||
if (!CitusHasBeenLoaded())
|
||||
{
|
||||
return false;
|
||||
}
|
||||
if (!CitusHasBeenLoaded())
|
||||
return false;
|
||||
|
||||
CmdType commandType = query->commandType;
|
||||
CmdType commandType = query->commandType;
|
||||
if (commandType != CMD_SELECT && commandType != CMD_INSERT &&
|
||||
commandType != CMD_UPDATE && commandType != CMD_DELETE)
|
||||
return false;
|
||||
|
||||
if (commandType != CMD_SELECT && commandType != CMD_INSERT &&
|
||||
commandType != CMD_UPDATE && commandType != CMD_DELETE)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
List *allRTEs = ExtractRangeTableEntryList(query);
|
||||
|
||||
return ListContainsDistributedTableRTE(allRTEs, NULL);
|
||||
List *allRTEs = ExtractRangeTableEntryList(query);
|
||||
return ListContainsDistributedTableRTE(allRTEs, NULL);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* ListContainsDistributedTableRTE gets a list of range table entries
|
||||
* and returns true if there is at least one distributed relation range
|
||||
* table entry in the list. The boolean maybeHasForeignDistributedTable
|
||||
* variable is set to true if the list contains a foreign table.
|
||||
* Wrapper for top-level calls.
|
||||
*/
|
||||
bool
|
||||
ListContainsDistributedTableRTE(List *rangeTableList, bool *maybeHasForeignDistributedTable)
|
||||
{
|
||||
return ListContainsDistributedTableRTERecursive(NULL, rangeTableList, maybeHasForeignDistributedTable);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* FindCTEQueryByName
|
||||
*
|
||||
* Given an outer query and a CTE name, returns the corresponding
|
||||
* CTE's query definition from outerQuery->cteList, or NULL if not found.
|
||||
* (In PG17, ctelevelsup is removed so we match only on cte name.)
|
||||
*/
|
||||
static Query *
|
||||
FindCTEQueryByName(Query *outerQuery, const char *cteName)
|
||||
{
|
||||
ListCell *lc;
|
||||
foreach(lc, outerQuery->cteList)
|
||||
{
|
||||
CommonTableExpr *cte = (CommonTableExpr *) lfirst(lc);
|
||||
if (strcmp(cte->ctename, cteName) == 0)
|
||||
{
|
||||
return (Query *) cte->ctequery;
|
||||
}
|
||||
}
|
||||
return NULL;
|
||||
}
|
||||
|
||||
/*
|
||||
* ListContainsDistributedTableRTERecursive
|
||||
*
|
||||
* Recursively checks the given range table list for an RTE that represents a
|
||||
* distributed table. For RTE_SUBQUERY and RTE_CTE, it extracts all RTEs from the
|
||||
* subquery (or CTE definition, if the subquery pointer is NULL) and recurses.
|
||||
*
|
||||
* The outerQuery parameter is the top-level query; it is used to look up CTE
|
||||
* definitions if an RTE_CTE does not have its subquery pointer set.
|
||||
*/
|
||||
static bool
|
||||
ListContainsDistributedTableRTE(List *rangeTableList,
|
||||
bool *maybeHasForeignDistributedTable)
|
||||
ListContainsDistributedTableRTERecursive(Query *outerQuery, List *rangeTableList,
|
||||
bool *maybeHasForeignDistributedTable)
|
||||
{
|
||||
ListCell *rangeTableCell = NULL;
|
||||
ListCell *rangeTableCell = NULL;
|
||||
|
||||
foreach(rangeTableCell, rangeTableList)
|
||||
{
|
||||
RangeTblEntry *rangeTableEntry = (RangeTblEntry *) lfirst(rangeTableCell);
|
||||
foreach(rangeTableCell, rangeTableList)
|
||||
{
|
||||
RangeTblEntry *rte = (RangeTblEntry *) lfirst(rangeTableCell);
|
||||
|
||||
if (rangeTableEntry->rtekind != RTE_RELATION)
|
||||
{
|
||||
continue;
|
||||
}
|
||||
/* For base relations, check as before */
|
||||
if (rte->rtekind == RTE_RELATION)
|
||||
{
|
||||
if (HideCitusDependentObjects &&
|
||||
IsolationIsSerializable() &&
|
||||
IsPgLocksTable(rte))
|
||||
{
|
||||
continue;
|
||||
}
|
||||
if (IsCitusTable(rte->relid))
|
||||
{
|
||||
if (maybeHasForeignDistributedTable != NULL &&
|
||||
IsForeignTable(rte->relid))
|
||||
{
|
||||
*maybeHasForeignDistributedTable = true;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
}
|
||||
/* For subqueries, check their internal range table */
|
||||
else if (rte->rtekind == RTE_SUBQUERY)
|
||||
{
|
||||
Query *subquery = rte->subquery;
|
||||
if (subquery != NULL)
|
||||
{
|
||||
List *allRTEs = ExtractRangeTableEntryList(subquery);
|
||||
if (ListContainsDistributedTableRTERecursive(outerQuery, allRTEs,
|
||||
maybeHasForeignDistributedTable))
|
||||
{
|
||||
return true;
|
||||
}
|
||||
/* Also check any CTEs defined inside the subquery */
|
||||
if (subquery->cteList)
|
||||
{
|
||||
ListCell *cteCell;
|
||||
foreach(cteCell, subquery->cteList)
|
||||
{
|
||||
CommonTableExpr *cte = (CommonTableExpr *) lfirst(cteCell);
|
||||
if (cte && cte->ctequery)
|
||||
{
|
||||
List *cteRTEs = ExtractRangeTableEntryList((Query *) cte->ctequery);
|
||||
if (ListContainsDistributedTableRTERecursive(outerQuery, cteRTEs,
|
||||
maybeHasForeignDistributedTable))
|
||||
{
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
/* For CTE entries, if rte->subquery is NULL then look up its definition */
|
||||
else if (rte->rtekind == RTE_CTE)
|
||||
{
|
||||
Query *cteQuery = rte->subquery;
|
||||
if (cteQuery == NULL && outerQuery != NULL)
|
||||
{
|
||||
cteQuery = FindCTEQueryByName(outerQuery, rte->ctename);
|
||||
}
|
||||
if (cteQuery != NULL)
|
||||
{
|
||||
List *cteRTEs = ExtractRangeTableEntryList(cteQuery);
|
||||
if (ListContainsDistributedTableRTERecursive(outerQuery, cteRTEs,
|
||||
maybeHasForeignDistributedTable))
|
||||
{
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
/* Optionally: handle other RTE kinds if necessary */
|
||||
}
|
||||
|
||||
if (HideCitusDependentObjects && IsolationIsSerializable() && IsPgLocksTable(
|
||||
rangeTableEntry))
|
||||
{
|
||||
/*
|
||||
* Postgres tidscan.sql test fails if we do not filter pg_locks table because
|
||||
* test results, which show taken locks in serializable isolation mode,
|
||||
* fails by showing extra lock taken by IsCitusTable below.
|
||||
*/
|
||||
continue;
|
||||
}
|
||||
|
||||
if (IsCitusTable(rangeTableEntry->relid))
|
||||
{
|
||||
if (maybeHasForeignDistributedTable != NULL &&
|
||||
IsForeignTable(rangeTableEntry->relid))
|
||||
{
|
||||
*maybeHasForeignDistributedTable = true;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
return false;
|
||||
return false;
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -107,6 +107,10 @@ static void ResetTargetEntryResno(List *targetList);
|
|||
static void ProcessEntryPair(TargetEntry *insertEntry, TargetEntry *selectEntry,
|
||||
Form_pg_attribute attr, int targetEntryIndex,
|
||||
List **projectedEntries, List **nonProjectedEntries);
|
||||
static List *
|
||||
TargetEntryList(List *expressionList);
|
||||
static int
|
||||
FindRTEIndexInQuery(Query *query, RangeTblEntry *rte);
|
||||
|
||||
|
||||
/* depth of current insert/select planner. */
|
||||
|
@ -304,6 +308,17 @@ CreateDistributedInsertSelectPlan(Query *originalQuery,
|
|||
|
||||
distributedPlan->modLevel = RowModifyLevelForQuery(originalQuery);
|
||||
|
||||
/*
|
||||
* Check for volatile functions (e.g. nextval) in the target list.
|
||||
* If present, call our rewrite function to "split" the query.
|
||||
*/
|
||||
if (contain_nextval_expression_walker((Node *) originalQuery->targetList, NULL))
|
||||
{
|
||||
originalQuery = RewriteInsertSelectForPartialNextval(originalQuery);
|
||||
}
|
||||
|
||||
|
||||
|
||||
/*
|
||||
* Error semantics for INSERT ... SELECT queries are different than regular
|
||||
* modify queries. Thus, handle separately.
|
||||
|
@ -394,6 +409,199 @@ CreateDistributedInsertSelectPlan(Query *originalQuery,
|
|||
}
|
||||
|
||||
|
||||
/*
|
||||
* RewriteInsertSelectForPartialNextval
|
||||
*
|
||||
* Given an INSERT ... SELECT query that contains nextval() in its target list,
|
||||
* rewrite the query so that the inner subquery (which is pushed down to workers)
|
||||
* returns only the non-volatile (non-nextval) columns. The outer query then
|
||||
* adds the nextval() calls in its target list. This forces nextval() to be
|
||||
* evaluated on the coordinator.
|
||||
*
|
||||
* For example, this transforms:
|
||||
*
|
||||
* INSERT INTO target (col1, col2, col3)
|
||||
* SELECT col1, nextval('seq'), col3
|
||||
* FROM distributed_table;
|
||||
*
|
||||
* into a plan roughly equivalent to:
|
||||
*
|
||||
* INSERT INTO target (col1, col2, col3)
|
||||
* SELECT worker.col1, nextval('seq'), worker.col3
|
||||
* FROM (
|
||||
* SELECT col1, col3
|
||||
* FROM distributed_table
|
||||
* ) AS worker;
|
||||
*/
|
||||
Query *
|
||||
RewriteInsertSelectForPartialNextval(Query *originalQuery)
|
||||
{
|
||||
/* 0) Basic check: should be an INSERT...SELECT query */
|
||||
if (originalQuery->commandType != CMD_INSERT)
|
||||
return originalQuery;
|
||||
|
||||
/* 1) Get the result relation and the SELECT side RTE */
|
||||
RangeTblEntry *insertRte = ExtractResultRelationRTEOrError(originalQuery);
|
||||
RangeTblEntry *selectRte = ExtractSelectRangeTableEntry(originalQuery);
|
||||
|
||||
/* 2) Ensure the SELECT side is an RTE_SUBQUERY. If not, wrap it. */
|
||||
if (selectRte->rtekind != RTE_SUBQUERY)
|
||||
{
|
||||
Query *fakeSelect = makeNode(Query);
|
||||
fakeSelect->commandType = CMD_SELECT;
|
||||
fakeSelect->targetList = copyObject(selectRte->subquery->targetList);
|
||||
/* Additional fields (rtable, jointree) should be copied as needed */
|
||||
selectRte->rtekind = RTE_SUBQUERY;
|
||||
selectRte->subquery = fakeSelect;
|
||||
}
|
||||
|
||||
/* Optionally, wrap the subquery to get a clean target list */
|
||||
selectRte->subquery = WrapSubquery(selectRte->subquery);
|
||||
Query *subquery = selectRte->subquery; /* This is now our inner subquery */
|
||||
|
||||
/*
|
||||
* 3) Partition the subquery’s target list into:
|
||||
* - workerExprs: expressions that do NOT contain nextval()
|
||||
* - volatileExprs: expressions that DO contain nextval()
|
||||
*
|
||||
* We'll use a helper function RemoveNextvalFromTargetList() that,
|
||||
* for each TargetEntry in subquery->targetList, places its expr into one of
|
||||
* two lists depending on whether contain_nextval_expression_walker() returns true.
|
||||
*/
|
||||
List *workerExprs = NIL;
|
||||
List *volatileExprs = NIL;
|
||||
ListCell *lc;
|
||||
foreach(lc, subquery->targetList)
|
||||
{
|
||||
TargetEntry *tle = (TargetEntry *) lfirst(lc);
|
||||
if (contain_nextval_expression_walker((Node *) tle->expr, NULL))
|
||||
volatileExprs = lappend(volatileExprs, tle->expr);
|
||||
else
|
||||
workerExprs = lappend(workerExprs, tle->expr);
|
||||
}
|
||||
|
||||
/*
|
||||
* 4) Build new target lists:
|
||||
*
|
||||
* For the inner subquery: Use only the worker expressions.
|
||||
* For the outer (coordinator) query: Build a target list that combines:
|
||||
* - Vars that reference the worker subquery’s output columns (for each workerExpr)
|
||||
* - The volatile expressions (e.g. nextval()) placed in their proper positions.
|
||||
*
|
||||
* Here we assume that the original target list order is preserved:
|
||||
* e.g. if the original target list was:
|
||||
* [ expr1, expr2 (volatile), expr3 ]
|
||||
* then the new outer target list should be:
|
||||
* [ Var(worker_col1), expr2, Var(worker_col2) ]
|
||||
*
|
||||
* This example assumes that the order of workerExprs and volatileExprs
|
||||
* correspond to their positions in the original target list.
|
||||
*
|
||||
* A more robust implementation would walk the original target list and, for
|
||||
* each entry, either replace it with a Var reference (if non-volatile) or
|
||||
* keep the volatile expression.
|
||||
*/
|
||||
|
||||
/* Create new inner subquery target list from workerExprs */
|
||||
List *newWorkerTList = TargetEntryList(workerExprs);
|
||||
subquery->targetList = newWorkerTList; /* inner subquery returns only worker columns */
|
||||
|
||||
/* Build the outer target list */
|
||||
List *newOuterTList = NIL;
|
||||
int outerResno = 1;
|
||||
int workerIndex = 1;
|
||||
int volatileIndex = 1;
|
||||
/* For each original target entry, check if it was volatile or not.
|
||||
For simplicity, we assume the original order is preserved in the concatenated lists.
|
||||
A robust solution would store position info. */
|
||||
foreach(lc, originalQuery->targetList)
|
||||
{
|
||||
TargetEntry *origTle = (TargetEntry *) lfirst(lc);
|
||||
if (contain_nextval_expression_walker((Node *) origTle->expr, NULL))
|
||||
{
|
||||
/* Keep the volatile expression as is. */
|
||||
TargetEntry *newTle = makeTargetEntry(copyObject(origTle->expr),
|
||||
outerResno,
|
||||
pstrdup(origTle->resname),
|
||||
false);
|
||||
newOuterTList = lappend(newOuterTList, newTle);
|
||||
volatileIndex++;
|
||||
}
|
||||
else
|
||||
{
|
||||
/* b) Replace non-volatile expression with a Var reference to the subquery's output.
|
||||
We need to get the RTE index of the subquery.
|
||||
*/
|
||||
int subqueryIndex = FindRTEIndexInQuery(originalQuery, selectRte);
|
||||
TargetEntry *workerTle = (TargetEntry *) list_nth(newWorkerTList, workerIndex - 1);
|
||||
Var *v = makeVar(subqueryIndex, /* the RTE index (1-based) */
|
||||
workerIndex, /* attribute number within subquery TList */
|
||||
exprType((Node *) workerTle->expr),
|
||||
exprTypmod((Node *) workerTle->expr),
|
||||
exprCollation((Node *) workerTle->expr),
|
||||
0);
|
||||
TargetEntry *newTle = makeTargetEntry((Expr *) v,
|
||||
outerResno,
|
||||
pstrdup(origTle->resname),
|
||||
false);
|
||||
newOuterTList = lappend(newOuterTList, newTle);
|
||||
workerIndex++;
|
||||
}
|
||||
outerResno++;
|
||||
}
|
||||
originalQuery->targetList = newOuterTList;
|
||||
|
||||
/* Optionally, re-run any target list reordering to align with the physical table's column order */
|
||||
ReorderInsertSelectTargetLists(originalQuery, insertRte, selectRte);
|
||||
|
||||
return originalQuery;
|
||||
}
|
||||
|
||||
/*
|
||||
* TargetEntryList creates a new target list from a list of expressions.
|
||||
* Each expression is wrapped in a TargetEntry with an automatically generated
|
||||
* column name.
|
||||
*/
|
||||
static List *
|
||||
TargetEntryList(List *expressionList)
|
||||
{
|
||||
List *tlist = NIL;
|
||||
ListCell *cell;
|
||||
int colIndex = 1;
|
||||
|
||||
foreach(cell, expressionList)
|
||||
{
|
||||
Expr *expr = (Expr *) lfirst(cell);
|
||||
StringInfo colName = makeStringInfo();
|
||||
appendStringInfo(colName, "worker_col%d", colIndex);
|
||||
TargetEntry *tle = makeTargetEntry(expr, colIndex, pstrdup(colName->data), false);
|
||||
tlist = lappend(tlist, tle);
|
||||
colIndex++;
|
||||
}
|
||||
return tlist;
|
||||
}
|
||||
|
||||
/*
|
||||
* FindRTEIndexInQuery returns the 1-based index of the given RTE in query->rtable.
|
||||
* If not found, returns 0.
|
||||
*/
|
||||
static int
|
||||
FindRTEIndexInQuery(Query *query, RangeTblEntry *rte)
|
||||
{
|
||||
ListCell *cell;
|
||||
int index = 0;
|
||||
foreach(cell, query->rtable)
|
||||
{
|
||||
index++;
|
||||
if (lfirst(cell) == (void *) rte)
|
||||
return index;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
/*
|
||||
* InsertSelectHasRouterSelect is a helper function that returns true of the SELECT
|
||||
* part of the INSERT .. SELECT query is a router query.
|
||||
|
|
|
@ -251,6 +251,9 @@ extern PlannedStmt * FinalizePlan(PlannedStmt *localPlan,
|
|||
struct DistributedPlan *distributedPlan);
|
||||
extern bool ContainsSingleShardTable(Query *query);
|
||||
extern RTEListProperties * GetRTEListPropertiesForQuery(Query *query);
|
||||
extern List *ExtractRangeTableEntryList(Query *query);
|
||||
extern bool ListContainsDistributedTableRTE(List *rangeTableList, bool *maybeHasForeignDistributedTable);
|
||||
|
||||
|
||||
|
||||
extern struct DistributedPlan * CreateDistributedPlan(uint64 planId,
|
||||
|
|
|
@ -46,6 +46,8 @@ extern DistributedPlan * CreateInsertSelectIntoLocalTablePlan(uint64 planId,
|
|||
extern char * InsertSelectResultIdPrefix(uint64 planId);
|
||||
extern bool PlanningInsertSelect(void);
|
||||
extern Query * WrapSubquery(Query *subquery);
|
||||
extern Query *
|
||||
RewriteInsertSelectForPartialNextval(Query *originalQuery);
|
||||
|
||||
|
||||
#endif /* INSERT_SELECT_PLANNER_H */
|
||||
|
|
Loading…
Reference in New Issue