mirror of https://github.com/citusdata/citus.git
Replace workerNodeCount -> nodeCount
parent
eb5be579e3
commit
ff82e85ea2
|
@ -158,6 +158,7 @@ distributed_planner(Query *parse,
|
||||||
}
|
}
|
||||||
|
|
||||||
int rteIdCounter = 1;
|
int rteIdCounter = 1;
|
||||||
|
|
||||||
DistributedPlanningContext planContext = {
|
DistributedPlanningContext planContext = {
|
||||||
.query = parse,
|
.query = parse,
|
||||||
.cursorOptions = cursorOptions,
|
.cursorOptions = cursorOptions,
|
||||||
|
|
|
@ -32,7 +32,7 @@ bool LogIntermediateResults = false;
|
||||||
static List * FindSubPlansUsedInNode(Node *node, SubPlanAccessType accessType);
|
static List * FindSubPlansUsedInNode(Node *node, SubPlanAccessType accessType);
|
||||||
static void AppendAllAccessedWorkerNodes(IntermediateResultsHashEntry *entry,
|
static void AppendAllAccessedWorkerNodes(IntermediateResultsHashEntry *entry,
|
||||||
DistributedPlan *distributedPlan,
|
DistributedPlan *distributedPlan,
|
||||||
int workerNodeCount);
|
int nodeCount);
|
||||||
static void AppendAllWorkerNodes(IntermediateResultsHashEntry *entry);
|
static void AppendAllWorkerNodes(IntermediateResultsHashEntry *entry);
|
||||||
static List * FindAllRemoteWorkerNodesUsingSubplan(IntermediateResultsHashEntry *entry);
|
static List * FindAllRemoteWorkerNodesUsingSubplan(IntermediateResultsHashEntry *entry);
|
||||||
static List * RemoveLocalNodeFromWorkerList(List *workerNodeList);
|
static List * RemoveLocalNodeFromWorkerList(List *workerNodeList);
|
||||||
|
@ -154,7 +154,7 @@ RecordSubplanExecutionsOnNodes(HTAB *intermediateResultsHash,
|
||||||
List *usedSubPlanNodeList = distributedPlan->usedSubPlanNodeList;
|
List *usedSubPlanNodeList = distributedPlan->usedSubPlanNodeList;
|
||||||
List *subPlanList = distributedPlan->subPlanList;
|
List *subPlanList = distributedPlan->subPlanList;
|
||||||
ListCell *subPlanCell = NULL;
|
ListCell *subPlanCell = NULL;
|
||||||
int workerNodeCount = list_length(ActiveReadableNodeList());
|
int nodeCount = list_length(ActiveReadableNodeList());
|
||||||
|
|
||||||
foreach(subPlanCell, usedSubPlanNodeList)
|
foreach(subPlanCell, usedSubPlanNodeList)
|
||||||
{
|
{
|
||||||
|
@ -170,7 +170,7 @@ RecordSubplanExecutionsOnNodes(HTAB *intermediateResultsHash,
|
||||||
* will be written to a local file and sent to all nodes. Note that the
|
* will be written to a local file and sent to all nodes. Note that the
|
||||||
* remaining subplans in the distributed plan should still be traversed.
|
* remaining subplans in the distributed plan should still be traversed.
|
||||||
*/
|
*/
|
||||||
if (list_length(entry->nodeIdList) == workerNodeCount && entry->writeLocalFile)
|
if (list_length(entry->nodeIdList) == nodeCount && entry->writeLocalFile)
|
||||||
{
|
{
|
||||||
elog(DEBUG4, "Subplan %s is used in all workers", resultId);
|
elog(DEBUG4, "Subplan %s is used in all workers", resultId);
|
||||||
continue;
|
continue;
|
||||||
|
@ -190,7 +190,7 @@ RecordSubplanExecutionsOnNodes(HTAB *intermediateResultsHash,
|
||||||
* workers will be in the node list. We can improve intermediate result
|
* workers will be in the node list. We can improve intermediate result
|
||||||
* pruning by deciding which reference table shard will be accessed earlier.
|
* pruning by deciding which reference table shard will be accessed earlier.
|
||||||
*/
|
*/
|
||||||
AppendAllAccessedWorkerNodes(entry, distributedPlan, workerNodeCount);
|
AppendAllAccessedWorkerNodes(entry, distributedPlan, nodeCount);
|
||||||
|
|
||||||
elog(DEBUG4, "Subplan %s is used in %lu", resultId, distributedPlan->planId);
|
elog(DEBUG4, "Subplan %s is used in %lu", resultId, distributedPlan->planId);
|
||||||
}
|
}
|
||||||
|
@ -231,7 +231,7 @@ RecordSubplanExecutionsOnNodes(HTAB *intermediateResultsHash,
|
||||||
static void
|
static void
|
||||||
AppendAllAccessedWorkerNodes(IntermediateResultsHashEntry *entry,
|
AppendAllAccessedWorkerNodes(IntermediateResultsHashEntry *entry,
|
||||||
DistributedPlan *distributedPlan,
|
DistributedPlan *distributedPlan,
|
||||||
int workerNodeCount)
|
int nodeCount)
|
||||||
{
|
{
|
||||||
List *taskList = distributedPlan->workerJob->taskList;
|
List *taskList = distributedPlan->workerJob->taskList;
|
||||||
ListCell *taskCell = NULL;
|
ListCell *taskCell = NULL;
|
||||||
|
@ -254,7 +254,7 @@ AppendAllAccessedWorkerNodes(IntermediateResultsHashEntry *entry,
|
||||||
list_append_unique_int(entry->nodeIdList, placement->nodeId);
|
list_append_unique_int(entry->nodeIdList, placement->nodeId);
|
||||||
|
|
||||||
/* early return if all the workers are accessed */
|
/* early return if all the workers are accessed */
|
||||||
if (list_length(entry->nodeIdList) == workerNodeCount &&
|
if (list_length(entry->nodeIdList) == nodeCount &&
|
||||||
entry->writeLocalFile)
|
entry->writeLocalFile)
|
||||||
{
|
{
|
||||||
return;
|
return;
|
||||||
|
|
Loading…
Reference in New Issue