Implement infrastructure to run sql jobs in the background (#6296)

DESCRIPTION: Add infrastructure to run long running management operations in background

This infrastructure introduces the primitives of jobs and tasks.
A task consists of a sql statement and an owner. Tasks belong to a
Job and can depend on other tasks from the same job.

When there are either runnable or running tasks we would like to
make sure a bacgrkound task queue monitor process is running. A Task
could be in running state while there is actually no monitor present
due to a database restart or failover. Once the monitor starts it
will reset any running task to its runnable state.

To make sure only one background task queue monitor is ever running
at once it will acquire an advisory lock that self conflicts.

Once a task is done it will find all tasks depending on this task.
After checking that the task doesn't have unmet dependencies it will
transition the task from blocked to runnable state for the task to
be picked up on a subsequent task start.

Currently only one task can be running at a time. This can be
improved upon in later releases without changes to the higher level
API.

The initial goal for this background tasks is to allow a rebalance
to run in the background. This will be implemented in a subsequent PR.
pull/6299/head
Nils Dijk 2022-09-09 15:11:19 +02:00 committed by GitHub
parent 76137e967f
commit 00a94c7f13
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
26 changed files with 4262 additions and 8 deletions

View File

@ -141,6 +141,32 @@ typedef struct MetadataCacheData
bool extensionLoaded;
Oid distShardRelationId;
Oid distPlacementRelationId;
Oid distBackgroundJobRelationId;
Oid distBackgroundJobPKeyIndexId;
Oid distBackgroundJobJobIdSequenceId;
Oid distBackgroundTaskRelationId;
Oid distBackgroundTaskPKeyIndexId;
Oid distBackgroundTaskJobIdTaskIdIndexId;
Oid distBackgroundTaskStatusTaskIdIndexId;
Oid distBackgroundTaskTaskIdSequenceId;
Oid distBackgroundTaskDependRelationId;
Oid distBackgroundTaskDependTaskIdIndexId;
Oid distBackgroundTaskDependDependsOnIndexId;
Oid citusJobStatusScheduledId;
Oid citusJobStatusRunningId;
Oid citusJobStatusCancellingId;
Oid citusJobStatusFinishedId;
Oid citusJobStatusCancelledId;
Oid citusJobStatusFailedId;
Oid citusJobStatusFailingId;
Oid citusTaskStatusBlockedId;
Oid citusTaskStatusRunnableId;
Oid citusTaskStatusRunningId;
Oid citusTaskStatusDoneId;
Oid citusTaskStatusErrorId;
Oid citusTaskStatusUnscheduledId;
Oid citusTaskStatusCancelledId;
Oid citusTaskStatusCancellingId;
Oid distRebalanceStrategyRelationId;
Oid distNodeRelationId;
Oid distNodeNodeIdIndexId;
@ -2492,6 +2518,116 @@ DistLocalGroupIdRelationId(void)
}
Oid
DistBackgroundJobRelationId(void)
{
CachedRelationLookup("pg_dist_background_job",
&MetadataCache.distBackgroundJobRelationId);
return MetadataCache.distBackgroundJobRelationId;
}
Oid
DistBackgroundJobPKeyIndexId(void)
{
CachedRelationLookup("pg_dist_background_job_pkey",
&MetadataCache.distBackgroundJobPKeyIndexId);
return MetadataCache.distBackgroundJobPKeyIndexId;
}
Oid
DistBackgroundJobJobIdSequenceId(void)
{
CachedRelationLookup("pg_dist_background_job_job_id_seq",
&MetadataCache.distBackgroundJobJobIdSequenceId);
return MetadataCache.distBackgroundJobJobIdSequenceId;
}
Oid
DistBackgroundTaskRelationId(void)
{
CachedRelationLookup("pg_dist_background_task",
&MetadataCache.distBackgroundTaskRelationId);
return MetadataCache.distBackgroundTaskRelationId;
}
Oid
DistBackgroundTaskPKeyIndexId(void)
{
CachedRelationLookup("pg_dist_background_task_pkey",
&MetadataCache.distBackgroundTaskPKeyIndexId);
return MetadataCache.distBackgroundTaskPKeyIndexId;
}
Oid
DistBackgroundTaskJobIdTaskIdIndexId(void)
{
CachedRelationLookup("pg_dist_background_task_job_id_task_id",
&MetadataCache.distBackgroundTaskJobIdTaskIdIndexId);
return MetadataCache.distBackgroundTaskJobIdTaskIdIndexId;
}
Oid
DistBackgroundTaskStatusTaskIdIndexId(void)
{
CachedRelationLookup("pg_dist_background_task_status_task_id_index",
&MetadataCache.distBackgroundTaskStatusTaskIdIndexId);
return MetadataCache.distBackgroundTaskStatusTaskIdIndexId;
}
Oid
DistBackgroundTaskTaskIdSequenceId(void)
{
CachedRelationLookup("pg_dist_background_task_task_id_seq",
&MetadataCache.distBackgroundTaskTaskIdSequenceId);
return MetadataCache.distBackgroundTaskTaskIdSequenceId;
}
Oid
DistBackgroundTaskDependRelationId(void)
{
CachedRelationLookup("pg_dist_background_task_depend",
&MetadataCache.distBackgroundTaskDependRelationId);
return MetadataCache.distBackgroundTaskDependRelationId;
}
Oid
DistBackgroundTaskDependTaskIdIndexId(void)
{
CachedRelationLookup("pg_dist_background_task_depend_task_id",
&MetadataCache.distBackgroundTaskDependTaskIdIndexId);
return MetadataCache.distBackgroundTaskDependTaskIdIndexId;
}
Oid
DistBackgroundTaskDependDependsOnIndexId(void)
{
CachedRelationLookup("pg_dist_background_task_depend_depends_on",
&MetadataCache.distBackgroundTaskDependDependsOnIndexId);
return MetadataCache.distBackgroundTaskDependDependsOnIndexId;
}
/* return oid of pg_dist_rebalance_strategy relation */
Oid
DistRebalanceStrategyRelationId(void)
@ -3236,6 +3372,201 @@ SecondaryNodeRoleId(void)
}
Oid
CitusJobStatusScheduledId(void)
{
if (!MetadataCache.citusJobStatusScheduledId)
{
MetadataCache.citusJobStatusScheduledId =
LookupStringEnumValueId("citus_job_status", "scheduled");
}
return MetadataCache.citusJobStatusScheduledId;
}
Oid
CitusJobStatusRunningId(void)
{
if (!MetadataCache.citusJobStatusRunningId)
{
MetadataCache.citusJobStatusRunningId =
LookupStringEnumValueId("citus_job_status", "running");
}
return MetadataCache.citusJobStatusRunningId;
}
Oid
CitusJobStatusCancellingId(void)
{
if (!MetadataCache.citusJobStatusCancellingId)
{
MetadataCache.citusJobStatusCancellingId =
LookupStringEnumValueId("citus_job_status", "cancelling");
}
return MetadataCache.citusJobStatusCancellingId;
}
Oid
CitusJobStatusFinishedId(void)
{
if (!MetadataCache.citusJobStatusFinishedId)
{
MetadataCache.citusJobStatusFinishedId =
LookupStringEnumValueId("citus_job_status", "finished");
}
return MetadataCache.citusJobStatusFinishedId;
}
Oid
CitusJobStatusCancelledId(void)
{
if (!MetadataCache.citusJobStatusCancelledId)
{
MetadataCache.citusJobStatusCancelledId =
LookupStringEnumValueId("citus_job_status", "cancelled");
}
return MetadataCache.citusJobStatusCancelledId;
}
Oid
CitusJobStatusFailedId(void)
{
if (!MetadataCache.citusJobStatusFailedId)
{
MetadataCache.citusJobStatusFailedId =
LookupStringEnumValueId("citus_job_status", "failed");
}
return MetadataCache.citusJobStatusFailedId;
}
Oid
CitusJobStatusFailingId(void)
{
if (!MetadataCache.citusJobStatusFailingId)
{
MetadataCache.citusJobStatusFailingId =
LookupStringEnumValueId("citus_job_status", "failing");
}
return MetadataCache.citusJobStatusFailingId;
}
Oid
CitusTaskStatusBlockedId(void)
{
if (!MetadataCache.citusTaskStatusBlockedId)
{
MetadataCache.citusTaskStatusBlockedId =
LookupStringEnumValueId("citus_task_status", "blocked");
}
return MetadataCache.citusTaskStatusBlockedId;
}
Oid
CitusTaskStatusCancelledId(void)
{
if (!MetadataCache.citusTaskStatusCancelledId)
{
MetadataCache.citusTaskStatusCancelledId =
LookupStringEnumValueId("citus_task_status", "cancelled");
}
return MetadataCache.citusTaskStatusCancelledId;
}
Oid
CitusTaskStatusCancellingId(void)
{
if (!MetadataCache.citusTaskStatusCancellingId)
{
MetadataCache.citusTaskStatusCancellingId =
LookupStringEnumValueId("citus_task_status", "cancelling");
}
return MetadataCache.citusTaskStatusCancellingId;
}
Oid
CitusTaskStatusRunnableId(void)
{
if (!MetadataCache.citusTaskStatusRunnableId)
{
MetadataCache.citusTaskStatusRunnableId =
LookupStringEnumValueId("citus_task_status", "runnable");
}
return MetadataCache.citusTaskStatusRunnableId;
}
Oid
CitusTaskStatusRunningId(void)
{
if (!MetadataCache.citusTaskStatusRunningId)
{
MetadataCache.citusTaskStatusRunningId =
LookupStringEnumValueId("citus_task_status", "running");
}
return MetadataCache.citusTaskStatusRunningId;
}
Oid
CitusTaskStatusDoneId(void)
{
if (!MetadataCache.citusTaskStatusDoneId)
{
MetadataCache.citusTaskStatusDoneId =
LookupStringEnumValueId("citus_task_status", "done");
}
return MetadataCache.citusTaskStatusDoneId;
}
Oid
CitusTaskStatusErrorId(void)
{
if (!MetadataCache.citusTaskStatusErrorId)
{
MetadataCache.citusTaskStatusErrorId =
LookupStringEnumValueId("citus_task_status", "error");
}
return MetadataCache.citusTaskStatusErrorId;
}
Oid
CitusTaskStatusUnscheduledId(void)
{
if (!MetadataCache.citusTaskStatusUnscheduledId)
{
MetadataCache.citusTaskStatusUnscheduledId =
LookupStringEnumValueId("citus_task_status", "unscheduled");
}
return MetadataCache.citusTaskStatusUnscheduledId;
}
/*
* citus_dist_partition_cache_invalidate is a trigger function that performs
* relcache invalidations when the contents of pg_dist_partition are changed

File diff suppressed because it is too large Load Diff

View File

@ -32,6 +32,7 @@
#include "common/string.h"
#include "executor/executor.h"
#include "distributed/backend_data.h"
#include "distributed/background_jobs.h"
#include "distributed/citus_depended_object.h"
#include "distributed/citus_nodefuncs.h"
#include "distributed/citus_safe_lib.h"
@ -869,6 +870,16 @@ RegisterCitusConfigVariables(void)
GUC_NO_SHOW_ALL,
NULL, NULL, NULL);
DefineCustomIntVariable(
"citus.background_task_queue_interval",
gettext_noop("Time to wait between checks for scheduled background tasks."),
NULL,
&BackgroundTaskQueueCheckInterval,
5000, -1, 7 * 24 * 3600 * 1000,
PGC_SIGHUP,
GUC_UNIT_MS,
NULL, NULL, NULL);
DefineCustomBoolVariable(
"citus.check_available_space_before_move",
gettext_noop("When enabled will check free disk space before a shard move"),

View File

@ -110,3 +110,60 @@ GRANT SELECT ON pg_catalog.pg_dist_cleanup_recordid_seq TO public;
-- old definition. By recreating it here upgrades also pick up the new changes.
#include "udfs/pg_cancel_backend/11.0-1.sql"
#include "udfs/pg_terminate_backend/11.0-1.sql"
CREATE TYPE citus.citus_job_status AS ENUM ('scheduled', 'running', 'finished', 'cancelling', 'cancelled', 'failing', 'failed');
ALTER TYPE citus.citus_job_status SET SCHEMA pg_catalog;
CREATE TABLE citus.pg_dist_background_job (
job_id bigserial NOT NULL,
state pg_catalog.citus_job_status DEFAULT 'scheduled' NOT NULL,
job_type name NOT NULL,
description text NOT NULL,
started_at timestamptz,
finished_at timestamptz,
CONSTRAINT pg_dist_background_job_pkey PRIMARY KEY (job_id)
);
ALTER TABLE citus.pg_dist_background_job SET SCHEMA pg_catalog;
GRANT SELECT ON pg_catalog.pg_dist_background_job TO PUBLIC;
GRANT SELECT ON pg_catalog.pg_dist_background_job_job_id_seq TO PUBLIC;
CREATE TYPE citus.citus_task_status AS ENUM ('blocked', 'runnable', 'running', 'done', 'cancelling', 'error', 'unscheduled', 'cancelled');
ALTER TYPE citus.citus_task_status SET SCHEMA pg_catalog;
CREATE TABLE citus.pg_dist_background_task(
job_id bigint NOT NULL REFERENCES pg_catalog.pg_dist_background_job(job_id),
task_id bigserial NOT NULL,
owner regrole NOT NULL DEFAULT CURRENT_USER::regrole,
pid integer,
status pg_catalog.citus_task_status default 'runnable' NOT NULL,
command text NOT NULL,
retry_count integer,
not_before timestamptz, -- can be null to indicate no delay for start of the task, will be set on failure to delay retries
message text NOT NULL DEFAULT '',
CONSTRAINT pg_dist_background_task_pkey PRIMARY KEY (task_id),
CONSTRAINT pg_dist_background_task_job_id_task_id UNIQUE (job_id, task_id) -- required for FK's to enforce tasks only reference other tasks within the same job
);
ALTER TABLE citus.pg_dist_background_task SET SCHEMA pg_catalog;
CREATE INDEX pg_dist_background_task_status_task_id_index ON pg_catalog.pg_dist_background_task USING btree(status, task_id);
GRANT SELECT ON pg_catalog.pg_dist_background_task TO PUBLIC;
GRANT SELECT ON pg_catalog.pg_dist_background_task_task_id_seq TO PUBLIC;
CREATE TABLE citus.pg_dist_background_task_depend(
job_id bigint NOT NULL REFERENCES pg_catalog.pg_dist_background_job(job_id) ON DELETE CASCADE,
task_id bigint NOT NULL,
depends_on bigint NOT NULL,
PRIMARY KEY (job_id, task_id, depends_on),
FOREIGN KEY (job_id, task_id) REFERENCES pg_catalog.pg_dist_background_task (job_id, task_id) ON DELETE CASCADE,
FOREIGN KEY (job_id, depends_on) REFERENCES pg_catalog.pg_dist_background_task (job_id, task_id) ON DELETE CASCADE
);
ALTER TABLE citus.pg_dist_background_task_depend SET SCHEMA pg_catalog;
CREATE INDEX pg_dist_background_task_depend_task_id ON pg_catalog.pg_dist_background_task_depend USING btree(job_id, task_id);
CREATE INDEX pg_dist_background_task_depend_depends_on ON pg_catalog.pg_dist_background_task_depend USING btree(job_id, depends_on);
GRANT SELECT ON pg_catalog.pg_dist_background_task_depend TO PUBLIC;
#include "udfs/citus_job_wait/11.1-1.sql"
#include "udfs/citus_job_cancel/11.1-1.sql"

View File

@ -105,3 +105,11 @@ DROP TABLE pg_catalog.pg_dist_cleanup;
DROP SEQUENCE pg_catalog.pg_dist_operationid_seq;
DROP SEQUENCE pg_catalog.pg_dist_cleanup_recordid_seq;
DROP PROCEDURE pg_catalog.citus_cleanup_orphaned_resources();
DROP FUNCTION pg_catalog.citus_job_cancel(bigint);
DROP FUNCTION pg_catalog.citus_job_wait(bigint, pg_catalog.citus_job_status);
DROP TABLE pg_catalog.pg_dist_background_task_depend;
DROP TABLE pg_catalog.pg_dist_background_task;
DROP TYPE pg_catalog.citus_task_status;
DROP TABLE pg_catalog.pg_dist_background_job;
DROP TYPE pg_catalog.citus_job_status;

View File

@ -0,0 +1,8 @@
CREATE FUNCTION pg_catalog.citus_job_cancel(jobid bigint)
RETURNS VOID
LANGUAGE C STRICT
AS 'MODULE_PATHNAME',$$citus_job_cancel$$;
COMMENT ON FUNCTION pg_catalog.citus_job_cancel(jobid bigint)
IS 'cancel a scheduled or running job and all of its tasks that didn''t finish yet';
GRANT EXECUTE ON FUNCTION pg_catalog.citus_job_cancel(jobid bigint) TO PUBLIC;

View File

@ -0,0 +1,8 @@
CREATE FUNCTION pg_catalog.citus_job_cancel(jobid bigint)
RETURNS VOID
LANGUAGE C STRICT
AS 'MODULE_PATHNAME',$$citus_job_cancel$$;
COMMENT ON FUNCTION pg_catalog.citus_job_cancel(jobid bigint)
IS 'cancel a scheduled or running job and all of its tasks that didn''t finish yet';
GRANT EXECUTE ON FUNCTION pg_catalog.citus_job_cancel(jobid bigint) TO PUBLIC;

View File

@ -0,0 +1,8 @@
CREATE FUNCTION pg_catalog.citus_job_wait(jobid bigint, desired_status pg_catalog.citus_job_status DEFAULT NULL)
RETURNS VOID
LANGUAGE C
AS 'MODULE_PATHNAME',$$citus_job_wait$$;
COMMENT ON FUNCTION pg_catalog.citus_job_wait(jobid bigint, desired_status pg_catalog.citus_job_status)
IS 'blocks till the job identified by jobid is at the specified status, or reached a terminal status. Only waits for terminal status when no desired_status was specified. The return value indicates if the desired status was reached or not. When no desired status was specified it will assume any terminal status was desired';
GRANT EXECUTE ON FUNCTION pg_catalog.citus_job_wait(jobid bigint, desired_status pg_catalog.citus_job_status) TO PUBLIC;

View File

@ -0,0 +1,8 @@
CREATE FUNCTION pg_catalog.citus_job_wait(jobid bigint, desired_status pg_catalog.citus_job_status DEFAULT NULL)
RETURNS VOID
LANGUAGE C
AS 'MODULE_PATHNAME',$$citus_job_wait$$;
COMMENT ON FUNCTION pg_catalog.citus_job_wait(jobid bigint, desired_status pg_catalog.citus_job_status)
IS 'blocks till the job identified by jobid is at the specified status, or reached a terminal status. Only waits for terminal status when no desired_status was specified. The return value indicates if the desired status was reached or not. When no desired status was specified it will assume any terminal status was desired';
GRANT EXECUTE ON FUNCTION pg_catalog.citus_job_wait(jobid bigint, desired_status pg_catalog.citus_job_status) TO PUBLIC;

File diff suppressed because it is too large Load Diff

View File

@ -27,11 +27,13 @@
#include "access/xlog.h"
#include "catalog/pg_extension.h"
#include "citus_version.h"
#include "catalog/pg_authid.h"
#include "catalog/pg_namespace.h"
#include "commands/async.h"
#include "commands/extension.h"
#include "libpq/pqsignal.h"
#include "catalog/namespace.h"
#include "distributed/background_jobs.h"
#include "distributed/citus_safe_lib.h"
#include "distributed/distributed_deadlock_detection.h"
#include "distributed/maintenanced.h"
@ -54,8 +56,10 @@
#include "storage/lwlock.h"
#include "tcop/tcopprot.h"
#include "common/hashfn.h"
#include "utils/builtins.h"
#include "utils/memutils.h"
#include "utils/lsyscache.h"
#include "distributed/resource_lock.h"
/*
* Shared memory data for all maintenance workers.
@ -93,6 +97,7 @@ typedef struct MaintenanceDaemonDBData
double DistributedDeadlockDetectionTimeoutFactor = 2.0;
int Recover2PCInterval = 60000;
int DeferShardDeleteInterval = 15000;
int BackgroundTaskQueueCheckInterval = 5000;
/* config variables for metadata sync timeout */
int MetadataSyncInterval = 60000;
@ -120,7 +125,6 @@ static void MaintenanceDaemonErrorContext(void *arg);
static bool MetadataSyncTriggeredCheckAndReset(MaintenanceDaemonDBData *dbData);
static void WarnMaintenanceDaemonNotStarted(void);
/*
* InitializeMaintenanceDaemon, called at server start, is responsible for
* requesting shared memory and related infrastructure required by maintenance
@ -277,12 +281,15 @@ CitusMaintenanceDaemonMain(Datum main_arg)
TimestampTz nextStatsCollectionTime USED_WITH_LIBCURL_ONLY =
TimestampTzPlusMilliseconds(GetCurrentTimestamp(), 60 * 1000);
bool retryStatsCollection USED_WITH_LIBCURL_ONLY = false;
ErrorContextCallback errorCallback;
TimestampTz lastRecoveryTime = 0;
TimestampTz lastShardCleanTime = 0;
TimestampTz lastStatStatementsPurgeTime = 0;
TimestampTz nextMetadataSyncTime = 0;
/* state kept for the background tasks queue monitor */
TimestampTz lastBackgroundTaskQueueCheck = GetCurrentTimestamp();
BackgroundWorkerHandle *backgroundTasksQueueBgwHandle = NULL;
bool backgroundTasksQueueWarnedForLock = false;
/*
* We do metadata sync in a separate background worker. We need its
@ -354,6 +361,7 @@ CitusMaintenanceDaemonMain(Datum main_arg)
* Do so before setting up signals etc, so we never exit without the
* context setup.
*/
ErrorContextCallback errorCallback = { 0 };
memset(&errorCallback, 0, sizeof(errorCallback));
errorCallback.callback = MaintenanceDaemonErrorContext;
errorCallback.arg = (void *) myDbData;
@ -682,6 +690,108 @@ CitusMaintenanceDaemonMain(Datum main_arg)
timeout = Min(timeout, (StatStatementsPurgeInterval * 1000));
}
pid_t backgroundTaskQueueWorkerPid = 0;
BgwHandleStatus backgroundTaskQueueWorkerStatus =
backgroundTasksQueueBgwHandle != NULL ? GetBackgroundWorkerPid(
backgroundTasksQueueBgwHandle, &backgroundTaskQueueWorkerPid) :
BGWH_STOPPED;
if (!RecoveryInProgress() && BackgroundTaskQueueCheckInterval > 0 &&
TimestampDifferenceExceeds(lastBackgroundTaskQueueCheck,
GetCurrentTimestamp(),
BackgroundTaskQueueCheckInterval) &&
backgroundTaskQueueWorkerStatus == BGWH_STOPPED)
{
/* clear old background worker for task queue before checking for new tasks */
if (backgroundTasksQueueBgwHandle)
{
pfree(backgroundTasksQueueBgwHandle);
backgroundTasksQueueBgwHandle = NULL;
}
StartTransactionCommand();
bool shouldStartBackgroundTaskQueueBackgroundWorker = false;
if (!LockCitusExtension())
{
ereport(DEBUG1, (errmsg("could not lock the citus extension, "
"skipping stat statements purging")));
}
else if (CheckCitusVersion(DEBUG1) && CitusHasBeenLoaded())
{
/* perform catalog precheck */
shouldStartBackgroundTaskQueueBackgroundWorker =
HasRunnableBackgroundTask();
}
CommitTransactionCommand();
if (shouldStartBackgroundTaskQueueBackgroundWorker)
{
/*
* Before we start the background worker we want to check if an orphaned
* one is still running. This could happen when the maintenance daemon
* restarted in a way where the background task queue monitor wasn't
* restarted.
*
* To check if an orphaned background task queue monitor is still running
* we quickly acquire the lock without waiting. If we can't acquire the
* lock this means that some other backed still has the lock. We prevent a
* new backend from starting and log a warning that we found that another
* process still holds the lock.
*/
LOCKTAG tag = { 0 };
SET_LOCKTAG_CITUS_OPERATION(tag, CITUS_BACKGROUND_TASK_MONITOR);
const bool sessionLock = false;
const bool dontWait = true;
LockAcquireResult locked =
LockAcquire(&tag, AccessExclusiveLock, sessionLock, dontWait);
if (locked == LOCKACQUIRE_NOT_AVAIL)
{
if (!backgroundTasksQueueWarnedForLock)
{
ereport(WARNING, (errmsg("background task queue monitor already "
"held"),
errdetail("the background task queue monitor "
"lock is held by another backend, "
"indicating the maintenance daemon "
"has lost track of an already "
"running background task queue "
"monitor, not starting a new one")));
backgroundTasksQueueWarnedForLock = true;
}
}
else
{
LockRelease(&tag, AccessExclusiveLock, sessionLock);
/* we were able to acquire the lock, reset the warning tracker */
backgroundTasksQueueWarnedForLock = false;
/* spawn background worker */
ereport(LOG, (errmsg("found scheduled background tasks, starting new "
"background task queue monitor")));
backgroundTasksQueueBgwHandle =
StartCitusBackgroundTaskQueueMonitor(MyDatabaseId,
myDbData->userOid);
if (!backgroundTasksQueueBgwHandle ||
GetBackgroundWorkerPid(backgroundTasksQueueBgwHandle,
&backgroundTaskQueueWorkerPid) ==
BGWH_STOPPED)
{
ereport(WARNING, (errmsg("unable to start background worker for "
"background task execution")));
}
}
}
/* interval management */
lastBackgroundTaskQueueCheck = GetCurrentTimestamp();
timeout = Min(timeout, BackgroundTaskQueueCheckInterval);
}
/*
* Wait until timeout, or until somebody wakes us up. Also cast the timeout to
* integer where we've calculated it using double for not losing the precision.

View File

@ -0,0 +1,23 @@
/*-------------------------------------------------------------------------
*
* background_jobs.h
* Functions related to running the background tasks queue monitor.
*
* Copyright (c) Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#ifndef CITUS_BACKGROUND_JOBS_H
#define CITUS_BACKGROUND_JOBS_H
#include "postgres.h"
#include "postmaster/bgworker.h"
extern BackgroundWorkerHandle * StartCitusBackgroundTaskQueueMonitor(Oid database,
Oid extensionOwner);
extern void CitusBackgroundTaskQueueMonitorMain(Datum arg);
extern void CitusBackgroundTaskExecuter(Datum main_arg);
#endif /*CITUS_BACKGROUND_JOBS_H */

