From 280b9ae018ba5a87acaa3392a1c91379b7bc52c1 Mon Sep 17 00:00:00 2001
From: Jelte Fennema
Date: Wed, 2 Jun 2021 16:48:45 +0200
Subject: [PATCH] Cleanup orphaned shards at the start of a rebalance

In case the background daemon hasn't cleaned up shards yet, we do this
manually at the start of a rebalance.
---
 .../distributed/operations/shard_cleaner.c    | 18 +++++++
 .../distributed/operations/shard_rebalancer.c |  3 ++
 src/include/distributed/shard_cleaner.h       |  1 +
 .../regress/expected/shard_rebalancer.out     | 53 ++++++++++++++++++-
 src/test/regress/sql/shard_rebalancer.sql     | 21 +++++++-
 5 files changed, 93 insertions(+), 3 deletions(-)

diff --git a/src/backend/distributed/operations/shard_cleaner.c b/src/backend/distributed/operations/shard_cleaner.c
index 80b243d51..d01fd6eb3 100644
--- a/src/backend/distributed/operations/shard_cleaner.c
+++ b/src/backend/distributed/operations/shard_cleaner.c
@@ -13,10 +13,12 @@
 #include "postgres.h"
 
 #include "access/xact.h"
+#include "postmaster/postmaster.h"
 #include "distributed/coordinator_protocol.h"
 #include "distributed/metadata_cache.h"
 #include "distributed/shard_cleaner.h"
+#include "distributed/remote_commands.h"
 #include "distributed/resource_lock.h"
 #include "distributed/worker_transaction.h"
 
@@ -85,6 +87,22 @@ isolation_cleanup_orphaned_shards(PG_FUNCTION_ARGS)
 }
 
 
+/*
+ * DropMarkedShardsInDifferentTransaction cleans up orphaned shards by
+ * connecting to localhost. This is done so that the locks that
+ * DropMarkedShards takes are only held for a short time.
+ */
+void
+DropMarkedShardsInDifferentTransaction(void)
+{
+	int connectionFlag = FORCE_NEW_CONNECTION;
+	MultiConnection *connection = GetNodeConnection(connectionFlag, LocalHostName,
+													PostPortNumber);
+	ExecuteCriticalRemoteCommand(connection, "CALL citus_cleanup_orphaned_shards();");
+	CloseConnection(connection);
+}
+
+
 /*
  * TryDropMarkedShards is a wrapper around DropMarkedShards that catches
  * any errors to make it safe to use in the maintenance daemon.
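For context, the new DropMarkedShardsInDifferentTransaction() boils down to running the existing cleanup procedure over a separate localhost connection, so the locks it takes are released before the actual shard moves start. Below is a minimal SQL sketch of that step, assuming a coordinator where earlier moves left placements marked for deletion (shardstate 4); these queries are illustrative only and not part of the patch:

-- inspect placements that earlier moves left behind as orphaned
-- (assumption: shardstate 4 is the "to delete" state in pg_dist_placement)
SELECT placementid, shardid, shardstate
FROM pg_dist_placement
WHERE shardstate = 4;

-- the rebalancer now effectively runs this first, in its own short
-- transaction, before moving any placements
CALL citus_cleanup_orphaned_shards();
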
diff --git a/src/backend/distributed/operations/shard_rebalancer.c b/src/backend/distributed/operations/shard_rebalancer.c
index 92f37ec05..d9832f306 100644
--- a/src/backend/distributed/operations/shard_rebalancer.c
+++ b/src/backend/distributed/operations/shard_rebalancer.c
@@ -43,6 +43,7 @@
 #include "distributed/repair_shards.h"
 #include "distributed/resource_lock.h"
 #include "distributed/shard_rebalancer.h"
+#include "distributed/shard_cleaner.h"
 #include "distributed/tuplestore.h"
 #include "distributed/worker_protocol.h"
 #include "funcapi.h"
@@ -700,6 +701,8 @@ ExecutePlacementUpdates(List *placementUpdateList, Oid shardReplicationModeOid,
 								"unsupported")));
 	}
 
+	DropMarkedShardsInDifferentTransaction();
+
 	foreach(placementUpdateCell, placementUpdateList)
 	{
 		PlacementUpdateEvent *placementUpdate = lfirst(placementUpdateCell);
diff --git a/src/include/distributed/shard_cleaner.h b/src/include/distributed/shard_cleaner.h
index b48d89522..103097397 100644
--- a/src/include/distributed/shard_cleaner.h
+++ b/src/include/distributed/shard_cleaner.h
@@ -19,5 +19,6 @@ extern bool CheckAvailableSpaceBeforeMove;
 
 extern int TryDropMarkedShards(bool waitForLocks);
 extern int DropMarkedShards(bool waitForLocks);
+extern void DropMarkedShardsInDifferentTransaction(void);
 
 #endif /*CITUS_SHARD_CLEANER_H */
diff --git a/src/test/regress/expected/shard_rebalancer.out b/src/test/regress/expected/shard_rebalancer.out
index 037107848..33852f037 100644
--- a/src/test/regress/expected/shard_rebalancer.out
+++ b/src/test/regress/expected/shard_rebalancer.out
@@ -156,7 +156,6 @@ SELECT pg_sleep(.1); -- wait to make sure the config has changed before running
 SET citus.shard_replication_factor TO 2;
 SELECT replicate_table_shards('dist_table_test_2', max_shard_copies := 4, shard_transfer_mode:='block_writes');
-NOTICE: Copying shard xxxxx from localhost:xxxxx to localhost:xxxxx ...
 ERROR: connection to the remote node foobar:57636 failed with the following error: could not translate host name "foobar" to address:
 ALTER SYSTEM RESET citus.local_hostname;
 SELECT pg_reload_conf();
@@ -1075,7 +1074,57 @@ SELECT * FROM get_rebalance_progress();
 ---------------------------------------------------------------------
 (0 rows)
 
--- Confirm that the nodes are now there
+-- Confirm that the shards are now there
+SELECT * FROM public.table_placements_per_node;
+ nodeport |       logicalrelid        | count
+---------------------------------------------------------------------
+    57637 | colocated_rebalance_test  |     2
+    57638 | colocated_rebalance_test  |     2
+    57637 | colocated_rebalance_test2 |     2
+    57638 | colocated_rebalance_test2 |     2
+(4 rows)
+
+CALL citus_cleanup_orphaned_shards();
+select * from pg_dist_placement;
+ placementid | shardid | shardstate | shardlength | groupid
+---------------------------------------------------------------------
+         135 |  123023 |          1 |           0 |      14
+         138 |  123024 |          1 |           0 |      14
+         141 |  123027 |          1 |           0 |      14
+         142 |  123028 |          1 |           0 |      14
+         143 |  123021 |          1 |           0 |      16
+         144 |  123025 |          1 |           0 |      16
+         145 |  123022 |          1 |           0 |      16
+         146 |  123026 |          1 |           0 |      16
+(8 rows)
+
+-- Move all shards to worker1 again
+SELECT master_move_shard_placement(shardid, 'localhost', :worker_2_port, 'localhost', :worker_1_port, 'block_writes')
+FROM pg_dist_shard NATURAL JOIN pg_dist_placement NATURAL JOIN pg_dist_node
+WHERE nodeport = :worker_2_port AND logicalrelid = 'colocated_rebalance_test'::regclass;
+ master_move_shard_placement
+---------------------------------------------------------------------
+
+
+(2 rows)
+
+-- Confirm that the shards are now all on worker1
+SELECT * FROM public.table_placements_per_node;
+ nodeport |       logicalrelid        | count
+---------------------------------------------------------------------
+    57637 | colocated_rebalance_test  |     4
+    57637 | colocated_rebalance_test2 |     4
+(2 rows)
+
+-- Explicitly don't run citus_cleanup_orphaned_shards, rebalance_table_shards
+-- should do that automatically.
+SELECT * FROM rebalance_table_shards('colocated_rebalance_test', threshold := 0, shard_transfer_mode := 'block_writes');
+ rebalance_table_shards
+---------------------------------------------------------------------
+
+(1 row)
+
+-- Confirm that the shards are now moved
 SELECT * FROM public.table_placements_per_node;
  nodeport | logicalrelid | count
 ---------------------------------------------------------------------
diff --git a/src/test/regress/sql/shard_rebalancer.sql b/src/test/regress/sql/shard_rebalancer.sql
index 636fa2341..0e41bac74 100644
--- a/src/test/regress/sql/shard_rebalancer.sql
+++ b/src/test/regress/sql/shard_rebalancer.sql
@@ -688,7 +688,26 @@ CALL citus_cleanup_orphaned_shards();
 -- Check that we can call this function without a crash
 SELECT * FROM get_rebalance_progress();
 
--- Confirm that the nodes are now there
+-- Confirm that the shards are now there
+SELECT * FROM public.table_placements_per_node;
+
+CALL citus_cleanup_orphaned_shards();
+select * from pg_dist_placement;
+
+
+-- Move all shards to worker1 again
+SELECT master_move_shard_placement(shardid, 'localhost', :worker_2_port, 'localhost', :worker_1_port, 'block_writes')
+FROM pg_dist_shard NATURAL JOIN pg_dist_placement NATURAL JOIN pg_dist_node
+WHERE nodeport = :worker_2_port AND logicalrelid = 'colocated_rebalance_test'::regclass;
+
+-- Confirm that the shards are now all on worker1
+SELECT * FROM public.table_placements_per_node;
+
+-- Explicitly don't run citus_cleanup_orphaned_shards, rebalance_table_shards
+-- should do that automatically.
+SELECT * FROM rebalance_table_shards('colocated_rebalance_test', threshold := 0, shard_transfer_mode := 'block_writes');
+
+-- Confirm that the shards are now moved
 SELECT * FROM public.table_placements_per_node;
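With this change in place, an explicit cleanup call before rebalancing should no longer be needed, since ExecutePlacementUpdates drops any still-marked shards before moving placements. A short usage sketch, with the table name dist_table purely illustrative:

-- before this patch it was safest to run the cleanup by hand first:
-- CALL citus_cleanup_orphaned_shards();
-- now the rebalancer performs that cleanup itself at the start:
SELECT rebalance_table_shards('dist_table', shard_transfer_mode := 'block_writes');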