mirror of https://github.com/citusdata/citus.git
Cleanup orphaned shards at the start of a rebalance
In case the background daemon hasn't cleaned up shards yet, we do this manually at the start of a rebalance.

branch pull/5024/head
parent 7015049ea5
commit 280b9ae018
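In practice this means a rebalance no longer depends on the maintenance daemon having already dropped placements that an earlier shard move left behind. A minimal illustrative session, assuming a hypothetical distributed table some_dist_table (the functions used are the same ones exercised in the regression tests changed below):

-- Move a shard; the old placement is marked as orphaned rather than dropped immediately.
SELECT master_move_shard_placement(shardid, 'localhost', 57638, 'localhost', 57637, 'block_writes')
FROM pg_dist_shard NATURAL JOIN pg_dist_placement NATURAL JOIN pg_dist_node
WHERE nodeport = 57638 AND logicalrelid = 'some_dist_table'::regclass;
-- Previously you had to drop the orphaned placements yourself before rebalancing again:
CALL citus_cleanup_orphaned_shards();
-- With this commit, rebalance_table_shards() issues that CALL itself over a
-- localhost connection before executing any placement updates:
SELECT rebalance_table_shards('some_dist_table', shard_transfer_mode := 'block_writes');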
@@ -13,10 +13,12 @@
 #include "postgres.h"
 
 #include "access/xact.h"
+#include "postmaster/postmaster.h"
 
 #include "distributed/coordinator_protocol.h"
 #include "distributed/metadata_cache.h"
 #include "distributed/shard_cleaner.h"
+#include "distributed/remote_commands.h"
 #include "distributed/resource_lock.h"
 #include "distributed/worker_transaction.h"
 
@@ -85,6 +87,22 @@ isolation_cleanup_orphaned_shards(PG_FUNCTION_ARGS)
 }
 
 
+/*
+ * DropMarkedShardsInDifferentTransaction cleans up orphaned shards by
+ * connecting to localhost. This is done, so that the locks that
+ * DropMarkedShards takes are only held for a short time.
+ */
+void
+DropMarkedShardsInDifferentTransaction(void)
+{
+	int connectionFlag = FORCE_NEW_CONNECTION;
+	MultiConnection *connection = GetNodeConnection(connectionFlag, LocalHostName,
+													PostPortNumber);
+	ExecuteCriticalRemoteCommand(connection, "CALL citus_cleanup_orphaned_shards();");
+	CloseConnection(connection);
+}
+
+
 /*
  * TryDropMarkedShards is a wrapper around DropMarkedShards that catches
  * any errors to make it safe to use in the maintenance daemon.
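The separate localhost connection puts the cleanup in its own transaction, so the locks DropMarkedShards acquires are released as soon as that transaction finishes instead of being held for the whole rebalance. The command the function sends is the same one users can run by hand. A sketch of how to inspect and trigger the cleanup manually, assuming shardstate 4 is the "to delete" state used for orphaned placements in this Citus version:

-- Placements left behind by earlier moves, i.e. cleanup candidates
-- (assumption: shardstate = 4 marks orphaned placements).
SELECT placementid, shardid, shardstate, groupid
FROM pg_dist_placement
WHERE shardstate = 4;
-- Drop them explicitly; this is exactly the command the new C function
-- executes over its localhost connection.
CALL citus_cleanup_orphaned_shards();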
@@ -43,6 +43,7 @@
 #include "distributed/repair_shards.h"
 #include "distributed/resource_lock.h"
 #include "distributed/shard_rebalancer.h"
+#include "distributed/shard_cleaner.h"
 #include "distributed/tuplestore.h"
 #include "distributed/worker_protocol.h"
 #include "funcapi.h"
@@ -700,6 +701,8 @@ ExecutePlacementUpdates(List *placementUpdateList, Oid shardReplicationModeOid,
 						"unsupported")));
 	}
 
+	DropMarkedShardsInDifferentTransaction();
+
 	foreach(placementUpdateCell, placementUpdateList)
 	{
 		PlacementUpdateEvent *placementUpdate = lfirst(placementUpdateCell);
@@ -19,5 +19,6 @@ extern bool CheckAvailableSpaceBeforeMove;
 
 extern int TryDropMarkedShards(bool waitForLocks);
 extern int DropMarkedShards(bool waitForLocks);
+extern void DropMarkedShardsInDifferentTransaction(void);
 
 #endif /*CITUS_SHARD_CLEANER_H */
@@ -156,7 +156,6 @@ SELECT pg_sleep(.1); -- wait to make sure the config has changed before running
 
 SET citus.shard_replication_factor TO 2;
 SELECT replicate_table_shards('dist_table_test_2', max_shard_copies := 4, shard_transfer_mode:='block_writes');
-NOTICE: Copying shard xxxxx from localhost:xxxxx to localhost:xxxxx ...
 ERROR: connection to the remote node foobar:57636 failed with the following error: could not translate host name "foobar" to address: <system specific error>
 ALTER SYSTEM RESET citus.local_hostname;
 SELECT pg_reload_conf();
@@ -1075,7 +1074,57 @@ SELECT * FROM get_rebalance_progress();
 ---------------------------------------------------------------------
 (0 rows)
 
--- Confirm that the nodes are now there
+-- Confirm that the shards are now there
+SELECT * FROM public.table_placements_per_node;
+ nodeport |       logicalrelid        | count
+---------------------------------------------------------------------
+    57637 | colocated_rebalance_test  |     2
+    57638 | colocated_rebalance_test  |     2
+    57637 | colocated_rebalance_test2 |     2
+    57638 | colocated_rebalance_test2 |     2
+(4 rows)
+
+CALL citus_cleanup_orphaned_shards();
+select * from pg_dist_placement;
+ placementid | shardid | shardstate | shardlength | groupid
+---------------------------------------------------------------------
+         135 |  123023 |          1 |           0 |      14
+         138 |  123024 |          1 |           0 |      14
+         141 |  123027 |          1 |           0 |      14
+         142 |  123028 |          1 |           0 |      14
+         143 |  123021 |          1 |           0 |      16
+         144 |  123025 |          1 |           0 |      16
+         145 |  123022 |          1 |           0 |      16
+         146 |  123026 |          1 |           0 |      16
+(8 rows)
+
+-- Move all shards to worker1 again
+SELECT master_move_shard_placement(shardid, 'localhost', :worker_2_port, 'localhost', :worker_1_port, 'block_writes')
+FROM pg_dist_shard NATURAL JOIN pg_dist_placement NATURAL JOIN pg_dist_node
+WHERE nodeport = :worker_2_port AND logicalrelid = 'colocated_rebalance_test'::regclass;
+ master_move_shard_placement
+---------------------------------------------------------------------
+
+
+(2 rows)
+
+-- Confirm that the shards are now all on worker1
+SELECT * FROM public.table_placements_per_node;
+ nodeport |       logicalrelid        | count
+---------------------------------------------------------------------
+    57637 | colocated_rebalance_test  |     4
+    57637 | colocated_rebalance_test2 |     4
+(2 rows)
+
+-- Explicitly don't run citus_cleanup_orphaned_shards, rebalance_table_shards
+-- should do that for automatically.
+SELECT * FROM rebalance_table_shards('colocated_rebalance_test', threshold := 0, shard_transfer_mode := 'block_writes');
+ rebalance_table_shards
+---------------------------------------------------------------------
+
+(1 row)
+
+-- Confirm that the shards are now moved
 SELECT * FROM public.table_placements_per_node;
  nodeport |       logicalrelid       | count
 ---------------------------------------------------------------------
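table_placements_per_node is a helper view defined elsewhere in this regression test suite; the commit only uses it to verify where placements ended up. A rough, hypothetical equivalent that reproduces the nodeport/logicalrelid/count shape shown above (the real definition may differ):

-- Hypothetical sketch of the helper view; not taken from this commit.
CREATE VIEW public.table_placements_per_node AS
SELECT nodeport, logicalrelid, count(*)
FROM pg_dist_shard
JOIN pg_dist_placement USING (shardid)
JOIN pg_dist_node USING (groupid)
WHERE shardstate != 4  -- assumption: exclude placements already marked orphaned
GROUP BY nodeport, logicalrelid
ORDER BY logicalrelid::text, nodeport;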
@@ -688,7 +688,26 @@ CALL citus_cleanup_orphaned_shards();
 -- Check that we can call this function without a crash
 SELECT * FROM get_rebalance_progress();
 
--- Confirm that the nodes are now there
+-- Confirm that the shards are now there
+SELECT * FROM public.table_placements_per_node;
+
+CALL citus_cleanup_orphaned_shards();
+select * from pg_dist_placement;
+
+
+-- Move all shards to worker1 again
+SELECT master_move_shard_placement(shardid, 'localhost', :worker_2_port, 'localhost', :worker_1_port, 'block_writes')
+FROM pg_dist_shard NATURAL JOIN pg_dist_placement NATURAL JOIN pg_dist_node
+WHERE nodeport = :worker_2_port AND logicalrelid = 'colocated_rebalance_test'::regclass;
+
+-- Confirm that the shards are now all on worker1
+SELECT * FROM public.table_placements_per_node;
+
+-- Explicitly don't run citus_cleanup_orphaned_shards, rebalance_table_shards
+-- should do that for automatically.
+SELECT * FROM rebalance_table_shards('colocated_rebalance_test', threshold := 0, shard_transfer_mode := 'block_writes');
+
+-- Confirm that the shards are now moved
 SELECT * FROM public.table_placements_per_node;
 
 