mirror of https://github.com/citusdata/citus.git
Avoid two race conditions in the rebalance progress monitor (#5050)
The first and main issue was that we were putting absolute pointers into shared memory for the `steps` field of the `ProgressMonitorData`. This pointer was being overwritten every time a process requested the monitor steps, which is the only reason why this even worked in the first place. To quote a part of a relevant stack overflow answer: > First of all, putting absolute pointers in shared memory segments is > terrible terible idea - those pointers would only be valid in the > process that filled in their values. Shared memory segments are not > guaranteed to attach at the same virtual address in every process. > On the contrary - they attach where the system deems it possible when > `shmaddr == NULL` is specified on call to `shmat()` Source: https://stackoverflow.com/a/10781921/2570866 In this case a race condition occurred when a second process overwrote the pointer in between the first process its write and read of the steps field. This issue is fixed by not storing the pointer in shared memory anymore. Instead we now calculate it's position every time we need it. The second race condition I have not been able to trigger, but I found it while investigating this. This issue was that we published the handle of the shared memory segment, before we initialized the data in the steps. This means that during initialization of the data, a call to `get_rebalance_progress()` could read partial data in an unsynchronized manner.pull/5066/head
parent
206401b708
commit
ca00b63272
|
@ -744,12 +744,12 @@ SetupRebalanceMonitor(List *placementUpdateList, Oid relationId)
|
|||
List *colocatedUpdateList = GetColocatedRebalanceSteps(placementUpdateList);
|
||||
ListCell *colocatedUpdateCell = NULL;
|
||||
|
||||
ProgressMonitorData *monitor = CreateProgressMonitor(REBALANCE_ACTIVITY_MAGIC_NUMBER,
|
||||
list_length(colocatedUpdateList),
|
||||
sizeof(
|
||||
PlacementUpdateEventProgress),
|
||||
relationId);
|
||||
PlacementUpdateEventProgress *rebalanceSteps = monitor->steps;
|
||||
dsm_handle dsmHandle;
|
||||
ProgressMonitorData *monitor = CreateProgressMonitor(
|
||||
list_length(colocatedUpdateList),
|
||||
sizeof(PlacementUpdateEventProgress),
|
||||
&dsmHandle);
|
||||
PlacementUpdateEventProgress *rebalanceSteps = ProgressMonitorSteps(monitor);
|
||||
|
||||
int32 eventIndex = 0;
|
||||
foreach(colocatedUpdateCell, colocatedUpdateList)
|
||||
|
@ -767,6 +767,7 @@ SetupRebalanceMonitor(List *placementUpdateList, Oid relationId)
|
|||
|
||||
eventIndex++;
|
||||
}
|
||||
RegisterProgressMonitor(REBALANCE_ACTIVITY_MAGIC_NUMBER, relationId, dsmHandle);
|
||||
}
|
||||
|
||||
|
||||
|
@ -1075,7 +1076,6 @@ get_rebalance_progress(PG_FUNCTION_ARGS)
|
|||
{
|
||||
CheckCitusVersion(ERROR);
|
||||
List *segmentList = NIL;
|
||||
ListCell *rebalanceMonitorCell = NULL;
|
||||
TupleDesc tupdesc;
|
||||
Tuplestorestate *tupstore = SetupTuplestore(fcinfo, &tupdesc);
|
||||
|
||||
|
@ -1083,12 +1083,12 @@ get_rebalance_progress(PG_FUNCTION_ARGS)
|
|||
List *rebalanceMonitorList = ProgressMonitorList(REBALANCE_ACTIVITY_MAGIC_NUMBER,
|
||||
&segmentList);
|
||||
|
||||
|
||||
foreach(rebalanceMonitorCell, rebalanceMonitorList)
|
||||
ProgressMonitorData *monitor = NULL;
|
||||
foreach_ptr(monitor, rebalanceMonitorList)
|
||||
{
|
||||
ProgressMonitorData *monitor = lfirst(rebalanceMonitorCell);
|
||||
PlacementUpdateEventProgress *placementUpdateEvents = monitor->steps;
|
||||
HTAB *shardStatistics = BuildWorkerShardStatisticsHash(monitor->steps,
|
||||
PlacementUpdateEventProgress *placementUpdateEvents = ProgressMonitorSteps(
|
||||
monitor);
|
||||
HTAB *shardStatistics = BuildWorkerShardStatisticsHash(placementUpdateEvents,
|
||||
monitor->stepCount);
|
||||
HTAB *shardSizes = BuildShardSizesHash(monitor, shardStatistics);
|
||||
for (int eventIndex = 0; eventIndex < monitor->stepCount; eventIndex++)
|
||||
|
@ -1158,10 +1158,12 @@ BuildShardSizesHash(ProgressMonitorData *monitor, HTAB *shardStatistics)
|
|||
HTAB *shardSizes = hash_create(
|
||||
"ShardSizeHash", 32, &info,
|
||||
HASH_ELEM | HASH_CONTEXT | HASH_BLOBS);
|
||||
PlacementUpdateEventProgress *placementUpdateEvents = monitor->steps;
|
||||
PlacementUpdateEventProgress *placementUpdateEvents = ProgressMonitorSteps(monitor);
|
||||
|
||||
for (int eventIndex = 0; eventIndex < monitor->stepCount; eventIndex++)
|
||||
{
|
||||
PlacementUpdateEventProgress *step = placementUpdateEvents + eventIndex;
|
||||
|
||||
uint64 shardId = step->shardId;
|
||||
uint64 shardSize = 0;
|
||||
uint64 backupShardSize = 0;
|
||||
|
@ -2769,9 +2771,9 @@ UpdateColocatedShardPlacementProgress(uint64 shardId, char *sourceName, int sour
|
|||
{
|
||||
ProgressMonitorData *header = GetCurrentProgressMonitor();
|
||||
|
||||
if (header != NULL && header->steps != NULL)
|
||||
if (header != NULL)
|
||||
{
|
||||
PlacementUpdateEventProgress *steps = header->steps;
|
||||
PlacementUpdateEventProgress *steps = ProgressMonitorSteps(header);
|
||||
ListCell *colocatedShardIntervalCell = NULL;
|
||||
|
||||
ShardInterval *shardInterval = LoadShardInterval(shardId);
|
||||
|
|
|
@ -27,18 +27,16 @@ static ProgressMonitorData * MonitorDataFromDSMHandle(dsm_handle dsmHandle,
|
|||
|
||||
|
||||
/*
|
||||
* CreateProgressMonitor is used to create a place to store progress information related
|
||||
* to long running processes. The function creates a dynamic shared memory segment
|
||||
* consisting of a header regarding to the process and an array of "steps" that the long
|
||||
* running "operations" consists of. The handle of the dynamic shared memory is stored in
|
||||
* pg_stat_get_progress_info output, to be parsed by a progress retrieval command
|
||||
* later on. This behavior may cause unrelated (but hopefully harmless) rows in
|
||||
* pg_stat_progress_vacuum output. The caller of this function should provide a magic
|
||||
* number, a unique 64 bit unsigned integer, to distinguish different types of commands.
|
||||
* CreateProgressMonitor is used to create a place to store progress
|
||||
* information related to long running processes. The function creates a
|
||||
* dynamic shared memory segment consisting of a header regarding to the
|
||||
* process and an array of "steps" that the long running "operations" consists
|
||||
* of. After initializing the data in the array of steps, the shared memory
|
||||
* segment can be shared with other processes using RegisterProgressMonitor, by
|
||||
* giving it the value that's written to the dsmHandle argument.
|
||||
*/
|
||||
ProgressMonitorData *
|
||||
CreateProgressMonitor(uint64 progressTypeMagicNumber, int stepCount, Size stepSize,
|
||||
Oid relationId)
|
||||
CreateProgressMonitor(int stepCount, Size stepSize, dsm_handle *dsmHandle)
|
||||
{
|
||||
if (stepSize <= 0 || stepCount <= 0)
|
||||
{
|
||||
|
@ -58,20 +56,37 @@ CreateProgressMonitor(uint64 progressTypeMagicNumber, int stepCount, Size stepSi
|
|||
return NULL;
|
||||
}
|
||||
|
||||
dsm_handle dsmHandle = dsm_segment_handle(dsmSegment);
|
||||
*dsmHandle = dsm_segment_handle(dsmSegment);
|
||||
|
||||
ProgressMonitorData *monitor = MonitorDataFromDSMHandle(dsmHandle, &dsmSegment);
|
||||
ProgressMonitorData *monitor = MonitorDataFromDSMHandle(*dsmHandle, &dsmSegment);
|
||||
|
||||
monitor->stepCount = stepCount;
|
||||
monitor->processId = MyProcPid;
|
||||
return monitor;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* RegisterProgressMonitor shares dsmHandle with other postgres process by
|
||||
* storing it in pg_stat_get_progress_info output, to be parsed by a
|
||||
* progress retrieval command later on. This behavior may cause unrelated (but
|
||||
* hopefully harmless) rows in pg_stat_progress_vacuum output. The caller of
|
||||
* this function should provide a magic number, a unique 64 bit unsigned
|
||||
* integer, to distinguish different types of commands.
|
||||
*
|
||||
* IMPORTANT: After registering the progress monitor, all modification to the
|
||||
* data should be done using concurrency safe operations (i.e. locks and
|
||||
* atomics)
|
||||
*/
|
||||
void
|
||||
RegisterProgressMonitor(uint64 progressTypeMagicNumber, Oid relationId,
|
||||
dsm_handle dsmHandle)
|
||||
{
|
||||
pgstat_progress_start_command(PROGRESS_COMMAND_VACUUM, relationId);
|
||||
pgstat_progress_update_param(1, dsmHandle);
|
||||
pgstat_progress_update_param(0, progressTypeMagicNumber);
|
||||
|
||||
currentProgressDSMHandle = dsmHandle;
|
||||
|
||||
return monitor;
|
||||
}
|
||||
|
||||
|
||||
|
@ -204,24 +219,46 @@ ProgressMonitorData *
|
|||
MonitorDataFromDSMHandle(dsm_handle dsmHandle, dsm_segment **attachedSegment)
|
||||
{
|
||||
dsm_segment *dsmSegment = dsm_find_mapping(dsmHandle);
|
||||
ProgressMonitorData *monitor = NULL;
|
||||
|
||||
if (dsmSegment == NULL)
|
||||
{
|
||||
dsmSegment = dsm_attach(dsmHandle);
|
||||
}
|
||||
|
||||
if (dsmSegment != NULL)
|
||||
if (dsmSegment == NULL)
|
||||
{
|
||||
monitor = (ProgressMonitorData *) dsm_segment_address(dsmSegment);
|
||||
monitor->steps = (void *) (monitor + 1);
|
||||
*attachedSegment = dsmSegment;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
ProgressMonitorData *monitor = (ProgressMonitorData *) dsm_segment_address(
|
||||
dsmSegment);
|
||||
|
||||
*attachedSegment = dsmSegment;
|
||||
|
||||
return monitor;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* ProgressMonitorSteps returns a pointer to the array of steps that are stored
|
||||
* in a progress monitor. This is simply the data right after the header, so
|
||||
* this function is trivial. The main purpose of this function is to make the
|
||||
* intent clear to readers of the code.
|
||||
*
|
||||
* NOTE: The pointer this function returns is explicitly not stored in the
|
||||
* header, because the header is shared between processes. The absolute pointer
|
||||
* to the steps can have a different value between processes though, because
|
||||
* the same piece of shared memory often has a different address in different
|
||||
* processes. So we calculate this pointer over and over to make sure we use
|
||||
* the right value for each process.
|
||||
*/
|
||||
void *
|
||||
ProgressMonitorSteps(ProgressMonitorData *monitor)
|
||||
{
|
||||
return monitor + 1;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* DetachFromDSMSegments ensures that the process is detached from all of the segments in
|
||||
* the given list.
|
||||
|
|
|
@ -36,12 +36,13 @@ create_progress(PG_FUNCTION_ARGS)
|
|||
{
|
||||
uint64 magicNumber = PG_GETARG_INT64(0);
|
||||
int stepCount = PG_GETARG_INT32(1);
|
||||
ProgressMonitorData *monitor = CreateProgressMonitor(magicNumber, stepCount,
|
||||
sizeof(uint64), 0);
|
||||
dsm_handle dsmHandle;
|
||||
ProgressMonitorData *monitor = CreateProgressMonitor(stepCount,
|
||||
sizeof(uint64), &dsmHandle);
|
||||
|
||||
if (monitor != NULL)
|
||||
{
|
||||
uint64 *steps = (uint64 *) monitor->steps;
|
||||
uint64 *steps = (uint64 *) ProgressMonitorSteps(monitor);
|
||||
|
||||
int i = 0;
|
||||
for (; i < stepCount; i++)
|
||||
|
@ -50,6 +51,7 @@ create_progress(PG_FUNCTION_ARGS)
|
|||
}
|
||||
}
|
||||
|
||||
RegisterProgressMonitor(magicNumber, 0, dsmHandle);
|
||||
PG_RETURN_VOID();
|
||||
}
|
||||
|
||||
|
@ -64,7 +66,7 @@ update_progress(PG_FUNCTION_ARGS)
|
|||
|
||||
if (monitor != NULL && step < monitor->stepCount)
|
||||
{
|
||||
uint64 *steps = (uint64 *) monitor->steps;
|
||||
uint64 *steps = (uint64 *) ProgressMonitorSteps(monitor);
|
||||
steps[step] = newValue;
|
||||
}
|
||||
|
||||
|
@ -93,7 +95,7 @@ show_progress(PG_FUNCTION_ARGS)
|
|||
ProgressMonitorData *monitor = NULL;
|
||||
foreach_ptr(monitor, monitorList)
|
||||
{
|
||||
uint64 *steps = monitor->steps;
|
||||
uint64 *steps = ProgressMonitorSteps(monitor);
|
||||
|
||||
for (int stepIndex = 0; stepIndex < monitor->stepCount; stepIndex++)
|
||||
{
|
||||
|
|
|
@ -13,26 +13,31 @@
|
|||
#define MULTI_PROGRESS_H
|
||||
|
||||
|
||||
#include "postgres.h"
|
||||
|
||||
#include "fmgr.h"
|
||||
#include "nodes/pg_list.h"
|
||||
#include "storage/dsm.h"
|
||||
|
||||
|
||||
typedef struct ProgressMonitorData
|
||||
{
|
||||
uint64 processId;
|
||||
int stepCount;
|
||||
void *steps;
|
||||
} ProgressMonitorData;
|
||||
|
||||
|
||||
extern ProgressMonitorData * CreateProgressMonitor(uint64 progressTypeMagicNumber,
|
||||
int stepCount, Size stepSize,
|
||||
Oid relationId);
|
||||
extern ProgressMonitorData * CreateProgressMonitor(int stepCount, Size stepSize,
|
||||
dsm_handle *dsmHandle);
|
||||
extern void RegisterProgressMonitor(uint64 progressTypeMagicNumber,
|
||||
Oid relationId,
|
||||
dsm_handle dsmHandle);
|
||||
extern ProgressMonitorData * GetCurrentProgressMonitor(void);
|
||||
extern void FinalizeCurrentProgressMonitor(void);
|
||||
extern List * ProgressMonitorList(uint64 commandTypeMagicNumber,
|
||||
List **attachedDSMSegmentList);
|
||||
extern void DetachFromDSMSegments(List *dsmSegmentList);
|
||||
extern void * ProgressMonitorSteps(ProgressMonitorData *monitor);
|
||||
|
||||
|
||||
#endif /* MULTI_PROGRESS_H */
|
||||
|
|
Loading…
Reference in New Issue