add citus_task_wait udf to wait on desired task status (#6475)

We already have citus_job_wait to wait until the job reaches the desired
state. That PR adds waiting on task state to allow more granular
waiting. It can be used for Citus operations. Moreover, it is also
useful for testing purposes. (wait until a task reaches specified state)

Related to #6459.
pull/6555/head
aykut-bozkurt 2022-12-12 22:41:03 +03:00 committed by GitHub
parent 80686907a3
commit 1ad1a0a336
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 386 additions and 179 deletions

View File

@ -10,3 +10,4 @@ DROP FUNCTION pg_catalog.worker_append_table_to_shard(text, text, text, integer)
#include "udfs/citus_is_clock_after/11.2-1.sql"
#include "udfs/citus_internal_adjust_local_clock_to_remote/11.2-1.sql"
#include "udfs/worker_split_shard_replication_setup/11.2-1.sql"
#include "udfs/citus_task_wait/11.2-1.sql"

View File

@ -22,3 +22,4 @@ COMMENT ON FUNCTION pg_catalog.worker_append_table_to_shard(text, text, text, in
IS 'append a regular table''s contents to the shard';
#include "../udfs/worker_split_shard_replication_setup/11.1-1.sql"
DROP FUNCTION pg_catalog.citus_task_wait(bigint, pg_catalog.citus_task_status);

View File

@ -0,0 +1,8 @@
CREATE FUNCTION pg_catalog.citus_task_wait(taskid bigint, desired_status pg_catalog.citus_task_status DEFAULT NULL)
RETURNS VOID
LANGUAGE C
AS 'MODULE_PATHNAME',$$citus_task_wait$$;
COMMENT ON FUNCTION pg_catalog.citus_task_wait(taskid bigint, desired_status pg_catalog.citus_task_status)
IS 'blocks till the task identified by taskid is at the specified status, or reached a terminal status. Only waits for terminal status when no desired_status was specified. The return value indicates if the desired status was reached or not. When no desired status was specified it will assume any terminal status was desired';
GRANT EXECUTE ON FUNCTION pg_catalog.citus_task_wait(taskid bigint, desired_status pg_catalog.citus_task_status) TO PUBLIC;

View File

@ -0,0 +1,8 @@
CREATE FUNCTION pg_catalog.citus_task_wait(taskid bigint, desired_status pg_catalog.citus_task_status DEFAULT NULL)
RETURNS VOID
LANGUAGE C
AS 'MODULE_PATHNAME',$$citus_task_wait$$;
COMMENT ON FUNCTION pg_catalog.citus_task_wait(taskid bigint, desired_status pg_catalog.citus_task_status)
IS 'blocks till the task identified by taskid is at the specified status, or reached a terminal status. Only waits for terminal status when no desired_status was specified. The return value indicates if the desired status was reached or not. When no desired status was specified it will assume any terminal status was desired';
GRANT EXECUTE ON FUNCTION pg_catalog.citus_task_wait(taskid bigint, desired_status pg_catalog.citus_task_status) TO PUBLIC;

View File

@ -123,6 +123,7 @@ static volatile sig_atomic_t GotSighup = false;
PG_FUNCTION_INFO_V1(citus_job_cancel);
PG_FUNCTION_INFO_V1(citus_job_wait);
PG_FUNCTION_INFO_V1(citus_task_wait);
/*
@ -203,7 +204,41 @@ citus_job_wait(PG_FUNCTION_ARGS)
/*
* citus_job_wait_internal imaplements the waiting on a job for reuse in other areas where
* pg_catalog.citus_task_wait(taskid bigint,
* desired_status citus_task_status DEFAULT NULL) boolean
* waits till a task reaches a desired status, or can't reach the status anymore because
* it reached a (different) terminal state. When no desired_status is given it will
* assume any terminal state as its desired status. The function returns if the
* desired_state was reached.
*
* The current implementation is a polling implementation with an interval of 1 second.
* Ideally we would have some synchronization between the background tasks queue monitor
* and any backend calling this function to receive a signal when the task changes state.
*/
Datum
citus_task_wait(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);
EnsureCoordinator();
int64 taskid = PG_GETARG_INT64(0);
/* parse the optional desired_status argument */
bool hasDesiredStatus = !PG_ARGISNULL(1);
BackgroundTaskStatus desiredStatus = { 0 };
if (hasDesiredStatus)
{
desiredStatus = BackgroundTaskStatusByOid(PG_GETARG_OID(1));
}
citus_task_wait_internal(taskid, hasDesiredStatus ? &desiredStatus : NULL);
PG_RETURN_VOID();
}
/*
* citus_job_wait_internal implements the waiting on a job for reuse in other areas where
* we want to wait on jobs. eg the background rebalancer.
*
* When a desiredStatus is provided it will provide an error when a different state is
@ -285,6 +320,89 @@ citus_job_wait_internal(int64 jobid, BackgroundJobStatus *desiredStatus)
}
/*
* citus_task_wait_internal implements the waiting on a task for reuse in other areas where
* we want to wait on tasks.
*
* When a desiredStatus is provided it will provide an error when a different state is
* reached and the state cannot ever reach the desired state anymore.
*/
void
citus_task_wait_internal(int64 taskid, BackgroundTaskStatus *desiredStatus)
{
/*
* Since we are wait polling we will actually allocate memory on every poll. To make
* sure we don't put unneeded pressure on the memory we create a context that we clear
* every iteration.
*/
MemoryContext waitContext = AllocSetContextCreate(CurrentMemoryContext,
"TasksWaitContext",
ALLOCSET_DEFAULT_MINSIZE,
ALLOCSET_DEFAULT_INITSIZE,
ALLOCSET_DEFAULT_MAXSIZE);
MemoryContext oldContext = MemoryContextSwitchTo(waitContext);
while (true)
{
MemoryContextReset(waitContext);
BackgroundTask *task = GetBackgroundTaskByTaskId(taskid);
if (!task)
{
ereport(ERROR, (errmsg("no task found with taskid: %ld", taskid)));
}
if (desiredStatus && task->status == *desiredStatus)
{
/* task has reached its desired status, done waiting */
break;
}
if (IsBackgroundTaskStatusTerminal(task->status))
{
if (desiredStatus)
{
/*
* We have reached a terminal state, which is not the desired state we
* were waiting for, otherwise we would have escaped earlier. Since it is
* a terminal state we know that we can never reach the desired state.
*/
Oid reachedStatusOid = BackgroundTaskStatusOid(task->status);
Datum reachedStatusNameDatum = DirectFunctionCall1(enum_out,
reachedStatusOid);
char *reachedStatusName = DatumGetCString(reachedStatusNameDatum);
Oid desiredStatusOid = BackgroundTaskStatusOid(*desiredStatus);
Datum desiredStatusNameDatum = DirectFunctionCall1(enum_out,
desiredStatusOid);
char *desiredStatusName = DatumGetCString(desiredStatusNameDatum);
ereport(ERROR,
(errmsg("Task reached terminal state \"%s\" instead of desired "
"state \"%s\"", reachedStatusName, desiredStatusName)));
}
/* task has reached its terminal state, done waiting */
break;
}
/* sleep for a while, before rechecking the task status */
CHECK_FOR_INTERRUPTS();
const long delay_ms = 1000;
(void) WaitLatch(MyLatch,
WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
delay_ms,
WAIT_EVENT_PG_SLEEP);
ResetLatch(MyLatch);
}
MemoryContextSwitchTo(oldContext);
MemoryContextDelete(waitContext);
}
/*
* StartCitusBackgroundTaskQueueMonitor spawns a new background worker connected to the
* current database and owner. This background worker consumes the tasks that are ready
@ -745,8 +863,7 @@ TaskEnded(TaskExecutionContext *taskExecutionContext)
hash_search(currentExecutors, &task->taskid,
HASH_REMOVE, NULL);
TerminateBackgroundWorker(handleEntry->handle);
dsm_detach(handleEntry->seg);
WaitForBackgroundWorkerShutdown(handleEntry->handle);
queueMonitorExecutionContext->currentExecutorCount--;
}
@ -1022,6 +1139,9 @@ CitusBackgroundTaskQueueMonitorMain(Datum arg)
/* handle signals */
CHECK_FOR_INTERRUPTS();
/* invalidate cache for new data in catalog */
InvalidateMetadataSystemCache();
/*
* if the flag is set, we should terminate all task executor workers to prevent duplicate
* runs of the same task on the next start of the monitor, which is dangerous for non-idempotent
@ -1051,9 +1171,6 @@ CitusBackgroundTaskQueueMonitorMain(Datum arg)
ProcessConfigFile(PGC_SIGHUP);
}
/* invalidate cache for new data in catalog */
InvalidateMetadataSystemCache();
/* assign runnable tasks, if any, to new task executors in a transaction if we do not have SIGTERM or SIGINT */
if (!MonitorGotTerminationOrCancellationRequest())
{
@ -1485,13 +1602,6 @@ StoreArgumentsInDSM(char *database, char *username, char *command,
return NULL;
}
/*
* when we have CurrentResourceOwner != NULL, segment will be released upon CurrentResourceOwner release,
* but we may consume the queue in segment even after CurrentResourceOwner released. 'dsm_pin_mapping' helps
* persisting the segment until the session ends or the segment is detached explicitly by 'dsm_detach'.
*/
dsm_pin_mapping(seg);
shm_toc *toc = shm_toc_create(CITUS_BACKGROUND_TASK_MAGIC, dsm_segment_address(seg),
segsize);
@ -1524,6 +1634,13 @@ StoreArgumentsInDSM(char *database, char *username, char *command,
shm_mq_attach(mq, seg, NULL);
/*
* when we have CurrentResourceOwner != NULL, segment will be released upon CurrentResourceOwner release,
* but we may consume the queue in segment even after CurrentResourceOwner released. 'dsm_pin_mapping' helps
* persisting the segment until the session ends or the segment is detached explicitly by 'dsm_detach'.
*/
dsm_pin_mapping(seg);
return seg;
}
@ -1675,31 +1792,10 @@ CitusBackgroundTaskExecutor(Datum main_arg)
"executing this task")));
}
/* Prepare to execute the query. */
SetCurrentStatementStartTimestamp();
debug_query_string = command;
char *appname = psprintf("citus background task queue executor (%ld/%ld)",
*jobId, *taskId);
pgstat_report_appname(appname);
pgstat_report_activity(STATE_RUNNING, command);
StartTransactionCommand();
if (StatementTimeout > 0)
{
enable_timeout_after(STATEMENT_TIMEOUT, StatementTimeout);
}
else
{
disable_timeout(STATEMENT_TIMEOUT, false);
}
/* Execute the query. */
StartTransactionCommand();
ExecuteSqlString(command);
/* Post-execution cleanup. */
disable_timeout(STATEMENT_TIMEOUT, false);
CommitTransactionCommand();
pgstat_report_activity(STATE_IDLE, command);
pgstat_report_stat(true);
/* Signal that we are done. */
ReadyForQuery(DestRemote);

