diff --git a/src/backend/distributed/utils/node_metadata.c b/src/backend/distributed/utils/node_metadata.c index 897323318..6a61962f5 100644 --- a/src/backend/distributed/utils/node_metadata.c +++ b/src/backend/distributed/utils/node_metadata.c @@ -60,7 +60,7 @@ static Datum AddNodeMetadata(char *nodeName, int32 nodePort, int32 groupId, Oid nodeRole, char *nodeCluster, bool *nodeAlreadyExists); static uint32 CountPrimariesWithMetadata(); static void SetNodeState(char *nodeName, int32 nodePort, bool isActive); -static HeapTuple GetNodeTuple(char *nodeName, int32 nodePort); +static HeapTuple GetNodeTuple(char *nodeName, int32 nodePort, bool raiseError); static Datum GenerateNodeTuple(WorkerNode *workerNode); static int32 GetNextGroupId(void); static uint32 GetMaxGroupId(void); @@ -219,7 +219,7 @@ master_disable_node(PG_FUNCTION_ARGS) CheckCitusVersion(ERROR); - workerNode = FindWorkerNode(nodeName, nodePort); + workerNode = FindWorkerNodeAnyCluster(nodeName, nodePort); if (workerNode == NULL) { ereport(ERROR, (errmsg("node at \"%s:%u\" does not exist", nodeName, nodePort))); @@ -268,7 +268,9 @@ master_activate_node(PG_FUNCTION_ARGS) /* - * GroupForNode returns the group which a given node belongs to + * GroupForNode returns the group which a given node belongs to. + * + * It only works if the requested node is a part of CurrentCluster. */ uint32 GroupForNode(char *nodeName, int nodePort) @@ -350,7 +352,7 @@ static Datum ActivateNode(char *nodeName, int nodePort) { Relation pgDistNode = heap_open(DistNodeRelationId(), RowExclusiveLock); - HeapTuple heapTuple = GetNodeTuple(nodeName, nodePort); + HeapTuple heapTuple = GetNodeTuple(nodeName, nodePort, true); CommandId commandId = GetCurrentCommandId(true); LockTupleMode lockTupleMode = LockTupleExclusive; LockWaitPolicy lockWaitPolicy = LockWaitError; @@ -368,7 +370,7 @@ ActivateNode(char *nodeName, int nodePort) SetNodeState(nodeName, nodePort, isActive); - workerNode = FindWorkerNode(nodeName, nodePort); + workerNode = FindWorkerNodeAnyCluster(nodeName, nodePort); if (WorkerNodeIsPrimary(workerNode)) { @@ -531,6 +533,30 @@ FindWorkerNode(char *nodeName, int32 nodePort) } +/* + * FindWorkerNodeAnyCluster returns the workerNode no matter which cluster it is a part + * of. FindWorkerNodes, like almost every other function, acts as if nodes in other + * clusters do not exist. + */ +WorkerNode * +FindWorkerNodeAnyCluster(char *nodeName, int32 nodePort) +{ + WorkerNode *workerNode = NULL; + + Relation pgDistNode = heap_open(DistNodeRelationId(), AccessShareLock); + TupleDesc tupleDescriptor = RelationGetDescr(pgDistNode); + + HeapTuple heapTuple = GetNodeTuple(nodeName, nodePort, false); + if (heapTuple != NULL) + { + workerNode = TupleToWorkerNode(tupleDescriptor, heapTuple); + } + + heap_close(pgDistNode, NoLock); + return workerNode; +} + + /* * ReadWorkerNodes iterates over pg_dist_node table, converts each row * into it's memory representation (i.e., WorkerNode) and adds them into @@ -597,7 +623,7 @@ RemoveNodeFromCluster(char *nodeName, int32 nodePort) EnsureCoordinator(); EnsureSuperUser(); - workerNode = FindWorkerNode(nodeName, nodePort); + workerNode = FindWorkerNodeAnyCluster(nodeName, nodePort); if (workerNode == NULL) { ereport(ERROR, (errmsg("node at \"%s:%u\" does not exist", nodeName, nodePort))); @@ -708,8 +734,7 @@ AddNodeMetadata(char *nodeName, int32 nodePort, int32 groupId, char *nodeRack, */ LockRelationOid(DistNodeRelationId(), ShareRowExclusiveLock); - /* check if the node already exists in the cluster */ - workerNode = FindWorkerNode(nodeName, nodePort); + workerNode = FindWorkerNodeAnyCluster(nodeName, nodePort); if (workerNode != NULL) { *nodeAlreadyExists = true; @@ -757,7 +782,7 @@ AddNodeMetadata(char *nodeName, int32 nodePort, int32 groupId, char *nodeRack, InsertNodeRow(nextNodeIdInt, nodeName, nodePort, groupId, nodeRack, hasMetadata, isActive, nodeRole, nodeCluster); - workerNode = FindWorkerNode(nodeName, nodePort); + workerNode = FindWorkerNodeAnyCluster(nodeName, nodePort); /* send the delete command to all primary nodes with metadata */ nodeDeleteCommand = NodeDeleteCommand(workerNode->nodeId); @@ -786,7 +811,7 @@ SetNodeState(char *nodeName, int32 nodePort, bool isActive) { Relation pgDistNode = heap_open(DistNodeRelationId(), RowExclusiveLock); TupleDesc tupleDescriptor = RelationGetDescr(pgDistNode); - HeapTuple heapTuple = GetNodeTuple(nodeName, nodePort); + HeapTuple heapTuple = GetNodeTuple(nodeName, nodePort, true); Datum values[Natts_pg_dist_node]; bool isnull[Natts_pg_dist_node]; @@ -807,22 +832,27 @@ SetNodeState(char *nodeName, int32 nodePort, bool isActive) CitusInvalidateRelcacheByRelid(DistNodeRelationId()); CommandCounterIncrement(); + workerNode = TupleToWorkerNode(tupleDescriptor, heapTuple); + heap_close(pgDistNode, NoLock); /* we also update isactive column at worker nodes */ - workerNode = FindWorkerNode(nodeName, nodePort); nodeStateUpdateCommand = NodeStateUpdateCommand(workerNode->nodeId, isActive); SendCommandToWorkers(WORKERS_WITH_METADATA, nodeStateUpdateCommand); } /* - * GetNodeTuple function returns heap tuple of given nodeName and nodePort. If - * there are no node tuple with specified nodeName and nodePort, this function - * errors out. + * GetNodeTuple function returns heap tuple of given nodeName and nodePort. + * + * If there are no node tuples with specified nodeName and nodePort and raiseError is + * true, this function errors out. If the node is not found and raiseError is false this + * function returns NULL. + * + * This function may return worker nodes from other clusters. */ static HeapTuple -GetNodeTuple(char *nodeName, int32 nodePort) +GetNodeTuple(char *nodeName, int32 nodePort, bool raiseError) { Relation pgDistNode = heap_open(DistNodeRelationId(), AccessShareLock); const int scanKeyCount = 2; @@ -843,8 +873,16 @@ GetNodeTuple(char *nodeName, int32 nodePort) heapTuple = systable_getnext(scanDescriptor); if (!HeapTupleIsValid(heapTuple)) { - ereport(ERROR, (errmsg("could not find valid entry for node \"%s:%d\"", - nodeName, nodePort))); + if (raiseError) + { + ereport(ERROR, (errmsg("could not find valid entry for node \"%s:%d\"", + nodeName, nodePort))); + } + + systable_endscan(scanDescriptor); + heap_close(pgDistNode, NoLock); + + return NULL; } nodeTuple = heap_copytuple(heapTuple); diff --git a/src/include/distributed/worker_manager.h b/src/include/distributed/worker_manager.h index 3d07fb6bf..81cbdd69f 100644 --- a/src/include/distributed/worker_manager.h +++ b/src/include/distributed/worker_manager.h @@ -65,6 +65,7 @@ extern WorkerNode * WorkerGetLocalFirstCandidateNode(List *currentNodeList); extern uint32 ActivePrimaryNodeCount(void); extern List * ActivePrimaryNodeList(void); extern WorkerNode * FindWorkerNode(char *nodeName, int32 nodePort); +extern WorkerNode * FindWorkerNodeAnyCluster(char *nodeName, int32 nodePort); extern List * ReadWorkerNodes(void); extern void EnsureCoordinator(void); extern uint32 GroupForNode(char *nodeName, int32 nodePorT); diff --git a/src/test/regress/expected/multi_cluster_management.out b/src/test/regress/expected/multi_cluster_management.out index 7d4416136..bcbfed032 100644 --- a/src/test/regress/expected/multi_cluster_management.out +++ b/src/test/regress/expected/multi_cluster_management.out @@ -465,6 +465,31 @@ SELECT 1 FROM master_add_inactive_node('localhost', 9996, groupid => :worker_2_g 1 (1 row) +-- check that you can add a seconary to a non-default cluster, and activate it, and remove it +SELECT master_add_inactive_node('localhost', 9999, groupid => :worker_2_group, nodecluster => 'olap', noderole => 'secondary'); + master_add_inactive_node +--------------------------------------------------- + (19,14,localhost,9999,default,f,f,secondary,olap) +(1 row) + +SELECT master_activate_node('localhost', 9999); + master_activate_node +--------------------------------------------------- + (19,14,localhost,9999,default,f,t,secondary,olap) +(1 row) + +SELECT master_disable_node('localhost', 9999); + master_disable_node +--------------------- + +(1 row) + +SELECT master_remove_node('localhost', 9999); + master_remove_node +-------------------- + +(1 row) + -- check that you can't manually add two primaries to a group INSERT INTO pg_dist_node (nodename, nodeport, groupid, noderole) VALUES ('localhost', 5000, :worker_1_group, 'primary'); @@ -488,7 +513,7 @@ SELECT groupid AS worker_2_group FROM pg_dist_node WHERE nodeport = :worker_2_po SELECT master_add_node('localhost', 8888, groupid => :worker_1_group, noderole => 'secondary', nodecluster=> 'olap'); master_add_node --------------------------------------------------- - (19,12,localhost,8888,default,f,t,secondary,olap) + (20,12,localhost,8888,default,f,t,secondary,olap) (1 row) -- check that super-long cluster names are truncated @@ -501,13 +526,13 @@ SELECT master_add_node('localhost', 8887, groupid => :worker_1_group, noderole = ); master_add_node -------------------------------------------------------------------------------------------------------------- - (20,12,localhost,8887,default,f,t,secondary,thisisasixtyfourcharacterstringrepeatedfourtimestomake256chars.) + (21,12,localhost,8887,default,f,t,secondary,thisisasixtyfourcharacterstringrepeatedfourtimestomake256chars.) (1 row) SELECT * FROM pg_dist_node WHERE nodeport=8887; nodeid | groupid | nodename | nodeport | noderack | hasmetadata | isactive | noderole | nodecluster --------+---------+-----------+----------+----------+-------------+----------+-----------+----------------------------------------------------------------- - 20 | 12 | localhost | 8887 | default | f | t | secondary | thisisasixtyfourcharacterstringrepeatedfourtimestomake256chars. + 21 | 12 | localhost | 8887 | default | f | t | secondary | thisisasixtyfourcharacterstringrepeatedfourtimestomake256chars. (1 row) -- don't remove the secondary and unavailable nodes, check that no commands are sent to diff --git a/src/test/regress/sql/multi_cluster_management.sql b/src/test/regress/sql/multi_cluster_management.sql index da4c26de4..664021197 100644 --- a/src/test/regress/sql/multi_cluster_management.sql +++ b/src/test/regress/sql/multi_cluster_management.sql @@ -198,6 +198,12 @@ SELECT 1 FROM master_add_node('localhost', 9997, groupid => :worker_1_group, nod -- add_inactive_node also works with secondaries SELECT 1 FROM master_add_inactive_node('localhost', 9996, groupid => :worker_2_group, noderole => 'secondary'); +-- check that you can add a seconary to a non-default cluster, and activate it, and remove it +SELECT master_add_inactive_node('localhost', 9999, groupid => :worker_2_group, nodecluster => 'olap', noderole => 'secondary'); +SELECT master_activate_node('localhost', 9999); +SELECT master_disable_node('localhost', 9999); +SELECT master_remove_node('localhost', 9999); + -- check that you can't manually add two primaries to a group INSERT INTO pg_dist_node (nodename, nodeport, groupid, noderole) VALUES ('localhost', 5000, :worker_1_group, 'primary');