diff --git a/src/backend/distributed/metadata/node_metadata.c b/src/backend/distributed/metadata/node_metadata.c
index 57d49ac81..fa4de2f1d 100644
--- a/src/backend/distributed/metadata/node_metadata.c
+++ b/src/backend/distributed/metadata/node_metadata.c
@@ -142,8 +142,10 @@ static BackgroundWorkerHandle * LockPlacementsWithBackgroundWorkersInPrimaryNode
 	WorkerNode *workerNode,
 	bool force, int32 lock_cooldown);
 static void EnsureValidStreamingReplica(WorkerNode *primaryWorkerNode,
-										char* replicaHostname,
-										int replicaPort);
+										char *replicaHostname, int replicaPort);
+static int32 CitusAddReplicaNode(WorkerNode *primaryWorkerNode,
+								 char *replicaHostname, int32 replicaPort);
+static void RemoveReplicaNode(WorkerNode *replicaNode);
 
 /* Function definitions go here */
 
@@ -174,6 +176,9 @@ PG_FUNCTION_INFO_V1(citus_is_coordinator);
 PG_FUNCTION_INFO_V1(citus_internal_mark_node_not_synced);
 PG_FUNCTION_INFO_V1(citus_is_primary_node);
 PG_FUNCTION_INFO_V1(citus_add_replica_node);
+PG_FUNCTION_INFO_V1(citus_add_replica_node_with_nodeid);
+PG_FUNCTION_INFO_V1(citus_remove_replica_node);
+PG_FUNCTION_INFO_V1(citus_remove_replica_node_with_nodeid);
 
 /*
  * DefaultNodeMetadata creates a NodeMetadata struct with the fields set to
@@ -1429,12 +1434,10 @@ master_update_node(PG_FUNCTION_ARGS)
 	return citus_update_node(fcinfo);
 }
 
-
 /*
- * citus_add_replica_node function adds a new node as a replica of an existing primary node.
- * It records the replica's hostname, port, and links it to the primary node's ID.
- * The replica is initially marked as inactive and not having shards.
+ * citus_add_replica_node adds a new node as a replica of an existing primary node.
+ * The primary is identified by hostname and port; the work is done by CitusAddReplicaNode.
  */
 Datum
 citus_add_replica_node(PG_FUNCTION_ARGS)
 {
@@ -1459,6 +1462,60 @@ citus_add_replica_node(PG_FUNCTION_ARGS)
 							   primaryHostname, primaryPort)));
 	}
 
