Add citus_cleanup_orphaned_shards UDF

Sometimes the background daemon doesn't clean up orphaned shards quickly
enough. It's useful to have a UDF to trigger this removal when needed.
We already had a UDF like this but it was only used during testing. This
exposes that UDF to users. As a safety measure it cannot be run in a
transaction, because that would cause the background daemon to stop
cleaning up shards while this transaction is running.
pull/5024/head
Jelte Fennema 2021-06-02 12:57:44 +02:00
parent 0f37ab5f85
commit 7015049ea5
17 changed files with 190 additions and 372 deletions

View File

@ -12,6 +12,7 @@
#include "postgres.h"
#include "access/xact.h"
#include "distributed/coordinator_protocol.h"
#include "distributed/metadata_cache.h"
@ -21,33 +22,66 @@
/* declarations for dynamic loading */
PG_FUNCTION_INFO_V1(master_defer_delete_shards);
PG_FUNCTION_INFO_V1(citus_cleanup_orphaned_shards);
PG_FUNCTION_INFO_V1(isolation_cleanup_orphaned_shards);
static bool TryDropShard(GroupShardPlacement *placement);
static bool TryLockRelationAndPlacementCleanup(Oid relationId, LOCKMODE lockmode);
/*
* master_defer_delete_shards implements a user-facing UDF to delete orphaned shards that
* are still hanging around in the system. These shards are orphaned by previous actions
* that were not directly able to delete the placements eg. shard moving or dropping of a
* distributed table while one of the data nodes was not online.
* citus_cleanup_orphaned_shards implements a user-facing UDF to delete
* orphaned shards that are still hanging around in the system. These shards
* are orphaned by previous actions that were not directly able to delete the
* placements, e.g. shard moving or dropping of a distributed table while one
* of the data nodes was not online.
*
* This function iterates through placements where shardstate is SHARD_STATE_TO_DELETE
* (shardstate = 4), drops the corresponding tables from the node and removes the
* placement information from the catalog.
* This function iterates through placements where shardstate is
* SHARD_STATE_TO_DELETE (shardstate = 4), drops the corresponding tables from
* the node and removes the placement information from the catalog.
*
* The function takes no arguments and runs cluster wide
* The function takes no arguments and runs cluster wide. It cannot be run in a
* transaction, because that would mean the locks it takes are held for the
* whole transaction. While those locks are held, it is impossible for the
* background daemon to clean up orphaned shards.
*/
Datum
master_defer_delete_shards(PG_FUNCTION_ARGS)
citus_cleanup_orphaned_shards(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);
EnsureCoordinator();
PreventInTransactionBlock(true, "citus_cleanup_orphaned_shards");
bool waitForLocks = true;
int droppedShardCount = DropMarkedShards(waitForLocks);
if (droppedShardCount > 0)
{
ereport(NOTICE, (errmsg("cleaned up %d orphaned shards", droppedShardCount)));
}
PG_RETURN_VOID();
}
/*
* isolation_cleanup_orphaned_shards implements a test UDF that's the same as
* citus_cleanup_orphaned_shards. The only difference is that this command can
* be run inside a transaction, which the isolation tests rely on.
*/
Datum
isolation_cleanup_orphaned_shards(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);
EnsureCoordinator();
bool waitForLocks = true;
int droppedShardCount = DropMarkedShards(waitForLocks);
if (droppedShardCount > 0)
{
ereport(NOTICE, (errmsg("cleaned up %d orphaned shards", droppedShardCount)));
}
PG_RETURN_INT32(droppedShardCount);
PG_RETURN_VOID();
}

View File

@ -47,3 +47,5 @@ WHERE repmodel = 'c'
DROP TRIGGER pg_dist_rebalance_strategy_enterprise_check_trigger ON pg_catalog.pg_dist_rebalance_strategy;
DROP FUNCTION citus_internal.pg_dist_rebalance_strategy_enterprise_check();
#include "udfs/citus_cleanup_orphaned_shards/10.1-1.sql"

View File

@ -84,3 +84,4 @@ CREATE TRIGGER pg_dist_rebalance_strategy_enterprise_check_trigger
BEFORE INSERT OR UPDATE OR DELETE OR TRUNCATE ON pg_dist_rebalance_strategy
FOR EACH STATEMENT EXECUTE FUNCTION citus_internal.pg_dist_rebalance_strategy_enterprise_check();
DROP PROCEDURE pg_catalog.citus_cleanup_orphaned_shards();

View File

@ -0,0 +1,5 @@
-- Wrapper around the citus_cleanup_orphaned_shards C function shipped in the
-- citus extension library. Declared as a PROCEDURE (invoked via CALL) rather
-- than a FUNCTION; the C implementation refuses to run inside an explicit
-- transaction block (see its PreventInTransactionBlock call), so that the
-- background daemon's shard cleanup is not blocked for a long time.
CREATE OR REPLACE PROCEDURE pg_catalog.citus_cleanup_orphaned_shards()
LANGUAGE C
AS 'citus', $$citus_cleanup_orphaned_shards$$;
COMMENT ON PROCEDURE pg_catalog.citus_cleanup_orphaned_shards()
IS 'cleanup orphaned shards';

View File

@ -0,0 +1,5 @@
-- Wrapper around the citus_cleanup_orphaned_shards C function shipped in the
-- citus extension library. Declared as a PROCEDURE (invoked via CALL) rather
-- than a FUNCTION; the C implementation refuses to run inside an explicit
-- transaction block (see its PreventInTransactionBlock call), so that the
-- background daemon's shard cleanup is not blocked for a long time.
CREATE OR REPLACE PROCEDURE pg_catalog.citus_cleanup_orphaned_shards()
LANGUAGE C
AS 'citus', $$citus_cleanup_orphaned_shards$$;
COMMENT ON PROCEDURE pg_catalog.citus_cleanup_orphaned_shards()
IS 'cleanup orphaned shards';

View File

@ -61,12 +61,8 @@ SELECT count(*) FROM referencing_table2;
101
(1 row)
SELECT 1 FROM public.master_defer_delete_shards();
?column?
---------------------------------------------------------------------
1
(1 row)
CALL citus_cleanup_orphaned_shards();
NOTICE: cleaned up 2 orphaned shards
SELECT * FROM table_fkeys_in_workers WHERE relid LIKE 'fkey_to_reference_shard_rebalance.%' AND refd_relid LIKE 'fkey_to_reference_shard_rebalance.%' ORDER BY 1,2,3;
name | relid | refd_relid
---------------------------------------------------------------------
@ -108,12 +104,8 @@ SELECT count(*) FROM referencing_table2;
101
(1 row)
SELECT 1 FROM public.master_defer_delete_shards();
?column?
---------------------------------------------------------------------
1
(1 row)
CALL citus_cleanup_orphaned_shards();
NOTICE: cleaned up 2 orphaned shards
SELECT * FROM table_fkeys_in_workers WHERE relid LIKE 'fkey_to_reference_shard_rebalance.%' AND refd_relid LIKE 'fkey_to_reference_shard_rebalance.%' ORDER BY 1,2,3;
name | relid | refd_relid
---------------------------------------------------------------------

