Enable registration of worker node replicas in Citus

Core changes:
 *Extend pg_dist_node schema:
   Add nodeisreplica (BOOLEAN) and nodeprimarynodeid (INT4) columns to mark
   replicas and reference their primary nodes.
 *Introduce UDF citus_add_replica_node:
   citus_add_replica_node(
     replica_hostname TEXT,
     replica_port     INT,
     primary_hostname TEXT,
     primary_port     INT)
    RETURNS INT
  Registers a user‐provisioned PostgreSQL streaming replica as an inactive Citus
  node linked to its primary.

This lays the foundation for the upcoming snapshot-based node split feature.
pull/8046/head
Muhammad Usama 2025-06-18 00:31:28 +05:00
parent 55a0d1f730
commit d457ea49bf
9 changed files with 399 additions and 10 deletions

View File

@ -3549,6 +3549,18 @@ SecondaryNodeRoleId(void)
return MetadataCache.secondaryNodeRoleId;
}
/* return the Oid of the 'unavailable' nodeRole enum value */
Oid
UnavailableNodeRoleId(void)
{
if (!MetadataCache.secondaryNodeRoleId)
{
MetadataCache.secondaryNodeRoleId = LookupStringEnumValueId("noderole",
"unavailable");
}
return MetadataCache.secondaryNodeRoleId;
}
Oid
CitusJobStatusScheduledId(void)

View File

@ -814,7 +814,7 @@ NodeListInsertCommand(List *workerNodeList)
appendStringInfo(nodeListInsertCommand,
"INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, "
"noderack, hasmetadata, metadatasynced, isactive, noderole, "
"nodecluster, shouldhaveshards) VALUES ");
"nodecluster, shouldhaveshards, nodeisreplica, nodeprimarynodeid) VALUES ");
/* iterate over the worker nodes, add the values */
WorkerNode *workerNode = NULL;
@ -824,13 +824,14 @@ NodeListInsertCommand(List *workerNodeList)
char *metadataSyncedString = workerNode->metadataSynced ? "TRUE" : "FALSE";
char *isActiveString = workerNode->isActive ? "TRUE" : "FALSE";
char *shouldHaveShards = workerNode->shouldHaveShards ? "TRUE" : "FALSE";
char *nodeisreplicaString = workerNode->nodeisreplica ? "TRUE" : "FALSE";
Datum nodeRoleOidDatum = ObjectIdGetDatum(workerNode->nodeRole);
Datum nodeRoleStringDatum = DirectFunctionCall1(enum_out, nodeRoleOidDatum);
char *nodeRoleString = DatumGetCString(nodeRoleStringDatum);
appendStringInfo(nodeListInsertCommand,
"(%d, %d, %s, %d, %s, %s, %s, %s, '%s'::noderole, %s, %s)",
"(%d, %d, %s, %d, %s, %s, %s, %s, '%s'::noderole, %s, %s, %s, %d)",
workerNode->nodeId,
workerNode->groupId,
quote_literal_cstr(workerNode->workerName),
@ -841,7 +842,9 @@ NodeListInsertCommand(List *workerNodeList)
isActiveString,
nodeRoleString,
quote_literal_cstr(workerNode->nodeCluster),
shouldHaveShards);
shouldHaveShards,
nodeisreplicaString,
workerNode->nodeprimarynodeid);
processedWorkerNodeCount++;
if (processedWorkerNodeCount != workerCount)
@ -875,9 +878,11 @@ NodeListIdempotentInsertCommand(List *workerNodeList)
"hasmetadata = EXCLUDED.hasmetadata, "
"isactive = EXCLUDED.isactive, "
"noderole = EXCLUDED.noderole, "
"nodecluster = EXCLUDED.nodecluster ,"
"nodecluster = EXCLUDED.nodecluster, "
"metadatasynced = EXCLUDED.metadatasynced, "
"shouldhaveshards = EXCLUDED.shouldhaveshards";
"shouldhaveshards = EXCLUDED.shouldhaveshards, "
"nodeisreplica = EXCLUDED.nodeisreplica, "
"nodeprimarynodeid = EXCLUDED.nodeprimarynodeid";
appendStringInfoString(nodeInsertIdempotentCommand, onConflictStr);
return nodeInsertIdempotentCommand->data;
}

View File

