refactor distributed_planner.c

pull/3701/head
Onur Tirtir 2020-04-02 17:58:23 +03:00
parent 13a35c6813
commit abdabbedb2
1 changed files with 20 additions and 21 deletions

View File

@ -131,26 +131,18 @@ static bool UpdateReferenceTablesWithShard(Node *node, void *context);
static PlannedStmt * PlanFastPathDistributedStmt(DistributedPlanningContext *planContext, static PlannedStmt * PlanFastPathDistributedStmt(DistributedPlanningContext *planContext,
Node *distributionKeyValue); Node *distributionKeyValue);
static PlannedStmt * PlanDistributedStmt(DistributedPlanningContext *planContext, static PlannedStmt * PlanDistributedStmt(DistributedPlanningContext *planContext,
List *rangeTableList, int rteIdCounter); int rteIdCounter);
/* Distributed planner hook */ /* Distributed planner hook */
PlannedStmt * PlannedStmt *
distributed_planner(Query *parse, int cursorOptions, ParamListInfo boundParams) distributed_planner(Query *parse, int cursorOptions, ParamListInfo boundParams)
{ {
PlannedStmt *result = NULL;
bool needsDistributedPlanning = false; bool needsDistributedPlanning = false;
bool setPartitionedTablesInherited = false;
List *rangeTableList = ExtractRangeTableEntryList(parse);
int rteIdCounter = 1;
bool fastPathRouterQuery = false; bool fastPathRouterQuery = false;
Node *distributionKeyValue = NULL; Node *distributionKeyValue = NULL;
DistributedPlanningContext planContext = {
.query = parse,
.cursorOptions = cursorOptions,
.boundParams = boundParams,
};
List *rangeTableList = ExtractRangeTableEntryList(parse);
if (cursorOptions & CURSOR_OPT_FORCE_DISTRIBUTED) if (cursorOptions & CURSOR_OPT_FORCE_DISTRIBUTED)
{ {
@ -181,6 +173,14 @@ distributed_planner(Query *parse, int cursorOptions, ParamListInfo boundParams)
} }
} }
int rteIdCounter = 1;
DistributedPlanningContext planContext = {
.query = parse,
.cursorOptions = cursorOptions,
.boundParams = boundParams,
};
if (fastPathRouterQuery) if (fastPathRouterQuery)
{ {
/* /*
@ -217,7 +217,7 @@ distributed_planner(Query *parse, int cursorOptions, ParamListInfo boundParams)
rteIdCounter = AssignRTEIdentities(rangeTableList, rteIdCounter); rteIdCounter = AssignRTEIdentities(rangeTableList, rteIdCounter);
planContext.originalQuery = copyObject(parse); planContext.originalQuery = copyObject(parse);
setPartitionedTablesInherited = false; bool setPartitionedTablesInherited = false;
AdjustPartitioningForDistributedPlanning(rangeTableList, AdjustPartitioningForDistributedPlanning(rangeTableList,
setPartitionedTablesInherited); setPartitionedTablesInherited);
} }
@ -239,6 +239,7 @@ distributed_planner(Query *parse, int cursorOptions, ParamListInfo boundParams)
*/ */
PlannerLevel++; PlannerLevel++;
PlannedStmt *result = NULL;
PG_TRY(); PG_TRY();
{ {
@ -258,7 +259,7 @@ distributed_planner(Query *parse, int cursorOptions, ParamListInfo boundParams)
planContext.boundParams); planContext.boundParams);
if (needsDistributedPlanning) if (needsDistributedPlanning)
{ {
result = PlanDistributedStmt(&planContext, rangeTableList, rteIdCounter); result = PlanDistributedStmt(&planContext, rteIdCounter);
} }
else if ((result = TryToDelegateFunctionCall(&planContext)) == NULL) else if ((result = TryToDelegateFunctionCall(&planContext)) == NULL)
{ {
@ -309,11 +310,11 @@ distributed_planner(Query *parse, int cursorOptions, ParamListInfo boundParams)
List * List *
ExtractRangeTableEntryList(Query *query) ExtractRangeTableEntryList(Query *query)
{ {
List *rangeTblList = NIL; List *rteList = NIL;
ExtractRangeTableEntryWalker((Node *) query, &rangeTblList); ExtractRangeTableEntryWalker((Node *) query, &rteList);
return rangeTblList; return rteList;
} }
@ -328,21 +329,20 @@ ExtractRangeTableEntryList(Query *query)
bool bool
NeedsDistributedPlanning(Query *query) NeedsDistributedPlanning(Query *query)
{ {
List *allRTEs = NIL;
CmdType commandType = query->commandType;
if (!CitusHasBeenLoaded()) if (!CitusHasBeenLoaded())
{ {
return false; return false;
} }
CmdType commandType = query->commandType;
if (commandType != CMD_SELECT && commandType != CMD_INSERT && if (commandType != CMD_SELECT && commandType != CMD_INSERT &&
commandType != CMD_UPDATE && commandType != CMD_DELETE) commandType != CMD_UPDATE && commandType != CMD_DELETE)
{ {
return false; return false;
} }
ExtractRangeTableEntryWalker((Node *) query, &allRTEs); List *allRTEs = ExtractRangeTableEntryList(query);
return ListContainsDistributedTableRTE(allRTEs); return ListContainsDistributedTableRTE(allRTEs);
} }
@ -594,11 +594,10 @@ PlanFastPathDistributedStmt(DistributedPlanningContext *planContext,
*/ */
static PlannedStmt * static PlannedStmt *
PlanDistributedStmt(DistributedPlanningContext *planContext, PlanDistributedStmt(DistributedPlanningContext *planContext,
List *rangeTableList,
int rteIdCounter) int rteIdCounter)
{ {
/* may've inlined new relation rtes */ /* may've inlined new relation rtes */
rangeTableList = ExtractRangeTableEntryList(planContext->query); List *rangeTableList = ExtractRangeTableEntryList(planContext->query);
rteIdCounter = AssignRTEIdentities(rangeTableList, rteIdCounter); rteIdCounter = AssignRTEIdentities(rangeTableList, rteIdCounter);