diff --git a/src/backend/distributed/operations/shard_rebalancer.c b/src/backend/distributed/operations/shard_rebalancer.c index 0049d1f0e..e4a1aedf8 100644 --- a/src/backend/distributed/operations/shard_rebalancer.c +++ b/src/backend/distributed/operations/shard_rebalancer.c @@ -744,12 +744,12 @@ SetupRebalanceMonitor(List *placementUpdateList, Oid relationId) List *colocatedUpdateList = GetColocatedRebalanceSteps(placementUpdateList); ListCell *colocatedUpdateCell = NULL; - ProgressMonitorData *monitor = CreateProgressMonitor(REBALANCE_ACTIVITY_MAGIC_NUMBER, - list_length(colocatedUpdateList), - sizeof( - PlacementUpdateEventProgress), - relationId); - PlacementUpdateEventProgress *rebalanceSteps = monitor->steps; + dsm_handle dsmHandle; + ProgressMonitorData *monitor = CreateProgressMonitor( + list_length(colocatedUpdateList), + sizeof(PlacementUpdateEventProgress), + &dsmHandle); + PlacementUpdateEventProgress *rebalanceSteps = ProgressMonitorSteps(monitor); int32 eventIndex = 0; foreach(colocatedUpdateCell, colocatedUpdateList) @@ -767,6 +767,7 @@ SetupRebalanceMonitor(List *placementUpdateList, Oid relationId) eventIndex++; } + RegisterProgressMonitor(REBALANCE_ACTIVITY_MAGIC_NUMBER, relationId, dsmHandle); } @@ -1075,7 +1076,6 @@ get_rebalance_progress(PG_FUNCTION_ARGS) { CheckCitusVersion(ERROR); List *segmentList = NIL; - ListCell *rebalanceMonitorCell = NULL; TupleDesc tupdesc; Tuplestorestate *tupstore = SetupTuplestore(fcinfo, &tupdesc); @@ -1083,12 +1083,12 @@ get_rebalance_progress(PG_FUNCTION_ARGS) List *rebalanceMonitorList = ProgressMonitorList(REBALANCE_ACTIVITY_MAGIC_NUMBER, &segmentList); - - foreach(rebalanceMonitorCell, rebalanceMonitorList) + ProgressMonitorData *monitor = NULL; + foreach_ptr(monitor, rebalanceMonitorList) { - ProgressMonitorData *monitor = lfirst(rebalanceMonitorCell); - PlacementUpdateEventProgress *placementUpdateEvents = monitor->steps; - HTAB *shardStatistics = BuildWorkerShardStatisticsHash(monitor->steps, + 
PlacementUpdateEventProgress *placementUpdateEvents = ProgressMonitorSteps( + monitor); + HTAB *shardStatistics = BuildWorkerShardStatisticsHash(placementUpdateEvents, monitor->stepCount); HTAB *shardSizes = BuildShardSizesHash(monitor, shardStatistics); for (int eventIndex = 0; eventIndex < monitor->stepCount; eventIndex++) @@ -1158,10 +1158,12 @@ BuildShardSizesHash(ProgressMonitorData *monitor, HTAB *shardStatistics) HTAB *shardSizes = hash_create( "ShardSizeHash", 32, &info, HASH_ELEM | HASH_CONTEXT | HASH_BLOBS); - PlacementUpdateEventProgress *placementUpdateEvents = monitor->steps; + PlacementUpdateEventProgress *placementUpdateEvents = ProgressMonitorSteps(monitor); + for (int eventIndex = 0; eventIndex < monitor->stepCount; eventIndex++) { PlacementUpdateEventProgress *step = placementUpdateEvents + eventIndex; + uint64 shardId = step->shardId; uint64 shardSize = 0; uint64 backupShardSize = 0; @@ -2769,9 +2771,9 @@ UpdateColocatedShardPlacementProgress(uint64 shardId, char *sourceName, int sour { ProgressMonitorData *header = GetCurrentProgressMonitor(); - if (header != NULL && header->steps != NULL) + if (header != NULL) { - PlacementUpdateEventProgress *steps = header->steps; + PlacementUpdateEventProgress *steps = ProgressMonitorSteps(header); ListCell *colocatedShardIntervalCell = NULL; ShardInterval *shardInterval = LoadShardInterval(shardId); diff --git a/src/backend/distributed/progress/multi_progress.c b/src/backend/distributed/progress/multi_progress.c index 23a40f723..abe388eb2 100644 --- a/src/backend/distributed/progress/multi_progress.c +++ b/src/backend/distributed/progress/multi_progress.c @@ -27,18 +27,16 @@ static ProgressMonitorData * MonitorDataFromDSMHandle(dsm_handle dsmHandle, /* - * CreateProgressMonitor is used to create a place to store progress information related - * to long running processes. 
The function creates a dynamic shared memory segment - * consisting of a header regarding to the process and an array of "steps" that the long - * running "operations" consists of. The handle of the dynamic shared memory is stored in - * pg_stat_get_progress_info output, to be parsed by a progress retrieval command - * later on. This behavior may cause unrelated (but hopefully harmless) rows in - * pg_stat_progress_vacuum output. The caller of this function should provide a magic - * number, a unique 64 bit unsigned integer, to distinguish different types of commands. + * CreateProgressMonitor is used to create a place to store progress + * information related to long running processes. The function creates a + * dynamic shared memory segment consisting of a header regarding the + * process and an array of "steps" that the long running "operation" consists + * of. After initializing the data in the array of steps, the shared memory + * segment can be shared with other processes using RegisterProgressMonitor, by + * giving it the value that's written to the dsmHandle argument. 
*/ ProgressMonitorData * -CreateProgressMonitor(uint64 progressTypeMagicNumber, int stepCount, Size stepSize, - Oid relationId) +CreateProgressMonitor(int stepCount, Size stepSize, dsm_handle *dsmHandle) { if (stepSize <= 0 || stepCount <= 0) { @@ -58,20 +56,37 @@ CreateProgressMonitor(uint64 progressTypeMagicNumber, int stepCount, Size stepSi return NULL; } - dsm_handle dsmHandle = dsm_segment_handle(dsmSegment); + *dsmHandle = dsm_segment_handle(dsmSegment); - ProgressMonitorData *monitor = MonitorDataFromDSMHandle(dsmHandle, &dsmSegment); + ProgressMonitorData *monitor = MonitorDataFromDSMHandle(*dsmHandle, &dsmSegment); monitor->stepCount = stepCount; monitor->processId = MyProcPid; + return monitor; +} + +/* + * RegisterProgressMonitor shares dsmHandle with other postgres processes by + * storing it in pg_stat_get_progress_info output, to be parsed by a + * progress retrieval command later on. This behavior may cause unrelated (but + * hopefully harmless) rows in pg_stat_progress_vacuum output. The caller of + * this function should provide a magic number, a unique 64 bit unsigned + * integer, to distinguish different types of commands. + * + * IMPORTANT: After registering the progress monitor, all modifications to the + * data should be done using concurrency safe operations (i.e. 
locks and + * atomics) + */ +void +RegisterProgressMonitor(uint64 progressTypeMagicNumber, Oid relationId, + dsm_handle dsmHandle) +{ pgstat_progress_start_command(PROGRESS_COMMAND_VACUUM, relationId); pgstat_progress_update_param(1, dsmHandle); pgstat_progress_update_param(0, progressTypeMagicNumber); currentProgressDSMHandle = dsmHandle; - - return monitor; } @@ -204,24 +219,46 @@ ProgressMonitorData * MonitorDataFromDSMHandle(dsm_handle dsmHandle, dsm_segment **attachedSegment) { dsm_segment *dsmSegment = dsm_find_mapping(dsmHandle); - ProgressMonitorData *monitor = NULL; if (dsmSegment == NULL) { dsmSegment = dsm_attach(dsmHandle); } - if (dsmSegment != NULL) + if (dsmSegment == NULL) { - monitor = (ProgressMonitorData *) dsm_segment_address(dsmSegment); - monitor->steps = (void *) (monitor + 1); - *attachedSegment = dsmSegment; + return NULL; } + ProgressMonitorData *monitor = (ProgressMonitorData *) dsm_segment_address( + dsmSegment); + + *attachedSegment = dsmSegment; + return monitor; } +/* + * ProgressMonitorSteps returns a pointer to the array of steps that are stored + * in a progress monitor. This is simply the data right after the header, so + * this function is trivial. The main purpose of this function is to make the + * intent clear to readers of the code. + * + * NOTE: The pointer this function returns is explicitly not stored in the + * header, because the header is shared between processes. The absolute pointer + * to the steps can have a different value between processes though, because + * the same piece of shared memory often has a different address in different + * processes. So we calculate this pointer over and over to make sure we use + * the right value for each process. + */ +void * +ProgressMonitorSteps(ProgressMonitorData *monitor) +{ + return monitor + 1; +} + + /* * DetachFromDSMSegments ensures that the process is detached from all of the segments in * the given list. 
diff --git a/src/backend/distributed/test/progress_utils.c b/src/backend/distributed/test/progress_utils.c index 0d776c963..c50e2dd5e 100644 --- a/src/backend/distributed/test/progress_utils.c +++ b/src/backend/distributed/test/progress_utils.c @@ -36,12 +36,13 @@ create_progress(PG_FUNCTION_ARGS) { uint64 magicNumber = PG_GETARG_INT64(0); int stepCount = PG_GETARG_INT32(1); - ProgressMonitorData *monitor = CreateProgressMonitor(magicNumber, stepCount, - sizeof(uint64), 0); + dsm_handle dsmHandle; + ProgressMonitorData *monitor = CreateProgressMonitor(stepCount, + sizeof(uint64), &dsmHandle); if (monitor != NULL) { - uint64 *steps = (uint64 *) monitor->steps; + uint64 *steps = (uint64 *) ProgressMonitorSteps(monitor); int i = 0; for (; i < stepCount; i++) @@ -50,6 +51,8 @@ create_progress(PG_FUNCTION_ARGS) } + /* dsmHandle is only initialized when the monitor was created */ + RegisterProgressMonitor(magicNumber, 0, dsmHandle); } PG_RETURN_VOID(); } @@ -64,7 +67,7 @@ update_progress(PG_FUNCTION_ARGS) if (monitor != NULL && step < monitor->stepCount) { - uint64 *steps = (uint64 *) monitor->steps; + uint64 *steps = (uint64 *) ProgressMonitorSteps(monitor); steps[step] = newValue; } @@ -93,7 +96,7 @@ show_progress(PG_FUNCTION_ARGS) ProgressMonitorData *monitor = NULL; foreach_ptr(monitor, monitorList) { - uint64 *steps = monitor->steps; + uint64 *steps = ProgressMonitorSteps(monitor); for (int stepIndex = 0; stepIndex < monitor->stepCount; stepIndex++) { diff --git a/src/include/distributed/multi_progress.h b/src/include/distributed/multi_progress.h index abc3b2bb7..591116583 100644 --- a/src/include/distributed/multi_progress.h +++ b/src/include/distributed/multi_progress.h @@ -13,26 +13,31 @@ #define MULTI_PROGRESS_H +#include "postgres.h" + #include "fmgr.h" #include "nodes/pg_list.h" +#include "storage/dsm.h" typedef struct ProgressMonitorData { uint64 processId; int stepCount; - void *steps; } ProgressMonitorData; -extern ProgressMonitorData * CreateProgressMonitor(uint64 progressTypeMagicNumber, - int stepCount, Size stepSize, - 
Oid relationId); +extern ProgressMonitorData * CreateProgressMonitor(int stepCount, Size stepSize, + dsm_handle *dsmHandle); +extern void RegisterProgressMonitor(uint64 progressTypeMagicNumber, + Oid relationId, + dsm_handle dsmHandle); extern ProgressMonitorData * GetCurrentProgressMonitor(void); extern void FinalizeCurrentProgressMonitor(void); extern List * ProgressMonitorList(uint64 commandTypeMagicNumber, List **attachedDSMSegmentList); extern void DetachFromDSMSegments(List *dsmSegmentList); +extern void * ProgressMonitorSteps(ProgressMonitorData *monitor); #endif /* MULTI_PROGRESS_H */