Fix and simplify pg_dist_node locking

pull/1407/head
Marco Slot 2017-05-10 12:30:37 +02:00
parent 01d8926228
commit 868ee6be83
5 changed files with 655 additions and 33 deletions

View File

@ -219,6 +219,9 @@ master_disable_node(PG_FUNCTION_ARGS)
CheckCitusVersion(ERROR); CheckCitusVersion(ERROR);
/* take an exclusive lock on pg_dist_node to serialize pg_dist_node changes */
LockRelationOid(DistNodeRelationId(), ExclusiveLock);
workerNode = FindWorkerNodeAnyCluster(nodeName, nodePort); workerNode = FindWorkerNodeAnyCluster(nodeName, nodePort);
if (workerNode == NULL) if (workerNode == NULL)
{ {
@ -351,28 +354,12 @@ PrimaryNodeForGroup(uint32 groupId, bool *groupContainsNodes)
static Datum static Datum
ActivateNode(char *nodeName, int nodePort) ActivateNode(char *nodeName, int nodePort)
{ {
Relation pgDistNode = heap_open(DistNodeRelationId(), RowExclusiveLock);
HeapTuple heapTuple = GetNodeTuple(nodeName, nodePort);
CommandId commandId = GetCurrentCommandId(true);
LockTupleMode lockTupleMode = LockTupleExclusive;
LockWaitPolicy lockWaitPolicy = LockWaitError;
bool followUpdates = false;
Buffer buffer = 0;
HeapUpdateFailureData heapUpdateFailureData;
WorkerNode *workerNode = NULL; WorkerNode *workerNode = NULL;
bool isActive = true; bool isActive = true;
Datum nodeRecord = 0; Datum nodeRecord = 0;
if (heapTuple == NULL) /* take an exclusive lock on pg_dist_node to serialize pg_dist_node changes */
{ LockRelationOid(DistNodeRelationId(), ExclusiveLock);
ereport(ERROR, (errmsg("could not find valid entry for node \"%s:%d\"",
nodeName, nodePort)));
}
heap_lock_tuple(pgDistNode, heapTuple, commandId, lockTupleMode, lockWaitPolicy,
followUpdates, &buffer, &heapUpdateFailureData);
ReleaseBuffer(buffer);
SetNodeState(nodeName, nodePort, isActive); SetNodeState(nodeName, nodePort, isActive);
@ -385,8 +372,6 @@ ActivateNode(char *nodeName, int nodePort)
nodeRecord = GenerateNodeTuple(workerNode); nodeRecord = GenerateNodeTuple(workerNode);
heap_close(pgDistNode, NoLock);
return nodeRecord; return nodeRecord;
} }
@ -400,7 +385,7 @@ Datum
master_initialize_node_metadata(PG_FUNCTION_ARGS) master_initialize_node_metadata(PG_FUNCTION_ARGS)
{ {
ListCell *workerNodeCell = NULL; ListCell *workerNodeCell = NULL;
List *workerNodes = NULL; List *workerNodes = NIL;
bool nodeAlreadyExists = false; bool nodeAlreadyExists = false;
/* nodeRole and nodeCluster don't exist when this function is called */ /* nodeRole and nodeCluster don't exist when this function is called */
@ -409,7 +394,15 @@ master_initialize_node_metadata(PG_FUNCTION_ARGS)
CheckCitusVersion(ERROR); CheckCitusVersion(ERROR);
/*
* This function should only ever be called from the create extension
* script, but just to be sure, take an exclusive lock on pg_dist_node
* to prevent concurrent calls.
*/
LockRelationOid(DistNodeRelationId(), ExclusiveLock);
workerNodes = ParseWorkerNodeFileAndRename(); workerNodes = ParseWorkerNodeFileAndRename();
foreach(workerNodeCell, workerNodes) foreach(workerNodeCell, workerNodes)
{ {
WorkerNode *workerNode = (WorkerNode *) lfirst(workerNodeCell); WorkerNode *workerNode = (WorkerNode *) lfirst(workerNodeCell);
@ -629,6 +622,9 @@ RemoveNodeFromCluster(char *nodeName, int32 nodePort)
EnsureCoordinator(); EnsureCoordinator();
EnsureSuperUser(); EnsureSuperUser();
/* take an exclusive lock on pg_dist_node to serialize pg_dist_node changes */
LockRelationOid(DistNodeRelationId(), ExclusiveLock);
workerNode = FindWorkerNodeAnyCluster(nodeName, nodePort); workerNode = FindWorkerNodeAnyCluster(nodeName, nodePort);
if (workerNode == NULL) if (workerNode == NULL)
{ {
@ -734,17 +730,19 @@ AddNodeMetadata(char *nodeName, int32 nodePort, int32 groupId, char *nodeRack,
*nodeAlreadyExists = false; *nodeAlreadyExists = false;
/* /*
* Acquire a lock so that no one can do this concurrently. Specifically, * Take an exclusive lock on pg_dist_node to serialize node changes.
* pg_dist_node_trigger_func also takes out a ShareRowExclusiveLock. This lets us * We may want to relax or have more fine-grained locking in the future
* ensure there is only one primary per node group. * to allow users to add multiple nodes concurrently.
*/ */
LockRelationOid(DistNodeRelationId(), ShareRowExclusiveLock); LockRelationOid(DistNodeRelationId(), ExclusiveLock);
workerNode = FindWorkerNodeAnyCluster(nodeName, nodePort); workerNode = FindWorkerNodeAnyCluster(nodeName, nodePort);
if (workerNode != NULL) if (workerNode != NULL)
{ {
*nodeAlreadyExists = true; /* fill return data and return */
returnData = GenerateNodeTuple(workerNode); returnData = GenerateNodeTuple(workerNode);
*nodeAlreadyExists = true;
return returnData; return returnData;
} }
@ -923,7 +921,6 @@ GenerateNodeTuple(WorkerNode *workerNode)
values[Anum_pg_dist_node_noderole - 1] = ObjectIdGetDatum(workerNode->nodeRole); values[Anum_pg_dist_node_noderole - 1] = ObjectIdGetDatum(workerNode->nodeRole);
values[Anum_pg_dist_node_nodecluster - 1] = nodeClusterNameDatum; values[Anum_pg_dist_node_nodecluster - 1] = nodeClusterNameDatum;
/* open shard relation and generate new tuple */
pgDistNode = heap_open(DistNodeRelationId(), AccessShareLock); pgDistNode = heap_open(DistNodeRelationId(), AccessShareLock);
/* generate the tuple */ /* generate the tuple */
@ -931,7 +928,6 @@ GenerateNodeTuple(WorkerNode *workerNode)
heapTuple = heap_form_tuple(tupleDescriptor, values, isNulls); heapTuple = heap_form_tuple(tupleDescriptor, values, isNulls);
nodeDatum = HeapTupleGetDatum(heapTuple); nodeDatum = HeapTupleGetDatum(heapTuple);
/* close the relation */
heap_close(pgDistNode, NoLock); heap_close(pgDistNode, NoLock);
return nodeDatum; return nodeDatum;
@ -1088,7 +1084,6 @@ InsertNodeRow(int nodeid, char *nodeName, int32 nodePort, uint32 groupId, char *
values[Anum_pg_dist_node_noderole - 1] = ObjectIdGetDatum(nodeRole); values[Anum_pg_dist_node_noderole - 1] = ObjectIdGetDatum(nodeRole);
values[Anum_pg_dist_node_nodecluster - 1] = nodeClusterNameDatum; values[Anum_pg_dist_node_nodecluster - 1] = nodeClusterNameDatum;
/* open shard relation and insert new tuple */
pgDistNode = heap_open(DistNodeRelationId(), RowExclusiveLock); pgDistNode = heap_open(DistNodeRelationId(), RowExclusiveLock);
tupleDescriptor = RelationGetDescr(pgDistNode); tupleDescriptor = RelationGetDescr(pgDistNode);
@ -1096,13 +1091,13 @@ InsertNodeRow(int nodeid, char *nodeName, int32 nodePort, uint32 groupId, char *
CatalogTupleInsert(pgDistNode, heapTuple); CatalogTupleInsert(pgDistNode, heapTuple);
/* close relation and invalidate previous cache entry */
heap_close(pgDistNode, NoLock);
CitusInvalidateRelcacheByRelid(DistNodeRelationId()); CitusInvalidateRelcacheByRelid(DistNodeRelationId());
/* increment the counter so that next command can see the row */ /* increment the counter so that next command can see the row */
CommandCounterIncrement(); CommandCounterIncrement();
/* close relation */
heap_close(pgDistNode, NoLock);
} }
@ -1140,13 +1135,14 @@ DeleteNodeRow(char *nodeName, int32 nodePort)
simple_heap_delete(pgDistNode, &(heapTuple->t_self)); simple_heap_delete(pgDistNode, &(heapTuple->t_self));
systable_endscan(heapScan); systable_endscan(heapScan);
heap_close(pgDistNode, NoLock);
/* ensure future commands don't use the node we just removed */ /* ensure future commands don't use the node we just removed */
CitusInvalidateRelcacheByRelid(DistNodeRelationId()); CitusInvalidateRelcacheByRelid(DistNodeRelationId());
/* increment the counter so that next command won't see the row */ /* increment the counter so that next command won't see the row */
CommandCounterIncrement(); CommandCounterIncrement();
heap_close(pgDistNode, NoLock);
} }

View File

@ -230,6 +230,9 @@ ReplicateShardToAllWorkers(ShardInterval *shardInterval)
List *workerNodeList = NULL; List *workerNodeList = NULL;
ListCell *workerNodeCell = NULL; ListCell *workerNodeCell = NULL;
/* prevent concurrent pg_dist_node changes */
LockRelationOid(DistNodeRelationId(), RowShareLock);
workerNodeList = ActivePrimaryNodeList(); workerNodeList = ActivePrimaryNodeList();
/* /*

View File

@ -0,0 +1,488 @@
Parsed test spec with 2 sessions
starting permutation: s1-begin s1-add-node-1 s2-remove-node-1 s1-commit s1-show-nodes
?column?
1
step s1-begin:
BEGIN;
step s1-add-node-1:
SELECT 1 FROM master_add_node('localhost', 57637);
?column?
1
step s2-remove-node-1:
SELECT * FROM master_remove_node('localhost', 57637);
<waiting ...>
step s1-commit:
COMMIT;
step s2-remove-node-1: <... completed>
master_remove_node
step s1-show-nodes:
SELECT nodename, nodeport, isactive FROM pg_dist_node ORDER BY nodename, nodeport;
nodename nodeport isactive
master_remove_node
starting permutation: s1-begin s1-add-node-1 s2-add-node-1 s1-commit s1-show-nodes
?column?
1
step s1-begin:
BEGIN;
step s1-add-node-1:
SELECT 1 FROM master_add_node('localhost', 57637);
?column?
1
step s2-add-node-1:
SELECT 1 FROM master_add_node('localhost', 57637);
<waiting ...>
step s1-commit:
COMMIT;
step s2-add-node-1: <... completed>
?column?
1
step s1-show-nodes:
SELECT nodename, nodeport, isactive FROM pg_dist_node ORDER BY nodename, nodeport;
nodename nodeport isactive
localhost 57637 t
master_remove_node
starting permutation: s1-begin s1-add-node-1 s2-add-node-1 s1-commit s1-show-nodes
?column?
1
step s1-begin:
BEGIN;
step s1-add-node-1:
SELECT 1 FROM master_add_node('localhost', 57637);
?column?
1
step s2-add-node-1:
SELECT 1 FROM master_add_node('localhost', 57637);
<waiting ...>
step s1-commit:
COMMIT;
step s2-add-node-1: <... completed>
?column?
1
step s1-show-nodes:
SELECT nodename, nodeport, isactive FROM pg_dist_node ORDER BY nodename, nodeport;
nodename nodeport isactive
localhost 57637 t
master_remove_node
starting permutation: s1-add-node-1 s1-add-node-2 s1-begin s1-remove-node-1 s2-remove-node-2 s1-commit s1-show-nodes
?column?
1
step s1-add-node-1:
SELECT 1 FROM master_add_node('localhost', 57637);
?column?
1
step s1-add-node-2:
SELECT 1 FROM master_add_node('localhost', 57638);
?column?
1
step s1-begin:
BEGIN;
step s1-remove-node-1:
SELECT * FROM master_remove_node('localhost', 57637);
master_remove_node
step s2-remove-node-2:
SELECT * FROM master_remove_node('localhost', 57638);
<waiting ...>
step s1-commit:
COMMIT;
step s2-remove-node-2: <... completed>
master_remove_node
step s1-show-nodes:
SELECT nodename, nodeport, isactive FROM pg_dist_node ORDER BY nodename, nodeport;
nodename nodeport isactive
master_remove_node
starting permutation: s1-add-node-1 s1-begin s1-remove-node-1 s2-remove-node-1 s1-commit s1-show-nodes
?column?
1
step s1-add-node-1:
SELECT 1 FROM master_add_node('localhost', 57637);
?column?
1
step s1-begin:
BEGIN;
step s1-remove-node-1:
SELECT * FROM master_remove_node('localhost', 57637);
master_remove_node
step s2-remove-node-1:
SELECT * FROM master_remove_node('localhost', 57637);
<waiting ...>
step s1-commit:
COMMIT;
step s2-remove-node-1: <... completed>
error in steps s1-commit s2-remove-node-1: ERROR: node at "localhost:57637" does not exist
step s1-show-nodes:
SELECT nodename, nodeport, isactive FROM pg_dist_node ORDER BY nodename, nodeport;
nodename nodeport isactive
master_remove_node
starting permutation: s1-add-node-1 s1-begin s1-activate-node-1 s2-activate-node-1 s1-commit s1-show-nodes
?column?
1
step s1-add-node-1:
SELECT 1 FROM master_add_node('localhost', 57637);
?column?
1
step s1-begin:
BEGIN;
step s1-activate-node-1:
SELECT 1 FROM master_activate_node('localhost', 57637);
?column?
1
step s2-activate-node-1:
SELECT 1 FROM master_activate_node('localhost', 57637);
<waiting ...>
step s1-commit:
COMMIT;
step s2-activate-node-1: <... completed>
?column?
1
step s1-show-nodes:
SELECT nodename, nodeport, isactive FROM pg_dist_node ORDER BY nodename, nodeport;
nodename nodeport isactive
localhost 57637 t
master_remove_node
starting permutation: s1-add-node-1 s1-begin s1-disable-node-1 s2-disable-node-1 s1-commit s1-show-nodes
?column?
1
step s1-add-node-1:
SELECT 1 FROM master_add_node('localhost', 57637);
?column?
1
step s1-begin:
BEGIN;
step s1-disable-node-1:
SELECT 1 FROM master_disable_node('localhost', 57637);
?column?
1
step s2-disable-node-1:
SELECT 1 FROM master_disable_node('localhost', 57637);
<waiting ...>
step s1-commit:
COMMIT;
step s2-disable-node-1: <... completed>
?column?
1
step s1-show-nodes:
SELECT nodename, nodeport, isactive FROM pg_dist_node ORDER BY nodename, nodeport;
nodename nodeport isactive
localhost 57637 f
master_remove_node
starting permutation: s1-add-inactive-1 s1-begin s1-activate-node-1 s2-activate-node-1 s1-commit s1-show-nodes
?column?
1
step s1-add-inactive-1:
SELECT 1 FROM master_add_inactive_node('localhost', 57637);
?column?
1
step s1-begin:
BEGIN;
step s1-activate-node-1:
SELECT 1 FROM master_activate_node('localhost', 57637);
?column?
1
step s2-activate-node-1:
SELECT 1 FROM master_activate_node('localhost', 57637);
<waiting ...>
step s1-commit:
COMMIT;
step s2-activate-node-1: <... completed>
?column?
1
step s1-show-nodes:
SELECT nodename, nodeport, isactive FROM pg_dist_node ORDER BY nodename, nodeport;
nodename nodeport isactive
localhost 57637 t
master_remove_node
starting permutation: s1-add-inactive-1 s1-begin s1-disable-node-1 s2-disable-node-1 s1-commit s1-show-nodes
?column?
1
step s1-add-inactive-1:
SELECT 1 FROM master_add_inactive_node('localhost', 57637);
?column?
1
step s1-begin:
BEGIN;
step s1-disable-node-1:
SELECT 1 FROM master_disable_node('localhost', 57637);
?column?
1
step s2-disable-node-1:
SELECT 1 FROM master_disable_node('localhost', 57637);
<waiting ...>
step s1-commit:
COMMIT;
step s2-disable-node-1: <... completed>
?column?
1
step s1-show-nodes:
SELECT nodename, nodeport, isactive FROM pg_dist_node ORDER BY nodename, nodeport;
nodename nodeport isactive
localhost 57637 f
master_remove_node
starting permutation: s1-add-node-1 s1-begin s1-disable-node-1 s2-activate-node-1 s1-commit s1-show-nodes
?column?
1
step s1-add-node-1:
SELECT 1 FROM master_add_node('localhost', 57637);
?column?
1
step s1-begin:
BEGIN;
step s1-disable-node-1:
SELECT 1 FROM master_disable_node('localhost', 57637);
?column?
1
step s2-activate-node-1:
SELECT 1 FROM master_activate_node('localhost', 57637);
<waiting ...>
step s1-commit:
COMMIT;
step s2-activate-node-1: <... completed>
?column?
1
step s1-show-nodes:
SELECT nodename, nodeport, isactive FROM pg_dist_node ORDER BY nodename, nodeport;
nodename nodeport isactive
localhost 57637 t
master_remove_node
starting permutation: s1-add-node-1 s1-begin s1-activate-node-1 s2-disable-node-1 s1-commit s1-show-nodes
?column?
1
step s1-add-node-1:
SELECT 1 FROM master_add_node('localhost', 57637);
?column?
1
step s1-begin:
BEGIN;
step s1-activate-node-1:
SELECT 1 FROM master_activate_node('localhost', 57637);
?column?
1
step s2-disable-node-1:
SELECT 1 FROM master_disable_node('localhost', 57637);
<waiting ...>
step s1-commit:
COMMIT;
step s2-disable-node-1: <... completed>
?column?
1
step s1-show-nodes:
SELECT nodename, nodeport, isactive FROM pg_dist_node ORDER BY nodename, nodeport;
nodename nodeport isactive
localhost 57637 f
master_remove_node
starting permutation: s1-add-inactive-1 s1-begin s1-disable-node-1 s2-activate-node-1 s1-commit s1-show-nodes
?column?
1
step s1-add-inactive-1:
SELECT 1 FROM master_add_inactive_node('localhost', 57637);
?column?
1
step s1-begin:
BEGIN;
step s1-disable-node-1:
SELECT 1 FROM master_disable_node('localhost', 57637);
?column?
1
step s2-activate-node-1:
SELECT 1 FROM master_activate_node('localhost', 57637);
<waiting ...>
step s1-commit:
COMMIT;
step s2-activate-node-1: <... completed>
?column?
1
step s1-show-nodes:
SELECT nodename, nodeport, isactive FROM pg_dist_node ORDER BY nodename, nodeport;
nodename nodeport isactive
localhost 57637 t
master_remove_node
starting permutation: s1-add-inactive-1 s1-begin s1-activate-node-1 s2-disable-node-1 s1-commit s1-show-nodes
?column?
1
step s1-add-inactive-1:
SELECT 1 FROM master_add_inactive_node('localhost', 57637);
?column?
1
step s1-begin:
BEGIN;
step s1-activate-node-1:
SELECT 1 FROM master_activate_node('localhost', 57637);
?column?
1
step s2-disable-node-1:
SELECT 1 FROM master_disable_node('localhost', 57637);
<waiting ...>
step s1-commit:
COMMIT;
step s2-disable-node-1: <... completed>
?column?
1
step s1-show-nodes:
SELECT nodename, nodeport, isactive FROM pg_dist_node ORDER BY nodename, nodeport;
nodename nodeport isactive
localhost 57637 f
master_remove_node

View File

@ -1,3 +1,4 @@
test: isolation_add_remove_node
test: isolation_add_node_vs_reference_table_operations test: isolation_add_node_vs_reference_table_operations
# tests that change node metadata should precede # tests that change node metadata should precede

View File

@ -0,0 +1,134 @@
# Setup: no fixtures are created for these permutations; the block only runs a
# trivial statement. Nodes are added by the individual steps instead.
setup
{
SELECT 1;
}
# Teardown: call master_remove_node for every row still present in pg_dist_node,
# so nodes added by one permutation do not remain for the next one.
teardown
{
SELECT master_remove_node(nodename, nodeport) FROM pg_dist_node;
}
# Session s1: the session that acts first in every permutation. In the step
# names, "node-1" is localhost:57637 and "node-2" is localhost:57638.
session "s1"
# Open an explicit transaction so that locks taken by later steps are held
# until s1-commit.
step "s1-begin"
{
BEGIN;
}
# Add localhost:57637 via master_add_node.
step "s1-add-node-1"
{
SELECT 1 FROM master_add_node('localhost', 57637);
}
# Add localhost:57638 via master_add_node.
step "s1-add-node-2"
{
SELECT 1 FROM master_add_node('localhost', 57638);
}
# Add localhost:57637 via master_add_inactive_node (isactive starts out false).
step "s1-add-inactive-1"
{
SELECT 1 FROM master_add_inactive_node('localhost', 57637);
}
# Mark localhost:57637 as active.
step "s1-activate-node-1"
{
SELECT 1 FROM master_activate_node('localhost', 57637);
}
# Mark localhost:57637 as inactive.
step "s1-disable-node-1"
{
SELECT 1 FROM master_disable_node('localhost', 57637);
}
# Remove localhost:57637 from pg_dist_node.
step "s1-remove-node-1"
{
SELECT * FROM master_remove_node('localhost', 57637);
}
# Remove localhost:57638 from pg_dist_node.
step "s1-remove-node-2"
{
SELECT * FROM master_remove_node('localhost', 57638);
}
# Commit the transaction opened by s1-begin; this is what unblocks a waiting s2 step.
step "s1-commit"
{
COMMIT;
}
# Print the final contents of pg_dist_node in a deterministic order.
step "s1-show-nodes"
{
SELECT nodename, nodeport, isactive FROM pg_dist_node ORDER BY nodename, nodeport;
}
# Session s2: the concurrent session. Its steps mirror session s1's, except that
# it has no add-inactive or show-nodes step. None of the permutations below use
# s2-begin or s2-commit, so every s2 step that is exercised runs as its own
# implicit transaction.
session "s2"
# Open an explicit transaction (not used by any permutation below).
step "s2-begin"
{
BEGIN;
}
# Add localhost:57637 via master_add_node.
step "s2-add-node-1"
{
SELECT 1 FROM master_add_node('localhost', 57637);
}
# Add localhost:57638 via master_add_node (not used by any permutation below).
step "s2-add-node-2"
{
SELECT 1 FROM master_add_node('localhost', 57638);
}
# Mark localhost:57637 as active.
step "s2-activate-node-1"
{
SELECT 1 FROM master_activate_node('localhost', 57637);
}
# Mark localhost:57637 as inactive.
step "s2-disable-node-1"
{
SELECT 1 FROM master_disable_node('localhost', 57637);
}
# Remove localhost:57637 from pg_dist_node.
step "s2-remove-node-1"
{
SELECT * FROM master_remove_node('localhost', 57637);
}
# Remove localhost:57638 from pg_dist_node.
step "s2-remove-node-2"
{
SELECT * FROM master_remove_node('localhost', 57638);
}
# Commit the transaction opened by s2-begin (not used by any permutation below).
step "s2-commit"
{
COMMIT;
}
# session 1 adds a node, session 2 removes it, should be ok
permutation "s1-begin" "s1-add-node-1" "s2-remove-node-1" "s1-commit" "s1-show-nodes"
# NOTE(review): meant to add a different node from each session, but both steps
# add node 1, making this identical to the next permutation; "s2-add-node-2" was
# probably intended (the expected output would need regenerating with it)
permutation "s1-begin" "s1-add-node-1" "s2-add-node-1" "s1-commit" "s1-show-nodes"
# add the same node from 2 sessions, should be ok (idempotent)
permutation "s1-begin" "s1-add-node-1" "s2-add-node-1" "s1-commit" "s1-show-nodes"
# remove a different node from 2 transactions, should be ok
permutation "s1-add-node-1" "s1-add-node-2" "s1-begin" "s1-remove-node-1" "s2-remove-node-2" "s1-commit" "s1-show-nodes"
# remove the same node from 2 transactions: the second removal waits for the
# first to commit and then fails because the node no longer exists
permutation "s1-add-node-1" "s1-begin" "s1-remove-node-1" "s2-remove-node-1" "s1-commit" "s1-show-nodes"
# activate an active node from 2 transactions, should be ok
permutation "s1-add-node-1" "s1-begin" "s1-activate-node-1" "s2-activate-node-1" "s1-commit" "s1-show-nodes"
# disable an active node from 2 transactions, should be ok
permutation "s1-add-node-1" "s1-begin" "s1-disable-node-1" "s2-disable-node-1" "s1-commit" "s1-show-nodes"
# activate an inactive node from 2 transactions, should be ok
permutation "s1-add-inactive-1" "s1-begin" "s1-activate-node-1" "s2-activate-node-1" "s1-commit" "s1-show-nodes"
# disable an inactive node from 2 transactions, should be ok
permutation "s1-add-inactive-1" "s1-begin" "s1-disable-node-1" "s2-disable-node-1" "s1-commit" "s1-show-nodes"
# disable and activate an active node from 2 transactions, should be ok
permutation "s1-add-node-1" "s1-begin" "s1-disable-node-1" "s2-activate-node-1" "s1-commit" "s1-show-nodes"
# activate and disable an active node from 2 transactions, should be ok
permutation "s1-add-node-1" "s1-begin" "s1-activate-node-1" "s2-disable-node-1" "s1-commit" "s1-show-nodes"
# disable and activate an inactive node from 2 transactions, should be ok
permutation "s1-add-inactive-1" "s1-begin" "s1-disable-node-1" "s2-activate-node-1" "s1-commit" "s1-show-nodes"
# activate and disable an inactive node from 2 transactions, should be ok
permutation "s1-add-inactive-1" "s1-begin" "s1-activate-node-1" "s2-disable-node-1" "s1-commit" "s1-show-nodes"