mirror of https://github.com/citusdata/citus.git
Refactor 9 argument function to use a struct (#2952)
For another PR I needed to add another column which would require to add another argument to an already 9 argument function signature. In this case it would be a boolean flag and there were already two boolean flags in there. In my experience it becomes really easy to mess up the order of these flags at that point. Especially because the type system doesn't distinguish between the 3 different booleans with completely different meanings. So I refactored these signatures to receive a struct containing most of these arguments. Like that you don't mess up orderening, because the meaning of the boolean is not order dependent but fieldname dependent. It also makes it possible to set good shared defaults for this struct.pull/2962/head
parent
48b7fbb9e5
commit
389086102a
|
@ -53,19 +53,27 @@ int GroupSize = 1;
|
|||
/* config variable managed via guc.c */
|
||||
char *CurrentCluster = "default";
|
||||
|
||||
typedef struct NodeMetadata
|
||||
{
|
||||
int32 groupId;
|
||||
char *nodeRack;
|
||||
bool hasMetadata;
|
||||
bool isActive;
|
||||
Oid nodeRole;
|
||||
char *nodeCluster;
|
||||
} NodeMetadata;
|
||||
|
||||
/* local function forward declarations */
|
||||
static int ActivateNode(char *nodeName, int nodePort);
|
||||
static void RemoveNodeFromCluster(char *nodeName, int32 nodePort);
|
||||
static int AddNodeMetadata(char *nodeName, int32 nodePort, int32 groupId,
|
||||
char *nodeRack, bool hasMetadata, bool isActive,
|
||||
Oid nodeRole, char *nodeCluster, bool *nodeAlreadyExists);
|
||||
static int AddNodeMetadata(char *nodeName, int32 nodePort, NodeMetadata
|
||||
*nodeMetadata, bool *nodeAlreadyExists);
|
||||
static void SetNodeState(char *nodeName, int32 nodePort, bool isActive);
|
||||
static HeapTuple GetNodeTuple(char *nodeName, int32 nodePort);
|
||||
static int32 GetNextGroupId(void);
|
||||
static int GetNextNodeId(void);
|
||||
static void InsertNodeRow(int nodeid, char *nodename, int32 nodeport, int32 groupId,
|
||||
char *nodeRack, bool hasMetadata, bool isActive, Oid nodeRole,
|
||||
char *nodeCluster);
|
||||
static void InsertNodeRow(int nodeid, char *nodename, int32 nodeport, NodeMetadata
|
||||
*nodeMetadata);
|
||||
static void DeleteNodeRow(char *nodename, int32 nodeport);
|
||||
static List * ParseWorkerNodeFileAndRename(void);
|
||||
static WorkerNode * TupleToWorkerNode(TupleDesc tupleDescriptor, HeapTuple heapTuple);
|
||||
|
@ -83,6 +91,20 @@ PG_FUNCTION_INFO_V1(master_initialize_node_metadata);
|
|||
PG_FUNCTION_INFO_V1(get_shard_id_for_distribution_column);
|
||||
|
||||
|
||||
/*
|
||||
* DefaultNodeMetadata creates a NodeMetadata struct with the fields set to
|
||||
* sane defaults, e.g. nodeRack = WORKER_DEFAULT_RACK.
|
||||
*/
|
||||
static NodeMetadata
|
||||
DefaultNodeMetadata()
|
||||
{
|
||||
NodeMetadata nodeMetadata = {
|
||||
.nodeRack = WORKER_DEFAULT_RACK,
|
||||
};
|
||||
return nodeMetadata;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* master_add_node function adds a new node to the cluster and returns its id. It also
|
||||
* replicates all reference tables to the new node.
|
||||
|
@ -93,15 +115,12 @@ master_add_node(PG_FUNCTION_ARGS)
|
|||
text *nodeName = PG_GETARG_TEXT_P(0);
|
||||
int32 nodePort = PG_GETARG_INT32(1);
|
||||
char *nodeNameString = text_to_cstring(nodeName);
|
||||
int32 groupId = PG_GETARG_INT32(2);
|
||||
Oid nodeRole = InvalidOid;
|
||||
char *nodeClusterString = NULL;
|
||||
char *nodeRack = WORKER_DEFAULT_RACK;
|
||||
bool hasMetadata = false;
|
||||
bool isActive = false;
|
||||
bool nodeAlreadyExists = false;
|
||||
int nodeId = 0;
|
||||
|
||||
NodeMetadata nodeMetadata = DefaultNodeMetadata();
|
||||
bool nodeAlreadyExists = false;
|
||||
nodeMetadata.groupId = PG_GETARG_INT32(2);
|
||||
|
||||
CheckCitusVersion(ERROR);
|
||||
|
||||
/*
|
||||
|
@ -110,19 +129,18 @@ master_add_node(PG_FUNCTION_ARGS)
|
|||
*/
|
||||
if (PG_NARGS() == 3)
|
||||
{
|
||||
nodeRole = InvalidOid;
|
||||
nodeClusterString = "default";
|
||||
nodeMetadata.nodeRole = InvalidOid;
|
||||
nodeMetadata.nodeCluster = "default";
|
||||
}
|
||||
else
|
||||
{
|
||||
Name nodeClusterName = PG_GETARG_NAME(4);
|
||||
nodeClusterString = NameStr(*nodeClusterName);
|
||||
nodeMetadata.nodeCluster = NameStr(*nodeClusterName);
|
||||
|
||||
nodeRole = PG_GETARG_OID(3);
|
||||
nodeMetadata.nodeRole = PG_GETARG_OID(3);
|
||||
}
|
||||
|
||||
nodeId = AddNodeMetadata(nodeNameString, nodePort, groupId, nodeRack,
|
||||
hasMetadata, isActive, nodeRole, nodeClusterString,
|
||||
nodeId = AddNodeMetadata(nodeNameString, nodePort, &nodeMetadata,
|
||||
&nodeAlreadyExists);
|
||||
|
||||
/*
|
||||
|
@ -150,20 +168,18 @@ master_add_inactive_node(PG_FUNCTION_ARGS)
|
|||
text *nodeName = PG_GETARG_TEXT_P(0);
|
||||
int32 nodePort = PG_GETARG_INT32(1);
|
||||
char *nodeNameString = text_to_cstring(nodeName);
|
||||
int32 groupId = PG_GETARG_INT32(2);
|
||||
Oid nodeRole = PG_GETARG_OID(3);
|
||||
Name nodeClusterName = PG_GETARG_NAME(4);
|
||||
char *nodeClusterString = NameStr(*nodeClusterName);
|
||||
char *nodeRack = WORKER_DEFAULT_RACK;
|
||||
bool hasMetadata = false;
|
||||
bool isActive = false;
|
||||
|
||||
NodeMetadata nodeMetadata = DefaultNodeMetadata();
|
||||
bool nodeAlreadyExists = false;
|
||||
int nodeId = 0;
|
||||
nodeMetadata.groupId = PG_GETARG_INT32(2);
|
||||
nodeMetadata.nodeRole = PG_GETARG_OID(3);
|
||||
nodeMetadata.nodeCluster = NameStr(*nodeClusterName);
|
||||
|
||||
CheckCitusVersion(ERROR);
|
||||
|
||||
nodeId = AddNodeMetadata(nodeNameString, nodePort, groupId, nodeRack,
|
||||
hasMetadata, isActive, nodeRole, nodeClusterString,
|
||||
nodeId = AddNodeMetadata(nodeNameString, nodePort, &nodeMetadata,
|
||||
&nodeAlreadyExists);
|
||||
|
||||
PG_RETURN_INT32(nodeId);
|
||||
|
@ -184,21 +200,20 @@ master_add_secondary_node(PG_FUNCTION_ARGS)
|
|||
text *primaryName = PG_GETARG_TEXT_P(2);
|
||||
int32 primaryPort = PG_GETARG_INT32(3);
|
||||
char *primaryNameString = text_to_cstring(primaryName);
|
||||
int32 groupId = GroupForNode(primaryNameString, primaryPort);
|
||||
|
||||
Oid nodeRole = SecondaryNodeRoleId();
|
||||
Name nodeClusterName = PG_GETARG_NAME(4);
|
||||
char *nodeClusterString = NameStr(*nodeClusterName);
|
||||
char *nodeRack = WORKER_DEFAULT_RACK;
|
||||
bool hasMetadata = false;
|
||||
bool isActive = true;
|
||||
NodeMetadata nodeMetadata = DefaultNodeMetadata();
|
||||
bool nodeAlreadyExists = false;
|
||||
int nodeId = 0;
|
||||
|
||||
nodeMetadata.groupId = GroupForNode(primaryNameString, primaryPort);
|
||||
nodeMetadata.nodeCluster = NameStr(*nodeClusterName);
|
||||
nodeMetadata.nodeRole = SecondaryNodeRoleId();
|
||||
nodeMetadata.isActive = true;
|
||||
|
||||
CheckCitusVersion(ERROR);
|
||||
|
||||
nodeId = AddNodeMetadata(nodeNameString, nodePort, groupId, nodeRack,
|
||||
hasMetadata, isActive, nodeRole, nodeClusterString,
|
||||
nodeId = AddNodeMetadata(nodeNameString, nodePort, &nodeMetadata,
|
||||
&nodeAlreadyExists);
|
||||
|
||||
PG_RETURN_INT32(nodeId);
|
||||
|
@ -638,11 +653,6 @@ master_initialize_node_metadata(PG_FUNCTION_ARGS)
|
|||
{
|
||||
ListCell *workerNodeCell = NULL;
|
||||
List *workerNodes = NIL;
|
||||
bool nodeAlreadyExists = false;
|
||||
|
||||
/* nodeRole and nodeCluster don't exist when this function is caled */
|
||||
Oid nodeRole = InvalidOid;
|
||||
char *nodeCluster = WORKER_DEFAULT_CLUSTER;
|
||||
|
||||
CheckCitusVersion(ERROR);
|
||||
|
||||
|
@ -658,10 +668,13 @@ master_initialize_node_metadata(PG_FUNCTION_ARGS)
|
|||
foreach(workerNodeCell, workerNodes)
|
||||
{
|
||||
WorkerNode *workerNode = (WorkerNode *) lfirst(workerNodeCell);
|
||||
bool nodeAlreadyExists = false;
|
||||
NodeMetadata nodeMetadata = DefaultNodeMetadata();
|
||||
nodeMetadata.nodeRack = workerNode->workerRack;
|
||||
nodeMetadata.isActive = workerNode->isActive;
|
||||
|
||||
AddNodeMetadata(workerNode->workerName, workerNode->workerPort, 0,
|
||||
workerNode->workerRack, false, workerNode->isActive,
|
||||
nodeRole, nodeCluster, &nodeAlreadyExists);
|
||||
AddNodeMetadata(workerNode->workerName, workerNode->workerPort, &nodeMetadata,
|
||||
&nodeAlreadyExists);
|
||||
}
|
||||
|
||||
PG_RETURN_BOOL(true);
|
||||
|
@ -959,8 +972,8 @@ CountPrimariesWithMetadata(void)
|
|||
* pg_dist_node as well as the nodes with hasmetadata=true.
|
||||
*/
|
||||
static int
|
||||
AddNodeMetadata(char *nodeName, int32 nodePort, int32 groupId, char *nodeRack,
|
||||
bool hasMetadata, bool isActive, Oid nodeRole, char *nodeCluster,
|
||||
AddNodeMetadata(char *nodeName, int32 nodePort,
|
||||
NodeMetadata *nodeMetadata,
|
||||
bool *nodeAlreadyExists)
|
||||
{
|
||||
int nextNodeIdInt = 0;
|
||||
|
@ -989,25 +1002,30 @@ AddNodeMetadata(char *nodeName, int32 nodePort, int32 groupId, char *nodeRack,
|
|||
}
|
||||
|
||||
/* user lets Citus to decide on the group that the newly added node should be in */
|
||||
if (groupId == 0)
|
||||
if (nodeMetadata->groupId == 0)
|
||||
{
|
||||
groupId = GetNextGroupId();
|
||||
nodeMetadata->groupId = GetNextGroupId();
|
||||
}
|
||||
|
||||
/* if nodeRole hasn't been added yet there's a constraint for one-node-per-group */
|
||||
if (nodeRole != InvalidOid && nodeRole == PrimaryNodeRoleId())
|
||||
if (nodeMetadata->nodeRole != InvalidOid && nodeMetadata->nodeRole ==
|
||||
PrimaryNodeRoleId())
|
||||
{
|
||||
WorkerNode *existingPrimaryNode = PrimaryNodeForGroup(groupId, NULL);
|
||||
WorkerNode *existingPrimaryNode = PrimaryNodeForGroup(nodeMetadata->groupId,
|
||||
NULL);
|
||||
|
||||
if (existingPrimaryNode != NULL)
|
||||
{
|
||||
ereport(ERROR, (errmsg("group %d already has a primary node", groupId)));
|
||||
ereport(ERROR, (errmsg("group %d already has a primary node",
|
||||
nodeMetadata->groupId)));
|
||||
}
|
||||
}
|
||||
|
||||
if (nodeRole == PrimaryNodeRoleId())
|
||||
if (nodeMetadata->nodeRole == PrimaryNodeRoleId())
|
||||
{
|
||||
if (strncmp(nodeCluster, WORKER_DEFAULT_CLUSTER, WORKER_LENGTH) != 0)
|
||||
if (strncmp(nodeMetadata->nodeCluster,
|
||||
WORKER_DEFAULT_CLUSTER,
|
||||
WORKER_LENGTH) != 0)
|
||||
{
|
||||
ereport(ERROR, (errmsg("primaries must be added to the default cluster")));
|
||||
}
|
||||
|
@ -1016,8 +1034,7 @@ AddNodeMetadata(char *nodeName, int32 nodePort, int32 groupId, char *nodeRack,
|
|||
/* generate the new node id from the sequence */
|
||||
nextNodeIdInt = GetNextNodeId();
|
||||
|
||||
InsertNodeRow(nextNodeIdInt, nodeName, nodePort, groupId, nodeRack, hasMetadata,
|
||||
isActive, nodeRole, nodeCluster);
|
||||
InsertNodeRow(nextNodeIdInt, nodeName, nodePort, nodeMetadata);
|
||||
|
||||
workerNode = FindWorkerNodeAnyCluster(nodeName, nodePort);
|
||||
|
||||
|
@ -1216,8 +1233,7 @@ EnsureCoordinator(void)
|
|||
* an existing group. If you don't it's possible for the metadata to become inconsistent.
|
||||
*/
|
||||
static void
|
||||
InsertNodeRow(int nodeid, char *nodeName, int32 nodePort, int32 groupId, char *nodeRack,
|
||||
bool hasMetadata, bool isActive, Oid nodeRole, char *nodeCluster)
|
||||
InsertNodeRow(int nodeid, char *nodeName, int32 nodePort, NodeMetadata *nodeMetadata)
|
||||
{
|
||||
Relation pgDistNode = NULL;
|
||||
TupleDesc tupleDescriptor = NULL;
|
||||
|
@ -1225,7 +1241,7 @@ InsertNodeRow(int nodeid, char *nodeName, int32 nodePort, int32 groupId, char *n
|
|||
Datum values[Natts_pg_dist_node];
|
||||
bool isNulls[Natts_pg_dist_node];
|
||||
|
||||
Datum nodeClusterStringDatum = CStringGetDatum(nodeCluster);
|
||||
Datum nodeClusterStringDatum = CStringGetDatum(nodeMetadata->nodeCluster);
|
||||
Datum nodeClusterNameDatum = DirectFunctionCall1(namein, nodeClusterStringDatum);
|
||||
|
||||
/* form new shard tuple */
|
||||
|
@ -1233,13 +1249,13 @@ InsertNodeRow(int nodeid, char *nodeName, int32 nodePort, int32 groupId, char *n
|
|||
memset(isNulls, false, sizeof(isNulls));
|
||||
|
||||
values[Anum_pg_dist_node_nodeid - 1] = UInt32GetDatum(nodeid);
|
||||
values[Anum_pg_dist_node_groupid - 1] = Int32GetDatum(groupId);
|
||||
values[Anum_pg_dist_node_groupid - 1] = Int32GetDatum(nodeMetadata->groupId);
|
||||
values[Anum_pg_dist_node_nodename - 1] = CStringGetTextDatum(nodeName);
|
||||
values[Anum_pg_dist_node_nodeport - 1] = UInt32GetDatum(nodePort);
|
||||
values[Anum_pg_dist_node_noderack - 1] = CStringGetTextDatum(nodeRack);
|
||||
values[Anum_pg_dist_node_hasmetadata - 1] = BoolGetDatum(hasMetadata);
|
||||
values[Anum_pg_dist_node_isactive - 1] = BoolGetDatum(isActive);
|
||||
values[Anum_pg_dist_node_noderole - 1] = ObjectIdGetDatum(nodeRole);
|
||||
values[Anum_pg_dist_node_noderack - 1] = CStringGetTextDatum(nodeMetadata->nodeRack);
|
||||
values[Anum_pg_dist_node_hasmetadata - 1] = BoolGetDatum(nodeMetadata->hasMetadata);
|
||||
values[Anum_pg_dist_node_isactive - 1] = BoolGetDatum(nodeMetadata->isActive);
|
||||
values[Anum_pg_dist_node_noderole - 1] = ObjectIdGetDatum(nodeMetadata->nodeRole);
|
||||
values[Anum_pg_dist_node_nodecluster - 1] = nodeClusterNameDatum;
|
||||
|
||||
pgDistNode = heap_open(DistNodeRelationId(), RowExclusiveLock);
|
||||
|
|
Loading…
Reference in New Issue