View File

@ -230,6 +230,8 @@ extern Oid DistPartitionRelationId(void);
extern Oid DistShardRelationId(void);
extern Oid DistPlacementRelationId(void);
extern Oid DistNodeRelationId(void);
extern Oid DistBackgroundJobRelationId(void);
extern Oid DistBackgroundTaskRelationId(void);
extern Oid DistRebalanceStrategyRelationId(void);
extern Oid DistLocalGroupIdRelationId(void);
extern Oid DistObjectRelationId(void);
@ -239,6 +241,13 @@ extern Oid DistEnabledCustomAggregatesId(void);
extern Oid DistNodeNodeIdIndexId(void);
extern Oid DistPartitionLogicalRelidIndexId(void);
extern Oid DistPartitionColocationidIndexId(void);
extern Oid DistBackgroundJobPKeyIndexId(void);
extern Oid DistBackgroundTaskPKeyIndexId(void);
extern Oid DistBackgroundTaskJobIdTaskIdIndexId(void);
extern Oid DistBackgroundTaskStatusTaskIdIndexId(void);
extern Oid DistBackgroundTaskDependRelationId(void);
extern Oid DistBackgroundTaskDependTaskIdIndexId(void);
extern Oid DistBackgroundTaskDependDependsOnIndexId(void);
extern Oid DistShardLogicalRelidIndexId(void);
extern Oid DistShardShardidIndexId(void);
extern Oid DistPlacementShardidIndexId(void);
@ -250,6 +259,10 @@ extern Oid DistPlacementGroupidIndexId(void);
extern Oid DistObjectPrimaryKeyIndexId(void);
extern Oid DistCleanupPrimaryKeyIndexId(void);
/* sequence oids */
extern Oid DistBackgroundJobJobIdSequenceId(void);
extern Oid DistBackgroundTaskTaskIdSequenceId(void);
/* type oids */
extern Oid LookupTypeOid(char *schemaNameSting, char *typeNameString);
extern Oid CitusCopyFormatTypeId(void);
@ -272,6 +285,21 @@ extern Oid SecondaryNodeRoleId(void);
extern Oid CitusCopyFormatTypeId(void);
extern Oid TextCopyFormatId(void);
extern Oid BinaryCopyFormatId(void);
extern Oid CitusJobStatusScheduledId(void);
extern Oid CitusJobStatusRunningId(void);
extern Oid CitusJobStatusCancellingId(void);
extern Oid CitusJobStatusFinishedId(void);
extern Oid CitusJobStatusCancelledId(void);
extern Oid CitusJobStatusFailedId(void);
extern Oid CitusJobStatusFailingId(void);
extern Oid CitusTaskStatusBlockedId(void);
extern Oid CitusTaskStatusRunnableId(void);
extern Oid CitusTaskStatusRunningId(void);
extern Oid CitusTaskStatusDoneId(void);
extern Oid CitusTaskStatusErrorId(void);
extern Oid CitusTaskStatusUnscheduledId(void);
extern Oid CitusTaskStatusCancelledId(void);
extern Oid CitusTaskStatusCancellingId(void);
/* user related functions */
extern Oid CitusExtensionOwner(void);

