diff --git a/src/backend/distributed/metadata/metadata_cache.c b/src/backend/distributed/metadata/metadata_cache.c index 79cc61092..432eb87fc 100644 --- a/src/backend/distributed/metadata/metadata_cache.c +++ b/src/backend/distributed/metadata/metadata_cache.c @@ -3549,6 +3549,18 @@ SecondaryNodeRoleId(void) return MetadataCache.secondaryNodeRoleId; } +/* return the Oid of the 'unavailable' nodeRole enum value */ +Oid +UnavailableNodeRoleId(void) +{ + if (!MetadataCache.unavailableNodeRoleId) + { + MetadataCache.unavailableNodeRoleId = LookupStringEnumValueId("noderole", + "unavailable"); + } + + return MetadataCache.unavailableNodeRoleId; +} Oid CitusJobStatusScheduledId(void) @@ -4367,6 +4379,8 @@ InitializeWorkerNodeCache(void) workerNode->isActive = currentNode->isActive; workerNode->nodeRole = currentNode->nodeRole; workerNode->shouldHaveShards = currentNode->shouldHaveShards; + workerNode->nodeprimarynodeid = currentNode->nodeprimarynodeid; + workerNode->nodeisreplica = currentNode->nodeisreplica; strlcpy(workerNode->nodeCluster, currentNode->nodeCluster, NAMEDATALEN); newWorkerNodeArray[workerNodeIndex++] = workerNode; diff --git a/src/backend/distributed/metadata/metadata_sync.c b/src/backend/distributed/metadata/metadata_sync.c index f73856169..215516c5a 100644 --- a/src/backend/distributed/metadata/metadata_sync.c +++ b/src/backend/distributed/metadata/metadata_sync.c @@ -814,7 +814,7 @@ NodeListInsertCommand(List *workerNodeList) appendStringInfo(nodeListInsertCommand, "INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, " "noderack, hasmetadata, metadatasynced, isactive, noderole, " - "nodecluster, shouldhaveshards) VALUES "); + "nodecluster, shouldhaveshards, nodeisreplica, nodeprimarynodeid) VALUES "); /* iterate over the worker nodes, add the values */ WorkerNode *workerNode = NULL; @@ -824,13 +824,14 @@ NodeListInsertCommand(List *workerNodeList) char *metadataSyncedString = workerNode->metadataSynced ? "TRUE" : "FALSE"; char *isActiveString = workerNode->isActive ? "TRUE" : "FALSE"; char *shouldHaveShards = workerNode->shouldHaveShards ? "TRUE" : "FALSE"; + char *nodeisreplicaString = workerNode->nodeisreplica ?
"TRUE" : "FALSE"; Datum nodeRoleOidDatum = ObjectIdGetDatum(workerNode->nodeRole); Datum nodeRoleStringDatum = DirectFunctionCall1(enum_out, nodeRoleOidDatum); char *nodeRoleString = DatumGetCString(nodeRoleStringDatum); appendStringInfo(nodeListInsertCommand, - "(%d, %d, %s, %d, %s, %s, %s, %s, '%s'::noderole, %s, %s)", + "(%d, %d, %s, %d, %s, %s, %s, %s, '%s'::noderole, %s, %s, %s, %d)", workerNode->nodeId, workerNode->groupId, quote_literal_cstr(workerNode->workerName), @@ -841,7 +842,9 @@ NodeListInsertCommand(List *workerNodeList) isActiveString, nodeRoleString, quote_literal_cstr(workerNode->nodeCluster), - shouldHaveShards); + shouldHaveShards, + nodeisreplicaString, + workerNode->nodeprimarynodeid); processedWorkerNodeCount++; if (processedWorkerNodeCount != workerCount) @@ -875,9 +878,11 @@ NodeListIdempotentInsertCommand(List *workerNodeList) "hasmetadata = EXCLUDED.hasmetadata, " "isactive = EXCLUDED.isactive, " "noderole = EXCLUDED.noderole, " - "nodecluster = EXCLUDED.nodecluster ," + "nodecluster = EXCLUDED.nodecluster, " "metadatasynced = EXCLUDED.metadatasynced, " - "shouldhaveshards = EXCLUDED.shouldhaveshards"; + "shouldhaveshards = EXCLUDED.shouldhaveshards, " + "nodeisreplica = EXCLUDED.nodeisreplica, " + "nodeprimarynodeid = EXCLUDED.nodeprimarynodeid"; appendStringInfoString(nodeInsertIdempotentCommand, onConflictStr); return nodeInsertIdempotentCommand->data; } diff --git a/src/backend/distributed/metadata/node_metadata.c b/src/backend/distributed/metadata/node_metadata.c index c2a2abd60..d85adce1f 100644 --- a/src/backend/distributed/metadata/node_metadata.c +++ b/src/backend/distributed/metadata/node_metadata.c @@ -84,6 +84,8 @@ typedef struct NodeMetadata bool isActive; Oid nodeRole; bool shouldHaveShards; + uint32 nodeprimarynodeid; + bool nodeisreplica; char *nodeCluster; } NodeMetadata; @@ -120,7 +122,6 @@ static char * NodeMetadataSyncedUpdateCommand(uint32 nodeId, bool metadataSynced static void ErrorIfCoordinatorMetadataSetFalse(WorkerNode *workerNode, Datum value, char *field); static WorkerNode * SetShouldHaveShards(WorkerNode *workerNode, bool shouldHaveShards); -static WorkerNode * FindNodeAnyClusterByNodeId(uint32 nodeId); static void ErrorIfAnyNodeNotExist(List *nodeList); static void UpdateLocalGroupIdsViaMetadataContext(MetadataSyncContext *context); static void SendDeletionCommandsForReplicatedTablePlacements( @@ -134,12 +135,16 @@ static void MarkNodesNotSyncedInLoopBackConnection(MetadataSyncContext *context, static void EnsureParentSessionHasExclusiveLockOnPgDistNode(pid_t parentSessionPid); static void SetNodeMetadata(MetadataSyncContext *context, bool localOnly); static void EnsureTransactionalMetadataSyncMode(void); -static void LockShardsInWorkerPlacementList(WorkerNode *workerNode, LOCKMODE - lockMode); static BackgroundWorkerHandle * CheckBackgroundWorkerToObtainLocks(int32 lock_cooldown); static BackgroundWorkerHandle * LockPlacementsWithBackgroundWorkersInPrimaryNode( WorkerNode *workerNode, bool force, int32 lock_cooldown); +static void EnsureValidStreamingReplica(WorkerNode *primaryWorkerNode, + char* replicaHostname, int replicaPort); +static int32 CitusAddReplicaNode(WorkerNode *primaryWorkerNode, + char *replicaHostname, int32 replicaPort); +static void RemoveReplicaNode(WorkerNode *replicaNode); + /* Function definitions go here */ /* declarations for dynamic loading */ @@ -168,6 +173,10 @@ PG_FUNCTION_INFO_V1(citus_coordinator_nodeid); PG_FUNCTION_INFO_V1(citus_is_coordinator); 
PG_FUNCTION_INFO_V1(citus_internal_mark_node_not_synced); PG_FUNCTION_INFO_V1(citus_is_primary_node); +PG_FUNCTION_INFO_V1(citus_add_replica_node); +PG_FUNCTION_INFO_V1(citus_add_replica_node_with_nodeid); +PG_FUNCTION_INFO_V1(citus_remove_replica_node); +PG_FUNCTION_INFO_V1(citus_remove_replica_node_with_nodeid); /* * DefaultNodeMetadata creates a NodeMetadata struct with the fields set to @@ -183,6 +192,8 @@ DefaultNodeMetadata() nodeMetadata.nodeRack = WORKER_DEFAULT_RACK; nodeMetadata.shouldHaveShards = true; nodeMetadata.groupId = INVALID_GROUP_ID; + nodeMetadata.nodeisreplica = false; + nodeMetadata.nodeprimarynodeid = 0; /* 0 (invalid node id) means the node has no primary */ return nodeMetadata; } @@ -1176,6 +1187,27 @@ ActivateNodeList(MetadataSyncContext *context) SetNodeMetadata(context, localOnly); } +/* + * ActivateReplicaNodeAsPrimary sets the given worker node as primary and active + * in the pg_dist_node catalog, turning the former replica into a regular, + * first-class node. + */ +void +ActivateReplicaNodeAsPrimary(WorkerNode *workerNode) +{ + /* + * Set the node as primary and active. + */ + SetWorkerColumnLocalOnly(workerNode, Anum_pg_dist_node_noderole, + ObjectIdGetDatum(PrimaryNodeRoleId())); + SetWorkerColumnLocalOnly(workerNode, Anum_pg_dist_node_isactive, + BoolGetDatum(true)); + SetWorkerColumnLocalOnly(workerNode, Anum_pg_dist_node_nodeisreplica, + BoolGetDatum(false)); + SetWorkerColumnLocalOnly(workerNode, Anum_pg_dist_node_shouldhaveshards, + BoolGetDatum(true)); + SetWorkerColumnLocalOnly(workerNode, Anum_pg_dist_node_nodeprimarynodeid, + Int32GetDatum(0)); +} /* * Acquires shard metadata locks on all shards residing in the given worker node @@ -1421,6 +1453,287 @@ master_update_node(PG_FUNCTION_ARGS) return citus_update_node(fcinfo); } +/* + * citus_add_replica_node adds a new node as a replica of an existing primary node. + */ +Datum +citus_add_replica_node(PG_FUNCTION_ARGS) +{ + CheckCitusVersion(ERROR); + EnsureSuperUser(); + EnsureCoordinator(); + + text *replicaHostnameText = PG_GETARG_TEXT_P(0); + int32 replicaPort = PG_GETARG_INT32(1); + text *primaryHostnameText = PG_GETARG_TEXT_P(2); + int32 primaryPort = PG_GETARG_INT32(3); + + char *replicaHostname = text_to_cstring(replicaHostnameText); + char *primaryHostname = text_to_cstring(primaryHostnameText); + + WorkerNode *primaryWorkerNode = FindWorkerNodeAnyCluster(primaryHostname, primaryPort); + + if (primaryWorkerNode == NULL) + { + ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("primary node %s:%d not found in pg_dist_node", + primaryHostname, primaryPort))); + } + + int32 replicaNodeId = CitusAddReplicaNode(primaryWorkerNode, replicaHostname, replicaPort); + + PG_RETURN_INT32(replicaNodeId); +}
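A minimal usage sketch of the UDF above (host names and ports are hypothetical; the node at 10.0.0.2:5433 must already be streaming from 10.0.0.1:5432):

    SELECT citus_add_replica_node('10.0.0.2', 5433, '10.0.0.1', 5432);
    -- the replica is registered inactive; inspect it in the catalog:
    SELECT nodeid, nodename, nodeport, isactive, noderole, nodeisreplica, nodeprimarynodeid
    FROM pg_dist_node WHERE nodeisreplica;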
+/* + * citus_add_replica_node_with_nodeid adds a new node as a replica of an existing primary + * node, identified by its node ID. It records the replica's hostname and port and links + * them to the primary node's ID. + * + * This function is useful when you already know the primary node's ID and want to add a + * replica without looking the primary up by hostname and port. + */ +Datum +citus_add_replica_node_with_nodeid(PG_FUNCTION_ARGS) +{ + CheckCitusVersion(ERROR); + EnsureSuperUser(); + EnsureCoordinator(); + + text *replicaHostnameText = PG_GETARG_TEXT_P(0); + int32 replicaPort = PG_GETARG_INT32(1); + int32 primaryNodeId = PG_GETARG_INT32(2); + + char *replicaHostname = text_to_cstring(replicaHostnameText); + + bool missingOk = false; + WorkerNode *primaryWorkerNode = FindNodeWithNodeId(primaryNodeId, missingOk); + + if (primaryWorkerNode == NULL) + { + ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("primary node with ID %d does not exist", primaryNodeId))); + } + + int32 replicaNodeId = CitusAddReplicaNode(primaryWorkerNode, replicaHostname, replicaPort); + + PG_RETURN_INT32(replicaNodeId); +} + +/* + * CitusAddReplicaNode adds a new node as a replica of an existing primary node. + * It records the replica's hostname and port and links them to the primary node's ID. + * The replica is initially marked inactive and does not own any shards. + */ +static int32 +CitusAddReplicaNode(WorkerNode *primaryWorkerNode, + char *replicaHostname, int32 replicaPort) +{ + Assert(primaryWorkerNode != NULL); + + /* Future-proofing: Ideally, a primary node should not itself be a replica. + * This check might be more relevant once replica promotion logic exists. + * For now, pg_dist_node.nodeisreplica defaults to false for existing nodes. + */ + if (primaryWorkerNode->nodeisreplica) + { + ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("primary node %s:%d is itself a replica and cannot have replicas", + primaryWorkerNode->workerName, primaryWorkerNode->workerPort))); + } + + if (!primaryWorkerNode->shouldHaveShards) + { + ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("primary node %s:%d does not have shards; a node without shards cannot have replicas", + primaryWorkerNode->workerName, primaryWorkerNode->workerPort))); + } + + WorkerNode *existingReplicaNode = FindWorkerNodeAnyCluster(replicaHostname, replicaPort); + if (existingReplicaNode != NULL) + { + /* + * Idempotency check: if the node already exists, is it already correctly + * registered as a replica for THIS primary?
+ */ + if (existingReplicaNode->nodeisreplica && + existingReplicaNode->nodeprimarynodeid == primaryWorkerNode->nodeId) + { + ereport(NOTICE, (errmsg("node %s:%d is already registered as a replica for primary %s:%d (nodeid %d)", + replicaHostname, replicaPort, + primaryWorkerNode->workerName, primaryWorkerNode->workerPort, primaryWorkerNode->nodeId))); + return existingReplicaNode->nodeId; + } + else + { + ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("node %s:%d (nodeid %d) already exists and is not a replica of this primary", + replicaHostname, replicaPort, existingReplicaNode->nodeId))); + } + } + + EnsureValidStreamingReplica(primaryWorkerNode, replicaHostname, replicaPort); + + NodeMetadata nodeMetadata = DefaultNodeMetadata(); + + nodeMetadata.nodeisreplica = true; + nodeMetadata.nodeprimarynodeid = primaryWorkerNode->nodeId; + nodeMetadata.isActive = false; /* Replicas start as inactive */ + nodeMetadata.shouldHaveShards = false; /* Replicas do not directly own primary shards */ + nodeMetadata.groupId = INVALID_GROUP_ID; /* Replicas get a new group ID and do not belong to any existing group */ + nodeMetadata.nodeRole = UnavailableNodeRoleId(); /* The node role is set to 'unavailable' */ + nodeMetadata.nodeCluster = primaryWorkerNode->nodeCluster; /* Same cluster as primary */ + + /* + * hasMetadata and metadataSynced keep their DefaultNodeMetadata values; + * AddNodeMetadata and ActivateNodeList adjust them where needed. Since the + * replica starts inactive, its metadata sync status is not critical until + * activation. + */ + + bool nodeAlreadyExists = false; + bool localOnly = false; /* Propagate change to other workers with metadata */ + + /* + * AddNodeMetadata will take an ExclusiveLock on pg_dist_node. + * It also checks again if the node already exists after acquiring the lock. + */ + int replicaNodeId = AddNodeMetadata(replicaHostname, replicaPort, &nodeMetadata, + &nodeAlreadyExists, localOnly); + + if (nodeAlreadyExists) + { + /* + * AddNodeMetadata re-checks existence after acquiring its lock and returns + * the existing node's ID if it finds one; we still need to verify that it + * is the *correct* replica. + */ + WorkerNode *fetchedExistingNode = FindNodeAnyClusterByNodeId(replicaNodeId); + if (fetchedExistingNode != NULL && fetchedExistingNode->nodeisreplica && + fetchedExistingNode->nodeprimarynodeid == primaryWorkerNode->nodeId) + { + ereport(NOTICE, (errmsg("node %s:%d was already correctly registered as a replica for primary %s:%d (nodeid %d)", + replicaHostname, replicaPort, + primaryWorkerNode->workerName, primaryWorkerNode->workerPort, primaryWorkerNode->nodeId))); + /* Intentional fall-through to return replicaNodeId */ + } + else + { + /* Unexpected: the node exists but is not configured as a replica of this primary.
*/ + ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("node %s:%d already exists but is not correctly configured as a replica for primary %s:%d", + replicaHostname, replicaPort, primaryWorkerNode->workerName, primaryWorkerNode->workerPort))); + } + } + + TransactionModifiedNodeMetadata = true; + + /* + * Note: Replicas added this way are inactive. + * A separate mechanism or UDF (e.g., citus_activate_replica_node or citus_promote_replica_node) + * would be needed to activate them, potentially after verifying replication lag, etc. + * Activation would typically involve: + * 1. Setting isActive = true. + * 2. Ensuring metadata is synced (hasMetadata=true, metadataSynced=true). + * 3. Potentially other logic like adding to specific scheduler lists. + * For now, citus_add_replica_node just registers it. + */ + + return replicaNodeId; +} + +/* + * citus_remove_replica_node removes an inactive streaming replica node from Citus metadata. + */ +Datum +citus_remove_replica_node(PG_FUNCTION_ARGS) +{ + CheckCitusVersion(ERROR); + EnsureSuperUser(); + EnsureCoordinator(); + + text *nodeNameText = PG_GETARG_TEXT_P(0); + int32 nodePort = PG_GETARG_INT32(1); + char *nodeName = text_to_cstring(nodeNameText); + + WorkerNode *workerNode = FindWorkerNodeAnyCluster(nodeName, nodePort); + + if (workerNode == NULL) + { + ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("node \"%s:%d\" does not exist", nodeName, nodePort))); + } + + RemoveReplicaNode(workerNode); + + PG_RETURN_VOID(); +} + +/* + * citus_remove_replica_node_with_nodeid removes an inactive streaming replica node from + * Citus metadata using the node's ID. + */ +Datum +citus_remove_replica_node_with_nodeid(PG_FUNCTION_ARGS) +{ + CheckCitusVersion(ERROR); + EnsureSuperUser(); + EnsureCoordinator(); + + uint32 replicaNodeId = PG_GETARG_UINT32(0); + + WorkerNode *replicaNode = FindNodeAnyClusterByNodeId(replicaNodeId); + + if (replicaNode == NULL) + { + ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("replica node with ID %d does not exist", replicaNodeId))); + } + + RemoveReplicaNode(replicaNode); + + PG_RETURN_VOID(); +} + +/* + * RemoveReplicaNode implements the shared logic of the two UDFs above: it verifies that + * the given node is an inactive, unpromoted replica and then removes it from the cluster. + */ +static void +RemoveReplicaNode(WorkerNode *replicaNode) +{ + Assert(replicaNode != NULL); + + if (!replicaNode->nodeisreplica) + { + ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("node %s:%d (ID %d) is not a replica node; use citus_remove_node() to remove primary or already promoted nodes", + replicaNode->workerName, replicaNode->workerPort, replicaNode->nodeId))); + } + + if (replicaNode->isActive) + { + ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("replica node %s:%d (ID %d) is marked active and cannot be removed with this function; an active replica is most likely a promoted node, use citus_remove_node() for those", + replicaNode->workerName, replicaNode->workerPort, replicaNode->nodeId))); + } + + /* + * All checks passed, proceed with removal. + * RemoveNodeFromCluster handles locking, catalog changes, connection closing, and metadata sync. + */ + ereport(NOTICE, (errmsg("removing inactive replica node %s:%d (ID %d)", + replicaNode->workerName, replicaNode->workerPort, replicaNode->nodeId))); + + RemoveNodeFromCluster(replicaNode->workerName, replicaNode->workerPort); + + /* RemoveNodeFromCluster might set this, but setting it here ensures it's marked for this UDF's transaction.
*/ + TransactionModifiedNodeMetadata = true; +} /* * SetLockTimeoutLocally sets the lock_timeout to the given value. @@ -1871,7 +2184,7 @@ FindWorkerNodeAnyCluster(const char *nodeName, int32 nodePort) * FindNodeAnyClusterByNodeId searches pg_dist_node and returns the node with * the nodeId. If the node can't be found returns NULL. */ -static WorkerNode * +WorkerNode * FindNodeAnyClusterByNodeId(uint32 nodeId) { bool includeNodesFromOtherClusters = true; @@ -2924,6 +3237,10 @@ InsertNodeRow(int nodeid, char *nodeName, int32 nodePort, NodeMetadata *nodeMeta values[Anum_pg_dist_node_nodecluster - 1] = nodeClusterNameDatum; values[Anum_pg_dist_node_shouldhaveshards - 1] = BoolGetDatum( nodeMetadata->shouldHaveShards); + values[Anum_pg_dist_node_nodeisreplica - 1] = BoolGetDatum( + nodeMetadata->nodeisreplica); + values[Anum_pg_dist_node_nodeprimarynodeid - 1] = Int32GetDatum( + nodeMetadata->nodeprimarynodeid); Relation pgDistNode = table_open(DistNodeRelationId(), RowExclusiveLock); @@ -3014,8 +3331,7 @@ TupleToWorkerNode(TupleDesc tupleDescriptor, HeapTuple heapTuple) /* * This function can be called before "ALTER TABLE ... ADD COLUMN nodecluster ...", - * therefore heap_deform_tuple() won't set the isNullArray for this column. We - * initialize it true to be safe in that case. + * and other columns. We initialize isNullArray to true to be safe. */ memset(isNullArray, true, sizeof(isNullArray)); @@ -3043,6 +3359,25 @@ TupleToWorkerNode(TupleDesc tupleDescriptor, HeapTuple heapTuple) datumArray[Anum_pg_dist_node_shouldhaveshards - 1]); + /* nodeisreplica and nodeprimarynodeid might be null if row is from older version */ + if (!isNullArray[Anum_pg_dist_node_nodeisreplica - 1]) + { + workerNode->nodeisreplica = DatumGetBool(datumArray[Anum_pg_dist_node_nodeisreplica - 1]); + } + else + { + workerNode->nodeisreplica = false; /* Default value */ + } + + if (!isNullArray[Anum_pg_dist_node_nodeprimarynodeid - 1]) + { + workerNode->nodeprimarynodeid = DatumGetInt32(datumArray[Anum_pg_dist_node_nodeprimarynodeid - 1]); + } + else + { + workerNode->nodeprimarynodeid = 0; /* Default value for InvalidNodeId */ + } + /* * nodecluster column can be missing. In the case of extension creation/upgrade, * master_initialize_node_metadata function is called before the nodecluster @@ -3285,3 +3620,173 @@ SyncNodeMetadata(MetadataSyncContext *context) */ SendOrCollectCommandListToActivatedNodes(context, recreateNodeSnapshotCommandList); } + +/* + * EnsureValidStreamingReplica checks if the given replica is a valid streaming + * replica. It connects to the replica and checks if it is in recovery mode. + * If it is not, it errors out. + * It also checks the system identifier of the replica and the primary + * to ensure they match. 
+ */ +static void +EnsureValidStreamingReplica(WorkerNode *primaryWorkerNode, char *replicaHostname, int replicaPort) +{ + int connectionFlag = FORCE_NEW_CONNECTION; + MultiConnection *replicaConnection = GetNodeConnection(connectionFlag, replicaHostname, + replicaPort); + + const char *recoveryQuery = "SELECT pg_is_in_recovery()"; + + int resultCode = SendRemoteCommand(replicaConnection, recoveryQuery); + + if (resultCode == 0) + { + CloseConnection(replicaConnection); + ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("could not connect to %s:%d to execute pg_is_in_recovery()", + replicaHostname, replicaPort))); + } + + PGresult *result = GetRemoteCommandResult(replicaConnection, true); + + if (result == NULL) + { + CloseConnection(replicaConnection); + + ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("node %s:%d does not support pg_is_in_recovery()", + replicaHostname, replicaPort))); + } + + List *recoveryStatusList = ReadFirstColumnAsText(result); + if (list_length(recoveryStatusList) != 1) + { + PQclear(result); + ClearResults(replicaConnection, true); + CloseConnection(replicaConnection); + + ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("cannot parse pg_is_in_recovery() result from %s:%d", + replicaHostname, + replicaPort))); + } + + StringInfo isInRecoveryQueryResInfo = (StringInfo) linitial(recoveryStatusList); + char *isInRecoveryQueryResStr = isInRecoveryQueryResInfo->data; + + if (strcmp(isInRecoveryQueryResStr, "t") != 0 && strcmp(isInRecoveryQueryResStr, "true") != 0) + { + PQclear(result); + ClearResults(replicaConnection, true); + CloseConnection(replicaConnection); + + ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("%s:%d is not a streaming replica", + replicaHostname, + replicaPort))); + } + + PQclear(result); + ForgetResults(replicaConnection); + + /* Step 2: get the system identifier from the replica */ + const char *sysidQuery = + "SELECT system_identifier FROM pg_control_system()"; + + resultCode = SendRemoteCommand(replicaConnection, sysidQuery); + + if (resultCode == 0) + { + CloseConnection(replicaConnection); + ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("could not connect to %s:%d to get system identifier", + replicaHostname, replicaPort))); + } + + result = GetRemoteCommandResult(replicaConnection, true); + if (!IsResponseOK(result)) + { + ReportResultError(replicaConnection, result, ERROR); + } + + List *sysidList = ReadFirstColumnAsText(result); + if (list_length(sysidList) != 1) + { + PQclear(result); + ClearResults(replicaConnection, true); + CloseConnection(replicaConnection); + + ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("cannot parse system identifier result from %s:%d", + replicaHostname, + replicaPort))); + } + + StringInfo sysidQueryResInfo = (StringInfo) linitial(sysidList); + char *sysidQueryResStr = sysidQueryResInfo->data; + + ereport(DEBUG2, (errmsg("system identifier of %s:%d is %s", + replicaHostname, replicaPort, sysidQueryResStr))); + + /* We do not need the replica connection anymore */ + PQclear(result); + ForgetResults(replicaConnection); + CloseConnection(replicaConnection); + + /* Step 3: get the system identifier from the primary */ + int primaryConnectionFlag = 0; + MultiConnection *primaryConnection = GetNodeConnection(primaryConnectionFlag, + primaryWorkerNode->workerName, + primaryWorkerNode->workerPort); + int primaryResultCode = SendRemoteCommand(primaryConnection, sysidQuery); + if (primaryResultCode == 0) + {
CloseConnection(primaryConnection); + ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("could not connect to %s:%d to get system identifier", + primaryWorkerNode->workerName, primaryWorkerNode->workerPort))); + } + + PGresult *primaryResult = GetRemoteCommandResult(primaryConnection, true); + if (primaryResult == NULL) + { + CloseConnection(primaryConnection); + + ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("node %s:%d does not support pg_control_system queries", + primaryWorkerNode->workerName, primaryWorkerNode->workerPort))); + } + + List *primarySysidList = ReadFirstColumnAsText(primaryResult); + if (list_length(primarySysidList) != 1) + { + PQclear(primaryResult); + ClearResults(primaryConnection, true); + CloseConnection(primaryConnection); + + ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("cannot parse system identifier result from %s:%d", + primaryWorkerNode->workerName, + primaryWorkerNode->workerPort))); + } + + StringInfo primarySysidQueryResInfo = (StringInfo) linitial(primarySysidList); + char *primarySysidQueryResStr = primarySysidQueryResInfo->data; + + ereport(DEBUG2, (errmsg("system identifier of %s:%d is %s", + primaryWorkerNode->workerName, primaryWorkerNode->workerPort, primarySysidQueryResStr))); + + /* verify that both identifiers match */ + if (strcmp(sysidQueryResStr, primarySysidQueryResStr) != 0) + { + ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("system identifiers do not match: %s (replica) vs %s (primary)", + sysidQueryResStr, primarySysidQueryResStr))); + } + + PQclear(primaryResult); + ClearResults(primaryConnection, true); + CloseConnection(primaryConnection); +} diff --git a/src/backend/distributed/operations/node_promotion.c b/src/backend/distributed/operations/node_promotion.c new file mode 100644 index 000000000..5d9934bde --- /dev/null +++ b/src/backend/distributed/operations/node_promotion.c @@ -0,0 +1,380 @@ +/* + * node_promotion.c + * UDF for promoting a registered streaming replica to a primary worker node + * and splitting the original primary's shards between the pair. + */ +#include "postgres.h" +#include "utils/fmgrprotos.h" +#include "utils/pg_lsn.h" + +#include "distributed/argutils.h" +#include "distributed/remote_commands.h" +#include "distributed/metadata_cache.h" +#include "distributed/metadata_sync.h" +#include "distributed/shard_rebalancer.h" + +static int64 GetReplicationLag(WorkerNode *primaryWorkerNode, WorkerNode *replicaWorkerNode); +static void BlockAllWritesToWorkerNode(WorkerNode *workerNode); +static bool GetNodeIsInRecoveryStatus(WorkerNode *workerNode); +static void PromoteReplicaNode(WorkerNode *replicaWorkerNode); + +PG_FUNCTION_INFO_V1(citus_promote_replica_and_rebalance); + +Datum +citus_promote_replica_and_rebalance(PG_FUNCTION_ARGS) +{ + CheckCitusVersion(ERROR); + EnsureSuperUser(); + EnsureCoordinator(); + + int32 replicaNodeIdArg = PG_GETARG_INT32(0); + + WorkerNode *replicaNode = NULL; + WorkerNode *primaryNode = NULL; + + /* lock pg_dist_node to prevent concurrent modifications during this operation */ + LockRelationOid(DistNodeRelationId(), RowExclusiveLock); + + replicaNode = FindNodeAnyClusterByNodeId(replicaNodeIdArg); + if (replicaNode == NULL) + { + ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("Replica node with ID %d not found.", replicaNodeIdArg))); + } + + if (!replicaNode->nodeisreplica || replicaNode->nodeprimarynodeid == 0) + { + ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("Node %s:%d (ID %d) is not a valid replica or its primary node ID is not set.", + replicaNode->workerName, replicaNode->workerPort,
replicaNode->nodeId))); + } + + if (replicaNode->isActive) + { + ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("Replica node %s:%d (ID %d) is already active and cannot be promoted.", + replicaNode->workerName, replicaNode->workerPort, replicaNode->nodeId))); + } + + primaryNode = FindNodeAnyClusterByNodeId(replicaNode->nodeprimarynodeid); + if (primaryNode == NULL) + { + ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("Primary node with ID %d (for replica %s:%d) not found.", + replicaNode->nodeprimarynodeid, replicaNode->workerName, replicaNode->workerPort))); + } + + if (primaryNode->nodeisreplica) + { + ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("Primary node %s:%d (ID %d) is itself a replica.", + primaryNode->workerName, primaryNode->workerPort, primaryNode->nodeId))); + } + + if (!primaryNode->isActive) + { + ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("Primary node %s:%d (ID %d) is not active.", + primaryNode->workerName, primaryNode->workerPort, primaryNode->nodeId))); + } + + ereport(NOTICE, (errmsg("Starting promotion process for replica node %s:%d (ID %d), original primary %s:%d (ID %d)", + replicaNode->workerName, replicaNode->workerPort, replicaNode->nodeId, + primaryNode->workerName, primaryNode->workerPort, primaryNode->nodeId))); + + /* Step 1: Block Writes on Original Primary's Shards */ + ereport(NOTICE, (errmsg("Blocking writes on shards of original primary node %s:%d (group %d)", + primaryNode->workerName, primaryNode->workerPort, primaryNode->groupId))); + + BlockAllWritesToWorkerNode(primaryNode); + + /* Step 2: Wait for Replica to Catch Up */ + ereport(NOTICE, (errmsg("Waiting for replica %s:%d to catch up with primary %s:%d", + replicaNode->workerName, replicaNode->workerPort, + primaryNode->workerName, primaryNode->workerPort))); + + bool caughtUp = false; + const int catchUpTimeoutSeconds = 300; /* 5 minutes; TODO: make this a GUC */ + const int sleepIntervalSeconds = 5; + int elapsedTimeSeconds = 0; + + while (elapsedTimeSeconds < catchUpTimeoutSeconds) + { + int64 repLag = GetReplicationLag(primaryNode, replicaNode); + if (repLag <= 0) + { + caughtUp = true; + break; + } + pg_usleep(sleepIntervalSeconds * 1000000L); + elapsedTimeSeconds += sleepIntervalSeconds; + } + + if (!caughtUp) + { + ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("Replica %s:%d failed to catch up with primary %s:%d within %d seconds.", + replicaNode->workerName, replicaNode->workerPort, + primaryNode->workerName, primaryNode->workerPort, + catchUpTimeoutSeconds))); + } + + ereport(NOTICE, (errmsg("Replica %s:%d is now caught up with primary %s:%d.", + replicaNode->workerName, replicaNode->workerPort, + primaryNode->workerName, primaryNode->workerPort))); + + /* Step 3: PostgreSQL Replica Promotion */ + ereport(NOTICE, (errmsg("Attempting to promote replica %s:%d via pg_promote().", + replicaNode->workerName, replicaNode->workerPort))); + + PromoteReplicaNode(replicaNode); + + /* Step 4: Update Replica Metadata in
pg_dist_node on Coordinator */ + ereport(NOTICE, (errmsg("Updating metadata for promoted replica %s:%d (ID %d)", + replicaNode->workerName, replicaNode->workerPort, replicaNode->nodeId))); + ActivateReplicaNodeAsPrimary(replicaNode); + + /* We need to sync metadata changes to all nodes before rebalancing shards + * since the rebalancing algorithm depends on the latest metadata. + */ + SyncNodeMetadataToNodes(); + + /* Step 5: Split Shards Between Primary and Replica */ + SplitShardsBetweenPrimaryAndReplica(primaryNode, replicaNode, PG_GETARG_NAME_OR_NULL(1)); + + TransactionModifiedNodeMetadata = true; /* inform Citus about the metadata change */ + TriggerNodeMetadataSyncOnCommit(); /* ensure the changes are propagated */ + + ereport(NOTICE, (errmsg("Replica node %s:%d (ID %d) metadata updated. It is now a primary.", + replicaNode->workerName, replicaNode->workerPort, replicaNode->nodeId))); + + /* TODO: Step 6: Unblock Writes (should be handled by transaction commit) */ + ereport(NOTICE, (errmsg("TODO: Step 6: Unblock Writes"))); + + PG_RETURN_VOID(); +} + +/* + * GetReplicationLag calculates the replication lag between the primary and replica nodes. + * It returns the lag in bytes; a non-positive value means the replica has caught up. + */ +static int64 +GetReplicationLag(WorkerNode *primaryWorkerNode, WorkerNode *replicaWorkerNode) +{ + /* every PostgreSQL version supported by Citus has the pg_wal-based function names */ + const char *primaryLsnQuery = "SELECT pg_current_wal_lsn()"; + const char *replicaLsnQuery = "SELECT pg_last_wal_replay_lsn()"; + + int connectionFlag = 0; + MultiConnection *primaryConnection = GetNodeConnection(connectionFlag, + primaryWorkerNode->workerName, + primaryWorkerNode->workerPort); + if (PQstatus(primaryConnection->pgConn) != CONNECTION_OK) + { + ereport(ERROR, (errmsg("cannot connect to %s:%d to fetch replication status", + primaryWorkerNode->workerName, primaryWorkerNode->workerPort))); + } + MultiConnection *replicaConnection = GetNodeConnection(connectionFlag, + replicaWorkerNode->workerName, + replicaWorkerNode->workerPort); + if (PQstatus(replicaConnection->pgConn) != CONNECTION_OK) + { + ereport(ERROR, (errmsg("cannot connect to %s:%d to fetch replication status", + replicaWorkerNode->workerName, replicaWorkerNode->workerPort))); + } + + int primaryResultCode = SendRemoteCommand(primaryConnection, primaryLsnQuery); + if (primaryResultCode == 0) + { + ReportConnectionError(primaryConnection, ERROR); + } + + PGresult *primaryResult = GetRemoteCommandResult(primaryConnection, true); + if (!IsResponseOK(primaryResult)) + { + ReportResultError(primaryConnection, primaryResult, ERROR); + } + + int replicaResultCode = SendRemoteCommand(replicaConnection, replicaLsnQuery); + if (replicaResultCode == 0) + { + ReportConnectionError(replicaConnection, ERROR); + } + PGresult *replicaResult = GetRemoteCommandResult(replicaConnection, true); + if (!IsResponseOK(replicaResult)) + { + ReportResultError(replicaConnection, replicaResult, ERROR); + } + + List *primaryLsnList = ReadFirstColumnAsText(primaryResult); + if (list_length(primaryLsnList) != 1) + { + PQclear(primaryResult); + ClearResults(primaryConnection, true); + CloseConnection(primaryConnection); + + ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("cannot parse primary LSN result from %s:%d", + primaryWorkerNode->workerName, + primaryWorkerNode->workerPort))); + } + StringInfo primaryLsnQueryResInfo = (StringInfo)
linitial(primaryLsnList); + char *primaryLsnStr = primaryLsnQueryResInfo->data; + + List *replicaLsnList = ReadFirstColumnAsText(replicaResult); + if (list_length(replicaLsnList) != 1) + { + PQclear(replicaResult); + ClearResults(replicaConnection, true); + CloseConnection(replicaConnection); + + ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("cannot parse replica LSN result from %s:%d", + replicaWorkerNode->workerName, + replicaWorkerNode->workerPort))); + } + StringInfo replicaLsnQueryResInfo = (StringInfo) linitial(replicaLsnList); + char *replicaLsnStr = replicaLsnQueryResInfo->data; + + if (!primaryLsnStr || !replicaLsnStr) + { + return -1; + } + + XLogRecPtr primaryLsn = DatumGetLSN(DirectFunctionCall1(pg_lsn_in, CStringGetDatum(primaryLsnStr))); + XLogRecPtr replicaLsn = DatumGetLSN(DirectFunctionCall1(pg_lsn_in, CStringGetDatum(replicaLsnStr))); + + int64 lagBytes = (int64) (primaryLsn - replicaLsn); + + PQclear(primaryResult); + ForgetResults(primaryConnection); + CloseConnection(primaryConnection); + + PQclear(replicaResult); + ForgetResults(replicaConnection); + CloseConnection(replicaConnection); + + ereport(NOTICE, (errmsg("replication lag between %s:%d and %s:%d is %ld bytes", + primaryWorkerNode->workerName, primaryWorkerNode->workerPort, + replicaWorkerNode->workerName, replicaWorkerNode->workerPort, + lagBytes))); + return lagBytes; +} + +/* + * PromoteReplicaNode runs pg_promote() on the given replica and verifies that + * the node has actually left recovery. + */ +static void +PromoteReplicaNode(WorkerNode *replicaWorkerNode) +{ + int connectionFlag = 0; + MultiConnection *replicaConnection = GetNodeConnection(connectionFlag, + replicaWorkerNode->workerName, + replicaWorkerNode->workerPort); + + if (PQstatus(replicaConnection->pgConn) != CONNECTION_OK) + { + ereport(ERROR, (errmsg("cannot connect to %s:%d to promote replica", + replicaWorkerNode->workerName, replicaWorkerNode->workerPort))); + } + + const char *promoteQuery = "SELECT pg_promote(wait := true);"; + int resultCode = SendRemoteCommand(replicaConnection, promoteQuery); + if (resultCode == 0) + { + ReportConnectionError(replicaConnection, ERROR); + } + ForgetResults(replicaConnection); + CloseConnection(replicaConnection); + + /* connect again and verify the replica is promoted */ + if (GetNodeIsInRecoveryStatus(replicaWorkerNode)) + { + ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("Failed to promote replica %s:%d (ID %d).
It is still in recovery.", + replicaWorkerNode->workerName, replicaWorkerNode->workerPort, replicaWorkerNode->nodeId))); + } + else + { + ereport(NOTICE, (errmsg("Replica node %s:%d (ID %d) has been successfully promoted.", + replicaWorkerNode->workerName, replicaWorkerNode->workerPort, replicaWorkerNode->nodeId))); + } +} + +static void +BlockAllWritesToWorkerNode(WorkerNode *workerNode) +{ + ereport(NOTICE, (errmsg("Blocking all writes to worker node %s:%d (ID %d)", + workerNode->workerName, workerNode->workerPort, workerNode->nodeId))); + // List *placementsOnOldPrimaryGroup = AllShardPlacementsOnNodeGroup(workerNode->groupId); + + LockShardsInWorkerPlacementList(workerNode, AccessExclusiveLock); +} + +bool +GetNodeIsInRecoveryStatus(WorkerNode *workerNode) +{ + int connectionFlag = 0; + MultiConnection *nodeConnection = GetNodeConnection(connectionFlag, + workerNode->workerName, + workerNode->workerPort); + + if (PQstatus(nodeConnection->pgConn) != CONNECTION_OK) + { + ereport(ERROR, (errmsg("cannot connect to %s:%d to check recovery status", + workerNode->workerName, workerNode->workerPort))); + } + + const char *recoveryQuery = "SELECT pg_is_in_recovery();"; + int resultCode = SendRemoteCommand(nodeConnection, recoveryQuery); + if (resultCode == 0) + { + ReportConnectionError(nodeConnection, ERROR); + } + + PGresult *result = GetRemoteCommandResult(nodeConnection, true); + if (!IsResponseOK(result)) + { + ReportResultError(nodeConnection, result, ERROR); + } + + List *recoveryStatusList = ReadFirstColumnAsText(result); + if (list_length(recoveryStatusList) != 1) + { + PQclear(result); + ClearResults(nodeConnection, true); + CloseConnection(nodeConnection); + + ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("cannot parse recovery status result from %s:%d", + workerNode->workerName, + workerNode->workerPort))); + } + + StringInfo recoveryStatusInfo = (StringInfo) linitial(recoveryStatusList); + bool isInRecovery = (strcmp(recoveryStatusInfo->data, "t") == 0) || (strcmp(recoveryStatusInfo->data, "true") == 0); + + PQclear(result); + ForgetResults(nodeConnection); + CloseConnection(nodeConnection); + + return isInRecovery; +} \ No newline at end of file diff --git a/src/backend/distributed/operations/shard_rebalancer.c b/src/backend/distributed/operations/shard_rebalancer.c index 074f1bed0..20dc7299e 100644 --- a/src/backend/distributed/operations/shard_rebalancer.c +++ b/src/backend/distributed/operations/shard_rebalancer.c @@ -81,9 +81,27 @@ typedef struct RebalanceOptions Form_pg_dist_rebalance_strategy rebalanceStrategy; const char *operationName; WorkerNode *workerNode; + List *involvedWorkerNodeList; } RebalanceOptions; +typedef struct SplitPrimaryReplicaShards +{ + /* + * primaryShardPlacementList contains the placements that + * should stay on primary worker node. + */ + List *primaryShardIdList; + /* + * replicaShardPlacementList contains the placements that should stay on + * replica worker node. + */ + List *replicaShardIdList; +} SplitPrimaryReplicaShards; + + +static SplitPrimaryReplicaShards * +GetPrimaryReplicaSplitRebalanceSteps(RebalanceOptions *options, WorkerNode* replicaNode); /* * RebalanceState is used to keep the internal state of the rebalance * algorithm in one place. 
@@ -318,6 +336,7 @@ PG_FUNCTION_INFO_V1(pg_dist_rebalance_strategy_enterprise_check); PG_FUNCTION_INFO_V1(citus_rebalance_start); PG_FUNCTION_INFO_V1(citus_rebalance_stop); PG_FUNCTION_INFO_V1(citus_rebalance_wait); +PG_FUNCTION_INFO_V1(get_snapshot_based_node_split_plan); bool RunningUnderCitusTestSuite = false; int MaxRebalancerLoggedIgnoredMoves = 5; @@ -517,8 +536,16 @@ GetRebalanceSteps(RebalanceOptions *options) .context = &context, }; + if (options->involvedWorkerNodeList == NIL) + { + /* + * If the caller did not specify a list of worker nodes, use all the + * active worker nodes. + */ + options->involvedWorkerNodeList = SortedActiveWorkers(); + } /* sort the lists to make the function more deterministic */ - List *activeWorkerList = SortedActiveWorkers(); + List *activeWorkerList = options->involvedWorkerNodeList; int shardAllowedNodeCount = 0; WorkerNode *workerNode = NULL; foreach_declared_ptr(workerNode, activeWorkerList) @@ -981,6 +1008,7 @@ rebalance_table_shards(PG_FUNCTION_ARGS) .excludedShardArray = PG_GETARG_ARRAYTYPE_P(3), .drainOnly = PG_GETARG_BOOL(5), .rebalanceStrategy = strategy, + .involvedWorkerNodeList = NIL, .improvementThreshold = strategy->improvementThreshold, }; Oid shardTransferModeOid = PG_GETARG_OID(4); @@ -3546,6 +3574,342 @@ EnsureShardCostUDF(Oid functionOid) ReleaseSysCache(proctup); } +/* + * SplitShardsBetweenPrimaryAndReplica splits the shards that currently reside on the + * primary node between the primary and its replica, and adjusts the shard placement + * metadata accordingly. + */ +void +SplitShardsBetweenPrimaryAndReplica(WorkerNode *primaryNode, + WorkerNode *replicaNode, + Name strategyName) +{ + CheckCitusVersion(ERROR); + + List *relationIdList = NonColocatedDistRelationIdList(); + + Form_pg_dist_rebalance_strategy strategy = GetRebalanceStrategy(strategyName); /* NULL selects the default strategy */ + + RebalanceOptions options = { + .relationIdList = relationIdList, + .threshold = 0, /* Threshold is not strictly needed for two nodes */ + .maxShardMoves = -1, /* No limit on moves between these two nodes */ + .excludedShardArray = construct_empty_array(INT8OID), + .drainOnly = false, /* Not a drain operation */ + .rebalanceStrategy = strategy, + .improvementThreshold = 0, /* Consider all beneficial moves */ + .workerNode = primaryNode /* use the primary node as the source node */ + }; + + SplitPrimaryReplicaShards *splitShards = GetPrimaryReplicaSplitRebalanceSteps(&options, replicaNode); + AdjustShardsForPrimaryReplicaNodeSplit(primaryNode, replicaNode, + splitShards->primaryShardIdList, splitShards->replicaShardIdList); +} + +/* + * GetPrimaryReplicaSplitRebalanceSteps decides, for every shard currently placed on the + * primary (source) node, whether it should stay there or be served from the replica, and + * returns the two shard ID lists as a SplitPrimaryReplicaShards struct.
+ */ +static SplitPrimaryReplicaShards * +GetPrimaryReplicaSplitRebalanceSteps(RebalanceOptions *options, WorkerNode* replicaNode) +{ + WorkerNode *sourceNode = options->workerNode; + WorkerNode *targetNode = replicaNode; + + /* Initialize rebalance plan functions and context */ + EnsureShardCostUDF(options->rebalanceStrategy->shardCostFunction); + EnsureNodeCapacityUDF(options->rebalanceStrategy->nodeCapacityFunction); + EnsureShardAllowedOnNodeUDF(options->rebalanceStrategy->shardAllowedOnNodeFunction); + + RebalanceContext context; + memset(&context, 0, sizeof(RebalanceContext)); + fmgr_info(options->rebalanceStrategy->shardCostFunction, &context.shardCostUDF); + fmgr_info(options->rebalanceStrategy->nodeCapacityFunction, &context.nodeCapacityUDF); + fmgr_info(options->rebalanceStrategy->shardAllowedOnNodeFunction, + &context.shardAllowedOnNodeUDF); + + RebalancePlanFunctions rebalancePlanFunctions = { + .shardAllowedOnNode = ShardAllowedOnNode, + .nodeCapacity = NodeCapacity, + .shardCost = GetShardCost, + .context = &context, + }; + + /* + * Collect all active shard placements on the source node for the given relations. + * Unlike the main rebalancer, we build a single list of all relevant source placements + * across all specified relations (or all relations if none specified). + */ + List *allSourcePlacements = NIL; + Oid relationIdItr = InvalidOid; + foreach_declared_oid(relationIdItr, options->relationIdList) + { + List *shardPlacementList = FullShardPlacementList(relationIdItr, + options->excludedShardArray); + List *activeShardPlacementsForRelation = + FilterShardPlacementList(shardPlacementList, IsActiveShardPlacement); + + ShardPlacement *placement = NULL; + foreach_declared_ptr(placement, activeShardPlacementsForRelation) + { + if (placement->nodeId == sourceNode->nodeId) + { + /* Ensure we don't add duplicate shardId if it's somehow listed under multiple relations */ + bool alreadyAdded = false; + ShardPlacement *existingPlacement = NULL; + foreach_declared_ptr(existingPlacement, allSourcePlacements) + { + if (existingPlacement->shardId == placement->shardId) + { + alreadyAdded = true; + break; + } + } + if (!alreadyAdded) + { + allSourcePlacements = lappend(allSourcePlacements, placement); + } + } + } + } + + List *activeWorkerList = list_make2(options->workerNode, replicaNode); + SplitPrimaryReplicaShards *splitShards = palloc0(sizeof(SplitPrimaryReplicaShards)); + splitShards->primaryShardIdList = NIL; + splitShards->replicaShardIdList = NIL; + + if (list_length(allSourcePlacements) > 0) + { + /* + * Initialize RebalanceState considering only the source node's shards + * and the two active workers (source and target). + */ + RebalanceState *state = InitRebalanceState(activeWorkerList, allSourcePlacements, &rebalancePlanFunctions); + + NodeFillState *sourceFillState = NULL; + NodeFillState *targetFillState = NULL; + ListCell *fsc = NULL; + + /* Identify the fill states for our specific source and target nodes */ + foreach(fsc, state->fillStateListAsc) /* Could be fillStateListDesc too, order doesn't matter here */ + { + NodeFillState *fs = (NodeFillState *) lfirst(fsc); + if (fs->node->nodeId == sourceNode->nodeId) + { + sourceFillState = fs; + } + else if (fs->node->nodeId == targetNode->nodeId) + { + targetFillState = fs; + } + } + + if (sourceFillState != NULL && targetFillState != NULL) + { + /* + * The goal is to move roughly half the total cost from source to target. 
+ * The target node is assumed to be empty: only the shards currently on the source + * node are costed for this two-node plan. We simulate balancing between the pair: + * all shards start on sourceFillState, the target starts empty, and we move shards + * from source to target until their costs are as balanced as possible. + */ + float4 sourceCurrentCost = sourceFillState->totalCost; + float4 targetCurrentCost = 0; /* cost on the target from these source shards */ + + /* Sort shards on the source node by cost (descending), a common greedy heuristic. */ + sourceFillState->shardCostListDesc = SortList(sourceFillState->shardCostListDesc, CompareShardCostDesc); + + List *potentialMoves = NIL; + ListCell *lc_shardcost = NULL; + + /* + * Iterate through each shard on the source node and decide greedily whether + * moving it to the target node would improve the balance. + */ + foreach(lc_shardcost, sourceFillState->shardCostListDesc) + { + ShardCost *shardToConsider = (ShardCost *) lfirst(lc_shardcost); + + /* TODO: honor the strategy's shardAllowedOnNode() for the target node here */ + + /* + * Move the shard only if doing so reduces the cost difference between the + * two nodes. Current difference: fabsf(sourceCurrentCost - targetCurrentCost); + * difference after the move: + * fabsf((sourceCurrentCost - cost) - (targetCurrentCost + cost)). + * Move if the new difference is smaller.
+ */ + float4 costOfShard = shardToConsider->cost; + float4 diffBefore = fabsf(sourceCurrentCost - targetCurrentCost); + float4 diffAfter = fabsf((sourceCurrentCost - costOfShard) - (targetCurrentCost + costOfShard)); + + if (diffAfter < diffBefore) + { + PlacementUpdateEvent *update = palloc0(sizeof(PlacementUpdateEvent)); + update->shardId = shardToConsider->shardId; + update->sourceNode = sourceNode; + update->targetNode = targetNode; + update->updateType = PLACEMENT_UPDATE_MOVE; + potentialMoves = lappend(potentialMoves, update); + splitShards->replicaShardIdList = lappend_int(splitShards->replicaShardIdList, shardToConsider->shardId); + + + /* Update simulated costs for the next iteration */ + sourceCurrentCost -= costOfShard; + targetCurrentCost += costOfShard; + } + else + { + splitShards->primaryShardIdList = lappend_int(splitShards->primaryShardIdList, shardToConsider->shardId); + } + } + } + /* RebalanceState is in memory context, will be cleaned up */ + } + return splitShards; +} + +/* + * Snapshot-based node split plan outputs the shard placement plan + * for primary and replica based node split + * + * SQL signature: + * get_snapshot_based_node_split_plan( + * primary_node_name text, + * primary_node_port integer, + * replica_node_name text, + * replica_node_port integer, + * rebalance_strategy name DEFAULT NULL + * + */ +Datum +get_snapshot_based_node_split_plan(PG_FUNCTION_ARGS) +{ + CheckCitusVersion(ERROR); + + text *primaryNodeNameText = PG_GETARG_TEXT_P(0); + int32 primaryNodePort = PG_GETARG_INT32(1); + text *replicaNodeNameText = PG_GETARG_TEXT_P(2); + int32 replicaNodePort = PG_GETARG_INT32(3); + + char *primaryNodeName = text_to_cstring(primaryNodeNameText); + char *replicaNodeName = text_to_cstring(replicaNodeNameText); + + WorkerNode *primaryNode = FindWorkerNodeOrError(primaryNodeName, primaryNodePort); + WorkerNode *replicaNode = FindWorkerNodeOrError(replicaNodeName, replicaNodePort); + + if (!replicaNode->nodeisreplica || replicaNode->nodeprimarynodeid == 0) + { + ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("Node %s:%d (ID %d) is not a valid replica or its primary node ID is not set.", + replicaNode->workerName, replicaNode->workerPort, replicaNode->nodeId))); + } + if (primaryNode->nodeisreplica) + { + ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("Primary node %s:%d (ID %d) is itself a replica.", + primaryNode->workerName, primaryNode->workerPort, primaryNode->nodeId))); + } + /* Ensure the primary node is related to the replica node */ + if (primaryNode->nodeId != replicaNode->nodeprimarynodeid) + { + ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("Replica node %s:%d (ID %d) is not replica of the primary node %s:%d (ID %d).", + replicaNode->workerName, replicaNode->workerPort, replicaNode->nodeId, + primaryNode->workerName, primaryNode->workerPort, primaryNode->nodeId))); + } + + List *relationIdList = NIL; + relationIdList = NonColocatedDistRelationIdList(); + + Form_pg_dist_rebalance_strategy strategy = GetRebalanceStrategy( + PG_GETARG_NAME_OR_NULL(4)); + + RebalanceOptions options = { + .relationIdList = relationIdList, + .threshold = 0, /* Threshold is not strictly needed for two nodes */ + .maxShardMoves = -1, /* No limit on moves between these two nodes */ + .excludedShardArray = construct_empty_array(INT8OID), + .drainOnly = false, /* Not a drain operation */ + .rebalanceStrategy = strategy, + .improvementThreshold = 0, /* Consider all beneficial moves */ + 
.workerNode = primaryNode /* use the primary node as the source node */ + }; + + SplitPrimaryReplicaShards *splitShards = GetPrimaryReplicaSplitRebalanceSteps(&options, replicaNode); + + if (splitShards == NULL) + { + ereport(ERROR, (errmsg("No shards to split between primary and replica nodes."))); + } + + int shardId = 0; + TupleDesc tupdesc; + Tuplestorestate *tupstore = SetupTuplestore(fcinfo, &tupdesc); + Datum values[4]; + bool nulls[4]; + + foreach_declared_int(shardId, splitShards->primaryShardIdList) + { + ShardInterval *shardInterval = LoadShardInterval(shardId); + List *colocatedShardList = ColocatedShardIntervalList(shardInterval); + ListCell *colocatedShardCell = NULL; + foreach(colocatedShardCell, colocatedShardList) + { + ShardInterval *colocatedShard = lfirst(colocatedShardCell); + uint64 colocatedShardId = colocatedShard->shardId; + memset(values, 0, sizeof(values)); + memset(nulls, 0, sizeof(nulls)); + + values[0] = ObjectIdGetDatum(RelationIdForShard(colocatedShardId)); + values[1] = UInt64GetDatum(colocatedShardId); + values[2] = UInt64GetDatum(ShardLength(colocatedShardId)); + values[3] = PointerGetDatum(cstring_to_text("Primary Node")); + tuplestore_putvalues(tupstore, tupdesc, values, nulls); + } + } + + foreach_declared_int(shardId, splitShards->replicaShardIdList) + { + ShardInterval *shardInterval = LoadShardInterval(shardId); + List *colocatedShardList = ColocatedShardIntervalList(shardInterval); + ListCell *colocatedShardCell = NULL; + foreach(colocatedShardCell, colocatedShardList) + { + ShardInterval *colocatedShard = lfirst(colocatedShardCell); + uint64 colocatedShardId = colocatedShard->shardId; + memset(values, 0, sizeof(values)); + memset(nulls, 0, sizeof(nulls)); + + values[0] = ObjectIdGetDatum(RelationIdForShard(colocatedShardId)); + values[1] = UInt64GetDatum(colocatedShardId); + values[2] = UInt64GetDatum(ShardLength(colocatedShardId)); + values[3] = PointerGetDatum(cstring_to_text("Replica Node")); + tuplestore_putvalues(tupstore, tupdesc, values, nulls); + } + } + + return (Datum) 0; +} /* * EnsureNodeCapacityUDF checks that the UDF matching the oid has the correct diff --git a/src/backend/distributed/operations/shard_transfer.c b/src/backend/distributed/operations/shard_transfer.c index b7d07b2cf..3922b862e 100644 --- a/src/backend/distributed/operations/shard_transfer.c +++ b/src/backend/distributed/operations/shard_transfer.c @@ -573,6 +573,44 @@ TransferShards(int64 shardId, char *sourceNodeName, FinalizeCurrentProgressMonitor(); } +/* + * AdjustShardsForPrimaryReplicaNodeSplit is called when a primary-replica node split + * occurs. It adjusts the shard placements such that the shards that should be on the + * primary node are removed from the replica node, and vice versa. + * + * This function does not move any data; it only updates the shard placement metadata. + */ +void +AdjustShardsForPrimaryReplicaNodeSplit(WorkerNode *primaryNode, + WorkerNode *replicaNode, + List *primaryShardList, + List *replicaShardList) +{ + int shardId = 0; + /* + * Remove all shards from the replica that should reside on the primary node, + * and update the shard placement metadata for shards that will now be served + * from the replica node. No data movement is required; we only need to drop + * the relevant shards from the replica and primary nodes and update the + * corresponding shard placement metadata.
+ */ + foreach_declared_int(shardId, primaryShardList) + { + /* TODO: load the colocated shard list for shardId and drop those shard tables from the replica node */ + } + + /* Now drop all shards from primary that need to be on the replica node */ + foreach_declared_int(shardId, replicaShardList) + { + UpdateColocatedShardPlacementMetadataOnWorkers(shardId, + primaryNode->workerName, primaryNode->workerPort, + replicaNode->workerName, replicaNode->workerPort); + /* TODO: drop the no-longer-needed shard tables from the primary node */ + } +} /* * Insert deferred cleanup records. diff --git a/src/backend/distributed/sql/cat_upgrades/add_replica_info_to_pg_dist_node.sql b/src/backend/distributed/sql/cat_upgrades/add_replica_info_to_pg_dist_node.sql new file mode 100644 index 000000000..8ca8ae239 --- /dev/null +++ b/src/backend/distributed/sql/cat_upgrades/add_replica_info_to_pg_dist_node.sql @@ -0,0 +1,7 @@ +-- Add replica information columns to pg_dist_node +ALTER TABLE pg_catalog.pg_dist_node ADD COLUMN nodeisreplica BOOLEAN NOT NULL DEFAULT FALSE; +ALTER TABLE pg_catalog.pg_dist_node ADD COLUMN nodeprimarynodeid INT4 NOT NULL DEFAULT 0; + +-- Add comments to the new columns for clarity in \d output +COMMENT ON COLUMN pg_catalog.pg_dist_node.nodeisreplica IS 'Indicates if this node is a replica of another node.'; +COMMENT ON COLUMN pg_catalog.pg_dist_node.nodeprimarynodeid IS 'If nodeisreplica is true, this stores the nodeid of its primary node.'; diff --git a/src/backend/distributed/sql/citus--13.0-1--13.1-1.sql b/src/backend/distributed/sql/citus--13.0-1--13.1-1.sql index 1599b21f1..009dc2dd2 100644 --- a/src/backend/distributed/sql/citus--13.0-1--13.1-1.sql +++ b/src/backend/distributed/sql/citus--13.0-1--13.1-1.sql @@ -52,6 +52,11 @@ DROP VIEW IF EXISTS pg_catalog.citus_lock_waits; #include "udfs/citus_stat_counters_reset/13.1-1.sql" #include "udfs/citus_nodes/13.1-1.sql" +#include "cat_upgrades/add_replica_info_to_pg_dist_node.sql" +#include "udfs/citus_add_replica_node/13.1-1.sql" +#include "udfs/citus_remove_replica_node/13.1-1.sql" +#include "udfs/citus_promote_replica_and_rebalance/13.1-1.sql" +#include "udfs/get_snapshot_based_node_split_plan/13.1-1.sql" -- Since shard_name/13.1-1.sql first drops the function and then creates it, we first -- need to drop citus_shards view since that view depends on this function. And immediately -- after creating the function, we recreate citus_shards view again. diff --git a/src/backend/distributed/sql/udfs/citus_add_replica_node/13.1-1.sql b/src/backend/distributed/sql/udfs/citus_add_replica_node/13.1-1.sql new file mode 100644 index 000000000..9530e54c2 --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_add_replica_node/13.1-1.sql @@ -0,0 +1,26 @@ +CREATE OR REPLACE FUNCTION pg_catalog.citus_add_replica_node( + replica_hostname text, + replica_port integer, + primary_hostname text, + primary_port integer) + RETURNS INTEGER + LANGUAGE C VOLATILE STRICT + AS 'MODULE_PATHNAME', $$citus_add_replica_node$$; + +COMMENT ON FUNCTION pg_catalog.citus_add_replica_node(text, integer, text, integer) IS 'Adds a new node as a replica of an existing primary node. The replica is initially inactive. Returns the nodeid of the new replica node.'; + +REVOKE ALL ON FUNCTION pg_catalog.citus_add_replica_node(text, int, text, int) FROM PUBLIC;
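A usage sketch for the nodeid variant defined next (all values are hypothetical):

    SELECT citus_add_replica_node_with_nodeid('10.0.0.2', 5433,
        (SELECT nodeid FROM pg_dist_node
         WHERE nodename = '10.0.0.1' AND nodeport = 5432 AND NOT nodeisreplica));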
diff --git a/src/backend/distributed/sql/citus--13.0-1--13.1-1.sql b/src/backend/distributed/sql/citus--13.0-1--13.1-1.sql
index 1599b21f1..009dc2dd2 100644
--- a/src/backend/distributed/sql/citus--13.0-1--13.1-1.sql
+++ b/src/backend/distributed/sql/citus--13.0-1--13.1-1.sql
@@ -52,6 +52,11 @@ DROP VIEW IF EXISTS pg_catalog.citus_lock_waits;
 #include "udfs/citus_stat_counters_reset/13.1-1.sql"
 #include "udfs/citus_nodes/13.1-1.sql"
 
+#include "cat_upgrades/add_replica_info_to_pg_dist_node.sql"
+#include "udfs/citus_add_replica_node/13.1-1.sql"
+#include "udfs/citus_remove_replica_node/13.1-1.sql"
+#include "udfs/citus_promote_replica_and_rebalance/13.1-1.sql"
+#include "udfs/get_snapshot_based_node_split_plan/13.1-1.sql"
 -- Since shard_name/13.1-1.sql first drops the function and then creates it, we first
 -- need to drop citus_shards view since that view depends on this function. And immediately
 -- after creating the function, we recreate citus_shards view again.
diff --git a/src/backend/distributed/sql/udfs/citus_add_replica_node/13.1-1.sql b/src/backend/distributed/sql/udfs/citus_add_replica_node/13.1-1.sql
new file mode 100644
index 000000000..9530e54c2
--- /dev/null
+++ b/src/backend/distributed/sql/udfs/citus_add_replica_node/13.1-1.sql
@@ -0,0 +1,26 @@
+CREATE OR REPLACE FUNCTION pg_catalog.citus_add_replica_node(
+    replica_hostname text,
+    replica_port integer,
+    primary_hostname text,
+    primary_port integer)
+  RETURNS INTEGER
+  LANGUAGE C VOLATILE STRICT
+  AS 'MODULE_PATHNAME', $$citus_add_replica_node$$;
+
+COMMENT ON FUNCTION pg_catalog.citus_add_replica_node(text, integer, text, integer) IS
+'Adds a new node as a replica of an existing primary node. The replica is initially inactive. Returns the nodeid of the new replica node.';
+
+REVOKE ALL ON FUNCTION pg_catalog.citus_add_replica_node(text, int, text, int) FROM PUBLIC;
+
+CREATE OR REPLACE FUNCTION pg_catalog.citus_add_replica_node_with_nodeid(
+    replica_hostname text,
+    replica_port integer,
+    primary_nodeid integer)
+  RETURNS INTEGER
+  LANGUAGE C VOLATILE STRICT
+  AS 'MODULE_PATHNAME', $$citus_add_replica_node_with_nodeid$$;
+
+COMMENT ON FUNCTION pg_catalog.citus_add_replica_node_with_nodeid(text, integer, integer) IS
+'Adds a new node as a replica of an existing primary node using the primary node''s ID. The replica is initially inactive. Returns the nodeid of the new replica node.';
+
+REVOKE ALL ON FUNCTION pg_catalog.citus_add_replica_node_with_nodeid(text, int, int) FROM PUBLIC;
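Typical usage, assuming a streaming replica of worker-1 is already running on replica-1 (hostnames and the node id below are illustrative):

    -- register the replica; it stays inactive until promoted
    SELECT citus_add_replica_node('replica-1', 5432, 'worker-1', 5432);

    -- or, equivalently, identify the primary by its node id
    SELECT citus_add_replica_node_with_nodeid('replica-1', 5432, 3);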
diff --git a/src/backend/distributed/sql/udfs/citus_promote_replica_and_rebalance/13.1-1.sql b/src/backend/distributed/sql/udfs/citus_promote_replica_and_rebalance/13.1-1.sql
new file mode 100644
index 000000000..274e1f727
--- /dev/null
+++ b/src/backend/distributed/sql/udfs/citus_promote_replica_and_rebalance/13.1-1.sql
@@ -0,0 +1,12 @@
+CREATE OR REPLACE FUNCTION pg_catalog.citus_promote_replica_and_rebalance(
+    replica_nodeid integer,
+    rebalance_strategy name DEFAULT NULL
+)
+RETURNS VOID
+AS 'MODULE_PATHNAME'
+LANGUAGE C VOLATILE;
+
+COMMENT ON FUNCTION pg_catalog.citus_promote_replica_and_rebalance(integer, name) IS
+'Promotes a registered replica node to a primary, performs the necessary metadata updates, and rebalances a portion of shards from its original primary to the newly promoted node.';
+
+REVOKE ALL ON FUNCTION pg_catalog.citus_promote_replica_and_rebalance(integer, name) FROM PUBLIC;
diff --git a/src/backend/distributed/sql/udfs/citus_remove_replica_node/13.1-1.sql b/src/backend/distributed/sql/udfs/citus_remove_replica_node/13.1-1.sql
new file mode 100644
index 000000000..68b957681
--- /dev/null
+++ b/src/backend/distributed/sql/udfs/citus_remove_replica_node/13.1-1.sql
@@ -0,0 +1,24 @@
+CREATE OR REPLACE FUNCTION pg_catalog.citus_remove_replica_node(
+    nodename text,
+    nodeport integer
+)
+RETURNS VOID
+LANGUAGE C VOLATILE STRICT
+AS 'MODULE_PATHNAME', $$citus_remove_replica_node$$;
+
+COMMENT ON FUNCTION pg_catalog.citus_remove_replica_node(text, integer)
+IS 'Removes an inactive streaming replica node from Citus metadata. Errors if the node is not found, not registered as a replica, or is currently marked active.';
+
+REVOKE ALL ON FUNCTION pg_catalog.citus_remove_replica_node(text, integer) FROM PUBLIC;
+
+CREATE OR REPLACE FUNCTION pg_catalog.citus_remove_replica_node_with_nodeid(
+    nodeid integer
+)
+RETURNS VOID
+LANGUAGE C VOLATILE STRICT
+AS 'MODULE_PATHNAME', $$citus_remove_replica_node_with_nodeid$$;
+
+COMMENT ON FUNCTION pg_catalog.citus_remove_replica_node_with_nodeid(integer)
+IS 'Removes an inactive streaming replica node from Citus metadata using its node ID. Errors if the node is not found, not registered as a replica, or is currently marked active.';
+
+REVOKE ALL ON FUNCTION pg_catalog.citus_remove_replica_node_with_nodeid(integer) FROM PUBLIC;
diff --git a/src/backend/distributed/sql/udfs/get_snapshot_based_node_split_plan/13.1-1.sql b/src/backend/distributed/sql/udfs/get_snapshot_based_node_split_plan/13.1-1.sql
new file mode 100644
index 000000000..f2d294315
--- /dev/null
+++ b/src/backend/distributed/sql/udfs/get_snapshot_based_node_split_plan/13.1-1.sql
@@ -0,0 +1,18 @@
+CREATE OR REPLACE FUNCTION pg_catalog.get_snapshot_based_node_split_plan(
+    primary_node_name text,
+    primary_node_port integer,
+    replica_node_name text,
+    replica_node_port integer,
+    rebalance_strategy name DEFAULT NULL
+  )
+  RETURNS TABLE (table_name regclass,
+                 shardid bigint,
+                 shard_size bigint,
+                 placement_node text)
+  AS 'MODULE_PATHNAME'
+  LANGUAGE C VOLATILE;
+
+COMMENT ON FUNCTION pg_catalog.get_snapshot_based_node_split_plan(text, int, text, int, name)
+  IS 'Shows the shard placements that would result from balancing shards between a primary node and its replica.';
+
+REVOKE ALL ON FUNCTION pg_catalog.get_snapshot_based_node_split_plan(text, int, text, int, name) FROM PUBLIC;
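An illustrative call (hostnames are made up; the actual plan depends on the rebalance strategy and current placements):

    SELECT * FROM get_snapshot_based_node_split_plan('worker-1', 5432, 'replica-1', 5432);

Each returned row's placement_node is either 'Primary Node' or 'Replica Node', matching the labels emitted by the tuplestore code earlier in this patch.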
diff --git a/src/include/distributed/metadata_cache.h b/src/include/distributed/metadata_cache.h
index f1120497b..479a0af8b 100644
--- a/src/include/distributed/metadata_cache.h
+++ b/src/include/distributed/metadata_cache.h
@@ -298,6 +298,7 @@ extern Oid CitusDependentObjectFuncId(void);
 /* enum oids */
 extern Oid PrimaryNodeRoleId(void);
 extern Oid SecondaryNodeRoleId(void);
+extern Oid UnavailableNodeRoleId(void);
 extern Oid CitusCopyFormatTypeId(void);
 extern Oid TextCopyFormatId(void);
 extern Oid BinaryCopyFormatId(void);
diff --git a/src/include/distributed/metadata_utility.h b/src/include/distributed/metadata_utility.h
index 38c13eb51..fd146b576 100644
--- a/src/include/distributed/metadata_utility.h
+++ b/src/include/distributed/metadata_utility.h
@@ -466,4 +466,7 @@ extern bool IsBackgroundJobStatusTerminal(BackgroundJobStatus status);
 extern bool IsBackgroundTaskStatusTerminal(BackgroundTaskStatus status);
 extern Oid BackgroundJobStatusOid(BackgroundJobStatus status);
 extern Oid BackgroundTaskStatusOid(BackgroundTaskStatus status);
+/* from node_metadata.c */
+extern void LockShardsInWorkerPlacementList(WorkerNode *workerNode, LOCKMODE lockMode);
+extern void ActivateReplicaNodeAsPrimary(WorkerNode *workerNode);
 #endif   /* METADATA_UTILITY_H */
diff --git a/src/include/distributed/pg_dist_node.h b/src/include/distributed/pg_dist_node.h
index 371c2a35a..de23a4255 100644
--- a/src/include/distributed/pg_dist_node.h
+++ b/src/include/distributed/pg_dist_node.h
@@ -20,7 +20,7 @@
  * in particular their OUT parameters) must be changed whenever the definition of
  * pg_dist_node changes.
  */
-#define Natts_pg_dist_node 11
+#define Natts_pg_dist_node 13
 #define Anum_pg_dist_node_nodeid 1
 #define Anum_pg_dist_node_groupid 2
 #define Anum_pg_dist_node_nodename 3
@@ -32,6 +32,8 @@
 #define Anum_pg_dist_node_nodecluster 9
 #define Anum_pg_dist_node_metadatasynced 10
 #define Anum_pg_dist_node_shouldhaveshards 11
+#define Anum_pg_dist_node_nodeisreplica 12
+#define Anum_pg_dist_node_nodeprimarynodeid 13
 
 #define GROUPID_SEQUENCE_NAME "pg_dist_groupid_seq"
 #define NODEID_SEQUENCE_NAME "pg_dist_node_nodeid_seq"
diff --git a/src/include/distributed/shard_rebalancer.h b/src/include/distributed/shard_rebalancer.h
index 79414eb3c..8ea5fb1d0 100644
--- a/src/include/distributed/shard_rebalancer.h
+++ b/src/include/distributed/shard_rebalancer.h
@@ -222,4 +222,7 @@ extern void SetupRebalanceMonitor(List *placementUpdateList,
 								  uint64 initialProgressState,
 								  PlacementUpdateStatus initialStatus);
+extern void SplitShardsBetweenPrimaryAndReplica(WorkerNode *primaryNode,
+												WorkerNode *replicaNode,
+												Name strategyName);
 #endif   /* SHARD_REBALANCER_H */
diff --git a/src/include/distributed/shard_transfer.h b/src/include/distributed/shard_transfer.h
index c1621879b..0d7b641a9 100644
--- a/src/include/distributed/shard_transfer.h
+++ b/src/include/distributed/shard_transfer.h
@@ -41,3 +41,9 @@ extern void UpdatePlacementUpdateStatusForShardIntervalList(List *shardIntervalL
 extern void InsertDeferredDropCleanupRecordsForShards(List *shardIntervalList);
 extern void InsertCleanupRecordsForShardPlacementsOnNode(List *shardIntervalList,
 														 int32 groupId);
+
+extern void AdjustShardsForPrimaryReplicaNodeSplit(WorkerNode *primaryNode,
+												   WorkerNode *replicaNode,
+												   List *primaryShardList,
+												   List *replicaShardList);
\ No newline at end of file
diff --git a/src/include/distributed/worker_manager.h b/src/include/distributed/worker_manager.h
index 02a43fe0b..93cc6b290 100644
--- a/src/include/distributed/worker_manager.h
+++ b/src/include/distributed/worker_manager.h
@@ -54,6 +54,8 @@ typedef struct WorkerNode
 	char nodeCluster[NAMEDATALEN]; /* the cluster the node is a part of */
 	bool metadataSynced;           /* node has the most recent metadata */
 	bool shouldHaveShards;         /* if the node should have distributed table shards on it or not */
+	bool nodeisreplica;            /* whether this node is a replica */
+	int32 nodeprimarynodeid;       /* nodeid of the primary for this replica */
 } WorkerNode;
@@ -84,6 +86,7 @@ extern WorkerNode * FindWorkerNode(const char *nodeName, int32 nodePort);
 extern WorkerNode * FindWorkerNodeOrError(const char *nodeName, int32 nodePort);
 extern WorkerNode * FindWorkerNodeAnyCluster(const char *nodeName, int32 nodePort);
 extern WorkerNode * FindNodeWithNodeId(int nodeId, bool missingOk);
+extern WorkerNode * FindNodeAnyClusterByNodeId(uint32 nodeId);
 extern WorkerNode * ModifiableWorkerNode(const char *nodeName, int32 nodePort);
 extern List * ReadDistNode(bool includeNodesFromOtherClusters);
 extern void EnsureCoordinator(void);
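Taken together, the new UDFs support a snapshot-based scale-out flow. A sketch of the intended sequence, inferred from the UDF comments above (hostnames and the node id are illustrative; the replica must already be streaming from its primary):

    -- 1. register the streaming replica; it is inactive at this point
    SELECT citus_add_replica_node('replica-1', 5432, 'worker-1', 5432);

    -- 2. preview how shards would be divided between the pair
    SELECT * FROM get_snapshot_based_node_split_plan('worker-1', 5432, 'replica-1', 5432);

    -- 3. promote the replica and move its share of the shards to it
    SELECT citus_promote_replica_and_rebalance(replica_nodeid := 4);

    -- alternatively, deregister a replica that was never promoted
    SELECT citus_remove_replica_node('replica-1', 5432);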