Merge pull request #2142 from citusdata/master_update_node_locking

Make master_update_node block writes to the node
Marco Slot 2018-05-09 14:27:03 +02:00 committed by GitHub
commit a63e628120
19 changed files with 500 additions and 26 deletions
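
For context, a minimal sketch of the failover flow this change targets (hostnames and ports below are illustrative, not taken from the PR):

    -- look up the metadata entry of the failed worker
    SELECT nodeid FROM pg_dist_node WHERE nodename = 'worker-1' AND nodeport = 5432 \gset

    -- repoint it at the promoted replica; with this change, writes to shard placements
    -- on the node block until the transaction commits, so no write can still land on
    -- the old host while the metadata switch is in flight
    BEGIN;
    SELECT master_update_node(:nodeid, 'worker-1-replica', 5432);
    COMMIT;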

@@ -773,6 +773,51 @@ BuildShardPlacementList(ShardInterval *shardInterval)
}
/*
* AllShardPlacementsOnNodeGroup finds shard placements for the given groupId
* from system catalogs, converts these placements to their in-memory
* representation, and returns the converted shard placements in a new list.
*/
List *
AllShardPlacementsOnNodeGroup(int32 groupId)
{
List *shardPlacementList = NIL;
Relation pgPlacement = NULL;
SysScanDesc scanDescriptor = NULL;
ScanKeyData scanKey[1];
int scanKeyCount = 1;
bool indexOK = true;
HeapTuple heapTuple = NULL;
pgPlacement = heap_open(DistPlacementRelationId(), AccessShareLock);
ScanKeyInit(&scanKey[0], Anum_pg_dist_placement_groupid,
BTEqualStrategyNumber, F_INT4EQ, Int32GetDatum(groupId));
scanDescriptor = systable_beginscan(pgPlacement,
DistPlacementGroupidIndexId(), indexOK,
NULL, scanKeyCount, scanKey);
heapTuple = systable_getnext(scanDescriptor);
while (HeapTupleIsValid(heapTuple))
{
TupleDesc tupleDescriptor = RelationGetDescr(pgPlacement);
GroupShardPlacement *placement =
TupleToGroupShardPlacement(tupleDescriptor, heapTuple);
shardPlacementList = lappend(shardPlacementList, placement);
heapTuple = systable_getnext(scanDescriptor);
}
systable_endscan(scanDescriptor);
heap_close(pgPlacement, NoLock);
return shardPlacementList;
}
/*
* TupleToGroupShardPlacement takes in a heap tuple from pg_dist_placement,
* and converts this tuple to in-memory struct. The function assumes the

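For reference, AllShardPlacementsOnNodeGroup is the C-level equivalent of scanning pg_dist_placement by group; a rough SQL sketch of the same lookup (column names assumed from the catalog scanned above):

    SELECT shardid
    FROM pg_dist_placement
    WHERE groupid = :groupid;
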
@@ -503,6 +503,30 @@ ResolveGroupShardPlacement(GroupShardPlacement *groupShardPlacement,
}
/*
* LookupNodeByNodeId returns a worker node by nodeId or NULL if the node
* cannot be found.
*/
WorkerNode *
LookupNodeByNodeId(uint32 nodeId)
{
int workerNodeIndex = 0;
PrepareWorkerNodeCache();
for (workerNodeIndex = 0; workerNodeIndex < WorkerNodeCount; workerNodeIndex++)
{
WorkerNode *workerNode = WorkerNodeArray[workerNodeIndex];
if (workerNode->nodeId == nodeId)
{
return workerNode;
}
}
return NULL;
}
/*
* LookupNodeForGroup searches the WorkerNodeHash for a worker which is a member of the
* given group and also readable (a primary if we're reading from primaries, a secondary

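At the SQL level, the lookup LookupNodeByNodeId performs against the worker node cache corresponds roughly to this query (a sketch; the in-memory cache avoids hitting the catalog on every call):

    SELECT nodename, nodeport
    FROM pg_dist_node
    WHERE nodeid = :nodeid;
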
@@ -468,27 +468,61 @@ master_update_node(PG_FUNCTION_ARGS)
int32 newNodePort = PG_GETARG_INT32(2);
char *newNodeNameString = text_to_cstring(newNodeName);
WorkerNode *workerNode = NULL;
WorkerNode *workerNodeWithSameAddress = NULL;
List *placementList = NIL;
CheckCitusVersion(ERROR);
workerNodeWithSameAddress = FindWorkerNodeAnyCluster(newNodeNameString, newNodePort);
if (workerNodeWithSameAddress != NULL)
{
/* a node with the given hostname and port already exists in the metadata */
if (workerNodeWithSameAddress->nodeId == nodeId)
{
/* it's the node itself, meaning this is a noop update */
PG_RETURN_VOID();
}
else
{
ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("there is already another node with the specified "
"hostname and port")));
}
}
workerNode = LookupNodeByNodeId(nodeId);
if (workerNode == NULL)
{
ereport(ERROR, (errcode(ERRCODE_NO_DATA_FOUND),
errmsg("node %u not found", nodeId)));
}
/*
* If the node is a primary node we block reads and writes.
*
* This lock has two purposes:
*
* - Ensure buggy code in Citus doesn't cause failures when the
* nodename/nodeport of a node changes mid-query
*
* - Provide fencing during failover, after this function returns all
* connections will use the new node location.
*
* Drawback:
*
* - This function blocks until all previous queries have finished. This
* means that long-running queries will prevent failover.
*
* It might be worth blocking reads to a secondary for the same reasons,
* though we currently only query secondaries on follower clusters
* where these locks will have no effect.
*/
if (WorkerNodeIsPrimary(workerNode))
{
placementList = AllShardPlacementsOnNodeGroup(workerNode->groupId);
LockShardsInPlacementListMetadata(placementList, AccessExclusiveLock);
}
UpdateNodeLocation(nodeId, newNodeNameString, newNodePort);

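The new checks above can be summarized with a short sketch (using the same psql variables as the regression test below): updating a node to its current address is a no-op, an unknown nodeid errors out, and aliasing another node's address is rejected.

    -- no-op: the address already belongs to this nodeid, returns without changes
    SELECT master_update_node(:worker_1_node, 'localhost', :worker_1_port);

    -- ERROR: node 100 not found
    SELECT master_update_node(100, 'localhost', 8000);

    -- ERROR: there is already another node with the specified hostname and port
    SELECT master_update_node(:worker_1_node, 'localhost', :worker_2_port);
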
@@ -273,6 +273,30 @@ LockShardListMetadata(List *shardIntervalList, LOCKMODE lockMode)
}
/*
* LockShardsInPlacementListMetadata takes locks on the metadata of all shards in
* shardPlacementList to prevent concurrent placement changes.
*/
void
LockShardsInPlacementListMetadata(List *shardPlacementList, LOCKMODE lockMode)
{
ListCell *shardPlacementCell = NULL;
/* lock shards in order of shard id to prevent deadlock */
shardPlacementList =
SortList(shardPlacementList, CompareShardPlacementsByShardId);
foreach(shardPlacementCell, shardPlacementList)
{
GroupShardPlacement *placement =
(GroupShardPlacement *) lfirst(shardPlacementCell);
int64 shardId = placement->shardId;
LockShardDistributionMetadata(shardId, lockMode);
}
}
/*
* LockShardListResources takes locks on all shards in shardIntervalList to
* prevent concurrent DML statements on those shards.

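The locks LockShardDistributionMetadata acquires are taken through PostgreSQL's advisory lock tag space (an assumption about the Citus lock manager; not visible in this diff), so a blocked failover or write can be inspected while it waits with something like:

    SELECT locktype, mode, granted, pid
    FROM pg_locks
    WHERE locktype = 'advisory';
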
@@ -129,6 +129,34 @@ CompareShardIntervalsById(const void *leftElement, const void *rightElement)
}
/*
* CompareShardPlacementsByShardId is a comparison function for sorting shard
* placement by their shard ID.
*/
int
CompareShardPlacementsByShardId(const void *leftElement, const void *rightElement)
{
GroupShardPlacement *left = *((GroupShardPlacement **) leftElement);
GroupShardPlacement *right = *((GroupShardPlacement **) rightElement);
int64 leftShardId = left->shardId;
int64 rightShardId = right->shardId;
/* we compare 64-bit integers, instead of casting their difference to int */
if (leftShardId > rightShardId)
{
return 1;
}
else if (leftShardId < rightShardId)
{
return -1;
}
else
{
return 0;
}
}
/*
* CompareRelationShards is a comparison function for sorting relation
* to shard mappings by their relation ID and then shard ID.

@@ -127,6 +127,7 @@ extern bool NodeGroupHasShardPlacements(uint32 groupId,
extern List * FinalizedShardPlacementList(uint64 shardId);
extern ShardPlacement * FinalizedShardPlacement(uint64 shardId, bool missingOk);
extern List * BuildShardPlacementList(ShardInterval *shardInterval);
extern List * AllShardPlacementsOnNodeGroup(int32 groupId);
extern List * GroupShardPlacementsForTableOnGroup(Oid relationId, uint32 groupId);
/* Function declarations to modify shard and shard placement data */

@@ -102,6 +102,7 @@ extern void EnsureModificationsCanRun(void);
/* access WorkerNodeHash */
extern HTAB * GetWorkerNodeHash(void);
extern WorkerNode * LookupNodeByNodeId(uint32 nodeId);
/* relation oids */
extern Oid DistColocationRelationId(void);

@@ -77,6 +77,8 @@ extern void UnlockJobResource(uint64 jobId, LOCKMODE lockmode);
/* Lock multiple shards for safe modification */
extern void LockShardListMetadata(List *shardIntervalList, LOCKMODE lockMode);
extern void LockShardsInPlacementListMetadata(List *shardPlacementList,
LOCKMODE lockMode);
extern void LockShardListResources(List *shardIntervalList, LOCKMODE lockMode);
extern void LockRelationShardResources(List *relationShardList, LOCKMODE lockMode);

@@ -30,6 +30,8 @@ extern ShardInterval * LowestShardIntervalById(List *shardIntervalList);
extern int CompareShardIntervals(const void *leftElement, const void *rightElement,
FmgrInfo *typeCompareFunction);
extern int CompareShardIntervalsById(const void *leftElement, const void *rightElement);
extern int CompareShardPlacementsByShardId(const void *leftElement, const
void *rightElement);
extern int CompareRelationShards(const void *leftElement,
const void *rightElement);
extern int ShardIndex(ShardInterval *shardInterval);

@@ -66,6 +66,7 @@ extern uint32 ActivePrimaryNodeCount(void);
extern List * ActivePrimaryNodeList(void);
extern uint32 ActiveReadableNodeCount(void);
extern List * ActiveReadableNodeList(void);
extern WorkerNode * GetWorkerNodeByNodeId(int nodeId);
extern WorkerNode * FindWorkerNode(char *nodeName, int32 nodePort);
extern WorkerNode * FindWorkerNodeAnyCluster(char *nodeName, int32 nodePort);
extern List * ReadWorkerNodes(bool includeNodesFromOtherClusters);

@@ -98,7 +98,7 @@ step s1-get-current-transaction-id:
row
(0,289)
(0,299)
step s2-get-first-worker-active-transactions:
SELECT * FROM run_command_on_workers('SELECT row(initiator_node_identifier, transaction_number)
FROM
@@ -109,7 +109,7 @@ step s2-get-first-worker-active-transactions:
nodename nodeport success result
localhost 57637 t (0,289)
localhost 57637 t (0,299)
step s1-commit:
COMMIT;

@@ -29,11 +29,11 @@ step detector-dump-wait-edges:
waiting_transaction_numblocking_transaction_numblocking_transaction_waiting
292 291 f
302 301 f
transactionnumberwaitingtransactionnumbers
291
292 291
301
302 301
step s1-abort:
ABORT;
@@ -77,14 +77,14 @@ step detector-dump-wait-edges:
waiting_transaction_numblocking_transaction_numblocking_transaction_waiting
296 295 f
297 295 f
297 296 t
306 305 f
307 305 f
307 306 t
transactionnumberwaitingtransactionnumbers
295
296 295
297 295,296
305
306 305
307 305,306
step s1-abort:
ABORT;

@@ -16,7 +16,7 @@ step s1-finish:
COMMIT;
step s2-insert: <... completed>
error in steps s1-finish s2-insert: ERROR: duplicate key value violates unique constraint "test_locking_a_key_102321"
error in steps s1-finish s2-insert: ERROR: duplicate key value violates unique constraint "test_locking_a_key_102329"
step s2-finish:
COMMIT;

@@ -0,0 +1,88 @@
Parsed test spec with 2 sessions
starting permutation: s1-begin s1-update-node-1 s2-update-node-2 s1-commit s1-show-nodes
nodeid nodename nodeport
22 localhost 57637
23 localhost 57638
step s1-begin:
BEGIN;
step s1-update-node-1:
SELECT 1 FROM master_update_node(
(select nodeid from pg_dist_node where nodeport = 57637),
'localhost',
58637);
?column?
1
step s2-update-node-2:
SELECT 1 FROM master_update_node(
(select nodeid from pg_dist_node where nodeport = 57638),
'localhost',
58638);
?column?
1
step s1-commit:
COMMIT;
step s1-show-nodes:
SELECT nodeid, nodename, nodeport, isactive
FROM pg_dist_node
ORDER BY nodename, nodeport;
nodeid nodename nodeport isactive
22 localhost 58637 t
23 localhost 58638 t
nodeid nodename nodeport
starting permutation: s1-begin s1-update-node-1 s2-begin s2-update-node-1 s1-commit s2-abort s1-show-nodes
nodeid nodename nodeport
24 localhost 57637
25 localhost 57638
step s1-begin:
BEGIN;
step s1-update-node-1:
SELECT 1 FROM master_update_node(
(select nodeid from pg_dist_node where nodeport = 57637),
'localhost',
58637);
?column?
1
step s2-begin:
BEGIN;
step s2-update-node-1:
SELECT 1 FROM master_update_node(
(select nodeid from pg_dist_node where nodeport = 57637),
'localhost',
58637);
<waiting ...>
step s1-commit:
COMMIT;
step s2-update-node-1: <... completed>
error in steps s1-commit s2-update-node-1: ERROR: tuple concurrently updated
step s2-abort:
ABORT;
step s1-show-nodes:
SELECT nodeid, nodename, nodeport, isactive
FROM pg_dist_node
ORDER BY nodename, nodeport;
nodeid nodename nodeport isactive
25 localhost 57638 t
24 localhost 58637 t
nodeid nodename nodeport

@@ -0,0 +1,64 @@
Parsed test spec with 2 sessions
starting permutation: s1-begin s1-update-node-1 s2-begin s2-insert s1-commit s2-abort
create_distributed_table
step s1-begin:
BEGIN;
step s1-update-node-1:
SELECT 1 FROM master_update_node(
(select nodeid from pg_dist_node where nodeport = 57637),
'localhost',
57638);
?column?
1
step s2-begin:
BEGIN;
step s2-insert:
INSERT INTO update_node(id, f1)
SELECT id, md5(id::text)
FROM generate_series(1, 10) as t(id);
<waiting ...>
step s1-commit:
COMMIT;
step s2-insert: <... completed>
error in steps s1-commit s2-insert: ERROR: relation "public.update_node_102008" does not exist
step s2-abort:
ABORT;
nodeid nodename nodeport
starting permutation: s2-begin s2-insert s1-update-node-1 s2-commit
create_distributed_table
step s2-begin:
BEGIN;
step s2-insert:
INSERT INTO update_node(id, f1)
SELECT id, md5(id::text)
FROM generate_series(1, 10) as t(id);
step s1-update-node-1:
SELECT 1 FROM master_update_node(
(select nodeid from pg_dist_node where nodeport = 57637),
'localhost',
57638);
<waiting ...>
step s2-commit:
COMMIT;
step s1-update-node-1: <... completed>
?column?
1
nodeid nodename nodeport

@@ -560,10 +560,10 @@ SELECT master_add_secondary_node('localhost', 9992, 'localhost', :worker_1_port,
SELECT nodeid AS worker_1_node FROM pg_dist_node WHERE nodeport=:worker_1_port \gset
-- master_update_node checks node exists
SELECT master_update_node(100, 'localhost', 8000);
ERROR: could not find valid entry for node "localhost:8000"
ERROR: node 100 not found
-- master_update_node disallows aliasing existing node
SELECT master_update_node(:worker_1_node, 'localhost', :worker_2_port);
ERROR: node at "localhost:57638" already exists
ERROR: there is already another node with the specified hostname and port
-- master_update_node moves a node
SELECT master_update_node(:worker_1_node, 'somehost', 9000);
master_update_node

@@ -1,4 +1,6 @@
test: isolation_add_remove_node
test: isolation_update_node
test: isolation_update_node_lock_writes
test: isolation_add_node_vs_reference_table_operations
test: isolation_create_table_vs_add_remove_node

@@ -0,0 +1,87 @@
setup
{
SELECT 1 FROM master_add_node('localhost', 57637);
SELECT 1 FROM master_add_node('localhost', 57638);
SELECT nodeid, nodename, nodeport from pg_dist_node;
}
teardown
{
SELECT master_remove_node(nodename, nodeport) FROM pg_dist_node;
SELECT nodeid, nodename, nodeport from pg_dist_node;
}
session "s1"
step "s1-begin"
{
BEGIN;
}
step "s1-update-node-1"
{
SELECT 1 FROM master_update_node(
(select nodeid from pg_dist_node where nodeport = 57637),
'localhost',
58637);
}
step "s1-update-node-2"
{
SELECT 1 FROM master_update_node(
(select nodeid from pg_dist_node where nodeport = 57638),
'localhost',
58638);
}
step "s1-commit"
{
COMMIT;
}
step "s1-show-nodes"
{
SELECT nodeid, nodename, nodeport, isactive
FROM pg_dist_node
ORDER BY nodename, nodeport;
}
session "s2"
step "s2-begin"
{
BEGIN;
}
step "s2-update-node-1"
{
SELECT 1 FROM master_update_node(
(select nodeid from pg_dist_node where nodeport = 57637),
'localhost',
58637);
}
step "s2-update-node-2"
{
SELECT 1 FROM master_update_node(
(select nodeid from pg_dist_node where nodeport = 57638),
'localhost',
58638);
}
step "s2-abort"
{
ABORT;
}
step "s2-commit"
{
COMMIT;
}
# session 1 updates node 1, session 2 updates node 2, should be ok
permutation "s1-begin" "s1-update-node-1" "s2-update-node-2" "s1-commit" "s1-show-nodes"
# session 1 updates node 1, session 2 tries to do the same
permutation "s1-begin" "s1-update-node-1" "s2-begin" "s2-update-node-1" "s1-commit" "s2-abort" "s1-show-nodes"

@@ -0,0 +1,71 @@
setup
{
SELECT 1 FROM master_add_node('localhost', 57637);
SET citus.shard_replication_factor TO 1;
CREATE TABLE update_node(id integer primary key, f1 text);
SELECT create_distributed_table('update_node', 'id');
}
teardown
{
RESET citus.shard_replication_factor;
DROP TABLE update_node CASCADE;
SELECT master_remove_node(nodename, nodeport) FROM pg_dist_node;
SELECT nodeid, nodename, nodeport from pg_dist_node;
}
session "s1"
step "s1-begin"
{
BEGIN;
}
step "s1-update-node-1"
{
SELECT 1 FROM master_update_node(
(select nodeid from pg_dist_node where nodeport = 57637),
'localhost',
57638);
}
step "s1-commit"
{
COMMIT;
}
step "s1-abort"
{
ABORT;
}
session "s2"
step "s2-begin"
{
BEGIN;
}
step "s2-insert"
{
INSERT INTO update_node(id, f1)
SELECT id, md5(id::text)
FROM generate_series(1, 10) as t(id);
}
step "s2-abort"
{
ABORT;
}
step "s2-commit"
{
COMMIT;
}
# session 1 updates node 1, session 2 writes should be blocked
permutation "s1-begin" "s1-update-node-1" "s2-begin" "s2-insert" "s1-commit" "s2-abort"
permutation "s2-begin" "s2-insert" "s1-update-node-1" "s2-commit"