View File

@ -203,6 +203,74 @@ typedef enum SizeQueryType
TABLE_SIZE /* pg_table_size() */
} SizeQueryType;
typedef enum BackgroundJobStatus
{
BACKGROUND_JOB_STATUS_SCHEDULED,
BACKGROUND_JOB_STATUS_RUNNING,
BACKGROUND_JOB_STATUS_FINISHED,
BACKGROUND_JOB_STATUS_CANCELLING,
BACKGROUND_JOB_STATUS_CANCELLED,
BACKGROUND_JOB_STATUS_FAILING,
BACKGROUND_JOB_STATUS_FAILED
} BackgroundJobStatus;
typedef struct BackgroundJob
{
int64 jobid;
BackgroundJobStatus state;
char *jobType;
char *description;
TimestampTz *started_at;
TimestampTz *finished_at;
/* extra space to store values for nullable value types above */
struct
{
TimestampTz started_at;
TimestampTz finished_at;
} __nullable_storage;
} BackgroundJob;
typedef enum BackgroundTaskStatus
{
BACKGROUND_TASK_STATUS_BLOCKED,
BACKGROUND_TASK_STATUS_RUNNABLE,
BACKGROUND_TASK_STATUS_RUNNING,
BACKGROUND_TASK_STATUS_CANCELLING,
BACKGROUND_TASK_STATUS_DONE,
BACKGROUND_TASK_STATUS_ERROR,
BACKGROUND_TASK_STATUS_UNSCHEDULED,
BACKGROUND_TASK_STATUS_CANCELLED
} BackgroundTaskStatus;
typedef struct BackgroundTask
{
int64 jobid;
int64 taskid;
Oid owner;
int32 *pid;
BackgroundTaskStatus status;
char *command;
int32 *retry_count;
TimestampTz *not_before;
char *message;
/* extra space to store values for nullable value types above */
struct
{
int32 pid;
int32 retry_count;
TimestampTz not_before;
} __nullable_storage;
} BackgroundTask;
#define SET_NULLABLE_FIELD(ptr, field, value) \
(ptr)->__nullable_storage.field = (value); \
(ptr)->field = &((ptr)->__nullable_storage.field)
#define UNSET_NULLABLE_FIELD(ptr, field) \
(ptr)->field = NULL; \
memset_struct_0((ptr)->__nullable_storage.field)
/* Size functions */
extern Datum citus_table_size(PG_FUNCTION_ARGS);
@ -315,4 +383,27 @@ extern void EnsureSequenceTypeSupported(Oid seqOid, Oid attributeTypeId, Oid
ownerRelationId);
extern void AlterSequenceType(Oid seqOid, Oid typeOid);
extern void EnsureRelationHasCompatibleSequenceTypes(Oid relationId);
extern bool HasRunnableBackgroundTask(void);
extern int64 CreateBackgroundJob(const char *jobType, const char *description);
extern BackgroundTask * ScheduleBackgroundTask(int64 jobId, Oid owner, char *command,
int dependingTaskCount,
int64 dependingTaskIds[]);
extern BackgroundTask * GetRunnableBackgroundTask(void);
extern void ResetRunningBackgroundTasks(void);
extern BackgroundJob * GetBackgroundJobByJobId(int64 jobId);
extern BackgroundTask * GetBackgroundTaskByTaskId(int64 taskId);
extern void UpdateBackgroundJob(int64 jobId);
extern void UpdateBackgroundTask(BackgroundTask *task);
extern void UpdateJobStatus(int64 taskId, const pid_t *pid, BackgroundTaskStatus status,
const int32 *retry_count, char *message);
extern bool UpdateJobError(BackgroundTask *job, ErrorData *edata);
extern List * CancelTasksForJob(int64 jobid);
extern void UnscheduleDependentTasks(BackgroundTask *task);
extern void UnblockDependingBackgroundTasks(BackgroundTask *task);
extern BackgroundJobStatus BackgroundJobStatusByOid(Oid enumOid);
extern BackgroundTaskStatus BackgroundTaskStatusByOid(Oid enumOid);
extern bool IsBackgroundJobStatusTerminal(BackgroundJobStatus status);
extern bool IsBackgroundTaskStatusTerminal(BackgroundTaskStatus status);
extern Oid BackgroundJobStatusOid(BackgroundJobStatus status);
extern Oid BackgroundTaskStatusOid(BackgroundTaskStatus status);
#endif /* METADATA_UTILITY_H */

