mirror of https://github.com/citusdata/citus.git
node_metadata takes out more sane locks
- Never release locks.
- AddNodeMetadata takes ShareRowExclusiveLock so it'll conflict with the trigger which prevents multiple primary nodes.
- ActivateNode and SetNodeState used to take AccessShareLock, but they modify the table so they should take RowExclusiveLock.
- DeleteNodeRow and InsertNodeRow used to take AccessExclusiveLock but only need RowExclusiveLock.

pull/1505/merge
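Why this works: SHARE ROW EXCLUSIVE is self-conflicting, so the trigger's LOCK TABLE and AddNodeMetadata's LockRelationOid serialize against each other. A minimal two-session sketch (the group id and port values are illustrative, not from the commit):

-- session 1: simulates AddNodeMetadata taking its relation lock
BEGIN;
LOCK TABLE pg_dist_node IN SHARE ROW EXCLUSIVE MODE;

-- session 2: the trigger's identical LOCK TABLE blocks here until session 1
-- commits, because SHARE ROW EXCLUSIVE conflicts with itself
INSERT INTO pg_dist_node (nodename, nodeport, groupid, noderole)
    VALUES ('localhost', 5000, 14, 'primary');

-- session 1: COMMIT releases the lock; session 2 then runs its trigger and
-- sees any primary row session 1 added, so it can reject a duplicate
COMMIT;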
parent ec99f8f983
commit 88702ca58a
@@ -29,6 +29,7 @@ CREATE OR REPLACE VIEW pg_catalog.pg_dist_shard_placement AS
 CREATE OR REPLACE FUNCTION citus.pg_dist_node_trigger_func()
 RETURNS TRIGGER AS $$
 BEGIN
+	/* AddNodeMetadata also takes out a ShareRowExclusiveLock */
 	LOCK TABLE pg_dist_node IN SHARE ROW EXCLUSIVE MODE;
 	IF (TG_OP = 'INSERT') THEN
 		IF NEW.noderole = 'primary'
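The hunk only shows the top of the trigger; judging from the regression-test output further down (RAISE at lines 10 and 18, one INSERT branch and one UPDATE branch), the uniqueness check plausibly looks like the sketch below. The EXISTS predicates and the exact branch layout are assumptions, not the committed code:

CREATE OR REPLACE FUNCTION citus.pg_dist_node_trigger_func()
RETURNS TRIGGER AS $$
BEGIN
    /* AddNodeMetadata also takes out a ShareRowExclusiveLock */
    LOCK TABLE pg_dist_node IN SHARE ROW EXCLUSIVE MODE;
    IF (TG_OP = 'INSERT') THEN
        IF NEW.noderole = 'primary'
            AND EXISTS (SELECT 1 FROM pg_dist_node
                        WHERE noderole = 'primary' AND groupid = NEW.groupid) THEN
            RAISE EXCEPTION 'there cannot be two primary nodes in a group';
        END IF;
    ELSIF (TG_OP = 'UPDATE') THEN
        IF NEW.noderole = 'primary'
            AND EXISTS (SELECT 1 FROM pg_dist_node
                        WHERE noderole = 'primary' AND groupid = NEW.groupid
                              AND nodeid <> NEW.nodeid) THEN
            RAISE EXCEPTION 'there cannot be two primary nodes in a group';
        END IF;
    END IF;
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;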
@@ -36,6 +36,7 @@
 #include "distributed/worker_transaction.h"
 #include "lib/stringinfo.h"
 #include "storage/bufmgr.h"
+#include "storage/lmgr.h"
 #include "storage/lock.h"
 #include "storage/fd.h"
 #include "utils/builtins.h"
@@ -329,7 +330,7 @@ PrimaryNodeForGroup(uint32 groupId, bool *groupContainsNodes)
 static Datum
 ActivateNode(char *nodeName, int nodePort)
 {
-	Relation pgDistNode = heap_open(DistNodeRelationId(), AccessShareLock);
+	Relation pgDistNode = heap_open(DistNodeRelationId(), RowExclusiveLock);
 	HeapTuple heapTuple = GetNodeTuple(nodeName, nodePort);
 	CommandId commandId = GetCurrentCommandId(true);
 	LockTupleMode lockTupleMode = LockTupleExclusive;
@@ -357,7 +358,7 @@ ActivateNode(char *nodeName, int nodePort)
 
 	nodeRecord = GenerateNodeTuple(workerNode);
 
-	heap_close(pgDistNode, AccessShareLock);
+	heap_close(pgDistNode, NoLock);
 
 	return nodeRecord;
 }
@@ -661,7 +662,6 @@ static Datum
 AddNodeMetadata(char *nodeName, int32 nodePort, int32 groupId, char *nodeRack,
 				bool hasMetadata, bool isActive, Oid nodeRole, bool *nodeAlreadyExists)
 {
-	Relation pgDistNode = NULL;
 	int nextNodeIdInt = 0;
 	Datum returnData = 0;
 	WorkerNode *workerNode = NULL;
@@ -673,22 +673,20 @@ AddNodeMetadata(char *nodeName, int32 nodePort, int32 groupId, char *nodeRack,
 
 	*nodeAlreadyExists = false;
 
-	/* acquire a lock so that no one can do this concurrently */
-	pgDistNode = heap_open(DistNodeRelationId(), AccessExclusiveLock);
+	/*
+	 * Acquire a lock so that no one can do this concurrently. Specifically,
+	 * pg_dist_node_trigger_func also takes out a ShareRowExclusiveLock. This lets us
+	 * ensure there is only one primary per node group.
+	 */
+	LockRelationOid(DistNodeRelationId(), ShareRowExclusiveLock);
 
 	/* check if the node already exists in the cluster */
 	workerNode = FindWorkerNode(nodeName, nodePort);
 	if (workerNode != NULL)
 	{
-		/* fill return data and return */
-		returnData = GenerateNodeTuple(workerNode);
-
-		/* close the heap */
-		heap_close(pgDistNode, AccessExclusiveLock);
-
 		*nodeAlreadyExists = true;
-		PG_RETURN_DATUM(returnData);
+		returnData = GenerateNodeTuple(workerNode);
+		return returnData;
 	}
 
 	/* user lets Citus to decide on the group that the newly added node should be in */
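Because the lock is now taken with LockRelationOid and never explicitly released, it stays visible in pg_locks for the rest of the transaction. A quick way to confirm, assuming master_add_node (the user-facing wrapper around AddNodeMetadata) is called in an open transaction; the node name and port are illustrative:

BEGIN;
SELECT master_add_node('localhost', 12345);

-- still inside the transaction: the relation-level lock is held
SELECT mode, granted
FROM pg_locks
WHERE relation = 'pg_dist_node'::regclass;
-- expect a granted ShareRowExclusiveLock row among the results

ROLLBACK;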
@@ -741,10 +739,7 @@ AddNodeMetadata(char *nodeName, int32 nodePort, int32 groupId, char *nodeRack,
 		SendCommandToWorkers(WORKERS_WITH_METADATA, nodeInsertCommand);
 	}
 
-	heap_close(pgDistNode, NoLock);
-
 	returnData = GenerateNodeTuple(workerNode);
-
 	return returnData;
 }
 
@@ -756,7 +751,7 @@ AddNodeMetadata(char *nodeName, int32 nodePort, int32 groupId, char *nodeRack,
 static void
 SetNodeState(char *nodeName, int32 nodePort, bool isActive)
 {
-	Relation pgDistNode = heap_open(DistNodeRelationId(), AccessShareLock);
+	Relation pgDistNode = heap_open(DistNodeRelationId(), RowExclusiveLock);
 	TupleDesc tupleDescriptor = RelationGetDescr(pgDistNode);
 	HeapTuple heapTuple = GetNodeTuple(nodeName, nodePort);
 
@@ -779,7 +774,7 @@ SetNodeState(char *nodeName, int32 nodePort, bool isActive)
 	CitusInvalidateRelcacheByRelid(DistNodeRelationId());
 	CommandCounterIncrement();
 
-	heap_close(pgDistNode, AccessShareLock);
+	heap_close(pgDistNode, NoLock);
 
 	/* we also update isactive column at worker nodes */
 	workerNode = FindWorkerNode(nodeName, nodePort);
@@ -822,7 +817,7 @@ GetNodeTuple(char *nodeName, int32 nodePort)
 	nodeTuple = heap_copytuple(heapTuple);
 
 	systable_endscan(scanDescriptor);
-	heap_close(pgDistNode, AccessShareLock);
+	heap_close(pgDistNode, NoLock);
 
 	return nodeTuple;
 }
@@ -855,7 +850,7 @@ GenerateNodeTuple(WorkerNode *workerNode)
 	values[Anum_pg_dist_node_isactive - 1] = BoolGetDatum(workerNode->isActive);
 	values[Anum_pg_dist_node_noderole - 1] = ObjectIdGetDatum(workerNode->nodeRole);
 
-	/* open shard relation and insert new tuple */
+	/* open shard relation and generate new tuple */
 	pgDistNode = heap_open(DistNodeRelationId(), AccessShareLock);
 
 	/* generate the tuple */
@@ -864,7 +859,7 @@ GenerateNodeTuple(WorkerNode *workerNode)
 	nodeDatum = HeapTupleGetDatum(heapTuple);
 
 	/* close the relation */
-	heap_close(pgDistNode, AccessShareLock);
+	heap_close(pgDistNode, NoLock);
 
 	return nodeDatum;
 }
@@ -986,8 +981,12 @@ EnsureCoordinator(void)
 
 
 /*
- * InsertNodedRow opens the node system catalog, and inserts a new row with the
+ * InsertNodeRow opens the node system catalog, and inserts a new row with the
  * given values into that system catalog.
+ *
+ * NOTE: If you call this function you probably need to have taken a
+ * ShareRowExclusiveLock then checked that you're not adding a second primary to
+ * an existing group. If you don't it's possible for the metadata to become inconsistent.
  */
 static void
 InsertNodeRow(int nodeid, char *nodeName, int32 nodePort, uint32 groupId, char *nodeRack,
@@ -1013,7 +1012,7 @@ InsertNodeRow(int nodeid, char *nodeName, int32 nodePort, uint32 groupId, char *
 	values[Anum_pg_dist_node_noderole - 1] = ObjectIdGetDatum(nodeRole);
 
 	/* open shard relation and insert new tuple */
-	pgDistNode = heap_open(DistNodeRelationId(), AccessExclusiveLock);
+	pgDistNode = heap_open(DistNodeRelationId(), RowExclusiveLock);
 
 	tupleDescriptor = RelationGetDescr(pgDistNode);
 	heapTuple = heap_form_tuple(tupleDescriptor, values, isNulls);
@@ -1043,7 +1042,7 @@ DeleteNodeRow(char *nodeName, int32 nodePort)
 	SysScanDesc heapScan = NULL;
 	ScanKeyData scanKey[scanKeyCount];
 
-	Relation pgDistNode = heap_open(DistNodeRelationId(), AccessExclusiveLock);
+	Relation pgDistNode = heap_open(DistNodeRelationId(), RowExclusiveLock);
 
 	ScanKeyInit(&scanKey[0], Anum_pg_dist_node_nodename,
 				BTEqualStrategyNumber, F_TEXTEQ, CStringGetTextDatum(nodeName));
@@ -1064,7 +1063,7 @@ DeleteNodeRow(char *nodeName, int32 nodePort)
 	simple_heap_delete(pgDistNode, &(heapTuple->t_self));
 
 	systable_endscan(heapScan);
-	heap_close(pgDistNode, AccessExclusiveLock);
+	heap_close(pgDistNode, NoLock);
 
 	/* ensure future commands don't use the node we just removed */
 	CitusInvalidateRelcacheByRelid(DistNodeRelationId());
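All of the heap_close(..., NoLock) calls above lean on the same PostgreSQL rule: NoLock closes the relation descriptor but keeps the lock, and relation locks are only released at transaction end. A sketch of observing this for DeleteNodeRow's RowExclusiveLock, again with illustrative node values:

BEGIN;
SELECT master_remove_node('localhost', 12345);

-- pg_dist_node was closed with NoLock, so the RowExclusiveLock is still
-- held here and only goes away at COMMIT/ROLLBACK
SELECT mode, granted
FROM pg_locks
WHERE relation = 'pg_dist_node'::regclass;

COMMIT;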
@@ -462,10 +462,10 @@ SELECT master_add_inactive_node('localhost', 9996, groupid => :worker_2_group, n
 INSERT INTO pg_dist_node (nodename, nodeport, groupid, noderole)
   VALUES ('localhost', 5000, :worker_1_group, 'primary');
 ERROR: there cannot be two primary nodes in a group
-CONTEXT: PL/pgSQL function citus.pg_dist_node_trigger_func() line 9 at RAISE
+CONTEXT: PL/pgSQL function citus.pg_dist_node_trigger_func() line 10 at RAISE
 UPDATE pg_dist_node SET noderole = 'primary'
   WHERE groupid = :worker_1_group AND nodeport = 9998;
 ERROR: there cannot be two primary nodes in a group
-CONTEXT: PL/pgSQL function citus.pg_dist_node_trigger_func() line 17 at RAISE
+CONTEXT: PL/pgSQL function citus.pg_dist_node_trigger_func() line 18 at RAISE
 -- don't remove the secondary and unavailable nodes, check that no commands are sent to
 -- them in any of the remaining tests