From b5591b1b284aeed0f0171e3da17b9244ef8d6542 Mon Sep 17 00:00:00 2001 From: SaitTalhaNisanci Date: Tue, 31 Mar 2020 12:16:32 +0300 Subject: [PATCH] use taskQuery as a struct to simplify the code --- .../distributed/executor/local_executor.c | 1 + .../executor/multi_task_tracker_executor.c | 5 +- .../distributed/planner/deparse_shard_query.c | 52 ++++++------------- .../distributed/utils/citus_copyfuncs.c | 21 ++++---- .../distributed/utils/citus_nodefuncs.c | 4 +- .../distributed/utils/citus_outfuncs.c | 21 ++++---- src/include/distributed/citus_nodefuncs.h | 1 - src/include/distributed/citus_nodes.h | 3 +- .../distributed/multi_physical_planner.h | 7 ++- 9 files changed, 45 insertions(+), 70 deletions(-) diff --git a/src/backend/distributed/executor/local_executor.c b/src/backend/distributed/executor/local_executor.c index 9d4ce0ab9..f90bd43f6 100644 --- a/src/backend/distributed/executor/local_executor.c +++ b/src/backend/distributed/executor/local_executor.c @@ -227,6 +227,7 @@ ExecuteLocalTaskList(CitusScanState *scanState, List *taskList) } else { + /* avoid the overhead of deparsing when using local execution */ shardQueryString = ""; } diff --git a/src/backend/distributed/executor/multi_task_tracker_executor.c b/src/backend/distributed/executor/multi_task_tracker_executor.c index 4d5b8ea17..4ece5b34d 100644 --- a/src/backend/distributed/executor/multi_task_tracker_executor.c +++ b/src/backend/distributed/executor/multi_task_tracker_executor.c @@ -1606,9 +1606,8 @@ TrackerQueueTask(TaskTracker *taskTracker, Task *task) HTAB *taskStateHash = taskTracker->taskStateHash; /* wrap a task assignment query outside the original query */ - StringInfo taskAssignmentQuery = TaskAssignmentQuery(task, - TaskQueryStringForAllPlacements( - task)); + StringInfo taskAssignmentQuery = + TaskAssignmentQuery(task, TaskQueryStringForAllPlacements(task)); TrackerTaskState *taskState = TaskStateHashEnter(taskStateHash, task->jobId, task->taskId); diff --git a/src/backend/distributed/planner/deparse_shard_query.c b/src/backend/distributed/planner/deparse_shard_query.c index 464d71ac8..8f271e042 100644 --- a/src/backend/distributed/planner/deparse_shard_query.c +++ b/src/backend/distributed/planner/deparse_shard_query.c @@ -42,7 +42,6 @@ static void ConvertRteToSubqueryWithEmptyResult(RangeTblEntry *rte); static bool ShouldLazyDeparseQuery(Task *task); static char * DeparseTaskQuery(Task *task, Query *query); static bool IsEachPlacementQueryStringDifferent(Task *task); -static void InitializeTaskQueryIfNecessary(Task *task); /* @@ -427,9 +426,8 @@ SetTaskQueryIfShouldLazyDeparse(Task *task, Query *query) { if (ShouldLazyDeparseQuery(task)) { - InitializeTaskQueryIfNecessary(task); - task->taskQuery->queryType = TASK_QUERY_OBJECT; - task->taskQuery->data.jobQueryReferenceForLazyDeparsing = query; + task->taskQuery.queryType = TASK_QUERY_OBJECT; + task->taskQuery.data.jobQueryReferenceForLazyDeparsing = query; return; } @@ -445,26 +443,8 @@ SetTaskQueryIfShouldLazyDeparse(Task *task, Query *query) void SetTaskQueryString(Task *task, char *queryString) { - InitializeTaskQueryIfNecessary(task); - task->taskQuery->queryType = TASK_QUERY_TEXT; - task->taskQuery->data.queryStringLazy = queryString; -} - - -/* - * InitializeTaskQueryIfNecessary initializes task query if it - * is not yet allocated. - */ -static void -InitializeTaskQueryIfNecessary(Task *task) -{ - if (task->taskQuery == NULL) - { - MemoryContext previousContext = MemoryContextSwitchTo(GetMemoryChunkContext( - task)); - task->taskQuery = CitusMakeNode(TaskQuery); - MemoryContextSwitchTo(previousContext); - } + task->taskQuery.queryType = TASK_QUERY_TEXT; + task->taskQuery.data.queryStringLazy = queryString; } @@ -474,10 +454,9 @@ InitializeTaskQueryIfNecessary(Task *task) void SetTaskPerPlacementQueryStrings(Task *task, List *perPlacementQueryStringList) { - InitializeTaskQueryIfNecessary(task); Assert(perPlacementQueryStringList != NIL); - task->taskQuery->queryType = TASK_QUERY_TEXT_PER_PLACEMENT; - task->taskQuery->data.perPlacementQueryStrings = perPlacementQueryStringList; + task->taskQuery.queryType = TASK_QUERY_TEXT_PER_PLACEMENT; + task->taskQuery.data.perPlacementQueryStrings = perPlacementQueryStringList; } @@ -487,9 +466,8 @@ SetTaskPerPlacementQueryStrings(Task *task, List *perPlacementQueryStringList) void SetTaskQueryStringList(Task *task, List *queryStringList) { - InitializeTaskQueryIfNecessary(task); - task->taskQuery->queryType = TASK_QUERY_TEXT_LIST; - task->taskQuery->data.queryStringList = queryStringList; + task->taskQuery.queryType = TASK_QUERY_TEXT_LIST; + task->taskQuery.data.queryStringList = queryStringList; } @@ -528,7 +506,7 @@ DeparseTaskQuery(Task *task, Query *query) int GetTaskQueryType(Task *task) { - return task->taskQuery->queryType; + return task->taskQuery.queryType; } @@ -544,15 +522,15 @@ TaskQueryStringForAllPlacements(Task *task) { if (GetTaskQueryType(task) == TASK_QUERY_TEXT_LIST) { - return StringJoin(task->taskQuery->data.queryStringList, ';'); + return StringJoin(task->taskQuery.data.queryStringList, ';'); } if (GetTaskQueryType(task) == TASK_QUERY_TEXT) { - return task->taskQuery->data.queryStringLazy; + return task->taskQuery.data.queryStringLazy; } Query *jobQueryReferenceForLazyDeparsing = - task->taskQuery->data.jobQueryReferenceForLazyDeparsing; - Assert(task->taskQuery->queryType == TASK_QUERY_OBJECT && + task->taskQuery.data.jobQueryReferenceForLazyDeparsing; + Assert(task->taskQuery.queryType == TASK_QUERY_OBJECT && jobQueryReferenceForLazyDeparsing != NULL); @@ -569,7 +547,7 @@ TaskQueryStringForAllPlacements(Task *task) char *queryString = DeparseTaskQuery(task, jobQueryReferenceForLazyDeparsing); MemoryContextSwitchTo(previousContext); SetTaskQueryString(task, queryString); - return task->taskQuery->data.queryStringLazy; + return task->taskQuery.data.queryStringLazy; } @@ -583,7 +561,7 @@ TaskQueryStringForPlacement(Task *task, int placementIndex) if (IsEachPlacementQueryStringDifferent(task)) { List *perPlacementQueryStringList = - task->taskQuery->data.perPlacementQueryStrings; + task->taskQuery.data.perPlacementQueryStrings; Assert(list_length(perPlacementQueryStringList) > placementIndex); return list_nth(perPlacementQueryStringList, placementIndex); } diff --git a/src/backend/distributed/utils/citus_copyfuncs.c b/src/backend/distributed/utils/citus_copyfuncs.c index 23cbdf349..1aa27b5bf 100644 --- a/src/backend/distributed/utils/citus_copyfuncs.c +++ b/src/backend/distributed/utils/citus_copyfuncs.c @@ -73,6 +73,7 @@ CitusSetTag(Node *node, int tag) } \ while (0) +static void CopyTaskQuery(Task *newnode, Task *from); static void copyJobInfo(Job *newnode, Job *from) @@ -250,35 +251,33 @@ CopyNodeRelationRowLock(COPYFUNC_ARGS) } -void -CopyNodeTaskQuery(COPYFUNC_ARGS) +static void +CopyTaskQuery(Task *newnode, Task *from) { - DECLARE_FROM_AND_NEW_NODE(TaskQuery); - COPY_SCALAR_FIELD(queryType); - - switch (from->queryType) + COPY_SCALAR_FIELD(taskQuery.queryType); + switch (from->taskQuery.queryType) { case TASK_QUERY_TEXT: { - COPY_STRING_FIELD(data.queryStringLazy); + COPY_STRING_FIELD(taskQuery.data.queryStringLazy); break; } case TASK_QUERY_OBJECT: { - COPY_NODE_FIELD(data.jobQueryReferenceForLazyDeparsing); + COPY_NODE_FIELD(taskQuery.data.jobQueryReferenceForLazyDeparsing); break; } case TASK_QUERY_TEXT_PER_PLACEMENT: { - COPY_NODE_FIELD(data.perPlacementQueryStrings); + COPY_NODE_FIELD(taskQuery.data.perPlacementQueryStrings); break; } case TASK_QUERY_TEXT_LIST: { - COPY_NODE_FIELD(data.queryStringList); + COPY_NODE_FIELD(taskQuery.data.queryStringList); break; } @@ -298,7 +297,7 @@ CopyNodeTask(COPYFUNC_ARGS) COPY_SCALAR_FIELD(taskType); COPY_SCALAR_FIELD(jobId); COPY_SCALAR_FIELD(taskId); - COPY_NODE_FIELD(taskQuery); + CopyTaskQuery(newnode, from); COPY_SCALAR_FIELD(anchorDistributedTableId); COPY_SCALAR_FIELD(anchorShardId); COPY_NODE_FIELD(taskPlacementList); diff --git a/src/backend/distributed/utils/citus_nodefuncs.c b/src/backend/distributed/utils/citus_nodefuncs.c index ec225a826..ebb34f5e4 100644 --- a/src/backend/distributed/utils/citus_nodefuncs.c +++ b/src/backend/distributed/utils/citus_nodefuncs.c @@ -44,8 +44,7 @@ static const char *CitusNodeTagNamesD[] = { "RelationShard", "RelationRowLock", "DeferredErrorMessage", - "GroupShardPlacement", - "TaskQuery", + "GroupShardPlacement" }; const char **CitusNodeTagNames = CitusNodeTagNamesD; @@ -402,7 +401,6 @@ const ExtensibleNodeMethods nodeMethods[] = DEFINE_NODE_METHODS(TaskExecution), DEFINE_NODE_METHODS(DeferredErrorMessage), DEFINE_NODE_METHODS(GroupShardPlacement), - DEFINE_NODE_METHODS(TaskQuery), /* nodes with only output support */ DEFINE_NODE_METHODS_NO_READ(MultiNode), diff --git a/src/backend/distributed/utils/citus_outfuncs.c b/src/backend/distributed/utils/citus_outfuncs.c index 5ec14e9e4..1f51db70d 100644 --- a/src/backend/distributed/utils/citus_outfuncs.c +++ b/src/backend/distributed/utils/citus_outfuncs.c @@ -135,7 +135,7 @@ #define booltostr(x) ((x) ? "true" : "false") - +static void WriteTaskQuery(OUTFUNC_ARGS); /***************************************************************************** * Output routines for Citus node types @@ -469,35 +469,34 @@ OutRelationRowLock(OUTFUNC_ARGS) WRITE_ENUM_FIELD(rowLockStrength, LockClauseStrength); } -void OutTaskQuery(OUTFUNC_ARGS) { - WRITE_LOCALS(TaskQuery); - WRITE_NODE_TYPE("TASKQUERY"); +static void WriteTaskQuery(OUTFUNC_ARGS) { + WRITE_LOCALS(Task); - WRITE_ENUM_FIELD(queryType, TaskQueryType); + WRITE_ENUM_FIELD(taskQuery.queryType, TaskQueryType); - switch (node->queryType) + switch (node->taskQuery.queryType) { case TASK_QUERY_TEXT: { - WRITE_STRING_FIELD(data.queryStringLazy); + WRITE_STRING_FIELD(taskQuery.data.queryStringLazy); break; } case TASK_QUERY_OBJECT: { - WRITE_NODE_FIELD(data.jobQueryReferenceForLazyDeparsing); + WRITE_NODE_FIELD(taskQuery.data.jobQueryReferenceForLazyDeparsing); break; } case TASK_QUERY_TEXT_PER_PLACEMENT: { - WRITE_NODE_FIELD(data.perPlacementQueryStrings); + WRITE_NODE_FIELD(taskQuery.data.perPlacementQueryStrings); break; } case TASK_QUERY_TEXT_LIST: { - WRITE_NODE_FIELD(data.queryStringList); + WRITE_NODE_FIELD(taskQuery.data.queryStringList); break; } @@ -517,7 +516,7 @@ OutTask(OUTFUNC_ARGS) WRITE_ENUM_FIELD(taskType, TaskType); WRITE_UINT64_FIELD(jobId); WRITE_UINT_FIELD(taskId); - WRITE_NODE_FIELD(taskQuery); + WriteTaskQuery(str, raw_node); WRITE_OID_FIELD(anchorDistributedTableId); WRITE_UINT64_FIELD(anchorShardId); WRITE_NODE_FIELD(taskPlacementList); diff --git a/src/include/distributed/citus_nodefuncs.h b/src/include/distributed/citus_nodefuncs.h index 2e565a1be..d25f6222c 100644 --- a/src/include/distributed/citus_nodefuncs.h +++ b/src/include/distributed/citus_nodefuncs.h @@ -47,7 +47,6 @@ extern void OutMapMergeJob(OUTFUNC_ARGS); extern void OutShardPlacement(OUTFUNC_ARGS); extern void OutRelationShard(OUTFUNC_ARGS); extern void OutRelationRowLock(OUTFUNC_ARGS); -extern void OutTaskQuery(OUTFUNC_ARGS); extern void OutTask(OUTFUNC_ARGS); extern void OutLocalPlannedStatement(OUTFUNC_ARGS); extern void OutTaskExecution(OUTFUNC_ARGS); diff --git a/src/include/distributed/citus_nodes.h b/src/include/distributed/citus_nodes.h index 4c39d4d59..27fc8d7b0 100644 --- a/src/include/distributed/citus_nodes.h +++ b/src/include/distributed/citus_nodes.h @@ -66,8 +66,7 @@ typedef enum CitusNodeTag T_RelationShard, T_RelationRowLock, T_DeferredErrorMessage, - T_GroupShardPlacement, - T_TaskQuery + T_GroupShardPlacement } CitusNodeTag; diff --git a/src/include/distributed/multi_physical_planner.h b/src/include/distributed/multi_physical_planner.h index 9106ab381..74c6e8be0 100644 --- a/src/include/distributed/multi_physical_planner.h +++ b/src/include/distributed/multi_physical_planner.h @@ -213,7 +213,6 @@ typedef enum TaskQueryType typedef struct TaskQuery { - CitusNode type; TaskQueryType queryType; union @@ -269,7 +268,11 @@ typedef struct Task uint64 jobId; uint32 taskId; - TaskQuery *taskQuery; + /* + * taskQuery contains query string information. The way we get queryString can be different + * so this is abstracted with taskQuery. + */ + TaskQuery taskQuery; Oid anchorDistributedTableId; /* only applies to insert tasks */ uint64 anchorShardId; /* only applies to compute tasks */