View File

@ -0,0 +1,26 @@
/*-------------------------------------------------------------------------
*
* pg_dist_background_job.h
* definition of the relation that holds the jobs metadata
*
* Copyright (c) Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#ifndef CITUS_PG_DIST_BACKGROUND_JOB_H
#define CITUS_PG_DIST_BACKGROUND_JOB_H
/* ----------------
* compiler constants for pg_dist_background_job
* ----------------
*/
#define Natts_pg_dist_background_job 6
#define Anum_pg_dist_background_job_job_id 1
#define Anum_pg_dist_background_job_state 2
#define Anum_pg_dist_background_job_job_type 3
#define Anum_pg_dist_background_job_description 4
#define Anum_pg_dist_background_job_started_at 5
#define Anum_pg_dist_background_job_finished_at 6
#endif /* CITUS_PG_DIST_BACKGROUND_JOB_H */

View File

@ -0,0 +1,29 @@
/*-------------------------------------------------------------------------
*
* pg_dist_background_task.h
* definition of the relation that holds the tasks metadata
*
* Copyright (c) Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#ifndef CITUS_PG_DIST_BACKGROUND_TASK_H
#define CITUS_PG_DIST_BACKGROUND_TASK_H
/* ----------------
* compiler constants for pg_dist_background_task
* ----------------
*/
#define Natts_pg_dist_background_task 9
#define Anum_pg_dist_background_task_job_id 1
#define Anum_pg_dist_background_task_task_id 2
#define Anum_pg_dist_background_task_owner 3
#define Anum_pg_dist_background_task_pid 4
#define Anum_pg_dist_background_task_status 5
#define Anum_pg_dist_background_task_command 6
#define Anum_pg_dist_background_task_retry_count 7
#define Anum_pg_dist_background_task_not_before 8
#define Anum_pg_dist_background_task_message 9
#endif /* CITUS_PG_DIST_BACKGROUND_TASK_H */

