Change the workernode cache from a hash to a list

pull/801/head
Brian Cloutier 2016-09-23 20:39:12 +03:00
parent cd6a1b8d58
commit fe636f58cd
9 changed files with 147 additions and 195 deletions

View File

@ -36,7 +36,7 @@ int MaxWorkerNodesTracked = 2048; /* determines worker node hash table size *
/* Local functions forward declarations */ /* Local functions forward declarations */
static char * ClientHostAddress(StringInfo remoteHostStringInfo); static char * ClientHostAddress(StringInfo remoteHostStringInfo);
static WorkerNode * FindRandomNodeNotInList(HTAB *WorkerNodesHash, static WorkerNode * FindRandomNodeNotInList(List *workerNodesList,
List *currentNodeList); List *currentNodeList);
static bool ListMember(List *currentList, WorkerNode *workerNode); static bool ListMember(List *currentList, WorkerNode *workerNode);
static int WorkerNodeCompare(const void *lhsKey, const void *rhsKey, Size keySize); static int WorkerNodeCompare(const void *lhsKey, const void *rhsKey, Size keySize);
@ -60,22 +60,26 @@ WorkerNode *
WorkerGetRandomCandidateNode(List *currentNodeList) WorkerGetRandomCandidateNode(List *currentNodeList)
{ {
WorkerNode *workerNode = NULL; WorkerNode *workerNode = NULL;
HTAB *workerNodeHash = GetWorkerNodeHash(); WorkerNode *resultWorkerNode = NULL;
List *workerNodeList = GetWorkerNodeList();
/* /*
* We check if the shard has already been placed on all nodes known to us. * We check if the shard has already been placed on all nodes known to us.
* This check is rather defensive, and has the drawback of performing a full
* scan over the worker node hash for determining the number of live nodes.
*/ */
uint32 currentNodeCount = list_length(currentNodeList); uint32 currentNodeCount = list_length(currentNodeList);
uint32 liveNodeCount = WorkerGetLiveNodeCount(); uint32 liveNodeCount = list_length(workerNodeList);
if (currentNodeCount >= liveNodeCount) if (currentNodeCount >= liveNodeCount)
{ {
return NULL; return NULL;
} }
workerNode = FindRandomNodeNotInList(workerNodeHash, currentNodeList); workerNode = FindRandomNodeNotInList(workerNodeList, currentNodeList);
return workerNode;
resultWorkerNode = palloc(sizeof(WorkerNode));
memcpy(resultWorkerNode, workerNode, sizeof(WorkerNode));
list_free_deep(workerNodeList);
return resultWorkerNode;
} }
@ -229,24 +233,25 @@ ClientHostAddress(StringInfo clientHostStringInfo)
WorkerNode * WorkerNode *
WorkerGetNodeWithName(const char *hostname) WorkerGetNodeWithName(const char *hostname)
{ {
WorkerNode *workerNode = NULL; WorkerNode *resultWorkerNode = NULL;
HASH_SEQ_STATUS status; ListCell *workerNodeCell = NULL;
HTAB *workerNodeHash = GetWorkerNodeHash(); List *workerNodeList = GetWorkerNodeList();
hash_seq_init(&status, workerNodeHash); foreach(workerNodeCell, workerNodeList)
while ((workerNode = hash_seq_search(&status)) != NULL)
{ {
WorkerNode *workerNode = (WorkerNode *) lfirst(workerNodeCell);
int nameCompare = strncmp(workerNode->workerName, hostname, WORKER_LENGTH); int nameCompare = strncmp(workerNode->workerName, hostname, WORKER_LENGTH);
if (nameCompare == 0) if (nameCompare == 0)
{ {
/* we need to terminate the scan since we break */ resultWorkerNode = palloc(sizeof(WorkerNode));
hash_seq_term(&status); memcpy(resultWorkerNode, workerNode, sizeof(WorkerNode));
break; break;
} }
} }
return workerNode; list_free_deep(workerNodeList);
return resultWorkerNode;
} }
@ -254,41 +259,29 @@ WorkerGetNodeWithName(const char *hostname)
uint32 uint32
WorkerGetLiveNodeCount(void) WorkerGetLiveNodeCount(void)
{ {
WorkerNode *workerNode = NULL;
uint32 liveWorkerCount = 0; uint32 liveWorkerCount = 0;
HASH_SEQ_STATUS status; List *workerNodeList = GetWorkerNodeList();
HTAB *workerNodeHash = GetWorkerNodeHash();
hash_seq_init(&status, workerNodeHash); /* todo: this is such a waste of resources. we create a whole new list just to count
* it and immediately free it */
while ((workerNode = hash_seq_search(&status)) != NULL) liveWorkerCount = list_length(workerNodeList);
{
liveWorkerCount++;
}
list_free_deep(workerNodeList);
return liveWorkerCount; return liveWorkerCount;
} }
/* /*
* WorkerNodeList iterates over the hash table that includes the worker nodes, and adds * WorkerNodeList returns a list of worker nodes, don't forget to free it after you've
* them to a list which is returned. * used it!
*
* todo: add free to all call sites?
*/ */
List * List *
WorkerNodeList(void) WorkerNodeList(void)
{ {
List *workerNodeList = NIL; List *workerNodeList = GetWorkerNodeList();
WorkerNode *workerNode = NULL;
HTAB *workerNodeHash = GetWorkerNodeHash();
HASH_SEQ_STATUS status;
hash_seq_init(&status, workerNodeHash);
while ((workerNode = hash_seq_search(&status)) != NULL)
{
workerNodeList = lappend(workerNodeList, workerNode);
}
return workerNodeList; return workerNodeList;
} }
@ -301,24 +294,24 @@ WorkerNodeList(void)
bool bool
WorkerNodeActive(const char *nodeName, uint32 nodePort) WorkerNodeActive(const char *nodeName, uint32 nodePort)
{ {
WorkerNode *workerNode = NULL; bool nodeActive = false;
HASH_SEQ_STATUS status; ListCell *workerNodeCell = NULL;
HTAB *workerNodeHash = GetWorkerNodeHash(); List *workerNodeList = GetWorkerNodeList();
hash_seq_init(&status, workerNodeHash); foreach(workerNodeCell, workerNodeList)
while ((workerNode = hash_seq_search(&status)) != NULL)
{ {
WorkerNode *workerNode = (WorkerNode *) lfirst(workerNodeCell);
if (strncmp(workerNode->workerName, nodeName, WORKER_LENGTH) == 0 && if (strncmp(workerNode->workerName, nodeName, WORKER_LENGTH) == 0 &&
workerNode->workerPort == nodePort) workerNode->workerPort == nodePort)
{ {
/* we need to terminate the scan since we break */ nodeActive = true;
hash_seq_term(&status);
break; break;
} }
} }
return workerNode != NULL; list_free_deep(workerNodeList);
return nodeActive;
} }
@ -327,68 +320,47 @@ WorkerNodeActive(const char *nodeName, uint32 nodePort)
* a member of the current node list. The caller is responsible for making the * a member of the current node list. The caller is responsible for making the
* necessary node count checks to ensure that such a node exists. * necessary node count checks to ensure that such a node exists.
* *
* Note that this function has a selection bias towards nodes whose positions in * This function is O(workerNodesList * currentNodeList). This is unfortunate but was
* the shared hash are sequentially adjacent to the positions of nodes that are * easy to implement.
* in the current node list. This bias follows from our decision to first pick a
* random node in the hash, and if that node is a member of the current list, to
* simply iterate to the next node in the hash. Overall, this approach trades in
* some selection bias for simplicity in design and for bounded execution time.
*/ */
static WorkerNode * static WorkerNode *
FindRandomNodeNotInList(HTAB *WorkerNodesHash, List *currentNodeList) FindRandomNodeNotInList(List *workerNodesList, List *currentNodeList)
{ {
WorkerNode *workerNode = NULL; ListCell *workerNodeCell;
HASH_SEQ_STATUS status;
uint32 workerNodeCount = 0;
uint32 currentNodeCount PG_USED_FOR_ASSERTS_ONLY = 0;
bool lookForWorkerNode = true;
uint32 workerPosition = 0; uint32 workerPosition = 0;
uint32 workerIndex = 0; uint32 workerIndex = 0;
workerNodeCount = hash_get_num_entries(WorkerNodesHash); uint32 workerNodeCount = list_length(workerNodesList);
currentNodeCount = list_length(currentNodeList); uint32 currentNodeCount = list_length(currentNodeList);
Assert(workerNodeCount > currentNodeCount); Assert(workerNodeCount > currentNodeCount);
/* /*
* We determine a random position within the worker hash between [1, N], * Please note that the random seed has already been set by the postmaster when
* assuming that the number of elements in the hash is N. We then get to * starting up.
* this random position by iterating over the worker hash. Please note that
* the random seed has already been set by the postmaster when starting up.
*/ */
workerPosition = (random() % workerNodeCount) + 1; workerPosition = random() % (workerNodeCount - currentNodeCount);
hash_seq_init(&status, WorkerNodesHash);
for (workerIndex = 0; workerIndex < workerPosition; workerIndex++) foreach(workerNodeCell, workerNodesList)
{
workerNode = (WorkerNode *) hash_seq_search(&status);
}
while (lookForWorkerNode)
{ {
WorkerNode *workerNode = (WorkerNode *) lfirst(workerNodeCell);
bool listMember = ListMember(currentNodeList, workerNode); bool listMember = ListMember(currentNodeList, workerNode);
if (!listMember) if (listMember)
{ {
lookForWorkerNode = false; continue;
} }
else
{
/* iterate to the next worker node in the hash */
workerNode = (WorkerNode *) hash_seq_search(&status);
/* reached end of hash; start from the beginning */ if (workerIndex == workerPosition)
if (workerNode == NULL) {
{ return workerNode;
hash_seq_init(&status, WorkerNodesHash);
workerNode = (WorkerNode *) hash_seq_search(&status);
}
} }
workerIndex++;
} }
/* we stopped scanning before completion; therefore clean up scan */ return NULL;
hash_seq_term(&status);
return workerNode;
} }

