diff --git a/src/backend/distributed/operations/shard_rebalancer.c b/src/backend/distributed/operations/shard_rebalancer.c index 4e3791de1..d07eee868 100644 --- a/src/backend/distributed/operations/shard_rebalancer.c +++ b/src/backend/distributed/operations/shard_rebalancer.c @@ -759,6 +759,8 @@ SetupRebalanceMonitor(List *placementUpdateList, Oid relationId) event->shardId = colocatedUpdate->shardId; event->sourcePort = colocatedUpdate->sourceNode->workerPort; event->targetPort = colocatedUpdate->targetNode->workerPort; + pg_atomic_init_u64(&event->progress, REBALANCE_PROGRESS_WAITING); + eventIndex++; } } @@ -1115,7 +1117,7 @@ get_rebalance_progress(PG_FUNCTION_ARGS) values[5] = UInt32GetDatum(step->sourcePort); values[6] = PointerGetDatum(cstring_to_text(step->targetName)); values[7] = UInt32GetDatum(step->targetPort); - values[8] = UInt64GetDatum(step->progress); + values[8] = UInt64GetDatum(pg_atomic_read_u64(&step->progress)); values[9] = UInt64GetDatum(sourceSize); values[10] = UInt64GetDatum(targetSize); @@ -1156,15 +1158,15 @@ BuildShardSizesHash(ProgressMonitorData *monitor, HTAB *shardStatistics) uint64 shardId = step->shardId; uint64 shardSize = 0; uint64 backupShardSize = 0; + uint64 progress = pg_atomic_read_u64(&step->progress); uint64 sourceSize = WorkerShardSize(shardStatistics, step->sourceName, step->sourcePort, shardId); uint64 targetSize = WorkerShardSize(shardStatistics, step->targetName, step->targetPort, shardId); - - if (step->progress == REBALANCE_PROGRESS_WAITING || - step->progress == REBALANCE_PROGRESS_MOVING) + if (progress == REBALANCE_PROGRESS_WAITING || + progress == REBALANCE_PROGRESS_MOVING) { /* * If we are not done with the move, the correct shard size is the @@ -1173,7 +1175,7 @@ BuildShardSizesHash(ProgressMonitorData *monitor, HTAB *shardStatistics) shardSize = sourceSize; backupShardSize = targetSize; } - else if (step->progress == REBALANCE_PROGRESS_MOVED) + else if (progress == REBALANCE_PROGRESS_MOVED) { /* * If we are done with the move, the correct shard size is the size @@ -1200,14 +1202,15 @@ BuildShardSizesHash(ProgressMonitorData *monitor, HTAB *shardStatistics) } /* - * There exist some race conditions where it's possible that the - * the state of the steps we see in shared memory are a bit behind - * what is actually going on. So it is possible that even though a - * step is reported as still being in the MOVING state, the shard - * move might have just finished completing. This in turn can mean - * that the source size is 0 while the target size is not. We try - * to handle such rare edge cases by falling back on the other - * shard size if that one is not 0. + * Because of the way we fetch shard sizes they are from a slightly + * earlier moment than the progress state we just read from shared + * memory. Usually this is no problem, but there exist some race + * conditions where this matters. For example, for very quick moves + * it is possible that even though a step is now reported as MOVED, + * when we read the shard sizes the move had not even started yet. + * This in turn can mean that the target size is 0 while the source + * size is not. We try to handle such rare edge cases by falling + * back on the other shard size if that one is not 0. */ shardSize = backupShardSize; } @@ -1423,7 +1426,7 @@ GetMovedShardIdsByWorker(PlacementUpdateEventProgress *steps, int stepCount, AddToWorkerShardIdSet(shardsByWorker, step->sourceName, step->sourcePort, step->shardId); - if (step->progress == REBALANCE_PROGRESS_WAITING) + if (pg_atomic_read_u64(&step->progress) == REBALANCE_PROGRESS_WAITING) { /* * shard move has not started so we don't need target stats for @@ -2775,7 +2778,7 @@ UpdateColocatedShardPlacementProgress(uint64 shardId, char *sourceName, int sour strcmp(step->sourceName, sourceName) == 0 && step->sourcePort == sourcePort) { - step->progress = progress; + pg_atomic_write_u64(&step->progress, progress); } } } diff --git a/src/include/distributed/shard_rebalancer.h b/src/include/distributed/shard_rebalancer.h index ea1ff6827..096af4a58 100644 --- a/src/include/distributed/shard_rebalancer.h +++ b/src/include/distributed/shard_rebalancer.h @@ -105,7 +105,7 @@ typedef struct PlacementUpdateEventProgress int sourcePort; char targetName[255]; int targetPort; - uint64 progress; + pg_atomic_uint64 progress; } PlacementUpdateEventProgress; typedef struct NodeFillState