From 82f34a8d886fbad6f4c391d71b151a1f4b317c7a Mon Sep 17 00:00:00 2001 From: SaitTalhaNisanci Date: Fri, 21 May 2021 10:48:32 +0300 Subject: [PATCH] Enable citus.defer_drop_after_shard_move by default (#4961) Enable citus.defer_drop_after_shard_move by default --- .../distributed/operations/repair_shards.c | 19 +- src/backend/distributed/shared_library_init.c | 4 +- src/backend/distributed/utils/maintenanced.c | 2 +- ...reign_key_to_reference_shard_rebalance.out | 12 + ...ion_blocking_move_multi_shard_commands.out | 53 ++-- ...ocking_move_multi_shard_commands_on_mx.out | 55 ++-- ...on_blocking_move_single_shard_commands.out | 46 ++-- ...cking_move_single_shard_commands_on_mx.out | 44 ++- .../isolation_rebalancer_deferred_drop.out | 37 ++- .../isolation_shard_rebalancer_progress.out | 4 +- .../multi_colocated_shard_rebalance.out | 19 +- src/test/regress/expected/multi_move_mx.out | 5 +- .../expected/shard_move_deferred_delete.out | 5 +- .../regress/expected/shard_rebalancer.out | 255 ++++++++++++++++++ src/test/regress/pg_regress_multi.pl | 1 + ...on_blocking_move_multi_shard_commands.spec | 7 +- ...cking_move_multi_shard_commands_on_mx.spec | 5 +- ...n_blocking_move_single_shard_commands.spec | 5 +- ...king_move_single_shard_commands_on_mx.spec | 3 +- .../isolation_rebalancer_deferred_drop.spec | 23 +- ...reign_key_to_reference_shard_rebalance.sql | 3 + .../sql/multi_colocated_shard_rebalance.sql | 20 +- src/test/regress/sql/multi_move_mx.sql | 5 +- .../sql/shard_move_deferred_delete.sql | 5 +- src/test/regress/sql/shard_rebalancer.sql | 46 ++++ 25 files changed, 539 insertions(+), 144 deletions(-) diff --git a/src/backend/distributed/operations/repair_shards.c b/src/backend/distributed/operations/repair_shards.c index ff46fa563..cfc7010bb 100644 --- a/src/backend/distributed/operations/repair_shards.c +++ b/src/backend/distributed/operations/repair_shards.c @@ -1062,9 +1062,22 @@ EnsureShardCanBeCopied(int64 shardId, const char *sourceNodeName, int32 sourceNo targetNodePort); if (targetPlacement != NULL) { - ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), - errmsg("shard " INT64_FORMAT " already exists in the target node", - shardId))); + if (targetPlacement->shardState == SHARD_STATE_TO_DELETE) + { + ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg( + "shard " INT64_FORMAT " already exists in the target node", + shardId), + errdetail( + "The existing shard is marked for deletion, but could not be deleted because there are still active queries on it"))); + } + else + { + ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg( + "shard " INT64_FORMAT " already exists in the target node", + shardId))); + } } } diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index e2e1410f6..84b33af04 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -640,7 +640,7 @@ RegisterCitusConfigVariables(void) "citus.defer_shard_delete_interval to make sure defered deletions " "will be executed"), &DeferShardDeleteOnMove, - false, + true, PGC_USERSET, 0, NULL, NULL, NULL); @@ -655,7 +655,7 @@ RegisterCitusConfigVariables(void) "the background worker moves on. 
When set to -1 this background " "process is skipped."), &DeferShardDeleteInterval, - -1, -1, 7 * 24 * 3600 * 1000, + 15000, -1, 7 * 24 * 3600 * 1000, PGC_SIGHUP, GUC_UNIT_MS, NULL, NULL, NULL); diff --git a/src/backend/distributed/utils/maintenanced.c b/src/backend/distributed/utils/maintenanced.c index af52ddf9f..293698180 100644 --- a/src/backend/distributed/utils/maintenanced.c +++ b/src/backend/distributed/utils/maintenanced.c @@ -93,7 +93,7 @@ typedef struct MaintenanceDaemonDBData /* config variable for distributed deadlock detection timeout */ double DistributedDeadlockDetectionTimeoutFactor = 2.0; int Recover2PCInterval = 60000; -int DeferShardDeleteInterval = 60000; +int DeferShardDeleteInterval = 15000; /* config variables for metadata sync timeout */ int MetadataSyncInterval = 60000; diff --git a/src/test/regress/expected/foreign_key_to_reference_shard_rebalance.out b/src/test/regress/expected/foreign_key_to_reference_shard_rebalance.out index 7bffe0b6f..a144f48a5 100644 --- a/src/test/regress/expected/foreign_key_to_reference_shard_rebalance.out +++ b/src/test/regress/expected/foreign_key_to_reference_shard_rebalance.out @@ -61,6 +61,12 @@ SELECT count(*) FROM referencing_table2; 101 (1 row) +SELECT 1 FROM public.master_defer_delete_shards(); + ?column? +--------------------------------------------------------------------- + 1 +(1 row) + SELECT * FROM table_fkeys_in_workers WHERE relid LIKE 'fkey_to_reference_shard_rebalance.%' AND refd_relid LIKE 'fkey_to_reference_shard_rebalance.%' ORDER BY 1,2,3; name | relid | refd_relid --------------------------------------------------------------------- @@ -102,6 +108,12 @@ SELECT count(*) FROM referencing_table2; 101 (1 row) +SELECT 1 FROM public.master_defer_delete_shards(); + ?column? +--------------------------------------------------------------------- + 1 +(1 row) + SELECT * FROM table_fkeys_in_workers WHERE relid LIKE 'fkey_to_reference_shard_rebalance.%' AND refd_relid LIKE 'fkey_to_reference_shard_rebalance.%' ORDER BY 1,2,3; name | relid | refd_relid --------------------------------------------------------------------- diff --git a/src/test/regress/expected/isolation_blocking_move_multi_shard_commands.out b/src/test/regress/expected/isolation_blocking_move_multi_shard_commands.out index 4e09e34ad..25c680d2f 100644 --- a/src/test/regress/expected/isolation_blocking_move_multi_shard_commands.out +++ b/src/test/regress/expected/isolation_blocking_move_multi_shard_commands.out @@ -11,9 +11,9 @@ step s2-insert: INSERT INTO logical_replicate_placement VALUES (15, 15), (172, 172); step s1-move-placement: - SELECT master_move_shard_placement(get_shard_id_for_distribution_column, 'localhost', 57637, 'localhost', 57638, shard_transfer_mode:='block_writes') FROM selected_shard; + SELECT master_move_shard_placement(get_shard_id_for_distribution_column, 'localhost', 57637, 'localhost', 57638, shard_transfer_mode:='block_writes') FROM selected_shard; -step s2-end: +step s2-end: COMMIT; step s1-move-placement: <... 
completed> @@ -31,7 +31,7 @@ x y 15 15 172 172 step s1-get-shard-distribution: - select nodeport from pg_dist_placement inner join pg_dist_node on(pg_dist_placement.groupid = pg_dist_node.groupid) where shardid in (SELECT * FROM selected_shard) order by nodeport; + select nodeport from pg_dist_placement inner join pg_dist_node on(pg_dist_placement.groupid = pg_dist_node.groupid) where shardstate != 4 and shardid in (SELECT * FROM selected_shard) order by nodeport; nodeport @@ -49,9 +49,9 @@ step s2-upsert: INSERT INTO logical_replicate_placement VALUES (15, 15), (172, 172) ON CONFLICT (x) DO UPDATE SET y = logical_replicate_placement.y + 1; step s1-move-placement: - SELECT master_move_shard_placement(get_shard_id_for_distribution_column, 'localhost', 57637, 'localhost', 57638, shard_transfer_mode:='block_writes') FROM selected_shard; + SELECT master_move_shard_placement(get_shard_id_for_distribution_column, 'localhost', 57637, 'localhost', 57638, shard_transfer_mode:='block_writes') FROM selected_shard; -step s2-end: +step s2-end: COMMIT; step s1-move-placement: <... completed> @@ -69,7 +69,7 @@ x y 15 16 172 173 step s1-get-shard-distribution: - select nodeport from pg_dist_placement inner join pg_dist_node on(pg_dist_placement.groupid = pg_dist_node.groupid) where shardid in (SELECT * FROM selected_shard) order by nodeport; + select nodeport from pg_dist_placement inner join pg_dist_node on(pg_dist_placement.groupid = pg_dist_node.groupid) where shardstate != 4 and shardid in (SELECT * FROM selected_shard) order by nodeport; nodeport @@ -89,9 +89,9 @@ step s2-update: UPDATE logical_replicate_placement SET y = y + 1; step s1-move-placement: - SELECT master_move_shard_placement(get_shard_id_for_distribution_column, 'localhost', 57637, 'localhost', 57638, shard_transfer_mode:='block_writes') FROM selected_shard; + SELECT master_move_shard_placement(get_shard_id_for_distribution_column, 'localhost', 57637, 'localhost', 57638, shard_transfer_mode:='block_writes') FROM selected_shard; -step s2-end: +step s2-end: COMMIT; step s1-move-placement: <... completed> @@ -109,7 +109,7 @@ x y 15 16 172 173 step s1-get-shard-distribution: - select nodeport from pg_dist_placement inner join pg_dist_node on(pg_dist_placement.groupid = pg_dist_node.groupid) where shardid in (SELECT * FROM selected_shard) order by nodeport; + select nodeport from pg_dist_placement inner join pg_dist_node on(pg_dist_placement.groupid = pg_dist_node.groupid) where shardstate != 4 and shardid in (SELECT * FROM selected_shard) order by nodeport; nodeport @@ -129,9 +129,9 @@ step s2-delete: DELETE FROM logical_replicate_placement; step s1-move-placement: - SELECT master_move_shard_placement(get_shard_id_for_distribution_column, 'localhost', 57637, 'localhost', 57638, shard_transfer_mode:='block_writes') FROM selected_shard; + SELECT master_move_shard_placement(get_shard_id_for_distribution_column, 'localhost', 57637, 'localhost', 57638, shard_transfer_mode:='block_writes') FROM selected_shard; -step s2-end: +step s2-end: COMMIT; step s1-move-placement: <... 
completed> @@ -147,7 +147,7 @@ step s1-select: x y step s1-get-shard-distribution: - select nodeport from pg_dist_placement inner join pg_dist_node on(pg_dist_placement.groupid = pg_dist_node.groupid) where shardid in (SELECT * FROM selected_shard) order by nodeport; + select nodeport from pg_dist_placement inner join pg_dist_node on(pg_dist_placement.groupid = pg_dist_node.groupid) where shardstate != 4 and shardid in (SELECT * FROM selected_shard) order by nodeport; nodeport @@ -171,20 +171,19 @@ x y 15 15 172 172 step s1-move-placement: - SELECT master_move_shard_placement(get_shard_id_for_distribution_column, 'localhost', 57637, 'localhost', 57638, shard_transfer_mode:='block_writes') FROM selected_shard; - -step s2-end: - COMMIT; + SELECT master_move_shard_placement(get_shard_id_for_distribution_column, 'localhost', 57637, 'localhost', 57638, shard_transfer_mode:='block_writes') FROM selected_shard; -step s1-move-placement: <... completed> master_move_shard_placement +step s2-end: + COMMIT; + step s1-end: COMMIT; step s1-get-shard-distribution: - select nodeport from pg_dist_placement inner join pg_dist_node on(pg_dist_placement.groupid = pg_dist_node.groupid) where shardid in (SELECT * FROM selected_shard) order by nodeport; + select nodeport from pg_dist_placement inner join pg_dist_node on(pg_dist_placement.groupid = pg_dist_node.groupid) where shardstate != 4 and shardid in (SELECT * FROM selected_shard) order by nodeport; nodeport @@ -201,9 +200,9 @@ step s2-copy: COPY logical_replicate_placement FROM PROGRAM 'echo "1,1\n2,2\n3,3\n4,4\n5,5\n15,30"' WITH CSV; step s1-move-placement: - SELECT master_move_shard_placement(get_shard_id_for_distribution_column, 'localhost', 57637, 'localhost', 57638, shard_transfer_mode:='block_writes') FROM selected_shard; + SELECT master_move_shard_placement(get_shard_id_for_distribution_column, 'localhost', 57637, 'localhost', 57638, shard_transfer_mode:='block_writes') FROM selected_shard; -step s2-end: +step s2-end: COMMIT; step s1-move-placement: <... completed> @@ -225,7 +224,7 @@ x y 5 5 15 30 step s1-get-shard-distribution: - select nodeport from pg_dist_placement inner join pg_dist_node on(pg_dist_placement.groupid = pg_dist_node.groupid) where shardid in (SELECT * FROM selected_shard) order by nodeport; + select nodeport from pg_dist_placement inner join pg_dist_node on(pg_dist_placement.groupid = pg_dist_node.groupid) where shardstate != 4 and shardid in (SELECT * FROM selected_shard) order by nodeport; nodeport @@ -245,9 +244,9 @@ step s2-truncate: TRUNCATE logical_replicate_placement; step s1-move-placement: - SELECT master_move_shard_placement(get_shard_id_for_distribution_column, 'localhost', 57637, 'localhost', 57638, shard_transfer_mode:='block_writes') FROM selected_shard; + SELECT master_move_shard_placement(get_shard_id_for_distribution_column, 'localhost', 57637, 'localhost', 57638, shard_transfer_mode:='block_writes') FROM selected_shard; -step s2-end: +step s2-end: COMMIT; step s1-move-placement: <... 
completed> @@ -263,7 +262,7 @@ step s1-select: x y step s1-get-shard-distribution: - select nodeport from pg_dist_placement inner join pg_dist_node on(pg_dist_placement.groupid = pg_dist_node.groupid) where shardid in (SELECT * FROM selected_shard) order by nodeport; + select nodeport from pg_dist_placement inner join pg_dist_node on(pg_dist_placement.groupid = pg_dist_node.groupid) where shardstate != 4 and shardid in (SELECT * FROM selected_shard) order by nodeport; nodeport @@ -280,9 +279,9 @@ step s2-alter-table: ALTER TABLE logical_replicate_placement ADD COLUMN z INT; step s1-move-placement: - SELECT master_move_shard_placement(get_shard_id_for_distribution_column, 'localhost', 57637, 'localhost', 57638, shard_transfer_mode:='block_writes') FROM selected_shard; + SELECT master_move_shard_placement(get_shard_id_for_distribution_column, 'localhost', 57637, 'localhost', 57638, shard_transfer_mode:='block_writes') FROM selected_shard; -step s2-end: +step s2-end: COMMIT; step s1-move-placement: <... completed> @@ -298,7 +297,7 @@ step s1-select: x y z step s1-get-shard-distribution: - select nodeport from pg_dist_placement inner join pg_dist_node on(pg_dist_placement.groupid = pg_dist_node.groupid) where shardid in (SELECT * FROM selected_shard) order by nodeport; + select nodeport from pg_dist_placement inner join pg_dist_node on(pg_dist_placement.groupid = pg_dist_node.groupid) where shardstate != 4 and shardid in (SELECT * FROM selected_shard) order by nodeport; nodeport diff --git a/src/test/regress/expected/isolation_blocking_move_multi_shard_commands_on_mx.out b/src/test/regress/expected/isolation_blocking_move_multi_shard_commands_on_mx.out index 68cb2c1fb..ad62d5481 100644 --- a/src/test/regress/expected/isolation_blocking_move_multi_shard_commands_on_mx.out +++ b/src/test/regress/expected/isolation_blocking_move_multi_shard_commands_on_mx.out @@ -2,7 +2,7 @@ Parsed test spec with 2 sessions starting permutation: s1-begin s2-start-session-level-connection s2-begin-on-worker s2-insert s1-move-placement s2-commit-worker s1-commit s1-select s1-get-shard-distribution s2-stop-connection step s1-begin: - BEGIN; + BEGIN; step s2-start-session-level-connection: SELECT start_session_level_connection_to_node('localhost', 57638); @@ -23,10 +23,10 @@ run_commands_on_session_level_connection_to_node step s1-move-placement: - SELECT master_move_shard_placement(get_shard_id_for_distribution_column, 'localhost', 57637, 'localhost', 57638, shard_transfer_mode:='block_writes') FROM selected_shard; + SELECT master_move_shard_placement(get_shard_id_for_distribution_column, 'localhost', 57637, 'localhost', 57638, shard_transfer_mode:='block_writes') FROM selected_shard; -step s2-commit-worker: - SELECT run_commands_on_session_level_connection_to_node('COMMIT'); +step s2-commit-worker: + SELECT run_commands_on_session_level_connection_to_node('COMMIT'); run_commands_on_session_level_connection_to_node @@ -36,7 +36,7 @@ master_move_shard_placement step s1-commit: - COMMIT; + COMMIT; step s1-select: SELECT * FROM logical_replicate_placement order by y; @@ -46,7 +46,7 @@ x y 15 15 172 172 step s1-get-shard-distribution: - select nodeport from pg_dist_placement inner join pg_dist_node on(pg_dist_placement.groupid = pg_dist_node.groupid) where shardid in (SELECT * FROM selected_shard) order by nodeport; + select nodeport from pg_dist_placement inner join pg_dist_node on(pg_dist_placement.groupid = pg_dist_node.groupid) where shardstate != 4 and shardid in (SELECT * FROM selected_shard) order by nodeport; 
nodeport @@ -66,7 +66,7 @@ step s1-insert: INSERT INTO logical_replicate_placement VALUES (15, 15), (172, 172); step s1-begin: - BEGIN; + BEGIN; step s2-start-session-level-connection: SELECT start_session_level_connection_to_node('localhost', 57638); @@ -87,10 +87,10 @@ run_commands_on_session_level_connection_to_node step s1-move-placement: - SELECT master_move_shard_placement(get_shard_id_for_distribution_column, 'localhost', 57637, 'localhost', 57638, shard_transfer_mode:='block_writes') FROM selected_shard; + SELECT master_move_shard_placement(get_shard_id_for_distribution_column, 'localhost', 57637, 'localhost', 57638, shard_transfer_mode:='block_writes') FROM selected_shard; -step s2-commit-worker: - SELECT run_commands_on_session_level_connection_to_node('COMMIT'); +step s2-commit-worker: + SELECT run_commands_on_session_level_connection_to_node('COMMIT'); run_commands_on_session_level_connection_to_node @@ -100,7 +100,7 @@ master_move_shard_placement step s1-commit: - COMMIT; + COMMIT; step s1-select: SELECT * FROM logical_replicate_placement order by y; @@ -110,7 +110,7 @@ x y 15 16 172 173 step s1-get-shard-distribution: - select nodeport from pg_dist_placement inner join pg_dist_node on(pg_dist_placement.groupid = pg_dist_node.groupid) where shardid in (SELECT * FROM selected_shard) order by nodeport; + select nodeport from pg_dist_placement inner join pg_dist_node on(pg_dist_placement.groupid = pg_dist_node.groupid) where shardstate != 4 and shardid in (SELECT * FROM selected_shard) order by nodeport; nodeport @@ -130,7 +130,7 @@ step s1-insert: INSERT INTO logical_replicate_placement VALUES (15, 15), (172, 172); step s1-begin: - BEGIN; + BEGIN; step s2-start-session-level-connection: SELECT start_session_level_connection_to_node('localhost', 57638); @@ -151,10 +151,10 @@ run_commands_on_session_level_connection_to_node step s1-move-placement: - SELECT master_move_shard_placement(get_shard_id_for_distribution_column, 'localhost', 57637, 'localhost', 57638, shard_transfer_mode:='block_writes') FROM selected_shard; + SELECT master_move_shard_placement(get_shard_id_for_distribution_column, 'localhost', 57637, 'localhost', 57638, shard_transfer_mode:='block_writes') FROM selected_shard; -step s2-commit-worker: - SELECT run_commands_on_session_level_connection_to_node('COMMIT'); +step s2-commit-worker: + SELECT run_commands_on_session_level_connection_to_node('COMMIT'); run_commands_on_session_level_connection_to_node @@ -164,7 +164,7 @@ master_move_shard_placement step s1-commit: - COMMIT; + COMMIT; step s1-select: SELECT * FROM logical_replicate_placement order by y; @@ -172,7 +172,7 @@ step s1-select: x y step s1-get-shard-distribution: - select nodeport from pg_dist_placement inner join pg_dist_node on(pg_dist_placement.groupid = pg_dist_node.groupid) where shardid in (SELECT * FROM selected_shard) order by nodeport; + select nodeport from pg_dist_placement inner join pg_dist_node on(pg_dist_placement.groupid = pg_dist_node.groupid) where shardstate != 4 and shardid in (SELECT * FROM selected_shard) order by nodeport; nodeport @@ -192,7 +192,7 @@ step s1-insert: INSERT INTO logical_replicate_placement VALUES (15, 15), (172, 172); step s1-begin: - BEGIN; + BEGIN; step s2-start-session-level-connection: SELECT start_session_level_connection_to_node('localhost', 57638); @@ -213,23 +213,22 @@ run_commands_on_session_level_connection_to_node step s1-move-placement: - SELECT master_move_shard_placement(get_shard_id_for_distribution_column, 'localhost', 57637, 'localhost', 57638, 
shard_transfer_mode:='block_writes') FROM selected_shard; - + SELECT master_move_shard_placement(get_shard_id_for_distribution_column, 'localhost', 57637, 'localhost', 57638, shard_transfer_mode:='block_writes') FROM selected_shard; + +master_move_shard_placement + + step s2-commit-worker: - SELECT run_commands_on_session_level_connection_to_node('COMMIT'); + SELECT run_commands_on_session_level_connection_to_node('COMMIT'); run_commands_on_session_level_connection_to_node -step s1-move-placement: <... completed> -master_move_shard_placement - - step s1-commit: - COMMIT; + COMMIT; step s1-get-shard-distribution: - select nodeport from pg_dist_placement inner join pg_dist_node on(pg_dist_placement.groupid = pg_dist_node.groupid) where shardid in (SELECT * FROM selected_shard) order by nodeport; + select nodeport from pg_dist_placement inner join pg_dist_node on(pg_dist_placement.groupid = pg_dist_node.groupid) where shardstate != 4 and shardid in (SELECT * FROM selected_shard) order by nodeport; nodeport diff --git a/src/test/regress/expected/isolation_blocking_move_single_shard_commands.out b/src/test/regress/expected/isolation_blocking_move_single_shard_commands.out index 800e41aca..15b801533 100644 --- a/src/test/regress/expected/isolation_blocking_move_single_shard_commands.out +++ b/src/test/regress/expected/isolation_blocking_move_single_shard_commands.out @@ -11,9 +11,9 @@ step s2-insert: INSERT INTO logical_replicate_placement VALUES (15, 15); step s1-move-placement: - SELECT master_move_shard_placement((SELECT * FROM selected_shard), 'localhost', 57637, 'localhost', 57638, shard_transfer_mode:='block_writes'); + SELECT master_move_shard_placement((SELECT * FROM selected_shard), 'localhost', 57637, 'localhost', 57638, shard_transfer_mode:='block_writes'); -step s2-end: +step s2-end: COMMIT; step s1-move-placement: <... completed> @@ -30,7 +30,7 @@ x y 15 15 step s1-get-shard-distribution: - select nodeport from pg_dist_placement inner join pg_dist_node on(pg_dist_placement.groupid = pg_dist_node.groupid) where shardid in (SELECT * FROM selected_shard) order by nodeport; + select nodeport from pg_dist_placement inner join pg_dist_node on(pg_dist_placement.groupid = pg_dist_node.groupid) where shardstate != 4 and shardid in (SELECT * FROM selected_shard) order by nodeport; nodeport @@ -48,9 +48,9 @@ step s2-upsert: INSERT INTO logical_replicate_placement VALUES (15, 15) ON CONFLICT (x) DO UPDATE SET y = logical_replicate_placement.y + 1; step s1-move-placement: - SELECT master_move_shard_placement((SELECT * FROM selected_shard), 'localhost', 57637, 'localhost', 57638, shard_transfer_mode:='block_writes'); + SELECT master_move_shard_placement((SELECT * FROM selected_shard), 'localhost', 57637, 'localhost', 57638, shard_transfer_mode:='block_writes'); -step s2-end: +step s2-end: COMMIT; step s1-move-placement: <... 
completed> @@ -67,7 +67,7 @@ x y 15 16 step s1-get-shard-distribution: - select nodeport from pg_dist_placement inner join pg_dist_node on(pg_dist_placement.groupid = pg_dist_node.groupid) where shardid in (SELECT * FROM selected_shard) order by nodeport; + select nodeport from pg_dist_placement inner join pg_dist_node on(pg_dist_placement.groupid = pg_dist_node.groupid) where shardstate != 4 and shardid in (SELECT * FROM selected_shard) order by nodeport; nodeport @@ -87,9 +87,9 @@ step s2-update: UPDATE logical_replicate_placement SET y = y + 1 WHERE x = 15; step s1-move-placement: - SELECT master_move_shard_placement((SELECT * FROM selected_shard), 'localhost', 57637, 'localhost', 57638, shard_transfer_mode:='block_writes'); + SELECT master_move_shard_placement((SELECT * FROM selected_shard), 'localhost', 57637, 'localhost', 57638, shard_transfer_mode:='block_writes'); -step s2-end: +step s2-end: COMMIT; step s1-move-placement: <... completed> @@ -106,7 +106,7 @@ x y 15 16 step s1-get-shard-distribution: - select nodeport from pg_dist_placement inner join pg_dist_node on(pg_dist_placement.groupid = pg_dist_node.groupid) where shardid in (SELECT * FROM selected_shard) order by nodeport; + select nodeport from pg_dist_placement inner join pg_dist_node on(pg_dist_placement.groupid = pg_dist_node.groupid) where shardstate != 4 and shardid in (SELECT * FROM selected_shard) order by nodeport; nodeport @@ -126,9 +126,9 @@ step s2-delete: DELETE FROM logical_replicate_placement WHERE x = 15; step s1-move-placement: - SELECT master_move_shard_placement((SELECT * FROM selected_shard), 'localhost', 57637, 'localhost', 57638, shard_transfer_mode:='block_writes'); + SELECT master_move_shard_placement((SELECT * FROM selected_shard), 'localhost', 57637, 'localhost', 57638, shard_transfer_mode:='block_writes'); -step s2-end: +step s2-end: COMMIT; step s1-move-placement: <... completed> @@ -144,7 +144,7 @@ step s1-select: x y step s1-get-shard-distribution: - select nodeport from pg_dist_placement inner join pg_dist_node on(pg_dist_placement.groupid = pg_dist_node.groupid) where shardid in (SELECT * FROM selected_shard) order by nodeport; + select nodeport from pg_dist_placement inner join pg_dist_node on(pg_dist_placement.groupid = pg_dist_node.groupid) where shardstate != 4 and shardid in (SELECT * FROM selected_shard) order by nodeport; nodeport @@ -167,20 +167,19 @@ x y 15 15 step s1-move-placement: - SELECT master_move_shard_placement((SELECT * FROM selected_shard), 'localhost', 57637, 'localhost', 57638, shard_transfer_mode:='block_writes'); - -step s2-end: - COMMIT; + SELECT master_move_shard_placement((SELECT * FROM selected_shard), 'localhost', 57637, 'localhost', 57638, shard_transfer_mode:='block_writes'); -step s1-move-placement: <... 
completed> master_move_shard_placement +step s2-end: + COMMIT; + step s1-end: COMMIT; step s1-get-shard-distribution: - select nodeport from pg_dist_placement inner join pg_dist_node on(pg_dist_placement.groupid = pg_dist_node.groupid) where shardid in (SELECT * FROM selected_shard) order by nodeport; + select nodeport from pg_dist_placement inner join pg_dist_node on(pg_dist_placement.groupid = pg_dist_node.groupid) where shardstate != 4 and shardid in (SELECT * FROM selected_shard) order by nodeport; nodeport @@ -203,20 +202,19 @@ x y 15 15 step s1-move-placement: - SELECT master_move_shard_placement((SELECT * FROM selected_shard), 'localhost', 57637, 'localhost', 57638, shard_transfer_mode:='block_writes'); - -step s2-end: - COMMIT; + SELECT master_move_shard_placement((SELECT * FROM selected_shard), 'localhost', 57637, 'localhost', 57638, shard_transfer_mode:='block_writes'); -step s1-move-placement: <... completed> master_move_shard_placement +step s2-end: + COMMIT; + step s1-end: COMMIT; step s1-get-shard-distribution: - select nodeport from pg_dist_placement inner join pg_dist_node on(pg_dist_placement.groupid = pg_dist_node.groupid) where shardid in (SELECT * FROM selected_shard) order by nodeport; + select nodeport from pg_dist_placement inner join pg_dist_node on(pg_dist_placement.groupid = pg_dist_node.groupid) where shardstate != 4 and shardid in (SELECT * FROM selected_shard) order by nodeport; nodeport diff --git a/src/test/regress/expected/isolation_blocking_move_single_shard_commands_on_mx.out b/src/test/regress/expected/isolation_blocking_move_single_shard_commands_on_mx.out index 209275253..c89b918f5 100644 --- a/src/test/regress/expected/isolation_blocking_move_single_shard_commands_on_mx.out +++ b/src/test/regress/expected/isolation_blocking_move_single_shard_commands_on_mx.out @@ -25,7 +25,7 @@ run_commands_on_session_level_connection_to_node step s1-move-placement: SELECT master_move_shard_placement((SELECT * FROM selected_shard), 'localhost', 57637, 'localhost', 57638, shard_transfer_mode:='block_writes'); -step s2-commit-worker: +step s2-commit-worker: SELECT run_commands_on_session_level_connection_to_node('COMMIT'); run_commands_on_session_level_connection_to_node @@ -36,7 +36,7 @@ master_move_shard_placement step s1-commit: - COMMIT; + COMMIT; step s1-select: SELECT * FROM logical_replicate_placement order by y; @@ -45,7 +45,7 @@ x y 15 15 step s1-get-shard-distribution: - select nodeport from pg_dist_placement inner join pg_dist_node on(pg_dist_placement.groupid = pg_dist_node.groupid) where shardid in (SELECT * FROM selected_shard) order by nodeport; + select nodeport from pg_dist_placement inner join pg_dist_node on(pg_dist_placement.groupid = pg_dist_node.groupid) where shardstate != 4 and shardid in (SELECT * FROM selected_shard) order by nodeport; nodeport @@ -88,7 +88,7 @@ run_commands_on_session_level_connection_to_node step s1-move-placement: SELECT master_move_shard_placement((SELECT * FROM selected_shard), 'localhost', 57637, 'localhost', 57638, shard_transfer_mode:='block_writes'); -step s2-commit-worker: +step s2-commit-worker: SELECT run_commands_on_session_level_connection_to_node('COMMIT'); run_commands_on_session_level_connection_to_node @@ -99,7 +99,7 @@ master_move_shard_placement step s1-commit: - COMMIT; + COMMIT; step s1-select: SELECT * FROM logical_replicate_placement order by y; @@ -108,7 +108,7 @@ x y 15 16 step s1-get-shard-distribution: - select nodeport from pg_dist_placement inner join pg_dist_node on(pg_dist_placement.groupid = 
pg_dist_node.groupid) where shardid in (SELECT * FROM selected_shard) order by nodeport; + select nodeport from pg_dist_placement inner join pg_dist_node on(pg_dist_placement.groupid = pg_dist_node.groupid) where shardstate != 4 and shardid in (SELECT * FROM selected_shard) order by nodeport; nodeport @@ -151,7 +151,7 @@ run_commands_on_session_level_connection_to_node step s1-move-placement: SELECT master_move_shard_placement((SELECT * FROM selected_shard), 'localhost', 57637, 'localhost', 57638, shard_transfer_mode:='block_writes'); -step s2-commit-worker: +step s2-commit-worker: SELECT run_commands_on_session_level_connection_to_node('COMMIT'); run_commands_on_session_level_connection_to_node @@ -162,7 +162,7 @@ master_move_shard_placement step s1-commit: - COMMIT; + COMMIT; step s1-select: SELECT * FROM logical_replicate_placement order by y; @@ -170,7 +170,7 @@ step s1-select: x y step s1-get-shard-distribution: - select nodeport from pg_dist_placement inner join pg_dist_node on(pg_dist_placement.groupid = pg_dist_node.groupid) where shardid in (SELECT * FROM selected_shard) order by nodeport; + select nodeport from pg_dist_placement inner join pg_dist_node on(pg_dist_placement.groupid = pg_dist_node.groupid) where shardstate != 4 and shardid in (SELECT * FROM selected_shard) order by nodeport; nodeport @@ -212,22 +212,21 @@ run_commands_on_session_level_connection_to_node step s1-move-placement: SELECT master_move_shard_placement((SELECT * FROM selected_shard), 'localhost', 57637, 'localhost', 57638, shard_transfer_mode:='block_writes'); - + +master_move_shard_placement + + step s2-commit-worker: SELECT run_commands_on_session_level_connection_to_node('COMMIT'); run_commands_on_session_level_connection_to_node -step s1-move-placement: <... completed> -master_move_shard_placement - - step s1-commit: - COMMIT; + COMMIT; step s1-get-shard-distribution: - select nodeport from pg_dist_placement inner join pg_dist_node on(pg_dist_placement.groupid = pg_dist_node.groupid) where shardid in (SELECT * FROM selected_shard) order by nodeport; + select nodeport from pg_dist_placement inner join pg_dist_node on(pg_dist_placement.groupid = pg_dist_node.groupid) where shardstate != 4 and shardid in (SELECT * FROM selected_shard) order by nodeport; nodeport @@ -269,22 +268,21 @@ run_commands_on_session_level_connection_to_node step s1-move-placement: SELECT master_move_shard_placement((SELECT * FROM selected_shard), 'localhost', 57637, 'localhost', 57638, shard_transfer_mode:='block_writes'); - + +master_move_shard_placement + + step s2-commit-worker: SELECT run_commands_on_session_level_connection_to_node('COMMIT'); run_commands_on_session_level_connection_to_node -step s1-move-placement: <... 
completed> -master_move_shard_placement - - step s1-commit: - COMMIT; + COMMIT; step s1-get-shard-distribution: - select nodeport from pg_dist_placement inner join pg_dist_node on(pg_dist_placement.groupid = pg_dist_node.groupid) where shardid in (SELECT * FROM selected_shard) order by nodeport; + select nodeport from pg_dist_placement inner join pg_dist_node on(pg_dist_placement.groupid = pg_dist_node.groupid) where shardstate != 4 and shardid in (SELECT * FROM selected_shard) order by nodeport; nodeport diff --git a/src/test/regress/expected/isolation_rebalancer_deferred_drop.out b/src/test/regress/expected/isolation_rebalancer_deferred_drop.out index 36918f3a6..d4d7338be 100644 --- a/src/test/regress/expected/isolation_rebalancer_deferred_drop.out +++ b/src/test/regress/expected/isolation_rebalancer_deferred_drop.out @@ -5,8 +5,7 @@ step s1-begin: BEGIN; step s1-move-placement: - SET citus.defer_drop_after_shard_move TO ON; - SELECT master_move_shard_placement((SELECT * FROM selected_shard), 'localhost', 57637, 'localhost', 57638); + SELECT master_move_shard_placement((SELECT * FROM selected_shard), 'localhost', 57637, 'localhost', 57638); master_move_shard_placement @@ -34,8 +33,7 @@ step s1-begin: BEGIN; step s1-move-placement: - SET citus.defer_drop_after_shard_move TO ON; - SELECT master_move_shard_placement((SELECT * FROM selected_shard), 'localhost', 57637, 'localhost', 57638); + SELECT master_move_shard_placement((SELECT * FROM selected_shard), 'localhost', 57637, 'localhost', 57638); master_move_shard_placement @@ -62,8 +60,7 @@ step s1-begin: BEGIN; step s1-move-placement: - SET citus.defer_drop_after_shard_move TO ON; - SELECT master_move_shard_placement((SELECT * FROM selected_shard), 'localhost', 57637, 'localhost', 57638); + SELECT master_move_shard_placement((SELECT * FROM selected_shard), 'localhost', 57637, 'localhost', 57638); master_move_shard_placement @@ -119,3 +116,31 @@ run_try_drop_marked_shards step s1-commit: COMMIT; + +starting permutation: s1-begin s2-begin s2-select s1-move-placement-without-deferred s2-commit s1-commit +step s1-begin: + BEGIN; + +step s2-begin: + BEGIN; + +step s2-select: + SELECT COUNT(*) FROM t1; + +count + +0 +step s1-move-placement-without-deferred: + SET citus.defer_drop_after_shard_move TO OFF; + SELECT master_move_shard_placement((SELECT * FROM selected_shard), 'localhost', 57637, 'localhost', 57638); + +step s2-commit: + COMMIT; + +step s1-move-placement-without-deferred: <... 
completed>
+master_move_shard_placement
+
+step s1-commit:
+ COMMIT;
+
diff --git a/src/test/regress/expected/isolation_shard_rebalancer_progress.out b/src/test/regress/expected/isolation_shard_rebalancer_progress.out
index 073a9422e..a941d6d2b 100644
--- a/src/test/regress/expected/isolation_shard_rebalancer_progress.out
+++ b/src/test/regress/expected/isolation_shard_rebalancer_progress.out
@@ -65,8 +65,8 @@ step s3-progress:
 table_name shardid shard_size sourcename sourceport source_shard_sizetargetname targetport target_shard_sizeprogress

-colocated1 1500001 49152 localhost 57637 0 localhost 57638 49152 2
-colocated2 1500005 376832 localhost 57637 0 localhost 57638 376832 2
+colocated1 1500001 49152 localhost 57637 49152 localhost 57638 49152 2
+colocated2 1500005 376832 localhost 57637 376832 localhost 57638 376832 2
 colocated1 1500002 196608 localhost 57637 196608 localhost 57638 0 1
 colocated2 1500006 8192 localhost 57637 8192 localhost 57638 0 1
 step s2-unlock-2:
diff --git a/src/test/regress/expected/multi_colocated_shard_rebalance.out b/src/test/regress/expected/multi_colocated_shard_rebalance.out
index 70c4d8f20..939414ef9 100644
--- a/src/test/regress/expected/multi_colocated_shard_rebalance.out
+++ b/src/test/regress/expected/multi_colocated_shard_rebalance.out
@@ -260,6 +260,7 @@ WHERE
 p.logicalrelid = s.logicalrelid AND
 s.shardid = sp.shardid AND
 colocationid = (SELECT colocationid FROM pg_dist_partition WHERE logicalrelid = 'table1_group1'::regclass)
+ AND sp.shardstate != 4
 ORDER BY s.shardid, sp.nodeport;
 shardid | logicalrelid | nodeport
---------------------------------------------------------------------
@@ -303,6 +304,7 @@ WHERE
 p.logicalrelid = s.logicalrelid AND
 s.shardid = sp.shardid AND
 p.logicalrelid = 'table5_groupX'::regclass
+ AND sp.shardstate != 4
 ORDER BY s.shardid, sp.nodeport;
 shardid | logicalrelid | nodeport
---------------------------------------------------------------------
@@ -331,7 +333,8 @@ FROM
 WHERE
 p.logicalrelid = s.logicalrelid AND
 s.shardid = sp.shardid AND
- p.logicalrelid = 'table5_groupX'::regclass
+ p.logicalrelid = 'table5_groupX'::regclass AND
+ sp.shardstate != 4
 ORDER BY s.shardid, sp.nodeport;
 shardid | logicalrelid | nodeport
---------------------------------------------------------------------
@@ -355,6 +358,7 @@ WHERE
 p.logicalrelid = s.logicalrelid AND
 s.shardid = sp.shardid AND
 p.logicalrelid = 'table6_append'::regclass
+ AND sp.shardstate != 4
 ORDER BY s.shardid, sp.nodeport;
 shardid | logicalrelid | nodeport
---------------------------------------------------------------------
@@ -376,7 +380,8 @@ FROM
 WHERE
 p.logicalrelid = s.logicalrelid AND
 s.shardid = sp.shardid AND
- p.logicalrelid = 'table6_append'::regclass
+ p.logicalrelid = 'table6_append'::regclass AND
+ sp.shardstate != 4
 ORDER BY s.shardid, sp.nodeport;
 shardid | logicalrelid | nodeport
---------------------------------------------------------------------
@@ -386,8 +391,7 @@ ORDER BY s.shardid, sp.nodeport;
 -- try to move shard from wrong node
 SELECT master_move_shard_placement(13000021, 'localhost', :worker_1_port, 'localhost', :worker_2_port);
-ERROR: could not find placement matching "localhost:xxxxx"
-HINT: Confirm the placement still exists and try again.
+ERROR: source placement must be in active state -- test shard move with foreign constraints DROP TABLE IF EXISTS table1_group1, table2_group1; SET citus.shard_count TO 6; @@ -418,6 +422,7 @@ WHERE p.logicalrelid = s.logicalrelid AND s.shardid = sp.shardid AND colocationid = (SELECT colocationid FROM pg_dist_partition WHERE logicalrelid = 'table1_group1'::regclass) + AND sp.shardstate != 4 ORDER BY s.shardid, sp.nodeport; shardid | logicalrelid | nodeport --------------------------------------------------------------------- @@ -449,6 +454,7 @@ WHERE p.logicalrelid = s.logicalrelid AND s.shardid = sp.shardid AND colocationid = (SELECT colocationid FROM pg_dist_partition WHERE logicalrelid = 'table1_group1'::regclass) + AND sp.shardstate != 4 ORDER BY s.shardid, sp.nodeport; shardid | logicalrelid | nodeport --------------------------------------------------------------------- @@ -581,6 +587,7 @@ SELECT count(*) FROM move_partitions.events; SELECT master_move_shard_placement(shardid, 'localhost', :worker_2_port, 'localhost', :worker_1_port) FROM pg_dist_shard JOIN pg_dist_shard_placement USING (shardid) WHERE logicalrelid = 'move_partitions.events'::regclass AND nodeport = :worker_2_port +AND shardstate != 4 ORDER BY shardid LIMIT 1; master_move_shard_placement --------------------------------------------------------------------- @@ -598,7 +605,7 @@ ALTER TABLE move_partitions.events_1 ADD CONSTRAINT e_1_pk PRIMARY KEY (id); -- should be able to move automatically now SELECT master_move_shard_placement(shardid, 'localhost', :worker_2_port, 'localhost', :worker_1_port) FROM pg_dist_shard JOIN pg_dist_shard_placement USING (shardid) -WHERE logicalrelid = 'move_partitions.events'::regclass AND nodeport = :worker_2_port +WHERE logicalrelid = 'move_partitions.events'::regclass AND nodeport = :worker_2_port AND shardstate != 4 ORDER BY shardid LIMIT 1; master_move_shard_placement --------------------------------------------------------------------- @@ -614,7 +621,7 @@ SELECT count(*) FROM move_partitions.events; -- should also be able to move with block writes SELECT master_move_shard_placement(shardid, 'localhost', :worker_2_port, 'localhost', :worker_1_port, 'block_writes') FROM pg_dist_shard JOIN pg_dist_shard_placement USING (shardid) -WHERE logicalrelid = 'move_partitions.events'::regclass AND nodeport = :worker_2_port +WHERE logicalrelid = 'move_partitions.events'::regclass AND nodeport = :worker_2_port AND shardstate != 4 ORDER BY shardid LIMIT 1; master_move_shard_placement --------------------------------------------------------------------- diff --git a/src/test/regress/expected/multi_move_mx.out b/src/test/regress/expected/multi_move_mx.out index 3d67c97de..777649d1a 100644 --- a/src/test/regress/expected/multi_move_mx.out +++ b/src/test/regress/expected/multi_move_mx.out @@ -141,9 +141,10 @@ SELECT FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement WHERE - logicalrelid = 'mx_table_1'::regclass + (logicalrelid = 'mx_table_1'::regclass OR logicalrelid = 'mx_table_2'::regclass - OR logicalrelid = 'mx_table_3'::regclass + OR logicalrelid = 'mx_table_3'::regclass) + AND shardstate != 4 ORDER BY logicalrelid, shardid; logicalrelid | shardid | nodename | nodeport diff --git a/src/test/regress/expected/shard_move_deferred_delete.out b/src/test/regress/expected/shard_move_deferred_delete.out index 3329aaa54..c125e2c53 100644 --- a/src/test/regress/expected/shard_move_deferred_delete.out +++ b/src/test/regress/expected/shard_move_deferred_delete.out @@ -133,6 +133,10 @@ $cmd$); 
(localhost,57638,t,1) (2 rows) +-- we expect to get an error since the old placement is still there +SELECT master_move_shard_placement(20000000, 'localhost', :worker_2_port, 'localhost', :worker_1_port); +ERROR: shard xxxxx already exists in the target node +DETAIL: The existing shard is marked for deletion, but could not be deleted because there are still active queries on it SELECT run_command_on_workers($cmd$ -- override the function for testing purpose create or replace function pg_catalog.citus_local_disk_space_stats(OUT available_disk_size bigint, OUT total_disk_size bigint) @@ -169,7 +173,6 @@ SELECT master_move_shard_placement(20000001, 'localhost', :worker_2_port, 'local (1 row) ROLLBACK; --- we expect shard xxxxx to be on both of the workers SELECT run_command_on_workers($cmd$ SELECT count(*) FROM pg_class WHERE relname = 't1_20000000'; $cmd$); diff --git a/src/test/regress/expected/shard_rebalancer.out b/src/test/regress/expected/shard_rebalancer.out index e9d668387..f7a103dfb 100644 --- a/src/test/regress/expected/shard_rebalancer.out +++ b/src/test/regress/expected/shard_rebalancer.out @@ -31,12 +31,24 @@ SELECT rebalance_table_shards('dist_table_test'); (1 row) +SELECT public.master_defer_delete_shards(); + master_defer_delete_shards +--------------------------------------------------------------------- + 0 +(1 row) + SELECT rebalance_table_shards(); rebalance_table_shards --------------------------------------------------------------------- (1 row) +SELECT public.master_defer_delete_shards(); + master_defer_delete_shards +--------------------------------------------------------------------- + 0 +(1 row) + -- test that calling rebalance_table_shards without specifying relation -- wouldn't move shard of the citus local table. CREATE TABLE citus_local_table(a int, b int); @@ -53,6 +65,12 @@ SELECT rebalance_table_shards(); (1 row) +SELECT public.master_defer_delete_shards(); + master_defer_delete_shards +--------------------------------------------------------------------- + 0 +(1 row) + -- show that citus local table shard is still on the coordinator SELECT tablename FROM pg_catalog.pg_tables where tablename like 'citus_local_table_%'; tablename @@ -83,6 +101,12 @@ SELECT pg_sleep(.1); -- wait to make sure the config has changed before running SELECT master_drain_node('localhost', :master_port); ERROR: connection to the remote node foobar:57636 failed with the following error: could not translate host name "foobar" to address: +SELECT public.master_defer_delete_shards(); + master_defer_delete_shards +--------------------------------------------------------------------- + 0 +(1 row) + ALTER SYSTEM RESET citus.local_hostname; SELECT pg_reload_conf(); pg_reload_conf @@ -102,6 +126,12 @@ SELECT master_drain_node('localhost', :master_port); (1 row) +SELECT public.master_defer_delete_shards(); + master_defer_delete_shards +--------------------------------------------------------------------- + 0 +(1 row) + -- show that citus local table shard is still on the coordinator SELECT tablename FROM pg_catalog.pg_tables where tablename like 'citus_local_table_%'; tablename @@ -404,6 +434,7 @@ SELECT master_create_distributed_table('replication_test_table', 'int_column', ' CREATE VIEW replication_test_table_placements_per_node AS SELECT count(*) FROM pg_dist_shard_placement NATURAL JOIN pg_dist_shard WHERE logicalrelid = 'replication_test_table'::regclass + AND shardstate != 4 GROUP BY nodename, nodeport ORDER BY nodename, nodeport; -- Create four shards with replication factor 2, and 
delete the placements @@ -526,6 +557,7 @@ SELECT master_create_distributed_table('rebalance_test_table', 'int_column', 'ap CREATE VIEW table_placements_per_node AS SELECT nodeport, logicalrelid::regclass, count(*) FROM pg_dist_shard_placement NATURAL JOIN pg_dist_shard +WHERE shardstate != 4 GROUP BY logicalrelid::regclass, nodename, nodeport ORDER BY logicalrelid::regclass, nodename, nodeport; -- Create six shards with replication factor 1 and move them to the same @@ -546,6 +578,7 @@ AS $$ pg_dist_shard_placement src USING (shardid), (SELECT nodename, nodeport FROM pg_dist_shard_placement ORDER BY nodeport DESC LIMIT 1) dst WHERE src.nodeport < dst.nodeport AND s.logicalrelid = rel::regclass; + SELECT public.master_defer_delete_shards(); $$; CALL create_unbalanced_shards('rebalance_test_table'); SET citus.shard_replication_factor TO 2; @@ -590,6 +623,12 @@ FROM ( WHERE logicalrelid = 'rebalance_test_table'::regclass ) T; ERROR: connection to the remote node foobar:57636 failed with the following error: could not translate host name "foobar" to address: +SELECT public.master_defer_delete_shards(); + master_defer_delete_shards +--------------------------------------------------------------------- + 0 +(1 row) + ALTER SYSTEM RESET citus.local_hostname; SELECT pg_reload_conf(); pg_reload_conf @@ -618,6 +657,12 @@ FROM ( (1 row) +SELECT public.master_defer_delete_shards(); + master_defer_delete_shards +--------------------------------------------------------------------- + 1 +(1 row) + SELECT * FROM table_placements_per_node; nodeport | logicalrelid | count --------------------------------------------------------------------- @@ -669,6 +714,12 @@ SELECT * FROM table_placements_per_node; 57638 | rebalance_test_table | 5 (2 rows) +SELECT public.master_defer_delete_shards(); + master_defer_delete_shards +--------------------------------------------------------------------- + 0 +(1 row) + SELECT rebalance_table_shards('rebalance_test_table', threshold := 0, max_shard_moves := 1, shard_transfer_mode:='block_writes'); @@ -677,6 +728,12 @@ SELECT rebalance_table_shards('rebalance_test_table', (1 row) +SELECT public.master_defer_delete_shards(); + master_defer_delete_shards +--------------------------------------------------------------------- + 1 +(1 row) + SELECT * FROM table_placements_per_node; nodeport | logicalrelid | count --------------------------------------------------------------------- @@ -691,6 +748,12 @@ SELECT rebalance_table_shards('rebalance_test_table', threshold := 1, shard_tran (1 row) +SELECT public.master_defer_delete_shards(); + master_defer_delete_shards +--------------------------------------------------------------------- + 0 +(1 row) + SELECT * FROM table_placements_per_node; nodeport | logicalrelid | count --------------------------------------------------------------------- @@ -705,6 +768,12 @@ SELECT rebalance_table_shards('rebalance_test_table', threshold := 0); (1 row) +SELECT public.master_defer_delete_shards(); + master_defer_delete_shards +--------------------------------------------------------------------- + 1 +(1 row) + SELECT * FROM table_placements_per_node; nodeport | logicalrelid | count --------------------------------------------------------------------- @@ -720,6 +789,12 @@ SELECT rebalance_table_shards('rebalance_test_table', threshold := 0, shard_tran (1 row) +SELECT public.master_defer_delete_shards(); + master_defer_delete_shards +--------------------------------------------------------------------- + 0 +(1 row) + SELECT * FROM 
table_placements_per_node; nodeport | logicalrelid | count --------------------------------------------------------------------- @@ -893,6 +968,12 @@ SELECT COUNT(*) FROM imbalanced_table; -- Try force_logical SELECT rebalance_table_shards('imbalanced_table', threshold:=0, shard_transfer_mode:='force_logical'); ERROR: the force_logical transfer mode is currently unsupported +SELECT public.master_defer_delete_shards(); + master_defer_delete_shards +--------------------------------------------------------------------- + 0 +(1 row) + -- Test rebalance operation SELECT rebalance_table_shards('imbalanced_table', threshold:=0, shard_transfer_mode:='block_writes'); rebalance_table_shards @@ -900,6 +981,12 @@ SELECT rebalance_table_shards('imbalanced_table', threshold:=0, shard_transfer_m (1 row) +SELECT public.master_defer_delete_shards(); + master_defer_delete_shards +--------------------------------------------------------------------- + 1 +(1 row) + -- Confirm rebalance -- Shard counts in each node after rebalance SELECT * FROM public.table_placements_per_node; @@ -936,6 +1023,12 @@ FROM pg_dist_shard_placement WHERE nodeport = :worker_2_port; ERROR: Moving shards to a non-existing node is not supported HINT: Add the target node via SELECT citus_add_node('localhost', 10000); +SELECT public.master_defer_delete_shards(); + master_defer_delete_shards +--------------------------------------------------------------------- + 0 +(1 row) + -- Try to move shards to a node where shards are not allowed SELECT * from master_set_node_property('localhost', :worker_1_port, 'shouldhaveshards', false); master_set_node_property @@ -979,6 +1072,12 @@ WHERE nodeport = :worker_2_port; (2 rows) +SELECT public.master_defer_delete_shards(); + master_defer_delete_shards +--------------------------------------------------------------------- + 2 +(1 row) + SELECT create_distributed_table('colocated_rebalance_test2', 'id'); create_distributed_table --------------------------------------------------------------------- @@ -1006,6 +1105,12 @@ SELECT * FROM rebalance_table_shards('colocated_rebalance_test', threshold := 0, (1 row) +SELECT public.master_defer_delete_shards(); + master_defer_delete_shards +--------------------------------------------------------------------- + 0 +(1 row) + -- Confirm that nothing changed SELECT * FROM public.table_placements_per_node; nodeport | logicalrelid | count @@ -1047,6 +1152,12 @@ SELECT * FROM rebalance_table_shards('colocated_rebalance_test', threshold := 0, (1 row) +SELECT public.master_defer_delete_shards(); + master_defer_delete_shards +--------------------------------------------------------------------- + 4 +(1 row) + -- Check that we can call this function without a crash SELECT * FROM get_rebalance_progress(); sessionid | table_name | shardid | shard_size | sourcename | sourceport | targetname | targetport | progress | source_shard_size | target_shard_size @@ -1104,6 +1215,12 @@ SELECT * FROM rebalance_table_shards('colocated_rebalance_test', threshold := 0, (1 row) +SELECT public.master_defer_delete_shards(); + master_defer_delete_shards +--------------------------------------------------------------------- + 4 +(1 row) + SELECT * FROM public.table_placements_per_node; nodeport | logicalrelid | count --------------------------------------------------------------------- @@ -1126,6 +1243,12 @@ SELECT * FROM rebalance_table_shards('non_colocated_rebalance_test', threshold : (1 row) +SELECT public.master_defer_delete_shards(); + master_defer_delete_shards 
+--------------------------------------------------------------------- + 2 +(1 row) + SELECT * FROM public.table_placements_per_node; nodeport | logicalrelid | count --------------------------------------------------------------------- @@ -1147,6 +1270,12 @@ SELECT * FROM rebalance_table_shards('colocated_rebalance_test', threshold := 0, (1 row) +SELECT public.master_defer_delete_shards(); + master_defer_delete_shards +--------------------------------------------------------------------- + 4 +(1 row) + SELECT * FROM public.table_placements_per_node; nodeport | logicalrelid | count --------------------------------------------------------------------- @@ -1163,6 +1292,12 @@ SELECT * FROM rebalance_table_shards('non_colocated_rebalance_test', threshold : (1 row) +SELECT public.master_defer_delete_shards(); + master_defer_delete_shards +--------------------------------------------------------------------- + 2 +(1 row) + SELECT * FROM public.table_placements_per_node; nodeport | logicalrelid | count --------------------------------------------------------------------- @@ -1199,6 +1334,12 @@ SELECT * FROM rebalance_table_shards(threshold := 0, shard_transfer_mode := 'blo (1 row) +SELECT public.master_defer_delete_shards(); + master_defer_delete_shards +--------------------------------------------------------------------- + 6 +(1 row) + SELECT * FROM public.table_placements_per_node; nodeport | logicalrelid | count --------------------------------------------------------------------- @@ -1220,6 +1361,12 @@ SELECT * FROM rebalance_table_shards(threshold := 0, shard_transfer_mode := 'blo (1 row) +SELECT public.master_defer_delete_shards(); + master_defer_delete_shards +--------------------------------------------------------------------- + 6 +(1 row) + SELECT * FROM public.table_placements_per_node; nodeport | logicalrelid | count --------------------------------------------------------------------- @@ -1256,6 +1403,12 @@ SELECT * FROM rebalance_table_shards(threshold := 0, shard_transfer_mode := 'blo (1 row) +SELECT public.master_defer_delete_shards(); + master_defer_delete_shards +--------------------------------------------------------------------- + 6 +(1 row) + SELECT * FROM public.table_placements_per_node; nodeport | logicalrelid | count --------------------------------------------------------------------- @@ -1277,6 +1430,12 @@ SELECT * FROM rebalance_table_shards(threshold := 0, shard_transfer_mode := 'blo (1 row) +SELECT public.master_defer_delete_shards(); + master_defer_delete_shards +--------------------------------------------------------------------- + 6 +(1 row) + SELECT * FROM public.table_placements_per_node; nodeport | logicalrelid | count --------------------------------------------------------------------- @@ -1302,6 +1461,12 @@ SELECT * from master_drain_node('localhost', :worker_2_port, shard_transfer_mode (1 row) +SELECT public.master_defer_delete_shards(); + master_defer_delete_shards +--------------------------------------------------------------------- + 6 +(1 row) + select shouldhaveshards from pg_dist_node where nodeport = :worker_2_port; shouldhaveshards --------------------------------------------------------------------- @@ -1329,6 +1494,12 @@ SELECT * FROM rebalance_table_shards(threshold := 0, shard_transfer_mode := 'blo (1 row) +SELECT public.master_defer_delete_shards(); + master_defer_delete_shards +--------------------------------------------------------------------- + 6 +(1 row) + SELECT * FROM public.table_placements_per_node; nodeport | logicalrelid | count 
--------------------------------------------------------------------- @@ -1435,6 +1606,12 @@ SELECT * FROM rebalance_table_shards('tab', shard_transfer_mode:='block_writes') (1 row) +SELECT public.master_defer_delete_shards(); + master_defer_delete_shards +--------------------------------------------------------------------- + 0 +(1 row) + SELECT * FROM public.table_placements_per_node; nodeport | logicalrelid | count --------------------------------------------------------------------- @@ -1449,6 +1626,12 @@ NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ... (1 row) +SELECT public.master_defer_delete_shards(); + master_defer_delete_shards +--------------------------------------------------------------------- + 1 +(1 row) + SELECT * FROM public.table_placements_per_node; nodeport | logicalrelid | count --------------------------------------------------------------------- @@ -1464,6 +1647,12 @@ DETAIL: Using threshold of 0.01 (1 row) +SELECT public.master_defer_delete_shards(); + master_defer_delete_shards +--------------------------------------------------------------------- + 0 +(1 row) + SELECT * FROM public.table_placements_per_node; nodeport | logicalrelid | count --------------------------------------------------------------------- @@ -1569,6 +1758,12 @@ NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ... (1 row) +SELECT public.master_defer_delete_shards(); + master_defer_delete_shards +--------------------------------------------------------------------- + 4 +(1 row) + SELECT * FROM public.table_placements_per_node; nodeport | logicalrelid | count --------------------------------------------------------------------- @@ -1663,6 +1858,12 @@ NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ... (1 row) +SELECT public.master_defer_delete_shards(); + master_defer_delete_shards +--------------------------------------------------------------------- + 3 +(1 row) + SELECT * FROM public.table_placements_per_node; nodeport | logicalrelid | count --------------------------------------------------------------------- @@ -1686,6 +1887,12 @@ SELECT * FROM rebalance_table_shards('tab', shard_transfer_mode:='block_writes') (1 row) +SELECT public.master_defer_delete_shards(); + master_defer_delete_shards +--------------------------------------------------------------------- + 0 +(1 row) + SELECT * FROM public.table_placements_per_node; nodeport | logicalrelid | count --------------------------------------------------------------------- @@ -1735,6 +1942,12 @@ NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ... 
(1 row) +SELECT public.master_defer_delete_shards(); + master_defer_delete_shards +--------------------------------------------------------------------- + 4 +(1 row) + SELECT * FROM public.table_placements_per_node; nodeport | logicalrelid | count --------------------------------------------------------------------- @@ -1759,8 +1972,20 @@ SELECT * FROM get_rebalance_table_shards_plan('tab', rebalance_strategy := 'non_ ERROR: could not find rebalance strategy with name non_existing SELECT * FROM rebalance_table_shards('tab', rebalance_strategy := 'non_existing'); ERROR: could not find rebalance strategy with name non_existing +SELECT public.master_defer_delete_shards(); + master_defer_delete_shards +--------------------------------------------------------------------- + 0 +(1 row) + SELECT * FROM master_drain_node('localhost', :worker_2_port, rebalance_strategy := 'non_existing'); ERROR: could not find rebalance strategy with name non_existing +SELECT public.master_defer_delete_shards(); + master_defer_delete_shards +--------------------------------------------------------------------- + 0 +(1 row) + SELECT citus_set_default_rebalance_strategy('non_existing'); ERROR: strategy with specified name does not exist UPDATE pg_dist_rebalance_strategy SET default_strategy=false; @@ -1768,8 +1993,20 @@ SELECT * FROM get_rebalance_table_shards_plan('tab'); ERROR: no rebalance_strategy was provided, but there is also no default strategy set SELECT * FROM rebalance_table_shards('tab'); ERROR: no rebalance_strategy was provided, but there is also no default strategy set +SELECT public.master_defer_delete_shards(); + master_defer_delete_shards +--------------------------------------------------------------------- + 0 +(1 row) + SELECT * FROM master_drain_node('localhost', :worker_2_port); ERROR: no rebalance_strategy was provided, but there is also no default strategy set +SELECT public.master_defer_delete_shards(); + master_defer_delete_shards +--------------------------------------------------------------------- + 0 +(1 row) + UPDATE pg_dist_rebalance_strategy SET default_strategy=true WHERE name='by_shard_count'; CREATE OR REPLACE FUNCTION shard_cost_no_arguments() RETURNS real AS $$ SELECT 1.0::real $$ LANGUAGE sql; @@ -2041,6 +2278,12 @@ SELECT rebalance_table_shards('rebalance_test_table', shard_transfer_mode:='bloc (1 row) +SELECT public.master_defer_delete_shards(); + master_defer_delete_shards +--------------------------------------------------------------------- + 3 +(1 row) + SELECT count(*) FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement WHERE logicalrelid = 'ref_table'::regclass; count --------------------------------------------------------------------- @@ -2102,6 +2345,12 @@ SELECT rebalance_table_shards(); (1 row) +SELECT public.master_defer_delete_shards(); + master_defer_delete_shards +--------------------------------------------------------------------- + 2 +(1 row) + DROP TABLE t1, r1, r2; -- verify there are no distributed tables before we perform the following tests. Preceding -- test suites should clean up their distributed tables. 
@@ -2150,6 +2399,12 @@ SELECT rebalance_table_shards(); (1 row) +SELECT public.master_defer_delete_shards(); + master_defer_delete_shards +--------------------------------------------------------------------- + 0 +(1 row) + -- verify the reference table is on all nodes after the rebalance SELECT count(*) FROM pg_dist_shard diff --git a/src/test/regress/pg_regress_multi.pl b/src/test/regress/pg_regress_multi.pl index a2c5bb252..c95bf9e2f 100755 --- a/src/test/regress/pg_regress_multi.pl +++ b/src/test/regress/pg_regress_multi.pl @@ -442,6 +442,7 @@ push(@pgOptions, "wal_retrieve_retry_interval=1000"); push(@pgOptions, "citus.shard_count=4"); push(@pgOptions, "citus.max_adaptive_executor_pool_size=4"); push(@pgOptions, "citus.shard_max_size=1500kB"); +push(@pgOptions, "citus.defer_shard_delete_interval=-1"); push(@pgOptions, "citus.repartition_join_bucket_count_per_node=2"); push(@pgOptions, "citus.sort_returning='on'"); push(@pgOptions, "citus.shard_replication_factor=2"); diff --git a/src/test/regress/spec/isolation_blocking_move_multi_shard_commands.spec b/src/test/regress/spec/isolation_blocking_move_multi_shard_commands.spec index ba534046b..45f5fed84 100644 --- a/src/test/regress/spec/isolation_blocking_move_multi_shard_commands.spec +++ b/src/test/regress/spec/isolation_blocking_move_multi_shard_commands.spec @@ -7,7 +7,8 @@ setup SELECT citus_internal.refresh_isolation_tester_prepared_statement(); SET citus.shard_count TO 8; - SET citus.shard_replication_factor TO 1; + SET citus.shard_replication_factor TO 1; + CREATE TABLE logical_replicate_placement (x int PRIMARY KEY, y int); SELECT create_distributed_table('logical_replicate_placement', 'x'); @@ -33,7 +34,7 @@ step "s1-begin" step "s1-move-placement" { - SELECT master_move_shard_placement(get_shard_id_for_distribution_column, 'localhost', 57637, 'localhost', 57638, shard_transfer_mode:='block_writes') FROM selected_shard; + SELECT master_move_shard_placement(get_shard_id_for_distribution_column, 'localhost', 57637, 'localhost', 57638, shard_transfer_mode:='block_writes') FROM selected_shard; } step "s1-end" @@ -53,7 +54,7 @@ step "s1-insert" step "s1-get-shard-distribution" { - select nodeport from pg_dist_placement inner join pg_dist_node on(pg_dist_placement.groupid = pg_dist_node.groupid) where shardid in (SELECT * FROM selected_shard) order by nodeport; + select nodeport from pg_dist_placement inner join pg_dist_node on(pg_dist_placement.groupid = pg_dist_node.groupid) where shardstate != 4 and shardid in (SELECT * FROM selected_shard) order by nodeport; } session "s2" diff --git a/src/test/regress/spec/isolation_blocking_move_multi_shard_commands_on_mx.spec b/src/test/regress/spec/isolation_blocking_move_multi_shard_commands_on_mx.spec index ac26a5f2c..6f653ab54 100644 --- a/src/test/regress/spec/isolation_blocking_move_multi_shard_commands_on_mx.spec +++ b/src/test/regress/spec/isolation_blocking_move_multi_shard_commands_on_mx.spec @@ -34,6 +34,7 @@ setup SET citus.replication_model to streaming; SET citus.shard_replication_factor TO 1; + SET citus.shard_count TO 8; SET citus.shard_replication_factor TO 1; CREATE TABLE logical_replicate_placement (x int PRIMARY KEY, y int); @@ -60,7 +61,7 @@ step "s1-begin" step "s1-move-placement" { - SELECT master_move_shard_placement(get_shard_id_for_distribution_column, 'localhost', 57637, 'localhost', 57638, shard_transfer_mode:='block_writes') FROM selected_shard; + SELECT master_move_shard_placement(get_shard_id_for_distribution_column, 'localhost', 57637, 'localhost', 57638, 
shard_transfer_mode:='block_writes') FROM selected_shard; } step "s1-commit" @@ -80,7 +81,7 @@ step "s1-insert" step "s1-get-shard-distribution" { - select nodeport from pg_dist_placement inner join pg_dist_node on(pg_dist_placement.groupid = pg_dist_node.groupid) where shardid in (SELECT * FROM selected_shard) order by nodeport; + select nodeport from pg_dist_placement inner join pg_dist_node on(pg_dist_placement.groupid = pg_dist_node.groupid) where shardstate != 4 and shardid in (SELECT * FROM selected_shard) order by nodeport; } session "s2" diff --git a/src/test/regress/spec/isolation_blocking_move_single_shard_commands.spec b/src/test/regress/spec/isolation_blocking_move_single_shard_commands.spec index f1250010f..f125904c0 100644 --- a/src/test/regress/spec/isolation_blocking_move_single_shard_commands.spec +++ b/src/test/regress/spec/isolation_blocking_move_single_shard_commands.spec @@ -7,6 +7,7 @@ setup SET citus.shard_count TO 8; SET citus.shard_replication_factor TO 1; + CREATE TABLE logical_replicate_placement (x int PRIMARY KEY, y int); SELECT create_distributed_table('logical_replicate_placement', 'x'); @@ -31,7 +32,7 @@ step "s1-begin" step "s1-move-placement" { - SELECT master_move_shard_placement((SELECT * FROM selected_shard), 'localhost', 57637, 'localhost', 57638, shard_transfer_mode:='block_writes'); + SELECT master_move_shard_placement((SELECT * FROM selected_shard), 'localhost', 57637, 'localhost', 57638, shard_transfer_mode:='block_writes'); } step "s1-end" @@ -51,7 +52,7 @@ step "s1-insert" step "s1-get-shard-distribution" { - select nodeport from pg_dist_placement inner join pg_dist_node on(pg_dist_placement.groupid = pg_dist_node.groupid) where shardid in (SELECT * FROM selected_shard) order by nodeport; + select nodeport from pg_dist_placement inner join pg_dist_node on(pg_dist_placement.groupid = pg_dist_node.groupid) where shardstate != 4 and shardid in (SELECT * FROM selected_shard) order by nodeport; } session "s2" diff --git a/src/test/regress/spec/isolation_blocking_move_single_shard_commands_on_mx.spec b/src/test/regress/spec/isolation_blocking_move_single_shard_commands_on_mx.spec index d0a3f323f..20746af7e 100644 --- a/src/test/regress/spec/isolation_blocking_move_single_shard_commands_on_mx.spec +++ b/src/test/regress/spec/isolation_blocking_move_single_shard_commands_on_mx.spec @@ -33,6 +33,7 @@ setup SET citus.replication_model to streaming; SET citus.shard_replication_factor TO 1; + SET citus.shard_count TO 8; CREATE TABLE logical_replicate_placement (x int PRIMARY KEY, y int); SELECT create_distributed_table('logical_replicate_placement', 'x'); @@ -78,7 +79,7 @@ step "s1-insert" step "s1-get-shard-distribution" { - select nodeport from pg_dist_placement inner join pg_dist_node on(pg_dist_placement.groupid = pg_dist_node.groupid) where shardid in (SELECT * FROM selected_shard) order by nodeport; + select nodeport from pg_dist_placement inner join pg_dist_node on(pg_dist_placement.groupid = pg_dist_node.groupid) where shardstate != 4 and shardid in (SELECT * FROM selected_shard) order by nodeport; } session "s2" diff --git a/src/test/regress/spec/isolation_rebalancer_deferred_drop.spec b/src/test/regress/spec/isolation_rebalancer_deferred_drop.spec index 554068822..ee225f13a 100644 --- a/src/test/regress/spec/isolation_rebalancer_deferred_drop.spec +++ b/src/test/regress/spec/isolation_rebalancer_deferred_drop.spec @@ -36,7 +36,6 @@ COMMENT ON FUNCTION master_defer_delete_shards() SET citus.next_shard_id to 120000; SET citus.shard_count TO 8; SET 
citus.shard_replication_factor TO 1; - SET citus.defer_drop_after_shard_move TO ON; CREATE TABLE t1 (x int PRIMARY KEY, y int); SELECT create_distributed_table('t1', 'x'); @@ -61,8 +60,13 @@ step "s1-begin" step "s1-move-placement" { - SET citus.defer_drop_after_shard_move TO ON; - SELECT master_move_shard_placement((SELECT * FROM selected_shard), 'localhost', 57637, 'localhost', 57638); + SELECT master_move_shard_placement((SELECT * FROM selected_shard), 'localhost', 57637, 'localhost', 57638); +} + +step "s1-move-placement-without-deferred" { + SET citus.defer_drop_after_shard_move TO OFF; + SELECT master_move_shard_placement((SELECT * FROM selected_shard), 'localhost', 57637, 'localhost', 57638); + } step "s1-drop-marked-shards" @@ -81,6 +85,10 @@ step "s1-commit" session "s2" +step "s2-begin" { + BEGIN; +} + step "s2-drop-old-shards" { SELECT run_try_drop_marked_shards(); } @@ -101,15 +109,24 @@ step "s2-lock-table-on-worker" SELECT run_commands_on_session_level_connection_to_node('LOCK TABLE t1_120000'); } +step "s2-select" { + SELECT COUNT(*) FROM t1; +} + step "s2-drop-marked-shards" { SET client_min_messages to DEBUG1; SELECT public.master_defer_delete_shards(); } +step "s2-commit" { + COMMIT; +} + permutation "s1-begin" "s1-move-placement" "s1-drop-marked-shards" "s2-drop-marked-shards" "s1-commit" permutation "s1-begin" "s1-move-placement" "s2-drop-marked-shards" "s1-drop-marked-shards" "s1-commit" permutation "s1-begin" "s1-move-placement" "s2-start-session-level-connection" "s2-lock-table-on-worker" "s1-drop-marked-shards" "s1-commit" "s2-stop-connection" // make sure we error if we cannot get the lock on pg_dist_placement permutation "s1-begin" "s1-lock-pg-dist-placement" "s2-drop-old-shards" "s1-commit" +permutation "s1-begin" "s2-begin" "s2-select" "s1-move-placement-without-deferred" "s2-commit" "s1-commit" diff --git a/src/test/regress/sql/foreign_key_to_reference_shard_rebalance.sql b/src/test/regress/sql/foreign_key_to_reference_shard_rebalance.sql index 390ad7357..6df09f061 100644 --- a/src/test/regress/sql/foreign_key_to_reference_shard_rebalance.sql +++ b/src/test/regress/sql/foreign_key_to_reference_shard_rebalance.sql @@ -8,6 +8,7 @@ SET search_path to fkey_to_reference_shard_rebalance; SET citus.shard_replication_factor TO 1; SET citus.shard_count to 8; + CREATE TYPE foreign_details AS (name text, relid text, refd_relid text); CREATE VIEW table_fkeys_in_workers AS @@ -44,12 +45,14 @@ SELECT master_move_shard_placement(15000009, 'localhost', :worker_1_port, 'local SELECT count(*) FROM referencing_table2; +SELECT 1 FROM public.master_defer_delete_shards(); SELECT * FROM table_fkeys_in_workers WHERE relid LIKE 'fkey_to_reference_shard_rebalance.%' AND refd_relid LIKE 'fkey_to_reference_shard_rebalance.%' ORDER BY 1,2,3; SELECT master_move_shard_placement(15000009, 'localhost', :worker_2_port, 'localhost', :worker_1_port, 'block_writes'); SELECT count(*) FROM referencing_table2; +SELECT 1 FROM public.master_defer_delete_shards(); SELECT * FROM table_fkeys_in_workers WHERE relid LIKE 'fkey_to_reference_shard_rebalance.%' AND refd_relid LIKE 'fkey_to_reference_shard_rebalance.%' ORDER BY 1,2,3; -- create a function to show the diff --git a/src/test/regress/sql/multi_colocated_shard_rebalance.sql b/src/test/regress/sql/multi_colocated_shard_rebalance.sql index 46de57776..0c6d5da17 100644 --- a/src/test/regress/sql/multi_colocated_shard_rebalance.sql +++ b/src/test/regress/sql/multi_colocated_shard_rebalance.sql @@ -7,6 +7,7 @@ ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq 
RESTART 13000000; SET citus.shard_count TO 6; SET citus.shard_replication_factor TO 1; + -- create distributed tables CREATE TABLE table1_group1 ( id int PRIMARY KEY); SELECT create_distributed_table('table1_group1', 'id', 'hash'); @@ -59,6 +60,7 @@ SELECT "Column", "Type", "Modifiers" FROM table_desc WHERE relid='public.table1_ SELECT "Column", "Type", "Modifiers" FROM table_desc WHERE relid='public.table2_group1_13000006'::regclass; \c - - - :master_port + -- copy colocated shards again to see error message SELECT master_copy_shard_placement(13000000, 'localhost', :worker_1_port, 'localhost', :worker_2_port, false, 'force_logical'); @@ -140,6 +142,7 @@ WHERE p.logicalrelid = s.logicalrelid AND s.shardid = sp.shardid AND colocationid = (SELECT colocationid FROM pg_dist_partition WHERE logicalrelid = 'table1_group1'::regclass) + AND sp.shardstate != 4 ORDER BY s.shardid, sp.nodeport; -- also connect worker to verify we successfully moved given shard (and other colocated shards) @@ -149,6 +152,7 @@ SELECT "Column", "Type", "Modifiers" FROM table_desc WHERE relid='public.table2_ \c - - - :master_port + -- test moving NOT colocated shard -- status before shard move SELECT s.shardid, s.logicalrelid::regclass, sp.nodeport @@ -158,6 +162,7 @@ WHERE p.logicalrelid = s.logicalrelid AND s.shardid = sp.shardid AND p.logicalrelid = 'table5_groupX'::regclass + AND sp.shardstate != 4 ORDER BY s.shardid, sp.nodeport; -- move NOT colocated shard @@ -170,7 +175,8 @@ FROM WHERE p.logicalrelid = s.logicalrelid AND s.shardid = sp.shardid AND - p.logicalrelid = 'table5_groupX'::regclass + p.logicalrelid = 'table5_groupX'::regclass AND + sp.shardstate != 4 ORDER BY s.shardid, sp.nodeport; @@ -183,6 +189,7 @@ WHERE p.logicalrelid = s.logicalrelid AND s.shardid = sp.shardid AND p.logicalrelid = 'table6_append'::regclass + AND sp.shardstate != 4 ORDER BY s.shardid, sp.nodeport; -- move shard in append distributed table @@ -195,7 +202,8 @@ FROM WHERE p.logicalrelid = s.logicalrelid AND s.shardid = sp.shardid AND - p.logicalrelid = 'table6_append'::regclass + p.logicalrelid = 'table6_append'::regclass AND + sp.shardstate != 4 ORDER BY s.shardid, sp.nodeport; @@ -228,6 +236,7 @@ WHERE p.logicalrelid = s.logicalrelid AND s.shardid = sp.shardid AND colocationid = (SELECT colocationid FROM pg_dist_partition WHERE logicalrelid = 'table1_group1'::regclass) + AND sp.shardstate != 4 ORDER BY s.shardid, sp.nodeport; SELECT master_move_shard_placement(13000022, 'localhost', :worker_1_port, 'localhost', :worker_2_port, 'block_writes'); @@ -240,6 +249,7 @@ WHERE p.logicalrelid = s.logicalrelid AND s.shardid = sp.shardid AND colocationid = (SELECT colocationid FROM pg_dist_partition WHERE logicalrelid = 'table1_group1'::regclass) + AND sp.shardstate != 4 ORDER BY s.shardid, sp.nodeport; -- also connect worker to verify we successfully moved given shard (and other colocated shards) @@ -254,6 +264,7 @@ SELECT "Constraint", "Definition" FROM table_fkeys \c - - - :master_port + -- test shard copy with foreign constraints -- we expect it to error out because we do not support foreign constraints with replication factor > 1 SELECT master_copy_shard_placement(13000022, 'localhost', :worker_2_port, 'localhost', :worker_1_port, false); @@ -305,6 +316,7 @@ SELECT count(*) FROM move_partitions.events; SELECT master_move_shard_placement(shardid, 'localhost', :worker_2_port, 'localhost', :worker_1_port) FROM pg_dist_shard JOIN pg_dist_shard_placement USING (shardid) WHERE logicalrelid = 'move_partitions.events'::regclass AND nodeport = 
:worker_2_port +AND shardstate != 4 ORDER BY shardid LIMIT 1; SELECT count(*) FROM move_partitions.events; @@ -315,7 +327,7 @@ ALTER TABLE move_partitions.events_1 ADD CONSTRAINT e_1_pk PRIMARY KEY (id); -- should be able to move automatically now SELECT master_move_shard_placement(shardid, 'localhost', :worker_2_port, 'localhost', :worker_1_port) FROM pg_dist_shard JOIN pg_dist_shard_placement USING (shardid) -WHERE logicalrelid = 'move_partitions.events'::regclass AND nodeport = :worker_2_port +WHERE logicalrelid = 'move_partitions.events'::regclass AND nodeport = :worker_2_port AND shardstate != 4 ORDER BY shardid LIMIT 1; SELECT count(*) FROM move_partitions.events; @@ -323,7 +335,7 @@ SELECT count(*) FROM move_partitions.events; -- should also be able to move with block writes SELECT master_move_shard_placement(shardid, 'localhost', :worker_2_port, 'localhost', :worker_1_port, 'block_writes') FROM pg_dist_shard JOIN pg_dist_shard_placement USING (shardid) -WHERE logicalrelid = 'move_partitions.events'::regclass AND nodeport = :worker_2_port +WHERE logicalrelid = 'move_partitions.events'::regclass AND nodeport = :worker_2_port AND shardstate != 4 ORDER BY shardid LIMIT 1; SELECT count(*) FROM move_partitions.events; diff --git a/src/test/regress/sql/multi_move_mx.sql b/src/test/regress/sql/multi_move_mx.sql index c317a08d7..35432d95d 100644 --- a/src/test/regress/sql/multi_move_mx.sql +++ b/src/test/regress/sql/multi_move_mx.sql @@ -87,9 +87,10 @@ SELECT FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement WHERE - logicalrelid = 'mx_table_1'::regclass + (logicalrelid = 'mx_table_1'::regclass OR logicalrelid = 'mx_table_2'::regclass - OR logicalrelid = 'mx_table_3'::regclass + OR logicalrelid = 'mx_table_3'::regclass) + AND shardstate != 4 ORDER BY logicalrelid, shardid; diff --git a/src/test/regress/sql/shard_move_deferred_delete.sql b/src/test/regress/sql/shard_move_deferred_delete.sql index 1b5ecb5a5..ac82b0680 100644 --- a/src/test/regress/sql/shard_move_deferred_delete.sql +++ b/src/test/regress/sql/shard_move_deferred_delete.sql @@ -70,6 +70,9 @@ SELECT run_command_on_workers($cmd$ SELECT count(*) FROM pg_class WHERE relname = 't1_20000000'; $cmd$); +-- we expect to get an error since the old placement is still there +SELECT master_move_shard_placement(20000000, 'localhost', :worker_2_port, 'localhost', :worker_1_port); + SELECT run_command_on_workers($cmd$ -- override the function for testing purpose @@ -95,8 +98,6 @@ set citus.check_available_space_before_move to false; SELECT master_move_shard_placement(20000001, 'localhost', :worker_2_port, 'localhost', :worker_1_port); ROLLBACK; - --- we expect shard 0 to be on both of the workers SELECT run_command_on_workers($cmd$ SELECT count(*) FROM pg_class WHERE relname = 't1_20000000'; $cmd$); diff --git a/src/test/regress/sql/shard_rebalancer.sql b/src/test/regress/sql/shard_rebalancer.sql index 27146caaa..ba5696e00 100644 --- a/src/test/regress/sql/shard_rebalancer.sql +++ b/src/test/regress/sql/shard_rebalancer.sql @@ -13,7 +13,10 @@ SELECT 1 FROM master_add_node('localhost', :master_port, groupId=>0); -- should just be noops even if we add the coordinator to the pg_dist_node SELECT rebalance_table_shards('dist_table_test'); +SELECT public.master_defer_delete_shards(); SELECT rebalance_table_shards(); +SELECT public.master_defer_delete_shards(); + -- test that calling rebalance_table_shards without specifying relation -- wouldn't move shard of the citus local table. 
@@ -22,6 +25,7 @@ SELECT citus_add_local_table_to_metadata('citus_local_table'); INSERT INTO citus_local_table VALUES (1, 2); SELECT rebalance_table_shards(); +SELECT public.master_defer_delete_shards(); -- show that citus local table shard is still on the coordinator SELECT tablename FROM pg_catalog.pg_tables where tablename like 'citus_local_table_%'; @@ -34,12 +38,14 @@ SELECT pg_reload_conf(); SELECT pg_sleep(.1); -- wait to make sure the config has changed before running the GUC SELECT master_drain_node('localhost', :master_port); +SELECT public.master_defer_delete_shards(); ALTER SYSTEM RESET citus.local_hostname; SELECT pg_reload_conf(); SELECT pg_sleep(.1); -- wait to make sure the config has changed before running the GUC SELECT master_drain_node('localhost', :master_port); +SELECT public.master_defer_delete_shards(); -- show that citus local table shard is still on the coordinator SELECT tablename FROM pg_catalog.pg_tables where tablename like 'citus_local_table_%'; @@ -286,6 +292,7 @@ SELECT master_create_distributed_table('replication_test_table', 'int_column', ' CREATE VIEW replication_test_table_placements_per_node AS SELECT count(*) FROM pg_dist_shard_placement NATURAL JOIN pg_dist_shard WHERE logicalrelid = 'replication_test_table'::regclass + AND shardstate != 4 GROUP BY nodename, nodeport ORDER BY nodename, nodeport; @@ -364,6 +371,7 @@ SELECT master_create_distributed_table('rebalance_test_table', 'int_column', 'ap CREATE VIEW table_placements_per_node AS SELECT nodeport, logicalrelid::regclass, count(*) FROM pg_dist_shard_placement NATURAL JOIN pg_dist_shard +WHERE shardstate != 4 GROUP BY logicalrelid::regclass, nodename, nodeport ORDER BY logicalrelid::regclass, nodename, nodeport; @@ -386,6 +394,7 @@ AS $$ pg_dist_shard_placement src USING (shardid), (SELECT nodename, nodeport FROM pg_dist_shard_placement ORDER BY nodeport DESC LIMIT 1) dst WHERE src.nodeport < dst.nodeport AND s.logicalrelid = rel::regclass; + SELECT public.master_defer_delete_shards(); $$; CALL create_unbalanced_shards('rebalance_test_table'); @@ -417,6 +426,7 @@ FROM ( FROM pg_dist_shard WHERE logicalrelid = 'rebalance_test_table'::regclass ) T; +SELECT public.master_defer_delete_shards(); ALTER SYSTEM RESET citus.local_hostname; SELECT pg_reload_conf(); @@ -433,6 +443,7 @@ FROM ( FROM pg_dist_shard WHERE logicalrelid = 'rebalance_test_table'::regclass ) T; +SELECT public.master_defer_delete_shards(); SELECT * FROM table_placements_per_node; @@ -467,22 +478,26 @@ SELECT rebalance_table_shards('rebalance_test_table', RESET ROLE; -- Confirm no moves took place at all during these errors SELECT * FROM table_placements_per_node; +SELECT public.master_defer_delete_shards(); SELECT rebalance_table_shards('rebalance_test_table', threshold := 0, max_shard_moves := 1, shard_transfer_mode:='block_writes'); +SELECT public.master_defer_delete_shards(); SELECT * FROM table_placements_per_node; -- Check that threshold=1 doesn't move any shards SELECT rebalance_table_shards('rebalance_test_table', threshold := 1, shard_transfer_mode:='block_writes'); +SELECT public.master_defer_delete_shards(); SELECT * FROM table_placements_per_node; -- Move the remaining shards using threshold=0 SELECT rebalance_table_shards('rebalance_test_table', threshold := 0); +SELECT public.master_defer_delete_shards(); SELECT * FROM table_placements_per_node; @@ -490,6 +505,7 @@ SELECT * FROM table_placements_per_node; -- any effects. 
SELECT rebalance_table_shards('rebalance_test_table', threshold := 0, shard_transfer_mode:='block_writes'); +SELECT public.master_defer_delete_shards(); SELECT * FROM table_placements_per_node; @@ -584,9 +600,11 @@ SELECT COUNT(*) FROM imbalanced_table; -- Try force_logical SELECT rebalance_table_shards('imbalanced_table', threshold:=0, shard_transfer_mode:='force_logical'); +SELECT public.master_defer_delete_shards(); -- Test rebalance operation SELECT rebalance_table_shards('imbalanced_table', threshold:=0, shard_transfer_mode:='block_writes'); +SELECT public.master_defer_delete_shards(); -- Confirm rebalance -- Shard counts in each node after rebalance @@ -613,6 +631,7 @@ SELECT create_distributed_table('colocated_rebalance_test', 'id'); SELECT master_move_shard_placement(shardid, 'localhost', :worker_2_port, 'localhost', 10000, 'block_writes') FROM pg_dist_shard_placement WHERE nodeport = :worker_2_port; +SELECT public.master_defer_delete_shards(); -- Try to move shards to a node where shards are not allowed SELECT * from master_set_node_property('localhost', :worker_1_port, 'shouldhaveshards', false); @@ -639,6 +658,7 @@ UPDATE pg_dist_node SET noderole = 'primary' WHERE nodeport = :worker_1_port; SELECT master_move_shard_placement(shardid, 'localhost', :worker_2_port, 'localhost', :worker_1_port, 'block_writes') FROM pg_dist_shard_placement WHERE nodeport = :worker_2_port; +SELECT public.master_defer_delete_shards(); SELECT create_distributed_table('colocated_rebalance_test2', 'id'); @@ -649,6 +669,7 @@ SELECT * FROM public.table_placements_per_node; SELECT * FROM get_rebalance_table_shards_plan('colocated_rebalance_test', threshold := 0, drain_only := true); -- Running with drain_only shouldn't do anything SELECT * FROM rebalance_table_shards('colocated_rebalance_test', threshold := 0, shard_transfer_mode := 'block_writes', drain_only := true); +SELECT public.master_defer_delete_shards(); -- Confirm that nothing changed SELECT * FROM public.table_placements_per_node; @@ -661,6 +682,7 @@ SELECT * FROM get_rebalance_table_shards_plan('colocated_rebalance_test', rebala SELECT * FROM get_rebalance_progress(); -- Actually do the rebalance SELECT * FROM rebalance_table_shards('colocated_rebalance_test', threshold := 0, shard_transfer_mode := 'block_writes'); +SELECT public.master_defer_delete_shards(); -- Check that we can call this function without a crash SELECT * FROM get_rebalance_progress(); @@ -678,18 +700,22 @@ SELECT * from master_set_node_property('localhost', :worker_2_port, 'shouldhaves SELECT * FROM get_rebalance_table_shards_plan('colocated_rebalance_test', threshold := 0); SELECT * FROM rebalance_table_shards('colocated_rebalance_test', threshold := 0, shard_transfer_mode := 'block_writes'); +SELECT public.master_defer_delete_shards(); SELECT * FROM public.table_placements_per_node; SELECT * FROM get_rebalance_table_shards_plan('non_colocated_rebalance_test', threshold := 0); SELECT * FROM rebalance_table_shards('non_colocated_rebalance_test', threshold := 0, shard_transfer_mode := 'block_writes'); +SELECT public.master_defer_delete_shards(); SELECT * FROM public.table_placements_per_node; -- Put shards back SELECT * from master_set_node_property('localhost', :worker_2_port, 'shouldhaveshards', true); SELECT * FROM rebalance_table_shards('colocated_rebalance_test', threshold := 0, shard_transfer_mode := 'block_writes'); +SELECT public.master_defer_delete_shards(); SELECT * FROM public.table_placements_per_node; SELECT * FROM 
rebalance_table_shards('non_colocated_rebalance_test', threshold := 0, shard_transfer_mode := 'block_writes'); +SELECT public.master_defer_delete_shards(); SELECT * FROM public.table_placements_per_node; -- testing behaviour when setting shouldhaveshards to false and rebalancing all @@ -697,11 +723,13 @@ SELECT * FROM public.table_placements_per_node; SELECT * from master_set_node_property('localhost', :worker_2_port, 'shouldhaveshards', false); SELECT * FROM get_rebalance_table_shards_plan(threshold := 0, drain_only := true); SELECT * FROM rebalance_table_shards(threshold := 0, shard_transfer_mode := 'block_writes', drain_only := true); +SELECT public.master_defer_delete_shards(); SELECT * FROM public.table_placements_per_node; -- Put shards back SELECT * from master_set_node_property('localhost', :worker_2_port, 'shouldhaveshards', true); SELECT * FROM rebalance_table_shards(threshold := 0, shard_transfer_mode := 'block_writes'); +SELECT public.master_defer_delete_shards(); SELECT * FROM public.table_placements_per_node; -- testing behaviour when setting shouldhaveshards to false and rebalancing all @@ -709,11 +737,13 @@ SELECT * FROM public.table_placements_per_node; SELECT * from master_set_node_property('localhost', :worker_2_port, 'shouldhaveshards', false); SELECT * FROM get_rebalance_table_shards_plan(threshold := 0); SELECT * FROM rebalance_table_shards(threshold := 0, shard_transfer_mode := 'block_writes'); +SELECT public.master_defer_delete_shards(); SELECT * FROM public.table_placements_per_node; -- Put shards back SELECT * from master_set_node_property('localhost', :worker_2_port, 'shouldhaveshards', true); SELECT * FROM rebalance_table_shards(threshold := 0, shard_transfer_mode := 'block_writes'); +SELECT public.master_defer_delete_shards(); SELECT * FROM public.table_placements_per_node; -- Make it a data node again @@ -721,12 +751,14 @@ SELECT * from master_set_node_property('localhost', :worker_2_port, 'shouldhaves -- testing behaviour of master_drain_node SELECT * from master_drain_node('localhost', :worker_2_port, shard_transfer_mode := 'block_writes'); +SELECT public.master_defer_delete_shards(); select shouldhaveshards from pg_dist_node where nodeport = :worker_2_port; SELECT * FROM public.table_placements_per_node; -- Put shards back SELECT * from master_set_node_property('localhost', :worker_2_port, 'shouldhaveshards', true); SELECT * FROM rebalance_table_shards(threshold := 0, shard_transfer_mode := 'block_writes'); +SELECT public.master_defer_delete_shards(); SELECT * FROM public.table_placements_per_node; @@ -795,12 +827,15 @@ SELECT * FROM get_rebalance_table_shards_plan('tab', rebalance_strategy := 'by_d SELECT * FROM get_rebalance_table_shards_plan('tab', rebalance_strategy := 'by_disk_size', threshold := 0); SELECT * FROM rebalance_table_shards('tab', shard_transfer_mode:='block_writes'); +SELECT public.master_defer_delete_shards(); SELECT * FROM public.table_placements_per_node; SELECT * FROM rebalance_table_shards('tab', rebalance_strategy := 'by_disk_size', shard_transfer_mode:='block_writes'); +SELECT public.master_defer_delete_shards(); SELECT * FROM public.table_placements_per_node; SELECT * FROM rebalance_table_shards('tab', rebalance_strategy := 'by_disk_size', shard_transfer_mode:='block_writes', threshold := 0); +SELECT public.master_defer_delete_shards(); SELECT * FROM public.table_placements_per_node; -- Check that sizes of colocated tables are added together for rebalances @@ -851,6 +886,7 @@ SELECT * FROM get_rebalance_table_shards_plan('tab', 
rebalance_strategy := 'by_d -- supports improvement_threshold SELECT * FROM get_rebalance_table_shards_plan('tab', rebalance_strategy := 'by_disk_size', improvement_threshold := 0); SELECT * FROM rebalance_table_shards('tab', rebalance_strategy := 'by_disk_size', shard_transfer_mode:='block_writes'); +SELECT public.master_defer_delete_shards(); SELECT * FROM public.table_placements_per_node; ANALYZE tab, tab2; @@ -907,11 +943,13 @@ SELECT citus_add_rebalance_strategy( SELECT * FROM get_rebalance_table_shards_plan('tab', rebalance_strategy := 'capacity_high_worker_2'); SELECT * FROM rebalance_table_shards('tab', rebalance_strategy := 'capacity_high_worker_2', shard_transfer_mode:='block_writes'); +SELECT public.master_defer_delete_shards(); SELECT * FROM public.table_placements_per_node; SELECT citus_set_default_rebalance_strategy('capacity_high_worker_2'); SELECT * FROM get_rebalance_table_shards_plan('tab'); SELECT * FROM rebalance_table_shards('tab', shard_transfer_mode:='block_writes'); +SELECT public.master_defer_delete_shards(); SELECT * FROM public.table_placements_per_node; CREATE FUNCTION only_worker_1(shardid bigint, nodeidarg int) @@ -932,6 +970,7 @@ SELECT citus_add_rebalance_strategy( SELECT citus_set_default_rebalance_strategy('only_worker_1'); SELECT * FROM get_rebalance_table_shards_plan('tab'); SELECT * FROM rebalance_table_shards('tab', shard_transfer_mode:='block_writes'); +SELECT public.master_defer_delete_shards(); SELECT * FROM public.table_placements_per_node; SELECT citus_set_default_rebalance_strategy('by_shard_count'); @@ -940,14 +979,18 @@ SELECT * FROM get_rebalance_table_shards_plan('tab'); -- Check all the error handling cases SELECT * FROM get_rebalance_table_shards_plan('tab', rebalance_strategy := 'non_existing'); SELECT * FROM rebalance_table_shards('tab', rebalance_strategy := 'non_existing'); +SELECT public.master_defer_delete_shards(); SELECT * FROM master_drain_node('localhost', :worker_2_port, rebalance_strategy := 'non_existing'); +SELECT public.master_defer_delete_shards(); SELECT citus_set_default_rebalance_strategy('non_existing'); UPDATE pg_dist_rebalance_strategy SET default_strategy=false; SELECT * FROM get_rebalance_table_shards_plan('tab'); SELECT * FROM rebalance_table_shards('tab'); +SELECT public.master_defer_delete_shards(); SELECT * FROM master_drain_node('localhost', :worker_2_port); +SELECT public.master_defer_delete_shards(); UPDATE pg_dist_rebalance_strategy SET default_strategy=true WHERE name='by_shard_count'; CREATE OR REPLACE FUNCTION shard_cost_no_arguments() @@ -1172,6 +1215,7 @@ SELECT 1 FROM master_add_node('localhost', :master_port, groupId=>0); SELECT count(*) FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement WHERE logicalrelid = 'ref_table'::regclass; SELECT rebalance_table_shards('rebalance_test_table', shard_transfer_mode:='block_writes'); +SELECT public.master_defer_delete_shards(); SELECT count(*) FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement WHERE logicalrelid = 'ref_table'::regclass; @@ -1204,6 +1248,7 @@ INSERT INTO r2 VALUES (1,2), (3,4); SELECT 1 from master_add_node('localhost', :worker_2_port); SELECT rebalance_table_shards(); +SELECT public.master_defer_delete_shards(); DROP TABLE t1, r1, r2; @@ -1230,6 +1275,7 @@ WHERE logicalrelid = 'r1'::regclass; -- rebalance with _only_ a reference table, this should trigger the copy SELECT rebalance_table_shards(); +SELECT public.master_defer_delete_shards(); -- verify the reference table is on all nodes after the rebalance SELECT count(*)