rename node/worker utilities

The names were not explicit about what they do, and we have many
misusages in the codebase, so they are renamed to be more explicit.

(cherry picked from commit 09962a7e2ff340705b6b193bbfececa2d48e0855)
pull/4206/head
Sait Talha Nisanci 2020-07-09 12:25:30 +03:00
parent 0bd4002e5f
commit a04e7b233e
19 changed files with 42 additions and 38 deletions

View File

@ -85,7 +85,7 @@ EnsureDependenciesExistOnAllNodes(const ObjectAddress *target)
* either get it now, or get it in master_add_node after this transaction finishes and * either get it now, or get it in master_add_node after this transaction finishes and
* the pg_dist_object record becomes visible. * the pg_dist_object record becomes visible.
*/ */
List *workerNodeList = ActivePrimaryWorkerNodeList(RowShareLock); List *workerNodeList = ActivePrimaryNonCoordinatorNodeList(RowShareLock);
/* /*
* right after we acquired the lock we mark our objects as distributed, these changes * right after we acquired the lock we mark our objects as distributed, these changes

View File

@ -1022,7 +1022,7 @@ EnsureSequentialModeForFunctionDDL(void)
static void static void
TriggerSyncMetadataToPrimaryNodes(void) TriggerSyncMetadataToPrimaryNodes(void)
{ {
List *workerList = ActivePrimaryWorkerNodeList(ShareLock); List *workerList = ActivePrimaryNonCoordinatorNodeList(ShareLock);
bool triggerMetadataSync = false; bool triggerMetadataSync = false;
WorkerNode *workerNode = NULL; WorkerNode *workerNode = NULL;

View File

@ -136,7 +136,7 @@ broadcast_intermediate_result(PG_FUNCTION_ARGS)
*/ */
UseCoordinatedTransaction(); UseCoordinatedTransaction();
List *nodeList = ActivePrimaryWorkerNodeList(NoLock); List *nodeList = ActivePrimaryNonCoordinatorNodeList(NoLock);
EState *estate = CreateExecutorState(); EState *estate = CreateExecutorState();
RemoteFileDestReceiver *resultDest = RemoteFileDestReceiver *resultDest =
(RemoteFileDestReceiver *) CreateRemoteFileDestReceiver(resultIdString, (RemoteFileDestReceiver *) CreateRemoteFileDestReceiver(resultIdString,

View File

@ -118,7 +118,7 @@ JobExecutorType(DistributedPlan *distributedPlan)
} }
else else
{ {
List *workerNodeList = ActiveReadableWorkerNodeList(); List *workerNodeList = ActiveReadableNonCoordinatorNodeList();
int workerNodeCount = list_length(workerNodeList); int workerNodeCount = list_length(workerNodeList);
int taskCount = list_length(job->taskList); int taskCount = list_length(job->taskList);
double tasksPerNode = taskCount / ((double) workerNodeCount); double tasksPerNode = taskCount / ((double) workerNodeCount);

View File

@ -209,7 +209,7 @@ MultiTaskTrackerExecute(Job *job)
* assigning and checking the status of tasks. The second (temporary) hash * assigning and checking the status of tasks. The second (temporary) hash
* helps us in fetching results data from worker nodes to the master node. * helps us in fetching results data from worker nodes to the master node.
*/ */
List *workerNodeList = ActivePrimaryWorkerNodeList(NoLock); List *workerNodeList = ActivePrimaryNonCoordinatorNodeList(NoLock);
uint32 taskTrackerCount = (uint32) list_length(workerNodeList); uint32 taskTrackerCount = (uint32) list_length(workerNodeList);
/* connect as the current user for running queries */ /* connect as the current user for running queries */

View File

@ -1254,7 +1254,7 @@ SchemaOwnerName(Oid objectId)
static bool static bool
HasMetadataWorkers(void) HasMetadataWorkers(void)
{ {
List *workerNodeList = ActivePrimaryWorkerNodeList(NoLock); List *workerNodeList = ActivePrimaryNonCoordinatorNodeList(NoLock);
WorkerNode *workerNode = NULL; WorkerNode *workerNode = NULL;
foreach_ptr(workerNode, workerNodeList) foreach_ptr(workerNode, workerNodeList)
@ -1373,7 +1373,7 @@ SyncMetadataToNodes(void)
return METADATA_SYNC_FAILED_LOCK; return METADATA_SYNC_FAILED_LOCK;
} }
List *workerList = ActivePrimaryWorkerNodeList(NoLock); List *workerList = ActivePrimaryNonCoordinatorNodeList(NoLock);
WorkerNode *workerNode = NULL; WorkerNode *workerNode = NULL;
foreach_ptr(workerNode, workerList) foreach_ptr(workerNode, workerList)
{ {

View File

@ -117,7 +117,7 @@ OpenConnectionsToAllWorkerNodes(LOCKMODE lockMode)
List *connectionList = NIL; List *connectionList = NIL;
int connectionFlags = FORCE_NEW_CONNECTION; int connectionFlags = FORCE_NEW_CONNECTION;
List *workerNodeList = ActivePrimaryWorkerNodeList(lockMode); List *workerNodeList = ActivePrimaryNonCoordinatorNodeList(lockMode);
WorkerNode *workerNode = NULL; WorkerNode *workerNode = NULL;
foreach_ptr(workerNode, workerNodeList) foreach_ptr(workerNode, workerNodeList)

View File

@ -457,7 +457,7 @@ master_get_active_worker_nodes(PG_FUNCTION_ARGS)
MemoryContext oldContext = MemoryContextSwitchTo( MemoryContext oldContext = MemoryContextSwitchTo(
functionContext->multi_call_memory_ctx); functionContext->multi_call_memory_ctx);
List *workerNodeList = ActiveReadableWorkerNodeList(); List *workerNodeList = ActiveReadableNonCoordinatorNodeList();
workerNodeCount = (uint32) list_length(workerNodeList); workerNodeCount = (uint32) list_length(workerNodeList);
functionContext->user_fctx = workerNodeList; functionContext->user_fctx = workerNodeList;

View File

@ -293,12 +293,13 @@ WorkerGetNodeWithName(const char *hostname)
/* /*
* ActivePrimaryWorkerNodeCount returns the number of groups with a primary in the cluster. * ActivePrimaryNonCoordinatorNodeCount returns the number of groups with a primary in the cluster.
* This method excludes coordinator even if it is added as a worker to cluster.
*/ */
uint32 uint32
ActivePrimaryWorkerNodeCount(void) ActivePrimaryNonCoordinatorNodeCount(void)
{ {
List *workerNodeList = ActivePrimaryWorkerNodeList(NoLock); List *workerNodeList = ActivePrimaryNonCoordinatorNodeList(NoLock);
uint32 liveWorkerCount = list_length(workerNodeList); uint32 liveWorkerCount = list_length(workerNodeList);
return liveWorkerCount; return liveWorkerCount;
@ -306,12 +307,13 @@ ActivePrimaryWorkerNodeCount(void)
/* /*
* ActiveReadableWorkerNodeCount returns the number of groups with a node we can read from. * ActiveReadableNonCoordinatorNodeCount returns the number of groups with a node we can read from.
* This method excludes coordinator even if it is added as a worker.
*/ */
uint32 uint32
ActiveReadableWorkerNodeCount(void) ActiveReadableNonCoordinatorNodeCount(void)
{ {
List *workerNodeList = ActiveReadableWorkerNodeList(); List *workerNodeList = ActiveReadableNonCoordinatorNodeList();
uint32 liveWorkerCount = list_length(workerNodeList); uint32 liveWorkerCount = list_length(workerNodeList);
return liveWorkerCount; return liveWorkerCount;
@ -366,13 +368,14 @@ FilterActiveNodeListFunc(LOCKMODE lockMode, bool (*checkFunction)(WorkerNode *))
/* /*
* ActivePrimaryWorkerNodeList returns a list of all active primary worker nodes * ActivePrimaryNonCoordinatorNodeList returns a list of all active primary worker nodes
* in workerNodeHash. lockMode specifies which lock to use on pg_dist_node, * in workerNodeHash. lockMode specifies which lock to use on pg_dist_node,
* this is necessary when the caller wouldn't want nodes to be added concurrent * this is necessary when the caller wouldn't want nodes to be added concurrent
* to their use of this list. * to their use of this list.
* This method excludes coordinator even if it is added as a worker to cluster.
*/ */
List * List *
ActivePrimaryWorkerNodeList(LOCKMODE lockMode) ActivePrimaryNonCoordinatorNodeList(LOCKMODE lockMode)
{ {
EnsureModificationsCanRun(); EnsureModificationsCanRun();
return FilterActiveNodeListFunc(lockMode, NodeIsPrimaryWorker); return FilterActiveNodeListFunc(lockMode, NodeIsPrimaryWorker);
@ -443,11 +446,11 @@ NodeCanHaveDistTablePlacements(WorkerNode *node)
/* /*
* ActiveReadableWorkerNodeList returns a list of all nodes in workerNodeHash * ActiveReadableNonCoordinatorNodeList returns a list of all nodes in workerNodeHash
* that are readable workers. * that are readable nodes This method excludes coordinator.
*/ */
List * List *
ActiveReadableWorkerNodeList(void) ActiveReadableNonCoordinatorNodeList(void)
{ {
return FilterActiveNodeListFunc(NoLock, NodeIsReadableWorker); return FilterActiveNodeListFunc(NoLock, NodeIsReadableWorker);
} }
@ -456,6 +459,7 @@ ActiveReadableWorkerNodeList(void)
/* /*
* ActiveReadableNodeList returns a list of all nodes in workerNodeHash * ActiveReadableNodeList returns a list of all nodes in workerNodeHash
* that are readable workers. * that are readable workers.
* This method includes coordinator if it is added as a worker to the cluster.
*/ */
List * List *
ActiveReadableNodeList(void) ActiveReadableNodeList(void)
@ -602,7 +606,7 @@ WorkerNodeCompare(const void *lhsKey, const void *rhsKey, Size keySize)
WorkerNode * WorkerNode *
GetFirstPrimaryWorkerNode(void) GetFirstPrimaryWorkerNode(void)
{ {
List *workerNodeList = ActivePrimaryWorkerNodeList(NoLock); List *workerNodeList = ActivePrimaryNonCoordinatorNodeList(NoLock);
WorkerNode *firstWorkerNode = NULL; WorkerNode *firstWorkerNode = NULL;
WorkerNode *workerNode = NULL; WorkerNode *workerNode = NULL;
foreach_ptr(workerNode, workerNodeList) foreach_ptr(workerNode, workerNodeList)

View File

@ -151,7 +151,7 @@ RecordSubplanExecutionsOnNodes(HTAB *intermediateResultsHash,
List *usedSubPlanNodeList = distributedPlan->usedSubPlanNodeList; List *usedSubPlanNodeList = distributedPlan->usedSubPlanNodeList;
List *subPlanList = distributedPlan->subPlanList; List *subPlanList = distributedPlan->subPlanList;
ListCell *subPlanCell = NULL; ListCell *subPlanCell = NULL;
int workerNodeCount = ActiveReadableWorkerNodeCount(); int workerNodeCount = ActiveReadableNonCoordinatorNodeCount();
foreach(subPlanCell, usedSubPlanNodeList) foreach(subPlanCell, usedSubPlanNodeList)
{ {
@ -269,7 +269,7 @@ AppendAllAccessedWorkerNodes(IntermediateResultsHashEntry *entry,
static void static void
AppendAllWorkerNodes(IntermediateResultsHashEntry *entry) AppendAllWorkerNodes(IntermediateResultsHashEntry *entry)
{ {
List *workerNodeList = ActiveReadableWorkerNodeList(); List *workerNodeList = ActiveReadableNonCoordinatorNodeList();
WorkerNode *workerNode = NULL; WorkerNode *workerNode = NULL;
foreach_ptr(workerNode, workerNodeList) foreach_ptr(workerNode, workerNodeList)

View File

@ -2107,7 +2107,7 @@ BuildMapMergeJob(Query *jobQuery, List *dependentJobList, Var *partitionKey,
static uint32 static uint32
HashPartitionCount(void) HashPartitionCount(void)
{ {
uint32 groupCount = ActiveReadableWorkerNodeCount(); uint32 groupCount = ActiveReadableNonCoordinatorNodeCount();
double maxReduceTasksPerNode = MaxRunningTasksPerNode / 2.0; double maxReduceTasksPerNode = MaxRunningTasksPerNode / 2.0;
uint32 partitionCount = (uint32) rint(groupCount * maxReduceTasksPerNode); uint32 partitionCount = (uint32) rint(groupCount * maxReduceTasksPerNode);
@ -5717,7 +5717,7 @@ AssignDualHashTaskList(List *taskList)
* if subsequent jobs have a small number of tasks, we won't allocate the * if subsequent jobs have a small number of tasks, we won't allocate the
* tasks to the same worker repeatedly. * tasks to the same worker repeatedly.
*/ */
List *workerNodeList = ActiveReadableWorkerNodeList(); List *workerNodeList = ActiveReadableNonCoordinatorNodeList();
uint32 workerNodeCount = (uint32) list_length(workerNodeList); uint32 workerNodeCount = (uint32) list_length(workerNodeList);
uint32 beginningNodeIndex = jobId % workerNodeCount; uint32 beginningNodeIndex = jobId % workerNodeCount;

View File

@ -2299,7 +2299,7 @@ CreateDummyPlacement(bool hasLocalRelation)
return CreateLocalDummyPlacement(); return CreateLocalDummyPlacement();
} }
List *workerNodeList = ActiveReadableWorkerNodeList(); List *workerNodeList = ActiveReadableNonCoordinatorNodeList();
if (workerNodeList == NIL) if (workerNodeList == NIL)
{ {
/* /*

View File

@ -75,7 +75,7 @@ wait_until_metadata_sync(PG_FUNCTION_ARGS)
{ {
uint32 timeout = PG_GETARG_UINT32(0); uint32 timeout = PG_GETARG_UINT32(0);
List *workerList = ActivePrimaryWorkerNodeList(NoLock); List *workerList = ActivePrimaryNonCoordinatorNodeList(NoLock);
bool waitNotifications = false; bool waitNotifications = false;
WorkerNode *workerNode = NULL; WorkerNode *workerNode = NULL;

View File

@ -217,7 +217,7 @@ Datum
get_global_active_transactions(PG_FUNCTION_ARGS) get_global_active_transactions(PG_FUNCTION_ARGS)
{ {
TupleDesc tupleDescriptor = NULL; TupleDesc tupleDescriptor = NULL;
List *workerNodeList = ActivePrimaryWorkerNodeList(NoLock); List *workerNodeList = ActivePrimaryNonCoordinatorNodeList(NoLock);
List *connectionList = NIL; List *connectionList = NIL;
StringInfo queryToSend = makeStringInfo(); StringInfo queryToSend = makeStringInfo();

View File

@ -311,7 +311,7 @@ citus_worker_stat_activity(PG_FUNCTION_ARGS)
static List * static List *
CitusStatActivity(const char *statQuery) CitusStatActivity(const char *statQuery)
{ {
List *workerNodeList = ActivePrimaryWorkerNodeList(NoLock); List *workerNodeList = ActivePrimaryNonCoordinatorNodeList(NoLock);
List *connectionList = NIL; List *connectionList = NIL;
/* /*
@ -437,7 +437,7 @@ GetLocalNodeCitusDistStat(const char *statQuery)
int32 localGroupId = GetLocalGroupId(); int32 localGroupId = GetLocalGroupId();
/* get the current worker's node stats */ /* get the current worker's node stats */
List *workerNodeList = ActivePrimaryWorkerNodeList(NoLock); List *workerNodeList = ActivePrimaryNonCoordinatorNodeList(NoLock);
WorkerNode *workerNode = NULL; WorkerNode *workerNode = NULL;
foreach_ptr(workerNode, workerNodeList) foreach_ptr(workerNode, workerNodeList)
{ {

View File

@ -156,7 +156,7 @@ static void
SendCommandListToAllWorkersInternal(List *commandList, bool failOnError, const SendCommandListToAllWorkersInternal(List *commandList, bool failOnError, const
char *superuser) char *superuser)
{ {
List *workerNodeList = ActivePrimaryWorkerNodeList(NoLock); List *workerNodeList = ActivePrimaryNonCoordinatorNodeList(NoLock);
WorkerNode *workerNode = NULL; WorkerNode *workerNode = NULL;
foreach_ptr(workerNode, workerNodeList) foreach_ptr(workerNode, workerNodeList)
@ -198,7 +198,7 @@ SendOptionalCommandListToAllWorkers(List *commandList, const char *superuser)
List * List *
TargetWorkerSetNodeList(TargetWorkerSet targetWorkerSet, LOCKMODE lockMode) TargetWorkerSetNodeList(TargetWorkerSet targetWorkerSet, LOCKMODE lockMode)
{ {
List *workerNodeList = ActivePrimaryWorkerNodeList(lockMode); List *workerNodeList = ActivePrimaryNonCoordinatorNodeList(lockMode);
List *result = NIL; List *result = NIL;
WorkerNode *workerNode = NULL; WorkerNode *workerNode = NULL;

View File

@ -230,7 +230,7 @@ LockShardListResourcesOnFirstWorker(LOCKMODE lockmode, List *shardIntervalList)
static bool static bool
IsFirstWorkerNode() IsFirstWorkerNode()
{ {
List *workerNodeList = ActivePrimaryWorkerNodeList(NoLock); List *workerNodeList = ActivePrimaryNonCoordinatorNodeList(NoLock);
workerNodeList = SortList(workerNodeList, CompareWorkerNodes); workerNodeList = SortList(workerNodeList, CompareWorkerNodes);

View File

@ -96,7 +96,7 @@ CollectBasicUsageStatistics(void)
distTableOids = DistTableOidList(); distTableOids = DistTableOidList();
roundedDistTableCount = NextPow2(list_length(distTableOids)); roundedDistTableCount = NextPow2(list_length(distTableOids));
roundedClusterSize = NextPow2(DistributedTablesSize(distTableOids)); roundedClusterSize = NextPow2(DistributedTablesSize(distTableOids));
workerNodeCount = ActivePrimaryWorkerNodeCount(); workerNodeCount = ActivePrimaryNonCoordinatorNodeCount();
metadataJsonbDatum = DistNodeMetadata(); metadataJsonbDatum = DistNodeMetadata();
metadataJsonbStr = DatumGetCString(DirectFunctionCall1(jsonb_out, metadataJsonbStr = DatumGetCString(DirectFunctionCall1(jsonb_out,
metadataJsonbDatum)); metadataJsonbDatum));

View File

@ -70,14 +70,14 @@ extern WorkerNode * WorkerGetRoundRobinCandidateNode(List *workerNodeList,
uint64 shardId, uint64 shardId,
uint32 placementIndex); uint32 placementIndex);
extern WorkerNode * WorkerGetLocalFirstCandidateNode(List *currentNodeList); extern WorkerNode * WorkerGetLocalFirstCandidateNode(List *currentNodeList);
extern uint32 ActivePrimaryWorkerNodeCount(void); extern uint32 ActivePrimaryNonCoordinatorNodeCount(void);
extern List * ActivePrimaryWorkerNodeList(LOCKMODE lockMode); extern List * ActivePrimaryNonCoordinatorNodeList(LOCKMODE lockMode);
extern List * ActivePrimaryNodeList(LOCKMODE lockMode); extern List * ActivePrimaryNodeList(LOCKMODE lockMode);
extern List * ReferenceTablePlacementNodeList(LOCKMODE lockMode); extern List * ReferenceTablePlacementNodeList(LOCKMODE lockMode);
extern List * DistributedTablePlacementNodeList(LOCKMODE lockMode); extern List * DistributedTablePlacementNodeList(LOCKMODE lockMode);
extern bool NodeCanHaveDistTablePlacements(WorkerNode *node); extern bool NodeCanHaveDistTablePlacements(WorkerNode *node);
extern uint32 ActiveReadableWorkerNodeCount(void); extern uint32 ActiveReadableNonCoordinatorNodeCount(void);
extern List * ActiveReadableWorkerNodeList(void); extern List * ActiveReadableNonCoordinatorNodeList(void);
extern List * ActiveReadableNodeList(void); extern List * ActiveReadableNodeList(void);
extern WorkerNode * FindWorkerNode(const char *nodeName, int32 nodePort); extern WorkerNode * FindWorkerNode(const char *nodeName, int32 nodePort);
extern WorkerNode * ForceFindWorkerNode(const char *nodeName, int32 nodePort); extern WorkerNode * ForceFindWorkerNode(const char *nodeName, int32 nodePort);