mirror of https://github.com/citusdata/citus.git
add TaskQuery struct to abstract query string related fields
We had many fields in task related to query strings. It was kind of complex, and only of them could be set at a time. Therefore it makes more sense to abstract this and use a union so that it is clear that only of them should be set. We have three fields that could have query related strings: - queryForLocation - queryStringLazy - perPlacementQueryStrings Relatively, they can be set with: - SetTaskQueryString - SetTaskQueryIfShouldLazyDeparse - SetTaskPerPlacementQueryStrings The direct usage of the query related fields are also removed. Rename queryForLocalExecution Currently queryForLocalExecution is only used for deparsing purposes, therefore it makes sense to rename it to what it is doing.pull/3659/head
parent
98f95e2a5e
commit
c796ac335d
|
@ -221,7 +221,7 @@ WrapTasksForPartitioning(const char *resultIdPrefix, List *selectTaskList,
|
|||
shardPlacement->nodeId,
|
||||
quote_literal_cstr(taskPrefix),
|
||||
quote_literal_cstr(taskPrefix),
|
||||
quote_literal_cstr(TaskQueryStringAllPlacements(selectTask)),
|
||||
quote_literal_cstr(TaskQueryStringForAllPlacements(selectTask)),
|
||||
partitionColumnIndex,
|
||||
quote_literal_cstr(partitionMethodString),
|
||||
minValuesString->data, maxValuesString->data,
|
||||
|
|
|
@ -1062,7 +1062,7 @@ IsRedistributablePlan(Plan *selectPlan)
|
|||
|
||||
|
||||
/*
|
||||
* WrapTaskListForProjection wraps task->queryString to only select given
|
||||
* WrapTaskListForProjection wraps task query string to only select given
|
||||
* projected columns. It modifies the taskList.
|
||||
*/
|
||||
static void
|
||||
|
@ -1091,7 +1091,7 @@ WrapTaskListForProjection(List *taskList, List *projectedTargetEntries)
|
|||
StringInfo wrappedQuery = makeStringInfo();
|
||||
appendStringInfo(wrappedQuery, "SELECT %s FROM (%s) subquery",
|
||||
projectedColumnsString->data,
|
||||
TaskQueryStringAllPlacements(task));
|
||||
TaskQueryStringForAllPlacements(task));
|
||||
SetTaskQueryString(task, wrappedQuery->data);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -201,7 +201,7 @@ ExecuteLocalTaskList(CitusScanState *scanState, List *taskList)
|
|||
taskParameterTypes = NULL;
|
||||
}
|
||||
|
||||
Query *shardQuery = ParseQueryString(TaskQueryStringAllPlacements(task),
|
||||
Query *shardQuery = ParseQueryString(TaskQueryStringForAllPlacements(task),
|
||||
taskParameterTypes,
|
||||
taskNumParams);
|
||||
|
||||
|
@ -220,9 +220,15 @@ ExecuteLocalTaskList(CitusScanState *scanState, List *taskList)
|
|||
|
||||
LogLocalCommand(task);
|
||||
|
||||
char *shardQueryString = task->queryStringLazy
|
||||
? task->queryStringLazy
|
||||
: "<optimized out by local execution>";
|
||||
char *shardQueryString = NULL;
|
||||
if (GetTaskQueryType(task) == TASK_QUERY_TEXT)
|
||||
{
|
||||
shardQueryString = TaskQueryStringForAllPlacements(task);
|
||||
}
|
||||
else
|
||||
{
|
||||
shardQueryString = "<optimized out by local execution>";
|
||||
}
|
||||
|
||||
totalRowsProcessed +=
|
||||
ExecuteLocalTaskPlan(scanState, localPlan, shardQueryString);
|
||||
|
@ -302,7 +308,7 @@ ExecuteLocalUtilityTaskList(List *localTaskList)
|
|||
|
||||
foreach_ptr(localTask, localTaskList)
|
||||
{
|
||||
const char *localTaskQueryCommand = TaskQueryStringAllPlacements(localTask);
|
||||
const char *localTaskQueryCommand = TaskQueryStringForAllPlacements(localTask);
|
||||
|
||||
/* we do not expect tasks with INVALID_SHARD_ID for utility commands */
|
||||
Assert(localTask->anchorShardId != INVALID_SHARD_ID);
|
||||
|
@ -390,7 +396,7 @@ LogLocalCommand(Task *task)
|
|||
}
|
||||
|
||||
ereport(NOTICE, (errmsg("executing the command locally: %s",
|
||||
ApplyLogRedaction(TaskQueryStringAllPlacements(task)))));
|
||||
ApplyLogRedaction(TaskQueryStringForAllPlacements(task)))));
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -1570,7 +1570,7 @@ TrackerQueueSqlTask(TaskTracker *taskTracker, Task *task)
|
|||
*/
|
||||
|
||||
StringInfo sqlTaskQueryString = makeStringInfo();
|
||||
char *escapedTaskQueryString = quote_literal_cstr(TaskQueryStringAllPlacements(task));
|
||||
char *escapedTaskQueryString = quote_literal_cstr(TaskQueryStringForAllPlacements(task));
|
||||
|
||||
if (BinaryMasterCopyFormat)
|
||||
{
|
||||
|
@ -1606,7 +1606,7 @@ TrackerQueueTask(TaskTracker *taskTracker, Task *task)
|
|||
|
||||
/* wrap a task assignment query outside the original query */
|
||||
StringInfo taskAssignmentQuery = TaskAssignmentQuery(task,
|
||||
TaskQueryStringAllPlacements(
|
||||
TaskQueryStringForAllPlacements(
|
||||
task));
|
||||
|
||||
TrackerTaskState *taskState = TaskStateHashEnter(taskStateHash, task->jobId,
|
||||
|
@ -2744,7 +2744,7 @@ TrackerHashCleanupJob(HTAB *taskTrackerHash, Task *jobCleanupTask)
|
|||
{
|
||||
/* assign through task tracker to manage resource utilization */
|
||||
StringInfo jobCleanupQuery = TaskAssignmentQuery(
|
||||
jobCleanupTask, TaskQueryStringAllPlacements(jobCleanupTask));
|
||||
jobCleanupTask, TaskQueryStringForAllPlacements(jobCleanupTask));
|
||||
|
||||
jobCleanupQuerySent = MultiClientSendQuery(taskTracker->connectionId,
|
||||
jobCleanupQuery->data);
|
||||
|
@ -2823,7 +2823,7 @@ TrackerHashCleanupJob(HTAB *taskTrackerHash, Task *jobCleanupTask)
|
|||
nodeName, nodePort, (int) queryStatus),
|
||||
errhint("Manually clean job resources on node "
|
||||
"\"%s:%u\" by running \"%s\" ", nodeName,
|
||||
nodePort, TaskQueryStringAllPlacements(
|
||||
nodePort, TaskQueryStringForAllPlacements(
|
||||
jobCleanupTask))));
|
||||
}
|
||||
else
|
||||
|
@ -2842,7 +2842,7 @@ TrackerHashCleanupJob(HTAB *taskTrackerHash, Task *jobCleanupTask)
|
|||
nodePort, (int) resultStatus),
|
||||
errhint("Manually clean job resources on node "
|
||||
"\"%s:%u\" by running \"%s\" ", nodeName,
|
||||
nodePort, TaskQueryStringAllPlacements(
|
||||
nodePort, TaskQueryStringForAllPlacements(
|
||||
jobCleanupTask))));
|
||||
}
|
||||
else
|
||||
|
|
|
@ -447,7 +447,7 @@ DropShards(Oid relationId, char *schemaName, char *relationName,
|
|||
* connect to that node to drop the shard placement over that
|
||||
* remote connection.
|
||||
*/
|
||||
const char *dropShardPlacementCommand = TaskQueryStringAllPlacements(
|
||||
const char *dropShardPlacementCommand = TaskQueryStringForAllPlacements(
|
||||
task);
|
||||
ExecuteDropShardPlacementCommandRemotely(shardPlacement,
|
||||
relationName,
|
||||
|
|
|
@ -42,7 +42,7 @@ static void ConvertRteToSubqueryWithEmptyResult(RangeTblEntry *rte);
|
|||
static bool ShouldLazyDeparseQuery(Task *task);
|
||||
static char * DeparseTaskQuery(Task *task, Query *query);
|
||||
static bool IsEachPlacementQueryStringDifferent(Task *task);
|
||||
static char * TaskQueryStringForAllPlacements(Task *task);
|
||||
static void InitializeTaskQueryIfNecessary(Task *task);
|
||||
|
||||
|
||||
/*
|
||||
|
@ -113,11 +113,12 @@ RebuildQueryStrings(Job *workerJob)
|
|||
}
|
||||
}
|
||||
|
||||
bool isQueryObjectOrText = GetTaskQueryType(task) == TASK_QUERY_TEXT ||
|
||||
GetTaskQueryType(task) == TASK_QUERY_OBJECT;
|
||||
ereport(DEBUG4, (errmsg("query before rebuilding: %s",
|
||||
task->queryForLocalExecution == NULL &&
|
||||
task->queryStringLazy == NULL
|
||||
!isQueryObjectOrText
|
||||
? "(null)"
|
||||
: ApplyLogRedaction(TaskQueryStringAllPlacements(
|
||||
: ApplyLogRedaction(TaskQueryStringForAllPlacements(
|
||||
task)))));
|
||||
|
||||
UpdateTaskQueryString(query, relationId, valuesRTE, task);
|
||||
|
@ -129,7 +130,7 @@ RebuildQueryStrings(Job *workerJob)
|
|||
task->parametersInQueryStringResolved = workerJob->parametersInJobQueryResolved;
|
||||
|
||||
ereport(DEBUG4, (errmsg("query after rebuilding: %s",
|
||||
ApplyLogRedaction(TaskQueryStringAllPlacements(task)))));
|
||||
ApplyLogRedaction(TaskQueryStringForAllPlacements(task)))));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -416,7 +417,7 @@ ShouldLazyDeparseQuery(Task *task)
|
|||
|
||||
/*
|
||||
* SetTaskQueryIfShouldLazyDeparse attaches the query to the task so that it can be used during
|
||||
* execution. If local execution can possibly take place it sets task->queryForLocalExecution.
|
||||
* execution. If local execution can possibly take place it sets task->jobQueryReferenceForLazyDeparsing.
|
||||
* If not it deparses the query and sets queryStringLazy, to avoid blowing the
|
||||
* size of the task unnecesarily.
|
||||
*/
|
||||
|
@ -425,24 +426,41 @@ SetTaskQueryIfShouldLazyDeparse(Task *task, Query *query)
|
|||
{
|
||||
if (ShouldLazyDeparseQuery(task))
|
||||
{
|
||||
task->queryForLocalExecution = query;
|
||||
task->queryStringLazy = NULL;
|
||||
InitializeTaskQueryIfNecessary(task);
|
||||
task->taskQuery->queryType = TASK_QUERY_OBJECT;
|
||||
task->taskQuery->data.jobQueryReferenceForLazyDeparsing = query;
|
||||
return;
|
||||
}
|
||||
|
||||
SetTaskQueryString(task, DeparseTaskQuery(task, query));
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* SetTaskQueryString attaches the query string to the task so that it can be
|
||||
* used during execution. It also unsets queryForLocalExecution to be sure
|
||||
* used during execution. It also unsets jobQueryReferenceForLazyDeparsing to be sure
|
||||
* these are kept in sync.
|
||||
*/
|
||||
void
|
||||
SetTaskQueryString(Task *task, char *queryString)
|
||||
{
|
||||
task->queryForLocalExecution = NULL;
|
||||
task->queryStringLazy = queryString;
|
||||
InitializeTaskQueryIfNecessary(task);
|
||||
task->taskQuery->queryType = TASK_QUERY_TEXT;
|
||||
task->taskQuery->data.queryStringLazy = queryString;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* InitializeTaskQueryIfNecessary initializes task query if it
|
||||
* is not yet allocated.
|
||||
*/
|
||||
static void
|
||||
InitializeTaskQueryIfNecessary(Task *task)
|
||||
{
|
||||
if (task->taskQuery == NULL)
|
||||
{
|
||||
task->taskQuery = CitusMakeNode(TaskQuery);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
@ -452,9 +470,10 @@ SetTaskQueryString(Task *task, char *queryString)
|
|||
void
|
||||
SetTaskPerPlacementQueryStrings(Task *task, List *perPlacementQueryStringList)
|
||||
{
|
||||
InitializeTaskQueryIfNecessary(task);
|
||||
Assert(perPlacementQueryStringList != NIL);
|
||||
task->perPlacementQueryStrings = perPlacementQueryStringList;
|
||||
SetTaskQueryString(task, NULL);
|
||||
task->taskQuery->queryType = TASK_QUERY_TEXT_PER_PLACEMENT;
|
||||
task->taskQuery->data.perPlacementQueryStrings = perPlacementQueryStringList;
|
||||
}
|
||||
|
||||
|
||||
|
@ -499,46 +518,48 @@ DeparseTaskQuery(Task *task, Query *query)
|
|||
|
||||
|
||||
/*
|
||||
* TaskQueryStringAllPlacements generates task->queryStringLazy if missing.
|
||||
* GetTaskQueryType returns the type of the task query.
|
||||
*/
|
||||
int
|
||||
GetTaskQueryType(Task *task)
|
||||
{
|
||||
return task->taskQuery->queryType;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* TaskQueryStringForAllPlacements generates task query string text if missing.
|
||||
*
|
||||
* For performance reasons, the queryString is generated lazily. For example
|
||||
* for local queries it is usually not needed to generate it, so this way we
|
||||
* can skip the expensive deparsing+parsing.
|
||||
*/
|
||||
char *
|
||||
TaskQueryStringAllPlacements(Task *task)
|
||||
{
|
||||
return TaskQueryStringForAllPlacements(task);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* TaskQueryStringForAllPlacements returns the query string for the given task.
|
||||
* In this case, each placement has the same query string.
|
||||
*/
|
||||
static char *
|
||||
TaskQueryStringForAllPlacements(Task *task)
|
||||
{
|
||||
if (task->queryStringLazy != NULL)
|
||||
if (GetTaskQueryType(task) == TASK_QUERY_TEXT)
|
||||
{
|
||||
return task->queryStringLazy;
|
||||
return task->taskQuery->data.queryStringLazy;
|
||||
}
|
||||
Assert(task->queryForLocalExecution != NULL);
|
||||
Query *jobQueryReferenceForLazyDeparsing = task->taskQuery->data.jobQueryReferenceForLazyDeparsing;
|
||||
Assert(task->taskQuery->queryType == TASK_QUERY_OBJECT && jobQueryReferenceForLazyDeparsing !=
|
||||
NULL);
|
||||
|
||||
|
||||
/*
|
||||
* Switch to the memory context of task->queryForLocalExecution before generating the query
|
||||
* Switch to the memory context of task->jobQueryReferenceForLazyDeparsing before generating the query
|
||||
* string. This way the query string is not freed in between multiple
|
||||
* executions of a prepared statement. Except when UpdateTaskQueryString is
|
||||
* used to set task->queryForLocalExecution, in that case it is freed but it will be set to
|
||||
* used to set task->jobQueryReferenceForLazyDeparsing, in that case it is freed but it will be set to
|
||||
* NULL on the next execution of the query because UpdateTaskQueryString
|
||||
* does that.
|
||||
*/
|
||||
MemoryContext previousContext = MemoryContextSwitchTo(GetMemoryChunkContext(
|
||||
task->queryForLocalExecution));
|
||||
SetTaskQueryString(task, DeparseTaskQuery(task, task->queryForLocalExecution));
|
||||
jobQueryReferenceForLazyDeparsing));
|
||||
char *queryString = DeparseTaskQuery(task, jobQueryReferenceForLazyDeparsing);
|
||||
MemoryContextSwitchTo(previousContext);
|
||||
return task->queryStringLazy;
|
||||
SetTaskQueryString(task, queryString);
|
||||
return task->taskQuery->data.queryStringLazy;
|
||||
}
|
||||
|
||||
|
||||
|
@ -547,8 +568,10 @@ TaskQueryStringForPlacement(Task *task, int placementIndex)
|
|||
{
|
||||
if (IsEachPlacementQueryStringDifferent(task))
|
||||
{
|
||||
Assert(list_length(task->perPlacementQueryStrings) > placementIndex);
|
||||
return list_nth(task->perPlacementQueryStrings, placementIndex);
|
||||
List *perPlacementQueryStringList =
|
||||
task->taskQuery->data.perPlacementQueryStrings;
|
||||
Assert(list_length(perPlacementQueryStringList) > placementIndex);
|
||||
return list_nth(perPlacementQueryStringList, placementIndex);
|
||||
}
|
||||
return TaskQueryStringForAllPlacements(task);
|
||||
}
|
||||
|
@ -561,5 +584,5 @@ TaskQueryStringForPlacement(Task *task, int placementIndex)
|
|||
static bool
|
||||
IsEachPlacementQueryStringDifferent(Task *task)
|
||||
{
|
||||
return list_length(task->perPlacementQueryStrings) > 0;
|
||||
return GetTaskQueryType(task) == TASK_QUERY_TEXT_PER_PLACEMENT;
|
||||
}
|
||||
|
|
|
@ -399,7 +399,7 @@ RemoteExplain(Task *task, ExplainState *es)
|
|||
|
||||
RemoteExplainPlan *remotePlan = (RemoteExplainPlan *) palloc0(
|
||||
sizeof(RemoteExplainPlan));
|
||||
StringInfo explainQuery = BuildRemoteExplainQuery(TaskQueryStringAllPlacements(task),
|
||||
StringInfo explainQuery = BuildRemoteExplainQuery(TaskQueryStringForAllPlacements(task),
|
||||
es);
|
||||
|
||||
/*
|
||||
|
|
|
@ -4388,7 +4388,7 @@ CreateMapQueryString(MapMergeJob *mapMergeJob, Task *filterTask,
|
|||
|
||||
/* wrap repartition query string around filter query string */
|
||||
StringInfo mapQueryString = makeStringInfo();
|
||||
char *filterQueryString = TaskQueryStringAllPlacements(filterTask);
|
||||
char *filterQueryString = TaskQueryStringForAllPlacements(filterTask);
|
||||
char *filterQueryEscapedText = quote_literal_cstr(filterQueryString);
|
||||
PartitionType partitionType = mapMergeJob->partitionType;
|
||||
|
||||
|
|
|
@ -250,6 +250,40 @@ CopyNodeRelationRowLock(COPYFUNC_ARGS)
|
|||
}
|
||||
|
||||
|
||||
void
|
||||
CopyNodeTaskQuery(COPYFUNC_ARGS)
|
||||
{
|
||||
DECLARE_FROM_AND_NEW_NODE(TaskQuery);
|
||||
COPY_SCALAR_FIELD(queryType);
|
||||
|
||||
switch (from->queryType)
|
||||
{
|
||||
case TASK_QUERY_TEXT:
|
||||
{
|
||||
COPY_STRING_FIELD(data.queryStringLazy);
|
||||
break;
|
||||
}
|
||||
|
||||
case TASK_QUERY_OBJECT:
|
||||
{
|
||||
COPY_NODE_FIELD(data.jobQueryReferenceForLazyDeparsing);
|
||||
break;
|
||||
}
|
||||
|
||||
case TASK_QUERY_TEXT_PER_PLACEMENT:
|
||||
{
|
||||
COPY_NODE_FIELD(data.perPlacementQueryStrings);
|
||||
break;
|
||||
}
|
||||
|
||||
default:
|
||||
{
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
void
|
||||
CopyNodeTask(COPYFUNC_ARGS)
|
||||
{
|
||||
|
@ -258,9 +292,7 @@ CopyNodeTask(COPYFUNC_ARGS)
|
|||
COPY_SCALAR_FIELD(taskType);
|
||||
COPY_SCALAR_FIELD(jobId);
|
||||
COPY_SCALAR_FIELD(taskId);
|
||||
COPY_NODE_FIELD(queryForLocalExecution);
|
||||
COPY_STRING_FIELD(queryStringLazy);
|
||||
COPY_NODE_FIELD(perPlacementQueryStrings);
|
||||
COPY_NODE_FIELD(taskQuery);
|
||||
COPY_NODE_FIELD(queryStringList);
|
||||
COPY_SCALAR_FIELD(anchorDistributedTableId);
|
||||
COPY_SCALAR_FIELD(anchorShardId);
|
||||
|
|
|
@ -44,7 +44,8 @@ static const char *CitusNodeTagNamesD[] = {
|
|||
"RelationShard",
|
||||
"RelationRowLock",
|
||||
"DeferredErrorMessage",
|
||||
"GroupShardPlacement"
|
||||
"GroupShardPlacement",
|
||||
"TaskQuery",
|
||||
};
|
||||
|
||||
const char **CitusNodeTagNames = CitusNodeTagNamesD;
|
||||
|
@ -401,6 +402,7 @@ const ExtensibleNodeMethods nodeMethods[] =
|
|||
DEFINE_NODE_METHODS(TaskExecution),
|
||||
DEFINE_NODE_METHODS(DeferredErrorMessage),
|
||||
DEFINE_NODE_METHODS(GroupShardPlacement),
|
||||
DEFINE_NODE_METHODS(TaskQuery),
|
||||
|
||||
/* nodes with only output support */
|
||||
DEFINE_NODE_METHODS_NO_READ(MultiNode),
|
||||
|
|
|
@ -469,6 +469,12 @@ OutRelationRowLock(OUTFUNC_ARGS)
|
|||
WRITE_ENUM_FIELD(rowLockStrength, LockClauseStrength);
|
||||
}
|
||||
|
||||
void OutTaskQuery(OUTFUNC_ARGS) {
|
||||
WRITE_LOCALS(TaskQuery);
|
||||
WRITE_NODE_TYPE("TASKQUERY");
|
||||
|
||||
WRITE_ENUM_FIELD(queryType, TaskQueryType);
|
||||
}
|
||||
|
||||
void
|
||||
OutTask(OUTFUNC_ARGS)
|
||||
|
@ -479,9 +485,7 @@ OutTask(OUTFUNC_ARGS)
|
|||
WRITE_ENUM_FIELD(taskType, TaskType);
|
||||
WRITE_UINT64_FIELD(jobId);
|
||||
WRITE_UINT_FIELD(taskId);
|
||||
WRITE_NODE_FIELD(queryForLocalExecution);
|
||||
WRITE_STRING_FIELD(queryStringLazy);
|
||||
WRITE_NODE_FIELD(perPlacementQueryStrings);
|
||||
// WRITE_SCALAR_FIELD(taskQuery);
|
||||
WRITE_NODE_FIELD(queryStringList);
|
||||
WRITE_OID_FIELD(anchorDistributedTableId);
|
||||
WRITE_UINT64_FIELD(anchorShardId);
|
||||
|
|
|
@ -47,6 +47,7 @@ extern void OutMapMergeJob(OUTFUNC_ARGS);
|
|||
extern void OutShardPlacement(OUTFUNC_ARGS);
|
||||
extern void OutRelationShard(OUTFUNC_ARGS);
|
||||
extern void OutRelationRowLock(OUTFUNC_ARGS);
|
||||
extern void OutTaskQuery(OUTFUNC_ARGS);
|
||||
extern void OutTask(OUTFUNC_ARGS);
|
||||
extern void OutLocalPlannedStatement(OUTFUNC_ARGS);
|
||||
extern void OutTaskExecution(OUTFUNC_ARGS);
|
||||
|
@ -76,6 +77,7 @@ extern void CopyNodeRelationShard(COPYFUNC_ARGS);
|
|||
extern void CopyNodeRelationRowLock(COPYFUNC_ARGS);
|
||||
extern void CopyNodeTask(COPYFUNC_ARGS);
|
||||
extern void CopyNodeLocalPlannedStatement(COPYFUNC_ARGS);
|
||||
extern void CopyNodeTaskQuery(COPYFUNC_ARGS);
|
||||
extern void CopyNodeTaskExecution(COPYFUNC_ARGS);
|
||||
extern void CopyNodeDeferredErrorMessage(COPYFUNC_ARGS);
|
||||
|
||||
|
|
|
@ -66,7 +66,8 @@ typedef enum CitusNodeTag
|
|||
T_RelationShard,
|
||||
T_RelationRowLock,
|
||||
T_DeferredErrorMessage,
|
||||
T_GroupShardPlacement
|
||||
T_GroupShardPlacement,
|
||||
T_TaskQuery
|
||||
} CitusNodeTag;
|
||||
|
||||
|
||||
|
|
|
@ -28,8 +28,9 @@ extern void SetTaskQueryString(Task *task, char *queryString);
|
|||
extern void SetTaskQueryStringList(Task *task, List *queryStringList);
|
||||
extern void SetTaskPerPlacementQueryStrings(Task *task,
|
||||
List *perPlacementQueryStringList);
|
||||
extern char * TaskQueryStringAllPlacements(Task *task);
|
||||
extern char * TaskQueryStringForAllPlacements(Task *task);
|
||||
extern char * TaskQueryStringForPlacement(Task *task, int placementIndex);
|
||||
extern bool UpdateRelationsToLocalShardTables(Node *node, List *relationShardList);
|
||||
extern int GetTaskQueryType(Task *task);
|
||||
|
||||
#endif /* DEPARSE_SHARD_QUERY_H */
|
||||
|
|
|
@ -203,6 +203,56 @@ typedef struct MapMergeJob
|
|||
*/
|
||||
typedef struct TaskExecution TaskExecution;
|
||||
|
||||
typedef enum TaskQueryType
|
||||
{
|
||||
TASK_QUERY_TEXT,
|
||||
TASK_QUERY_OBJECT,
|
||||
TASK_QUERY_TEXT_PER_PLACEMENT
|
||||
} TaskQueryType;
|
||||
|
||||
typedef struct TaskQuery
|
||||
{
|
||||
CitusNode type;
|
||||
TaskQueryType queryType;
|
||||
|
||||
union
|
||||
{
|
||||
/*
|
||||
* For most queries jobQueryReferenceForLazyDeparsing and/or queryStringLazy is not
|
||||
* NULL. This means we have a single query for all placements.
|
||||
*
|
||||
* If this is not the case, the length of perPlacementQueryStrings is
|
||||
* non-zero and equal to length of taskPlacementList. Like this it can
|
||||
* assign a different query for each placement. We need this flexibility
|
||||
* when a query should return node specific values. For example, on which
|
||||
* node did we succeed storing some result files?
|
||||
*
|
||||
* jobQueryReferenceForLazyDeparsing is only not null when the planner thinks the
|
||||
* query could possibly be locally executed. In that case deparsing+parsing
|
||||
* the query might not be necessary, so we do that lazily.
|
||||
*
|
||||
* jobQueryReferenceForLazyDeparsing should only be set by using SetTaskQueryIfShouldLazyDeparse()
|
||||
*/
|
||||
Query *jobQueryReferenceForLazyDeparsing;
|
||||
|
||||
/*
|
||||
* In almost all cases queryStringLazy should be read only indirectly by
|
||||
* using TaskQueryStringForAllPlacements(). This will populate the field if only the
|
||||
* jobQueryReferenceForLazyDeparsing field is not NULL.
|
||||
*
|
||||
* This field should only be set by using SetTaskQueryString() (or as a
|
||||
* side effect from TaskQueryStringForAllPlacements()). Otherwise it might not be in sync
|
||||
* with jobQueryReferenceForLazyDeparsing.
|
||||
*/
|
||||
char *queryStringLazy;
|
||||
|
||||
/*
|
||||
* perPlacementQueryStrings is used when we have different query strings for each placement.
|
||||
*/
|
||||
List *perPlacementQueryStrings;
|
||||
}data;
|
||||
}TaskQuery;
|
||||
|
||||
typedef struct Task
|
||||
{
|
||||
CitusNode type;
|
||||
|
@ -210,35 +260,7 @@ typedef struct Task
|
|||
uint64 jobId;
|
||||
uint32 taskId;
|
||||
|
||||
/*
|
||||
* For most queries queryForLocalExecution and/or queryStringLazy is not
|
||||
* NULL. This means we have a single query for all placements.
|
||||
*
|
||||
* If this is not the case, the length of perPlacementQueryStrings is
|
||||
* non-zero and equal to length of taskPlacementList. Like this it can
|
||||
* assign a different query for each placement. We need this flexibility
|
||||
* when a query should return node specific values. For example, on which
|
||||
* node did we succeed storing some result files?
|
||||
*
|
||||
* queryForLocalExecution is only not null when the planner thinks the
|
||||
* query could possibly be locally executed. In that case deparsing+parsing
|
||||
* the query might not be necessary, so we do that lazily.
|
||||
*
|
||||
* queryForLocalExecution should only be set by using SetTaskQueryIfShouldLazyDeparse()
|
||||
*/
|
||||
Query *queryForLocalExecution;
|
||||
|
||||
/*
|
||||
* In almost all cases queryStringLazy should be read only indirectly by
|
||||
* using TaskQueryStringAllPlacements(). This will populate the field if only the
|
||||
* queryForLocalExecution field is not NULL.
|
||||
*
|
||||
* This field should only be set by using SetTaskQueryString() (or as a
|
||||
* side effect from TaskQueryStringAllPlacements()). Otherwise it might not be in sync
|
||||
* with queryForLocalExecution.
|
||||
*/
|
||||
char *queryStringLazy;
|
||||
List *perPlacementQueryStrings;
|
||||
TaskQuery *taskQuery;
|
||||
|
||||
/*
|
||||
* queryStringList contains query strings. They should be
|
||||
|
|
Loading…
Reference in New Issue