enh/localExecuteSelectInto
SaitTalhaNisanci 2020-03-24 12:26:02 +03:00
parent 4dc006d52a
commit 45415b28ef
2 changed files with 28 additions and 9 deletions

View File

@ -228,7 +228,6 @@ WrapTasksForPartitioning(const char *resultIdPrefix, List *selectTaskList,
binaryFormatString);
perPlacementQueries = lappend(perPlacementQueries, wrappedQuery->data);
}
SetTaskQueryString(selectTask, NULL);
selectTask->perPlacementQueryStrings = perPlacementQueries;
}
@ -413,7 +412,7 @@ ExecuteSelectTasksIntoTupleStore(List *taskList, TupleDesc resultDescriptor,
ExecuteTaskListExtended(ROW_MODIFY_READONLY, taskList, resultDescriptor,
resultStore, hasReturning, targetPoolSize, &xactProperties,
NIL, false);
NIL, true);
return resultStore;
}

View File

@ -107,7 +107,7 @@ bool LogLocalCommands = false;
bool TransactionAccessedLocalPlacement = false;
bool TransactionConnectedToLocalGroup = false;
static void SplitLocalAndRemotePlacements(List *taskPlacementList,
static uint32 SplitLocalAndRemotePlacements(List *taskPlacementList,
List **localTaskPlacementList,
List **remoteTaskPlacementList);
static uint64 ExecuteLocalTaskPlan(PlannedStmt *taskPlan, char *queryString,
@ -200,7 +200,8 @@ ExecuteLocalTaskList(List *taskList, ParamListInfo orig_paramListInfo,
taskNumParams = 0;
taskParameterTypes = NULL;
}
List *queryStrings = SplitIntoQueries(TaskQueryString(task));
char * taskQueryString = TaskQueryString(task);
List *queryStrings = SplitIntoQueries(taskQueryString);
if (list_length(queryStrings) > 1)
{
LogLocalCommand(task);
@ -209,7 +210,7 @@ ExecuteLocalTaskList(List *taskList, ParamListInfo orig_paramListInfo,
return totalRowsProcessed;
}
Query *shardQuery = ParseQueryString(linitial(queryStrings),
Query *shardQuery = ParseQueryString(taskQueryString,
taskParameterTypes,
taskNumParams);
@ -418,12 +419,11 @@ ExtractLocalAndRemoteTasks(bool readOnly, List *taskList, List **localTaskList,
List *localTaskPlacementList = NULL;
List *remoteTaskPlacementList = NULL;
SplitLocalAndRemotePlacements(
uint32 localPlacementIndex = SplitLocalAndRemotePlacements(
task->taskPlacementList, &localTaskPlacementList, &remoteTaskPlacementList);
/* either the local or the remote should be non-nil */
Assert(!(localTaskPlacementList == NIL && remoteTaskPlacementList == NIL));
if (list_length(task->taskPlacementList) == 1)
{
/*
@ -437,6 +437,10 @@ ExtractLocalAndRemoteTasks(bool readOnly, List *taskList, List **localTaskList,
}
else
{
if (list_length(task->perPlacementQueryStrings) > 0) {
task->queryStringLazy = linitial(task->perPlacementQueryStrings);
task->perPlacementQueryStrings = NIL;
}
*localTaskList = lappend(*localTaskList, task);
}
}
@ -452,6 +456,14 @@ ExtractLocalAndRemoteTasks(bool readOnly, List *taskList, List **localTaskList,
Task *localTask = copyObject(task);
ListCell* localPlacementQueryStringForInsertSelectCell = NULL;
if (list_length(task->perPlacementQueryStrings) > 0) {
localPlacementQueryStringForInsertSelectCell = list_nth_cell(task->perPlacementQueryStrings,
localPlacementIndex);
localTask->queryStringLazy = (char*)localPlacementQueryStringForInsertSelectCell;
localTask->perPlacementQueryStrings = NIL;
}
localTask->taskPlacementList = localTaskPlacementList;
*localTaskList = lappend(*localTaskList, localTask);
@ -463,7 +475,10 @@ ExtractLocalAndRemoteTasks(bool readOnly, List *taskList, List **localTaskList,
{
Task *remoteTask = copyObject(task);
remoteTask->taskPlacementList = remoteTaskPlacementList;
if (localPlacementQueryStringForInsertSelectCell != NULL) {
list_delete_cell(remoteTask->perPlacementQueryStrings,
localPlacementQueryStringForInsertSelectCell, NULL);
}
*remoteTaskList = lappend(*remoteTaskList, remoteTask);
}
}
@ -476,7 +491,7 @@ ExtractLocalAndRemoteTasks(bool readOnly, List *taskList, List **localTaskList,
* input taskPlacementList and puts the placements into corresponding list of
* either localTaskPlacementList or remoteTaskPlacementList.
*/
static void
static uint32
SplitLocalAndRemotePlacements(List *taskPlacementList, List **localTaskPlacementList,
List **remoteTaskPlacementList)
{
@ -486,17 +501,22 @@ SplitLocalAndRemotePlacements(List *taskPlacementList, List **localTaskPlacement
*remoteTaskPlacementList = NIL;
ShardPlacement *taskPlacement = NULL;
uint32 localPlacementIndex = 0;
uint32 index = 0;
foreach_ptr(taskPlacement, taskPlacementList)
{
if (taskPlacement->groupId == localGroupId)
{
*localTaskPlacementList = lappend(*localTaskPlacementList, taskPlacement);
localPlacementIndex = index;
}
else
{
*remoteTaskPlacementList = lappend(*remoteTaskPlacementList, taskPlacement);
}
index++;
}
return localPlacementIndex;
}