mirror of https://github.com/citusdata/citus.git
Implement cluster_remove_node sans error handling
parent
098e37e082
commit
6dbce91e36
|
@ -45,7 +45,7 @@ COMMENT ON FUNCTION cluster_add_node(nodename text,
|
||||||
IS 'add node to the cluster';
|
IS 'add node to the cluster';
|
||||||
|
|
||||||
CREATE FUNCTION cluster_remove_node(nodename text, nodeport integer)
|
CREATE FUNCTION cluster_remove_node(nodename text, nodeport integer)
|
||||||
RETURNS record
|
RETURNS void
|
||||||
LANGUAGE C STRICT
|
LANGUAGE C STRICT
|
||||||
AS 'MODULE_PATHNAME', $$cluster_remove_node$$;
|
AS 'MODULE_PATHNAME', $$cluster_remove_node$$;
|
||||||
COMMENT ON FUNCTION cluster_remove_node(nodename text, nodeport integer)
|
COMMENT ON FUNCTION cluster_remove_node(nodename text, nodeport integer)
|
||||||
|
|
|
@ -1455,6 +1455,49 @@ InsertNodeRow(int nodeid, char *nodeName, int32 nodePort, uint32 groupId)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* DeleteNodeRow removes the requested row if it exists
|
||||||
|
*/
|
||||||
|
void
|
||||||
|
DeleteNodeRow(char *nodeName, int32 nodePort)
|
||||||
|
{
|
||||||
|
const int scanKeyCount = 2;
|
||||||
|
bool indexOK = false;
|
||||||
|
|
||||||
|
HeapTuple heapTuple = NULL;
|
||||||
|
SysScanDesc heapScan;
|
||||||
|
ScanKeyData scanKey[scanKeyCount];
|
||||||
|
|
||||||
|
Relation pgDistNode = heap_open(DistNodeRelationId(), AccessExclusiveLock);
|
||||||
|
|
||||||
|
ScanKeyInit(&scanKey[0], Anum_pg_dist_node_nodename,
|
||||||
|
BTEqualStrategyNumber, F_TEXTEQ, CStringGetTextDatum(nodeName));
|
||||||
|
ScanKeyInit(&scanKey[1], Anum_pg_dist_node_nodeport,
|
||||||
|
BTEqualStrategyNumber, F_INT8EQ, Int32GetDatum(nodePort));
|
||||||
|
|
||||||
|
heapScan = systable_beginscan(pgDistNode, InvalidOid, indexOK,
|
||||||
|
NULL, scanKeyCount, scanKey);
|
||||||
|
|
||||||
|
heapTuple = systable_getnext(heapScan);
|
||||||
|
if (!HeapTupleIsValid(heapTuple))
|
||||||
|
{
|
||||||
|
ereport(ERROR, (errmsg("could not find valid entry for node \"%s:%d\"",
|
||||||
|
nodeName, nodePort)));
|
||||||
|
}
|
||||||
|
|
||||||
|
simple_heap_delete(pgDistNode, &(heapTuple->t_self));
|
||||||
|
|
||||||
|
systable_endscan(heapScan);
|
||||||
|
heap_close(pgDistNode, AccessExclusiveLock);
|
||||||
|
|
||||||
|
/* ensure future commands don't use the node we just removed */
|
||||||
|
CitusInvalidateRelcacheByRelid(DistNodeRelationId());
|
||||||
|
|
||||||
|
/* increment the counter so that next command won't see the row */
|
||||||
|
CommandCounterIncrement();
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* TupleToWorkerNode takes in a heap tuple from pg_dist_node, and
|
* TupleToWorkerNode takes in a heap tuple from pg_dist_node, and
|
||||||
* converts this tuple to an equivalent struct in memory. The function assumes
|
* converts this tuple to an equivalent struct in memory. The function assumes
|
||||||
|
|
|
@ -27,7 +27,6 @@
|
||||||
#include "distributed/metadata_cache.h"
|
#include "distributed/metadata_cache.h"
|
||||||
#include "distributed/pg_dist_node.h"
|
#include "distributed/pg_dist_node.h"
|
||||||
#include "distributed/worker_manager.h"
|
#include "distributed/worker_manager.h"
|
||||||
#include "distributed/worker_transaction.h"
|
|
||||||
#include "lib/stringinfo.h"
|
#include "lib/stringinfo.h"
|
||||||
#include "storage/lock.h"
|
#include "storage/lock.h"
|
||||||
#include "storage/fd.h"
|
#include "storage/fd.h"
|
||||||
|
@ -131,6 +130,15 @@ cluster_remove_node(PG_FUNCTION_ARGS)
|
||||||
{
|
{
|
||||||
text *nodeName = PG_GETARG_TEXT_P(0);
|
text *nodeName = PG_GETARG_TEXT_P(0);
|
||||||
int32 nodePort = PG_GETARG_INT32(1);
|
int32 nodePort = PG_GETARG_INT32(1);
|
||||||
|
char *nodeNameString = text_to_cstring(nodeName);
|
||||||
|
|
||||||
|
DeleteNodeRow(nodeNameString, nodePort);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* 1) lookup the node
|
||||||
|
* 2) ensure there are no existing shard placements for this node
|
||||||
|
* 3) remove the row
|
||||||
|
*/
|
||||||
|
|
||||||
PG_RETURN_VOID();
|
PG_RETURN_VOID();
|
||||||
}
|
}
|
||||||
|
|
|
@ -54,6 +54,7 @@ extern bool IsDistributedTable(Oid relationId);
|
||||||
extern ShardInterval * LoadShardInterval(uint64 shardId);
|
extern ShardInterval * LoadShardInterval(uint64 shardId);
|
||||||
extern DistTableCacheEntry * DistributedTableCacheEntry(Oid distributedRelationId);
|
extern DistTableCacheEntry * DistributedTableCacheEntry(Oid distributedRelationId);
|
||||||
extern void InsertNodeRow(int nodeid, char *nodename, int32 nodeport, uint32 groupId);
|
extern void InsertNodeRow(int nodeid, char *nodename, int32 nodeport, uint32 groupId);
|
||||||
|
extern void DeleteNodeRow(char *nodename, int32 nodeport);
|
||||||
extern void CitusInvalidateRelcacheByRelid(Oid relationId);
|
extern void CitusInvalidateRelcacheByRelid(Oid relationId);
|
||||||
extern void CitusInvalidateNodeCache(void);
|
extern void CitusInvalidateNodeCache(void);
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue