From 872f0a79dbeea0188802cb4b036070ee522d52ee Mon Sep 17 00:00:00 2001
From: Marco Slot
Date: Sun, 6 Feb 2022 21:41:20 +0100
Subject: [PATCH] Remove random shard placement policy

---
 .../distributed/operations/node_protocol.c    |   1 -
 .../distributed/operations/stage_protocol.c   |  19 +--
 .../operations/worker_node_manager.c          | 149 ------------------
 src/backend/distributed/shared_library_init.c |  22 ---
 .../distributed/coordinator_protocol.h        |   9 --
 5 files changed, 3 insertions(+), 197 deletions(-)

diff --git a/src/backend/distributed/operations/node_protocol.c b/src/backend/distributed/operations/node_protocol.c
index 0cb0a5ace..d18ef749c 100644
--- a/src/backend/distributed/operations/node_protocol.c
+++ b/src/backend/distributed/operations/node_protocol.c
@@ -67,7 +67,6 @@
 /* Shard related configuration */
 int ShardCount = 32;
 int ShardReplicationFactor = 1; /* desired replication factor for shards */
-int ShardPlacementPolicy = SHARD_PLACEMENT_ROUND_ROBIN;
 int NextShardId = 0;
 int NextPlacementId = 0;
 
diff --git a/src/backend/distributed/operations/stage_protocol.c b/src/backend/distributed/operations/stage_protocol.c
index 2fa052536..d6e9c0f2a 100644
--- a/src/backend/distributed/operations/stage_protocol.c
+++ b/src/backend/distributed/operations/stage_protocol.c
@@ -171,22 +171,9 @@ master_create_empty_shard(PG_FUNCTION_ARGS)
 	/* first retrieve a list of random nodes for shard placements */
 	while (candidateNodeIndex < attemptableNodeCount)
 	{
-		WorkerNode *candidateNode = NULL;
-
-		if (ShardPlacementPolicy == SHARD_PLACEMENT_ROUND_ROBIN)
-		{
-			candidateNode = WorkerGetRoundRobinCandidateNode(workerNodeList, shardId,
-															 candidateNodeIndex);
-		}
-		else if (ShardPlacementPolicy == SHARD_PLACEMENT_RANDOM)
-		{
-			candidateNode = WorkerGetRandomCandidateNode(candidateNodeList);
-		}
-		else
-		{
-			ereport(ERROR, (errmsg("unrecognized shard placement policy")));
-		}
-
+		WorkerNode *candidateNode = WorkerGetRoundRobinCandidateNode(workerNodeList,
+																	 shardId,
+																	 candidateNodeIndex);
 		if (candidateNode == NULL)
 		{
 			ereport(ERROR, (errmsg("could only find %u of %u possible nodes",
diff --git a/src/backend/distributed/operations/worker_node_manager.c b/src/backend/distributed/operations/worker_node_manager.c
index 938b90a24..1054049e4 100644
--- a/src/backend/distributed/operations/worker_node_manager.c
+++ b/src/backend/distributed/operations/worker_node_manager.c
@@ -40,10 +40,6 @@ int MaxWorkerNodesTracked = 2048;    /* determines worker node hash table size */
 
 
 /* Local functions forward declarations */
-static List * PrimaryNodesNotInList(List *currentList);
-static WorkerNode * FindRandomNodeFromList(List *candidateWorkerNodeList);
-static bool OddNumber(uint32 number);
-static bool ListMember(List *currentList, WorkerNode *workerNode);
 static bool NodeIsPrimaryWorker(WorkerNode *node);
 static bool NodeIsReadableWorker(WorkerNode *node);
 
@@ -53,73 +49,6 @@ static bool NodeIsReadableWorker(WorkerNode *node);
  * ------------------------------------------------------------
  */
 
-/*
- * WorkerGetRandomCandidateNode accepts a list of WorkerNode's and returns a random
- * primary node which is not in that list.
- *
- * Note that the function returns null if the worker membership list does not
- * contain enough nodes to allocate a new worker node.
- */
-WorkerNode *
-WorkerGetRandomCandidateNode(List *currentNodeList)
-{
-	WorkerNode *workerNode = NULL;
-	bool wantSameRack = false;
-	uint32 tryCount = WORKER_RACK_TRIES;
-
-	uint32 currentNodeCount = list_length(currentNodeList);
-	List *candidateWorkerNodeList = PrimaryNodesNotInList(currentNodeList);
-
-	/* we check if the shard has already been placed on all nodes known to us */
-	if (list_length(candidateWorkerNodeList) == 0)
-	{
-		return NULL;
-	}
-
-	/* if current node list is empty, randomly pick one node and return */
-	if (currentNodeCount == 0)
-	{
-		workerNode = FindRandomNodeFromList(candidateWorkerNodeList);
-		return workerNode;
-	}
-
-	/*
-	 * If the current list has an odd number of nodes (1, 3, 5, etc), we want to
-	 * place the shard on a different rack than the first node's rack.
-	 * Otherwise, we want to place the shard on the same rack as the first node.
-	 */
-	if (OddNumber(currentNodeCount))
-	{
-		wantSameRack = false;
-	}
-	else
-	{
-		wantSameRack = true;
-	}
-
-	/*
-	 * We try to find a worker node that fits our rack-aware placement strategy.
-	 * If after a predefined number of tries, we still cannot find such a node,
-	 * we simply give up and return the last worker node we found.
-	 */
-	for (uint32 tryIndex = 0; tryIndex < tryCount; tryIndex++)
-	{
-		WorkerNode *firstNode = (WorkerNode *) linitial(currentNodeList);
-		char *firstRack = firstNode->workerRack;
-
-		workerNode = FindRandomNodeFromList(candidateWorkerNodeList);
-		char *workerRack = workerNode->workerRack;
-
-		bool sameRack = (strncmp(workerRack, firstRack, WORKER_LENGTH) == 0);
-		if ((sameRack && wantSameRack) || (!sameRack && !wantSameRack))
-		{
-			break;
-		}
-	}
-
-	return workerNode;
-}
-
 
 /*
  * WorkerGetRoundRobinCandidateNode takes in a list of worker nodes and returns
@@ -399,84 +328,6 @@ NodeIsReadableWorker(WorkerNode *node)
 }
 
 
-/*
- * PrimaryNodesNotInList scans through the worker node hash and returns a list of all
- * primary nodes which are not in currentList. It runs in O(n*m) but currentList is
- * quite small.
- */
-static List *
-PrimaryNodesNotInList(List *currentList)
-{
-	List *workerNodeList = NIL;
-	HTAB *workerNodeHash = GetWorkerNodeHash();
-	WorkerNode *workerNode = NULL;
-	HASH_SEQ_STATUS status;
-
-	hash_seq_init(&status, workerNodeHash);
-
-	while ((workerNode = hash_seq_search(&status)) != NULL)
-	{
-		if (ListMember(currentList, workerNode))
-		{
-			continue;
-		}
-
-		if (NodeIsPrimary(workerNode))
-		{
-			workerNodeList = lappend(workerNodeList, workerNode);
-		}
-	}
-
-	return workerNodeList;
-}
-
-
-/* FindRandomNodeFromList picks a random node from the list provided to it. */
-static WorkerNode *
-FindRandomNodeFromList(List *candidateWorkerNodeList)
-{
-	uint32 candidateNodeCount = list_length(candidateWorkerNodeList);
-
-	/* nb, the random seed has already been set by the postmaster when starting up */
-	uint32 workerPosition = (random() % candidateNodeCount);
-
-	WorkerNode *workerNode =
-		(WorkerNode *) list_nth(candidateWorkerNodeList, workerPosition);
-
-	return workerNode;
-}
-
-
-/*
- * OddNumber function returns true if given number is odd; returns false otherwise.
- */
-static bool
-OddNumber(uint32 number)
-{
-	bool oddNumber = ((number % 2) == 1);
-	return oddNumber;
-}
-
-
-/* Checks if given worker node is a member of the current list. */
-static bool
-ListMember(List *currentList, WorkerNode *workerNode)
-{
-	Size keySize = WORKER_LENGTH + sizeof(uint32);
-
-	WorkerNode *currentNode = NULL;
-	foreach_ptr(currentNode, currentList)
-	{
-		if (WorkerNodeCompare(workerNode, currentNode, keySize) == 0)
-		{
-			return true;
-		}
-	}
-
-	return false;
-}
-
-
 /*
  * CompareWorkerNodes compares two pointers to worker nodes using the exact
  * same logic employed by WorkerNodeCompare.
diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c
index 11898618c..e109bceed 100644
--- a/src/backend/distributed/shared_library_init.c
+++ b/src/backend/distributed/shared_library_init.c
@@ -168,12 +168,6 @@ static const struct config_enum_entry task_executor_type_options[] = {
 	{ NULL, 0, false }
 };
 
-static const struct config_enum_entry shard_placement_policy_options[] = {
-	{ "round-robin", SHARD_PLACEMENT_ROUND_ROBIN, false },
-	{ "random", SHARD_PLACEMENT_RANDOM, false },
-	{ NULL, 0, false }
-};
-
 static const struct config_enum_entry use_secondary_nodes_options[] = {
 	{ "never", USE_SECONDARY_NODES_NEVER, false },
 	{ "always", USE_SECONDARY_NODES_ALWAYS, false },
@@ -1628,22 +1622,6 @@ RegisterCitusConfigVariables(void)
 		GUC_STANDARD,
 		NULL, NULL, NULL);
 
-	DefineCustomEnumVariable(
-		"citus.shard_placement_policy",
-		gettext_noop("Sets the policy to use when choosing nodes for shard placement."),
-		gettext_noop("The master node chooses which worker nodes to place new shards "
-					 "on. This configuration value specifies the policy to use when "
-					 "selecting these nodes. The local-node-first policy places the "
-					 "first replica on the client node and chooses others randomly. "
-					 "The round-robin policy aims to distribute shards evenly across "
-					 "the cluster by selecting nodes in a round-robin fashion."
-					 "The random policy picks all workers randomly."),
-		&ShardPlacementPolicy,
-		SHARD_PLACEMENT_ROUND_ROBIN, shard_placement_policy_options,
-		PGC_USERSET,
-		GUC_STANDARD,
-		NULL, NULL, NULL);
-
 	DefineCustomIntVariable(
 		"citus.shard_replication_factor",
 		gettext_noop("Sets the replication factor for shards."),
diff --git a/src/include/distributed/coordinator_protocol.h b/src/include/distributed/coordinator_protocol.h
index c3e149c07..bda318a25 100644
--- a/src/include/distributed/coordinator_protocol.h
+++ b/src/include/distributed/coordinator_protocol.h
@@ -72,14 +72,6 @@
 #define DROP_FOREIGN_TABLE_COMMAND "DROP FOREIGN TABLE IF EXISTS %s CASCADE"
 #define CREATE_SCHEMA_COMMAND "CREATE SCHEMA IF NOT EXISTS %s AUTHORIZATION %s"
 
-/* Enumeration that defines the shard placement policy to use while staging */
-typedef enum
-{
-	SHARD_PLACEMENT_INVALID_FIRST = 0,
-	SHARD_PLACEMENT_ROUND_ROBIN = 1,
-	SHARD_PLACEMENT_RANDOM = 2
-} ShardPlacementPolicyType;
-
 /*
  * TableDDLCommandType encodes the implementation used by TableDDLCommand. See comments in
  * TableDDLCpmmand for details.
@@ -211,7 +203,6 @@ extern TableDDLCommand * ColumnarGetCustomTableOptionsDDL(char *schemaName,
 /* Config variables managed via guc.c */
 extern int ShardCount;
 extern int ShardReplicationFactor;
-extern int ShardPlacementPolicy;
 extern int NextShardId;
 extern int NextPlacementId;
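After this patch, master_create_empty_shard always picks placement candidates via WorkerGetRoundRobinCandidateNode; there is no citus.shard_placement_policy GUC left to switch back to random placement. The standalone program below is only a minimal sketch of the round-robin idea, not Citus code: DemoWorkerNode, RoundRobinCandidate, and the example shard ids are hypothetical stand-ins, whereas the real function operates on a PostgreSQL List of WorkerNode structs inside the extension.

/*
 * Illustrative sketch only (not Citus code): the starting offset is derived
 * from the shard id, and each additional placement (replica) takes the next
 * node in the list, so shards and their replicas spread evenly across workers.
 */
#include <stdio.h>
#include <stdint.h>

/* hypothetical stand-in for Citus's WorkerNode; only the name matters here */
typedef struct DemoWorkerNode
{
	const char *nodeName;
} DemoWorkerNode;

/*
 * Pick the candidate for the placementIndex-th placement of shardId by
 * walking the worker list round-robin, starting at shardId % nodeCount.
 */
static const DemoWorkerNode *
RoundRobinCandidate(const DemoWorkerNode *nodes, uint32_t nodeCount,
					uint64_t shardId, uint32_t placementIndex)
{
	if (nodeCount == 0 || placementIndex >= nodeCount)
	{
		return NULL;            /* not enough distinct nodes left */
	}

	uint32_t offset = (uint32_t) (shardId % nodeCount);
	uint32_t position = (offset + placementIndex) % nodeCount;

	return &nodes[position];
}

int
main(void)
{
	DemoWorkerNode nodes[] = { { "worker-1" }, { "worker-2" }, { "worker-3" } };
	uint32_t nodeCount = 3;

	/* two placements (replication factor 2) for a handful of example shard ids */
	for (uint64_t shardId = 102008; shardId < 102012; shardId++)
	{
		for (uint32_t placementIndex = 0; placementIndex < 2; placementIndex++)
		{
			/* never NULL here since placementIndex < nodeCount */
			const DemoWorkerNode *candidate =
				RoundRobinCandidate(nodes, nodeCount, shardId, placementIndex);

			printf("shard %llu placement %u -> %s\n",
				   (unsigned long long) shardId, placementIndex,
				   candidate->nodeName);
		}
	}

	return 0;
}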