From aeec3d1544e91ec47c1ac57c6e746ad7d3aac833 Mon Sep 17 00:00:00 2001 From: SaitTalhaNisanci Date: Thu, 28 Nov 2019 23:47:28 +0300 Subject: [PATCH] fix typo in dependent jobs and dependent task (#3244) --- src/backend/distributed/commands/call.c | 2 +- src/backend/distributed/commands/index.c | 6 +- src/backend/distributed/commands/table.c | 2 +- .../distributed/commands/utility_hook.c | 2 +- src/backend/distributed/commands/vacuum.c | 2 +- .../executor/multi_server_executor.c | 4 +- .../executor/multi_task_tracker_executor.c | 60 ++-- .../master/master_stage_protocol.c | 2 +- .../distributed/master/master_truncate.c | 2 +- .../planner/insert_select_planner.c | 8 +- .../distributed/planner/multi_explain.c | 34 +-- .../planner/multi_physical_planner.c | 256 +++++++++--------- .../planner/multi_router_planner.c | 4 +- .../distributed/utils/citus_copyfuncs.c | 4 +- .../distributed/utils/citus_outfuncs.c | 4 +- .../distributed/utils/citus_readfuncs.c | 4 +- .../distributed/multi_physical_planner.h | 4 +- 17 files changed, 202 insertions(+), 198 deletions(-) diff --git a/src/backend/distributed/commands/call.c b/src/backend/distributed/commands/call.c index 745e30105..370e924af 100644 --- a/src/backend/distributed/commands/call.c +++ b/src/backend/distributed/commands/call.c @@ -169,7 +169,7 @@ CallFuncExprRemotely(CallStmt *callStmt, DistObjectCacheEntry *procedure, task->taskType = DDL_TASK; task->queryString = callCommand->data; task->replicationModel = REPLICATION_MODEL_INVALID; - task->dependedTaskList = NIL; + task->dependentTaskList = NIL; task->anchorShardId = placement->shardId; task->relationShardList = NIL; task->taskPlacementList = placementList; diff --git a/src/backend/distributed/commands/index.c b/src/backend/distributed/commands/index.c index e217fb5c6..024336205 100644 --- a/src/backend/distributed/commands/index.c +++ b/src/backend/distributed/commands/index.c @@ -523,7 +523,7 @@ CreateIndexTaskList(Oid relationId, IndexStmt *indexStmt) task->taskType = DDL_TASK; task->queryString = pstrdup(ddlString.data); task->replicationModel = REPLICATION_MODEL_INVALID; - task->dependedTaskList = NULL; + task->dependentTaskList = NULL; task->anchorShardId = shardId; task->taskPlacementList = FinalizedShardPlacementList(shardId); @@ -568,7 +568,7 @@ CreateReindexTaskList(Oid relationId, ReindexStmt *reindexStmt) task->taskType = DDL_TASK; task->queryString = pstrdup(ddlString.data); task->replicationModel = REPLICATION_MODEL_INVALID; - task->dependedTaskList = NULL; + task->dependentTaskList = NULL; task->anchorShardId = shardId; task->taskPlacementList = FinalizedShardPlacementList(shardId); @@ -898,7 +898,7 @@ DropIndexTaskList(Oid relationId, Oid indexId, DropStmt *dropStmt) task->taskType = DDL_TASK; task->queryString = pstrdup(ddlString.data); task->replicationModel = REPLICATION_MODEL_INVALID; - task->dependedTaskList = NULL; + task->dependentTaskList = NULL; task->anchorShardId = shardId; task->taskPlacementList = FinalizedShardPlacementList(shardId); diff --git a/src/backend/distributed/commands/table.c b/src/backend/distributed/commands/table.c index c3c0309b1..79b861cf6 100644 --- a/src/backend/distributed/commands/table.c +++ b/src/backend/distributed/commands/table.c @@ -1292,7 +1292,7 @@ InterShardDDLTaskList(Oid leftRelationId, Oid rightRelationId, task->taskId = taskId++; task->taskType = DDL_TASK; task->queryString = applyCommand->data; - task->dependedTaskList = NULL; + task->dependentTaskList = NULL; task->replicationModel = REPLICATION_MODEL_INVALID; task->anchorShardId = leftShardId; task->taskPlacementList = FinalizedShardPlacementList(leftShardId); diff --git a/src/backend/distributed/commands/utility_hook.c b/src/backend/distributed/commands/utility_hook.c index d9703f849..ddf7416ae 100644 --- a/src/backend/distributed/commands/utility_hook.c +++ b/src/backend/distributed/commands/utility_hook.c @@ -1227,7 +1227,7 @@ DDLTaskList(Oid relationId, const char *commandString) task->taskType = DDL_TASK; task->queryString = applyCommand->data; task->replicationModel = REPLICATION_MODEL_INVALID; - task->dependedTaskList = NULL; + task->dependentTaskList = NULL; task->anchorShardId = shardId; task->taskPlacementList = FinalizedShardPlacementList(shardId); diff --git a/src/backend/distributed/commands/vacuum.c b/src/backend/distributed/commands/vacuum.c index fd715d1f3..031b9d566 100644 --- a/src/backend/distributed/commands/vacuum.c +++ b/src/backend/distributed/commands/vacuum.c @@ -226,7 +226,7 @@ VacuumTaskList(Oid relationId, CitusVacuumParams vacuumParams, List *vacuumColum task->taskId = taskId++; task->taskType = VACUUM_ANALYZE_TASK; task->queryString = pstrdup(vacuumString->data); - task->dependedTaskList = NULL; + task->dependentTaskList = NULL; task->replicationModel = REPLICATION_MODEL_INVALID; task->anchorShardId = shardId; task->taskPlacementList = FinalizedShardPlacementList(shardId); diff --git a/src/backend/distributed/executor/multi_server_executor.c b/src/backend/distributed/executor/multi_server_executor.c index 99a2638c8..5602ff87a 100644 --- a/src/backend/distributed/executor/multi_server_executor.c +++ b/src/backend/distributed/executor/multi_server_executor.c @@ -94,8 +94,8 @@ JobExecutorType(DistributedPlan *distributedPlan) /* if we have repartition jobs with adaptive executor and repartition * joins are not enabled, error out. Otherwise, switch to task-tracker */ - int dependedJobCount = list_length(job->dependedJobList); - if (dependedJobCount > 0) + int dependentJobCount = list_length(job->dependentJobList); + if (dependentJobCount > 0) { if (!EnableRepartitionJoins) { diff --git a/src/backend/distributed/executor/multi_task_tracker_executor.c b/src/backend/distributed/executor/multi_task_tracker_executor.c index 08188dac4..2c9e20ff8 100644 --- a/src/backend/distributed/executor/multi_task_tracker_executor.c +++ b/src/backend/distributed/executor/multi_task_tracker_executor.c @@ -273,7 +273,7 @@ MultiTaskTrackerExecute(Job *job) else if (taskExecutionStatus == EXEC_SOURCE_TASK_TRACKER_FAILED) { /* first resolve the map task this map fetch task depends on */ - Task *mapTask = (Task *) linitial(task->dependedTaskList); + Task *mapTask = (Task *) linitial(task->dependentTaskList); Assert(task->taskType == MAP_OUTPUT_FETCH_TASK); List *mapFetchTaskList = UpstreamDependencyList(taskAndExecutionList, @@ -469,7 +469,7 @@ TaskAndExecutionList(List *jobTaskList) List *taskQueue = list_copy(jobTaskList); while (taskQueue != NIL) { - ListCell *dependedTaskCell = NULL; + ListCell *dependentTaskCell = NULL; /* pop first element from the task queue */ Task *task = (Task *) linitial(taskQueue); @@ -481,7 +481,7 @@ TaskAndExecutionList(List *jobTaskList) taskAndExecutionList = lappend(taskAndExecutionList, task); - List *dependendTaskList = task->dependedTaskList; + List *dependendTaskList = task->dependentTaskList; /* * Push task node's children into the task queue, if and only if @@ -499,9 +499,9 @@ TaskAndExecutionList(List *jobTaskList) * taskHash is used to reduce the complexity of keeping track of * the tasks that are already encountered. */ - foreach(dependedTaskCell, dependendTaskList) + foreach(dependentTaskCell, dependendTaskList) { - Task *dependendTask = lfirst(dependedTaskCell); + Task *dependendTask = lfirst(dependentTaskCell); Task *dependendTaskInHash = TaskHashLookup(taskHash, dependendTask->taskType, dependendTask->jobId, @@ -519,8 +519,8 @@ TaskAndExecutionList(List *jobTaskList) taskQueue = lappend(taskQueue, dependendTaskInHash); } - /* update dependedTaskList element to the one which is in the hash */ - lfirst(dependedTaskCell) = dependendTaskInHash; + /* update dependentTaskList element to the one which is in the hash */ + lfirst(dependentTaskCell) = dependendTaskInHash; } } @@ -966,8 +966,8 @@ ResolveMapTaskTracker(HTAB *trackerHash, Task *task, TaskExecution *taskExecutio return NULL; } - Assert(task->dependedTaskList != NIL); - Task *mapTask = (Task *) linitial(task->dependedTaskList); + Assert(task->dependentTaskList != NIL); + Task *mapTask = (Task *) linitial(task->dependentTaskList); TaskExecution *mapTaskExecution = mapTask->taskExecution; TaskTracker *mapTaskTracker = ResolveTaskTracker(trackerHash, mapTask, @@ -1043,7 +1043,7 @@ ManageTaskExecution(TaskTracker *taskTracker, TaskTracker *sourceTaskTracker, * if these dependencies' executions have completed. */ bool taskExecutionsCompleted = TaskExecutionsCompleted( - task->dependedTaskList); + task->dependentTaskList); if (!taskExecutionsCompleted) { nextExecutionStatus = EXEC_TASK_UNASSIGNED; @@ -1054,7 +1054,7 @@ ManageTaskExecution(TaskTracker *taskTracker, TaskTracker *sourceTaskTracker, TaskType taskType = task->taskType; if (taskType == MAP_OUTPUT_FETCH_TASK) { - Task *mapTask = (Task *) linitial(task->dependedTaskList); + Task *mapTask = (Task *) linitial(task->dependentTaskList); TaskExecution *mapTaskExecution = mapTask->taskExecution; StringInfo mapFetchTaskQueryString = MapFetchTaskQueryString(task, @@ -1162,7 +1162,7 @@ ManageTaskExecution(TaskTracker *taskTracker, TaskTracker *sourceTaskTracker, case EXEC_SOURCE_TASK_TRACKER_RETRY: { - Task *mapTask = (Task *) linitial(task->dependedTaskList); + Task *mapTask = (Task *) linitial(task->dependentTaskList); TaskExecution *mapTaskExecution = mapTask->taskExecution; uint32 sourceNodeIndex = mapTaskExecution->currentNodeIndex; @@ -1807,10 +1807,11 @@ ConstrainedTaskList(List *taskAndExecutionList, Task *task) foreach(mergeTaskCell, mergeTaskList) { Task *mergeTask = (Task *) lfirst(mergeTaskCell); - List *dependedTaskList = mergeTask->dependedTaskList; + List *dependentTaskList = mergeTask->dependentTaskList; constrainedTaskList = lappend(constrainedTaskList, mergeTask); - constrainedTaskList = TaskListConcatUnique(constrainedTaskList, dependedTaskList); + constrainedTaskList = TaskListConcatUnique(constrainedTaskList, + dependentTaskList); } /* @@ -1829,14 +1830,15 @@ ConstrainedTaskList(List *taskAndExecutionList, Task *task) foreach(upstreamTaskCell, upstreamTaskList) { Task *upstreamTask = (Task *) lfirst(upstreamTaskCell); - List *dependedTaskList = upstreamTask->dependedTaskList; + List *dependentTaskList = upstreamTask->dependentTaskList; /* * We already added merge tasks to our constrained list. We therefore use * concat unique to ensure they don't get appended for a second time. */ constrainedTaskList = TaskListAppendUnique(constrainedTaskList, upstreamTask); - constrainedTaskList = TaskListConcatUnique(constrainedTaskList, dependedTaskList); + constrainedTaskList = TaskListConcatUnique(constrainedTaskList, + dependentTaskList); } return constrainedTaskList; @@ -1852,18 +1854,18 @@ static List * ConstrainedNonMergeTaskList(List *taskAndExecutionList, Task *task) { Task *upstreamTask = NULL; - List *dependedTaskList = NIL; + List *dependentTaskList = NIL; TaskType taskType = task->taskType; if (taskType == SQL_TASK || taskType == MAP_TASK) { upstreamTask = task; - dependedTaskList = upstreamTask->dependedTaskList; + dependentTaskList = upstreamTask->dependentTaskList; } Assert(upstreamTask != NULL); List *constrainedTaskList = list_make1(upstreamTask); - constrainedTaskList = list_concat(constrainedTaskList, dependedTaskList); + constrainedTaskList = list_concat(constrainedTaskList, dependentTaskList); return constrainedTaskList; } @@ -1884,8 +1886,8 @@ UpstreamDependencyList(List *taskAndExecutionList, Task *searchedTask) foreach(taskAndExecutionCell, taskAndExecutionList) { Task *upstreamTask = (Task *) lfirst(taskAndExecutionCell); - List *dependedTaskList = upstreamTask->dependedTaskList; - ListCell *dependedTaskCell = NULL; + List *dependentTaskList = upstreamTask->dependentTaskList; + ListCell *dependentTaskCell = NULL; /* * The given task and its upstream dependency cannot be of the same type. @@ -1901,10 +1903,10 @@ UpstreamDependencyList(List *taskAndExecutionList, Task *searchedTask) * We walk over the upstream task's dependency list, and check if any of * them is the task we are looking for. */ - foreach(dependedTaskCell, dependedTaskList) + foreach(dependentTaskCell, dependentTaskList) { - Task *dependedTask = (Task *) lfirst(dependedTaskCell); - if (TasksEqual(dependedTask, searchedTask)) + Task *dependentTask = (Task *) lfirst(dependentTaskCell); + if (TasksEqual(dependentTask, searchedTask)) { upstreamTaskList = lappend(upstreamTaskList, upstreamTask); } @@ -1935,7 +1937,7 @@ ConstrainedMergeTaskList(List *taskAndExecutionList, Task *task) */ if (taskType == SQL_TASK || taskType == MAP_TASK) { - constrainedMergeTaskList = MergeTaskList(task->dependedTaskList); + constrainedMergeTaskList = MergeTaskList(task->dependentTaskList); } else if (taskType == MAP_OUTPUT_FETCH_TASK) { @@ -1949,7 +1951,7 @@ ConstrainedMergeTaskList(List *taskAndExecutionList, Task *task) List *upstreamTaskList = UpstreamDependencyList(taskAndExecutionList, mergeTask); Task *upstreamTask = (Task *) linitial(upstreamTaskList); - constrainedMergeTaskList = MergeTaskList(upstreamTask->dependedTaskList); + constrainedMergeTaskList = MergeTaskList(upstreamTask->dependentTaskList); } else if (taskType == MERGE_TASK) { @@ -1963,7 +1965,7 @@ ConstrainedMergeTaskList(List *taskAndExecutionList, Task *task) Assert(upstreamTaskList != NIL); Task *upstreamTask = (Task *) linitial(upstreamTaskList); - constrainedMergeTaskList = MergeTaskList(upstreamTask->dependedTaskList); + constrainedMergeTaskList = MergeTaskList(upstreamTask->dependentTaskList); } return constrainedMergeTaskList; @@ -2593,8 +2595,8 @@ JobIdList(Job *job) (*jobIdPointer) = currJob->jobId; jobIdList = lappend(jobIdList, jobIdPointer); - /* prevent dependedJobList being modified on list_concat() call */ - List *jobChildrenList = list_copy(currJob->dependedJobList); + /* prevent dependentJobList being modified on list_concat() call */ + List *jobChildrenList = list_copy(currJob->dependentJobList); if (jobChildrenList != NIL) { jobQueue = list_concat(jobQueue, jobChildrenList); diff --git a/src/backend/distributed/master/master_stage_protocol.c b/src/backend/distributed/master/master_stage_protocol.c index 18f43502a..502b361d9 100644 --- a/src/backend/distributed/master/master_stage_protocol.c +++ b/src/backend/distributed/master/master_stage_protocol.c @@ -516,7 +516,7 @@ CreateShardsOnWorkers(Oid distributedRelationId, List *shardPlacements, task->taskType = DDL_TASK; task->queryString = StringJoin(commandList, ';'); task->replicationModel = REPLICATION_MODEL_INVALID; - task->dependedTaskList = NIL; + task->dependentTaskList = NIL; task->anchorShardId = shardId; task->relationShardList = relationShardList; task->taskPlacementList = list_make1(shardPlacement); diff --git a/src/backend/distributed/master/master_truncate.c b/src/backend/distributed/master/master_truncate.c index 7bfa0ddac..0e9fdaae8 100644 --- a/src/backend/distributed/master/master_truncate.c +++ b/src/backend/distributed/master/master_truncate.c @@ -117,7 +117,7 @@ TruncateTaskList(Oid relationId) task->taskId = taskId++; task->taskType = DDL_TASK; task->queryString = shardQueryString->data; - task->dependedTaskList = NULL; + task->dependentTaskList = NULL; task->replicationModel = REPLICATION_MODEL_INVALID; task->anchorShardId = shardId; task->taskPlacementList = FinalizedShardPlacementList(shardId); diff --git a/src/backend/distributed/planner/insert_select_planner.c b/src/backend/distributed/planner/insert_select_planner.c index e31fa0e40..29291b275 100644 --- a/src/backend/distributed/planner/insert_select_planner.c +++ b/src/backend/distributed/planner/insert_select_planner.c @@ -283,7 +283,7 @@ CreateDistributedInsertSelectPlan(Query *originalQuery, Job *workerJob = CitusMakeNode(Job); workerJob->taskList = sqlTaskList; workerJob->subqueryPushdown = false; - workerJob->dependedJobList = NIL; + workerJob->dependentJobList = NIL; workerJob->jobId = jobId; workerJob->jobQuery = originalQuery; workerJob->requiresMasterEvaluation = RequiresMasterEvaluation(originalQuery); @@ -575,7 +575,7 @@ RouterModifyTaskForShardInterval(Query *originalQuery, ShardInterval *shardInter Task *modifyTask = CreateBasicTask(jobId, taskIdIndex, MODIFY_TASK, queryString->data); - modifyTask->dependedTaskList = NULL; + modifyTask->dependentTaskList = NULL; modifyTask->anchorShardId = shardId; modifyTask->taskPlacementList = insertShardPlacementList; modifyTask->relationShardList = relationShardList; @@ -1178,7 +1178,7 @@ CreateCoordinatorInsertSelectPlan(uint64 planId, Query *parse) Job *workerJob = CitusMakeNode(Job); workerJob->taskList = taskList; workerJob->subqueryPushdown = false; - workerJob->dependedJobList = NIL; + workerJob->dependentJobList = NIL; workerJob->jobId = jobId; workerJob->jobQuery = insertSelectQuery; workerJob->requiresMasterEvaluation = false; @@ -1397,7 +1397,7 @@ TwoPhaseInsertSelectTaskList(Oid targetRelationId, Query *insertSelectQuery, Task *modifyTask = CreateBasicTask(jobId, taskIdIndex, MODIFY_TASK, queryString->data); - modifyTask->dependedTaskList = NULL; + modifyTask->dependentTaskList = NULL; modifyTask->anchorShardId = shardId; modifyTask->taskPlacementList = insertShardPlacementList; modifyTask->relationShardList = list_make1(relationShard); diff --git a/src/backend/distributed/planner/multi_explain.c b/src/backend/distributed/planner/multi_explain.c index f3dae835e..369296707 100644 --- a/src/backend/distributed/planner/multi_explain.c +++ b/src/backend/distributed/planner/multi_explain.c @@ -213,9 +213,9 @@ ExplainSubPlans(DistributedPlan *distributedPlan, ExplainState *es) static void ExplainJob(Job *job, ExplainState *es) { - List *dependedJobList = job->dependedJobList; - int dependedJobCount = list_length(dependedJobList); - ListCell *dependedJobCell = NULL; + List *dependentJobList = job->dependentJobList; + int dependentJobCount = list_length(dependentJobList); + ListCell *dependentJobCell = NULL; List *taskList = job->taskList; int taskCount = list_length(taskList); @@ -223,7 +223,7 @@ ExplainJob(Job *job, ExplainState *es) ExplainPropertyInteger("Task Count", NULL, taskCount, es); - if (dependedJobCount > 0) + if (dependentJobCount > 0) { ExplainPropertyText("Tasks Shown", "None, not supported for re-partition " "queries", es); @@ -244,7 +244,7 @@ ExplainJob(Job *job, ExplainState *es) * We cannot fetch EXPLAIN plans for jobs that have dependencies, since the * intermediate tables have not been created. */ - if (dependedJobCount == 0) + if (dependentJobCount == 0) { ExplainOpenGroup("Tasks", "Tasks", false, es); @@ -257,13 +257,13 @@ ExplainJob(Job *job, ExplainState *es) ExplainOpenGroup("Depended Jobs", "Depended Jobs", false, es); /* show explain output for depended jobs, if any */ - foreach(dependedJobCell, dependedJobList) + foreach(dependentJobCell, dependentJobList) { - Job *dependedJob = (Job *) lfirst(dependedJobCell); + Job *dependentJob = (Job *) lfirst(dependentJobCell); - if (CitusIsA(dependedJob, MapMergeJob)) + if (CitusIsA(dependentJob, MapMergeJob)) { - ExplainMapMergeJob((MapMergeJob *) dependedJob, es); + ExplainMapMergeJob((MapMergeJob *) dependentJob, es); } } @@ -283,9 +283,9 @@ ExplainJob(Job *job, ExplainState *es) static void ExplainMapMergeJob(MapMergeJob *mapMergeJob, ExplainState *es) { - List *dependedJobList = mapMergeJob->job.dependedJobList; - int dependedJobCount = list_length(dependedJobList); - ListCell *dependedJobCell = NULL; + List *dependentJobList = mapMergeJob->job.dependentJobList; + int dependentJobCount = list_length(dependentJobList); + ListCell *dependentJobCell = NULL; int mapTaskCount = list_length(mapMergeJob->mapTaskList); int mergeTaskCount = list_length(mapMergeJob->mergeTaskList); @@ -300,17 +300,17 @@ ExplainMapMergeJob(MapMergeJob *mapMergeJob, ExplainState *es) ExplainPropertyInteger("Map Task Count", NULL, mapTaskCount, es); ExplainPropertyInteger("Merge Task Count", NULL, mergeTaskCount, es); - if (dependedJobCount > 0) + if (dependentJobCount > 0) { ExplainOpenGroup("Depended Jobs", "Depended Jobs", false, es); - foreach(dependedJobCell, dependedJobList) + foreach(dependentJobCell, dependentJobList) { - Job *dependedJob = (Job *) lfirst(dependedJobCell); + Job *dependentJob = (Job *) lfirst(dependentJobCell); - if (CitusIsA(dependedJob, MapMergeJob)) + if (CitusIsA(dependentJob, MapMergeJob)) { - ExplainMapMergeJob((MapMergeJob *) dependedJob, es); + ExplainMapMergeJob((MapMergeJob *) dependentJob, es); } } diff --git a/src/backend/distributed/planner/multi_physical_planner.c b/src/backend/distributed/planner/multi_physical_planner.c index efbd35d1e..1524d4b31 100644 --- a/src/backend/distributed/planner/multi_physical_planner.c +++ b/src/backend/distributed/planner/multi_physical_planner.c @@ -94,8 +94,8 @@ static Job * BuildJobTree(MultiTreeRoot *multiTree); static MultiNode * LeftMostNode(MultiTreeRoot *multiTree); static Oid RangePartitionJoinBaseRelationId(MultiJoin *joinNode); static MultiTable * FindTableNode(MultiNode *multiNode, int rangeTableId); -static Query * BuildJobQuery(MultiNode *multiNode, List *dependedJobList); -static Query * BuildReduceQuery(MultiExtendedOp *extendedOpNode, List *dependedJobList); +static Query * BuildJobQuery(MultiNode *multiNode, List *dependentJobList); +static Query * BuildReduceQuery(MultiExtendedOp *extendedOpNode, List *dependentJobList); static List * BaseRangeTableList(MultiNode *multiNode); static List * QueryTargetList(MultiNode *multiNode); static List * TargetEntryList(List *expressionList); @@ -103,29 +103,29 @@ static List * QueryGroupClauseList(MultiNode *multiNode); static List * QuerySelectClauseList(MultiNode *multiNode); static List * QueryJoinClauseList(MultiNode *multiNode); static List * QueryFromList(List *rangeTableList); -static Node * QueryJoinTree(MultiNode *multiNode, List *dependedJobList, +static Node * QueryJoinTree(MultiNode *multiNode, List *dependentJobList, List **rangeTableList); -static RangeTblEntry * JoinRangeTableEntry(JoinExpr *joinExpr, List *dependedJobList, +static RangeTblEntry * JoinRangeTableEntry(JoinExpr *joinExpr, List *dependentJobList, List *rangeTableList); static int ExtractRangeTableId(Node *node); static void ExtractColumns(RangeTblEntry *rangeTableEntry, int rangeTableId, - List *dependedJobList, List **columnNames, List **columnVars); + List *dependentJobList, List **columnNames, List **columnVars); static RangeTblEntry * DerivedRangeTableEntry(MultiNode *multiNode, List *columnNames, List *tableIdList); static List * DerivedColumnNameList(uint32 columnCount, uint64 generatingJobId); static Query * BuildSubqueryJobQuery(MultiNode *multiNode); static void UpdateAllColumnAttributes(Node *columnContainer, List *rangeTableList, - List *dependedJobList); + List *dependentJobList); static void UpdateColumnAttributes(Var *column, List *rangeTableList, - List *dependedJobList); + List *dependentJobList); static Index NewTableId(Index originalTableId, List *rangeTableList); static AttrNumber NewColumnId(Index originalTableId, AttrNumber originalColumnId, - RangeTblEntry *newRangeTableEntry, List *dependedJobList); + RangeTblEntry *newRangeTableEntry, List *dependentJobList); static Job * JobForRangeTable(List *jobList, RangeTblEntry *rangeTableEntry); static Job * JobForTableIdList(List *jobList, List *searchedTableIdList); static List * ChildNodeList(MultiNode *multiNode); -static Job * BuildJob(Query *jobQuery, List *dependedJobList); -static MapMergeJob * BuildMapMergeJob(Query *jobQuery, List *dependedJobList, +static Job * BuildJob(Query *jobQuery, List *dependentJobList); +static MapMergeJob * BuildMapMergeJob(Query *jobQuery, List *dependentJobList, Var *partitionKey, PartitionType partitionType, Oid baseRelationId, BoundaryNodeJobType boundaryNodeJobType); @@ -153,15 +153,16 @@ static List * BaseRangeTableIdList(List *rangeTableList); static List * AnchorRangeTableIdList(List *rangeTableList, List *baseRangeTableIdList); static void AdjustColumnOldAttributes(List *expressionList); static List * RangeTableFragmentsList(List *rangeTableList, List *whereClauseList, - List *dependedJobList); + List *dependentJobList); static OperatorCacheEntry * LookupOperatorByType(Oid typeId, Oid accessMethodId, int16 strategyNumber); static Oid GetOperatorByType(Oid typeId, Oid accessMethodId, int16 strategyNumber); static List * FragmentCombinationList(List *rangeTableFragmentsList, Query *jobQuery, - List *dependedJobList); + List *dependentJobList); static JoinSequenceNode * JoinSequenceArray(List *rangeTableFragmentsList, - Query *jobQuery, List *dependedJobList); -static bool PartitionedOnColumn(Var *column, List *rangeTableList, List *dependedJobList); + Query *jobQuery, List *dependentJobList); +static bool PartitionedOnColumn(Var *column, List *rangeTableList, + List *dependentJobList); static void CheckJoinBetweenColumns(OpExpr *joinClause); static List * FindRangeTableFragmentsList(List *rangeTableFragmentsList, int taskId); static bool JoinPrunable(RangeTableFragment *leftFragment, @@ -224,8 +225,8 @@ CreatePhysicalDistributedPlan(MultiTreeRoot *multiTree, workerJob = BuildJobTreeTaskList(workerJob, plannerRestrictionContext); /* build the final merge query to execute on the master */ - List *masterDependedJobList = list_make1(workerJob); - Query *masterQuery = BuildJobQuery((MultiNode *) multiTree, masterDependedJobList); + List *masterDependentJobList = list_make1(workerJob); + Query *masterQuery = BuildJobQuery((MultiNode *) multiTree, masterDependentJobList); DistributedPlan *distributedPlan = CitusMakeNode(DistributedPlan); distributedPlan->workerJob = workerJob; @@ -252,7 +253,7 @@ DistributedPlanRouterExecutable(DistributedPlan *distributedPlan) Job *job = distributedPlan->workerJob; List *workerTaskList = job->taskList; int taskCount = list_length(workerTaskList); - int dependedJobCount = list_length(job->dependedJobList); + int dependentJobCount = list_length(job->dependentJobList); if (!EnableRouterExecution) { @@ -266,7 +267,7 @@ DistributedPlanRouterExecutable(DistributedPlan *distributedPlan) } /* router executor cannot execute repartition jobs */ - if (dependedJobCount > 0) + if (dependentJobCount > 0) { return false; } @@ -309,7 +310,7 @@ BuildJobTree(MultiTreeRoot *multiTree) MultiNode *leftMostNode = LeftMostNode(multiTree); MultiNode *currentNode = leftMostNode; MultiNode *parentNode = ParentNode(currentNode); - List *loopDependedJobList = NIL; + List *loopDependentJobList = NIL; Job *topLevelJob = NULL; while (parentNode != NULL) @@ -375,17 +376,17 @@ BuildJobTree(MultiTreeRoot *multiTree) Var *partitionKey = partitionNode->partitionColumn; /* build query and partition job */ - List *dependedJobList = list_copy(loopDependedJobList); - Query *jobQuery = BuildJobQuery(queryNode, dependedJobList); + List *dependentJobList = list_copy(loopDependentJobList); + Query *jobQuery = BuildJobQuery(queryNode, dependentJobList); - MapMergeJob *mapMergeJob = BuildMapMergeJob(jobQuery, dependedJobList, + MapMergeJob *mapMergeJob = BuildMapMergeJob(jobQuery, dependentJobList, partitionKey, partitionType, baseRelationId, JOIN_MAP_MERGE_JOB); /* reset depended job list */ - loopDependedJobList = NIL; - loopDependedJobList = list_make1(mapMergeJob); + loopDependentJobList = NIL; + loopDependentJobList = list_make1(mapMergeJob); } if (CitusIsA(rightChildNode, MultiPartition)) @@ -405,7 +406,7 @@ BuildJobTree(MultiTreeRoot *multiTree) JOIN_MAP_MERGE_JOB); /* append to the depended job list for on-going dependencies */ - loopDependedJobList = lappend(loopDependedJobList, mapMergeJob); + loopDependentJobList = lappend(loopDependentJobList, mapMergeJob); } } else if (boundaryNodeJobType == SUBQUERY_MAP_MERGE_JOB) @@ -415,10 +416,10 @@ BuildJobTree(MultiTreeRoot *multiTree) Var *partitionKey = partitionNode->partitionColumn; /* build query and partition job */ - List *dependedJobList = list_copy(loopDependedJobList); - Query *jobQuery = BuildJobQuery(queryNode, dependedJobList); + List *dependentJobList = list_copy(loopDependentJobList); + Query *jobQuery = BuildJobQuery(queryNode, dependentJobList); - MapMergeJob *mapMergeJob = BuildMapMergeJob(jobQuery, dependedJobList, + MapMergeJob *mapMergeJob = BuildMapMergeJob(jobQuery, dependentJobList, partitionKey, DUAL_HASH_PARTITION_TYPE, InvalidOid, @@ -429,13 +430,13 @@ BuildJobTree(MultiTreeRoot *multiTree) mapMergeJob->reduceQuery = reduceQuery; /* reset depended job list */ - loopDependedJobList = NIL; - loopDependedJobList = list_make1(mapMergeJob); + loopDependentJobList = NIL; + loopDependentJobList = list_make1(mapMergeJob); } else if (boundaryNodeJobType == TOP_LEVEL_WORKER_JOB) { MultiNode *childNode = ChildNode((MultiUnaryNode *) currentNode); - List *dependedJobList = list_copy(loopDependedJobList); + List *dependentJobList = list_copy(loopDependentJobList); bool subqueryPushdown = false; List *subqueryMultiTableList = SubqueryMultiTableList(childNode); @@ -456,14 +457,14 @@ BuildJobTree(MultiTreeRoot *multiTree) { Query *topLevelQuery = BuildSubqueryJobQuery(childNode); - topLevelJob = BuildJob(topLevelQuery, dependedJobList); + topLevelJob = BuildJob(topLevelQuery, dependentJobList); topLevelJob->subqueryPushdown = true; } else { - Query *topLevelQuery = BuildJobQuery(childNode, dependedJobList); + Query *topLevelQuery = BuildJobQuery(childNode, dependentJobList); - topLevelJob = BuildJob(topLevelQuery, dependedJobList); + topLevelJob = BuildJob(topLevelQuery, dependentJobList); } } @@ -569,7 +570,7 @@ FindTableNode(MultiNode *multiNode, int rangeTableId) * have already been built, as their output is needed to build the query. */ static Query * -BuildJobQuery(MultiNode *multiNode, List *dependedJobList) +BuildJobQuery(MultiNode *multiNode, List *dependentJobList) { bool updateColumnAttributes = false; List *targetList = NIL; @@ -604,9 +605,9 @@ BuildJobQuery(MultiNode *multiNode, List *dependedJobList) * If we are building this query on a repartitioned subquery job then we * don't need to update column attributes. */ - if (dependedJobList != NIL) + if (dependentJobList != NIL) { - Job *job = (Job *) linitial(dependedJobList); + Job *job = (Job *) linitial(dependentJobList); if (CitusIsA(job, MapMergeJob)) { MapMergeJob *mapMergeJob = (MapMergeJob *) job; @@ -638,12 +639,12 @@ BuildJobQuery(MultiNode *multiNode, List *dependedJobList) /* build the join tree and the range table list */ List *rangeTableList = BaseRangeTableList(multiNode); - Node *joinRoot = QueryJoinTree(multiNode, dependedJobList, &rangeTableList); + Node *joinRoot = QueryJoinTree(multiNode, dependentJobList, &rangeTableList); /* update the column attributes for target entries */ if (updateColumnAttributes) { - UpdateAllColumnAttributes((Node *) targetList, rangeTableList, dependedJobList); + UpdateAllColumnAttributes((Node *) targetList, rangeTableList, dependentJobList); } /* extract limit count/offset and sort clauses */ @@ -668,8 +669,8 @@ BuildJobQuery(MultiNode *multiNode, List *dependedJobList) if (updateColumnAttributes) { UpdateAllColumnAttributes((Node *) selectClauseList, rangeTableList, - dependedJobList); - UpdateAllColumnAttributes(havingQual, rangeTableList, dependedJobList); + dependentJobList); + UpdateAllColumnAttributes(havingQual, rangeTableList, dependentJobList); } /* @@ -726,7 +727,7 @@ BuildJobQuery(MultiNode *multiNode, List *dependedJobList) * have already been built, as their output is needed to build the query. */ static Query * -BuildReduceQuery(MultiExtendedOp *extendedOpNode, List *dependedJobList) +BuildReduceQuery(MultiExtendedOp *extendedOpNode, List *dependentJobList) { MultiNode *multiNode = (MultiNode *) extendedOpNode; List *derivedRangeTableList = NIL; @@ -734,8 +735,8 @@ BuildReduceQuery(MultiExtendedOp *extendedOpNode, List *dependedJobList) ListCell *columnCell = NULL; List *columnNameList = NIL; - Job *dependedJob = linitial(dependedJobList); - List *dependedTargetList = dependedJob->jobQuery->targetList; + Job *dependentJob = linitial(dependentJobList); + List *dependedTargetList = dependentJob->jobQuery->targetList; uint32 columnCount = (uint32) list_length(dependedTargetList); for (uint32 columnIndex = 0; columnIndex < columnCount; columnIndex++) @@ -1121,7 +1122,7 @@ QueryJoinClauseList(MultiNode *multiNode) * the entries at the same time as the tree to know the appropriate rtindex. */ static Node * -QueryJoinTree(MultiNode *multiNode, List *dependedJobList, List **rangeTableList) +QueryJoinTree(MultiNode *multiNode, List *dependentJobList, List **rangeTableList) { CitusNodeTag nodeType = CitusNodeTag(multiNode); @@ -1135,9 +1136,9 @@ QueryJoinTree(MultiNode *multiNode, List *dependedJobList, List **rangeTableList JoinExpr *joinExpr = makeNode(JoinExpr); joinExpr->jointype = joinNode->joinType; joinExpr->isNatural = false; - joinExpr->larg = QueryJoinTree(binaryNode->leftChildNode, dependedJobList, + joinExpr->larg = QueryJoinTree(binaryNode->leftChildNode, dependentJobList, rangeTableList); - joinExpr->rarg = QueryJoinTree(binaryNode->rightChildNode, dependedJobList, + joinExpr->rarg = QueryJoinTree(binaryNode->rightChildNode, dependentJobList, rangeTableList); joinExpr->usingClause = NIL; joinExpr->alias = NULL; @@ -1161,7 +1162,7 @@ QueryJoinTree(MultiNode *multiNode, List *dependedJobList, List **rangeTableList } RangeTblEntry *rangeTableEntry = JoinRangeTableEntry(joinExpr, - dependedJobList, + dependentJobList, *rangeTableList); *rangeTableList = lappend(*rangeTableList, rangeTableEntry); @@ -1170,7 +1171,7 @@ QueryJoinTree(MultiNode *multiNode, List *dependedJobList, List **rangeTableList foreach(columnCell, columnList) { Var *column = (Var *) lfirst(columnCell); - UpdateColumnAttributes(column, *rangeTableList, dependedJobList); + UpdateColumnAttributes(column, *rangeTableList, dependentJobList); /* adjust our column old attributes for partition pruning to work */ column->varnoold = column->varno; @@ -1191,7 +1192,7 @@ QueryJoinTree(MultiNode *multiNode, List *dependedJobList, List **rangeTableList if (unaryNode->childNode != NULL) { /* MultiTable is actually a subquery, return the query tree below */ - Node *childNode = QueryJoinTree(unaryNode->childNode, dependedJobList, + Node *childNode = QueryJoinTree(unaryNode->childNode, dependentJobList, rangeTableList); return childNode; @@ -1209,12 +1210,13 @@ QueryJoinTree(MultiNode *multiNode, List *dependedJobList, List **rangeTableList case T_MultiCollect: { List *tableIdList = OutputTableIdList(multiNode); - Job *dependedJob = JobForTableIdList(dependedJobList, tableIdList); - List *dependedTargetList = dependedJob->jobQuery->targetList; + Job *dependentJob = JobForTableIdList(dependentJobList, tableIdList); + List *dependedTargetList = dependentJob->jobQuery->targetList; /* compute column names for the derived table */ uint32 columnCount = (uint32) list_length(dependedTargetList); - List *columnNameList = DerivedColumnNameList(columnCount, dependedJob->jobId); + List *columnNameList = DerivedColumnNameList(columnCount, + dependentJob->jobId); RangeTblEntry *rangeTableEntry = DerivedRangeTableEntry(multiNode, columnNameList, @@ -1234,9 +1236,9 @@ QueryJoinTree(MultiNode *multiNode, List *dependedJobList, List **rangeTableList JoinExpr *joinExpr = makeNode(JoinExpr); joinExpr->jointype = JOIN_INNER; joinExpr->isNatural = false; - joinExpr->larg = QueryJoinTree(binaryNode->leftChildNode, dependedJobList, + joinExpr->larg = QueryJoinTree(binaryNode->leftChildNode, dependentJobList, rangeTableList); - joinExpr->rarg = QueryJoinTree(binaryNode->rightChildNode, dependedJobList, + joinExpr->rarg = QueryJoinTree(binaryNode->rightChildNode, dependentJobList, rangeTableList); joinExpr->usingClause = NIL; joinExpr->alias = NULL; @@ -1244,7 +1246,7 @@ QueryJoinTree(MultiNode *multiNode, List *dependedJobList, List **rangeTableList joinExpr->rtindex = list_length(*rangeTableList) + 1; RangeTblEntry *rangeTableEntry = JoinRangeTableEntry(joinExpr, - dependedJobList, + dependentJobList, *rangeTableList); *rangeTableList = lappend(*rangeTableList, rangeTableEntry); @@ -1261,7 +1263,7 @@ QueryJoinTree(MultiNode *multiNode, List *dependedJobList, List **rangeTableList Assert(UnaryOperator(multiNode)); - Node *childNode = QueryJoinTree(unaryNode->childNode, dependedJobList, + Node *childNode = QueryJoinTree(unaryNode->childNode, dependentJobList, rangeTableList); return childNode; @@ -1281,7 +1283,7 @@ QueryJoinTree(MultiNode *multiNode, List *dependedJobList, List **rangeTableList * transformFromClauseItem. */ static RangeTblEntry * -JoinRangeTableEntry(JoinExpr *joinExpr, List *dependedJobList, List *rangeTableList) +JoinRangeTableEntry(JoinExpr *joinExpr, List *dependentJobList, List *rangeTableList) { RangeTblEntry *rangeTableEntry = makeNode(RangeTblEntry); List *joinedColumnNames = NIL; @@ -1303,9 +1305,9 @@ JoinRangeTableEntry(JoinExpr *joinExpr, List *dependedJobList, List *rangeTableL rangeTableEntry->subquery = NULL; rangeTableEntry->eref = makeAlias("unnamed_join", NIL); - ExtractColumns(leftRTE, leftRangeTableId, dependedJobList, + ExtractColumns(leftRTE, leftRangeTableId, dependentJobList, &leftColumnNames, &leftColumnVars); - ExtractColumns(rightRTE, rightRangeTableId, dependedJobList, + ExtractColumns(rightRTE, rightRangeTableId, dependentJobList, &rightColumnNames, &rightColumnVars); joinedColumnNames = list_concat(joinedColumnNames, leftColumnNames); @@ -1353,7 +1355,7 @@ ExtractRangeTableId(Node *node) * to a form that expandRTE can handle. */ static void -ExtractColumns(RangeTblEntry *rangeTableEntry, int rangeTableId, List *dependedJobList, +ExtractColumns(RangeTblEntry *rangeTableEntry, int rangeTableId, List *dependentJobList, List **columnNames, List **columnVars) { RangeTblEntry *callingRTE = NULL; @@ -1380,8 +1382,8 @@ ExtractColumns(RangeTblEntry *rangeTableEntry, int rangeTableId, List *dependedJ } else if (rangeTableKind == CITUS_RTE_REMOTE_QUERY) { - Job *dependedJob = JobForRangeTable(dependedJobList, rangeTableEntry); - Query *jobQuery = dependedJob->jobQuery; + Job *dependentJob = JobForRangeTable(dependentJobList, rangeTableEntry); + Query *jobQuery = dependentJob->jobQuery; /* * For re-partition jobs, we construct a subquery RTE to call expandRTE, @@ -1564,14 +1566,14 @@ BuildSubqueryJobQuery(MultiNode *multiNode) */ static void UpdateAllColumnAttributes(Node *columnContainer, List *rangeTableList, - List *dependedJobList) + List *dependentJobList) { ListCell *columnCell = NULL; List *columnList = pull_var_clause_default(columnContainer); foreach(columnCell, columnList) { Var *column = (Var *) lfirst(columnCell); - UpdateColumnAttributes(column, rangeTableList, dependedJobList); + UpdateColumnAttributes(column, rangeTableList, dependentJobList); } } @@ -1582,7 +1584,7 @@ UpdateAllColumnAttributes(Node *columnContainer, List *rangeTableList, * newly built range table list to update the given column's attributes. */ static void -UpdateColumnAttributes(Var *column, List *rangeTableList, List *dependedJobList) +UpdateColumnAttributes(Var *column, List *rangeTableList, List *dependentJobList) { Index originalTableId = column->varnoold; AttrNumber originalColumnId = column->varoattno; @@ -1596,7 +1598,7 @@ UpdateColumnAttributes(Var *column, List *rangeTableList, List *dependedJobList) if (GetRangeTblKind(newRangeTableEntry) == CITUS_RTE_REMOTE_QUERY) { newColumnId = NewColumnId(originalTableId, originalColumnId, - newRangeTableEntry, dependedJobList); + newRangeTableEntry, dependentJobList); } column->varno = newTableId; @@ -1648,13 +1650,13 @@ NewTableId(Index originalTableId, List *rangeTableList) */ static AttrNumber NewColumnId(Index originalTableId, AttrNumber originalColumnId, - RangeTblEntry *newRangeTableEntry, List *dependedJobList) + RangeTblEntry *newRangeTableEntry, List *dependentJobList) { AttrNumber newColumnId = 1; AttrNumber columnIndex = 1; - Job *dependedJob = JobForRangeTable(dependedJobList, newRangeTableEntry); - List *targetEntryList = dependedJob->jobQuery->targetList; + Job *dependentJob = JobForRangeTable(dependentJobList, newRangeTableEntry); + List *targetEntryList = dependentJob->jobQuery->targetList; ListCell *targetEntryCell = NULL; foreach(targetEntryCell, targetEntryList) @@ -1852,12 +1854,12 @@ UniqueJobId(void) /* Builds a job from the given job query and depended job list. */ static Job * -BuildJob(Query *jobQuery, List *dependedJobList) +BuildJob(Query *jobQuery, List *dependentJobList) { Job *job = CitusMakeNode(Job); job->jobId = UniqueJobId(); job->jobQuery = jobQuery; - job->dependedJobList = dependedJobList; + job->dependentJobList = dependentJobList; job->requiresMasterEvaluation = false; return job; @@ -1871,7 +1873,7 @@ BuildJob(Query *jobQuery, List *dependedJobList) * method to apply. */ static MapMergeJob * -BuildMapMergeJob(Query *jobQuery, List *dependedJobList, Var *partitionKey, +BuildMapMergeJob(Query *jobQuery, List *dependentJobList, Var *partitionKey, PartitionType partitionType, Oid baseRelationId, BoundaryNodeJobType boundaryNodeJobType) { @@ -1881,13 +1883,13 @@ BuildMapMergeJob(Query *jobQuery, List *dependedJobList, Var *partitionKey, /* update the logical partition key's table and column identifiers */ if (boundaryNodeJobType != SUBQUERY_MAP_MERGE_JOB) { - UpdateColumnAttributes(partitionColumn, rangeTableList, dependedJobList); + UpdateColumnAttributes(partitionColumn, rangeTableList, dependentJobList); } MapMergeJob *mapMergeJob = CitusMakeNode(MapMergeJob); mapMergeJob->job.jobId = UniqueJobId(); mapMergeJob->job.jobQuery = jobQuery; - mapMergeJob->job.dependedJobList = dependedJobList; + mapMergeJob->job.dependentJobList = dependentJobList; mapMergeJob->partitionColumn = partitionColumn; mapMergeJob->sortedShardIntervalArrayLength = 0; @@ -2015,7 +2017,7 @@ BuildJobTreeTaskList(Job *jobTree, PlannerRestrictionContext *plannerRestriction /* pop top element and push its children to the stack */ jobStack = list_delete_ptr(jobStack, job); - jobStack = list_union_ptr(jobStack, job->dependedJobList); + jobStack = list_union_ptr(jobStack, job->dependentJobList); } /* @@ -2447,7 +2449,7 @@ QueryPushdownTaskCreate(Query *originalQuery, int shardIndex, subqueryTask->queryString = queryString->data; } - subqueryTask->dependedTaskList = NULL; + subqueryTask->dependentTaskList = NULL; subqueryTask->anchorShardId = anchorShardId; subqueryTask->taskPlacementList = selectPlacementList; subqueryTask->relationShardList = relationShardList; @@ -2631,7 +2633,7 @@ SqlTaskList(Job *job) Query *jobQuery = job->jobQuery; List *rangeTableList = jobQuery->rtable; List *whereClauseList = (List *) jobQuery->jointree->quals; - List *dependedJobList = job->dependedJobList; + List *dependentJobList = job->dependentJobList; /* * If we don't depend on a hash partition, then we determine the largest @@ -2668,7 +2670,7 @@ SqlTaskList(Job *job) */ List *rangeTableFragmentsList = RangeTableFragmentsList(rangeTableList, whereClauseList, - dependedJobList); + dependentJobList); if (rangeTableFragmentsList == NIL) { return NIL; @@ -2680,7 +2682,7 @@ SqlTaskList(Job *job) * represents one SQL task's dependencies. */ List *fragmentCombinationList = FragmentCombinationList(rangeTableFragmentsList, - jobQuery, dependedJobList); + jobQuery, dependentJobList); ListCell *fragmentCombinationCell = NULL; foreach(fragmentCombinationCell, fragmentCombinationList) @@ -2704,7 +2706,7 @@ SqlTaskList(Job *job) Task *sqlTask = CreateBasicTask(jobId, taskIdIndex, SQL_TASK, sqlQueryString->data); - sqlTask->dependedTaskList = dataFetchTaskList; + sqlTask->dependentTaskList = dataFetchTaskList; sqlTask->relationShardList = BuildRelationShardList(fragmentRangeTableList, fragmentCombination); @@ -2736,15 +2738,15 @@ static bool DependsOnHashPartitionJob(Job *job) { bool dependsOnHashPartitionJob = false; - List *dependedJobList = job->dependedJobList; + List *dependentJobList = job->dependentJobList; - uint32 dependedJobCount = (uint32) list_length(dependedJobList); - if (dependedJobCount > 0) + uint32 dependentJobCount = (uint32) list_length(dependentJobList); + if (dependentJobCount > 0) { - Job *dependedJob = (Job *) linitial(dependedJobList); - if (CitusIsA(dependedJob, MapMergeJob)) + Job *dependentJob = (Job *) linitial(dependentJobList); + if (CitusIsA(dependentJob, MapMergeJob)) { - MapMergeJob *mapMergeJob = (MapMergeJob *) dependedJob; + MapMergeJob *mapMergeJob = (MapMergeJob *) dependentJob; if (mapMergeJob->partitionType == DUAL_HASH_PARTITION_TYPE) { dependsOnHashPartitionJob = true; @@ -2923,7 +2925,7 @@ AdjustColumnOldAttributes(List *expressionList) */ static List * RangeTableFragmentsList(List *rangeTableList, List *whereClauseList, - List *dependedJobList) + List *dependentJobList) { List *rangeTableFragmentsList = NIL; uint32 rangeTableIndex = 0; @@ -2975,10 +2977,10 @@ RangeTableFragmentsList(List *rangeTableList, List *whereClauseList, List *mergeTaskFragmentList = NIL; ListCell *mergeTaskCell = NULL; - Job *dependedJob = JobForRangeTable(dependedJobList, rangeTableEntry); - Assert(CitusIsA(dependedJob, MapMergeJob)); + Job *dependentJob = JobForRangeTable(dependentJobList, rangeTableEntry); + Assert(CitusIsA(dependentJob, MapMergeJob)); - MapMergeJob *dependedMapMergeJob = (MapMergeJob *) dependedJob; + MapMergeJob *dependedMapMergeJob = (MapMergeJob *) dependentJob; List *mergeTaskList = dependedMapMergeJob->mergeTaskList; /* if there are no tasks for the depended job, just return NIL */ @@ -3328,7 +3330,7 @@ UpdateConstraint(Node *baseConstraint, ShardInterval *shardInterval) */ static List * FragmentCombinationList(List *rangeTableFragmentsList, Query *jobQuery, - List *dependedJobList) + List *dependentJobList) { List *fragmentCombinationList = NIL; List *fragmentCombinationQueue = NIL; @@ -3337,7 +3339,7 @@ FragmentCombinationList(List *rangeTableFragmentsList, Query *jobQuery, /* find a sequence that joins the range tables in the list */ JoinSequenceNode *joinSequenceArray = JoinSequenceArray(rangeTableFragmentsList, jobQuery, - dependedJobList); + dependentJobList); /* * We use breadth-first search with pruning to create fragment combinations. @@ -3432,7 +3434,7 @@ FragmentCombinationList(List *rangeTableFragmentsList, Query *jobQuery, * range table list and the id of a preceding table with which it is joined, if any. */ static JoinSequenceNode * -JoinSequenceArray(List *rangeTableFragmentsList, Query *jobQuery, List *dependedJobList) +JoinSequenceArray(List *rangeTableFragmentsList, Query *jobQuery, List *dependentJobList) { List *rangeTableList = jobQuery->rtable; uint32 rangeTableCount = (uint32) list_length(rangeTableList); @@ -3530,9 +3532,9 @@ JoinSequenceArray(List *rangeTableFragmentsList, Query *jobQuery, List *depended } bool leftPartitioned = PartitionedOnColumn(leftColumn, rangeTableList, - dependedJobList); + dependentJobList); bool rightPartitioned = PartitionedOnColumn(rightColumn, rangeTableList, - dependedJobList); + dependentJobList); if (leftPartitioned && rightPartitioned) { /* make sure this join clause references only simple columns */ @@ -3570,7 +3572,7 @@ JoinSequenceArray(List *rangeTableFragmentsList, Query *jobQuery, List *depended * relation is a reference table. */ static bool -PartitionedOnColumn(Var *column, List *rangeTableList, List *dependedJobList) +PartitionedOnColumn(Var *column, List *rangeTableList, List *dependentJobList) { bool partitionedOnColumn = false; Index rangeTableId = column->varno; @@ -3598,7 +3600,7 @@ PartitionedOnColumn(Var *column, List *rangeTableList, List *dependedJobList) } else if (rangeTableType == CITUS_RTE_REMOTE_QUERY) { - Job *job = JobForRangeTable(dependedJobList, rangeTableEntry); + Job *job = JobForRangeTable(dependentJobList, rangeTableEntry); MapMergeJob *mapMergeJob = (MapMergeJob *) job; /* @@ -3857,7 +3859,7 @@ DataFetchTaskList(uint64 jobId, uint32 taskIdIndex, List *fragmentList) /* create merge fetch task and have it depend on the merge task */ Task *mergeFetchTask = CreateBasicTask(jobId, taskIdIndex, MERGE_FETCH_TASK, undefinedQueryString); - mergeFetchTask->dependedTaskList = list_make1(mergeTask); + mergeFetchTask->dependentTaskList = list_make1(mergeTask); dataFetchTaskList = lappend(dataFetchTaskList, mergeFetchTask); taskIdIndex++; @@ -4089,13 +4091,13 @@ PruneSqlTaskDependencies(List *sqlTaskList) foreach(sqlTaskCell, sqlTaskList) { Task *sqlTask = (Task *) lfirst(sqlTaskCell); - List *dependedTaskList = sqlTask->dependedTaskList; - List *prunedDependedTaskList = NIL; + List *dependentTaskList = sqlTask->dependentTaskList; + List *prunedDependendTaskList = NIL; - ListCell *dependedTaskCell = NULL; - foreach(dependedTaskCell, dependedTaskList) + ListCell *dependentTaskCell = NULL; + foreach(dependentTaskCell, dependentTaskList) { - Task *dataFetchTask = (Task *) lfirst(dependedTaskCell); + Task *dataFetchTask = (Task *) lfirst(dependentTaskCell); /* * If we have a merge fetch task, our task assignment algorithm makes @@ -4104,12 +4106,12 @@ PruneSqlTaskDependencies(List *sqlTaskList) */ if (dataFetchTask->taskType == MERGE_FETCH_TASK) { - List *mergeFetchDependencyList = dataFetchTask->dependedTaskList; + List *mergeFetchDependencyList = dataFetchTask->dependentTaskList; Assert(list_length(mergeFetchDependencyList) == 1); Task *mergeTaskReference = (Task *) linitial(mergeFetchDependencyList); - prunedDependedTaskList = lappend(prunedDependedTaskList, - mergeTaskReference); + prunedDependendTaskList = lappend(prunedDependendTaskList, + mergeTaskReference); ereport(DEBUG2, (errmsg("pruning merge fetch taskId %d", dataFetchTask->taskId), @@ -4118,7 +4120,7 @@ PruneSqlTaskDependencies(List *sqlTaskList) } } - sqlTask->dependedTaskList = prunedDependedTaskList; + sqlTask->dependentTaskList = prunedDependendTaskList; } return sqlTaskList; @@ -4469,14 +4471,14 @@ MergeTaskList(MapMergeJob *mapMergeJob, List *mapTaskList, uint32 taskIdIndex) undefinedQueryString); mapOutputFetchTask->partitionId = partitionId; mapOutputFetchTask->upstreamTaskId = mergeTaskId; - mapOutputFetchTask->dependedTaskList = list_make1(mapTask); + mapOutputFetchTask->dependentTaskList = list_make1(mapTask); taskIdIndex++; mapOutputFetchTaskList = lappend(mapOutputFetchTaskList, mapOutputFetchTask); } /* merge task depends on completion of fetch tasks */ - mergeTask->dependedTaskList = mapOutputFetchTaskList; + mergeTask->dependentTaskList = mapOutputFetchTaskList; /* if single repartitioned, each merge task represents an interval */ if (mapMergeJob->partitionType == RANGE_PARTITION_TYPE) @@ -4710,13 +4712,13 @@ HasMergeTaskDependencies(List *sqlTaskList) { bool hasMergeTaskDependencies = false; Task *sqlTask = (Task *) linitial(sqlTaskList); - List *dependedTaskList = sqlTask->dependedTaskList; + List *dependentTaskList = sqlTask->dependentTaskList; - ListCell *dependedTaskCell = NULL; - foreach(dependedTaskCell, dependedTaskList) + ListCell *dependentTaskCell = NULL; + foreach(dependentTaskCell, dependentTaskList) { - Task *dependedTask = (Task *) lfirst(dependedTaskCell); - if (dependedTask->taskType == MERGE_TASK) + Task *dependentTask = (Task *) lfirst(dependentTaskCell); + if (dependentTask->taskType == MERGE_TASK) { hasMergeTaskDependencies = true; break; @@ -5291,15 +5293,15 @@ static List * FindDependedMergeTaskList(Task *sqlTask) { List *dependedMergeTaskList = NIL; - List *dependedTaskList = sqlTask->dependedTaskList; + List *dependentTaskList = sqlTask->dependentTaskList; - ListCell *dependedTaskCell = NULL; - foreach(dependedTaskCell, dependedTaskList) + ListCell *dependentTaskCell = NULL; + foreach(dependentTaskCell, dependentTaskList) { - Task *dependedTask = (Task *) lfirst(dependedTaskCell); - if (dependedTask->taskType == MERGE_TASK) + Task *dependentTask = (Task *) lfirst(dependentTaskCell); + if (dependentTask->taskType == MERGE_TASK) { - dependedMergeTaskList = lappend(dependedMergeTaskList, dependedTask); + dependedMergeTaskList = lappend(dependedMergeTaskList, dependentTask); } } @@ -5398,18 +5400,18 @@ AssignDataFetchDependencies(List *taskList) foreach(taskCell, taskList) { Task *task = (Task *) lfirst(taskCell); - List *dependedTaskList = task->dependedTaskList; - ListCell *dependedTaskCell = NULL; + List *dependentTaskList = task->dependentTaskList; + ListCell *dependentTaskCell = NULL; Assert(task->taskPlacementList != NIL); Assert(task->taskType == SQL_TASK || task->taskType == MERGE_TASK); - foreach(dependedTaskCell, dependedTaskList) + foreach(dependentTaskCell, dependentTaskList) { - Task *dependedTask = (Task *) lfirst(dependedTaskCell); - if (dependedTask->taskType == MAP_OUTPUT_FETCH_TASK) + Task *dependentTask = (Task *) lfirst(dependentTaskCell); + if (dependentTask->taskType == MAP_OUTPUT_FETCH_TASK) { - dependedTask->taskPlacementList = task->taskPlacementList; + dependentTask->taskPlacementList = task->taskPlacementList; } } } diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c index 753e52a5e..9095064b9 100644 --- a/src/backend/distributed/planner/multi_router_planner.c +++ b/src/backend/distributed/planner/multi_router_planner.c @@ -1453,7 +1453,7 @@ CreateJob(Query *query) job->jobId = UniqueJobId(); job->jobQuery = query; job->taskList = NIL; - job->dependedJobList = NIL; + job->dependentJobList = NIL; job->subqueryPushdown = false; job->requiresMasterEvaluation = false; job->deferredPruning = false; @@ -1575,7 +1575,7 @@ CreateTask(TaskType taskType) task->queryString = NULL; task->anchorShardId = INVALID_SHARD_ID; task->taskPlacementList = NIL; - task->dependedTaskList = NIL; + task->dependentTaskList = NIL; task->partitionId = 0; task->upstreamTaskId = INVALID_TASK_ID; diff --git a/src/backend/distributed/utils/citus_copyfuncs.c b/src/backend/distributed/utils/citus_copyfuncs.c index e8bd9f0ea..7594bf1af 100644 --- a/src/backend/distributed/utils/citus_copyfuncs.c +++ b/src/backend/distributed/utils/citus_copyfuncs.c @@ -80,7 +80,7 @@ copyJobInfo(Job *newnode, Job *from) COPY_SCALAR_FIELD(jobId); COPY_NODE_FIELD(jobQuery); COPY_NODE_FIELD(taskList); - COPY_NODE_FIELD(dependedJobList); + COPY_NODE_FIELD(dependentJobList); COPY_SCALAR_FIELD(subqueryPushdown); COPY_SCALAR_FIELD(requiresMasterEvaluation); COPY_SCALAR_FIELD(deferredPruning); @@ -251,7 +251,7 @@ CopyNodeTask(COPYFUNC_ARGS) COPY_STRING_FIELD(queryString); COPY_SCALAR_FIELD(anchorShardId); COPY_NODE_FIELD(taskPlacementList); - COPY_NODE_FIELD(dependedTaskList); + COPY_NODE_FIELD(dependentTaskList); COPY_SCALAR_FIELD(partitionId); COPY_SCALAR_FIELD(upstreamTaskId); COPY_NODE_FIELD(shardInterval); diff --git a/src/backend/distributed/utils/citus_outfuncs.c b/src/backend/distributed/utils/citus_outfuncs.c index 726e27753..fe5ebb535 100644 --- a/src/backend/distributed/utils/citus_outfuncs.c +++ b/src/backend/distributed/utils/citus_outfuncs.c @@ -323,7 +323,7 @@ OutJobFields(StringInfo str, const Job *node) WRITE_UINT64_FIELD(jobId); WRITE_NODE_FIELD(jobQuery); WRITE_NODE_FIELD(taskList); - WRITE_NODE_FIELD(dependedJobList); + WRITE_NODE_FIELD(dependentJobList); WRITE_BOOL_FIELD(subqueryPushdown); WRITE_BOOL_FIELD(requiresMasterEvaluation); WRITE_BOOL_FIELD(deferredPruning); @@ -467,7 +467,7 @@ OutTask(OUTFUNC_ARGS) WRITE_STRING_FIELD(queryString); WRITE_UINT64_FIELD(anchorShardId); WRITE_NODE_FIELD(taskPlacementList); - WRITE_NODE_FIELD(dependedTaskList); + WRITE_NODE_FIELD(dependentTaskList); WRITE_UINT_FIELD(partitionId); WRITE_UINT_FIELD(upstreamTaskId); WRITE_NODE_FIELD(shardInterval); diff --git a/src/backend/distributed/utils/citus_readfuncs.c b/src/backend/distributed/utils/citus_readfuncs.c index d3e9ab7dd..5c638a290 100644 --- a/src/backend/distributed/utils/citus_readfuncs.c +++ b/src/backend/distributed/utils/citus_readfuncs.c @@ -184,7 +184,7 @@ readJobInfo(Job *local_node) READ_UINT64_FIELD(jobId); READ_NODE_FIELD(jobQuery); READ_NODE_FIELD(taskList); - READ_NODE_FIELD(dependedJobList); + READ_NODE_FIELD(dependentJobList); READ_BOOL_FIELD(subqueryPushdown); READ_BOOL_FIELD(requiresMasterEvaluation); READ_BOOL_FIELD(deferredPruning); @@ -381,7 +381,7 @@ ReadTask(READFUNC_ARGS) READ_STRING_FIELD(queryString); READ_UINT64_FIELD(anchorShardId); READ_NODE_FIELD(taskPlacementList); - READ_NODE_FIELD(dependedTaskList); + READ_NODE_FIELD(dependentTaskList); READ_UINT_FIELD(partitionId); READ_UINT_FIELD(upstreamTaskId); READ_NODE_FIELD(shardInterval); diff --git a/src/include/distributed/multi_physical_planner.h b/src/include/distributed/multi_physical_planner.h index beb9a6320..6a2e7e763 100644 --- a/src/include/distributed/multi_physical_planner.h +++ b/src/include/distributed/multi_physical_planner.h @@ -132,7 +132,7 @@ typedef struct Job uint64 jobId; Query *jobQuery; List *taskList; - List *dependedJobList; + List *dependentJobList; bool subqueryPushdown; bool requiresMasterEvaluation; /* only applies to modify jobs */ bool deferredPruning; @@ -185,7 +185,7 @@ typedef struct Task char *queryString; uint64 anchorShardId; /* only applies to compute tasks */ List *taskPlacementList; /* only applies to compute tasks */ - List *dependedTaskList; /* only applies to compute tasks */ + List *dependentTaskList; /* only applies to compute tasks */ uint32 partitionId; uint32 upstreamTaskId; /* only applies to data fetch tasks */