mirror of https://github.com/citusdata/citus.git
Make multi_master_planner.c coding convention compliant
Changed order of function definitions and added declarations in the beginning of the filepull/1701/head
parent
f7ab901766
commit
4832abc7cb
|
@ -30,6 +30,37 @@
|
|||
#include "utils/syscache.h"
|
||||
|
||||
|
||||
static List * MasterTargetList(List *workerTargetList);
|
||||
static PlannedStmt * BuildSelectStatement(Query *masterQuery, List *masterTargetList,
|
||||
CustomScan *remoteScan);
|
||||
static Agg * BuildAggregatePlan(Query *masterQuery, Plan *subPlan);
|
||||
static Plan * BuildDistinctPlan(Query *masterQuery, Plan *subPlan);
|
||||
|
||||
|
||||
/*
|
||||
* MasterNodeSelectPlan takes in a distributed plan and a custom scan node which
|
||||
* wraps remote part of the plan. This function finds the master node query
|
||||
* structure in the multi plan, and builds the final select plan to execute on
|
||||
* the tuples returned by remote scan on the master node. Note that this select
|
||||
* plan is executed after result files are retrieved from worker nodes and
|
||||
* filled into the tuple store inside provided custom scan.
|
||||
*/
|
||||
PlannedStmt *
|
||||
MasterNodeSelectPlan(MultiPlan *multiPlan, CustomScan *remoteScan)
|
||||
{
|
||||
Query *masterQuery = multiPlan->masterQuery;
|
||||
PlannedStmt *masterSelectPlan = NULL;
|
||||
|
||||
Job *workerJob = multiPlan->workerJob;
|
||||
List *workerTargetList = workerJob->jobQuery->targetList;
|
||||
List *masterTargetList = MasterTargetList(workerTargetList);
|
||||
|
||||
masterSelectPlan = BuildSelectStatement(masterQuery, masterTargetList, remoteScan);
|
||||
|
||||
return masterSelectPlan;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* MasterTargetList uses the given worker target list's expressions, and creates
|
||||
* a target target list for the master node. This master target list keeps the
|
||||
|
@ -68,6 +99,139 @@ MasterTargetList(List *workerTargetList)
|
|||
}
|
||||
|
||||
|
||||
/*
|
||||
* BuildSelectStatement builds the final select statement to run on the master
|
||||
* node, before returning results to the user. The function first gets the custom
|
||||
* scan node for all results fetched to the master, and layers aggregation, sort
|
||||
* and limit plans on top of the scan statement if necessary.
|
||||
*/
|
||||
static PlannedStmt *
|
||||
BuildSelectStatement(Query *masterQuery, List *masterTargetList, CustomScan *remoteScan)
|
||||
{
|
||||
PlannedStmt *selectStatement = NULL;
|
||||
RangeTblEntry *customScanRangeTableEntry = NULL;
|
||||
Agg *aggregationPlan = NULL;
|
||||
Plan *topLevelPlan = NULL;
|
||||
ListCell *targetEntryCell = NULL;
|
||||
List *columnNameList = NULL;
|
||||
List *sortClauseList = copyObject(masterQuery->sortClause);
|
||||
|
||||
/* (1) make PlannedStmt and set basic information */
|
||||
selectStatement = makeNode(PlannedStmt);
|
||||
selectStatement->canSetTag = true;
|
||||
selectStatement->relationOids = NIL;
|
||||
selectStatement->commandType = CMD_SELECT;
|
||||
|
||||
/* top level select query should have only one range table entry */
|
||||
Assert(list_length(masterQuery->rtable) == 1);
|
||||
|
||||
/* compute column names for the custom range table entry */
|
||||
foreach(targetEntryCell, masterTargetList)
|
||||
{
|
||||
TargetEntry *targetEntry = lfirst(targetEntryCell);
|
||||
columnNameList = lappend(columnNameList, makeString(targetEntry->resname));
|
||||
}
|
||||
|
||||
customScanRangeTableEntry = RemoteScanRangeTableEntry(columnNameList);
|
||||
|
||||
/* set the single element range table list */
|
||||
selectStatement->rtable = list_make1(customScanRangeTableEntry);
|
||||
|
||||
/* (2) add an aggregation plan if needed */
|
||||
if (masterQuery->hasAggs || masterQuery->groupClause)
|
||||
{
|
||||
remoteScan->scan.plan.targetlist = masterTargetList;
|
||||
|
||||
aggregationPlan = BuildAggregatePlan(masterQuery, &remoteScan->scan.plan);
|
||||
topLevelPlan = (Plan *) aggregationPlan;
|
||||
}
|
||||
else
|
||||
{
|
||||
/* otherwise set the final projections on the scan plan directly */
|
||||
remoteScan->scan.plan.targetlist = masterQuery->targetList;
|
||||
topLevelPlan = &remoteScan->scan.plan;
|
||||
}
|
||||
|
||||
/*
|
||||
* (3) create distinct plan if needed.
|
||||
*
|
||||
* distinct on() requires sort + unique plans. Unique itself is not enough
|
||||
* as it only compares the current value with previous one when checking
|
||||
* uniqueness, thus ordering is necessary. If already has order by
|
||||
* clause we append distinct clauses to the end of it. Postgresql requires
|
||||
* that if both distinct on() and order by exists, ordering shall start
|
||||
* on distinct clauses. Therefore we can safely append distinct clauses to
|
||||
* the end of order by clauses. Although the same column may appear more
|
||||
* than once in order by clauses, created plan uses only one instance, for
|
||||
* example order by a,b,a,a,b,c is translated to equivalent order by a,b,c.
|
||||
*
|
||||
* If the query has distinct clause but not distinct on, we first create
|
||||
* distinct plan that is either HashAggreate or Sort + Unique plans depending
|
||||
* on hashable property of columns in distinct clause. If there is order by
|
||||
* clause, it is handled after distinct planning.
|
||||
*/
|
||||
if (masterQuery->hasDistinctOn)
|
||||
{
|
||||
ListCell *distinctCell = NULL;
|
||||
foreach(distinctCell, masterQuery->distinctClause)
|
||||
{
|
||||
SortGroupClause *singleDistinctClause = lfirst(distinctCell);
|
||||
Index sortGroupRef = singleDistinctClause->tleSortGroupRef;
|
||||
|
||||
if (get_sortgroupref_clause_noerr(sortGroupRef, sortClauseList) == NULL)
|
||||
{
|
||||
sortClauseList = lappend(sortClauseList, singleDistinctClause);
|
||||
}
|
||||
}
|
||||
}
|
||||
else if (masterQuery->distinctClause)
|
||||
{
|
||||
Plan *distinctPlan = BuildDistinctPlan(masterQuery, topLevelPlan);
|
||||
topLevelPlan = distinctPlan;
|
||||
}
|
||||
|
||||
/* (4) add a sorting plan if needed */
|
||||
if (sortClauseList)
|
||||
{
|
||||
Sort *sortPlan = make_sort_from_sortclauses(sortClauseList, topLevelPlan);
|
||||
|
||||
/* just for reproducible costs between different PostgreSQL versions */
|
||||
sortPlan->plan.startup_cost = 0;
|
||||
sortPlan->plan.total_cost = 0;
|
||||
sortPlan->plan.plan_rows = 0;
|
||||
|
||||
topLevelPlan = (Plan *) sortPlan;
|
||||
}
|
||||
|
||||
/*
|
||||
* (5) add a unique plan for distinctOn.
|
||||
* If the query has distinct on we add a sort clause in step 3. Therefore
|
||||
* Step 4 always creates a sort plan.
|
||||
* */
|
||||
if (masterQuery->hasDistinctOn)
|
||||
{
|
||||
Assert(IsA(topLevelPlan, Sort));
|
||||
topLevelPlan =
|
||||
(Plan *) make_unique_from_sortclauses(topLevelPlan,
|
||||
masterQuery->distinctClause);
|
||||
}
|
||||
|
||||
/* (5) add a limit plan if needed */
|
||||
if (masterQuery->limitCount || masterQuery->limitOffset)
|
||||
{
|
||||
Node *limitCount = masterQuery->limitCount;
|
||||
Node *limitOffset = masterQuery->limitOffset;
|
||||
Limit *limitPlan = make_limit(topLevelPlan, limitOffset, limitCount);
|
||||
topLevelPlan = (Plan *) limitPlan;
|
||||
}
|
||||
|
||||
/* (6) finally set our top level plan in the plan tree */
|
||||
selectStatement->planTree = topLevelPlan;
|
||||
|
||||
return selectStatement;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* BuildAggregatePlan creates and returns an aggregate plan. This aggregate plan
|
||||
* builds aggreation and grouping operators (if any) that are to be executed on
|
||||
|
@ -216,160 +380,3 @@ BuildDistinctPlan(Query *masterQuery, Plan *subPlan)
|
|||
|
||||
return distinctPlan;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* BuildSelectStatement builds the final select statement to run on the master
|
||||
* node, before returning results to the user. The function first gets the custom
|
||||
* scan node for all results fetched to the master, and layers aggregation, sort
|
||||
* and limit plans on top of the scan statement if necessary.
|
||||
*/
|
||||
static PlannedStmt *
|
||||
BuildSelectStatement(Query *masterQuery, List *masterTargetList, CustomScan *remoteScan)
|
||||
{
|
||||
PlannedStmt *selectStatement = NULL;
|
||||
RangeTblEntry *customScanRangeTableEntry = NULL;
|
||||
Agg *aggregationPlan = NULL;
|
||||
Plan *topLevelPlan = NULL;
|
||||
ListCell *targetEntryCell = NULL;
|
||||
List *columnNameList = NULL;
|
||||
List *sortClauseList = copyObject(masterQuery->sortClause);
|
||||
|
||||
/* (1) make PlannedStmt and set basic information */
|
||||
selectStatement = makeNode(PlannedStmt);
|
||||
selectStatement->canSetTag = true;
|
||||
selectStatement->relationOids = NIL;
|
||||
selectStatement->commandType = CMD_SELECT;
|
||||
|
||||
/* top level select query should have only one range table entry */
|
||||
Assert(list_length(masterQuery->rtable) == 1);
|
||||
|
||||
/* compute column names for the custom range table entry */
|
||||
foreach(targetEntryCell, masterTargetList)
|
||||
{
|
||||
TargetEntry *targetEntry = lfirst(targetEntryCell);
|
||||
columnNameList = lappend(columnNameList, makeString(targetEntry->resname));
|
||||
}
|
||||
|
||||
customScanRangeTableEntry = RemoteScanRangeTableEntry(columnNameList);
|
||||
|
||||
/* set the single element range table list */
|
||||
selectStatement->rtable = list_make1(customScanRangeTableEntry);
|
||||
|
||||
/* (2) add an aggregation plan if needed */
|
||||
if (masterQuery->hasAggs || masterQuery->groupClause)
|
||||
{
|
||||
remoteScan->scan.plan.targetlist = masterTargetList;
|
||||
|
||||
aggregationPlan = BuildAggregatePlan(masterQuery, &remoteScan->scan.plan);
|
||||
topLevelPlan = (Plan *) aggregationPlan;
|
||||
}
|
||||
else
|
||||
{
|
||||
/* otherwise set the final projections on the scan plan directly */
|
||||
remoteScan->scan.plan.targetlist = masterQuery->targetList;
|
||||
topLevelPlan = &remoteScan->scan.plan;
|
||||
}
|
||||
|
||||
/*
|
||||
* (3) create distinct plan if needed.
|
||||
*
|
||||
* distinct on() requires sort + unique plans. Unique itself is not enough
|
||||
* as it only compares the current value with previous one when checking
|
||||
* uniqueness, thus ordering is necessary. If already has order by
|
||||
* clause we append distinct clauses to the end of it. Postgresql requires
|
||||
* that if both distinct on() and order by exists, ordering shall start
|
||||
* on distinct clauses. Therefore we can safely append distinct clauses to
|
||||
* the end of order by clauses. Although the same column may appear more
|
||||
* than once in order by clauses, created plan uses only one instance, for
|
||||
* example order by a,b,a,a,b,c is translated to equivalent order by a,b,c.
|
||||
*
|
||||
* If the query has distinct clause but not distinct on, we first create
|
||||
* distinct plan that is either HashAggreate or Sort + Unique plans depending
|
||||
* on hashable property of columns in distinct clause. If there is order by
|
||||
* clause, it is handled after distinct planning.
|
||||
*/
|
||||
if (masterQuery->hasDistinctOn)
|
||||
{
|
||||
ListCell *distinctCell = NULL;
|
||||
foreach(distinctCell, masterQuery->distinctClause)
|
||||
{
|
||||
SortGroupClause *singleDistinctClause = lfirst(distinctCell);
|
||||
Index sortGroupRef = singleDistinctClause->tleSortGroupRef;
|
||||
|
||||
if (get_sortgroupref_clause_noerr(sortGroupRef, sortClauseList) == NULL)
|
||||
{
|
||||
sortClauseList = lappend(sortClauseList, singleDistinctClause);
|
||||
}
|
||||
}
|
||||
}
|
||||
else if (masterQuery->distinctClause)
|
||||
{
|
||||
Plan *distinctPlan = BuildDistinctPlan(masterQuery, topLevelPlan);
|
||||
topLevelPlan = distinctPlan;
|
||||
}
|
||||
|
||||
/* (4) add a sorting plan if needed */
|
||||
if (sortClauseList)
|
||||
{
|
||||
Sort *sortPlan = make_sort_from_sortclauses(sortClauseList, topLevelPlan);
|
||||
|
||||
/* just for reproducible costs between different PostgreSQL versions */
|
||||
sortPlan->plan.startup_cost = 0;
|
||||
sortPlan->plan.total_cost = 0;
|
||||
sortPlan->plan.plan_rows = 0;
|
||||
|
||||
topLevelPlan = (Plan *) sortPlan;
|
||||
}
|
||||
|
||||
/*
|
||||
* (5) add a unique plan for distinctOn.
|
||||
* If the query has distinct on we add a sort clause in step 3. Therefore
|
||||
* Step 4 always creates a sort plan.
|
||||
* */
|
||||
if (masterQuery->hasDistinctOn)
|
||||
{
|
||||
Assert(IsA(topLevelPlan, Sort));
|
||||
topLevelPlan =
|
||||
(Plan *) make_unique_from_sortclauses(topLevelPlan,
|
||||
masterQuery->distinctClause);
|
||||
}
|
||||
|
||||
/* (5) add a limit plan if needed */
|
||||
if (masterQuery->limitCount || masterQuery->limitOffset)
|
||||
{
|
||||
Node *limitCount = masterQuery->limitCount;
|
||||
Node *limitOffset = masterQuery->limitOffset;
|
||||
Limit *limitPlan = make_limit(topLevelPlan, limitOffset, limitCount);
|
||||
topLevelPlan = (Plan *) limitPlan;
|
||||
}
|
||||
|
||||
/* (6) finally set our top level plan in the plan tree */
|
||||
selectStatement->planTree = topLevelPlan;
|
||||
|
||||
return selectStatement;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* MasterNodeSelectPlan takes in a distributed plan and a custom scan node which
|
||||
* wraps remote part of the plan. This function finds the master node query
|
||||
* structure in the multi plan, and builds the final select plan to execute on
|
||||
* the tuples returned by remote scan on the master node. Note that this select
|
||||
* plan is executed after result files are retrieved from worker nodes and
|
||||
* filled into the tuple store inside provided custom scan.
|
||||
*/
|
||||
PlannedStmt *
|
||||
MasterNodeSelectPlan(MultiPlan *multiPlan, CustomScan *remoteScan)
|
||||
{
|
||||
Query *masterQuery = multiPlan->masterQuery;
|
||||
PlannedStmt *masterSelectPlan = NULL;
|
||||
|
||||
Job *workerJob = multiPlan->workerJob;
|
||||
List *workerTargetList = workerJob->jobQuery->targetList;
|
||||
List *masterTargetList = MasterTargetList(workerTargetList);
|
||||
|
||||
masterSelectPlan = BuildSelectStatement(masterQuery, masterTargetList, remoteScan);
|
||||
|
||||
return masterSelectPlan;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue