diff --git a/src/backend/distributed/operations/shard_split.c b/src/backend/distributed/operations/shard_split.c index 84ee97cb6..456e95eff 100644 --- a/src/backend/distributed/operations/shard_split.c +++ b/src/backend/distributed/operations/shard_split.c @@ -615,7 +615,7 @@ BlockingShardSplit(SplitOperation splitOperation, snapshotName, distributionColumnOverrides); /* Used for testing */ - ConflictOnlyWithIsolationTesting(); + ConflictOnlyWithIsolationTesting(false); ereport(LOG, (errmsg( "creating auxillary structures (indexes, stats, replicaindentities, triggers) for %s", diff --git a/src/backend/distributed/operations/shard_transfer.c b/src/backend/distributed/operations/shard_transfer.c index cdb61731f..9e4f44d55 100644 --- a/src/backend/distributed/operations/shard_transfer.c +++ b/src/backend/distributed/operations/shard_transfer.c @@ -1258,6 +1258,7 @@ CopyShardTablesViaBlockWrites(List *shardIntervalList, char *sourceNodeName, } CopyShardsToNode(sourceNode, targetNode, shardIntervalList, NULL); + ConflictOnlyWithIsolationTesting(false); foreach_ptr(shardInterval, shardIntervalList) { diff --git a/src/backend/distributed/replication/multi_logical_replication.c b/src/backend/distributed/replication/multi_logical_replication.c index 850daca24..a20d7d4e0 100644 --- a/src/backend/distributed/replication/multi_logical_replication.c +++ b/src/backend/distributed/replication/multi_logical_replication.c @@ -236,7 +236,7 @@ LogicallyReplicateShards(List *shardList, char *sourceNodeName, int sourceNodePo logicalRepTargetList); /* only useful for isolation testing, see the function comment for the details */ - ConflictOnlyWithIsolationTesting(); + ConflictOnlyWithIsolationTesting(true); /* * We have to create the primary key (or any other replica identity) @@ -402,7 +402,7 @@ CompleteNonBlockingShardTransfer(List *shardList, /* only useful for isolation testing, see the function comment for the details */ - ConflictOnlyWithIsolationTesting(); + ConflictOnlyWithIsolationTesting(false); /* * We're almost done, we'll block the writes to the shards that we're @@ -1241,9 +1241,13 @@ CreateUncheckedForeignKeyConstraints(List *logicalRepTargetList) * * Note that since the cost of calling this function is pretty low, we prefer * to use it in non-assert builds as well not to diverge in the behaviour. + * + * Depending on its argument it takes a different lock. It can take either a + * lock that is taken before the data copy is started, or it takes a lock + * during logical replication. */ extern void -ConflictOnlyWithIsolationTesting() +ConflictOnlyWithIsolationTesting(bool beforeCopy) { LOCKTAG tag; const bool sessionLock = false; @@ -1252,8 +1256,18 @@ ConflictOnlyWithIsolationTesting() if (RunningUnderIsolationTest) { /* we've picked random keys */ - SET_LOCKTAG_ADVISORY(tag, MyDatabaseId, SHARD_MOVE_ADVISORY_LOCK_FIRST_KEY, - SHARD_MOVE_ADVISORY_LOCK_SECOND_KEY, 2); + if (beforeCopy) + { + SET_LOCKTAG_ADVISORY(tag, MyDatabaseId, + SHARD_MOVE_ADVISORY_LOCK_SECOND_KEY, + SHARD_MOVE_ADVISORY_LOCK_FIRST_KEY, 2); + } + else + { + SET_LOCKTAG_ADVISORY(tag, MyDatabaseId, + SHARD_MOVE_ADVISORY_LOCK_FIRST_KEY, + SHARD_MOVE_ADVISORY_LOCK_SECOND_KEY, 2); + } (void) LockAcquire(&tag, ExclusiveLock, sessionLock, dontWait); } diff --git a/src/include/distributed/multi_logical_replication.h b/src/include/distributed/multi_logical_replication.h index f042b4326..2cd787f10 100644 --- a/src/include/distributed/multi_logical_replication.h +++ b/src/include/distributed/multi_logical_replication.h @@ -131,7 +131,7 @@ extern void LogicallyReplicateShards(List *shardList, char *sourceNodeName, int sourceNodePort, char *targetNodeName, int targetNodePort); -extern void ConflictOnlyWithIsolationTesting(void); +extern void ConflictOnlyWithIsolationTesting(bool beforeCopy); extern void CreateReplicaIdentities(List *subscriptionInfoList); extern void CreateReplicaIdentitiesOnNode(List *shardList, char *nodeName, diff --git a/src/test/regress/expected/isolation_shard_rebalancer_progress.out b/src/test/regress/expected/isolation_shard_rebalancer_progress.out index 69d0a9f91..1d5aa062c 100644 --- a/src/test/regress/expected/isolation_shard_rebalancer_progress.out +++ b/src/test/regress/expected/isolation_shard_rebalancer_progress.out @@ -1,6 +1,6 @@ Parsed test spec with 7 sessions -starting permutation: s2-lock-1-start s1-rebalance-c1-block-writes s7-get-progress s2-unlock-1-start s1-commit s7-get-progress enable-deferred-drop +starting permutation: s2-lock-1-start s1-rebalance-c1-block-writes s7-get-progress s2-unlock-1-start s1-commit s7-get-progress master_set_node_property --------------------------------------------------------------------- @@ -88,11 +88,8 @@ table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname --------------------------------------------------------------------- (0 rows) -step enable-deferred-drop: - ALTER SYSTEM RESET citus.defer_drop_after_shard_move; - -starting permutation: s3-lock-2-start s1-rebalance-c1-block-writes s7-get-progress s3-unlock-2-start s1-commit s7-get-progress enable-deferred-drop +starting permutation: s3-lock-2-start s1-rebalance-c1-block-writes s7-get-progress s3-unlock-2-start s1-commit s7-get-progress master_set_node_property --------------------------------------------------------------------- @@ -129,8 +126,8 @@ step s7-get-progress: table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type|lsn_sanity_check|source_lsn_available|target_lsn_available --------------------------------------------------------------------- -colocated1|1500001| 50000|localhost | 57637| 0|localhost | 57638| 50000| 2|move |t |t |f -colocated2|1500005| 400000|localhost | 57637| 0|localhost | 57638| 400000| 2|move |t |t |f +colocated1|1500001| 50000|localhost | 57637| 50000|localhost | 57638| 50000| 2|move |t |t |f +colocated2|1500005| 400000|localhost | 57637| 400000|localhost | 57638| 400000| 2|move |t |t |f colocated1|1500002| 200000|localhost | 57637| 200000|localhost | 57638| 0| 1|move |t |t |f colocated2|1500006| 8000|localhost | 57637| 8000|localhost | 57638| 0| 1|move |t |t |f (4 rows) @@ -179,30 +176,19 @@ table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname --------------------------------------------------------------------- (0 rows) -step enable-deferred-drop: - ALTER SYSTEM RESET citus.defer_drop_after_shard_move; - -starting permutation: s7-grab-lock s1-rebalance-c1-block-writes s7-get-progress s7-release-lock s1-commit s7-get-progress enable-deferred-drop +starting permutation: s6-acquire-advisory-lock s1-rebalance-c1-block-writes s7-get-progress s6-release-advisory-lock s1-commit s7-get-progress master_set_node_property --------------------------------------------------------------------- (1 row) -step s7-grab-lock: - BEGIN; - SET LOCAL citus.max_adaptive_executor_pool_size = 1; - SELECT 1 FROM colocated1 LIMIT 1; - SELECT 1 FROM separate LIMIT 1; +step s6-acquire-advisory-lock: + SELECT pg_advisory_lock(44000, 55152); -?column? +pg_advisory_lock --------------------------------------------------------------------- - 1 -(1 row) -?column? ---------------------------------------------------------------------- - 1 (1 row) step s1-rebalance-c1-block-writes: @@ -238,8 +224,13 @@ colocated1|1500002| 200000|localhost | 57637| 200000|localhost colocated2|1500006| 8000|localhost | 57637| 8000|localhost | 57638| 0| 0|move |t |t |f (4 rows) -step s7-release-lock: - COMMIT; +step s6-release-advisory-lock: + SELECT pg_advisory_unlock(44000, 55152); + +pg_advisory_unlock +--------------------------------------------------------------------- +t +(1 row) step s1-rebalance-c1-block-writes: <... completed> table_name|shardid|shard_size|sourcename|sourceport|targetname|targetport @@ -282,11 +273,105 @@ table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname --------------------------------------------------------------------- (0 rows) -step enable-deferred-drop: - ALTER SYSTEM RESET citus.defer_drop_after_shard_move; + +starting permutation: s5-acquire-advisory-lock s1-rebalance-c1-online s7-get-progress s5-release-advisory-lock s1-commit s7-get-progress +master_set_node_property +--------------------------------------------------------------------- + +(1 row) + +step s5-acquire-advisory-lock: + SELECT pg_advisory_lock(55152, 44000); + +pg_advisory_lock +--------------------------------------------------------------------- + +(1 row) + +step s1-rebalance-c1-online: + BEGIN; + SELECT * FROM get_rebalance_table_shards_plan('colocated1'); + SELECT rebalance_table_shards('colocated1', shard_transfer_mode:='force_logical'); + +step s7-get-progress: + set LOCAL client_min_messages=NOTICE; + WITH possible_sizes(size) as (VALUES (0), (8000), (50000), (200000), (400000)) + SELECT + table_name, + shardid, + ( SELECT size FROM possible_sizes WHERE ABS(size - shard_size) = (SELECT MIN(ABS(size - shard_size)) FROM possible_sizes )) shard_size, + sourcename, + sourceport, + ( SELECT size FROM possible_sizes WHERE ABS(size - source_shard_size) = (SELECT MIN(ABS(size - source_shard_size)) FROM possible_sizes )) source_shard_size, + targetname, + targetport, + ( SELECT size FROM possible_sizes WHERE ABS(size - target_shard_size) = (SELECT MIN(ABS(size - target_shard_size)) FROM possible_sizes )) target_shard_size, + progress, + operation_type, + source_lsn >= target_lsn as lsn_sanity_check, + source_lsn > '0/0' as source_lsn_available, + target_lsn > '0/0' as target_lsn_available + FROM get_rebalance_progress(); + +table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type|lsn_sanity_check|source_lsn_available|target_lsn_available +--------------------------------------------------------------------- +colocated1|1500001| 50000|localhost | 57637| 50000|localhost | 57638| 8000| 1|move |t |t |f +colocated2|1500005| 400000|localhost | 57637| 400000|localhost | 57638| 8000| 1|move |t |t |f +colocated1|1500002| 200000|localhost | 57637| 200000|localhost | 57638| 0| 0|move |t |t |f +colocated2|1500006| 8000|localhost | 57637| 8000|localhost | 57638| 0| 0|move |t |t |f +(4 rows) + +step s5-release-advisory-lock: + SELECT pg_advisory_unlock(55152, 44000); + +pg_advisory_unlock +--------------------------------------------------------------------- +t +(1 row) + +step s1-rebalance-c1-online: <... completed> +table_name|shardid|shard_size|sourcename|sourceport|targetname|targetport +--------------------------------------------------------------------- +colocated1|1500001| 0|localhost | 57637|localhost | 57638 +colocated2|1500005| 0|localhost | 57637|localhost | 57638 +colocated1|1500002| 0|localhost | 57637|localhost | 57638 +colocated2|1500006| 0|localhost | 57637|localhost | 57638 +(4 rows) + +rebalance_table_shards +--------------------------------------------------------------------- + +(1 row) + +step s1-commit: + COMMIT; + +step s7-get-progress: + set LOCAL client_min_messages=NOTICE; + WITH possible_sizes(size) as (VALUES (0), (8000), (50000), (200000), (400000)) + SELECT + table_name, + shardid, + ( SELECT size FROM possible_sizes WHERE ABS(size - shard_size) = (SELECT MIN(ABS(size - shard_size)) FROM possible_sizes )) shard_size, + sourcename, + sourceport, + ( SELECT size FROM possible_sizes WHERE ABS(size - source_shard_size) = (SELECT MIN(ABS(size - source_shard_size)) FROM possible_sizes )) source_shard_size, + targetname, + targetport, + ( SELECT size FROM possible_sizes WHERE ABS(size - target_shard_size) = (SELECT MIN(ABS(size - target_shard_size)) FROM possible_sizes )) target_shard_size, + progress, + operation_type, + source_lsn >= target_lsn as lsn_sanity_check, + source_lsn > '0/0' as source_lsn_available, + target_lsn > '0/0' as target_lsn_available + FROM get_rebalance_progress(); + +table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type|lsn_sanity_check|source_lsn_available|target_lsn_available +--------------------------------------------------------------------- +(0 rows) -starting permutation: s6-acquire-advisory-lock s1-rebalance-c1-online s7-get-progress s6-release-advisory-lock s1-commit s7-get-progress enable-deferred-drop +starting permutation: s6-acquire-advisory-lock s1-rebalance-c1-online s7-get-progress s6-release-advisory-lock s1-commit s7-get-progress master_set_node_property --------------------------------------------------------------------- @@ -327,8 +412,8 @@ step s7-get-progress: table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type|lsn_sanity_check|source_lsn_available|target_lsn_available --------------------------------------------------------------------- -colocated1|1500001| 50000|localhost | 57637| 50000|localhost | 57638| 8000| 1|move |t |t |f -colocated2|1500005| 400000|localhost | 57637| 400000|localhost | 57638| 8000| 1|move |t |t |f +colocated1|1500001| 50000|localhost | 57637| 50000|localhost | 57638| 50000| 1|move |t |t |t +colocated2|1500005| 400000|localhost | 57637| 400000|localhost | 57638| 400000| 1|move |t |t |t colocated1|1500002| 200000|localhost | 57637| 200000|localhost | 57638| 0| 0|move |t |t |f colocated2|1500006| 8000|localhost | 57637| 8000|localhost | 57638| 0| 0|move |t |t |f (4 rows) @@ -382,11 +467,8 @@ table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname --------------------------------------------------------------------- (0 rows) -step enable-deferred-drop: - ALTER SYSTEM RESET citus.defer_drop_after_shard_move; - -starting permutation: s2-lock-1-start s1-shard-move-c1-block-writes s7-get-progress s2-unlock-1-start s1-commit s7-get-progress enable-deferred-drop +starting permutation: s2-lock-1-start s1-shard-move-c1-block-writes s7-get-progress s2-unlock-1-start s1-commit s7-get-progress master_set_node_property --------------------------------------------------------------------- @@ -463,30 +545,19 @@ table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname --------------------------------------------------------------------- (0 rows) -step enable-deferred-drop: - ALTER SYSTEM RESET citus.defer_drop_after_shard_move; - -starting permutation: s7-grab-lock s1-shard-move-c1-block-writes s7-get-progress s7-release-lock s1-commit s7-get-progress enable-deferred-drop +starting permutation: s6-acquire-advisory-lock s1-shard-move-c1-block-writes s7-get-progress s6-release-advisory-lock s1-commit s7-get-progress master_set_node_property --------------------------------------------------------------------- (1 row) -step s7-grab-lock: - BEGIN; - SET LOCAL citus.max_adaptive_executor_pool_size = 1; - SELECT 1 FROM colocated1 LIMIT 1; - SELECT 1 FROM separate LIMIT 1; +step s6-acquire-advisory-lock: + SELECT pg_advisory_lock(44000, 55152); -?column? +pg_advisory_lock --------------------------------------------------------------------- - 1 -(1 row) -?column? ---------------------------------------------------------------------- - 1 (1 row) step s1-shard-move-c1-block-writes: @@ -519,8 +590,13 @@ colocated1|1500001| 50000|localhost | 57637| 50000|localhost colocated2|1500005| 400000|localhost | 57637| 400000|localhost | 57638| 400000| 1|move |t |t |f (2 rows) -step s7-release-lock: - COMMIT; +step s6-release-advisory-lock: + SELECT pg_advisory_unlock(44000, 55152); + +pg_advisory_unlock +--------------------------------------------------------------------- +t +(1 row) step s1-shard-move-c1-block-writes: <... completed> citus_move_shard_placement @@ -555,9 +631,6 @@ table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname --------------------------------------------------------------------- (0 rows) -step enable-deferred-drop: - ALTER SYSTEM RESET citus.defer_drop_after_shard_move; - starting permutation: s2-lock-1-start s1-shard-copy-c1-block-writes s7-get-progress s2-unlock-1-start s1-commit master_set_node_property @@ -614,7 +687,180 @@ step s1-commit: COMMIT; -starting permutation: s6-acquire-advisory-lock s1-shard-move-c1-online s7-get-progress s6-release-advisory-lock s1-commit s7-get-progress enable-deferred-drop +starting permutation: s6-acquire-advisory-lock s1-shard-copy-c1-block-writes s7-get-progress s6-release-advisory-lock s1-commit s7-get-progress +master_set_node_property +--------------------------------------------------------------------- + +(1 row) + +step s6-acquire-advisory-lock: + SELECT pg_advisory_lock(44000, 55152); + +pg_advisory_lock +--------------------------------------------------------------------- + +(1 row) + +step s1-shard-copy-c1-block-writes: + BEGIN; + UPDATE pg_dist_partition SET repmodel = 'c' WHERE logicalrelid IN ('colocated1', 'colocated2'); + SELECT citus_copy_shard_placement(1500001, 'localhost', 57637, 'localhost', 57638, transfer_mode:='block_writes'); + +step s7-get-progress: + set LOCAL client_min_messages=NOTICE; + WITH possible_sizes(size) as (VALUES (0), (8000), (50000), (200000), (400000)) + SELECT + table_name, + shardid, + ( SELECT size FROM possible_sizes WHERE ABS(size - shard_size) = (SELECT MIN(ABS(size - shard_size)) FROM possible_sizes )) shard_size, + sourcename, + sourceport, + ( SELECT size FROM possible_sizes WHERE ABS(size - source_shard_size) = (SELECT MIN(ABS(size - source_shard_size)) FROM possible_sizes )) source_shard_size, + targetname, + targetport, + ( SELECT size FROM possible_sizes WHERE ABS(size - target_shard_size) = (SELECT MIN(ABS(size - target_shard_size)) FROM possible_sizes )) target_shard_size, + progress, + operation_type, + source_lsn >= target_lsn as lsn_sanity_check, + source_lsn > '0/0' as source_lsn_available, + target_lsn > '0/0' as target_lsn_available + FROM get_rebalance_progress(); + +table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type|lsn_sanity_check|source_lsn_available|target_lsn_available +--------------------------------------------------------------------- +colocated1|1500001| 50000|localhost | 57637| 50000|localhost | 57638| 50000| 1|copy |t |t |f +colocated2|1500005| 400000|localhost | 57637| 400000|localhost | 57638| 400000| 1|copy |t |t |f +(2 rows) + +step s6-release-advisory-lock: + SELECT pg_advisory_unlock(44000, 55152); + +pg_advisory_unlock +--------------------------------------------------------------------- +t +(1 row) + +step s1-shard-copy-c1-block-writes: <... completed> +citus_copy_shard_placement +--------------------------------------------------------------------- + +(1 row) + +step s1-commit: + COMMIT; + +step s7-get-progress: + set LOCAL client_min_messages=NOTICE; + WITH possible_sizes(size) as (VALUES (0), (8000), (50000), (200000), (400000)) + SELECT + table_name, + shardid, + ( SELECT size FROM possible_sizes WHERE ABS(size - shard_size) = (SELECT MIN(ABS(size - shard_size)) FROM possible_sizes )) shard_size, + sourcename, + sourceport, + ( SELECT size FROM possible_sizes WHERE ABS(size - source_shard_size) = (SELECT MIN(ABS(size - source_shard_size)) FROM possible_sizes )) source_shard_size, + targetname, + targetport, + ( SELECT size FROM possible_sizes WHERE ABS(size - target_shard_size) = (SELECT MIN(ABS(size - target_shard_size)) FROM possible_sizes )) target_shard_size, + progress, + operation_type, + source_lsn >= target_lsn as lsn_sanity_check, + source_lsn > '0/0' as source_lsn_available, + target_lsn > '0/0' as target_lsn_available + FROM get_rebalance_progress(); + +table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type|lsn_sanity_check|source_lsn_available|target_lsn_available +--------------------------------------------------------------------- +(0 rows) + + +starting permutation: s5-acquire-advisory-lock s1-shard-move-c1-online s7-get-progress s5-release-advisory-lock s1-commit s7-get-progress +master_set_node_property +--------------------------------------------------------------------- + +(1 row) + +step s5-acquire-advisory-lock: + SELECT pg_advisory_lock(55152, 44000); + +pg_advisory_lock +--------------------------------------------------------------------- + +(1 row) + +step s1-shard-move-c1-online: + BEGIN; + SELECT citus_move_shard_placement(1500001, 'localhost', 57637, 'localhost', 57638, shard_transfer_mode:='force_logical'); + +step s7-get-progress: + set LOCAL client_min_messages=NOTICE; + WITH possible_sizes(size) as (VALUES (0), (8000), (50000), (200000), (400000)) + SELECT + table_name, + shardid, + ( SELECT size FROM possible_sizes WHERE ABS(size - shard_size) = (SELECT MIN(ABS(size - shard_size)) FROM possible_sizes )) shard_size, + sourcename, + sourceport, + ( SELECT size FROM possible_sizes WHERE ABS(size - source_shard_size) = (SELECT MIN(ABS(size - source_shard_size)) FROM possible_sizes )) source_shard_size, + targetname, + targetport, + ( SELECT size FROM possible_sizes WHERE ABS(size - target_shard_size) = (SELECT MIN(ABS(size - target_shard_size)) FROM possible_sizes )) target_shard_size, + progress, + operation_type, + source_lsn >= target_lsn as lsn_sanity_check, + source_lsn > '0/0' as source_lsn_available, + target_lsn > '0/0' as target_lsn_available + FROM get_rebalance_progress(); + +table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type|lsn_sanity_check|source_lsn_available|target_lsn_available +--------------------------------------------------------------------- +colocated1|1500001| 50000|localhost | 57637| 50000|localhost | 57638| 8000| 1|move |t |t |f +colocated2|1500005| 400000|localhost | 57637| 400000|localhost | 57638| 8000| 1|move |t |t |f +(2 rows) + +step s5-release-advisory-lock: + SELECT pg_advisory_unlock(55152, 44000); + +pg_advisory_unlock +--------------------------------------------------------------------- +t +(1 row) + +step s1-shard-move-c1-online: <... completed> +citus_move_shard_placement +--------------------------------------------------------------------- + +(1 row) + +step s1-commit: + COMMIT; + +step s7-get-progress: + set LOCAL client_min_messages=NOTICE; + WITH possible_sizes(size) as (VALUES (0), (8000), (50000), (200000), (400000)) + SELECT + table_name, + shardid, + ( SELECT size FROM possible_sizes WHERE ABS(size - shard_size) = (SELECT MIN(ABS(size - shard_size)) FROM possible_sizes )) shard_size, + sourcename, + sourceport, + ( SELECT size FROM possible_sizes WHERE ABS(size - source_shard_size) = (SELECT MIN(ABS(size - source_shard_size)) FROM possible_sizes )) source_shard_size, + targetname, + targetport, + ( SELECT size FROM possible_sizes WHERE ABS(size - target_shard_size) = (SELECT MIN(ABS(size - target_shard_size)) FROM possible_sizes )) target_shard_size, + progress, + operation_type, + source_lsn >= target_lsn as lsn_sanity_check, + source_lsn > '0/0' as source_lsn_available, + target_lsn > '0/0' as target_lsn_available + FROM get_rebalance_progress(); + +table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type|lsn_sanity_check|source_lsn_available|target_lsn_available +--------------------------------------------------------------------- +(0 rows) + + +starting permutation: s6-acquire-advisory-lock s1-shard-move-c1-online s7-get-progress s6-release-advisory-lock s1-commit s7-get-progress master_set_node_property --------------------------------------------------------------------- @@ -654,8 +900,8 @@ step s7-get-progress: table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type|lsn_sanity_check|source_lsn_available|target_lsn_available --------------------------------------------------------------------- -colocated1|1500001| 50000|localhost | 57637| 50000|localhost | 57638| 8000| 1|move |t |t |f -colocated2|1500005| 400000|localhost | 57637| 400000|localhost | 57638| 8000| 1|move |t |t |f +colocated1|1500001| 50000|localhost | 57637| 50000|localhost | 57638| 50000| 1|move |t |t |t +colocated2|1500005| 400000|localhost | 57637| 400000|localhost | 57638| 400000| 1|move |t |t |t (2 rows) step s6-release-advisory-lock: @@ -699,11 +945,71 @@ table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname --------------------------------------------------------------------- (0 rows) -step enable-deferred-drop: - ALTER SYSTEM RESET citus.defer_drop_after_shard_move; + +starting permutation: s5-acquire-advisory-lock s1-shard-copy-c1-online s7-get-progress s5-release-advisory-lock s1-commit +master_set_node_property +--------------------------------------------------------------------- + +(1 row) + +step s5-acquire-advisory-lock: + SELECT pg_advisory_lock(55152, 44000); + +pg_advisory_lock +--------------------------------------------------------------------- + +(1 row) + +step s1-shard-copy-c1-online: + BEGIN; + UPDATE pg_dist_partition SET repmodel = 'c' WHERE logicalrelid IN ('colocated1', 'colocated2'); + SELECT citus_copy_shard_placement(1500001, 'localhost', 57637, 'localhost', 57638, transfer_mode:='force_logical'); + +step s7-get-progress: + set LOCAL client_min_messages=NOTICE; + WITH possible_sizes(size) as (VALUES (0), (8000), (50000), (200000), (400000)) + SELECT + table_name, + shardid, + ( SELECT size FROM possible_sizes WHERE ABS(size - shard_size) = (SELECT MIN(ABS(size - shard_size)) FROM possible_sizes )) shard_size, + sourcename, + sourceport, + ( SELECT size FROM possible_sizes WHERE ABS(size - source_shard_size) = (SELECT MIN(ABS(size - source_shard_size)) FROM possible_sizes )) source_shard_size, + targetname, + targetport, + ( SELECT size FROM possible_sizes WHERE ABS(size - target_shard_size) = (SELECT MIN(ABS(size - target_shard_size)) FROM possible_sizes )) target_shard_size, + progress, + operation_type, + source_lsn >= target_lsn as lsn_sanity_check, + source_lsn > '0/0' as source_lsn_available, + target_lsn > '0/0' as target_lsn_available + FROM get_rebalance_progress(); + +table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type|lsn_sanity_check|source_lsn_available|target_lsn_available +--------------------------------------------------------------------- +colocated1|1500001| 50000|localhost | 57637| 50000|localhost | 57638| 8000| 1|copy |t |t |f +colocated2|1500005| 400000|localhost | 57637| 400000|localhost | 57638| 8000| 1|copy |t |t |f +(2 rows) + +step s5-release-advisory-lock: + SELECT pg_advisory_unlock(55152, 44000); + +pg_advisory_unlock +--------------------------------------------------------------------- +t +(1 row) + +step s1-shard-copy-c1-online: <... completed> +citus_copy_shard_placement +--------------------------------------------------------------------- + +(1 row) + +step s1-commit: + COMMIT; -starting permutation: s6-acquire-advisory-lock s1-shard-copy-c1-online s7-get-progress s6-release-advisory-lock s1-commit +starting permutation: s6-acquire-advisory-lock s1-shard-copy-c1-online s7-get-progress s6-release-advisory-lock s1-commit s7-get-progress master_set_node_property --------------------------------------------------------------------- @@ -744,8 +1050,8 @@ step s7-get-progress: table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type|lsn_sanity_check|source_lsn_available|target_lsn_available --------------------------------------------------------------------- -colocated1|1500001| 50000|localhost | 57637| 50000|localhost | 57638| 8000| 1|copy |t |t |f -colocated2|1500005| 400000|localhost | 57637| 400000|localhost | 57638| 8000| 1|copy |t |t |f +colocated1|1500001| 50000|localhost | 57637| 50000|localhost | 57638| 50000| 1|copy |t |t |t +colocated2|1500005| 400000|localhost | 57637| 400000|localhost | 57638| 400000| 1|copy |t |t |t (2 rows) step s6-release-advisory-lock: @@ -765,8 +1071,32 @@ citus_copy_shard_placement step s1-commit: COMMIT; +step s7-get-progress: + set LOCAL client_min_messages=NOTICE; + WITH possible_sizes(size) as (VALUES (0), (8000), (50000), (200000), (400000)) + SELECT + table_name, + shardid, + ( SELECT size FROM possible_sizes WHERE ABS(size - shard_size) = (SELECT MIN(ABS(size - shard_size)) FROM possible_sizes )) shard_size, + sourcename, + sourceport, + ( SELECT size FROM possible_sizes WHERE ABS(size - source_shard_size) = (SELECT MIN(ABS(size - source_shard_size)) FROM possible_sizes )) source_shard_size, + targetname, + targetport, + ( SELECT size FROM possible_sizes WHERE ABS(size - target_shard_size) = (SELECT MIN(ABS(size - target_shard_size)) FROM possible_sizes )) target_shard_size, + progress, + operation_type, + source_lsn >= target_lsn as lsn_sanity_check, + source_lsn > '0/0' as source_lsn_available, + target_lsn > '0/0' as target_lsn_available + FROM get_rebalance_progress(); -starting permutation: s2-lock-1-start s1-shard-move-c1-block-writes s4-shard-move-sep-block-writes s7-get-progress s2-unlock-1-start s1-commit s4-commit s7-get-progress enable-deferred-drop +table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type|lsn_sanity_check|source_lsn_available|target_lsn_available +--------------------------------------------------------------------- +(0 rows) + + +starting permutation: s2-lock-1-start s1-shard-move-c1-block-writes s4-shard-move-sep-block-writes-without-advisory-locks s7-get-progress s2-unlock-1-start s1-commit s4-commit s7-get-progress master_set_node_property --------------------------------------------------------------------- @@ -781,8 +1111,9 @@ step s1-shard-move-c1-block-writes: BEGIN; SELECT citus_move_shard_placement(1500001, 'localhost', 57637, 'localhost', 57638, shard_transfer_mode:='block_writes'); -step s4-shard-move-sep-block-writes: +step s4-shard-move-sep-block-writes-without-advisory-locks: BEGIN; + SET LOCAL citus.running_under_isolation_test = false; SELECT citus_move_shard_placement(1500009, 'localhost', 57637, 'localhost', 57638, shard_transfer_mode:='block_writes'); step s7-get-progress: @@ -821,7 +1152,7 @@ citus_move_shard_placement (1 row) -step s4-shard-move-sep-block-writes: <... completed> +step s4-shard-move-sep-block-writes-without-advisory-locks: <... completed> citus_move_shard_placement --------------------------------------------------------------------- @@ -857,30 +1188,19 @@ table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname --------------------------------------------------------------------- (0 rows) -step enable-deferred-drop: - ALTER SYSTEM RESET citus.defer_drop_after_shard_move; - -starting permutation: s7-grab-lock s1-shard-move-c1-block-writes s4-shard-move-sep-block-writes s7-get-progress s7-release-lock s1-commit s4-commit s7-get-progress enable-deferred-drop +starting permutation: s6-acquire-advisory-lock s1-shard-move-c1-block-writes s4-shard-move-sep-block-writes s7-get-progress s6-release-advisory-lock s1-commit s4-commit s7-get-progress master_set_node_property --------------------------------------------------------------------- (1 row) -step s7-grab-lock: - BEGIN; - SET LOCAL citus.max_adaptive_executor_pool_size = 1; - SELECT 1 FROM colocated1 LIMIT 1; - SELECT 1 FROM separate LIMIT 1; +step s6-acquire-advisory-lock: + SELECT pg_advisory_lock(44000, 55152); -?column? +pg_advisory_lock --------------------------------------------------------------------- - 1 -(1 row) -?column? ---------------------------------------------------------------------- - 1 (1 row) step s1-shard-move-c1-block-writes: @@ -918,8 +1238,13 @@ colocated2|1500005| 400000|localhost | 57637| 400000|localhost separate |1500009| 50000|localhost | 57637| 50000|localhost | 57638| 200000| 1|move |t |t |f (3 rows) -step s7-release-lock: - COMMIT; +step s6-release-advisory-lock: + SELECT pg_advisory_unlock(44000, 55152); + +pg_advisory_unlock +--------------------------------------------------------------------- +t +(1 row) step s1-shard-move-c1-block-writes: <... completed> citus_move_shard_placement @@ -927,15 +1252,15 @@ citus_move_shard_placement (1 row) +step s1-commit: + COMMIT; + step s4-shard-move-sep-block-writes: <... completed> citus_move_shard_placement --------------------------------------------------------------------- (1 row) -step s1-commit: - COMMIT; - step s4-commit: COMMIT; @@ -963,6 +1288,3 @@ table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname --------------------------------------------------------------------- (0 rows) -step enable-deferred-drop: - ALTER SYSTEM RESET citus.defer_drop_after_shard_move; - diff --git a/src/test/regress/spec/isolation_shard_rebalancer_progress.spec b/src/test/regress/spec/isolation_shard_rebalancer_progress.spec index b9d10d047..678e3081f 100644 --- a/src/test/regress/spec/isolation_shard_rebalancer_progress.spec +++ b/src/test/regress/spec/isolation_shard_rebalancer_progress.spec @@ -1,13 +1,9 @@ setup { - -- We disable deffered drop, so we can easily trigger blocking the shard - -- move at the end of the move. This is done in a separate setup step, - -- because this cannot run in a transaction. - ALTER SYSTEM SET citus.defer_drop_after_shard_move TO OFF; + CALL citus_cleanup_orphaned_shards(); } setup { - SELECT pg_reload_conf(); ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1500001; SET citus.shard_count TO 4; SET citus.shard_replication_factor TO 1; @@ -30,7 +26,6 @@ setup teardown { - SELECT pg_reload_conf(); DROP TABLE colocated2; DROP TABLE colocated1; DROP TABLE separate; @@ -118,11 +113,38 @@ step "s4-shard-move-sep-block-writes" SELECT citus_move_shard_placement(1500009, 'localhost', 57637, 'localhost', 57638, shard_transfer_mode:='block_writes'); } +// Running two shard moves at the same time can cause racy behaviour over who +// gets the lock. For the test where this move is used in we don't rely on the +// advisory locks. So we disable taking the advisory lock there to avoid the +// racy lock acquisition with the other concurretn move. +step "s4-shard-move-sep-block-writes-without-advisory-locks" +{ + BEGIN; + SET LOCAL citus.running_under_isolation_test = false; + SELECT citus_move_shard_placement(1500009, 'localhost', 57637, 'localhost', 57638, shard_transfer_mode:='block_writes'); +} + step "s4-commit" { COMMIT; } +session "s5" + +// this advisory lock with (almost) random values are only used +// for testing purposes. For details, check Citus' logical replication +// source code +step "s5-acquire-advisory-lock" +{ + SELECT pg_advisory_lock(55152, 44000); +} + +step "s5-release-advisory-lock" +{ + SELECT pg_advisory_unlock(55152, 44000); +} + + session "s6" // this advisory lock with (almost) random values are only used @@ -141,22 +163,6 @@ step "s6-release-advisory-lock" session "s7" -// get_rebalance_progress internally calls pg_total_relation_size on all the -// shards. This means that it takes AccessShareLock on those shards. Because we -// run with deferred drop that means that get_rebalance_progress actually waits -// for the shard move to complete the drop. But we want to get the progress -// before the shards are dropped. So we grab the locks first with a simple -// query that reads from all shards. We force using a single connection because -// get_rebalance_progress isn't smart enough to reuse the right connection for -// the right shards and will simply use a single one for all of them. -step "s7-grab-lock" -{ - BEGIN; - SET LOCAL citus.max_adaptive_executor_pool_size = 1; - SELECT 1 FROM colocated1 LIMIT 1; - SELECT 1 FROM separate LIMIT 1; -} - step "s7-get-progress" { set LOCAL client_min_messages=NOTICE; @@ -179,46 +185,31 @@ step "s7-get-progress" FROM get_rebalance_progress(); } -step "s7-release-lock" -{ - COMMIT; -} - -session "s8" - -// After running these tests we want to enable deferred-drop again. Sadly -// the isolation tester framework does not support multiple teardown steps -// and this cannot be run in a transaction. So we need to do it manually at -// the end of the last test. -step "enable-deferred-drop" -{ - ALTER SYSTEM RESET citus.defer_drop_after_shard_move; -} // blocking rebalancer does what it should -permutation "s2-lock-1-start" "s1-rebalance-c1-block-writes" "s7-get-progress" "s2-unlock-1-start" "s1-commit" "s7-get-progress" "enable-deferred-drop" -permutation "s3-lock-2-start" "s1-rebalance-c1-block-writes" "s7-get-progress" "s3-unlock-2-start" "s1-commit" "s7-get-progress" "enable-deferred-drop" -permutation "s7-grab-lock" "s1-rebalance-c1-block-writes" "s7-get-progress" "s7-release-lock" "s1-commit" "s7-get-progress" "enable-deferred-drop" +permutation "s2-lock-1-start" "s1-rebalance-c1-block-writes" "s7-get-progress" "s2-unlock-1-start" "s1-commit" "s7-get-progress" +permutation "s3-lock-2-start" "s1-rebalance-c1-block-writes" "s7-get-progress" "s3-unlock-2-start" "s1-commit" "s7-get-progress" +permutation "s6-acquire-advisory-lock" "s1-rebalance-c1-block-writes" "s7-get-progress" "s6-release-advisory-lock" "s1-commit" "s7-get-progress" // online rebalancer -permutation "s6-acquire-advisory-lock" "s1-rebalance-c1-online" "s7-get-progress" "s6-release-advisory-lock" "s1-commit" "s7-get-progress" "enable-deferred-drop" -// Commented out due to flakyness -// permutation "s7-grab-lock" "s1-rebalance-c1-online" "s7-get-progress" "s7-release-lock" "s1-commit" "s7-get-progress" "enable-deferred-drop" +permutation "s5-acquire-advisory-lock" "s1-rebalance-c1-online" "s7-get-progress" "s5-release-advisory-lock" "s1-commit" "s7-get-progress" +permutation "s6-acquire-advisory-lock" "s1-rebalance-c1-online" "s7-get-progress" "s6-release-advisory-lock" "s1-commit" "s7-get-progress" // blocking shard move -permutation "s2-lock-1-start" "s1-shard-move-c1-block-writes" "s7-get-progress" "s2-unlock-1-start" "s1-commit" "s7-get-progress" "enable-deferred-drop" -permutation "s7-grab-lock" "s1-shard-move-c1-block-writes" "s7-get-progress" "s7-release-lock" "s1-commit" "s7-get-progress" "enable-deferred-drop" +permutation "s2-lock-1-start" "s1-shard-move-c1-block-writes" "s7-get-progress" "s2-unlock-1-start" "s1-commit" "s7-get-progress" +permutation "s6-acquire-advisory-lock" "s1-shard-move-c1-block-writes" "s7-get-progress" "s6-release-advisory-lock" "s1-commit" "s7-get-progress" // blocking shard copy permutation "s2-lock-1-start" "s1-shard-copy-c1-block-writes" "s7-get-progress" "s2-unlock-1-start" "s1-commit" +permutation "s6-acquire-advisory-lock" "s1-shard-copy-c1-block-writes" "s7-get-progress" "s6-release-advisory-lock" "s1-commit" "s7-get-progress" // online shard move -permutation "s6-acquire-advisory-lock" "s1-shard-move-c1-online" "s7-get-progress" "s6-release-advisory-lock" "s1-commit" "s7-get-progress" "enable-deferred-drop" -// Commented out due to flakyness -// permutation "s7-grab-lock" "s1-shard-move-c1-online" "s7-get-progress" "s7-release-lock" "s1-commit" "s7-get-progress" "enable-deferred-drop" +permutation "s5-acquire-advisory-lock" "s1-shard-move-c1-online" "s7-get-progress" "s5-release-advisory-lock" "s1-commit" "s7-get-progress" +permutation "s6-acquire-advisory-lock" "s1-shard-move-c1-online" "s7-get-progress" "s6-release-advisory-lock" "s1-commit" "s7-get-progress" // online shard copy -permutation "s6-acquire-advisory-lock" "s1-shard-copy-c1-online" "s7-get-progress" "s6-release-advisory-lock" "s1-commit" +permutation "s5-acquire-advisory-lock" "s1-shard-copy-c1-online" "s7-get-progress" "s5-release-advisory-lock" "s1-commit" +permutation "s6-acquire-advisory-lock" "s1-shard-copy-c1-online" "s7-get-progress" "s6-release-advisory-lock" "s1-commit" "s7-get-progress" // parallel blocking shard move -permutation "s2-lock-1-start" "s1-shard-move-c1-block-writes" "s4-shard-move-sep-block-writes"("s1-shard-move-c1-block-writes") "s7-get-progress" "s2-unlock-1-start" "s1-commit" "s4-commit" "s7-get-progress" "enable-deferred-drop" -permutation "s7-grab-lock" "s1-shard-move-c1-block-writes" "s4-shard-move-sep-block-writes"("s1-shard-move-c1-block-writes") "s7-get-progress" "s7-release-lock" "s1-commit" "s4-commit" "s7-get-progress" "enable-deferred-drop" +permutation "s2-lock-1-start" "s1-shard-move-c1-block-writes" "s4-shard-move-sep-block-writes-without-advisory-locks"("s1-shard-move-c1-block-writes") "s7-get-progress" "s2-unlock-1-start" "s1-commit" "s4-commit" "s7-get-progress" +permutation "s6-acquire-advisory-lock" "s1-shard-move-c1-block-writes" "s4-shard-move-sep-block-writes"("s1-shard-move-c1-block-writes") "s7-get-progress" "s6-release-advisory-lock" "s1-commit" "s4-commit" "s7-get-progress"