From 932eb8c052c57418ab6a53787f362a4120c74aae Mon Sep 17 00:00:00 2001 From: Muhammad Usama Date: Fri, 20 Jun 2025 19:28:17 +0500 Subject: [PATCH] Enhance replica management with removal and ID-based UDFs This commit builds upon the initial replica registration feature by introducing several enhancements and new capabilities for managing replica nodes. The following UDFs have been added to provide more flexible and complete replica management: - `citus_add_replica_node_with_nodeid`: Allows adding a replica by referencing the primary node's ID, which is more convenient than using hostname and port. - `citus_remove_replica_node`: Allows for the removal of an inactive replica node by its hostname and port. - `citus_remove_replica_node_with_nodeid`: Allows for the removal of an inactive replica node by its ID. Additionally the newly created replicas are now assigned a new group ID instead of inheriting the primary's group ID. This change more accurately reflects their status as independent non-usable nodes until they are promoted. --- .../distributed/metadata/node_metadata.c | 169 ++++++++++++++++-- .../distributed/sql/citus--13.0-1--13.1-1.sql | 1 + .../udfs/citus_add_replica_node/13.1-1.sql | 13 ++ .../udfs/citus_remove_replica_node/13.1-1.sql | 24 +++ 4 files changed, 194 insertions(+), 13 deletions(-) create mode 100644 src/backend/distributed/sql/udfs/citus_remove_replica_node/13.1-1.sql diff --git a/src/backend/distributed/metadata/node_metadata.c b/src/backend/distributed/metadata/node_metadata.c index 57d49ac81..fa4de2f1d 100644 --- a/src/backend/distributed/metadata/node_metadata.c +++ b/src/backend/distributed/metadata/node_metadata.c @@ -142,8 +142,10 @@ static BackgroundWorkerHandle * LockPlacementsWithBackgroundWorkersInPrimaryNode WorkerNode *workerNode, bool force, int32 lock_cooldown); static void EnsureValidStreamingReplica(WorkerNode *primaryWorkerNode, - char* replicaHostname, - int replicaPort); + char* replicaHostname, int replicaPort); +static int32 CitusAddReplicaNode(WorkerNode *primaryWorkerNode, + char *replicaHostname, int32 replicaPort); +static void RemoveReplicaNode(WorkerNode *replicaNode); /* Function definitions go here */ @@ -174,6 +176,9 @@ PG_FUNCTION_INFO_V1(citus_is_coordinator); PG_FUNCTION_INFO_V1(citus_internal_mark_node_not_synced); PG_FUNCTION_INFO_V1(citus_is_primary_node); PG_FUNCTION_INFO_V1(citus_add_replica_node); +PG_FUNCTION_INFO_V1(citus_add_replica_node_with_nodeid); +PG_FUNCTION_INFO_V1(citus_remove_replica_node); +PG_FUNCTION_INFO_V1(citus_remove_replica_node_with_nodeid); /* * DefaultNodeMetadata creates a NodeMetadata struct with the fields set to @@ -1429,12 +1434,10 @@ master_update_node(PG_FUNCTION_ARGS) return citus_update_node(fcinfo); } - /* - * citus_add_replica_node function adds a new node as a replica of an existing primary node. - * It records the replica's hostname, port, and links it to the primary node's ID. - * The replica is initially marked as inactive and not having shards. + * citus_add_replica_node adds a new node as a replica of an existing primary node. */ + Datum citus_add_replica_node(PG_FUNCTION_ARGS) { @@ -1459,6 +1462,60 @@ citus_add_replica_node(PG_FUNCTION_ARGS) primaryHostname, primaryPort))); } + int32 replicaNodeId = CitusAddReplicaNode(primaryWorkerNode, replicaHostname, replicaPort); + + PG_RETURN_INT32(replicaNodeId); +} + +/* + * citus_add_replica_node_with_nodeid adds a new node as a replica of an existing primary node + * using the primary node's ID. It records the replica's hostname, port, and links it to the + * primary node's ID. + * + * This function is useful when you already know the primary node's ID and want to add a replica + * without needing to look it up by hostname and port. +*/ +Datum +citus_add_replica_node_with_nodeid(PG_FUNCTION_ARGS) +{ + CheckCitusVersion(ERROR); + EnsureSuperUser(); + EnsureCoordinator(); + + text *replicaHostnameText = PG_GETARG_TEXT_P(0); + int32 replicaPort = PG_GETARG_INT32(1); + int32 primaryNodeId = PG_GETARG_INT32(2); + + char *replicaHostname = text_to_cstring(replicaHostnameText); + + bool missingOk = false; + WorkerNode *primaryWorkerNode = FindNodeWithNodeId(primaryNodeId, missingOk); + + if (primaryWorkerNode == NULL) + { + ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("primary node with ID %d does not exist", primaryNodeId))); + + } + + int32 replicaNodeId = CitusAddReplicaNode(primaryWorkerNode, replicaHostname, replicaPort); + + PG_RETURN_INT32(replicaNodeId); + +} + +/* + * CitusAddReplicaNode function adds a new node as a replica of an existing primary node. + * It records the replica's hostname, port, and links it to the primary node's ID. + * The replica is initially marked as inactive and not having shards. + */ +static int32 +CitusAddReplicaNode(WorkerNode *primaryWorkerNode, + char *replicaHostname, int32 replicaPort) +{ + + Assert(primaryWorkerNode != NULL); + /* Future-proofing: Ideally, a primary node should not itself be a replica. * This check might be more relevant once replica promotion logic exists. * For now, pg_dist_node.nodeisreplica defaults to false for existing nodes. @@ -1467,14 +1524,14 @@ citus_add_replica_node(PG_FUNCTION_ARGS) { ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("primary node %s:%d is itself a replica and cannot have replicas", - primaryHostname, primaryPort))); + primaryWorkerNode->workerName, primaryWorkerNode->workerPort))); } if (!primaryWorkerNode->shouldHaveShards) { ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("primary node %s:%d does not have shards, node without shards cannot have replicas", - primaryHostname, primaryPort))); + primaryWorkerNode->workerName, primaryWorkerNode->workerPort))); } WorkerNode *existingReplicaNode = FindWorkerNodeAnyCluster(replicaHostname, replicaPort); @@ -1489,7 +1546,7 @@ citus_add_replica_node(PG_FUNCTION_ARGS) { ereport(NOTICE, (errmsg("node %s:%d is already registered as a replica for primary %s:%d (nodeid %d)", replicaHostname, replicaPort, - primaryHostname, primaryPort, primaryWorkerNode->nodeId))); + primaryWorkerNode->workerName, primaryWorkerNode->workerPort, primaryWorkerNode->nodeId))); PG_RETURN_INT32(existingReplicaNode->nodeId); } else @@ -1507,7 +1564,7 @@ citus_add_replica_node(PG_FUNCTION_ARGS) nodeMetadata.nodeprimarynodeid = primaryWorkerNode->nodeId; nodeMetadata.isActive = false; /* Replicas start as inactive */ nodeMetadata.shouldHaveShards = false; /* Replicas do not directly own primary shards */ - nodeMetadata.groupId = primaryWorkerNode->groupId; /* Same group as primary */ + nodeMetadata.groupId = INVALID_GROUP_ID; /* Replicas get a new group ID and do not belong to any existing group */ nodeMetadata.nodeRole = UnavailableNodeRoleId(); /* The node role is set to 'unavailable' */ nodeMetadata.nodeCluster = primaryWorkerNode->nodeCluster; /* Same cluster as primary */ /* Other fields like hasMetadata, metadataSynced will take defaults from DefaultNodeMetadata @@ -1544,7 +1601,7 @@ citus_add_replica_node(PG_FUNCTION_ARGS) { ereport(NOTICE, (errmsg("node %s:%d was already correctly registered as a replica for primary %s:%d (nodeid %d)", replicaHostname, replicaPort, - primaryHostname, primaryPort, primaryWorkerNode->nodeId))); + primaryWorkerNode->workerName, primaryWorkerNode->workerPort, primaryWorkerNode->nodeId))); /* Intentional fall-through to return replicaNodeId */ } else @@ -1552,7 +1609,7 @@ citus_add_replica_node(PG_FUNCTION_ARGS) /* This state is less expected if our initial check passed or errored. */ ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), errmsg("node %s:%d already exists but is not correctly configured as a replica for primary %s:%d", - replicaHostname, replicaPort, primaryHostname, primaryPort))); + replicaHostname, replicaPort, primaryWorkerNode->workerName, primaryWorkerNode->workerPort))); } } @@ -1569,9 +1626,95 @@ citus_add_replica_node(PG_FUNCTION_ARGS) * For now, citus_add_node_replica just registers it. */ - PG_RETURN_INT32(replicaNodeId); + return replicaNodeId; } +/* + * citus_remove_replica_node removes an inactive streaming replica node from Citus metadata. + */ +Datum +citus_remove_replica_node(PG_FUNCTION_ARGS) +{ + CheckCitusVersion(ERROR); + EnsureSuperUser(); + EnsureCoordinator(); + + text *nodeNameText = PG_GETARG_TEXT_P(0); + int32 nodePort = PG_GETARG_INT32(1); + char *nodeName = text_to_cstring(nodeNameText); + + WorkerNode *workerNode = FindWorkerNodeAnyCluster(nodeName, nodePort); + + if (workerNode == NULL) + { + ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("node \"%s:%d\" does not exist", nodeName, nodePort))); + } + + RemoveReplicaNode(workerNode); + + PG_RETURN_VOID(); +} + +/* + * citus_remove_replica_node_with_nodeid removes an inactive streaming replica node from Citus metadata + * using the node's ID. + */ +Datum +citus_remove_replica_node_with_nodeid(PG_FUNCTION_ARGS) +{ + CheckCitusVersion(ERROR); + EnsureSuperUser(); + EnsureCoordinator(); + + uint32 replicaNodeId = PG_GETARG_INT32(0); + + WorkerNode *replicaNode = FindNodeAnyClusterByNodeId(replicaNodeId); + + if (replicaNode == NULL) + { + ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("Replica node with ID %d does not exist", replicaNodeId))); + } + RemoveReplicaNode(replicaNode); + + PG_RETURN_VOID(); +} + +static void +RemoveReplicaNode(WorkerNode *replicaNode) +{ + Assert(replicaNode != NULL); + + if (!replicaNode->nodeisreplica) + { + ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("Node %s:%d (ID %d) is not a replica node. " + "Use citus_remove_node() to remove primary or already promoted nodes.", + replicaNode->workerName, replicaNode->workerPort, replicaNode->nodeId))); + } + + if (replicaNode->isActive) + { + ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("Replica node %s:%d (ID %d) is marked as active and cannot be removed with this function. " + "This might indicate a promoted replica. Consider using citus_remove_node() if you are sure, " + "or ensure it's properly deactivated if it's an unpromoted replica in an unexpected state.", + replicaNode->workerName, replicaNode->workerPort, replicaNode->nodeId))); + } + + /* + * All checks passed, proceed with removal. + * RemoveNodeFromCluster handles locking, catalog changes, connection closing, and metadata sync. + */ + ereport(NOTICE, (errmsg("Removing inactive replica node %s:%d (ID %d)", + replicaNode->workerName, replicaNode->workerPort, replicaNode->nodeId))); + + RemoveNodeFromCluster(replicaNode->workerName, replicaNode->workerPort); + + /* RemoveNodeFromCluster might set this, but setting it here ensures it's marked for this UDF's transaction. */ + TransactionModifiedNodeMetadata = true; +} /* * SetLockTimeoutLocally sets the lock_timeout to the given value. diff --git a/src/backend/distributed/sql/citus--13.0-1--13.1-1.sql b/src/backend/distributed/sql/citus--13.0-1--13.1-1.sql index 8e0acec75..76ae1f596 100644 --- a/src/backend/distributed/sql/citus--13.0-1--13.1-1.sql +++ b/src/backend/distributed/sql/citus--13.0-1--13.1-1.sql @@ -54,6 +54,7 @@ DROP VIEW IF EXISTS pg_catalog.citus_lock_waits; #include "cat_upgrades/add_replica_info_to_pg_dist_node.sql" #include "udfs/citus_add_replica_node/13.1-1.sql" +#include "udfs/citus_remove_replica_node/13.1-1.sql" -- Since shard_name/13.1-1.sql first drops the function and then creates it, we first -- need to drop citus_shards view since that view depends on this function. And immediately diff --git a/src/backend/distributed/sql/udfs/citus_add_replica_node/13.1-1.sql b/src/backend/distributed/sql/udfs/citus_add_replica_node/13.1-1.sql index f936c6af7..9530e54c2 100644 --- a/src/backend/distributed/sql/udfs/citus_add_replica_node/13.1-1.sql +++ b/src/backend/distributed/sql/udfs/citus_add_replica_node/13.1-1.sql @@ -11,3 +11,16 @@ COMMENT ON FUNCTION pg_catalog.citus_add_replica_node(text, integer, text, integ 'Adds a new node as a replica of an existing primary node. The replica is initially inactive. Returns the nodeid of the new replica node.'; REVOKE ALL ON FUNCTION pg_catalog.citus_add_replica_node(text, int, text, int) FROM PUBLIC; + +CREATE OR REPLACE FUNCTION pg_catalog.citus_add_replica_node_with_nodeid( + replica_hostname text, + replica_port integer, + primary_nodeid integer) + RETURNS INTEGER + LANGUAGE C VOLATILE STRICT + AS 'MODULE_PATHNAME', $$citus_add_replica_node_with_nodeid$$; + +COMMENT ON FUNCTION pg_catalog.citus_add_replica_node_with_nodeid(text, integer, integer) IS +'Adds a new node as a replica of an existing primary node using the primary node''s ID. The replica is initially inactive. Returns the nodeid of the new replica node.'; + +REVOKE ALL ON FUNCTION pg_catalog.citus_add_replica_node_with_nodeid(text, int, int) FROM PUBLIC; diff --git a/src/backend/distributed/sql/udfs/citus_remove_replica_node/13.1-1.sql b/src/backend/distributed/sql/udfs/citus_remove_replica_node/13.1-1.sql new file mode 100644 index 000000000..68b957681 --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_remove_replica_node/13.1-1.sql @@ -0,0 +1,24 @@ +CREATE OR REPLACE FUNCTION pg_catalog.citus_remove_replica_node( + nodename text, + nodeport integer +) +RETURNS VOID +LANGUAGE C VOLATILE STRICT +AS 'MODULE_PATHNAME', $$citus_remove_replica_node$$; + +COMMENT ON FUNCTION pg_catalog.citus_remove_replica_node(text, integer) +IS 'Removes an inactive streaming replica node from Citus metadata. Errors if the node is not found, not registered as a replica, or is currently marked active.'; + +REVOKE ALL ON FUNCTION pg_catalog.citus_remove_replica_node(text, integer) FROM PUBLIC; + +CREATE OR REPLACE FUNCTION pg_catalog.citus_remove_replica_node_with_nodeid( + nodeid integer +) +RETURNS VOID +LANGUAGE C VOLATILE STRICT +AS 'MODULE_PATHNAME', $$citus_remove_replica_node_with_nodeid$$; + +COMMENT ON FUNCTION pg_catalog.citus_remove_replica_node_with_nodeid(integer) +IS 'Removes an inactive streaming replica node from Citus metadata using its node ID. Errors if the node is not found, not registered as a replica, or is currently marked active.'; + +REVOKE ALL ON FUNCTION pg_catalog.citus_remove_replica_node_with_nodeid(integer) FROM PUBLIC;