diff --git a/src/backend/distributed/metadata/metadata_cache.c b/src/backend/distributed/metadata/metadata_cache.c index dcc1fe1ed..5fbb8ae88 100644 --- a/src/backend/distributed/metadata/metadata_cache.c +++ b/src/backend/distributed/metadata/metadata_cache.c @@ -141,17 +141,17 @@ typedef struct MetadataCacheData bool extensionLoaded; Oid distShardRelationId; Oid distPlacementRelationId; - Oid distRebalanceJobsRelationId; - Oid distRebalanceJobsJobsIndexId; - Oid distRebalanceJobsStatusJobsIndexId; - Oid distRebalanceJobsDependRelationId; - Oid distRebalanceJobsDependsJobIdIndexId; - Oid distRebalanceJobsDependsDependsOnIndexId; - Oid jobStatusScheduledId; - Oid jobStatusRunningId; - Oid jobStatusDoneId; - Oid jobStatusErrorId; - Oid jobStatusSnscheduledId; + Oid distBackgroundTasksRelationId; + Oid distBackgroundTasksTaskIdIndexId; + Oid distBackgroundTasksStatusTaskIdIndexId; + Oid distBackgroundTaskssDependRelationId; + Oid distBackgroundTasksDependTaskIdIndexId; + Oid distBackgroundTasksDependDependsOnIndexId; + Oid citusTaskStatusScheduledId; + Oid citusTaskStatusRunningId; + Oid citusTaskStatusDoneId; + Oid citusTaskStatusErrorId; + Oid citusTaskStatusSnscheduledId; Oid distRebalanceStrategyRelationId; Oid distNodeRelationId; Oid distNodeNodeIdIndexId; @@ -2376,62 +2376,62 @@ DistLocalGroupIdRelationId(void) Oid -DistRebalanceJobsRelationId(void) +DistBackgroundTasksRelationId(void) { - CachedRelationLookup("pg_dist_rebalance_jobs", - &MetadataCache.distRebalanceJobsRelationId); + CachedRelationLookup("pg_dist_background_tasks", + &MetadataCache.distBackgroundTasksRelationId); - return MetadataCache.distRebalanceJobsRelationId; + return MetadataCache.distBackgroundTasksRelationId; } Oid -DistRebalanceJobsJobsIdIndexId(void) +DistBackgroundTasksTaskIdIndexId(void) { - CachedRelationLookup("pg_dist_rebalance_jobs_jobid_index", - &MetadataCache.distRebalanceJobsJobsIndexId); + CachedRelationLookup("pg_dist_background_tasks_task_id_index", + &MetadataCache.distBackgroundTasksTaskIdIndexId); - return MetadataCache.distRebalanceJobsJobsIndexId; + return MetadataCache.distBackgroundTasksTaskIdIndexId; } Oid -DistRebalanceJobsStatusJobsIdIndexId(void) +DistBackgroundTasksStatusTaskIdIndexId(void) { - CachedRelationLookup("pg_dist_rebalance_jobs_status_jobid_index", - &MetadataCache.distRebalanceJobsStatusJobsIndexId); + CachedRelationLookup("pg_dist_background_tasks_status_task_id_index", + &MetadataCache.distBackgroundTasksStatusTaskIdIndexId); - return MetadataCache.distRebalanceJobsStatusJobsIndexId; + return MetadataCache.distBackgroundTasksStatusTaskIdIndexId; } Oid -DistRebalanceJobsDependRelationId(void) +DistBackgroundTaskssDependRelationId(void) { - CachedRelationLookup("pg_dist_rebalance_jobs_depend", - &MetadataCache.distRebalanceJobsDependRelationId); + CachedRelationLookup("pg_dist_background_tasks_depend", + &MetadataCache.distBackgroundTaskssDependRelationId); - return MetadataCache.distRebalanceJobsDependRelationId; + return MetadataCache.distBackgroundTaskssDependRelationId; } Oid -DistRebalanceJobsDependJobIdIndexId(void) +DistBackgroundTasksDependTaskIdIndexId(void) { - CachedRelationLookup("pg_dist_rebalance_jobs_depend_jobid", - &MetadataCache.distRebalanceJobsDependsJobIdIndexId); + CachedRelationLookup("pg_dist_background_tasks_depend_task_id", + &MetadataCache.distBackgroundTasksDependTaskIdIndexId); - return MetadataCache.distRebalanceJobsDependsJobIdIndexId; + return MetadataCache.distBackgroundTasksDependTaskIdIndexId; } Oid -DistRebalanceJobsDependDependsOnIndexId(void) +DistBackgroundTasksDependDependsOnIndexId(void) { - CachedRelationLookup("pg_dist_rebalance_jobs_depend_depends_on", - &MetadataCache.distRebalanceJobsDependsDependsOnIndexId); + CachedRelationLookup("pg_dist_background_tasks_depend_depends_on", + &MetadataCache.distBackgroundTasksDependDependsOnIndexId); - return MetadataCache.distRebalanceJobsDependsDependsOnIndexId; + return MetadataCache.distBackgroundTasksDependDependsOnIndexId; } @@ -3158,67 +3158,67 @@ SecondaryNodeRoleId(void) Oid -JobStatusScheduledId(void) +CitusTaskStatusScheduledId(void) { - if (!MetadataCache.jobStatusScheduledId) + if (!MetadataCache.citusTaskStatusScheduledId) { - MetadataCache.jobStatusScheduledId = - LookupStringEnumValueId("citus_job_status", "scheduled"); + MetadataCache.citusTaskStatusScheduledId = + LookupStringEnumValueId("citus_task_status", "scheduled"); } - return MetadataCache.jobStatusScheduledId; + return MetadataCache.citusTaskStatusScheduledId; } Oid -JobStatusRunningId(void) +CitusTaskStatusRunningId(void) { - if (!MetadataCache.jobStatusRunningId) + if (!MetadataCache.citusTaskStatusRunningId) { - MetadataCache.jobStatusRunningId = - LookupStringEnumValueId("citus_job_status", "running"); + MetadataCache.citusTaskStatusRunningId = + LookupStringEnumValueId("citus_task_status", "running"); } - return MetadataCache.jobStatusRunningId; + return MetadataCache.citusTaskStatusRunningId; } Oid -JobStatusDoneId(void) +CitusTaskStatusDoneId(void) { - if (!MetadataCache.jobStatusDoneId) + if (!MetadataCache.citusTaskStatusDoneId) { - MetadataCache.jobStatusDoneId = - LookupStringEnumValueId("citus_job_status", "done"); + MetadataCache.citusTaskStatusDoneId = + LookupStringEnumValueId("citus_task_status", "done"); } - return MetadataCache.jobStatusDoneId; + return MetadataCache.citusTaskStatusDoneId; } Oid -JobStatusErrorId(void) +CitusTaskStatusErrorId(void) { - if (!MetadataCache.jobStatusErrorId) + if (!MetadataCache.citusTaskStatusErrorId) { - MetadataCache.jobStatusErrorId = - LookupStringEnumValueId("citus_job_status", "error"); + MetadataCache.citusTaskStatusErrorId = + LookupStringEnumValueId("citus_task_status", "error"); } - return MetadataCache.jobStatusErrorId; + return MetadataCache.citusTaskStatusErrorId; } Oid -JobStatusUnscheduledId(void) +CitusTaskStatusSnscheduledId(void) { - if (!MetadataCache.jobStatusSnscheduledId) + if (!MetadataCache.citusTaskStatusSnscheduledId) { - MetadataCache.jobStatusSnscheduledId = - LookupStringEnumValueId("citus_job_status", "unscheduled"); + MetadataCache.citusTaskStatusSnscheduledId = + LookupStringEnumValueId("citus_task_status", "unscheduled"); } - return MetadataCache.jobStatusSnscheduledId; + return MetadataCache.citusTaskStatusSnscheduledId; } diff --git a/src/backend/distributed/metadata/metadata_utility.c b/src/backend/distributed/metadata/metadata_utility.c index a7ba887a5..cc020705e 100644 --- a/src/backend/distributed/metadata/metadata_utility.c +++ b/src/backend/distributed/metadata/metadata_utility.c @@ -45,7 +45,7 @@ #include "distributed/multi_physical_planner.h" #include "distributed/pg_dist_colocation.h" #include "distributed/pg_dist_partition.h" -#include "distributed/pg_dist_rebalance_jobs.h" +#include "distributed/pg_dist_background_tasks.h" #include "distributed/pg_dist_rebalance_jobs_depend.h" #include "distributed/pg_dist_shard.h" #include "distributed/pg_dist_placement.h" @@ -2228,26 +2228,26 @@ HasScheduledRebalanceJobs() ScanKeyData scanKey[1]; bool indexOK = true; - Relation pgDistRebalanceJobs = table_open(DistRebalanceJobsRelationId(), + Relation pgDistRebalanceJobs = table_open(DistBackgroundTasksRelationId(), AccessShareLock); /* find any job in states listed here */ - RebalanceJobStatus jobs[] = { - REBALANCE_JOB_STATUS_RUNNING, - REBALANCE_JOB_STATUS_SCHEDULED + BackgroundTaskStatus jobs[] = { + BACKGROUND_TASK_STATUS_RUNNING, + BACKGROUND_TASK_STATUS_SCHEDULED }; bool hasScheduledJob = false; for (int i = 0; !hasScheduledJob && i < sizeof(jobs) / sizeof(jobs[0]); i++) { /* pg_dist_rebalance_jobs.status == jobs[i] */ - ScanKeyInit(&scanKey[0], Anum_pg_dist_rebalance_jobs_status, + ScanKeyInit(&scanKey[0], Anum_pg_dist_background_tasks_status, BTEqualStrategyNumber, F_OIDEQ, ObjectIdGetDatum(RebalanceJobStatusOid(jobs[i]))); SysScanDesc scanDescriptor = systable_beginscan( pgDistRebalanceJobs, - DistRebalanceJobsStatusJobsIdIndexId(), + DistBackgroundTasksStatusTaskIdIndexId(), indexOK, NULL, scanKeyCount, scanKey); @@ -2266,42 +2266,42 @@ HasScheduledRebalanceJobs() } -static RebalanceJobStatus +static BackgroundTaskStatus RebalanceJobStatusByOid(Oid enumOid) { - if (enumOid == JobStatusDoneId()) + if (enumOid == CitusTaskStatusDoneId()) { - return REBALANCE_JOB_STATUS_DONE; + return BACKGROUND_TASK_STATUS_DONE; } - else if (enumOid == JobStatusScheduledId()) + else if (enumOid == CitusTaskStatusScheduledId()) { - return REBALANCE_JOB_STATUS_SCHEDULED; + return BACKGROUND_TASK_STATUS_SCHEDULED; } - else if (enumOid == JobStatusRunningId()) + else if (enumOid == CitusTaskStatusRunningId()) { - return REBALANCE_JOB_STATUS_RUNNING; + return BACKGROUND_TASK_STATUS_RUNNING; } - else if (enumOid == JobStatusErrorId()) + else if (enumOid == CitusTaskStatusErrorId()) { - return REBALANCE_JOB_STATUS_ERROR; + return BACKGROUND_TASK_STATUS_ERROR; } - else if (enumOid == JobStatusUnscheduledId()) + else if (enumOid == CitusTaskStatusSnscheduledId()) { - return REBALANCE_JOB_STATUS_UNSCHEDULED; + return BACKGROUND_TASK_STATUS_UNSCHEDULED; } ereport(ERROR, (errmsg("unknown enum value for citus_job_status"))); - return REBALANCE_JOB_STATUS_UNKNOWN; + return BACKGROUND_TASK_STATUS_UNKNOWN; } bool -IsRebalanceJobStatusTerminal(RebalanceJobStatus status) +IsRebalanceJobStatusTerminal(BackgroundTaskStatus status) { switch (status) { - case REBALANCE_JOB_STATUS_DONE: - case REBALANCE_JOB_STATUS_ERROR: - case REBALANCE_JOB_STATUS_UNSCHEDULED: + case BACKGROUND_TASK_STATUS_DONE: + case BACKGROUND_TASK_STATUS_ERROR: + case BACKGROUND_TASK_STATUS_UNSCHEDULED: { return true; } @@ -2315,33 +2315,33 @@ IsRebalanceJobStatusTerminal(RebalanceJobStatus status) Oid -RebalanceJobStatusOid(RebalanceJobStatus status) +RebalanceJobStatusOid(BackgroundTaskStatus status) { switch (status) { - case REBALANCE_JOB_STATUS_SCHEDULED: + case BACKGROUND_TASK_STATUS_SCHEDULED: { - return JobStatusScheduledId(); + return CitusTaskStatusScheduledId(); } - case REBALANCE_JOB_STATUS_RUNNING: + case BACKGROUND_TASK_STATUS_RUNNING: { - return JobStatusRunningId(); + return CitusTaskStatusRunningId(); } - case REBALANCE_JOB_STATUS_DONE: + case BACKGROUND_TASK_STATUS_DONE: { - return JobStatusDoneId(); + return CitusTaskStatusDoneId(); } - case REBALANCE_JOB_STATUS_ERROR: + case BACKGROUND_TASK_STATUS_ERROR: { - return JobStatusErrorId(); + return CitusTaskStatusErrorId(); } - case REBALANCE_JOB_STATUS_UNSCHEDULED: + case BACKGROUND_TASK_STATUS_UNSCHEDULED: { - return JobStatusUnscheduledId(); + return CitusTaskStatusSnscheduledId(); } default: @@ -2353,9 +2353,9 @@ RebalanceJobStatusOid(RebalanceJobStatus status) int64 -GetNextRebalanceJobId(void) +GetNextBackgroundTaskTaskId(void) { - text *sequenceName = cstring_to_text(REBALANCE_JOB_JOBID_SEQUENCE_NAME); + text *sequenceName = cstring_to_text(PG_DIST_BACKGROUND_TASK_TASK_ID_SEQUENCE_NAME); Oid sequenceId = ResolveRelationId(sequenceName, false); Datum sequenceIdDatum = ObjectIdGetDatum(sequenceId); Oid savedUserId = InvalidOid; @@ -2381,12 +2381,12 @@ ScheduleBackgrounRebalanceJob(char *command, int dependingJobCount, { RebalanceJob *job = NULL; - Relation pgDistRebalanceJobs = table_open(DistRebalanceJobsRelationId(), + Relation pgDistRebalanceJobs = table_open(DistBackgroundTasksRelationId(), RowExclusiveLock); Relation pgDistRebalanceJobsDepend = NULL; if (dependingJobCount > 0) { - pgDistRebalanceJobsDepend = table_open(DistRebalanceJobsDependRelationId(), + pgDistRebalanceJobsDepend = table_open(DistBackgroundTaskssDependRelationId(), RowExclusiveLock); } @@ -2394,21 +2394,21 @@ ScheduleBackgrounRebalanceJob(char *command, int dependingJobCount, /* 2. insert new job */ { - Datum values[Natts_pg_dist_rebalance_jobs] = { 0 }; - bool nulls[Natts_pg_dist_rebalance_jobs] = { 0 }; + Datum values[Natts_pg_dist_background_tasks] = { 0 }; + bool nulls[Natts_pg_dist_background_tasks] = { 0 }; memset(nulls, true, sizeof(nulls)); - int64 jobid = GetNextRebalanceJobId(); + int64 jobid = GetNextBackgroundTaskTaskId(); - values[Anum_pg_dist_rebalance_jobs_jobid - 1] = Int64GetDatum(jobid); - nulls[Anum_pg_dist_rebalance_jobs_jobid - 1] = false; + values[Anum_pg_dist_background_tasks_task_id - 1] = Int64GetDatum(jobid); + nulls[Anum_pg_dist_background_tasks_task_id - 1] = false; - values[Anum_pg_dist_rebalance_jobs_status - 1] = JobStatusScheduledId(); - nulls[Anum_pg_dist_rebalance_jobs_status - 1] = false; + values[Anum_pg_dist_background_tasks_status - 1] = CitusTaskStatusScheduledId(); + nulls[Anum_pg_dist_background_tasks_status - 1] = false; - values[Anum_pg_dist_rebalance_jobs_command - 1] = CStringGetTextDatum(command); - nulls[Anum_pg_dist_rebalance_jobs_command - 1] = false; + values[Anum_pg_dist_background_tasks_command - 1] = CStringGetTextDatum(command); + nulls[Anum_pg_dist_background_tasks_command - 1] = false; HeapTuple newTuple = heap_form_tuple(RelationGetDescr(pgDistRebalanceJobs), values, nulls); @@ -2417,7 +2417,7 @@ ScheduleBackgrounRebalanceJob(char *command, int dependingJobCount, job = palloc0(sizeof(RebalanceJob)); job->jobid = jobid; - job->status = REBALANCE_JOB_STATUS_SCHEDULED; + job->status = BACKGROUND_TASK_STATUS_SCHEDULED; job->command = pstrdup(command); } @@ -2456,42 +2456,43 @@ ScheduleBackgrounRebalanceJob(char *command, int dependingJobCount, void -ResetRunningJobs(void) +ResetRunningBackgroundTasks(void) { const int scanKeyCount = 1; ScanKeyData scanKey[1]; const bool indexOK = true; - Relation pgDistRebalanceJobs = table_open(DistRebalanceJobsRelationId(), + Relation pgDistRebalanceJobs = table_open(DistBackgroundTasksRelationId(), AccessShareLock); /* pg_dist_rebalance_jobs.status == 'running' */ - ScanKeyInit(&scanKey[0], Anum_pg_dist_rebalance_jobs_status, - BTEqualStrategyNumber, F_OIDEQ, ObjectIdGetDatum(JobStatusRunningId())); + ScanKeyInit(&scanKey[0], Anum_pg_dist_background_tasks_status, + BTEqualStrategyNumber, F_OIDEQ, ObjectIdGetDatum( + CitusTaskStatusRunningId())); SysScanDesc scanDescriptor = systable_beginscan(pgDistRebalanceJobs, - DistRebalanceJobsStatusJobsIdIndexId(), + DistBackgroundTasksStatusTaskIdIndexId(), indexOK, NULL, scanKeyCount, scanKey); HeapTuple jobTuple = NULL; while (HeapTupleIsValid(jobTuple = systable_getnext(scanDescriptor))) { - Datum values[Natts_pg_dist_rebalance_jobs] = { 0 }; - bool isnull[Natts_pg_dist_rebalance_jobs] = { 0 }; - bool replace[Natts_pg_dist_rebalance_jobs] = { 0 }; + Datum values[Natts_pg_dist_background_tasks] = { 0 }; + bool isnull[Natts_pg_dist_background_tasks] = { 0 }; + bool replace[Natts_pg_dist_background_tasks] = { 0 }; TupleDesc tupleDescriptor = RelationGetDescr(pgDistRebalanceJobs); heap_deform_tuple(jobTuple, tupleDescriptor, values, isnull); - values[Anum_pg_dist_rebalance_jobs_status - 1] = - ObjectIdGetDatum(JobStatusScheduledId()); - isnull[Anum_pg_dist_rebalance_jobs_status - 1] = false; - replace[Anum_pg_dist_rebalance_jobs_status - 1] = true; + values[Anum_pg_dist_background_tasks_status - 1] = + ObjectIdGetDatum(CitusTaskStatusScheduledId()); + isnull[Anum_pg_dist_background_tasks_status - 1] = false; + replace[Anum_pg_dist_background_tasks_status - 1] = true; - values[Anum_pg_dist_rebalance_jobs_pid - 1] = InvalidOid; - isnull[Anum_pg_dist_rebalance_jobs_pid - 1] = true; - replace[Anum_pg_dist_rebalance_jobs_pid - 1] = true; + values[Anum_pg_dist_background_tasks_pid - 1] = InvalidOid; + isnull[Anum_pg_dist_background_tasks_pid - 1] = true; + replace[Anum_pg_dist_background_tasks_pid - 1] = true; jobTuple = heap_modify_tuple(jobTuple, tupleDescriptor, values, isnull, replace); @@ -2511,8 +2512,9 @@ JobHasUmnetDependencies(int64 jobid) { bool hasUnmetDependency = false; - Relation pgDistRebalanceJobsDepend = table_open(DistRebalanceJobsDependRelationId(), - AccessShareLock); + Relation pgDistRebalanceJobsDepend = table_open( + DistBackgroundTaskssDependRelationId(), + AccessShareLock); const int scanKeyCount = 1; ScanKeyData scanKey[1] = { 0 }; @@ -2523,7 +2525,7 @@ JobHasUmnetDependencies(int64 jobid) BTEqualStrategyNumber, F_INT8EQ, jobid); SysScanDesc scanDescriptor = systable_beginscan(pgDistRebalanceJobsDepend, - DistRebalanceJobsDependJobIdIndexId(), + DistBackgroundTasksDependTaskIdIndexId(), indexOK, NULL, scanKeyCount, scanKey); @@ -2539,7 +2541,7 @@ JobHasUmnetDependencies(int64 jobid) * Only when the status of all depending jobs is done we clear this job and say * that is has no unmet dependencies. */ - if (dependingJob->status == REBALANCE_JOB_STATUS_DONE) + if (dependingJob->status == BACKGROUND_TASK_STATUS_DONE) { continue; } @@ -2549,7 +2551,7 @@ JobHasUmnetDependencies(int64 jobid) * have errored. Once we move the job to error it should unschedule all dependant * jobs recursively. */ - Assert(dependingJob->status != REBALANCE_JOB_STATUS_ERROR); + Assert(dependingJob->status != BACKGROUND_TASK_STATUS_ERROR); hasUnmetDependency = true; break; @@ -2569,36 +2571,36 @@ GetRunableRebalanceJob(void) ScanKeyData scanKey[1]; bool indexOK = true; - Relation pgDistRebalanceJobs = table_open(DistRebalanceJobsRelationId(), + Relation pgDistRebalanceJobs = table_open(DistBackgroundTasksRelationId(), AccessShareLock); - RebalanceJobStatus jobStatus[] = { - REBALANCE_JOB_STATUS_SCHEDULED + BackgroundTaskStatus jobStatus[] = { + BACKGROUND_TASK_STATUS_SCHEDULED }; RebalanceJob *job = NULL; for (int i = 0; !job && i < sizeof(jobStatus) / sizeof(jobStatus[0]); i++) { /* pg_dist_rebalance_jobs.status == jobStatus[i] */ - ScanKeyInit(&scanKey[0], Anum_pg_dist_rebalance_jobs_status, + ScanKeyInit(&scanKey[0], Anum_pg_dist_background_tasks_status, BTEqualStrategyNumber, F_OIDEQ, ObjectIdGetDatum( RebalanceJobStatusOid(jobStatus[i]))); SysScanDesc scanDescriptor = systable_beginscan(pgDistRebalanceJobs, - DistRebalanceJobsStatusJobsIdIndexId(), + DistBackgroundTasksStatusTaskIdIndexId(), indexOK, NULL, scanKeyCount, scanKey); HeapTuple jobTuple = NULL; while (HeapTupleIsValid(jobTuple = systable_getnext(scanDescriptor))) { - Datum datumArray[Natts_pg_dist_rebalance_jobs]; - bool isNullArray[Natts_pg_dist_rebalance_jobs]; + Datum datumArray[Natts_pg_dist_background_tasks]; + bool isNullArray[Natts_pg_dist_background_tasks]; TupleDesc tupleDescriptor = RelationGetDescr(pgDistRebalanceJobs); heap_deform_tuple(jobTuple, tupleDescriptor, datumArray, isNullArray); int64 jobid = DatumGetInt64( - datumArray[Anum_pg_dist_rebalance_jobs_jobid - 1]); + datumArray[Anum_pg_dist_background_tasks_task_id - 1]); if (JobHasUmnetDependencies(jobid)) { continue; @@ -2607,10 +2609,10 @@ GetRunableRebalanceJob(void) job = palloc0(sizeof(RebalanceJob)); job->jobid = jobid; job->status = RebalanceJobStatusByOid( - DatumGetObjectId(datumArray[Anum_pg_dist_rebalance_jobs_status - 1])); + DatumGetObjectId(datumArray[Anum_pg_dist_background_tasks_status - 1])); job->command = text_to_cstring( - DatumGetTextP(datumArray[Anum_pg_dist_rebalance_jobs_command - 1])); + DatumGetTextP(datumArray[Anum_pg_dist_background_tasks_command - 1])); break; } @@ -2631,32 +2633,32 @@ GetScheduledRebalanceJobByJobID(int64 jobId) ScanKeyData scanKey[1]; bool indexOK = true; - Relation pgDistRebalanceJobs = table_open(DistRebalanceJobsRelationId(), + Relation pgDistRebalanceJobs = table_open(DistBackgroundTasksRelationId(), AccessShareLock); /* pg_dist_rebalance_jobs.jobid == $jobId */ - ScanKeyInit(&scanKey[0], Anum_pg_dist_rebalance_jobs_jobid, + ScanKeyInit(&scanKey[0], Anum_pg_dist_background_tasks_task_id, BTEqualStrategyNumber, F_INT8EQ, Int64GetDatum(jobId)); SysScanDesc scanDescriptor = systable_beginscan(pgDistRebalanceJobs, - DistRebalanceJobsStatusJobsIdIndexId(), + DistBackgroundTasksStatusTaskIdIndexId(), indexOK, NULL, scanKeyCount, scanKey); HeapTuple jobTuple = systable_getnext(scanDescriptor); RebalanceJob *job = NULL; if (HeapTupleIsValid(jobTuple)) { - Datum datumArray[Natts_pg_dist_rebalance_jobs]; - bool isNullArray[Natts_pg_dist_rebalance_jobs]; + Datum datumArray[Natts_pg_dist_background_tasks]; + bool isNullArray[Natts_pg_dist_background_tasks]; TupleDesc tupleDescriptor = RelationGetDescr(pgDistRebalanceJobs); heap_deform_tuple(jobTuple, tupleDescriptor, datumArray, isNullArray); job = palloc0(sizeof(RebalanceJob)); - job->jobid = DatumGetInt64(datumArray[Anum_pg_dist_rebalance_jobs_jobid - 1]); + job->jobid = DatumGetInt64(datumArray[Anum_pg_dist_background_tasks_task_id - 1]); job->status = RebalanceJobStatusByOid( - DatumGetObjectId(datumArray[Anum_pg_dist_rebalance_jobs_status - 1])); + DatumGetObjectId(datumArray[Anum_pg_dist_background_tasks_status - 1])); job->command = text_to_cstring( - DatumGetTextP(datumArray[Anum_pg_dist_rebalance_jobs_command - 1])); + DatumGetTextP(datumArray[Anum_pg_dist_background_tasks_command - 1])); } systable_endscan(scanDescriptor); @@ -2667,10 +2669,10 @@ GetScheduledRebalanceJobByJobID(int64 jobId) void -UpdateJobStatus(int64 jobid, pid_t *pid, RebalanceJobStatus status, int32 *retry_count, +UpdateJobStatus(int64 jobid, pid_t *pid, BackgroundTaskStatus status, int32 *retry_count, char *message) { - Relation pgDistRebalanceJobs = table_open(DistRebalanceJobsRelationId(), + Relation pgDistRebalanceJobs = table_open(DistBackgroundTasksRelationId(), RowExclusiveLock); TupleDesc tupleDescriptor = RelationGetDescr(pgDistRebalanceJobs); @@ -2678,12 +2680,12 @@ UpdateJobStatus(int64 jobid, pid_t *pid, RebalanceJobStatus status, int32 *retry int scanKeyCount = 1; /* WHERE jobid = job->jobid */ - ScanKeyInit(&scanKey[0], Anum_pg_dist_rebalance_jobs_jobid, + ScanKeyInit(&scanKey[0], Anum_pg_dist_background_tasks_task_id, BTEqualStrategyNumber, F_INT8EQ, Int64GetDatum(jobid)); const bool indexOK = true; SysScanDesc scanDescriptor = systable_beginscan(pgDistRebalanceJobs, - DistRebalanceJobsJobsIdIndexId(), + DistBackgroundTasksTaskIdIndexId(), indexOK, NULL, scanKeyCount, scanKey); @@ -2694,9 +2696,9 @@ UpdateJobStatus(int64 jobid, pid_t *pid, RebalanceJobStatus status, int32 *retry UINT64_FORMAT, jobid))); } - Datum values[Natts_pg_dist_rebalance_jobs] = { 0 }; - bool isnull[Natts_pg_dist_rebalance_jobs] = { 0 }; - bool replace[Natts_pg_dist_rebalance_jobs] = { 0 }; + Datum values[Natts_pg_dist_background_tasks] = { 0 }; + bool isnull[Natts_pg_dist_background_tasks] = { 0 }; + bool replace[Natts_pg_dist_background_tasks] = { 0 }; heap_deform_tuple(heapTuple, tupleDescriptor, values, isnull); @@ -2708,34 +2710,34 @@ UpdateJobStatus(int64 jobid, pid_t *pid, RebalanceJobStatus status, int32 *retry if (pid) { - UPDATE_FIELD(Anum_pg_dist_rebalance_jobs_pid, false, Int32GetDatum(*pid)); + UPDATE_FIELD(Anum_pg_dist_background_tasks_pid, false, Int32GetDatum(*pid)); } else { - UPDATE_FIELD(Anum_pg_dist_rebalance_jobs_pid, true, InvalidOid); + UPDATE_FIELD(Anum_pg_dist_background_tasks_pid, true, InvalidOid); } Oid statusOid = ObjectIdGetDatum(RebalanceJobStatusOid(status)); - UPDATE_FIELD(Anum_pg_dist_rebalance_jobs_status, false, statusOid); + UPDATE_FIELD(Anum_pg_dist_background_tasks_status, false, statusOid); if (retry_count) { - UPDATE_FIELD(Anum_pg_dist_rebalance_jobs_retry_count, false, Int32GetDatum( + UPDATE_FIELD(Anum_pg_dist_background_tasks_retry_count, false, Int32GetDatum( *retry_count)); } else { - UPDATE_FIELD(Anum_pg_dist_rebalance_jobs_retry_count, true, InvalidOid); + UPDATE_FIELD(Anum_pg_dist_background_tasks_retry_count, true, InvalidOid); } if (message) { Oid messageOid = CStringGetTextDatum(message); - UPDATE_FIELD(Anum_pg_dist_rebalance_jobs_message, false, messageOid); + UPDATE_FIELD(Anum_pg_dist_background_tasks_message, false, messageOid); } else { - UPDATE_FIELD(Anum_pg_dist_rebalance_jobs_message, true, InvalidOid); + UPDATE_FIELD(Anum_pg_dist_background_tasks_message, true, InvalidOid); } #undef UPDATE_FIELD @@ -2754,8 +2756,9 @@ UpdateJobStatus(int64 jobid, pid_t *pid, RebalanceJobStatus status, int32 *retry static List * GetDependantJobs(int64 jobid) { - Relation pgDistRebalanceJobsDepends = table_open(DistRebalanceJobsDependRelationId(), - RowExclusiveLock); + Relation pgDistRebalanceJobsDepends = table_open( + DistBackgroundTaskssDependRelationId(), + RowExclusiveLock); const bool indexOK = true; ScanKeyData scanKey[1]; int scanKeyCount = 1; @@ -2765,7 +2768,7 @@ GetDependantJobs(int64 jobid) BTEqualStrategyNumber, F_INT8EQ, Int64GetDatum(jobid)); SysScanDesc scanDescriptor = systable_beginscan(pgDistRebalanceJobsDepends, - DistRebalanceJobsDependDependsOnIndexId(), + DistBackgroundTasksDependDependsOnIndexId(), indexOK, NULL, scanKeyCount, scanKey); @@ -2792,7 +2795,7 @@ GetDependantJobs(int64 jobid) void UnscheduleDependantJobs(int64 jobid) { - Relation pgDistRebalanceJobs = table_open(DistRebalanceJobsRelationId(), + Relation pgDistRebalanceJobs = table_open(DistBackgroundTasksRelationId(), RowExclusiveLock); TupleDesc tupleDescriptor = RelationGetDescr(pgDistRebalanceJobs); @@ -2812,11 +2815,11 @@ UnscheduleDependantJobs(int64 jobid) int scanKeyCount = 1; /* WHERE jobid = job->jobid */ - ScanKeyInit(&scanKey[0], Anum_pg_dist_rebalance_jobs_jobid, + ScanKeyInit(&scanKey[0], Anum_pg_dist_background_tasks_task_id, BTEqualStrategyNumber, F_INT8EQ, Int64GetDatum(cJobid)); const bool indexOK = true; SysScanDesc scanDescriptor = systable_beginscan(pgDistRebalanceJobs, - DistRebalanceJobsJobsIdIndexId(), + DistBackgroundTasksTaskIdIndexId(), indexOK, NULL, scanKeyCount, scanKey); @@ -2827,14 +2830,14 @@ UnscheduleDependantJobs(int64 jobid) UINT64_FORMAT, cJobid))); } - Datum values[Natts_pg_dist_rebalance_jobs] = { 0 }; - bool isnull[Natts_pg_dist_rebalance_jobs] = { 0 }; - bool replace[Natts_pg_dist_rebalance_jobs] = { 0 }; + Datum values[Natts_pg_dist_background_tasks] = { 0 }; + bool isnull[Natts_pg_dist_background_tasks] = { 0 }; + bool replace[Natts_pg_dist_background_tasks] = { 0 }; - values[Anum_pg_dist_rebalance_jobs_status - 1] = - ObjectIdGetDatum(JobStatusUnscheduledId()); - isnull[Anum_pg_dist_rebalance_jobs_status - 1] = false; - replace[Anum_pg_dist_rebalance_jobs_status - 1] = true; + values[Anum_pg_dist_background_tasks_status - 1] = + ObjectIdGetDatum(CitusTaskStatusSnscheduledId()); + isnull[Anum_pg_dist_background_tasks_status - 1] = false; + replace[Anum_pg_dist_background_tasks_status - 1] = true; heapTuple = heap_modify_tuple(heapTuple, tupleDescriptor, values, isnull, replace); diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index 7129b10dc..6f90c1f4a 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -841,6 +841,17 @@ RegisterCitusConfigVariables(void) GUC_NO_SHOW_ALL, NULL, NULL, NULL); + /* TODO remove before merge */ + DefineCustomBoolVariable( + "citus.background_task_monitor_debug_delay", + NULL, + NULL, + &BackgroundTaskMonitorDebugDelay, + false, + PGC_SIGHUP, + GUC_UNIT_MS, + NULL, NULL, NULL); + DefineCustomBoolVariable( "citus.check_available_space_before_move", gettext_noop("When enabled will check free disk space before a shard move"), @@ -1893,17 +1904,6 @@ RegisterCitusConfigVariables(void) GUC_UNIT_MS, NULL, NULL, NULL); - /* TODO remove before merge */ - DefineCustomBoolVariable( - "citus.rebalance_job_debug_delay", - NULL, - NULL, - &RebalanceJobDebugDelay, - false, - PGC_SIGHUP, - GUC_UNIT_MS, - NULL, NULL, NULL); - DefineCustomIntVariable( "citus.recover_2pc_interval", gettext_noop("Sets the time to wait between recovering 2PCs."), diff --git a/src/backend/distributed/sql/citus--11.0-4--11.1-1.sql b/src/backend/distributed/sql/citus--11.0-4--11.1-1.sql index 47ada61ce..bd968e430 100644 --- a/src/backend/distributed/sql/citus--11.0-4--11.1-1.sql +++ b/src/backend/distributed/sql/citus--11.0-4--11.1-1.sql @@ -77,31 +77,31 @@ DROP FUNCTION pg_catalog.get_all_active_transactions(OUT datid oid, OUT process_ DROP FUNCTION pg_catalog.isolate_tenant_to_new_shard(table_name regclass, tenant_id "any", cascade_option text); #include "udfs/isolate_tenant_to_new_shard/11.1-1.sql" -CREATE TYPE citus.citus_job_status AS ENUM ('scheduled', 'running', 'done', 'error', 'unscheduled'); -ALTER TYPE citus.citus_job_status SET SCHEMA pg_catalog; +CREATE TYPE citus.citus_task_status AS ENUM ('scheduled', 'running', 'done', 'error', 'unscheduled'); +ALTER TYPE citus.citus_task_status SET SCHEMA pg_catalog; -CREATE TABLE citus.pg_dist_rebalance_jobs( - jobid bigserial NOT NULL, +CREATE TABLE citus.pg_dist_background_tasks( + task_id bigserial NOT NULL, pid integer, - status pg_catalog.citus_job_status default 'scheduled' NOT NULL, + status pg_catalog.citus_task_status default 'scheduled' NOT NULL, command text NOT NULL, retry_count integer, message text ); -ALTER TABLE citus.pg_dist_rebalance_jobs SET SCHEMA pg_catalog; -CREATE UNIQUE INDEX pg_dist_rebalance_jobs_jobid_index ON pg_catalog.pg_dist_rebalance_jobs using btree(jobid); -CREATE INDEX pg_dist_rebalance_jobs_status_jobid_index ON pg_catalog.pg_dist_rebalance_jobs using btree(status, jobid); +ALTER TABLE citus.pg_dist_background_tasks SET SCHEMA pg_catalog; +CREATE UNIQUE INDEX pg_dist_background_tasks_task_id_index ON pg_catalog.pg_dist_background_tasks using btree(task_id); +CREATE INDEX pg_dist_background_tasks_status_task_id_index ON pg_catalog.pg_dist_background_tasks using btree(status, task_id); -CREATE TABLE citus.pg_dist_rebalance_jobs_depend( - jobid bigint NOT NULL REFERENCES pg_catalog.pg_dist_rebalance_jobs(jobid) ON DELETE CASCADE, - depends_on bigint NOT NULL REFERENCES pg_catalog.pg_dist_rebalance_jobs(jobid) ON DELETE CASCADE, +CREATE TABLE citus.pg_dist_background_tasks_depend( + task_id bigint NOT NULL REFERENCES pg_catalog.pg_dist_background_tasks(task_id) ON DELETE CASCADE, + depends_on bigint NOT NULL REFERENCES pg_catalog.pg_dist_background_tasks(task_id) ON DELETE CASCADE, - UNIQUE(jobid, depends_on) + UNIQUE(task_id, depends_on) ); -ALTER TABLE citus.pg_dist_rebalance_jobs_depend SET SCHEMA pg_catalog; -CREATE INDEX pg_dist_rebalance_jobs_depend_jobid ON pg_catalog.pg_dist_rebalance_jobs_depend USING btree(jobid); -CREATE INDEX pg_dist_rebalance_jobs_depend_depends_on ON pg_catalog.pg_dist_rebalance_jobs_depend USING btree(depends_on); +ALTER TABLE citus.pg_dist_background_tasks_depend SET SCHEMA pg_catalog; +CREATE INDEX pg_dist_background_tasks_depend_task_id ON pg_catalog.pg_dist_background_tasks_depend USING btree(task_id); +CREATE INDEX pg_dist_background_tasks_depend_depends_on ON pg_catalog.pg_dist_background_tasks_depend USING btree(depends_on); #include "udfs/citus_wait_for_rebalance_job/11.1-1.sql" diff --git a/src/backend/distributed/utils/background_jobs.c b/src/backend/distributed/utils/background_jobs.c index e37713c87..de788460b 100644 --- a/src/backend/distributed/utils/background_jobs.c +++ b/src/backend/distributed/utils/background_jobs.c @@ -29,7 +29,7 @@ #include "distributed/metadata_utility.h" #include "distributed/shard_cleaner.h" -bool RebalanceJobDebugDelay = false; +bool BackgroundTaskMonitorDebugDelay = false; /* Table-of-contents constants for our dynamic shared memory segment. */ #define CITUS_BACKGROUND_JOB_MAGIC 0x51028081 @@ -45,7 +45,7 @@ static BackgroundWorkerHandle * StartCitusBackgroundJobExecuter(char *database, static void ExecuteSqlString(const char *sql); BackgroundWorkerHandle * -StartCitusBackgroundJobWorker(Oid database, Oid extensionOwner) +StartCitusBackgroundTaskMonitorWorker(Oid database, Oid extensionOwner) { BackgroundWorker worker = { 0 }; BackgroundWorkerHandle *handle = NULL; @@ -53,7 +53,7 @@ StartCitusBackgroundJobWorker(Oid database, Oid extensionOwner) /* Configure a worker. */ memset(&worker, 0, sizeof(worker)); SafeSnprintf(worker.bgw_name, BGW_MAXLEN, - "Citus Rebalance Jobs Worker: %u/%u", + "Citus Background Task Monitor: %u/%u", database, extensionOwner); worker.bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION; @@ -63,7 +63,7 @@ StartCitusBackgroundJobWorker(Oid database, Oid extensionOwner) worker.bgw_restart_time = BGW_NEVER_RESTART; strcpy_s(worker.bgw_library_name, sizeof(worker.bgw_library_name), "citus"); strcpy_s(worker.bgw_function_name, sizeof(worker.bgw_library_name), - "CitusBackgroundJobMain"); + "CitusBackgroundTaskMonitorMain"); worker.bgw_main_arg = ObjectIdGetDatum(MyDatabaseId); memcpy_s(worker.bgw_extra, sizeof(worker.bgw_extra), &extensionOwner, sizeof(Oid)); @@ -82,7 +82,7 @@ StartCitusBackgroundJobWorker(Oid database, Oid extensionOwner) void -CitusBackgroundJobMain(Datum arg) +CitusBackgroundTaskMonitorMain(Datum arg) { Oid databaseOid = DatumGetObjectId(arg); @@ -96,21 +96,24 @@ CitusBackgroundJobMain(Datum arg) /* connect to database, after that we can actually access catalogs */ BackgroundWorkerInitializeConnectionByOid(databaseOid, extensionOwner, 0); + /* TODO get lock to make sure there is only one worker running per databasse */ + /* make worker recognizable in pg_stat_activity */ - pgstat_report_appname("rebalance jobs worker"); + pgstat_report_appname("citus background task monitor"); - ereport(LOG, (errmsg("background jobs runner"))); + ereport(LOG, (errmsg("citus background task monitor"))); - if (RebalanceJobDebugDelay) + if (BackgroundTaskMonitorDebugDelay) { pg_usleep(30 * 1000 * 1000); } - MemoryContext perJobContext = AllocSetContextCreateExtended(CurrentMemoryContext, - "PerJobContext", - ALLOCSET_DEFAULT_MINSIZE, - ALLOCSET_DEFAULT_INITSIZE, - ALLOCSET_DEFAULT_MAXSIZE); + MemoryContext perTaskContext = + AllocSetContextCreateExtended(CurrentMemoryContext, + "PerTaskContext", + ALLOCSET_DEFAULT_MINSIZE, + ALLOCSET_DEFAULT_INITSIZE, + ALLOCSET_DEFAULT_MAXSIZE); /* * First we find all jobs that are running, we need to check if they are still running @@ -121,18 +124,18 @@ CitusBackgroundJobMain(Datum arg) PushActiveSnapshot(GetTransactionSnapshot()); /* TODO have an actual function to check if the worker is still running */ - ResetRunningJobs(); + ResetRunningBackgroundTasks(); PopActiveSnapshot(); CommitTransactionCommand(); } - MemoryContext oldContextPerJob = MemoryContextSwitchTo(perJobContext); + MemoryContext oldContextPerJob = MemoryContextSwitchTo(perTaskContext); bool hasJobs = true; while (hasJobs) { - MemoryContextReset(perJobContext); + MemoryContextReset(perTaskContext); CHECK_FOR_INTERRUPTS(); @@ -141,10 +144,10 @@ CitusBackgroundJobMain(Datum arg) PushActiveSnapshot(GetTransactionSnapshot()); /* - * We need to load the job into the perJobContext as we will switch contexts + * We need to load the job into the perTaskContext as we will switch contexts * later due to the committing and starting of new transactions */ - MemoryContext oldContext = MemoryContextSwitchTo(perJobContext); + MemoryContext oldContext = MemoryContextSwitchTo(perTaskContext); RebalanceJob *job = GetRunableRebalanceJob(); MemoryContextSwitchTo(oldContext); @@ -160,7 +163,7 @@ CitusBackgroundJobMain(Datum arg) PopActiveSnapshot(); CommitTransactionCommand(); - MemoryContextSwitchTo(perJobContext); + MemoryContextSwitchTo(perTaskContext); /* TODO find the actual database and username */ BackgroundWorkerHandle *handle = @@ -181,12 +184,12 @@ CitusBackgroundJobMain(Datum arg) PushActiveSnapshot(GetTransactionSnapshot()); /* Update job status to indicate it is running */ - UpdateJobStatus(job->jobid, &pid, REBALANCE_JOB_STATUS_RUNNING, NULL, NULL); + UpdateJobStatus(job->jobid, &pid, BACKGROUND_TASK_STATUS_RUNNING, NULL, NULL); PopActiveSnapshot(); CommitTransactionCommand(); - MemoryContextSwitchTo(perJobContext); + MemoryContextSwitchTo(perTaskContext); /* TODO keep polling the job */ while (GetBackgroundWorkerPid(handle, &pid) != BGWH_STOPPED) @@ -211,14 +214,14 @@ CitusBackgroundJobMain(Datum arg) PushActiveSnapshot(GetTransactionSnapshot()); /* TODO job can actually also have failed*/ - UpdateJobStatus(job->jobid, NULL, REBALANCE_JOB_STATUS_DONE, NULL, NULL); + UpdateJobStatus(job->jobid, NULL, BACKGROUND_TASK_STATUS_DONE, NULL, NULL); PopActiveSnapshot(); CommitTransactionCommand(); } MemoryContextSwitchTo(oldContextPerJob); - MemoryContextDelete(perJobContext); + MemoryContextDelete(perTaskContext); } diff --git a/src/backend/distributed/utils/maintenanced.c b/src/backend/distributed/utils/maintenanced.c index 2cd16b0c7..c673dd16f 100644 --- a/src/backend/distributed/utils/maintenanced.c +++ b/src/backend/distributed/utils/maintenanced.c @@ -730,8 +730,8 @@ CitusMaintenanceDaemonMain(Datum main_arg) "Starting background worker for execution."))); rebalanceBgwHandle = - StartCitusBackgroundJobWorker(MyDatabaseId, - myDbData->userOid); + StartCitusBackgroundTaskMonitorWorker(MyDatabaseId, + myDbData->userOid); if (!rebalanceBgwHandle || GetBackgroundWorkerPid(rebalanceBgwHandle, &rebalanceWorkerPid) == diff --git a/src/include/distributed/background_jobs.h b/src/include/distributed/background_jobs.h index 06178fdac..205ff5d30 100644 --- a/src/include/distributed/background_jobs.h +++ b/src/include/distributed/background_jobs.h @@ -5,11 +5,11 @@ #include "postmaster/bgworker.h" -extern BackgroundWorkerHandle * StartCitusBackgroundJobWorker(Oid database, Oid - extensionOwner); -extern void CitusBackgroundJobMain(Datum arg); +extern BackgroundWorkerHandle * StartCitusBackgroundTaskMonitorWorker(Oid database, Oid + extensionOwner); +extern void CitusBackgroundTaskMonitorMain(Datum arg); extern void CitusBackgroundJobExecuter(Datum main_arg); -extern bool RebalanceJobDebugDelay; +extern bool BackgroundTaskMonitorDebugDelay; #endif /*CITUS_BACKGROUND_JOBS_H */ diff --git a/src/include/distributed/metadata_cache.h b/src/include/distributed/metadata_cache.h index 25ae4ed5e..6c3019cd4 100644 --- a/src/include/distributed/metadata_cache.h +++ b/src/include/distributed/metadata_cache.h @@ -227,7 +227,7 @@ extern Oid DistPartitionRelationId(void); extern Oid DistShardRelationId(void); extern Oid DistPlacementRelationId(void); extern Oid DistNodeRelationId(void); -extern Oid DistRebalanceJobsRelationId(void); +extern Oid DistBackgroundTasksRelationId(void); extern Oid DistRebalanceStrategyRelationId(void); extern Oid DistLocalGroupIdRelationId(void); extern Oid DistObjectRelationId(void); @@ -237,11 +237,11 @@ extern Oid DistEnabledCustomAggregatesId(void); extern Oid DistNodeNodeIdIndexId(void); extern Oid DistPartitionLogicalRelidIndexId(void); extern Oid DistPartitionColocationidIndexId(void); -extern Oid DistRebalanceJobsJobsIdIndexId(void); -extern Oid DistRebalanceJobsStatusJobsIdIndexId(void); -extern Oid DistRebalanceJobsDependRelationId(void); -extern Oid DistRebalanceJobsDependJobIdIndexId(void); -extern Oid DistRebalanceJobsDependDependsOnIndexId(void); +extern Oid DistBackgroundTasksTaskIdIndexId(void); +extern Oid DistBackgroundTasksStatusTaskIdIndexId(void); +extern Oid DistBackgroundTaskssDependRelationId(void); +extern Oid DistBackgroundTasksDependTaskIdIndexId(void); +extern Oid DistBackgroundTasksDependDependsOnIndexId(void); extern Oid DistShardLogicalRelidIndexId(void); extern Oid DistShardShardidIndexId(void); extern Oid DistPlacementShardidIndexId(void); @@ -274,11 +274,11 @@ extern Oid SecondaryNodeRoleId(void); extern Oid CitusCopyFormatTypeId(void); extern Oid TextCopyFormatId(void); extern Oid BinaryCopyFormatId(void); -extern Oid JobStatusScheduledId(void); -extern Oid JobStatusRunningId(void); -extern Oid JobStatusDoneId(void); -extern Oid JobStatusErrorId(void); -extern Oid JobStatusUnscheduledId(void); +extern Oid CitusTaskStatusScheduledId(void); +extern Oid CitusTaskStatusRunningId(void); +extern Oid CitusTaskStatusDoneId(void); +extern Oid CitusTaskStatusErrorId(void); +extern Oid CitusTaskStatusSnscheduledId(void); /* user related functions */ extern Oid CitusExtensionOwner(void); diff --git a/src/include/distributed/metadata_utility.h b/src/include/distributed/metadata_utility.h index 88e5cec21..fb171419d 100644 --- a/src/include/distributed/metadata_utility.h +++ b/src/include/distributed/metadata_utility.h @@ -24,7 +24,7 @@ #include "distributed/citus_nodes.h" #include "distributed/connection_management.h" #include "distributed/errormessage.h" -#include "distributed/pg_dist_rebalance_jobs.h" +#include "distributed/pg_dist_background_tasks.h" #include "distributed/relay_utility.h" #include "distributed/worker_manager.h" #include "utils/acl.h" @@ -204,21 +204,21 @@ typedef enum SizeQueryType TABLE_SIZE /* pg_table_size() */ } SizeQueryType; -typedef enum RebalanceJobStatus +typedef enum BackgroundTaskStatus { - REBALANCE_JOB_STATUS_UNKNOWN, - REBALANCE_JOB_STATUS_SCHEDULED, - REBALANCE_JOB_STATUS_RUNNING, - REBALANCE_JOB_STATUS_DONE, - REBALANCE_JOB_STATUS_ERROR, - REBALANCE_JOB_STATUS_UNSCHEDULED -} RebalanceJobStatus; + BACKGROUND_TASK_STATUS_UNKNOWN, + BACKGROUND_TASK_STATUS_SCHEDULED, + BACKGROUND_TASK_STATUS_RUNNING, + BACKGROUND_TASK_STATUS_DONE, + BACKGROUND_TASK_STATUS_ERROR, + BACKGROUND_TASK_STATUS_UNSCHEDULED +} BackgroundTaskStatus; typedef struct RebalanceJob { int64 jobid; - RebalanceJobStatus status; + BackgroundTaskStatus status; char *command; } RebalanceJob; @@ -331,17 +331,17 @@ extern void EnsureSequenceTypeSupported(Oid seqOid, Oid attributeTypeId, Oid extern void AlterSequenceType(Oid seqOid, Oid typeOid); extern void EnsureRelationHasCompatibleSequenceTypes(Oid relationId); extern bool HasScheduledRebalanceJobs(void); -extern int64 GetNextRebalanceJobId(void); +extern int64 GetNextBackgroundTaskTaskId(void); extern RebalanceJob * ScheduleBackgrounRebalanceJob(char *command, int dependingJobCount, int64 dependingJobIds[]); extern bool JobHasUmnetDependencies(int64 jobid); extern RebalanceJob * GetRunableRebalanceJob(void); -extern void ResetRunningJobs(void); +extern void ResetRunningBackgroundTasks(void); extern RebalanceJob * GetScheduledRebalanceJobByJobID(int64 jobId); -extern void UpdateJobStatus(int64 jobid, pid_t *pid, RebalanceJobStatus status, +extern void UpdateJobStatus(int64 jobid, pid_t *pid, BackgroundTaskStatus status, int32 *retry_count, char *message); extern bool UpdateJobError(RebalanceJob *job, ErrorData *edata); extern void UnscheduleDependantJobs(int64 jobid); -extern bool IsRebalanceJobStatusTerminal(RebalanceJobStatus status); -extern Oid RebalanceJobStatusOid(RebalanceJobStatus status); +extern bool IsRebalanceJobStatusTerminal(BackgroundTaskStatus status); +extern Oid RebalanceJobStatusOid(BackgroundTaskStatus status); #endif /* METADATA_UTILITY_H */ diff --git a/src/include/distributed/pg_dist_background_tasks.h b/src/include/distributed/pg_dist_background_tasks.h new file mode 100644 index 000000000..2881bb38e --- /dev/null +++ b/src/include/distributed/pg_dist_background_tasks.h @@ -0,0 +1,20 @@ + +#ifndef CITUS_PG_DIST_BACKGROUND_TASKS_H +#define CITUS_PG_DIST_BACKGROUND_TASKS_H + +/* ---------------- + * compiler constants for pg_dist_background_tasks + * ---------------- + */ +#define Natts_pg_dist_background_tasks 6 +#define Anum_pg_dist_background_tasks_task_id 1 +#define Anum_pg_dist_background_tasks_pid 2 +#define Anum_pg_dist_background_tasks_status 3 +#define Anum_pg_dist_background_tasks_command 4 +#define Anum_pg_dist_background_tasks_retry_count 5 +#define Anum_pg_dist_background_tasks_message 6 + +#define PG_DIST_BACKGROUND_TASK_TASK_ID_SEQUENCE_NAME \ + "pg_catalog.pg_dist_background_tasks_task_id_seq" + +#endif /* CITUS_PG_DIST_BACKGROUND_TASKS_H */ diff --git a/src/include/distributed/pg_dist_rebalance_jobs.h b/src/include/distributed/pg_dist_rebalance_jobs.h deleted file mode 100644 index 41a1e8b36..000000000 --- a/src/include/distributed/pg_dist_rebalance_jobs.h +++ /dev/null @@ -1,19 +0,0 @@ - -#ifndef CITUS_PG_DIST_REBALANCE_JOBS_H -#define CITUS_PG_DIST_REBALANCE_JOBS_H - -/* ---------------- - * compiler constants for pg_dist_rebalance_jobs - * ---------------- - */ -#define Natts_pg_dist_rebalance_jobs 6 -#define Anum_pg_dist_rebalance_jobs_jobid 1 -#define Anum_pg_dist_rebalance_jobs_pid 2 -#define Anum_pg_dist_rebalance_jobs_status 3 -#define Anum_pg_dist_rebalance_jobs_command 4 -#define Anum_pg_dist_rebalance_jobs_retry_count 5 -#define Anum_pg_dist_rebalance_jobs_message 6 - -#define REBALANCE_JOB_JOBID_SEQUENCE_NAME "pg_catalog.pg_dist_rebalance_jobs_jobid_seq" - -#endif /* CITUS_PG_DIST_REBALANCE_JOBS_H */