diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c index 1cda964ba..79e181c7b 100644 --- a/src/backend/distributed/executor/adaptive_executor.c +++ b/src/backend/distributed/executor/adaptive_executor.c @@ -3196,18 +3196,9 @@ StartPlacementExecutionOnSession(TaskPlacementExecution *placementExecution, List *placementAccessList = PlacementAccessListForTask(task, taskPlacement); int querySent = 0; - char *queryString = NULL; - if (list_length(task->perPlacementQueryStrings) == 0) - { - queryString = TaskQueryString(task); - } - else - { - Assert(list_length(task->taskPlacementList) == list_length( - task->perPlacementQueryStrings)); - queryString = list_nth(task->perPlacementQueryStrings, - placementExecution->placementExecutionIndex); - } + char *queryString = TaskQueryStringForPlacement(task, + placementExecution-> + placementExecutionIndex); if (execution->transactionProperties->useRemoteTransactionBlocks != TRANSACTION_BLOCKS_DISALLOWED) diff --git a/src/backend/distributed/executor/distributed_intermediate_results.c b/src/backend/distributed/executor/distributed_intermediate_results.c index 7e9d47218..06614592c 100644 --- a/src/backend/distributed/executor/distributed_intermediate_results.c +++ b/src/backend/distributed/executor/distributed_intermediate_results.c @@ -221,7 +221,7 @@ WrapTasksForPartitioning(const char *resultIdPrefix, List *selectTaskList, shardPlacement->nodeId, quote_literal_cstr(taskPrefix), quote_literal_cstr(taskPrefix), - quote_literal_cstr(TaskQueryString(selectTask)), + quote_literal_cstr(TaskQueryStringAllPlacements(selectTask)), partitionColumnIndex, quote_literal_cstr(partitionMethodString), minValuesString->data, maxValuesString->data, diff --git a/src/backend/distributed/executor/insert_select_executor.c b/src/backend/distributed/executor/insert_select_executor.c index 4b7c7cc32..809d689ae 100644 --- a/src/backend/distributed/executor/insert_select_executor.c +++ b/src/backend/distributed/executor/insert_select_executor.c @@ -1091,7 +1091,7 @@ WrapTaskListForProjection(List *taskList, List *projectedTargetEntries) StringInfo wrappedQuery = makeStringInfo(); appendStringInfo(wrappedQuery, "SELECT %s FROM (%s) subquery", projectedColumnsString->data, - TaskQueryString(task)); + TaskQueryStringAllPlacements(task)); SetTaskQueryString(task, wrappedQuery->data); } } diff --git a/src/backend/distributed/executor/local_executor.c b/src/backend/distributed/executor/local_executor.c index 490028c5b..a15e92849 100644 --- a/src/backend/distributed/executor/local_executor.c +++ b/src/backend/distributed/executor/local_executor.c @@ -201,7 +201,7 @@ ExecuteLocalTaskList(CitusScanState *scanState, List *taskList) taskParameterTypes = NULL; } - Query *shardQuery = ParseQueryString(TaskQueryString(task), + Query *shardQuery = ParseQueryString(TaskQueryStringAllPlacements(task), taskParameterTypes, taskNumParams); @@ -302,7 +302,7 @@ ExecuteLocalUtilityTaskList(List *localTaskList) foreach_ptr(localTask, localTaskList) { - const char *localTaskQueryCommand = TaskQueryString(localTask); + const char *localTaskQueryCommand = TaskQueryStringAllPlacements(localTask); /* we do not expect tasks with INVALID_SHARD_ID for utility commands */ Assert(localTask->anchorShardId != INVALID_SHARD_ID); @@ -390,7 +390,7 @@ LogLocalCommand(Task *task) } ereport(NOTICE, (errmsg("executing the command locally: %s", - ApplyLogRedaction(TaskQueryString(task))))); + ApplyLogRedaction(TaskQueryStringAllPlacements(task))))); } diff --git a/src/backend/distributed/executor/multi_task_tracker_executor.c b/src/backend/distributed/executor/multi_task_tracker_executor.c index 5dd0db6ea..26a8bc432 100644 --- a/src/backend/distributed/executor/multi_task_tracker_executor.c +++ b/src/backend/distributed/executor/multi_task_tracker_executor.c @@ -1570,7 +1570,7 @@ TrackerQueueSqlTask(TaskTracker *taskTracker, Task *task) */ StringInfo sqlTaskQueryString = makeStringInfo(); - char *escapedTaskQueryString = quote_literal_cstr(TaskQueryString(task)); + char *escapedTaskQueryString = quote_literal_cstr(TaskQueryStringAllPlacements(task)); if (BinaryMasterCopyFormat) { @@ -1605,7 +1605,9 @@ TrackerQueueTask(TaskTracker *taskTracker, Task *task) HTAB *taskStateHash = taskTracker->taskStateHash; /* wrap a task assignment query outside the original query */ - StringInfo taskAssignmentQuery = TaskAssignmentQuery(task, TaskQueryString(task)); + StringInfo taskAssignmentQuery = TaskAssignmentQuery(task, + TaskQueryStringAllPlacements( + task)); TrackerTaskState *taskState = TaskStateHashEnter(taskStateHash, task->jobId, task->taskId); @@ -2742,7 +2744,7 @@ TrackerHashCleanupJob(HTAB *taskTrackerHash, Task *jobCleanupTask) { /* assign through task tracker to manage resource utilization */ StringInfo jobCleanupQuery = TaskAssignmentQuery( - jobCleanupTask, TaskQueryString(jobCleanupTask)); + jobCleanupTask, TaskQueryStringAllPlacements(jobCleanupTask)); jobCleanupQuerySent = MultiClientSendQuery(taskTracker->connectionId, jobCleanupQuery->data); @@ -2821,7 +2823,7 @@ TrackerHashCleanupJob(HTAB *taskTrackerHash, Task *jobCleanupTask) nodeName, nodePort, (int) queryStatus), errhint("Manually clean job resources on node " "\"%s:%u\" by running \"%s\" ", nodeName, - nodePort, TaskQueryString( + nodePort, TaskQueryStringAllPlacements( jobCleanupTask)))); } else @@ -2840,7 +2842,8 @@ TrackerHashCleanupJob(HTAB *taskTrackerHash, Task *jobCleanupTask) nodePort, (int) resultStatus), errhint("Manually clean job resources on node " "\"%s:%u\" by running \"%s\" ", nodeName, - nodePort, TaskQueryString(jobCleanupTask)))); + nodePort, TaskQueryStringAllPlacements( + jobCleanupTask)))); } else { diff --git a/src/backend/distributed/master/master_delete_protocol.c b/src/backend/distributed/master/master_delete_protocol.c index 397e1459e..77c119a59 100644 --- a/src/backend/distributed/master/master_delete_protocol.c +++ b/src/backend/distributed/master/master_delete_protocol.c @@ -447,7 +447,8 @@ DropShards(Oid relationId, char *schemaName, char *relationName, * connect to that node to drop the shard placement over that * remote connection. */ - const char *dropShardPlacementCommand = TaskQueryString(task); + const char *dropShardPlacementCommand = TaskQueryStringAllPlacements( + task); ExecuteDropShardPlacementCommandRemotely(shardPlacement, relationName, dropShardPlacementCommand); diff --git a/src/backend/distributed/planner/deparse_shard_query.c b/src/backend/distributed/planner/deparse_shard_query.c index 69d3012bc..a657b7809 100644 --- a/src/backend/distributed/planner/deparse_shard_query.c +++ b/src/backend/distributed/planner/deparse_shard_query.c @@ -41,7 +41,8 @@ static void UpdateTaskQueryString(Query *query, Oid distributedTableId, static void ConvertRteToSubqueryWithEmptyResult(RangeTblEntry *rte); static bool ShouldLazyDeparseQuery(Task *task); static char * DeparseTaskQuery(Task *task, Query *query); -static bool IsEachPlacementQueryStringDifferent(Task* task); +static bool IsEachPlacementQueryStringDifferent(Task *task); +static char * TaskQueryStringForAllPlacements(Task *task); /* @@ -116,7 +117,8 @@ RebuildQueryStrings(Job *workerJob) task->queryForLocalExecution == NULL && task->queryStringLazy == NULL ? "(null)" - : ApplyLogRedaction(TaskQueryString(task))))); + : ApplyLogRedaction(TaskQueryStringAllPlacements( + task))))); UpdateTaskQueryString(query, relationId, valuesRTE, task); @@ -127,7 +129,7 @@ RebuildQueryStrings(Job *workerJob) task->parametersInQueryStringResolved = workerJob->parametersInJobQueryResolved; ereport(DEBUG4, (errmsg("query after rebuilding: %s", - ApplyLogRedaction(TaskQueryString(task))))); + ApplyLogRedaction(TaskQueryStringAllPlacements(task))))); } } @@ -182,7 +184,7 @@ UpdateTaskQueryString(Query *query, Oid distributedTableId, RangeTblEntry *value task->anchorDistributedTableId = distributedTableId; } - SetTaskQuery(task, query); + SetTaskQueryIfShouldLazyDeparse(task, query); if (valuesRTE != NULL) { @@ -413,13 +415,13 @@ ShouldLazyDeparseQuery(Task *task) /* - * SetTaskQuery attaches the query to the task so that it can be used during + * SetTaskQueryIfShouldLazyDeparse attaches the query to the task so that it can be used during * execution. If local execution can possibly take place it sets task->queryForLocalExecution. * If not it deparses the query and sets queryStringLazy, to avoid blowing the * size of the task unnecesarily. */ void -SetTaskQuery(Task *task, Query *query) +SetTaskQueryIfShouldLazyDeparse(Task *task, Query *query) { if (ShouldLazyDeparseQuery(task)) { @@ -427,7 +429,7 @@ SetTaskQuery(Task *task, Query *query) task->queryStringLazy = NULL; return; } - SetTaskQueryString(task, DeparseTaskQuery(task,query)); + SetTaskQueryString(task, DeparseTaskQuery(task, query)); } @@ -443,10 +445,13 @@ SetTaskQueryString(Task *task, char *queryString) task->queryStringLazy = queryString; } + /* * SetTaskPerPlacementQueryStrings set the perPlacementQueryString for the given task. */ -void SetTaskPerPlacementQueryStrings(Task *task, List* perPlacementQueryStringList) { +void +SetTaskPerPlacementQueryStrings(Task *task, List *perPlacementQueryStringList) +{ Assert(perPlacementQueryStringList != NIL); task->perPlacementQueryStrings = perPlacementQueryStringList; SetTaskQueryString(task, NULL); @@ -494,14 +499,25 @@ DeparseTaskQuery(Task *task, Query *query) /* - * TaskQueryString generates task->queryStringLazy if missing. + * TaskQueryStringAllPlacements generates task->queryStringLazy if missing. * * For performance reasons, the queryString is generated lazily. For example * for local queries it is usually not needed to generate it, so this way we * can skip the expensive deparsing+parsing. */ char * -TaskQueryString(Task *task) +TaskQueryStringAllPlacements(Task *task) +{ + return TaskQueryStringForAllPlacements(task); +} + + +/* + * TaskQueryStringForAllPlacements returns the query string for the given task. + * In this case, each placement has the same query string. + */ +static char * +TaskQueryStringForAllPlacements(Task *task) { if (task->queryStringLazy != NULL) { @@ -520,15 +536,30 @@ TaskQueryString(Task *task) */ MemoryContext previousContext = MemoryContextSwitchTo(GetMemoryChunkContext( task->queryForLocalExecution)); - task->queryStringLazy = DeparseTaskQuery(task, task->queryForLocalExecution); + SetTaskQueryString(task, DeparseTaskQuery(task, task->queryForLocalExecution)); MemoryContextSwitchTo(previousContext); return task->queryStringLazy; } + +char * +TaskQueryStringForPlacement(Task *task, int placementIndex) +{ + if (IsEachPlacementQueryStringDifferent(task)) + { + Assert(list_length(task->perPlacementQueryStrings) > placementIndex); + return list_nth(task->perPlacementQueryStrings, placementIndex); + } + return TaskQueryStringForAllPlacements(task); +} + + /* * IsEachPlacementQueryStringDifferent returns true if each placement * has a different query string. */ -static bool IsEachPlacementQueryStringDifferent(Task* task) { +static bool +IsEachPlacementQueryStringDifferent(Task *task) +{ return list_length(task->perPlacementQueryStrings) > 0; -} \ No newline at end of file +} diff --git a/src/backend/distributed/planner/function_call_delegation.c b/src/backend/distributed/planner/function_call_delegation.c index a521b36ab..69e711560 100644 --- a/src/backend/distributed/planner/function_call_delegation.c +++ b/src/backend/distributed/planner/function_call_delegation.c @@ -367,7 +367,7 @@ TryToDelegateFunctionCall(DistributedPlanningContext *planContext) task = CitusMakeNode(Task); task->taskType = SELECT_TASK; task->taskPlacementList = placementList; - SetTaskQuery(task, planContext->query); + SetTaskQueryIfShouldLazyDeparse(task, planContext->query); task->anchorShardId = shardInterval->shardId; task->replicationModel = distTable->replicationModel; diff --git a/src/backend/distributed/planner/multi_explain.c b/src/backend/distributed/planner/multi_explain.c index cc36677f7..c5a237f22 100644 --- a/src/backend/distributed/planner/multi_explain.c +++ b/src/backend/distributed/planner/multi_explain.c @@ -399,7 +399,8 @@ RemoteExplain(Task *task, ExplainState *es) RemoteExplainPlan *remotePlan = (RemoteExplainPlan *) palloc0( sizeof(RemoteExplainPlan)); - StringInfo explainQuery = BuildRemoteExplainQuery(TaskQueryString(task), es); + StringInfo explainQuery = BuildRemoteExplainQuery(TaskQueryStringAllPlacements(task), + es); /* * Use a coordinated transaction to ensure that we open a transaction block diff --git a/src/backend/distributed/planner/multi_physical_planner.c b/src/backend/distributed/planner/multi_physical_planner.c index 00d4908d7..c630bc5c9 100644 --- a/src/backend/distributed/planner/multi_physical_planner.c +++ b/src/backend/distributed/planner/multi_physical_planner.c @@ -4388,7 +4388,7 @@ CreateMapQueryString(MapMergeJob *mapMergeJob, Task *filterTask, /* wrap repartition query string around filter query string */ StringInfo mapQueryString = makeStringInfo(); - char *filterQueryString = TaskQueryString(filterTask); + char *filterQueryString = TaskQueryStringAllPlacements(filterTask); char *filterQueryEscapedText = quote_literal_cstr(filterQueryString); PartitionType partitionType = mapMergeJob->partitionType; diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c index 588bbd3db..fa2e16e3c 100644 --- a/src/backend/distributed/planner/multi_router_planner.c +++ b/src/backend/distributed/planner/multi_router_planner.c @@ -1820,7 +1820,7 @@ SingleShardSelectTaskList(Query *query, uint64 jobId, List *relationShardList, * that the query cannot be executed locally. */ task->taskPlacementList = placementList; - SetTaskQuery(task, query); + SetTaskQueryIfShouldLazyDeparse(task, query); task->anchorShardId = shardId; task->jobId = jobId; task->relationShardList = relationShardList; @@ -1901,7 +1901,7 @@ SingleShardModifyTaskList(Query *query, uint64 jobId, List *relationShardList, } task->taskPlacementList = placementList; - SetTaskQuery(task, query); + SetTaskQueryIfShouldLazyDeparse(task, query); task->anchorShardId = shardId; task->jobId = jobId; task->relationShardList = relationShardList; diff --git a/src/include/distributed/deparse_shard_query.h b/src/include/distributed/deparse_shard_query.h index 95baeb6e4..e4d3734c1 100644 --- a/src/include/distributed/deparse_shard_query.h +++ b/src/include/distributed/deparse_shard_query.h @@ -23,11 +23,13 @@ extern void RebuildQueryStrings(Job *workerJob); extern bool UpdateRelationToShardNames(Node *node, List *relationShardList); -extern void SetTaskQuery(Task *task, Query *query); +extern void SetTaskQueryIfShouldLazyDeparse(Task *task, Query *query); extern void SetTaskQueryString(Task *task, char *queryString); extern void SetTaskQueryStringList(Task *task, List *queryStringList); -extern void SetTaskPerPlacementQueryStrings(Task *task, List* perPlacementQueryStringList) -extern char * TaskQueryString(Task *task); +extern void SetTaskPerPlacementQueryStrings(Task *task, + List *perPlacementQueryStringList); +extern char * TaskQueryStringAllPlacements(Task *task); +extern char * TaskQueryStringForPlacement(Task *task, int placementIndex); extern bool UpdateRelationsToLocalShardTables(Node *node, List *relationShardList); #endif /* DEPARSE_SHARD_QUERY_H */ diff --git a/src/include/distributed/multi_physical_planner.h b/src/include/distributed/multi_physical_planner.h index 2f2a65acc..ac619ba1f 100644 --- a/src/include/distributed/multi_physical_planner.h +++ b/src/include/distributed/multi_physical_planner.h @@ -224,17 +224,17 @@ typedef struct Task * query could possibly be locally executed. In that case deparsing+parsing * the query might not be necessary, so we do that lazily. * - * queryForLocalExecution should only be set by using SetTaskQuery() + * queryForLocalExecution should only be set by using SetTaskQueryIfShouldLazyDeparse() */ Query *queryForLocalExecution; /* * In almost all cases queryStringLazy should be read only indirectly by - * using TaskQueryString(). This will populate the field if only the + * using TaskQueryStringAllPlacements(). This will populate the field if only the * queryForLocalExecution field is not NULL. * * This field should only be set by using SetTaskQueryString() (or as a - * side effect from TaskQueryString()). Otherwise it might not be in sync + * side effect from TaskQueryStringAllPlacements()). Otherwise it might not be in sync * with queryForLocalExecution. */ char *queryStringLazy;