/*-------------------------------------------------------------------------
 *
 * background_jobs.c
 *	  Background jobs run as a background worker, spawned from the
 *	  maintenance daemon. Jobs have tasks, and tasks can depend on other
 *	  tasks that must complete before they can execute.
 *
 * This file contains the code for two separate background workers that
 * together run background tasks asynchronously from the main database
 * workload. The first background worker is the Background Tasks Queue
 * Monitor. This background worker keeps track of tasks recorded in
 * pg_dist_background_task and drives their execution based on a state
 * machine. When a task needs to be executed it starts a Background Task
 * Executor that executes the SQL statement defined in the task. The output
 * of the Executor is shared with the Monitor via a shared memory queue.
 *
 * To make sure there is only ever exactly one monitor running per database
 * it takes an exclusive lock on the CITUS_BACKGROUND_TASK_MONITOR
 * operation. This lock is consulted from the maintenance daemon to only
 * spawn a new monitor when the lock is not held.
 *
 * Copyright (c) Citus Data, Inc.
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"
|
|
|
|
#include "libpq-fe.h"
|
|
#include "pgstat.h"
|
|
#include "safe_mem_lib.h"
|
|
|
|
#include "access/xact.h"
|
|
#include "commands/dbcommands.h"
|
|
#include "common/hashfn.h"
|
|
#include "libpq/pqformat.h"
|
|
#include "libpq/pqmq.h"
|
|
#include "libpq/pqsignal.h"
|
|
#include "parser/analyze.h"
|
|
#include "storage/dsm.h"
|
|
#include "storage/ipc.h"
|
|
#include "storage/procarray.h"
|
|
#include "storage/shm_mq.h"
|
|
#include "storage/shm_toc.h"
|
|
#include "tcop/pquery.h"
|
|
#include "tcop/tcopprot.h"
|
|
#include "tcop/utility.h"
|
|
#include "utils/fmgrprotos.h"
|
|
#include "utils/hsearch.h"
|
|
#include "utils/memutils.h"
|
|
#include "utils/portal.h"
|
|
#include "utils/ps_status.h"
|
|
#include "utils/resowner.h"
|
|
#include "utils/snapmgr.h"
|
|
#include "utils/timeout.h"
|
|
|
|
#include "distributed/background_jobs.h"
|
|
#include "distributed/citus_safe_lib.h"
|
|
#include "distributed/hash_helpers.h"
|
|
#include "distributed/listutils.h"
|
|
#include "distributed/maintenanced.h"
|
|
#include "distributed/metadata_cache.h"
|
|
#include "distributed/metadata_utility.h"
|
|
#include "distributed/resource_lock.h"
|
|
#include "distributed/shard_cleaner.h"
|
|
#include "distributed/shard_rebalancer.h"
|
|
|
|
/* Table-of-contents constants for our dynamic shared memory segment. */
#define CITUS_BACKGROUND_TASK_MAGIC 0x51028081
#define CITUS_BACKGROUND_TASK_KEY_DATABASE 0
#define CITUS_BACKGROUND_TASK_KEY_USERNAME 1
#define CITUS_BACKGROUND_TASK_KEY_COMMAND 2
#define CITUS_BACKGROUND_TASK_KEY_QUEUE 3
#define CITUS_BACKGROUND_TASK_KEY_TASK_ID 4
#define CITUS_BACKGROUND_TASK_KEY_JOB_ID 5
#define CITUS_BACKGROUND_TASK_NKEYS 6

static BackgroundWorkerHandle * StartCitusBackgroundTaskExecutor(char *database,
																 char *user,
																 char *command,
																 int64 taskId,
																 int64 jobId,
																 dsm_segment **pSegment);
static void ExecuteSqlString(const char *sql);
static shm_mq_result ConsumeTaskWorkerOutput(shm_mq_handle *responseq,
											 StringInfo message, bool *hadError);
static void UpdateDependingTasks(BackgroundTask *task);
static int64 CalculateBackoffDelay(int retryCount);
static bool NewExecutorExceedsCitusLimit(
	QueueMonitorExecutionContext *queueMonitorExecutionContext);
static bool NewExecutorExceedsPgMaxWorkers(BackgroundWorkerHandle *handle,
										   QueueMonitorExecutionContext *
										   queueMonitorExecutionContext);
static bool AssignRunnableTaskToNewExecutor(BackgroundTask *runnableTask,
											QueueMonitorExecutionContext *
											queueMonitorExecutionContext);
static void AssignRunnableTasks(
	QueueMonitorExecutionContext *queueMonitorExecutionContext);
static List * GetRunningTaskEntries(HTAB *currentExecutors);
static shm_mq_result ReadFromExecutorQueue(
	BackgroundExecutorHashEntry *backgroundExecutorHashEntry,
	bool *hadError);
static void CheckAndResetLastWorkerAllocationFailure(
	QueueMonitorExecutionContext *queueMonitorExecutionContext);
static TaskExecutionStatus TaskConcurrentCancelCheck(
	TaskExecutionContext *taskExecutionContext);
static TaskExecutionStatus ConsumeExecutorQueue(
	TaskExecutionContext *taskExecutionContext);
static void TaskHadError(TaskExecutionContext *taskExecutionContext);
static void TaskEnded(TaskExecutionContext *taskExecutionContext);
static void TerminateAllTaskExecutors(HTAB *currentExecutors);
static HTAB * GetRunningUniqueJobIds(HTAB *currentExecutors);
static void CancelAllTaskExecutors(HTAB *currentExecutors);
static bool MonitorGotTerminationOrCancellationRequest(void);
static void QueueMonitorSigTermHandler(SIGNAL_ARGS);
static void QueueMonitorSigIntHandler(SIGNAL_ARGS);
static void QueueMonitorSigHupHandler(SIGNAL_ARGS);
static void DecrementParallelTaskCountForNodesInvolved(BackgroundTask *task);

/* flags set by signal handlers */
static volatile sig_atomic_t GotSigterm = false;
static volatile sig_atomic_t GotSigint = false;
static volatile sig_atomic_t GotSighup = false;

/* keeping track of parallel background tasks per node */
HTAB *ParallelTasksPerNode = NULL;
int MaxBackgroundTaskExecutorsPerNode = 1;

PG_FUNCTION_INFO_V1(citus_job_cancel);
PG_FUNCTION_INFO_V1(citus_job_wait);
PG_FUNCTION_INFO_V1(citus_task_wait);


/*
 * pg_catalog.citus_job_cancel(jobid bigint) void
 *	 cancels a scheduled/running job
 *
 * When cancelling a job there are two phases.
 * 1. Scan all associated tasks and transition every task that is not already in a
 *	  terminal state, and is not currently running, to cancelled.
 * 2. For all running tasks we send a cancellation signal to the backend running the
 *	  query. The background executor/monitor will transition this task to cancelled.
 *
 * We apply the same policy checks as pg_cancel_backend to check if a user can cancel a
 * job.
 */
Datum
citus_job_cancel(PG_FUNCTION_ARGS)
{
	CheckCitusVersion(ERROR);
	EnsureCoordinator();

	int64 jobid = PG_GETARG_INT64(0);

	/* cancel all tasks that were scheduled before */
	List *pids = CancelTasksForJob(jobid);

	/* send cancellation to any running backends */
	int pid = 0;
	foreach_int(pid, pids)
	{
		Datum pidDatum = Int32GetDatum(pid);
		Datum signalSuccessDatum = DirectFunctionCall1(pg_cancel_backend, pidDatum);
		bool signalSuccess = DatumGetBool(signalSuccessDatum);
		if (!signalSuccess)
		{
			ereport(WARNING, (errmsg("could not send signal to process %d: %m", pid)));
		}
	}

	UpdateBackgroundJob(jobid);

	PG_RETURN_VOID();
}


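/*
 * Example SQL usage (illustrative only; the jobid is hypothetical):
 *
 *	 SELECT citus_job_cancel(10020);
 */
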
/*
 * pg_catalog.citus_job_wait(jobid bigint,
 *							 desired_status citus_job_status DEFAULT NULL) boolean
 *	 waits until a job reaches a desired status, or can't reach that status anymore
 *	 because it reached a (different) terminal state. When no desired_status is given,
 *	 any terminal state is assumed to be the desired status. The function returns once
 *	 the desired status is reached.
 *
 * The current implementation is a polling implementation with an interval of 1 second.
 * Ideally we would have some synchronization between the background tasks queue monitor
 * and any backend calling this function to receive a signal when the job changes state.
 */
Datum
citus_job_wait(PG_FUNCTION_ARGS)
{
	CheckCitusVersion(ERROR);
	EnsureCoordinator();

	int64 jobid = PG_GETARG_INT64(0);

	/* parse the optional desired_status argument */
	bool hasDesiredStatus = !PG_ARGISNULL(1);
	BackgroundJobStatus desiredStatus = { 0 };
	if (hasDesiredStatus)
	{
		desiredStatus = BackgroundJobStatusByOid(PG_GETARG_OID(1));
	}

	citus_job_wait_internal(jobid, hasDesiredStatus ? &desiredStatus : NULL);

	PG_RETURN_VOID();
}


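/*
 * Example SQL usage (illustrative only; the jobid is hypothetical and
 * 'finished' is assumed to be a valid citus_job_status label):
 *
 *	 SELECT citus_job_wait(10020, desired_status => 'finished');
 */
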
/*
 * pg_catalog.citus_task_wait(taskid bigint,
 *							  desired_status citus_task_status DEFAULT NULL) boolean
 *	 waits until a task reaches a desired status, or can't reach that status anymore
 *	 because it reached a (different) terminal state. When no desired_status is given,
 *	 any terminal state is assumed to be the desired status. The function returns once
 *	 the desired status is reached.
 *
 * The current implementation is a polling implementation with an interval of 0.1
 * seconds. Ideally we would have some synchronization between the background tasks
 * queue monitor and any backend calling this function to receive a signal when the
 * task changes state.
 */
Datum
citus_task_wait(PG_FUNCTION_ARGS)
{
	CheckCitusVersion(ERROR);
	EnsureCoordinator();

	int64 taskid = PG_GETARG_INT64(0);

	/* parse the optional desired_status argument */
	bool hasDesiredStatus = !PG_ARGISNULL(1);
	BackgroundTaskStatus desiredStatus = { 0 };
	if (hasDesiredStatus)
	{
		desiredStatus = BackgroundTaskStatusByOid(PG_GETARG_OID(1));
	}

	citus_task_wait_internal(taskid, hasDesiredStatus ? &desiredStatus : NULL);

	PG_RETURN_VOID();
}


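/*
 * Example SQL usage (illustrative only; the taskid is hypothetical and
 * 'done' is assumed to be a valid citus_task_status label):
 *
 *	 SELECT citus_task_wait(1013, desired_status => 'done');
 */
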
/*
 * citus_job_wait_internal implements the waiting on a job for reuse in other areas
 * where we want to wait on jobs, e.g. the background rebalancer.
 *
 * When a desiredStatus is provided it will raise an error when a different terminal
 * state is reached and the desired state can never be reached anymore.
 */
void
citus_job_wait_internal(int64 jobid, BackgroundJobStatus *desiredStatus)
{
	/*
	 * Since we are poll-waiting we allocate memory on every poll. To make sure we
	 * don't put unneeded pressure on memory we create a context that we reset every
	 * iteration.
	 */
	MemoryContext waitContext = AllocSetContextCreate(CurrentMemoryContext,
													  "JobsWaitContext",
													  ALLOCSET_DEFAULT_MINSIZE,
													  ALLOCSET_DEFAULT_INITSIZE,
													  ALLOCSET_DEFAULT_MAXSIZE);
	MemoryContext oldContext = MemoryContextSwitchTo(waitContext);

	while (true)
	{
		MemoryContextReset(waitContext);

		BackgroundJob *job = GetBackgroundJobByJobId(jobid);
		if (!job)
		{
			ereport(ERROR, (errmsg("no job found for job with jobid: %ld", jobid)));
		}

		if (desiredStatus && job->state == *desiredStatus)
		{
			/* job has reached its desired status, done waiting */
			break;
		}

		if (IsBackgroundJobStatusTerminal(job->state))
		{
			if (desiredStatus)
			{
				/*
				 * We have reached a terminal state, which is not the desired state we
				 * were waiting for, otherwise we would have escaped earlier. Since it
				 * is a terminal state we know that we can never reach the desired
				 * state.
				 */

				Oid reachedStatusOid = BackgroundJobStatusOid(job->state);
				Datum reachedStatusNameDatum = DirectFunctionCall1(enum_out,
																   reachedStatusOid);
				char *reachedStatusName = DatumGetCString(reachedStatusNameDatum);

				Oid desiredStatusOid = BackgroundJobStatusOid(*desiredStatus);
				Datum desiredStatusNameDatum = DirectFunctionCall1(enum_out,
																   desiredStatusOid);
				char *desiredStatusName = DatumGetCString(desiredStatusNameDatum);

				ereport(ERROR,
						(errmsg("Job reached terminal state \"%s\" instead of desired "
								"state \"%s\"", reachedStatusName, desiredStatusName)));
			}

			/* job has reached its terminal state, done waiting */
			break;
		}

		/* sleep for a while, before rechecking the job status */
		CHECK_FOR_INTERRUPTS();
		const long delay_ms = 1000;
		(void) WaitLatch(MyLatch,
						 WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
						 delay_ms,
						 WAIT_EVENT_PG_SLEEP);

		ResetLatch(MyLatch);
	}

	MemoryContextSwitchTo(oldContext);
	MemoryContextDelete(waitContext);
}


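/*
 * Sketch of a C caller (illustrative only; assumes BACKGROUND_JOB_STATUS_FINISHED
 * is the "finished" value of the BackgroundJobStatus enum):
 *
 *	 BackgroundJobStatus desired = BACKGROUND_JOB_STATUS_FINISHED;
 *	 citus_job_wait_internal(jobId, &desired);
 *
 * This blocks until the job finishes, and errors out if the job reaches any
 * other terminal state.
 */
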
/*
 * citus_task_wait_internal implements the waiting on a task for reuse in other areas
 * where we want to wait on tasks.
 *
 * When a desiredStatus is provided it will raise an error when a different terminal
 * state is reached and the desired state can never be reached anymore.
 */
void
citus_task_wait_internal(int64 taskid, BackgroundTaskStatus *desiredStatus)
{
	/*
	 * Since we are poll-waiting we allocate memory on every poll. To make sure we
	 * don't put unneeded pressure on memory we create a context that we reset every
	 * iteration.
	 */
	MemoryContext waitContext = AllocSetContextCreate(CurrentMemoryContext,
													  "TasksWaitContext",
													  ALLOCSET_DEFAULT_MINSIZE,
													  ALLOCSET_DEFAULT_INITSIZE,
													  ALLOCSET_DEFAULT_MAXSIZE);
	MemoryContext oldContext = MemoryContextSwitchTo(waitContext);

	while (true)
	{
		MemoryContextReset(waitContext);

		BackgroundTask *task = GetBackgroundTaskByTaskId(taskid);
		if (!task)
		{
			ereport(ERROR, (errmsg("no task found with taskid: %ld", taskid)));
		}

		if (desiredStatus && task->status == *desiredStatus)
		{
			/* task has reached its desired status, done waiting */
			break;
		}

		if (IsBackgroundTaskStatusTerminal(task->status))
		{
			if (desiredStatus)
			{
				/*
				 * We have reached a terminal state, which is not the desired state we
				 * were waiting for, otherwise we would have escaped earlier. Since it
				 * is a terminal state we know that we can never reach the desired
				 * state.
				 */

				Oid reachedStatusOid = BackgroundTaskStatusOid(task->status);
				Datum reachedStatusNameDatum = DirectFunctionCall1(enum_out,
																   reachedStatusOid);
				char *reachedStatusName = DatumGetCString(reachedStatusNameDatum);

				Oid desiredStatusOid = BackgroundTaskStatusOid(*desiredStatus);
				Datum desiredStatusNameDatum = DirectFunctionCall1(enum_out,
																   desiredStatusOid);
				char *desiredStatusName = DatumGetCString(desiredStatusNameDatum);

				ereport(ERROR,
						(errmsg("Task reached terminal state \"%s\" instead of desired "
								"state \"%s\"", reachedStatusName, desiredStatusName)));
			}

			/* task has reached its terminal state, done waiting */
			break;
		}

		/* sleep for a while, before rechecking the task status */
		CHECK_FOR_INTERRUPTS();
		const long delay_ms = 100;
		(void) WaitLatch(MyLatch,
						 WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
						 delay_ms,
						 WAIT_EVENT_PG_SLEEP);

		ResetLatch(MyLatch);
	}

	MemoryContextSwitchTo(oldContext);
	MemoryContextDelete(waitContext);
}


/*
 * StartCitusBackgroundTaskQueueMonitor spawns a new background worker connected to the
 * current database and owner. This background worker consumes the tasks that are ready
 * for execution.
 */
BackgroundWorkerHandle *
StartCitusBackgroundTaskQueueMonitor(Oid database, Oid extensionOwner)
{
	BackgroundWorker worker = { 0 };
	BackgroundWorkerHandle *handle = NULL;

	/* configure a worker */
	memset(&worker, 0, sizeof(worker));
	SafeSnprintf(worker.bgw_name, BGW_MAXLEN,
				 "Citus Background Task Queue Monitor: %u/%u",
				 database, extensionOwner);
	worker.bgw_flags =
		BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION;
	worker.bgw_start_time = BgWorkerStart_ConsistentState;

	/* don't restart, we manage restarts from maintenance daemon */
	worker.bgw_restart_time = BGW_NEVER_RESTART;
	strcpy_s(worker.bgw_library_name, sizeof(worker.bgw_library_name), "citus");
	strcpy_s(worker.bgw_function_name, sizeof(worker.bgw_function_name),
			 "CitusBackgroundTaskQueueMonitorMain");
	worker.bgw_main_arg = ObjectIdGetDatum(MyDatabaseId);
	memcpy_s(worker.bgw_extra, sizeof(worker.bgw_extra), &extensionOwner,
			 sizeof(Oid));
	worker.bgw_notify_pid = MyProcPid;

	if (!RegisterDynamicBackgroundWorker(&worker, &handle))
	{
		return NULL;
	}

	pid_t pid;
	WaitForBackgroundWorkerStartup(handle, &pid);

	return handle;
}


/*
 * context for any log/error messages emitted from the background task queue monitor.
 */
typedef struct CitusBackgroundTaskQueueMonitorErrorCallbackContext
{
	const char *database;
} CitusBackgroundTaskQueueMonitorCallbackContext;


/*
 * CitusBackgroundTaskQueueMonitorErrorCallback is a callback handler that gets called
 * for any ereport to add extra context to the message.
 */
static void
CitusBackgroundTaskQueueMonitorErrorCallback(void *arg)
{
	CitusBackgroundTaskQueueMonitorCallbackContext *context =
		(CitusBackgroundTaskQueueMonitorCallbackContext *) arg;
	errcontext("Citus Background Task Queue Monitor: %s", context->database);
}


/*
 * NewExecutorExceedsCitusLimit returns true if we have reached Citus' maximum task
 * executor count.
 */
static bool
NewExecutorExceedsCitusLimit(QueueMonitorExecutionContext *queueMonitorExecutionContext)
{
	if (queueMonitorExecutionContext->currentExecutorCount >= MaxBackgroundTaskExecutors)
	{
		/*
		 * we hit Citus' maximum task executor count. Warn for the first failure
		 * after a successful worker allocation happened, that is, we do not warn if
		 * we repeatedly come here without a successful worker allocation.
		 */
		if (queueMonitorExecutionContext->backgroundWorkerFailedStartTime == 0)
		{
			ereport(WARNING, (errmsg("unable to start background worker for "
									 "background task execution"),
							  errdetail("Already reached the maximum number of task "
										"executors: %ld/%d",
										queueMonitorExecutionContext->currentExecutorCount,
										MaxBackgroundTaskExecutors)));
			queueMonitorExecutionContext->backgroundWorkerFailedStartTime =
				GetCurrentTimestamp();
		}

		return true;
	}

	return false;
}


/*
 * NewExecutorExceedsPgMaxWorkers returns true if we have reached Postgres' max worker
 * count, i.e. a new background worker could not be registered.
 */
static bool
NewExecutorExceedsPgMaxWorkers(BackgroundWorkerHandle *handle,
							   QueueMonitorExecutionContext *queueMonitorExecutionContext)
{
	if (handle == NULL)
	{
		/*
		 * we are unable to start a background worker for the task execution.
		 * Probably we are out of background workers. Warn for the first failure
		 * after a successful worker allocation happened, that is, we do not warn if
		 * we repeatedly come here without a successful worker allocation.
		 */
		if (queueMonitorExecutionContext->backgroundWorkerFailedStartTime == 0)
		{
			ereport(WARNING, (errmsg("unable to start background worker for "
									 "background task execution"),
							  errdetail("Current number of task "
										"executors: %ld/%d",
										queueMonitorExecutionContext->currentExecutorCount,
										MaxBackgroundTaskExecutors)));
			queueMonitorExecutionContext->backgroundWorkerFailedStartTime =
				GetCurrentTimestamp();
		}

		return true;
	}

	return false;
}


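/*
 * Note: a NULL handle here typically means RegisterDynamicBackgroundWorker()
 * could not find a free worker slot. Slots are bounded by the
 * max_worker_processes GUC, so persistent warnings from this path usually
 * call for raising that setting (general background worker behavior, not
 * something this file verifies).
 */
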
/*
 * AssignRunnableTaskToNewExecutor tries to assign the given runnable task to a new
 * task executor. It reports the assignment status as its return value.
 */
static bool
AssignRunnableTaskToNewExecutor(BackgroundTask *runnableTask,
								QueueMonitorExecutionContext *queueMonitorExecutionContext)
{
	Assert(runnableTask && runnableTask->status == BACKGROUND_TASK_STATUS_RUNNABLE);

	if (NewExecutorExceedsCitusLimit(queueMonitorExecutionContext))
	{
		/* escape if we hit the citus executor limit */
		return false;
	}

	char *databaseName = get_database_name(MyDatabaseId);
	char *userName = GetUserNameFromId(runnableTask->owner, false);

	/* try to create a new executor and keep it alive during the queue monitor lifetime */
	MemoryContext oldContext = MemoryContextSwitchTo(queueMonitorExecutionContext->ctx);
	dsm_segment *seg = NULL;
	BackgroundWorkerHandle *handle =
		StartCitusBackgroundTaskExecutor(databaseName, userName, runnableTask->command,
										 runnableTask->taskid, runnableTask->jobid, &seg);
	MemoryContextSwitchTo(oldContext);

	if (NewExecutorExceedsPgMaxWorkers(handle, queueMonitorExecutionContext))
	{
		/* escape if we hit the pg worker limit */
		return false;
	}

	/* assign the allocated executor to the runnable task and increment total executor count */
	bool handleEntryFound = false;
	BackgroundExecutorHashEntry *handleEntry = hash_search(
		queueMonitorExecutionContext->currentExecutors,
		&runnableTask->taskid,
		HASH_ENTER, &handleEntryFound);
	Assert(!handleEntryFound);
	handleEntry->handle = handle;
	handleEntry->seg = seg;
	handleEntry->jobid = runnableTask->jobid;

	/* reset worker allocation timestamp and log time elapsed since the last failure */
	CheckAndResetLastWorkerAllocationFailure(queueMonitorExecutionContext);

	/* keep the message alive during the queue monitor lifetime */
	oldContext = MemoryContextSwitchTo(queueMonitorExecutionContext->ctx);
	handleEntry->message = makeStringInfo();
	MemoryContextSwitchTo(oldContext);

	/* set the runnable task's status to running */
	runnableTask->status = BACKGROUND_TASK_STATUS_RUNNING;
	UpdateBackgroundTask(runnableTask);
	UpdateBackgroundJob(runnableTask->jobid);

	queueMonitorExecutionContext->currentExecutorCount++;

	ereport(LOG, (errmsg("task jobid/taskid started: %ld/%ld",
						 runnableTask->jobid, runnableTask->taskid)));

	return true;
}


/*
 * AssignRunnableTasks tries to assign all runnable tasks to new task executors.
 * It stops when an assignment fails because we hit some limit. We do not load all
 * runnable tasks into memory at once, as that could use a lot of memory and we only
 * have a limited number of workers to assign tasks to.
 */
static void
AssignRunnableTasks(QueueMonitorExecutionContext *queueMonitorExecutionContext)
{
	BackgroundTask *runnableTask = NULL;
	bool taskAssigned = false;
	do {
		/* fetch a runnable task from the catalog */
		runnableTask = GetRunnableBackgroundTask();
		if (runnableTask)
		{
			taskAssigned = AssignRunnableTaskToNewExecutor(runnableTask,
														   queueMonitorExecutionContext);
		}
		else
		{
			taskAssigned = false;
		}
	} while (taskAssigned);
}


/*
 * GetRunningTaskEntries returns a list of BackgroundExecutorHashEntry from the given
 * hash table.
 */
static List *
GetRunningTaskEntries(HTAB *currentExecutors)
{
	List *runningTaskEntries = NIL;

	HASH_SEQ_STATUS status;
	BackgroundExecutorHashEntry *backgroundExecutorHashEntry;
	foreach_htab(backgroundExecutorHashEntry, &status, currentExecutors)
	{
		runningTaskEntries = lappend(runningTaskEntries, backgroundExecutorHashEntry);
	}

	return runningTaskEntries;
}


/*
 * CheckAndResetLastWorkerAllocationFailure checks whether the last background worker
 * allocation failed. If the failure timestamp is set, we log how long we had to wait
 * until a worker could be allocated successfully. It also resets the failure
 * timestamp.
 */
static void
CheckAndResetLastWorkerAllocationFailure(
	QueueMonitorExecutionContext *queueMonitorExecutionContext)
{
	if (queueMonitorExecutionContext->backgroundWorkerFailedStartTime > 0)
	{
		/*
		 * we had a delay in starting the background worker for task execution. Report
		 * the actual delay and reset the time. This allows a subsequent task to
		 * report again if it can't start a background worker directly.
		 */
		long secs = 0;
		int microsecs = 0;
		TimestampDifference(
			queueMonitorExecutionContext->backgroundWorkerFailedStartTime,
			GetCurrentTimestamp(),
			&secs, &microsecs);
		ereport(LOG, (errmsg("able to start a background worker with %ld seconds "
							 "delay", secs)));

		queueMonitorExecutionContext->backgroundWorkerFailedStartTime = 0;
	}
}


/*
 * TaskConcurrentCancelCheck checks whether a concurrent task cancellation or removal
 * has happened, by taking an ExclusiveLock. It mutates the task's pid and status.
 * Returns the execution status for the task.
 */
static TaskExecutionStatus
TaskConcurrentCancelCheck(TaskExecutionContext *taskExecutionContext)
{
	/*
	 * here we take an exclusive lock on the pg_dist_background_task table to prevent
	 * concurrent modification. A separate process could have cancelled or removed
	 * the task by now; it would not see the pid and status update, so it is our
	 * responsibility to stop the backend and update the pid and status.
	 *
	 * The lock will be released on transaction commit.
	 */
	LockRelationOid(DistBackgroundTaskRelationId(), ExclusiveLock);

	BackgroundExecutorHashEntry *handleEntry = taskExecutionContext->handleEntry;
	BackgroundTask *task = GetBackgroundTaskByTaskId(handleEntry->taskid);
	taskExecutionContext->task = task;
	if (!task)
	{
		ereport(ERROR, (errmsg("unexpected missing task id: %ld", handleEntry->taskid)));
	}

	if (task->status == BACKGROUND_TASK_STATUS_CANCELLING)
	{
		/*
		 * reaching this state means that a concurrent cancel or removal happened. We
		 * should mark the task status as cancelled. We also want to reflect the
		 * cancel message by consuming the task executor queue.
		 */
		bool hadError = false;
		ReadFromExecutorQueue(handleEntry, &hadError);

		ereport(LOG, (errmsg("task jobid/taskid is cancelled: %ld/%ld",
							 task->jobid, task->taskid)));

		task->status = BACKGROUND_TASK_STATUS_CANCELLED;

		return TASK_EXECUTION_STATUS_CANCELLED;
	}
	else
	{
		/*
		 * now that we have verified the task has not been cancelled and still exists,
		 * we update it to reflect the new state. If the task is already in running
		 * status, the operation is idempotent. But for runnable tasks, we set their
		 * status to running.
		 */

		pid_t pid = 0;
		GetBackgroundWorkerPid(handleEntry->handle, &pid);
		task->status = BACKGROUND_TASK_STATUS_RUNNING;
		SET_NULLABLE_FIELD(task, pid, pid);

		/* update task status to indicate it is running */
		UpdateBackgroundTask(task);
		UpdateBackgroundJob(task->jobid);

		return TASK_EXECUTION_STATUS_RUNNING;
	}
}


/*
 * ConsumeExecutorQueue consumes the executor's shared memory queue and returns the
 * execution status for the task.
 */
static TaskExecutionStatus
ConsumeExecutorQueue(TaskExecutionContext *taskExecutionContext)
{
	BackgroundExecutorHashEntry *handleEntry = taskExecutionContext->handleEntry;
	BackgroundTask *task = taskExecutionContext->task;

	/*
	 * we consume the task executor response queue.
	 * the possible response codes lead us to the different branches below.
	 */
	bool hadError = false;
	shm_mq_result mq_res = ReadFromExecutorQueue(handleEntry, &hadError);

	if (hadError)
	{
		ereport(LOG, (errmsg("task jobid/taskid failed: %ld/%ld",
							 task->jobid, task->taskid)));

		return TASK_EXECUTION_STATUS_ERROR;
	}
	else if (mq_res == SHM_MQ_DETACHED)
	{
		ereport(LOG, (errmsg("task jobid/taskid succeeded: %ld/%ld",
							 task->jobid, task->taskid)));

		/* update task status as done */
		task->status = BACKGROUND_TASK_STATUS_DONE;

		return TASK_EXECUTION_STATUS_SUCCESS;
	}
	else
	{
		/* the task is still running */
		Assert(mq_res == SHM_MQ_WOULD_BLOCK);
		return TASK_EXECUTION_STATUS_WOULDBLOCK;
	}
}


/*
 * TaskHadError updates the retry count of the failed task inside taskExecutionContext.
 * If the maximum retry count is reached, the task status is marked as failed.
 * Otherwise, a backoff delay is calculated, the notBefore time is updated, and the
 * task is marked as runnable.
 */
static void
TaskHadError(TaskExecutionContext *taskExecutionContext)
{
	BackgroundTask *task = taskExecutionContext->task;

	/*
	 * when we had an error in the response queue, we need to decide if we want to
	 * retry (keep the runnable state), or move to the error state
	 */

	if (!task->retry_count)
	{
		SET_NULLABLE_FIELD(task, retry_count, 1);
	}
	else
	{
		(*task->retry_count)++;
	}

	/*
	 * based on the retry count we either transition the task to its error
	 * state, or we calculate a new backoff time for future execution.
	 */
	int64 delayMs = CalculateBackoffDelay(*(task->retry_count));
	if (delayMs < 0)
	{
		task->status = BACKGROUND_TASK_STATUS_ERROR;
		UNSET_NULLABLE_FIELD(task, not_before);
	}
	else
	{
		TimestampTz notBefore = TimestampTzPlusMilliseconds(
			GetCurrentTimestamp(), delayMs);
		SET_NULLABLE_FIELD(task, not_before, notBefore);

		task->status = BACKGROUND_TASK_STATUS_RUNNABLE;
	}

	TaskEnded(taskExecutionContext);
}


/*
 * TaskEnded updates the task inside taskExecutionContext. It also updates the
 * depending tasks and the job to which the task belongs. At the end, it updates the
 * executor map and count inside queueMonitorExecutionContext after terminating the
 * executor.
 */
static void
TaskEnded(TaskExecutionContext *taskExecutionContext)
{
	QueueMonitorExecutionContext *queueMonitorExecutionContext =
		taskExecutionContext->queueMonitorExecutionContext;

	HTAB *currentExecutors = queueMonitorExecutionContext->currentExecutors;
	BackgroundExecutorHashEntry *handleEntry = taskExecutionContext->handleEntry;
	BackgroundTask *task = taskExecutionContext->task;

	/*
	 * we update the task and job fields. We also update depending tasks.
	 * At the end, do cleanup.
	 */
	UNSET_NULLABLE_FIELD(task, pid);
	task->message = handleEntry->message->data;

	UpdateBackgroundTask(task);
	UpdateDependingTasks(task);
	UpdateBackgroundJob(task->jobid);
	DecrementParallelTaskCountForNodesInvolved(task);

	/* we are sure that at least one task did not block on the current iteration */
	queueMonitorExecutionContext->allTasksWouldBlock = false;

	hash_search(currentExecutors, &task->taskid,
				HASH_REMOVE, NULL);
	WaitForBackgroundWorkerShutdown(handleEntry->handle);
	queueMonitorExecutionContext->currentExecutorCount--;
}


/*
 * IncrementParallelTaskCountForNodesInvolved
 * Checks whether we have reached the limit of parallel tasks per node
 * for each of the nodes involved with the task.
 * If at least one limit is reached, it returns false.
 * If no limit is reached, it increments the parallel task count
 * for each of the nodes involved with the task, and returns true.
 */
bool
IncrementParallelTaskCountForNodesInvolved(BackgroundTask *task)
{
	if (task->nodesInvolved)
	{
		int node;

		/* first check whether we have reached the limit for any of the nodes */
		foreach_int(node, task->nodesInvolved)
		{
			bool found;
			ParallelTasksPerNodeEntry *hashEntry = hash_search(
				ParallelTasksPerNode, &(node), HASH_ENTER, &found);
			if (!found)
			{
				hashEntry->counter = 0;
			}
			else if (hashEntry->counter >= MaxBackgroundTaskExecutorsPerNode)
			{
				/* at least one node's limit is reached */
				return false;
			}
		}

		/* then, increment the parallel task count for each node */
		foreach_int(node, task->nodesInvolved)
		{
			ParallelTasksPerNodeEntry *hashEntry = hash_search(
				ParallelTasksPerNode, &(node), HASH_FIND, NULL);
			Assert(hashEntry);
			hashEntry->counter += 1;
		}
	}

	return true;
}


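/*
 * Illustrative example: with MaxBackgroundTaskExecutorsPerNode = 1, a running
 * task involving nodes {1, 2} holds both counters at 1, so another task that
 * involves node 1 or node 2 is not started until the first task ends and
 * DecrementParallelTaskCountForNodesInvolved releases the slots.
 */
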
/*
 * DecrementParallelTaskCountForNodesInvolved
 * Decrements the parallel task count for each of the nodes involved
 * with the task.
 * We call this function after the task has gone through the Running state
 * and then has ended.
 */
static void
DecrementParallelTaskCountForNodesInvolved(BackgroundTask *task)
{
	if (task->nodesInvolved)
	{
		int node;
		foreach_int(node, task->nodesInvolved)
		{
			ParallelTasksPerNodeEntry *hashEntry = hash_search(ParallelTasksPerNode,
															   &(node),
															   HASH_FIND, NULL);

			hashEntry->counter -= 1;
		}
	}
}


/*
 * QueueMonitorSigHupHandler handles SIGHUP to update monitor related config params.
 */
static void
QueueMonitorSigHupHandler(SIGNAL_ARGS)
{
	int saved_errno = errno;

	GotSighup = true;

	if (MyProc)
	{
		SetLatch(&MyProc->procLatch);
	}

	errno = saved_errno;
}


/*
 * MonitorGotTerminationOrCancellationRequest returns true if the monitor received a
 * SIGTERM or SIGINT signal.
 */
static bool
MonitorGotTerminationOrCancellationRequest(void)
{
	return GotSigterm || GotSigint;
}


/*
 * QueueMonitorSigTermHandler handles SIGTERM by setting a flag to inform the monitor
 * process so that it can terminate active task executors properly. It also sets the
 * latch to wake the monitor if it is waiting on it.
 */
static void
QueueMonitorSigTermHandler(SIGNAL_ARGS)
{
	int saved_errno = errno;

	GotSigterm = true;

	if (MyProc)
	{
		SetLatch(&MyProc->procLatch);
	}

	errno = saved_errno;
}


/*
 * QueueMonitorSigIntHandler handles SIGINT by setting a flag to inform the monitor
 * process so that it can cancel active task executors properly. It also sets the
 * latch to wake the monitor if it is waiting on it.
 */
static void
QueueMonitorSigIntHandler(SIGNAL_ARGS)
{
	int saved_errno = errno;

	GotSigint = true;

	if (MyProc)
	{
		SetLatch(&MyProc->procLatch);
	}

	errno = saved_errno;
}


/*
 * TerminateAllTaskExecutors terminates the task executors given in the hash map.
 */
static void
TerminateAllTaskExecutors(HTAB *currentExecutors)
{
	HASH_SEQ_STATUS status;
	BackgroundExecutorHashEntry *backgroundExecutorHashEntry;
	foreach_htab(backgroundExecutorHashEntry, &status, currentExecutors)
	{
		TerminateBackgroundWorker(backgroundExecutorHashEntry->handle);
	}
}


/*
 * GetRunningUniqueJobIds returns the unique job ids from currentExecutors.
 */
static HTAB *
GetRunningUniqueJobIds(HTAB *currentExecutors)
{
	/* create a set to store unique job ids for currently executing tasks */
	HTAB *uniqueJobIds = CreateSimpleHashSetWithSize(int64, MAX_BG_TASK_EXECUTORS);

	HASH_SEQ_STATUS status;
	BackgroundExecutorHashEntry *backgroundExecutorHashEntry;
	foreach_htab(backgroundExecutorHashEntry, &status, currentExecutors)
	{
		hash_search(uniqueJobIds, &backgroundExecutorHashEntry->jobid, HASH_ENTER, NULL);
	}

	return uniqueJobIds;
}


/*
 * CancelAllTaskExecutors cancels the task executors given in the hash map.
 */
static void
CancelAllTaskExecutors(HTAB *currentExecutors)
{
	StartTransactionCommand();
	PushActiveSnapshot(GetTransactionSnapshot());

	/* get the unique job id set for running tasks in currentExecutors */
	HTAB *uniqueJobIds = GetRunningUniqueJobIds(currentExecutors);

	HASH_SEQ_STATUS status;
	int64 *uniqueJobId;
	foreach_htab(uniqueJobId, &status, uniqueJobIds)
	{
		ereport(DEBUG1, (errmsg("cancelling job: %ld", *uniqueJobId)));
		Datum jobidDatum = Int64GetDatum(*uniqueJobId);
		DirectFunctionCall1(citus_job_cancel, jobidDatum);
	}

	PopActiveSnapshot();
	CommitTransactionCommand();
}


/*
 * CitusBackgroundTaskQueueMonitorMain is the main entry point for the background worker
 * running the background tasks queue monitor.
 *
 * Its main loop reads a runnable task from pg_dist_background_task and progresses the
 * task and job state machines associated with the task. When no new task can be found
 * it will exit(0) and let the maintenance daemon poll for new tasks.
 *
 * The main loop is implemented as an asynchronous loop stepping through the tasks,
 * updating their state before going to the next. The loop assigns runnable tasks to
 * new task executors as much as possible. If the max task executor limit is hit, the
 * tasks will wait in runnable status until currently running tasks finish. Each
 * parallel worker executes one task at a time; the workers avoid blocking each other
 * by using a nonblocking API.
 */
void
CitusBackgroundTaskQueueMonitorMain(Datum arg)
{
	/* handle SIGTERM to properly terminate active task executors */
	pqsignal(SIGTERM, QueueMonitorSigTermHandler);

	/* handle SIGINT to properly cancel active task executors */
	pqsignal(SIGINT, QueueMonitorSigIntHandler);

	/* handle SIGHUP to update MaxBackgroundTaskExecutors and MaxBackgroundTaskExecutorsPerNode */
	pqsignal(SIGHUP, QueueMonitorSigHupHandler);

	/* ready to handle signals */
	BackgroundWorkerUnblockSignals();

	Oid databaseOid = DatumGetObjectId(arg);

	/* extension owner is passed via bgw_extra */
	Oid extensionOwner = InvalidOid;
	memcpy_s(&extensionOwner, sizeof(extensionOwner),
			 MyBgworkerEntry->bgw_extra, sizeof(Oid));

	/* connect to database, after that we can actually access catalogs */
	BackgroundWorkerInitializeConnectionByOid(databaseOid, extensionOwner, 0);

	/*
	 * save the old context until the monitor loop exits; we use
	 * backgroundTaskContext for all allocations.
	 */
	MemoryContext firstContext = CurrentMemoryContext;
	MemoryContext backgroundTaskContext = AllocSetContextCreate(TopMemoryContext,
																"BackgroundTaskContext",
																ALLOCSET_DEFAULT_SIZES);

	StartTransactionCommand();
	PushActiveSnapshot(GetTransactionSnapshot());

	const char *databasename = get_database_name(MyDatabaseId);

	/* keep databasename alive during the queue monitor lifetime */
	MemoryContext oldContext = MemoryContextSwitchTo(backgroundTaskContext);
	databasename = pstrdup(databasename);
	MemoryContextSwitchTo(oldContext);

	/* setup error context to indicate the errors came from a running background task */
	ErrorContextCallback errorCallback = { 0 };
	struct CitusBackgroundTaskQueueMonitorErrorCallbackContext context = {
		.database = databasename,
	};
	errorCallback.callback = CitusBackgroundTaskQueueMonitorErrorCallback;
	errorCallback.arg = (void *) &context;
	errorCallback.previous = error_context_stack;
	error_context_stack = &errorCallback;

	PopActiveSnapshot();
	CommitTransactionCommand();

	/*
	 * There should be exactly one background task monitor running. Running multiple
	 * would cause conflicts on processing the tasks in the catalog table as well as
	 * violate parallelism guarantees. To make sure at most one backend is running,
	 * we take a session lock on the CITUS_BACKGROUND_TASK_MONITOR operation.
	 */
	LOCKTAG tag = { 0 };
	SET_LOCKTAG_CITUS_OPERATION(tag, CITUS_BACKGROUND_TASK_MONITOR);
	const bool sessionLock = true;
	const bool dontWait = true;
	LockAcquireResult locked =
		LockAcquire(&tag, AccessExclusiveLock, sessionLock, dontWait);
	if (locked == LOCKACQUIRE_NOT_AVAIL)
	{
		ereport(ERROR, (errmsg("background task queue monitor already running for "
							   "database")));
	}

	/* make worker recognizable in pg_stat_activity */
	pgstat_report_appname("citus background task queue monitor");

	ereport(DEBUG1, (errmsg("started citus background task queue monitor")));

	/*
	 * First we find all jobs that are running; we need to check if they are still
	 * running and, if not, reset their state back to scheduled.
	 */
	StartTransactionCommand();
	PushActiveSnapshot(GetTransactionSnapshot());

	ResetRunningBackgroundTasks();

	PopActiveSnapshot();
	CommitTransactionCommand();

	/* create a map to store parallel task executors. Persist it in the monitor memory context */
	oldContext = MemoryContextSwitchTo(backgroundTaskContext);
	HTAB *currentExecutors = CreateSimpleHashWithNameAndSize(int64,
															 BackgroundExecutorHashEntry,
															 "Background Executor Hash",
															 MAX_BG_TASK_EXECUTORS);
	MemoryContextSwitchTo(oldContext);

	/*
	 * monitor execution context that is useful during the monitor loop.
	 * we store the current executor count, the last background failure timestamp,
	 * the currently executed task context and also a memory context to persist
	 * some allocations throughout the loop.
	 */
	QueueMonitorExecutionContext queueMonitorExecutionContext = {
		.currentExecutorCount = 0,
		.backgroundWorkerFailedStartTime = 0,
		.allTasksWouldBlock = true,
		.currentExecutors = currentExecutors,
		.ctx = backgroundTaskContext
	};

	/* flags to prevent duplicate termination and cancellation of task executors */
	bool terminateExecutorsStarted = false;
	bool cancelExecutorsStarted = false;

	/* the loop exits if there are no running or runnable tasks left */
	bool hasAnyTask = true;
	while (hasAnyTask)
	{
		/* handle signals */
		CHECK_FOR_INTERRUPTS();

		/* invalidate cache for new data in catalog */
		InvalidateMetadataSystemCache();

		/*
		 * if the flag is set, we should terminate all task executor workers to prevent
		 * duplicate runs of the same task on the next start of the monitor, which is
		 * dangerous for non-idempotent tasks. We do not break the loop here as we want
		 * to reflect tasks' messages. Hence, we wait until all tasks finish and also
		 * do not allow new runnable tasks to start running. After all current tasks
		 * finish, we can exit the loop safely.
		 */
		if (GotSigterm && !terminateExecutorsStarted)
		{
			ereport(LOG, (errmsg("handling termination signal")));
			terminateExecutorsStarted = true;
			TerminateAllTaskExecutors(queueMonitorExecutionContext.currentExecutors);
		}

		if (GotSigint && !cancelExecutorsStarted)
		{
			ereport(LOG, (errmsg("handling cancellation signal")));
			cancelExecutorsStarted = true;
			CancelAllTaskExecutors(queueMonitorExecutionContext.currentExecutors);
		}

		if (GotSighup)
		{
			GotSighup = false;

			/* update max_background_task_executors and max_background_task_executors_per_node if changed */
			ProcessConfigFile(PGC_SIGHUP);
		}

		if (ParallelTasksPerNode == NULL)
		{
			ParallelTasksPerNode = CreateSimpleHash(int32, ParallelTasksPerNodeEntry);
		}

		/* assign runnable tasks, if any, to new task executors in a transaction if we did not get SIGTERM or SIGINT */
		if (!MonitorGotTerminationOrCancellationRequest())
		{
			StartTransactionCommand();
			PushActiveSnapshot(GetTransactionSnapshot());
			AssignRunnableTasks(&queueMonitorExecutionContext);
			PopActiveSnapshot();
			CommitTransactionCommand();
		}

		/* get running task entries from the hash table */
		List *runningTaskEntries = GetRunningTaskEntries(
			queueMonitorExecutionContext.currentExecutors);
		hasAnyTask = list_length(runningTaskEntries) > 0;

		/* useful to sleep if all tasks would block on the current iteration */
		queueMonitorExecutionContext.allTasksWouldBlock = true;

		/* monitor executors inside a transaction */
		StartTransactionCommand();
		PushActiveSnapshot(GetTransactionSnapshot());

		/* iterate over all handle entries and monitor each task's output */
		BackgroundExecutorHashEntry *handleEntry = NULL;
		foreach_ptr(handleEntry, runningTaskEntries)
		{
			/* create a task execution context and assign it to queueMonitorExecutionContext */
			TaskExecutionContext taskExecutionContext = {
				.queueMonitorExecutionContext = &queueMonitorExecutionContext,
				.handleEntry = handleEntry,
				.task = NULL
			};

			/* check if concurrent cancellation occurred */
			TaskExecutionStatus taskExecutionStatus = TaskConcurrentCancelCheck(
				&taskExecutionContext);

			/*
			 * check the task status. If it is cancelled, we do not need to consume
			 * the queue as we already consumed the queue.
			 */
			if (taskExecutionStatus == TASK_EXECUTION_STATUS_CANCELLED)
			{
				TaskEnded(&taskExecutionContext);
				continue;
			}

			taskExecutionStatus = ConsumeExecutorQueue(&taskExecutionContext);
			if (taskExecutionStatus == TASK_EXECUTION_STATUS_ERROR)
			{
				TaskHadError(&taskExecutionContext);
			}
			else if (taskExecutionStatus == TASK_EXECUTION_STATUS_SUCCESS)
			{
				TaskEnded(&taskExecutionContext);
			}
		}

		PopActiveSnapshot();
		CommitTransactionCommand();

		if (queueMonitorExecutionContext.allTasksWouldBlock)
		{
			/*
			 * sleep to lower cpu consumption if all tasks responded with
			 * SHM_MQ_WOULD_BLOCK on the last iteration. That also lets those tasks
			 * progress and probably generate some output.
			 */
			const long delay_ms = 1000;
			(void) WaitLatch(MyLatch, WL_LATCH_SET | WL_TIMEOUT |
							 WL_EXIT_ON_PM_DEATH,
							 delay_ms, WAIT_EVENT_PG_SLEEP);
			ResetLatch(MyLatch);
		}
	}

	MemoryContextSwitchTo(firstContext);
	MemoryContextDelete(backgroundTaskContext);

	proc_exit(0);
}


/*
 * ReadFromExecutorQueue reads from the task executor's response queue into the message.
 * It also sets the hadError flag if an error response is encountered in the queue.
 */
static shm_mq_result
ReadFromExecutorQueue(BackgroundExecutorHashEntry *backgroundExecutorHashEntry,
					  bool *hadError)
{
	dsm_segment *seg = backgroundExecutorHashEntry->seg;
	shm_toc *toc = shm_toc_attach(CITUS_BACKGROUND_TASK_MAGIC,
								  dsm_segment_address(seg));
	shm_mq *mq = shm_toc_lookup(toc, CITUS_BACKGROUND_TASK_KEY_QUEUE, false);
	shm_mq_handle *responseq = shm_mq_attach(mq, seg, NULL);

	/*
	 * Consume the background executor's queue and get a response code.
	 */
	StringInfo message = backgroundExecutorHashEntry->message;
	shm_mq_result mq_res = ConsumeTaskWorkerOutput(responseq, message, hadError);
	return mq_res;
}


/*
 * CalculateBackoffDelay calculates the time to back off between retries.
 *
 * Per retry we increase the delay as follows:
 * retry 1: 5 sec
 * retry 2: 20 sec
 * retry 3-32 (30 tries in total): 1 min
 *
 * returns -1 when retrying should stop.
 *
 * In the future we would like a callback on the job_type that could
 * distinguish the retry count and delay + potential jitter on a
 * job_type basis. For now we only assume this to be used by the
 * rebalancer and settled on the retry scheme above.
 */
static int64
CalculateBackoffDelay(int retryCount)
{
	if (retryCount == 1)
	{
		return 5 * 1000;
	}
	else if (retryCount == 2)
	{
		return 20 * 1000;
	}
	else if (retryCount <= 32)
	{
		return 60 * 1000;
	}
	return -1;
}


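/*
 * Worked example of the schedule above: a task that fails on every attempt
 * waits 5s + 20s + 30 * 60s = 1825s (~30.4 minutes) in total, after which
 * CalculateBackoffDelay returns -1 on retry 33 and the task moves to its
 * error state.
 */
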
#if PG_VERSION_NUM < PG_VERSION_15
static const char *
error_severity(int elevel)
{
	const char *prefix;

	switch (elevel)
	{
		case DEBUG1:
		case DEBUG2:
		case DEBUG3:
		case DEBUG4:
		case DEBUG5:
		{
			prefix = gettext_noop("DEBUG");
			break;
		}

		case LOG:
		case LOG_SERVER_ONLY:
		{
			prefix = gettext_noop("LOG");
			break;
		}

		case INFO:
		{
			prefix = gettext_noop("INFO");
			break;
		}

		case NOTICE:
		{
			prefix = gettext_noop("NOTICE");
			break;
		}

		case WARNING:
		{
			prefix = gettext_noop("WARNING");
			break;
		}

		case WARNING_CLIENT_ONLY:
		{
			prefix = gettext_noop("WARNING");
			break;
		}

		case ERROR:
		{
			prefix = gettext_noop("ERROR");
			break;
		}

		case FATAL:
		{
			prefix = gettext_noop("FATAL");
			break;
		}

		case PANIC:
		{
			prefix = gettext_noop("PANIC");
			break;
		}

		default:
		{
			prefix = "???";
			break;
		}
	}

	return prefix;
}


#endif


/*
 * bgw_generate_returned_message -
 *	  generates the message to be inserted into the job_run_details table
 *	  the first part is coming from error_severity (elog.c)
 */
static void
bgw_generate_returned_message(StringInfoData *display_msg, ErrorData edata)
{
	const char *prefix = error_severity(edata.elevel);
	appendStringInfo(display_msg, "%s: %s", prefix, edata.message);
	if (edata.detail != NULL)
	{
		appendStringInfo(display_msg, "\nDETAIL: %s", edata.detail);
	}

	if (edata.hint != NULL)
	{
		appendStringInfo(display_msg, "\nHINT: %s", edata.hint);
	}

	if (edata.context != NULL)
	{
		appendStringInfo(display_msg, "\nCONTEXT: %s", edata.context);
	}
}


/*
 * UpdateDependingTasks updates all depending tasks, based on the type of terminal state
 * the current task reached.
 */
static void
UpdateDependingTasks(BackgroundTask *task)
{
	switch (task->status)
	{
		case BACKGROUND_TASK_STATUS_DONE:
		{
			UnblockDependingBackgroundTasks(task);
			break;
		}

		case BACKGROUND_TASK_STATUS_ERROR:
		{
			/* when this task errors, we need to unschedule all dependent tasks */
			UnscheduleDependentTasks(task);
			break;
		}

		default:
		{
			/* nothing to do for other states */
			break;
		}
	}
}


/*
 * ConsumeTaskWorkerOutput consumes the output of an executor and sets the message as
 * the last message read from the queue. It also sets hadError to true if the executor
 * had an error.
 */
static shm_mq_result
ConsumeTaskWorkerOutput(shm_mq_handle *responseq, StringInfo message, bool *hadError)
{
	shm_mq_result res;

	/*
	 * Message-parsing routines operate on a null-terminated StringInfo,
	 * so we must construct one.
	 */
	StringInfoData msg = { 0 };
	initStringInfo(&msg);

	for (;;)
	{
		resetStringInfo(&msg);

		/*
		 * non-blocking receive to not block other bg workers
		 */
		Size nbytes = 0;
		void *data = NULL;
		const bool noWait = true;
		res = shm_mq_receive(responseq, &nbytes, &data, noWait);

		if (res != SHM_MQ_SUCCESS)
		{
			break;
		}

		appendBinaryStringInfo(&msg, data, nbytes);

		/*
		 * The message types are documented at
		 * https://www.postgresql.org/docs/current/protocol-message-formats.html
		 *
		 * Here we mostly handle the same message types as supported in pg_cron as the
		 * executor is highly influenced by the implementation there.
		 */
		char msgtype = pq_getmsgbyte(&msg);
		switch (msgtype)
		{
			case 'E': /* ErrorResponse */
			{
				if (hadError)
				{
					*hadError = true;
				}
			}

			/* FALLTHROUGH */

			case 'N': /* NoticeResponse */
			{
				ErrorData edata = { 0 };
				StringInfoData display_msg = { 0 };

				pq_parse_errornotice(&msg, &edata);
				initStringInfo(&display_msg);
				bgw_generate_returned_message(&display_msg, edata);

				/* we keep only the last message */
				resetStringInfo(message);
				appendStringInfoString(message, display_msg.data);
				appendStringInfoChar(message, '\n');

				pfree(display_msg.data);

				break;
			}

			case 'C': /* CommandComplete */
			{
				const char *tag = pq_getmsgstring(&msg);

				char *nonconst_tag = pstrdup(tag);

				/* append the nonconst_tag to the task's message */
				appendStringInfoString(message, nonconst_tag);
				appendStringInfoChar(message, '\n');

				pfree(nonconst_tag);

				break;
			}

			case 'A':
			case 'D':
			case 'G':
			case 'H':
			case 'T':
			case 'W':
			case 'Z':
			{
				break;
			}

			default:
			{
				elog(WARNING, "unknown message type: %c (%zu bytes)",
					 msg.data[0], nbytes);
				break;
			}
		}
	}

	pfree(msg.data);
	return res;
}


/*
 * StoreArgumentsInDSM creates a dynamic shared memory segment to pass the query and its
 * environment to the executor.
 */
static dsm_segment *
StoreArgumentsInDSM(char *database, char *username, char *command,
					int64 taskId, int64 jobId)
{
	/*
	 * Create the shared memory that we will pass to the background
	 * worker process. We use DSM_CREATE_NULL_IF_MAXSEGMENTS so that we
	 * do not ERROR here. This way, we can mark the job as failed and
	 * keep the launcher process running normally.
	 */
	shm_toc_estimator e = { 0 };
	shm_toc_initialize_estimator(&e);
	shm_toc_estimate_chunk(&e, strlen(database) + 1);
	shm_toc_estimate_chunk(&e, strlen(username) + 1);
	shm_toc_estimate_chunk(&e, strlen(command) + 1);
#define QUEUE_SIZE ((Size) 65536)
	shm_toc_estimate_chunk(&e, QUEUE_SIZE);
	shm_toc_estimate_chunk(&e, sizeof(int64));
	shm_toc_estimate_chunk(&e, sizeof(int64));
	shm_toc_estimate_keys(&e, CITUS_BACKGROUND_TASK_NKEYS);
	Size segsize = shm_toc_estimate(&e);

	dsm_segment *seg = dsm_create(segsize, DSM_CREATE_NULL_IF_MAXSEGMENTS);

	if (seg == NULL)
	{
		ereport(ERROR,
				(errmsg("the maximum number of DSM segments may have been reached")));

		return NULL;
	}

	shm_toc *toc = shm_toc_create(CITUS_BACKGROUND_TASK_MAGIC, dsm_segment_address(seg),
								  segsize);

	Size size = strlen(database) + 1;
	char *databaseTarget = shm_toc_allocate(toc, size);
	strcpy_s(databaseTarget, size, database);
	shm_toc_insert(toc, CITUS_BACKGROUND_TASK_KEY_DATABASE, databaseTarget);

	size = strlen(username) + 1;
	char *usernameTarget = shm_toc_allocate(toc, size);
	strcpy_s(usernameTarget, size, username);
	shm_toc_insert(toc, CITUS_BACKGROUND_TASK_KEY_USERNAME, usernameTarget);

	size = strlen(command) + 1;
	char *commandTarget = shm_toc_allocate(toc, size);
	strcpy_s(commandTarget, size, command);
	shm_toc_insert(toc, CITUS_BACKGROUND_TASK_KEY_COMMAND, commandTarget);

	shm_mq *mq = shm_mq_create(shm_toc_allocate(toc, QUEUE_SIZE), QUEUE_SIZE);
	shm_toc_insert(toc, CITUS_BACKGROUND_TASK_KEY_QUEUE, mq);
	shm_mq_set_receiver(mq, MyProc);

	int64 *taskIdTarget = shm_toc_allocate(toc, sizeof(int64));
	*taskIdTarget = taskId;
	shm_toc_insert(toc, CITUS_BACKGROUND_TASK_KEY_TASK_ID, taskIdTarget);

	int64 *jobIdTarget = shm_toc_allocate(toc, sizeof(int64));
	*jobIdTarget = jobId;
	shm_toc_insert(toc, CITUS_BACKGROUND_TASK_KEY_JOB_ID, jobIdTarget);

	shm_mq_attach(mq, seg, NULL);

	/*
	 * when CurrentResourceOwner != NULL, the segment would be released upon
	 * CurrentResourceOwner release, but we may consume the queue in the segment even
	 * after CurrentResourceOwner is released. 'dsm_pin_mapping' keeps the segment
	 * mapped until the session ends or the segment is detached explicitly via
	 * 'dsm_detach'.
	 */
	dsm_pin_mapping(seg);

	return seg;
}


/*
 * StartCitusBackgroundTaskExecutor starts a new background worker for the execution of
 * a background task. Callers interested in the shared memory segment that is created
 * between the background worker and the current backend can pass in a pSegment to get
 * a pointer to the dynamic shared memory.
 */
static BackgroundWorkerHandle *
StartCitusBackgroundTaskExecutor(char *database, char *user, char *command,
								 int64 taskId, int64 jobId, dsm_segment **pSegment)
{
	dsm_segment *seg = StoreArgumentsInDSM(database, user, command, taskId, jobId);

	/* configure a worker */
	BackgroundWorker worker = { 0 };
	memset(&worker, 0, sizeof(worker));
	SafeSnprintf(worker.bgw_name, BGW_MAXLEN,
				 "Citus Background Task Queue Executor: %s/%s for (%ld/%ld)",
				 database, user, jobId, taskId);
	worker.bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION;
	worker.bgw_start_time = BgWorkerStart_ConsistentState;

	/* don't restart, we manage restarts from maintenance daemon */
	worker.bgw_restart_time = BGW_NEVER_RESTART;
	strcpy_s(worker.bgw_library_name, sizeof(worker.bgw_library_name), "citus");
	strcpy_s(worker.bgw_function_name, sizeof(worker.bgw_function_name),
			 "CitusBackgroundTaskExecutor");
	worker.bgw_main_arg = UInt32GetDatum(dsm_segment_handle(seg));
	worker.bgw_notify_pid = MyProcPid;

	BackgroundWorkerHandle *handle = NULL;
	if (!RegisterDynamicBackgroundWorker(&worker, &handle))
	{
		dsm_detach(seg);
		return NULL;
	}

	pid_t pid = { 0 };
	WaitForBackgroundWorkerStartup(handle, &pid);

	if (pSegment)
	{
		*pSegment = seg;
	}

	return handle;
}


/*
|
|
* context for any log/error messages emitted from the background task executor.
|
|
*/
|
|
typedef struct CitusBackgroundJobExecutorErrorCallbackContext
|
|
{
|
|
const char *database;
|
|
const char *username;
|
|
int64 taskId;
|
|
int64 jobId;
|
|
} CitusBackgroundJobExecutorErrorCallbackContext;


/*
 * CitusBackgroundJobExecutorErrorCallback is a callback handler that gets
 * called for any ereport to add extra context to the message.
 */
static void
CitusBackgroundJobExecutorErrorCallback(void *arg)
{
	CitusBackgroundJobExecutorErrorCallbackContext *context =
		(CitusBackgroundJobExecutorErrorCallbackContext *) arg;
	errcontext("Citus Background Task Queue Executor: %s/%s for (" INT64_FORMAT
			   "/" INT64_FORMAT ")",
			   context->database, context->username,
			   context->jobId, context->taskId);
}
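

/*
 * With the callback above installed, any message emitted while a task runs
 * carries a context line built from the format string above, along these
 * (illustrative) lines:
 *
 *   CONTEXT:  Citus Background Task Queue Executor: postgres/some_user for (3/17)
 */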


/*
 * CitusBackgroundTaskExecutor is the main function of the background tasks
 * queue executor. This backend attaches to a shared memory segment as
 * identified by the main_arg of the background worker.
 *
 * This is mostly based on the background worker logic in pg_cron.
 */
void
CitusBackgroundTaskExecutor(Datum main_arg)
{
	/* handle SIGTERM the same way regular backends do */
	pqsignal(SIGTERM, die);
	BackgroundWorkerUnblockSignals();

	/* Attach to the dynamic shared memory segment created by the monitor. */
	dsm_segment *seg = dsm_attach(DatumGetUInt32(main_arg));
	if (seg == NULL)
	{
		ereport(ERROR,
				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
				 errmsg("unable to map dynamic shared memory segment")));
	}

	shm_toc *toc = shm_toc_attach(CITUS_BACKGROUND_TASK_MAGIC, dsm_segment_address(seg));
	if (toc == NULL)
	{
		ereport(ERROR,
				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
				 errmsg("bad magic number in dynamic shared memory segment")));
	}

	char *database = shm_toc_lookup(toc, CITUS_BACKGROUND_TASK_KEY_DATABASE, false);
	char *username = shm_toc_lookup(toc, CITUS_BACKGROUND_TASK_KEY_USERNAME, false);
	char *command = shm_toc_lookup(toc, CITUS_BACKGROUND_TASK_KEY_COMMAND, false);
	int64 *taskId = shm_toc_lookup(toc, CITUS_BACKGROUND_TASK_KEY_TASK_ID, false);
	int64 *jobId = shm_toc_lookup(toc, CITUS_BACKGROUND_TASK_KEY_JOB_ID, false);
	shm_mq *mq = shm_toc_lookup(toc, CITUS_BACKGROUND_TASK_KEY_QUEUE, false);

	shm_mq_set_sender(mq, MyProc);
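
	/*
	 * Attach to the queue and redirect this backend's protocol output into
	 * it: from here on, anything that would normally be sent to a client
	 * (errors, notices, command completion tags) goes into the shared memory
	 * queue, where the monitor consumes it.
	 */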
	shm_mq_handle *responseq = shm_mq_attach(mq, seg, NULL);
	pq_redirect_to_shm_mq(seg, responseq);

	/* setup error context to indicate the errors came from a running background task */
	ErrorContextCallback errorCallback = { 0 };
	CitusBackgroundJobExecutorErrorCallbackContext context = {
		.database = database,
		.username = username,
		.taskId = *taskId,
		.jobId = *jobId,
	};
	errorCallback.callback = CitusBackgroundJobExecutorErrorCallback;
	errorCallback.arg = (void *) &context;
	errorCallback.previous = error_context_stack;
	error_context_stack = &errorCallback;

	BackgroundWorkerInitializeConnection(database, username, 0);

	/* make sure we are the only backend running for this task */
	LOCKTAG locktag = { 0 };
	SET_LOCKTAG_BACKGROUND_TASK(locktag, *taskId);
	const bool sessionLock = true;
	const bool dontWait = true;
	LockAcquireResult locked =
		LockAcquire(&locktag, AccessExclusiveLock, sessionLock, dontWait);
	if (locked == LOCKACQUIRE_NOT_AVAIL)
	{
		ereport(ERROR, (errmsg("unable to acquire background task lock for "
							   "taskId: " INT64_FORMAT, *taskId),
						errdetail("this indicates that another backend is already "
								  "executing this task")));
	}

	/* Execute the query. */
	StartTransactionCommand();
	ExecuteSqlString(command);
	CommitTransactionCommand();

	/* Signal that we are done. */
	ReadyForQuery(DestRemote);

	dsm_detach(seg);
	proc_exit(0);
}
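

/*
 * Monitor-side sketch (illustrative, heavily simplified compared to the real
 * consumption logic in this file): the monitor, as the queue's receiver,
 * drains the messages the executor writes until the executor detaches. Here
 * responseq stands for the monitor's own handle to the queue.
 *
 *   for (;;)
 *   {
 *       Size nbytes = 0;
 *       void *data = NULL;
 *       const bool noWait = false;
 *       shm_mq_result res = shm_mq_receive(responseq, &nbytes, &data, noWait);
 *       if (res != SHM_MQ_SUCCESS)
 *       {
 *           break;    // SHM_MQ_DETACHED: the executor exited
 *       }
 *       // each message is a frontend protocol message ('E', 'N', 'C', ...)
 *   }
 */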


/*
 * Execute the given SQL string without SPI or a libpq session.
 */
static void
ExecuteSqlString(const char *sql)
{
	/*
	 * Parse the SQL string into a list of raw parse trees.
	 *
	 * Because we allow statements that perform internal transaction control,
	 * we can't do this in TopTransactionContext; the parse trees might get
	 * blown away before we're done executing them.
	 */
	MemoryContext parsecontext = AllocSetContextCreate(CurrentMemoryContext,
													   "query parse/plan",
													   ALLOCSET_DEFAULT_MINSIZE,
													   ALLOCSET_DEFAULT_INITSIZE,
													   ALLOCSET_DEFAULT_MAXSIZE);
	MemoryContext oldcontext = MemoryContextSwitchTo(parsecontext);
	List *raw_parsetree_list = pg_parse_query(sql);
	int commands_remaining = list_length(raw_parsetree_list);
	bool isTopLevel = commands_remaining == 1;
	MemoryContextSwitchTo(oldcontext);

	/*
	 * Do parse analysis, rule rewrite, planning, and execution for each raw
	 * parsetree. We must fully execute each query before beginning parse
	 * analysis on the next one, since there may be interdependencies.
	 */
	RawStmt *parsetree = NULL;
	foreach_ptr(parsetree, raw_parsetree_list)
	{
		/*
		 * We don't allow transaction-control commands like COMMIT and ABORT
		 * here. The entire SQL statement is executed as a single transaction
		 * which commits if no errors are encountered.
		 */
		if (IsA(parsetree, TransactionStmt))
		{
			ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
							errmsg("transaction control statements are not "
								   "allowed in background job")));
		}

		/*
		 * Get the command name for use in status display (it also becomes the
		 * default completion tag, down inside PortalRun). Set ps_status and
		 * do any special start-of-SQL-command processing needed by the
		 * destination.
		 */
		CommandTag commandTag = CreateCommandTag(parsetree->stmt);
		set_ps_display(GetCommandTagName(commandTag));
		BeginCommand(commandTag, DestNone);

		/* Set up a snapshot if parse analysis/planning will need one. */
		bool snapshot_set = false;
		if (analyze_requires_snapshot(parsetree))
		{
			PushActiveSnapshot(GetTransactionSnapshot());
			snapshot_set = true;
		}

		/*
		 * OK to analyze, rewrite, and plan this query.
		 *
		 * As with parsing, we need to make sure this data outlives the
		 * transaction, because of the possibility that the statement might
		 * perform internal transaction control.
		 */
		oldcontext = MemoryContextSwitchTo(parsecontext);

#if PG_VERSION_NUM >= 150000
		List *querytree_list =
			pg_analyze_and_rewrite_fixedparams(parsetree, sql, NULL, 0, NULL);
#else
		List *querytree_list =
			pg_analyze_and_rewrite(parsetree, sql, NULL, 0, NULL);
#endif

		List *plantree_list = pg_plan_queries(querytree_list, sql, 0, NULL);

		/* Done with the snapshot used for parsing/planning */
		if (snapshot_set)
		{
			PopActiveSnapshot();
		}

		/* If we got a cancel signal in analysis or planning, quit */
		CHECK_FOR_INTERRUPTS();

		/*
		 * Execute the query using the unnamed portal.
		 */
		Portal portal = CreatePortal("", true, true);

		/* Don't display the portal in pg_cursors */
		portal->visible = false;
		PortalDefineQuery(portal, NULL, sql, commandTag, plantree_list, NULL);
		PortalStart(portal, NULL, 0, InvalidSnapshot);
		int16 format[] = { 1 };
		PortalSetResultFormat(portal, lengthof(format), format); /* binary format */

		commands_remaining--;
		DestReceiver *receiver = CreateDestReceiver(DestNone);

		/*
		 * Only once the portal and destreceiver have been established can
		 * we return to the transaction context. All that stuff needs to
		 * survive an internal commit inside PortalRun!
		 */
		MemoryContextSwitchTo(oldcontext);

		/* Here's where we actually execute the command. */
		QueryCompletion qc = { 0 };
		(void) PortalRun(portal, FETCH_ALL, isTopLevel, true, receiver, receiver, &qc);

		/* Clean up the receiver. */
		(*receiver->rDestroy)(receiver);

		/*
		 * Send a CommandComplete message even if we suppressed the query
		 * results. The user backend will report these in the absence of
		 * any true query results.
		 */
		EndCommand(&qc, DestRemote, false);

		/* Clean up the portal. */
		PortalDrop(portal, false);
	}

	/* Be sure to advance the command counter after the last script command */
	CommandCounterIncrement();
}
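

/*
 * Illustrative example (not part of the original file): a task command may
 * consist of several statements, e.g.
 *
 *   CREATE TABLE results (a int); INSERT INTO results VALUES (1);
 *
 * ExecuteSqlString runs them one after another inside the single transaction
 * opened by its caller; an explicit COMMIT or ROLLBACK in the string is
 * rejected with the FEATURE_NOT_SUPPORTED error above.
 */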