diff --git a/src/backend/distributed/operations/node_protocol.c b/src/backend/distributed/operations/node_protocol.c index 0cb0a5ace..d18ef749c 100644 --- a/src/backend/distributed/operations/node_protocol.c +++ b/src/backend/distributed/operations/node_protocol.c @@ -67,7 +67,6 @@ /* Shard related configuration */ int ShardCount = 32; int ShardReplicationFactor = 1; /* desired replication factor for shards */ -int ShardPlacementPolicy = SHARD_PLACEMENT_ROUND_ROBIN; int NextShardId = 0; int NextPlacementId = 0; diff --git a/src/backend/distributed/operations/stage_protocol.c b/src/backend/distributed/operations/stage_protocol.c index c473e7974..d6e9c0f2a 100644 --- a/src/backend/distributed/operations/stage_protocol.c +++ b/src/backend/distributed/operations/stage_protocol.c @@ -171,26 +171,9 @@ master_create_empty_shard(PG_FUNCTION_ARGS) /* first retrieve a list of random nodes for shard placements */ while (candidateNodeIndex < attemptableNodeCount) { - WorkerNode *candidateNode = NULL; - - if (ShardPlacementPolicy == SHARD_PLACEMENT_LOCAL_NODE_FIRST) - { - candidateNode = WorkerGetLocalFirstCandidateNode(candidateNodeList); - } - else if (ShardPlacementPolicy == SHARD_PLACEMENT_ROUND_ROBIN) - { - candidateNode = WorkerGetRoundRobinCandidateNode(workerNodeList, shardId, - candidateNodeIndex); - } - else if (ShardPlacementPolicy == SHARD_PLACEMENT_RANDOM) - { - candidateNode = WorkerGetRandomCandidateNode(candidateNodeList); - } - else - { - ereport(ERROR, (errmsg("unrecognized shard placement policy"))); - } - + WorkerNode *candidateNode = WorkerGetRoundRobinCandidateNode(workerNodeList, + shardId, + candidateNodeIndex); if (candidateNode == NULL) { ereport(ERROR, (errmsg("could only find %u of %u possible nodes", diff --git a/src/backend/distributed/operations/worker_node_manager.c b/src/backend/distributed/operations/worker_node_manager.c index 7fbc53e32..1054049e4 100644 --- a/src/backend/distributed/operations/worker_node_manager.c +++ b/src/backend/distributed/operations/worker_node_manager.c @@ -40,12 +40,6 @@ int MaxWorkerNodesTracked = 2048; /* determines worker node hash table size * /* Local functions forward declarations */ -static WorkerNode * WorkerGetNodeWithName(const char *hostname); -static char * ClientHostAddress(StringInfo remoteHostStringInfo); -static List * PrimaryNodesNotInList(List *currentList); -static WorkerNode * FindRandomNodeFromList(List *candidateWorkerNodeList); -static bool OddNumber(uint32 number); -static bool ListMember(List *currentList, WorkerNode *workerNode); static bool NodeIsPrimaryWorker(WorkerNode *node); static bool NodeIsReadableWorker(WorkerNode *node); @@ -55,73 +49,6 @@ static bool NodeIsReadableWorker(WorkerNode *node); * ------------------------------------------------------------ */ -/* - * WorkerGetRandomCandidateNode accepts a list of WorkerNode's and returns a random - * primary node which is not in that list. - * - * Note that the function returns null if the worker membership list does not - * contain enough nodes to allocate a new worker node. - */ -WorkerNode * -WorkerGetRandomCandidateNode(List *currentNodeList) -{ - WorkerNode *workerNode = NULL; - bool wantSameRack = false; - uint32 tryCount = WORKER_RACK_TRIES; - - uint32 currentNodeCount = list_length(currentNodeList); - List *candidateWorkerNodeList = PrimaryNodesNotInList(currentNodeList); - - /* we check if the shard has already been placed on all nodes known to us */ - if (list_length(candidateWorkerNodeList) == 0) - { - return NULL; - } - - /* if current node list is empty, randomly pick one node and return */ - if (currentNodeCount == 0) - { - workerNode = FindRandomNodeFromList(candidateWorkerNodeList); - return workerNode; - } - - /* - * If the current list has an odd number of nodes (1, 3, 5, etc), we want to - * place the shard on a different rack than the first node's rack. - * Otherwise, we want to place the shard on the same rack as the first node. - */ - if (OddNumber(currentNodeCount)) - { - wantSameRack = false; - } - else - { - wantSameRack = true; - } - - /* - * We try to find a worker node that fits our rack-aware placement strategy. - * If after a predefined number of tries, we still cannot find such a node, - * we simply give up and return the last worker node we found. - */ - for (uint32 tryIndex = 0; tryIndex < tryCount; tryIndex++) - { - WorkerNode *firstNode = (WorkerNode *) linitial(currentNodeList); - char *firstRack = firstNode->workerRack; - - workerNode = FindRandomNodeFromList(candidateWorkerNodeList); - char *workerRack = workerNode->workerRack; - - bool sameRack = (strncmp(workerRack, firstRack, WORKER_LENGTH) == 0); - if ((sameRack && wantSameRack) || (!sameRack && !wantSameRack)) - { - break; - } - } - - return workerNode; -} - /* * WorkerGetRoundRobinCandidateNode takes in a list of worker nodes and returns @@ -152,147 +79,6 @@ WorkerGetRoundRobinCandidateNode(List *workerNodeList, uint64 shardId, } -/* - * WorkerGetLocalFirstCandidateNode takes in a list of worker nodes, and then - * allocates a new worker node. The allocation is performed according to the - * following policy: if the list is empty, the node where the caller is connecting - * from is allocated; if the list is not empty, a node is allocated according - * to random policy. - */ -WorkerNode * -WorkerGetLocalFirstCandidateNode(List *currentNodeList) -{ - WorkerNode *candidateNode = NULL; - uint32 currentNodeCount = list_length(currentNodeList); - - /* choose first candidate node to be the client's host */ - if (currentNodeCount == 0) - { - StringInfo clientHostStringInfo = makeStringInfo(); - char *errorMessage = ClientHostAddress(clientHostStringInfo); - - if (errorMessage != NULL) - { - ereport(ERROR, (errmsg("%s", errorMessage), - errdetail("Could not find the first worker " - "node for local-node-first policy."), - errhint("Make sure that you are not on the " - "master node."))); - } - - /* if hostname is localhost.localdomain, change it to localhost */ - char *clientHost = clientHostStringInfo->data; - if (strncmp(clientHost, "localhost.localdomain", WORKER_LENGTH) == 0) - { - clientHost = pstrdup("localhost"); - } - - candidateNode = WorkerGetNodeWithName(clientHost); - if (candidateNode == NULL) - { - ereport(ERROR, (errmsg("could not find worker node for " - "host: %s", clientHost))); - } - } - else - { - /* find a candidate node different from those already selected */ - candidateNode = WorkerGetRandomCandidateNode(currentNodeList); - } - - return candidateNode; -} - - -/* - * ClientHostAddress appends the connecting client's fully qualified hostname - * to the given StringInfo. If there is no such connection or the connection is - * over Unix domain socket, the function fills the error message and returns it. - * On success, it just returns NULL. - */ -static char * -ClientHostAddress(StringInfo clientHostStringInfo) -{ - Port *port = MyProcPort; - char *clientHost = NULL; - char *errorMessage = NULL; - int clientHostLength = NI_MAXHOST; - int flags = NI_NAMEREQD; /* require fully qualified hostname */ - int nameFound = 0; - - if (port == NULL) - { - errorMessage = "cannot find tcp/ip connection to client"; - return errorMessage; - } - - switch (port->raddr.addr.ss_family) - { - case AF_INET: -#ifdef HAVE_IPV6 - case AF_INET6: -#endif - { - break; - } - - default: - { - errorMessage = "invalid address family in connection"; - return errorMessage; - } - } - - clientHost = palloc0(clientHostLength); - - nameFound = pg_getnameinfo_all(&port->raddr.addr, port->raddr.salen, - clientHost, clientHostLength, NULL, 0, flags); - if (nameFound == 0) - { - appendStringInfo(clientHostStringInfo, "%s", clientHost); - } - else - { - StringInfo errorMessageStringInfo = makeStringInfo(); - appendStringInfo(errorMessageStringInfo, "could not resolve client host: %s", - gai_strerror(nameFound)); - - errorMessage = errorMessageStringInfo->data; - return errorMessage; - } - - return errorMessage; -} - - -/* - * WorkerGetNodeWithName finds and returns a node from the membership list that - * has the given hostname. The function returns null if no such node exists. - */ -static WorkerNode * -WorkerGetNodeWithName(const char *hostname) -{ - WorkerNode *workerNode = NULL; - HASH_SEQ_STATUS status; - HTAB *workerNodeHash = GetWorkerNodeHash(); - - hash_seq_init(&status, workerNodeHash); - - while ((workerNode = hash_seq_search(&status)) != NULL) - { - int nameCompare = strncmp(workerNode->workerName, hostname, WORKER_LENGTH); - if (nameCompare == 0) - { - /* we need to terminate the scan since we break */ - hash_seq_term(&status); - break; - } - } - - return workerNode; -} - - /* * ActivePrimaryNonCoordinatorNodeCount returns the number of groups with a primary in the cluster. * This method excludes coordinator even if it is added as a worker to cluster. @@ -542,84 +328,6 @@ NodeIsReadableWorker(WorkerNode *node) } -/* - * PrimaryNodesNotInList scans through the worker node hash and returns a list of all - * primary nodes which are not in currentList. It runs in O(n*m) but currentList is - * quite small. - */ -static List * -PrimaryNodesNotInList(List *currentList) -{ - List *workerNodeList = NIL; - HTAB *workerNodeHash = GetWorkerNodeHash(); - WorkerNode *workerNode = NULL; - HASH_SEQ_STATUS status; - - hash_seq_init(&status, workerNodeHash); - - while ((workerNode = hash_seq_search(&status)) != NULL) - { - if (ListMember(currentList, workerNode)) - { - continue; - } - - if (NodeIsPrimary(workerNode)) - { - workerNodeList = lappend(workerNodeList, workerNode); - } - } - - return workerNodeList; -} - - -/* FindRandomNodeFromList picks a random node from the list provided to it. */ -static WorkerNode * -FindRandomNodeFromList(List *candidateWorkerNodeList) -{ - uint32 candidateNodeCount = list_length(candidateWorkerNodeList); - - /* nb, the random seed has already been set by the postmaster when starting up */ - uint32 workerPosition = (random() % candidateNodeCount); - - WorkerNode *workerNode = - (WorkerNode *) list_nth(candidateWorkerNodeList, workerPosition); - - return workerNode; -} - - -/* - * OddNumber function returns true if given number is odd; returns false otherwise. - */ -static bool -OddNumber(uint32 number) -{ - bool oddNumber = ((number % 2) == 1); - return oddNumber; -} - - -/* Checks if given worker node is a member of the current list. */ -static bool -ListMember(List *currentList, WorkerNode *workerNode) -{ - Size keySize = WORKER_LENGTH + sizeof(uint32); - - WorkerNode *currentNode = NULL; - foreach_ptr(currentNode, currentList) - { - if (WorkerNodeCompare(workerNode, currentNode, keySize) == 0) - { - return true; - } - } - - return false; -} - - /* * CompareWorkerNodes compares two pointers to worker nodes using the exact * same logic employed by WorkerNodeCompare. diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index 00e7a523a..e109bceed 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -168,13 +168,6 @@ static const struct config_enum_entry task_executor_type_options[] = { { NULL, 0, false } }; -static const struct config_enum_entry shard_placement_policy_options[] = { - { "local-node-first", SHARD_PLACEMENT_LOCAL_NODE_FIRST, false }, - { "round-robin", SHARD_PLACEMENT_ROUND_ROBIN, false }, - { "random", SHARD_PLACEMENT_RANDOM, false }, - { NULL, 0, false } -}; - static const struct config_enum_entry use_secondary_nodes_options[] = { { "never", USE_SECONDARY_NODES_NEVER, false }, { "always", USE_SECONDARY_NODES_ALWAYS, false }, @@ -1629,22 +1622,6 @@ RegisterCitusConfigVariables(void) GUC_STANDARD, NULL, NULL, NULL); - DefineCustomEnumVariable( - "citus.shard_placement_policy", - gettext_noop("Sets the policy to use when choosing nodes for shard placement."), - gettext_noop("The master node chooses which worker nodes to place new shards " - "on. This configuration value specifies the policy to use when " - "selecting these nodes. The local-node-first policy places the " - "first replica on the client node and chooses others randomly. " - "The round-robin policy aims to distribute shards evenly across " - "the cluster by selecting nodes in a round-robin fashion." - "The random policy picks all workers randomly."), - &ShardPlacementPolicy, - SHARD_PLACEMENT_ROUND_ROBIN, shard_placement_policy_options, - PGC_USERSET, - GUC_STANDARD, - NULL, NULL, NULL); - DefineCustomIntVariable( "citus.shard_replication_factor", gettext_noop("Sets the replication factor for shards."), diff --git a/src/include/distributed/coordinator_protocol.h b/src/include/distributed/coordinator_protocol.h index f0bda8515..bda318a25 100644 --- a/src/include/distributed/coordinator_protocol.h +++ b/src/include/distributed/coordinator_protocol.h @@ -72,15 +72,6 @@ #define DROP_FOREIGN_TABLE_COMMAND "DROP FOREIGN TABLE IF EXISTS %s CASCADE" #define CREATE_SCHEMA_COMMAND "CREATE SCHEMA IF NOT EXISTS %s AUTHORIZATION %s" -/* Enumeration that defines the shard placement policy to use while staging */ -typedef enum -{ - SHARD_PLACEMENT_INVALID_FIRST = 0, - SHARD_PLACEMENT_LOCAL_NODE_FIRST = 1, - SHARD_PLACEMENT_ROUND_ROBIN = 2, - SHARD_PLACEMENT_RANDOM = 3 -} ShardPlacementPolicyType; - /* * TableDDLCommandType encodes the implementation used by TableDDLCommand. See comments in * TableDDLCpmmand for details. @@ -212,7 +203,6 @@ extern TableDDLCommand * ColumnarGetCustomTableOptionsDDL(char *schemaName, /* Config variables managed via guc.c */ extern int ShardCount; extern int ShardReplicationFactor; -extern int ShardPlacementPolicy; extern int NextShardId; extern int NextPlacementId; diff --git a/src/include/distributed/worker_manager.h b/src/include/distributed/worker_manager.h index 91d91a880..0a6b637b3 100644 --- a/src/include/distributed/worker_manager.h +++ b/src/include/distributed/worker_manager.h @@ -70,7 +70,6 @@ extern WorkerNode * WorkerGetRandomCandidateNode(List *currentNodeList); extern WorkerNode * WorkerGetRoundRobinCandidateNode(List *workerNodeList, uint64 shardId, uint32 placementIndex); -extern WorkerNode * WorkerGetLocalFirstCandidateNode(List *currentNodeList); extern uint32 ActivePrimaryNonCoordinatorNodeCount(void); extern uint32 ActivePrimaryNodeCount(void); extern List * ActivePrimaryNonCoordinatorNodeList(LOCKMODE lockMode);