View File

@ -10,23 +10,19 @@ step s1-move-placement:
master_move_shard_placement
s1: NOTICE: cleaned up 1 orphaned shards
step s1-drop-marked-shards:
SELECT public.master_defer_delete_shards();
SET client_min_messages to NOTICE;
CALL isolation_cleanup_orphaned_shards();
master_defer_delete_shards
1
step s2-drop-marked-shards:
SET client_min_messages to DEBUG1;
SELECT public.master_defer_delete_shards();
CALL isolation_cleanup_orphaned_shards();
<waiting ...>
step s1-commit:
COMMIT;
step s2-drop-marked-shards: <... completed>
master_defer_delete_shards
0
starting permutation: s1-begin s1-move-placement s2-drop-marked-shards s1-drop-marked-shards s1-commit
step s1-begin:
@ -40,17 +36,13 @@ master_move_shard_placement
step s2-drop-marked-shards:
SET client_min_messages to DEBUG1;
SELECT public.master_defer_delete_shards();
CALL isolation_cleanup_orphaned_shards();
master_defer_delete_shards
0
s1: NOTICE: cleaned up 1 orphaned shards
step s1-drop-marked-shards:
SELECT public.master_defer_delete_shards();
SET client_min_messages to NOTICE;
CALL isolation_cleanup_orphaned_shards();
master_defer_delete_shards
1
step s1-commit:
COMMIT;
@ -82,14 +74,12 @@ run_commands_on_session_level_connection_to_node
step s1-drop-marked-shards:
SELECT public.master_defer_delete_shards();
SET client_min_messages to NOTICE;
CALL isolation_cleanup_orphaned_shards();
<waiting ...>
s1: WARNING: canceling statement due to lock timeout
s1: WARNING: Failed to drop 1 old shards out of 1
step s1-drop-marked-shards: <... completed>
master_defer_delete_shards
0
s1: WARNING: Failed to drop 1 old shards out of 1
step s1-commit:
COMMIT;

View File

@ -573,6 +573,7 @@ SELECT * FROM print_extension_changes();
function get_rebalance_progress() TABLE(sessionid integer, table_name regclass, shardid bigint, shard_size bigint, sourcename text, sourceport integer, targetname text, targetport integer, progress bigint) |
function get_rebalance_table_shards_plan(regclass,real,integer,bigint[],boolean,name) TABLE(table_name regclass, shardid bigint, shard_size bigint, sourcename text, sourceport integer, targetname text, targetport integer) |
| function citus_add_rebalance_strategy(name,regproc,regproc,regproc,real,real,real) void
| function citus_cleanup_orphaned_shards()
| function citus_local_disk_space_stats() record
| function create_distributed_table(regclass,text,citus.distribution_type,text,integer) void
| function get_rebalance_progress() TABLE(sessionid integer, table_name regclass, shardid bigint, shard_size bigint, sourcename text, sourceport integer, targetname text, targetport integer, progress bigint, source_shard_size bigint, target_shard_size bigint)
@ -580,7 +581,7 @@ SELECT * FROM print_extension_changes();
| function worker_partitioned_relation_size(regclass) bigint
| function worker_partitioned_relation_total_size(regclass) bigint
| function worker_partitioned_table_size(regclass) bigint
(14 rows)
(15 rows)
DROP TABLE prev_objects, extension_diff;
-- show running version

View File

@ -1,9 +1,3 @@
CREATE OR REPLACE FUNCTION master_defer_delete_shards()
RETURNS int
LANGUAGE C STRICT
AS 'citus', $$master_defer_delete_shards$$;
COMMENT ON FUNCTION master_defer_delete_shards()
IS 'remove orphaned shards';
CREATE OR REPLACE FUNCTION wait_until_metadata_sync(timeout INTEGER DEFAULT 15000)
RETURNS void
LANGUAGE C STRICT

View File

