pull/8046/merge
Muhammad Usama 2025-07-01 19:22:45 +00:00 committed by GitHub
commit 3ab6ed1fc7
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
18 changed files with 1429 additions and 13 deletions

View File

@ -3549,6 +3549,18 @@ SecondaryNodeRoleId(void)
return MetadataCache.secondaryNodeRoleId; return MetadataCache.secondaryNodeRoleId;
} }
/* return the Oid of the 'unavailable' nodeRole enum value */
Oid
UnavailableNodeRoleId(void)
{
	/*
	 * Bug fix: this previously cached the lookup in
	 * MetadataCache.secondaryNodeRoleId, the slot owned by
	 * SecondaryNodeRoleId(). Whichever of the two functions ran first would
	 * poison the other's cached value, making this return the 'secondary'
	 * Oid instead of 'unavailable' (or vice versa).
	 *
	 * Ideally this gets its own MetadataCache field (reset by the cache
	 * invalidation callback, like the other role ids); until that field is
	 * added, look the value up on every call, which is always correct at the
	 * cost of a syscache lookup.
	 */
	return LookupStringEnumValueId("noderole", "unavailable");
}
Oid Oid
CitusJobStatusScheduledId(void) CitusJobStatusScheduledId(void)
@ -4367,6 +4379,8 @@ InitializeWorkerNodeCache(void)
workerNode->isActive = currentNode->isActive; workerNode->isActive = currentNode->isActive;
workerNode->nodeRole = currentNode->nodeRole; workerNode->nodeRole = currentNode->nodeRole;
workerNode->shouldHaveShards = currentNode->shouldHaveShards; workerNode->shouldHaveShards = currentNode->shouldHaveShards;
workerNode->nodeprimarynodeid = currentNode->nodeprimarynodeid;
workerNode->nodeisreplica = currentNode->nodeisreplica;
strlcpy(workerNode->nodeCluster, currentNode->nodeCluster, NAMEDATALEN); strlcpy(workerNode->nodeCluster, currentNode->nodeCluster, NAMEDATALEN);
newWorkerNodeArray[workerNodeIndex++] = workerNode; newWorkerNodeArray[workerNodeIndex++] = workerNode;

View File

@ -814,7 +814,7 @@ NodeListInsertCommand(List *workerNodeList)
appendStringInfo(nodeListInsertCommand, appendStringInfo(nodeListInsertCommand,
"INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, " "INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, "
"noderack, hasmetadata, metadatasynced, isactive, noderole, " "noderack, hasmetadata, metadatasynced, isactive, noderole, "
"nodecluster, shouldhaveshards) VALUES "); "nodecluster, shouldhaveshards, nodeisreplica, nodeprimarynodeid) VALUES ");
/* iterate over the worker nodes, add the values */ /* iterate over the worker nodes, add the values */
WorkerNode *workerNode = NULL; WorkerNode *workerNode = NULL;
@ -824,13 +824,14 @@ NodeListInsertCommand(List *workerNodeList)
char *metadataSyncedString = workerNode->metadataSynced ? "TRUE" : "FALSE"; char *metadataSyncedString = workerNode->metadataSynced ? "TRUE" : "FALSE";
char *isActiveString = workerNode->isActive ? "TRUE" : "FALSE"; char *isActiveString = workerNode->isActive ? "TRUE" : "FALSE";
char *shouldHaveShards = workerNode->shouldHaveShards ? "TRUE" : "FALSE"; char *shouldHaveShards = workerNode->shouldHaveShards ? "TRUE" : "FALSE";
char *nodeisreplicaString = workerNode->nodeisreplica ? "TRUE" : "FALSE";
Datum nodeRoleOidDatum = ObjectIdGetDatum(workerNode->nodeRole); Datum nodeRoleOidDatum = ObjectIdGetDatum(workerNode->nodeRole);
Datum nodeRoleStringDatum = DirectFunctionCall1(enum_out, nodeRoleOidDatum); Datum nodeRoleStringDatum = DirectFunctionCall1(enum_out, nodeRoleOidDatum);
char *nodeRoleString = DatumGetCString(nodeRoleStringDatum); char *nodeRoleString = DatumGetCString(nodeRoleStringDatum);
appendStringInfo(nodeListInsertCommand, appendStringInfo(nodeListInsertCommand,
"(%d, %d, %s, %d, %s, %s, %s, %s, '%s'::noderole, %s, %s)", "(%d, %d, %s, %d, %s, %s, %s, %s, '%s'::noderole, %s, %s, %s, %d)",
workerNode->nodeId, workerNode->nodeId,
workerNode->groupId, workerNode->groupId,
quote_literal_cstr(workerNode->workerName), quote_literal_cstr(workerNode->workerName),
@ -841,7 +842,9 @@ NodeListInsertCommand(List *workerNodeList)
isActiveString, isActiveString,
nodeRoleString, nodeRoleString,
quote_literal_cstr(workerNode->nodeCluster), quote_literal_cstr(workerNode->nodeCluster),
shouldHaveShards); shouldHaveShards,
nodeisreplicaString,
workerNode->nodeprimarynodeid);
processedWorkerNodeCount++; processedWorkerNodeCount++;
if (processedWorkerNodeCount != workerCount) if (processedWorkerNodeCount != workerCount)
@ -875,9 +878,11 @@ NodeListIdempotentInsertCommand(List *workerNodeList)
"hasmetadata = EXCLUDED.hasmetadata, " "hasmetadata = EXCLUDED.hasmetadata, "
"isactive = EXCLUDED.isactive, " "isactive = EXCLUDED.isactive, "
"noderole = EXCLUDED.noderole, " "noderole = EXCLUDED.noderole, "
"nodecluster = EXCLUDED.nodecluster ," "nodecluster = EXCLUDED.nodecluster, "
"metadatasynced = EXCLUDED.metadatasynced, " "metadatasynced = EXCLUDED.metadatasynced, "
"shouldhaveshards = EXCLUDED.shouldhaveshards"; "shouldhaveshards = EXCLUDED.shouldhaveshards, "
"nodeisreplica = EXCLUDED.nodeisreplica, "
"nodeprimarynodeid = EXCLUDED.nodeprimarynodeid";
appendStringInfoString(nodeInsertIdempotentCommand, onConflictStr); appendStringInfoString(nodeInsertIdempotentCommand, onConflictStr);
return nodeInsertIdempotentCommand->data; return nodeInsertIdempotentCommand->data;
} }

View File

@ -84,6 +84,8 @@ typedef struct NodeMetadata
bool isActive; bool isActive;
Oid nodeRole; Oid nodeRole;
bool shouldHaveShards; bool shouldHaveShards;
uint32 nodeprimarynodeid;
bool nodeisreplica;
char *nodeCluster; char *nodeCluster;
} NodeMetadata; } NodeMetadata;
@ -120,7 +122,6 @@ static char * NodeMetadataSyncedUpdateCommand(uint32 nodeId, bool metadataSynced
static void ErrorIfCoordinatorMetadataSetFalse(WorkerNode *workerNode, Datum value, static void ErrorIfCoordinatorMetadataSetFalse(WorkerNode *workerNode, Datum value,
char *field); char *field);
static WorkerNode * SetShouldHaveShards(WorkerNode *workerNode, bool shouldHaveShards); static WorkerNode * SetShouldHaveShards(WorkerNode *workerNode, bool shouldHaveShards);
static WorkerNode * FindNodeAnyClusterByNodeId(uint32 nodeId);
static void ErrorIfAnyNodeNotExist(List *nodeList); static void ErrorIfAnyNodeNotExist(List *nodeList);
static void UpdateLocalGroupIdsViaMetadataContext(MetadataSyncContext *context); static void UpdateLocalGroupIdsViaMetadataContext(MetadataSyncContext *context);
static void SendDeletionCommandsForReplicatedTablePlacements( static void SendDeletionCommandsForReplicatedTablePlacements(
@ -134,12 +135,16 @@ static void MarkNodesNotSyncedInLoopBackConnection(MetadataSyncContext *context,
static void EnsureParentSessionHasExclusiveLockOnPgDistNode(pid_t parentSessionPid); static void EnsureParentSessionHasExclusiveLockOnPgDistNode(pid_t parentSessionPid);
static void SetNodeMetadata(MetadataSyncContext *context, bool localOnly); static void SetNodeMetadata(MetadataSyncContext *context, bool localOnly);
static void EnsureTransactionalMetadataSyncMode(void); static void EnsureTransactionalMetadataSyncMode(void);
static void LockShardsInWorkerPlacementList(WorkerNode *workerNode, LOCKMODE
lockMode);
static BackgroundWorkerHandle * CheckBackgroundWorkerToObtainLocks(int32 lock_cooldown); static BackgroundWorkerHandle * CheckBackgroundWorkerToObtainLocks(int32 lock_cooldown);
static BackgroundWorkerHandle * LockPlacementsWithBackgroundWorkersInPrimaryNode( static BackgroundWorkerHandle * LockPlacementsWithBackgroundWorkersInPrimaryNode(
WorkerNode *workerNode, bool force, int32 lock_cooldown); WorkerNode *workerNode, bool force, int32 lock_cooldown);
static void EnsureValidStreamingReplica(WorkerNode *primaryWorkerNode,
char* replicaHostname, int replicaPort);
static int32 CitusAddReplicaNode(WorkerNode *primaryWorkerNode,
char *replicaHostname, int32 replicaPort);
static void RemoveReplicaNode(WorkerNode *replicaNode);
/* Function definitions go here */ /* Function definitions go here */
/* declarations for dynamic loading */ /* declarations for dynamic loading */
@ -168,6 +173,10 @@ PG_FUNCTION_INFO_V1(citus_coordinator_nodeid);
PG_FUNCTION_INFO_V1(citus_is_coordinator); PG_FUNCTION_INFO_V1(citus_is_coordinator);
PG_FUNCTION_INFO_V1(citus_internal_mark_node_not_synced); PG_FUNCTION_INFO_V1(citus_internal_mark_node_not_synced);
PG_FUNCTION_INFO_V1(citus_is_primary_node); PG_FUNCTION_INFO_V1(citus_is_primary_node);
PG_FUNCTION_INFO_V1(citus_add_replica_node);
PG_FUNCTION_INFO_V1(citus_add_replica_node_with_nodeid);
PG_FUNCTION_INFO_V1(citus_remove_replica_node);
PG_FUNCTION_INFO_V1(citus_remove_replica_node_with_nodeid);
/* /*
* DefaultNodeMetadata creates a NodeMetadata struct with the fields set to * DefaultNodeMetadata creates a NodeMetadata struct with the fields set to
@ -183,6 +192,8 @@ DefaultNodeMetadata()
nodeMetadata.nodeRack = WORKER_DEFAULT_RACK; nodeMetadata.nodeRack = WORKER_DEFAULT_RACK;
nodeMetadata.shouldHaveShards = true; nodeMetadata.shouldHaveShards = true;
nodeMetadata.groupId = INVALID_GROUP_ID; nodeMetadata.groupId = INVALID_GROUP_ID;
nodeMetadata.nodeisreplica = false;
nodeMetadata.nodeprimarynodeid = 0; /* 0 typically means InvalidNodeId */
return nodeMetadata; return nodeMetadata;
} }
@ -1176,6 +1187,27 @@ ActivateNodeList(MetadataSyncContext *context)
SetNodeMetadata(context, localOnly); SetNodeMetadata(context, localOnly);
} }
/*
 * ActivateReplicaNodeAsPrimary flips the local pg_dist_node metadata of a
 * replica node so that it becomes a first-class primary: role 'primary',
 * active, no longer flagged as a replica, eligible to hold shards, and
 * detached from its former primary (nodeprimarynodeid reset to 0).
 *
 * Only the local catalog is updated (SetWorkerColumnLocalOnly); propagating
 * the change to other nodes is the caller's responsibility.
 */
