From 96adce77d6683d37e5f4e77324a4da487e4f66ec Mon Sep 17 00:00:00 2001 From: SaitTalhaNisanci Date: Thu, 9 Jul 2020 15:30:35 +0300 Subject: [PATCH] rename node/worker utilities (#4003) The names were not explicit about what they do, and we have many misusages in the codebase, so they are renamed to be more explicit. --- .../distributed/commands/dependencies.c | 2 +- src/backend/distributed/commands/function.c | 2 +- .../executor/intermediate_results.c | 2 +- .../executor/multi_server_executor.c | 2 +- .../executor/multi_task_tracker_executor.c | 2 +- .../distributed/metadata/metadata_sync.c | 4 +-- .../operations/citus_create_restore_point.c | 2 +- .../distributed/operations/node_protocol.c | 2 +- .../operations/worker_node_manager.c | 28 +++++++++++-------- .../planner/intermediate_result_pruning.c | 4 +-- .../planner/multi_physical_planner.c | 4 +-- .../planner/multi_router_planner.c | 2 +- src/backend/distributed/test/metadata_sync.c | 2 +- .../distributed/transaction/backend_data.c | 2 +- .../transaction/citus_dist_stat_activity.c | 4 +-- .../transaction/worker_transaction.c | 4 +-- src/backend/distributed/utils/resource_lock.c | 2 +- .../distributed/utils/statistics_collection.c | 2 +- src/include/distributed/worker_manager.h | 8 +++--- 19 files changed, 42 insertions(+), 38 deletions(-) diff --git a/src/backend/distributed/commands/dependencies.c b/src/backend/distributed/commands/dependencies.c index b5b5e59d4..47d623c10 100644 --- a/src/backend/distributed/commands/dependencies.c +++ b/src/backend/distributed/commands/dependencies.c @@ -85,7 +85,7 @@ EnsureDependenciesExistOnAllNodes(const ObjectAddress *target) * either get it now, or get it in master_add_node after this transaction finishes and * the pg_dist_object record becomes visible. */ - List *workerNodeList = ActivePrimaryWorkerNodeList(RowShareLock); + List *workerNodeList = ActivePrimaryNonCoordinatorNodeList(RowShareLock); /* * right after we acquired the lock we mark our objects as distributed, these changes diff --git a/src/backend/distributed/commands/function.c b/src/backend/distributed/commands/function.c index 38daa8bee..9ff2159db 100644 --- a/src/backend/distributed/commands/function.c +++ b/src/backend/distributed/commands/function.c @@ -1022,7 +1022,7 @@ EnsureSequentialModeForFunctionDDL(void) static void TriggerSyncMetadataToPrimaryNodes(void) { - List *workerList = ActivePrimaryWorkerNodeList(ShareLock); + List *workerList = ActivePrimaryNonCoordinatorNodeList(ShareLock); bool triggerMetadataSync = false; WorkerNode *workerNode = NULL; diff --git a/src/backend/distributed/executor/intermediate_results.c b/src/backend/distributed/executor/intermediate_results.c index 90634c62a..12695f86b 100644 --- a/src/backend/distributed/executor/intermediate_results.c +++ b/src/backend/distributed/executor/intermediate_results.c @@ -136,7 +136,7 @@ broadcast_intermediate_result(PG_FUNCTION_ARGS) */ UseCoordinatedTransaction(); - List *nodeList = ActivePrimaryWorkerNodeList(NoLock); + List *nodeList = ActivePrimaryNonCoordinatorNodeList(NoLock); EState *estate = CreateExecutorState(); RemoteFileDestReceiver *resultDest = (RemoteFileDestReceiver *) CreateRemoteFileDestReceiver(resultIdString, diff --git a/src/backend/distributed/executor/multi_server_executor.c b/src/backend/distributed/executor/multi_server_executor.c index 52b4ba1da..3ca1a2e40 100644 --- a/src/backend/distributed/executor/multi_server_executor.c +++ b/src/backend/distributed/executor/multi_server_executor.c @@ -118,7 +118,7 @@ JobExecutorType(DistributedPlan *distributedPlan) } else { - List *workerNodeList = ActiveReadableWorkerNodeList(); + List *workerNodeList = ActiveReadableNonCoordinatorNodeList(); int workerNodeCount = list_length(workerNodeList); int taskCount = list_length(job->taskList); double tasksPerNode = taskCount / ((double) workerNodeCount); diff --git a/src/backend/distributed/executor/multi_task_tracker_executor.c b/src/backend/distributed/executor/multi_task_tracker_executor.c index a099e3232..d27fd2a9d 100644 --- a/src/backend/distributed/executor/multi_task_tracker_executor.c +++ b/src/backend/distributed/executor/multi_task_tracker_executor.c @@ -209,7 +209,7 @@ MultiTaskTrackerExecute(Job *job) * assigning and checking the status of tasks. The second (temporary) hash * helps us in fetching results data from worker nodes to the master node. */ - List *workerNodeList = ActivePrimaryWorkerNodeList(NoLock); + List *workerNodeList = ActivePrimaryNonCoordinatorNodeList(NoLock); uint32 taskTrackerCount = (uint32) list_length(workerNodeList); /* connect as the current user for running queries */ diff --git a/src/backend/distributed/metadata/metadata_sync.c b/src/backend/distributed/metadata/metadata_sync.c index 4d002bdc3..8b7c84be2 100644 --- a/src/backend/distributed/metadata/metadata_sync.c +++ b/src/backend/distributed/metadata/metadata_sync.c @@ -1254,7 +1254,7 @@ SchemaOwnerName(Oid objectId) static bool HasMetadataWorkers(void) { - List *workerNodeList = ActivePrimaryWorkerNodeList(NoLock); + List *workerNodeList = ActivePrimaryNonCoordinatorNodeList(NoLock); WorkerNode *workerNode = NULL; foreach_ptr(workerNode, workerNodeList) @@ -1373,7 +1373,7 @@ SyncMetadataToNodes(void) return METADATA_SYNC_FAILED_LOCK; } - List *workerList = ActivePrimaryWorkerNodeList(NoLock); + List *workerList = ActivePrimaryNonCoordinatorNodeList(NoLock); WorkerNode *workerNode = NULL; foreach_ptr(workerNode, workerList) { diff --git a/src/backend/distributed/operations/citus_create_restore_point.c b/src/backend/distributed/operations/citus_create_restore_point.c index a9ededfe1..9d9019a83 100644 --- a/src/backend/distributed/operations/citus_create_restore_point.c +++ b/src/backend/distributed/operations/citus_create_restore_point.c @@ -117,7 +117,7 @@ OpenConnectionsToAllWorkerNodes(LOCKMODE lockMode) List *connectionList = NIL; int connectionFlags = FORCE_NEW_CONNECTION; - List *workerNodeList = ActivePrimaryWorkerNodeList(lockMode); + List *workerNodeList = ActivePrimaryNonCoordinatorNodeList(lockMode); WorkerNode *workerNode = NULL; foreach_ptr(workerNode, workerNodeList) diff --git a/src/backend/distributed/operations/node_protocol.c b/src/backend/distributed/operations/node_protocol.c index e167380d1..552c439ca 100644 --- a/src/backend/distributed/operations/node_protocol.c +++ b/src/backend/distributed/operations/node_protocol.c @@ -457,7 +457,7 @@ master_get_active_worker_nodes(PG_FUNCTION_ARGS) MemoryContext oldContext = MemoryContextSwitchTo( functionContext->multi_call_memory_ctx); - List *workerNodeList = ActiveReadableWorkerNodeList(); + List *workerNodeList = ActiveReadableNonCoordinatorNodeList(); workerNodeCount = (uint32) list_length(workerNodeList); functionContext->user_fctx = workerNodeList; diff --git a/src/backend/distributed/operations/worker_node_manager.c b/src/backend/distributed/operations/worker_node_manager.c index e0e38e08c..565215f15 100644 --- a/src/backend/distributed/operations/worker_node_manager.c +++ b/src/backend/distributed/operations/worker_node_manager.c @@ -293,12 +293,13 @@ WorkerGetNodeWithName(const char *hostname) /* - * ActivePrimaryWorkerNodeCount returns the number of groups with a primary in the cluster. + * ActivePrimaryNonCoordinatorNodeCount returns the number of groups with a primary in the cluster. + * This method excludes coordinator even if it is added as a worker to cluster. */ uint32 -ActivePrimaryWorkerNodeCount(void) +ActivePrimaryNonCoordinatorNodeCount(void) { - List *workerNodeList = ActivePrimaryWorkerNodeList(NoLock); + List *workerNodeList = ActivePrimaryNonCoordinatorNodeList(NoLock); uint32 liveWorkerCount = list_length(workerNodeList); return liveWorkerCount; @@ -306,12 +307,13 @@ ActivePrimaryWorkerNodeCount(void) /* - * ActiveReadableWorkerNodeCount returns the number of groups with a node we can read from. + * ActiveReadableNonCoordinatorNodeCount returns the number of groups with a node we can read from. + * This method excludes coordinator even if it is added as a worker. */ uint32 -ActiveReadableWorkerNodeCount(void) +ActiveReadableNonCoordinatorNodeCount(void) { - List *workerNodeList = ActiveReadableWorkerNodeList(); + List *workerNodeList = ActiveReadableNonCoordinatorNodeList(); uint32 liveWorkerCount = list_length(workerNodeList); return liveWorkerCount; @@ -366,13 +368,14 @@ FilterActiveNodeListFunc(LOCKMODE lockMode, bool (*checkFunction)(WorkerNode *)) /* - * ActivePrimaryWorkerNodeList returns a list of all active primary worker nodes + * ActivePrimaryNonCoordinatorNodeList returns a list of all active primary worker nodes * in workerNodeHash. lockMode specifies which lock to use on pg_dist_node, * this is necessary when the caller wouldn't want nodes to be added concurrent * to their use of this list. + * This method excludes coordinator even if it is added as a worker to cluster. */ List * -ActivePrimaryWorkerNodeList(LOCKMODE lockMode) +ActivePrimaryNonCoordinatorNodeList(LOCKMODE lockMode) { EnsureModificationsCanRun(); return FilterActiveNodeListFunc(lockMode, NodeIsPrimaryWorker); @@ -443,11 +446,11 @@ NodeCanHaveDistTablePlacements(WorkerNode *node) /* - * ActiveReadableWorkerNodeList returns a list of all nodes in workerNodeHash - * that are readable workers. + * ActiveReadableNonCoordinatorNodeList returns a list of all nodes in workerNodeHash + * that are readable nodes This method excludes coordinator. */ List * -ActiveReadableWorkerNodeList(void) +ActiveReadableNonCoordinatorNodeList(void) { return FilterActiveNodeListFunc(NoLock, NodeIsReadableWorker); } @@ -456,6 +459,7 @@ ActiveReadableWorkerNodeList(void) /* * ActiveReadableNodeList returns a list of all nodes in workerNodeHash * that are readable workers. + * This method includes coordinator if it is added as a worker to the cluster. */ List * ActiveReadableNodeList(void) @@ -602,7 +606,7 @@ WorkerNodeCompare(const void *lhsKey, const void *rhsKey, Size keySize) WorkerNode * GetFirstPrimaryWorkerNode(void) { - List *workerNodeList = ActivePrimaryWorkerNodeList(NoLock); + List *workerNodeList = ActivePrimaryNonCoordinatorNodeList(NoLock); WorkerNode *firstWorkerNode = NULL; WorkerNode *workerNode = NULL; foreach_ptr(workerNode, workerNodeList) diff --git a/src/backend/distributed/planner/intermediate_result_pruning.c b/src/backend/distributed/planner/intermediate_result_pruning.c index a851eb11d..2156e3469 100644 --- a/src/backend/distributed/planner/intermediate_result_pruning.c +++ b/src/backend/distributed/planner/intermediate_result_pruning.c @@ -151,7 +151,7 @@ RecordSubplanExecutionsOnNodes(HTAB *intermediateResultsHash, List *usedSubPlanNodeList = distributedPlan->usedSubPlanNodeList; List *subPlanList = distributedPlan->subPlanList; ListCell *subPlanCell = NULL; - int workerNodeCount = ActiveReadableWorkerNodeCount(); + int workerNodeCount = ActiveReadableNonCoordinatorNodeCount(); foreach(subPlanCell, usedSubPlanNodeList) { @@ -269,7 +269,7 @@ AppendAllAccessedWorkerNodes(IntermediateResultsHashEntry *entry, static void AppendAllWorkerNodes(IntermediateResultsHashEntry *entry) { - List *workerNodeList = ActiveReadableWorkerNodeList(); + List *workerNodeList = ActiveReadableNonCoordinatorNodeList(); WorkerNode *workerNode = NULL; foreach_ptr(workerNode, workerNodeList) diff --git a/src/backend/distributed/planner/multi_physical_planner.c b/src/backend/distributed/planner/multi_physical_planner.c index 955fca0b0..3e389e8a9 100644 --- a/src/backend/distributed/planner/multi_physical_planner.c +++ b/src/backend/distributed/planner/multi_physical_planner.c @@ -2107,7 +2107,7 @@ BuildMapMergeJob(Query *jobQuery, List *dependentJobList, Var *partitionKey, static uint32 HashPartitionCount(void) { - uint32 groupCount = ActiveReadableWorkerNodeCount(); + uint32 groupCount = ActiveReadableNonCoordinatorNodeCount(); double maxReduceTasksPerNode = MaxRunningTasksPerNode / 2.0; uint32 partitionCount = (uint32) rint(groupCount * maxReduceTasksPerNode); @@ -5717,7 +5717,7 @@ AssignDualHashTaskList(List *taskList) * if subsequent jobs have a small number of tasks, we won't allocate the * tasks to the same worker repeatedly. */ - List *workerNodeList = ActiveReadableWorkerNodeList(); + List *workerNodeList = ActiveReadableNonCoordinatorNodeList(); uint32 workerNodeCount = (uint32) list_length(workerNodeList); uint32 beginningNodeIndex = jobId % workerNodeCount; diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c index 809300f7c..f980529d4 100644 --- a/src/backend/distributed/planner/multi_router_planner.c +++ b/src/backend/distributed/planner/multi_router_planner.c @@ -2275,7 +2275,7 @@ CreateDummyPlacement(bool hasLocalRelation) return CreateLocalDummyPlacement(); } - List *workerNodeList = ActiveReadableWorkerNodeList(); + List *workerNodeList = ActiveReadableNonCoordinatorNodeList(); if (workerNodeList == NIL) { /* diff --git a/src/backend/distributed/test/metadata_sync.c b/src/backend/distributed/test/metadata_sync.c index e7450454a..2a1b6c290 100644 --- a/src/backend/distributed/test/metadata_sync.c +++ b/src/backend/distributed/test/metadata_sync.c @@ -75,7 +75,7 @@ wait_until_metadata_sync(PG_FUNCTION_ARGS) { uint32 timeout = PG_GETARG_UINT32(0); - List *workerList = ActivePrimaryWorkerNodeList(NoLock); + List *workerList = ActivePrimaryNonCoordinatorNodeList(NoLock); bool waitNotifications = false; WorkerNode *workerNode = NULL; diff --git a/src/backend/distributed/transaction/backend_data.c b/src/backend/distributed/transaction/backend_data.c index f49449251..e40fdf999 100644 --- a/src/backend/distributed/transaction/backend_data.c +++ b/src/backend/distributed/transaction/backend_data.c @@ -217,7 +217,7 @@ Datum get_global_active_transactions(PG_FUNCTION_ARGS) { TupleDesc tupleDescriptor = NULL; - List *workerNodeList = ActivePrimaryWorkerNodeList(NoLock); + List *workerNodeList = ActivePrimaryNonCoordinatorNodeList(NoLock); List *connectionList = NIL; StringInfo queryToSend = makeStringInfo(); diff --git a/src/backend/distributed/transaction/citus_dist_stat_activity.c b/src/backend/distributed/transaction/citus_dist_stat_activity.c index 96229f281..d3cc93ea6 100644 --- a/src/backend/distributed/transaction/citus_dist_stat_activity.c +++ b/src/backend/distributed/transaction/citus_dist_stat_activity.c @@ -311,7 +311,7 @@ citus_worker_stat_activity(PG_FUNCTION_ARGS) static List * CitusStatActivity(const char *statQuery) { - List *workerNodeList = ActivePrimaryWorkerNodeList(NoLock); + List *workerNodeList = ActivePrimaryNonCoordinatorNodeList(NoLock); List *connectionList = NIL; /* @@ -437,7 +437,7 @@ GetLocalNodeCitusDistStat(const char *statQuery) int32 localGroupId = GetLocalGroupId(); /* get the current worker's node stats */ - List *workerNodeList = ActivePrimaryWorkerNodeList(NoLock); + List *workerNodeList = ActivePrimaryNonCoordinatorNodeList(NoLock); WorkerNode *workerNode = NULL; foreach_ptr(workerNode, workerNodeList) { diff --git a/src/backend/distributed/transaction/worker_transaction.c b/src/backend/distributed/transaction/worker_transaction.c index bd0db1d8e..7e010f8c9 100644 --- a/src/backend/distributed/transaction/worker_transaction.c +++ b/src/backend/distributed/transaction/worker_transaction.c @@ -156,7 +156,7 @@ static void SendCommandListToAllWorkersInternal(List *commandList, bool failOnError, const char *superuser) { - List *workerNodeList = ActivePrimaryWorkerNodeList(NoLock); + List *workerNodeList = ActivePrimaryNonCoordinatorNodeList(NoLock); WorkerNode *workerNode = NULL; foreach_ptr(workerNode, workerNodeList) @@ -198,7 +198,7 @@ SendOptionalCommandListToAllWorkers(List *commandList, const char *superuser) List * TargetWorkerSetNodeList(TargetWorkerSet targetWorkerSet, LOCKMODE lockMode) { - List *workerNodeList = ActivePrimaryWorkerNodeList(lockMode); + List *workerNodeList = ActivePrimaryNonCoordinatorNodeList(lockMode); List *result = NIL; int32 localGroupId = GetLocalGroupId(); diff --git a/src/backend/distributed/utils/resource_lock.c b/src/backend/distributed/utils/resource_lock.c index 603e1ec71..08d07f257 100644 --- a/src/backend/distributed/utils/resource_lock.c +++ b/src/backend/distributed/utils/resource_lock.c @@ -230,7 +230,7 @@ LockShardListResourcesOnFirstWorker(LOCKMODE lockmode, List *shardIntervalList) static bool IsFirstWorkerNode() { - List *workerNodeList = ActivePrimaryWorkerNodeList(NoLock); + List *workerNodeList = ActivePrimaryNonCoordinatorNodeList(NoLock); workerNodeList = SortList(workerNodeList, CompareWorkerNodes); diff --git a/src/backend/distributed/utils/statistics_collection.c b/src/backend/distributed/utils/statistics_collection.c index 47a6e082e..d1430abd1 100644 --- a/src/backend/distributed/utils/statistics_collection.c +++ b/src/backend/distributed/utils/statistics_collection.c @@ -96,7 +96,7 @@ CollectBasicUsageStatistics(void) distTableOids = DistTableOidList(); roundedDistTableCount = NextPow2(list_length(distTableOids)); roundedClusterSize = NextPow2(DistributedTablesSize(distTableOids)); - workerNodeCount = ActivePrimaryWorkerNodeCount(); + workerNodeCount = ActivePrimaryNonCoordinatorNodeCount(); metadataJsonbDatum = DistNodeMetadata(); metadataJsonbStr = DatumGetCString(DirectFunctionCall1(jsonb_out, metadataJsonbDatum)); diff --git a/src/include/distributed/worker_manager.h b/src/include/distributed/worker_manager.h index 336e1eedf..c49eb8e41 100644 --- a/src/include/distributed/worker_manager.h +++ b/src/include/distributed/worker_manager.h @@ -70,14 +70,14 @@ extern WorkerNode * WorkerGetRoundRobinCandidateNode(List *workerNodeList, uint64 shardId, uint32 placementIndex); extern WorkerNode * WorkerGetLocalFirstCandidateNode(List *currentNodeList); -extern uint32 ActivePrimaryWorkerNodeCount(void); -extern List * ActivePrimaryWorkerNodeList(LOCKMODE lockMode); +extern uint32 ActivePrimaryNonCoordinatorNodeCount(void); +extern List * ActivePrimaryNonCoordinatorNodeList(LOCKMODE lockMode); extern List * ActivePrimaryNodeList(LOCKMODE lockMode); extern List * ReferenceTablePlacementNodeList(LOCKMODE lockMode); extern List * DistributedTablePlacementNodeList(LOCKMODE lockMode); extern bool NodeCanHaveDistTablePlacements(WorkerNode *node); -extern uint32 ActiveReadableWorkerNodeCount(void); -extern List * ActiveReadableWorkerNodeList(void); +extern uint32 ActiveReadableNonCoordinatorNodeCount(void); +extern List * ActiveReadableNonCoordinatorNodeList(void); extern List * ActiveReadableNodeList(void); extern WorkerNode * FindWorkerNode(const char *nodeName, int32 nodePort); extern WorkerNode * ForceFindWorkerNode(const char *nodeName, int32 nodePort);