Address feedback

pull/1288/head
Metin Doslu 2017-03-07 15:34:44 +02:00
parent 5ac239801f
commit 723494715a
5 changed files with 66 additions and 61 deletions

View File

@ -83,8 +83,8 @@ static CustomExecMethods RouterSelectCustomExecMethods = {
/* local function forward declarations */
static void PrepareMasterJobDirectory(Job *workerJob);
static void LoadTuplesIntoTupleStore(CitusScanState *scanState, Job *workerJob);
static Relation FauxRelation(TupleDesc tupleDescriptor);
static void LoadTuplesIntoTupleStore(CitusScanState *citusScanState, Job *workerJob);
static Relation StubRelation(TupleDesc tupleDescriptor);
/*
@ -277,34 +277,36 @@ PrepareMasterJobDirectory(Job *workerJob)
* filled the tuplestores, but that's a fair bit of work.
*/
static void
LoadTuplesIntoTupleStore(CitusScanState *scanState, Job *workerJob)
LoadTuplesIntoTupleStore(CitusScanState *citusScanState, Job *workerJob)
{
CustomScanState customScanState = scanState->customScanState;
CustomScanState customScanState = citusScanState->customScanState;
List *workerTaskList = workerJob->taskList;
EState *executorState = NULL;
MemoryContext executorTupleContext = NULL;
ExprContext *executorExpressionContext = NULL;
TupleDesc tupleDescriptor = NULL;
Relation fauxRelation = NULL;
Relation stubRelation = NULL;
ListCell *workerTaskCell = NULL;
uint32 columnCount = 0;
Datum *columnValues = NULL;
bool *columnNulls = NULL;
bool randomAccess = true;
bool interTransactions = false;
executorState = scanState->customScanState.ss.ps.state;
executorState = citusScanState->customScanState.ss.ps.state;
executorTupleContext = GetPerTupleMemoryContext(executorState);
executorExpressionContext = GetPerTupleExprContext(executorState);
tupleDescriptor = customScanState.ss.ps.ps_ResultTupleSlot->tts_tupleDescriptor;
fauxRelation = FauxRelation(tupleDescriptor);
stubRelation = StubRelation(tupleDescriptor);
columnCount = tupleDescriptor->natts;
columnValues = palloc0(columnCount * sizeof(Datum));
columnNulls = palloc0(columnCount * sizeof(bool));
Assert(scanState->tuplestorestate == NULL);
scanState->tuplestorestate = tuplestore_begin_heap(randomAccess, false, work_mem);
Assert(citusScanState->tuplestorestate == NULL);
citusScanState->tuplestorestate =
tuplestore_begin_heap(randomAccess, interTransactions, work_mem);
foreach(workerTaskCell, workerTaskList)
{
@ -323,7 +325,7 @@ LoadTuplesIntoTupleStore(CitusScanState *scanState, Job *workerJob)
copyOptions = lappend(copyOptions, copyOption);
}
copyState = BeginCopyFrom(fauxRelation, taskFilename->data, false, NULL,
copyState = BeginCopyFrom(stubRelation, taskFilename->data, false, NULL,
copyOptions);
while (true)
@ -342,7 +344,7 @@ LoadTuplesIntoTupleStore(CitusScanState *scanState, Job *workerJob)
break;
}
tuplestore_putvalues(scanState->tuplestorestate, tupleDescriptor,
tuplestore_putvalues(citusScanState->tuplestorestate, tupleDescriptor,
columnValues, columnNulls);
MemoryContextSwitchTo(oldContext);
}
@ -353,20 +355,20 @@ LoadTuplesIntoTupleStore(CitusScanState *scanState, Job *workerJob)
/*
* FauxRelation creates a faux Relation from the given tuple descriptor.
* StubRelation creates a stub Relation from the given tuple descriptor.
* To be able to use copy.c, we need a Relation descriptor. As there is no
* relation corresponding to the data loaded from workers, we need to fake one.
* We just need the bare minimal set of fields accessed by BeginCopyFrom().
*/
static Relation
FauxRelation(TupleDesc tupleDescriptor)
StubRelation(TupleDesc tupleDescriptor)
{
Relation fauxRelation = palloc0(sizeof(RelationData));
fauxRelation->rd_att = tupleDescriptor;
fauxRelation->rd_rel = palloc0(sizeof(FormData_pg_class));
fauxRelation->rd_rel->relkind = RELKIND_RELATION;
Relation stubRelation = palloc0(sizeof(RelationData));
stubRelation->rd_att = tupleDescriptor;
stubRelation->rd_rel = palloc0(sizeof(FormData_pg_class));
stubRelation->rd_rel->relkind = RELKIND_RELATION;
return fauxRelation;
return stubRelation;
}

View File

@ -496,10 +496,10 @@ RouterMultiModifyExecScan(CustomScanState *node)
if (!scanState->finishedRemoteScan)
{
MultiPlan *multiPlan = scanState->multiPlan;
bool isModificationQuery = IsModifyMultiPlan(multiPlan);
bool hasReturning = multiPlan->hasReturning;
Job *workerJob = multiPlan->workerJob;
List *taskList = workerJob->taskList;
bool hasReturning = multiPlan->hasReturning;
bool isModificationQuery = true;
ProcessMasterEvaluableFunctions(workerJob);
@ -515,7 +515,7 @@ RouterMultiModifyExecScan(CustomScanState *node)
/*
* RouterSelectExecScan executes a singler select task on the remote node,
* RouterSelectExecScan executes a single select task on the remote node,
* retrieves the results and stores them in custom scan's tuple store. Then, it
* returns tuples one by one from this tuple store.
*/
@ -1229,6 +1229,7 @@ StoreQueryResult(CitusScanState *scanState, MultiConnection *connection,
char **columnArray = (char **) palloc0(expectedColumnCount * sizeof(char *));
Tuplestorestate *tupleStore = NULL;
bool randomAccess = true;
bool interTransactions = false;
bool commandFailed = false;
MemoryContext ioContext = AllocSetContextCreate(CurrentMemoryContext,
"StoreQueryResult",
@ -1239,7 +1240,8 @@ StoreQueryResult(CitusScanState *scanState, MultiConnection *connection,
if (scanState->tuplestorestate == NULL)
{
scanState->tuplestorestate = tuplestore_begin_heap(randomAccess, false, work_mem);
scanState->tuplestorestate =
tuplestore_begin_heap(randomAccess, interTransactions, work_mem);
}
else if (!failOnError)
{

View File

@ -170,7 +170,7 @@ BuildAggregatePlan(Query *masterQuery, Plan *subPlan)
* and limit plans on top of the scan statement if necessary.
*/
static PlannedStmt *
BuildSelectStatement(Query *masterQuery, List *masterTargetList, CustomScan *dataScan)
BuildSelectStatement(Query *masterQuery, List *masterTargetList, CustomScan *remoteScan)
{
PlannedStmt *selectStatement = NULL;
RangeTblEntry *customScanRangeTableEntry = NULL;
@ -195,7 +195,7 @@ BuildSelectStatement(Query *masterQuery, List *masterTargetList, CustomScan *dat
columnNameList = lappend(columnNameList, makeString(targetEntry->resname));
}
customScanRangeTableEntry = CustomScanRangeTableEntry(columnNameList);
customScanRangeTableEntry = RemoteScanRangeTableEntry(columnNameList);
/* set the single element range table list */
selectStatement->rtable = list_make1(customScanRangeTableEntry);
@ -203,16 +203,16 @@ BuildSelectStatement(Query *masterQuery, List *masterTargetList, CustomScan *dat
/* (2) add an aggregation plan if needed */
if (masterQuery->hasAggs || masterQuery->groupClause)
{
dataScan->scan.plan.targetlist = masterTargetList;
remoteScan->scan.plan.targetlist = masterTargetList;
aggregationPlan = BuildAggregatePlan(masterQuery, &dataScan->scan.plan);
aggregationPlan = BuildAggregatePlan(masterQuery, &remoteScan->scan.plan);
topLevelPlan = (Plan *) aggregationPlan;
}
else
{
/* otherwise set the final projections on the scan plan directly */
dataScan->scan.plan.targetlist = masterQuery->targetList;
topLevelPlan = &dataScan->scan.plan;
remoteScan->scan.plan.targetlist = masterQuery->targetList;
topLevelPlan = &remoteScan->scan.plan;
}
/* (3) add a sorting plan if needed */
@ -258,14 +258,15 @@ BuildSelectStatement(Query *masterQuery, List *masterTargetList, CustomScan *dat
/*
* MasterNodeSelectPlan takes in a distributed plan, finds the master node query
* structure in that plan, and builds the final select plan to execute on the
* master node. Note that this select plan is executed after result files are
* retrieved from worker nodes and filled into the tuple store inside provided
* custom scan.
* MasterNodeSelectPlan takes in a distributed plan and a custom scan node which
* wraps remote part of the plan. This function finds the master node query
* structure in the multi plan, and builds the final select plan to execute on
* the tuples returned by remote scan on the master node. Note that this select
* plan is executed after result files are retrieved from worker nodes and
* filled into the tuple store inside provided custom scan.
*/
PlannedStmt *
MasterNodeSelectPlan(MultiPlan *multiPlan, CustomScan *dataScan)
MasterNodeSelectPlan(MultiPlan *multiPlan, CustomScan *remoteScan)
{
Query *masterQuery = multiPlan->masterQuery;
PlannedStmt *masterSelectPlan = NULL;
@ -274,7 +275,7 @@ MasterNodeSelectPlan(MultiPlan *multiPlan, CustomScan *dataScan)
List *workerTargetList = workerJob->jobQuery->targetList;
List *masterTargetList = MasterTargetList(workerTargetList);
masterSelectPlan = BuildSelectStatement(masterQuery, masterTargetList, dataScan);
masterSelectPlan = BuildSelectStatement(masterQuery, masterTargetList, remoteScan);
return masterSelectPlan;
}

View File

@ -59,12 +59,11 @@ static PlannedStmt * CreateDistributedPlan(PlannedStmt *localPlan, Query *origin
Query *query, ParamListInfo boundParams,
RelationRestrictionContext *restrictionContext);
static Node * SerializeMultiPlan(struct MultiPlan *multiPlan);
static MultiPlan * DeSerializeMultiPlan(Node *node);
static MultiPlan * DeserializeMultiPlan(Node *node);
static PlannedStmt * FinalizePlan(PlannedStmt *localPlan, MultiPlan *multiPlan);
static PlannedStmt * FinalizeNonRouterPlan(PlannedStmt *localPlan, MultiPlan *multiPlan,
CustomScan *customScan);
static PlannedStmt * FinalizeRouterPlan(PlannedStmt *localPlan, MultiPlan *multiPlan,
CustomScan *customScan);
static PlannedStmt * FinalizeRouterPlan(PlannedStmt *localPlan, CustomScan *customScan);
static void CheckNodeIsDumpable(Node *node);
static RelationRestrictionContext * CreateAndPushRestrictionContext(void);
static RelationRestrictionContext * CurrentRestrictionContext(void);
@ -340,10 +339,9 @@ GetMultiPlan(CustomScan *customScan)
{
MultiPlan *multiPlan = NULL;
Assert(IsA(customScan, CustomScan));
Assert(list_length(customScan->custom_private) == 1);
multiPlan = DeSerializeMultiPlan(linitial(customScan->custom_private));
multiPlan = DeserializeMultiPlan(linitial(customScan->custom_private));
return multiPlan;
}
@ -377,11 +375,11 @@ SerializeMultiPlan(MultiPlan *multiPlan)
/*
* DeSerializeMultiPlan returns the deserialized distributed plan from the string
* DeserializeMultiPlan returns the deserialized distributed plan from the string
* representation in a Const node.
*/
static MultiPlan *
DeSerializeMultiPlan(Node *node)
DeserializeMultiPlan(Node *node)
{
Const *multiPlanData = NULL;
char *serializedMultiPlan = NULL;
@ -454,7 +452,7 @@ FinalizePlan(PlannedStmt *localPlan, MultiPlan *multiPlan)
}
else
{
finalPlan = FinalizeRouterPlan(localPlan, multiPlan, customScan);
finalPlan = FinalizeRouterPlan(localPlan, customScan);
}
return finalPlan;
@ -481,19 +479,21 @@ FinalizeNonRouterPlan(PlannedStmt *localPlan, MultiPlan *multiPlan,
/*
* FinalizeRouterPlan get the distributed router executor plan and wraps it with
* a proper target list.
* FinalizeRouterPlan gets a CustomScan node which already wrapped distributed
* part of a router plan and sets it as the direct child of the router plan
* because we don't run any query on master node for router executable queries.
* Here, we also rebuild the column list to read from the remote scan.
*/
static PlannedStmt *
FinalizeRouterPlan(PlannedStmt *localPlan, MultiPlan *multiPlan, CustomScan *customScan)
FinalizeRouterPlan(PlannedStmt *localPlan, CustomScan *customScan)
{
PlannedStmt *routerPlan = NULL;
RangeTblEntry *customScanRangeTableEntry = NULL;
RangeTblEntry *remoteScanRangeTableEntry = NULL;
ListCell *targetEntryCell = NULL;
List *targetList = NIL;
List *columnNameList = NIL;
/* we have only one range table entry */
/* we will have only one range table entry */
int customScanRangeTableIndex = 1;
/* build a targetlist to read from the custom scan output */
@ -516,7 +516,7 @@ FinalizeRouterPlan(PlannedStmt *localPlan, MultiPlan *multiPlan, CustomScan *cus
continue;
}
/* build target entry pointing to custom scan range table entry */
/* build target entry pointing to remote scan range table entry */
newVar = makeVarFromTargetEntry(customScanRangeTableIndex, targetEntry);
newTargetEntry = flatCopyTargetEntry(targetEntry);
newTargetEntry->expr = (Expr *) newVar;
@ -531,8 +531,8 @@ FinalizeRouterPlan(PlannedStmt *localPlan, MultiPlan *multiPlan, CustomScan *cus
routerPlan = makeNode(PlannedStmt);
routerPlan->planTree = (Plan *) customScan;
customScanRangeTableEntry = CustomScanRangeTableEntry(columnNameList);
routerPlan->rtable = list_make1(customScanRangeTableEntry);
remoteScanRangeTableEntry = RemoteScanRangeTableEntry(columnNameList);
routerPlan->rtable = list_make1(remoteScanRangeTableEntry);
routerPlan->canSetTag = true;
routerPlan->relationOids = NIL;
@ -547,21 +547,21 @@ FinalizeRouterPlan(PlannedStmt *localPlan, MultiPlan *multiPlan, CustomScan *cus
/*
* CustomScanRangeTableEntry creates a range table entry from given column name
* list to represent a custom scan.
* RemoteScanRangeTableEntry creates a range table entry from given column name
* list to represent a remote scan.
*/
RangeTblEntry *
CustomScanRangeTableEntry(List *columnNameList)
RemoteScanRangeTableEntry(List *columnNameList)
{
RangeTblEntry *customScanRangeTableEntry = makeNode(RangeTblEntry);
RangeTblEntry *remoteScanRangeTableEntry = makeNode(RangeTblEntry);
/* we use RTE_VALUES for custom scan because we can't look up relation */
customScanRangeTableEntry->rtekind = RTE_VALUES;
customScanRangeTableEntry->eref = makeAlias("remote_scan", columnNameList);
customScanRangeTableEntry->inh = false;
customScanRangeTableEntry->inFromCl = true;
remoteScanRangeTableEntry->rtekind = RTE_VALUES;
remoteScanRangeTableEntry->eref = makeAlias("remote_scan", columnNameList);
remoteScanRangeTableEntry->inh = false;
remoteScanRangeTableEntry->inFromCl = true;
return customScanRangeTableEntry;
return remoteScanRangeTableEntry;
}

View File

@ -58,7 +58,7 @@ extern void multi_relation_restriction_hook(PlannerInfo *root, RelOptInfo *relOp
extern bool IsModifyQuery(Query *query);
extern bool IsModifyMultiPlan(struct MultiPlan *multiPlan);
extern void VerifyMultiPlanValidity(struct MultiPlan *multiPlan);
extern RangeTblEntry * CustomScanRangeTableEntry(List *columnNameList);
extern RangeTblEntry * RemoteScanRangeTableEntry(List *columnNameList);
#endif /* MULTI_PLANNER_H */