View File

@ -0,0 +1,35 @@
/*-------------------------------------------------------------------------
*
* pg_dist_background_task_depend.h
* definition of the relation that holds which tasks depend on each
* other.
*
* Copyright (c) Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#ifndef CITUS_PG_DIST_BACKGROUND_TASK_DEPEND_H
#define CITUS_PG_DIST_BACKGROUND_TASK_DEPEND_H
typedef struct FormData_pg_dist_background_task_depend
{
int64 job_id;
int64 task_id;
int64 depends_on;
#ifdef CATALOG_VARLEN /* variable-length fields start here */
#endif
} FormData_pg_dist_background_task_depend;
typedef FormData_pg_dist_background_task_depend *Form_pg_dist_background_task_depend;
/* ----------------
* compiler constants for pg_dist_background_task_depend
* ----------------
*/
#define Natts_pg_dist_background_task_depend 3
#define Anum_pg_dist_background_task_depend_job_id 1
#define Anum_pg_dist_background_task_depend_task_id 2
#define Anum_pg_dist_background_task_depend_depends_on 3
#endif /* CITUS_PG_DIST_BACKGROUND_TASK_DEPEND_H */

View File

@ -42,7 +42,8 @@ typedef enum AdvisoryLocktagClass
ADV_LOCKTAG_CLASS_CITUS_OPERATIONS = 9,
ADV_LOCKTAG_CLASS_CITUS_CLEANUP_OPERATION_ID = 10,
ADV_LOCKTAG_CLASS_CITUS_LOGICAL_REPLICATION = 12,
ADV_LOCKTAG_CLASS_CITUS_REBALANCE_PLACEMENT_COLOCATION = 13
ADV_LOCKTAG_CLASS_CITUS_REBALANCE_PLACEMENT_COLOCATION = 13,
ADV_LOCKTAG_CLASS_CITUS_BACKGROUND_TASK = 14
} AdvisoryLocktagClass;
/* CitusOperations has constants for citus operations */
@ -52,7 +53,8 @@ typedef enum CitusOperations
CITUS_NONBLOCKING_SPLIT = 1,
CITUS_CREATE_DISTRIBUTED_TABLE_CONCURRENTLY = 2,
CITUS_CREATE_COLOCATION_DEFAULT = 3,
CITUS_SHARD_MOVE = 4
CITUS_SHARD_MOVE = 4,
CITUS_BACKGROUND_TASK_MONITOR = 5
} CitusOperations;
/* reuse advisory lock, but with different, unused field 4 (4)*/
@ -129,6 +131,16 @@ typedef enum CitusOperations
(uint32) 0, \
ADV_LOCKTAG_CLASS_CITUS_LOGICAL_REPLICATION)
/* reuse advisory lock, but with different, unused field 4 (14)
* Also it has the database hardcoded to MyDatabaseId, to ensure the locks
* are local to each database */
#define SET_LOCKTAG_BACKGROUND_TASK(tag, taskId) \
SET_LOCKTAG_ADVISORY(tag, \
MyDatabaseId, \
(uint32) ((taskId) >> 32), \
(uint32) (taskId), \
ADV_LOCKTAG_CLASS_CITUS_BACKGROUND_TASK)
/*
* DistLockConfigs are used to configure the locking behaviour of AcquireDistributedLockOnRelations
*/

