diff --git a/src/backend/distributed/executor/multi_executor.c b/src/backend/distributed/executor/multi_executor.c index 3e48fe849..c99dd3914 100644 --- a/src/backend/distributed/executor/multi_executor.c +++ b/src/backend/distributed/executor/multi_executor.c @@ -83,8 +83,8 @@ static CustomExecMethods RouterSelectCustomExecMethods = { /* local function forward declarations */ static void PrepareMasterJobDirectory(Job *workerJob); -static void LoadTuplesIntoTupleStore(CitusScanState *scanState, Job *workerJob); -static Relation FauxRelation(TupleDesc tupleDescriptor); +static void LoadTuplesIntoTupleStore(CitusScanState *citusScanState, Job *workerJob); +static Relation StubRelation(TupleDesc tupleDescriptor); /* @@ -277,34 +277,36 @@ PrepareMasterJobDirectory(Job *workerJob) * filled the tuplestores, but that's a fair bit of work. */ static void -LoadTuplesIntoTupleStore(CitusScanState *scanState, Job *workerJob) +LoadTuplesIntoTupleStore(CitusScanState *citusScanState, Job *workerJob) { - CustomScanState customScanState = scanState->customScanState; + CustomScanState customScanState = citusScanState->customScanState; List *workerTaskList = workerJob->taskList; EState *executorState = NULL; MemoryContext executorTupleContext = NULL; ExprContext *executorExpressionContext = NULL; TupleDesc tupleDescriptor = NULL; - Relation fauxRelation = NULL; + Relation stubRelation = NULL; ListCell *workerTaskCell = NULL; uint32 columnCount = 0; Datum *columnValues = NULL; bool *columnNulls = NULL; bool randomAccess = true; + bool interTransactions = false; - executorState = scanState->customScanState.ss.ps.state; + executorState = citusScanState->customScanState.ss.ps.state; executorTupleContext = GetPerTupleMemoryContext(executorState); executorExpressionContext = GetPerTupleExprContext(executorState); tupleDescriptor = customScanState.ss.ps.ps_ResultTupleSlot->tts_tupleDescriptor; - fauxRelation = FauxRelation(tupleDescriptor); + stubRelation = StubRelation(tupleDescriptor); columnCount = tupleDescriptor->natts; columnValues = palloc0(columnCount * sizeof(Datum)); columnNulls = palloc0(columnCount * sizeof(bool)); - Assert(scanState->tuplestorestate == NULL); - scanState->tuplestorestate = tuplestore_begin_heap(randomAccess, false, work_mem); + Assert(citusScanState->tuplestorestate == NULL); + citusScanState->tuplestorestate = + tuplestore_begin_heap(randomAccess, interTransactions, work_mem); foreach(workerTaskCell, workerTaskList) { @@ -323,7 +325,7 @@ LoadTuplesIntoTupleStore(CitusScanState *scanState, Job *workerJob) copyOptions = lappend(copyOptions, copyOption); } - copyState = BeginCopyFrom(fauxRelation, taskFilename->data, false, NULL, + copyState = BeginCopyFrom(stubRelation, taskFilename->data, false, NULL, copyOptions); while (true) @@ -342,7 +344,7 @@ LoadTuplesIntoTupleStore(CitusScanState *scanState, Job *workerJob) break; } - tuplestore_putvalues(scanState->tuplestorestate, tupleDescriptor, + tuplestore_putvalues(citusScanState->tuplestorestate, tupleDescriptor, columnValues, columnNulls); MemoryContextSwitchTo(oldContext); } @@ -353,20 +355,20 @@ LoadTuplesIntoTupleStore(CitusScanState *scanState, Job *workerJob) /* - * FauxRelation creates a faux Relation from the given tuple descriptor. + * StubRelation creates a stub Relation from the given tuple descriptor. * To be able to use copy.c, we need a Relation descriptor. As there is no * relation corresponding to the data loaded from workers, we need to fake one. * We just need the bare minimal set of fields accessed by BeginCopyFrom(). */ static Relation -FauxRelation(TupleDesc tupleDescriptor) +StubRelation(TupleDesc tupleDescriptor) { - Relation fauxRelation = palloc0(sizeof(RelationData)); - fauxRelation->rd_att = tupleDescriptor; - fauxRelation->rd_rel = palloc0(sizeof(FormData_pg_class)); - fauxRelation->rd_rel->relkind = RELKIND_RELATION; + Relation stubRelation = palloc0(sizeof(RelationData)); + stubRelation->rd_att = tupleDescriptor; + stubRelation->rd_rel = palloc0(sizeof(FormData_pg_class)); + stubRelation->rd_rel->relkind = RELKIND_RELATION; - return fauxRelation; + return stubRelation; } diff --git a/src/backend/distributed/executor/multi_router_executor.c b/src/backend/distributed/executor/multi_router_executor.c index f48a9e249..7e0ba55e2 100644 --- a/src/backend/distributed/executor/multi_router_executor.c +++ b/src/backend/distributed/executor/multi_router_executor.c @@ -496,10 +496,10 @@ RouterMultiModifyExecScan(CustomScanState *node) if (!scanState->finishedRemoteScan) { MultiPlan *multiPlan = scanState->multiPlan; - bool isModificationQuery = IsModifyMultiPlan(multiPlan); - bool hasReturning = multiPlan->hasReturning; Job *workerJob = multiPlan->workerJob; List *taskList = workerJob->taskList; + bool hasReturning = multiPlan->hasReturning; + bool isModificationQuery = true; ProcessMasterEvaluableFunctions(workerJob); @@ -515,7 +515,7 @@ RouterMultiModifyExecScan(CustomScanState *node) /* - * RouterSelectExecScan executes a singler select task on the remote node, + * RouterSelectExecScan executes a single select task on the remote node, * retrieves the results and stores them in custom scan's tuple store. Then, it * returns tuples one by one from this tuple store. */ @@ -1229,6 +1229,7 @@ StoreQueryResult(CitusScanState *scanState, MultiConnection *connection, char **columnArray = (char **) palloc0(expectedColumnCount * sizeof(char *)); Tuplestorestate *tupleStore = NULL; bool randomAccess = true; + bool interTransactions = false; bool commandFailed = false; MemoryContext ioContext = AllocSetContextCreate(CurrentMemoryContext, "StoreQueryResult", @@ -1239,7 +1240,8 @@ StoreQueryResult(CitusScanState *scanState, MultiConnection *connection, if (scanState->tuplestorestate == NULL) { - scanState->tuplestorestate = tuplestore_begin_heap(randomAccess, false, work_mem); + scanState->tuplestorestate = + tuplestore_begin_heap(randomAccess, interTransactions, work_mem); } else if (!failOnError) { diff --git a/src/backend/distributed/planner/multi_master_planner.c b/src/backend/distributed/planner/multi_master_planner.c index 160e2f1dc..6b4be984f 100644 --- a/src/backend/distributed/planner/multi_master_planner.c +++ b/src/backend/distributed/planner/multi_master_planner.c @@ -170,7 +170,7 @@ BuildAggregatePlan(Query *masterQuery, Plan *subPlan) * and limit plans on top of the scan statement if necessary. */ static PlannedStmt * -BuildSelectStatement(Query *masterQuery, List *masterTargetList, CustomScan *dataScan) +BuildSelectStatement(Query *masterQuery, List *masterTargetList, CustomScan *remoteScan) { PlannedStmt *selectStatement = NULL; RangeTblEntry *customScanRangeTableEntry = NULL; @@ -195,7 +195,7 @@ BuildSelectStatement(Query *masterQuery, List *masterTargetList, CustomScan *dat columnNameList = lappend(columnNameList, makeString(targetEntry->resname)); } - customScanRangeTableEntry = CustomScanRangeTableEntry(columnNameList); + customScanRangeTableEntry = RemoteScanRangeTableEntry(columnNameList); /* set the single element range table list */ selectStatement->rtable = list_make1(customScanRangeTableEntry); @@ -203,16 +203,16 @@ BuildSelectStatement(Query *masterQuery, List *masterTargetList, CustomScan *dat /* (2) add an aggregation plan if needed */ if (masterQuery->hasAggs || masterQuery->groupClause) { - dataScan->scan.plan.targetlist = masterTargetList; + remoteScan->scan.plan.targetlist = masterTargetList; - aggregationPlan = BuildAggregatePlan(masterQuery, &dataScan->scan.plan); + aggregationPlan = BuildAggregatePlan(masterQuery, &remoteScan->scan.plan); topLevelPlan = (Plan *) aggregationPlan; } else { /* otherwise set the final projections on the scan plan directly */ - dataScan->scan.plan.targetlist = masterQuery->targetList; - topLevelPlan = &dataScan->scan.plan; + remoteScan->scan.plan.targetlist = masterQuery->targetList; + topLevelPlan = &remoteScan->scan.plan; } /* (3) add a sorting plan if needed */ @@ -258,14 +258,15 @@ BuildSelectStatement(Query *masterQuery, List *masterTargetList, CustomScan *dat /* - * MasterNodeSelectPlan takes in a distributed plan, finds the master node query - * structure in that plan, and builds the final select plan to execute on the - * master node. Note that this select plan is executed after result files are - * retrieved from worker nodes and filled into the tuple store inside provided - * custom scan. + * MasterNodeSelectPlan takes in a distributed plan and a custom scan node which + * wraps remote part of the plan. This function finds the master node query + * structure in the multi plan, and builds the final select plan to execute on + * the tuples returned by remote scan on the master node. Note that this select + * plan is executed after result files are retrieved from worker nodes and + * filled into the tuple store inside provided custom scan. */ PlannedStmt * -MasterNodeSelectPlan(MultiPlan *multiPlan, CustomScan *dataScan) +MasterNodeSelectPlan(MultiPlan *multiPlan, CustomScan *remoteScan) { Query *masterQuery = multiPlan->masterQuery; PlannedStmt *masterSelectPlan = NULL; @@ -274,7 +275,7 @@ MasterNodeSelectPlan(MultiPlan *multiPlan, CustomScan *dataScan) List *workerTargetList = workerJob->jobQuery->targetList; List *masterTargetList = MasterTargetList(workerTargetList); - masterSelectPlan = BuildSelectStatement(masterQuery, masterTargetList, dataScan); + masterSelectPlan = BuildSelectStatement(masterQuery, masterTargetList, remoteScan); return masterSelectPlan; } diff --git a/src/backend/distributed/planner/multi_planner.c b/src/backend/distributed/planner/multi_planner.c index 580d75a2e..6a9d29c38 100644 --- a/src/backend/distributed/planner/multi_planner.c +++ b/src/backend/distributed/planner/multi_planner.c @@ -59,12 +59,11 @@ static PlannedStmt * CreateDistributedPlan(PlannedStmt *localPlan, Query *origin Query *query, ParamListInfo boundParams, RelationRestrictionContext *restrictionContext); static Node * SerializeMultiPlan(struct MultiPlan *multiPlan); -static MultiPlan * DeSerializeMultiPlan(Node *node); +static MultiPlan * DeserializeMultiPlan(Node *node); static PlannedStmt * FinalizePlan(PlannedStmt *localPlan, MultiPlan *multiPlan); static PlannedStmt * FinalizeNonRouterPlan(PlannedStmt *localPlan, MultiPlan *multiPlan, CustomScan *customScan); -static PlannedStmt * FinalizeRouterPlan(PlannedStmt *localPlan, MultiPlan *multiPlan, - CustomScan *customScan); +static PlannedStmt * FinalizeRouterPlan(PlannedStmt *localPlan, CustomScan *customScan); static void CheckNodeIsDumpable(Node *node); static RelationRestrictionContext * CreateAndPushRestrictionContext(void); static RelationRestrictionContext * CurrentRestrictionContext(void); @@ -340,10 +339,9 @@ GetMultiPlan(CustomScan *customScan) { MultiPlan *multiPlan = NULL; - Assert(IsA(customScan, CustomScan)); Assert(list_length(customScan->custom_private) == 1); - multiPlan = DeSerializeMultiPlan(linitial(customScan->custom_private)); + multiPlan = DeserializeMultiPlan(linitial(customScan->custom_private)); return multiPlan; } @@ -377,11 +375,11 @@ SerializeMultiPlan(MultiPlan *multiPlan) /* - * DeSerializeMultiPlan returns the deserialized distributed plan from the string + * DeserializeMultiPlan returns the deserialized distributed plan from the string * representation in a Const node. */ static MultiPlan * -DeSerializeMultiPlan(Node *node) +DeserializeMultiPlan(Node *node) { Const *multiPlanData = NULL; char *serializedMultiPlan = NULL; @@ -454,7 +452,7 @@ FinalizePlan(PlannedStmt *localPlan, MultiPlan *multiPlan) } else { - finalPlan = FinalizeRouterPlan(localPlan, multiPlan, customScan); + finalPlan = FinalizeRouterPlan(localPlan, customScan); } return finalPlan; @@ -481,19 +479,21 @@ FinalizeNonRouterPlan(PlannedStmt *localPlan, MultiPlan *multiPlan, /* - * FinalizeRouterPlan get the distributed router executor plan and wraps it with - * a proper target list. + * FinalizeRouterPlan gets a CustomScan node which already wrapped distributed + * part of a router plan and sets it as the direct child of the router plan + * because we don't run any query on master node for router executable queries. + * Here, we also rebuild the column list to read from the remote scan. */ static PlannedStmt * -FinalizeRouterPlan(PlannedStmt *localPlan, MultiPlan *multiPlan, CustomScan *customScan) +FinalizeRouterPlan(PlannedStmt *localPlan, CustomScan *customScan) { PlannedStmt *routerPlan = NULL; - RangeTblEntry *customScanRangeTableEntry = NULL; + RangeTblEntry *remoteScanRangeTableEntry = NULL; ListCell *targetEntryCell = NULL; List *targetList = NIL; List *columnNameList = NIL; - /* we have only one range table entry */ + /* we will have only one range table entry */ int customScanRangeTableIndex = 1; /* build a targetlist to read from the custom scan output */ @@ -516,7 +516,7 @@ FinalizeRouterPlan(PlannedStmt *localPlan, MultiPlan *multiPlan, CustomScan *cus continue; } - /* build target entry pointing to custom scan range table entry */ + /* build target entry pointing to remote scan range table entry */ newVar = makeVarFromTargetEntry(customScanRangeTableIndex, targetEntry); newTargetEntry = flatCopyTargetEntry(targetEntry); newTargetEntry->expr = (Expr *) newVar; @@ -531,8 +531,8 @@ FinalizeRouterPlan(PlannedStmt *localPlan, MultiPlan *multiPlan, CustomScan *cus routerPlan = makeNode(PlannedStmt); routerPlan->planTree = (Plan *) customScan; - customScanRangeTableEntry = CustomScanRangeTableEntry(columnNameList); - routerPlan->rtable = list_make1(customScanRangeTableEntry); + remoteScanRangeTableEntry = RemoteScanRangeTableEntry(columnNameList); + routerPlan->rtable = list_make1(remoteScanRangeTableEntry); routerPlan->canSetTag = true; routerPlan->relationOids = NIL; @@ -547,21 +547,21 @@ FinalizeRouterPlan(PlannedStmt *localPlan, MultiPlan *multiPlan, CustomScan *cus /* - * CustomScanRangeTableEntry creates a range table entry from given column name - * list to represent a custom scan. + * RemoteScanRangeTableEntry creates a range table entry from given column name + * list to represent a remote scan. */ RangeTblEntry * -CustomScanRangeTableEntry(List *columnNameList) +RemoteScanRangeTableEntry(List *columnNameList) { - RangeTblEntry *customScanRangeTableEntry = makeNode(RangeTblEntry); + RangeTblEntry *remoteScanRangeTableEntry = makeNode(RangeTblEntry); /* we use RTE_VALUES for custom scan because we can't look up relation */ - customScanRangeTableEntry->rtekind = RTE_VALUES; - customScanRangeTableEntry->eref = makeAlias("remote_scan", columnNameList); - customScanRangeTableEntry->inh = false; - customScanRangeTableEntry->inFromCl = true; + remoteScanRangeTableEntry->rtekind = RTE_VALUES; + remoteScanRangeTableEntry->eref = makeAlias("remote_scan", columnNameList); + remoteScanRangeTableEntry->inh = false; + remoteScanRangeTableEntry->inFromCl = true; - return customScanRangeTableEntry; + return remoteScanRangeTableEntry; } diff --git a/src/include/distributed/multi_planner.h b/src/include/distributed/multi_planner.h index 2b44c2e54..e21fe3b39 100644 --- a/src/include/distributed/multi_planner.h +++ b/src/include/distributed/multi_planner.h @@ -58,7 +58,7 @@ extern void multi_relation_restriction_hook(PlannerInfo *root, RelOptInfo *relOp extern bool IsModifyQuery(Query *query); extern bool IsModifyMultiPlan(struct MultiPlan *multiPlan); extern void VerifyMultiPlanValidity(struct MultiPlan *multiPlan); -extern RangeTblEntry * CustomScanRangeTableEntry(List *columnNameList); +extern RangeTblEntry * RemoteScanRangeTableEntry(List *columnNameList); #endif /* MULTI_PLANNER_H */