diff --git a/src/backend/distributed/utils/metadata_cache.c b/src/backend/distributed/utils/metadata_cache.c index e77a96510..b8b4cfc90 100644 --- a/src/backend/distributed/utils/metadata_cache.c +++ b/src/backend/distributed/utils/metadata_cache.c @@ -146,6 +146,8 @@ static HTAB *DistShardCacheHash = NULL; /* Hash table for informations about worker nodes */ static HTAB *WorkerNodeHash = NULL; +static WorkerNode **WorkerNodeArray = NULL; +static int WorkerNodeCount = 0; static bool workerNodeHashValid = false; /* default value is -1, for coordinator it's 0 and for worker nodes > 0 */ @@ -168,6 +170,7 @@ static ShardInterval ** SortShardIntervalArray(ShardInterval **shardIntervalArra shardIntervalSortCompareFunction); static bool HasUniformHashDistribution(ShardInterval **shardIntervalArray, int shardIntervalArrayLength); +static void PrepareWorkerNodeCache(void); static bool HasUninitializedShardInterval(ShardInterval **sortedShardIntervalArray, int shardCount); static bool CheckInstalledVersion(int elevel); @@ -511,15 +514,14 @@ ResolveGroupShardPlacement(GroupShardPlacement *groupShardPlacement, static WorkerNode * LookupNodeForGroup(uint32 groupId) { - WorkerNode *workerNode = NULL; - HASH_SEQ_STATUS status; - HTAB *workerNodeHash = GetWorkerNodeHash(); bool foundAnyNodes = false; + int workerNodeIndex = 0; - hash_seq_init(&status, workerNodeHash); + PrepareWorkerNodeCache(); - while ((workerNode = hash_seq_search(&status)) != NULL) + for (workerNodeIndex = 0; workerNodeIndex < WorkerNodeCount; workerNodeIndex++) { + WorkerNode *workerNode = WorkerNodeArray[workerNodeIndex]; uint32 workerNodeGroupId = workerNode->groupId; if (workerNodeGroupId != groupId) { @@ -530,7 +532,6 @@ LookupNodeForGroup(uint32 groupId) if (WorkerNodeIsReadable(workerNode)) { - hash_seq_term(&status); return workerNode; } } @@ -2463,12 +2464,27 @@ InitializeDistTableCache(void) /* - * GetWorkerNodeHash is a wrapper around InitializeWorkerNodeCache(). It - * triggers InitializeWorkerNodeCache when the workerHash is invalid. Otherwise, - * it returns the hash. + * GetWorkerNodeHash returns the worker node data as a hash with the nodename and + * nodeport as a key. + * + * The hash is returned from the cache, if the cache is not (yet) valid, it is first + * rebuilt. */ HTAB * GetWorkerNodeHash(void) +{ + PrepareWorkerNodeCache(); + + return WorkerNodeHash; +} + + +/* + * PrepareWorkerNodeCache makes sure the worker node data from pg_dist_node is cached, + * if it is not already cached. + */ +static void +PrepareWorkerNodeCache(void) { InitializeCaches(); /* ensure relevant callbacks are registered */ @@ -2490,8 +2506,6 @@ GetWorkerNodeHash(void) workerNodeHashValid = true; } - - return WorkerNodeHash; } @@ -2503,13 +2517,16 @@ GetWorkerNodeHash(void) static void InitializeWorkerNodeCache(void) { - HTAB *oldWorkerNodeHash = NULL; + HTAB *newWorkerNodeHash = NULL; List *workerNodeList = NIL; ListCell *workerNodeCell = NULL; HASHCTL info; int hashFlags = 0; long maxTableSize = (long) MaxWorkerNodesTracked; bool includeNodesFromOtherClusters = false; + int newWorkerNodeCount = 0; + WorkerNode **newWorkerNodeArray = NULL; + int workerNodeIndex = 0; InitializeCaches(); @@ -2526,14 +2543,15 @@ InitializeWorkerNodeCache(void) info.match = WorkerNodeCompare; hashFlags = HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT | HASH_COMPARE; - oldWorkerNodeHash = WorkerNodeHash; - WorkerNodeHash = hash_create("Worker Node Hash", - maxTableSize, - &info, hashFlags); + newWorkerNodeHash = hash_create("Worker Node Hash", maxTableSize, &info, hashFlags); /* read the list from pg_dist_node */ workerNodeList = ReadWorkerNodes(includeNodesFromOtherClusters); + newWorkerNodeCount = list_length(workerNodeList); + newWorkerNodeArray = MemoryContextAlloc(CacheMemoryContext, + sizeof(WorkerNode *) * newWorkerNodeCount); + /* iterate over the worker node list */ foreach(workerNodeCell, workerNodeList) { @@ -2544,7 +2562,7 @@ InitializeWorkerNodeCache(void) /* search for the worker node in the hash, and then insert the values */ hashKey = (void *) currentNode; - workerNode = (WorkerNode *) hash_search(WorkerNodeHash, hashKey, + workerNode = (WorkerNode *) hash_search(newWorkerNodeHash, hashKey, HASH_ENTER, &handleFound); /* fill the newly allocated workerNode in the cache */ @@ -2558,6 +2576,8 @@ InitializeWorkerNodeCache(void) workerNode->nodeRole = currentNode->nodeRole; strlcpy(workerNode->nodeCluster, currentNode->nodeCluster, NAMEDATALEN); + newWorkerNodeArray[workerNodeIndex++] = workerNode; + if (handleFound) { ereport(WARNING, (errmsg("multiple lines for worker node: \"%s:%u\"", @@ -2570,7 +2590,16 @@ InitializeWorkerNodeCache(void) } /* now, safe to destroy the old hash */ - hash_destroy(oldWorkerNodeHash); + hash_destroy(WorkerNodeHash); + + if (WorkerNodeArray != NULL) + { + pfree(WorkerNodeArray); + } + + WorkerNodeCount = newWorkerNodeCount; + WorkerNodeArray = newWorkerNodeArray; + WorkerNodeHash = newWorkerNodeHash; }