Stray depended to dependent tidy up

pull/3258/head
Philip Dubé 2019-12-02 22:19:23 +00:00
parent 0b71697d88
commit 5fcc169a3a
6 changed files with 42 additions and 42 deletions

View File

@ -254,9 +254,9 @@ ExplainJob(Job *job, ExplainState *es)
} }
else else
{ {
ExplainOpenGroup("Depended Jobs", "Depended Jobs", false, es); ExplainOpenGroup("Dependent Jobs", "Dependent Jobs", false, es);
/* show explain output for depended jobs, if any */ /* show explain output for dependent jobs, if any */
foreach(dependentJobCell, dependentJobList) foreach(dependentJobCell, dependentJobList)
{ {
Job *dependentJob = (Job *) lfirst(dependentJobCell); Job *dependentJob = (Job *) lfirst(dependentJobCell);
@ -267,7 +267,7 @@ ExplainJob(Job *job, ExplainState *es)
} }
} }
ExplainCloseGroup("Depended Jobs", "Depended Jobs", false, es); ExplainCloseGroup("Dependent Jobs", "Dependent Jobs", false, es);
} }
ExplainCloseGroup("Job", "Job", true, es); ExplainCloseGroup("Job", "Job", true, es);
@ -302,7 +302,7 @@ ExplainMapMergeJob(MapMergeJob *mapMergeJob, ExplainState *es)
if (dependentJobCount > 0) if (dependentJobCount > 0)
{ {
ExplainOpenGroup("Depended Jobs", "Depended Jobs", false, es); ExplainOpenGroup("Dependent Jobs", "Dependent Jobs", false, es);
foreach(dependentJobCell, dependentJobList) foreach(dependentJobCell, dependentJobList)
{ {
@ -314,7 +314,7 @@ ExplainMapMergeJob(MapMergeJob *mapMergeJob, ExplainState *es)
} }
} }
ExplainCloseGroup("Depended Jobs", "Depended Jobs", false, es); ExplainCloseGroup("Dependent Jobs", "Dependent Jobs", false, es);
} }
ExplainCloseGroup("MapMergeJob", NULL, true, es); ExplainCloseGroup("MapMergeJob", NULL, true, es);

View File

