From 3218e34be9c5f199040231a0a5789b794e6c2e77 Mon Sep 17 00:00:00 2001
From: Sait Talha Nisanci
Date: Fri, 28 May 2021 12:31:47 +0300
Subject: [PATCH] update cluster test

---
 .../distributed/metadata/metadata_utility.c   | 20 +++++++
 .../distributed/metadata/node_metadata.c      | 33 +++++++++--
 src/include/distributed/metadata_utility.h    |  1 +
 .../expected/multi_cluster_management.out     | 55 ++++++++++++-------
 .../regress/sql/multi_cluster_management.sql  |  9 ++-
 5 files changed, 90 insertions(+), 28 deletions(-)

diff --git a/src/backend/distributed/metadata/metadata_utility.c b/src/backend/distributed/metadata/metadata_utility.c
index 44ba74633..a5bd46e64 100644
--- a/src/backend/distributed/metadata/metadata_utility.c
+++ b/src/backend/distributed/metadata/metadata_utility.c
@@ -1288,6 +1288,26 @@ ShardLength(uint64 shardId)
 }
 
 
+/*
+ * NodeGroupHasLivePlacements returns true if there is any placement
+ * on the given node group which is not a SHARD_STATE_TO_DELETE placement.
+ */
+bool
+NodeGroupHasLivePlacements(int32 groupId)
+{
+	List *shardPlacements = AllShardPlacementsOnNodeGroup(groupId);
+	GroupShardPlacement *placement = NULL;
+	foreach_ptr(placement, shardPlacements)
+	{
+		if (placement->shardState != SHARD_STATE_TO_DELETE)
+		{
+			return true;
+		}
+	}
+	return false;
+}
+
+
 /*
  * NodeGroupHasShardPlacements returns whether any active shards are placed on the group
  */
diff --git a/src/backend/distributed/metadata/node_metadata.c b/src/backend/distributed/metadata/node_metadata.c
index c10aaedac..0b4f03f93 100644
--- a/src/backend/distributed/metadata/node_metadata.c
+++ b/src/backend/distributed/metadata/node_metadata.c
@@ -112,7 +112,7 @@ static bool UnsetMetadataSyncedForAll(void);
 static void ErrorIfCoordinatorMetadataSetFalse(WorkerNode *workerNode, Datum value,
 											   char *field);
 static WorkerNode * SetShouldHaveShards(WorkerNode *workerNode, bool shouldHaveShards);
-
+static void RemoveOldShardPlacementForNodeGroup(int groupId);
 
 /* declarations for dynamic loading */
 PG_FUNCTION_INFO_V1(citus_set_coordinator_host);
@@ -422,7 +422,7 @@ citus_disable_node(PG_FUNCTION_ARGS)
 	char *nodeName = text_to_cstring(nodeNameText);
 	WorkerNode *workerNode = ModifiableWorkerNode(nodeName, nodePort);
 	bool isActive = false;
-	bool onlyConsiderActivePlacements = true;
+	bool onlyConsiderActivePlacements = false;
 	MemoryContext savedContext = CurrentMemoryContext;
 
 	PG_TRY();
@@ -1291,9 +1291,7 @@ RemoveNodeFromCluster(char *nodeName, int32 nodePort)
 		 */
 		DeleteAllReferenceTablePlacementsFromNodeGroup(workerNode->groupId);
 	}
-	bool onlyConsiderActivePlacements = true;
-	if (NodeGroupHasShardPlacements(workerNode->groupId,
-									onlyConsiderActivePlacements))
+	if (NodeGroupHasLivePlacements(workerNode->groupId))
 	{
 		if (ClusterHasReferenceTable())
 		{
@@ -1320,6 +1318,8 @@
 
 	DeleteNodeRow(workerNode->workerName, nodePort);
 
+	RemoveOldShardPlacementForNodeGroup(workerNode->groupId);
+
 	char *nodeDeleteCommand = NodeDeleteCommand(workerNode->nodeId);
 
 	/* make sure we don't have any lingering session lifespan connections */
@@ -1329,6 +1329,29 @@
 }
 
 
+/*
+ * RemoveOldShardPlacementForNodeGroup removes all old shard placements
+ * for the given node group from pg_dist_placement.
+ */
+static void
+RemoveOldShardPlacementForNodeGroup(int groupId)
+{
+	/*
+	 * Prevent concurrent deferred drop
+	 */
+	LockPlacementCleanup();
+	List *shardPlacementsOnNode = AllShardPlacementsOnNodeGroup(groupId);
+	GroupShardPlacement *placement = NULL;
+	foreach_ptr(placement, shardPlacementsOnNode)
+	{
+		if (placement->shardState == SHARD_STATE_TO_DELETE)
+		{
+			DeleteShardPlacementRow(placement->placementId);
+		}
+	}
+}
+
+
 /*
  * CanRemoveReferenceTablePlacements returns true if active primary
  * node count is more than 1, which means that even if we remove a node
diff --git a/src/include/distributed/metadata_utility.h b/src/include/distributed/metadata_utility.h
index efed8dc2a..9b6942b02 100644
--- a/src/include/distributed/metadata_utility.h
+++ b/src/include/distributed/metadata_utility.h
@@ -209,6 +209,7 @@ extern int ShardIntervalCount(Oid relationId);
 extern List * LoadShardList(Oid relationId);
 extern ShardInterval * CopyShardInterval(ShardInterval *srcInterval);
 extern uint64 ShardLength(uint64 shardId);
+extern bool NodeGroupHasLivePlacements(int32 groupId);
 extern bool NodeGroupHasShardPlacements(int32 groupId,
 										bool onlyConsiderActivePlacements);
 extern List * ActiveShardPlacementListOnGroup(uint64 shardId, int32 groupId);
diff --git a/src/test/regress/expected/multi_cluster_management.out b/src/test/regress/expected/multi_cluster_management.out
index 7e7234e45..297eb89ec 100644
--- a/src/test/regress/expected/multi_cluster_management.out
+++ b/src/test/regress/expected/multi_cluster_management.out
@@ -330,7 +330,7 @@ SELECT shardid, shardstate, nodename, nodeport FROM pg_dist_shard_placement WHER
 
 CREATE TABLE cluster_management_test_colocated (col_1 text, col_2 int);
 -- Check that we warn the user about colocated shards that will not get created for shards that do not have active placements
-SELECT create_distributed_table('cluster_management_test_colocated', 'col_1', 'hash', colocate_with=>'cluster_management_test');
+SELECT create_distributed_table('cluster_management_test_colocated', 'col_1', 'hash', colocate_with => 'cluster_management_test');
 WARNING: could not find any shard placements for shardId 1220017
 WARNING: could not find any shard placements for shardId 1220019
 WARNING: could not find any shard placements for shardId 1220021
@@ -374,17 +374,30 @@ SELECT logicalrelid, shardid, shardstate, nodename, nodeport FROM pg_dist_shard_
  cluster_management_test | 1220015 | 4 | localhost | 57638
 (24 rows)
 
--- try to remove a node with only to be deleted placements and see that removal still fails
+SELECT * INTO removed_placements FROM pg_dist_placement WHERE shardstate = 4;
+-- try to remove a node with only to be deleted placements and see that removal succeeds
 SELECT master_remove_node('localhost', :worker_2_port);
-ERROR: cannot remove the primary node of a node group which has shard placements
-HINT: To proceed, either drop the distributed tables or use undistribute_table() function to convert them to local tables
+ master_remove_node
+---------------------------------------------------------------------
+
+(1 row)
+
 SELECT master_get_active_worker_nodes();
  master_get_active_worker_nodes
 ---------------------------------------------------------------------
- (localhost,57638)
  (localhost,57637)
-(2 rows)
+(1 row)
 
+SELECT master_add_node('localhost', :worker_2_port, groupId := :worker_2_group);
+WARNING: citus.enable_object_propagation is off, not creating distributed objects on worker
+DETAIL: distributed objects are only kept in sync when citus.enable_object_propagation is set to on. Newly activated nodes will not get these objects created
+ master_add_node
+---------------------------------------------------------------------
+ 7
+(1 row)
+
+-- put removed placements back for testing purposes(in practice we wouldn't have only old placements for a shard)
+INSERT INTO pg_dist_placement SELECT * FROM removed_placements;
 -- clean-up
 SELECT 1 FROM master_add_node('localhost', :worker_2_port);
  ?column?
@@ -519,14 +532,14 @@ WARNING: citus.enable_object_propagation is off, not creating distributed objec
 DETAIL: distributed objects are only kept in sync when citus.enable_object_propagation is set to on. Newly activated nodes will not get these objects created
  master_add_node | master_add_node
 ---------------------------------------------------------------------
- 11 | 12
+ 12 | 13
 (1 row)
 
 SELECT * FROM pg_dist_node ORDER BY nodeid;
  nodeid | groupid | nodename | nodeport | noderack | hasmetadata | isactive | noderole | nodecluster | metadatasynced | shouldhaveshards
 ---------------------------------------------------------------------
- 11 | 9 | localhost | 57637 | default | f | t | primary | default | f | t
- 12 | 10 | localhost | 57638 | default | f | t | primary | default | f | t
+ 12 | 9 | localhost | 57637 | default | f | t | primary | default | f | t
+ 13 | 10 | localhost | 57638 | default | f | t | primary | default | f | t
 (2 rows)
 
 -- check that mixed add/remove node commands work fine inside transaction
@@ -724,13 +737,13 @@ SELECT 1 FROM master_add_inactive_node('localhost', 9996, groupid => :worker_2_g
 SELECT master_add_inactive_node('localhost', 9999, groupid => :worker_2_group, nodecluster => 'olap', noderole => 'secondary');
  master_add_inactive_node
 ---------------------------------------------------------------------
- 22
+ 23
 (1 row)
 
 SELECT master_activate_node('localhost', 9999);
  master_activate_node
 ---------------------------------------------------------------------
- 22
+ 23
 (1 row)
 
 SELECT master_disable_node('localhost', 9999);
@@ -758,17 +771,17 @@ CONTEXT: PL/pgSQL function citus_internal.pg_dist_node_trigger_func() line 18 a
 INSERT INTO pg_dist_node (nodename, nodeport, groupid, noderole, nodecluster)
   VALUES ('localhost', 5000, 1000, 'primary', 'olap');
 ERROR: new row for relation "pg_dist_node" violates check constraint "primaries_are_only_allowed_in_the_default_cluster"
-DETAIL: Failing row contains (24, 1000, localhost, 5000, default, f, t, primary, olap, f, t).
+DETAIL: Failing row contains (25, 1000, localhost, 5000, default, f, t, primary, olap, f, t).
 UPDATE pg_dist_node SET nodecluster = 'olap'
   WHERE nodeport = :worker_1_port;
 ERROR: new row for relation "pg_dist_node" violates check constraint "primaries_are_only_allowed_in_the_default_cluster"
-DETAIL: Failing row contains (16, 14, localhost, 57637, default, f, t, primary, olap, f, t).
+DETAIL: Failing row contains (17, 14, localhost, 57637, default, f, t, primary, olap, f, t).
-- check that you /can/ add a secondary node to a non-default cluster SELECT groupid AS worker_2_group FROM pg_dist_node WHERE nodeport = :worker_2_port \gset SELECT master_add_node('localhost', 8888, groupid => :worker_1_group, noderole => 'secondary', nodecluster=> 'olap'); master_add_node --------------------------------------------------------------------- - 25 + 26 (1 row) -- check that super-long cluster names are truncated @@ -781,13 +794,13 @@ SELECT master_add_node('localhost', 8887, groupid => :worker_1_group, noderole = ); master_add_node --------------------------------------------------------------------- - 26 + 27 (1 row) SELECT * FROM pg_dist_node WHERE nodeport=8887; nodeid | groupid | nodename | nodeport | noderack | hasmetadata | isactive | noderole | nodecluster | metadatasynced | shouldhaveshards --------------------------------------------------------------------- - 26 | 14 | localhost | 8887 | default | f | t | secondary | thisisasixtyfourcharacterstringrepeatedfourtimestomake256chars. | f | t + 27 | 14 | localhost | 8887 | default | f | t | secondary | thisisasixtyfourcharacterstringrepeatedfourtimestomake256chars. | f | t (1 row) -- don't remove the secondary and unavailable nodes, check that no commands are sent to @@ -796,13 +809,13 @@ SELECT * FROM pg_dist_node WHERE nodeport=8887; SELECT master_add_secondary_node('localhost', 9995, 'localhost', :worker_1_port); master_add_secondary_node --------------------------------------------------------------------- - 27 + 28 (1 row) SELECT master_add_secondary_node('localhost', 9994, primaryname => 'localhost', primaryport => :worker_2_port); master_add_secondary_node --------------------------------------------------------------------- - 28 + 29 (1 row) SELECT master_add_secondary_node('localhost', 9993, 'localhost', 2000); @@ -810,7 +823,7 @@ ERROR: node at "localhost:xxxxx" does not exist SELECT master_add_secondary_node('localhost', 9992, 'localhost', :worker_1_port, nodecluster => 'second-cluster'); master_add_secondary_node --------------------------------------------------------------------- - 29 + 30 (1 row) SELECT nodeid AS worker_1_node FROM pg_dist_node WHERE nodeport=:worker_1_port \gset @@ -830,7 +843,7 @@ SELECT master_update_node(:worker_1_node, 'somehost', 9000); SELECT * FROM pg_dist_node WHERE nodeid = :worker_1_node; nodeid | groupid | nodename | nodeport | noderack | hasmetadata | isactive | noderole | nodecluster | metadatasynced | shouldhaveshards --------------------------------------------------------------------- - 16 | 14 | somehost | 9000 | default | f | t | primary | default | f | t + 17 | 14 | somehost | 9000 | default | f | t | primary | default | f | t (1 row) -- cleanup @@ -843,7 +856,7 @@ SELECT master_update_node(:worker_1_node, 'localhost', :worker_1_port); SELECT * FROM pg_dist_node WHERE nodeid = :worker_1_node; nodeid | groupid | nodename | nodeport | noderack | hasmetadata | isactive | noderole | nodecluster | metadatasynced | shouldhaveshards --------------------------------------------------------------------- - 16 | 14 | localhost | 57637 | default | f | t | primary | default | f | t + 17 | 14 | localhost | 57637 | default | f | t | primary | default | f | t (1 row) SET citus.shard_replication_factor TO 1; diff --git a/src/test/regress/sql/multi_cluster_management.sql b/src/test/regress/sql/multi_cluster_management.sql index 85a0578b3..baa6be2c1 100644 --- a/src/test/regress/sql/multi_cluster_management.sql +++ b/src/test/regress/sql/multi_cluster_management.sql @@ -130,15 +130,20 
@@ -130,15 +130,20 @@
 UPDATE pg_dist_placement SET shardstate=4 WHERE groupid=:worker_2_group;
 SELECT shardid, shardstate, nodename, nodeport FROM pg_dist_shard_placement WHERE nodeport=:worker_2_port;
 CREATE TABLE cluster_management_test_colocated (col_1 text, col_2 int);
 -- Check that we warn the user about colocated shards that will not get created for shards that do not have active placements
-SELECT create_distributed_table('cluster_management_test_colocated', 'col_1', 'hash', colocate_with=>'cluster_management_test');
+SELECT create_distributed_table('cluster_management_test_colocated', 'col_1', 'hash', colocate_with => 'cluster_management_test');
 -- Check that colocated shards don't get created for shards that are to be deleted
 SELECT logicalrelid, shardid, shardstate, nodename, nodeport FROM pg_dist_shard_placement NATURAL JOIN pg_dist_shard ORDER BY shardstate, shardid;
 
--- try to remove a node with only to be deleted placements and see that removal still fails
+SELECT * INTO removed_placements FROM pg_dist_placement WHERE shardstate = 4;
+-- try to remove a node with only to be deleted placements and see that removal succeeds
 SELECT master_remove_node('localhost', :worker_2_port);
 SELECT master_get_active_worker_nodes();
 
+SELECT master_add_node('localhost', :worker_2_port, groupId := :worker_2_group);
+-- put removed placements back for testing purposes(in practice we wouldn't have only old placements for a shard)
+INSERT INTO pg_dist_placement SELECT * FROM removed_placements;
+
 -- clean-up
 SELECT 1 FROM master_add_node('localhost', :worker_2_port);
 UPDATE pg_dist_placement SET shardstate=1 WHERE groupid=:worker_2_group;
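
Note (not part of the patch): a minimal SQL sketch of the behaviour the updated test exercises, assuming a coordinator where worker_2 listens on port 57638 as in the regression schedule. Once every placement in the node's group is marked with shardstate 4 (SHARD_STATE_TO_DELETE), master_remove_node() is expected to succeed, and RemoveOldShardPlacementForNodeGroup() drops the leftover rows from pg_dist_placement.

-- mark all placements on worker_2's group as to-be-deleted (shardstate 4)
UPDATE pg_dist_placement
  SET shardstate = 4
  WHERE groupid = (SELECT groupid FROM pg_dist_node WHERE nodeport = 57638);
-- with this patch the removal succeeds instead of erroring out,
-- and the old placement rows are cleaned up from pg_dist_placement
SELECT master_remove_node('localhost', 57638);
SELECT master_get_active_worker_nodes();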