diff --git a/src/backend/distributed/metadata/metadata_cache.c b/src/backend/distributed/metadata/metadata_cache.c
index cbcb2f396..5d7f4fd98 100644
--- a/src/backend/distributed/metadata/metadata_cache.c
+++ b/src/backend/distributed/metadata/metadata_cache.c
@@ -142,6 +142,7 @@ typedef struct MetadataCacheData
 	Oid distShardRelationId;
 	Oid distPlacementRelationId;
 	Oid distBackgroundJobsRelationId;
+	Oid distBackgroundJobsJobIdIndexId;
 	Oid distBackgroundTasksRelationId;
 	Oid distBackgroundTasksTaskIdIndexId;
 	Oid distBackgroundTasksStatusTaskIdIndexId;
@@ -153,6 +154,7 @@ typedef struct MetadataCacheData
 	Oid citusJobStatusFinishedId;
 	Oid citusJobStatusCancelledId;
 	Oid citusJobStatusFailedId;
+	Oid citusJobStatusFailingId;
 	Oid citusTaskStatusScheduledId;
 	Oid citusTaskStatusRunningId;
 	Oid citusTaskStatusDoneId;
@@ -2391,6 +2393,17 @@ DistBackgroundJobsRelationId(void)
 }
 
 
+/* return oid of the pg_dist_background_jobs_job_id_index index */
+Oid
+DistBackgroundJobsJobIdIndexId(void)
+{
+	CachedRelationLookup("pg_dist_background_jobs_job_id_index",
+						 &MetadataCache.distBackgroundJobsJobIdIndexId);
+
+	return MetadataCache.distBackgroundJobsJobIdIndexId;
+}
+
+
 Oid
 DistBackgroundTasksRelationId(void)
 {
@@ -3238,6 +3251,20 @@ CitusJobStatusFailedId(void)
 }
 
 
+/* return oid of the citus_job_status enum value 'failing', looked up once */
+Oid
+CitusJobStatusFailingId(void)
+{
+	if (!MetadataCache.citusJobStatusFailingId)
+	{
+		MetadataCache.citusJobStatusFailingId =
+			LookupStringEnumValueId("citus_job_status", "failing");
+	}
+
+	return MetadataCache.citusJobStatusFailingId;
+}
+
+
 Oid
 CitusTaskStatusScheduledId(void)
 {
diff --git a/src/backend/distributed/metadata/metadata_utility.c b/src/backend/distributed/metadata/metadata_utility.c
index 84d1a60bf..8a2f4a891 100644
--- a/src/backend/distributed/metadata/metadata_utility.c
+++ b/src/backend/distributed/metadata/metadata_utility.c
@@ -2801,6 +2801,236 @@ GetBackgroundTaskByTaskId(int64 jobId, int64 taskId)
 }
 
 
