diff --git a/src/backend/distributed/operations/repair_shards.c b/src/backend/distributed/operations/repair_shards.c index 193797384..4388b86fd 100644 --- a/src/backend/distributed/operations/repair_shards.c +++ b/src/backend/distributed/operations/repair_shards.c @@ -21,6 +21,7 @@ #include "catalog/pg_class.h" #include "catalog/pg_enum.h" #include "distributed/adaptive_executor.h" +#include "distributed/backend_data.h" #include "distributed/citus_ruleutils.h" #include "distributed/colocation_utils.h" #include "distributed/commands.h" @@ -397,15 +398,28 @@ citus_move_shard_placement(PG_FUNCTION_ARGS) targetNodeName, targetNodePort); - WorkerNode *sourceNode = FindWorkerNode(sourceNodeName, sourceNodePort); - WorkerNode *targetNode = FindWorkerNode(targetNodeName, targetNodePort); + /* + * We want to be able to track progress of shard moves using + * get_rebalancer_progress. If this move is initiated by the rebalancer, + * then the rebalancer call has already set up the shared memory that is + * used to do that. But if citus_move_shard_placement is called directly by + * the user (or through any other mechanism), then the shared memory is not + * set up yet. In that case we do it here. 
+ */ + if (!IsRebalancerInternalBackend()) + { + WorkerNode *sourceNode = FindWorkerNode(sourceNodeName, sourceNodePort); + WorkerNode *targetNode = FindWorkerNode(targetNodeName, targetNodePort); - PlacementUpdateEvent *placementUpdateEvent = palloc0(sizeof(PlacementUpdateEvent)); - placementUpdateEvent->updateType = PLACEMENT_UPDATE_MOVE; - placementUpdateEvent->shardId = shardId; - placementUpdateEvent->sourceNode = sourceNode; - placementUpdateEvent->targetNode = targetNode; - SetupRebalanceMonitor(list_make1(placementUpdateEvent), relationId); + PlacementUpdateEvent *placementUpdateEvent = palloc0( + sizeof(PlacementUpdateEvent)); + placementUpdateEvent->updateType = PLACEMENT_UPDATE_MOVE; + placementUpdateEvent->shardId = shardId; + placementUpdateEvent->sourceNode = sourceNode; + placementUpdateEvent->targetNode = targetNode; + SetupRebalanceMonitor(list_make1(placementUpdateEvent), relationId, + REBALANCE_PROGRESS_MOVING); + } /* * At this point of the shard moves, we don't need to block the writes to diff --git a/src/backend/distributed/operations/shard_rebalancer.c b/src/backend/distributed/operations/shard_rebalancer.c index ad7cccb0e..c6079eb83 100644 --- a/src/backend/distributed/operations/shard_rebalancer.c +++ b/src/backend/distributed/operations/shard_rebalancer.c @@ -201,6 +201,8 @@ static int PlacementsHashCompare(const void *lhsKey, const void *rhsKey, Size ke static uint32 PlacementsHashHashCode(const void *key, Size keySize); static bool WorkerNodeListContains(List *workerNodeList, const char *workerName, uint32 workerPort); +static void UpdateColocatedShardPlacementProgress(uint64 shardId, char *sourceName, + int sourcePort, uint64 progress); static bool IsPlacementOnWorkerNode(ShardPlacement *placement, WorkerNode *workerNode); static NodeFillState * FindFillStateForPlacement(RebalanceState *state, ShardPlacement *placement); @@ -258,6 +260,7 @@ PG_FUNCTION_INFO_V1(pg_dist_rebalance_strategy_enterprise_check); bool RunningUnderIsolationTest 
= false; int MaxRebalancerLoggedIgnoredMoves = 5; + #ifdef USE_ASSERT_CHECKING /* @@ -763,7 +766,9 @@ ExecutePlacementUpdates(List *placementUpdateList, Oid shardReplicationModeOid, * dsm handle so that it can be used for updating the progress and cleaning things up. */ void -SetupRebalanceMonitor(List *placementUpdateList, Oid relationId) +SetupRebalanceMonitor(List *placementUpdateList, + Oid relationId, + uint64 initialProgressState) { List *colocatedUpdateList = GetColocatedRebalanceSteps(placementUpdateList); ListCell *colocatedUpdateCell = NULL; @@ -787,7 +792,7 @@ SetupRebalanceMonitor(List *placementUpdateList, Oid relationId) event->shardId = colocatedUpdate->shardId; event->sourcePort = colocatedUpdate->sourceNode->workerPort; event->targetPort = colocatedUpdate->targetNode->workerPort; - pg_atomic_init_u64(&event->progress, REBALANCE_PROGRESS_MOVING); + pg_atomic_init_u64(&event->progress, initialProgressState); eventIndex++; } @@ -1186,34 +1191,63 @@ BuildShardSizesHash(ProgressMonitorData *monitor, HTAB *shardStatistics) PlacementUpdateEventProgress *step = placementUpdateEvents + eventIndex; uint64 shardId = step->shardId; + uint64 shardSize = 0; + uint64 backupShardSize = 0; + uint64 progress = pg_atomic_read_u64(&step->progress); - uint64 shardSize = WorkerShardSize(shardStatistics, step->sourceName, - step->sourcePort, shardId); + uint64 sourceSize = WorkerShardSize(shardStatistics, step->sourceName, + step->sourcePort, shardId); + uint64 targetSize = WorkerShardSize(shardStatistics, step->targetName, + step->targetPort, shardId); + + if (progress == REBALANCE_PROGRESS_WAITING || + progress == REBALANCE_PROGRESS_MOVING) + { + /* + * If we are not done with the move, the correct shard size is the + * size on the source. 
+ */ + shardSize = sourceSize; + backupShardSize = targetSize; + } + else if (progress == REBALANCE_PROGRESS_MOVED) + { + /* + * If we are done with the move, the correct shard size is the size + * on the target + */ + shardSize = targetSize; + backupShardSize = sourceSize; + } if (shardSize == 0) { - /* - * It's possible that we are reading the sizes after the move has - * already fininshed. This means that the shards on the source - * might have already been deleted. In that case we instead report - * the size on the target as the shard size, since that is now the - * only existing shard. - */ - shardSize = WorkerShardSize(shardStatistics, step->targetName, - step->targetPort, shardId); - if (shardSize == 0) + if (backupShardSize == 0) { /* * We don't have any useful shard size. This can happen when a * shard is moved multiple times and it is not present on * either of these nodes. Probably the shard is on a worker - * related to the next move. In the weird case that this shard + * related to another event. In the weird case that this shard * is on the nodes and actually is size 0, we will have no * entry in the hashmap. When fetching from it we always * default to 0 if no entry is found, so that's fine. */ continue; } + + /* + * Because of the way we fetch shard sizes they are from a slightly + * earlier moment than the progress state we just read from shared + * memory. Usually this is no problem, but there exist some race + * conditions where this matters. For example, for very quick moves + * it is possible that even though a step is now reported as MOVED, + * when we read the shard sizes the move had not even started yet. + * This in turn can mean that the target size is 0 while the source + * size is not. We try to handle such rare edge cases by falling + * back on the other shard size if that one is not 0. 
+ */ + shardSize = backupShardSize; } @@ -1427,6 +1461,15 @@ GetMovedShardIdsByWorker(PlacementUpdateEventProgress *steps, int stepCount, AddToWorkerShardIdSet(shardsByWorker, step->sourceName, step->sourcePort, step->shardId); + if (pg_atomic_read_u64(&step->progress) == REBALANCE_PROGRESS_WAITING) + { + /* + * shard move has not started so we don't need target stats for + * this shard + */ + continue; + } + AddToWorkerShardIdSet(shardsByWorker, step->targetName, step->targetPort, step->shardId); } @@ -1559,6 +1602,8 @@ RebalanceTableShards(RebalanceOptions *options, Oid shardReplicationModeOid) * This uses the first relationId from the list, it's only used for display * purposes so it does not really matter which to show */ + SetupRebalanceMonitor(placementUpdateList, linitial_oid(options->relationIdList), + REBALANCE_PROGRESS_WAITING); ExecutePlacementUpdates(placementUpdateList, shardReplicationModeOid, "Moving"); FinalizeCurrentProgressMonitor(); } @@ -1635,11 +1680,21 @@ UpdateShardPlacement(PlacementUpdateEvent *placementUpdateEvent, errmsg("only moving or copying shards is supported"))); } + UpdateColocatedShardPlacementProgress(shardId, + sourceNode->workerName, + sourceNode->workerPort, + REBALANCE_PROGRESS_MOVING); + /* * In case of failure, we throw an error such that rebalance_table_shards * fails early. */ ExecuteRebalancerCommandInSeparateTransaction(placementUpdateCommand->data); + + UpdateColocatedShardPlacementProgress(shardId, + sourceNode->workerName, + sourceNode->workerPort, + REBALANCE_PROGRESS_MOVED); } @@ -2700,6 +2755,51 @@ WorkerNodeListContains(List *workerNodeList, const char *workerName, uint32 work } +/* + * UpdateColocatedShardPlacementProgress updates the progress of the given placement, + * along with its colocated placements, to the given state. 
+ */
+static void
+UpdateColocatedShardPlacementProgress(uint64 shardId, char *sourceName, int sourcePort,
+									  uint64 progress)
+{
+	ProgressMonitorData *monitor = GetCurrentProgressMonitor();
+	if (monitor == NULL)
+	{
+		return;
+	}
+
+	PlacementUpdateEventProgress *stepList = ProgressMonitorSteps(monitor);
+	List *colocatedShardList = ColocatedShardIntervalList(LoadShardInterval(shardId));
+
+	for (int stepIndex = 0; stepIndex < monitor->stepCount; stepIndex++)
+	{
+		PlacementUpdateEventProgress *currentStep = &stepList[stepIndex];
+		uint64 stepShardId = currentStep->shardId;
+		ListCell *shardCell = NULL;
+		bool isColocatedStep = false;
+
+		foreach(shardCell, colocatedShardList)
+		{
+			ShardInterval *colocatedShard = lfirst(shardCell);
+			if (colocatedShard->shardId == stepShardId)
+			{
+				isColocatedStep = true;
+				break;
+			}
+		}
+
+		/* only colocated steps whose source matches the given node are updated */
+		if (isColocatedStep &&
+			strcmp(currentStep->sourceName, sourceName) == 0 &&
+			currentStep->sourcePort == sourcePort)
+		{
+			pg_atomic_write_u64(&currentStep->progress, progress);
+		}
+	}
+}
+
+
 /*
  * pg_dist_rebalance_strategy_enterprise_check is a now removed function, but
  * to avoid issues during upgrades a C stub is kept.
diff --git a/src/backend/distributed/progress/multi_progress.c b/src/backend/distributed/progress/multi_progress.c
index 9b9c2faa6..657f3356c 100644
--- a/src/backend/distributed/progress/multi_progress.c
+++ b/src/backend/distributed/progress/multi_progress.c
@@ -109,11 +109,17 @@ GetCurrentProgressMonitor(void)
 /*
  * FinalizeCurrentProgressMonitor releases the dynamic memory segment of the current
  * progress monitoring data structure and removes the process from
- * pg_stat_get_progress_info() output.
+ * pg_stat_get_progress_info() output. If there's no such dynamic memory
+ * segment this is a no-op.
*/ void FinalizeCurrentProgressMonitor(void) { + if (currentProgressDSMHandle == DSM_HANDLE_INVALID) + { + return; + } + dsm_segment *dsmSegment = dsm_find_mapping(currentProgressDSMHandle); if (dsmSegment != NULL) diff --git a/src/include/distributed/shard_rebalancer.h b/src/include/distributed/shard_rebalancer.h index d21894ffc..36c38ffff 100644 --- a/src/include/distributed/shard_rebalancer.h +++ b/src/include/distributed/shard_rebalancer.h @@ -73,7 +73,9 @@ /* *INDENT-ON* */ #define REBALANCE_ACTIVITY_MAGIC_NUMBER 1337 +#define REBALANCE_PROGRESS_WAITING 0 #define REBALANCE_PROGRESS_MOVING 1 +#define REBALANCE_PROGRESS_MOVED 2 /* Enumeration that defines different placement update types */ typedef enum @@ -193,7 +195,9 @@ extern List * ReplicationPlacementUpdates(List *workerNodeList, List *shardPlace extern void ExecuteRebalancerCommandInSeparateTransaction(char *command); extern void AcquirePlacementColocationLock(Oid relationId, int lockMode, const char *operationName); -extern void SetupRebalanceMonitor(List *placementUpdateList, Oid relationId); +extern void SetupRebalanceMonitor(List *placementUpdateList, + Oid relationId, + uint64 initialProgressState); #endif /* SHARD_REBALANCER_H */ diff --git a/src/test/regress/expected/isolation_shard_rebalancer_progress.out b/src/test/regress/expected/isolation_shard_rebalancer_progress.out index 000542bca..731f72c14 100644 --- a/src/test/regress/expected/isolation_shard_rebalancer_progress.out +++ b/src/test/regress/expected/isolation_shard_rebalancer_progress.out @@ -35,7 +35,9 @@ table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname --------------------------------------------------------------------- colocated1|1500001| 49152|localhost | 57637| 49152|localhost | 57638| 0| 1 colocated2|1500005| 376832|localhost | 57637| 376832|localhost | 57638| 0| 1 -(2 rows) +colocated1|1500002| 196608|localhost | 57637| 196608|localhost | 57638| 0| 0 +colocated2|1500006| 8192|localhost | 57637| 
8192|localhost | 57638| 0| 0 +(4 rows) step s2-unlock-1-start: ROLLBACK; @@ -112,9 +114,11 @@ step s7-get-progress: table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress --------------------------------------------------------------------- +colocated1|1500001| 73728|localhost | 57637| 0|localhost | 57638| 73728| 2 +colocated2|1500005| 401408|localhost | 57637| 0|localhost | 57638| 401408| 2 colocated1|1500002| 196608|localhost | 57637| 196608|localhost | 57638| 0| 1 colocated2|1500006| 8192|localhost | 57637| 8192|localhost | 57638| 0| 1 -(2 rows) +(4 rows) step s3-unlock-2-start: ROLLBACK; @@ -205,7 +209,9 @@ table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname --------------------------------------------------------------------- colocated1|1500001| 49152|localhost | 57637| 49152|localhost | 57638| 73728| 1 colocated2|1500005| 376832|localhost | 57637| 376832|localhost | 57638| 401408| 1 -(2 rows) +colocated1|1500002| 196608|localhost | 57637| 196608|localhost | 57638| 0| 0 +colocated2|1500006| 8192|localhost | 57637| 8192|localhost | 57638| 0| 0 +(4 rows) step s7-release-lock: COMMIT; @@ -288,7 +294,9 @@ table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname --------------------------------------------------------------------- colocated1|1500001| 49152|localhost | 57637| 49152|localhost | 57638| 8192| 1 colocated2|1500005| 376832|localhost | 57637| 376832|localhost | 57638| 8192| 1 -(2 rows) +colocated1|1500002| 196608|localhost | 57637| 196608|localhost | 57638| 0| 0 +colocated2|1500006| 8192|localhost | 57637| 8192|localhost | 57638| 0| 0 +(4 rows) step s6-release-advisory-lock: SELECT pg_advisory_unlock(44000, 55152);