From a04e7b233ece1ed203bf876d56ab9cbf7260c851 Mon Sep 17 00:00:00 2001 From: Sait Talha Nisanci Date: Thu, 9 Jul 2020 12:25:30 +0300 Subject: [PATCH] rename node/worker utilities The names were not explicit about what they do, and we have many misusages in the codebase, so they are renamed to be more explicit. (cherry picked from commit 09962a7e2ff340705b6b193bbfececa2d48e0855) --- .../distributed/commands/dependencies.c | 2 +- src/backend/distributed/commands/function.c | 2 +- .../executor/intermediate_results.c | 2 +- .../executor/multi_server_executor.c | 2 +- .../executor/multi_task_tracker_executor.c | 2 +- .../distributed/metadata/metadata_sync.c | 4 +-- .../operations/citus_create_restore_point.c | 2 +- .../distributed/operations/node_protocol.c | 2 +- .../operations/worker_node_manager.c | 28 +++++++++++-------- .../planner/intermediate_result_pruning.c | 4 +-- .../planner/multi_physical_planner.c | 4 +-- .../planner/multi_router_planner.c | 2 +- src/backend/distributed/test/metadata_sync.c | 2 +- .../distributed/transaction/backend_data.c | 2 +- .../transaction/citus_dist_stat_activity.c | 4 +-- .../transaction/worker_transaction.c | 4 +-- src/backend/distributed/utils/resource_lock.c | 2 +- .../distributed/utils/statistics_collection.c | 2 +- src/include/distributed/worker_manager.h | 8 +++--- 19 files changed, 42 insertions(+), 38 deletions(-) diff --git a/src/backend/distributed/commands/dependencies.c b/src/backend/distributed/commands/dependencies.c index b5b5e59d4..47d623c10 100644 --- a/src/backend/distributed/commands/dependencies.c +++ b/src/backend/distributed/commands/dependencies.c @@ -85,7 +85,7 @@ EnsureDependenciesExistOnAllNodes(const ObjectAddress *target) * either get it now, or get it in master_add_node after this transaction finishes and * the pg_dist_object record becomes visible. */ - List *workerNodeList = ActivePrimaryWorkerNodeList(RowShareLock); + List *workerNodeList = ActivePrimaryNonCoordinatorNodeList(RowShareLock); /* * right after we acquired the lock we mark our objects as distributed, these changes diff --git a/src/backend/distributed/commands/function.c b/src/backend/distributed/commands/function.c index b835d0617..1164114a1 100644 --- a/src/backend/distributed/commands/function.c +++ b/src/backend/distributed/commands/function.c @@ -1022,7 +1022,7 @@ EnsureSequentialModeForFunctionDDL(void) static void TriggerSyncMetadataToPrimaryNodes(void) { - List *workerList = ActivePrimaryWorkerNodeList(ShareLock); + List *workerList = ActivePrimaryNonCoordinatorNodeList(ShareLock); bool triggerMetadataSync = false; WorkerNode *workerNode = NULL; diff --git a/src/backend/distributed/executor/intermediate_results.c b/src/backend/distributed/executor/intermediate_results.c index 90634c62a..12695f86b 100644 --- a/src/backend/distributed/executor/intermediate_results.c +++ b/src/backend/distributed/executor/intermediate_results.c @@ -136,7 +136,7 @@ broadcast_intermediate_result(PG_FUNCTION_ARGS) */ UseCoordinatedTransaction(); - List *nodeList = ActivePrimaryWorkerNodeList(NoLock); + List *nodeList = ActivePrimaryNonCoordinatorNodeList(NoLock); EState *estate = CreateExecutorState(); RemoteFileDestReceiver *resultDest = (RemoteFileDestReceiver *) CreateRemoteFileDestReceiver(resultIdString, diff --git a/src/backend/distributed/executor/multi_server_executor.c b/src/backend/distributed/executor/multi_server_executor.c index a13c39ca3..cf465c8c0 100644 --- a/src/backend/distributed/executor/multi_server_executor.c +++ b/src/backend/distributed/executor/multi_server_executor.c @@ -118,7 +118,7 @@ JobExecutorType(DistributedPlan *distributedPlan) } else { - List *workerNodeList = ActiveReadableWorkerNodeList(); + List *workerNodeList = ActiveReadableNonCoordinatorNodeList(); int workerNodeCount = list_length(workerNodeList); int taskCount = list_length(job->taskList); double tasksPerNode = taskCount / ((double) workerNodeCount); diff --git a/src/backend/distributed/executor/multi_task_tracker_executor.c b/src/backend/distributed/executor/multi_task_tracker_executor.c index a099e3232..d27fd2a9d 100644 --- a/src/backend/distributed/executor/multi_task_tracker_executor.c +++ b/src/backend/distributed/executor/multi_task_tracker_executor.c @@ -209,7 +209,7 @@ MultiTaskTrackerExecute(Job *job) * assigning and checking the status of tasks. The second (temporary) hash * helps us in fetching results data from worker nodes to the master node. */ - List *workerNodeList = ActivePrimaryWorkerNodeList(NoLock); + List *workerNodeList = ActivePrimaryNonCoordinatorNodeList(NoLock); uint32 taskTrackerCount = (uint32) list_length(workerNodeList); /* connect as the current user for running queries */ diff --git a/src/backend/distributed/metadata/metadata_sync.c b/src/backend/distributed/metadata/metadata_sync.c index 4d002bdc3..8b7c84be2 100644 --- a/src/backend/distributed/metadata/metadata_sync.c +++ b/src/backend/distributed/metadata/metadata_sync.c @@ -1254,7 +1254,7 @@ SchemaOwnerName(Oid objectId) static bool HasMetadataWorkers(void) { - List *workerNodeList = ActivePrimaryWorkerNodeList(NoLock); + List *workerNodeList = ActivePrimaryNonCoordinatorNodeList(NoLock); WorkerNode *workerNode = NULL; foreach_ptr(workerNode, workerNodeList) @@ -1373,7 +1373,7 @@ SyncMetadataToNodes(void) return METADATA_SYNC_FAILED_LOCK; } - List *workerList = ActivePrimaryWorkerNodeList(NoLock); + List *workerList = ActivePrimaryNonCoordinatorNodeList(NoLock); WorkerNode *workerNode = NULL; foreach_ptr(workerNode, workerList) { diff --git a/src/backend/distributed/operations/citus_create_restore_point.c b/src/backend/distributed/operations/citus_create_restore_point.c index a9ededfe1..9d9019a83 100644 --- a/src/backend/distributed/operations/citus_create_restore_point.c +++ b/src/backend/distributed/operations/citus_create_restore_point.c @@ -117,7 +117,7 @@ OpenConnectionsToAllWorkerNodes(LOCKMODE lockMode) List *connectionList = NIL; int connectionFlags = FORCE_NEW_CONNECTION; - List *workerNodeList = ActivePrimaryWorkerNodeList(lockMode); + List *workerNodeList = ActivePrimaryNonCoordinatorNodeList(lockMode); WorkerNode *workerNode = NULL; foreach_ptr(workerNode, workerNodeList) diff --git a/src/backend/distributed/operations/node_protocol.c b/src/backend/distributed/operations/node_protocol.c index e167380d1..552c439ca 100644 --- a/src/backend/distributed/operations/node_protocol.c +++ b/src/backend/distributed/operations/node_protocol.c @@ -457,7 +457,7 @@ master_get_active_worker_nodes(PG_FUNCTION_ARGS) MemoryContext oldContext = MemoryContextSwitchTo( functionContext->multi_call_memory_ctx); - List *workerNodeList = ActiveReadableWorkerNodeList(); + List *workerNodeList = ActiveReadableNonCoordinatorNodeList(); workerNodeCount = (uint32) list_length(workerNodeList); functionContext->user_fctx = workerNodeList; diff --git a/src/backend/distributed/operations/worker_node_manager.c b/src/backend/distributed/operations/worker_node_manager.c index e0e38e08c..565215f15 100644 --- a/src/backend/distributed/operations/worker_node_manager.c +++ b/src/backend/distributed/operations/worker_node_manager.c @@ -293,12 +293,13 @@ WorkerGetNodeWithName(const char *hostname) /* - * ActivePrimaryWorkerNodeCount returns the number of groups with a primary in the cluster. + * ActivePrimaryNonCoordinatorNodeCount returns the number of groups with a primary in the cluster. + * This method excludes coordinator even if it is added as a worker to cluster. */ uint32 -ActivePrimaryWorkerNodeCount(void) +ActivePrimaryNonCoordinatorNodeCount(void) { - List *workerNodeList = ActivePrimaryWorkerNodeList(NoLock); + List *workerNodeList = ActivePrimaryNonCoordinatorNodeList(NoLock); uint32 liveWorkerCount = list_length(workerNodeList); return liveWorkerCount; @@ -306,12 +307,13 @@ ActivePrimaryWorkerNodeCount(void) /* - * ActiveReadableWorkerNodeCount returns the number of groups with a node we can read from. + * ActiveReadableNonCoordinatorNodeCount returns the number of groups with a node we can read from. + * This method excludes coordinator even if it is added as a worker. */ uint32 -ActiveReadableWorkerNodeCount(void) +ActiveReadableNonCoordinatorNodeCount(void) { - List *workerNodeList = ActiveReadableWorkerNodeList(); + List *workerNodeList = ActiveReadableNonCoordinatorNodeList(); uint32 liveWorkerCount = list_length(workerNodeList); return liveWorkerCount; @@ -366,13 +368,14 @@ FilterActiveNodeListFunc(LOCKMODE lockMode, bool (*checkFunction)(WorkerNode *)) /* - * ActivePrimaryWorkerNodeList returns a list of all active primary worker nodes + * ActivePrimaryNonCoordinatorNodeList returns a list of all active primary worker nodes * in workerNodeHash. lockMode specifies which lock to use on pg_dist_node, * this is necessary when the caller wouldn't want nodes to be added concurrent * to their use of this list. + * This method excludes coordinator even if it is added as a worker to cluster. */ List * -ActivePrimaryWorkerNodeList(LOCKMODE lockMode) +ActivePrimaryNonCoordinatorNodeList(LOCKMODE lockMode) { EnsureModificationsCanRun(); return FilterActiveNodeListFunc(lockMode, NodeIsPrimaryWorker); @@ -443,11 +446,11 @@ NodeCanHaveDistTablePlacements(WorkerNode *node) /* - * ActiveReadableWorkerNodeList returns a list of all nodes in workerNodeHash - * that are readable workers. + * ActiveReadableNonCoordinatorNodeList returns a list of all nodes in workerNodeHash + * that are readable nodes This method excludes coordinator. */ List * -ActiveReadableWorkerNodeList(void) +ActiveReadableNonCoordinatorNodeList(void) { return FilterActiveNodeListFunc(NoLock, NodeIsReadableWorker); } @@ -456,6 +459,7 @@ ActiveReadableWorkerNodeList(void) /* * ActiveReadableNodeList returns a list of all nodes in workerNodeHash * that are readable workers. + * This method includes coordinator if it is added as a worker to the cluster. */ List * ActiveReadableNodeList(void) @@ -602,7 +606,7 @@ WorkerNodeCompare(const void *lhsKey, const void *rhsKey, Size keySize) WorkerNode * GetFirstPrimaryWorkerNode(void) { - List *workerNodeList = ActivePrimaryWorkerNodeList(NoLock); + List *workerNodeList = ActivePrimaryNonCoordinatorNodeList(NoLock); WorkerNode *firstWorkerNode = NULL; WorkerNode *workerNode = NULL; foreach_ptr(workerNode, workerNodeList) diff --git a/src/backend/distributed/planner/intermediate_result_pruning.c b/src/backend/distributed/planner/intermediate_result_pruning.c index a851eb11d..2156e3469 100644 --- a/src/backend/distributed/planner/intermediate_result_pruning.c +++ b/src/backend/distributed/planner/intermediate_result_pruning.c @@ -151,7 +151,7 @@ RecordSubplanExecutionsOnNodes(HTAB *intermediateResultsHash, List *usedSubPlanNodeList = distributedPlan->usedSubPlanNodeList; List *subPlanList = distributedPlan->subPlanList; ListCell *subPlanCell = NULL; - int workerNodeCount = ActiveReadableWorkerNodeCount(); + int workerNodeCount = ActiveReadableNonCoordinatorNodeCount(); foreach(subPlanCell, usedSubPlanNodeList) { @@ -269,7 +269,7 @@ AppendAllAccessedWorkerNodes(IntermediateResultsHashEntry *entry, static void AppendAllWorkerNodes(IntermediateResultsHashEntry *entry) { - List *workerNodeList = ActiveReadableWorkerNodeList(); + List *workerNodeList = ActiveReadableNonCoordinatorNodeList(); WorkerNode *workerNode = NULL; foreach_ptr(workerNode, workerNodeList) diff --git a/src/backend/distributed/planner/multi_physical_planner.c b/src/backend/distributed/planner/multi_physical_planner.c index 955fca0b0..3e389e8a9 100644 --- a/src/backend/distributed/planner/multi_physical_planner.c +++ b/src/backend/distributed/planner/multi_physical_planner.c @@ -2107,7 +2107,7 @@ BuildMapMergeJob(Query *jobQuery, List *dependentJobList, Var *partitionKey, static uint32 HashPartitionCount(void) { - uint32 groupCount = ActiveReadableWorkerNodeCount(); + uint32 groupCount = ActiveReadableNonCoordinatorNodeCount(); double maxReduceTasksPerNode = MaxRunningTasksPerNode / 2.0; uint32 partitionCount = (uint32) rint(groupCount * maxReduceTasksPerNode); @@ -5717,7 +5717,7 @@ AssignDualHashTaskList(List *taskList) * if subsequent jobs have a small number of tasks, we won't allocate the * tasks to the same worker repeatedly. */ - List *workerNodeList = ActiveReadableWorkerNodeList(); + List *workerNodeList = ActiveReadableNonCoordinatorNodeList(); uint32 workerNodeCount = (uint32) list_length(workerNodeList); uint32 beginningNodeIndex = jobId % workerNodeCount; diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c index b42355d10..a912f442b 100644 --- a/src/backend/distributed/planner/multi_router_planner.c +++ b/src/backend/distributed/planner/multi_router_planner.c @@ -2299,7 +2299,7 @@ CreateDummyPlacement(bool hasLocalRelation) return CreateLocalDummyPlacement(); } - List *workerNodeList = ActiveReadableWorkerNodeList(); + List *workerNodeList = ActiveReadableNonCoordinatorNodeList(); if (workerNodeList == NIL) { /* diff --git a/src/backend/distributed/test/metadata_sync.c b/src/backend/distributed/test/metadata_sync.c index e7450454a..2a1b6c290 100644 --- a/src/backend/distributed/test/metadata_sync.c +++ b/src/backend/distributed/test/metadata_sync.c @@ -75,7 +75,7 @@ wait_until_metadata_sync(PG_FUNCTION_ARGS) { uint32 timeout = PG_GETARG_UINT32(0); - List *workerList = ActivePrimaryWorkerNodeList(NoLock); + List *workerList = ActivePrimaryNonCoordinatorNodeList(NoLock); bool waitNotifications = false; WorkerNode *workerNode = NULL; diff --git a/src/backend/distributed/transaction/backend_data.c b/src/backend/distributed/transaction/backend_data.c index f49449251..e40fdf999 100644 --- a/src/backend/distributed/transaction/backend_data.c +++ b/src/backend/distributed/transaction/backend_data.c @@ -217,7 +217,7 @@ Datum get_global_active_transactions(PG_FUNCTION_ARGS) { TupleDesc tupleDescriptor = NULL; - List *workerNodeList = ActivePrimaryWorkerNodeList(NoLock); + List *workerNodeList = ActivePrimaryNonCoordinatorNodeList(NoLock); List *connectionList = NIL; StringInfo queryToSend = makeStringInfo(); diff --git a/src/backend/distributed/transaction/citus_dist_stat_activity.c b/src/backend/distributed/transaction/citus_dist_stat_activity.c index 96229f281..d3cc93ea6 100644 --- a/src/backend/distributed/transaction/citus_dist_stat_activity.c +++ b/src/backend/distributed/transaction/citus_dist_stat_activity.c @@ -311,7 +311,7 @@ citus_worker_stat_activity(PG_FUNCTION_ARGS) static List * CitusStatActivity(const char *statQuery) { - List *workerNodeList = ActivePrimaryWorkerNodeList(NoLock); + List *workerNodeList = ActivePrimaryNonCoordinatorNodeList(NoLock); List *connectionList = NIL; /* @@ -437,7 +437,7 @@ GetLocalNodeCitusDistStat(const char *statQuery) int32 localGroupId = GetLocalGroupId(); /* get the current worker's node stats */ - List *workerNodeList = ActivePrimaryWorkerNodeList(NoLock); + List *workerNodeList = ActivePrimaryNonCoordinatorNodeList(NoLock); WorkerNode *workerNode = NULL; foreach_ptr(workerNode, workerNodeList) { diff --git a/src/backend/distributed/transaction/worker_transaction.c b/src/backend/distributed/transaction/worker_transaction.c index 29288d479..0c0dad87d 100644 --- a/src/backend/distributed/transaction/worker_transaction.c +++ b/src/backend/distributed/transaction/worker_transaction.c @@ -156,7 +156,7 @@ static void SendCommandListToAllWorkersInternal(List *commandList, bool failOnError, const char *superuser) { - List *workerNodeList = ActivePrimaryWorkerNodeList(NoLock); + List *workerNodeList = ActivePrimaryNonCoordinatorNodeList(NoLock); WorkerNode *workerNode = NULL; foreach_ptr(workerNode, workerNodeList) @@ -198,7 +198,7 @@ SendOptionalCommandListToAllWorkers(List *commandList, const char *superuser) List * TargetWorkerSetNodeList(TargetWorkerSet targetWorkerSet, LOCKMODE lockMode) { - List *workerNodeList = ActivePrimaryWorkerNodeList(lockMode); + List *workerNodeList = ActivePrimaryNonCoordinatorNodeList(lockMode); List *result = NIL; WorkerNode *workerNode = NULL; diff --git a/src/backend/distributed/utils/resource_lock.c b/src/backend/distributed/utils/resource_lock.c index 603e1ec71..08d07f257 100644 --- a/src/backend/distributed/utils/resource_lock.c +++ b/src/backend/distributed/utils/resource_lock.c @@ -230,7 +230,7 @@ LockShardListResourcesOnFirstWorker(LOCKMODE lockmode, List *shardIntervalList) static bool IsFirstWorkerNode() { - List *workerNodeList = ActivePrimaryWorkerNodeList(NoLock); + List *workerNodeList = ActivePrimaryNonCoordinatorNodeList(NoLock); workerNodeList = SortList(workerNodeList, CompareWorkerNodes); diff --git a/src/backend/distributed/utils/statistics_collection.c b/src/backend/distributed/utils/statistics_collection.c index 47a6e082e..d1430abd1 100644 --- a/src/backend/distributed/utils/statistics_collection.c +++ b/src/backend/distributed/utils/statistics_collection.c @@ -96,7 +96,7 @@ CollectBasicUsageStatistics(void) distTableOids = DistTableOidList(); roundedDistTableCount = NextPow2(list_length(distTableOids)); roundedClusterSize = NextPow2(DistributedTablesSize(distTableOids)); - workerNodeCount = ActivePrimaryWorkerNodeCount(); + workerNodeCount = ActivePrimaryNonCoordinatorNodeCount(); metadataJsonbDatum = DistNodeMetadata(); metadataJsonbStr = DatumGetCString(DirectFunctionCall1(jsonb_out, metadataJsonbDatum)); diff --git a/src/include/distributed/worker_manager.h b/src/include/distributed/worker_manager.h index 336e1eedf..c49eb8e41 100644 --- a/src/include/distributed/worker_manager.h +++ b/src/include/distributed/worker_manager.h @@ -70,14 +70,14 @@ extern WorkerNode * WorkerGetRoundRobinCandidateNode(List *workerNodeList, uint64 shardId, uint32 placementIndex); extern WorkerNode * WorkerGetLocalFirstCandidateNode(List *currentNodeList); -extern uint32 ActivePrimaryWorkerNodeCount(void); -extern List * ActivePrimaryWorkerNodeList(LOCKMODE lockMode); +extern uint32 ActivePrimaryNonCoordinatorNodeCount(void); +extern List * ActivePrimaryNonCoordinatorNodeList(LOCKMODE lockMode); extern List * ActivePrimaryNodeList(LOCKMODE lockMode); extern List * ReferenceTablePlacementNodeList(LOCKMODE lockMode); extern List * DistributedTablePlacementNodeList(LOCKMODE lockMode); extern bool NodeCanHaveDistTablePlacements(WorkerNode *node); -extern uint32 ActiveReadableWorkerNodeCount(void); -extern List * ActiveReadableWorkerNodeList(void); +extern uint32 ActiveReadableNonCoordinatorNodeCount(void); +extern List * ActiveReadableNonCoordinatorNodeList(void); extern List * ActiveReadableNodeList(void); extern WorkerNode * FindWorkerNode(const char *nodeName, int32 nodePort); extern WorkerNode * ForceFindWorkerNode(const char *nodeName, int32 nodePort);