@ -84,6 +84,8 @@ typedef struct NodeMetadata
bool isActive;
Oid nodeRole;
bool shouldHaveShards;
uint32 nodeprimarynodeid;
bool nodeisreplica;
char *nodeCluster;
} NodeMetadata;
@ -120,7 +122,6 @@ static char * NodeMetadataSyncedUpdateCommand(uint32 nodeId, bool metadataSynced
static void ErrorIfCoordinatorMetadataSetFalse(WorkerNode *workerNode, Datum value,
char *field);
static WorkerNode * SetShouldHaveShards(WorkerNode *workerNode, bool shouldHaveShards);
static WorkerNode * FindNodeAnyClusterByNodeId(uint32 nodeId);
static void ErrorIfAnyNodeNotExist(List *nodeList);
static void UpdateLocalGroupIdsViaMetadataContext(MetadataSyncContext *context);
static void SendDeletionCommandsForReplicatedTablePlacements(
@ -140,6 +141,10 @@ static BackgroundWorkerHandle * CheckBackgroundWorkerToObtainLocks(int32 lock_co
static BackgroundWorkerHandle * LockPlacementsWithBackgroundWorkersInPrimaryNode(
WorkerNode *workerNode, bool force, int32 lock_cooldown);
static void EnsureValidStreamingReplica(WorkerNode *primaryWorkerNode,
char* replicaHostname,
int replicaPort);
/* Function definitions go here */
/* declarations for dynamic loading */
@ -168,6 +173,7 @@ PG_FUNCTION_INFO_V1(citus_coordinator_nodeid);
PG_FUNCTION_INFO_V1(citus_is_coordinator);
PG_FUNCTION_INFO_V1(citus_internal_mark_node_not_synced);
PG_FUNCTION_INFO_V1(citus_is_primary_node);
PG_FUNCTION_INFO_V1(citus_add_replica_node);
/*
* DefaultNodeMetadata creates a NodeMetadata struct with the fields set to
@ -183,6 +189,8 @@ DefaultNodeMetadata()
nodeMetadata.nodeRack = WORKER_DEFAULT_RACK;
nodeMetadata.shouldHaveShards = true;
nodeMetadata.groupId = INVALID_GROUP_ID;
nodeMetadata.nodeisreplica = false;
nodeMetadata.nodeprimarynodeid = 0; /* 0 typically means InvalidNodeId */
return nodeMetadata;
}
@ -1422,6 +1430,149 @@ master_update_node(PG_FUNCTION_ARGS)
}
/*
* citus_add_replica_node function adds a new node as a replica of an existing primary node.
* It records the replica's hostname, port, and links it to the primary node's ID.
* The replica is initially marked as inactive and not having shards.
*/
Datum
citus_add_replica_node(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);
EnsureSuperUser();
EnsureCoordinator();
text *replicaHostnameText = PG_GETARG_TEXT_P(0);
int32 replicaPort = PG_GETARG_INT32(1);
text *primaryHostnameText = PG_GETARG_TEXT_P(2);
int32 primaryPort = PG_GETARG_INT32(3);
char *replicaHostname = text_to_cstring(replicaHostnameText);
char *primaryHostname = text_to_cstring(primaryHostnameText);
WorkerNode *primaryWorkerNode = FindWorkerNodeAnyCluster(primaryHostname, primaryPort);
if (primaryWorkerNode == NULL)
{
ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("primary node %s:%d not found in pg_dist_node",
primaryHostname, primaryPort)));
}
/* Future-proofing: Ideally, a primary node should not itself be a replica.
* This check might be more relevant once replica promotion logic exists.
* For now, pg_dist_node.nodeisreplica defaults to false for existing nodes.
*/
if (primaryWorkerNode->nodeisreplica)
{
ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("primary node %s:%d is itself a replica and cannot have replicas",
primaryHostname, primaryPort)));
}
if (!primaryWorkerNode->shouldHaveShards)
{
ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("primary node %s:%d does not have shards, node without shards cannot have replicas",
primaryHostname, primaryPort)));
}
WorkerNode *existingReplicaNode = FindWorkerNodeAnyCluster(replicaHostname, replicaPort);
if (existingReplicaNode != NULL)
{
/*
* Idempotency check: If the node already exists, is it already correctly
* registered as a replica for THIS primary?
*/
if (existingReplicaNode->nodeisreplica &&
existingReplicaNode->nodeprimarynodeid == primaryWorkerNode->nodeId)
{
ereport(NOTICE, (errmsg("node %s:%d is already registered as a replica for primary %s:%d (nodeid %d)",
replicaHostname, replicaPort,
primaryHostname, primaryPort, primaryWorkerNode->nodeId)));
PG_RETURN_INT32(existingReplicaNode->nodeId);
}
else
{
ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("a different node %s:%d (nodeid %d) already exists or is a replica for a different primary",
replicaHostname, replicaPort, existingReplicaNode->nodeId)));
}
}
EnsureValidStreamingReplica(primaryWorkerNode, replicaHostname, replicaPort);
NodeMetadata nodeMetadata = DefaultNodeMetadata();
nodeMetadata.nodeisreplica = true;
nodeMetadata.nodeprimarynodeid = primaryWorkerNode->nodeId;
nodeMetadata.isActive = false; /* Replicas start as inactive */
nodeMetadata.shouldHaveShards = false; /* Replicas do not directly own primary shards */
nodeMetadata.groupId = primaryWorkerNode->groupId; /* Same group as primary */
nodeMetadata.nodeRole = UnavailableNodeRoleId(); /* The node role is set to 'unavailable' */
nodeMetadata.nodeCluster = primaryWorkerNode->nodeCluster; /* Same cluster as primary */
/* Other fields like hasMetadata, metadataSynced will take defaults from DefaultNodeMetadata
* (typically true, true for hasMetadata and metadataSynced if it's a new node,
* or might need adjustment based on replica strategy)
* For now, let's assume DefaultNodeMetadata provides suitable defaults for these
* or they will be set by AddNodeMetadata/ActivateNodeList if needed.
* Specifically, hasMetadata is often true, and metadataSynced true after activation.
* Since this replica is inactive, metadata sync status might be less critical initially.
*/
bool nodeAlreadyExists = false;
bool localOnly = false; /* Propagate change to other workers with metadata */
/*
* AddNodeMetadata will take an ExclusiveLock on pg_dist_node.
* It also checks again if the node already exists after acquiring the lock.
*/
int replicaNodeId = AddNodeMetadata(replicaHostname, replicaPort, &nodeMetadata,
&nodeAlreadyExists, localOnly);
if (nodeAlreadyExists)
{
/* This case should ideally be caught by the FindWorkerNodeAnyCluster check above,
* but AddNodeMetadata does its own check after locking.
* If it already exists and is correctly configured, we might have returned NOTICE above.
* If it exists but is NOT correctly configured as our replica, an ERROR would be more appropriate.
* AddNodeMetadata returns the existing node's ID if it finds one.
* We need to ensure it is the *correct* replica.
*/
WorkerNode *fetchedExistingNode = FindNodeAnyClusterByNodeId(replicaNodeId);
if (fetchedExistingNode != NULL && fetchedExistingNode->nodeisreplica &&
fetchedExistingNode->nodeprimarynodeid == primaryWorkerNode->nodeId)
{
ereport(NOTICE, (errmsg("node %s:%d was already correctly registered as a replica for primary %s:%d (nodeid %d)",
replicaHostname, replicaPort,
primaryHostname, primaryPort, primaryWorkerNode->nodeId)));
/* Intentional fall-through to return replicaNodeId */
}
else
{
/* This state is less expected if our initial check passed or errored. */
ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR),
errmsg("node %s:%d already exists but is not correctly configured as a replica for primary %s:%d",
replicaHostname, replicaPort, primaryHostname, primaryPort)));
}
}
TransactionModifiedNodeMetadata = true;
/*
* Note: Replicas added this way are inactive.
* A separate mechanism or UDF (e.g., citus_activate_replica_node or citus_promote_replica_node)
* would be needed to activate them, potentially after verifying replication lag, etc.
* Activation would typically involve:
* 1. Setting isActive = true.
* 2. Ensuring metadata is synced (hasMetadata=true, metadataSynced=true).
* 3. Potentially other logic like adding to specific scheduler lists.
* For now, citus_add_node_replica just registers it.
*/
PG_RETURN_INT32(replicaNodeId);
}
/*
* SetLockTimeoutLocally sets the lock_timeout to the given value.
* This setting is local.
@ -1871,7 +2022,7 @@ FindWorkerNodeAnyCluster(const char *nodeName, int32 nodePort)
* FindNodeAnyClusterByNodeId searches pg_dist_node and returns the node with
* the nodeId. If the node can't be found returns NULL.
*/
static WorkerNode *
WorkerNode *
FindNodeAnyClusterByNodeId(uint32 nodeId)
{
bool includeNodesFromOtherClusters = true;
@ -2924,6 +3075,10 @@ InsertNodeRow(int nodeid, char *nodeName, int32 nodePort, NodeMetadata *nodeMeta
values[Anum_pg_dist_node_nodecluster - 1] = nodeClusterNameDatum;
values[Anum_pg_dist_node_shouldhaveshards - 1] = BoolGetDatum(
nodeMetadata->shouldHaveShards);
values[Anum_pg_dist_node_nodeisreplica - 1] = BoolGetDatum(
nodeMetadata->nodeisreplica);
values[Anum_pg_dist_node_nodeprimarynodeid - 1] = Int32GetDatum(
nodeMetadata->nodeprimarynodeid);
Relation pgDistNode = table_open(DistNodeRelationId(), RowExclusiveLock);
@ -3014,8 +3169,7 @@ TupleToWorkerNode(TupleDesc tupleDescriptor, HeapTuple heapTuple)
/*
* This function can be called before "ALTER TABLE ... ADD COLUMN nodecluster ...",
* therefore heap_deform_tuple() won't set the isNullArray for this column. We
* initialize it true to be safe in that case.
* and other columns. We initialize isNullArray to true to be safe.
*/
memset(isNullArray, true, sizeof(isNullArray));
@ -3043,6 +3197,25 @@ TupleToWorkerNode(TupleDesc tupleDescriptor, HeapTuple heapTuple)
datumArray[Anum_pg_dist_node_shouldhaveshards -
1]);
/* nodeisreplica and nodeprimarynodeid might be null if row is from older version */
if (!isNullArray[Anum_pg_dist_node_nodeisreplica - 1])
{
workerNode->nodeisreplica = DatumGetBool(datumArray[Anum_pg_dist_node_nodeisreplica - 1]);
}
else
{
workerNode->nodeisreplica = false; /* Default value */
}
if (!isNullArray[Anum_pg_dist_node_nodeprimarynodeid - 1])
{
workerNode->nodeprimarynodeid = DatumGetInt32(datumArray[Anum_pg_dist_node_nodeprimarynodeid - 1]);
}
else
{
workerNode->nodeprimarynodeid = 0; /* Default value for InvalidNodeId */
}
/*
* nodecluster column can be missing. In the case of extension creation/upgrade,
* master_initialize_node_metadata function is called before the nodecluster
@ -3285,3 +3458,173 @@ SyncNodeMetadata(MetadataSyncContext *context)
*/
SendOrCollectCommandListToActivatedNodes(context, recreateNodeSnapshotCommandList);
}
/*
* EnsureValidStreamingReplica checks if the given replica is a valid streaming
* replica. It connects to the replica and checks if it is in recovery mode.
* If it is not, it errors out.
* It also checks the system identifier of the replica and the primary
* to ensure they match.
*/
static void
EnsureValidStreamingReplica(WorkerNode *primaryWorkerNode, char* replicaHostname, int replicaPort)
{
int connectionFlag = FORCE_NEW_CONNECTION;
MultiConnection *replicaConnection = GetNodeConnection(connectionFlag, replicaHostname,
replicaPort);
const char *replica_recovery_query = "SELECT pg_is_in_recovery()";
int resultCode = SendRemoteCommand(replicaConnection, replica_recovery_query);
if (resultCode == 0)
{
CloseConnection(replicaConnection);
ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE),
errmsg("could not connect to %s:%d to execute pg_is_in_recovery()",
replicaHostname, replicaPort)));
}
PGresult *result = GetRemoteCommandResult(replicaConnection, true);
if (result == NULL)
{
CloseConnection(replicaConnection);
ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("node %s:%d does not support pg_is_in_recovery()",
replicaHostname, replicaPort)));
}
List *sizeList = ReadFirstColumnAsText(result);
if (list_length(sizeList) != 1)
{
PQclear(result);
ClearResults(replicaConnection, true);
CloseConnection(replicaConnection);
ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE),
errmsg("cannot parse pg_is_in_recovery() result from %s:%d",
replicaHostname,
replicaPort)));
}
StringInfo isInRecoveryQueryResInfo = (StringInfo) linitial(sizeList);
char *isInRecoveryQueryResStr = isInRecoveryQueryResInfo->data;
if (strcmp(isInRecoveryQueryResStr, "t") != 0 && strcmp(isInRecoveryQueryResStr, "true") != 0)
{
PQclear(result);
ClearResults(replicaConnection, true);
CloseConnection(replicaConnection);
ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE),
errmsg("%s:%d is not a streaming replica",
replicaHostname,
replicaPort)));
}
PQclear(result);
ForgetResults(replicaConnection);
/* Step2: Get the system identifier from replica */
const char *sysidQuery =
"SELECT system_identifier FROM pg_control_system()";
resultCode = SendRemoteCommand(replicaConnection, sysidQuery);
if (resultCode == 0)
{
CloseConnection(replicaConnection);
ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE),
errmsg("could not connect to %s:%d to get system identifier",
replicaHostname, replicaPort)));
}
result = GetRemoteCommandResult(replicaConnection, true);
if (!IsResponseOK(result))
{
ReportResultError(replicaConnection, result, ERROR);
}
List *sysidList = ReadFirstColumnAsText(result);
if (list_length(sysidList) != 1)
{
PQclear(result);
ClearResults(replicaConnection, true);
CloseConnection(replicaConnection);
ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE),
errmsg("cannot parse get system identifier result from %s:%d",
replicaHostname,
replicaPort)));
}
StringInfo sysidQueryResInfo = (StringInfo) linitial(sysidList);
char *sysidQueryResStr = sysidQueryResInfo->data;
ereport (NOTICE, (errmsg("system identifier of %s:%d is %s",
replicaHostname, replicaPort, sysidQueryResStr)));
/* We do not need the connection anymore */
PQclear(result);
ForgetResults(replicaConnection);
CloseConnection(replicaConnection);
/* Step3: Get system identifier from primary */
int primaryConnectionFlag = 0;
MultiConnection *primaryConnection = GetNodeConnection(primaryConnectionFlag,
primaryWorkerNode->workerName,
primaryWorkerNode->workerPort);
int primaryResultCode = SendRemoteCommand(primaryConnection, sysidQuery);
if (primaryResultCode == 0)
{
CloseConnection(primaryConnection);
ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE),
errmsg("could not connect to %s:%d to get system identifier",
primaryWorkerNode->workerName, primaryWorkerNode->workerPort)));
}
PGresult *primaryResult = GetRemoteCommandResult(primaryConnection, true);
if (primaryResult == NULL)
{
CloseConnection(primaryConnection);
ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("node %s:%d does not support pg_control_system queries",
primaryWorkerNode->workerName, primaryWorkerNode->workerPort)));
}
List *primarySizeList = ReadFirstColumnAsText(primaryResult);
if (list_length(primarySizeList) != 1)
{
PQclear(primaryResult);
ClearResults(primaryConnection, true);
CloseConnection(primaryConnection);
ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE),
errmsg("cannot parse get system identifier result from %s:%d",
primaryWorkerNode->workerName,
primaryWorkerNode->workerPort)));
}
StringInfo primarySysidQueryResInfo = (StringInfo) linitial(primarySizeList);
char *primarySysidQueryResStr = primarySysidQueryResInfo->data;
ereport (NOTICE, (errmsg("system identifier of %s:%d is %s",
primaryWorkerNode->workerName, primaryWorkerNode->workerPort, primarySysidQueryResStr)));
/* verify both identifiers */
if (strcmp(sysidQueryResStr, primarySysidQueryResStr) != 0)
{
ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE),
errmsg("system identifiers do not match: %s (replica) vs %s (primary)",
sysidQueryResStr, primarySysidQueryResStr)));
}
PQclear(primaryResult);
ClearResults(primaryConnection, true);
CloseConnection(primaryConnection);
}

View File

@ -0,0 +1,7 @@
-- Add replica information columns to pg_dist_node
ALTER TABLE pg_catalog.pg_dist_node ADD COLUMN nodeisreplica BOOLEAN NOT NULL DEFAULT FALSE;
ALTER TABLE pg_catalog.pg_dist_node ADD COLUMN nodeprimarynodeid INT4 NOT NULL DEFAULT 0;
-- Add a comment to the table and columns for clarity in \d output
COMMENT ON COLUMN pg_catalog.pg_dist_node.nodeisreplica IS 'Indicates if this node is a replica of another node.';
COMMENT ON COLUMN pg_catalog.pg_dist_node.nodeprimarynodeid IS 'If nodeisreplica is true, this stores the nodeid of its primary node.';

View File

@ -52,6 +52,9 @@ DROP VIEW IF EXISTS pg_catalog.citus_lock_waits;
#include "udfs/citus_stat_counters_reset/13.1-1.sql"
#include "udfs/citus_nodes/13.1-1.sql"
#include "cat_upgrades/add_replica_info_to_pg_dist_node.sql"
#include "udfs/citus_add_replica_node/13.1-1.sql"
-- Since shard_name/13.1-1.sql first drops the function and then creates it, we first
-- need to drop citus_shards view since that view depends on this function. And immediately
-- after creating the function, we recreate citus_shards view again.

View File

@ -0,0 +1,13 @@
CREATE OR REPLACE FUNCTION pg_catalog.citus_add_replica_node(
replica_hostname text,
replica_port integer,
primary_hostname text,
primary_port integer)
RETURNS INTEGER
LANGUAGE C VOLATILE STRICT
AS 'MODULE_PATHNAME', $$citus_add_replica_node$$;
COMMENT ON FUNCTION pg_catalog.citus_add_replica_node(text, integer, text, integer) IS
'Adds a new node as a replica of an existing primary node. The replica is initially inactive. Returns the nodeid of the new replica node.';
REVOKE ALL ON FUNCTION pg_catalog.citus_add_replica_node(text, int, text, int) FROM PUBLIC;

View File

@ -298,6 +298,7 @@ extern Oid CitusDependentObjectFuncId(void);
/* enum oids */
extern Oid PrimaryNodeRoleId(void);
extern Oid SecondaryNodeRoleId(void);
extern Oid UnavailableNodeRoleId(void);
extern Oid CitusCopyFormatTypeId(void);
extern Oid TextCopyFormatId(void);
extern Oid BinaryCopyFormatId(void);

View File

@ -20,7 +20,7 @@
* in particular their OUT parameters) must be changed whenever the definition of
* pg_dist_node changes.
*/
#define Natts_pg_dist_node 11
#define Natts_pg_dist_node 13
#define Anum_pg_dist_node_nodeid 1
#define Anum_pg_dist_node_groupid 2
#define Anum_pg_dist_node_nodename 3
@ -32,6 +32,8 @@
#define Anum_pg_dist_node_nodecluster 9
#define Anum_pg_dist_node_metadatasynced 10
#define Anum_pg_dist_node_shouldhaveshards 11
#define Anum_pg_dist_node_nodeisreplica 12
#define Anum_pg_dist_node_nodeprimarynodeid 13
#define GROUPID_SEQUENCE_NAME "pg_dist_groupid_seq"
#define NODEID_SEQUENCE_NAME "pg_dist_node_nodeid_seq"

View File

@ -54,6 +54,8 @@ typedef struct WorkerNode
char nodeCluster[NAMEDATALEN]; /* the cluster the node is a part of */
bool metadataSynced; /* node has the most recent metadata */
bool shouldHaveShards; /* if the node should have distributed table shards on it or not */
bool nodeisreplica; /* whether this node is a replica */
int32 nodeprimarynodeid; /* nodeid of the primary for this replica */
} WorkerNode;
@ -84,6 +86,7 @@ extern WorkerNode * FindWorkerNode(const char *nodeName, int32 nodePort);
extern WorkerNode * FindWorkerNodeOrError(const char *nodeName, int32 nodePort);
extern WorkerNode * FindWorkerNodeAnyCluster(const char *nodeName, int32 nodePort);
extern WorkerNode * FindNodeWithNodeId(int nodeId, bool missingOk);
extern WorkerNode * FindNodeAnyClusterByNodeId(uint32 nodeId);
extern WorkerNode * ModifiableWorkerNode(const char *nodeName, int32 nodePort);
extern List * ReadDistNode(bool includeNodesFromOtherClusters);
extern void EnsureCoordinator(void);