diff --git a/src/backend/distributed/executor/multi_executor.c b/src/backend/distributed/executor/multi_executor.c index c99dd3914..b5a94efa7 100644 --- a/src/backend/distributed/executor/multi_executor.c +++ b/src/backend/distributed/executor/multi_executor.c @@ -33,7 +33,7 @@ /* - * Create separate set of scan methods for different executor types. + * Define executor methods for the different executor types. */ static CustomExecMethods RealTimeCustomExecMethods = { .CustomName = "RealTimeScan", @@ -88,8 +88,7 @@ static Relation StubRelation(TupleDesc tupleDescriptor); /* - * RealTimeCreateScan creates a custom scan node which sets callback functions - * for real-time executor. + * RealTimeCreateScan creates the scan state for real-time executor queries. */ Node * RealTimeCreateScan(CustomScan *scan) @@ -107,8 +106,7 @@ RealTimeCreateScan(CustomScan *scan) /* - * TaskTrackerCreateScan creates a custom scan node which sets callback functions - * for task-tracker executor. + * TaskTrackerCreateScan creates the scan state for task-tracker executor queries. */ Node * TaskTrackerCreateScan(CustomScan *scan) @@ -126,8 +124,7 @@ TaskTrackerCreateScan(CustomScan *scan) /* - * RouterCreateScan creates a custom scan node which sets callback functions - * for router executor depending on the router executor type. + * RouterCreateScan creates the scan state for router executor queries. */ Node * RouterCreateScan(CustomScan *scan) @@ -136,6 +133,7 @@ RouterCreateScan(CustomScan *scan) MultiPlan *multiPlan = NULL; Job *workerJob = NULL; List *taskList = NIL; + bool isModificationQuery = false; scanState->executorType = MULTI_EXECUTOR_ROUTER; scanState->customScanState.ss.ps.type = T_CustomScanState; @@ -145,10 +143,11 @@ RouterCreateScan(CustomScan *scan) workerJob = multiPlan->workerJob; taskList = workerJob->taskList; + isModificationQuery = IsModifyMultiPlan(multiPlan); + /* check if this is a single shard query */ if (list_length(taskList) == 1) { - bool isModificationQuery = IsModifyMultiPlan(multiPlan); if (isModificationQuery) { scanState->customScanState.methods = &RouterSingleModifyCustomExecMethods; @@ -160,6 +159,7 @@ RouterCreateScan(CustomScan *scan) } else { + Assert(isModificationQuery); scanState->customScanState.methods = &RouterMultiModifyCustomExecMethods; } @@ -168,8 +168,8 @@ RouterCreateScan(CustomScan *scan) /* - * InvalidCreateScan is only called on an invalid case which we would like to - * error out. This is the case when a plan is not ready for execution because + * DelayedErrorCreateScan is only called on an invalid case which we would like + * to error out. This is the case when a plan is not ready for execution because * CreateDistributedPlan() couldn't find a plan due to unresolved prepared * statement parameters, but didn't error out, because we expect custom plans * to come to our rescue. But sql (not plpgsql) functions unfortunately don't @@ -177,7 +177,7 @@ RouterCreateScan(CustomScan *scan) * to do this check and provide a meaningfull error message. */ Node * -InvalidCreateScan(CustomScan *scan) +DelayedErrorCreateScan(CustomScan *scan) { CitusScanState *scanState = palloc0(sizeof(CitusScanState)); @@ -195,29 +195,12 @@ InvalidCreateScan(CustomScan *scan) /* - * CitusSelectBeginScan just checks if the given custom scan node is a proper - * Citus scan node. + * CitusSelectBeginScan is a placeholder function for BeginCustomScan callback. */ void CitusSelectBeginScan(CustomScanState *node, EState *estate, int eflags) { - ValidateCitusScanState(node); -} - - -/* - * ValidateCitusScanState checks if the given scan nodes contains a valid multi plan. - */ -void -ValidateCitusScanState(CustomScanState *node) -{ - CitusScanState *scanState = (CitusScanState *) node; - MultiPlan *multiPlan = scanState->multiPlan; - - Assert(IsA(scanState, CustomScanState)); - - /* ensure plan is executable */ - VerifyMultiPlanValidity(multiPlan); + /* just a placeholder function */ } @@ -246,7 +229,7 @@ RealTimeExecScan(CustomScanState *node) scanState->finishedRemoteScan = true; } - resultSlot = ReadNextTuple(scanState); + resultSlot = ReturnTupleFromTuplestore(scanState); return resultSlot; } @@ -281,6 +264,7 @@ LoadTuplesIntoTupleStore(CitusScanState *citusScanState, Job *workerJob) { CustomScanState customScanState = citusScanState->customScanState; List *workerTaskList = workerJob->taskList; + List *copyOptions = NIL; EState *executorState = NULL; MemoryContext executorTupleContext = NULL; ExprContext *executorExpressionContext = NULL; @@ -308,23 +292,22 @@ LoadTuplesIntoTupleStore(CitusScanState *citusScanState, Job *workerJob) citusScanState->tuplestorestate = tuplestore_begin_heap(randomAccess, interTransactions, work_mem); + if (BinaryMasterCopyFormat) + { + DefElem *copyOption = makeDefElem("format", (Node *) makeString("binary")); + copyOptions = lappend(copyOptions, copyOption); + } + foreach(workerTaskCell, workerTaskList) { Task *workerTask = (Task *) lfirst(workerTaskCell); StringInfo jobDirectoryName = NULL; StringInfo taskFilename = NULL; - List *copyOptions = NIL; CopyState copyState = NULL; jobDirectoryName = MasterJobDirectoryName(workerTask->jobId); taskFilename = TaskFilename(jobDirectoryName, workerTask->taskId); - if (BinaryMasterCopyFormat) - { - DefElem *copyOption = makeDefElem("format", (Node *) makeString("binary")); - copyOptions = lappend(copyOptions, copyOption); - } - copyState = BeginCopyFrom(stubRelation, taskFilename->data, false, NULL, copyOptions); @@ -373,12 +356,12 @@ StubRelation(TupleDesc tupleDescriptor) /* - * ReadNextTuple reads the next tuple from the tuple store of the given Citus - * scan node and returns it. It returns null if all tuples are read from the - * tuple store. + * ReturnTupleFromTuplestore reads the next tuple from the tuple store of the + * given Citus scan node and returns it. It returns null if all tuples are read + * from the tuple store. */ TupleTableSlot * -ReadNextTuple(CitusScanState *scanState) +ReturnTupleFromTuplestore(CitusScanState *scanState) { Tuplestorestate *tupleStore = scanState->tuplestorestate; TupleTableSlot *resultSlot = NULL; @@ -430,7 +413,7 @@ TaskTrackerExecScan(CustomScanState *node) scanState->finishedRemoteScan = true; } - resultSlot = ReadNextTuple(scanState); + resultSlot = ReturnTupleFromTuplestore(scanState); return resultSlot; } diff --git a/src/backend/distributed/executor/multi_router_executor.c b/src/backend/distributed/executor/multi_router_executor.c index 7e0ba55e2..9a26090f8 100644 --- a/src/backend/distributed/executor/multi_router_executor.c +++ b/src/backend/distributed/executor/multi_router_executor.c @@ -417,8 +417,6 @@ CitusModifyBeginScan(CustomScanState *node, EState *estate, int eflags) Job *workerJob = multiPlan->workerJob; List *taskList = workerJob->taskList; - ValidateCitusScanState(node); - /* * If we are executing a prepared statement, then we may not yet have obtained * the metadata locks in this transaction. To prevent a concurrent shard copy, @@ -458,7 +456,7 @@ RouterSingleModifyExecScan(CustomScanState *node) scanState->finishedRemoteScan = true; } - resultSlot = ReadNextTuple(scanState); + resultSlot = ReturnTupleFromTuplestore(scanState); return resultSlot; } @@ -508,7 +506,7 @@ RouterMultiModifyExecScan(CustomScanState *node) scanState->finishedRemoteScan = true; } - resultSlot = ReadNextTuple(scanState); + resultSlot = ReturnTupleFromTuplestore(scanState); return resultSlot; } @@ -539,7 +537,7 @@ RouterSelectExecScan(CustomScanState *node) scanState->finishedRemoteScan = true; } - resultSlot = ReadNextTuple(scanState); + resultSlot = ReturnTupleFromTuplestore(scanState); return resultSlot; } diff --git a/src/backend/distributed/planner/multi_planner.c b/src/backend/distributed/planner/multi_planner.c index 6a9d29c38..172978b40 100644 --- a/src/backend/distributed/planner/multi_planner.c +++ b/src/backend/distributed/planner/multi_planner.c @@ -50,7 +50,7 @@ static CustomScanMethods RouterCustomScanMethods = { static CustomScanMethods InvalidCustomScanMethods = { "Citus Invalid", - InvalidCreateScan + DelayedErrorCreateScan }; @@ -143,11 +143,11 @@ multi_planner(Query *parse, int cursorOptions, ParamListInfo boundParams) /* - * IsModifyQuery returns true if the query performs modifications, false + * IsModifyCommand returns true if the query performs modifications, false * otherwise. */ bool -IsModifyQuery(Query *query) +IsModifyCommand(Query *query) { CmdType commandType = query->commandType; @@ -218,7 +218,7 @@ CreateDistributedPlan(PlannedStmt *localPlan, Query *originalQuery, Query *query hasUnresolvedParams = true; } - if (IsModifyQuery(query)) + if (IsModifyCommand(query)) { /* modifications are always routed through the same planner/executor */ distributedPlan = CreateModifyPlan(originalQuery, query, restrictionContext); diff --git a/src/include/distributed/multi_executor.h b/src/include/distributed/multi_executor.h index ea048dace..e3b53a327 100644 --- a/src/include/distributed/multi_executor.h +++ b/src/include/distributed/multi_executor.h @@ -38,7 +38,7 @@ typedef struct CitusScanState extern Node * RealTimeCreateScan(CustomScan *scan); extern Node * TaskTrackerCreateScan(CustomScan *scan); extern Node * RouterCreateScan(CustomScan *scan); -extern Node * InvalidCreateScan(CustomScan *scan); +extern Node * DelayedErrorCreateScan(CustomScan *scan); extern void CitusSelectBeginScan(CustomScanState *node, EState *estate, int eflags); extern TupleTableSlot * RealTimeExecScan(CustomScanState *node); extern TupleTableSlot * TaskTrackerExecScan(CustomScanState *node); @@ -46,8 +46,7 @@ extern void CitusEndScan(CustomScanState *node); extern void CitusReScan(CustomScanState *node); extern void CitusExplainScan(CustomScanState *node, List *ancestors, struct ExplainState *es); -extern void ValidateCitusScanState(CustomScanState *node); -extern TupleTableSlot * ReadNextTuple(CitusScanState *scanState); +extern TupleTableSlot * ReturnTupleFromTuplestore(CitusScanState *scanState); #endif /* MULTI_EXECUTOR_H */ diff --git a/src/include/distributed/multi_planner.h b/src/include/distributed/multi_planner.h index e21fe3b39..ed9dae4ea 100644 --- a/src/include/distributed/multi_planner.h +++ b/src/include/distributed/multi_planner.h @@ -55,7 +55,7 @@ struct MultiPlan; extern struct MultiPlan * GetMultiPlan(CustomScan *node); extern void multi_relation_restriction_hook(PlannerInfo *root, RelOptInfo *relOptInfo, Index index, RangeTblEntry *rte); -extern bool IsModifyQuery(Query *query); +extern bool IsModifyCommand(Query *query); extern bool IsModifyMultiPlan(struct MultiPlan *multiPlan); extern void VerifyMultiPlanValidity(struct MultiPlan *multiPlan); extern RangeTblEntry * RemoteScanRangeTableEntry(List *columnNameList);