From 3271f1bd1307fc32d913faf1748decea054f2af6 Mon Sep 17 00:00:00 2001 From: Jelte Fennema Date: Mon, 31 May 2021 15:27:32 +0200 Subject: [PATCH] Fix data race in get_rebalance_progress (#5008) To be able to report progress of the rebalancer, the rebalancer updates the state of a shard move in a shared memory segment. To then fetch the progress, `get_rebalance_progress` can be called which reads this shared memory. Without this change it did so without using any synchronization primitives, allowing for data races. This fixes that by using atomic operations to update and read from the parts of the shared memory that can be changed after initialization. --- .../distributed/operations/shard_rebalancer.c | 33 ++++++++++--------- src/include/distributed/shard_rebalancer.h | 2 +- 2 files changed, 19 insertions(+), 16 deletions(-) diff --git a/src/backend/distributed/operations/shard_rebalancer.c b/src/backend/distributed/operations/shard_rebalancer.c index 4e3791de1..d07eee868 100644 --- a/src/backend/distributed/operations/shard_rebalancer.c +++ b/src/backend/distributed/operations/shard_rebalancer.c @@ -759,6 +759,8 @@ SetupRebalanceMonitor(List *placementUpdateList, Oid relationId) event->shardId = colocatedUpdate->shardId; event->sourcePort = colocatedUpdate->sourceNode->workerPort; event->targetPort = colocatedUpdate->targetNode->workerPort; + pg_atomic_init_u64(&event->progress, REBALANCE_PROGRESS_WAITING); + eventIndex++; } } @@ -1115,7 +1117,7 @@ get_rebalance_progress(PG_FUNCTION_ARGS) values[5] = UInt32GetDatum(step->sourcePort); values[6] = PointerGetDatum(cstring_to_text(step->targetName)); values[7] = UInt32GetDatum(step->targetPort); - values[8] = UInt64GetDatum(step->progress); + values[8] = UInt64GetDatum(pg_atomic_read_u64(&step->progress)); values[9] = UInt64GetDatum(sourceSize); values[10] = UInt64GetDatum(targetSize); @@ -1156,15 +1158,15 @@ BuildShardSizesHash(ProgressMonitorData *monitor, HTAB *shardStatistics) uint64 shardId = step->shardId; uint64 shardSize = 0; uint64 backupShardSize = 0; + uint64 progress = pg_atomic_read_u64(&step->progress); uint64 sourceSize = WorkerShardSize(shardStatistics, step->sourceName, step->sourcePort, shardId); uint64 targetSize = WorkerShardSize(shardStatistics, step->targetName, step->targetPort, shardId); - - if (step->progress == REBALANCE_PROGRESS_WAITING || - step->progress == REBALANCE_PROGRESS_MOVING) + if (progress == REBALANCE_PROGRESS_WAITING || + progress == REBALANCE_PROGRESS_MOVING) { /* * If we are not done with the move, the correct shard size is the @@ -1173,7 +1175,7 @@ BuildShardSizesHash(ProgressMonitorData *monitor, HTAB *shardStatistics) shardSize = sourceSize; backupShardSize = targetSize; } - else if (step->progress == REBALANCE_PROGRESS_MOVED) + else if (progress == REBALANCE_PROGRESS_MOVED) { /* * If we are done with the move, the correct shard size is the size @@ -1200,14 +1202,15 @@ BuildShardSizesHash(ProgressMonitorData *monitor, HTAB *shardStatistics) } /* - * There exist some race conditions where it's possible that the - * the state of the steps we see in shared memory are a bit behind - * what is actually going on. So it is possible that even though a - * step is reported as still being in the MOVING state, the shard - * move might have just finished completing. This in turn can mean - * that the source size is 0 while the target size is not. We try - * to handle such rare edge cases by falling back on the other - * shard size if that one is not 0. + * Because of the way we fetch shard sizes they are from a slightly + * earlier moment than the progress state we just read from shared + * memory. Usually this is no problem, but there exist some race + * conditions where this matters. For example, for very quick moves + * it is possible that even though a step is now reported as MOVED, + * when we read the shard sizes the move had not even started yet. + * This in turn can mean that the target size is 0 while the source + * size is not. We try to handle such rare edge cases by falling + * back on the other shard size if that one is not 0. */ shardSize = backupShardSize; } @@ -1423,7 +1426,7 @@ GetMovedShardIdsByWorker(PlacementUpdateEventProgress *steps, int stepCount, AddToWorkerShardIdSet(shardsByWorker, step->sourceName, step->sourcePort, step->shardId); - if (step->progress == REBALANCE_PROGRESS_WAITING) + if (pg_atomic_read_u64(&step->progress) == REBALANCE_PROGRESS_WAITING) { /* * shard move has not started so we don't need target stats for @@ -2775,7 +2778,7 @@ UpdateColocatedShardPlacementProgress(uint64 shardId, char *sourceName, int sour strcmp(step->sourceName, sourceName) == 0 && step->sourcePort == sourcePort) { - step->progress = progress; + pg_atomic_write_u64(&step->progress, progress); } } } diff --git a/src/include/distributed/shard_rebalancer.h b/src/include/distributed/shard_rebalancer.h index ea1ff6827..096af4a58 100644 --- a/src/include/distributed/shard_rebalancer.h +++ b/src/include/distributed/shard_rebalancer.h @@ -105,7 +105,7 @@ typedef struct PlacementUpdateEventProgress int sourcePort; char targetName[255]; int targetPort; - uint64 progress; + pg_atomic_uint64 progress; } PlacementUpdateEventProgress; typedef struct NodeFillState