Enhance replica management with removal and ID-based UDFs

This commit builds upon the initial replica registration feature by
introducing several enhancements and new capabilities for managing
replica nodes.

The following UDFs have been added to provide more flexible and
complete replica management:

- `citus_add_replica_node_with_nodeid`: Allows adding a replica by
  referencing the primary node's ID, which is more convenient than
  using hostname and port.
- `citus_remove_replica_node`: Allows for the removal of an inactive
  replica node by its hostname and port.
- `citus_remove_replica_node_with_nodeid`: Allows for the removal of an
  inactive replica node by its ID.

Additionally, newly created replicas are now assigned a new group ID
instead of inheriting the primary's group ID. This more accurately
reflects their status as independent, non-usable nodes until they are
promoted.
pull/8046/head
Muhammad Usama 2025-06-20 19:28:17 +05:00
parent d457ea49bf
commit 932eb8c052
4 changed files with 194 additions and 13 deletions

View File

@ -142,8 +142,10 @@ static BackgroundWorkerHandle * LockPlacementsWithBackgroundWorkersInPrimaryNode
WorkerNode *workerNode, bool force, int32 lock_cooldown);
static void EnsureValidStreamingReplica(WorkerNode *primaryWorkerNode,
char* replicaHostname,
int replicaPort);
char* replicaHostname, int replicaPort);
static int32 CitusAddReplicaNode(WorkerNode *primaryWorkerNode,
char *replicaHostname, int32 replicaPort);
static void RemoveReplicaNode(WorkerNode *replicaNode);
/* Function definitions go here */
@ -174,6 +176,9 @@ PG_FUNCTION_INFO_V1(citus_is_coordinator);
PG_FUNCTION_INFO_V1(citus_internal_mark_node_not_synced);
PG_FUNCTION_INFO_V1(citus_is_primary_node);
PG_FUNCTION_INFO_V1(citus_add_replica_node);
PG_FUNCTION_INFO_V1(citus_add_replica_node_with_nodeid);
PG_FUNCTION_INFO_V1(citus_remove_replica_node);
PG_FUNCTION_INFO_V1(citus_remove_replica_node_with_nodeid);
/*
* DefaultNodeMetadata creates a NodeMetadata struct with the fields set to
@ -1429,12 +1434,10 @@ master_update_node(PG_FUNCTION_ARGS)
return citus_update_node(fcinfo);
}
/*
* citus_add_replica_node function adds a new node as a replica of an existing primary node.
* It records the replica's hostname, port, and links it to the primary node's ID.
* The replica is initially marked as inactive and not having shards.
* citus_add_replica_node adds a new node as a replica of an existing primary node.
*/
Datum
citus_add_replica_node(PG_FUNCTION_ARGS)
{
@ -1459,6 +1462,60 @@ citus_add_replica_node(PG_FUNCTION_ARGS)
primaryHostname, primaryPort)));
}
int32 replicaNodeId = CitusAddReplicaNode(primaryWorkerNode, replicaHostname, replicaPort);
PG_RETURN_INT32(replicaNodeId);
}
/*
* citus_add_replica_node_with_nodeid adds a new node as a replica of an existing primary node
* using the primary node's ID. It records the replica's hostname, port, and links it to the
* primary node's ID.
*
* This function is useful when you already know the primary node's ID and want to add a replica
* without needing to look it up by hostname and port.
*/
Datum
citus_add_replica_node_with_nodeid(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);
EnsureSuperUser();
EnsureCoordinator();
text *replicaHostnameText = PG_GETARG_TEXT_P(0);
int32 replicaPort = PG_GETARG_INT32(1);
int32 primaryNodeId = PG_GETARG_INT32(2);
char *replicaHostname = text_to_cstring(replicaHostnameText);
bool missingOk = false;
WorkerNode *primaryWorkerNode = FindNodeWithNodeId(primaryNodeId, missingOk);
if (primaryWorkerNode == NULL)
{
ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("primary node with ID %d does not exist", primaryNodeId)));
}
int32 replicaNodeId = CitusAddReplicaNode(primaryWorkerNode, replicaHostname, replicaPort);
PG_RETURN_INT32(replicaNodeId);
}
/*
* CitusAddReplicaNode function adds a new node as a replica of an existing primary node.
* It records the replica's hostname, port, and links it to the primary node's ID.
* The replica is initially marked as inactive and not having shards.
*/
static int32
CitusAddReplicaNode(WorkerNode *primaryWorkerNode,
char *replicaHostname, int32 replicaPort)
{
Assert(primaryWorkerNode != NULL);
/* Future-proofing: Ideally, a primary node should not itself be a replica.
* This check might be more relevant once replica promotion logic exists.
* For now, pg_dist_node.nodeisreplica defaults to false for existing nodes.
@ -1467,14 +1524,14 @@ citus_add_replica_node(PG_FUNCTION_ARGS)
{
ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("primary node %s:%d is itself a replica and cannot have replicas",
primaryHostname, primaryPort)));
primaryWorkerNode->workerName, primaryWorkerNode->workerPort)));
}
if (!primaryWorkerNode->shouldHaveShards)
{
ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("primary node %s:%d does not have shards, node without shards cannot have replicas",
primaryHostname, primaryPort)));
primaryWorkerNode->workerName, primaryWorkerNode->workerPort)));
}
WorkerNode *existingReplicaNode = FindWorkerNodeAnyCluster(replicaHostname, replicaPort);
@ -1489,7 +1546,7 @@ citus_add_replica_node(PG_FUNCTION_ARGS)
{
ereport(NOTICE, (errmsg("node %s:%d is already registered as a replica for primary %s:%d (nodeid %d)",
replicaHostname, replicaPort,
primaryHostname, primaryPort, primaryWorkerNode->nodeId)));
primaryWorkerNode->workerName, primaryWorkerNode->workerPort, primaryWorkerNode->nodeId)));
PG_RETURN_INT32(existingReplicaNode->nodeId);
}
else
@ -1507,7 +1564,7 @@ citus_add_replica_node(PG_FUNCTION_ARGS)
nodeMetadata.nodeprimarynodeid = primaryWorkerNode->nodeId;
nodeMetadata.isActive = false; /* Replicas start as inactive */
nodeMetadata.shouldHaveShards = false; /* Replicas do not directly own primary shards */
nodeMetadata.groupId = primaryWorkerNode->groupId; /* Same group as primary */
nodeMetadata.groupId = INVALID_GROUP_ID; /* Replicas get a new group ID and do not belong to any existing group */
nodeMetadata.nodeRole = UnavailableNodeRoleId(); /* The node role is set to 'unavailable' */
nodeMetadata.nodeCluster = primaryWorkerNode->nodeCluster; /* Same cluster as primary */
/* Other fields like hasMetadata, metadataSynced will take defaults from DefaultNodeMetadata
@ -1544,7 +1601,7 @@ citus_add_replica_node(PG_FUNCTION_ARGS)
{
ereport(NOTICE, (errmsg("node %s:%d was already correctly registered as a replica for primary %s:%d (nodeid %d)",
replicaHostname, replicaPort,
primaryHostname, primaryPort, primaryWorkerNode->nodeId)));
primaryWorkerNode->workerName, primaryWorkerNode->workerPort, primaryWorkerNode->nodeId)));
/* Intentional fall-through to return replicaNodeId */
}
else
@ -1552,7 +1609,7 @@ citus_add_replica_node(PG_FUNCTION_ARGS)
/* This state is less expected if our initial check passed or errored. */
ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR),
errmsg("node %s:%d already exists but is not correctly configured as a replica for primary %s:%d",
replicaHostname, replicaPort, primaryHostname, primaryPort)));
replicaHostname, replicaPort, primaryWorkerNode->workerName, primaryWorkerNode->workerPort)));
}
}
@ -1569,9 +1626,95 @@ citus_add_replica_node(PG_FUNCTION_ARGS)
* For now, citus_add_replica_node just registers it.
*/
PG_RETURN_INT32(replicaNodeId);
return replicaNodeId;
}
/*
* citus_remove_replica_node removes an inactive streaming replica node from Citus metadata.
*/
Datum
citus_remove_replica_node(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);
EnsureSuperUser();
EnsureCoordinator();
text *nodeNameText = PG_GETARG_TEXT_P(0);
int32 nodePort = PG_GETARG_INT32(1);
char *nodeName = text_to_cstring(nodeNameText);
WorkerNode *workerNode = FindWorkerNodeAnyCluster(nodeName, nodePort);
if (workerNode == NULL)
{
ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("node \"%s:%d\" does not exist", nodeName, nodePort)));
}
RemoveReplicaNode(workerNode);
PG_RETURN_VOID();
}
/*
* citus_remove_replica_node_with_nodeid removes an inactive streaming replica node from Citus metadata
* using the node's ID.
*/
Datum
citus_remove_replica_node_with_nodeid(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);
EnsureSuperUser();
EnsureCoordinator();
uint32 replicaNodeId = PG_GETARG_INT32(0);
WorkerNode *replicaNode = FindNodeAnyClusterByNodeId(replicaNodeId);
if (replicaNode == NULL)
{
ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("Replica node with ID %d does not exist", replicaNodeId)));
}
RemoveReplicaNode(replicaNode);
PG_RETURN_VOID();
}
/*
 * RemoveReplicaNode validates that the given node is an inactive replica and
 * then removes it via RemoveNodeFromCluster.
 *
 * Errors out when the node is not flagged as a replica (nodeisreplica) or
 * when it is marked active (isActive). On success a NOTICE is emitted before
 * the removal and TransactionModifiedNodeMetadata is set afterwards.
 */
