cluster management UDFs see nodes in different clusters

- master_activate_node and master_disable_node correctly toggle
  isActive, without crashing
- master_add_node rejects duplicate nodes, even if they're in different
  clusters
- master_remove_node allows removing nodes in different clusters
pull/1519/head
Brian Cloutier 2017-08-03 16:36:27 +03:00 committed by Brian Cloutier
parent 3151b52a0b
commit 5914c992e6
4 changed files with 90 additions and 20 deletions

View File

@ -60,7 +60,7 @@ static Datum AddNodeMetadata(char *nodeName, int32 nodePort, int32 groupId,
Oid nodeRole, char *nodeCluster, bool *nodeAlreadyExists);
static uint32 CountPrimariesWithMetadata();
static void SetNodeState(char *nodeName, int32 nodePort, bool isActive);
static HeapTuple GetNodeTuple(char *nodeName, int32 nodePort);
static HeapTuple GetNodeTuple(char *nodeName, int32 nodePort, bool raiseError);
static Datum GenerateNodeTuple(WorkerNode *workerNode);
static int32 GetNextGroupId(void);
static uint32 GetMaxGroupId(void);
@ -219,7 +219,7 @@ master_disable_node(PG_FUNCTION_ARGS)
CheckCitusVersion(ERROR);
workerNode = FindWorkerNode(nodeName, nodePort);
workerNode = FindWorkerNodeAnyCluster(nodeName, nodePort);
if (workerNode == NULL)
{
ereport(ERROR, (errmsg("node at \"%s:%u\" does not exist", nodeName, nodePort)));
@ -268,7 +268,9 @@ master_activate_node(PG_FUNCTION_ARGS)
/*
* GroupForNode returns the group which a given node belongs to
* GroupForNode returns the group which a given node belongs to.
*
* It only works if the requested node is a part of CurrentCluster.
*/
uint32
GroupForNode(char *nodeName, int nodePort)
@ -350,7 +352,7 @@ static Datum
ActivateNode(char *nodeName, int nodePort)
{
Relation pgDistNode = heap_open(DistNodeRelationId(), RowExclusiveLock);
HeapTuple heapTuple = GetNodeTuple(nodeName, nodePort);
HeapTuple heapTuple = GetNodeTuple(nodeName, nodePort, true);
CommandId commandId = GetCurrentCommandId(true);
LockTupleMode lockTupleMode = LockTupleExclusive;
LockWaitPolicy lockWaitPolicy = LockWaitError;
@ -368,7 +370,7 @@ ActivateNode(char *nodeName, int nodePort)
SetNodeState(nodeName, nodePort, isActive);
workerNode = FindWorkerNode(nodeName, nodePort);
workerNode = FindWorkerNodeAnyCluster(nodeName, nodePort);
if (WorkerNodeIsPrimary(workerNode))
{
@ -531,6 +533,30 @@ FindWorkerNode(char *nodeName, int32 nodePort)
}
/*
 * FindWorkerNodeAnyCluster returns the workerNode no matter which cluster it is a part
 * of. FindWorkerNode, like almost every other function, acts as if nodes in other
 * clusters do not exist.
 *
 * Returns NULL when no row in pg_dist_node matches the given nodeName/nodePort.
 */
WorkerNode *
FindWorkerNodeAnyCluster(char *nodeName, int32 nodePort)
{
WorkerNode *workerNode = NULL;
Relation pgDistNode = heap_open(DistNodeRelationId(), AccessShareLock);
TupleDesc tupleDescriptor = RelationGetDescr(pgDistNode);
/* raiseError = false: GetNodeTuple returns NULL instead of ereport(ERROR) on a miss */
HeapTuple heapTuple = GetNodeTuple(nodeName, nodePort, false);
if (heapTuple != NULL)
{
workerNode = TupleToWorkerNode(tupleDescriptor, heapTuple);
}
/* NoLock: close the relation but keep the AccessShareLock (presumably until
 * transaction end, the usual PostgreSQL convention — confirm against callers) */
heap_close(pgDistNode, NoLock);
return workerNode;
}
/*
* ReadWorkerNodes iterates over pg_dist_node table, converts each row
* into it's memory representation (i.e., WorkerNode) and adds them into
@ -597,7 +623,7 @@ RemoveNodeFromCluster(char *nodeName, int32 nodePort)
EnsureCoordinator();
EnsureSuperUser();
workerNode = FindWorkerNode(nodeName, nodePort);
workerNode = FindWorkerNodeAnyCluster(nodeName, nodePort);
if (workerNode == NULL)
{
ereport(ERROR, (errmsg("node at \"%s:%u\" does not exist", nodeName, nodePort)));
@ -708,8 +734,7 @@ AddNodeMetadata(char *nodeName, int32 nodePort, int32 groupId, char *nodeRack,
*/
LockRelationOid(DistNodeRelationId(), ShareRowExclusiveLock);
/* check if the node already exists in the cluster */
workerNode = FindWorkerNode(nodeName, nodePort);
workerNode = FindWorkerNodeAnyCluster(nodeName, nodePort);
if (workerNode != NULL)
{
*nodeAlreadyExists = true;
@ -757,7 +782,7 @@ AddNodeMetadata(char *nodeName, int32 nodePort, int32 groupId, char *nodeRack,
InsertNodeRow(nextNodeIdInt, nodeName, nodePort, groupId, nodeRack, hasMetadata,
isActive, nodeRole, nodeCluster);
workerNode = FindWorkerNode(nodeName, nodePort);
workerNode = FindWorkerNodeAnyCluster(nodeName, nodePort);
/* send the delete command to all primary nodes with metadata */
nodeDeleteCommand = NodeDeleteCommand(workerNode->nodeId);
@ -786,7 +811,7 @@ SetNodeState(char *nodeName, int32 nodePort, bool isActive)
{
Relation pgDistNode = heap_open(DistNodeRelationId(), RowExclusiveLock);
TupleDesc tupleDescriptor = RelationGetDescr(pgDistNode);
HeapTuple heapTuple = GetNodeTuple(nodeName, nodePort);
HeapTuple heapTuple = GetNodeTuple(nodeName, nodePort, true);
Datum values[Natts_pg_dist_node];
bool isnull[Natts_pg_dist_node];
@ -807,22 +832,27 @@ SetNodeState(char *nodeName, int32 nodePort, bool isActive)
CitusInvalidateRelcacheByRelid(DistNodeRelationId());
CommandCounterIncrement();
workerNode = TupleToWorkerNode(tupleDescriptor, heapTuple);
heap_close(pgDistNode, NoLock);
/* we also update isactive column at worker nodes */
workerNode = FindWorkerNode(nodeName, nodePort);
nodeStateUpdateCommand = NodeStateUpdateCommand(workerNode->nodeId, isActive);
SendCommandToWorkers(WORKERS_WITH_METADATA, nodeStateUpdateCommand);
}
/*
* GetNodeTuple function returns heap tuple of given nodeName and nodePort. If
* there are no node tuple with specified nodeName and nodePort, this function
* errors out.
* GetNodeTuple function returns heap tuple of given nodeName and nodePort.
*
* If there are no node tuples with specified nodeName and nodePort and raiseError is
* true, this function errors out. If the node is not found and raiseError is false this
* function returns NULL.
*
* This function may return worker nodes from other clusters.
*/
static HeapTuple
GetNodeTuple(char *nodeName, int32 nodePort)
GetNodeTuple(char *nodeName, int32 nodePort, bool raiseError)
{
Relation pgDistNode = heap_open(DistNodeRelationId(), AccessShareLock);
const int scanKeyCount = 2;
@ -843,8 +873,16 @@ GetNodeTuple(char *nodeName, int32 nodePort)
heapTuple = systable_getnext(scanDescriptor);
if (!HeapTupleIsValid(heapTuple))
{
ereport(ERROR, (errmsg("could not find valid entry for node \"%s:%d\"",
nodeName, nodePort)));
if (raiseError)
{
ereport(ERROR, (errmsg("could not find valid entry for node \"%s:%d\"",
nodeName, nodePort)));
}
systable_endscan(scanDescriptor);
heap_close(pgDistNode, NoLock);
return NULL;
}
nodeTuple = heap_copytuple(heapTuple);

View File

@ -65,6 +65,7 @@ extern WorkerNode * WorkerGetLocalFirstCandidateNode(List *currentNodeList);
extern uint32 ActivePrimaryNodeCount(void);
extern List * ActivePrimaryNodeList(void);
extern WorkerNode * FindWorkerNode(char *nodeName, int32 nodePort);
extern WorkerNode * FindWorkerNodeAnyCluster(char *nodeName, int32 nodePort);
extern List * ReadWorkerNodes(void);
extern void EnsureCoordinator(void);
extern uint32 GroupForNode(char *nodeName, int32 nodePorT);

View File

@ -465,6 +465,31 @@ SELECT 1 FROM master_add_inactive_node('localhost', 9996, groupid => :worker_2_g
1
(1 row)
-- check that you can add a secondary to a non-default cluster, and activate it, and remove it
SELECT master_add_inactive_node('localhost', 9999, groupid => :worker_2_group, nodecluster => 'olap', noderole => 'secondary');
master_add_inactive_node
---------------------------------------------------
(19,14,localhost,9999,default,f,f,secondary,olap)
(1 row)
SELECT master_activate_node('localhost', 9999);
master_activate_node
---------------------------------------------------
(19,14,localhost,9999,default,f,t,secondary,olap)
(1 row)
SELECT master_disable_node('localhost', 9999);
master_disable_node
---------------------
(1 row)
SELECT master_remove_node('localhost', 9999);
master_remove_node
--------------------
(1 row)
-- check that you can't manually add two primaries to a group
INSERT INTO pg_dist_node (nodename, nodeport, groupid, noderole)
VALUES ('localhost', 5000, :worker_1_group, 'primary');
@ -488,7 +513,7 @@ SELECT groupid AS worker_2_group FROM pg_dist_node WHERE nodeport = :worker_2_po
SELECT master_add_node('localhost', 8888, groupid => :worker_1_group, noderole => 'secondary', nodecluster=> 'olap');
master_add_node
---------------------------------------------------
(19,12,localhost,8888,default,f,t,secondary,olap)
(20,12,localhost,8888,default,f,t,secondary,olap)
(1 row)
-- check that super-long cluster names are truncated
@ -501,13 +526,13 @@ SELECT master_add_node('localhost', 8887, groupid => :worker_1_group, noderole =
);
master_add_node
--------------------------------------------------------------------------------------------------------------
(20,12,localhost,8887,default,f,t,secondary,thisisasixtyfourcharacterstringrepeatedfourtimestomake256chars.)
(21,12,localhost,8887,default,f,t,secondary,thisisasixtyfourcharacterstringrepeatedfourtimestomake256chars.)
(1 row)
SELECT * FROM pg_dist_node WHERE nodeport=8887;
nodeid | groupid | nodename | nodeport | noderack | hasmetadata | isactive | noderole | nodecluster
--------+---------+-----------+----------+----------+-------------+----------+-----------+-----------------------------------------------------------------
20 | 12 | localhost | 8887 | default | f | t | secondary | thisisasixtyfourcharacterstringrepeatedfourtimestomake256chars.
21 | 12 | localhost | 8887 | default | f | t | secondary | thisisasixtyfourcharacterstringrepeatedfourtimestomake256chars.
(1 row)
-- don't remove the secondary and unavailable nodes, check that no commands are sent to

View File

@ -198,6 +198,12 @@ SELECT 1 FROM master_add_node('localhost', 9997, groupid => :worker_1_group, nod
-- add_inactive_node also works with secondaries
SELECT 1 FROM master_add_inactive_node('localhost', 9996, groupid => :worker_2_group, noderole => 'secondary');
-- check that you can add a secondary to a non-default cluster, and activate it, and remove it
SELECT master_add_inactive_node('localhost', 9999, groupid => :worker_2_group, nodecluster => 'olap', noderole => 'secondary');
SELECT master_activate_node('localhost', 9999);
SELECT master_disable_node('localhost', 9999);
SELECT master_remove_node('localhost', 9999);
-- check that you can't manually add two primaries to a group
INSERT INTO pg_dist_node (nodename, nodeport, groupid, noderole)
VALUES ('localhost', 5000, :worker_1_group, 'primary');