diff --git a/src/backend/distributed/metadata/metadata_cache.c b/src/backend/distributed/metadata/metadata_cache.c index 802b699d2..dcc1fe1ed 100644 --- a/src/backend/distributed/metadata/metadata_cache.c +++ b/src/backend/distributed/metadata/metadata_cache.c @@ -145,10 +145,13 @@ typedef struct MetadataCacheData Oid distRebalanceJobsJobsIndexId; Oid distRebalanceJobsStatusJobsIndexId; Oid distRebalanceJobsDependRelationId; + Oid distRebalanceJobsDependsJobIdIndexId; + Oid distRebalanceJobsDependsDependsOnIndexId; Oid jobStatusScheduledId; Oid jobStatusRunningId; Oid jobStatusDoneId; Oid jobStatusErrorId; + Oid jobStatusSnscheduledId; Oid distRebalanceStrategyRelationId; Oid distNodeRelationId; Oid distNodeNodeIdIndexId; @@ -2412,6 +2415,26 @@ DistRebalanceJobsDependRelationId(void) } +Oid +DistRebalanceJobsDependJobIdIndexId(void) +{ + CachedRelationLookup("pg_dist_rebalance_jobs_depend_jobid", + &MetadataCache.distRebalanceJobsDependsJobIdIndexId); + + return MetadataCache.distRebalanceJobsDependsJobIdIndexId; +} + + +Oid +DistRebalanceJobsDependDependsOnIndexId(void) +{ + CachedRelationLookup("pg_dist_rebalance_jobs_depend_depends_on", + &MetadataCache.distRebalanceJobsDependsDependsOnIndexId); + + return MetadataCache.distRebalanceJobsDependsDependsOnIndexId; +} + + /* return oid of pg_dist_rebalance_strategy relation */ Oid DistRebalanceStrategyRelationId(void) @@ -3186,6 +3209,19 @@ JobStatusErrorId(void) } +Oid +JobStatusUnscheduledId(void) +{ + if (!MetadataCache.jobStatusSnscheduledId) + { + MetadataCache.jobStatusSnscheduledId = + LookupStringEnumValueId("citus_job_status", "unscheduled"); + } + + return MetadataCache.jobStatusSnscheduledId; +} + + /* * citus_dist_partition_cache_invalidate is a trigger function that performs * relcache invalidations when the contents of pg_dist_partition are changed diff --git a/src/backend/distributed/metadata/metadata_utility.c b/src/backend/distributed/metadata/metadata_utility.c index 957303502..a5959ad30 100644 --- a/src/backend/distributed/metadata/metadata_utility.c +++ b/src/backend/distributed/metadata/metadata_utility.c @@ -2285,6 +2285,10 @@ RebalanceJobStatusByOid(Oid enumOid) { return REBALANCE_JOB_STATUS_ERROR; } + else if (enumOid == JobStatusUnscheduledId()) + { + return REBALANCE_JOB_STATUS_UNSCHEDULED; + } ereport(ERROR, (errmsg("unknown enum value for citus_job_status"))); return REBALANCE_JOB_STATUS_UNKNOWN; } @@ -2297,6 +2301,7 @@ IsRebalanceJobStatusTerminal(RebalanceJobStatus status) { case REBALANCE_JOB_STATUS_DONE: case REBALANCE_JOB_STATUS_ERROR: + case REBALANCE_JOB_STATUS_UNSCHEDULED: { return true; } @@ -2329,6 +2334,16 @@ RebalanceJobStatusOid(RebalanceJobStatus status) return JobStatusDoneId(); } + case REBALANCE_JOB_STATUS_ERROR: + { + return JobStatusErrorId(); + } + + case REBALANCE_JOB_STATUS_UNSCHEDULED: + { + return JobStatusUnscheduledId(); + } + default: { return InvalidOid; @@ -2491,8 +2506,64 @@ ResetRunningJobs(void) } +bool +JobHasUmnetDependencies(int64 jobid) +{ + bool hasUnmetDependency = false; + + Relation pgDistRebalanceJobsDepend = table_open(DistRebalanceJobsDependRelationId(), + AccessShareLock); + + const int scanKeyCount = 1; + ScanKeyData scanKey[1] = { 0 }; + bool indexOK = true; + + /* pg_catalog.pg_dist_rebalance_jobs_depend.jobid = $jobid */ + ScanKeyInit(&scanKey[0], Anum_pg_dist_rebalance_jobs_depend_jobid, + BTEqualStrategyNumber, F_INT8EQ, jobid); + + SysScanDesc scanDescriptor = systable_beginscan(pgDistRebalanceJobsDepend, + DistRebalanceJobsDependJobIdIndexId(), + indexOK, NULL, scanKeyCount, + scanKey); + + HeapTuple dependTuple = NULL; + while (HeapTupleIsValid(dependTuple = systable_getnext(scanDescriptor))) + { + Form_pg_dist_rebalance_jobs_depend depends = + (Form_pg_dist_rebalance_jobs_depend) GETSTRUCT(dependTuple); + + RebalanceJob *dependingJob = GetScheduledRebalanceJobByJobID(depends->depends_on); + + /* + * Only when the status of all depending jobs is done we clear this job and say + * that is has no unmet dependencies. + */ + if (dependingJob->status == REBALANCE_JOB_STATUS_DONE) + { + continue; + } + + /* + * we assume that when we ask for job to be cleared it has no dependencies that + * have errored. Once we move the job to error it should unschedule all dependant + * jobs recursively. + */ + Assert(dependingJob->status != REBALANCE_JOB_STATUS_ERROR); + + hasUnmetDependency = true; + break; + } + + systable_endscan(scanDescriptor); + table_close(pgDistRebalanceJobsDepend, AccessShareLock); + + return hasUnmetDependency; +} + + RebalanceJob * -GetScheduledRebalanceJob(void) +GetRunableRebalanceJob(void) { const int scanKeyCount = 1; ScanKeyData scanKey[1]; @@ -2518,21 +2589,30 @@ GetScheduledRebalanceJob(void) indexOK, NULL, scanKeyCount, scanKey); - HeapTuple jobTuple = systable_getnext(scanDescriptor); - if (HeapTupleIsValid(jobTuple)) + HeapTuple jobTuple = NULL; + while (HeapTupleIsValid(jobTuple = systable_getnext(scanDescriptor))) { Datum datumArray[Natts_pg_dist_rebalance_jobs]; bool isNullArray[Natts_pg_dist_rebalance_jobs]; TupleDesc tupleDescriptor = RelationGetDescr(pgDistRebalanceJobs); heap_deform_tuple(jobTuple, tupleDescriptor, datumArray, isNullArray); + int64 jobid = DatumGetInt64( + datumArray[Anum_pg_dist_rebalance_jobs_jobid - 1]); + if (JobHasUmnetDependencies(jobid)) + { + continue; + } + job = palloc0(sizeof(RebalanceJob)); - job->jobid = DatumGetInt64(datumArray[Anum_pg_dist_rebalance_jobs_jobid - 1]); + job->jobid = jobid; job->status = RebalanceJobStatusByOid( DatumGetObjectId(datumArray[Anum_pg_dist_rebalance_jobs_status - 1])); job->command = text_to_cstring( DatumGetTextP(datumArray[Anum_pg_dist_rebalance_jobs_command - 1])); + + break; } systable_endscan(scanDescriptor); @@ -2648,7 +2728,7 @@ UpdateJobStatus(RebalanceJob *job, RebalanceJobStatus newStatus) } -void +bool UpdateJobError(RebalanceJob *job, ErrorData *edata) { Relation pgDistRebalanceJobs = table_open(DistRebalanceJobsRelationId(), @@ -2692,6 +2772,11 @@ UpdateJobError(RebalanceJob *job, ErrorData *edata) isnull[Anum_pg_dist_rebalance_jobs_retry_count - 1] = false; replace[Anum_pg_dist_rebalance_jobs_retry_count - 1] = true; + values[Anum_pg_dist_rebalance_jobs_pid - 1] = InvalidOid; + isnull[Anum_pg_dist_rebalance_jobs_pid - 1] = true; + replace[Anum_pg_dist_rebalance_jobs_pid - 1] = true; + + bool statusError = false; if (retryCount >= 3) { /* after 3 failures we will transition the job to error and stop executing */ @@ -2699,6 +2784,8 @@ UpdateJobError(RebalanceJob *job, ErrorData *edata) ObjectIdGetDatum(JobStatusErrorId()); isnull[Anum_pg_dist_rebalance_jobs_status - 1] = false; replace[Anum_pg_dist_rebalance_jobs_status - 1] = true; + + statusError = true; } StringInfoData buf = { 0 }; @@ -2756,4 +2843,109 @@ UpdateJobError(RebalanceJob *job, ErrorData *edata) systable_endscan(scanDescriptor); table_close(pgDistRebalanceJobs, NoLock); + + /* when we have changed the status to Error we will need to unschedule all dependent jobs (recursively) */ + if (statusError) + { + UnscheduleDependantJobs(job->jobid); + } + + return statusError; +} + + +static List * +GetDependantJobs(int64 jobid) +{ + Relation pgDistRebalanceJobsDepends = table_open(DistRebalanceJobsDependRelationId(), + RowExclusiveLock); + const bool indexOK = true; + ScanKeyData scanKey[1]; + int scanKeyCount = 1; + + /* pg_dist_rebalance_jobs_depend.depends_on = $jobid */ + ScanKeyInit(&scanKey[0], Anum_pg_dist_rebalance_jobs_depend_depends_on, + BTEqualStrategyNumber, F_INT8EQ, Int64GetDatum(jobid)); + + SysScanDesc scanDescriptor = systable_beginscan(pgDistRebalanceJobsDepends, + DistRebalanceJobsDependDependsOnIndexId(), + indexOK, + NULL, scanKeyCount, scanKey); + + List *dependantJobs = NIL; + HeapTuple heapTuple = NULL; + while (HeapTupleIsValid(heapTuple = systable_getnext(scanDescriptor))) + { + Form_pg_dist_rebalance_jobs_depend depend = + (Form_pg_dist_rebalance_jobs_depend) GETSTRUCT(heapTuple); + + int64 *dJobid = palloc0(sizeof(int64)); + *dJobid = depend->jobid; + + dependantJobs = lappend(dependantJobs, dJobid); + } + + systable_endscan(scanDescriptor); + table_close(pgDistRebalanceJobsDepends, NoLock); + + return dependantJobs; +} + + +void +UnscheduleDependantJobs(int64 jobid) +{ + Relation pgDistRebalanceJobs = table_open(DistRebalanceJobsRelationId(), + RowExclusiveLock); + TupleDesc tupleDescriptor = RelationGetDescr(pgDistRebalanceJobs); + + List *dependantJobs = GetDependantJobs(jobid); + while (list_length(dependantJobs) > 0) + { + /* pop last item from stack */ + int64 cJobid = *(int64 *) llast(dependantJobs); + dependantJobs = list_delete_last(dependantJobs); + + /* push new dependant jobs on to stack */ + dependantJobs = list_concat(dependantJobs, GetDependantJobs(cJobid)); + + /* unschedule current job */ + { + ScanKeyData scanKey[1] = { 0 }; + int scanKeyCount = 1; + + /* WHERE jobid = job->jobid */ + ScanKeyInit(&scanKey[0], Anum_pg_dist_rebalance_jobs_jobid, + BTEqualStrategyNumber, F_INT8EQ, Int64GetDatum(cJobid)); + const bool indexOK = true; + SysScanDesc scanDescriptor = systable_beginscan(pgDistRebalanceJobs, + DistRebalanceJobsJobsIdIndexId(), + indexOK, + NULL, scanKeyCount, scanKey); + + HeapTuple heapTuple = systable_getnext(scanDescriptor); + if (!HeapTupleIsValid(heapTuple)) + { + ereport(ERROR, (errmsg("could not find rebalance job entry for jobid: " + UINT64_FORMAT, cJobid))); + } + + Datum values[Natts_pg_dist_rebalance_jobs] = { 0 }; + bool isnull[Natts_pg_dist_rebalance_jobs] = { 0 }; + bool replace[Natts_pg_dist_rebalance_jobs] = { 0 }; + + values[Anum_pg_dist_rebalance_jobs_status - 1] = + ObjectIdGetDatum(JobStatusUnscheduledId()); + isnull[Anum_pg_dist_rebalance_jobs_status - 1] = false; + replace[Anum_pg_dist_rebalance_jobs_status - 1] = true; + + heapTuple = heap_modify_tuple(heapTuple, tupleDescriptor, values, isnull, + replace); + CatalogTupleUpdate(pgDistRebalanceJobs, &heapTuple->t_self, heapTuple); + + systable_endscan(scanDescriptor); + } + } + + table_close(pgDistRebalanceJobs, NoLock); } diff --git a/src/backend/distributed/sql/citus--11.0-4--11.1-1.sql b/src/backend/distributed/sql/citus--11.0-4--11.1-1.sql index c171efaa7..47ada61ce 100644 --- a/src/backend/distributed/sql/citus--11.0-4--11.1-1.sql +++ b/src/backend/distributed/sql/citus--11.0-4--11.1-1.sql @@ -77,7 +77,7 @@ DROP FUNCTION pg_catalog.get_all_active_transactions(OUT datid oid, OUT process_ DROP FUNCTION pg_catalog.isolate_tenant_to_new_shard(table_name regclass, tenant_id "any", cascade_option text); #include "udfs/isolate_tenant_to_new_shard/11.1-1.sql" -CREATE TYPE citus.citus_job_status AS ENUM ('scheduled', 'running', 'done', 'error'); +CREATE TYPE citus.citus_job_status AS ENUM ('scheduled', 'running', 'done', 'error', 'unscheduled'); ALTER TYPE citus.citus_job_status SET SCHEMA pg_catalog; CREATE TABLE citus.pg_dist_rebalance_jobs( diff --git a/src/backend/distributed/utils/maintenanced.c b/src/backend/distributed/utils/maintenanced.c index e0f797d03..40f59270f 100644 --- a/src/backend/distributed/utils/maintenanced.c +++ b/src/backend/distributed/utils/maintenanced.c @@ -909,7 +909,7 @@ RebalanceJobsBackgroundWorkerMain(Datum arg) * later due to the committing and starting of new transactions */ MemoryContext oldContext = MemoryContextSwitchTo(perJobContext); - RebalanceJob *job = GetScheduledRebalanceJob(); + RebalanceJob *job = GetRunableRebalanceJob(); MemoryContextSwitchTo(oldContext); if (job) diff --git a/src/include/distributed/metadata_cache.h b/src/include/distributed/metadata_cache.h index 4986a47ba..25ae4ed5e 100644 --- a/src/include/distributed/metadata_cache.h +++ b/src/include/distributed/metadata_cache.h @@ -240,6 +240,8 @@ extern Oid DistPartitionColocationidIndexId(void); extern Oid DistRebalanceJobsJobsIdIndexId(void); extern Oid DistRebalanceJobsStatusJobsIdIndexId(void); extern Oid DistRebalanceJobsDependRelationId(void); +extern Oid DistRebalanceJobsDependJobIdIndexId(void); +extern Oid DistRebalanceJobsDependDependsOnIndexId(void); extern Oid DistShardLogicalRelidIndexId(void); extern Oid DistShardShardidIndexId(void); extern Oid DistPlacementShardidIndexId(void); @@ -276,6 +278,7 @@ extern Oid JobStatusScheduledId(void); extern Oid JobStatusRunningId(void); extern Oid JobStatusDoneId(void); extern Oid JobStatusErrorId(void); +extern Oid JobStatusUnscheduledId(void); /* user related functions */ extern Oid CitusExtensionOwner(void); diff --git a/src/include/distributed/metadata_utility.h b/src/include/distributed/metadata_utility.h index ebdc8b458..071c3ad6b 100644 --- a/src/include/distributed/metadata_utility.h +++ b/src/include/distributed/metadata_utility.h @@ -210,7 +210,8 @@ typedef enum RebalanceJobStatus REBALANCE_JOB_STATUS_SCHEDULED, REBALANCE_JOB_STATUS_RUNNING, REBALANCE_JOB_STATUS_DONE, - REBALANCE_JOB_STATUS_ERROR + REBALANCE_JOB_STATUS_ERROR, + REBALANCE_JOB_STATUS_UNSCHEDULED } RebalanceJobStatus; @@ -333,11 +334,13 @@ extern bool HasScheduledRebalanceJobs(void); extern int64 GetNextRebalanceJobId(void); extern RebalanceJob * ScheduleBackgrounRebalanceJob(char *command, int dependingJobCount, int64 dependingJobIds[]); -extern RebalanceJob * GetScheduledRebalanceJob(void); +extern bool JobHasUmnetDependencies(int64 jobid); +extern RebalanceJob * GetRunableRebalanceJob(void); extern void ResetRunningJobs(void); extern RebalanceJob * GetScheduledRebalanceJobByJobID(int64 jobId); extern void UpdateJobStatus(RebalanceJob *job, RebalanceJobStatus newStatus); -extern void UpdateJobError(RebalanceJob *job, ErrorData *edata); +extern bool UpdateJobError(RebalanceJob *job, ErrorData *edata); +extern void UnscheduleDependantJobs(int64 jobid); extern bool IsRebalanceJobStatusTerminal(RebalanceJobStatus status); extern Oid RebalanceJobStatusOid(RebalanceJobStatus status); #endif /* METADATA_UTILITY_H */ diff --git a/src/include/distributed/pg_dist_rebalance_jobs_depend.h b/src/include/distributed/pg_dist_rebalance_jobs_depend.h index 34f94d0e8..bf3208d12 100644 --- a/src/include/distributed/pg_dist_rebalance_jobs_depend.h +++ b/src/include/distributed/pg_dist_rebalance_jobs_depend.h @@ -2,6 +2,16 @@ #ifndef CITUS_PG_DIST_REBALANCE_JOBS_DEPEND_H #define CITUS_PG_DIST_REBALANCE_JOBS_DEPEND_H +typedef struct FormData_pg_dist_rebalance_jobs_depend +{ + int64 jobid; + int64 depends_on; +#ifdef CATALOG_VARLEN /* variable-length fields start here */ +#endif +} FormData_pg_dist_rebalance_jobs_depend; +typedef FormData_pg_dist_rebalance_jobs_depend *Form_pg_dist_rebalance_jobs_depend; + + /* ---------------- * compiler constants for pg_dist_rebalance_jobs_depend * ----------------