@ -188,7 +188,7 @@ static int CompareTasksByShardId(const void *leftElement, const void *rightEleme
static List * ActiveShardPlacementLists(List *taskList); static List * ActiveShardPlacementLists(List *taskList);
static List * ActivePlacementList(List *placementList); static List * ActivePlacementList(List *placementList);
static List * LeftRotateList(List *list, uint32 rotateCount); static List * LeftRotateList(List *list, uint32 rotateCount);
static List * FindDependedMergeTaskList(Task *sqlTask); static List * FindDependentMergeTaskList(Task *sqlTask);
static List * AssignDualHashTaskList(List *taskList); static List * AssignDualHashTaskList(List *taskList);
static void AssignDataFetchDependencies(List *taskList); static void AssignDataFetchDependencies(List *taskList);
static uint32 TaskListHighestTaskId(List *taskList); static uint32 TaskListHighestTaskId(List *taskList);
@ -218,7 +218,7 @@ DistributedPlan *
CreatePhysicalDistributedPlan(MultiTreeRoot *multiTree, CreatePhysicalDistributedPlan(MultiTreeRoot *multiTree,
PlannerRestrictionContext *plannerRestrictionContext) PlannerRestrictionContext *plannerRestrictionContext)
{ {
/* build the worker job tree and check that we only one job in the tree */ /* build the worker job tree and check that we only have one job in the tree */
Job *workerJob = BuildJobTree(multiTree); Job *workerJob = BuildJobTree(multiTree);
/* create the tree of executable tasks for the worker job */ /* create the tree of executable tasks for the worker job */
@ -384,7 +384,7 @@ BuildJobTree(MultiTreeRoot *multiTree)
baseRelationId, baseRelationId,
JOIN_MAP_MERGE_JOB); JOIN_MAP_MERGE_JOB);
/* reset depended job list */ /* reset dependent job list */
loopDependentJobList = NIL; loopDependentJobList = NIL;
loopDependentJobList = list_make1(mapMergeJob); loopDependentJobList = list_make1(mapMergeJob);
} }
@ -405,7 +405,7 @@ BuildJobTree(MultiTreeRoot *multiTree)
baseRelationId, baseRelationId,
JOIN_MAP_MERGE_JOB); JOIN_MAP_MERGE_JOB);
/* append to the depended job list for on-going dependencies */ /* append to the dependent job list for on-going dependencies */
loopDependentJobList = lappend(loopDependentJobList, mapMergeJob); loopDependentJobList = lappend(loopDependentJobList, mapMergeJob);
} }
} }
@ -429,7 +429,7 @@ BuildJobTree(MultiTreeRoot *multiTree)
list_make1(mapMergeJob)); list_make1(mapMergeJob));
mapMergeJob->reduceQuery = reduceQuery; mapMergeJob->reduceQuery = reduceQuery;
/* reset depended job list */ /* reset dependent job list */
loopDependentJobList = NIL; loopDependentJobList = NIL;
loopDependentJobList = list_make1(mapMergeJob); loopDependentJobList = list_make1(mapMergeJob);
} }
@ -566,7 +566,7 @@ FindTableNode(MultiNode *multiNode, int rangeTableId)
/* /*
* BuildJobQuery traverses the given logical plan tree, determines the job that * BuildJobQuery traverses the given logical plan tree, determines the job that
* corresponds to this part of the tree, and builds the query structure for that * corresponds to this part of the tree, and builds the query structure for that
* particular job. The function assumes that jobs, this particular job depends on, * particular job. The function assumes that jobs this particular job depends on
* have already been built, as their output is needed to build the query. * have already been built, as their output is needed to build the query.
*/ */
static Query * static Query *
@ -736,8 +736,8 @@ BuildReduceQuery(MultiExtendedOp *extendedOpNode, List *dependentJobList)
List *columnNameList = NIL; List *columnNameList = NIL;
Job *dependentJob = linitial(dependentJobList); Job *dependentJob = linitial(dependentJobList);
List *dependedTargetList = dependentJob->jobQuery->targetList; List *dependentTargetList = dependentJob->jobQuery->targetList;
uint32 columnCount = (uint32) list_length(dependedTargetList); uint32 columnCount = (uint32) list_length(dependentTargetList);
for (uint32 columnIndex = 0; columnIndex < columnCount; columnIndex++) for (uint32 columnIndex = 0; columnIndex < columnCount; columnIndex++)
{ {
@ -1146,7 +1146,7 @@ QueryJoinTree(MultiNode *multiNode, List *dependentJobList, List **rangeTableLis
/* /*
* PostgreSQL's optimizer may mark left joins as anti-joins, when there * PostgreSQL's optimizer may mark left joins as anti-joins, when there
* is an right-hand-join-key-is-null restriction, but there is no logic * is a right-hand-join-key-is-null restriction, but there is no logic
* in ruleutils to deparse anti-joins, so we cannot construct a task * in ruleutils to deparse anti-joins, so we cannot construct a task
* query containing anti-joins. We therefore translate anti-joins back * query containing anti-joins. We therefore translate anti-joins back
* into left-joins. At some point, we may also want to use different * into left-joins. At some point, we may also want to use different
@ -1211,10 +1211,10 @@ QueryJoinTree(MultiNode *multiNode, List *dependentJobList, List **rangeTableLis
{ {
List *tableIdList = OutputTableIdList(multiNode); List *tableIdList = OutputTableIdList(multiNode);
Job *dependentJob = JobForTableIdList(dependentJobList, tableIdList); Job *dependentJob = JobForTableIdList(dependentJobList, tableIdList);
List *dependedTargetList = dependentJob->jobQuery->targetList; List *dependentTargetList = dependentJob->jobQuery->targetList;
/* compute column names for the derived table */ /* compute column names for the derived table */
uint32 columnCount = (uint32) list_length(dependedTargetList); uint32 columnCount = (uint32) list_length(dependentTargetList);
List *columnNameList = DerivedColumnNameList(columnCount, List *columnNameList = DerivedColumnNameList(columnCount,
dependentJob->jobId); dependentJob->jobId);
@ -1645,7 +1645,7 @@ NewTableId(Index originalTableId, List *rangeTableList)
* built. In this query, the original columnId corresponds to the column in base * built. In this query, the original columnId corresponds to the column in base
* tables. When the current query is a partition job and generates intermediate * tables. When the current query is a partition job and generates intermediate
* tables, the columns have a different order and the new columnId corresponds * tables, the columns have a different order and the new columnId corresponds
* to this order. Please note that this function assumes columnIds for depended * to this order. Please note that this function assumes columnIds for dependent
* jobs have already been updated. * jobs have already been updated.
*/ */
static AttrNumber static AttrNumber
@ -1852,7 +1852,7 @@ UniqueJobId(void)
} }
/* Builds a job from the given job query and depended job list. */ /* Builds a job from the given job query and dependent job list. */
static Job * static Job *
BuildJob(Query *jobQuery, List *dependentJobList) BuildJob(Query *jobQuery, List *dependentJobList)
{ {
@ -1867,7 +1867,7 @@ BuildJob(Query *jobQuery, List *dependentJobList)
/* /*
* BuildMapMergeJob builds a MapMerge job from the given query and depended job * BuildMapMergeJob builds a MapMerge job from the given query and dependent job
* list. The function then copies and updates the logical plan's partition * list. The function then copies and updates the logical plan's partition
* column, and uses the join rule type to determine the physical repartitioning * column, and uses the join rule type to determine the physical repartitioning
* method to apply. * method to apply.
@ -2065,7 +2065,7 @@ BuildJobTreeTaskList(Job *jobTree, PlannerRestrictionContext *plannerRestriction
foreach(assignedSqlTaskCell, assignedSqlTaskList) foreach(assignedSqlTaskCell, assignedSqlTaskList)
{ {
Task *assignedSqlTask = (Task *) lfirst(assignedSqlTaskCell); Task *assignedSqlTask = (Task *) lfirst(assignedSqlTaskCell);
List *assignedMergeTaskList = FindDependedMergeTaskList(assignedSqlTask); List *assignedMergeTaskList = FindDependentMergeTaskList(assignedSqlTask);
AssignDataFetchDependencies(assignedMergeTaskList); AssignDataFetchDependencies(assignedMergeTaskList);
} }
@ -2980,10 +2980,10 @@ RangeTableFragmentsList(List *rangeTableList, List *whereClauseList,
Job *dependentJob = JobForRangeTable(dependentJobList, rangeTableEntry); Job *dependentJob = JobForRangeTable(dependentJobList, rangeTableEntry);
Assert(CitusIsA(dependentJob, MapMergeJob)); Assert(CitusIsA(dependentJob, MapMergeJob));
MapMergeJob *dependedMapMergeJob = (MapMergeJob *) dependentJob; MapMergeJob *dependentMapMergeJob = (MapMergeJob *) dependentJob;
List *mergeTaskList = dependedMapMergeJob->mergeTaskList; List *mergeTaskList = dependentMapMergeJob->mergeTaskList;
/* if there are no tasks for the depended job, just return NIL */ /* if there are no tasks for the dependent job, just return NIL */
if (mergeTaskList == NIL) if (mergeTaskList == NIL)
{ {
return NIL; return NIL;
@ -4624,7 +4624,7 @@ AssignTaskList(List *sqlTaskList)
foreach(sqlTaskCell, sqlTaskList) foreach(sqlTaskCell, sqlTaskList)
{ {
Task *sqlTask = (Task *) lfirst(sqlTaskCell); Task *sqlTask = (Task *) lfirst(sqlTaskCell);
List *mergeTaskList = FindDependedMergeTaskList(sqlTask); List *mergeTaskList = FindDependentMergeTaskList(sqlTask);
Task *firstMergeTask = (Task *) linitial(mergeTaskList); Task *firstMergeTask = (Task *) linitial(mergeTaskList);
if (!firstMergeTask->assignmentConstrained) if (!firstMergeTask->assignmentConstrained)
@ -4648,7 +4648,7 @@ AssignTaskList(List *sqlTaskList)
foreach(primarySqlTaskCell, primarySqlTaskList) foreach(primarySqlTaskCell, primarySqlTaskList)
{ {
Task *sqlTask = (Task *) lfirst(primarySqlTaskCell); Task *sqlTask = (Task *) lfirst(primarySqlTaskCell);
List *mergeTaskList = FindDependedMergeTaskList(sqlTask); List *mergeTaskList = FindDependentMergeTaskList(sqlTask);
ListCell *mergeTaskCell = NULL; ListCell *mergeTaskCell = NULL;
foreach(mergeTaskCell, mergeTaskList) foreach(mergeTaskCell, mergeTaskList)
@ -4674,7 +4674,7 @@ AssignTaskList(List *sqlTaskList)
foreach(constrainedSqlTaskCell, constrainedSqlTaskList) foreach(constrainedSqlTaskCell, constrainedSqlTaskList)
{ {
Task *sqlTask = (Task *) lfirst(constrainedSqlTaskCell); Task *sqlTask = (Task *) lfirst(constrainedSqlTaskCell);
List *mergeTaskList = FindDependedMergeTaskList(sqlTask); List *mergeTaskList = FindDependentMergeTaskList(sqlTask);
List *mergeTaskPlacementList = NIL; List *mergeTaskPlacementList = NIL;
ListCell *mergeTaskCell = NULL; ListCell *mergeTaskCell = NULL;
@ -5285,14 +5285,14 @@ LeftRotateList(List *list, uint32 rotateCount)
/* /*
* FindDependedMergeTaskList walks over the given task's depended task list, * FindDependentMergeTaskList walks over the given task's dependent task list,
* finds the merge tasks in the list, and returns those found tasks in a new * finds the merge tasks in the list, and returns those found tasks in a new
* list. * list.
*/ */
static List * static List *
FindDependedMergeTaskList(Task *sqlTask) FindDependentMergeTaskList(Task *sqlTask)
{ {
List *dependedMergeTaskList = NIL; List *dependentMergeTaskList = NIL;
List *dependentTaskList = sqlTask->dependentTaskList; List *dependentTaskList = sqlTask->dependentTaskList;
ListCell *dependentTaskCell = NULL; ListCell *dependentTaskCell = NULL;
@ -5301,11 +5301,11 @@ FindDependedMergeTaskList(Task *sqlTask)
Task *dependentTask = (Task *) lfirst(dependentTaskCell); Task *dependentTask = (Task *) lfirst(dependentTaskCell);
if (dependentTask->taskType == MERGE_TASK) if (dependentTask->taskType == MERGE_TASK)
{ {
dependedMergeTaskList = lappend(dependedMergeTaskList, dependentTask); dependentMergeTaskList = lappend(dependentMergeTaskList, dependentTask);
} }
} }
return dependedMergeTaskList; return dependentMergeTaskList;
} }
@ -5422,7 +5422,7 @@ AssignDataFetchDependencies(List *taskList)
* TaskListHighestTaskId walks over tasks in the given task list, finds the task * TaskListHighestTaskId walks over tasks in the given task list, finds the task
* that has the largest taskId, and returns that taskId. * that has the largest taskId, and returns that taskId.
* *
* Note: This function assumes that the depended taskId's are set before the * Note: This function assumes that the dependent taskId's are set before the
* taskId's for the given task list. * taskId's for the given task list.
*/ */
static uint32 static uint32

View File

@ -995,11 +995,11 @@ EXPLAIN (COSTS FALSE, FORMAT JSON)
"Job": { "Job": {
"Task Count": 1, "Task Count": 1,
"Tasks Shown": "None, not supported for re-partition queries", "Tasks Shown": "None, not supported for re-partition queries",
"Depended Jobs": [ "Dependent Jobs": [
{ {
"Map Task Count": 1, "Map Task Count": 1,
"Merge Task Count": 1, "Merge Task Count": 1,
"Depended Jobs": [ "Dependent Jobs": [
{ {
"Map Task Count": 2, "Map Task Count": 2,
"Merge Task Count": 1 "Merge Task Count": 1
@ -1044,18 +1044,18 @@ EXPLAIN (COSTS FALSE, FORMAT XML)
<Job> <Job>
<Task-Count>1</Task-Count> <Task-Count>1</Task-Count>
<Tasks-Shown>None, not supported for re-partition queries</Tasks-Shown> <Tasks-Shown>None, not supported for re-partition queries</Tasks-Shown>
<Depended-Jobs> <Dependent-Jobs>
<MapMergeJob> <MapMergeJob>
<Map-Task-Count>1</Map-Task-Count> <Map-Task-Count>1</Map-Task-Count>
<Merge-Task-Count>1</Merge-Task-Count> <Merge-Task-Count>1</Merge-Task-Count>
<Depended-Jobs> <Dependent-Jobs>
<MapMergeJob> <MapMergeJob>
<Map-Task-Count>2</Map-Task-Count> <Map-Task-Count>2</Map-Task-Count>
<Merge-Task-Count>1</Merge-Task-Count> <Merge-Task-Count>1</Merge-Task-Count>
</MapMergeJob> </MapMergeJob>
</Depended-Jobs> </Dependent-Jobs>
</MapMergeJob> </MapMergeJob>
</Depended-Jobs> </Dependent-Jobs>
</Job> </Job>
</Distributed-Query> </Distributed-Query>
</Plan> </Plan>
@ -1103,7 +1103,7 @@ EXPLAIN (COSTS FALSE, FORMAT YAML)
Job: Job:
Task Count: 1 Task Count: 1
Tasks Shown: "None, not supported for re-partition queries" Tasks Shown: "None, not supported for re-partition queries"
Depended Jobs: Dependent Jobs:
- Map Task Count: 2 - Map Task Count: 2
Merge Task Count: 1 Merge Task Count: 1
-- test parallel aggregates -- test parallel aggregates

View File

@ -3,7 +3,7 @@
-- --
-- Tests which cover task assignment for MapMerge jobs for single range repartition -- Tests which cover task assignment for MapMerge jobs for single range repartition
-- and dual hash repartition joins. The tests also cover task assignment propagation -- and dual hash repartition joins. The tests also cover task assignment propagation
-- from a sql task to its depended tasks. Note that we set the executor type to task -- from a sql task to its dependent tasks. Note that we set the executor type to task
-- tracker executor here, as we cannot run repartition jobs with real time executor. -- tracker executor here, as we cannot run repartition jobs with real time executor.
SET citus.next_shard_id TO 710000; SET citus.next_shard_id TO 710000;
BEGIN; BEGIN;

View File

@ -174,7 +174,7 @@ test: multi_outer_join
# functionality related to metadata, shard creation, shard pruning and # functionality related to metadata, shard creation, shard pruning and
# "hacky" copy script for hash partitioned tables. # "hacky" copy script for hash partitioned tables.
# Note that the order of the following tests are important. multi_complex_count_distinct # Note that the order of the following tests are important. multi_complex_count_distinct
# is independed from the rest of the group, it is added to increase parallelism. # is independent from the rest of the group, it is added to increase parallelism.
# --- # ---
test: multi_create_fdw test: multi_create_fdw
test: multi_complex_count_distinct multi_select_distinct test: multi_complex_count_distinct multi_select_distinct

View File

@ -3,7 +3,7 @@
-- --
-- Tests which cover task assignment for MapMerge jobs for single range repartition -- Tests which cover task assignment for MapMerge jobs for single range repartition
-- and dual hash repartition joins. The tests also cover task assignment propagation -- and dual hash repartition joins. The tests also cover task assignment propagation
-- from a sql task to its depended tasks. Note that we set the executor type to task -- from a sql task to its dependent tasks. Note that we set the executor type to task
-- tracker executor here, as we cannot run repartition jobs with real time executor. -- tracker executor here, as we cannot run repartition jobs with real time executor.