mirror of https://github.com/citusdata/citus.git
Merge pull request #5690 from citusdata/marcocitus/placement-policy-cleanup
commit
d7858709b4
|
@ -67,7 +67,6 @@
|
||||||
/* Shard related configuration */
|
/* Shard related configuration */
|
||||||
int ShardCount = 32;
|
int ShardCount = 32;
|
||||||
int ShardReplicationFactor = 1; /* desired replication factor for shards */
|
int ShardReplicationFactor = 1; /* desired replication factor for shards */
|
||||||
int ShardPlacementPolicy = SHARD_PLACEMENT_ROUND_ROBIN;
|
|
||||||
int NextShardId = 0;
|
int NextShardId = 0;
|
||||||
int NextPlacementId = 0;
|
int NextPlacementId = 0;
|
||||||
|
|
||||||
|
|
|
@ -171,26 +171,9 @@ master_create_empty_shard(PG_FUNCTION_ARGS)
|
||||||
/* first retrieve a list of random nodes for shard placements */
|
/* first retrieve a list of random nodes for shard placements */
|
||||||
while (candidateNodeIndex < attemptableNodeCount)
|
while (candidateNodeIndex < attemptableNodeCount)
|
||||||
{
|
{
|
||||||
WorkerNode *candidateNode = NULL;
|
WorkerNode *candidateNode = WorkerGetRoundRobinCandidateNode(workerNodeList,
|
||||||
|
shardId,
|
||||||
if (ShardPlacementPolicy == SHARD_PLACEMENT_LOCAL_NODE_FIRST)
|
candidateNodeIndex);
|
||||||
{
|
|
||||||
candidateNode = WorkerGetLocalFirstCandidateNode(candidateNodeList);
|
|
||||||
}
|
|
||||||
else if (ShardPlacementPolicy == SHARD_PLACEMENT_ROUND_ROBIN)
|
|
||||||
{
|
|
||||||
candidateNode = WorkerGetRoundRobinCandidateNode(workerNodeList, shardId,
|
|
||||||
candidateNodeIndex);
|
|
||||||
}
|
|
||||||
else if (ShardPlacementPolicy == SHARD_PLACEMENT_RANDOM)
|
|
||||||
{
|
|
||||||
candidateNode = WorkerGetRandomCandidateNode(candidateNodeList);
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
ereport(ERROR, (errmsg("unrecognized shard placement policy")));
|
|
||||||
}
|
|
||||||
|
|
||||||
if (candidateNode == NULL)
|
if (candidateNode == NULL)
|
||||||
{
|
{
|
||||||
ereport(ERROR, (errmsg("could only find %u of %u possible nodes",
|
ereport(ERROR, (errmsg("could only find %u of %u possible nodes",
|
||||||
|
|
|
@ -40,12 +40,6 @@ int MaxWorkerNodesTracked = 2048; /* determines worker node hash table size *
|
||||||
|
|
||||||
|
|
||||||
/* Local functions forward declarations */
|
/* Local functions forward declarations */
|
||||||
static WorkerNode * WorkerGetNodeWithName(const char *hostname);
|
|
||||||
static char * ClientHostAddress(StringInfo remoteHostStringInfo);
|
|
||||||
static List * PrimaryNodesNotInList(List *currentList);
|
|
||||||
static WorkerNode * FindRandomNodeFromList(List *candidateWorkerNodeList);
|
|
||||||
static bool OddNumber(uint32 number);
|
|
||||||
static bool ListMember(List *currentList, WorkerNode *workerNode);
|
|
||||||
static bool NodeIsPrimaryWorker(WorkerNode *node);
|
static bool NodeIsPrimaryWorker(WorkerNode *node);
|
||||||
static bool NodeIsReadableWorker(WorkerNode *node);
|
static bool NodeIsReadableWorker(WorkerNode *node);
|
||||||
|
|
||||||
|
@ -55,73 +49,6 @@ static bool NodeIsReadableWorker(WorkerNode *node);
|
||||||
* ------------------------------------------------------------
|
* ------------------------------------------------------------
|
||||||
*/
|
*/
|
||||||
|
|
||||||
/*
|
|
||||||
* WorkerGetRandomCandidateNode accepts a list of WorkerNode's and returns a random
|
|
||||||
* primary node which is not in that list.
|
|
||||||
*
|
|
||||||
* Note that the function returns null if the worker membership list does not
|
|
||||||
* contain enough nodes to allocate a new worker node.
|
|
||||||
*/
|
|
||||||
WorkerNode *
|
|
||||||
WorkerGetRandomCandidateNode(List *currentNodeList)
|
|
||||||
{
|
|
||||||
WorkerNode *workerNode = NULL;
|
|
||||||
bool wantSameRack = false;
|
|
||||||
uint32 tryCount = WORKER_RACK_TRIES;
|
|
||||||
|
|
||||||
uint32 currentNodeCount = list_length(currentNodeList);
|
|
||||||
List *candidateWorkerNodeList = PrimaryNodesNotInList(currentNodeList);
|
|
||||||
|
|
||||||
/* we check if the shard has already been placed on all nodes known to us */
|
|
||||||
if (list_length(candidateWorkerNodeList) == 0)
|
|
||||||
{
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
/* if current node list is empty, randomly pick one node and return */
|
|
||||||
if (currentNodeCount == 0)
|
|
||||||
{
|
|
||||||
workerNode = FindRandomNodeFromList(candidateWorkerNodeList);
|
|
||||||
return workerNode;
|
|
||||||
}
|
|
||||||
|
|
||||||
/*
|
|
||||||
* If the current list has an odd number of nodes (1, 3, 5, etc), we want to
|
|
||||||
* place the shard on a different rack than the first node's rack.
|
|
||||||
* Otherwise, we want to place the shard on the same rack as the first node.
|
|
||||||
*/
|
|
||||||
if (OddNumber(currentNodeCount))
|
|
||||||
{
|
|
||||||
wantSameRack = false;
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
wantSameRack = true;
|
|
||||||
}
|
|
||||||
|
|
||||||
/*
|
|
||||||
* We try to find a worker node that fits our rack-aware placement strategy.
|
|
||||||
* If after a predefined number of tries, we still cannot find such a node,
|
|
||||||
* we simply give up and return the last worker node we found.
|
|
||||||
*/
|
|
||||||
for (uint32 tryIndex = 0; tryIndex < tryCount; tryIndex++)
|
|
||||||
{
|
|
||||||
WorkerNode *firstNode = (WorkerNode *) linitial(currentNodeList);
|
|
||||||
char *firstRack = firstNode->workerRack;
|
|
||||||
|
|
||||||
workerNode = FindRandomNodeFromList(candidateWorkerNodeList);
|
|
||||||
char *workerRack = workerNode->workerRack;
|
|
||||||
|
|
||||||
bool sameRack = (strncmp(workerRack, firstRack, WORKER_LENGTH) == 0);
|
|
||||||
if ((sameRack && wantSameRack) || (!sameRack && !wantSameRack))
|
|
||||||
{
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return workerNode;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* WorkerGetRoundRobinCandidateNode takes in a list of worker nodes and returns
|
* WorkerGetRoundRobinCandidateNode takes in a list of worker nodes and returns
|
||||||
|
@ -152,147 +79,6 @@ WorkerGetRoundRobinCandidateNode(List *workerNodeList, uint64 shardId,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
|
||||||
* WorkerGetLocalFirstCandidateNode takes in a list of worker nodes, and then
|
|
||||||
* allocates a new worker node. The allocation is performed according to the
|
|
||||||
* following policy: if the list is empty, the node where the caller is connecting
|
|
||||||
* from is allocated; if the list is not empty, a node is allocated according
|
|
||||||
* to random policy.
|
|
||||||
*/
|
|
||||||
WorkerNode *
|
|
||||||
WorkerGetLocalFirstCandidateNode(List *currentNodeList)
|
|
||||||
{
|
|
||||||
WorkerNode *candidateNode = NULL;
|
|
||||||
uint32 currentNodeCount = list_length(currentNodeList);
|
|
||||||
|
|
||||||
/* choose first candidate node to be the client's host */
|
|
||||||
if (currentNodeCount == 0)
|
|
||||||
{
|
|
||||||
StringInfo clientHostStringInfo = makeStringInfo();
|
|
||||||
char *errorMessage = ClientHostAddress(clientHostStringInfo);
|
|
||||||
|
|
||||||
if (errorMessage != NULL)
|
|
||||||
{
|
|
||||||
ereport(ERROR, (errmsg("%s", errorMessage),
|
|
||||||
errdetail("Could not find the first worker "
|
|
||||||
"node for local-node-first policy."),
|
|
||||||
errhint("Make sure that you are not on the "
|
|
||||||
"master node.")));
|
|
||||||
}
|
|
||||||
|
|
||||||
/* if hostname is localhost.localdomain, change it to localhost */
|
|
||||||
char *clientHost = clientHostStringInfo->data;
|
|
||||||
if (strncmp(clientHost, "localhost.localdomain", WORKER_LENGTH) == 0)
|
|
||||||
{
|
|
||||||
clientHost = pstrdup("localhost");
|
|
||||||
}
|
|
||||||
|
|
||||||
candidateNode = WorkerGetNodeWithName(clientHost);
|
|
||||||
if (candidateNode == NULL)
|
|
||||||
{
|
|
||||||
ereport(ERROR, (errmsg("could not find worker node for "
|
|
||||||
"host: %s", clientHost)));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
/* find a candidate node different from those already selected */
|
|
||||||
candidateNode = WorkerGetRandomCandidateNode(currentNodeList);
|
|
||||||
}
|
|
||||||
|
|
||||||
return candidateNode;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
|
||||||
* ClientHostAddress appends the connecting client's fully qualified hostname
|
|
||||||
* to the given StringInfo. If there is no such connection or the connection is
|
|
||||||
* over Unix domain socket, the function fills the error message and returns it.
|
|
||||||
* On success, it just returns NULL.
|
|
||||||
*/
|
|
||||||
static char *
|
|
||||||
ClientHostAddress(StringInfo clientHostStringInfo)
|
|
||||||
{
|
|
||||||
Port *port = MyProcPort;
|
|
||||||
char *clientHost = NULL;
|
|
||||||
char *errorMessage = NULL;
|
|
||||||
int clientHostLength = NI_MAXHOST;
|
|
||||||
int flags = NI_NAMEREQD; /* require fully qualified hostname */
|
|
||||||
int nameFound = 0;
|
|
||||||
|
|
||||||
if (port == NULL)
|
|
||||||
{
|
|
||||||
errorMessage = "cannot find tcp/ip connection to client";
|
|
||||||
return errorMessage;
|
|
||||||
}
|
|
||||||
|
|
||||||
switch (port->raddr.addr.ss_family)
|
|
||||||
{
|
|
||||||
case AF_INET:
|
|
||||||
#ifdef HAVE_IPV6
|
|
||||||
case AF_INET6:
|
|
||||||
#endif
|
|
||||||
{
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
default:
|
|
||||||
{
|
|
||||||
errorMessage = "invalid address family in connection";
|
|
||||||
return errorMessage;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
clientHost = palloc0(clientHostLength);
|
|
||||||
|
|
||||||
nameFound = pg_getnameinfo_all(&port->raddr.addr, port->raddr.salen,
|
|
||||||
clientHost, clientHostLength, NULL, 0, flags);
|
|
||||||
if (nameFound == 0)
|
|
||||||
{
|
|
||||||
appendStringInfo(clientHostStringInfo, "%s", clientHost);
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
StringInfo errorMessageStringInfo = makeStringInfo();
|
|
||||||
appendStringInfo(errorMessageStringInfo, "could not resolve client host: %s",
|
|
||||||
gai_strerror(nameFound));
|
|
||||||
|
|
||||||
errorMessage = errorMessageStringInfo->data;
|
|
||||||
return errorMessage;
|
|
||||||
}
|
|
||||||
|
|
||||||
return errorMessage;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
|
||||||
* WorkerGetNodeWithName finds and returns a node from the membership list that
|
|
||||||
* has the given hostname. The function returns null if no such node exists.
|
|
||||||
*/
|
|
||||||
static WorkerNode *
|
|
||||||
WorkerGetNodeWithName(const char *hostname)
|
|
||||||
{
|
|
||||||
WorkerNode *workerNode = NULL;
|
|
||||||
HASH_SEQ_STATUS status;
|
|
||||||
HTAB *workerNodeHash = GetWorkerNodeHash();
|
|
||||||
|
|
||||||
hash_seq_init(&status, workerNodeHash);
|
|
||||||
|
|
||||||
while ((workerNode = hash_seq_search(&status)) != NULL)
|
|
||||||
{
|
|
||||||
int nameCompare = strncmp(workerNode->workerName, hostname, WORKER_LENGTH);
|
|
||||||
if (nameCompare == 0)
|
|
||||||
{
|
|
||||||
/* we need to terminate the scan since we break */
|
|
||||||
hash_seq_term(&status);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return workerNode;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* ActivePrimaryNonCoordinatorNodeCount returns the number of groups with a primary in the cluster.
|
* ActivePrimaryNonCoordinatorNodeCount returns the number of groups with a primary in the cluster.
|
||||||
* This method excludes coordinator even if it is added as a worker to cluster.
|
* This method excludes coordinator even if it is added as a worker to cluster.
|
||||||
|
@ -542,84 +328,6 @@ NodeIsReadableWorker(WorkerNode *node)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
|
||||||
* PrimaryNodesNotInList scans through the worker node hash and returns a list of all
|
|
||||||
* primary nodes which are not in currentList. It runs in O(n*m) but currentList is
|
|
||||||
* quite small.
|
|
||||||
*/
|
|
||||||
static List *
|
|
||||||
PrimaryNodesNotInList(List *currentList)
|
|
||||||
{
|
|
||||||
List *workerNodeList = NIL;
|
|
||||||
HTAB *workerNodeHash = GetWorkerNodeHash();
|
|
||||||
WorkerNode *workerNode = NULL;
|
|
||||||
HASH_SEQ_STATUS status;
|
|
||||||
|
|
||||||
hash_seq_init(&status, workerNodeHash);
|
|
||||||
|
|
||||||
while ((workerNode = hash_seq_search(&status)) != NULL)
|
|
||||||
{
|
|
||||||
if (ListMember(currentList, workerNode))
|
|
||||||
{
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (NodeIsPrimary(workerNode))
|
|
||||||
{
|
|
||||||
workerNodeList = lappend(workerNodeList, workerNode);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return workerNodeList;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/* FindRandomNodeFromList picks a random node from the list provided to it. */
|
|
||||||
static WorkerNode *
|
|
||||||
FindRandomNodeFromList(List *candidateWorkerNodeList)
|
|
||||||
{
|
|
||||||
uint32 candidateNodeCount = list_length(candidateWorkerNodeList);
|
|
||||||
|
|
||||||
/* nb, the random seed has already been set by the postmaster when starting up */
|
|
||||||
uint32 workerPosition = (random() % candidateNodeCount);
|
|
||||||
|
|
||||||
WorkerNode *workerNode =
|
|
||||||
(WorkerNode *) list_nth(candidateWorkerNodeList, workerPosition);
|
|
||||||
|
|
||||||
return workerNode;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
|
||||||
* OddNumber function returns true if given number is odd; returns false otherwise.
|
|
||||||
*/
|
|
||||||
static bool
|
|
||||||
OddNumber(uint32 number)
|
|
||||||
{
|
|
||||||
bool oddNumber = ((number % 2) == 1);
|
|
||||||
return oddNumber;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/* Checks if given worker node is a member of the current list. */
|
|
||||||
static bool
|
|
||||||
ListMember(List *currentList, WorkerNode *workerNode)
|
|
||||||
{
|
|
||||||
Size keySize = WORKER_LENGTH + sizeof(uint32);
|
|
||||||
|
|
||||||
WorkerNode *currentNode = NULL;
|
|
||||||
foreach_ptr(currentNode, currentList)
|
|
||||||
{
|
|
||||||
if (WorkerNodeCompare(workerNode, currentNode, keySize) == 0)
|
|
||||||
{
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* CompareWorkerNodes compares two pointers to worker nodes using the exact
|
* CompareWorkerNodes compares two pointers to worker nodes using the exact
|
||||||
* same logic employed by WorkerNodeCompare.
|
* same logic employed by WorkerNodeCompare.
|
||||||
|
|
|
@ -168,13 +168,6 @@ static const struct config_enum_entry task_executor_type_options[] = {
|
||||||
{ NULL, 0, false }
|
{ NULL, 0, false }
|
||||||
};
|
};
|
||||||
|
|
||||||
static const struct config_enum_entry shard_placement_policy_options[] = {
|
|
||||||
{ "local-node-first", SHARD_PLACEMENT_LOCAL_NODE_FIRST, false },
|
|
||||||
{ "round-robin", SHARD_PLACEMENT_ROUND_ROBIN, false },
|
|
||||||
{ "random", SHARD_PLACEMENT_RANDOM, false },
|
|
||||||
{ NULL, 0, false }
|
|
||||||
};
|
|
||||||
|
|
||||||
static const struct config_enum_entry use_secondary_nodes_options[] = {
|
static const struct config_enum_entry use_secondary_nodes_options[] = {
|
||||||
{ "never", USE_SECONDARY_NODES_NEVER, false },
|
{ "never", USE_SECONDARY_NODES_NEVER, false },
|
||||||
{ "always", USE_SECONDARY_NODES_ALWAYS, false },
|
{ "always", USE_SECONDARY_NODES_ALWAYS, false },
|
||||||
|
@ -1629,22 +1622,6 @@ RegisterCitusConfigVariables(void)
|
||||||
GUC_STANDARD,
|
GUC_STANDARD,
|
||||||
NULL, NULL, NULL);
|
NULL, NULL, NULL);
|
||||||
|
|
||||||
DefineCustomEnumVariable(
|
|
||||||
"citus.shard_placement_policy",
|
|
||||||
gettext_noop("Sets the policy to use when choosing nodes for shard placement."),
|
|
||||||
gettext_noop("The master node chooses which worker nodes to place new shards "
|
|
||||||
"on. This configuration value specifies the policy to use when "
|
|
||||||
"selecting these nodes. The local-node-first policy places the "
|
|
||||||
"first replica on the client node and chooses others randomly. "
|
|
||||||
"The round-robin policy aims to distribute shards evenly across "
|
|
||||||
"the cluster by selecting nodes in a round-robin fashion."
|
|
||||||
"The random policy picks all workers randomly."),
|
|
||||||
&ShardPlacementPolicy,
|
|
||||||
SHARD_PLACEMENT_ROUND_ROBIN, shard_placement_policy_options,
|
|
||||||
PGC_USERSET,
|
|
||||||
GUC_STANDARD,
|
|
||||||
NULL, NULL, NULL);
|
|
||||||
|
|
||||||
DefineCustomIntVariable(
|
DefineCustomIntVariable(
|
||||||
"citus.shard_replication_factor",
|
"citus.shard_replication_factor",
|
||||||
gettext_noop("Sets the replication factor for shards."),
|
gettext_noop("Sets the replication factor for shards."),
|
||||||
|
|
|
@ -72,15 +72,6 @@
|
||||||
#define DROP_FOREIGN_TABLE_COMMAND "DROP FOREIGN TABLE IF EXISTS %s CASCADE"
|
#define DROP_FOREIGN_TABLE_COMMAND "DROP FOREIGN TABLE IF EXISTS %s CASCADE"
|
||||||
#define CREATE_SCHEMA_COMMAND "CREATE SCHEMA IF NOT EXISTS %s AUTHORIZATION %s"
|
#define CREATE_SCHEMA_COMMAND "CREATE SCHEMA IF NOT EXISTS %s AUTHORIZATION %s"
|
||||||
|
|
||||||
/* Enumeration that defines the shard placement policy to use while staging */
|
|
||||||
typedef enum
|
|
||||||
{
|
|
||||||
SHARD_PLACEMENT_INVALID_FIRST = 0,
|
|
||||||
SHARD_PLACEMENT_LOCAL_NODE_FIRST = 1,
|
|
||||||
SHARD_PLACEMENT_ROUND_ROBIN = 2,
|
|
||||||
SHARD_PLACEMENT_RANDOM = 3
|
|
||||||
} ShardPlacementPolicyType;
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* TableDDLCommandType encodes the implementation used by TableDDLCommand. See comments in
|
* TableDDLCommandType encodes the implementation used by TableDDLCommand. See comments in
|
||||||
* TableDDLCpmmand for details.
|
* TableDDLCpmmand for details.
|
||||||
|
@ -212,7 +203,6 @@ extern TableDDLCommand * ColumnarGetCustomTableOptionsDDL(char *schemaName,
|
||||||
/* Config variables managed via guc.c */
|
/* Config variables managed via guc.c */
|
||||||
extern int ShardCount;
|
extern int ShardCount;
|
||||||
extern int ShardReplicationFactor;
|
extern int ShardReplicationFactor;
|
||||||
extern int ShardPlacementPolicy;
|
|
||||||
extern int NextShardId;
|
extern int NextShardId;
|
||||||
extern int NextPlacementId;
|
extern int NextPlacementId;
|
||||||
|
|
||||||
|
|
|
@ -70,7 +70,6 @@ extern WorkerNode * WorkerGetRandomCandidateNode(List *currentNodeList);
|
||||||
extern WorkerNode * WorkerGetRoundRobinCandidateNode(List *workerNodeList,
|
extern WorkerNode * WorkerGetRoundRobinCandidateNode(List *workerNodeList,
|
||||||
uint64 shardId,
|
uint64 shardId,
|
||||||
uint32 placementIndex);
|
uint32 placementIndex);
|
||||||
extern WorkerNode * WorkerGetLocalFirstCandidateNode(List *currentNodeList);
|
|
||||||
extern uint32 ActivePrimaryNonCoordinatorNodeCount(void);
|
extern uint32 ActivePrimaryNonCoordinatorNodeCount(void);
|
||||||
extern uint32 ActivePrimaryNodeCount(void);
|
extern uint32 ActivePrimaryNodeCount(void);
|
||||||
extern List * ActivePrimaryNonCoordinatorNodeList(LOCKMODE lockMode);
|
extern List * ActivePrimaryNonCoordinatorNodeList(LOCKMODE lockMode);
|
||||||
|
|
Loading…
Reference in New Issue