INSERT/SELECT: Use ExecutePlan* instead of ExecuteSelect*

pull/3376/head
Hadi Moshayedi 2020-01-10 02:16:58 -08:00
parent a53b844939
commit d449c1857c
1 changed files with 35 additions and 36 deletions

View File

@ -52,14 +52,13 @@ static TupleTableSlot * CoordinatorInsertSelectExecScanInternal(CustomScanState
static Query * WrapSubquery(Query *subquery);
static List * TwoPhaseInsertSelectTaskList(Oid targetRelationId, Query *insertSelectQuery,
char *resultIdPrefix);
static void ExecuteSelectIntoRelation(Oid targetRelationId, List *insertTargetList,
Query *selectQuery, EState *executorState);
static HTAB * ExecuteSelectIntoColocatedIntermediateResults(Oid targetRelationId,
static void ExecutePlanIntoRelation(Oid targetRelationId, List *insertTargetList,
PlannedStmt *selectPlan, EState *executorState);
static HTAB * ExecutePlanIntoColocatedIntermediateResults(Oid targetRelationId,
List *insertTargetList,
Query *selectQuery,
PlannedStmt *selectPlan,
EState *executorState,
char *
intermediateResultIdPrefix);
char *intermediateResultIdPrefix);
static List * BuildColumnNameListFromTargetList(Oid targetRelationId,
List *insertTargetList);
static int PartitionColumnIndexFromColumnList(Oid relationId, List *columnNameList);
@ -106,6 +105,7 @@ CoordinatorInsertSelectExecScanInternal(CustomScanState *node)
if (!scanState->finishedRemoteScan)
{
EState *executorState = ScanStateGetExecutorState(scanState);
ParamListInfo paramListInfo = executorState->es_param_list_info;
DistributedPlan *distributedPlan = scanState->distributedPlan;
Query *insertSelectQuery = copyObject(distributedPlan->insertSelectQuery);
List *insertTargetList = insertSelectQuery->targetList;
@ -134,6 +134,18 @@ CoordinatorInsertSelectExecScanInternal(CustomScanState *node)
selectRte->subquery = selectQuery;
ReorderInsertSelectTargetLists(insertSelectQuery, insertRte, selectRte);
/*
* Make a copy of the query, since pg_plan_query may scribble on it and we
* want it to be replanned every time if it is stored in a prepared
* statement.
*/
selectQuery = copyObject(selectQuery);
/* plan the subquery, this may be another distributed query */
int cursorOptions = CURSOR_OPT_PARALLEL_OK;
PlannedStmt *selectPlan = pg_plan_query(selectQuery, cursorOptions,
paramListInfo);
/*
* If we are dealing with partitioned table, we also need to lock its
* partitions. Here we only lock targetRelation, we acquire necessary
@ -156,10 +168,10 @@ CoordinatorInsertSelectExecScanInternal(CustomScanState *node)
ListCell *taskCell = NULL;
List *prunedTaskList = NIL;
shardStateHash = ExecuteSelectIntoColocatedIntermediateResults(
shardStateHash = ExecutePlanIntoColocatedIntermediateResults(
targetRelationId,
insertTargetList,
selectQuery,
selectPlan,
executorState,
intermediateResultIdPrefix);
@ -209,7 +221,7 @@ CoordinatorInsertSelectExecScanInternal(CustomScanState *node)
}
else
{
ExecuteSelectIntoRelation(targetRelationId, insertTargetList, selectQuery,
ExecutePlanIntoRelation(targetRelationId, insertTargetList, selectPlan,
executorState);
}
@ -438,16 +450,17 @@ TwoPhaseInsertSelectTaskList(Oid targetRelationId, Query *insertSelectQuery,
/*
* ExecuteSelectIntoColocatedIntermediateResults executes the given select query
* ExecutePlanIntoColocatedIntermediateResults executes the given PlannedStmt
* and inserts tuples into a set of intermediate results that are colocated with
* the target table for further processing of ON CONFLICT or RETURNING. It also
* returns the hash of shard states that were used to insert tuplesinto the target
* relation.
*/
static HTAB *
ExecuteSelectIntoColocatedIntermediateResults(Oid targetRelationId,
ExecutePlanIntoColocatedIntermediateResults(Oid targetRelationId,
List *insertTargetList,
Query *selectQuery, EState *executorState,
PlannedStmt *selectPlan,
EState *executorState,
char *intermediateResultIdPrefix)
{
ParamListInfo paramListInfo = executorState->es_param_list_info;
@ -473,14 +486,7 @@ ExecuteSelectIntoColocatedIntermediateResults(Oid targetRelationId,
stopOnFailure,
intermediateResultIdPrefix);
/*
* Make a copy of the query, since ExecuteQueryIntoDestReceiver may scribble on it
* and we want it to be replanned every time if it is stored in a prepared
* statement.
*/
Query *queryCopy = copyObject(selectQuery);
ExecuteQueryIntoDestReceiver(queryCopy, paramListInfo, (DestReceiver *) copyDest);
ExecutePlanIntoDestReceiver(selectPlan, paramListInfo, (DestReceiver *) copyDest);
executorState->es_processed = copyDest->tuplesSent;
@ -491,13 +497,13 @@ ExecuteSelectIntoColocatedIntermediateResults(Oid targetRelationId,
/*
* ExecuteSelectIntoRelation executes given SELECT query and inserts the
* ExecutePlanIntoRelation executes the given plan and inserts the
* results into the target relation, which is assumed to be a distributed
* table.
*/
static void
ExecuteSelectIntoRelation(Oid targetRelationId, List *insertTargetList,
Query *selectQuery, EState *executorState)
ExecutePlanIntoRelation(Oid targetRelationId, List *insertTargetList,
PlannedStmt *selectPlan, EState *executorState)
{
ParamListInfo paramListInfo = executorState->es_param_list_info;
bool stopOnFailure = false;
@ -521,14 +527,7 @@ ExecuteSelectIntoRelation(Oid targetRelationId, List *insertTargetList,
executorState,
stopOnFailure, NULL);
/*
* Make a copy of the query, since ExecuteQueryIntoDestReceiver may scribble on it
* and we want it to be replanned every time if it is stored in a prepared
* statement.
*/
Query *queryCopy = copyObject(selectQuery);
ExecuteQueryIntoDestReceiver(queryCopy, paramListInfo, (DestReceiver *) copyDest);
ExecutePlanIntoDestReceiver(selectPlan, paramListInfo, (DestReceiver *) copyDest);
executorState->es_processed = copyDest->tuplesSent;