diff --git a/src/backend/distributed/metadata/metadata_cache.c b/src/backend/distributed/metadata/metadata_cache.c
index be2685c53..cffa666fc 100644
--- a/src/backend/distributed/metadata/metadata_cache.c
+++ b/src/backend/distributed/metadata/metadata_cache.c
@@ -141,6 +141,32 @@ typedef struct MetadataCacheData
 	bool extensionLoaded;
 	Oid distShardRelationId;
 	Oid distPlacementRelationId;
+	Oid distBackgroundJobRelationId;
+	Oid distBackgroundJobPKeyIndexId;
+	Oid distBackgroundJobJobIdSequenceId;
+	Oid distBackgroundTaskRelationId;
+	Oid distBackgroundTaskPKeyIndexId;
+	Oid distBackgroundTaskJobIdTaskIdIndexId;
+	Oid distBackgroundTaskStatusTaskIdIndexId;
+	Oid distBackgroundTaskTaskIdSequenceId;
+	Oid distBackgroundTaskDependRelationId;
+	Oid distBackgroundTaskDependTaskIdIndexId;
+	Oid distBackgroundTaskDependDependsOnIndexId;
+	Oid citusJobStatusScheduledId;
+	Oid citusJobStatusRunningId;
+	Oid citusJobStatusCancellingId;
+	Oid citusJobStatusFinishedId;
+	Oid citusJobStatusCancelledId;
+	Oid citusJobStatusFailedId;
+	Oid citusJobStatusFailingId;
+	Oid citusTaskStatusBlockedId;
+	Oid citusTaskStatusRunnableId;
+	Oid citusTaskStatusRunningId;
+	Oid citusTaskStatusDoneId;
+	Oid citusTaskStatusErrorId;
+	Oid citusTaskStatusUnscheduledId;
+	Oid citusTaskStatusCancelledId;
+	Oid citusTaskStatusCancellingId;
 	Oid distRebalanceStrategyRelationId;
 	Oid distNodeRelationId;
 	Oid distNodeNodeIdIndexId;
@@ -2492,6 +2518,116 @@ DistLocalGroupIdRelationId(void)
 }
 
 
+/* return oid of pg_dist_background_job relation */
+Oid
+DistBackgroundJobRelationId(void)
+{
+	Oid *cachedOid = &MetadataCache.distBackgroundJobRelationId;
+
+	CachedRelationLookup("pg_dist_background_job", cachedOid);
+
+	return *cachedOid;
+}
+
+
+/* return oid of the relation named pg_dist_background_job_pkey */
+Oid
+DistBackgroundJobPKeyIndexId(void)
+{
+	Oid *cachedOid = &MetadataCache.distBackgroundJobPKeyIndexId;
+
+	CachedRelationLookup("pg_dist_background_job_pkey", cachedOid);
+
+	return *cachedOid;
+}
+
+
+/* return oid of the relation named pg_dist_background_job_job_id_seq */
+Oid
+DistBackgroundJobJobIdSequenceId(void)
+{
+	Oid *cachedOid = &MetadataCache.distBackgroundJobJobIdSequenceId;
+
+	CachedRelationLookup("pg_dist_background_job_job_id_seq", cachedOid);
+
+	return *cachedOid;
+}
+
+
+/* return oid of pg_dist_background_task relation */
+Oid
+DistBackgroundTaskRelationId(void)
+{
+	Oid *cachedOid = &MetadataCache.distBackgroundTaskRelationId;
+
+	CachedRelationLookup("pg_dist_background_task", cachedOid);
+
+	return *cachedOid;
+}
+
+
+/* return oid of the relation named pg_dist_background_task_pkey */
+Oid
+DistBackgroundTaskPKeyIndexId(void)
+{
+	Oid *cachedOid = &MetadataCache.distBackgroundTaskPKeyIndexId;
+
+	CachedRelationLookup("pg_dist_background_task_pkey", cachedOid);
+
+	return *cachedOid;
+}
+
+
+/* return oid of the relation named pg_dist_background_task_job_id_task_id */
+Oid
+DistBackgroundTaskJobIdTaskIdIndexId(void)
+{
+	Oid *cachedOid = &MetadataCache.distBackgroundTaskJobIdTaskIdIndexId;
+
+	CachedRelationLookup("pg_dist_background_task_job_id_task_id", cachedOid);
+
+	return *cachedOid;
+}
+
+
+/* return oid of the relation named pg_dist_background_task_status_task_id_index */
+Oid
+DistBackgroundTaskStatusTaskIdIndexId(void)
+{
+	Oid *cachedOid = &MetadataCache.distBackgroundTaskStatusTaskIdIndexId;
+
+	CachedRelationLookup("pg_dist_background_task_status_task_id_index", cachedOid);
+
+	return *cachedOid;
+}
+
+
+/* return oid of the relation named pg_dist_background_task_task_id_seq */
+Oid
+DistBackgroundTaskTaskIdSequenceId(void)
+{
+	Oid *cachedOid = &MetadataCache.distBackgroundTaskTaskIdSequenceId;
+
+	CachedRelationLookup("pg_dist_background_task_task_id_seq", cachedOid);
+
+	return *cachedOid;
+}
+
+
+/* return oid of pg_dist_background_task_depend relation */
+Oid
+DistBackgroundTaskDependRelationId(void)
+{
+	Oid *cachedOid = &MetadataCache.distBackgroundTaskDependRelationId;
+
+	CachedRelationLookup("pg_dist_background_task_depend", cachedOid);
+
+	return *cachedOid;
+}
+
+
+/* return oid of the relation named pg_dist_background_task_depend_task_id */
+Oid
+DistBackgroundTaskDependTaskIdIndexId(void)
+{
+	Oid *cachedOid = &MetadataCache.distBackgroundTaskDependTaskIdIndexId;
+
+	CachedRelationLookup("pg_dist_background_task_depend_task_id", cachedOid);
+
+	return *cachedOid;
+}
+
+
+/* return oid of the relation named pg_dist_background_task_depend_depends_on */
+Oid
+DistBackgroundTaskDependDependsOnIndexId(void)
+{
+	Oid *cachedOid = &MetadataCache.distBackgroundTaskDependDependsOnIndexId;
+
+	CachedRelationLookup("pg_dist_background_task_depend_depends_on", cachedOid);
+
+	return *cachedOid;
+}
+
+
 /* 
return oid of pg_dist_rebalance_strategy relation */
 Oid
 DistRebalanceStrategyRelationId(void)
@@ -3236,6 +3372,201 @@ SecondaryNodeRoleId(void)
 }
 
 