+	int32 replicaNodeId = CitusAddReplicaNode(primaryWorkerNode, replicaHostname, replicaPort);
+
+	PG_RETURN_INT32(replicaNodeId);
+}
+
+/*
+ * citus_add_replica_node_with_nodeid adds a new node as a replica of an existing primary node
+ * using the primary node's ID. It records the replica's hostname, port, and links it to the
+ * primary node's ID.
+ *
+ * This function is useful when you already know the primary node's ID and want to add a replica
+ * without needing to look it up by hostname and port.
+ */
+Datum
+citus_add_replica_node_with_nodeid(PG_FUNCTION_ARGS)
+{
+	CheckCitusVersion(ERROR);
+	EnsureSuperUser();
+	EnsureCoordinator();
+
+	text *replicaHostnameText = PG_GETARG_TEXT_P(0);
+	int32 replicaPort = PG_GETARG_INT32(1);
+	int32 primaryNodeId = PG_GETARG_INT32(2);
+
+	char *replicaHostname = text_to_cstring(replicaHostnameText);
+
+	/*
+	 * Let the lookup come back empty so that the error below is the one reported.
+	 */
+	bool missingOk = true;
+	WorkerNode *primaryWorkerNode = FindNodeWithNodeId(primaryNodeId, missingOk);
+
+	if (primaryWorkerNode == NULL)
+	{
+		ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+						errmsg("primary node with ID %d does not exist", primaryNodeId)));
+	}
+
+	int32 replicaNodeId = CitusAddReplicaNode(primaryWorkerNode, replicaHostname, replicaPort);
+
+	PG_RETURN_INT32(replicaNodeId);
+}
+
+/*
+ * CitusAddReplicaNode function adds a new node as a replica of an existing primary node.
+ * It records the replica's hostname, port, and links it to the primary node's ID.
+ * The replica is initially marked as inactive and not having shards.
+ */
+static int32
+CitusAddReplicaNode(WorkerNode *primaryWorkerNode,
+					char *replicaHostname, int32 replicaPort)
+{
+	Assert(primaryWorkerNode != NULL);
+
 	/* Future-proofing: Ideally, a primary node should not itself be a replica.
 	 * This check might be more relevant once replica promotion logic exists.
 	 * For now, pg_dist_node.nodeisreplica defaults to false for existing nodes.
@@ -1467,14 +1524,14 @@ citus_add_replica_node(PG_FUNCTION_ARGS)
 	{
 		ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
 						errmsg("primary node %s:%d is itself a replica and cannot have replicas",
-							   primaryHostname, primaryPort)));
+							   primaryWorkerNode->workerName, primaryWorkerNode->workerPort)));
 	}
 
 	if (!primaryWorkerNode->shouldHaveShards)
 	{
 		ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
 						errmsg("primary node %s:%d does not have shards, node without shards cannot have replicas",
-							   primaryHostname, primaryPort)));
+							   primaryWorkerNode->workerName, primaryWorkerNode->workerPort)));
 	}
 
 	WorkerNode *existingReplicaNode = FindWorkerNodeAnyCluster(replicaHostname, replicaPort);
@@ -1489,7 +1546,7 @@ citus_add_replica_node(PG_FUNCTION_ARGS)
 		{
 			ereport(NOTICE, (errmsg("node %s:%d is already registered as a replica for primary %s:%d (nodeid %d)",
 									replicaHostname, replicaPort,
-									primaryHostname, primaryPort, primaryWorkerNode->nodeId)));
-			PG_RETURN_INT32(existingReplicaNode->nodeId);
+									primaryWorkerNode->workerName, primaryWorkerNode->workerPort, primaryWorkerNode->nodeId)));
+			return existingReplicaNode->nodeId;
 		}
 		else
@@ -1507,7 +1564,7 @@ citus_add_replica_node(PG_FUNCTION_ARGS)
 	nodeMetadata.nodeprimarynodeid = primaryWorkerNode->nodeId;
 	nodeMetadata.isActive = false; /* Replicas start as inactive */
 	nodeMetadata.shouldHaveShards = false; /* Replicas do not directly own primary shards */
-	nodeMetadata.groupId = primaryWorkerNode->groupId; /* Same group as primary */
+	nodeMetadata.groupId = INVALID_GROUP_ID; /* Replicas get a new group ID and do not belong to any existing group */
 	nodeMetadata.nodeRole = UnavailableNodeRoleId(); /* The node role is set to 'unavailable' */
 	nodeMetadata.nodeCluster = primaryWorkerNode->nodeCluster; /* Same cluster as primary */
 	/* Other fields like hasMetadata, metadataSynced will take defaults from DefaultNodeMetadata
@@ -1544,7 +1601,7 @@ citus_add_replica_node(PG_FUNCTION_ARGS)
 		{
 			ereport(NOTICE, (errmsg("node %s:%d was already correctly registered as a replica for primary %s:%d (nodeid %d)",
 									replicaHostname, replicaPort,
-									primaryHostname, primaryPort, primaryWorkerNode->nodeId)));
+									primaryWorkerNode->workerName, primaryWorkerNode->workerPort, primaryWorkerNode->nodeId)));
 			/* Intentional fall-through to return replicaNodeId */
 		}
 		else
@@ -1552,7 +1609,7 @@ citus_add_replica_node(PG_FUNCTION_ARGS)
 			/* This state is less expected if our initial check passed or errored. */
 			ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR),
 							errmsg("node %s:%d already exists but is not correctly configured as a replica for primary %s:%d",
-								   replicaHostname, replicaPort, primaryHostname, primaryPort)));
+								   replicaHostname, replicaPort, primaryWorkerNode->workerName, primaryWorkerNode->workerPort)));
 		}
 	}
 
