use taskQuery as a struct to simplify the code

pull/3659/head
SaitTalhaNisanci 2020-03-31 12:16:32 +03:00
parent 8806c4d697
commit b5591b1b28
9 changed files with 45 additions and 70 deletions

View File

@ -227,6 +227,7 @@ ExecuteLocalTaskList(CitusScanState *scanState, List *taskList)
}
else
{
/* avoid the overhead of deparsing when using local execution */
shardQueryString = "<optimized out by local execution>";
}

View File

@ -1606,9 +1606,8 @@ TrackerQueueTask(TaskTracker *taskTracker, Task *task)
HTAB *taskStateHash = taskTracker->taskStateHash;
/* wrap a task assignment query outside the original query */
StringInfo taskAssignmentQuery = TaskAssignmentQuery(task,
TaskQueryStringForAllPlacements(
task));
StringInfo taskAssignmentQuery =
TaskAssignmentQuery(task, TaskQueryStringForAllPlacements(task));
TrackerTaskState *taskState = TaskStateHashEnter(taskStateHash, task->jobId,
task->taskId);

View File

@ -42,7 +42,6 @@ static void ConvertRteToSubqueryWithEmptyResult(RangeTblEntry *rte);
static bool ShouldLazyDeparseQuery(Task *task);
static char * DeparseTaskQuery(Task *task, Query *query);
static bool IsEachPlacementQueryStringDifferent(Task *task);
static void InitializeTaskQueryIfNecessary(Task *task);
/*
@ -427,9 +426,8 @@ SetTaskQueryIfShouldLazyDeparse(Task *task, Query *query)
{
if (ShouldLazyDeparseQuery(task))
{
InitializeTaskQueryIfNecessary(task);
task->taskQuery->queryType = TASK_QUERY_OBJECT;
task->taskQuery->data.jobQueryReferenceForLazyDeparsing = query;
task->taskQuery.queryType = TASK_QUERY_OBJECT;
task->taskQuery.data.jobQueryReferenceForLazyDeparsing = query;
return;
}
@ -445,26 +443,8 @@ SetTaskQueryIfShouldLazyDeparse(Task *task, Query *query)
void
SetTaskQueryString(Task *task, char *queryString)
{
InitializeTaskQueryIfNecessary(task);
task->taskQuery->queryType = TASK_QUERY_TEXT;
task->taskQuery->data.queryStringLazy = queryString;
}
/*
* InitializeTaskQueryIfNecessary initializes task query if it
* is not yet allocated.
*/
static void
InitializeTaskQueryIfNecessary(Task *task)
{
if (task->taskQuery == NULL)
{
MemoryContext previousContext = MemoryContextSwitchTo(GetMemoryChunkContext(
task));
task->taskQuery = CitusMakeNode(TaskQuery);
MemoryContextSwitchTo(previousContext);
}
task->taskQuery.queryType = TASK_QUERY_TEXT;
task->taskQuery.data.queryStringLazy = queryString;
}
@ -474,10 +454,9 @@ InitializeTaskQueryIfNecessary(Task *task)
void
SetTaskPerPlacementQueryStrings(Task *task, List *perPlacementQueryStringList)
{
InitializeTaskQueryIfNecessary(task);
Assert(perPlacementQueryStringList != NIL);
task->taskQuery->queryType = TASK_QUERY_TEXT_PER_PLACEMENT;
task->taskQuery->data.perPlacementQueryStrings = perPlacementQueryStringList;
task->taskQuery.queryType = TASK_QUERY_TEXT_PER_PLACEMENT;
task->taskQuery.data.perPlacementQueryStrings = perPlacementQueryStringList;
}
@ -487,9 +466,8 @@ SetTaskPerPlacementQueryStrings(Task *task, List *perPlacementQueryStringList)
void
SetTaskQueryStringList(Task *task, List *queryStringList)
{
InitializeTaskQueryIfNecessary(task);
task->taskQuery->queryType = TASK_QUERY_TEXT_LIST;
task->taskQuery->data.queryStringList = queryStringList;
task->taskQuery.queryType = TASK_QUERY_TEXT_LIST;
task->taskQuery.data.queryStringList = queryStringList;
}
@ -528,7 +506,7 @@ DeparseTaskQuery(Task *task, Query *query)
int
GetTaskQueryType(Task *task)
{
return task->taskQuery->queryType;
return task->taskQuery.queryType;
}
@ -544,15 +522,15 @@ TaskQueryStringForAllPlacements(Task *task)
{
if (GetTaskQueryType(task) == TASK_QUERY_TEXT_LIST)
{
return StringJoin(task->taskQuery->data.queryStringList, ';');
return StringJoin(task->taskQuery.data.queryStringList, ';');
}
if (GetTaskQueryType(task) == TASK_QUERY_TEXT)
{
return task->taskQuery->data.queryStringLazy;
return task->taskQuery.data.queryStringLazy;
}
Query *jobQueryReferenceForLazyDeparsing =
task->taskQuery->data.jobQueryReferenceForLazyDeparsing;
Assert(task->taskQuery->queryType == TASK_QUERY_OBJECT &&
task->taskQuery.data.jobQueryReferenceForLazyDeparsing;
Assert(task->taskQuery.queryType == TASK_QUERY_OBJECT &&
jobQueryReferenceForLazyDeparsing != NULL);
@ -569,7 +547,7 @@ TaskQueryStringForAllPlacements(Task *task)
char *queryString = DeparseTaskQuery(task, jobQueryReferenceForLazyDeparsing);
MemoryContextSwitchTo(previousContext);
SetTaskQueryString(task, queryString);
return task->taskQuery->data.queryStringLazy;
return task->taskQuery.data.queryStringLazy;
}
@ -583,7 +561,7 @@ TaskQueryStringForPlacement(Task *task, int placementIndex)
if (IsEachPlacementQueryStringDifferent(task))
{
List *perPlacementQueryStringList =
task->taskQuery->data.perPlacementQueryStrings;
task->taskQuery.data.perPlacementQueryStrings;
Assert(list_length(perPlacementQueryStringList) > placementIndex);
return list_nth(perPlacementQueryStringList, placementIndex);
}

View File

@ -73,6 +73,7 @@ CitusSetTag(Node *node, int tag)
} \
while (0)
static void CopyTaskQuery(Task *newnode, Task *from);
static void
copyJobInfo(Job *newnode, Job *from)
@ -250,35 +251,33 @@ CopyNodeRelationRowLock(COPYFUNC_ARGS)
}
void
CopyNodeTaskQuery(COPYFUNC_ARGS)
static void
CopyTaskQuery(Task *newnode, Task *from)
{
DECLARE_FROM_AND_NEW_NODE(TaskQuery);
COPY_SCALAR_FIELD(queryType);
switch (from->queryType)
COPY_SCALAR_FIELD(taskQuery.queryType);
switch (from->taskQuery.queryType)
{
case TASK_QUERY_TEXT:
{
COPY_STRING_FIELD(data.queryStringLazy);
COPY_STRING_FIELD(taskQuery.data.queryStringLazy);
break;
}
case TASK_QUERY_OBJECT:
{
COPY_NODE_FIELD(data.jobQueryReferenceForLazyDeparsing);
COPY_NODE_FIELD(taskQuery.data.jobQueryReferenceForLazyDeparsing);
break;
}
case TASK_QUERY_TEXT_PER_PLACEMENT:
{
COPY_NODE_FIELD(data.perPlacementQueryStrings);
COPY_NODE_FIELD(taskQuery.data.perPlacementQueryStrings);
break;
}
case TASK_QUERY_TEXT_LIST:
{
COPY_NODE_FIELD(data.queryStringList);
COPY_NODE_FIELD(taskQuery.data.queryStringList);
break;
}
@ -298,7 +297,7 @@ CopyNodeTask(COPYFUNC_ARGS)
COPY_SCALAR_FIELD(taskType);
COPY_SCALAR_FIELD(jobId);
COPY_SCALAR_FIELD(taskId);
COPY_NODE_FIELD(taskQuery);
CopyTaskQuery(newnode, from);
COPY_SCALAR_FIELD(anchorDistributedTableId);
COPY_SCALAR_FIELD(anchorShardId);
COPY_NODE_FIELD(taskPlacementList);

View File

@ -44,8 +44,7 @@ static const char *CitusNodeTagNamesD[] = {
"RelationShard",
"RelationRowLock",
"DeferredErrorMessage",
"GroupShardPlacement",
"TaskQuery",
"GroupShardPlacement"
};
const char **CitusNodeTagNames = CitusNodeTagNamesD;
@ -402,7 +401,6 @@ const ExtensibleNodeMethods nodeMethods[] =
DEFINE_NODE_METHODS(TaskExecution),
DEFINE_NODE_METHODS(DeferredErrorMessage),
DEFINE_NODE_METHODS(GroupShardPlacement),
DEFINE_NODE_METHODS(TaskQuery),
/* nodes with only output support */
DEFINE_NODE_METHODS_NO_READ(MultiNode),

View File

@ -135,7 +135,7 @@
#define booltostr(x) ((x) ? "true" : "false")
static void WriteTaskQuery(OUTFUNC_ARGS);
/*****************************************************************************
* Output routines for Citus node types
@ -469,35 +469,34 @@ OutRelationRowLock(OUTFUNC_ARGS)
WRITE_ENUM_FIELD(rowLockStrength, LockClauseStrength);
}
void OutTaskQuery(OUTFUNC_ARGS) {
WRITE_LOCALS(TaskQuery);
WRITE_NODE_TYPE("TASKQUERY");
static void WriteTaskQuery(OUTFUNC_ARGS) {
WRITE_LOCALS(Task);
WRITE_ENUM_FIELD(queryType, TaskQueryType);
WRITE_ENUM_FIELD(taskQuery.queryType, TaskQueryType);
switch (node->queryType)
switch (node->taskQuery.queryType)
{
case TASK_QUERY_TEXT:
{
WRITE_STRING_FIELD(data.queryStringLazy);
WRITE_STRING_FIELD(taskQuery.data.queryStringLazy);
break;
}
case TASK_QUERY_OBJECT:
{
WRITE_NODE_FIELD(data.jobQueryReferenceForLazyDeparsing);
WRITE_NODE_FIELD(taskQuery.data.jobQueryReferenceForLazyDeparsing);
break;
}
case TASK_QUERY_TEXT_PER_PLACEMENT:
{
WRITE_NODE_FIELD(data.perPlacementQueryStrings);
WRITE_NODE_FIELD(taskQuery.data.perPlacementQueryStrings);
break;
}
case TASK_QUERY_TEXT_LIST:
{
WRITE_NODE_FIELD(data.queryStringList);
WRITE_NODE_FIELD(taskQuery.data.queryStringList);
break;
}
@ -517,7 +516,7 @@ OutTask(OUTFUNC_ARGS)
WRITE_ENUM_FIELD(taskType, TaskType);
WRITE_UINT64_FIELD(jobId);
WRITE_UINT_FIELD(taskId);
WRITE_NODE_FIELD(taskQuery);
WriteTaskQuery(str, raw_node);
WRITE_OID_FIELD(anchorDistributedTableId);
WRITE_UINT64_FIELD(anchorShardId);
WRITE_NODE_FIELD(taskPlacementList);

View File

@ -47,7 +47,6 @@ extern void OutMapMergeJob(OUTFUNC_ARGS);
extern void OutShardPlacement(OUTFUNC_ARGS);
extern void OutRelationShard(OUTFUNC_ARGS);
extern void OutRelationRowLock(OUTFUNC_ARGS);
extern void OutTaskQuery(OUTFUNC_ARGS);
extern void OutTask(OUTFUNC_ARGS);
extern void OutLocalPlannedStatement(OUTFUNC_ARGS);
extern void OutTaskExecution(OUTFUNC_ARGS);

View File

@ -66,8 +66,7 @@ typedef enum CitusNodeTag
T_RelationShard,
T_RelationRowLock,
T_DeferredErrorMessage,
T_GroupShardPlacement,
T_TaskQuery
T_GroupShardPlacement
} CitusNodeTag;

View File

@ -213,7 +213,6 @@ typedef enum TaskQueryType
typedef struct TaskQuery
{
CitusNode type;
TaskQueryType queryType;
union
@ -269,7 +268,11 @@ typedef struct Task
uint64 jobId;
uint32 taskId;
TaskQuery *taskQuery;
/*
* taskQuery contains query string information. The way we get queryString can be different
* so this is abstracted with taskQuery.
*/
TaskQuery taskQuery;
Oid anchorDistributedTableId; /* only applies to insert tasks */
uint64 anchorShardId; /* only applies to compute tasks */