nonblocking concurrent task execution via background workers (#6459)

Improvement on our background task monitoring API (PR #6296) to support
concurrent and nonblocking task execution.

Mainly we have a queue monitor background process which forks task
executors for `Runnable` tasks and then monitors their status by
fetching messages from shared memory queue in nonblocking way.
pull/6532/head
aykut-bozkurt 2022-11-30 14:29:46 +03:00 committed by GitHub
parent 83ef600f27
commit 1f8675da43
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 857 additions and 296 deletions

View File

@ -1690,6 +1690,24 @@ RegisterCitusConfigVariables(void)
GUC_STANDARD,
NULL, NULL, NULL);
DefineCustomIntVariable(
"citus.max_background_task_executors",
gettext_noop(
"Sets the maximum number of parallel task executor workers for scheduled "
"background tasks"),
gettext_noop(
"Controls the maximum number of parallel task executors the task monitor "
"can create for scheduled background tasks. Note that the value is not effective "
"if it is set a value higher than 'max_worker_processes' postgres parameter . It is "
"also not guaranteed to have exactly specified number of parallel task executors "
"because total background worker count is shared by all background workers. The value "
"represents the possible maximum number of task executors."),
&MaxBackgroundTaskExecutors,
4, 1, MAX_BG_TASK_EXECUTORS,
PGC_SIGHUP,
GUC_STANDARD,
NULL, NULL, NULL);
DefineCustomIntVariable(
"citus.max_cached_connection_lifetime",
gettext_noop("Sets the maximum lifetime of cached connections to other nodes."),

File diff suppressed because it is too large Load Diff

View File

@ -98,6 +98,7 @@ double DistributedDeadlockDetectionTimeoutFactor = 2.0;
int Recover2PCInterval = 60000;
int DeferShardDeleteInterval = 15000;
int BackgroundTaskQueueCheckInterval = 5000;
int MaxBackgroundTaskExecutors = 4;
/* config variables for metadata sync timeout */
int MetadataSyncInterval = 60000;

View File

@ -17,10 +17,77 @@
#include "distributed/metadata_utility.h"
/*
* BackgroundExecutorHashEntry hash table entry to refer existing task executors
*/
typedef struct BackgroundExecutorHashEntry
{
/* hash key must be the first to hash correctly */
int64 taskid;
BackgroundWorkerHandle *handle;
dsm_segment *seg;
StringInfo message;
} BackgroundExecutorHashEntry;
/*
* TaskExecutionStatus status for task execution in queue monitor
*/
typedef enum TaskExecutionStatus
{
TASK_EXECUTION_STATUS_SUCCESS = 0,
TASK_EXECUTION_STATUS_ERROR,
TASK_EXECUTION_STATUS_CANCELLED,
TASK_EXECUTION_STATUS_RUNNING,
TASK_EXECUTION_STATUS_WOULDBLOCK
} TaskExecutionStatus;
/*
* QueueMonitorExecutionContext encapsulates info related to executors and tasks
* in queue monitor
*/
typedef struct QueueMonitorExecutionContext
{
/* current total # of parallel task executors */
int64 currentExecutorCount;
/* map of current executors */
HTAB *currentExecutors;
/* last background allocation failure timestamp */
TimestampTz backgroundWorkerFailedStartTime;
/* useful to track if all tasks EWOULDBLOCK'd at current iteration */
bool allTasksWouldBlock;
/* context for monitor related allocations */
MemoryContext ctx;
} QueueMonitorExecutionContext;
/*
* TaskExecutionContext encapsulates info for currently executed task in queue monitor
*/
typedef struct TaskExecutionContext
{
/* active background executor entry */
BackgroundExecutorHashEntry *handleEntry;
/* active background task */
BackgroundTask *task;
/* context for queue monitor */
QueueMonitorExecutionContext *queueMonitorExecutionContext;
} TaskExecutionContext;
extern BackgroundWorkerHandle * StartCitusBackgroundTaskQueueMonitor(Oid database,
Oid extensionOwner);
extern void CitusBackgroundTaskQueueMonitorMain(Datum arg);
extern void CitusBackgroundTaskExecuter(Datum main_arg);
extern void CitusBackgroundTaskExecutor(Datum main_arg);
extern Datum citus_job_cancel(PG_FUNCTION_ARGS);
extern Datum citus_job_wait(PG_FUNCTION_ARGS);

View File

@ -11,9 +11,12 @@
#ifndef CITUS_SHARD_CLEANER_H
#define CITUS_SHARD_CLEANER_H
#define MAX_BG_TASK_EXECUTORS 1000
/* GUC to configure deferred shard deletion */
extern int DeferShardDeleteInterval;
extern int BackgroundTaskQueueCheckInterval;
extern int MaxBackgroundTaskExecutors;
extern double DesiredPercentFreeAfterMove;
extern bool CheckAvailableSpaceBeforeMove;

View File

@ -207,6 +207,135 @@ SELECT status, NOT(message = '') AS did_start FROM pg_dist_background_task WHERE
cancelled | f
(3 rows)
-- verify that we do not allow parallel task executors more than citus.max_background_task_executors(4 by default)
BEGIN;
INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify max parallel background execution') RETURNING job_id AS job_id1 \gset
INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id1, $job$ SELECT pg_sleep(5); $job$) RETURNING task_id AS task_id1 \gset
INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id1, $job$ SELECT pg_sleep(5); $job$) RETURNING task_id AS task_id2 \gset
INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id1, $job$ SELECT pg_sleep(5); $job$) RETURNING task_id AS task_id3 \gset
INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify max parallel background execution') RETURNING job_id AS job_id2 \gset
INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id2, $job$ SELECT pg_sleep(5); $job$) RETURNING task_id AS task_id4 \gset
INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify max parallel background execution') RETURNING job_id AS job_id3 \gset
INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id3, $job$ SELECT pg_sleep(5); $job$) RETURNING task_id AS task_id5 \gset
COMMIT;
SELECT pg_sleep(2); -- we assume this is enough time for all tasks to be in running status except the last one due to parallel worker limit
pg_sleep
---------------------------------------------------------------------
(1 row)
SELECT job_id, task_id, status FROM pg_dist_background_task
WHERE task_id IN (:task_id1, :task_id2, :task_id3, :task_id4, :task_id5)
ORDER BY job_id, task_id; -- show that last task is not running but ready to run(runnable)
job_id | task_id | status
---------------------------------------------------------------------
7 | 11 | running
7 | 12 | running
7 | 13 | running
8 | 14 | running
9 | 15 | runnable
(5 rows)
SELECT citus_job_cancel(:job_id2); -- when a job with 1 task is cancelled, the last runnable task will be running
citus_job_cancel
---------------------------------------------------------------------
(1 row)
SELECT citus_job_wait(:job_id3, desired_status => 'running');
citus_job_wait
---------------------------------------------------------------------
(1 row)
SELECT job_id, task_id, status FROM pg_dist_background_task
WHERE task_id IN (:task_id1, :task_id2, :task_id3, :task_id4, :task_id5)
ORDER BY job_id, task_id; -- show that last task is running
job_id | task_id | status
---------------------------------------------------------------------
7 | 11 | running
7 | 12 | running
7 | 13 | running
8 | 14 | cancelled
9 | 15 | running
(5 rows)
SELECT citus_job_cancel(:job_id1);
citus_job_cancel
---------------------------------------------------------------------
(1 row)
SELECT citus_job_cancel(:job_id3);
citus_job_cancel
---------------------------------------------------------------------
(1 row)
SELECT citus_job_wait(:job_id1);
citus_job_wait
---------------------------------------------------------------------
(1 row)
SELECT citus_job_wait(:job_id2);
citus_job_wait
---------------------------------------------------------------------
(1 row)
SELECT citus_job_wait(:job_id3);
citus_job_wait
---------------------------------------------------------------------
(1 row)
SELECT job_id, task_id, status FROM pg_dist_background_task
WHERE task_id IN (:task_id1, :task_id2, :task_id3, :task_id4, :task_id5)
ORDER BY job_id, task_id; -- show that multiple cancels worked
job_id | task_id | status
---------------------------------------------------------------------
7 | 11 | cancelled
7 | 12 | cancelled
7 | 13 | cancelled
8 | 14 | cancelled
9 | 15 | cancelled
(5 rows)
-- verify that task is not starved by currently long running task
BEGIN;
INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify max parallel background execution') RETURNING job_id AS job_id1 \gset
INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id1, $job$ SELECT pg_sleep(5000); $job$) RETURNING task_id AS task_id1 \gset
INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify max parallel background execution') RETURNING job_id AS job_id2 \gset
INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id2, $job$ SELECT 1; $job$) RETURNING task_id AS task_id2 \gset
COMMIT;
SELECT citus_job_wait(:job_id1, desired_status => 'running');
citus_job_wait
---------------------------------------------------------------------
(1 row)
SELECT citus_job_wait(:job_id2, desired_status => 'finished');
citus_job_wait
---------------------------------------------------------------------
(1 row)
SELECT job_id, task_id, status FROM pg_dist_background_task
WHERE task_id IN (:task_id1, :task_id2)
ORDER BY job_id, task_id; -- show that last task is finished without starvation
job_id | task_id | status
---------------------------------------------------------------------
10 | 16 | running
11 | 17 | done
(2 rows)
SELECT citus_job_cancel(:job_id1);
citus_job_cancel
---------------------------------------------------------------------
(1 row)
SET client_min_messages TO WARNING;
DROP SCHEMA background_task_queue_monitor CASCADE;
ALTER SYSTEM RESET citus.background_task_queue_interval;

