From 60eb67b908911ff9a96536d8e5ae997f2c032399 Mon Sep 17 00:00:00 2001 From: Jelte Fennema Date: Mon, 17 Oct 2022 17:32:28 +0200 Subject: [PATCH] Increase shard move test coverage by improving advisory locks (#6429) To be able to test non-blocking shard moves we take an advisory lock, so we can pause the shard move at an interesting moment. Originally this was during the logical replication catch-up phase. But when I added tests for the rebalancer progress I moved this lock before the initial data copy. This allowed testing of the rebalance progress, but inadvertently made our non-blocking tests not actually test whether we held unintended locks during logical replication catch-up. This fixes that by creating two types of advisory locks, one before the copy and one after. This causes the tests to actually test their intended scenario again. Furthermore, it starts using one of these locks for blocking shard moves too, which allowed me to reduce the complexity of the rebalance progress test suite quite a bit. It also allowed enabling some flaky tests again, because this stopped them from being flaky. Finally, it allowed testing of rebalance progress for blocking shard copy operations as well. In passing, it fixes a flaky test during parallel blocking shard moves by ordering the output. 
--- .../distributed/operations/shard_split.c | 5 +- .../distributed/operations/shard_transfer.c | 2 + .../replication/multi_logical_replication.c | 57 +- .../distributed/multi_logical_replication.h | 3 +- .../isolation_shard_rebalancer_progress.out | 861 ++++++++++++------ .../isolation_shard_rebalancer_progress.spec | 141 ++- 6 files changed, 696 insertions(+), 373 deletions(-) diff --git a/src/backend/distributed/operations/shard_split.c b/src/backend/distributed/operations/shard_split.c index 84ee97cb6..14e250af9 100644 --- a/src/backend/distributed/operations/shard_split.c +++ b/src/backend/distributed/operations/shard_split.c @@ -610,12 +610,11 @@ BlockingShardSplit(SplitOperation splitOperation, /* For Blocking split, copy isn't snapshotted */ char *snapshotName = NULL; + ConflictWithIsolationTestingBeforeCopy(); DoSplitCopy(sourceShardNode, sourceColocatedShardIntervalList, shardGroupSplitIntervalListList, workersForPlacementList, snapshotName, distributionColumnOverrides); - - /* Used for testing */ - ConflictOnlyWithIsolationTesting(); + ConflictWithIsolationTestingAfterCopy(); ereport(LOG, (errmsg( "creating auxillary structures (indexes, stats, replicaindentities, triggers) for %s", diff --git a/src/backend/distributed/operations/shard_transfer.c b/src/backend/distributed/operations/shard_transfer.c index 6bab424d7..5d50417cc 100644 --- a/src/backend/distributed/operations/shard_transfer.c +++ b/src/backend/distributed/operations/shard_transfer.c @@ -1289,7 +1289,9 @@ CopyShardTablesViaBlockWrites(List *shardIntervalList, char *sourceNodeName, sourceNodePort, PLACEMENT_UPDATE_STATUS_COPYING_DATA); + ConflictWithIsolationTestingBeforeCopy(); CopyShardsToNode(sourceNode, targetNode, shardIntervalList, NULL); + ConflictWithIsolationTestingAfterCopy(); UpdatePlacementUpdateStatusForShardIntervalList( shardIntervalList, diff --git a/src/backend/distributed/replication/multi_logical_replication.c 
b/src/backend/distributed/replication/multi_logical_replication.c index 43a48c1c3..bb9c7d221 100644 --- a/src/backend/distributed/replication/multi_logical_replication.c +++ b/src/backend/distributed/replication/multi_logical_replication.c @@ -236,7 +236,7 @@ LogicallyReplicateShards(List *shardList, char *sourceNodeName, int sourceNodePo logicalRepTargetList); /* only useful for isolation testing, see the function comment for the details */ - ConflictOnlyWithIsolationTesting(); + ConflictWithIsolationTestingBeforeCopy(); /* * We have to create the primary key (or any other replica identity) @@ -427,7 +427,7 @@ CompleteNonBlockingShardTransfer(List *shardList, /* only useful for isolation testing, see the function comment for the details */ - ConflictOnlyWithIsolationTesting(); + ConflictWithIsolationTestingAfterCopy(); /* * We're almost done, we'll block the writes to the shards that we're @@ -1269,18 +1269,16 @@ CreateUncheckedForeignKeyConstraints(List *logicalRepTargetList) /* - * ConflictOnlyWithIsolationTesting is only useful for testing and should - * not be called by any code-path except for LogicallyReplicateShards(). - * - * Since logically replicating shards does eventually block modifications, - * it becomes tricky to use isolation tester to show concurrent behaviour - * of online shard rebalancing and modification queries. + * ConflictWithIsolationTestingBeforeCopy is only useful to test + * get_rebalance_progress by pausing before doing the actual copy. This way we + * can see the state of the tables at that point. This should not be called by + * any code-path except for code paths to move and split shards(). * * Note that since the cost of calling this function is pretty low, we prefer * to use it in non-assert builds as well not to diverge in the behaviour. 
*/ extern void -ConflictOnlyWithIsolationTesting() +ConflictWithIsolationTestingBeforeCopy(void) { LOCKTAG tag; const bool sessionLock = false; @@ -1288,11 +1286,46 @@ ConflictOnlyWithIsolationTesting() if (RunningUnderIsolationTest) { - /* we've picked random keys */ - SET_LOCKTAG_ADVISORY(tag, MyDatabaseId, SHARD_MOVE_ADVISORY_LOCK_FIRST_KEY, + SET_LOCKTAG_ADVISORY(tag, MyDatabaseId, + SHARD_MOVE_ADVISORY_LOCK_SECOND_KEY, + SHARD_MOVE_ADVISORY_LOCK_FIRST_KEY, 2); + + /* uses sharelock so concurrent moves don't conflict with eachother */ + (void) LockAcquire(&tag, ShareLock, sessionLock, dontWait); + } +} + + +/* + * ConflictWithIsolationTestingAfterCopy is only useful for two types of tests. + * 1. Testing the output of get_rebalance_progress after the copy is completed, + * but before the move is completely finished. Because finishing the move + * will clear the contents of get_rebalance_progress. + * 2. To test that our non-blocking shard moves/splits actually don't block + * writes. Since logically replicating shards does eventually block + * modifications, it becomes tricky to use isolation tester to show + * concurrent behaviour of online shard rebalancing and modification + * queries. So, during logical replication we call this function at + * the end of the catchup, right before blocking writes. + * + * Note that since the cost of calling this function is pretty low, we prefer + * to use it in non-assert builds as well not to diverge in the behaviour. 
+ */ +extern void +ConflictWithIsolationTestingAfterCopy(void) +{ + LOCKTAG tag; + const bool sessionLock = false; + const bool dontWait = false; + + if (RunningUnderIsolationTest) + { + SET_LOCKTAG_ADVISORY(tag, MyDatabaseId, + SHARD_MOVE_ADVISORY_LOCK_FIRST_KEY, SHARD_MOVE_ADVISORY_LOCK_SECOND_KEY, 2); - (void) LockAcquire(&tag, ExclusiveLock, sessionLock, dontWait); + /* uses sharelock so concurrent moves don't conflict with eachother */ + (void) LockAcquire(&tag, ShareLock, sessionLock, dontWait); } } diff --git a/src/include/distributed/multi_logical_replication.h b/src/include/distributed/multi_logical_replication.h index f042b4326..16ac4ac02 100644 --- a/src/include/distributed/multi_logical_replication.h +++ b/src/include/distributed/multi_logical_replication.h @@ -131,7 +131,8 @@ extern void LogicallyReplicateShards(List *shardList, char *sourceNodeName, int sourceNodePort, char *targetNodeName, int targetNodePort); -extern void ConflictOnlyWithIsolationTesting(void); +extern void ConflictWithIsolationTestingBeforeCopy(void); +extern void ConflictWithIsolationTestingAfterCopy(void); extern void CreateReplicaIdentities(List *subscriptionInfoList); extern void CreateReplicaIdentitiesOnNode(List *shardList, char *nodeName, diff --git a/src/test/regress/expected/isolation_shard_rebalancer_progress.out b/src/test/regress/expected/isolation_shard_rebalancer_progress.out index 2b29868c9..ba210da34 100644 --- a/src/test/regress/expected/isolation_shard_rebalancer_progress.out +++ b/src/test/regress/expected/isolation_shard_rebalancer_progress.out @@ -1,19 +1,20 @@ -Parsed test spec with 7 sessions +Parsed test spec with 6 sessions -starting permutation: s2-lock-1-start s1-rebalance-c1-block-writes s7-get-progress s2-unlock-1-start s1-commit s7-get-progress enable-deferred-drop +starting permutation: s5-acquire-advisory-lock-before-copy s1-rebalance-c1-block-writes s7-get-progress s5-release-advisory-lock s1-wait s7-get-progress master_set_node_property 
--------------------------------------------------------------------- (1 row) -step s2-lock-1-start: - BEGIN; - DELETE FROM colocated1 WHERE test_id = 1; - DELETE FROM separate WHERE test_id = 1; +step s5-acquire-advisory-lock-before-copy: + SELECT pg_advisory_lock(55152, 44000); + +pg_advisory_lock +--------------------------------------------------------------------- + +(1 row) step s1-rebalance-c1-block-writes: - BEGIN; - SELECT * FROM get_rebalance_table_shards_plan('colocated1'); SELECT rebalance_table_shards('colocated1', shard_transfer_mode:='block_writes'); step s7-get-progress: @@ -39,32 +40,27 @@ step s7-get-progress: table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type|lsn_sanity_check|source_lsn_available|target_lsn_available|status --------------------------------------------------------------------- -colocated1|1500001| 50000|localhost | 57637| 50000|localhost | 57638| 0| 1|move |t |t |f |Setting Up -colocated2|1500005| 400000|localhost | 57637| 400000|localhost | 57638| 0| 1|move |t |t |f |Setting Up +colocated1|1500001| 50000|localhost | 57637| 50000|localhost | 57638| 8000| 1|move |t |t |f |Copying Data +colocated2|1500005| 400000|localhost | 57637| 400000|localhost | 57638| 8000| 1|move |t |t |f |Copying Data colocated1|1500002| 200000|localhost | 57637| 200000|localhost | 57638| 0| 0|move |t |t |f |Not Started Yet colocated2|1500006| 8000|localhost | 57637| 8000|localhost | 57638| 0| 0|move |t |t |f |Not Started Yet (4 rows) -step s2-unlock-1-start: - ROLLBACK; +step s5-release-advisory-lock: + SELECT pg_advisory_unlock(55152, 44000); + +pg_advisory_unlock +--------------------------------------------------------------------- +t +(1 row) step s1-rebalance-c1-block-writes: <... 
completed> -table_name|shardid|shard_size|sourcename|sourceport|targetname|targetport ---------------------------------------------------------------------- -colocated1|1500001| 0|localhost | 57637|localhost | 57638 -colocated2|1500005| 0|localhost | 57637|localhost | 57638 -colocated1|1500002| 0|localhost | 57637|localhost | 57638 -colocated2|1500006| 0|localhost | 57637|localhost | 57638 -(4 rows) - rebalance_table_shards --------------------------------------------------------------------- (1 row) -step s1-commit: - COMMIT; - +step s1-wait: step s7-get-progress: set LOCAL client_min_messages=NOTICE; WITH possible_sizes(size) as (VALUES (0), (8000), (50000), (200000), (400000)) @@ -90,11 +86,8 @@ table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname --------------------------------------------------------------------- (0 rows) -step enable-deferred-drop: - ALTER SYSTEM RESET citus.defer_drop_after_shard_move; - -starting permutation: s3-lock-2-start s1-rebalance-c1-block-writes s7-get-progress s3-unlock-2-start s1-commit s7-get-progress enable-deferred-drop +starting permutation: s3-lock-2-start s1-rebalance-c1-block-writes s7-get-progress s3-unlock-2-start s1-wait s7-get-progress master_set_node_property --------------------------------------------------------------------- @@ -105,8 +98,6 @@ step s3-lock-2-start: DELETE FROM colocated1 WHERE test_id = 3; step s1-rebalance-c1-block-writes: - BEGIN; - SELECT * FROM get_rebalance_table_shards_plan('colocated1'); SELECT rebalance_table_shards('colocated1', shard_transfer_mode:='block_writes'); step s7-get-progress: @@ -132,8 +123,8 @@ step s7-get-progress: table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type|lsn_sanity_check|source_lsn_available|target_lsn_available|status --------------------------------------------------------------------- -colocated1|1500001| 50000|localhost | 57637| 0|localhost | 57638| 
50000| 2|move |t |t |f |Completed -colocated2|1500005| 400000|localhost | 57637| 0|localhost | 57638| 400000| 2|move |t |t |f |Completed +colocated1|1500001| 50000|localhost | 57637| 50000|localhost | 57638| 50000| 2|move |t |t |f |Completed +colocated2|1500005| 400000|localhost | 57637| 400000|localhost | 57638| 400000| 2|move |t |t |f |Completed colocated1|1500002| 200000|localhost | 57637| 200000|localhost | 57638| 0| 1|move |t |t |f |Setting Up colocated2|1500006| 8000|localhost | 57637| 8000|localhost | 57638| 0| 1|move |t |t |f |Setting Up (4 rows) @@ -142,22 +133,12 @@ step s3-unlock-2-start: ROLLBACK; step s1-rebalance-c1-block-writes: <... completed> -table_name|shardid|shard_size|sourcename|sourceport|targetname|targetport ---------------------------------------------------------------------- -colocated1|1500001| 0|localhost | 57637|localhost | 57638 -colocated2|1500005| 0|localhost | 57637|localhost | 57638 -colocated1|1500002| 0|localhost | 57637|localhost | 57638 -colocated2|1500006| 0|localhost | 57637|localhost | 57638 -(4 rows) - rebalance_table_shards --------------------------------------------------------------------- (1 row) -step s1-commit: - COMMIT; - +step s1-wait: step s7-get-progress: set LOCAL client_min_messages=NOTICE; WITH possible_sizes(size) as (VALUES (0), (8000), (50000), (200000), (400000)) @@ -183,35 +164,22 @@ table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname --------------------------------------------------------------------- (0 rows) -step enable-deferred-drop: - ALTER SYSTEM RESET citus.defer_drop_after_shard_move; - -starting permutation: s7-grab-lock s1-rebalance-c1-block-writes s7-get-progress s7-release-lock s1-commit s7-get-progress enable-deferred-drop +starting permutation: s6-acquire-advisory-lock-after-copy s1-rebalance-c1-block-writes s7-get-progress s6-release-advisory-lock s1-wait s7-get-progress master_set_node_property 
--------------------------------------------------------------------- (1 row) -step s7-grab-lock: - BEGIN; - SET LOCAL citus.max_adaptive_executor_pool_size = 1; - SELECT 1 FROM colocated1 LIMIT 1; - SELECT 1 FROM separate LIMIT 1; +step s6-acquire-advisory-lock-after-copy: + SELECT pg_advisory_lock(44000, 55152); -?column? +pg_advisory_lock --------------------------------------------------------------------- - 1 -(1 row) -?column? ---------------------------------------------------------------------- - 1 (1 row) step s1-rebalance-c1-block-writes: - BEGIN; - SELECT * FROM get_rebalance_table_shards_plan('colocated1'); SELECT rebalance_table_shards('colocated1', shard_transfer_mode:='block_writes'); step s7-get-progress: @@ -237,32 +205,27 @@ step s7-get-progress: table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type|lsn_sanity_check|source_lsn_available|target_lsn_available|status --------------------------------------------------------------------- -colocated1|1500001| 50000|localhost | 57637| 50000|localhost | 57638| 50000| 1|move |t |t |f |Completing -colocated2|1500005| 400000|localhost | 57637| 400000|localhost | 57638| 400000| 1|move |t |t |f |Completing +colocated1|1500001| 50000|localhost | 57637| 50000|localhost | 57638| 50000| 1|move |t |t |f |Copying Data +colocated2|1500005| 400000|localhost | 57637| 400000|localhost | 57638| 400000| 1|move |t |t |f |Copying Data colocated1|1500002| 200000|localhost | 57637| 200000|localhost | 57638| 0| 0|move |t |t |f |Not Started Yet colocated2|1500006| 8000|localhost | 57637| 8000|localhost | 57638| 0| 0|move |t |t |f |Not Started Yet (4 rows) -step s7-release-lock: - COMMIT; +step s6-release-advisory-lock: + SELECT pg_advisory_unlock(44000, 55152); + +pg_advisory_unlock +--------------------------------------------------------------------- +t +(1 row) step s1-rebalance-c1-block-writes: <... 
completed> -table_name|shardid|shard_size|sourcename|sourceport|targetname|targetport ---------------------------------------------------------------------- -colocated1|1500001| 0|localhost | 57637|localhost | 57638 -colocated2|1500005| 0|localhost | 57637|localhost | 57638 -colocated1|1500002| 0|localhost | 57637|localhost | 57638 -colocated2|1500006| 0|localhost | 57637|localhost | 57638 -(4 rows) - rebalance_table_shards --------------------------------------------------------------------- (1 row) -step s1-commit: - COMMIT; - +step s1-wait: step s7-get-progress: set LOCAL client_min_messages=NOTICE; WITH possible_sizes(size) as (VALUES (0), (8000), (50000), (200000), (400000)) @@ -288,18 +251,15 @@ table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname --------------------------------------------------------------------- (0 rows) -step enable-deferred-drop: - ALTER SYSTEM RESET citus.defer_drop_after_shard_move; - -starting permutation: s6-acquire-advisory-lock s1-rebalance-c1-online s7-get-progress s6-release-advisory-lock s1-commit s7-get-progress enable-deferred-drop +starting permutation: s5-acquire-advisory-lock-before-copy s1-rebalance-c1-online s7-get-progress s5-release-advisory-lock s1-wait s7-get-progress master_set_node_property --------------------------------------------------------------------- (1 row) -step s6-acquire-advisory-lock: - SELECT pg_advisory_lock(44000, 55152); +step s5-acquire-advisory-lock-before-copy: + SELECT pg_advisory_lock(55152, 44000); pg_advisory_lock --------------------------------------------------------------------- @@ -307,8 +267,6 @@ pg_advisory_lock (1 row) step s1-rebalance-c1-online: - BEGIN; - SELECT * FROM get_rebalance_table_shards_plan('colocated1'); SELECT rebalance_table_shards('colocated1', shard_transfer_mode:='force_logical'); step s7-get-progress: @@ -340,6 +298,93 @@ colocated1|1500002| 200000|localhost | 57637| 200000|localhost colocated2|1500006| 8000|localhost | 57637| 
8000|localhost | 57638| 0| 0|move |t |t |f |Not Started Yet (4 rows) +step s5-release-advisory-lock: + SELECT pg_advisory_unlock(55152, 44000); + +pg_advisory_unlock +--------------------------------------------------------------------- +t +(1 row) + +step s1-rebalance-c1-online: <... completed> +rebalance_table_shards +--------------------------------------------------------------------- + +(1 row) + +step s1-wait: +step s7-get-progress: + set LOCAL client_min_messages=NOTICE; + WITH possible_sizes(size) as (VALUES (0), (8000), (50000), (200000), (400000)) + SELECT + table_name, + shardid, + ( SELECT size FROM possible_sizes WHERE ABS(size - shard_size) = (SELECT MIN(ABS(size - shard_size)) FROM possible_sizes )) shard_size, + sourcename, + sourceport, + ( SELECT size FROM possible_sizes WHERE ABS(size - source_shard_size) = (SELECT MIN(ABS(size - source_shard_size)) FROM possible_sizes )) source_shard_size, + targetname, + targetport, + ( SELECT size FROM possible_sizes WHERE ABS(size - target_shard_size) = (SELECT MIN(ABS(size - target_shard_size)) FROM possible_sizes )) target_shard_size, + progress, + operation_type, + source_lsn >= target_lsn as lsn_sanity_check, + source_lsn > '0/0' as source_lsn_available, + target_lsn > '0/0' as target_lsn_available, + status + FROM get_rebalance_progress(); + +table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type|lsn_sanity_check|source_lsn_available|target_lsn_available|status +--------------------------------------------------------------------- +(0 rows) + + +starting permutation: s6-acquire-advisory-lock-after-copy s1-rebalance-c1-online s7-get-progress s6-release-advisory-lock s1-wait s7-get-progress +master_set_node_property +--------------------------------------------------------------------- + +(1 row) + +step s6-acquire-advisory-lock-after-copy: + SELECT pg_advisory_lock(44000, 55152); + +pg_advisory_lock 
+--------------------------------------------------------------------- + +(1 row) + +step s1-rebalance-c1-online: + SELECT rebalance_table_shards('colocated1', shard_transfer_mode:='force_logical'); + +step s7-get-progress: + set LOCAL client_min_messages=NOTICE; + WITH possible_sizes(size) as (VALUES (0), (8000), (50000), (200000), (400000)) + SELECT + table_name, + shardid, + ( SELECT size FROM possible_sizes WHERE ABS(size - shard_size) = (SELECT MIN(ABS(size - shard_size)) FROM possible_sizes )) shard_size, + sourcename, + sourceport, + ( SELECT size FROM possible_sizes WHERE ABS(size - source_shard_size) = (SELECT MIN(ABS(size - source_shard_size)) FROM possible_sizes )) source_shard_size, + targetname, + targetport, + ( SELECT size FROM possible_sizes WHERE ABS(size - target_shard_size) = (SELECT MIN(ABS(size - target_shard_size)) FROM possible_sizes )) target_shard_size, + progress, + operation_type, + source_lsn >= target_lsn as lsn_sanity_check, + source_lsn > '0/0' as source_lsn_available, + target_lsn > '0/0' as target_lsn_available, + status + FROM get_rebalance_progress(); + +table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type|lsn_sanity_check|source_lsn_available|target_lsn_available|status +--------------------------------------------------------------------- +colocated1|1500001| 50000|localhost | 57637| 50000|localhost | 57638| 50000| 1|move |t |t |t |Final Catchup +colocated2|1500005| 400000|localhost | 57637| 400000|localhost | 57638| 400000| 1|move |t |t |t |Final Catchup +colocated1|1500002| 200000|localhost | 57637| 200000|localhost | 57638| 0| 0|move |t |t |f |Not Started Yet +colocated2|1500006| 8000|localhost | 57637| 8000|localhost | 57638| 0| 0|move |t |t |f |Not Started Yet +(4 rows) + step s6-release-advisory-lock: SELECT pg_advisory_unlock(44000, 55152); @@ -349,22 +394,12 @@ t (1 row) step s1-rebalance-c1-online: <... 
completed> -table_name|shardid|shard_size|sourcename|sourceport|targetname|targetport ---------------------------------------------------------------------- -colocated1|1500001| 0|localhost | 57637|localhost | 57638 -colocated2|1500005| 0|localhost | 57637|localhost | 57638 -colocated1|1500002| 0|localhost | 57637|localhost | 57638 -colocated2|1500006| 0|localhost | 57637|localhost | 57638 -(4 rows) - rebalance_table_shards --------------------------------------------------------------------- (1 row) -step s1-commit: - COMMIT; - +step s1-wait: step s7-get-progress: set LOCAL client_min_messages=NOTICE; WITH possible_sizes(size) as (VALUES (0), (8000), (50000), (200000), (400000)) @@ -390,23 +425,22 @@ table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname --------------------------------------------------------------------- (0 rows) -step enable-deferred-drop: - ALTER SYSTEM RESET citus.defer_drop_after_shard_move; - -starting permutation: s2-lock-1-start s1-shard-move-c1-block-writes s7-get-progress s2-unlock-1-start s1-commit s7-get-progress enable-deferred-drop +starting permutation: s5-acquire-advisory-lock-before-copy s1-shard-move-c1-block-writes s7-get-progress s5-release-advisory-lock s1-wait s7-get-progress master_set_node_property --------------------------------------------------------------------- (1 row) -step s2-lock-1-start: - BEGIN; - DELETE FROM colocated1 WHERE test_id = 1; - DELETE FROM separate WHERE test_id = 1; +step s5-acquire-advisory-lock-before-copy: + SELECT pg_advisory_lock(55152, 44000); + +pg_advisory_lock +--------------------------------------------------------------------- + +(1 row) step s1-shard-move-c1-block-writes: - BEGIN; SELECT citus_move_shard_placement(1500001, 'localhost', 57637, 'localhost', 57638, shard_transfer_mode:='block_writes'); step s7-get-progress: @@ -432,12 +466,17 @@ step s7-get-progress: 
table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type|lsn_sanity_check|source_lsn_available|target_lsn_available|status --------------------------------------------------------------------- -colocated1|1500001| 50000|localhost | 57637| 50000|localhost | 57638| 0| 1|move |t |t |f |Setting Up -colocated2|1500005| 400000|localhost | 57637| 400000|localhost | 57638| 0| 1|move |t |t |f |Setting Up +colocated1|1500001| 50000|localhost | 57637| 50000|localhost | 57638| 8000| 1|move |t |t |f |Copying Data +colocated2|1500005| 400000|localhost | 57637| 400000|localhost | 57638| 8000| 1|move |t |t |f |Copying Data (2 rows) -step s2-unlock-1-start: - ROLLBACK; +step s5-release-advisory-lock: + SELECT pg_advisory_unlock(55152, 44000); + +pg_advisory_unlock +--------------------------------------------------------------------- +t +(1 row) step s1-shard-move-c1-block-writes: <... completed> citus_move_shard_placement @@ -445,9 +484,7 @@ citus_move_shard_placement (1 row) -step s1-commit: - COMMIT; - +step s1-wait: step s7-get-progress: set LOCAL client_min_messages=NOTICE; WITH possible_sizes(size) as (VALUES (0), (8000), (50000), (200000), (400000)) @@ -473,34 +510,22 @@ table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname --------------------------------------------------------------------- (0 rows) -step enable-deferred-drop: - ALTER SYSTEM RESET citus.defer_drop_after_shard_move; - -starting permutation: s7-grab-lock s1-shard-move-c1-block-writes s7-get-progress s7-release-lock s1-commit s7-get-progress enable-deferred-drop +starting permutation: s6-acquire-advisory-lock-after-copy s1-shard-move-c1-block-writes s7-get-progress s6-release-advisory-lock s1-wait s7-get-progress master_set_node_property --------------------------------------------------------------------- (1 row) -step s7-grab-lock: - BEGIN; - SET LOCAL citus.max_adaptive_executor_pool_size = 1; - SELECT 1 
FROM colocated1 LIMIT 1; - SELECT 1 FROM separate LIMIT 1; +step s6-acquire-advisory-lock-after-copy: + SELECT pg_advisory_lock(44000, 55152); -?column? +pg_advisory_lock --------------------------------------------------------------------- - 1 -(1 row) -?column? ---------------------------------------------------------------------- - 1 (1 row) step s1-shard-move-c1-block-writes: - BEGIN; SELECT citus_move_shard_placement(1500001, 'localhost', 57637, 'localhost', 57638, shard_transfer_mode:='block_writes'); step s7-get-progress: @@ -526,12 +551,17 @@ step s7-get-progress: table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type|lsn_sanity_check|source_lsn_available|target_lsn_available|status --------------------------------------------------------------------- -colocated1|1500001| 50000|localhost | 57637| 50000|localhost | 57638| 50000| 1|move |t |t |f |Completing -colocated2|1500005| 400000|localhost | 57637| 400000|localhost | 57638| 400000| 1|move |t |t |f |Completing +colocated1|1500001| 50000|localhost | 57637| 50000|localhost | 57638| 50000| 1|move |t |t |f |Copying Data +colocated2|1500005| 400000|localhost | 57637| 400000|localhost | 57638| 400000| 1|move |t |t |f |Copying Data (2 rows) -step s7-release-lock: - COMMIT; +step s6-release-advisory-lock: + SELECT pg_advisory_unlock(44000, 55152); + +pg_advisory_unlock +--------------------------------------------------------------------- +t +(1 row) step s1-shard-move-c1-block-writes: <... 
completed> citus_move_shard_placement @@ -539,9 +569,7 @@ citus_move_shard_placement (1 row) -step s1-commit: - COMMIT; - +step s1-wait: step s7-get-progress: set LOCAL client_min_messages=NOTICE; WITH possible_sizes(size) as (VALUES (0), (8000), (50000), (200000), (400000)) @@ -567,23 +595,22 @@ table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname --------------------------------------------------------------------- (0 rows) -step enable-deferred-drop: - ALTER SYSTEM RESET citus.defer_drop_after_shard_move; - -starting permutation: s2-lock-1-start s1-shard-copy-c1-block-writes s7-get-progress s2-unlock-1-start s1-commit +starting permutation: s5-acquire-advisory-lock-before-copy s1-shard-copy-c1-block-writes s7-get-progress s5-release-advisory-lock s1-wait s7-get-progress master_set_node_property --------------------------------------------------------------------- (1 row) -step s2-lock-1-start: - BEGIN; - DELETE FROM colocated1 WHERE test_id = 1; - DELETE FROM separate WHERE test_id = 1; +step s5-acquire-advisory-lock-before-copy: + SELECT pg_advisory_lock(55152, 44000); + +pg_advisory_lock +--------------------------------------------------------------------- + +(1 row) step s1-shard-copy-c1-block-writes: - BEGIN; UPDATE pg_dist_partition SET repmodel = 'c' WHERE logicalrelid IN ('colocated1', 'colocated2'); SELECT citus_copy_shard_placement(1500001, 'localhost', 57637, 'localhost', 57638, transfer_mode:='block_writes'); @@ -610,12 +637,17 @@ step s7-get-progress: table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type|lsn_sanity_check|source_lsn_available|target_lsn_available|status --------------------------------------------------------------------- -colocated1|1500001| 50000|localhost | 57637| 50000|localhost | 57638| 0| 1|copy |t |t |f |Setting Up -colocated2|1500005| 400000|localhost | 57637| 400000|localhost | 57638| 0| 1|copy |t |t |f |Setting Up 
+colocated1|1500001| 50000|localhost | 57637| 50000|localhost | 57638| 8000| 1|copy |t |t |f |Copying Data +colocated2|1500005| 400000|localhost | 57637| 400000|localhost | 57638| 8000| 1|copy |t |t |f |Copying Data (2 rows) -step s2-unlock-1-start: - ROLLBACK; +step s5-release-advisory-lock: + SELECT pg_advisory_unlock(55152, 44000); + +pg_advisory_unlock +--------------------------------------------------------------------- +t +(1 row) step s1-shard-copy-c1-block-writes: <... completed> citus_copy_shard_placement @@ -623,17 +655,40 @@ citus_copy_shard_placement (1 row) -step s1-commit: - COMMIT; +step s1-wait: +step s7-get-progress: + set LOCAL client_min_messages=NOTICE; + WITH possible_sizes(size) as (VALUES (0), (8000), (50000), (200000), (400000)) + SELECT + table_name, + shardid, + ( SELECT size FROM possible_sizes WHERE ABS(size - shard_size) = (SELECT MIN(ABS(size - shard_size)) FROM possible_sizes )) shard_size, + sourcename, + sourceport, + ( SELECT size FROM possible_sizes WHERE ABS(size - source_shard_size) = (SELECT MIN(ABS(size - source_shard_size)) FROM possible_sizes )) source_shard_size, + targetname, + targetport, + ( SELECT size FROM possible_sizes WHERE ABS(size - target_shard_size) = (SELECT MIN(ABS(size - target_shard_size)) FROM possible_sizes )) target_shard_size, + progress, + operation_type, + source_lsn >= target_lsn as lsn_sanity_check, + source_lsn > '0/0' as source_lsn_available, + target_lsn > '0/0' as target_lsn_available, + status + FROM get_rebalance_progress(); + +table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type|lsn_sanity_check|source_lsn_available|target_lsn_available|status +--------------------------------------------------------------------- +(0 rows) -starting permutation: s6-acquire-advisory-lock s1-shard-move-c1-online s7-get-progress s6-release-advisory-lock s1-commit s7-get-progress enable-deferred-drop +starting permutation: 
s6-acquire-advisory-lock-after-copy s1-shard-copy-c1-block-writes s7-get-progress s6-release-advisory-lock s1-wait s7-get-progress master_set_node_property --------------------------------------------------------------------- (1 row) -step s6-acquire-advisory-lock: +step s6-acquire-advisory-lock-after-copy: SELECT pg_advisory_lock(44000, 55152); pg_advisory_lock @@ -641,8 +696,93 @@ pg_advisory_lock (1 row) +step s1-shard-copy-c1-block-writes: + UPDATE pg_dist_partition SET repmodel = 'c' WHERE logicalrelid IN ('colocated1', 'colocated2'); + SELECT citus_copy_shard_placement(1500001, 'localhost', 57637, 'localhost', 57638, transfer_mode:='block_writes'); + +step s7-get-progress: + set LOCAL client_min_messages=NOTICE; + WITH possible_sizes(size) as (VALUES (0), (8000), (50000), (200000), (400000)) + SELECT + table_name, + shardid, + ( SELECT size FROM possible_sizes WHERE ABS(size - shard_size) = (SELECT MIN(ABS(size - shard_size)) FROM possible_sizes )) shard_size, + sourcename, + sourceport, + ( SELECT size FROM possible_sizes WHERE ABS(size - source_shard_size) = (SELECT MIN(ABS(size - source_shard_size)) FROM possible_sizes )) source_shard_size, + targetname, + targetport, + ( SELECT size FROM possible_sizes WHERE ABS(size - target_shard_size) = (SELECT MIN(ABS(size - target_shard_size)) FROM possible_sizes )) target_shard_size, + progress, + operation_type, + source_lsn >= target_lsn as lsn_sanity_check, + source_lsn > '0/0' as source_lsn_available, + target_lsn > '0/0' as target_lsn_available, + status + FROM get_rebalance_progress(); + +table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type|lsn_sanity_check|source_lsn_available|target_lsn_available|status +--------------------------------------------------------------------- +colocated1|1500001| 50000|localhost | 57637| 50000|localhost | 57638| 50000| 1|copy |t |t |f |Copying Data +colocated2|1500005| 400000|localhost | 57637| 
400000|localhost | 57638| 400000| 1|copy |t |t |f |Copying Data +(2 rows) + +step s6-release-advisory-lock: + SELECT pg_advisory_unlock(44000, 55152); + +pg_advisory_unlock +--------------------------------------------------------------------- +t +(1 row) + +step s1-shard-copy-c1-block-writes: <... completed> +citus_copy_shard_placement +--------------------------------------------------------------------- + +(1 row) + +step s1-wait: +step s7-get-progress: + set LOCAL client_min_messages=NOTICE; + WITH possible_sizes(size) as (VALUES (0), (8000), (50000), (200000), (400000)) + SELECT + table_name, + shardid, + ( SELECT size FROM possible_sizes WHERE ABS(size - shard_size) = (SELECT MIN(ABS(size - shard_size)) FROM possible_sizes )) shard_size, + sourcename, + sourceport, + ( SELECT size FROM possible_sizes WHERE ABS(size - source_shard_size) = (SELECT MIN(ABS(size - source_shard_size)) FROM possible_sizes )) source_shard_size, + targetname, + targetport, + ( SELECT size FROM possible_sizes WHERE ABS(size - target_shard_size) = (SELECT MIN(ABS(size - target_shard_size)) FROM possible_sizes )) target_shard_size, + progress, + operation_type, + source_lsn >= target_lsn as lsn_sanity_check, + source_lsn > '0/0' as source_lsn_available, + target_lsn > '0/0' as target_lsn_available, + status + FROM get_rebalance_progress(); + +table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type|lsn_sanity_check|source_lsn_available|target_lsn_available|status +--------------------------------------------------------------------- +(0 rows) + + +starting permutation: s5-acquire-advisory-lock-before-copy s1-shard-move-c1-online s7-get-progress s5-release-advisory-lock s1-wait s7-get-progress +master_set_node_property +--------------------------------------------------------------------- + +(1 row) + +step s5-acquire-advisory-lock-before-copy: + SELECT pg_advisory_lock(55152, 44000); + +pg_advisory_lock 
+--------------------------------------------------------------------- + +(1 row) + step s1-shard-move-c1-online: - BEGIN; SELECT citus_move_shard_placement(1500001, 'localhost', 57637, 'localhost', 57638, shard_transfer_mode:='force_logical'); step s7-get-progress: @@ -672,8 +812,8 @@ colocated1|1500001| 50000|localhost | 57637| 50000|localhost colocated2|1500005| 400000|localhost | 57637| 400000|localhost | 57638| 8000| 1|move |t |t |f |Setting Up (2 rows) -step s6-release-advisory-lock: - SELECT pg_advisory_unlock(44000, 55152); +step s5-release-advisory-lock: + SELECT pg_advisory_unlock(55152, 44000); pg_advisory_unlock --------------------------------------------------------------------- @@ -686,9 +826,7 @@ citus_move_shard_placement (1 row) -step s1-commit: - COMMIT; - +step s1-wait: step s7-get-progress: set LOCAL client_min_messages=NOTICE; WITH possible_sizes(size) as (VALUES (0), (8000), (50000), (200000), (400000)) @@ -714,17 +852,14 @@ table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname --------------------------------------------------------------------- (0 rows) -step enable-deferred-drop: - ALTER SYSTEM RESET citus.defer_drop_after_shard_move; - -starting permutation: s6-acquire-advisory-lock s1-shard-copy-c1-online s7-get-progress s6-release-advisory-lock s1-commit +starting permutation: s6-acquire-advisory-lock-after-copy s1-shard-move-c1-online s7-get-progress s6-release-advisory-lock s1-wait s7-get-progress master_set_node_property --------------------------------------------------------------------- (1 row) -step s6-acquire-advisory-lock: +step s6-acquire-advisory-lock-after-copy: SELECT pg_advisory_lock(44000, 55152); pg_advisory_lock @@ -732,8 +867,92 @@ pg_advisory_lock (1 row) +step s1-shard-move-c1-online: + SELECT citus_move_shard_placement(1500001, 'localhost', 57637, 'localhost', 57638, shard_transfer_mode:='force_logical'); + +step s7-get-progress: + set LOCAL client_min_messages=NOTICE; + WITH 
possible_sizes(size) as (VALUES (0), (8000), (50000), (200000), (400000)) + SELECT + table_name, + shardid, + ( SELECT size FROM possible_sizes WHERE ABS(size - shard_size) = (SELECT MIN(ABS(size - shard_size)) FROM possible_sizes )) shard_size, + sourcename, + sourceport, + ( SELECT size FROM possible_sizes WHERE ABS(size - source_shard_size) = (SELECT MIN(ABS(size - source_shard_size)) FROM possible_sizes )) source_shard_size, + targetname, + targetport, + ( SELECT size FROM possible_sizes WHERE ABS(size - target_shard_size) = (SELECT MIN(ABS(size - target_shard_size)) FROM possible_sizes )) target_shard_size, + progress, + operation_type, + source_lsn >= target_lsn as lsn_sanity_check, + source_lsn > '0/0' as source_lsn_available, + target_lsn > '0/0' as target_lsn_available, + status + FROM get_rebalance_progress(); + +table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type|lsn_sanity_check|source_lsn_available|target_lsn_available|status +--------------------------------------------------------------------- +colocated1|1500001| 50000|localhost | 57637| 50000|localhost | 57638| 50000| 1|move |t |t |t |Final Catchup +colocated2|1500005| 400000|localhost | 57637| 400000|localhost | 57638| 400000| 1|move |t |t |t |Final Catchup +(2 rows) + +step s6-release-advisory-lock: + SELECT pg_advisory_unlock(44000, 55152); + +pg_advisory_unlock +--------------------------------------------------------------------- +t +(1 row) + +step s1-shard-move-c1-online: <... 
completed> +citus_move_shard_placement +--------------------------------------------------------------------- + +(1 row) + +step s1-wait: +step s7-get-progress: + set LOCAL client_min_messages=NOTICE; + WITH possible_sizes(size) as (VALUES (0), (8000), (50000), (200000), (400000)) + SELECT + table_name, + shardid, + ( SELECT size FROM possible_sizes WHERE ABS(size - shard_size) = (SELECT MIN(ABS(size - shard_size)) FROM possible_sizes )) shard_size, + sourcename, + sourceport, + ( SELECT size FROM possible_sizes WHERE ABS(size - source_shard_size) = (SELECT MIN(ABS(size - source_shard_size)) FROM possible_sizes )) source_shard_size, + targetname, + targetport, + ( SELECT size FROM possible_sizes WHERE ABS(size - target_shard_size) = (SELECT MIN(ABS(size - target_shard_size)) FROM possible_sizes )) target_shard_size, + progress, + operation_type, + source_lsn >= target_lsn as lsn_sanity_check, + source_lsn > '0/0' as source_lsn_available, + target_lsn > '0/0' as target_lsn_available, + status + FROM get_rebalance_progress(); + +table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type|lsn_sanity_check|source_lsn_available|target_lsn_available|status +--------------------------------------------------------------------- +(0 rows) + + +starting permutation: s5-acquire-advisory-lock-before-copy s1-shard-copy-c1-online s7-get-progress s5-release-advisory-lock s1-wait s7-get-progress +master_set_node_property +--------------------------------------------------------------------- + +(1 row) + +step s5-acquire-advisory-lock-before-copy: + SELECT pg_advisory_lock(55152, 44000); + +pg_advisory_lock +--------------------------------------------------------------------- + +(1 row) + step s1-shard-copy-c1-online: - BEGIN; UPDATE pg_dist_partition SET repmodel = 'c' WHERE logicalrelid IN ('colocated1', 'colocated2'); SELECT citus_copy_shard_placement(1500001, 'localhost', 57637, 'localhost', 57638, 
transfer_mode:='force_logical'); @@ -764,6 +983,92 @@ colocated1|1500001| 50000|localhost | 57637| 50000|localhost colocated2|1500005| 400000|localhost | 57637| 400000|localhost | 57638| 8000| 1|copy |t |t |f |Setting Up (2 rows) +step s5-release-advisory-lock: + SELECT pg_advisory_unlock(55152, 44000); + +pg_advisory_unlock +--------------------------------------------------------------------- +t +(1 row) + +step s1-shard-copy-c1-online: <... completed> +citus_copy_shard_placement +--------------------------------------------------------------------- + +(1 row) + +step s1-wait: +step s7-get-progress: + set LOCAL client_min_messages=NOTICE; + WITH possible_sizes(size) as (VALUES (0), (8000), (50000), (200000), (400000)) + SELECT + table_name, + shardid, + ( SELECT size FROM possible_sizes WHERE ABS(size - shard_size) = (SELECT MIN(ABS(size - shard_size)) FROM possible_sizes )) shard_size, + sourcename, + sourceport, + ( SELECT size FROM possible_sizes WHERE ABS(size - source_shard_size) = (SELECT MIN(ABS(size - source_shard_size)) FROM possible_sizes )) source_shard_size, + targetname, + targetport, + ( SELECT size FROM possible_sizes WHERE ABS(size - target_shard_size) = (SELECT MIN(ABS(size - target_shard_size)) FROM possible_sizes )) target_shard_size, + progress, + operation_type, + source_lsn >= target_lsn as lsn_sanity_check, + source_lsn > '0/0' as source_lsn_available, + target_lsn > '0/0' as target_lsn_available, + status + FROM get_rebalance_progress(); + +table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type|lsn_sanity_check|source_lsn_available|target_lsn_available|status +--------------------------------------------------------------------- +(0 rows) + + +starting permutation: s6-acquire-advisory-lock-after-copy s1-shard-copy-c1-online s7-get-progress s6-release-advisory-lock s1-wait s7-get-progress +master_set_node_property 
+--------------------------------------------------------------------- + +(1 row) + +step s6-acquire-advisory-lock-after-copy: + SELECT pg_advisory_lock(44000, 55152); + +pg_advisory_lock +--------------------------------------------------------------------- + +(1 row) + +step s1-shard-copy-c1-online: + UPDATE pg_dist_partition SET repmodel = 'c' WHERE logicalrelid IN ('colocated1', 'colocated2'); + SELECT citus_copy_shard_placement(1500001, 'localhost', 57637, 'localhost', 57638, transfer_mode:='force_logical'); + +step s7-get-progress: + set LOCAL client_min_messages=NOTICE; + WITH possible_sizes(size) as (VALUES (0), (8000), (50000), (200000), (400000)) + SELECT + table_name, + shardid, + ( SELECT size FROM possible_sizes WHERE ABS(size - shard_size) = (SELECT MIN(ABS(size - shard_size)) FROM possible_sizes )) shard_size, + sourcename, + sourceport, + ( SELECT size FROM possible_sizes WHERE ABS(size - source_shard_size) = (SELECT MIN(ABS(size - source_shard_size)) FROM possible_sizes )) source_shard_size, + targetname, + targetport, + ( SELECT size FROM possible_sizes WHERE ABS(size - target_shard_size) = (SELECT MIN(ABS(size - target_shard_size)) FROM possible_sizes )) target_shard_size, + progress, + operation_type, + source_lsn >= target_lsn as lsn_sanity_check, + source_lsn > '0/0' as source_lsn_available, + target_lsn > '0/0' as target_lsn_available, + status + FROM get_rebalance_progress(); + +table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type|lsn_sanity_check|source_lsn_available|target_lsn_available|status +--------------------------------------------------------------------- +colocated1|1500001| 50000|localhost | 57637| 50000|localhost | 57638| 50000| 1|copy |t |t |t |Final Catchup +colocated2|1500005| 400000|localhost | 57637| 400000|localhost | 57638| 400000| 1|copy |t |t |t |Final Catchup +(2 rows) + step s6-release-advisory-lock: SELECT pg_advisory_unlock(44000, 
55152); @@ -778,78 +1083,7 @@ citus_copy_shard_placement (1 row) -step s1-commit: - COMMIT; - - -starting permutation: s2-lock-1-start s1-shard-move-c1-block-writes s4-shard-move-sep-block-writes s7-get-progress s2-unlock-1-start s1-commit s4-commit s7-get-progress enable-deferred-drop -master_set_node_property ---------------------------------------------------------------------- - -(1 row) - -step s2-lock-1-start: - BEGIN; - DELETE FROM colocated1 WHERE test_id = 1; - DELETE FROM separate WHERE test_id = 1; - -step s1-shard-move-c1-block-writes: - BEGIN; - SELECT citus_move_shard_placement(1500001, 'localhost', 57637, 'localhost', 57638, shard_transfer_mode:='block_writes'); - -step s4-shard-move-sep-block-writes: - BEGIN; - SELECT citus_move_shard_placement(1500009, 'localhost', 57637, 'localhost', 57638, shard_transfer_mode:='block_writes'); - -step s7-get-progress: - set LOCAL client_min_messages=NOTICE; - WITH possible_sizes(size) as (VALUES (0), (8000), (50000), (200000), (400000)) - SELECT - table_name, - shardid, - ( SELECT size FROM possible_sizes WHERE ABS(size - shard_size) = (SELECT MIN(ABS(size - shard_size)) FROM possible_sizes )) shard_size, - sourcename, - sourceport, - ( SELECT size FROM possible_sizes WHERE ABS(size - source_shard_size) = (SELECT MIN(ABS(size - source_shard_size)) FROM possible_sizes )) source_shard_size, - targetname, - targetport, - ( SELECT size FROM possible_sizes WHERE ABS(size - target_shard_size) = (SELECT MIN(ABS(size - target_shard_size)) FROM possible_sizes )) target_shard_size, - progress, - operation_type, - source_lsn >= target_lsn as lsn_sanity_check, - source_lsn > '0/0' as source_lsn_available, - target_lsn > '0/0' as target_lsn_available, - status - FROM get_rebalance_progress(); - -table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type|lsn_sanity_check|source_lsn_available|target_lsn_available|status 
---------------------------------------------------------------------- -colocated1|1500001| 50000|localhost | 57637| 50000|localhost | 57638| 0| 1|move |t |t |f |Setting Up -colocated2|1500005| 400000|localhost | 57637| 400000|localhost | 57638| 0| 1|move |t |t |f |Setting Up -separate |1500009| 50000|localhost | 57637| 50000|localhost | 57638| 0| 1|move |t |t |f |Setting Up -(3 rows) - -step s2-unlock-1-start: - ROLLBACK; - -step s1-shard-move-c1-block-writes: <... completed> -citus_move_shard_placement ---------------------------------------------------------------------- - -(1 row) - -step s4-shard-move-sep-block-writes: <... completed> -citus_move_shard_placement ---------------------------------------------------------------------- - -(1 row) - -step s1-commit: - COMMIT; - -step s4-commit: - COMMIT; - +step s1-wait: step s7-get-progress: set LOCAL client_min_messages=NOTICE; WITH possible_sizes(size) as (VALUES (0), (8000), (50000), (200000), (400000)) @@ -875,41 +1109,28 @@ table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname --------------------------------------------------------------------- (0 rows) -step enable-deferred-drop: - ALTER SYSTEM RESET citus.defer_drop_after_shard_move; - -starting permutation: s7-grab-lock s1-shard-move-c1-block-writes s4-shard-move-sep-block-writes s7-get-progress s7-release-lock s1-commit s4-commit s7-get-progress enable-deferred-drop +starting permutation: s5-acquire-advisory-lock-before-copy s1-shard-move-c1-block-writes s4-shard-move-sep-block-writes s7-get-progress-ordered s5-release-advisory-lock s1-wait s4-wait s7-get-progress-ordered master_set_node_property --------------------------------------------------------------------- (1 row) -step s7-grab-lock: - BEGIN; - SET LOCAL citus.max_adaptive_executor_pool_size = 1; - SELECT 1 FROM colocated1 LIMIT 1; - SELECT 1 FROM separate LIMIT 1; +step s5-acquire-advisory-lock-before-copy: + SELECT pg_advisory_lock(55152, 44000); -?column? 
+pg_advisory_lock --------------------------------------------------------------------- - 1 -(1 row) -?column? ---------------------------------------------------------------------- - 1 (1 row) step s1-shard-move-c1-block-writes: - BEGIN; SELECT citus_move_shard_placement(1500001, 'localhost', 57637, 'localhost', 57638, shard_transfer_mode:='block_writes'); step s4-shard-move-sep-block-writes: - BEGIN; SELECT citus_move_shard_placement(1500009, 'localhost', 57637, 'localhost', 57638, shard_transfer_mode:='block_writes'); -step s7-get-progress: +step s7-get-progress-ordered: set LOCAL client_min_messages=NOTICE; WITH possible_sizes(size) as (VALUES (0), (8000), (50000), (200000), (400000)) SELECT @@ -926,19 +1147,24 @@ step s7-get-progress: operation_type, source_lsn >= target_lsn as lsn_sanity_check, source_lsn > '0/0' as source_lsn_available, - target_lsn > '0/0' as target_lsn_available, - status - FROM get_rebalance_progress(); + target_lsn > '0/0' as target_lsn_available + FROM get_rebalance_progress() + ORDER BY 1, 2, 3, 4, 5; -table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type|lsn_sanity_check|source_lsn_available|target_lsn_available|status +table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type|lsn_sanity_check|source_lsn_available|target_lsn_available --------------------------------------------------------------------- -colocated1|1500001| 50000|localhost | 57637| 50000|localhost | 57638| 50000| 1|move |t |t |f |Completing -colocated2|1500005| 400000|localhost | 57637| 400000|localhost | 57638| 400000| 1|move |t |t |f |Completing -separate |1500009| 50000|localhost | 57637| 50000|localhost | 57638| 200000| 1|move |t |t |f |Completing +colocated1|1500001| 50000|localhost | 57637| 50000|localhost | 57638| 8000| 1|move |t |t |f +colocated2|1500005| 400000|localhost | 57637| 400000|localhost | 57638| 
8000| 1|move |t |t |f +separate |1500009| 50000|localhost | 57637| 50000|localhost | 57638| 8000| 1|move |t |t |f (3 rows) -step s7-release-lock: - COMMIT; +step s5-release-advisory-lock: + SELECT pg_advisory_unlock(55152, 44000); + +pg_advisory_unlock +--------------------------------------------------------------------- +t +(1 row) step s1-shard-move-c1-block-writes: <... completed> citus_move_shard_placement @@ -952,13 +1178,9 @@ citus_move_shard_placement (1 row) -step s1-commit: - COMMIT; - -step s4-commit: - COMMIT; - -step s7-get-progress: +step s1-wait: +step s4-wait: +step s7-get-progress-ordered: set LOCAL client_min_messages=NOTICE; WITH possible_sizes(size) as (VALUES (0), (8000), (50000), (200000), (400000)) SELECT @@ -975,14 +1197,107 @@ step s7-get-progress: operation_type, source_lsn >= target_lsn as lsn_sanity_check, source_lsn > '0/0' as source_lsn_available, - target_lsn > '0/0' as target_lsn_available, - status - FROM get_rebalance_progress(); + target_lsn > '0/0' as target_lsn_available + FROM get_rebalance_progress() + ORDER BY 1, 2, 3, 4, 5; -table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type|lsn_sanity_check|source_lsn_available|target_lsn_available|status +table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type|lsn_sanity_check|source_lsn_available|target_lsn_available --------------------------------------------------------------------- (0 rows) -step enable-deferred-drop: - ALTER SYSTEM RESET citus.defer_drop_after_shard_move; + +starting permutation: s6-acquire-advisory-lock-after-copy s1-shard-move-c1-block-writes s4-shard-move-sep-block-writes s7-get-progress-ordered s6-release-advisory-lock s1-wait s4-wait s7-get-progress-ordered +master_set_node_property +--------------------------------------------------------------------- + +(1 row) + +step 
s6-acquire-advisory-lock-after-copy: + SELECT pg_advisory_lock(44000, 55152); + +pg_advisory_lock +--------------------------------------------------------------------- + +(1 row) + +step s1-shard-move-c1-block-writes: + SELECT citus_move_shard_placement(1500001, 'localhost', 57637, 'localhost', 57638, shard_transfer_mode:='block_writes'); + +step s4-shard-move-sep-block-writes: + SELECT citus_move_shard_placement(1500009, 'localhost', 57637, 'localhost', 57638, shard_transfer_mode:='block_writes'); + +step s7-get-progress-ordered: + set LOCAL client_min_messages=NOTICE; + WITH possible_sizes(size) as (VALUES (0), (8000), (50000), (200000), (400000)) + SELECT + table_name, + shardid, + ( SELECT size FROM possible_sizes WHERE ABS(size - shard_size) = (SELECT MIN(ABS(size - shard_size)) FROM possible_sizes )) shard_size, + sourcename, + sourceport, + ( SELECT size FROM possible_sizes WHERE ABS(size - source_shard_size) = (SELECT MIN(ABS(size - source_shard_size)) FROM possible_sizes )) source_shard_size, + targetname, + targetport, + ( SELECT size FROM possible_sizes WHERE ABS(size - target_shard_size) = (SELECT MIN(ABS(size - target_shard_size)) FROM possible_sizes )) target_shard_size, + progress, + operation_type, + source_lsn >= target_lsn as lsn_sanity_check, + source_lsn > '0/0' as source_lsn_available, + target_lsn > '0/0' as target_lsn_available + FROM get_rebalance_progress() + ORDER BY 1, 2, 3, 4, 5; + +table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type|lsn_sanity_check|source_lsn_available|target_lsn_available +--------------------------------------------------------------------- +colocated1|1500001| 50000|localhost | 57637| 50000|localhost | 57638| 50000| 1|move |t |t |f +colocated2|1500005| 400000|localhost | 57637| 400000|localhost | 57638| 400000| 1|move |t |t |f +separate |1500009| 50000|localhost | 57637| 50000|localhost | 57638| 200000| 1|move |t |t |f +(3 rows) + 
+step s6-release-advisory-lock: + SELECT pg_advisory_unlock(44000, 55152); + +pg_advisory_unlock +--------------------------------------------------------------------- +t +(1 row) + +step s1-shard-move-c1-block-writes: <... completed> +citus_move_shard_placement +--------------------------------------------------------------------- + +(1 row) + +step s4-shard-move-sep-block-writes: <... completed> +citus_move_shard_placement +--------------------------------------------------------------------- + +(1 row) + +step s1-wait: +step s4-wait: +step s7-get-progress-ordered: + set LOCAL client_min_messages=NOTICE; + WITH possible_sizes(size) as (VALUES (0), (8000), (50000), (200000), (400000)) + SELECT + table_name, + shardid, + ( SELECT size FROM possible_sizes WHERE ABS(size - shard_size) = (SELECT MIN(ABS(size - shard_size)) FROM possible_sizes )) shard_size, + sourcename, + sourceport, + ( SELECT size FROM possible_sizes WHERE ABS(size - source_shard_size) = (SELECT MIN(ABS(size - source_shard_size)) FROM possible_sizes )) source_shard_size, + targetname, + targetport, + ( SELECT size FROM possible_sizes WHERE ABS(size - target_shard_size) = (SELECT MIN(ABS(size - target_shard_size)) FROM possible_sizes )) target_shard_size, + progress, + operation_type, + source_lsn >= target_lsn as lsn_sanity_check, + source_lsn > '0/0' as source_lsn_available, + target_lsn > '0/0' as target_lsn_available + FROM get_rebalance_progress() + ORDER BY 1, 2, 3, 4, 5; + +table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type|lsn_sanity_check|source_lsn_available|target_lsn_available +--------------------------------------------------------------------- +(0 rows) diff --git a/src/test/regress/spec/isolation_shard_rebalancer_progress.spec b/src/test/regress/spec/isolation_shard_rebalancer_progress.spec index c1a3eac3a..e24a0a0df 100644 --- a/src/test/regress/spec/isolation_shard_rebalancer_progress.spec +++ 
b/src/test/regress/spec/isolation_shard_rebalancer_progress.spec @@ -1,13 +1,5 @@ setup { - -- We disable deffered drop, so we can easily trigger blocking the shard - -- move at the end of the move. This is done in a separate setup step, - -- because this cannot run in a transaction. - ALTER SYSTEM SET citus.defer_drop_after_shard_move TO OFF; -} -setup -{ - SELECT pg_reload_conf(); ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1500001; SET citus.shard_count TO 4; SET citus.shard_replication_factor TO 1; @@ -30,7 +22,6 @@ setup teardown { - SELECT pg_reload_conf(); DROP TABLE colocated2; DROP TABLE colocated1; DROP TABLE separate; @@ -40,62 +31,37 @@ session "s1" step "s1-rebalance-c1-block-writes" { - BEGIN; - SELECT * FROM get_rebalance_table_shards_plan('colocated1'); SELECT rebalance_table_shards('colocated1', shard_transfer_mode:='block_writes'); } step "s1-rebalance-c1-online" { - BEGIN; - SELECT * FROM get_rebalance_table_shards_plan('colocated1'); SELECT rebalance_table_shards('colocated1', shard_transfer_mode:='force_logical'); } step "s1-shard-move-c1-block-writes" { - BEGIN; SELECT citus_move_shard_placement(1500001, 'localhost', 57637, 'localhost', 57638, shard_transfer_mode:='block_writes'); } step "s1-shard-copy-c1-block-writes" { - BEGIN; UPDATE pg_dist_partition SET repmodel = 'c' WHERE logicalrelid IN ('colocated1', 'colocated2'); SELECT citus_copy_shard_placement(1500001, 'localhost', 57637, 'localhost', 57638, transfer_mode:='block_writes'); } step "s1-shard-move-c1-online" { - BEGIN; SELECT citus_move_shard_placement(1500001, 'localhost', 57637, 'localhost', 57638, shard_transfer_mode:='force_logical'); } step "s1-shard-copy-c1-online" { - BEGIN; UPDATE pg_dist_partition SET repmodel = 'c' WHERE logicalrelid IN ('colocated1', 'colocated2'); SELECT citus_copy_shard_placement(1500001, 'localhost', 57637, 'localhost', 57638, transfer_mode:='force_logical'); } -step "s1-commit" -{ - COMMIT; -} - -session "s2" - -step "s2-lock-1-start" -{ - 
BEGIN; - DELETE FROM colocated1 WHERE test_id = 1; - DELETE FROM separate WHERE test_id = 1; -} - -step "s2-unlock-1-start" -{ - ROLLBACK; -} +step "s1-wait" {} session "s3" @@ -114,21 +80,33 @@ session "s4" step "s4-shard-move-sep-block-writes" { - BEGIN; SELECT citus_move_shard_placement(1500009, 'localhost', 57637, 'localhost', 57638, shard_transfer_mode:='block_writes'); } -step "s4-commit" +step "s4-wait" {} + +session "s5" + +// this advisory lock with (almost) random values is only used +// for testing purposes; it is taken before any data is copied. For details check +// the source code of ConflictWithIsolationTestingBeforeCopy +step "s5-acquire-advisory-lock-before-copy" { - COMMIT; + SELECT pg_advisory_lock(55152, 44000); } +step "s5-release-advisory-lock" +{ + SELECT pg_advisory_unlock(55152, 44000); +} + + session "s6" // this advisory lock with (almost) random values are only used -// for testing purposes. For details, check Citus' logical replication -// source code -step "s6-acquire-advisory-lock" +// for testing purposes; it is taken after all data is copied. For details check +// the source code of ConflictWithIsolationTestingAfterCopy +step "s6-acquire-advisory-lock-after-copy" { SELECT pg_advisory_lock(44000, 55152); } @@ -141,22 +119,6 @@ step "s6-release-advisory-lock" session "s7" -// get_rebalance_progress internally calls pg_total_relation_size on all the -// shards. This means that it takes AccessShareLock on those shards. Because we -// run with deferred drop that means that get_rebalance_progress actually waits -// for the shard move to complete the drop. But we want to get the progress -// before the shards are dropped. So we grab the locks first with a simple -// query that reads from all shards. We force using a single connection because -// get_rebalance_progress isn't smart enough to reuse the right connection for -// the right shards and will simply use a single one for all of them.
-step "s7-grab-lock" -{ - BEGIN; - SET LOCAL citus.max_adaptive_executor_pool_size = 1; - SELECT 1 FROM colocated1 LIMIT 1; - SELECT 1 FROM separate LIMIT 1; -} - step "s7-get-progress" { set LOCAL client_min_messages=NOTICE; @@ -180,46 +142,57 @@ step "s7-get-progress" FROM get_rebalance_progress(); } -step "s7-release-lock" +// When getting progress from multiple monitors at the same time it can result +// in random order of the tuples, because there's no defined order of the +// monitors. So in those cases we need to order the output for consistent results. +step "s7-get-progress-ordered" { - COMMIT; + set LOCAL client_min_messages=NOTICE; + WITH possible_sizes(size) as (VALUES (0), (8000), (50000), (200000), (400000)) + SELECT + table_name, + shardid, + ( SELECT size FROM possible_sizes WHERE ABS(size - shard_size) = (SELECT MIN(ABS(size - shard_size)) FROM possible_sizes )) shard_size, + sourcename, + sourceport, + ( SELECT size FROM possible_sizes WHERE ABS(size - source_shard_size) = (SELECT MIN(ABS(size - source_shard_size)) FROM possible_sizes )) source_shard_size, + targetname, + targetport, + ( SELECT size FROM possible_sizes WHERE ABS(size - target_shard_size) = (SELECT MIN(ABS(size - target_shard_size)) FROM possible_sizes )) target_shard_size, + progress, + operation_type, + source_lsn >= target_lsn as lsn_sanity_check, + source_lsn > '0/0' as source_lsn_available, + target_lsn > '0/0' as target_lsn_available + FROM get_rebalance_progress() + ORDER BY 1, 2, 3, 4, 5; } -session "s8" - -// After running these tests we want to enable deferred-drop again. Sadly -// the isolation tester framework does not support multiple teardown steps -// and this cannot be run in a transaction. So we need to do it manually at -// the end of the last test. 
-step "enable-deferred-drop" -{ - ALTER SYSTEM RESET citus.defer_drop_after_shard_move; -} // blocking rebalancer does what it should -permutation "s2-lock-1-start" "s1-rebalance-c1-block-writes" "s7-get-progress" "s2-unlock-1-start" "s1-commit" "s7-get-progress" "enable-deferred-drop" -permutation "s3-lock-2-start" "s1-rebalance-c1-block-writes" "s7-get-progress" "s3-unlock-2-start" "s1-commit" "s7-get-progress" "enable-deferred-drop" -permutation "s7-grab-lock" "s1-rebalance-c1-block-writes" "s7-get-progress" "s7-release-lock" "s1-commit" "s7-get-progress" "enable-deferred-drop" +permutation "s5-acquire-advisory-lock-before-copy" "s1-rebalance-c1-block-writes" "s7-get-progress" "s5-release-advisory-lock" "s1-wait" "s7-get-progress" +permutation "s3-lock-2-start" "s1-rebalance-c1-block-writes" "s7-get-progress" "s3-unlock-2-start" "s1-wait" "s7-get-progress" +permutation "s6-acquire-advisory-lock-after-copy" "s1-rebalance-c1-block-writes" "s7-get-progress" "s6-release-advisory-lock" "s1-wait" "s7-get-progress" // online rebalancer -permutation "s6-acquire-advisory-lock" "s1-rebalance-c1-online" "s7-get-progress" "s6-release-advisory-lock" "s1-commit" "s7-get-progress" "enable-deferred-drop" -// Commented out due to flakyness -// permutation "s7-grab-lock" "s1-rebalance-c1-online" "s7-get-progress" "s7-release-lock" "s1-commit" "s7-get-progress" "enable-deferred-drop" +permutation "s5-acquire-advisory-lock-before-copy" "s1-rebalance-c1-online" "s7-get-progress" "s5-release-advisory-lock" "s1-wait" "s7-get-progress" +permutation "s6-acquire-advisory-lock-after-copy" "s1-rebalance-c1-online" "s7-get-progress" "s6-release-advisory-lock" "s1-wait" "s7-get-progress" // blocking shard move -permutation "s2-lock-1-start" "s1-shard-move-c1-block-writes" "s7-get-progress" "s2-unlock-1-start" "s1-commit" "s7-get-progress" "enable-deferred-drop" -permutation "s7-grab-lock" "s1-shard-move-c1-block-writes" "s7-get-progress" "s7-release-lock" "s1-commit" "s7-get-progress" 
"enable-deferred-drop" +permutation "s5-acquire-advisory-lock-before-copy" "s1-shard-move-c1-block-writes" "s7-get-progress" "s5-release-advisory-lock" "s1-wait" "s7-get-progress" +permutation "s6-acquire-advisory-lock-after-copy" "s1-shard-move-c1-block-writes" "s7-get-progress" "s6-release-advisory-lock" "s1-wait" "s7-get-progress" // blocking shard copy -permutation "s2-lock-1-start" "s1-shard-copy-c1-block-writes" "s7-get-progress" "s2-unlock-1-start" "s1-commit" +permutation "s5-acquire-advisory-lock-before-copy" "s1-shard-copy-c1-block-writes" "s7-get-progress" "s5-release-advisory-lock" "s1-wait" "s7-get-progress" +permutation "s6-acquire-advisory-lock-after-copy" "s1-shard-copy-c1-block-writes" "s7-get-progress" "s6-release-advisory-lock" "s1-wait" "s7-get-progress" // online shard move -permutation "s6-acquire-advisory-lock" "s1-shard-move-c1-online" "s7-get-progress" "s6-release-advisory-lock" "s1-commit" "s7-get-progress" "enable-deferred-drop" -// Commented out due to flakyness -// permutation "s7-grab-lock" "s1-shard-move-c1-online" "s7-get-progress" "s7-release-lock" "s1-commit" "s7-get-progress" "enable-deferred-drop" +permutation "s5-acquire-advisory-lock-before-copy" "s1-shard-move-c1-online" "s7-get-progress" "s5-release-advisory-lock" "s1-wait" "s7-get-progress" +permutation "s6-acquire-advisory-lock-after-copy" "s1-shard-move-c1-online" "s7-get-progress" "s6-release-advisory-lock" "s1-wait" "s7-get-progress" // online shard copy -permutation "s6-acquire-advisory-lock" "s1-shard-copy-c1-online" "s7-get-progress" "s6-release-advisory-lock" "s1-commit" +permutation "s5-acquire-advisory-lock-before-copy" "s1-shard-copy-c1-online" "s7-get-progress" "s5-release-advisory-lock" "s1-wait" "s7-get-progress" +permutation "s6-acquire-advisory-lock-after-copy" "s1-shard-copy-c1-online" "s7-get-progress" "s6-release-advisory-lock" "s1-wait" "s7-get-progress" // parallel blocking shard move -permutation "s2-lock-1-start" "s1-shard-move-c1-block-writes" 
"s4-shard-move-sep-block-writes"("s1-shard-move-c1-block-writes") "s7-get-progress" "s2-unlock-1-start" "s1-commit" "s4-commit" "s7-get-progress" "enable-deferred-drop" -permutation "s7-grab-lock" "s1-shard-move-c1-block-writes" "s4-shard-move-sep-block-writes"("s1-shard-move-c1-block-writes") "s7-get-progress" "s7-release-lock" "s1-commit" "s4-commit" "s7-get-progress" "enable-deferred-drop" +permutation "s5-acquire-advisory-lock-before-copy" "s1-shard-move-c1-block-writes" "s4-shard-move-sep-block-writes"("s1-shard-move-c1-block-writes") "s7-get-progress-ordered" "s5-release-advisory-lock" "s1-wait" "s4-wait" "s7-get-progress-ordered" +permutation "s6-acquire-advisory-lock-after-copy" "s1-shard-move-c1-block-writes" "s4-shard-move-sep-block-writes"("s1-shard-move-c1-block-writes") "s7-get-progress-ordered" "s6-release-advisory-lock" "s1-wait" "s4-wait" "s7-get-progress-ordered"