diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index 9854ecc34..81ad43f38 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -1690,6 +1690,24 @@ RegisterCitusConfigVariables(void) GUC_STANDARD, NULL, NULL, NULL); + DefineCustomIntVariable( + "citus.max_background_task_executors", + gettext_noop( + "Sets the maximum number of parallel task executor workers for scheduled " + "background tasks"), + gettext_noop( + "Controls the maximum number of parallel task executors the task monitor " + "can create for scheduled background tasks. Note that the value is not effective " + "if it is set a value higher than 'max_worker_processes' postgres parameter . It is " + "also not guaranteed to have exactly specified number of parallel task executors " + "because total background worker count is shared by all background workers. The value " + "represents the possible maximum number of task executors."), + &MaxBackgroundTaskExecutors, + 4, 1, MAX_BG_TASK_EXECUTORS, + PGC_SIGHUP, + GUC_STANDARD, + NULL, NULL, NULL); + DefineCustomIntVariable( "citus.max_cached_connection_lifetime", gettext_noop("Sets the maximum lifetime of cached connections to other nodes."), diff --git a/src/backend/distributed/utils/background_jobs.c b/src/backend/distributed/utils/background_jobs.c index 524bcdb2a..45f7a842f 100644 --- a/src/backend/distributed/utils/background_jobs.c +++ b/src/backend/distributed/utils/background_jobs.c @@ -31,6 +31,7 @@ #include "access/xact.h" #include "commands/dbcommands.h" +#include "common/hashfn.h" #include "libpq-fe.h" #include "libpq/pqformat.h" #include "libpq/pqmq.h" @@ -46,6 +47,7 @@ #include "tcop/tcopprot.h" #include "tcop/utility.h" #include "utils/fmgrprotos.h" +#include "utils/hsearch.h" #include "utils/memutils.h" #include "utils/portal.h" #include "utils/ps_status.h" @@ -55,6 +57,7 @@ #include "distributed/background_jobs.h" #include 
"distributed/citus_safe_lib.h" +#include "distributed/hash_helpers.h" #include "distributed/listutils.h" #include "distributed/maintenanced.h" #include "distributed/metadata_cache.h" @@ -71,16 +74,38 @@ #define CITUS_BACKGROUND_TASK_KEY_TASK_ID 4 #define CITUS_BACKGROUND_TASK_NKEYS 5 -static BackgroundWorkerHandle * StartCitusBackgroundTaskExecuter(char *database, +static BackgroundWorkerHandle * StartCitusBackgroundTaskExecutor(char *database, char *user, char *command, int64 taskId, dsm_segment **pSegment); static void ExecuteSqlString(const char *sql); -static void ConsumeTaskWorkerOutput(shm_mq_handle *responseq, StringInfo message, - bool *hadError); +static shm_mq_result ConsumeTaskWorkerOutput(shm_mq_handle *responseq, StringInfo message, + bool *hadError); static void UpdateDependingTasks(BackgroundTask *task); static int64 CalculateBackoffDelay(int retryCount); +static bool NewExecutorExceedsCitusLimit( + QueueMonitorExecutionContext *queueMonitorExecutionContext); +static bool NewExecutorExceedsPgMaxWorkers(BackgroundWorkerHandle *handle, + QueueMonitorExecutionContext * + queueMonitorExecutionContext); +static bool AssignRunnableTaskToNewExecutor(BackgroundTask *runnableTask, + QueueMonitorExecutionContext * + queueMonitorExecutionContext); +static void AssignRunnableTasks( + QueueMonitorExecutionContext *queueMonitorExecutionContext); +static List * GetRunningTaskEntries(HTAB *currentExecutors); +static shm_mq_result ReadFromExecutorQueue( + BackgroundExecutorHashEntry *backgroundExecutorHashEntry, + bool *hadError); +static void CheckAndResetLastWorkerAllocationFailure( + QueueMonitorExecutionContext *queueMonitorExecutionContext); +static TaskExecutionStatus TaskConcurrentCancelCheck( + TaskExecutionContext *taskExecutionContext); +static TaskExecutionStatus ConsumeExecutorQueue( + TaskExecutionContext *taskExecutionContext); +static void TaskHadError(TaskExecutionContext *taskExecutionContext); +static void TaskEnded(TaskExecutionContext 
*taskExecutionContext); PG_FUNCTION_INFO_V1(citus_job_cancel); PG_FUNCTION_INFO_V1(citus_job_wait); @@ -310,6 +335,412 @@ CitusBackgroundTaskQueueMonitorErrorCallback(void *arg) } +/* + * NewExecutorExceedsCitusLimit returns true if currently we reached Citus' max worker count. + * It does not sleep itself; on the first failure since the last successful worker allocation + * it warns and records the failure time in backgroundWorkerFailedStartTime. + */ +static bool +NewExecutorExceedsCitusLimit(QueueMonitorExecutionContext *queueMonitorExecutionContext) +{ + if (queueMonitorExecutionContext->currentExecutorCount >= MaxBackgroundTaskExecutors) + { + /* + * we hit to Citus' maximum task executor count. Warn for the first failure + * after a successful worker allocation happened, that is, we do not warn if + * we repeatedly come here without a successful worker allocation. + */ + if (queueMonitorExecutionContext->backgroundWorkerFailedStartTime == 0) + { + ereport(WARNING, (errmsg("unable to start background worker for " + "background task execution"), + errdetail( + "Already reached the maximum number of task " + "executors: %ld/%d", + queueMonitorExecutionContext->currentExecutorCount, + MaxBackgroundTaskExecutors))); + queueMonitorExecutionContext->backgroundWorkerFailedStartTime = + GetCurrentTimestamp(); + } + + return true; + } + + return false; +} + + +/* + * NewExecutorExceedsPgMaxWorkers returns true if the given handle is NULL, i.e. no background + * worker could be started, probably because Postgres' max worker count is reached. It does not + * sleep itself; on the first such failure it warns and records the failure time. + */ +static bool +NewExecutorExceedsPgMaxWorkers(BackgroundWorkerHandle *handle, + QueueMonitorExecutionContext *queueMonitorExecutionContext) +{ + if (handle == NULL) + { + /* + * we are unable to start a background worker for the task execution. + * Probably we are out of background workers. 
Warn for the first failure + * after a successful worker allocation happened, that is, we do not warn if + * we repeatedly come here without a successful worker allocation. + */ + if (queueMonitorExecutionContext->backgroundWorkerFailedStartTime == 0) + { + ereport(WARNING, (errmsg( + "unable to start background worker for " + "background task execution"), + errdetail( + "Current number of task " + "executors: %ld/%d", + queueMonitorExecutionContext->currentExecutorCount, + MaxBackgroundTaskExecutors))); + queueMonitorExecutionContext->backgroundWorkerFailedStartTime = + GetCurrentTimestamp(); + } + + return true; + } + + return false; +} + + +/* + * AssignRunnableTaskToNewExecutor tries to assign given runnable task to a new task executor. + * Returns false if the Citus executor limit is hit or no background worker could be started. + */ +static bool +AssignRunnableTaskToNewExecutor(BackgroundTask *runnableTask, + QueueMonitorExecutionContext *queueMonitorExecutionContext) +{ + Assert(runnableTask && runnableTask->status == BACKGROUND_TASK_STATUS_RUNNABLE); + + if (NewExecutorExceedsCitusLimit(queueMonitorExecutionContext)) + { + /* escape if we hit citus executor limit */ + return false; + } + + char *databaseName = get_database_name(MyDatabaseId); + char *userName = GetUserNameFromId(runnableTask->owner, false); + + /* try to create new executor and make it alive during queue monitor lifetime */ + MemoryContext oldContext = MemoryContextSwitchTo(queueMonitorExecutionContext->ctx); + dsm_segment *seg = NULL; + BackgroundWorkerHandle *handle = + StartCitusBackgroundTaskExecutor(databaseName, userName, runnableTask->command, + runnableTask->taskid, &seg); + MemoryContextSwitchTo(oldContext); + + if (NewExecutorExceedsPgMaxWorkers(handle, queueMonitorExecutionContext)) + { + /* escape if we hit pg worker limit */ + return false; + } + + /* assign the allocated executor to the runnable task and increment total executor count */ + bool handleEntryFound = false; + BackgroundExecutorHashEntry *handleEntry = 
hash_search( + queueMonitorExecutionContext->currentExecutors, + &runnableTask->taskid, + HASH_ENTER, &handleEntryFound); + Assert(!handleEntryFound); + handleEntry->handle = handle; + handleEntry->seg = seg; + + /* reset worker allocation timestamp and log time elapsed since the last failure */ + CheckAndResetLastWorkerAllocationFailure(queueMonitorExecutionContext); + + /* make message alive during queue monitor lifetime */ + oldContext = MemoryContextSwitchTo(queueMonitorExecutionContext->ctx); + handleEntry->message = makeStringInfo(); + MemoryContextSwitchTo(oldContext); + + /* set runnable task's status as running */ + runnableTask->status = BACKGROUND_TASK_STATUS_RUNNING; + UpdateBackgroundTask(runnableTask); + UpdateBackgroundJob(runnableTask->jobid); + + queueMonitorExecutionContext->currentExecutorCount++; + + ereport(LOG, (errmsg("task jobid/taskid started: %ld/%ld", + runnableTask->jobid, runnableTask->taskid))); + + return true; +} + + +/* + * AssignRunnableTasks fetches runnable tasks one at a time and tries to assign each of them + * to a new task executor. It stops at the first failed assignment or when no runnable task + * is left. Tasks are fetched one by one instead of all at once to keep memory usage low, + * since only a limited number of workers is available to assign tasks to. 
+ */ +static void +AssignRunnableTasks(QueueMonitorExecutionContext *queueMonitorExecutionContext) +{ + BackgroundTask *runnableTask = NULL; + bool taskAssigned = false; + do { + /* fetch a runnable task from catalog */ + runnableTask = GetRunnableBackgroundTask(); + if (runnableTask) + { + taskAssigned = AssignRunnableTaskToNewExecutor(runnableTask, + queueMonitorExecutionContext); + } + else + { + taskAssigned = false; + } + } while (taskAssigned); +} + + +/* + * GetRunningTaskEntries collects every BackgroundExecutorHashEntry of the given hash table into a List + */ +static List * +GetRunningTaskEntries(HTAB *currentExecutors) +{ + List *runningTaskEntries = NIL; + + HASH_SEQ_STATUS status; + BackgroundExecutorHashEntry *backgroundExecutorHashEntry; + foreach_htab(backgroundExecutorHashEntry, &status, currentExecutors) + { + runningTaskEntries = lappend(runningTaskEntries, backgroundExecutorHashEntry); + } + + return runningTaskEntries; +} + + +/* + * CheckAndResetLastWorkerAllocationFailure checks whether a background worker allocation + * failure time is recorded. If it is set, we log how long we have waited to successfully + * allocate the worker and reset the failure timestamp. + */ +static void +CheckAndResetLastWorkerAllocationFailure( + QueueMonitorExecutionContext *queueMonitorExecutionContext) +{ + if (queueMonitorExecutionContext->backgroundWorkerFailedStartTime > 0) + { + /* + * we had a delay in starting the background worker for task execution. Report + * the actual delay and reset the time. This allows a subsequent task to + * report again if it can't start a background worker directly. 
+ */ + long secs = 0; + int microsecs = 0; + TimestampDifference( + queueMonitorExecutionContext-> + backgroundWorkerFailedStartTime, + GetCurrentTimestamp(), + &secs, &microsecs); + ereport(LOG, (errmsg( + "able to start a background worker with %ld seconds " + "delay", secs))); + + queueMonitorExecutionContext->backgroundWorkerFailedStartTime = 0; + } +} + + +/* + * TaskConcurrentCancelCheck checks if concurrent task cancellation or removal happened by + * taking Exclusive lock. It mutates task's pid and status. Returns execution status for the + * task. + * + * The reloaded task is stored in taskExecutionContext->task. It is NULL when the task was + * removed concurrently; TASK_EXECUTION_STATUS_CANCELLED is returned in that case too. + */ +static TaskExecutionStatus +TaskConcurrentCancelCheck(TaskExecutionContext *taskExecutionContext) +{ + /* + * here we take exclusive lock on pg_dist_background_task table to prevent a + * concurrent modification. A separate process could have cancelled or removed + * the task by now, they would not see the pid and status update, so it is our + * responsibility to stop the backend and update the pid and status. + * + * The lock will release on transaction commit. + */ + LockRelationOid(DistBackgroundTaskRelationId(), ExclusiveLock); + + BackgroundExecutorHashEntry *handleEntry = taskExecutionContext->handleEntry; + BackgroundTask *task = GetBackgroundTaskByTaskId(handleEntry->taskid); + taskExecutionContext->task = task; + + if (!task || task->status == BACKGROUND_TASK_STATUS_CANCELLING) + { + /* + * being in that step means that a concurrent cancel or removal happened. we should + * mark task status as cancelled. We also want to reflect cancel message by consuming + * task executor queue. + */ + bool hadError = false; + ReadFromExecutorQueue(handleEntry, &hadError); + + if (task == NULL) + { + /* + * the task row is gone, so there is no jobid to report and no status to + * update. Dereferencing task here would crash the queue monitor. + */ + ereport(LOG, (errmsg( + "task with taskid %ld is removed concurrently", + handleEntry->taskid))); + + return TASK_EXECUTION_STATUS_CANCELLED; + } + + ereport(LOG, (errmsg( + "task jobid/taskid is cancelled: %ld/%ld", + task->jobid, task->taskid))); + + task->status = BACKGROUND_TASK_STATUS_CANCELLED; + + return TASK_EXECUTION_STATUS_CANCELLED; + } + else + { + /* + * now that we have verified the task has not been cancelled and still exist we + * update it to reflect the new state. 
If task is already in running status, + * the operation is idempotent. But for runnable tasks, we make their status + * as running. + */ + + pid_t pid = 0; + GetBackgroundWorkerPid(handleEntry->handle, &pid); + task->status = BACKGROUND_TASK_STATUS_RUNNING; + SET_NULLABLE_FIELD(task, pid, pid); + + /* Update task status to indicate it is running */ + UpdateBackgroundTask(task); + UpdateBackgroundJob(task->jobid); + + return TASK_EXECUTION_STATUS_RUNNING; + } +} + + +/* + * ConsumeExecutorQueue consumes executor's shared memory queue and returns execution status + * for the task. + */ +static TaskExecutionStatus +ConsumeExecutorQueue(TaskExecutionContext *taskExecutionContext) +{ + BackgroundExecutorHashEntry *handleEntry = taskExecutionContext->handleEntry; + BackgroundTask *task = taskExecutionContext->task; + + /* + * we consume task executor response queue. + * possible response codes can lead us different steps below. + */ + bool hadError = false; + shm_mq_result mq_res = ReadFromExecutorQueue(handleEntry, &hadError); + + if (hadError) + { + ereport(LOG, (errmsg("task jobid/taskid failed: %ld/%ld", + task->jobid, task->taskid))); + + return TASK_EXECUTION_STATUS_ERROR; + } + else if (mq_res == SHM_MQ_DETACHED) + { + ereport(LOG, (errmsg("task jobid/taskid succeeded: %ld/%ld", + task->jobid, task->taskid))); + + /* update task status as done. */ + task->status = BACKGROUND_TASK_STATUS_DONE; + + return TASK_EXECUTION_STATUS_SUCCESS; + } + else + { + /* still running the task */ + Assert(mq_res == SHM_MQ_WOULD_BLOCK); + return TASK_EXECUTION_STATUS_WOULDBLOCK; + } +} + + +/* + * TaskHadError updates retry count of a failed task inside taskExecutionContext. + * If maximum retry count is reached, task status is marked as failed. Otherwise, backoff + * delay is calculated, notBefore time is updated and the task is marked as runnable. 
+ */ +static void +TaskHadError(TaskExecutionContext *taskExecutionContext) +{ + BackgroundTask *task = taskExecutionContext->task; + + /* + * when we had an error in response queue, we need to decide if we want to retry (keep the + * runnable state), or move to error state + */ + + if (!task->retry_count) + { + SET_NULLABLE_FIELD(task, retry_count, 1); + } + else + { + (*task->retry_count)++; + } + + /* + * based on the retry count we either transition the task to its error + * state, or we calculate a new backoff time for future execution. + */ + int64 delayMs = CalculateBackoffDelay(*(task->retry_count)); + if (delayMs < 0) + { + task->status = BACKGROUND_TASK_STATUS_ERROR; + UNSET_NULLABLE_FIELD(task, not_before); + } + else + { + TimestampTz notBefore = TimestampTzPlusMilliseconds( + GetCurrentTimestamp(), delayMs); + SET_NULLABLE_FIELD(task, not_before, notBefore); + + task->status = BACKGROUND_TASK_STATUS_RUNNABLE; + } + + TaskEnded(taskExecutionContext); +} + + +/* + * TaskEnded updates task inside taskExecutionContext. It also updates depending + * tasks and the job to which task belongs. At the end, it also updates executor map and + * count inside queueMonitorExecutionContext after terminating the executor. + * + * taskExecutionContext->task may be NULL when the task was removed concurrently; then the + * catalog updates are skipped and only the executor is cleaned up. + */ +static void +TaskEnded(TaskExecutionContext *taskExecutionContext) +{ + QueueMonitorExecutionContext *queueMonitorExecutionContext = + taskExecutionContext->queueMonitorExecutionContext; + + HTAB *currentExecutors = queueMonitorExecutionContext->currentExecutors; + BackgroundExecutorHashEntry *handleEntry = taskExecutionContext->handleEntry; + BackgroundTask *task = taskExecutionContext->task; + + /* + * we update task and job fields. We also update depending jobs. + * At the end, do cleanup. 
+ */ + if (task != NULL) + { + UNSET_NULLABLE_FIELD(task, pid); + task->message = handleEntry->message->data; + + UpdateBackgroundTask(task); + UpdateDependingTasks(task); + UpdateBackgroundJob(task->jobid); + } + + /* we are sure that at least one task did not block on current iteration */ + queueMonitorExecutionContext->allTasksWouldBlock = false; + + /* + * copy what cleanup needs out of the hash entry first: after HASH_REMOVE the entry + * is a dangling pointer that must not be dereferenced. + */ + BackgroundWorkerHandle *handle = handleEntry->handle; + dsm_segment *seg = handleEntry->seg; + int64 taskId = handleEntry->taskid; + + hash_search(currentExecutors, &taskId, + HASH_REMOVE, NULL); + TerminateBackgroundWorker(handle); + dsm_detach(seg); + queueMonitorExecutionContext->currentExecutorCount--; +} + + /* * CitusBackgroundTaskQueueMonitorMain is the main entry point for the background worker * running the background tasks queue monitor. @@ -318,8 +749,11 @@ CitusBackgroundTaskQueueMonitorErrorCallback(void *arg) * tasks and jobs state machines associated with the task. When no new task can be found * it will exit(0) and lets the maintenance daemon poll for new tasks. * - * The main loop is currently implemented as a synchronous loop stepping through the task - * and update its state before going to the next. + * The main loop is implemented as asynchronous loop stepping through the task + * and update its state before going to the next. Loop assigns runnable tasks to new task + * executors as much as possible. If the max task executor limit is hit, the tasks will be + * waiting in runnable status until currently running tasks finish. Each parallel worker + * executes one task at a time without blocking each other by using nonblocking api. */ void CitusBackgroundTaskQueueMonitorMain(Datum arg) @@ -336,12 +770,22 @@ CitusBackgroundTaskQueueMonitorMain(Datum arg) /* connect to database, after that we can actually access catalogs */ BackgroundWorkerInitializeConnectionByOid(databaseOid, extensionOwner, 0); + /* + * save old context until monitor loop exits, we use backgroundTaskContext for + * all allocations. 
+ */ + MemoryContext firstContext = CurrentMemoryContext; + MemoryContext backgroundTaskContext = AllocSetContextCreate(TopMemoryContext, + "BackgroundTaskContext", + ALLOCSET_DEFAULT_SIZES); + StartTransactionCommand(); PushActiveSnapshot(GetTransactionSnapshot()); - /* load database name and copy to a memory context that survives the transaction */ const char *databasename = get_database_name(MyDatabaseId); - MemoryContext oldContext = MemoryContextSwitchTo(TopMemoryContext); + + /* make databasename alive during queue monitor lifetime */ + MemoryContext oldContext = MemoryContextSwitchTo(backgroundTaskContext); databasename = pstrdup(databasename); MemoryContextSwitchTo(oldContext); @@ -385,292 +829,147 @@ CitusBackgroundTaskQueueMonitorMain(Datum arg) ereport(DEBUG1, (errmsg("started citus background task queue monitor"))); - MemoryContext perTaskContext = AllocSetContextCreate(CurrentMemoryContext, - "PerTaskContext", - ALLOCSET_DEFAULT_MINSIZE, - ALLOCSET_DEFAULT_INITSIZE, - ALLOCSET_DEFAULT_MAXSIZE); - /* * First we find all jobs that are running, we need to check if they are still running * if not reset their state back to scheduled. */ - { - StartTransactionCommand(); - PushActiveSnapshot(GetTransactionSnapshot()); + StartTransactionCommand(); + PushActiveSnapshot(GetTransactionSnapshot()); - ResetRunningBackgroundTasks(); + ResetRunningBackgroundTasks(); - PopActiveSnapshot(); - CommitTransactionCommand(); - } + PopActiveSnapshot(); + CommitTransactionCommand(); - - MemoryContext oldContextPerJob = MemoryContextSwitchTo(perTaskContext); - TimestampTz backgroundWorkerFailedStartTime = 0; + /* create a map to store parallel task executors */ + HTAB *currentExecutors = CreateSimpleHashWithNameAndSize(int64, + BackgroundExecutorHashEntry, + "Background Executor Hash", + MAX_BG_TASK_EXECUTORS); /* - * Although this variable could be omitted it does quickly and adequately describe - * till when we are looping. 
+ * monitor execution context that is useful during the monitor loop. + * we store current executor count, last background failure timestamp, + * currently executed task context and also a memory context to persist + * some allocations throughout the loop. */ - bool hasTasks = true; - while (hasTasks) - { - MemoryContextReset(perTaskContext); + QueueMonitorExecutionContext queueMonitorExecutionContext = { + .currentExecutorCount = 0, + .backgroundWorkerFailedStartTime = 0, + .allTasksWouldBlock = true, + .currentExecutors = currentExecutors, + .ctx = backgroundTaskContext + }; + /* loop exits if there is no running or runnable tasks left */ + bool hasAnyTask = true; + while (hasAnyTask) + { + /* handle signals */ CHECK_FOR_INTERRUPTS(); + /* invalidate cache for new data in catalog */ InvalidateMetadataSystemCache(); + + /* assign runnable tasks, if any, to new task executors in a transaction */ + StartTransactionCommand(); + PushActiveSnapshot(GetTransactionSnapshot()); + AssignRunnableTasks(&queueMonitorExecutionContext); + PopActiveSnapshot(); + CommitTransactionCommand(); + + /* get running task entries from hash table */ + List *runningTaskEntries = GetRunningTaskEntries( + queueMonitorExecutionContext.currentExecutors); + hasAnyTask = list_length(runningTaskEntries) > 0; + + /* useful to sleep if all tasks ewouldblock on current iteration */ + queueMonitorExecutionContext.allTasksWouldBlock = true; + + /* monitor executors inside transaction */ StartTransactionCommand(); PushActiveSnapshot(GetTransactionSnapshot()); - /* - * We need to load the task into the perTaskContext as we will switch contexts - * later due to the committing and starting of new transactions - */ - oldContext = MemoryContextSwitchTo(perTaskContext); - BackgroundTask *task = GetRunnableBackgroundTask(); - - if (!task) + /* iterate over all handle entries and monitor each task's output */ + BackgroundExecutorHashEntry *handleEntry = NULL; + foreach_ptr(handleEntry, runningTaskEntries) { - 
MemoryContextSwitchTo(oldContext); + /* create task execution context and assign it to queueMonitorExecutionContext */ + TaskExecutionContext taskExecutionContext = { + .queueMonitorExecutionContext = &queueMonitorExecutionContext, + .handleEntry = handleEntry, + .task = NULL + }; - PopActiveSnapshot(); - CommitTransactionCommand(); + /* check if concurrent cancellation occurred */ + TaskExecutionStatus taskExecutionStatus = TaskConcurrentCancelCheck( + &taskExecutionContext); - hasTasks = false; - break; + /* + * check task status. If it is cancelled, we do not need to consume queue + * as we already consumed the queue. + */ + if (taskExecutionStatus == TASK_EXECUTION_STATUS_CANCELLED) + { + TaskEnded(&taskExecutionContext); + continue; + } + + taskExecutionStatus = ConsumeExecutorQueue(&taskExecutionContext); + if (taskExecutionStatus == TASK_EXECUTION_STATUS_ERROR) + { + TaskHadError(&taskExecutionContext); + } + else if (taskExecutionStatus == TASK_EXECUTION_STATUS_SUCCESS) + { + TaskEnded(&taskExecutionContext); + } } - /* we load the database name and username here as we are still in a transaction */ - char *databaseName = get_database_name(MyDatabaseId); - char *userName = GetUserNameFromId(task->owner, false); - - MemoryContextSwitchTo(oldContext); - PopActiveSnapshot(); CommitTransactionCommand(); - MemoryContextSwitchTo(perTaskContext); - - /* - * The background worker needs to be started outside of the transaction, otherwise - * it will complain about leaking shared memory segments used, among other things, - * to communicate the output of the backend. - */ - dsm_segment *seg = NULL; - BackgroundWorkerHandle *handle = - StartCitusBackgroundTaskExecuter(databaseName, userName, task->command, - task->taskid, &seg); - - if (handle == NULL) + if (queueMonitorExecutionContext.allTasksWouldBlock) { /* - * We are unable to start a background worker for the task execution. - * Probably we are out of background workers. 
Warn once and restart the loop - * after a short sleep. + * sleep to lower cpu consumption if all tasks responded with EWOULD_BLOCK on the last iteration. + * That will also let those tasks to progress to generate some output probably. */ - if (backgroundWorkerFailedStartTime == 0) - { - ereport(WARNING, (errmsg("unable to start background worker for " - "background task execution"))); - backgroundWorkerFailedStartTime = GetCurrentTimestamp(); - } - const long delay_ms = 1000; - (void) WaitLatch(MyLatch, WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, + (void) WaitLatch(MyLatch, WL_LATCH_SET | WL_TIMEOUT | + WL_EXIT_ON_PM_DEATH, delay_ms, WAIT_EVENT_PG_SLEEP); ResetLatch(MyLatch); - - continue; } - - if (backgroundWorkerFailedStartTime > 0) - { - /* - * We had a delay in starting the background worker for task execution. Report - * the actual delay and reset the time. This allows a subsequent task to - * report again if it can't start a background worker directly. - */ - long secs = 0; - int microsecs = 0; - TimestampDifference(backgroundWorkerFailedStartTime, GetCurrentTimestamp(), - &secs, µsecs); - ereport(LOG, (errmsg("able to start a background worker with %ld seconds" - "delay", secs))); - - backgroundWorkerFailedStartTime = 0; - } - - - pid_t pid = 0; - GetBackgroundWorkerPid(handle, &pid); - - ereport(LOG, (errmsg("found task with jobid/taskid: %ld/%ld", - task->jobid, task->taskid))); - - StartTransactionCommand(); - PushActiveSnapshot(GetTransactionSnapshot()); - - /* - * Reload task while holding a new ExclusiveLock on the table. A separate process - * could have cancelled or removed the task by now, they would not see the pid and - * status update, so it is our responsibility to stop the backend and _not_ write - * the pid and running status. - * - * The lock will release on transaction commit. 
- */ - LockRelationOid(DistBackgroundTaskRelationId(), ExclusiveLock); - - oldContext = MemoryContextSwitchTo(perTaskContext); - task = GetBackgroundTaskByTaskId(task->taskid); - MemoryContextSwitchTo(oldContext); - - if (!task || task->status == BACKGROUND_TASK_STATUS_CANCELLING || - task->status == BACKGROUND_TASK_STATUS_CANCELLED) - { - task->status = BACKGROUND_TASK_STATUS_CANCELLED; - UpdateBackgroundTask(task); - UpdateBackgroundJob(task->jobid); - - PopActiveSnapshot(); - CommitTransactionCommand(); - - /* - * Terminate backend and release shared memory to not leak these resources - * across iterations. - */ - TerminateBackgroundWorker(handle); - dsm_detach(seg); - - /* there could be an other task ready to run, let a new loop decide */ - continue; - } - - /* - * Now that we have verified the task has not been cancelled and still exist we - * update it to reflect the new state - */ - task->status = BACKGROUND_TASK_STATUS_RUNNING; - SET_NULLABLE_FIELD(task, pid, pid); - - /* Update task status to indicate it is running */ - UpdateBackgroundTask(task); - UpdateBackgroundJob(task->jobid); - - PopActiveSnapshot(); - CommitTransactionCommand(); - - MemoryContextSwitchTo(perTaskContext); - - bool hadError = false; - StringInfoData message = { 0 }; - initStringInfo(&message); - - { - shm_toc *toc = shm_toc_attach(CITUS_BACKGROUND_TASK_MAGIC, - dsm_segment_address(seg)); - shm_mq *mq = shm_toc_lookup(toc, CITUS_BACKGROUND_TASK_KEY_QUEUE, false); - shm_mq_handle *responseq = shm_mq_attach(mq, seg, NULL); - - /* - * We consume the complete shm_mq here as ConsumeTaskWorkerOutput loops till - * it reaches a SHM_MQ_DETACHED response. - */ - ConsumeTaskWorkerOutput(responseq, &message, &hadError); - - shm_mq_detach(responseq); - } - - StartTransactionCommand(); - PushActiveSnapshot(GetTransactionSnapshot()); - - /* - * Same as before, we need to lock pg_dist_background_task in a way where we can - * check if there had been a concurrent cancel. 
- */ - LockRelationOid(DistBackgroundTaskRelationId(), ExclusiveLock); - - oldContext = MemoryContextSwitchTo(perTaskContext); - task = GetBackgroundTaskByTaskId(task->taskid); - MemoryContextSwitchTo(oldContext); - - if (!task || task->status == BACKGROUND_TASK_STATUS_CANCELLING || - task->status == BACKGROUND_TASK_STATUS_CANCELLED) - { - /* - * A concurrent cancel has happened or the task has disappeared, we are not - * retrying or changing state, we will only reflect the message onto the task - * and for completeness we update the job aswell, this should be a no-op - * - * We still need to release the shared memory and xact before looping - */ - - dsm_detach(seg); - - task->status = BACKGROUND_TASK_STATUS_CANCELLED; - task->message = message.data; - UpdateBackgroundTask(task); - UpdateBackgroundJob(task->jobid); - - PopActiveSnapshot(); - CommitTransactionCommand(); - - continue; - } - else if (hadError) - { - /* - * When we had an error we need to decide if we want to retry (keep the - * runnable state), or move to error state - */ - if (!task->retry_count) - { - SET_NULLABLE_FIELD(task, retry_count, 1); - } - else - { - (*task->retry_count)++; - } - - /* - * based on the retry count we either transition the task to its error - * state, or we calculate a new backoff time for future execution. 
- */ - int64 delayMs = CalculateBackoffDelay(*(task->retry_count)); - if (delayMs < 0) - { - task->status = BACKGROUND_TASK_STATUS_ERROR; - UNSET_NULLABLE_FIELD(task, not_before); - } - else - { - TimestampTz notBefore = TimestampTzPlusMilliseconds( - GetCurrentTimestamp(), delayMs); - SET_NULLABLE_FIELD(task, not_before, notBefore); - - task->status = BACKGROUND_TASK_STATUS_RUNNABLE; - } - } - else - { - task->status = BACKGROUND_TASK_STATUS_DONE; - } - UNSET_NULLABLE_FIELD(task, pid); - task->message = message.data; - - UpdateBackgroundTask(task); - UpdateDependingTasks(task); - UpdateBackgroundJob(task->jobid); - - dsm_detach(seg); - - PopActiveSnapshot(); - CommitTransactionCommand(); } - MemoryContextSwitchTo(oldContextPerJob); - MemoryContextDelete(perTaskContext); + MemoryContextSwitchTo(firstContext); + MemoryContextDelete(backgroundTaskContext); + + proc_exit(0); +} + + +/* + * ReadFromExecutorQueue reads from task executor's response queue into the message + * stored in the given hash entry and returns the shm_mq_result of that read. + * It also sets hadError flag if an error response is encountered in the queue. + * + * NOTE(review): shm_mq_attach() is called on every invocation, so each poll of a + * running task creates a fresh shm_mq_handle that is neither stored nor released here. + * Presumably the handle should be created once per executor and kept next to seg in + * the hash entry -- confirm against the shm_mq API before changing. + */ +static shm_mq_result +ReadFromExecutorQueue(BackgroundExecutorHashEntry *backgroundExecutorHashEntry, + bool *hadError) +{ + dsm_segment *seg = backgroundExecutorHashEntry->seg; + shm_toc *toc = shm_toc_attach(CITUS_BACKGROUND_TASK_MAGIC, + dsm_segment_address(seg)); + shm_mq *mq = shm_toc_lookup(toc, CITUS_BACKGROUND_TASK_KEY_QUEUE, false); + shm_mq_handle *responseq = shm_mq_attach(mq, seg, NULL); + + /* + * Consume background executor's queue and get a response code. + */ + StringInfo message = backgroundExecutorHashEntry->message; + shm_mq_result mq_res = ConsumeTaskWorkerOutput(responseq, message, hadError); + return mq_res; } @@ -850,12 +1149,15 @@ UpdateDependingTasks(BackgroundTask *task) /* - * ConsumeTaskWorkerOutput consumes the output of an executor and mutates the - * BackgroundTask object to reflect changes like the message and status on the task. 
+ * ConsumeTaskWorkerOutput consumes the output of an executor and sets the message as + * the last message read from the queue. It also sets hadError as true if executor had + * error. */ -static void +static shm_mq_result ConsumeTaskWorkerOutput(shm_mq_handle *responseq, StringInfo message, bool *hadError) { + shm_mq_result res; + /* * Message-parsing routines operate on a null-terminated StringInfo, * so we must construct one. @@ -868,13 +1170,12 @@ ConsumeTaskWorkerOutput(shm_mq_handle *responseq, StringInfo message, bool *hadE resetStringInfo(&msg); /* - * Get next message. Currently blocking, when multiple backends get implemented it - * should switch to a non-blocking receive + * non-blocking receive to not block other bg workers */ Size nbytes = 0; void *data = NULL; - const bool noWait = false; - shm_mq_result res = shm_mq_receive(responseq, &nbytes, &data, noWait); + const bool noWait = true; + res = shm_mq_receive(responseq, &nbytes, &data, noWait); if (res != SHM_MQ_SUCCESS) { @@ -958,6 +1259,7 @@ ConsumeTaskWorkerOutput(shm_mq_handle *responseq, StringInfo message, bool *hadE } pfree(msg.data); + return res; } @@ -986,6 +1288,7 @@ StoreArgumentsInDSM(char *database, char *username, char *command, int64 taskId) Size segsize = shm_toc_estimate(&e); dsm_segment *seg = dsm_create(segsize, DSM_CREATE_NULL_IF_MAXSEGMENTS); + if (seg == NULL) { ereport(ERROR, @@ -994,6 +1297,13 @@ StoreArgumentsInDSM(char *database, char *username, char *command, int64 taskId) return NULL; } + /* + * when we have CurrentResourceOwner != NULL, segment will be released upon CurrentResourceOwner release, + * but we may consume the queue in segment even after CurrentResourceOwner released. 'dsm_pin_mapping' helps + * persisting the segment until the session ends or the segment is detached explicitly by 'dsm_detach'. 
+ */ + dsm_pin_mapping(seg); + shm_toc *toc = shm_toc_create(CITUS_BACKGROUND_TASK_MAGIC, dsm_segment_address(seg), segsize); @@ -1020,27 +1330,20 @@ StoreArgumentsInDSM(char *database, char *username, char *command, int64 taskId) *taskIdTarget = taskId; shm_toc_insert(toc, CITUS_BACKGROUND_TASK_KEY_TASK_ID, taskIdTarget); - /* - * Attach the queue before launching a worker, so that we'll automatically - * detach the queue if we error out. (Otherwise, the worker might sit - * there trying to write the queue long after we've gone away.) - */ - MemoryContext oldcontext = MemoryContextSwitchTo(TopMemoryContext); shm_mq_attach(mq, seg, NULL); - MemoryContextSwitchTo(oldcontext); return seg; } /* - * StartCitusBackgroundTaskExecuter start a new background worker for the execution of a + * StartCitusBackgroundTaskExecutor start a new background worker for the execution of a * background task. Callers interested in the shared memory segment that is created * between the background worker and the current backend can pass in a segOut to get a * pointer to the dynamic shared memory. 
*/ static BackgroundWorkerHandle * -StartCitusBackgroundTaskExecuter(char *database, char *user, char *command, int64 taskId, +StartCitusBackgroundTaskExecutor(char *database, char *user, char *command, int64 taskId, dsm_segment **pSegment) { dsm_segment *seg = StoreArgumentsInDSM(database, user, command, taskId); @@ -1058,7 +1361,7 @@ StartCitusBackgroundTaskExecuter(char *database, char *user, char *command, int6 worker.bgw_restart_time = BGW_NEVER_RESTART; strcpy_s(worker.bgw_library_name, sizeof(worker.bgw_library_name), "citus"); strcpy_s(worker.bgw_function_name, sizeof(worker.bgw_library_name), - "CitusBackgroundTaskExecuter"); + "CitusBackgroundTaskExecutor"); worker.bgw_main_arg = UInt32GetDatum(dsm_segment_handle(seg)); worker.bgw_notify_pid = MyProcPid; @@ -1084,36 +1387,36 @@ StartCitusBackgroundTaskExecuter(char *database, char *user, char *command, int6 /* * context for any log/error messages emitted from the background task executor. */ -typedef struct CitusBackgroundJobExecuterErrorCallbackContext +typedef struct CitusBackgroundJobExecutorErrorCallbackContext { const char *database; const char *username; -} CitusBackgroundJobExecuterErrorCallbackContext; +} CitusBackgroundJobExecutorErrorCallbackContext; /* - * CitusBackgroundJobExecuterErrorCallback is a callback handler that gets called for any + * CitusBackgroundJobExecutorErrorCallback is a callback handler that gets called for any * ereport to add extra context to the message. 
*/ static void -CitusBackgroundJobExecuterErrorCallback(void *arg) +CitusBackgroundJobExecutorErrorCallback(void *arg) { - CitusBackgroundJobExecuterErrorCallbackContext *context = - (CitusBackgroundJobExecuterErrorCallbackContext *) arg; + CitusBackgroundJobExecutorErrorCallbackContext *context = + (CitusBackgroundJobExecutorErrorCallbackContext *) arg; errcontext("Citus Background Task Queue Executor: %s/%s", context->database, context->username); } /* - * CitusBackgroundTaskExecuter is the main function of the background tasks queue + * CitusBackgroundTaskExecutor is the main function of the background tasks queue * executor. This backend attaches to a shared memory segment as identified by the * main_arg of the background worker. * * This is mostly based on the background worker logic in pg_cron */ void -CitusBackgroundTaskExecuter(Datum main_arg) +CitusBackgroundTaskExecutor(Datum main_arg) { /* * TODO figure out if we need this signal handler that is in pgcron @@ -1121,15 +1424,6 @@ CitusBackgroundTaskExecuter(Datum main_arg) */ BackgroundWorkerUnblockSignals(); - /* Set up a memory context and resource owner. */ - Assert(CurrentResourceOwner == NULL); - CurrentResourceOwner = ResourceOwnerCreate(NULL, "citus background job"); - CurrentMemoryContext = AllocSetContextCreate(TopMemoryContext, - "citus background job execution", - ALLOCSET_DEFAULT_MINSIZE, - ALLOCSET_DEFAULT_INITSIZE, - ALLOCSET_DEFAULT_MAXSIZE); - /* Set up a dynamic shared memory segment. 
*/ dsm_segment *seg = dsm_attach(DatumGetInt32(main_arg)); if (seg == NULL) @@ -1159,11 +1453,11 @@ CitusBackgroundTaskExecuter(Datum main_arg) /* setup error context to indicate the errors came from a running background task */ ErrorContextCallback errorCallback = { 0 }; - CitusBackgroundJobExecuterErrorCallbackContext context = { + CitusBackgroundJobExecutorErrorCallbackContext context = { .database = database, .username = username, }; - errorCallback.callback = CitusBackgroundJobExecuterErrorCallback; + errorCallback.callback = CitusBackgroundJobExecutorErrorCallback; errorCallback.arg = (void *) &context; errorCallback.previous = error_context_stack; error_context_stack = &errorCallback; diff --git a/src/backend/distributed/utils/maintenanced.c b/src/backend/distributed/utils/maintenanced.c index 3836c506a..6cc0330a9 100644 --- a/src/backend/distributed/utils/maintenanced.c +++ b/src/backend/distributed/utils/maintenanced.c @@ -98,6 +98,7 @@ double DistributedDeadlockDetectionTimeoutFactor = 2.0; int Recover2PCInterval = 60000; int DeferShardDeleteInterval = 15000; int BackgroundTaskQueueCheckInterval = 5000; +int MaxBackgroundTaskExecutors = 4; /* config variables for metadata sync timeout */ int MetadataSyncInterval = 60000; diff --git a/src/include/distributed/background_jobs.h b/src/include/distributed/background_jobs.h index d814a2165..a83928e82 100644 --- a/src/include/distributed/background_jobs.h +++ b/src/include/distributed/background_jobs.h @@ -17,10 +17,77 @@ #include "distributed/metadata_utility.h" + +/* + * BackgroundExecutorHashEntry hash table entry to refer existing task executors + */ +typedef struct BackgroundExecutorHashEntry +{ + /* hash key must be the first to hash correctly */ + int64 taskid; + + BackgroundWorkerHandle *handle; + dsm_segment *seg; + StringInfo message; +} BackgroundExecutorHashEntry; + + +/* + * TaskExecutionStatus status for task execution in queue monitor + */ +typedef enum TaskExecutionStatus +{ + 
TASK_EXECUTION_STATUS_SUCCESS = 0, + TASK_EXECUTION_STATUS_ERROR, + TASK_EXECUTION_STATUS_CANCELLED, + TASK_EXECUTION_STATUS_RUNNING, + TASK_EXECUTION_STATUS_WOULDBLOCK +} TaskExecutionStatus; + + +/* + * QueueMonitorExecutionContext encapsulates info related to executors and tasks + * in queue monitor + */ +typedef struct QueueMonitorExecutionContext +{ + /* current total # of parallel task executors */ + int64 currentExecutorCount; + + /* map of current executors */ + HTAB *currentExecutors; + + /* last background allocation failure timestamp */ + TimestampTz backgroundWorkerFailedStartTime; + + /* useful to track if all tasks EWOULDBLOCK'd at current iteration */ + bool allTasksWouldBlock; + + /* context for monitor related allocations */ + MemoryContext ctx; +} QueueMonitorExecutionContext; + + +/* + * TaskExecutionContext encapsulates info for currently executed task in queue monitor + */ +typedef struct TaskExecutionContext +{ + /* active background executor entry */ + BackgroundExecutorHashEntry *handleEntry; + + /* active background task */ + BackgroundTask *task; + + /* context for queue monitor */ + QueueMonitorExecutionContext *queueMonitorExecutionContext; +} TaskExecutionContext; + + extern BackgroundWorkerHandle * StartCitusBackgroundTaskQueueMonitor(Oid database, Oid extensionOwner); extern void CitusBackgroundTaskQueueMonitorMain(Datum arg); -extern void CitusBackgroundTaskExecuter(Datum main_arg); +extern void CitusBackgroundTaskExecutor(Datum main_arg); extern Datum citus_job_cancel(PG_FUNCTION_ARGS); extern Datum citus_job_wait(PG_FUNCTION_ARGS); diff --git a/src/include/distributed/shard_cleaner.h b/src/include/distributed/shard_cleaner.h index 6a1f76664..af983c428 100644 --- a/src/include/distributed/shard_cleaner.h +++ b/src/include/distributed/shard_cleaner.h @@ -11,9 +11,12 @@ #ifndef CITUS_SHARD_CLEANER_H #define CITUS_SHARD_CLEANER_H +#define MAX_BG_TASK_EXECUTORS 1000 + /* GUC to configure deferred shard deletion */ extern int 
DeferShardDeleteInterval; extern int BackgroundTaskQueueCheckInterval; +extern int MaxBackgroundTaskExecutors; extern double DesiredPercentFreeAfterMove; extern bool CheckAvailableSpaceBeforeMove; diff --git a/src/test/regress/expected/background_task_queue_monitor.out b/src/test/regress/expected/background_task_queue_monitor.out index 49744771d..7278eebfc 100644 --- a/src/test/regress/expected/background_task_queue_monitor.out +++ b/src/test/regress/expected/background_task_queue_monitor.out @@ -207,6 +207,135 @@ SELECT status, NOT(message = '') AS did_start FROM pg_dist_background_task WHERE cancelled | f (3 rows) +-- verify that we do not allow parallel task executors more than citus.max_background_task_executors(4 by default) +BEGIN; +INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify max parallel background execution') RETURNING job_id AS job_id1 \gset +INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id1, $job$ SELECT pg_sleep(5); $job$) RETURNING task_id AS task_id1 \gset +INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id1, $job$ SELECT pg_sleep(5); $job$) RETURNING task_id AS task_id2 \gset +INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id1, $job$ SELECT pg_sleep(5); $job$) RETURNING task_id AS task_id3 \gset +INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify max parallel background execution') RETURNING job_id AS job_id2 \gset +INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id2, $job$ SELECT pg_sleep(5); $job$) RETURNING task_id AS task_id4 \gset +INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify max parallel background execution') RETURNING job_id AS job_id3 \gset +INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id3, $job$ SELECT pg_sleep(5); $job$) RETURNING task_id AS task_id5 \gset +COMMIT; +SELECT 
pg_sleep(2); -- we assume this is enough time for all tasks to be in running status except the last one due to parallel worker limit + pg_sleep +--------------------------------------------------------------------- + +(1 row) + +SELECT job_id, task_id, status FROM pg_dist_background_task + WHERE task_id IN (:task_id1, :task_id2, :task_id3, :task_id4, :task_id5) + ORDER BY job_id, task_id; -- show that last task is not running but ready to run(runnable) + job_id | task_id | status +--------------------------------------------------------------------- + 7 | 11 | running + 7 | 12 | running + 7 | 13 | running + 8 | 14 | running + 9 | 15 | runnable +(5 rows) + +SELECT citus_job_cancel(:job_id2); -- when a job with 1 task is cancelled, the last runnable task will be running + citus_job_cancel +--------------------------------------------------------------------- + +(1 row) + +SELECT citus_job_wait(:job_id3, desired_status => 'running'); + citus_job_wait +--------------------------------------------------------------------- + +(1 row) + +SELECT job_id, task_id, status FROM pg_dist_background_task + WHERE task_id IN (:task_id1, :task_id2, :task_id3, :task_id4, :task_id5) + ORDER BY job_id, task_id; -- show that last task is running + job_id | task_id | status +--------------------------------------------------------------------- + 7 | 11 | running + 7 | 12 | running + 7 | 13 | running + 8 | 14 | cancelled + 9 | 15 | running +(5 rows) + +SELECT citus_job_cancel(:job_id1); + citus_job_cancel +--------------------------------------------------------------------- + +(1 row) + +SELECT citus_job_cancel(:job_id3); + citus_job_cancel +--------------------------------------------------------------------- + +(1 row) + +SELECT citus_job_wait(:job_id1); + citus_job_wait +--------------------------------------------------------------------- + +(1 row) + +SELECT citus_job_wait(:job_id2); + citus_job_wait +--------------------------------------------------------------------- + +(1 row) + 
+SELECT citus_job_wait(:job_id3); + citus_job_wait +--------------------------------------------------------------------- + +(1 row) + +SELECT job_id, task_id, status FROM pg_dist_background_task + WHERE task_id IN (:task_id1, :task_id2, :task_id3, :task_id4, :task_id5) + ORDER BY job_id, task_id; -- show that multiple cancels worked + job_id | task_id | status +--------------------------------------------------------------------- + 7 | 11 | cancelled + 7 | 12 | cancelled + 7 | 13 | cancelled + 8 | 14 | cancelled + 9 | 15 | cancelled +(5 rows) + +-- verify that task is not starved by currently long running task +BEGIN; +INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify max parallel background execution') RETURNING job_id AS job_id1 \gset +INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id1, $job$ SELECT pg_sleep(5000); $job$) RETURNING task_id AS task_id1 \gset +INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify max parallel background execution') RETURNING job_id AS job_id2 \gset +INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id2, $job$ SELECT 1; $job$) RETURNING task_id AS task_id2 \gset +COMMIT; +SELECT citus_job_wait(:job_id1, desired_status => 'running'); + citus_job_wait +--------------------------------------------------------------------- + +(1 row) + +SELECT citus_job_wait(:job_id2, desired_status => 'finished'); + citus_job_wait +--------------------------------------------------------------------- + +(1 row) + +SELECT job_id, task_id, status FROM pg_dist_background_task + WHERE task_id IN (:task_id1, :task_id2) + ORDER BY job_id, task_id; -- show that last task is finished without starvation + job_id | task_id | status +--------------------------------------------------------------------- + 10 | 16 | running + 11 | 17 | done +(2 rows) + +SELECT citus_job_cancel(:job_id1); + citus_job_cancel 
+--------------------------------------------------------------------- + +(1 row) + SET client_min_messages TO WARNING; DROP SCHEMA background_task_queue_monitor CASCADE; ALTER SYSTEM RESET citus.background_task_queue_interval; diff --git a/src/test/regress/sql/background_task_queue_monitor.sql b/src/test/regress/sql/background_task_queue_monitor.sql index d8dddc720..4319f2bf8 100644 --- a/src/test/regress/sql/background_task_queue_monitor.sql +++ b/src/test/regress/sql/background_task_queue_monitor.sql @@ -91,6 +91,55 @@ SELECT citus_job_wait(:job_id); -- wait for the job to be cancelled SELECT state, NOT(started_at IS NULL) AS did_start FROM pg_dist_background_job WHERE job_id = :job_id; SELECT status, NOT(message = '') AS did_start FROM pg_dist_background_task WHERE job_id = :job_id ORDER BY task_id ASC; +-- verify that we do not allow parallel task executors more than citus.max_background_task_executors(4 by default) +BEGIN; +INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify max parallel background execution') RETURNING job_id AS job_id1 \gset +INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id1, $job$ SELECT pg_sleep(5); $job$) RETURNING task_id AS task_id1 \gset +INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id1, $job$ SELECT pg_sleep(5); $job$) RETURNING task_id AS task_id2 \gset +INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id1, $job$ SELECT pg_sleep(5); $job$) RETURNING task_id AS task_id3 \gset +INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify max parallel background execution') RETURNING job_id AS job_id2 \gset +INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id2, $job$ SELECT pg_sleep(5); $job$) RETURNING task_id AS task_id4 \gset +INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify max parallel background execution') 
RETURNING job_id AS job_id3 \gset +INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id3, $job$ SELECT pg_sleep(5); $job$) RETURNING task_id AS task_id5 \gset +COMMIT; + +SELECT pg_sleep(2); -- we assume this is enough time for all tasks to be in running status except the last one due to parallel worker limit + +SELECT job_id, task_id, status FROM pg_dist_background_task + WHERE task_id IN (:task_id1, :task_id2, :task_id3, :task_id4, :task_id5) + ORDER BY job_id, task_id; -- show that last task is not running but ready to run(runnable) + +SELECT citus_job_cancel(:job_id2); -- when a job with 1 task is cancelled, the last runnable task will be running +SELECT citus_job_wait(:job_id3, desired_status => 'running'); +SELECT job_id, task_id, status FROM pg_dist_background_task + WHERE task_id IN (:task_id1, :task_id2, :task_id3, :task_id4, :task_id5) + ORDER BY job_id, task_id; -- show that last task is running + +SELECT citus_job_cancel(:job_id1); +SELECT citus_job_cancel(:job_id3); +SELECT citus_job_wait(:job_id1); +SELECT citus_job_wait(:job_id2); +SELECT citus_job_wait(:job_id3); +SELECT job_id, task_id, status FROM pg_dist_background_task + WHERE task_id IN (:task_id1, :task_id2, :task_id3, :task_id4, :task_id5) + ORDER BY job_id, task_id; -- show that multiple cancels worked + + +-- verify that task is not starved by currently long running task +BEGIN; +INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify max parallel background execution') RETURNING job_id AS job_id1 \gset +INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id1, $job$ SELECT pg_sleep(5000); $job$) RETURNING task_id AS task_id1 \gset +INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify max parallel background execution') RETURNING job_id AS job_id2 \gset +INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id2, $job$ SELECT 1; $job$) RETURNING task_id 
AS task_id2 \gset +COMMIT; + +SELECT citus_job_wait(:job_id1, desired_status => 'running'); +SELECT citus_job_wait(:job_id2, desired_status => 'finished'); +SELECT job_id, task_id, status FROM pg_dist_background_task + WHERE task_id IN (:task_id1, :task_id2) + ORDER BY job_id, task_id; -- show that last task is finished without starvation +SELECT citus_job_cancel(:job_id1); + SET client_min_messages TO WARNING; DROP SCHEMA background_task_queue_monitor CASCADE;