From 4a6348acccc0ac2867bfede254d83c6369221209 Mon Sep 17 00:00:00 2001 From: Brian Cloutier Date: Thu, 22 Sep 2016 18:02:16 +0300 Subject: [PATCH] Citus builds --- .../distributed/master/master_repair_shards.c | 1 - .../distributed/master/worker_node_manager.c | 36 +++------- .../distributed/utils/metadata_cache.c | 67 +------------------ src/backend/distributed/utils/node_metadata.c | 34 ---------- src/include/distributed/worker_manager.h | 4 +- 5 files changed, 11 insertions(+), 131 deletions(-) diff --git a/src/backend/distributed/master/master_repair_shards.c b/src/backend/distributed/master/master_repair_shards.c index fd5876881..f7e94d51d 100644 --- a/src/backend/distributed/master/master_repair_shards.c +++ b/src/backend/distributed/master/master_repair_shards.c @@ -122,7 +122,6 @@ master_copy_shard_placement(PG_FUNCTION_ARGS) } targetNode = palloc0(sizeof(WorkerNode)); - targetNode->workerActive = true; strlcpy(targetNode->workerName, targetPlacement->nodeName, WORKER_LENGTH); targetNode->workerPort = targetPlacement->nodePort; diff --git a/src/backend/distributed/master/worker_node_manager.c b/src/backend/distributed/master/worker_node_manager.c index 890117759..e0b4e38c6 100644 --- a/src/backend/distributed/master/worker_node_manager.c +++ b/src/backend/distributed/master/worker_node_manager.c @@ -237,15 +237,12 @@ WorkerGetNodeWithName(const char *hostname) while ((workerNode = hash_seq_search(&status)) != NULL) { - if (workerNode->workerActive) + int nameCompare = strncmp(workerNode->workerName, hostname, WORKER_LENGTH); + if (nameCompare == 0) { - int nameCompare = strncmp(workerNode->workerName, hostname, WORKER_LENGTH); - if (nameCompare == 0) - { - /* we need to terminate the scan since we break */ - hash_seq_term(&status); - break; - } + /* we need to terminate the scan since we break */ + hash_seq_term(&status); + break; } } @@ -266,10 +263,7 @@ WorkerGetLiveNodeCount(void) while ((workerNode = hash_seq_search(&status)) != NULL) { - if (workerNode->workerActive) - { - liveWorkerCount++; - } + liveWorkerCount++; } return liveWorkerCount; @@ -292,10 +286,7 @@ WorkerNodeList(void) while ((workerNode = hash_seq_search(&status)) != NULL) { - if (workerNode->workerActive) - { - workerNodeList = lappend(workerNodeList, workerNode); - } + workerNodeList = lappend(workerNodeList, workerNode); } return workerNodeList; @@ -310,7 +301,6 @@ WorkerNodeList(void) bool WorkerNodeActive(const char *nodeName, uint32 nodePort) { - bool workerNodeActive = false; WorkerNode *workerNode = NULL; HASH_SEQ_STATUS status; HTAB *workerNodeHash = GetWorkerNodeHash(); @@ -328,15 +318,7 @@ WorkerNodeActive(const char *nodeName, uint32 nodePort) } } - if (workerNode != NULL) - { - if (workerNode->workerActive) - { - workerNodeActive = true; - } - } - - return workerNodeActive; + return workerNode != NULL; } @@ -385,7 +367,7 @@ FindRandomNodeNotInList(HTAB *WorkerNodesHash, List *currentNodeList) { bool listMember = ListMember(currentNodeList, workerNode); - if (workerNode->workerActive && !listMember) + if (!listMember) { lookForWorkerNode = false; } diff --git a/src/backend/distributed/utils/metadata_cache.c b/src/backend/distributed/utils/metadata_cache.c index 0e997bc0e..0b4271fb2 100644 --- a/src/backend/distributed/utils/metadata_cache.c +++ b/src/backend/distributed/utils/metadata_cache.c @@ -1029,8 +1029,6 @@ InitializeWorkerNodeCache(void) /* fill the newly allocated workerNode in the cache */ strlcpy(workerNode->workerName, currentNode->workerName, WORKER_LENGTH); workerNode->workerPort = currentNode->workerPort; - workerNode->workerActive = currentNode->workerActive; - workerNode->workerRole = currentNode->workerRole; workerNode->groupId = currentNode->groupId; if (handleFound) @@ -1421,8 +1419,7 @@ ReadWorkerNodes() * given values into that system catalog. */ void -InsertNodeRow(int nodeid, char *nodeName, int32 nodePort, char nodeRole, - bool nodeActive, uint32 groupId) +InsertNodeRow(int nodeid, char *nodeName, int32 nodePort, uint32 groupId) { Relation pgDistNode = NULL; TupleDesc tupleDescriptor = NULL; @@ -1435,8 +1432,6 @@ InsertNodeRow(int nodeid, char *nodeName, int32 nodePort, char nodeRole, memset(isNulls, false, sizeof(isNulls)); values[Anum_pg_dist_node_nodeid - 1] = UInt32GetDatum(nodeid); - values[Anum_pg_dist_node_noderole - 1] = CharGetDatum(nodeRole); - values[Anum_pg_dist_node_nodeactive - 1] = BoolGetDatum(nodeActive); values[Anum_pg_dist_node_groupid - 1] = UInt32GetDatum(groupId); values[Anum_pg_dist_node_nodename - 1] = CStringGetTextDatum(nodeName); values[Anum_pg_dist_node_nodeport - 1] = UInt32GetDatum(nodePort); @@ -1460,60 +1455,6 @@ InsertNodeRow(int nodeid, char *nodeName, int32 nodePort, char nodeRole, } -/* - * UpdateNodeActiveColumn updates the nodeactive column of the given worker node - * on the pg_dist_node. The function also invalidates the pg_dist_node's cache so - * that subsequent accesses to the table reads the updated values. - */ -void -UpdateNodeActiveColumn(WorkerNode *workerNode, bool nodeActive) -{ - Relation pgDistNode = NULL; - HeapTuple heapTuple = NULL; - HeapTuple modifiableHeaptuple = NULL; - SysScanDesc scanDescriptor = NULL; - Form_pg_dist_node nodeForm = NULL; - ScanKeyData scanKey[1]; - int scanKeyCount = 1; - uint32 nodeId = workerNode->nodeId; - - pgDistNode = heap_open(DistNodeRelationId(), AccessExclusiveLock); - - ScanKeyInit(&scanKey[0], Anum_pg_dist_node_nodeid, - BTEqualStrategyNumber, F_INT4EQ, UInt32GetDatum(nodeId)); - - scanDescriptor = systable_beginscan(pgDistNode, InvalidOid, false, NULL, - scanKeyCount, scanKey); - - heapTuple = systable_getnext(scanDescriptor); - if (!HeapTupleIsValid(heapTuple)) - { - ereport(ERROR, (errmsg("could not find valid entry for node %d", nodeId))); - } - - /* create a copy of the tuple */ - modifiableHeaptuple = heap_copytuple(heapTuple); - - nodeForm = (Form_pg_dist_node) GETSTRUCT(modifiableHeaptuple); - - /* now update the active column */ - nodeForm->nodeactive = nodeActive; - - simple_heap_update(pgDistNode, &heapTuple->t_self, modifiableHeaptuple); - - systable_endscan(scanDescriptor); - - heap_close(pgDistNode, AccessExclusiveLock); - - /* invalidate the cache */ - CitusInvalidateRelcacheByRelid(DistNodeRelationId()); - - /* increment the counter so that next command can see the row */ - CommandCounterIncrement(); -} - - - /* * TupleToWorkerNode takes in a heap tuple from pg_dist_node, and * converts this tuple to an equivalent struct in memory. The function assumes @@ -1527,10 +1468,6 @@ TupleToWorkerNode(TupleDesc tupleDescriptor, HeapTuple heapTuple) Datum nodeId = heap_getattr(heapTuple, Anum_pg_dist_node_nodeid, tupleDescriptor, &isNull); - Datum nodeRole = heap_getattr(heapTuple, Anum_pg_dist_node_noderole, - tupleDescriptor, &isNull); - Datum nodeActive = heap_getattr(heapTuple, Anum_pg_dist_node_nodeactive, - tupleDescriptor, &isNull); Datum groupId = heap_getattr(heapTuple, Anum_pg_dist_node_groupid, tupleDescriptor, &isNull); Datum nodeName = heap_getattr(heapTuple, Anum_pg_dist_node_nodename, @@ -1543,9 +1480,7 @@ TupleToWorkerNode(TupleDesc tupleDescriptor, HeapTuple heapTuple) workerNode = (WorkerNode *) palloc0(sizeof(WorkerNode)); workerNode->nodeId = DatumGetUInt32(nodeId); workerNode->workerPort = DatumGetUInt32(nodePort); - workerNode->workerRole = DatumGetChar(nodeRole); workerNode->groupId = DatumGetUInt32(groupId); - workerNode->workerActive = DatumGetBool(nodeActive); strlcpy(workerNode->workerName, TextDatumGetCString(nodeName), WORKER_LENGTH); return workerNode; diff --git a/src/backend/distributed/utils/node_metadata.c b/src/backend/distributed/utils/node_metadata.c index eec0f6a0d..f2e5e295e 100644 --- a/src/backend/distributed/utils/node_metadata.c +++ b/src/backend/distributed/utils/node_metadata.c @@ -47,8 +47,6 @@ static WorkerNode * FindWorkerNode(char *nodeName, int32 nodePort); static uint32 NextGroupId(void); static uint32 GetMaxGroupId(void); static uint64 GetNodeCountInGroup(uint32 groupId); -static char * InsertNodeCommand(uint32 nodeid, char *nodename, int nodeport, - uint32 groupId); static List * ParseWorkerNodeFile(const char *workerNodeFilename); /* declarations for dynamic loading */ @@ -65,9 +63,6 @@ PG_FUNCTION_INFO_V1(master_get_next_groupid); * If the groupId is not explicitly given by the user, the function picks the * group that the new node should be in with respect to GroupSize. Then, the * new node is inserted into the local pg_dist_node. - * - * TODO: The following will be added in the near future. - * Lastly, the new node is inserted to all other nodes' pg_dist_node table. */ Datum cluster_add_node(PG_FUNCTION_ARGS) @@ -80,7 +75,6 @@ cluster_add_node(PG_FUNCTION_ARGS) Relation pgDistNode = NULL; Datum nextNodeId = 0; int nextNodeIdInt = 0; - char *insertCommand = NULL; Datum returnData = 0; WorkerNode *workerNode = NULL; @@ -121,11 +115,6 @@ cluster_add_node(PG_FUNCTION_ARGS) InsertNodeRow(nextNodeIdInt, nodeNameString, nodePort, groupId); - insertCommand = InsertNodeCommand(nextNodeIdInt, nodeNameString, nodePort, groupId); - - /* TODO: enable this once we have fully metadata sync */ - /* SendCommandToWorkersInParallel(insertCommand); */ - heap_close(pgDistNode, AccessExclusiveLock); /* fetch the worker node, and generate the output */ @@ -312,29 +301,6 @@ GetNodeCountInGroup(uint32 groupId) } -/* - * DistributionCreateCommands generates a commands that can be - * executed to replicate the metadata for a distributed table. - */ -static char * -InsertNodeCommand(uint32 nodeid, char *nodename, int nodeport, uint32 groupId) -{ - StringInfo insertNodeCommand = makeStringInfo(); - - appendStringInfo(insertNodeCommand, - "INSERT INTO pg_dist_node " /*TODO: add a ON CONFLICT clause */ - "(nodeid, nodename, nodeport, groupid) " - "VALUES " - "(%d, '%s', %d, '%c', %s , %d);", - nodeid, - nodename, - nodeport, - groupId); - - return insertNodeCommand->data; -} - - /* * ParseWorkerNodeFile opens and parses the node name and node port from the * specified configuration file. diff --git a/src/include/distributed/worker_manager.h b/src/include/distributed/worker_manager.h index 171567f3c..079433022 100644 --- a/src/include/distributed/worker_manager.h +++ b/src/include/distributed/worker_manager.h @@ -26,7 +26,7 @@ /* * In memory representation of pg_dist_node table elements. The elements are hold in - * WorkerNodeHash table. workerActive field is used to determine a worker node's liveliness. + * WorkerNodeHash table. */ typedef struct WorkerNode { @@ -34,8 +34,6 @@ typedef struct WorkerNode uint32 workerPort; /* node's port */ char workerName[WORKER_LENGTH]; /* node's name */ uint32 groupId; /* node's groupId; same for the nodes that are in the same group */ - - bool workerActive; /* should Citus utilize the node? */ } WorkerNode;