From 8bb082e77d8bf13ed92c5e1726e7a0e4321223e4 Mon Sep 17 00:00:00 2001 From: Jelte Fennema Date: Wed, 31 Aug 2022 13:55:47 +0200 Subject: [PATCH] Fix reporting of progress on waiting and moved shards (#6274) In commit 31faa88a4e I removed some features of the rebalance progress monitor. I did this because the plan was to remove the foreground shard rebalancer later in the PR that would add the background shard rebalancer. So, I didn't want to spend time fixing something that we would throw away anyway. As it turns out we're not removing the foreground shard rebalancer after all, so it made sens to fix the stuff that I broke. This PR does that. For the most part this commit reverts the changes in commit 31faa88a4e. It's not a full revert though, because it keeps the improved tests and the changes to `citus_move_shard_placement`. --- .../distributed/operations/repair_shards.c | 30 ++-- .../distributed/operations/shard_rebalancer.c | 130 ++++++++++++++++-- .../distributed/progress/multi_progress.c | 8 +- src/include/distributed/shard_rebalancer.h | 6 +- .../isolation_shard_rebalancer_progress.out | 16 ++- 5 files changed, 161 insertions(+), 29 deletions(-) diff --git a/src/backend/distributed/operations/repair_shards.c b/src/backend/distributed/operations/repair_shards.c index 193797384..4388b86fd 100644 --- a/src/backend/distributed/operations/repair_shards.c +++ b/src/backend/distributed/operations/repair_shards.c @@ -21,6 +21,7 @@ #include "catalog/pg_class.h" #include "catalog/pg_enum.h" #include "distributed/adaptive_executor.h" +#include "distributed/backend_data.h" #include "distributed/citus_ruleutils.h" #include "distributed/colocation_utils.h" #include "distributed/commands.h" @@ -397,15 +398,28 @@ citus_move_shard_placement(PG_FUNCTION_ARGS) targetNodeName, targetNodePort); - WorkerNode *sourceNode = FindWorkerNode(sourceNodeName, sourceNodePort); - WorkerNode *targetNode = FindWorkerNode(targetNodeName, targetNodePort); + /* + * We want to be able to track progress of shard moves using + * get_rebalancer_progress. If this move is initiated by the rebalancer, + * then the rebalancer call has already set up the shared memory that is + * used to do that. But if citus_move_shard_placement is called directly by + * the user (or through any other mechanism), then the shared memory is not + * set up yet. In that case we do it here. + */ + if (!IsRebalancerInternalBackend()) + { + WorkerNode *sourceNode = FindWorkerNode(sourceNodeName, sourceNodePort); + WorkerNode *targetNode = FindWorkerNode(targetNodeName, targetNodePort); - PlacementUpdateEvent *placementUpdateEvent = palloc0(sizeof(PlacementUpdateEvent)); - placementUpdateEvent->updateType = PLACEMENT_UPDATE_MOVE; - placementUpdateEvent->shardId = shardId; - placementUpdateEvent->sourceNode = sourceNode; - placementUpdateEvent->targetNode = targetNode; - SetupRebalanceMonitor(list_make1(placementUpdateEvent), relationId); + PlacementUpdateEvent *placementUpdateEvent = palloc0( + sizeof(PlacementUpdateEvent)); + placementUpdateEvent->updateType = PLACEMENT_UPDATE_MOVE; + placementUpdateEvent->shardId = shardId; + placementUpdateEvent->sourceNode = sourceNode; + placementUpdateEvent->targetNode = targetNode; + SetupRebalanceMonitor(list_make1(placementUpdateEvent), relationId, + REBALANCE_PROGRESS_MOVING); + } /* * At this point of the shard moves, we don't need to block the writes to diff --git a/src/backend/distributed/operations/shard_rebalancer.c b/src/backend/distributed/operations/shard_rebalancer.c index ad7cccb0e..c6079eb83 100644 --- a/src/backend/distributed/operations/shard_rebalancer.c +++ b/src/backend/distributed/operations/shard_rebalancer.c @@ -201,6 +201,8 @@ static int PlacementsHashCompare(const void *lhsKey, const void *rhsKey, Size ke static uint32 PlacementsHashHashCode(const void *key, Size keySize); static bool WorkerNodeListContains(List *workerNodeList, const char *workerName, uint32 workerPort); +static void UpdateColocatedShardPlacementProgress(uint64 shardId, char *sourceName, + int sourcePort, uint64 progress); static bool IsPlacementOnWorkerNode(ShardPlacement *placement, WorkerNode *workerNode); static NodeFillState * FindFillStateForPlacement(RebalanceState *state, ShardPlacement *placement); @@ -258,6 +260,7 @@ PG_FUNCTION_INFO_V1(pg_dist_rebalance_strategy_enterprise_check); bool RunningUnderIsolationTest = false; int MaxRebalancerLoggedIgnoredMoves = 5; + #ifdef USE_ASSERT_CHECKING /* @@ -763,7 +766,9 @@ ExecutePlacementUpdates(List *placementUpdateList, Oid shardReplicationModeOid, * dsm handle so that it can be used for updating the progress and cleaning things up. */ void -SetupRebalanceMonitor(List *placementUpdateList, Oid relationId) +SetupRebalanceMonitor(List *placementUpdateList, + Oid relationId, + uint64 initialProgressState) { List *colocatedUpdateList = GetColocatedRebalanceSteps(placementUpdateList); ListCell *colocatedUpdateCell = NULL; @@ -787,7 +792,7 @@ SetupRebalanceMonitor(List *placementUpdateList, Oid relationId) event->shardId = colocatedUpdate->shardId; event->sourcePort = colocatedUpdate->sourceNode->workerPort; event->targetPort = colocatedUpdate->targetNode->workerPort; - pg_atomic_init_u64(&event->progress, REBALANCE_PROGRESS_MOVING); + pg_atomic_init_u64(&event->progress, initialProgressState); eventIndex++; } @@ -1186,34 +1191,63 @@ BuildShardSizesHash(ProgressMonitorData *monitor, HTAB *shardStatistics) PlacementUpdateEventProgress *step = placementUpdateEvents + eventIndex; uint64 shardId = step->shardId; + uint64 shardSize = 0; + uint64 backupShardSize = 0; + uint64 progress = pg_atomic_read_u64(&step->progress); - uint64 shardSize = WorkerShardSize(shardStatistics, step->sourceName, - step->sourcePort, shardId); + uint64 sourceSize = WorkerShardSize(shardStatistics, step->sourceName, + step->sourcePort, shardId); + uint64 targetSize = WorkerShardSize(shardStatistics, step->targetName, + step->targetPort, shardId); + + if (progress == REBALANCE_PROGRESS_WAITING || + progress == REBALANCE_PROGRESS_MOVING) + { + /* + * If we are not done with the move, the correct shard size is the + * size on the source. + */ + shardSize = sourceSize; + backupShardSize = targetSize; + } + else if (progress == REBALANCE_PROGRESS_MOVED) + { + /* + * If we are done with the move, the correct shard size is the size + * on the target + */ + shardSize = targetSize; + backupShardSize = sourceSize; + } if (shardSize == 0) { - /* - * It's possible that we are reading the sizes after the move has - * already fininshed. This means that the shards on the source - * might have already been deleted. In that case we instead report - * the size on the target as the shard size, since that is now the - * only existing shard. - */ - shardSize = WorkerShardSize(shardStatistics, step->targetName, - step->targetPort, shardId); - if (shardSize == 0) + if (backupShardSize == 0) { /* * We don't have any useful shard size. This can happen when a * shard is moved multiple times and it is not present on * either of these nodes. Probably the shard is on a worker - * related to the next move. In the weird case that this shard + * related to another event. In the weird case that this shard * is on the nodes and actually is size 0, we will have no * entry in the hashmap. When fetching from it we always * default to 0 if no entry is found, so that's fine. */ continue; } + + /* + * Because of the way we fetch shard sizes they are from a slightly + * earlier moment than the progress state we just read from shared + * memory. Usually this is no problem, but there exist some race + * conditions where this matters. For example, for very quick moves + * it is possible that even though a step is now reported as MOVED, + * when we read the shard sizes the move had not even started yet. + * This in turn can mean that the target size is 0 while the source + * size is not. We try to handle such rare edge cases by falling + * back on the other shard size if that one is not 0. + */ + shardSize = backupShardSize; } @@ -1427,6 +1461,15 @@ GetMovedShardIdsByWorker(PlacementUpdateEventProgress *steps, int stepCount, AddToWorkerShardIdSet(shardsByWorker, step->sourceName, step->sourcePort, step->shardId); + if (pg_atomic_read_u64(&step->progress) == REBALANCE_PROGRESS_WAITING) + { + /* + * shard move has not started so we don't need target stats for + * this shard + */ + continue; + } + AddToWorkerShardIdSet(shardsByWorker, step->targetName, step->targetPort, step->shardId); } @@ -1559,6 +1602,8 @@ RebalanceTableShards(RebalanceOptions *options, Oid shardReplicationModeOid) * This uses the first relationId from the list, it's only used for display * purposes so it does not really matter which to show */ + SetupRebalanceMonitor(placementUpdateList, linitial_oid(options->relationIdList), + REBALANCE_PROGRESS_WAITING); ExecutePlacementUpdates(placementUpdateList, shardReplicationModeOid, "Moving"); FinalizeCurrentProgressMonitor(); } @@ -1635,11 +1680,21 @@ UpdateShardPlacement(PlacementUpdateEvent *placementUpdateEvent, errmsg("only moving or copying shards is supported"))); } + UpdateColocatedShardPlacementProgress(shardId, + sourceNode->workerName, + sourceNode->workerPort, + REBALANCE_PROGRESS_MOVING); + /* * In case of failure, we throw an error such that rebalance_table_shards * fails early. */ ExecuteRebalancerCommandInSeparateTransaction(placementUpdateCommand->data); + + UpdateColocatedShardPlacementProgress(shardId, + sourceNode->workerName, + sourceNode->workerPort, + REBALANCE_PROGRESS_MOVED); } @@ -2700,6 +2755,51 @@ WorkerNodeListContains(List *workerNodeList, const char *workerName, uint32 work } +/* + * UpdateColocatedShardPlacementProgress updates the progress of the given placement, + * along with its colocated placements, to the given state. + */ +static void +UpdateColocatedShardPlacementProgress(uint64 shardId, char *sourceName, int sourcePort, + uint64 progress) +{ + ProgressMonitorData *header = GetCurrentProgressMonitor(); + + if (header != NULL) + { + PlacementUpdateEventProgress *steps = ProgressMonitorSteps(header); + ListCell *colocatedShardIntervalCell = NULL; + + ShardInterval *shardInterval = LoadShardInterval(shardId); + List *colocatedShardIntervalList = ColocatedShardIntervalList(shardInterval); + + for (int moveIndex = 0; moveIndex < header->stepCount; moveIndex++) + { + PlacementUpdateEventProgress *step = steps + moveIndex; + uint64 currentShardId = step->shardId; + bool colocatedShard = false; + + foreach(colocatedShardIntervalCell, colocatedShardIntervalList) + { + ShardInterval *candidateShard = lfirst(colocatedShardIntervalCell); + if (candidateShard->shardId == currentShardId) + { + colocatedShard = true; + break; + } + } + + if (colocatedShard && + strcmp(step->sourceName, sourceName) == 0 && + step->sourcePort == sourcePort) + { + pg_atomic_write_u64(&step->progress, progress); + } + } + } +} + + /* * pg_dist_rebalance_strategy_enterprise_check is a now removed function, but * to avoid issues during upgrades a C stub is kept. diff --git a/src/backend/distributed/progress/multi_progress.c b/src/backend/distributed/progress/multi_progress.c index 9b9c2faa6..657f3356c 100644 --- a/src/backend/distributed/progress/multi_progress.c +++ b/src/backend/distributed/progress/multi_progress.c @@ -109,11 +109,17 @@ GetCurrentProgressMonitor(void) /* * FinalizeCurrentProgressMonitor releases the dynamic memory segment of the current * progress monitoring data structure and removes the process from - * pg_stat_get_progress_info() output. + * pg_stat_get_progress_info() output. If there's no such dynamic memory + * segment this is a no-op. */ void FinalizeCurrentProgressMonitor(void) { + if (currentProgressDSMHandle == DSM_HANDLE_INVALID) + { + return; + } + dsm_segment *dsmSegment = dsm_find_mapping(currentProgressDSMHandle); if (dsmSegment != NULL) diff --git a/src/include/distributed/shard_rebalancer.h b/src/include/distributed/shard_rebalancer.h index d21894ffc..36c38ffff 100644 --- a/src/include/distributed/shard_rebalancer.h +++ b/src/include/distributed/shard_rebalancer.h @@ -73,7 +73,9 @@ /* *INDENT-ON* */ #define REBALANCE_ACTIVITY_MAGIC_NUMBER 1337 +#define REBALANCE_PROGRESS_WAITING 0 #define REBALANCE_PROGRESS_MOVING 1 +#define REBALANCE_PROGRESS_MOVED 2 /* Enumeration that defines different placement update types */ typedef enum @@ -193,7 +195,9 @@ extern List * ReplicationPlacementUpdates(List *workerNodeList, List *shardPlace extern void ExecuteRebalancerCommandInSeparateTransaction(char *command); extern void AcquirePlacementColocationLock(Oid relationId, int lockMode, const char *operationName); -extern void SetupRebalanceMonitor(List *placementUpdateList, Oid relationId); +extern void SetupRebalanceMonitor(List *placementUpdateList, + Oid relationId, + uint64 initialProgressState); #endif /* SHARD_REBALANCER_H */ diff --git a/src/test/regress/expected/isolation_shard_rebalancer_progress.out b/src/test/regress/expected/isolation_shard_rebalancer_progress.out index 000542bca..731f72c14 100644 --- a/src/test/regress/expected/isolation_shard_rebalancer_progress.out +++ b/src/test/regress/expected/isolation_shard_rebalancer_progress.out @@ -35,7 +35,9 @@ table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname --------------------------------------------------------------------- colocated1|1500001| 49152|localhost | 57637| 49152|localhost | 57638| 0| 1 colocated2|1500005| 376832|localhost | 57637| 376832|localhost | 57638| 0| 1 -(2 rows) +colocated1|1500002| 196608|localhost | 57637| 196608|localhost | 57638| 0| 0 +colocated2|1500006| 8192|localhost | 57637| 8192|localhost | 57638| 0| 0 +(4 rows) step s2-unlock-1-start: ROLLBACK; @@ -112,9 +114,11 @@ step s7-get-progress: table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress --------------------------------------------------------------------- +colocated1|1500001| 73728|localhost | 57637| 0|localhost | 57638| 73728| 2 +colocated2|1500005| 401408|localhost | 57637| 0|localhost | 57638| 401408| 2 colocated1|1500002| 196608|localhost | 57637| 196608|localhost | 57638| 0| 1 colocated2|1500006| 8192|localhost | 57637| 8192|localhost | 57638| 0| 1 -(2 rows) +(4 rows) step s3-unlock-2-start: ROLLBACK; @@ -205,7 +209,9 @@ table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname --------------------------------------------------------------------- colocated1|1500001| 49152|localhost | 57637| 49152|localhost | 57638| 73728| 1 colocated2|1500005| 376832|localhost | 57637| 376832|localhost | 57638| 401408| 1 -(2 rows) +colocated1|1500002| 196608|localhost | 57637| 196608|localhost | 57638| 0| 0 +colocated2|1500006| 8192|localhost | 57637| 8192|localhost | 57638| 0| 0 +(4 rows) step s7-release-lock: COMMIT; @@ -288,7 +294,9 @@ table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname --------------------------------------------------------------------- colocated1|1500001| 49152|localhost | 57637| 49152|localhost | 57638| 8192| 1 colocated2|1500005| 376832|localhost | 57637| 376832|localhost | 57638| 8192| 1 -(2 rows) +colocated1|1500002| 196608|localhost | 57637| 196608|localhost | 57638| 0| 0 +colocated2|1500006| 8192|localhost | 57637| 8192|localhost | 57638| 0| 0 +(4 rows) step s6-release-advisory-lock: SELECT pg_advisory_unlock(44000, 55152);