void
ActivateReplicaNodeAsPrimary(WorkerNode *workerNode)
{
	Datum primaryRoleDatum = ObjectIdGetDatum(PrimaryNodeRoleId());
	Datum activeDatum = BoolGetDatum(true);
	Datum notReplicaDatum = BoolGetDatum(false);
	Datum shouldHaveShardsDatum = BoolGetDatum(true);
	Datum invalidPrimaryIdDatum = Int32GetDatum(0);

	SetWorkerColumnLocalOnly(workerNode, Anum_pg_dist_node_noderole,
							 primaryRoleDatum);
	SetWorkerColumnLocalOnly(workerNode, Anum_pg_dist_node_isactive,
							 activeDatum);
	SetWorkerColumnLocalOnly(workerNode, Anum_pg_dist_node_nodeisreplica,
							 notReplicaDatum);
	SetWorkerColumnLocalOnly(workerNode, Anum_pg_dist_node_shouldhaveshards,
							 shouldHaveShardsDatum);
	SetWorkerColumnLocalOnly(workerNode, Anum_pg_dist_node_nodeprimarynodeid,
							 invalidPrimaryIdDatum);
}
/* /*
* Acquires shard metadata locks on all shards residing in the given worker node * Acquires shard metadata locks on all shards residing in the given worker node
@ -1421,6 +1453,287 @@ master_update_node(PG_FUNCTION_ARGS)
return citus_update_node(fcinfo); return citus_update_node(fcinfo);
} }
/*
 * citus_add_replica_node registers a new node as a streaming replica of an
 * existing primary node, where the primary is identified by hostname and port.
 * Returns the nodeid assigned to the replica.
 */
Datum
citus_add_replica_node(PG_FUNCTION_ARGS)
{
	CheckCitusVersion(ERROR);
	EnsureSuperUser();
	EnsureCoordinator();

	char *replicaHostname = text_to_cstring(PG_GETARG_TEXT_P(0));
	int32 replicaPort = PG_GETARG_INT32(1);
	char *primaryHostname = text_to_cstring(PG_GETARG_TEXT_P(2));
	int32 primaryPort = PG_GETARG_INT32(3);

	/* the primary must already be registered in pg_dist_node */
	WorkerNode *primaryWorkerNode = FindWorkerNodeAnyCluster(primaryHostname,
															 primaryPort);
	if (primaryWorkerNode == NULL)
	{
		ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
						errmsg("primary node %s:%d not found in pg_dist_node",
							   primaryHostname, primaryPort)));
	}

	/* all validation and catalog insertion happens in the shared helper */
	int32 replicaNodeId = CitusAddReplicaNode(primaryWorkerNode, replicaHostname,
											  replicaPort);

	PG_RETURN_INT32(replicaNodeId);
}
/*
* citus_add_replica_node_with_nodeid adds a new node as a replica of an existing primary node
* using the primary node's ID. It records the replica's hostname, port, and links it to the
* primary node's ID.
*
* This function is useful when you already know the primary node's ID and want to add a replica
* without needing to look it up by hostname and port.
*/
Datum
citus_add_replica_node_with_nodeid(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);
EnsureSuperUser();
EnsureCoordinator();
text *replicaHostnameText = PG_GETARG_TEXT_P(0);
int32 replicaPort = PG_GETARG_INT32(1);
int32 primaryNodeId = PG_GETARG_INT32(2);
char *replicaHostname = text_to_cstring(replicaHostnameText);
bool missingOk = false;
WorkerNode *primaryWorkerNode = FindNodeWithNodeId(primaryNodeId, missingOk);
if (primaryWorkerNode == NULL)
{
ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("primary node with ID %d does not exist", primaryNodeId)));
}
int32 replicaNodeId = CitusAddReplicaNode(primaryWorkerNode, replicaHostname, replicaPort);
PG_RETURN_INT32(replicaNodeId);
}
/*
 * CitusAddReplicaNode adds a new node as a replica of an existing primary node.
 * It records the replica's hostname, port, and links it to the primary node's ID.
 * The replica is initially marked as inactive and not having shards.
 *
 * Returns the nodeid assigned to (or, for an idempotent re-registration,
 * already held by) the replica.
 */
static int32
CitusAddReplicaNode(WorkerNode *primaryWorkerNode,
					char *replicaHostname, int32 replicaPort)
{
	Assert(primaryWorkerNode != NULL);

	/* Future-proofing: Ideally, a primary node should not itself be a replica.
	 * This check might be more relevant once replica promotion logic exists.
	 * For now, pg_dist_node.nodeisreplica defaults to false for existing nodes.
	 */
	if (primaryWorkerNode->nodeisreplica)
	{
		ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
						errmsg("primary node %s:%d is itself a replica and cannot have replicas",
							   primaryWorkerNode->workerName, primaryWorkerNode->workerPort)));
	}

	if (!primaryWorkerNode->shouldHaveShards)
	{
		ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
						errmsg("primary node %s:%d does not have shards, node without shards cannot have replicas",
							   primaryWorkerNode->workerName, primaryWorkerNode->workerPort)));
	}

	WorkerNode *existingReplicaNode = FindWorkerNodeAnyCluster(replicaHostname, replicaPort);
	if (existingReplicaNode != NULL)
	{
		/*
		 * Idempotency check: If the node already exists, is it already correctly
		 * registered as a replica for THIS primary?
		 */
		if (existingReplicaNode->nodeisreplica &&
			existingReplicaNode->nodeprimarynodeid == primaryWorkerNode->nodeId)
		{
			ereport(NOTICE, (errmsg("node %s:%d is already registered as a replica for primary %s:%d (nodeid %d)",
									replicaHostname, replicaPort,
									primaryWorkerNode->workerName, primaryWorkerNode->workerPort, primaryWorkerNode->nodeId)));

			/*
			 * Bug fix: this is a plain C helper, not an fmgr V1 function, so
			 * PG_RETURN_INT32 (which wraps the value in a Datum return) was
			 * wrong here; return the node id directly instead.
			 */
			return existingReplicaNode->nodeId;
		}
		else
		{
			ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
							errmsg("a different node %s:%d (nodeid %d) already exists or is a replica for a different primary",
								   replicaHostname, replicaPort, existingReplicaNode->nodeId)));
		}
	}

	/* verify the candidate really is a streaming replica of this primary */
	EnsureValidStreamingReplica(primaryWorkerNode, replicaHostname, replicaPort);

	NodeMetadata nodeMetadata = DefaultNodeMetadata();
	nodeMetadata.nodeisreplica = true;
	nodeMetadata.nodeprimarynodeid = primaryWorkerNode->nodeId;
	nodeMetadata.isActive = false; /* Replicas start as inactive */
	nodeMetadata.shouldHaveShards = false; /* Replicas do not directly own primary shards */
	nodeMetadata.groupId = INVALID_GROUP_ID; /* Replicas get a new group ID and do not belong to any existing group */
	nodeMetadata.nodeRole = UnavailableNodeRoleId(); /* The node role is set to 'unavailable' */
	nodeMetadata.nodeCluster = primaryWorkerNode->nodeCluster; /* Same cluster as primary */

	/* Other fields like hasMetadata, metadataSynced take their values from
	 * DefaultNodeMetadata(). Since this replica is registered inactive, its
	 * metadata sync status is less critical at registration time; activation
	 * (a future promote/activate UDF) is responsible for syncing.
	 */

	bool nodeAlreadyExists = false;
	bool localOnly = false; /* Propagate change to other workers with metadata */

	/*
	 * AddNodeMetadata will take an ExclusiveLock on pg_dist_node.
	 * It also checks again if the node already exists after acquiring the lock.
	 */
	int replicaNodeId = AddNodeMetadata(replicaHostname, replicaPort, &nodeMetadata,
										&nodeAlreadyExists, localOnly);

	if (nodeAlreadyExists)
	{
		/* The unlocked FindWorkerNodeAnyCluster check above can race with a
		 * concurrent insert; AddNodeMetadata repeats the existence check under
		 * the lock and returns the existing node's id. Ensure that existing
		 * node really is OUR replica, otherwise error out.
		 */
		WorkerNode *fetchedExistingNode = FindNodeAnyClusterByNodeId(replicaNodeId);
		if (fetchedExistingNode != NULL && fetchedExistingNode->nodeisreplica &&
			fetchedExistingNode->nodeprimarynodeid == primaryWorkerNode->nodeId)
		{
			ereport(NOTICE, (errmsg("node %s:%d was already correctly registered as a replica for primary %s:%d (nodeid %d)",
									replicaHostname, replicaPort,
									primaryWorkerNode->workerName, primaryWorkerNode->workerPort, primaryWorkerNode->nodeId)));

			/* Intentional fall-through to return replicaNodeId */
		}
		else
		{
			/* This state is less expected if our initial check passed or errored. */
			ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR),
							errmsg("node %s:%d already exists but is not correctly configured as a replica for primary %s:%d",
								   replicaHostname, replicaPort, primaryWorkerNode->workerName, primaryWorkerNode->workerPort)));
		}
	}

	TransactionModifiedNodeMetadata = true;

	/*
	 * Note: Replicas added this way are inactive.
	 * A separate mechanism or UDF (e.g. citus_promote_replica_and_rebalance)
	 * is needed to activate them, potentially after verifying replication lag:
	 * 1. Setting isActive = true.
	 * 2. Ensuring metadata is synced (hasMetadata=true, metadatasynced=true).
	 * 3. Potentially other logic like adding to specific scheduler lists.
	 * For now, citus_add_replica_node just registers it.
	 */
	return replicaNodeId;
}
/*
 * citus_remove_replica_node removes an inactive streaming replica node,
 * identified by hostname and port, from the Citus metadata.
 */
