move queryStringList into taskQuery

Also allocate task query in the memory context of task.
pull/3659/head
SaitTalhaNisanci 2020-03-31 10:24:15 +03:00
parent c796ac335d
commit 8806c4d697
7 changed files with 75 additions and 21 deletions

View File

@ -221,7 +221,8 @@ WrapTasksForPartitioning(const char *resultIdPrefix, List *selectTaskList,
shardPlacement->nodeId, shardPlacement->nodeId,
quote_literal_cstr(taskPrefix), quote_literal_cstr(taskPrefix),
quote_literal_cstr(taskPrefix), quote_literal_cstr(taskPrefix),
quote_literal_cstr(TaskQueryStringForAllPlacements(selectTask)), quote_literal_cstr(TaskQueryStringForAllPlacements(
selectTask)),
partitionColumnIndex, partitionColumnIndex,
quote_literal_cstr(partitionMethodString), quote_literal_cstr(partitionMethodString),
minValuesString->data, maxValuesString->data, minValuesString->data, maxValuesString->data,

View File

@ -1570,7 +1570,8 @@ TrackerQueueSqlTask(TaskTracker *taskTracker, Task *task)
*/ */
StringInfo sqlTaskQueryString = makeStringInfo(); StringInfo sqlTaskQueryString = makeStringInfo();
char *escapedTaskQueryString = quote_literal_cstr(TaskQueryStringForAllPlacements(task)); char *escapedTaskQueryString = quote_literal_cstr(TaskQueryStringForAllPlacements(
task));
if (BinaryMasterCopyFormat) if (BinaryMasterCopyFormat)
{ {

View File

@ -130,7 +130,8 @@ RebuildQueryStrings(Job *workerJob)
task->parametersInQueryStringResolved = workerJob->parametersInJobQueryResolved; task->parametersInQueryStringResolved = workerJob->parametersInJobQueryResolved;
ereport(DEBUG4, (errmsg("query after rebuilding: %s", ereport(DEBUG4, (errmsg("query after rebuilding: %s",
ApplyLogRedaction(TaskQueryStringForAllPlacements(task))))); ApplyLogRedaction(TaskQueryStringForAllPlacements(
task)))));
} }
} }
@ -459,7 +460,10 @@ InitializeTaskQueryIfNecessary(Task *task)
{ {
if (task->taskQuery == NULL) if (task->taskQuery == NULL)
{ {
MemoryContext previousContext = MemoryContextSwitchTo(GetMemoryChunkContext(
task));
task->taskQuery = CitusMakeNode(TaskQuery); task->taskQuery = CitusMakeNode(TaskQuery);
MemoryContextSwitchTo(previousContext);
} }
} }
@ -483,8 +487,9 @@ SetTaskPerPlacementQueryStrings(Task *task, List *perPlacementQueryStringList)
void void
SetTaskQueryStringList(Task *task, List *queryStringList) SetTaskQueryStringList(Task *task, List *queryStringList)
{ {
task->queryStringList = queryStringList; InitializeTaskQueryIfNecessary(task);
SetTaskQueryString(task, StringJoin(queryStringList, ';')); task->taskQuery->queryType = TASK_QUERY_TEXT_LIST;
task->taskQuery->data.queryStringList = queryStringList;
} }
@ -537,13 +542,18 @@ GetTaskQueryType(Task *task)
char * char *
TaskQueryStringForAllPlacements(Task *task) TaskQueryStringForAllPlacements(Task *task)
{ {
if (GetTaskQueryType(task) == TASK_QUERY_TEXT_LIST)
{
return StringJoin(task->taskQuery->data.queryStringList, ';');
}
if (GetTaskQueryType(task) == TASK_QUERY_TEXT) if (GetTaskQueryType(task) == TASK_QUERY_TEXT)
{ {
return task->taskQuery->data.queryStringLazy; return task->taskQuery->data.queryStringLazy;
} }
Query *jobQueryReferenceForLazyDeparsing = task->taskQuery->data.jobQueryReferenceForLazyDeparsing; Query *jobQueryReferenceForLazyDeparsing =
Assert(task->taskQuery->queryType == TASK_QUERY_OBJECT && jobQueryReferenceForLazyDeparsing != task->taskQuery->data.jobQueryReferenceForLazyDeparsing;
NULL); Assert(task->taskQuery->queryType == TASK_QUERY_OBJECT &&
jobQueryReferenceForLazyDeparsing != NULL);
/* /*
@ -563,6 +573,10 @@ TaskQueryStringForAllPlacements(Task *task)
} }
/*
* TaskQueryStringForPlacement returns the query string that should be executed
* on the placement with the given placementIndex.
*/
char * char *
TaskQueryStringForPlacement(Task *task, int placementIndex) TaskQueryStringForPlacement(Task *task, int placementIndex)
{ {

View File

@ -399,7 +399,8 @@ RemoteExplain(Task *task, ExplainState *es)
RemoteExplainPlan *remotePlan = (RemoteExplainPlan *) palloc0( RemoteExplainPlan *remotePlan = (RemoteExplainPlan *) palloc0(
sizeof(RemoteExplainPlan)); sizeof(RemoteExplainPlan));
StringInfo explainQuery = BuildRemoteExplainQuery(TaskQueryStringForAllPlacements(task), StringInfo explainQuery = BuildRemoteExplainQuery(TaskQueryStringForAllPlacements(
task),
es); es);
/* /*

View File

@ -276,6 +276,12 @@ CopyNodeTaskQuery(COPYFUNC_ARGS)
break; break;
} }
case TASK_QUERY_TEXT_LIST:
{
COPY_NODE_FIELD(data.queryStringList);
break;
}
default: default:
{ {
break; break;
@ -293,7 +299,6 @@ CopyNodeTask(COPYFUNC_ARGS)
COPY_SCALAR_FIELD(jobId); COPY_SCALAR_FIELD(jobId);
COPY_SCALAR_FIELD(taskId); COPY_SCALAR_FIELD(taskId);
COPY_NODE_FIELD(taskQuery); COPY_NODE_FIELD(taskQuery);
COPY_NODE_FIELD(queryStringList);
COPY_SCALAR_FIELD(anchorDistributedTableId); COPY_SCALAR_FIELD(anchorDistributedTableId);
COPY_SCALAR_FIELD(anchorShardId); COPY_SCALAR_FIELD(anchorShardId);
COPY_NODE_FIELD(taskPlacementList); COPY_NODE_FIELD(taskPlacementList);

View File

@ -474,6 +474,38 @@ void OutTaskQuery(OUTFUNC_ARGS) {
WRITE_NODE_TYPE("TASKQUERY"); WRITE_NODE_TYPE("TASKQUERY");
WRITE_ENUM_FIELD(queryType, TaskQueryType); WRITE_ENUM_FIELD(queryType, TaskQueryType);
switch (node->queryType)
{
case TASK_QUERY_TEXT:
{
WRITE_STRING_FIELD(data.queryStringLazy);
break;
}
case TASK_QUERY_OBJECT:
{
WRITE_NODE_FIELD(data.jobQueryReferenceForLazyDeparsing);
break;
}
case TASK_QUERY_TEXT_PER_PLACEMENT:
{
WRITE_NODE_FIELD(data.perPlacementQueryStrings);
break;
}
case TASK_QUERY_TEXT_LIST:
{
WRITE_NODE_FIELD(data.queryStringList);
break;
}
default:
{
break;
}
}
} }
void void
@ -485,8 +517,7 @@ OutTask(OUTFUNC_ARGS)
WRITE_ENUM_FIELD(taskType, TaskType); WRITE_ENUM_FIELD(taskType, TaskType);
WRITE_UINT64_FIELD(jobId); WRITE_UINT64_FIELD(jobId);
WRITE_UINT_FIELD(taskId); WRITE_UINT_FIELD(taskId);
// WRITE_SCALAR_FIELD(taskQuery); WRITE_NODE_FIELD(taskQuery);
WRITE_NODE_FIELD(queryStringList);
WRITE_OID_FIELD(anchorDistributedTableId); WRITE_OID_FIELD(anchorDistributedTableId);
WRITE_UINT64_FIELD(anchorShardId); WRITE_UINT64_FIELD(anchorShardId);
WRITE_NODE_FIELD(taskPlacementList); WRITE_NODE_FIELD(taskPlacementList);

View File

@ -207,7 +207,8 @@ typedef enum TaskQueryType
{ {
TASK_QUERY_TEXT, TASK_QUERY_TEXT,
TASK_QUERY_OBJECT, TASK_QUERY_OBJECT,
TASK_QUERY_TEXT_PER_PLACEMENT TASK_QUERY_TEXT_PER_PLACEMENT,
TASK_QUERY_TEXT_LIST
} TaskQueryType; } TaskQueryType;
typedef struct TaskQuery typedef struct TaskQuery
@ -250,6 +251,14 @@ typedef struct TaskQuery
* perPlacementQueryStrings is used when we have different query strings for each placement. * perPlacementQueryStrings is used when we have different query strings for each placement.
*/ */
List *perPlacementQueryStrings; List *perPlacementQueryStrings;
/*
* queryStringList contains query strings. They should be
* run sequentially. The concatenated version of this list
* will already be set for queryStringLazy, this can be useful
* when we want to access each query string.
*/
List *queryStringList;
}data; }data;
}TaskQuery; }TaskQuery;
@ -262,14 +271,6 @@ typedef struct Task
TaskQuery *taskQuery; TaskQuery *taskQuery;
/*
* queryStringList contains query strings. They should be
* run sequentially. The concatenated version of this list
* will already be set for queryStringLazy, this can be useful
* when we want to access each query string.
*/
List *queryStringList;
Oid anchorDistributedTableId; /* only applies to insert tasks */ Oid anchorDistributedTableId; /* only applies to insert tasks */
uint64 anchorShardId; /* only applies to compute tasks */ uint64 anchorShardId; /* only applies to compute tasks */
List *taskPlacementList; /* only applies to compute tasks */ List *taskPlacementList; /* only applies to compute tasks */