From 8806c4d69793d7c474471187e05216a9dd458cc1 Mon Sep 17 00:00:00 2001 From: SaitTalhaNisanci Date: Tue, 31 Mar 2020 10:24:15 +0300 Subject: [PATCH] move queryStringList into taskQuery Also allocate task query in the memory context of task. --- .../distributed_intermediate_results.c | 3 +- .../executor/multi_task_tracker_executor.c | 3 +- .../distributed/planner/deparse_shard_query.c | 26 ++++++++++---- .../distributed/planner/multi_explain.c | 3 +- .../distributed/utils/citus_copyfuncs.c | 7 +++- .../distributed/utils/citus_outfuncs.c | 35 +++++++++++++++++-- .../distributed/multi_physical_planner.h | 19 +++++----- 7 files changed, 75 insertions(+), 21 deletions(-) diff --git a/src/backend/distributed/executor/distributed_intermediate_results.c b/src/backend/distributed/executor/distributed_intermediate_results.c index 71071f139..ddaad030e 100644 --- a/src/backend/distributed/executor/distributed_intermediate_results.c +++ b/src/backend/distributed/executor/distributed_intermediate_results.c @@ -221,7 +221,8 @@ WrapTasksForPartitioning(const char *resultIdPrefix, List *selectTaskList, shardPlacement->nodeId, quote_literal_cstr(taskPrefix), quote_literal_cstr(taskPrefix), - quote_literal_cstr(TaskQueryStringForAllPlacements(selectTask)), + quote_literal_cstr(TaskQueryStringForAllPlacements( + selectTask)), partitionColumnIndex, quote_literal_cstr(partitionMethodString), minValuesString->data, maxValuesString->data, diff --git a/src/backend/distributed/executor/multi_task_tracker_executor.c b/src/backend/distributed/executor/multi_task_tracker_executor.c index 8ebb2e5de..4d5b8ea17 100644 --- a/src/backend/distributed/executor/multi_task_tracker_executor.c +++ b/src/backend/distributed/executor/multi_task_tracker_executor.c @@ -1570,7 +1570,8 @@ TrackerQueueSqlTask(TaskTracker *taskTracker, Task *task) */ StringInfo sqlTaskQueryString = makeStringInfo(); - char *escapedTaskQueryString = quote_literal_cstr(TaskQueryStringForAllPlacements(task)); + char *escapedTaskQueryString = quote_literal_cstr(TaskQueryStringForAllPlacements( + task)); if (BinaryMasterCopyFormat) { diff --git a/src/backend/distributed/planner/deparse_shard_query.c b/src/backend/distributed/planner/deparse_shard_query.c index 286f7bf7b..464d71ac8 100644 --- a/src/backend/distributed/planner/deparse_shard_query.c +++ b/src/backend/distributed/planner/deparse_shard_query.c @@ -130,7 +130,8 @@ RebuildQueryStrings(Job *workerJob) task->parametersInQueryStringResolved = workerJob->parametersInJobQueryResolved; ereport(DEBUG4, (errmsg("query after rebuilding: %s", - ApplyLogRedaction(TaskQueryStringForAllPlacements(task))))); + ApplyLogRedaction(TaskQueryStringForAllPlacements( + task))))); } } @@ -459,7 +460,10 @@ InitializeTaskQueryIfNecessary(Task *task) { if (task->taskQuery == NULL) { + MemoryContext previousContext = MemoryContextSwitchTo(GetMemoryChunkContext( + task)); task->taskQuery = CitusMakeNode(TaskQuery); + MemoryContextSwitchTo(previousContext); } } @@ -483,8 +487,9 @@ SetTaskPerPlacementQueryStrings(Task *task, List *perPlacementQueryStringList) void SetTaskQueryStringList(Task *task, List *queryStringList) { - task->queryStringList = queryStringList; - SetTaskQueryString(task, StringJoin(queryStringList, ';')); + InitializeTaskQueryIfNecessary(task); + task->taskQuery->queryType = TASK_QUERY_TEXT_LIST; + task->taskQuery->data.queryStringList = queryStringList; } @@ -537,13 +542,18 @@ GetTaskQueryType(Task *task) char * TaskQueryStringForAllPlacements(Task *task) { + if (GetTaskQueryType(task) == TASK_QUERY_TEXT_LIST) + { + return StringJoin(task->taskQuery->data.queryStringList, ';'); + } if (GetTaskQueryType(task) == TASK_QUERY_TEXT) { return task->taskQuery->data.queryStringLazy; } - Query *jobQueryReferenceForLazyDeparsing = task->taskQuery->data.jobQueryReferenceForLazyDeparsing; - Assert(task->taskQuery->queryType == TASK_QUERY_OBJECT && jobQueryReferenceForLazyDeparsing != - NULL); + Query *jobQueryReferenceForLazyDeparsing = + task->taskQuery->data.jobQueryReferenceForLazyDeparsing; + Assert(task->taskQuery->queryType == TASK_QUERY_OBJECT && + jobQueryReferenceForLazyDeparsing != NULL); /* @@ -563,6 +573,10 @@ TaskQueryStringForAllPlacements(Task *task) } +/* + * TaskQueryStringForPlacement returns the query string that should be executed + * on the placement with the given placementIndex. + */ char * TaskQueryStringForPlacement(Task *task, int placementIndex) { diff --git a/src/backend/distributed/planner/multi_explain.c b/src/backend/distributed/planner/multi_explain.c index 95c295405..21a91951f 100644 --- a/src/backend/distributed/planner/multi_explain.c +++ b/src/backend/distributed/planner/multi_explain.c @@ -399,7 +399,8 @@ RemoteExplain(Task *task, ExplainState *es) RemoteExplainPlan *remotePlan = (RemoteExplainPlan *) palloc0( sizeof(RemoteExplainPlan)); - StringInfo explainQuery = BuildRemoteExplainQuery(TaskQueryStringForAllPlacements(task), + StringInfo explainQuery = BuildRemoteExplainQuery(TaskQueryStringForAllPlacements( + task), es); /* diff --git a/src/backend/distributed/utils/citus_copyfuncs.c b/src/backend/distributed/utils/citus_copyfuncs.c index becb8c87b..23cbdf349 100644 --- a/src/backend/distributed/utils/citus_copyfuncs.c +++ b/src/backend/distributed/utils/citus_copyfuncs.c @@ -276,6 +276,12 @@ CopyNodeTaskQuery(COPYFUNC_ARGS) break; } + case TASK_QUERY_TEXT_LIST: + { + COPY_NODE_FIELD(data.queryStringList); + break; + } + default: { break; @@ -293,7 +299,6 @@ CopyNodeTask(COPYFUNC_ARGS) COPY_SCALAR_FIELD(jobId); COPY_SCALAR_FIELD(taskId); COPY_NODE_FIELD(taskQuery); - COPY_NODE_FIELD(queryStringList); COPY_SCALAR_FIELD(anchorDistributedTableId); COPY_SCALAR_FIELD(anchorShardId); COPY_NODE_FIELD(taskPlacementList); diff --git a/src/backend/distributed/utils/citus_outfuncs.c b/src/backend/distributed/utils/citus_outfuncs.c index df6a16796..5ec14e9e4 100644 --- a/src/backend/distributed/utils/citus_outfuncs.c +++ b/src/backend/distributed/utils/citus_outfuncs.c @@ -474,6 +474,38 @@ void OutTaskQuery(OUTFUNC_ARGS) { WRITE_NODE_TYPE("TASKQUERY"); WRITE_ENUM_FIELD(queryType, TaskQueryType); + + switch (node->queryType) + { + case TASK_QUERY_TEXT: + { + WRITE_STRING_FIELD(data.queryStringLazy); + break; + } + + case TASK_QUERY_OBJECT: + { + WRITE_NODE_FIELD(data.jobQueryReferenceForLazyDeparsing); + break; + } + + case TASK_QUERY_TEXT_PER_PLACEMENT: + { + WRITE_NODE_FIELD(data.perPlacementQueryStrings); + break; + } + + case TASK_QUERY_TEXT_LIST: + { + WRITE_NODE_FIELD(data.queryStringList); + break; + } + + default: + { + break; + } + } } void @@ -485,8 +517,7 @@ OutTask(OUTFUNC_ARGS) WRITE_ENUM_FIELD(taskType, TaskType); WRITE_UINT64_FIELD(jobId); WRITE_UINT_FIELD(taskId); - // WRITE_SCALAR_FIELD(taskQuery); - WRITE_NODE_FIELD(queryStringList); + WRITE_NODE_FIELD(taskQuery); WRITE_OID_FIELD(anchorDistributedTableId); WRITE_UINT64_FIELD(anchorShardId); WRITE_NODE_FIELD(taskPlacementList); diff --git a/src/include/distributed/multi_physical_planner.h b/src/include/distributed/multi_physical_planner.h index 865194117..9106ab381 100644 --- a/src/include/distributed/multi_physical_planner.h +++ b/src/include/distributed/multi_physical_planner.h @@ -207,7 +207,8 @@ typedef enum TaskQueryType { TASK_QUERY_TEXT, TASK_QUERY_OBJECT, - TASK_QUERY_TEXT_PER_PLACEMENT + TASK_QUERY_TEXT_PER_PLACEMENT, + TASK_QUERY_TEXT_LIST } TaskQueryType; typedef struct TaskQuery @@ -250,6 +251,14 @@ typedef struct TaskQuery * perPlacementQueryStrings is used when we have different query strings for each placement. */ List *perPlacementQueryStrings; + + /* + * queryStringList contains query strings. They should be + * run sequentially. The concatenated version of this list + * will already be set for queryStringLazy, this can be useful + * when we want to access each query string. + */ + List *queryStringList; }data; }TaskQuery; @@ -262,14 +271,6 @@ typedef struct Task TaskQuery *taskQuery; - /* - * queryStringList contains query strings. They should be - * run sequentially. The concatenated version of this list - * will already be set for queryStringLazy, this can be useful - * when we want to access each query string. - */ - List *queryStringList; - Oid anchorDistributedTableId; /* only applies to insert tasks */ uint64 anchorShardId; /* only applies to compute tasks */ List *taskPlacementList; /* only applies to compute tasks */