View File

@ -62,8 +62,8 @@ static Oid extraDataContainerFuncId = InvalidOid;
/* Hash table for informations about each partition */ /* Hash table for informations about each partition */
static HTAB *DistTableCacheHash = NULL; static HTAB *DistTableCacheHash = NULL;
/* Hash table for informations about worker nodes */ /* cached list of active workers */
static HTAB *WorkerNodeHash = NULL; static List *CachedNodeList = NULL;
/* built first time through in InitializePartitionCache */ /* built first time through in InitializePartitionCache */
static ScanKeyData DistPartitionScanKey[1]; static ScanKeyData DistPartitionScanKey[1];
@ -967,19 +967,33 @@ InitializeDistTableCache(void)
/* /*
* GetWorkerNodeHash is a wrapper around InitializeWorkerNodeCache(). It * GetWorkerNodeList is a wrapper around InitializeWorkerNodeCache(). It
* triggers InitializeWorkerNodeCache when the workerHash is NULL. Otherwise, * triggers InitializeWorkerNodeCache when the workerHash is NULL. Otherwise,
* it returns the hash. * it returns the hash.
*/ */
HTAB * List *
GetWorkerNodeHash(void) GetWorkerNodeList(void)
{ {
if (WorkerNodeHash == NULL) List *resultList = NULL;
ListCell *workerNodeCell;
if (CachedNodeList == NULL)
{ {
InitializeWorkerNodeCache(); InitializeWorkerNodeCache();
} }
return WorkerNodeHash; /* deep copy the list so invalidations, which free the list, don't cause crashes */
foreach(workerNodeCell, CachedNodeList)
{
WorkerNode *oldNode = (WorkerNode *) lfirst(workerNodeCell);
WorkerNode *newNode = palloc(sizeof(WorkerNode));
memcpy(newNode, oldNode, sizeof(WorkerNode));
resultList = lappend(resultList, newNode);
}
return resultList;
} }
@ -991,12 +1005,10 @@ GetWorkerNodeHash(void)
static void static void
InitializeWorkerNodeCache(void) InitializeWorkerNodeCache(void)
{ {
MemoryContext oldContext;
static bool invalidationRegistered = false; static bool invalidationRegistered = false;
List *workerNodeList = NIL; List *workerList = NULL;
ListCell *workerNodeCell = NULL; ListCell *workerListCell;
HASHCTL info;
int hashFlags = 0;
long maxTableSize = (long) MaxWorkerNodesTracked;
/* make sure we've initialized CacheMemoryContext */ /* make sure we've initialized CacheMemoryContext */
if (CacheMemoryContext == NULL) if (CacheMemoryContext == NULL)
@ -1004,58 +1016,24 @@ InitializeWorkerNodeCache(void)
CreateCacheMemoryContext(); CreateCacheMemoryContext();
} }
/* Assert(CachedNodeList == NULL);
* Create the hash that holds the worker nodes. The key is the unique nodeid
* field.
*/
memset(&info, 0, sizeof(info));
info.keysize = sizeof(uint32);
info.entrysize = sizeof(WorkerNode);
info.hcxt = CacheMemoryContext;
info.hash = tag_hash;
hashFlags = HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT;
WorkerNodeHash = hash_create("Worker Node Hash",
maxTableSize,
&info, hashFlags);
/* read the list from the pg_dist_node */ /* read the list from the pg_dist_node */
workerNodeList = ReadWorkerNodes(); workerList = ReadWorkerNodes();
/* iterate over the worker node list */ oldContext = MemoryContextSwitchTo(CacheMemoryContext);
foreach(workerNodeCell, workerNodeList)
foreach(workerListCell, workerList)
{ {
WorkerNode *workerNode = NULL; WorkerNode *workerNode = (WorkerNode *) lfirst(workerListCell);
WorkerNode *currentNode = lfirst(workerNodeCell); WorkerNode *newWorkerNode = palloc(sizeof(WorkerNode));
void *hashKey = NULL;
bool handleFound = false;
/* memcpy(newWorkerNode, workerNode, sizeof(WorkerNode));
* Search for the worker node in the hash, and then insert the CachedNodeList = lappend(CachedNodeList, newWorkerNode);
* values. When searching, we make the hashKey the unique nodeid.
*/
hashKey = (void *) &currentNode->nodeId;
workerNode = (WorkerNode *) hash_search(WorkerNodeHash, hashKey,
HASH_ENTER, &handleFound);
/* fill the newly allocated workerNode in the cache */
strlcpy(workerNode->workerName, currentNode->workerName, WORKER_LENGTH);
workerNode->workerPort = currentNode->workerPort;
workerNode->groupId = currentNode->groupId;
if (handleFound)
{
ereport(WARNING, (errmsg("multiple lines for worker node: \"%s:%u\"",
workerNode->workerName,
workerNode->workerPort)));
}
workerNode->workerPort = currentNode->workerPort;
/* we do not need the currentNode anymore */
pfree(currentNode);
} }
MemoryContextSwitchTo(oldContext);
/* prevent multiple invalidation registrations */ /* prevent multiple invalidation registrations */
if (!invalidationRegistered) if (!invalidationRegistered)
{ {
@ -1181,18 +1159,18 @@ InvalidateDistRelationCacheCallback(Datum argument, Oid relationId)
/* /*
* InvalidateNodeRelationCacheCallback destroys the WorkerNodeHash when * InvalidateNodeRelationCacheCallback destroys the CachedNodeList when
* any change happens on pg_dist_node table. It also set WorkerNodeHash to * any change happens on pg_dist_node table. It also set CachedNodeList to
* NULL, which allows consequent accesses to the hash read from the * NULL, which allows consequent accesses to the hash read from the
* pg_dist_node from scratch. * pg_dist_node from scratch.
*/ */
static void static void
InvalidateNodeRelationCacheCallback(Datum argument, Oid relationId) InvalidateNodeRelationCacheCallback(Datum argument, Oid relationId)
{ {
if (WorkerNodeHash != NULL && relationId == DistNodeRelationId()) if (CachedNodeList != NULL && relationId == DistNodeRelationId())
{ {
hash_destroy(WorkerNodeHash); list_free_deep(CachedNodeList);
WorkerNodeHash = NULL; CachedNodeList = NULL;
} }
} }

View File

@ -215,24 +215,26 @@ GenerateNodeTuple(WorkerNode *workerNode)
static WorkerNode * static WorkerNode *
FindWorkerNode(char *nodeName, int32 nodePort) FindWorkerNode(char *nodeName, int32 nodePort)
{ {
WorkerNode *workerNode = NULL; ListCell *workerNodeCell = NULL;
HTAB *workerNodeHash = GetWorkerNodeHash();
HASH_SEQ_STATUS status;
hash_seq_init(&status, workerNodeHash); List *workerNodeList = GetWorkerNodeList();
foreach(workerNodeCell, workerNodeList)
while ((workerNode = hash_seq_search(&status)) != NULL)
{ {
WorkerNode *workerNode = (WorkerNode *) lfirst(workerNodeCell);
if (strncasecmp(nodeName, workerNode->workerName, WORKER_LENGTH) == 0 && if (strncasecmp(nodeName, workerNode->workerName, WORKER_LENGTH) == 0 &&
nodePort == workerNode->workerPort) nodePort == workerNode->workerPort)
{ {
/* we need to terminate the scan since we break */ WorkerNode *resultWorkerNode = palloc(sizeof(WorkerNode));
hash_seq_term(&status); memcpy(resultWorkerNode, workerNode, sizeof(WorkerNode));
break;
list_free_deep(workerNodeList);
return resultWorkerNode;
} }
} }
return workerNode; list_free_deep(workerNodeList);
return NULL;
} }
@ -271,14 +273,12 @@ static uint32
GetMaxGroupId() GetMaxGroupId()
{ {
uint32 maxGroupId = 0; uint32 maxGroupId = 0;
WorkerNode *workerNode = NULL; ListCell *workerNodeCell = NULL;
HTAB *workerNodeHash = GetWorkerNodeHash();
HASH_SEQ_STATUS status;
hash_seq_init(&status, workerNodeHash); List *workerNodeList = GetWorkerNodeList();
foreach(workerNodeCell, workerNodeList)
while ((workerNode = hash_seq_search(&status)) != NULL)
{ {
WorkerNode *workerNode = (WorkerNode *) lfirst(workerNodeCell);
uint32 workerNodeGroupId = workerNode->groupId; uint32 workerNodeGroupId = workerNode->groupId;
if (workerNodeGroupId > maxGroupId) if (workerNodeGroupId > maxGroupId)
@ -287,6 +287,7 @@ GetMaxGroupId()
} }
} }
list_free_deep(workerNodeList);
return maxGroupId; return maxGroupId;
} }
@ -299,14 +300,12 @@ static uint64
GetNodeCountInGroup(uint32 groupId) GetNodeCountInGroup(uint32 groupId)
{ {
uint64 elementCountInGroup = 0; uint64 elementCountInGroup = 0;
WorkerNode *workerNode = NULL; ListCell *workerNodeCell;
HTAB *workerNodeHash = GetWorkerNodeHash(); List *workerNodeList = GetWorkerNodeList();
HASH_SEQ_STATUS status;
hash_seq_init(&status, workerNodeHash); foreach(workerNodeCell, workerNodeList)
while ((workerNode = hash_seq_search(&status)) != NULL)
{ {
WorkerNode *workerNode = (WorkerNode *) lfirst(workerNodeCell);
uint32 workerNodeGroupId = workerNode->groupId; uint32 workerNodeGroupId = workerNode->groupId;
if (workerNodeGroupId == groupId) if (workerNodeGroupId == groupId)
@ -315,6 +314,7 @@ GetNodeCountInGroup(uint32 groupId)
} }
} }
list_free_deep(workerNodeList);
return elementCountInGroup; return elementCountInGroup;
} }

