Increase shard move test coverage by improving advisory locks (#6429)

To be able to test non-blocking shard moves, we take an advisory lock so
we can pause the shard move at an interesting moment. Originally this
was during the logical replication catch-up phase. But when I added
tests for the rebalancer progress, I moved this lock to before the
initial data copy. That allowed testing of the rebalance progress, but
it inadvertently meant our non-blocking tests no longer checked whether
we held unintended locks during logical replication catch-up.

This fixes that by creating two types of advisory locks, one taken
before the copy and one after. This causes the tests to actually test
their intended scenario again.
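
For illustration, this is roughly how an isolation test session pauses
a transfer at each phase (a sketch; the key values come from the
isolation spec further down in this diff, and the locks only have an
effect when RunningUnderIsolationTest is set):

    -- Pause the transfer before the initial data copy: conflicts with
    -- the ShareLock taken by ConflictWithIsolationTestingBeforeCopy.
    SELECT pg_advisory_lock(55152, 44000);

    -- Pause the transfer after the copy (for non-blocking moves, during
    -- logical replication catch-up): conflicts with the ShareLock taken
    -- by ConflictWithIsolationTestingAfterCopy.
    SELECT pg_advisory_lock(44000, 55152);

    -- Let a paused transfer continue.
    SELECT pg_advisory_unlock(55152, 44000);
    SELECT pg_advisory_unlock(44000, 55152);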

Furthermore, it starts using one of these locks for blocking shard
moves too, which allowed me to reduce the complexity of the rebalance
progress test suite quite a bit. It also allowed re-enabling some
previously flaky tests, because the lock removes the source of their
flakiness. And finally, it allowed testing rebalance progress for
blocking shard copy operations as well.

In passing, it fixes a flaky test covering parallel blocking shard
moves by ordering the output.
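
As a minimal illustration of that ordering fix (the actual test query
in the diff below selects many more columns):

    -- Two concurrent moves report progress through separate monitors, so
    -- get_rebalance_progress() returns their rows in no defined order.
    SELECT table_name, shardid, sourcename, sourceport
    FROM get_rebalance_progress()
    ORDER BY 1, 2, 3, 4;  -- makes the expected output deterministic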
Jelte Fennema 2022-10-17 17:32:28 +02:00 committed by GitHub
parent 96912d9ba1
commit 60eb67b908
6 changed files with 696 additions and 373 deletions


@@ -610,12 +610,11 @@ BlockingShardSplit(SplitOperation splitOperation,
/* For Blocking split, copy isn't snapshotted */
char *snapshotName = NULL;
ConflictWithIsolationTestingBeforeCopy();
DoSplitCopy(sourceShardNode, sourceColocatedShardIntervalList,
shardGroupSplitIntervalListList, workersForPlacementList,
snapshotName, distributionColumnOverrides);
/* Used for testing */
ConflictOnlyWithIsolationTesting();
ConflictWithIsolationTestingAfterCopy();
ereport(LOG, (errmsg(
"creating auxillary structures (indexes, stats, replicaindentities, triggers) for %s",


@@ -1289,7 +1289,9 @@ CopyShardTablesViaBlockWrites(List *shardIntervalList, char *sourceNodeName,
sourceNodePort,
PLACEMENT_UPDATE_STATUS_COPYING_DATA);
ConflictWithIsolationTestingBeforeCopy();
CopyShardsToNode(sourceNode, targetNode, shardIntervalList, NULL);
ConflictWithIsolationTestingAfterCopy();
UpdatePlacementUpdateStatusForShardIntervalList(
shardIntervalList,


@@ -236,7 +236,7 @@ LogicallyReplicateShards(List *shardList, char *sourceNodeName, int sourceNodePo
logicalRepTargetList);
/* only useful for isolation testing, see the function comment for the details */
ConflictOnlyWithIsolationTesting();
ConflictWithIsolationTestingBeforeCopy();
/*
* We have to create the primary key (or any other replica identity)
@@ -427,7 +427,7 @@ CompleteNonBlockingShardTransfer(List *shardList,
/* only useful for isolation testing, see the function comment for the details */
ConflictOnlyWithIsolationTesting();
ConflictWithIsolationTestingAfterCopy();
/*
* We're almost done, we'll block the writes to the shards that we're
@@ -1269,18 +1269,16 @@ CreateUncheckedForeignKeyConstraints(List *logicalRepTargetList)
/*
* ConflictOnlyWithIsolationTesting is only useful for testing and should
* not be called by any code-path except for LogicallyReplicateShards().
*
* Since logically replicating shards does eventually block modifications,
* it becomes tricky to use isolation tester to show concurrent behaviour
* of online shard rebalancing and modification queries.
* ConflictWithIsolationTestingBeforeCopy is only useful to test
* get_rebalance_progress by pausing right before the actual copy. This way we
* can see the state of the tables at that point. It should not be called by
* any code path except the ones that move and split shards.
*
* Note that since the cost of calling this function is pretty low, we prefer
* to use it in non-assert builds as well, so the behaviour does not diverge.
*/
extern void
ConflictOnlyWithIsolationTesting()
ConflictWithIsolationTestingBeforeCopy(void)
{
LOCKTAG tag;
const bool sessionLock = false;
@@ -1288,11 +1286,46 @@ ConflictOnlyWithIsolationTesting()
if (RunningUnderIsolationTest)
{
/* we've picked random keys */
SET_LOCKTAG_ADVISORY(tag, MyDatabaseId, SHARD_MOVE_ADVISORY_LOCK_FIRST_KEY,
SET_LOCKTAG_ADVISORY(tag, MyDatabaseId,
SHARD_MOVE_ADVISORY_LOCK_SECOND_KEY,
SHARD_MOVE_ADVISORY_LOCK_FIRST_KEY, 2);
/* uses ShareLock so concurrent moves don't conflict with each other */
(void) LockAcquire(&tag, ShareLock, sessionLock, dontWait);
}
}
/*
* ConflictWithIsolationTestingAfterCopy is only useful for two types of tests:
* 1. Testing the output of get_rebalance_progress after the copy is completed,
*    but before the move is completely finished, because finishing the move
*    clears the contents of get_rebalance_progress.
* 2. Testing that our non-blocking shard moves/splits actually don't block
*    writes. Since logically replicating shards does eventually block
*    modifications, it becomes tricky to use the isolation tester to show
*    the concurrent behaviour of online shard rebalancing and modification
*    queries. So, during logical replication we call this function at the
*    end of the catchup phase, right before blocking writes.
*
* Note that since the cost of calling this function is pretty low, we prefer
* to use it in non-assert builds as well, so the behaviour does not diverge.
*/
extern void
ConflictWithIsolationTestingAfterCopy(void)
{
LOCKTAG tag;
const bool sessionLock = false;
const bool dontWait = false;
if (RunningUnderIsolationTest)
{
SET_LOCKTAG_ADVISORY(tag, MyDatabaseId,
SHARD_MOVE_ADVISORY_LOCK_FIRST_KEY,
SHARD_MOVE_ADVISORY_LOCK_SECOND_KEY, 2);
(void) LockAcquire(&tag, ExclusiveLock, sessionLock, dontWait);
/* uses ShareLock so concurrent moves don't conflict with each other */
(void) LockAcquire(&tag, ShareLock, sessionLock, dontWait);
}
}
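
The mirrored key order is what lets a test pause exactly one phase. A
sketch of the interaction, assuming (as the isolation spec below
suggests) SHARD_MOVE_ADVISORY_LOCK_FIRST_KEY is 44000 and
SHARD_MOVE_ADVISORY_LOCK_SECOND_KEY is 55152:

    -- Test session: take an ExclusiveLock on the "after copy" tag only.
    SELECT pg_advisory_lock(44000, 55152);

    -- Transfer session: the ShareLock on (55152, 44000) taken by
    -- ConflictWithIsolationTestingBeforeCopy uses a different tag, so
    -- the copy starts immediately. The ShareLock on (44000, 55152) taken
    -- by ConflictWithIsolationTestingAfterCopy then blocks until the
    -- test session releases its lock:
    SELECT pg_advisory_unlock(44000, 55152);

    -- Concurrent transfers all take ShareLock, which is self-compatible,
    -- so they never pause each other.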


@@ -131,7 +131,8 @@ extern void LogicallyReplicateShards(List *shardList, char *sourceNodeName,
int sourceNodePort, char *targetNodeName,
int targetNodePort);
extern void ConflictOnlyWithIsolationTesting(void);
extern void ConflictWithIsolationTestingBeforeCopy(void);
extern void ConflictWithIsolationTestingAfterCopy(void);
extern void CreateReplicaIdentities(List *subscriptionInfoList);
extern void CreateReplicaIdentitiesOnNode(List *shardList,
char *nodeName,


@@ -1,13 +1,5 @@
setup
{
-- We disable deferred drop, so we can easily trigger blocking the shard
-- move at the end of the move. This is done in a separate setup step,
-- because this cannot run in a transaction.
ALTER SYSTEM SET citus.defer_drop_after_shard_move TO OFF;
}
setup
{
SELECT pg_reload_conf();
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1500001;
SET citus.shard_count TO 4;
SET citus.shard_replication_factor TO 1;
@@ -30,7 +22,6 @@ setup
teardown
{
SELECT pg_reload_conf();
DROP TABLE colocated2;
DROP TABLE colocated1;
DROP TABLE separate;
@@ -40,62 +31,37 @@ session "s1"
step "s1-rebalance-c1-block-writes"
{
BEGIN;
SELECT * FROM get_rebalance_table_shards_plan('colocated1');
SELECT rebalance_table_shards('colocated1', shard_transfer_mode:='block_writes');
}
step "s1-rebalance-c1-online"
{
BEGIN;
SELECT * FROM get_rebalance_table_shards_plan('colocated1');
SELECT rebalance_table_shards('colocated1', shard_transfer_mode:='force_logical');
}
step "s1-shard-move-c1-block-writes"
{
BEGIN;
SELECT citus_move_shard_placement(1500001, 'localhost', 57637, 'localhost', 57638, shard_transfer_mode:='block_writes');
}
step "s1-shard-copy-c1-block-writes"
{
BEGIN;
UPDATE pg_dist_partition SET repmodel = 'c' WHERE logicalrelid IN ('colocated1', 'colocated2');
SELECT citus_copy_shard_placement(1500001, 'localhost', 57637, 'localhost', 57638, transfer_mode:='block_writes');
}
step "s1-shard-move-c1-online"
{
BEGIN;
SELECT citus_move_shard_placement(1500001, 'localhost', 57637, 'localhost', 57638, shard_transfer_mode:='force_logical');
}
step "s1-shard-copy-c1-online"
{
BEGIN;
UPDATE pg_dist_partition SET repmodel = 'c' WHERE logicalrelid IN ('colocated1', 'colocated2');
SELECT citus_copy_shard_placement(1500001, 'localhost', 57637, 'localhost', 57638, transfer_mode:='force_logical');
}
step "s1-commit"
{
COMMIT;
}
session "s2"
step "s2-lock-1-start"
{
BEGIN;
DELETE FROM colocated1 WHERE test_id = 1;
DELETE FROM separate WHERE test_id = 1;
}
step "s2-unlock-1-start"
{
ROLLBACK;
}
step "s1-wait" {}
session "s3"
@@ -114,21 +80,33 @@ session "s4"
step "s4-shard-move-sep-block-writes"
{
BEGIN;
SELECT citus_move_shard_placement(1500009, 'localhost', 57637, 'localhost', 57638, shard_transfer_mode:='block_writes');
}
step "s4-commit"
step "s4-wait" {}
session "s5"
// This advisory lock with (almost) random values is taken before any
// data is copied. For details, check the source code of
// ConflictWithIsolationTestingBeforeCopy.
step "s5-acquire-advisory-lock-before-copy"
{
COMMIT;
SELECT pg_advisory_lock(55152, 44000);
}
step "s5-release-advisory-lock"
{
SELECT pg_advisory_unlock(55152, 44000);
}
session "s6"
// this advisory lock with (almost) random values are only used
// for testing purposes. For details, check Citus' logical replication
// source code
step "s6-acquire-advisory-lock"
// This advisory lock with (almost) random values is taken after all
// data is copied. For details, check the source code of
// ConflictWithIsolationTestingAfterCopy.
step "s6-acquire-advisory-lock-after-copy"
{
SELECT pg_advisory_lock(44000, 55152);
}
@@ -141,22 +119,6 @@ step "s6-release-advisory-lock"
session "s7"
// get_rebalance_progress internally calls pg_total_relation_size on all the
// shards. This means that it takes AccessShareLock on those shards. Because we
// run with deferred drop, that means get_rebalance_progress actually waits
// for the shard move to complete the drop. But we want to get the progress
// before the shards are dropped. So we grab the locks first with a simple
// query that reads from all shards. We force using a single connection because
// get_rebalance_progress isn't smart enough to reuse the right connection for
// the right shards and will simply use a single one for all of them.
step "s7-grab-lock"
{
BEGIN;
SET LOCAL citus.max_adaptive_executor_pool_size = 1;
SELECT 1 FROM colocated1 LIMIT 1;
SELECT 1 FROM separate LIMIT 1;
}
step "s7-get-progress"
{
set LOCAL client_min_messages=NOTICE;
@@ -180,46 +142,57 @@ step "s7-get-progress"
FROM get_rebalance_progress();
}
step "s7-release-lock"
// When getting progress from multiple monitors at the same time, the tuples
// can come back in a random order, because there's no defined order of the
// monitors. In those cases we need to order the output for consistent results.
step "s7-get-progress-ordered"
{
COMMIT;
set LOCAL client_min_messages=NOTICE;
WITH possible_sizes(size) as (VALUES (0), (8000), (50000), (200000), (400000))
SELECT
table_name,
shardid,
( SELECT size FROM possible_sizes WHERE ABS(size - shard_size) = (SELECT MIN(ABS(size - shard_size)) FROM possible_sizes )) shard_size,
sourcename,
sourceport,
( SELECT size FROM possible_sizes WHERE ABS(size - source_shard_size) = (SELECT MIN(ABS(size - source_shard_size)) FROM possible_sizes )) source_shard_size,
targetname,
targetport,
( SELECT size FROM possible_sizes WHERE ABS(size - target_shard_size) = (SELECT MIN(ABS(size - target_shard_size)) FROM possible_sizes )) target_shard_size,
progress,
operation_type,
source_lsn >= target_lsn as lsn_sanity_check,
source_lsn > '0/0' as source_lsn_available,
target_lsn > '0/0' as target_lsn_available
FROM get_rebalance_progress()
ORDER BY 1, 2, 3, 4, 5;
}
session "s8"
// After running these tests we want to enable deferred-drop again. Sadly
// the isolation tester framework does not support multiple teardown steps
// and this cannot be run in a transaction. So we need to do it manually at
// the end of the last test.
step "enable-deferred-drop"
{
ALTER SYSTEM RESET citus.defer_drop_after_shard_move;
}
// blocking rebalancer does what it should
permutation "s2-lock-1-start" "s1-rebalance-c1-block-writes" "s7-get-progress" "s2-unlock-1-start" "s1-commit" "s7-get-progress" "enable-deferred-drop"
permutation "s3-lock-2-start" "s1-rebalance-c1-block-writes" "s7-get-progress" "s3-unlock-2-start" "s1-commit" "s7-get-progress" "enable-deferred-drop"
permutation "s7-grab-lock" "s1-rebalance-c1-block-writes" "s7-get-progress" "s7-release-lock" "s1-commit" "s7-get-progress" "enable-deferred-drop"
permutation "s5-acquire-advisory-lock-before-copy" "s1-rebalance-c1-block-writes" "s7-get-progress" "s5-release-advisory-lock" "s1-wait" "s7-get-progress"
permutation "s3-lock-2-start" "s1-rebalance-c1-block-writes" "s7-get-progress" "s3-unlock-2-start" "s1-wait" "s7-get-progress"
permutation "s6-acquire-advisory-lock-after-copy" "s1-rebalance-c1-block-writes" "s7-get-progress" "s6-release-advisory-lock" "s1-wait" "s7-get-progress"
// online rebalancer
permutation "s6-acquire-advisory-lock" "s1-rebalance-c1-online" "s7-get-progress" "s6-release-advisory-lock" "s1-commit" "s7-get-progress" "enable-deferred-drop"
// Commented out due to flakiness
// permutation "s7-grab-lock" "s1-rebalance-c1-online" "s7-get-progress" "s7-release-lock" "s1-commit" "s7-get-progress" "enable-deferred-drop"
permutation "s5-acquire-advisory-lock-before-copy" "s1-rebalance-c1-online" "s7-get-progress" "s5-release-advisory-lock" "s1-wait" "s7-get-progress"
permutation "s6-acquire-advisory-lock-after-copy" "s1-rebalance-c1-online" "s7-get-progress" "s6-release-advisory-lock" "s1-wait" "s7-get-progress"
// blocking shard move
permutation "s2-lock-1-start" "s1-shard-move-c1-block-writes" "s7-get-progress" "s2-unlock-1-start" "s1-commit" "s7-get-progress" "enable-deferred-drop"
permutation "s7-grab-lock" "s1-shard-move-c1-block-writes" "s7-get-progress" "s7-release-lock" "s1-commit" "s7-get-progress" "enable-deferred-drop"
permutation "s5-acquire-advisory-lock-before-copy" "s1-shard-move-c1-block-writes" "s7-get-progress" "s5-release-advisory-lock" "s1-wait" "s7-get-progress"
permutation "s6-acquire-advisory-lock-after-copy" "s1-shard-move-c1-block-writes" "s7-get-progress" "s6-release-advisory-lock" "s1-wait" "s7-get-progress"
// blocking shard copy
permutation "s2-lock-1-start" "s1-shard-copy-c1-block-writes" "s7-get-progress" "s2-unlock-1-start" "s1-commit"
permutation "s5-acquire-advisory-lock-before-copy" "s1-shard-copy-c1-block-writes" "s7-get-progress" "s5-release-advisory-lock" "s1-wait" "s7-get-progress"
permutation "s6-acquire-advisory-lock-after-copy" "s1-shard-copy-c1-block-writes" "s7-get-progress" "s6-release-advisory-lock" "s1-wait" "s7-get-progress"
// online shard move
permutation "s6-acquire-advisory-lock" "s1-shard-move-c1-online" "s7-get-progress" "s6-release-advisory-lock" "s1-commit" "s7-get-progress" "enable-deferred-drop"
// Commented out due to flakiness
// permutation "s7-grab-lock" "s1-shard-move-c1-online" "s7-get-progress" "s7-release-lock" "s1-commit" "s7-get-progress" "enable-deferred-drop"
permutation "s5-acquire-advisory-lock-before-copy" "s1-shard-move-c1-online" "s7-get-progress" "s5-release-advisory-lock" "s1-wait" "s7-get-progress"
permutation "s6-acquire-advisory-lock-after-copy" "s1-shard-move-c1-online" "s7-get-progress" "s6-release-advisory-lock" "s1-wait" "s7-get-progress"
// online shard copy
permutation "s6-acquire-advisory-lock" "s1-shard-copy-c1-online" "s7-get-progress" "s6-release-advisory-lock" "s1-commit"
permutation "s5-acquire-advisory-lock-before-copy" "s1-shard-copy-c1-online" "s7-get-progress" "s5-release-advisory-lock" "s1-wait" "s7-get-progress"
permutation "s6-acquire-advisory-lock-after-copy" "s1-shard-copy-c1-online" "s7-get-progress" "s6-release-advisory-lock" "s1-wait" "s7-get-progress"
// parallel blocking shard move
permutation "s2-lock-1-start" "s1-shard-move-c1-block-writes" "s4-shard-move-sep-block-writes"("s1-shard-move-c1-block-writes") "s7-get-progress" "s2-unlock-1-start" "s1-commit" "s4-commit" "s7-get-progress" "enable-deferred-drop"
permutation "s7-grab-lock" "s1-shard-move-c1-block-writes" "s4-shard-move-sep-block-writes"("s1-shard-move-c1-block-writes") "s7-get-progress" "s7-release-lock" "s1-commit" "s4-commit" "s7-get-progress" "enable-deferred-drop"
permutation "s5-acquire-advisory-lock-before-copy" "s1-shard-move-c1-block-writes" "s4-shard-move-sep-block-writes"("s1-shard-move-c1-block-writes") "s7-get-progress-ordered" "s5-release-advisory-lock" "s1-wait" "s4-wait" "s7-get-progress-ordered"
permutation "s6-acquire-advisory-lock-after-copy" "s1-shard-move-c1-block-writes" "s4-shard-move-sep-block-writes"("s1-shard-move-c1-block-writes") "s7-get-progress-ordered" "s6-release-advisory-lock" "s1-wait" "s4-wait" "s7-get-progress-ordered"