mirror of https://github.com/citusdata/citus.git
Remove distinction between SQL_TASK and ROUTER_TASK (#3243)
Remove distinction between SQL_TASK and ROUTER_TASKpull/3248/head
commit
cb7105d1dd
|
@ -1161,10 +1161,10 @@ TaskListRequires2PC(List *taskList)
|
||||||
bool
|
bool
|
||||||
ReadOnlyTask(TaskType taskType)
|
ReadOnlyTask(TaskType taskType)
|
||||||
{
|
{
|
||||||
if (taskType == ROUTER_TASK || taskType == SQL_TASK)
|
if (taskType == SELECT_TASK)
|
||||||
{
|
{
|
||||||
/*
|
/*
|
||||||
* TODO: We currently do not execute modifying CTEs via ROUTER_TASK/SQL_TASK.
|
* TODO: We currently do not execute modifying CTEs via SELECT_TASK.
|
||||||
* When we implement it, we should either not use the mentioned task types for
|
* When we implement it, we should either not use the mentioned task types for
|
||||||
* modifying CTEs detect them here.
|
* modifying CTEs detect them here.
|
||||||
*/
|
*/
|
||||||
|
@ -1606,8 +1606,7 @@ ExecutionOrderForTask(RowModifyLevel modLevel, Task *task)
|
||||||
{
|
{
|
||||||
switch (task->taskType)
|
switch (task->taskType)
|
||||||
{
|
{
|
||||||
case SQL_TASK:
|
case SELECT_TASK:
|
||||||
case ROUTER_TASK:
|
|
||||||
{
|
{
|
||||||
return EXECUTION_ORDER_ANY;
|
return EXECUTION_ORDER_ANY;
|
||||||
}
|
}
|
||||||
|
|
|
@ -639,7 +639,7 @@ TopLevelTask(Task *task)
|
||||||
* SQL tasks can only appear at the top level in our query tree. Further, no
|
* SQL tasks can only appear at the top level in our query tree. Further, no
|
||||||
* other task type can appear at the top level in our tree.
|
* other task type can appear at the top level in our tree.
|
||||||
*/
|
*/
|
||||||
if (task->taskType == SQL_TASK)
|
if (task->taskType == SELECT_TASK)
|
||||||
{
|
{
|
||||||
topLevelTask = true;
|
topLevelTask = true;
|
||||||
}
|
}
|
||||||
|
@ -1067,7 +1067,7 @@ ManageTaskExecution(TaskTracker *taskTracker, TaskTracker *sourceTaskTracker,
|
||||||
* We finally queue this task for execution. Note that we queue sql and
|
* We finally queue this task for execution. Note that we queue sql and
|
||||||
* other tasks slightly differently.
|
* other tasks slightly differently.
|
||||||
*/
|
*/
|
||||||
if (taskType == SQL_TASK)
|
if (taskType == SELECT_TASK)
|
||||||
{
|
{
|
||||||
TrackerQueueSqlTask(taskTracker, task);
|
TrackerQueueSqlTask(taskTracker, task);
|
||||||
}
|
}
|
||||||
|
@ -1257,7 +1257,7 @@ ManageTransmitExecution(TaskTracker *transmitTracker,
|
||||||
TransmitExecStatus *transmitStatusArray = taskExecution->transmitStatusArray;
|
TransmitExecStatus *transmitStatusArray = taskExecution->transmitStatusArray;
|
||||||
TransmitExecStatus currentTransmitStatus = transmitStatusArray[currentNodeIndex];
|
TransmitExecStatus currentTransmitStatus = transmitStatusArray[currentNodeIndex];
|
||||||
TransmitExecStatus nextTransmitStatus = EXEC_TRANSMIT_INVALID_FIRST;
|
TransmitExecStatus nextTransmitStatus = EXEC_TRANSMIT_INVALID_FIRST;
|
||||||
Assert(task->taskType == SQL_TASK);
|
Assert(task->taskType == SELECT_TASK);
|
||||||
|
|
||||||
switch (currentTransmitStatus)
|
switch (currentTransmitStatus)
|
||||||
{
|
{
|
||||||
|
@ -1857,7 +1857,7 @@ ConstrainedNonMergeTaskList(List *taskAndExecutionList, Task *task)
|
||||||
List *dependentTaskList = NIL;
|
List *dependentTaskList = NIL;
|
||||||
|
|
||||||
TaskType taskType = task->taskType;
|
TaskType taskType = task->taskType;
|
||||||
if (taskType == SQL_TASK || taskType == MAP_TASK)
|
if (taskType == SELECT_TASK || taskType == MAP_TASK)
|
||||||
{
|
{
|
||||||
upstreamTask = task;
|
upstreamTask = task;
|
||||||
dependentTaskList = upstreamTask->dependentTaskList;
|
dependentTaskList = upstreamTask->dependentTaskList;
|
||||||
|
@ -1935,7 +1935,7 @@ ConstrainedMergeTaskList(List *taskAndExecutionList, Task *task)
|
||||||
* given task is a SQL or map task, we simply need to find its merge task
|
* given task is a SQL or map task, we simply need to find its merge task
|
||||||
* dependencies -- if any.
|
* dependencies -- if any.
|
||||||
*/
|
*/
|
||||||
if (taskType == SQL_TASK || taskType == MAP_TASK)
|
if (taskType == SELECT_TASK || taskType == MAP_TASK)
|
||||||
{
|
{
|
||||||
constrainedMergeTaskList = MergeTaskList(task->dependentTaskList);
|
constrainedMergeTaskList = MergeTaskList(task->dependentTaskList);
|
||||||
}
|
}
|
||||||
|
@ -2017,7 +2017,7 @@ ReassignTaskList(List *taskList)
|
||||||
TaskExecution *taskExecution = task->taskExecution;
|
TaskExecution *taskExecution = task->taskExecution;
|
||||||
|
|
||||||
bool transmitCompleted = TransmitExecutionCompleted(taskExecution);
|
bool transmitCompleted = TransmitExecutionCompleted(taskExecution);
|
||||||
if ((task->taskType == SQL_TASK) && transmitCompleted)
|
if ((task->taskType == SELECT_TASK) && transmitCompleted)
|
||||||
{
|
{
|
||||||
completedTaskList = lappend(completedTaskList, task);
|
completedTaskList = lappend(completedTaskList, task);
|
||||||
}
|
}
|
||||||
|
|
|
@ -370,7 +370,7 @@ TryToDelegateFunctionCall(Query *query, bool *hasExternParam)
|
||||||
pg_get_query_def(query, queryString);
|
pg_get_query_def(query, queryString);
|
||||||
|
|
||||||
task = CitusMakeNode(Task);
|
task = CitusMakeNode(Task);
|
||||||
task->taskType = SQL_TASK;
|
task->taskType = SELECT_TASK;
|
||||||
task->queryString = queryString->data;
|
task->queryString = queryString->data;
|
||||||
task->taskPlacementList = placementList;
|
task->taskPlacementList = placementList;
|
||||||
task->anchorShardId = shardInterval->shardId;
|
task->anchorShardId = shardInterval->shardId;
|
||||||
|
|
|
@ -2044,7 +2044,7 @@ BuildJobTreeTaskList(Job *jobTree, PlannerRestrictionContext *plannerRestriction
|
||||||
sqlTaskList = QueryPushdownSqlTaskList(job->jobQuery, job->jobId,
|
sqlTaskList = QueryPushdownSqlTaskList(job->jobQuery, job->jobId,
|
||||||
plannerRestrictionContext->
|
plannerRestrictionContext->
|
||||||
relationRestrictionContext,
|
relationRestrictionContext,
|
||||||
prunedRelationShardList, SQL_TASK,
|
prunedRelationShardList, SELECT_TASK,
|
||||||
false);
|
false);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
|
@ -2441,7 +2441,7 @@ QueryPushdownTaskCreate(Query *originalQuery, int shardIndex,
|
||||||
Task *subqueryTask = CreateBasicTask(jobId, taskId, taskType, NULL);
|
Task *subqueryTask = CreateBasicTask(jobId, taskId, taskType, NULL);
|
||||||
|
|
||||||
if ((taskType == MODIFY_TASK && !modifyRequiresMasterEvaluation) ||
|
if ((taskType == MODIFY_TASK && !modifyRequiresMasterEvaluation) ||
|
||||||
taskType == SQL_TASK)
|
taskType == SELECT_TASK)
|
||||||
{
|
{
|
||||||
pg_get_query_def(taskQuery, queryString);
|
pg_get_query_def(taskQuery, queryString);
|
||||||
ereport(DEBUG4, (errmsg("distributed statement: %s",
|
ereport(DEBUG4, (errmsg("distributed statement: %s",
|
||||||
|
@ -2704,7 +2704,7 @@ SqlTaskList(Job *job)
|
||||||
StringInfo sqlQueryString = makeStringInfo();
|
StringInfo sqlQueryString = makeStringInfo();
|
||||||
pg_get_query_def(taskQuery, sqlQueryString);
|
pg_get_query_def(taskQuery, sqlQueryString);
|
||||||
|
|
||||||
Task *sqlTask = CreateBasicTask(jobId, taskIdIndex, SQL_TASK,
|
Task *sqlTask = CreateBasicTask(jobId, taskIdIndex, SELECT_TASK,
|
||||||
sqlQueryString->data);
|
sqlQueryString->data);
|
||||||
sqlTask->dependentTaskList = dataFetchTaskList;
|
sqlTask->dependentTaskList = dataFetchTaskList;
|
||||||
sqlTask->relationShardList = BuildRelationShardList(fragmentRangeTableList,
|
sqlTask->relationShardList = BuildRelationShardList(fragmentRangeTableList,
|
||||||
|
@ -5404,7 +5404,7 @@ AssignDataFetchDependencies(List *taskList)
|
||||||
ListCell *dependentTaskCell = NULL;
|
ListCell *dependentTaskCell = NULL;
|
||||||
|
|
||||||
Assert(task->taskPlacementList != NIL);
|
Assert(task->taskPlacementList != NIL);
|
||||||
Assert(task->taskType == SQL_TASK || task->taskType == MERGE_TASK);
|
Assert(task->taskType == SELECT_TASK || task->taskType == MERGE_TASK);
|
||||||
|
|
||||||
foreach(dependentTaskCell, dependentTaskList)
|
foreach(dependentTaskCell, dependentTaskList)
|
||||||
{
|
{
|
||||||
|
|
|
@ -1807,7 +1807,7 @@ SingleShardSelectTaskList(Query *query, uint64 jobId, List *relationShardList,
|
||||||
List *placementList,
|
List *placementList,
|
||||||
uint64 shardId)
|
uint64 shardId)
|
||||||
{
|
{
|
||||||
Task *task = CreateTask(ROUTER_TASK);
|
Task *task = CreateTask(SELECT_TASK);
|
||||||
StringInfo queryString = makeStringInfo();
|
StringInfo queryString = makeStringInfo();
|
||||||
List *relationRowLockList = NIL;
|
List *relationRowLockList = NIL;
|
||||||
|
|
||||||
|
|
|
@ -287,7 +287,7 @@ RecordParallelRelationAccessForTaskList(List *taskList)
|
||||||
*/
|
*/
|
||||||
Task *firstTask = linitial(taskList);
|
Task *firstTask = linitial(taskList);
|
||||||
|
|
||||||
if (firstTask->taskType == SQL_TASK)
|
if (firstTask->taskType == SELECT_TASK)
|
||||||
{
|
{
|
||||||
RecordRelationParallelSelectAccessForTask(firstTask);
|
RecordRelationParallelSelectAccessForTask(firstTask);
|
||||||
}
|
}
|
||||||
|
|
|
@ -77,16 +77,15 @@ typedef enum
|
||||||
/* Enumeration that defines different task types */
|
/* Enumeration that defines different task types */
|
||||||
typedef enum
|
typedef enum
|
||||||
{
|
{
|
||||||
TASK_TYPE_INVALID_FIRST = 0,
|
TASK_TYPE_INVALID_FIRST,
|
||||||
SQL_TASK = 1,
|
SELECT_TASK,
|
||||||
MAP_TASK = 2,
|
MAP_TASK,
|
||||||
MERGE_TASK = 3,
|
MERGE_TASK,
|
||||||
MAP_OUTPUT_FETCH_TASK = 4,
|
MAP_OUTPUT_FETCH_TASK,
|
||||||
MERGE_FETCH_TASK = 5,
|
MERGE_FETCH_TASK,
|
||||||
MODIFY_TASK = 6,
|
MODIFY_TASK,
|
||||||
ROUTER_TASK = 7,
|
DDL_TASK,
|
||||||
DDL_TASK = 8,
|
VACUUM_ANALYZE_TASK
|
||||||
VACUUM_ANALYZE_TASK = 9
|
|
||||||
} TaskType;
|
} TaskType;
|
||||||
|
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue