pull/801/merge
Brian Cloutier 2016-09-23 17:51:32 +00:00 committed by GitHub
commit fdfffc208c
9 changed files with 147 additions and 195 deletions

View File

@ -36,7 +36,7 @@ int MaxWorkerNodesTracked = 2048; /* determines worker node hash table size *
/* Local functions forward declarations */
static char * ClientHostAddress(StringInfo remoteHostStringInfo);
static WorkerNode * FindRandomNodeNotInList(HTAB *WorkerNodesHash,
static WorkerNode * FindRandomNodeNotInList(List *workerNodesList,
List *currentNodeList);
static bool ListMember(List *currentList, WorkerNode *workerNode);
static int WorkerNodeCompare(const void *lhsKey, const void *rhsKey, Size keySize);
@ -60,22 +60,26 @@ WorkerNode *
WorkerGetRandomCandidateNode(List *currentNodeList)
{
WorkerNode *workerNode = NULL;
HTAB *workerNodeHash = GetWorkerNodeHash();
WorkerNode *resultWorkerNode = NULL;
List *workerNodeList = GetWorkerNodeList();
/*
* We check if the shard has already been placed on all nodes known to us.
* This check is rather defensive, and has the drawback of materializing the
* full worker node list just to determine the number of live nodes.
*/
uint32 currentNodeCount = list_length(currentNodeList);
uint32 liveNodeCount = WorkerGetLiveNodeCount();
uint32 liveNodeCount = list_length(workerNodeList);
if (currentNodeCount >= liveNodeCount)
{
return NULL;
}
workerNode = FindRandomNodeNotInList(workerNodeHash, currentNodeList);
return workerNode;
workerNode = FindRandomNodeNotInList(workerNodeList, currentNodeList);
resultWorkerNode = palloc(sizeof(WorkerNode));
memcpy(resultWorkerNode, workerNode, sizeof(WorkerNode));
list_free_deep(workerNodeList);
return resultWorkerNode;
}
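
Because GetWorkerNodeList() now hands back a deep-copied list that the caller must free, any node that is returned to the caller has to be copied out before the list is released. A minimal sketch of that pattern, assuming the surrounding Citus/PostgreSQL headers; the helper name PickFirstWorker is hypothetical and not part of this change:

#include "postgres.h"
#include "nodes/pg_list.h"

/* PickFirstWorker illustrates the copy-then-free pattern: return a palloc'd
 * copy so list_free_deep() cannot invalidate the node we hand back. */
static WorkerNode *
PickFirstWorker(void)
{
	List *workerNodeList = GetWorkerNodeList();
	WorkerNode *firstNode = (WorkerNode *) linitial(workerNodeList);
	WorkerNode *nodeCopy = (WorkerNode *) palloc(sizeof(WorkerNode));

	memcpy(nodeCopy, firstNode, sizeof(WorkerNode));
	list_free_deep(workerNodeList);

	return nodeCopy;
}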
@ -229,24 +233,25 @@ ClientHostAddress(StringInfo clientHostStringInfo)
WorkerNode *
WorkerGetNodeWithName(const char *hostname)
{
WorkerNode *workerNode = NULL;
HASH_SEQ_STATUS status;
HTAB *workerNodeHash = GetWorkerNodeHash();
WorkerNode *resultWorkerNode = NULL;
ListCell *workerNodeCell = NULL;
List *workerNodeList = GetWorkerNodeList();
hash_seq_init(&status, workerNodeHash);
while ((workerNode = hash_seq_search(&status)) != NULL)
foreach(workerNodeCell, workerNodeList)
{
WorkerNode *workerNode = (WorkerNode *) lfirst(workerNodeCell);
int nameCompare = strncmp(workerNode->workerName, hostname, WORKER_LENGTH);
if (nameCompare == 0)
{
/* we need to terminate the scan since we break */
hash_seq_term(&status);
resultWorkerNode = palloc(sizeof(WorkerNode));
memcpy(resultWorkerNode, workerNode, sizeof(WorkerNode));
break;
}
}
return workerNode;
list_free_deep(workerNodeList);
return resultWorkerNode;
}
@ -254,41 +259,29 @@ WorkerGetNodeWithName(const char *hostname)
uint32
WorkerGetLiveNodeCount(void)
{
WorkerNode *workerNode = NULL;
uint32 liveWorkerCount = 0;
HASH_SEQ_STATUS status;
HTAB *workerNodeHash = GetWorkerNodeHash();
List *workerNodeList = GetWorkerNodeList();
hash_seq_init(&status, workerNodeHash);
/* TODO: this is wasteful; we build an entire list just to count it and
 * immediately free it. */
while ((workerNode = hash_seq_search(&status)) != NULL)
{
liveWorkerCount++;
}
liveWorkerCount = list_length(workerNodeList);
list_free_deep(workerNodeList);
return liveWorkerCount;
}
/*
* WorkerNodeList iterates over the hash table that includes the worker nodes, and adds
* them to a list which is returned.
* WorkerNodeList returns a list of worker nodes; callers are responsible for
* freeing it after use.
*
* TODO: add the corresponding frees to all call sites?
*/
List *
WorkerNodeList(void)
{
List *workerNodeList = NIL;
WorkerNode *workerNode = NULL;
HTAB *workerNodeHash = GetWorkerNodeHash();
HASH_SEQ_STATUS status;
hash_seq_init(&status, workerNodeHash);
while ((workerNode = hash_seq_search(&status)) != NULL)
{
workerNodeList = lappend(workerNodeList, workerNode);
}
List *workerNodeList = GetWorkerNodeList();
return workerNodeList;
}
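
As the comment above notes, callers of WorkerNodeList() own the returned list and must free it. A minimal usage sketch, assuming the standard PostgreSQL list API; the surrounding function is illustrative only:

/* CountWorkersInGroup is an illustrative caller: iterate over the returned
 * list, then release the per-call copies with list_free_deep(). */
static uint32
CountWorkersInGroup(uint32 groupId)
{
	uint32 workerCount = 0;
	ListCell *workerNodeCell = NULL;
	List *workerNodeList = WorkerNodeList();

	foreach(workerNodeCell, workerNodeList)
	{
		WorkerNode *workerNode = (WorkerNode *) lfirst(workerNodeCell);

		if (workerNode->groupId == groupId)
		{
			workerCount++;
		}
	}

	list_free_deep(workerNodeList);

	return workerCount;
}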
@ -301,24 +294,24 @@ WorkerNodeList(void)
bool
WorkerNodeActive(const char *nodeName, uint32 nodePort)
{
WorkerNode *workerNode = NULL;
HASH_SEQ_STATUS status;
HTAB *workerNodeHash = GetWorkerNodeHash();
bool nodeActive = false;
ListCell *workerNodeCell = NULL;
List *workerNodeList = GetWorkerNodeList();
hash_seq_init(&status, workerNodeHash);
while ((workerNode = hash_seq_search(&status)) != NULL)
foreach(workerNodeCell, workerNodeList)
{
WorkerNode *workerNode = (WorkerNode *) lfirst(workerNodeCell);
if (strncmp(workerNode->workerName, nodeName, WORKER_LENGTH) == 0 &&
workerNode->workerPort == nodePort)
{
/* we need to terminate the scan since we break */
hash_seq_term(&status);
nodeActive = true;
break;
}
}
return workerNode != NULL;
list_free_deep(workerNodeList);
return nodeActive;
}
@ -327,68 +320,47 @@ WorkerNodeActive(const char *nodeName, uint32 nodePort)
* a member of the current node list. The caller is responsible for making the
* necessary node count checks to ensure that such a node exists.
*
* Note that this function has a selection bias towards nodes whose positions in
* the shared hash are sequentially adjacent to the positions of nodes that are
* in the current node list. This bias follows from our decision to first pick a
* random node in the hash, and if that node is a member of the current list, to
* simply iterate to the next node in the hash. Overall, this approach trades in
* some selection bias for simplicity in design and for bounded execution time.
* This function runs in O(length(workerNodesList) * length(currentNodeList))
* time. The quadratic behavior is unfortunate, but it keeps the implementation
* simple.
*/
static WorkerNode *
FindRandomNodeNotInList(HTAB *WorkerNodesHash, List *currentNodeList)
FindRandomNodeNotInList(List *workerNodesList, List *currentNodeList)
{
WorkerNode *workerNode = NULL;
HASH_SEQ_STATUS status;
uint32 workerNodeCount = 0;
uint32 currentNodeCount PG_USED_FOR_ASSERTS_ONLY = 0;
bool lookForWorkerNode = true;
ListCell *workerNodeCell;
uint32 workerPosition = 0;
uint32 workerIndex = 0;
workerNodeCount = hash_get_num_entries(WorkerNodesHash);
currentNodeCount = list_length(currentNodeList);
uint32 workerNodeCount = list_length(workerNodesList);
uint32 currentNodeCount = list_length(currentNodeList);
Assert(workerNodeCount > currentNodeCount);
/*
* We determine a random position within the worker hash between [1, N],
* assuming that the number of elements in the hash is N. We then get to
* this random position by iterating over the worker hash. Please note that
* the random seed has already been set by the postmaster when starting up.
* Please note that the random seed has already been set by the postmaster when
* starting up.
*/
workerPosition = (random() % workerNodeCount) + 1;
hash_seq_init(&status, WorkerNodesHash);
workerPosition = random() % (workerNodeCount - currentNodeCount);
for (workerIndex = 0; workerIndex < workerPosition; workerIndex++)
{
workerNode = (WorkerNode *) hash_seq_search(&status);
}
while (lookForWorkerNode)
foreach(workerNodeCell, workerNodesList)
{
WorkerNode *workerNode = (WorkerNode *) lfirst(workerNodeCell);
bool listMember = ListMember(currentNodeList, workerNode);
if (!listMember)
if (listMember)
{
lookForWorkerNode = false;
continue;
}
else
{
/* iterate to the next worker node in the hash */
workerNode = (WorkerNode *) hash_seq_search(&status);
/* reached end of hash; start from the beginning */
if (workerNode == NULL)
{
hash_seq_init(&status, WorkerNodesHash);
workerNode = (WorkerNode *) hash_seq_search(&status);
}
if (workerIndex == workerPosition)
{
return workerNode;
}
workerIndex++;
}
/* we stopped scanning before completion; therefore clean up scan */
hash_seq_term(&status);
return workerNode;
return NULL;
}
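
The new selection draws an index uniformly over the eligible (not-in-current-list) nodes and walks the list until that many eligible nodes have been skipped, which removes the adjacency bias of the old hash walk. A standalone sketch of the same idea over plain C arrays, for illustration only:

#include <stdlib.h>
#include <stdbool.h>

/* Pick a uniformly random element of candidates[] that is not excluded.
 * eligibleCount must equal the number of entries with excluded[i] == false. */
static int
PickRandomEligible(int candidates[], bool excluded[], int candidateCount,
				   int eligibleCount)
{
	int targetPosition = random() % eligibleCount;
	int eligibleIndex = 0;
	int i = 0;

	for (i = 0; i < candidateCount; i++)
	{
		if (excluded[i])
		{
			continue;
		}

		if (eligibleIndex == targetPosition)
		{
			return candidates[i];
		}

		eligibleIndex++;
	}

	return -1; /* unreachable if eligibleCount is correct */
}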

View File

@ -62,8 +62,8 @@ static Oid extraDataContainerFuncId = InvalidOid;
/* Hash table for informations about each partition */
static HTAB *DistTableCacheHash = NULL;
/* Hash table for informations about worker nodes */
static HTAB *WorkerNodeHash = NULL;
/* cached list of active workers */
static List *CachedNodeList = NULL;
/* built first time through in InitializePartitionCache */
static ScanKeyData DistPartitionScanKey[1];
@ -967,19 +967,33 @@ InitializeDistTableCache(void)
/*
* GetWorkerNodeHash is a wrapper around InitializeWorkerNodeCache(). It
* GetWorkerNodeList is a wrapper around InitializeWorkerNodeCache(). It
* triggers InitializeWorkerNodeCache() when CachedNodeList is NULL, and then
* returns a deep copy of the cached list.
*/
HTAB *
GetWorkerNodeHash(void)
List *
GetWorkerNodeList(void)
{
if (WorkerNodeHash == NULL)
List *resultList = NULL;
ListCell *workerNodeCell;
if (CachedNodeList == NULL)
{
InitializeWorkerNodeCache();
}
return WorkerNodeHash;
/* deep copy the list so invalidations, which free the list, don't cause crashes */
foreach(workerNodeCell, CachedNodeList)
{
WorkerNode *oldNode = (WorkerNode *) lfirst(workerNodeCell);
WorkerNode *newNode = palloc(sizeof(WorkerNode));
memcpy(newNode, oldNode, sizeof(WorkerNode));
resultList = lappend(resultList, newNode);
}
return resultList;
}
@ -991,12 +1005,10 @@ GetWorkerNodeHash(void)
static void
InitializeWorkerNodeCache(void)
{
MemoryContext oldContext;
static bool invalidationRegistered = false;
List *workerNodeList = NIL;
ListCell *workerNodeCell = NULL;
HASHCTL info;
int hashFlags = 0;
long maxTableSize = (long) MaxWorkerNodesTracked;
List *workerList = NULL;
ListCell *workerListCell;
/* make sure we've initialized CacheMemoryContext */
if (CacheMemoryContext == NULL)
@ -1004,58 +1016,24 @@ InitializeWorkerNodeCache(void)
CreateCacheMemoryContext();
}
/*
* Create the hash that holds the worker nodes. The key is the unique nodeid
* field.
*/
memset(&info, 0, sizeof(info));
info.keysize = sizeof(uint32);
info.entrysize = sizeof(WorkerNode);
info.hcxt = CacheMemoryContext;
info.hash = tag_hash;
hashFlags = HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT;
WorkerNodeHash = hash_create("Worker Node Hash",
maxTableSize,
&info, hashFlags);
Assert(CachedNodeList == NULL);
/* read the worker list from pg_dist_node */
workerNodeList = ReadWorkerNodes();
workerList = ReadWorkerNodes();
/* iterate over the worker node list */
foreach(workerNodeCell, workerNodeList)
oldContext = MemoryContextSwitchTo(CacheMemoryContext);
foreach(workerListCell, workerList)
{
WorkerNode *workerNode = NULL;
WorkerNode *currentNode = lfirst(workerNodeCell);
void *hashKey = NULL;
bool handleFound = false;
WorkerNode *workerNode = (WorkerNode *) lfirst(workerListCell);
WorkerNode *newWorkerNode = palloc(sizeof(WorkerNode));
/*
* Search for the worker node in the hash, and then insert the
* values. When searching, we make the hashKey the unique nodeid.
*/
hashKey = (void *) &currentNode->nodeId;
workerNode = (WorkerNode *) hash_search(WorkerNodeHash, hashKey,
HASH_ENTER, &handleFound);
/* fill the newly allocated workerNode in the cache */
strlcpy(workerNode->workerName, currentNode->workerName, WORKER_LENGTH);
workerNode->workerPort = currentNode->workerPort;
workerNode->groupId = currentNode->groupId;
if (handleFound)
{
ereport(WARNING, (errmsg("multiple lines for worker node: \"%s:%u\"",
workerNode->workerName,
workerNode->workerPort)));
}
workerNode->workerPort = currentNode->workerPort;
/* we do not need the currentNode anymore */
pfree(currentNode);
memcpy(newWorkerNode, workerNode, sizeof(WorkerNode));
CachedNodeList = lappend(CachedNodeList, newWorkerNode);
}
MemoryContextSwitchTo(oldContext);
/* prevent multiple invalidation registrations */
if (!invalidationRegistered)
{
@ -1181,18 +1159,18 @@ InvalidateDistRelationCacheCallback(Datum argument, Oid relationId)
/*
* InvalidateNodeRelationCacheCallback destroys the WorkerNodeHash when
* any change happens on pg_dist_node table. It also set WorkerNodeHash to
* InvalidateNodeRelationCacheCallback destroys the CachedNodeList when
* any change happens on the pg_dist_node table. It also sets CachedNodeList to
* NULL, so that subsequent accesses rebuild the list from pg_dist_node
* from scratch.
*/
static void
InvalidateNodeRelationCacheCallback(Datum argument, Oid relationId)
{
if (WorkerNodeHash != NULL && relationId == DistNodeRelationId())
if (CachedNodeList != NULL && relationId == DistNodeRelationId())
{
hash_destroy(WorkerNodeHash);
WorkerNodeHash = NULL;
list_free_deep(CachedNodeList);
CachedNodeList = NULL;
}
}
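
For context, a callback like the one above is typically registered once against relcache invalidations. A minimal sketch assuming PostgreSQL's utils/inval.h API; the wrapper function below is hypothetical and only shows the register-once pattern guarded by a static flag, as in the diff above:

#include "postgres.h"
#include "utils/inval.h"

/* RegisterNodeCacheInvalidation is a hypothetical wrapper: register the
 * relcache callback exactly once for the lifetime of the backend. */
static void
RegisterNodeCacheInvalidation(void)
{
	static bool invalidationRegistered = false;

	if (!invalidationRegistered)
	{
		CacheRegisterRelcacheCallback(InvalidateNodeRelationCacheCallback,
									  (Datum) 0);
		invalidationRegistered = true;
	}
}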

View File

@ -215,24 +215,26 @@ GenerateNodeTuple(WorkerNode *workerNode)
static WorkerNode *
FindWorkerNode(char *nodeName, int32 nodePort)
{
WorkerNode *workerNode = NULL;
HTAB *workerNodeHash = GetWorkerNodeHash();
HASH_SEQ_STATUS status;
ListCell *workerNodeCell = NULL;
hash_seq_init(&status, workerNodeHash);
while ((workerNode = hash_seq_search(&status)) != NULL)
List *workerNodeList = GetWorkerNodeList();
foreach(workerNodeCell, workerNodeList)
{
WorkerNode *workerNode = (WorkerNode *) lfirst(workerNodeCell);
if (strncasecmp(nodeName, workerNode->workerName, WORKER_LENGTH) == 0 &&
nodePort == workerNode->workerPort)
{
/* we need to terminate the scan since we break */
hash_seq_term(&status);
break;
WorkerNode *resultWorkerNode = palloc(sizeof(WorkerNode));
memcpy(resultWorkerNode, workerNode, sizeof(WorkerNode));
list_free_deep(workerNodeList);
return resultWorkerNode;
}
}
return workerNode;
list_free_deep(workerNodeList);
return NULL;
}
@ -271,14 +273,12 @@ static uint32
GetMaxGroupId()
{
uint32 maxGroupId = 0;
WorkerNode *workerNode = NULL;
HTAB *workerNodeHash = GetWorkerNodeHash();
HASH_SEQ_STATUS status;
ListCell *workerNodeCell = NULL;
hash_seq_init(&status, workerNodeHash);
while ((workerNode = hash_seq_search(&status)) != NULL)
List *workerNodeList = GetWorkerNodeList();
foreach(workerNodeCell, workerNodeList)
{
WorkerNode *workerNode = (WorkerNode *) lfirst(workerNodeCell);
uint32 workerNodeGroupId = workerNode->groupId;
if (workerNodeGroupId > maxGroupId)
@ -287,6 +287,7 @@ GetMaxGroupId()
}
}
list_free_deep(workerNodeList);
return maxGroupId;
}
@ -299,14 +300,12 @@ static uint64
GetNodeCountInGroup(uint32 groupId)
{
uint64 elementCountInGroup = 0;
WorkerNode *workerNode = NULL;
HTAB *workerNodeHash = GetWorkerNodeHash();
HASH_SEQ_STATUS status;
ListCell *workerNodeCell;
List *workerNodeList = GetWorkerNodeList();
hash_seq_init(&status, workerNodeHash);
while ((workerNode = hash_seq_search(&status)) != NULL)
foreach(workerNodeCell, workerNodeList)
{
WorkerNode *workerNode = (WorkerNode *) lfirst(workerNodeCell);
uint32 workerNodeGroupId = workerNode->groupId;
if (workerNodeGroupId == groupId)
@ -315,6 +314,7 @@ GetNodeCountInGroup(uint32 groupId)
}
}
list_free_deep(workerNodeList);
return elementCountInGroup;
}

View File

@ -60,8 +60,8 @@ extern void CitusInvalidateNodeCache(void);
extern bool CitusHasBeenLoaded(void);
/* access WorkerNodeHash */
extern HTAB * GetWorkerNodeHash(void);
/* access WorkerNodeList */
extern List * GetWorkerNodeList(void);
/* relation oids */
extern Oid DistPartitionRelationId(void);

View File

@ -25,12 +25,14 @@
/*
* In memory representation of pg_dist_node table elements. The elements are hold in
* WorkerNodeHash table.
* In memory representation of pg_dist_node table elements.
*
* Caution: In some places we use memcpy to copy WorkerNodes. Those sites will have to be
* changed if you add any pointers to this structure.
*/
typedef struct WorkerNode
{
uint32 nodeId; /* node's unique id, key of the hash table */
uint32 nodeId; /* node's unique id */
uint32 workerPort; /* node's port */
char workerName[WORKER_LENGTH]; /* node's name */
uint32 groupId; /* node's groupId; same for the nodes that are in the same group */
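
Given the caution above, one option (a sketch, not part of this change) is to funnel all copies through a single hypothetical helper, so that only one site needs updating if pointer fields are ever added to WorkerNode:

/* CopyWorkerNode is a hypothetical helper: today a flat memcpy suffices
 * because WorkerNode contains no pointer fields. */
static WorkerNode *
CopyWorkerNode(WorkerNode *workerNode)
{
	WorkerNode *nodeCopy = (WorkerNode *) palloc(sizeof(WorkerNode));

	memcpy(nodeCopy, workerNode, sizeof(WorkerNode));

	return nodeCopy;
}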

View File

@ -1,15 +1,15 @@
-- Tests functions related to cluster membership
-- add the nodes to the cluster
SELECT master_add_node('localhost', :worker_1_port);
master_add_node
-----------------------
(1,1,localhost,57637)
(1 row)
SELECT master_add_node('localhost', :worker_2_port);
master_add_node
-----------------------
(2,2,localhost,57638)
(1,1,localhost,57638)
(1 row)
SELECT master_add_node('localhost', :worker_1_port);
master_add_node
-----------------------
(2,2,localhost,57637)
(1 row)
-- get the active nodes
@ -24,7 +24,7 @@ SELECT master_get_active_worker_nodes();
SELECT master_add_node('localhost', :worker_1_port);
master_add_node
-----------------------
(1,1,localhost,57637)
(2,2,localhost,57637)
(1 row)
-- get the active nodes

View File

@ -66,16 +66,16 @@ SELECT * FROM pg_dist_shard_placement;
DROP EXTENSION citus;
CREATE EXTENSION citus;
-- re-add the nodes to the cluster
SELECT master_add_node('localhost', :worker_1_port);
master_add_node
-----------------------
(1,1,localhost,57637)
(1 row)
SELECT master_add_node('localhost', :worker_2_port);
master_add_node
-----------------------
(2,2,localhost,57638)
(1,1,localhost,57638)
(1 row)
SELECT master_add_node('localhost', :worker_1_port);
master_add_node
-----------------------
(2,2,localhost,57637)
(1 row)
-- create a table with a SERIAL column

View File

@ -1,8 +1,8 @@
-- Tests functions related to cluster membership
-- add the nodes to the cluster
SELECT master_add_node('localhost', :worker_1_port);
SELECT master_add_node('localhost', :worker_2_port);
SELECT master_add_node('localhost', :worker_1_port);
-- get the active nodes
SELECT master_get_active_worker_nodes();

View File

@ -46,8 +46,8 @@ DROP EXTENSION citus;
CREATE EXTENSION citus;
-- re-add the nodes to the cluster
SELECT master_add_node('localhost', :worker_1_port);
SELECT master_add_node('localhost', :worker_2_port);
SELECT master_add_node('localhost', :worker_1_port);
-- create a table with a SERIAL column
CREATE TABLE testserialtable(id serial, group_id integer);