Datum
citus_remove_replica_node(PG_FUNCTION_ARGS)
{
	CheckCitusVersion(ERROR);
	EnsureSuperUser();
	EnsureCoordinator();

	char *nodeName = text_to_cstring(PG_GETARG_TEXT_P(0));
	int32 nodePort = PG_GETARG_INT32(1);

	/* search all clusters so replicas registered anywhere are found */
	WorkerNode *workerNode = FindWorkerNodeAnyCluster(nodeName, nodePort);
	if (workerNode == NULL)
	{
		ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
						errmsg("node \"%s:%d\" does not exist", nodeName, nodePort)));
	}

	/* replica-specific validation and the actual removal happen here */
	RemoveReplicaNode(workerNode);

	PG_RETURN_VOID();
}
/*
* citus_remove_replica_node_with_nodeid removes an inactive streaming replica node from Citus metadata
* using the node's ID.
*/
Datum
citus_remove_replica_node_with_nodeid(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);
EnsureSuperUser();
EnsureCoordinator();
uint32 replicaNodeId = PG_GETARG_INT32(0);
WorkerNode *replicaNode = FindNodeAnyClusterByNodeId(replicaNodeId);
if (replicaNode == NULL)
{
ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("Replica node with ID %d does not exist", replicaNodeId)));
}
RemoveReplicaNode(replicaNode);
PG_RETURN_VOID();
}
/*
 * RemoveReplicaNode removes a node that is registered as an unpromoted,
 * inactive streaming replica from the cluster metadata.
 *
 * It errors out when the node is not flagged as a replica, or when it is
 * marked active (which would indicate a promoted replica); in both cases
 * citus_remove_node() is the appropriate tool instead.
 */
static void
RemoveReplicaNode(WorkerNode *replicaNode)
{
	Assert(replicaNode != NULL);

	const char *nodeName = replicaNode->workerName;
	int nodePort = replicaNode->workerPort;
	int nodeId = replicaNode->nodeId;

	/* only nodes explicitly flagged as replicas may be removed here */
	if (!replicaNode->nodeisreplica)
	{
		ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
						errmsg("Node %s:%d (ID %d) is not a replica node. "
							   "Use citus_remove_node() to remove primary or already promoted nodes.",
							   nodeName, nodePort, nodeId)));
	}

	/* an active replica is most likely one that has been promoted */
	if (replicaNode->isActive)
	{
		ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
						errmsg("Replica node %s:%d (ID %d) is marked as active and cannot be removed with this function. "
							   "This might indicate a promoted replica. Consider using citus_remove_node() if you are sure, "
							   "or ensure it's properly deactivated if it's an unpromoted replica in an unexpected state.",
							   nodeName, nodePort, nodeId)));
	}

	/*
	 * All checks passed; RemoveNodeFromCluster handles locking, catalog
	 * changes, connection closing, and metadata sync.
	 */
	ereport(NOTICE, (errmsg("Removing inactive replica node %s:%d (ID %d)",
							nodeName, nodePort, nodeId)));

	RemoveNodeFromCluster(replicaNode->workerName, replicaNode->workerPort);

	/* RemoveNodeFromCluster might set this, but setting it here ensures it's marked for this UDF's transaction. */
	TransactionModifiedNodeMetadata = true;
}
/* /*
* SetLockTimeoutLocally sets the lock_timeout to the given value. * SetLockTimeoutLocally sets the lock_timeout to the given value.
@ -1871,7 +2184,7 @@ FindWorkerNodeAnyCluster(const char *nodeName, int32 nodePort)
* FindNodeAnyClusterByNodeId searches pg_dist_node and returns the node with * FindNodeAnyClusterByNodeId searches pg_dist_node and returns the node with
* the nodeId. If the node can't be found returns NULL. * the nodeId. If the node can't be found returns NULL.
*/ */
static WorkerNode * WorkerNode *
FindNodeAnyClusterByNodeId(uint32 nodeId) FindNodeAnyClusterByNodeId(uint32 nodeId)
{ {
bool includeNodesFromOtherClusters = true; bool includeNodesFromOtherClusters = true;
@ -2924,6 +3237,10 @@ InsertNodeRow(int nodeid, char *nodeName, int32 nodePort, NodeMetadata *nodeMeta
values[Anum_pg_dist_node_nodecluster - 1] = nodeClusterNameDatum; values[Anum_pg_dist_node_nodecluster - 1] = nodeClusterNameDatum;
values[Anum_pg_dist_node_shouldhaveshards - 1] = BoolGetDatum( values[Anum_pg_dist_node_shouldhaveshards - 1] = BoolGetDatum(
nodeMetadata->shouldHaveShards); nodeMetadata->shouldHaveShards);
values[Anum_pg_dist_node_nodeisreplica - 1] = BoolGetDatum(
nodeMetadata->nodeisreplica);
values[Anum_pg_dist_node_nodeprimarynodeid - 1] = Int32GetDatum(
nodeMetadata->nodeprimarynodeid);
Relation pgDistNode = table_open(DistNodeRelationId(), RowExclusiveLock); Relation pgDistNode = table_open(DistNodeRelationId(), RowExclusiveLock);
@ -3014,8 +3331,7 @@ TupleToWorkerNode(TupleDesc tupleDescriptor, HeapTuple heapTuple)
/* /*
* This function can be called before "ALTER TABLE ... ADD COLUMN nodecluster ...", * This function can be called before "ALTER TABLE ... ADD COLUMN nodecluster ...",
* therefore heap_deform_tuple() won't set the isNullArray for this column. We * and other columns. We initialize isNullArray to true to be safe.
* initialize it true to be safe in that case.
*/ */
memset(isNullArray, true, sizeof(isNullArray)); memset(isNullArray, true, sizeof(isNullArray));
@ -3043,6 +3359,25 @@ TupleToWorkerNode(TupleDesc tupleDescriptor, HeapTuple heapTuple)
datumArray[Anum_pg_dist_node_shouldhaveshards - datumArray[Anum_pg_dist_node_shouldhaveshards -
1]); 1]);
/* nodeisreplica and nodeprimarynodeid might be null if row is from older version */
if (!isNullArray[Anum_pg_dist_node_nodeisreplica - 1])
{
workerNode->nodeisreplica = DatumGetBool(datumArray[Anum_pg_dist_node_nodeisreplica - 1]);
}
else
{
workerNode->nodeisreplica = false; /* Default value */
}
if (!isNullArray[Anum_pg_dist_node_nodeprimarynodeid - 1])
{
workerNode->nodeprimarynodeid = DatumGetInt32(datumArray[Anum_pg_dist_node_nodeprimarynodeid - 1]);
}
else
{
workerNode->nodeprimarynodeid = 0; /* Default value for InvalidNodeId */
}
/* /*
* nodecluster column can be missing. In the case of extension creation/upgrade, * nodecluster column can be missing. In the case of extension creation/upgrade,
* master_initialize_node_metadata function is called before the nodecluster * master_initialize_node_metadata function is called before the nodecluster
@ -3285,3 +3620,173 @@ SyncNodeMetadata(MetadataSyncContext *context)
*/ */
SendOrCollectCommandListToActivatedNodes(context, recreateNodeSnapshotCommandList); SendOrCollectCommandListToActivatedNodes(context, recreateNodeSnapshotCommandList);
} }
/*
 * EnsureValidStreamingReplica checks if the given replica is a valid streaming
 * replica. It connects to the replica and checks if it is in recovery mode.
 * If it is not, it errors out.
 * It also checks the system identifier of the replica and the primary
 * to ensure they match, proving both nodes originate from the same base
 * backup and thus form a physical replication pair.
 */
static void
EnsureValidStreamingReplica(WorkerNode *primaryWorkerNode, char *replicaHostname,
							int replicaPort)
{
	int connectionFlag = FORCE_NEW_CONNECTION;
	MultiConnection *replicaConnection = GetNodeConnection(connectionFlag, replicaHostname,
														   replicaPort);

	/* Step 1: the candidate must report that it is in recovery */
	const char *replica_recovery_query = "SELECT pg_is_in_recovery()";
	int resultCode = SendRemoteCommand(replicaConnection, replica_recovery_query);
	if (resultCode == 0)
	{
		CloseConnection(replicaConnection);
		ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE),
						errmsg("could not connect to %s:%d to execute pg_is_in_recovery()",
							   replicaHostname, replicaPort)));
	}

	PGresult *result = GetRemoteCommandResult(replicaConnection, true);
	if (result == NULL)
	{
		CloseConnection(replicaConnection);
		ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
						errmsg("node %s:%d does not support pg_is_in_recovery()",
							   replicaHostname, replicaPort)));
	}

	List *sizeList = ReadFirstColumnAsText(result);
	if (list_length(sizeList) != 1)
	{
		PQclear(result);
		ClearResults(replicaConnection, true);
		CloseConnection(replicaConnection);
		ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE),
						errmsg("cannot parse pg_is_in_recovery() result from %s:%d",
							   replicaHostname,
							   replicaPort)));
	}

	StringInfo isInRecoveryQueryResInfo = (StringInfo) linitial(sizeList);
	char *isInRecoveryQueryResStr = isInRecoveryQueryResInfo->data;

	/* libpq renders booleans as "t"/"f"; accept "true" as well to be safe */
	if (strcmp(isInRecoveryQueryResStr, "t") != 0 && strcmp(isInRecoveryQueryResStr, "true") != 0)
	{
		PQclear(result);
		ClearResults(replicaConnection, true);
		CloseConnection(replicaConnection);
		ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE),
						errmsg("%s:%d is not a streaming replica",
							   replicaHostname,
							   replicaPort)));
	}

	PQclear(result);
	ForgetResults(replicaConnection);

	/* Step 2: Get the system identifier from replica */
	const char *sysidQuery =
		"SELECT system_identifier FROM pg_control_system()";

	resultCode = SendRemoteCommand(replicaConnection, sysidQuery);
	if (resultCode == 0)
	{
		CloseConnection(replicaConnection);
		ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE),
						errmsg("could not connect to %s:%d to get system identifier",
							   replicaHostname, replicaPort)));
	}

	result = GetRemoteCommandResult(replicaConnection, true);
	if (!IsResponseOK(result))
	{
		ReportResultError(replicaConnection, result, ERROR);
	}

	List *sysidList = ReadFirstColumnAsText(result);
	if (list_length(sysidList) != 1)
	{
		PQclear(result);
		ClearResults(replicaConnection, true);
		CloseConnection(replicaConnection);
		ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE),
						errmsg("cannot parse get system identifier result from %s:%d",
							   replicaHostname,
							   replicaPort)));
	}

	StringInfo sysidQueryResInfo = (StringInfo) linitial(sysidList);
	char *sysidQueryResStr = sysidQueryResInfo->data;

	ereport(DEBUG2, (errmsg("system identifier of %s:%d is %s",
							replicaHostname, replicaPort, sysidQueryResStr)));

	/* We do not need the replica connection anymore */
	PQclear(result);
	ForgetResults(replicaConnection);
	CloseConnection(replicaConnection);

	/* Step 3: Get system identifier from primary */
	int primaryConnectionFlag = 0;
	MultiConnection *primaryConnection = GetNodeConnection(primaryConnectionFlag,
														   primaryWorkerNode->workerName,
														   primaryWorkerNode->workerPort);

	int primaryResultCode = SendRemoteCommand(primaryConnection, sysidQuery);
	if (primaryResultCode == 0)
	{
		CloseConnection(primaryConnection);
		ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE),
						errmsg("could not connect to %s:%d to get system identifier",
							   primaryWorkerNode->workerName, primaryWorkerNode->workerPort)));
	}

	PGresult *primaryResult = GetRemoteCommandResult(primaryConnection, true);
	if (primaryResult == NULL)
	{
		CloseConnection(primaryConnection);
		ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
						errmsg("node %s:%d does not support pg_control_system queries",
							   primaryWorkerNode->workerName, primaryWorkerNode->workerPort)));
	}

	List *primarySizeList = ReadFirstColumnAsText(primaryResult);
	if (list_length(primarySizeList) != 1)
	{
		PQclear(primaryResult);
		ClearResults(primaryConnection, true);
		CloseConnection(primaryConnection);
		ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE),
						errmsg("cannot parse get system identifier result from %s:%d",
							   primaryWorkerNode->workerName,
							   primaryWorkerNode->workerPort)));
	}

	StringInfo primarySysidQueryResInfo = (StringInfo) linitial(primarySizeList);
	char *primarySysidQueryResStr = primarySysidQueryResInfo->data;

	ereport(DEBUG2, (errmsg("system identifier of %s:%d is %s",
							primaryWorkerNode->workerName, primaryWorkerNode->workerPort,
							primarySysidQueryResStr)));

	/* verify both identifiers */
	if (strcmp(sysidQueryResStr, primarySysidQueryResStr) != 0)
	{
		/*
		 * Bug fix: this error path previously skipped the result/connection
		 * cleanup that every other error path in this function performs.
		 * Clean up before erroring out, consistent with the rest.
		 */
		PQclear(primaryResult);
		ClearResults(primaryConnection, true);
		CloseConnection(primaryConnection);
		ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE),
						errmsg("system identifiers do not match: %s (replica) vs %s (primary)",
							   sysidQueryResStr, primarySysidQueryResStr)));
	}

	PQclear(primaryResult);
	ClearResults(primaryConnection, true);
	CloseConnection(primaryConnection);
}

