mirror of https://github.com/citusdata/citus.git
Fix data race in get_rebalance_progress (#5008)
To be able to report progress of the rebalancer, the rebalancer updates the state of a shard move in a shared memory segment. To then fetch the progress, `get_rebalance_progress` can be called which reads this shared memory. Without this change it did so without using any synchronization primitives, allowing for data races. This fixes that by using atomic operations to update and read from the parts of the shared memory that can be changed after initialization.pull/5005/head^2
parent
8c3f85692d
commit
3271f1bd13
|
@ -759,6 +759,8 @@ SetupRebalanceMonitor(List *placementUpdateList, Oid relationId)
|
|||
event->shardId = colocatedUpdate->shardId;
|
||||
event->sourcePort = colocatedUpdate->sourceNode->workerPort;
|
||||
event->targetPort = colocatedUpdate->targetNode->workerPort;
|
||||
pg_atomic_init_u64(&event->progress, REBALANCE_PROGRESS_WAITING);
|
||||
|
||||
eventIndex++;
|
||||
}
|
||||
}
|
||||
|
@ -1115,7 +1117,7 @@ get_rebalance_progress(PG_FUNCTION_ARGS)
|
|||
values[5] = UInt32GetDatum(step->sourcePort);
|
||||
values[6] = PointerGetDatum(cstring_to_text(step->targetName));
|
||||
values[7] = UInt32GetDatum(step->targetPort);
|
||||
values[8] = UInt64GetDatum(step->progress);
|
||||
values[8] = UInt64GetDatum(pg_atomic_read_u64(&step->progress));
|
||||
values[9] = UInt64GetDatum(sourceSize);
|
||||
values[10] = UInt64GetDatum(targetSize);
|
||||
|
||||
|
@ -1156,15 +1158,15 @@ BuildShardSizesHash(ProgressMonitorData *monitor, HTAB *shardStatistics)
|
|||
uint64 shardId = step->shardId;
|
||||
uint64 shardSize = 0;
|
||||
uint64 backupShardSize = 0;
|
||||
uint64 progress = pg_atomic_read_u64(&step->progress);
|
||||
|
||||
uint64 sourceSize = WorkerShardSize(shardStatistics, step->sourceName,
|
||||
step->sourcePort, shardId);
|
||||
uint64 targetSize = WorkerShardSize(shardStatistics, step->targetName,
|
||||
step->targetPort, shardId);
|
||||
|
||||
|
||||
if (step->progress == REBALANCE_PROGRESS_WAITING ||
|
||||
step->progress == REBALANCE_PROGRESS_MOVING)
|
||||
if (progress == REBALANCE_PROGRESS_WAITING ||
|
||||
progress == REBALANCE_PROGRESS_MOVING)
|
||||
{
|
||||
/*
|
||||
* If we are not done with the move, the correct shard size is the
|
||||
|
@ -1173,7 +1175,7 @@ BuildShardSizesHash(ProgressMonitorData *monitor, HTAB *shardStatistics)
|
|||
shardSize = sourceSize;
|
||||
backupShardSize = targetSize;
|
||||
}
|
||||
else if (step->progress == REBALANCE_PROGRESS_MOVED)
|
||||
else if (progress == REBALANCE_PROGRESS_MOVED)
|
||||
{
|
||||
/*
|
||||
* If we are done with the move, the correct shard size is the size
|
||||
|
@ -1200,14 +1202,15 @@ BuildShardSizesHash(ProgressMonitorData *monitor, HTAB *shardStatistics)
|
|||
}
|
||||
|
||||
/*
|
||||
* There exist some race conditions where it's possible that the
|
||||
* the state of the steps we see in shared memory are a bit behind
|
||||
* what is actually going on. So it is possible that even though a
|
||||
* step is reported as still being in the MOVING state, the shard
|
||||
* move might have just finished completing. This in turn can mean
|
||||
* that the source size is 0 while the target size is not. We try
|
||||
* to handle such rare edge cases by falling back on the other
|
||||
* shard size if that one is not 0.
|
||||
* Because of the way we fetch shard sizes they are from a slightly
|
||||
* earlier moment than the progress state we just read from shared
|
||||
* memory. Usually this is no problem, but there exist some race
|
||||
* conditions where this matters. For example, for very quick moves
|
||||
* it is possible that even though a step is now reported as MOVED,
|
||||
* when we read the shard sizes the move had not even started yet.
|
||||
* This in turn can mean that the target size is 0 while the source
|
||||
* size is not. We try to handle such rare edge cases by falling
|
||||
* back on the other shard size if that one is not 0.
|
||||
*/
|
||||
shardSize = backupShardSize;
|
||||
}
|
||||
|
@ -1423,7 +1426,7 @@ GetMovedShardIdsByWorker(PlacementUpdateEventProgress *steps, int stepCount,
|
|||
AddToWorkerShardIdSet(shardsByWorker, step->sourceName, step->sourcePort,
|
||||
step->shardId);
|
||||
|
||||
if (step->progress == REBALANCE_PROGRESS_WAITING)
|
||||
if (pg_atomic_read_u64(&step->progress) == REBALANCE_PROGRESS_WAITING)
|
||||
{
|
||||
/*
|
||||
* shard move has not started so we don't need target stats for
|
||||
|
@ -2775,7 +2778,7 @@ UpdateColocatedShardPlacementProgress(uint64 shardId, char *sourceName, int sour
|
|||
strcmp(step->sourceName, sourceName) == 0 &&
|
||||
step->sourcePort == sourcePort)
|
||||
{
|
||||
step->progress = progress;
|
||||
pg_atomic_write_u64(&step->progress, progress);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -105,7 +105,7 @@ typedef struct PlacementUpdateEventProgress
|
|||
int sourcePort;
|
||||
char targetName[255];
|
||||
int targetPort;
|
||||
uint64 progress;
|
||||
pg_atomic_uint64 progress;
|
||||
} PlacementUpdateEventProgress;
|
||||
|
||||
typedef struct NodeFillState
|
||||
|
|
Loading…
Reference in New Issue