mirror of https://github.com/citusdata/citus.git
Allow citus_update_node() to work with nodes from different clusters (#6466)
DESCRIPTION: Allow citus_update_node() to work with nodes from different clusters citus_update_node(), citus_nodename_for_nodeid(), and citus_nodeport_for_nodeid() functions only checked for nodes in their own clusters and hence last two returned NULLs and the first one showed an error is the nodeId was from a different cluster. Fixes https://github.com/citusdata/citus/issues/6433pull/6468/head^2
parent
3f66f3d9dd
commit
402a30a2b7
|
@ -128,6 +128,7 @@ static void ErrorIfCoordinatorMetadataSetFalse(WorkerNode *workerNode, Datum val
|
|||
static WorkerNode * SetShouldHaveShards(WorkerNode *workerNode, bool shouldHaveShards);
|
||||
static void RemoveOldShardPlacementForNodeGroup(int groupId);
|
||||
static int FindCoordinatorNodeId(void);
|
||||
static WorkerNode * FindNodeAnyClusterByNodeId(uint32 nodeId);
|
||||
|
||||
/* declarations for dynamic loading */
|
||||
PG_FUNCTION_INFO_V1(citus_set_coordinator_host);
|
||||
|
@ -1343,7 +1344,7 @@ citus_update_node(PG_FUNCTION_ARGS)
|
|||
}
|
||||
}
|
||||
|
||||
WorkerNode *workerNode = LookupNodeByNodeId(nodeId);
|
||||
WorkerNode *workerNode = FindNodeAnyClusterByNodeId(nodeId);
|
||||
if (workerNode == NULL)
|
||||
{
|
||||
ereport(ERROR, (errcode(ERRCODE_NO_DATA_FOUND),
|
||||
|
@ -1419,7 +1420,7 @@ citus_update_node(PG_FUNCTION_ARGS)
|
|||
UpdateNodeLocation(nodeId, newNodeNameString, newNodePort);
|
||||
|
||||
/* we should be able to find the new node from the metadata */
|
||||
workerNode = FindWorkerNode(newNodeNameString, newNodePort);
|
||||
workerNode = FindWorkerNodeAnyCluster(newNodeNameString, newNodePort);
|
||||
Assert(workerNode->nodeId == nodeId);
|
||||
|
||||
/*
|
||||
|
@ -1620,8 +1621,7 @@ citus_nodename_for_nodeid(PG_FUNCTION_ARGS)
|
|||
|
||||
int nodeId = PG_GETARG_INT32(0);
|
||||
|
||||
bool missingOk = true;
|
||||
WorkerNode *node = FindNodeWithNodeId(nodeId, missingOk);
|
||||
WorkerNode *node = FindNodeAnyClusterByNodeId(nodeId);
|
||||
|
||||
if (node == NULL)
|
||||
{
|
||||
|
@ -1642,8 +1642,7 @@ citus_nodeport_for_nodeid(PG_FUNCTION_ARGS)
|
|||
|
||||
int nodeId = PG_GETARG_INT32(0);
|
||||
|
||||
bool missingOk = true;
|
||||
WorkerNode *node = FindNodeWithNodeId(nodeId, missingOk);
|
||||
WorkerNode *node = FindNodeAnyClusterByNodeId(nodeId);
|
||||
|
||||
if (node == NULL)
|
||||
{
|
||||
|
@ -1766,6 +1765,29 @@ FindWorkerNodeAnyCluster(const char *nodeName, int32 nodePort)
|
|||
}
|
||||
|
||||
|
||||
/*
|
||||
* FindNodeAnyClusterByNodeId searches pg_dist_node and returns the node with
|
||||
* the nodeId. If the node can't be found returns NULL.
|
||||
*/
|
||||
static WorkerNode *
|
||||
FindNodeAnyClusterByNodeId(uint32 nodeId)
|
||||
{
|
||||
bool includeNodesFromOtherClusters = true;
|
||||
List *nodeList = ReadDistNode(includeNodesFromOtherClusters);
|
||||
WorkerNode *node = NULL;
|
||||
|
||||
foreach_ptr(node, nodeList)
|
||||
{
|
||||
if (node->nodeId == nodeId)
|
||||
{
|
||||
return node;
|
||||
}
|
||||
}
|
||||
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* FindNodeWithNodeId searches pg_dist_node and returns the node with the nodeId.
|
||||
* If the node cannot be found this functions errors.
|
||||
|
|
|
@ -961,6 +961,44 @@ SELECT master_add_secondary_node('localhost', 9992, 'localhost', :worker_1_port,
|
|||
29
|
||||
(1 row)
|
||||
|
||||
SELECT nodeid AS worker_1_node FROM pg_dist_node WHERE nodeport=9992 \gset
|
||||
-- citus_update_node allows updating a node from the non-default cluster
|
||||
SELECT citus_update_node(:worker_1_node, 'localhost', 9991);
|
||||
citus_update_node
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT citus_nodename_for_nodeid(:worker_1_node);
|
||||
citus_nodename_for_nodeid
|
||||
---------------------------------------------------------------------
|
||||
localhost
|
||||
(1 row)
|
||||
|
||||
SELECT citus_nodeport_for_nodeid(:worker_1_node);
|
||||
citus_nodeport_for_nodeid
|
||||
---------------------------------------------------------------------
|
||||
9991
|
||||
(1 row)
|
||||
|
||||
SELECT citus_update_node(:worker_1_node, 'localhost', 9992);
|
||||
citus_update_node
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT citus_nodename_for_nodeid(:worker_1_node);
|
||||
citus_nodename_for_nodeid
|
||||
---------------------------------------------------------------------
|
||||
localhost
|
||||
(1 row)
|
||||
|
||||
SELECT citus_nodeport_for_nodeid(:worker_1_node);
|
||||
citus_nodeport_for_nodeid
|
||||
---------------------------------------------------------------------
|
||||
9992
|
||||
(1 row)
|
||||
|
||||
SELECT nodeid AS worker_1_node FROM pg_dist_node WHERE nodeport=:worker_1_port \gset
|
||||
-- master_update_node checks node exists
|
||||
SELECT master_update_node(100, 'localhost', 8000);
|
||||
|
|
|
@ -372,6 +372,17 @@ SELECT master_add_secondary_node('localhost', 9994, primaryname => 'localhost',
|
|||
SELECT master_add_secondary_node('localhost', 9993, 'localhost', 2000);
|
||||
SELECT master_add_secondary_node('localhost', 9992, 'localhost', :worker_1_port, nodecluster => 'second-cluster');
|
||||
|
||||
SELECT nodeid AS worker_1_node FROM pg_dist_node WHERE nodeport=9992 \gset
|
||||
|
||||
-- citus_update_node allows updating a node from the non-default cluster
|
||||
SELECT citus_update_node(:worker_1_node, 'localhost', 9991);
|
||||
SELECT citus_nodename_for_nodeid(:worker_1_node);
|
||||
SELECT citus_nodeport_for_nodeid(:worker_1_node);
|
||||
SELECT citus_update_node(:worker_1_node, 'localhost', 9992);
|
||||
SELECT citus_nodename_for_nodeid(:worker_1_node);
|
||||
SELECT citus_nodeport_for_nodeid(:worker_1_node);
|
||||
|
||||
|
||||
SELECT nodeid AS worker_1_node FROM pg_dist_node WHERE nodeport=:worker_1_port \gset
|
||||
|
||||
-- master_update_node checks node exists
|
||||
|
|
Loading…
Reference in New Issue