Do not consider old placements when disabling or removing a node (#4960)

* Do not consider old placements when disabling or removing a node

* update cluster test
pull/5008/head
SaitTalhaNisanci 2021-05-28 23:38:20 +03:00 committed by GitHub
parent 40a229976f
commit 8c3f85692d
5 changed files with 89 additions and 27 deletions


@@ -1288,6 +1288,26 @@ ShardLength(uint64 shardId)
}

+/*
+ * NodeGroupHasLivePlacements returns true if there is any placement
+ * on the given node group which is not a SHARD_STATE_TO_DELETE placement.
+ */
+bool
+NodeGroupHasLivePlacements(int32 groupId)
+{
+    List *shardPlacements = AllShardPlacementsOnNodeGroup(groupId);
+    GroupShardPlacement *placement = NULL;
+    foreach_ptr(placement, shardPlacements)
+    {
+        if (placement->shardState != SHARD_STATE_TO_DELETE)
+        {
+            return true;
+        }
+    }
+    return false;
+}
+
/*
 * NodeGroupHasShardPlacements returns whether any active shards are placed on the group
 */
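The helper above changes the question asked before a node is disabled or removed: not "does this group have any placement rows at all?" but "does it have any placement that is not marked for deferred deletion?". Roughly the same check expressed against the metadata, as an illustrative sketch only: it assumes shardstate 4 corresponds to SHARD_STATE_TO_DELETE (the regression test below relies on the same mapping) and uses 14 as a stand-in group id.

SELECT EXISTS (
    SELECT 1
    FROM pg_dist_placement
    WHERE groupid = 14      -- stand-in group id; use the group of the node in question
      AND shardstate <> 4   -- assumed mapping: 4 = SHARD_STATE_TO_DELETE (deferred drop)
) AS has_live_placements;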


@@ -112,7 +112,7 @@ static bool UnsetMetadataSyncedForAll(void);
static void ErrorIfCoordinatorMetadataSetFalse(WorkerNode *workerNode, Datum value,
                                               char *field);
static WorkerNode * SetShouldHaveShards(WorkerNode *workerNode, bool shouldHaveShards);
+static void RemoveOldShardPlacementForNodeGroup(int groupId);

/* declarations for dynamic loading */
PG_FUNCTION_INFO_V1(citus_set_coordinator_host);
@@ -1291,9 +1291,7 @@ RemoveNodeFromCluster(char *nodeName, int32 nodePort)
         */
        DeleteAllReferenceTablePlacementsFromNodeGroup(workerNode->groupId);
    }
-   bool onlyConsiderActivePlacements = false;
-   if (NodeGroupHasShardPlacements(workerNode->groupId,
-                                   onlyConsiderActivePlacements))
+   if (NodeGroupHasLivePlacements(workerNode->groupId))
    {
        if (ClusterHasReferenceTable())
        {
@@ -1320,6 +1318,8 @@ RemoveNodeFromCluster(char *nodeName, int32 nodePort)
    DeleteNodeRow(workerNode->workerName, nodePort);

+   RemoveOldShardPlacementForNodeGroup(workerNode->groupId);
+
    char *nodeDeleteCommand = NodeDeleteCommand(workerNode->nodeId);

    /* make sure we don't have any lingering session lifespan connections */
@@ -1329,6 +1329,29 @@ RemoveNodeFromCluster(char *nodeName, int32 nodePort)
}

+/*
+ * RemoveOldShardPlacementForNodeGroup removes all old shard placements
+ * for the given node group from pg_dist_placement.
+ */
+static void
+RemoveOldShardPlacementForNodeGroup(int groupId)
+{
+    /*
+     * Prevent concurrent deferred drop
+     */
+    LockPlacementCleanup();
+
+    List *shardPlacementsOnNode = AllShardPlacementsOnNodeGroup(groupId);
+    GroupShardPlacement *placement = NULL;
+    foreach_ptr(placement, shardPlacementsOnNode)
+    {
+        if (placement->shardState == SHARD_STATE_TO_DELETE)
+        {
+            DeleteShardPlacementRow(placement->placementId);
+        }
+    }
+}
+
/*
 * CanRemoveReferenceTablePlacements returns true if active primary
 * node count is more than 1, which means that even if we remove a node
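RemoveOldShardPlacementForNodeGroup is what keeps the now-permitted removal tidy: once the pg_dist_node row is deleted, any placements in that group that were only waiting for deferred drop are deleted from pg_dist_placement as well, under LockPlacementCleanup() so a concurrent deferred-drop pass cannot race with the removal. In catalog terms the end state is roughly the following; this is only an illustration (the C code walks the placements and calls DeleteShardPlacementRow rather than issuing SQL), and it again assumes shardstate 4 is SHARD_STATE_TO_DELETE and a stand-in group id of 14.

DELETE FROM pg_dist_placement
WHERE groupid = 14      -- stand-in: the group id of the node being removed
  AND shardstate = 4;   -- assumed mapping: 4 = SHARD_STATE_TO_DELETE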


@@ -209,6 +209,7 @@ extern int ShardIntervalCount(Oid relationId);
extern List * LoadShardList(Oid relationId);
extern ShardInterval * CopyShardInterval(ShardInterval *srcInterval);
extern uint64 ShardLength(uint64 shardId);
+extern bool NodeGroupHasLivePlacements(int32 groupId);
extern bool NodeGroupHasShardPlacements(int32 groupId,
                                        bool onlyConsiderActivePlacements);
extern List * ActiveShardPlacementListOnGroup(uint64 shardId, int32 groupId);


@@ -330,7 +330,7 @@ SELECT shardid, shardstate, nodename, nodeport FROM pg_dist_shard_placement WHER
CREATE TABLE cluster_management_test_colocated (col_1 text, col_2 int);
-- Check that we warn the user about colocated shards that will not get created for shards that do not have active placements
-SELECT create_distributed_table('cluster_management_test_colocated', 'col_1', 'hash', colocate_with=>'cluster_management_test');
+SELECT create_distributed_table('cluster_management_test_colocated', 'col_1', 'hash', colocate_with => 'cluster_management_test');
WARNING: could not find any shard placements for shardId 1220017
WARNING: could not find any shard placements for shardId 1220019
WARNING: could not find any shard placements for shardId 1220021
@@ -374,17 +374,30 @@ SELECT logicalrelid, shardid, shardstate, nodename, nodeport FROM pg_dist_shard_
cluster_management_test | 1220015 | 4 | localhost | 57638
(24 rows)
--- try to remove a node with only to be deleted placements and see that removal still fails
+SELECT * INTO removed_placements FROM pg_dist_placement WHERE shardstate = 4;
+-- try to remove a node with only to be deleted placements and see that removal succeeds
SELECT master_remove_node('localhost', :worker_2_port);
-ERROR: cannot remove the primary node of a node group which has shard placements
-HINT: To proceed, either drop the distributed tables or use undistribute_table() function to convert them to local tables
+ master_remove_node
+---------------------------------------------------------------------
+(1 row)
SELECT master_get_active_worker_nodes();
master_get_active_worker_nodes
---------------------------------------------------------------------
- (localhost,57638)
(localhost,57637)
-(2 rows)
+(1 row)
+SELECT master_add_node('localhost', :worker_2_port, groupId := :worker_2_group);
+WARNING: citus.enable_object_propagation is off, not creating distributed objects on worker
+DETAIL: distributed objects are only kept in sync when citus.enable_object_propagation is set to on. Newly activated nodes will not get these objects created
+ master_add_node
+---------------------------------------------------------------------
+ 7
+(1 row)
+-- put removed placements back for testing purposes(in practice we wouldn't have only old placements for a shard)
+INSERT INTO pg_dist_placement SELECT * FROM removed_placements;
-- clean-up
SELECT 1 FROM master_add_node('localhost', :worker_2_port);
?column?
@@ -519,14 +532,14 @@ WARNING: citus.enable_object_propagation is off, not creating distributed objec
DETAIL: distributed objects are only kept in sync when citus.enable_object_propagation is set to on. Newly activated nodes will not get these objects created
master_add_node | master_add_node
---------------------------------------------------------------------
- 11 | 12
+ 12 | 13
(1 row)
SELECT * FROM pg_dist_node ORDER BY nodeid;
nodeid | groupid | nodename | nodeport | noderack | hasmetadata | isactive | noderole | nodecluster | metadatasynced | shouldhaveshards
---------------------------------------------------------------------
- 11 | 9 | localhost | 57637 | default | f | t | primary | default | f | t
- 12 | 10 | localhost | 57638 | default | f | t | primary | default | f | t
+ 12 | 9 | localhost | 57637 | default | f | t | primary | default | f | t
+ 13 | 10 | localhost | 57638 | default | f | t | primary | default | f | t
(2 rows)
-- check that mixed add/remove node commands work fine inside transaction
@@ -724,13 +737,13 @@ SELECT 1 FROM master_add_inactive_node('localhost', 9996, groupid => :worker_2_g
SELECT master_add_inactive_node('localhost', 9999, groupid => :worker_2_group, nodecluster => 'olap', noderole => 'secondary');
master_add_inactive_node
---------------------------------------------------------------------
- 22
+ 23
(1 row)
SELECT master_activate_node('localhost', 9999);
master_activate_node
---------------------------------------------------------------------
- 22
+ 23
(1 row)
SELECT master_disable_node('localhost', 9999);
@@ -758,17 +771,17 @@ CONTEXT: PL/pgSQL function citus_internal.pg_dist_node_trigger_func() line 18 a
INSERT INTO pg_dist_node (nodename, nodeport, groupid, noderole, nodecluster)
VALUES ('localhost', 5000, 1000, 'primary', 'olap');
ERROR: new row for relation "pg_dist_node" violates check constraint "primaries_are_only_allowed_in_the_default_cluster"
-DETAIL: Failing row contains (24, 1000, localhost, 5000, default, f, t, primary, olap, f, t).
+DETAIL: Failing row contains (25, 1000, localhost, 5000, default, f, t, primary, olap, f, t).
UPDATE pg_dist_node SET nodecluster = 'olap'
WHERE nodeport = :worker_1_port;
ERROR: new row for relation "pg_dist_node" violates check constraint "primaries_are_only_allowed_in_the_default_cluster"
-DETAIL: Failing row contains (16, 14, localhost, 57637, default, f, t, primary, olap, f, t).
+DETAIL: Failing row contains (17, 14, localhost, 57637, default, f, t, primary, olap, f, t).
-- check that you /can/ add a secondary node to a non-default cluster
SELECT groupid AS worker_2_group FROM pg_dist_node WHERE nodeport = :worker_2_port \gset
SELECT master_add_node('localhost', 8888, groupid => :worker_1_group, noderole => 'secondary', nodecluster=> 'olap');
master_add_node
---------------------------------------------------------------------
- 25
+ 26
(1 row)
-- check that super-long cluster names are truncated
@@ -781,13 +794,13 @@ SELECT master_add_node('localhost', 8887, groupid => :worker_1_group, noderole =
);
master_add_node
---------------------------------------------------------------------
- 26
+ 27
(1 row)
SELECT * FROM pg_dist_node WHERE nodeport=8887;
nodeid | groupid | nodename | nodeport | noderack | hasmetadata | isactive | noderole | nodecluster | metadatasynced | shouldhaveshards
---------------------------------------------------------------------
- 26 | 14 | localhost | 8887 | default | f | t | secondary | thisisasixtyfourcharacterstringrepeatedfourtimestomake256chars. | f | t
+ 27 | 14 | localhost | 8887 | default | f | t | secondary | thisisasixtyfourcharacterstringrepeatedfourtimestomake256chars. | f | t
(1 row)
-- don't remove the secondary and unavailable nodes, check that no commands are sent to
@@ -796,13 +809,13 @@ SELECT * FROM pg_dist_node WHERE nodeport=8887;
SELECT master_add_secondary_node('localhost', 9995, 'localhost', :worker_1_port);
master_add_secondary_node
---------------------------------------------------------------------
- 27
+ 28
(1 row)
SELECT master_add_secondary_node('localhost', 9994, primaryname => 'localhost', primaryport => :worker_2_port);
master_add_secondary_node
---------------------------------------------------------------------
- 28
+ 29
(1 row)
SELECT master_add_secondary_node('localhost', 9993, 'localhost', 2000);
@@ -810,7 +823,7 @@ ERROR: node at "localhost:xxxxx" does not exist
SELECT master_add_secondary_node('localhost', 9992, 'localhost', :worker_1_port, nodecluster => 'second-cluster');
master_add_secondary_node
---------------------------------------------------------------------
- 29
+ 30
(1 row)
SELECT nodeid AS worker_1_node FROM pg_dist_node WHERE nodeport=:worker_1_port \gset
@@ -830,7 +843,7 @@ SELECT master_update_node(:worker_1_node, 'somehost', 9000);
SELECT * FROM pg_dist_node WHERE nodeid = :worker_1_node;
nodeid | groupid | nodename | nodeport | noderack | hasmetadata | isactive | noderole | nodecluster | metadatasynced | shouldhaveshards
---------------------------------------------------------------------
- 16 | 14 | somehost | 9000 | default | f | t | primary | default | f | t
+ 17 | 14 | somehost | 9000 | default | f | t | primary | default | f | t
(1 row)
-- cleanup
@@ -843,7 +856,7 @@ SELECT master_update_node(:worker_1_node, 'localhost', :worker_1_port);
SELECT * FROM pg_dist_node WHERE nodeid = :worker_1_node;
nodeid | groupid | nodename | nodeport | noderack | hasmetadata | isactive | noderole | nodecluster | metadatasynced | shouldhaveshards
---------------------------------------------------------------------
- 16 | 14 | localhost | 57637 | default | f | t | primary | default | f | t
+ 17 | 14 | localhost | 57637 | default | f | t | primary | default | f | t
(1 row)
SET citus.shard_replication_factor TO 1;
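Taken together, the updated expected output walks the new behaviour end to end: mark every placement in worker 2's group as to-be-deleted, stash those rows, remove the node (which now succeeds and also clears the stale placement rows), re-add the node into the same group, and put the placements back so the rest of the file still has shards to work with. A condensed, hedged version of that sequence for replaying it by hand outside the regression harness is below; it assumes worker 2 listens on port 57638 (as in this expected output), keeps the test's removed_placements table name, and introduces a throwaway table worker_2_meta purely to remember the group id that the test tracks with the :worker_2_group psql variable. This is test scaffolding, not something to run against a production cluster.

-- remember worker 2's group id, then mark its placements as to-be-deleted
SELECT groupid AS saved_group INTO worker_2_meta FROM pg_dist_node WHERE nodeport = 57638;
UPDATE pg_dist_placement SET shardstate = 4
    WHERE groupid = (SELECT saved_group FROM worker_2_meta);
-- keep a copy of the rows that the removal will drop
SELECT * INTO removed_placements FROM pg_dist_placement WHERE shardstate = 4;
-- removal now succeeds because only to-be-deleted placements remain in the group
SELECT master_remove_node('localhost', 57638);
-- re-add the node into its old group and restore the saved placements
SELECT master_add_node('localhost', 57638, groupid := (SELECT saved_group FROM worker_2_meta));
INSERT INTO pg_dist_placement SELECT * FROM removed_placements;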


@@ -130,15 +130,20 @@ UPDATE pg_dist_placement SET shardstate=4 WHERE groupid=:worker_2_group;
SELECT shardid, shardstate, nodename, nodeport FROM pg_dist_shard_placement WHERE nodeport=:worker_2_port;
CREATE TABLE cluster_management_test_colocated (col_1 text, col_2 int);
-- Check that we warn the user about colocated shards that will not get created for shards that do not have active placements
-SELECT create_distributed_table('cluster_management_test_colocated', 'col_1', 'hash', colocate_with=>'cluster_management_test');
+SELECT create_distributed_table('cluster_management_test_colocated', 'col_1', 'hash', colocate_with => 'cluster_management_test');
-- Check that colocated shards don't get created for shards that are to be deleted
SELECT logicalrelid, shardid, shardstate, nodename, nodeport FROM pg_dist_shard_placement NATURAL JOIN pg_dist_shard ORDER BY shardstate, shardid;
--- try to remove a node with only to be deleted placements and see that removal still fails
+SELECT * INTO removed_placements FROM pg_dist_placement WHERE shardstate = 4;
+-- try to remove a node with only to be deleted placements and see that removal succeeds
SELECT master_remove_node('localhost', :worker_2_port);
SELECT master_get_active_worker_nodes();
+SELECT master_add_node('localhost', :worker_2_port, groupId := :worker_2_group);
+-- put removed placements back for testing purposes(in practice we wouldn't have only old placements for a shard)
+INSERT INTO pg_dist_placement SELECT * FROM removed_placements;
-- clean-up
SELECT 1 FROM master_add_node('localhost', :worker_2_port);
UPDATE pg_dist_placement SET shardstate=1 WHERE groupid=:worker_2_group;
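One quick way to see the new cleanup at work when replaying the steps above by hand: immediately after master_remove_node, the removed group should have no rows left in pg_dist_placement, because the to-be-deleted placements are now dropped along with the node. Assuming the group id was stashed in the illustrative worker_2_meta table from the earlier sketch:

-- expect zero rows right after master_remove_node
SELECT placementid, shardid, shardstate
FROM pg_dist_placement
WHERE groupid = (SELECT saved_group FROM worker_2_meta);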