mirror of https://github.com/citusdata/citus.git
add TaskQueryStringForPlacement
TaskQueryStringForPlacement simplifies how the executor gets the query string for a given placement. Task will use the necessary fields to return the correct query placement string. Executor doesn't need to know the details for this. rename TaskQueryString as TaskQueryStringAllPlacements TaskQueryString returns the query string that will be the same for all the placements. In INSERT..SELECT the query string can be different for each placement. Adaptive executor uses TaskQueryStringForPlacement, which returns the query string for a placement. It makes sense to rename TaskQueryString as TaskQueryStringAllPlacements as it is returning the query string for all placements. rename SetTaskQuery as SetTaskQueryIfShouldLazyDeparse SetTaskQuery does not always sets the task query. It can set the query string as well. So it is more clear to name it SetTaskQueryIfShouldLazyDeparse, since it will set the query not query string only when we should deparse the query in a lazy way.pull/3659/head
parent
982b5fbabf
commit
98f95e2a5e
|
@ -3196,18 +3196,9 @@ StartPlacementExecutionOnSession(TaskPlacementExecution *placementExecution,
|
|||
List *placementAccessList = PlacementAccessListForTask(task, taskPlacement);
|
||||
int querySent = 0;
|
||||
|
||||
char *queryString = NULL;
|
||||
if (list_length(task->perPlacementQueryStrings) == 0)
|
||||
{
|
||||
queryString = TaskQueryString(task);
|
||||
}
|
||||
else
|
||||
{
|
||||
Assert(list_length(task->taskPlacementList) == list_length(
|
||||
task->perPlacementQueryStrings));
|
||||
queryString = list_nth(task->perPlacementQueryStrings,
|
||||
placementExecution->placementExecutionIndex);
|
||||
}
|
||||
char *queryString = TaskQueryStringForPlacement(task,
|
||||
placementExecution->
|
||||
placementExecutionIndex);
|
||||
|
||||
if (execution->transactionProperties->useRemoteTransactionBlocks !=
|
||||
TRANSACTION_BLOCKS_DISALLOWED)
|
||||
|
|
|
@ -221,7 +221,7 @@ WrapTasksForPartitioning(const char *resultIdPrefix, List *selectTaskList,
|
|||
shardPlacement->nodeId,
|
||||
quote_literal_cstr(taskPrefix),
|
||||
quote_literal_cstr(taskPrefix),
|
||||
quote_literal_cstr(TaskQueryString(selectTask)),
|
||||
quote_literal_cstr(TaskQueryStringAllPlacements(selectTask)),
|
||||
partitionColumnIndex,
|
||||
quote_literal_cstr(partitionMethodString),
|
||||
minValuesString->data, maxValuesString->data,
|
||||
|
|
|
@ -1091,7 +1091,7 @@ WrapTaskListForProjection(List *taskList, List *projectedTargetEntries)
|
|||
StringInfo wrappedQuery = makeStringInfo();
|
||||
appendStringInfo(wrappedQuery, "SELECT %s FROM (%s) subquery",
|
||||
projectedColumnsString->data,
|
||||
TaskQueryString(task));
|
||||
TaskQueryStringAllPlacements(task));
|
||||
SetTaskQueryString(task, wrappedQuery->data);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -201,7 +201,7 @@ ExecuteLocalTaskList(CitusScanState *scanState, List *taskList)
|
|||
taskParameterTypes = NULL;
|
||||
}
|
||||
|
||||
Query *shardQuery = ParseQueryString(TaskQueryString(task),
|
||||
Query *shardQuery = ParseQueryString(TaskQueryStringAllPlacements(task),
|
||||
taskParameterTypes,
|
||||
taskNumParams);
|
||||
|
||||
|
@ -302,7 +302,7 @@ ExecuteLocalUtilityTaskList(List *localTaskList)
|
|||
|
||||
foreach_ptr(localTask, localTaskList)
|
||||
{
|
||||
const char *localTaskQueryCommand = TaskQueryString(localTask);
|
||||
const char *localTaskQueryCommand = TaskQueryStringAllPlacements(localTask);
|
||||
|
||||
/* we do not expect tasks with INVALID_SHARD_ID for utility commands */
|
||||
Assert(localTask->anchorShardId != INVALID_SHARD_ID);
|
||||
|
@ -390,7 +390,7 @@ LogLocalCommand(Task *task)
|
|||
}
|
||||
|
||||
ereport(NOTICE, (errmsg("executing the command locally: %s",
|
||||
ApplyLogRedaction(TaskQueryString(task)))));
|
||||
ApplyLogRedaction(TaskQueryStringAllPlacements(task)))));
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -1570,7 +1570,7 @@ TrackerQueueSqlTask(TaskTracker *taskTracker, Task *task)
|
|||
*/
|
||||
|
||||
StringInfo sqlTaskQueryString = makeStringInfo();
|
||||
char *escapedTaskQueryString = quote_literal_cstr(TaskQueryString(task));
|
||||
char *escapedTaskQueryString = quote_literal_cstr(TaskQueryStringAllPlacements(task));
|
||||
|
||||
if (BinaryMasterCopyFormat)
|
||||
{
|
||||
|
@ -1605,7 +1605,9 @@ TrackerQueueTask(TaskTracker *taskTracker, Task *task)
|
|||
HTAB *taskStateHash = taskTracker->taskStateHash;
|
||||
|
||||
/* wrap a task assignment query outside the original query */
|
||||
StringInfo taskAssignmentQuery = TaskAssignmentQuery(task, TaskQueryString(task));
|
||||
StringInfo taskAssignmentQuery = TaskAssignmentQuery(task,
|
||||
TaskQueryStringAllPlacements(
|
||||
task));
|
||||
|
||||
TrackerTaskState *taskState = TaskStateHashEnter(taskStateHash, task->jobId,
|
||||
task->taskId);
|
||||
|
@ -2742,7 +2744,7 @@ TrackerHashCleanupJob(HTAB *taskTrackerHash, Task *jobCleanupTask)
|
|||
{
|
||||
/* assign through task tracker to manage resource utilization */
|
||||
StringInfo jobCleanupQuery = TaskAssignmentQuery(
|
||||
jobCleanupTask, TaskQueryString(jobCleanupTask));
|
||||
jobCleanupTask, TaskQueryStringAllPlacements(jobCleanupTask));
|
||||
|
||||
jobCleanupQuerySent = MultiClientSendQuery(taskTracker->connectionId,
|
||||
jobCleanupQuery->data);
|
||||
|
@ -2821,7 +2823,7 @@ TrackerHashCleanupJob(HTAB *taskTrackerHash, Task *jobCleanupTask)
|
|||
nodeName, nodePort, (int) queryStatus),
|
||||
errhint("Manually clean job resources on node "
|
||||
"\"%s:%u\" by running \"%s\" ", nodeName,
|
||||
nodePort, TaskQueryString(
|
||||
nodePort, TaskQueryStringAllPlacements(
|
||||
jobCleanupTask))));
|
||||
}
|
||||
else
|
||||
|
@ -2840,7 +2842,8 @@ TrackerHashCleanupJob(HTAB *taskTrackerHash, Task *jobCleanupTask)
|
|||
nodePort, (int) resultStatus),
|
||||
errhint("Manually clean job resources on node "
|
||||
"\"%s:%u\" by running \"%s\" ", nodeName,
|
||||
nodePort, TaskQueryString(jobCleanupTask))));
|
||||
nodePort, TaskQueryStringAllPlacements(
|
||||
jobCleanupTask))));
|
||||
}
|
||||
else
|
||||
{
|
||||
|
|
|
@ -447,7 +447,8 @@ DropShards(Oid relationId, char *schemaName, char *relationName,
|
|||
* connect to that node to drop the shard placement over that
|
||||
* remote connection.
|
||||
*/
|
||||
const char *dropShardPlacementCommand = TaskQueryString(task);
|
||||
const char *dropShardPlacementCommand = TaskQueryStringAllPlacements(
|
||||
task);
|
||||
ExecuteDropShardPlacementCommandRemotely(shardPlacement,
|
||||
relationName,
|
||||
dropShardPlacementCommand);
|
||||
|
|
|
@ -41,7 +41,8 @@ static void UpdateTaskQueryString(Query *query, Oid distributedTableId,
|
|||
static void ConvertRteToSubqueryWithEmptyResult(RangeTblEntry *rte);
|
||||
static bool ShouldLazyDeparseQuery(Task *task);
|
||||
static char * DeparseTaskQuery(Task *task, Query *query);
|
||||
static bool IsEachPlacementQueryStringDifferent(Task* task);
|
||||
static bool IsEachPlacementQueryStringDifferent(Task *task);
|
||||
static char * TaskQueryStringForAllPlacements(Task *task);
|
||||
|
||||
|
||||
/*
|
||||
|
@ -116,7 +117,8 @@ RebuildQueryStrings(Job *workerJob)
|
|||
task->queryForLocalExecution == NULL &&
|
||||
task->queryStringLazy == NULL
|
||||
? "(null)"
|
||||
: ApplyLogRedaction(TaskQueryString(task)))));
|
||||
: ApplyLogRedaction(TaskQueryStringAllPlacements(
|
||||
task)))));
|
||||
|
||||
UpdateTaskQueryString(query, relationId, valuesRTE, task);
|
||||
|
||||
|
@ -127,7 +129,7 @@ RebuildQueryStrings(Job *workerJob)
|
|||
task->parametersInQueryStringResolved = workerJob->parametersInJobQueryResolved;
|
||||
|
||||
ereport(DEBUG4, (errmsg("query after rebuilding: %s",
|
||||
ApplyLogRedaction(TaskQueryString(task)))));
|
||||
ApplyLogRedaction(TaskQueryStringAllPlacements(task)))));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -182,7 +184,7 @@ UpdateTaskQueryString(Query *query, Oid distributedTableId, RangeTblEntry *value
|
|||
task->anchorDistributedTableId = distributedTableId;
|
||||
}
|
||||
|
||||
SetTaskQuery(task, query);
|
||||
SetTaskQueryIfShouldLazyDeparse(task, query);
|
||||
|
||||
if (valuesRTE != NULL)
|
||||
{
|
||||
|
@ -413,13 +415,13 @@ ShouldLazyDeparseQuery(Task *task)
|
|||
|
||||
|
||||
/*
|
||||
* SetTaskQuery attaches the query to the task so that it can be used during
|
||||
* SetTaskQueryIfShouldLazyDeparse attaches the query to the task so that it can be used during
|
||||
* execution. If local execution can possibly take place it sets task->queryForLocalExecution.
|
||||
* If not it deparses the query and sets queryStringLazy, to avoid blowing the
|
||||
* size of the task unnecesarily.
|
||||
*/
|
||||
void
|
||||
SetTaskQuery(Task *task, Query *query)
|
||||
SetTaskQueryIfShouldLazyDeparse(Task *task, Query *query)
|
||||
{
|
||||
if (ShouldLazyDeparseQuery(task))
|
||||
{
|
||||
|
@ -427,7 +429,7 @@ SetTaskQuery(Task *task, Query *query)
|
|||
task->queryStringLazy = NULL;
|
||||
return;
|
||||
}
|
||||
SetTaskQueryString(task, DeparseTaskQuery(task,query));
|
||||
SetTaskQueryString(task, DeparseTaskQuery(task, query));
|
||||
}
|
||||
|
||||
|
||||
|
@ -443,10 +445,13 @@ SetTaskQueryString(Task *task, char *queryString)
|
|||
task->queryStringLazy = queryString;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* SetTaskPerPlacementQueryStrings set the perPlacementQueryString for the given task.
|
||||
*/
|
||||
void SetTaskPerPlacementQueryStrings(Task *task, List* perPlacementQueryStringList) {
|
||||
void
|
||||
SetTaskPerPlacementQueryStrings(Task *task, List *perPlacementQueryStringList)
|
||||
{
|
||||
Assert(perPlacementQueryStringList != NIL);
|
||||
task->perPlacementQueryStrings = perPlacementQueryStringList;
|
||||
SetTaskQueryString(task, NULL);
|
||||
|
@ -494,14 +499,25 @@ DeparseTaskQuery(Task *task, Query *query)
|
|||
|
||||
|
||||
/*
|
||||
* TaskQueryString generates task->queryStringLazy if missing.
|
||||
* TaskQueryStringAllPlacements generates task->queryStringLazy if missing.
|
||||
*
|
||||
* For performance reasons, the queryString is generated lazily. For example
|
||||
* for local queries it is usually not needed to generate it, so this way we
|
||||
* can skip the expensive deparsing+parsing.
|
||||
*/
|
||||
char *
|
||||
TaskQueryString(Task *task)
|
||||
TaskQueryStringAllPlacements(Task *task)
|
||||
{
|
||||
return TaskQueryStringForAllPlacements(task);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* TaskQueryStringForAllPlacements returns the query string for the given task.
|
||||
* In this case, each placement has the same query string.
|
||||
*/
|
||||
static char *
|
||||
TaskQueryStringForAllPlacements(Task *task)
|
||||
{
|
||||
if (task->queryStringLazy != NULL)
|
||||
{
|
||||
|
@ -520,15 +536,30 @@ TaskQueryString(Task *task)
|
|||
*/
|
||||
MemoryContext previousContext = MemoryContextSwitchTo(GetMemoryChunkContext(
|
||||
task->queryForLocalExecution));
|
||||
task->queryStringLazy = DeparseTaskQuery(task, task->queryForLocalExecution);
|
||||
SetTaskQueryString(task, DeparseTaskQuery(task, task->queryForLocalExecution));
|
||||
MemoryContextSwitchTo(previousContext);
|
||||
return task->queryStringLazy;
|
||||
}
|
||||
|
||||
|
||||
char *
|
||||
TaskQueryStringForPlacement(Task *task, int placementIndex)
|
||||
{
|
||||
if (IsEachPlacementQueryStringDifferent(task))
|
||||
{
|
||||
Assert(list_length(task->perPlacementQueryStrings) > placementIndex);
|
||||
return list_nth(task->perPlacementQueryStrings, placementIndex);
|
||||
}
|
||||
return TaskQueryStringForAllPlacements(task);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* IsEachPlacementQueryStringDifferent returns true if each placement
|
||||
* has a different query string.
|
||||
*/
|
||||
static bool IsEachPlacementQueryStringDifferent(Task* task) {
|
||||
static bool
|
||||
IsEachPlacementQueryStringDifferent(Task *task)
|
||||
{
|
||||
return list_length(task->perPlacementQueryStrings) > 0;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -367,7 +367,7 @@ TryToDelegateFunctionCall(DistributedPlanningContext *planContext)
|
|||
task = CitusMakeNode(Task);
|
||||
task->taskType = SELECT_TASK;
|
||||
task->taskPlacementList = placementList;
|
||||
SetTaskQuery(task, planContext->query);
|
||||
SetTaskQueryIfShouldLazyDeparse(task, planContext->query);
|
||||
task->anchorShardId = shardInterval->shardId;
|
||||
task->replicationModel = distTable->replicationModel;
|
||||
|
||||
|
|
|
@ -399,7 +399,8 @@ RemoteExplain(Task *task, ExplainState *es)
|
|||
|
||||
RemoteExplainPlan *remotePlan = (RemoteExplainPlan *) palloc0(
|
||||
sizeof(RemoteExplainPlan));
|
||||
StringInfo explainQuery = BuildRemoteExplainQuery(TaskQueryString(task), es);
|
||||
StringInfo explainQuery = BuildRemoteExplainQuery(TaskQueryStringAllPlacements(task),
|
||||
es);
|
||||
|
||||
/*
|
||||
* Use a coordinated transaction to ensure that we open a transaction block
|
||||
|
|
|
@ -4388,7 +4388,7 @@ CreateMapQueryString(MapMergeJob *mapMergeJob, Task *filterTask,
|
|||
|
||||
/* wrap repartition query string around filter query string */
|
||||
StringInfo mapQueryString = makeStringInfo();
|
||||
char *filterQueryString = TaskQueryString(filterTask);
|
||||
char *filterQueryString = TaskQueryStringAllPlacements(filterTask);
|
||||
char *filterQueryEscapedText = quote_literal_cstr(filterQueryString);
|
||||
PartitionType partitionType = mapMergeJob->partitionType;
|
||||
|
||||
|
|
|
@ -1820,7 +1820,7 @@ SingleShardSelectTaskList(Query *query, uint64 jobId, List *relationShardList,
|
|||
* that the query cannot be executed locally.
|
||||
*/
|
||||
task->taskPlacementList = placementList;
|
||||
SetTaskQuery(task, query);
|
||||
SetTaskQueryIfShouldLazyDeparse(task, query);
|
||||
task->anchorShardId = shardId;
|
||||
task->jobId = jobId;
|
||||
task->relationShardList = relationShardList;
|
||||
|
@ -1901,7 +1901,7 @@ SingleShardModifyTaskList(Query *query, uint64 jobId, List *relationShardList,
|
|||
}
|
||||
|
||||
task->taskPlacementList = placementList;
|
||||
SetTaskQuery(task, query);
|
||||
SetTaskQueryIfShouldLazyDeparse(task, query);
|
||||
task->anchorShardId = shardId;
|
||||
task->jobId = jobId;
|
||||
task->relationShardList = relationShardList;
|
||||
|
|
|
@ -23,11 +23,13 @@
|
|||
|
||||
extern void RebuildQueryStrings(Job *workerJob);
|
||||
extern bool UpdateRelationToShardNames(Node *node, List *relationShardList);
|
||||
extern void SetTaskQuery(Task *task, Query *query);
|
||||
extern void SetTaskQueryIfShouldLazyDeparse(Task *task, Query *query);
|
||||
extern void SetTaskQueryString(Task *task, char *queryString);
|
||||
extern void SetTaskQueryStringList(Task *task, List *queryStringList);
|
||||
extern void SetTaskPerPlacementQueryStrings(Task *task, List* perPlacementQueryStringList)
|
||||
extern char * TaskQueryString(Task *task);
|
||||
extern void SetTaskPerPlacementQueryStrings(Task *task,
|
||||
List *perPlacementQueryStringList);
|
||||
extern char * TaskQueryStringAllPlacements(Task *task);
|
||||
extern char * TaskQueryStringForPlacement(Task *task, int placementIndex);
|
||||
extern bool UpdateRelationsToLocalShardTables(Node *node, List *relationShardList);
|
||||
|
||||
#endif /* DEPARSE_SHARD_QUERY_H */
|
||||
|
|
|
@ -224,17 +224,17 @@ typedef struct Task
|
|||
* query could possibly be locally executed. In that case deparsing+parsing
|
||||
* the query might not be necessary, so we do that lazily.
|
||||
*
|
||||
* queryForLocalExecution should only be set by using SetTaskQuery()
|
||||
* queryForLocalExecution should only be set by using SetTaskQueryIfShouldLazyDeparse()
|
||||
*/
|
||||
Query *queryForLocalExecution;
|
||||
|
||||
/*
|
||||
* In almost all cases queryStringLazy should be read only indirectly by
|
||||
* using TaskQueryString(). This will populate the field if only the
|
||||
* using TaskQueryStringAllPlacements(). This will populate the field if only the
|
||||
* queryForLocalExecution field is not NULL.
|
||||
*
|
||||
* This field should only be set by using SetTaskQueryString() (or as a
|
||||
* side effect from TaskQueryString()). Otherwise it might not be in sync
|
||||
* side effect from TaskQueryStringAllPlacements()). Otherwise it might not be in sync
|
||||
* with queryForLocalExecution.
|
||||
*/
|
||||
char *queryStringLazy;
|
||||
|
|
Loading…
Reference in New Issue