capture output of backend running background task

background-job-details
Nils Dijk 2022-08-10 00:50:16 +02:00 committed by Jelte Fennema
parent 3db49b43c2
commit 414dbf9070
3 changed files with 347 additions and 28 deletions
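In outline, the monitor now hands each task's command to a background worker together with a dynamic shared memory segment containing a shm_mq, waits for the worker to stop, and then drains the queue into the task's catalog record. A condensed sketch of the monitor-side flow, assembled from the hunks below (not a verbatim excerpt):

/* condensed monitor-side flow; names come from the hunks below */
dsm_segment *seg = NULL;
BackgroundWorkerHandle *handle =
	StartCitusBackgroundJobExecuter("postgres", "nilsdijk", task->command, &seg);

/* ... wait until GetBackgroundWorkerPid() reports BGWH_STOPPED ... */

shm_toc *toc = shm_toc_attach(CITUS_BACKGROUND_TASK_MAGIC, dsm_segment_address(seg));
shm_mq *mq = shm_toc_lookup(toc, CITUS_BACKGROUND_TASK_KEY_QUEUE, false);
shm_mq_handle *responseq = shm_mq_attach(mq, seg, NULL);

/* parse the 'E'/'N'/'C' protocol frames into task->message, task->status, hadError */
ConsumeTaskWorkerOutput(responseq, task, &hadError);

shm_mq_detach(responseq);
dsm_detach(seg);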


@@ -2508,7 +2508,7 @@ ResetRunningBackgroundTasks(void)
}
static void
void
DeepFreeBackgroundTask(BackgroundTask *task)
{
if (task->pid)
@@ -2783,8 +2783,35 @@ UpdateBackgroundTask(BackgroundTask *task)
if (task->message)
{
Oid messageOid = CStringGetTextDatum(task->message);
UPDATE_FIELD(Anum_pg_dist_background_tasks_message, false, messageOid);
/*
* We check whether the old message was either a null pointer or different from
* what we currently have; in either case we know the message has changed and we
* update it.
*/
bool updateMessage = false;
if (isnull[Anum_pg_dist_background_tasks_message - 1])
{
updateMessage = true;
}
else
{
text *oldMessageText =
DatumGetTextP(values[Anum_pg_dist_background_tasks_message - 1]);
char *oldMessage = text_to_cstring(oldMessageText);
if (strcmp(oldMessage, task->message) != 0)
{
updateMessage = true;
}
}
if (updateMessage)
{
values[Anum_pg_dist_background_tasks_message - 1] =
CStringGetTextDatum(task->message);
isnull[Anum_pg_dist_background_tasks_message - 1] = false;
replace[Anum_pg_dist_background_tasks_message - 1] = true;
updated |= true;
}
}
else
{

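The comparison above feeds the usual catalog-update idiom: once any field changed, the values/isnull/replace arrays are folded into a new tuple with heap_modify_tuple. A minimal sketch of that final step (the surrounding UpdateBackgroundTask code is only partially shown in this hunk; relation, tupleDesc and oldTuple are assumed to be set up earlier in the function):

if (updated)
{
	/* build a new tuple with only the flagged attributes replaced */
	HeapTuple newTuple = heap_modify_tuple(oldTuple, tupleDesc,
										   values, isnull, replace);

	/* write it back to pg_dist_background_tasks and update its indexes */
	CatalogTupleUpdate(relation, &newTuple->t_self, newTuple);
	CommandCounterIncrement();
}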

@@ -3,7 +3,10 @@
#include "safe_mem_lib.h"
#include "libpq-fe.h"
#include "libpq/pqformat.h"
#include "libpq/pqmq.h"
#include "libpq/pqsignal.h"
#include "parser/analyze.h"
#include "pgstat.h"
#include "storage/dsm.h"
@@ -33,17 +36,19 @@
bool BackgroundTaskMonitorDebugDelay = false;
/* Table-of-contents constants for our dynamic shared memory segment. */
#define CITUS_BACKGROUND_JOB_MAGIC 0x51028081
#define CITUS_BACKGROUND_JOB_KEY_DATABASE 0
#define CITUS_BACKGROUND_JOB_KEY_USERNAME 1
#define CITUS_BACKGROUND_JOB_KEY_COMMAND 2
#define CITUS_BACKGROUND_JOB_KEY_QUEUE 3
#define CITUS_BACKGROUND_JOB_NKEYS 4
#define CITUS_BACKGROUND_TASK_MAGIC 0x51028081
#define CITUS_BACKGROUND_TASK_KEY_DATABASE 0
#define CITUS_BACKGROUND_TASK_KEY_USERNAME 1
#define CITUS_BACKGROUND_TASK_KEY_COMMAND 2
#define CITUS_BACKGROUND_TASK_KEY_QUEUE 3
#define CITUS_BACKGROUND_TASK_NKEYS 4
static BackgroundWorkerHandle * StartCitusBackgroundJobExecuter(char *database,
char *user,
char *command);
char *user, char *command,
dsm_segment **pSegment);
static void ExecuteSqlString(const char *sql);
static void ConsumeTaskWorkerOutput(shm_mq_handle *responseq, BackgroundTask *task,
bool *hadError);
static void
BackgroundTaskUpdatePid(BackgroundTask *task, pid_t *pid)
@@ -126,6 +131,10 @@ CitusBackgroundTaskMonitorMain(Datum arg)
* cause conflicts on processing the tasks in the catalog table as well as violate
* parallelism guarantees. To make sure there is at most one backend running,
* we take a session lock on the CITUS_BACKGROUND_TASK_MONITOR operation.
*
* TODO now that we have a lock, we should install a term handler to terminate any
* 'child' backend when we are terminated. Otherwise we could still end up in a
* situation where the actual task is running multiple times.
*/
LOCKTAG tag = { 0 };
SET_LOCKTAG_CITUS_OPERATION(tag, CITUS_BACKGROUND_TASK_MONITOR);
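The actual acquisition happens outside the shown hunk; a minimal sketch of how such a session-scoped lock is typically taken and checked (an illustration, not the committed code), assuming the LOCKTAG set up above:

/* sessionLock = true so the lock outlives transactions, dontWait = true so we
 * detect an already-running monitor instead of blocking behind it */
LockAcquireResult result = LockAcquire(&tag, AccessExclusiveLock, true, true);
if (result == LOCKACQUIRE_NOT_AVAIL)
{
	ereport(WARNING, (errmsg("another Citus background task monitor is already running")));
	proc_exit(0);
}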
@@ -207,8 +216,10 @@ CitusBackgroundTaskMonitorMain(Datum arg)
MemoryContextSwitchTo(perTaskContext);
/* TODO find the actual database and username */
dsm_segment *seg = NULL;
BackgroundWorkerHandle *handle =
StartCitusBackgroundJobExecuter("postgres", "nilsdijk", task->command);
StartCitusBackgroundJobExecuter("postgres", "nilsdijk", task->command,
&seg);
if (handle == NULL)
{
@@ -235,6 +246,18 @@ CitusBackgroundTaskMonitorMain(Datum arg)
MemoryContextSwitchTo(perTaskContext);
bool hadError = false;
/*
* We reset the old message (if present) so that only the message of the last run
* is retained in the catalog; otherwise the output of all retries would get
* concatenated.
*/
if (task->message)
{
pfree(task->message);
}
task->message = NULL;
/* TODO keep polling the task */
while (GetBackgroundWorkerPid(handle, &pid) != BGWH_STOPPED)
{
@@ -252,18 +275,74 @@
ResetLatch(MyLatch);
CHECK_FOR_INTERRUPTS();
}
shm_toc *toc = shm_toc_attach(CITUS_BACKGROUND_TASK_MAGIC,
dsm_segment_address(seg));
shm_mq *mq = shm_toc_lookup(toc, CITUS_BACKGROUND_TASK_KEY_QUEUE, false);
shm_mq_handle *responseq = shm_mq_attach(mq, seg, NULL);
ConsumeTaskWorkerOutput(responseq, task, &hadError);
shm_mq_detach(responseq);
}
StartTransactionCommand();
PushActiveSnapshot(GetTransactionSnapshot());
/* TODO task can actually also have failed */
task->status = BACKGROUND_TASK_STATUS_DONE;
{
shm_toc *toc = shm_toc_attach(CITUS_BACKGROUND_TASK_MAGIC,
dsm_segment_address(seg));
shm_mq *mq = shm_toc_lookup(toc, CITUS_BACKGROUND_TASK_KEY_QUEUE, false);
shm_mq_handle *responseq = shm_mq_attach(mq, seg, NULL);
ConsumeTaskWorkerOutput(responseq, task, &hadError);
shm_mq_detach(responseq);
}
BackgroundTaskUpdatePid(task, NULL);
task->status = BACKGROUND_TASK_STATUS_DONE;
if (hadError)
{
/*
* When we had an error, we need to decide whether to retry (keep the
* scheduled state) or move to the failed state.
*/
if (!task->retry_count)
{
/* first retry, need to allocate a counter */
MemoryContext taskContext = GetMemoryChunkContext(task);
task->retry_count = MemoryContextAlloc(taskContext, sizeof(int32));
*task->retry_count = 1;
}
else
{
(*task->retry_count)++;
}
if (*task->retry_count >= 3)
{
/* fail after 3 retries */
task->status = BACKGROUND_TASK_STATUS_ERROR;
/* when we error this task, we need to unschedule all dependant tasks */
UnscheduleDependantTasks(task->taskid);
}
else
{
task->status = BACKGROUND_TASK_STATUS_SCHEDULED;
}
}
UpdateBackgroundTask(task);
dsm_detach(seg);
PopActiveSnapshot();
CommitTransactionCommand();
DeepFreeBackgroundTask(task);
task = NULL;
}
MemoryContextSwitchTo(oldContextPerJob);
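Condensed, the retry policy above is: a successful run ends in DONE; on error the task stays SCHEDULED with a lazily allocated attempt counter, and flips to ERROR (unscheduling its dependants) once the third attempt fails. A compact restatement of that decision as a hypothetical helper (not part of the diff; it ignores the counter allocation and the UnscheduleDependantTasks side effect):

static BackgroundTaskStatus
NextStatusAfterAttempt(const BackgroundTask *task, bool hadError)
{
	if (!hadError)
	{
		return BACKGROUND_TASK_STATUS_DONE;
	}

	/* retry_count is NULL until the first failed attempt */
	int32 attempts = task->retry_count ? *task->retry_count + 1 : 1;

	return attempts >= 3 ? BACKGROUND_TASK_STATUS_ERROR
						 : BACKGROUND_TASK_STATUS_SCHEDULED;
}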
@@ -271,6 +350,212 @@ CitusBackgroundTaskMonitorMain(Datum arg)
}
/*
* bgw_generate_returned_message -
* generates the message to be inserted into the job_run_details table
* first part is coming from error_severity (elog.c)
*/
static void
bgw_generate_returned_message(StringInfoData *display_msg, ErrorData edata)
{
const char *prefix = NULL;
switch (edata.elevel)
{
case DEBUG1:
case DEBUG2:
case DEBUG3:
case DEBUG4:
case DEBUG5:
{
prefix = gettext_noop("DEBUG");
break;
}
case LOG:
case LOG_SERVER_ONLY:
{
prefix = gettext_noop("LOG");
break;
}
case INFO:
{
prefix = gettext_noop("INFO");
break;
}
case NOTICE:
{
prefix = gettext_noop("NOTICE");
break;
}
case WARNING:
{
prefix = gettext_noop("WARNING");
break;
}
case ERROR:
{
prefix = gettext_noop("ERROR");
break;
}
case FATAL:
{
prefix = gettext_noop("FATAL");
break;
}
case PANIC:
{
prefix = gettext_noop("PANIC");
break;
}
default:
{
prefix = "???";
break;
}
}
appendStringInfo(display_msg, "%s: %s", prefix, edata.message);
if (edata.detail != NULL)
{
appendStringInfo(display_msg, "\nDETAIL: %s", edata.detail);
}
if (edata.hint != NULL)
{
appendStringInfo(display_msg, "\nHINT: %s", edata.hint);
}
if (edata.context != NULL)
{
appendStringInfo(display_msg, "\nCONTEXT: %s", edata.context);
}
}
static void
ConsumeTaskWorkerOutput(shm_mq_handle *responseq, BackgroundTask *task, bool *hadError)
{
/*
* Message-parsing routines operate on a null-terminated StringInfo,
* so we must construct one.
*/
StringInfoData msg = { 0 };
initStringInfo(&msg);
for (;;)
{
resetStringInfo(&msg);
/* Get next message. */
Size nbytes = 0;
void *data = NULL;
shm_mq_result res = shm_mq_receive(responseq, &nbytes, &data, false);
if (res != SHM_MQ_SUCCESS)
{
break;
}
enlargeStringInfo(&msg, nbytes);
msg.len = nbytes;
memcpy(msg.data, data, nbytes);
msg.data[nbytes] = '\0';
char msgtype = pq_getmsgbyte(&msg);
switch (msgtype)
{
case 'E': /* ERROR */
{
if (hadError)
{
*hadError = true;
}
/* fall-through */
}
case 'N': /* NOTICE */
{
ErrorData edata = { 0 };
StringInfoData display_msg = { 0 };
pq_parse_errornotice(&msg, &edata);
initStringInfo(&display_msg);
bgw_generate_returned_message(&display_msg, edata);
StringInfoData fullMessage = { 0 };
initStringInfo(&fullMessage);
if (task->message)
{
appendStringInfoString(&fullMessage, task->message);
}
appendStringInfoString(&fullMessage, display_msg.data);
appendStringInfoChar(&fullMessage, '\n');
/*
* The task might live in a separate memory context, hence we find its context
* and allocate a copy of the message in there.
*/
MemoryContext taskContext = GetMemoryChunkContext(task);
task->message = MemoryContextStrdup(taskContext, fullMessage.data);
pfree(display_msg.data);
pfree(fullMessage.data);
break;
}
case 'T':
{
break;
}
case 'C': /* CommandComplete */
{
const char *tag = pq_getmsgstring(&msg);
char *nonconst_tag = NULL;
nonconst_tag = pstrdup(tag);
/* the CommandComplete payload is the command tag, e.g. "SELECT 1"; it is unused here */
task->status = BACKGROUND_TASK_STATUS_DONE;
pfree(nonconst_tag);
break;
}
case 'A':
case 'D':
case 'G':
case 'H':
case 'W':
case 'Z':
{
break;
}
default:
{
elog(WARNING, "unknown message type: %c (%zu bytes)",
msg.data[0], nbytes);
break;
}
}
}
pfree(msg.data);
}
static dsm_segment *
StoreArgumentsInDSM(char *database, char *username, char *command)
{
@@ -287,7 +572,7 @@ StoreArgumentsInDSM(char *database, char *username, char *command)
shm_toc_estimate_chunk(&e, strlen(command) + 1);
#define QUEUE_SIZE ((Size) 65536)
shm_toc_estimate_chunk(&e, QUEUE_SIZE);
shm_toc_estimate_keys(&e, CITUS_BACKGROUND_JOB_NKEYS);
shm_toc_estimate_keys(&e, CITUS_BACKGROUND_TASK_NKEYS);
Size segsize = shm_toc_estimate(&e);
dsm_segment *seg = dsm_create(segsize, DSM_CREATE_NULL_IF_MAXSEGMENTS);
@@ -299,26 +584,26 @@ StoreArgumentsInDSM(char *database, char *username, char *command)
return NULL;
}
shm_toc *toc = shm_toc_create(CITUS_BACKGROUND_JOB_MAGIC, dsm_segment_address(seg),
shm_toc *toc = shm_toc_create(CITUS_BACKGROUND_TASK_MAGIC, dsm_segment_address(seg),
segsize);
Size size = strlen(database) + 1;
char *databaseTarget = shm_toc_allocate(toc, size);
strcpy_s(databaseTarget, size, database);
shm_toc_insert(toc, CITUS_BACKGROUND_JOB_KEY_DATABASE, databaseTarget);
shm_toc_insert(toc, CITUS_BACKGROUND_TASK_KEY_DATABASE, databaseTarget);
size = strlen(username) + 1;
char *usernameTarget = shm_toc_allocate(toc, size);
strcpy_s(usernameTarget, size, username);
shm_toc_insert(toc, CITUS_BACKGROUND_JOB_KEY_USERNAME, usernameTarget);
shm_toc_insert(toc, CITUS_BACKGROUND_TASK_KEY_USERNAME, usernameTarget);
size = strlen(command) + 1;
char *commandTarget = shm_toc_allocate(toc, size);
strcpy_s(commandTarget, size, command);
shm_toc_insert(toc, CITUS_BACKGROUND_JOB_KEY_COMMAND, commandTarget);
shm_toc_insert(toc, CITUS_BACKGROUND_TASK_KEY_COMMAND, commandTarget);
shm_mq *mq = shm_mq_create(shm_toc_allocate(toc, QUEUE_SIZE), QUEUE_SIZE);
shm_toc_insert(toc, CITUS_BACKGROUND_JOB_KEY_QUEUE, mq);
shm_toc_insert(toc, CITUS_BACKGROUND_TASK_KEY_QUEUE, mq);
shm_mq_set_receiver(mq, MyProc);
/*
@@ -335,15 +620,16 @@ StoreArgumentsInDSM(char *database, char *username, char *command)
static BackgroundWorkerHandle *
StartCitusBackgroundJobExecuter(char *database, char *username, char *command)
StartCitusBackgroundJobExecuter(char *database, char *user, char *command,
dsm_segment **segOut)
{
dsm_segment *seg = StoreArgumentsInDSM(database, username, command);
dsm_segment *seg = StoreArgumentsInDSM(database, user, command);
/* Configure a worker. */
BackgroundWorker worker = { 0 };
memset(&worker, 0, sizeof(worker));
SafeSnprintf(worker.bgw_name, BGW_MAXLEN, "Citus Background Job Executor: %s/%s",
database, username);
database, user);
worker.bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION;
worker.bgw_start_time = BgWorkerStart_ConsistentState;
@@ -365,6 +651,11 @@ StartCitusBackgroundJobExecuter(char *database, char *username, char *command)
pid_t pid = { 0 };
WaitForBackgroundWorkerStartup(handle, &pid);
if (segOut)
{
*segOut = seg;
}
return handle;
}
@@ -401,7 +692,7 @@ CitusBackgroundJobExecuter(Datum main_arg)
errmsg("unable to map dynamic shared memory segment")));
}
shm_toc *toc = shm_toc_attach(CITUS_BACKGROUND_JOB_MAGIC, dsm_segment_address(seg));
shm_toc *toc = shm_toc_attach(CITUS_BACKGROUND_TASK_MAGIC, dsm_segment_address(seg));
if (toc == NULL)
{
ereport(ERROR,
@@ -409,10 +700,10 @@ CitusBackgroundJobExecuter(Datum main_arg)
errmsg("bad magic number in dynamic shared memory segment")));
}
char *database = shm_toc_lookup(toc, CITUS_BACKGROUND_JOB_KEY_DATABASE, false);
char *username = shm_toc_lookup(toc, CITUS_BACKGROUND_JOB_KEY_USERNAME, false);
char *command = shm_toc_lookup(toc, CITUS_BACKGROUND_JOB_KEY_COMMAND, false);
shm_mq *mq = shm_toc_lookup(toc, CITUS_BACKGROUND_JOB_KEY_QUEUE, false);
char *database = shm_toc_lookup(toc, CITUS_BACKGROUND_TASK_KEY_DATABASE, false);
char *username = shm_toc_lookup(toc, CITUS_BACKGROUND_TASK_KEY_USERNAME, false);
char *command = shm_toc_lookup(toc, CITUS_BACKGROUND_TASK_KEY_COMMAND, false);
shm_mq *mq = shm_toc_lookup(toc, CITUS_BACKGROUND_TASK_KEY_QUEUE, false);
shm_mq_set_sender(mq, MyProc);
shm_mq_handle *responseq = shm_mq_attach(mq, seg, NULL);
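The hunk ends right after the executor attaches to the queue as sender. Presumably (an assumption, mirroring pg_background and suggested by the new libpq/pqmq.h include above) the next step is to redirect the worker's protocol output into that queue, so that every ereport()/NOTICE raised while running the task is what ConsumeTaskWorkerOutput later drains on the monitor side:

/* assumed continuation, not part of the shown hunk */
pq_redirect_to_shm_mq(seg, responseq);

/* connect as the requested user/database and run the task command; errors and
 * notices now travel through the shm_mq instead of a client socket */
BackgroundWorkerInitializeConnection(database, username, 0);
ExecuteSqlString(command);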


@@ -340,6 +340,7 @@ extern BackgroundTask * ScheduleBackgroundTask(char *command, int dependingTaskC
extern bool BackgroundTaskHasUmnetDependencies(int64 taskId);
extern BackgroundTask * GetRunnableBackgroundTask(void);
extern void ResetRunningBackgroundTasks(void);
extern void DeepFreeBackgroundTask(BackgroundTask *task);
extern BackgroundTask * GetBackgroundTaskByTaskId(int64 taskId);
extern void UpdateBackgroundTask(BackgroundTask *task);
extern void UpdateJobStatus(int64 taskId, const pid_t *pid, BackgroundTaskStatus status,