mirror of https://github.com/citusdata/citus.git
wip update job state
parent
a23d340fbe
commit
780d9da1c6
|
|
@ -142,6 +142,7 @@ typedef struct MetadataCacheData
|
|||
Oid distShardRelationId;
|
||||
Oid distPlacementRelationId;
|
||||
Oid distBackgroundJobsRelationId;
|
||||
Oid distBackgroundJobsJobIdIndexId;
|
||||
Oid distBackgroundTasksRelationId;
|
||||
Oid distBackgroundTasksTaskIdIndexId;
|
||||
Oid distBackgroundTasksStatusTaskIdIndexId;
|
||||
|
|
@ -153,6 +154,7 @@ typedef struct MetadataCacheData
|
|||
Oid citusJobStatusFinishedId;
|
||||
Oid citusJobStatusCancelledId;
|
||||
Oid citusJobStatusFailedId;
|
||||
Oid citusJobStatusFailingId;
|
||||
Oid citusTaskStatusScheduledId;
|
||||
Oid citusTaskStatusRunningId;
|
||||
Oid citusTaskStatusDoneId;
|
||||
|
|
@ -2391,6 +2393,16 @@ DistBackgroundJobsRelationId(void)
|
|||
}
|
||||
|
||||
|
||||
Oid
|
||||
DistBackgroundJobsJobIdIndexId(void)
|
||||
{
|
||||
CachedRelationLookup("pg_dist_background_jobs_job_id_index",
|
||||
&MetadataCache.distBackgroundJobsJobIdIndexId);
|
||||
|
||||
return MetadataCache.distBackgroundJobsJobIdIndexId;
|
||||
}
|
||||
|
||||
|
||||
Oid
|
||||
DistBackgroundTasksRelationId(void)
|
||||
{
|
||||
|
|
@ -3238,6 +3250,19 @@ CitusJobStatusFailedId(void)
|
|||
}
|
||||
|
||||
|
||||
Oid
|
||||
CitusJobStatusFailingId(void)
|
||||
{
|
||||
if (!MetadataCache.citusJobStatusFailingId)
|
||||
{
|
||||
MetadataCache.citusJobStatusFailingId =
|
||||
LookupStringEnumValueId("citus_job_status", "failing");
|
||||
}
|
||||
|
||||
return MetadataCache.citusJobStatusFailingId;
|
||||
}
|
||||
|
||||
|
||||
Oid
|
||||
CitusTaskStatusScheduledId(void)
|
||||
{
|
||||
|
|
|
|||
|
|
@ -2801,6 +2801,208 @@ GetBackgroundTaskByTaskId(int64 jobId, int64 taskId)
|
|||
}
|
||||
|
||||
|
||||
static bool
|
||||
JobTasksStatusCount(int64 jobId, int *scheduled, int *running, int *done, int *error,
|
||||
int *unscheduled)
|
||||
{
|
||||
#define CLEAR_OPTIONAL_COUNTER(counter) \
|
||||
if (counter) \
|
||||
{ \
|
||||
*counter = 0; \
|
||||
}
|
||||
|
||||
CLEAR_OPTIONAL_COUNTER(scheduled);
|
||||
CLEAR_OPTIONAL_COUNTER(running);
|
||||
CLEAR_OPTIONAL_COUNTER(done);
|
||||
CLEAR_OPTIONAL_COUNTER(error);
|
||||
CLEAR_OPTIONAL_COUNTER(unscheduled);
|
||||
|
||||
#undef CLEAR_OPTIONAL_COUNTER;
|
||||
|
||||
Relation pgDistBackgroundTasks =
|
||||
table_open(DistBackgroundTasksRelationId(), RowExclusiveLock);
|
||||
TupleDesc tupleDescriptor = RelationGetDescr(pgDistBackgroundTasks);
|
||||
|
||||
ScanKeyData scanKey[1] = { 0 };
|
||||
const bool indexOK = true;
|
||||
|
||||
/* WHERE job_id = $task->jobId */
|
||||
ScanKeyInit(&scanKey[0], Anum_pg_dist_background_tasks_job_id,
|
||||
BTEqualStrategyNumber, F_INT8EQ, Int64GetDatum(jobId));
|
||||
|
||||
SysScanDesc scanDescriptor =
|
||||
systable_beginscan(pgDistBackgroundTasks,
|
||||
DistBackgroundTasksTaskIdIndexId(),
|
||||
indexOK, NULL, lengthof(scanKey), scanKey);
|
||||
|
||||
HeapTuple heapTuple = NULL;
|
||||
bool allDone = true;
|
||||
while (HeapTupleIsValid(heapTuple = systable_getnext(scanDescriptor)))
|
||||
{
|
||||
Datum values[Natts_pg_dist_background_tasks] = { 0 };
|
||||
bool isnull[Natts_pg_dist_background_tasks] = { 0 };
|
||||
|
||||
heap_deform_tuple(heapTuple, tupleDescriptor, values, isnull);
|
||||
|
||||
Oid statusOid = DatumGetObjectId(values[Anum_pg_dist_background_tasks_status -
|
||||
1]);
|
||||
BackgroundTaskStatus status = BackgroundTaskStatusByOid(statusOid);
|
||||
|
||||
#define INCREMENT_OPTIONAL_COUNTER(counter) \
|
||||
if (counter) \
|
||||
{ \
|
||||
*counter = (*counter)++; \
|
||||
}
|
||||
|
||||
switch (status)
|
||||
{
|
||||
case BACKGROUND_TASK_STATUS_SCHEDULED:
|
||||
{
|
||||
INCREMENT_OPTIONAL_COUNTER(scheduled);
|
||||
break;
|
||||
}
|
||||
|
||||
case BACKGROUND_TASK_STATUS_RUNNING:
|
||||
{
|
||||
INCREMENT_OPTIONAL_COUNTER(running);
|
||||
break;
|
||||
}
|
||||
|
||||
case BACKGROUND_TASK_STATUS_DONE:
|
||||
{
|
||||
INCREMENT_OPTIONAL_COUNTER(done);
|
||||
break;
|
||||
}
|
||||
|
||||
case BACKGROUND_TASK_STATUS_ERROR:
|
||||
{
|
||||
INCREMENT_OPTIONAL_COUNTER(error);
|
||||
break;
|
||||
}
|
||||
|
||||
case BACKGROUND_TASK_STATUS_UNSCHEDULED:
|
||||
{
|
||||
INCREMENT_OPTIONAL_COUNTER(unscheduled);
|
||||
break;
|
||||
}
|
||||
|
||||
default:
|
||||
{
|
||||
elog(ERROR, "unknown state in pg_dist_background_tasks");
|
||||
}
|
||||
}
|
||||
|
||||
#undef INCREMENT_OPTIONAL_COUNTER
|
||||
}
|
||||
|
||||
systable_endscan(scanDescriptor);
|
||||
table_close(pgDistBackgroundTasks, NoLock);
|
||||
|
||||
return allDone;
|
||||
}
|
||||
|
||||
|
||||
void
|
||||
UpdateBackgroundJob(int64 jobId)
|
||||
{
|
||||
int scheduled = 0;
|
||||
int running = 0;
|
||||
int done = 0;
|
||||
int error = 0;
|
||||
int unscheduled = 0;
|
||||
|
||||
JobTasksStatusCount(jobId, &scheduled, &running, &done, &error, &unscheduled);
|
||||
Oid stateOid = InvalidOid;
|
||||
|
||||
if (scheduled + running + error + unscheduled == 0)
|
||||
{
|
||||
/* all tasks are done, job is finished */
|
||||
stateOid = ObjectIdGetDatum(CitusJobStatusFinisehdId());
|
||||
}
|
||||
else if (error + unscheduled > 0)
|
||||
{
|
||||
/* we are either failing, or failed */
|
||||
if (scheduled + running > 0)
|
||||
{
|
||||
/* failing, as there are still tasks to be run */
|
||||
stateOid = ObjectIdGetDatum(CitusJobStatusFailingId());
|
||||
}
|
||||
else
|
||||
{
|
||||
/* failed */
|
||||
stateOid = ObjectIdGetDatum(CitusJobStatusFailedId());
|
||||
}
|
||||
}
|
||||
else if (scheduled + running > 0)
|
||||
{
|
||||
stateOid = ObjectIdGetDatum(CitusJobStatusRunningId());
|
||||
}
|
||||
else
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
Relation pgDistBackgroundJobs =
|
||||
table_open(DistBackgroundJobsRelationId(), RowExclusiveLock);
|
||||
TupleDesc tupleDescriptor = RelationGetDescr(pgDistBackgroundJobs);
|
||||
|
||||
ScanKeyData scanKey[1] = { 0 };
|
||||
const bool indexOK = true;
|
||||
|
||||
/* WHERE job_id = $task->jobId */
|
||||
ScanKeyInit(&scanKey[0], Anum_pg_dist_background_jobs_job_id,
|
||||
BTEqualStrategyNumber, F_INT8EQ, Int64GetDatum(jobId));
|
||||
|
||||
SysScanDesc scanDescriptor =
|
||||
systable_beginscan(pgDistBackgroundJobs,
|
||||
DistBackgroundJobsJobIdIndexId(),
|
||||
indexOK, NULL, lengthof(scanKey), scanKey);
|
||||
|
||||
HeapTuple heapTuple = systable_getnext(scanDescriptor);
|
||||
if (!HeapTupleIsValid(heapTuple))
|
||||
{
|
||||
ereport(ERROR, (errmsg("could not find background jobs entry for job_id: "
|
||||
UINT64_FORMAT, jobId)));
|
||||
}
|
||||
|
||||
Datum values[Natts_pg_dist_background_tasks] = { 0 };
|
||||
bool isnull[Natts_pg_dist_background_tasks] = { 0 };
|
||||
bool replace[Natts_pg_dist_background_tasks] = { 0 };
|
||||
|
||||
heap_deform_tuple(heapTuple, tupleDescriptor, values, isnull);
|
||||
|
||||
bool updated = false;
|
||||
|
||||
#define UPDATE_FIELD(field, newNull, newValue) \
|
||||
{ \
|
||||
replace[((field) - 1)] = (((newNull) != isnull[((field) - 1)]) \
|
||||
|| (values[((field) - 1)] != (newValue))); \
|
||||
isnull[((field) - 1)] = (newNull); \
|
||||
values[((field) - 1)] = (newValue); \
|
||||
updated |= replace[((field) - 1)]; \
|
||||
}
|
||||
|
||||
UPDATE_FIELD(Anum_pg_dist_background_jobs_state, false, stateOid);
|
||||
|
||||
/* TODO update finished time */
|
||||
|
||||
#undef UPDATE_FIELD
|
||||
|
||||
if (updated)
|
||||
{
|
||||
heapTuple = heap_modify_tuple(heapTuple, tupleDescriptor, values, isnull,
|
||||
replace);
|
||||
|
||||
CatalogTupleUpdate(pgDistBackgroundJobs, &heapTuple->t_self, heapTuple);
|
||||
|
||||
CommandCounterIncrement();
|
||||
}
|
||||
|
||||
systable_endscan(scanDescriptor);
|
||||
table_close(pgDistBackgroundJobs, NoLock);
|
||||
}
|
||||
|
||||
|
||||
void
|
||||
UpdateBackgroundTask(BackgroundTask *task)
|
||||
{
|
||||
|
|
|
|||
|
|
@ -77,7 +77,7 @@ DROP FUNCTION pg_catalog.get_all_active_transactions(OUT datid oid, OUT process_
|
|||
DROP FUNCTION pg_catalog.isolate_tenant_to_new_shard(table_name regclass, tenant_id "any", cascade_option text);
|
||||
#include "udfs/isolate_tenant_to_new_shard/11.1-1.sql"
|
||||
|
||||
CREATE TYPE citus.citus_job_status AS ENUM ('scheduled', 'running', 'finished', 'cancelled', 'failed');
|
||||
CREATE TYPE citus.citus_job_status AS ENUM ('scheduled', 'running', 'finished', 'cancelled', 'failing', 'failed');
|
||||
ALTER TYPE citus.citus_job_status SET SCHEMA pg_catalog;
|
||||
|
||||
CREATE TABLE citus.pg_dist_background_jobs (
|
||||
|
|
|
|||
|
|
@ -334,6 +334,7 @@ CitusBackgroundTaskMonitorMain(Datum arg)
|
|||
}
|
||||
}
|
||||
UpdateBackgroundTask(task);
|
||||
UpdateBackgroundJob(task->jobid);
|
||||
|
||||
dsm_detach(seg);
|
||||
|
||||
|
|
|
|||
|
|
@ -238,6 +238,7 @@ extern Oid DistEnabledCustomAggregatesId(void);
|
|||
extern Oid DistNodeNodeIdIndexId(void);
|
||||
extern Oid DistPartitionLogicalRelidIndexId(void);
|
||||
extern Oid DistPartitionColocationidIndexId(void);
|
||||
extern Oid DistBackgroundJobsJobIdIndexId(void);
|
||||
extern Oid DistBackgroundTasksTaskIdIndexId(void);
|
||||
extern Oid DistBackgroundTasksStatusTaskIdIndexId(void);
|
||||
extern Oid DistBackgroundTasksDependRelationId(void);
|
||||
|
|
@ -280,6 +281,7 @@ extern Oid CitusJobStatusRunningId(void);
|
|||
extern Oid CitusJobStatusFinisehdId(void);
|
||||
extern Oid CitusJobStatusCancelledId(void);
|
||||
extern Oid CitusJobStatusFailedId(void);
|
||||
extern Oid CitusJobStatusFailingId(void);
|
||||
extern Oid CitusTaskStatusScheduledId(void);
|
||||
extern Oid CitusTaskStatusRunningId(void);
|
||||
extern Oid CitusTaskStatusDoneId(void);
|
||||
|
|
|
|||
|
|
@ -346,6 +346,7 @@ extern BackgroundTask * GetRunnableBackgroundTask(void);
|
|||
extern void ResetRunningBackgroundTasks(void);
|
||||
extern void DeepFreeBackgroundTask(BackgroundTask *task);
|
||||
extern BackgroundTask * GetBackgroundTaskByTaskId(int64 jobId, int64 taskId);
|
||||
extern void UpdateBackgroundJob(int64 jobId);
|
||||
extern void UpdateBackgroundTask(BackgroundTask *task);
|
||||
extern void UpdateJobStatus(int64 taskId, const pid_t *pid, BackgroundTaskStatus status,
|
||||
const int32 *retry_count, char *message);
|
||||
|
|
|
|||
Loading…
Reference in New Issue