View File

@ -91,6 +91,55 @@ SELECT citus_job_wait(:job_id); -- wait for the job to be cancelled
SELECT state, NOT(started_at IS NULL) AS did_start FROM pg_dist_background_job WHERE job_id = :job_id;
SELECT status, NOT(message = '') AS did_start FROM pg_dist_background_task WHERE job_id = :job_id ORDER BY task_id ASC;
-- verify that we do not allow parallel task executors more than citus.max_background_task_executors(4 by default)
BEGIN;
INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify max parallel background execution') RETURNING job_id AS job_id1 \gset
INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id1, $job$ SELECT pg_sleep(5); $job$) RETURNING task_id AS task_id1 \gset
INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id1, $job$ SELECT pg_sleep(5); $job$) RETURNING task_id AS task_id2 \gset
INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id1, $job$ SELECT pg_sleep(5); $job$) RETURNING task_id AS task_id3 \gset
INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify max parallel background execution') RETURNING job_id AS job_id2 \gset
INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id2, $job$ SELECT pg_sleep(5); $job$) RETURNING task_id AS task_id4 \gset
INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify max parallel background execution') RETURNING job_id AS job_id3 \gset
INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id3, $job$ SELECT pg_sleep(5); $job$) RETURNING task_id AS task_id5 \gset
COMMIT;
SELECT pg_sleep(2); -- we assume this is enough time for all tasks to be in running status except the last one due to parallel worker limit
SELECT job_id, task_id, status FROM pg_dist_background_task
WHERE task_id IN (:task_id1, :task_id2, :task_id3, :task_id4, :task_id5)
ORDER BY job_id, task_id; -- show that last task is not running but ready to run(runnable)
SELECT citus_job_cancel(:job_id2); -- when a job with 1 task is cancelled, the last runnable task will be running
SELECT citus_job_wait(:job_id3, desired_status => 'running');
SELECT job_id, task_id, status FROM pg_dist_background_task
WHERE task_id IN (:task_id1, :task_id2, :task_id3, :task_id4, :task_id5)
ORDER BY job_id, task_id; -- show that last task is running
SELECT citus_job_cancel(:job_id1);
SELECT citus_job_cancel(:job_id3);
SELECT citus_job_wait(:job_id1);
SELECT citus_job_wait(:job_id2);
SELECT citus_job_wait(:job_id3);
SELECT job_id, task_id, status FROM pg_dist_background_task
WHERE task_id IN (:task_id1, :task_id2, :task_id3, :task_id4, :task_id5)
ORDER BY job_id, task_id; -- show that multiple cancels worked
-- verify that task is not starved by currently long running task
BEGIN;
INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify max parallel background execution') RETURNING job_id AS job_id1 \gset
INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id1, $job$ SELECT pg_sleep(5000); $job$) RETURNING task_id AS task_id1 \gset
INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify max parallel background execution') RETURNING job_id AS job_id2 \gset
INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id2, $job$ SELECT 1; $job$) RETURNING task_id AS task_id2 \gset
COMMIT;
SELECT citus_job_wait(:job_id1, desired_status => 'running');
SELECT citus_job_wait(:job_id2, desired_status => 'finished');
SELECT job_id, task_id, status FROM pg_dist_background_task
WHERE task_id IN (:task_id1, :task_id2)
ORDER BY job_id, task_id; -- show that last task is finished without starvation
SELECT citus_job_cancel(:job_id1);
SET client_min_messages TO WARNING;
DROP SCHEMA background_task_queue_monitor CASCADE;