Address feedback

pull/1288/head
Metin Doslu 2017-03-08 16:04:06 +02:00
parent 723494715a
commit 9fcf42d5cd
5 changed files with 36 additions and 56 deletions

View File

@ -33,7 +33,7 @@
/* /*
* Create separate set of scan methods for different executor types. * Define executor methods for the different executor types.
*/ */
static CustomExecMethods RealTimeCustomExecMethods = { static CustomExecMethods RealTimeCustomExecMethods = {
.CustomName = "RealTimeScan", .CustomName = "RealTimeScan",
@ -88,8 +88,7 @@ static Relation StubRelation(TupleDesc tupleDescriptor);
/* /*
* RealTimeCreateScan creates a custom scan node which sets callback functions * RealTimeCreateScan creates the scan state for real-time executor queries.
* for real-time executor.
*/ */
Node * Node *
RealTimeCreateScan(CustomScan *scan) RealTimeCreateScan(CustomScan *scan)
@ -107,8 +106,7 @@ RealTimeCreateScan(CustomScan *scan)
/* /*
* TaskTrackerCreateScan creates a custom scan node which sets callback functions * TaskTrackerCreateScan creates the scan state for task-tracker executor queries.
* for task-tracker executor.
*/ */
Node * Node *
TaskTrackerCreateScan(CustomScan *scan) TaskTrackerCreateScan(CustomScan *scan)
@ -126,8 +124,7 @@ TaskTrackerCreateScan(CustomScan *scan)
/* /*
* RouterCreateScan creates a custom scan node which sets callback functions * RouterCreateScan creates the scan state for router executor queries.
* for router executor depending on the router executor type.
*/ */
Node * Node *
RouterCreateScan(CustomScan *scan) RouterCreateScan(CustomScan *scan)
@ -136,6 +133,7 @@ RouterCreateScan(CustomScan *scan)
MultiPlan *multiPlan = NULL; MultiPlan *multiPlan = NULL;
Job *workerJob = NULL; Job *workerJob = NULL;
List *taskList = NIL; List *taskList = NIL;
bool isModificationQuery = false;
scanState->executorType = MULTI_EXECUTOR_ROUTER; scanState->executorType = MULTI_EXECUTOR_ROUTER;
scanState->customScanState.ss.ps.type = T_CustomScanState; scanState->customScanState.ss.ps.type = T_CustomScanState;
@ -145,10 +143,11 @@ RouterCreateScan(CustomScan *scan)
workerJob = multiPlan->workerJob; workerJob = multiPlan->workerJob;
taskList = workerJob->taskList; taskList = workerJob->taskList;
isModificationQuery = IsModifyMultiPlan(multiPlan);
/* check if this is a single shard query */ /* check if this is a single shard query */
if (list_length(taskList) == 1) if (list_length(taskList) == 1)
{ {
bool isModificationQuery = IsModifyMultiPlan(multiPlan);
if (isModificationQuery) if (isModificationQuery)
{ {
scanState->customScanState.methods = &RouterSingleModifyCustomExecMethods; scanState->customScanState.methods = &RouterSingleModifyCustomExecMethods;
@ -160,6 +159,7 @@ RouterCreateScan(CustomScan *scan)
} }
else else
{ {
Assert(isModificationQuery);
scanState->customScanState.methods = &RouterMultiModifyCustomExecMethods; scanState->customScanState.methods = &RouterMultiModifyCustomExecMethods;
} }
@ -168,8 +168,8 @@ RouterCreateScan(CustomScan *scan)
/* /*
* InvalidCreateScan is only called on an invalid case which we would like to * DelayedErrorCreateScan is only called on an invalid case which we would like
* error out. This is the case when a plan is not ready for execution because * to error out. This is the case when a plan is not ready for execution because
* CreateDistributedPlan() couldn't find a plan due to unresolved prepared * CreateDistributedPlan() couldn't find a plan due to unresolved prepared
* statement parameters, but didn't error out, because we expect custom plans * statement parameters, but didn't error out, because we expect custom plans
* to come to our rescue. But sql (not plpgsql) functions unfortunately don't * to come to our rescue. But sql (not plpgsql) functions unfortunately don't
@ -177,7 +177,7 @@ RouterCreateScan(CustomScan *scan)
* to do this check and provide a meaningfull error message. * to do this check and provide a meaningfull error message.
*/ */
Node * Node *
InvalidCreateScan(CustomScan *scan) DelayedErrorCreateScan(CustomScan *scan)
{ {
CitusScanState *scanState = palloc0(sizeof(CitusScanState)); CitusScanState *scanState = palloc0(sizeof(CitusScanState));
@ -195,29 +195,12 @@ InvalidCreateScan(CustomScan *scan)
/* /*
* CitusSelectBeginScan just checks if the given custom scan node is a proper * CitusSelectBeginScan is a placeholder function for BeginCustomScan callback.
* Citus scan node.
*/ */
void void
CitusSelectBeginScan(CustomScanState *node, EState *estate, int eflags) CitusSelectBeginScan(CustomScanState *node, EState *estate, int eflags)
{ {
ValidateCitusScanState(node); /* just a placeholder function */
}
/*
* ValidateCitusScanState checks if the given scan nodes contains a valid multi plan.
*/
void
ValidateCitusScanState(CustomScanState *node)
{
CitusScanState *scanState = (CitusScanState *) node;
MultiPlan *multiPlan = scanState->multiPlan;
Assert(IsA(scanState, CustomScanState));
/* ensure plan is executable */
VerifyMultiPlanValidity(multiPlan);
} }
@ -246,7 +229,7 @@ RealTimeExecScan(CustomScanState *node)
scanState->finishedRemoteScan = true; scanState->finishedRemoteScan = true;
} }
resultSlot = ReadNextTuple(scanState); resultSlot = ReturnTupleFromTuplestore(scanState);
return resultSlot; return resultSlot;
} }
@ -281,6 +264,7 @@ LoadTuplesIntoTupleStore(CitusScanState *citusScanState, Job *workerJob)
{ {
CustomScanState customScanState = citusScanState->customScanState; CustomScanState customScanState = citusScanState->customScanState;
List *workerTaskList = workerJob->taskList; List *workerTaskList = workerJob->taskList;
List *copyOptions = NIL;
EState *executorState = NULL; EState *executorState = NULL;
MemoryContext executorTupleContext = NULL; MemoryContext executorTupleContext = NULL;
ExprContext *executorExpressionContext = NULL; ExprContext *executorExpressionContext = NULL;
@ -308,23 +292,22 @@ LoadTuplesIntoTupleStore(CitusScanState *citusScanState, Job *workerJob)
citusScanState->tuplestorestate = citusScanState->tuplestorestate =
tuplestore_begin_heap(randomAccess, interTransactions, work_mem); tuplestore_begin_heap(randomAccess, interTransactions, work_mem);
if (BinaryMasterCopyFormat)
{
DefElem *copyOption = makeDefElem("format", (Node *) makeString("binary"));
copyOptions = lappend(copyOptions, copyOption);
}
foreach(workerTaskCell, workerTaskList) foreach(workerTaskCell, workerTaskList)
{ {
Task *workerTask = (Task *) lfirst(workerTaskCell); Task *workerTask = (Task *) lfirst(workerTaskCell);
StringInfo jobDirectoryName = NULL; StringInfo jobDirectoryName = NULL;
StringInfo taskFilename = NULL; StringInfo taskFilename = NULL;
List *copyOptions = NIL;
CopyState copyState = NULL; CopyState copyState = NULL;
jobDirectoryName = MasterJobDirectoryName(workerTask->jobId); jobDirectoryName = MasterJobDirectoryName(workerTask->jobId);
taskFilename = TaskFilename(jobDirectoryName, workerTask->taskId); taskFilename = TaskFilename(jobDirectoryName, workerTask->taskId);
if (BinaryMasterCopyFormat)
{
DefElem *copyOption = makeDefElem("format", (Node *) makeString("binary"));
copyOptions = lappend(copyOptions, copyOption);
}
copyState = BeginCopyFrom(stubRelation, taskFilename->data, false, NULL, copyState = BeginCopyFrom(stubRelation, taskFilename->data, false, NULL,
copyOptions); copyOptions);
@ -373,12 +356,12 @@ StubRelation(TupleDesc tupleDescriptor)
/* /*
* ReadNextTuple reads the next tuple from the tuple store of the given Citus * ReturnTupleFromTuplestore reads the next tuple from the tuple store of the
* scan node and returns it. It returns null if all tuples are read from the * given Citus scan node and returns it. It returns null if all tuples are read
* tuple store. * from the tuple store.
*/ */
TupleTableSlot * TupleTableSlot *
ReadNextTuple(CitusScanState *scanState) ReturnTupleFromTuplestore(CitusScanState *scanState)
{ {
Tuplestorestate *tupleStore = scanState->tuplestorestate; Tuplestorestate *tupleStore = scanState->tuplestorestate;
TupleTableSlot *resultSlot = NULL; TupleTableSlot *resultSlot = NULL;
@ -430,7 +413,7 @@ TaskTrackerExecScan(CustomScanState *node)
scanState->finishedRemoteScan = true; scanState->finishedRemoteScan = true;
} }
resultSlot = ReadNextTuple(scanState); resultSlot = ReturnTupleFromTuplestore(scanState);
return resultSlot; return resultSlot;
} }

View File

@ -417,8 +417,6 @@ CitusModifyBeginScan(CustomScanState *node, EState *estate, int eflags)
Job *workerJob = multiPlan->workerJob; Job *workerJob = multiPlan->workerJob;
List *taskList = workerJob->taskList; List *taskList = workerJob->taskList;
ValidateCitusScanState(node);
/* /*
* If we are executing a prepared statement, then we may not yet have obtained * If we are executing a prepared statement, then we may not yet have obtained
* the metadata locks in this transaction. To prevent a concurrent shard copy, * the metadata locks in this transaction. To prevent a concurrent shard copy,
@ -458,7 +456,7 @@ RouterSingleModifyExecScan(CustomScanState *node)
scanState->finishedRemoteScan = true; scanState->finishedRemoteScan = true;
} }
resultSlot = ReadNextTuple(scanState); resultSlot = ReturnTupleFromTuplestore(scanState);
return resultSlot; return resultSlot;
} }
@ -508,7 +506,7 @@ RouterMultiModifyExecScan(CustomScanState *node)
scanState->finishedRemoteScan = true; scanState->finishedRemoteScan = true;
} }
resultSlot = ReadNextTuple(scanState); resultSlot = ReturnTupleFromTuplestore(scanState);
return resultSlot; return resultSlot;
} }
@ -539,7 +537,7 @@ RouterSelectExecScan(CustomScanState *node)
scanState->finishedRemoteScan = true; scanState->finishedRemoteScan = true;
} }
resultSlot = ReadNextTuple(scanState); resultSlot = ReturnTupleFromTuplestore(scanState);
return resultSlot; return resultSlot;
} }

View File

@ -50,7 +50,7 @@ static CustomScanMethods RouterCustomScanMethods = {
static CustomScanMethods InvalidCustomScanMethods = { static CustomScanMethods InvalidCustomScanMethods = {
"Citus Invalid", "Citus Invalid",
InvalidCreateScan DelayedErrorCreateScan
}; };
@ -143,11 +143,11 @@ multi_planner(Query *parse, int cursorOptions, ParamListInfo boundParams)
/* /*
* IsModifyQuery returns true if the query performs modifications, false * IsModifyCommand returns true if the query performs modifications, false
* otherwise. * otherwise.
*/ */
bool bool
IsModifyQuery(Query *query) IsModifyCommand(Query *query)
{ {
CmdType commandType = query->commandType; CmdType commandType = query->commandType;
@ -218,7 +218,7 @@ CreateDistributedPlan(PlannedStmt *localPlan, Query *originalQuery, Query *query
hasUnresolvedParams = true; hasUnresolvedParams = true;
} }
if (IsModifyQuery(query)) if (IsModifyCommand(query))
{ {
/* modifications are always routed through the same planner/executor */ /* modifications are always routed through the same planner/executor */
distributedPlan = CreateModifyPlan(originalQuery, query, restrictionContext); distributedPlan = CreateModifyPlan(originalQuery, query, restrictionContext);

View File

@ -38,7 +38,7 @@ typedef struct CitusScanState
extern Node * RealTimeCreateScan(CustomScan *scan); extern Node * RealTimeCreateScan(CustomScan *scan);
extern Node * TaskTrackerCreateScan(CustomScan *scan); extern Node * TaskTrackerCreateScan(CustomScan *scan);
extern Node * RouterCreateScan(CustomScan *scan); extern Node * RouterCreateScan(CustomScan *scan);
extern Node * InvalidCreateScan(CustomScan *scan); extern Node * DelayedErrorCreateScan(CustomScan *scan);
extern void CitusSelectBeginScan(CustomScanState *node, EState *estate, int eflags); extern void CitusSelectBeginScan(CustomScanState *node, EState *estate, int eflags);
extern TupleTableSlot * RealTimeExecScan(CustomScanState *node); extern TupleTableSlot * RealTimeExecScan(CustomScanState *node);
extern TupleTableSlot * TaskTrackerExecScan(CustomScanState *node); extern TupleTableSlot * TaskTrackerExecScan(CustomScanState *node);
@ -46,8 +46,7 @@ extern void CitusEndScan(CustomScanState *node);
extern void CitusReScan(CustomScanState *node); extern void CitusReScan(CustomScanState *node);
extern void CitusExplainScan(CustomScanState *node, List *ancestors, struct extern void CitusExplainScan(CustomScanState *node, List *ancestors, struct
ExplainState *es); ExplainState *es);
extern void ValidateCitusScanState(CustomScanState *node); extern TupleTableSlot * ReturnTupleFromTuplestore(CitusScanState *scanState);
extern TupleTableSlot * ReadNextTuple(CitusScanState *scanState);
#endif /* MULTI_EXECUTOR_H */ #endif /* MULTI_EXECUTOR_H */

View File

@ -55,7 +55,7 @@ struct MultiPlan;
extern struct MultiPlan * GetMultiPlan(CustomScan *node); extern struct MultiPlan * GetMultiPlan(CustomScan *node);
extern void multi_relation_restriction_hook(PlannerInfo *root, RelOptInfo *relOptInfo, extern void multi_relation_restriction_hook(PlannerInfo *root, RelOptInfo *relOptInfo,
Index index, RangeTblEntry *rte); Index index, RangeTblEntry *rte);
extern bool IsModifyQuery(Query *query); extern bool IsModifyCommand(Query *query);
extern bool IsModifyMultiPlan(struct MultiPlan *multiPlan); extern bool IsModifyMultiPlan(struct MultiPlan *multiPlan);
extern void VerifyMultiPlanValidity(struct MultiPlan *multiPlan); extern void VerifyMultiPlanValidity(struct MultiPlan *multiPlan);
extern RangeTblEntry * RemoteScanRangeTableEntry(List *columnNameList); extern RangeTblEntry * RemoteScanRangeTableEntry(List *columnNameList);