From 6dbce91e364261abdf2641a628901d06b095380a Mon Sep 17 00:00:00 2001 From: Brian Cloutier Date: Fri, 23 Sep 2016 15:22:44 +0300 Subject: [PATCH] Implement cluster_remove_node sans error handling --- .../distributed/citus--5.2-1--5.2-2.sql | 2 +- .../distributed/utils/metadata_cache.c | 43 +++++++++++++++++++ src/backend/distributed/utils/node_metadata.c | 10 ++++- src/include/distributed/metadata_cache.h | 1 + 4 files changed, 54 insertions(+), 2 deletions(-) diff --git a/src/backend/distributed/citus--5.2-1--5.2-2.sql b/src/backend/distributed/citus--5.2-1--5.2-2.sql index 73facf447..65fcca413 100644 --- a/src/backend/distributed/citus--5.2-1--5.2-2.sql +++ b/src/backend/distributed/citus--5.2-1--5.2-2.sql @@ -45,7 +45,7 @@ COMMENT ON FUNCTION cluster_add_node(nodename text, IS 'add node to the cluster'; CREATE FUNCTION cluster_remove_node(nodename text, nodeport integer) - RETURNS record + RETURNS void LANGUAGE C STRICT AS 'MODULE_PATHNAME', $$cluster_remove_node$$; COMMENT ON FUNCTION cluster_remove_node(nodename text, nodeport integer) diff --git a/src/backend/distributed/utils/metadata_cache.c b/src/backend/distributed/utils/metadata_cache.c index 0b4271fb2..5c65a57cd 100644 --- a/src/backend/distributed/utils/metadata_cache.c +++ b/src/backend/distributed/utils/metadata_cache.c @@ -1455,6 +1455,49 @@ InsertNodeRow(int nodeid, char *nodeName, int32 nodePort, uint32 groupId) } +/* + * DeleteNodeRow removes the requested row if it exists + */ +void +DeleteNodeRow(char *nodeName, int32 nodePort) +{ + const int scanKeyCount = 2; + bool indexOK = false; + + HeapTuple heapTuple = NULL; + SysScanDesc heapScan; + ScanKeyData scanKey[scanKeyCount]; + + Relation pgDistNode = heap_open(DistNodeRelationId(), AccessExclusiveLock); + + ScanKeyInit(&scanKey[0], Anum_pg_dist_node_nodename, + BTEqualStrategyNumber, F_TEXTEQ, CStringGetTextDatum(nodeName)); + ScanKeyInit(&scanKey[1], Anum_pg_dist_node_nodeport, + BTEqualStrategyNumber, F_INT8EQ, Int32GetDatum(nodePort)); + + heapScan = systable_beginscan(pgDistNode, InvalidOid, indexOK, + NULL, scanKeyCount, scanKey); + + heapTuple = systable_getnext(heapScan); + if (!HeapTupleIsValid(heapTuple)) + { + ereport(ERROR, (errmsg("could not find valid entry for node \"%s:%d\"", + nodeName, nodePort))); + } + + simple_heap_delete(pgDistNode, &(heapTuple->t_self)); + + systable_endscan(heapScan); + heap_close(pgDistNode, AccessExclusiveLock); + + /* ensure future commands don't use the node we just removed */ + CitusInvalidateRelcacheByRelid(DistNodeRelationId()); + + /* increment the counter so that next command won't see the row */ + CommandCounterIncrement(); +} + + /* * TupleToWorkerNode takes in a heap tuple from pg_dist_node, and * converts this tuple to an equivalent struct in memory. The function assumes diff --git a/src/backend/distributed/utils/node_metadata.c b/src/backend/distributed/utils/node_metadata.c index 412339522..75f469681 100644 --- a/src/backend/distributed/utils/node_metadata.c +++ b/src/backend/distributed/utils/node_metadata.c @@ -27,7 +27,6 @@ #include "distributed/metadata_cache.h" #include "distributed/pg_dist_node.h" #include "distributed/worker_manager.h" -#include "distributed/worker_transaction.h" #include "lib/stringinfo.h" #include "storage/lock.h" #include "storage/fd.h" @@ -131,6 +130,15 @@ cluster_remove_node(PG_FUNCTION_ARGS) { text *nodeName = PG_GETARG_TEXT_P(0); int32 nodePort = PG_GETARG_INT32(1); + char *nodeNameString = text_to_cstring(nodeName); + + DeleteNodeRow(nodeNameString, nodePort); + + /* + * 1) lookup the node + * 2) ensure there are no existing shard placements for this node + * 3) remove the row + */ PG_RETURN_VOID(); } diff --git a/src/include/distributed/metadata_cache.h b/src/include/distributed/metadata_cache.h index 435c775ae..b19c48b5e 100644 --- a/src/include/distributed/metadata_cache.h +++ b/src/include/distributed/metadata_cache.h @@ -54,6 +54,7 @@ extern bool IsDistributedTable(Oid relationId); extern ShardInterval * LoadShardInterval(uint64 shardId); extern DistTableCacheEntry * DistributedTableCacheEntry(Oid distributedRelationId); extern void InsertNodeRow(int nodeid, char *nodename, int32 nodeport, uint32 groupId); +extern void DeleteNodeRow(char *nodename, int32 nodeport); extern void CitusInvalidateRelcacheByRelid(Oid relationId); extern void CitusInvalidateNodeCache(void);