+/* return the cached oid of citus_job_status value 'scheduled', looked up on first use */
+Oid
+CitusJobStatusScheduledId(void)
+{
+	if (MetadataCache.citusJobStatusScheduledId == InvalidOid)
+	{
+		MetadataCache.citusJobStatusScheduledId =
+			LookupStringEnumValueId("citus_job_status", "scheduled");
+	}
+
+	return MetadataCache.citusJobStatusScheduledId;
+}
+
+
+/* return the cached oid of citus_job_status value 'running', looked up on first use */
+Oid
+CitusJobStatusRunningId(void)
+{
+	if (MetadataCache.citusJobStatusRunningId == InvalidOid)
+	{
+		MetadataCache.citusJobStatusRunningId =
+			LookupStringEnumValueId("citus_job_status", "running");
+	}
+
+	return MetadataCache.citusJobStatusRunningId;
+}
+
+
+/* return the cached oid of citus_job_status value 'cancelling', looked up on first use */
+Oid
+CitusJobStatusCancellingId(void)
+{
+	if (MetadataCache.citusJobStatusCancellingId == InvalidOid)
+	{
+		MetadataCache.citusJobStatusCancellingId =
+			LookupStringEnumValueId("citus_job_status", "cancelling");
+	}
+
+	return MetadataCache.citusJobStatusCancellingId;
+}
+
+
+/* return the cached oid of citus_job_status value 'finished', looked up on first use */
+Oid
+CitusJobStatusFinishedId(void)
+{
+	if (MetadataCache.citusJobStatusFinishedId == InvalidOid)
+	{
+		MetadataCache.citusJobStatusFinishedId =
+			LookupStringEnumValueId("citus_job_status", "finished");
+	}
+
+	return MetadataCache.citusJobStatusFinishedId;
+}
+
+
+/* return the cached oid of citus_job_status value 'cancelled', looked up on first use */
+Oid
+CitusJobStatusCancelledId(void)
+{
+	if (MetadataCache.citusJobStatusCancelledId == InvalidOid)
+	{
+		MetadataCache.citusJobStatusCancelledId =
+			LookupStringEnumValueId("citus_job_status", "cancelled");
+	}
+
+	return MetadataCache.citusJobStatusCancelledId;
+}
+
+
+/* return the cached oid of citus_job_status value 'failed', looked up on first use */
+Oid
+CitusJobStatusFailedId(void)
+{
+	if (MetadataCache.citusJobStatusFailedId == InvalidOid)
+	{
+		MetadataCache.citusJobStatusFailedId =
+			LookupStringEnumValueId("citus_job_status", "failed");
+	}
+
+	return MetadataCache.citusJobStatusFailedId;
+}
+
+
+/* return the cached oid of citus_job_status value 'failing', looked up on first use */
+Oid
+CitusJobStatusFailingId(void)
+{
+	if (MetadataCache.citusJobStatusFailingId == InvalidOid)
+	{
+		MetadataCache.citusJobStatusFailingId =
+			LookupStringEnumValueId("citus_job_status", "failing");
+	}
+
+	return MetadataCache.citusJobStatusFailingId;
+}
+
+
+/* return the cached oid of citus_task_status value 'blocked', looked up on first use */
+Oid
+CitusTaskStatusBlockedId(void)
+{
+	if (MetadataCache.citusTaskStatusBlockedId == InvalidOid)
+	{
+		MetadataCache.citusTaskStatusBlockedId =
+			LookupStringEnumValueId("citus_task_status", "blocked");
+	}
+
+	return MetadataCache.citusTaskStatusBlockedId;
+}
+
+
+/* return the cached oid of citus_task_status value 'cancelled', looked up on first use */
+Oid
+CitusTaskStatusCancelledId(void)
+{
+	if (MetadataCache.citusTaskStatusCancelledId == InvalidOid)
+	{
+		MetadataCache.citusTaskStatusCancelledId =
+			LookupStringEnumValueId("citus_task_status", "cancelled");
+	}
+
+	return MetadataCache.citusTaskStatusCancelledId;
+}
+
+
+/* return the cached oid of citus_task_status value 'cancelling', looked up on first use */
+Oid
+CitusTaskStatusCancellingId(void)
+{
+	if (MetadataCache.citusTaskStatusCancellingId == InvalidOid)
+	{
+		MetadataCache.citusTaskStatusCancellingId =
+			LookupStringEnumValueId("citus_task_status", "cancelling");
+	}
+
+	return MetadataCache.citusTaskStatusCancellingId;
+}
+
+
+/* return the cached oid of citus_task_status value 'runnable', looked up on first use */
+Oid
+CitusTaskStatusRunnableId(void)
+{
+	if (MetadataCache.citusTaskStatusRunnableId == InvalidOid)
+	{
+		MetadataCache.citusTaskStatusRunnableId =
+			LookupStringEnumValueId("citus_task_status", "runnable");
+	}
+
+	return MetadataCache.citusTaskStatusRunnableId;
+}
+
+
+/* return the cached oid of citus_task_status value 'running', looked up on first use */
+Oid
+CitusTaskStatusRunningId(void)
+{
+	if (MetadataCache.citusTaskStatusRunningId == InvalidOid)
+	{
+		MetadataCache.citusTaskStatusRunningId =
+			LookupStringEnumValueId("citus_task_status", "running");
+	}
+
+	return MetadataCache.citusTaskStatusRunningId;
+}
+
+
+/* return the cached oid of citus_task_status value 'done', looked up on first use */
+Oid
+CitusTaskStatusDoneId(void)
+{
+	if (MetadataCache.citusTaskStatusDoneId == InvalidOid)
+	{
+		MetadataCache.citusTaskStatusDoneId =
+			LookupStringEnumValueId("citus_task_status", "done");
+	}
+
+	return MetadataCache.citusTaskStatusDoneId;
+}
+
+
+/* return the cached oid of citus_task_status value 'error', looked up on first use */
+Oid
+CitusTaskStatusErrorId(void)
+{
+	if (MetadataCache.citusTaskStatusErrorId == InvalidOid)
+	{
+		MetadataCache.citusTaskStatusErrorId =
+			LookupStringEnumValueId("citus_task_status", "error");
+	}
+
+	return MetadataCache.citusTaskStatusErrorId;
+}
+
+
+/* return the cached oid of citus_task_status value 'unscheduled', looked up on first use */
+Oid
+CitusTaskStatusUnscheduledId(void)
+{
+	if (MetadataCache.citusTaskStatusUnscheduledId == InvalidOid)
+	{
+		MetadataCache.citusTaskStatusUnscheduledId =
+			LookupStringEnumValueId("citus_task_status", "unscheduled");
+	}
+
+	return MetadataCache.citusTaskStatusUnscheduledId;
+}
+
+
 /*
  * citus_dist_partition_cache_invalidate is a trigger function that performs
  * relcache invalidations when the contents of pg_dist_partition are changed
diff --git a/src/backend/distributed/metadata/metadata_utility.c b/src/backend/distributed/metadata/metadata_utility.c
index ee7be267e..8c57357c9 100644
--- a/src/backend/distributed/metadata/metadata_utility.c
+++ b/src/backend/distributed/metadata/metadata_utility.c
@@ -25,11 +25,13 @@
 #include "access/xact.h"
 #include "catalog/dependency.h"
 #include "catalog/indexing.h"
+#include "catalog/pg_authid.h"
 #include "catalog/pg_constraint.h"
 #include "catalog/pg_extension.h"
 #include "catalog/pg_namespace.h"
 #include "catalog/pg_type.h"
 #include "commands/extension.h"
+#include "commands/sequence.h"
 #include "distributed/colocation_utils.h"
 #include "distributed/connection_management.h"
 #include "distributed/citus_nodes.h"
@@ -44,6 +46,9 @@
 #include "distributed/multi_logical_optimizer.h"
 #include "distributed/multi_partitioning_utils.h"
 #include "distributed/multi_physical_planner.h"
+#include "distributed/pg_dist_background_job.h"
+#include "distributed/pg_dist_background_task.h"
+#include "distributed/pg_dist_backrgound_task_depend.h"
 #include "distributed/pg_dist_colocation.h"
 #include "distributed/pg_dist_partition.h"
 #include "distributed/pg_dist_shard.h"
@@ -59,10 +64,12 @@
 #include "nodes/makefuncs.h"
 #include "parser/scansup.h"
 #include "storage/lmgr.h"
+#include "storage/procarray.h"
 #include "utils/acl.h"
 #include "utils/builtins.h"
 #include "utils/datum.h"
 #include "utils/fmgroids.h"
+#include "utils/fmgrprotos.h"
 #include "utils/inval.h"
 #include "utils/lsyscache.h"
 #include "utils/rel.h"
@@ -99,6 +106,21 @@ static void AppendShardSizeQuery(StringInfo selectQuery, ShardInterval *shardInt
 static HeapTuple CreateDiskSpaceTuple(TupleDesc tupleDesc, uint64 availableBytes,
 									  uint64 totalBytes);
 static bool GetLocalDiskSpaceStats(uint64 *availableBytes, uint64 
*totalBytes); +static BackgroundTask * DeformBackgroundTaskHeapTuple(TupleDesc tupleDescriptor, + HeapTuple taskTuple); + +static bool SetFieldValue(int attno, Datum values[], bool isnull[], bool replace[], + Datum newValue); +static bool SetFieldText(int attno, Datum values[], bool isnull[], bool replace[], + const char *newValue); +static bool SetFieldNull(int attno, Datum values[], bool isnull[], bool replace[]); + +#define InitFieldValue(attno, values, isnull, initValue) \ + (void) SetFieldValue((attno), (values), (isnull), NULL, (initValue)) +#define InitFieldText(attno, values, isnull, initValue) \ + (void) SetFieldText((attno), (values), (isnull), NULL, (initValue)) +#define InitFieldNull(attno, values, isnull) \ + (void) SetFieldNull((attno), (values), (isnull), NULL) /* exports for SQL callable functions */ PG_FUNCTION_INFO_V1(citus_local_disk_space_stats); @@ -2357,3 +2379,1748 @@ IsForeignTable(Oid relationId) { return get_rel_relkind(relationId) == RELKIND_FOREIGN_TABLE; } + + +/* + * HasRunnableBackgroundTask checks the catalog for any task that can be run. + * A task can be run when one of the following conditions applies: + * - Task is in Running state. This could happen when a Background Tasks Queue Monitor + * has crashed or was otherwise restarted. To recover from such a failure, tasks in + * Running state are deemed Runnable. + * - Task is in Runnable state with either _no_ value set in not_before, or a value that + * has already passed. If the not_before field is set to a time in the future the + * task is currently not ready to be started. 
+ */
+bool
+HasRunnableBackgroundTask(void)
+{
+	Relation pgDistBackgroundTasks =
+		table_open(DistBackgroundTaskRelationId(), AccessShareLock);
+
+	/* a task in either of these states is a candidate to be run */
+	BackgroundTaskStatus taskStatus[] = {
+		BACKGROUND_TASK_STATUS_RUNNABLE,
+		BACKGROUND_TASK_STATUS_RUNNING
+	};
+
+	/* take one timestamp up front; every not_before is compared against it */
+	const TimestampTz now = GetCurrentTimestamp();
+
+	/* scan the status index once per candidate state, stop at the first hit */
+	bool hasRunnableTask = false;
+	for (int i = 0; !hasRunnableTask && i < lengthof(taskStatus); i++)
+	{
+		const int scanKeyCount = 1;
+		ScanKeyData scanKey[1] = { 0 };
+		const bool indexOK = true;
+
+		/* pg_dist_background_task.status == taskStatus[i] */
+		ScanKeyInit(&scanKey[0], Anum_pg_dist_background_task_status,
+					BTEqualStrategyNumber, F_OIDEQ,
+					ObjectIdGetDatum(BackgroundTaskStatusOid(taskStatus[i])));
+
+		SysScanDesc scanDescriptor =
+			systable_beginscan(pgDistBackgroundTasks,
+							   DistBackgroundTaskStatusTaskIdIndexId(),
+							   indexOK, NULL, scanKeyCount,
+							   scanKey);
+
+		HeapTuple taskTuple = NULL;
+		while (HeapTupleIsValid(taskTuple = systable_getnext(scanDescriptor)))
+		{
+			TupleDesc tupleDescriptor = RelationGetDescr(pgDistBackgroundTasks);
+			BackgroundTask *task = DeformBackgroundTaskHeapTuple(tupleDescriptor,
+																 taskTuple);
+
+			/* a not_before that lies in the future means the task has to wait */
+			if (task->not_before && *(task->not_before) > now)
+			{
+				continue;
+			}
+
+			hasRunnableTask = true;
+			break;
+		}
+
+		systable_endscan(scanDescriptor);
+	}
+
+	/* NoLock: the lock taken by table_open is not released here */
+	table_close(pgDistBackgroundTasks, NoLock);
+
+	return hasRunnableTask;
+}
+
+
+/*
+ * BackgroundJobStatusByOid returns the C enum representation of a BackgroundJobStatus
+ * based on the Oid of the SQL enum value. 
+ */
+BackgroundJobStatus
+BackgroundJobStatusByOid(Oid enumOid)
+{
+	/* compare against every known citus_job_status value; first match wins */
+	if (enumOid == CitusJobStatusScheduledId())
+	{
+		return BACKGROUND_JOB_STATUS_SCHEDULED;
+	}
+
+	if (enumOid == CitusJobStatusRunningId())
+	{
+		return BACKGROUND_JOB_STATUS_RUNNING;
+	}
+
+	if (enumOid == CitusJobStatusFinishedId())
+	{
+		return BACKGROUND_JOB_STATUS_FINISHED;
+	}
+
+	if (enumOid == CitusJobStatusCancelledId())
+	{
+		return BACKGROUND_JOB_STATUS_CANCELLED;
+	}
+
+	if (enumOid == CitusJobStatusFailingId())
+	{
+		return BACKGROUND_JOB_STATUS_FAILING;
+	}
+
+	if (enumOid == CitusJobStatusFailedId())
+	{
+		return BACKGROUND_JOB_STATUS_FAILED;
+	}
+
+	if (enumOid == CitusJobStatusCancellingId())
+	{
+		return BACKGROUND_JOB_STATUS_CANCELLING;
+	}
+
+	/* the oid matched none of the values above */
+	elog(ERROR, "unknown enum value for citus_job_status");
+}
+
+
+/*
+ * BackgroundTaskStatusByOid maps the Oid of a citus_task_status SQL enum value onto the
+ * matching BackgroundTaskStatus C enum value. It errors out when the Oid is unknown.
+ */
+BackgroundTaskStatus
+BackgroundTaskStatusByOid(Oid enumOid)
+{
+	/* compare against every known citus_task_status value; first match wins */
+	if (enumOid == CitusTaskStatusDoneId())
+	{
+		return BACKGROUND_TASK_STATUS_DONE;
+	}
+
+	if (enumOid == CitusTaskStatusRunnableId())
+	{
+		return BACKGROUND_TASK_STATUS_RUNNABLE;
+	}
+
+	if (enumOid == CitusTaskStatusRunningId())
+	{
+		return BACKGROUND_TASK_STATUS_RUNNING;
+	}
+
+	if (enumOid == CitusTaskStatusErrorId())
+	{
+		return BACKGROUND_TASK_STATUS_ERROR;
+	}
+
+	if (enumOid == CitusTaskStatusUnscheduledId())
+	{
+		return BACKGROUND_TASK_STATUS_UNSCHEDULED;
+	}
+
+	if (enumOid == CitusTaskStatusBlockedId())
+	{
+		return BACKGROUND_TASK_STATUS_BLOCKED;
+	}
+
+	if (enumOid == CitusTaskStatusCancelledId())
+	{
+		return BACKGROUND_TASK_STATUS_CANCELLED;
+	}
+
+	if (enumOid == CitusTaskStatusCancellingId())
+	{
+		return BACKGROUND_TASK_STATUS_CANCELLING;
+	}
+
+	/* the oid matched none of the values above */
+	ereport(ERROR, (errmsg("unknown enum value for citus_task_status")));
+}
+
+
+/*
+ * IsBackgroundJobStatusTerminal is a predicate returning if the BackgroundJobStatus
+ * 
passed is a terminal state of the Background Job state machine.
+ *
+ * For a Job to be in its terminal state, all tasks from that job should also be in their
+ * terminal state.
+ */
+bool
+IsBackgroundJobStatusTerminal(BackgroundJobStatus status)
+{
+	switch (status)
+	{
+		/* non-terminal states */
+		case BACKGROUND_JOB_STATUS_CANCELLING:
+		case BACKGROUND_JOB_STATUS_FAILING:
+		case BACKGROUND_JOB_STATUS_RUNNING:
+		case BACKGROUND_JOB_STATUS_SCHEDULED:
+			return false;
+
+		/* terminal states */
+		case BACKGROUND_JOB_STATUS_CANCELLED:
+		case BACKGROUND_JOB_STATUS_FAILED:
+		case BACKGROUND_JOB_STATUS_FINISHED:
+			return true;
+
+		/* no default to make sure we explicitly add every state here */
+	}
+
+	/* only reached for a value that is not a member of the enum */
+	elog(ERROR, "unknown BackgroundJobStatus");
+}
+
+
+/*
+ * IsBackgroundTaskStatusTerminal tells whether the given BackgroundTaskStatus is a
+ * terminal state of the Background Task state machine.
+ */
+bool
+IsBackgroundTaskStatusTerminal(BackgroundTaskStatus status)
+{
+	switch (status)
+	{
+		/* non-terminal states */
+		case BACKGROUND_TASK_STATUS_BLOCKED:
+		case BACKGROUND_TASK_STATUS_CANCELLING:
+		case BACKGROUND_TASK_STATUS_RUNNABLE:
+		case BACKGROUND_TASK_STATUS_RUNNING:
+			return false;
+
+		/* terminal states */
+		case BACKGROUND_TASK_STATUS_CANCELLED:
+		case BACKGROUND_TASK_STATUS_DONE:
+		case BACKGROUND_TASK_STATUS_ERROR:
+		case BACKGROUND_TASK_STATUS_UNSCHEDULED:
+			return true;
+
+		/* no default to make sure we explicitly add every state here */
+	}
+
+	/* only reached for a value that is not a member of the enum */
+	elog(ERROR, "unknown BackgroundTaskStatus");
+}
+
+
+/*
+ * BackgroundJobStatusOid returns the Oid corresponding to SQL enum value corresponding to
+ * the BackgroundJobStatus. 
+ */
+Oid
+BackgroundJobStatusOid(BackgroundJobStatus status)
+{
+	/* no default, so a newly added enum member has to be handled explicitly */
+	switch (status)
+	{
+		case BACKGROUND_JOB_STATUS_SCHEDULED:
+			return CitusJobStatusScheduledId();
+
+		case BACKGROUND_JOB_STATUS_RUNNING:
+			return CitusJobStatusRunningId();
+
+		case BACKGROUND_JOB_STATUS_CANCELLING:
+			return CitusJobStatusCancellingId();
+
+		case BACKGROUND_JOB_STATUS_FINISHED:
+			return CitusJobStatusFinishedId();
+
+		case BACKGROUND_JOB_STATUS_CANCELLED:
+			return CitusJobStatusCancelledId();
+
+		case BACKGROUND_JOB_STATUS_FAILING:
+			return CitusJobStatusFailingId();
+
+		case BACKGROUND_JOB_STATUS_FAILED:
+			return CitusJobStatusFailedId();
+	}
+
+	/* only reached for a value that is not a member of the enum */
+	elog(ERROR, "unknown BackgroundJobStatus");
+}
+
+
+/*
+ * BackgroundTaskStatusOid returns the Oid of the citus_task_status SQL enum value that
+ * corresponds to the given BackgroundTaskStatus.
+ */
+Oid
+BackgroundTaskStatusOid(BackgroundTaskStatus status)
+{
+	/* no default, so a newly added enum member has to be handled explicitly */
+	switch (status)
+	{
+		case BACKGROUND_TASK_STATUS_BLOCKED:
+			return CitusTaskStatusBlockedId();
+
+		case BACKGROUND_TASK_STATUS_RUNNABLE:
+			return CitusTaskStatusRunnableId();
+
+		case BACKGROUND_TASK_STATUS_RUNNING:
+			return CitusTaskStatusRunningId();
+
+		case BACKGROUND_TASK_STATUS_DONE:
+			return CitusTaskStatusDoneId();
+
+		case BACKGROUND_TASK_STATUS_ERROR:
+			return CitusTaskStatusErrorId();
+
+		case BACKGROUND_TASK_STATUS_UNSCHEDULED:
+			return CitusTaskStatusUnscheduledId();
+
+		case BACKGROUND_TASK_STATUS_CANCELLED:
+			return CitusTaskStatusCancelledId();
+
+		case BACKGROUND_TASK_STATUS_CANCELLING:
+			return CitusTaskStatusCancellingId();
+	}
+
+	/* only reached for a value that is not a member of the enum */
+	elog(ERROR, "unknown BackgroundTaskStatus");
+	return InvalidOid;
+}
+
+
+/*
+ * GetNextBackgroundJobsJobId reads and increments the SQL sequence associated with the
+ * background job's job_id. After incrementing the counter it returns the counter back to
+ * the caller. 
+ *
+ * The return value is typically used to insert new jobs into the catalog.
+ */
+static int64
+GetNextBackgroundJobsJobId(void)
+{
+	/* nextval_internal already hands back an int64, it must not be treated as a Datum */
+	return nextval_internal(DistBackgroundJobJobIdSequenceId(), false);
+}
+
+
+/*
+ * GetNextBackgroundTaskTaskId reads and increments the SQL sequence associated with the
+ * background job tasks' task_id. After incrementing the counter it returns the counter
+ * back to the caller.
+ *
+ * The return value is typically used to insert new tasks into the catalog.
+ */
+static int64
+GetNextBackgroundTaskTaskId(void)
+{
+	/* nextval_internal already hands back an int64, it must not be treated as a Datum */
+	return nextval_internal(DistBackgroundTaskTaskIdSequenceId(), false);
+}
+
+
+/*
+ * CreateBackgroundJob is a helper function to insert a new Background Job into Citus'
+ * catalog. After inserting the new job's metadata into the catalog it returns the job_id
+ * assigned to the new job. This is typically used to associate new tasks with the newly
+ * created job.
+ *
+ * The job is inserted in the 'scheduled' state. jobType and description may be NULL, in
+ * which case the corresponding column is left NULL.
+ */
+int64
+CreateBackgroundJob(const char *jobType, const char *description)
+{
+	Relation pgDistBackgroundJobs =
+		table_open(DistBackgroundJobRelationId(), RowExclusiveLock);
+
+	/* insert new job, every column starts out as NULL */
+	Datum values[Natts_pg_dist_background_job] = { 0 };
+	bool isnull[Natts_pg_dist_background_job] = { 0 };
+	memset(isnull, true, sizeof(isnull));
+
+	/*
+	 * values[] only holds a pointer to the name, so the buffer has to stay alive until
+	 * heap_form_tuple has copied it. That is why it lives at function scope and not
+	 * inside the if-block below.
+	 */
+	NameData jobTypeName = { 0 };
+
+	int64 jobId = GetNextBackgroundJobsJobId();
+
+	InitFieldValue(Anum_pg_dist_background_job_job_id, values, isnull,
+				   Int64GetDatum(jobId));
+
+	InitFieldValue(Anum_pg_dist_background_job_state, values, isnull,
+				   ObjectIdGetDatum(CitusJobStatusScheduledId()));
+
+	if (jobType)
+	{
+		namestrcpy(&jobTypeName, jobType);
+		InitFieldValue(Anum_pg_dist_background_job_job_type, values, isnull,
+					   NameGetDatum(&jobTypeName));
+	}
+
+	if (description)
+	{
+		InitFieldText(Anum_pg_dist_background_job_description, values, isnull,
+					  description);
+	}
+
+	HeapTuple newTuple = heap_form_tuple(RelationGetDescr(pgDistBackgroundJobs),
+										 values, isnull);
+	CatalogTupleInsert(pgDistBackgroundJobs, newTuple);
+
+	CommandCounterIncrement();
+
+	/* NoLock: the lock taken by table_open is not released here */
+	table_close(pgDistBackgroundJobs, NoLock);
+
+	return jobId;
+}
+
+
+/*
+ * ScheduleBackgroundTask creates a new background task to be executed in the background.
+ *
+ * The new task is associated with an existing job based on its id.
+ *
+ * Optionally the new task can depend on separate tasks associated with the same job. When
+ * a new task is created with dependencies on previous tasks we assume this task is
+ * blocked on its depending tasks.
+ *
+ * Errors out when the job, or any task listed in dependingTaskIds, does not exist for
+ * jobId. The returned BackgroundTask is palloc'd and carries the jobid, taskid, owner,
+ * command and the status that was written to the catalog.
+ */
+BackgroundTask *
+ScheduleBackgroundTask(int64 jobId, Oid owner, char *command, int dependingTaskCount,
+					   int64 dependingTaskIds[])
+{
+	BackgroundTask *task = NULL;
+
+	/* a task with at least one dependency starts out blocked, otherwise runnable */
+	const bool hasDependencies = dependingTaskCount > 0;
+	const BackgroundTaskStatus initialStatus =
+		hasDependencies ? BACKGROUND_TASK_STATUS_BLOCKED :
+		BACKGROUND_TASK_STATUS_RUNNABLE;
+
+	Relation pgDistBackgroundJob =
+		table_open(DistBackgroundJobRelationId(), ExclusiveLock);
+	Relation pgDistBackgroundTask =
+		table_open(DistBackgroundTaskRelationId(), ExclusiveLock);
+	Relation pgDistbackgroundTasksDepend = NULL;
+	if (hasDependencies)
+	{
+		pgDistbackgroundTasksDepend =
+			table_open(DistBackgroundTaskDependRelationId(), ExclusiveLock);
+	}
+
+	/* 1. verify job exist */
+	{
+		ScanKeyData scanKey[1] = { 0 };
+		bool indexOK = true;
+
+		/* pg_dist_background_job.job_id == $jobId */
+		ScanKeyInit(&scanKey[0], Anum_pg_dist_background_job_job_id,
+					BTEqualStrategyNumber, F_INT8EQ, Int64GetDatum(jobId));
+
+		SysScanDesc scanDescriptor =
+			systable_beginscan(pgDistBackgroundJob,
+							   DistBackgroundJobPKeyIndexId(),
+							   indexOK, NULL, lengthof(scanKey), scanKey);
+
+		HeapTuple jobTuple = systable_getnext(scanDescriptor);
+		if (!HeapTupleIsValid(jobTuple))
+		{
+			ereport(ERROR, (errmsg("job for newly created task does not exist.")));
+		}
+
+		systable_endscan(scanDescriptor);
+	}
+
+	/* 2. insert new task */
+	{
+		Datum values[Natts_pg_dist_background_task] = { 0 };
+		bool nulls[Natts_pg_dist_background_task] = { 0 };
+
+		/* every column not explicitly set below is inserted as NULL */
+		memset(nulls, true, sizeof(nulls));
+
+		int64 taskId = GetNextBackgroundTaskTaskId();
+
+		values[Anum_pg_dist_background_task_job_id - 1] = Int64GetDatum(jobId);
+		nulls[Anum_pg_dist_background_task_job_id - 1] = false;
+
+		values[Anum_pg_dist_background_task_task_id - 1] = Int64GetDatum(taskId);
+		nulls[Anum_pg_dist_background_task_task_id - 1] = false;
+
+		values[Anum_pg_dist_background_task_owner - 1] = ObjectIdGetDatum(owner);
+		nulls[Anum_pg_dist_background_task_owner - 1] = false;
+
+		values[Anum_pg_dist_background_task_status - 1] =
+			ObjectIdGetDatum(BackgroundTaskStatusOid(initialStatus));
+		nulls[Anum_pg_dist_background_task_status - 1] = false;
+
+		values[Anum_pg_dist_background_task_command - 1] = CStringGetTextDatum(command);
+		nulls[Anum_pg_dist_background_task_command - 1] = false;
+
+		values[Anum_pg_dist_background_task_message - 1] = CStringGetTextDatum("");
+		nulls[Anum_pg_dist_background_task_message - 1] = false;
+
+		HeapTuple newTuple = heap_form_tuple(RelationGetDescr(pgDistBackgroundTask),
+											 values, nulls);
+		CatalogTupleInsert(pgDistBackgroundTask, newTuple);
+
+		/* hand the caller a struct that mirrors what was just written */
+		task = palloc0(sizeof(BackgroundTask));
+		task->jobid = jobId;
+		task->taskid = taskId;
+		task->owner = owner;
+		task->status = initialStatus;
+		task->command = pstrdup(command);
+	}
+
+	/* 3. insert dependencies into catalog */
+	{
+		for (int i = 0; i < dependingTaskCount; i++)
+		{
+			/* 3.1 after verifying the task exists for this job */
+			{
+				ScanKeyData scanKey[2] = { 0 };
+				bool indexOK = true;
+
+				/* pg_dist_background_task.job_id == $jobId */
+				ScanKeyInit(&scanKey[0], Anum_pg_dist_background_task_job_id,
+							BTEqualStrategyNumber, F_INT8EQ, Int64GetDatum(jobId));
+
+				/* pg_dist_background_task.task_id == $taskId */
+				ScanKeyInit(&scanKey[1], Anum_pg_dist_background_task_task_id,
+							BTEqualStrategyNumber, F_INT8EQ,
+							Int64GetDatum(dependingTaskIds[i]));
+
+				SysScanDesc scanDescriptor =
+					systable_beginscan(pgDistBackgroundTask,
+									   DistBackgroundTaskJobIdTaskIdIndexId(),
+									   indexOK, NULL, lengthof(scanKey), scanKey);
+
+				HeapTuple taskTuple = systable_getnext(scanDescriptor);
+				if (!HeapTupleIsValid(taskTuple))
+				{
+					ereport(ERROR, (errmsg("depending task for newly scheduled task does "
+										   "not exist")));
+				}
+
+				systable_endscan(scanDescriptor);
+			}
+
+			Assert(pgDistbackgroundTasksDepend != NULL);
+
+			Datum values[Natts_pg_dist_background_task_depend] = { 0 };
+			bool nulls[Natts_pg_dist_background_task_depend] = { 0 };
+			memset(nulls, true, sizeof(nulls));
+
+			values[Anum_pg_dist_background_task_depend_job_id - 1] =
+				Int64GetDatum(jobId);
+			nulls[Anum_pg_dist_background_task_depend_job_id - 1] = false;
+
+			values[Anum_pg_dist_background_task_depend_task_id - 1] =
+				Int64GetDatum(task->taskid);
+			nulls[Anum_pg_dist_background_task_depend_task_id - 1] = false;
+
+			values[Anum_pg_dist_background_task_depend_depends_on - 1] =
+				Int64GetDatum(dependingTaskIds[i]);
+			nulls[Anum_pg_dist_background_task_depend_depends_on - 1] = false;
+
+			HeapTuple newTuple = heap_form_tuple(
+				RelationGetDescr(pgDistbackgroundTasksDepend), values, nulls);
+			CatalogTupleInsert(pgDistbackgroundTasksDepend, newTuple);
+		}
+	}
+
+	if (pgDistbackgroundTasksDepend)
+	{
+		table_close(pgDistbackgroundTasksDepend, NoLock);
+	}
+	table_close(pgDistBackgroundTask, NoLock);
+	table_close(pgDistBackgroundJob, NoLock);
+
+	CommandCounterIncrement();
+
+	return task;
+}
+
+
+/*
+ * ResetRunningBackgroundTasks finds all tasks currently in Running state and resets their
+ * state back to runnable.
+ *
+ * While marking running tasks as runnable we check if the task might still be locked and
+ * the pid is managed by our current postmaster. If both are the case we terminate the
+ * backend. This will make sure that if a task was still running after a monitor crash or
+ * restart we stop the executor before we start a new one.
+ *
+ * Any pid associated with the running tasks will be cleared back to the NULL value.
+ */
+void
+ResetRunningBackgroundTasks(void)
+{
+	const int scanKeyCount = 1;
+	ScanKeyData scanKey[1];
+	const bool indexOK = true;
+
+	Relation pgDistBackgroundTasks =
+		table_open(DistBackgroundTaskRelationId(), ExclusiveLock);
+
+	/* pg_dist_background_task.status == 'running' */
+	ScanKeyInit(&scanKey[0], Anum_pg_dist_background_task_status,
+				BTEqualStrategyNumber, F_OIDEQ,
+				ObjectIdGetDatum(CitusTaskStatusRunningId()));
+
+	SysScanDesc scanDescriptor =
+		systable_beginscan(pgDistBackgroundTasks,
+						   DistBackgroundTaskStatusTaskIdIndexId(),
+						   indexOK, NULL, scanKeyCount,
+						   scanKey);
+
+	HeapTuple taskTuple = NULL;
+	List *taskIdsToWait = NIL;
+	while (HeapTupleIsValid(taskTuple = systable_getnext(scanDescriptor)))
+	{
+		Datum values[Natts_pg_dist_background_task] = { 0 };
+		bool isnull[Natts_pg_dist_background_task] = { 0 };
+		bool replace[Natts_pg_dist_background_task] = { 0 };
+
+		TupleDesc tupleDescriptor = RelationGetDescr(pgDistBackgroundTasks);
+		heap_deform_tuple(taskTuple, tupleDescriptor, values, isnull);
+
+		/* status: running -> runnable */
+		values[Anum_pg_dist_background_task_status - 1] =
+			ObjectIdGetDatum(CitusTaskStatusRunnableId());
+		isnull[Anum_pg_dist_background_task_status - 1] = false;
+		replace[Anum_pg_dist_background_task_status - 1] = true;
+
+		/* if there is a pid we need to signal the backend to stop */
+		if (!isnull[Anum_pg_dist_background_task_pid - 1])
+		{
+			/*
+			 * Before signalling the pid we check if the task lock is held, otherwise we
+			 * might cancel an arbitrary postgres backend
+			 */
+
+			int64 taskId =
+				DatumGetInt64(values[Anum_pg_dist_background_task_task_id - 1]);
+
+			/* No need to release lock, will get unlocked once our changes commit */
+			LOCKTAG locktag = { 0 };
+			SET_LOCKTAG_BACKGROUND_TASK(locktag, taskId);
+			const bool sessionLock = false;
+			const bool dontWait = true;
+			LockAcquireResult locked = LockAcquire(&locktag, AccessExclusiveLock,
+												   sessionLock, dontWait);
+			if (locked == LOCKACQUIRE_NOT_AVAIL)
+			{
+				/*
+				 * There is still an executor holding the lock, needs a SIGTERM.
+				 */
+				Datum pidDatum = values[Anum_pg_dist_background_task_pid - 1];
+				const Datum timeoutDatum = Int64GetDatum(0);
+				Datum signalSuccessDatum = DirectFunctionCall2(pg_terminate_backend,
+															   pidDatum, timeoutDatum);
+				bool signalSuccess = DatumGetBool(signalSuccessDatum);
+				if (!signalSuccess)
+				{
+					/*
+					 * We run this backend as superuser, any failure will probably cause
+					 * long delays waiting on the task lock before we can commit.
+					 */
+					ereport(WARNING,
+							(errmsg("could not send signal to process %d: %m",
+									DatumGetInt32(pidDatum)),
+							 errdetail("failing to signal an old executor could cause "
+									   "delays starting the background task monitor")));
+				}
+
+				/*
+				 * Since we didn't already acquire the lock here we need to wait on this
+				 * lock before committing the change to the catalog. However, we first
+				 * want to signal all backends before waiting on the lock, hence we keep a
+				 * list for later
+				 */
+				int64 *taskIdTarget = palloc0(sizeof(int64));
+				*taskIdTarget = taskId;
+				taskIdsToWait = lappend(taskIdsToWait, taskIdTarget);
+			}
+		}
+
+		/* pid: cleared back to NULL */
+		values[Anum_pg_dist_background_task_pid - 1] = InvalidOid;
+		isnull[Anum_pg_dist_background_task_pid - 1] = true;
+		replace[Anum_pg_dist_background_task_pid - 1] = true;
+
+		taskTuple = heap_modify_tuple(taskTuple, tupleDescriptor, values, isnull,
+									  replace);
+
+		CatalogTupleUpdate(pgDistBackgroundTasks, &taskTuple->t_self, taskTuple);
+	}
+
+	if (list_length(taskIdsToWait) > 0)
+	{
+		ereport(LOG, (errmsg("waiting till all tasks release their lock before "
+							 "continuing with the background task monitor")));
+
+		/* there are tasks that need to release their lock before we can continue */
+		int64 *taskId = NULL;
+		foreach_ptr(taskId, taskIdsToWait)
+		{
+			LOCKTAG locktag = { 0 };
+			SET_LOCKTAG_BACKGROUND_TASK(locktag, *taskId);
+			const bool sessionLock = false;
+			const bool dontWait = false;
+			(void) LockAcquire(&locktag, AccessExclusiveLock, sessionLock, dontWait);
+		}
+	}
+
+	CommandCounterIncrement();
+
+	systable_endscan(scanDescriptor);
+
+	table_close(pgDistBackgroundTasks, NoLock);
+}
+
+
+/*
+ * DeformBackgroundJobHeapTuple parses a HeapTuple from pg_dist_background_job into its
+ * in-memory representation. This can be used while scanning a heap to quickly get access
+ * to all fields of a Job. 
+ */
+static BackgroundJob *
+DeformBackgroundJobHeapTuple(TupleDesc tupleDescriptor, HeapTuple jobTuple)
+{
+	Datum fieldValues[Natts_pg_dist_background_job] = { 0 };
+	bool fieldIsNull[Natts_pg_dist_background_job] = { 0 };
+	heap_deform_tuple(jobTuple, tupleDescriptor, fieldValues, fieldIsNull);
+
+	BackgroundJob *job = palloc0(sizeof(BackgroundJob));
+
+	/* job_id and state are read without a NULL check */
+	Datum jobIdDatum = fieldValues[Anum_pg_dist_background_job_job_id - 1];
+	job->jobid = DatumGetInt64(jobIdDatum);
+
+	Oid stateOid = DatumGetObjectId(fieldValues[Anum_pg_dist_background_job_state - 1]);
+	job->state = BackgroundJobStatusByOid(stateOid);
+
+	/* the remaining fields stay zeroed (from palloc0) when the column is NULL */
+	if (!fieldIsNull[Anum_pg_dist_background_job_job_type - 1])
+	{
+		Datum jobTypeDatum = fieldValues[Anum_pg_dist_background_job_job_type - 1];
+		Name storedJobType = DatumGetName(jobTypeDatum);
+		job->jobType = pstrdup(NameStr(*storedJobType));
+	}
+
+	if (!fieldIsNull[Anum_pg_dist_background_job_description - 1])
+	{
+		Datum descriptionDatum =
+			fieldValues[Anum_pg_dist_background_job_description - 1];
+		job->description = text_to_cstring(DatumGetTextP(descriptionDatum));
+	}
+
+	if (!fieldIsNull[Anum_pg_dist_background_job_started_at - 1])
+	{
+		Datum startedAtDatum = fieldValues[Anum_pg_dist_background_job_started_at - 1];
+		TimestampTz jobStartedAt = DatumGetTimestampTz(startedAtDatum);
+		SET_NULLABLE_FIELD(job, started_at, jobStartedAt);
+	}
+
+	if (!fieldIsNull[Anum_pg_dist_background_job_finished_at - 1])
+	{
+		Datum finishedAtDatum = fieldValues[Anum_pg_dist_background_job_finished_at - 1];
+		TimestampTz jobFinishedAt = DatumGetTimestampTz(finishedAtDatum);
+		SET_NULLABLE_FIELD(job, finished_at, jobFinishedAt);
+	}
+
+	return job;
+}
+
+
+/*
+ * DeformBackgroundTaskHeapTuple parses a HeapTuple from pg_dist_background_task into its
+ * inmemory representation. This can be used while scanning a heap to quickly get access
+ * to all fields of a Task. 
+ */ +static BackgroundTask * +DeformBackgroundTaskHeapTuple(TupleDesc tupleDescriptor, HeapTuple taskTuple) +{ + Datum values[Natts_pg_dist_background_task] = { 0 }; + bool nulls[Natts_pg_dist_background_task] = { 0 }; + heap_deform_tuple(taskTuple, tupleDescriptor, values, nulls); + + BackgroundTask *task = palloc0(sizeof(BackgroundTask)); + task->jobid = DatumGetInt64(values[Anum_pg_dist_background_task_job_id - 1]); + task->taskid = DatumGetInt64(values[Anum_pg_dist_background_task_task_id - 1]); + task->owner = DatumGetObjectId(values[Anum_pg_dist_background_task_owner - 1]); + if (!nulls[Anum_pg_dist_background_task_pid - 1]) + { + int32 pid = DatumGetInt32(values[Anum_pg_dist_background_task_pid - 1]); + SET_NULLABLE_FIELD(task, pid, pid); + } + task->status = BackgroundTaskStatusByOid( + DatumGetObjectId(values[Anum_pg_dist_background_task_status - 1])); + + task->command = text_to_cstring( + DatumGetTextP(values[Anum_pg_dist_background_task_command - 1])); + + if (!nulls[Anum_pg_dist_background_task_retry_count - 1]) + { + int32 retryCount = + DatumGetInt32(values[Anum_pg_dist_background_task_retry_count - 1]); + SET_NULLABLE_FIELD(task, retry_count, retryCount); + } + + if (!nulls[Anum_pg_dist_background_task_not_before - 1]) + { + TimestampTz notBefore = + DatumGetTimestampTz(values[Anum_pg_dist_background_task_not_before - 1]); + SET_NULLABLE_FIELD(task, not_before, notBefore); + } + + if (!nulls[Anum_pg_dist_background_task_message - 1]) + { + task->message = + TextDatumGetCString(values[Anum_pg_dist_background_task_message - 1]); + } + + return task; +} + + +/* + * BackgroundTaskHasUmnetDependencies checks if a task from the given job has any unmet + * dependencies. An unmet dependency is a Task that the task in question depends on and + * has not reached its Done state. 
+ */ +static bool +BackgroundTaskHasUmnetDependencies(int64 jobId, int64 taskId) +{ + bool hasUnmetDependency = false; + + Relation pgDistBackgroundTasksDepend = + table_open(DistBackgroundTaskDependRelationId(), AccessShareLock); + + ScanKeyData scanKey[2] = { 0 }; + bool indexOK = true; + + /* pg_catalog.pg_dist_background_task_depend.job_id = jobId */ + ScanKeyInit(&scanKey[0], Anum_pg_dist_background_task_depend_job_id, + BTEqualStrategyNumber, F_INT8EQ, jobId); + + /* pg_catalog.pg_dist_background_task_depend.task_id = $taskId */ + ScanKeyInit(&scanKey[1], Anum_pg_dist_background_task_depend_task_id, + BTEqualStrategyNumber, F_INT8EQ, taskId); + + SysScanDesc scanDescriptor = + systable_beginscan(pgDistBackgroundTasksDepend, + DistBackgroundTaskDependTaskIdIndexId(), + indexOK, NULL, lengthof(scanKey), + scanKey); + + HeapTuple dependTuple = NULL; + while (HeapTupleIsValid(dependTuple = systable_getnext(scanDescriptor))) + { + Form_pg_dist_background_task_depend depends = + (Form_pg_dist_background_task_depend) GETSTRUCT(dependTuple); + + BackgroundTask *dependingJob = GetBackgroundTaskByTaskId(depends->depends_on); + + /* + * Only when the status of all depending jobs is done we clear this job and say + * that is has no unmet dependencies. + */ + if (dependingJob->status == BACKGROUND_TASK_STATUS_DONE) + { + continue; + } + + hasUnmetDependency = true; + break; + } + + systable_endscan(scanDescriptor); + table_close(pgDistBackgroundTasksDepend, AccessShareLock); + + return hasUnmetDependency; +} + + +/* + * BackgroundTaskReadyToRun checks if a task is ready to run. This consists of two checks + * - the task has no unmet dependencies + * - the task either has no not_before value set, or the not_before time has passed. + * + * Due to the costs of checking we check them in reverse order, but conceptually they + * should be thought of in the above order. 
+ */ +static bool +BackgroundTaskReadyToRun(BackgroundTask *task) +{ + if (task->not_before) + { + if (*(task->not_before) > GetCurrentTimestamp()) + { + /* task should not yet be run */ + return false; + } + } + + if (BackgroundTaskHasUmnetDependencies(task->jobid, task->taskid)) + { + return false; + } + + return true; +} + + +/* + * GetRunnableBackgroundTask returns the first candidate for a task to be run. When a task + * is returned it has been checked for all the preconditions to hold. + * + * That means, if there is no task returned the background worker should close and let the + * maintenance daemon start a new background tasks queue monitor once task become + * available. + */ +BackgroundTask * +GetRunnableBackgroundTask(void) +{ + Relation pgDistBackgroundTasks = + table_open(DistBackgroundTaskRelationId(), ExclusiveLock); + + BackgroundTaskStatus taskStatus[] = { + BACKGROUND_TASK_STATUS_RUNNABLE + }; + + BackgroundTask *task = NULL; + for (int i = 0; !task && i < sizeof(taskStatus) / sizeof(taskStatus[0]); i++) + { + const int scanKeyCount = 1; + ScanKeyData scanKey[1] = { 0 }; + const bool indexOK = true; + + /* pg_dist_background_task.status == taskStatus[i] */ + ScanKeyInit(&scanKey[0], Anum_pg_dist_background_task_status, + BTEqualStrategyNumber, F_OIDEQ, ObjectIdGetDatum( + BackgroundTaskStatusOid(taskStatus[i]))); + + SysScanDesc scanDescriptor = + systable_beginscan(pgDistBackgroundTasks, + DistBackgroundTaskStatusTaskIdIndexId(), + indexOK, NULL, scanKeyCount, + scanKey); + + HeapTuple taskTuple = NULL; + TupleDesc tupleDescriptor = RelationGetDescr(pgDistBackgroundTasks); + while (HeapTupleIsValid(taskTuple = systable_getnext(scanDescriptor))) + { + task = DeformBackgroundTaskHeapTuple(tupleDescriptor, taskTuple); + if (BackgroundTaskReadyToRun(task)) + { + /* found task, close table and return */ + break; + } + task = NULL; + } + + systable_endscan(scanDescriptor); + } + + table_close(pgDistBackgroundTasks, NoLock); + + return task; +} + + 
+/* + * GetBackgroundJobByJobId loads a BackgroundJob from the catalog into memory. Return's a + * null pointer if no job exist with the given JobId. + */ +BackgroundJob * +GetBackgroundJobByJobId(int64 jobId) +{ + ScanKeyData scanKey[1] = { 0 }; + bool indexOK = true; + + Relation pgDistBackgroundJobs = + table_open(DistBackgroundJobRelationId(), AccessShareLock); + + /* pg_dist_background_task.job_id == $jobId */ + ScanKeyInit(&scanKey[0], Anum_pg_dist_background_job_job_id, + BTEqualStrategyNumber, F_INT8EQ, Int64GetDatum(jobId)); + + SysScanDesc scanDescriptor = + systable_beginscan(pgDistBackgroundJobs, DistBackgroundJobPKeyIndexId(), + indexOK, NULL, lengthof(scanKey), scanKey); + + HeapTuple taskTuple = systable_getnext(scanDescriptor); + BackgroundJob *job = NULL; + if (HeapTupleIsValid(taskTuple)) + { + TupleDesc tupleDescriptor = RelationGetDescr(pgDistBackgroundJobs); + job = DeformBackgroundJobHeapTuple(tupleDescriptor, taskTuple); + } + + systable_endscan(scanDescriptor); + table_close(pgDistBackgroundJobs, AccessShareLock); + + return job; +} + + +/* + * GetBackgroundTaskByTaskId loads a BackgroundTask from the catalog into memory. Return's + * a null pointer if no job exist with the given JobId and TaskId. 
+ */ +BackgroundTask * +GetBackgroundTaskByTaskId(int64 taskId) +{ + ScanKeyData scanKey[1] = { 0 }; + bool indexOK = true; + + Relation pgDistBackgroundTasks = + table_open(DistBackgroundTaskRelationId(), AccessShareLock); + + /* pg_dist_background_task.task_id == $taskId */ + ScanKeyInit(&scanKey[0], Anum_pg_dist_background_task_task_id, + BTEqualStrategyNumber, F_INT8EQ, Int64GetDatum(taskId)); + + SysScanDesc scanDescriptor = + systable_beginscan(pgDistBackgroundTasks, + DistBackgroundTaskPKeyIndexId(), + indexOK, NULL, lengthof(scanKey), scanKey); + + HeapTuple taskTuple = systable_getnext(scanDescriptor); + BackgroundTask *task = NULL; + if (HeapTupleIsValid(taskTuple)) + { + TupleDesc tupleDescriptor = RelationGetDescr(pgDistBackgroundTasks); + task = DeformBackgroundTaskHeapTuple(tupleDescriptor, taskTuple); + } + + systable_endscan(scanDescriptor); + table_close(pgDistBackgroundTasks, AccessShareLock); + + return task; +} + + +typedef struct JobTaskStatusCounts +{ + int blocked; + int runnable; + int running; + int done; + int error; + int unscheduled; + int cancelled; + int cancelling; +} JobTaskStatusCounts; + + +/* + * JobTasksStatusCount scans all tasks associated with the provided job and count's the + * number of tasks that are tracked in each state. Effectively grouping and counting the + * tasks by their state. 
+ */ +static JobTaskStatusCounts +JobTasksStatusCount(int64 jobId) +{ + Relation pgDistBackgroundTasks = + table_open(DistBackgroundTaskRelationId(), RowExclusiveLock); + TupleDesc tupleDescriptor = RelationGetDescr(pgDistBackgroundTasks); + + ScanKeyData scanKey[1] = { 0 }; + const bool indexOK = true; + + /* WHERE job_id = $task->jobId */ + ScanKeyInit(&scanKey[0], Anum_pg_dist_background_task_job_id, + BTEqualStrategyNumber, F_INT8EQ, Int64GetDatum(jobId)); + + SysScanDesc scanDescriptor = + systable_beginscan(pgDistBackgroundTasks, + DistBackgroundTaskJobIdTaskIdIndexId(), + indexOK, NULL, lengthof(scanKey), scanKey); + + JobTaskStatusCounts counts = { 0 }; + HeapTuple heapTuple = NULL; + while (HeapTupleIsValid(heapTuple = systable_getnext(scanDescriptor))) + { + Datum values[Natts_pg_dist_background_task] = { 0 }; + bool isnull[Natts_pg_dist_background_task] = { 0 }; + + heap_deform_tuple(heapTuple, tupleDescriptor, values, isnull); + + Oid statusOid = DatumGetObjectId(values[Anum_pg_dist_background_task_status - + 1]); + BackgroundTaskStatus status = BackgroundTaskStatusByOid(statusOid); + + switch (status) + { + case BACKGROUND_TASK_STATUS_BLOCKED: + { + counts.blocked++; + break; + } + + case BACKGROUND_TASK_STATUS_RUNNABLE: + { + counts.runnable++; + break; + } + + case BACKGROUND_TASK_STATUS_RUNNING: + { + counts.running++; + break; + } + + case BACKGROUND_TASK_STATUS_DONE: + { + counts.done++; + break; + } + + case BACKGROUND_TASK_STATUS_ERROR: + { + counts.error++; + break; + } + + case BACKGROUND_TASK_STATUS_UNSCHEDULED: + { + counts.unscheduled++; + break; + } + + case BACKGROUND_TASK_STATUS_CANCELLED: + { + counts.cancelled++; + break; + } + + case BACKGROUND_TASK_STATUS_CANCELLING: + { + counts.cancelling++; + break; + } + + default: + { + elog(ERROR, "unknown state in pg_dist_background_task"); + } + } + } + + systable_endscan(scanDescriptor); + table_close(pgDistBackgroundTasks, NoLock); + + return counts; +} + + +/* + * SetFieldValue populates 
values, isnull, replace according to the newValue passed, + * returning if the value has been updated or not. The replace argument can be omitted if + * we are simply initializing a field. + * + * suggested use would be: + * bool updated = false; + * updated |= SetFieldValue(Anum_...._...., isnull, replace, values, newValue); + * updated |= SetFieldText(Anum_...._...., isnull, replace, values, "hello world"); + * updated |= SetFieldNull(Anum_...._...., isnull, replace, values); + * + * Only if updated is set in the end the tuple has to be updated in the catalog. + */ +static bool +SetFieldValue(int attno, Datum values[], bool isnull[], bool replace[], Datum newValue) +{ + int idx = attno - 1; + bool updated = false; + + if (!isnull[idx] && newValue == values[idx]) + { + return updated; + } + + values[idx] = newValue; + isnull[idx] = false; + updated = true; + + if (replace) + { + replace[idx] = true; + } + return updated; +} + + +/* + * SetFieldText populates values, isnull, replace according to the newValue passed, + * returning if the value has been updated or not. The replace argument can be omitted if + * we are simply initializing a field. + * + * suggested use would be: + * bool updated = false; + * updated |= SetFieldValue(Anum_...._...., isnull, replace, values, newValue); + * updated |= SetFieldText(Anum_...._...., isnull, replace, values, "hello world"); + * updated |= SetFieldNull(Anum_...._...., isnull, replace, values); + * + * Only if updated is set in the end the tuple has to be updated in the catalog. 
+ */ +static bool +SetFieldText(int attno, Datum values[], bool isnull[], bool replace[], + const char *newValue) +{ + int idx = attno - 1; + bool updated = false; + + if (!isnull[idx]) + { + char *oldText = TextDatumGetCString(values[idx]); + if (strcmp(oldText, newValue) == 0) + { + return updated; + } + } + + values[idx] = CStringGetTextDatum(newValue); + isnull[idx] = false; + updated = true; + + if (replace) + { + replace[idx] = true; + } + return updated; +} + + +/* + * SetFieldNull populates values, isnull and replace according to a null value, + * returning if the value has been updated or not. The replace argument can be omitted if + * we are simply initializing a field. + * + * suggested use would be: + * bool updated = false; + * updated |= SetFieldValue(Anum_...._...., isnull, replace, values, newValue); + * updated |= SetFieldText(Anum_...._...., isnull, replace, values, "hello world"); + * updated |= SetFieldNull(Anum_...._...., isnull, replace, values); + * + * Only if updated is set in the end the tuple has to be updated in the catalog. + */ +static bool +SetFieldNull(int attno, Datum values[], bool isnull[], bool replace[]) +{ + int idx = attno - 1; + bool updated = false; + + if (isnull[idx]) + { + return updated; + } + + isnull[idx] = true; + values[idx] = InvalidOid; + updated = true; + + if (replace) + { + replace[idx] = true; + } + return updated; +} + + +/* + * UpdateBackgroundJob updates the job's metadata based on the most recent status of all + * its associated tasks. + * + * Since the state of a job is a function of the state of all associated tasks this + * function projects the tasks states into the job's state. + * + * When Citus makes a change to any of the tasks associated with the job it should call + * this function to correctly project the task updates onto the jobs metadata. 
+ */ +void +UpdateBackgroundJob(int64 jobId) +{ + JobTaskStatusCounts counts = JobTasksStatusCount(jobId); + BackgroundJobStatus status = BACKGROUND_JOB_STATUS_RUNNING; + + if (counts.cancelling > 0) + { + status = BACKGROUND_JOB_STATUS_CANCELLING; + } + else if (counts.cancelled > 0) + { + status = BACKGROUND_JOB_STATUS_CANCELLED; + } + else if (counts.blocked + counts.runnable + counts.running + counts.error + + counts.unscheduled == 0) + { + /* all tasks are done, job is finished */ + status = BACKGROUND_JOB_STATUS_FINISHED; + } + else if (counts.error + counts.unscheduled > 0) + { + /* we are either failing, or failed */ + if (counts.blocked + counts.runnable + counts.running > 0) + { + /* failing, as there are still tasks to be run */ + status = BACKGROUND_JOB_STATUS_FAILING; + } + else + { + status = BACKGROUND_JOB_STATUS_FAILED; + } + } + else if (counts.blocked + counts.runnable + counts.running > 0) + { + status = BACKGROUND_JOB_STATUS_RUNNING; + } + else + { + return; + } + + Relation pgDistBackgroundJobs = + table_open(DistBackgroundJobRelationId(), RowExclusiveLock); + TupleDesc tupleDescriptor = RelationGetDescr(pgDistBackgroundJobs); + + ScanKeyData scanKey[1] = { 0 }; + const bool indexOK = true; + + /* WHERE job_id = $task->jobId */ + ScanKeyInit(&scanKey[0], Anum_pg_dist_background_job_job_id, + BTEqualStrategyNumber, F_INT8EQ, Int64GetDatum(jobId)); + + SysScanDesc scanDescriptor = + systable_beginscan(pgDistBackgroundJobs, + DistBackgroundJobPKeyIndexId(), + indexOK, NULL, lengthof(scanKey), scanKey); + + HeapTuple heapTuple = systable_getnext(scanDescriptor); + if (!HeapTupleIsValid(heapTuple)) + { + ereport(ERROR, (errmsg("could not find background jobs entry for job_id: " + UINT64_FORMAT, jobId))); + } + + Datum values[Natts_pg_dist_background_task] = { 0 }; + bool isnull[Natts_pg_dist_background_task] = { 0 }; + bool replace[Natts_pg_dist_background_task] = { 0 }; + + heap_deform_tuple(heapTuple, tupleDescriptor, values, isnull); + + bool 
updated = false; + + Oid stateOid = BackgroundJobStatusOid(status); + updated |= SetFieldValue(Anum_pg_dist_background_job_state, values, isnull, replace, + ObjectIdGetDatum(stateOid)); + + if (status == BACKGROUND_JOB_STATUS_RUNNING) + { + if (isnull[Anum_pg_dist_background_job_started_at - 1]) + { + /* first time status has been updated and was running, updating started_at */ + TimestampTz startedAt = GetCurrentTimestamp(); + updated |= SetFieldValue(Anum_pg_dist_background_job_started_at, values, + isnull, replace, TimestampTzGetDatum(startedAt)); + } + } + + if (IsBackgroundJobStatusTerminal(status)) + { + if (isnull[Anum_pg_dist_background_job_finished_at - 1]) + { + /* didn't have a finished at time just yet, updating to now */ + TimestampTz finishedAt = GetCurrentTimestamp(); + updated |= SetFieldValue(Anum_pg_dist_background_job_finished_at, values, + isnull, replace, TimestampTzGetDatum(finishedAt)); + } + } + + if (updated) + { + heapTuple = heap_modify_tuple(heapTuple, tupleDescriptor, values, isnull, + replace); + + CatalogTupleUpdate(pgDistBackgroundJobs, &heapTuple->t_self, heapTuple); + + CommandCounterIncrement(); + } + + systable_endscan(scanDescriptor); + table_close(pgDistBackgroundJobs, NoLock); +} + + +/* + * UpdateBackgroundTask updates the catalog entry for the passed task, preventing an + * actual update when the inmemory representation is the same as the one stored in the + * catalog. 
+ */ +void +UpdateBackgroundTask(BackgroundTask *task) +{ + Relation pgDistBackgroundTasks = + table_open(DistBackgroundTaskRelationId(), RowExclusiveLock); + TupleDesc tupleDescriptor = RelationGetDescr(pgDistBackgroundTasks); + + ScanKeyData scanKey[1] = { 0 }; + const bool indexOK = true; + + /* WHERE task_id = $task->taskid */ + ScanKeyInit(&scanKey[0], Anum_pg_dist_background_task_task_id, + BTEqualStrategyNumber, F_INT8EQ, Int64GetDatum(task->taskid)); + + SysScanDesc scanDescriptor = + systable_beginscan(pgDistBackgroundTasks, + DistBackgroundTaskPKeyIndexId(), + indexOK, NULL, lengthof(scanKey), scanKey); + + HeapTuple heapTuple = systable_getnext(scanDescriptor); + if (!HeapTupleIsValid(heapTuple)) + { + ereport(ERROR, (errmsg("could not find background task entry for :" + "job_id/task_id: " UINT64_FORMAT "/" UINT64_FORMAT, + task->jobid, task->taskid))); + } + + Datum values[Natts_pg_dist_background_task] = { 0 }; + bool isnull[Natts_pg_dist_background_task] = { 0 }; + bool replace[Natts_pg_dist_background_task] = { 0 }; + + heap_deform_tuple(heapTuple, tupleDescriptor, values, isnull); + + bool updated = false; + + updated |= SetFieldValue(Anum_pg_dist_background_task_owner, values, isnull, replace, + task->owner); + + if (task->pid) + { + updated |= SetFieldValue(Anum_pg_dist_background_task_pid, values, isnull, + replace, Int32GetDatum(*task->pid)); + } + else + { + updated |= SetFieldNull(Anum_pg_dist_background_task_pid, values, isnull, + replace); + } + + Oid statusOid = ObjectIdGetDatum(BackgroundTaskStatusOid(task->status)); + updated |= SetFieldValue(Anum_pg_dist_background_task_status, values, isnull, replace, + statusOid); + + if (task->retry_count) + { + updated |= SetFieldValue(Anum_pg_dist_background_task_retry_count, values, isnull, + replace, Int32GetDatum(*task->retry_count)); + } + else + { + updated |= SetFieldNull(Anum_pg_dist_background_task_retry_count, values, isnull, + replace); + } + + if (task->not_before) + { + updated |= 
SetFieldValue(Anum_pg_dist_background_task_not_before, values, isnull, + replace, TimestampTzGetDatum(*task->not_before)); + } + else + { + updated |= SetFieldNull(Anum_pg_dist_background_task_not_before, values, isnull, + replace); + } + + if (task->message) + { + updated |= SetFieldText(Anum_pg_dist_background_task_message, values, isnull, + replace, task->message); + } + else + { + updated |= SetFieldNull(Anum_pg_dist_background_task_message, values, isnull, + replace); + } + + if (updated) + { + heapTuple = heap_modify_tuple(heapTuple, tupleDescriptor, values, isnull, + replace); + + CatalogTupleUpdate(pgDistBackgroundTasks, &heapTuple->t_self, heapTuple); + + CommandCounterIncrement(); + } + + systable_endscan(scanDescriptor); + table_close(pgDistBackgroundTasks, NoLock); +} + + +/* + * GetDependantTasks returns a list of taskId's containing all tasks depending on the task + * passed via its arguments. + * + * Becasue tasks are int64 we allocate and return a List of int64 pointers. 
+ */ +static List * +GetDependantTasks(int64 jobId, int64 taskId) +{ + Relation pgDistBackgroundTasksDepends = + table_open(DistBackgroundTaskDependRelationId(), RowExclusiveLock); + + ScanKeyData scanKey[2] = { 0 }; + const bool indexOK = true; + + /* pg_dist_background_task_depend.job_id = $jobId */ + ScanKeyInit(&scanKey[0], Anum_pg_dist_background_task_depend_job_id, + BTEqualStrategyNumber, F_INT8EQ, Int64GetDatum(jobId)); + + /* pg_dist_background_task_depend.depends_on = $taskId */ + ScanKeyInit(&scanKey[1], Anum_pg_dist_background_task_depend_depends_on, + BTEqualStrategyNumber, F_INT8EQ, Int64GetDatum(taskId)); + + SysScanDesc scanDescriptor = + systable_beginscan(pgDistBackgroundTasksDepends, + DistBackgroundTaskDependDependsOnIndexId(), + indexOK, + NULL, lengthof(scanKey), scanKey); + + List *dependantTasks = NIL; + HeapTuple heapTuple = NULL; + while (HeapTupleIsValid(heapTuple = systable_getnext(scanDescriptor))) + { + Form_pg_dist_background_task_depend depend = + (Form_pg_dist_background_task_depend) GETSTRUCT(heapTuple); + + int64 *dTaskId = palloc0(sizeof(int64)); + *dTaskId = depend->task_id; + + dependantTasks = lappend(dependantTasks, dTaskId); + } + + systable_endscan(scanDescriptor); + table_close(pgDistBackgroundTasksDepends, NoLock); + + return dependantTasks; +} + + +/* + * CancelTasksForJob cancels all tasks associated with a job that are not currently + * running and are not already in their terminal state. Canceling these tasks consist of + * updating the status of the task in the catalog. + * + * For all other tasks, namely the ones that are currently running, it returns the list of + * Pid's of the tasks running. These backends should be signalled for cancellation. + * + * Since we are either signalling or changing the status of a task we perform appropriate + * permission checks. This currently includes the exact same checks pg_cancel_backend + * would perform. 
+ */ +List * +CancelTasksForJob(int64 jobid) +{ + Relation pgDistBackgroundTasks = + table_open(DistBackgroundTaskRelationId(), ExclusiveLock); + TupleDesc tupleDescriptor = RelationGetDescr(pgDistBackgroundTasks); + + ScanKeyData scanKey[1] = { 0 }; + + /* WHERE jobId = $jobid */ + ScanKeyInit(&scanKey[0], Anum_pg_dist_background_task_job_id, + BTEqualStrategyNumber, F_INT8EQ, Int64GetDatum(jobid)); + + const bool indexOK = true; + SysScanDesc scanDescriptor = systable_beginscan(pgDistBackgroundTasks, + DistBackgroundTaskJobIdTaskIdIndexId(), + indexOK, NULL, + lengthof(scanKey), scanKey); + + List *runningTaskPids = NIL; + HeapTuple taskTuple = NULL; + while (HeapTupleIsValid(taskTuple = systable_getnext(scanDescriptor))) + { + Datum values[Natts_pg_dist_background_task] = { 0 }; + bool nulls[Natts_pg_dist_background_task] = { 0 }; + bool replace[Natts_pg_dist_background_task] = { 0 }; + heap_deform_tuple(taskTuple, tupleDescriptor, values, nulls); + + Oid statusOid = + DatumGetObjectId(values[Anum_pg_dist_background_task_status - 1]); + BackgroundTaskStatus status = BackgroundTaskStatusByOid(statusOid); + + if (IsBackgroundTaskStatusTerminal(status)) + { + continue; + } + + /* make sure the current user has the rights to cancel this task */ + Oid taskOwner = DatumGetObjectId(values[Anum_pg_dist_background_task_owner]); + if (superuser_arg(taskOwner) && !superuser()) + { + /* must be a superuser to cancel tasks owned by superuser */ + ereport(ERROR, (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), + errmsg("must be a superuser to cancel superuser tasks"))); + } + else if (!has_privs_of_role(GetUserId(), taskOwner) && +#if PG_VERSION_NUM >= 140000 + !has_privs_of_role(GetUserId(), ROLE_PG_SIGNAL_BACKEND)) +#else + !has_privs_of_role(GetUserId(), DEFAULT_ROLE_SIGNAL_BACKENDID)) +#endif + { + /* user doesn't have the permissions to cancel this job */ + ereport(ERROR, (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), + errmsg("must be a member of the role whose task is being " + 
"canceled or member of pg_signal_backend"))); + } + + BackgroundTaskStatus newStatus = BACKGROUND_TASK_STATUS_CANCELLED; + if (status == BACKGROUND_TASK_STATUS_RUNNING) + { + if (!nulls[Anum_pg_dist_background_task_pid - 1]) + { + int32 pid = DatumGetInt32(values[Anum_pg_dist_background_task_pid - 1]); + runningTaskPids = lappend_int(runningTaskPids, pid); + newStatus = BACKGROUND_TASK_STATUS_CANCELLING; + } + } + + /* update task to new status */ + nulls[Anum_pg_dist_background_task_status - 1] = false; + values[Anum_pg_dist_background_task_status - 1] = ObjectIdGetDatum( + BackgroundTaskStatusOid(newStatus)); + replace[Anum_pg_dist_background_task_status - 1] = true; + + taskTuple = heap_modify_tuple(taskTuple, tupleDescriptor, values, nulls, + replace); + CatalogTupleUpdate(pgDistBackgroundTasks, &taskTuple->t_self, taskTuple); + } + + systable_endscan(scanDescriptor); + table_close(pgDistBackgroundTasks, NoLock); + + CommandCounterIncrement(); + + return runningTaskPids; +} + + +/* + * UnscheduleDependentTasks follows the dependency tree of the provided task recursively + * to unschedule any task depending on the current task. + * + * This is useful to unschedule any task that can never run because it will never satisfy + * the unmet dependency constraint. 
+ */ +void +UnscheduleDependentTasks(BackgroundTask *task) +{ + Relation pgDistBackgroundTasks = + table_open(DistBackgroundTaskRelationId(), RowExclusiveLock); + TupleDesc tupleDescriptor = RelationGetDescr(pgDistBackgroundTasks); + + List *dependantTasks = GetDependantTasks(task->jobid, task->taskid); + while (list_length(dependantTasks) > 0) + { + /* pop last item from stack */ + int64 cTaskId = *(int64 *) llast(dependantTasks); + dependantTasks = list_delete_last(dependantTasks); + + /* push new dependant tasks on to stack */ + dependantTasks = list_concat(dependantTasks, + GetDependantTasks(task->jobid, cTaskId)); + + /* unschedule current task */ + { + ScanKeyData scanKey[1] = { 0 }; + + /* WHERE taskId = dependentTask->taskId */ + ScanKeyInit(&scanKey[0], Anum_pg_dist_background_task_task_id, + BTEqualStrategyNumber, F_INT8EQ, Int64GetDatum(cTaskId)); + const bool indexOK = true; + SysScanDesc scanDescriptor = systable_beginscan(pgDistBackgroundTasks, + DistBackgroundTaskPKeyIndexId(), + indexOK, NULL, + lengthof(scanKey), scanKey); + + HeapTuple heapTuple = systable_getnext(scanDescriptor); + if (!HeapTupleIsValid(heapTuple)) + { + ereport(ERROR, (errmsg("could not find background task entry for " + "task_id: " UINT64_FORMAT, cTaskId))); + } + + Datum values[Natts_pg_dist_background_task] = { 0 }; + bool isnull[Natts_pg_dist_background_task] = { 0 }; + bool replace[Natts_pg_dist_background_task] = { 0 }; + + values[Anum_pg_dist_background_task_status - 1] = + ObjectIdGetDatum(CitusTaskStatusUnscheduledId()); + isnull[Anum_pg_dist_background_task_status - 1] = false; + replace[Anum_pg_dist_background_task_status - 1] = true; + + heapTuple = heap_modify_tuple(heapTuple, tupleDescriptor, values, isnull, + replace); + CatalogTupleUpdate(pgDistBackgroundTasks, &heapTuple->t_self, heapTuple); + + systable_endscan(scanDescriptor); + } + } + + CommandCounterIncrement(); + + table_close(pgDistBackgroundTasks, NoLock); +} + + +/* + * UnblockDependingBackgroundTasks 
unblocks any depending task that now satisfies the + * constraaint that it doesn't have unmet dependencies anymore. For this to be done we + * will find all tasks depending on the current task. Per found task we check if it has + * any unmet dependencies. If no tasks are found that would block the execution of this + * task we transition the task to Runnable state. + */ +void +UnblockDependingBackgroundTasks(BackgroundTask *task) +{ + Relation pgDistBackgroundTasksDepend = + table_open(DistBackgroundTaskDependRelationId(), RowExclusiveLock); + + ScanKeyData scanKey[2] = { 0 }; + + /* WHERE jobId = $jobId */ + ScanKeyInit(&scanKey[0], Anum_pg_dist_background_task_depend_job_id, + BTEqualStrategyNumber, F_INT8EQ, Int64GetDatum(task->jobid)); + + /* WHERE depends_on = $taskId */ + ScanKeyInit(&scanKey[1], Anum_pg_dist_background_task_depend_depends_on, + BTEqualStrategyNumber, F_INT8EQ, Int64GetDatum(task->taskid)); + const bool indexOK = true; + SysScanDesc scanDescriptor = systable_beginscan( + pgDistBackgroundTasksDepend, DistBackgroundTaskDependDependsOnIndexId(), indexOK, + NULL, lengthof(scanKey), scanKey); + + HeapTuple heapTuple = NULL; + while (HeapTupleIsValid(heapTuple = systable_getnext(scanDescriptor))) + { + Form_pg_dist_background_task_depend depend = + (Form_pg_dist_background_task_depend) GETSTRUCT(heapTuple); + + if (!BackgroundTaskHasUmnetDependencies(task->jobid, depend->task_id)) + { + /* + * The task does not have any unmet dependencies anymore and should become + * runnable + */ + + BackgroundTask *unblockedTask = GetBackgroundTaskByTaskId(depend->task_id); + if (unblockedTask->status == BACKGROUND_TASK_STATUS_CANCELLED) + { + continue; + } + + Assert(unblockedTask->status == BACKGROUND_TASK_STATUS_BLOCKED); + unblockedTask->status = BACKGROUND_TASK_STATUS_RUNNABLE; + UpdateBackgroundTask(unblockedTask); + } + } + + systable_endscan(scanDescriptor); + + table_close(pgDistBackgroundTasksDepend, NoLock); +} diff --git 
a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index 75bd7992d..0f5f1501f 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -32,6 +32,7 @@ #include "common/string.h" #include "executor/executor.h" #include "distributed/backend_data.h" +#include "distributed/background_jobs.h" #include "distributed/citus_depended_object.h" #include "distributed/citus_nodefuncs.h" #include "distributed/citus_safe_lib.h" @@ -869,6 +870,16 @@ RegisterCitusConfigVariables(void) GUC_NO_SHOW_ALL, NULL, NULL, NULL); + DefineCustomIntVariable( + "citus.background_task_queue_interval", + gettext_noop("Time to wait between checks for scheduled background tasks."), + NULL, + &BackgroundTaskQueueCheckInterval, + 5000, -1, 7 * 24 * 3600 * 1000, + PGC_SIGHUP, + GUC_UNIT_MS, + NULL, NULL, NULL); + DefineCustomBoolVariable( "citus.check_available_space_before_move", gettext_noop("When enabled will check free disk space before a shard move"), diff --git a/src/backend/distributed/sql/citus--11.0-4--11.1-1.sql b/src/backend/distributed/sql/citus--11.0-4--11.1-1.sql index a5a78fa87..a5433c7d4 100644 --- a/src/backend/distributed/sql/citus--11.0-4--11.1-1.sql +++ b/src/backend/distributed/sql/citus--11.0-4--11.1-1.sql @@ -110,3 +110,60 @@ GRANT SELECT ON pg_catalog.pg_dist_cleanup_recordid_seq TO public; -- old definition. By recreating it here upgrades also pick up the new changes. 
#include "udfs/pg_cancel_backend/11.0-1.sql" #include "udfs/pg_terminate_backend/11.0-1.sql" + +CREATE TYPE citus.citus_job_status AS ENUM ('scheduled', 'running', 'finished', 'cancelling', 'cancelled', 'failing', 'failed'); +ALTER TYPE citus.citus_job_status SET SCHEMA pg_catalog; + +CREATE TABLE citus.pg_dist_background_job ( + job_id bigserial NOT NULL, + state pg_catalog.citus_job_status DEFAULT 'scheduled' NOT NULL, + job_type name NOT NULL, + description text NOT NULL, + started_at timestamptz, + finished_at timestamptz, + + CONSTRAINT pg_dist_background_job_pkey PRIMARY KEY (job_id) +); +ALTER TABLE citus.pg_dist_background_job SET SCHEMA pg_catalog; +GRANT SELECT ON pg_catalog.pg_dist_background_job TO PUBLIC; +GRANT SELECT ON pg_catalog.pg_dist_background_job_job_id_seq TO PUBLIC; + +CREATE TYPE citus.citus_task_status AS ENUM ('blocked', 'runnable', 'running', 'done', 'cancelling', 'error', 'unscheduled', 'cancelled'); +ALTER TYPE citus.citus_task_status SET SCHEMA pg_catalog; + +CREATE TABLE citus.pg_dist_background_task( + job_id bigint NOT NULL REFERENCES pg_catalog.pg_dist_background_job(job_id), + task_id bigserial NOT NULL, + owner regrole NOT NULL DEFAULT CURRENT_USER::regrole, + pid integer, + status pg_catalog.citus_task_status default 'runnable' NOT NULL, + command text NOT NULL, + retry_count integer, + not_before timestamptz, -- can be null to indicate no delay for start of the task, will be set on failure to delay retries + message text NOT NULL DEFAULT '', + + CONSTRAINT pg_dist_background_task_pkey PRIMARY KEY (task_id), + CONSTRAINT pg_dist_background_task_job_id_task_id UNIQUE (job_id, task_id) -- required for FK's to enforce tasks only reference other tasks within the same job +); +ALTER TABLE citus.pg_dist_background_task SET SCHEMA pg_catalog; +CREATE INDEX pg_dist_background_task_status_task_id_index ON pg_catalog.pg_dist_background_task USING btree(status, task_id); +GRANT SELECT ON pg_catalog.pg_dist_background_task TO PUBLIC; 
+GRANT SELECT ON pg_catalog.pg_dist_background_task_task_id_seq TO PUBLIC; + +CREATE TABLE citus.pg_dist_background_task_depend( + job_id bigint NOT NULL REFERENCES pg_catalog.pg_dist_background_job(job_id) ON DELETE CASCADE, + task_id bigint NOT NULL, + depends_on bigint NOT NULL, + + PRIMARY KEY (job_id, task_id, depends_on), + FOREIGN KEY (job_id, task_id) REFERENCES pg_catalog.pg_dist_background_task (job_id, task_id) ON DELETE CASCADE, + FOREIGN KEY (job_id, depends_on) REFERENCES pg_catalog.pg_dist_background_task (job_id, task_id) ON DELETE CASCADE +); + +ALTER TABLE citus.pg_dist_background_task_depend SET SCHEMA pg_catalog; +CREATE INDEX pg_dist_background_task_depend_task_id ON pg_catalog.pg_dist_background_task_depend USING btree(job_id, task_id); +CREATE INDEX pg_dist_background_task_depend_depends_on ON pg_catalog.pg_dist_background_task_depend USING btree(job_id, depends_on); +GRANT SELECT ON pg_catalog.pg_dist_background_task_depend TO PUBLIC; + +#include "udfs/citus_job_wait/11.1-1.sql" +#include "udfs/citus_job_cancel/11.1-1.sql" diff --git a/src/backend/distributed/sql/downgrades/citus--11.1-1--11.0-4.sql b/src/backend/distributed/sql/downgrades/citus--11.1-1--11.0-4.sql index d859ef1d6..2318b98f1 100644 --- a/src/backend/distributed/sql/downgrades/citus--11.1-1--11.0-4.sql +++ b/src/backend/distributed/sql/downgrades/citus--11.1-1--11.0-4.sql @@ -105,3 +105,11 @@ DROP TABLE pg_catalog.pg_dist_cleanup; DROP SEQUENCE pg_catalog.pg_dist_operationid_seq; DROP SEQUENCE pg_catalog.pg_dist_cleanup_recordid_seq; DROP PROCEDURE pg_catalog.citus_cleanup_orphaned_resources(); + +DROP FUNCTION pg_catalog.citus_job_cancel(bigint); +DROP FUNCTION pg_catalog.citus_job_wait(bigint, pg_catalog.citus_job_status); +DROP TABLE pg_catalog.pg_dist_background_task_depend; +DROP TABLE pg_catalog.pg_dist_background_task; +DROP TYPE pg_catalog.citus_task_status; +DROP TABLE pg_catalog.pg_dist_background_job; +DROP TYPE pg_catalog.citus_job_status; diff --git 
a/src/backend/distributed/sql/udfs/citus_job_cancel/11.1-1.sql b/src/backend/distributed/sql/udfs/citus_job_cancel/11.1-1.sql new file mode 100644 index 000000000..6a05a6911 --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_job_cancel/11.1-1.sql @@ -0,0 +1,8 @@ +CREATE FUNCTION pg_catalog.citus_job_cancel(jobid bigint) + RETURNS VOID + LANGUAGE C STRICT + AS 'MODULE_PATHNAME',$$citus_job_cancel$$; +COMMENT ON FUNCTION pg_catalog.citus_job_cancel(jobid bigint) + IS 'cancel a scheduled or running job and all of its tasks that didn''t finish yet'; + +GRANT EXECUTE ON FUNCTION pg_catalog.citus_job_cancel(jobid bigint) TO PUBLIC; diff --git a/src/backend/distributed/sql/udfs/citus_job_cancel/latest.sql b/src/backend/distributed/sql/udfs/citus_job_cancel/latest.sql new file mode 100644 index 000000000..6a05a6911 --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_job_cancel/latest.sql @@ -0,0 +1,8 @@ +CREATE FUNCTION pg_catalog.citus_job_cancel(jobid bigint) + RETURNS VOID + LANGUAGE C STRICT + AS 'MODULE_PATHNAME',$$citus_job_cancel$$; +COMMENT ON FUNCTION pg_catalog.citus_job_cancel(jobid bigint) + IS 'cancel a scheduled or running job and all of its tasks that didn''t finish yet'; + +GRANT EXECUTE ON FUNCTION pg_catalog.citus_job_cancel(jobid bigint) TO PUBLIC; diff --git a/src/backend/distributed/sql/udfs/citus_job_wait/11.1-1.sql b/src/backend/distributed/sql/udfs/citus_job_wait/11.1-1.sql new file mode 100644 index 000000000..fa35607dd --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_job_wait/11.1-1.sql @@ -0,0 +1,8 @@ +CREATE FUNCTION pg_catalog.citus_job_wait(jobid bigint, desired_status pg_catalog.citus_job_status DEFAULT NULL) + RETURNS VOID + LANGUAGE C + AS 'MODULE_PATHNAME',$$citus_job_wait$$; +COMMENT ON FUNCTION pg_catalog.citus_job_wait(jobid bigint, desired_status pg_catalog.citus_job_status) + IS 'blocks till the job identified by jobid is at the specified status, or reached a terminal status. 
Only waits for terminal status when no desired_status was specified. The return value indicates if the desired status was reached or not. When no desired status was specified it will assume any terminal status was desired'; + +GRANT EXECUTE ON FUNCTION pg_catalog.citus_job_wait(jobid bigint, desired_status pg_catalog.citus_job_status) TO PUBLIC; diff --git a/src/backend/distributed/sql/udfs/citus_job_wait/latest.sql b/src/backend/distributed/sql/udfs/citus_job_wait/latest.sql new file mode 100644 index 000000000..fa35607dd --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_job_wait/latest.sql @@ -0,0 +1,8 @@ +CREATE FUNCTION pg_catalog.citus_job_wait(jobid bigint, desired_status pg_catalog.citus_job_status DEFAULT NULL) + RETURNS VOID + LANGUAGE C + AS 'MODULE_PATHNAME',$$citus_job_wait$$; +COMMENT ON FUNCTION pg_catalog.citus_job_wait(jobid bigint, desired_status pg_catalog.citus_job_status) + IS 'blocks till the job identified by jobid is at the specified status, or reached a terminal status. Only waits for terminal status when no desired_status was specified. The return value indicates if the desired status was reached or not. When no desired status was specified it will assume any terminal status was desired'; + +GRANT EXECUTE ON FUNCTION pg_catalog.citus_job_wait(jobid bigint, desired_status pg_catalog.citus_job_status) TO PUBLIC; diff --git a/src/backend/distributed/utils/background_jobs.c b/src/backend/distributed/utils/background_jobs.c new file mode 100644 index 000000000..f28bef486 --- /dev/null +++ b/src/backend/distributed/utils/background_jobs.c @@ -0,0 +1,1340 @@ +/*------------------------------------------------------------------------- + * + * background_jobs.c + * Background jobs run as a background worker, spawned from the + * maintenance daemon. Jobs have tasks, tasks can depend on other + * tasks before execution. 
+ * + * This file contains the code for two separate background workers to + * achieve the goal of running background tasks asynchronously from the + * main database workload. This first background worker is the + * Background Tasks Queue Monitor. This background worker keeps track of + * tasks recorded in pg_dist_background_task and ensures execution based + * on a statemachine. When a task needs to be executed it starts a + * Background Task Executor that executes the sql statement defined in the + * task. The output of the Executor is shared with the Monitor via a + * shared memory queue. + * + * To make sure there is only ever exactly one monitor running per database + * it takes an exclusive lock on the CITUS_BACKGROUND_TASK_MONITOR + * operation. This lock is consulted from the maintenance daemon to only + * spawn a new monitor when the lock is not held. + * + * Copyright (c) Citus Data, Inc. + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "safe_mem_lib.h" + +#include "access/xact.h" +#include "commands/dbcommands.h" +#include "libpq-fe.h" +#include "libpq/pqformat.h" +#include "libpq/pqmq.h" +#include "libpq/pqsignal.h" +#include "parser/analyze.h" +#include "pgstat.h" +#include "storage/dsm.h" +#include "storage/ipc.h" +#include "storage/procarray.h" +#include "storage/shm_mq.h" +#include "storage/shm_toc.h" +#include "tcop/pquery.h" +#include "tcop/tcopprot.h" +#include "tcop/utility.h" +#include "utils/fmgrprotos.h" +#include "utils/memutils.h" +#include "utils/portal.h" +#include "utils/ps_status.h" +#include "utils/resowner.h" +#include "utils/snapmgr.h" +#include "utils/timeout.h" + +#include "distributed/background_jobs.h" +#include "distributed/citus_safe_lib.h" +#include "distributed/listutils.h" +#include "distributed/maintenanced.h" +#include "distributed/metadata_cache.h" +#include "distributed/metadata_utility.h" +#include "distributed/shard_cleaner.h" +#include 
"distributed/resource_lock.h" + +/* Table-of-contents constants for our dynamic shared memory segment. */ +#define CITUS_BACKGROUND_TASK_MAGIC 0x51028081 +#define CITUS_BACKGROUND_TASK_KEY_DATABASE 0 +#define CITUS_BACKGROUND_TASK_KEY_USERNAME 1 +#define CITUS_BACKGROUND_TASK_KEY_COMMAND 2 +#define CITUS_BACKGROUND_TASK_KEY_QUEUE 3 +#define CITUS_BACKGROUND_TASK_KEY_TASK_ID 4 +#define CITUS_BACKGROUND_TASK_NKEYS 5 + +static BackgroundWorkerHandle * StartCitusBackgroundTaskExecuter(char *database, + char *user, + char *command, + int64 taskId, + dsm_segment **pSegment); +static void ExecuteSqlString(const char *sql); +static void ConsumeTaskWorkerOutput(shm_mq_handle *responseq, StringInfo message, + bool *hadError); +static void UpdateDependingTasks(BackgroundTask *task); +static int64 CalculateBackoffDelay(int retryCount); + +PG_FUNCTION_INFO_V1(citus_job_cancel); +PG_FUNCTION_INFO_V1(citus_job_wait); + + +/* + * pg_catalog.citus_job_cancel(jobid bigint) void + * cancels a scheduled/running job + * + * When cancelling a job there are two phases. + * 1. scan all associated tasks and transition all tasks that are not already in their + * terminal state to cancelled. Except if the task is currently running. + * 2. for all running tasks we send a cancelation signal to the backend running the + * query. The background executor/monitor will transition this task to cancelled. + * + * We apply the same policy checks as pg_cancel_backend to check if a user can cancel a + * job. 
+ */ +Datum +citus_job_cancel(PG_FUNCTION_ARGS) +{ + CheckCitusVersion(ERROR); + EnsureCoordinator(); + + int64 jobid = PG_GETARG_INT64(0); + + /* Cancel all tasks that were scheduled before */ + List *pids = CancelTasksForJob(jobid); + + /* send cancellation to any running backends */ + int pid = 0; + foreach_int(pid, pids) + { + Datum pidDatum = Int32GetDatum(pid); + Datum signalSuccessDatum = DirectFunctionCall1(pg_cancel_backend, pidDatum); + bool signalSuccess = DatumGetBool(signalSuccessDatum); + if (!signalSuccess) + { + ereport(WARNING, (errmsg("could not send signal to process %d: %m", pid))); + } + } + + UpdateBackgroundJob(jobid); + + PG_RETURN_VOID(); +} + + +/* + * pg_catalog.citus_job_wait(jobid bigint, + * desired_status citus_job_status DEFAULT NULL) boolean + * waits till a job reaches a desired status, or can't reach the status anymore because + * it reached a (different) terminal state. When no desired_status is given it will + * assume any terminal state as its desired status. The function returns if the + * desired_state was reached. + * + * The current implementation is a polling implementation with an interval of 1 second. + * Ideally we would have some synchronization between the background tasks queue monitor + * and any backend calling this function to receive a signal when the job changes state. + */ +Datum +citus_job_wait(PG_FUNCTION_ARGS) +{ + CheckCitusVersion(ERROR); + EnsureCoordinator(); + + int64 jobid = PG_GETARG_INT64(0); + + /* parse the optional desired_status argument */ + bool hasDesiredStatus = !PG_ARGISNULL(1); + BackgroundJobStatus desiredStatus = { 0 }; + if (hasDesiredStatus) + { + desiredStatus = BackgroundJobStatusByOid(PG_GETARG_OID(1)); + } + + /* + * Since we are wait polling we will actually allocate memory on every poll. To make + * sure we don't put unneeded pressure on the memory we create a context that we clear + * every iteration. 
+ */ + MemoryContext waitContext = AllocSetContextCreate(CurrentMemoryContext, + "JobsWaitContext", + ALLOCSET_DEFAULT_MINSIZE, + ALLOCSET_DEFAULT_INITSIZE, + ALLOCSET_DEFAULT_MAXSIZE); + MemoryContext oldContext = MemoryContextSwitchTo(waitContext); + + while (true) + { + MemoryContextReset(waitContext); + + BackgroundJob *job = GetBackgroundJobByJobId(jobid); + if (!job) + { + ereport(ERROR, (errmsg("no job found for job with jobid: %ld", jobid))); + PG_RETURN_VOID(); + } + + if (hasDesiredStatus && job->state == desiredStatus) + { + /* job has reached its desired status, done waiting */ + break; + } + + if (IsBackgroundJobStatusTerminal(job->state)) + { + if (hasDesiredStatus) + { + /* + * We have reached a terminal state, which is not the desired state we + * were waiting for, otherwise we would have escaped earlier. Since it is + * a terminal state we know that we can never reach the desired state. + */ + + Oid reachedStatusOid = BackgroundJobStatusOid(job->state); + Datum reachedStatusNameDatum = DirectFunctionCall1(enum_out, + reachedStatusOid); + char *reachedStatusName = DatumGetCString(reachedStatusNameDatum); + + Oid desiredStatusOid = BackgroundJobStatusOid(desiredStatus); + Datum desiredStatusNameDatum = DirectFunctionCall1(enum_out, + desiredStatusOid); + char *desiredStatusName = DatumGetCString(desiredStatusNameDatum); + + ereport(ERROR, + (errmsg("Job reached terminal state \"%s\" instead of desired " + "state \"%s\"", reachedStatusName, desiredStatusName))); + } + + /* job has reached its terminal state, done waiting */ + break; + } + + /* sleep for a while, before rechecking the job status */ + CHECK_FOR_INTERRUPTS(); + const long delay_ms = 1000; + (void) WaitLatch(MyLatch, + WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, + delay_ms, + WAIT_EVENT_PG_SLEEP); + + ResetLatch(MyLatch); + } + + MemoryContextSwitchTo(oldContext); + MemoryContextDelete(waitContext); + + PG_RETURN_VOID(); +} + + +/* + * StartCitusBackgroundTaskQueueMonitor spawns a 
new background worker connected to the + * current database and owner. This background worker consumes the tasks that are ready + * for execution. + */ +BackgroundWorkerHandle * +StartCitusBackgroundTaskQueueMonitor(Oid database, Oid extensionOwner) +{ + BackgroundWorker worker = { 0 }; + BackgroundWorkerHandle *handle = NULL; + + /* Configure a worker. */ + memset(&worker, 0, sizeof(worker)); + SafeSnprintf(worker.bgw_name, BGW_MAXLEN, + "Citus Background Task Queue Monitor: %u/%u", + database, extensionOwner); + worker.bgw_flags = + BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION; + worker.bgw_start_time = BgWorkerStart_ConsistentState; + + /* don't restart, we manage restarts from maintenance daemon */ + worker.bgw_restart_time = BGW_NEVER_RESTART; + strcpy_s(worker.bgw_library_name, sizeof(worker.bgw_library_name), "citus"); + strcpy_s(worker.bgw_function_name, sizeof(worker.bgw_library_name), + "CitusBackgroundTaskQueueMonitorMain"); + worker.bgw_main_arg = ObjectIdGetDatum(MyDatabaseId); + memcpy_s(worker.bgw_extra, sizeof(worker.bgw_extra), &extensionOwner, + sizeof(Oid)); + worker.bgw_notify_pid = MyProcPid; + + if (!RegisterDynamicBackgroundWorker(&worker, &handle)) + { + return NULL; + } + + pid_t pid; + WaitForBackgroundWorkerStartup(handle, &pid); + + return handle; +} + + +/* + * context for any log/error messages emitted from the background task queue monitor. + */ +typedef struct CitusBackgroundTaskQueueMonitorErrorCallbackContext +{ + const char *database; +} CitusBackgroundTaskQueueMonitorCallbackContext; + + +/* + * CitusBackgroundTaskQueueMonitorErrorCallback is a callback handler that gets called for + * any ereport to add extra context to the message. 
+ */ +static void +CitusBackgroundTaskQueueMonitorErrorCallback(void *arg) +{ + CitusBackgroundTaskQueueMonitorCallbackContext *context = + (CitusBackgroundTaskQueueMonitorCallbackContext *) arg; + errcontext("Citus Background Task Queue Monitor: %s", context->database); +} + + +/* + * CitusBackgroundTaskQueueMonitorMain is the main entry point for the background worker + * running the background tasks queue monitor. + * + * It's mainloop reads a runnable task from pg_dist_background_task and progressing the + * tasks and jobs state machines associated with the task. When no new task can be found + * it will exit(0) and lets the maintenance daemon poll for new tasks. + * + * The main loop is currently implemented as a synchronous loop stepping through the task + * and update its state before going to the next. + */ +void +CitusBackgroundTaskQueueMonitorMain(Datum arg) +{ + Oid databaseOid = DatumGetObjectId(arg); + + /* extension owner is passed via bgw_extra */ + Oid extensionOwner = InvalidOid; + memcpy_s(&extensionOwner, sizeof(extensionOwner), + MyBgworkerEntry->bgw_extra, sizeof(Oid)); + + BackgroundWorkerUnblockSignals(); + + /* connect to database, after that we can actually access catalogs */ + BackgroundWorkerInitializeConnectionByOid(databaseOid, extensionOwner, 0); + + StartTransactionCommand(); + PushActiveSnapshot(GetTransactionSnapshot()); + + /* load database name and copy to a memory context that survives the transaction */ + const char *databasename = get_database_name(MyDatabaseId); + MemoryContext oldContext = MemoryContextSwitchTo(TopMemoryContext); + databasename = pstrdup(databasename); + MemoryContextSwitchTo(oldContext); + + /* setup error context to indicate the errors came from a running background task */ + ErrorContextCallback errorCallback = { 0 }; + struct CitusBackgroundTaskQueueMonitorErrorCallbackContext context = { + .database = databasename, + }; + errorCallback.callback = CitusBackgroundTaskQueueMonitorErrorCallback; + 
errorCallback.arg = (void *) &context; + errorCallback.previous = error_context_stack; + error_context_stack = &errorCallback; + + PopActiveSnapshot(); + CommitTransactionCommand(); + + /* + * There should be exactly one background task monitor running, running multiple would + * cause conflicts on processing the tasks in the catalog table as well as violate + * parallelism guarantees. To make sure there is at most, exactly one backend running + * we take a session lock on the CITUS_BACKGROUND_TASK_MONITOR operation. + * + * TODO now that we have a lock, we should install a term handler to terminate any + * 'child' backend when we are terminated. Otherwise we will still have a situation + * where the actual task could be running multiple times. + */ + LOCKTAG tag = { 0 }; + SET_LOCKTAG_CITUS_OPERATION(tag, CITUS_BACKGROUND_TASK_MONITOR); + const bool sessionLock = true; + const bool dontWait = true; + LockAcquireResult locked = + LockAcquire(&tag, AccessExclusiveLock, sessionLock, dontWait); + if (locked == LOCKACQUIRE_NOT_AVAIL) + { + ereport(ERROR, (errmsg("background task queue monitor already running for " + "database"))); + } + + /* make worker recognizable in pg_stat_activity */ + pgstat_report_appname("citus background task queue monitor"); + + ereport(DEBUG1, (errmsg("started citus background task queue monitor"))); + + MemoryContext perTaskContext = AllocSetContextCreate(CurrentMemoryContext, + "PerTaskContext", + ALLOCSET_DEFAULT_MINSIZE, + ALLOCSET_DEFAULT_INITSIZE, + ALLOCSET_DEFAULT_MAXSIZE); + + /* + * First we find all jobs that are running, we need to check if they are still running + * if not reset their state back to scheduled. 
+ */ + { + StartTransactionCommand(); + PushActiveSnapshot(GetTransactionSnapshot()); + + ResetRunningBackgroundTasks(); + + PopActiveSnapshot(); + CommitTransactionCommand(); + } + + + MemoryContext oldContextPerJob = MemoryContextSwitchTo(perTaskContext); + TimestampTz backgroundWorkerFailedStartTime = 0; + + /* + * Although this variable could be omitted it does quickly and adequately describe + * till when we are looping. + */ + bool hasTasks = true; + while (hasTasks) + { + MemoryContextReset(perTaskContext); + + CHECK_FOR_INTERRUPTS(); + + InvalidateMetadataSystemCache(); + StartTransactionCommand(); + PushActiveSnapshot(GetTransactionSnapshot()); + + /* + * We need to load the task into the perTaskContext as we will switch contexts + * later due to the committing and starting of new transactions + */ + oldContext = MemoryContextSwitchTo(perTaskContext); + BackgroundTask *task = GetRunnableBackgroundTask(); + + if (!task) + { + MemoryContextSwitchTo(oldContext); + + PopActiveSnapshot(); + CommitTransactionCommand(); + + hasTasks = false; + break; + } + + /* we load the database name and username here as we are still in a transaction */ + char *databaseName = get_database_name(MyDatabaseId); + char *userName = GetUserNameFromId(task->owner, false); + + MemoryContextSwitchTo(oldContext); + + PopActiveSnapshot(); + CommitTransactionCommand(); + + MemoryContextSwitchTo(perTaskContext); + + /* + * The background worker needs to be started outside of the transaction, otherwise + * it will complain about leaking shared memory segments used, among other things, + * to communicate the output of the backend. + */ + dsm_segment *seg = NULL; + BackgroundWorkerHandle *handle = + StartCitusBackgroundTaskExecuter(databaseName, userName, task->command, + task->taskid, &seg); + + if (handle == NULL) + { + /* + * We are unable to start a background worker for the task execution. + * Probably we are out of background workers. 
Warn once and restart the loop + * after a short sleep. + */ + if (backgroundWorkerFailedStartTime == 0) + { + ereport(WARNING, (errmsg("unable to start background worker for " + "background task execution"))); + backgroundWorkerFailedStartTime = GetCurrentTimestamp(); + } + + const long delay_ms = 1000; + (void) WaitLatch(MyLatch, WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, + delay_ms, WAIT_EVENT_PG_SLEEP); + ResetLatch(MyLatch); + + continue; + } + + if (backgroundWorkerFailedStartTime > 0) + { + /* + * We had a delay in starting the background worker for task execution. Report + * the actual delay and reset the time. This allows a subsequent task to + * report again if it can't start a background worker directly. + */ + long secs = 0; + int microsecs = 0; + TimestampDifference(backgroundWorkerFailedStartTime, GetCurrentTimestamp(), + &secs, µsecs); + ereport(LOG, (errmsg("able to start a background worker with %ld seconds" + "delay", secs))); + + backgroundWorkerFailedStartTime = 0; + } + + + pid_t pid = 0; + GetBackgroundWorkerPid(handle, &pid); + + ereport(LOG, (errmsg("found task with jobid/taskid: %ld/%ld", + task->jobid, task->taskid))); + + StartTransactionCommand(); + PushActiveSnapshot(GetTransactionSnapshot()); + + /* + * Reload task while holding a new ExclusiveLock on the table. A separate process + * could have cancelled or removed the task by now, they would not see the pid and + * status update, so it is our responsibility to stop the backend and _not_ write + * the pid and running status. + * + * The lock will release on transaction commit. 
+ */ + LockRelationOid(DistBackgroundTaskRelationId(), ExclusiveLock); + + oldContext = MemoryContextSwitchTo(perTaskContext); + task = GetBackgroundTaskByTaskId(task->taskid); + MemoryContextSwitchTo(oldContext); + + if (!task || task->status == BACKGROUND_TASK_STATUS_CANCELLING || + task->status == BACKGROUND_TASK_STATUS_CANCELLED) + { + task->status = BACKGROUND_TASK_STATUS_CANCELLED; + UpdateBackgroundTask(task); + UpdateBackgroundJob(task->jobid); + + PopActiveSnapshot(); + CommitTransactionCommand(); + + /* + * Terminate backend and release shared memory to not leak these resources + * across iterations. + */ + TerminateBackgroundWorker(handle); + dsm_detach(seg); + + /* there could be an other task ready to run, let a new loop decide */ + continue; + } + + /* + * Now that we have verified the task has not been cancelled and still exist we + * update it to reflect the new state + */ + task->status = BACKGROUND_TASK_STATUS_RUNNING; + SET_NULLABLE_FIELD(task, pid, pid); + + /* Update task status to indicate it is running */ + UpdateBackgroundTask(task); + UpdateBackgroundJob(task->jobid); + + PopActiveSnapshot(); + CommitTransactionCommand(); + + MemoryContextSwitchTo(perTaskContext); + + bool hadError = false; + StringInfoData message = { 0 }; + initStringInfo(&message); + + { + shm_toc *toc = shm_toc_attach(CITUS_BACKGROUND_TASK_MAGIC, + dsm_segment_address(seg)); + shm_mq *mq = shm_toc_lookup(toc, CITUS_BACKGROUND_TASK_KEY_QUEUE, false); + shm_mq_handle *responseq = shm_mq_attach(mq, seg, NULL); + + /* + * We consume the complete shm_mq here as ConsumeTaskWorkerOutput loops till + * it reaches a SHM_MQ_DETACHED response. + */ + ConsumeTaskWorkerOutput(responseq, &message, &hadError); + + shm_mq_detach(responseq); + } + + StartTransactionCommand(); + PushActiveSnapshot(GetTransactionSnapshot()); + + /* + * Same as before, we need to lock pg_dist_background_task in a way where we can + * check if there had been a concurrent cancel. 
+ */ + LockRelationOid(DistBackgroundTaskRelationId(), ExclusiveLock); + + oldContext = MemoryContextSwitchTo(perTaskContext); + task = GetBackgroundTaskByTaskId(task->taskid); + MemoryContextSwitchTo(oldContext); + + if (!task || task->status == BACKGROUND_TASK_STATUS_CANCELLING || + task->status == BACKGROUND_TASK_STATUS_CANCELLED) + { + /* + * A concurrent cancel has happened or the task has disappeared, we are not + * retrying or changing state, we will only reflect the message onto the task + * and for completeness we update the job aswell, this should be a no-op + * + * We still need to release the shared memory and xact before looping + */ + + dsm_detach(seg); + + task->status = BACKGROUND_TASK_STATUS_CANCELLED; + task->message = message.data; + UpdateBackgroundTask(task); + UpdateBackgroundJob(task->jobid); + + PopActiveSnapshot(); + CommitTransactionCommand(); + + continue; + } + else if (hadError) + { + /* + * When we had an error we need to decide if we want to retry (keep the + * runnable state), or move to error state + */ + if (!task->retry_count) + { + SET_NULLABLE_FIELD(task, retry_count, 1); + } + else + { + (*task->retry_count)++; + } + + /* + * based on the retry count we either transition the task to its error + * state, or we calculate a new backoff time for future execution. 
+ */ + int64 delayMs = CalculateBackoffDelay(*(task->retry_count)); + if (delayMs < 0) + { + task->status = BACKGROUND_TASK_STATUS_ERROR; + UNSET_NULLABLE_FIELD(task, not_before); + } + else + { + TimestampTz notBefore = TimestampTzPlusMilliseconds( + GetCurrentTimestamp(), delayMs); + SET_NULLABLE_FIELD(task, not_before, notBefore); + + task->status = BACKGROUND_TASK_STATUS_RUNNABLE; + } + } + else + { + task->status = BACKGROUND_TASK_STATUS_DONE; + } + UNSET_NULLABLE_FIELD(task, pid); + task->message = message.data; + + UpdateBackgroundTask(task); + UpdateDependingTasks(task); + UpdateBackgroundJob(task->jobid); + + dsm_detach(seg); + + PopActiveSnapshot(); + CommitTransactionCommand(); + } + + MemoryContextSwitchTo(oldContextPerJob); + MemoryContextDelete(perTaskContext); +} + + +/* + * CalculateBackoffDelay calculates the time to backoff between retries. + * + * Per try we increase the delay as follows: + * retry 1: 5 sec + * retry 2: 20 sec + * retry 3-32 (30 tries in total): 1 min + * + * returns -1 when retrying should stop. + * + * In the future we would like a callback on the job_type that could + * distinguish the retry count and delay + potential jitter on a + * job_type basis. For now we only assume this to be used by the + * rebalancer and settled on the retry scheme above. 
+ */ +static int64 +CalculateBackoffDelay(int retryCount) +{ + if (retryCount == 1) + { + return 5 * 1000; + } + else if (retryCount == 2) + { + return 20 * 1000; + } + else if (retryCount <= 32) + { + return 60 * 1000; + } + return -1; +} + + +#if PG_VERSION_NUM < PG_VERSION_15 +static const char * +error_severity(int elevel) +{ + const char *prefix; + + switch (elevel) + { + case DEBUG1: + case DEBUG2: + case DEBUG3: + case DEBUG4: + case DEBUG5: + { + prefix = gettext_noop("DEBUG"); + break; + } + + case LOG: + case LOG_SERVER_ONLY: + { + prefix = gettext_noop("LOG"); + break; + } + + case INFO: + { + prefix = gettext_noop("INFO"); + break; + } + + case NOTICE: + { + prefix = gettext_noop("NOTICE"); + break; + } + + case WARNING: + { + prefix = gettext_noop("WARNING"); + break; + } + +#if PG_VERSION_NUM >= PG_VERSION_14 + case WARNING_CLIENT_ONLY: + { + prefix = gettext_noop("WARNING"); + break; + } +#endif + + case ERROR: + { + prefix = gettext_noop("ERROR"); + break; + } + + case FATAL: + { + prefix = gettext_noop("FATAL"); + break; + } + + case PANIC: + { + prefix = gettext_noop("PANIC"); + break; + } + + default: + { + prefix = "???"; + break; + } + } + + return prefix; +} + + +#endif + + +/* + * bgw_generate_returned_message - + * generates the message to be inserted into the job_run_details table + * first part is comming from error_severity (elog.c) + */ +static void +bgw_generate_returned_message(StringInfoData *display_msg, ErrorData edata) +{ + const char *prefix = error_severity(edata.elevel); + appendStringInfo(display_msg, "%s: %s", prefix, edata.message); + if (edata.detail != NULL) + { + appendStringInfo(display_msg, "\nDETAIL: %s", edata.detail); + } + + if (edata.hint != NULL) + { + appendStringInfo(display_msg, "\nHINT: %s", edata.hint); + } + + if (edata.context != NULL) + { + appendStringInfo(display_msg, "\nCONTEXT: %s", edata.context); + } +} + + +/* + * UpdateDependingTasks updates all depending tasks, based on the type of terminal state + 
* the current task reached. + */ +static void +UpdateDependingTasks(BackgroundTask *task) +{ + switch (task->status) + { + case BACKGROUND_TASK_STATUS_DONE: + { + UnblockDependingBackgroundTasks(task); + break; + } + + case BACKGROUND_TASK_STATUS_ERROR: + { + /* when we error this task, we need to unschedule all dependant tasks */ + UnscheduleDependentTasks(task); + break; + } + + default: + { + /* nothing to do for other states */ + break; + } + } +} + + +/* + * ConsumeTaskWorkerOutput consumes the output of an executor and mutates the + * BackgroundTask object to reflect changes like the message and status on the task. + */ +static void +ConsumeTaskWorkerOutput(shm_mq_handle *responseq, StringInfo message, bool *hadError) +{ + /* + * Message-parsing routines operate on a null-terminated StringInfo, + * so we must construct one. + */ + StringInfoData msg = { 0 }; + initStringInfo(&msg); + + for (;;) + { + resetStringInfo(&msg); + + /* + * Get next message. Currently blocking, when multiple backends get implemented it + * should switch to a non-blocking receive + */ + Size nbytes = 0; + void *data = NULL; + const bool noWait = false; + shm_mq_result res = shm_mq_receive(responseq, &nbytes, &data, noWait); + + if (res != SHM_MQ_SUCCESS) + { + break; + } + + appendBinaryStringInfo(&msg, data, nbytes); + + /* + * msgtype seems to be documented on + * https://www.postgresql.org/docs/current/protocol-message-formats.html + * + * Here we mostly handle the same message types as supported in pg_cron as the + * executor is highly influenced by the implementation there. 
+ */ + char msgtype = pq_getmsgbyte(&msg); + switch (msgtype) + { + case 'E': /* ErrorResponse */ + { + if (hadError) + { + *hadError = true; + } + __attribute__((fallthrough)); + } + + case 'N': /* NoticeResponse */ + { + ErrorData edata = { 0 }; + StringInfoData display_msg = { 0 }; + + pq_parse_errornotice(&msg, &edata); + initStringInfo(&display_msg); + bgw_generate_returned_message(&display_msg, edata); + + /* we keep only the last message */ + resetStringInfo(message); + appendStringInfoString(message, display_msg.data); + appendStringInfoChar(message, '\n'); + + pfree(display_msg.data); + + break; + } + + case 'C': /* CommandComplete */ + { + const char *tag = pq_getmsgstring(&msg); + + char *nonconst_tag = pstrdup(tag); + + /* append the nonconst_tag to the task's message */ + appendStringInfoString(message, nonconst_tag); + appendStringInfoChar(message, '\n'); + + pfree(nonconst_tag); + + break; + } + + case 'A': /* NotificationResponse */ + case 'D': /* DataRow */ + case 'G': /* CopyInResponse */ + case 'H': /* CopyOutResponse */ + case 'T': /* RowDescription */ + case 'W': /* CopyBothResponse */ + case 'Z': /* ReadyForQuery */ + { + break; + } + + default: + { + elog(WARNING, "unknown message type: %c (%zu bytes)", + msg.data[0], nbytes); + break; + } + } + } + + pfree(msg.data); +} + + +/* + * StoreArgumentsInDSM creates a dynamic shared memory segment to pass the query and its + * environment to the executor. + */ +static dsm_segment * +StoreArgumentsInDSM(char *database, char *username, char *command, int64 taskId) +{ + /* + * Create the shared memory that we will pass to the background + * worker process. DSM_CREATE_NULL_IF_MAXSEGMENTS makes dsm_create + * return NULL when the segment limit is hit, so that we can raise + * our own error for that case below.
+ */ + shm_toc_estimator e = { 0 }; + shm_toc_initialize_estimator(&e); + shm_toc_estimate_chunk(&e, strlen(database) + 1); + shm_toc_estimate_chunk(&e, strlen(username) + 1); + shm_toc_estimate_chunk(&e, strlen(command) + 1); +#define QUEUE_SIZE ((Size) 65536) + shm_toc_estimate_chunk(&e, QUEUE_SIZE); + shm_toc_estimate_chunk(&e, sizeof(int64)); + shm_toc_estimate_keys(&e, CITUS_BACKGROUND_TASK_NKEYS); + Size segsize = shm_toc_estimate(&e); + + dsm_segment *seg = dsm_create(segsize, DSM_CREATE_NULL_IF_MAXSEGMENTS); + if (seg == NULL) + { + ereport(ERROR, + (errmsg("max number of DSM segments may have been reached"))); + + return NULL; + } + + shm_toc *toc = shm_toc_create(CITUS_BACKGROUND_TASK_MAGIC, dsm_segment_address(seg), + segsize); + + Size size = strlen(database) + 1; + char *databaseTarget = shm_toc_allocate(toc, size); + strcpy_s(databaseTarget, size, database); + shm_toc_insert(toc, CITUS_BACKGROUND_TASK_KEY_DATABASE, databaseTarget); + + size = strlen(username) + 1; + char *usernameTarget = shm_toc_allocate(toc, size); + strcpy_s(usernameTarget, size, username); + shm_toc_insert(toc, CITUS_BACKGROUND_TASK_KEY_USERNAME, usernameTarget); + + size = strlen(command) + 1; + char *commandTarget = shm_toc_allocate(toc, size); + strcpy_s(commandTarget, size, command); + shm_toc_insert(toc, CITUS_BACKGROUND_TASK_KEY_COMMAND, commandTarget); + + shm_mq *mq = shm_mq_create(shm_toc_allocate(toc, QUEUE_SIZE), QUEUE_SIZE); + shm_toc_insert(toc, CITUS_BACKGROUND_TASK_KEY_QUEUE, mq); + shm_mq_set_receiver(mq, MyProc); + + int64 *taskIdTarget = shm_toc_allocate(toc, sizeof(int64)); + *taskIdTarget = taskId; + shm_toc_insert(toc, CITUS_BACKGROUND_TASK_KEY_TASK_ID, taskIdTarget); + + /* + * Attach the queue before launching a worker, so that we'll automatically + * detach the queue if we error out. (Otherwise, the worker might sit + * there trying to write the queue long after we've gone away.)
+ */ + MemoryContext oldcontext = MemoryContextSwitchTo(TopMemoryContext); + shm_mq_attach(mq, seg, NULL); + MemoryContextSwitchTo(oldcontext); + + return seg; +} + + +/* + * StartCitusBackgroundTaskExecuter starts a new background worker for the execution of a + * background task. Callers interested in the shared memory segment that is created + * between the background worker and the current backend can pass in a pSegment to get a + * pointer to the dynamic shared memory. + */ +static BackgroundWorkerHandle * +StartCitusBackgroundTaskExecuter(char *database, char *user, char *command, int64 taskId, + dsm_segment **pSegment) +{ + dsm_segment *seg = StoreArgumentsInDSM(database, user, command, taskId); + + /* Configure a worker. */ + BackgroundWorker worker = { 0 }; + memset(&worker, 0, sizeof(worker)); + SafeSnprintf(worker.bgw_name, BGW_MAXLEN, + "Citus Background Task Queue Executor: %s/%s", + database, user); + worker.bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION; + worker.bgw_start_time = BgWorkerStart_ConsistentState; + + /* don't restart, we manage restarts from maintenance daemon */ + worker.bgw_restart_time = BGW_NEVER_RESTART; + strcpy_s(worker.bgw_library_name, sizeof(worker.bgw_library_name), "citus"); + strcpy_s(worker.bgw_function_name, sizeof(worker.bgw_function_name), + "CitusBackgroundTaskExecuter"); + worker.bgw_main_arg = UInt32GetDatum(dsm_segment_handle(seg)); + worker.bgw_notify_pid = MyProcPid; + + BackgroundWorkerHandle *handle = NULL; + if (!RegisterDynamicBackgroundWorker(&worker, &handle)) + { + dsm_detach(seg); + return NULL; + } + + pid_t pid = { 0 }; + WaitForBackgroundWorkerStartup(handle, &pid); + + if (pSegment) + { + *pSegment = seg; + } + + return handle; +} + + +/* + * context for any log/error messages emitted from the background task executor.
+ */ +typedef struct CitusBackgroundJobExecuterErrorCallbackContext +{ + const char *database; + const char *username; +} CitusBackgroundJobExecuterErrorCallbackContext; + + +/* + * CitusBackgroundJobExecuterErrorCallback is a callback handler that gets called for any + * ereport to add extra context to the message. + */ +static void +CitusBackgroundJobExecuterErrorCallback(void *arg) +{ + CitusBackgroundJobExecuterErrorCallbackContext *context = + (CitusBackgroundJobExecuterErrorCallbackContext *) arg; + errcontext("Citus Background Task Queue Executor: %s/%s", context->database, + context->username); +} + + +/* + * CitusBackgroundTaskExecuter is the main function of the background tasks queue + * executor. This backend attaches to a shared memory segment as identified by the + * main_arg of the background worker. + * + * This is mostly based on the background worker logic in pg_cron + */ +void +CitusBackgroundTaskExecuter(Datum main_arg) +{ + /* + * TODO figure out if we need this signal handler that is in pgcron + * pqsignal(SIGTERM, pg_cron_background_worker_sigterm); + */ + BackgroundWorkerUnblockSignals(); + + /* Set up a memory context and resource owner. */ + Assert(CurrentResourceOwner == NULL); + CurrentResourceOwner = ResourceOwnerCreate(NULL, "citus background job"); + CurrentMemoryContext = AllocSetContextCreate(TopMemoryContext, + "citus background job execution", + ALLOCSET_DEFAULT_MINSIZE, + ALLOCSET_DEFAULT_INITSIZE, + ALLOCSET_DEFAULT_MAXSIZE); + + /* Set up a dynamic shared memory segment. 
*/ + dsm_segment *seg = dsm_attach(DatumGetUInt32(main_arg)); + if (seg == NULL) + { + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("unable to map dynamic shared memory segment"))); + } + + shm_toc *toc = shm_toc_attach(CITUS_BACKGROUND_TASK_MAGIC, dsm_segment_address(seg)); + if (toc == NULL) + { + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("bad magic number in dynamic shared memory segment"))); + } + + char *database = shm_toc_lookup(toc, CITUS_BACKGROUND_TASK_KEY_DATABASE, false); + char *username = shm_toc_lookup(toc, CITUS_BACKGROUND_TASK_KEY_USERNAME, false); + char *command = shm_toc_lookup(toc, CITUS_BACKGROUND_TASK_KEY_COMMAND, false); + int64 *taskId = shm_toc_lookup(toc, CITUS_BACKGROUND_TASK_KEY_TASK_ID, false); + shm_mq *mq = shm_toc_lookup(toc, CITUS_BACKGROUND_TASK_KEY_QUEUE, false); + + shm_mq_set_sender(mq, MyProc); + shm_mq_handle *responseq = shm_mq_attach(mq, seg, NULL); + pq_redirect_to_shm_mq(seg, responseq); + + /* setup error context to indicate the errors came from a running background task */ + ErrorContextCallback errorCallback = { 0 }; + CitusBackgroundJobExecuterErrorCallbackContext context = { + .database = database, + .username = username, + }; + errorCallback.callback = CitusBackgroundJobExecuterErrorCallback; + errorCallback.arg = (void *) &context; + errorCallback.previous = error_context_stack; + error_context_stack = &errorCallback; + + BackgroundWorkerInitializeConnection(database, username, 0); + + /* make sure we are the only backend running for this task */ + LOCKTAG locktag = { 0 }; + SET_LOCKTAG_BACKGROUND_TASK(locktag, *taskId); + const bool sessionLock = true; + const bool dontWait = true; + LockAcquireResult locked = + LockAcquire(&locktag, AccessExclusiveLock, sessionLock, dontWait); + if (locked == LOCKACQUIRE_NOT_AVAIL) + { + ereport(ERROR, (errmsg("unable to acquire background task lock for taskId: %lld", + (long long) *taskId), + errdetail("this indicates that an
other backend is already " + "executing this task"))); + } + + /* Prepare to execute the query. */ + SetCurrentStatementStartTimestamp(); + debug_query_string = command; + char *appname = psprintf("citus background task queue executor (taskId %lld)", + (long long) *taskId); + pgstat_report_appname(appname); + pgstat_report_activity(STATE_RUNNING, command); + StartTransactionCommand(); + if (StatementTimeout > 0) + { + enable_timeout_after(STATEMENT_TIMEOUT, StatementTimeout); + } + else + { + disable_timeout(STATEMENT_TIMEOUT, false); + } + + /* Execute the query. */ + ExecuteSqlString(command); + + /* Post-execution cleanup. */ + disable_timeout(STATEMENT_TIMEOUT, false); + CommitTransactionCommand(); + pgstat_report_activity(STATE_IDLE, command); + pgstat_report_stat(true); + + /* Signal that we are done. */ + ReadyForQuery(DestRemote); + + dsm_detach(seg); + proc_exit(0); +} + + +/* + * Execute given SQL string without SPI or a libpq session. + */ +static void +ExecuteSqlString(const char *sql) +{ + /* + * Parse the SQL string into a list of raw parse trees. + * + * Because we allow statements that perform internal transaction control, + * we can't do this in TopTransactionContext; the parse trees might get + * blown away before we're done executing them. + */ + MemoryContext parsecontext = AllocSetContextCreate(CurrentMemoryContext, + "query parse/plan", + ALLOCSET_DEFAULT_MINSIZE, + ALLOCSET_DEFAULT_INITSIZE, + ALLOCSET_DEFAULT_MAXSIZE); + MemoryContext oldcontext = MemoryContextSwitchTo(parsecontext); + List *raw_parsetree_list = pg_parse_query(sql); + int commands_remaining = list_length(raw_parsetree_list); + bool isTopLevel = commands_remaining == 1; + MemoryContextSwitchTo(oldcontext); + + /* + * Do parse analysis, rule rewrite, planning, and execution for each raw + * parsetree. We must fully execute each query before beginning parse + * analysis on the next one, since there may be interdependencies.
+ */ + RawStmt *parsetree = NULL; + foreach_ptr(parsetree, raw_parsetree_list) + { + /* + * We don't allow transaction-control commands like COMMIT and ABORT + * here. The entire SQL statement is executed as a single transaction + * which commits if no errors are encountered. + */ + if (IsA(parsetree->stmt, TransactionStmt)) + { + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg( + "transaction control statements are not allowed in background job"))); + } + + /* + * Get the command name for use in status display (it also becomes the + * default completion tag, down inside PortalRun). Set ps_status and + * do any special start-of-SQL-command processing needed by the + * destination. + */ + CommandTag commandTag = CreateCommandTag(parsetree->stmt); + set_ps_display(GetCommandTagName(commandTag)); + BeginCommand(commandTag, DestNone); + + /* Set up a snapshot if parse analysis/planning will need one. */ + bool snapshot_set = false; + if (analyze_requires_snapshot(parsetree)) + { + PushActiveSnapshot(GetTransactionSnapshot()); + snapshot_set = true; + } + + /* + * OK to analyze, rewrite, and plan this query. + * + * As with parsing, we need to make sure this data outlives the + * transaction, because of the possibility that the statement might + * perform internal transaction control. + */ + oldcontext = MemoryContextSwitchTo(parsecontext); + +#if PG_VERSION_NUM >= PG_VERSION_15 + List *querytree_list = + pg_analyze_and_rewrite_fixedparams(parsetree, sql, NULL, 0, NULL); +#else + List *querytree_list = + pg_analyze_and_rewrite(parsetree, sql, NULL, 0, NULL); +#endif + + List *plantree_list = pg_plan_queries(querytree_list, sql, 0, NULL); + + /* Done with the snapshot used for parsing/planning */ + if (snapshot_set) + { + PopActiveSnapshot(); + } + + /* If we got a cancel signal in analysis or planning, quit */ + CHECK_FOR_INTERRUPTS(); + + /* + * Execute the query using the unnamed portal.
+ */ + Portal portal = CreatePortal("", true, true); + + /* Don't display the portal in pg_cursors */ + portal->visible = false; + PortalDefineQuery(portal, NULL, sql, commandTag, plantree_list, NULL); + PortalStart(portal, NULL, 0, InvalidSnapshot); + int16 format[] = { 1 }; + PortalSetResultFormat(portal, lengthof(format), format); /* binary format */ + + commands_remaining--; + DestReceiver *receiver = CreateDestReceiver(DestNone); + + /* + * Only once the portal and destreceiver have been established can + * we return to the transaction context. All that stuff needs to + * survive an internal commit inside PortalRun! + */ + MemoryContextSwitchTo(oldcontext); + + /* Here's where we actually execute the command. */ + QueryCompletion qc = { 0 }; + (void) PortalRun(portal, FETCH_ALL, isTopLevel, true, receiver, receiver, &qc); + + /* Clean up the receiver. */ + (*receiver->rDestroy)(receiver); + + /* + * Send a CommandComplete message even if we suppressed the query + * results. The user backend will report these in the absence of + * any true query results. + */ + EndCommand(&qc, DestRemote, false); + + /* Clean up the portal. 
*/ + PortalDrop(portal, false); + } + + /* Be sure to advance the command counter after the last script command */ + CommandCounterIncrement(); +} diff --git a/src/backend/distributed/utils/maintenanced.c b/src/backend/distributed/utils/maintenanced.c index f3e4014b2..3836c506a 100644 --- a/src/backend/distributed/utils/maintenanced.c +++ b/src/backend/distributed/utils/maintenanced.c @@ -27,11 +27,13 @@ #include "access/xlog.h" #include "catalog/pg_extension.h" #include "citus_version.h" +#include "catalog/pg_authid.h" #include "catalog/pg_namespace.h" #include "commands/async.h" #include "commands/extension.h" #include "libpq/pqsignal.h" #include "catalog/namespace.h" +#include "distributed/background_jobs.h" #include "distributed/citus_safe_lib.h" #include "distributed/distributed_deadlock_detection.h" #include "distributed/maintenanced.h" @@ -54,8 +56,10 @@ #include "storage/lwlock.h" #include "tcop/tcopprot.h" #include "common/hashfn.h" +#include "utils/builtins.h" #include "utils/memutils.h" #include "utils/lsyscache.h" +#include "distributed/resource_lock.h" /* * Shared memory data for all maintenance workers. 
@@ -93,6 +97,7 @@ typedef struct MaintenanceDaemonDBData double DistributedDeadlockDetectionTimeoutFactor = 2.0; int Recover2PCInterval = 60000; int DeferShardDeleteInterval = 15000; +int BackgroundTaskQueueCheckInterval = 5000; /* config variables for metadata sync timeout */ int MetadataSyncInterval = 60000; @@ -120,7 +125,6 @@ static void MaintenanceDaemonErrorContext(void *arg); static bool MetadataSyncTriggeredCheckAndReset(MaintenanceDaemonDBData *dbData); static void WarnMaintenanceDaemonNotStarted(void); - /* * InitializeMaintenanceDaemon, called at server start, is responsible for * requesting shared memory and related infrastructure required by maintenance @@ -277,12 +281,15 @@ CitusMaintenanceDaemonMain(Datum main_arg) TimestampTz nextStatsCollectionTime USED_WITH_LIBCURL_ONLY = TimestampTzPlusMilliseconds(GetCurrentTimestamp(), 60 * 1000); bool retryStatsCollection USED_WITH_LIBCURL_ONLY = false; - ErrorContextCallback errorCallback; TimestampTz lastRecoveryTime = 0; TimestampTz lastShardCleanTime = 0; TimestampTz lastStatStatementsPurgeTime = 0; TimestampTz nextMetadataSyncTime = 0; + /* state kept for the background tasks queue monitor */ + TimestampTz lastBackgroundTaskQueueCheck = GetCurrentTimestamp(); + BackgroundWorkerHandle *backgroundTasksQueueBgwHandle = NULL; + bool backgroundTasksQueueWarnedForLock = false; /* * We do metadata sync in a separate background worker. We need its @@ -354,6 +361,7 @@ CitusMaintenanceDaemonMain(Datum main_arg) * Do so before setting up signals etc, so we never exit without the * context setup. 
*/ + ErrorContextCallback errorCallback = { 0 }; memset(&errorCallback, 0, sizeof(errorCallback)); errorCallback.callback = MaintenanceDaemonErrorContext; errorCallback.arg = (void *) myDbData; @@ -682,6 +690,108 @@ CitusMaintenanceDaemonMain(Datum main_arg) timeout = Min(timeout, (StatStatementsPurgeInterval * 1000)); } + pid_t backgroundTaskQueueWorkerPid = 0; + BgwHandleStatus backgroundTaskQueueWorkerStatus = + backgroundTasksQueueBgwHandle != NULL ? GetBackgroundWorkerPid( + backgroundTasksQueueBgwHandle, &backgroundTaskQueueWorkerPid) : + BGWH_STOPPED; + if (!RecoveryInProgress() && BackgroundTaskQueueCheckInterval > 0 && + TimestampDifferenceExceeds(lastBackgroundTaskQueueCheck, + GetCurrentTimestamp(), + BackgroundTaskQueueCheckInterval) && + backgroundTaskQueueWorkerStatus == BGWH_STOPPED) + { + /* clear old background worker for task queue before checking for new tasks */ + if (backgroundTasksQueueBgwHandle) + { + pfree(backgroundTasksQueueBgwHandle); + backgroundTasksQueueBgwHandle = NULL; + } + + StartTransactionCommand(); + + bool shouldStartBackgroundTaskQueueBackgroundWorker = false; + if (!LockCitusExtension()) + { + ereport(DEBUG1, (errmsg("could not lock the citus extension, " + "skipping background task queue check"))); + } + else if (CheckCitusVersion(DEBUG1) && CitusHasBeenLoaded()) + { + /* perform catalog precheck */ + shouldStartBackgroundTaskQueueBackgroundWorker = + HasRunnableBackgroundTask(); + } + + CommitTransactionCommand(); + + if (shouldStartBackgroundTaskQueueBackgroundWorker) + { + /* + * Before we start the background worker we want to check if an orphaned + * one is still running. This could happen when the maintenance daemon + * restarted in a way where the background task queue monitor wasn't + * restarted. + * + * To check if an orphaned background task queue monitor is still running + * we quickly acquire the lock without waiting. If we can't acquire the + * lock this means that some other backend still has the lock.
We prevent a + * new backend from starting and log a warning that we found that another + * process still holds the lock. + */ + LOCKTAG tag = { 0 }; + SET_LOCKTAG_CITUS_OPERATION(tag, CITUS_BACKGROUND_TASK_MONITOR); + const bool sessionLock = false; + const bool dontWait = true; + LockAcquireResult locked = + LockAcquire(&tag, AccessExclusiveLock, sessionLock, dontWait); + + if (locked == LOCKACQUIRE_NOT_AVAIL) + { + if (!backgroundTasksQueueWarnedForLock) + { + ereport(WARNING, (errmsg("background task queue monitor already " + "held"), + errdetail("the background task queue monitor " + "lock is held by another backend, " + "indicating the maintenance daemon " + "has lost track of an already " + "running background task queue " + "monitor, not starting a new one"))); + backgroundTasksQueueWarnedForLock = true; + } + } + else + { + LockRelease(&tag, AccessExclusiveLock, sessionLock); + + /* we were able to acquire the lock, reset the warning tracker */ + backgroundTasksQueueWarnedForLock = false; + + /* spawn background worker */ + ereport(LOG, (errmsg("found scheduled background tasks, starting new " + "background task queue monitor"))); + + backgroundTasksQueueBgwHandle = + StartCitusBackgroundTaskQueueMonitor(MyDatabaseId, + myDbData->userOid); + + if (!backgroundTasksQueueBgwHandle || + GetBackgroundWorkerPid(backgroundTasksQueueBgwHandle, + &backgroundTaskQueueWorkerPid) == + BGWH_STOPPED) + { + ereport(WARNING, (errmsg("unable to start background worker for " + "background task execution"))); + } + } + } + + /* interval management */ + lastBackgroundTaskQueueCheck = GetCurrentTimestamp(); + timeout = Min(timeout, BackgroundTaskQueueCheckInterval); + } + /* * Wait until timeout, or until somebody wakes us up. Also cast the timeout to * integer where we've calculated it using double for not losing the precision. 
diff --git a/src/include/distributed/background_jobs.h b/src/include/distributed/background_jobs.h new file mode 100644 index 000000000..e38e57569 --- /dev/null +++ b/src/include/distributed/background_jobs.h @@ -0,0 +1,23 @@ +/*------------------------------------------------------------------------- + * + * background_jobs.h + * Functions related to running the background tasks queue monitor. + * + * Copyright (c) Citus Data, Inc. + * + *------------------------------------------------------------------------- + */ + +#ifndef CITUS_BACKGROUND_JOBS_H +#define CITUS_BACKGROUND_JOBS_H + +#include "postgres.h" + +#include "postmaster/bgworker.h" + +extern BackgroundWorkerHandle * StartCitusBackgroundTaskQueueMonitor(Oid database, + Oid extensionOwner); +extern void CitusBackgroundTaskQueueMonitorMain(Datum arg); +extern void CitusBackgroundTaskExecuter(Datum main_arg); + +#endif /*CITUS_BACKGROUND_JOBS_H */ diff --git a/src/include/distributed/metadata_cache.h b/src/include/distributed/metadata_cache.h index a6b3732c4..8632bae9c 100644 --- a/src/include/distributed/metadata_cache.h +++ b/src/include/distributed/metadata_cache.h @@ -230,6 +230,8 @@ extern Oid DistPartitionRelationId(void); extern Oid DistShardRelationId(void); extern Oid DistPlacementRelationId(void); extern Oid DistNodeRelationId(void); +extern Oid DistBackgroundJobRelationId(void); +extern Oid DistBackgroundTaskRelationId(void); extern Oid DistRebalanceStrategyRelationId(void); extern Oid DistLocalGroupIdRelationId(void); extern Oid DistObjectRelationId(void); @@ -239,6 +241,13 @@ extern Oid DistEnabledCustomAggregatesId(void); extern Oid DistNodeNodeIdIndexId(void); extern Oid DistPartitionLogicalRelidIndexId(void); extern Oid DistPartitionColocationidIndexId(void); +extern Oid DistBackgroundJobPKeyIndexId(void); +extern Oid DistBackgroundTaskPKeyIndexId(void); +extern Oid DistBackgroundTaskJobIdTaskIdIndexId(void); +extern Oid DistBackgroundTaskStatusTaskIdIndexId(void); +extern Oid 
DistBackgroundTaskDependRelationId(void); +extern Oid DistBackgroundTaskDependTaskIdIndexId(void); +extern Oid DistBackgroundTaskDependDependsOnIndexId(void); extern Oid DistShardLogicalRelidIndexId(void); extern Oid DistShardShardidIndexId(void); extern Oid DistPlacementShardidIndexId(void); @@ -250,6 +259,10 @@ extern Oid DistPlacementGroupidIndexId(void); extern Oid DistObjectPrimaryKeyIndexId(void); extern Oid DistCleanupPrimaryKeyIndexId(void); +/* sequence oids */ +extern Oid DistBackgroundJobJobIdSequenceId(void); +extern Oid DistBackgroundTaskTaskIdSequenceId(void); + /* type oids */ extern Oid LookupTypeOid(char *schemaNameSting, char *typeNameString); extern Oid CitusCopyFormatTypeId(void); @@ -272,6 +285,21 @@ extern Oid SecondaryNodeRoleId(void); extern Oid CitusCopyFormatTypeId(void); extern Oid TextCopyFormatId(void); extern Oid BinaryCopyFormatId(void); +extern Oid CitusJobStatusScheduledId(void); +extern Oid CitusJobStatusRunningId(void); +extern Oid CitusJobStatusCancellingId(void); +extern Oid CitusJobStatusFinishedId(void); +extern Oid CitusJobStatusCancelledId(void); +extern Oid CitusJobStatusFailedId(void); +extern Oid CitusJobStatusFailingId(void); +extern Oid CitusTaskStatusBlockedId(void); +extern Oid CitusTaskStatusRunnableId(void); +extern Oid CitusTaskStatusRunningId(void); +extern Oid CitusTaskStatusDoneId(void); +extern Oid CitusTaskStatusErrorId(void); +extern Oid CitusTaskStatusUnscheduledId(void); +extern Oid CitusTaskStatusCancelledId(void); +extern Oid CitusTaskStatusCancellingId(void); /* user related functions */ extern Oid CitusExtensionOwner(void); diff --git a/src/include/distributed/metadata_utility.h b/src/include/distributed/metadata_utility.h index 793790fe0..748c9ff81 100644 --- a/src/include/distributed/metadata_utility.h +++ b/src/include/distributed/metadata_utility.h @@ -203,6 +203,74 @@ typedef enum SizeQueryType TABLE_SIZE /* pg_table_size() */ } SizeQueryType; +typedef enum BackgroundJobStatus +{ + 
BACKGROUND_JOB_STATUS_SCHEDULED, + BACKGROUND_JOB_STATUS_RUNNING, + BACKGROUND_JOB_STATUS_FINISHED, + BACKGROUND_JOB_STATUS_CANCELLING, + BACKGROUND_JOB_STATUS_CANCELLED, + BACKGROUND_JOB_STATUS_FAILING, + BACKGROUND_JOB_STATUS_FAILED +} BackgroundJobStatus; + +typedef struct BackgroundJob +{ + int64 jobid; + BackgroundJobStatus state; + char *jobType; + char *description; + TimestampTz *started_at; + TimestampTz *finished_at; + + /* extra space to store values for nullable value types above */ + struct + { + TimestampTz started_at; + TimestampTz finished_at; + } __nullable_storage; +} BackgroundJob; + +typedef enum BackgroundTaskStatus +{ + BACKGROUND_TASK_STATUS_BLOCKED, + BACKGROUND_TASK_STATUS_RUNNABLE, + BACKGROUND_TASK_STATUS_RUNNING, + BACKGROUND_TASK_STATUS_CANCELLING, + BACKGROUND_TASK_STATUS_DONE, + BACKGROUND_TASK_STATUS_ERROR, + BACKGROUND_TASK_STATUS_UNSCHEDULED, + BACKGROUND_TASK_STATUS_CANCELLED +} BackgroundTaskStatus; + +typedef struct BackgroundTask +{ + int64 jobid; + int64 taskid; + Oid owner; + int32 *pid; + BackgroundTaskStatus status; + char *command; + int32 *retry_count; + TimestampTz *not_before; + char *message; + + /* extra space to store values for nullable value types above */ + struct + { + int32 pid; + int32 retry_count; + TimestampTz not_before; + } __nullable_storage; +} BackgroundTask; +/* store value in the struct's __nullable_storage and point the nullable field at it */ +#define SET_NULLABLE_FIELD(ptr, field, value) \ + do { (ptr)->__nullable_storage.field = (value); \ + (ptr)->field = &((ptr)->__nullable_storage.field); } while (0) +/* set the nullable field to NULL and zero its slot in __nullable_storage */ +#define UNSET_NULLABLE_FIELD(ptr, field) \ + do { (ptr)->field = NULL; \ + memset_struct_0((ptr)->__nullable_storage.field); } while (0) /* Size functions */ extern Datum citus_table_size(PG_FUNCTION_ARGS); @@ -315,4 +383,27 @@ extern void EnsureSequenceTypeSupported(Oid seqOid, Oid attributeTypeId, Oid ownerRelationId); extern void AlterSequenceType(Oid seqOid, Oid typeOid); extern void EnsureRelationHasCompatibleSequenceTypes(Oid relationId); +extern bool HasRunnableBackgroundTask(void); +extern int64
CreateBackgroundJob(const char *jobType, const char *description); +extern BackgroundTask * ScheduleBackgroundTask(int64 jobId, Oid owner, char *command, + int dependingTaskCount, + int64 dependingTaskIds[]); +extern BackgroundTask * GetRunnableBackgroundTask(void); +extern void ResetRunningBackgroundTasks(void); +extern BackgroundJob * GetBackgroundJobByJobId(int64 jobId); +extern BackgroundTask * GetBackgroundTaskByTaskId(int64 taskId); +extern void UpdateBackgroundJob(int64 jobId); +extern void UpdateBackgroundTask(BackgroundTask *task); +extern void UpdateJobStatus(int64 taskId, const pid_t *pid, BackgroundTaskStatus status, + const int32 *retry_count, char *message); +extern bool UpdateJobError(BackgroundTask *job, ErrorData *edata); +extern List * CancelTasksForJob(int64 jobid); +extern void UnscheduleDependentTasks(BackgroundTask *task); +extern void UnblockDependingBackgroundTasks(BackgroundTask *task); +extern BackgroundJobStatus BackgroundJobStatusByOid(Oid enumOid); +extern BackgroundTaskStatus BackgroundTaskStatusByOid(Oid enumOid); +extern bool IsBackgroundJobStatusTerminal(BackgroundJobStatus status); +extern bool IsBackgroundTaskStatusTerminal(BackgroundTaskStatus status); +extern Oid BackgroundJobStatusOid(BackgroundJobStatus status); +extern Oid BackgroundTaskStatusOid(BackgroundTaskStatus status); #endif /* METADATA_UTILITY_H */ diff --git a/src/include/distributed/pg_dist_background_job.h b/src/include/distributed/pg_dist_background_job.h new file mode 100644 index 000000000..3a0838c17 --- /dev/null +++ b/src/include/distributed/pg_dist_background_job.h @@ -0,0 +1,26 @@ +/*------------------------------------------------------------------------- + * + * pg_dist_background_job.h + * definition of the relation that holds the jobs metadata + * + * Copyright (c) Citus Data, Inc. 
+ * + *------------------------------------------------------------------------- + */ + +#ifndef CITUS_PG_DIST_BACKGROUND_JOB_H +#define CITUS_PG_DIST_BACKGROUND_JOB_H + +/* ---------------- + * compiler constants for pg_dist_background_job + * ---------------- + */ +#define Natts_pg_dist_background_job 6 +#define Anum_pg_dist_background_job_job_id 1 +#define Anum_pg_dist_background_job_state 2 +#define Anum_pg_dist_background_job_job_type 3 +#define Anum_pg_dist_background_job_description 4 +#define Anum_pg_dist_background_job_started_at 5 +#define Anum_pg_dist_background_job_finished_at 6 + +#endif /* CITUS_PG_DIST_BACKGROUND_JOB_H */ diff --git a/src/include/distributed/pg_dist_background_task.h b/src/include/distributed/pg_dist_background_task.h new file mode 100644 index 000000000..b6d132e59 --- /dev/null +++ b/src/include/distributed/pg_dist_background_task.h @@ -0,0 +1,29 @@ +/*------------------------------------------------------------------------- + * + * pg_dist_background_task.h + * definition of the relation that holds the tasks metadata + * + * Copyright (c) Citus Data, Inc. 
+ * + *------------------------------------------------------------------------- + */ + +#ifndef CITUS_PG_DIST_BACKGROUND_TASK_H +#define CITUS_PG_DIST_BACKGROUND_TASK_H + +/* ---------------- + * compiler constants for pg_dist_background_task + * ---------------- + */ +#define Natts_pg_dist_background_task 9 +#define Anum_pg_dist_background_task_job_id 1 +#define Anum_pg_dist_background_task_task_id 2 +#define Anum_pg_dist_background_task_owner 3 +#define Anum_pg_dist_background_task_pid 4 +#define Anum_pg_dist_background_task_status 5 +#define Anum_pg_dist_background_task_command 6 +#define Anum_pg_dist_background_task_retry_count 7 +#define Anum_pg_dist_background_task_not_before 8 +#define Anum_pg_dist_background_task_message 9 + +#endif /* CITUS_PG_DIST_BACKGROUND_TASK_H */ diff --git a/src/include/distributed/pg_dist_backrgound_task_depend.h b/src/include/distributed/pg_dist_backrgound_task_depend.h new file mode 100644 index 000000000..1df24724b --- /dev/null +++ b/src/include/distributed/pg_dist_backrgound_task_depend.h @@ -0,0 +1,35 @@ +/*------------------------------------------------------------------------- + * + * pg_dist_background_task_depend.h + * definition of the relation that holds which tasks depend on each + * other. + * + * Copyright (c) Citus Data, Inc. 
+ * + *------------------------------------------------------------------------- + */ + +#ifndef CITUS_PG_DIST_BACKGROUND_TASK_DEPEND_H +#define CITUS_PG_DIST_BACKGROUND_TASK_DEPEND_H + +typedef struct FormData_pg_dist_background_task_depend +{ + int64 job_id; + int64 task_id; + int64 depends_on; +#ifdef CATALOG_VARLEN /* variable-length fields start here */ +#endif +} FormData_pg_dist_background_task_depend; +typedef FormData_pg_dist_background_task_depend *Form_pg_dist_background_task_depend; + + +/* ---------------- + * compiler constants for pg_dist_background_task_depend + * ---------------- + */ +#define Natts_pg_dist_background_task_depend 3 +#define Anum_pg_dist_background_task_depend_job_id 1 +#define Anum_pg_dist_background_task_depend_task_id 2 +#define Anum_pg_dist_background_task_depend_depends_on 3 + +#endif /* CITUS_PG_DIST_BACKGROUND_TASK_DEPEND_H */ diff --git a/src/include/distributed/resource_lock.h b/src/include/distributed/resource_lock.h index 0a7ea674b..9e143e467 100644 --- a/src/include/distributed/resource_lock.h +++ b/src/include/distributed/resource_lock.h @@ -42,7 +42,8 @@ typedef enum AdvisoryLocktagClass ADV_LOCKTAG_CLASS_CITUS_OPERATIONS = 9, ADV_LOCKTAG_CLASS_CITUS_CLEANUP_OPERATION_ID = 10, ADV_LOCKTAG_CLASS_CITUS_LOGICAL_REPLICATION = 12, - ADV_LOCKTAG_CLASS_CITUS_REBALANCE_PLACEMENT_COLOCATION = 13 + ADV_LOCKTAG_CLASS_CITUS_REBALANCE_PLACEMENT_COLOCATION = 13, + ADV_LOCKTAG_CLASS_CITUS_BACKGROUND_TASK = 14 } AdvisoryLocktagClass; /* CitusOperations has constants for citus operations */ @@ -52,7 +53,8 @@ typedef enum CitusOperations CITUS_NONBLOCKING_SPLIT = 1, CITUS_CREATE_DISTRIBUTED_TABLE_CONCURRENTLY = 2, CITUS_CREATE_COLOCATION_DEFAULT = 3, - CITUS_SHARD_MOVE = 4 + CITUS_SHARD_MOVE = 4, + CITUS_BACKGROUND_TASK_MONITOR = 5 } CitusOperations; /* reuse advisory lock, but with different, unused field 4 (4)*/ @@ -129,6 +131,16 @@ typedef enum CitusOperations (uint32) 0, \ ADV_LOCKTAG_CLASS_CITUS_LOGICAL_REPLICATION) +/* reuse 
advisory lock, but with different, unused field 4 (14) + * Also it has the database hardcoded to MyDatabaseId, to ensure the locks + * are local to each database */ +#define SET_LOCKTAG_BACKGROUND_TASK(tag, taskId) \ + SET_LOCKTAG_ADVISORY(tag, \ + MyDatabaseId, \ + (uint32) ((taskId) >> 32), \ + (uint32) (taskId), \ + ADV_LOCKTAG_CLASS_CITUS_BACKGROUND_TASK) + /* * DistLockConfigs are used to configure the locking behaviour of AcquireDistributedLockOnRelations */ diff --git a/src/include/distributed/shard_cleaner.h b/src/include/distributed/shard_cleaner.h index df316f493..d5d2a3617 100644 --- a/src/include/distributed/shard_cleaner.h +++ b/src/include/distributed/shard_cleaner.h @@ -13,6 +13,7 @@ /* GUC to configure deferred shard deletion */ extern int DeferShardDeleteInterval; +extern int BackgroundTaskQueueCheckInterval; extern bool DeferShardDeleteOnMove; extern double DesiredPercentFreeAfterMove; extern bool CheckAvailableSpaceBeforeMove; diff --git a/src/test/regress/expected/background_task_queue_monitor.out b/src/test/regress/expected/background_task_queue_monitor.out new file mode 100644 index 000000000..49744771d --- /dev/null +++ b/src/test/regress/expected/background_task_queue_monitor.out @@ -0,0 +1,218 @@ +CREATE SCHEMA background_task_queue_monitor; +SET search_path TO background_task_queue_monitor; +SET citus.shard_count TO 4; +SET citus.shard_replication_factor TO 1; +SET citus.next_shard_id TO 3536400; +ALTER SYSTEM SET citus.background_task_queue_interval TO '1s'; +SELECT pg_reload_conf(); + pg_reload_conf +--------------------------------------------------------------------- + t +(1 row) + +CREATE TABLE results (a int); +-- simple job that inserts 1 into results to show that query runs +SELECT a FROM results WHERE a = 1; -- verify result is not in there + a +--------------------------------------------------------------------- +(0 rows) + +INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify 
background execution') RETURNING job_id \gset +INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id, $job$ INSERT INTO background_task_queue_monitor.results VALUES ( 1 ); $job$) RETURNING task_id \gset +SELECT citus_job_wait(:job_id); -- wait for the job to be finished + citus_job_wait +--------------------------------------------------------------------- + +(1 row) + +SELECT a FROM results WHERE a = 1; -- verify result is there + a +--------------------------------------------------------------------- + 1 +(1 row) + +-- cancel a scheduled job +INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job','cancel a scheduled job') RETURNING job_id \gset +INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id, $job$ SELECT pg_sleep(5); $job$) RETURNING task_id \gset +SELECT citus_job_cancel(:job_id); + citus_job_cancel +--------------------------------------------------------------------- + +(1 row) + +SELECT citus_job_wait(:job_id); + citus_job_wait +--------------------------------------------------------------------- + +(1 row) + +-- verify we get an error when waiting for a job to reach a specific status while it is already in a different terminal status +SELECT citus_job_wait(:job_id, desired_status => 'finished'); +ERROR: Job reached terminal state "cancelled" instead of desired state "finished" +-- show that the status has been cancelled +SELECT state, NOT(started_at IS NULL) AS did_start FROM pg_dist_background_job WHERE job_id = :job_id; + state | did_start +--------------------------------------------------------------------- + cancelled | f +(1 row) + +SELECT status, NOT(message = '') AS did_start FROM pg_dist_background_task WHERE job_id = :job_id ORDER BY task_id ASC; + status | did_start +--------------------------------------------------------------------- + cancelled | f +(1 row) + +-- cancel a running job +INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job','cancelling a task 
after it started') RETURNING job_id \gset +INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id, $job$ SELECT pg_sleep(5); $job$) RETURNING task_id \gset +SELECT citus_job_wait(:job_id, desired_status => 'running'); + citus_job_wait +--------------------------------------------------------------------- + +(1 row) + +SELECT citus_job_cancel(:job_id); + citus_job_cancel +--------------------------------------------------------------------- + +(1 row) + +SELECT citus_job_wait(:job_id); + citus_job_wait +--------------------------------------------------------------------- + +(1 row) + +-- show that the status has been cancelled +SELECT state, NOT(started_at IS NULL) AS did_start FROM pg_dist_background_job WHERE job_id = :job_id; + state | did_start +--------------------------------------------------------------------- + cancelled | t +(1 row) + +SELECT status, NOT(message = '') AS did_start FROM pg_dist_background_task WHERE job_id = :job_id ORDER BY task_id ASC; + status | did_start +--------------------------------------------------------------------- + cancelled | t +(1 row) + +-- test a failing task becomes runnable in the future again +-- we cannot fully test the backoff strategy currently as it is hard coded to take about 50 minutes. 
+INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'failure test due to division by zero') RETURNING job_id \gset +INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id, $job$ SELECT 1/0; $job$) RETURNING task_id \gset +SELECT citus_job_wait(:job_id, desired_status => 'running'); + citus_job_wait +--------------------------------------------------------------------- + +(1 row) + +SELECT pg_sleep(.1); -- make sure it has time to error after it started running + pg_sleep +--------------------------------------------------------------------- + +(1 row) + +SELECT status, pid, retry_count, NOT(message = '') AS has_message, (not_before > now()) AS scheduled_into_the_future FROM pg_dist_background_task WHERE job_id = :job_id ORDER BY task_id ASC; + status | pid | retry_count | has_message | scheduled_into_the_future +--------------------------------------------------------------------- + runnable | | 1 | t | t +(1 row) + +-- test cancelling a failed/retrying job +SELECT citus_job_cancel(:job_id); + citus_job_cancel +--------------------------------------------------------------------- + +(1 row) + +SELECT state, NOT(started_at IS NULL) AS did_start FROM pg_dist_background_job WHERE job_id = :job_id; + state | did_start +--------------------------------------------------------------------- + cancelled | t +(1 row) + +SELECT status, NOT(message = '') AS did_start FROM pg_dist_background_task WHERE job_id = :job_id ORDER BY task_id ASC; + status | did_start +--------------------------------------------------------------------- + cancelled | t +(1 row) + +-- test running two dependant tasks +TRUNCATE TABLE results; +BEGIN; +INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify background execution') RETURNING job_id \gset +INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id, $job$ INSERT INTO background_task_queue_monitor.results VALUES ( 5 ); $job$) RETURNING 
task_id AS task_id1 \gset +INSERT INTO pg_dist_background_task (job_id, status, command) VALUES (:job_id, 'blocked', $job$ UPDATE background_task_queue_monitor.results SET a = a * 7; $job$) RETURNING task_id AS task_id2 \gset +INSERT INTO pg_dist_background_task (job_id, status, command) VALUES (:job_id, 'blocked', $job$ UPDATE background_task_queue_monitor.results SET a = a + 13; $job$) RETURNING task_id AS task_id3 \gset +INSERT INTO pg_dist_background_task_depend (job_id, task_id, depends_on) VALUES (:job_id, :task_id2, :task_id1); +INSERT INTO pg_dist_background_task_depend (job_id, task_id, depends_on) VALUES (:job_id, :task_id3, :task_id2); +COMMIT; +SELECT citus_job_wait(:job_id); -- wait for the job to be finished + citus_job_wait +--------------------------------------------------------------------- + +(1 row) + +SELECT a FROM results; + a +--------------------------------------------------------------------- + 48 +(1 row) + +-- test running two dependant tasks, with a failing task that we cancel +TRUNCATE TABLE results; +BEGIN; +INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify background execution') RETURNING job_id \gset +INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id, $job$ INSERT INTO background_task_queue_monitor.results VALUES ( 5 ); $job$) RETURNING task_id AS task_id1 \gset +INSERT INTO pg_dist_background_task (job_id, status, command) VALUES (:job_id, 'blocked', $job$ SELECT 1/0; $job$) RETURNING task_id AS task_id2 \gset +INSERT INTO pg_dist_background_task (job_id, status, command) VALUES (:job_id, 'blocked', $job$ UPDATE background_task_queue_monitor.results SET a = a + 13; $job$) RETURNING task_id AS task_id3 \gset +INSERT INTO pg_dist_background_task_depend (job_id, task_id, depends_on) VALUES (:job_id, :task_id2, :task_id1); +INSERT INTO pg_dist_background_task_depend (job_id, task_id, depends_on) VALUES (:job_id, :task_id3, :task_id2); +COMMIT; +SELECT 
citus_job_wait(:job_id, desired_status => 'running'); -- wait for the job to be running, and possibly hitting a failure + citus_job_wait +--------------------------------------------------------------------- + +(1 row) + +SELECT pg_sleep(.1); -- improve chances of hitting the failure + pg_sleep +--------------------------------------------------------------------- + +(1 row) + +SELECT citus_job_cancel(:job_id); + citus_job_cancel +--------------------------------------------------------------------- + +(1 row) + +SELECT citus_job_wait(:job_id); -- wait for the job to be cancelled + citus_job_wait +--------------------------------------------------------------------- + +(1 row) + +SELECT state, NOT(started_at IS NULL) AS did_start FROM pg_dist_background_job WHERE job_id = :job_id; + state | did_start +--------------------------------------------------------------------- + cancelled | t +(1 row) + +SELECT status, NOT(message = '') AS did_start FROM pg_dist_background_task WHERE job_id = :job_id ORDER BY task_id ASC; + status | did_start +--------------------------------------------------------------------- + done | t + cancelled | t + cancelled | f +(3 rows) + +SET client_min_messages TO WARNING; +DROP SCHEMA background_task_queue_monitor CASCADE; +ALTER SYSTEM RESET citus.background_task_queue_interval; +SELECT pg_reload_conf(); + pg_reload_conf +--------------------------------------------------------------------- + t +(1 row) + diff --git a/src/test/regress/expected/multi_extension.out b/src/test/regress/expected/multi_extension.out index c79771817..34d98b7d8 100644 --- a/src/test/regress/expected/multi_extension.out +++ b/src/test/regress/expected/multi_extension.out @@ -1138,6 +1138,8 @@ SELECT * FROM multi_extension.print_extension_changes(); table columnar.stripe | | function citus_cleanup_orphaned_resources() | function citus_internal_delete_partition_metadata(regclass) void + | function citus_job_cancel(bigint) void + | function 
citus_job_wait(bigint,citus_job_status) void | function citus_locks() SETOF record | function citus_split_shard_by_split_points(bigint,text[],integer[],citus.shard_transfer_mode) void | function create_distributed_table_concurrently(regclass,text,citus.distribution_type,text,integer) void @@ -1147,14 +1149,21 @@ SELECT * FROM multi_extension.print_extension_changes(); | function worker_split_copy(bigint,text,split_copy_info[]) void | function worker_split_shard_release_dsm() void | function worker_split_shard_replication_setup(split_shard_info[]) SETOF replication_slot_info + | sequence pg_dist_background_job_job_id_seq + | sequence pg_dist_background_task_task_id_seq | sequence pg_dist_cleanup_recordid_seq | sequence pg_dist_operationid_seq + | table pg_dist_background_job + | table pg_dist_background_task + | table pg_dist_background_task_depend | table pg_dist_cleanup + | type citus_job_status + | type citus_task_status | type replication_slot_info | type split_copy_info | type split_shard_info | view citus_locks -(41 rows) +(50 rows) DROP TABLE multi_extension.prev_objects, multi_extension.extension_diff; -- show running version diff --git a/src/test/regress/expected/upgrade_basic_after.out b/src/test/regress/expected/upgrade_basic_after.out index ad77b2157..3448b2fc5 100644 --- a/src/test/regress/expected/upgrade_basic_after.out +++ b/src/test/regress/expected/upgrade_basic_after.out @@ -51,6 +51,18 @@ SELECT nextval('pg_dist_cleanup_recordid_seq') = MAX(record_id)+1 FROM pg_dist_c (1 row) +SELECT nextval('pg_dist_background_job_job_id_seq') > COALESCE(MAX(job_id), 0) FROM pg_dist_background_job; + ?column? +--------------------------------------------------------------------- + t +(1 row) + +SELECT nextval('pg_dist_background_task_task_id_seq') > COALESCE(MAX(task_id), 0) FROM pg_dist_background_task; + ?column? 
+--------------------------------------------------------------------- + t +(1 row) + -- If this query gives output it means we've added a new sequence that should -- possibly be restored after upgrades. SELECT sequence_name FROM information_schema.sequences @@ -63,7 +75,9 @@ SELECT sequence_name FROM information_schema.sequences 'pg_dist_node_nodeid_seq', 'pg_dist_colocationid_seq', 'pg_dist_operationid_seq', - 'pg_dist_cleanup_recordid_seq' + 'pg_dist_cleanup_recordid_seq', + 'pg_dist_background_job_job_id_seq', + 'pg_dist_background_task_task_id_seq' ); sequence_name --------------------------------------------------------------------- diff --git a/src/test/regress/expected/upgrade_list_citus_objects.out b/src/test/regress/expected/upgrade_list_citus_objects.out index bf15d2fbf..2ddde1310 100644 --- a/src/test/regress/expected/upgrade_list_citus_objects.out +++ b/src/test/regress/expected/upgrade_list_citus_objects.out @@ -74,6 +74,8 @@ ORDER BY 1; function citus_internal_update_relation_colocation(oid,integer) function citus_is_coordinator() function citus_isolation_test_session_is_blocked(integer,integer[]) + function citus_job_cancel(bigint) + function citus_job_wait(bigint,citus_job_status) function citus_json_concatenate(json,json) function citus_json_concatenate_final(json) function citus_jsonb_concatenate(jsonb,jsonb) @@ -236,6 +238,8 @@ ORDER BY 1; function worker_split_shard_replication_setup(split_shard_info[]) schema citus schema citus_internal + sequence pg_dist_background_job_job_id_seq + sequence pg_dist_background_task_task_id_seq sequence pg_dist_cleanup_recordid_seq sequence pg_dist_colocationid_seq sequence pg_dist_groupid_seq @@ -244,6 +248,9 @@ ORDER BY 1; sequence pg_dist_placement_placementid_seq sequence pg_dist_shardid_seq table pg_dist_authinfo + table pg_dist_background_job + table pg_dist_background_task + table pg_dist_background_task_depend table pg_dist_cleanup table pg_dist_colocation table pg_dist_local_group @@ -259,6 +266,8 @@ 
ORDER BY 1; type citus.distribution_type type citus.shard_transfer_mode type citus_copy_format + type citus_job_status + type citus_task_status type noderole type replication_slot_info type split_copy_info @@ -274,5 +283,5 @@ ORDER BY 1; view citus_stat_statements view pg_dist_shard_placement view time_partitions -(266 rows) +(275 rows) diff --git a/src/test/regress/multi_schedule b/src/test/regress/multi_schedule index ef64e7657..c7b804f85 100644 --- a/src/test/regress/multi_schedule +++ b/src/test/regress/multi_schedule @@ -98,6 +98,7 @@ test: issue_5248 issue_5099 test: object_propagation_debug test: undistribute_table test: run_command_on_all_nodes +test: background_task_queue_monitor # --------- diff --git a/src/test/regress/sql/background_task_queue_monitor.sql b/src/test/regress/sql/background_task_queue_monitor.sql new file mode 100644 index 000000000..d8dddc720 --- /dev/null +++ b/src/test/regress/sql/background_task_queue_monitor.sql @@ -0,0 +1,98 @@ +CREATE SCHEMA background_task_queue_monitor; +SET search_path TO background_task_queue_monitor; +SET citus.shard_count TO 4; +SET citus.shard_replication_factor TO 1; +SET citus.next_shard_id TO 3536400; + +ALTER SYSTEM SET citus.background_task_queue_interval TO '1s'; +SELECT pg_reload_conf(); + +CREATE TABLE results (a int); + +-- simple job that inserts 1 into results to show that query runs +SELECT a FROM results WHERE a = 1; -- verify result is not in there +INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify background execution') RETURNING job_id \gset +INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id, $job$ INSERT INTO background_task_queue_monitor.results VALUES ( 1 ); $job$) RETURNING task_id \gset +SELECT citus_job_wait(:job_id); -- wait for the job to be finished +SELECT a FROM results WHERE a = 1; -- verify result is there + +-- cancel a scheduled job +INSERT INTO pg_dist_background_job (job_type, description) VALUES 
('test_job','cancel a scheduled job') RETURNING job_id \gset +INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id, $job$ SELECT pg_sleep(5); $job$) RETURNING task_id \gset + +SELECT citus_job_cancel(:job_id); +SELECT citus_job_wait(:job_id); + +-- verify we get an error when waiting for a job to reach a specific status while it is already in a different terminal status +SELECT citus_job_wait(:job_id, desired_status => 'finished'); + +-- show that the status has been cancelled +SELECT state, NOT(started_at IS NULL) AS did_start FROM pg_dist_background_job WHERE job_id = :job_id; +SELECT status, NOT(message = '') AS did_start FROM pg_dist_background_task WHERE job_id = :job_id ORDER BY task_id ASC; + +-- cancel a running job +INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job','cancelling a task after it started') RETURNING job_id \gset +INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id, $job$ SELECT pg_sleep(5); $job$) RETURNING task_id \gset + +SELECT citus_job_wait(:job_id, desired_status => 'running'); +SELECT citus_job_cancel(:job_id); +SELECT citus_job_wait(:job_id); + +-- show that the status has been cancelled +SELECT state, NOT(started_at IS NULL) AS did_start FROM pg_dist_background_job WHERE job_id = :job_id; +SELECT status, NOT(message = '') AS did_start FROM pg_dist_background_task WHERE job_id = :job_id ORDER BY task_id ASC; + +-- test a failing task becomes runnable in the future again +-- we cannot fully test the backoff strategy currently as it is hard coded to take about 50 minutes. 
+INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'failure test due to division by zero') RETURNING job_id \gset +INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id, $job$ SELECT 1/0; $job$) RETURNING task_id \gset + +SELECT citus_job_wait(:job_id, desired_status => 'running'); +SELECT pg_sleep(.1); -- make sure it has time to error after it started running + +SELECT status, pid, retry_count, NOT(message = '') AS has_message, (not_before > now()) AS scheduled_into_the_future FROM pg_dist_background_task WHERE job_id = :job_id ORDER BY task_id ASC; + +-- test cancelling a failed/retrying job +SELECT citus_job_cancel(:job_id); + +SELECT state, NOT(started_at IS NULL) AS did_start FROM pg_dist_background_job WHERE job_id = :job_id; +SELECT status, NOT(message = '') AS did_start FROM pg_dist_background_task WHERE job_id = :job_id ORDER BY task_id ASC; + +-- test running two dependant tasks +TRUNCATE TABLE results; +BEGIN; +INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify background execution') RETURNING job_id \gset +INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id, $job$ INSERT INTO background_task_queue_monitor.results VALUES ( 5 ); $job$) RETURNING task_id AS task_id1 \gset +INSERT INTO pg_dist_background_task (job_id, status, command) VALUES (:job_id, 'blocked', $job$ UPDATE background_task_queue_monitor.results SET a = a * 7; $job$) RETURNING task_id AS task_id2 \gset +INSERT INTO pg_dist_background_task (job_id, status, command) VALUES (:job_id, 'blocked', $job$ UPDATE background_task_queue_monitor.results SET a = a + 13; $job$) RETURNING task_id AS task_id3 \gset +INSERT INTO pg_dist_background_task_depend (job_id, task_id, depends_on) VALUES (:job_id, :task_id2, :task_id1); +INSERT INTO pg_dist_background_task_depend (job_id, task_id, depends_on) VALUES (:job_id, :task_id3, :task_id2); +COMMIT; + +SELECT citus_job_wait(:job_id); -- wait for 
the job to be finished +SELECT a FROM results; + +-- test running two dependant tasks, with a failing task that we cancel +TRUNCATE TABLE results; +BEGIN; +INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify background execution') RETURNING job_id \gset +INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id, $job$ INSERT INTO background_task_queue_monitor.results VALUES ( 5 ); $job$) RETURNING task_id AS task_id1 \gset +INSERT INTO pg_dist_background_task (job_id, status, command) VALUES (:job_id, 'blocked', $job$ SELECT 1/0; $job$) RETURNING task_id AS task_id2 \gset +INSERT INTO pg_dist_background_task (job_id, status, command) VALUES (:job_id, 'blocked', $job$ UPDATE background_task_queue_monitor.results SET a = a + 13; $job$) RETURNING task_id AS task_id3 \gset +INSERT INTO pg_dist_background_task_depend (job_id, task_id, depends_on) VALUES (:job_id, :task_id2, :task_id1); +INSERT INTO pg_dist_background_task_depend (job_id, task_id, depends_on) VALUES (:job_id, :task_id3, :task_id2); +COMMIT; + +SELECT citus_job_wait(:job_id, desired_status => 'running'); -- wait for the job to be running, and possibly hitting a failure +SELECT pg_sleep(.1); -- improve chances of hitting the failure + +SELECT citus_job_cancel(:job_id); +SELECT citus_job_wait(:job_id); -- wait for the job to be cancelled +SELECT state, NOT(started_at IS NULL) AS did_start FROM pg_dist_background_job WHERE job_id = :job_id; +SELECT status, NOT(message = '') AS did_start FROM pg_dist_background_task WHERE job_id = :job_id ORDER BY task_id ASC; + +SET client_min_messages TO WARNING; +DROP SCHEMA background_task_queue_monitor CASCADE; + +ALTER SYSTEM RESET citus.background_task_queue_interval; +SELECT pg_reload_conf(); diff --git a/src/test/regress/sql/upgrade_basic_after.sql b/src/test/regress/sql/upgrade_basic_after.sql index 0ee0ce0f5..11cb2dcc1 100644 --- a/src/test/regress/sql/upgrade_basic_after.sql +++ 
b/src/test/regress/sql/upgrade_basic_after.sql @@ -10,6 +10,8 @@ SELECT nextval('pg_dist_node_nodeid_seq') = MAX(nodeid)+1 FROM pg_dist_node; SELECT nextval('pg_dist_colocationid_seq') = MAX(colocationid)+1 FROM pg_dist_colocation; SELECT nextval('pg_dist_operationid_seq') = MAX(operation_id)+1 FROM pg_dist_cleanup; SELECT nextval('pg_dist_cleanup_recordid_seq') = MAX(record_id)+1 FROM pg_dist_cleanup; +SELECT nextval('pg_dist_background_job_job_id_seq') > COALESCE(MAX(job_id), 0) FROM pg_dist_background_job; +SELECT nextval('pg_dist_background_task_task_id_seq') > COALESCE(MAX(task_id), 0) FROM pg_dist_background_task; -- If this query gives output it means we've added a new sequence that should -- possibly be restored after upgrades. @@ -23,7 +25,9 @@ SELECT sequence_name FROM information_schema.sequences 'pg_dist_node_nodeid_seq', 'pg_dist_colocationid_seq', 'pg_dist_operationid_seq', - 'pg_dist_cleanup_recordid_seq' + 'pg_dist_cleanup_recordid_seq', + 'pg_dist_background_job_job_id_seq', + 'pg_dist_background_task_task_id_seq' ); SELECT logicalrelid FROM pg_dist_partition