diff --git a/src/backend/distributed/utils/background_jobs.c b/src/backend/distributed/utils/background_jobs.c index 2032b7e65..a502b9219 100644 --- a/src/backend/distributed/utils/background_jobs.c +++ b/src/backend/distributed/utils/background_jobs.c @@ -72,12 +72,14 @@ #define CITUS_BACKGROUND_TASK_KEY_COMMAND 2 #define CITUS_BACKGROUND_TASK_KEY_QUEUE 3 #define CITUS_BACKGROUND_TASK_KEY_TASK_ID 4 -#define CITUS_BACKGROUND_TASK_NKEYS 5 +#define CITUS_BACKGROUND_TASK_KEY_JOB_ID 5 +#define CITUS_BACKGROUND_TASK_NKEYS 6 static BackgroundWorkerHandle * StartCitusBackgroundTaskExecutor(char *database, char *user, char *command, int64 taskId, + int64 jobId, dsm_segment **pSegment); static void ExecuteSqlString(const char *sql); static shm_mq_result ConsumeTaskWorkerOutput(shm_mq_handle *responseq, StringInfo message, @@ -106,6 +108,18 @@ static TaskExecutionStatus ConsumeExecutorQueue( TaskExecutionContext *taskExecutionContext); static void TaskHadError(TaskExecutionContext *taskExecutionContext); static void TaskEnded(TaskExecutionContext *taskExecutionContext); +static void TerminateAllTaskExecutors(HTAB *currentExecutors); +static HTAB * GetRunningUniqueJobIds(HTAB *currentExecutors); +static void CancelAllTaskExecutors(HTAB *currentExecutors); +static bool MonitorGotTerminationOrCancellationRequest(); +static void QueueMonitorSigTermHandler(SIGNAL_ARGS); +static void QueueMonitorSigIntHandler(SIGNAL_ARGS); +static void QueueMonitorSigHupHandler(SIGNAL_ARGS); + +/* flags set by signal handlers */ +static volatile sig_atomic_t GotSigterm = false; +static volatile sig_atomic_t GotSigint = false; +static volatile sig_atomic_t GotSighup = false; PG_FUNCTION_INFO_V1(citus_job_cancel); PG_FUNCTION_INFO_V1(citus_job_wait); @@ -337,8 +351,6 @@ CitusBackgroundTaskQueueMonitorErrorCallback(void *arg) /* * NewExecutorExceedsCitusLimit returns true if currently we reached Citus' max worker count. - * It also sleeps 1 sec to let running tasks progress so that we have better chance to not hit - * that limit again. */ static bool NewExecutorExceedsCitusLimit(QueueMonitorExecutionContext *queueMonitorExecutionContext) @@ -372,8 +384,6 @@ NewExecutorExceedsCitusLimit(QueueMonitorExecutionContext *queueMonitorExecution /* * NewExecutorExceedsPgMaxWorkers returns true if currently we reached Postgres' max worker count. - * It also sleeps 1 sec to let running tasks progress so that we have better chance to not hit - * that limit again. */ static bool NewExecutorExceedsPgMaxWorkers(BackgroundWorkerHandle *handle, @@ -389,9 +399,8 @@ NewExecutorExceedsPgMaxWorkers(BackgroundWorkerHandle *handle, */ if (queueMonitorExecutionContext->backgroundWorkerFailedStartTime == 0) { - ereport(WARNING, (errmsg( - "unable to start background worker for " - "background task execution"), + ereport(WARNING, (errmsg("unable to start background worker for " + "background task execution"), errdetail( "Current number of task " "executors: %ld/%d", @@ -432,7 +441,7 @@ AssignRunnableTaskToNewExecutor(BackgroundTask *runnableTask, dsm_segment *seg = NULL; BackgroundWorkerHandle *handle = StartCitusBackgroundTaskExecutor(databaseName, userName, runnableTask->command, - runnableTask->taskid, &seg); + runnableTask->taskid, runnableTask->jobid, &seg); MemoryContextSwitchTo(oldContext); if (NewExecutorExceedsPgMaxWorkers(handle, queueMonitorExecutionContext)) @@ -450,6 +459,7 @@ AssignRunnableTaskToNewExecutor(BackgroundTask *runnableTask, Assert(!handleEntryFound); handleEntry->handle = handle; handleEntry->seg = seg; + handleEntry->jobid = runnableTask->jobid; /* reset worker allocation timestamp and log time elapsed since the last failure */ CheckAndResetLastWorkerAllocationFailure(queueMonitorExecutionContext); @@ -741,6 +751,138 @@ TaskEnded(TaskExecutionContext *taskExecutionContext) } +/* + * QueueMonitorSigHupHandler handles SIGHUP to update monitor related config params. + */ +static void +QueueMonitorSigHupHandler(SIGNAL_ARGS) +{ + int saved_errno = errno; + + GotSighup = true; + + if (MyProc) + { + SetLatch(&MyProc->procLatch); + } + + errno = saved_errno; +} + + +/* + * MonitorGotTerminationOrCancellationRequest returns true if monitor had SIGTERM or SIGINT signals + */ +static bool +MonitorGotTerminationOrCancellationRequest() +{ + return GotSigterm || GotSigint; +} + + +/* + * QueueMonitorSigTermHandler handles SIGTERM by setting a flag to inform the monitor process + * so that it can terminate active task executors properly. It also sets the latch to awake the + * monitor if it waits on it. + */ +static void +QueueMonitorSigTermHandler(SIGNAL_ARGS) +{ + int saved_errno = errno; + + GotSigterm = true; + + if (MyProc) + { + SetLatch(&MyProc->procLatch); + } + + errno = saved_errno; +} + + +/* + * QueueMonitorSigIntHandler handles SIGINT by setting a flag to inform the monitor process + * so that it can terminate active task executors properly. It also sets the latch to awake the + * monitor if it waits on it. + */ +static void +QueueMonitorSigIntHandler(SIGNAL_ARGS) +{ + int saved_errno = errno; + + GotSigint = true; + + if (MyProc) + { + SetLatch(&MyProc->procLatch); + } + + errno = saved_errno; +} + + +/* + * TerminateAllTaskExecutors terminates task executors given in the hash map. + */ +static void +TerminateAllTaskExecutors(HTAB *currentExecutors) +{ + HASH_SEQ_STATUS status; + BackgroundExecutorHashEntry *backgroundExecutorHashEntry; + foreach_htab(backgroundExecutorHashEntry, &status, currentExecutors) + { + TerminateBackgroundWorker(backgroundExecutorHashEntry->handle); + } +} + + +/* + * GetRunningUniqueJobIds returns unique job ids from currentExecutors + */ +static HTAB * +GetRunningUniqueJobIds(HTAB *currentExecutors) +{ + /* create a set to store unique job ids for currently executing tasks */ + HTAB *uniqueJobIds = CreateSimpleHashSetWithSize(int64, MAX_BG_TASK_EXECUTORS); + + HASH_SEQ_STATUS status; + BackgroundExecutorHashEntry *backgroundExecutorHashEntry; + foreach_htab(backgroundExecutorHashEntry, &status, currentExecutors) + { + hash_search(uniqueJobIds, &backgroundExecutorHashEntry->jobid, HASH_ENTER, NULL); + } + + return uniqueJobIds; +} + + +/* + * CancelAllTaskExecutors cancels task executors given in the hash map. + */ +static void +CancelAllTaskExecutors(HTAB *currentExecutors) +{ + StartTransactionCommand(); + PushActiveSnapshot(GetTransactionSnapshot()); + + /* get unique job id set for running tasks in currentExecutors */ + HTAB *uniqueJobIds = GetRunningUniqueJobIds(currentExecutors); + + HASH_SEQ_STATUS status; + int64 *uniqueJobId; + foreach_htab(uniqueJobId, &status, uniqueJobIds) + { + ereport(DEBUG1, (errmsg("cancelling job: %ld", *uniqueJobId))); + Datum jobidDatum = Int64GetDatum(*uniqueJobId); + DirectFunctionCall1(citus_job_cancel, jobidDatum); + } + + PopActiveSnapshot(); + CommitTransactionCommand(); +} + + /* * CitusBackgroundTaskQueueMonitorMain is the main entry point for the background worker * running the background tasks queue monitor. @@ -758,6 +900,18 @@ TaskEnded(TaskExecutionContext *taskExecutionContext) void CitusBackgroundTaskQueueMonitorMain(Datum arg) { + /* handle SIGTERM to properly terminate active task executors */ + pqsignal(SIGTERM, QueueMonitorSigTermHandler); + + /* handle SIGINT to properly cancel active task executors */ + pqsignal(SIGINT, QueueMonitorSigIntHandler); + + /* handle SIGHUP to update MaxBackgroundTaskExecutors */ + pqsignal(SIGHUP, QueueMonitorSigHupHandler); + + /* ready to handle signals */ + BackgroundWorkerUnblockSignals(); + Oid databaseOid = DatumGetObjectId(arg); /* extension owner is passed via bgw_extra */ @@ -765,8 +919,6 @@ CitusBackgroundTaskQueueMonitorMain(Datum arg) memcpy_s(&extensionOwner, sizeof(extensionOwner), MyBgworkerEntry->bgw_extra, sizeof(Oid)); - BackgroundWorkerUnblockSignals(); - /* connect to database, after that we can actually access catalogs */ BackgroundWorkerInitializeConnectionByOid(databaseOid, extensionOwner, 0); @@ -807,10 +959,6 @@ CitusBackgroundTaskQueueMonitorMain(Datum arg) * cause conflicts on processing the tasks in the catalog table as well as violate * parallelism guarantees. To make sure there is at most, exactly one backend running * we take a session lock on the CITUS_BACKGROUND_TASK_MONITOR operation. - * - * TODO now that we have a lock, we should install a term handler to terminate any - * 'child' backend when we are terminated. Otherwise we will still have a situation - * where the actual task could be running multiple times. */ LOCKTAG tag = { 0 }; SET_LOCKTAG_CITUS_OPERATION(tag, CITUS_BACKGROUND_TASK_MONITOR); @@ -841,11 +989,13 @@ CitusBackgroundTaskQueueMonitorMain(Datum arg) PopActiveSnapshot(); CommitTransactionCommand(); - /* create a map to store parallel task executors */ + /* create a map to store parallel task executors. Persist it in monitor memory context */ + oldContext = MemoryContextSwitchTo(backgroundTaskContext); HTAB *currentExecutors = CreateSimpleHashWithNameAndSize(int64, BackgroundExecutorHashEntry, "Background Executor Hash", MAX_BG_TASK_EXECUTORS); + MemoryContextSwitchTo(oldContext); /* * monitor execution context that is useful during the monitor loop. @@ -861,6 +1011,10 @@ CitusBackgroundTaskQueueMonitorMain(Datum arg) .ctx = backgroundTaskContext }; + /* flag to prevent duplicate termination and cancellation of task executors */ + bool terminateExecutorsStarted = false; + bool cancelExecutorsStarted = false; + /* loop exits if there is no running or runnable tasks left */ bool hasAnyTask = true; while (hasAnyTask) @@ -868,15 +1022,47 @@ CitusBackgroundTaskQueueMonitorMain(Datum arg) /* handle signals */ CHECK_FOR_INTERRUPTS(); + /* + * if the flag is set, we should terminate all task executor workers to prevent duplicate + * runs of the same task on the next start of the monitor, which is dangerous for non-idempotent + * tasks. We do not break the loop here as we want to reflect tasks' messages. Hence, we wait until + * all tasks finish and also do not allow new runnable tasks to start running. After all current tasks + * finish, we can exit the loop safely. + */ + if (GotSigterm && !terminateExecutorsStarted) + { + ereport(LOG, (errmsg("handling termination signal"))); + terminateExecutorsStarted = true; + TerminateAllTaskExecutors(queueMonitorExecutionContext.currentExecutors); + } + + if (GotSigint && !cancelExecutorsStarted) + { + ereport(LOG, (errmsg("handling cancellation signal"))); + cancelExecutorsStarted = true; + CancelAllTaskExecutors(queueMonitorExecutionContext.currentExecutors); + } + + if (GotSighup) + { + GotSighup = false; + + /* update max_background_task_executors if changed */ + ProcessConfigFile(PGC_SIGHUP); + } + /* invalidate cache for new data in catalog */ InvalidateMetadataSystemCache(); - /* assign runnable tasks, if any, to new task executors in a transaction */ - StartTransactionCommand(); - PushActiveSnapshot(GetTransactionSnapshot()); - AssignRunnableTasks(&queueMonitorExecutionContext); - PopActiveSnapshot(); - CommitTransactionCommand(); + /* assign runnable tasks, if any, to new task executors in a transaction if we do not have SIGTERM or SIGINT */ + if (!MonitorGotTerminationOrCancellationRequest()) + { + StartTransactionCommand(); + PushActiveSnapshot(GetTransactionSnapshot()); + AssignRunnableTasks(&queueMonitorExecutionContext); + PopActiveSnapshot(); + CommitTransactionCommand(); + } /* get running task entries from hash table */ List *runningTaskEntries = GetRunningTaskEntries( @@ -1268,7 +1454,8 @@ ConsumeTaskWorkerOutput(shm_mq_handle *responseq, StringInfo message, bool *hadE * environment to the executor. */ static dsm_segment * -StoreArgumentsInDSM(char *database, char *username, char *command, int64 taskId) +StoreArgumentsInDSM(char *database, char *username, char *command, + int64 taskId, int64 jobId) { /* * Create the shared memory that we will pass to the background @@ -1284,6 +1471,7 @@ StoreArgumentsInDSM(char *database, char *username, char *command, int64 taskId) #define QUEUE_SIZE ((Size) 65536) shm_toc_estimate_chunk(&e, QUEUE_SIZE); shm_toc_estimate_chunk(&e, sizeof(int64)); + shm_toc_estimate_chunk(&e, sizeof(int64)); shm_toc_estimate_keys(&e, CITUS_BACKGROUND_TASK_NKEYS); Size segsize = shm_toc_estimate(&e); @@ -1330,6 +1518,10 @@ StoreArgumentsInDSM(char *database, char *username, char *command, int64 taskId) *taskIdTarget = taskId; shm_toc_insert(toc, CITUS_BACKGROUND_TASK_KEY_TASK_ID, taskIdTarget); + int64 *jobIdTarget = shm_toc_allocate(toc, sizeof(int64)); + *jobIdTarget = jobId; + shm_toc_insert(toc, CITUS_BACKGROUND_TASK_KEY_JOB_ID, jobIdTarget); + shm_mq_attach(mq, seg, NULL); return seg; @@ -1343,17 +1535,17 @@ StoreArgumentsInDSM(char *database, char *username, char *command, int64 taskId) * pointer to the dynamic shared memory. */ static BackgroundWorkerHandle * -StartCitusBackgroundTaskExecutor(char *database, char *user, char *command, int64 taskId, - dsm_segment **pSegment) +StartCitusBackgroundTaskExecutor(char *database, char *user, char *command, + int64 taskId, int64 jobId, dsm_segment **pSegment) { - dsm_segment *seg = StoreArgumentsInDSM(database, user, command, taskId); + dsm_segment *seg = StoreArgumentsInDSM(database, user, command, taskId, jobId); /* Configure a worker. */ BackgroundWorker worker = { 0 }; memset(&worker, 0, sizeof(worker)); SafeSnprintf(worker.bgw_name, BGW_MAXLEN, - "Citus Background Task Queue Executor: %s/%s", - database, user); + "Citus Background Task Queue Executor: %s/%s for (%ld/%ld)", + database, user, jobId, taskId); worker.bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION; worker.bgw_start_time = BgWorkerStart_ConsistentState; @@ -1391,6 +1583,8 @@ typedef struct CitusBackgroundJobExecutorErrorCallbackContext { const char *database; const char *username; + int64 taskId; + int64 jobId; } CitusBackgroundJobExecutorErrorCallbackContext; @@ -1403,8 +1597,9 @@ CitusBackgroundJobExecutorErrorCallback(void *arg) { CitusBackgroundJobExecutorErrorCallbackContext *context = (CitusBackgroundJobExecutorErrorCallbackContext *) arg; - errcontext("Citus Background Task Queue Executor: %s/%s", context->database, - context->username); + errcontext("Citus Background Task Queue Executor: %s/%s for (%ld/%ld)", + context->database, context->username, + context->jobId, context->taskId); } @@ -1418,10 +1613,6 @@ CitusBackgroundJobExecutorErrorCallback(void *arg) void CitusBackgroundTaskExecutor(Datum main_arg) { - /* - * TODO figure out if we need this signal handler that is in pgcron - * pqsignal(SIGTERM, pg_cron_background_worker_sigterm); - */ BackgroundWorkerUnblockSignals(); /* Set up a dynamic shared memory segment. */ @@ -1445,6 +1636,7 @@ CitusBackgroundTaskExecutor(Datum main_arg) char *username = shm_toc_lookup(toc, CITUS_BACKGROUND_TASK_KEY_USERNAME, false); char *command = shm_toc_lookup(toc, CITUS_BACKGROUND_TASK_KEY_COMMAND, false); int64 *taskId = shm_toc_lookup(toc, CITUS_BACKGROUND_TASK_KEY_TASK_ID, false); + int64 *jobId = shm_toc_lookup(toc, CITUS_BACKGROUND_TASK_KEY_JOB_ID, false); shm_mq *mq = shm_toc_lookup(toc, CITUS_BACKGROUND_TASK_KEY_QUEUE, false); shm_mq_set_sender(mq, MyProc); @@ -1456,6 +1648,8 @@ CitusBackgroundTaskExecutor(Datum main_arg) CitusBackgroundJobExecutorErrorCallbackContext context = { .database = database, .username = username, + .taskId = *taskId, + .jobId = *jobId, }; errorCallback.callback = CitusBackgroundJobExecutorErrorCallback; errorCallback.arg = (void *) &context; @@ -1482,8 +1676,8 @@ CitusBackgroundTaskExecutor(Datum main_arg) /* Prepare to execute the query. */ SetCurrentStatementStartTimestamp(); debug_query_string = command; - char *appname = psprintf("citus background task queue executor (taskId %ld)", - *taskId); + char *appname = psprintf("citus background task queue executor (%ld/%ld)", + *jobId, *taskId); pgstat_report_appname(appname); pgstat_report_activity(STATE_RUNNING, command); StartTransactionCommand(); diff --git a/src/include/distributed/background_jobs.h b/src/include/distributed/background_jobs.h index a83928e82..75b7b982b 100644 --- a/src/include/distributed/background_jobs.h +++ b/src/include/distributed/background_jobs.h @@ -28,6 +28,7 @@ typedef struct BackgroundExecutorHashEntry BackgroundWorkerHandle *handle; dsm_segment *seg; + int64 jobid; StringInfo message; } BackgroundExecutorHashEntry; diff --git a/src/test/regress/expected/background_task_queue_monitor.out b/src/test/regress/expected/background_task_queue_monitor.out index 7278eebfc..2d528c50c 100644 --- a/src/test/regress/expected/background_task_queue_monitor.out +++ b/src/test/regress/expected/background_task_queue_monitor.out @@ -125,6 +125,12 @@ SELECT citus_job_cancel(:job_id); (1 row) +SELECT citus_job_wait(:job_id); + citus_job_wait +--------------------------------------------------------------------- + +(1 row) + SELECT state, NOT(started_at IS NULL) AS did_start FROM pg_dist_background_job WHERE job_id = :job_id; state | did_start --------------------------------------------------------------------- @@ -242,6 +248,12 @@ SELECT citus_job_cancel(:job_id2); -- when a job with 1 task is cancelled, the l (1 row) +SELECT citus_job_wait(:job_id2); -- wait for the job to be cancelled + citus_job_wait +--------------------------------------------------------------------- + +(1 row) + SELECT citus_job_wait(:job_id3, desired_status => 'running'); citus_job_wait --------------------------------------------------------------------- @@ -278,12 +290,6 @@ SELECT citus_job_wait(:job_id1); (1 row) -SELECT citus_job_wait(:job_id2); - citus_job_wait ---------------------------------------------------------------------- - -(1 row) - SELECT citus_job_wait(:job_id3); citus_job_wait --------------------------------------------------------------------- @@ -302,11 +308,262 @@ SELECT job_id, task_id, status FROM pg_dist_background_task 9 | 15 | cancelled (5 rows) --- verify that task is not starved by currently long running task +-- verify that a task, previously not started due to lack of workers, is executed after we increase max worker count BEGIN; INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify max parallel background execution') RETURNING job_id AS job_id1 \gset -INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id1, $job$ SELECT pg_sleep(5000); $job$) RETURNING task_id AS task_id1 \gset +INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id1, $job$ SELECT pg_sleep(5); $job$) RETURNING task_id AS task_id1 \gset +INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id1, $job$ SELECT pg_sleep(5); $job$) RETURNING task_id AS task_id2 \gset +INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id1, $job$ SELECT pg_sleep(5); $job$) RETURNING task_id AS task_id3 \gset INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify max parallel background execution') RETURNING job_id AS job_id2 \gset +INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id2, $job$ SELECT pg_sleep(5); $job$) RETURNING task_id AS task_id4 \gset +INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify max parallel background execution') RETURNING job_id AS job_id3 \gset +INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id3, $job$ SELECT pg_sleep(5); $job$) RETURNING task_id AS task_id5 \gset +COMMIT; +SELECT pg_sleep(2); -- we assume this is enough time for all tasks to be in running status except the last one due to parallel worker limit + pg_sleep +--------------------------------------------------------------------- + +(1 row) + +SELECT task_id, status FROM pg_dist_background_task + WHERE task_id IN (:task_id1, :task_id2, :task_id3, :task_id4, :task_id5) + ORDER BY task_id; -- show that last task is not running but ready to run(runnable) + task_id | status +--------------------------------------------------------------------- + 16 | running + 17 | running + 18 | running + 19 | running + 20 | runnable +(5 rows) + +ALTER SYSTEM SET citus.max_background_task_executors TO 5; +SELECT pg_reload_conf(); -- the last runnable task will be running after change + pg_reload_conf +--------------------------------------------------------------------- + t +(1 row) + +SELECT citus_job_wait(:job_id3, desired_status => 'running'); + citus_job_wait +--------------------------------------------------------------------- + +(1 row) + +SELECT task_id, status FROM pg_dist_background_task + WHERE task_id IN (:task_id1, :task_id2, :task_id3, :task_id4, :task_id5) + ORDER BY task_id; -- show that last task is running + task_id | status +--------------------------------------------------------------------- + 16 | running + 17 | running + 18 | running + 19 | running + 20 | running +(5 rows) + +SELECT citus_job_cancel(:job_id1); + citus_job_cancel +--------------------------------------------------------------------- + +(1 row) + +SELECT citus_job_cancel(:job_id2); + citus_job_cancel +--------------------------------------------------------------------- + +(1 row) + +SELECT citus_job_cancel(:job_id3); + citus_job_cancel +--------------------------------------------------------------------- + +(1 row) + +SELECT citus_job_wait(:job_id1); + citus_job_wait +--------------------------------------------------------------------- + +(1 row) + +SELECT citus_job_wait(:job_id2); + citus_job_wait +--------------------------------------------------------------------- + +(1 row) + +SELECT citus_job_wait(:job_id3); + citus_job_wait +--------------------------------------------------------------------- + +(1 row) + +SELECT task_id, status FROM pg_dist_background_task + WHERE task_id IN (:task_id1, :task_id2, :task_id3, :task_id4, :task_id5) + ORDER BY task_id; -- show that all tasks are cancelled + task_id | status +--------------------------------------------------------------------- + 16 | cancelled + 17 | cancelled + 18 | cancelled + 19 | cancelled + 20 | cancelled +(5 rows) + +-- verify that upon termination signal, all tasks fail and retry policy sets their status back to runnable +BEGIN; +INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify termination on monitor') RETURNING job_id AS job_id1 \gset +INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id1, $job$ SELECT pg_sleep(500); $job$) RETURNING task_id AS task_id1 \gset +INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify termination on monitor') RETURNING job_id AS job_id2 \gset +INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id2, $job$ SELECT pg_sleep(500); $job$) RETURNING task_id AS task_id2 \gset +COMMIT; +SELECT citus_job_wait(:job_id1, desired_status => 'running'); + citus_job_wait +--------------------------------------------------------------------- + +(1 row) + +SELECT citus_job_wait(:job_id2, desired_status => 'running'); + citus_job_wait +--------------------------------------------------------------------- + +(1 row) + +SELECT task_id, status FROM pg_dist_background_task + WHERE task_id IN (:task_id1, :task_id2) + ORDER BY task_id; + task_id | status +--------------------------------------------------------------------- + 21 | running + 22 | running +(2 rows) + +SELECT pid AS monitor_pid FROM pg_stat_activity WHERE application_name ~ 'task queue monitor' \gset +SELECT pg_terminate_backend(:monitor_pid); -- terminate monitor process + pg_terminate_backend +--------------------------------------------------------------------- + t +(1 row) + +SELECT pg_sleep(2); -- wait enough to show that tasks are terminated + pg_sleep +--------------------------------------------------------------------- + +(1 row) + +SELECT task_id, status, retry_count, message FROM pg_dist_background_task + WHERE task_id IN (:task_id1, :task_id2) + ORDER BY task_id; -- show that all tasks are runnable by retry policy after termination signal + task_id | status | retry_count | message +--------------------------------------------------------------------- + 21 | runnable | 1 | FATAL: terminating background worker "Citus Background Task Queue Executor: regression/postgres for (13/21)" due to administrator command+ + | | | CONTEXT: Citus Background Task Queue Executor: regression/postgres for (13/21) + + | | | + 22 | runnable | 1 | FATAL: terminating background worker "Citus Background Task Queue Executor: regression/postgres for (14/22)" due to administrator command+ + | | | CONTEXT: Citus Background Task Queue Executor: regression/postgres for (14/22) + + | | | +(2 rows) + +SELECT citus_job_cancel(:job_id1); + citus_job_cancel +--------------------------------------------------------------------- + +(1 row) + +SELECT citus_job_cancel(:job_id2); + citus_job_cancel +--------------------------------------------------------------------- + +(1 row) + +SELECT citus_job_wait(:job_id1); + citus_job_wait +--------------------------------------------------------------------- + +(1 row) + +SELECT citus_job_wait(:job_id2); + citus_job_wait +--------------------------------------------------------------------- + +(1 row) + +SELECT task_id, status FROM pg_dist_background_task + WHERE task_id IN (:task_id1, :task_id2) + ORDER BY task_id; -- show that all tasks are cancelled + task_id | status +--------------------------------------------------------------------- + 21 | cancelled + 22 | cancelled +(2 rows) + +-- verify that upon cancellation signal, all tasks are cancelled +BEGIN; +INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify cancellation on monitor') RETURNING job_id AS job_id1 \gset +INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id1, $job$ SELECT pg_sleep(500); $job$) RETURNING task_id AS task_id1 \gset +INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify cancellation on monitor') RETURNING job_id AS job_id2 \gset +INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id2, $job$ SELECT pg_sleep(500); $job$) RETURNING task_id AS task_id2 \gset +COMMIT; +SELECT citus_job_wait(:job_id1, desired_status => 'running'); + citus_job_wait +--------------------------------------------------------------------- + +(1 row) + +SELECT citus_job_wait(:job_id2, desired_status => 'running'); + citus_job_wait +--------------------------------------------------------------------- + +(1 row) + +SELECT task_id, status FROM pg_dist_background_task + WHERE task_id IN (:task_id1, :task_id2) + ORDER BY task_id; + task_id | status +--------------------------------------------------------------------- + 23 | running + 24 | running +(2 rows) + +SELECT pid AS monitor_pid FROM pg_stat_activity WHERE application_name ~ 'task queue monitor' \gset +SELECT pg_cancel_backend(:monitor_pid); -- cancel monitor process + pg_cancel_backend +--------------------------------------------------------------------- + t +(1 row) + +SELECT pg_sleep(2); -- wait enough to show that tasks are cancelled + pg_sleep +--------------------------------------------------------------------- + +(1 row) + +SELECT citus_job_wait(:job_id1); + citus_job_wait +--------------------------------------------------------------------- + +(1 row) + +SELECT citus_job_wait(:job_id2); + citus_job_wait +--------------------------------------------------------------------- + +(1 row) + +SELECT task_id, status FROM pg_dist_background_task + WHERE task_id IN (:task_id1, :task_id2) + ORDER BY task_id; -- show that all tasks are cancelled + task_id | status +--------------------------------------------------------------------- + 23 | cancelled + 24 | cancelled +(2 rows) + +-- verify that task is not starved by currently long running task +BEGIN; +INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify task execution starvation') RETURNING job_id AS job_id1 \gset +INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id1, $job$ SELECT pg_sleep(5000); $job$) RETURNING task_id AS task_id1 \gset +INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify task execution starvation') RETURNING job_id AS job_id2 \gset INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id2, $job$ SELECT 1; $job$) RETURNING task_id AS task_id2 \gset COMMIT; SELECT citus_job_wait(:job_id1, desired_status => 'running'); @@ -326,8 +583,8 @@ SELECT job_id, task_id, status FROM pg_dist_background_task ORDER BY job_id, task_id; -- show that last task is finished without starvation job_id | task_id | status --------------------------------------------------------------------- - 10 | 16 | running - 11 | 17 | done + 17 | 25 | running + 18 | 26 | done (2 rows) SELECT citus_job_cancel(:job_id1); @@ -336,6 +593,21 @@ SELECT citus_job_cancel(:job_id1); (1 row) +SELECT citus_job_wait(:job_id1); + citus_job_wait +--------------------------------------------------------------------- + +(1 row) + +SELECT job_id, task_id, status FROM pg_dist_background_task + WHERE task_id IN (:task_id1, :task_id2) + ORDER BY job_id, task_id; -- show that task is cancelled + job_id | task_id | status +--------------------------------------------------------------------- + 17 | 25 | cancelled + 18 | 26 | done +(2 rows) + SET client_min_messages TO WARNING; DROP SCHEMA background_task_queue_monitor CASCADE; ALTER SYSTEM RESET citus.background_task_queue_interval; diff --git a/src/test/regress/sql/background_task_queue_monitor.sql b/src/test/regress/sql/background_task_queue_monitor.sql index 4319f2bf8..c04fe90d6 100644 --- a/src/test/regress/sql/background_task_queue_monitor.sql +++ b/src/test/regress/sql/background_task_queue_monitor.sql @@ -54,6 +54,7 @@ SELECT status, pid, retry_count, NOT(message = '') AS has_message, (not_before > -- test cancelling a failed/retrying job SELECT citus_job_cancel(:job_id); +SELECT citus_job_wait(:job_id); SELECT state, NOT(started_at IS NULL) AS did_start FROM pg_dist_background_job WHERE job_id = :job_id; SELECT status, NOT(message = '') AS did_start FROM pg_dist_background_task WHERE job_id = :job_id ORDER BY task_id ASC; @@ -110,6 +111,7 @@ SELECT job_id, task_id, status FROM pg_dist_background_task ORDER BY job_id, task_id; -- show that last task is not running but ready to run(runnable) SELECT citus_job_cancel(:job_id2); -- when a job with 1 task is cancelled, the last runnable task will be running +SELECT citus_job_wait(:job_id2); -- wait for the job to be cancelled SELECT citus_job_wait(:job_id3, desired_status => 'running'); SELECT job_id, task_id, status FROM pg_dist_background_task WHERE task_id IN (:task_id1, :task_id2, :task_id3, :task_id4, :task_id5) @@ -118,18 +120,115 @@ SELECT job_id, task_id, status FROM pg_dist_background_task SELECT citus_job_cancel(:job_id1); SELECT citus_job_cancel(:job_id3); SELECT citus_job_wait(:job_id1); -SELECT citus_job_wait(:job_id2); SELECT citus_job_wait(:job_id3); SELECT job_id, task_id, status FROM pg_dist_background_task WHERE task_id IN (:task_id1, :task_id2, :task_id3, :task_id4, :task_id5) ORDER BY job_id, task_id; -- show that multiple cancels worked --- verify that task is not starved by currently long running task +-- verify that a task, previously not started due to lack of workers, is executed after we increase max worker count BEGIN; INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify max parallel background execution') RETURNING job_id AS job_id1 \gset -INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id1, $job$ SELECT pg_sleep(5000); $job$) RETURNING task_id AS task_id1 \gset +INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id1, $job$ SELECT pg_sleep(5); $job$) RETURNING task_id AS task_id1 \gset +INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id1, $job$ SELECT pg_sleep(5); $job$) RETURNING task_id AS task_id2 \gset +INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id1, $job$ SELECT pg_sleep(5); $job$) RETURNING task_id AS task_id3 \gset INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify max parallel background execution') RETURNING job_id AS job_id2 \gset +INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id2, $job$ SELECT pg_sleep(5); $job$) RETURNING task_id AS task_id4 \gset +INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify max parallel background execution') RETURNING job_id AS job_id3 \gset +INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id3, $job$ SELECT pg_sleep(5); $job$) RETURNING task_id AS task_id5 \gset +COMMIT; + +SELECT pg_sleep(2); -- we assume this is enough time for all tasks to be in running status except the last one due to parallel worker limit + +SELECT task_id, status FROM pg_dist_background_task + WHERE task_id IN (:task_id1, :task_id2, :task_id3, :task_id4, :task_id5) + ORDER BY task_id; -- show that last task is not running but ready to run(runnable) + +ALTER SYSTEM SET citus.max_background_task_executors TO 5; +SELECT pg_reload_conf(); -- the last runnable task will be running after change +SELECT citus_job_wait(:job_id3, desired_status => 'running'); +SELECT task_id, status FROM pg_dist_background_task + WHERE task_id IN (:task_id1, :task_id2, :task_id3, :task_id4, :task_id5) + ORDER BY task_id; -- show that last task is running + +SELECT citus_job_cancel(:job_id1); +SELECT citus_job_cancel(:job_id2); +SELECT citus_job_cancel(:job_id3); +SELECT citus_job_wait(:job_id1); +SELECT citus_job_wait(:job_id2); +SELECT citus_job_wait(:job_id3); + +SELECT task_id, status FROM pg_dist_background_task + WHERE task_id IN (:task_id1, :task_id2, :task_id3, :task_id4, :task_id5) + ORDER BY task_id; -- show that all tasks are cancelled + +-- verify that upon termination signal, all tasks fail and retry policy sets their status back to runnable +BEGIN; +INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify termination on monitor') RETURNING job_id AS job_id1 \gset +INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id1, $job$ SELECT pg_sleep(500); $job$) RETURNING task_id AS task_id1 \gset +INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify termination on monitor') RETURNING job_id AS job_id2 \gset +INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id2, $job$ SELECT pg_sleep(500); $job$) RETURNING task_id AS task_id2 \gset +COMMIT; + +SELECT citus_job_wait(:job_id1, desired_status => 'running'); +SELECT citus_job_wait(:job_id2, desired_status => 'running'); + +SELECT task_id, status FROM pg_dist_background_task + WHERE task_id IN (:task_id1, :task_id2) + ORDER BY task_id; + + +SELECT pid AS monitor_pid FROM pg_stat_activity WHERE application_name ~ 'task queue monitor' \gset +SELECT pg_terminate_backend(:monitor_pid); -- terminate monitor process + +SELECT pg_sleep(2); -- wait enough to show that tasks are terminated + +SELECT task_id, status, retry_count, message FROM pg_dist_background_task + WHERE task_id IN (:task_id1, :task_id2) + ORDER BY task_id; -- show that all tasks are runnable by retry policy after termination signal + +SELECT citus_job_cancel(:job_id1); +SELECT citus_job_cancel(:job_id2); +SELECT citus_job_wait(:job_id1); +SELECT citus_job_wait(:job_id2); + +SELECT task_id, status FROM pg_dist_background_task + WHERE task_id IN (:task_id1, :task_id2) + ORDER BY task_id; -- show that all tasks are cancelled + +-- verify that upon cancellation signal, all tasks are cancelled +BEGIN; +INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify cancellation on monitor') RETURNING job_id AS job_id1 \gset +INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id1, $job$ SELECT pg_sleep(500); $job$) RETURNING task_id AS task_id1 \gset +INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify cancellation on monitor') RETURNING job_id AS job_id2 \gset +INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id2, $job$ SELECT pg_sleep(500); $job$) RETURNING task_id AS task_id2 \gset +COMMIT; + +SELECT citus_job_wait(:job_id1, desired_status => 'running'); +SELECT citus_job_wait(:job_id2, desired_status => 'running'); + +SELECT task_id, status FROM pg_dist_background_task + WHERE task_id IN (:task_id1, :task_id2) + ORDER BY task_id; + + +SELECT pid AS monitor_pid FROM pg_stat_activity WHERE application_name ~ 'task queue monitor' \gset +SELECT pg_cancel_backend(:monitor_pid); -- cancel monitor process + +SELECT pg_sleep(2); -- wait enough to show that tasks are cancelled + +SELECT citus_job_wait(:job_id1); +SELECT citus_job_wait(:job_id2); + +SELECT task_id, status FROM pg_dist_background_task + WHERE task_id IN (:task_id1, :task_id2) + ORDER BY task_id; -- show that all tasks are cancelled + +-- verify that task is not starved by currently long running task +BEGIN; +INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify task execution starvation') RETURNING job_id AS job_id1 \gset +INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id1, $job$ SELECT pg_sleep(5000); $job$) RETURNING task_id AS task_id1 \gset +INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify task execution starvation') RETURNING job_id AS job_id2 \gset INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id2, $job$ SELECT 1; $job$) RETURNING task_id AS task_id2 \gset COMMIT; @@ -139,6 +238,11 @@ SELECT job_id, task_id, status FROM pg_dist_background_task WHERE task_id IN (:task_id1, :task_id2) ORDER BY job_id, task_id; -- show that last task is finished without starvation SELECT citus_job_cancel(:job_id1); +SELECT citus_job_wait(:job_id1); + +SELECT job_id, task_id, status FROM pg_dist_background_task + WHERE task_id IN (:task_id1, :task_id2) + ORDER BY job_id, task_id; -- show that task is cancelled SET client_min_messages TO WARNING; DROP SCHEMA background_task_queue_monitor CASCADE;