Add Progress Tracking Infrastructure

This change adds a general-purpose infrastructure to log and monitor
the progress of long-running processes. It uses the
`pg_stat_get_progress_info` infrastructure, introduced with PostgreSQL
9.6 and used for tracking `VACUUM` commands.

This patch only handles the creation of a memory space in dynamic shared
memory, putting its info in `pg_stat_get_progress_info`, fetching the
progress monitors on demand and finalizing the progress tracking.
pull/1508/head
Eren Başak 2017-07-21 16:27:49 +03:00
parent d33cb7d832
commit a12f1980de
7 changed files with 760 additions and 2 deletions

View File

@ -19,7 +19,7 @@ DATA = $(patsubst $(citus_abs_srcdir)/%.sql,%.sql,$(wildcard $(citus_abs_srcdir)
DATA_built = $(foreach v,$(EXTVERSIONS),$(EXTENSION)--$(v).sql)
# directories with source files
SUBDIRS = . commands connection executor master metadata planner relay test transaction utils worker
SUBDIRS = . commands connection executor master metadata planner progress relay test transaction utils worker
# That patsubst rule searches all directories listed in SUBDIRS for .c
# files, and adds the corresponding .o files to OBJS

View File

@ -0,0 +1,275 @@
/*-------------------------------------------------------------------------
*
* multi_progress.c
* Routines for tracking long-running jobs and seeing their progress.
*
* Copyright (c) 2017, Citus Data, Inc.
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "distributed/multi_logical_optimizer.h"
#include "distributed/multi_progress.h"
#include "nodes/execnodes.h"
#include "storage/dsm.h"
#include "utils/builtins.h"
/*
 * Dynamic shared memory handle of the progress monitor created by this process.
 * It holds DSM_HANDLE_INVALID until CreateProgressMonitor succeeds and is reset to
 * DSM_HANDLE_INVALID by FinalizeCurrentProgressMonitor.
 *
 * NOTE(review): declared as uint64 although the values stored in it are of type
 * dsm_handle -- confirm whether the wider type is intentional.
 */
static uint64 currentProgressDSMHandle = DSM_HANDLE_INVALID;
/* forward declarations of the helpers defined later in this file */
static ProgressMonitorData * MonitorDataFromDSMHandle(dsm_handle dsmHandle,
dsm_segment **attachedSegment);
static ReturnSetInfo * FunctionCallGetTupleStore1(PGFunction function, Oid functionId,
Datum argument);
/*
 * CreateProgressMonitor creates a place to store progress information for a long
 * running operation. It allocates a dynamic shared memory segment that holds a
 * ProgressMonitorData header followed by an array of stepCount entries, each
 * stepSize bytes long, and remembers the segment as the current progress monitor of
 * this process.
 *
 * The handle of the segment is published through PostgreSQL's command progress
 * reporting (as a VACUUM command), so that it can be found later by parsing the
 * pg_stat_get_progress_info output. This may cause unrelated (but hopefully
 * harmless) rows in pg_stat_progress_vacuum output.
 *
 * progressTypeMagicNumber is a caller-chosen unique 64 bit unsigned integer that
 * distinguishes different types of commands; relationId is reported as the target
 * relation of the command.
 *
 * The function raises an ERROR when stepCount or stepSize is not positive, or when
 * the total size of the monitor does not fit into a Size. It emits a WARNING and
 * returns NULL when the dynamic shared memory segment cannot be created.
 */
ProgressMonitorData *
CreateProgressMonitor(uint64 progressTypeMagicNumber, int stepCount, Size stepSize,
					  Oid relationId)
{
	dsm_segment *dsmSegment = NULL;
	dsm_handle dsmHandle = 0;
	ProgressMonitorData *monitor = NULL;
	Size stepArraySize = 0;
	Size monitorSize = 0;

	if (stepSize <= 0 || stepCount <= 0)
	{
		ereport(ERROR,
				(errmsg("number of steps and size of each step should be "
						"positive values")));
	}

	/*
	 * Unsigned arithmetic wraps around silently, which would make us allocate a
	 * segment that is smaller than what the callers are going to write into.
	 * Detect the wrap-around by reversing the operations.
	 */
	stepArraySize = stepSize * (Size) stepCount;
	monitorSize = sizeof(ProgressMonitorData) + stepArraySize;
	if (stepArraySize / stepSize != (Size) stepCount || monitorSize < stepArraySize)
	{
		ereport(ERROR,
				(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
				 errmsg("requested progress monitor size overflows size_t")));
	}

	dsmSegment = dsm_create(monitorSize, DSM_CREATE_NULL_IF_MAXSEGMENTS);
	if (dsmSegment == NULL)
	{
		ereport(WARNING,
				(errmsg("could not create a dynamic shared memory segment to "
						"keep track of progress of the current command")));
		return NULL;
	}

	dsmHandle = dsm_segment_handle(dsmSegment);

	/* the segment is already mapped here, so this only resolves the addresses */
	monitor = MonitorDataFromDSMHandle(dsmHandle, &dsmSegment);

	monitor->stepCount = stepCount;
	monitor->processId = MyProcPid;

	/*
	 * The handle goes into the second progress field and the magic number into the
	 * first one. The handle is written first, presumably so that a reader which
	 * matches on the magic number also finds the handle in place.
	 */
	pgstat_progress_start_command(PROGRESS_COMMAND_VACUUM, relationId);
	pgstat_progress_update_param(1, dsmHandle);
	pgstat_progress_update_param(0, progressTypeMagicNumber);

	currentProgressDSMHandle = dsmHandle;

	return monitor;
}
/*
 * GetCurrentProgressMonitor resolves the progress monitor (header and steps array)
 * that belongs to the progress currently tracked by this process. A monitor should
 * have been created with CreateProgressMonitor beforehand; the result is whatever
 * MonitorDataFromDSMHandle yields for the remembered handle, which is NULL when no
 * segment could be found or attached.
 */
ProgressMonitorData *
GetCurrentProgressMonitor(void)
{
	/* the segment itself is of no interest to the callers of this function */
	dsm_segment *unusedSegment = NULL;

	return MonitorDataFromDSMHandle(currentProgressDSMHandle, &unusedSegment);
}
/*
 * FinalizeCurrentProgressMonitor ends the progress tracking of the current command:
 * it detaches from the dynamic shared memory segment of the current progress monitor
 * if this process has it mapped, removes the process from the
 * pg_stat_get_progress_info() output and forgets the remembered handle.
 */
void
FinalizeCurrentProgressMonitor(void)
{
	dsm_segment *mappedSegment = NULL;

	mappedSegment = dsm_find_mapping(currentProgressDSMHandle);
	if (mappedSegment != NULL)
	{
		dsm_detach(mappedSegment);
	}

	currentProgressDSMHandle = DSM_HANDLE_INVALID;
	pgstat_progress_end_command();
}
/*
 * ProgressMonitorList returns the addresses of the monitors of ongoing commands that
 * are associated with the given identifier magic number. The function takes a pass
 * over the pg_stat_get_progress_info output, filters the rows according to the given
 * magic number, and attaches to the dynamic shared memory segments whose handles are
 * found in the matching rows.
 *
 * Every segment that this call newly attached to is appended to
 * *attachedDSMSegments. The caller is expected to detach from them with a call to
 * DetachFromDSMSegments once it is done with the returned monitors. Segments that
 * were already mapped into this process before the call (for instance the progress
 * monitor this process created for itself) are still returned in the result, but are
 * deliberately not appended to *attachedDSMSegments: detaching from them would tear
 * down a mapping that this call does not own.
 *
 * Rows whose magic number or segment handle is NULL, and rows whose segment can no
 * longer be attached to, are skipped.
 */
List *
ProgressMonitorList(uint64 commandTypeMagicNumber, List **attachedDSMSegments)
{
	/*
	 * The expected magic number should reside in the first progress field and the
	 * actual segment handle in the second, but the slot ordering is 1-indexed in the
	 * tuple table slot and there are 3 other fields before the progress fields in
	 * the pg_stat_get_progress_info output.
	 */
	const int magicNumberIndex = 0 + 1 + 3;
	const int dsmHandleIndex = 1 + 1 + 3;

	/*
	 * Currently, Postgres' progress logging mechanism supports only the VACUUM
	 * operations. Therefore, we identify ourselves as a VACUUM command but only fill
	 * a couple of the available fields. As a result, the commands that use Citus'
	 * progress monitoring API will appear in pg_stat_progress_vacuum output.
	 */
	text *commandTypeText = cstring_to_text("VACUUM");
	Datum commandTypeDatum = PointerGetDatum(commandTypeText);
	Oid getProgressInfoFunctionOid = InvalidOid;
	TupleTableSlot *tupleTableSlot = NULL;
	ReturnSetInfo *progressResultSet = NULL;
	List *monitorList = NIL;

	getProgressInfoFunctionOid = FunctionOid("pg_catalog",
											 "pg_stat_get_progress_info",
											 1);

	progressResultSet = FunctionCallGetTupleStore1(pg_stat_get_progress_info,
												   getProgressInfoFunctionOid,
												   commandTypeDatum);

	tupleTableSlot = MakeSingleTupleTableSlot(progressResultSet->setDesc);

	/* iterate over the tuples in the tuple store and collect the matching monitors */
	while (tuplestore_gettupleslot(progressResultSet->setResult, true, false,
								   tupleTableSlot))
	{
		bool magicNumberIsNull = false;
		Datum magicNumberDatum = slot_getattr(tupleTableSlot, magicNumberIndex,
											  &magicNumberIsNull);

		if (!magicNumberIsNull &&
			DatumGetUInt64(magicNumberDatum) == commandTypeMagicNumber)
		{
			bool dsmHandleIsNull = false;
			Datum dsmHandleDatum = slot_getattr(tupleTableSlot, dsmHandleIndex,
												&dsmHandleIsNull);

			if (!dsmHandleIsNull)
			{
				dsm_handle dsmHandle = (dsm_handle) DatumGetUInt64(dsmHandleDatum);

				/* remember whether the mapping predates this call, see above */
				bool alreadyMapped = (dsm_find_mapping(dsmHandle) != NULL);
				dsm_segment *attachedSegment = NULL;
				ProgressMonitorData *monitor =
					MonitorDataFromDSMHandle(dsmHandle, &attachedSegment);

				if (monitor != NULL)
				{
					if (!alreadyMapped)
					{
						*attachedDSMSegments = lappend(*attachedDSMSegments,
													   attachedSegment);
					}

					monitorList = lappend(monitorList, monitor);
				}
			}
		}

		ExecClearTuple(tupleTableSlot);
	}

	ExecDropSingleTupleTableSlot(tupleTableSlot);

	return monitorList;
}
/*
 * MonitorDataFromDSMHandle returns the progress monitoring data structure that lives
 * in the dynamic shared memory segment with the given handle. An existing mapping of
 * the segment is reused when there is one; otherwise the function attaches to the
 * segment. On success the segment is handed back through attachedSegment. When the
 * segment cannot be attached to, the function returns NULL and leaves
 * *attachedSegment untouched.
 *
 * The function has internal linkage through its forward declaration at the top of
 * the file.
 *
 * NOTE(review): the steps pointer is written into the shared segment itself and it
 * is recomputed from this process' mapping address on every call. If processes map
 * the segment at different addresses, another process attaching concurrently could
 * overwrite the value -- confirm.
 */
ProgressMonitorData *
MonitorDataFromDSMHandle(dsm_handle dsmHandle, dsm_segment **attachedSegment)
{
	ProgressMonitorData *monitorData = NULL;
	dsm_segment *segment = dsm_find_mapping(dsmHandle);

	if (segment == NULL)
	{
		/* not mapped into this process yet, try to attach */
		segment = dsm_attach(dsmHandle);
		if (segment == NULL)
		{
			return NULL;
		}
	}

	/* the steps array starts right after the header */
	monitorData = (ProgressMonitorData *) dsm_segment_address(segment);
	monitorData->steps = (void *) (monitorData + 1);

	*attachedSegment = segment;

	return monitorData;
}
/*
 * DetachFromDSMSegments walks over the given list of dynamic shared memory segments
 * and detaches the process from each one of them.
 */
void
DetachFromDSMSegments(List *dsmSegmentList)
{
	ListCell *segmentCell = NULL;

	foreach(segmentCell, dsmSegmentList)
	{
		dsm_detach((dsm_segment *) lfirst(segmentCell));
	}
}
/*
 * FunctionCallGetTupleStore1 invokes the given set-returning PGFunction with a single
 * non-NULL argument and returns the ReturnSetInfo that the called function filled.
 * The function is only allowed to return its result in materialize mode, and it is
 * given the per-tuple expression context of a freshly created executor state.
 */
static ReturnSetInfo *
FunctionCallGetTupleStore1(PGFunction function, Oid functionId, Datum argument)
{
	FunctionCallInfoData callInfo;
	FmgrInfo functionInfo;
	EState *executorState = CreateExecutorState();
	ReturnSetInfo *resultSetInfo = makeNode(ReturnSetInfo);

	resultSetInfo->allowedModes = SFRM_Materialize;
	resultSetInfo->econtext = GetPerTupleExprContext(executorState);

	fmgr_info(functionId, &functionInfo);
	InitFunctionCallInfoData(callInfo, &functionInfo, 1, InvalidOid, NULL,
							 (Node *) resultSetInfo);

	callInfo.arg[0] = argument;
	callInfo.argnull[0] = false;

	(*function)(&callInfo);

	return resultSetInfo;
}

View File

@ -0,0 +1,151 @@
/*-------------------------------------------------------------------------
*
* progress_utils.c
*
* This file contains functions to exercise progress monitoring functionality
* within Citus.
*
* Copyright (c) 2017, Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "miscadmin.h"
#include "fmgr.h"
#include "funcapi.h"
#include <unistd.h>
#include "distributed/multi_progress.h"
#include "nodes/execnodes.h"
#include "utils/tuplestore.h"
PG_FUNCTION_INFO_V1(create_progress);
PG_FUNCTION_INFO_V1(update_progress);
PG_FUNCTION_INFO_V1(finish_progress);
PG_FUNCTION_INFO_V1(show_progress);
Datum
create_progress(PG_FUNCTION_ARGS)
{
uint64 magicNumber = PG_GETARG_INT64(0);
int stepCount = PG_GETARG_INT32(1);
ProgressMonitorData *monitor = CreateProgressMonitor(magicNumber, stepCount,
sizeof(uint64), 0);
if (monitor != NULL)
{
uint64 *steps = (uint64 *) monitor->steps;
int i = 0;
for (; i < stepCount; i++)
{
steps[i] = 0;
}
}
PG_RETURN_VOID();
}
/*
 * update_progress is a test UDF that stores the value given in the second argument
 * into the step whose index is given in the first argument, within the progress
 * monitor of the current process. The call does nothing when there is no current
 * monitor or when the index is not below the step count of the monitor.
 */
Datum
update_progress(PG_FUNCTION_ARGS)
{
	uint64 stepIndex = PG_GETARG_INT64(0);
	uint64 updatedValue = PG_GETARG_INT64(1);
	ProgressMonitorData *currentMonitor = GetCurrentProgressMonitor();
	uint64 *stepValues = NULL;

	if (currentMonitor == NULL || stepIndex >= currentMonitor->stepCount)
	{
		PG_RETURN_VOID();
	}

	stepValues = (uint64 *) currentMonitor->steps;
	stepValues[stepIndex] = updatedValue;

	PG_RETURN_VOID();
}
/*
 * finish_progress is a test UDF that finalizes the progress monitor of the current
 * process by calling FinalizeCurrentProgressMonitor. It takes no arguments and
 * returns void.
 */
Datum
finish_progress(PG_FUNCTION_ARGS)
{
FinalizeCurrentProgressMonitor();
PG_RETURN_VOID();
}
/*
 * show_progress is a test UDF that returns one row per step of every ongoing
 * progress monitor that carries the magic number given in the first argument. Each
 * row consists of the index of the step within its monitor and the value stored in
 * the step, which is read as a uint64. The rows are returned through a tuplestore in
 * materialize mode, and the segments that were attached to while looking up the
 * monitors are detached from before returning.
 */
Datum
show_progress(PG_FUNCTION_ARGS)
{
	uint64 magicNumber = PG_GETARG_INT64(0);
	List *attachedDSMSegments = NIL;
	List *monitorList = ProgressMonitorList(magicNumber, &attachedDSMSegments);
	ReturnSetInfo *resultSet = (ReturnSetInfo *) fcinfo->resultinfo;
	Tuplestorestate *tupstore = NULL;
	TupleDesc tupdesc;
	MemoryContext callerContext;
	ListCell *monitorCell = NULL;

	/* check to see if caller supports us returning a tuplestore */
	if (resultSet == NULL || !IsA(resultSet, ReturnSetInfo))
	{
		ereport(ERROR,
				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
				 errmsg("set-valued function called in context that cannot "
						"accept a set")));
	}

	if ((resultSet->allowedModes & SFRM_Materialize) == 0)
	{
		ereport(ERROR,
				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
				 errmsg("materialize mode required, but it is not "
						"allowed in this context")));
	}

	if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
	{
		elog(ERROR, "return type must be a row type");
	}

	/* the tuplestore has to be created in the per-query memory context */
	callerContext = MemoryContextSwitchTo(resultSet->econtext->ecxt_per_query_memory);

	tupstore = tuplestore_begin_heap(true, false, work_mem);
	resultSet->returnMode = SFRM_Materialize;
	resultSet->setResult = tupstore;
	resultSet->setDesc = tupdesc;

	MemoryContextSwitchTo(callerContext);

	foreach(monitorCell, monitorList)
	{
		ProgressMonitorData *monitor = lfirst(monitorCell);
		uint64 *stepValues = monitor->steps;
		int stepIndex = 0;

		while (stepIndex < monitor->stepCount)
		{
			Datum values[2];
			bool nulls[2];

			nulls[0] = false;
			nulls[1] = false;
			values[0] = Int32GetDatum(stepIndex);
			values[1] = UInt64GetDatum(stepValues[stepIndex]);

			tuplestore_putvalues(tupstore, tupdesc, values, nulls);

			stepIndex++;
		}
	}

	tuplestore_donestoring(tupstore);

	DetachFromDSMSegments(attachedDSMSegments);

	return (Datum) 0;
}

