From 0cae8e7d6b905780e4dcf8f59d9538b8d59f13f9 Mon Sep 17 00:00:00 2001 From: Marco Slot Date: Sun, 6 Feb 2022 21:36:34 +0100 Subject: [PATCH] Remove local-node-first shard placement --- .../distributed/operations/stage_protocol.c | 6 +- .../operations/worker_node_manager.c | 143 ------------------ src/backend/distributed/shared_library_init.c | 1 - .../distributed/coordinator_protocol.h | 5 +- src/include/distributed/worker_manager.h | 1 - 5 files changed, 3 insertions(+), 153 deletions(-) diff --git a/src/backend/distributed/operations/stage_protocol.c b/src/backend/distributed/operations/stage_protocol.c index c473e7974..2fa052536 100644 --- a/src/backend/distributed/operations/stage_protocol.c +++ b/src/backend/distributed/operations/stage_protocol.c @@ -173,11 +173,7 @@ master_create_empty_shard(PG_FUNCTION_ARGS) { WorkerNode *candidateNode = NULL; - if (ShardPlacementPolicy == SHARD_PLACEMENT_LOCAL_NODE_FIRST) - { - candidateNode = WorkerGetLocalFirstCandidateNode(candidateNodeList); - } - else if (ShardPlacementPolicy == SHARD_PLACEMENT_ROUND_ROBIN) + if (ShardPlacementPolicy == SHARD_PLACEMENT_ROUND_ROBIN) { candidateNode = WorkerGetRoundRobinCandidateNode(workerNodeList, shardId, candidateNodeIndex); diff --git a/src/backend/distributed/operations/worker_node_manager.c b/src/backend/distributed/operations/worker_node_manager.c index 7fbc53e32..938b90a24 100644 --- a/src/backend/distributed/operations/worker_node_manager.c +++ b/src/backend/distributed/operations/worker_node_manager.c @@ -40,8 +40,6 @@ int MaxWorkerNodesTracked = 2048; /* determines worker node hash table size * /* Local functions forward declarations */ -static WorkerNode * WorkerGetNodeWithName(const char *hostname); -static char * ClientHostAddress(StringInfo remoteHostStringInfo); static List * PrimaryNodesNotInList(List *currentList); static WorkerNode * FindRandomNodeFromList(List *candidateWorkerNodeList); static bool OddNumber(uint32 number); @@ -152,147 +150,6 @@ WorkerGetRoundRobinCandidateNode(List *workerNodeList, uint64 shardId, } -/* - * WorkerGetLocalFirstCandidateNode takes in a list of worker nodes, and then - * allocates a new worker node. The allocation is performed according to the - * following policy: if the list is empty, the node where the caller is connecting - * from is allocated; if the list is not empty, a node is allocated according - * to random policy. - */ -WorkerNode * -WorkerGetLocalFirstCandidateNode(List *currentNodeList) -{ - WorkerNode *candidateNode = NULL; - uint32 currentNodeCount = list_length(currentNodeList); - - /* choose first candidate node to be the client's host */ - if (currentNodeCount == 0) - { - StringInfo clientHostStringInfo = makeStringInfo(); - char *errorMessage = ClientHostAddress(clientHostStringInfo); - - if (errorMessage != NULL) - { - ereport(ERROR, (errmsg("%s", errorMessage), - errdetail("Could not find the first worker " - "node for local-node-first policy."), - errhint("Make sure that you are not on the " - "master node."))); - } - - /* if hostname is localhost.localdomain, change it to localhost */ - char *clientHost = clientHostStringInfo->data; - if (strncmp(clientHost, "localhost.localdomain", WORKER_LENGTH) == 0) - { - clientHost = pstrdup("localhost"); - } - - candidateNode = WorkerGetNodeWithName(clientHost); - if (candidateNode == NULL) - { - ereport(ERROR, (errmsg("could not find worker node for " - "host: %s", clientHost))); - } - } - else - { - /* find a candidate node different from those already selected */ - candidateNode = WorkerGetRandomCandidateNode(currentNodeList); - } - - return candidateNode; -} - - -/* - * ClientHostAddress appends the connecting client's fully qualified hostname - * to the given StringInfo. If there is no such connection or the connection is - * over Unix domain socket, the function fills the error message and returns it. - * On success, it just returns NULL. - */ -static char * -ClientHostAddress(StringInfo clientHostStringInfo) -{ - Port *port = MyProcPort; - char *clientHost = NULL; - char *errorMessage = NULL; - int clientHostLength = NI_MAXHOST; - int flags = NI_NAMEREQD; /* require fully qualified hostname */ - int nameFound = 0; - - if (port == NULL) - { - errorMessage = "cannot find tcp/ip connection to client"; - return errorMessage; - } - - switch (port->raddr.addr.ss_family) - { - case AF_INET: -#ifdef HAVE_IPV6 - case AF_INET6: -#endif - { - break; - } - - default: - { - errorMessage = "invalid address family in connection"; - return errorMessage; - } - } - - clientHost = palloc0(clientHostLength); - - nameFound = pg_getnameinfo_all(&port->raddr.addr, port->raddr.salen, - clientHost, clientHostLength, NULL, 0, flags); - if (nameFound == 0) - { - appendStringInfo(clientHostStringInfo, "%s", clientHost); - } - else - { - StringInfo errorMessageStringInfo = makeStringInfo(); - appendStringInfo(errorMessageStringInfo, "could not resolve client host: %s", - gai_strerror(nameFound)); - - errorMessage = errorMessageStringInfo->data; - return errorMessage; - } - - return errorMessage; -} - - -/* - * WorkerGetNodeWithName finds and returns a node from the membership list that - * has the given hostname. The function returns null if no such node exists. - */ -static WorkerNode * -WorkerGetNodeWithName(const char *hostname) -{ - WorkerNode *workerNode = NULL; - HASH_SEQ_STATUS status; - HTAB *workerNodeHash = GetWorkerNodeHash(); - - hash_seq_init(&status, workerNodeHash); - - while ((workerNode = hash_seq_search(&status)) != NULL) - { - int nameCompare = strncmp(workerNode->workerName, hostname, WORKER_LENGTH); - if (nameCompare == 0) - { - /* we need to terminate the scan since we break */ - hash_seq_term(&status); - break; - } - } - - return workerNode; -} - - /* * ActivePrimaryNonCoordinatorNodeCount returns the number of groups with a primary in the cluster. * This method excludes coordinator even if it is added as a worker to cluster. diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index 00e7a523a..11898618c 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -169,7 +169,6 @@ static const struct config_enum_entry task_executor_type_options[] = { }; static const struct config_enum_entry shard_placement_policy_options[] = { - { "local-node-first", SHARD_PLACEMENT_LOCAL_NODE_FIRST, false }, { "round-robin", SHARD_PLACEMENT_ROUND_ROBIN, false }, { "random", SHARD_PLACEMENT_RANDOM, false }, { NULL, 0, false } diff --git a/src/include/distributed/coordinator_protocol.h b/src/include/distributed/coordinator_protocol.h index f0bda8515..c3e149c07 100644 --- a/src/include/distributed/coordinator_protocol.h +++ b/src/include/distributed/coordinator_protocol.h @@ -76,9 +76,8 @@ typedef enum { SHARD_PLACEMENT_INVALID_FIRST = 0, - SHARD_PLACEMENT_LOCAL_NODE_FIRST = 1, - SHARD_PLACEMENT_ROUND_ROBIN = 2, - SHARD_PLACEMENT_RANDOM = 3 + SHARD_PLACEMENT_ROUND_ROBIN = 1, + SHARD_PLACEMENT_RANDOM = 2 } ShardPlacementPolicyType; /* diff --git a/src/include/distributed/worker_manager.h b/src/include/distributed/worker_manager.h index 91d91a880..0a6b637b3 100644 --- a/src/include/distributed/worker_manager.h +++ b/src/include/distributed/worker_manager.h @@ -70,7 +70,6 @@ extern WorkerNode * WorkerGetRandomCandidateNode(List *currentNodeList); extern WorkerNode * WorkerGetRoundRobinCandidateNode(List *workerNodeList, uint64 shardId, uint32 placementIndex); -extern WorkerNode * WorkerGetLocalFirstCandidateNode(List *currentNodeList); extern uint32 ActivePrimaryNonCoordinatorNodeCount(void); extern uint32 ActivePrimaryNodeCount(void); extern List * ActivePrimaryNonCoordinatorNodeList(LOCKMODE lockMode);