From 414dbf9070e9855e6ec418ae2bd24f86aab8ed51 Mon Sep 17 00:00:00 2001 From: Nils Dijk Date: Wed, 10 Aug 2022 00:50:16 +0200 Subject: [PATCH] capture output of backend running background task --- .../distributed/metadata/metadata_utility.c | 33 +- .../distributed/utils/background_jobs.c | 341 ++++++++++++++++-- src/include/distributed/metadata_utility.h | 1 + 3 files changed, 347 insertions(+), 28 deletions(-) diff --git a/src/backend/distributed/metadata/metadata_utility.c b/src/backend/distributed/metadata/metadata_utility.c index e32337fe2..27fc24974 100644 --- a/src/backend/distributed/metadata/metadata_utility.c +++ b/src/backend/distributed/metadata/metadata_utility.c @@ -2508,7 +2508,7 @@ ResetRunningBackgroundTasks(void) } -static void +void DeepFreeBackgroundTask(BackgroundTask *task) { if (task->pid) @@ -2783,8 +2783,35 @@ UpdateBackgroundTask(BackgroundTask *task) if (task->message) { - Oid messageOid = CStringGetTextDatum(task->message); - UPDATE_FIELD(Anum_pg_dist_background_tasks_message, false, messageOid); + /* + * we check if the stored message was either NULL or different from what we + * currently have; in both cases the message has changed and we update the + * message column, otherwise it is left untouched + */ + bool updateMessage = false; + if (isnull[Anum_pg_dist_background_tasks_message - 1]) + { + updateMessage = true; + } + else + { + text *oldMessageText = + DatumGetTextP(values[Anum_pg_dist_background_tasks_message - 1]); + char *oldMessage = text_to_cstring(oldMessageText); + if (strcmp(oldMessage, task->message) != 0) + { + updateMessage = true; + } + } + + if (updateMessage) + { + values[Anum_pg_dist_background_tasks_message - 1] = + CStringGetTextDatum(task->message); + isnull[Anum_pg_dist_background_tasks_message - 1] = false; + replace[Anum_pg_dist_background_tasks_message - 1] = true; + updated |= true; + } } else { diff --git a/src/backend/distributed/utils/background_jobs.c b/src/backend/distributed/utils/background_jobs.c index c319577ca..9141f030c 
100644 --- a/src/backend/distributed/utils/background_jobs.c +++ b/src/backend/distributed/utils/background_jobs.c @@ -3,7 +3,10 @@ #include "safe_mem_lib.h" +#include "libpq-fe.h" +#include "libpq/pqformat.h" #include "libpq/pqmq.h" +#include "libpq/pqsignal.h" #include "parser/analyze.h" #include "pgstat.h" #include "storage/dsm.h" @@ -33,17 +36,19 @@ bool BackgroundTaskMonitorDebugDelay = false; /* Table-of-contents constants for our dynamic shared memory segment. */ -#define CITUS_BACKGROUND_JOB_MAGIC 0x51028081 -#define CITUS_BACKGROUND_JOB_KEY_DATABASE 0 -#define CITUS_BACKGROUND_JOB_KEY_USERNAME 1 -#define CITUS_BACKGROUND_JOB_KEY_COMMAND 2 -#define CITUS_BACKGROUND_JOB_KEY_QUEUE 3 -#define CITUS_BACKGROUND_JOB_NKEYS 4 +#define CITUS_BACKGROUND_TASK_MAGIC 0x51028081 +#define CITUS_BACKGROUND_TASK_KEY_DATABASE 0 +#define CITUS_BACKGROUND_TASK_KEY_USERNAME 1 +#define CITUS_BACKGROUND_TASK_KEY_COMMAND 2 +#define CITUS_BACKGROUND_TASK_KEY_QUEUE 3 +#define CITUS_BACKGROUND_TASK_NKEYS 4 static BackgroundWorkerHandle * StartCitusBackgroundJobExecuter(char *database, - char *user, - char *command); + char *user, char *command, + dsm_segment **pSegment); static void ExecuteSqlString(const char *sql); +static void ConsumeTaskWorkerOutput(shm_mq_handle *responseq, BackgroundTask *task, + bool *hadError); static void BackgroundTaskUpdatePid(BackgroundTask *task, pid_t *pid) @@ -126,6 +131,10 @@ CitusBackgroundTaskMonitorMain(Datum arg) * cause conflicts on processing the tasks in the catalog table as well as violate * parallelism guarantees. To make sure there is at most, exactly one backend running * we take a session lock on the CITUS_BACKGROUND_TASK_MONITOR operation. + * + * TODO now that we have a lock, we should install a term handler to terminate any + * 'child' backend when we are terminated. Otherwise we will still have a situation + * where the actual task could be running multiple times. 
*/ LOCKTAG tag = { 0 }; SET_LOCKTAG_CITUS_OPERATION(tag, CITUS_BACKGROUND_TASK_MONITOR); @@ -207,8 +216,10 @@ CitusBackgroundTaskMonitorMain(Datum arg) MemoryContextSwitchTo(perTaskContext); /* TODO find the actual database and username */ + dsm_segment *seg = NULL; BackgroundWorkerHandle *handle = - StartCitusBackgroundJobExecuter("postgres", "nilsdijk", task->command); + StartCitusBackgroundJobExecuter("postgres", "nilsdijk", task->command, + &seg); if (handle == NULL) { @@ -235,6 +246,18 @@ CitusBackgroundTaskMonitorMain(Datum arg) MemoryContextSwitchTo(perTaskContext); + bool hadError = false; + + /* + * We reset the old message (if present). This will only retain the last message + * in the catalog. Otherwise it would concatenate all retries. + */ + if (task->message) + { + pfree(task->message); + } + task->message = NULL; + /* TODO keep polling the task */ while (GetBackgroundWorkerPid(handle, &pid) != BGWH_STOPPED) { @@ -252,18 +275,74 @@ CitusBackgroundTaskMonitorMain(Datum arg) ResetLatch(MyLatch); CHECK_FOR_INTERRUPTS(); } + + shm_toc *toc = shm_toc_attach(CITUS_BACKGROUND_TASK_MAGIC, + dsm_segment_address(seg)); + shm_mq *mq = shm_toc_lookup(toc, CITUS_BACKGROUND_TASK_KEY_QUEUE, false); + shm_mq_handle *responseq = shm_mq_attach(mq, seg, NULL); + + ConsumeTaskWorkerOutput(responseq, task, &hadError); + + shm_mq_detach(responseq); } StartTransactionCommand(); PushActiveSnapshot(GetTransactionSnapshot()); - /* TODO task can actually also have failed*/ - task->status = BACKGROUND_TASK_STATUS_DONE; + { + shm_toc *toc = shm_toc_attach(CITUS_BACKGROUND_TASK_MAGIC, + dsm_segment_address(seg)); + shm_mq *mq = shm_toc_lookup(toc, CITUS_BACKGROUND_TASK_KEY_QUEUE, false); + shm_mq_handle *responseq = shm_mq_attach(mq, seg, NULL); + + ConsumeTaskWorkerOutput(responseq, task, &hadError); + + shm_mq_detach(responseq); + } + BackgroundTaskUpdatePid(task, NULL); + task->status = BACKGROUND_TASK_STATUS_DONE; + if (hadError) + { + /* + * When we had an error we need to 
decide if we want to retry (keep the + * scheduled state), or move to failed state + */ + + if (!task->retry_count) + { + /* first error, need to allocate a counter */ + MemoryContext taskContext = GetMemoryChunkContext(task); + task->retry_count = MemoryContextAlloc(taskContext, sizeof(int32)); + *task->retry_count = 1; + } + else + { + (*task->retry_count)++; + } + + if (*task->retry_count >= 3) + { + /* give up once the error counter reaches 3 */ + task->status = BACKGROUND_TASK_STATUS_ERROR; + + /* when we error this task, we need to unschedule all dependent tasks */ + UnscheduleDependantTasks(task->taskid); + } + else + { + task->status = BACKGROUND_TASK_STATUS_SCHEDULED; + } + } UpdateBackgroundTask(task); + dsm_detach(seg); + PopActiveSnapshot(); CommitTransactionCommand(); + + DeepFreeBackgroundTask(task); + task = NULL; } MemoryContextSwitchTo(oldContextPerJob); @@ -271,6 +350,212 @@ CitusBackgroundTaskMonitorMain(Datum arg) } +/* + * bgw_generate_returned_message - + * appends the severity prefix, message and optional DETAIL/HINT/CONTEXT to display_msg + * the severity prefix mapping is coming from error_severity (elog.c) + */ +static void +bgw_generate_returned_message(StringInfoData *display_msg, ErrorData edata) +{ + const char *prefix = NULL; + + switch (edata.elevel) + { + case DEBUG1: + case DEBUG2: + case DEBUG3: + case DEBUG4: + case DEBUG5: + { + prefix = gettext_noop("DEBUG"); + break; + } + + case LOG: + case LOG_SERVER_ONLY: + { + prefix = gettext_noop("LOG"); + break; + } + + case INFO: + { + prefix = gettext_noop("INFO"); + break; + } + + case NOTICE: + { + prefix = gettext_noop("NOTICE"); + break; + } + + case WARNING: + { + prefix = gettext_noop("WARNING"); + break; + } + + case ERROR: + { + prefix = gettext_noop("ERROR"); + break; + } + + case FATAL: + { + prefix = gettext_noop("FATAL"); + break; + } + + case PANIC: + { + prefix = gettext_noop("PANIC"); + break; + } + + default: + { + prefix = "???"; + break; + } + } + + 
appendStringInfo(display_msg, "%s: %s", prefix, edata.message); + if 
(edata.detail != NULL) + { + appendStringInfo(display_msg, "\nDETAIL: %s", edata.detail); + } + + if (edata.hint != NULL) + { + appendStringInfo(display_msg, "\nHINT: %s", edata.hint); + } + + if (edata.context != NULL) + { + appendStringInfo(display_msg, "\nCONTEXT: %s", edata.context); + } +} + + +static void +ConsumeTaskWorkerOutput(shm_mq_handle *responseq, BackgroundTask *task, bool *hadError) +{ + /* + * Message-parsing routines operate on a null-terminated StringInfo, + * so we must construct one. + */ + StringInfoData msg = { 0 }; + initStringInfo(&msg); + + for (;;) + { + resetStringInfo(&msg); + + /* Get next message; any result other than success ends the loop. */ + Size nbytes = 0; + void *data = NULL; + shm_mq_result res = shm_mq_receive(responseq, &nbytes, &data, false); + + if (res != SHM_MQ_SUCCESS) + { + break; + } + + enlargeStringInfo(&msg, nbytes); + msg.len = nbytes; + memcpy(msg.data, data, nbytes); + msg.data[nbytes] = '\0'; + + char msgtype = pq_getmsgbyte(&msg); + switch (msgtype) + { + case 'E': /* ERROR */ + { + if (hadError) + { + *hadError = true; + } + + /* fall-through */ + } + + case 'N': /* NOTICE */ + { + ErrorData edata = { 0 }; + StringInfoData display_msg = { 0 }; + + pq_parse_errornotice(&msg, &edata); + initStringInfo(&display_msg); + bgw_generate_returned_message(&display_msg, edata); + + StringInfoData fullMessage = { 0 }; + initStringInfo(&fullMessage); + if (task->message) + { + appendStringInfoString(&fullMessage, task->message); + } + appendStringInfoString(&fullMessage, display_msg.data); + appendStringInfoChar(&fullMessage, '\n'); + + /* + * the task might live in a separate context, hence we find its context + * and allocate a copy of the message in there + */ + MemoryContext taskContext = GetMemoryChunkContext(task); + task->message = MemoryContextStrdup(taskContext, fullMessage.data); + + pfree(display_msg.data); + pfree(fullMessage.data); + + break; + } + + case 'T': + { + break; + } + + case 'C': /* completed? 
*/ + { + const char *tag = pq_getmsgstring(&msg); + char *nonconst_tag = NULL; + + nonconst_tag = pstrdup(tag); + + /* what does nonconst_tag contain? */ + + task->status = BACKGROUND_TASK_STATUS_DONE; + + pfree(nonconst_tag); + break; + } + + case 'A': + case 'D': + case 'G': + case 'H': + case 'W': + case 'Z': + { + break; + } + + default: + { + elog(WARNING, "unknown message type: %c (%zu bytes)", + msg.data[0], nbytes); + break; + } + } + } + + pfree(msg.data); +} + + static dsm_segment * StoreArgumentsInDSM(char *database, char *username, char *command) { @@ -287,7 +572,7 @@ StoreArgumentsInDSM(char *database, char *username, char *command) shm_toc_estimate_chunk(&e, strlen(command) + 1); #define QUEUE_SIZE ((Size) 65536) shm_toc_estimate_chunk(&e, QUEUE_SIZE); - shm_toc_estimate_keys(&e, CITUS_BACKGROUND_JOB_NKEYS); + shm_toc_estimate_keys(&e, CITUS_BACKGROUND_TASK_NKEYS); Size segsize = shm_toc_estimate(&e); dsm_segment *seg = dsm_create(segsize, DSM_CREATE_NULL_IF_MAXSEGMENTS); @@ -299,26 +584,26 @@ StoreArgumentsInDSM(char *database, char *username, char *command) return NULL; } - shm_toc *toc = shm_toc_create(CITUS_BACKGROUND_JOB_MAGIC, dsm_segment_address(seg), + shm_toc *toc = shm_toc_create(CITUS_BACKGROUND_TASK_MAGIC, dsm_segment_address(seg), segsize); Size size = strlen(database) + 1; char *databaseTarget = shm_toc_allocate(toc, size); strcpy_s(databaseTarget, size, database); - shm_toc_insert(toc, CITUS_BACKGROUND_JOB_KEY_DATABASE, databaseTarget); + shm_toc_insert(toc, CITUS_BACKGROUND_TASK_KEY_DATABASE, databaseTarget); size = strlen(username) + 1; char *usernameTarget = shm_toc_allocate(toc, size); strcpy_s(usernameTarget, size, username); - shm_toc_insert(toc, CITUS_BACKGROUND_JOB_KEY_USERNAME, usernameTarget); + shm_toc_insert(toc, CITUS_BACKGROUND_TASK_KEY_USERNAME, usernameTarget); size = strlen(command) + 1; char *commandTarget = shm_toc_allocate(toc, size); strcpy_s(commandTarget, size, command); - shm_toc_insert(toc, 
CITUS_BACKGROUND_JOB_KEY_COMMAND, commandTarget); + shm_toc_insert(toc, CITUS_BACKGROUND_TASK_KEY_COMMAND, commandTarget); shm_mq *mq = shm_mq_create(shm_toc_allocate(toc, QUEUE_SIZE), QUEUE_SIZE); - shm_toc_insert(toc, CITUS_BACKGROUND_JOB_KEY_QUEUE, mq); + shm_toc_insert(toc, CITUS_BACKGROUND_TASK_KEY_QUEUE, mq); shm_mq_set_receiver(mq, MyProc); /* @@ -335,15 +620,16 @@ StoreArgumentsInDSM(char *database, char *username, char *command) static BackgroundWorkerHandle * -StartCitusBackgroundJobExecuter(char *database, char *username, char *command) +StartCitusBackgroundJobExecuter(char *database, char *user, char *command, + dsm_segment **segOut) { - dsm_segment *seg = StoreArgumentsInDSM(database, username, command); + dsm_segment *seg = StoreArgumentsInDSM(database, user, command); /* Configure a worker. */ BackgroundWorker worker = { 0 }; memset(&worker, 0, sizeof(worker)); SafeSnprintf(worker.bgw_name, BGW_MAXLEN, "Citus Background Job Executor: %s/%s", - database, username); + database, user); worker.bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION; worker.bgw_start_time = BgWorkerStart_ConsistentState; @@ -365,6 +651,11 @@ StartCitusBackgroundJobExecuter(char *database, char *username, char *command) pid_t pid = { 0 }; WaitForBackgroundWorkerStartup(handle, &pid); + if (segOut) + { + *segOut = seg; + } + return handle; } @@ -401,7 +692,7 @@ CitusBackgroundJobExecuter(Datum main_arg) errmsg("unable to map dynamic shared memory segment"))); } - shm_toc *toc = shm_toc_attach(CITUS_BACKGROUND_JOB_MAGIC, dsm_segment_address(seg)); + shm_toc *toc = shm_toc_attach(CITUS_BACKGROUND_TASK_MAGIC, dsm_segment_address(seg)); if (toc == NULL) { ereport(ERROR, @@ -409,10 +700,10 @@ CitusBackgroundJobExecuter(Datum main_arg) errmsg("bad magic number in dynamic shared memory segment"))); } - char *database = shm_toc_lookup(toc, CITUS_BACKGROUND_JOB_KEY_DATABASE, false); - char *username = shm_toc_lookup(toc, CITUS_BACKGROUND_JOB_KEY_USERNAME, false); - 
char *command = shm_toc_lookup(toc, CITUS_BACKGROUND_JOB_KEY_COMMAND, false); - shm_mq *mq = shm_toc_lookup(toc, CITUS_BACKGROUND_JOB_KEY_QUEUE, false); + char *database = shm_toc_lookup(toc, CITUS_BACKGROUND_TASK_KEY_DATABASE, false); + char *username = shm_toc_lookup(toc, CITUS_BACKGROUND_TASK_KEY_USERNAME, false); + char *command = shm_toc_lookup(toc, CITUS_BACKGROUND_TASK_KEY_COMMAND, false); + shm_mq *mq = shm_toc_lookup(toc, CITUS_BACKGROUND_TASK_KEY_QUEUE, false); shm_mq_set_sender(mq, MyProc); shm_mq_handle *responseq = shm_mq_attach(mq, seg, NULL); diff --git a/src/include/distributed/metadata_utility.h b/src/include/distributed/metadata_utility.h index 2c48f7a50..09efb9805 100644 --- a/src/include/distributed/metadata_utility.h +++ b/src/include/distributed/metadata_utility.h @@ -340,6 +340,7 @@ extern BackgroundTask * ScheduleBackgroundTask(char *command, int dependingTaskC extern bool BackgroundTaskHasUmnetDependencies(int64 taskId); extern BackgroundTask * GetRunnableBackgroundTask(void); extern void ResetRunningBackgroundTasks(void); +extern void DeepFreeBackgroundTask(BackgroundTask *task); extern BackgroundTask * GetBackgroundTaskByTaskId(int64 taskId); extern void UpdateBackgroundTask(BackgroundTask *task); extern void UpdateJobStatus(int64 taskId, const pid_t *pid, BackgroundTaskStatus status,