View File

@ -0,0 +1,45 @@
/*-------------------------------------------------------------------------
*
* multi_progress.h
* Declarations for public functions and variables used in progress
* tracking functions in Citus.
*
* Copyright (c) 2017, Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#ifndef MULTI_PROGRESS_H
#define MULTI_PROGRESS_H
#include "fmgr.h"
#include "nodes/pg_list.h"
#if (PG_VERSION_NUM < 100000)
/* define symbols that are undefined in PostgreSQL <= 9.6 */
#define DSM_HANDLE_INVALID 0
extern Datum pg_stat_get_progress_info(PG_FUNCTION_ARGS);
#endif
/*
 * ProgressMonitorData is the header that is placed at the start of the dynamic
 * shared memory segment of a progress monitor. The step entries of the monitor are
 * stored in the same segment, right after this header.
 */
typedef struct ProgressMonitorData
{
/* process id of the process that created the monitor (taken from MyProcPid) */
uint64 processId;
/* number of step entries that follow the header */
int stepCount;
/*
 * Address of the first step entry. It is recomputed as the address right after the
 * header every time the monitor is looked up through its segment handle.
 */
void *steps;
} ProgressMonitorData;
/*
 * Creates the progress monitor of the current command in a dynamic shared memory
 * segment and publishes it via the command progress reporting; returns NULL if the
 * segment cannot be created.
 */
