Avoid two race conditions in the rebalance progress monitor (#5050)

The first and main issue was that we were putting absolute pointers into
shared memory for the `steps` field of the `ProgressMonitorData`. This
pointer was being overwritten every time a process requested the monitor
steps, which is the only reason why this even worked in the first place.

To quote a part of a relevant stack overflow answer:

> First of all, putting absolute pointers in shared memory segments is
> terrible terible idea - those pointers would only be valid in the
> process that filled in their values. Shared memory segments are not
> guaranteed to attach at the same virtual address in every process.
> On the contrary - they attach where the system deems it possible when
> `shmaddr == NULL` is specified on call to `shmat()`

Source: https://stackoverflow.com/a/10781921/2570866

In this case a race condition occurred when a second process overwrote
the pointer in between the first process its write and read of the steps
field.

This issue is fixed by not storing the pointer in shared memory anymore.
Instead we now calculate it's position every time we need it.

The second race condition I have not been able to trigger, but I found
it while investigating this. This issue was that we published the handle
of the shared memory segment, before we initialized the data in the
steps. This means that during initialization of the data, a call to
`get_rebalance_progress()` could read partial data in an unsynchronized
manner.

(cherry picked from commit ca00b63272)
pull/5152/head
Jelte Fennema 2021-06-21 16:03:42 +02:00 committed by Hanefi Onaldi
parent 998b044fdc
commit 6986ac2f17
No known key found for this signature in database
GPG Key ID: F18CDB10BA0DFDC7
4 changed files with 89 additions and 43 deletions

View File

@ -744,12 +744,12 @@ SetupRebalanceMonitor(List *placementUpdateList, Oid relationId)
List *colocatedUpdateList = GetColocatedRebalanceSteps(placementUpdateList); List *colocatedUpdateList = GetColocatedRebalanceSteps(placementUpdateList);
ListCell *colocatedUpdateCell = NULL; ListCell *colocatedUpdateCell = NULL;
ProgressMonitorData *monitor = CreateProgressMonitor(REBALANCE_ACTIVITY_MAGIC_NUMBER, dsm_handle dsmHandle;
list_length(colocatedUpdateList), ProgressMonitorData *monitor = CreateProgressMonitor(
sizeof( list_length(colocatedUpdateList),
PlacementUpdateEventProgress), sizeof(PlacementUpdateEventProgress),
relationId); &dsmHandle);
PlacementUpdateEventProgress *rebalanceSteps = monitor->steps; PlacementUpdateEventProgress *rebalanceSteps = ProgressMonitorSteps(monitor);
int32 eventIndex = 0; int32 eventIndex = 0;
foreach(colocatedUpdateCell, colocatedUpdateList) foreach(colocatedUpdateCell, colocatedUpdateList)
@ -767,6 +767,7 @@ SetupRebalanceMonitor(List *placementUpdateList, Oid relationId)
eventIndex++; eventIndex++;
} }
RegisterProgressMonitor(REBALANCE_ACTIVITY_MAGIC_NUMBER, relationId, dsmHandle);
} }
@ -1075,7 +1076,6 @@ get_rebalance_progress(PG_FUNCTION_ARGS)
{ {
CheckCitusVersion(ERROR); CheckCitusVersion(ERROR);
List *segmentList = NIL; List *segmentList = NIL;
ListCell *rebalanceMonitorCell = NULL;
TupleDesc tupdesc; TupleDesc tupdesc;
Tuplestorestate *tupstore = SetupTuplestore(fcinfo, &tupdesc); Tuplestorestate *tupstore = SetupTuplestore(fcinfo, &tupdesc);
@ -1083,12 +1083,12 @@ get_rebalance_progress(PG_FUNCTION_ARGS)
List *rebalanceMonitorList = ProgressMonitorList(REBALANCE_ACTIVITY_MAGIC_NUMBER, List *rebalanceMonitorList = ProgressMonitorList(REBALANCE_ACTIVITY_MAGIC_NUMBER,
&segmentList); &segmentList);
ProgressMonitorData *monitor = NULL;
foreach(rebalanceMonitorCell, rebalanceMonitorList) foreach_ptr(monitor, rebalanceMonitorList)
{ {
ProgressMonitorData *monitor = lfirst(rebalanceMonitorCell); PlacementUpdateEventProgress *placementUpdateEvents = ProgressMonitorSteps(
PlacementUpdateEventProgress *placementUpdateEvents = monitor->steps; monitor);
HTAB *shardStatistics = BuildWorkerShardStatisticsHash(monitor->steps, HTAB *shardStatistics = BuildWorkerShardStatisticsHash(placementUpdateEvents,
monitor->stepCount); monitor->stepCount);
HTAB *shardSizes = BuildShardSizesHash(monitor, shardStatistics); HTAB *shardSizes = BuildShardSizesHash(monitor, shardStatistics);
for (int eventIndex = 0; eventIndex < monitor->stepCount; eventIndex++) for (int eventIndex = 0; eventIndex < monitor->stepCount; eventIndex++)
@ -1158,10 +1158,12 @@ BuildShardSizesHash(ProgressMonitorData *monitor, HTAB *shardStatistics)
HTAB *shardSizes = hash_create( HTAB *shardSizes = hash_create(
"ShardSizeHash", 32, &info, "ShardSizeHash", 32, &info,
HASH_ELEM | HASH_CONTEXT | HASH_BLOBS); HASH_ELEM | HASH_CONTEXT | HASH_BLOBS);
PlacementUpdateEventProgress *placementUpdateEvents = monitor->steps; PlacementUpdateEventProgress *placementUpdateEvents = ProgressMonitorSteps(monitor);
for (int eventIndex = 0; eventIndex < monitor->stepCount; eventIndex++) for (int eventIndex = 0; eventIndex < monitor->stepCount; eventIndex++)
{ {
PlacementUpdateEventProgress *step = placementUpdateEvents + eventIndex; PlacementUpdateEventProgress *step = placementUpdateEvents + eventIndex;
uint64 shardId = step->shardId; uint64 shardId = step->shardId;
uint64 shardSize = 0; uint64 shardSize = 0;
uint64 backupShardSize = 0; uint64 backupShardSize = 0;
@ -2769,9 +2771,9 @@ UpdateColocatedShardPlacementProgress(uint64 shardId, char *sourceName, int sour
{ {
ProgressMonitorData *header = GetCurrentProgressMonitor(); ProgressMonitorData *header = GetCurrentProgressMonitor();
if (header != NULL && header->steps != NULL) if (header != NULL)
{ {
PlacementUpdateEventProgress *steps = header->steps; PlacementUpdateEventProgress *steps = ProgressMonitorSteps(header);
ListCell *colocatedShardIntervalCell = NULL; ListCell *colocatedShardIntervalCell = NULL;
ShardInterval *shardInterval = LoadShardInterval(shardId); ShardInterval *shardInterval = LoadShardInterval(shardId);

View File

@ -27,18 +27,16 @@ static ProgressMonitorData * MonitorDataFromDSMHandle(dsm_handle dsmHandle,
/* /*
* CreateProgressMonitor is used to create a place to store progress information related * CreateProgressMonitor is used to create a place to store progress
* to long running processes. The function creates a dynamic shared memory segment * information related to long running processes. The function creates a
* consisting of a header regarding to the process and an array of "steps" that the long * dynamic shared memory segment consisting of a header regarding to the
* running "operations" consists of. The handle of the dynamic shared memory is stored in * process and an array of "steps" that the long running "operations" consists
* pg_stat_get_progress_info output, to be parsed by a progress retrieval command * of. After initializing the data in the array of steps, the shared memory
* later on. This behavior may cause unrelated (but hopefully harmless) rows in * segment can be shared with other processes using RegisterProgressMonitor, by
* pg_stat_progress_vacuum output. The caller of this function should provide a magic * giving it the value that's written to the dsmHandle argument.
* number, a unique 64 bit unsigned integer, to distinguish different types of commands.
*/ */
ProgressMonitorData * ProgressMonitorData *
CreateProgressMonitor(uint64 progressTypeMagicNumber, int stepCount, Size stepSize, CreateProgressMonitor(int stepCount, Size stepSize, dsm_handle *dsmHandle)
Oid relationId)
{ {
if (stepSize <= 0 || stepCount <= 0) if (stepSize <= 0 || stepCount <= 0)
{ {
@ -58,20 +56,37 @@ CreateProgressMonitor(uint64 progressTypeMagicNumber, int stepCount, Size stepSi
return NULL; return NULL;
} }
dsm_handle dsmHandle = dsm_segment_handle(dsmSegment); *dsmHandle = dsm_segment_handle(dsmSegment);
ProgressMonitorData *monitor = MonitorDataFromDSMHandle(dsmHandle, &dsmSegment); ProgressMonitorData *monitor = MonitorDataFromDSMHandle(*dsmHandle, &dsmSegment);
monitor->stepCount = stepCount; monitor->stepCount = stepCount;
monitor->processId = MyProcPid; monitor->processId = MyProcPid;
return monitor;
}
/*
* RegisterProgressMonitor shares dsmHandle with other postgres process by
* storing it in pg_stat_get_progress_info output, to be parsed by a
* progress retrieval command later on. This behavior may cause unrelated (but
* hopefully harmless) rows in pg_stat_progress_vacuum output. The caller of
* this function should provide a magic number, a unique 64 bit unsigned
* integer, to distinguish different types of commands.
*
* IMPORTANT: After registering the progress monitor, all modification to the
* data should be done using concurrency safe operations (i.e. locks and
* atomics)
*/
void
RegisterProgressMonitor(uint64 progressTypeMagicNumber, Oid relationId,
dsm_handle dsmHandle)
{
pgstat_progress_start_command(PROGRESS_COMMAND_VACUUM, relationId); pgstat_progress_start_command(PROGRESS_COMMAND_VACUUM, relationId);
pgstat_progress_update_param(1, dsmHandle); pgstat_progress_update_param(1, dsmHandle);
pgstat_progress_update_param(0, progressTypeMagicNumber); pgstat_progress_update_param(0, progressTypeMagicNumber);
currentProgressDSMHandle = dsmHandle; currentProgressDSMHandle = dsmHandle;
return monitor;
} }
@ -204,24 +219,46 @@ ProgressMonitorData *
MonitorDataFromDSMHandle(dsm_handle dsmHandle, dsm_segment **attachedSegment) MonitorDataFromDSMHandle(dsm_handle dsmHandle, dsm_segment **attachedSegment)
{ {
dsm_segment *dsmSegment = dsm_find_mapping(dsmHandle); dsm_segment *dsmSegment = dsm_find_mapping(dsmHandle);
ProgressMonitorData *monitor = NULL;
if (dsmSegment == NULL) if (dsmSegment == NULL)
{ {
dsmSegment = dsm_attach(dsmHandle); dsmSegment = dsm_attach(dsmHandle);
} }
if (dsmSegment != NULL) if (dsmSegment == NULL)
{ {
monitor = (ProgressMonitorData *) dsm_segment_address(dsmSegment); return NULL;
monitor->steps = (void *) (monitor + 1);
*attachedSegment = dsmSegment;
} }
ProgressMonitorData *monitor = (ProgressMonitorData *) dsm_segment_address(
dsmSegment);
*attachedSegment = dsmSegment;
return monitor; return monitor;
} }
/*
* ProgressMonitorSteps returns a pointer to the array of steps that are stored
* in a progress monitor. This is simply the data right after the header, so
* this function is trivial. The main purpose of this function is to make the
* intent clear to readers of the code.
*
* NOTE: The pointer this function returns is explicitly not stored in the
* header, because the header is shared between processes. The absolute pointer
* to the steps can have a different value between processes though, because
* the same piece of shared memory often has a different address in different
* processes. So we calculate this pointer over and over to make sure we use
* the right value for each process.
*/
void *
ProgressMonitorSteps(ProgressMonitorData *monitor)
{
return monitor + 1;
}
/* /*
* DetachFromDSMSegments ensures that the process is detached from all of the segments in * DetachFromDSMSegments ensures that the process is detached from all of the segments in
* the given list. * the given list.

View File

@ -36,12 +36,13 @@ create_progress(PG_FUNCTION_ARGS)
{ {
uint64 magicNumber = PG_GETARG_INT64(0); uint64 magicNumber = PG_GETARG_INT64(0);
int stepCount = PG_GETARG_INT32(1); int stepCount = PG_GETARG_INT32(1);
ProgressMonitorData *monitor = CreateProgressMonitor(magicNumber, stepCount, dsm_handle dsmHandle;
sizeof(uint64), 0); ProgressMonitorData *monitor = CreateProgressMonitor(stepCount,
sizeof(uint64), &dsmHandle);
if (monitor != NULL) if (monitor != NULL)
{ {
uint64 *steps = (uint64 *) monitor->steps; uint64 *steps = (uint64 *) ProgressMonitorSteps(monitor);
int i = 0; int i = 0;
for (; i < stepCount; i++) for (; i < stepCount; i++)
@ -50,6 +51,7 @@ create_progress(PG_FUNCTION_ARGS)
} }
} }
RegisterProgressMonitor(magicNumber, 0, dsmHandle);
PG_RETURN_VOID(); PG_RETURN_VOID();
} }
@ -64,7 +66,7 @@ update_progress(PG_FUNCTION_ARGS)
if (monitor != NULL && step < monitor->stepCount) if (monitor != NULL && step < monitor->stepCount)
{ {
uint64 *steps = (uint64 *) monitor->steps; uint64 *steps = (uint64 *) ProgressMonitorSteps(monitor);
steps[step] = newValue; steps[step] = newValue;
} }
@ -93,7 +95,7 @@ show_progress(PG_FUNCTION_ARGS)
ProgressMonitorData *monitor = NULL; ProgressMonitorData *monitor = NULL;
foreach_ptr(monitor, monitorList) foreach_ptr(monitor, monitorList)
{ {
uint64 *steps = monitor->steps; uint64 *steps = ProgressMonitorSteps(monitor);
for (int stepIndex = 0; stepIndex < monitor->stepCount; stepIndex++) for (int stepIndex = 0; stepIndex < monitor->stepCount; stepIndex++)
{ {

View File

@ -13,26 +13,31 @@
#define MULTI_PROGRESS_H #define MULTI_PROGRESS_H
#include "postgres.h"
#include "fmgr.h" #include "fmgr.h"
#include "nodes/pg_list.h" #include "nodes/pg_list.h"
#include "storage/dsm.h"
typedef struct ProgressMonitorData typedef struct ProgressMonitorData
{ {
uint64 processId; uint64 processId;
int stepCount; int stepCount;
void *steps;
} ProgressMonitorData; } ProgressMonitorData;
extern ProgressMonitorData * CreateProgressMonitor(uint64 progressTypeMagicNumber, extern ProgressMonitorData * CreateProgressMonitor(int stepCount, Size stepSize,
int stepCount, Size stepSize, dsm_handle *dsmHandle);
Oid relationId); extern void RegisterProgressMonitor(uint64 progressTypeMagicNumber,
Oid relationId,
dsm_handle dsmHandle);
extern ProgressMonitorData * GetCurrentProgressMonitor(void); extern ProgressMonitorData * GetCurrentProgressMonitor(void);
extern void FinalizeCurrentProgressMonitor(void); extern void FinalizeCurrentProgressMonitor(void);
extern List * ProgressMonitorList(uint64 commandTypeMagicNumber, extern List * ProgressMonitorList(uint64 commandTypeMagicNumber,
List **attachedDSMSegmentList); List **attachedDSMSegmentList);
extern void DetachFromDSMSegments(List *dsmSegmentList); extern void DetachFromDSMSegments(List *dsmSegmentList);
extern void * ProgressMonitorSteps(ProgressMonitorData *monitor);
#endif /* MULTI_PROGRESS_H */ #endif /* MULTI_PROGRESS_H */