From d457ea49bf2bd61fb0b7414d147013e9de5e1946 Mon Sep 17 00:00:00 2001 From: Muhammad Usama Date: Wed, 18 Jun 2025 00:31:28 +0500 Subject: [PATCH 1/3] Enable registration of worker node replicas in Citus MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Core changes: *Extend pg_dist_node schema: Add nodeisreplica (BOOLEAN) and nodeprimarynodeid (INT4) columns to mark replicas and reference their primary nodes. *Introduce UDF citus_add_replica_node: citus_add_replica_node( replica_hostname TEXT, replica_port INT, primary_hostname TEXT, primary_port INT) RETURNS INT Registers a user‐provisioned PostgreSQL streaming replica as an inactive Citus node linked to its primary. This lays the foundation for the upcoming snapshot-based node split feature. --- .../distributed/metadata/metadata_cache.c | 12 + .../distributed/metadata/metadata_sync.c | 15 +- .../distributed/metadata/node_metadata.c | 351 +++++++++++++++++- .../add_replica_info_to_pg_dist_node.sql | 7 + .../distributed/sql/citus--13.0-1--13.1-1.sql | 3 + .../udfs/citus_add_replica_node/13.1-1.sql | 13 + src/include/distributed/metadata_cache.h | 1 + src/include/distributed/pg_dist_node.h | 4 +- src/include/distributed/worker_manager.h | 3 + 9 files changed, 399 insertions(+), 10 deletions(-) create mode 100644 src/backend/distributed/sql/cat_upgrades/add_replica_info_to_pg_dist_node.sql create mode 100644 src/backend/distributed/sql/udfs/citus_add_replica_node/13.1-1.sql diff --git a/src/backend/distributed/metadata/metadata_cache.c b/src/backend/distributed/metadata/metadata_cache.c index 79cc61092..b9f274495 100644 --- a/src/backend/distributed/metadata/metadata_cache.c +++ b/src/backend/distributed/metadata/metadata_cache.c @@ -3549,6 +3549,18 @@ SecondaryNodeRoleId(void) return MetadataCache.secondaryNodeRoleId; } +/* return the Oid of the 'unavailable' nodeRole enum value */ +Oid +UnavailableNodeRoleId(void) +{ + if (!MetadataCache.secondaryNodeRoleId) + { + MetadataCache.secondaryNodeRoleId = LookupStringEnumValueId("noderole", + "unavailable"); + } + + return MetadataCache.secondaryNodeRoleId; +} Oid CitusJobStatusScheduledId(void) diff --git a/src/backend/distributed/metadata/metadata_sync.c b/src/backend/distributed/metadata/metadata_sync.c index f73856169..215516c5a 100644 --- a/src/backend/distributed/metadata/metadata_sync.c +++ b/src/backend/distributed/metadata/metadata_sync.c @@ -814,7 +814,7 @@ NodeListInsertCommand(List *workerNodeList) appendStringInfo(nodeListInsertCommand, "INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, " "noderack, hasmetadata, metadatasynced, isactive, noderole, " - "nodecluster, shouldhaveshards) VALUES "); + "nodecluster, shouldhaveshards, nodeisreplica, nodeprimarynodeid) VALUES "); /* iterate over the worker nodes, add the values */ WorkerNode *workerNode = NULL; @@ -824,13 +824,14 @@ NodeListInsertCommand(List *workerNodeList) char *metadataSyncedString = workerNode->metadataSynced ? "TRUE" : "FALSE"; char *isActiveString = workerNode->isActive ? "TRUE" : "FALSE"; char *shouldHaveShards = workerNode->shouldHaveShards ? "TRUE" : "FALSE"; + char *nodeisreplicaString = workerNode->nodeisreplica ? 
"TRUE" : "FALSE"; Datum nodeRoleOidDatum = ObjectIdGetDatum(workerNode->nodeRole); Datum nodeRoleStringDatum = DirectFunctionCall1(enum_out, nodeRoleOidDatum); char *nodeRoleString = DatumGetCString(nodeRoleStringDatum); appendStringInfo(nodeListInsertCommand, - "(%d, %d, %s, %d, %s, %s, %s, %s, '%s'::noderole, %s, %s)", + "(%d, %d, %s, %d, %s, %s, %s, %s, '%s'::noderole, %s, %s, %s, %d)", workerNode->nodeId, workerNode->groupId, quote_literal_cstr(workerNode->workerName), @@ -841,7 +842,9 @@ NodeListInsertCommand(List *workerNodeList) isActiveString, nodeRoleString, quote_literal_cstr(workerNode->nodeCluster), - shouldHaveShards); + shouldHaveShards, + nodeisreplicaString, + workerNode->nodeprimarynodeid); processedWorkerNodeCount++; if (processedWorkerNodeCount != workerCount) @@ -875,9 +878,11 @@ NodeListIdempotentInsertCommand(List *workerNodeList) "hasmetadata = EXCLUDED.hasmetadata, " "isactive = EXCLUDED.isactive, " "noderole = EXCLUDED.noderole, " - "nodecluster = EXCLUDED.nodecluster ," + "nodecluster = EXCLUDED.nodecluster, " "metadatasynced = EXCLUDED.metadatasynced, " - "shouldhaveshards = EXCLUDED.shouldhaveshards"; + "shouldhaveshards = EXCLUDED.shouldhaveshards, " + "nodeisreplica = EXCLUDED.nodeisreplica, " + "nodeprimarynodeid = EXCLUDED.nodeprimarynodeid"; appendStringInfoString(nodeInsertIdempotentCommand, onConflictStr); return nodeInsertIdempotentCommand->data; } diff --git a/src/backend/distributed/metadata/node_metadata.c b/src/backend/distributed/metadata/node_metadata.c index c2a2abd60..57d49ac81 100644 --- a/src/backend/distributed/metadata/node_metadata.c +++ b/src/backend/distributed/metadata/node_metadata.c @@ -84,6 +84,8 @@ typedef struct NodeMetadata bool isActive; Oid nodeRole; bool shouldHaveShards; + uint32 nodeprimarynodeid; + bool nodeisreplica; char *nodeCluster; } NodeMetadata; @@ -120,7 +122,6 @@ static char * NodeMetadataSyncedUpdateCommand(uint32 nodeId, bool metadataSynced static void ErrorIfCoordinatorMetadataSetFalse(WorkerNode *workerNode, Datum value, char *field); static WorkerNode * SetShouldHaveShards(WorkerNode *workerNode, bool shouldHaveShards); -static WorkerNode * FindNodeAnyClusterByNodeId(uint32 nodeId); static void ErrorIfAnyNodeNotExist(List *nodeList); static void UpdateLocalGroupIdsViaMetadataContext(MetadataSyncContext *context); static void SendDeletionCommandsForReplicatedTablePlacements( @@ -140,6 +141,10 @@ static BackgroundWorkerHandle * CheckBackgroundWorkerToObtainLocks(int32 lock_co static BackgroundWorkerHandle * LockPlacementsWithBackgroundWorkersInPrimaryNode( WorkerNode *workerNode, bool force, int32 lock_cooldown); +static void EnsureValidStreamingReplica(WorkerNode *primaryWorkerNode, + char* replicaHostname, + int replicaPort); + /* Function definitions go here */ /* declarations for dynamic loading */ @@ -168,6 +173,7 @@ PG_FUNCTION_INFO_V1(citus_coordinator_nodeid); PG_FUNCTION_INFO_V1(citus_is_coordinator); PG_FUNCTION_INFO_V1(citus_internal_mark_node_not_synced); PG_FUNCTION_INFO_V1(citus_is_primary_node); +PG_FUNCTION_INFO_V1(citus_add_replica_node); /* * DefaultNodeMetadata creates a NodeMetadata struct with the fields set to @@ -183,6 +189,8 @@ DefaultNodeMetadata() nodeMetadata.nodeRack = WORKER_DEFAULT_RACK; nodeMetadata.shouldHaveShards = true; nodeMetadata.groupId = INVALID_GROUP_ID; + nodeMetadata.nodeisreplica = false; + nodeMetadata.nodeprimarynodeid = 0; /* 0 typically means InvalidNodeId */ return nodeMetadata; } @@ -1422,6 +1430,149 @@ master_update_node(PG_FUNCTION_ARGS) } +/* + * 
citus_add_replica_node function adds a new node as a replica of an existing primary node. + * It records the replica's hostname, port, and links it to the primary node's ID. + * The replica is initially marked as inactive and not having shards. + */ +Datum +citus_add_replica_node(PG_FUNCTION_ARGS) +{ + CheckCitusVersion(ERROR); + EnsureSuperUser(); + EnsureCoordinator(); + + text *replicaHostnameText = PG_GETARG_TEXT_P(0); + int32 replicaPort = PG_GETARG_INT32(1); + text *primaryHostnameText = PG_GETARG_TEXT_P(2); + int32 primaryPort = PG_GETARG_INT32(3); + + char *replicaHostname = text_to_cstring(replicaHostnameText); + char *primaryHostname = text_to_cstring(primaryHostnameText); + + WorkerNode *primaryWorkerNode = FindWorkerNodeAnyCluster(primaryHostname, primaryPort); + + if (primaryWorkerNode == NULL) + { + ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("primary node %s:%d not found in pg_dist_node", + primaryHostname, primaryPort))); + } + + /* Future-proofing: Ideally, a primary node should not itself be a replica. + * This check might be more relevant once replica promotion logic exists. + * For now, pg_dist_node.nodeisreplica defaults to false for existing nodes. + */ + if (primaryWorkerNode->nodeisreplica) + { + ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("primary node %s:%d is itself a replica and cannot have replicas", + primaryHostname, primaryPort))); + } + + if (!primaryWorkerNode->shouldHaveShards) + { + ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("primary node %s:%d does not have shards, node without shards cannot have replicas", + primaryHostname, primaryPort))); + } + + WorkerNode *existingReplicaNode = FindWorkerNodeAnyCluster(replicaHostname, replicaPort); + if (existingReplicaNode != NULL) + { + /* + * Idempotency check: If the node already exists, is it already correctly + * registered as a replica for THIS primary? 
+ */ + if (existingReplicaNode->nodeisreplica && + existingReplicaNode->nodeprimarynodeid == primaryWorkerNode->nodeId) + { + ereport(NOTICE, (errmsg("node %s:%d is already registered as a replica for primary %s:%d (nodeid %d)", + replicaHostname, replicaPort, + primaryHostname, primaryPort, primaryWorkerNode->nodeId))); + PG_RETURN_INT32(existingReplicaNode->nodeId); + } + else + { + ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("a different node %s:%d (nodeid %d) already exists or is a replica for a different primary", + replicaHostname, replicaPort, existingReplicaNode->nodeId))); + } + } + EnsureValidStreamingReplica(primaryWorkerNode, replicaHostname, replicaPort); + + NodeMetadata nodeMetadata = DefaultNodeMetadata(); + + nodeMetadata.nodeisreplica = true; + nodeMetadata.nodeprimarynodeid = primaryWorkerNode->nodeId; + nodeMetadata.isActive = false; /* Replicas start as inactive */ + nodeMetadata.shouldHaveShards = false; /* Replicas do not directly own primary shards */ + nodeMetadata.groupId = primaryWorkerNode->groupId; /* Same group as primary */ + nodeMetadata.nodeRole = UnavailableNodeRoleId(); /* The node role is set to 'unavailable' */ + nodeMetadata.nodeCluster = primaryWorkerNode->nodeCluster; /* Same cluster as primary */ + /* Other fields like hasMetadata, metadataSynced will take defaults from DefaultNodeMetadata + * (typically true, true for hasMetadata and metadataSynced if it's a new node, + * or might need adjustment based on replica strategy) + * For now, let's assume DefaultNodeMetadata provides suitable defaults for these + * or they will be set by AddNodeMetadata/ActivateNodeList if needed. + * Specifically, hasMetadata is often true, and metadataSynced true after activation. + * Since this replica is inactive, metadata sync status might be less critical initially. + */ + + bool nodeAlreadyExists = false; + bool localOnly = false; /* Propagate change to other workers with metadata */ + + /* + * AddNodeMetadata will take an ExclusiveLock on pg_dist_node. + * It also checks again if the node already exists after acquiring the lock. + */ + int replicaNodeId = AddNodeMetadata(replicaHostname, replicaPort, &nodeMetadata, + &nodeAlreadyExists, localOnly); + + if (nodeAlreadyExists) + { + /* This case should ideally be caught by the FindWorkerNodeAnyCluster check above, + * but AddNodeMetadata does its own check after locking. + * If it already exists and is correctly configured, we might have returned NOTICE above. + * If it exists but is NOT correctly configured as our replica, an ERROR would be more appropriate. + * AddNodeMetadata returns the existing node's ID if it finds one. + * We need to ensure it is the *correct* replica. + */ + WorkerNode *fetchedExistingNode = FindNodeAnyClusterByNodeId(replicaNodeId); + if (fetchedExistingNode != NULL && fetchedExistingNode->nodeisreplica && + fetchedExistingNode->nodeprimarynodeid == primaryWorkerNode->nodeId) + { + ereport(NOTICE, (errmsg("node %s:%d was already correctly registered as a replica for primary %s:%d (nodeid %d)", + replicaHostname, replicaPort, + primaryHostname, primaryPort, primaryWorkerNode->nodeId))); + /* Intentional fall-through to return replicaNodeId */ + } + else + { + /* This state is less expected if our initial check passed or errored. 
*/ + ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("node %s:%d already exists but is not correctly configured as a replica for primary %s:%d", + replicaHostname, replicaPort, primaryHostname, primaryPort))); + } + } + + TransactionModifiedNodeMetadata = true; + + /* + * Note: Replicas added this way are inactive. + * A separate mechanism or UDF (e.g., citus_activate_replica_node or citus_promote_replica_node) + * would be needed to activate them, potentially after verifying replication lag, etc. + * Activation would typically involve: + * 1. Setting isActive = true. + * 2. Ensuring metadata is synced (hasMetadata=true, metadataSynced=true). + * 3. Potentially other logic like adding to specific scheduler lists. + * For now, citus_add_node_replica just registers it. + */ + + PG_RETURN_INT32(replicaNodeId); +} + + /* * SetLockTimeoutLocally sets the lock_timeout to the given value. * This setting is local. @@ -1871,7 +2022,7 @@ FindWorkerNodeAnyCluster(const char *nodeName, int32 nodePort) * FindNodeAnyClusterByNodeId searches pg_dist_node and returns the node with * the nodeId. If the node can't be found returns NULL. */ -static WorkerNode * +WorkerNode * FindNodeAnyClusterByNodeId(uint32 nodeId) { bool includeNodesFromOtherClusters = true; @@ -2924,6 +3075,10 @@ InsertNodeRow(int nodeid, char *nodeName, int32 nodePort, NodeMetadata *nodeMeta values[Anum_pg_dist_node_nodecluster - 1] = nodeClusterNameDatum; values[Anum_pg_dist_node_shouldhaveshards - 1] = BoolGetDatum( nodeMetadata->shouldHaveShards); + values[Anum_pg_dist_node_nodeisreplica - 1] = BoolGetDatum( + nodeMetadata->nodeisreplica); + values[Anum_pg_dist_node_nodeprimarynodeid - 1] = Int32GetDatum( + nodeMetadata->nodeprimarynodeid); Relation pgDistNode = table_open(DistNodeRelationId(), RowExclusiveLock); @@ -3014,8 +3169,7 @@ TupleToWorkerNode(TupleDesc tupleDescriptor, HeapTuple heapTuple) /* * This function can be called before "ALTER TABLE ... ADD COLUMN nodecluster ...", - * therefore heap_deform_tuple() won't set the isNullArray for this column. We - * initialize it true to be safe in that case. + * and other columns. We initialize isNullArray to true to be safe. */ memset(isNullArray, true, sizeof(isNullArray)); @@ -3043,6 +3197,25 @@ TupleToWorkerNode(TupleDesc tupleDescriptor, HeapTuple heapTuple) datumArray[Anum_pg_dist_node_shouldhaveshards - 1]); + /* nodeisreplica and nodeprimarynodeid might be null if row is from older version */ + if (!isNullArray[Anum_pg_dist_node_nodeisreplica - 1]) + { + workerNode->nodeisreplica = DatumGetBool(datumArray[Anum_pg_dist_node_nodeisreplica - 1]); + } + else + { + workerNode->nodeisreplica = false; /* Default value */ + } + + if (!isNullArray[Anum_pg_dist_node_nodeprimarynodeid - 1]) + { + workerNode->nodeprimarynodeid = DatumGetInt32(datumArray[Anum_pg_dist_node_nodeprimarynodeid - 1]); + } + else + { + workerNode->nodeprimarynodeid = 0; /* Default value for InvalidNodeId */ + } + /* * nodecluster column can be missing. In the case of extension creation/upgrade, * master_initialize_node_metadata function is called before the nodecluster @@ -3285,3 +3458,173 @@ SyncNodeMetadata(MetadataSyncContext *context) */ SendOrCollectCommandListToActivatedNodes(context, recreateNodeSnapshotCommandList); } + +/* + * EnsureValidStreamingReplica checks if the given replica is a valid streaming + * replica. It connects to the replica and checks if it is in recovery mode. + * If it is not, it errors out. 
+ * It also checks the system identifier of the replica and the primary + * to ensure they match. + */ +static void +EnsureValidStreamingReplica(WorkerNode *primaryWorkerNode, char* replicaHostname, int replicaPort) +{ + + int connectionFlag = FORCE_NEW_CONNECTION; + MultiConnection *replicaConnection = GetNodeConnection(connectionFlag, replicaHostname, + replicaPort); + + const char *replica_recovery_query = "SELECT pg_is_in_recovery()"; + + int resultCode = SendRemoteCommand(replicaConnection, replica_recovery_query); + + if (resultCode == 0) + { + CloseConnection(replicaConnection); + ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("could not connect to %s:%d to execute pg_is_in_recovery()", + replicaHostname, replicaPort))); + + } + + PGresult *result = GetRemoteCommandResult(replicaConnection, true); + + if (result == NULL) + { + CloseConnection(replicaConnection); + + ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("node %s:%d does not support pg_is_in_recovery()", + replicaHostname, replicaPort))); + } + + List *sizeList = ReadFirstColumnAsText(result); + if (list_length(sizeList) != 1) + { + PQclear(result); + ClearResults(replicaConnection, true); + CloseConnection(replicaConnection); + + ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("cannot parse pg_is_in_recovery() result from %s:%d", + replicaHostname, + replicaPort))); + + } + + StringInfo isInRecoveryQueryResInfo = (StringInfo) linitial(sizeList); + char *isInRecoveryQueryResStr = isInRecoveryQueryResInfo->data; + + if (strcmp(isInRecoveryQueryResStr, "t") != 0 && strcmp(isInRecoveryQueryResStr, "true") != 0) + { + PQclear(result); + ClearResults(replicaConnection, true); + CloseConnection(replicaConnection); + + ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("%s:%d is not a streaming replica", + replicaHostname, + replicaPort))); + } + + PQclear(result); + ForgetResults(replicaConnection); + + /* Step2: Get the system identifier from replica */ + const char *sysidQuery = + "SELECT system_identifier FROM pg_control_system()"; + + resultCode = SendRemoteCommand(replicaConnection, sysidQuery); + + if (resultCode == 0) + { + CloseConnection(replicaConnection); + ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("could not connect to %s:%d to get system identifier", + replicaHostname, replicaPort))); + + } + + result = GetRemoteCommandResult(replicaConnection, true); + if (!IsResponseOK(result)) + { + ReportResultError(replicaConnection, result, ERROR); + } + + List *sysidList = ReadFirstColumnAsText(result); + if (list_length(sysidList) != 1) + { + PQclear(result); + ClearResults(replicaConnection, true); + CloseConnection(replicaConnection); + + ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("cannot parse get system identifier result from %s:%d", + replicaHostname, + replicaPort))); + } + + StringInfo sysidQueryResInfo = (StringInfo) linitial(sysidList); + char *sysidQueryResStr = sysidQueryResInfo->data; + + ereport (NOTICE, (errmsg("system identifier of %s:%d is %s", + replicaHostname, replicaPort, sysidQueryResStr))); + + /* We do not need the connection anymore */ + PQclear(result); + ForgetResults(replicaConnection); + CloseConnection(replicaConnection); + + /* Step3: Get system identifier from primary */ + int primaryConnectionFlag = 0; + MultiConnection *primaryConnection = GetNodeConnection(primaryConnectionFlag, + primaryWorkerNode->workerName, + primaryWorkerNode->workerPort); + int primaryResultCode = 
SendRemoteCommand(primaryConnection, sysidQuery); + if (primaryResultCode == 0) + { + CloseConnection(primaryConnection); + ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("could not connect to %s:%d to get system identifier", + primaryWorkerNode->workerName, primaryWorkerNode->workerPort))); + + } + PGresult *primaryResult = GetRemoteCommandResult(primaryConnection, true); + if (primaryResult == NULL) + { + CloseConnection(primaryConnection); + + ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("node %s:%d does not support pg_control_system queries", + primaryWorkerNode->workerName, primaryWorkerNode->workerPort))); + } + List *primarySizeList = ReadFirstColumnAsText(primaryResult); + if (list_length(primarySizeList) != 1) + { + PQclear(primaryResult); + ClearResults(primaryConnection, true); + CloseConnection(primaryConnection); + + ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("cannot parse get system identifier result from %s:%d", + primaryWorkerNode->workerName, + primaryWorkerNode->workerPort))); + + } + StringInfo primarySysidQueryResInfo = (StringInfo) linitial(primarySizeList); + char *primarySysidQueryResStr = primarySysidQueryResInfo->data; + + ereport (NOTICE, (errmsg("system identifier of %s:%d is %s", + primaryWorkerNode->workerName, primaryWorkerNode->workerPort, primarySysidQueryResStr))); + /* verify both identifiers */ + if (strcmp(sysidQueryResStr, primarySysidQueryResStr) != 0) + { + ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("system identifiers do not match: %s (replica) vs %s (primary)", + sysidQueryResStr, primarySysidQueryResStr))); + } + PQclear(primaryResult); + ClearResults(primaryConnection, true); + CloseConnection(primaryConnection); +} + diff --git a/src/backend/distributed/sql/cat_upgrades/add_replica_info_to_pg_dist_node.sql b/src/backend/distributed/sql/cat_upgrades/add_replica_info_to_pg_dist_node.sql new file mode 100644 index 000000000..8ca8ae239 --- /dev/null +++ b/src/backend/distributed/sql/cat_upgrades/add_replica_info_to_pg_dist_node.sql @@ -0,0 +1,7 @@ +-- Add replica information columns to pg_dist_node +ALTER TABLE pg_catalog.pg_dist_node ADD COLUMN nodeisreplica BOOLEAN NOT NULL DEFAULT FALSE; +ALTER TABLE pg_catalog.pg_dist_node ADD COLUMN nodeprimarynodeid INT4 NOT NULL DEFAULT 0; + +-- Add a comment to the table and columns for clarity in \d output +COMMENT ON COLUMN pg_catalog.pg_dist_node.nodeisreplica IS 'Indicates if this node is a replica of another node.'; +COMMENT ON COLUMN pg_catalog.pg_dist_node.nodeprimarynodeid IS 'If nodeisreplica is true, this stores the nodeid of its primary node.'; diff --git a/src/backend/distributed/sql/citus--13.0-1--13.1-1.sql b/src/backend/distributed/sql/citus--13.0-1--13.1-1.sql index 1599b21f1..8e0acec75 100644 --- a/src/backend/distributed/sql/citus--13.0-1--13.1-1.sql +++ b/src/backend/distributed/sql/citus--13.0-1--13.1-1.sql @@ -52,6 +52,9 @@ DROP VIEW IF EXISTS pg_catalog.citus_lock_waits; #include "udfs/citus_stat_counters_reset/13.1-1.sql" #include "udfs/citus_nodes/13.1-1.sql" +#include "cat_upgrades/add_replica_info_to_pg_dist_node.sql" +#include "udfs/citus_add_replica_node/13.1-1.sql" + -- Since shard_name/13.1-1.sql first drops the function and then creates it, we first -- need to drop citus_shards view since that view depends on this function. And immediately -- after creating the function, we recreate citus_shards view again. 
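To make the intended workflow concrete, here is a minimal usage sketch of the UDF installed below, against a hypothetical cluster (the hostnames and ports are placeholders, and 'worker-1-replica' is assumed to already be a user-provisioned streaming replica of 'worker-1'; none of these names come from the patch itself):

-- Register the streaming replica as an inactive Citus node; returns its nodeid.
SELECT citus_add_replica_node('worker-1-replica', 5432, 'worker-1', 5432);

-- Inspect the new pg_dist_node columns; the replica row is expected to show
-- isactive = false, shouldhaveshards = false, nodeisreplica = true and
-- nodeprimarynodeid set to the nodeid of worker-1.
SELECT nodeid, nodename, nodeport, isactive, shouldhaveshards,
       nodeisreplica, nodeprimarynodeid
FROM pg_dist_node;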
diff --git a/src/backend/distributed/sql/udfs/citus_add_replica_node/13.1-1.sql b/src/backend/distributed/sql/udfs/citus_add_replica_node/13.1-1.sql new file mode 100644 index 000000000..f936c6af7 --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_add_replica_node/13.1-1.sql @@ -0,0 +1,13 @@ +CREATE OR REPLACE FUNCTION pg_catalog.citus_add_replica_node( + replica_hostname text, + replica_port integer, + primary_hostname text, + primary_port integer) + RETURNS INTEGER + LANGUAGE C VOLATILE STRICT + AS 'MODULE_PATHNAME', $$citus_add_replica_node$$; + +COMMENT ON FUNCTION pg_catalog.citus_add_replica_node(text, integer, text, integer) IS +'Adds a new node as a replica of an existing primary node. The replica is initially inactive. Returns the nodeid of the new replica node.'; + +REVOKE ALL ON FUNCTION pg_catalog.citus_add_replica_node(text, int, text, int) FROM PUBLIC; diff --git a/src/include/distributed/metadata_cache.h b/src/include/distributed/metadata_cache.h index f1120497b..479a0af8b 100644 --- a/src/include/distributed/metadata_cache.h +++ b/src/include/distributed/metadata_cache.h @@ -298,6 +298,7 @@ extern Oid CitusDependentObjectFuncId(void); /* enum oids */ extern Oid PrimaryNodeRoleId(void); extern Oid SecondaryNodeRoleId(void); +extern Oid UnavailableNodeRoleId(void); extern Oid CitusCopyFormatTypeId(void); extern Oid TextCopyFormatId(void); extern Oid BinaryCopyFormatId(void); diff --git a/src/include/distributed/pg_dist_node.h b/src/include/distributed/pg_dist_node.h index 371c2a35a..de23a4255 100644 --- a/src/include/distributed/pg_dist_node.h +++ b/src/include/distributed/pg_dist_node.h @@ -20,7 +20,7 @@ * in particular their OUT parameters) must be changed whenever the definition of * pg_dist_node changes. */ -#define Natts_pg_dist_node 11 +#define Natts_pg_dist_node 13 #define Anum_pg_dist_node_nodeid 1 #define Anum_pg_dist_node_groupid 2 #define Anum_pg_dist_node_nodename 3 @@ -32,6 +32,8 @@ #define Anum_pg_dist_node_nodecluster 9 #define Anum_pg_dist_node_metadatasynced 10 #define Anum_pg_dist_node_shouldhaveshards 11 +#define Anum_pg_dist_node_nodeisreplica 12 +#define Anum_pg_dist_node_nodeprimarynodeid 13 #define GROUPID_SEQUENCE_NAME "pg_dist_groupid_seq" #define NODEID_SEQUENCE_NAME "pg_dist_node_nodeid_seq" diff --git a/src/include/distributed/worker_manager.h b/src/include/distributed/worker_manager.h index 02a43fe0b..93cc6b290 100644 --- a/src/include/distributed/worker_manager.h +++ b/src/include/distributed/worker_manager.h @@ -54,6 +54,8 @@ typedef struct WorkerNode char nodeCluster[NAMEDATALEN]; /* the cluster the node is a part of */ bool metadataSynced; /* node has the most recent metadata */ bool shouldHaveShards; /* if the node should have distributed table shards on it or not */ + bool nodeisreplica; /* whether this node is a replica */ + int32 nodeprimarynodeid; /* nodeid of the primary for this replica */ } WorkerNode; @@ -84,6 +86,7 @@ extern WorkerNode * FindWorkerNode(const char *nodeName, int32 nodePort); extern WorkerNode * FindWorkerNodeOrError(const char *nodeName, int32 nodePort); extern WorkerNode * FindWorkerNodeAnyCluster(const char *nodeName, int32 nodePort); extern WorkerNode * FindNodeWithNodeId(int nodeId, bool missingOk); +extern WorkerNode * FindNodeAnyClusterByNodeId(uint32 nodeId); extern WorkerNode * ModifiableWorkerNode(const char *nodeName, int32 nodePort); extern List * ReadDistNode(bool includeNodesFromOtherClusters); extern void EnsureCoordinator(void); From 932eb8c052c57418ab6a53787f362a4120c74aae Mon Sep 17 00:00:00 
2001 From: Muhammad Usama Date: Fri, 20 Jun 2025 19:28:17 +0500 Subject: [PATCH 2/3] Enhance replica management with removal and ID-based UDFs This commit builds upon the initial replica registration feature by introducing several enhancements and new capabilities for managing replica nodes. The following UDFs have been added to provide more flexible and complete replica management: - `citus_add_replica_node_with_nodeid`: Allows adding a replica by referencing the primary node's ID, which is more convenient than using hostname and port. - `citus_remove_replica_node`: Allows for the removal of an inactive replica node by its hostname and port. - `citus_remove_replica_node_with_nodeid`: Allows for the removal of an inactive replica node by its ID. Additionally the newly created replicas are now assigned a new group ID instead of inheriting the primary's group ID. This change more accurately reflects their status as independent non-usable nodes until they are promoted. --- .../distributed/metadata/node_metadata.c | 169 ++++++++++++++++-- .../distributed/sql/citus--13.0-1--13.1-1.sql | 1 + .../udfs/citus_add_replica_node/13.1-1.sql | 13 ++ .../udfs/citus_remove_replica_node/13.1-1.sql | 24 +++ 4 files changed, 194 insertions(+), 13 deletions(-) create mode 100644 src/backend/distributed/sql/udfs/citus_remove_replica_node/13.1-1.sql diff --git a/src/backend/distributed/metadata/node_metadata.c b/src/backend/distributed/metadata/node_metadata.c index 57d49ac81..fa4de2f1d 100644 --- a/src/backend/distributed/metadata/node_metadata.c +++ b/src/backend/distributed/metadata/node_metadata.c @@ -142,8 +142,10 @@ static BackgroundWorkerHandle * LockPlacementsWithBackgroundWorkersInPrimaryNode WorkerNode *workerNode, bool force, int32 lock_cooldown); static void EnsureValidStreamingReplica(WorkerNode *primaryWorkerNode, - char* replicaHostname, - int replicaPort); + char* replicaHostname, int replicaPort); +static int32 CitusAddReplicaNode(WorkerNode *primaryWorkerNode, + char *replicaHostname, int32 replicaPort); +static void RemoveReplicaNode(WorkerNode *replicaNode); /* Function definitions go here */ @@ -174,6 +176,9 @@ PG_FUNCTION_INFO_V1(citus_is_coordinator); PG_FUNCTION_INFO_V1(citus_internal_mark_node_not_synced); PG_FUNCTION_INFO_V1(citus_is_primary_node); PG_FUNCTION_INFO_V1(citus_add_replica_node); +PG_FUNCTION_INFO_V1(citus_add_replica_node_with_nodeid); +PG_FUNCTION_INFO_V1(citus_remove_replica_node); +PG_FUNCTION_INFO_V1(citus_remove_replica_node_with_nodeid); /* * DefaultNodeMetadata creates a NodeMetadata struct with the fields set to @@ -1429,12 +1434,10 @@ master_update_node(PG_FUNCTION_ARGS) return citus_update_node(fcinfo); } - /* - * citus_add_replica_node function adds a new node as a replica of an existing primary node. - * It records the replica's hostname, port, and links it to the primary node's ID. - * The replica is initially marked as inactive and not having shards. + * citus_add_replica_node adds a new node as a replica of an existing primary node. */ + Datum citus_add_replica_node(PG_FUNCTION_ARGS) { @@ -1459,6 +1462,60 @@ citus_add_replica_node(PG_FUNCTION_ARGS) primaryHostname, primaryPort))); } + int32 replicaNodeId = CitusAddReplicaNode(primaryWorkerNode, replicaHostname, replicaPort); + + PG_RETURN_INT32(replicaNodeId); +} + +/* + * citus_add_replica_node_with_nodeid adds a new node as a replica of an existing primary node + * using the primary node's ID. It records the replica's hostname, port, and links it to the + * primary node's ID. 
+ * + * This function is useful when you already know the primary node's ID and want to add a replica + * without needing to look it up by hostname and port. +*/ +Datum +citus_add_replica_node_with_nodeid(PG_FUNCTION_ARGS) +{ + CheckCitusVersion(ERROR); + EnsureSuperUser(); + EnsureCoordinator(); + + text *replicaHostnameText = PG_GETARG_TEXT_P(0); + int32 replicaPort = PG_GETARG_INT32(1); + int32 primaryNodeId = PG_GETARG_INT32(2); + + char *replicaHostname = text_to_cstring(replicaHostnameText); + + bool missingOk = false; + WorkerNode *primaryWorkerNode = FindNodeWithNodeId(primaryNodeId, missingOk); + + if (primaryWorkerNode == NULL) + { + ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("primary node with ID %d does not exist", primaryNodeId))); + + } + + int32 replicaNodeId = CitusAddReplicaNode(primaryWorkerNode, replicaHostname, replicaPort); + + PG_RETURN_INT32(replicaNodeId); + +} + +/* + * CitusAddReplicaNode function adds a new node as a replica of an existing primary node. + * It records the replica's hostname, port, and links it to the primary node's ID. + * The replica is initially marked as inactive and not having shards. + */ +static int32 +CitusAddReplicaNode(WorkerNode *primaryWorkerNode, + char *replicaHostname, int32 replicaPort) +{ + + Assert(primaryWorkerNode != NULL); + /* Future-proofing: Ideally, a primary node should not itself be a replica. * This check might be more relevant once replica promotion logic exists. * For now, pg_dist_node.nodeisreplica defaults to false for existing nodes. @@ -1467,14 +1524,14 @@ citus_add_replica_node(PG_FUNCTION_ARGS) { ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("primary node %s:%d is itself a replica and cannot have replicas", - primaryHostname, primaryPort))); + primaryWorkerNode->workerName, primaryWorkerNode->workerPort))); } if (!primaryWorkerNode->shouldHaveShards) { ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("primary node %s:%d does not have shards, node without shards cannot have replicas", - primaryHostname, primaryPort))); + primaryWorkerNode->workerName, primaryWorkerNode->workerPort))); } WorkerNode *existingReplicaNode = FindWorkerNodeAnyCluster(replicaHostname, replicaPort); @@ -1489,7 +1546,7 @@ citus_add_replica_node(PG_FUNCTION_ARGS) { ereport(NOTICE, (errmsg("node %s:%d is already registered as a replica for primary %s:%d (nodeid %d)", replicaHostname, replicaPort, - primaryHostname, primaryPort, primaryWorkerNode->nodeId))); + primaryWorkerNode->workerName, primaryWorkerNode->workerPort, primaryWorkerNode->nodeId))); PG_RETURN_INT32(existingReplicaNode->nodeId); } else @@ -1507,7 +1564,7 @@ citus_add_replica_node(PG_FUNCTION_ARGS) nodeMetadata.nodeprimarynodeid = primaryWorkerNode->nodeId; nodeMetadata.isActive = false; /* Replicas start as inactive */ nodeMetadata.shouldHaveShards = false; /* Replicas do not directly own primary shards */ - nodeMetadata.groupId = primaryWorkerNode->groupId; /* Same group as primary */ + nodeMetadata.groupId = INVALID_GROUP_ID; /* Replicas get a new group ID and do not belong to any existing group */ nodeMetadata.nodeRole = UnavailableNodeRoleId(); /* The node role is set to 'unavailable' */ nodeMetadata.nodeCluster = primaryWorkerNode->nodeCluster; /* Same cluster as primary */ /* Other fields like hasMetadata, metadataSynced will take defaults from DefaultNodeMetadata @@ -1544,7 +1601,7 @@ citus_add_replica_node(PG_FUNCTION_ARGS) { ereport(NOTICE, (errmsg("node %s:%d was already correctly 
registered as a replica for primary %s:%d (nodeid %d)", replicaHostname, replicaPort, - primaryHostname, primaryPort, primaryWorkerNode->nodeId))); + primaryWorkerNode->workerName, primaryWorkerNode->workerPort, primaryWorkerNode->nodeId))); /* Intentional fall-through to return replicaNodeId */ } else @@ -1552,7 +1609,7 @@ citus_add_replica_node(PG_FUNCTION_ARGS) /* This state is less expected if our initial check passed or errored. */ ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), errmsg("node %s:%d already exists but is not correctly configured as a replica for primary %s:%d", - replicaHostname, replicaPort, primaryHostname, primaryPort))); + replicaHostname, replicaPort, primaryWorkerNode->workerName, primaryWorkerNode->workerPort))); } } @@ -1569,9 +1626,95 @@ citus_add_replica_node(PG_FUNCTION_ARGS) * For now, citus_add_node_replica just registers it. */ - PG_RETURN_INT32(replicaNodeId); + return replicaNodeId; } +/* + * citus_remove_replica_node removes an inactive streaming replica node from Citus metadata. + */ +Datum +citus_remove_replica_node(PG_FUNCTION_ARGS) +{ + CheckCitusVersion(ERROR); + EnsureSuperUser(); + EnsureCoordinator(); + + text *nodeNameText = PG_GETARG_TEXT_P(0); + int32 nodePort = PG_GETARG_INT32(1); + char *nodeName = text_to_cstring(nodeNameText); + + WorkerNode *workerNode = FindWorkerNodeAnyCluster(nodeName, nodePort); + + if (workerNode == NULL) + { + ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("node \"%s:%d\" does not exist", nodeName, nodePort))); + } + + RemoveReplicaNode(workerNode); + + PG_RETURN_VOID(); +} + +/* + * citus_remove_replica_node_with_nodeid removes an inactive streaming replica node from Citus metadata + * using the node's ID. + */ +Datum +citus_remove_replica_node_with_nodeid(PG_FUNCTION_ARGS) +{ + CheckCitusVersion(ERROR); + EnsureSuperUser(); + EnsureCoordinator(); + + uint32 replicaNodeId = PG_GETARG_INT32(0); + + WorkerNode *replicaNode = FindNodeAnyClusterByNodeId(replicaNodeId); + + if (replicaNode == NULL) + { + ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("Replica node with ID %d does not exist", replicaNodeId))); + } + RemoveReplicaNode(replicaNode); + + PG_RETURN_VOID(); +} + +static void +RemoveReplicaNode(WorkerNode *replicaNode) +{ + Assert(replicaNode != NULL); + + if (!replicaNode->nodeisreplica) + { + ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("Node %s:%d (ID %d) is not a replica node. " + "Use citus_remove_node() to remove primary or already promoted nodes.", + replicaNode->workerName, replicaNode->workerPort, replicaNode->nodeId))); + } + + if (replicaNode->isActive) + { + ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("Replica node %s:%d (ID %d) is marked as active and cannot be removed with this function. " + "This might indicate a promoted replica. Consider using citus_remove_node() if you are sure, " + "or ensure it's properly deactivated if it's an unpromoted replica in an unexpected state.", + replicaNode->workerName, replicaNode->workerPort, replicaNode->nodeId))); + } + + /* + * All checks passed, proceed with removal. + * RemoveNodeFromCluster handles locking, catalog changes, connection closing, and metadata sync. 
+ */ + ereport(NOTICE, (errmsg("Removing inactive replica node %s:%d (ID %d)", + replicaNode->workerName, replicaNode->workerPort, replicaNode->nodeId))); + + RemoveNodeFromCluster(replicaNode->workerName, replicaNode->workerPort); + + /* RemoveNodeFromCluster might set this, but setting it here ensures it's marked for this UDF's transaction. */ + TransactionModifiedNodeMetadata = true; +} /* * SetLockTimeoutLocally sets the lock_timeout to the given value. diff --git a/src/backend/distributed/sql/citus--13.0-1--13.1-1.sql b/src/backend/distributed/sql/citus--13.0-1--13.1-1.sql index 8e0acec75..76ae1f596 100644 --- a/src/backend/distributed/sql/citus--13.0-1--13.1-1.sql +++ b/src/backend/distributed/sql/citus--13.0-1--13.1-1.sql @@ -54,6 +54,7 @@ DROP VIEW IF EXISTS pg_catalog.citus_lock_waits; #include "cat_upgrades/add_replica_info_to_pg_dist_node.sql" #include "udfs/citus_add_replica_node/13.1-1.sql" +#include "udfs/citus_remove_replica_node/13.1-1.sql" -- Since shard_name/13.1-1.sql first drops the function and then creates it, we first -- need to drop citus_shards view since that view depends on this function. And immediately diff --git a/src/backend/distributed/sql/udfs/citus_add_replica_node/13.1-1.sql b/src/backend/distributed/sql/udfs/citus_add_replica_node/13.1-1.sql index f936c6af7..9530e54c2 100644 --- a/src/backend/distributed/sql/udfs/citus_add_replica_node/13.1-1.sql +++ b/src/backend/distributed/sql/udfs/citus_add_replica_node/13.1-1.sql @@ -11,3 +11,16 @@ COMMENT ON FUNCTION pg_catalog.citus_add_replica_node(text, integer, text, integ 'Adds a new node as a replica of an existing primary node. The replica is initially inactive. Returns the nodeid of the new replica node.'; REVOKE ALL ON FUNCTION pg_catalog.citus_add_replica_node(text, int, text, int) FROM PUBLIC; + +CREATE OR REPLACE FUNCTION pg_catalog.citus_add_replica_node_with_nodeid( + replica_hostname text, + replica_port integer, + primary_nodeid integer) + RETURNS INTEGER + LANGUAGE C VOLATILE STRICT + AS 'MODULE_PATHNAME', $$citus_add_replica_node_with_nodeid$$; + +COMMENT ON FUNCTION pg_catalog.citus_add_replica_node_with_nodeid(text, integer, integer) IS +'Adds a new node as a replica of an existing primary node using the primary node''s ID. The replica is initially inactive. Returns the nodeid of the new replica node.'; + +REVOKE ALL ON FUNCTION pg_catalog.citus_add_replica_node_with_nodeid(text, int, int) FROM PUBLIC; diff --git a/src/backend/distributed/sql/udfs/citus_remove_replica_node/13.1-1.sql b/src/backend/distributed/sql/udfs/citus_remove_replica_node/13.1-1.sql new file mode 100644 index 000000000..68b957681 --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_remove_replica_node/13.1-1.sql @@ -0,0 +1,24 @@ +CREATE OR REPLACE FUNCTION pg_catalog.citus_remove_replica_node( + nodename text, + nodeport integer +) +RETURNS VOID +LANGUAGE C VOLATILE STRICT +AS 'MODULE_PATHNAME', $$citus_remove_replica_node$$; + +COMMENT ON FUNCTION pg_catalog.citus_remove_replica_node(text, integer) +IS 'Removes an inactive streaming replica node from Citus metadata. 
Errors if the node is not found, not registered as a replica, or is currently marked active.'; + +REVOKE ALL ON FUNCTION pg_catalog.citus_remove_replica_node(text, integer) FROM PUBLIC; + +CREATE OR REPLACE FUNCTION pg_catalog.citus_remove_replica_node_with_nodeid( + nodeid integer +) +RETURNS VOID +LANGUAGE C VOLATILE STRICT +AS 'MODULE_PATHNAME', $$citus_remove_replica_node_with_nodeid$$; + +COMMENT ON FUNCTION pg_catalog.citus_remove_replica_node_with_nodeid(integer) +IS 'Removes an inactive streaming replica node from Citus metadata using its node ID. Errors if the node is not found, not registered as a replica, or is currently marked active.'; + +REVOKE ALL ON FUNCTION pg_catalog.citus_remove_replica_node_with_nodeid(integer) FROM PUBLIC; From 56f7249704bd232c8335d46c1e3e5c889c18148d Mon Sep 17 00:00:00 2001 From: Muhammad Usama Date: Tue, 1 Jul 2025 23:58:14 +0500 Subject: [PATCH 3/3] Implement snapshot-based node split by promoting a replica This commit introduces the functionality to promote a streaming replica to a primary node and rebalance the shards between the original primary and the newly promoted node. The key changes include: - **New UDF `citus_promote_replica_and_rebalance`**: This function orchestrates the entire process. It blocks writes on the primary, waits for the replica to catch up, promotes the replica, updates its metadata to a primary node, and then rebalances the shards between the two nodes. - **New UDF `get_snapshot_based_node_split_plan`**: This function provides a preview of how the shards will be distributed between the primary and the replica after the promotion and rebalancing, allowing users to inspect the plan before execution. - **Core Logic for Promotion and Rebalancing**: - node_promotion.c: Contains the implementation for the promotion logic, including checking replication lag and calling `pg_promote`. - shard_rebalancer.c: Extended to calculate the shard distribution plan for the two-node split. - shard_transfer.c: Includes logic to adjust shard placements in the metadata after the split. - node_metadata.c: Updated to handle the activation of a replica as a new primary node. This commit enables to execute the happy path for snapshot-based node split in Citus. While the test cases and code cleanup are still remaining, the core functionality is in place and can be tested. 
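As a hedged sketch of the happy path this patch series enables (the node names and the nodeid 5 are hypothetical; the authoritative SQL signatures are in the 13.1-1 UDF files added below):

-- 1. Register a user-provisioned streaming replica of worker-1 (PATCH 1/3) and
--    note the returned nodeid, e.g. 5.
SELECT citus_add_replica_node('worker-1-replica', 5432, 'worker-1', 5432);

-- 2. Optionally preview the shard distribution with
--    get_snapshot_based_node_split_plan (see its 13.1-1.sql for the arguments).

-- 3. Promote the replica and rebalance shards between the old primary and the
--    newly promoted node. The C implementation also reads an optional rebalance
--    strategy name as a second argument (PG_GETARG_NAME_OR_NULL); it is omitted
--    here on the assumption that the SQL definition provides a default.
SELECT citus_promote_replica_and_rebalance(5);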
--- .../distributed/metadata/metadata_cache.c | 2 + .../distributed/metadata/node_metadata.c | 27 +- .../distributed/operations/node_promotion.c | 380 ++++++++++++++++++ .../distributed/operations/shard_rebalancer.c | 366 ++++++++++++++++- .../distributed/operations/shard_transfer.c | 38 ++ .../distributed/sql/citus--13.0-1--13.1-1.sql | 3 +- .../13.1-1.sql | 12 + .../13.1-1.sql | 18 + src/include/distributed/metadata_utility.h | 3 + src/include/distributed/shard_rebalancer.h | 3 + src/include/distributed/shard_transfer.h | 6 + 11 files changed, 852 insertions(+), 6 deletions(-) create mode 100644 src/backend/distributed/operations/node_promotion.c create mode 100644 src/backend/distributed/sql/udfs/citus_promote_replica_and_rebalance/13.1-1.sql create mode 100644 src/backend/distributed/sql/udfs/get_snapshot_based_node_split_plan/13.1-1.sql diff --git a/src/backend/distributed/metadata/metadata_cache.c b/src/backend/distributed/metadata/metadata_cache.c index b9f274495..432eb87fc 100644 --- a/src/backend/distributed/metadata/metadata_cache.c +++ b/src/backend/distributed/metadata/metadata_cache.c @@ -4379,6 +4379,8 @@ InitializeWorkerNodeCache(void) workerNode->isActive = currentNode->isActive; workerNode->nodeRole = currentNode->nodeRole; workerNode->shouldHaveShards = currentNode->shouldHaveShards; + workerNode->nodeprimarynodeid = currentNode->nodeprimarynodeid; + workerNode->nodeisreplica = currentNode->nodeisreplica; strlcpy(workerNode->nodeCluster, currentNode->nodeCluster, NAMEDATALEN); newWorkerNodeArray[workerNodeIndex++] = workerNode; diff --git a/src/backend/distributed/metadata/node_metadata.c b/src/backend/distributed/metadata/node_metadata.c index fa4de2f1d..d85adce1f 100644 --- a/src/backend/distributed/metadata/node_metadata.c +++ b/src/backend/distributed/metadata/node_metadata.c @@ -135,8 +135,6 @@ static void MarkNodesNotSyncedInLoopBackConnection(MetadataSyncContext *context, static void EnsureParentSessionHasExclusiveLockOnPgDistNode(pid_t parentSessionPid); static void SetNodeMetadata(MetadataSyncContext *context, bool localOnly); static void EnsureTransactionalMetadataSyncMode(void); -static void LockShardsInWorkerPlacementList(WorkerNode *workerNode, LOCKMODE - lockMode); static BackgroundWorkerHandle * CheckBackgroundWorkerToObtainLocks(int32 lock_cooldown); static BackgroundWorkerHandle * LockPlacementsWithBackgroundWorkersInPrimaryNode( WorkerNode *workerNode, bool force, int32 lock_cooldown); @@ -1189,6 +1187,27 @@ ActivateNodeList(MetadataSyncContext *context) SetNodeMetadata(context, localOnly); } +/* + * ActivateReplicaNodeAsPrimary sets the given worker node as primary and active + * in the pg_dist_node catalog and make the replica node as first class citizen. + */ +void +ActivateReplicaNodeAsPrimary(WorkerNode *workerNode) +{ + /* + * Set the node as primary and active. 
+ */ + SetWorkerColumnLocalOnly(workerNode, Anum_pg_dist_node_noderole, + ObjectIdGetDatum(PrimaryNodeRoleId())); + SetWorkerColumnLocalOnly(workerNode, Anum_pg_dist_node_isactive, + BoolGetDatum(true)); + SetWorkerColumnLocalOnly(workerNode, Anum_pg_dist_node_nodeisreplica, + BoolGetDatum(false)); + SetWorkerColumnLocalOnly(workerNode, Anum_pg_dist_node_shouldhaveshards, + BoolGetDatum(true)); + SetWorkerColumnLocalOnly(workerNode, Anum_pg_dist_node_nodeprimarynodeid, + Int32GetDatum(0)); +} /* * Acquires shard metadata locks on all shards residing in the given worker node @@ -3710,7 +3729,7 @@ EnsureValidStreamingReplica(WorkerNode *primaryWorkerNode, char* replicaHostname StringInfo sysidQueryResInfo = (StringInfo) linitial(sysidList); char *sysidQueryResStr = sysidQueryResInfo->data; - ereport (NOTICE, (errmsg("system identifier of %s:%d is %s", + ereport (DEBUG2, (errmsg("system identifier of %s:%d is %s", replicaHostname, replicaPort, sysidQueryResStr))); /* We do not need the connection anymore */ @@ -3757,7 +3776,7 @@ EnsureValidStreamingReplica(WorkerNode *primaryWorkerNode, char* replicaHostname StringInfo primarySysidQueryResInfo = (StringInfo) linitial(primarySizeList); char *primarySysidQueryResStr = primarySysidQueryResInfo->data; - ereport (NOTICE, (errmsg("system identifier of %s:%d is %s", + ereport (DEBUG2, (errmsg("system identifier of %s:%d is %s", primaryWorkerNode->workerName, primaryWorkerNode->workerPort, primarySysidQueryResStr))); /* verify both identifiers */ if (strcmp(sysidQueryResStr, primarySysidQueryResStr) != 0) diff --git a/src/backend/distributed/operations/node_promotion.c b/src/backend/distributed/operations/node_promotion.c new file mode 100644 index 000000000..5d9934bde --- /dev/null +++ b/src/backend/distributed/operations/node_promotion.c @@ -0,0 +1,380 @@ +#include "postgres.h" +#include "utils/fmgrprotos.h" +#include "utils/pg_lsn.h" + +#include "distributed/argutils.h" +#include "distributed/remote_commands.h" +#include "distributed/metadata_cache.h" +#include "distributed/metadata_sync.h" +#include "distributed/shard_rebalancer.h" + + +static int64 GetReplicationLag(WorkerNode *primaryWorkerNode, WorkerNode *replicaWorkerNode); +static void BlockAllWritesToWorkerNode(WorkerNode *workerNode); +static bool GetNodeIsInRecoveryStatus(WorkerNode *workerNode); +static void PromoteReplicaNode(WorkerNode *replicaWorkerNode); + + +PG_FUNCTION_INFO_V1(citus_promote_replica_and_rebalance); + +Datum +citus_promote_replica_and_rebalance(PG_FUNCTION_ARGS) +{ + // Ensure superuser and coordinator + EnsureSuperUser(); + EnsureCoordinator(); + + // Get replica_nodeid argument + int32 replicaNodeIdArg = PG_GETARG_INT32(0); + + WorkerNode *replicaNode = NULL; + WorkerNode *primaryNode = NULL; + + // Lock pg_dist_node to prevent concurrent modifications during this operation + LockRelationOid(DistNodeRelationId(), RowExclusiveLock); + + replicaNode = FindNodeAnyClusterByNodeId(replicaNodeIdArg); + if (replicaNode == NULL) + { + ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("Replica node with ID %d not found.", replicaNodeIdArg))); + } + + if (!replicaNode->nodeisreplica || replicaNode->nodeprimarynodeid == 0) + { + ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("Node %s:%d (ID %d) is not a valid replica or its primary node ID is not set.", + replicaNode->workerName, replicaNode->workerPort, replicaNode->nodeId))); + } + + if (replicaNode->isActive) + { + ereport(ERROR, 
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("Replica node %s:%d (ID %d) is already active and cannot be promoted.", + replicaNode->workerName, replicaNode->workerPort, replicaNode->nodeId))); + } + + primaryNode = FindNodeAnyClusterByNodeId(replicaNode->nodeprimarynodeid); + if (primaryNode == NULL) + { + ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("Primary node with ID %d (for replica %s:%d) not found.", + replicaNode->nodeprimarynodeid, replicaNode->workerName, replicaNode->workerPort))); + } + + if (primaryNode->nodeisreplica) + { + ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("Primary node %s:%d (ID %d) is itself a replica.", + primaryNode->workerName, primaryNode->workerPort, primaryNode->nodeId))); + } + + if (!primaryNode->isActive) + { + ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("Primary node %s:%d (ID %d) is not active.", + primaryNode->workerName, primaryNode->workerPort, primaryNode->nodeId))); + } + /* Ensure the primary node is related to the replica node */ + if (primaryNode->nodeId != replicaNode->nodeprimarynodeid) + { + ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("Replica node %s:%d (ID %d) is not replica of the primary node %s:%d (ID %d).", + replicaNode->workerName, replicaNode->workerPort, replicaNode->nodeId, + primaryNode->workerName, primaryNode->workerPort, primaryNode->nodeId))); + } + + ereport(NOTICE, (errmsg("Starting promotion process for replica node %s:%d (ID %d), original primary %s:%d (ID %d)", + replicaNode->workerName, replicaNode->workerPort, replicaNode->nodeId, + primaryNode->workerName, primaryNode->workerPort, primaryNode->nodeId))); + + /* Step 1: Block Writes on Original Primary's Shards */ + ereport(NOTICE, (errmsg("Blocking writes on shards of original primary node %s:%d (group %d)", + primaryNode->workerName, primaryNode->workerPort, primaryNode->groupId))); + + BlockAllWritesToWorkerNode(primaryNode); + + /* Step 2: Wait for Replica to Catch Up */ + ereport(NOTICE, (errmsg("Waiting for replica %s:%d to catch up with primary %s:%d", + replicaNode->workerName, replicaNode->workerPort, + primaryNode->workerName, primaryNode->workerPort))); + + bool caughtUp = false; + const int catchUpTimeoutSeconds = 300; // 5 minutes, TODO: Make GUC + const int sleepIntervalSeconds = 5; + int elapsedTimeSeconds = 0; + + while (elapsedTimeSeconds < catchUpTimeoutSeconds) + { + uint64 repLag = GetReplicationLag(primaryNode, replicaNode); + if (repLag <= 0) + { + caughtUp = true; + break; + } + pg_usleep(sleepIntervalSeconds * 1000000L); + elapsedTimeSeconds += sleepIntervalSeconds; + } + + if (!caughtUp) + { + ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("Replica %s:%d failed to catch up with primary %s:%d within %d seconds.", + replicaNode->workerName, replicaNode->workerPort, + primaryNode->workerName, primaryNode->workerPort, + catchUpTimeoutSeconds))); + } + + ereport(NOTICE, (errmsg("Replica %s:%d is now caught up with primary %s:%d.", + replicaNode->workerName, replicaNode->workerPort, + primaryNode->workerName, primaryNode->workerPort))); + + + + /* Step 3: PostgreSQL Replica Promotion */ + ereport(NOTICE, (errmsg("Attempting to promote replica %s:%d via pg_promote().", + replicaNode->workerName, replicaNode->workerPort))); + + PromoteReplicaNode(replicaNode); + + /* Step 4: Update Replica Metadata in pg_dist_node on Coordinator */ + + ereport(NOTICE, (errmsg("Updating metadata for 
promoted replica %s:%d (ID %d)", + replicaNode->workerName, replicaNode->workerPort, replicaNode->nodeId))); + ActivateReplicaNodeAsPrimary(replicaNode); + + /* We need to sync metadata changes to all nodes before rebalancing shards + * since the rebalancing algorithm depends on the latest metadata. + */ + SyncNodeMetadataToNodes(); + + /* Step 5: Split Shards Between Primary and Replica */ + SplitShardsBetweenPrimaryAndReplica(primaryNode, replicaNode, PG_GETARG_NAME_OR_NULL(1)); + + + TransactionModifiedNodeMetadata = true; // Inform Citus about metadata change + TriggerNodeMetadataSyncOnCommit(); // Ensure changes are propagated + + + + ereport(NOTICE, (errmsg("Replica node %s:%d (ID %d) metadata updated. It is now a primary", + replicaNode->workerName, replicaNode->workerPort, replicaNode->nodeId))); + + + + /* TODO: Step 6: Unblock Writes (should be handled by transaction commit) */ + ereport(NOTICE, (errmsg("TODO: Step 6: Unblock Writes"))); + + PG_RETURN_VOID(); +} + + +/* + * GetReplicationLag calculates the replication lag between the primary and replica nodes. + * It returns the lag in bytes. + */ +static int64 +GetReplicationLag(WorkerNode *primaryWorkerNode, WorkerNode *replicaWorkerNode) +{ + +#if PG_VERSION_NUM >= 100000 + const char *primary_lsn_query = "SELECT pg_current_wal_lsn()"; + const char *replica_lsn_query = "SELECT pg_last_wal_replay_lsn()"; +#else + const char *primary_lsn_query = "SELECT pg_current_xlog_location()"; + const char *replica_lsn_query = "SELECT pg_last_xlog_replay_location()"; +#endif + + int connectionFlag = 0; + MultiConnection *primaryConnection = GetNodeConnection(connectionFlag, + primaryWorkerNode->workerName, + primaryWorkerNode->workerPort); + if (PQstatus(primaryConnection->pgConn) != CONNECTION_OK) + { + ereport(ERROR, (errmsg("cannot connect to %s:%d to fetch replication status", + primaryWorkerNode->workerName, primaryWorkerNode->workerPort))); + } + MultiConnection *replicaConnection = GetNodeConnection(connectionFlag, + replicaWorkerNode->workerName, + replicaWorkerNode->workerPort); + + if (PQstatus(replicaConnection->pgConn) != CONNECTION_OK) + { + ereport(ERROR, (errmsg("cannot connect to %s:%d to fetch replication status", + replicaWorkerNode->workerName, replicaWorkerNode->workerPort))); + } + + int primaryResultCode = SendRemoteCommand(primaryConnection, primary_lsn_query); + if (primaryResultCode == 0) + { + ReportConnectionError(primaryConnection, ERROR); + } + + PGresult *primaryResult = GetRemoteCommandResult(primaryConnection, true); + if (!IsResponseOK(primaryResult)) + { + ReportResultError(primaryConnection, primaryResult, ERROR); + } + + int replicaResultCode = SendRemoteCommand(replicaConnection, replica_lsn_query); + if (replicaResultCode == 0) + { + ReportConnectionError(replicaConnection, ERROR); + } + PGresult *replicaResult = GetRemoteCommandResult(replicaConnection, true); + if (!IsResponseOK(replicaResult)) + { + ReportResultError(replicaConnection, replicaResult, ERROR); + } + + + List *primaryLsnList = ReadFirstColumnAsText(primaryResult); + if (list_length(primaryLsnList) != 1) + { + PQclear(primaryResult); + ClearResults(primaryConnection, true); + CloseConnection(primaryConnection); + + ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("cannot parse get primary LSN result from %s:%d", + primaryWorkerNode->workerName, + primaryWorkerNode->workerPort))); + + } + StringInfo primaryLsnQueryResInfo = (StringInfo) linitial(primaryLsnList); + char *primary_lsn_str = primaryLsnQueryResInfo->data; + + List 
*replicaLsnList = ReadFirstColumnAsText(replicaResult); + if (list_length(replicaLsnList) != 1) + { + PQclear(replicaResult); + ClearResults(replicaConnection, true); + CloseConnection(replicaConnection); + + ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("cannot parse get replica LSN result from %s:%d", + replicaWorkerNode->workerName, + replicaWorkerNode->workerPort))); + + } + StringInfo replicaLsnQueryResInfo = (StringInfo) linitial(replicaLsnList); + char *replica_lsn_str = replicaLsnQueryResInfo->data; + + if (!primary_lsn_str || !replica_lsn_str) + return -1; + + int64 primary_lsn = DatumGetLSN(DirectFunctionCall1(pg_lsn_in, CStringGetDatum(primary_lsn_str))); + int64 replica_lsn = DatumGetLSN(DirectFunctionCall1(pg_lsn_in, CStringGetDatum(replica_lsn_str))); + + int64 lag_bytes = primary_lsn - replica_lsn; + + PQclear(primaryResult); + ForgetResults(primaryConnection); + CloseConnection(primaryConnection); + + PQclear(replicaResult); + ForgetResults(replicaConnection); + CloseConnection(replicaConnection); + + ereport(NOTICE, (errmsg("replication lag between %s:%d and %s:%d is %ld bytes", + primaryWorkerNode->workerName, primaryWorkerNode->workerPort, + replicaWorkerNode->workerName, replicaWorkerNode->workerPort, + lag_bytes))); + return lag_bytes; +} + +static void +PromoteReplicaNode(WorkerNode *replicaWorkerNode) +{ + int connectionFlag = 0; + MultiConnection *replicaConnection = GetNodeConnection(connectionFlag, + replicaWorkerNode->workerName, + replicaWorkerNode->workerPort); + + if (PQstatus(replicaConnection->pgConn) != CONNECTION_OK) + { + ereport(ERROR, (errmsg("cannot connect to %s:%d to promote replica", + replicaWorkerNode->workerName, replicaWorkerNode->workerPort))); + } + + const char *promoteQuery = "SELECT pg_promote(wait := true);"; + int resultCode = SendRemoteCommand(replicaConnection, promoteQuery); + if (resultCode == 0) + { + ReportConnectionError(replicaConnection, ERROR); + } + ForgetResults(replicaConnection); + CloseConnection(replicaConnection); + /* connect again and verify the replica is promoted */ + if ( GetNodeIsInRecoveryStatus(replicaWorkerNode) ) + { + ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("Failed to promote replica %s:%d (ID %d). 
It is still in recovery.",
+						replicaWorkerNode->workerName, replicaWorkerNode->workerPort, replicaWorkerNode->nodeId)));
+	}
+	else
+	{
+		ereport(NOTICE, (errmsg("Replica node %s:%d (ID %d) has been successfully promoted.",
+						replicaWorkerNode->workerName, replicaWorkerNode->workerPort, replicaWorkerNode->nodeId)));
+	}
+}
+
+static void
+BlockAllWritesToWorkerNode(WorkerNode *workerNode)
+{
+	ereport(NOTICE, (errmsg("Blocking all writes to worker node %s:%d (ID %d)",
+							workerNode->workerName, workerNode->workerPort, workerNode->nodeId)));
+
+	LockShardsInWorkerPlacementList(workerNode, AccessExclusiveLock);
+}
+
+bool
+GetNodeIsInRecoveryStatus(WorkerNode *workerNode)
+{
+	int connectionFlag = 0;
+	MultiConnection *nodeConnection = GetNodeConnection(connectionFlag,
+														workerNode->workerName,
+														workerNode->workerPort);
+
+	if (PQstatus(nodeConnection->pgConn) != CONNECTION_OK)
+	{
+		ereport(ERROR, (errmsg("cannot connect to %s:%d to check recovery status",
+							   workerNode->workerName, workerNode->workerPort)));
+	}
+
+	const char *recoveryQuery = "SELECT pg_is_in_recovery();";
+	int resultCode = SendRemoteCommand(nodeConnection, recoveryQuery);
+	if (resultCode == 0)
+	{
+		ReportConnectionError(nodeConnection, ERROR);
+	}
+
+	PGresult *result = GetRemoteCommandResult(nodeConnection, true);
+	if (!IsResponseOK(result))
+	{
+		ReportResultError(nodeConnection, result, ERROR);
+	}
+
+	List *recoveryStatusList = ReadFirstColumnAsText(result);
+	if (list_length(recoveryStatusList) != 1)
+	{
+		PQclear(result);
+		ClearResults(nodeConnection, true);
+		CloseConnection(nodeConnection);
+
+		ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE),
+						errmsg("cannot parse recovery status result from %s:%d",
+							   workerNode->workerName,
+							   workerNode->workerPort)));
+	}
+
+	StringInfo recoveryStatusInfo = (StringInfo) linitial(recoveryStatusList);
+	bool isInRecovery = (strcmp(recoveryStatusInfo->data, "t") == 0) ||
+						(strcmp(recoveryStatusInfo->data, "true") == 0);
+
+	PQclear(result);
+	ForgetResults(nodeConnection);
+	CloseConnection(nodeConnection);
+
+	return isInRecovery;
+}
\ No newline at end of file
diff --git a/src/backend/distributed/operations/shard_rebalancer.c b/src/backend/distributed/operations/shard_rebalancer.c
index 074f1bed0..20dc7299e 100644
--- a/src/backend/distributed/operations/shard_rebalancer.c
+++ b/src/backend/distributed/operations/shard_rebalancer.c
@@ -81,9 +81,27 @@ typedef struct RebalanceOptions
 	Form_pg_dist_rebalance_strategy rebalanceStrategy;
 	const char *operationName;
 	WorkerNode *workerNode;
+	List *involvedWorkerNodeList;
 } RebalanceOptions;
 
+typedef struct SplitPrimaryReplicaShards
+{
+	/*
+	 * primaryShardIdList contains the IDs of the shards that should stay on
+	 * the primary worker node.
+	 */
+	List *primaryShardIdList;
+
+	/*
+	 * replicaShardIdList contains the IDs of the shards that should be served
+	 * from the replica worker node after the split.
+	 */
+	List *replicaShardIdList;
+} SplitPrimaryReplicaShards;
+
+static SplitPrimaryReplicaShards *
+GetPrimaryReplicaSplitRebalanceSteps(RebalanceOptions *options, WorkerNode *replicaNode);
+
 /*
  * RebalanceState is used to keep the internal state of the rebalance
  * algorithm in one place.
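[Editor's note] The node_metadata.c helpers added earlier in this patch (GetReplicationLag, PromoteReplicaNode, GetNodeIsInRecoveryStatus) drive the promotion flow through plain SQL issued over worker connections. For reviewers who want to reproduce those checks by hand, a minimal sketch follows; it is illustrative only, not part of the diff, and the literal LSN is a made-up placeholder.

    -- On the primary: the current write position (same query GetReplicationLag runs).
    SELECT pg_current_wal_lsn();

    -- On the replica: last replayed position and recovery state.
    SELECT pg_last_wal_replay_lsn(), pg_is_in_recovery();

    -- Lag in bytes, computed on the primary from the two LSNs; the C code parses
    -- the pg_lsn strings and subtracts them directly, which is equivalent.
    SELECT pg_wal_lsn_diff(pg_current_wal_lsn(), '0/3000148'::pg_lsn) AS lag_bytes;

    -- Promotion, exactly as PromoteReplicaNode issues it on the replica connection.
    SELECT pg_promote(wait := true);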
@@ -318,6 +336,7 @@ PG_FUNCTION_INFO_V1(pg_dist_rebalance_strategy_enterprise_check);
 PG_FUNCTION_INFO_V1(citus_rebalance_start);
 PG_FUNCTION_INFO_V1(citus_rebalance_stop);
 PG_FUNCTION_INFO_V1(citus_rebalance_wait);
+PG_FUNCTION_INFO_V1(get_snapshot_based_node_split_plan);
 
 bool RunningUnderCitusTestSuite = false;
 int MaxRebalancerLoggedIgnoredMoves = 5;
@@ -517,8 +536,16 @@ GetRebalanceSteps(RebalanceOptions *options)
 		.context = &context,
 	};
 
+	if (options->involvedWorkerNodeList == NULL)
+	{
+		/*
+		 * If the user did not specify a list of worker nodes, we use all the
+		 * active worker nodes.
+		 */
+		options->involvedWorkerNodeList = SortedActiveWorkers();
+	}
+
 	/* sort the lists to make the function more deterministic */
-	List *activeWorkerList = SortedActiveWorkers();
+	List *activeWorkerList = options->involvedWorkerNodeList;
 	int shardAllowedNodeCount = 0;
 	WorkerNode *workerNode = NULL;
 	foreach_declared_ptr(workerNode, activeWorkerList)
@@ -981,6 +1008,7 @@ rebalance_table_shards(PG_FUNCTION_ARGS)
 		.excludedShardArray = PG_GETARG_ARRAYTYPE_P(3),
 		.drainOnly = PG_GETARG_BOOL(5),
 		.rebalanceStrategy = strategy,
+		.involvedWorkerNodeList = NULL,
 		.improvementThreshold = strategy->improvementThreshold,
 	};
 	Oid shardTransferModeOid = PG_GETARG_OID(4);
@@ -3546,6 +3574,342 @@ EnsureShardCostUDF(Oid functionOid)
 	ReleaseSysCache(proctup);
 }
 
+/*
+ * SplitShardsBetweenPrimaryAndReplica decides which of the primary node's shard
+ * placements stay on the primary and which move to its replica, then adjusts the
+ * shard placement metadata accordingly.
+ */
+void
+SplitShardsBetweenPrimaryAndReplica(WorkerNode *primaryNode,
+									WorkerNode *replicaNode,
+									Name strategyName)
+{
+	CheckCitusVersion(ERROR);
+
+	List *relationIdList = NonColocatedDistRelationIdList();
+
+	/* a NULL strategyName falls back to the default rebalance strategy */
+	Form_pg_dist_rebalance_strategy strategy = GetRebalanceStrategy(strategyName);
+
+	RebalanceOptions options = {
+		.relationIdList = relationIdList,
+		.threshold = 0, /* Threshold is not strictly needed for two nodes */
+		.maxShardMoves = -1, /* No limit on moves between these two nodes */
+		.excludedShardArray = construct_empty_array(INT8OID),
+		.drainOnly = false, /* Not a drain operation */
+		.rebalanceStrategy = strategy,
+		.improvementThreshold = 0, /* Consider all beneficial moves */
+		.workerNode = primaryNode /* the primary node is the source node */
+	};
+
+	SplitPrimaryReplicaShards *splitShards =
+		GetPrimaryReplicaSplitRebalanceSteps(&options, replicaNode);
+	AdjustShardsForPrimaryReplicaNodeSplit(primaryNode, replicaNode,
+		splitShards->primaryShardIdList, splitShards->replicaShardIdList);
+}
+
+/*
+ * GetPrimaryReplicaSplitRebalanceSteps decides which shards should remain on
+ * the primary node and which should be served from the replica, and returns
+ * the two shard ID lists wrapped in a SplitPrimaryReplicaShards struct.
+ */ +static SplitPrimaryReplicaShards * +GetPrimaryReplicaSplitRebalanceSteps(RebalanceOptions *options, WorkerNode* replicaNode) +{ + WorkerNode *sourceNode = options->workerNode; + WorkerNode *targetNode = replicaNode; + + /* Initialize rebalance plan functions and context */ + EnsureShardCostUDF(options->rebalanceStrategy->shardCostFunction); + EnsureNodeCapacityUDF(options->rebalanceStrategy->nodeCapacityFunction); + EnsureShardAllowedOnNodeUDF(options->rebalanceStrategy->shardAllowedOnNodeFunction); + + RebalanceContext context; + memset(&context, 0, sizeof(RebalanceContext)); + fmgr_info(options->rebalanceStrategy->shardCostFunction, &context.shardCostUDF); + fmgr_info(options->rebalanceStrategy->nodeCapacityFunction, &context.nodeCapacityUDF); + fmgr_info(options->rebalanceStrategy->shardAllowedOnNodeFunction, + &context.shardAllowedOnNodeUDF); + + RebalancePlanFunctions rebalancePlanFunctions = { + .shardAllowedOnNode = ShardAllowedOnNode, + .nodeCapacity = NodeCapacity, + .shardCost = GetShardCost, + .context = &context, + }; + + /* + * Collect all active shard placements on the source node for the given relations. + * Unlike the main rebalancer, we build a single list of all relevant source placements + * across all specified relations (or all relations if none specified). + */ + List *allSourcePlacements = NIL; + Oid relationIdItr = InvalidOid; + foreach_declared_oid(relationIdItr, options->relationIdList) + { + List *shardPlacementList = FullShardPlacementList(relationIdItr, + options->excludedShardArray); + List *activeShardPlacementsForRelation = + FilterShardPlacementList(shardPlacementList, IsActiveShardPlacement); + + ShardPlacement *placement = NULL; + foreach_declared_ptr(placement, activeShardPlacementsForRelation) + { + if (placement->nodeId == sourceNode->nodeId) + { + /* Ensure we don't add duplicate shardId if it's somehow listed under multiple relations */ + bool alreadyAdded = false; + ShardPlacement *existingPlacement = NULL; + foreach_declared_ptr(existingPlacement, allSourcePlacements) + { + if (existingPlacement->shardId == placement->shardId) + { + alreadyAdded = true; + break; + } + } + if (!alreadyAdded) + { + allSourcePlacements = lappend(allSourcePlacements, placement); + } + } + } + } + + List *activeWorkerList = list_make2(options->workerNode, replicaNode); + SplitPrimaryReplicaShards *splitShards = palloc0(sizeof(SplitPrimaryReplicaShards)); + splitShards->primaryShardIdList = NIL; + splitShards->replicaShardIdList = NIL; + + if (list_length(allSourcePlacements) > 0) + { + /* + * Initialize RebalanceState considering only the source node's shards + * and the two active workers (source and target). + */ + RebalanceState *state = InitRebalanceState(activeWorkerList, allSourcePlacements, &rebalancePlanFunctions); + + NodeFillState *sourceFillState = NULL; + NodeFillState *targetFillState = NULL; + ListCell *fsc = NULL; + + /* Identify the fill states for our specific source and target nodes */ + foreach(fsc, state->fillStateListAsc) /* Could be fillStateListDesc too, order doesn't matter here */ + { + NodeFillState *fs = (NodeFillState *) lfirst(fsc); + if (fs->node->nodeId == sourceNode->nodeId) + { + sourceFillState = fs; + } + else if (fs->node->nodeId == targetNode->nodeId) + { + targetFillState = fs; + } + } + + if (sourceFillState != NULL && targetFillState != NULL) + { + /* + * The goal is to move roughly half the total cost from source to target. 
+			 * The target node is assumed to start empty for this plan: we only
+			 * balance the cost of the shards that currently live on the source
+			 * node and ignore any other load on the target.
+			 */
+
+			/*
+			 * Simulate a two-node balancing pass: all shards start on
+			 * sourceFillState, and shards are moved to the target until the two
+			 * simulated costs are as close as possible.
+			 */
+			float4 sourceCurrentCost = sourceFillState->totalCost;
+			float4 targetCurrentCost = 0; /* cost moved to the target so far */
+
+			/* Sort the source node's shards by cost, descending (greedy heuristic). */
+			sourceFillState->shardCostListDesc = SortList(sourceFillState->shardCostListDesc,
+														  CompareShardCostDesc);
+
+			List *potentialMoves = NIL;
+			ListCell *lc_shardcost = NULL;
+
+			/*
+			 * Walk the shards in descending cost order and greedily decide, for
+			 * each one, whether moving it to the target improves the balance.
+			 */
+			foreach(lc_shardcost, sourceFillState->shardCostListDesc)
+			{
+				ShardCost *shardToConsider = (ShardCost *) lfirst(lc_shardcost);
+
+				/*
+				 * TODO: honor the strategy's shardAllowedOnNode() restriction here;
+				 * for now every shard is considered movable to the target node.
+				 */
+
+				/*
+				 * Move the shard only if doing so reduces the absolute cost
+				 * difference between the two nodes, i.e. if
+				 * |(source - cost) - (target + cost)| < |source - target|.
+ */ + float4 costOfShard = shardToConsider->cost; + float4 diffBefore = fabsf(sourceCurrentCost - targetCurrentCost); + float4 diffAfter = fabsf((sourceCurrentCost - costOfShard) - (targetCurrentCost + costOfShard)); + + if (diffAfter < diffBefore) + { + PlacementUpdateEvent *update = palloc0(sizeof(PlacementUpdateEvent)); + update->shardId = shardToConsider->shardId; + update->sourceNode = sourceNode; + update->targetNode = targetNode; + update->updateType = PLACEMENT_UPDATE_MOVE; + potentialMoves = lappend(potentialMoves, update); + splitShards->replicaShardIdList = lappend_int(splitShards->replicaShardIdList, shardToConsider->shardId); + + + /* Update simulated costs for the next iteration */ + sourceCurrentCost -= costOfShard; + targetCurrentCost += costOfShard; + } + else + { + splitShards->primaryShardIdList = lappend_int(splitShards->primaryShardIdList, shardToConsider->shardId); + } + } + } + /* RebalanceState is in memory context, will be cleaned up */ + } + return splitShards; +} + +/* + * Snapshot-based node split plan outputs the shard placement plan + * for primary and replica based node split + * + * SQL signature: + * get_snapshot_based_node_split_plan( + * primary_node_name text, + * primary_node_port integer, + * replica_node_name text, + * replica_node_port integer, + * rebalance_strategy name DEFAULT NULL + * + */ +Datum +get_snapshot_based_node_split_plan(PG_FUNCTION_ARGS) +{ + CheckCitusVersion(ERROR); + + text *primaryNodeNameText = PG_GETARG_TEXT_P(0); + int32 primaryNodePort = PG_GETARG_INT32(1); + text *replicaNodeNameText = PG_GETARG_TEXT_P(2); + int32 replicaNodePort = PG_GETARG_INT32(3); + + char *primaryNodeName = text_to_cstring(primaryNodeNameText); + char *replicaNodeName = text_to_cstring(replicaNodeNameText); + + WorkerNode *primaryNode = FindWorkerNodeOrError(primaryNodeName, primaryNodePort); + WorkerNode *replicaNode = FindWorkerNodeOrError(replicaNodeName, replicaNodePort); + + if (!replicaNode->nodeisreplica || replicaNode->nodeprimarynodeid == 0) + { + ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("Node %s:%d (ID %d) is not a valid replica or its primary node ID is not set.", + replicaNode->workerName, replicaNode->workerPort, replicaNode->nodeId))); + } + if (primaryNode->nodeisreplica) + { + ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("Primary node %s:%d (ID %d) is itself a replica.", + primaryNode->workerName, primaryNode->workerPort, primaryNode->nodeId))); + } + /* Ensure the primary node is related to the replica node */ + if (primaryNode->nodeId != replicaNode->nodeprimarynodeid) + { + ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("Replica node %s:%d (ID %d) is not replica of the primary node %s:%d (ID %d).", + replicaNode->workerName, replicaNode->workerPort, replicaNode->nodeId, + primaryNode->workerName, primaryNode->workerPort, primaryNode->nodeId))); + } + + List *relationIdList = NIL; + relationIdList = NonColocatedDistRelationIdList(); + + Form_pg_dist_rebalance_strategy strategy = GetRebalanceStrategy( + PG_GETARG_NAME_OR_NULL(4)); + + RebalanceOptions options = { + .relationIdList = relationIdList, + .threshold = 0, /* Threshold is not strictly needed for two nodes */ + .maxShardMoves = -1, /* No limit on moves between these two nodes */ + .excludedShardArray = construct_empty_array(INT8OID), + .drainOnly = false, /* Not a drain operation */ + .rebalanceStrategy = strategy, + .improvementThreshold = 0, /* Consider all beneficial moves */ + 
.workerNode = primaryNode /* indicate Primary node as a source node */ + }; + + SplitPrimaryReplicaShards *splitShards = NULL; + splitShards = GetPrimaryReplicaSplitRebalanceSteps(&options, replicaNode); + + if (splitShards == NULL) + { + ereport(ERROR, (errmsg("No shards to split between primary and replica nodes."))); + } + + int shardId = 0; + TupleDesc tupdesc; + Tuplestorestate *tupstore = SetupTuplestore(fcinfo, &tupdesc); + Datum values[4]; + bool nulls[4]; + + + foreach_declared_int(shardId, splitShards->primaryShardIdList) + { + ShardInterval *shardInterval = LoadShardInterval(shardId); + List *colocatedShardList = ColocatedShardIntervalList(shardInterval); + ListCell *colocatedShardCell = NULL; + foreach(colocatedShardCell, colocatedShardList) + { + ShardInterval *colocatedShard = lfirst(colocatedShardCell); + int colocatedShardId = colocatedShard->shardId; + memset(values, 0, sizeof(values)); + memset(nulls, 0, sizeof(nulls)); + + values[0] = ObjectIdGetDatum(RelationIdForShard(colocatedShardId)); + values[1] = UInt64GetDatum(colocatedShardId); + values[2] = UInt64GetDatum(ShardLength(colocatedShardId)); + values[3] = PointerGetDatum(cstring_to_text("Primary Node")); + tuplestore_putvalues(tupstore, tupdesc, values, nulls); + } + } + + foreach_declared_int(shardId, splitShards->replicaShardIdList) + { + ShardInterval *shardInterval = LoadShardInterval(shardId); + List *colocatedShardList = ColocatedShardIntervalList(shardInterval); + ListCell *colocatedShardCell = NULL; + foreach(colocatedShardCell, colocatedShardList) + { + ShardInterval *colocatedShard = lfirst(colocatedShardCell); + int colocatedShardId = colocatedShard->shardId; + memset(values, 0, sizeof(values)); + memset(nulls, 0, sizeof(nulls)); + + values[0] = ObjectIdGetDatum(RelationIdForShard(colocatedShardId)); + values[1] = UInt64GetDatum(colocatedShardId); + values[2] = UInt64GetDatum(ShardLength(colocatedShardId)); + values[3] = PointerGetDatum(cstring_to_text("Replica Node")); + tuplestore_putvalues(tupstore, tupdesc, values, nulls); + } + } + + return (Datum) 0; +} /* * EnsureNodeCapacityUDF checks that the UDF matching the oid has the correct diff --git a/src/backend/distributed/operations/shard_transfer.c b/src/backend/distributed/operations/shard_transfer.c index b7d07b2cf..3922b862e 100644 --- a/src/backend/distributed/operations/shard_transfer.c +++ b/src/backend/distributed/operations/shard_transfer.c @@ -573,6 +573,44 @@ TransferShards(int64 shardId, char *sourceNodeName, FinalizeCurrentProgressMonitor(); } +/* + * AdjustShardsForPrimaryReplicaNodeSplit is called when a primary-replica node split + * occurs. It adjusts the shard placements such that the shards that should be on the + * primary node are removed from the replica node, and vice versa. + * + * This function does not move any data; it only updates the shard placement metadata. + */ +void +AdjustShardsForPrimaryReplicaNodeSplit(WorkerNode *primaryNode, + WorkerNode *replicaNode, + List* primaryShardList, + List* replicaShardList) +{ + int shardId = 0; + /* + * Remove all shards from the replica that should reside on the primary node, + * and update the shard placement metadata for shards that will now be served + * from the replica node. No data movement is required; we only need to drop + * the relevant shards from the replica and primary nodes and update the + * corresponding shard placement metadata. 
+ */ + foreach_declared_int(shardId, primaryShardList) + { + ShardInterval *shardInterval = LoadShardInterval(shardId); + List *colocatedShardList = ColocatedShardIntervalList(shardInterval); + /* TODO: Drops shard table here */ + } + /* Now drop all shards from primary that need to be on the replica node */ + foreach_declared_int(shardId, replicaShardList) + { + ShardInterval *shardInterval = LoadShardInterval(shardId); + List *colocatedShardList = ColocatedShardIntervalList(shardInterval); + UpdateColocatedShardPlacementMetadataOnWorkers(shardId, + primaryNode->workerName, primaryNode->workerPort, + replicaNode->workerName, replicaNode->workerPort); + /* TODO: Drop the not required table on primary here */ + } +} /* * Insert deferred cleanup records. diff --git a/src/backend/distributed/sql/citus--13.0-1--13.1-1.sql b/src/backend/distributed/sql/citus--13.0-1--13.1-1.sql index 76ae1f596..009dc2dd2 100644 --- a/src/backend/distributed/sql/citus--13.0-1--13.1-1.sql +++ b/src/backend/distributed/sql/citus--13.0-1--13.1-1.sql @@ -55,7 +55,8 @@ DROP VIEW IF EXISTS pg_catalog.citus_lock_waits; #include "cat_upgrades/add_replica_info_to_pg_dist_node.sql" #include "udfs/citus_add_replica_node/13.1-1.sql" #include "udfs/citus_remove_replica_node/13.1-1.sql" - +#include "udfs/citus_promote_replica_and_rebalance/13.1-1.sql" +#include "udfs/get_snapshot_based_node_split_plan/13.1-1.sql" -- Since shard_name/13.1-1.sql first drops the function and then creates it, we first -- need to drop citus_shards view since that view depends on this function. And immediately -- after creating the function, we recreate citus_shards view again. diff --git a/src/backend/distributed/sql/udfs/citus_promote_replica_and_rebalance/13.1-1.sql b/src/backend/distributed/sql/udfs/citus_promote_replica_and_rebalance/13.1-1.sql new file mode 100644 index 000000000..274e1f727 --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_promote_replica_and_rebalance/13.1-1.sql @@ -0,0 +1,12 @@ +CREATE OR REPLACE FUNCTION pg_catalog.citus_promote_replica_and_rebalance( + replica_nodeid integer, + rebalance_strategy name DEFAULT NULL +) +RETURNS VOID +AS 'MODULE_PATHNAME' +LANGUAGE C VOLATILE; + +COMMENT ON FUNCTION pg_catalog.citus_promote_replica_and_rebalance(integer, name) IS +'Promotes a registered replica node to a primary, performs necessary metadata updates, and rebalances a portion of shards from its original primary to the newly promoted node.'; + +REVOKE ALL ON FUNCTION pg_catalog.citus_promote_replica_and_rebalance(integer, name) FROM PUBLIC; diff --git a/src/backend/distributed/sql/udfs/get_snapshot_based_node_split_plan/13.1-1.sql b/src/backend/distributed/sql/udfs/get_snapshot_based_node_split_plan/13.1-1.sql new file mode 100644 index 000000000..f2d294315 --- /dev/null +++ b/src/backend/distributed/sql/udfs/get_snapshot_based_node_split_plan/13.1-1.sql @@ -0,0 +1,18 @@ +CREATE OR REPLACE FUNCTION pg_catalog.get_snapshot_based_node_split_plan( + primary_node_name text, + primary_node_port integer, + replica_node_name text, + replica_node_port integer, + rebalance_strategy name DEFAULT NULL + ) + RETURNS TABLE (table_name regclass, + shardid bigint, + shard_size bigint, + placement_node text) + AS 'MODULE_PATHNAME' + LANGUAGE C VOLATILE; + +COMMENT ON FUNCTION pg_catalog.get_snapshot_based_node_split_plan(text, int, text, int, name) + IS 'shows the shard placements to balance shards between primary and replica worker nodes'; + +REVOKE ALL ON FUNCTION pg_catalog.get_snapshot_based_node_split_plan(text, int, text, int, 
name) FROM PUBLIC; diff --git a/src/include/distributed/metadata_utility.h b/src/include/distributed/metadata_utility.h index 38c13eb51..fd146b576 100644 --- a/src/include/distributed/metadata_utility.h +++ b/src/include/distributed/metadata_utility.h @@ -466,4 +466,7 @@ extern bool IsBackgroundJobStatusTerminal(BackgroundJobStatus status); extern bool IsBackgroundTaskStatusTerminal(BackgroundTaskStatus status); extern Oid BackgroundJobStatusOid(BackgroundJobStatus status); extern Oid BackgroundTaskStatusOid(BackgroundTaskStatus status); +/* from node_metadata.c */ +extern void LockShardsInWorkerPlacementList(WorkerNode *workerNode, LOCKMODE lockMode); +extern void ActivateReplicaNodeAsPrimary(WorkerNode *workerNode); #endif /* METADATA_UTILITY_H */ diff --git a/src/include/distributed/shard_rebalancer.h b/src/include/distributed/shard_rebalancer.h index 79414eb3c..8ea5fb1d0 100644 --- a/src/include/distributed/shard_rebalancer.h +++ b/src/include/distributed/shard_rebalancer.h @@ -222,4 +222,7 @@ extern void SetupRebalanceMonitor(List *placementUpdateList, uint64 initialProgressState, PlacementUpdateStatus initialStatus); +extern void SplitShardsBetweenPrimaryAndReplica(WorkerNode *primaryNode, + WorkerNode *replicaNode, + Name strategyName); #endif /* SHARD_REBALANCER_H */ diff --git a/src/include/distributed/shard_transfer.h b/src/include/distributed/shard_transfer.h index c1621879b..0d7b641a9 100644 --- a/src/include/distributed/shard_transfer.h +++ b/src/include/distributed/shard_transfer.h @@ -41,3 +41,9 @@ extern void UpdatePlacementUpdateStatusForShardIntervalList(List *shardIntervalL extern void InsertDeferredDropCleanupRecordsForShards(List *shardIntervalList); extern void InsertCleanupRecordsForShardPlacementsOnNode(List *shardIntervalList, int32 groupId); + +extern void +AdjustShardsForPrimaryReplicaNodeSplit(WorkerNode *primaryNode, + WorkerNode *replicaNode, + List* primaryShardList, + List* replicaShardList); \ No newline at end of file
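
[Editor's note] Taken together, the SQL-level entry points touched by this patch are intended to be used roughly as follows. This is an illustrative sketch only: the worker host names, ports, and the nodeid lookup are assumptions, not values produced by the patch.

    -- 1. Register a user-provisioned streaming replica of an existing worker.
    SELECT citus_add_replica_node('worker-1-replica', 5432, 'worker-1', 5432);

    -- 2. Preview how shards would be divided between the primary and its replica.
    SELECT * FROM get_snapshot_based_node_split_plan('worker-1', 5432,
                                                     'worker-1-replica', 5432);

    -- 3. Promote the replica and split the shards, using the replica's nodeid.
    SELECT citus_promote_replica_and_rebalance(
               (SELECT nodeid FROM pg_dist_node
                 WHERE nodename = 'worker-1-replica' AND nodeport = 5432));

get_snapshot_based_node_split_plan only reports the planned placement of each colocated shard ('Primary Node' or 'Replica Node'); citus_promote_replica_and_rebalance performs the promotion, the metadata updates, and the actual split.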