extern ProgressMonitorData * CreateProgressMonitor(uint64 progressTypeMagicNumber,
int stepCount, Size stepSize,
Oid relationId);
/* Returns the progress monitor that was created by the current process. */
extern ProgressMonitorData * GetCurrentProgressMonitor(void);
/* Detaches from the current progress monitor and ends the progress reporting. */
extern void FinalizeCurrentProgressMonitor(void);
/*
 * Returns the monitors of the ongoing commands that carry the given magic number and
 * appends the segments that were attached to along the way to the given list.
 */
extern List * ProgressMonitorList(uint64 commandTypeMagicNumber,
List **attachedDSMSegmentList);
/* Detaches from every dynamic shared memory segment in the given list. */
extern void DetachFromDSMSegments(List *dsmSegmentList);
#endif /* MULTI_PROGRESS_H */

View File

@ -0,0 +1,154 @@
Parsed test spec with 5 sessions
starting permutation: take-locks s1-start-operation s2-start-operation s3-start-operation show-progress release-locks-1 show-progress release-locks-2 show-progress release-locks-3
step take-locks:
-- Locks for steps of sample operation in s1
SELECT pg_advisory_lock(10);
SELECT pg_advisory_lock(11);
SELECT pg_advisory_lock(12);
-- Locks for steps of sample operation in s2
SELECT pg_advisory_lock(20);
SELECT pg_advisory_lock(21);
SELECT pg_advisory_lock(22);
-- Locks for steps of sample operation in s3
SELECT pg_advisory_lock(30);
SELECT pg_advisory_lock(31);
SELECT pg_advisory_lock(32);
pg_advisory_lock
pg_advisory_lock
pg_advisory_lock
pg_advisory_lock
pg_advisory_lock
pg_advisory_lock
pg_advisory_lock
pg_advisory_lock
pg_advisory_lock
step s1-start-operation:
SELECT sample_operation(1337, 10, -1);
<waiting ...>
step s2-start-operation:
SELECT sample_operation(1337, 20, 2);
<waiting ...>
step s3-start-operation:
SELECT sample_operation(3778, 30, 9);
<waiting ...>
step show-progress:
SELECT show_progress(1337);
SELECT show_progress(3778);
show_progress
(0,0)
(1,0)
(0,0)
(1,0)
show_progress
(0,0)
(1,0)
step release-locks-1:
-- Release the locks of first steps of sample operations
SELECT pg_advisory_unlock(10);
SELECT pg_advisory_unlock(20);
SELECT pg_advisory_unlock(30);
pg_advisory_unlock
t
pg_advisory_unlock
t
pg_advisory_unlock
t
step show-progress:
SELECT show_progress(1337);
SELECT show_progress(3778);
show_progress
(0,-1)
(1,0)
(0,2)
(1,0)
show_progress
(0,9)
(1,0)
step release-locks-2:
-- Release the locks of second steps of sample operations
SELECT pg_advisory_unlock(11);
SELECT pg_advisory_unlock(21);
SELECT pg_advisory_unlock(31);
pg_advisory_unlock
t
pg_advisory_unlock
t
pg_advisory_unlock
t
step show-progress:
SELECT show_progress(1337);
SELECT show_progress(3778);
show_progress
(0,-1)
(1,-1)
(0,2)
(1,2)
show_progress
(0,9)
(1,9)
step release-locks-3:
-- Release the locks of final steps of sample operations
SELECT pg_advisory_unlock(12);
SELECT pg_advisory_unlock(22);
SELECT pg_advisory_unlock(32);
pg_advisory_unlock
t
pg_advisory_unlock
t
pg_advisory_unlock
t
step s1-start-operation: <... completed>
sample_operation
step s2-start-operation: <... completed>
sample_operation
step s3-start-operation: <... completed>
sample_operation

