mirror of https://github.com/citusdata/citus.git
Refactor CTE handling in PrepareInsertSelectForCitusPlanner to unify and rename duplicate CTEs, improving clarity and functionality
parent
a9b3c126d2
commit
45d1979737
|
@ -28,6 +28,7 @@
|
||||||
#include "utils/builtins.h"
|
#include "utils/builtins.h"
|
||||||
#include "utils/lsyscache.h"
|
#include "utils/lsyscache.h"
|
||||||
#include "utils/rel.h"
|
#include "utils/rel.h"
|
||||||
|
#include "nodes/pg_list.h"
|
||||||
|
|
||||||
#include "pg_version_constants.h"
|
#include "pg_version_constants.h"
|
||||||
|
|
||||||
|
@ -108,29 +109,44 @@ static void ProcessEntryPair(TargetEntry *insertEntry, TargetEntry *selectEntry,
|
||||||
Form_pg_attribute attr, int targetEntryIndex,
|
Form_pg_attribute attr, int targetEntryIndex,
|
||||||
List **projectedEntries, List **nonProjectedEntries);
|
List **projectedEntries, List **nonProjectedEntries);
|
||||||
|
|
||||||
typedef struct ShiftReferencesWalkerContext
|
typedef struct
|
||||||
{
|
{
|
||||||
/* current nesting depth, 0 for top-level query, increments each time we go one Query deeper */
|
int levelsup; /* current depth, 0 at sub-query root */
|
||||||
int levelsup;
|
int offset; /* -1 */
|
||||||
|
/* sets built once before walk, used read-only during walk */
|
||||||
/* how much to shift ctelevelsup by if it matches levelsup+1 (typically -1) */
|
List *dupCteNameList; /* names duplicated between top & sub */
|
||||||
int offset;
|
/* filled while walking */
|
||||||
|
List *seenDupRefs; /* RTEs whose ctename will need rename */
|
||||||
/* optional: if we want to check ctename is in top-level list */
|
|
||||||
List *topLevelCteList;
|
List *topLevelCteList;
|
||||||
} ShiftReferencesWalkerContext;
|
|
||||||
|
} ShiftCteContext;
|
||||||
|
|
||||||
|
|
||||||
static bool
|
static bool
|
||||||
inline_cte_walker(Node *node, ShiftReferencesWalkerContext *context);
|
NameInStringList(List *nameList, const char *name);
|
||||||
|
|
||||||
|
static List *
|
||||||
|
FindDuplicateCteNames(List *topLevel, List *subquery);
|
||||||
|
|
||||||
|
static bool
|
||||||
|
inline_cte_walker(Node *node, ShiftCteContext *ctx);
|
||||||
|
|
||||||
static void
|
static void
|
||||||
DecrementInsertLevelReferences(Query *subquery,
|
RenameDuplicateCtes(Query *subq,
|
||||||
int offset, /* typically -1 */
|
List *dupNameList,
|
||||||
List *topLevelCteList);
|
List *seenRefs);
|
||||||
|
|
||||||
static bool
|
static bool
|
||||||
CteNameExists(List *cteList, const char *ctename);
|
CteNameExists(List *cteList, const char *ctename);
|
||||||
|
|
||||||
|
static void
|
||||||
|
CopyAndRenameOuterCtes(Query *outerQuery,
|
||||||
|
List *topLevelCopy,
|
||||||
|
List *dupNameList);
|
||||||
|
|
||||||
|
static bool
|
||||||
|
rename_cte_ref_walker(Node *node, void *dupNameList);
|
||||||
|
|
||||||
|
|
||||||
/* depth of current insert/select planner. */
|
/* depth of current insert/select planner. */
|
||||||
static int insertSelectPlannerLevel = 0;
|
static int insertSelectPlannerLevel = 0;
|
||||||
|
@ -532,38 +548,31 @@ PrepareInsertSelectForCitusPlanner(Query *insertSelectQuery)
|
||||||
{
|
{
|
||||||
RangeTblEntry *insertRte = ExtractResultRelationRTEOrError(insertSelectQuery);
|
RangeTblEntry *insertRte = ExtractResultRelationRTEOrError(insertSelectQuery);
|
||||||
RangeTblEntry *selectRte = ExtractSelectRangeTableEntry(insertSelectQuery);
|
RangeTblEntry *selectRte = ExtractSelectRangeTableEntry(insertSelectQuery);
|
||||||
Oid targetRelationId = insertRte->relid;
|
Oid targetRel = insertRte->relid;
|
||||||
|
|
||||||
|
/* 0. if SELECT is a UNION we first wrap it so we get a single target list */
|
||||||
if (selectRte->subquery->setOperations != NULL)
|
if (selectRte->subquery->setOperations != NULL)
|
||||||
{
|
|
||||||
/*
|
|
||||||
* Prepare UNION query for reordering and adding casts by
|
|
||||||
* wrapping it in a subquery to have a single target list.
|
|
||||||
*/
|
|
||||||
selectRte->subquery = WrapSubquery(selectRte->subquery);
|
selectRte->subquery = WrapSubquery(selectRte->subquery);
|
||||||
}
|
|
||||||
|
|
||||||
/* this is required for correct deparsing of the query */
|
/* 1. reorder INSERT & SELECT tlist + add casts for COPY optimisation */
|
||||||
ReorderInsertSelectTargetLists(insertSelectQuery, insertRte, selectRte);
|
ReorderInsertSelectTargetLists(insertSelectQuery, insertRte, selectRte);
|
||||||
|
|
||||||
/*
|
|
||||||
* Cast types of insert target list and select projection list to
|
|
||||||
* match the column types of the target relation.
|
|
||||||
*/
|
|
||||||
selectRte->subquery->targetList =
|
selectRte->subquery->targetList =
|
||||||
AddInsertSelectCasts(insertSelectQuery->targetList,
|
AddInsertSelectCasts(insertSelectQuery->targetList,
|
||||||
copyObject(selectRte->subquery->targetList),
|
copyObject(selectRte->subquery->targetList),
|
||||||
targetRelationId);
|
targetRel);
|
||||||
|
|
||||||
if (list_length(insertSelectQuery->cteList) > 0)
|
/* --- from here on we only work if INSERT defines CTEs ---------------- */
|
||||||
{
|
if (insertSelectQuery->cteList == NIL)
|
||||||
|
return;
|
||||||
|
|
||||||
|
/* we mutate the list, keep an untouched copy for later */
|
||||||
List *topCopy = copyObject(insertSelectQuery->cteList);
|
List *topCopy = copyObject(insertSelectQuery->cteList);
|
||||||
ListCell *lc;
|
ListCell *lc;
|
||||||
|
|
||||||
elog(DEBUG1, "Unifying top‑level CTEs into subquery");
|
elog(DEBUG1, "Unifying top-level CTEs into subquery");
|
||||||
|
|
||||||
/* append only the *new* ones */
|
/* 2. merge INSERT-level CTEs into the SELECT-subquery (skip duplicates) */
|
||||||
foreach(lc, topCopy)
|
foreach (lc, topCopy)
|
||||||
{
|
{
|
||||||
CommonTableExpr *cte = (CommonTableExpr *) lfirst(lc);
|
CommonTableExpr *cte = (CommonTableExpr *) lfirst(lc);
|
||||||
if (!CteNameExists(selectRte->subquery->cteList, cte->ctename))
|
if (!CteNameExists(selectRte->subquery->cteList, cte->ctename))
|
||||||
|
@ -571,15 +580,105 @@ PrepareInsertSelectForCitusPlanner(Query *insertSelectQuery)
|
||||||
lappend(selectRte->subquery->cteList, cte);
|
lappend(selectRte->subquery->cteList, cte);
|
||||||
}
|
}
|
||||||
|
|
||||||
insertSelectQuery->cteList = NIL;
|
/* 3. build list of names that now exist twice */
|
||||||
|
List *dupNames = FindDuplicateCteNames(topCopy, selectRte->subquery->cteList);
|
||||||
|
|
||||||
/* Suppose we physically appended the top-level cteList into the subquery,
|
/* 4. one big walker that shifts ctelevelsup and notes inner refs ----- */
|
||||||
so references are at ctelevelsup=1, 2, etc. We want them all to shift by -1. */
|
ShiftCteContext ctx = {0};
|
||||||
|
ctx.levelsup = 0;
|
||||||
|
ctx.offset = -1; /* shift down by one */
|
||||||
|
ctx.dupCteNameList = dupNames;
|
||||||
|
ctx.seenDupRefs = NIL;
|
||||||
|
|
||||||
DecrementInsertLevelReferences(selectRte->subquery, -1, topCopy /* for ctename check */);
|
/* 4.a walk the SELECT sub-query */
|
||||||
|
query_tree_walker(selectRte->subquery,
|
||||||
|
inline_cte_walker,
|
||||||
|
(void *) &ctx,
|
||||||
|
QTW_EXAMINE_RTES_BEFORE);
|
||||||
|
|
||||||
elog(DEBUG1, "Done shifting ctelevelsup X->X-1 for subquery references");
|
/* 4.b ALSO walk every INSERT-level CTE body - they may reference a
|
||||||
|
* sibling CTE that became duplicate. */
|
||||||
|
foreach (lc, topCopy)
|
||||||
|
{
|
||||||
|
CommonTableExpr *cte = (CommonTableExpr *) lfirst(lc);
|
||||||
|
|
||||||
|
if (IsA(cte->ctequery, Query))
|
||||||
|
query_tree_walker((Query *) cte->ctequery,
|
||||||
|
inline_cte_walker,
|
||||||
|
(void *) &ctx,
|
||||||
|
QTW_EXAMINE_RTES_BEFORE);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* 5. rename duplicate definitions and the inner references we stored */
|
||||||
|
if (dupNames != NIL)
|
||||||
|
RenameDuplicateCtes(selectRte->subquery, dupNames, ctx.seenDupRefs);
|
||||||
|
|
||||||
|
/* 6. rename references that live inside the ORIGINAL CTE bodies */
|
||||||
|
foreach (lc, topCopy)
|
||||||
|
{
|
||||||
|
CommonTableExpr *cte = (CommonTableExpr *) lfirst(lc);
|
||||||
|
rename_cte_ref_walker((Node *) cte->ctequery, dupNames);
|
||||||
|
}
|
||||||
|
|
||||||
|
rename_cte_ref_walker((Node *) selectRte->subquery, dupNames);
|
||||||
|
|
||||||
|
/* 7. create renamed copies of the original CTEs for the outer query */
|
||||||
|
CopyAndRenameOuterCtes(insertSelectQuery, topCopy, dupNames);
|
||||||
|
|
||||||
|
/* 8. finally fix any remaining references on *all* query levels */
|
||||||
|
rename_cte_ref_walker((Node *) insertSelectQuery, dupNames);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* inline_cte_walker
|
||||||
|
*
|
||||||
|
* If node is a Query, we increase context->levelsup by 1,
|
||||||
|
* recursively walk the Query, then restore it.
|
||||||
|
*
|
||||||
|
* If node is a RangeTblEntry with RTE_CTE and ctelevelsup == (levelsup + 1),
|
||||||
|
* we do ctelevelsup += offset (e.g. -1 => so k+1 → k).
|
||||||
|
*
|
||||||
|
* We do expression_tree_walker for fallback on expressions.
|
||||||
|
*/
|
||||||
|
static bool
|
||||||
|
inline_cte_walker(Node *node, ShiftCteContext *ctx)
|
||||||
|
{
|
||||||
|
if (node == NULL)
|
||||||
|
return false;
|
||||||
|
|
||||||
|
if (IsA(node, Query))
|
||||||
|
{
|
||||||
|
ctx->levelsup++;
|
||||||
|
query_tree_walker((Query *) node,
|
||||||
|
inline_cte_walker,
|
||||||
|
ctx,
|
||||||
|
QTW_EXAMINE_RTES_BEFORE);
|
||||||
|
ctx->levelsup--;
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
else if (IsA(node, RangeTblEntry))
|
||||||
|
{
|
||||||
|
RangeTblEntry *rte = (RangeTblEntry *) node;
|
||||||
|
|
||||||
|
if (rte->rtekind == RTE_CTE && /* it is a CTE ref */
|
||||||
|
rte->ctelevelsup > 0) /* points above us */
|
||||||
|
{
|
||||||
|
/* 1. shift one level down */
|
||||||
|
rte->ctelevelsup += ctx->offset; /* offset == –1 */
|
||||||
|
|
||||||
|
/* 2. if it is one of the duplicate names, remember it */
|
||||||
|
if (NameInStringList(ctx->dupCteNameList, rte->ctename) &&
|
||||||
|
!list_member_ptr(ctx->seenDupRefs, rte))
|
||||||
|
ctx->seenDupRefs = lappend(ctx->seenDupRefs, rte);
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
/* other expression nodes */
|
||||||
|
return expression_tree_walker(node,
|
||||||
|
inline_cte_walker,
|
||||||
|
ctx);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -597,102 +696,143 @@ CteNameExists(List *cteList, const char *ctename)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
static void
|
||||||
|
RenameDuplicateCtes(Query *subq,
|
||||||
|
List *dupNameList, /* names we must rename */
|
||||||
|
List *seenRefs) /* RTE pointers captured above */
|
||||||
|
{
|
||||||
|
ListCell *lc;
|
||||||
|
/* (A) rename definitions that sit now in subq->cteList */
|
||||||
|
foreach(lc, subq->cteList)
|
||||||
|
{
|
||||||
|
CommonTableExpr *cte = lfirst(lc);
|
||||||
|
if (NameInStringList(dupNameList, cte->ctename))
|
||||||
|
{
|
||||||
|
char *newname = psprintf("_insert_%s", cte->ctename);
|
||||||
|
cte->ctename = pstrdup(newname);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
/* (B) rename every reference we stored while walking */
|
||||||
|
foreach(lc, seenRefs)
|
||||||
|
{
|
||||||
|
RangeTblEntry *rte = lfirst(lc);
|
||||||
|
char *newname = psprintf("_insert_%s", rte->ctename);
|
||||||
|
rte->ctename = pstrdup(newname);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
/* collect names of CTEs that exist both top-level and inside the sub-query */
|
||||||
|
static List *
|
||||||
|
FindDuplicateCteNames(List *topLevel, List *subquery)
|
||||||
|
{
|
||||||
|
List *dups = NIL;
|
||||||
|
ListCell *lc;
|
||||||
|
|
||||||
|
foreach(lc, topLevel)
|
||||||
|
{
|
||||||
|
CommonTableExpr *cte = lfirst(lc);
|
||||||
|
if (CteNameExists(subquery, cte->ctename))
|
||||||
|
dups = lappend(dups, makeString(pstrdup(cte->ctename)));
|
||||||
|
}
|
||||||
|
return dups;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
static bool
|
||||||
|
NameInStringList(List *nameList, const char *name)
|
||||||
|
{
|
||||||
|
ListCell *lc;
|
||||||
|
foreach(lc, nameList)
|
||||||
|
if (strcmp(strVal(lfirst(lc)), name) == 0)
|
||||||
|
return true;
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* inline_cte_walker
|
* For each name in dupNameList make a copy of the ORIGINAL top-level CTE,
|
||||||
|
* rename it to "_insert_<name>" and append it to outerQuery->cteList.
|
||||||
|
*/
|
||||||
|
static void
|
||||||
|
CopyAndRenameOuterCtes(Query *outerQuery,
|
||||||
|
List *topLevelCopy, /* copy of original outer CTEs */
|
||||||
|
List *dupNameList) /* names that were duplicated */
|
||||||
|
{
|
||||||
|
ListCell *lc;
|
||||||
|
|
||||||
|
foreach (lc, topLevelCopy)
|
||||||
|
{
|
||||||
|
CommonTableExpr *orig = (CommonTableExpr *) lfirst(lc);
|
||||||
|
|
||||||
|
if (NameInStringList(dupNameList, orig->ctename))
|
||||||
|
{
|
||||||
|
CommonTableExpr *cpy = copyObject(orig);
|
||||||
|
char *newn = psprintf("_insert_%s", orig->ctename);
|
||||||
|
|
||||||
|
cpy->ctename = pstrdup(newn); /* rename definition */
|
||||||
|
outerQuery->cteList = lappend(outerQuery->cteList, cpy);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* rename_cte_ref_walker
|
||||||
|
* Recursively rename every RTE_CTE whose *current* name is in
|
||||||
|
* dupNameList by prefixing it with "_insert_".
|
||||||
*
|
*
|
||||||
* If node is a Query, we increase context->levelsup by 1,
|
* - We do NOT touch cte->ctename in sub-queries here – that was
|
||||||
* recursively walk the Query, then restore it.
|
* already done by RenameDuplicateCtes / CopyAndRenameOuterCtes.
|
||||||
*
|
* - We visit every Query node, so this reaches into CTE bodies,
|
||||||
* If node is a RangeTblEntry with RTE_CTE and ctelevelsup == (levelsup + 1),
|
* sub-queries, InitPlans, etc.
|
||||||
* we do ctelevelsup += offset (e.g. -1 => so k+1 → k).
|
|
||||||
*
|
|
||||||
* We do expression_tree_walker for fallback on expressions.
|
|
||||||
*/
|
*/
|
||||||
static bool
|
static bool
|
||||||
inline_cte_walker(Node *node, ShiftReferencesWalkerContext *context)
|
rename_cte_ref_walker(Node *node, void *dupNameList)
|
||||||
{
|
{
|
||||||
if (node == NULL)
|
if (node == NULL)
|
||||||
return false;
|
return false;
|
||||||
|
|
||||||
if (IsA(node, Query))
|
if (IsA(node, RangeTblEntry))
|
||||||
{
|
|
||||||
Query *query = (Query *) node;
|
|
||||||
|
|
||||||
/* descend one query level deeper */
|
|
||||||
context->levelsup++;
|
|
||||||
|
|
||||||
/*
|
|
||||||
* Use QTW_EXAMINE_RTES_AFTER or QTW_EXAMINE_RTES_BEFORE – your snippet
|
|
||||||
* used AFTER, so let's keep that. This means we'll handle the rtable
|
|
||||||
* after the rest, which is okay.
|
|
||||||
*
|
|
||||||
* Or we can do QTW_EXAMINE_RTES_BEFORE so we see RangeTblEntry first.
|
|
||||||
* The key is being consistent with your scenario.
|
|
||||||
*/
|
|
||||||
query_tree_walker(query,
|
|
||||||
inline_cte_walker,
|
|
||||||
context,
|
|
||||||
QTW_EXAMINE_RTES_BEFORE);
|
|
||||||
|
|
||||||
context->levelsup--;
|
|
||||||
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
else if (IsA(node, RangeTblEntry))
|
|
||||||
{
|
{
|
||||||
RangeTblEntry *rte = (RangeTblEntry *) node;
|
RangeTblEntry *rte = (RangeTblEntry *) node;
|
||||||
|
|
||||||
if (rte->rtekind == RTE_CTE)
|
if (rte->rtekind == RTE_CTE &&
|
||||||
|
NameInStringList((List *) dupNameList, rte->ctename))
|
||||||
{
|
{
|
||||||
/*
|
char *newn = psprintf("_insert_%s", rte->ctename);
|
||||||
* If RTE_CTE's ctelevelsup == (current nesting level + 1),
|
rte->ctename = newn;
|
||||||
* we do ctelevelsup += offset.
|
|
||||||
* e.g. if offset=-1, and ctelevelsup= (levelsup + 1),
|
|
||||||
* that effectively does "k+1 → k".
|
|
||||||
*/
|
|
||||||
if (rte->ctelevelsup == (context->levelsup + 1))
|
|
||||||
{
|
|
||||||
if (context->topLevelCteList == NULL ||
|
|
||||||
CteNameExists(context->topLevelCteList, rte->ctename))
|
|
||||||
{
|
|
||||||
int old = rte->ctelevelsup;
|
|
||||||
rte->ctelevelsup += context->offset; /* usually ‑1 */
|
|
||||||
elog(DEBUG2, "Shifted ctelevelsup for %s from %d to %d",
|
|
||||||
rte->ctename, old, rte->ctelevelsup);
|
|
||||||
}
|
}
|
||||||
else
|
/* no further descent needed inside a bare RTE */
|
||||||
{
|
|
||||||
elog(DEBUG2, "ctename=%s not found in top-level list, skipping shift",
|
|
||||||
rte->ctename);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
else if (IsA(node, Query))
|
||||||
|
{
|
||||||
|
Query *q = (Query *) node;
|
||||||
|
|
||||||
/* fallback for expressions, e.g. scanning function calls, sublinks, etc. */
|
/* first recurse into sub-queries you find in the RTE list */
|
||||||
return expression_tree_walker(node, inline_cte_walker, (void *) context);
|
range_table_walker(q->rtable,
|
||||||
|
rename_cte_ref_walker,
|
||||||
|
dupNameList,
|
||||||
|
QTW_EXAMINE_RTES_BEFORE);
|
||||||
|
|
||||||
|
/* then recurse into every other field of the Query */
|
||||||
|
return query_tree_walker(q,
|
||||||
|
rename_cte_ref_walker,
|
||||||
|
dupNameList,
|
||||||
|
QTW_IGNORE_RANGE_TABLE);
|
||||||
|
}
|
||||||
|
|
||||||
|
/* expression nodes, target lists, … */
|
||||||
|
return expression_tree_walker(node,
|
||||||
|
rename_cte_ref_walker,
|
||||||
|
dupNameList);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
static void
|
|
||||||
DecrementInsertLevelReferences(Query *subquery,
|
|
||||||
int offset, /* typically -1 */
|
|
||||||
List *topLevelCteList)
|
|
||||||
{
|
|
||||||
ShiftReferencesWalkerContext ctx;
|
|
||||||
ctx.levelsup = 0; /* so that top-level query => 0, subquery => 1, etc. */
|
|
||||||
ctx.offset = offset;
|
|
||||||
ctx.topLevelCteList = topLevelCteList;
|
|
||||||
|
|
||||||
query_tree_walker(subquery,
|
|
||||||
inline_cte_walker,
|
|
||||||
(void *) &ctx,
|
|
||||||
QTW_EXAMINE_RTES_AFTER /* or BEFORE, but snippet used AFTER */);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue