diff --git a/src/backend/distributed/metadata/metadata_cache.c b/src/backend/distributed/metadata/metadata_cache.c
index 79cc61092..b9f274495 100644
--- a/src/backend/distributed/metadata/metadata_cache.c
+++ b/src/backend/distributed/metadata/metadata_cache.c
@@ -3549,6 +3549,18 @@ SecondaryNodeRoleId(void)
 	return MetadataCache.secondaryNodeRoleId;
 }
 
+/*
+ * UnavailableNodeRoleId returns the Oid of the 'unavailable' nodeRole enum
+ * value. The Oid is cached in MetadataCache.unavailableNodeRoleId (a cache
+ * member assumed to be added alongside this function) rather than in
+ * secondaryNodeRoleId, so the cached secondary role Oid is not clobbered.
+ */
+Oid
+UnavailableNodeRoleId(void)
+{
+	if (!MetadataCache.unavailableNodeRoleId)
+	{
+		MetadataCache.unavailableNodeRoleId = LookupStringEnumValueId("noderole",
+																	   "unavailable");
+	}
+
+	return MetadataCache.unavailableNodeRoleId;
+}
 
 Oid
 CitusJobStatusScheduledId(void)
diff --git a/src/backend/distributed/metadata/metadata_sync.c b/src/backend/distributed/metadata/metadata_sync.c
index f73856169..215516c5a 100644
--- a/src/backend/distributed/metadata/metadata_sync.c
+++ b/src/backend/distributed/metadata/metadata_sync.c
@@ -814,7 +814,7 @@
 	appendStringInfo(nodeListInsertCommand,
 					 "INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, "
 					 "noderack, hasmetadata, metadatasynced, isactive, noderole, "
-					 "nodecluster, shouldhaveshards) VALUES ");
+					 "nodecluster, shouldhaveshards, nodeisreplica, nodeprimarynodeid) VALUES ");
 
 	/* iterate over the worker nodes, add the values */
 	WorkerNode *workerNode = NULL;
@@ -824,13 +824,14 @@ NodeListInsertCommand(List *workerNodeList)
 		char *metadataSyncedString = workerNode->metadataSynced ? "TRUE" : "FALSE";
 		char *isActiveString = workerNode->isActive ? "TRUE" : "FALSE";
 		char *shouldHaveShards = workerNode->shouldHaveShards ? "TRUE" : "FALSE";
+		char *nodeisreplicaString = workerNode->nodeisreplica ? "TRUE" : "FALSE";
 
 		Datum nodeRoleOidDatum = ObjectIdGetDatum(workerNode->nodeRole);
 		Datum nodeRoleStringDatum = DirectFunctionCall1(enum_out, nodeRoleOidDatum);
 		char *nodeRoleString = DatumGetCString(nodeRoleStringDatum);
 
 		appendStringInfo(nodeListInsertCommand,
-						 "(%d, %d, %s, %d, %s, %s, %s, %s, '%s'::noderole, %s, %s)",
+						 "(%d, %d, %s, %d, %s, %s, %s, %s, '%s'::noderole, %s, %s, %s, %d)",
 						 workerNode->nodeId,
 						 workerNode->groupId,
 						 quote_literal_cstr(workerNode->workerName),
@@ -841,7 +842,9 @@
 						 isActiveString,
 						 nodeRoleString,
 						 quote_literal_cstr(workerNode->nodeCluster),
-						 shouldHaveShards);
+						 shouldHaveShards,
+						 nodeisreplicaString,
+						 workerNode->nodeprimarynodeid);
 
 		processedWorkerNodeCount++;
 		if (processedWorkerNodeCount != workerCount)
@@ -875,9 +878,11 @@ NodeListIdempotentInsertCommand(List *workerNodeList)
 		"hasmetadata = EXCLUDED.hasmetadata, "
 		"isactive = EXCLUDED.isactive, "
 		"noderole = EXCLUDED.noderole, "
-		"nodecluster = EXCLUDED.nodecluster ,"
+		"nodecluster = EXCLUDED.nodecluster, "
 		"metadatasynced = EXCLUDED.metadatasynced, "
-		"shouldhaveshards = EXCLUDED.shouldhaveshards";
+		"shouldhaveshards = EXCLUDED.shouldhaveshards, "
+		"nodeisreplica = EXCLUDED.nodeisreplica, "
+		"nodeprimarynodeid = EXCLUDED.nodeprimarynodeid";
 	appendStringInfoString(nodeInsertIdempotentCommand, onConflictStr);
 	return nodeInsertIdempotentCommand->data;
 }
diff --git a/src/backend/distributed/metadata/node_metadata.c b/src/backend/distributed/metadata/node_metadata.c
index c2a2abd60..57d49ac81 100644
--- a/src/backend/distributed/metadata/node_metadata.c
+++ b/src/backend/distributed/metadata/node_metadata.c
@@ -84,6 +84,8 @@ typedef struct NodeMetadata
 	bool isActive;
 	Oid nodeRole;
 	bool shouldHaveShards;
+	uint32 nodeprimarynodeid;
+	bool nodeisreplica;
 	char *nodeCluster;
 } NodeMetadata;
@@ -120,7 +122,6 @@ static char * NodeMetadataSyncedUpdateCommand(uint32 nodeId, bool metadataSynced
 static void ErrorIfCoordinatorMetadataSetFalse(WorkerNode *workerNode, Datum value,
 											   char *field);
 static WorkerNode * SetShouldHaveShards(WorkerNode *workerNode, bool shouldHaveShards);
-static WorkerNode * FindNodeAnyClusterByNodeId(uint32 nodeId);
 static void ErrorIfAnyNodeNotExist(List *nodeList);
 static void UpdateLocalGroupIdsViaMetadataContext(MetadataSyncContext *context);
 static void SendDeletionCommandsForReplicatedTablePlacements(
@@ -140,6 +141,10 @@ static BackgroundWorkerHandle * CheckBackgroundWorkerToObtainLocks(int32 lock_co
 static BackgroundWorkerHandle * LockPlacementsWithBackgroundWorkersInPrimaryNode(
 	WorkerNode *workerNode, bool force, int32 lock_cooldown);
 
+static void EnsureValidStreamingReplica(WorkerNode *primaryWorkerNode,
+										char *replicaHostname,
+										int replicaPort);
+
 /* Function definitions go here */
 
 /* declarations for dynamic loading */
@@ -168,6 +173,7 @@ PG_FUNCTION_INFO_V1(citus_coordinator_nodeid);
 PG_FUNCTION_INFO_V1(citus_is_coordinator);
 PG_FUNCTION_INFO_V1(citus_internal_mark_node_not_synced);
 PG_FUNCTION_INFO_V1(citus_is_primary_node);
+PG_FUNCTION_INFO_V1(citus_add_replica_node);
 
 /*
  * DefaultNodeMetadata creates a NodeMetadata struct with the fields set to
@@ -183,6 +189,8 @@ DefaultNodeMetadata()
 	nodeMetadata.nodeRack = WORKER_DEFAULT_RACK;
 	nodeMetadata.shouldHaveShards = true;
 	nodeMetadata.groupId = INVALID_GROUP_ID;
+	nodeMetadata.nodeisreplica = false;
+	nodeMetadata.nodeprimarynodeid = 0; /* 0 means no primary (invalid node id) */
 
 	return nodeMetadata;
 }
@@ -1422,6 +1430,149 @@ master_update_node(PG_FUNCTION_ARGS)
 }
 
 
+/*
+ * citus_add_replica_node adds a new node as a replica of an existing primary
+ * node. It records the replica's hostname and port and links it to the
+ * primary node's ID. The replica is initially marked as inactive and as not
+ * having shards.
+ */
+Datum
+citus_add_replica_node(PG_FUNCTION_ARGS)
+{
+	CheckCitusVersion(ERROR);
+	EnsureSuperUser();
+	EnsureCoordinator();
+
+	text *replicaHostnameText = PG_GETARG_TEXT_P(0);
+	int32 replicaPort = PG_GETARG_INT32(1);
+	text *primaryHostnameText = PG_GETARG_TEXT_P(2);
+	int32 primaryPort = PG_GETARG_INT32(3);
+
+	char *replicaHostname = text_to_cstring(replicaHostnameText);
+	char *primaryHostname = text_to_cstring(primaryHostnameText);
+
+	WorkerNode *primaryWorkerNode = FindWorkerNodeAnyCluster(primaryHostname, primaryPort);
+
+	if (primaryWorkerNode == NULL)
+	{
+		ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+						errmsg("primary node %s:%d not found in pg_dist_node",
+							   primaryHostname, primaryPort)));
+	}
+
+	/*
+	 * Future-proofing: ideally, a primary node should not itself be a replica.
+	 * This check becomes more relevant once replica promotion logic exists.
+	 * For now, pg_dist_node.nodeisreplica defaults to false for existing nodes.
+	 */
+	if (primaryWorkerNode->nodeisreplica)
+	{
+		ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+						errmsg("primary node %s:%d is itself a replica and cannot have replicas",
+							   primaryHostname, primaryPort)));
+	}
+
+	if (!primaryWorkerNode->shouldHaveShards)
+	{
+		ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+						errmsg("primary node %s:%d does not hold shards; a node "
+							   "without shards cannot have replicas",
+							   primaryHostname, primaryPort)));
+	}
+
+	WorkerNode *existingReplicaNode = FindWorkerNodeAnyCluster(replicaHostname, replicaPort);
+	if (existingReplicaNode != NULL)
+	{
+		/*
+		 * Idempotency check: if the node already exists, is it already
+		 * correctly registered as a replica for THIS primary?
+		 */
+		if (existingReplicaNode->nodeisreplica &&
+			existingReplicaNode->nodeprimarynodeid == primaryWorkerNode->nodeId)
+		{
+			ereport(NOTICE, (errmsg("node %s:%d is already registered as a replica for primary %s:%d (nodeid %d)",
+									replicaHostname, replicaPort,
+									primaryHostname, primaryPort, primaryWorkerNode->nodeId)));
+			PG_RETURN_INT32(existingReplicaNode->nodeId);
+		}
+		else
+		{
+			ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+							errmsg("node %s:%d (nodeid %d) already exists but is not a replica of this primary",
+								   replicaHostname, replicaPort, existingReplicaNode->nodeId)));
+		}
+	}
+
+	EnsureValidStreamingReplica(primaryWorkerNode, replicaHostname, replicaPort);
+
+	NodeMetadata nodeMetadata = DefaultNodeMetadata();
+
+	nodeMetadata.nodeisreplica = true;
+	nodeMetadata.nodeprimarynodeid = primaryWorkerNode->nodeId;
+	nodeMetadata.isActive = false; /* replicas start out inactive */
+	nodeMetadata.shouldHaveShards = false; /* replicas do not directly own the primary's shards */
+	nodeMetadata.groupId = primaryWorkerNode->groupId; /* same group as the primary */
+	nodeMetadata.nodeRole = UnavailableNodeRoleId(); /* the node role is set to 'unavailable' */
+	nodeMetadata.nodeCluster = primaryWorkerNode->nodeCluster; /* same cluster as the primary */
+
+	/*
+	 * hasMetadata and metadataSynced keep their DefaultNodeMetadata values;
+	 * since the replica starts out inactive, its metadata sync status is not
+	 * critical until it is activated (or promoted), and AddNodeMetadata /
+	 * ActivateNodeList adjust these fields if needed.
+	 */
+
+	bool nodeAlreadyExists = false;
+	bool localOnly = false; /* propagate the change to other workers with metadata */
+
+	/*
+	 * AddNodeMetadata will take an ExclusiveLock on pg_dist_node. It also
+	 * checks again whether the node already exists after acquiring the lock.
+	 */
+	int replicaNodeId = AddNodeMetadata(replicaHostname, replicaPort, &nodeMetadata,
+										&nodeAlreadyExists, localOnly);
+
+	if (nodeAlreadyExists)
+	{
+		/*
+		 * This case is normally caught by the FindWorkerNodeAnyCluster check
+		 * above, but AddNodeMetadata re-checks after taking its lock and
+		 * returns the existing node's ID if it finds one. Make sure that node
+		 * really is the expected replica; otherwise, error out.
+		 */
+		WorkerNode *fetchedExistingNode = FindNodeAnyClusterByNodeId(replicaNodeId);
+		if (fetchedExistingNode != NULL && fetchedExistingNode->nodeisreplica &&
+			fetchedExistingNode->nodeprimarynodeid == primaryWorkerNode->nodeId)
+		{
+			ereport(NOTICE, (errmsg("node %s:%d was already correctly registered as a replica for primary %s:%d (nodeid %d)",
+									replicaHostname, replicaPort,
+									primaryHostname, primaryPort, primaryWorkerNode->nodeId)));
+
+			/* intentional fall-through to return replicaNodeId */
+		}
+		else
+		{
+			/* unexpected: the initial checks above should have caught this */
+			ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR),
+							errmsg("node %s:%d already exists but is not correctly configured as a replica for primary %s:%d",
+								   replicaHostname, replicaPort, primaryHostname, primaryPort)));
+		}
+	}
+
+	TransactionModifiedNodeMetadata = true;
+
+	/*
+	 * Note: replicas added this way are inactive. A separate mechanism or UDF
+	 * (e.g., citus_activate_replica_node or citus_promote_replica_node) would
+	 * be needed to activate them, potentially after verifying replication lag,
+	 * etc. Activation would typically involve:
+	 *   1. Setting isActive = true.
+	 *   2. Ensuring metadata is synced (hasMetadata = true, metadataSynced = true).
+	 *   3. Potentially other logic, such as adding the node to specific scheduler lists.
+	 * For now, citus_add_replica_node just registers the replica.
+	 */
+
+	PG_RETURN_INT32(replicaNodeId);
+}
+
+
 /*
  * SetLockTimeoutLocally sets the lock_timeout to the given value.
  * This setting is local.
@@ -1871,7 +2022,7 @@ FindWorkerNodeAnyCluster(const char *nodeName, int32 nodePort)
  * FindNodeAnyClusterByNodeId searches pg_dist_node and returns the node with
  * the nodeId. If the node can't be found returns NULL.
  */
-static WorkerNode *
+WorkerNode *
 FindNodeAnyClusterByNodeId(uint32 nodeId)
 {
 	bool includeNodesFromOtherClusters = true;
@@ -2924,6 +3075,10 @@ InsertNodeRow(int nodeid, char *nodeName, int32 nodePort, NodeMetadata *nodeMeta
 	values[Anum_pg_dist_node_nodecluster - 1] = nodeClusterNameDatum;
 	values[Anum_pg_dist_node_shouldhaveshards - 1] = BoolGetDatum(
 		nodeMetadata->shouldHaveShards);
+	values[Anum_pg_dist_node_nodeisreplica - 1] = BoolGetDatum(
+		nodeMetadata->nodeisreplica);
+	values[Anum_pg_dist_node_nodeprimarynodeid - 1] = Int32GetDatum(
+		nodeMetadata->nodeprimarynodeid);
 
 	Relation pgDistNode = table_open(DistNodeRelationId(), RowExclusiveLock);
 
@@ -3014,8 +3169,7 @@ TupleToWorkerNode(TupleDesc tupleDescriptor, HeapTuple heapTuple)
 
 	/*
 	 * This function can be called before "ALTER TABLE ... ADD COLUMN nodecluster ...",
-	 * therefore heap_deform_tuple() won't set the isNullArray for this column. We
-	 * initialize it true to be safe in that case.
+	 * and before the later replica-related columns exist, in which case
+	 * heap_deform_tuple() won't set the isNullArray entries for the missing
+	 * columns. We initialize the whole array to true to be safe.
 	 */
 	memset(isNullArray, true, sizeof(isNullArray));
 
@@ -3043,6 +3197,25 @@
 		datumArray[Anum_pg_dist_node_shouldhaveshards - 1]);
 
+	/* nodeisreplica and nodeprimarynodeid may be NULL if the row comes from an older version */
+	if (!isNullArray[Anum_pg_dist_node_nodeisreplica - 1])
+	{
+		workerNode->nodeisreplica = DatumGetBool(datumArray[Anum_pg_dist_node_nodeisreplica - 1]);
+	}
+	else
+	{
+		workerNode->nodeisreplica = false; /* default value */
+	}
+
+	if (!isNullArray[Anum_pg_dist_node_nodeprimarynodeid - 1])
+	{
+		workerNode->nodeprimarynodeid = DatumGetInt32(datumArray[Anum_pg_dist_node_nodeprimarynodeid - 1]);
+	}
+	else
+	{
+		workerNode->nodeprimarynodeid = 0; /* default value, i.e. no primary / invalid node id */
+	}
+
 	/*
 	 * nodecluster column can be missing. In the case of extension creation/upgrade,
 	 * master_initialize_node_metadata function is called before the nodecluster
@@ -3285,3 +3458,173 @@ SyncNodeMetadata(MetadataSyncContext *context)
 	 */
 	SendOrCollectCommandListToActivatedNodes(context, recreateNodeSnapshotCommandList);
 }
+
+
+/*
+ * EnsureValidStreamingReplica checks that the given node is a valid streaming
+ * replica: it connects to the replica and verifies that it is in recovery,
+ * then compares the replica's system identifier with the primary's to make
+ * sure they belong to the same database system. It errors out otherwise.
+ */
+static void
+EnsureValidStreamingReplica(WorkerNode *primaryWorkerNode, char *replicaHostname,
+							int replicaPort)
+{
+	int connectionFlag = FORCE_NEW_CONNECTION;
+	MultiConnection *replicaConnection = GetNodeConnection(connectionFlag, replicaHostname,
+														   replicaPort);
+
+	/* Step 1: check that the node is in recovery, i.e. a streaming replica */
+	const char *replicaRecoveryQuery = "SELECT pg_is_in_recovery()";
+
+	int resultCode = SendRemoteCommand(replicaConnection, replicaRecoveryQuery);
+
+	if (resultCode == 0)
+	{
+		CloseConnection(replicaConnection);
+		ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE),
+						errmsg("could not send pg_is_in_recovery() query to %s:%d",
+							   replicaHostname, replicaPort)));
+	}
+
+	PGresult *result = GetRemoteCommandResult(replicaConnection, true);
+
+	if (result == NULL)
+	{
+		CloseConnection(replicaConnection);
+
+		ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+						errmsg("could not get pg_is_in_recovery() result from %s:%d",
+							   replicaHostname, replicaPort)));
+	}
+
+	List *recoveryResultList = ReadFirstColumnAsText(result);
+	if (list_length(recoveryResultList) != 1)
+	{
+		PQclear(result);
+		ClearResults(replicaConnection, true);
+		CloseConnection(replicaConnection);
+
+		ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE),
+						errmsg("cannot parse pg_is_in_recovery() result from %s:%d",
+							   replicaHostname, replicaPort)));
+	}
+
+	StringInfo isInRecoveryQueryResInfo = (StringInfo) linitial(recoveryResultList);
+	char *isInRecoveryQueryResStr = isInRecoveryQueryResInfo->data;
+
+	if (strcmp(isInRecoveryQueryResStr, "t") != 0 &&
+		strcmp(isInRecoveryQueryResStr, "true") != 0)
+	{
+		PQclear(result);
+		ClearResults(replicaConnection, true);
+		CloseConnection(replicaConnection);
+
+		ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+						errmsg("%s:%d is not a streaming replica",
+							   replicaHostname, replicaPort)));
+	}
+
+	PQclear(result);
+	ForgetResults(replicaConnection);
+
+	/* Step 2: get the system identifier from the replica */
+	const char *sysidQuery =
+		"SELECT system_identifier FROM pg_control_system()";
+
+	resultCode = SendRemoteCommand(replicaConnection, sysidQuery);
+
+	if (resultCode == 0)
+	{
+		CloseConnection(replicaConnection);
+		ereport(ERROR,
+				(errcode(ERRCODE_CONNECTION_FAILURE),
+				 errmsg("could not send system identifier query to %s:%d",
+						replicaHostname, replicaPort)));
+	}
+
+	result = GetRemoteCommandResult(replicaConnection, true);
+	if (!IsResponseOK(result))
+	{
+		ReportResultError(replicaConnection, result, ERROR);
+	}
+
+	List *sysidList = ReadFirstColumnAsText(result);
+	if (list_length(sysidList) != 1)
+	{
+		PQclear(result);
+		ClearResults(replicaConnection, true);
+		CloseConnection(replicaConnection);
+
+		ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE),
+						errmsg("cannot parse system identifier result from %s:%d",
+							   replicaHostname, replicaPort)));
+	}
+
+	StringInfo sysidQueryResInfo = (StringInfo) linitial(sysidList);
+	char *sysidQueryResStr = sysidQueryResInfo->data;
+
+	ereport(NOTICE, (errmsg("system identifier of %s:%d is %s",
+							replicaHostname, replicaPort, sysidQueryResStr)));
+
+	/* we do not need the replica connection anymore */
+	PQclear(result);
+	ForgetResults(replicaConnection);
+	CloseConnection(replicaConnection);
+
+	/* Step 3: get the system identifier from the primary */
+	int primaryConnectionFlag = 0;
+	MultiConnection *primaryConnection = GetNodeConnection(primaryConnectionFlag,
+														   primaryWorkerNode->workerName,
+														   primaryWorkerNode->workerPort);
+	int primaryResultCode = SendRemoteCommand(primaryConnection, sysidQuery);
+	if (primaryResultCode == 0)
+	{
+		CloseConnection(primaryConnection);
+		ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE),
+						errmsg("could not send system identifier query to %s:%d",
+							   primaryWorkerNode->workerName, primaryWorkerNode->workerPort)));
+	}
+
+	PGresult *primaryResult = GetRemoteCommandResult(primaryConnection, true);
+	if (primaryResult == NULL)
+	{
+		CloseConnection(primaryConnection);
+
+		ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+						errmsg("could not get system identifier result from %s:%d",
+							   primaryWorkerNode->workerName, primaryWorkerNode->workerPort)));
+	}
+
+	List *primarySysidList = ReadFirstColumnAsText(primaryResult);
+	if (list_length(primarySysidList) != 1)
+	{
+		PQclear(primaryResult);
+		ClearResults(primaryConnection, true);
+		CloseConnection(primaryConnection);
+
+		ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE),
+						errmsg("cannot parse system identifier result from %s:%d",
+							   primaryWorkerNode->workerName, primaryWorkerNode->workerPort)));
+	}
+
+	StringInfo primarySysidQueryResInfo = (StringInfo) linitial(primarySysidList);
+	char *primarySysidQueryResStr = primarySysidQueryResInfo->data;
+
+	ereport(NOTICE, (errmsg("system identifier of %s:%d is %s",
+							primaryWorkerNode->workerName, primaryWorkerNode->workerPort,
+							primarySysidQueryResStr)));
+
+	/* verify that both identifiers match */
+	if (strcmp(sysidQueryResStr, primarySysidQueryResStr) != 0)
+	{
+		ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+						errmsg("system identifiers do not match: %s (replica) vs %s (primary)",
+							   sysidQueryResStr, primarySysidQueryResStr)));
+	}
+
+	PQclear(primaryResult);
+	ClearResults(primaryConnection, true);
+	CloseConnection(primaryConnection);
+}
+
diff --git a/src/backend/distributed/sql/cat_upgrades/add_replica_info_to_pg_dist_node.sql b/src/backend/distributed/sql/cat_upgrades/add_replica_info_to_pg_dist_node.sql
new file mode 100644
index 000000000..8ca8ae239
--- /dev/null
+++ b/src/backend/distributed/sql/cat_upgrades/add_replica_info_to_pg_dist_node.sql
@@ -0,0 +1,7 @@
+-- Add replica information columns to pg_dist_node
+ALTER TABLE pg_catalog.pg_dist_node ADD COLUMN nodeisreplica BOOLEAN NOT NULL DEFAULT FALSE;
+ALTER TABLE pg_catalog.pg_dist_node ADD COLUMN nodeprimarynodeid INT4 NOT NULL DEFAULT 0;
+
+-- Add comments to the new columns for clarity in \d+ output
+COMMENT ON COLUMN pg_catalog.pg_dist_node.nodeisreplica IS 'Indicates if this node is a replica of another node.';
+COMMENT ON COLUMN pg_catalog.pg_dist_node.nodeprimarynodeid IS 'If nodeisreplica is true, this stores the nodeid of its primary node.';
diff --git a/src/backend/distributed/sql/citus--13.0-1--13.1-1.sql b/src/backend/distributed/sql/citus--13.0-1--13.1-1.sql
index 1599b21f1..8e0acec75 100644
--- a/src/backend/distributed/sql/citus--13.0-1--13.1-1.sql
+++ b/src/backend/distributed/sql/citus--13.0-1--13.1-1.sql
@@ -52,6 +52,9 @@ DROP VIEW IF EXISTS pg_catalog.citus_lock_waits;
 #include "udfs/citus_stat_counters_reset/13.1-1.sql"
 #include "udfs/citus_nodes/13.1-1.sql"
 
+#include "cat_upgrades/add_replica_info_to_pg_dist_node.sql"
+#include "udfs/citus_add_replica_node/13.1-1.sql"
+
 -- Since shard_name/13.1-1.sql first drops the function and then creates it, we first
 -- need to drop citus_shards view since that view depends on this function. And immediately
 -- after creating the function, we recreate citus_shards view again.
diff --git a/src/backend/distributed/sql/udfs/citus_add_replica_node/13.1-1.sql b/src/backend/distributed/sql/udfs/citus_add_replica_node/13.1-1.sql
new file mode 100644
index 000000000..f936c6af7
--- /dev/null
+++ b/src/backend/distributed/sql/udfs/citus_add_replica_node/13.1-1.sql
@@ -0,0 +1,13 @@
+CREATE OR REPLACE FUNCTION pg_catalog.citus_add_replica_node(
+    replica_hostname text,
+    replica_port integer,
+    primary_hostname text,
+    primary_port integer)
+  RETURNS INTEGER
+  LANGUAGE C VOLATILE STRICT
+  AS 'MODULE_PATHNAME', $$citus_add_replica_node$$;
+
+COMMENT ON FUNCTION pg_catalog.citus_add_replica_node(text, integer, text, integer) IS
+'Adds a new node as a replica of an existing primary node. The replica is initially inactive. Returns the nodeid of the new replica node.';
+
+REVOKE ALL ON FUNCTION pg_catalog.citus_add_replica_node(text, int, text, int) FROM PUBLIC;
diff --git a/src/include/distributed/metadata_cache.h b/src/include/distributed/metadata_cache.h
index f1120497b..479a0af8b 100644
--- a/src/include/distributed/metadata_cache.h
+++ b/src/include/distributed/metadata_cache.h
@@ -298,6 +298,7 @@ extern Oid CitusDependentObjectFuncId(void);
 /* enum oids */
 extern Oid PrimaryNodeRoleId(void);
 extern Oid SecondaryNodeRoleId(void);
+extern Oid UnavailableNodeRoleId(void);
 extern Oid CitusCopyFormatTypeId(void);
 extern Oid TextCopyFormatId(void);
 extern Oid BinaryCopyFormatId(void);
diff --git a/src/include/distributed/pg_dist_node.h b/src/include/distributed/pg_dist_node.h
index 371c2a35a..de23a4255 100644
--- a/src/include/distributed/pg_dist_node.h
+++ b/src/include/distributed/pg_dist_node.h
@@ -20,7 +20,7 @@
  * in particular their OUT parameters) must be changed whenever the definition of
  * pg_dist_node changes.
  */
-#define Natts_pg_dist_node 11
+#define Natts_pg_dist_node 13
 #define Anum_pg_dist_node_nodeid 1
 #define Anum_pg_dist_node_groupid 2
 #define Anum_pg_dist_node_nodename 3
@@ -32,6 +32,8 @@
 #define Anum_pg_dist_node_nodecluster 9
 #define Anum_pg_dist_node_metadatasynced 10
 #define Anum_pg_dist_node_shouldhaveshards 11
+#define Anum_pg_dist_node_nodeisreplica 12
+#define Anum_pg_dist_node_nodeprimarynodeid 13
 
 #define GROUPID_SEQUENCE_NAME "pg_dist_groupid_seq"
 #define NODEID_SEQUENCE_NAME "pg_dist_node_nodeid_seq"
diff --git a/src/include/distributed/worker_manager.h b/src/include/distributed/worker_manager.h
index 02a43fe0b..93cc6b290 100644
--- a/src/include/distributed/worker_manager.h
+++ b/src/include/distributed/worker_manager.h
@@ -54,6 +54,8 @@ typedef struct WorkerNode
 	char nodeCluster[NAMEDATALEN]; /* the cluster the node is a part of */
 	bool metadataSynced; /* node has the most recent metadata */
 	bool shouldHaveShards; /* if the node should have distributed table shards on it or not */
+	bool nodeisreplica; /* whether this node is a replica */
+	int32 nodeprimarynodeid; /* nodeid of the primary for this replica */
 } WorkerNode;
 
@@ -84,6 +86,7 @@ extern WorkerNode * FindWorkerNode(const char *nodeName, int32 nodePort);
 extern WorkerNode * FindWorkerNodeOrError(const char *nodeName, int32 nodePort);
 extern WorkerNode * FindWorkerNodeAnyCluster(const char *nodeName, int32 nodePort);
 extern WorkerNode * FindNodeWithNodeId(int nodeId, bool missingOk);
+extern WorkerNode * FindNodeAnyClusterByNodeId(uint32 nodeId);
 extern WorkerNode * ModifiableWorkerNode(const char *nodeName, int32 nodePort);
 extern List * ReadDistNode(bool includeNodesFromOtherClusters);
 extern void EnsureCoordinator(void);
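
Example usage, as a minimal sketch of the new UDF from the coordinator. The hostnames
and ports below are hypothetical, and it is assumed that replica-host:5432 is already
configured as a streaming replica of the existing worker worker-1:5432; otherwise
EnsureValidStreamingReplica() will reject the call.

    -- register the streaming replica of worker-1 in Citus metadata; the new
    -- pg_dist_node row is inactive, with nodeisreplica = true and
    -- nodeprimarynodeid pointing at worker-1's nodeid
    SELECT citus_add_replica_node('replica-host', 5432, 'worker-1', 5432);

    -- inspect the columns added by this change
    SELECT nodeid, nodename, nodeport, isactive, nodeisreplica, nodeprimarynodeid
    FROM pg_dist_node
    ORDER BY nodeid;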