unschedule dependant jobs on failure

background-job-details
Nils Dijk 2022-07-20 19:23:58 +02:00 committed by Jelte Fennema
parent aee5dba634
commit 5a2ec73475
7 changed files with 254 additions and 10 deletions

View File

@ -145,10 +145,13 @@ typedef struct MetadataCacheData
Oid distRebalanceJobsJobsIndexId; Oid distRebalanceJobsJobsIndexId;
Oid distRebalanceJobsStatusJobsIndexId; Oid distRebalanceJobsStatusJobsIndexId;
Oid distRebalanceJobsDependRelationId; Oid distRebalanceJobsDependRelationId;
Oid distRebalanceJobsDependsJobIdIndexId;
Oid distRebalanceJobsDependsDependsOnIndexId;
Oid jobStatusScheduledId; Oid jobStatusScheduledId;
Oid jobStatusRunningId; Oid jobStatusRunningId;
Oid jobStatusDoneId; Oid jobStatusDoneId;
Oid jobStatusErrorId; Oid jobStatusErrorId;
Oid jobStatusSnscheduledId;
Oid distRebalanceStrategyRelationId; Oid distRebalanceStrategyRelationId;
Oid distNodeRelationId; Oid distNodeRelationId;
Oid distNodeNodeIdIndexId; Oid distNodeNodeIdIndexId;
@ -2412,6 +2415,26 @@ DistRebalanceJobsDependRelationId(void)
} }
/* return oid of pg_dist_rebalance_jobs_depend_jobid index */
Oid
DistRebalanceJobsDependJobIdIndexId(void)
{
	/* slot in the metadata cache that CachedRelationLookup is asked to fill */
	Oid *cachedIndexId = &MetadataCache.distRebalanceJobsDependsJobIdIndexId;

	CachedRelationLookup("pg_dist_rebalance_jobs_depend_jobid", cachedIndexId);

	return *cachedIndexId;
}
/* return oid of pg_dist_rebalance_jobs_depend_depends_on index */
Oid
DistRebalanceJobsDependDependsOnIndexId(void)
{
	/* slot in the metadata cache that CachedRelationLookup is asked to fill */
	Oid *cachedIndexId = &MetadataCache.distRebalanceJobsDependsDependsOnIndexId;

	CachedRelationLookup("pg_dist_rebalance_jobs_depend_depends_on", cachedIndexId);

	return *cachedIndexId;
}
/* return oid of pg_dist_rebalance_strategy relation */ /* return oid of pg_dist_rebalance_strategy relation */
Oid Oid
DistRebalanceStrategyRelationId(void) DistRebalanceStrategyRelationId(void)
@ -3186,6 +3209,19 @@ JobStatusErrorId(void)
} }
/*
 * JobStatusUnscheduledId returns the oid of the 'unscheduled' value of the
 * citus_job_status enum. The value is looked up the first time it is needed
 * and kept in the metadata cache afterwards.
 */
