Merge pull request #489 from citusdata/respect_guc_in_master_create_empty_shard

Make master_create_empty_shard() aware of the shard placement policy

cr: @marcocitus
pull/516/head
Metin Döşlü 2016-05-27 15:13:23 +03:00
commit b520fb7448
8 changed files with 204 additions and 91 deletions
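
In short: master_create_empty_shard() used to pick placements with the rack-aware random strategy regardless of the shard placement setting; it now dispatches on that setting, and a new 'random' value keeps the old behaviour selectable. A minimal usage sketch (not part of this commit), assuming the setting is exposed as citus.shard_placement_policy and using a hypothetical append-distributed table named events_append:

    SET citus.shard_placement_policy TO 'round-robin';
    SELECT master_create_empty_shard('events_append');  -- returns the new shard id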

View File

@@ -53,7 +53,6 @@ int ShardMaxSize = 1048576; /* maximum size in KB one shard can grow to */
 int ShardPlacementPolicy = SHARD_PLACEMENT_ROUND_ROBIN;
-static char * hostname_client_addr(void);
 static Datum WorkerNodeGetDatum(WorkerNode *workerNode, TupleDesc tupleDescriptor);
@@ -343,36 +342,12 @@ master_get_local_first_candidate_nodes(PG_FUNCTION_ARGS)
     oldContext = MemoryContextSwitchTo(functionContext->multi_call_memory_ctx);
     currentNodeList = functionContext->user_fctx;

-    if (currentNodeCount == 0)
-    {
-        /* choose first candidate node to be the client's host */
-        char *remoteHostname = hostname_client_addr();
-
-        /* if hostname is localhost.localdomain, change it to localhost */
-        int nameCompare = strncmp(remoteHostname, "localhost.localdomain",
-                                  WORKER_LENGTH);
-        if (nameCompare == 0)
-        {
-            remoteHostname = pstrdup("localhost");
-        }
-
-        candidateNode = WorkerGetNodeWithName(remoteHostname);
-        if (candidateNode == NULL)
-        {
-            ereport(ERROR, (errmsg("could not find worker node for hostname: %s",
-                                   remoteHostname)));
-        }
-    }
-    else
-    {
-        /* find a candidate node different from those already selected */
-        candidateNode = WorkerGetCandidateNode(currentNodeList);
+    candidateNode = WorkerGetLocalFirstCandidateNode(currentNodeList);

     if (candidateNode == NULL)
     {
         ereport(ERROR, (errmsg("could only find %u of %u required nodes",
                                currentNodeCount, desiredNodeCount)));
     }
-    }

     currentNodeList = lappend(currentNodeList, candidateNode);
     functionContext->user_fctx = currentNodeList;
@@ -695,56 +670,6 @@ GetTableDDLEvents(Oid relationId)
 }

-/*
- * hostname_client_addr allocates memory for the connecting client's fully
- * qualified hostname, and returns this name. If there is no such connection or
- * the connection is over Unix domain socket, the function errors.
- */
-static char *
-hostname_client_addr(void)
-{
-    Port *port = MyProcPort;
-    char *remoteHost = NULL;
-    int remoteHostLen = NI_MAXHOST;
-    int flags = NI_NAMEREQD; /* require fully qualified hostname */
-    int nameFound = 0;
-
-    if (port == NULL)
-    {
-        ereport(ERROR, (errmsg("cannot find tcp/ip connection to client")));
-    }
-
-    switch (port->raddr.addr.ss_family)
-    {
-        case AF_INET:
-#ifdef HAVE_IPV6
-        case AF_INET6:
-#endif
-        {
-            break;
-        }
-
-        default:
-        {
-            ereport(ERROR, (errmsg("invalid address family in connection")));
-            break;
-        }
-    }
-
-    remoteHost = palloc0(remoteHostLen);
-
-    nameFound = pg_getnameinfo_all(&port->raddr.addr, port->raddr.salen,
-                                   remoteHost, remoteHostLen, NULL, 0, flags);
-    if (nameFound != 0)
-    {
-        ereport(ERROR, (errmsg("could not resolve client hostname: %s",
-                               gai_strerror(nameFound))));
-    }
-
-    return remoteHost;
-}
-
 /*
  * WorkerNodeGetDatum converts the worker node passed to it into its datum
  * representation. To do this, the function first creates the heap tuple from

View File

@@ -69,13 +69,14 @@ master_create_empty_shard(PG_FUNCTION_ARGS)
 {
     text *relationNameText = PG_GETARG_TEXT_P(0);
     char *relationName = text_to_cstring(relationNameText);
+    List *workerNodeList = WorkerNodeList();
     Datum shardIdDatum = 0;
     int64 shardId = INVALID_SHARD_ID;
     List *ddlEventList = NULL;
     uint32 attemptableNodeCount = 0;
     uint32 liveNodeCount = 0;
-    uint32 candidateNodeCount = 0;
+    uint32 candidateNodeIndex = 0;
     List *candidateNodeList = NIL;
     text *nullMinValue = NULL;
     text *nullMaxValue = NULL;
@@ -118,17 +119,36 @@ master_create_empty_shard(PG_FUNCTION_ARGS)
     }

     /* first retrieve a list of random nodes for shard placements */
-    while (candidateNodeCount < attemptableNodeCount)
+    while (candidateNodeIndex < attemptableNodeCount)
     {
-        WorkerNode *candidateNode = WorkerGetCandidateNode(candidateNodeList);
+        WorkerNode *candidateNode = NULL;
+
+        if (ShardPlacementPolicy == SHARD_PLACEMENT_LOCAL_NODE_FIRST)
+        {
+            candidateNode = WorkerGetLocalFirstCandidateNode(candidateNodeList);
+        }
+        else if (ShardPlacementPolicy == SHARD_PLACEMENT_ROUND_ROBIN)
+        {
+            candidateNode = WorkerGetRoundRobinCandidateNode(workerNodeList, shardId,
+                                                             candidateNodeIndex);
+        }
+        else if (ShardPlacementPolicy == SHARD_PLACEMENT_RANDOM)
+        {
+            candidateNode = WorkerGetRandomCandidateNode(candidateNodeList);
+        }
+        else
+        {
+            ereport(ERROR, (errmsg("unrecognized shard placement policy")));
+        }
+
         if (candidateNode == NULL)
         {
             ereport(ERROR, (errmsg("could only find %u of %u possible nodes",
-                                   candidateNodeCount, attemptableNodeCount)));
+                                   candidateNodeIndex, attemptableNodeCount)));
         }

         candidateNodeList = lappend(candidateNodeList, candidateNode);
-        candidateNodeCount++;
+        candidateNodeIndex++;
     }

     CreateShardPlacements(shardId, ddlEventList, relationOwner,
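
A hedged illustration of the dispatch above from SQL (the table name events_append and the GUC name citus.shard_placement_policy are assumptions, not part of this diff): under the round-robin policy, successive empty shards should get placements that rotate across the registered workers, which can be checked against the placement catalog.

    SET citus.shard_placement_policy TO 'round-robin';
    SELECT master_create_empty_shard('events_append');
    SELECT master_create_empty_shard('events_append');
    SELECT shardid, nodename, nodeport
    FROM pg_dist_shard_placement
    WHERE shardid IN (SELECT shardid FROM pg_dist_shard
                      WHERE logicalrelid = 'events_append'::regclass)
    ORDER BY shardid;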

View File

@@ -18,6 +18,8 @@
 #include "distributed/worker_manager.h"
 #include "distributed/multi_client_executor.h"
 #include "libpq/hba.h"
+#include "libpq/ip.h"
+#include "libpq/libpq-be.h"
 #include "postmaster/postmaster.h"
 #include "storage/fd.h"
 #include "storage/ipc.h"
@@ -36,6 +38,7 @@ static shmem_startup_hook_type prev_shmem_startup_hook = NULL;
 /* Local functions forward declarations */
+static char * ClientHostAddress(StringInfo remoteHostStringInfo);
 static bool OddNumber(uint32 number);
 static WorkerNode * FindRandomNodeNotInList(HTAB *WorkerNodesHash,
                                             List *currentNodeList);
@@ -55,8 +58,8 @@ static bool WorkerNodeResponsive(const char *workerName, uint32 workerPort);
  */

 /*
- * WorkerGetCandidateNode takes in a list of worker nodes, and then allocates a
- * new worker node. The allocation is performed according to the following
+ * WorkerGetRandomCandidateNode takes in a list of worker nodes, and then allocates
+ * a new worker node. The allocation is performed according to the following
  * policy: if the list is empty, a random node is allocated; if the list has one
  * node (or an odd number of nodes), the new node is allocated on a different
  * rack than the first node; and if the list has two nodes (or an even number of
@@ -69,7 +72,7 @@ static bool WorkerNodeResponsive(const char *workerName, uint32 workerPort);
  * contain enough nodes to allocate a new worker node.
  */
 WorkerNode *
-WorkerGetCandidateNode(List *currentNodeList)
+WorkerGetRandomCandidateNode(List *currentNodeList)
 {
     WorkerNode *workerNode = NULL;
     bool wantSameRack = false;
@@ -164,6 +167,120 @@ WorkerGetRoundRobinCandidateNode(List *workerNodeList, uint64 shardId,
 }

+
+/*
+ * WorkerGetLocalFirstCandidateNode takes in a list of worker nodes, and then
+ * allocates a new worker node. The allocation is performed according to the
+ * following policy: if the list is empty, the node where the caller is connecting
+ * from is allocated; if the list is not empty, a node is allocated according
+ * to random policy.
+ */
+WorkerNode *
+WorkerGetLocalFirstCandidateNode(List *currentNodeList)
+{
+    WorkerNode *candidateNode = NULL;
+    uint32 currentNodeCount = list_length(currentNodeList);
+
+    /* choose first candidate node to be the client's host */
+    if (currentNodeCount == 0)
+    {
+        StringInfo clientHostStringInfo = makeStringInfo();
+        char *clientHost = NULL;
+        char *errorMessage = ClientHostAddress(clientHostStringInfo);
+        if (errorMessage != NULL)
+        {
+            ereport(ERROR, (errmsg("%s", errorMessage),
+                            errdetail("Could not find the first worker "
+                                      "node for local-node-first policy."),
+                            errhint("Make sure that you are not on the "
+                                    "master node.")));
+        }
+
+        /* if hostname is localhost.localdomain, change it to localhost */
+        clientHost = clientHostStringInfo->data;
+        if (strncmp(clientHost, "localhost.localdomain", WORKER_LENGTH) == 0)
+        {
+            clientHost = pstrdup("localhost");
+        }
+
+        candidateNode = WorkerGetNodeWithName(clientHost);
+        if (candidateNode == NULL)
+        {
+            ereport(ERROR, (errmsg("could not find worker node for "
+                                   "host: %s", clientHost)));
+        }
+    }
+    else
+    {
+        /* find a candidate node different from those already selected */
+        candidateNode = WorkerGetRandomCandidateNode(currentNodeList);
+    }
+
+    return candidateNode;
+}
+
+
+/*
+ * ClientHostAddress appends the connecting client's fully qualified hostname
+ * to the given StringInfo. If there is no such connection or the connection is
+ * over Unix domain socket, the function fills the error message and returns it.
+ * On success, it just returns NULL.
+ */
+static char *
+ClientHostAddress(StringInfo clientHostStringInfo)
+{
+    Port *port = MyProcPort;
+    char *clientHost = NULL;
+    char *errorMessage = NULL;
+    int clientHostLength = NI_MAXHOST;
+    int flags = NI_NAMEREQD; /* require fully qualified hostname */
+    int nameFound = 0;
+
+    if (port == NULL)
+    {
+        errorMessage = "cannot find tcp/ip connection to client";
+        return errorMessage;
+    }
+
+    switch (port->raddr.addr.ss_family)
+    {
+        case AF_INET:
+#ifdef HAVE_IPV6
+        case AF_INET6:
+#endif
+        {
+            break;
+        }
+
+        default:
+        {
+            errorMessage = "invalid address family in connection";
+            return errorMessage;
+        }
+    }
+
+    clientHost = palloc0(clientHostLength);
+
+    nameFound = pg_getnameinfo_all(&port->raddr.addr, port->raddr.salen,
+                                   clientHost, clientHostLength, NULL, 0, flags);
+    if (nameFound == 0)
+    {
+        appendStringInfo(clientHostStringInfo, "%s", clientHost);
+    }
+    else
+    {
+        StringInfo errorMessageStringInfo = makeStringInfo();
+        appendStringInfo(errorMessageStringInfo, "could not resolve client host: %s",
+                         gai_strerror(nameFound));
+        errorMessage = errorMessageStringInfo->data;
+        return errorMessage;
+    }
+
+    return errorMessage;
+}
+
+
 /*
  * WorkerGetNodeWithName finds and returns a node from the membership list that
  * has the given hostname. The function returns null if no such node exists.
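
The local-node-first path added above only works for sessions that reach the master over TCP from a host whose resolved name matches a registered worker; Unix-socket sessions, or clients that do not resolve to a worker, now get the errdetail/errhint pair instead of a bare resolution error. A sketch under those assumptions, with worker-1 as a hypothetical worker host the client connects from and citus.shard_placement_policy as the assumed GUC name:

    -- run against the master from a client session opened on worker-1
    SET citus.shard_placement_policy TO 'local-node-first';
    SELECT master_create_empty_shard('events_append');
    -- expected: the first placement lands on worker-1, remaining replicas are
    -- chosen by the rack-aware random policy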

View File

@@ -65,6 +65,7 @@ static const struct config_enum_entry task_executor_type_options[] = {
 static const struct config_enum_entry shard_placement_policy_options[] = {
     { "local-node-first", SHARD_PLACEMENT_LOCAL_NODE_FIRST, false },
     { "round-robin", SHARD_PLACEMENT_ROUND_ROBIN, false },
+    { "random", SHARD_PLACEMENT_RANDOM, false },
     { NULL, 0, false }
 };
@@ -530,7 +531,8 @@ RegisterCitusConfigVariables(void)
                      "selecting these nodes. The local-node-first policy places the "
                      "first replica on the client node and chooses others randomly. "
                      "The round-robin policy aims to distribute shards evenly across "
-                     "the cluster by selecting nodes in a round-robin fashion."),
+                     "the cluster by selecting nodes in a round-robin fashion. "
+                     "The random policy picks all workers randomly."),
        &ShardPlacementPolicy,
        SHARD_PLACEMENT_ROUND_ROBIN, shard_placement_policy_options,
        PGC_USERSET,
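
With the 'random' entry wired in above, all three policies are selectable per session. A quick reference, again assuming the setting is registered as citus.shard_placement_policy:

    SET citus.shard_placement_policy TO 'local-node-first'; -- first replica on the connecting client's node
    SET citus.shard_placement_policy TO 'round-robin';      -- default; rotates placements across workers
    SET citus.shard_placement_policy TO 'random';           -- rack-aware random selection (the old hard-coded behaviour)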

View File

@@ -72,7 +72,8 @@ typedef enum
 {
     SHARD_PLACEMENT_INVALID_FIRST = 0,
     SHARD_PLACEMENT_LOCAL_NODE_FIRST = 1,
-    SHARD_PLACEMENT_ROUND_ROBIN = 2
+    SHARD_PLACEMENT_ROUND_ROBIN = 2,
+    SHARD_PLACEMENT_RANDOM = 3
 } ShardPlacementPolicyType;

View File

@@ -57,10 +57,11 @@ extern char *WorkerListFileName;
 /* Function declarations for finding worker nodes to place shards on */
-extern WorkerNode * WorkerGetCandidateNode(List *currentNodeList);
+extern WorkerNode * WorkerGetRandomCandidateNode(List *currentNodeList);
 extern WorkerNode * WorkerGetRoundRobinCandidateNode(List *workerNodeList,
                                                      uint64 shardId,
                                                      uint32 placementIndex);
+extern WorkerNode * WorkerGetLocalFirstCandidateNode(List *currentNodeList);
 extern WorkerNode * WorkerGetNodeWithName(const char *hostname);
 extern uint32 WorkerGetLiveNodeCount(void);
 extern List * WorkerNodeList(void);

View File

@@ -2,6 +2,8 @@
 -- MULTI_COPY
 --
+ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 560000;
+
 -- Create a new hash-partitioned table into which to COPY
 CREATE TABLE customer_copy_hash (
     c_custkey integer,
@@ -255,6 +257,25 @@ COPY lineitem_copy_append FROM '@abs_srcdir@/data/lineitem.1.data' with delimite
 SELECT count(*) FROM pg_dist_shard WHERE logicalrelid = 'lineitem_copy_append'::regclass;
+
+-- Test round robin shard policy
+SET citus.shard_replication_factor TO 1;
+COPY lineitem_copy_append FROM '@abs_srcdir@/data/lineitem.2.data' with delimiter '|';
+
+SELECT
+    pg_dist_shard_placement.shardid,
+    pg_dist_shard_placement.nodeport
+FROM
+    pg_dist_shard,
+    pg_dist_shard_placement
+WHERE
+    pg_dist_shard.shardid = pg_dist_shard_placement.shardid AND
+    logicalrelid = 'lineitem_copy_append'::regclass
+ORDER BY
+    pg_dist_shard.shardid DESC
+LIMIT
+    5;
+
 -- Create customer table for the worker copy with constraint and index
 CREATE TABLE customer_worker_copy_append (
     c_custkey integer ,

View File

@@ -1,6 +1,7 @@
 --
 -- MULTI_COPY
 --
+ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 560000;
 -- Create a new hash-partitioned table into which to COPY
 CREATE TABLE customer_copy_hash (
     c_custkey integer,
@@ -45,7 +46,7 @@ SELECT count(*) FROM customer_copy_hash;
 -- Test primary key violation
 COPY customer_copy_hash (c_custkey, c_name) FROM STDIN
 WITH (FORMAT 'csv');
-ERROR:  duplicate key value violates unique constraint "customer_copy_hash_pkey_103160"
+ERROR:  duplicate key value violates unique constraint "customer_copy_hash_pkey_560048"
 DETAIL:  Key (c_custkey)=(2) already exists.
 -- Confirm that no data was copied
 SELECT count(*) FROM customer_copy_hash;
@@ -304,6 +305,31 @@ SELECT count(*) FROM pg_dist_shard WHERE logicalrelid = 'lineitem_copy_append'::
      5
 (1 row)

+-- Test round robin shard policy
+SET citus.shard_replication_factor TO 1;
+COPY lineitem_copy_append FROM '@abs_srcdir@/data/lineitem.2.data' with delimiter '|';
+SELECT
+    pg_dist_shard_placement.shardid,
+    pg_dist_shard_placement.nodeport
+FROM
+    pg_dist_shard,
+    pg_dist_shard_placement
+WHERE
+    pg_dist_shard.shardid = pg_dist_shard_placement.shardid AND
+    logicalrelid = 'lineitem_copy_append'::regclass
+ORDER BY
+    pg_dist_shard.shardid DESC
+LIMIT
+    5;
+ shardid | nodeport
+---------+----------
+  560141 |    57637
+  560140 |    57638
+  560139 |    57637
+  560138 |    57638
+  560137 |    57637
+(5 rows)
+
 -- Create customer table for the worker copy with constraint and index
 CREATE TABLE customer_worker_copy_append (
     c_custkey integer ,