Citus builds

pull/801/head
Brian Cloutier 2016-09-22 18:02:16 +03:00
parent d1c44d5436
commit 4a6348accc
5 changed files with 11 additions and 131 deletions

View File

@ -122,7 +122,6 @@ master_copy_shard_placement(PG_FUNCTION_ARGS)
}
targetNode = palloc0(sizeof(WorkerNode));
targetNode->workerActive = true;
strlcpy(targetNode->workerName, targetPlacement->nodeName, WORKER_LENGTH);
targetNode->workerPort = targetPlacement->nodePort;

View File

@ -237,15 +237,12 @@ WorkerGetNodeWithName(const char *hostname)
while ((workerNode = hash_seq_search(&status)) != NULL)
{
if (workerNode->workerActive)
int nameCompare = strncmp(workerNode->workerName, hostname, WORKER_LENGTH);
if (nameCompare == 0)
{
int nameCompare = strncmp(workerNode->workerName, hostname, WORKER_LENGTH);
if (nameCompare == 0)
{
/* we need to terminate the scan since we break */
hash_seq_term(&status);
break;
}
/* we need to terminate the scan since we break */
hash_seq_term(&status);
break;
}
}
@ -266,10 +263,7 @@ WorkerGetLiveNodeCount(void)
while ((workerNode = hash_seq_search(&status)) != NULL)
{
if (workerNode->workerActive)
{
liveWorkerCount++;
}
liveWorkerCount++;
}
return liveWorkerCount;
@ -292,10 +286,7 @@ WorkerNodeList(void)
while ((workerNode = hash_seq_search(&status)) != NULL)
{
if (workerNode->workerActive)
{
workerNodeList = lappend(workerNodeList, workerNode);
}
workerNodeList = lappend(workerNodeList, workerNode);
}
return workerNodeList;
@ -310,7 +301,6 @@ WorkerNodeList(void)
bool
WorkerNodeActive(const char *nodeName, uint32 nodePort)
{
bool workerNodeActive = false;
WorkerNode *workerNode = NULL;
HASH_SEQ_STATUS status;
HTAB *workerNodeHash = GetWorkerNodeHash();
@ -328,15 +318,7 @@ WorkerNodeActive(const char *nodeName, uint32 nodePort)
}
}
if (workerNode != NULL)
{
if (workerNode->workerActive)
{
workerNodeActive = true;
}
}
return workerNodeActive;
return workerNode != NULL;
}
@ -385,7 +367,7 @@ FindRandomNodeNotInList(HTAB *WorkerNodesHash, List *currentNodeList)
{
bool listMember = ListMember(currentNodeList, workerNode);
if (workerNode->workerActive && !listMember)
if (!listMember)
{
lookForWorkerNode = false;
}

View File

@ -1029,8 +1029,6 @@ InitializeWorkerNodeCache(void)
/* fill the newly allocated workerNode in the cache */
strlcpy(workerNode->workerName, currentNode->workerName, WORKER_LENGTH);
workerNode->workerPort = currentNode->workerPort;
workerNode->workerActive = currentNode->workerActive;
workerNode->workerRole = currentNode->workerRole;
workerNode->groupId = currentNode->groupId;
if (handleFound)
@ -1421,8 +1419,7 @@ ReadWorkerNodes()
* given values into that system catalog.
*/
void
InsertNodeRow(int nodeid, char *nodeName, int32 nodePort, char nodeRole,
bool nodeActive, uint32 groupId)
InsertNodeRow(int nodeid, char *nodeName, int32 nodePort, uint32 groupId)
{
Relation pgDistNode = NULL;
TupleDesc tupleDescriptor = NULL;
@ -1435,8 +1432,6 @@ InsertNodeRow(int nodeid, char *nodeName, int32 nodePort, char nodeRole,
memset(isNulls, false, sizeof(isNulls));
values[Anum_pg_dist_node_nodeid - 1] = UInt32GetDatum(nodeid);
values[Anum_pg_dist_node_noderole - 1] = CharGetDatum(nodeRole);
values[Anum_pg_dist_node_nodeactive - 1] = BoolGetDatum(nodeActive);
values[Anum_pg_dist_node_groupid - 1] = UInt32GetDatum(groupId);
values[Anum_pg_dist_node_nodename - 1] = CStringGetTextDatum(nodeName);
values[Anum_pg_dist_node_nodeport - 1] = UInt32GetDatum(nodePort);
@ -1460,60 +1455,6 @@ InsertNodeRow(int nodeid, char *nodeName, int32 nodePort, char nodeRole,
}
/*
* UpdateNodeActiveColumn updates the nodeactive column of the given worker node
* on the pg_dist_node. The function also invalidates the pg_dist_node's cache so
* that subsequent accesses to the table reads the updated values.
*/
void
UpdateNodeActiveColumn(WorkerNode *workerNode, bool nodeActive)
{
Relation pgDistNode = NULL;
HeapTuple heapTuple = NULL;
HeapTuple modifiableHeaptuple = NULL;
SysScanDesc scanDescriptor = NULL;
Form_pg_dist_node nodeForm = NULL;
ScanKeyData scanKey[1];
int scanKeyCount = 1;
uint32 nodeId = workerNode->nodeId;
pgDistNode = heap_open(DistNodeRelationId(), AccessExclusiveLock);
ScanKeyInit(&scanKey[0], Anum_pg_dist_node_nodeid,
BTEqualStrategyNumber, F_INT4EQ, UInt32GetDatum(nodeId));
scanDescriptor = systable_beginscan(pgDistNode, InvalidOid, false, NULL,
scanKeyCount, scanKey);
heapTuple = systable_getnext(scanDescriptor);
if (!HeapTupleIsValid(heapTuple))
{
ereport(ERROR, (errmsg("could not find valid entry for node %d", nodeId)));
}
/* create a copy of the tuple */
modifiableHeaptuple = heap_copytuple(heapTuple);
nodeForm = (Form_pg_dist_node) GETSTRUCT(modifiableHeaptuple);
/* now update the active column */
nodeForm->nodeactive = nodeActive;
simple_heap_update(pgDistNode, &heapTuple->t_self, modifiableHeaptuple);
systable_endscan(scanDescriptor);
heap_close(pgDistNode, AccessExclusiveLock);
/* invalidate the cache */
CitusInvalidateRelcacheByRelid(DistNodeRelationId());
/* increment the counter so that next command can see the row */
CommandCounterIncrement();
}
/*
* TupleToWorkerNode takes in a heap tuple from pg_dist_node, and
* converts this tuple to an equivalent struct in memory. The function assumes
@ -1527,10 +1468,6 @@ TupleToWorkerNode(TupleDesc tupleDescriptor, HeapTuple heapTuple)
Datum nodeId = heap_getattr(heapTuple, Anum_pg_dist_node_nodeid,
tupleDescriptor, &isNull);
Datum nodeRole = heap_getattr(heapTuple, Anum_pg_dist_node_noderole,
tupleDescriptor, &isNull);
Datum nodeActive = heap_getattr(heapTuple, Anum_pg_dist_node_nodeactive,
tupleDescriptor, &isNull);
Datum groupId = heap_getattr(heapTuple, Anum_pg_dist_node_groupid,
tupleDescriptor, &isNull);
Datum nodeName = heap_getattr(heapTuple, Anum_pg_dist_node_nodename,
@ -1543,9 +1480,7 @@ TupleToWorkerNode(TupleDesc tupleDescriptor, HeapTuple heapTuple)
workerNode = (WorkerNode *) palloc0(sizeof(WorkerNode));
workerNode->nodeId = DatumGetUInt32(nodeId);
workerNode->workerPort = DatumGetUInt32(nodePort);
workerNode->workerRole = DatumGetChar(nodeRole);
workerNode->groupId = DatumGetUInt32(groupId);
workerNode->workerActive = DatumGetBool(nodeActive);
strlcpy(workerNode->workerName, TextDatumGetCString(nodeName), WORKER_LENGTH);
return workerNode;

View File

@ -47,8 +47,6 @@ static WorkerNode * FindWorkerNode(char *nodeName, int32 nodePort);
static uint32 NextGroupId(void);
static uint32 GetMaxGroupId(void);
static uint64 GetNodeCountInGroup(uint32 groupId);
static char * InsertNodeCommand(uint32 nodeid, char *nodename, int nodeport,
uint32 groupId);
static List * ParseWorkerNodeFile(const char *workerNodeFilename);
/* declarations for dynamic loading */
@ -65,9 +63,6 @@ PG_FUNCTION_INFO_V1(master_get_next_groupid);
* If the groupId is not explicitly given by the user, the function picks the
* group that the new node should be in with respect to GroupSize. Then, the
* new node is inserted into the local pg_dist_node.
*
* TODO: The following will be added in the near future.
* Lastly, the new node is inserted to all other nodes' pg_dist_node table.
*/
Datum
cluster_add_node(PG_FUNCTION_ARGS)
@ -80,7 +75,6 @@ cluster_add_node(PG_FUNCTION_ARGS)
Relation pgDistNode = NULL;
Datum nextNodeId = 0;
int nextNodeIdInt = 0;
char *insertCommand = NULL;
Datum returnData = 0;
WorkerNode *workerNode = NULL;
@ -121,11 +115,6 @@ cluster_add_node(PG_FUNCTION_ARGS)
InsertNodeRow(nextNodeIdInt, nodeNameString, nodePort, groupId);
insertCommand = InsertNodeCommand(nextNodeIdInt, nodeNameString, nodePort, groupId);
/* TODO: enable this once we have fully metadata sync */
/* SendCommandToWorkersInParallel(insertCommand); */
heap_close(pgDistNode, AccessExclusiveLock);
/* fetch the worker node, and generate the output */
@ -312,29 +301,6 @@ GetNodeCountInGroup(uint32 groupId)
}
/*
* DistributionCreateCommands generates a commands that can be
* executed to replicate the metadata for a distributed table.
*/
static char *
InsertNodeCommand(uint32 nodeid, char *nodename, int nodeport, uint32 groupId)
{
StringInfo insertNodeCommand = makeStringInfo();
appendStringInfo(insertNodeCommand,
"INSERT INTO pg_dist_node " /*TODO: add a ON CONFLICT clause */
"(nodeid, nodename, nodeport, groupid) "
"VALUES "
"(%d, '%s', %d, '%c', %s , %d);",
nodeid,
nodename,
nodeport,
groupId);
return insertNodeCommand->data;
}
/*
* ParseWorkerNodeFile opens and parses the node name and node port from the
* specified configuration file.

View File

@ -26,7 +26,7 @@
/*
* In memory representation of pg_dist_node table elements. The elements are hold in
* WorkerNodeHash table. workerActive field is used to determine a worker node's liveliness.
* WorkerNodeHash table.
*/
typedef struct WorkerNode
{
@ -34,8 +34,6 @@ typedef struct WorkerNode
uint32 workerPort; /* node's port */
char workerName[WORKER_LENGTH]; /* node's name */
uint32 groupId; /* node's groupId; same for the nodes that are in the same group */
bool workerActive; /* should Citus utilize the node? */
} WorkerNode;