From 982b5fbabf05b1a811ad80484426333b89ab573f Mon Sep 17 00:00:00 2001 From: SaitTalhaNisanci Date: Thu, 26 Mar 2020 13:21:31 +0300 Subject: [PATCH 1/5] add SetTaskPerPlacementStrings It is possible that a task will have different query string for each placement. This is the case in INSERT..SELECT via repartitioning. When we are setting task->perPlacementQueryString, we should set queryStringLazy to NULL. Therefore a method for that purpose is created. --- .../distributed_intermediate_results.c | 4 +--- .../distributed/planner/deparse_shard_query.c | 22 ++++++++++++++++--- src/include/distributed/deparse_shard_query.h | 1 + 3 files changed, 21 insertions(+), 6 deletions(-) diff --git a/src/backend/distributed/executor/distributed_intermediate_results.c b/src/backend/distributed/executor/distributed_intermediate_results.c index ee11c03b7..7e9d47218 100644 --- a/src/backend/distributed/executor/distributed_intermediate_results.c +++ b/src/backend/distributed/executor/distributed_intermediate_results.c @@ -228,9 +228,7 @@ WrapTasksForPartitioning(const char *resultIdPrefix, List *selectTaskList, binaryFormatString); perPlacementQueries = lappend(perPlacementQueries, wrappedQuery->data); } - - SetTaskQueryString(selectTask, NULL); - selectTask->perPlacementQueryStrings = perPlacementQueries; + SetTaskPerPlacementQueryStrings(selectTask, perPlacementQueries); } } diff --git a/src/backend/distributed/planner/deparse_shard_query.c b/src/backend/distributed/planner/deparse_shard_query.c index 3651da73a..69d3012bc 100644 --- a/src/backend/distributed/planner/deparse_shard_query.c +++ b/src/backend/distributed/planner/deparse_shard_query.c @@ -41,6 +41,7 @@ static void UpdateTaskQueryString(Query *query, Oid distributedTableId, static void ConvertRteToSubqueryWithEmptyResult(RangeTblEntry *rte); static bool ShouldLazyDeparseQuery(Task *task); static char * DeparseTaskQuery(Task *task, Query *query); +static bool IsEachPlacementQueryStringDifferent(Task* task); /* @@ -426,9 
+427,7 @@ SetTaskQuery(Task *task, Query *query) task->queryStringLazy = NULL; return; } - - task->queryForLocalExecution = NULL; - task->queryStringLazy = DeparseTaskQuery(task, query); + SetTaskQueryString(task, DeparseTaskQuery(task,query)); } @@ -444,6 +443,15 @@ SetTaskQueryString(Task *task, char *queryString) task->queryStringLazy = queryString; } +/* + * SetTaskPerPlacementQueryStrings set the perPlacementQueryString for the given task. + */ +void SetTaskPerPlacementQueryStrings(Task *task, List* perPlacementQueryStringList) { + Assert(perPlacementQueryStringList != NIL); + task->perPlacementQueryStrings = perPlacementQueryStringList; + SetTaskQueryString(task, NULL); +} + /* * SetTaskQueryStringList sets the queryStringList of the given task. @@ -516,3 +524,11 @@ TaskQueryString(Task *task) MemoryContextSwitchTo(previousContext); return task->queryStringLazy; } + +/* + * IsEachPlacementQueryStringDifferent returns true if each placement + * has a different query string. + */ +static bool IsEachPlacementQueryStringDifferent(Task* task) { + return list_length(task->perPlacementQueryStrings) > 0; +} \ No newline at end of file diff --git a/src/include/distributed/deparse_shard_query.h b/src/include/distributed/deparse_shard_query.h index dc340821b..95baeb6e4 100644 --- a/src/include/distributed/deparse_shard_query.h +++ b/src/include/distributed/deparse_shard_query.h @@ -26,6 +26,7 @@ extern bool UpdateRelationToShardNames(Node *node, List *relationShardList); extern void SetTaskQuery(Task *task, Query *query); extern void SetTaskQueryString(Task *task, char *queryString); extern void SetTaskQueryStringList(Task *task, List *queryStringList); +extern void SetTaskPerPlacementQueryStrings(Task *task, List* perPlacementQueryStringList) extern char * TaskQueryString(Task *task); extern bool UpdateRelationsToLocalShardTables(Node *node, List *relationShardList); From 98f95e2a5e47befe37bc357d4022d4d67aeee7aa Mon Sep 17 00:00:00 2001 From: SaitTalhaNisanci Date: 
Thu, 26 Mar 2020 13:32:59 +0300 Subject: [PATCH 2/5] add TaskQueryStringForPlacement TaskQueryStringForPlacement simplifies how the executor gets the query string for a given placement. Task will use the necessary fields to return the correct query string for the placement. Executor doesn't need to know the details for this. rename TaskQueryString as TaskQueryStringAllPlacements TaskQueryString returns the query string that will be the same for all the placements. In INSERT..SELECT the query string can be different for each placement. Adaptive executor uses TaskQueryStringForPlacement, which returns the query string for a placement. It makes sense to rename TaskQueryString as TaskQueryStringAllPlacements as it is returning the query string for all placements. rename SetTaskQuery as SetTaskQueryIfShouldLazyDeparse SetTaskQuery does not always set the task query. It can set the query string as well. So it is clearer to name it SetTaskQueryIfShouldLazyDeparse, since it will set the query, not the query string, only when we should deparse the query in a lazy way. 
--- .../distributed/executor/adaptive_executor.c | 15 +---- .../distributed_intermediate_results.c | 2 +- .../executor/insert_select_executor.c | 2 +- .../distributed/executor/local_executor.c | 6 +- .../executor/multi_task_tracker_executor.c | 13 +++-- .../master/master_delete_protocol.c | 3 +- .../distributed/planner/deparse_shard_query.c | 57 ++++++++++++++----- .../planner/function_call_delegation.c | 2 +- .../distributed/planner/multi_explain.c | 3 +- .../planner/multi_physical_planner.c | 2 +- .../planner/multi_router_planner.c | 4 +- src/include/distributed/deparse_shard_query.h | 8 ++- .../distributed/multi_physical_planner.h | 6 +- 13 files changed, 76 insertions(+), 47 deletions(-) diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c index 1cda964ba..79e181c7b 100644 --- a/src/backend/distributed/executor/adaptive_executor.c +++ b/src/backend/distributed/executor/adaptive_executor.c @@ -3196,18 +3196,9 @@ StartPlacementExecutionOnSession(TaskPlacementExecution *placementExecution, List *placementAccessList = PlacementAccessListForTask(task, taskPlacement); int querySent = 0; - char *queryString = NULL; - if (list_length(task->perPlacementQueryStrings) == 0) - { - queryString = TaskQueryString(task); - } - else - { - Assert(list_length(task->taskPlacementList) == list_length( - task->perPlacementQueryStrings)); - queryString = list_nth(task->perPlacementQueryStrings, - placementExecution->placementExecutionIndex); - } + char *queryString = TaskQueryStringForPlacement(task, + placementExecution-> + placementExecutionIndex); if (execution->transactionProperties->useRemoteTransactionBlocks != TRANSACTION_BLOCKS_DISALLOWED) diff --git a/src/backend/distributed/executor/distributed_intermediate_results.c b/src/backend/distributed/executor/distributed_intermediate_results.c index 7e9d47218..06614592c 100644 --- a/src/backend/distributed/executor/distributed_intermediate_results.c +++ 
b/src/backend/distributed/executor/distributed_intermediate_results.c @@ -221,7 +221,7 @@ WrapTasksForPartitioning(const char *resultIdPrefix, List *selectTaskList, shardPlacement->nodeId, quote_literal_cstr(taskPrefix), quote_literal_cstr(taskPrefix), - quote_literal_cstr(TaskQueryString(selectTask)), + quote_literal_cstr(TaskQueryStringAllPlacements(selectTask)), partitionColumnIndex, quote_literal_cstr(partitionMethodString), minValuesString->data, maxValuesString->data, diff --git a/src/backend/distributed/executor/insert_select_executor.c b/src/backend/distributed/executor/insert_select_executor.c index 4b7c7cc32..809d689ae 100644 --- a/src/backend/distributed/executor/insert_select_executor.c +++ b/src/backend/distributed/executor/insert_select_executor.c @@ -1091,7 +1091,7 @@ WrapTaskListForProjection(List *taskList, List *projectedTargetEntries) StringInfo wrappedQuery = makeStringInfo(); appendStringInfo(wrappedQuery, "SELECT %s FROM (%s) subquery", projectedColumnsString->data, - TaskQueryString(task)); + TaskQueryStringAllPlacements(task)); SetTaskQueryString(task, wrappedQuery->data); } } diff --git a/src/backend/distributed/executor/local_executor.c b/src/backend/distributed/executor/local_executor.c index 490028c5b..a15e92849 100644 --- a/src/backend/distributed/executor/local_executor.c +++ b/src/backend/distributed/executor/local_executor.c @@ -201,7 +201,7 @@ ExecuteLocalTaskList(CitusScanState *scanState, List *taskList) taskParameterTypes = NULL; } - Query *shardQuery = ParseQueryString(TaskQueryString(task), + Query *shardQuery = ParseQueryString(TaskQueryStringAllPlacements(task), taskParameterTypes, taskNumParams); @@ -302,7 +302,7 @@ ExecuteLocalUtilityTaskList(List *localTaskList) foreach_ptr(localTask, localTaskList) { - const char *localTaskQueryCommand = TaskQueryString(localTask); + const char *localTaskQueryCommand = TaskQueryStringAllPlacements(localTask); /* we do not expect tasks with INVALID_SHARD_ID for utility commands */ 
Assert(localTask->anchorShardId != INVALID_SHARD_ID); @@ -390,7 +390,7 @@ LogLocalCommand(Task *task) } ereport(NOTICE, (errmsg("executing the command locally: %s", - ApplyLogRedaction(TaskQueryString(task))))); + ApplyLogRedaction(TaskQueryStringAllPlacements(task))))); } diff --git a/src/backend/distributed/executor/multi_task_tracker_executor.c b/src/backend/distributed/executor/multi_task_tracker_executor.c index 5dd0db6ea..26a8bc432 100644 --- a/src/backend/distributed/executor/multi_task_tracker_executor.c +++ b/src/backend/distributed/executor/multi_task_tracker_executor.c @@ -1570,7 +1570,7 @@ TrackerQueueSqlTask(TaskTracker *taskTracker, Task *task) */ StringInfo sqlTaskQueryString = makeStringInfo(); - char *escapedTaskQueryString = quote_literal_cstr(TaskQueryString(task)); + char *escapedTaskQueryString = quote_literal_cstr(TaskQueryStringAllPlacements(task)); if (BinaryMasterCopyFormat) { @@ -1605,7 +1605,9 @@ TrackerQueueTask(TaskTracker *taskTracker, Task *task) HTAB *taskStateHash = taskTracker->taskStateHash; /* wrap a task assignment query outside the original query */ - StringInfo taskAssignmentQuery = TaskAssignmentQuery(task, TaskQueryString(task)); + StringInfo taskAssignmentQuery = TaskAssignmentQuery(task, + TaskQueryStringAllPlacements( + task)); TrackerTaskState *taskState = TaskStateHashEnter(taskStateHash, task->jobId, task->taskId); @@ -2742,7 +2744,7 @@ TrackerHashCleanupJob(HTAB *taskTrackerHash, Task *jobCleanupTask) { /* assign through task tracker to manage resource utilization */ StringInfo jobCleanupQuery = TaskAssignmentQuery( - jobCleanupTask, TaskQueryString(jobCleanupTask)); + jobCleanupTask, TaskQueryStringAllPlacements(jobCleanupTask)); jobCleanupQuerySent = MultiClientSendQuery(taskTracker->connectionId, jobCleanupQuery->data); @@ -2821,7 +2823,7 @@ TrackerHashCleanupJob(HTAB *taskTrackerHash, Task *jobCleanupTask) nodeName, nodePort, (int) queryStatus), errhint("Manually clean job resources on node " "\"%s:%u\" by running 
\"%s\" ", nodeName, - nodePort, TaskQueryString( + nodePort, TaskQueryStringAllPlacements( jobCleanupTask)))); } else @@ -2840,7 +2842,8 @@ TrackerHashCleanupJob(HTAB *taskTrackerHash, Task *jobCleanupTask) nodePort, (int) resultStatus), errhint("Manually clean job resources on node " "\"%s:%u\" by running \"%s\" ", nodeName, - nodePort, TaskQueryString(jobCleanupTask)))); + nodePort, TaskQueryStringAllPlacements( + jobCleanupTask)))); } else { diff --git a/src/backend/distributed/master/master_delete_protocol.c b/src/backend/distributed/master/master_delete_protocol.c index 397e1459e..77c119a59 100644 --- a/src/backend/distributed/master/master_delete_protocol.c +++ b/src/backend/distributed/master/master_delete_protocol.c @@ -447,7 +447,8 @@ DropShards(Oid relationId, char *schemaName, char *relationName, * connect to that node to drop the shard placement over that * remote connection. */ - const char *dropShardPlacementCommand = TaskQueryString(task); + const char *dropShardPlacementCommand = TaskQueryStringAllPlacements( + task); ExecuteDropShardPlacementCommandRemotely(shardPlacement, relationName, dropShardPlacementCommand); diff --git a/src/backend/distributed/planner/deparse_shard_query.c b/src/backend/distributed/planner/deparse_shard_query.c index 69d3012bc..a657b7809 100644 --- a/src/backend/distributed/planner/deparse_shard_query.c +++ b/src/backend/distributed/planner/deparse_shard_query.c @@ -41,7 +41,8 @@ static void UpdateTaskQueryString(Query *query, Oid distributedTableId, static void ConvertRteToSubqueryWithEmptyResult(RangeTblEntry *rte); static bool ShouldLazyDeparseQuery(Task *task); static char * DeparseTaskQuery(Task *task, Query *query); -static bool IsEachPlacementQueryStringDifferent(Task* task); +static bool IsEachPlacementQueryStringDifferent(Task *task); +static char * TaskQueryStringForAllPlacements(Task *task); /* @@ -116,7 +117,8 @@ RebuildQueryStrings(Job *workerJob) task->queryForLocalExecution == NULL && task->queryStringLazy == 
NULL ? "(null)" - : ApplyLogRedaction(TaskQueryString(task))))); + : ApplyLogRedaction(TaskQueryStringAllPlacements( + task))))); UpdateTaskQueryString(query, relationId, valuesRTE, task); @@ -127,7 +129,7 @@ RebuildQueryStrings(Job *workerJob) task->parametersInQueryStringResolved = workerJob->parametersInJobQueryResolved; ereport(DEBUG4, (errmsg("query after rebuilding: %s", - ApplyLogRedaction(TaskQueryString(task))))); + ApplyLogRedaction(TaskQueryStringAllPlacements(task))))); } } @@ -182,7 +184,7 @@ UpdateTaskQueryString(Query *query, Oid distributedTableId, RangeTblEntry *value task->anchorDistributedTableId = distributedTableId; } - SetTaskQuery(task, query); + SetTaskQueryIfShouldLazyDeparse(task, query); if (valuesRTE != NULL) { @@ -413,13 +415,13 @@ ShouldLazyDeparseQuery(Task *task) /* - * SetTaskQuery attaches the query to the task so that it can be used during + * SetTaskQueryIfShouldLazyDeparse attaches the query to the task so that it can be used during * execution. If local execution can possibly take place it sets task->queryForLocalExecution. * If not it deparses the query and sets queryStringLazy, to avoid blowing the * size of the task unnecesarily. */ void -SetTaskQuery(Task *task, Query *query) +SetTaskQueryIfShouldLazyDeparse(Task *task, Query *query) { if (ShouldLazyDeparseQuery(task)) { @@ -427,7 +429,7 @@ SetTaskQuery(Task *task, Query *query) task->queryStringLazy = NULL; return; } - SetTaskQueryString(task, DeparseTaskQuery(task,query)); + SetTaskQueryString(task, DeparseTaskQuery(task, query)); } @@ -443,10 +445,13 @@ SetTaskQueryString(Task *task, char *queryString) task->queryStringLazy = queryString; } + /* * SetTaskPerPlacementQueryStrings set the perPlacementQueryString for the given task. 
*/ -void SetTaskPerPlacementQueryStrings(Task *task, List* perPlacementQueryStringList) { +void +SetTaskPerPlacementQueryStrings(Task *task, List *perPlacementQueryStringList) +{ Assert(perPlacementQueryStringList != NIL); task->perPlacementQueryStrings = perPlacementQueryStringList; SetTaskQueryString(task, NULL); @@ -494,14 +499,25 @@ DeparseTaskQuery(Task *task, Query *query) /* - * TaskQueryString generates task->queryStringLazy if missing. + * TaskQueryStringAllPlacements generates task->queryStringLazy if missing. * * For performance reasons, the queryString is generated lazily. For example * for local queries it is usually not needed to generate it, so this way we * can skip the expensive deparsing+parsing. */ char * -TaskQueryString(Task *task) +TaskQueryStringAllPlacements(Task *task) +{ + return TaskQueryStringForAllPlacements(task); +} + + +/* + * TaskQueryStringForAllPlacements returns the query string for the given task. + * In this case, each placement has the same query string. + */ +static char * +TaskQueryStringForAllPlacements(Task *task) { if (task->queryStringLazy != NULL) { @@ -520,15 +536,30 @@ TaskQueryString(Task *task) */ MemoryContext previousContext = MemoryContextSwitchTo(GetMemoryChunkContext( task->queryForLocalExecution)); - task->queryStringLazy = DeparseTaskQuery(task, task->queryForLocalExecution); + SetTaskQueryString(task, DeparseTaskQuery(task, task->queryForLocalExecution)); MemoryContextSwitchTo(previousContext); return task->queryStringLazy; } + +char * +TaskQueryStringForPlacement(Task *task, int placementIndex) +{ + if (IsEachPlacementQueryStringDifferent(task)) + { + Assert(list_length(task->perPlacementQueryStrings) > placementIndex); + return list_nth(task->perPlacementQueryStrings, placementIndex); + } + return TaskQueryStringForAllPlacements(task); +} + + /* * IsEachPlacementQueryStringDifferent returns true if each placement * has a different query string. 
*/ -static bool IsEachPlacementQueryStringDifferent(Task* task) { +static bool +IsEachPlacementQueryStringDifferent(Task *task) +{ return list_length(task->perPlacementQueryStrings) > 0; -} \ No newline at end of file +} diff --git a/src/backend/distributed/planner/function_call_delegation.c b/src/backend/distributed/planner/function_call_delegation.c index a521b36ab..69e711560 100644 --- a/src/backend/distributed/planner/function_call_delegation.c +++ b/src/backend/distributed/planner/function_call_delegation.c @@ -367,7 +367,7 @@ TryToDelegateFunctionCall(DistributedPlanningContext *planContext) task = CitusMakeNode(Task); task->taskType = SELECT_TASK; task->taskPlacementList = placementList; - SetTaskQuery(task, planContext->query); + SetTaskQueryIfShouldLazyDeparse(task, planContext->query); task->anchorShardId = shardInterval->shardId; task->replicationModel = distTable->replicationModel; diff --git a/src/backend/distributed/planner/multi_explain.c b/src/backend/distributed/planner/multi_explain.c index cc36677f7..c5a237f22 100644 --- a/src/backend/distributed/planner/multi_explain.c +++ b/src/backend/distributed/planner/multi_explain.c @@ -399,7 +399,8 @@ RemoteExplain(Task *task, ExplainState *es) RemoteExplainPlan *remotePlan = (RemoteExplainPlan *) palloc0( sizeof(RemoteExplainPlan)); - StringInfo explainQuery = BuildRemoteExplainQuery(TaskQueryString(task), es); + StringInfo explainQuery = BuildRemoteExplainQuery(TaskQueryStringAllPlacements(task), + es); /* * Use a coordinated transaction to ensure that we open a transaction block diff --git a/src/backend/distributed/planner/multi_physical_planner.c b/src/backend/distributed/planner/multi_physical_planner.c index 00d4908d7..c630bc5c9 100644 --- a/src/backend/distributed/planner/multi_physical_planner.c +++ b/src/backend/distributed/planner/multi_physical_planner.c @@ -4388,7 +4388,7 @@ CreateMapQueryString(MapMergeJob *mapMergeJob, Task *filterTask, /* wrap repartition query string around filter query 
string */ StringInfo mapQueryString = makeStringInfo(); - char *filterQueryString = TaskQueryString(filterTask); + char *filterQueryString = TaskQueryStringAllPlacements(filterTask); char *filterQueryEscapedText = quote_literal_cstr(filterQueryString); PartitionType partitionType = mapMergeJob->partitionType; diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c index 588bbd3db..fa2e16e3c 100644 --- a/src/backend/distributed/planner/multi_router_planner.c +++ b/src/backend/distributed/planner/multi_router_planner.c @@ -1820,7 +1820,7 @@ SingleShardSelectTaskList(Query *query, uint64 jobId, List *relationShardList, * that the query cannot be executed locally. */ task->taskPlacementList = placementList; - SetTaskQuery(task, query); + SetTaskQueryIfShouldLazyDeparse(task, query); task->anchorShardId = shardId; task->jobId = jobId; task->relationShardList = relationShardList; @@ -1901,7 +1901,7 @@ SingleShardModifyTaskList(Query *query, uint64 jobId, List *relationShardList, } task->taskPlacementList = placementList; - SetTaskQuery(task, query); + SetTaskQueryIfShouldLazyDeparse(task, query); task->anchorShardId = shardId; task->jobId = jobId; task->relationShardList = relationShardList; diff --git a/src/include/distributed/deparse_shard_query.h b/src/include/distributed/deparse_shard_query.h index 95baeb6e4..e4d3734c1 100644 --- a/src/include/distributed/deparse_shard_query.h +++ b/src/include/distributed/deparse_shard_query.h @@ -23,11 +23,13 @@ extern void RebuildQueryStrings(Job *workerJob); extern bool UpdateRelationToShardNames(Node *node, List *relationShardList); -extern void SetTaskQuery(Task *task, Query *query); +extern void SetTaskQueryIfShouldLazyDeparse(Task *task, Query *query); extern void SetTaskQueryString(Task *task, char *queryString); extern void SetTaskQueryStringList(Task *task, List *queryStringList); -extern void SetTaskPerPlacementQueryStrings(Task *task, List* 
perPlacementQueryStringList) -extern char * TaskQueryString(Task *task); +extern void SetTaskPerPlacementQueryStrings(Task *task, + List *perPlacementQueryStringList); +extern char * TaskQueryStringAllPlacements(Task *task); +extern char * TaskQueryStringForPlacement(Task *task, int placementIndex); extern bool UpdateRelationsToLocalShardTables(Node *node, List *relationShardList); #endif /* DEPARSE_SHARD_QUERY_H */ diff --git a/src/include/distributed/multi_physical_planner.h b/src/include/distributed/multi_physical_planner.h index 2f2a65acc..ac619ba1f 100644 --- a/src/include/distributed/multi_physical_planner.h +++ b/src/include/distributed/multi_physical_planner.h @@ -224,17 +224,17 @@ typedef struct Task * query could possibly be locally executed. In that case deparsing+parsing * the query might not be necessary, so we do that lazily. * - * queryForLocalExecution should only be set by using SetTaskQuery() + * queryForLocalExecution should only be set by using SetTaskQueryIfShouldLazyDeparse() */ Query *queryForLocalExecution; /* * In almost all cases queryStringLazy should be read only indirectly by - * using TaskQueryString(). This will populate the field if only the + * using TaskQueryStringAllPlacements(). This will populate the field if only the * queryForLocalExecution field is not NULL. * * This field should only be set by using SetTaskQueryString() (or as a - * side effect from TaskQueryString()). Otherwise it might not be in sync + * side effect from TaskQueryStringAllPlacements()). Otherwise it might not be in sync * with queryForLocalExecution. */ char *queryStringLazy; From c796ac335df80e87fc0ed8801a9250cbb817e93e Mon Sep 17 00:00:00 2001 From: SaitTalhaNisanci Date: Fri, 27 Mar 2020 14:14:09 +0300 Subject: [PATCH 3/5] add TaskQuery struct to abstract query string related fields We had many fields in task related to query strings. It was kind of complex, and only of them could be set at a time. 
Therefore it makes more sense to abstract this and use a union so that it is clear that only one of them should be set. We have three fields that could have query related strings: - queryForLocalExecution - queryStringLazy - perPlacementQueryStrings Respectively, they can be set with: - SetTaskQueryIfShouldLazyDeparse - SetTaskQueryString - SetTaskPerPlacementQueryStrings The direct usage of the query related fields is also removed. Rename queryForLocalExecution Currently queryForLocalExecution is only used for deparsing purposes, therefore it makes sense to rename it to what it is doing. --- .../distributed_intermediate_results.c | 2 +- .../executor/insert_select_executor.c | 4 +- .../distributed/executor/local_executor.c | 18 ++-- .../executor/multi_task_tracker_executor.c | 10 +- .../master/master_delete_protocol.c | 2 +- .../distributed/planner/deparse_shard_query.c | 95 ++++++++++++------- .../distributed/planner/multi_explain.c | 2 +- .../planner/multi_physical_planner.c | 2 +- .../distributed/utils/citus_copyfuncs.c | 38 +++++++- .../distributed/utils/citus_nodefuncs.c | 4 +- .../distributed/utils/citus_outfuncs.c | 10 +- src/include/distributed/citus_nodefuncs.h | 2 + src/include/distributed/citus_nodes.h | 3 +- src/include/distributed/deparse_shard_query.h | 3 +- .../distributed/multi_physical_planner.h | 80 ++++++++++------ 15 files changed, 184 insertions(+), 91 deletions(-) diff --git a/src/backend/distributed/executor/distributed_intermediate_results.c b/src/backend/distributed/executor/distributed_intermediate_results.c index 06614592c..71071f139 100644 --- a/src/backend/distributed/executor/distributed_intermediate_results.c +++ b/src/backend/distributed/executor/distributed_intermediate_results.c @@ -221,7 +221,7 @@ WrapTasksForPartitioning(const char *resultIdPrefix, List *selectTaskList, shardPlacement->nodeId, quote_literal_cstr(taskPrefix), quote_literal_cstr(taskPrefix), - quote_literal_cstr(TaskQueryStringAllPlacements(selectTask)), + 
quote_literal_cstr(TaskQueryStringForAllPlacements(selectTask)), partitionColumnIndex, quote_literal_cstr(partitionMethodString), minValuesString->data, maxValuesString->data, diff --git a/src/backend/distributed/executor/insert_select_executor.c b/src/backend/distributed/executor/insert_select_executor.c index 809d689ae..a982213c8 100644 --- a/src/backend/distributed/executor/insert_select_executor.c +++ b/src/backend/distributed/executor/insert_select_executor.c @@ -1062,7 +1062,7 @@ IsRedistributablePlan(Plan *selectPlan) /* - * WrapTaskListForProjection wraps task->queryString to only select given + * WrapTaskListForProjection wraps task query string to only select given * projected columns. It modifies the taskList. */ static void @@ -1091,7 +1091,7 @@ WrapTaskListForProjection(List *taskList, List *projectedTargetEntries) StringInfo wrappedQuery = makeStringInfo(); appendStringInfo(wrappedQuery, "SELECT %s FROM (%s) subquery", projectedColumnsString->data, - TaskQueryStringAllPlacements(task)); + TaskQueryStringForAllPlacements(task)); SetTaskQueryString(task, wrappedQuery->data); } } diff --git a/src/backend/distributed/executor/local_executor.c b/src/backend/distributed/executor/local_executor.c index a15e92849..9d4ce0ab9 100644 --- a/src/backend/distributed/executor/local_executor.c +++ b/src/backend/distributed/executor/local_executor.c @@ -201,7 +201,7 @@ ExecuteLocalTaskList(CitusScanState *scanState, List *taskList) taskParameterTypes = NULL; } - Query *shardQuery = ParseQueryString(TaskQueryStringAllPlacements(task), + Query *shardQuery = ParseQueryString(TaskQueryStringForAllPlacements(task), taskParameterTypes, taskNumParams); @@ -220,9 +220,15 @@ ExecuteLocalTaskList(CitusScanState *scanState, List *taskList) LogLocalCommand(task); - char *shardQueryString = task->queryStringLazy - ? 
task->queryStringLazy - : ""; + char *shardQueryString = NULL; + if (GetTaskQueryType(task) == TASK_QUERY_TEXT) + { + shardQueryString = TaskQueryStringForAllPlacements(task); + } + else + { + shardQueryString = ""; + } totalRowsProcessed += ExecuteLocalTaskPlan(scanState, localPlan, shardQueryString); @@ -302,7 +308,7 @@ ExecuteLocalUtilityTaskList(List *localTaskList) foreach_ptr(localTask, localTaskList) { - const char *localTaskQueryCommand = TaskQueryStringAllPlacements(localTask); + const char *localTaskQueryCommand = TaskQueryStringForAllPlacements(localTask); /* we do not expect tasks with INVALID_SHARD_ID for utility commands */ Assert(localTask->anchorShardId != INVALID_SHARD_ID); @@ -390,7 +396,7 @@ LogLocalCommand(Task *task) } ereport(NOTICE, (errmsg("executing the command locally: %s", - ApplyLogRedaction(TaskQueryStringAllPlacements(task))))); + ApplyLogRedaction(TaskQueryStringForAllPlacements(task))))); } diff --git a/src/backend/distributed/executor/multi_task_tracker_executor.c b/src/backend/distributed/executor/multi_task_tracker_executor.c index 26a8bc432..8ebb2e5de 100644 --- a/src/backend/distributed/executor/multi_task_tracker_executor.c +++ b/src/backend/distributed/executor/multi_task_tracker_executor.c @@ -1570,7 +1570,7 @@ TrackerQueueSqlTask(TaskTracker *taskTracker, Task *task) */ StringInfo sqlTaskQueryString = makeStringInfo(); - char *escapedTaskQueryString = quote_literal_cstr(TaskQueryStringAllPlacements(task)); + char *escapedTaskQueryString = quote_literal_cstr(TaskQueryStringForAllPlacements(task)); if (BinaryMasterCopyFormat) { @@ -1606,7 +1606,7 @@ TrackerQueueTask(TaskTracker *taskTracker, Task *task) /* wrap a task assignment query outside the original query */ StringInfo taskAssignmentQuery = TaskAssignmentQuery(task, - TaskQueryStringAllPlacements( + TaskQueryStringForAllPlacements( task)); TrackerTaskState *taskState = TaskStateHashEnter(taskStateHash, task->jobId, @@ -2744,7 +2744,7 @@ TrackerHashCleanupJob(HTAB 
*taskTrackerHash, Task *jobCleanupTask) { /* assign through task tracker to manage resource utilization */ StringInfo jobCleanupQuery = TaskAssignmentQuery( - jobCleanupTask, TaskQueryStringAllPlacements(jobCleanupTask)); + jobCleanupTask, TaskQueryStringForAllPlacements(jobCleanupTask)); jobCleanupQuerySent = MultiClientSendQuery(taskTracker->connectionId, jobCleanupQuery->data); @@ -2823,7 +2823,7 @@ TrackerHashCleanupJob(HTAB *taskTrackerHash, Task *jobCleanupTask) nodeName, nodePort, (int) queryStatus), errhint("Manually clean job resources on node " "\"%s:%u\" by running \"%s\" ", nodeName, - nodePort, TaskQueryStringAllPlacements( + nodePort, TaskQueryStringForAllPlacements( jobCleanupTask)))); } else @@ -2842,7 +2842,7 @@ TrackerHashCleanupJob(HTAB *taskTrackerHash, Task *jobCleanupTask) nodePort, (int) resultStatus), errhint("Manually clean job resources on node " "\"%s:%u\" by running \"%s\" ", nodeName, - nodePort, TaskQueryStringAllPlacements( + nodePort, TaskQueryStringForAllPlacements( jobCleanupTask)))); } else diff --git a/src/backend/distributed/master/master_delete_protocol.c b/src/backend/distributed/master/master_delete_protocol.c index 77c119a59..63827f5f3 100644 --- a/src/backend/distributed/master/master_delete_protocol.c +++ b/src/backend/distributed/master/master_delete_protocol.c @@ -447,7 +447,7 @@ DropShards(Oid relationId, char *schemaName, char *relationName, * connect to that node to drop the shard placement over that * remote connection. 
*/ - const char *dropShardPlacementCommand = TaskQueryStringAllPlacements( + const char *dropShardPlacementCommand = TaskQueryStringForAllPlacements( task); ExecuteDropShardPlacementCommandRemotely(shardPlacement, relationName, diff --git a/src/backend/distributed/planner/deparse_shard_query.c b/src/backend/distributed/planner/deparse_shard_query.c index a657b7809..286f7bf7b 100644 --- a/src/backend/distributed/planner/deparse_shard_query.c +++ b/src/backend/distributed/planner/deparse_shard_query.c @@ -42,7 +42,7 @@ static void ConvertRteToSubqueryWithEmptyResult(RangeTblEntry *rte); static bool ShouldLazyDeparseQuery(Task *task); static char * DeparseTaskQuery(Task *task, Query *query); static bool IsEachPlacementQueryStringDifferent(Task *task); -static char * TaskQueryStringForAllPlacements(Task *task); +static void InitializeTaskQueryIfNecessary(Task *task); /* @@ -113,11 +113,12 @@ RebuildQueryStrings(Job *workerJob) } } + bool isQueryObjectOrText = GetTaskQueryType(task) == TASK_QUERY_TEXT || + GetTaskQueryType(task) == TASK_QUERY_OBJECT; ereport(DEBUG4, (errmsg("query before rebuilding: %s", - task->queryForLocalExecution == NULL && - task->queryStringLazy == NULL + !isQueryObjectOrText ? "(null)" - : ApplyLogRedaction(TaskQueryStringAllPlacements( + : ApplyLogRedaction(TaskQueryStringForAllPlacements( task))))); UpdateTaskQueryString(query, relationId, valuesRTE, task); @@ -129,7 +130,7 @@ RebuildQueryStrings(Job *workerJob) task->parametersInQueryStringResolved = workerJob->parametersInJobQueryResolved; ereport(DEBUG4, (errmsg("query after rebuilding: %s", - ApplyLogRedaction(TaskQueryStringAllPlacements(task))))); + ApplyLogRedaction(TaskQueryStringForAllPlacements(task))))); } } @@ -416,7 +417,7 @@ ShouldLazyDeparseQuery(Task *task) /* * SetTaskQueryIfShouldLazyDeparse attaches the query to the task so that it can be used during - * execution. If local execution can possibly take place it sets task->queryForLocalExecution. + * execution. 
If local execution can possibly take place it sets task->jobQueryReferenceForLazyDeparsing. * If not it deparses the query and sets queryStringLazy, to avoid blowing the * size of the task unnecesarily. */ @@ -425,24 +426,41 @@ SetTaskQueryIfShouldLazyDeparse(Task *task, Query *query) { if (ShouldLazyDeparseQuery(task)) { - task->queryForLocalExecution = query; - task->queryStringLazy = NULL; + InitializeTaskQueryIfNecessary(task); + task->taskQuery->queryType = TASK_QUERY_OBJECT; + task->taskQuery->data.jobQueryReferenceForLazyDeparsing = query; return; } + SetTaskQueryString(task, DeparseTaskQuery(task, query)); } /* * SetTaskQueryString attaches the query string to the task so that it can be - * used during execution. It also unsets queryForLocalExecution to be sure + * used during execution. It also unsets jobQueryReferenceForLazyDeparsing to be sure * these are kept in sync. */ void SetTaskQueryString(Task *task, char *queryString) { - task->queryForLocalExecution = NULL; - task->queryStringLazy = queryString; + InitializeTaskQueryIfNecessary(task); + task->taskQuery->queryType = TASK_QUERY_TEXT; + task->taskQuery->data.queryStringLazy = queryString; +} + + +/* + * InitializeTaskQueryIfNecessary initializes task query if it + * is not yet allocated. 
+ */ +static void +InitializeTaskQueryIfNecessary(Task *task) +{ + if (task->taskQuery == NULL) + { + task->taskQuery = CitusMakeNode(TaskQuery); + } } @@ -452,9 +470,10 @@ SetTaskQueryString(Task *task, char *queryString) void SetTaskPerPlacementQueryStrings(Task *task, List *perPlacementQueryStringList) { + InitializeTaskQueryIfNecessary(task); Assert(perPlacementQueryStringList != NIL); - task->perPlacementQueryStrings = perPlacementQueryStringList; - SetTaskQueryString(task, NULL); + task->taskQuery->queryType = TASK_QUERY_TEXT_PER_PLACEMENT; + task->taskQuery->data.perPlacementQueryStrings = perPlacementQueryStringList; } @@ -499,46 +518,48 @@ DeparseTaskQuery(Task *task, Query *query) /* - * TaskQueryStringAllPlacements generates task->queryStringLazy if missing. + * GetTaskQueryType returns the type of the task query. + */ +int +GetTaskQueryType(Task *task) +{ + return task->taskQuery->queryType; +} + + +/* + * TaskQueryStringForAllPlacements generates task query string text if missing. * * For performance reasons, the queryString is generated lazily. For example * for local queries it is usually not needed to generate it, so this way we * can skip the expensive deparsing+parsing. */ char * -TaskQueryStringAllPlacements(Task *task) -{ - return TaskQueryStringForAllPlacements(task); -} - - -/* - * TaskQueryStringForAllPlacements returns the query string for the given task. - * In this case, each placement has the same query string. 
- */ -static char * TaskQueryStringForAllPlacements(Task *task) { - if (task->queryStringLazy != NULL) + if (GetTaskQueryType(task) == TASK_QUERY_TEXT) { - return task->queryStringLazy; + return task->taskQuery->data.queryStringLazy; } - Assert(task->queryForLocalExecution != NULL); + Query *jobQueryReferenceForLazyDeparsing = task->taskQuery->data.jobQueryReferenceForLazyDeparsing; + Assert(task->taskQuery->queryType == TASK_QUERY_OBJECT && jobQueryReferenceForLazyDeparsing != + NULL); /* - * Switch to the memory context of task->queryForLocalExecution before generating the query + * Switch to the memory context of task->jobQueryReferenceForLazyDeparsing before generating the query * string. This way the query string is not freed in between multiple * executions of a prepared statement. Except when UpdateTaskQueryString is - * used to set task->queryForLocalExecution, in that case it is freed but it will be set to + * used to set task->jobQueryReferenceForLazyDeparsing, in that case it is freed but it will be set to * NULL on the next execution of the query because UpdateTaskQueryString * does that. 
*/ MemoryContext previousContext = MemoryContextSwitchTo(GetMemoryChunkContext( - task->queryForLocalExecution)); - SetTaskQueryString(task, DeparseTaskQuery(task, task->queryForLocalExecution)); + jobQueryReferenceForLazyDeparsing)); + char *queryString = DeparseTaskQuery(task, jobQueryReferenceForLazyDeparsing); MemoryContextSwitchTo(previousContext); - return task->queryStringLazy; + SetTaskQueryString(task, queryString); + return task->taskQuery->data.queryStringLazy; } @@ -547,8 +568,10 @@ TaskQueryStringForPlacement(Task *task, int placementIndex) { if (IsEachPlacementQueryStringDifferent(task)) { - Assert(list_length(task->perPlacementQueryStrings) > placementIndex); - return list_nth(task->perPlacementQueryStrings, placementIndex); + List *perPlacementQueryStringList = + task->taskQuery->data.perPlacementQueryStrings; + Assert(list_length(perPlacementQueryStringList) > placementIndex); + return list_nth(perPlacementQueryStringList, placementIndex); } return TaskQueryStringForAllPlacements(task); } @@ -561,5 +584,5 @@ TaskQueryStringForPlacement(Task *task, int placementIndex) static bool IsEachPlacementQueryStringDifferent(Task *task) { - return list_length(task->perPlacementQueryStrings) > 0; + return GetTaskQueryType(task) == TASK_QUERY_TEXT_PER_PLACEMENT; } diff --git a/src/backend/distributed/planner/multi_explain.c b/src/backend/distributed/planner/multi_explain.c index c5a237f22..95c295405 100644 --- a/src/backend/distributed/planner/multi_explain.c +++ b/src/backend/distributed/planner/multi_explain.c @@ -399,7 +399,7 @@ RemoteExplain(Task *task, ExplainState *es) RemoteExplainPlan *remotePlan = (RemoteExplainPlan *) palloc0( sizeof(RemoteExplainPlan)); - StringInfo explainQuery = BuildRemoteExplainQuery(TaskQueryStringAllPlacements(task), + StringInfo explainQuery = BuildRemoteExplainQuery(TaskQueryStringForAllPlacements(task), es); /* diff --git a/src/backend/distributed/planner/multi_physical_planner.c 
b/src/backend/distributed/planner/multi_physical_planner.c index c630bc5c9..1287e47b6 100644 --- a/src/backend/distributed/planner/multi_physical_planner.c +++ b/src/backend/distributed/planner/multi_physical_planner.c @@ -4388,7 +4388,7 @@ CreateMapQueryString(MapMergeJob *mapMergeJob, Task *filterTask, /* wrap repartition query string around filter query string */ StringInfo mapQueryString = makeStringInfo(); - char *filterQueryString = TaskQueryStringAllPlacements(filterTask); + char *filterQueryString = TaskQueryStringForAllPlacements(filterTask); char *filterQueryEscapedText = quote_literal_cstr(filterQueryString); PartitionType partitionType = mapMergeJob->partitionType; diff --git a/src/backend/distributed/utils/citus_copyfuncs.c b/src/backend/distributed/utils/citus_copyfuncs.c index 5d6d47218..becb8c87b 100644 --- a/src/backend/distributed/utils/citus_copyfuncs.c +++ b/src/backend/distributed/utils/citus_copyfuncs.c @@ -250,6 +250,40 @@ CopyNodeRelationRowLock(COPYFUNC_ARGS) } +void +CopyNodeTaskQuery(COPYFUNC_ARGS) +{ + DECLARE_FROM_AND_NEW_NODE(TaskQuery); + COPY_SCALAR_FIELD(queryType); + + switch (from->queryType) + { + case TASK_QUERY_TEXT: + { + COPY_STRING_FIELD(data.queryStringLazy); + break; + } + + case TASK_QUERY_OBJECT: + { + COPY_NODE_FIELD(data.jobQueryReferenceForLazyDeparsing); + break; + } + + case TASK_QUERY_TEXT_PER_PLACEMENT: + { + COPY_NODE_FIELD(data.perPlacementQueryStrings); + break; + } + + default: + { + break; + } + } +} + + void CopyNodeTask(COPYFUNC_ARGS) { @@ -258,9 +292,7 @@ CopyNodeTask(COPYFUNC_ARGS) COPY_SCALAR_FIELD(taskType); COPY_SCALAR_FIELD(jobId); COPY_SCALAR_FIELD(taskId); - COPY_NODE_FIELD(queryForLocalExecution); - COPY_STRING_FIELD(queryStringLazy); - COPY_NODE_FIELD(perPlacementQueryStrings); + COPY_NODE_FIELD(taskQuery); COPY_NODE_FIELD(queryStringList); COPY_SCALAR_FIELD(anchorDistributedTableId); COPY_SCALAR_FIELD(anchorShardId); diff --git a/src/backend/distributed/utils/citus_nodefuncs.c 
b/src/backend/distributed/utils/citus_nodefuncs.c index ebb34f5e4..ec225a826 100644 --- a/src/backend/distributed/utils/citus_nodefuncs.c +++ b/src/backend/distributed/utils/citus_nodefuncs.c @@ -44,7 +44,8 @@ static const char *CitusNodeTagNamesD[] = { "RelationShard", "RelationRowLock", "DeferredErrorMessage", - "GroupShardPlacement" + "GroupShardPlacement", + "TaskQuery", }; const char **CitusNodeTagNames = CitusNodeTagNamesD; @@ -401,6 +402,7 @@ const ExtensibleNodeMethods nodeMethods[] = DEFINE_NODE_METHODS(TaskExecution), DEFINE_NODE_METHODS(DeferredErrorMessage), DEFINE_NODE_METHODS(GroupShardPlacement), + DEFINE_NODE_METHODS(TaskQuery), /* nodes with only output support */ DEFINE_NODE_METHODS_NO_READ(MultiNode), diff --git a/src/backend/distributed/utils/citus_outfuncs.c b/src/backend/distributed/utils/citus_outfuncs.c index 029d8d964..df6a16796 100644 --- a/src/backend/distributed/utils/citus_outfuncs.c +++ b/src/backend/distributed/utils/citus_outfuncs.c @@ -469,6 +469,12 @@ OutRelationRowLock(OUTFUNC_ARGS) WRITE_ENUM_FIELD(rowLockStrength, LockClauseStrength); } +void OutTaskQuery(OUTFUNC_ARGS) { + WRITE_LOCALS(TaskQuery); + WRITE_NODE_TYPE("TASKQUERY"); + + WRITE_ENUM_FIELD(queryType, TaskQueryType); +} void OutTask(OUTFUNC_ARGS) @@ -479,9 +485,7 @@ OutTask(OUTFUNC_ARGS) WRITE_ENUM_FIELD(taskType, TaskType); WRITE_UINT64_FIELD(jobId); WRITE_UINT_FIELD(taskId); - WRITE_NODE_FIELD(queryForLocalExecution); - WRITE_STRING_FIELD(queryStringLazy); - WRITE_NODE_FIELD(perPlacementQueryStrings); + // WRITE_SCALAR_FIELD(taskQuery); WRITE_NODE_FIELD(queryStringList); WRITE_OID_FIELD(anchorDistributedTableId); WRITE_UINT64_FIELD(anchorShardId); diff --git a/src/include/distributed/citus_nodefuncs.h b/src/include/distributed/citus_nodefuncs.h index e4b811d95..2e565a1be 100644 --- a/src/include/distributed/citus_nodefuncs.h +++ b/src/include/distributed/citus_nodefuncs.h @@ -47,6 +47,7 @@ extern void OutMapMergeJob(OUTFUNC_ARGS); extern void 
OutShardPlacement(OUTFUNC_ARGS); extern void OutRelationShard(OUTFUNC_ARGS); extern void OutRelationRowLock(OUTFUNC_ARGS); +extern void OutTaskQuery(OUTFUNC_ARGS); extern void OutTask(OUTFUNC_ARGS); extern void OutLocalPlannedStatement(OUTFUNC_ARGS); extern void OutTaskExecution(OUTFUNC_ARGS); @@ -76,6 +77,7 @@ extern void CopyNodeRelationShard(COPYFUNC_ARGS); extern void CopyNodeRelationRowLock(COPYFUNC_ARGS); extern void CopyNodeTask(COPYFUNC_ARGS); extern void CopyNodeLocalPlannedStatement(COPYFUNC_ARGS); +extern void CopyNodeTaskQuery(COPYFUNC_ARGS); extern void CopyNodeTaskExecution(COPYFUNC_ARGS); extern void CopyNodeDeferredErrorMessage(COPYFUNC_ARGS); diff --git a/src/include/distributed/citus_nodes.h b/src/include/distributed/citus_nodes.h index 27fc8d7b0..4c39d4d59 100644 --- a/src/include/distributed/citus_nodes.h +++ b/src/include/distributed/citus_nodes.h @@ -66,7 +66,8 @@ typedef enum CitusNodeTag T_RelationShard, T_RelationRowLock, T_DeferredErrorMessage, - T_GroupShardPlacement + T_GroupShardPlacement, + T_TaskQuery } CitusNodeTag; diff --git a/src/include/distributed/deparse_shard_query.h b/src/include/distributed/deparse_shard_query.h index e4d3734c1..7eef84675 100644 --- a/src/include/distributed/deparse_shard_query.h +++ b/src/include/distributed/deparse_shard_query.h @@ -28,8 +28,9 @@ extern void SetTaskQueryString(Task *task, char *queryString); extern void SetTaskQueryStringList(Task *task, List *queryStringList); extern void SetTaskPerPlacementQueryStrings(Task *task, List *perPlacementQueryStringList); -extern char * TaskQueryStringAllPlacements(Task *task); +extern char * TaskQueryStringForAllPlacements(Task *task); extern char * TaskQueryStringForPlacement(Task *task, int placementIndex); extern bool UpdateRelationsToLocalShardTables(Node *node, List *relationShardList); +extern int GetTaskQueryType(Task *task); #endif /* DEPARSE_SHARD_QUERY_H */ diff --git a/src/include/distributed/multi_physical_planner.h 
b/src/include/distributed/multi_physical_planner.h index ac619ba1f..865194117 100644 --- a/src/include/distributed/multi_physical_planner.h +++ b/src/include/distributed/multi_physical_planner.h @@ -203,6 +203,56 @@ typedef struct MapMergeJob */ typedef struct TaskExecution TaskExecution; +typedef enum TaskQueryType +{ + TASK_QUERY_TEXT, + TASK_QUERY_OBJECT, + TASK_QUERY_TEXT_PER_PLACEMENT +} TaskQueryType; + +typedef struct TaskQuery +{ + CitusNode type; + TaskQueryType queryType; + + union + { + /* + * For most queries jobQueryReferenceForLazyDeparsing and/or queryStringLazy is not + * NULL. This means we have a single query for all placements. + * + * If this is not the case, the length of perPlacementQueryStrings is + * non-zero and equal to length of taskPlacementList. Like this it can + * assign a different query for each placement. We need this flexibility + * when a query should return node specific values. For example, on which + * node did we succeed storing some result files? + * + * jobQueryReferenceForLazyDeparsing is only not null when the planner thinks the + * query could possibly be locally executed. In that case deparsing+parsing + * the query might not be necessary, so we do that lazily. + * + * jobQueryReferenceForLazyDeparsing should only be set by using SetTaskQueryIfShouldLazyDeparse() + */ + Query *jobQueryReferenceForLazyDeparsing; + + /* + * In almost all cases queryStringLazy should be read only indirectly by + * using TaskQueryStringForAllPlacements(). This will populate the field if only the + * jobQueryReferenceForLazyDeparsing field is not NULL. + * + * This field should only be set by using SetTaskQueryString() (or as a + * side effect from TaskQueryStringForAllPlacements()). Otherwise it might not be in sync + * with jobQueryReferenceForLazyDeparsing. + */ + char *queryStringLazy; + + /* + * perPlacementQueryStrings is used when we have different query strings for each placement. 
+ */ + List *perPlacementQueryStrings; + }data; +}TaskQuery; + typedef struct Task { CitusNode type; @@ -210,35 +260,7 @@ typedef struct Task uint64 jobId; uint32 taskId; - /* - * For most queries queryForLocalExecution and/or queryStringLazy is not - * NULL. This means we have a single query for all placements. - * - * If this is not the case, the length of perPlacementQueryStrings is - * non-zero and equal to length of taskPlacementList. Like this it can - * assign a different query for each placement. We need this flexibility - * when a query should return node specific values. For example, on which - * node did we succeed storing some result files? - * - * queryForLocalExecution is only not null when the planner thinks the - * query could possibly be locally executed. In that case deparsing+parsing - * the query might not be necessary, so we do that lazily. - * - * queryForLocalExecution should only be set by using SetTaskQueryIfShouldLazyDeparse() - */ - Query *queryForLocalExecution; - - /* - * In almost all cases queryStringLazy should be read only indirectly by - * using TaskQueryStringAllPlacements(). This will populate the field if only the - * queryForLocalExecution field is not NULL. - * - * This field should only be set by using SetTaskQueryString() (or as a - * side effect from TaskQueryStringAllPlacements()). Otherwise it might not be in sync - * with queryForLocalExecution. - */ - char *queryStringLazy; - List *perPlacementQueryStrings; + TaskQuery *taskQuery; /* * queryStringList contains query strings. They should be From 8806c4d69793d7c474471187e05216a9dd458cc1 Mon Sep 17 00:00:00 2001 From: SaitTalhaNisanci Date: Tue, 31 Mar 2020 10:24:15 +0300 Subject: [PATCH 4/5] move queryStringList into taskQuery Also allocate task query in the memory context of task. 
--- .../distributed_intermediate_results.c | 3 +- .../executor/multi_task_tracker_executor.c | 3 +- .../distributed/planner/deparse_shard_query.c | 26 ++++++++++---- .../distributed/planner/multi_explain.c | 3 +- .../distributed/utils/citus_copyfuncs.c | 7 +++- .../distributed/utils/citus_outfuncs.c | 35 +++++++++++++++++-- .../distributed/multi_physical_planner.h | 19 +++++----- 7 files changed, 75 insertions(+), 21 deletions(-) diff --git a/src/backend/distributed/executor/distributed_intermediate_results.c b/src/backend/distributed/executor/distributed_intermediate_results.c index 71071f139..ddaad030e 100644 --- a/src/backend/distributed/executor/distributed_intermediate_results.c +++ b/src/backend/distributed/executor/distributed_intermediate_results.c @@ -221,7 +221,8 @@ WrapTasksForPartitioning(const char *resultIdPrefix, List *selectTaskList, shardPlacement->nodeId, quote_literal_cstr(taskPrefix), quote_literal_cstr(taskPrefix), - quote_literal_cstr(TaskQueryStringForAllPlacements(selectTask)), + quote_literal_cstr(TaskQueryStringForAllPlacements( + selectTask)), partitionColumnIndex, quote_literal_cstr(partitionMethodString), minValuesString->data, maxValuesString->data, diff --git a/src/backend/distributed/executor/multi_task_tracker_executor.c b/src/backend/distributed/executor/multi_task_tracker_executor.c index 8ebb2e5de..4d5b8ea17 100644 --- a/src/backend/distributed/executor/multi_task_tracker_executor.c +++ b/src/backend/distributed/executor/multi_task_tracker_executor.c @@ -1570,7 +1570,8 @@ TrackerQueueSqlTask(TaskTracker *taskTracker, Task *task) */ StringInfo sqlTaskQueryString = makeStringInfo(); - char *escapedTaskQueryString = quote_literal_cstr(TaskQueryStringForAllPlacements(task)); + char *escapedTaskQueryString = quote_literal_cstr(TaskQueryStringForAllPlacements( + task)); if (BinaryMasterCopyFormat) { diff --git a/src/backend/distributed/planner/deparse_shard_query.c b/src/backend/distributed/planner/deparse_shard_query.c index 
286f7bf7b..464d71ac8 100644 --- a/src/backend/distributed/planner/deparse_shard_query.c +++ b/src/backend/distributed/planner/deparse_shard_query.c @@ -130,7 +130,8 @@ RebuildQueryStrings(Job *workerJob) task->parametersInQueryStringResolved = workerJob->parametersInJobQueryResolved; ereport(DEBUG4, (errmsg("query after rebuilding: %s", - ApplyLogRedaction(TaskQueryStringForAllPlacements(task))))); + ApplyLogRedaction(TaskQueryStringForAllPlacements( + task))))); } } @@ -459,7 +460,10 @@ InitializeTaskQueryIfNecessary(Task *task) { if (task->taskQuery == NULL) { + MemoryContext previousContext = MemoryContextSwitchTo(GetMemoryChunkContext( + task)); task->taskQuery = CitusMakeNode(TaskQuery); + MemoryContextSwitchTo(previousContext); } } @@ -483,8 +487,9 @@ SetTaskPerPlacementQueryStrings(Task *task, List *perPlacementQueryStringList) void SetTaskQueryStringList(Task *task, List *queryStringList) { - task->queryStringList = queryStringList; - SetTaskQueryString(task, StringJoin(queryStringList, ';')); + InitializeTaskQueryIfNecessary(task); + task->taskQuery->queryType = TASK_QUERY_TEXT_LIST; + task->taskQuery->data.queryStringList = queryStringList; } @@ -537,13 +542,18 @@ GetTaskQueryType(Task *task) char * TaskQueryStringForAllPlacements(Task *task) { + if (GetTaskQueryType(task) == TASK_QUERY_TEXT_LIST) + { + return StringJoin(task->taskQuery->data.queryStringList, ';'); + } if (GetTaskQueryType(task) == TASK_QUERY_TEXT) { return task->taskQuery->data.queryStringLazy; } - Query *jobQueryReferenceForLazyDeparsing = task->taskQuery->data.jobQueryReferenceForLazyDeparsing; - Assert(task->taskQuery->queryType == TASK_QUERY_OBJECT && jobQueryReferenceForLazyDeparsing != - NULL); + Query *jobQueryReferenceForLazyDeparsing = + task->taskQuery->data.jobQueryReferenceForLazyDeparsing; + Assert(task->taskQuery->queryType == TASK_QUERY_OBJECT && + jobQueryReferenceForLazyDeparsing != NULL); /* @@ -563,6 +573,10 @@ TaskQueryStringForAllPlacements(Task *task) } +/* + * 
TaskQueryStringForPlacement returns the query string that should be executed + * on the placement with the given placementIndex. + */ char * TaskQueryStringForPlacement(Task *task, int placementIndex) { diff --git a/src/backend/distributed/planner/multi_explain.c b/src/backend/distributed/planner/multi_explain.c index 95c295405..21a91951f 100644 --- a/src/backend/distributed/planner/multi_explain.c +++ b/src/backend/distributed/planner/multi_explain.c @@ -399,7 +399,8 @@ RemoteExplain(Task *task, ExplainState *es) RemoteExplainPlan *remotePlan = (RemoteExplainPlan *) palloc0( sizeof(RemoteExplainPlan)); - StringInfo explainQuery = BuildRemoteExplainQuery(TaskQueryStringForAllPlacements(task), + StringInfo explainQuery = BuildRemoteExplainQuery(TaskQueryStringForAllPlacements( + task), es); /* diff --git a/src/backend/distributed/utils/citus_copyfuncs.c b/src/backend/distributed/utils/citus_copyfuncs.c index becb8c87b..23cbdf349 100644 --- a/src/backend/distributed/utils/citus_copyfuncs.c +++ b/src/backend/distributed/utils/citus_copyfuncs.c @@ -276,6 +276,12 @@ CopyNodeTaskQuery(COPYFUNC_ARGS) break; } + case TASK_QUERY_TEXT_LIST: + { + COPY_NODE_FIELD(data.queryStringList); + break; + } + default: { break; @@ -293,7 +299,6 @@ CopyNodeTask(COPYFUNC_ARGS) COPY_SCALAR_FIELD(jobId); COPY_SCALAR_FIELD(taskId); COPY_NODE_FIELD(taskQuery); - COPY_NODE_FIELD(queryStringList); COPY_SCALAR_FIELD(anchorDistributedTableId); COPY_SCALAR_FIELD(anchorShardId); COPY_NODE_FIELD(taskPlacementList); diff --git a/src/backend/distributed/utils/citus_outfuncs.c b/src/backend/distributed/utils/citus_outfuncs.c index df6a16796..5ec14e9e4 100644 --- a/src/backend/distributed/utils/citus_outfuncs.c +++ b/src/backend/distributed/utils/citus_outfuncs.c @@ -474,6 +474,38 @@ void OutTaskQuery(OUTFUNC_ARGS) { WRITE_NODE_TYPE("TASKQUERY"); WRITE_ENUM_FIELD(queryType, TaskQueryType); + + switch (node->queryType) + { + case TASK_QUERY_TEXT: + { + WRITE_STRING_FIELD(data.queryStringLazy); + break; 
+ } + + case TASK_QUERY_OBJECT: + { + WRITE_NODE_FIELD(data.jobQueryReferenceForLazyDeparsing); + break; + } + + case TASK_QUERY_TEXT_PER_PLACEMENT: + { + WRITE_NODE_FIELD(data.perPlacementQueryStrings); + break; + } + + case TASK_QUERY_TEXT_LIST: + { + WRITE_NODE_FIELD(data.queryStringList); + break; + } + + default: + { + break; + } + } } void @@ -485,8 +517,7 @@ OutTask(OUTFUNC_ARGS) WRITE_ENUM_FIELD(taskType, TaskType); WRITE_UINT64_FIELD(jobId); WRITE_UINT_FIELD(taskId); - // WRITE_SCALAR_FIELD(taskQuery); - WRITE_NODE_FIELD(queryStringList); + WRITE_NODE_FIELD(taskQuery); WRITE_OID_FIELD(anchorDistributedTableId); WRITE_UINT64_FIELD(anchorShardId); WRITE_NODE_FIELD(taskPlacementList); diff --git a/src/include/distributed/multi_physical_planner.h b/src/include/distributed/multi_physical_planner.h index 865194117..9106ab381 100644 --- a/src/include/distributed/multi_physical_planner.h +++ b/src/include/distributed/multi_physical_planner.h @@ -207,7 +207,8 @@ typedef enum TaskQueryType { TASK_QUERY_TEXT, TASK_QUERY_OBJECT, - TASK_QUERY_TEXT_PER_PLACEMENT + TASK_QUERY_TEXT_PER_PLACEMENT, + TASK_QUERY_TEXT_LIST } TaskQueryType; typedef struct TaskQuery @@ -250,6 +251,14 @@ typedef struct TaskQuery * perPlacementQueryStrings is used when we have different query strings for each placement. */ List *perPlacementQueryStrings; + + /* + * queryStringList contains query strings. They should be + * run sequentially. The concatenated version of this list + * will already be set for queryStringLazy, this can be useful + * when we want to access each query string. + */ + List *queryStringList; }data; }TaskQuery; @@ -262,14 +271,6 @@ typedef struct Task TaskQuery *taskQuery; - /* - * queryStringList contains query strings. They should be - * run sequentially. The concatenated version of this list - * will already be set for queryStringLazy, this can be useful - * when we want to access each query string. 
- */ - List *queryStringList; - Oid anchorDistributedTableId; /* only applies to insert tasks */ uint64 anchorShardId; /* only applies to compute tasks */ List *taskPlacementList; /* only applies to compute tasks */ From b5591b1b284aeed0f0171e3da17b9244ef8d6542 Mon Sep 17 00:00:00 2001 From: SaitTalhaNisanci Date: Tue, 31 Mar 2020 12:16:32 +0300 Subject: [PATCH 5/5] use taskQuery as a struct to simplify the code --- .../distributed/executor/local_executor.c | 1 + .../executor/multi_task_tracker_executor.c | 5 +- .../distributed/planner/deparse_shard_query.c | 52 ++++++------------- .../distributed/utils/citus_copyfuncs.c | 21 ++++---- .../distributed/utils/citus_nodefuncs.c | 4 +- .../distributed/utils/citus_outfuncs.c | 21 ++++---- src/include/distributed/citus_nodefuncs.h | 1 - src/include/distributed/citus_nodes.h | 3 +- .../distributed/multi_physical_planner.h | 7 ++- 9 files changed, 45 insertions(+), 70 deletions(-) diff --git a/src/backend/distributed/executor/local_executor.c b/src/backend/distributed/executor/local_executor.c index 9d4ce0ab9..f90bd43f6 100644 --- a/src/backend/distributed/executor/local_executor.c +++ b/src/backend/distributed/executor/local_executor.c @@ -227,6 +227,7 @@ ExecuteLocalTaskList(CitusScanState *scanState, List *taskList) } else { + /* avoid the overhead of deparsing when using local execution */ shardQueryString = ""; } diff --git a/src/backend/distributed/executor/multi_task_tracker_executor.c b/src/backend/distributed/executor/multi_task_tracker_executor.c index 4d5b8ea17..4ece5b34d 100644 --- a/src/backend/distributed/executor/multi_task_tracker_executor.c +++ b/src/backend/distributed/executor/multi_task_tracker_executor.c @@ -1606,9 +1606,8 @@ TrackerQueueTask(TaskTracker *taskTracker, Task *task) HTAB *taskStateHash = taskTracker->taskStateHash; /* wrap a task assignment query outside the original query */ - StringInfo taskAssignmentQuery = TaskAssignmentQuery(task, - TaskQueryStringForAllPlacements( - task)); + 
StringInfo taskAssignmentQuery = + TaskAssignmentQuery(task, TaskQueryStringForAllPlacements(task)); TrackerTaskState *taskState = TaskStateHashEnter(taskStateHash, task->jobId, task->taskId); diff --git a/src/backend/distributed/planner/deparse_shard_query.c b/src/backend/distributed/planner/deparse_shard_query.c index 464d71ac8..8f271e042 100644 --- a/src/backend/distributed/planner/deparse_shard_query.c +++ b/src/backend/distributed/planner/deparse_shard_query.c @@ -42,7 +42,6 @@ static void ConvertRteToSubqueryWithEmptyResult(RangeTblEntry *rte); static bool ShouldLazyDeparseQuery(Task *task); static char * DeparseTaskQuery(Task *task, Query *query); static bool IsEachPlacementQueryStringDifferent(Task *task); -static void InitializeTaskQueryIfNecessary(Task *task); /* @@ -427,9 +426,8 @@ SetTaskQueryIfShouldLazyDeparse(Task *task, Query *query) { if (ShouldLazyDeparseQuery(task)) { - InitializeTaskQueryIfNecessary(task); - task->taskQuery->queryType = TASK_QUERY_OBJECT; - task->taskQuery->data.jobQueryReferenceForLazyDeparsing = query; + task->taskQuery.queryType = TASK_QUERY_OBJECT; + task->taskQuery.data.jobQueryReferenceForLazyDeparsing = query; return; } @@ -445,26 +443,8 @@ SetTaskQueryIfShouldLazyDeparse(Task *task, Query *query) void SetTaskQueryString(Task *task, char *queryString) { - InitializeTaskQueryIfNecessary(task); - task->taskQuery->queryType = TASK_QUERY_TEXT; - task->taskQuery->data.queryStringLazy = queryString; -} - - -/* - * InitializeTaskQueryIfNecessary initializes task query if it - * is not yet allocated. 
- */ -static void -InitializeTaskQueryIfNecessary(Task *task) -{ - if (task->taskQuery == NULL) - { - MemoryContext previousContext = MemoryContextSwitchTo(GetMemoryChunkContext( - task)); - task->taskQuery = CitusMakeNode(TaskQuery); - MemoryContextSwitchTo(previousContext); - } + task->taskQuery.queryType = TASK_QUERY_TEXT; + task->taskQuery.data.queryStringLazy = queryString; } @@ -474,10 +454,9 @@ InitializeTaskQueryIfNecessary(Task *task) void SetTaskPerPlacementQueryStrings(Task *task, List *perPlacementQueryStringList) { - InitializeTaskQueryIfNecessary(task); Assert(perPlacementQueryStringList != NIL); - task->taskQuery->queryType = TASK_QUERY_TEXT_PER_PLACEMENT; - task->taskQuery->data.perPlacementQueryStrings = perPlacementQueryStringList; + task->taskQuery.queryType = TASK_QUERY_TEXT_PER_PLACEMENT; + task->taskQuery.data.perPlacementQueryStrings = perPlacementQueryStringList; } @@ -487,9 +466,8 @@ SetTaskPerPlacementQueryStrings(Task *task, List *perPlacementQueryStringList) void SetTaskQueryStringList(Task *task, List *queryStringList) { - InitializeTaskQueryIfNecessary(task); - task->taskQuery->queryType = TASK_QUERY_TEXT_LIST; - task->taskQuery->data.queryStringList = queryStringList; + task->taskQuery.queryType = TASK_QUERY_TEXT_LIST; + task->taskQuery.data.queryStringList = queryStringList; } @@ -528,7 +506,7 @@ DeparseTaskQuery(Task *task, Query *query) int GetTaskQueryType(Task *task) { - return task->taskQuery->queryType; + return task->taskQuery.queryType; } @@ -544,15 +522,15 @@ TaskQueryStringForAllPlacements(Task *task) { if (GetTaskQueryType(task) == TASK_QUERY_TEXT_LIST) { - return StringJoin(task->taskQuery->data.queryStringList, ';'); + return StringJoin(task->taskQuery.data.queryStringList, ';'); } if (GetTaskQueryType(task) == TASK_QUERY_TEXT) { - return task->taskQuery->data.queryStringLazy; + return task->taskQuery.data.queryStringLazy; } Query *jobQueryReferenceForLazyDeparsing = - 
task->taskQuery->data.jobQueryReferenceForLazyDeparsing; - Assert(task->taskQuery->queryType == TASK_QUERY_OBJECT && + task->taskQuery.data.jobQueryReferenceForLazyDeparsing; + Assert(task->taskQuery.queryType == TASK_QUERY_OBJECT && jobQueryReferenceForLazyDeparsing != NULL); @@ -569,7 +547,7 @@ TaskQueryStringForAllPlacements(Task *task) char *queryString = DeparseTaskQuery(task, jobQueryReferenceForLazyDeparsing); MemoryContextSwitchTo(previousContext); SetTaskQueryString(task, queryString); - return task->taskQuery->data.queryStringLazy; + return task->taskQuery.data.queryStringLazy; } @@ -583,7 +561,7 @@ TaskQueryStringForPlacement(Task *task, int placementIndex) if (IsEachPlacementQueryStringDifferent(task)) { List *perPlacementQueryStringList = - task->taskQuery->data.perPlacementQueryStrings; + task->taskQuery.data.perPlacementQueryStrings; Assert(list_length(perPlacementQueryStringList) > placementIndex); return list_nth(perPlacementQueryStringList, placementIndex); } diff --git a/src/backend/distributed/utils/citus_copyfuncs.c b/src/backend/distributed/utils/citus_copyfuncs.c index 23cbdf349..1aa27b5bf 100644 --- a/src/backend/distributed/utils/citus_copyfuncs.c +++ b/src/backend/distributed/utils/citus_copyfuncs.c @@ -73,6 +73,7 @@ CitusSetTag(Node *node, int tag) } \ while (0) +static void CopyTaskQuery(Task *newnode, Task *from); static void copyJobInfo(Job *newnode, Job *from) @@ -250,35 +251,33 @@ CopyNodeRelationRowLock(COPYFUNC_ARGS) } -void -CopyNodeTaskQuery(COPYFUNC_ARGS) +static void +CopyTaskQuery(Task *newnode, Task *from) { - DECLARE_FROM_AND_NEW_NODE(TaskQuery); - COPY_SCALAR_FIELD(queryType); - - switch (from->queryType) + COPY_SCALAR_FIELD(taskQuery.queryType); + switch (from->taskQuery.queryType) { case TASK_QUERY_TEXT: { - COPY_STRING_FIELD(data.queryStringLazy); + COPY_STRING_FIELD(taskQuery.data.queryStringLazy); break; } case TASK_QUERY_OBJECT: { - COPY_NODE_FIELD(data.jobQueryReferenceForLazyDeparsing); + 
COPY_NODE_FIELD(taskQuery.data.jobQueryReferenceForLazyDeparsing); break; } case TASK_QUERY_TEXT_PER_PLACEMENT: { - COPY_NODE_FIELD(data.perPlacementQueryStrings); + COPY_NODE_FIELD(taskQuery.data.perPlacementQueryStrings); break; } case TASK_QUERY_TEXT_LIST: { - COPY_NODE_FIELD(data.queryStringList); + COPY_NODE_FIELD(taskQuery.data.queryStringList); break; } @@ -298,7 +297,7 @@ CopyNodeTask(COPYFUNC_ARGS) COPY_SCALAR_FIELD(taskType); COPY_SCALAR_FIELD(jobId); COPY_SCALAR_FIELD(taskId); - COPY_NODE_FIELD(taskQuery); + CopyTaskQuery(newnode, from); COPY_SCALAR_FIELD(anchorDistributedTableId); COPY_SCALAR_FIELD(anchorShardId); COPY_NODE_FIELD(taskPlacementList); diff --git a/src/backend/distributed/utils/citus_nodefuncs.c b/src/backend/distributed/utils/citus_nodefuncs.c index ec225a826..ebb34f5e4 100644 --- a/src/backend/distributed/utils/citus_nodefuncs.c +++ b/src/backend/distributed/utils/citus_nodefuncs.c @@ -44,8 +44,7 @@ static const char *CitusNodeTagNamesD[] = { "RelationShard", "RelationRowLock", "DeferredErrorMessage", - "GroupShardPlacement", - "TaskQuery", + "GroupShardPlacement" }; const char **CitusNodeTagNames = CitusNodeTagNamesD; @@ -402,7 +401,6 @@ const ExtensibleNodeMethods nodeMethods[] = DEFINE_NODE_METHODS(TaskExecution), DEFINE_NODE_METHODS(DeferredErrorMessage), DEFINE_NODE_METHODS(GroupShardPlacement), - DEFINE_NODE_METHODS(TaskQuery), /* nodes with only output support */ DEFINE_NODE_METHODS_NO_READ(MultiNode), diff --git a/src/backend/distributed/utils/citus_outfuncs.c b/src/backend/distributed/utils/citus_outfuncs.c index 5ec14e9e4..1f51db70d 100644 --- a/src/backend/distributed/utils/citus_outfuncs.c +++ b/src/backend/distributed/utils/citus_outfuncs.c @@ -135,7 +135,7 @@ #define booltostr(x) ((x) ? 
"true" : "false") - +static void WriteTaskQuery(OUTFUNC_ARGS); /***************************************************************************** * Output routines for Citus node types @@ -469,35 +469,34 @@ OutRelationRowLock(OUTFUNC_ARGS) WRITE_ENUM_FIELD(rowLockStrength, LockClauseStrength); } -void OutTaskQuery(OUTFUNC_ARGS) { - WRITE_LOCALS(TaskQuery); - WRITE_NODE_TYPE("TASKQUERY"); +static void WriteTaskQuery(OUTFUNC_ARGS) { + WRITE_LOCALS(Task); - WRITE_ENUM_FIELD(queryType, TaskQueryType); + WRITE_ENUM_FIELD(taskQuery.queryType, TaskQueryType); - switch (node->queryType) + switch (node->taskQuery.queryType) { case TASK_QUERY_TEXT: { - WRITE_STRING_FIELD(data.queryStringLazy); + WRITE_STRING_FIELD(taskQuery.data.queryStringLazy); break; } case TASK_QUERY_OBJECT: { - WRITE_NODE_FIELD(data.jobQueryReferenceForLazyDeparsing); + WRITE_NODE_FIELD(taskQuery.data.jobQueryReferenceForLazyDeparsing); break; } case TASK_QUERY_TEXT_PER_PLACEMENT: { - WRITE_NODE_FIELD(data.perPlacementQueryStrings); + WRITE_NODE_FIELD(taskQuery.data.perPlacementQueryStrings); break; } case TASK_QUERY_TEXT_LIST: { - WRITE_NODE_FIELD(data.queryStringList); + WRITE_NODE_FIELD(taskQuery.data.queryStringList); break; } @@ -517,7 +516,7 @@ OutTask(OUTFUNC_ARGS) WRITE_ENUM_FIELD(taskType, TaskType); WRITE_UINT64_FIELD(jobId); WRITE_UINT_FIELD(taskId); - WRITE_NODE_FIELD(taskQuery); + WriteTaskQuery(str, raw_node); WRITE_OID_FIELD(anchorDistributedTableId); WRITE_UINT64_FIELD(anchorShardId); WRITE_NODE_FIELD(taskPlacementList); diff --git a/src/include/distributed/citus_nodefuncs.h b/src/include/distributed/citus_nodefuncs.h index 2e565a1be..d25f6222c 100644 --- a/src/include/distributed/citus_nodefuncs.h +++ b/src/include/distributed/citus_nodefuncs.h @@ -47,7 +47,6 @@ extern void OutMapMergeJob(OUTFUNC_ARGS); extern void OutShardPlacement(OUTFUNC_ARGS); extern void OutRelationShard(OUTFUNC_ARGS); extern void OutRelationRowLock(OUTFUNC_ARGS); -extern void OutTaskQuery(OUTFUNC_ARGS); extern void 
OutTask(OUTFUNC_ARGS); extern void OutLocalPlannedStatement(OUTFUNC_ARGS); extern void OutTaskExecution(OUTFUNC_ARGS); diff --git a/src/include/distributed/citus_nodes.h b/src/include/distributed/citus_nodes.h index 4c39d4d59..27fc8d7b0 100644 --- a/src/include/distributed/citus_nodes.h +++ b/src/include/distributed/citus_nodes.h @@ -66,8 +66,7 @@ typedef enum CitusNodeTag T_RelationShard, T_RelationRowLock, T_DeferredErrorMessage, - T_GroupShardPlacement, - T_TaskQuery + T_GroupShardPlacement } CitusNodeTag; diff --git a/src/include/distributed/multi_physical_planner.h b/src/include/distributed/multi_physical_planner.h index 9106ab381..74c6e8be0 100644 --- a/src/include/distributed/multi_physical_planner.h +++ b/src/include/distributed/multi_physical_planner.h @@ -213,7 +213,6 @@ typedef enum TaskQueryType typedef struct TaskQuery { - CitusNode type; TaskQueryType queryType; union @@ -269,7 +268,11 @@ typedef struct Task uint64 jobId; uint32 taskId; - TaskQuery *taskQuery; + /* + * taskQuery contains query string information. The way we get queryString can be different + * so this is abstracted with taskQuery. + */ + TaskQuery taskQuery; Oid anchorDistributedTableId; /* only applies to insert tasks */ uint64 anchorShardId; /* only applies to compute tasks */