From fe636f58cd6fb0cf5e062927bd2677d121249384 Mon Sep 17 00:00:00 2001
From: Brian Cloutier
Date: Fri, 23 Sep 2016 20:39:12 +0300
Subject: [PATCH] Change the worker node cache from a hash to a list

---
 .../distributed/master/worker_node_manager.c  | 150 +++++++-----------
 .../distributed/utils/metadata_cache.c        | 102 +++++-------
 src/backend/distributed/utils/node_metadata.c |  44 ++---
 src/include/distributed/metadata_cache.h      |   4 +-
 src/include/distributed/worker_manager.h      |   8 +-
 .../regress/expected/multi_cluster_node.out   |  16 +-
 src/test/regress/expected/multi_table_ddl.out |  14 +-
 src/test/regress/sql/multi_cluster_node.sql   |   2 +-
 src/test/regress/sql/multi_table_ddl.sql      |   2 +-
 9 files changed, 147 insertions(+), 195 deletions(-)

diff --git a/src/backend/distributed/master/worker_node_manager.c b/src/backend/distributed/master/worker_node_manager.c
index e0b4e38c6..058274019 100644
--- a/src/backend/distributed/master/worker_node_manager.c
+++ b/src/backend/distributed/master/worker_node_manager.c
@@ -36,7 +36,7 @@ int MaxWorkerNodesTracked = 2048;	/* determines worker node hash table size *
 
 /* Local functions forward declarations */
 static char * ClientHostAddress(StringInfo remoteHostStringInfo);
-static WorkerNode * FindRandomNodeNotInList(HTAB *WorkerNodesHash,
+static WorkerNode * FindRandomNodeNotInList(List *workerNodesList,
 											List *currentNodeList);
 static bool ListMember(List *currentList, WorkerNode *workerNode);
 static int WorkerNodeCompare(const void *lhsKey, const void *rhsKey, Size keySize);
@@ -60,22 +60,26 @@ WorkerNode *
 WorkerGetRandomCandidateNode(List *currentNodeList)
 {
 	WorkerNode *workerNode = NULL;
-	HTAB *workerNodeHash = GetWorkerNodeHash();
+	WorkerNode *resultWorkerNode = NULL;
+	List *workerNodeList = GetWorkerNodeList();
 
 	/*
 	 * We check if the shard has already been placed on all nodes known to us.
-	 * This check is rather defensive, and has the drawback of performing a full
-	 * scan over the worker node hash for determining the number of live nodes.
 	 */
 	uint32 currentNodeCount = list_length(currentNodeList);
-	uint32 liveNodeCount = WorkerGetLiveNodeCount();
+	uint32 liveNodeCount = list_length(workerNodeList);
 
 	if (currentNodeCount >= liveNodeCount)
 	{
 		return NULL;
 	}
 
-	workerNode = FindRandomNodeNotInList(workerNodeHash, currentNodeList);
-	return workerNode;
+	workerNode = FindRandomNodeNotInList(workerNodeList, currentNodeList);
+
+	resultWorkerNode = palloc(sizeof(WorkerNode));
+	memcpy(resultWorkerNode, workerNode, sizeof(WorkerNode));
+	list_free_deep(workerNodeList);
+
+	return resultWorkerNode;
 }
 
@@ -229,24 +233,25 @@ ClientHostAddress(StringInfo clientHostStringInfo)
 WorkerNode *
 WorkerGetNodeWithName(const char *hostname)
 {
-	WorkerNode *workerNode = NULL;
-	HASH_SEQ_STATUS status;
-	HTAB *workerNodeHash = GetWorkerNodeHash();
+	WorkerNode *resultWorkerNode = NULL;
+	ListCell *workerNodeCell = NULL;
+	List *workerNodeList = GetWorkerNodeList();
 
-	hash_seq_init(&status, workerNodeHash);
-
-	while ((workerNode = hash_seq_search(&status)) != NULL)
+	foreach(workerNodeCell, workerNodeList)
 	{
+		WorkerNode *workerNode = (WorkerNode *) lfirst(workerNodeCell);
+
 		int nameCompare = strncmp(workerNode->workerName, hostname, WORKER_LENGTH);
 		if (nameCompare == 0)
 		{
-			/* we need to terminate the scan since we break */
-			hash_seq_term(&status);
+			resultWorkerNode = palloc(sizeof(WorkerNode));
+			memcpy(resultWorkerNode, workerNode, sizeof(WorkerNode));
 			break;
 		}
 	}
 
-	return workerNode;
+	list_free_deep(workerNodeList);
+	return resultWorkerNode;
 }
 
@@ -254,41 +259,29 @@ WorkerGetNodeWithName(const char *hostname)
 uint32
 WorkerGetLiveNodeCount(void)
 {
-	WorkerNode *workerNode = NULL;
 	uint32 liveWorkerCount = 0;
-	HASH_SEQ_STATUS status;
-	HTAB *workerNodeHash = GetWorkerNodeHash();
+	List *workerNodeList = GetWorkerNodeList();
 
-	hash_seq_init(&status, workerNodeHash);
+	/* TODO: building a full copy of the worker node list only to count its entries
+	 * and immediately free it again is wasteful; consider caching the node count. */
 
-	while ((workerNode = hash_seq_search(&status)) != NULL)
-	{
-		liveWorkerCount++;
-	}
+	liveWorkerCount = list_length(workerNodeList);
+	list_free_deep(workerNodeList);
 
 	return liveWorkerCount;
 }
 
 
 /*
- * WorkerNodeList iterates over the hash table that includes the worker nodes, and adds
- * them to a list which is returned.
+ * WorkerNodeList returns a newly allocated list of the worker nodes. The caller owns
+ * the returned list and is responsible for freeing it.
+ *
+ * TODO: audit all call sites and make sure they free the returned list.
  */
 List *
 WorkerNodeList(void)
 {
-	List *workerNodeList = NIL;
-	WorkerNode *workerNode = NULL;
-	HTAB *workerNodeHash = GetWorkerNodeHash();
-	HASH_SEQ_STATUS status;
-
-	hash_seq_init(&status, workerNodeHash);
-
-	while ((workerNode = hash_seq_search(&status)) != NULL)
-	{
-		workerNodeList = lappend(workerNodeList, workerNode);
-	}
-
+	List *workerNodeList = GetWorkerNodeList();
 	return workerNodeList;
 }
 
@@ -301,24 +294,24 @@ WorkerNodeList(void)
 bool
 WorkerNodeActive(const char *nodeName, uint32 nodePort)
 {
-	WorkerNode *workerNode = NULL;
-	HASH_SEQ_STATUS status;
-	HTAB *workerNodeHash = GetWorkerNodeHash();
+	bool nodeActive = false;
+	ListCell *workerNodeCell = NULL;
+	List *workerNodeList = GetWorkerNodeList();
 
-	hash_seq_init(&status, workerNodeHash);
-
-	while ((workerNode = hash_seq_search(&status)) != NULL)
+	foreach(workerNodeCell, workerNodeList)
 	{
+		WorkerNode *workerNode = (WorkerNode *) lfirst(workerNodeCell);
+
 		if (strncmp(workerNode->workerName, nodeName, WORKER_LENGTH) == 0 &&
 			workerNode->workerPort == nodePort)
 		{
-			/* we need to terminate the scan since we break */
-			hash_seq_term(&status);
+			nodeActive = true;
 			break;
 		}
 	}
 
-	return workerNode != NULL;
+	list_free_deep(workerNodeList);
+	return nodeActive;
 }
 
@@ -327,68 +320,47 @@ WorkerNodeActive(const char *nodeName, uint32 nodePort)
  * a member of the current node list. The caller is responsible for making the
  * necessary node count checks to ensure that such a node exists.
  *
- * Note that this function has a selection bias towards nodes whose positions in
- * the shared hash are sequentially adjacent to the positions of nodes that are
- * in the current node list. This bias follows from our decision to first pick a
- * random node in the hash, and if that node is a member of the current list, to
- * simply iterate to the next node in the hash. Overall, this approach trades in
- * some selection bias for simplicity in design and for bounded execution time.
+ * Note that this function runs in O(n * m) time, where n and m are the lengths of
+ * workerNodesList and currentNodeList; we accept that cost to keep the code simple.
  */
 static WorkerNode *
-FindRandomNodeNotInList(HTAB *WorkerNodesHash, List *currentNodeList)
+FindRandomNodeNotInList(List *workerNodesList, List *currentNodeList)
 {
-	WorkerNode *workerNode = NULL;
-	HASH_SEQ_STATUS status;
-	uint32 workerNodeCount = 0;
-	uint32 currentNodeCount PG_USED_FOR_ASSERTS_ONLY = 0;
-	bool lookForWorkerNode = true;
+	ListCell *workerNodeCell;
+	uint32 workerPosition = 0;
 	uint32 workerIndex = 0;
 
-	workerNodeCount = hash_get_num_entries(WorkerNodesHash);
-	currentNodeCount = list_length(currentNodeList);
+	uint32 workerNodeCount = list_length(workerNodesList);
+	uint32 currentNodeCount = list_length(currentNodeList);
+	Assert(workerNodeCount > currentNodeCount);
 
 	/*
-	 * We determine a random position within the worker hash between [1, N],
-	 * assuming that the number of elements in the hash is N. We then get to
-	 * this random position by iterating over the worker hash. Please note that
-	 * the random seed has already been set by the postmaster when starting up.
+	 * Please note that the random seed has already been set by the postmaster when
+	 * starting up.
 	 */
-	workerPosition = (random() % workerNodeCount) + 1;
-	hash_seq_init(&status, WorkerNodesHash);
+	workerPosition = random() % (workerNodeCount - currentNodeCount);
 
-	for (workerIndex = 0; workerIndex < workerPosition; workerIndex++)
-	{
-		workerNode = (WorkerNode *) hash_seq_search(&status);
-	}
-
-	while (lookForWorkerNode)
+	foreach(workerNodeCell, workerNodesList)
 	{
+		WorkerNode *workerNode = (WorkerNode *) lfirst(workerNodeCell);
 		bool listMember = ListMember(currentNodeList, workerNode);
 
-		if (!listMember)
+		if (listMember)
 		{
-			lookForWorkerNode = false;
+			continue;
 		}
-		else
-		{
-			/* iterate to the next worker node in the hash */
-			workerNode = (WorkerNode *) hash_seq_search(&status);
-
-			/* reached end of hash; start from the beginning */
-			if (workerNode == NULL)
-			{
-				hash_seq_init(&status, WorkerNodesHash);
-				workerNode = (WorkerNode *) hash_seq_search(&status);
-			}
+		if (workerIndex == workerPosition)
+		{
+			return workerNode;
 		}
+
+		workerIndex++;
 	}
 
-	/* we stopped scanning before completion; therefore clean up scan */
-	hash_seq_term(&status);
-
-	return workerNode;
+	return NULL;
 }
 
diff --git a/src/backend/distributed/utils/metadata_cache.c b/src/backend/distributed/utils/metadata_cache.c
index 5fcdc0dc1..ac2a7d7e6 100644
--- a/src/backend/distributed/utils/metadata_cache.c
+++ b/src/backend/distributed/utils/metadata_cache.c
@@ -62,8 +62,8 @@ static Oid extraDataContainerFuncId = InvalidOid;
 /* Hash table for informations about each partition */
 static HTAB *DistTableCacheHash = NULL;
 
-/* Hash table for informations about worker nodes */
-static HTAB *WorkerNodeHash = NULL;
+/* cached list of active workers */
+static List *CachedNodeList = NULL;
 
 /* built first time through in InitializePartitionCache */
 static ScanKeyData DistPartitionScanKey[1];
@@ -967,19 +967,33 @@ InitializeDistTableCache(void)
 
 
 /*
- * GetWorkerNodeHash is a wrapper around InitializeWorkerNodeCache(). It
- * triggers InitializeWorkerNodeCache when the workerHash is NULL. Otherwise,
- * it returns the hash.
+ * GetWorkerNodeList is a wrapper around InitializeWorkerNodeCache(). It
+ * triggers InitializeWorkerNodeCache when the cached node list is NULL and
+ * then returns a freshly allocated copy of that cached list.
  */
-HTAB *
-GetWorkerNodeHash(void)
+List *
+GetWorkerNodeList(void)
 {
-	if (WorkerNodeHash == NULL)
+	List *resultList = NULL;
+	ListCell *workerNodeCell;
+
+	if (CachedNodeList == NULL)
 	{
 		InitializeWorkerNodeCache();
 	}
 
-	return WorkerNodeHash;
+	/* deep copy the list so invalidations, which free the list, don't cause crashes */
+	foreach(workerNodeCell, CachedNodeList)
+	{
+		WorkerNode *oldNode = (WorkerNode *) lfirst(workerNodeCell);
+		WorkerNode *newNode = palloc(sizeof(WorkerNode));
+
+		memcpy(newNode, oldNode, sizeof(WorkerNode));
+
+		resultList = lappend(resultList, newNode);
+	}
+
+	return resultList;
 }
 
@@ -991,12 +1005,10 @@ GetWorkerNodeHash(void)
 static void
 InitializeWorkerNodeCache(void)
 {
+	MemoryContext oldContext;
 	static bool invalidationRegistered = false;
-	List *workerNodeList = NIL;
-	ListCell *workerNodeCell = NULL;
-	HASHCTL info;
-	int hashFlags = 0;
-	long maxTableSize = (long) MaxWorkerNodesTracked;
+	List *workerList = NULL;
+	ListCell *workerListCell;
 
 	/* make sure we've initialized CacheMemoryContext */
 	if (CacheMemoryContext == NULL)
 	{
 		CreateCacheMemoryContext();
 	}
 
-	/*
-	 * Create the hash that holds the worker nodes. The key is the unique nodeid
-	 * field.
-	 */
-	memset(&info, 0, sizeof(info));
-	info.keysize = sizeof(uint32);
-	info.entrysize = sizeof(WorkerNode);
-	info.hcxt = CacheMemoryContext;
-	info.hash = tag_hash;
-	hashFlags = HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT;
-
-	WorkerNodeHash = hash_create("Worker Node Hash",
-								 maxTableSize,
-								 &info, hashFlags);
+	Assert(CachedNodeList == NULL);
 
 	/* read the list from the pg_dist_node */
-	workerNodeList = ReadWorkerNodes();
+	workerList = ReadWorkerNodes();
 
-	/* iterate over the worker node list */
-	foreach(workerNodeCell, workerNodeList)
+	oldContext = MemoryContextSwitchTo(CacheMemoryContext);
+
+	foreach(workerListCell, workerList)
 	{
-		WorkerNode *workerNode = NULL;
-		WorkerNode *currentNode = lfirst(workerNodeCell);
-		void *hashKey = NULL;
-		bool handleFound = false;
+		WorkerNode *workerNode = (WorkerNode *) lfirst(workerListCell);
+		WorkerNode *newWorkerNode = palloc(sizeof(WorkerNode));
 
-		/*
-		 * Search for the worker node in the hash, and then insert the
-		 * values. When searching, we make the hashKey the unique nodeid.
-		 */
-		hashKey = (void *) &currentNode->nodeId;
-		workerNode = (WorkerNode *) hash_search(WorkerNodeHash, hashKey,
-												HASH_ENTER, &handleFound);
-
-		/* fill the newly allocated workerNode in the cache */
-		strlcpy(workerNode->workerName, currentNode->workerName, WORKER_LENGTH);
-		workerNode->workerPort = currentNode->workerPort;
-		workerNode->groupId = currentNode->groupId;
-
-		if (handleFound)
-		{
-			ereport(WARNING, (errmsg("multiple lines for worker node: \"%s:%u\"",
-									 workerNode->workerName,
-									 workerNode->workerPort)));
-		}
-
-		workerNode->workerPort = currentNode->workerPort;
-
-		/* we do not need the currentNode anymore */
-		pfree(currentNode);
+		memcpy(newWorkerNode, workerNode, sizeof(WorkerNode));
+		CachedNodeList = lappend(CachedNodeList, newWorkerNode);
 	}
 
+	MemoryContextSwitchTo(oldContext);
+
 	/* prevent multiple invalidation registrations */
 	if (!invalidationRegistered)
 	{
@@ -1181,18 +1159,18 @@ InvalidateDistRelationCacheCallback(Datum argument, Oid relationId)
 
 
 /*
- * InvalidateNodeRelationCacheCallback destroys the WorkerNodeHash when
- * any change happens on pg_dist_node table. It also set WorkerNodeHash to
- * NULL, which allows consequent accesses to the hash read from the
- * pg_dist_node from scratch.
+ * InvalidateNodeRelationCacheCallback destroys the CachedNodeList when
+ * any change happens on the pg_dist_node table. It also sets CachedNodeList to
+ * NULL, so that subsequent accesses rebuild the list from pg_dist_node from
+ * scratch.
  */
 static void
 InvalidateNodeRelationCacheCallback(Datum argument, Oid relationId)
 {
-	if (WorkerNodeHash != NULL && relationId == DistNodeRelationId())
+	if (CachedNodeList != NULL && relationId == DistNodeRelationId())
 	{
-		hash_destroy(WorkerNodeHash);
-		WorkerNodeHash = NULL;
+		list_free_deep(CachedNodeList);
+		CachedNodeList = NULL;
 	}
 }
 
diff --git a/src/backend/distributed/utils/node_metadata.c b/src/backend/distributed/utils/node_metadata.c
index 813acd651..035ca6300 100644
--- a/src/backend/distributed/utils/node_metadata.c
+++ b/src/backend/distributed/utils/node_metadata.c
@@ -215,24 +215,26 @@ GenerateNodeTuple(WorkerNode *workerNode)
 static WorkerNode *
 FindWorkerNode(char *nodeName, int32 nodePort)
 {
-	WorkerNode *workerNode = NULL;
-	HTAB *workerNodeHash = GetWorkerNodeHash();
-	HASH_SEQ_STATUS status;
+	ListCell *workerNodeCell = NULL;
 
-	hash_seq_init(&status, workerNodeHash);
-
-	while ((workerNode = hash_seq_search(&status)) != NULL)
+	List *workerNodeList = GetWorkerNodeList();
+	foreach(workerNodeCell, workerNodeList)
 	{
+		WorkerNode *workerNode = (WorkerNode *) lfirst(workerNodeCell);
+
 		if (strncasecmp(nodeName, workerNode->workerName, WORKER_LENGTH) == 0 &&
 			nodePort == workerNode->workerPort)
 		{
-			/* we need to terminate the scan since we break */
-			hash_seq_term(&status);
-			break;
+			WorkerNode *resultWorkerNode = palloc(sizeof(WorkerNode));
+			memcpy(resultWorkerNode, workerNode, sizeof(WorkerNode));
+
+			list_free_deep(workerNodeList);
+			return resultWorkerNode;
 		}
 	}
 
-	return workerNode;
+	list_free_deep(workerNodeList);
+	return NULL;
 }
 
@@ -271,14 +273,12 @@ static uint32
 GetMaxGroupId()
 {
 	uint32 maxGroupId = 0;
-	WorkerNode *workerNode = NULL;
-	HTAB *workerNodeHash = GetWorkerNodeHash();
-	HASH_SEQ_STATUS status;
+	ListCell *workerNodeCell = NULL;
 
-	hash_seq_init(&status, workerNodeHash);
-
-	while ((workerNode = hash_seq_search(&status)) != NULL)
+	List *workerNodeList = GetWorkerNodeList();
+	foreach(workerNodeCell, workerNodeList)
 	{
+		WorkerNode *workerNode = (WorkerNode *) lfirst(workerNodeCell);
 		uint32 workerNodeGroupId = workerNode->groupId;
 
 		if (workerNodeGroupId > maxGroupId)
 		{
@@ -287,6 +287,7 @@ GetMaxGroupId()
 		}
 	}
 
+	list_free_deep(workerNodeList);
 	return maxGroupId;
 }
 
@@ -299,14 +300,12 @@ static uint64
 GetNodeCountInGroup(uint32 groupId)
 {
 	uint64 elementCountInGroup = 0;
-	WorkerNode *workerNode = NULL;
-	HTAB *workerNodeHash = GetWorkerNodeHash();
-	HASH_SEQ_STATUS status;
+	ListCell *workerNodeCell;
+	List *workerNodeList = GetWorkerNodeList();
 
-	hash_seq_init(&status, workerNodeHash);
-
-	while ((workerNode = hash_seq_search(&status)) != NULL)
+	foreach(workerNodeCell, workerNodeList)
 	{
+		WorkerNode *workerNode = (WorkerNode *) lfirst(workerNodeCell);
 		uint32 workerNodeGroupId = workerNode->groupId;
 
 		if (workerNodeGroupId == groupId)
 		{
@@ -315,6 +314,7 @@ GetNodeCountInGroup(uint32 groupId)
 		}
 	}
 
+	list_free_deep(workerNodeList);
 	return elementCountInGroup;
 }
 
diff --git a/src/include/distributed/metadata_cache.h b/src/include/distributed/metadata_cache.h
index d66981437..5e308db0c 100644
--- a/src/include/distributed/metadata_cache.h
+++ b/src/include/distributed/metadata_cache.h
@@ -60,8 +60,8 @@ extern void CitusInvalidateNodeCache(void);
 
 extern bool CitusHasBeenLoaded(void);
 
-/* access WorkerNodeHash */
-extern HTAB * GetWorkerNodeHash(void);
+/* access WorkerNodeList */
+extern List * GetWorkerNodeList(void);
 
 /* relation oids */
 extern Oid DistPartitionRelationId(void);
diff --git a/src/include/distributed/worker_manager.h
b/src/include/distributed/worker_manager.h index 079433022..de15618b5 100644 --- a/src/include/distributed/worker_manager.h +++ b/src/include/distributed/worker_manager.h @@ -25,12 +25,14 @@ /* - * In memory representation of pg_dist_node table elements. The elements are hold in - * WorkerNodeHash table. + * In memory representation of pg_dist_node table elements. + * + * Caution: In some places we use memcpy to copy WorkerNodes. Those sites will have to be + * changed if you add any pointers to this structure. */ typedef struct WorkerNode { - uint32 nodeId; /* node's unique id, key of the hash table */ + uint32 nodeId; /* node's unique id */ uint32 workerPort; /* node's port */ char workerName[WORKER_LENGTH]; /* node's name */ uint32 groupId; /* node's groupId; same for the nodes that are in the same group */ diff --git a/src/test/regress/expected/multi_cluster_node.out b/src/test/regress/expected/multi_cluster_node.out index 15d9d0118..b5642bf1d 100644 --- a/src/test/regress/expected/multi_cluster_node.out +++ b/src/test/regress/expected/multi_cluster_node.out @@ -1,15 +1,15 @@ -- Tests functions related to cluster membership -- add the nodes to the cluster -SELECT master_add_node('localhost', :worker_1_port); - master_add_node ------------------------ - (1,1,localhost,57637) -(1 row) - SELECT master_add_node('localhost', :worker_2_port); master_add_node ----------------------- - (2,2,localhost,57638) + (1,1,localhost,57638) +(1 row) + +SELECT master_add_node('localhost', :worker_1_port); + master_add_node +----------------------- + (2,2,localhost,57637) (1 row) -- get the active nodes @@ -24,7 +24,7 @@ SELECT master_get_active_worker_nodes(); SELECT master_add_node('localhost', :worker_1_port); master_add_node ----------------------- - (1,1,localhost,57637) + (2,2,localhost,57637) (1 row) -- get the active nodes diff --git a/src/test/regress/expected/multi_table_ddl.out b/src/test/regress/expected/multi_table_ddl.out index 45efdfef9..d9287dd1b 100644 --- a/src/test/regress/expected/multi_table_ddl.out +++ b/src/test/regress/expected/multi_table_ddl.out @@ -66,16 +66,16 @@ SELECT * FROM pg_dist_shard_placement; DROP EXTENSION citus; CREATE EXTENSION citus; -- re-add the nodes to the cluster -SELECT master_add_node('localhost', :worker_1_port); - master_add_node ------------------------ - (1,1,localhost,57637) -(1 row) - SELECT master_add_node('localhost', :worker_2_port); master_add_node ----------------------- - (2,2,localhost,57638) + (1,1,localhost,57638) +(1 row) + +SELECT master_add_node('localhost', :worker_1_port); + master_add_node +----------------------- + (2,2,localhost,57637) (1 row) -- create a table with a SERIAL column diff --git a/src/test/regress/sql/multi_cluster_node.sql b/src/test/regress/sql/multi_cluster_node.sql index f2ae7222d..137775ec4 100644 --- a/src/test/regress/sql/multi_cluster_node.sql +++ b/src/test/regress/sql/multi_cluster_node.sql @@ -1,8 +1,8 @@ -- Tests functions related to cluster membership -- add the nodes to the cluster -SELECT master_add_node('localhost', :worker_1_port); SELECT master_add_node('localhost', :worker_2_port); +SELECT master_add_node('localhost', :worker_1_port); -- get the active nodes SELECT master_get_active_worker_nodes(); diff --git a/src/test/regress/sql/multi_table_ddl.sql b/src/test/regress/sql/multi_table_ddl.sql index 8cb2ddbf3..5779be29e 100644 --- a/src/test/regress/sql/multi_table_ddl.sql +++ b/src/test/regress/sql/multi_table_ddl.sql @@ -46,8 +46,8 @@ DROP EXTENSION citus; CREATE EXTENSION citus; -- re-add the nodes 
to the cluster -SELECT master_add_node('localhost', :worker_1_port); SELECT master_add_node('localhost', :worker_2_port); +SELECT master_add_node('localhost', :worker_1_port); -- create a table with a SERIAL column CREATE TABLE testserialtable(id serial, group_id integer);
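
Reviewer note, not part of the patch: the contract introduced here is that GetWorkerNodeList()
returns a deep copy of the cached worker node list which the caller must release with
list_free_deep(). A minimal sketch of a caller written under that assumption follows; the
wrapper function CountWorkersOnPort is hypothetical, while GetWorkerNodeList, WorkerNode,
foreach, lfirst, and list_free_deep are the same ones used throughout the patch.

	/* count workers listening on a given port; illustrates the copy-then-free pattern */
	static uint32
	CountWorkersOnPort(uint32 nodePort)
	{
		uint32 matchCount = 0;
		ListCell *workerNodeCell = NULL;
		List *workerNodeList = GetWorkerNodeList();

		foreach(workerNodeCell, workerNodeList)
		{
			WorkerNode *workerNode = (WorkerNode *) lfirst(workerNodeCell);

			if (workerNode->workerPort == nodePort)
			{
				matchCount++;
			}
		}

		/* the returned list is our own copy, so release it before returning */
		list_free_deep(workerNodeList);

		return matchCount;
	}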