View File

@ -0,0 +1,380 @@
#include "postgres.h"
#include "utils/fmgrprotos.h"
#include "utils/pg_lsn.h"
#include "distributed/argutils.h"
#include "distributed/remote_commands.h"
#include "distributed/metadata_cache.h"
#include "distributed/metadata_sync.h"
#include "distributed/shard_rebalancer.h"
static int64 GetReplicationLag(WorkerNode *primaryWorkerNode, WorkerNode *replicaWorkerNode);
static void BlockAllWritesToWorkerNode(WorkerNode *workerNode);
static bool GetNodeIsInRecoveryStatus(WorkerNode *workerNode);
static void PromoteReplicaNode(WorkerNode *replicaWorkerNode);
PG_FUNCTION_INFO_V1(citus_promote_replica_and_rebalance);
Datum
citus_promote_replica_and_rebalance(PG_FUNCTION_ARGS)
{
// Ensure superuser and coordinator
EnsureSuperUser();
EnsureCoordinator();
// Get replica_nodeid argument
int32 replicaNodeIdArg = PG_GETARG_INT32(0);
WorkerNode *replicaNode = NULL;
WorkerNode *primaryNode = NULL;
// Lock pg_dist_node to prevent concurrent modifications during this operation
LockRelationOid(DistNodeRelationId(), RowExclusiveLock);
replicaNode = FindNodeAnyClusterByNodeId(replicaNodeIdArg);
if (replicaNode == NULL)
{
ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("Replica node with ID %d not found.", replicaNodeIdArg)));
}
if (!replicaNode->nodeisreplica || replicaNode->nodeprimarynodeid == 0)
{
ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("Node %s:%d (ID %d) is not a valid replica or its primary node ID is not set.",
replicaNode->workerName, replicaNode->workerPort, replicaNode->nodeId)));
}
if (replicaNode->isActive)
{
ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("Replica node %s:%d (ID %d) is already active and cannot be promoted.",
replicaNode->workerName, replicaNode->workerPort, replicaNode->nodeId)));
}
primaryNode = FindNodeAnyClusterByNodeId(replicaNode->nodeprimarynodeid);
if (primaryNode == NULL)
{
ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("Primary node with ID %d (for replica %s:%d) not found.",
replicaNode->nodeprimarynodeid, replicaNode->workerName, replicaNode->workerPort)));
}
if (primaryNode->nodeisreplica)
{
ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("Primary node %s:%d (ID %d) is itself a replica.",
primaryNode->workerName, primaryNode->workerPort, primaryNode->nodeId)));
}
if (!primaryNode->isActive)
{
ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("Primary node %s:%d (ID %d) is not active.",
primaryNode->workerName, primaryNode->workerPort, primaryNode->nodeId)));
}
/* Ensure the primary node is related to the replica node */
if (primaryNode->nodeId != replicaNode->nodeprimarynodeid)
{
ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("Replica node %s:%d (ID %d) is not replica of the primary node %s:%d (ID %d).",
replicaNode->workerName, replicaNode->workerPort, replicaNode->nodeId,
primaryNode->workerName, primaryNode->workerPort, primaryNode->nodeId)));
}
ereport(NOTICE, (errmsg("Starting promotion process for replica node %s:%d (ID %d), original primary %s:%d (ID %d)",
replicaNode->workerName, replicaNode->workerPort, replicaNode->nodeId,
primaryNode->workerName, primaryNode->workerPort, primaryNode->nodeId)));
/* Step 1: Block Writes on Original Primary's Shards */
ereport(NOTICE, (errmsg("Blocking writes on shards of original primary node %s:%d (group %d)",
primaryNode->workerName, primaryNode->workerPort, primaryNode->groupId)));
BlockAllWritesToWorkerNode(primaryNode);
/* Step 2: Wait for Replica to Catch Up */
ereport(NOTICE, (errmsg("Waiting for replica %s:%d to catch up with primary %s:%d",
replicaNode->workerName, replicaNode->workerPort,
primaryNode->workerName, primaryNode->workerPort)));
bool caughtUp = false;
const int catchUpTimeoutSeconds = 300; // 5 minutes, TODO: Make GUC
const int sleepIntervalSeconds = 5;
int elapsedTimeSeconds = 0;
while (elapsedTimeSeconds < catchUpTimeoutSeconds)
{
uint64 repLag = GetReplicationLag(primaryNode, replicaNode);
if (repLag <= 0)
{
caughtUp = true;
break;
}
pg_usleep(sleepIntervalSeconds * 1000000L);
elapsedTimeSeconds += sleepIntervalSeconds;
}
if (!caughtUp)
{
ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("Replica %s:%d failed to catch up with primary %s:%d within %d seconds.",
replicaNode->workerName, replicaNode->workerPort,
primaryNode->workerName, primaryNode->workerPort,
catchUpTimeoutSeconds)));
}
ereport(NOTICE, (errmsg("Replica %s:%d is now caught up with primary %s:%d.",
replicaNode->workerName, replicaNode->workerPort,
primaryNode->workerName, primaryNode->workerPort)));
/* Step 3: PostgreSQL Replica Promotion */
ereport(NOTICE, (errmsg("Attempting to promote replica %s:%d via pg_promote().",
replicaNode->workerName, replicaNode->workerPort)));
PromoteReplicaNode(replicaNode);
/* Step 4: Update Replica Metadata in pg_dist_node on Coordinator */
ereport(NOTICE, (errmsg("Updating metadata for promoted replica %s:%d (ID %d)",
replicaNode->workerName, replicaNode->workerPort, replicaNode->nodeId)));
ActivateReplicaNodeAsPrimary(replicaNode);
/* We need to sync metadata changes to all nodes before rebalancing shards
* since the rebalancing algorithm depends on the latest metadata.
*/
SyncNodeMetadataToNodes();
/* Step 5: Split Shards Between Primary and Replica */
SplitShardsBetweenPrimaryAndReplica(primaryNode, replicaNode, PG_GETARG_NAME_OR_NULL(1));
TransactionModifiedNodeMetadata = true; // Inform Citus about metadata change
TriggerNodeMetadataSyncOnCommit(); // Ensure changes are propagated
ereport(NOTICE, (errmsg("Replica node %s:%d (ID %d) metadata updated. It is now a primary",
replicaNode->workerName, replicaNode->workerPort, replicaNode->nodeId)));
/* TODO: Step 6: Unblock Writes (should be handled by transaction commit) */
ereport(NOTICE, (errmsg("TODO: Step 6: Unblock Writes")));
PG_RETURN_VOID();
}
/*
 * GetReplicationLag calculates the replication lag between the primary and
 * replica nodes by comparing the primary's current WAL LSN with the
 * replica's last replayed LSN.
 *
 * Returns the lag in bytes, or -1 if either LSN string could not be read.
 * Raises an ERROR if either node is unreachable or a remote query fails.
 */
static int64
GetReplicationLag(WorkerNode *primaryWorkerNode, WorkerNode *replicaWorkerNode)
{
#if PG_VERSION_NUM >= 100000
	const char *primary_lsn_query = "SELECT pg_current_wal_lsn()";
	const char *replica_lsn_query = "SELECT pg_last_wal_replay_lsn()";
#else
	const char *primary_lsn_query = "SELECT pg_current_xlog_location()";
	const char *replica_lsn_query = "SELECT pg_last_xlog_replay_location()";
#endif

	int connectionFlag = 0;
	MultiConnection *primaryConnection = GetNodeConnection(connectionFlag,
														   primaryWorkerNode->workerName,
														   primaryWorkerNode->workerPort);
	if (PQstatus(primaryConnection->pgConn) != CONNECTION_OK)
	{
		ereport(ERROR, (errmsg("cannot connect to %s:%d to fetch replication status",
							   primaryWorkerNode->workerName,
							   primaryWorkerNode->workerPort)));
	}

	MultiConnection *replicaConnection = GetNodeConnection(connectionFlag,
														   replicaWorkerNode->workerName,
														   replicaWorkerNode->workerPort);
	if (PQstatus(replicaConnection->pgConn) != CONNECTION_OK)
	{
		ereport(ERROR, (errmsg("cannot connect to %s:%d to fetch replication status",
							   replicaWorkerNode->workerName,
							   replicaWorkerNode->workerPort)));
	}

	int primaryResultCode = SendRemoteCommand(primaryConnection, primary_lsn_query);
	if (primaryResultCode == 0)
	{
		ReportConnectionError(primaryConnection, ERROR);
	}

	PGresult *primaryResult = GetRemoteCommandResult(primaryConnection, true);
	if (!IsResponseOK(primaryResult))
	{
		ReportResultError(primaryConnection, primaryResult, ERROR);
	}

	int replicaResultCode = SendRemoteCommand(replicaConnection, replica_lsn_query);
	if (replicaResultCode == 0)
	{
		ReportConnectionError(replicaConnection, ERROR);
	}

	PGresult *replicaResult = GetRemoteCommandResult(replicaConnection, true);
	if (!IsResponseOK(replicaResult))
	{
		ReportResultError(replicaConnection, replicaResult, ERROR);
	}

	List *primaryLsnList = ReadFirstColumnAsText(primaryResult);
	if (list_length(primaryLsnList) != 1)
	{
		PQclear(primaryResult);
		ClearResults(primaryConnection, true);
		CloseConnection(primaryConnection);
		ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE),
						errmsg("cannot parse get primary LSN result from %s:%d",
							   primaryWorkerNode->workerName,
							   primaryWorkerNode->workerPort)));
	}

	StringInfo primaryLsnQueryResInfo = (StringInfo) linitial(primaryLsnList);
	char *primary_lsn_str = primaryLsnQueryResInfo->data;

	List *replicaLsnList = ReadFirstColumnAsText(replicaResult);
	if (list_length(replicaLsnList) != 1)
	{
		PQclear(replicaResult);
		ClearResults(replicaConnection, true);
		CloseConnection(replicaConnection);
		ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE),
						errmsg("cannot parse get replica LSN result from %s:%d",
							   replicaWorkerNode->workerName,
							   replicaWorkerNode->workerPort)));
	}

	StringInfo replicaLsnQueryResInfo = (StringInfo) linitial(replicaLsnList);
	char *replica_lsn_str = replicaLsnQueryResInfo->data;

	if (!primary_lsn_str || !replica_lsn_str)
	{
		/*
		 * Release results and connections before bailing out; the original
		 * code returned here without any cleanup, leaking both connections
		 * and their PGresults.
		 */
		PQclear(primaryResult);
		ForgetResults(primaryConnection);
		CloseConnection(primaryConnection);
		PQclear(replicaResult);
		ForgetResults(replicaConnection);
		CloseConnection(replicaConnection);
		return -1;
	}

	int64 primary_lsn = DatumGetLSN(DirectFunctionCall1(pg_lsn_in,
														CStringGetDatum(primary_lsn_str)));
	int64 replica_lsn = DatumGetLSN(DirectFunctionCall1(pg_lsn_in,
														CStringGetDatum(replica_lsn_str)));
	int64 lag_bytes = primary_lsn - replica_lsn;

	PQclear(primaryResult);
	ForgetResults(primaryConnection);
	CloseConnection(primaryConnection);
	PQclear(replicaResult);
	ForgetResults(replicaConnection);
	CloseConnection(replicaConnection);

	/* use INT64_FORMAT instead of %ld: long is 32-bit on some platforms */
	ereport(NOTICE, (errmsg("replication lag between %s:%d and %s:%d is "
							INT64_FORMAT " bytes",
							primaryWorkerNode->workerName,
							primaryWorkerNode->workerPort,
							replicaWorkerNode->workerName,
							replicaWorkerNode->workerPort,
							lag_bytes)));

	return lag_bytes;
}
/*
 * PromoteReplicaNode runs pg_promote() on the given replica node and then
 * verifies that the node has actually left recovery.
 *
 * Raises an ERROR if the connection fails, the remote command fails, or the
 * node is still in recovery after pg_promote() returns.
 */
static void
PromoteReplicaNode(WorkerNode *replicaWorkerNode)
{
	int connectionFlag = 0;
	MultiConnection *replicaConnection = GetNodeConnection(connectionFlag,
														   replicaWorkerNode->workerName,
														   replicaWorkerNode->workerPort);
	if (PQstatus(replicaConnection->pgConn) != CONNECTION_OK)
	{
		ereport(ERROR, (errmsg("cannot connect to %s:%d to promote replica",
							   replicaWorkerNode->workerName,
							   replicaWorkerNode->workerPort)));
	}

	/* wait := true makes pg_promote() block until the promotion completes */
	const char *promoteQuery = "SELECT pg_promote(wait := true);";
	int resultCode = SendRemoteCommand(replicaConnection, promoteQuery);
	if (resultCode == 0)
	{
		ReportConnectionError(replicaConnection, ERROR);
	}

	/*
	 * Consume and check the command result instead of discarding it with
	 * ForgetResults() alone, so a remote error from pg_promote() surfaces
	 * here (consistent with GetNodeIsInRecoveryStatus below).
	 */
	PGresult *promoteResult = GetRemoteCommandResult(replicaConnection, true);
	if (!IsResponseOK(promoteResult))
	{
		ReportResultError(replicaConnection, promoteResult, ERROR);
	}
	PQclear(promoteResult);
	ForgetResults(replicaConnection);
	CloseConnection(replicaConnection);

	/* connect again and verify the replica is promoted */
	if (GetNodeIsInRecoveryStatus(replicaWorkerNode))
	{
		ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
						errmsg("Failed to promote replica %s:%d (ID %d). It is still in recovery.",
							   replicaWorkerNode->workerName,
							   replicaWorkerNode->workerPort,
							   replicaWorkerNode->nodeId)));
	}

	ereport(NOTICE, (errmsg("Replica node %s:%d (ID %d) has been successfully promoted.",
							replicaWorkerNode->workerName,
							replicaWorkerNode->workerPort,
							replicaWorkerNode->nodeId)));
}
/*
 * BlockAllWritesToWorkerNode blocks writes to all shards placed on the given
 * worker node by taking AccessExclusiveLock on them; the locks are held until
 * the current transaction ends.
 */
static void
BlockAllWritesToWorkerNode(WorkerNode *workerNode)
{
	ereport(NOTICE, (errmsg("Blocking all writes to worker node %s:%d (ID %d)",
							workerNode->workerName, workerNode->workerPort,
							workerNode->nodeId)));

	LockShardsInWorkerPlacementList(workerNode, AccessExclusiveLock);
}
/*
 * GetNodeIsInRecoveryStatus connects to the given worker node and runs
 * pg_is_in_recovery() on it.
 *
 * Returns true when the remote node reports that it is in recovery (i.e. it
 * is acting as a streaming replica), false otherwise. Raises an ERROR when
 * the node is unreachable or the result cannot be parsed.
 */
bool
GetNodeIsInRecoveryStatus(WorkerNode *workerNode)
{
	const char *recoveryQuery = "SELECT pg_is_in_recovery();";
	int connectionFlag = 0;

	MultiConnection *connection = GetNodeConnection(connectionFlag,
													workerNode->workerName,
													workerNode->workerPort);
	if (PQstatus(connection->pgConn) != CONNECTION_OK)
	{
		ereport(ERROR, (errmsg("cannot connect to %s:%d to check recovery status",
							   workerNode->workerName, workerNode->workerPort)));
	}

	if (SendRemoteCommand(connection, recoveryQuery) == 0)
	{
		ReportConnectionError(connection, ERROR);
	}

	PGresult *queryResult = GetRemoteCommandResult(connection, true);
	if (!IsResponseOK(queryResult))
	{
		ReportResultError(connection, queryResult, ERROR);
	}

	List *statusList = ReadFirstColumnAsText(queryResult);
	if (list_length(statusList) != 1)
	{
		PQclear(queryResult);
		ClearResults(connection, true);
		CloseConnection(connection);
		ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE),
						errmsg("cannot parse recovery status result from %s:%d",
							   workerNode->workerName,
							   workerNode->workerPort)));
	}

	/* the text form of the remote boolean is "t"/"true" when in recovery */
	StringInfo statusText = (StringInfo) linitial(statusList);
	bool inRecovery = (strcmp(statusText->data, "t") == 0 ||
					   strcmp(statusText->data, "true") == 0);

	PQclear(queryResult);
	ForgetResults(connection);
	CloseConnection(connection);

	return inRecovery;
}

View File

@ -81,9 +81,27 @@ typedef struct RebalanceOptions
Form_pg_dist_rebalance_strategy rebalanceStrategy; Form_pg_dist_rebalance_strategy rebalanceStrategy;
const char *operationName; const char *operationName;
WorkerNode *workerNode; WorkerNode *workerNode;
List *involvedWorkerNodeList;
} RebalanceOptions; } RebalanceOptions;
/*
 * SplitPrimaryReplicaShards captures the outcome of planning a snapshot-based
 * node split: which shards stay on the primary and which are served by the
 * (promoted) replica. Both lists hold shard ids.
 */
typedef struct SplitPrimaryReplicaShards
{
	/*
	 * primaryShardIdList contains the ids of the shards that
	 * should stay on the primary worker node.
	 */
	List *primaryShardIdList;

	/*
	 * replicaShardIdList contains the ids of the shards that should stay on
	 * the replica worker node.
	 */
	List *replicaShardIdList;
} SplitPrimaryReplicaShards;

/* plans which shards stay on the primary and which move to the replica */
static SplitPrimaryReplicaShards *
GetPrimaryReplicaSplitRebalanceSteps(RebalanceOptions *options, WorkerNode* replicaNode);
/* /*
* RebalanceState is used to keep the internal state of the rebalance * RebalanceState is used to keep the internal state of the rebalance
* algorithm in one place. * algorithm in one place.
@ -318,6 +336,7 @@ PG_FUNCTION_INFO_V1(pg_dist_rebalance_strategy_enterprise_check);
PG_FUNCTION_INFO_V1(citus_rebalance_start); PG_FUNCTION_INFO_V1(citus_rebalance_start);
PG_FUNCTION_INFO_V1(citus_rebalance_stop); PG_FUNCTION_INFO_V1(citus_rebalance_stop);
PG_FUNCTION_INFO_V1(citus_rebalance_wait); PG_FUNCTION_INFO_V1(citus_rebalance_wait);
PG_FUNCTION_INFO_V1(get_snapshot_based_node_split_plan);
bool RunningUnderCitusTestSuite = false; bool RunningUnderCitusTestSuite = false;
int MaxRebalancerLoggedIgnoredMoves = 5; int MaxRebalancerLoggedIgnoredMoves = 5;
@ -517,8 +536,16 @@ GetRebalanceSteps(RebalanceOptions *options)
.context = &context, .context = &context,
}; };
if (options->involvedWorkerNodeList == NULL)
{
/*
* If the user did not specify a list of worker nodes, we use all the
* active worker nodes.
*/
options->involvedWorkerNodeList = SortedActiveWorkers();
}
/* sort the lists to make the function more deterministic */ /* sort the lists to make the function more deterministic */
List *activeWorkerList = SortedActiveWorkers(); List *activeWorkerList = options->involvedWorkerNodeList; //SortedActiveWorkers();
int shardAllowedNodeCount = 0; int shardAllowedNodeCount = 0;
WorkerNode *workerNode = NULL; WorkerNode *workerNode = NULL;
foreach_declared_ptr(workerNode, activeWorkerList) foreach_declared_ptr(workerNode, activeWorkerList)
@ -981,6 +1008,7 @@ rebalance_table_shards(PG_FUNCTION_ARGS)
.excludedShardArray = PG_GETARG_ARRAYTYPE_P(3), .excludedShardArray = PG_GETARG_ARRAYTYPE_P(3),
.drainOnly = PG_GETARG_BOOL(5), .drainOnly = PG_GETARG_BOOL(5),
.rebalanceStrategy = strategy, .rebalanceStrategy = strategy,
.involvedWorkerNodeList = NULL,
.improvementThreshold = strategy->improvementThreshold, .improvementThreshold = strategy->improvementThreshold,
}; };
Oid shardTransferModeOid = PG_GETARG_OID(4); Oid shardTransferModeOid = PG_GETARG_OID(4);
@ -3546,6 +3574,342 @@ EnsureShardCostUDF(Oid functionOid)
ReleaseSysCache(proctup); ReleaseSysCache(proctup);
} }
/*
 * SplitShardsBetweenPrimaryAndReplica plans a split of the shards currently
 * on the primary node between the primary and its promoted replica, then
 * applies the plan by adjusting shard placement metadata accordingly.
 */
void
SplitShardsBetweenPrimaryAndReplica(WorkerNode *primaryNode,
									WorkerNode *replicaNode,
									Name strategyName)
{
	CheckCitusVersion(ERROR);

	List *relationIdList = NonColocatedDistRelationIdList();

	/* a NULL strategyName falls back to the default rebalance strategy */
	Form_pg_dist_rebalance_strategy strategy = GetRebalanceStrategy(strategyName);

	RebalanceOptions options = {
		.relationIdList = relationIdList,
		.threshold = 0,                 /* threshold is not strictly needed for two nodes */
		.maxShardMoves = -1,            /* no limit on moves between these two nodes */
		.excludedShardArray = construct_empty_array(INT8OID),
		.drainOnly = false,             /* not a drain operation */
		.rebalanceStrategy = strategy,
		.improvementThreshold = 0,      /* consider all beneficial moves */
		.workerNode = primaryNode       /* the primary node acts as the source */
	};

	SplitPrimaryReplicaShards *splitShards =
		GetPrimaryReplicaSplitRebalanceSteps(&options, replicaNode);

	AdjustShardsForPrimaryReplicaNodeSplit(primaryNode, replicaNode,
										   splitShards->primaryShardIdList,
										   splitShards->replicaShardIdList);
}
/*
 * GetPrimaryReplicaSplitRebalanceSteps plans how the shards currently placed
 * on the source (primary) node in 'options->workerNode' should be divided
 * between that node and 'replicaNode'. It greedily assigns shards so the two
 * nodes end up with roughly equal total shard cost.
 *
 * Returns a palloc0'd SplitPrimaryReplicaShards; both lists are NIL when the
 * source node holds no relevant placements.
 */
static SplitPrimaryReplicaShards *
GetPrimaryReplicaSplitRebalanceSteps(RebalanceOptions *options, WorkerNode* replicaNode)
{
	WorkerNode *sourceNode = options->workerNode;
	WorkerNode *targetNode = replicaNode;

	/* Initialize rebalance plan functions and context */
	EnsureShardCostUDF(options->rebalanceStrategy->shardCostFunction);
	EnsureNodeCapacityUDF(options->rebalanceStrategy->nodeCapacityFunction);
	EnsureShardAllowedOnNodeUDF(options->rebalanceStrategy->shardAllowedOnNodeFunction);
	RebalanceContext context;
	memset(&context, 0, sizeof(RebalanceContext));
	fmgr_info(options->rebalanceStrategy->shardCostFunction, &context.shardCostUDF);
	fmgr_info(options->rebalanceStrategy->nodeCapacityFunction, &context.nodeCapacityUDF);
	fmgr_info(options->rebalanceStrategy->shardAllowedOnNodeFunction,
			  &context.shardAllowedOnNodeUDF);

	RebalancePlanFunctions rebalancePlanFunctions = {
		.shardAllowedOnNode = ShardAllowedOnNode,
		.nodeCapacity = NodeCapacity,
		.shardCost = GetShardCost,
		.context = &context,
	};

	/*
	 * Collect all active shard placements on the source node for the given relations.
	 * Unlike the main rebalancer, we build a single list of all relevant source placements
	 * across all specified relations (or all relations if none specified).
	 */
	List *allSourcePlacements = NIL;
	Oid relationIdItr = InvalidOid;
	foreach_declared_oid(relationIdItr, options->relationIdList)
	{
		List *shardPlacementList = FullShardPlacementList(relationIdItr,
														  options->excludedShardArray);
		List *activeShardPlacementsForRelation =
			FilterShardPlacementList(shardPlacementList, IsActiveShardPlacement);

		ShardPlacement *placement = NULL;
		foreach_declared_ptr(placement, activeShardPlacementsForRelation)
		{
			if (placement->nodeId == sourceNode->nodeId)
			{
				/* Ensure we don't add duplicate shardId if it's somehow listed under multiple relations */
				bool alreadyAdded = false;
				ShardPlacement *existingPlacement = NULL;
				foreach_declared_ptr(existingPlacement, allSourcePlacements)
				{
					if (existingPlacement->shardId == placement->shardId)
					{
						alreadyAdded = true;
						break;
					}
				}
				if (!alreadyAdded)
				{
					allSourcePlacements = lappend(allSourcePlacements, placement);
				}
			}
		}
	}

	/* the plan only ever involves these two nodes */
	List *activeWorkerList = list_make2(options->workerNode, replicaNode);

	SplitPrimaryReplicaShards *splitShards = palloc0(sizeof(SplitPrimaryReplicaShards));
	splitShards->primaryShardIdList = NIL;
	splitShards->replicaShardIdList = NIL;

	if (list_length(allSourcePlacements) > 0)
	{
		/*
		 * Initialize RebalanceState considering only the source node's shards
		 * and the two active workers (source and target).
		 */
		RebalanceState *state = InitRebalanceState(activeWorkerList, allSourcePlacements, &rebalancePlanFunctions);

		NodeFillState *sourceFillState = NULL;
		NodeFillState *targetFillState = NULL;
		ListCell *fsc = NULL;

		/* Identify the fill states for our specific source and target nodes */
		foreach(fsc, state->fillStateListAsc) /* Could be fillStateListDesc too, order doesn't matter here */
		{
			NodeFillState *fs = (NodeFillState *) lfirst(fsc);
			if (fs->node->nodeId == sourceNode->nodeId)
			{
				sourceFillState = fs;
			}
			else if (fs->node->nodeId == targetNode->nodeId)
			{
				targetFillState = fs;
			}
		}

		if (sourceFillState != NULL && targetFillState != NULL)
		{
			/*
			 * The goal is to move roughly half the total cost from source to target.
			 * The target node is assumed to be empty or its existing load is not
			 * considered for this specific two-node balancing plan's shard distribution.
			 * We calculate costs based *only* on the shards currently on the source node.
			 */
			float4 sourceCurrentCost = sourceFillState->totalCost;
			float4 targetCurrentCost = 0; /* cost on target contributed by these source shards */

			/* Sort shards on source node by cost (descending); a common greedy heuristic. */
			sourceFillState->shardCostListDesc = SortList(sourceFillState->shardCostListDesc, CompareShardCostDesc);

			List *potentialMoves = NIL;
			ListCell *lc_shardcost = NULL;

			/*
			 * Greedily walk the shards in descending cost order. A shard is
			 * assigned to the replica only when doing so shrinks the absolute
			 * cost difference between the two nodes; otherwise it stays on
			 * the primary.
			 */
			foreach(lc_shardcost, sourceFillState->shardCostListDesc)
			{
				ShardCost *shardToConsider = (ShardCost *) lfirst(lc_shardcost);

				/* Check if shard is allowed on the target node */
				// if (!state->functions->shardAllowedOnNode(shardToConsider->shardId,
				//                                           targetNode,
				//                                           state->functions->context))
				// {
				//     splitShards->primaryShardIdList = lappend_int(splitShards->primaryShardIdList, shardToConsider->shardId);
				//     continue; /* Cannot move this shard to the target */
				// }
				/* NOTE(review): the shard-allowed check above is disabled; the
				 * plan may place a shard on a node the strategy forbids —
				 * confirm whether this is intentional before release. */

				/*
				 * Move the shard if doing so reduces the cost difference:
				 * diff before: abs(sourceCurrentCost - targetCurrentCost)
				 * diff after:  abs((source - cost) - (target + cost))
				 */
				float4 costOfShard = shardToConsider->cost;
				float4 diffBefore = fabsf(sourceCurrentCost - targetCurrentCost);
				float4 diffAfter = fabsf((sourceCurrentCost - costOfShard) - (targetCurrentCost + costOfShard));

				if (diffAfter < diffBefore)
				{
					/* NOTE(review): potentialMoves is built but never consumed or
					 * returned — dead state unless a later change uses it. */
					PlacementUpdateEvent *update = palloc0(sizeof(PlacementUpdateEvent));
					update->shardId = shardToConsider->shardId;
					update->sourceNode = sourceNode;
					update->targetNode = targetNode;
					update->updateType = PLACEMENT_UPDATE_MOVE;
					potentialMoves = lappend(potentialMoves, update);

					/* NOTE(review): lappend_int narrows the uint64 shardId to int —
					 * confirm shard ids always fit in a signed 32-bit int. */
					splitShards->replicaShardIdList = lappend_int(splitShards->replicaShardIdList, shardToConsider->shardId);

					/* Update simulated costs for the next iteration */
					sourceCurrentCost -= costOfShard;
					targetCurrentCost += costOfShard;
				}
				else
				{
					splitShards->primaryShardIdList = lappend_int(splitShards->primaryShardIdList, shardToConsider->shardId);
				}
			}
		}
		/* RebalanceState is in memory context, will be cleaned up */
	}

	return splitShards;
}
/*
* Snapshot-based node split plan outputs the shard placement plan
* for primary and replica based node split
*
* SQL signature:
* get_snapshot_based_node_split_plan(
* primary_node_name text,
* primary_node_port integer,
* replica_node_name text,
* replica_node_port integer,
* rebalance_strategy name DEFAULT NULL
*
*/
Datum
get_snapshot_based_node_split_plan(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);
text *primaryNodeNameText = PG_GETARG_TEXT_P(0);
int32 primaryNodePort = PG_GETARG_INT32(1);
text *replicaNodeNameText = PG_GETARG_TEXT_P(2);
int32 replicaNodePort = PG_GETARG_INT32(3);
char *primaryNodeName = text_to_cstring(primaryNodeNameText);
char *replicaNodeName = text_to_cstring(replicaNodeNameText);
WorkerNode *primaryNode = FindWorkerNodeOrError(primaryNodeName, primaryNodePort);
WorkerNode *replicaNode = FindWorkerNodeOrError(replicaNodeName, replicaNodePort);
if (!replicaNode->nodeisreplica || replicaNode->nodeprimarynodeid == 0)
{
ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("Node %s:%d (ID %d) is not a valid replica or its primary node ID is not set.",
replicaNode->workerName, replicaNode->workerPort, replicaNode->nodeId)));
}
if (primaryNode->nodeisreplica)
{
ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("Primary node %s:%d (ID %d) is itself a replica.",
primaryNode->workerName, primaryNode->workerPort, primaryNode->nodeId)));
}
/* Ensure the primary node is related to the replica node */
if (primaryNode->nodeId != replicaNode->nodeprimarynodeid)
{
ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("Replica node %s:%d (ID %d) is not replica of the primary node %s:%d (ID %d).",
replicaNode->workerName, replicaNode->workerPort, replicaNode->nodeId,
primaryNode->workerName, primaryNode->workerPort, primaryNode->nodeId)));
}
List *relationIdList = NIL;
relationIdList = NonColocatedDistRelationIdList();
Form_pg_dist_rebalance_strategy strategy = GetRebalanceStrategy(
PG_GETARG_NAME_OR_NULL(4));
RebalanceOptions options = {
.relationIdList = relationIdList,
.threshold = 0, /* Threshold is not strictly needed for two nodes */
.maxShardMoves = -1, /* No limit on moves between these two nodes */
.excludedShardArray = construct_empty_array(INT8OID),
.drainOnly = false, /* Not a drain operation */
.rebalanceStrategy = strategy,
.improvementThreshold = 0, /* Consider all beneficial moves */
.workerNode = primaryNode /* indicate Primary node as a source node */
};
SplitPrimaryReplicaShards *splitShards = NULL;
splitShards = GetPrimaryReplicaSplitRebalanceSteps(&options, replicaNode);
if (splitShards == NULL)
{
ereport(ERROR, (errmsg("No shards to split between primary and replica nodes.")));
}
int shardId = 0;
TupleDesc tupdesc;
Tuplestorestate *tupstore = SetupTuplestore(fcinfo, &tupdesc);
Datum values[4];
bool nulls[4];
foreach_declared_int(shardId, splitShards->primaryShardIdList)
{
ShardInterval *shardInterval = LoadShardInterval(shardId);
List *colocatedShardList = ColocatedShardIntervalList(shardInterval);
ListCell *colocatedShardCell = NULL;
foreach(colocatedShardCell, colocatedShardList)
{
ShardInterval *colocatedShard = lfirst(colocatedShardCell);
int colocatedShardId = colocatedShard->shardId;
memset(values, 0, sizeof(values));
memset(nulls, 0, sizeof(nulls));
values[0] = ObjectIdGetDatum(RelationIdForShard(colocatedShardId));
values[1] = UInt64GetDatum(colocatedShardId);
values[2] = UInt64GetDatum(ShardLength(colocatedShardId));
values[3] = PointerGetDatum(cstring_to_text("Primary Node"));
tuplestore_putvalues(tupstore, tupdesc, values, nulls);
}
}
foreach_declared_int(shardId, splitShards->replicaShardIdList)
{
ShardInterval *shardInterval = LoadShardInterval(shardId);
List *colocatedShardList = ColocatedShardIntervalList(shardInterval);
ListCell *colocatedShardCell = NULL;
foreach(colocatedShardCell, colocatedShardList)
{
ShardInterval *colocatedShard = lfirst(colocatedShardCell);
int colocatedShardId = colocatedShard->shardId;
memset(values, 0, sizeof(values));
memset(nulls, 0, sizeof(nulls));
values[0] = ObjectIdGetDatum(RelationIdForShard(colocatedShardId));
values[1] = UInt64GetDatum(colocatedShardId);
values[2] = UInt64GetDatum(ShardLength(colocatedShardId));
values[3] = PointerGetDatum(cstring_to_text("Replica Node"));
tuplestore_putvalues(tupstore, tupdesc, values, nulls);
}
}
return (Datum) 0;
}
/* /*
* EnsureNodeCapacityUDF checks that the UDF matching the oid has the correct * EnsureNodeCapacityUDF checks that the UDF matching the oid has the correct

View File

@ -573,6 +573,44 @@ TransferShards(int64 shardId, char *sourceNodeName,
FinalizeCurrentProgressMonitor(); FinalizeCurrentProgressMonitor();
} }
/*
 * AdjustShardsForPrimaryReplicaNodeSplit is called when a primary-replica node split
 * occurs. It adjusts the shard placements such that the shards that should be on the
 * primary node are removed from the replica node, and vice versa.
 *
 * This function does not move any data; it only updates the shard placement metadata.
 *
 * NOTE(review): the DROP steps are still TODO, so at present only the
 * metadata repointing in the second loop has any effect.
 */
