diff --git a/src/backend/distributed/metadata/metadata_cache.c b/src/backend/distributed/metadata/metadata_cache.c index 5fbb8ae88..7c1f44113 100644 --- a/src/backend/distributed/metadata/metadata_cache.c +++ b/src/backend/distributed/metadata/metadata_cache.c @@ -144,7 +144,7 @@ typedef struct MetadataCacheData Oid distBackgroundTasksRelationId; Oid distBackgroundTasksTaskIdIndexId; Oid distBackgroundTasksStatusTaskIdIndexId; - Oid distBackgroundTaskssDependRelationId; + Oid distBackgroundTasksDependRelationId; Oid distBackgroundTasksDependTaskIdIndexId; Oid distBackgroundTasksDependDependsOnIndexId; Oid citusTaskStatusScheduledId; @@ -2406,12 +2406,12 @@ DistBackgroundTasksStatusTaskIdIndexId(void) Oid -DistBackgroundTaskssDependRelationId(void) +DistBackgroundTasksDependRelationId(void) { CachedRelationLookup("pg_dist_background_tasks_depend", - &MetadataCache.distBackgroundTaskssDependRelationId); + &MetadataCache.distBackgroundTasksDependRelationId); - return MetadataCache.distBackgroundTaskssDependRelationId; + return MetadataCache.distBackgroundTasksDependRelationId; } diff --git a/src/backend/distributed/metadata/metadata_utility.c b/src/backend/distributed/metadata/metadata_utility.c index cc020705e..70dea09fc 100644 --- a/src/backend/distributed/metadata/metadata_utility.c +++ b/src/backend/distributed/metadata/metadata_utility.c @@ -46,7 +46,7 @@ #include "distributed/pg_dist_colocation.h" #include "distributed/pg_dist_partition.h" #include "distributed/pg_dist_background_tasks.h" -#include "distributed/pg_dist_rebalance_jobs_depend.h" +#include "distributed/pg_dist_backrgound_tasks_depend.h" #include "distributed/pg_dist_shard.h" #include "distributed/pg_dist_placement.h" #include "distributed/reference_table_utils.h" @@ -2222,52 +2222,52 @@ IsForeignTable(Oid relationId) bool -HasScheduledRebalanceJobs() +HasScheduledBackgroundTask() { - const int scanKeyCount = 1; - ScanKeyData scanKey[1]; - bool indexOK = true; - - Relation pgDistRebalanceJobs = table_open(DistBackgroundTasksRelationId(), - AccessShareLock); + Relation pgDistBackgroundTasks = + table_open(DistBackgroundTasksRelationId(), AccessShareLock); /* find any job in states listed here */ - BackgroundTaskStatus jobs[] = { + BackgroundTaskStatus taskStatus[] = { BACKGROUND_TASK_STATUS_RUNNING, BACKGROUND_TASK_STATUS_SCHEDULED }; - bool hasScheduledJob = false; - for (int i = 0; !hasScheduledJob && i < sizeof(jobs) / sizeof(jobs[0]); i++) + bool hasScheduledTask = false; + for (int i = 0; !hasScheduledTask && i < lengthof(taskStatus); i++) { - /* pg_dist_rebalance_jobs.status == jobs[i] */ + const int scanKeyCount = 1; + ScanKeyData scanKey[1] = { 0 }; + const bool indexOK = true; + + /* pg_dist_background_tasks.status == taskStatus[i] */ ScanKeyInit(&scanKey[0], Anum_pg_dist_background_tasks_status, BTEqualStrategyNumber, F_OIDEQ, - ObjectIdGetDatum(RebalanceJobStatusOid(jobs[i]))); + ObjectIdGetDatum(CitusTaskStatusOid(taskStatus[i]))); - SysScanDesc scanDescriptor = systable_beginscan( - pgDistRebalanceJobs, - DistBackgroundTasksStatusTaskIdIndexId(), - indexOK, NULL, scanKeyCount, - scanKey); + SysScanDesc scanDescriptor = + systable_beginscan(pgDistBackgroundTasks, + DistBackgroundTasksStatusTaskIdIndexId(), + indexOK, NULL, scanKeyCount, + scanKey); - HeapTuple jobTuple = systable_getnext(scanDescriptor); - if (HeapTupleIsValid(jobTuple)) + HeapTuple taskTuple = systable_getnext(scanDescriptor); + if (HeapTupleIsValid(taskTuple)) { - hasScheduledJob = true; + hasScheduledTask = true; } systable_endscan(scanDescriptor); } - table_close(pgDistRebalanceJobs, AccessShareLock); + table_close(pgDistBackgroundTasks, NoLock); - return hasScheduledJob; + return hasScheduledTask; } static BackgroundTaskStatus -RebalanceJobStatusByOid(Oid enumOid) +BackgroundTaskStatusByOid(Oid enumOid) { if (enumOid == CitusTaskStatusDoneId()) { @@ -2295,7 +2295,7 @@ RebalanceJobStatusByOid(Oid enumOid) bool -IsRebalanceJobStatusTerminal(BackgroundTaskStatus status) +IsCitusTaskStatusTerminal(BackgroundTaskStatus status) { switch (status) { @@ -2315,7 +2315,7 @@ IsRebalanceJobStatusTerminal(BackgroundTaskStatus status) Oid -RebalanceJobStatusOid(BackgroundTaskStatus status) +CitusTaskStatusOid(BackgroundTaskStatus status) { switch (status) { @@ -2365,43 +2365,43 @@ GetNextBackgroundTaskTaskId(void) SetUserIdAndSecContext(CitusExtensionOwner(), SECURITY_LOCAL_USERID_CHANGE); /* generate new and unique colocation id from sequence */ - Datum jobIdOid = DirectFunctionCall1(nextval_oid, sequenceIdDatum); + Datum taskIdOid = DirectFunctionCall1(nextval_oid, sequenceIdDatum); SetUserIdAndSecContext(savedUserId, savedSecurityContext); - uint64 jobId = DatumGetInt64(jobIdOid); + uint64 taskId = DatumGetInt64(taskIdOid); - return jobId; + return taskId; } -RebalanceJob * -ScheduleBackgrounRebalanceJob(char *command, int dependingJobCount, - int64 dependingJobIds[]) +BackgroundTask * +ScheduleBackgroundTask(char *command, int dependingTaskCount, + int64 dependingTaskIds[]) { - RebalanceJob *job = NULL; + BackgroundTask *task = NULL; - Relation pgDistRebalanceJobs = table_open(DistBackgroundTasksRelationId(), - RowExclusiveLock); - Relation pgDistRebalanceJobsDepend = NULL; - if (dependingJobCount > 0) + Relation pgDistBackgroundTasks = + table_open(DistBackgroundTasksRelationId(), RowExclusiveLock); + Relation pgDistbackgroundTasksDepend = NULL; + if (dependingTaskCount > 0) { - pgDistRebalanceJobsDepend = table_open(DistBackgroundTaskssDependRelationId(), - RowExclusiveLock); + pgDistbackgroundTasksDepend = + table_open(DistBackgroundTasksDependRelationId(), RowExclusiveLock); } /* 1. TODO verify depending jobs exist and lock them */ - /* 2. insert new job */ + /* 2. insert new task */ { Datum values[Natts_pg_dist_background_tasks] = { 0 }; bool nulls[Natts_pg_dist_background_tasks] = { 0 }; memset(nulls, true, sizeof(nulls)); - int64 jobid = GetNextBackgroundTaskTaskId(); + int64 taskId = GetNextBackgroundTaskTaskId(); - values[Anum_pg_dist_background_tasks_task_id - 1] = Int64GetDatum(jobid); + values[Anum_pg_dist_background_tasks_task_id - 1] = Int64GetDatum(taskId); nulls[Anum_pg_dist_background_tasks_task_id - 1] = false; values[Anum_pg_dist_background_tasks_status - 1] = CitusTaskStatusScheduledId(); @@ -2410,48 +2410,47 @@ ScheduleBackgrounRebalanceJob(char *command, int dependingJobCount, values[Anum_pg_dist_background_tasks_command - 1] = CStringGetTextDatum(command); nulls[Anum_pg_dist_background_tasks_command - 1] = false; - HeapTuple newTuple = heap_form_tuple(RelationGetDescr(pgDistRebalanceJobs), + HeapTuple newTuple = heap_form_tuple(RelationGetDescr(pgDistBackgroundTasks), values, nulls); - CatalogTupleInsert(pgDistRebalanceJobs, newTuple); + CatalogTupleInsert(pgDistBackgroundTasks, newTuple); - job = palloc0(sizeof(RebalanceJob)); - - job->jobid = jobid; - job->status = BACKGROUND_TASK_STATUS_SCHEDULED; - job->command = pstrdup(command); + task = palloc0(sizeof(BackgroundTask)); + task->taskid = taskId; + task->status = BACKGROUND_TASK_STATUS_SCHEDULED; + task->command = pstrdup(command); } /* 3. insert dependencies into catalog */ { - for (int i = 0; i < dependingJobCount; i++) + for (int i = 0; i < dependingTaskCount; i++) { - Assert(pgDistRebalanceJobsDepend != NULL); + Assert(pgDistbackgroundTasksDepend != NULL); - Datum values[Natts_pg_dist_rebalance_jobs_depend] = { 0 }; - bool nulls[Natts_pg_dist_rebalance_jobs_depend] = { 0 }; + Datum values[Natts_pg_dist_background_tasks_depend] = { 0 }; + bool nulls[Natts_pg_dist_background_tasks_depend] = { 0 }; memset(nulls, true, sizeof(nulls)); - values[Anum_pg_dist_rebalance_jobs_depend_jobid - 1] = - Int64GetDatum(job->jobid); - nulls[Anum_pg_dist_rebalance_jobs_depend_jobid - 1] = false; + values[Anum_pg_dist_background_tasks_depend_task_id - 1] = + Int64GetDatum(task->taskid); + nulls[Anum_pg_dist_background_tasks_depend_task_id - 1] = false; - values[Anum_pg_dist_rebalance_jobs_depend_depends_on - 1] = - Int64GetDatum(dependingJobIds[i]); - nulls[Anum_pg_dist_rebalance_jobs_depend_depends_on - 1] = false; + values[Anum_pg_dist_background_tasks_depend_depends_on - 1] = + Int64GetDatum(dependingTaskIds[i]); + nulls[Anum_pg_dist_background_tasks_depend_depends_on - 1] = false; HeapTuple newTuple = heap_form_tuple( - RelationGetDescr(pgDistRebalanceJobsDepend), values, nulls); - CatalogTupleInsert(pgDistRebalanceJobsDepend, newTuple); + RelationGetDescr(pgDistbackgroundTasksDepend), values, nulls); + CatalogTupleInsert(pgDistbackgroundTasksDepend, newTuple); } } - if (pgDistRebalanceJobsDepend) + if (pgDistbackgroundTasksDepend) { - table_close(pgDistRebalanceJobsDepend, NoLock); + table_close(pgDistbackgroundTasksDepend, NoLock); } - table_close(pgDistRebalanceJobs, NoLock); + table_close(pgDistBackgroundTasks, NoLock); - return job; + return task; } @@ -2462,28 +2461,29 @@ ResetRunningBackgroundTasks(void) ScanKeyData scanKey[1]; const bool indexOK = true; - Relation pgDistRebalanceJobs = table_open(DistBackgroundTasksRelationId(), - AccessShareLock); + Relation pgDistBackgroundTasks = + table_open(DistBackgroundTasksRelationId(), AccessShareLock); /* pg_dist_rebalance_jobs.status == 'running' */ ScanKeyInit(&scanKey[0], Anum_pg_dist_background_tasks_status, - BTEqualStrategyNumber, F_OIDEQ, ObjectIdGetDatum( - CitusTaskStatusRunningId())); + BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(CitusTaskStatusRunningId())); - SysScanDesc scanDescriptor = systable_beginscan(pgDistRebalanceJobs, - DistBackgroundTasksStatusTaskIdIndexId(), - indexOK, NULL, scanKeyCount, - scanKey); + SysScanDesc scanDescriptor = + systable_beginscan(pgDistBackgroundTasks, + DistBackgroundTasksStatusTaskIdIndexId(), + indexOK, NULL, scanKeyCount, + scanKey); - HeapTuple jobTuple = NULL; - while (HeapTupleIsValid(jobTuple = systable_getnext(scanDescriptor))) + HeapTuple taskTuple = NULL; + while (HeapTupleIsValid(taskTuple = systable_getnext(scanDescriptor))) { Datum values[Natts_pg_dist_background_tasks] = { 0 }; bool isnull[Natts_pg_dist_background_tasks] = { 0 }; bool replace[Natts_pg_dist_background_tasks] = { 0 }; - TupleDesc tupleDescriptor = RelationGetDescr(pgDistRebalanceJobs); - heap_deform_tuple(jobTuple, tupleDescriptor, values, isnull); + TupleDesc tupleDescriptor = RelationGetDescr(pgDistBackgroundTasks); + heap_deform_tuple(taskTuple, tupleDescriptor, values, isnull); values[Anum_pg_dist_background_tasks_status - 1] = ObjectIdGetDatum(CitusTaskStatusScheduledId()); @@ -2494,48 +2494,113 @@ ResetRunningBackgroundTasks(void) isnull[Anum_pg_dist_background_tasks_pid - 1] = true; replace[Anum_pg_dist_background_tasks_pid - 1] = true; - jobTuple = heap_modify_tuple(jobTuple, tupleDescriptor, values, isnull, replace); + taskTuple = heap_modify_tuple(taskTuple, tupleDescriptor, values, isnull, + replace); - CatalogTupleUpdate(pgDistRebalanceJobs, &jobTuple->t_self, jobTuple); + CatalogTupleUpdate(pgDistBackgroundTasks, &taskTuple->t_self, taskTuple); } CommandCounterIncrement(); systable_endscan(scanDescriptor); - table_close(pgDistRebalanceJobs, AccessShareLock); + table_close(pgDistBackgroundTasks, AccessShareLock); +} + + +static void +DeepFreeBackgroundTask(BackgroundTask *task) +{ + if (task->pid) + { + pfree(task->pid); + } + + if (task->command) + { + pfree(task->command); + } + + if (task->retry_count) + { + pfree(task->retry_count); + } + + if (task->message) + { + pfree(task->message); + } + + pfree(task); +} + + +static BackgroundTask * +DeformBackgroundTaskHeapTuple(TupleDesc tupleDescriptor, HeapTuple taskTuple) +{ + Datum values[Natts_pg_dist_background_tasks] = { 0 }; + bool nulls[Natts_pg_dist_background_tasks] = { 0 }; + heap_deform_tuple(taskTuple, tupleDescriptor, values, nulls); + + BackgroundTask *task = palloc0(sizeof(BackgroundTask)); + task->taskid = DatumGetInt64(values[Anum_pg_dist_background_tasks_task_id - 1]); + if (!nulls[Anum_pg_dist_background_tasks_pid - 1]) + { + task->pid = palloc0(sizeof(task->pid)); + *(task->pid) = DatumGetInt32(values[Anum_pg_dist_background_tasks_pid - 1]); + } + task->status = BackgroundTaskStatusByOid( + DatumGetObjectId(values[Anum_pg_dist_background_tasks_status - 1])); + + task->command = text_to_cstring( + DatumGetTextP(values[Anum_pg_dist_background_tasks_command - 1])); + + if (!nulls[Anum_pg_dist_background_tasks_retry_count - 1]) + { + task->retry_count = palloc0(sizeof(task->retry_count)); + *(task->retry_count) = DatumGetInt32( + values[Anum_pg_dist_background_tasks_retry_count - 1]); + } + + if (!nulls[Anum_pg_dist_background_tasks_message - 1]) + { + task->message = pstrdup(DatumGetCString( + values[Anum_pg_dist_background_tasks_message - 1])); + } + + return task; } bool -JobHasUmnetDependencies(int64 jobid) +BackgroundTaskHasUmnetDependencies(int64 taskId) { bool hasUnmetDependency = false; - Relation pgDistRebalanceJobsDepend = table_open( - DistBackgroundTaskssDependRelationId(), - AccessShareLock); + Relation pgDistBackgroundTasksDepend = + table_open(DistBackgroundTasksDependRelationId(), AccessShareLock); const int scanKeyCount = 1; ScanKeyData scanKey[1] = { 0 }; bool indexOK = true; - /* pg_catalog.pg_dist_rebalance_jobs_depend.jobid = $jobid */ - ScanKeyInit(&scanKey[0], Anum_pg_dist_rebalance_jobs_depend_jobid, - BTEqualStrategyNumber, F_INT8EQ, jobid); + /* pg_catalog.pg_dist_background_tasks_depend.task_id = $taskId */ + ScanKeyInit(&scanKey[0], Anum_pg_dist_background_tasks_depend_task_id, + BTEqualStrategyNumber, F_INT8EQ, taskId); - SysScanDesc scanDescriptor = systable_beginscan(pgDistRebalanceJobsDepend, - DistBackgroundTasksDependTaskIdIndexId(), - indexOK, NULL, scanKeyCount, - scanKey); + SysScanDesc scanDescriptor = + systable_beginscan(pgDistBackgroundTasksDepend, + DistBackgroundTasksDependTaskIdIndexId(), + indexOK, NULL, scanKeyCount, + scanKey); HeapTuple dependTuple = NULL; while (HeapTupleIsValid(dependTuple = systable_getnext(scanDescriptor))) { - Form_pg_dist_rebalance_jobs_depend depends = - (Form_pg_dist_rebalance_jobs_depend) GETSTRUCT(dependTuple); + Form_pg_dist_background_tasks_depend depends = + (Form_pg_dist_background_tasks_depend) GETSTRUCT(dependTuple); - RebalanceJob *dependingJob = GetScheduledRebalanceJobByJobID(depends->depends_on); + BackgroundTask *dependingJob = GetBackgroundTaskByTaskId(depends->depends_on); /* * Only when the status of all depending jobs is done we clear this job and say @@ -2543,6 +2608,7 @@ JobHasUmnetDependencies(int64 jobid) */ if (dependingJob->status == BACKGROUND_TASK_STATUS_DONE) { + DeepFreeBackgroundTask(dependingJob); continue; } @@ -2553,147 +2619,130 @@ JobHasUmnetDependencies(int64 jobid) */ Assert(dependingJob->status != BACKGROUND_TASK_STATUS_ERROR); + DeepFreeBackgroundTask(dependingJob); hasUnmetDependency = true; break; } systable_endscan(scanDescriptor); - table_close(pgDistRebalanceJobsDepend, AccessShareLock); + table_close(pgDistBackgroundTasksDepend, AccessShareLock); return hasUnmetDependency; } -RebalanceJob * -GetRunableRebalanceJob(void) +BackgroundTask * +GetRunnableBackgroundTask(void) { - const int scanKeyCount = 1; - ScanKeyData scanKey[1]; - bool indexOK = true; + Relation pgDistBackgroundTasks = + table_open(DistBackgroundTasksRelationId(), AccessShareLock); - Relation pgDistRebalanceJobs = table_open(DistBackgroundTasksRelationId(), - AccessShareLock); - - BackgroundTaskStatus jobStatus[] = { + BackgroundTaskStatus taskStatus[] = { BACKGROUND_TASK_STATUS_SCHEDULED }; - RebalanceJob *job = NULL; - for (int i = 0; !job && i < sizeof(jobStatus) / sizeof(jobStatus[0]); i++) + BackgroundTask *task = NULL; + for (int i = 0; !task && i < sizeof(taskStatus) / sizeof(taskStatus[0]); i++) { - /* pg_dist_rebalance_jobs.status == jobStatus[i] */ + const int scanKeyCount = 1; + ScanKeyData scanKey[1] = { 0 }; + const bool indexOK = true; + + /* pg_dist_background_tasks.status == taskStatus[i] */ ScanKeyInit(&scanKey[0], Anum_pg_dist_background_tasks_status, BTEqualStrategyNumber, F_OIDEQ, ObjectIdGetDatum( - RebalanceJobStatusOid(jobStatus[i]))); + CitusTaskStatusOid(taskStatus[i]))); - SysScanDesc scanDescriptor = systable_beginscan(pgDistRebalanceJobs, - DistBackgroundTasksStatusTaskIdIndexId(), - indexOK, NULL, scanKeyCount, - scanKey); + SysScanDesc scanDescriptor = + systable_beginscan(pgDistBackgroundTasks, + DistBackgroundTasksStatusTaskIdIndexId(), + indexOK, NULL, scanKeyCount, + scanKey); - HeapTuple jobTuple = NULL; - while (HeapTupleIsValid(jobTuple = systable_getnext(scanDescriptor))) + HeapTuple taskTuple = NULL; + TupleDesc tupleDescriptor = RelationGetDescr(pgDistBackgroundTasks); + while (HeapTupleIsValid(taskTuple = systable_getnext(scanDescriptor))) { - Datum datumArray[Natts_pg_dist_background_tasks]; - bool isNullArray[Natts_pg_dist_background_tasks]; - TupleDesc tupleDescriptor = RelationGetDescr(pgDistRebalanceJobs); - heap_deform_tuple(jobTuple, tupleDescriptor, datumArray, isNullArray); - - int64 jobid = DatumGetInt64( - datumArray[Anum_pg_dist_background_tasks_task_id - 1]); - if (JobHasUmnetDependencies(jobid)) + task = DeformBackgroundTaskHeapTuple(tupleDescriptor, taskTuple); + if (!BackgroundTaskHasUmnetDependencies(task->taskid)) { - continue; + /* found task, close table and return */ + break; } - job = palloc0(sizeof(RebalanceJob)); - job->jobid = jobid; - job->status = RebalanceJobStatusByOid( - DatumGetObjectId(datumArray[Anum_pg_dist_background_tasks_status - 1])); - - job->command = text_to_cstring( - DatumGetTextP(datumArray[Anum_pg_dist_background_tasks_command - 1])); - - break; + DeepFreeBackgroundTask(task); + task = NULL; } systable_endscan(scanDescriptor); } - table_close(pgDistRebalanceJobs, AccessShareLock); + table_close(pgDistBackgroundTasks, NoLock); - return job; + return task; } -RebalanceJob * -GetScheduledRebalanceJobByJobID(int64 jobId) +BackgroundTask * +GetBackgroundTaskByTaskId(int64 taskId) { const int scanKeyCount = 1; ScanKeyData scanKey[1]; bool indexOK = true; - Relation pgDistRebalanceJobs = table_open(DistBackgroundTasksRelationId(), - AccessShareLock); + Relation pgDistBackgroundTasks = + table_open(DistBackgroundTasksRelationId(), AccessShareLock); - /* pg_dist_rebalance_jobs.jobid == $jobId */ + /* pg_dist_background_tasks.task_id == $taskId */ ScanKeyInit(&scanKey[0], Anum_pg_dist_background_tasks_task_id, - BTEqualStrategyNumber, F_INT8EQ, Int64GetDatum(jobId)); + BTEqualStrategyNumber, F_INT8EQ, Int64GetDatum(taskId)); - SysScanDesc scanDescriptor = systable_beginscan(pgDistRebalanceJobs, - DistBackgroundTasksStatusTaskIdIndexId(), - indexOK, NULL, scanKeyCount, scanKey); + SysScanDesc scanDescriptor = + systable_beginscan(pgDistBackgroundTasks, + DistBackgroundTasksStatusTaskIdIndexId(), + indexOK, NULL, scanKeyCount, scanKey); - HeapTuple jobTuple = systable_getnext(scanDescriptor); - RebalanceJob *job = NULL; - if (HeapTupleIsValid(jobTuple)) + HeapTuple taskTuple = systable_getnext(scanDescriptor); + BackgroundTask *task = NULL; + if (HeapTupleIsValid(taskTuple)) { - Datum datumArray[Natts_pg_dist_background_tasks]; - bool isNullArray[Natts_pg_dist_background_tasks]; - TupleDesc tupleDescriptor = RelationGetDescr(pgDistRebalanceJobs); - heap_deform_tuple(jobTuple, tupleDescriptor, datumArray, isNullArray); - - job = palloc0(sizeof(RebalanceJob)); - job->jobid = DatumGetInt64(datumArray[Anum_pg_dist_background_tasks_task_id - 1]); - job->status = RebalanceJobStatusByOid( - DatumGetObjectId(datumArray[Anum_pg_dist_background_tasks_status - 1])); - job->command = text_to_cstring( - DatumGetTextP(datumArray[Anum_pg_dist_background_tasks_command - 1])); + TupleDesc tupleDescriptor = RelationGetDescr(pgDistBackgroundTasks); + task = DeformBackgroundTaskHeapTuple(tupleDescriptor, taskTuple); } systable_endscan(scanDescriptor); - table_close(pgDistRebalanceJobs, AccessShareLock); + table_close(pgDistBackgroundTasks, AccessShareLock); - return job; + return task; } void -UpdateJobStatus(int64 jobid, pid_t *pid, BackgroundTaskStatus status, int32 *retry_count, - char *message) +UpdateJobStatus(int64 taskId, const pid_t *pid, BackgroundTaskStatus status, + const int32 *retry_count, char *message) { - Relation pgDistRebalanceJobs = table_open(DistBackgroundTasksRelationId(), - RowExclusiveLock); - TupleDesc tupleDescriptor = RelationGetDescr(pgDistRebalanceJobs); + Relation pgDistBackgroundTasks = + table_open(DistBackgroundTasksRelationId(), RowExclusiveLock); + TupleDesc tupleDescriptor = RelationGetDescr(pgDistBackgroundTasks); - ScanKeyData scanKey[1]; int scanKeyCount = 1; - - /* WHERE jobid = job->jobid */ - ScanKeyInit(&scanKey[0], Anum_pg_dist_background_tasks_task_id, - BTEqualStrategyNumber, F_INT8EQ, Int64GetDatum(jobid)); - + ScanKeyData scanKey[1] = { 0 }; const bool indexOK = true; - SysScanDesc scanDescriptor = systable_beginscan(pgDistRebalanceJobs, - DistBackgroundTasksTaskIdIndexId(), - indexOK, - NULL, scanKeyCount, scanKey); + + /* WHERE task_id = $taskId */ + ScanKeyInit(&scanKey[0], Anum_pg_dist_background_tasks_task_id, + BTEqualStrategyNumber, F_INT8EQ, Int64GetDatum(taskId)); + + SysScanDesc scanDescriptor = + systable_beginscan(pgDistBackgroundTasks, + DistBackgroundTasksTaskIdIndexId(), + indexOK, NULL, scanKeyCount, scanKey); HeapTuple heapTuple = systable_getnext(scanDescriptor); if (!HeapTupleIsValid(heapTuple)) { - ereport(ERROR, (errmsg("could not find rebalance job entry for jobid: " - UINT64_FORMAT, jobid))); + ereport(ERROR, (errmsg("could not find background task entry for task_id: " + UINT64_FORMAT, taskId))); } Datum values[Natts_pg_dist_background_tasks] = { 0 }; @@ -2703,8 +2752,8 @@ UpdateJobStatus(int64 jobid, pid_t *pid, BackgroundTaskStatus status, int32 *ret heap_deform_tuple(heapTuple, tupleDescriptor, values, isnull); #define UPDATE_FIELD(field, newNull, newValue) \ - replace[(field - 1)] = ((newNull != isnull[(field - 1)]) || (values[(field - 1)] != \ - newValue)); \ + replace[(field - 1)] = (((newNull) != isnull[(field - 1)]) \ + || (values[(field - 1)] != (newValue))); \ isnull[(field - 1)] = (newNull); \ values[(field - 1)] = (newValue); @@ -2717,7 +2766,7 @@ UpdateJobStatus(int64 jobid, pid_t *pid, BackgroundTaskStatus status, int32 *ret UPDATE_FIELD(Anum_pg_dist_background_tasks_pid, true, InvalidOid); } - Oid statusOid = ObjectIdGetDatum(RebalanceJobStatusOid(status)); + Oid statusOid = ObjectIdGetDatum(CitusTaskStatusOid(status)); UPDATE_FIELD(Anum_pg_dist_background_tasks_status, false, statusOid); if (retry_count) @@ -2744,81 +2793,82 @@ UpdateJobStatus(int64 jobid, pid_t *pid, BackgroundTaskStatus status, int32 *ret heapTuple = heap_modify_tuple(heapTuple, tupleDescriptor, values, isnull, replace); - CatalogTupleUpdate(pgDistRebalanceJobs, &heapTuple->t_self, heapTuple); + CatalogTupleUpdate(pgDistBackgroundTasks, &heapTuple->t_self, heapTuple); CommandCounterIncrement(); systable_endscan(scanDescriptor); - table_close(pgDistRebalanceJobs, NoLock); + table_close(pgDistBackgroundTasks, NoLock); } static List * -GetDependantJobs(int64 jobid) +GetDependantTasks(int64 taskId) { - Relation pgDistRebalanceJobsDepends = table_open( - DistBackgroundTaskssDependRelationId(), - RowExclusiveLock); - const bool indexOK = true; - ScanKeyData scanKey[1]; + Relation pgDistBackgroundTasksDepends = + table_open(DistBackgroundTasksDependRelationId(), RowExclusiveLock); + + ScanKeyData scanKey[1] = { 0 }; int scanKeyCount = 1; + const bool indexOK = true; - /* pg_dist_rebalance_jobs_depend.depends_on = $jobid */ - ScanKeyInit(&scanKey[0], Anum_pg_dist_rebalance_jobs_depend_depends_on, - BTEqualStrategyNumber, F_INT8EQ, Int64GetDatum(jobid)); + /* pg_dist_background_tasks_depend.depends_on = $taskId */ + ScanKeyInit(&scanKey[0], Anum_pg_dist_background_tasks_depend_depends_on, + BTEqualStrategyNumber, F_INT8EQ, Int64GetDatum(taskId)); - SysScanDesc scanDescriptor = systable_beginscan(pgDistRebalanceJobsDepends, - DistBackgroundTasksDependDependsOnIndexId(), - indexOK, - NULL, scanKeyCount, scanKey); + SysScanDesc scanDescriptor = + systable_beginscan(pgDistBackgroundTasksDepends, + DistBackgroundTasksDependDependsOnIndexId(), + indexOK, + NULL, scanKeyCount, scanKey); - List *dependantJobs = NIL; + List *dependantTasks = NIL; HeapTuple heapTuple = NULL; while (HeapTupleIsValid(heapTuple = systable_getnext(scanDescriptor))) { - Form_pg_dist_rebalance_jobs_depend depend = - (Form_pg_dist_rebalance_jobs_depend) GETSTRUCT(heapTuple); + Form_pg_dist_background_tasks_depend depend = + (Form_pg_dist_background_tasks_depend) GETSTRUCT(heapTuple); - int64 *dJobid = palloc0(sizeof(int64)); - *dJobid = depend->jobid; + int64 *dTaskId = palloc0(sizeof(int64)); + *dTaskId = depend->task_id; - dependantJobs = lappend(dependantJobs, dJobid); + dependantTasks = lappend(dependantTasks, dTaskId); } systable_endscan(scanDescriptor); - table_close(pgDistRebalanceJobsDepends, NoLock); + table_close(pgDistBackgroundTasksDepends, NoLock); - return dependantJobs; + return dependantTasks; } void -UnscheduleDependantJobs(int64 jobid) +UnscheduleDependantTasks(int64 taskId) { - Relation pgDistRebalanceJobs = table_open(DistBackgroundTasksRelationId(), - RowExclusiveLock); - TupleDesc tupleDescriptor = RelationGetDescr(pgDistRebalanceJobs); + Relation pgDistBackgroundTasks = + table_open(DistBackgroundTasksRelationId(), RowExclusiveLock); + TupleDesc tupleDescriptor = RelationGetDescr(pgDistBackgroundTasks); - List *dependantJobs = GetDependantJobs(jobid); - while (list_length(dependantJobs) > 0) + List *dependantTasks = GetDependantTasks(taskId); + while (list_length(dependantTasks) > 0) { /* pop last item from stack */ - int64 cJobid = *(int64 *) llast(dependantJobs); - dependantJobs = list_delete_last(dependantJobs); + int64 cTaskId = *(int64 *) llast(dependantTasks); + dependantTasks = list_delete_last(dependantTasks); - /* push new dependant jobs on to stack */ - dependantJobs = list_concat(dependantJobs, GetDependantJobs(cJobid)); + /* push new dependant tasks on to stack */ + dependantTasks = list_concat(dependantTasks, GetDependantTasks(cTaskId)); - /* unschedule current job */ + /* unschedule current task */ { ScanKeyData scanKey[1] = { 0 }; int scanKeyCount = 1; - /* WHERE jobid = job->jobid */ + /* WHERE taskId = job->taskId */ ScanKeyInit(&scanKey[0], Anum_pg_dist_background_tasks_task_id, - BTEqualStrategyNumber, F_INT8EQ, Int64GetDatum(cJobid)); + BTEqualStrategyNumber, F_INT8EQ, Int64GetDatum(cTaskId)); const bool indexOK = true; - SysScanDesc scanDescriptor = systable_beginscan(pgDistRebalanceJobs, + SysScanDesc scanDescriptor = systable_beginscan(pgDistBackgroundTasks, DistBackgroundTasksTaskIdIndexId(), indexOK, NULL, scanKeyCount, scanKey); @@ -2826,8 +2876,8 @@ UnscheduleDependantJobs(int64 jobid) HeapTuple heapTuple = systable_getnext(scanDescriptor); if (!HeapTupleIsValid(heapTuple)) { - ereport(ERROR, (errmsg("could not find rebalance job entry for jobid: " - UINT64_FORMAT, cJobid))); + ereport(ERROR, (errmsg("could not find background task entry for " + "task_id: " UINT64_FORMAT, cTaskId))); } Datum values[Natts_pg_dist_background_tasks] = { 0 }; @@ -2841,11 +2891,11 @@ UnscheduleDependantJobs(int64 jobid) heapTuple = heap_modify_tuple(heapTuple, tupleDescriptor, values, isnull, replace); - CatalogTupleUpdate(pgDistRebalanceJobs, &heapTuple->t_self, heapTuple); + CatalogTupleUpdate(pgDistBackgroundTasks, &heapTuple->t_self, heapTuple); systable_endscan(scanDescriptor); } } - table_close(pgDistRebalanceJobs, NoLock); + table_close(pgDistBackgroundTasks, NoLock); } diff --git a/src/backend/distributed/operations/shard_rebalancer.c b/src/backend/distributed/operations/shard_rebalancer.c index 0d2465a35..65611a5b5 100644 --- a/src/backend/distributed/operations/shard_rebalancer.c +++ b/src/backend/distributed/operations/shard_rebalancer.c @@ -1579,9 +1579,9 @@ RebalanceTableShards(RebalanceOptions *options, Oid shardReplicationModeOid) move->targetNode->workerPort, quote_literal_cstr(shardTranferModeLabel)); - RebalanceJob *job = ScheduleBackgrounRebalanceJob(buf.data, first ? 0 : 1, - &prevJobId); - prevJobId = job->jobid; + BackgroundTask *job = ScheduleBackgroundTask(buf.data, first ? 0 : 1, + &prevJobId); + prevJobId = job->taskid; first = false; } diff --git a/src/backend/distributed/utils/background_jobs.c b/src/backend/distributed/utils/background_jobs.c index de788460b..4f7a76783 100644 --- a/src/backend/distributed/utils/background_jobs.c +++ b/src/backend/distributed/utils/background_jobs.c @@ -148,7 +148,7 @@ CitusBackgroundTaskMonitorMain(Datum arg) * later due to the committing and starting of new transactions */ MemoryContext oldContext = MemoryContextSwitchTo(perTaskContext); - RebalanceJob *job = GetRunableRebalanceJob(); + BackgroundTask *job = GetRunnableBackgroundTask(); MemoryContextSwitchTo(oldContext); if (!job) @@ -178,13 +178,13 @@ CitusBackgroundTaskMonitorMain(Datum arg) pid_t pid = 0; GetBackgroundWorkerPid(handle, &pid); - ereport(LOG, (errmsg("found job with jobid: %ld", job->jobid))); + ereport(LOG, (errmsg("found job with jobid: %ld", job->taskid))); StartTransactionCommand(); PushActiveSnapshot(GetTransactionSnapshot()); /* Update job status to indicate it is running */ - UpdateJobStatus(job->jobid, &pid, BACKGROUND_TASK_STATUS_RUNNING, NULL, NULL); + UpdateJobStatus(job->taskid, &pid, BACKGROUND_TASK_STATUS_RUNNING, NULL, NULL); PopActiveSnapshot(); CommitTransactionCommand(); @@ -214,7 +214,7 @@ CitusBackgroundTaskMonitorMain(Datum arg) PushActiveSnapshot(GetTransactionSnapshot()); /* TODO job can actually also have failed*/ - UpdateJobStatus(job->jobid, NULL, BACKGROUND_TASK_STATUS_DONE, NULL, NULL); + UpdateJobStatus(job->taskid, NULL, BACKGROUND_TASK_STATUS_DONE, NULL, NULL); PopActiveSnapshot(); CommitTransactionCommand(); diff --git a/src/backend/distributed/utils/maintenanced.c b/src/backend/distributed/utils/maintenanced.c index c673dd16f..109442061 100644 --- a/src/backend/distributed/utils/maintenanced.c +++ b/src/backend/distributed/utils/maintenanced.c @@ -718,7 +718,7 @@ CitusMaintenanceDaemonMain(Datum main_arg) else if (CheckCitusVersion(DEBUG1) && CitusHasBeenLoaded()) { /* perform catalog precheck */ - shouldStartRebalanceJobsBackgroundWorker = HasScheduledRebalanceJobs(); + shouldStartRebalanceJobsBackgroundWorker = HasScheduledBackgroundTask(); } CommitTransactionCommand(); diff --git a/src/backend/distributed/utils/rebalance_job.c b/src/backend/distributed/utils/rebalance_job.c index e257f4852..52b5a3cd8 100644 --- a/src/backend/distributed/utils/rebalance_job.c +++ b/src/backend/distributed/utils/rebalance_job.c @@ -25,13 +25,13 @@ citus_wait_for_rebalance_job(PG_FUNCTION_ARGS) { CHECK_FOR_INTERRUPTS(); - RebalanceJob *job = GetScheduledRebalanceJobByJobID(jobid); + BackgroundTask *job = GetBackgroundTaskByTaskId(jobid); if (!job) { ereport(ERROR, (errmsg("unkown job with jobid: %ld", jobid))); } - if (IsRebalanceJobStatusTerminal(job->status)) + if (IsCitusTaskStatusTerminal(job->status)) { PG_RETURN_VOID(); } diff --git a/src/include/distributed/metadata_cache.h b/src/include/distributed/metadata_cache.h index 6c3019cd4..60927b81e 100644 --- a/src/include/distributed/metadata_cache.h +++ b/src/include/distributed/metadata_cache.h @@ -239,7 +239,7 @@ extern Oid DistPartitionLogicalRelidIndexId(void); extern Oid DistPartitionColocationidIndexId(void); extern Oid DistBackgroundTasksTaskIdIndexId(void); extern Oid DistBackgroundTasksStatusTaskIdIndexId(void); -extern Oid DistBackgroundTaskssDependRelationId(void); +extern Oid DistBackgroundTasksDependRelationId(void); extern Oid DistBackgroundTasksDependTaskIdIndexId(void); extern Oid DistBackgroundTasksDependDependsOnIndexId(void); extern Oid DistShardLogicalRelidIndexId(void); diff --git a/src/include/distributed/metadata_utility.h b/src/include/distributed/metadata_utility.h index fb171419d..1265f5a84 100644 --- a/src/include/distributed/metadata_utility.h +++ b/src/include/distributed/metadata_utility.h @@ -215,12 +215,15 @@ typedef enum BackgroundTaskStatus } BackgroundTaskStatus; -typedef struct RebalanceJob +typedef struct BackgroundTask { - int64 jobid; + int64 taskid; + int32 *pid; BackgroundTaskStatus status; char *command; -} RebalanceJob; + int32 *retry_count; + char *message; +} BackgroundTask; /* Size functions */ @@ -330,18 +333,18 @@ extern void EnsureSequenceTypeSupported(Oid seqOid, Oid attributeTypeId, Oid ownerRelationId); extern void AlterSequenceType(Oid seqOid, Oid typeOid); extern void EnsureRelationHasCompatibleSequenceTypes(Oid relationId); -extern bool HasScheduledRebalanceJobs(void); +extern bool HasScheduledBackgroundTask(void); extern int64 GetNextBackgroundTaskTaskId(void); -extern RebalanceJob * ScheduleBackgrounRebalanceJob(char *command, int dependingJobCount, - int64 dependingJobIds[]); -extern bool JobHasUmnetDependencies(int64 jobid); -extern RebalanceJob * GetRunableRebalanceJob(void); +extern BackgroundTask * ScheduleBackgroundTask(char *command, int dependingTaskCount, + int64 dependingTaskIds[]); +extern bool BackgroundTaskHasUmnetDependencies(int64 taskId); +extern BackgroundTask * GetRunnableBackgroundTask(void); extern void ResetRunningBackgroundTasks(void); -extern RebalanceJob * GetScheduledRebalanceJobByJobID(int64 jobId); -extern void UpdateJobStatus(int64 jobid, pid_t *pid, BackgroundTaskStatus status, - int32 *retry_count, char *message); -extern bool UpdateJobError(RebalanceJob *job, ErrorData *edata); -extern void UnscheduleDependantJobs(int64 jobid); -extern bool IsRebalanceJobStatusTerminal(BackgroundTaskStatus status); -extern Oid RebalanceJobStatusOid(BackgroundTaskStatus status); +extern BackgroundTask * GetBackgroundTaskByTaskId(int64 taskId); +extern void UpdateJobStatus(int64 taskId, const pid_t *pid, BackgroundTaskStatus status, + const int32 *retry_count, char *message); +extern bool UpdateJobError(BackgroundTask *job, ErrorData *edata); +extern void UnscheduleDependantTasks(int64 taskId); +extern bool IsCitusTaskStatusTerminal(BackgroundTaskStatus status); +extern Oid CitusTaskStatusOid(BackgroundTaskStatus status); #endif /* METADATA_UTILITY_H */ diff --git a/src/include/distributed/pg_dist_backrgound_tasks_depend.h b/src/include/distributed/pg_dist_backrgound_tasks_depend.h new file mode 100644 index 000000000..8ed7925b9 --- /dev/null +++ b/src/include/distributed/pg_dist_backrgound_tasks_depend.h @@ -0,0 +1,23 @@ + +#ifndef CITUS_PG_DIST_BACKGROUND_TASKS_DEPEND_H +#define CITUS_PG_DIST_BACKGROUND_TASKS_DEPEND_H + +typedef struct FormData_pg_dist_background_tasks_depend +{ + int64 task_id; + int64 depends_on; +#ifdef CATALOG_VARLEN /* variable-length fields start here */ +#endif +} FormData_pg_dist_background_tasks_depend; +typedef FormData_pg_dist_background_tasks_depend *Form_pg_dist_background_tasks_depend; + + +/* ---------------- + * compiler constants for pg_dist_background_tasks_depend + * ---------------- + */ +#define Natts_pg_dist_background_tasks_depend 2 +#define Anum_pg_dist_background_tasks_depend_task_id 1 +#define Anum_pg_dist_background_tasks_depend_depends_on 2 + +#endif /* CITUS_PG_DIST_BACKGROUND_TASKS_DEPEND_H */ diff --git a/src/include/distributed/pg_dist_rebalance_jobs_depend.h b/src/include/distributed/pg_dist_rebalance_jobs_depend.h deleted file mode 100644 index bf3208d12..000000000 --- a/src/include/distributed/pg_dist_rebalance_jobs_depend.h +++ /dev/null @@ -1,23 +0,0 @@ - -#ifndef CITUS_PG_DIST_REBALANCE_JOBS_DEPEND_H -#define CITUS_PG_DIST_REBALANCE_JOBS_DEPEND_H - -typedef struct FormData_pg_dist_rebalance_jobs_depend -{ - int64 jobid; - int64 depends_on; -#ifdef CATALOG_VARLEN /* variable-length fields start here */ -#endif -} FormData_pg_dist_rebalance_jobs_depend; -typedef FormData_pg_dist_rebalance_jobs_depend *Form_pg_dist_rebalance_jobs_depend; - - -/* ---------------- - * compiler constants for pg_dist_rebalance_jobs_depend - * ---------------- - */ -#define Natts_pg_dist_rebalance_jobs_depend 2 -#define Anum_pg_dist_rebalance_jobs_depend_jobid 1 -#define Anum_pg_dist_rebalance_jobs_depend_depends_on 2 - -#endif /* CITUS_PG_DIST_REBALANCE_JOBS_DEPEND_H */