diff --git a/src/backend/distributed/operations/repair_shards.c b/src/backend/distributed/operations/repair_shards.c index 1903e2a85..a4457d691 100644 --- a/src/backend/distributed/operations/repair_shards.c +++ b/src/backend/distributed/operations/repair_shards.c @@ -36,6 +36,7 @@ #include "distributed/multi_join_order.h" #include "distributed/multi_logical_replication.h" #include "distributed/multi_partitioning_utils.h" +#include "distributed/multi_progress.h" #include "distributed/reference_table_utils.h" #include "distributed/remote_commands.h" #include "distributed/resource_lock.h" @@ -396,6 +397,17 @@ citus_move_shard_placement(PG_FUNCTION_ARGS) EnsureEnoughDiskSpaceForShardMove(colocatedShardList, sourceNodeName, sourceNodePort, targetNodeName, targetNodePort); + + WorkerNode *sourceNode = FindWorkerNode(sourceNodeName, sourceNodePort); + WorkerNode *targetNode = FindWorkerNode(targetNodeName, targetNodePort); + + PlacementUpdateEvent *placementUpdateEvent = palloc0(sizeof(PlacementUpdateEvent)); + placementUpdateEvent->updateType = PLACEMENT_UPDATE_MOVE; + placementUpdateEvent->shardId = shardId; + placementUpdateEvent->sourceNode = sourceNode; + placementUpdateEvent->targetNode = targetNode; + SetupRebalanceMonitor(list_make1(placementUpdateEvent), relationId); + /* * At this point of the shard moves, we don't need to block the writes to * shards when logical replication is used. @@ -463,6 +475,7 @@ citus_move_shard_placement(PG_FUNCTION_ARGS) sourceNodePort, targetNodeName, targetNodePort); + FinalizeCurrentProgressMonitor(); PG_RETURN_VOID(); } diff --git a/src/backend/distributed/operations/shard_rebalancer.c b/src/backend/distributed/operations/shard_rebalancer.c index 043dd03cb..ad7cccb0e 100644 --- a/src/backend/distributed/operations/shard_rebalancer.c +++ b/src/backend/distributed/operations/shard_rebalancer.c @@ -201,8 +201,6 @@ static int PlacementsHashCompare(const void *lhsKey, const void *rhsKey, Size ke static uint32 PlacementsHashHashCode(const void *key, Size keySize); static bool WorkerNodeListContains(List *workerNodeList, const char *workerName, uint32 workerPort); -static void UpdateColocatedShardPlacementProgress(uint64 shardId, char *sourceName, - int sourcePort, uint64 progress); static bool IsPlacementOnWorkerNode(ShardPlacement *placement, WorkerNode *workerNode); static NodeFillState * FindFillStateForPlacement(RebalanceState *state, ShardPlacement *placement); @@ -235,7 +233,6 @@ static Form_pg_dist_rebalance_strategy GetRebalanceStrategy(Name name); static void EnsureShardCostUDF(Oid functionOid); static void EnsureNodeCapacityUDF(Oid functionOid); static void EnsureShardAllowedOnNodeUDF(Oid functionOid); -static void ConflictShardPlacementUpdateOnlyWithIsolationTesting(uint64 shardId); static HTAB * BuildWorkerShardStatisticsHash(PlacementUpdateEventProgress *steps, int stepCount); static HTAB * GetShardStatistics(MultiConnection *connection, HTAB *shardIds); @@ -261,15 +258,6 @@ PG_FUNCTION_INFO_V1(pg_dist_rebalance_strategy_enterprise_check); bool RunningUnderIsolationTest = false; int MaxRebalancerLoggedIgnoredMoves = 5; -/* - * This is randomly generated hardcoded number. It's used as the first part of - * the advisory lock identifier that's used during isolation tests. See the - * comments on ConflictShardPlacementUpdateOnlyWithIsolationTesting, for more - * information. 
- */
-#define SHARD_PLACEMENT_UPDATE_ADVISORY_LOCK_FIRST_KEY 29279
-
-
 #ifdef USE_ASSERT_CHECKING
 
 /*
@@ -774,7 +762,7 @@ ExecutePlacementUpdates(List *placementUpdateList, Oid shardReplicationModeOid,
  * a magic number to the first progress field as an indicator. Finally we return the
  * dsm handle so that it can be used for updating the progress and cleaning things up.
  */
-static void
+void
 SetupRebalanceMonitor(List *placementUpdateList, Oid relationId)
 {
 	List *colocatedUpdateList = GetColocatedRebalanceSteps(placementUpdateList);
@@ -799,7 +787,7 @@ SetupRebalanceMonitor(List *placementUpdateList, Oid relationId)
 		event->shardId = colocatedUpdate->shardId;
 		event->sourcePort = colocatedUpdate->sourceNode->workerPort;
 		event->targetPort = colocatedUpdate->targetNode->workerPort;
-		pg_atomic_init_u64(&event->progress, REBALANCE_PROGRESS_WAITING);
+		pg_atomic_init_u64(&event->progress, REBALANCE_PROGRESS_MOVING);
 
 		eventIndex++;
 	}
@@ -1198,63 +1186,34 @@ BuildShardSizesHash(ProgressMonitorData *monitor, HTAB *shardStatistics)
 		PlacementUpdateEventProgress *step = placementUpdateEvents + eventIndex;
 
 		uint64 shardId = step->shardId;
