mirror of https://github.com/citusdata/citus.git
GetNodeTuple returns NULL it node does not exist
It never throws an error.pull/1519/head
parent
a3e9bef685
commit
7060ade6fe
|
@ -60,7 +60,7 @@ static Datum AddNodeMetadata(char *nodeName, int32 nodePort, int32 groupId,
|
||||||
Oid nodeRole, char *nodeCluster, bool *nodeAlreadyExists);
|
Oid nodeRole, char *nodeCluster, bool *nodeAlreadyExists);
|
||||||
static uint32 CountPrimariesWithMetadata();
|
static uint32 CountPrimariesWithMetadata();
|
||||||
static void SetNodeState(char *nodeName, int32 nodePort, bool isActive);
|
static void SetNodeState(char *nodeName, int32 nodePort, bool isActive);
|
||||||
static HeapTuple GetNodeTuple(char *nodeName, int32 nodePort, bool raiseError);
|
static HeapTuple GetNodeTuple(char *nodeName, int32 nodePort);
|
||||||
static Datum GenerateNodeTuple(WorkerNode *workerNode);
|
static Datum GenerateNodeTuple(WorkerNode *workerNode);
|
||||||
static int32 GetNextGroupId(void);
|
static int32 GetNextGroupId(void);
|
||||||
static uint32 GetMaxGroupId(void);
|
static uint32 GetMaxGroupId(void);
|
||||||
|
@ -352,7 +352,7 @@ static Datum
|
||||||
ActivateNode(char *nodeName, int nodePort)
|
ActivateNode(char *nodeName, int nodePort)
|
||||||
{
|
{
|
||||||
Relation pgDistNode = heap_open(DistNodeRelationId(), RowExclusiveLock);
|
Relation pgDistNode = heap_open(DistNodeRelationId(), RowExclusiveLock);
|
||||||
HeapTuple heapTuple = GetNodeTuple(nodeName, nodePort, true);
|
HeapTuple heapTuple = GetNodeTuple(nodeName, nodePort);
|
||||||
CommandId commandId = GetCurrentCommandId(true);
|
CommandId commandId = GetCurrentCommandId(true);
|
||||||
LockTupleMode lockTupleMode = LockTupleExclusive;
|
LockTupleMode lockTupleMode = LockTupleExclusive;
|
||||||
LockWaitPolicy lockWaitPolicy = LockWaitError;
|
LockWaitPolicy lockWaitPolicy = LockWaitError;
|
||||||
|
@ -364,6 +364,12 @@ ActivateNode(char *nodeName, int nodePort)
|
||||||
bool isActive = true;
|
bool isActive = true;
|
||||||
Datum nodeRecord = 0;
|
Datum nodeRecord = 0;
|
||||||
|
|
||||||
|
if (heapTuple == NULL)
|
||||||
|
{
|
||||||
|
ereport(ERROR, (errmsg("could not find valid entry for node \"%s:%d\"",
|
||||||
|
nodeName, nodePort)));
|
||||||
|
}
|
||||||
|
|
||||||
heap_lock_tuple(pgDistNode, heapTuple, commandId, lockTupleMode, lockWaitPolicy,
|
heap_lock_tuple(pgDistNode, heapTuple, commandId, lockTupleMode, lockWaitPolicy,
|
||||||
followUpdates, &buffer, &heapUpdateFailureData);
|
followUpdates, &buffer, &heapUpdateFailureData);
|
||||||
ReleaseBuffer(buffer);
|
ReleaseBuffer(buffer);
|
||||||
|
@ -546,7 +552,7 @@ FindWorkerNodeAnyCluster(char *nodeName, int32 nodePort)
|
||||||
Relation pgDistNode = heap_open(DistNodeRelationId(), AccessShareLock);
|
Relation pgDistNode = heap_open(DistNodeRelationId(), AccessShareLock);
|
||||||
TupleDesc tupleDescriptor = RelationGetDescr(pgDistNode);
|
TupleDesc tupleDescriptor = RelationGetDescr(pgDistNode);
|
||||||
|
|
||||||
HeapTuple heapTuple = GetNodeTuple(nodeName, nodePort, false);
|
HeapTuple heapTuple = GetNodeTuple(nodeName, nodePort);
|
||||||
if (heapTuple != NULL)
|
if (heapTuple != NULL)
|
||||||
{
|
{
|
||||||
workerNode = TupleToWorkerNode(tupleDescriptor, heapTuple);
|
workerNode = TupleToWorkerNode(tupleDescriptor, heapTuple);
|
||||||
|
@ -811,7 +817,7 @@ SetNodeState(char *nodeName, int32 nodePort, bool isActive)
|
||||||
{
|
{
|
||||||
Relation pgDistNode = heap_open(DistNodeRelationId(), RowExclusiveLock);
|
Relation pgDistNode = heap_open(DistNodeRelationId(), RowExclusiveLock);
|
||||||
TupleDesc tupleDescriptor = RelationGetDescr(pgDistNode);
|
TupleDesc tupleDescriptor = RelationGetDescr(pgDistNode);
|
||||||
HeapTuple heapTuple = GetNodeTuple(nodeName, nodePort, true);
|
HeapTuple heapTuple = GetNodeTuple(nodeName, nodePort);
|
||||||
|
|
||||||
Datum values[Natts_pg_dist_node];
|
Datum values[Natts_pg_dist_node];
|
||||||
bool isnull[Natts_pg_dist_node];
|
bool isnull[Natts_pg_dist_node];
|
||||||
|
@ -820,6 +826,12 @@ SetNodeState(char *nodeName, int32 nodePort, bool isActive)
|
||||||
char *nodeStateUpdateCommand = NULL;
|
char *nodeStateUpdateCommand = NULL;
|
||||||
WorkerNode *workerNode = NULL;
|
WorkerNode *workerNode = NULL;
|
||||||
|
|
||||||
|
if (heapTuple == NULL)
|
||||||
|
{
|
||||||
|
ereport(ERROR, (errmsg("could not find valid entry for node \"%s:%d\"",
|
||||||
|
nodeName, nodePort)));
|
||||||
|
}
|
||||||
|
|
||||||
memset(replace, 0, sizeof(replace));
|
memset(replace, 0, sizeof(replace));
|
||||||
values[Anum_pg_dist_node_isactive - 1] = BoolGetDatum(isActive);
|
values[Anum_pg_dist_node_isactive - 1] = BoolGetDatum(isActive);
|
||||||
isnull[Anum_pg_dist_node_isactive - 1] = false;
|
isnull[Anum_pg_dist_node_isactive - 1] = false;
|
||||||
|
@ -843,16 +855,13 @@ SetNodeState(char *nodeName, int32 nodePort, bool isActive)
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* GetNodeTuple function returns heap tuple of given nodeName and nodePort.
|
* GetNodeTuple function returns the heap tuple of given nodeName and nodePort. If the
|
||||||
*
|
* node is not found this function returns NULL.
|
||||||
* If there are no node tuples with specified nodeName and nodePort and raiseError is
|
|
||||||
* true, this function errors out. If the node is not found and raiseError is false this
|
|
||||||
* function returns NULL.
|
|
||||||
*
|
*
|
||||||
* This function may return worker nodes from other clusters.
|
* This function may return worker nodes from other clusters.
|
||||||
*/
|
*/
|
||||||
static HeapTuple
|
static HeapTuple
|
||||||
GetNodeTuple(char *nodeName, int32 nodePort, bool raiseError)
|
GetNodeTuple(char *nodeName, int32 nodePort)
|
||||||
{
|
{
|
||||||
Relation pgDistNode = heap_open(DistNodeRelationId(), AccessShareLock);
|
Relation pgDistNode = heap_open(DistNodeRelationId(), AccessShareLock);
|
||||||
const int scanKeyCount = 2;
|
const int scanKeyCount = 2;
|
||||||
|
@ -871,21 +880,10 @@ GetNodeTuple(char *nodeName, int32 nodePort, bool raiseError)
|
||||||
NULL, scanKeyCount, scanKey);
|
NULL, scanKeyCount, scanKey);
|
||||||
|
|
||||||
heapTuple = systable_getnext(scanDescriptor);
|
heapTuple = systable_getnext(scanDescriptor);
|
||||||
if (!HeapTupleIsValid(heapTuple))
|
if (HeapTupleIsValid(heapTuple))
|
||||||
{
|
{
|
||||||
if (raiseError)
|
|
||||||
{
|
|
||||||
ereport(ERROR, (errmsg("could not find valid entry for node \"%s:%d\"",
|
|
||||||
nodeName, nodePort)));
|
|
||||||
}
|
|
||||||
|
|
||||||
systable_endscan(scanDescriptor);
|
|
||||||
heap_close(pgDistNode, NoLock);
|
|
||||||
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
nodeTuple = heap_copytuple(heapTuple);
|
nodeTuple = heap_copytuple(heapTuple);
|
||||||
|
}
|
||||||
|
|
||||||
systable_endscan(scanDescriptor);
|
systable_endscan(scanDescriptor);
|
||||||
heap_close(pgDistNode, NoLock);
|
heap_close(pgDistNode, NoLock);
|
||||||
|
|
Loading…
Reference in New Issue