View File

@ -92,6 +92,8 @@ extern void CitusBackgroundTaskExecutor(Datum main_arg);
extern Datum citus_job_cancel(PG_FUNCTION_ARGS);
extern Datum citus_job_wait(PG_FUNCTION_ARGS);
extern Datum citus_task_wait(PG_FUNCTION_ARGS);
extern void citus_job_wait_internal(int64 jobid, BackgroundJobStatus *desiredStatus);
extern void citus_task_wait_internal(int64 taskid, BackgroundTaskStatus *desiredStatus);
#endif /*CITUS_BACKGROUND_JOBS_H */

View File

@ -3,6 +3,33 @@ SET search_path TO background_task_queue_monitor;
SET citus.shard_count TO 4;
SET citus.shard_replication_factor TO 1;
SET citus.next_shard_id TO 3536400;
-- reset sequence values
ALTER SEQUENCE pg_dist_background_job_job_id_seq RESTART 1450000;
ALTER SEQUENCE pg_dist_background_task_task_id_seq RESTART 1450000;
-- create helper procedure to wait until given task is retried or timeout occurs
CREATE OR REPLACE PROCEDURE citus_wait_task_until_retried(taskid bigint)
LANGUAGE plpgsql
AS $$
DECLARE
retried boolean := false;
loop_count int := 0;
BEGIN
WHILE retried = false and loop_count < 20
LOOP
SELECT (retry_count IS NOT NULL) INTO retried FROM pg_dist_background_task WHERE task_id = taskid;
PERFORM pg_sleep(1);
loop_count := loop_count + 1;
COMMIT;
END LOOP;
-- timeout if the task is not retried in 20 sec
IF retried = false
THEN
RAISE WARNING 'Timeout while waiting for retrying task:%', taskid;
END IF;
END;
$$;
--
ALTER SYSTEM SET citus.background_task_queue_interval TO '1s';
SELECT pg_reload_conf();
pg_reload_conf
@ -11,6 +38,7 @@ SELECT pg_reload_conf();
(1 row)
CREATE TABLE results (a int);
-- TEST1
-- simple job that inserts 1 into results to show that query runs
SELECT a FROM results WHERE a = 1; -- verify result is not in there
a
@ -31,37 +59,7 @@ SELECT a FROM results WHERE a = 1; -- verify result is there
1
(1 row)
-- cancel a scheduled job
INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job','cancel a scheduled job') RETURNING job_id \gset
INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id, $job$ SELECT pg_sleep(5); $job$) RETURNING task_id \gset
SELECT citus_job_cancel(:job_id);
citus_job_cancel
---------------------------------------------------------------------
(1 row)
SELECT citus_job_wait(:job_id);
citus_job_wait
---------------------------------------------------------------------
(1 row)
-- verify we get an error when waiting for a job to reach a specific status while it is already in a different terminal status
SELECT citus_job_wait(:job_id, desired_status => 'finished');
ERROR: Job reached terminal state "cancelled" instead of desired state "finished"
-- show that the status has been cancelled
SELECT state, NOT(started_at IS NULL) AS did_start FROM pg_dist_background_job WHERE job_id = :job_id;
state | did_start
---------------------------------------------------------------------
cancelled | f
(1 row)
SELECT status, NOT(message = '') AS did_start FROM pg_dist_background_task WHERE job_id = :job_id ORDER BY task_id ASC;
status | did_start
---------------------------------------------------------------------
cancelled | f
(1 row)
-- TEST2
-- cancel a running job
INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job','cancelling a task after it started') RETURNING job_id \gset
INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id, $job$ SELECT pg_sleep(5); $job$) RETURNING task_id \gset
@ -96,6 +94,7 @@ SELECT status, NOT(message = '') AS did_start FROM pg_dist_background_task WHERE
cancelled | t
(1 row)
-- TEST3
-- test a failing task becomes runnable in the future again
-- we cannot fully test the backoff strategy currently as it is hard coded to take about 50 minutes.
INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'failure test due to division by zero') RETURNING job_id \gset
@ -106,12 +105,7 @@ SELECT citus_job_wait(:job_id, desired_status => 'running');
(1 row)
SELECT pg_sleep(.1); -- make sure it has time to error after it started running
pg_sleep
---------------------------------------------------------------------
(1 row)
CALL citus_wait_task_until_retried(:task_id); -- shows that we increased retry count for task after failure
SELECT status, pid, retry_count, NOT(message = '') AS has_message, (not_before > now()) AS scheduled_into_the_future FROM pg_dist_background_task WHERE job_id = :job_id ORDER BY task_id ASC;
status | pid | retry_count | has_message | scheduled_into_the_future
---------------------------------------------------------------------
@ -143,7 +137,8 @@ SELECT status, NOT(message = '') AS did_start FROM pg_dist_background_task WHERE
cancelled | t
(1 row)
-- test running two dependant tasks
-- TEST4
-- test running two dependent tasks
TRUNCATE TABLE results;
BEGIN;
INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify background execution') RETURNING job_id \gset
@ -165,7 +160,8 @@ SELECT a FROM results;
48
(1 row)
-- test running two dependant tasks, with a failing task that we cancel
-- TEST5
-- test running two dependent tasks, with a failing task that we cancel
TRUNCATE TABLE results;
BEGIN;
INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify background execution') RETURNING job_id \gset
@ -181,12 +177,7 @@ SELECT citus_job_wait(:job_id, desired_status => 'running'); -- wait for the job
(1 row)
SELECT pg_sleep(.1); -- improve chances of hitting the failure
pg_sleep
---------------------------------------------------------------------
(1 row)
CALL citus_wait_task_until_retried(:task_id2); -- shows that we increased retry count for task after failure
SELECT citus_job_cancel(:job_id);
citus_job_cancel
---------------------------------------------------------------------
@ -213,6 +204,7 @@ SELECT status, NOT(message = '') AS did_start FROM pg_dist_background_task WHERE
cancelled | f
(3 rows)
-- TEST6
-- verify that we do not allow parallel task executors more than citus.max_background_task_executors(4 by default)
BEGIN;
INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify max parallel background execution') RETURNING job_id AS job_id1 \gset
@ -224,8 +216,32 @@ INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id2, $job$ SE
INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify max parallel background execution') RETURNING job_id AS job_id3 \gset
INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id3, $job$ SELECT pg_sleep(5); $job$) RETURNING task_id AS task_id5 \gset
COMMIT;
SELECT pg_sleep(2); -- we assume this is enough time for all tasks to be in running status except the last one due to parallel worker limit
pg_sleep
SELECT citus_task_wait(:task_id1, desired_status => 'running');
citus_task_wait
---------------------------------------------------------------------
(1 row)
SELECT citus_task_wait(:task_id2, desired_status => 'running');
citus_task_wait
---------------------------------------------------------------------
(1 row)
SELECT citus_task_wait(:task_id3, desired_status => 'running');
citus_task_wait
---------------------------------------------------------------------
(1 row)
SELECT citus_task_wait(:task_id4, desired_status => 'running');
citus_task_wait
---------------------------------------------------------------------
(1 row)
SELECT citus_task_wait(:task_id5, desired_status => 'runnable');
citus_task_wait
---------------------------------------------------------------------
(1 row)
@ -233,13 +249,13 @@ SELECT pg_sleep(2); -- we assume this is enough time for all tasks to be in runn
SELECT job_id, task_id, status FROM pg_dist_background_task
WHERE task_id IN (:task_id1, :task_id2, :task_id3, :task_id4, :task_id5)
ORDER BY job_id, task_id; -- show that last task is not running but ready to run(runnable)
job_id | task_id | status
job_id | task_id | status
---------------------------------------------------------------------
7 | 11 | running
7 | 12 | running
7 | 13 | running
8 | 14 | running
9 | 15 | runnable
1450005 | 1450009 | running
1450005 | 1450010 | running
1450005 | 1450011 | running
1450006 | 1450012 | running
1450007 | 1450013 | runnable
(5 rows)
SELECT citus_job_cancel(:job_id2); -- when a job with 1 task is cancelled, the last runnable task will be running
@ -263,13 +279,13 @@ SELECT citus_job_wait(:job_id3, desired_status => 'running');
SELECT job_id, task_id, status FROM pg_dist_background_task
WHERE task_id IN (:task_id1, :task_id2, :task_id3, :task_id4, :task_id5)
ORDER BY job_id, task_id; -- show that last task is running
job_id | task_id | status
job_id | task_id | status
---------------------------------------------------------------------
7 | 11 | running
7 | 12 | running
7 | 13 | running
8 | 14 | cancelled
9 | 15 | running
1450005 | 1450009 | running
1450005 | 1450010 | running
1450005 | 1450011 | running
1450006 | 1450012 | cancelled
1450007 | 1450013 | running
(5 rows)
SELECT citus_job_cancel(:job_id1);
@ -299,15 +315,16 @@ SELECT citus_job_wait(:job_id3);
SELECT job_id, task_id, status FROM pg_dist_background_task
WHERE task_id IN (:task_id1, :task_id2, :task_id3, :task_id4, :task_id5)
ORDER BY job_id, task_id; -- show that multiple cancels worked
job_id | task_id | status
job_id | task_id | status
---------------------------------------------------------------------
7 | 11 | cancelled
7 | 12 | cancelled
7 | 13 | cancelled
8 | 14 | cancelled
9 | 15 | cancelled
1450005 | 1450009 | cancelled
1450005 | 1450010 | cancelled
1450005 | 1450011 | cancelled
1450006 | 1450012 | cancelled
1450007 | 1450013 | cancelled
(5 rows)
-- TEST7
-- verify that a task, previously not started due to lack of workers, is executed after we increase max worker count
BEGIN;
INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify max parallel background execution') RETURNING job_id AS job_id1 \gset
@ -319,8 +336,32 @@ INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id2, $job$ SE
INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify max parallel background execution') RETURNING job_id AS job_id3 \gset
INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id3, $job$ SELECT pg_sleep(5); $job$) RETURNING task_id AS task_id5 \gset
COMMIT;
SELECT pg_sleep(2); -- we assume this is enough time for all tasks to be in running status except the last one due to parallel worker limit
pg_sleep
SELECT citus_task_wait(:task_id1, desired_status => 'running');
citus_task_wait
---------------------------------------------------------------------
(1 row)
SELECT citus_task_wait(:task_id2, desired_status => 'running');
citus_task_wait
---------------------------------------------------------------------
(1 row)
SELECT citus_task_wait(:task_id3, desired_status => 'running');
citus_task_wait
---------------------------------------------------------------------
(1 row)
SELECT citus_task_wait(:task_id4, desired_status => 'running');
citus_task_wait
---------------------------------------------------------------------
(1 row)
SELECT citus_task_wait(:task_id5, desired_status => 'runnable');
citus_task_wait
---------------------------------------------------------------------
(1 row)
@ -330,11 +371,11 @@ SELECT task_id, status FROM pg_dist_background_task
ORDER BY task_id; -- show that last task is not running but ready to run(runnable)
task_id | status
---------------------------------------------------------------------
16 | running
17 | running
18 | running
19 | running
20 | runnable
1450014 | running
1450015 | running
1450016 | running
1450017 | running
1450018 | runnable
(5 rows)
ALTER SYSTEM SET citus.max_background_task_executors TO 5;
@ -355,11 +396,11 @@ SELECT task_id, status FROM pg_dist_background_task
ORDER BY task_id; -- show that last task is running
task_id | status
---------------------------------------------------------------------
16 | running
17 | running
18 | running
19 | running
20 | running
1450014 | running
1450015 | running
1450016 | running
1450017 | running
1450018 | running
(5 rows)
SELECT citus_job_cancel(:job_id1);
@ -403,19 +444,20 @@ SELECT task_id, status FROM pg_dist_background_task
ORDER BY task_id; -- show that all tasks are cancelled
task_id | status
---------------------------------------------------------------------
16 | cancelled
17 | cancelled
18 | cancelled
19 | cancelled
20 | cancelled
1450014 | cancelled
1450015 | cancelled
1450016 | cancelled
1450017 | cancelled
1450018 | cancelled
(5 rows)
-- TEST8
-- verify that upon termination signal, all tasks fail and retry policy sets their status back to runnable
BEGIN;
INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify termination on monitor') RETURNING job_id AS job_id1 \gset
INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id1, $job$ SELECT pg_sleep(500); $job$) RETURNING task_id AS task_id1 \gset
INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id1, $job$ SELECT pg_sleep(5); $job$) RETURNING task_id AS task_id1 \gset
INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify termination on monitor') RETURNING job_id AS job_id2 \gset
INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id2, $job$ SELECT pg_sleep(500); $job$) RETURNING task_id AS task_id2 \gset
INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id2, $job$ SELECT pg_sleep(5); $job$) RETURNING task_id AS task_id2 \gset
COMMIT;
SELECT citus_job_wait(:job_id1, desired_status => 'running');
citus_job_wait
@ -434,8 +476,8 @@ SELECT task_id, status FROM pg_dist_background_task
ORDER BY task_id;
task_id | status
---------------------------------------------------------------------
21 | running
22 | running
1450019 | running
1450020 | running
(2 rows)
SELECT pid AS monitor_pid FROM pg_stat_activity WHERE application_name ~ 'task queue monitor' \gset
@ -445,22 +487,18 @@ SELECT pg_terminate_backend(:monitor_pid); -- terminate monitor process
t
(1 row)
SELECT pg_sleep(2); -- wait enough to show that tasks are terminated
pg_sleep
---------------------------------------------------------------------
(1 row)
CALL citus_wait_task_until_retried(:task_id1); -- shows that we increased retry count for task after failure
CALL citus_wait_task_until_retried(:task_id2); -- shows that we increased retry count for task after failure
SELECT task_id, status, retry_count, message FROM pg_dist_background_task
WHERE task_id IN (:task_id1, :task_id2)
ORDER BY task_id; -- show that all tasks are runnable by retry policy after termination signal
task_id | status | retry_count | message
task_id | status | retry_count | message
---------------------------------------------------------------------
21 | runnable | 1 | FATAL: terminating connection due to administrator command +
| | | CONTEXT: Citus Background Task Queue Executor: regression/postgres for (13/21)+
1450019 | runnable | 1 | FATAL: terminating connection due to administrator command +
| | | CONTEXT: Citus Background Task Queue Executor: regression/postgres for (1450011/1450019) +
| | |
22 | runnable | 1 | FATAL: terminating connection due to administrator command +
| | | CONTEXT: Citus Background Task Queue Executor: regression/postgres for (14/22)+
1450020 | runnable | 1 | FATAL: terminating connection due to administrator command +
| | | CONTEXT: Citus Background Task Queue Executor: regression/postgres for (1450012/1450020) +
| | |
(2 rows)
@ -493,16 +531,17 @@ SELECT task_id, status FROM pg_dist_background_task
ORDER BY task_id; -- show that all tasks are cancelled
task_id | status
---------------------------------------------------------------------
21 | cancelled
22 | cancelled
1450019 | cancelled
1450020 | cancelled
(2 rows)
-- TEST9
-- verify that upon cancellation signal, all tasks are cancelled
BEGIN;
INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify cancellation on monitor') RETURNING job_id AS job_id1 \gset
INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id1, $job$ SELECT pg_sleep(500); $job$) RETURNING task_id AS task_id1 \gset
INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id1, $job$ SELECT pg_sleep(5); $job$) RETURNING task_id AS task_id1 \gset
INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify cancellation on monitor') RETURNING job_id AS job_id2 \gset
INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id2, $job$ SELECT pg_sleep(500); $job$) RETURNING task_id AS task_id2 \gset
INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id2, $job$ SELECT pg_sleep(5); $job$) RETURNING task_id AS task_id2 \gset
COMMIT;
SELECT citus_job_wait(:job_id1, desired_status => 'running');
citus_job_wait
@ -521,8 +560,8 @@ SELECT task_id, status FROM pg_dist_background_task
ORDER BY task_id;
task_id | status
---------------------------------------------------------------------
23 | running
24 | running
1450021 | running
1450022 | running
(2 rows)
SELECT pid AS monitor_pid FROM pg_stat_activity WHERE application_name ~ 'task queue monitor' \gset
@ -532,8 +571,14 @@ SELECT pg_cancel_backend(:monitor_pid); -- cancel monitor process
t
(1 row)
SELECT pg_sleep(2); -- wait enough to show that tasks are cancelled
pg_sleep
SELECT citus_task_wait(:task_id1, desired_status => 'cancelled'); -- shows that we cancelled the task
citus_task_wait
---------------------------------------------------------------------
(1 row)
SELECT citus_task_wait(:task_id2, desired_status => 'cancelled'); -- shows that we cancelled the task
citus_task_wait
---------------------------------------------------------------------
(1 row)
@ -555,14 +600,15 @@ SELECT task_id, status FROM pg_dist_background_task
ORDER BY task_id; -- show that all tasks are cancelled
task_id | status
---------------------------------------------------------------------
23 | cancelled
24 | cancelled
1450021 | cancelled
1450022 | cancelled
(2 rows)
-- TEST10
-- verify that task is not starved by currently long running task
BEGIN;
INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify task execution starvation') RETURNING job_id AS job_id1 \gset
INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id1, $job$ SELECT pg_sleep(5000); $job$) RETURNING task_id AS task_id1 \gset
INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id1, $job$ SELECT pg_sleep(5); $job$) RETURNING task_id AS task_id1 \gset
INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify task execution starvation') RETURNING job_id AS job_id2 \gset
INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id2, $job$ SELECT 1; $job$) RETURNING task_id AS task_id2 \gset
COMMIT;
@ -581,10 +627,10 @@ SELECT citus_job_wait(:job_id2, desired_status => 'finished');
SELECT job_id, task_id, status FROM pg_dist_background_task
WHERE task_id IN (:task_id1, :task_id2)
ORDER BY job_id, task_id; -- show that last task is finished without starvation
job_id | task_id | status
job_id | task_id | status
---------------------------------------------------------------------
17 | 25 | running
18 | 26 | done
1450015 | 1450023 | running
1450016 | 1450024 | done
(2 rows)
SELECT citus_job_cancel(:job_id1);
@ -602,15 +648,19 @@ SELECT citus_job_wait(:job_id1);
SELECT job_id, task_id, status FROM pg_dist_background_task
WHERE task_id IN (:task_id1, :task_id2)
ORDER BY job_id, task_id; -- show that task is cancelled
job_id | task_id | status
job_id | task_id | status
---------------------------------------------------------------------
17 | 25 | cancelled
18 | 26 | done
1450015 | 1450023 | cancelled
1450016 | 1450024 | done
(2 rows)
SET client_min_messages TO WARNING;
TRUNCATE pg_dist_background_job CASCADE;
TRUNCATE pg_dist_background_task CASCADE;
TRUNCATE pg_dist_background_task_depend;
DROP SCHEMA background_task_queue_monitor CASCADE;
ALTER SYSTEM RESET citus.background_task_queue_interval;
ALTER SYSTEM RESET citus.max_background_task_executors;
SELECT pg_reload_conf();
pg_reload_conf
---------------------------------------------------------------------