@@ -1569,9 +1626,101 @@ citus_add_replica_node(PG_FUNCTION_ARGS)
 	 * For now, citus_add_node_replica just registers it.
 	 */
 
-	PG_RETURN_INT32(replicaNodeId);
+	return replicaNodeId;
 }
 
+/*
+ * citus_remove_replica_node removes an inactive streaming replica node from Citus metadata.
+ */
+Datum
+citus_remove_replica_node(PG_FUNCTION_ARGS)
+{
+	CheckCitusVersion(ERROR);
+	EnsureSuperUser();
+	EnsureCoordinator();
+
+	text *nodeNameText = PG_GETARG_TEXT_P(0);
+	int32 nodePort = PG_GETARG_INT32(1);
+	char *nodeName = text_to_cstring(nodeNameText);
+
+	WorkerNode *workerNode = FindWorkerNodeAnyCluster(nodeName, nodePort);
+
+	if (workerNode == NULL)
+	{
+		ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+						errmsg("node \"%s:%d\" does not exist", nodeName, nodePort)));
+	}
+
+	RemoveReplicaNode(workerNode);
+
+	PG_RETURN_VOID();
+}
+
+/*
+ * citus_remove_replica_node_with_nodeid removes an inactive streaming replica node from Citus metadata
+ * using the node's ID.
+ */
+Datum
+citus_remove_replica_node_with_nodeid(PG_FUNCTION_ARGS)
+{
+	CheckCitusVersion(ERROR);
+	EnsureSuperUser();
+	EnsureCoordinator();
+
+	int32 replicaNodeId = PG_GETARG_INT32(0);
+
+	WorkerNode *replicaNode = FindNodeAnyClusterByNodeId(replicaNodeId);
+
+	if (replicaNode == NULL)
+	{
+		ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+						errmsg("Replica node with ID %d does not exist", replicaNodeId)));
+	}
+
+	RemoveReplicaNode(replicaNode);
+
+	PG_RETURN_VOID();
+}
+
+/*
+ * RemoveReplicaNode removes the given replica node from the cluster metadata.
+ * It errors out when the node is not registered as a replica or is marked
+ * active; otherwise it emits a NOTICE and delegates to RemoveNodeFromCluster.
+ */
+static void
+RemoveReplicaNode(WorkerNode *replicaNode)
+{
+	Assert(replicaNode != NULL);
+
+	if (!replicaNode->nodeisreplica)
+	{
+		ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+						errmsg("Node %s:%d (ID %d) is not a replica node. "
+							   "Use citus_remove_node() to remove primary or already promoted nodes.",
+							   replicaNode->workerName, replicaNode->workerPort, replicaNode->nodeId)));
+	}
+
+	if (replicaNode->isActive)
+	{
+		ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+						errmsg("Replica node %s:%d (ID %d) is marked as active and cannot be removed with this function. "
+							   "This might indicate a promoted replica. Consider using citus_remove_node() if you are sure, "
+							   "or ensure it's properly deactivated if it's an unpromoted replica in an unexpected state.",
+							   replicaNode->workerName, replicaNode->workerPort, replicaNode->nodeId)));
+	}
+
+	/*
+	 * All checks passed, proceed with removal.
+	 * RemoveNodeFromCluster handles locking, catalog changes, connection closing, and metadata sync.
+	 */
+	ereport(NOTICE, (errmsg("Removing inactive replica node %s:%d (ID %d)",
+							replicaNode->workerName, replicaNode->workerPort, replicaNode->nodeId)));
+
+	RemoveNodeFromCluster(replicaNode->workerName, replicaNode->workerPort);
+
+	/* RemoveNodeFromCluster might set this, but setting it here ensures it's marked for this UDF's transaction. */
+	TransactionModifiedNodeMetadata = true;
+}
 
 /*
  * SetLockTimeoutLocally sets the lock_timeout to the given value.
diff --git a/src/backend/distributed/sql/citus--13.0-1--13.1-1.sql b/src/backend/distributed/sql/citus--13.0-1--13.1-1.sql
index 8e0acec75..76ae1f596 100644
--- a/src/backend/distributed/sql/citus--13.0-1--13.1-1.sql
+++ b/src/backend/distributed/sql/citus--13.0-1--13.1-1.sql
@@ -54,6 +54,7 @@ DROP VIEW IF EXISTS pg_catalog.citus_lock_waits;
 
 #include "cat_upgrades/add_replica_info_to_pg_dist_node.sql"
 #include "udfs/citus_add_replica_node/13.1-1.sql"
+#include "udfs/citus_remove_replica_node/13.1-1.sql"
 
 -- Since shard_name/13.1-1.sql first drops the function and then creates it, we first
 -- need to drop citus_shards view since that view depends on this function. And immediately
diff --git a/src/backend/distributed/sql/udfs/citus_add_replica_node/13.1-1.sql b/src/backend/distributed/sql/udfs/citus_add_replica_node/13.1-1.sql
index f936c6af7..9530e54c2 100644
--- a/src/backend/distributed/sql/udfs/citus_add_replica_node/13.1-1.sql
+++ b/src/backend/distributed/sql/udfs/citus_add_replica_node/13.1-1.sql
@@ -11,3 +11,18 @@ COMMENT ON FUNCTION pg_catalog.citus_add_replica_node(text, integer, text, integ
 'Adds a new node as a replica of an existing primary node. The replica is initially inactive. Returns the nodeid of the new replica node.';
 
 REVOKE ALL ON FUNCTION pg_catalog.citus_add_replica_node(text, int, text, int) FROM PUBLIC;
+
+-- Variant of citus_add_replica_node that identifies the primary by its node ID.
+CREATE OR REPLACE FUNCTION pg_catalog.citus_add_replica_node_with_nodeid(
+    replica_hostname text,
+    replica_port integer,
+    primary_nodeid integer
+)
+RETURNS integer
+LANGUAGE C VOLATILE STRICT
+AS 'MODULE_PATHNAME', $$citus_add_replica_node_with_nodeid$$;
+
+COMMENT ON FUNCTION pg_catalog.citus_add_replica_node_with_nodeid(text, integer, integer)
+IS 'Adds a new node as a replica of an existing primary node using the primary node''s ID. The replica is initially inactive. Returns the nodeid of the new replica node.';
+
+REVOKE ALL ON FUNCTION pg_catalog.citus_add_replica_node_with_nodeid(text, integer, integer) FROM PUBLIC;
diff --git a/src/backend/distributed/sql/udfs/citus_remove_replica_node/13.1-1.sql b/src/backend/distributed/sql/udfs/citus_remove_replica_node/13.1-1.sql
new file mode 100644
index 000000000..68b957681
--- /dev/null
+++ b/src/backend/distributed/sql/udfs/citus_remove_replica_node/13.1-1.sql
@@ -0,0 +1,26 @@
+-- Removes an inactive replica node, identified by host name and port.
+CREATE OR REPLACE FUNCTION pg_catalog.citus_remove_replica_node(
+    nodename text,
+    nodeport integer
+)
+RETURNS void
+LANGUAGE C VOLATILE STRICT
+AS 'MODULE_PATHNAME', $$citus_remove_replica_node$$;
+
+COMMENT ON FUNCTION pg_catalog.citus_remove_replica_node(text, integer)
+IS 'Removes an inactive streaming replica node from Citus metadata. Errors if the node is not found, not registered as a replica, or is currently marked active.';
+
+REVOKE ALL ON FUNCTION pg_catalog.citus_remove_replica_node(text, integer) FROM PUBLIC;
+
+-- Same operation, with the replica node identified by its node ID.
+CREATE OR REPLACE FUNCTION pg_catalog.citus_remove_replica_node_with_nodeid(
+    nodeid integer
+)
+RETURNS void
+LANGUAGE C VOLATILE STRICT
+AS 'MODULE_PATHNAME', $$citus_remove_replica_node_with_nodeid$$;
+
+COMMENT ON FUNCTION pg_catalog.citus_remove_replica_node_with_nodeid(integer)
+IS 'Removes an inactive streaming replica node from Citus metadata using its node ID. Errors if the node is not found, not registered as a replica, or is currently marked active.';
+
+REVOKE ALL ON FUNCTION pg_catalog.citus_remove_replica_node_with_nodeid(integer) FROM PUBLIC;