void
AdjustShardsForPrimaryReplicaNodeSplit(WorkerNode *primaryNode,
									   WorkerNode *replicaNode,
									   List* primaryShardList,
									   List* replicaShardList)
{
	int shardId = 0;

	/*
	 * Remove all shards from the replica that should reside on the primary node,
	 * and update the shard placement metadata for shards that will now be served
	 * from the replica node. No data movement is required; we only need to drop
	 * the relevant shards from the replica and primary nodes and update the
	 * corresponding shard placement metadata.
	 */
	foreach_declared_int(shardId, primaryShardList)
	{
		ShardInterval *shardInterval = LoadShardInterval(shardId);
		/* NOTE(review): colocatedShardList is unused until the TODO below is
		 * implemented; expect an unused-variable warning meanwhile. */
		List *colocatedShardList = ColocatedShardIntervalList(shardInterval);
		/* TODO: Drops shard table here */
	}

	/* Now drop all shards from primary that need to be on the replica node */
	foreach_declared_int(shardId, replicaShardList)
	{
		ShardInterval *shardInterval = LoadShardInterval(shardId);
		List *colocatedShardList = ColocatedShardIntervalList(shardInterval);

		/* repoint placement metadata for the whole colocation group to the replica */
		UpdateColocatedShardPlacementMetadataOnWorkers(shardId,
			primaryNode->workerName, primaryNode->workerPort,
			replicaNode->workerName, replicaNode->workerPort);
		/* TODO: Drop the not required table on primary here */
	}
}
/* /*
* Insert deferred cleanup records. * Insert deferred cleanup records.

View File

@ -0,0 +1,7 @@
-- Add replica information columns to pg_dist_node.
-- nodeisreplica marks nodes registered as streaming replicas;
-- nodeprimarynodeid holds the primary's nodeid (0 means "not a replica").
ALTER TABLE pg_catalog.pg_dist_node ADD COLUMN nodeisreplica BOOLEAN NOT NULL DEFAULT FALSE;
ALTER TABLE pg_catalog.pg_dist_node ADD COLUMN nodeprimarynodeid INT4 NOT NULL DEFAULT 0;

-- Add a comment to the table and columns for clarity in \d output
COMMENT ON COLUMN pg_catalog.pg_dist_node.nodeisreplica IS 'Indicates if this node is a replica of another node.';
COMMENT ON COLUMN pg_catalog.pg_dist_node.nodeprimarynodeid IS 'If nodeisreplica is true, this stores the nodeid of its primary node.';

View File

@ -52,6 +52,11 @@ DROP VIEW IF EXISTS pg_catalog.citus_lock_waits;
#include "udfs/citus_stat_counters_reset/13.1-1.sql" #include "udfs/citus_stat_counters_reset/13.1-1.sql"
#include "udfs/citus_nodes/13.1-1.sql" #include "udfs/citus_nodes/13.1-1.sql"
#include "cat_upgrades/add_replica_info_to_pg_dist_node.sql"
#include "udfs/citus_add_replica_node/13.1-1.sql"
#include "udfs/citus_remove_replica_node/13.1-1.sql"
#include "udfs/citus_promote_replica_and_rebalance/13.1-1.sql"
#include "udfs/get_snapshot_based_node_split_plan/13.1-1.sql"
-- Since shard_name/13.1-1.sql first drops the function and then creates it, we first -- Since shard_name/13.1-1.sql first drops the function and then creates it, we first
-- need to drop citus_shards view since that view depends on this function. And immediately -- need to drop citus_shards view since that view depends on this function. And immediately
-- after creating the function, we recreate citus_shards view again. -- after creating the function, we recreate citus_shards view again.

View File

@ -0,0 +1,26 @@
-- citus_add_replica_node registers a streaming replica of an existing primary
-- worker in pg_dist_node; the replica starts out inactive.
CREATE OR REPLACE FUNCTION pg_catalog.citus_add_replica_node(
    replica_hostname text,
    replica_port integer,
    primary_hostname text,
    primary_port integer)
RETURNS INTEGER
LANGUAGE C VOLATILE STRICT
AS 'MODULE_PATHNAME', $$citus_add_replica_node$$;
COMMENT ON FUNCTION pg_catalog.citus_add_replica_node(text, integer, text, integer) IS
'Adds a new node as a replica of an existing primary node. The replica is initially inactive. Returns the nodeid of the new replica node.';
-- superuser-only by default; grant explicitly where needed
REVOKE ALL ON FUNCTION pg_catalog.citus_add_replica_node(text, int, text, int) FROM PUBLIC;

-- Variant that identifies the primary by its nodeid instead of host/port.
CREATE OR REPLACE FUNCTION pg_catalog.citus_add_replica_node_with_nodeid(
    replica_hostname text,
    replica_port integer,
    primary_nodeid integer)
RETURNS INTEGER
LANGUAGE C VOLATILE STRICT
AS 'MODULE_PATHNAME', $$citus_add_replica_node_with_nodeid$$;
COMMENT ON FUNCTION pg_catalog.citus_add_replica_node_with_nodeid(text, integer, integer) IS
'Adds a new node as a replica of an existing primary node using the primary node''s ID. The replica is initially inactive. Returns the nodeid of the new replica node.';
REVOKE ALL ON FUNCTION pg_catalog.citus_add_replica_node_with_nodeid(text, int, int) FROM PUBLIC;

View File

@ -0,0 +1,12 @@
-- citus_promote_replica_and_rebalance promotes a registered replica to a
-- primary and splits shards between it and its original primary.
-- Intentionally not STRICT: rebalance_strategy may be NULL (use the default
-- strategy), so NULL handling is done in the C function.
CREATE OR REPLACE FUNCTION pg_catalog.citus_promote_replica_and_rebalance(
    replica_nodeid integer,
    rebalance_strategy name DEFAULT NULL
)
RETURNS VOID
AS 'MODULE_PATHNAME'
LANGUAGE C VOLATILE;
COMMENT ON FUNCTION pg_catalog.citus_promote_replica_and_rebalance(integer, name) IS
'Promotes a registered replica node to a primary, performs necessary metadata updates, and rebalances a portion of shards from its original primary to the newly promoted node.';
REVOKE ALL ON FUNCTION pg_catalog.citus_promote_replica_and_rebalance(integer, name) FROM PUBLIC;

View File

@ -0,0 +1,24 @@
-- citus_remove_replica_node deregisters an inactive replica from
-- pg_dist_node, identified by host and port.
CREATE OR REPLACE FUNCTION pg_catalog.citus_remove_replica_node(
    nodename text,
    nodeport integer
)
RETURNS VOID
LANGUAGE C VOLATILE STRICT
AS 'MODULE_PATHNAME', $$citus_remove_replica_node$$;
COMMENT ON FUNCTION pg_catalog.citus_remove_replica_node(text, integer)
IS 'Removes an inactive streaming replica node from Citus metadata. Errors if the node is not found, not registered as a replica, or is currently marked active.';
REVOKE ALL ON FUNCTION pg_catalog.citus_remove_replica_node(text, integer) FROM PUBLIC;

-- Variant that identifies the replica by its nodeid.
CREATE OR REPLACE FUNCTION pg_catalog.citus_remove_replica_node_with_nodeid(
    nodeid integer
)
RETURNS VOID
LANGUAGE C VOLATILE STRICT
AS 'MODULE_PATHNAME', $$citus_remove_replica_node_with_nodeid$$;
COMMENT ON FUNCTION pg_catalog.citus_remove_replica_node_with_nodeid(integer)
IS 'Removes an inactive streaming replica node from Citus metadata using its node ID. Errors if the node is not found, not registered as a replica, or is currently marked active.';
REVOKE ALL ON FUNCTION pg_catalog.citus_remove_replica_node_with_nodeid(integer) FROM PUBLIC;

View File

@ -0,0 +1,18 @@
-- get_snapshot_based_node_split_plan previews (without moving data) how
-- shards would be divided between a primary and its replica after promotion.
-- Not STRICT because rebalance_strategy may be NULL (default strategy);
-- the C implementation must therefore handle NULL arguments itself.
CREATE OR REPLACE FUNCTION pg_catalog.get_snapshot_based_node_split_plan(
    primary_node_name text,
    primary_node_port integer,
    replica_node_name text,
    replica_node_port integer,
    rebalance_strategy name DEFAULT NULL
)
RETURNS TABLE (table_name regclass,
               shardid bigint,
               shard_size bigint,
               placement_node text)
AS 'MODULE_PATHNAME'
LANGUAGE C VOLATILE;
COMMENT ON FUNCTION pg_catalog.get_snapshot_based_node_split_plan(text, int, text, int, name)
IS 'shows the shard placements to balance shards between primary and replica worker nodes';
REVOKE ALL ON FUNCTION pg_catalog.get_snapshot_based_node_split_plan(text, int, text, int, name) FROM PUBLIC;

View File

@ -298,6 +298,7 @@ extern Oid CitusDependentObjectFuncId(void);
/* enum oids */ /* enum oids */
extern Oid PrimaryNodeRoleId(void); extern Oid PrimaryNodeRoleId(void);
extern Oid SecondaryNodeRoleId(void); extern Oid SecondaryNodeRoleId(void);
extern Oid UnavailableNodeRoleId(void);
extern Oid CitusCopyFormatTypeId(void); extern Oid CitusCopyFormatTypeId(void);
extern Oid TextCopyFormatId(void); extern Oid TextCopyFormatId(void);
extern Oid BinaryCopyFormatId(void); extern Oid BinaryCopyFormatId(void);