Oid
JobStatusUnscheduledId(void)
{
	Oid *cachedEnumValueId = &MetadataCache.jobStatusSnscheduledId;

	if (*cachedEnumValueId)
	{
		/* already resolved earlier, reuse the cached oid */
		return *cachedEnumValueId;
	}

	*cachedEnumValueId = LookupStringEnumValueId("citus_job_status", "unscheduled");

	return *cachedEnumValueId;
}
/* /*
* citus_dist_partition_cache_invalidate is a trigger function that performs * citus_dist_partition_cache_invalidate is a trigger function that performs
* relcache invalidations when the contents of pg_dist_partition are changed * relcache invalidations when the contents of pg_dist_partition are changed

View File

@ -2285,6 +2285,10 @@ RebalanceJobStatusByOid(Oid enumOid)
{ {
return REBALANCE_JOB_STATUS_ERROR; return REBALANCE_JOB_STATUS_ERROR;
} }
else if (enumOid == JobStatusUnscheduledId())
{
return REBALANCE_JOB_STATUS_UNSCHEDULED;
}
ereport(ERROR, (errmsg("unknown enum value for citus_job_status"))); ereport(ERROR, (errmsg("unknown enum value for citus_job_status")));
return REBALANCE_JOB_STATUS_UNKNOWN; return REBALANCE_JOB_STATUS_UNKNOWN;
} }
@ -2297,6 +2301,7 @@ IsRebalanceJobStatusTerminal(RebalanceJobStatus status)
{ {
case REBALANCE_JOB_STATUS_DONE: case REBALANCE_JOB_STATUS_DONE:
case REBALANCE_JOB_STATUS_ERROR: case REBALANCE_JOB_STATUS_ERROR:
case REBALANCE_JOB_STATUS_UNSCHEDULED:
{ {
return true; return true;
} }
@ -2329,6 +2334,16 @@ RebalanceJobStatusOid(RebalanceJobStatus status)
return JobStatusDoneId(); return JobStatusDoneId();
} }
case REBALANCE_JOB_STATUS_ERROR:
{
return JobStatusErrorId();
}
case REBALANCE_JOB_STATUS_UNSCHEDULED:
{
return JobStatusUnscheduledId();
}
default: default:
{ {
return InvalidOid; return InvalidOid;
@ -2491,8 +2506,64 @@ ResetRunningJobs(void)
} }
/*
 * JobHasUmnetDependencies checks whether the job identified by jobid still has
 * to wait for one of the jobs it depends on.
 *
 * It scans pg_dist_rebalance_jobs_depend for all rows with the given jobid and
 * looks up the job referenced by depends_on for each of them. The function
 * returns true as soon as one of those jobs has a status other than done. It
 * returns false when every referenced job is done, which includes the case
 * where the job has no dependency rows at all.
 */
