diff --git a/src/backend/distributed/utils/node_metadata.c b/src/backend/distributed/utils/node_metadata.c index 225b78bd4..4fb789c0d 100644 --- a/src/backend/distributed/utils/node_metadata.c +++ b/src/backend/distributed/utils/node_metadata.c @@ -53,19 +53,27 @@ int GroupSize = 1; /* config variable managed via guc.c */ char *CurrentCluster = "default"; +typedef struct NodeMetadata +{ + int32 groupId; + char *nodeRack; + bool hasMetadata; + bool isActive; + Oid nodeRole; + char *nodeCluster; +} NodeMetadata; + /* local function forward declarations */ static int ActivateNode(char *nodeName, int nodePort); static void RemoveNodeFromCluster(char *nodeName, int32 nodePort); -static int AddNodeMetadata(char *nodeName, int32 nodePort, int32 groupId, - char *nodeRack, bool hasMetadata, bool isActive, - Oid nodeRole, char *nodeCluster, bool *nodeAlreadyExists); +static int AddNodeMetadata(char *nodeName, int32 nodePort, NodeMetadata + *nodeMetadata, bool *nodeAlreadyExists); static void SetNodeState(char *nodeName, int32 nodePort, bool isActive); static HeapTuple GetNodeTuple(char *nodeName, int32 nodePort); static int32 GetNextGroupId(void); static int GetNextNodeId(void); -static void InsertNodeRow(int nodeid, char *nodename, int32 nodeport, int32 groupId, - char *nodeRack, bool hasMetadata, bool isActive, Oid nodeRole, - char *nodeCluster); +static void InsertNodeRow(int nodeid, char *nodename, int32 nodeport, NodeMetadata + *nodeMetadata); static void DeleteNodeRow(char *nodename, int32 nodeport); static List * ParseWorkerNodeFileAndRename(void); static WorkerNode * TupleToWorkerNode(TupleDesc tupleDescriptor, HeapTuple heapTuple); @@ -83,6 +91,20 @@ PG_FUNCTION_INFO_V1(master_initialize_node_metadata); PG_FUNCTION_INFO_V1(get_shard_id_for_distribution_column); +/* + * DefaultNodeMetadata creates a NodeMetadata struct with the fields set to + * sane defaults, e.g. nodeRack = WORKER_DEFAULT_RACK. + */ +static NodeMetadata +DefaultNodeMetadata() +{ + NodeMetadata nodeMetadata = { + .nodeRack = WORKER_DEFAULT_RACK, + }; + return nodeMetadata; +} + + /* * master_add_node function adds a new node to the cluster and returns its id. It also * replicates all reference tables to the new node. @@ -93,15 +115,12 @@ master_add_node(PG_FUNCTION_ARGS) text *nodeName = PG_GETARG_TEXT_P(0); int32 nodePort = PG_GETARG_INT32(1); char *nodeNameString = text_to_cstring(nodeName); - int32 groupId = PG_GETARG_INT32(2); - Oid nodeRole = InvalidOid; - char *nodeClusterString = NULL; - char *nodeRack = WORKER_DEFAULT_RACK; - bool hasMetadata = false; - bool isActive = false; - bool nodeAlreadyExists = false; int nodeId = 0; + NodeMetadata nodeMetadata = DefaultNodeMetadata(); + bool nodeAlreadyExists = false; + nodeMetadata.groupId = PG_GETARG_INT32(2); + CheckCitusVersion(ERROR); /* @@ -110,19 +129,18 @@ master_add_node(PG_FUNCTION_ARGS) */ if (PG_NARGS() == 3) { - nodeRole = InvalidOid; - nodeClusterString = "default"; + nodeMetadata.nodeRole = InvalidOid; + nodeMetadata.nodeCluster = "default"; } else { Name nodeClusterName = PG_GETARG_NAME(4); - nodeClusterString = NameStr(*nodeClusterName); + nodeMetadata.nodeCluster = NameStr(*nodeClusterName); - nodeRole = PG_GETARG_OID(3); + nodeMetadata.nodeRole = PG_GETARG_OID(3); } - nodeId = AddNodeMetadata(nodeNameString, nodePort, groupId, nodeRack, - hasMetadata, isActive, nodeRole, nodeClusterString, + nodeId = AddNodeMetadata(nodeNameString, nodePort, &nodeMetadata, &nodeAlreadyExists); /* @@ -150,20 +168,18 @@ master_add_inactive_node(PG_FUNCTION_ARGS) text *nodeName = PG_GETARG_TEXT_P(0); int32 nodePort = PG_GETARG_INT32(1); char *nodeNameString = text_to_cstring(nodeName); - int32 groupId = PG_GETARG_INT32(2); - Oid nodeRole = PG_GETARG_OID(3); Name nodeClusterName = PG_GETARG_NAME(4); - char *nodeClusterString = NameStr(*nodeClusterName); - char *nodeRack = WORKER_DEFAULT_RACK; - bool hasMetadata = false; - bool isActive = false; + + NodeMetadata nodeMetadata = DefaultNodeMetadata(); bool nodeAlreadyExists = false; int nodeId = 0; + nodeMetadata.groupId = PG_GETARG_INT32(2); + nodeMetadata.nodeRole = PG_GETARG_OID(3); + nodeMetadata.nodeCluster = NameStr(*nodeClusterName); CheckCitusVersion(ERROR); - nodeId = AddNodeMetadata(nodeNameString, nodePort, groupId, nodeRack, - hasMetadata, isActive, nodeRole, nodeClusterString, + nodeId = AddNodeMetadata(nodeNameString, nodePort, &nodeMetadata, &nodeAlreadyExists); PG_RETURN_INT32(nodeId); @@ -184,21 +200,20 @@ master_add_secondary_node(PG_FUNCTION_ARGS) text *primaryName = PG_GETARG_TEXT_P(2); int32 primaryPort = PG_GETARG_INT32(3); char *primaryNameString = text_to_cstring(primaryName); - int32 groupId = GroupForNode(primaryNameString, primaryPort); - Oid nodeRole = SecondaryNodeRoleId(); Name nodeClusterName = PG_GETARG_NAME(4); - char *nodeClusterString = NameStr(*nodeClusterName); - char *nodeRack = WORKER_DEFAULT_RACK; - bool hasMetadata = false; - bool isActive = true; + NodeMetadata nodeMetadata = DefaultNodeMetadata(); bool nodeAlreadyExists = false; int nodeId = 0; + nodeMetadata.groupId = GroupForNode(primaryNameString, primaryPort); + nodeMetadata.nodeCluster = NameStr(*nodeClusterName); + nodeMetadata.nodeRole = SecondaryNodeRoleId(); + nodeMetadata.isActive = true; + CheckCitusVersion(ERROR); - nodeId = AddNodeMetadata(nodeNameString, nodePort, groupId, nodeRack, - hasMetadata, isActive, nodeRole, nodeClusterString, + nodeId = AddNodeMetadata(nodeNameString, nodePort, &nodeMetadata, &nodeAlreadyExists); PG_RETURN_INT32(nodeId); @@ -638,11 +653,6 @@ master_initialize_node_metadata(PG_FUNCTION_ARGS) { ListCell *workerNodeCell = NULL; List *workerNodes = NIL; - bool nodeAlreadyExists = false; - - /* nodeRole and nodeCluster don't exist when this function is caled */ - Oid nodeRole = InvalidOid; - char *nodeCluster = WORKER_DEFAULT_CLUSTER; CheckCitusVersion(ERROR); @@ -658,10 +668,13 @@ master_initialize_node_metadata(PG_FUNCTION_ARGS) foreach(workerNodeCell, workerNodes) { WorkerNode *workerNode = (WorkerNode *) lfirst(workerNodeCell); + bool nodeAlreadyExists = false; + NodeMetadata nodeMetadata = DefaultNodeMetadata(); + nodeMetadata.nodeRack = workerNode->workerRack; + nodeMetadata.isActive = workerNode->isActive; - AddNodeMetadata(workerNode->workerName, workerNode->workerPort, 0, - workerNode->workerRack, false, workerNode->isActive, - nodeRole, nodeCluster, &nodeAlreadyExists); + AddNodeMetadata(workerNode->workerName, workerNode->workerPort, &nodeMetadata, + &nodeAlreadyExists); } PG_RETURN_BOOL(true); @@ -959,8 +972,8 @@ CountPrimariesWithMetadata(void) * pg_dist_node as well as the nodes with hasmetadata=true. */ static int -AddNodeMetadata(char *nodeName, int32 nodePort, int32 groupId, char *nodeRack, - bool hasMetadata, bool isActive, Oid nodeRole, char *nodeCluster, +AddNodeMetadata(char *nodeName, int32 nodePort, + NodeMetadata *nodeMetadata, bool *nodeAlreadyExists) { int nextNodeIdInt = 0; @@ -989,25 +1002,30 @@ AddNodeMetadata(char *nodeName, int32 nodePort, int32 groupId, char *nodeRack, } /* user lets Citus to decide on the group that the newly added node should be in */ - if (groupId == 0) + if (nodeMetadata->groupId == 0) { - groupId = GetNextGroupId(); + nodeMetadata->groupId = GetNextGroupId(); } /* if nodeRole hasn't been added yet there's a constraint for one-node-per-group */ - if (nodeRole != InvalidOid && nodeRole == PrimaryNodeRoleId()) + if (nodeMetadata->nodeRole != InvalidOid && nodeMetadata->nodeRole == + PrimaryNodeRoleId()) { - WorkerNode *existingPrimaryNode = PrimaryNodeForGroup(groupId, NULL); + WorkerNode *existingPrimaryNode = PrimaryNodeForGroup(nodeMetadata->groupId, + NULL); if (existingPrimaryNode != NULL) { - ereport(ERROR, (errmsg("group %d already has a primary node", groupId))); + ereport(ERROR, (errmsg("group %d already has a primary node", + nodeMetadata->groupId))); } } - if (nodeRole == PrimaryNodeRoleId()) + if (nodeMetadata->nodeRole == PrimaryNodeRoleId()) { - if (strncmp(nodeCluster, WORKER_DEFAULT_CLUSTER, WORKER_LENGTH) != 0) + if (strncmp(nodeMetadata->nodeCluster, + WORKER_DEFAULT_CLUSTER, + WORKER_LENGTH) != 0) { ereport(ERROR, (errmsg("primaries must be added to the default cluster"))); } @@ -1016,8 +1034,7 @@ AddNodeMetadata(char *nodeName, int32 nodePort, int32 groupId, char *nodeRack, /* generate the new node id from the sequence */ nextNodeIdInt = GetNextNodeId(); - InsertNodeRow(nextNodeIdInt, nodeName, nodePort, groupId, nodeRack, hasMetadata, - isActive, nodeRole, nodeCluster); + InsertNodeRow(nextNodeIdInt, nodeName, nodePort, nodeMetadata); workerNode = FindWorkerNodeAnyCluster(nodeName, nodePort); @@ -1216,8 +1233,7 @@ EnsureCoordinator(void) * an existing group. If you don't it's possible for the metadata to become inconsistent. */ static void -InsertNodeRow(int nodeid, char *nodeName, int32 nodePort, int32 groupId, char *nodeRack, - bool hasMetadata, bool isActive, Oid nodeRole, char *nodeCluster) +InsertNodeRow(int nodeid, char *nodeName, int32 nodePort, NodeMetadata *nodeMetadata) { Relation pgDistNode = NULL; TupleDesc tupleDescriptor = NULL; @@ -1225,7 +1241,7 @@ InsertNodeRow(int nodeid, char *nodeName, int32 nodePort, int32 groupId, char *n Datum values[Natts_pg_dist_node]; bool isNulls[Natts_pg_dist_node]; - Datum nodeClusterStringDatum = CStringGetDatum(nodeCluster); + Datum nodeClusterStringDatum = CStringGetDatum(nodeMetadata->nodeCluster); Datum nodeClusterNameDatum = DirectFunctionCall1(namein, nodeClusterStringDatum); /* form new shard tuple */ @@ -1233,13 +1249,13 @@ InsertNodeRow(int nodeid, char *nodeName, int32 nodePort, int32 groupId, char *n memset(isNulls, false, sizeof(isNulls)); values[Anum_pg_dist_node_nodeid - 1] = UInt32GetDatum(nodeid); - values[Anum_pg_dist_node_groupid - 1] = Int32GetDatum(groupId); + values[Anum_pg_dist_node_groupid - 1] = Int32GetDatum(nodeMetadata->groupId); values[Anum_pg_dist_node_nodename - 1] = CStringGetTextDatum(nodeName); values[Anum_pg_dist_node_nodeport - 1] = UInt32GetDatum(nodePort); - values[Anum_pg_dist_node_noderack - 1] = CStringGetTextDatum(nodeRack); - values[Anum_pg_dist_node_hasmetadata - 1] = BoolGetDatum(hasMetadata); - values[Anum_pg_dist_node_isactive - 1] = BoolGetDatum(isActive); - values[Anum_pg_dist_node_noderole - 1] = ObjectIdGetDatum(nodeRole); + values[Anum_pg_dist_node_noderack - 1] = CStringGetTextDatum(nodeMetadata->nodeRack); + values[Anum_pg_dist_node_hasmetadata - 1] = BoolGetDatum(nodeMetadata->hasMetadata); + values[Anum_pg_dist_node_isactive - 1] = BoolGetDatum(nodeMetadata->isActive); + values[Anum_pg_dist_node_noderole - 1] = ObjectIdGetDatum(nodeMetadata->nodeRole); values[Anum_pg_dist_node_nodecluster - 1] = nodeClusterNameDatum; pgDistNode = heap_open(DistNodeRelationId(), RowExclusiveLock);