-		uint64 shardSize = 0;
-		uint64 backupShardSize = 0;
-		uint64 progress = pg_atomic_read_u64(&step->progress);
-
-		uint64 sourceSize = WorkerShardSize(shardStatistics, step->sourceName,
-											step->sourcePort, shardId);
-		uint64 targetSize = WorkerShardSize(shardStatistics, step->targetName,
-											step->targetPort, shardId);
-
-		if (progress == REBALANCE_PROGRESS_WAITING ||
-			progress == REBALANCE_PROGRESS_MOVING)
-		{
-			/*
-			 * If we are not done with the move, the correct shard size is the
-			 * size on the source.
-			 */
-			shardSize = sourceSize;
-			backupShardSize = targetSize;
-		}
-		else if (progress == REBALANCE_PROGRESS_MOVED)
-		{
-			/*
-			 * If we are done with the move, the correct shard size is the size
-			 * on the target
-			 */
-			shardSize = targetSize;
-			backupShardSize = sourceSize;
-		}
+		uint64 shardSize = WorkerShardSize(shardStatistics, step->sourceName,
+										   step->sourcePort, shardId);
 
 		if (shardSize == 0)
 		{
-			if (backupShardSize == 0)
+			/*
+			 * It's possible that we are reading the sizes after the move has
+			 * already finished. This means that the shards on the source
+			 * might have already been deleted. In that case we instead report
+			 * the size on the target as the shard size, since that is now the
+			 * only existing shard.
+			 */
+			shardSize = WorkerShardSize(shardStatistics, step->targetName,
+										step->targetPort, shardId);
+			if (shardSize == 0)
 			{
 				/*
 				 * We don't have any useful shard size. This can happen when a
 				 * shard is moved multiple times and it is not present on
 				 * either of these nodes. Probably the shard is on a worker
-				 * related to another event. In the weird case that this shard
+				 * related to the next move. In the weird case that this shard
 				 * is on the nodes and actually is size 0, we will have no
 				 * entry in the hashmap. When fetching from it we always
 				 * default to 0 if no entry is found, so that's fine.
				 */
 				continue;
 			}
-
-			/*
-			 * Because of the way we fetch shard sizes they are from a slightly
-			 * earlier moment than the progress state we just read from shared
-			 * memory. Usually this is no problem, but there exist some race
-			 * conditions where this matters. For example, for very quick moves
-			 * it is possible that even though a step is now reported as MOVED,
-			 * when we read the shard sizes the move had not even started yet.
-			 * This in turn can mean that the target size is 0 while the source
-			 * size is not.
We try to handle such rare edge cases by falling - * back on the other shard size if that one is not 0. - */ - shardSize = backupShardSize; } @@ -1468,15 +1427,6 @@ GetMovedShardIdsByWorker(PlacementUpdateEventProgress *steps, int stepCount, AddToWorkerShardIdSet(shardsByWorker, step->sourceName, step->sourcePort, step->shardId); - if (pg_atomic_read_u64(&step->progress) == REBALANCE_PROGRESS_WAITING) - { - /* - * shard move has not started so we don't need target stats for - * this shard - */ - continue; - } - AddToWorkerShardIdSet(shardsByWorker, step->targetName, step->targetPort, step->shardId); } @@ -1609,48 +1559,11 @@ RebalanceTableShards(RebalanceOptions *options, Oid shardReplicationModeOid) * This uses the first relationId from the list, it's only used for display * purposes so it does not really matter which to show */ - SetupRebalanceMonitor(placementUpdateList, linitial_oid(options->relationIdList)); ExecutePlacementUpdates(placementUpdateList, shardReplicationModeOid, "Moving"); FinalizeCurrentProgressMonitor(); } -/* - * ConflictShardPlacementUpdateOnlyWithIsolationTesting is only useful for - * testing and should not be called by any code-path except for - * UpdateShardPlacement(). - * - * To be able to test the rebalance monitor functionality correctly, we need to - * be able to pause the rebalancer at a specific place in time. We cannot do - * this by block the shard move itself someway (e.g. by calling truncate on the - * distributed table). The reason for this is that we do the shard move in a - * newly opened connection. This causes our isolation tester block detection to - * not realise that the rebalance_table_shards call is blocked. - * - * So instead, before opening a connection we lock an advisory lock that's - * based on the shard id (shard id mod 1000). By locking this advisory lock in - * a different session we can block the rebalancer in a way that the isolation - * tester block detection is able to detect. - */ -static void -ConflictShardPlacementUpdateOnlyWithIsolationTesting(uint64 shardId) -{ - LOCKTAG tag; - const bool sessionLock = false; - const bool dontWait = false; - - if (RunningUnderIsolationTest) - { - /* we've picked a random lock */ - SET_LOCKTAG_ADVISORY(tag, MyDatabaseId, - SHARD_PLACEMENT_UPDATE_ADVISORY_LOCK_FIRST_KEY, - shardId % 1000, 2); - - (void) LockAcquire(&tag, ExclusiveLock, sessionLock, dontWait); - } -} - - /* * UpdateShardPlacement copies or moves a shard placement by calling * the corresponding functions in Citus in a subtransaction. @@ -1722,23 +1635,11 @@ UpdateShardPlacement(PlacementUpdateEvent *placementUpdateEvent, errmsg("only moving or copying shards is supported"))); } - UpdateColocatedShardPlacementProgress(shardId, - sourceNode->workerName, - sourceNode->workerPort, - REBALANCE_PROGRESS_MOVING); - - ConflictShardPlacementUpdateOnlyWithIsolationTesting(shardId); - /* * In case of failure, we throw an error such that rebalance_table_shards * fails early. */ ExecuteRebalancerCommandInSeparateTransaction(placementUpdateCommand->data); - - UpdateColocatedShardPlacementProgress(shardId, - sourceNode->workerName, - sourceNode->workerPort, - REBALANCE_PROGRESS_MOVED); } @@ -2799,51 +2700,6 @@ WorkerNodeListContains(List *workerNodeList, const char *workerName, uint32 work } -/* - * UpdateColocatedShardPlacementProgress updates the progress of the given placement, - * along with its colocated placements, to the given state. 
- */ -static void -UpdateColocatedShardPlacementProgress(uint64 shardId, char *sourceName, int sourcePort, - uint64 progress) -{ - ProgressMonitorData *header = GetCurrentProgressMonitor(); - - if (header != NULL) - { - PlacementUpdateEventProgress *steps = ProgressMonitorSteps(header); - ListCell *colocatedShardIntervalCell = NULL; - - ShardInterval *shardInterval = LoadShardInterval(shardId); - List *colocatedShardIntervalList = ColocatedShardIntervalList(shardInterval); - - for (int moveIndex = 0; moveIndex < header->stepCount; moveIndex++) - { - PlacementUpdateEventProgress *step = steps + moveIndex; - uint64 currentShardId = step->shardId; - bool colocatedShard = false; - - foreach(colocatedShardIntervalCell, colocatedShardIntervalList) - { - ShardInterval *candidateShard = lfirst(colocatedShardIntervalCell); - if (candidateShard->shardId == currentShardId) - { - colocatedShard = true; - break; - } - } - - if (colocatedShard && - strcmp(step->sourceName, sourceName) == 0 && - step->sourcePort == sourcePort) - { - pg_atomic_write_u64(&step->progress, progress); - } - } - } -} - - /* * pg_dist_rebalance_strategy_enterprise_check is a now removed function, but * to avoid issues during upgrades a C stub is kept. diff --git a/src/include/distributed/shard_rebalancer.h b/src/include/distributed/shard_rebalancer.h index 0e0cf51d1..d21894ffc 100644 --- a/src/include/distributed/shard_rebalancer.h +++ b/src/include/distributed/shard_rebalancer.h @@ -73,10 +73,7 @@ /* *INDENT-ON* */ #define REBALANCE_ACTIVITY_MAGIC_NUMBER 1337 -#define REBALANCE_PROGRESS_ERROR -1 -#define REBALANCE_PROGRESS_WAITING 0 #define REBALANCE_PROGRESS_MOVING 1 -#define REBALANCE_PROGRESS_MOVED 2 /* Enumeration that defines different placement update types */ typedef enum @@ -196,5 +193,7 @@ extern List * ReplicationPlacementUpdates(List *workerNodeList, List *shardPlace extern void ExecuteRebalancerCommandInSeparateTransaction(char *command); extern void AcquirePlacementColocationLock(Oid relationId, int lockMode, const char *operationName); +extern void SetupRebalanceMonitor(List *placementUpdateList, Oid relationId); + #endif /* SHARD_REBALANCER_H */ diff --git a/src/test/regress/expected/isolation_shard_rebalancer_progress.out b/src/test/regress/expected/isolation_shard_rebalancer_progress.out index b03b7b99d..000542bca 100644 --- a/src/test/regress/expected/isolation_shard_rebalancer_progress.out +++ b/src/test/regress/expected/isolation_shard_rebalancer_progress.out @@ -1,34 +1,23 @@ -Parsed test spec with 3 sessions +Parsed test spec with 7 sessions -starting permutation: s2-lock-1 s2-lock-2 s1-rebalance-c1 s3-progress s2-unlock-1 s3-progress s2-unlock-2 s3-progress s1-commit s3-progress +starting permutation: s2-lock-1-start s1-rebalance-c1-block-writes s7-get-progress s2-unlock-1-start s1-commit s7-get-progress enable-deferred-drop master_set_node_property --------------------------------------------------------------------- (1 row) -step s2-lock-1: - SELECT pg_advisory_lock(29279, 1); +step s2-lock-1-start: + BEGIN; + DELETE FROM colocated1 WHERE test_id = 1; + DELETE FROM separate WHERE test_id = 1; -pg_advisory_lock ---------------------------------------------------------------------- - -(1 row) - -step s2-lock-2: - SELECT pg_advisory_lock(29279, 2); - -pg_advisory_lock ---------------------------------------------------------------------- - -(1 row) - -step s1-rebalance-c1: +step s1-rebalance-c1-block-writes: BEGIN; SELECT * FROM get_rebalance_table_shards_plan('colocated1'); SELECT 
rebalance_table_shards('colocated1', shard_transfer_mode:='block_writes'); -step s3-progress: - set client_min_messages=NOTICE; +step s7-get-progress: + set LOCAL client_min_messages=NOTICE; SELECT table_name, shardid, @@ -46,50 +35,12 @@ table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname --------------------------------------------------------------------- colocated1|1500001| 49152|localhost | 57637| 49152|localhost | 57638| 0| 1 colocated2|1500005| 376832|localhost | 57637| 376832|localhost | 57638| 0| 1 -colocated1|1500002| 196608|localhost | 57637| 196608|localhost | 57638| 0| 0 -colocated2|1500006| 8192|localhost | 57637| 8192|localhost | 57638| 0| 0 -(4 rows) +(2 rows) -step s2-unlock-1: - SELECT pg_advisory_unlock(29279, 1); +step s2-unlock-1-start: + ROLLBACK; -pg_advisory_unlock ---------------------------------------------------------------------- -t -(1 row) - -step s3-progress: - set client_min_messages=NOTICE; - SELECT - table_name, - shardid, - shard_size, - sourcename, - sourceport, - source_shard_size, - targetname, - targetport, - target_shard_size, - progress - FROM get_rebalance_progress(); - -table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress ---------------------------------------------------------------------- -colocated1|1500001| 73728|localhost | 57637| 49152|localhost | 57638| 73728| 2 -colocated2|1500005| 401408|localhost | 57637| 376832|localhost | 57638| 401408| 2 -colocated1|1500002| 196608|localhost | 57637| 196608|localhost | 57638| 0| 1 -colocated2|1500006| 8192|localhost | 57637| 8192|localhost | 57638| 0| 1 -(4 rows) - -step s2-unlock-2: - SELECT pg_advisory_unlock(29279, 2); - -pg_advisory_unlock ---------------------------------------------------------------------- -t -(1 row) - -step s1-rebalance-c1: <... completed> +step s1-rebalance-c1-block-writes: <... 
completed> table_name|shardid|shard_size|sourcename|sourceport|targetname|targetport --------------------------------------------------------------------- colocated1|1500001| 0|localhost | 57637|localhost | 57638 @@ -103,8 +54,11 @@ rebalance_table_shards (1 row) -step s3-progress: - set client_min_messages=NOTICE; +step s1-commit: + COMMIT; + +step s7-get-progress: + set LOCAL client_min_messages=NOTICE; SELECT table_name, shardid, @@ -122,11 +76,68 @@ table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname --------------------------------------------------------------------- (0 rows) +step enable-deferred-drop: + ALTER SYSTEM RESET citus.defer_drop_after_shard_move; + + +starting permutation: s3-lock-2-start s1-rebalance-c1-block-writes s7-get-progress s3-unlock-2-start s1-commit s7-get-progress enable-deferred-drop +master_set_node_property +--------------------------------------------------------------------- + +(1 row) + +step s3-lock-2-start: + BEGIN; + DELETE FROM colocated1 WHERE test_id = 3; + +step s1-rebalance-c1-block-writes: + BEGIN; + SELECT * FROM get_rebalance_table_shards_plan('colocated1'); + SELECT rebalance_table_shards('colocated1', shard_transfer_mode:='block_writes'); + +step s7-get-progress: + set LOCAL client_min_messages=NOTICE; + SELECT + table_name, + shardid, + shard_size, + sourcename, + sourceport, + source_shard_size, + targetname, + targetport, + target_shard_size, + progress + FROM get_rebalance_progress(); + +table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress +--------------------------------------------------------------------- +colocated1|1500002| 196608|localhost | 57637| 196608|localhost | 57638| 0| 1 +colocated2|1500006| 8192|localhost | 57637| 8192|localhost | 57638| 0| 1 +(2 rows) + +step s3-unlock-2-start: + ROLLBACK; + +step s1-rebalance-c1-block-writes: <... completed> +table_name|shardid|shard_size|sourcename|sourceport|targetname|targetport +--------------------------------------------------------------------- +colocated1|1500001| 0|localhost | 57637|localhost | 57638 +colocated2|1500005| 0|localhost | 57637|localhost | 57638 +colocated1|1500002| 0|localhost | 57637|localhost | 57638 +colocated2|1500006| 0|localhost | 57637|localhost | 57638 +(4 rows) + +rebalance_table_shards +--------------------------------------------------------------------- + +(1 row) + step s1-commit: COMMIT; -step s3-progress: - set client_min_messages=NOTICE; +step s7-get-progress: + set LOCAL client_min_messages=NOTICE; SELECT table_name, shardid, @@ -144,3 +155,762 @@ table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname --------------------------------------------------------------------- (0 rows) +step enable-deferred-drop: + ALTER SYSTEM RESET citus.defer_drop_after_shard_move; + + +starting permutation: s7-grab-lock s1-rebalance-c1-block-writes s7-get-progress s7-release-lock s1-commit s7-get-progress enable-deferred-drop +master_set_node_property +--------------------------------------------------------------------- + +(1 row) + +step s7-grab-lock: + BEGIN; + SET LOCAL citus.max_adaptive_executor_pool_size = 1; + SELECT 1 FROM colocated1 LIMIT 1; + SELECT 1 FROM separate LIMIT 1; + +?column? +--------------------------------------------------------------------- + 1 +(1 row) + +?column? 
+--------------------------------------------------------------------- + 1 +(1 row) + +step s1-rebalance-c1-block-writes: + BEGIN; + SELECT * FROM get_rebalance_table_shards_plan('colocated1'); + SELECT rebalance_table_shards('colocated1', shard_transfer_mode:='block_writes'); + +step s7-get-progress: + set LOCAL client_min_messages=NOTICE; + SELECT + table_name, + shardid, + shard_size, + sourcename, + sourceport, + source_shard_size, + targetname, + targetport, + target_shard_size, + progress + FROM get_rebalance_progress(); + +table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress +--------------------------------------------------------------------- +colocated1|1500001| 49152|localhost | 57637| 49152|localhost | 57638| 73728| 1 +colocated2|1500005| 376832|localhost | 57637| 376832|localhost | 57638| 401408| 1 +(2 rows) + +step s7-release-lock: + COMMIT; + +step s1-rebalance-c1-block-writes: <... completed> +table_name|shardid|shard_size|sourcename|sourceport|targetname|targetport +--------------------------------------------------------------------- +colocated1|1500001| 0|localhost | 57637|localhost | 57638 +colocated2|1500005| 0|localhost | 57637|localhost | 57638 +colocated1|1500002| 0|localhost | 57637|localhost | 57638 +colocated2|1500006| 0|localhost | 57637|localhost | 57638 +(4 rows) + +rebalance_table_shards +--------------------------------------------------------------------- + +(1 row) + +step s1-commit: + COMMIT; + +step s7-get-progress: + set LOCAL client_min_messages=NOTICE; + SELECT + table_name, + shardid, + shard_size, + sourcename, + sourceport, + source_shard_size, + targetname, + targetport, + target_shard_size, + progress + FROM get_rebalance_progress(); + +table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress +--------------------------------------------------------------------- +(0 rows) + +step enable-deferred-drop: + ALTER SYSTEM RESET citus.defer_drop_after_shard_move; + + +starting permutation: s6-acquire-advisory-lock s1-rebalance-c1-online s7-get-progress s6-release-advisory-lock s1-commit s7-get-progress enable-deferred-drop +master_set_node_property +--------------------------------------------------------------------- + +(1 row) + +step s6-acquire-advisory-lock: + SELECT pg_advisory_lock(44000, 55152); + +pg_advisory_lock +--------------------------------------------------------------------- + +(1 row) + +step s1-rebalance-c1-online: + BEGIN; + SELECT * FROM get_rebalance_table_shards_plan('colocated1'); + SELECT rebalance_table_shards('colocated1', shard_transfer_mode:='force_logical'); + +step s7-get-progress: + set LOCAL client_min_messages=NOTICE; + SELECT + table_name, + shardid, + shard_size, + sourcename, + sourceport, + source_shard_size, + targetname, + targetport, + target_shard_size, + progress + FROM get_rebalance_progress(); + +table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress +--------------------------------------------------------------------- +colocated1|1500001| 49152|localhost | 57637| 49152|localhost | 57638| 8192| 1 +colocated2|1500005| 376832|localhost | 57637| 376832|localhost | 57638| 8192| 1 +(2 rows) + +step s6-release-advisory-lock: + SELECT pg_advisory_unlock(44000, 55152); + +pg_advisory_unlock +--------------------------------------------------------------------- +t +(1 row) + +step s1-rebalance-c1-online: <... 
completed> +table_name|shardid|shard_size|sourcename|sourceport|targetname|targetport +--------------------------------------------------------------------- +colocated1|1500001| 0|localhost | 57637|localhost | 57638 +colocated2|1500005| 0|localhost | 57637|localhost | 57638 +colocated1|1500002| 0|localhost | 57637|localhost | 57638 +colocated2|1500006| 0|localhost | 57637|localhost | 57638 +(4 rows) + +rebalance_table_shards +--------------------------------------------------------------------- + +(1 row) + +step s1-commit: + COMMIT; + +step s7-get-progress: + set LOCAL client_min_messages=NOTICE; + SELECT + table_name, + shardid, + shard_size, + sourcename, + sourceport, + source_shard_size, + targetname, + targetport, + target_shard_size, + progress + FROM get_rebalance_progress(); + +table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress +--------------------------------------------------------------------- +(0 rows) + +step enable-deferred-drop: + ALTER SYSTEM RESET citus.defer_drop_after_shard_move; + + +starting permutation: s7-grab-lock s1-shard-move-c1-online s7-get-progress s7-release-lock s1-commit s7-get-progress enable-deferred-drop +master_set_node_property +--------------------------------------------------------------------- + +(1 row) + +step s7-grab-lock: + BEGIN; + SET LOCAL citus.max_adaptive_executor_pool_size = 1; + SELECT 1 FROM colocated1 LIMIT 1; + SELECT 1 FROM separate LIMIT 1; + +?column? +--------------------------------------------------------------------- + 1 +(1 row) + +?column? +--------------------------------------------------------------------- + 1 +(1 row) + +step s1-shard-move-c1-online: + BEGIN; + SELECT citus_move_shard_placement(1500001, 'localhost', 57637, 'localhost', 57638, shard_transfer_mode:='force_logical'); + +step s7-get-progress: + set LOCAL client_min_messages=NOTICE; + SELECT + table_name, + shardid, + shard_size, + sourcename, + sourceport, + source_shard_size, + targetname, + targetport, + target_shard_size, + progress + FROM get_rebalance_progress(); + +table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress +--------------------------------------------------------------------- +colocated1|1500001| 49152|localhost | 57637| 49152|localhost | 57638| 73728| 1 +colocated2|1500005| 376832|localhost | 57637| 376832|localhost | 57638| 401408| 1 +(2 rows) + +step s7-release-lock: + COMMIT; + +step s1-shard-move-c1-online: <... 
completed> +citus_move_shard_placement +--------------------------------------------------------------------- + +(1 row) + +step s1-commit: + COMMIT; + +step s7-get-progress: + set LOCAL client_min_messages=NOTICE; + SELECT + table_name, + shardid, + shard_size, + sourcename, + sourceport, + source_shard_size, + targetname, + targetport, + target_shard_size, + progress + FROM get_rebalance_progress(); + +table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress +--------------------------------------------------------------------- +(0 rows) + +step enable-deferred-drop: + ALTER SYSTEM RESET citus.defer_drop_after_shard_move; + + +starting permutation: s2-lock-1-start s1-shard-move-c1-block-writes s7-get-progress s2-unlock-1-start s1-commit s7-get-progress enable-deferred-drop +master_set_node_property +--------------------------------------------------------------------- + +(1 row) + +step s2-lock-1-start: + BEGIN; + DELETE FROM colocated1 WHERE test_id = 1; + DELETE FROM separate WHERE test_id = 1; + +step s1-shard-move-c1-block-writes: + BEGIN; + SELECT citus_move_shard_placement(1500001, 'localhost', 57637, 'localhost', 57638, shard_transfer_mode:='block_writes'); + +step s7-get-progress: + set LOCAL client_min_messages=NOTICE; + SELECT + table_name, + shardid, + shard_size, + sourcename, + sourceport, + source_shard_size, + targetname, + targetport, + target_shard_size, + progress + FROM get_rebalance_progress(); + +table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress +--------------------------------------------------------------------- +colocated1|1500001| 49152|localhost | 57637| 49152|localhost | 57638| 0| 1 +colocated2|1500005| 376832|localhost | 57637| 376832|localhost | 57638| 0| 1 +(2 rows) + +step s2-unlock-1-start: + ROLLBACK; + +step s1-shard-move-c1-block-writes: <... completed> +citus_move_shard_placement +--------------------------------------------------------------------- + +(1 row) + +step s1-commit: + COMMIT; + +step s7-get-progress: + set LOCAL client_min_messages=NOTICE; + SELECT + table_name, + shardid, + shard_size, + sourcename, + sourceport, + source_shard_size, + targetname, + targetport, + target_shard_size, + progress + FROM get_rebalance_progress(); + +table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress +--------------------------------------------------------------------- +(0 rows) + +step enable-deferred-drop: + ALTER SYSTEM RESET citus.defer_drop_after_shard_move; + + +starting permutation: s7-grab-lock s1-shard-move-c1-block-writes s7-get-progress s7-release-lock s1-commit s7-get-progress enable-deferred-drop +master_set_node_property +--------------------------------------------------------------------- + +(1 row) + +step s7-grab-lock: + BEGIN; + SET LOCAL citus.max_adaptive_executor_pool_size = 1; + SELECT 1 FROM colocated1 LIMIT 1; + SELECT 1 FROM separate LIMIT 1; + +?column? +--------------------------------------------------------------------- + 1 +(1 row) + +?column? 
+--------------------------------------------------------------------- + 1 +(1 row) + +step s1-shard-move-c1-block-writes: + BEGIN; + SELECT citus_move_shard_placement(1500001, 'localhost', 57637, 'localhost', 57638, shard_transfer_mode:='block_writes'); + +step s7-get-progress: + set LOCAL client_min_messages=NOTICE; + SELECT + table_name, + shardid, + shard_size, + sourcename, + sourceport, + source_shard_size, + targetname, + targetport, + target_shard_size, + progress + FROM get_rebalance_progress(); + +table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress +--------------------------------------------------------------------- +colocated1|1500001| 49152|localhost | 57637| 49152|localhost | 57638| 73728| 1 +colocated2|1500005| 376832|localhost | 57637| 376832|localhost | 57638| 401408| 1 +(2 rows) + +step s7-release-lock: + COMMIT; + +step s1-shard-move-c1-block-writes: <... completed> +citus_move_shard_placement +--------------------------------------------------------------------- + +(1 row) + +step s1-commit: + COMMIT; + +step s7-get-progress: + set LOCAL client_min_messages=NOTICE; + SELECT + table_name, + shardid, + shard_size, + sourcename, + sourceport, + source_shard_size, + targetname, + targetport, + target_shard_size, + progress + FROM get_rebalance_progress(); + +table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress +--------------------------------------------------------------------- +(0 rows) + +step enable-deferred-drop: + ALTER SYSTEM RESET citus.defer_drop_after_shard_move; + + +starting permutation: s6-acquire-advisory-lock s1-shard-move-c1-online s7-get-progress s6-release-advisory-lock s1-commit s7-get-progress enable-deferred-drop +master_set_node_property +--------------------------------------------------------------------- + +(1 row) + +step s6-acquire-advisory-lock: + SELECT pg_advisory_lock(44000, 55152); + +pg_advisory_lock +--------------------------------------------------------------------- + +(1 row) + +step s1-shard-move-c1-online: + BEGIN; + SELECT citus_move_shard_placement(1500001, 'localhost', 57637, 'localhost', 57638, shard_transfer_mode:='force_logical'); + +step s7-get-progress: + set LOCAL client_min_messages=NOTICE; + SELECT + table_name, + shardid, + shard_size, + sourcename, + sourceport, + source_shard_size, + targetname, + targetport, + target_shard_size, + progress + FROM get_rebalance_progress(); + +table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress +--------------------------------------------------------------------- +colocated1|1500001| 49152|localhost | 57637| 49152|localhost | 57638| 8192| 1 +colocated2|1500005| 376832|localhost | 57637| 376832|localhost | 57638| 8192| 1 +(2 rows) + +step s6-release-advisory-lock: + SELECT pg_advisory_unlock(44000, 55152); + +pg_advisory_unlock +--------------------------------------------------------------------- +t +(1 row) + +step s1-shard-move-c1-online: <... 
completed> +citus_move_shard_placement +--------------------------------------------------------------------- + +(1 row) + +step s1-commit: + COMMIT; + +step s7-get-progress: + set LOCAL client_min_messages=NOTICE; + SELECT + table_name, + shardid, + shard_size, + sourcename, + sourceport, + source_shard_size, + targetname, + targetport, + target_shard_size, + progress + FROM get_rebalance_progress(); + +table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress +--------------------------------------------------------------------- +(0 rows) + +step enable-deferred-drop: + ALTER SYSTEM RESET citus.defer_drop_after_shard_move; + + +starting permutation: s7-grab-lock s1-shard-move-c1-online s7-get-progress s7-release-lock s1-commit s7-get-progress enable-deferred-drop +master_set_node_property +--------------------------------------------------------------------- + +(1 row) + +step s7-grab-lock: + BEGIN; + SET LOCAL citus.max_adaptive_executor_pool_size = 1; + SELECT 1 FROM colocated1 LIMIT 1; + SELECT 1 FROM separate LIMIT 1; + +?column? +--------------------------------------------------------------------- + 1 +(1 row) + +?column? +--------------------------------------------------------------------- + 1 +(1 row) + +step s1-shard-move-c1-online: + BEGIN; + SELECT citus_move_shard_placement(1500001, 'localhost', 57637, 'localhost', 57638, shard_transfer_mode:='force_logical'); + +step s7-get-progress: + set LOCAL client_min_messages=NOTICE; + SELECT + table_name, + shardid, + shard_size, + sourcename, + sourceport, + source_shard_size, + targetname, + targetport, + target_shard_size, + progress + FROM get_rebalance_progress(); + +table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress +--------------------------------------------------------------------- +colocated1|1500001| 49152|localhost | 57637| 49152|localhost | 57638| 73728| 1 +colocated2|1500005| 376832|localhost | 57637| 376832|localhost | 57638| 401408| 1 +(2 rows) + +step s7-release-lock: + COMMIT; + +step s1-shard-move-c1-online: <... 
completed> +citus_move_shard_placement +--------------------------------------------------------------------- + +(1 row) + +step s1-commit: + COMMIT; + +step s7-get-progress: + set LOCAL client_min_messages=NOTICE; + SELECT + table_name, + shardid, + shard_size, + sourcename, + sourceport, + source_shard_size, + targetname, + targetport, + target_shard_size, + progress + FROM get_rebalance_progress(); + +table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress +--------------------------------------------------------------------- +(0 rows) + +step enable-deferred-drop: + ALTER SYSTEM RESET citus.defer_drop_after_shard_move; + + +starting permutation: s2-lock-1-start s1-shard-move-c1-block-writes s4-shard-move-sep-block-writes s7-get-progress s2-unlock-1-start s1-commit s4-commit s7-get-progress enable-deferred-drop +master_set_node_property +--------------------------------------------------------------------- + +(1 row) + +step s2-lock-1-start: + BEGIN; + DELETE FROM colocated1 WHERE test_id = 1; + DELETE FROM separate WHERE test_id = 1; + +step s1-shard-move-c1-block-writes: + BEGIN; + SELECT citus_move_shard_placement(1500001, 'localhost', 57637, 'localhost', 57638, shard_transfer_mode:='block_writes'); + +step s4-shard-move-sep-block-writes: + BEGIN; + SELECT citus_move_shard_placement(1500009, 'localhost', 57637, 'localhost', 57638, shard_transfer_mode:='block_writes'); + +step s7-get-progress: + set LOCAL client_min_messages=NOTICE; + SELECT + table_name, + shardid, + shard_size, + sourcename, + sourceport, + source_shard_size, + targetname, + targetport, + target_shard_size, + progress + FROM get_rebalance_progress(); + +table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress +--------------------------------------------------------------------- +colocated1|1500001| 49152|localhost | 57637| 49152|localhost | 57638| 0| 1 +colocated2|1500005| 376832|localhost | 57637| 376832|localhost | 57638| 0| 1 +separate |1500009| 122880|localhost | 57637| 122880|localhost | 57638| 0| 1 +(3 rows) + +step s2-unlock-1-start: + ROLLBACK; + +step s1-shard-move-c1-block-writes: <... completed> +citus_move_shard_placement +--------------------------------------------------------------------- + +(1 row) + +step s4-shard-move-sep-block-writes: <... 
completed> +citus_move_shard_placement +--------------------------------------------------------------------- + +(1 row) + +step s1-commit: + COMMIT; + +step s4-commit: + COMMIT; + +step s7-get-progress: + set LOCAL client_min_messages=NOTICE; + SELECT + table_name, + shardid, + shard_size, + sourcename, + sourceport, + source_shard_size, + targetname, + targetport, + target_shard_size, + progress + FROM get_rebalance_progress(); + +table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress +--------------------------------------------------------------------- +(0 rows) + +step enable-deferred-drop: + ALTER SYSTEM RESET citus.defer_drop_after_shard_move; + + +starting permutation: s7-grab-lock s1-shard-move-c1-block-writes s4-shard-move-sep-block-writes s7-get-progress s7-release-lock s1-commit s4-commit s7-get-progress enable-deferred-drop +master_set_node_property +--------------------------------------------------------------------- + +(1 row) + +step s7-grab-lock: + BEGIN; + SET LOCAL citus.max_adaptive_executor_pool_size = 1; + SELECT 1 FROM colocated1 LIMIT 1; + SELECT 1 FROM separate LIMIT 1; + +?column? +--------------------------------------------------------------------- + 1 +(1 row) + +?column? +--------------------------------------------------------------------- + 1 +(1 row) + +step s1-shard-move-c1-block-writes: + BEGIN; + SELECT citus_move_shard_placement(1500001, 'localhost', 57637, 'localhost', 57638, shard_transfer_mode:='block_writes'); + +step s4-shard-move-sep-block-writes: + BEGIN; + SELECT citus_move_shard_placement(1500009, 'localhost', 57637, 'localhost', 57638, shard_transfer_mode:='block_writes'); + +step s7-get-progress: + set LOCAL client_min_messages=NOTICE; + SELECT + table_name, + shardid, + shard_size, + sourcename, + sourceport, + source_shard_size, + targetname, + targetport, + target_shard_size, + progress + FROM get_rebalance_progress(); + +table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress +--------------------------------------------------------------------- +colocated1|1500001| 49152|localhost | 57637| 49152|localhost | 57638| 73728| 1 +colocated2|1500005| 376832|localhost | 57637| 376832|localhost | 57638| 401408| 1 +separate |1500009| 122880|localhost | 57637| 122880|localhost | 57638| 147456| 1 +(3 rows) + +step s7-release-lock: + COMMIT; + +step s1-shard-move-c1-block-writes: <... completed> +citus_move_shard_placement +--------------------------------------------------------------------- + +(1 row) + +step s4-shard-move-sep-block-writes: <... 
completed>
+citus_move_shard_placement
+---------------------------------------------------------------------
+
+(1 row)
+
+step s1-commit:
+	COMMIT;
+
+step s4-commit:
+	COMMIT;
+
+step s7-get-progress:
+	set LOCAL client_min_messages=NOTICE;
+	SELECT
+	table_name,
+	shardid,
+	shard_size,
+	sourcename,
+	sourceport,
+	source_shard_size,
+	targetname,
+	targetport,
+	target_shard_size,
+	progress
+	FROM get_rebalance_progress();
+
+table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress
+---------------------------------------------------------------------
+(0 rows)
+
+step enable-deferred-drop:
+	ALTER SYSTEM RESET citus.defer_drop_after_shard_move;
+
diff --git a/src/test/regress/spec/isolation_shard_rebalancer_progress.spec b/src/test/regress/spec/isolation_shard_rebalancer_progress.spec
index 9e7bcca25..c9bb3b641 100644
--- a/src/test/regress/spec/isolation_shard_rebalancer_progress.spec
+++ b/src/test/regress/spec/isolation_shard_rebalancer_progress.spec
@@ -1,6 +1,14 @@
 setup
 {
-	select setval('pg_dist_shardid_seq', GREATEST(1500000, nextval('pg_dist_shardid_seq')));
+	-- We disable deferred drop, so we can easily trigger blocking the shard
+	-- move at the end of the move. This is done in a separate setup step,
+	-- because this cannot run in a transaction.
+	ALTER SYSTEM SET citus.defer_drop_after_shard_move TO OFF;
+}
+setup
+{
+	SELECT pg_reload_conf();
+	ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1500001;
 	SET citus.shard_count TO 4;
 	SET citus.shard_replication_factor TO 1;
 	SELECT 1 FROM master_add_node('localhost', 57637);
@@ -9,29 +17,53 @@ setup
 	SELECT create_distributed_table('colocated1', 'test_id', 'hash', 'none');
 	CREATE TABLE colocated2 (test_id integer NOT NULL, data text);
 	SELECT create_distributed_table('colocated2', 'test_id', 'hash', 'colocated1');
+	CREATE TABLE separate (test_id integer NOT NULL, data text);
+	SELECT create_distributed_table('separate', 'test_id', 'hash', 'none');
 	-- 1 and 3 are chosen so they go to shard 1 and 2
 	INSERT INTO colocated1(test_id) SELECT 1 from generate_series(0, 1000) i;
 	INSERT INTO colocated2(test_id) SELECT 1 from generate_series(0, 10000) i;
 	INSERT INTO colocated1(test_id) SELECT 3 from generate_series(0, 5000) i;
+	INSERT INTO separate(test_id) SELECT 1 from generate_series(0, 3000) i;
 	select * from pg_dist_placement;
 	SELECT master_set_node_property('localhost', 57638, 'shouldhaveshards', true);
 }
 
 teardown
 {
+	SELECT pg_reload_conf();
 	DROP TABLE colocated2;
 	DROP TABLE colocated1;
+	DROP TABLE separate;
 }
 
 session "s1"
 