@ -50,13 +50,14 @@ $cmd$);
(localhost,57638,t,1)
(2 rows)
-- Make sure this cannot be run in a transaction
BEGIN;
CALL citus_cleanup_orphaned_shards();
ERROR: citus_cleanup_orphaned_shards cannot run inside a transaction block
COMMIT;
-- execute delayed removal
SELECT public.master_defer_delete_shards();
master_defer_delete_shards
---------------------------------------------------------------------
1
(1 row)
CALL citus_cleanup_orphaned_shards();
NOTICE: cleaned up 1 orphaned shards
-- we expect the shard to be on only the second worker
SELECT run_command_on_workers($cmd$
SELECT count(*) FROM pg_class WHERE relname = 't1_20000000';

View File

@ -31,24 +31,14 @@ SELECT rebalance_table_shards('dist_table_test');
(1 row)
SELECT public.master_defer_delete_shards();
master_defer_delete_shards
---------------------------------------------------------------------
0
(1 row)
CALL citus_cleanup_orphaned_shards();
SELECT rebalance_table_shards();
rebalance_table_shards
---------------------------------------------------------------------
(1 row)
SELECT public.master_defer_delete_shards();
master_defer_delete_shards
---------------------------------------------------------------------
0
(1 row)
CALL citus_cleanup_orphaned_shards();
-- test that calling rebalance_table_shards without specifying relation
-- wouldn't move shard of the citus local table.
CREATE TABLE citus_local_table(a int, b int);
@ -65,12 +55,7 @@ SELECT rebalance_table_shards();
(1 row)
SELECT public.master_defer_delete_shards();
master_defer_delete_shards
---------------------------------------------------------------------
0
(1 row)
CALL citus_cleanup_orphaned_shards();
-- show that citus local table shard is still on the coordinator
SELECT tablename FROM pg_catalog.pg_tables where tablename like 'citus_local_table_%';
tablename
@ -101,12 +86,7 @@ SELECT pg_sleep(.1); -- wait to make sure the config has changed before running
SELECT master_drain_node('localhost', :master_port);
ERROR: connection to the remote node foobar:57636 failed with the following error: could not translate host name "foobar" to address: <system specific error>
SELECT public.master_defer_delete_shards();
master_defer_delete_shards
---------------------------------------------------------------------
0
(1 row)
CALL citus_cleanup_orphaned_shards();
ALTER SYSTEM RESET citus.local_hostname;
SELECT pg_reload_conf();
pg_reload_conf
@ -126,12 +106,7 @@ SELECT master_drain_node('localhost', :master_port);
(1 row)
SELECT public.master_defer_delete_shards();
master_defer_delete_shards
---------------------------------------------------------------------
0
(1 row)
CALL citus_cleanup_orphaned_shards();
-- show that citus local table shard is still on the coordinator
SELECT tablename FROM pg_catalog.pg_tables where tablename like 'citus_local_table_%';
tablename
@ -579,7 +554,7 @@ AS $$
pg_dist_shard_placement src USING (shardid),
(SELECT nodename, nodeport FROM pg_dist_shard_placement ORDER BY nodeport DESC LIMIT 1) dst
WHERE src.nodeport < dst.nodeport AND s.logicalrelid = rel::regclass;
SELECT public.master_defer_delete_shards();
CALL citus_cleanup_orphaned_shards();
$$;
CALL create_unbalanced_shards('rebalance_test_table');
SET citus.shard_replication_factor TO 2;
@ -624,12 +599,7 @@ FROM (
WHERE logicalrelid = 'rebalance_test_table'::regclass
) T;
ERROR: connection to the remote node foobar:57636 failed with the following error: could not translate host name "foobar" to address: <system specific error>
SELECT public.master_defer_delete_shards();
master_defer_delete_shards
---------------------------------------------------------------------
0
(1 row)
CALL citus_cleanup_orphaned_shards();
ALTER SYSTEM RESET citus.local_hostname;
SELECT pg_reload_conf();
pg_reload_conf
@ -658,12 +628,7 @@ FROM (
(1 row)
SELECT public.master_defer_delete_shards();
master_defer_delete_shards
---------------------------------------------------------------------
1
(1 row)
CALL citus_cleanup_orphaned_shards();
SELECT * FROM table_placements_per_node;
nodeport | logicalrelid | count
---------------------------------------------------------------------
@ -715,12 +680,7 @@ SELECT * FROM table_placements_per_node;
57638 | rebalance_test_table | 5
(2 rows)
SELECT public.master_defer_delete_shards();
master_defer_delete_shards
---------------------------------------------------------------------
0
(1 row)
CALL citus_cleanup_orphaned_shards();
SELECT rebalance_table_shards('rebalance_test_table',
threshold := 0, max_shard_moves := 1,
shard_transfer_mode:='block_writes');
@ -729,12 +689,7 @@ SELECT rebalance_table_shards('rebalance_test_table',
(1 row)
SELECT public.master_defer_delete_shards();
master_defer_delete_shards
---------------------------------------------------------------------
1
(1 row)
CALL citus_cleanup_orphaned_shards();
SELECT * FROM table_placements_per_node;
nodeport | logicalrelid | count
---------------------------------------------------------------------
@ -749,12 +704,7 @@ SELECT rebalance_table_shards('rebalance_test_table', threshold := 1, shard_tran
(1 row)
SELECT public.master_defer_delete_shards();
master_defer_delete_shards
---------------------------------------------------------------------
0
(1 row)
CALL citus_cleanup_orphaned_shards();
SELECT * FROM table_placements_per_node;
nodeport | logicalrelid | count
---------------------------------------------------------------------
@ -769,12 +719,7 @@ SELECT rebalance_table_shards('rebalance_test_table', threshold := 0);
(1 row)
SELECT public.master_defer_delete_shards();
master_defer_delete_shards
---------------------------------------------------------------------
1
(1 row)
CALL citus_cleanup_orphaned_shards();
SELECT * FROM table_placements_per_node;
nodeport | logicalrelid | count
---------------------------------------------------------------------
@ -790,12 +735,7 @@ SELECT rebalance_table_shards('rebalance_test_table', threshold := 0, shard_tran
(1 row)
SELECT public.master_defer_delete_shards();
master_defer_delete_shards
---------------------------------------------------------------------
0
(1 row)
CALL citus_cleanup_orphaned_shards();
SELECT * FROM table_placements_per_node;
nodeport | logicalrelid | count
---------------------------------------------------------------------
@ -969,12 +909,7 @@ SELECT COUNT(*) FROM imbalanced_table;
-- Try force_logical
SELECT rebalance_table_shards('imbalanced_table', threshold:=0, shard_transfer_mode:='force_logical');
ERROR: the force_logical transfer mode is currently unsupported
SELECT public.master_defer_delete_shards();
master_defer_delete_shards
---------------------------------------------------------------------
0
(1 row)
CALL citus_cleanup_orphaned_shards();
-- Test rebalance operation
SELECT rebalance_table_shards('imbalanced_table', threshold:=0, shard_transfer_mode:='block_writes');
rebalance_table_shards
@ -982,12 +917,7 @@ SELECT rebalance_table_shards('imbalanced_table', threshold:=0, shard_transfer_m
(1 row)
SELECT public.master_defer_delete_shards();
master_defer_delete_shards
---------------------------------------------------------------------
1
(1 row)
CALL citus_cleanup_orphaned_shards();
-- Confirm rebalance
-- Shard counts in each node after rebalance
SELECT * FROM public.table_placements_per_node;
@ -1024,12 +954,7 @@ FROM pg_dist_shard_placement
WHERE nodeport = :worker_2_port;
ERROR: Moving shards to a non-existing node is not supported
HINT: Add the target node via SELECT citus_add_node('localhost', 10000);
SELECT public.master_defer_delete_shards();
master_defer_delete_shards
---------------------------------------------------------------------
0
(1 row)
CALL citus_cleanup_orphaned_shards();
-- Try to move shards to a node where shards are not allowed
SELECT * from master_set_node_property('localhost', :worker_1_port, 'shouldhaveshards', false);
master_set_node_property
@ -1073,12 +998,7 @@ WHERE nodeport = :worker_2_port;
(2 rows)
SELECT public.master_defer_delete_shards();
master_defer_delete_shards
---------------------------------------------------------------------
2
(1 row)
CALL citus_cleanup_orphaned_shards();
SELECT create_distributed_table('colocated_rebalance_test2', 'id');
create_distributed_table
---------------------------------------------------------------------
@ -1106,12 +1026,7 @@ SELECT * FROM rebalance_table_shards('colocated_rebalance_test', threshold := 0,
(1 row)
SELECT public.master_defer_delete_shards();
master_defer_delete_shards
---------------------------------------------------------------------
0
(1 row)
CALL citus_cleanup_orphaned_shards();
-- Confirm that nothing changed
SELECT * FROM public.table_placements_per_node;
nodeport | logicalrelid | count
@ -1153,12 +1068,7 @@ SELECT * FROM rebalance_table_shards('colocated_rebalance_test', threshold := 0,
(1 row)
SELECT public.master_defer_delete_shards();
master_defer_delete_shards
---------------------------------------------------------------------
4
(1 row)
CALL citus_cleanup_orphaned_shards();
-- Check that we can call this function without a crash
SELECT * FROM get_rebalance_progress();
sessionid | table_name | shardid | shard_size | sourcename | sourceport | targetname | targetport | progress | source_shard_size | target_shard_size
@ -1216,12 +1126,7 @@ SELECT * FROM rebalance_table_shards('colocated_rebalance_test', threshold := 0,
(1 row)
SELECT public.master_defer_delete_shards();
master_defer_delete_shards
---------------------------------------------------------------------
4
(1 row)
CALL citus_cleanup_orphaned_shards();
SELECT * FROM public.table_placements_per_node;
nodeport | logicalrelid | count
---------------------------------------------------------------------
@ -1244,12 +1149,7 @@ SELECT * FROM rebalance_table_shards('non_colocated_rebalance_test', threshold :
(1 row)
SELECT public.master_defer_delete_shards();
master_defer_delete_shards
---------------------------------------------------------------------
2
(1 row)
CALL citus_cleanup_orphaned_shards();
SELECT * FROM public.table_placements_per_node;
nodeport | logicalrelid | count
---------------------------------------------------------------------
@ -1271,12 +1171,7 @@ SELECT * FROM rebalance_table_shards('colocated_rebalance_test', threshold := 0,
(1 row)
SELECT public.master_defer_delete_shards();
master_defer_delete_shards
---------------------------------------------------------------------
4
(1 row)
CALL citus_cleanup_orphaned_shards();
SELECT * FROM public.table_placements_per_node;
nodeport | logicalrelid | count
---------------------------------------------------------------------
@ -1293,12 +1188,7 @@ SELECT * FROM rebalance_table_shards('non_colocated_rebalance_test', threshold :
(1 row)
SELECT public.master_defer_delete_shards();
master_defer_delete_shards
---------------------------------------------------------------------
2
(1 row)
CALL citus_cleanup_orphaned_shards();
SELECT * FROM public.table_placements_per_node;
nodeport | logicalrelid | count
---------------------------------------------------------------------
@ -1335,12 +1225,7 @@ SELECT * FROM rebalance_table_shards(threshold := 0, shard_transfer_mode := 'blo
(1 row)
SELECT public.master_defer_delete_shards();
master_defer_delete_shards
---------------------------------------------------------------------
6
(1 row)
CALL citus_cleanup_orphaned_shards();
SELECT * FROM public.table_placements_per_node;
nodeport | logicalrelid | count
---------------------------------------------------------------------
@ -1362,12 +1247,7 @@ SELECT * FROM rebalance_table_shards(threshold := 0, shard_transfer_mode := 'blo
(1 row)
SELECT public.master_defer_delete_shards();
master_defer_delete_shards
---------------------------------------------------------------------
6
(1 row)
CALL citus_cleanup_orphaned_shards();
SELECT * FROM public.table_placements_per_node;
nodeport | logicalrelid | count
---------------------------------------------------------------------
@ -1404,12 +1284,7 @@ SELECT * FROM rebalance_table_shards(threshold := 0, shard_transfer_mode := 'blo
(1 row)
SELECT public.master_defer_delete_shards();
master_defer_delete_shards
---------------------------------------------------------------------
6
(1 row)
CALL citus_cleanup_orphaned_shards();
SELECT * FROM public.table_placements_per_node;
nodeport | logicalrelid | count
---------------------------------------------------------------------
@ -1431,12 +1306,7 @@ SELECT * FROM rebalance_table_shards(threshold := 0, shard_transfer_mode := 'blo
(1 row)
SELECT public.master_defer_delete_shards();
master_defer_delete_shards
---------------------------------------------------------------------
6
(1 row)
CALL citus_cleanup_orphaned_shards();
SELECT * FROM public.table_placements_per_node;
nodeport | logicalrelid | count
---------------------------------------------------------------------
@ -1462,12 +1332,7 @@ SELECT * from master_drain_node('localhost', :worker_2_port, shard_transfer_mode
(1 row)
SELECT public.master_defer_delete_shards();
master_defer_delete_shards
---------------------------------------------------------------------
6
(1 row)
CALL citus_cleanup_orphaned_shards();
select shouldhaveshards from pg_dist_node where nodeport = :worker_2_port;
shouldhaveshards
---------------------------------------------------------------------
@ -1495,12 +1360,7 @@ SELECT * FROM rebalance_table_shards(threshold := 0, shard_transfer_mode := 'blo
(1 row)
SELECT public.master_defer_delete_shards();
master_defer_delete_shards
---------------------------------------------------------------------
6
(1 row)
CALL citus_cleanup_orphaned_shards();
SELECT * FROM public.table_placements_per_node;
nodeport | logicalrelid | count
---------------------------------------------------------------------
@ -1607,12 +1467,7 @@ SELECT * FROM rebalance_table_shards('tab', shard_transfer_mode:='block_writes')
(1 row)
SELECT public.master_defer_delete_shards();
master_defer_delete_shards
---------------------------------------------------------------------
0
(1 row)
CALL citus_cleanup_orphaned_shards();
SELECT * FROM public.table_placements_per_node;
nodeport | logicalrelid | count
---------------------------------------------------------------------
@ -1627,12 +1482,8 @@ NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ...
(1 row)
SELECT public.master_defer_delete_shards();
master_defer_delete_shards
---------------------------------------------------------------------
1
(1 row)
CALL citus_cleanup_orphaned_shards();
NOTICE: cleaned up 1 orphaned shards
SELECT * FROM public.table_placements_per_node;
nodeport | logicalrelid | count
---------------------------------------------------------------------
@ -1648,12 +1499,7 @@ DETAIL: Using threshold of 0.01
(1 row)
SELECT public.master_defer_delete_shards();
master_defer_delete_shards
---------------------------------------------------------------------
0
(1 row)
CALL citus_cleanup_orphaned_shards();
SELECT * FROM public.table_placements_per_node;
nodeport | logicalrelid | count
---------------------------------------------------------------------
@ -1759,12 +1605,8 @@ NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ...
(1 row)
SELECT public.master_defer_delete_shards();
master_defer_delete_shards
---------------------------------------------------------------------
4
(1 row)
CALL citus_cleanup_orphaned_shards();
NOTICE: cleaned up 4 orphaned shards
SELECT * FROM public.table_placements_per_node;
nodeport | logicalrelid | count
---------------------------------------------------------------------
@ -1859,12 +1701,8 @@ NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ...
(1 row)
SELECT public.master_defer_delete_shards();
master_defer_delete_shards
---------------------------------------------------------------------
3
(1 row)
CALL citus_cleanup_orphaned_shards();
NOTICE: cleaned up 3 orphaned shards
SELECT * FROM public.table_placements_per_node;
nodeport | logicalrelid | count
---------------------------------------------------------------------
@ -1888,12 +1726,7 @@ SELECT * FROM rebalance_table_shards('tab', shard_transfer_mode:='block_writes')
(1 row)
SELECT public.master_defer_delete_shards();
master_defer_delete_shards
---------------------------------------------------------------------
0
(1 row)
CALL citus_cleanup_orphaned_shards();
SELECT * FROM public.table_placements_per_node;
nodeport | logicalrelid | count
---------------------------------------------------------------------
@ -1943,12 +1776,8 @@ NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ...
(1 row)
SELECT public.master_defer_delete_shards();
master_defer_delete_shards
---------------------------------------------------------------------
4
(1 row)
CALL citus_cleanup_orphaned_shards();
NOTICE: cleaned up 4 orphaned shards
SELECT * FROM public.table_placements_per_node;
nodeport | logicalrelid | count
---------------------------------------------------------------------
@ -1973,20 +1802,10 @@ SELECT * FROM get_rebalance_table_shards_plan('tab', rebalance_strategy := 'non_
ERROR: could not find rebalance strategy with name non_existing
SELECT * FROM rebalance_table_shards('tab', rebalance_strategy := 'non_existing');
ERROR: could not find rebalance strategy with name non_existing
SELECT public.master_defer_delete_shards();
master_defer_delete_shards
---------------------------------------------------------------------
0
(1 row)
CALL citus_cleanup_orphaned_shards();
SELECT * FROM master_drain_node('localhost', :worker_2_port, rebalance_strategy := 'non_existing');
ERROR: could not find rebalance strategy with name non_existing
SELECT public.master_defer_delete_shards();
master_defer_delete_shards
---------------------------------------------------------------------
0
(1 row)
CALL citus_cleanup_orphaned_shards();
SELECT citus_set_default_rebalance_strategy('non_existing');
ERROR: strategy with specified name does not exist
UPDATE pg_dist_rebalance_strategy SET default_strategy=false;
@ -1994,20 +1813,10 @@ SELECT * FROM get_rebalance_table_shards_plan('tab');
ERROR: no rebalance_strategy was provided, but there is also no default strategy set
SELECT * FROM rebalance_table_shards('tab');
ERROR: no rebalance_strategy was provided, but there is also no default strategy set
SELECT public.master_defer_delete_shards();
master_defer_delete_shards
---------------------------------------------------------------------
0
(1 row)
CALL citus_cleanup_orphaned_shards();
SELECT * FROM master_drain_node('localhost', :worker_2_port);
ERROR: no rebalance_strategy was provided, but there is also no default strategy set
SELECT public.master_defer_delete_shards();
master_defer_delete_shards
---------------------------------------------------------------------
0
(1 row)
CALL citus_cleanup_orphaned_shards();
UPDATE pg_dist_rebalance_strategy SET default_strategy=true WHERE name='by_shard_count';
CREATE OR REPLACE FUNCTION shard_cost_no_arguments()
RETURNS real AS $$ SELECT 1.0::real $$ LANGUAGE sql;
@ -2283,12 +2092,7 @@ SELECT rebalance_table_shards('rebalance_test_table', shard_transfer_mode:='bloc
(1 row)
SELECT public.master_defer_delete_shards();
master_defer_delete_shards
---------------------------------------------------------------------
3
(1 row)
CALL citus_cleanup_orphaned_shards();
SELECT count(*) FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement WHERE logicalrelid = 'ref_table'::regclass;
count
---------------------------------------------------------------------
@ -2350,12 +2154,7 @@ SELECT rebalance_table_shards();
(1 row)
SELECT public.master_defer_delete_shards();
master_defer_delete_shards
---------------------------------------------------------------------
2
(1 row)
CALL citus_cleanup_orphaned_shards();
DROP TABLE t1, r1, r2;
-- verify there are no distributed tables before we perform the following tests. Preceding
-- test suites should clean up their distributed tables.
@ -2404,12 +2203,7 @@ SELECT rebalance_table_shards();
(1 row)
SELECT public.master_defer_delete_shards();
master_defer_delete_shards
---------------------------------------------------------------------
0
(1 row)
CALL citus_cleanup_orphaned_shards();
-- verify the reference table is on all nodes after the rebalance
SELECT count(*)
FROM pg_dist_shard

View File

@ -38,6 +38,7 @@ ORDER BY 1;
function citus_add_rebalance_strategy(name,regproc,regproc,regproc,real,real,real)
function citus_add_secondary_node(text,integer,text,integer,name)
function citus_blocking_pids(integer)
function citus_cleanup_orphaned_shards()
function citus_conninfo_cache_invalidate()
function citus_copy_shard_placement(bigint,text,integer,text,integer,boolean,citus.shard_transfer_mode)
function citus_create_restore_point(text)
@ -245,5 +246,5 @@ ORDER BY 1;
view citus_worker_stat_activity
view pg_dist_shard_placement
view time_partitions
(229 rows)
(230 rows)

View File

@ -26,12 +26,11 @@ setup
SELECT citus_internal.replace_isolation_tester_func();
SELECT citus_internal.refresh_isolation_tester_prepared_statement();
CREATE OR REPLACE FUNCTION master_defer_delete_shards()
RETURNS int
LANGUAGE C STRICT
AS 'citus', $$master_defer_delete_shards$$;
COMMENT ON FUNCTION master_defer_delete_shards()
IS 'remove orphaned shards';
-- Test-only wrapper around the isolation_cleanup_orphaned_shards C function.
-- Same behavior as citus_cleanup_orphaned_shards, except it is allowed to run
-- inside a transaction block, which these isolation tests rely on.
CREATE OR REPLACE PROCEDURE isolation_cleanup_orphaned_shards()
LANGUAGE C
AS 'citus', $$isolation_cleanup_orphaned_shards$$;
COMMENT ON PROCEDURE isolation_cleanup_orphaned_shards()
IS 'cleanup orphaned shards';
SET citus.next_shard_id to 120000;
SET citus.shard_count TO 8;
@ -71,7 +70,8 @@ step "s1-move-placement-without-deferred" {
step "s1-drop-marked-shards"
{
SELECT public.master_defer_delete_shards();
SET client_min_messages to NOTICE;
CALL isolation_cleanup_orphaned_shards();
}
step "s1-lock-pg-dist-placement" {
@ -116,7 +116,7 @@ step "s2-select" {
step "s2-drop-marked-shards"
{
SET client_min_messages to DEBUG1;
SELECT public.master_defer_delete_shards();
CALL isolation_cleanup_orphaned_shards();
}
step "s2-commit" {

View File

@ -45,14 +45,14 @@ SELECT master_move_shard_placement(15000009, 'localhost', :worker_1_port, 'local
SELECT count(*) FROM referencing_table2;
SELECT 1 FROM public.master_defer_delete_shards();
CALL citus_cleanup_orphaned_shards();
SELECT * FROM table_fkeys_in_workers WHERE relid LIKE 'fkey_to_reference_shard_rebalance.%' AND refd_relid LIKE 'fkey_to_reference_shard_rebalance.%' ORDER BY 1,2,3;
SELECT master_move_shard_placement(15000009, 'localhost', :worker_2_port, 'localhost', :worker_1_port, 'block_writes');
SELECT count(*) FROM referencing_table2;
SELECT 1 FROM public.master_defer_delete_shards();
CALL citus_cleanup_orphaned_shards();
SELECT * FROM table_fkeys_in_workers WHERE relid LIKE 'fkey_to_reference_shard_rebalance.%' AND refd_relid LIKE 'fkey_to_reference_shard_rebalance.%' ORDER BY 1,2,3;
-- create a function to show the

View File

@ -1,10 +1,3 @@
CREATE OR REPLACE FUNCTION master_defer_delete_shards()
RETURNS int
LANGUAGE C STRICT
AS 'citus', $$master_defer_delete_shards$$;
COMMENT ON FUNCTION master_defer_delete_shards()
IS 'remove orphaned shards';
CREATE OR REPLACE FUNCTION wait_until_metadata_sync(timeout INTEGER DEFAULT 15000)
RETURNS void
LANGUAGE C STRICT

View File

@ -31,8 +31,13 @@ SELECT run_command_on_workers($cmd$
SELECT count(*) FROM pg_class WHERE relname = 't1_20000000';
$cmd$);
-- Make sure this cannot be run in a transaction
BEGIN;
CALL citus_cleanup_orphaned_shards();
COMMIT;
-- execute delayed removal
SELECT public.master_defer_delete_shards();
CALL citus_cleanup_orphaned_shards();
-- we expect the shard to be on only the second worker
SELECT run_command_on_workers($cmd$

View File

@ -13,9 +13,9 @@ SELECT 1 FROM master_add_node('localhost', :master_port, groupId=>0);
-- should just be noops even if we add the coordinator to the pg_dist_node
SELECT rebalance_table_shards('dist_table_test');
SELECT public.master_defer_delete_shards();
CALL citus_cleanup_orphaned_shards();
SELECT rebalance_table_shards();
SELECT public.master_defer_delete_shards();
CALL citus_cleanup_orphaned_shards();
-- test that calling rebalance_table_shards without specifying relation
@ -25,7 +25,7 @@ SELECT citus_add_local_table_to_metadata('citus_local_table');
INSERT INTO citus_local_table VALUES (1, 2);
SELECT rebalance_table_shards();
SELECT public.master_defer_delete_shards();
CALL citus_cleanup_orphaned_shards();
-- show that citus local table shard is still on the coordinator
SELECT tablename FROM pg_catalog.pg_tables where tablename like 'citus_local_table_%';
@ -38,14 +38,14 @@ SELECT pg_reload_conf();
SELECT pg_sleep(.1); -- wait to make sure the config has changed before running the GUC
SELECT master_drain_node('localhost', :master_port);
SELECT public.master_defer_delete_shards();
CALL citus_cleanup_orphaned_shards();
ALTER SYSTEM RESET citus.local_hostname;
SELECT pg_reload_conf();
SELECT pg_sleep(.1); -- wait to make sure the config has changed before running the GUC
SELECT master_drain_node('localhost', :master_port);
SELECT public.master_defer_delete_shards();
CALL citus_cleanup_orphaned_shards();
-- show that citus local table shard is still on the coordinator
SELECT tablename FROM pg_catalog.pg_tables where tablename like 'citus_local_table_%';
@ -396,7 +396,7 @@ AS $$
pg_dist_shard_placement src USING (shardid),
(SELECT nodename, nodeport FROM pg_dist_shard_placement ORDER BY nodeport DESC LIMIT 1) dst
WHERE src.nodeport < dst.nodeport AND s.logicalrelid = rel::regclass;
SELECT public.master_defer_delete_shards();
CALL citus_cleanup_orphaned_shards();
$$;
CALL create_unbalanced_shards('rebalance_test_table');
@ -428,7 +428,7 @@ FROM (
FROM pg_dist_shard
WHERE logicalrelid = 'rebalance_test_table'::regclass
) T;
SELECT public.master_defer_delete_shards();
CALL citus_cleanup_orphaned_shards();
ALTER SYSTEM RESET citus.local_hostname;
SELECT pg_reload_conf();
@ -445,7 +445,7 @@ FROM (
FROM pg_dist_shard
WHERE logicalrelid = 'rebalance_test_table'::regclass
) T;
SELECT public.master_defer_delete_shards();
CALL citus_cleanup_orphaned_shards();
SELECT * FROM table_placements_per_node;
@ -480,26 +480,26 @@ SELECT rebalance_table_shards('rebalance_test_table',
RESET ROLE;
-- Confirm no moves took place at all during these errors
SELECT * FROM table_placements_per_node;
SELECT public.master_defer_delete_shards();
CALL citus_cleanup_orphaned_shards();
SELECT rebalance_table_shards('rebalance_test_table',
threshold := 0, max_shard_moves := 1,
shard_transfer_mode:='block_writes');
SELECT public.master_defer_delete_shards();
CALL citus_cleanup_orphaned_shards();
SELECT * FROM table_placements_per_node;
-- Check that threshold=1 doesn't move any shards
SELECT rebalance_table_shards('rebalance_test_table', threshold := 1, shard_transfer_mode:='block_writes');
SELECT public.master_defer_delete_shards();
CALL citus_cleanup_orphaned_shards();
SELECT * FROM table_placements_per_node;
-- Move the remaining shards using threshold=0
SELECT rebalance_table_shards('rebalance_test_table', threshold := 0);
SELECT public.master_defer_delete_shards();
CALL citus_cleanup_orphaned_shards();
SELECT * FROM table_placements_per_node;
@ -507,7 +507,7 @@ SELECT * FROM table_placements_per_node;
-- any effects.
SELECT rebalance_table_shards('rebalance_test_table', threshold := 0, shard_transfer_mode:='block_writes');
SELECT public.master_defer_delete_shards();
CALL citus_cleanup_orphaned_shards();
SELECT * FROM table_placements_per_node;
@ -602,11 +602,11 @@ SELECT COUNT(*) FROM imbalanced_table;
-- Try force_logical
SELECT rebalance_table_shards('imbalanced_table', threshold:=0, shard_transfer_mode:='force_logical');
SELECT public.master_defer_delete_shards();
CALL citus_cleanup_orphaned_shards();
-- Test rebalance operation
SELECT rebalance_table_shards('imbalanced_table', threshold:=0, shard_transfer_mode:='block_writes');
SELECT public.master_defer_delete_shards();
CALL citus_cleanup_orphaned_shards();
-- Confirm rebalance
-- Shard counts in each node after rebalance
@ -633,7 +633,7 @@ SELECT create_distributed_table('colocated_rebalance_test', 'id');
SELECT master_move_shard_placement(shardid, 'localhost', :worker_2_port, 'localhost', 10000, 'block_writes')
FROM pg_dist_shard_placement
WHERE nodeport = :worker_2_port;
SELECT public.master_defer_delete_shards();
CALL citus_cleanup_orphaned_shards();
-- Try to move shards to a node where shards are not allowed
SELECT * from master_set_node_property('localhost', :worker_1_port, 'shouldhaveshards', false);
@ -660,7 +660,7 @@ UPDATE pg_dist_node SET noderole = 'primary' WHERE nodeport = :worker_1_port;
SELECT master_move_shard_placement(shardid, 'localhost', :worker_2_port, 'localhost', :worker_1_port, 'block_writes')
FROM pg_dist_shard_placement
WHERE nodeport = :worker_2_port;
SELECT public.master_defer_delete_shards();
CALL citus_cleanup_orphaned_shards();
SELECT create_distributed_table('colocated_rebalance_test2', 'id');
@ -671,7 +671,7 @@ SELECT * FROM public.table_placements_per_node;
SELECT * FROM get_rebalance_table_shards_plan('colocated_rebalance_test', threshold := 0, drain_only := true);
-- Running with drain_only shouldn't do anything
SELECT * FROM rebalance_table_shards('colocated_rebalance_test', threshold := 0, shard_transfer_mode := 'block_writes', drain_only := true);
SELECT public.master_defer_delete_shards();
CALL citus_cleanup_orphaned_shards();
-- Confirm that nothing changed
SELECT * FROM public.table_placements_per_node;
@ -684,7 +684,7 @@ SELECT * FROM get_rebalance_table_shards_plan('colocated_rebalance_test', rebala
SELECT * FROM get_rebalance_progress();
-- Actually do the rebalance
SELECT * FROM rebalance_table_shards('colocated_rebalance_test', threshold := 0, shard_transfer_mode := 'block_writes');
SELECT public.master_defer_delete_shards();
CALL citus_cleanup_orphaned_shards();
-- Check that we can call this function without a crash
SELECT * FROM get_rebalance_progress();
@ -702,22 +702,22 @@ SELECT * from master_set_node_property('localhost', :worker_2_port, 'shouldhaves
SELECT * FROM get_rebalance_table_shards_plan('colocated_rebalance_test', threshold := 0);
SELECT * FROM rebalance_table_shards('colocated_rebalance_test', threshold := 0, shard_transfer_mode := 'block_writes');
SELECT public.master_defer_delete_shards();
CALL citus_cleanup_orphaned_shards();
SELECT * FROM public.table_placements_per_node;
SELECT * FROM get_rebalance_table_shards_plan('non_colocated_rebalance_test', threshold := 0);
SELECT * FROM rebalance_table_shards('non_colocated_rebalance_test', threshold := 0, shard_transfer_mode := 'block_writes');
SELECT public.master_defer_delete_shards();
CALL citus_cleanup_orphaned_shards();
SELECT * FROM public.table_placements_per_node;
-- Put shards back
SELECT * from master_set_node_property('localhost', :worker_2_port, 'shouldhaveshards', true);
SELECT * FROM rebalance_table_shards('colocated_rebalance_test', threshold := 0, shard_transfer_mode := 'block_writes');
SELECT public.master_defer_delete_shards();
CALL citus_cleanup_orphaned_shards();
SELECT * FROM public.table_placements_per_node;
SELECT * FROM rebalance_table_shards('non_colocated_rebalance_test', threshold := 0, shard_transfer_mode := 'block_writes');
SELECT public.master_defer_delete_shards();
CALL citus_cleanup_orphaned_shards();
SELECT * FROM public.table_placements_per_node;
-- testing behaviour when setting shouldhaveshards to false and rebalancing all
@ -725,13 +725,13 @@ SELECT * FROM public.table_placements_per_node;
SELECT * from master_set_node_property('localhost', :worker_2_port, 'shouldhaveshards', false);
SELECT * FROM get_rebalance_table_shards_plan(threshold := 0, drain_only := true);
SELECT * FROM rebalance_table_shards(threshold := 0, shard_transfer_mode := 'block_writes', drain_only := true);
SELECT public.master_defer_delete_shards();
CALL citus_cleanup_orphaned_shards();
SELECT * FROM public.table_placements_per_node;
-- Put shards back
SELECT * from master_set_node_property('localhost', :worker_2_port, 'shouldhaveshards', true);
SELECT * FROM rebalance_table_shards(threshold := 0, shard_transfer_mode := 'block_writes');
SELECT public.master_defer_delete_shards();
CALL citus_cleanup_orphaned_shards();
SELECT * FROM public.table_placements_per_node;
-- testing behaviour when setting shouldhaveshards to false and rebalancing all
@ -739,13 +739,13 @@ SELECT * FROM public.table_placements_per_node;
SELECT * from master_set_node_property('localhost', :worker_2_port, 'shouldhaveshards', false);
SELECT * FROM get_rebalance_table_shards_plan(threshold := 0);
SELECT * FROM rebalance_table_shards(threshold := 0, shard_transfer_mode := 'block_writes');
SELECT public.master_defer_delete_shards();
CALL citus_cleanup_orphaned_shards();
SELECT * FROM public.table_placements_per_node;
-- Put shards back
SELECT * from master_set_node_property('localhost', :worker_2_port, 'shouldhaveshards', true);
SELECT * FROM rebalance_table_shards(threshold := 0, shard_transfer_mode := 'block_writes');
SELECT public.master_defer_delete_shards();
CALL citus_cleanup_orphaned_shards();
SELECT * FROM public.table_placements_per_node;
-- Make it a data node again
@ -753,14 +753,14 @@ SELECT * from master_set_node_property('localhost', :worker_2_port, 'shouldhaves
-- testing behaviour of master_drain_node
SELECT * from master_drain_node('localhost', :worker_2_port, shard_transfer_mode := 'block_writes');
SELECT public.master_defer_delete_shards();
CALL citus_cleanup_orphaned_shards();
select shouldhaveshards from pg_dist_node where nodeport = :worker_2_port;
SELECT * FROM public.table_placements_per_node;
-- Put shards back
SELECT * from master_set_node_property('localhost', :worker_2_port, 'shouldhaveshards', true);
SELECT * FROM rebalance_table_shards(threshold := 0, shard_transfer_mode := 'block_writes');
SELECT public.master_defer_delete_shards();
CALL citus_cleanup_orphaned_shards();
SELECT * FROM public.table_placements_per_node;
@ -829,15 +829,15 @@ SELECT * FROM get_rebalance_table_shards_plan('tab', rebalance_strategy := 'by_d
SELECT * FROM get_rebalance_table_shards_plan('tab', rebalance_strategy := 'by_disk_size', threshold := 0);
SELECT * FROM rebalance_table_shards('tab', shard_transfer_mode:='block_writes');
SELECT public.master_defer_delete_shards();
CALL citus_cleanup_orphaned_shards();
SELECT * FROM public.table_placements_per_node;
SELECT * FROM rebalance_table_shards('tab', rebalance_strategy := 'by_disk_size', shard_transfer_mode:='block_writes');
SELECT public.master_defer_delete_shards();
CALL citus_cleanup_orphaned_shards();
SELECT * FROM public.table_placements_per_node;
SELECT * FROM rebalance_table_shards('tab', rebalance_strategy := 'by_disk_size', shard_transfer_mode:='block_writes', threshold := 0);
SELECT public.master_defer_delete_shards();
CALL citus_cleanup_orphaned_shards();
SELECT * FROM public.table_placements_per_node;
-- Check that sizes of colocated tables are added together for rebalances
@ -888,7 +888,7 @@ SELECT * FROM get_rebalance_table_shards_plan('tab', rebalance_strategy := 'by_d
-- supports improvement_threshold
SELECT * FROM get_rebalance_table_shards_plan('tab', rebalance_strategy := 'by_disk_size', improvement_threshold := 0);
SELECT * FROM rebalance_table_shards('tab', rebalance_strategy := 'by_disk_size', shard_transfer_mode:='block_writes');
SELECT public.master_defer_delete_shards();
CALL citus_cleanup_orphaned_shards();
SELECT * FROM public.table_placements_per_node;
ANALYZE tab, tab2;
@ -945,13 +945,13 @@ SELECT citus_add_rebalance_strategy(
SELECT * FROM get_rebalance_table_shards_plan('tab', rebalance_strategy := 'capacity_high_worker_2');
SELECT * FROM rebalance_table_shards('tab', rebalance_strategy := 'capacity_high_worker_2', shard_transfer_mode:='block_writes');
SELECT public.master_defer_delete_shards();
CALL citus_cleanup_orphaned_shards();
SELECT * FROM public.table_placements_per_node;
SELECT citus_set_default_rebalance_strategy('capacity_high_worker_2');
SELECT * FROM get_rebalance_table_shards_plan('tab');
SELECT * FROM rebalance_table_shards('tab', shard_transfer_mode:='block_writes');
SELECT public.master_defer_delete_shards();
CALL citus_cleanup_orphaned_shards();
SELECT * FROM public.table_placements_per_node;
CREATE FUNCTION only_worker_1(shardid bigint, nodeidarg int)
@ -972,7 +972,7 @@ SELECT citus_add_rebalance_strategy(
SELECT citus_set_default_rebalance_strategy('only_worker_1');
SELECT * FROM get_rebalance_table_shards_plan('tab');
SELECT * FROM rebalance_table_shards('tab', shard_transfer_mode:='block_writes');
SELECT public.master_defer_delete_shards();
CALL citus_cleanup_orphaned_shards();
SELECT * FROM public.table_placements_per_node;
SELECT citus_set_default_rebalance_strategy('by_shard_count');
@ -981,18 +981,18 @@ SELECT * FROM get_rebalance_table_shards_plan('tab');
-- Check all the error handling cases
SELECT * FROM get_rebalance_table_shards_plan('tab', rebalance_strategy := 'non_existing');
SELECT * FROM rebalance_table_shards('tab', rebalance_strategy := 'non_existing');
SELECT public.master_defer_delete_shards();
CALL citus_cleanup_orphaned_shards();
SELECT * FROM master_drain_node('localhost', :worker_2_port, rebalance_strategy := 'non_existing');
SELECT public.master_defer_delete_shards();
CALL citus_cleanup_orphaned_shards();
SELECT citus_set_default_rebalance_strategy('non_existing');
UPDATE pg_dist_rebalance_strategy SET default_strategy=false;
SELECT * FROM get_rebalance_table_shards_plan('tab');
SELECT * FROM rebalance_table_shards('tab');
SELECT public.master_defer_delete_shards();
CALL citus_cleanup_orphaned_shards();
SELECT * FROM master_drain_node('localhost', :worker_2_port);
SELECT public.master_defer_delete_shards();
CALL citus_cleanup_orphaned_shards();
UPDATE pg_dist_rebalance_strategy SET default_strategy=true WHERE name='by_shard_count';
CREATE OR REPLACE FUNCTION shard_cost_no_arguments()
@ -1222,7 +1222,7 @@ SELECT 1 FROM master_add_node('localhost', :master_port, groupId=>0);
SELECT count(*) FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement WHERE logicalrelid = 'ref_table'::regclass;
SELECT rebalance_table_shards('rebalance_test_table', shard_transfer_mode:='block_writes');
SELECT public.master_defer_delete_shards();
CALL citus_cleanup_orphaned_shards();
SELECT count(*) FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement WHERE logicalrelid = 'ref_table'::regclass;
@ -1255,7 +1255,7 @@ INSERT INTO r2 VALUES (1,2), (3,4);
SELECT 1 from master_add_node('localhost', :worker_2_port);
SELECT rebalance_table_shards();
SELECT public.master_defer_delete_shards();
CALL citus_cleanup_orphaned_shards();
DROP TABLE t1, r1, r2;
@ -1282,7 +1282,7 @@ WHERE logicalrelid = 'r1'::regclass;
-- rebalance with _only_ a reference table, this should trigger the copy
SELECT rebalance_table_shards();
SELECT public.master_defer_delete_shards();
CALL citus_cleanup_orphaned_shards();
-- verify the reference table is on all nodes after the rebalance
SELECT count(*)