View File

@ -60,8 +60,8 @@ extern void CitusInvalidateNodeCache(void);
extern bool CitusHasBeenLoaded(void); extern bool CitusHasBeenLoaded(void);
/* access WorkerNodeHash */ /* access WorkerNodeList */
extern HTAB * GetWorkerNodeHash(void); extern List * GetWorkerNodeList(void);
/* relation oids */ /* relation oids */
extern Oid DistPartitionRelationId(void); extern Oid DistPartitionRelationId(void);

View File

@ -25,12 +25,14 @@
/* /*
* In memory representation of pg_dist_node table elements. The elements are hold in * In memory representation of pg_dist_node table elements.
* WorkerNodeHash table. *
* Caution: In some places we use memcpy to copy WorkerNodes. Those sites will have to be
* changed if you add any pointers to this structure.
*/ */
typedef struct WorkerNode typedef struct WorkerNode
{ {
uint32 nodeId; /* node's unique id, key of the hash table */ uint32 nodeId; /* node's unique id */
uint32 workerPort; /* node's port */ uint32 workerPort; /* node's port */
char workerName[WORKER_LENGTH]; /* node's name */ char workerName[WORKER_LENGTH]; /* node's name */
uint32 groupId; /* node's groupId; same for the nodes that are in the same group */ uint32 groupId; /* node's groupId; same for the nodes that are in the same group */

View File

@ -1,15 +1,15 @@
-- Tests functions related to cluster membership -- Tests functions related to cluster membership
-- add the nodes to the cluster -- add the nodes to the cluster
SELECT master_add_node('localhost', :worker_1_port);
master_add_node
-----------------------
(1,1,localhost,57637)
(1 row)
SELECT master_add_node('localhost', :worker_2_port); SELECT master_add_node('localhost', :worker_2_port);
master_add_node master_add_node
----------------------- -----------------------
(2,2,localhost,57638) (1,1,localhost,57638)
(1 row)
SELECT master_add_node('localhost', :worker_1_port);
master_add_node
-----------------------
(2,2,localhost,57637)
(1 row) (1 row)
-- get the active nodes -- get the active nodes
@ -24,7 +24,7 @@ SELECT master_get_active_worker_nodes();
SELECT master_add_node('localhost', :worker_1_port); SELECT master_add_node('localhost', :worker_1_port);
master_add_node master_add_node
----------------------- -----------------------
(1,1,localhost,57637) (2,2,localhost,57637)
(1 row) (1 row)
-- get the active nodes -- get the active nodes

View File

@ -66,16 +66,16 @@ SELECT * FROM pg_dist_shard_placement;
DROP EXTENSION citus; DROP EXTENSION citus;
CREATE EXTENSION citus; CREATE EXTENSION citus;
-- re-add the nodes to the cluster -- re-add the nodes to the cluster
SELECT master_add_node('localhost', :worker_1_port);
master_add_node
-----------------------
(1,1,localhost,57637)
(1 row)
SELECT master_add_node('localhost', :worker_2_port); SELECT master_add_node('localhost', :worker_2_port);
master_add_node master_add_node
----------------------- -----------------------
(2,2,localhost,57638) (1,1,localhost,57638)
(1 row)
SELECT master_add_node('localhost', :worker_1_port);
master_add_node
-----------------------
(2,2,localhost,57637)
(1 row) (1 row)
-- create a table with a SERIAL column -- create a table with a SERIAL column

View File

@ -1,8 +1,8 @@
-- Tests functions related to cluster membership -- Tests functions related to cluster membership
-- add the nodes to the cluster -- add the nodes to the cluster
SELECT master_add_node('localhost', :worker_1_port);
SELECT master_add_node('localhost', :worker_2_port); SELECT master_add_node('localhost', :worker_2_port);
SELECT master_add_node('localhost', :worker_1_port);
-- get the active nodes -- get the active nodes
SELECT master_get_active_worker_nodes(); SELECT master_get_active_worker_nodes();

View File

@ -46,8 +46,8 @@ DROP EXTENSION citus;
CREATE EXTENSION citus; CREATE EXTENSION citus;
-- re-add the nodes to the cluster -- re-add the nodes to the cluster
SELECT master_add_node('localhost', :worker_1_port);
SELECT master_add_node('localhost', :worker_2_port); SELECT master_add_node('localhost', :worker_2_port);
SELECT master_add_node('localhost', :worker_1_port);
-- create a table with a SERIAL column -- create a table with a SERIAL column
CREATE TABLE testserialtable(id serial, group_id integer); CREATE TABLE testserialtable(id serial, group_id integer);