diff --git a/src/backend/distributed/sql/citus--11.1-1--11.2-1.sql b/src/backend/distributed/sql/citus--11.1-1--11.2-1.sql index 1c059dac5..1d0a01f81 100644 --- a/src/backend/distributed/sql/citus--11.1-1--11.2-1.sql +++ b/src/backend/distributed/sql/citus--11.1-1--11.2-1.sql @@ -10,3 +10,4 @@ DROP FUNCTION pg_catalog.worker_append_table_to_shard(text, text, text, integer) #include "udfs/citus_is_clock_after/11.2-1.sql" #include "udfs/citus_internal_adjust_local_clock_to_remote/11.2-1.sql" #include "udfs/worker_split_shard_replication_setup/11.2-1.sql" +#include "udfs/citus_task_wait/11.2-1.sql" diff --git a/src/backend/distributed/sql/downgrades/citus--11.2-1--11.1-1.sql b/src/backend/distributed/sql/downgrades/citus--11.2-1--11.1-1.sql index 6031c2fd0..a7eac668e 100644 --- a/src/backend/distributed/sql/downgrades/citus--11.2-1--11.1-1.sql +++ b/src/backend/distributed/sql/downgrades/citus--11.2-1--11.1-1.sql @@ -22,3 +22,4 @@ COMMENT ON FUNCTION pg_catalog.worker_append_table_to_shard(text, text, text, in IS 'append a regular table''s contents to the shard'; #include "../udfs/worker_split_shard_replication_setup/11.1-1.sql" +DROP FUNCTION pg_catalog.citus_task_wait(bigint, pg_catalog.citus_task_status); diff --git a/src/backend/distributed/sql/udfs/citus_task_wait/11.2-1.sql b/src/backend/distributed/sql/udfs/citus_task_wait/11.2-1.sql new file mode 100644 index 000000000..68b685ac0 --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_task_wait/11.2-1.sql @@ -0,0 +1,8 @@ +CREATE FUNCTION pg_catalog.citus_task_wait(taskid bigint, desired_status pg_catalog.citus_task_status DEFAULT NULL) + RETURNS VOID + LANGUAGE C + AS 'MODULE_PATHNAME',$$citus_task_wait$$; +COMMENT ON FUNCTION pg_catalog.citus_task_wait(taskid bigint, desired_status pg_catalog.citus_task_status) + IS 'blocks till the task identified by taskid is at the specified status, or reached a terminal status. Only waits for terminal status when no desired_status was specified. The return value indicates if the desired status was reached or not. When no desired status was specified it will assume any terminal status was desired'; + +GRANT EXECUTE ON FUNCTION pg_catalog.citus_task_wait(taskid bigint, desired_status pg_catalog.citus_task_status) TO PUBLIC; diff --git a/src/backend/distributed/sql/udfs/citus_task_wait/latest.sql b/src/backend/distributed/sql/udfs/citus_task_wait/latest.sql new file mode 100644 index 000000000..68b685ac0 --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_task_wait/latest.sql @@ -0,0 +1,8 @@ +CREATE FUNCTION pg_catalog.citus_task_wait(taskid bigint, desired_status pg_catalog.citus_task_status DEFAULT NULL) + RETURNS VOID + LANGUAGE C + AS 'MODULE_PATHNAME',$$citus_task_wait$$; +COMMENT ON FUNCTION pg_catalog.citus_task_wait(taskid bigint, desired_status pg_catalog.citus_task_status) + IS 'blocks till the task identified by taskid is at the specified status, or reached a terminal status. Only waits for terminal status when no desired_status was specified. The return value indicates if the desired status was reached or not. When no desired status was specified it will assume any terminal status was desired'; + +GRANT EXECUTE ON FUNCTION pg_catalog.citus_task_wait(taskid bigint, desired_status pg_catalog.citus_task_status) TO PUBLIC; diff --git a/src/backend/distributed/utils/background_jobs.c b/src/backend/distributed/utils/background_jobs.c index 961acc356..d7a5a31bd 100644 --- a/src/backend/distributed/utils/background_jobs.c +++ b/src/backend/distributed/utils/background_jobs.c @@ -123,6 +123,7 @@ static volatile sig_atomic_t GotSighup = false; PG_FUNCTION_INFO_V1(citus_job_cancel); PG_FUNCTION_INFO_V1(citus_job_wait); +PG_FUNCTION_INFO_V1(citus_task_wait); /* @@ -203,7 +204,41 @@ citus_job_wait(PG_FUNCTION_ARGS) /* - * citus_job_wait_internal imaplements the waiting on a job for reuse in other areas where + * pg_catalog.citus_task_wait(taskid bigint, + * desired_status citus_task_status DEFAULT NULL) boolean + * waits till a task reaches a desired status, or can't reach the status anymore because + * it reached a (different) terminal state. When no desired_status is given it will + * assume any terminal state as its desired status. The function returns if the + * desired_state was reached. + * + * The current implementation is a polling implementation with an interval of 1 second. + * Ideally we would have some synchronization between the background tasks queue monitor + * and any backend calling this function to receive a signal when the task changes state. + */ +Datum +citus_task_wait(PG_FUNCTION_ARGS) +{ + CheckCitusVersion(ERROR); + EnsureCoordinator(); + + int64 taskid = PG_GETARG_INT64(0); + + /* parse the optional desired_status argument */ + bool hasDesiredStatus = !PG_ARGISNULL(1); + BackgroundTaskStatus desiredStatus = { 0 }; + if (hasDesiredStatus) + { + desiredStatus = BackgroundTaskStatusByOid(PG_GETARG_OID(1)); + } + + citus_task_wait_internal(taskid, hasDesiredStatus ? &desiredStatus : NULL); + + PG_RETURN_VOID(); +} + + +/* + * citus_job_wait_internal implements the waiting on a job for reuse in other areas where * we want to wait on jobs. eg the background rebalancer. * * When a desiredStatus is provided it will provide an error when a different state is @@ -285,6 +320,89 @@ citus_job_wait_internal(int64 jobid, BackgroundJobStatus *desiredStatus) } +/* + * citus_task_wait_internal implements the waiting on a task for reuse in other areas where + * we want to wait on tasks. + * + * When a desiredStatus is provided it will provide an error when a different state is + * reached and the state cannot ever reach the desired state anymore. + */ +void +citus_task_wait_internal(int64 taskid, BackgroundTaskStatus *desiredStatus) +{ + /* + * Since we are wait polling we will actually allocate memory on every poll. To make + * sure we don't put unneeded pressure on the memory we create a context that we clear + * every iteration. + */ + MemoryContext waitContext = AllocSetContextCreate(CurrentMemoryContext, + "TasksWaitContext", + ALLOCSET_DEFAULT_MINSIZE, + ALLOCSET_DEFAULT_INITSIZE, + ALLOCSET_DEFAULT_MAXSIZE); + MemoryContext oldContext = MemoryContextSwitchTo(waitContext); + + while (true) + { + MemoryContextReset(waitContext); + + BackgroundTask *task = GetBackgroundTaskByTaskId(taskid); + if (!task) + { + ereport(ERROR, (errmsg("no task found with taskid: %ld", taskid))); + } + + if (desiredStatus && task->status == *desiredStatus) + { + /* task has reached its desired status, done waiting */ + break; + } + + if (IsBackgroundTaskStatusTerminal(task->status)) + { + if (desiredStatus) + { + /* + * We have reached a terminal state, which is not the desired state we + * were waiting for, otherwise we would have escaped earlier. Since it is + * a terminal state we know that we can never reach the desired state. + */ + + Oid reachedStatusOid = BackgroundTaskStatusOid(task->status); + Datum reachedStatusNameDatum = DirectFunctionCall1(enum_out, + reachedStatusOid); + char *reachedStatusName = DatumGetCString(reachedStatusNameDatum); + + Oid desiredStatusOid = BackgroundTaskStatusOid(*desiredStatus); + Datum desiredStatusNameDatum = DirectFunctionCall1(enum_out, + desiredStatusOid); + char *desiredStatusName = DatumGetCString(desiredStatusNameDatum); + + ereport(ERROR, + (errmsg("Task reached terminal state \"%s\" instead of desired " + "state \"%s\"", reachedStatusName, desiredStatusName))); + } + + /* task has reached its terminal state, done waiting */ + break; + } + + /* sleep for a while, before rechecking the task status */ + CHECK_FOR_INTERRUPTS(); + const long delay_ms = 1000; + (void) WaitLatch(MyLatch, + WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, + delay_ms, + WAIT_EVENT_PG_SLEEP); + + ResetLatch(MyLatch); + } + + MemoryContextSwitchTo(oldContext); + MemoryContextDelete(waitContext); +} + + /* * StartCitusBackgroundTaskQueueMonitor spawns a new background worker connected to the * current database and owner. This background worker consumes the tasks that are ready @@ -745,8 +863,7 @@ TaskEnded(TaskExecutionContext *taskExecutionContext) hash_search(currentExecutors, &task->taskid, HASH_REMOVE, NULL); - TerminateBackgroundWorker(handleEntry->handle); - dsm_detach(handleEntry->seg); + WaitForBackgroundWorkerShutdown(handleEntry->handle); queueMonitorExecutionContext->currentExecutorCount--; } @@ -1022,6 +1139,9 @@ CitusBackgroundTaskQueueMonitorMain(Datum arg) /* handle signals */ CHECK_FOR_INTERRUPTS(); + /* invalidate cache for new data in catalog */ + InvalidateMetadataSystemCache(); + /* * if the flag is set, we should terminate all task executor workers to prevent duplicate * runs of the same task on the next start of the monitor, which is dangerous for non-idempotent @@ -1051,9 +1171,6 @@ CitusBackgroundTaskQueueMonitorMain(Datum arg) ProcessConfigFile(PGC_SIGHUP); } - /* invalidate cache for new data in catalog */ - InvalidateMetadataSystemCache(); - /* assign runnable tasks, if any, to new task executors in a transaction if we do not have SIGTERM or SIGINT */ if (!MonitorGotTerminationOrCancellationRequest()) { @@ -1485,13 +1602,6 @@ StoreArgumentsInDSM(char *database, char *username, char *command, return NULL; } - /* - * when we have CurrentResourceOwner != NULL, segment will be released upon CurrentResourceOwner release, - * but we may consume the queue in segment even after CurrentResourceOwner released. 'dsm_pin_mapping' helps - * persisting the segment until the session ends or the segment is detached explicitly by 'dsm_detach'. - */ - dsm_pin_mapping(seg); - shm_toc *toc = shm_toc_create(CITUS_BACKGROUND_TASK_MAGIC, dsm_segment_address(seg), segsize); @@ -1524,6 +1634,13 @@ StoreArgumentsInDSM(char *database, char *username, char *command, shm_mq_attach(mq, seg, NULL); + /* + * when we have CurrentResourceOwner != NULL, segment will be released upon CurrentResourceOwner release, + * but we may consume the queue in segment even after CurrentResourceOwner released. 'dsm_pin_mapping' helps + * persisting the segment until the session ends or the segment is detached explicitly by 'dsm_detach'. + */ + dsm_pin_mapping(seg); + return seg; } @@ -1675,31 +1792,10 @@ CitusBackgroundTaskExecutor(Datum main_arg) "executing this task"))); } - /* Prepare to execute the query. */ - SetCurrentStatementStartTimestamp(); - debug_query_string = command; - char *appname = psprintf("citus background task queue executor (%ld/%ld)", - *jobId, *taskId); - pgstat_report_appname(appname); - pgstat_report_activity(STATE_RUNNING, command); - StartTransactionCommand(); - if (StatementTimeout > 0) - { - enable_timeout_after(STATEMENT_TIMEOUT, StatementTimeout); - } - else - { - disable_timeout(STATEMENT_TIMEOUT, false); - } - /* Execute the query. */ + StartTransactionCommand(); ExecuteSqlString(command); - - /* Post-execution cleanup. */ - disable_timeout(STATEMENT_TIMEOUT, false); CommitTransactionCommand(); - pgstat_report_activity(STATE_IDLE, command); - pgstat_report_stat(true); /* Signal that we are done. */ ReadyForQuery(DestRemote); diff --git a/src/include/distributed/background_jobs.h b/src/include/distributed/background_jobs.h index 75b7b982b..3a14b6207 100644 --- a/src/include/distributed/background_jobs.h +++ b/src/include/distributed/background_jobs.h @@ -92,6 +92,8 @@ extern void CitusBackgroundTaskExecutor(Datum main_arg); extern Datum citus_job_cancel(PG_FUNCTION_ARGS); extern Datum citus_job_wait(PG_FUNCTION_ARGS); +extern Datum citus_task_wait(PG_FUNCTION_ARGS); extern void citus_job_wait_internal(int64 jobid, BackgroundJobStatus *desiredStatus); +extern void citus_task_wait_internal(int64 taskid, BackgroundTaskStatus *desiredStatus); #endif /*CITUS_BACKGROUND_JOBS_H */ diff --git a/src/test/regress/expected/background_task_queue_monitor.out b/src/test/regress/expected/background_task_queue_monitor.out index ebaa2148c..9435af3d3 100644 --- a/src/test/regress/expected/background_task_queue_monitor.out +++ b/src/test/regress/expected/background_task_queue_monitor.out @@ -3,6 +3,33 @@ SET search_path TO background_task_queue_monitor; SET citus.shard_count TO 4; SET citus.shard_replication_factor TO 1; SET citus.next_shard_id TO 3536400; +-- reset sequence values +ALTER SEQUENCE pg_dist_background_job_job_id_seq RESTART 1450000; +ALTER SEQUENCE pg_dist_background_task_task_id_seq RESTART 1450000; +-- create helper procedure to wait until given task is retried or timeout occurs +CREATE OR REPLACE PROCEDURE citus_wait_task_until_retried(taskid bigint) +LANGUAGE plpgsql +AS $$ +DECLARE + retried boolean := false; + loop_count int := 0; +BEGIN + WHILE retried = false and loop_count < 20 + LOOP + SELECT (retry_count IS NOT NULL) INTO retried FROM pg_dist_background_task WHERE task_id = taskid; + PERFORM pg_sleep(1); + loop_count := loop_count + 1; + COMMIT; + END LOOP; + + -- timeout if the task is not retried in 20 sec + IF retried = false + THEN + RAISE WARNING 'Timeout while waiting for retrying task:%', taskid; + END IF; +END; +$$; +-- ALTER SYSTEM SET citus.background_task_queue_interval TO '1s'; SELECT pg_reload_conf(); pg_reload_conf @@ -11,6 +38,7 @@ SELECT pg_reload_conf(); (1 row) CREATE TABLE results (a int); +-- TEST1 -- simple job that inserts 1 into results to show that query runs SELECT a FROM results WHERE a = 1; -- verify result is not in there a @@ -31,37 +59,7 @@ SELECT a FROM results WHERE a = 1; -- verify result is there 1 (1 row) --- cancel a scheduled job -INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job','cancel a scheduled job') RETURNING job_id \gset -INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id, $job$ SELECT pg_sleep(5); $job$) RETURNING task_id \gset -SELECT citus_job_cancel(:job_id); - citus_job_cancel ---------------------------------------------------------------------- - -(1 row) - -SELECT citus_job_wait(:job_id); - citus_job_wait ---------------------------------------------------------------------- - -(1 row) - --- verify we get an error when waiting for a job to reach a specific status while it is already in a different terminal status -SELECT citus_job_wait(:job_id, desired_status => 'finished'); -ERROR: Job reached terminal state "cancelled" instead of desired state "finished" --- show that the status has been cancelled -SELECT state, NOT(started_at IS NULL) AS did_start FROM pg_dist_background_job WHERE job_id = :job_id; - state | did_start ---------------------------------------------------------------------- - cancelled | f -(1 row) - -SELECT status, NOT(message = '') AS did_start FROM pg_dist_background_task WHERE job_id = :job_id ORDER BY task_id ASC; - status | did_start ---------------------------------------------------------------------- - cancelled | f -(1 row) - +-- TEST2 -- cancel a running job INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job','cancelling a task after it started') RETURNING job_id \gset INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id, $job$ SELECT pg_sleep(5); $job$) RETURNING task_id \gset @@ -96,6 +94,7 @@ SELECT status, NOT(message = '') AS did_start FROM pg_dist_background_task WHERE cancelled | t (1 row) +-- TEST3 -- test a failing task becomes runnable in the future again -- we cannot fully test the backoff strategy currently as it is hard coded to take about 50 minutes. INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'failure test due to division by zero') RETURNING job_id \gset @@ -106,12 +105,7 @@ SELECT citus_job_wait(:job_id, desired_status => 'running'); (1 row) -SELECT pg_sleep(.1); -- make sure it has time to error after it started running - pg_sleep ---------------------------------------------------------------------- - -(1 row) - +CALL citus_wait_task_until_retried(:task_id); -- shows that we increased retry count for task after failure SELECT status, pid, retry_count, NOT(message = '') AS has_message, (not_before > now()) AS scheduled_into_the_future FROM pg_dist_background_task WHERE job_id = :job_id ORDER BY task_id ASC; status | pid | retry_count | has_message | scheduled_into_the_future --------------------------------------------------------------------- @@ -143,7 +137,8 @@ SELECT status, NOT(message = '') AS did_start FROM pg_dist_background_task WHERE cancelled | t (1 row) --- test running two dependant tasks +-- TEST4 +-- test running two dependent tasks TRUNCATE TABLE results; BEGIN; INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify background execution') RETURNING job_id \gset @@ -165,7 +160,8 @@ SELECT a FROM results; 48 (1 row) --- test running two dependant tasks, with a failing task that we cancel +-- TEST5 +-- test running two dependent tasks, with a failing task that we cancel TRUNCATE TABLE results; BEGIN; INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify background execution') RETURNING job_id \gset @@ -181,12 +177,7 @@ SELECT citus_job_wait(:job_id, desired_status => 'running'); -- wait for the job (1 row) -SELECT pg_sleep(.1); -- improve chances of hitting the failure - pg_sleep ---------------------------------------------------------------------- - -(1 row) - +CALL citus_wait_task_until_retried(:task_id2); -- shows that we increased retry count for task after failure SELECT citus_job_cancel(:job_id); citus_job_cancel --------------------------------------------------------------------- @@ -213,6 +204,7 @@ SELECT status, NOT(message = '') AS did_start FROM pg_dist_background_task WHERE cancelled | f (3 rows) +-- TEST6 -- verify that we do not allow parallel task executors more than citus.max_background_task_executors(4 by default) BEGIN; INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify max parallel background execution') RETURNING job_id AS job_id1 \gset @@ -224,8 +216,32 @@ INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id2, $job$ SE INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify max parallel background execution') RETURNING job_id AS job_id3 \gset INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id3, $job$ SELECT pg_sleep(5); $job$) RETURNING task_id AS task_id5 \gset COMMIT; -SELECT pg_sleep(2); -- we assume this is enough time for all tasks to be in running status except the last one due to parallel worker limit - pg_sleep +SELECT citus_task_wait(:task_id1, desired_status => 'running'); + citus_task_wait +--------------------------------------------------------------------- + +(1 row) + +SELECT citus_task_wait(:task_id2, desired_status => 'running'); + citus_task_wait +--------------------------------------------------------------------- + +(1 row) + +SELECT citus_task_wait(:task_id3, desired_status => 'running'); + citus_task_wait +--------------------------------------------------------------------- + +(1 row) + +SELECT citus_task_wait(:task_id4, desired_status => 'running'); + citus_task_wait +--------------------------------------------------------------------- + +(1 row) + +SELECT citus_task_wait(:task_id5, desired_status => 'runnable'); + citus_task_wait --------------------------------------------------------------------- (1 row) @@ -233,13 +249,13 @@ SELECT pg_sleep(2); -- we assume this is enough time for all tasks to be in runn SELECT job_id, task_id, status FROM pg_dist_background_task WHERE task_id IN (:task_id1, :task_id2, :task_id3, :task_id4, :task_id5) ORDER BY job_id, task_id; -- show that last task is not running but ready to run(runnable) - job_id | task_id | status + job_id | task_id | status --------------------------------------------------------------------- - 7 | 11 | running - 7 | 12 | running - 7 | 13 | running - 8 | 14 | running - 9 | 15 | runnable + 1450005 | 1450009 | running + 1450005 | 1450010 | running + 1450005 | 1450011 | running + 1450006 | 1450012 | running + 1450007 | 1450013 | runnable (5 rows) SELECT citus_job_cancel(:job_id2); -- when a job with 1 task is cancelled, the last runnable task will be running @@ -263,13 +279,13 @@ SELECT citus_job_wait(:job_id3, desired_status => 'running'); SELECT job_id, task_id, status FROM pg_dist_background_task WHERE task_id IN (:task_id1, :task_id2, :task_id3, :task_id4, :task_id5) ORDER BY job_id, task_id; -- show that last task is running - job_id | task_id | status + job_id | task_id | status --------------------------------------------------------------------- - 7 | 11 | running - 7 | 12 | running - 7 | 13 | running - 8 | 14 | cancelled - 9 | 15 | running + 1450005 | 1450009 | running + 1450005 | 1450010 | running + 1450005 | 1450011 | running + 1450006 | 1450012 | cancelled + 1450007 | 1450013 | running (5 rows) SELECT citus_job_cancel(:job_id1); @@ -299,15 +315,16 @@ SELECT citus_job_wait(:job_id3); SELECT job_id, task_id, status FROM pg_dist_background_task WHERE task_id IN (:task_id1, :task_id2, :task_id3, :task_id4, :task_id5) ORDER BY job_id, task_id; -- show that multiple cancels worked - job_id | task_id | status + job_id | task_id | status --------------------------------------------------------------------- - 7 | 11 | cancelled - 7 | 12 | cancelled - 7 | 13 | cancelled - 8 | 14 | cancelled - 9 | 15 | cancelled + 1450005 | 1450009 | cancelled + 1450005 | 1450010 | cancelled + 1450005 | 1450011 | cancelled + 1450006 | 1450012 | cancelled + 1450007 | 1450013 | cancelled (5 rows) +-- TEST7 -- verify that a task, previously not started due to lack of workers, is executed after we increase max worker count BEGIN; INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify max parallel background execution') RETURNING job_id AS job_id1 \gset @@ -319,8 +336,32 @@ INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id2, $job$ SE INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify max parallel background execution') RETURNING job_id AS job_id3 \gset INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id3, $job$ SELECT pg_sleep(5); $job$) RETURNING task_id AS task_id5 \gset COMMIT; -SELECT pg_sleep(2); -- we assume this is enough time for all tasks to be in running status except the last one due to parallel worker limit - pg_sleep +SELECT citus_task_wait(:task_id1, desired_status => 'running'); + citus_task_wait +--------------------------------------------------------------------- + +(1 row) + +SELECT citus_task_wait(:task_id2, desired_status => 'running'); + citus_task_wait +--------------------------------------------------------------------- + +(1 row) + +SELECT citus_task_wait(:task_id3, desired_status => 'running'); + citus_task_wait +--------------------------------------------------------------------- + +(1 row) + +SELECT citus_task_wait(:task_id4, desired_status => 'running'); + citus_task_wait +--------------------------------------------------------------------- + +(1 row) + +SELECT citus_task_wait(:task_id5, desired_status => 'runnable'); + citus_task_wait --------------------------------------------------------------------- (1 row) @@ -330,11 +371,11 @@ SELECT task_id, status FROM pg_dist_background_task ORDER BY task_id; -- show that last task is not running but ready to run(runnable) task_id | status --------------------------------------------------------------------- - 16 | running - 17 | running - 18 | running - 19 | running - 20 | runnable + 1450014 | running + 1450015 | running + 1450016 | running + 1450017 | running + 1450018 | runnable (5 rows) ALTER SYSTEM SET citus.max_background_task_executors TO 5; @@ -355,11 +396,11 @@ SELECT task_id, status FROM pg_dist_background_task ORDER BY task_id; -- show that last task is running task_id | status --------------------------------------------------------------------- - 16 | running - 17 | running - 18 | running - 19 | running - 20 | running + 1450014 | running + 1450015 | running + 1450016 | running + 1450017 | running + 1450018 | running (5 rows) SELECT citus_job_cancel(:job_id1); @@ -403,19 +444,20 @@ SELECT task_id, status FROM pg_dist_background_task ORDER BY task_id; -- show that all tasks are cancelled task_id | status --------------------------------------------------------------------- - 16 | cancelled - 17 | cancelled - 18 | cancelled - 19 | cancelled - 20 | cancelled + 1450014 | cancelled + 1450015 | cancelled + 1450016 | cancelled + 1450017 | cancelled + 1450018 | cancelled (5 rows) +-- TEST8 -- verify that upon termination signal, all tasks fail and retry policy sets their status back to runnable BEGIN; INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify termination on monitor') RETURNING job_id AS job_id1 \gset -INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id1, $job$ SELECT pg_sleep(500); $job$) RETURNING task_id AS task_id1 \gset +INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id1, $job$ SELECT pg_sleep(5); $job$) RETURNING task_id AS task_id1 \gset INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify termination on monitor') RETURNING job_id AS job_id2 \gset -INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id2, $job$ SELECT pg_sleep(500); $job$) RETURNING task_id AS task_id2 \gset +INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id2, $job$ SELECT pg_sleep(5); $job$) RETURNING task_id AS task_id2 \gset COMMIT; SELECT citus_job_wait(:job_id1, desired_status => 'running'); citus_job_wait @@ -434,8 +476,8 @@ SELECT task_id, status FROM pg_dist_background_task ORDER BY task_id; task_id | status --------------------------------------------------------------------- - 21 | running - 22 | running + 1450019 | running + 1450020 | running (2 rows) SELECT pid AS monitor_pid FROM pg_stat_activity WHERE application_name ~ 'task queue monitor' \gset @@ -445,22 +487,18 @@ SELECT pg_terminate_backend(:monitor_pid); -- terminate monitor process t (1 row) -SELECT pg_sleep(2); -- wait enough to show that tasks are terminated - pg_sleep ---------------------------------------------------------------------- - -(1 row) - +CALL citus_wait_task_until_retried(:task_id1); -- shows that we increased retry count for task after failure +CALL citus_wait_task_until_retried(:task_id2); -- shows that we increased retry count for task after failure SELECT task_id, status, retry_count, message FROM pg_dist_background_task WHERE task_id IN (:task_id1, :task_id2) ORDER BY task_id; -- show that all tasks are runnable by retry policy after termination signal - task_id | status | retry_count | message + task_id | status | retry_count | message --------------------------------------------------------------------- - 21 | runnable | 1 | FATAL: terminating connection due to administrator command + - | | | CONTEXT: Citus Background Task Queue Executor: regression/postgres for (13/21)+ + 1450019 | runnable | 1 | FATAL: terminating connection due to administrator command + + | | | CONTEXT: Citus Background Task Queue Executor: regression/postgres for (1450011/1450019) + | | | - 22 | runnable | 1 | FATAL: terminating connection due to administrator command + - | | | CONTEXT: Citus Background Task Queue Executor: regression/postgres for (14/22)+ + 1450020 | runnable | 1 | FATAL: terminating connection due to administrator command + + | | | CONTEXT: Citus Background Task Queue Executor: regression/postgres for (1450012/1450020) + | | | (2 rows) @@ -493,16 +531,17 @@ SELECT task_id, status FROM pg_dist_background_task ORDER BY task_id; -- show that all tasks are cancelled task_id | status --------------------------------------------------------------------- - 21 | cancelled - 22 | cancelled + 1450019 | cancelled + 1450020 | cancelled (2 rows) +-- TEST9 -- verify that upon cancellation signal, all tasks are cancelled BEGIN; INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify cancellation on monitor') RETURNING job_id AS job_id1 \gset -INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id1, $job$ SELECT pg_sleep(500); $job$) RETURNING task_id AS task_id1 \gset +INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id1, $job$ SELECT pg_sleep(5); $job$) RETURNING task_id AS task_id1 \gset INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify cancellation on monitor') RETURNING job_id AS job_id2 \gset -INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id2, $job$ SELECT pg_sleep(500); $job$) RETURNING task_id AS task_id2 \gset +INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id2, $job$ SELECT pg_sleep(5); $job$) RETURNING task_id AS task_id2 \gset COMMIT; SELECT citus_job_wait(:job_id1, desired_status => 'running'); citus_job_wait @@ -521,8 +560,8 @@ SELECT task_id, status FROM pg_dist_background_task ORDER BY task_id; task_id | status --------------------------------------------------------------------- - 23 | running - 24 | running + 1450021 | running + 1450022 | running (2 rows) SELECT pid AS monitor_pid FROM pg_stat_activity WHERE application_name ~ 'task queue monitor' \gset @@ -532,8 +571,14 @@ SELECT pg_cancel_backend(:monitor_pid); -- cancel monitor process t (1 row) -SELECT pg_sleep(2); -- wait enough to show that tasks are cancelled - pg_sleep +SELECT citus_task_wait(:task_id1, desired_status => 'cancelled'); -- shows that we cancelled the task + citus_task_wait +--------------------------------------------------------------------- + +(1 row) + +SELECT citus_task_wait(:task_id2, desired_status => 'cancelled'); -- shows that we cancelled the task + citus_task_wait --------------------------------------------------------------------- (1 row) @@ -555,14 +600,15 @@ SELECT task_id, status FROM pg_dist_background_task ORDER BY task_id; -- show that all tasks are cancelled task_id | status --------------------------------------------------------------------- - 23 | cancelled - 24 | cancelled + 1450021 | cancelled + 1450022 | cancelled (2 rows) +-- TEST10 -- verify that task is not starved by currently long running task BEGIN; INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify task execution starvation') RETURNING job_id AS job_id1 \gset -INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id1, $job$ SELECT pg_sleep(5000); $job$) RETURNING task_id AS task_id1 \gset +INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id1, $job$ SELECT pg_sleep(5); $job$) RETURNING task_id AS task_id1 \gset INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify task execution starvation') RETURNING job_id AS job_id2 \gset INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id2, $job$ SELECT 1; $job$) RETURNING task_id AS task_id2 \gset COMMIT; @@ -581,10 +627,10 @@ SELECT citus_job_wait(:job_id2, desired_status => 'finished'); SELECT job_id, task_id, status FROM pg_dist_background_task WHERE task_id IN (:task_id1, :task_id2) ORDER BY job_id, task_id; -- show that last task is finished without starvation - job_id | task_id | status + job_id | task_id | status --------------------------------------------------------------------- - 17 | 25 | running - 18 | 26 | done + 1450015 | 1450023 | running + 1450016 | 1450024 | done (2 rows) SELECT citus_job_cancel(:job_id1); @@ -602,15 +648,19 @@ SELECT citus_job_wait(:job_id1); SELECT job_id, task_id, status FROM pg_dist_background_task WHERE task_id IN (:task_id1, :task_id2) ORDER BY job_id, task_id; -- show that task is cancelled - job_id | task_id | status + job_id | task_id | status --------------------------------------------------------------------- - 17 | 25 | cancelled - 18 | 26 | done + 1450015 | 1450023 | cancelled + 1450016 | 1450024 | done (2 rows) SET client_min_messages TO WARNING; +TRUNCATE pg_dist_background_job CASCADE; +TRUNCATE pg_dist_background_task CASCADE; +TRUNCATE pg_dist_background_task_depend; DROP SCHEMA background_task_queue_monitor CASCADE; ALTER SYSTEM RESET citus.background_task_queue_interval; +ALTER SYSTEM RESET citus.max_background_task_executors; SELECT pg_reload_conf(); pg_reload_conf --------------------------------------------------------------------- diff --git a/src/test/regress/expected/multi_extension.out b/src/test/regress/expected/multi_extension.out index a03422e40..a171aed69 100644 --- a/src/test/regress/expected/multi_extension.out +++ b/src/test/regress/expected/multi_extension.out @@ -1208,6 +1208,7 @@ SELECT * FROM multi_extension.print_extension_changes(); | function citus_get_transaction_clock() cluster_clock | function citus_internal_adjust_local_clock_to_remote(cluster_clock) void | function citus_is_clock_after(cluster_clock,cluster_clock) boolean + | function citus_task_wait(bigint,citus_task_status) void | function cluster_clock_cmp(cluster_clock,cluster_clock) integer | function cluster_clock_eq(cluster_clock,cluster_clock) boolean | function cluster_clock_ge(cluster_clock,cluster_clock) boolean @@ -1232,7 +1233,7 @@ SELECT * FROM multi_extension.print_extension_changes(); | operator family cluster_clock_ops for access method btree | sequence pg_dist_clock_logical_seq | type cluster_clock -(31 rows) +(32 rows) DROP TABLE multi_extension.prev_objects, multi_extension.extension_diff; -- show running version diff --git a/src/test/regress/expected/upgrade_list_citus_objects.out b/src/test/regress/expected/upgrade_list_citus_objects.out index 88d07b38d..930da193b 100644 --- a/src/test/regress/expected/upgrade_list_citus_objects.out +++ b/src/test/regress/expected/upgrade_list_citus_objects.out @@ -117,6 +117,7 @@ ORDER BY 1; function citus_stat_statements_reset() function citus_table_is_visible(oid) function citus_table_size(regclass) + function citus_task_wait(bigint,citus_task_status) function citus_text_send_as_jsonb(text) function citus_total_relation_size(regclass,boolean) function citus_truncate_trigger() @@ -311,5 +312,5 @@ ORDER BY 1; view citus_stat_statements view pg_dist_shard_placement view time_partitions -(303 rows) +(304 rows) diff --git a/src/test/regress/sql/background_task_queue_monitor.sql b/src/test/regress/sql/background_task_queue_monitor.sql index c04fe90d6..04bb898db 100644 --- a/src/test/regress/sql/background_task_queue_monitor.sql +++ b/src/test/regress/sql/background_task_queue_monitor.sql @@ -4,11 +4,41 @@ SET citus.shard_count TO 4; SET citus.shard_replication_factor TO 1; SET citus.next_shard_id TO 3536400; +-- reset sequence values +ALTER SEQUENCE pg_dist_background_job_job_id_seq RESTART 1450000; +ALTER SEQUENCE pg_dist_background_task_task_id_seq RESTART 1450000; + +-- create helper procedure to wait until given task is retried or timeout occurs +CREATE OR REPLACE PROCEDURE citus_wait_task_until_retried(taskid bigint) +LANGUAGE plpgsql +AS $$ +DECLARE + retried boolean := false; + loop_count int := 0; +BEGIN + WHILE retried = false and loop_count < 20 + LOOP + SELECT (retry_count IS NOT NULL) INTO retried FROM pg_dist_background_task WHERE task_id = taskid; + PERFORM pg_sleep(1); + loop_count := loop_count + 1; + COMMIT; + END LOOP; + + -- timeout if the task is not retried in 20 sec + IF retried = false + THEN + RAISE WARNING 'Timeout while waiting for retrying task:%', taskid; + END IF; +END; +$$; +-- + ALTER SYSTEM SET citus.background_task_queue_interval TO '1s'; SELECT pg_reload_conf(); CREATE TABLE results (a int); +-- TEST1 -- simple job that inserts 1 into results to show that query runs SELECT a FROM results WHERE a = 1; -- verify result is not in there INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify background execution') RETURNING job_id \gset @@ -16,20 +46,7 @@ INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id, $job$ INS SELECT citus_job_wait(:job_id); -- wait for the job to be finished SELECT a FROM results WHERE a = 1; -- verify result is there --- cancel a scheduled job -INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job','cancel a scheduled job') RETURNING job_id \gset -INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id, $job$ SELECT pg_sleep(5); $job$) RETURNING task_id \gset - -SELECT citus_job_cancel(:job_id); -SELECT citus_job_wait(:job_id); - --- verify we get an error when waiting for a job to reach a specific status while it is already in a different terminal status -SELECT citus_job_wait(:job_id, desired_status => 'finished'); - --- show that the status has been cancelled -SELECT state, NOT(started_at IS NULL) AS did_start FROM pg_dist_background_job WHERE job_id = :job_id; -SELECT status, NOT(message = '') AS did_start FROM pg_dist_background_task WHERE job_id = :job_id ORDER BY task_id ASC; - +-- TEST2 -- cancel a running job INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job','cancelling a task after it started') RETURNING job_id \gset INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id, $job$ SELECT pg_sleep(5); $job$) RETURNING task_id \gset @@ -42,14 +59,15 @@ SELECT citus_job_wait(:job_id); SELECT state, NOT(started_at IS NULL) AS did_start FROM pg_dist_background_job WHERE job_id = :job_id; SELECT status, NOT(message = '') AS did_start FROM pg_dist_background_task WHERE job_id = :job_id ORDER BY task_id ASC; +-- TEST3 -- test a failing task becomes runnable in the future again -- we cannot fully test the backoff strategy currently as it is hard coded to take about 50 minutes. INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'failure test due to division by zero') RETURNING job_id \gset INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id, $job$ SELECT 1/0; $job$) RETURNING task_id \gset SELECT citus_job_wait(:job_id, desired_status => 'running'); -SELECT pg_sleep(.1); -- make sure it has time to error after it started running +CALL citus_wait_task_until_retried(:task_id); -- shows that we increased retry count for task after failure SELECT status, pid, retry_count, NOT(message = '') AS has_message, (not_before > now()) AS scheduled_into_the_future FROM pg_dist_background_task WHERE job_id = :job_id ORDER BY task_id ASC; -- test cancelling a failed/retrying job @@ -59,7 +77,9 @@ SELECT citus_job_wait(:job_id); SELECT state, NOT(started_at IS NULL) AS did_start FROM pg_dist_background_job WHERE job_id = :job_id; SELECT status, NOT(message = '') AS did_start FROM pg_dist_background_task WHERE job_id = :job_id ORDER BY task_id ASC; --- test running two dependant tasks + +-- TEST4 +-- test running two dependent tasks TRUNCATE TABLE results; BEGIN; INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify background execution') RETURNING job_id \gset @@ -73,7 +93,8 @@ COMMIT; SELECT citus_job_wait(:job_id); -- wait for the job to be finished SELECT a FROM results; --- test running two dependant tasks, with a failing task that we cancel +-- TEST5 +-- test running two dependent tasks, with a failing task that we cancel TRUNCATE TABLE results; BEGIN; INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify background execution') RETURNING job_id \gset @@ -85,13 +106,14 @@ INSERT INTO pg_dist_background_task_depend (job_id, task_id, depends_on) VALUES COMMIT; SELECT citus_job_wait(:job_id, desired_status => 'running'); -- wait for the job to be running, and possibly hitting a failure -SELECT pg_sleep(.1); -- improve chances of hitting the failure +CALL citus_wait_task_until_retried(:task_id2); -- shows that we increased retry count for task after failure SELECT citus_job_cancel(:job_id); SELECT citus_job_wait(:job_id); -- wait for the job to be cancelled SELECT state, NOT(started_at IS NULL) AS did_start FROM pg_dist_background_job WHERE job_id = :job_id; SELECT status, NOT(message = '') AS did_start FROM pg_dist_background_task WHERE job_id = :job_id ORDER BY task_id ASC; +-- TEST6 -- verify that we do not allow parallel task executors more than citus.max_background_task_executors(4 by default) BEGIN; INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify max parallel background execution') RETURNING job_id AS job_id1 \gset @@ -104,7 +126,11 @@ INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', ' INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id3, $job$ SELECT pg_sleep(5); $job$) RETURNING task_id AS task_id5 \gset COMMIT; -SELECT pg_sleep(2); -- we assume this is enough time for all tasks to be in running status except the last one due to parallel worker limit +SELECT citus_task_wait(:task_id1, desired_status => 'running'); +SELECT citus_task_wait(:task_id2, desired_status => 'running'); +SELECT citus_task_wait(:task_id3, desired_status => 'running'); +SELECT citus_task_wait(:task_id4, desired_status => 'running'); +SELECT citus_task_wait(:task_id5, desired_status => 'runnable'); SELECT job_id, task_id, status FROM pg_dist_background_task WHERE task_id IN (:task_id1, :task_id2, :task_id3, :task_id4, :task_id5) @@ -125,7 +151,7 @@ SELECT job_id, task_id, status FROM pg_dist_background_task WHERE task_id IN (:task_id1, :task_id2, :task_id3, :task_id4, :task_id5) ORDER BY job_id, task_id; -- show that multiple cancels worked - +-- TEST7 -- verify that a task, previously not started due to lack of workers, is executed after we increase max worker count BEGIN; INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify max parallel background execution') RETURNING job_id AS job_id1 \gset @@ -138,7 +164,11 @@ INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', ' INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id3, $job$ SELECT pg_sleep(5); $job$) RETURNING task_id AS task_id5 \gset COMMIT; -SELECT pg_sleep(2); -- we assume this is enough time for all tasks to be in running status except the last one due to parallel worker limit +SELECT citus_task_wait(:task_id1, desired_status => 'running'); +SELECT citus_task_wait(:task_id2, desired_status => 'running'); +SELECT citus_task_wait(:task_id3, desired_status => 'running'); +SELECT citus_task_wait(:task_id4, desired_status => 'running'); +SELECT citus_task_wait(:task_id5, desired_status => 'runnable'); SELECT task_id, status FROM pg_dist_background_task WHERE task_id IN (:task_id1, :task_id2, :task_id3, :task_id4, :task_id5) @@ -162,12 +192,13 @@ SELECT task_id, status FROM pg_dist_background_task WHERE task_id IN (:task_id1, :task_id2, :task_id3, :task_id4, :task_id5) ORDER BY task_id; -- show that all tasks are cancelled +-- TEST8 -- verify that upon termination signal, all tasks fail and retry policy sets their status back to runnable BEGIN; INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify termination on monitor') RETURNING job_id AS job_id1 \gset -INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id1, $job$ SELECT pg_sleep(500); $job$) RETURNING task_id AS task_id1 \gset +INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id1, $job$ SELECT pg_sleep(5); $job$) RETURNING task_id AS task_id1 \gset INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify termination on monitor') RETURNING job_id AS job_id2 \gset -INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id2, $job$ SELECT pg_sleep(500); $job$) RETURNING task_id AS task_id2 \gset +INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id2, $job$ SELECT pg_sleep(5); $job$) RETURNING task_id AS task_id2 \gset COMMIT; SELECT citus_job_wait(:job_id1, desired_status => 'running'); @@ -181,7 +212,8 @@ SELECT task_id, status FROM pg_dist_background_task SELECT pid AS monitor_pid FROM pg_stat_activity WHERE application_name ~ 'task queue monitor' \gset SELECT pg_terminate_backend(:monitor_pid); -- terminate monitor process -SELECT pg_sleep(2); -- wait enough to show that tasks are terminated +CALL citus_wait_task_until_retried(:task_id1); -- shows that we increased retry count for task after failure +CALL citus_wait_task_until_retried(:task_id2); -- shows that we increased retry count for task after failure SELECT task_id, status, retry_count, message FROM pg_dist_background_task WHERE task_id IN (:task_id1, :task_id2) @@ -196,12 +228,13 @@ SELECT task_id, status FROM pg_dist_background_task WHERE task_id IN (:task_id1, :task_id2) ORDER BY task_id; -- show that all tasks are cancelled +-- TEST9 -- verify that upon cancellation signal, all tasks are cancelled BEGIN; INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify cancellation on monitor') RETURNING job_id AS job_id1 \gset -INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id1, $job$ SELECT pg_sleep(500); $job$) RETURNING task_id AS task_id1 \gset +INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id1, $job$ SELECT pg_sleep(5); $job$) RETURNING task_id AS task_id1 \gset INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify cancellation on monitor') RETURNING job_id AS job_id2 \gset -INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id2, $job$ SELECT pg_sleep(500); $job$) RETURNING task_id AS task_id2 \gset +INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id2, $job$ SELECT pg_sleep(5); $job$) RETURNING task_id AS task_id2 \gset COMMIT; SELECT citus_job_wait(:job_id1, desired_status => 'running'); @@ -215,7 +248,8 @@ SELECT task_id, status FROM pg_dist_background_task SELECT pid AS monitor_pid FROM pg_stat_activity WHERE application_name ~ 'task queue monitor' \gset SELECT pg_cancel_backend(:monitor_pid); -- cancel monitor process -SELECT pg_sleep(2); -- wait enough to show that tasks are cancelled +SELECT citus_task_wait(:task_id1, desired_status => 'cancelled'); -- shows that we cancelled the task +SELECT citus_task_wait(:task_id2, desired_status => 'cancelled'); -- shows that we cancelled the task SELECT citus_job_wait(:job_id1); SELECT citus_job_wait(:job_id2); @@ -224,10 +258,11 @@ SELECT task_id, status FROM pg_dist_background_task WHERE task_id IN (:task_id1, :task_id2) ORDER BY task_id; -- show that all tasks are cancelled +-- TEST10 -- verify that task is not starved by currently long running task BEGIN; INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify task execution starvation') RETURNING job_id AS job_id1 \gset -INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id1, $job$ SELECT pg_sleep(5000); $job$) RETURNING task_id AS task_id1 \gset +INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id1, $job$ SELECT pg_sleep(5); $job$) RETURNING task_id AS task_id1 \gset INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify task execution starvation') RETURNING job_id AS job_id2 \gset INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id2, $job$ SELECT 1; $job$) RETURNING task_id AS task_id2 \gset COMMIT; @@ -245,7 +280,11 @@ SELECT job_id, task_id, status FROM pg_dist_background_task ORDER BY job_id, task_id; -- show that task is cancelled SET client_min_messages TO WARNING; +TRUNCATE pg_dist_background_job CASCADE; +TRUNCATE pg_dist_background_task CASCADE; +TRUNCATE pg_dist_background_task_depend; DROP SCHEMA background_task_queue_monitor CASCADE; ALTER SYSTEM RESET citus.background_task_queue_interval; +ALTER SYSTEM RESET citus.max_background_task_executors; SELECT pg_reload_conf();