From 40d715d494ead8463d677b509043fcf81f3b1e9f Mon Sep 17 00:00:00 2001 From: Marco Slot Date: Sun, 4 Feb 2018 12:04:36 +0100 Subject: [PATCH] Cache worker node array for faster iteration --- .../distributed/utils/metadata_cache.c | 54 +++++++++++++++---- 1 file changed, 43 insertions(+), 11 deletions(-) diff --git a/src/backend/distributed/utils/metadata_cache.c b/src/backend/distributed/utils/metadata_cache.c index e77a96510..641857ea9 100644 --- a/src/backend/distributed/utils/metadata_cache.c +++ b/src/backend/distributed/utils/metadata_cache.c @@ -146,6 +146,8 @@ static HTAB *DistShardCacheHash = NULL; /* Hash table for informations about worker nodes */ static HTAB *WorkerNodeHash = NULL; +static WorkerNode **WorkerNodeArray = NULL; +static int WorkerNodeCount = 0; static bool workerNodeHashValid = false; /* default value is -1, for coordinator it's 0 and for worker nodes > 0 */ @@ -168,6 +170,7 @@ static ShardInterval ** SortShardIntervalArray(ShardInterval **shardIntervalArra shardIntervalSortCompareFunction); static bool HasUniformHashDistribution(ShardInterval **shardIntervalArray, int shardIntervalArrayLength); +static void PrepareWorkerNodeCache(void); static bool HasUninitializedShardInterval(ShardInterval **sortedShardIntervalArray, int shardCount); static bool CheckInstalledVersion(int elevel); @@ -511,15 +514,14 @@ ResolveGroupShardPlacement(GroupShardPlacement *groupShardPlacement, static WorkerNode * LookupNodeForGroup(uint32 groupId) { - WorkerNode *workerNode = NULL; - HASH_SEQ_STATUS status; - HTAB *workerNodeHash = GetWorkerNodeHash(); bool foundAnyNodes = false; + int workerNodeIndex = 0; - hash_seq_init(&status, workerNodeHash); + PrepareWorkerNodeCache(); - while ((workerNode = hash_seq_search(&status)) != NULL) + for (workerNodeIndex = 0; workerNodeIndex < WorkerNodeCount; workerNodeIndex++) { + WorkerNode *workerNode = WorkerNodeArray[workerNodeIndex]; uint32 workerNodeGroupId = workerNode->groupId; if (workerNodeGroupId != groupId) { @@ -530,7 +532,6 @@ LookupNodeForGroup(uint32 groupId) if (WorkerNodeIsReadable(workerNode)) { - hash_seq_term(&status); return workerNode; } } @@ -2463,12 +2464,27 @@ InitializeDistTableCache(void) /* - * GetWorkerNodeHash is a wrapper around InitializeWorkerNodeCache(). It - * triggers InitializeWorkerNodeCache when the workerHash is invalid. Otherwise, - * it returns the hash. + * GetWorkerNodeHash returns the worker node data as a hash with the nodename and + * nodeport as a key. + * + * The hash is returned from the cache, if the cache is not (yet) valid, it is first + * rebuilt. */ HTAB * GetWorkerNodeHash(void) +{ + PrepareWorkerNodeCache(); + + return WorkerNodeHash; +} + + +/* + * PrepareWorkerNodeCache makes sure the worker node data from pg_dist_node is cached, + * if it is not already cached. + */ +static void +PrepareWorkerNodeCache(void) { InitializeCaches(); /* ensure relevant callbacks are registered */ @@ -2490,8 +2506,6 @@ GetWorkerNodeHash(void) workerNodeHashValid = true; } - - return WorkerNodeHash; } @@ -2510,6 +2524,9 @@ InitializeWorkerNodeCache(void) int hashFlags = 0; long maxTableSize = (long) MaxWorkerNodesTracked; bool includeNodesFromOtherClusters = false; + int newWorkerNodeCount = 0; + WorkerNode **newWorkerNodeArray = NULL; + int workerNodeIndex = 0; InitializeCaches(); @@ -2534,6 +2551,10 @@ InitializeWorkerNodeCache(void) /* read the list from pg_dist_node */ workerNodeList = ReadWorkerNodes(includeNodesFromOtherClusters); + newWorkerNodeCount = list_length(workerNodeList); + newWorkerNodeArray = MemoryContextAlloc(CacheMemoryContext, + sizeof(WorkerNode *) * newWorkerNodeCount); + /* iterate over the worker node list */ foreach(workerNodeCell, workerNodeList) { @@ -2558,6 +2579,8 @@ InitializeWorkerNodeCache(void) workerNode->nodeRole = currentNode->nodeRole; strlcpy(workerNode->nodeCluster, currentNode->nodeCluster, NAMEDATALEN); + newWorkerNodeArray[workerNodeIndex++] = workerNode; + if (handleFound) { ereport(WARNING, (errmsg("multiple lines for worker node: \"%s:%u\"", @@ -2571,6 +2594,15 @@ InitializeWorkerNodeCache(void) /* now, safe to destroy the old hash */ hash_destroy(oldWorkerNodeHash); + + if (WorkerNodeArray != NULL) + { + pfree(WorkerNodeArray); + } + + WorkerNodeCount = newWorkerNodeCount; + WorkerNodeArray = newWorkerNodeArray; + }