WorkerGetLiveNodeCount (rename s/Node/Group/) only counts primaries

pull/1235/head
Brian Cloutier 2017-02-17 14:40:51 +03:00
parent e39ac1821f
commit 8f94bd3d14
5 changed files with 21 additions and 8 deletions

View File

@ -398,7 +398,7 @@ master_get_local_first_candidate_nodes(PG_FUNCTION_ARGS)
functionContext->max_calls = ShardReplicationFactor; functionContext->max_calls = ShardReplicationFactor;
/* if enough live nodes, return an extra candidate node as backup */ /* if enough live nodes, return an extra candidate node as backup */
liveNodeCount = WorkerGetLiveNodeCount(); liveNodeCount = WorkerGetLiveGroupCount();
if (liveNodeCount > ShardReplicationFactor) if (liveNodeCount > ShardReplicationFactor)
{ {
functionContext->max_calls = ShardReplicationFactor + 1; functionContext->max_calls = ShardReplicationFactor + 1;

View File

@ -140,7 +140,7 @@ master_create_empty_shard(PG_FUNCTION_ARGS)
/* if enough live nodes, add an extra candidate node as backup */ /* if enough live nodes, add an extra candidate node as backup */
attemptableNodeCount = ShardReplicationFactor; attemptableNodeCount = ShardReplicationFactor;
liveNodeCount = WorkerGetLiveNodeCount(); liveNodeCount = WorkerGetLiveGroupCount();
if (liveNodeCount > ShardReplicationFactor) if (liveNodeCount > ShardReplicationFactor)
{ {
attemptableNodeCount = ShardReplicationFactor + 1; attemptableNodeCount = ShardReplicationFactor + 1;

View File

@ -299,13 +299,26 @@ WorkerGetNodeWithName(const char *hostname)
/* /*
* WorkerGetLiveNodeCount returns the number of live nodes in the cluster. * WorkerGetLiveGroupCount returns the number of groups which have a primary capable of
* */ * accepting writes.
*/
uint32 uint32
WorkerGetLiveNodeCount(void) WorkerGetLiveGroupCount(void)
{ {
HTAB *workerNodeHash = GetWorkerNodeHash(); HTAB *workerNodeHash = GetWorkerNodeHash();
uint32 liveWorkerCount = hash_get_num_entries(workerNodeHash); uint32 liveWorkerCount = 0;
HASH_SEQ_STATUS status;
WorkerNode *workerNode = NULL;
hash_seq_init(&status, workerNodeHash);
while ((workerNode = hash_seq_search(&status)) != NULL)
{
if (workerNode->nodeRole == NODE_ROLE_PRIMARY)
{
liveWorkerCount++;
}
}
return liveWorkerCount; return liveWorkerCount;
} }

View File

@ -1834,7 +1834,7 @@ BuildMapMergeJob(Query *jobQuery, List *dependedJobList, Var *partitionKey,
static uint32 static uint32
HashPartitionCount(void) HashPartitionCount(void)
{ {
uint32 nodeCount = WorkerGetLiveNodeCount(); uint32 nodeCount = WorkerGetLiveGroupCount();
double maxReduceTasksPerNode = MaxRunningTasksPerNode / 2.0; double maxReduceTasksPerNode = MaxRunningTasksPerNode / 2.0;
uint32 partitionCount = (uint32) rint(nodeCount * maxReduceTasksPerNode); uint32 partitionCount = (uint32) rint(nodeCount * maxReduceTasksPerNode);

View File

@ -58,7 +58,7 @@ extern WorkerNode * WorkerGetRoundRobinCandidateNode(List *workerNodeList,
uint64 shardId, uint64 shardId,
uint32 placementIndex); uint32 placementIndex);
extern WorkerNode * WorkerGetLocalFirstCandidateNode(List *currentNodeList); extern WorkerNode * WorkerGetLocalFirstCandidateNode(List *currentNodeList);
extern uint32 WorkerGetLiveNodeCount(void); extern uint32 WorkerGetLiveGroupCount(void);
extern List * WorkerNodeList(void); extern List * WorkerNodeList(void);
extern WorkerNode * FindWorkerNode(char *nodeName, int32 nodePort); extern WorkerNode * FindWorkerNode(char *nodeName, int32 nodePort);
extern List * ReadWorkerNodes(void); extern List * ReadWorkerNodes(void);