mirror of https://github.com/citusdata/citus.git
Reduce local insert memory usage during deparsing
parent
644b266dee
commit
715dce1eea
|
@ -59,6 +59,7 @@ RebuildQueryStrings(Job *workerJob)
|
||||||
Query *originalQuery = workerJob->jobQuery;
|
Query *originalQuery = workerJob->jobQuery;
|
||||||
List *taskList = workerJob->taskList;
|
List *taskList = workerJob->taskList;
|
||||||
Task *task = NULL;
|
Task *task = NULL;
|
||||||
|
bool isSingleTask = list_length(taskList) == 1;
|
||||||
|
|
||||||
if (originalQuery->commandType == CMD_INSERT)
|
if (originalQuery->commandType == CMD_INSERT)
|
||||||
{
|
{
|
||||||
|
@ -74,7 +75,7 @@ RebuildQueryStrings(Job *workerJob)
|
||||||
* task, we scribble on the original query to avoid the copying
|
* task, we scribble on the original query to avoid the copying
|
||||||
* overhead.
|
* overhead.
|
||||||
*/
|
*/
|
||||||
if (list_length(taskList) > 1)
|
if (!isSingleTask)
|
||||||
{
|
{
|
||||||
query = copyObject(originalQuery);
|
query = copyObject(originalQuery);
|
||||||
}
|
}
|
||||||
|
@ -119,6 +120,19 @@ RebuildQueryStrings(Job *workerJob)
|
||||||
* deparse_shard_query when the string is needed
|
* deparse_shard_query when the string is needed
|
||||||
*/
|
*/
|
||||||
task->anchorDistributedTableId = modifiedRelationRTE->relid;
|
task->anchorDistributedTableId = modifiedRelationRTE->relid;
|
||||||
|
|
||||||
|
/*
|
||||||
|
* For multi-row inserts, we modify the VALUES before storing the
|
||||||
|
* query in the task.
|
||||||
|
*/
|
||||||
|
RangeTblEntry *valuesRTE = ExtractDistributedInsertValuesRTE(query);
|
||||||
|
if (valuesRTE != NULL)
|
||||||
|
{
|
||||||
|
Assert(valuesRTE->rtekind == RTE_VALUES);
|
||||||
|
Assert(task->rowValuesLists != NULL);
|
||||||
|
|
||||||
|
valuesRTE->values_lists = task->rowValuesLists;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
bool isQueryObjectOrText = GetTaskQueryType(task) == TASK_QUERY_TEXT ||
|
bool isQueryObjectOrText = GetTaskQueryType(task) == TASK_QUERY_TEXT ||
|
||||||
|
@ -180,39 +194,7 @@ AddInsertAliasIfNeeded(Query *query)
|
||||||
static void
|
static void
|
||||||
UpdateTaskQueryString(Query *query, Task *task)
|
UpdateTaskQueryString(Query *query, Task *task)
|
||||||
{
|
{
|
||||||
List *oldValuesLists = NIL;
|
|
||||||
RangeTblEntry *valuesRTE = NULL;
|
|
||||||
|
|
||||||
if (query->commandType == CMD_INSERT)
|
|
||||||
{
|
|
||||||
/* extract the VALUES from the INSERT */
|
|
||||||
valuesRTE = ExtractDistributedInsertValuesRTE(query);
|
|
||||||
|
|
||||||
if (valuesRTE != NULL)
|
|
||||||
{
|
|
||||||
Assert(valuesRTE->rtekind == RTE_VALUES);
|
|
||||||
Assert(task->rowValuesLists != NULL);
|
|
||||||
|
|
||||||
oldValuesLists = valuesRTE->values_lists;
|
|
||||||
valuesRTE->values_lists = task->rowValuesLists;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (ShouldLazyDeparseQuery(task))
|
|
||||||
{
|
|
||||||
/*
|
|
||||||
* not all insert queries are copied before calling this
|
|
||||||
* function, so we do it here
|
|
||||||
*/
|
|
||||||
query = copyObject(query);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
SetTaskQueryIfShouldLazyDeparse(task, query);
|
SetTaskQueryIfShouldLazyDeparse(task, query);
|
||||||
|
|
||||||
if (valuesRTE != NULL)
|
|
||||||
{
|
|
||||||
valuesRTE->values_lists = oldValuesLists;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue