big refactoring

background-job-details
Nils Dijk 2022-08-09 17:53:45 +02:00 committed by Jelte Fennema
parent 88ea7508f9
commit c09d0feb85
10 changed files with 326 additions and 273 deletions

View File

@ -144,7 +144,7 @@ typedef struct MetadataCacheData
Oid distBackgroundTasksRelationId;
Oid distBackgroundTasksTaskIdIndexId;
Oid distBackgroundTasksStatusTaskIdIndexId;
Oid distBackgroundTaskssDependRelationId;
Oid distBackgroundTasksDependRelationId;
Oid distBackgroundTasksDependTaskIdIndexId;
Oid distBackgroundTasksDependDependsOnIndexId;
Oid citusTaskStatusScheduledId;
@ -2406,12 +2406,12 @@ DistBackgroundTasksStatusTaskIdIndexId(void)
Oid
DistBackgroundTaskssDependRelationId(void)
DistBackgroundTasksDependRelationId(void)
{
CachedRelationLookup("pg_dist_background_tasks_depend",
&MetadataCache.distBackgroundTaskssDependRelationId);
&MetadataCache.distBackgroundTasksDependRelationId);
return MetadataCache.distBackgroundTaskssDependRelationId;
return MetadataCache.distBackgroundTasksDependRelationId;
}

View File

@ -46,7 +46,7 @@
#include "distributed/pg_dist_colocation.h"
#include "distributed/pg_dist_partition.h"
#include "distributed/pg_dist_background_tasks.h"
#include "distributed/pg_dist_rebalance_jobs_depend.h"
#include "distributed/pg_dist_backrgound_tasks_depend.h"
#include "distributed/pg_dist_shard.h"
#include "distributed/pg_dist_placement.h"
#include "distributed/reference_table_utils.h"
@ -2222,52 +2222,52 @@ IsForeignTable(Oid relationId)
bool
HasScheduledRebalanceJobs()
HasScheduledBackgroundTask()
{
const int scanKeyCount = 1;
ScanKeyData scanKey[1];
bool indexOK = true;
Relation pgDistRebalanceJobs = table_open(DistBackgroundTasksRelationId(),
AccessShareLock);
Relation pgDistBackgroundTasks =
table_open(DistBackgroundTasksRelationId(), AccessShareLock);
/* find any job in states listed here */
BackgroundTaskStatus jobs[] = {
BackgroundTaskStatus taskStatus[] = {
BACKGROUND_TASK_STATUS_RUNNING,
BACKGROUND_TASK_STATUS_SCHEDULED
};
bool hasScheduledJob = false;
for (int i = 0; !hasScheduledJob && i < sizeof(jobs) / sizeof(jobs[0]); i++)
bool hasScheduledTask = false;
for (int i = 0; !hasScheduledTask && i < lengthof(taskStatus); i++)
{
/* pg_dist_rebalance_jobs.status == jobs[i] */
const int scanKeyCount = 1;
ScanKeyData scanKey[1] = { 0 };
const bool indexOK = true;
/* pg_dist_background_tasks.status == taskStatus[i] */
ScanKeyInit(&scanKey[0], Anum_pg_dist_background_tasks_status,
BTEqualStrategyNumber, F_OIDEQ,
ObjectIdGetDatum(RebalanceJobStatusOid(jobs[i])));
ObjectIdGetDatum(CitusTaskStatusOid(taskStatus[i])));
SysScanDesc scanDescriptor = systable_beginscan(
pgDistRebalanceJobs,
DistBackgroundTasksStatusTaskIdIndexId(),
indexOK, NULL, scanKeyCount,
scanKey);
SysScanDesc scanDescriptor =
systable_beginscan(pgDistBackgroundTasks,
DistBackgroundTasksStatusTaskIdIndexId(),
indexOK, NULL, scanKeyCount,
scanKey);
HeapTuple jobTuple = systable_getnext(scanDescriptor);
if (HeapTupleIsValid(jobTuple))
HeapTuple taskTuple = systable_getnext(scanDescriptor);
if (HeapTupleIsValid(taskTuple))
{
hasScheduledJob = true;
hasScheduledTask = true;
}
systable_endscan(scanDescriptor);
}
table_close(pgDistRebalanceJobs, AccessShareLock);
table_close(pgDistBackgroundTasks, NoLock);
return hasScheduledJob;
return hasScheduledTask;
}
static BackgroundTaskStatus
RebalanceJobStatusByOid(Oid enumOid)
BackgroundTaskStatusByOid(Oid enumOid)
{
if (enumOid == CitusTaskStatusDoneId())
{
@ -2295,7 +2295,7 @@ RebalanceJobStatusByOid(Oid enumOid)
bool
IsRebalanceJobStatusTerminal(BackgroundTaskStatus status)
IsCitusTaskStatusTerminal(BackgroundTaskStatus status)
{
switch (status)
{
@ -2315,7 +2315,7 @@ IsRebalanceJobStatusTerminal(BackgroundTaskStatus status)
Oid
RebalanceJobStatusOid(BackgroundTaskStatus status)
CitusTaskStatusOid(BackgroundTaskStatus status)
{
switch (status)
{
@ -2365,43 +2365,43 @@ GetNextBackgroundTaskTaskId(void)
SetUserIdAndSecContext(CitusExtensionOwner(), SECURITY_LOCAL_USERID_CHANGE);
/* generate new and unique colocation id from sequence */
Datum jobIdOid = DirectFunctionCall1(nextval_oid, sequenceIdDatum);
Datum taskIdOid = DirectFunctionCall1(nextval_oid, sequenceIdDatum);
SetUserIdAndSecContext(savedUserId, savedSecurityContext);
uint64 jobId = DatumGetInt64(jobIdOid);
uint64 taskId = DatumGetInt64(taskIdOid);
return jobId;
return taskId;
}
RebalanceJob *
ScheduleBackgrounRebalanceJob(char *command, int dependingJobCount,
int64 dependingJobIds[])
BackgroundTask *
ScheduleBackgroundTask(char *command, int dependingTaskCount,
int64 dependingTaskIds[])
{
RebalanceJob *job = NULL;
BackgroundTask *task = NULL;
Relation pgDistRebalanceJobs = table_open(DistBackgroundTasksRelationId(),
RowExclusiveLock);
Relation pgDistRebalanceJobsDepend = NULL;
if (dependingJobCount > 0)
Relation pgDistBackgroundTasks =
table_open(DistBackgroundTasksRelationId(), RowExclusiveLock);
Relation pgDistbackgroundTasksDepend = NULL;
if (dependingTaskCount > 0)
{
pgDistRebalanceJobsDepend = table_open(DistBackgroundTaskssDependRelationId(),
RowExclusiveLock);
pgDistbackgroundTasksDepend =
table_open(DistBackgroundTasksDependRelationId(), RowExclusiveLock);
}
/* 1. TODO verify depending jobs exist and lock them */
/* 2. insert new job */
/* 2. insert new task */
{
Datum values[Natts_pg_dist_background_tasks] = { 0 };
bool nulls[Natts_pg_dist_background_tasks] = { 0 };
memset(nulls, true, sizeof(nulls));
int64 jobid = GetNextBackgroundTaskTaskId();
int64 taskId = GetNextBackgroundTaskTaskId();
values[Anum_pg_dist_background_tasks_task_id - 1] = Int64GetDatum(jobid);
values[Anum_pg_dist_background_tasks_task_id - 1] = Int64GetDatum(taskId);
nulls[Anum_pg_dist_background_tasks_task_id - 1] = false;
values[Anum_pg_dist_background_tasks_status - 1] = CitusTaskStatusScheduledId();
@ -2410,48 +2410,47 @@ ScheduleBackgrounRebalanceJob(char *command, int dependingJobCount,
values[Anum_pg_dist_background_tasks_command - 1] = CStringGetTextDatum(command);
nulls[Anum_pg_dist_background_tasks_command - 1] = false;
HeapTuple newTuple = heap_form_tuple(RelationGetDescr(pgDistRebalanceJobs),
HeapTuple newTuple = heap_form_tuple(RelationGetDescr(pgDistBackgroundTasks),
values, nulls);
CatalogTupleInsert(pgDistRebalanceJobs, newTuple);
CatalogTupleInsert(pgDistBackgroundTasks, newTuple);
job = palloc0(sizeof(RebalanceJob));
job->jobid = jobid;
job->status = BACKGROUND_TASK_STATUS_SCHEDULED;
job->command = pstrdup(command);
task = palloc0(sizeof(BackgroundTask));
task->taskid = taskId;
task->status = BACKGROUND_TASK_STATUS_SCHEDULED;
task->command = pstrdup(command);
}
/* 3. insert dependencies into catalog */
{
for (int i = 0; i < dependingJobCount; i++)
for (int i = 0; i < dependingTaskCount; i++)
{
Assert(pgDistRebalanceJobsDepend != NULL);
Assert(pgDistbackgroundTasksDepend != NULL);
Datum values[Natts_pg_dist_rebalance_jobs_depend] = { 0 };
bool nulls[Natts_pg_dist_rebalance_jobs_depend] = { 0 };
Datum values[Natts_pg_dist_background_tasks_depend] = { 0 };
bool nulls[Natts_pg_dist_background_tasks_depend] = { 0 };
memset(nulls, true, sizeof(nulls));
values[Anum_pg_dist_rebalance_jobs_depend_jobid - 1] =
Int64GetDatum(job->jobid);
nulls[Anum_pg_dist_rebalance_jobs_depend_jobid - 1] = false;
values[Anum_pg_dist_background_tasks_depend_task_id - 1] =
Int64GetDatum(task->taskid);
nulls[Anum_pg_dist_background_tasks_depend_task_id - 1] = false;
values[Anum_pg_dist_rebalance_jobs_depend_depends_on - 1] =
Int64GetDatum(dependingJobIds[i]);
nulls[Anum_pg_dist_rebalance_jobs_depend_depends_on - 1] = false;
values[Anum_pg_dist_background_tasks_depend_depends_on - 1] =
Int64GetDatum(dependingTaskIds[i]);
nulls[Anum_pg_dist_background_tasks_depend_depends_on - 1] = false;
HeapTuple newTuple = heap_form_tuple(
RelationGetDescr(pgDistRebalanceJobsDepend), values, nulls);
CatalogTupleInsert(pgDistRebalanceJobsDepend, newTuple);
RelationGetDescr(pgDistbackgroundTasksDepend), values, nulls);
CatalogTupleInsert(pgDistbackgroundTasksDepend, newTuple);
}
}
if (pgDistRebalanceJobsDepend)
if (pgDistbackgroundTasksDepend)
{
table_close(pgDistRebalanceJobsDepend, NoLock);
table_close(pgDistbackgroundTasksDepend, NoLock);
}
table_close(pgDistRebalanceJobs, NoLock);
table_close(pgDistBackgroundTasks, NoLock);
return job;
return task;
}
@ -2462,28 +2461,29 @@ ResetRunningBackgroundTasks(void)
ScanKeyData scanKey[1];
const bool indexOK = true;
Relation pgDistRebalanceJobs = table_open(DistBackgroundTasksRelationId(),
AccessShareLock);
Relation pgDistBackgroundTasks =
table_open(DistBackgroundTasksRelationId(), AccessShareLock);
/* pg_dist_rebalance_jobs.status == 'running' */
ScanKeyInit(&scanKey[0], Anum_pg_dist_background_tasks_status,
BTEqualStrategyNumber, F_OIDEQ, ObjectIdGetDatum(
CitusTaskStatusRunningId()));
BTEqualStrategyNumber, F_OIDEQ,
ObjectIdGetDatum(CitusTaskStatusRunningId()));
SysScanDesc scanDescriptor = systable_beginscan(pgDistRebalanceJobs,
DistBackgroundTasksStatusTaskIdIndexId(),
indexOK, NULL, scanKeyCount,
scanKey);
SysScanDesc scanDescriptor =
systable_beginscan(pgDistBackgroundTasks,
DistBackgroundTasksStatusTaskIdIndexId(),
indexOK, NULL, scanKeyCount,
scanKey);
HeapTuple jobTuple = NULL;
while (HeapTupleIsValid(jobTuple = systable_getnext(scanDescriptor)))
HeapTuple taskTuple = NULL;
while (HeapTupleIsValid(taskTuple = systable_getnext(scanDescriptor)))
{
Datum values[Natts_pg_dist_background_tasks] = { 0 };
bool isnull[Natts_pg_dist_background_tasks] = { 0 };
bool replace[Natts_pg_dist_background_tasks] = { 0 };
TupleDesc tupleDescriptor = RelationGetDescr(pgDistRebalanceJobs);
heap_deform_tuple(jobTuple, tupleDescriptor, values, isnull);
TupleDesc tupleDescriptor = RelationGetDescr(pgDistBackgroundTasks);
heap_deform_tuple(taskTuple, tupleDescriptor, values, isnull);
values[Anum_pg_dist_background_tasks_status - 1] =
ObjectIdGetDatum(CitusTaskStatusScheduledId());
@ -2494,48 +2494,113 @@ ResetRunningBackgroundTasks(void)
isnull[Anum_pg_dist_background_tasks_pid - 1] = true;
replace[Anum_pg_dist_background_tasks_pid - 1] = true;
jobTuple = heap_modify_tuple(jobTuple, tupleDescriptor, values, isnull, replace);
taskTuple = heap_modify_tuple(taskTuple, tupleDescriptor, values, isnull,
replace);
CatalogTupleUpdate(pgDistRebalanceJobs, &jobTuple->t_self, jobTuple);
CatalogTupleUpdate(pgDistBackgroundTasks, &taskTuple->t_self, taskTuple);
}
CommandCounterIncrement();
systable_endscan(scanDescriptor);
table_close(pgDistRebalanceJobs, AccessShareLock);
table_close(pgDistBackgroundTasks, AccessShareLock);
}
static void
DeepFreeBackgroundTask(BackgroundTask *task)
{
if (task->pid)
{
pfree(task->pid);
}
if (task->command)
{
pfree(task->command);
}
if (task->retry_count)
{
pfree(task->retry_count);
}
if (task->message)
{
pfree(task->message);
}
pfree(task);
}
static BackgroundTask *
DeformBackgroundTaskHeapTuple(TupleDesc tupleDescriptor, HeapTuple taskTuple)
{
Datum values[Natts_pg_dist_background_tasks] = { 0 };
bool nulls[Natts_pg_dist_background_tasks] = { 0 };
heap_deform_tuple(taskTuple, tupleDescriptor, values, nulls);
BackgroundTask *task = palloc0(sizeof(BackgroundTask));
task->taskid = DatumGetInt64(values[Anum_pg_dist_background_tasks_task_id - 1]);
if (!nulls[Anum_pg_dist_background_tasks_pid - 1])
{
task->pid = palloc0(sizeof(task->pid));
*(task->pid) = DatumGetInt32(values[Anum_pg_dist_background_tasks_pid - 1]);
}
task->status = BackgroundTaskStatusByOid(
DatumGetObjectId(values[Anum_pg_dist_background_tasks_status - 1]));
task->command = text_to_cstring(
DatumGetTextP(values[Anum_pg_dist_background_tasks_command - 1]));
if (!nulls[Anum_pg_dist_background_tasks_retry_count - 1])
{
task->retry_count = palloc0(sizeof(task->retry_count));
*(task->retry_count) = DatumGetInt32(
values[Anum_pg_dist_background_tasks_retry_count - 1]);
}
if (!nulls[Anum_pg_dist_background_tasks_message - 1])
{
task->message = pstrdup(DatumGetCString(
values[Anum_pg_dist_background_tasks_message - 1]));
}
return task;
}
bool
JobHasUmnetDependencies(int64 jobid)
BackgroundTaskHasUmnetDependencies(int64 taskId)
{
bool hasUnmetDependency = false;
Relation pgDistRebalanceJobsDepend = table_open(
DistBackgroundTaskssDependRelationId(),
AccessShareLock);
Relation pgDistBackgroundTasksDepend =
table_open(DistBackgroundTasksDependRelationId(), AccessShareLock);
const int scanKeyCount = 1;
ScanKeyData scanKey[1] = { 0 };
bool indexOK = true;
/* pg_catalog.pg_dist_rebalance_jobs_depend.jobid = $jobid */
ScanKeyInit(&scanKey[0], Anum_pg_dist_rebalance_jobs_depend_jobid,
BTEqualStrategyNumber, F_INT8EQ, jobid);
/* pg_catalog.pg_dist_background_tasks_depend.task_id = $taskId */
ScanKeyInit(&scanKey[0], Anum_pg_dist_background_tasks_depend_task_id,
BTEqualStrategyNumber, F_INT8EQ, taskId);
SysScanDesc scanDescriptor = systable_beginscan(pgDistRebalanceJobsDepend,
DistBackgroundTasksDependTaskIdIndexId(),
indexOK, NULL, scanKeyCount,
scanKey);
SysScanDesc scanDescriptor =
systable_beginscan(pgDistBackgroundTasksDepend,
DistBackgroundTasksDependTaskIdIndexId(),
indexOK, NULL, scanKeyCount,
scanKey);
HeapTuple dependTuple = NULL;
while (HeapTupleIsValid(dependTuple = systable_getnext(scanDescriptor)))
{
Form_pg_dist_rebalance_jobs_depend depends =
(Form_pg_dist_rebalance_jobs_depend) GETSTRUCT(dependTuple);
Form_pg_dist_background_tasks_depend depends =
(Form_pg_dist_background_tasks_depend) GETSTRUCT(dependTuple);
RebalanceJob *dependingJob = GetScheduledRebalanceJobByJobID(depends->depends_on);
BackgroundTask *dependingJob = GetBackgroundTaskByTaskId(depends->depends_on);
/*
* Only when the status of all depending jobs is done we clear this job and say
@ -2543,6 +2608,7 @@ JobHasUmnetDependencies(int64 jobid)
*/
if (dependingJob->status == BACKGROUND_TASK_STATUS_DONE)
{
DeepFreeBackgroundTask(dependingJob);
continue;
}
@ -2553,147 +2619,130 @@ JobHasUmnetDependencies(int64 jobid)
*/
Assert(dependingJob->status != BACKGROUND_TASK_STATUS_ERROR);
DeepFreeBackgroundTask(dependingJob);
hasUnmetDependency = true;
break;
}
systable_endscan(scanDescriptor);
table_close(pgDistRebalanceJobsDepend, AccessShareLock);
table_close(pgDistBackgroundTasksDepend, AccessShareLock);
return hasUnmetDependency;
}
RebalanceJob *
GetRunableRebalanceJob(void)
BackgroundTask *
GetRunnableBackgroundTask(void)
{
const int scanKeyCount = 1;
ScanKeyData scanKey[1];
bool indexOK = true;
Relation pgDistBackgroundTasks =
table_open(DistBackgroundTasksRelationId(), AccessShareLock);
Relation pgDistRebalanceJobs = table_open(DistBackgroundTasksRelationId(),
AccessShareLock);
BackgroundTaskStatus jobStatus[] = {
BackgroundTaskStatus taskStatus[] = {
BACKGROUND_TASK_STATUS_SCHEDULED
};
RebalanceJob *job = NULL;
for (int i = 0; !job && i < sizeof(jobStatus) / sizeof(jobStatus[0]); i++)
BackgroundTask *task = NULL;
for (int i = 0; !task && i < sizeof(taskStatus) / sizeof(taskStatus[0]); i++)
{
/* pg_dist_rebalance_jobs.status == jobStatus[i] */
const int scanKeyCount = 1;
ScanKeyData scanKey[1] = { 0 };
const bool indexOK = true;
/* pg_dist_background_tasks.status == taskStatus[i] */
ScanKeyInit(&scanKey[0], Anum_pg_dist_background_tasks_status,
BTEqualStrategyNumber, F_OIDEQ, ObjectIdGetDatum(
RebalanceJobStatusOid(jobStatus[i])));
CitusTaskStatusOid(taskStatus[i])));
SysScanDesc scanDescriptor = systable_beginscan(pgDistRebalanceJobs,
DistBackgroundTasksStatusTaskIdIndexId(),
indexOK, NULL, scanKeyCount,
scanKey);
SysScanDesc scanDescriptor =
systable_beginscan(pgDistBackgroundTasks,
DistBackgroundTasksStatusTaskIdIndexId(),
indexOK, NULL, scanKeyCount,
scanKey);
HeapTuple jobTuple = NULL;
while (HeapTupleIsValid(jobTuple = systable_getnext(scanDescriptor)))
HeapTuple taskTuple = NULL;
TupleDesc tupleDescriptor = RelationGetDescr(pgDistBackgroundTasks);
while (HeapTupleIsValid(taskTuple = systable_getnext(scanDescriptor)))
{
Datum datumArray[Natts_pg_dist_background_tasks];
bool isNullArray[Natts_pg_dist_background_tasks];
TupleDesc tupleDescriptor = RelationGetDescr(pgDistRebalanceJobs);
heap_deform_tuple(jobTuple, tupleDescriptor, datumArray, isNullArray);
int64 jobid = DatumGetInt64(
datumArray[Anum_pg_dist_background_tasks_task_id - 1]);
if (JobHasUmnetDependencies(jobid))
task = DeformBackgroundTaskHeapTuple(tupleDescriptor, taskTuple);
if (!BackgroundTaskHasUmnetDependencies(task->taskid))
{
continue;
/* found task, close table and return */
break;
}
job = palloc0(sizeof(RebalanceJob));
job->jobid = jobid;
job->status = RebalanceJobStatusByOid(
DatumGetObjectId(datumArray[Anum_pg_dist_background_tasks_status - 1]));
job->command = text_to_cstring(
DatumGetTextP(datumArray[Anum_pg_dist_background_tasks_command - 1]));
break;
DeepFreeBackgroundTask(task);
task = NULL;
}
systable_endscan(scanDescriptor);
}
table_close(pgDistRebalanceJobs, AccessShareLock);
table_close(pgDistBackgroundTasks, NoLock);
return job;
return task;
}
RebalanceJob *
GetScheduledRebalanceJobByJobID(int64 jobId)
BackgroundTask *
GetBackgroundTaskByTaskId(int64 taskId)
{
const int scanKeyCount = 1;
ScanKeyData scanKey[1];
bool indexOK = true;
Relation pgDistRebalanceJobs = table_open(DistBackgroundTasksRelationId(),
AccessShareLock);
Relation pgDistBackgroundTasks =
table_open(DistBackgroundTasksRelationId(), AccessShareLock);
/* pg_dist_rebalance_jobs.jobid == $jobId */
/* pg_dist_background_tasks.task_id == $taskId */
ScanKeyInit(&scanKey[0], Anum_pg_dist_background_tasks_task_id,
BTEqualStrategyNumber, F_INT8EQ, Int64GetDatum(jobId));
BTEqualStrategyNumber, F_INT8EQ, Int64GetDatum(taskId));
SysScanDesc scanDescriptor = systable_beginscan(pgDistRebalanceJobs,
DistBackgroundTasksStatusTaskIdIndexId(),
indexOK, NULL, scanKeyCount, scanKey);
SysScanDesc scanDescriptor =
systable_beginscan(pgDistBackgroundTasks,
DistBackgroundTasksStatusTaskIdIndexId(),
indexOK, NULL, scanKeyCount, scanKey);
HeapTuple jobTuple = systable_getnext(scanDescriptor);
RebalanceJob *job = NULL;
if (HeapTupleIsValid(jobTuple))
HeapTuple taskTuple = systable_getnext(scanDescriptor);
BackgroundTask *task = NULL;
if (HeapTupleIsValid(taskTuple))
{
Datum datumArray[Natts_pg_dist_background_tasks];
bool isNullArray[Natts_pg_dist_background_tasks];
TupleDesc tupleDescriptor = RelationGetDescr(pgDistRebalanceJobs);
heap_deform_tuple(jobTuple, tupleDescriptor, datumArray, isNullArray);
job = palloc0(sizeof(RebalanceJob));
job->jobid = DatumGetInt64(datumArray[Anum_pg_dist_background_tasks_task_id - 1]);
job->status = RebalanceJobStatusByOid(
DatumGetObjectId(datumArray[Anum_pg_dist_background_tasks_status - 1]));
job->command = text_to_cstring(
DatumGetTextP(datumArray[Anum_pg_dist_background_tasks_command - 1]));
TupleDesc tupleDescriptor = RelationGetDescr(pgDistBackgroundTasks);
task = DeformBackgroundTaskHeapTuple(tupleDescriptor, taskTuple);
}
systable_endscan(scanDescriptor);
table_close(pgDistRebalanceJobs, AccessShareLock);
table_close(pgDistBackgroundTasks, AccessShareLock);
return job;
return task;
}
void
UpdateJobStatus(int64 jobid, pid_t *pid, BackgroundTaskStatus status, int32 *retry_count,
char *message)
UpdateJobStatus(int64 taskId, const pid_t *pid, BackgroundTaskStatus status,
const int32 *retry_count, char *message)
{
Relation pgDistRebalanceJobs = table_open(DistBackgroundTasksRelationId(),
RowExclusiveLock);
TupleDesc tupleDescriptor = RelationGetDescr(pgDistRebalanceJobs);
Relation pgDistBackgroundTasks =
table_open(DistBackgroundTasksRelationId(), RowExclusiveLock);
TupleDesc tupleDescriptor = RelationGetDescr(pgDistBackgroundTasks);
ScanKeyData scanKey[1];
int scanKeyCount = 1;
/* WHERE jobid = job->jobid */
ScanKeyInit(&scanKey[0], Anum_pg_dist_background_tasks_task_id,
BTEqualStrategyNumber, F_INT8EQ, Int64GetDatum(jobid));
ScanKeyData scanKey[1] = { 0 };
const bool indexOK = true;
SysScanDesc scanDescriptor = systable_beginscan(pgDistRebalanceJobs,
DistBackgroundTasksTaskIdIndexId(),
indexOK,
NULL, scanKeyCount, scanKey);
/* WHERE task_id = $taskId */
ScanKeyInit(&scanKey[0], Anum_pg_dist_background_tasks_task_id,
BTEqualStrategyNumber, F_INT8EQ, Int64GetDatum(taskId));
SysScanDesc scanDescriptor =
systable_beginscan(pgDistBackgroundTasks,
DistBackgroundTasksTaskIdIndexId(),
indexOK, NULL, scanKeyCount, scanKey);
HeapTuple heapTuple = systable_getnext(scanDescriptor);
if (!HeapTupleIsValid(heapTuple))
{
ereport(ERROR, (errmsg("could not find rebalance job entry for jobid: "
UINT64_FORMAT, jobid)));
ereport(ERROR, (errmsg("could not find background task entry for task_id: "
UINT64_FORMAT, taskId)));
}
Datum values[Natts_pg_dist_background_tasks] = { 0 };
@ -2703,8 +2752,8 @@ UpdateJobStatus(int64 jobid, pid_t *pid, BackgroundTaskStatus status, int32 *ret
heap_deform_tuple(heapTuple, tupleDescriptor, values, isnull);
#define UPDATE_FIELD(field, newNull, newValue) \
replace[(field - 1)] = ((newNull != isnull[(field - 1)]) || (values[(field - 1)] != \
newValue)); \
replace[(field - 1)] = (((newNull) != isnull[(field - 1)]) \
|| (values[(field - 1)] != (newValue))); \
isnull[(field - 1)] = (newNull); \
values[(field - 1)] = (newValue);
@ -2717,7 +2766,7 @@ UpdateJobStatus(int64 jobid, pid_t *pid, BackgroundTaskStatus status, int32 *ret
UPDATE_FIELD(Anum_pg_dist_background_tasks_pid, true, InvalidOid);
}
Oid statusOid = ObjectIdGetDatum(RebalanceJobStatusOid(status));
Oid statusOid = ObjectIdGetDatum(CitusTaskStatusOid(status));
UPDATE_FIELD(Anum_pg_dist_background_tasks_status, false, statusOid);
if (retry_count)
@ -2744,81 +2793,82 @@ UpdateJobStatus(int64 jobid, pid_t *pid, BackgroundTaskStatus status, int32 *ret
heapTuple = heap_modify_tuple(heapTuple, tupleDescriptor, values, isnull, replace);
CatalogTupleUpdate(pgDistRebalanceJobs, &heapTuple->t_self, heapTuple);
CatalogTupleUpdate(pgDistBackgroundTasks, &heapTuple->t_self, heapTuple);
CommandCounterIncrement();
systable_endscan(scanDescriptor);
table_close(pgDistRebalanceJobs, NoLock);
table_close(pgDistBackgroundTasks, NoLock);
}
static List *
GetDependantJobs(int64 jobid)
GetDependantTasks(int64 taskId)
{
Relation pgDistRebalanceJobsDepends = table_open(
DistBackgroundTaskssDependRelationId(),
RowExclusiveLock);
const bool indexOK = true;
ScanKeyData scanKey[1];
Relation pgDistBackgroundTasksDepends =
table_open(DistBackgroundTasksDependRelationId(), RowExclusiveLock);
ScanKeyData scanKey[1] = { 0 };
int scanKeyCount = 1;
const bool indexOK = true;
/* pg_dist_rebalance_jobs_depend.depends_on = $jobid */
ScanKeyInit(&scanKey[0], Anum_pg_dist_rebalance_jobs_depend_depends_on,
BTEqualStrategyNumber, F_INT8EQ, Int64GetDatum(jobid));
/* pg_dist_background_tasks_depend.depends_on = $taskId */
ScanKeyInit(&scanKey[0], Anum_pg_dist_background_tasks_depend_depends_on,
BTEqualStrategyNumber, F_INT8EQ, Int64GetDatum(taskId));
SysScanDesc scanDescriptor = systable_beginscan(pgDistRebalanceJobsDepends,
DistBackgroundTasksDependDependsOnIndexId(),
indexOK,
NULL, scanKeyCount, scanKey);
SysScanDesc scanDescriptor =
systable_beginscan(pgDistBackgroundTasksDepends,
DistBackgroundTasksDependDependsOnIndexId(),
indexOK,
NULL, scanKeyCount, scanKey);
List *dependantJobs = NIL;
List *dependantTasks = NIL;
HeapTuple heapTuple = NULL;
while (HeapTupleIsValid(heapTuple = systable_getnext(scanDescriptor)))
{
Form_pg_dist_rebalance_jobs_depend depend =
(Form_pg_dist_rebalance_jobs_depend) GETSTRUCT(heapTuple);
Form_pg_dist_background_tasks_depend depend =
(Form_pg_dist_background_tasks_depend) GETSTRUCT(heapTuple);
int64 *dJobid = palloc0(sizeof(int64));
*dJobid = depend->jobid;
int64 *dTaskId = palloc0(sizeof(int64));
*dTaskId = depend->task_id;
dependantJobs = lappend(dependantJobs, dJobid);
dependantTasks = lappend(dependantTasks, dTaskId);
}
systable_endscan(scanDescriptor);
table_close(pgDistRebalanceJobsDepends, NoLock);
table_close(pgDistBackgroundTasksDepends, NoLock);
return dependantJobs;
return dependantTasks;
}
void
UnscheduleDependantJobs(int64 jobid)
UnscheduleDependantTasks(int64 taskId)
{
Relation pgDistRebalanceJobs = table_open(DistBackgroundTasksRelationId(),
RowExclusiveLock);
TupleDesc tupleDescriptor = RelationGetDescr(pgDistRebalanceJobs);
Relation pgDistBackgroundTasks =
table_open(DistBackgroundTasksRelationId(), RowExclusiveLock);
TupleDesc tupleDescriptor = RelationGetDescr(pgDistBackgroundTasks);
List *dependantJobs = GetDependantJobs(jobid);
while (list_length(dependantJobs) > 0)
List *dependantTasks = GetDependantTasks(taskId);
while (list_length(dependantTasks) > 0)
{
/* pop last item from stack */
int64 cJobid = *(int64 *) llast(dependantJobs);
dependantJobs = list_delete_last(dependantJobs);
int64 cTaskId = *(int64 *) llast(dependantTasks);
dependantTasks = list_delete_last(dependantTasks);
/* push new dependant jobs on to stack */
dependantJobs = list_concat(dependantJobs, GetDependantJobs(cJobid));
/* push new dependant tasks on to stack */
dependantTasks = list_concat(dependantTasks, GetDependantTasks(cTaskId));
/* unschedule current job */
/* unschedule current task */
{
ScanKeyData scanKey[1] = { 0 };
int scanKeyCount = 1;
/* WHERE jobid = job->jobid */
/* WHERE taskId = job->taskId */
ScanKeyInit(&scanKey[0], Anum_pg_dist_background_tasks_task_id,
BTEqualStrategyNumber, F_INT8EQ, Int64GetDatum(cJobid));
BTEqualStrategyNumber, F_INT8EQ, Int64GetDatum(cTaskId));
const bool indexOK = true;
SysScanDesc scanDescriptor = systable_beginscan(pgDistRebalanceJobs,
SysScanDesc scanDescriptor = systable_beginscan(pgDistBackgroundTasks,
DistBackgroundTasksTaskIdIndexId(),
indexOK,
NULL, scanKeyCount, scanKey);
@ -2826,8 +2876,8 @@ UnscheduleDependantJobs(int64 jobid)
HeapTuple heapTuple = systable_getnext(scanDescriptor);
if (!HeapTupleIsValid(heapTuple))
{
ereport(ERROR, (errmsg("could not find rebalance job entry for jobid: "
UINT64_FORMAT, cJobid)));
ereport(ERROR, (errmsg("could not find background task entry for "
"task_id: " UINT64_FORMAT, cTaskId)));
}
Datum values[Natts_pg_dist_background_tasks] = { 0 };
@ -2841,11 +2891,11 @@ UnscheduleDependantJobs(int64 jobid)
heapTuple = heap_modify_tuple(heapTuple, tupleDescriptor, values, isnull,
replace);
CatalogTupleUpdate(pgDistRebalanceJobs, &heapTuple->t_self, heapTuple);
CatalogTupleUpdate(pgDistBackgroundTasks, &heapTuple->t_self, heapTuple);
systable_endscan(scanDescriptor);
}
}
table_close(pgDistRebalanceJobs, NoLock);
table_close(pgDistBackgroundTasks, NoLock);
}

View File

@ -1579,9 +1579,9 @@ RebalanceTableShards(RebalanceOptions *options, Oid shardReplicationModeOid)
move->targetNode->workerPort,
quote_literal_cstr(shardTranferModeLabel));
RebalanceJob *job = ScheduleBackgrounRebalanceJob(buf.data, first ? 0 : 1,
&prevJobId);
prevJobId = job->jobid;
BackgroundTask *job = ScheduleBackgroundTask(buf.data, first ? 0 : 1,
&prevJobId);
prevJobId = job->taskid;
first = false;
}

View File

@ -148,7 +148,7 @@ CitusBackgroundTaskMonitorMain(Datum arg)
* later due to the committing and starting of new transactions
*/
MemoryContext oldContext = MemoryContextSwitchTo(perTaskContext);
RebalanceJob *job = GetRunableRebalanceJob();
BackgroundTask *job = GetRunnableBackgroundTask();
MemoryContextSwitchTo(oldContext);
if (!job)
@ -178,13 +178,13 @@ CitusBackgroundTaskMonitorMain(Datum arg)
pid_t pid = 0;
GetBackgroundWorkerPid(handle, &pid);
ereport(LOG, (errmsg("found job with jobid: %ld", job->jobid)));
ereport(LOG, (errmsg("found job with jobid: %ld", job->taskid)));
StartTransactionCommand();
PushActiveSnapshot(GetTransactionSnapshot());
/* Update job status to indicate it is running */
UpdateJobStatus(job->jobid, &pid, BACKGROUND_TASK_STATUS_RUNNING, NULL, NULL);
UpdateJobStatus(job->taskid, &pid, BACKGROUND_TASK_STATUS_RUNNING, NULL, NULL);
PopActiveSnapshot();
CommitTransactionCommand();
@ -214,7 +214,7 @@ CitusBackgroundTaskMonitorMain(Datum arg)
PushActiveSnapshot(GetTransactionSnapshot());
/* TODO job can actually also have failed*/
UpdateJobStatus(job->jobid, NULL, BACKGROUND_TASK_STATUS_DONE, NULL, NULL);
UpdateJobStatus(job->taskid, NULL, BACKGROUND_TASK_STATUS_DONE, NULL, NULL);
PopActiveSnapshot();
CommitTransactionCommand();

View File

@ -718,7 +718,7 @@ CitusMaintenanceDaemonMain(Datum main_arg)
else if (CheckCitusVersion(DEBUG1) && CitusHasBeenLoaded())
{
/* perform catalog precheck */
shouldStartRebalanceJobsBackgroundWorker = HasScheduledRebalanceJobs();
shouldStartRebalanceJobsBackgroundWorker = HasScheduledBackgroundTask();
}
CommitTransactionCommand();

View File

@ -25,13 +25,13 @@ citus_wait_for_rebalance_job(PG_FUNCTION_ARGS)
{
CHECK_FOR_INTERRUPTS();
RebalanceJob *job = GetScheduledRebalanceJobByJobID(jobid);
BackgroundTask *job = GetBackgroundTaskByTaskId(jobid);
if (!job)
{
ereport(ERROR, (errmsg("unkown job with jobid: %ld", jobid)));
}
if (IsRebalanceJobStatusTerminal(job->status))
if (IsCitusTaskStatusTerminal(job->status))
{
PG_RETURN_VOID();
}

View File

@ -239,7 +239,7 @@ extern Oid DistPartitionLogicalRelidIndexId(void);
extern Oid DistPartitionColocationidIndexId(void);
extern Oid DistBackgroundTasksTaskIdIndexId(void);
extern Oid DistBackgroundTasksStatusTaskIdIndexId(void);
extern Oid DistBackgroundTaskssDependRelationId(void);
extern Oid DistBackgroundTasksDependRelationId(void);
extern Oid DistBackgroundTasksDependTaskIdIndexId(void);
extern Oid DistBackgroundTasksDependDependsOnIndexId(void);
extern Oid DistShardLogicalRelidIndexId(void);

View File

@ -215,12 +215,15 @@ typedef enum BackgroundTaskStatus
} BackgroundTaskStatus;
typedef struct RebalanceJob
typedef struct BackgroundTask
{
int64 jobid;
int64 taskid;
int32 *pid;
BackgroundTaskStatus status;
char *command;
} RebalanceJob;
int32 *retry_count;
char *message;
} BackgroundTask;
/* Size functions */
@ -330,18 +333,18 @@ extern void EnsureSequenceTypeSupported(Oid seqOid, Oid attributeTypeId, Oid
ownerRelationId);
extern void AlterSequenceType(Oid seqOid, Oid typeOid);
extern void EnsureRelationHasCompatibleSequenceTypes(Oid relationId);
extern bool HasScheduledRebalanceJobs(void);
extern bool HasScheduledBackgroundTask(void);
extern int64 GetNextBackgroundTaskTaskId(void);
extern RebalanceJob * ScheduleBackgrounRebalanceJob(char *command, int dependingJobCount,
int64 dependingJobIds[]);
extern bool JobHasUmnetDependencies(int64 jobid);
extern RebalanceJob * GetRunableRebalanceJob(void);
extern BackgroundTask * ScheduleBackgroundTask(char *command, int dependingTaskCount,
int64 dependingTaskIds[]);
extern bool BackgroundTaskHasUmnetDependencies(int64 taskId);
extern BackgroundTask * GetRunnableBackgroundTask(void);
extern void ResetRunningBackgroundTasks(void);
extern RebalanceJob * GetScheduledRebalanceJobByJobID(int64 jobId);
extern void UpdateJobStatus(int64 jobid, pid_t *pid, BackgroundTaskStatus status,
int32 *retry_count, char *message);
extern bool UpdateJobError(RebalanceJob *job, ErrorData *edata);
extern void UnscheduleDependantJobs(int64 jobid);
extern bool IsRebalanceJobStatusTerminal(BackgroundTaskStatus status);
extern Oid RebalanceJobStatusOid(BackgroundTaskStatus status);
extern BackgroundTask * GetBackgroundTaskByTaskId(int64 taskId);
extern void UpdateJobStatus(int64 taskId, const pid_t *pid, BackgroundTaskStatus status,
const int32 *retry_count, char *message);
extern bool UpdateJobError(BackgroundTask *job, ErrorData *edata);
extern void UnscheduleDependantTasks(int64 taskId);
extern bool IsCitusTaskStatusTerminal(BackgroundTaskStatus status);
extern Oid CitusTaskStatusOid(BackgroundTaskStatus status);
#endif /* METADATA_UTILITY_H */

View File

@ -0,0 +1,23 @@
#ifndef CITUS_PG_DIST_BACKGROUND_TASKS_DEPEND_H
#define CITUS_PG_DIST_BACKGROUND_TASKS_DEPEND_H
typedef struct FormData_pg_dist_background_tasks_depend
{
int64 task_id;
int64 depends_on;
#ifdef CATALOG_VARLEN /* variable-length fields start here */
#endif
} FormData_pg_dist_background_tasks_depend;
typedef FormData_pg_dist_background_tasks_depend *Form_pg_dist_background_tasks_depend;
/* ----------------
* compiler constants for pg_dist_background_tasks_depend
* ----------------
*/
#define Natts_pg_dist_background_tasks_depend 2
#define Anum_pg_dist_background_tasks_depend_task_id 1
#define Anum_pg_dist_background_tasks_depend_depends_on 2
#endif /* CITUS_PG_DIST_BACKGROUND_TASKS_DEPEND_H */

View File

@ -1,23 +0,0 @@
#ifndef CITUS_PG_DIST_REBALANCE_JOBS_DEPEND_H
#define CITUS_PG_DIST_REBALANCE_JOBS_DEPEND_H
typedef struct FormData_pg_dist_rebalance_jobs_depend
{
int64 jobid;
int64 depends_on;
#ifdef CATALOG_VARLEN /* variable-length fields start here */
#endif
} FormData_pg_dist_rebalance_jobs_depend;
typedef FormData_pg_dist_rebalance_jobs_depend *Form_pg_dist_rebalance_jobs_depend;
/* ----------------
* compiler constants for pg_dist_rebalance_jobs_depend
* ----------------
*/
#define Natts_pg_dist_rebalance_jobs_depend 2
#define Anum_pg_dist_rebalance_jobs_depend_jobid 1
#define Anum_pg_dist_rebalance_jobs_depend_depends_on 2
#endif /* CITUS_PG_DIST_REBALANCE_JOBS_DEPEND_H */