-step "s1-rebalance-c1"
+step "s1-rebalance-c1-block-writes"
 {
 	BEGIN;
 	SELECT * FROM get_rebalance_table_shards_plan('colocated1');
 	SELECT rebalance_table_shards('colocated1', shard_transfer_mode:='block_writes');
 }
 
+step "s1-rebalance-c1-online"
+{
+	BEGIN;
+	SELECT * FROM get_rebalance_table_shards_plan('colocated1');
+	SELECT rebalance_table_shards('colocated1', shard_transfer_mode:='force_logical');
+}
+
+step "s1-shard-move-c1-block-writes"
+{
+	BEGIN;
+	SELECT citus_move_shard_placement(1500001, 'localhost', 57637, 'localhost', 57638, shard_transfer_mode:='block_writes');
+}
+
+step "s1-shard-move-c1-online"
+{
+	BEGIN;
+	SELECT citus_move_shard_placement(1500001, 'localhost', 57637, 'localhost', 57638, shard_transfer_mode:='force_logical');
+}
+
 step "s1-commit"
 {
 	COMMIT;
@@ -39,31 +71,81 @@ step "s1-commit"
 
 session "s2"
 
-step "s2-lock-1"
+step "s2-lock-1-start"
 {
-	SELECT pg_advisory_lock(29279, 1);
+	BEGIN;
+	DELETE FROM colocated1 WHERE test_id = 1;
+	DELETE FROM separate WHERE test_id = 1;
 }
 
-step "s2-lock-2"
+step "s2-unlock-1-start"
 {
-	SELECT pg_advisory_lock(29279, 2);
-}
-
-step "s2-unlock-1"
-{
-	SELECT pg_advisory_unlock(29279, 1);
-}
-
-step "s2-unlock-2"
-{
-	SELECT pg_advisory_unlock(29279, 2);
+	ROLLBACK;
 }
 
 session "s3"
 
-step "s3-progress"
+step "s3-lock-2-start"
 {
-	set client_min_messages=NOTICE;
+	BEGIN;
+	DELETE FROM colocated1 WHERE test_id = 3;
+}
+
+step "s3-unlock-2-start"
+{
+	ROLLBACK;
+}
+
+session "s4"
+
+step "s4-shard-move-sep-block-writes"
+{
+	BEGIN;
+	SELECT citus_move_shard_placement(1500009, 'localhost', 57637, 'localhost', 57638, shard_transfer_mode:='block_writes');
+}
+
+step "s4-commit"
+{
+	COMMIT;
+}
+
+session "s6"
+
+// this advisory lock with (almost) random values is only used
+// for testing purposes. For details, check Citus' logical replication
+// source code
+step "s6-acquire-advisory-lock"
+{
+	SELECT pg_advisory_lock(44000, 55152);
+}
+
+step "s6-release-advisory-lock"
+{
+	SELECT pg_advisory_unlock(44000, 55152);
+}
+
+
+session "s7"
+
+// get_rebalance_progress internally calls pg_total_relation_size on all the
+// shards. This means that it takes AccessShareLock on those shards. Because we
+// disable deferred drop, get_rebalance_progress actually waits
+// for the shard move to complete the drop. But we want to get the progress
+// before the shards are dropped. So we grab the locks first with a simple
+// query that reads from all shards. We force using a single connection because
+// get_rebalance_progress isn't smart enough to reuse the right connection for
+// the right shards and will simply use a single one for all of them.
+step "s7-grab-lock"
+{
+	BEGIN;
+	SET LOCAL citus.max_adaptive_executor_pool_size = 1;
+	SELECT 1 FROM colocated1 LIMIT 1;
+	SELECT 1 FROM separate LIMIT 1;
+}
+
+step "s7-get-progress"
+{
+	set LOCAL client_min_messages=NOTICE;
 	SELECT
 	table_name,
 	shardid,
@@ -78,4 +160,39 @@ step "s3-progress"
 	FROM get_rebalance_progress();
 }
 
-permutation "s2-lock-1" "s2-lock-2" "s1-rebalance-c1" "s3-progress" "s2-unlock-1" "s3-progress" "s2-unlock-2" "s3-progress" "s1-commit" "s3-progress"
+step "s7-release-lock"
+{
+	COMMIT;
+}
+
+session "s8"
+
+// After running these tests we want to enable deferred drop again. Sadly
+// the isolation tester framework does not support multiple teardown steps
+// and this cannot be run in a transaction. So we need to do it manually at
+// the end of each test.
+step "enable-deferred-drop" +{ + ALTER SYSTEM RESET citus.defer_drop_after_shard_move; +} +// blocking rebalancer does what it should +permutation "s2-lock-1-start" "s1-rebalance-c1-block-writes" "s7-get-progress" "s2-unlock-1-start" "s1-commit" "s7-get-progress" "enable-deferred-drop" +permutation "s3-lock-2-start" "s1-rebalance-c1-block-writes" "s7-get-progress" "s3-unlock-2-start" "s1-commit" "s7-get-progress" "enable-deferred-drop" +permutation "s7-grab-lock" "s1-rebalance-c1-block-writes" "s7-get-progress" "s7-release-lock" "s1-commit" "s7-get-progress" "enable-deferred-drop" + +// online rebalancer +permutation "s6-acquire-advisory-lock" "s1-rebalance-c1-online" "s7-get-progress" "s6-release-advisory-lock" "s1-commit" "s7-get-progress" "enable-deferred-drop" +permutation "s7-grab-lock" "s1-shard-move-c1-online" "s7-get-progress" "s7-release-lock" "s1-commit" "s7-get-progress" "enable-deferred-drop" + +// blocking shard move +permutation "s2-lock-1-start" "s1-shard-move-c1-block-writes" "s7-get-progress" "s2-unlock-1-start" "s1-commit" "s7-get-progress" "enable-deferred-drop" +permutation "s7-grab-lock" "s1-shard-move-c1-block-writes" "s7-get-progress" "s7-release-lock" "s1-commit" "s7-get-progress" "enable-deferred-drop" + +// online shard move +permutation "s6-acquire-advisory-lock" "s1-shard-move-c1-online" "s7-get-progress" "s6-release-advisory-lock" "s1-commit" "s7-get-progress" "enable-deferred-drop" +permutation "s7-grab-lock" "s1-shard-move-c1-online" "s7-get-progress" "s7-release-lock" "s1-commit" "s7-get-progress" "enable-deferred-drop" + + +// parallel blocking shard move +permutation "s2-lock-1-start" "s1-shard-move-c1-block-writes" "s4-shard-move-sep-block-writes" "s7-get-progress" "s2-unlock-1-start" "s1-commit" "s4-commit" "s7-get-progress" "enable-deferred-drop" +permutation "s7-grab-lock" "s1-shard-move-c1-block-writes" "s4-shard-move-sep-block-writes" "s7-get-progress" "s7-release-lock" "s1-commit" "s4-commit" "s7-get-progress" "enable-deferred-drop"