View File

@ -13,6 +13,7 @@
/* GUC to configure deferred shard deletion */
extern int DeferShardDeleteInterval;
extern int BackgroundTaskQueueCheckInterval;
extern bool DeferShardDeleteOnMove;
extern double DesiredPercentFreeAfterMove;
extern bool CheckAvailableSpaceBeforeMove;

View File

@ -0,0 +1,218 @@
CREATE SCHEMA background_task_queue_monitor;
SET search_path TO background_task_queue_monitor;
SET citus.shard_count TO 4;
SET citus.shard_replication_factor TO 1;
SET citus.next_shard_id TO 3536400;
ALTER SYSTEM SET citus.background_task_queue_interval TO '1s';
SELECT pg_reload_conf();
pg_reload_conf
---------------------------------------------------------------------
t
(1 row)
CREATE TABLE results (a int);
-- simple job that inserts 1 into results to show that query runs
SELECT a FROM results WHERE a = 1; -- verify result is not in there
a
---------------------------------------------------------------------
(0 rows)
INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify background execution') RETURNING job_id \gset
INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id, $job$ INSERT INTO background_task_queue_monitor.results VALUES ( 1 ); $job$) RETURNING task_id \gset
SELECT citus_job_wait(:job_id); -- wait for the job to be finished
citus_job_wait
---------------------------------------------------------------------
(1 row)
SELECT a FROM results WHERE a = 1; -- verify result is there
a
---------------------------------------------------------------------
1
(1 row)
-- cancel a scheduled job
INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job','cancel a scheduled job') RETURNING job_id \gset
INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id, $job$ SELECT pg_sleep(5); $job$) RETURNING task_id \gset
SELECT citus_job_cancel(:job_id);
citus_job_cancel
---------------------------------------------------------------------
(1 row)
SELECT citus_job_wait(:job_id);
citus_job_wait
---------------------------------------------------------------------
(1 row)
-- verify we get an error when waiting for a job to reach a specific status while it is already in a different terminal status
SELECT citus_job_wait(:job_id, desired_status => 'finished');
ERROR: Job reached terminal state "cancelled" instead of desired state "finished"
-- show that the status has been cancelled
SELECT state, NOT(started_at IS NULL) AS did_start FROM pg_dist_background_job WHERE job_id = :job_id;
state | did_start
---------------------------------------------------------------------
cancelled | f
(1 row)
SELECT status, NOT(message = '') AS did_start FROM pg_dist_background_task WHERE job_id = :job_id ORDER BY task_id ASC;
status | did_start
---------------------------------------------------------------------
cancelled | f
(1 row)
-- cancel a running job
INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job','cancelling a task after it started') RETURNING job_id \gset
INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id, $job$ SELECT pg_sleep(5); $job$) RETURNING task_id \gset
SELECT citus_job_wait(:job_id, desired_status => 'running');
citus_job_wait
---------------------------------------------------------------------
(1 row)
SELECT citus_job_cancel(:job_id);
citus_job_cancel
---------------------------------------------------------------------
(1 row)
SELECT citus_job_wait(:job_id);
citus_job_wait
---------------------------------------------------------------------
(1 row)
-- show that the status has been cancelled
SELECT state, NOT(started_at IS NULL) AS did_start FROM pg_dist_background_job WHERE job_id = :job_id;
state | did_start
---------------------------------------------------------------------
cancelled | t
(1 row)
SELECT status, NOT(message = '') AS did_start FROM pg_dist_background_task WHERE job_id = :job_id ORDER BY task_id ASC;
status | did_start
---------------------------------------------------------------------
cancelled | t
(1 row)
-- test a failing task becomes runnable in the future again
-- we cannot fully test the backoff strategy currently as it is hard coded to take about 50 minutes.
INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'failure test due to division by zero') RETURNING job_id \gset
INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id, $job$ SELECT 1/0; $job$) RETURNING task_id \gset
SELECT citus_job_wait(:job_id, desired_status => 'running');
citus_job_wait
---------------------------------------------------------------------
(1 row)
SELECT pg_sleep(.1); -- make sure it has time to error after it started running
pg_sleep
---------------------------------------------------------------------
(1 row)
SELECT status, pid, retry_count, NOT(message = '') AS has_message, (not_before > now()) AS scheduled_into_the_future FROM pg_dist_background_task WHERE job_id = :job_id ORDER BY task_id ASC;
status | pid | retry_count | has_message | scheduled_into_the_future
---------------------------------------------------------------------
runnable | | 1 | t | t
(1 row)
-- test cancelling a failed/retrying job
SELECT citus_job_cancel(:job_id);
citus_job_cancel
---------------------------------------------------------------------
(1 row)
SELECT state, NOT(started_at IS NULL) AS did_start FROM pg_dist_background_job WHERE job_id = :job_id;
state | did_start
---------------------------------------------------------------------
cancelled | t
(1 row)
SELECT status, NOT(message = '') AS did_start FROM pg_dist_background_task WHERE job_id = :job_id ORDER BY task_id ASC;
status | did_start
---------------------------------------------------------------------
cancelled | t
(1 row)
-- test running two dependant tasks
TRUNCATE TABLE results;
BEGIN;
INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify background execution') RETURNING job_id \gset
INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id, $job$ INSERT INTO background_task_queue_monitor.results VALUES ( 5 ); $job$) RETURNING task_id AS task_id1 \gset
INSERT INTO pg_dist_background_task (job_id, status, command) VALUES (:job_id, 'blocked', $job$ UPDATE background_task_queue_monitor.results SET a = a * 7; $job$) RETURNING task_id AS task_id2 \gset
INSERT INTO pg_dist_background_task (job_id, status, command) VALUES (:job_id, 'blocked', $job$ UPDATE background_task_queue_monitor.results SET a = a + 13; $job$) RETURNING task_id AS task_id3 \gset
INSERT INTO pg_dist_background_task_depend (job_id, task_id, depends_on) VALUES (:job_id, :task_id2, :task_id1);
INSERT INTO pg_dist_background_task_depend (job_id, task_id, depends_on) VALUES (:job_id, :task_id3, :task_id2);
COMMIT;
SELECT citus_job_wait(:job_id); -- wait for the job to be finished
citus_job_wait
---------------------------------------------------------------------
(1 row)
SELECT a FROM results;
a
---------------------------------------------------------------------
48
(1 row)
-- test running two dependant tasks, with a failing task that we cancel
TRUNCATE TABLE results;
BEGIN;
INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify background execution') RETURNING job_id \gset
INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id, $job$ INSERT INTO background_task_queue_monitor.results VALUES ( 5 ); $job$) RETURNING task_id AS task_id1 \gset
INSERT INTO pg_dist_background_task (job_id, status, command) VALUES (:job_id, 'blocked', $job$ SELECT 1/0; $job$) RETURNING task_id AS task_id2 \gset
INSERT INTO pg_dist_background_task (job_id, status, command) VALUES (:job_id, 'blocked', $job$ UPDATE background_task_queue_monitor.results SET a = a + 13; $job$) RETURNING task_id AS task_id3 \gset
INSERT INTO pg_dist_background_task_depend (job_id, task_id, depends_on) VALUES (:job_id, :task_id2, :task_id1);
INSERT INTO pg_dist_background_task_depend (job_id, task_id, depends_on) VALUES (:job_id, :task_id3, :task_id2);
COMMIT;
SELECT citus_job_wait(:job_id, desired_status => 'running'); -- wait for the job to be running, and possibly hitting a failure
citus_job_wait
---------------------------------------------------------------------
(1 row)
SELECT pg_sleep(.1); -- improve chances of hitting the failure
pg_sleep
---------------------------------------------------------------------
(1 row)
SELECT citus_job_cancel(:job_id);
citus_job_cancel
---------------------------------------------------------------------
(1 row)
SELECT citus_job_wait(:job_id); -- wait for the job to be cancelled
citus_job_wait
---------------------------------------------------------------------
(1 row)
SELECT state, NOT(started_at IS NULL) AS did_start FROM pg_dist_background_job WHERE job_id = :job_id;
state | did_start
---------------------------------------------------------------------
cancelled | t
(1 row)
SELECT status, NOT(message = '') AS did_start FROM pg_dist_background_task WHERE job_id = :job_id ORDER BY task_id ASC;
status | did_start
---------------------------------------------------------------------
done | t
cancelled | t
cancelled | f
(3 rows)
SET client_min_messages TO WARNING;
DROP SCHEMA background_task_queue_monitor CASCADE;
ALTER SYSTEM RESET citus.background_task_queue_interval;
SELECT pg_reload_conf();
pg_reload_conf
---------------------------------------------------------------------
t
(1 row)

