Merge pull request #1996 from citusdata/cache_worker_node_array

Cache worker node array for faster iteration
pull/2007/head
Marco Slot 2018-02-12 15:26:48 -08:00 committed by GitHub
commit 6ce4795f1c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 47 additions and 18 deletions

View File

@ -146,6 +146,8 @@ static HTAB *DistShardCacheHash = NULL;
/* Hash table for informations about worker nodes */ /* Hash table for informations about worker nodes */
static HTAB *WorkerNodeHash = NULL; static HTAB *WorkerNodeHash = NULL;
static WorkerNode **WorkerNodeArray = NULL;
static int WorkerNodeCount = 0;
static bool workerNodeHashValid = false; static bool workerNodeHashValid = false;
/* default value is -1, for coordinator it's 0 and for worker nodes > 0 */ /* default value is -1, for coordinator it's 0 and for worker nodes > 0 */
@ -168,6 +170,7 @@ static ShardInterval ** SortShardIntervalArray(ShardInterval **shardIntervalArra
shardIntervalSortCompareFunction); shardIntervalSortCompareFunction);
static bool HasUniformHashDistribution(ShardInterval **shardIntervalArray, static bool HasUniformHashDistribution(ShardInterval **shardIntervalArray,
int shardIntervalArrayLength); int shardIntervalArrayLength);
static void PrepareWorkerNodeCache(void);
static bool HasUninitializedShardInterval(ShardInterval **sortedShardIntervalArray, static bool HasUninitializedShardInterval(ShardInterval **sortedShardIntervalArray,
int shardCount); int shardCount);
static bool CheckInstalledVersion(int elevel); static bool CheckInstalledVersion(int elevel);
@ -511,15 +514,14 @@ ResolveGroupShardPlacement(GroupShardPlacement *groupShardPlacement,
static WorkerNode * static WorkerNode *
LookupNodeForGroup(uint32 groupId) LookupNodeForGroup(uint32 groupId)
{ {
WorkerNode *workerNode = NULL;
HASH_SEQ_STATUS status;
HTAB *workerNodeHash = GetWorkerNodeHash();
bool foundAnyNodes = false; bool foundAnyNodes = false;
int workerNodeIndex = 0;
hash_seq_init(&status, workerNodeHash); PrepareWorkerNodeCache();
while ((workerNode = hash_seq_search(&status)) != NULL) for (workerNodeIndex = 0; workerNodeIndex < WorkerNodeCount; workerNodeIndex++)
{ {
WorkerNode *workerNode = WorkerNodeArray[workerNodeIndex];
uint32 workerNodeGroupId = workerNode->groupId; uint32 workerNodeGroupId = workerNode->groupId;
if (workerNodeGroupId != groupId) if (workerNodeGroupId != groupId)
{ {
@ -530,7 +532,6 @@ LookupNodeForGroup(uint32 groupId)
if (WorkerNodeIsReadable(workerNode)) if (WorkerNodeIsReadable(workerNode))
{ {
hash_seq_term(&status);
return workerNode; return workerNode;
} }
} }
@ -2463,12 +2464,27 @@ InitializeDistTableCache(void)
/* /*
* GetWorkerNodeHash is a wrapper around InitializeWorkerNodeCache(). It * GetWorkerNodeHash returns the worker node data as a hash with the nodename and
* triggers InitializeWorkerNodeCache when the workerHash is invalid. Otherwise, * nodeport as a key.
* it returns the hash. *
* The hash is returned from the cache, if the cache is not (yet) valid, it is first
* rebuilt.
*/ */
HTAB * HTAB *
GetWorkerNodeHash(void) GetWorkerNodeHash(void)
{
PrepareWorkerNodeCache();
return WorkerNodeHash;
}
/*
* PrepareWorkerNodeCache makes sure the worker node data from pg_dist_node is cached,
* if it is not already cached.
*/
static void
PrepareWorkerNodeCache(void)
{ {
InitializeCaches(); /* ensure relevant callbacks are registered */ InitializeCaches(); /* ensure relevant callbacks are registered */
@ -2490,8 +2506,6 @@ GetWorkerNodeHash(void)
workerNodeHashValid = true; workerNodeHashValid = true;
} }
return WorkerNodeHash;
} }
@ -2503,13 +2517,16 @@ GetWorkerNodeHash(void)
static void static void
InitializeWorkerNodeCache(void) InitializeWorkerNodeCache(void)
{ {
HTAB *oldWorkerNodeHash = NULL; HTAB *newWorkerNodeHash = NULL;
List *workerNodeList = NIL; List *workerNodeList = NIL;
ListCell *workerNodeCell = NULL; ListCell *workerNodeCell = NULL;
HASHCTL info; HASHCTL info;
int hashFlags = 0; int hashFlags = 0;
long maxTableSize = (long) MaxWorkerNodesTracked; long maxTableSize = (long) MaxWorkerNodesTracked;
bool includeNodesFromOtherClusters = false; bool includeNodesFromOtherClusters = false;
int newWorkerNodeCount = 0;
WorkerNode **newWorkerNodeArray = NULL;
int workerNodeIndex = 0;
InitializeCaches(); InitializeCaches();
@ -2526,14 +2543,15 @@ InitializeWorkerNodeCache(void)
info.match = WorkerNodeCompare; info.match = WorkerNodeCompare;
hashFlags = HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT | HASH_COMPARE; hashFlags = HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT | HASH_COMPARE;
oldWorkerNodeHash = WorkerNodeHash; newWorkerNodeHash = hash_create("Worker Node Hash", maxTableSize, &info, hashFlags);
WorkerNodeHash = hash_create("Worker Node Hash",
maxTableSize,
&info, hashFlags);
/* read the list from pg_dist_node */ /* read the list from pg_dist_node */
workerNodeList = ReadWorkerNodes(includeNodesFromOtherClusters); workerNodeList = ReadWorkerNodes(includeNodesFromOtherClusters);
newWorkerNodeCount = list_length(workerNodeList);
newWorkerNodeArray = MemoryContextAlloc(CacheMemoryContext,
sizeof(WorkerNode *) * newWorkerNodeCount);
/* iterate over the worker node list */ /* iterate over the worker node list */
foreach(workerNodeCell, workerNodeList) foreach(workerNodeCell, workerNodeList)
{ {
@ -2544,7 +2562,7 @@ InitializeWorkerNodeCache(void)
/* search for the worker node in the hash, and then insert the values */ /* search for the worker node in the hash, and then insert the values */
hashKey = (void *) currentNode; hashKey = (void *) currentNode;
workerNode = (WorkerNode *) hash_search(WorkerNodeHash, hashKey, workerNode = (WorkerNode *) hash_search(newWorkerNodeHash, hashKey,
HASH_ENTER, &handleFound); HASH_ENTER, &handleFound);
/* fill the newly allocated workerNode in the cache */ /* fill the newly allocated workerNode in the cache */
@ -2558,6 +2576,8 @@ InitializeWorkerNodeCache(void)
workerNode->nodeRole = currentNode->nodeRole; workerNode->nodeRole = currentNode->nodeRole;
strlcpy(workerNode->nodeCluster, currentNode->nodeCluster, NAMEDATALEN); strlcpy(workerNode->nodeCluster, currentNode->nodeCluster, NAMEDATALEN);
newWorkerNodeArray[workerNodeIndex++] = workerNode;
if (handleFound) if (handleFound)
{ {
ereport(WARNING, (errmsg("multiple lines for worker node: \"%s:%u\"", ereport(WARNING, (errmsg("multiple lines for worker node: \"%s:%u\"",
@ -2570,7 +2590,16 @@ InitializeWorkerNodeCache(void)
} }
/* now, safe to destroy the old hash */ /* now, safe to destroy the old hash */
hash_destroy(oldWorkerNodeHash); hash_destroy(WorkerNodeHash);
if (WorkerNodeArray != NULL)
{
pfree(WorkerNodeArray);
}
WorkerNodeCount = newWorkerNodeCount;
WorkerNodeArray = newWorkerNodeArray;
WorkerNodeHash = newWorkerNodeHash;
} }