+/*
+ * JobTasksStatusCount scans pg_dist_background_tasks for the tasks of the given
+ * job and tallies them per task status. Every counter pointer is optional; a
+ * NULL pointer means the caller is not interested in that status.
+ *
+ * Returns true when every task found for the job is in the done status, which is
+ * also the case when the job has no tasks at all.
+ */
+static bool
+JobTasksStatusCount(int64 jobId, int *scheduled, int *running, int *done, int *error,
+					int *unscheduled)
+{
+#define CLEAR_OPTIONAL_COUNTER(counter) \
+	if (counter) \
+	{ \
+		*counter = 0; \
+	}
+
+	CLEAR_OPTIONAL_COUNTER(scheduled);
+	CLEAR_OPTIONAL_COUNTER(running);
+	CLEAR_OPTIONAL_COUNTER(done);
+	CLEAR_OPTIONAL_COUNTER(error);
+	CLEAR_OPTIONAL_COUNTER(unscheduled);
+
+#undef CLEAR_OPTIONAL_COUNTER
+
+	Relation pgDistBackgroundTasks =
+		table_open(DistBackgroundTasksRelationId(), RowExclusiveLock);
+	TupleDesc tupleDescriptor = RelationGetDescr(pgDistBackgroundTasks);
+
+	ScanKeyData scanKey[1] = { 0 };
+	const bool indexOK = true;
+
+	/* WHERE job_id = $task->jobId */
+	ScanKeyInit(&scanKey[0], Anum_pg_dist_background_tasks_job_id,
+				BTEqualStrategyNumber, F_INT8EQ, Int64GetDatum(jobId));
+
+	SysScanDesc scanDescriptor =
+		systable_beginscan(pgDistBackgroundTasks,
+						   DistBackgroundTasksTaskIdIndexId(),
+						   indexOK, NULL, lengthof(scanKey), scanKey);
+
+	HeapTuple heapTuple = NULL;
+	bool allDone = true;
+	while (HeapTupleIsValid(heapTuple = systable_getnext(scanDescriptor)))
+	{
+		Datum values[Natts_pg_dist_background_tasks] = { 0 };
+		bool isnull[Natts_pg_dist_background_tasks] = { 0 };
+
+		heap_deform_tuple(heapTuple, tupleDescriptor, values, isnull);
+
+		Datum statusDatum = values[Anum_pg_dist_background_tasks_status - 1];
+		Oid statusOid = DatumGetObjectId(statusDatum);
+		BackgroundTaskStatus status = BackgroundTaskStatusByOid(statusOid);
+
+		/* a single task that is not done makes the job not done */
+		allDone = allDone && (status == BACKGROUND_TASK_STATUS_DONE);
+
+#define INCREMENT_OPTIONAL_COUNTER(counter) \
+	if (counter) \
+	{ \
+		(*counter)++; \
+	}
+
+		switch (status)
+		{
+			case BACKGROUND_TASK_STATUS_SCHEDULED:
+			{
+				INCREMENT_OPTIONAL_COUNTER(scheduled);
+				break;
+			}
+
+			case BACKGROUND_TASK_STATUS_RUNNING:
+			{
+				INCREMENT_OPTIONAL_COUNTER(running);
+				break;
+			}
+
+			case BACKGROUND_TASK_STATUS_DONE:
+			{
+				INCREMENT_OPTIONAL_COUNTER(done);
+				break;
+			}
+
+			case BACKGROUND_TASK_STATUS_ERROR:
+			{
+				INCREMENT_OPTIONAL_COUNTER(error);
+				break;
+			}
+
+			case BACKGROUND_TASK_STATUS_UNSCHEDULED:
+			{
+				INCREMENT_OPTIONAL_COUNTER(unscheduled);
+				break;
+			}
+
+			default:
+			{
+				elog(ERROR, "unknown state in pg_dist_background_tasks");
+			}
+		}
+
+#undef INCREMENT_OPTIONAL_COUNTER
+	}
+
+	systable_endscan(scanDescriptor);
+	table_close(pgDistBackgroundTasks, NoLock);
+
+	return allDone;
+}
+
+
+/*
+ * UpdateBackgroundJob recomputes the state of the job identified by jobId from
+ * the statuses of its tasks and writes it to pg_dist_background_jobs when it
+ * differs from the stored state:
+ *  - finished: no task is scheduled, running, errored or unscheduled
+ *  - failing:  some task errored or got unscheduled, others can still run
+ *  - failed:   some task errored or got unscheduled, nothing is left to run
+ *  - running:  tasks are scheduled or running and none errored or got unscheduled
+ *
+ * Errors out when no pg_dist_background_jobs entry exists for jobId.
+ */
+void
+UpdateBackgroundJob(int64 jobId)
+{
+	int scheduled = 0;
+	int running = 0;
+	int done = 0;
+	int error = 0;
+	int unscheduled = 0;
+
+	JobTasksStatusCount(jobId, &scheduled, &running, &done, &error, &unscheduled);
+	Oid stateOid = InvalidOid;
+
+	if (scheduled + running + error + unscheduled == 0)
+	{
+		/* all tasks are done, job is finished */
+		stateOid = CitusJobStatusFinisehdId();
+	}
+	else if (error + unscheduled > 0)
+	{
+		/* we are either failing, or failed */
+		if (scheduled + running > 0)
+		{
+			/* failing, as there are still tasks to be run */
+			stateOid = CitusJobStatusFailingId();
+		}
+		else
+		{
+			/* failed */
+			stateOid = CitusJobStatusFailedId();
+		}
+	}
+	else if (scheduled + running > 0)
+	{
+		stateOid = CitusJobStatusRunningId();
+	}
+	else
+	{
+		/* not reachable, all combinations of counters are handled above */
+		return;
+	}
+
+	Relation pgDistBackgroundJobs =
+		table_open(DistBackgroundJobsRelationId(), RowExclusiveLock);
+	TupleDesc tupleDescriptor = RelationGetDescr(pgDistBackgroundJobs);
+
+	ScanKeyData scanKey[1] = { 0 };
+	const bool indexOK = true;
+
+	/* WHERE job_id = $task->jobId */
+	ScanKeyInit(&scanKey[0], Anum_pg_dist_background_jobs_job_id,
+				BTEqualStrategyNumber, F_INT8EQ, Int64GetDatum(jobId));
+
+	SysScanDesc scanDescriptor =
+		systable_beginscan(pgDistBackgroundJobs,
+						   DistBackgroundJobsJobIdIndexId(),
+						   indexOK, NULL, lengthof(scanKey), scanKey);
+
+	HeapTuple heapTuple = systable_getnext(scanDescriptor);
+	if (!HeapTupleIsValid(heapTuple))
+	{
+		ereport(ERROR, (errmsg("could not find background jobs entry for job_id: "
+							   INT64_FORMAT, jobId)));
+	}
+
+	/*
+	 * NOTE(review): sized with the attribute count of pg_dist_background_tasks
+	 * although the tuple is a pg_dist_background_jobs row; only safe while the
+	 * tasks catalog has at least as many attributes -- confirm and switch.
+	 */
+	Datum values[Natts_pg_dist_background_tasks] = { 0 };
+	bool isnull[Natts_pg_dist_background_tasks] = { 0 };
+	bool replace[Natts_pg_dist_background_tasks] = { 0 };
+
+	heap_deform_tuple(heapTuple, tupleDescriptor, values, isnull);
+
+	bool updated = false;
+
+#define UPDATE_FIELD(field, newNull, newValue) \
+	{ \
+		replace[((field) - 1)] = (((newNull) != isnull[((field) - 1)]) \
+								  || (values[((field) - 1)] != (newValue))); \
+		isnull[((field) - 1)] = (newNull); \
+		values[((field) - 1)] = (newValue); \
+		updated |= replace[((field) - 1)]; \
+	}
+
+	UPDATE_FIELD(Anum_pg_dist_background_jobs_state, false, ObjectIdGetDatum(stateOid));
+
+	/* TODO update finished time */
+
+#undef UPDATE_FIELD
+
+	if (updated)
+	{
+		heapTuple = heap_modify_tuple(heapTuple, tupleDescriptor, values, isnull,
+									  replace);
+
+		CatalogTupleUpdate(pgDistBackgroundJobs, &heapTuple->t_self, heapTuple);
+
+		CommandCounterIncrement();
+	}
+
+	systable_endscan(scanDescriptor);
+	table_close(pgDistBackgroundJobs, NoLock);
+}
+
+
 void
 UpdateBackgroundTask(BackgroundTask *task)
 {
diff --git a/src/backend/distributed/sql/citus--11.0-4--11.1-1.sql b/src/backend/distributed/sql/citus--11.0-4--11.1-1.sql
index 63f7bbf66..5a538b5a7 100644
--- a/src/backend/distributed/sql/citus--11.0-4--11.1-1.sql
+++ b/src/backend/distributed/sql/citus--11.0-4--11.1-1.sql
@@ -77,7 +77,7 @@ DROP FUNCTION pg_catalog.get_all_active_transactions(OUT datid oid, OUT process_
 DROP FUNCTION pg_catalog.isolate_tenant_to_new_shard(table_name regclass, tenant_id "any", cascade_option text);
 #include "udfs/isolate_tenant_to_new_shard/11.1-1.sql"
 
-CREATE TYPE citus.citus_job_status AS ENUM ('scheduled', 'running', 'finished', 'cancelled', 'failed');
+CREATE TYPE citus.citus_job_status AS ENUM ('scheduled', 'running', 'finished', 'cancelled', 'failing', 'failed');
 ALTER TYPE citus.citus_job_status SET SCHEMA pg_catalog;
 
 CREATE TABLE citus.pg_dist_background_jobs (
diff --git a/src/backend/distributed/utils/background_jobs.c b/src/backend/distributed/utils/background_jobs.c
index 657dbc90e..a97f3eaf7 100644
--- a/src/backend/distributed/utils/background_jobs.c
+++ b/src/backend/distributed/utils/background_jobs.c
@@ -334,6 +334,7 @@ CitusBackgroundTaskMonitorMain(Datum arg)
 				}
 			}
 			UpdateBackgroundTask(task);
+			UpdateBackgroundJob(task->jobid);
 
 			dsm_detach(seg);
 
diff --git a/src/include/distributed/metadata_cache.h b/src/include/distributed/metadata_cache.h
index 04b660956..d369eb023 100644
--- a/src/include/distributed/metadata_cache.h
+++ b/src/include/distributed/metadata_cache.h
@@ -238,6 +238,7 @@ extern Oid DistEnabledCustomAggregatesId(void);
 extern Oid DistNodeNodeIdIndexId(void);
 extern Oid DistPartitionLogicalRelidIndexId(void);
 extern Oid DistPartitionColocationidIndexId(void);
+extern Oid DistBackgroundJobsJobIdIndexId(void);
 extern Oid DistBackgroundTasksTaskIdIndexId(void);
 extern Oid DistBackgroundTasksStatusTaskIdIndexId(void);
 extern Oid DistBackgroundTasksDependRelationId(void);
@@ -280,6 +281,7 @@ extern Oid CitusJobStatusRunningId(void);
 extern Oid CitusJobStatusFinisehdId(void);
 extern Oid CitusJobStatusCancelledId(void);
 extern Oid CitusJobStatusFailedId(void);
+extern Oid CitusJobStatusFailingId(void);
 extern Oid CitusTaskStatusScheduledId(void);
 extern Oid CitusTaskStatusRunningId(void);
 extern Oid CitusTaskStatusDoneId(void);
diff --git a/src/include/distributed/metadata_utility.h b/src/include/distributed/metadata_utility.h
index 8d7de80a3..21ec6d431 100644
--- a/src/include/distributed/metadata_utility.h
+++ b/src/include/distributed/metadata_utility.h
@@ -346,6 +346,7 @@ extern BackgroundTask * GetRunnableBackgroundTask(void);
 extern void ResetRunningBackgroundTasks(void);
 extern void DeepFreeBackgroundTask(BackgroundTask *task);
 extern BackgroundTask * GetBackgroundTaskByTaskId(int64 jobId, int64 taskId);
+extern void UpdateBackgroundJob(int64 jobId);
 extern void UpdateBackgroundTask(BackgroundTask *task);
 extern void UpdateJobStatus(int64 taskId, const pid_t *pid, BackgroundTaskStatus status,
 							const int32 *retry_count, char *message);