mirror of https://github.com/citusdata/citus.git
Fail cluster_remove_node if node has any shard placements
parent
6dbce91e36
commit
4ef57cb67a
|
@ -199,6 +199,43 @@ ShardLength(uint64 shardId)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* NodeHasShardPlacements returns whether any shards are placed on this node
|
||||||
|
*/
|
||||||
|
bool
|
||||||
|
NodeHasShardPlacements(char *nodeName, int32 nodePort)
|
||||||
|
{
|
||||||
|
const int scanKeyCount = 2;
|
||||||
|
const bool indexOK = true;
|
||||||
|
|
||||||
|
bool hasPlacements = false;
|
||||||
|
|
||||||
|
HeapTuple heapTuple = NULL;
|
||||||
|
SysScanDesc scanDescriptor = NULL;
|
||||||
|
ScanKeyData scanKey[scanKeyCount];
|
||||||
|
|
||||||
|
Relation pgShardPlacement = heap_open(DistShardPlacementRelationId(),
|
||||||
|
AccessShareLock);
|
||||||
|
|
||||||
|
ScanKeyInit(&scanKey[0], Anum_pg_dist_shard_placement_nodename,
|
||||||
|
BTEqualStrategyNumber, F_TEXTEQ, CStringGetTextDatum(nodeName));
|
||||||
|
ScanKeyInit(&scanKey[1], Anum_pg_dist_shard_placement_nodeport,
|
||||||
|
BTEqualStrategyNumber, F_INT4EQ, Int32GetDatum(nodePort));
|
||||||
|
|
||||||
|
scanDescriptor = systable_beginscan(pgShardPlacement,
|
||||||
|
DistShardPlacementNodeidIndexId(), indexOK,
|
||||||
|
NULL, scanKeyCount, scanKey);
|
||||||
|
|
||||||
|
heapTuple = systable_getnext(scanDescriptor);
|
||||||
|
hasPlacements = HeapTupleIsValid(heapTuple);
|
||||||
|
|
||||||
|
systable_endscan(scanDescriptor);
|
||||||
|
heap_close(pgShardPlacement, AccessShareLock);
|
||||||
|
|
||||||
|
return hasPlacements;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* FinalizedShardPlacementList finds shard placements for the given shardId from
|
* FinalizedShardPlacementList finds shard placements for the given shardId from
|
||||||
* system catalogs, chooses placements that are in finalized state, and returns
|
* system catalogs, chooses placements that are in finalized state, and returns
|
||||||
|
|
|
@ -56,6 +56,7 @@ static Oid distPartitionLogicalRelidIndexId = InvalidOid;
|
||||||
static Oid distShardLogicalRelidIndexId = InvalidOid;
|
static Oid distShardLogicalRelidIndexId = InvalidOid;
|
||||||
static Oid distShardShardidIndexId = InvalidOid;
|
static Oid distShardShardidIndexId = InvalidOid;
|
||||||
static Oid distShardPlacementShardidIndexId = InvalidOid;
|
static Oid distShardPlacementShardidIndexId = InvalidOid;
|
||||||
|
static Oid distShardPlacementNodeidIndexId = InvalidOid;
|
||||||
static Oid extraDataContainerFuncId = InvalidOid;
|
static Oid extraDataContainerFuncId = InvalidOid;
|
||||||
|
|
||||||
/* Hash table for informations about each partition */
|
/* Hash table for informations about each partition */
|
||||||
|
@ -667,6 +668,17 @@ DistShardPlacementShardidIndexId(void)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/* return oid of pg_dist_shard_placement_nodeid_index */
|
||||||
|
Oid
|
||||||
|
DistShardPlacementNodeidIndexId(void)
|
||||||
|
{
|
||||||
|
CachedRelationLookup("pg_dist_shard_placement_nodeid_index",
|
||||||
|
&distShardPlacementNodeidIndexId);
|
||||||
|
|
||||||
|
return distShardPlacementNodeidIndexId;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/* return oid of the citus_extradata_container(internal) function */
|
/* return oid of the citus_extradata_container(internal) function */
|
||||||
Oid
|
Oid
|
||||||
CitusExtraDataContainerFuncId(void)
|
CitusExtraDataContainerFuncId(void)
|
||||||
|
|
|
@ -132,13 +132,13 @@ cluster_remove_node(PG_FUNCTION_ARGS)
|
||||||
int32 nodePort = PG_GETARG_INT32(1);
|
int32 nodePort = PG_GETARG_INT32(1);
|
||||||
char *nodeNameString = text_to_cstring(nodeName);
|
char *nodeNameString = text_to_cstring(nodeName);
|
||||||
|
|
||||||
DeleteNodeRow(nodeNameString, nodePort);
|
bool hasShardPlacements = NodeHasShardPlacements(nodeNameString, nodePort);
|
||||||
|
if (hasShardPlacements)
|
||||||
|
{
|
||||||
|
ereport(ERROR, (errmsg("you cannot remove a node which has shard placements")));
|
||||||
|
}
|
||||||
|
|
||||||
/*
|
DeleteNodeRow(nodeNameString, nodePort);
|
||||||
* 1) lookup the node
|
|
||||||
* 2) ensure there are no existing shard placements for this node
|
|
||||||
* 3) remove the row
|
|
||||||
*/
|
|
||||||
|
|
||||||
PG_RETURN_VOID();
|
PG_RETURN_VOID();
|
||||||
}
|
}
|
||||||
|
|
|
@ -62,6 +62,7 @@ extern int ShardIntervalCount(Oid relationId);
|
||||||
extern List * LoadShardList(Oid relationId);
|
extern List * LoadShardList(Oid relationId);
|
||||||
extern void CopyShardInterval(ShardInterval *srcInterval, ShardInterval *destInterval);
|
extern void CopyShardInterval(ShardInterval *srcInterval, ShardInterval *destInterval);
|
||||||
extern uint64 ShardLength(uint64 shardId);
|
extern uint64 ShardLength(uint64 shardId);
|
||||||
|
extern bool NodeHasShardPlacements(char *nodeName, int32 nodePort);
|
||||||
extern List * FinalizedShardPlacementList(uint64 shardId);
|
extern List * FinalizedShardPlacementList(uint64 shardId);
|
||||||
extern List * ShardPlacementList(uint64 shardId);
|
extern List * ShardPlacementList(uint64 shardId);
|
||||||
extern ShardPlacement * TupleToShardPlacement(TupleDesc tupleDesc,
|
extern ShardPlacement * TupleToShardPlacement(TupleDesc tupleDesc,
|
||||||
|
|
|
@ -74,6 +74,7 @@ extern Oid DistPartitionLogicalRelidIndexId(void);
|
||||||
extern Oid DistShardLogicalRelidIndexId(void);
|
extern Oid DistShardLogicalRelidIndexId(void);
|
||||||
extern Oid DistShardShardidIndexId(void);
|
extern Oid DistShardShardidIndexId(void);
|
||||||
extern Oid DistShardPlacementShardidIndexId(void);
|
extern Oid DistShardPlacementShardidIndexId(void);
|
||||||
|
extern Oid DistShardPlacementNodeidIndexId(void);
|
||||||
|
|
||||||
/* function oids */
|
/* function oids */
|
||||||
extern Oid CitusExtraDataContainerFuncId(void);
|
extern Oid CitusExtraDataContainerFuncId(void);
|
||||||
|
|
Loading…
Reference in New Issue