View File

@ -1208,6 +1208,7 @@ SELECT * FROM multi_extension.print_extension_changes();
| function citus_get_transaction_clock() cluster_clock
| function citus_internal_adjust_local_clock_to_remote(cluster_clock) void
| function citus_is_clock_after(cluster_clock,cluster_clock) boolean
| function citus_task_wait(bigint,citus_task_status) void
| function cluster_clock_cmp(cluster_clock,cluster_clock) integer
| function cluster_clock_eq(cluster_clock,cluster_clock) boolean
| function cluster_clock_ge(cluster_clock,cluster_clock) boolean
@ -1232,7 +1233,7 @@ SELECT * FROM multi_extension.print_extension_changes();
| operator family cluster_clock_ops for access method btree
| sequence pg_dist_clock_logical_seq
| type cluster_clock
(31 rows)
(32 rows)
DROP TABLE multi_extension.prev_objects, multi_extension.extension_diff;
-- show running version

View File

@ -117,6 +117,7 @@ ORDER BY 1;
function citus_stat_statements_reset()
function citus_table_is_visible(oid)
function citus_table_size(regclass)
function citus_task_wait(bigint,citus_task_status)
function citus_text_send_as_jsonb(text)
function citus_total_relation_size(regclass,boolean)
function citus_truncate_trigger()
@ -311,5 +312,5 @@ ORDER BY 1;
view citus_stat_statements
view pg_dist_shard_placement
view time_partitions
(303 rows)
(304 rows)

View File

@ -4,11 +4,41 @@ SET citus.shard_count TO 4;
SET citus.shard_replication_factor TO 1;
SET citus.next_shard_id TO 3536400;
-- reset sequence values
ALTER SEQUENCE pg_dist_background_job_job_id_seq RESTART 1450000;
ALTER SEQUENCE pg_dist_background_task_task_id_seq RESTART 1450000;
-- create helper procedure to wait until given task is retried or timeout occurs
CREATE OR REPLACE PROCEDURE citus_wait_task_until_retried(taskid bigint)
LANGUAGE plpgsql
AS $$
DECLARE
retried boolean := false;
loop_count int := 0;
BEGIN
WHILE retried = false and loop_count < 20
LOOP
SELECT (retry_count IS NOT NULL) INTO retried FROM pg_dist_background_task WHERE task_id = taskid;
PERFORM pg_sleep(1);
loop_count := loop_count + 1;
COMMIT;
END LOOP;
-- timeout if the task is not retried in 20 sec
IF retried = false
THEN
RAISE WARNING 'Timeout while waiting for retrying task:%', taskid;
END IF;
END;
$$;
--
ALTER SYSTEM SET citus.background_task_queue_interval TO '1s';
SELECT pg_reload_conf();
CREATE TABLE results (a int);
-- TEST1
-- simple job that inserts 1 into results to show that query runs
SELECT a FROM results WHERE a = 1; -- verify result is not in there
INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify background execution') RETURNING job_id \gset
@ -16,20 +46,7 @@ INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id, $job$ INS
SELECT citus_job_wait(:job_id); -- wait for the job to be finished
SELECT a FROM results WHERE a = 1; -- verify result is there
-- cancel a scheduled job
INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job','cancel a scheduled job') RETURNING job_id \gset
INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id, $job$ SELECT pg_sleep(5); $job$) RETURNING task_id \gset
SELECT citus_job_cancel(:job_id);
SELECT citus_job_wait(:job_id);
-- verify we get an error when waiting for a job to reach a specific status while it is already in a different terminal status
SELECT citus_job_wait(:job_id, desired_status => 'finished');
-- show that the status has been cancelled
SELECT state, NOT(started_at IS NULL) AS did_start FROM pg_dist_background_job WHERE job_id = :job_id;
SELECT status, NOT(message = '') AS did_start FROM pg_dist_background_task WHERE job_id = :job_id ORDER BY task_id ASC;
-- TEST2
-- cancel a running job
INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job','cancelling a task after it started') RETURNING job_id \gset
INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id, $job$ SELECT pg_sleep(5); $job$) RETURNING task_id \gset
@ -42,14 +59,15 @@ SELECT citus_job_wait(:job_id);
SELECT state, NOT(started_at IS NULL) AS did_start FROM pg_dist_background_job WHERE job_id = :job_id;
SELECT status, NOT(message = '') AS did_start FROM pg_dist_background_task WHERE job_id = :job_id ORDER BY task_id ASC;
-- TEST3
-- test a failing task becomes runnable in the future again
-- we cannot fully test the backoff strategy currently as it is hard coded to take about 50 minutes.
INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'failure test due to division by zero') RETURNING job_id \gset
INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id, $job$ SELECT 1/0; $job$) RETURNING task_id \gset
SELECT citus_job_wait(:job_id, desired_status => 'running');
SELECT pg_sleep(.1); -- make sure it has time to error after it started running
CALL citus_wait_task_until_retried(:task_id); -- shows that we increased retry count for task after failure
SELECT status, pid, retry_count, NOT(message = '') AS has_message, (not_before > now()) AS scheduled_into_the_future FROM pg_dist_background_task WHERE job_id = :job_id ORDER BY task_id ASC;
-- test cancelling a failed/retrying job
@ -59,7 +77,9 @@ SELECT citus_job_wait(:job_id);
SELECT state, NOT(started_at IS NULL) AS did_start FROM pg_dist_background_job WHERE job_id = :job_id;
SELECT status, NOT(message = '') AS did_start FROM pg_dist_background_task WHERE job_id = :job_id ORDER BY task_id ASC;
-- test running two dependant tasks
-- TEST4
-- test running two dependent tasks
TRUNCATE TABLE results;
BEGIN;
INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify background execution') RETURNING job_id \gset
@ -73,7 +93,8 @@ COMMIT;
SELECT citus_job_wait(:job_id); -- wait for the job to be finished
SELECT a FROM results;
-- test running two dependant tasks, with a failing task that we cancel
-- TEST5
-- test running two dependent tasks, with a failing task that we cancel
TRUNCATE TABLE results;
BEGIN;
INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify background execution') RETURNING job_id \gset
@ -85,13 +106,14 @@ INSERT INTO pg_dist_background_task_depend (job_id, task_id, depends_on) VALUES
COMMIT;
SELECT citus_job_wait(:job_id, desired_status => 'running'); -- wait for the job to be running, and possibly hitting a failure
SELECT pg_sleep(.1); -- improve chances of hitting the failure
CALL citus_wait_task_until_retried(:task_id2); -- shows that we increased retry count for task after failure
SELECT citus_job_cancel(:job_id);
SELECT citus_job_wait(:job_id); -- wait for the job to be cancelled
SELECT state, NOT(started_at IS NULL) AS did_start FROM pg_dist_background_job WHERE job_id = :job_id;
SELECT status, NOT(message = '') AS did_start FROM pg_dist_background_task WHERE job_id = :job_id ORDER BY task_id ASC;
-- TEST6
-- verify that we do not allow parallel task executors more than citus.max_background_task_executors(4 by default)
BEGIN;
INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify max parallel background execution') RETURNING job_id AS job_id1 \gset
@ -104,7 +126,11 @@ INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', '
INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id3, $job$ SELECT pg_sleep(5); $job$) RETURNING task_id AS task_id5 \gset
COMMIT;
SELECT pg_sleep(2); -- we assume this is enough time for all tasks to be in running status except the last one due to parallel worker limit
SELECT citus_task_wait(:task_id1, desired_status => 'running');
SELECT citus_task_wait(:task_id2, desired_status => 'running');
SELECT citus_task_wait(:task_id3, desired_status => 'running');
SELECT citus_task_wait(:task_id4, desired_status => 'running');
SELECT citus_task_wait(:task_id5, desired_status => 'runnable');
SELECT job_id, task_id, status FROM pg_dist_background_task
WHERE task_id IN (:task_id1, :task_id2, :task_id3, :task_id4, :task_id5)
@ -125,7 +151,7 @@ SELECT job_id, task_id, status FROM pg_dist_background_task
WHERE task_id IN (:task_id1, :task_id2, :task_id3, :task_id4, :task_id5)
ORDER BY job_id, task_id; -- show that multiple cancels worked
-- TEST7
-- verify that a task, previously not started due to lack of workers, is executed after we increase max worker count
BEGIN;
INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify max parallel background execution') RETURNING job_id AS job_id1 \gset
@ -138,7 +164,11 @@ INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', '
INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id3, $job$ SELECT pg_sleep(5); $job$) RETURNING task_id AS task_id5 \gset
COMMIT;
SELECT pg_sleep(2); -- we assume this is enough time for all tasks to be in running status except the last one due to parallel worker limit
SELECT citus_task_wait(:task_id1, desired_status => 'running');
SELECT citus_task_wait(:task_id2, desired_status => 'running');
SELECT citus_task_wait(:task_id3, desired_status => 'running');
SELECT citus_task_wait(:task_id4, desired_status => 'running');
SELECT citus_task_wait(:task_id5, desired_status => 'runnable');
SELECT task_id, status FROM pg_dist_background_task
WHERE task_id IN (:task_id1, :task_id2, :task_id3, :task_id4, :task_id5)
@ -162,12 +192,13 @@ SELECT task_id, status FROM pg_dist_background_task
WHERE task_id IN (:task_id1, :task_id2, :task_id3, :task_id4, :task_id5)
ORDER BY task_id; -- show that all tasks are cancelled
-- TEST8
-- verify that upon termination signal, all tasks fail and retry policy sets their status back to runnable
BEGIN;
INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify termination on monitor') RETURNING job_id AS job_id1 \gset
INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id1, $job$ SELECT pg_sleep(500); $job$) RETURNING task_id AS task_id1 \gset
INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id1, $job$ SELECT pg_sleep(5); $job$) RETURNING task_id AS task_id1 \gset
INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify termination on monitor') RETURNING job_id AS job_id2 \gset
INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id2, $job$ SELECT pg_sleep(500); $job$) RETURNING task_id AS task_id2 \gset
INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id2, $job$ SELECT pg_sleep(5); $job$) RETURNING task_id AS task_id2 \gset
COMMIT;
SELECT citus_job_wait(:job_id1, desired_status => 'running');
@ -181,7 +212,8 @@ SELECT task_id, status FROM pg_dist_background_task
SELECT pid AS monitor_pid FROM pg_stat_activity WHERE application_name ~ 'task queue monitor' \gset
SELECT pg_terminate_backend(:monitor_pid); -- terminate monitor process
SELECT pg_sleep(2); -- wait enough to show that tasks are terminated
CALL citus_wait_task_until_retried(:task_id1); -- shows that we increased retry count for task after failure
CALL citus_wait_task_until_retried(:task_id2); -- shows that we increased retry count for task after failure
SELECT task_id, status, retry_count, message FROM pg_dist_background_task
WHERE task_id IN (:task_id1, :task_id2)
@ -196,12 +228,13 @@ SELECT task_id, status FROM pg_dist_background_task
WHERE task_id IN (:task_id1, :task_id2)
ORDER BY task_id; -- show that all tasks are cancelled
-- TEST9
-- verify that upon cancellation signal, all tasks are cancelled
BEGIN;
INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify cancellation on monitor') RETURNING job_id AS job_id1 \gset
INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id1, $job$ SELECT pg_sleep(500); $job$) RETURNING task_id AS task_id1 \gset
INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id1, $job$ SELECT pg_sleep(5); $job$) RETURNING task_id AS task_id1 \gset
INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify cancellation on monitor') RETURNING job_id AS job_id2 \gset
INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id2, $job$ SELECT pg_sleep(500); $job$) RETURNING task_id AS task_id2 \gset
INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id2, $job$ SELECT pg_sleep(5); $job$) RETURNING task_id AS task_id2 \gset
COMMIT;
SELECT citus_job_wait(:job_id1, desired_status => 'running');
@ -215,7 +248,8 @@ SELECT task_id, status FROM pg_dist_background_task
SELECT pid AS monitor_pid FROM pg_stat_activity WHERE application_name ~ 'task queue monitor' \gset
SELECT pg_cancel_backend(:monitor_pid); -- cancel monitor process
SELECT pg_sleep(2); -- wait enough to show that tasks are cancelled
SELECT citus_task_wait(:task_id1, desired_status => 'cancelled'); -- shows that we cancelled the task
SELECT citus_task_wait(:task_id2, desired_status => 'cancelled'); -- shows that we cancelled the task
SELECT citus_job_wait(:job_id1);
SELECT citus_job_wait(:job_id2);
@ -224,10 +258,11 @@ SELECT task_id, status FROM pg_dist_background_task
WHERE task_id IN (:task_id1, :task_id2)
ORDER BY task_id; -- show that all tasks are cancelled
-- TEST10
-- verify that task is not starved by currently long running task
BEGIN;
INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify task execution starvation') RETURNING job_id AS job_id1 \gset
INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id1, $job$ SELECT pg_sleep(5000); $job$) RETURNING task_id AS task_id1 \gset
INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id1, $job$ SELECT pg_sleep(5); $job$) RETURNING task_id AS task_id1 \gset
INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify task execution starvation') RETURNING job_id AS job_id2 \gset
INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id2, $job$ SELECT 1; $job$) RETURNING task_id AS task_id2 \gset
COMMIT;
@ -245,7 +280,11 @@ SELECT job_id, task_id, status FROM pg_dist_background_task
ORDER BY job_id, task_id; -- show that task is cancelled
SET client_min_messages TO WARNING;
TRUNCATE pg_dist_background_job CASCADE;
TRUNCATE pg_dist_background_task CASCADE;
TRUNCATE pg_dist_background_task_depend;
DROP SCHEMA background_task_queue_monitor CASCADE;
ALTER SYSTEM RESET citus.background_task_queue_interval;
ALTER SYSTEM RESET citus.max_background_task_executors;
SELECT pg_reload_conf();