Make master_create_empty_shard() aware of the shard placement policy

master_create_empty_shard() now creates shard placements according to the
value of citus.shard_placement_policy, which also changes its default
placement behavior from random to round-robin.
pull/489/head
Metin Doslu 2016-04-19 16:48:59 +03:00
parent 2148922cb2
commit afa74ce5ca
8 changed files with 204 additions and 91 deletions
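
As a rough usage sketch of the new behavior (the append-distributed table events_append and the session shown are hypothetical; the policy names come from shard_placement_policy_options below):

SET citus.shard_placement_policy TO 'round-robin';  -- the new default
SELECT master_create_empty_shard('events_append');  -- placements cycle over the worker node list
SET citus.shard_placement_policy TO 'random';       -- opt back into the previous behavior
SELECT master_create_empty_shard('events_append');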


@@ -53,7 +53,6 @@ int ShardMaxSize = 1048576; /* maximum size in KB one shard can grow to */
int ShardPlacementPolicy = SHARD_PLACEMENT_ROUND_ROBIN;
static char * hostname_client_addr(void);
static Datum WorkerNodeGetDatum(WorkerNode *workerNode, TupleDesc tupleDescriptor);
@@ -343,36 +342,12 @@ master_get_local_first_candidate_nodes(PG_FUNCTION_ARGS)
oldContext = MemoryContextSwitchTo(functionContext->multi_call_memory_ctx);
currentNodeList = functionContext->user_fctx;
if (currentNodeCount == 0)
{
/* choose first candidate node to be the client's host */
char *remoteHostname = hostname_client_addr();
/* if hostname is localhost.localdomain, change it to localhost */
int nameCompare = strncmp(remoteHostname, "localhost.localdomain",
WORKER_LENGTH);
if (nameCompare == 0)
{
remoteHostname = pstrdup("localhost");
}
candidateNode = WorkerGetNodeWithName(remoteHostname);
if (candidateNode == NULL)
{
ereport(ERROR, (errmsg("could not find worker node for hostname: %s",
remoteHostname)));
}
}
else
{
/* find a candidate node different from those already selected */
candidateNode = WorkerGetCandidateNode(currentNodeList);
candidateNode = WorkerGetLocalFirstCandidateNode(currentNodeList);
if (candidateNode == NULL)
{
ereport(ERROR, (errmsg("could only find %u of %u required nodes",
currentNodeCount, desiredNodeCount)));
}
}
currentNodeList = lappend(currentNodeList, candidateNode);
functionContext->user_fctx = currentNodeList;
@@ -695,56 +670,6 @@ GetTableDDLEvents(Oid relationId)
}
/*
* hostname_client_addr allocates memory for the connecting client's fully
* qualified hostname, and returns this name. If there is no such connection or
* the connection is over Unix domain socket, the function errors.
*/
static char *
hostname_client_addr(void)
{
Port *port = MyProcPort;
char *remoteHost = NULL;
int remoteHostLen = NI_MAXHOST;
int flags = NI_NAMEREQD; /* require fully qualified hostname */
int nameFound = 0;
if (port == NULL)
{
ereport(ERROR, (errmsg("cannot find tcp/ip connection to client")));
}
switch (port->raddr.addr.ss_family)
{
case AF_INET:
#ifdef HAVE_IPV6
case AF_INET6:
#endif
{
break;
}
default:
{
ereport(ERROR, (errmsg("invalid address family in connection")));
break;
}
}
remoteHost = palloc0(remoteHostLen);
nameFound = pg_getnameinfo_all(&port->raddr.addr, port->raddr.salen,
remoteHost, remoteHostLen, NULL, 0, flags);
if (nameFound != 0)
{
ereport(ERROR, (errmsg("could not resolve client hostname: %s",
gai_strerror(nameFound))));
}
return remoteHost;
}
/*
* WorkerNodeGetDatum converts the worker node passed to it into its datum
* representation. To do this, the function first creates the heap tuple from


@@ -69,13 +69,14 @@ master_create_empty_shard(PG_FUNCTION_ARGS)
{
text *relationNameText = PG_GETARG_TEXT_P(0);
char *relationName = text_to_cstring(relationNameText);
List *workerNodeList = WorkerNodeList();
Datum shardIdDatum = 0;
int64 shardId = INVALID_SHARD_ID;
List *ddlEventList = NULL;
uint32 attemptableNodeCount = 0;
uint32 liveNodeCount = 0;
uint32 candidateNodeCount = 0;
uint32 candidateNodeIndex = 0;
List *candidateNodeList = NIL;
text *nullMinValue = NULL;
text *nullMaxValue = NULL;
@@ -118,17 +119,36 @@ master_create_empty_shard(PG_FUNCTION_ARGS)
}
/* first retrieve a list of random nodes for shard placements */
while (candidateNodeCount < attemptableNodeCount)
while (candidateNodeIndex < attemptableNodeCount)
{
WorkerNode *candidateNode = WorkerGetCandidateNode(candidateNodeList);
WorkerNode *candidateNode = NULL;
if (ShardPlacementPolicy == SHARD_PLACEMENT_LOCAL_NODE_FIRST)
{
candidateNode = WorkerGetLocalFirstCandidateNode(candidateNodeList);
}
else if (ShardPlacementPolicy == SHARD_PLACEMENT_ROUND_ROBIN)
{
candidateNode = WorkerGetRoundRobinCandidateNode(workerNodeList, shardId,
candidateNodeIndex);
}
else if (ShardPlacementPolicy == SHARD_PLACEMENT_RANDOM)
{
candidateNode = WorkerGetRandomCandidateNode(candidateNodeList);
}
else
{
ereport(ERROR, (errmsg("unrecognized shard placement policy")));
}
if (candidateNode == NULL)
{
ereport(ERROR, (errmsg("could only find %u of %u possible nodes",
candidateNodeCount, attemptableNodeCount)));
candidateNodeIndex, attemptableNodeCount)));
}
candidateNodeList = lappend(candidateNodeList, candidateNode);
candidateNodeCount++;
candidateNodeIndex++;
}
CreateShardPlacements(shardId, ddlEventList, relationOwner,


@@ -18,6 +18,8 @@
#include "distributed/worker_manager.h"
#include "distributed/multi_client_executor.h"
#include "libpq/hba.h"
#include "libpq/ip.h"
#include "libpq/libpq-be.h"
#include "postmaster/postmaster.h"
#include "storage/fd.h"
#include "storage/ipc.h"
@@ -36,6 +38,7 @@ static shmem_startup_hook_type prev_shmem_startup_hook = NULL;
/* Local functions forward declarations */
static char * ClientHostAddress(StringInfo remoteHostStringInfo);
static bool OddNumber(uint32 number);
static WorkerNode * FindRandomNodeNotInList(HTAB *WorkerNodesHash,
List *currentNodeList);
@@ -55,8 +58,8 @@ static bool WorkerNodeResponsive(const char *workerName, uint32 workerPort);
*/
/*
* WorkerGetCandidateNode takes in a list of worker nodes, and then allocates a
* new worker node. The allocation is performed according to the following
* WorkerGetRandomCandidateNode takes in a list of worker nodes, and then allocates
* a new worker node. The allocation is performed according to the following
* policy: if the list is empty, a random node is allocated; if the list has one
* node (or an odd number of nodes), the new node is allocated on a different
* rack than the first node; and if the list has two nodes (or an even number of
@@ -69,7 +72,7 @@ static bool WorkerNodeResponsive(const char *workerName, uint32 workerPort);
* contain enough nodes to allocate a new worker node.
*/
WorkerNode *
WorkerGetCandidateNode(List *currentNodeList)
WorkerGetRandomCandidateNode(List *currentNodeList)
{
WorkerNode *workerNode = NULL;
bool wantSameRack = false;
@@ -164,6 +167,120 @@ WorkerGetRoundRobinCandidateNode(List *workerNodeList, uint64 shardId,
}
/*
* WorkerGetLocalFirstCandidateNode takes in a list of worker nodes, and then
* allocates a new worker node. The allocation is performed according to the
* following policy: if the list is empty, the node where the caller is connecting
* from is allocated; if the list is not empty, a node is allocated according
* to random policy.
*/
WorkerNode *
WorkerGetLocalFirstCandidateNode(List *currentNodeList)
{
WorkerNode *candidateNode = NULL;
uint32 currentNodeCount = list_length(currentNodeList);
/* choose first candidate node to be the client's host */
if (currentNodeCount == 0)
{
StringInfo clientHostStringInfo = makeStringInfo();
char *clientHost = NULL;
char *errorMessage = ClientHostAddress(clientHostStringInfo);
if (errorMessage != NULL)
{
ereport(ERROR, (errmsg("%s", errorMessage),
errdetail("Could not find the first worker "
"node for local-node-first policy."),
errhint("Make sure that you are not on the "
"master node.")));
}
/* if hostname is localhost.localdomain, change it to localhost */
clientHost = clientHostStringInfo->data;
if (strncmp(clientHost, "localhost.localdomain", WORKER_LENGTH) == 0)
{
clientHost = pstrdup("localhost");
}
candidateNode = WorkerGetNodeWithName(clientHost);
if (candidateNode == NULL)
{
ereport(ERROR, (errmsg("could not find worker node for "
"host: %s", clientHost)));
}
}
else
{
/* find a candidate node different from those already selected */
candidateNode = WorkerGetRandomCandidateNode(currentNodeList);
}
return candidateNode;
}
/*
* ClientHostAddress appends the connecting client's fully qualified hostname
* to the given StringInfo. If there is no such connection or the connection is
* over Unix domain socket, the function fills the error message and returns it.
* On success, it just returns NULL.
*/
static char *
ClientHostAddress(StringInfo clientHostStringInfo)
{
Port *port = MyProcPort;
char *clientHost = NULL;
char *errorMessage = NULL;
int clientHostLength = NI_MAXHOST;
int flags = NI_NAMEREQD; /* require fully qualified hostname */
int nameFound = 0;
if (port == NULL)
{
errorMessage = "cannot find tcp/ip connection to client";
return errorMessage;
}
switch (port->raddr.addr.ss_family)
{
case AF_INET:
#ifdef HAVE_IPV6
case AF_INET6:
#endif
{
break;
}
default:
{
errorMessage = "invalid address family in connection";
return errorMessage;
}
}
clientHost = palloc0(clientHostLength);
nameFound = pg_getnameinfo_all(&port->raddr.addr, port->raddr.salen,
clientHost, clientHostLength, NULL, 0, flags);
if (nameFound == 0)
{
appendStringInfo(clientHostStringInfo, "%s", clientHost);
}
else
{
StringInfo errorMessageStringInfo = makeStringInfo();
appendStringInfo(errorMessageStringInfo, "could not resolve client host: %s",
gai_strerror(nameFound));
errorMessage = errorMessageStringInfo->data;
return errorMessage;
}
return errorMessage;
}
/*
* WorkerGetNodeWithName finds and returns a node from the membership list that
* has the given hostname. The function returns null if no such node exists.


@@ -65,6 +65,7 @@ static const struct config_enum_entry task_executor_type_options[] = {
static const struct config_enum_entry shard_placement_policy_options[] = {
{ "local-node-first", SHARD_PLACEMENT_LOCAL_NODE_FIRST, false },
{ "round-robin", SHARD_PLACEMENT_ROUND_ROBIN, false },
{ "random", SHARD_PLACEMENT_RANDOM, false },
{ NULL, 0, false }
};
@@ -530,7 +531,8 @@ RegisterCitusConfigVariables(void)
"selecting these nodes. The local-node-first policy places the "
"first replica on the client node and chooses others randomly. "
"The round-robin policy aims to distribute shards evenly across "
"the cluster by selecting nodes in a round-robin fashion."),
"the cluster by selecting nodes in a round-robin fashion. "
"The random policy picks all workers randomly."),
&ShardPlacementPolicy,
SHARD_PLACEMENT_ROUND_ROBIN, shard_placement_policy_options,
PGC_USERSET,


@@ -72,7 +72,8 @@ typedef enum
{
SHARD_PLACEMENT_INVALID_FIRST = 0,
SHARD_PLACEMENT_LOCAL_NODE_FIRST = 1,
SHARD_PLACEMENT_ROUND_ROBIN = 2
SHARD_PLACEMENT_ROUND_ROBIN = 2,
SHARD_PLACEMENT_RANDOM = 3
} ShardPlacementPolicyType;


@@ -57,10 +57,11 @@ extern char *WorkerListFileName;
/* Function declarations for finding worker nodes to place shards on */
extern WorkerNode * WorkerGetCandidateNode(List *currentNodeList);
extern WorkerNode * WorkerGetRandomCandidateNode(List *currentNodeList);
extern WorkerNode * WorkerGetRoundRobinCandidateNode(List *workerNodeList,
uint64 shardId,
uint32 placementIndex);
extern WorkerNode * WorkerGetLocalFirstCandidateNode(List *currentNodeList);
extern WorkerNode * WorkerGetNodeWithName(const char *hostname);
extern uint32 WorkerGetLiveNodeCount(void);
extern List * WorkerNodeList(void);


@@ -2,6 +2,8 @@
-- MULTI_COPY
--
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 560000;
-- Create a new hash-partitioned table into which to COPY
CREATE TABLE customer_copy_hash (
c_custkey integer,
@@ -255,6 +257,25 @@ COPY lineitem_copy_append FROM '@abs_srcdir@/data/lineitem.1.data' with delimite
SELECT count(*) FROM pg_dist_shard WHERE logicalrelid = 'lineitem_copy_append'::regclass;
-- Test round robin shard policy
SET citus.shard_replication_factor TO 1;
COPY lineitem_copy_append FROM '@abs_srcdir@/data/lineitem.2.data' with delimiter '|';
SELECT
pg_dist_shard_placement.shardid,
pg_dist_shard_placement.nodeport
FROM
pg_dist_shard,
pg_dist_shard_placement
WHERE
pg_dist_shard.shardid = pg_dist_shard_placement.shardid AND
logicalrelid = 'lineitem_copy_append'::regclass
ORDER BY
pg_dist_shard.shardid DESC
LIMIT
5;
-- Create customer table for the worker copy with constraint and index
CREATE TABLE customer_worker_copy_append (
c_custkey integer ,


@@ -1,6 +1,7 @@
--
-- MULTI_COPY
--
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 560000;
-- Create a new hash-partitioned table into which to COPY
CREATE TABLE customer_copy_hash (
c_custkey integer,
@@ -45,7 +46,7 @@ SELECT count(*) FROM customer_copy_hash;
-- Test primary key violation
COPY customer_copy_hash (c_custkey, c_name) FROM STDIN
WITH (FORMAT 'csv');
ERROR: duplicate key value violates unique constraint "customer_copy_hash_pkey_103160"
ERROR: duplicate key value violates unique constraint "customer_copy_hash_pkey_560048"
DETAIL: Key (c_custkey)=(2) already exists.
-- Confirm that no data was copied
SELECT count(*) FROM customer_copy_hash;
@@ -304,6 +305,31 @@ SELECT count(*) FROM pg_dist_shard WHERE logicalrelid = 'lineitem_copy_append'::
5
(1 row)
-- Test round robin shard policy
SET citus.shard_replication_factor TO 1;
COPY lineitem_copy_append FROM '@abs_srcdir@/data/lineitem.2.data' with delimiter '|';
SELECT
pg_dist_shard_placement.shardid,
pg_dist_shard_placement.nodeport
FROM
pg_dist_shard,
pg_dist_shard_placement
WHERE
pg_dist_shard.shardid = pg_dist_shard_placement.shardid AND
logicalrelid = 'lineitem_copy_append'::regclass
ORDER BY
pg_dist_shard.shardid DESC
LIMIT
5;
shardid | nodeport
---------+----------
560141 | 57637
560140 | 57638
560139 | 57637
560138 | 57638
560137 | 57637
(5 rows)
-- Create customer table for the worker copy with constraint and index
CREATE TABLE customer_worker_copy_append (
c_custkey integer ,