From e29865abd53fd513af6a425d05a9fea426c263cf Mon Sep 17 00:00:00 2001 From: Jelte Fennema Date: Wed, 5 Oct 2022 17:43:42 +0200 Subject: [PATCH] Increase shard move test coverage by improving advisory locks To be able to test non-blocking shard moves we take an advisory lock, so we can pause the shard move at an interesting moment. Originally this was during the logical replication catch-up phase. But when I added tests for the rebalancer progress I moved this lock to before the initial data copy. This allowed testing of the rebalance progress, but inadvertently meant our non-blocking tests no longer actually tested whether we held unintended locks during logical replication catch-up. This fixes that by creating two types of advisory locks, one taken before the copy and one taken after it. This causes the tests to actually test their intended scenario again. Furthermore, it starts using one of these locks for blocking shard moves too, which allowed me to reduce the complexity of the rebalance progress test suite quite a bit. It also allowed re-enabling some previously flaky tests, because this change stopped them from being flaky. And finally it allowed testing of rebalance progress for blocking shard copy operations as well. 
--- .../distributed/operations/shard_split.c | 2 +- .../distributed/operations/shard_transfer.c | 1 + .../replication/multi_logical_replication.c | 24 +- .../distributed/multi_logical_replication.h | 2 +- .../isolation_shard_rebalancer_progress.out | 496 +++++++++++++++--- .../isolation_shard_rebalancer_progress.spec | 93 ++-- 6 files changed, 473 insertions(+), 145 deletions(-) diff --git a/src/backend/distributed/operations/shard_split.c b/src/backend/distributed/operations/shard_split.c index 84ee97cb6..456e95eff 100644 --- a/src/backend/distributed/operations/shard_split.c +++ b/src/backend/distributed/operations/shard_split.c @@ -615,7 +615,7 @@ BlockingShardSplit(SplitOperation splitOperation, snapshotName, distributionColumnOverrides); /* Used for testing */ - ConflictOnlyWithIsolationTesting(); + ConflictOnlyWithIsolationTesting(false); ereport(LOG, (errmsg( "creating auxillary structures (indexes, stats, replicaindentities, triggers) for %s", diff --git a/src/backend/distributed/operations/shard_transfer.c b/src/backend/distributed/operations/shard_transfer.c index cdb61731f..9e4f44d55 100644 --- a/src/backend/distributed/operations/shard_transfer.c +++ b/src/backend/distributed/operations/shard_transfer.c @@ -1258,6 +1258,7 @@ CopyShardTablesViaBlockWrites(List *shardIntervalList, char *sourceNodeName, } CopyShardsToNode(sourceNode, targetNode, shardIntervalList, NULL); + ConflictOnlyWithIsolationTesting(false); foreach_ptr(shardInterval, shardIntervalList) { diff --git a/src/backend/distributed/replication/multi_logical_replication.c b/src/backend/distributed/replication/multi_logical_replication.c index 850daca24..a20d7d4e0 100644 --- a/src/backend/distributed/replication/multi_logical_replication.c +++ b/src/backend/distributed/replication/multi_logical_replication.c @@ -236,7 +236,7 @@ LogicallyReplicateShards(List *shardList, char *sourceNodeName, int sourceNodePo logicalRepTargetList); /* only useful for isolation testing, see the function comment 
for the details */ - ConflictOnlyWithIsolationTesting(); + ConflictOnlyWithIsolationTesting(true); /* * We have to create the primary key (or any other replica identity) @@ -402,7 +402,7 @@ CompleteNonBlockingShardTransfer(List *shardList, /* only useful for isolation testing, see the function comment for the details */ - ConflictOnlyWithIsolationTesting(); + ConflictOnlyWithIsolationTesting(false); /* * We're almost done, we'll block the writes to the shards that we're @@ -1241,9 +1241,13 @@ CreateUncheckedForeignKeyConstraints(List *logicalRepTargetList) * * Note that since the cost of calling this function is pretty low, we prefer * to use it in non-assert builds as well not to diverge in the behaviour. + * + * Depending on its argument it takes a different lock. It can take either a + * lock that is taken before the data copy is started, or it takes a lock + * during logical replication. */ extern void -ConflictOnlyWithIsolationTesting() +ConflictOnlyWithIsolationTesting(bool beforeCopy) { LOCKTAG tag; const bool sessionLock = false; @@ -1252,8 +1256,18 @@ ConflictOnlyWithIsolationTesting() if (RunningUnderIsolationTest) { /* we've picked random keys */ - SET_LOCKTAG_ADVISORY(tag, MyDatabaseId, SHARD_MOVE_ADVISORY_LOCK_FIRST_KEY, - SHARD_MOVE_ADVISORY_LOCK_SECOND_KEY, 2); + if (beforeCopy) + { + SET_LOCKTAG_ADVISORY(tag, MyDatabaseId, + SHARD_MOVE_ADVISORY_LOCK_SECOND_KEY, + SHARD_MOVE_ADVISORY_LOCK_FIRST_KEY, 2); + } + else + { + SET_LOCKTAG_ADVISORY(tag, MyDatabaseId, + SHARD_MOVE_ADVISORY_LOCK_FIRST_KEY, + SHARD_MOVE_ADVISORY_LOCK_SECOND_KEY, 2); + } (void) LockAcquire(&tag, ExclusiveLock, sessionLock, dontWait); } diff --git a/src/include/distributed/multi_logical_replication.h b/src/include/distributed/multi_logical_replication.h index f042b4326..2cd787f10 100644 --- a/src/include/distributed/multi_logical_replication.h +++ b/src/include/distributed/multi_logical_replication.h @@ -131,7 +131,7 @@ extern void LogicallyReplicateShards(List *shardList, char 
*sourceNodeName, int sourceNodePort, char *targetNodeName, int targetNodePort); -extern void ConflictOnlyWithIsolationTesting(void); +extern void ConflictOnlyWithIsolationTesting(bool beforeCopy); extern void CreateReplicaIdentities(List *subscriptionInfoList); extern void CreateReplicaIdentitiesOnNode(List *shardList, char *nodeName, diff --git a/src/test/regress/expected/isolation_shard_rebalancer_progress.out b/src/test/regress/expected/isolation_shard_rebalancer_progress.out index 69d0a9f91..1d5aa062c 100644 --- a/src/test/regress/expected/isolation_shard_rebalancer_progress.out +++ b/src/test/regress/expected/isolation_shard_rebalancer_progress.out @@ -1,6 +1,6 @@ Parsed test spec with 7 sessions -starting permutation: s2-lock-1-start s1-rebalance-c1-block-writes s7-get-progress s2-unlock-1-start s1-commit s7-get-progress enable-deferred-drop +starting permutation: s2-lock-1-start s1-rebalance-c1-block-writes s7-get-progress s2-unlock-1-start s1-commit s7-get-progress master_set_node_property --------------------------------------------------------------------- @@ -88,11 +88,8 @@ table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname --------------------------------------------------------------------- (0 rows) -step enable-deferred-drop: - ALTER SYSTEM RESET citus.defer_drop_after_shard_move; - -starting permutation: s3-lock-2-start s1-rebalance-c1-block-writes s7-get-progress s3-unlock-2-start s1-commit s7-get-progress enable-deferred-drop +starting permutation: s3-lock-2-start s1-rebalance-c1-block-writes s7-get-progress s3-unlock-2-start s1-commit s7-get-progress master_set_node_property --------------------------------------------------------------------- @@ -129,8 +126,8 @@ step s7-get-progress: table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type|lsn_sanity_check|source_lsn_available|target_lsn_available 
--------------------------------------------------------------------- -colocated1|1500001| 50000|localhost | 57637| 0|localhost | 57638| 50000| 2|move |t |t |f -colocated2|1500005| 400000|localhost | 57637| 0|localhost | 57638| 400000| 2|move |t |t |f +colocated1|1500001| 50000|localhost | 57637| 50000|localhost | 57638| 50000| 2|move |t |t |f +colocated2|1500005| 400000|localhost | 57637| 400000|localhost | 57638| 400000| 2|move |t |t |f colocated1|1500002| 200000|localhost | 57637| 200000|localhost | 57638| 0| 1|move |t |t |f colocated2|1500006| 8000|localhost | 57637| 8000|localhost | 57638| 0| 1|move |t |t |f (4 rows) @@ -179,30 +176,19 @@ table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname --------------------------------------------------------------------- (0 rows) -step enable-deferred-drop: - ALTER SYSTEM RESET citus.defer_drop_after_shard_move; - -starting permutation: s7-grab-lock s1-rebalance-c1-block-writes s7-get-progress s7-release-lock s1-commit s7-get-progress enable-deferred-drop +starting permutation: s6-acquire-advisory-lock s1-rebalance-c1-block-writes s7-get-progress s6-release-advisory-lock s1-commit s7-get-progress master_set_node_property --------------------------------------------------------------------- (1 row) -step s7-grab-lock: - BEGIN; - SET LOCAL citus.max_adaptive_executor_pool_size = 1; - SELECT 1 FROM colocated1 LIMIT 1; - SELECT 1 FROM separate LIMIT 1; +step s6-acquire-advisory-lock: + SELECT pg_advisory_lock(44000, 55152); -?column? +pg_advisory_lock --------------------------------------------------------------------- - 1 -(1 row) -?column? 
---------------------------------------------------------------------- - 1 (1 row) step s1-rebalance-c1-block-writes: @@ -238,8 +224,13 @@ colocated1|1500002| 200000|localhost | 57637| 200000|localhost colocated2|1500006| 8000|localhost | 57637| 8000|localhost | 57638| 0| 0|move |t |t |f (4 rows) -step s7-release-lock: - COMMIT; +step s6-release-advisory-lock: + SELECT pg_advisory_unlock(44000, 55152); + +pg_advisory_unlock +--------------------------------------------------------------------- +t +(1 row) step s1-rebalance-c1-block-writes: <... completed> table_name|shardid|shard_size|sourcename|sourceport|targetname|targetport @@ -282,11 +273,105 @@ table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname --------------------------------------------------------------------- (0 rows) -step enable-deferred-drop: - ALTER SYSTEM RESET citus.defer_drop_after_shard_move; + +starting permutation: s5-acquire-advisory-lock s1-rebalance-c1-online s7-get-progress s5-release-advisory-lock s1-commit s7-get-progress +master_set_node_property +--------------------------------------------------------------------- + +(1 row) + +step s5-acquire-advisory-lock: + SELECT pg_advisory_lock(55152, 44000); + +pg_advisory_lock +--------------------------------------------------------------------- + +(1 row) + +step s1-rebalance-c1-online: + BEGIN; + SELECT * FROM get_rebalance_table_shards_plan('colocated1'); + SELECT rebalance_table_shards('colocated1', shard_transfer_mode:='force_logical'); + +step s7-get-progress: + set LOCAL client_min_messages=NOTICE; + WITH possible_sizes(size) as (VALUES (0), (8000), (50000), (200000), (400000)) + SELECT + table_name, + shardid, + ( SELECT size FROM possible_sizes WHERE ABS(size - shard_size) = (SELECT MIN(ABS(size - shard_size)) FROM possible_sizes )) shard_size, + sourcename, + sourceport, + ( SELECT size FROM possible_sizes WHERE ABS(size - source_shard_size) = (SELECT MIN(ABS(size - source_shard_size)) FROM possible_sizes 
)) source_shard_size, + targetname, + targetport, + ( SELECT size FROM possible_sizes WHERE ABS(size - target_shard_size) = (SELECT MIN(ABS(size - target_shard_size)) FROM possible_sizes )) target_shard_size, + progress, + operation_type, + source_lsn >= target_lsn as lsn_sanity_check, + source_lsn > '0/0' as source_lsn_available, + target_lsn > '0/0' as target_lsn_available + FROM get_rebalance_progress(); + +table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type|lsn_sanity_check|source_lsn_available|target_lsn_available +--------------------------------------------------------------------- +colocated1|1500001| 50000|localhost | 57637| 50000|localhost | 57638| 8000| 1|move |t |t |f +colocated2|1500005| 400000|localhost | 57637| 400000|localhost | 57638| 8000| 1|move |t |t |f +colocated1|1500002| 200000|localhost | 57637| 200000|localhost | 57638| 0| 0|move |t |t |f +colocated2|1500006| 8000|localhost | 57637| 8000|localhost | 57638| 0| 0|move |t |t |f +(4 rows) + +step s5-release-advisory-lock: + SELECT pg_advisory_unlock(55152, 44000); + +pg_advisory_unlock +--------------------------------------------------------------------- +t +(1 row) + +step s1-rebalance-c1-online: <... 
completed> +table_name|shardid|shard_size|sourcename|sourceport|targetname|targetport +--------------------------------------------------------------------- +colocated1|1500001| 0|localhost | 57637|localhost | 57638 +colocated2|1500005| 0|localhost | 57637|localhost | 57638 +colocated1|1500002| 0|localhost | 57637|localhost | 57638 +colocated2|1500006| 0|localhost | 57637|localhost | 57638 +(4 rows) + +rebalance_table_shards +--------------------------------------------------------------------- + +(1 row) + +step s1-commit: + COMMIT; + +step s7-get-progress: + set LOCAL client_min_messages=NOTICE; + WITH possible_sizes(size) as (VALUES (0), (8000), (50000), (200000), (400000)) + SELECT + table_name, + shardid, + ( SELECT size FROM possible_sizes WHERE ABS(size - shard_size) = (SELECT MIN(ABS(size - shard_size)) FROM possible_sizes )) shard_size, + sourcename, + sourceport, + ( SELECT size FROM possible_sizes WHERE ABS(size - source_shard_size) = (SELECT MIN(ABS(size - source_shard_size)) FROM possible_sizes )) source_shard_size, + targetname, + targetport, + ( SELECT size FROM possible_sizes WHERE ABS(size - target_shard_size) = (SELECT MIN(ABS(size - target_shard_size)) FROM possible_sizes )) target_shard_size, + progress, + operation_type, + source_lsn >= target_lsn as lsn_sanity_check, + source_lsn > '0/0' as source_lsn_available, + target_lsn > '0/0' as target_lsn_available + FROM get_rebalance_progress(); + +table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type|lsn_sanity_check|source_lsn_available|target_lsn_available +--------------------------------------------------------------------- +(0 rows) -starting permutation: s6-acquire-advisory-lock s1-rebalance-c1-online s7-get-progress s6-release-advisory-lock s1-commit s7-get-progress enable-deferred-drop +starting permutation: s6-acquire-advisory-lock s1-rebalance-c1-online s7-get-progress s6-release-advisory-lock s1-commit 
s7-get-progress master_set_node_property --------------------------------------------------------------------- @@ -327,8 +412,8 @@ step s7-get-progress: table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type|lsn_sanity_check|source_lsn_available|target_lsn_available --------------------------------------------------------------------- -colocated1|1500001| 50000|localhost | 57637| 50000|localhost | 57638| 8000| 1|move |t |t |f -colocated2|1500005| 400000|localhost | 57637| 400000|localhost | 57638| 8000| 1|move |t |t |f +colocated1|1500001| 50000|localhost | 57637| 50000|localhost | 57638| 50000| 1|move |t |t |t +colocated2|1500005| 400000|localhost | 57637| 400000|localhost | 57638| 400000| 1|move |t |t |t colocated1|1500002| 200000|localhost | 57637| 200000|localhost | 57638| 0| 0|move |t |t |f colocated2|1500006| 8000|localhost | 57637| 8000|localhost | 57638| 0| 0|move |t |t |f (4 rows) @@ -382,11 +467,8 @@ table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname --------------------------------------------------------------------- (0 rows) -step enable-deferred-drop: - ALTER SYSTEM RESET citus.defer_drop_after_shard_move; - -starting permutation: s2-lock-1-start s1-shard-move-c1-block-writes s7-get-progress s2-unlock-1-start s1-commit s7-get-progress enable-deferred-drop +starting permutation: s2-lock-1-start s1-shard-move-c1-block-writes s7-get-progress s2-unlock-1-start s1-commit s7-get-progress master_set_node_property --------------------------------------------------------------------- @@ -463,30 +545,19 @@ table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname --------------------------------------------------------------------- (0 rows) -step enable-deferred-drop: - ALTER SYSTEM RESET citus.defer_drop_after_shard_move; - -starting permutation: s7-grab-lock s1-shard-move-c1-block-writes s7-get-progress s7-release-lock s1-commit 
s7-get-progress enable-deferred-drop +starting permutation: s6-acquire-advisory-lock s1-shard-move-c1-block-writes s7-get-progress s6-release-advisory-lock s1-commit s7-get-progress master_set_node_property --------------------------------------------------------------------- (1 row) -step s7-grab-lock: - BEGIN; - SET LOCAL citus.max_adaptive_executor_pool_size = 1; - SELECT 1 FROM colocated1 LIMIT 1; - SELECT 1 FROM separate LIMIT 1; +step s6-acquire-advisory-lock: + SELECT pg_advisory_lock(44000, 55152); -?column? +pg_advisory_lock --------------------------------------------------------------------- - 1 -(1 row) -?column? ---------------------------------------------------------------------- - 1 (1 row) step s1-shard-move-c1-block-writes: @@ -519,8 +590,13 @@ colocated1|1500001| 50000|localhost | 57637| 50000|localhost colocated2|1500005| 400000|localhost | 57637| 400000|localhost | 57638| 400000| 1|move |t |t |f (2 rows) -step s7-release-lock: - COMMIT; +step s6-release-advisory-lock: + SELECT pg_advisory_unlock(44000, 55152); + +pg_advisory_unlock +--------------------------------------------------------------------- +t +(1 row) step s1-shard-move-c1-block-writes: <... 
completed> citus_move_shard_placement @@ -555,9 +631,6 @@ table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname --------------------------------------------------------------------- (0 rows) -step enable-deferred-drop: - ALTER SYSTEM RESET citus.defer_drop_after_shard_move; - starting permutation: s2-lock-1-start s1-shard-copy-c1-block-writes s7-get-progress s2-unlock-1-start s1-commit master_set_node_property @@ -614,7 +687,180 @@ step s1-commit: COMMIT; -starting permutation: s6-acquire-advisory-lock s1-shard-move-c1-online s7-get-progress s6-release-advisory-lock s1-commit s7-get-progress enable-deferred-drop +starting permutation: s6-acquire-advisory-lock s1-shard-copy-c1-block-writes s7-get-progress s6-release-advisory-lock s1-commit s7-get-progress +master_set_node_property +--------------------------------------------------------------------- + +(1 row) + +step s6-acquire-advisory-lock: + SELECT pg_advisory_lock(44000, 55152); + +pg_advisory_lock +--------------------------------------------------------------------- + +(1 row) + +step s1-shard-copy-c1-block-writes: + BEGIN; + UPDATE pg_dist_partition SET repmodel = 'c' WHERE logicalrelid IN ('colocated1', 'colocated2'); + SELECT citus_copy_shard_placement(1500001, 'localhost', 57637, 'localhost', 57638, transfer_mode:='block_writes'); + +step s7-get-progress: + set LOCAL client_min_messages=NOTICE; + WITH possible_sizes(size) as (VALUES (0), (8000), (50000), (200000), (400000)) + SELECT + table_name, + shardid, + ( SELECT size FROM possible_sizes WHERE ABS(size - shard_size) = (SELECT MIN(ABS(size - shard_size)) FROM possible_sizes )) shard_size, + sourcename, + sourceport, + ( SELECT size FROM possible_sizes WHERE ABS(size - source_shard_size) = (SELECT MIN(ABS(size - source_shard_size)) FROM possible_sizes )) source_shard_size, + targetname, + targetport, + ( SELECT size FROM possible_sizes WHERE ABS(size - target_shard_size) = (SELECT MIN(ABS(size - target_shard_size)) FROM 
possible_sizes )) target_shard_size, + progress, + operation_type, + source_lsn >= target_lsn as lsn_sanity_check, + source_lsn > '0/0' as source_lsn_available, + target_lsn > '0/0' as target_lsn_available + FROM get_rebalance_progress(); + +table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type|lsn_sanity_check|source_lsn_available|target_lsn_available +--------------------------------------------------------------------- +colocated1|1500001| 50000|localhost | 57637| 50000|localhost | 57638| 50000| 1|copy |t |t |f +colocated2|1500005| 400000|localhost | 57637| 400000|localhost | 57638| 400000| 1|copy |t |t |f +(2 rows) + +step s6-release-advisory-lock: + SELECT pg_advisory_unlock(44000, 55152); + +pg_advisory_unlock +--------------------------------------------------------------------- +t +(1 row) + +step s1-shard-copy-c1-block-writes: <... completed> +citus_copy_shard_placement +--------------------------------------------------------------------- + +(1 row) + +step s1-commit: + COMMIT; + +step s7-get-progress: + set LOCAL client_min_messages=NOTICE; + WITH possible_sizes(size) as (VALUES (0), (8000), (50000), (200000), (400000)) + SELECT + table_name, + shardid, + ( SELECT size FROM possible_sizes WHERE ABS(size - shard_size) = (SELECT MIN(ABS(size - shard_size)) FROM possible_sizes )) shard_size, + sourcename, + sourceport, + ( SELECT size FROM possible_sizes WHERE ABS(size - source_shard_size) = (SELECT MIN(ABS(size - source_shard_size)) FROM possible_sizes )) source_shard_size, + targetname, + targetport, + ( SELECT size FROM possible_sizes WHERE ABS(size - target_shard_size) = (SELECT MIN(ABS(size - target_shard_size)) FROM possible_sizes )) target_shard_size, + progress, + operation_type, + source_lsn >= target_lsn as lsn_sanity_check, + source_lsn > '0/0' as source_lsn_available, + target_lsn > '0/0' as target_lsn_available + FROM get_rebalance_progress(); + 
+table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type|lsn_sanity_check|source_lsn_available|target_lsn_available +--------------------------------------------------------------------- +(0 rows) + + +starting permutation: s5-acquire-advisory-lock s1-shard-move-c1-online s7-get-progress s5-release-advisory-lock s1-commit s7-get-progress +master_set_node_property +--------------------------------------------------------------------- + +(1 row) + +step s5-acquire-advisory-lock: + SELECT pg_advisory_lock(55152, 44000); + +pg_advisory_lock +--------------------------------------------------------------------- + +(1 row) + +step s1-shard-move-c1-online: + BEGIN; + SELECT citus_move_shard_placement(1500001, 'localhost', 57637, 'localhost', 57638, shard_transfer_mode:='force_logical'); + +step s7-get-progress: + set LOCAL client_min_messages=NOTICE; + WITH possible_sizes(size) as (VALUES (0), (8000), (50000), (200000), (400000)) + SELECT + table_name, + shardid, + ( SELECT size FROM possible_sizes WHERE ABS(size - shard_size) = (SELECT MIN(ABS(size - shard_size)) FROM possible_sizes )) shard_size, + sourcename, + sourceport, + ( SELECT size FROM possible_sizes WHERE ABS(size - source_shard_size) = (SELECT MIN(ABS(size - source_shard_size)) FROM possible_sizes )) source_shard_size, + targetname, + targetport, + ( SELECT size FROM possible_sizes WHERE ABS(size - target_shard_size) = (SELECT MIN(ABS(size - target_shard_size)) FROM possible_sizes )) target_shard_size, + progress, + operation_type, + source_lsn >= target_lsn as lsn_sanity_check, + source_lsn > '0/0' as source_lsn_available, + target_lsn > '0/0' as target_lsn_available + FROM get_rebalance_progress(); + +table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type|lsn_sanity_check|source_lsn_available|target_lsn_available 
+--------------------------------------------------------------------- +colocated1|1500001| 50000|localhost | 57637| 50000|localhost | 57638| 8000| 1|move |t |t |f +colocated2|1500005| 400000|localhost | 57637| 400000|localhost | 57638| 8000| 1|move |t |t |f +(2 rows) + +step s5-release-advisory-lock: + SELECT pg_advisory_unlock(55152, 44000); + +pg_advisory_unlock +--------------------------------------------------------------------- +t +(1 row) + +step s1-shard-move-c1-online: <... completed> +citus_move_shard_placement +--------------------------------------------------------------------- + +(1 row) + +step s1-commit: + COMMIT; + +step s7-get-progress: + set LOCAL client_min_messages=NOTICE; + WITH possible_sizes(size) as (VALUES (0), (8000), (50000), (200000), (400000)) + SELECT + table_name, + shardid, + ( SELECT size FROM possible_sizes WHERE ABS(size - shard_size) = (SELECT MIN(ABS(size - shard_size)) FROM possible_sizes )) shard_size, + sourcename, + sourceport, + ( SELECT size FROM possible_sizes WHERE ABS(size - source_shard_size) = (SELECT MIN(ABS(size - source_shard_size)) FROM possible_sizes )) source_shard_size, + targetname, + targetport, + ( SELECT size FROM possible_sizes WHERE ABS(size - target_shard_size) = (SELECT MIN(ABS(size - target_shard_size)) FROM possible_sizes )) target_shard_size, + progress, + operation_type, + source_lsn >= target_lsn as lsn_sanity_check, + source_lsn > '0/0' as source_lsn_available, + target_lsn > '0/0' as target_lsn_available + FROM get_rebalance_progress(); + +table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type|lsn_sanity_check|source_lsn_available|target_lsn_available +--------------------------------------------------------------------- +(0 rows) + + +starting permutation: s6-acquire-advisory-lock s1-shard-move-c1-online s7-get-progress s6-release-advisory-lock s1-commit s7-get-progress master_set_node_property 
--------------------------------------------------------------------- @@ -654,8 +900,8 @@ step s7-get-progress: table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type|lsn_sanity_check|source_lsn_available|target_lsn_available --------------------------------------------------------------------- -colocated1|1500001| 50000|localhost | 57637| 50000|localhost | 57638| 8000| 1|move |t |t |f -colocated2|1500005| 400000|localhost | 57637| 400000|localhost | 57638| 8000| 1|move |t |t |f +colocated1|1500001| 50000|localhost | 57637| 50000|localhost | 57638| 50000| 1|move |t |t |t +colocated2|1500005| 400000|localhost | 57637| 400000|localhost | 57638| 400000| 1|move |t |t |t (2 rows) step s6-release-advisory-lock: @@ -699,11 +945,71 @@ table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname --------------------------------------------------------------------- (0 rows) -step enable-deferred-drop: - ALTER SYSTEM RESET citus.defer_drop_after_shard_move; + +starting permutation: s5-acquire-advisory-lock s1-shard-copy-c1-online s7-get-progress s5-release-advisory-lock s1-commit +master_set_node_property +--------------------------------------------------------------------- + +(1 row) + +step s5-acquire-advisory-lock: + SELECT pg_advisory_lock(55152, 44000); + +pg_advisory_lock +--------------------------------------------------------------------- + +(1 row) + +step s1-shard-copy-c1-online: + BEGIN; + UPDATE pg_dist_partition SET repmodel = 'c' WHERE logicalrelid IN ('colocated1', 'colocated2'); + SELECT citus_copy_shard_placement(1500001, 'localhost', 57637, 'localhost', 57638, transfer_mode:='force_logical'); + +step s7-get-progress: + set LOCAL client_min_messages=NOTICE; + WITH possible_sizes(size) as (VALUES (0), (8000), (50000), (200000), (400000)) + SELECT + table_name, + shardid, + ( SELECT size FROM possible_sizes WHERE ABS(size - shard_size) = (SELECT MIN(ABS(size - 
shard_size)) FROM possible_sizes )) shard_size, + sourcename, + sourceport, + ( SELECT size FROM possible_sizes WHERE ABS(size - source_shard_size) = (SELECT MIN(ABS(size - source_shard_size)) FROM possible_sizes )) source_shard_size, + targetname, + targetport, + ( SELECT size FROM possible_sizes WHERE ABS(size - target_shard_size) = (SELECT MIN(ABS(size - target_shard_size)) FROM possible_sizes )) target_shard_size, + progress, + operation_type, + source_lsn >= target_lsn as lsn_sanity_check, + source_lsn > '0/0' as source_lsn_available, + target_lsn > '0/0' as target_lsn_available + FROM get_rebalance_progress(); + +table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type|lsn_sanity_check|source_lsn_available|target_lsn_available +--------------------------------------------------------------------- +colocated1|1500001| 50000|localhost | 57637| 50000|localhost | 57638| 8000| 1|copy |t |t |f +colocated2|1500005| 400000|localhost | 57637| 400000|localhost | 57638| 8000| 1|copy |t |t |f +(2 rows) + +step s5-release-advisory-lock: + SELECT pg_advisory_unlock(55152, 44000); + +pg_advisory_unlock +--------------------------------------------------------------------- +t +(1 row) + +step s1-shard-copy-c1-online: <... 
completed> +citus_copy_shard_placement +--------------------------------------------------------------------- + +(1 row) + +step s1-commit: + COMMIT; -starting permutation: s6-acquire-advisory-lock s1-shard-copy-c1-online s7-get-progress s6-release-advisory-lock s1-commit +starting permutation: s6-acquire-advisory-lock s1-shard-copy-c1-online s7-get-progress s6-release-advisory-lock s1-commit s7-get-progress master_set_node_property --------------------------------------------------------------------- @@ -744,8 +1050,8 @@ step s7-get-progress: table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type|lsn_sanity_check|source_lsn_available|target_lsn_available --------------------------------------------------------------------- -colocated1|1500001| 50000|localhost | 57637| 50000|localhost | 57638| 8000| 1|copy |t |t |f -colocated2|1500005| 400000|localhost | 57637| 400000|localhost | 57638| 8000| 1|copy |t |t |f +colocated1|1500001| 50000|localhost | 57637| 50000|localhost | 57638| 50000| 1|copy |t |t |t +colocated2|1500005| 400000|localhost | 57637| 400000|localhost | 57638| 400000| 1|copy |t |t |t (2 rows) step s6-release-advisory-lock: @@ -765,8 +1071,32 @@ citus_copy_shard_placement step s1-commit: COMMIT; +step s7-get-progress: + set LOCAL client_min_messages=NOTICE; + WITH possible_sizes(size) as (VALUES (0), (8000), (50000), (200000), (400000)) + SELECT + table_name, + shardid, + ( SELECT size FROM possible_sizes WHERE ABS(size - shard_size) = (SELECT MIN(ABS(size - shard_size)) FROM possible_sizes )) shard_size, + sourcename, + sourceport, + ( SELECT size FROM possible_sizes WHERE ABS(size - source_shard_size) = (SELECT MIN(ABS(size - source_shard_size)) FROM possible_sizes )) source_shard_size, + targetname, + targetport, + ( SELECT size FROM possible_sizes WHERE ABS(size - target_shard_size) = (SELECT MIN(ABS(size - target_shard_size)) FROM possible_sizes )) target_shard_size, + 
progress, + operation_type, + source_lsn >= target_lsn as lsn_sanity_check, + source_lsn > '0/0' as source_lsn_available, + target_lsn > '0/0' as target_lsn_available + FROM get_rebalance_progress(); -starting permutation: s2-lock-1-start s1-shard-move-c1-block-writes s4-shard-move-sep-block-writes s7-get-progress s2-unlock-1-start s1-commit s4-commit s7-get-progress enable-deferred-drop +table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type|lsn_sanity_check|source_lsn_available|target_lsn_available +--------------------------------------------------------------------- +(0 rows) + + +starting permutation: s2-lock-1-start s1-shard-move-c1-block-writes s4-shard-move-sep-block-writes-without-advisory-locks s7-get-progress s2-unlock-1-start s1-commit s4-commit s7-get-progress master_set_node_property --------------------------------------------------------------------- @@ -781,8 +1111,9 @@ step s1-shard-move-c1-block-writes: BEGIN; SELECT citus_move_shard_placement(1500001, 'localhost', 57637, 'localhost', 57638, shard_transfer_mode:='block_writes'); -step s4-shard-move-sep-block-writes: +step s4-shard-move-sep-block-writes-without-advisory-locks: BEGIN; + SET LOCAL citus.running_under_isolation_test = false; SELECT citus_move_shard_placement(1500009, 'localhost', 57637, 'localhost', 57638, shard_transfer_mode:='block_writes'); step s7-get-progress: @@ -821,7 +1152,7 @@ citus_move_shard_placement (1 row) -step s4-shard-move-sep-block-writes: <... completed> +step s4-shard-move-sep-block-writes-without-advisory-locks: <... 
completed> citus_move_shard_placement --------------------------------------------------------------------- @@ -857,30 +1188,19 @@ table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname --------------------------------------------------------------------- (0 rows) -step enable-deferred-drop: - ALTER SYSTEM RESET citus.defer_drop_after_shard_move; - -starting permutation: s7-grab-lock s1-shard-move-c1-block-writes s4-shard-move-sep-block-writes s7-get-progress s7-release-lock s1-commit s4-commit s7-get-progress enable-deferred-drop +starting permutation: s6-acquire-advisory-lock s1-shard-move-c1-block-writes s4-shard-move-sep-block-writes s7-get-progress s6-release-advisory-lock s1-commit s4-commit s7-get-progress master_set_node_property --------------------------------------------------------------------- (1 row) -step s7-grab-lock: - BEGIN; - SET LOCAL citus.max_adaptive_executor_pool_size = 1; - SELECT 1 FROM colocated1 LIMIT 1; - SELECT 1 FROM separate LIMIT 1; +step s6-acquire-advisory-lock: + SELECT pg_advisory_lock(44000, 55152); -?column? +pg_advisory_lock --------------------------------------------------------------------- - 1 -(1 row) -?column? ---------------------------------------------------------------------- - 1 (1 row) step s1-shard-move-c1-block-writes: @@ -918,8 +1238,13 @@ colocated2|1500005| 400000|localhost | 57637| 400000|localhost separate |1500009| 50000|localhost | 57637| 50000|localhost | 57638| 200000| 1|move |t |t |f (3 rows) -step s7-release-lock: - COMMIT; +step s6-release-advisory-lock: + SELECT pg_advisory_unlock(44000, 55152); + +pg_advisory_unlock +--------------------------------------------------------------------- +t +(1 row) step s1-shard-move-c1-block-writes: <... completed> citus_move_shard_placement @@ -927,15 +1252,15 @@ citus_move_shard_placement (1 row) +step s1-commit: + COMMIT; + step s4-shard-move-sep-block-writes: <... 
completed> citus_move_shard_placement --------------------------------------------------------------------- (1 row) -step s1-commit: - COMMIT; - step s4-commit: COMMIT; @@ -963,6 +1288,3 @@ table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname --------------------------------------------------------------------- (0 rows) -step enable-deferred-drop: - ALTER SYSTEM RESET citus.defer_drop_after_shard_move; - diff --git a/src/test/regress/spec/isolation_shard_rebalancer_progress.spec b/src/test/regress/spec/isolation_shard_rebalancer_progress.spec index b9d10d047..678e3081f 100644 --- a/src/test/regress/spec/isolation_shard_rebalancer_progress.spec +++ b/src/test/regress/spec/isolation_shard_rebalancer_progress.spec @@ -1,13 +1,9 @@ setup { - -- We disable deffered drop, so we can easily trigger blocking the shard - -- move at the end of the move. This is done in a separate setup step, - -- because this cannot run in a transaction. - ALTER SYSTEM SET citus.defer_drop_after_shard_move TO OFF; + CALL citus_cleanup_orphaned_shards(); } setup { - SELECT pg_reload_conf(); ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1500001; SET citus.shard_count TO 4; SET citus.shard_replication_factor TO 1; @@ -30,7 +26,6 @@ setup teardown { - SELECT pg_reload_conf(); DROP TABLE colocated2; DROP TABLE colocated1; DROP TABLE separate; @@ -118,11 +113,38 @@ step "s4-shard-move-sep-block-writes" SELECT citus_move_shard_placement(1500009, 'localhost', 57637, 'localhost', 57638, shard_transfer_mode:='block_writes'); } +// Running two shard moves at the same time can cause racy behaviour over who +// gets the lock. The test in which this move is used doesn't rely on the +// advisory locks. So we disable taking the advisory lock there to avoid the +// racy lock acquisition with the other concurrent move.
+step "s4-shard-move-sep-block-writes-without-advisory-locks" +{ + BEGIN; + SET LOCAL citus.running_under_isolation_test = false; + SELECT citus_move_shard_placement(1500009, 'localhost', 57637, 'localhost', 57638, shard_transfer_mode:='block_writes'); +} + step "s4-commit" { COMMIT; } +session "s5" + +// this advisory lock with (almost) random values are only used +// for testing purposes. For details, check Citus' logical replication +// source code +step "s5-acquire-advisory-lock" +{ + SELECT pg_advisory_lock(55152, 44000); +} + +step "s5-release-advisory-lock" +{ + SELECT pg_advisory_unlock(55152, 44000); +} + + session "s6" // this advisory lock with (almost) random values are only used @@ -141,22 +163,6 @@ step "s6-release-advisory-lock" session "s7" -// get_rebalance_progress internally calls pg_total_relation_size on all the -// shards. This means that it takes AccessShareLock on those shards. Because we -// run with deferred drop that means that get_rebalance_progress actually waits -// for the shard move to complete the drop. But we want to get the progress -// before the shards are dropped. So we grab the locks first with a simple -// query that reads from all shards. We force using a single connection because -// get_rebalance_progress isn't smart enough to reuse the right connection for -// the right shards and will simply use a single one for all of them. -step "s7-grab-lock" -{ - BEGIN; - SET LOCAL citus.max_adaptive_executor_pool_size = 1; - SELECT 1 FROM colocated1 LIMIT 1; - SELECT 1 FROM separate LIMIT 1; -} - step "s7-get-progress" { set LOCAL client_min_messages=NOTICE; @@ -179,46 +185,31 @@ step "s7-get-progress" FROM get_rebalance_progress(); } -step "s7-release-lock" -{ - COMMIT; -} - -session "s8" - -// After running these tests we want to enable deferred-drop again. Sadly -// the isolation tester framework does not support multiple teardown steps -// and this cannot be run in a transaction. 
So we need to do it manually at -// the end of the last test. -step "enable-deferred-drop" -{ - ALTER SYSTEM RESET citus.defer_drop_after_shard_move; -} // blocking rebalancer does what it should -permutation "s2-lock-1-start" "s1-rebalance-c1-block-writes" "s7-get-progress" "s2-unlock-1-start" "s1-commit" "s7-get-progress" "enable-deferred-drop" -permutation "s3-lock-2-start" "s1-rebalance-c1-block-writes" "s7-get-progress" "s3-unlock-2-start" "s1-commit" "s7-get-progress" "enable-deferred-drop" -permutation "s7-grab-lock" "s1-rebalance-c1-block-writes" "s7-get-progress" "s7-release-lock" "s1-commit" "s7-get-progress" "enable-deferred-drop" +permutation "s2-lock-1-start" "s1-rebalance-c1-block-writes" "s7-get-progress" "s2-unlock-1-start" "s1-commit" "s7-get-progress" +permutation "s3-lock-2-start" "s1-rebalance-c1-block-writes" "s7-get-progress" "s3-unlock-2-start" "s1-commit" "s7-get-progress" +permutation "s6-acquire-advisory-lock" "s1-rebalance-c1-block-writes" "s7-get-progress" "s6-release-advisory-lock" "s1-commit" "s7-get-progress" // online rebalancer -permutation "s6-acquire-advisory-lock" "s1-rebalance-c1-online" "s7-get-progress" "s6-release-advisory-lock" "s1-commit" "s7-get-progress" "enable-deferred-drop" -// Commented out due to flakyness -// permutation "s7-grab-lock" "s1-rebalance-c1-online" "s7-get-progress" "s7-release-lock" "s1-commit" "s7-get-progress" "enable-deferred-drop" +permutation "s5-acquire-advisory-lock" "s1-rebalance-c1-online" "s7-get-progress" "s5-release-advisory-lock" "s1-commit" "s7-get-progress" +permutation "s6-acquire-advisory-lock" "s1-rebalance-c1-online" "s7-get-progress" "s6-release-advisory-lock" "s1-commit" "s7-get-progress" // blocking shard move -permutation "s2-lock-1-start" "s1-shard-move-c1-block-writes" "s7-get-progress" "s2-unlock-1-start" "s1-commit" "s7-get-progress" "enable-deferred-drop" -permutation "s7-grab-lock" "s1-shard-move-c1-block-writes" "s7-get-progress" "s7-release-lock" "s1-commit" 
"s7-get-progress" "enable-deferred-drop" +permutation "s2-lock-1-start" "s1-shard-move-c1-block-writes" "s7-get-progress" "s2-unlock-1-start" "s1-commit" "s7-get-progress" +permutation "s6-acquire-advisory-lock" "s1-shard-move-c1-block-writes" "s7-get-progress" "s6-release-advisory-lock" "s1-commit" "s7-get-progress" // blocking shard copy permutation "s2-lock-1-start" "s1-shard-copy-c1-block-writes" "s7-get-progress" "s2-unlock-1-start" "s1-commit" +permutation "s6-acquire-advisory-lock" "s1-shard-copy-c1-block-writes" "s7-get-progress" "s6-release-advisory-lock" "s1-commit" "s7-get-progress" // online shard move -permutation "s6-acquire-advisory-lock" "s1-shard-move-c1-online" "s7-get-progress" "s6-release-advisory-lock" "s1-commit" "s7-get-progress" "enable-deferred-drop" -// Commented out due to flakyness -// permutation "s7-grab-lock" "s1-shard-move-c1-online" "s7-get-progress" "s7-release-lock" "s1-commit" "s7-get-progress" "enable-deferred-drop" +permutation "s5-acquire-advisory-lock" "s1-shard-move-c1-online" "s7-get-progress" "s5-release-advisory-lock" "s1-commit" "s7-get-progress" +permutation "s6-acquire-advisory-lock" "s1-shard-move-c1-online" "s7-get-progress" "s6-release-advisory-lock" "s1-commit" "s7-get-progress" // online shard copy -permutation "s6-acquire-advisory-lock" "s1-shard-copy-c1-online" "s7-get-progress" "s6-release-advisory-lock" "s1-commit" +permutation "s5-acquire-advisory-lock" "s1-shard-copy-c1-online" "s7-get-progress" "s5-release-advisory-lock" "s1-commit" +permutation "s6-acquire-advisory-lock" "s1-shard-copy-c1-online" "s7-get-progress" "s6-release-advisory-lock" "s1-commit" "s7-get-progress" // parallel blocking shard move -permutation "s2-lock-1-start" "s1-shard-move-c1-block-writes" "s4-shard-move-sep-block-writes"("s1-shard-move-c1-block-writes") "s7-get-progress" "s2-unlock-1-start" "s1-commit" "s4-commit" "s7-get-progress" "enable-deferred-drop" -permutation "s7-grab-lock" "s1-shard-move-c1-block-writes" 
"s4-shard-move-sep-block-writes"("s1-shard-move-c1-block-writes") "s7-get-progress" "s7-release-lock" "s1-commit" "s4-commit" "s7-get-progress" "enable-deferred-drop" +permutation "s2-lock-1-start" "s1-shard-move-c1-block-writes" "s4-shard-move-sep-block-writes-without-advisory-locks"("s1-shard-move-c1-block-writes") "s7-get-progress" "s2-unlock-1-start" "s1-commit" "s4-commit" "s7-get-progress" +permutation "s6-acquire-advisory-lock" "s1-shard-move-c1-block-writes" "s4-shard-move-sep-block-writes"("s1-shard-move-c1-block-writes") "s7-get-progress" "s6-release-advisory-lock" "s1-commit" "s4-commit" "s7-get-progress"