Clean up insert_select_planner.c

onder_ins_select
Marco Slot 2022-07-22 15:57:37 +02:00
parent 2892e07e32
commit 150205223c
1 changed files with 80 additions and 65 deletions

View File

@ -50,6 +50,7 @@
#include "utils/rel.h"
static void PrepareInsertSelectForCitusPlanner(Query *insertSelectQuery);
static DistributedPlan * CreateInsertSelectPlanInternal(uint64 planId,
Query *originalQuery,
PlannerRestrictionContext *
@ -375,42 +376,12 @@ CreateInsertSelectIntoLocalTablePlan(uint64 planId, Query *insertSelectQuery,
ParamListInfo boundParams, bool hasUnresolvedParams,
PlannerRestrictionContext *plannerRestrictionContext)
{
RangeTblEntry *insertRte = ExtractResultRelationRTEOrError(insertSelectQuery);
Oid targetRelationId = insertRte->relid;
RangeTblEntry *selectRte = ExtractSelectRangeTableEntry(insertSelectQuery);
PrepareInsertSelectForCitusPlanner(insertSelectQuery);
/* get the SELECT query (may have changed after PrepareInsertSelectForCitusPlanner) */
Query *selectQuery = selectRte->subquery;
bool hasSetOperations = false;
if (selectQuery->setOperations != NULL)
{
selectQuery = selectRte->subquery = WrapSubquery(selectQuery);
hasSetOperations = true;
}
/* this is required for correct deparsing of the query */
ReorderInsertSelectTargetLists(insertSelectQuery, insertRte, selectRte);
/*
* Cast types of insert target list and select projection list to
* match the column types of the target relation.
*/
selectQuery->targetList =
AddInsertSelectCasts(insertSelectQuery->targetList,
copyObject(selectQuery->targetList),
targetRelationId);
if (list_length(insertSelectQuery->cteList) > 0)
{
if (!hasSetOperations)
{
selectQuery = selectRte->subquery = WrapSubquery(selectQuery);
}
/* copy CTEs from the INSERT ... SELECT statement into outer SELECT */
selectQuery->cteList = copyObject(insertSelectQuery->cteList);
selectQuery->hasModifyingCTE = insertSelectQuery->hasModifyingCTE;
insertSelectQuery->cteList = NIL;
}
DistributedPlan *distPlan = CreateDistributedPlan(planId, selectQuery,
copyObject(selectQuery),
@ -457,6 +428,78 @@ CreateInsertSelectIntoLocalTablePlan(uint64 planId, Query *insertSelectQuery,
}
/*
* PrepareInsertSelectForCitusPlanner prepares an INSERT..SELECT query tree
* that was passed to the planner for use by Citus.
*
* First, it rebuilds the target lists of the INSERT and the SELECT
* to be in the same order, which is not guaranteed in the parse tree.
*
* Second, some of the constants in the target list will have type
* "unknown", which would confuse the Citus planner. To address that,
* we add casts to SELECT target list entries whose type does not correspond
* to the destination. This also helps us feed the output directly into
* a COPY stream for INSERT..SELECT via coordinator.
*
* In case of UNION or other set operations, the SELECT does not have a
* clearly defined target list, so we first wrap the UNION in a subquery.
* UNION queries do not have the "unknown" type problem.
*
* Finally, if the INSERT has CTEs, we move those CTEs into the SELECT,
* such that we can plan the SELECT as an independent query. To ensure
* the ctelevelsup for CTE RTE's remain the same, we wrap the SELECT into
* a subquery, unless we already did so in case of a UNION.
*/
static void
PrepareInsertSelectForCitusPlanner(Query *insertSelectQuery)
{
RangeTblEntry *insertRte = ExtractResultRelationRTEOrError(insertSelectQuery);
RangeTblEntry *selectRte = ExtractSelectRangeTableEntry(insertSelectQuery);
Oid targetRelationId = insertRte->relid;
bool isWrapped = false;
if (selectRte->subquery->setOperations != NULL)
{
/*
* Prepare UNION query for reordering and adding casts by
* wrapping it in a subquery to have a single target list.
*/
selectRte->subquery = WrapSubquery(selectRte->subquery);
isWrapped = true;
}
/* this is required for correct deparsing of the query */
ReorderInsertSelectTargetLists(insertSelectQuery, insertRte, selectRte);
/*
* Cast types of insert target list and select projection list to
* match the column types of the target relation.
*/
selectRte->subquery->targetList =
AddInsertSelectCasts(insertSelectQuery->targetList,
copyObject(selectRte->subquery->targetList),
targetRelationId);
if (list_length(insertSelectQuery->cteList) > 0)
{
if (!isWrapped)
{
/*
* By wrapping the SELECT in a subquery, we can avoid adjusting
* ctelevelsup in RTE's that point to the CTEs.
*/
selectRte->subquery = WrapSubquery(selectRte->subquery);
}
/* copy CTEs from the INSERT ... SELECT statement into outer SELECT */
selectRte->subquery->cteList = copyObject(insertSelectQuery->cteList);
selectRte->subquery->hasModifyingCTE = insertSelectQuery->hasModifyingCTE;
insertSelectQuery->cteList = NIL;
}
}
/*
* CreateCombineQueryForRouterPlan is used for creating a dummy combineQuery
* for a router plan, since router plans normally don't have one.
@ -1444,38 +1487,10 @@ CreateNonPushableInsertSelectPlan(uint64 planId, Query *parse, ParamListInfo bou
return distributedPlan;
}
PrepareInsertSelectForCitusPlanner(insertSelectQuery);
/* get the SELECT query (may have changed after PrepareInsertSelectForCitusPlanner) */
Query *selectQuery = selectRte->subquery;
bool hasSetOperations = false;
if (selectQuery->setOperations != NULL)
{
selectQuery = selectRte->subquery = WrapSubquery(selectQuery);
hasSetOperations = true;
}
ReorderInsertSelectTargetLists(insertSelectQuery, insertRte, selectRte);
/*
* Cast types of insert target list and select projection list to
* match the column types of the target relation.
*/
selectQuery->targetList =
AddInsertSelectCasts(insertSelectQuery->targetList,
selectQuery->targetList,
targetRelationId);
if (list_length(insertSelectQuery->cteList) > 0)
{
if (!hasSetOperations)
{
selectQuery = selectRte->subquery = WrapSubquery(selectQuery);
}
/* copy CTEs from the INSERT ... SELECT statement into outer SELECT */
selectQuery->cteList = copyObject(insertSelectQuery->cteList);
selectQuery->hasModifyingCTE = insertSelectQuery->hasModifyingCTE;
insertSelectQuery->cteList = NIL;
}
/*
* Later we might need to call WrapTaskListForProjection(), which requires