View File

@ -1138,6 +1138,8 @@ SELECT * FROM multi_extension.print_extension_changes();
table columnar.stripe |
| function citus_cleanup_orphaned_resources()
| function citus_internal_delete_partition_metadata(regclass) void
| function citus_job_cancel(bigint) void
| function citus_job_wait(bigint,citus_job_status) void
| function citus_locks() SETOF record
| function citus_split_shard_by_split_points(bigint,text[],integer[],citus.shard_transfer_mode) void
| function create_distributed_table_concurrently(regclass,text,citus.distribution_type,text,integer) void
@ -1147,14 +1149,21 @@ SELECT * FROM multi_extension.print_extension_changes();
| function worker_split_copy(bigint,text,split_copy_info[]) void
| function worker_split_shard_release_dsm() void
| function worker_split_shard_replication_setup(split_shard_info[]) SETOF replication_slot_info
| sequence pg_dist_background_job_job_id_seq
| sequence pg_dist_background_task_task_id_seq
| sequence pg_dist_cleanup_recordid_seq
| sequence pg_dist_operationid_seq
| table pg_dist_background_job
| table pg_dist_background_task
| table pg_dist_background_task_depend
| table pg_dist_cleanup
| type citus_job_status
| type citus_task_status
| type replication_slot_info
| type split_copy_info
| type split_shard_info
| view citus_locks
(41 rows)
(50 rows)
DROP TABLE multi_extension.prev_objects, multi_extension.extension_diff;
-- show running version

View File

@ -51,6 +51,18 @@ SELECT nextval('pg_dist_cleanup_recordid_seq') = MAX(record_id)+1 FROM pg_dist_c
(1 row)
SELECT nextval('pg_dist_background_job_job_id_seq') > COALESCE(MAX(job_id), 0) FROM pg_dist_background_job;
?column?
---------------------------------------------------------------------
t
(1 row)
SELECT nextval('pg_dist_background_task_task_id_seq') > COALESCE(MAX(task_id), 0) FROM pg_dist_background_task;
?column?
---------------------------------------------------------------------
t
(1 row)
-- If this query gives output it means we've added a new sequence that should
-- possibly be restored after upgrades.
SELECT sequence_name FROM information_schema.sequences
@ -63,7 +75,9 @@ SELECT sequence_name FROM information_schema.sequences
'pg_dist_node_nodeid_seq',
'pg_dist_colocationid_seq',
'pg_dist_operationid_seq',
'pg_dist_cleanup_recordid_seq'
'pg_dist_cleanup_recordid_seq',
'pg_dist_background_job_job_id_seq',
'pg_dist_background_task_task_id_seq'
);
sequence_name
---------------------------------------------------------------------

View File

@ -74,6 +74,8 @@ ORDER BY 1;
function citus_internal_update_relation_colocation(oid,integer)
function citus_is_coordinator()
function citus_isolation_test_session_is_blocked(integer,integer[])
function citus_job_cancel(bigint)
function citus_job_wait(bigint, citus_job_status)
function citus_json_concatenate(json,json)
function citus_json_concatenate_final(json)
function citus_jsonb_concatenate(jsonb,jsonb)
@ -236,6 +238,8 @@ ORDER BY 1;
function worker_split_shard_replication_setup(split_shard_info[])
schema citus
schema citus_internal
sequence pg_dist_background_job_job_id_seq
sequence pg_dist_background_task_task_id_seq
sequence pg_dist_cleanup_recordid_seq
sequence pg_dist_colocationid_seq
sequence pg_dist_groupid_seq
@ -244,6 +248,9 @@ ORDER BY 1;
sequence pg_dist_placement_placementid_seq
sequence pg_dist_shardid_seq
table pg_dist_authinfo
table pg_dist_background_job
table pg_dist_background_task
table pg_dist_background_task_depend
table pg_dist_cleanup
table pg_dist_colocation
table pg_dist_local_group
@ -259,6 +266,8 @@ ORDER BY 1;
type citus.distribution_type
type citus.shard_transfer_mode
type citus_copy_format
type citus_job_status
type citus_task_status
type noderole
type replication_slot_info
type split_copy_info
@ -274,5 +283,5 @@ ORDER BY 1;
view citus_stat_statements
view pg_dist_shard_placement
view time_partitions
(266 rows)
(275 rows)

View File

@ -98,6 +98,7 @@ test: issue_5248 issue_5099
test: object_propagation_debug
test: undistribute_table
test: run_command_on_all_nodes
test: background_task_queue_monitor
# ---------

View File