bool
JobHasUmnetDependencies(int64 jobid)
{
	bool hasUnmetDependency = false;

	Relation pgDistRebalanceJobsDepend = table_open(DistRebalanceJobsDependRelationId(),
													AccessShareLock);

	const int scanKeyCount = 1;
	ScanKeyData scanKey[1] = { 0 };
	bool indexOK = true;

	/*
	 * pg_catalog.pg_dist_rebalance_jobs_depend.jobid = $jobid
	 *
	 * The scan key argument is a Datum, hence the int64 is converted explicitly
	 * with Int64GetDatum instead of being passed as a bare integer.
	 */
	ScanKeyInit(&scanKey[0], Anum_pg_dist_rebalance_jobs_depend_jobid,
				BTEqualStrategyNumber, F_INT8EQ, Int64GetDatum(jobid));

	SysScanDesc scanDescriptor = systable_beginscan(pgDistRebalanceJobsDepend,
													DistRebalanceJobsDependJobIdIndexId(),
													indexOK, NULL, scanKeyCount,
													scanKey);

	HeapTuple dependTuple = NULL;
	while (HeapTupleIsValid(dependTuple = systable_getnext(scanDescriptor)))
	{
		Form_pg_dist_rebalance_jobs_depend depends =
			(Form_pg_dist_rebalance_jobs_depend) GETSTRUCT(dependTuple);

		/*
		 * NOTE(review): the result is dereferenced without a NULL check; confirm
		 * that GetScheduledRebalanceJobByJobID always returns a job for an id
		 * that is recorded in pg_dist_rebalance_jobs_depend.
		 */
		RebalanceJob *dependingJob = GetScheduledRebalanceJobByJobID(depends->depends_on);

		/*
		 * Only when the status of all depending jobs is done do we clear this
		 * job and say that it has no unmet dependencies.
		 */
		if (dependingJob->status == REBALANCE_JOB_STATUS_DONE)
		{
			continue;
		}

		/*
		 * We assume that when we ask for a job to be cleared it has no
		 * dependencies that have errored. Once we move a job to error it should
		 * unschedule all dependant jobs recursively.
		 */
		Assert(dependingJob->status != REBALANCE_JOB_STATUS_ERROR);

		/* one unfinished dependency is enough, no need to look at the rest */
		hasUnmetDependency = true;
		break;
	}

	systable_endscan(scanDescriptor);
	table_close(pgDistRebalanceJobsDepend, AccessShareLock);

	return hasUnmetDependency;
}
RebalanceJob * RebalanceJob *
GetScheduledRebalanceJob(void) GetRunableRebalanceJob(void)
{ {
const int scanKeyCount = 1; const int scanKeyCount = 1;
ScanKeyData scanKey[1]; ScanKeyData scanKey[1];
@ -2518,21 +2589,30 @@ GetScheduledRebalanceJob(void)
indexOK, NULL, scanKeyCount, indexOK, NULL, scanKeyCount,
scanKey); scanKey);
HeapTuple jobTuple = systable_getnext(scanDescriptor); HeapTuple jobTuple = NULL;
if (HeapTupleIsValid(jobTuple)) while (HeapTupleIsValid(jobTuple = systable_getnext(scanDescriptor)))
{ {
Datum datumArray[Natts_pg_dist_rebalance_jobs]; Datum datumArray[Natts_pg_dist_rebalance_jobs];
bool isNullArray[Natts_pg_dist_rebalance_jobs]; bool isNullArray[Natts_pg_dist_rebalance_jobs];
TupleDesc tupleDescriptor = RelationGetDescr(pgDistRebalanceJobs); TupleDesc tupleDescriptor = RelationGetDescr(pgDistRebalanceJobs);
heap_deform_tuple(jobTuple, tupleDescriptor, datumArray, isNullArray); heap_deform_tuple(jobTuple, tupleDescriptor, datumArray, isNullArray);
int64 jobid = DatumGetInt64(
datumArray[Anum_pg_dist_rebalance_jobs_jobid - 1]);
if (JobHasUmnetDependencies(jobid))
{
continue;
}
job = palloc0(sizeof(RebalanceJob)); job = palloc0(sizeof(RebalanceJob));
job->jobid = DatumGetInt64(datumArray[Anum_pg_dist_rebalance_jobs_jobid - 1]); job->jobid = jobid;
job->status = RebalanceJobStatusByOid( job->status = RebalanceJobStatusByOid(
DatumGetObjectId(datumArray[Anum_pg_dist_rebalance_jobs_status - 1])); DatumGetObjectId(datumArray[Anum_pg_dist_rebalance_jobs_status - 1]));
job->command = text_to_cstring( job->command = text_to_cstring(
DatumGetTextP(datumArray[Anum_pg_dist_rebalance_jobs_command - 1])); DatumGetTextP(datumArray[Anum_pg_dist_rebalance_jobs_command - 1]));
break;
} }
systable_endscan(scanDescriptor); systable_endscan(scanDescriptor);
@ -2648,7 +2728,7 @@ UpdateJobStatus(RebalanceJob *job, RebalanceJobStatus newStatus)
} }
void bool
UpdateJobError(RebalanceJob *job, ErrorData *edata) UpdateJobError(RebalanceJob *job, ErrorData *edata)
{ {
Relation pgDistRebalanceJobs = table_open(DistRebalanceJobsRelationId(), Relation pgDistRebalanceJobs = table_open(DistRebalanceJobsRelationId(),
@ -2692,6 +2772,11 @@ UpdateJobError(RebalanceJob *job, ErrorData *edata)
isnull[Anum_pg_dist_rebalance_jobs_retry_count - 1] = false; isnull[Anum_pg_dist_rebalance_jobs_retry_count - 1] = false;
replace[Anum_pg_dist_rebalance_jobs_retry_count - 1] = true; replace[Anum_pg_dist_rebalance_jobs_retry_count - 1] = true;
values[Anum_pg_dist_rebalance_jobs_pid - 1] = InvalidOid;
isnull[Anum_pg_dist_rebalance_jobs_pid - 1] = true;
replace[Anum_pg_dist_rebalance_jobs_pid - 1] = true;
bool statusError = false;
if (retryCount >= 3) if (retryCount >= 3)
{ {
/* after 3 failures we will transition the job to error and stop executing */ /* after 3 failures we will transition the job to error and stop executing */
@ -2699,6 +2784,8 @@ UpdateJobError(RebalanceJob *job, ErrorData *edata)
ObjectIdGetDatum(JobStatusErrorId()); ObjectIdGetDatum(JobStatusErrorId());
isnull[Anum_pg_dist_rebalance_jobs_status - 1] = false; isnull[Anum_pg_dist_rebalance_jobs_status - 1] = false;
replace[Anum_pg_dist_rebalance_jobs_status - 1] = true; replace[Anum_pg_dist_rebalance_jobs_status - 1] = true;
statusError = true;
} }
StringInfoData buf = { 0 }; StringInfoData buf = { 0 };
@ -2756,4 +2843,109 @@ UpdateJobError(RebalanceJob *job, ErrorData *edata)
systable_endscan(scanDescriptor); systable_endscan(scanDescriptor);
table_close(pgDistRebalanceJobs, NoLock); table_close(pgDistRebalanceJobs, NoLock);
/* when we have changed the status to Error we will need to unschedule all dependent jobs (recursively) */
if (statusError)
{
UnscheduleDependantJobs(job->jobid);
}
return statusError;
}
/*
 * GetDependantJobs collects the ids of all jobs that directly depend on the
 * given job, i.e. the jobid of every row in pg_dist_rebalance_jobs_depend
 * whose depends_on equals jobid.
 *
 * The result is a List of pointers to palloc'd int64 values, or NIL when no
 * job depends on the given one. The lock taken on the relation is not released
 * when the relation is closed.
 */
static List *
GetDependantJobs(int64 jobid)
{
	Relation dependRelation = table_open(DistRebalanceJobsDependRelationId(),
										 RowExclusiveLock);

	ScanKeyData dependsOnKey[1];
	int dependsOnKeyCount = 1;
	const bool useIndex = true;

	/* pg_dist_rebalance_jobs_depend.depends_on = $jobid */
	ScanKeyInit(&dependsOnKey[0], Anum_pg_dist_rebalance_jobs_depend_depends_on,
				BTEqualStrategyNumber, F_INT8EQ, Int64GetDatum(jobid));

	SysScanDesc dependScan = systable_beginscan(dependRelation,
												DistRebalanceJobsDependDependsOnIndexId(),
												useIndex,
												NULL, dependsOnKeyCount, dependsOnKey);

	List *jobIdList = NIL;
	for (HeapTuple dependTuple = systable_getnext(dependScan);
		 HeapTupleIsValid(dependTuple);
		 dependTuple = systable_getnext(dependScan))
	{
		Form_pg_dist_rebalance_jobs_depend dependRow =
			(Form_pg_dist_rebalance_jobs_depend) GETSTRUCT(dependTuple);

		/* copy the id out of the tuple so it outlives the scan */
		int64 *dependantJobId = palloc0(sizeof(int64));
		*dependantJobId = dependRow->jobid;

		jobIdList = lappend(jobIdList, dependantJobId);
	}

	systable_endscan(dependScan);
	table_close(dependRelation, NoLock);

	return jobIdList;
}
/*
 * UnscheduleDependantJobs sets the status of every job that directly or
 * indirectly depends on the given job to 'unscheduled' in
 * pg_dist_rebalance_jobs. The job identified by jobid itself is not modified.
 *
 * The dependency graph is walked iteratively: the list of dependant job ids is
 * used as a stack, and for every job popped from it the jobs depending on that
 * job are pushed in turn.
 *
 * Errors out when a job id recorded as a dependant has no entry in
 * pg_dist_rebalance_jobs.
 *
 * NOTE(review): the status is overwritten regardless of the job's current
 * status, and a job that is reachable via more than one dependency path is
 * visited once per path. Confirm both are acceptable for the callers.
 */
void
UnscheduleDependantJobs(int64 jobid)
{
	Relation pgDistRebalanceJobs = table_open(DistRebalanceJobsRelationId(),
											  RowExclusiveLock);
	TupleDesc tupleDescriptor = RelationGetDescr(pgDistRebalanceJobs);

	List *dependantJobs = GetDependantJobs(jobid);
	while (list_length(dependantJobs) > 0)
	{
		/* pop last item from stack */
		int64 cJobid = *(int64 *) llast(dependantJobs);
		dependantJobs = list_delete_last(dependantJobs);

		/* push new dependant jobs on to stack */
		dependantJobs = list_concat(dependantJobs, GetDependantJobs(cJobid));

		/* unschedule current job */
		{
			ScanKeyData scanKey[1] = { 0 };
			int scanKeyCount = 1;

			/* WHERE jobid = cJobid */
			ScanKeyInit(&scanKey[0], Anum_pg_dist_rebalance_jobs_jobid,
						BTEqualStrategyNumber, F_INT8EQ, Int64GetDatum(cJobid));

			const bool indexOK = true;
			SysScanDesc scanDescriptor = systable_beginscan(pgDistRebalanceJobs,
															DistRebalanceJobsJobsIdIndexId(),
															indexOK,
															NULL, scanKeyCount, scanKey);

			HeapTuple heapTuple = systable_getnext(scanDescriptor);
			if (!HeapTupleIsValid(heapTuple))
			{
				/* cJobid is a signed 64 bit integer, so format it as such */
				ereport(ERROR, (errmsg("could not find rebalance job entry for jobid: "
									   INT64_FORMAT, cJobid)));
			}

			Datum values[Natts_pg_dist_rebalance_jobs] = { 0 };
			bool isnull[Natts_pg_dist_rebalance_jobs] = { 0 };
			bool replace[Natts_pg_dist_rebalance_jobs] = { 0 };

			/* only the status column is replaced, all other columns are kept */
			values[Anum_pg_dist_rebalance_jobs_status - 1] =
				ObjectIdGetDatum(JobStatusUnscheduledId());
			isnull[Anum_pg_dist_rebalance_jobs_status - 1] = false;
			replace[Anum_pg_dist_rebalance_jobs_status - 1] = true;

			heapTuple = heap_modify_tuple(heapTuple, tupleDescriptor, values, isnull,
										  replace);
			CatalogTupleUpdate(pgDistRebalanceJobs, &heapTuple->t_self, heapTuple);

			systable_endscan(scanDescriptor);
		}
	}

	/* the lock on pg_dist_rebalance_jobs is not released when closing */
	table_close(pgDistRebalanceJobs, NoLock);
}

View File

@ -77,7 +77,7 @@ DROP FUNCTION pg_catalog.get_all_active_transactions(OUT datid oid, OUT process_
DROP FUNCTION pg_catalog.isolate_tenant_to_new_shard(table_name regclass, tenant_id "any", cascade_option text); DROP FUNCTION pg_catalog.isolate_tenant_to_new_shard(table_name regclass, tenant_id "any", cascade_option text);
#include "udfs/isolate_tenant_to_new_shard/11.1-1.sql" #include "udfs/isolate_tenant_to_new_shard/11.1-1.sql"
CREATE TYPE citus.citus_job_status AS ENUM ('scheduled', 'running', 'done', 'error'); CREATE TYPE citus.citus_job_status AS ENUM ('scheduled', 'running', 'done', 'error', 'unscheduled');
ALTER TYPE citus.citus_job_status SET SCHEMA pg_catalog; ALTER TYPE citus.citus_job_status SET SCHEMA pg_catalog;
CREATE TABLE citus.pg_dist_rebalance_jobs( CREATE TABLE citus.pg_dist_rebalance_jobs(

View File

@ -909,7 +909,7 @@ RebalanceJobsBackgroundWorkerMain(Datum arg)
* later due to the committing and starting of new transactions * later due to the committing and starting of new transactions
*/ */
MemoryContext oldContext = MemoryContextSwitchTo(perJobContext); MemoryContext oldContext = MemoryContextSwitchTo(perJobContext);
RebalanceJob *job = GetScheduledRebalanceJob(); RebalanceJob *job = GetRunableRebalanceJob();
MemoryContextSwitchTo(oldContext); MemoryContextSwitchTo(oldContext);
if (job) if (job)

View File

@ -240,6 +240,8 @@ extern Oid DistPartitionColocationidIndexId(void);
extern Oid DistRebalanceJobsJobsIdIndexId(void); extern Oid DistRebalanceJobsJobsIdIndexId(void);
extern Oid DistRebalanceJobsStatusJobsIdIndexId(void); extern Oid DistRebalanceJobsStatusJobsIdIndexId(void);
extern Oid DistRebalanceJobsDependRelationId(void); extern Oid DistRebalanceJobsDependRelationId(void);
extern Oid DistRebalanceJobsDependJobIdIndexId(void);
extern Oid DistRebalanceJobsDependDependsOnIndexId(void);
extern Oid DistShardLogicalRelidIndexId(void); extern Oid DistShardLogicalRelidIndexId(void);
extern Oid DistShardShardidIndexId(void); extern Oid DistShardShardidIndexId(void);
extern Oid DistPlacementShardidIndexId(void); extern Oid DistPlacementShardidIndexId(void);
@ -276,6 +278,7 @@ extern Oid JobStatusScheduledId(void);
extern Oid JobStatusRunningId(void); extern Oid JobStatusRunningId(void);
extern Oid JobStatusDoneId(void); extern Oid JobStatusDoneId(void);
extern Oid JobStatusErrorId(void); extern Oid JobStatusErrorId(void);
extern Oid JobStatusUnscheduledId(void);
/* user related functions */ /* user related functions */
extern Oid CitusExtensionOwner(void); extern Oid CitusExtensionOwner(void);

View File

@ -210,7 +210,8 @@ typedef enum RebalanceJobStatus
REBALANCE_JOB_STATUS_SCHEDULED, REBALANCE_JOB_STATUS_SCHEDULED,
REBALANCE_JOB_STATUS_RUNNING, REBALANCE_JOB_STATUS_RUNNING,
REBALANCE_JOB_STATUS_DONE, REBALANCE_JOB_STATUS_DONE,
REBALANCE_JOB_STATUS_ERROR REBALANCE_JOB_STATUS_ERROR,
REBALANCE_JOB_STATUS_UNSCHEDULED
} RebalanceJobStatus; } RebalanceJobStatus;
@ -333,11 +334,13 @@ extern bool HasScheduledRebalanceJobs(void);
extern int64 GetNextRebalanceJobId(void); extern int64 GetNextRebalanceJobId(void);
extern RebalanceJob * ScheduleBackgrounRebalanceJob(char *command, int dependingJobCount, extern RebalanceJob * ScheduleBackgrounRebalanceJob(char *command, int dependingJobCount,
int64 dependingJobIds[]); int64 dependingJobIds[]);
extern RebalanceJob * GetScheduledRebalanceJob(void); extern bool JobHasUmnetDependencies(int64 jobid);
extern RebalanceJob * GetRunableRebalanceJob(void);
extern void ResetRunningJobs(void); extern void ResetRunningJobs(void);
extern RebalanceJob * GetScheduledRebalanceJobByJobID(int64 jobId); extern RebalanceJob * GetScheduledRebalanceJobByJobID(int64 jobId);
extern void UpdateJobStatus(RebalanceJob *job, RebalanceJobStatus newStatus); extern void UpdateJobStatus(RebalanceJob *job, RebalanceJobStatus newStatus);
extern void UpdateJobError(RebalanceJob *job, ErrorData *edata); extern bool UpdateJobError(RebalanceJob *job, ErrorData *edata);
extern void UnscheduleDependantJobs(int64 jobid);
extern bool IsRebalanceJobStatusTerminal(RebalanceJobStatus status); extern bool IsRebalanceJobStatusTerminal(RebalanceJobStatus status);
extern Oid RebalanceJobStatusOid(RebalanceJobStatus status); extern Oid RebalanceJobStatusOid(RebalanceJobStatus status);
#endif /* METADATA_UTILITY_H */ #endif /* METADATA_UTILITY_H */

View File

@ -2,6 +2,16 @@
#ifndef CITUS_PG_DIST_REBALANCE_JOBS_DEPEND_H #ifndef CITUS_PG_DIST_REBALANCE_JOBS_DEPEND_H
#define CITUS_PG_DIST_REBALANCE_JOBS_DEPEND_H #define CITUS_PG_DIST_REBALANCE_JOBS_DEPEND_H
/*
 * FormData_pg_dist_rebalance_jobs_depend describes one row of
 * pg_dist_rebalance_jobs_depend. Scans over that relation cast the result of
 * GETSTRUCT on a heap tuple to this struct, so the fields have to stay in this
 * order and keep these types.
 */
typedef struct FormData_pg_dist_rebalance_jobs_depend
{
/* id of the job that has the dependency */
int64 jobid;
/* id of the job that jobid depends on */
int64 depends_on;
#ifdef CATALOG_VARLEN /* variable-length fields start here */
#endif
} FormData_pg_dist_rebalance_jobs_depend;
/* pointer type used to access a pg_dist_rebalance_jobs_depend row */
typedef FormData_pg_dist_rebalance_jobs_depend *Form_pg_dist_rebalance_jobs_depend;
/* ---------------- /* ----------------
* compiler constants for pg_dist_rebalance_jobs_depend * compiler constants for pg_dist_rebalance_jobs_depend
* ---------------- * ----------------