citus/src/backend/distributed/operations/worker_node_manager.c

559 lines
14 KiB
C

/*-------------------------------------------------------------------------
*
* worker_node_manager.c
* Routines for reading worker nodes from membership file, and allocating
* candidate nodes for shard placement.
*
* Copyright (c) Citus Data, Inc.
*
* $Id$
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "miscadmin.h"
#include "commands/dbcommands.h"
#include "distributed/coordinator_protocol.h"
#include "distributed/hash_helpers.h"
#include "distributed/listutils.h"
#include "distributed/metadata_cache.h"
#include "distributed/multi_client_executor.h"
#include "distributed/worker_manager.h"
#include "libpq/hba.h"
#include "common/ip.h"
#include "libpq/libpq-be.h"
#include "postmaster/postmaster.h"
#include "storage/fd.h"
#include "storage/ipc.h"
#include "storage/lmgr.h"
#include "storage/shmem.h"
#include "utils/guc.h"
#include "utils/hsearch.h"
#include "utils/memutils.h"
/* Config variables managed via guc.c */
/*
 * NOTE(review): presumably the path of the worker membership file mentioned in
 * the file header; it is not read anywhere in the visible part of this file.
 */
char *WorkerListFileName;
int MaxWorkerNodesTracked = 2048; /* determines worker node hash table size */
/* Local functions forward declarations */
/* helpers used while picking a random candidate node for a new placement */
static List * PrimaryNodesNotInList(List *currentList);
static WorkerNode * FindRandomNodeFromList(List *candidateWorkerNodeList);
static bool OddNumber(uint32 number);
static bool ListMember(List *currentList, WorkerNode *workerNode);
/* node predicates handed to FilterActiveNodeListFunc */
static bool NodeIsPrimaryWorker(WorkerNode *node);
static bool NodeIsReadableWorker(WorkerNode *node);
/* ------------------------------------------------------------
* Worker node selection functions follow
* ------------------------------------------------------------
*/
/*
 * WorkerGetRandomCandidateNode picks a random primary node that is not already
 * in currentNodeList and returns it, or NULL when no such node exists.
 *
 * With an empty currentNodeList any eligible node is returned. Otherwise the
 * pick is rack-aware relative to the first node of currentNodeList: when the
 * list length is odd (1, 3, 5, ...) a node on a different rack is preferred,
 * when it is even a node on the same rack is preferred. At most
 * WORKER_RACK_TRIES random draws are made; if none of them satisfies the rack
 * preference, the last node drawn is returned anyway.
 */
WorkerNode *
WorkerGetRandomCandidateNode(List *currentNodeList)
{
	uint32 placedNodeCount = list_length(currentNodeList);
	List *eligibleNodeList = PrimaryNodesNotInList(currentNodeList);

	/* every primary we know about already appears in currentNodeList */
	if (list_length(eligibleNodeList) == 0)
	{
		return NULL;
	}

	/* nothing placed yet, so there is no rack to take into account */
	if (placedNodeCount == 0)
	{
		return FindRandomNodeFromList(eligibleNodeList);
	}

	/* odd count: want a different rack; even count: want the same rack */
	bool wantSameRack = !OddNumber(placedNodeCount);

	/* the rack of the first listed node is the reference for every draw */
	WorkerNode *referenceNode = (WorkerNode *) linitial(currentNodeList);
	char *referenceRack = referenceNode->workerRack;

	WorkerNode *drawnNode = NULL;
	uint32 remainingTries = WORKER_RACK_TRIES;

	while (remainingTries > 0)
	{
		remainingTries--;

		drawnNode = FindRandomNodeFromList(eligibleNodeList);

		bool sameRack =
			(strncmp(drawnNode->workerRack, referenceRack, WORKER_LENGTH) == 0);
		if (sameRack == wantSameRack)
		{
			break;
		}
	}

	/* either a node matching the preference, or simply the last one drawn */
	return drawnNode;
}
/*
 * WorkerGetRoundRobinCandidateNode returns the node of workerNodeList that a
 * round-robin policy assigns to the given placement of the given shard.
 *
 * A true round-robin would need shared state across shard placements; this
 * approximation instead leans on the ever-increasing shardId. The node picked
 * is the one at index (shardId + placementIndex) MOD the number of nodes, so
 * consecutive placement indexes map to consecutive nodes in the list.
 *
 * Returns NULL when placementIndex is not smaller than the number of nodes,
 * i.e. when the list does not hold enough nodes to place all replicas.
 */
WorkerNode *
WorkerGetRoundRobinCandidateNode(List *workerNodeList, uint64 shardId,
								 uint32 placementIndex)
{
	uint32 nodeCount = list_length(workerNodeList);

	/* also covers the empty list, so the modulo below never divides by zero */
	if (placementIndex >= nodeCount)
	{
		return NULL;
	}

	uint32 selectedIndex = (shardId + placementIndex) % nodeCount;

	return (WorkerNode *) list_nth(workerNodeList, selectedIndex);
}
/*
 * ActivePrimaryNonCoordinatorNodeCount returns how many nodes
 * ActivePrimaryNonCoordinatorNodeList yields when called with NoLock, that is
 * the number of active primary nodes not counting the coordinator (even if the
 * coordinator has been added to the cluster as a worker).
 */
uint32
ActivePrimaryNonCoordinatorNodeCount(void)
{
	List *primaryWorkerList = ActivePrimaryNonCoordinatorNodeList(NoLock);

	return list_length(primaryWorkerList);
}
/*
 * ActivePrimaryNodeCount returns how many nodes ActivePrimaryNodeList yields
 * when called with NoLock, that is the number of active primary nodes.
 */
uint32
ActivePrimaryNodeCount(void)
{
	List *activePrimaryList = ActivePrimaryNodeList(NoLock);
	uint32 activePrimaryCount = list_length(activePrimaryList);

	return activePrimaryCount;
}
/*
 * NodeIsCoordinator tells whether the given node belongs to the coordinator
 * group, i.e. whether its groupId equals COORDINATOR_GROUP_ID.
 */
bool
NodeIsCoordinator(WorkerNode *node)
{
	bool inCoordinatorGroup = (node->groupId == COORDINATOR_GROUP_ID);

	return inCoordinatorGroup;
}
/*
 * FilterActiveNodeListFunc scans the worker node hash and returns a list with
 * a copy of every node that is marked active and for which checkFunction
 * returns true.
 *
 * lockMode names the lock to take on pg_dist_node before scanning; pass NoLock
 * to skip locking. Taking a lock is needed when the caller does not want nodes
 * to be added concurrently with its use of the returned list.
 *
 * Each list element is a freshly palloc'd copy of the hash entry, so the
 * returned nodes are separate from the entries stored in the hash.
 */
static List *
FilterActiveNodeListFunc(LOCKMODE lockMode, bool (*checkFunction)(WorkerNode *))
{
	Assert(checkFunction != NULL);

	if (lockMode != NoLock)
	{
		LockRelationOid(DistNodeRelationId(), lockMode);
	}

	List *matchingNodeList = NIL;
	HASH_SEQ_STATUS scanStatus;
	HTAB *nodeHash = GetWorkerNodeHash();

	hash_seq_init(&scanStatus, nodeHash);

	for (WorkerNode *hashEntry = hash_seq_search(&scanStatus);
		 hashEntry != NULL;
		 hashEntry = hash_seq_search(&scanStatus))
	{
		/* the predicate is only consulted for active nodes */
		bool selected = hashEntry->isActive && checkFunction(hashEntry);

		if (selected)
		{
			WorkerNode *entryCopy = palloc0(sizeof(WorkerNode));

			*entryCopy = *hashEntry;
			matchingNodeList = lappend(matchingNodeList, entryCopy);
		}
	}

	return matchingNodeList;
}
/*
 * ActivePrimaryNonCoordinatorNodeList returns copies of all active primary
 * nodes in the worker node hash, leaving out the coordinator even when it has
 * been added to the cluster as a worker.
 *
 * lockMode names the lock to take on pg_dist_node; it matters when the caller
 * does not want nodes to be added concurrently with its use of the list.
 * EnsureModificationsCanRun() is invoked before the list is built.
 */
List *
ActivePrimaryNonCoordinatorNodeList(LOCKMODE lockMode)
{
	EnsureModificationsCanRun();

	List *primaryWorkerList = FilterActiveNodeListFunc(lockMode,
													   NodeIsPrimaryWorker);

	return primaryWorkerList;
}
/*
 * ActivePrimaryNodeList returns copies of all active nodes in the worker node
 * hash for which NodeIsPrimary holds. The coordinator is not filtered out.
 *
 * lockMode names the lock to take on pg_dist_node while building the list.
 * EnsureModificationsCanRun() is invoked before the list is built.
 */
List *
ActivePrimaryNodeList(LOCKMODE lockMode)
{
	EnsureModificationsCanRun();

	List *primaryNodeList = FilterActiveNodeListFunc(lockMode, NodeIsPrimary);

	return primaryNodeList;
}
/*
 * ActivePrimaryRemoteNodeList returns copies of all active nodes in the worker
 * node hash for which NodeIsPrimaryAndRemote holds.
 *
 * NOTE(review): NodeIsPrimaryAndRemote is defined outside this file;
 * presumably it selects primaries other than the local node - confirm.
 *
 * lockMode names the lock to take on pg_dist_node while building the list.
 * EnsureModificationsCanRun() is invoked before the list is built.
 */
List *
ActivePrimaryRemoteNodeList(LOCKMODE lockMode)
{
	EnsureModificationsCanRun();

	List *remotePrimaryList = FilterActiveNodeListFunc(lockMode,
													   NodeIsPrimaryAndRemote);

	return remotePrimaryList;
}
/*
 * NodeIsPrimaryWorker tells whether the node is a primary that does not belong
 * to the coordinator group.
 */
static bool
NodeIsPrimaryWorker(WorkerNode *node)
{
	/* the coordinator never counts as a worker here */
	if (NodeIsCoordinator(node))
	{
		return false;
	}

	return NodeIsPrimary(node);
}
/*
* CoordinatorAddedAsWorkerNode returns true if coordinator is added to the
* pg_dist_node.
*/
bool
CoordinatorAddedAsWorkerNode()
{
bool groupContainsNodes = false;
PrimaryNodeForGroup(COORDINATOR_GROUP_ID, &groupContainsNodes);
return groupContainsNodes;
}
/*
 * ReferenceTablePlacementNodeList returns copies of the nodes that should hold
 * reference table placements: every active primary in the worker node hash,
 * the coordinator included when it is known.
 *
 * lockMode names the lock to take on pg_dist_node while building the list.
 * EnsureModificationsCanRun() is invoked before the list is built.
 */
List *
ReferenceTablePlacementNodeList(LOCKMODE lockMode)
{
	EnsureModificationsCanRun();

	List *placementNodeList = FilterActiveNodeListFunc(lockMode, NodeIsPrimary);

	return placementNodeList;
}
/*
 * CoordinatorNodeIfAddedAsWorkerOrError returns a palloc'd copy of the
 * WorkerNode for the coordinator if it is added to pg_dist_node, and errors
 * out otherwise (via ErrorIfCoordinatorNotAddedAsWorkerNode).
 *
 * Callers may assume the coordinator is not removed from the metadata until
 * the end of the transaction once this function has returned. That guarantee
 * depends on CoordinatorAddedAsWorkerNode acquiring AccessShareLock on
 * pg_dist_node and not releasing it.
 *
 * NOTE(review): the result of LookupNodeForGroup is dereferenced without a
 * NULL check; this assumes the lookup cannot fail after the existence check
 * above - confirm against its definition.
 */
WorkerNode *
CoordinatorNodeIfAddedAsWorkerOrError(void)
{
	ErrorIfCoordinatorNotAddedAsWorkerNode();

	WorkerNode *coordinatorNode = LookupNodeForGroup(COORDINATOR_GROUP_ID);

	/* hand out a private copy instead of the pointer returned by the lookup */
	WorkerNode *coordinatorNodeCopy = palloc0(sizeof(WorkerNode));
	*coordinatorNodeCopy = *coordinatorNode;

	return coordinatorNodeCopy;
}
/*
 * ErrorIfCoordinatorNotAddedAsWorkerNode raises an ERROR when the coordinator
 * has not been added to the metadata, and returns normally otherwise.
 */
void
ErrorIfCoordinatorNotAddedAsWorkerNode(void)
{
	if (!CoordinatorAddedAsWorkerNode())
	{
		ereport(ERROR, (errmsg("could not find the coordinator node in "
							   "metadata as it is not added as a worker")));
	}
}
/*
 * DistributedTablePlacementNodeList returns copies of all active primary nodes
 * that can store new data, i.e. nodes accepted by
 * NodeCanHaveDistTablePlacements (shouldstoreshards is 'true').
 *
 * lockMode names the lock to take on pg_dist_node while building the list.
 * EnsureModificationsCanRun() is invoked before the list is built.
 */
List *
DistributedTablePlacementNodeList(LOCKMODE lockMode)
{
	EnsureModificationsCanRun();

	List *shardNodeList = FilterActiveNodeListFunc(lockMode,
												   NodeCanHaveDistTablePlacements);

	return shardNodeList;
}
/*
 * NodeCanHaveDistTablePlacements tells whether the given node may hold shards
 * of a distributed table: it has to be a primary and its shouldHaveShards flag
 * has to be set.
 */
bool
NodeCanHaveDistTablePlacements(WorkerNode *node)
{
	/* non-primaries never qualify, whatever their flag says */
	return NodeIsPrimary(node) ? node->shouldHaveShards : false;
}
/*
 * ActiveReadableNonCoordinatorNodeList returns copies of all active nodes in
 * the worker node hash that are readable, leaving out the coordinator. No lock
 * is taken on pg_dist_node.
 */
List *
ActiveReadableNonCoordinatorNodeList(void)
{
	List *readableWorkerList = FilterActiveNodeListFunc(NoLock,
														NodeIsReadableWorker);

	return readableWorkerList;
}
/*
 * ActiveReadableNodeList returns copies of all active nodes in the worker node
 * hash that are readable. Unlike ActiveReadableNonCoordinatorNodeList, the
 * coordinator is included when it has been added to the cluster as a worker.
 * No lock is taken on pg_dist_node.
 */
List *
ActiveReadableNodeList(void)
{
	List *readableNodeList = FilterActiveNodeListFunc(NoLock, NodeIsReadable);

	return readableNodeList;
}
/*
 * NodeIsReadableWorker tells whether the node is readable and does not belong
 * to the coordinator group.
 */
static bool
NodeIsReadableWorker(WorkerNode *node)
{
	/* the coordinator never counts as a worker here */
	if (NodeIsCoordinator(node))
	{
		return false;
	}

	return NodeIsReadable(node);
}
/*
 * PrimaryNodesNotInList walks the worker node hash and collects every primary
 * node that is not a member of currentList. With n hash entries and m list
 * members this costs O(n*m), which is fine because currentList is quite small.
 *
 * The returned list points at the hash entries themselves; no copies are made.
 */
static List *
PrimaryNodesNotInList(List *currentList)
{
	List *missingPrimaryList = NIL;
	HASH_SEQ_STATUS scanStatus;
	HTAB *nodeHash = GetWorkerNodeHash();

	hash_seq_init(&scanStatus, nodeHash);

	for (WorkerNode *hashEntry = hash_seq_search(&scanStatus);
		 hashEntry != NULL;
		 hashEntry = hash_seq_search(&scanStatus))
	{
		/* membership is tested first; listed nodes are skipped outright */
		if (!ListMember(currentList, hashEntry) && NodeIsPrimary(hashEntry))
		{
			missingPrimaryList = lappend(missingPrimaryList, hashEntry);
		}
	}

	return missingPrimaryList;
}
/*
 * FindRandomNodeFromList returns a randomly chosen element of the given list.
 * The list must not be empty, since its length is used as the modulus.
 */
static WorkerNode *
FindRandomNodeFromList(List *candidateWorkerNodeList)
{
	uint32 listSize = list_length(candidateWorkerNodeList);

	/* nb, the random seed has already been set by the postmaster when starting up */
	uint32 chosenIndex = (random() % listSize);

	return (WorkerNode *) list_nth(candidateWorkerNodeList, chosenIndex);
}
/*
 * OddNumber tells whether the given number is odd.
 */
