From 0b01c59fa67f23c4acce9cfe2f31efc2d59bd683 Mon Sep 17 00:00:00 2001 From: Hadi Moshayedi Date: Tue, 4 Jun 2019 15:19:49 -0700 Subject: [PATCH 1/2] Refactor ScanStateGetTupleDescriptor() into a function. --- src/backend/distributed/executor/citus_custom_scan.c | 11 +++++++++++ src/backend/distributed/executor/multi_executor.c | 2 +- .../distributed/executor/multi_router_executor.c | 6 ++---- src/include/distributed/citus_custom_scan.h | 1 + 4 files changed, 15 insertions(+), 5 deletions(-) diff --git a/src/backend/distributed/executor/citus_custom_scan.c b/src/backend/distributed/executor/citus_custom_scan.c index 82ddbfe57..ddbc7c75b 100644 --- a/src/backend/distributed/executor/citus_custom_scan.c +++ b/src/backend/distributed/executor/citus_custom_scan.c @@ -318,3 +318,14 @@ CitusReScan(CustomScanState *node) "parameters are currently unsupported"))); } } + + +/* + * ScanStateGetTupleDescriptor returns the tuple descriptor for the given + * scan state. + */ +extern TupleDesc +ScanStateGetTupleDescriptor(CitusScanState *scanState) +{ + return scanState->customScanState.ss.ps.ps_ResultTupleSlot->tts_tupleDescriptor; +} diff --git a/src/backend/distributed/executor/multi_executor.c b/src/backend/distributed/executor/multi_executor.c index 110c7a165..11cf69002 100644 --- a/src/backend/distributed/executor/multi_executor.c +++ b/src/backend/distributed/executor/multi_executor.c @@ -279,7 +279,7 @@ LoadTuplesIntoTupleStore(CitusScanState *citusScanState, Job *workerJob) bool interTransactions = false; char *copyFormat = "text"; - tupleDescriptor = customScanState.ss.ps.ps_ResultTupleSlot->tts_tupleDescriptor; + tupleDescriptor = ScanStateGetTupleDescriptor(citusScanState); Assert(citusScanState->tuplestorestate == NULL); citusScanState->tuplestorestate = diff --git a/src/backend/distributed/executor/multi_router_executor.c b/src/backend/distributed/executor/multi_router_executor.c index dba69fe6f..62cbd69c9 100644 --- a/src/backend/distributed/executor/multi_router_executor.c +++ b/src/backend/distributed/executor/multi_router_executor.c @@ -624,8 +624,7 @@ RouterModifyExecScan(CustomScanState *node) static void SortTupleStore(CitusScanState *scanState) { - TupleDesc tupleDescriptor = - scanState->customScanState.ss.ps.ps_ResultTupleSlot->tts_tupleDescriptor; + TupleDesc tupleDescriptor = ScanStateGetTupleDescriptor(scanState); Tuplestorestate *tupleStore = scanState->tuplestorestate; List *targetList = scanState->customScanState.ss.ps.plan->targetlist; @@ -1815,8 +1814,7 @@ StoreQueryResult(CitusScanState *scanState, MultiConnection *connection, bool alwaysThrowErrorOnFailure, int64 *rows, DistributedExecutionStats *executionStats) { - TupleDesc tupleDescriptor = - scanState->customScanState.ss.ps.ps_ResultTupleSlot->tts_tupleDescriptor; + TupleDesc tupleDescriptor = ScanStateGetTupleDescriptor(scanState); AttInMetadata *attributeInputMetadata = TupleDescGetAttInMetadata(tupleDescriptor); List *targetList = scanState->customScanState.ss.ps.plan->targetlist; uint32 expectedColumnCount = ExecCleanTargetListLength(targetList); diff --git a/src/include/distributed/citus_custom_scan.h b/src/include/distributed/citus_custom_scan.h index 2ef734e63..abf41fbab 100644 --- a/src/include/distributed/citus_custom_scan.h +++ b/src/include/distributed/citus_custom_scan.h @@ -37,5 +37,6 @@ extern CustomScanMethods DelayedErrorCustomScanMethods; extern void RegisterCitusCustomScanMethods(void); extern void CitusExplainScan(CustomScanState *node, List *ancestors, struct ExplainState *es); +extern TupleDesc ScanStateGetTupleDescriptor(CitusScanState *scanState); #endif /* CITUS_CUSTOM_SCAN_H */ From 85325e0098e5eadb0e41f821925beaa8af06ea6a Mon Sep 17 00:00:00 2001 From: Hadi Moshayedi Date: Tue, 4 Jun 2019 15:32:50 -0700 Subject: [PATCH 2/2] Refactor ScanStateGetExecutorState into its own function. --- .../distributed/executor/citus_custom_scan.c | 15 +++++++++++++-- .../distributed/executor/insert_select_executor.c | 2 +- src/backend/distributed/executor/multi_executor.c | 4 +++- .../distributed/executor/multi_router_executor.c | 10 +++++----- src/include/distributed/citus_custom_scan.h | 1 + 5 files changed, 23 insertions(+), 9 deletions(-) diff --git a/src/backend/distributed/executor/citus_custom_scan.c b/src/backend/distributed/executor/citus_custom_scan.c index ddbc7c75b..ffce98bdb 100644 --- a/src/backend/distributed/executor/citus_custom_scan.c +++ b/src/backend/distributed/executor/citus_custom_scan.c @@ -308,7 +308,7 @@ static void CitusReScan(CustomScanState *node) { CitusScanState *scanState = (CitusScanState *) node; - EState *executorState = scanState->customScanState.ss.ps.state; + EState *executorState = ScanStateGetExecutorState(scanState); ParamListInfo paramListInfo = executorState->es_param_list_info; if (paramListInfo != NULL) @@ -324,8 +324,19 @@ CitusReScan(CustomScanState *node) * ScanStateGetTupleDescriptor returns the tuple descriptor for the given * scan state. */ -extern TupleDesc +TupleDesc ScanStateGetTupleDescriptor(CitusScanState *scanState) { return scanState->customScanState.ss.ps.ps_ResultTupleSlot->tts_tupleDescriptor; } + + +/* + * ScanStateGetExecutorState returns the executor state for the given scan + * state. + */ +EState * +ScanStateGetExecutorState(CitusScanState *scanState) +{ + return scanState->customScanState.ss.ps.state; +} diff --git a/src/backend/distributed/executor/insert_select_executor.c b/src/backend/distributed/executor/insert_select_executor.c index c5e939e83..7df6fdb39 100644 --- a/src/backend/distributed/executor/insert_select_executor.c +++ b/src/backend/distributed/executor/insert_select_executor.c @@ -63,7 +63,7 @@ CoordinatorInsertSelectExecScan(CustomScanState *node) if (!scanState->finishedRemoteScan) { - EState *executorState = scanState->customScanState.ss.ps.state; + EState *executorState = ScanStateGetExecutorState(scanState); DistributedPlan *distributedPlan = scanState->distributedPlan; Query *selectQuery = distributedPlan->insertSelectSubquery; List *insertTargetList = distributedPlan->insertTargetList; diff --git a/src/backend/distributed/executor/multi_executor.c b/src/backend/distributed/executor/multi_executor.c index 11cf69002..92296080b 100644 --- a/src/backend/distributed/executor/multi_executor.c +++ b/src/backend/distributed/executor/multi_executor.c @@ -237,6 +237,7 @@ ReturnTupleFromTuplestore(CitusScanState *scanState) { Tuplestorestate *tupleStore = scanState->tuplestorestate; TupleTableSlot *resultSlot = NULL; + EState *executorState = NULL; ScanDirection scanDirection = NoMovementScanDirection; bool forwardScanDirection = true; @@ -245,7 +246,8 @@ ReturnTupleFromTuplestore(CitusScanState *scanState) return NULL; } - scanDirection = scanState->customScanState.ss.ps.state->es_direction; + executorState = ScanStateGetExecutorState(scanState); + scanDirection = executorState->es_direction; Assert(ScanDirectionIsValid(scanDirection)); if (ScanDirectionIsBackward(scanDirection)) diff --git a/src/backend/distributed/executor/multi_router_executor.c b/src/backend/distributed/executor/multi_router_executor.c index 62cbd69c9..ca6ff8f5d 100644 --- a/src/backend/distributed/executor/multi_router_executor.c +++ b/src/backend/distributed/executor/multi_router_executor.c @@ -734,7 +734,7 @@ RouterSequentialModifyExecScan(CustomScanState *node) List *taskList = workerJob->taskList; ListCell *taskCell = NULL; bool multipleTasks = list_length(taskList) > 1; - EState *executorState = scanState->customScanState.ss.ps.state; + EState *executorState = ScanStateGetExecutorState(scanState); bool taskListRequires2PC = TaskListRequires2PC(taskList); bool alwaysThrowErrorOnFailure = false; CmdType operation = scanState->distributedPlan->operation; @@ -905,8 +905,8 @@ RouterSelectExecScan(CustomScanState *node) static void ExecuteSingleSelectTask(CitusScanState *scanState, Task *task) { - ParamListInfo paramListInfo = - scanState->customScanState.ss.ps.state->es_param_list_info; + EState *executorState = ScanStateGetExecutorState(scanState); + ParamListInfo paramListInfo = executorState->es_param_list_info; List *taskPlacementList = task->taskPlacementList; ListCell *taskPlacementCell = NULL; char *queryString = task->queryString; @@ -1104,7 +1104,7 @@ ExecuteSingleModifyTask(CitusScanState *scanState, Task *task, CmdType operation if (scanState) { - executorState = scanState->customScanState.ss.ps.state; + executorState = ScanStateGetExecutorState(scanState); paramListInfo = executorState->es_param_list_info; } @@ -1350,7 +1350,7 @@ void ExecuteMultipleTasks(CitusScanState *scanState, List *taskList, bool isModificationQuery, bool expectResults) { - EState *executorState = scanState->customScanState.ss.ps.state; + EState *executorState = ScanStateGetExecutorState(scanState); ParamListInfo paramListInfo = executorState->es_param_list_info; int64 affectedTupleCount = -1; diff --git a/src/include/distributed/citus_custom_scan.h b/src/include/distributed/citus_custom_scan.h index abf41fbab..7d41c1340 100644 --- a/src/include/distributed/citus_custom_scan.h +++ b/src/include/distributed/citus_custom_scan.h @@ -38,5 +38,6 @@ extern void RegisterCitusCustomScanMethods(void); extern void CitusExplainScan(CustomScanState *node, List *ancestors, struct ExplainState *es); extern TupleDesc ScanStateGetTupleDescriptor(CitusScanState *scanState); +extern EState * ScanStateGetExecutorState(CitusScanState *scanState); #endif /* CITUS_CUSTOM_SCAN_H */