WorkerGetRandomCandidateNode only returns primary nodes

pull/1235/head
Brian Cloutier 2017-02-16 19:09:15 +03:00
parent b081fb0efa
commit 8260fcddbd
1 changed files with 44 additions and 63 deletions

View File

@ -38,11 +38,12 @@ int MaxWorkerNodesTracked = 2048; /* determines worker node hash table size *
/* Local functions forward declarations */ /* Local functions forward declarations */
static char * ClientHostAddress(StringInfo remoteHostStringInfo); static char * ClientHostAddress(StringInfo remoteHostStringInfo);
static WorkerNode * FindRandomNodeNotInList(HTAB *WorkerNodesHash, static WorkerNode * FindRandomNodeFromList(List *workerNodeList);
List *currentNodeList);
static bool OddNumber(uint32 number); static bool OddNumber(uint32 number);
static bool ListMember(List *currentList, WorkerNode *workerNode); static bool ListMember(List *currentList, WorkerNode *workerNode);
static List * PrimaryNodesNotInList(List *currentList);
/* ------------------------------------------------------------ /* ------------------------------------------------------------
* Worker node selection functions follow * Worker node selection functions follow
@ -65,16 +66,15 @@ WorkerGetRandomCandidateNode(List *currentNodeList)
uint32 tryCount = WORKER_RACK_TRIES; uint32 tryCount = WORKER_RACK_TRIES;
uint32 tryIndex = 0; uint32 tryIndex = 0;
HTAB *workerNodeHash = GetWorkerNodeHash(); uint32 currentNodeCount = list_length(currentNodeList);
List *candidateWorkerNodeList = PrimaryNodesNotInList(currentNodeList);
/* /*
* We check if the shard has already been placed on all nodes known to us. * We check if the shard has already been placed on all nodes known to us.
* This check is rather defensive, and has the drawback of performing a full * This check is rather defensive, and has the drawback of performing a full
* scan over the worker node hash for determining the number of live nodes. * scan over the worker node hash.
*/ */
uint32 currentNodeCount = list_length(currentNodeList); if (list_length(candidateWorkerNodeList) == 0)
uint32 liveNodeCount = WorkerGetLiveNodeCount();
if (currentNodeCount >= liveNodeCount)
{ {
return NULL; return NULL;
} }
@ -82,7 +82,7 @@ WorkerGetRandomCandidateNode(List *currentNodeList)
/* if current node list is empty, randomly pick one node and return */ /* if current node list is empty, randomly pick one node and return */
if (currentNodeCount == 0) if (currentNodeCount == 0)
{ {
workerNode = FindRandomNodeNotInList(workerNodeHash, NIL); workerNode = FindRandomNodeFromList(candidateWorkerNodeList);
return workerNode; return workerNode;
} }
@ -112,7 +112,7 @@ WorkerGetRandomCandidateNode(List *currentNodeList)
char *workerRack = NULL; char *workerRack = NULL;
bool sameRack = false; bool sameRack = false;
workerNode = FindRandomNodeNotInList(workerNodeHash, currentNodeList); workerNode = FindRandomNodeFromList(candidateWorkerNodeList);
workerRack = workerNode->workerRack; workerRack = workerNode->workerRack;
sameRack = (strncmp(workerRack, firstRack, WORKER_LENGTH) == 0); sameRack = (strncmp(workerRack, firstRack, WORKER_LENGTH) == 0);
@ -328,13 +328,14 @@ WorkerNodeList(void)
while ((workerNode = hash_seq_search(&status)) != NULL) while ((workerNode = hash_seq_search(&status)) != NULL)
{ {
WorkerNode *workerNodeCopy = palloc0(sizeof(WorkerNode)); WorkerNode *workerNodeCopy;
if (workerNode->nodeRole != NODE_ROLE_PRIMARY) if (workerNode->nodeRole != NODE_ROLE_PRIMARY)
{ {
continue; continue;
} }
workerNodeCopy = palloc0(sizeof(WorkerNode));
memcpy(workerNodeCopy, workerNode, sizeof(WorkerNode)); memcpy(workerNodeCopy, workerNode, sizeof(WorkerNode));
workerNodeList = lappend(workerNodeList, workerNodeCopy); workerNodeList = lappend(workerNodeList, workerNodeCopy);
} }
@ -344,70 +345,50 @@ WorkerNodeList(void)
/* /*
* FindRandomNodeNotInList finds a random node from the shared hash that is not * WorkerNodesNotInList scans through the worker node hash and returns a list
* a member of the current node list. The caller is responsible for making the * of all primary nodes which are not in currentList.
* necessary node count checks to ensure that such a node exists.
*
* Note that this function has a selection bias towards nodes whose positions in
* the shared hash are sequentially adjacent to the positions of nodes that are
* in the current node list. This bias follows from our decision to first pick a
* random node in the hash, and if that node is a member of the current list, to
* simply iterate to the next node in the hash. Overall, this approach trades in
* some selection bias for simplicity in design and for bounded execution time.
*/ */
static WorkerNode * static List *
FindRandomNodeNotInList(HTAB *WorkerNodesHash, List *currentNodeList) PrimaryNodesNotInList(List *currentList)
{ {
List *workerNodeList = NIL;
HTAB *workerNodeHash = GetWorkerNodeHash();
WorkerNode *workerNode = NULL; WorkerNode *workerNode = NULL;
HASH_SEQ_STATUS status; HASH_SEQ_STATUS status;
uint32 workerNodeCount = 0;
uint32 currentNodeCount PG_USED_FOR_ASSERTS_ONLY = 0;
bool lookForWorkerNode = true;
uint32 workerPosition = 0;
uint32 workerIndex = 0;
workerNodeCount = hash_get_num_entries(WorkerNodesHash); hash_seq_init(&status, workerNodeHash);
currentNodeCount = list_length(currentNodeList);
Assert(workerNodeCount > currentNodeCount);
/* /* this is O(n*m) but there usually aren't many nodes in currentList */
* We determine a random position within the worker hash between [1, N], while ((workerNode = hash_seq_search(&status)) != NULL)
* assuming that the number of elements in the hash is N. We then get to
* this random position by iterating over the worker hash. Please note that
* the random seed has already been set by the postmaster when starting up.
*/
workerPosition = (random() % workerNodeCount) + 1;
hash_seq_init(&status, WorkerNodesHash);
for (workerIndex = 0; workerIndex < workerPosition; workerIndex++)
{ {
workerNode = (WorkerNode *) hash_seq_search(&status); WorkerNode *workerNodeCopy;
if ((workerNode->nodeRole != NODE_ROLE_PRIMARY) ||
ListMember(currentList, workerNode))
{
continue;
} }
while (lookForWorkerNode) workerNodeCopy = palloc0(sizeof(WorkerNode));
{ memcpy(workerNodeCopy, workerNode, sizeof(WorkerNode));
bool listMember = ListMember(currentNodeList, workerNode); workerNodeList = lappend(workerNodeList, workerNodeCopy);
if (!listMember)
{
lookForWorkerNode = false;
}
else
{
/* iterate to the next worker node in the hash */
workerNode = (WorkerNode *) hash_seq_search(&status);
/* reached end of hash; start from the beginning */
if (workerNode == NULL)
{
hash_seq_init(&status, WorkerNodesHash);
workerNode = (WorkerNode *) hash_seq_search(&status);
}
}
} }
/* we stopped scanning before completion; therefore clean up scan */ return workerNodeList;
hash_seq_term(&status); }
/* FindRandomNodeFromList picks a random node from the list provided to it */
static WorkerNode *
FindRandomNodeFromList(List *candidateWorkerNodeList)
{
uint32 candidateNodeCount = list_length(candidateWorkerNodeList);
/* nb, the random seed has already been set by the postmaster when starting up. */
uint32 workerPosition = (random() % candidateNodeCount);
WorkerNode *workerNode = (WorkerNode *) list_nth(candidateWorkerNodeList,
workerPosition);
return workerNode; return workerNode;
} }