static bool
OddNumber(uint32 number)
{
	/* an unsigned value is odd exactly when its lowest bit is set */
	return (number & 1) != 0;
}
/*
 * ListMember tells whether currentList holds a node that WorkerNodeCompare
 * considers equal to workerNode.
 */
static bool
ListMember(List *currentList, WorkerNode *workerNode)
{
	Size compareKeySize = WORKER_LENGTH + sizeof(uint32);
	WorkerNode *listedNode = NULL;

	foreach_ptr(listedNode, currentList)
	{
		int comparison = WorkerNodeCompare(workerNode, listedNode,
										   compareKeySize);

		if (comparison == 0)
		{
			return true;
		}
	}

	return false;
}
/*
 * CompareWorkerNodes orders two worker nodes with exactly the logic of
 * WorkerNodeCompare. Each argument is a pointer to a worker node pointer, so
 * one level of indirection is removed before delegating.
 */
int
CompareWorkerNodes(const void *leftElement, const void *rightElement)
{
	const void *const *leftSlot = (const void *const *) leftElement;
	const void *const *rightSlot = (const void *const *) rightElement;

	/* WorkerNodeCompare does not look at its key size argument */
	Size unusedKeySize = 0;

	return WorkerNodeCompare(*leftSlot, *rightSlot, unusedKeySize);
}
/*
 * WorkerNodeCompare orders two worker nodes by host name and then by port
 * number. Nodes that differ only in their rack location compare as equal.
 * keySize is accepted but not used.
 */
int
WorkerNodeCompare(const void *lhsKey, const void *rhsKey, Size keySize)
{
	const WorkerNode *lhsNode = (const WorkerNode *) lhsKey;
	const WorkerNode *rhsNode = (const WorkerNode *) rhsKey;

	int comparison = NodeNamePortCompare(lhsNode->workerName,
										 rhsNode->workerName,
										 lhsNode->workerPort,
										 rhsNode->workerPort);

	return comparison;
}
/*
 * NodeNamePortCompare implements the common logic for comparing two nodes
 * by their node names and ports.
 *
 * Names are compared first, looking at no more than WORKER_LENGTH characters;
 * ports are only consulted when the names are equal. The result is negative,
 * zero or positive when the left node sorts before, equal to or after the
 * right node. Only the sign of the result is meaningful.
 *
 * This function is useful for ensuring consistency of sort operations between
 * different representations of nodes in the cluster such as WorkerNode and
 * WorkerPool.
 */
int
NodeNamePortCompare(const char *workerLhsName, const char *workerRhsName,
					int workerLhsPort, int workerRhsPort)
{
	int nameCompare = strncmp(workerLhsName, workerRhsName, WORKER_LENGTH);
	if (nameCompare != 0)
	{
		return nameCompare;
	}

	/*
	 * Compare rather than subtract: "lhs - rhs" overflows, which is undefined
	 * behavior for signed ints, when the two ports are far enough apart.
	 */
	int portCompare = (workerLhsPort > workerRhsPort) -
					  (workerLhsPort < workerRhsPort);

	return portCompare;
}
/*
 * GetFirstPrimaryWorkerNode returns the active primary non-coordinator node
 * that ranks lowest according to CompareWorkerNodes, or NULL when there is no
 * such node.
 *
 * The ranking is arbitrary, but needs to be kept consistent with
 * IsFirstWorkerNode.
 */
WorkerNode *
GetFirstPrimaryWorkerNode(void)
{
	List *primaryWorkerList = ActivePrimaryNonCoordinatorNodeList(NoLock);
	WorkerNode *lowestRankedNode = NULL;
	WorkerNode *currentNode = NULL;

	foreach_ptr(currentNode, primaryWorkerList)
	{
		/* the first node seen always becomes the initial minimum */
		bool ranksLower = (lowestRankedNode == NULL) ||
						  (CompareWorkerNodes(&currentNode, &lowestRankedNode) < 0);

		if (ranksLower)
		{
			lowestRankedNode = currentNode;
		}
	}

	return lowestRankedNode;
}