mirror of https://github.com/citusdata/citus.git
Merge pull request #2748 from citusdata/ScanStateGetTupleDescriptor
Refactor some scan state info into their own functions.pull/2750/head
commit
674b7ce29a
|
@ -308,7 +308,7 @@ static void
|
||||||
CitusReScan(CustomScanState *node)
|
CitusReScan(CustomScanState *node)
|
||||||
{
|
{
|
||||||
CitusScanState *scanState = (CitusScanState *) node;
|
CitusScanState *scanState = (CitusScanState *) node;
|
||||||
EState *executorState = scanState->customScanState.ss.ps.state;
|
EState *executorState = ScanStateGetExecutorState(scanState);
|
||||||
ParamListInfo paramListInfo = executorState->es_param_list_info;
|
ParamListInfo paramListInfo = executorState->es_param_list_info;
|
||||||
|
|
||||||
if (paramListInfo != NULL)
|
if (paramListInfo != NULL)
|
||||||
|
@ -318,3 +318,25 @@ CitusReScan(CustomScanState *node)
|
||||||
"parameters are currently unsupported")));
|
"parameters are currently unsupported")));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* ScanStateGetTupleDescriptor returns the tuple descriptor for the given
|
||||||
|
* scan state.
|
||||||
|
*/
|
||||||
|
TupleDesc
|
||||||
|
ScanStateGetTupleDescriptor(CitusScanState *scanState)
|
||||||
|
{
|
||||||
|
return scanState->customScanState.ss.ps.ps_ResultTupleSlot->tts_tupleDescriptor;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* ScanStateGetExecutorState returns the executor state for the given scan
|
||||||
|
* state.
|
||||||
|
*/
|
||||||
|
EState *
|
||||||
|
ScanStateGetExecutorState(CitusScanState *scanState)
|
||||||
|
{
|
||||||
|
return scanState->customScanState.ss.ps.state;
|
||||||
|
}
|
||||||
|
|
|
@ -63,7 +63,7 @@ CoordinatorInsertSelectExecScan(CustomScanState *node)
|
||||||
|
|
||||||
if (!scanState->finishedRemoteScan)
|
if (!scanState->finishedRemoteScan)
|
||||||
{
|
{
|
||||||
EState *executorState = scanState->customScanState.ss.ps.state;
|
EState *executorState = ScanStateGetExecutorState(scanState);
|
||||||
DistributedPlan *distributedPlan = scanState->distributedPlan;
|
DistributedPlan *distributedPlan = scanState->distributedPlan;
|
||||||
Query *selectQuery = distributedPlan->insertSelectSubquery;
|
Query *selectQuery = distributedPlan->insertSelectSubquery;
|
||||||
List *insertTargetList = distributedPlan->insertTargetList;
|
List *insertTargetList = distributedPlan->insertTargetList;
|
||||||
|
|
|
@ -237,6 +237,7 @@ ReturnTupleFromTuplestore(CitusScanState *scanState)
|
||||||
{
|
{
|
||||||
Tuplestorestate *tupleStore = scanState->tuplestorestate;
|
Tuplestorestate *tupleStore = scanState->tuplestorestate;
|
||||||
TupleTableSlot *resultSlot = NULL;
|
TupleTableSlot *resultSlot = NULL;
|
||||||
|
EState *executorState = NULL;
|
||||||
ScanDirection scanDirection = NoMovementScanDirection;
|
ScanDirection scanDirection = NoMovementScanDirection;
|
||||||
bool forwardScanDirection = true;
|
bool forwardScanDirection = true;
|
||||||
|
|
||||||
|
@ -245,7 +246,8 @@ ReturnTupleFromTuplestore(CitusScanState *scanState)
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
scanDirection = scanState->customScanState.ss.ps.state->es_direction;
|
executorState = ScanStateGetExecutorState(scanState);
|
||||||
|
scanDirection = executorState->es_direction;
|
||||||
Assert(ScanDirectionIsValid(scanDirection));
|
Assert(ScanDirectionIsValid(scanDirection));
|
||||||
|
|
||||||
if (ScanDirectionIsBackward(scanDirection))
|
if (ScanDirectionIsBackward(scanDirection))
|
||||||
|
@ -279,7 +281,7 @@ LoadTuplesIntoTupleStore(CitusScanState *citusScanState, Job *workerJob)
|
||||||
bool interTransactions = false;
|
bool interTransactions = false;
|
||||||
char *copyFormat = "text";
|
char *copyFormat = "text";
|
||||||
|
|
||||||
tupleDescriptor = customScanState.ss.ps.ps_ResultTupleSlot->tts_tupleDescriptor;
|
tupleDescriptor = ScanStateGetTupleDescriptor(citusScanState);
|
||||||
|
|
||||||
Assert(citusScanState->tuplestorestate == NULL);
|
Assert(citusScanState->tuplestorestate == NULL);
|
||||||
citusScanState->tuplestorestate =
|
citusScanState->tuplestorestate =
|
||||||
|
|
|
@ -624,8 +624,7 @@ RouterModifyExecScan(CustomScanState *node)
|
||||||
static void
|
static void
|
||||||
SortTupleStore(CitusScanState *scanState)
|
SortTupleStore(CitusScanState *scanState)
|
||||||
{
|
{
|
||||||
TupleDesc tupleDescriptor =
|
TupleDesc tupleDescriptor = ScanStateGetTupleDescriptor(scanState);
|
||||||
scanState->customScanState.ss.ps.ps_ResultTupleSlot->tts_tupleDescriptor;
|
|
||||||
Tuplestorestate *tupleStore = scanState->tuplestorestate;
|
Tuplestorestate *tupleStore = scanState->tuplestorestate;
|
||||||
|
|
||||||
List *targetList = scanState->customScanState.ss.ps.plan->targetlist;
|
List *targetList = scanState->customScanState.ss.ps.plan->targetlist;
|
||||||
|
@ -735,7 +734,7 @@ RouterSequentialModifyExecScan(CustomScanState *node)
|
||||||
List *taskList = workerJob->taskList;
|
List *taskList = workerJob->taskList;
|
||||||
ListCell *taskCell = NULL;
|
ListCell *taskCell = NULL;
|
||||||
bool multipleTasks = list_length(taskList) > 1;
|
bool multipleTasks = list_length(taskList) > 1;
|
||||||
EState *executorState = scanState->customScanState.ss.ps.state;
|
EState *executorState = ScanStateGetExecutorState(scanState);
|
||||||
bool taskListRequires2PC = TaskListRequires2PC(taskList);
|
bool taskListRequires2PC = TaskListRequires2PC(taskList);
|
||||||
bool alwaysThrowErrorOnFailure = false;
|
bool alwaysThrowErrorOnFailure = false;
|
||||||
CmdType operation = scanState->distributedPlan->operation;
|
CmdType operation = scanState->distributedPlan->operation;
|
||||||
|
@ -906,8 +905,8 @@ RouterSelectExecScan(CustomScanState *node)
|
||||||
static void
|
static void
|
||||||
ExecuteSingleSelectTask(CitusScanState *scanState, Task *task)
|
ExecuteSingleSelectTask(CitusScanState *scanState, Task *task)
|
||||||
{
|
{
|
||||||
ParamListInfo paramListInfo =
|
EState *executorState = ScanStateGetExecutorState(scanState);
|
||||||
scanState->customScanState.ss.ps.state->es_param_list_info;
|
ParamListInfo paramListInfo = executorState->es_param_list_info;
|
||||||
List *taskPlacementList = task->taskPlacementList;
|
List *taskPlacementList = task->taskPlacementList;
|
||||||
ListCell *taskPlacementCell = NULL;
|
ListCell *taskPlacementCell = NULL;
|
||||||
char *queryString = task->queryString;
|
char *queryString = task->queryString;
|
||||||
|
@ -1105,7 +1104,7 @@ ExecuteSingleModifyTask(CitusScanState *scanState, Task *task, CmdType operation
|
||||||
|
|
||||||
if (scanState)
|
if (scanState)
|
||||||
{
|
{
|
||||||
executorState = scanState->customScanState.ss.ps.state;
|
executorState = ScanStateGetExecutorState(scanState);
|
||||||
paramListInfo = executorState->es_param_list_info;
|
paramListInfo = executorState->es_param_list_info;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1351,7 +1350,7 @@ void
|
||||||
ExecuteMultipleTasks(CitusScanState *scanState, List *taskList,
|
ExecuteMultipleTasks(CitusScanState *scanState, List *taskList,
|
||||||
bool isModificationQuery, bool expectResults)
|
bool isModificationQuery, bool expectResults)
|
||||||
{
|
{
|
||||||
EState *executorState = scanState->customScanState.ss.ps.state;
|
EState *executorState = ScanStateGetExecutorState(scanState);
|
||||||
ParamListInfo paramListInfo = executorState->es_param_list_info;
|
ParamListInfo paramListInfo = executorState->es_param_list_info;
|
||||||
int64 affectedTupleCount = -1;
|
int64 affectedTupleCount = -1;
|
||||||
|
|
||||||
|
@ -1815,8 +1814,7 @@ StoreQueryResult(CitusScanState *scanState, MultiConnection *connection,
|
||||||
bool alwaysThrowErrorOnFailure, int64 *rows,
|
bool alwaysThrowErrorOnFailure, int64 *rows,
|
||||||
DistributedExecutionStats *executionStats)
|
DistributedExecutionStats *executionStats)
|
||||||
{
|
{
|
||||||
TupleDesc tupleDescriptor =
|
TupleDesc tupleDescriptor = ScanStateGetTupleDescriptor(scanState);
|
||||||
scanState->customScanState.ss.ps.ps_ResultTupleSlot->tts_tupleDescriptor;
|
|
||||||
AttInMetadata *attributeInputMetadata = TupleDescGetAttInMetadata(tupleDescriptor);
|
AttInMetadata *attributeInputMetadata = TupleDescGetAttInMetadata(tupleDescriptor);
|
||||||
List *targetList = scanState->customScanState.ss.ps.plan->targetlist;
|
List *targetList = scanState->customScanState.ss.ps.plan->targetlist;
|
||||||
uint32 expectedColumnCount = ExecCleanTargetListLength(targetList);
|
uint32 expectedColumnCount = ExecCleanTargetListLength(targetList);
|
||||||
|
|
|
@ -37,5 +37,7 @@ extern CustomScanMethods DelayedErrorCustomScanMethods;
|
||||||
extern void RegisterCitusCustomScanMethods(void);
|
extern void RegisterCitusCustomScanMethods(void);
|
||||||
extern void CitusExplainScan(CustomScanState *node, List *ancestors, struct
|
extern void CitusExplainScan(CustomScanState *node, List *ancestors, struct
|
||||||
ExplainState *es);
|
ExplainState *es);
|
||||||
|
extern TupleDesc ScanStateGetTupleDescriptor(CitusScanState *scanState);
|
||||||
|
extern EState * ScanStateGetExecutorState(CitusScanState *scanState);
|
||||||
|
|
||||||
#endif /* CITUS_CUSTOM_SCAN_H */
|
#endif /* CITUS_CUSTOM_SCAN_H */
|
||||||
|
|
Loading…
Reference in New Issue