From 40d715d494ead8463d677b509043fcf81f3b1e9f Mon Sep 17 00:00:00 2001 From: Marco Slot Date: Sun, 4 Feb 2018 12:04:36 +0100 Subject: [PATCH 1/2] Cache worker node array for faster iteration --- .../distributed/utils/metadata_cache.c | 54 +++++++++++++++---- 1 file changed, 43 insertions(+), 11 deletions(-) diff --git a/src/backend/distributed/utils/metadata_cache.c b/src/backend/distributed/utils/metadata_cache.c index e77a96510..641857ea9 100644 --- a/src/backend/distributed/utils/metadata_cache.c +++ b/src/backend/distributed/utils/metadata_cache.c @@ -146,6 +146,8 @@ static HTAB *DistShardCacheHash = NULL; /* Hash table for informations about worker nodes */ static HTAB *WorkerNodeHash = NULL; +static WorkerNode **WorkerNodeArray = NULL; +static int WorkerNodeCount = 0; static bool workerNodeHashValid = false; /* default value is -1, for coordinator it's 0 and for worker nodes > 0 */ @@ -168,6 +170,7 @@ static ShardInterval ** SortShardIntervalArray(ShardInterval **shardIntervalArra shardIntervalSortCompareFunction); static bool HasUniformHashDistribution(ShardInterval **shardIntervalArray, int shardIntervalArrayLength); +static void PrepareWorkerNodeCache(void); static bool HasUninitializedShardInterval(ShardInterval **sortedShardIntervalArray, int shardCount); static bool CheckInstalledVersion(int elevel); @@ -511,15 +514,14 @@ ResolveGroupShardPlacement(GroupShardPlacement *groupShardPlacement, static WorkerNode * LookupNodeForGroup(uint32 groupId) { - WorkerNode *workerNode = NULL; - HASH_SEQ_STATUS status; - HTAB *workerNodeHash = GetWorkerNodeHash(); bool foundAnyNodes = false; + int workerNodeIndex = 0; - hash_seq_init(&status, workerNodeHash); + PrepareWorkerNodeCache(); - while ((workerNode = hash_seq_search(&status)) != NULL) + for (workerNodeIndex = 0; workerNodeIndex < WorkerNodeCount; workerNodeIndex++) { + WorkerNode *workerNode = WorkerNodeArray[workerNodeIndex]; uint32 workerNodeGroupId = workerNode->groupId; if (workerNodeGroupId != groupId) { @@ -530,7 +532,6 @@ LookupNodeForGroup(uint32 groupId) if (WorkerNodeIsReadable(workerNode)) { - hash_seq_term(&status); return workerNode; } } @@ -2463,12 +2464,27 @@ InitializeDistTableCache(void) /* - * GetWorkerNodeHash is a wrapper around InitializeWorkerNodeCache(). It - * triggers InitializeWorkerNodeCache when the workerHash is invalid. Otherwise, - * it returns the hash. + * GetWorkerNodeHash returns the worker node data as a hash with the nodename and + * nodeport as a key. + * + * The hash is returned from the cache, if the cache is not (yet) valid, it is first + * rebuilt. */ HTAB * GetWorkerNodeHash(void) +{ + PrepareWorkerNodeCache(); + + return WorkerNodeHash; +} + + +/* + * PrepareWorkerNodeCache makes sure the worker node data from pg_dist_node is cached, + * if it is not already cached. + */ +static void +PrepareWorkerNodeCache(void) { InitializeCaches(); /* ensure relevant callbacks are registered */ @@ -2490,8 +2506,6 @@ GetWorkerNodeHash(void) workerNodeHashValid = true; } - - return WorkerNodeHash; } @@ -2510,6 +2524,9 @@ InitializeWorkerNodeCache(void) int hashFlags = 0; long maxTableSize = (long) MaxWorkerNodesTracked; bool includeNodesFromOtherClusters = false; + int newWorkerNodeCount = 0; + WorkerNode **newWorkerNodeArray = NULL; + int workerNodeIndex = 0; InitializeCaches(); @@ -2534,6 +2551,10 @@ InitializeWorkerNodeCache(void) /* read the list from pg_dist_node */ workerNodeList = ReadWorkerNodes(includeNodesFromOtherClusters); + newWorkerNodeCount = list_length(workerNodeList); + newWorkerNodeArray = MemoryContextAlloc(CacheMemoryContext, + sizeof(WorkerNode *) * newWorkerNodeCount); + /* iterate over the worker node list */ foreach(workerNodeCell, workerNodeList) { @@ -2558,6 +2579,8 @@ InitializeWorkerNodeCache(void) workerNode->nodeRole = currentNode->nodeRole; strlcpy(workerNode->nodeCluster, currentNode->nodeCluster, NAMEDATALEN); + newWorkerNodeArray[workerNodeIndex++] = workerNode; + if (handleFound) { ereport(WARNING, (errmsg("multiple lines for worker node: \"%s:%u\"", @@ -2571,6 +2594,15 @@ InitializeWorkerNodeCache(void) /* now, safe to destroy the old hash */ hash_destroy(oldWorkerNodeHash); + + if (WorkerNodeArray != NULL) + { + pfree(WorkerNodeArray); + } + + WorkerNodeCount = newWorkerNodeCount; + WorkerNodeArray = newWorkerNodeArray; + } From 0cba4ab588b0f60a0d1750a15dd422f1e351bc63 Mon Sep 17 00:00:00 2001 From: Marco Slot Date: Fri, 9 Feb 2018 02:27:33 +0100 Subject: [PATCH 2/2] Refactor worker node hash initialisation --- src/backend/distributed/utils/metadata_cache.c | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/src/backend/distributed/utils/metadata_cache.c b/src/backend/distributed/utils/metadata_cache.c index 641857ea9..b8b4cfc90 100644 --- a/src/backend/distributed/utils/metadata_cache.c +++ b/src/backend/distributed/utils/metadata_cache.c @@ -2517,7 +2517,7 @@ PrepareWorkerNodeCache(void) static void InitializeWorkerNodeCache(void) { - HTAB *oldWorkerNodeHash = NULL; + HTAB *newWorkerNodeHash = NULL; List *workerNodeList = NIL; ListCell *workerNodeCell = NULL; HASHCTL info; @@ -2543,10 +2543,7 @@ InitializeWorkerNodeCache(void) info.match = WorkerNodeCompare; hashFlags = HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT | HASH_COMPARE; - oldWorkerNodeHash = WorkerNodeHash; - WorkerNodeHash = hash_create("Worker Node Hash", - maxTableSize, - &info, hashFlags); + newWorkerNodeHash = hash_create("Worker Node Hash", maxTableSize, &info, hashFlags); /* read the list from pg_dist_node */ workerNodeList = ReadWorkerNodes(includeNodesFromOtherClusters); @@ -2565,7 +2562,7 @@ InitializeWorkerNodeCache(void) /* search for the worker node in the hash, and then insert the values */ hashKey = (void *) currentNode; - workerNode = (WorkerNode *) hash_search(WorkerNodeHash, hashKey, + workerNode = (WorkerNode *) hash_search(newWorkerNodeHash, hashKey, HASH_ENTER, &handleFound); /* fill the newly allocated workerNode in the cache */ @@ -2593,7 +2590,7 @@ InitializeWorkerNodeCache(void) } /* now, safe to destroy the old hash */ - hash_destroy(oldWorkerNodeHash); + hash_destroy(WorkerNodeHash); if (WorkerNodeArray != NULL) { @@ -2602,7 +2599,7 @@ InitializeWorkerNodeCache(void) WorkerNodeCount = newWorkerNodeCount; WorkerNodeArray = newWorkerNodeArray; - + WorkerNodeHash = newWorkerNodeHash; }