@ -0,0 +1,98 @@
CREATE SCHEMA background_task_queue_monitor;
SET search_path TO background_task_queue_monitor;
SET citus.shard_count TO 4;
SET citus.shard_replication_factor TO 1;
SET citus.next_shard_id TO 3536400;
ALTER SYSTEM SET citus.background_task_queue_interval TO '1s';
SELECT pg_reload_conf();
CREATE TABLE results (a int);
-- simple job that inserts 1 into results to show that query runs
SELECT a FROM results WHERE a = 1; -- verify result is not in there
INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify background execution') RETURNING job_id \gset
INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id, $job$ INSERT INTO background_task_queue_monitor.results VALUES ( 1 ); $job$) RETURNING task_id \gset
SELECT citus_job_wait(:job_id); -- wait for the job to be finished
SELECT a FROM results WHERE a = 1; -- verify result is there
-- cancel a scheduled job
INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job','cancel a scheduled job') RETURNING job_id \gset
INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id, $job$ SELECT pg_sleep(5); $job$) RETURNING task_id \gset
SELECT citus_job_cancel(:job_id);
SELECT citus_job_wait(:job_id);
-- verify we get an error when waiting for a job to reach a specific status while it is already in a different terminal status
SELECT citus_job_wait(:job_id, desired_status => 'finished');
-- show that the status has been cancelled
SELECT state, NOT(started_at IS NULL) AS did_start FROM pg_dist_background_job WHERE job_id = :job_id;
SELECT status, NOT(message = '') AS did_start FROM pg_dist_background_task WHERE job_id = :job_id ORDER BY task_id ASC;
-- cancel a running job
INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job','cancelling a task after it started') RETURNING job_id \gset
INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id, $job$ SELECT pg_sleep(5); $job$) RETURNING task_id \gset
SELECT citus_job_wait(:job_id, desired_status => 'running');
SELECT citus_job_cancel(:job_id);
SELECT citus_job_wait(:job_id);
-- show that the status has been cancelled
SELECT state, NOT(started_at IS NULL) AS did_start FROM pg_dist_background_job WHERE job_id = :job_id;
SELECT status, NOT(message = '') AS did_start FROM pg_dist_background_task WHERE job_id = :job_id ORDER BY task_id ASC;
-- test a failing task becomes runnable in the future again
-- we cannot fully test the backoff strategy currently as it is hard coded to take about 50 minutes.
INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'failure test due to division by zero') RETURNING job_id \gset
INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id, $job$ SELECT 1/0; $job$) RETURNING task_id \gset
SELECT citus_job_wait(:job_id, desired_status => 'running');
SELECT pg_sleep(.1); -- make sure it has time to error after it started running
SELECT status, pid, retry_count, NOT(message = '') AS has_message, (not_before > now()) AS scheduled_into_the_future FROM pg_dist_background_task WHERE job_id = :job_id ORDER BY task_id ASC;
-- test cancelling a failed/retrying job
SELECT citus_job_cancel(:job_id);
SELECT state, NOT(started_at IS NULL) AS did_start FROM pg_dist_background_job WHERE job_id = :job_id;
SELECT status, NOT(message = '') AS did_start FROM pg_dist_background_task WHERE job_id = :job_id ORDER BY task_id ASC;
-- test running two dependant tasks
TRUNCATE TABLE results;
BEGIN;
INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify background execution') RETURNING job_id \gset
INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id, $job$ INSERT INTO background_task_queue_monitor.results VALUES ( 5 ); $job$) RETURNING task_id AS task_id1 \gset
INSERT INTO pg_dist_background_task (job_id, status, command) VALUES (:job_id, 'blocked', $job$ UPDATE background_task_queue_monitor.results SET a = a * 7; $job$) RETURNING task_id AS task_id2 \gset
INSERT INTO pg_dist_background_task (job_id, status, command) VALUES (:job_id, 'blocked', $job$ UPDATE background_task_queue_monitor.results SET a = a + 13; $job$) RETURNING task_id AS task_id3 \gset
INSERT INTO pg_dist_background_task_depend (job_id, task_id, depends_on) VALUES (:job_id, :task_id2, :task_id1);
INSERT INTO pg_dist_background_task_depend (job_id, task_id, depends_on) VALUES (:job_id, :task_id3, :task_id2);
COMMIT;
SELECT citus_job_wait(:job_id); -- wait for the job to be finished
SELECT a FROM results;
-- test running two dependant tasks, with a failing task that we cancel
TRUNCATE TABLE results;
BEGIN;
INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify background execution') RETURNING job_id \gset
INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id, $job$ INSERT INTO background_task_queue_monitor.results VALUES ( 5 ); $job$) RETURNING task_id AS task_id1 \gset
INSERT INTO pg_dist_background_task (job_id, status, command) VALUES (:job_id, 'blocked', $job$ SELECT 1/0; $job$) RETURNING task_id AS task_id2 \gset
INSERT INTO pg_dist_background_task (job_id, status, command) VALUES (:job_id, 'blocked', $job$ UPDATE background_task_queue_monitor.results SET a = a + 13; $job$) RETURNING task_id AS task_id3 \gset
INSERT INTO pg_dist_background_task_depend (job_id, task_id, depends_on) VALUES (:job_id, :task_id2, :task_id1);
INSERT INTO pg_dist_background_task_depend (job_id, task_id, depends_on) VALUES (:job_id, :task_id3, :task_id2);
COMMIT;
SELECT citus_job_wait(:job_id, desired_status => 'running'); -- wait for the job to be running, and possibly hitting a failure
SELECT pg_sleep(.1); -- improve chances of hitting the failure
SELECT citus_job_cancel(:job_id);
SELECT citus_job_wait(:job_id); -- wait for the job to be cancelled
SELECT state, NOT(started_at IS NULL) AS did_start FROM pg_dist_background_job WHERE job_id = :job_id;
SELECT status, NOT(message = '') AS did_start FROM pg_dist_background_task WHERE job_id = :job_id ORDER BY task_id ASC;
SET client_min_messages TO WARNING;
DROP SCHEMA background_task_queue_monitor CASCADE;
ALTER SYSTEM RESET citus.background_task_queue_interval;
SELECT pg_reload_conf();

View File

@ -10,6 +10,8 @@ SELECT nextval('pg_dist_node_nodeid_seq') = MAX(nodeid)+1 FROM pg_dist_node;
SELECT nextval('pg_dist_colocationid_seq') = MAX(colocationid)+1 FROM pg_dist_colocation;
SELECT nextval('pg_dist_operationid_seq') = MAX(operation_id)+1 FROM pg_dist_cleanup;
SELECT nextval('pg_dist_cleanup_recordid_seq') = MAX(record_id)+1 FROM pg_dist_cleanup;
SELECT nextval('pg_dist_background_job_job_id_seq') > COALESCE(MAX(job_id), 0) FROM pg_dist_background_job;
SELECT nextval('pg_dist_background_task_task_id_seq') > COALESCE(MAX(task_id), 0) FROM pg_dist_background_task;
-- If this query gives output it means we've added a new sequence that should
-- possibly be restored after upgrades.
@ -23,7 +25,9 @@ SELECT sequence_name FROM information_schema.sequences
'pg_dist_node_nodeid_seq',
'pg_dist_colocationid_seq',
'pg_dist_operationid_seq',
'pg_dist_cleanup_recordid_seq'
'pg_dist_cleanup_recordid_seq',
'pg_dist_background_job_job_id_seq',
'pg_dist_background_task_task_id_seq'
);
SELECT logicalrelid FROM pg_dist_partition