static void
RemoveReplicaNode(WorkerNode *replicaNode)
{
	Assert(replicaNode != NULL);

	if (!replicaNode->nodeisreplica)
	{
		/* primaries and promoted nodes go through citus_remove_node() */
		ereport(ERROR,
				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
				 errmsg("Node %s:%d (ID %d) is not a replica node. "
						"Use citus_remove_node() to remove primary or already promoted nodes.",
						replicaNode->workerName,
						replicaNode->workerPort,
						replicaNode->nodeId)));
	}
	else if (replicaNode->isActive)
	{
		/* an active replica is unexpected here; refuse to touch it */
		ereport(ERROR,
				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
				 errmsg("Replica node %s:%d (ID %d) is marked as active and cannot be removed with this function. "
						"This might indicate a promoted replica. Consider using citus_remove_node() if you are sure, "
						"or ensure it's properly deactivated if it's an unpromoted replica in an unexpected state.",
						replicaNode->workerName,
						replicaNode->workerPort,
						replicaNode->nodeId)));
	}

	/*
	 * Validation succeeded. The actual work (locking, catalog changes,
	 * connection closing and metadata sync) is left to RemoveNodeFromCluster.
	 */
	ereport(NOTICE,
			(errmsg("Removing inactive replica node %s:%d (ID %d)",
					replicaNode->workerName,
					replicaNode->workerPort,
					replicaNode->nodeId)));

	RemoveNodeFromCluster(replicaNode->workerName, replicaNode->workerPort);

	/*
	 * RemoveNodeFromCluster may already set this flag; set it explicitly so
	 * it is marked for the transaction of the calling UDF.
	 */
	TransactionModifiedNodeMetadata = true;
}
/*
* SetLockTimeoutLocally sets the lock_timeout to the given value.

View File

@ -54,6 +54,7 @@ DROP VIEW IF EXISTS pg_catalog.citus_lock_waits;
#include "cat_upgrades/add_replica_info_to_pg_dist_node.sql"
#include "udfs/citus_add_replica_node/13.1-1.sql"
#include "udfs/citus_remove_replica_node/13.1-1.sql"
-- Since shard_name/13.1-1.sql first drops the function and then creates it, we first
-- need to drop citus_shards view since that view depends on this function. And immediately

View File

@ -11,3 +11,16 @@ COMMENT ON FUNCTION pg_catalog.citus_add_replica_node(text, integer, text, integ
'Adds a new node as a replica of an existing primary node. The replica is initially inactive. Returns the nodeid of the new replica node.';
REVOKE ALL ON FUNCTION pg_catalog.citus_add_replica_node(text, int, text, int) FROM PUBLIC;
-- Registers a replica for the primary identified by primary_nodeid.
-- Implemented in C by citus_add_replica_node_with_nodeid; STRICT, so a NULL
-- argument yields NULL without calling the C function.
CREATE OR REPLACE FUNCTION pg_catalog.citus_add_replica_node_with_nodeid(
    replica_hostname text,
    replica_port integer,
    primary_nodeid integer
)
    RETURNS integer
    LANGUAGE C
    VOLATILE
    STRICT
AS 'MODULE_PATHNAME', $$citus_add_replica_node_with_nodeid$$;

COMMENT ON FUNCTION pg_catalog.citus_add_replica_node_with_nodeid(text, integer, integer)
    IS 'Adds a new node as a replica of an existing primary node using the primary node''s ID. The replica is initially inactive. Returns the nodeid of the new replica node.';

-- Not callable by PUBLIC.
REVOKE ALL ON FUNCTION pg_catalog.citus_add_replica_node_with_nodeid(text, integer, integer)
    FROM PUBLIC;

View File

@ -0,0 +1,24 @@
-- Removes a replica addressed by hostname and port.
-- Implemented in C by citus_remove_replica_node; STRICT, so a NULL argument
-- yields NULL without calling the C function.
CREATE OR REPLACE FUNCTION pg_catalog.citus_remove_replica_node(
    nodename text,
    nodeport integer)
    RETURNS void
    LANGUAGE C
    VOLATILE
    STRICT
AS 'MODULE_PATHNAME', $$citus_remove_replica_node$$;

COMMENT ON FUNCTION pg_catalog.citus_remove_replica_node(text, integer) IS
    'Removes an inactive streaming replica node from Citus metadata. Errors if the node is not found, not registered as a replica, or is currently marked active.';

-- Not callable by PUBLIC.
REVOKE ALL ON FUNCTION pg_catalog.citus_remove_replica_node(text, integer)
    FROM PUBLIC;
-- Removes a replica addressed by its node ID.
-- Implemented in C by citus_remove_replica_node_with_nodeid; STRICT, so a
-- NULL argument yields NULL without calling the C function.
CREATE OR REPLACE FUNCTION pg_catalog.citus_remove_replica_node_with_nodeid(
    nodeid integer)
    RETURNS void
    LANGUAGE C
    VOLATILE
    STRICT
AS 'MODULE_PATHNAME', $$citus_remove_replica_node_with_nodeid$$;

COMMENT ON FUNCTION pg_catalog.citus_remove_replica_node_with_nodeid(integer) IS
    'Removes an inactive streaming replica node from Citus metadata using its node ID. Errors if the node is not found, not registered as a replica, or is currently marked active.';

-- Not callable by PUBLIC.
REVOKE ALL ON FUNCTION pg_catalog.citus_remove_replica_node_with_nodeid(integer)
    FROM PUBLIC;