From 8f94bd3d14e112f84b0ad581a33b4c82b7669a6a Mon Sep 17 00:00:00 2001 From: Brian Cloutier Date: Fri, 17 Feb 2017 14:40:51 +0300 Subject: [PATCH] WorkerGetLiveNodeCount (rename s/Node/Group/) only counts primaries --- .../distributed/master/master_node_protocol.c | 2 +- .../master/master_stage_protocol.c | 2 +- .../distributed/master/worker_node_manager.c | 21 +++++++++++++++---- .../planner/multi_physical_planner.c | 2 +- src/include/distributed/worker_manager.h | 2 +- 5 files changed, 21 insertions(+), 8 deletions(-) diff --git a/src/backend/distributed/master/master_node_protocol.c b/src/backend/distributed/master/master_node_protocol.c index 1b3623e70..020a35193 100644 --- a/src/backend/distributed/master/master_node_protocol.c +++ b/src/backend/distributed/master/master_node_protocol.c @@ -398,7 +398,7 @@ master_get_local_first_candidate_nodes(PG_FUNCTION_ARGS) functionContext->max_calls = ShardReplicationFactor; /* if enough live nodes, return an extra candidate node as backup */ - liveNodeCount = WorkerGetLiveNodeCount(); + liveNodeCount = WorkerGetLiveGroupCount(); if (liveNodeCount > ShardReplicationFactor) { functionContext->max_calls = ShardReplicationFactor + 1; diff --git a/src/backend/distributed/master/master_stage_protocol.c b/src/backend/distributed/master/master_stage_protocol.c index 5deb6832f..6c07a76e1 100644 --- a/src/backend/distributed/master/master_stage_protocol.c +++ b/src/backend/distributed/master/master_stage_protocol.c @@ -140,7 +140,7 @@ master_create_empty_shard(PG_FUNCTION_ARGS) /* if enough live nodes, add an extra candidate node as backup */ attemptableNodeCount = ShardReplicationFactor; - liveNodeCount = WorkerGetLiveNodeCount(); + liveNodeCount = WorkerGetLiveGroupCount(); if (liveNodeCount > ShardReplicationFactor) { attemptableNodeCount = ShardReplicationFactor + 1; diff --git a/src/backend/distributed/master/worker_node_manager.c b/src/backend/distributed/master/worker_node_manager.c index 42f0a38a8..c4294a07a 100644 --- a/src/backend/distributed/master/worker_node_manager.c +++ b/src/backend/distributed/master/worker_node_manager.c @@ -299,13 +299,26 @@ WorkerGetNodeWithName(const char *hostname) /* - * WorkerGetLiveNodeCount returns the number of live nodes in the cluster. - * */ + * WorkerGetLiveGroupCount returns the number of groups which have a primary capable of + * accepting writes. + */ uint32 -WorkerGetLiveNodeCount(void) +WorkerGetLiveGroupCount(void) { HTAB *workerNodeHash = GetWorkerNodeHash(); - uint32 liveWorkerCount = hash_get_num_entries(workerNodeHash); + uint32 liveWorkerCount = 0; + HASH_SEQ_STATUS status; + WorkerNode *workerNode = NULL; + + hash_seq_init(&status, workerNodeHash); + + while ((workerNode = hash_seq_search(&status)) != NULL) + { + if (workerNode->nodeRole == NODE_ROLE_PRIMARY) + { + liveWorkerCount++; + } + } return liveWorkerCount; } diff --git a/src/backend/distributed/planner/multi_physical_planner.c b/src/backend/distributed/planner/multi_physical_planner.c index 22ee7aa60..d6340172f 100644 --- a/src/backend/distributed/planner/multi_physical_planner.c +++ b/src/backend/distributed/planner/multi_physical_planner.c @@ -1834,7 +1834,7 @@ BuildMapMergeJob(Query *jobQuery, List *dependedJobList, Var *partitionKey, static uint32 HashPartitionCount(void) { - uint32 nodeCount = WorkerGetLiveNodeCount(); + uint32 nodeCount = WorkerGetLiveGroupCount(); double maxReduceTasksPerNode = MaxRunningTasksPerNode / 2.0; uint32 partitionCount = (uint32) rint(nodeCount * maxReduceTasksPerNode); diff --git a/src/include/distributed/worker_manager.h b/src/include/distributed/worker_manager.h index 2d1adf25e..122a3d22d 100644 --- a/src/include/distributed/worker_manager.h +++ b/src/include/distributed/worker_manager.h @@ -58,7 +58,7 @@ extern WorkerNode * WorkerGetRoundRobinCandidateNode(List *workerNodeList, uint64 shardId, uint32 placementIndex); extern WorkerNode * WorkerGetLocalFirstCandidateNode(List *currentNodeList); -extern uint32 WorkerGetLiveNodeCount(void); +extern uint32 WorkerGetLiveGroupCount(void); extern List * WorkerNodeList(void); extern WorkerNode * FindWorkerNode(char *nodeName, int32 nodePort); extern List * ReadWorkerNodes(void);