diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c index 1cda964ba..79e181c7b 100644 --- a/src/backend/distributed/executor/adaptive_executor.c +++ b/src/backend/distributed/executor/adaptive_executor.c @@ -3196,18 +3196,9 @@ StartPlacementExecutionOnSession(TaskPlacementExecution *placementExecution, List *placementAccessList = PlacementAccessListForTask(task, taskPlacement); int querySent = 0; - char *queryString = NULL; - if (list_length(task->perPlacementQueryStrings) == 0) - { - queryString = TaskQueryString(task); - } - else - { - Assert(list_length(task->taskPlacementList) == list_length( - task->perPlacementQueryStrings)); - queryString = list_nth(task->perPlacementQueryStrings, - placementExecution->placementExecutionIndex); - } + char *queryString = TaskQueryStringForPlacement(task, + placementExecution-> + placementExecutionIndex); if (execution->transactionProperties->useRemoteTransactionBlocks != TRANSACTION_BLOCKS_DISALLOWED) diff --git a/src/backend/distributed/executor/distributed_intermediate_results.c b/src/backend/distributed/executor/distributed_intermediate_results.c index ee11c03b7..ddaad030e 100644 --- a/src/backend/distributed/executor/distributed_intermediate_results.c +++ b/src/backend/distributed/executor/distributed_intermediate_results.c @@ -221,16 +221,15 @@ WrapTasksForPartitioning(const char *resultIdPrefix, List *selectTaskList, shardPlacement->nodeId, quote_literal_cstr(taskPrefix), quote_literal_cstr(taskPrefix), - quote_literal_cstr(TaskQueryString(selectTask)), + quote_literal_cstr(TaskQueryStringForAllPlacements( + selectTask)), partitionColumnIndex, quote_literal_cstr(partitionMethodString), minValuesString->data, maxValuesString->data, binaryFormatString); perPlacementQueries = lappend(perPlacementQueries, wrappedQuery->data); } - - SetTaskQueryString(selectTask, NULL); - selectTask->perPlacementQueryStrings = perPlacementQueries; + SetTaskPerPlacementQueryStrings(selectTask, perPlacementQueries); } } diff --git a/src/backend/distributed/executor/insert_select_executor.c b/src/backend/distributed/executor/insert_select_executor.c index 4b7c7cc32..a982213c8 100644 --- a/src/backend/distributed/executor/insert_select_executor.c +++ b/src/backend/distributed/executor/insert_select_executor.c @@ -1062,7 +1062,7 @@ IsRedistributablePlan(Plan *selectPlan) /* - * WrapTaskListForProjection wraps task->queryString to only select given + * WrapTaskListForProjection wraps task query string to only select given * projected columns. It modifies the taskList. */ static void @@ -1091,7 +1091,7 @@ WrapTaskListForProjection(List *taskList, List *projectedTargetEntries) StringInfo wrappedQuery = makeStringInfo(); appendStringInfo(wrappedQuery, "SELECT %s FROM (%s) subquery", projectedColumnsString->data, - TaskQueryString(task)); + TaskQueryStringForAllPlacements(task)); SetTaskQueryString(task, wrappedQuery->data); } } diff --git a/src/backend/distributed/executor/local_executor.c b/src/backend/distributed/executor/local_executor.c index 490028c5b..f90bd43f6 100644 --- a/src/backend/distributed/executor/local_executor.c +++ b/src/backend/distributed/executor/local_executor.c @@ -201,7 +201,7 @@ ExecuteLocalTaskList(CitusScanState *scanState, List *taskList) taskParameterTypes = NULL; } - Query *shardQuery = ParseQueryString(TaskQueryString(task), + Query *shardQuery = ParseQueryString(TaskQueryStringForAllPlacements(task), taskParameterTypes, taskNumParams); @@ -220,9 +220,16 @@ ExecuteLocalTaskList(CitusScanState *scanState, List *taskList) LogLocalCommand(task); - char *shardQueryString = task->queryStringLazy - ? task->queryStringLazy - : ""; + char *shardQueryString = NULL; + if (GetTaskQueryType(task) == TASK_QUERY_TEXT) + { + shardQueryString = TaskQueryStringForAllPlacements(task); + } + else + { + /* avoid the overhead of deparsing when using local execution */ + shardQueryString = ""; + } totalRowsProcessed += ExecuteLocalTaskPlan(scanState, localPlan, shardQueryString); @@ -302,7 +309,7 @@ ExecuteLocalUtilityTaskList(List *localTaskList) foreach_ptr(localTask, localTaskList) { - const char *localTaskQueryCommand = TaskQueryString(localTask); + const char *localTaskQueryCommand = TaskQueryStringForAllPlacements(localTask); /* we do not expect tasks with INVALID_SHARD_ID for utility commands */ Assert(localTask->anchorShardId != INVALID_SHARD_ID); @@ -390,7 +397,7 @@ LogLocalCommand(Task *task) } ereport(NOTICE, (errmsg("executing the command locally: %s", - ApplyLogRedaction(TaskQueryString(task))))); + ApplyLogRedaction(TaskQueryStringForAllPlacements(task))))); } diff --git a/src/backend/distributed/executor/multi_task_tracker_executor.c b/src/backend/distributed/executor/multi_task_tracker_executor.c index 5dd0db6ea..4ece5b34d 100644 --- a/src/backend/distributed/executor/multi_task_tracker_executor.c +++ b/src/backend/distributed/executor/multi_task_tracker_executor.c @@ -1570,7 +1570,8 @@ TrackerQueueSqlTask(TaskTracker *taskTracker, Task *task) */ StringInfo sqlTaskQueryString = makeStringInfo(); - char *escapedTaskQueryString = quote_literal_cstr(TaskQueryString(task)); + char *escapedTaskQueryString = quote_literal_cstr(TaskQueryStringForAllPlacements( + task)); if (BinaryMasterCopyFormat) { @@ -1605,7 +1606,8 @@ TrackerQueueTask(TaskTracker *taskTracker, Task *task) HTAB *taskStateHash = taskTracker->taskStateHash; /* wrap a task assignment query outside the original query */ - StringInfo taskAssignmentQuery = TaskAssignmentQuery(task, TaskQueryString(task)); + StringInfo taskAssignmentQuery = + TaskAssignmentQuery(task, TaskQueryStringForAllPlacements(task)); TrackerTaskState *taskState = TaskStateHashEnter(taskStateHash, task->jobId, task->taskId); @@ -2742,7 +2744,7 @@ TrackerHashCleanupJob(HTAB *taskTrackerHash, Task *jobCleanupTask) { /* assign through task tracker to manage resource utilization */ StringInfo jobCleanupQuery = TaskAssignmentQuery( - jobCleanupTask, TaskQueryString(jobCleanupTask)); + jobCleanupTask, TaskQueryStringForAllPlacements(jobCleanupTask)); jobCleanupQuerySent = MultiClientSendQuery(taskTracker->connectionId, jobCleanupQuery->data); @@ -2821,7 +2823,7 @@ TrackerHashCleanupJob(HTAB *taskTrackerHash, Task *jobCleanupTask) nodeName, nodePort, (int) queryStatus), errhint("Manually clean job resources on node " "\"%s:%u\" by running \"%s\" ", nodeName, - nodePort, TaskQueryString( + nodePort, TaskQueryStringForAllPlacements( jobCleanupTask)))); } else @@ -2840,7 +2842,8 @@ TrackerHashCleanupJob(HTAB *taskTrackerHash, Task *jobCleanupTask) nodePort, (int) resultStatus), errhint("Manually clean job resources on node " "\"%s:%u\" by running \"%s\" ", nodeName, - nodePort, TaskQueryString(jobCleanupTask)))); + nodePort, TaskQueryStringForAllPlacements( + jobCleanupTask)))); } else { diff --git a/src/backend/distributed/master/master_delete_protocol.c b/src/backend/distributed/master/master_delete_protocol.c index 397e1459e..63827f5f3 100644 --- a/src/backend/distributed/master/master_delete_protocol.c +++ b/src/backend/distributed/master/master_delete_protocol.c @@ -447,7 +447,8 @@ DropShards(Oid relationId, char *schemaName, char *relationName, * connect to that node to drop the shard placement over that * remote connection. */ - const char *dropShardPlacementCommand = TaskQueryString(task); + const char *dropShardPlacementCommand = TaskQueryStringForAllPlacements( + task); ExecuteDropShardPlacementCommandRemotely(shardPlacement, relationName, dropShardPlacementCommand); diff --git a/src/backend/distributed/planner/deparse_shard_query.c b/src/backend/distributed/planner/deparse_shard_query.c index 3651da73a..8f271e042 100644 --- a/src/backend/distributed/planner/deparse_shard_query.c +++ b/src/backend/distributed/planner/deparse_shard_query.c @@ -41,6 +41,7 @@ static void UpdateTaskQueryString(Query *query, Oid distributedTableId, static void ConvertRteToSubqueryWithEmptyResult(RangeTblEntry *rte); static bool ShouldLazyDeparseQuery(Task *task); static char * DeparseTaskQuery(Task *task, Query *query); +static bool IsEachPlacementQueryStringDifferent(Task *task); /* @@ -111,11 +112,13 @@ RebuildQueryStrings(Job *workerJob) } } + bool isQueryObjectOrText = GetTaskQueryType(task) == TASK_QUERY_TEXT || + GetTaskQueryType(task) == TASK_QUERY_OBJECT; ereport(DEBUG4, (errmsg("query before rebuilding: %s", - task->queryForLocalExecution == NULL && - task->queryStringLazy == NULL + !isQueryObjectOrText ? "(null)" - : ApplyLogRedaction(TaskQueryString(task))))); + : ApplyLogRedaction(TaskQueryStringForAllPlacements( + task))))); UpdateTaskQueryString(query, relationId, valuesRTE, task); @@ -126,7 +129,8 @@ RebuildQueryStrings(Job *workerJob) task->parametersInQueryStringResolved = workerJob->parametersInJobQueryResolved; ereport(DEBUG4, (errmsg("query after rebuilding: %s", - ApplyLogRedaction(TaskQueryString(task))))); + ApplyLogRedaction(TaskQueryStringForAllPlacements( + task))))); } } @@ -181,7 +185,7 @@ UpdateTaskQueryString(Query *query, Oid distributedTableId, RangeTblEntry *value task->anchorDistributedTableId = distributedTableId; } - SetTaskQuery(task, query); + SetTaskQueryIfShouldLazyDeparse(task, query); if (valuesRTE != NULL) { @@ -412,36 +416,47 @@ ShouldLazyDeparseQuery(Task *task) /* - * SetTaskQuery attaches the query to the task so that it can be used during - * execution. If local execution can possibly take place it sets task->queryForLocalExecution. + * SetTaskQueryIfShouldLazyDeparse attaches the query to the task so that it can be used during + * execution. If local execution can possibly take place it sets task->jobQueryReferenceForLazyDeparsing. * If not it deparses the query and sets queryStringLazy, to avoid blowing the * size of the task unnecesarily. */ void -SetTaskQuery(Task *task, Query *query) +SetTaskQueryIfShouldLazyDeparse(Task *task, Query *query) { if (ShouldLazyDeparseQuery(task)) { - task->queryForLocalExecution = query; - task->queryStringLazy = NULL; + task->taskQuery.queryType = TASK_QUERY_OBJECT; + task->taskQuery.data.jobQueryReferenceForLazyDeparsing = query; return; } - task->queryForLocalExecution = NULL; - task->queryStringLazy = DeparseTaskQuery(task, query); + SetTaskQueryString(task, DeparseTaskQuery(task, query)); } /* * SetTaskQueryString attaches the query string to the task so that it can be - * used during execution. It also unsets queryForLocalExecution to be sure + * used during execution. It also unsets jobQueryReferenceForLazyDeparsing to be sure * these are kept in sync. */ void SetTaskQueryString(Task *task, char *queryString) { - task->queryForLocalExecution = NULL; - task->queryStringLazy = queryString; + task->taskQuery.queryType = TASK_QUERY_TEXT; + task->taskQuery.data.queryStringLazy = queryString; +} + + +/* + * SetTaskPerPlacementQueryStrings set the perPlacementQueryString for the given task. + */ +void +SetTaskPerPlacementQueryStrings(Task *task, List *perPlacementQueryStringList) +{ + Assert(perPlacementQueryStringList != NIL); + task->taskQuery.queryType = TASK_QUERY_TEXT_PER_PLACEMENT; + task->taskQuery.data.perPlacementQueryStrings = perPlacementQueryStringList; } @@ -451,8 +466,8 @@ SetTaskQueryString(Task *task, char *queryString) void SetTaskQueryStringList(Task *task, List *queryStringList) { - task->queryStringList = queryStringList; - SetTaskQueryString(task, StringJoin(queryStringList, ';')); + task->taskQuery.queryType = TASK_QUERY_TEXT_LIST; + task->taskQuery.data.queryStringList = queryStringList; } @@ -486,33 +501,80 @@ DeparseTaskQuery(Task *task, Query *query) /* - * TaskQueryString generates task->queryStringLazy if missing. + * GetTaskQueryType returns the type of the task query. + */ +int +GetTaskQueryType(Task *task) +{ + return task->taskQuery.queryType; +} + + +/* + * TaskQueryStringForAllPlacements generates task query string text if missing. * * For performance reasons, the queryString is generated lazily. For example * for local queries it is usually not needed to generate it, so this way we * can skip the expensive deparsing+parsing. */ char * -TaskQueryString(Task *task) +TaskQueryStringForAllPlacements(Task *task) { - if (task->queryStringLazy != NULL) + if (GetTaskQueryType(task) == TASK_QUERY_TEXT_LIST) { - return task->queryStringLazy; + return StringJoin(task->taskQuery.data.queryStringList, ';'); } - Assert(task->queryForLocalExecution != NULL); + if (GetTaskQueryType(task) == TASK_QUERY_TEXT) + { + return task->taskQuery.data.queryStringLazy; + } + Query *jobQueryReferenceForLazyDeparsing = + task->taskQuery.data.jobQueryReferenceForLazyDeparsing; + Assert(task->taskQuery.queryType == TASK_QUERY_OBJECT && + jobQueryReferenceForLazyDeparsing != NULL); /* - * Switch to the memory context of task->queryForLocalExecution before generating the query + * Switch to the memory context of task->jobQueryReferenceForLazyDeparsing before generating the query * string. This way the query string is not freed in between multiple * executions of a prepared statement. Except when UpdateTaskQueryString is - * used to set task->queryForLocalExecution, in that case it is freed but it will be set to + * used to set task->jobQueryReferenceForLazyDeparsing, in that case it is freed but it will be set to * NULL on the next execution of the query because UpdateTaskQueryString * does that. */ MemoryContext previousContext = MemoryContextSwitchTo(GetMemoryChunkContext( - task->queryForLocalExecution)); - task->queryStringLazy = DeparseTaskQuery(task, task->queryForLocalExecution); + jobQueryReferenceForLazyDeparsing)); + char *queryString = DeparseTaskQuery(task, jobQueryReferenceForLazyDeparsing); MemoryContextSwitchTo(previousContext); - return task->queryStringLazy; + SetTaskQueryString(task, queryString); + return task->taskQuery.data.queryStringLazy; +} + + +/* + * TaskQueryStringForPlacement returns the query string that should be executed + * on the placement with the given placementIndex. + */ +char * +TaskQueryStringForPlacement(Task *task, int placementIndex) +{ + if (IsEachPlacementQueryStringDifferent(task)) + { + List *perPlacementQueryStringList = + task->taskQuery.data.perPlacementQueryStrings; + Assert(list_length(perPlacementQueryStringList) > placementIndex); + return list_nth(perPlacementQueryStringList, placementIndex); + } + return TaskQueryStringForAllPlacements(task); +} + + +/* + * IsEachPlacementQueryStringDifferent returns true if each placement + * has a different query string. + */ +static bool +IsEachPlacementQueryStringDifferent(Task *task) +{ + return GetTaskQueryType(task) == TASK_QUERY_TEXT_PER_PLACEMENT; } diff --git a/src/backend/distributed/planner/function_call_delegation.c b/src/backend/distributed/planner/function_call_delegation.c index a521b36ab..69e711560 100644 --- a/src/backend/distributed/planner/function_call_delegation.c +++ b/src/backend/distributed/planner/function_call_delegation.c @@ -367,7 +367,7 @@ TryToDelegateFunctionCall(DistributedPlanningContext *planContext) task = CitusMakeNode(Task); task->taskType = SELECT_TASK; task->taskPlacementList = placementList; - SetTaskQuery(task, planContext->query); + SetTaskQueryIfShouldLazyDeparse(task, planContext->query); task->anchorShardId = shardInterval->shardId; task->replicationModel = distTable->replicationModel; diff --git a/src/backend/distributed/planner/multi_explain.c b/src/backend/distributed/planner/multi_explain.c index cc36677f7..21a91951f 100644 --- a/src/backend/distributed/planner/multi_explain.c +++ b/src/backend/distributed/planner/multi_explain.c @@ -399,7 +399,9 @@ RemoteExplain(Task *task, ExplainState *es) RemoteExplainPlan *remotePlan = (RemoteExplainPlan *) palloc0( sizeof(RemoteExplainPlan)); - StringInfo explainQuery = BuildRemoteExplainQuery(TaskQueryString(task), es); + StringInfo explainQuery = BuildRemoteExplainQuery(TaskQueryStringForAllPlacements( + task), + es); /* * Use a coordinated transaction to ensure that we open a transaction block diff --git a/src/backend/distributed/planner/multi_physical_planner.c b/src/backend/distributed/planner/multi_physical_planner.c index 00d4908d7..1287e47b6 100644 --- a/src/backend/distributed/planner/multi_physical_planner.c +++ b/src/backend/distributed/planner/multi_physical_planner.c @@ -4388,7 +4388,7 @@ CreateMapQueryString(MapMergeJob *mapMergeJob, Task *filterTask, /* wrap repartition query string around filter query string */ StringInfo mapQueryString = makeStringInfo(); - char *filterQueryString = TaskQueryString(filterTask); + char *filterQueryString = TaskQueryStringForAllPlacements(filterTask); char *filterQueryEscapedText = quote_literal_cstr(filterQueryString); PartitionType partitionType = mapMergeJob->partitionType; diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c index 588bbd3db..fa2e16e3c 100644 --- a/src/backend/distributed/planner/multi_router_planner.c +++ b/src/backend/distributed/planner/multi_router_planner.c @@ -1820,7 +1820,7 @@ SingleShardSelectTaskList(Query *query, uint64 jobId, List *relationShardList, * that the query cannot be executed locally. */ task->taskPlacementList = placementList; - SetTaskQuery(task, query); + SetTaskQueryIfShouldLazyDeparse(task, query); task->anchorShardId = shardId; task->jobId = jobId; task->relationShardList = relationShardList; @@ -1901,7 +1901,7 @@ SingleShardModifyTaskList(Query *query, uint64 jobId, List *relationShardList, } task->taskPlacementList = placementList; - SetTaskQuery(task, query); + SetTaskQueryIfShouldLazyDeparse(task, query); task->anchorShardId = shardId; task->jobId = jobId; task->relationShardList = relationShardList; diff --git a/src/backend/distributed/utils/citus_copyfuncs.c b/src/backend/distributed/utils/citus_copyfuncs.c index 5d6d47218..1aa27b5bf 100644 --- a/src/backend/distributed/utils/citus_copyfuncs.c +++ b/src/backend/distributed/utils/citus_copyfuncs.c @@ -73,6 +73,7 @@ CitusSetTag(Node *node, int tag) } \ while (0) +static void CopyTaskQuery(Task *newnode, Task *from); static void copyJobInfo(Job *newnode, Job *from) @@ -250,6 +251,44 @@ CopyNodeRelationRowLock(COPYFUNC_ARGS) } +static void +CopyTaskQuery(Task *newnode, Task *from) +{ + COPY_SCALAR_FIELD(taskQuery.queryType); + switch (from->taskQuery.queryType) + { + case TASK_QUERY_TEXT: + { + COPY_STRING_FIELD(taskQuery.data.queryStringLazy); + break; + } + + case TASK_QUERY_OBJECT: + { + COPY_NODE_FIELD(taskQuery.data.jobQueryReferenceForLazyDeparsing); + break; + } + + case TASK_QUERY_TEXT_PER_PLACEMENT: + { + COPY_NODE_FIELD(taskQuery.data.perPlacementQueryStrings); + break; + } + + case TASK_QUERY_TEXT_LIST: + { + COPY_NODE_FIELD(taskQuery.data.queryStringList); + break; + } + + default: + { + break; + } + } +} + + void CopyNodeTask(COPYFUNC_ARGS) { @@ -258,10 +297,7 @@ CopyNodeTask(COPYFUNC_ARGS) COPY_SCALAR_FIELD(taskType); COPY_SCALAR_FIELD(jobId); COPY_SCALAR_FIELD(taskId); - COPY_NODE_FIELD(queryForLocalExecution); - COPY_STRING_FIELD(queryStringLazy); - COPY_NODE_FIELD(perPlacementQueryStrings); - COPY_NODE_FIELD(queryStringList); + CopyTaskQuery(newnode, from); COPY_SCALAR_FIELD(anchorDistributedTableId); COPY_SCALAR_FIELD(anchorShardId); COPY_NODE_FIELD(taskPlacementList); diff --git a/src/backend/distributed/utils/citus_outfuncs.c b/src/backend/distributed/utils/citus_outfuncs.c index 029d8d964..1f51db70d 100644 --- a/src/backend/distributed/utils/citus_outfuncs.c +++ b/src/backend/distributed/utils/citus_outfuncs.c @@ -135,7 +135,7 @@ #define booltostr(x) ((x) ? "true" : "false") - +static void WriteTaskQuery(OUTFUNC_ARGS); /***************************************************************************** * Output routines for Citus node types @@ -469,6 +469,43 @@ OutRelationRowLock(OUTFUNC_ARGS) WRITE_ENUM_FIELD(rowLockStrength, LockClauseStrength); } +static void WriteTaskQuery(OUTFUNC_ARGS) { + WRITE_LOCALS(Task); + + WRITE_ENUM_FIELD(taskQuery.queryType, TaskQueryType); + + switch (node->taskQuery.queryType) + { + case TASK_QUERY_TEXT: + { + WRITE_STRING_FIELD(taskQuery.data.queryStringLazy); + break; + } + + case TASK_QUERY_OBJECT: + { + WRITE_NODE_FIELD(taskQuery.data.jobQueryReferenceForLazyDeparsing); + break; + } + + case TASK_QUERY_TEXT_PER_PLACEMENT: + { + WRITE_NODE_FIELD(taskQuery.data.perPlacementQueryStrings); + break; + } + + case TASK_QUERY_TEXT_LIST: + { + WRITE_NODE_FIELD(taskQuery.data.queryStringList); + break; + } + + default: + { + break; + } + } +} void OutTask(OUTFUNC_ARGS) @@ -479,10 +516,7 @@ OutTask(OUTFUNC_ARGS) WRITE_ENUM_FIELD(taskType, TaskType); WRITE_UINT64_FIELD(jobId); WRITE_UINT_FIELD(taskId); - WRITE_NODE_FIELD(queryForLocalExecution); - WRITE_STRING_FIELD(queryStringLazy); - WRITE_NODE_FIELD(perPlacementQueryStrings); - WRITE_NODE_FIELD(queryStringList); + WriteTaskQuery(str, raw_node); WRITE_OID_FIELD(anchorDistributedTableId); WRITE_UINT64_FIELD(anchorShardId); WRITE_NODE_FIELD(taskPlacementList); diff --git a/src/include/distributed/citus_nodefuncs.h b/src/include/distributed/citus_nodefuncs.h index e4b811d95..d25f6222c 100644 --- a/src/include/distributed/citus_nodefuncs.h +++ b/src/include/distributed/citus_nodefuncs.h @@ -76,6 +76,7 @@ extern void CopyNodeRelationShard(COPYFUNC_ARGS); extern void CopyNodeRelationRowLock(COPYFUNC_ARGS); extern void CopyNodeTask(COPYFUNC_ARGS); extern void CopyNodeLocalPlannedStatement(COPYFUNC_ARGS); +extern void CopyNodeTaskQuery(COPYFUNC_ARGS); extern void CopyNodeTaskExecution(COPYFUNC_ARGS); extern void CopyNodeDeferredErrorMessage(COPYFUNC_ARGS); diff --git a/src/include/distributed/deparse_shard_query.h b/src/include/distributed/deparse_shard_query.h index dc340821b..7eef84675 100644 --- a/src/include/distributed/deparse_shard_query.h +++ b/src/include/distributed/deparse_shard_query.h @@ -23,10 +23,14 @@ extern void RebuildQueryStrings(Job *workerJob); extern bool UpdateRelationToShardNames(Node *node, List *relationShardList); -extern void SetTaskQuery(Task *task, Query *query); +extern void SetTaskQueryIfShouldLazyDeparse(Task *task, Query *query); extern void SetTaskQueryString(Task *task, char *queryString); extern void SetTaskQueryStringList(Task *task, List *queryStringList); -extern char * TaskQueryString(Task *task); +extern void SetTaskPerPlacementQueryStrings(Task *task, + List *perPlacementQueryStringList); +extern char * TaskQueryStringForAllPlacements(Task *task); +extern char * TaskQueryStringForPlacement(Task *task, int placementIndex); extern bool UpdateRelationsToLocalShardTables(Node *node, List *relationShardList); +extern int GetTaskQueryType(Task *task); #endif /* DEPARSE_SHARD_QUERY_H */ diff --git a/src/include/distributed/multi_physical_planner.h b/src/include/distributed/multi_physical_planner.h index 2f2a65acc..74c6e8be0 100644 --- a/src/include/distributed/multi_physical_planner.h +++ b/src/include/distributed/multi_physical_planner.h @@ -203,6 +203,64 @@ typedef struct MapMergeJob */ typedef struct TaskExecution TaskExecution; +typedef enum TaskQueryType +{ + TASK_QUERY_TEXT, + TASK_QUERY_OBJECT, + TASK_QUERY_TEXT_PER_PLACEMENT, + TASK_QUERY_TEXT_LIST +} TaskQueryType; + +typedef struct TaskQuery +{ + TaskQueryType queryType; + + union + { + /* + * For most queries jobQueryReferenceForLazyDeparsing and/or queryStringLazy is not + * NULL. This means we have a single query for all placements. + * + * If this is not the case, the length of perPlacementQueryStrings is + * non-zero and equal to length of taskPlacementList. Like this it can + * assign a different query for each placement. We need this flexibility + * when a query should return node specific values. For example, on which + * node did we succeed storing some result files? + * + * jobQueryReferenceForLazyDeparsing is only not null when the planner thinks the + * query could possibly be locally executed. In that case deparsing+parsing + * the query might not be necessary, so we do that lazily. + * + * jobQueryReferenceForLazyDeparsing should only be set by using SetTaskQueryIfShouldLazyDeparse() + */ + Query *jobQueryReferenceForLazyDeparsing; + + /* + * In almost all cases queryStringLazy should be read only indirectly by + * using TaskQueryStringForAllPlacements(). This will populate the field if only the + * jobQueryReferenceForLazyDeparsing field is not NULL. + * + * This field should only be set by using SetTaskQueryString() (or as a + * side effect from TaskQueryStringForAllPlacements()). Otherwise it might not be in sync + * with jobQueryReferenceForLazyDeparsing. + */ + char *queryStringLazy; + + /* + * perPlacementQueryStrings is used when we have different query strings for each placement. + */ + List *perPlacementQueryStrings; + + /* + * queryStringList contains query strings. They should be + * run sequentially. The concatenated version of this list + * will already be set for queryStringLazy, this can be useful + * when we want to access each query string. + */ + List *queryStringList; + }data; +}TaskQuery; + typedef struct Task { CitusNode type; @@ -211,42 +269,10 @@ typedef struct Task uint32 taskId; /* - * For most queries queryForLocalExecution and/or queryStringLazy is not - * NULL. This means we have a single query for all placements. - * - * If this is not the case, the length of perPlacementQueryStrings is - * non-zero and equal to length of taskPlacementList. Like this it can - * assign a different query for each placement. We need this flexibility - * when a query should return node specific values. For example, on which - * node did we succeed storing some result files? - * - * queryForLocalExecution is only not null when the planner thinks the - * query could possibly be locally executed. In that case deparsing+parsing - * the query might not be necessary, so we do that lazily. - * - * queryForLocalExecution should only be set by using SetTaskQuery() + * taskQuery contains query string information. The way we get queryString can be different + * so this is abstracted with taskQuery. */ - Query *queryForLocalExecution; - - /* - * In almost all cases queryStringLazy should be read only indirectly by - * using TaskQueryString(). This will populate the field if only the - * queryForLocalExecution field is not NULL. - * - * This field should only be set by using SetTaskQueryString() (or as a - * side effect from TaskQueryString()). Otherwise it might not be in sync - * with queryForLocalExecution. - */ - char *queryStringLazy; - List *perPlacementQueryStrings; - - /* - * queryStringList contains query strings. They should be - * run sequentially. The concatenated version of this list - * will already be set for queryStringLazy, this can be useful - * when we want to access each query string. - */ - List *queryStringList; + TaskQuery taskQuery; Oid anchorDistributedTableId; /* only applies to insert tasks */ uint64 anchorShardId; /* only applies to compute tasks */