diff --git a/src/backend/distributed/operations/shard_split.c b/src/backend/distributed/operations/shard_split.c index 84ee97cb6..14e250af9 100644 --- a/src/backend/distributed/operations/shard_split.c +++ b/src/backend/distributed/operations/shard_split.c @@ -610,12 +610,11 @@ BlockingShardSplit(SplitOperation splitOperation, /* For Blocking split, copy isn't snapshotted */ char *snapshotName = NULL; + ConflictWithIsolationTestingBeforeCopy(); DoSplitCopy(sourceShardNode, sourceColocatedShardIntervalList, shardGroupSplitIntervalListList, workersForPlacementList, snapshotName, distributionColumnOverrides); - - /* Used for testing */ - ConflictOnlyWithIsolationTesting(); + ConflictWithIsolationTestingAfterCopy(); ereport(LOG, (errmsg( "creating auxillary structures (indexes, stats, replicaindentities, triggers) for %s", diff --git a/src/backend/distributed/operations/shard_transfer.c b/src/backend/distributed/operations/shard_transfer.c index 6bab424d7..5d50417cc 100644 --- a/src/backend/distributed/operations/shard_transfer.c +++ b/src/backend/distributed/operations/shard_transfer.c @@ -1289,7 +1289,9 @@ CopyShardTablesViaBlockWrites(List *shardIntervalList, char *sourceNodeName, sourceNodePort, PLACEMENT_UPDATE_STATUS_COPYING_DATA); + ConflictWithIsolationTestingBeforeCopy(); CopyShardsToNode(sourceNode, targetNode, shardIntervalList, NULL); + ConflictWithIsolationTestingAfterCopy(); UpdatePlacementUpdateStatusForShardIntervalList( shardIntervalList, diff --git a/src/backend/distributed/replication/multi_logical_replication.c b/src/backend/distributed/replication/multi_logical_replication.c index 43a48c1c3..bb9c7d221 100644 --- a/src/backend/distributed/replication/multi_logical_replication.c +++ b/src/backend/distributed/replication/multi_logical_replication.c @@ -236,7 +236,7 @@ LogicallyReplicateShards(List *shardList, char *sourceNodeName, int sourceNodePo logicalRepTargetList); /* only useful for isolation testing, see the function comment for the details */ - ConflictOnlyWithIsolationTesting(); + ConflictWithIsolationTestingBeforeCopy(); /* * We have to create the primary key (or any other replica identity) @@ -427,7 +427,7 @@ CompleteNonBlockingShardTransfer(List *shardList, /* only useful for isolation testing, see the function comment for the details */ - ConflictOnlyWithIsolationTesting(); + ConflictWithIsolationTestingAfterCopy(); /* * We're almost done, we'll block the writes to the shards that we're @@ -1269,18 +1269,16 @@ CreateUncheckedForeignKeyConstraints(List *logicalRepTargetList) /* - * ConflictOnlyWithIsolationTesting is only useful for testing and should - * not be called by any code-path except for LogicallyReplicateShards(). - * - * Since logically replicating shards does eventually block modifications, - * it becomes tricky to use isolation tester to show concurrent behaviour - * of online shard rebalancing and modification queries. + * ConflictWithIsolationTestingBeforeCopy is only useful to test + * get_rebalance_progress by pausing before doing the actual copy. This way we + * can see the state of the tables at that point. This should not be called by + * any code-path except for code paths to move and split shards(). * * Note that since the cost of calling this function is pretty low, we prefer * to use it in non-assert builds as well not to diverge in the behaviour. */ extern void -ConflictOnlyWithIsolationTesting() +ConflictWithIsolationTestingBeforeCopy(void) { LOCKTAG tag; const bool sessionLock = false; @@ -1288,11 +1286,46 @@ ConflictOnlyWithIsolationTesting() if (RunningUnderIsolationTest) { - /* we've picked random keys */ - SET_LOCKTAG_ADVISORY(tag, MyDatabaseId, SHARD_MOVE_ADVISORY_LOCK_FIRST_KEY, + SET_LOCKTAG_ADVISORY(tag, MyDatabaseId, + SHARD_MOVE_ADVISORY_LOCK_SECOND_KEY, + SHARD_MOVE_ADVISORY_LOCK_FIRST_KEY, 2); + + /* uses sharelock so concurrent moves don't conflict with eachother */ + (void) LockAcquire(&tag, ShareLock, sessionLock, dontWait); + } +} + + +/* + * ConflictWithIsolationTestingAfterCopy is only useful for two types of tests. + * 1. Testing the output of get_rebalance_progress after the copy is completed, + * but before the move is completely finished. Because finishing the move + * will clear the contents of get_rebalance_progress. + * 2. To test that our non-blocking shard moves/splits actually don't block + * writes. Since logically replicating shards does eventually block + * modifications, it becomes tricky to use isolation tester to show + * concurrent behaviour of online shard rebalancing and modification + * queries. So, during logical replication we call this function at + * the end of the catchup, right before blocking writes. + * + * Note that since the cost of calling this function is pretty low, we prefer + * to use it in non-assert builds as well not to diverge in the behaviour. + */ +extern void +ConflictWithIsolationTestingAfterCopy(void) +{ + LOCKTAG tag; + const bool sessionLock = false; + const bool dontWait = false; + + if (RunningUnderIsolationTest) + { + SET_LOCKTAG_ADVISORY(tag, MyDatabaseId, + SHARD_MOVE_ADVISORY_LOCK_FIRST_KEY, SHARD_MOVE_ADVISORY_LOCK_SECOND_KEY, 2); - (void) LockAcquire(&tag, ExclusiveLock, sessionLock, dontWait); + /* uses sharelock so concurrent moves don't conflict with eachother */ + (void) LockAcquire(&tag, ShareLock, sessionLock, dontWait); } } diff --git a/src/include/distributed/multi_logical_replication.h b/src/include/distributed/multi_logical_replication.h index f042b4326..16ac4ac02 100644 --- a/src/include/distributed/multi_logical_replication.h +++ b/src/include/distributed/multi_logical_replication.h @@ -131,7 +131,8 @@ extern void LogicallyReplicateShards(List *shardList, char *sourceNodeName, int sourceNodePort, char *targetNodeName, int targetNodePort); -extern void ConflictOnlyWithIsolationTesting(void); +extern void ConflictWithIsolationTestingBeforeCopy(void); +extern void ConflictWithIsolationTestingAfterCopy(void); extern void CreateReplicaIdentities(List *subscriptionInfoList); extern void CreateReplicaIdentitiesOnNode(List *shardList, char *nodeName, diff --git a/src/test/regress/expected/isolation_shard_rebalancer_progress.out b/src/test/regress/expected/isolation_shard_rebalancer_progress.out index 2b29868c9..ba210da34 100644 --- a/src/test/regress/expected/isolation_shard_rebalancer_progress.out +++ b/src/test/regress/expected/isolation_shard_rebalancer_progress.out @@ -1,19 +1,20 @@ -Parsed test spec with 7 sessions +Parsed test spec with 6 sessions -starting permutation: s2-lock-1-start s1-rebalance-c1-block-writes s7-get-progress s2-unlock-1-start s1-commit s7-get-progress enable-deferred-drop +starting permutation: s5-acquire-advisory-lock-before-copy s1-rebalance-c1-block-writes s7-get-progress s5-release-advisory-lock s1-wait s7-get-progress master_set_node_property --------------------------------------------------------------------- (1 row) -step s2-lock-1-start: - BEGIN; - DELETE FROM colocated1 WHERE test_id = 1; - DELETE FROM separate WHERE test_id = 1; +step s5-acquire-advisory-lock-before-copy: + SELECT pg_advisory_lock(55152, 44000); + +pg_advisory_lock +--------------------------------------------------------------------- + +(1 row) step s1-rebalance-c1-block-writes: - BEGIN; - SELECT * FROM get_rebalance_table_shards_plan('colocated1'); SELECT rebalance_table_shards('colocated1', shard_transfer_mode:='block_writes'); step s7-get-progress: @@ -39,32 +40,27 @@ step s7-get-progress: table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type|lsn_sanity_check|source_lsn_available|target_lsn_available|status --------------------------------------------------------------------- -colocated1|1500001| 50000|localhost | 57637| 50000|localhost | 57638| 0| 1|move |t |t |f |Setting Up -colocated2|1500005| 400000|localhost | 57637| 400000|localhost | 57638| 0| 1|move |t |t |f |Setting Up +colocated1|1500001| 50000|localhost | 57637| 50000|localhost | 57638| 8000| 1|move |t |t |f |Copying Data +colocated2|1500005| 400000|localhost | 57637| 400000|localhost | 57638| 8000| 1|move |t |t |f |Copying Data colocated1|1500002| 200000|localhost | 57637| 200000|localhost | 57638| 0| 0|move |t |t |f |Not Started Yet colocated2|1500006| 8000|localhost | 57637| 8000|localhost | 57638| 0| 0|move |t |t |f |Not Started Yet (4 rows) -step s2-unlock-1-start: - ROLLBACK; +step s5-release-advisory-lock: + SELECT pg_advisory_unlock(55152, 44000); + +pg_advisory_unlock +--------------------------------------------------------------------- +t +(1 row) step s1-rebalance-c1-block-writes: <... completed> -table_name|shardid|shard_size|sourcename|sourceport|targetname|targetport ---------------------------------------------------------------------- -colocated1|1500001| 0|localhost | 57637|localhost | 57638 -colocated2|1500005| 0|localhost | 57637|localhost | 57638 -colocated1|1500002| 0|localhost | 57637|localhost | 57638 -colocated2|1500006| 0|localhost | 57637|localhost | 57638 -(4 rows) - rebalance_table_shards --------------------------------------------------------------------- (1 row) -step s1-commit: - COMMIT; - +step s1-wait: step s7-get-progress: set LOCAL client_min_messages=NOTICE; WITH possible_sizes(size) as (VALUES (0), (8000), (50000), (200000), (400000)) @@ -90,11 +86,8 @@ table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname --------------------------------------------------------------------- (0 rows) -step enable-deferred-drop: - ALTER SYSTEM RESET citus.defer_drop_after_shard_move; - -starting permutation: s3-lock-2-start s1-rebalance-c1-block-writes s7-get-progress s3-unlock-2-start s1-commit s7-get-progress enable-deferred-drop +starting permutation: s3-lock-2-start s1-rebalance-c1-block-writes s7-get-progress s3-unlock-2-start s1-wait s7-get-progress master_set_node_property --------------------------------------------------------------------- @@ -105,8 +98,6 @@ step s3-lock-2-start: DELETE FROM colocated1 WHERE test_id = 3; step s1-rebalance-c1-block-writes: - BEGIN; - SELECT * FROM get_rebalance_table_shards_plan('colocated1'); SELECT rebalance_table_shards('colocated1', shard_transfer_mode:='block_writes'); step s7-get-progress: @@ -132,8 +123,8 @@ step s7-get-progress: table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type|lsn_sanity_check|source_lsn_available|target_lsn_available|status --------------------------------------------------------------------- -colocated1|1500001| 50000|localhost | 57637| 0|localhost | 57638| 50000| 2|move |t |t |f |Completed -colocated2|1500005| 400000|localhost | 57637| 0|localhost | 57638| 400000| 2|move |t |t |f |Completed +colocated1|1500001| 50000|localhost | 57637| 50000|localhost | 57638| 50000| 2|move |t |t |f |Completed +colocated2|1500005| 400000|localhost | 57637| 400000|localhost | 57638| 400000| 2|move |t |t |f |Completed colocated1|1500002| 200000|localhost | 57637| 200000|localhost | 57638| 0| 1|move |t |t |f |Setting Up colocated2|1500006| 8000|localhost | 57637| 8000|localhost | 57638| 0| 1|move |t |t |f |Setting Up (4 rows) @@ -142,22 +133,12 @@ step s3-unlock-2-start: ROLLBACK; step s1-rebalance-c1-block-writes: <... completed> -table_name|shardid|shard_size|sourcename|sourceport|targetname|targetport ---------------------------------------------------------------------- -colocated1|1500001| 0|localhost | 57637|localhost | 57638 -colocated2|1500005| 0|localhost | 57637|localhost | 57638 -colocated1|1500002| 0|localhost | 57637|localhost | 57638 -colocated2|1500006| 0|localhost | 57637|localhost | 57638 -(4 rows) - rebalance_table_shards --------------------------------------------------------------------- (1 row) -step s1-commit: - COMMIT; - +step s1-wait: step s7-get-progress: set LOCAL client_min_messages=NOTICE; WITH possible_sizes(size) as (VALUES (0), (8000), (50000), (200000), (400000)) @@ -183,35 +164,22 @@ table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname --------------------------------------------------------------------- (0 rows) -step enable-deferred-drop: - ALTER SYSTEM RESET citus.defer_drop_after_shard_move; - -starting permutation: s7-grab-lock s1-rebalance-c1-block-writes s7-get-progress s7-release-lock s1-commit s7-get-progress enable-deferred-drop +starting permutation: s6-acquire-advisory-lock-after-copy s1-rebalance-c1-block-writes s7-get-progress s6-release-advisory-lock s1-wait s7-get-progress master_set_node_property --------------------------------------------------------------------- (1 row) -step s7-grab-lock: - BEGIN; - SET LOCAL citus.max_adaptive_executor_pool_size = 1; - SELECT 1 FROM colocated1 LIMIT 1; - SELECT 1 FROM separate LIMIT 1; +step s6-acquire-advisory-lock-after-copy: + SELECT pg_advisory_lock(44000, 55152); -?column? +pg_advisory_lock --------------------------------------------------------------------- - 1 -(1 row) -?column? ---------------------------------------------------------------------- - 1 (1 row) step s1-rebalance-c1-block-writes: - BEGIN; - SELECT * FROM get_rebalance_table_shards_plan('colocated1'); SELECT rebalance_table_shards('colocated1', shard_transfer_mode:='block_writes'); step s7-get-progress: @@ -237,32 +205,27 @@ step s7-get-progress: table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type|lsn_sanity_check|source_lsn_available|target_lsn_available|status --------------------------------------------------------------------- -colocated1|1500001| 50000|localhost | 57637| 50000|localhost | 57638| 50000| 1|move |t |t |f |Completing -colocated2|1500005| 400000|localhost | 57637| 400000|localhost | 57638| 400000| 1|move |t |t |f |Completing +colocated1|1500001| 50000|localhost | 57637| 50000|localhost | 57638| 50000| 1|move |t |t |f |Copying Data +colocated2|1500005| 400000|localhost | 57637| 400000|localhost | 57638| 400000| 1|move |t |t |f |Copying Data colocated1|1500002| 200000|localhost | 57637| 200000|localhost | 57638| 0| 0|move |t |t |f |Not Started Yet colocated2|1500006| 8000|localhost | 57637| 8000|localhost | 57638| 0| 0|move |t |t |f |Not Started Yet (4 rows) -step s7-release-lock: - COMMIT; +step s6-release-advisory-lock: + SELECT pg_advisory_unlock(44000, 55152); + +pg_advisory_unlock +--------------------------------------------------------------------- +t +(1 row) step s1-rebalance-c1-block-writes: <... completed> -table_name|shardid|shard_size|sourcename|sourceport|targetname|targetport ---------------------------------------------------------------------- -colocated1|1500001| 0|localhost | 57637|localhost | 57638 -colocated2|1500005| 0|localhost | 57637|localhost | 57638 -colocated1|1500002| 0|localhost | 57637|localhost | 57638 -colocated2|1500006| 0|localhost | 57637|localhost | 57638 -(4 rows) - rebalance_table_shards --------------------------------------------------------------------- (1 row) -step s1-commit: - COMMIT; - +step s1-wait: step s7-get-progress: set LOCAL client_min_messages=NOTICE; WITH possible_sizes(size) as (VALUES (0), (8000), (50000), (200000), (400000)) @@ -288,18 +251,15 @@ table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname --------------------------------------------------------------------- (0 rows) -step enable-deferred-drop: - ALTER SYSTEM RESET citus.defer_drop_after_shard_move; - -starting permutation: s6-acquire-advisory-lock s1-rebalance-c1-online s7-get-progress s6-release-advisory-lock s1-commit s7-get-progress enable-deferred-drop +starting permutation: s5-acquire-advisory-lock-before-copy s1-rebalance-c1-online s7-get-progress s5-release-advisory-lock s1-wait s7-get-progress master_set_node_property --------------------------------------------------------------------- (1 row) -step s6-acquire-advisory-lock: - SELECT pg_advisory_lock(44000, 55152); +step s5-acquire-advisory-lock-before-copy: + SELECT pg_advisory_lock(55152, 44000); pg_advisory_lock --------------------------------------------------------------------- @@ -307,8 +267,6 @@ pg_advisory_lock (1 row) step s1-rebalance-c1-online: - BEGIN; - SELECT * FROM get_rebalance_table_shards_plan('colocated1'); SELECT rebalance_table_shards('colocated1', shard_transfer_mode:='force_logical'); step s7-get-progress: @@ -340,6 +298,93 @@ colocated1|1500002| 200000|localhost | 57637| 200000|localhost colocated2|1500006| 8000|localhost | 57637| 8000|localhost | 57638| 0| 0|move |t |t |f |Not Started Yet (4 rows) +step s5-release-advisory-lock: + SELECT pg_advisory_unlock(55152, 44000); + +pg_advisory_unlock +--------------------------------------------------------------------- +t +(1 row) + +step s1-rebalance-c1-online: <... completed> +rebalance_table_shards +--------------------------------------------------------------------- + +(1 row) + +step s1-wait: +step s7-get-progress: + set LOCAL client_min_messages=NOTICE; + WITH possible_sizes(size) as (VALUES (0), (8000), (50000), (200000), (400000)) + SELECT + table_name, + shardid, + ( SELECT size FROM possible_sizes WHERE ABS(size - shard_size) = (SELECT MIN(ABS(size - shard_size)) FROM possible_sizes )) shard_size, + sourcename, + sourceport, + ( SELECT size FROM possible_sizes WHERE ABS(size - source_shard_size) = (SELECT MIN(ABS(size - source_shard_size)) FROM possible_sizes )) source_shard_size, + targetname, + targetport, + ( SELECT size FROM possible_sizes WHERE ABS(size - target_shard_size) = (SELECT MIN(ABS(size - target_shard_size)) FROM possible_sizes )) target_shard_size, + progress, + operation_type, + source_lsn >= target_lsn as lsn_sanity_check, + source_lsn > '0/0' as source_lsn_available, + target_lsn > '0/0' as target_lsn_available, + status + FROM get_rebalance_progress(); + +table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type|lsn_sanity_check|source_lsn_available|target_lsn_available|status +--------------------------------------------------------------------- +(0 rows) + + +starting permutation: s6-acquire-advisory-lock-after-copy s1-rebalance-c1-online s7-get-progress s6-release-advisory-lock s1-wait s7-get-progress +master_set_node_property +--------------------------------------------------------------------- + +(1 row) + +step s6-acquire-advisory-lock-after-copy: + SELECT pg_advisory_lock(44000, 55152); + +pg_advisory_lock +--------------------------------------------------------------------- + +(1 row) + +step s1-rebalance-c1-online: + SELECT rebalance_table_shards('colocated1', shard_transfer_mode:='force_logical'); + +step s7-get-progress: + set LOCAL client_min_messages=NOTICE; + WITH possible_sizes(size) as (VALUES (0), (8000), (50000), (200000), (400000)) + SELECT + table_name, + shardid, + ( SELECT size FROM possible_sizes WHERE ABS(size - shard_size) = (SELECT MIN(ABS(size - shard_size)) FROM possible_sizes )) shard_size, + sourcename, + sourceport, + ( SELECT size FROM possible_sizes WHERE ABS(size - source_shard_size) = (SELECT MIN(ABS(size - source_shard_size)) FROM possible_sizes )) source_shard_size, + targetname, + targetport, + ( SELECT size FROM possible_sizes WHERE ABS(size - target_shard_size) = (SELECT MIN(ABS(size - target_shard_size)) FROM possible_sizes )) target_shard_size, + progress, + operation_type, + source_lsn >= target_lsn as lsn_sanity_check, + source_lsn > '0/0' as source_lsn_available, + target_lsn > '0/0' as target_lsn_available, + status + FROM get_rebalance_progress(); + +table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type|lsn_sanity_check|source_lsn_available|target_lsn_available|status +--------------------------------------------------------------------- +colocated1|1500001| 50000|localhost | 57637| 50000|localhost | 57638| 50000| 1|move |t |t |t |Final Catchup +colocated2|1500005| 400000|localhost | 57637| 400000|localhost | 57638| 400000| 1|move |t |t |t |Final Catchup +colocated1|1500002| 200000|localhost | 57637| 200000|localhost | 57638| 0| 0|move |t |t |f |Not Started Yet +colocated2|1500006| 8000|localhost | 57637| 8000|localhost | 57638| 0| 0|move |t |t |f |Not Started Yet +(4 rows) + step s6-release-advisory-lock: SELECT pg_advisory_unlock(44000, 55152); @@ -349,22 +394,12 @@ t (1 row) step s1-rebalance-c1-online: <... completed> -table_name|shardid|shard_size|sourcename|sourceport|targetname|targetport ---------------------------------------------------------------------- -colocated1|1500001| 0|localhost | 57637|localhost | 57638 -colocated2|1500005| 0|localhost | 57637|localhost | 57638 -colocated1|1500002| 0|localhost | 57637|localhost | 57638 -colocated2|1500006| 0|localhost | 57637|localhost | 57638 -(4 rows) - rebalance_table_shards --------------------------------------------------------------------- (1 row) -step s1-commit: - COMMIT; - +step s1-wait: step s7-get-progress: set LOCAL client_min_messages=NOTICE; WITH possible_sizes(size) as (VALUES (0), (8000), (50000), (200000), (400000)) @@ -390,23 +425,22 @@ table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname --------------------------------------------------------------------- (0 rows) -step enable-deferred-drop: - ALTER SYSTEM RESET citus.defer_drop_after_shard_move; - -starting permutation: s2-lock-1-start s1-shard-move-c1-block-writes s7-get-progress s2-unlock-1-start s1-commit s7-get-progress enable-deferred-drop +starting permutation: s5-acquire-advisory-lock-before-copy s1-shard-move-c1-block-writes s7-get-progress s5-release-advisory-lock s1-wait s7-get-progress master_set_node_property --------------------------------------------------------------------- (1 row) -step s2-lock-1-start: - BEGIN; - DELETE FROM colocated1 WHERE test_id = 1; - DELETE FROM separate WHERE test_id = 1; +step s5-acquire-advisory-lock-before-copy: + SELECT pg_advisory_lock(55152, 44000); + +pg_advisory_lock +--------------------------------------------------------------------- + +(1 row) step s1-shard-move-c1-block-writes: - BEGIN; SELECT citus_move_shard_placement(1500001, 'localhost', 57637, 'localhost', 57638, shard_transfer_mode:='block_writes'); step s7-get-progress: @@ -432,12 +466,17 @@ step s7-get-progress: table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type|lsn_sanity_check|source_lsn_available|target_lsn_available|status --------------------------------------------------------------------- -colocated1|1500001| 50000|localhost | 57637| 50000|localhost | 57638| 0| 1|move |t |t |f |Setting Up -colocated2|1500005| 400000|localhost | 57637| 400000|localhost | 57638| 0| 1|move |t |t |f |Setting Up +colocated1|1500001| 50000|localhost | 57637| 50000|localhost | 57638| 8000| 1|move |t |t |f |Copying Data +colocated2|1500005| 400000|localhost | 57637| 400000|localhost | 57638| 8000| 1|move |t |t |f |Copying Data (2 rows) -step s2-unlock-1-start: - ROLLBACK; +step s5-release-advisory-lock: + SELECT pg_advisory_unlock(55152, 44000); + +pg_advisory_unlock +--------------------------------------------------------------------- +t +(1 row) step s1-shard-move-c1-block-writes: <... completed> citus_move_shard_placement @@ -445,9 +484,7 @@ citus_move_shard_placement (1 row) -step s1-commit: - COMMIT; - +step s1-wait: step s7-get-progress: set LOCAL client_min_messages=NOTICE; WITH possible_sizes(size) as (VALUES (0), (8000), (50000), (200000), (400000)) @@ -473,34 +510,22 @@ table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname --------------------------------------------------------------------- (0 rows) -step enable-deferred-drop: - ALTER SYSTEM RESET citus.defer_drop_after_shard_move; - -starting permutation: s7-grab-lock s1-shard-move-c1-block-writes s7-get-progress s7-release-lock s1-commit s7-get-progress enable-deferred-drop +starting permutation: s6-acquire-advisory-lock-after-copy s1-shard-move-c1-block-writes s7-get-progress s6-release-advisory-lock s1-wait s7-get-progress master_set_node_property --------------------------------------------------------------------- (1 row) -step s7-grab-lock: - BEGIN; - SET LOCAL citus.max_adaptive_executor_pool_size = 1; - SELECT 1 FROM colocated1 LIMIT 1; - SELECT 1 FROM separate LIMIT 1; +step s6-acquire-advisory-lock-after-copy: + SELECT pg_advisory_lock(44000, 55152); -?column? +pg_advisory_lock --------------------------------------------------------------------- - 1 -(1 row) -?column? ---------------------------------------------------------------------- - 1 (1 row) step s1-shard-move-c1-block-writes: - BEGIN; SELECT citus_move_shard_placement(1500001, 'localhost', 57637, 'localhost', 57638, shard_transfer_mode:='block_writes'); step s7-get-progress: @@ -526,12 +551,17 @@ step s7-get-progress: table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type|lsn_sanity_check|source_lsn_available|target_lsn_available|status --------------------------------------------------------------------- -colocated1|1500001| 50000|localhost | 57637| 50000|localhost | 57638| 50000| 1|move |t |t |f |Completing -colocated2|1500005| 400000|localhost | 57637| 400000|localhost | 57638| 400000| 1|move |t |t |f |Completing +colocated1|1500001| 50000|localhost | 57637| 50000|localhost | 57638| 50000| 1|move |t |t |f |Copying Data +colocated2|1500005| 400000|localhost | 57637| 400000|localhost | 57638| 400000| 1|move |t |t |f |Copying Data (2 rows) -step s7-release-lock: - COMMIT; +step s6-release-advisory-lock: + SELECT pg_advisory_unlock(44000, 55152); + +pg_advisory_unlock +--------------------------------------------------------------------- +t +(1 row) step s1-shard-move-c1-block-writes: <... completed> citus_move_shard_placement @@ -539,9 +569,7 @@ citus_move_shard_placement (1 row) -step s1-commit: - COMMIT; - +step s1-wait: step s7-get-progress: set LOCAL client_min_messages=NOTICE; WITH possible_sizes(size) as (VALUES (0), (8000), (50000), (200000), (400000)) @@ -567,23 +595,22 @@ table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname --------------------------------------------------------------------- (0 rows) -step enable-deferred-drop: - ALTER SYSTEM RESET citus.defer_drop_after_shard_move; - -starting permutation: s2-lock-1-start s1-shard-copy-c1-block-writes s7-get-progress s2-unlock-1-start s1-commit +starting permutation: s5-acquire-advisory-lock-before-copy s1-shard-copy-c1-block-writes s7-get-progress s5-release-advisory-lock s1-wait s7-get-progress master_set_node_property --------------------------------------------------------------------- (1 row) -step s2-lock-1-start: - BEGIN; - DELETE FROM colocated1 WHERE test_id = 1; - DELETE FROM separate WHERE test_id = 1; +step s5-acquire-advisory-lock-before-copy: + SELECT pg_advisory_lock(55152, 44000); + +pg_advisory_lock +--------------------------------------------------------------------- + +(1 row) step s1-shard-copy-c1-block-writes: - BEGIN; UPDATE pg_dist_partition SET repmodel = 'c' WHERE logicalrelid IN ('colocated1', 'colocated2'); SELECT citus_copy_shard_placement(1500001, 'localhost', 57637, 'localhost', 57638, transfer_mode:='block_writes'); @@ -610,12 +637,17 @@ step s7-get-progress: table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type|lsn_sanity_check|source_lsn_available|target_lsn_available|status --------------------------------------------------------------------- -colocated1|1500001| 50000|localhost | 57637| 50000|localhost | 57638| 0| 1|copy |t |t |f |Setting Up -colocated2|1500005| 400000|localhost | 57637| 400000|localhost | 57638| 0| 1|copy |t |t |f |Setting Up +colocated1|1500001| 50000|localhost | 57637| 50000|localhost | 57638| 8000| 1|copy |t |t |f |Copying Data +colocated2|1500005| 400000|localhost | 57637| 400000|localhost | 57638| 8000| 1|copy |t |t |f |Copying Data (2 rows) -step s2-unlock-1-start: - ROLLBACK; +step s5-release-advisory-lock: + SELECT pg_advisory_unlock(55152, 44000); + +pg_advisory_unlock +--------------------------------------------------------------------- +t +(1 row) step s1-shard-copy-c1-block-writes: <... completed> citus_copy_shard_placement @@ -623,17 +655,40 @@ citus_copy_shard_placement (1 row) -step s1-commit: - COMMIT; +step s1-wait: +step s7-get-progress: + set LOCAL client_min_messages=NOTICE; + WITH possible_sizes(size) as (VALUES (0), (8000), (50000), (200000), (400000)) + SELECT + table_name, + shardid, + ( SELECT size FROM possible_sizes WHERE ABS(size - shard_size) = (SELECT MIN(ABS(size - shard_size)) FROM possible_sizes )) shard_size, + sourcename, + sourceport, + ( SELECT size FROM possible_sizes WHERE ABS(size - source_shard_size) = (SELECT MIN(ABS(size - source_shard_size)) FROM possible_sizes )) source_shard_size, + targetname, + targetport, + ( SELECT size FROM possible_sizes WHERE ABS(size - target_shard_size) = (SELECT MIN(ABS(size - target_shard_size)) FROM possible_sizes )) target_shard_size, + progress, + operation_type, + source_lsn >= target_lsn as lsn_sanity_check, + source_lsn > '0/0' as source_lsn_available, + target_lsn > '0/0' as target_lsn_available, + status + FROM get_rebalance_progress(); + +table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type|lsn_sanity_check|source_lsn_available|target_lsn_available|status +--------------------------------------------------------------------- +(0 rows) -starting permutation: s6-acquire-advisory-lock s1-shard-move-c1-online s7-get-progress s6-release-advisory-lock s1-commit s7-get-progress enable-deferred-drop +starting permutation: s6-acquire-advisory-lock-after-copy s1-shard-copy-c1-block-writes s7-get-progress s6-release-advisory-lock s1-wait s7-get-progress master_set_node_property --------------------------------------------------------------------- (1 row) -step s6-acquire-advisory-lock: +step s6-acquire-advisory-lock-after-copy: SELECT pg_advisory_lock(44000, 55152); pg_advisory_lock @@ -641,8 +696,93 @@ pg_advisory_lock (1 row) +step s1-shard-copy-c1-block-writes: + UPDATE pg_dist_partition SET repmodel = 'c' WHERE logicalrelid IN ('colocated1', 'colocated2'); + SELECT citus_copy_shard_placement(1500001, 'localhost', 57637, 'localhost', 57638, transfer_mode:='block_writes'); + +step s7-get-progress: + set LOCAL client_min_messages=NOTICE; + WITH possible_sizes(size) as (VALUES (0), (8000), (50000), (200000), (400000)) + SELECT + table_name, + shardid, + ( SELECT size FROM possible_sizes WHERE ABS(size - shard_size) = (SELECT MIN(ABS(size - shard_size)) FROM possible_sizes )) shard_size, + sourcename, + sourceport, + ( SELECT size FROM possible_sizes WHERE ABS(size - source_shard_size) = (SELECT MIN(ABS(size - source_shard_size)) FROM possible_sizes )) source_shard_size, + targetname, + targetport, + ( SELECT size FROM possible_sizes WHERE ABS(size - target_shard_size) = (SELECT MIN(ABS(size - target_shard_size)) FROM possible_sizes )) target_shard_size, + progress, + operation_type, + source_lsn >= target_lsn as lsn_sanity_check, + source_lsn > '0/0' as source_lsn_available, + target_lsn > '0/0' as target_lsn_available, + status + FROM get_rebalance_progress(); + +table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type|lsn_sanity_check|source_lsn_available|target_lsn_available|status +--------------------------------------------------------------------- +colocated1|1500001| 50000|localhost | 57637| 50000|localhost | 57638| 50000| 1|copy |t |t |f |Copying Data +colocated2|1500005| 400000|localhost | 57637| 400000|localhost | 57638| 400000| 1|copy |t |t |f |Copying Data +(2 rows) + +step s6-release-advisory-lock: + SELECT pg_advisory_unlock(44000, 55152); + +pg_advisory_unlock +--------------------------------------------------------------------- +t +(1 row) + +step s1-shard-copy-c1-block-writes: <... completed> +citus_copy_shard_placement +--------------------------------------------------------------------- + +(1 row) + +step s1-wait: +step s7-get-progress: + set LOCAL client_min_messages=NOTICE; + WITH possible_sizes(size) as (VALUES (0), (8000), (50000), (200000), (400000)) + SELECT + table_name, + shardid, + ( SELECT size FROM possible_sizes WHERE ABS(size - shard_size) = (SELECT MIN(ABS(size - shard_size)) FROM possible_sizes )) shard_size, + sourcename, + sourceport, + ( SELECT size FROM possible_sizes WHERE ABS(size - source_shard_size) = (SELECT MIN(ABS(size - source_shard_size)) FROM possible_sizes )) source_shard_size, + targetname, + targetport, + ( SELECT size FROM possible_sizes WHERE ABS(size - target_shard_size) = (SELECT MIN(ABS(size - target_shard_size)) FROM possible_sizes )) target_shard_size, + progress, + operation_type, + source_lsn >= target_lsn as lsn_sanity_check, + source_lsn > '0/0' as source_lsn_available, + target_lsn > '0/0' as target_lsn_available, + status + FROM get_rebalance_progress(); + +table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type|lsn_sanity_check|source_lsn_available|target_lsn_available|status +--------------------------------------------------------------------- +(0 rows) + + +starting permutation: s5-acquire-advisory-lock-before-copy s1-shard-move-c1-online s7-get-progress s5-release-advisory-lock s1-wait s7-get-progress +master_set_node_property +--------------------------------------------------------------------- + +(1 row) + +step s5-acquire-advisory-lock-before-copy: + SELECT pg_advisory_lock(55152, 44000); + +pg_advisory_lock +--------------------------------------------------------------------- + +(1 row) + step s1-shard-move-c1-online: - BEGIN; SELECT citus_move_shard_placement(1500001, 'localhost', 57637, 'localhost', 57638, shard_transfer_mode:='force_logical'); step s7-get-progress: @@ -672,8 +812,8 @@ colocated1|1500001| 50000|localhost | 57637| 50000|localhost colocated2|1500005| 400000|localhost | 57637| 400000|localhost | 57638| 8000| 1|move |t |t |f |Setting Up (2 rows) -step s6-release-advisory-lock: - SELECT pg_advisory_unlock(44000, 55152); +step s5-release-advisory-lock: + SELECT pg_advisory_unlock(55152, 44000); pg_advisory_unlock --------------------------------------------------------------------- @@ -686,9 +826,7 @@ citus_move_shard_placement (1 row) -step s1-commit: - COMMIT; - +step s1-wait: step s7-get-progress: set LOCAL client_min_messages=NOTICE; WITH possible_sizes(size) as (VALUES (0), (8000), (50000), (200000), (400000)) @@ -714,17 +852,14 @@ table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname --------------------------------------------------------------------- (0 rows) -step enable-deferred-drop: - ALTER SYSTEM RESET citus.defer_drop_after_shard_move; - -starting permutation: s6-acquire-advisory-lock s1-shard-copy-c1-online s7-get-progress s6-release-advisory-lock s1-commit +starting permutation: s6-acquire-advisory-lock-after-copy s1-shard-move-c1-online s7-get-progress s6-release-advisory-lock s1-wait s7-get-progress master_set_node_property --------------------------------------------------------------------- (1 row) -step s6-acquire-advisory-lock: +step s6-acquire-advisory-lock-after-copy: SELECT pg_advisory_lock(44000, 55152); pg_advisory_lock @@ -732,8 +867,92 @@ pg_advisory_lock (1 row) +step s1-shard-move-c1-online: + SELECT citus_move_shard_placement(1500001, 'localhost', 57637, 'localhost', 57638, shard_transfer_mode:='force_logical'); + +step s7-get-progress: + set LOCAL client_min_messages=NOTICE; + WITH possible_sizes(size) as (VALUES (0), (8000), (50000), (200000), (400000)) + SELECT + table_name, + shardid, + ( SELECT size FROM possible_sizes WHERE ABS(size - shard_size) = (SELECT MIN(ABS(size - shard_size)) FROM possible_sizes )) shard_size, + sourcename, + sourceport, + ( SELECT size FROM possible_sizes WHERE ABS(size - source_shard_size) = (SELECT MIN(ABS(size - source_shard_size)) FROM possible_sizes )) source_shard_size, + targetname, + targetport, + ( SELECT size FROM possible_sizes WHERE ABS(size - target_shard_size) = (SELECT MIN(ABS(size - target_shard_size)) FROM possible_sizes )) target_shard_size, + progress, + operation_type, + source_lsn >= target_lsn as lsn_sanity_check, + source_lsn > '0/0' as source_lsn_available, + target_lsn > '0/0' as target_lsn_available, + status + FROM get_rebalance_progress(); + +table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type|lsn_sanity_check|source_lsn_available|target_lsn_available|status +--------------------------------------------------------------------- +colocated1|1500001| 50000|localhost | 57637| 50000|localhost | 57638| 50000| 1|move |t |t |t |Final Catchup +colocated2|1500005| 400000|localhost | 57637| 400000|localhost | 57638| 400000| 1|move |t |t |t |Final Catchup +(2 rows) + +step s6-release-advisory-lock: + SELECT pg_advisory_unlock(44000, 55152); + +pg_advisory_unlock +--------------------------------------------------------------------- +t +(1 row) + +step s1-shard-move-c1-online: <... completed> +citus_move_shard_placement +--------------------------------------------------------------------- + +(1 row) + +step s1-wait: +step s7-get-progress: + set LOCAL client_min_messages=NOTICE; + WITH possible_sizes(size) as (VALUES (0), (8000), (50000), (200000), (400000)) + SELECT + table_name, + shardid, + ( SELECT size FROM possible_sizes WHERE ABS(size - shard_size) = (SELECT MIN(ABS(size - shard_size)) FROM possible_sizes )) shard_size, + sourcename, + sourceport, + ( SELECT size FROM possible_sizes WHERE ABS(size - source_shard_size) = (SELECT MIN(ABS(size - source_shard_size)) FROM possible_sizes )) source_shard_size, + targetname, + targetport, + ( SELECT size FROM possible_sizes WHERE ABS(size - target_shard_size) = (SELECT MIN(ABS(size - target_shard_size)) FROM possible_sizes )) target_shard_size, + progress, + operation_type, + source_lsn >= target_lsn as lsn_sanity_check, + source_lsn > '0/0' as source_lsn_available, + target_lsn > '0/0' as target_lsn_available, + status + FROM get_rebalance_progress(); + +table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type|lsn_sanity_check|source_lsn_available|target_lsn_available|status +--------------------------------------------------------------------- +(0 rows) + + +starting permutation: s5-acquire-advisory-lock-before-copy s1-shard-copy-c1-online s7-get-progress s5-release-advisory-lock s1-wait s7-get-progress +master_set_node_property +--------------------------------------------------------------------- + +(1 row) + +step s5-acquire-advisory-lock-before-copy: + SELECT pg_advisory_lock(55152, 44000); + +pg_advisory_lock +--------------------------------------------------------------------- + +(1 row) + step s1-shard-copy-c1-online: - BEGIN; UPDATE pg_dist_partition SET repmodel = 'c' WHERE logicalrelid IN ('colocated1', 'colocated2'); SELECT citus_copy_shard_placement(1500001, 'localhost', 57637, 'localhost', 57638, transfer_mode:='force_logical'); @@ -764,6 +983,92 @@ colocated1|1500001| 50000|localhost | 57637| 50000|localhost colocated2|1500005| 400000|localhost | 57637| 400000|localhost | 57638| 8000| 1|copy |t |t |f |Setting Up (2 rows) +step s5-release-advisory-lock: + SELECT pg_advisory_unlock(55152, 44000); + +pg_advisory_unlock +--------------------------------------------------------------------- +t +(1 row) + +step s1-shard-copy-c1-online: <... completed> +citus_copy_shard_placement +--------------------------------------------------------------------- + +(1 row) + +step s1-wait: +step s7-get-progress: + set LOCAL client_min_messages=NOTICE; + WITH possible_sizes(size) as (VALUES (0), (8000), (50000), (200000), (400000)) + SELECT + table_name, + shardid, + ( SELECT size FROM possible_sizes WHERE ABS(size - shard_size) = (SELECT MIN(ABS(size - shard_size)) FROM possible_sizes )) shard_size, + sourcename, + sourceport, + ( SELECT size FROM possible_sizes WHERE ABS(size - source_shard_size) = (SELECT MIN(ABS(size - source_shard_size)) FROM possible_sizes )) source_shard_size, + targetname, + targetport, + ( SELECT size FROM possible_sizes WHERE ABS(size - target_shard_size) = (SELECT MIN(ABS(size - target_shard_size)) FROM possible_sizes )) target_shard_size, + progress, + operation_type, + source_lsn >= target_lsn as lsn_sanity_check, + source_lsn > '0/0' as source_lsn_available, + target_lsn > '0/0' as target_lsn_available, + status + FROM get_rebalance_progress(); + +table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type|lsn_sanity_check|source_lsn_available|target_lsn_available|status +--------------------------------------------------------------------- +(0 rows) + + +starting permutation: s6-acquire-advisory-lock-after-copy s1-shard-copy-c1-online s7-get-progress s6-release-advisory-lock s1-wait s7-get-progress +master_set_node_property +--------------------------------------------------------------------- + +(1 row) + +step s6-acquire-advisory-lock-after-copy: + SELECT pg_advisory_lock(44000, 55152); + +pg_advisory_lock +--------------------------------------------------------------------- + +(1 row) + +step s1-shard-copy-c1-online: + UPDATE pg_dist_partition SET repmodel = 'c' WHERE logicalrelid IN ('colocated1', 'colocated2'); + SELECT citus_copy_shard_placement(1500001, 'localhost', 57637, 'localhost', 57638, transfer_mode:='force_logical'); + +step s7-get-progress: + set LOCAL client_min_messages=NOTICE; + WITH possible_sizes(size) as (VALUES (0), (8000), (50000), (200000), (400000)) + SELECT + table_name, + shardid, + ( SELECT size FROM possible_sizes WHERE ABS(size - shard_size) = (SELECT MIN(ABS(size - shard_size)) FROM possible_sizes )) shard_size, + sourcename, + sourceport, + ( SELECT size FROM possible_sizes WHERE ABS(size - source_shard_size) = (SELECT MIN(ABS(size - source_shard_size)) FROM possible_sizes )) source_shard_size, + targetname, + targetport, + ( SELECT size FROM possible_sizes WHERE ABS(size - target_shard_size) = (SELECT MIN(ABS(size - target_shard_size)) FROM possible_sizes )) target_shard_size, + progress, + operation_type, + source_lsn >= target_lsn as lsn_sanity_check, + source_lsn > '0/0' as source_lsn_available, + target_lsn > '0/0' as target_lsn_available, + status + FROM get_rebalance_progress(); + +table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type|lsn_sanity_check|source_lsn_available|target_lsn_available|status +--------------------------------------------------------------------- +colocated1|1500001| 50000|localhost | 57637| 50000|localhost | 57638| 50000| 1|copy |t |t |t |Final Catchup +colocated2|1500005| 400000|localhost | 57637| 400000|localhost | 57638| 400000| 1|copy |t |t |t |Final Catchup +(2 rows) + step s6-release-advisory-lock: SELECT pg_advisory_unlock(44000, 55152); @@ -778,78 +1083,7 @@ citus_copy_shard_placement (1 row) -step s1-commit: - COMMIT; - - -starting permutation: s2-lock-1-start s1-shard-move-c1-block-writes s4-shard-move-sep-block-writes s7-get-progress s2-unlock-1-start s1-commit s4-commit s7-get-progress enable-deferred-drop -master_set_node_property ---------------------------------------------------------------------- - -(1 row) - -step s2-lock-1-start: - BEGIN; - DELETE FROM colocated1 WHERE test_id = 1; - DELETE FROM separate WHERE test_id = 1; - -step s1-shard-move-c1-block-writes: - BEGIN; - SELECT citus_move_shard_placement(1500001, 'localhost', 57637, 'localhost', 57638, shard_transfer_mode:='block_writes'); - -step s4-shard-move-sep-block-writes: - BEGIN; - SELECT citus_move_shard_placement(1500009, 'localhost', 57637, 'localhost', 57638, shard_transfer_mode:='block_writes'); - -step s7-get-progress: - set LOCAL client_min_messages=NOTICE; - WITH possible_sizes(size) as (VALUES (0), (8000), (50000), (200000), (400000)) - SELECT - table_name, - shardid, - ( SELECT size FROM possible_sizes WHERE ABS(size - shard_size) = (SELECT MIN(ABS(size - shard_size)) FROM possible_sizes )) shard_size, - sourcename, - sourceport, - ( SELECT size FROM possible_sizes WHERE ABS(size - source_shard_size) = (SELECT MIN(ABS(size - source_shard_size)) FROM possible_sizes )) source_shard_size, - targetname, - targetport, - ( SELECT size FROM possible_sizes WHERE ABS(size - target_shard_size) = (SELECT MIN(ABS(size - target_shard_size)) FROM possible_sizes )) target_shard_size, - progress, - operation_type, - source_lsn >= target_lsn as lsn_sanity_check, - source_lsn > '0/0' as source_lsn_available, - target_lsn > '0/0' as target_lsn_available, - status - FROM get_rebalance_progress(); - -table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type|lsn_sanity_check|source_lsn_available|target_lsn_available|status ---------------------------------------------------------------------- -colocated1|1500001| 50000|localhost | 57637| 50000|localhost | 57638| 0| 1|move |t |t |f |Setting Up -colocated2|1500005| 400000|localhost | 57637| 400000|localhost | 57638| 0| 1|move |t |t |f |Setting Up -separate |1500009| 50000|localhost | 57637| 50000|localhost | 57638| 0| 1|move |t |t |f |Setting Up -(3 rows) - -step s2-unlock-1-start: - ROLLBACK; - -step s1-shard-move-c1-block-writes: <... completed> -citus_move_shard_placement ---------------------------------------------------------------------- - -(1 row) - -step s4-shard-move-sep-block-writes: <... completed> -citus_move_shard_placement ---------------------------------------------------------------------- - -(1 row) - -step s1-commit: - COMMIT; - -step s4-commit: - COMMIT; - +step s1-wait: step s7-get-progress: set LOCAL client_min_messages=NOTICE; WITH possible_sizes(size) as (VALUES (0), (8000), (50000), (200000), (400000)) @@ -875,41 +1109,28 @@ table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname --------------------------------------------------------------------- (0 rows) -step enable-deferred-drop: - ALTER SYSTEM RESET citus.defer_drop_after_shard_move; - -starting permutation: s7-grab-lock s1-shard-move-c1-block-writes s4-shard-move-sep-block-writes s7-get-progress s7-release-lock s1-commit s4-commit s7-get-progress enable-deferred-drop +starting permutation: s5-acquire-advisory-lock-before-copy s1-shard-move-c1-block-writes s4-shard-move-sep-block-writes s7-get-progress-ordered s5-release-advisory-lock s1-wait s4-wait s7-get-progress-ordered master_set_node_property --------------------------------------------------------------------- (1 row) -step s7-grab-lock: - BEGIN; - SET LOCAL citus.max_adaptive_executor_pool_size = 1; - SELECT 1 FROM colocated1 LIMIT 1; - SELECT 1 FROM separate LIMIT 1; +step s5-acquire-advisory-lock-before-copy: + SELECT pg_advisory_lock(55152, 44000); -?column? +pg_advisory_lock --------------------------------------------------------------------- - 1 -(1 row) -?column? ---------------------------------------------------------------------- - 1 (1 row) step s1-shard-move-c1-block-writes: - BEGIN; SELECT citus_move_shard_placement(1500001, 'localhost', 57637, 'localhost', 57638, shard_transfer_mode:='block_writes'); step s4-shard-move-sep-block-writes: - BEGIN; SELECT citus_move_shard_placement(1500009, 'localhost', 57637, 'localhost', 57638, shard_transfer_mode:='block_writes'); -step s7-get-progress: +step s7-get-progress-ordered: set LOCAL client_min_messages=NOTICE; WITH possible_sizes(size) as (VALUES (0), (8000), (50000), (200000), (400000)) SELECT @@ -926,19 +1147,24 @@ step s7-get-progress: operation_type, source_lsn >= target_lsn as lsn_sanity_check, source_lsn > '0/0' as source_lsn_available, - target_lsn > '0/0' as target_lsn_available, - status - FROM get_rebalance_progress(); + target_lsn > '0/0' as target_lsn_available + FROM get_rebalance_progress() + ORDER BY 1, 2, 3, 4, 5; -table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type|lsn_sanity_check|source_lsn_available|target_lsn_available|status +table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type|lsn_sanity_check|source_lsn_available|target_lsn_available --------------------------------------------------------------------- -colocated1|1500001| 50000|localhost | 57637| 50000|localhost | 57638| 50000| 1|move |t |t |f |Completing -colocated2|1500005| 400000|localhost | 57637| 400000|localhost | 57638| 400000| 1|move |t |t |f |Completing -separate |1500009| 50000|localhost | 57637| 50000|localhost | 57638| 200000| 1|move |t |t |f |Completing +colocated1|1500001| 50000|localhost | 57637| 50000|localhost | 57638| 8000| 1|move |t |t |f +colocated2|1500005| 400000|localhost | 57637| 400000|localhost | 57638| 8000| 1|move |t |t |f +separate |1500009| 50000|localhost | 57637| 50000|localhost | 57638| 8000| 1|move |t |t |f (3 rows) -step s7-release-lock: - COMMIT; +step s5-release-advisory-lock: + SELECT pg_advisory_unlock(55152, 44000); + +pg_advisory_unlock +--------------------------------------------------------------------- +t +(1 row) step s1-shard-move-c1-block-writes: <... completed> citus_move_shard_placement @@ -952,13 +1178,9 @@ citus_move_shard_placement (1 row) -step s1-commit: - COMMIT; - -step s4-commit: - COMMIT; - -step s7-get-progress: +step s1-wait: +step s4-wait: +step s7-get-progress-ordered: set LOCAL client_min_messages=NOTICE; WITH possible_sizes(size) as (VALUES (0), (8000), (50000), (200000), (400000)) SELECT @@ -975,14 +1197,107 @@ step s7-get-progress: operation_type, source_lsn >= target_lsn as lsn_sanity_check, source_lsn > '0/0' as source_lsn_available, - target_lsn > '0/0' as target_lsn_available, - status - FROM get_rebalance_progress(); + target_lsn > '0/0' as target_lsn_available + FROM get_rebalance_progress() + ORDER BY 1, 2, 3, 4, 5; -table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type|lsn_sanity_check|source_lsn_available|target_lsn_available|status +table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type|lsn_sanity_check|source_lsn_available|target_lsn_available --------------------------------------------------------------------- (0 rows) -step enable-deferred-drop: - ALTER SYSTEM RESET citus.defer_drop_after_shard_move; + +starting permutation: s6-acquire-advisory-lock-after-copy s1-shard-move-c1-block-writes s4-shard-move-sep-block-writes s7-get-progress-ordered s6-release-advisory-lock s1-wait s4-wait s7-get-progress-ordered +master_set_node_property +--------------------------------------------------------------------- + +(1 row) + +step s6-acquire-advisory-lock-after-copy: + SELECT pg_advisory_lock(44000, 55152); + +pg_advisory_lock +--------------------------------------------------------------------- + +(1 row) + +step s1-shard-move-c1-block-writes: + SELECT citus_move_shard_placement(1500001, 'localhost', 57637, 'localhost', 57638, shard_transfer_mode:='block_writes'); + +step s4-shard-move-sep-block-writes: + SELECT citus_move_shard_placement(1500009, 'localhost', 57637, 'localhost', 57638, shard_transfer_mode:='block_writes'); + +step s7-get-progress-ordered: + set LOCAL client_min_messages=NOTICE; + WITH possible_sizes(size) as (VALUES (0), (8000), (50000), (200000), (400000)) + SELECT + table_name, + shardid, + ( SELECT size FROM possible_sizes WHERE ABS(size - shard_size) = (SELECT MIN(ABS(size - shard_size)) FROM possible_sizes )) shard_size, + sourcename, + sourceport, + ( SELECT size FROM possible_sizes WHERE ABS(size - source_shard_size) = (SELECT MIN(ABS(size - source_shard_size)) FROM possible_sizes )) source_shard_size, + targetname, + targetport, + ( SELECT size FROM possible_sizes WHERE ABS(size - target_shard_size) = (SELECT MIN(ABS(size - target_shard_size)) FROM possible_sizes )) target_shard_size, + progress, + operation_type, + source_lsn >= target_lsn as lsn_sanity_check, + source_lsn > '0/0' as source_lsn_available, + target_lsn > '0/0' as target_lsn_available + FROM get_rebalance_progress() + ORDER BY 1, 2, 3, 4, 5; + +table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type|lsn_sanity_check|source_lsn_available|target_lsn_available +--------------------------------------------------------------------- +colocated1|1500001| 50000|localhost | 57637| 50000|localhost | 57638| 50000| 1|move |t |t |f +colocated2|1500005| 400000|localhost | 57637| 400000|localhost | 57638| 400000| 1|move |t |t |f +separate |1500009| 50000|localhost | 57637| 50000|localhost | 57638| 200000| 1|move |t |t |f +(3 rows) + +step s6-release-advisory-lock: + SELECT pg_advisory_unlock(44000, 55152); + +pg_advisory_unlock +--------------------------------------------------------------------- +t +(1 row) + +step s1-shard-move-c1-block-writes: <... completed> +citus_move_shard_placement +--------------------------------------------------------------------- + +(1 row) + +step s4-shard-move-sep-block-writes: <... completed> +citus_move_shard_placement +--------------------------------------------------------------------- + +(1 row) + +step s1-wait: +step s4-wait: +step s7-get-progress-ordered: + set LOCAL client_min_messages=NOTICE; + WITH possible_sizes(size) as (VALUES (0), (8000), (50000), (200000), (400000)) + SELECT + table_name, + shardid, + ( SELECT size FROM possible_sizes WHERE ABS(size - shard_size) = (SELECT MIN(ABS(size - shard_size)) FROM possible_sizes )) shard_size, + sourcename, + sourceport, + ( SELECT size FROM possible_sizes WHERE ABS(size - source_shard_size) = (SELECT MIN(ABS(size - source_shard_size)) FROM possible_sizes )) source_shard_size, + targetname, + targetport, + ( SELECT size FROM possible_sizes WHERE ABS(size - target_shard_size) = (SELECT MIN(ABS(size - target_shard_size)) FROM possible_sizes )) target_shard_size, + progress, + operation_type, + source_lsn >= target_lsn as lsn_sanity_check, + source_lsn > '0/0' as source_lsn_available, + target_lsn > '0/0' as target_lsn_available + FROM get_rebalance_progress() + ORDER BY 1, 2, 3, 4, 5; + +table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type|lsn_sanity_check|source_lsn_available|target_lsn_available +--------------------------------------------------------------------- +(0 rows) diff --git a/src/test/regress/spec/isolation_shard_rebalancer_progress.spec b/src/test/regress/spec/isolation_shard_rebalancer_progress.spec index c1a3eac3a..e24a0a0df 100644 --- a/src/test/regress/spec/isolation_shard_rebalancer_progress.spec +++ b/src/test/regress/spec/isolation_shard_rebalancer_progress.spec @@ -1,13 +1,5 @@ setup { - -- We disable deffered drop, so we can easily trigger blocking the shard - -- move at the end of the move. This is done in a separate setup step, - -- because this cannot run in a transaction. - ALTER SYSTEM SET citus.defer_drop_after_shard_move TO OFF; -} -setup -{ - SELECT pg_reload_conf(); ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1500001; SET citus.shard_count TO 4; SET citus.shard_replication_factor TO 1; @@ -30,7 +22,6 @@ setup teardown { - SELECT pg_reload_conf(); DROP TABLE colocated2; DROP TABLE colocated1; DROP TABLE separate; @@ -40,62 +31,37 @@ session "s1" step "s1-rebalance-c1-block-writes" { - BEGIN; - SELECT * FROM get_rebalance_table_shards_plan('colocated1'); SELECT rebalance_table_shards('colocated1', shard_transfer_mode:='block_writes'); } step "s1-rebalance-c1-online" { - BEGIN; - SELECT * FROM get_rebalance_table_shards_plan('colocated1'); SELECT rebalance_table_shards('colocated1', shard_transfer_mode:='force_logical'); } step "s1-shard-move-c1-block-writes" { - BEGIN; SELECT citus_move_shard_placement(1500001, 'localhost', 57637, 'localhost', 57638, shard_transfer_mode:='block_writes'); } step "s1-shard-copy-c1-block-writes" { - BEGIN; UPDATE pg_dist_partition SET repmodel = 'c' WHERE logicalrelid IN ('colocated1', 'colocated2'); SELECT citus_copy_shard_placement(1500001, 'localhost', 57637, 'localhost', 57638, transfer_mode:='block_writes'); } step "s1-shard-move-c1-online" { - BEGIN; SELECT citus_move_shard_placement(1500001, 'localhost', 57637, 'localhost', 57638, shard_transfer_mode:='force_logical'); } step "s1-shard-copy-c1-online" { - BEGIN; UPDATE pg_dist_partition SET repmodel = 'c' WHERE logicalrelid IN ('colocated1', 'colocated2'); SELECT citus_copy_shard_placement(1500001, 'localhost', 57637, 'localhost', 57638, transfer_mode:='force_logical'); } -step "s1-commit" -{ - COMMIT; -} - -session "s2" - -step "s2-lock-1-start" -{ - BEGIN; - DELETE FROM colocated1 WHERE test_id = 1; - DELETE FROM separate WHERE test_id = 1; -} - -step "s2-unlock-1-start" -{ - ROLLBACK; -} +step "s1-wait" {} session "s3" @@ -114,21 +80,33 @@ session "s4" step "s4-shard-move-sep-block-writes" { - BEGIN; SELECT citus_move_shard_placement(1500009, 'localhost', 57637, 'localhost', 57638, shard_transfer_mode:='block_writes'); } -step "s4-commit" +step "s4-wait" {} + +session "s5" + +// this advisory lock with (almost) random values are only used +// is taken before any data is copied. For details check the source +// code of ConflictWithIsolationTestingBeforeCopy +step "s5-acquire-advisory-lock-before-copy" { - COMMIT; + SELECT pg_advisory_lock(55152, 44000); } +step "s5-release-advisory-lock" +{ + SELECT pg_advisory_unlock(55152, 44000); +} + + session "s6" // this advisory lock with (almost) random values are only used -// for testing purposes. For details, check Citus' logical replication -// source code -step "s6-acquire-advisory-lock" +// is taken after all data is copied. For details check the source +// code of ConflictWithIsolationTestingAfterCopy +step "s6-acquire-advisory-lock-after-copy" { SELECT pg_advisory_lock(44000, 55152); } @@ -141,22 +119,6 @@ step "s6-release-advisory-lock" session "s7" -// get_rebalance_progress internally calls pg_total_relation_size on all the -// shards. This means that it takes AccessShareLock on those shards. Because we -// run with deferred drop that means that get_rebalance_progress actually waits -// for the shard move to complete the drop. But we want to get the progress -// before the shards are dropped. So we grab the locks first with a simple -// query that reads from all shards. We force using a single connection because -// get_rebalance_progress isn't smart enough to reuse the right connection for -// the right shards and will simply use a single one for all of them. -step "s7-grab-lock" -{ - BEGIN; - SET LOCAL citus.max_adaptive_executor_pool_size = 1; - SELECT 1 FROM colocated1 LIMIT 1; - SELECT 1 FROM separate LIMIT 1; -} - step "s7-get-progress" { set LOCAL client_min_messages=NOTICE; @@ -180,46 +142,57 @@ step "s7-get-progress" FROM get_rebalance_progress(); } -step "s7-release-lock" +// When getting progress from multiple monitors at the same time it can result +// in random order of the tuples, because there's no defined order of the +// monitors. So in those cases we need to order the output for consistent results. +step "s7-get-progress-ordered" { - COMMIT; + set LOCAL client_min_messages=NOTICE; + WITH possible_sizes(size) as (VALUES (0), (8000), (50000), (200000), (400000)) + SELECT + table_name, + shardid, + ( SELECT size FROM possible_sizes WHERE ABS(size - shard_size) = (SELECT MIN(ABS(size - shard_size)) FROM possible_sizes )) shard_size, + sourcename, + sourceport, + ( SELECT size FROM possible_sizes WHERE ABS(size - source_shard_size) = (SELECT MIN(ABS(size - source_shard_size)) FROM possible_sizes )) source_shard_size, + targetname, + targetport, + ( SELECT size FROM possible_sizes WHERE ABS(size - target_shard_size) = (SELECT MIN(ABS(size - target_shard_size)) FROM possible_sizes )) target_shard_size, + progress, + operation_type, + source_lsn >= target_lsn as lsn_sanity_check, + source_lsn > '0/0' as source_lsn_available, + target_lsn > '0/0' as target_lsn_available + FROM get_rebalance_progress() + ORDER BY 1, 2, 3, 4, 5; } -session "s8" - -// After running these tests we want to enable deferred-drop again. Sadly -// the isolation tester framework does not support multiple teardown steps -// and this cannot be run in a transaction. So we need to do it manually at -// the end of the last test. -step "enable-deferred-drop" -{ - ALTER SYSTEM RESET citus.defer_drop_after_shard_move; -} // blocking rebalancer does what it should -permutation "s2-lock-1-start" "s1-rebalance-c1-block-writes" "s7-get-progress" "s2-unlock-1-start" "s1-commit" "s7-get-progress" "enable-deferred-drop" -permutation "s3-lock-2-start" "s1-rebalance-c1-block-writes" "s7-get-progress" "s3-unlock-2-start" "s1-commit" "s7-get-progress" "enable-deferred-drop" -permutation "s7-grab-lock" "s1-rebalance-c1-block-writes" "s7-get-progress" "s7-release-lock" "s1-commit" "s7-get-progress" "enable-deferred-drop" +permutation "s5-acquire-advisory-lock-before-copy" "s1-rebalance-c1-block-writes" "s7-get-progress" "s5-release-advisory-lock" "s1-wait" "s7-get-progress" +permutation "s3-lock-2-start" "s1-rebalance-c1-block-writes" "s7-get-progress" "s3-unlock-2-start" "s1-wait" "s7-get-progress" +permutation "s6-acquire-advisory-lock-after-copy" "s1-rebalance-c1-block-writes" "s7-get-progress" "s6-release-advisory-lock" "s1-wait" "s7-get-progress" // online rebalancer -permutation "s6-acquire-advisory-lock" "s1-rebalance-c1-online" "s7-get-progress" "s6-release-advisory-lock" "s1-commit" "s7-get-progress" "enable-deferred-drop" -// Commented out due to flakyness -// permutation "s7-grab-lock" "s1-rebalance-c1-online" "s7-get-progress" "s7-release-lock" "s1-commit" "s7-get-progress" "enable-deferred-drop" +permutation "s5-acquire-advisory-lock-before-copy" "s1-rebalance-c1-online" "s7-get-progress" "s5-release-advisory-lock" "s1-wait" "s7-get-progress" +permutation "s6-acquire-advisory-lock-after-copy" "s1-rebalance-c1-online" "s7-get-progress" "s6-release-advisory-lock" "s1-wait" "s7-get-progress" // blocking shard move -permutation "s2-lock-1-start" "s1-shard-move-c1-block-writes" "s7-get-progress" "s2-unlock-1-start" "s1-commit" "s7-get-progress" "enable-deferred-drop" -permutation "s7-grab-lock" "s1-shard-move-c1-block-writes" "s7-get-progress" "s7-release-lock" "s1-commit" "s7-get-progress" "enable-deferred-drop" +permutation "s5-acquire-advisory-lock-before-copy" "s1-shard-move-c1-block-writes" "s7-get-progress" "s5-release-advisory-lock" "s1-wait" "s7-get-progress" +permutation "s6-acquire-advisory-lock-after-copy" "s1-shard-move-c1-block-writes" "s7-get-progress" "s6-release-advisory-lock" "s1-wait" "s7-get-progress" // blocking shard copy -permutation "s2-lock-1-start" "s1-shard-copy-c1-block-writes" "s7-get-progress" "s2-unlock-1-start" "s1-commit" +permutation "s5-acquire-advisory-lock-before-copy" "s1-shard-copy-c1-block-writes" "s7-get-progress" "s5-release-advisory-lock" "s1-wait" "s7-get-progress" +permutation "s6-acquire-advisory-lock-after-copy" "s1-shard-copy-c1-block-writes" "s7-get-progress" "s6-release-advisory-lock" "s1-wait" "s7-get-progress" // online shard move -permutation "s6-acquire-advisory-lock" "s1-shard-move-c1-online" "s7-get-progress" "s6-release-advisory-lock" "s1-commit" "s7-get-progress" "enable-deferred-drop" -// Commented out due to flakyness -// permutation "s7-grab-lock" "s1-shard-move-c1-online" "s7-get-progress" "s7-release-lock" "s1-commit" "s7-get-progress" "enable-deferred-drop" +permutation "s5-acquire-advisory-lock-before-copy" "s1-shard-move-c1-online" "s7-get-progress" "s5-release-advisory-lock" "s1-wait" "s7-get-progress" +permutation "s6-acquire-advisory-lock-after-copy" "s1-shard-move-c1-online" "s7-get-progress" "s6-release-advisory-lock" "s1-wait" "s7-get-progress" // online shard copy -permutation "s6-acquire-advisory-lock" "s1-shard-copy-c1-online" "s7-get-progress" "s6-release-advisory-lock" "s1-commit" +permutation "s5-acquire-advisory-lock-before-copy" "s1-shard-copy-c1-online" "s7-get-progress" "s5-release-advisory-lock" "s1-wait" "s7-get-progress" +permutation "s6-acquire-advisory-lock-after-copy" "s1-shard-copy-c1-online" "s7-get-progress" "s6-release-advisory-lock" "s1-wait" "s7-get-progress" // parallel blocking shard move -permutation "s2-lock-1-start" "s1-shard-move-c1-block-writes" "s4-shard-move-sep-block-writes"("s1-shard-move-c1-block-writes") "s7-get-progress" "s2-unlock-1-start" "s1-commit" "s4-commit" "s7-get-progress" "enable-deferred-drop" -permutation "s7-grab-lock" "s1-shard-move-c1-block-writes" "s4-shard-move-sep-block-writes"("s1-shard-move-c1-block-writes") "s7-get-progress" "s7-release-lock" "s1-commit" "s4-commit" "s7-get-progress" "enable-deferred-drop" +permutation "s5-acquire-advisory-lock-before-copy" "s1-shard-move-c1-block-writes" "s4-shard-move-sep-block-writes"("s1-shard-move-c1-block-writes") "s7-get-progress-ordered" "s5-release-advisory-lock" "s1-wait" "s4-wait" "s7-get-progress-ordered" +permutation "s6-acquire-advisory-lock-after-copy" "s1-shard-move-c1-block-writes" "s4-shard-move-sep-block-writes"("s1-shard-move-c1-block-writes") "s7-get-progress-ordered" "s6-release-advisory-lock" "s1-wait" "s4-wait" "s7-get-progress-ordered"