View File

@ -9,5 +9,5 @@ test: isolation_dml_vs_repair isolation_copy_placement_vs_copy_placement isolati
test: isolation_concurrent_dml isolation_data_migration
test: isolation_drop_shards isolation_copy_placement_vs_modification
test: isolation_insert_vs_vacuum isolation_transaction_recovery
test: isolation_distributed_transaction_id
test: isolation_distributed_transaction_id isolation_progress_monitoring
test: isolation_dump_local_wait_edges isolation_dump_global_wait_edges

View File

@ -0,0 +1,133 @@
# Isolation tests for checking the progress monitoring infrastructure
# We create three different processes, two of the type "1337" and one of type "3778"
# We utilize advisory locks to control steps of the processes
# Different locks are held for each step so that the processes stop at each step and
# we can see their progress.
setup
{
CREATE FUNCTION create_progress(bigint, bigint)
RETURNS void
AS 'citus'
LANGUAGE C STRICT;
CREATE FUNCTION update_progress(bigint, bigint)
RETURNS void
AS 'citus'
LANGUAGE C STRICT;
CREATE FUNCTION finish_progress()
RETURNS void
AS 'citus'
LANGUAGE C STRICT;
CREATE OR REPLACE FUNCTION show_progress(bigint)
RETURNS TABLE(step int, progress bigint)
AS 'citus'
LANGUAGE C STRICT;
CREATE FUNCTION sample_operation(command_type bigint, lockid bigint, progress bigint)
RETURNS VOID AS $$
BEGIN
PERFORM create_progress(command_type, 2);
PERFORM pg_advisory_xact_lock(lockid);
PERFORM update_progress(0, progress);
PERFORM pg_advisory_xact_lock(lockid + 1);
PERFORM update_progress(1, progress);
PERFORM pg_advisory_xact_lock(lockid + 2);
PERFORM finish_progress();
END;
$$ LANGUAGE 'plpgsql';
}
teardown
{
DROP FUNCTION IF EXISTS create_progress(bigint, bigint);
DROP FUNCTION IF EXISTS update_progress(bigint, bigint);
DROP FUNCTION IF EXISTS finish_progress();
DROP FUNCTION IF EXISTS show_progress(bigint);
DROP FUNCTION IF EXISTS sample_operation(bigint, bigint, bigint);
}
session "s1"
step "s1-start-operation"
{
SELECT sample_operation(1337, 10, -1);
}
session "s2"
step "s2-start-operation"
{
SELECT sample_operation(1337, 20, 2);
}
session "s3"
step "s3-start-operation"
{
SELECT sample_operation(3778, 30, 9);
}
session "lock-orchestrator"
step "take-locks"
{
-- Locks for steps of sample operation in s1
SELECT pg_advisory_lock(10);
SELECT pg_advisory_lock(11);
SELECT pg_advisory_lock(12);
-- Locks for steps of sample operation in s2
SELECT pg_advisory_lock(20);
SELECT pg_advisory_lock(21);
SELECT pg_advisory_lock(22);
-- Locks for steps of sample operation in s3
SELECT pg_advisory_lock(30);
SELECT pg_advisory_lock(31);
SELECT pg_advisory_lock(32);
}
step "release-locks-1"
{
-- Release the locks of first steps of sample operations
SELECT pg_advisory_unlock(10);
SELECT pg_advisory_unlock(20);
SELECT pg_advisory_unlock(30);
}
step "release-locks-2"
{
-- Release the locks of second steps of sample operations
SELECT pg_advisory_unlock(11);
SELECT pg_advisory_unlock(21);
SELECT pg_advisory_unlock(31);
}
step "release-locks-3"
{
-- Release the locks of final steps of sample operations
SELECT pg_advisory_unlock(12);
SELECT pg_advisory_unlock(22);
SELECT pg_advisory_unlock(32);
}
session "monitor"
step "show-progress"
{
SELECT show_progress(1337);
SELECT show_progress(3778);
}
permutation "take-locks" "s1-start-operation" "s2-start-operation" "s3-start-operation" "show-progress" "release-locks-1" "show-progress" "release-locks-2" "show-progress" "release-locks-3"