diff --git a/src/backend/distributed/executor/distributed_intermediate_results.c b/src/backend/distributed/executor/distributed_intermediate_results.c index 68626b039..206b2bb08 100644 --- a/src/backend/distributed/executor/distributed_intermediate_results.c +++ b/src/backend/distributed/executor/distributed_intermediate_results.c @@ -228,7 +228,6 @@ WrapTasksForPartitioning(const char *resultIdPrefix, List *selectTaskList, binaryFormatString); perPlacementQueries = lappend(perPlacementQueries, wrappedQuery->data); } - SetTaskQueryString(selectTask, NULL); selectTask->perPlacementQueryStrings = perPlacementQueries; } @@ -413,7 +412,7 @@ ExecuteSelectTasksIntoTupleStore(List *taskList, TupleDesc resultDescriptor, ExecuteTaskListExtended(ROW_MODIFY_READONLY, taskList, resultDescriptor, resultStore, hasReturning, targetPoolSize, &xactProperties, - NIL, false); + NIL, true); return resultStore; } diff --git a/src/backend/distributed/executor/local_executor.c b/src/backend/distributed/executor/local_executor.c index ffe5e59f6..a7380f496 100644 --- a/src/backend/distributed/executor/local_executor.c +++ b/src/backend/distributed/executor/local_executor.c @@ -107,7 +107,7 @@ bool LogLocalCommands = false; bool TransactionAccessedLocalPlacement = false; bool TransactionConnectedToLocalGroup = false; -static void SplitLocalAndRemotePlacements(List *taskPlacementList, +static uint32 SplitLocalAndRemotePlacements(List *taskPlacementList, List **localTaskPlacementList, List **remoteTaskPlacementList); static uint64 ExecuteLocalTaskPlan(PlannedStmt *taskPlan, char *queryString, @@ -200,7 +200,8 @@ ExecuteLocalTaskList(List *taskList, ParamListInfo orig_paramListInfo, taskNumParams = 0; taskParameterTypes = NULL; } - List *queryStrings = SplitIntoQueries(TaskQueryString(task)); + char * taskQueryString = TaskQueryString(task); + List *queryStrings = SplitIntoQueries(taskQueryString); if (list_length(queryStrings) > 1) { LogLocalCommand(task); @@ -209,7 +210,7 @@ ExecuteLocalTaskList(List *taskList, ParamListInfo orig_paramListInfo, return totalRowsProcessed; } - Query *shardQuery = ParseQueryString(linitial(queryStrings), + Query *shardQuery = ParseQueryString(taskQueryString, taskParameterTypes, taskNumParams); @@ -418,12 +419,11 @@ ExtractLocalAndRemoteTasks(bool readOnly, List *taskList, List **localTaskList, List *localTaskPlacementList = NULL; List *remoteTaskPlacementList = NULL; - SplitLocalAndRemotePlacements( + uint32 localPlacementIndex = SplitLocalAndRemotePlacements( task->taskPlacementList, &localTaskPlacementList, &remoteTaskPlacementList); /* either the local or the remote should be non-nil */ Assert(!(localTaskPlacementList == NIL && remoteTaskPlacementList == NIL)); - if (list_length(task->taskPlacementList) == 1) { /* @@ -437,6 +437,10 @@ ExtractLocalAndRemoteTasks(bool readOnly, List *taskList, List **localTaskList, } else { + if (list_length(task->perPlacementQueryStrings) > 0) { + task->queryStringLazy = linitial(task->perPlacementQueryStrings); + task->perPlacementQueryStrings = NIL; + } *localTaskList = lappend(*localTaskList, task); } } @@ -452,6 +456,14 @@ ExtractLocalAndRemoteTasks(bool readOnly, List *taskList, List **localTaskList, Task *localTask = copyObject(task); + ListCell* localPlacementQueryStringForInsertSelectCell = NULL; + if (list_length(task->perPlacementQueryStrings) > 0) { + localPlacementQueryStringForInsertSelectCell = list_nth_cell(task->perPlacementQueryStrings, + localPlacementIndex); + localTask->queryStringLazy = (char*)localPlacementQueryStringForInsertSelectCell; + localTask->perPlacementQueryStrings = NIL; + } + localTask->taskPlacementList = localTaskPlacementList; *localTaskList = lappend(*localTaskList, localTask); @@ -463,7 +475,10 @@ ExtractLocalAndRemoteTasks(bool readOnly, List *taskList, List **localTaskList, { Task *remoteTask = copyObject(task); remoteTask->taskPlacementList = remoteTaskPlacementList; - + if (localPlacementQueryStringForInsertSelectCell != NULL) { + list_delete_cell(remoteTask->perPlacementQueryStrings, + localPlacementQueryStringForInsertSelectCell, NULL); + } *remoteTaskList = lappend(*remoteTaskList, remoteTask); } } @@ -476,7 +491,7 @@ ExtractLocalAndRemoteTasks(bool readOnly, List *taskList, List **localTaskList, * input taskPlacementList and puts the placements into corresponding list of * either localTaskPlacementList or remoteTaskPlacementList. */ -static void +static uint32 SplitLocalAndRemotePlacements(List *taskPlacementList, List **localTaskPlacementList, List **remoteTaskPlacementList) { @@ -486,17 +501,22 @@ SplitLocalAndRemotePlacements(List *taskPlacementList, List **localTaskPlacement *remoteTaskPlacementList = NIL; ShardPlacement *taskPlacement = NULL; + uint32 localPlacementIndex = 0; + uint32 index = 0; foreach_ptr(taskPlacement, taskPlacementList) { if (taskPlacement->groupId == localGroupId) { *localTaskPlacementList = lappend(*localTaskPlacementList, taskPlacement); + localPlacementIndex = index; } else { *remoteTaskPlacementList = lappend(*remoteTaskPlacementList, taskPlacement); } + index++; } + return localPlacementIndex; }