From afa74ce5cab961342d6058db34519394b9c77e47 Mon Sep 17 00:00:00 2001
From: Metin Doslu
Date: Tue, 19 Apr 2016 16:48:59 +0300
Subject: [PATCH] Make master_create_empty_shard() aware of the shard placement
 policy

master_create_empty_shard() now creates shard placements according to the
value of citus.shard_placement_policy. Since that setting defaults to
round-robin, the default placement behavior changes from random to
round-robin.
---
 .../distributed/master/master_node_protocol.c |  83 +-----------
 .../master/master_stage_protocol.c            |  30 ++++-
 .../distributed/master/worker_node_manager.c  | 123 +++++++++++++++++-
 src/backend/distributed/shared_library_init.c |   4 +-
 src/include/distributed/master_protocol.h     |   3 +-
 src/include/distributed/worker_manager.h      |   3 +-
 src/test/regress/input/multi_copy.source      |  21 +++
 src/test/regress/output/multi_copy.source     |  28 +++-
 8 files changed, 204 insertions(+), 91 deletions(-)

diff --git a/src/backend/distributed/master/master_node_protocol.c b/src/backend/distributed/master/master_node_protocol.c
index 2351bf838..075fcb83f 100644
--- a/src/backend/distributed/master/master_node_protocol.c
+++ b/src/backend/distributed/master/master_node_protocol.c
@@ -53,7 +53,6 @@
 int ShardMaxSize = 1048576;	/* maximum size in KB one shard can grow to */
 int ShardPlacementPolicy = SHARD_PLACEMENT_ROUND_ROBIN;
 
-static char * hostname_client_addr(void);
 static Datum WorkerNodeGetDatum(WorkerNode *workerNode, TupleDesc tupleDescriptor);
 
 
@@ -343,35 +342,11 @@ master_get_local_first_candidate_nodes(PG_FUNCTION_ARGS)
 	oldContext = MemoryContextSwitchTo(functionContext->multi_call_memory_ctx);
 	currentNodeList = functionContext->user_fctx;
 
-	if (currentNodeCount == 0)
+	candidateNode = WorkerGetLocalFirstCandidateNode(currentNodeList);
+	if (candidateNode == NULL)
 	{
-		/* choose first candidate node to be the client's host */
-		char *remoteHostname = hostname_client_addr();
-
-		/* if hostname is localhost.localdomain, change it to localhost */
-		int nameCompare = strncmp(remoteHostname, "localhost.localdomain",
-								  WORKER_LENGTH);
-		if (nameCompare == 0)
-		{
-			remoteHostname = pstrdup("localhost");
-		}
-
-		candidateNode = WorkerGetNodeWithName(remoteHostname);
-		if (candidateNode == NULL)
-		{
-			ereport(ERROR, (errmsg("could not find worker node for hostname: %s",
-								   remoteHostname)));
-		}
-	}
-	else
-	{
-		/* find a candidate node different from those already selected */
-		candidateNode = WorkerGetCandidateNode(currentNodeList);
-		if (candidateNode == NULL)
-		{
-			ereport(ERROR, (errmsg("could only find %u of %u required nodes",
-								   currentNodeCount, desiredNodeCount)));
-		}
+		ereport(ERROR, (errmsg("could only find %u of %u required nodes",
+							   currentNodeCount, desiredNodeCount)));
 	}
 
 	currentNodeList = lappend(currentNodeList, candidateNode);
@@ -695,56 +670,6 @@ GetTableDDLEvents(Oid relationId)
 }
 
 
-/*
- * hostname_client_addr allocates memory for the connecting client's fully
- * qualified hostname, and returns this name. If there is no such connection or
- * the connection is over Unix domain socket, the function errors.
- */
-static char *
-hostname_client_addr(void)
-{
-	Port *port = MyProcPort;
-	char *remoteHost = NULL;
-	int remoteHostLen = NI_MAXHOST;
-	int flags = NI_NAMEREQD;	/* require fully qualified hostname */
-	int nameFound = 0;
-
-	if (port == NULL)
-	{
-		ereport(ERROR, (errmsg("cannot find tcp/ip connection to client")));
-	}
-
-	switch (port->raddr.addr.ss_family)
-	{
-		case AF_INET:
-#ifdef HAVE_IPV6
-		case AF_INET6:
-#endif
-		{
-			break;
-		}
-
-		default:
-		{
-			ereport(ERROR, (errmsg("invalid address family in connection")));
-			break;
-		}
-	}
-
-	remoteHost = palloc0(remoteHostLen);
-
-	nameFound = pg_getnameinfo_all(&port->raddr.addr, port->raddr.salen,
-								   remoteHost, remoteHostLen, NULL, 0, flags);
-	if (nameFound != 0)
-	{
-		ereport(ERROR, (errmsg("could not resolve client hostname: %s",
-							   gai_strerror(nameFound))));
-	}
-
-	return remoteHost;
-}
-
-
 /*
  * WorkerNodeGetDatum converts the worker node passed to it into its datum
  * representation. To do this, the function first creates the heap tuple from
diff --git a/src/backend/distributed/master/master_stage_protocol.c b/src/backend/distributed/master/master_stage_protocol.c
index 04dfbd815..b527f545d 100644
--- a/src/backend/distributed/master/master_stage_protocol.c
+++ b/src/backend/distributed/master/master_stage_protocol.c
@@ -69,13 +69,14 @@ master_create_empty_shard(PG_FUNCTION_ARGS)
 {
 	text *relationNameText = PG_GETARG_TEXT_P(0);
 	char *relationName = text_to_cstring(relationNameText);
+	List *workerNodeList = WorkerNodeList();
 	Datum shardIdDatum = 0;
 	int64 shardId = INVALID_SHARD_ID;
 	List *ddlEventList = NULL;
 	uint32 attemptableNodeCount = 0;
 	uint32 liveNodeCount = 0;
-	uint32 candidateNodeCount = 0;
+	uint32 candidateNodeIndex = 0;
 	List *candidateNodeList = NIL;
 	text *nullMinValue = NULL;
 	text *nullMaxValue = NULL;
@@ -118,17 +119,36 @@ master_create_empty_shard(PG_FUNCTION_ARGS)
 	}
 
 	/* first retrieve a list of random nodes for shard placements */
-	while (candidateNodeCount < attemptableNodeCount)
+	while (candidateNodeIndex < attemptableNodeCount)
 	{
-		WorkerNode *candidateNode = WorkerGetCandidateNode(candidateNodeList);
+		WorkerNode *candidateNode = NULL;
+
+		if (ShardPlacementPolicy == SHARD_PLACEMENT_LOCAL_NODE_FIRST)
+		{
+			candidateNode = WorkerGetLocalFirstCandidateNode(candidateNodeList);
+		}
+		else if (ShardPlacementPolicy == SHARD_PLACEMENT_ROUND_ROBIN)
+		{
+			candidateNode = WorkerGetRoundRobinCandidateNode(workerNodeList, shardId,
+															 candidateNodeIndex);
+		}
+		else if (ShardPlacementPolicy == SHARD_PLACEMENT_RANDOM)
+		{
+			candidateNode = WorkerGetRandomCandidateNode(candidateNodeList);
+		}
+		else
+		{
+			ereport(ERROR, (errmsg("unrecognized shard placement policy")));
+		}
+
 		if (candidateNode == NULL)
 		{
 			ereport(ERROR, (errmsg("could only find %u of %u possible nodes",
-								   candidateNodeCount, attemptableNodeCount)));
+								   candidateNodeIndex, attemptableNodeCount)));
 		}
 
 		candidateNodeList = lappend(candidateNodeList, candidateNode);
-		candidateNodeCount++;
+		candidateNodeIndex++;
 	}
 
 	CreateShardPlacements(shardId, ddlEventList, relationOwner,
diff --git a/src/backend/distributed/master/worker_node_manager.c b/src/backend/distributed/master/worker_node_manager.c
index d9eb76734..5db1ef492 100644
--- a/src/backend/distributed/master/worker_node_manager.c
+++ b/src/backend/distributed/master/worker_node_manager.c
@@ -18,6 +18,8 @@
 #include "distributed/worker_manager.h"
 #include "distributed/multi_client_executor.h"
 #include "libpq/hba.h"
+#include "libpq/ip.h"
+#include "libpq/libpq-be.h"
 #include "postmaster/postmaster.h"
 #include "storage/fd.h"
"storage/fd.h" #include "storage/ipc.h" @@ -36,6 +38,7 @@ static shmem_startup_hook_type prev_shmem_startup_hook = NULL; /* Local functions forward declarations */ +static char * ClientHostAddress(StringInfo remoteHostStringInfo); static bool OddNumber(uint32 number); static WorkerNode * FindRandomNodeNotInList(HTAB *WorkerNodesHash, List *currentNodeList); @@ -55,8 +58,8 @@ static bool WorkerNodeResponsive(const char *workerName, uint32 workerPort); */ /* - * WorkerGetCandidateNode takes in a list of worker nodes, and then allocates a - * new worker node. The allocation is performed according to the following + * WorkerGetRandomCandidateNode takes in a list of worker nodes, and then allocates + * a new worker node. The allocation is performed according to the following * policy: if the list is empty, a random node is allocated; if the list has one * node (or an odd number of nodes), the new node is allocated on a different * rack than the first node; and if the list has two nodes (or an even number of @@ -69,7 +72,7 @@ static bool WorkerNodeResponsive(const char *workerName, uint32 workerPort); * contain enough nodes to allocate a new worker node. */ WorkerNode * -WorkerGetCandidateNode(List *currentNodeList) +WorkerGetRandomCandidateNode(List *currentNodeList) { WorkerNode *workerNode = NULL; bool wantSameRack = false; @@ -164,6 +167,120 @@ WorkerGetRoundRobinCandidateNode(List *workerNodeList, uint64 shardId, } +/* + * WorkerGetLocalFirstCandidateNode takes in a list of worker nodes, and then + * allocates a new worker node. The allocation is performed according to the + * following policy: if the list is empty, the node where the caller is connecting + * from is allocated; if the list is not empty, a node is allocated according + * to random policy. + */ +WorkerNode * +WorkerGetLocalFirstCandidateNode(List *currentNodeList) +{ + WorkerNode *candidateNode = NULL; + uint32 currentNodeCount = list_length(currentNodeList); + + /* choose first candidate node to be the client's host */ + if (currentNodeCount == 0) + { + StringInfo clientHostStringInfo = makeStringInfo(); + char *clientHost = NULL; + char *errorMessage = ClientHostAddress(clientHostStringInfo); + + if (errorMessage != NULL) + { + ereport(ERROR, (errmsg("%s", errorMessage), + errdetail("Could not find the first worker " + "node for local-node-first policy."), + errhint("Make sure that you are not on the " + "master node."))); + } + + /* if hostname is localhost.localdomain, change it to localhost */ + clientHost = clientHostStringInfo->data; + if (strncmp(clientHost, "localhost.localdomain", WORKER_LENGTH) == 0) + { + clientHost = pstrdup("localhost"); + } + + candidateNode = WorkerGetNodeWithName(clientHost); + if (candidateNode == NULL) + { + ereport(ERROR, (errmsg("could not find worker node for " + "host: %s", clientHost))); + } + } + else + { + /* find a candidate node different from those already selected */ + candidateNode = WorkerGetRandomCandidateNode(currentNodeList); + } + + return candidateNode; +} + + +/* + * ClientHostAddress appends the connecting client's fully qualified hostname + * to the given StringInfo. If there is no such connection or the connection is + * over Unix domain socket, the function fills the error message and returns it. + * On success, it just returns NULL. 
+ */
+static char *
+ClientHostAddress(StringInfo clientHostStringInfo)
+{
+	Port *port = MyProcPort;
+	char *clientHost = NULL;
+	char *errorMessage = NULL;
+	int clientHostLength = NI_MAXHOST;
+	int flags = NI_NAMEREQD;	/* require fully qualified hostname */
+	int nameFound = 0;
+
+	if (port == NULL)
+	{
+		errorMessage = "cannot find tcp/ip connection to client";
+		return errorMessage;
+	}
+
+	switch (port->raddr.addr.ss_family)
+	{
+		case AF_INET:
+#ifdef HAVE_IPV6
+		case AF_INET6:
+#endif
+		{
+			break;
+		}
+
+		default:
+		{
+			errorMessage = "invalid address family in connection";
+			return errorMessage;
+		}
+	}
+
+	clientHost = palloc0(clientHostLength);
+
+	nameFound = pg_getnameinfo_all(&port->raddr.addr, port->raddr.salen,
+								   clientHost, clientHostLength, NULL, 0, flags);
+	if (nameFound == 0)
+	{
+		appendStringInfo(clientHostStringInfo, "%s", clientHost);
+	}
+	else
+	{
+		StringInfo errorMessageStringInfo = makeStringInfo();
+		appendStringInfo(errorMessageStringInfo, "could not resolve client host: %s",
+						 gai_strerror(nameFound));
+
+		errorMessage = errorMessageStringInfo->data;
+		return errorMessage;
+	}
+
+	return errorMessage;
+}
+
+
 /*
  * WorkerGetNodeWithName finds and returns a node from the membership list that
  * has the given hostname. The function returns null if no such node exists.
diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c
index 5d7e94dc5..31b8a39e7 100644
--- a/src/backend/distributed/shared_library_init.c
+++ b/src/backend/distributed/shared_library_init.c
@@ -65,6 +65,7 @@ static const struct config_enum_entry task_executor_type_options[] = {
 static const struct config_enum_entry shard_placement_policy_options[] = {
 	{ "local-node-first", SHARD_PLACEMENT_LOCAL_NODE_FIRST, false },
 	{ "round-robin", SHARD_PLACEMENT_ROUND_ROBIN, false },
+	{ "random", SHARD_PLACEMENT_RANDOM, false },
 	{ NULL, 0, false }
 };
 
@@ -530,7 +531,8 @@ RegisterCitusConfigVariables(void)
 		"selecting these nodes. The local-node-first policy places the "
 		"first replica on the client node and chooses others randomly. "
 		"The round-robin policy aims to distribute shards evenly across "
-		"the cluster by selecting nodes in a round-robin fashion."),
+		"the cluster by selecting nodes in a round-robin fashion. "
+ "The random policy picks all workers randomly."), &ShardPlacementPolicy, SHARD_PLACEMENT_ROUND_ROBIN, shard_placement_policy_options, PGC_USERSET, diff --git a/src/include/distributed/master_protocol.h b/src/include/distributed/master_protocol.h index 427658bde..0a45c8b1e 100644 --- a/src/include/distributed/master_protocol.h +++ b/src/include/distributed/master_protocol.h @@ -72,7 +72,8 @@ typedef enum { SHARD_PLACEMENT_INVALID_FIRST = 0, SHARD_PLACEMENT_LOCAL_NODE_FIRST = 1, - SHARD_PLACEMENT_ROUND_ROBIN = 2 + SHARD_PLACEMENT_ROUND_ROBIN = 2, + SHARD_PLACEMENT_RANDOM = 3 } ShardPlacementPolicyType; diff --git a/src/include/distributed/worker_manager.h b/src/include/distributed/worker_manager.h index 6225d500e..ec506848c 100644 --- a/src/include/distributed/worker_manager.h +++ b/src/include/distributed/worker_manager.h @@ -57,10 +57,11 @@ extern char *WorkerListFileName; /* Function declarations for finding worker nodes to place shards on */ -extern WorkerNode * WorkerGetCandidateNode(List *currentNodeList); +extern WorkerNode * WorkerGetRandomCandidateNode(List *currentNodeList); extern WorkerNode * WorkerGetRoundRobinCandidateNode(List *workerNodeList, uint64 shardId, uint32 placementIndex); +extern WorkerNode * WorkerGetLocalFirstCandidateNode(List *currentNodeList); extern WorkerNode * WorkerGetNodeWithName(const char *hostname); extern uint32 WorkerGetLiveNodeCount(void); extern List * WorkerNodeList(void); diff --git a/src/test/regress/input/multi_copy.source b/src/test/regress/input/multi_copy.source index 1c3d6acb0..cf13682b8 100644 --- a/src/test/regress/input/multi_copy.source +++ b/src/test/regress/input/multi_copy.source @@ -2,6 +2,8 @@ -- MULTI_COPY -- +ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 560000; + -- Create a new hash-partitioned table into which to COPY CREATE TABLE customer_copy_hash ( c_custkey integer, @@ -255,6 +257,25 @@ COPY lineitem_copy_append FROM '@abs_srcdir@/data/lineitem.1.data' with delimite SELECT count(*) FROM pg_dist_shard WHERE logicalrelid = 'lineitem_copy_append'::regclass; +-- Test round robin shard policy +SET citus.shard_replication_factor TO 1; + +COPY lineitem_copy_append FROM '@abs_srcdir@/data/lineitem.2.data' with delimiter '|'; + +SELECT + pg_dist_shard_placement.shardid, + pg_dist_shard_placement.nodeport +FROM + pg_dist_shard, + pg_dist_shard_placement +WHERE + pg_dist_shard.shardid = pg_dist_shard_placement.shardid AND + logicalrelid = 'lineitem_copy_append'::regclass +ORDER BY + pg_dist_shard.shardid DESC +LIMIT + 5; + -- Create customer table for the worker copy with constraint and index CREATE TABLE customer_worker_copy_append ( c_custkey integer , diff --git a/src/test/regress/output/multi_copy.source b/src/test/regress/output/multi_copy.source index 3b26ff0e0..76c77415c 100644 --- a/src/test/regress/output/multi_copy.source +++ b/src/test/regress/output/multi_copy.source @@ -1,6 +1,7 @@ -- -- MULTI_COPY -- +ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 560000; -- Create a new hash-partitioned table into which to COPY CREATE TABLE customer_copy_hash ( c_custkey integer, @@ -45,7 +46,7 @@ SELECT count(*) FROM customer_copy_hash; -- Test primary key violation COPY customer_copy_hash (c_custkey, c_name) FROM STDIN WITH (FORMAT 'csv'); -ERROR: duplicate key value violates unique constraint "customer_copy_hash_pkey_103160" +ERROR: duplicate key value violates unique constraint "customer_copy_hash_pkey_560048" DETAIL: Key (c_custkey)=(2) already exists. 
 -- Confirm that no data was copied
 SELECT count(*) FROM customer_copy_hash;
@@ -304,6 +305,31 @@ SELECT count(*) FROM pg_dist_shard WHERE logicalrelid = 'lineitem_copy_append'::
      5
 (1 row)
 
+-- Test round robin shard policy
+SET citus.shard_replication_factor TO 1;
+COPY lineitem_copy_append FROM '@abs_srcdir@/data/lineitem.2.data' with delimiter '|';
+SELECT
+	pg_dist_shard_placement.shardid,
+	pg_dist_shard_placement.nodeport
+FROM
+	pg_dist_shard,
+	pg_dist_shard_placement
+WHERE
+	pg_dist_shard.shardid = pg_dist_shard_placement.shardid AND
+	logicalrelid = 'lineitem_copy_append'::regclass
+ORDER BY
+	pg_dist_shard.shardid DESC
+LIMIT
+	5;
+ shardid | nodeport 
+---------+----------
+  560141 |    57637
+  560140 |    57638
+  560139 |    57637
+  560138 |    57638
+  560137 |    57637
+(5 rows)
+
 -- Create customer table for the worker copy with constraint and index
 CREATE TABLE customer_worker_copy_append (
 	c_custkey integer ,
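
Note for reviewers (not part of the patch): the new GUC can be exercised end-to-end with a session like the sketch below. It assumes a Citus 5.x cluster with two workers on ports 57637 and 57638, matching the regression environment above; the events table, its column names, and the session itself are hypothetical and shown for illustration only.

-- assumption: an append-distributed table to create empty shards in
CREATE TABLE events (event_id bigint, payload text);
SELECT master_create_distributed_table('events', 'event_id', 'append');

-- round-robin is now the default policy; 'random' and
-- 'local-node-first' are the other accepted values
SET citus.shard_placement_policy TO 'round-robin';
SET citus.shard_replication_factor TO 1;

-- each call creates one empty shard; with two workers, round-robin
-- placement alternates consecutive shard IDs between them, as the
-- expected output above shows
SELECT master_create_empty_shard('events');
SELECT master_create_empty_shard('events');

-- inspect where the new placements landed
SELECT shardid, nodename, nodeport
FROM pg_dist_shard_placement
ORDER BY shardid DESC
LIMIT 2;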