mirror of https://github.com/citusdata/citus.git
Cache worker node array for faster iteration
parent
65fca44f4f
commit
40d715d494
|
@ -146,6 +146,8 @@ static HTAB *DistShardCacheHash = NULL;
|
||||||
|
|
||||||
/* Hash table for informations about worker nodes */
|
/* Hash table for informations about worker nodes */
|
||||||
static HTAB *WorkerNodeHash = NULL;
|
static HTAB *WorkerNodeHash = NULL;
|
||||||
|
static WorkerNode **WorkerNodeArray = NULL;
|
||||||
|
static int WorkerNodeCount = 0;
|
||||||
static bool workerNodeHashValid = false;
|
static bool workerNodeHashValid = false;
|
||||||
|
|
||||||
/* default value is -1, for coordinator it's 0 and for worker nodes > 0 */
|
/* default value is -1, for coordinator it's 0 and for worker nodes > 0 */
|
||||||
|
@ -168,6 +170,7 @@ static ShardInterval ** SortShardIntervalArray(ShardInterval **shardIntervalArra
|
||||||
shardIntervalSortCompareFunction);
|
shardIntervalSortCompareFunction);
|
||||||
static bool HasUniformHashDistribution(ShardInterval **shardIntervalArray,
|
static bool HasUniformHashDistribution(ShardInterval **shardIntervalArray,
|
||||||
int shardIntervalArrayLength);
|
int shardIntervalArrayLength);
|
||||||
|
static void PrepareWorkerNodeCache(void);
|
||||||
static bool HasUninitializedShardInterval(ShardInterval **sortedShardIntervalArray,
|
static bool HasUninitializedShardInterval(ShardInterval **sortedShardIntervalArray,
|
||||||
int shardCount);
|
int shardCount);
|
||||||
static bool CheckInstalledVersion(int elevel);
|
static bool CheckInstalledVersion(int elevel);
|
||||||
|
@ -511,15 +514,14 @@ ResolveGroupShardPlacement(GroupShardPlacement *groupShardPlacement,
|
||||||
static WorkerNode *
|
static WorkerNode *
|
||||||
LookupNodeForGroup(uint32 groupId)
|
LookupNodeForGroup(uint32 groupId)
|
||||||
{
|
{
|
||||||
WorkerNode *workerNode = NULL;
|
|
||||||
HASH_SEQ_STATUS status;
|
|
||||||
HTAB *workerNodeHash = GetWorkerNodeHash();
|
|
||||||
bool foundAnyNodes = false;
|
bool foundAnyNodes = false;
|
||||||
|
int workerNodeIndex = 0;
|
||||||
|
|
||||||
hash_seq_init(&status, workerNodeHash);
|
PrepareWorkerNodeCache();
|
||||||
|
|
||||||
while ((workerNode = hash_seq_search(&status)) != NULL)
|
for (workerNodeIndex = 0; workerNodeIndex < WorkerNodeCount; workerNodeIndex++)
|
||||||
{
|
{
|
||||||
|
WorkerNode *workerNode = WorkerNodeArray[workerNodeIndex];
|
||||||
uint32 workerNodeGroupId = workerNode->groupId;
|
uint32 workerNodeGroupId = workerNode->groupId;
|
||||||
if (workerNodeGroupId != groupId)
|
if (workerNodeGroupId != groupId)
|
||||||
{
|
{
|
||||||
|
@ -530,7 +532,6 @@ LookupNodeForGroup(uint32 groupId)
|
||||||
|
|
||||||
if (WorkerNodeIsReadable(workerNode))
|
if (WorkerNodeIsReadable(workerNode))
|
||||||
{
|
{
|
||||||
hash_seq_term(&status);
|
|
||||||
return workerNode;
|
return workerNode;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -2463,12 +2464,27 @@ InitializeDistTableCache(void)
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* GetWorkerNodeHash is a wrapper around InitializeWorkerNodeCache(). It
|
* GetWorkerNodeHash returns the worker node data as a hash with the nodename and
|
||||||
* triggers InitializeWorkerNodeCache when the workerHash is invalid. Otherwise,
|
* nodeport as a key.
|
||||||
* it returns the hash.
|
*
|
||||||
|
* The hash is returned from the cache, if the cache is not (yet) valid, it is first
|
||||||
|
* rebuilt.
|
||||||
*/
|
*/
|
||||||
HTAB *
|
HTAB *
|
||||||
GetWorkerNodeHash(void)
|
GetWorkerNodeHash(void)
|
||||||
|
{
|
||||||
|
PrepareWorkerNodeCache();
|
||||||
|
|
||||||
|
return WorkerNodeHash;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* PrepareWorkerNodeCache makes sure the worker node data from pg_dist_node is cached,
|
||||||
|
* if it is not already cached.
|
||||||
|
*/
|
||||||
|
static void
|
||||||
|
PrepareWorkerNodeCache(void)
|
||||||
{
|
{
|
||||||
InitializeCaches(); /* ensure relevant callbacks are registered */
|
InitializeCaches(); /* ensure relevant callbacks are registered */
|
||||||
|
|
||||||
|
@ -2490,8 +2506,6 @@ GetWorkerNodeHash(void)
|
||||||
|
|
||||||
workerNodeHashValid = true;
|
workerNodeHashValid = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
return WorkerNodeHash;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -2510,6 +2524,9 @@ InitializeWorkerNodeCache(void)
|
||||||
int hashFlags = 0;
|
int hashFlags = 0;
|
||||||
long maxTableSize = (long) MaxWorkerNodesTracked;
|
long maxTableSize = (long) MaxWorkerNodesTracked;
|
||||||
bool includeNodesFromOtherClusters = false;
|
bool includeNodesFromOtherClusters = false;
|
||||||
|
int newWorkerNodeCount = 0;
|
||||||
|
WorkerNode **newWorkerNodeArray = NULL;
|
||||||
|
int workerNodeIndex = 0;
|
||||||
|
|
||||||
InitializeCaches();
|
InitializeCaches();
|
||||||
|
|
||||||
|
@ -2534,6 +2551,10 @@ InitializeWorkerNodeCache(void)
|
||||||
/* read the list from pg_dist_node */
|
/* read the list from pg_dist_node */
|
||||||
workerNodeList = ReadWorkerNodes(includeNodesFromOtherClusters);
|
workerNodeList = ReadWorkerNodes(includeNodesFromOtherClusters);
|
||||||
|
|
||||||
|
newWorkerNodeCount = list_length(workerNodeList);
|
||||||
|
newWorkerNodeArray = MemoryContextAlloc(CacheMemoryContext,
|
||||||
|
sizeof(WorkerNode *) * newWorkerNodeCount);
|
||||||
|
|
||||||
/* iterate over the worker node list */
|
/* iterate over the worker node list */
|
||||||
foreach(workerNodeCell, workerNodeList)
|
foreach(workerNodeCell, workerNodeList)
|
||||||
{
|
{
|
||||||
|
@ -2558,6 +2579,8 @@ InitializeWorkerNodeCache(void)
|
||||||
workerNode->nodeRole = currentNode->nodeRole;
|
workerNode->nodeRole = currentNode->nodeRole;
|
||||||
strlcpy(workerNode->nodeCluster, currentNode->nodeCluster, NAMEDATALEN);
|
strlcpy(workerNode->nodeCluster, currentNode->nodeCluster, NAMEDATALEN);
|
||||||
|
|
||||||
|
newWorkerNodeArray[workerNodeIndex++] = workerNode;
|
||||||
|
|
||||||
if (handleFound)
|
if (handleFound)
|
||||||
{
|
{
|
||||||
ereport(WARNING, (errmsg("multiple lines for worker node: \"%s:%u\"",
|
ereport(WARNING, (errmsg("multiple lines for worker node: \"%s:%u\"",
|
||||||
|
@ -2571,6 +2594,15 @@ InitializeWorkerNodeCache(void)
|
||||||
|
|
||||||
/* now, safe to destroy the old hash */
|
/* now, safe to destroy the old hash */
|
||||||
hash_destroy(oldWorkerNodeHash);
|
hash_destroy(oldWorkerNodeHash);
|
||||||
|
|
||||||
|
if (WorkerNodeArray != NULL)
|
||||||
|
{
|
||||||
|
pfree(WorkerNodeArray);
|
||||||
|
}
|
||||||
|
|
||||||
|
WorkerNodeCount = newWorkerNodeCount;
|
||||||
|
WorkerNodeArray = newWorkerNodeArray;
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue