Remove distinction between SQL_TASK and ROUTER_TASK

pull/3243/head
Marco Slot 2019-11-28 05:59:29 +01:00
parent aeec3d1544
commit 16d1ad3666
7 changed files with 25 additions and 27 deletions

View File

@ -1161,10 +1161,10 @@ TaskListRequires2PC(List *taskList)
bool bool
ReadOnlyTask(TaskType taskType) ReadOnlyTask(TaskType taskType)
{ {
if (taskType == ROUTER_TASK || taskType == SQL_TASK) if (taskType == SELECT_TASK)
{ {
/* /*
* TODO: We currently do not execute modifying CTEs via ROUTER_TASK/SQL_TASK. * TODO: We currently do not execute modifying CTEs via SELECT_TASK.
* When we implement it, we should either not use the mentioned task types for * When we implement it, we should either not use the mentioned task types for
* modifying CTEs detect them here. * modifying CTEs detect them here.
*/ */
@ -1606,8 +1606,7 @@ ExecutionOrderForTask(RowModifyLevel modLevel, Task *task)
{ {
switch (task->taskType) switch (task->taskType)
{ {
case SQL_TASK: case SELECT_TASK:
case ROUTER_TASK:
{ {
return EXECUTION_ORDER_ANY; return EXECUTION_ORDER_ANY;
} }

View File

@ -639,7 +639,7 @@ TopLevelTask(Task *task)
* SQL tasks can only appear at the top level in our query tree. Further, no * SQL tasks can only appear at the top level in our query tree. Further, no
* other task type can appear at the top level in our tree. * other task type can appear at the top level in our tree.
*/ */
if (task->taskType == SQL_TASK) if (task->taskType == SELECT_TASK)
{ {
topLevelTask = true; topLevelTask = true;
} }
@ -1067,7 +1067,7 @@ ManageTaskExecution(TaskTracker *taskTracker, TaskTracker *sourceTaskTracker,
* We finally queue this task for execution. Note that we queue sql and * We finally queue this task for execution. Note that we queue sql and
* other tasks slightly differently. * other tasks slightly differently.
*/ */
if (taskType == SQL_TASK) if (taskType == SELECT_TASK)
{ {
TrackerQueueSqlTask(taskTracker, task); TrackerQueueSqlTask(taskTracker, task);
} }
@ -1257,7 +1257,7 @@ ManageTransmitExecution(TaskTracker *transmitTracker,
TransmitExecStatus *transmitStatusArray = taskExecution->transmitStatusArray; TransmitExecStatus *transmitStatusArray = taskExecution->transmitStatusArray;
TransmitExecStatus currentTransmitStatus = transmitStatusArray[currentNodeIndex]; TransmitExecStatus currentTransmitStatus = transmitStatusArray[currentNodeIndex];
TransmitExecStatus nextTransmitStatus = EXEC_TRANSMIT_INVALID_FIRST; TransmitExecStatus nextTransmitStatus = EXEC_TRANSMIT_INVALID_FIRST;
Assert(task->taskType == SQL_TASK); Assert(task->taskType == SELECT_TASK);
switch (currentTransmitStatus) switch (currentTransmitStatus)
{ {
@ -1857,7 +1857,7 @@ ConstrainedNonMergeTaskList(List *taskAndExecutionList, Task *task)
List *dependentTaskList = NIL; List *dependentTaskList = NIL;
TaskType taskType = task->taskType; TaskType taskType = task->taskType;
if (taskType == SQL_TASK || taskType == MAP_TASK) if (taskType == SELECT_TASK || taskType == MAP_TASK)
{ {
upstreamTask = task; upstreamTask = task;
dependentTaskList = upstreamTask->dependentTaskList; dependentTaskList = upstreamTask->dependentTaskList;
@ -1935,7 +1935,7 @@ ConstrainedMergeTaskList(List *taskAndExecutionList, Task *task)
* given task is a SQL or map task, we simply need to find its merge task * given task is a SQL or map task, we simply need to find its merge task
* dependencies -- if any. * dependencies -- if any.
*/ */
if (taskType == SQL_TASK || taskType == MAP_TASK) if (taskType == SELECT_TASK || taskType == MAP_TASK)
{ {
constrainedMergeTaskList = MergeTaskList(task->dependentTaskList); constrainedMergeTaskList = MergeTaskList(task->dependentTaskList);
} }
@ -2017,7 +2017,7 @@ ReassignTaskList(List *taskList)
TaskExecution *taskExecution = task->taskExecution; TaskExecution *taskExecution = task->taskExecution;
bool transmitCompleted = TransmitExecutionCompleted(taskExecution); bool transmitCompleted = TransmitExecutionCompleted(taskExecution);
if ((task->taskType == SQL_TASK) && transmitCompleted) if ((task->taskType == SELECT_TASK) && transmitCompleted)
{ {
completedTaskList = lappend(completedTaskList, task); completedTaskList = lappend(completedTaskList, task);
} }

View File

@ -370,7 +370,7 @@ TryToDelegateFunctionCall(Query *query, bool *hasExternParam)
pg_get_query_def(query, queryString); pg_get_query_def(query, queryString);
task = CitusMakeNode(Task); task = CitusMakeNode(Task);
task->taskType = SQL_TASK; task->taskType = SELECT_TASK;
task->queryString = queryString->data; task->queryString = queryString->data;
task->taskPlacementList = placementList; task->taskPlacementList = placementList;
task->anchorShardId = shardInterval->shardId; task->anchorShardId = shardInterval->shardId;

View File

@ -2044,7 +2044,7 @@ BuildJobTreeTaskList(Job *jobTree, PlannerRestrictionContext *plannerRestriction
sqlTaskList = QueryPushdownSqlTaskList(job->jobQuery, job->jobId, sqlTaskList = QueryPushdownSqlTaskList(job->jobQuery, job->jobId,
plannerRestrictionContext-> plannerRestrictionContext->
relationRestrictionContext, relationRestrictionContext,
prunedRelationShardList, SQL_TASK, prunedRelationShardList, SELECT_TASK,
false); false);
} }
else else
@ -2441,7 +2441,7 @@ QueryPushdownTaskCreate(Query *originalQuery, int shardIndex,
Task *subqueryTask = CreateBasicTask(jobId, taskId, taskType, NULL); Task *subqueryTask = CreateBasicTask(jobId, taskId, taskType, NULL);
if ((taskType == MODIFY_TASK && !modifyRequiresMasterEvaluation) || if ((taskType == MODIFY_TASK && !modifyRequiresMasterEvaluation) ||
taskType == SQL_TASK) taskType == SELECT_TASK)
{ {
pg_get_query_def(taskQuery, queryString); pg_get_query_def(taskQuery, queryString);
ereport(DEBUG4, (errmsg("distributed statement: %s", ereport(DEBUG4, (errmsg("distributed statement: %s",
@ -2704,7 +2704,7 @@ SqlTaskList(Job *job)
StringInfo sqlQueryString = makeStringInfo(); StringInfo sqlQueryString = makeStringInfo();
pg_get_query_def(taskQuery, sqlQueryString); pg_get_query_def(taskQuery, sqlQueryString);
Task *sqlTask = CreateBasicTask(jobId, taskIdIndex, SQL_TASK, Task *sqlTask = CreateBasicTask(jobId, taskIdIndex, SELECT_TASK,
sqlQueryString->data); sqlQueryString->data);
sqlTask->dependentTaskList = dataFetchTaskList; sqlTask->dependentTaskList = dataFetchTaskList;
sqlTask->relationShardList = BuildRelationShardList(fragmentRangeTableList, sqlTask->relationShardList = BuildRelationShardList(fragmentRangeTableList,
@ -5404,7 +5404,7 @@ AssignDataFetchDependencies(List *taskList)
ListCell *dependentTaskCell = NULL; ListCell *dependentTaskCell = NULL;
Assert(task->taskPlacementList != NIL); Assert(task->taskPlacementList != NIL);
Assert(task->taskType == SQL_TASK || task->taskType == MERGE_TASK); Assert(task->taskType == SELECT_TASK || task->taskType == MERGE_TASK);
foreach(dependentTaskCell, dependentTaskList) foreach(dependentTaskCell, dependentTaskList)
{ {

View File

@ -1807,7 +1807,7 @@ SingleShardSelectTaskList(Query *query, uint64 jobId, List *relationShardList,
List *placementList, List *placementList,
uint64 shardId) uint64 shardId)
{ {
Task *task = CreateTask(ROUTER_TASK); Task *task = CreateTask(SELECT_TASK);
StringInfo queryString = makeStringInfo(); StringInfo queryString = makeStringInfo();
List *relationRowLockList = NIL; List *relationRowLockList = NIL;

View File

@ -287,7 +287,7 @@ RecordParallelRelationAccessForTaskList(List *taskList)
*/ */
Task *firstTask = linitial(taskList); Task *firstTask = linitial(taskList);
if (firstTask->taskType == SQL_TASK) if (firstTask->taskType == SELECT_TASK)
{ {
RecordRelationParallelSelectAccessForTask(firstTask); RecordRelationParallelSelectAccessForTask(firstTask);
} }

View File

@ -77,16 +77,15 @@ typedef enum
/* Enumeration that defines different task types */ /* Enumeration that defines different task types */
typedef enum typedef enum
{ {
TASK_TYPE_INVALID_FIRST = 0, TASK_TYPE_INVALID_FIRST,
SQL_TASK = 1, SELECT_TASK,
MAP_TASK = 2, MAP_TASK,
MERGE_TASK = 3, MERGE_TASK,
MAP_OUTPUT_FETCH_TASK = 4, MAP_OUTPUT_FETCH_TASK,
MERGE_FETCH_TASK = 5, MERGE_FETCH_TASK,
MODIFY_TASK = 6, MODIFY_TASK,
ROUTER_TASK = 7, DDL_TASK,
DDL_TASK = 8, VACUUM_ANALYZE_TASK
VACUUM_ANALYZE_TASK = 9
} TaskType; } TaskType;