View File

@ -466,4 +466,7 @@ extern bool IsBackgroundJobStatusTerminal(BackgroundJobStatus status);
extern bool IsBackgroundTaskStatusTerminal(BackgroundTaskStatus status); extern bool IsBackgroundTaskStatusTerminal(BackgroundTaskStatus status);
extern Oid BackgroundJobStatusOid(BackgroundJobStatus status); extern Oid BackgroundJobStatusOid(BackgroundJobStatus status);
extern Oid BackgroundTaskStatusOid(BackgroundTaskStatus status); extern Oid BackgroundTaskStatusOid(BackgroundTaskStatus status);
/* from node_metadata.c */
/* locks the shard placements on the given worker node with lockMode — TODO confirm exact lock targets against node_metadata.c */
extern void LockShardsInWorkerPlacementList(WorkerNode *workerNode, LOCKMODE lockMode);
/* NOTE(review): presumably flips a replica node to primary in Citus metadata — verify against the implementation */
extern void ActivateReplicaNodeAsPrimary(WorkerNode *workerNode);
#endif /* METADATA_UTILITY_H */ #endif /* METADATA_UTILITY_H */

View File

@ -20,7 +20,7 @@
* in particular their OUT parameters) must be changed whenever the definition of * in particular their OUT parameters) must be changed whenever the definition of
* pg_dist_node changes. * pg_dist_node changes.
*/ */
#define Natts_pg_dist_node 11 #define Natts_pg_dist_node 13
#define Anum_pg_dist_node_nodeid 1 #define Anum_pg_dist_node_nodeid 1
#define Anum_pg_dist_node_groupid 2 #define Anum_pg_dist_node_groupid 2
#define Anum_pg_dist_node_nodename 3 #define Anum_pg_dist_node_nodename 3
@ -32,6 +32,8 @@
#define Anum_pg_dist_node_nodecluster 9 #define Anum_pg_dist_node_nodecluster 9
#define Anum_pg_dist_node_metadatasynced 10 #define Anum_pg_dist_node_metadatasynced 10
#define Anum_pg_dist_node_shouldhaveshards 11 #define Anum_pg_dist_node_shouldhaveshards 11
#define Anum_pg_dist_node_nodeisreplica 12
#define Anum_pg_dist_node_nodeprimarynodeid 13
#define GROUPID_SEQUENCE_NAME "pg_dist_groupid_seq" #define GROUPID_SEQUENCE_NAME "pg_dist_groupid_seq"
#define NODEID_SEQUENCE_NAME "pg_dist_node_nodeid_seq" #define NODEID_SEQUENCE_NAME "pg_dist_node_nodeid_seq"

View File

@ -222,4 +222,7 @@ extern void SetupRebalanceMonitor(List *placementUpdateList,
uint64 initialProgressState, uint64 initialProgressState,
PlacementUpdateStatus initialStatus); PlacementUpdateStatus initialStatus);
/*
 * NOTE(review): assumes this distributes the shards of primaryNode between
 * the primary and its replica using the named rebalance strategy — confirm
 * against the shard_rebalancer.c implementation.
 */
extern void SplitShardsBetweenPrimaryAndReplica(WorkerNode *primaryNode,
WorkerNode *replicaNode,
Name strategyName);
#endif /* SHARD_REBALANCER_H */ #endif /* SHARD_REBALANCER_H */

View File

@ -41,3 +41,9 @@ extern void UpdatePlacementUpdateStatusForShardIntervalList(List *shardIntervalL
extern void InsertDeferredDropCleanupRecordsForShards(List *shardIntervalList); extern void InsertDeferredDropCleanupRecordsForShards(List *shardIntervalList);
extern void InsertCleanupRecordsForShardPlacementsOnNode(List *shardIntervalList, extern void InsertCleanupRecordsForShardPlacementsOnNode(List *shardIntervalList,
int32 groupId); int32 groupId);
/*
 * AdjustShardsForPrimaryReplicaNodeSplit: given a primary node and its
 * replica, adjusts shard metadata so primaryShardList stays on the primary
 * and replicaShardList ends up on the replica — NOTE(review): semantics
 * inferred from the signature; confirm against the implementation.
 *
 * Style fix: declarator-bound pointers (List *x, matching WorkerNode *x and
 * every sibling extern in this header) and return type on the same line as
 * the declaration, per the file's convention.
 */
extern void AdjustShardsForPrimaryReplicaNodeSplit(WorkerNode *primaryNode,
                                                   WorkerNode *replicaNode,
                                                   List *primaryShardList,
                                                   List *replicaShardList);

View File

@ -54,6 +54,8 @@ typedef struct WorkerNode
char nodeCluster[NAMEDATALEN]; /* the cluster the node is a part of */ char nodeCluster[NAMEDATALEN]; /* the cluster the node is a part of */
bool metadataSynced; /* node has the most recent metadata */ bool metadataSynced; /* node has the most recent metadata */
bool shouldHaveShards; /* if the node should have distributed table shards on it or not */ bool shouldHaveShards; /* if the node should have distributed table shards on it or not */
bool nodeisreplica; /* whether this node is a replica */
int32 nodeprimarynodeid; /* nodeid of the primary for this replica */
} WorkerNode; } WorkerNode;
@ -84,6 +86,7 @@ extern WorkerNode * FindWorkerNode(const char *nodeName, int32 nodePort);
extern WorkerNode * FindWorkerNodeOrError(const char *nodeName, int32 nodePort); extern WorkerNode * FindWorkerNodeOrError(const char *nodeName, int32 nodePort);
extern WorkerNode * FindWorkerNodeAnyCluster(const char *nodeName, int32 nodePort); extern WorkerNode * FindWorkerNodeAnyCluster(const char *nodeName, int32 nodePort);
extern WorkerNode * FindNodeWithNodeId(int nodeId, bool missingOk); extern WorkerNode * FindNodeWithNodeId(int nodeId, bool missingOk);
extern WorkerNode * FindNodeAnyClusterByNodeId(uint32 nodeId);
extern WorkerNode * ModifiableWorkerNode(const char *nodeName, int32 nodePort); extern WorkerNode * ModifiableWorkerNode(const char *nodeName, int32 nodePort);
extern List * ReadDistNode(bool includeNodesFromOtherClusters); extern List * ReadDistNode(bool includeNodesFromOtherClusters);
extern void EnsureCoordinator(void); extern void EnsureCoordinator(void);