diff --git a/src/backend/distributed/executor/citus_custom_scan.c b/src/backend/distributed/executor/citus_custom_scan.c index 82ddbfe57..ffce98bdb 100644 --- a/src/backend/distributed/executor/citus_custom_scan.c +++ b/src/backend/distributed/executor/citus_custom_scan.c @@ -308,7 +308,7 @@ static void CitusReScan(CustomScanState *node) { CitusScanState *scanState = (CitusScanState *) node; - EState *executorState = scanState->customScanState.ss.ps.state; + EState *executorState = ScanStateGetExecutorState(scanState); ParamListInfo paramListInfo = executorState->es_param_list_info; if (paramListInfo != NULL) @@ -318,3 +318,25 @@ CitusReScan(CustomScanState *node) "parameters are currently unsupported"))); } } + + +/* + * ScanStateGetTupleDescriptor returns the tuple descriptor for the given + * scan state. + */ +TupleDesc +ScanStateGetTupleDescriptor(CitusScanState *scanState) +{ + return scanState->customScanState.ss.ps.ps_ResultTupleSlot->tts_tupleDescriptor; +} + + +/* + * ScanStateGetExecutorState returns the executor state for the given scan + * state. + */ +EState * +ScanStateGetExecutorState(CitusScanState *scanState) +{ + return scanState->customScanState.ss.ps.state; +} diff --git a/src/backend/distributed/executor/insert_select_executor.c b/src/backend/distributed/executor/insert_select_executor.c index c5e939e83..7df6fdb39 100644 --- a/src/backend/distributed/executor/insert_select_executor.c +++ b/src/backend/distributed/executor/insert_select_executor.c @@ -63,7 +63,7 @@ CoordinatorInsertSelectExecScan(CustomScanState *node) if (!scanState->finishedRemoteScan) { - EState *executorState = scanState->customScanState.ss.ps.state; + EState *executorState = ScanStateGetExecutorState(scanState); DistributedPlan *distributedPlan = scanState->distributedPlan; Query *selectQuery = distributedPlan->insertSelectSubquery; List *insertTargetList = distributedPlan->insertTargetList; diff --git a/src/backend/distributed/executor/multi_executor.c b/src/backend/distributed/executor/multi_executor.c index 110c7a165..92296080b 100644 --- a/src/backend/distributed/executor/multi_executor.c +++ b/src/backend/distributed/executor/multi_executor.c @@ -237,6 +237,7 @@ ReturnTupleFromTuplestore(CitusScanState *scanState) { Tuplestorestate *tupleStore = scanState->tuplestorestate; TupleTableSlot *resultSlot = NULL; + EState *executorState = NULL; ScanDirection scanDirection = NoMovementScanDirection; bool forwardScanDirection = true; @@ -245,7 +246,8 @@ ReturnTupleFromTuplestore(CitusScanState *scanState) return NULL; } - scanDirection = scanState->customScanState.ss.ps.state->es_direction; + executorState = ScanStateGetExecutorState(scanState); + scanDirection = executorState->es_direction; Assert(ScanDirectionIsValid(scanDirection)); if (ScanDirectionIsBackward(scanDirection)) @@ -279,7 +281,7 @@ LoadTuplesIntoTupleStore(CitusScanState *citusScanState, Job *workerJob) bool interTransactions = false; char *copyFormat = "text"; - tupleDescriptor = customScanState.ss.ps.ps_ResultTupleSlot->tts_tupleDescriptor; + tupleDescriptor = ScanStateGetTupleDescriptor(citusScanState); Assert(citusScanState->tuplestorestate == NULL); citusScanState->tuplestorestate = diff --git a/src/backend/distributed/executor/multi_router_executor.c b/src/backend/distributed/executor/multi_router_executor.c index dba69fe6f..ca6ff8f5d 100644 --- a/src/backend/distributed/executor/multi_router_executor.c +++ b/src/backend/distributed/executor/multi_router_executor.c @@ -624,8 +624,7 @@ RouterModifyExecScan(CustomScanState *node) static void SortTupleStore(CitusScanState *scanState) { - TupleDesc tupleDescriptor = - scanState->customScanState.ss.ps.ps_ResultTupleSlot->tts_tupleDescriptor; + TupleDesc tupleDescriptor = ScanStateGetTupleDescriptor(scanState); Tuplestorestate *tupleStore = scanState->tuplestorestate; List *targetList = scanState->customScanState.ss.ps.plan->targetlist; @@ -735,7 +734,7 @@ RouterSequentialModifyExecScan(CustomScanState *node) List *taskList = workerJob->taskList; ListCell *taskCell = NULL; bool multipleTasks = list_length(taskList) > 1; - EState *executorState = scanState->customScanState.ss.ps.state; + EState *executorState = ScanStateGetExecutorState(scanState); bool taskListRequires2PC = TaskListRequires2PC(taskList); bool alwaysThrowErrorOnFailure = false; CmdType operation = scanState->distributedPlan->operation; @@ -906,8 +905,8 @@ RouterSelectExecScan(CustomScanState *node) static void ExecuteSingleSelectTask(CitusScanState *scanState, Task *task) { - ParamListInfo paramListInfo = - scanState->customScanState.ss.ps.state->es_param_list_info; + EState *executorState = ScanStateGetExecutorState(scanState); + ParamListInfo paramListInfo = executorState->es_param_list_info; List *taskPlacementList = task->taskPlacementList; ListCell *taskPlacementCell = NULL; char *queryString = task->queryString; @@ -1105,7 +1104,7 @@ ExecuteSingleModifyTask(CitusScanState *scanState, Task *task, CmdType operation if (scanState) { - executorState = scanState->customScanState.ss.ps.state; + executorState = ScanStateGetExecutorState(scanState); paramListInfo = executorState->es_param_list_info; } @@ -1351,7 +1350,7 @@ void ExecuteMultipleTasks(CitusScanState *scanState, List *taskList, bool isModificationQuery, bool expectResults) { - EState *executorState = scanState->customScanState.ss.ps.state; + EState *executorState = ScanStateGetExecutorState(scanState); ParamListInfo paramListInfo = executorState->es_param_list_info; int64 affectedTupleCount = -1; @@ -1815,8 +1814,7 @@ StoreQueryResult(CitusScanState *scanState, MultiConnection *connection, bool alwaysThrowErrorOnFailure, int64 *rows, DistributedExecutionStats *executionStats) { - TupleDesc tupleDescriptor = - scanState->customScanState.ss.ps.ps_ResultTupleSlot->tts_tupleDescriptor; + TupleDesc tupleDescriptor = ScanStateGetTupleDescriptor(scanState); AttInMetadata *attributeInputMetadata = TupleDescGetAttInMetadata(tupleDescriptor); List *targetList = scanState->customScanState.ss.ps.plan->targetlist; uint32 expectedColumnCount = ExecCleanTargetListLength(targetList); diff --git a/src/include/distributed/citus_custom_scan.h b/src/include/distributed/citus_custom_scan.h index 2ef734e63..7d41c1340 100644 --- a/src/include/distributed/citus_custom_scan.h +++ b/src/include/distributed/citus_custom_scan.h @@ -37,5 +37,7 @@ extern CustomScanMethods DelayedErrorCustomScanMethods; extern void RegisterCitusCustomScanMethods(void); extern void CitusExplainScan(CustomScanState *node, List *ancestors, struct ExplainState *es); +extern TupleDesc ScanStateGetTupleDescriptor(CitusScanState *scanState); +extern EState * ScanStateGetExecutorState(CitusScanState *scanState); #endif /* CITUS_CUSTOM_SCAN_H */