diff --git a/src/backend/distributed/executor/insert_select_executor.c b/src/backend/distributed/executor/insert_select_executor.c index 85de59949..1d5eb2197 100644 --- a/src/backend/distributed/executor/insert_select_executor.c +++ b/src/backend/distributed/executor/insert_select_executor.c @@ -52,14 +52,13 @@ static TupleTableSlot * CoordinatorInsertSelectExecScanInternal(CustomScanState static Query * WrapSubquery(Query *subquery); static List * TwoPhaseInsertSelectTaskList(Oid targetRelationId, Query *insertSelectQuery, char *resultIdPrefix); -static void ExecuteSelectIntoRelation(Oid targetRelationId, List *insertTargetList, - Query *selectQuery, EState *executorState); -static HTAB * ExecuteSelectIntoColocatedIntermediateResults(Oid targetRelationId, - List *insertTargetList, - Query *selectQuery, - EState *executorState, - char * - intermediateResultIdPrefix); +static void ExecutePlanIntoRelation(Oid targetRelationId, List *insertTargetList, + PlannedStmt *selectPlan, EState *executorState); +static HTAB * ExecutePlanIntoColocatedIntermediateResults(Oid targetRelationId, + List *insertTargetList, + PlannedStmt *selectPlan, + EState *executorState, + char *intermediateResultIdPrefix); static List * BuildColumnNameListFromTargetList(Oid targetRelationId, List *insertTargetList); static int PartitionColumnIndexFromColumnList(Oid relationId, List *columnNameList); @@ -106,6 +105,7 @@ CoordinatorInsertSelectExecScanInternal(CustomScanState *node) if (!scanState->finishedRemoteScan) { EState *executorState = ScanStateGetExecutorState(scanState); + ParamListInfo paramListInfo = executorState->es_param_list_info; DistributedPlan *distributedPlan = scanState->distributedPlan; Query *insertSelectQuery = copyObject(distributedPlan->insertSelectQuery); List *insertTargetList = insertSelectQuery->targetList; @@ -134,6 +134,18 @@ CoordinatorInsertSelectExecScanInternal(CustomScanState *node) selectRte->subquery = selectQuery; ReorderInsertSelectTargetLists(insertSelectQuery, insertRte, selectRte); + /* + * Make a copy of the query, since pg_plan_query may scribble on it and we + * want it to be replanned every time if it is stored in a prepared + * statement. + */ + selectQuery = copyObject(selectQuery); + + /* plan the subquery, this may be another distributed query */ + int cursorOptions = CURSOR_OPT_PARALLEL_OK; + PlannedStmt *selectPlan = pg_plan_query(selectQuery, cursorOptions, + paramListInfo); + /* * If we are dealing with partitioned table, we also need to lock its * partitions. Here we only lock targetRelation, we acquire necessary @@ -156,10 +168,10 @@ CoordinatorInsertSelectExecScanInternal(CustomScanState *node) ListCell *taskCell = NULL; List *prunedTaskList = NIL; - shardStateHash = ExecuteSelectIntoColocatedIntermediateResults( + shardStateHash = ExecutePlanIntoColocatedIntermediateResults( targetRelationId, insertTargetList, - selectQuery, + selectPlan, executorState, intermediateResultIdPrefix); @@ -209,8 +221,8 @@ CoordinatorInsertSelectExecScanInternal(CustomScanState *node) } else { - ExecuteSelectIntoRelation(targetRelationId, insertTargetList, selectQuery, - executorState); + ExecutePlanIntoRelation(targetRelationId, insertTargetList, selectPlan, + executorState); } scanState->finishedRemoteScan = true; @@ -438,17 +450,18 @@ TwoPhaseInsertSelectTaskList(Oid targetRelationId, Query *insertSelectQuery, /* - * ExecuteSelectIntoColocatedIntermediateResults executes the given select query + * ExecutePlanIntoColocatedIntermediateResults executes the given PlannedStmt * and inserts tuples into a set of intermediate results that are colocated with * the target table for further processing of ON CONFLICT or RETURNING. It also * returns the hash of shard states that were used to insert tuplesinto the target * relation. */ static HTAB * -ExecuteSelectIntoColocatedIntermediateResults(Oid targetRelationId, - List *insertTargetList, - Query *selectQuery, EState *executorState, - char *intermediateResultIdPrefix) +ExecutePlanIntoColocatedIntermediateResults(Oid targetRelationId, + List *insertTargetList, + PlannedStmt *selectPlan, + EState *executorState, + char *intermediateResultIdPrefix) { ParamListInfo paramListInfo = executorState->es_param_list_info; bool stopOnFailure = false; @@ -473,14 +486,7 @@ ExecuteSelectIntoColocatedIntermediateResults(Oid targetRelationId, stopOnFailure, intermediateResultIdPrefix); - /* - * Make a copy of the query, since ExecuteQueryIntoDestReceiver may scribble on it - * and we want it to be replanned every time if it is stored in a prepared - * statement. - */ - Query *queryCopy = copyObject(selectQuery); - - ExecuteQueryIntoDestReceiver(queryCopy, paramListInfo, (DestReceiver *) copyDest); + ExecutePlanIntoDestReceiver(selectPlan, paramListInfo, (DestReceiver *) copyDest); executorState->es_processed = copyDest->tuplesSent; @@ -491,13 +497,13 @@ ExecuteSelectIntoColocatedIntermediateResults(Oid targetRelationId, /* - * ExecuteSelectIntoRelation executes given SELECT query and inserts the + * ExecutePlanIntoRelation executes the given plan and inserts the * results into the target relation, which is assumed to be a distributed * table. */ static void -ExecuteSelectIntoRelation(Oid targetRelationId, List *insertTargetList, - Query *selectQuery, EState *executorState) +ExecutePlanIntoRelation(Oid targetRelationId, List *insertTargetList, + PlannedStmt *selectPlan, EState *executorState) { ParamListInfo paramListInfo = executorState->es_param_list_info; bool stopOnFailure = false; @@ -521,14 +527,7 @@ ExecuteSelectIntoRelation(Oid targetRelationId, List *insertTargetList, executorState, stopOnFailure, NULL); - /* - * Make a copy of the query, since ExecuteQueryIntoDestReceiver may scribble on it - * and we want it to be replanned every time if it is stored in a prepared - * statement. - */ - Query *queryCopy = copyObject(selectQuery); - - ExecuteQueryIntoDestReceiver(queryCopy, paramListInfo, (DestReceiver *) copyDest); + ExecutePlanIntoDestReceiver(selectPlan, paramListInfo, (DestReceiver *) copyDest); executorState->es_processed = copyDest->tuplesSent;