big naming refactor

background-job-details
Nils Dijk 2022-08-09 16:52:22 +02:00 committed by Jelte Fennema
parent 1be6d30588
commit 88ea7508f9
11 changed files with 280 additions and 273 deletions

View File

@ -141,17 +141,17 @@ typedef struct MetadataCacheData
bool extensionLoaded; bool extensionLoaded;
Oid distShardRelationId; Oid distShardRelationId;
Oid distPlacementRelationId; Oid distPlacementRelationId;
Oid distRebalanceJobsRelationId; Oid distBackgroundTasksRelationId;
Oid distRebalanceJobsJobsIndexId; Oid distBackgroundTasksTaskIdIndexId;
Oid distRebalanceJobsStatusJobsIndexId; Oid distBackgroundTasksStatusTaskIdIndexId;
Oid distRebalanceJobsDependRelationId; Oid distBackgroundTaskssDependRelationId;
Oid distRebalanceJobsDependsJobIdIndexId; Oid distBackgroundTasksDependTaskIdIndexId;
Oid distRebalanceJobsDependsDependsOnIndexId; Oid distBackgroundTasksDependDependsOnIndexId;
Oid jobStatusScheduledId; Oid citusTaskStatusScheduledId;
Oid jobStatusRunningId; Oid citusTaskStatusRunningId;
Oid jobStatusDoneId; Oid citusTaskStatusDoneId;
Oid jobStatusErrorId; Oid citusTaskStatusErrorId;
Oid jobStatusSnscheduledId; Oid citusTaskStatusSnscheduledId;
Oid distRebalanceStrategyRelationId; Oid distRebalanceStrategyRelationId;
Oid distNodeRelationId; Oid distNodeRelationId;
Oid distNodeNodeIdIndexId; Oid distNodeNodeIdIndexId;
@ -2376,62 +2376,62 @@ DistLocalGroupIdRelationId(void)
/*
 * Side-by-side diff view: on every line the left half is the code before the
 * rename and the right half is the code after it.
 *
 * DistRebalanceJobsRelationId (old) / DistBackgroundTasksRelationId (new)
 * hands the relation name "pg_dist_rebalance_jobs" (old) /
 * "pg_dist_background_tasks" (new) and the address of its MetadataCache slot
 * to CachedRelationLookup, then returns the value stored in that slot.
 * Presumably the helper fills the slot when it is not set yet; its definition
 * is not shown here, confirm there.
 */
Oid Oid
DistRebalanceJobsRelationId(void) DistBackgroundTasksRelationId(void)
{ {
CachedRelationLookup("pg_dist_rebalance_jobs", CachedRelationLookup("pg_dist_background_tasks",
&MetadataCache.distRebalanceJobsRelationId); &MetadataCache.distBackgroundTasksRelationId);
return MetadataCache.distRebalanceJobsRelationId; return MetadataCache.distBackgroundTasksRelationId;
} }
/*
 * Side-by-side diff view (left = before the rename, right = after).
 *
 * DistRebalanceJobsJobsIdIndexId (old) / DistBackgroundTasksTaskIdIndexId
 * (new) passes the index name "pg_dist_rebalance_jobs_jobid_index" (old) /
 * "pg_dist_background_tasks_task_id_index" (new) together with the address of
 * its MetadataCache slot to CachedRelationLookup and returns the slot value.
 */
Oid Oid
DistRebalanceJobsJobsIdIndexId(void) DistBackgroundTasksTaskIdIndexId(void)
{ {
CachedRelationLookup("pg_dist_rebalance_jobs_jobid_index", CachedRelationLookup("pg_dist_background_tasks_task_id_index",
&MetadataCache.distRebalanceJobsJobsIndexId); &MetadataCache.distBackgroundTasksTaskIdIndexId);
return MetadataCache.distRebalanceJobsJobsIndexId; return MetadataCache.distBackgroundTasksTaskIdIndexId;
} }
/*
 * Side-by-side diff view (left = before the rename, right = after).
 *
 * DistRebalanceJobsStatusJobsIdIndexId (old) /
 * DistBackgroundTasksStatusTaskIdIndexId (new) passes the index name
 * "pg_dist_rebalance_jobs_status_jobid_index" (old) /
 * "pg_dist_background_tasks_status_task_id_index" (new) together with the
 * address of its MetadataCache slot to CachedRelationLookup and returns the
 * slot value.
 */
Oid Oid
DistRebalanceJobsStatusJobsIdIndexId(void) DistBackgroundTasksStatusTaskIdIndexId(void)
{ {
CachedRelationLookup("pg_dist_rebalance_jobs_status_jobid_index", CachedRelationLookup("pg_dist_background_tasks_status_task_id_index",
&MetadataCache.distRebalanceJobsStatusJobsIndexId); &MetadataCache.distBackgroundTasksStatusTaskIdIndexId);
return MetadataCache.distRebalanceJobsStatusJobsIndexId; return MetadataCache.distBackgroundTasksStatusTaskIdIndexId;
} }
/*
 * Side-by-side diff view (left = before the rename, right = after).
 *
 * DistRebalanceJobsDependRelationId (old) /
 * DistBackgroundTaskssDependRelationId (new) passes the relation name
 * "pg_dist_rebalance_jobs_depend" (old) / "pg_dist_background_tasks_depend"
 * (new) together with the address of its MetadataCache slot to
 * CachedRelationLookup and returns the slot value.
 *
 * NOTE(review): the new function name and the new cache field both spell
 * "Taskss" with a doubled 's', unlike the neighbouring DistBackgroundTasks*
 * accessors; this looks like a typo introduced by the rename. Fixing it means
 * renaming the function, the field and every caller together.
 */
Oid Oid
DistRebalanceJobsDependRelationId(void) DistBackgroundTaskssDependRelationId(void)
{ {
CachedRelationLookup("pg_dist_rebalance_jobs_depend", CachedRelationLookup("pg_dist_background_tasks_depend",
&MetadataCache.distRebalanceJobsDependRelationId); &MetadataCache.distBackgroundTaskssDependRelationId);
return MetadataCache.distRebalanceJobsDependRelationId; return MetadataCache.distBackgroundTaskssDependRelationId;
} }
/*
 * Side-by-side diff view (left = before the rename, right = after).
 *
 * DistRebalanceJobsDependJobIdIndexId (old) /
 * DistBackgroundTasksDependTaskIdIndexId (new) passes the name
 * "pg_dist_rebalance_jobs_depend_jobid" (old) /
 * "pg_dist_background_tasks_depend_task_id" (new) together with the address
 * of its MetadataCache slot to CachedRelationLookup and returns the slot
 * value.
 */
Oid Oid
DistRebalanceJobsDependJobIdIndexId(void) DistBackgroundTasksDependTaskIdIndexId(void)
{ {
CachedRelationLookup("pg_dist_rebalance_jobs_depend_jobid", CachedRelationLookup("pg_dist_background_tasks_depend_task_id",
&MetadataCache.distRebalanceJobsDependsJobIdIndexId); &MetadataCache.distBackgroundTasksDependTaskIdIndexId);
return MetadataCache.distRebalanceJobsDependsJobIdIndexId; return MetadataCache.distBackgroundTasksDependTaskIdIndexId;
} }
/*
 * Side-by-side diff view (left = before the rename, right = after).
 *
 * DistRebalanceJobsDependDependsOnIndexId (old) /
 * DistBackgroundTasksDependDependsOnIndexId (new) passes the name
 * "pg_dist_rebalance_jobs_depend_depends_on" (old) /
 * "pg_dist_background_tasks_depend_depends_on" (new) together with the
 * address of its MetadataCache slot to CachedRelationLookup and returns the
 * slot value.
 */
Oid Oid
DistRebalanceJobsDependDependsOnIndexId(void) DistBackgroundTasksDependDependsOnIndexId(void)
{ {
CachedRelationLookup("pg_dist_rebalance_jobs_depend_depends_on", CachedRelationLookup("pg_dist_background_tasks_depend_depends_on",
&MetadataCache.distRebalanceJobsDependsDependsOnIndexId); &MetadataCache.distBackgroundTasksDependDependsOnIndexId);
return MetadataCache.distRebalanceJobsDependsDependsOnIndexId; return MetadataCache.distBackgroundTasksDependDependsOnIndexId;
} }
@ -3158,67 +3158,67 @@ SecondaryNodeRoleId(void)
/*
 * Side-by-side diff view (left = before the rename, right = after).
 *
 * JobStatusScheduledId (old) / CitusTaskStatusScheduledId (new) returns the
 * MetadataCache field holding the result of LookupStringEnumValueId for the
 * type name "citus_job_status" (old) / "citus_task_status" (new) and the
 * label "scheduled". The lookup only runs while the cached field still tests
 * false; afterwards the stored value is returned directly.
 */
Oid Oid
JobStatusScheduledId(void) CitusTaskStatusScheduledId(void)
{ {
if (!MetadataCache.jobStatusScheduledId) if (!MetadataCache.citusTaskStatusScheduledId)
{ {
MetadataCache.jobStatusScheduledId = MetadataCache.citusTaskStatusScheduledId =
LookupStringEnumValueId("citus_job_status", "scheduled"); LookupStringEnumValueId("citus_task_status", "scheduled");
} }
return MetadataCache.jobStatusScheduledId; return MetadataCache.citusTaskStatusScheduledId;
} }
/*
 * Side-by-side diff view (left = before the rename, right = after).
 *
 * JobStatusRunningId (old) / CitusTaskStatusRunningId (new) returns the
 * MetadataCache field holding the result of LookupStringEnumValueId for the
 * type name "citus_job_status" (old) / "citus_task_status" (new) and the
 * label "running". The lookup only runs while the cached field still tests
 * false; afterwards the stored value is returned directly.
 */
Oid Oid
JobStatusRunningId(void) CitusTaskStatusRunningId(void)
{ {
if (!MetadataCache.jobStatusRunningId) if (!MetadataCache.citusTaskStatusRunningId)
{ {
MetadataCache.jobStatusRunningId = MetadataCache.citusTaskStatusRunningId =
LookupStringEnumValueId("citus_job_status", "running"); LookupStringEnumValueId("citus_task_status", "running");
} }
return MetadataCache.jobStatusRunningId; return MetadataCache.citusTaskStatusRunningId;
} }
/*
 * Side-by-side diff view (left = before the rename, right = after).
 *
 * JobStatusDoneId (old) / CitusTaskStatusDoneId (new) returns the
 * MetadataCache field holding the result of LookupStringEnumValueId for the
 * type name "citus_job_status" (old) / "citus_task_status" (new) and the
 * label "done". The lookup only runs while the cached field still tests
 * false; afterwards the stored value is returned directly.
 */
Oid Oid
JobStatusDoneId(void) CitusTaskStatusDoneId(void)
{ {
if (!MetadataCache.jobStatusDoneId) if (!MetadataCache.citusTaskStatusDoneId)
{ {
MetadataCache.jobStatusDoneId = MetadataCache.citusTaskStatusDoneId =
LookupStringEnumValueId("citus_job_status", "done"); LookupStringEnumValueId("citus_task_status", "done");
} }
return MetadataCache.jobStatusDoneId; return MetadataCache.citusTaskStatusDoneId;
} }
/*
 * Side-by-side diff view (left = before the rename, right = after).
 *
 * JobStatusErrorId (old) / CitusTaskStatusErrorId (new) returns the
 * MetadataCache field holding the result of LookupStringEnumValueId for the
 * type name "citus_job_status" (old) / "citus_task_status" (new) and the
 * label "error". The lookup only runs while the cached field still tests
 * false; afterwards the stored value is returned directly.
 */
Oid Oid
JobStatusErrorId(void) CitusTaskStatusErrorId(void)
{ {
if (!MetadataCache.jobStatusErrorId) if (!MetadataCache.citusTaskStatusErrorId)
{ {
MetadataCache.jobStatusErrorId = MetadataCache.citusTaskStatusErrorId =
LookupStringEnumValueId("citus_job_status", "error"); LookupStringEnumValueId("citus_task_status", "error");
} }
return MetadataCache.jobStatusErrorId; return MetadataCache.citusTaskStatusErrorId;
} }
/*
 * Side-by-side diff view (left = before the rename, right = after).
 *
 * JobStatusUnscheduledId (old) / CitusTaskStatusSnscheduledId (new) returns
 * the MetadataCache field holding the result of LookupStringEnumValueId for
 * the type name "citus_job_status" (old) / "citus_task_status" (new) and the
 * label "unscheduled". The lookup only runs while the cached field still
 * tests false; afterwards the stored value is returned directly.
 *
 * NOTE(review): "Snscheduled" in the new function name and in both cache
 * field names looks like a typo for "Unscheduled" (the label looked up is
 * "unscheduled" and the old function was JobStatusUnscheduledId). The old
 * field name already carried the typo; the rename spread it to the function
 * name. Fixing it means renaming the function, the field and every caller
 * together.
 */
Oid Oid
JobStatusUnscheduledId(void) CitusTaskStatusSnscheduledId(void)
{ {
if (!MetadataCache.jobStatusSnscheduledId) if (!MetadataCache.citusTaskStatusSnscheduledId)
{ {
MetadataCache.jobStatusSnscheduledId = MetadataCache.citusTaskStatusSnscheduledId =
LookupStringEnumValueId("citus_job_status", "unscheduled"); LookupStringEnumValueId("citus_task_status", "unscheduled");
} }
return MetadataCache.jobStatusSnscheduledId; return MetadataCache.citusTaskStatusSnscheduledId;
} }

View File

@ -45,7 +45,7 @@
#include "distributed/multi_physical_planner.h" #include "distributed/multi_physical_planner.h"
#include "distributed/pg_dist_colocation.h" #include "distributed/pg_dist_colocation.h"
#include "distributed/pg_dist_partition.h" #include "distributed/pg_dist_partition.h"
#include "distributed/pg_dist_rebalance_jobs.h" #include "distributed/pg_dist_background_tasks.h"
#include "distributed/pg_dist_rebalance_jobs_depend.h" #include "distributed/pg_dist_rebalance_jobs_depend.h"
#include "distributed/pg_dist_shard.h" #include "distributed/pg_dist_shard.h"
#include "distributed/pg_dist_placement.h" #include "distributed/pg_dist_placement.h"
@ -2228,26 +2228,26 @@ HasScheduledRebalanceJobs()
ScanKeyData scanKey[1]; ScanKeyData scanKey[1];
bool indexOK = true; bool indexOK = true;
Relation pgDistRebalanceJobs = table_open(DistRebalanceJobsRelationId(), Relation pgDistRebalanceJobs = table_open(DistBackgroundTasksRelationId(),
AccessShareLock); AccessShareLock);
/* find any job in states listed here */ /* find any job in states listed here */
RebalanceJobStatus jobs[] = { BackgroundTaskStatus jobs[] = {
REBALANCE_JOB_STATUS_RUNNING, BACKGROUND_TASK_STATUS_RUNNING,
REBALANCE_JOB_STATUS_SCHEDULED BACKGROUND_TASK_STATUS_SCHEDULED
}; };
bool hasScheduledJob = false; bool hasScheduledJob = false;
for (int i = 0; !hasScheduledJob && i < sizeof(jobs) / sizeof(jobs[0]); i++) for (int i = 0; !hasScheduledJob && i < sizeof(jobs) / sizeof(jobs[0]); i++)
{ {
/* pg_dist_rebalance_jobs.status == jobs[i] */ /* pg_dist_rebalance_jobs.status == jobs[i] */
ScanKeyInit(&scanKey[0], Anum_pg_dist_rebalance_jobs_status, ScanKeyInit(&scanKey[0], Anum_pg_dist_background_tasks_status,
BTEqualStrategyNumber, F_OIDEQ, BTEqualStrategyNumber, F_OIDEQ,
ObjectIdGetDatum(RebalanceJobStatusOid(jobs[i]))); ObjectIdGetDatum(RebalanceJobStatusOid(jobs[i])));
SysScanDesc scanDescriptor = systable_beginscan( SysScanDesc scanDescriptor = systable_beginscan(
pgDistRebalanceJobs, pgDistRebalanceJobs,
DistRebalanceJobsStatusJobsIdIndexId(), DistBackgroundTasksStatusTaskIdIndexId(),
indexOK, NULL, scanKeyCount, indexOK, NULL, scanKeyCount,
scanKey); scanKey);
@ -2266,42 +2266,42 @@ HasScheduledRebalanceJobs()
} }
/*
 * Side-by-side diff view (left = before the rename, right = after).
 *
 * RebalanceJobStatusByOid translates an enum value Oid into the matching
 * RebalanceJobStatus (old) / BackgroundTaskStatus (new) constant by comparing
 * enumOid against the done, scheduled, running, error and unscheduled ids in
 * that order. When none of them matches it reports an ERROR.
 *
 * NOTE(review): the function name was not renamed by this commit, and the
 * error message still says "citus_job_status" on the new side even though the
 * new-side lookups elsewhere in this diff use "citus_task_status".
 */
static RebalanceJobStatus static BackgroundTaskStatus
RebalanceJobStatusByOid(Oid enumOid) RebalanceJobStatusByOid(Oid enumOid)
{ {
if (enumOid == JobStatusDoneId()) if (enumOid == CitusTaskStatusDoneId())
{ {
return REBALANCE_JOB_STATUS_DONE; return BACKGROUND_TASK_STATUS_DONE;
} }
else if (enumOid == JobStatusScheduledId()) else if (enumOid == CitusTaskStatusScheduledId())
{ {
return REBALANCE_JOB_STATUS_SCHEDULED; return BACKGROUND_TASK_STATUS_SCHEDULED;
} }
else if (enumOid == JobStatusRunningId()) else if (enumOid == CitusTaskStatusRunningId())
{ {
return REBALANCE_JOB_STATUS_RUNNING; return BACKGROUND_TASK_STATUS_RUNNING;
} }
else if (enumOid == JobStatusErrorId()) else if (enumOid == CitusTaskStatusErrorId())
{ {
return REBALANCE_JOB_STATUS_ERROR; return BACKGROUND_TASK_STATUS_ERROR;
} }
else if (enumOid == JobStatusUnscheduledId()) else if (enumOid == CitusTaskStatusSnscheduledId())
{ {
return REBALANCE_JOB_STATUS_UNSCHEDULED; return BACKGROUND_TASK_STATUS_UNSCHEDULED;
} }
ereport(ERROR, (errmsg("unknown enum value for citus_job_status"))); ereport(ERROR, (errmsg("unknown enum value for citus_job_status")));
/* presumably never reached because ereport(ERROR) does not return; confirm */
return REBALANCE_JOB_STATUS_UNKNOWN; return BACKGROUND_TASK_STATUS_UNKNOWN;
} }
bool bool
IsRebalanceJobStatusTerminal(RebalanceJobStatus status) IsRebalanceJobStatusTerminal(BackgroundTaskStatus status)
{ {
switch (status) switch (status)
{ {
case REBALANCE_JOB_STATUS_DONE: case BACKGROUND_TASK_STATUS_DONE:
case REBALANCE_JOB_STATUS_ERROR: case BACKGROUND_TASK_STATUS_ERROR:
case REBALANCE_JOB_STATUS_UNSCHEDULED: case BACKGROUND_TASK_STATUS_UNSCHEDULED:
{ {
return true; return true;
} }
@ -2315,33 +2315,33 @@ IsRebalanceJobStatusTerminal(RebalanceJobStatus status)
Oid Oid
RebalanceJobStatusOid(RebalanceJobStatus status) RebalanceJobStatusOid(BackgroundTaskStatus status)
{ {
switch (status) switch (status)
{ {
case REBALANCE_JOB_STATUS_SCHEDULED: case BACKGROUND_TASK_STATUS_SCHEDULED:
{ {
return JobStatusScheduledId(); return CitusTaskStatusScheduledId();
} }
case REBALANCE_JOB_STATUS_RUNNING: case BACKGROUND_TASK_STATUS_RUNNING:
{ {
return JobStatusRunningId(); return CitusTaskStatusRunningId();
} }
case REBALANCE_JOB_STATUS_DONE: case BACKGROUND_TASK_STATUS_DONE:
{ {
return JobStatusDoneId(); return CitusTaskStatusDoneId();
} }
case REBALANCE_JOB_STATUS_ERROR: case BACKGROUND_TASK_STATUS_ERROR:
{ {
return JobStatusErrorId(); return CitusTaskStatusErrorId();
} }
case REBALANCE_JOB_STATUS_UNSCHEDULED: case BACKGROUND_TASK_STATUS_UNSCHEDULED:
{ {
return JobStatusUnscheduledId(); return CitusTaskStatusSnscheduledId();
} }
default: default:
@ -2353,9 +2353,9 @@ RebalanceJobStatusOid(RebalanceJobStatus status)
int64 int64
GetNextRebalanceJobId(void) GetNextBackgroundTaskTaskId(void)
{ {
text *sequenceName = cstring_to_text(REBALANCE_JOB_JOBID_SEQUENCE_NAME); text *sequenceName = cstring_to_text(PG_DIST_BACKGROUND_TASK_TASK_ID_SEQUENCE_NAME);
Oid sequenceId = ResolveRelationId(sequenceName, false); Oid sequenceId = ResolveRelationId(sequenceName, false);
Datum sequenceIdDatum = ObjectIdGetDatum(sequenceId); Datum sequenceIdDatum = ObjectIdGetDatum(sequenceId);
Oid savedUserId = InvalidOid; Oid savedUserId = InvalidOid;
@ -2381,12 +2381,12 @@ ScheduleBackgrounRebalanceJob(char *command, int dependingJobCount,
{ {
RebalanceJob *job = NULL; RebalanceJob *job = NULL;
Relation pgDistRebalanceJobs = table_open(DistRebalanceJobsRelationId(), Relation pgDistRebalanceJobs = table_open(DistBackgroundTasksRelationId(),
RowExclusiveLock); RowExclusiveLock);
Relation pgDistRebalanceJobsDepend = NULL; Relation pgDistRebalanceJobsDepend = NULL;
if (dependingJobCount > 0) if (dependingJobCount > 0)
{ {
pgDistRebalanceJobsDepend = table_open(DistRebalanceJobsDependRelationId(), pgDistRebalanceJobsDepend = table_open(DistBackgroundTaskssDependRelationId(),
RowExclusiveLock); RowExclusiveLock);
} }
@ -2394,21 +2394,21 @@ ScheduleBackgrounRebalanceJob(char *command, int dependingJobCount,
/* 2. insert new job */ /* 2. insert new job */
{ {
Datum values[Natts_pg_dist_rebalance_jobs] = { 0 }; Datum values[Natts_pg_dist_background_tasks] = { 0 };
bool nulls[Natts_pg_dist_rebalance_jobs] = { 0 }; bool nulls[Natts_pg_dist_background_tasks] = { 0 };
memset(nulls, true, sizeof(nulls)); memset(nulls, true, sizeof(nulls));
int64 jobid = GetNextRebalanceJobId(); int64 jobid = GetNextBackgroundTaskTaskId();
values[Anum_pg_dist_rebalance_jobs_jobid - 1] = Int64GetDatum(jobid); values[Anum_pg_dist_background_tasks_task_id - 1] = Int64GetDatum(jobid);
nulls[Anum_pg_dist_rebalance_jobs_jobid - 1] = false; nulls[Anum_pg_dist_background_tasks_task_id - 1] = false;
values[Anum_pg_dist_rebalance_jobs_status - 1] = JobStatusScheduledId(); values[Anum_pg_dist_background_tasks_status - 1] = CitusTaskStatusScheduledId();
nulls[Anum_pg_dist_rebalance_jobs_status - 1] = false; nulls[Anum_pg_dist_background_tasks_status - 1] = false;
values[Anum_pg_dist_rebalance_jobs_command - 1] = CStringGetTextDatum(command); values[Anum_pg_dist_background_tasks_command - 1] = CStringGetTextDatum(command);
nulls[Anum_pg_dist_rebalance_jobs_command - 1] = false; nulls[Anum_pg_dist_background_tasks_command - 1] = false;
HeapTuple newTuple = heap_form_tuple(RelationGetDescr(pgDistRebalanceJobs), HeapTuple newTuple = heap_form_tuple(RelationGetDescr(pgDistRebalanceJobs),
values, nulls); values, nulls);
@ -2417,7 +2417,7 @@ ScheduleBackgrounRebalanceJob(char *command, int dependingJobCount,
job = palloc0(sizeof(RebalanceJob)); job = palloc0(sizeof(RebalanceJob));
job->jobid = jobid; job->jobid = jobid;
job->status = REBALANCE_JOB_STATUS_SCHEDULED; job->status = BACKGROUND_TASK_STATUS_SCHEDULED;
job->command = pstrdup(command); job->command = pstrdup(command);
} }
@ -2456,42 +2456,43 @@ ScheduleBackgrounRebalanceJob(char *command, int dependingJobCount,
void void
ResetRunningJobs(void) ResetRunningBackgroundTasks(void)
{ {
const int scanKeyCount = 1; const int scanKeyCount = 1;
ScanKeyData scanKey[1]; ScanKeyData scanKey[1];
const bool indexOK = true; const bool indexOK = true;
Relation pgDistRebalanceJobs = table_open(DistRebalanceJobsRelationId(), Relation pgDistRebalanceJobs = table_open(DistBackgroundTasksRelationId(),
AccessShareLock); AccessShareLock);
/* pg_dist_rebalance_jobs.status == 'running' */ /* pg_dist_rebalance_jobs.status == 'running' */
ScanKeyInit(&scanKey[0], Anum_pg_dist_rebalance_jobs_status, ScanKeyInit(&scanKey[0], Anum_pg_dist_background_tasks_status,
BTEqualStrategyNumber, F_OIDEQ, ObjectIdGetDatum(JobStatusRunningId())); BTEqualStrategyNumber, F_OIDEQ, ObjectIdGetDatum(
CitusTaskStatusRunningId()));
SysScanDesc scanDescriptor = systable_beginscan(pgDistRebalanceJobs, SysScanDesc scanDescriptor = systable_beginscan(pgDistRebalanceJobs,
DistRebalanceJobsStatusJobsIdIndexId(), DistBackgroundTasksStatusTaskIdIndexId(),
indexOK, NULL, scanKeyCount, indexOK, NULL, scanKeyCount,
scanKey); scanKey);
HeapTuple jobTuple = NULL; HeapTuple jobTuple = NULL;
while (HeapTupleIsValid(jobTuple = systable_getnext(scanDescriptor))) while (HeapTupleIsValid(jobTuple = systable_getnext(scanDescriptor)))
{ {
Datum values[Natts_pg_dist_rebalance_jobs] = { 0 }; Datum values[Natts_pg_dist_background_tasks] = { 0 };
bool isnull[Natts_pg_dist_rebalance_jobs] = { 0 }; bool isnull[Natts_pg_dist_background_tasks] = { 0 };
bool replace[Natts_pg_dist_rebalance_jobs] = { 0 }; bool replace[Natts_pg_dist_background_tasks] = { 0 };
TupleDesc tupleDescriptor = RelationGetDescr(pgDistRebalanceJobs); TupleDesc tupleDescriptor = RelationGetDescr(pgDistRebalanceJobs);
heap_deform_tuple(jobTuple, tupleDescriptor, values, isnull); heap_deform_tuple(jobTuple, tupleDescriptor, values, isnull);
values[Anum_pg_dist_rebalance_jobs_status - 1] = values[Anum_pg_dist_background_tasks_status - 1] =
ObjectIdGetDatum(JobStatusScheduledId()); ObjectIdGetDatum(CitusTaskStatusScheduledId());
isnull[Anum_pg_dist_rebalance_jobs_status - 1] = false; isnull[Anum_pg_dist_background_tasks_status - 1] = false;
replace[Anum_pg_dist_rebalance_jobs_status - 1] = true; replace[Anum_pg_dist_background_tasks_status - 1] = true;
values[Anum_pg_dist_rebalance_jobs_pid - 1] = InvalidOid; values[Anum_pg_dist_background_tasks_pid - 1] = InvalidOid;
isnull[Anum_pg_dist_rebalance_jobs_pid - 1] = true; isnull[Anum_pg_dist_background_tasks_pid - 1] = true;
replace[Anum_pg_dist_rebalance_jobs_pid - 1] = true; replace[Anum_pg_dist_background_tasks_pid - 1] = true;
jobTuple = heap_modify_tuple(jobTuple, tupleDescriptor, values, isnull, replace); jobTuple = heap_modify_tuple(jobTuple, tupleDescriptor, values, isnull, replace);
@ -2511,8 +2512,9 @@ JobHasUmnetDependencies(int64 jobid)
{ {
bool hasUnmetDependency = false; bool hasUnmetDependency = false;
Relation pgDistRebalanceJobsDepend = table_open(DistRebalanceJobsDependRelationId(), Relation pgDistRebalanceJobsDepend = table_open(
AccessShareLock); DistBackgroundTaskssDependRelationId(),
AccessShareLock);
const int scanKeyCount = 1; const int scanKeyCount = 1;
ScanKeyData scanKey[1] = { 0 }; ScanKeyData scanKey[1] = { 0 };
@ -2523,7 +2525,7 @@ JobHasUmnetDependencies(int64 jobid)
BTEqualStrategyNumber, F_INT8EQ, jobid); BTEqualStrategyNumber, F_INT8EQ, jobid);
SysScanDesc scanDescriptor = systable_beginscan(pgDistRebalanceJobsDepend, SysScanDesc scanDescriptor = systable_beginscan(pgDistRebalanceJobsDepend,
DistRebalanceJobsDependJobIdIndexId(), DistBackgroundTasksDependTaskIdIndexId(),
indexOK, NULL, scanKeyCount, indexOK, NULL, scanKeyCount,
scanKey); scanKey);
@ -2539,7 +2541,7 @@ JobHasUmnetDependencies(int64 jobid)
* Only when the status of all depending jobs is done we clear this job and say * Only when the status of all depending jobs is done we clear this job and say
* that it has no unmet dependencies. * that it has no unmet dependencies.
*/ */
if (dependingJob->status == REBALANCE_JOB_STATUS_DONE) if (dependingJob->status == BACKGROUND_TASK_STATUS_DONE)
{ {
continue; continue;
} }
@ -2549,7 +2551,7 @@ JobHasUmnetDependencies(int64 jobid)
* have errored. Once we move the job to error it should unschedule all dependant * have errored. Once we move the job to error it should unschedule all dependant
* jobs recursively. * jobs recursively.
*/ */
Assert(dependingJob->status != REBALANCE_JOB_STATUS_ERROR); Assert(dependingJob->status != BACKGROUND_TASK_STATUS_ERROR);
hasUnmetDependency = true; hasUnmetDependency = true;
break; break;
@ -2569,36 +2571,36 @@ GetRunableRebalanceJob(void)
ScanKeyData scanKey[1]; ScanKeyData scanKey[1];
bool indexOK = true; bool indexOK = true;
Relation pgDistRebalanceJobs = table_open(DistRebalanceJobsRelationId(), Relation pgDistRebalanceJobs = table_open(DistBackgroundTasksRelationId(),
AccessShareLock); AccessShareLock);
RebalanceJobStatus jobStatus[] = { BackgroundTaskStatus jobStatus[] = {
REBALANCE_JOB_STATUS_SCHEDULED BACKGROUND_TASK_STATUS_SCHEDULED
}; };
RebalanceJob *job = NULL; RebalanceJob *job = NULL;
for (int i = 0; !job && i < sizeof(jobStatus) / sizeof(jobStatus[0]); i++) for (int i = 0; !job && i < sizeof(jobStatus) / sizeof(jobStatus[0]); i++)
{ {
/* pg_dist_rebalance_jobs.status == jobStatus[i] */ /* pg_dist_rebalance_jobs.status == jobStatus[i] */
ScanKeyInit(&scanKey[0], Anum_pg_dist_rebalance_jobs_status, ScanKeyInit(&scanKey[0], Anum_pg_dist_background_tasks_status,
BTEqualStrategyNumber, F_OIDEQ, ObjectIdGetDatum( BTEqualStrategyNumber, F_OIDEQ, ObjectIdGetDatum(
RebalanceJobStatusOid(jobStatus[i]))); RebalanceJobStatusOid(jobStatus[i])));
SysScanDesc scanDescriptor = systable_beginscan(pgDistRebalanceJobs, SysScanDesc scanDescriptor = systable_beginscan(pgDistRebalanceJobs,
DistRebalanceJobsStatusJobsIdIndexId(), DistBackgroundTasksStatusTaskIdIndexId(),
indexOK, NULL, scanKeyCount, indexOK, NULL, scanKeyCount,
scanKey); scanKey);
HeapTuple jobTuple = NULL; HeapTuple jobTuple = NULL;
while (HeapTupleIsValid(jobTuple = systable_getnext(scanDescriptor))) while (HeapTupleIsValid(jobTuple = systable_getnext(scanDescriptor)))
{ {
Datum datumArray[Natts_pg_dist_rebalance_jobs]; Datum datumArray[Natts_pg_dist_background_tasks];
bool isNullArray[Natts_pg_dist_rebalance_jobs]; bool isNullArray[Natts_pg_dist_background_tasks];
TupleDesc tupleDescriptor = RelationGetDescr(pgDistRebalanceJobs); TupleDesc tupleDescriptor = RelationGetDescr(pgDistRebalanceJobs);
heap_deform_tuple(jobTuple, tupleDescriptor, datumArray, isNullArray); heap_deform_tuple(jobTuple, tupleDescriptor, datumArray, isNullArray);
int64 jobid = DatumGetInt64( int64 jobid = DatumGetInt64(
datumArray[Anum_pg_dist_rebalance_jobs_jobid - 1]); datumArray[Anum_pg_dist_background_tasks_task_id - 1]);
if (JobHasUmnetDependencies(jobid)) if (JobHasUmnetDependencies(jobid))
{ {
continue; continue;
@ -2607,10 +2609,10 @@ GetRunableRebalanceJob(void)
job = palloc0(sizeof(RebalanceJob)); job = palloc0(sizeof(RebalanceJob));
job->jobid = jobid; job->jobid = jobid;
job->status = RebalanceJobStatusByOid( job->status = RebalanceJobStatusByOid(
DatumGetObjectId(datumArray[Anum_pg_dist_rebalance_jobs_status - 1])); DatumGetObjectId(datumArray[Anum_pg_dist_background_tasks_status - 1]));
job->command = text_to_cstring( job->command = text_to_cstring(
DatumGetTextP(datumArray[Anum_pg_dist_rebalance_jobs_command - 1])); DatumGetTextP(datumArray[Anum_pg_dist_background_tasks_command - 1]));
break; break;
} }
@ -2631,32 +2633,32 @@ GetScheduledRebalanceJobByJobID(int64 jobId)
ScanKeyData scanKey[1]; ScanKeyData scanKey[1];
bool indexOK = true; bool indexOK = true;
Relation pgDistRebalanceJobs = table_open(DistRebalanceJobsRelationId(), Relation pgDistRebalanceJobs = table_open(DistBackgroundTasksRelationId(),
AccessShareLock); AccessShareLock);
/* pg_dist_rebalance_jobs.jobid == $jobId */ /* pg_dist_rebalance_jobs.jobid == $jobId */
ScanKeyInit(&scanKey[0], Anum_pg_dist_rebalance_jobs_jobid, ScanKeyInit(&scanKey[0], Anum_pg_dist_background_tasks_task_id,
BTEqualStrategyNumber, F_INT8EQ, Int64GetDatum(jobId)); BTEqualStrategyNumber, F_INT8EQ, Int64GetDatum(jobId));
SysScanDesc scanDescriptor = systable_beginscan(pgDistRebalanceJobs, SysScanDesc scanDescriptor = systable_beginscan(pgDistRebalanceJobs,
DistRebalanceJobsStatusJobsIdIndexId(), DistBackgroundTasksStatusTaskIdIndexId(),
indexOK, NULL, scanKeyCount, scanKey); indexOK, NULL, scanKeyCount, scanKey);
HeapTuple jobTuple = systable_getnext(scanDescriptor); HeapTuple jobTuple = systable_getnext(scanDescriptor);
RebalanceJob *job = NULL; RebalanceJob *job = NULL;
if (HeapTupleIsValid(jobTuple)) if (HeapTupleIsValid(jobTuple))
{ {
Datum datumArray[Natts_pg_dist_rebalance_jobs]; Datum datumArray[Natts_pg_dist_background_tasks];
bool isNullArray[Natts_pg_dist_rebalance_jobs]; bool isNullArray[Natts_pg_dist_background_tasks];
TupleDesc tupleDescriptor = RelationGetDescr(pgDistRebalanceJobs); TupleDesc tupleDescriptor = RelationGetDescr(pgDistRebalanceJobs);
heap_deform_tuple(jobTuple, tupleDescriptor, datumArray, isNullArray); heap_deform_tuple(jobTuple, tupleDescriptor, datumArray, isNullArray);
job = palloc0(sizeof(RebalanceJob)); job = palloc0(sizeof(RebalanceJob));
job->jobid = DatumGetInt64(datumArray[Anum_pg_dist_rebalance_jobs_jobid - 1]); job->jobid = DatumGetInt64(datumArray[Anum_pg_dist_background_tasks_task_id - 1]);
job->status = RebalanceJobStatusByOid( job->status = RebalanceJobStatusByOid(
DatumGetObjectId(datumArray[Anum_pg_dist_rebalance_jobs_status - 1])); DatumGetObjectId(datumArray[Anum_pg_dist_background_tasks_status - 1]));
job->command = text_to_cstring( job->command = text_to_cstring(
DatumGetTextP(datumArray[Anum_pg_dist_rebalance_jobs_command - 1])); DatumGetTextP(datumArray[Anum_pg_dist_background_tasks_command - 1]));
} }
systable_endscan(scanDescriptor); systable_endscan(scanDescriptor);
@ -2667,10 +2669,10 @@ GetScheduledRebalanceJobByJobID(int64 jobId)
void void
UpdateJobStatus(int64 jobid, pid_t *pid, RebalanceJobStatus status, int32 *retry_count, UpdateJobStatus(int64 jobid, pid_t *pid, BackgroundTaskStatus status, int32 *retry_count,
char *message) char *message)
{ {
Relation pgDistRebalanceJobs = table_open(DistRebalanceJobsRelationId(), Relation pgDistRebalanceJobs = table_open(DistBackgroundTasksRelationId(),
RowExclusiveLock); RowExclusiveLock);
TupleDesc tupleDescriptor = RelationGetDescr(pgDistRebalanceJobs); TupleDesc tupleDescriptor = RelationGetDescr(pgDistRebalanceJobs);
@ -2678,12 +2680,12 @@ UpdateJobStatus(int64 jobid, pid_t *pid, RebalanceJobStatus status, int32 *retry
int scanKeyCount = 1; int scanKeyCount = 1;
/* WHERE jobid = job->jobid */ /* WHERE jobid = job->jobid */
ScanKeyInit(&scanKey[0], Anum_pg_dist_rebalance_jobs_jobid, ScanKeyInit(&scanKey[0], Anum_pg_dist_background_tasks_task_id,
BTEqualStrategyNumber, F_INT8EQ, Int64GetDatum(jobid)); BTEqualStrategyNumber, F_INT8EQ, Int64GetDatum(jobid));
const bool indexOK = true; const bool indexOK = true;
SysScanDesc scanDescriptor = systable_beginscan(pgDistRebalanceJobs, SysScanDesc scanDescriptor = systable_beginscan(pgDistRebalanceJobs,
DistRebalanceJobsJobsIdIndexId(), DistBackgroundTasksTaskIdIndexId(),
indexOK, indexOK,
NULL, scanKeyCount, scanKey); NULL, scanKeyCount, scanKey);
@ -2694,9 +2696,9 @@ UpdateJobStatus(int64 jobid, pid_t *pid, RebalanceJobStatus status, int32 *retry
UINT64_FORMAT, jobid))); UINT64_FORMAT, jobid)));
} }
Datum values[Natts_pg_dist_rebalance_jobs] = { 0 }; Datum values[Natts_pg_dist_background_tasks] = { 0 };
bool isnull[Natts_pg_dist_rebalance_jobs] = { 0 }; bool isnull[Natts_pg_dist_background_tasks] = { 0 };
bool replace[Natts_pg_dist_rebalance_jobs] = { 0 }; bool replace[Natts_pg_dist_background_tasks] = { 0 };
heap_deform_tuple(heapTuple, tupleDescriptor, values, isnull); heap_deform_tuple(heapTuple, tupleDescriptor, values, isnull);
@ -2708,34 +2710,34 @@ UpdateJobStatus(int64 jobid, pid_t *pid, RebalanceJobStatus status, int32 *retry
if (pid) if (pid)
{ {
UPDATE_FIELD(Anum_pg_dist_rebalance_jobs_pid, false, Int32GetDatum(*pid)); UPDATE_FIELD(Anum_pg_dist_background_tasks_pid, false, Int32GetDatum(*pid));
} }
else else
{ {
UPDATE_FIELD(Anum_pg_dist_rebalance_jobs_pid, true, InvalidOid); UPDATE_FIELD(Anum_pg_dist_background_tasks_pid, true, InvalidOid);
} }
Oid statusOid = ObjectIdGetDatum(RebalanceJobStatusOid(status)); Oid statusOid = ObjectIdGetDatum(RebalanceJobStatusOid(status));
UPDATE_FIELD(Anum_pg_dist_rebalance_jobs_status, false, statusOid); UPDATE_FIELD(Anum_pg_dist_background_tasks_status, false, statusOid);
if (retry_count) if (retry_count)
{ {
UPDATE_FIELD(Anum_pg_dist_rebalance_jobs_retry_count, false, Int32GetDatum( UPDATE_FIELD(Anum_pg_dist_background_tasks_retry_count, false, Int32GetDatum(
*retry_count)); *retry_count));
} }
else else
{ {
UPDATE_FIELD(Anum_pg_dist_rebalance_jobs_retry_count, true, InvalidOid); UPDATE_FIELD(Anum_pg_dist_background_tasks_retry_count, true, InvalidOid);
} }
if (message) if (message)
{ {
Oid messageOid = CStringGetTextDatum(message); Oid messageOid = CStringGetTextDatum(message);
UPDATE_FIELD(Anum_pg_dist_rebalance_jobs_message, false, messageOid); UPDATE_FIELD(Anum_pg_dist_background_tasks_message, false, messageOid);
} }
else else
{ {
UPDATE_FIELD(Anum_pg_dist_rebalance_jobs_message, true, InvalidOid); UPDATE_FIELD(Anum_pg_dist_background_tasks_message, true, InvalidOid);
} }
#undef UPDATE_FIELD #undef UPDATE_FIELD
@ -2754,8 +2756,9 @@ UpdateJobStatus(int64 jobid, pid_t *pid, RebalanceJobStatus status, int32 *retry
static List * static List *
GetDependantJobs(int64 jobid) GetDependantJobs(int64 jobid)
{ {
Relation pgDistRebalanceJobsDepends = table_open(DistRebalanceJobsDependRelationId(), Relation pgDistRebalanceJobsDepends = table_open(
RowExclusiveLock); DistBackgroundTaskssDependRelationId(),
RowExclusiveLock);
const bool indexOK = true; const bool indexOK = true;
ScanKeyData scanKey[1]; ScanKeyData scanKey[1];
int scanKeyCount = 1; int scanKeyCount = 1;
@ -2765,7 +2768,7 @@ GetDependantJobs(int64 jobid)
BTEqualStrategyNumber, F_INT8EQ, Int64GetDatum(jobid)); BTEqualStrategyNumber, F_INT8EQ, Int64GetDatum(jobid));
SysScanDesc scanDescriptor = systable_beginscan(pgDistRebalanceJobsDepends, SysScanDesc scanDescriptor = systable_beginscan(pgDistRebalanceJobsDepends,
DistRebalanceJobsDependDependsOnIndexId(), DistBackgroundTasksDependDependsOnIndexId(),
indexOK, indexOK,
NULL, scanKeyCount, scanKey); NULL, scanKeyCount, scanKey);
@ -2792,7 +2795,7 @@ GetDependantJobs(int64 jobid)
void void
UnscheduleDependantJobs(int64 jobid) UnscheduleDependantJobs(int64 jobid)
{ {
Relation pgDistRebalanceJobs = table_open(DistRebalanceJobsRelationId(), Relation pgDistRebalanceJobs = table_open(DistBackgroundTasksRelationId(),
RowExclusiveLock); RowExclusiveLock);
TupleDesc tupleDescriptor = RelationGetDescr(pgDistRebalanceJobs); TupleDesc tupleDescriptor = RelationGetDescr(pgDistRebalanceJobs);
@ -2812,11 +2815,11 @@ UnscheduleDependantJobs(int64 jobid)
int scanKeyCount = 1; int scanKeyCount = 1;
/* WHERE jobid = job->jobid */ /* WHERE jobid = job->jobid */
ScanKeyInit(&scanKey[0], Anum_pg_dist_rebalance_jobs_jobid, ScanKeyInit(&scanKey[0], Anum_pg_dist_background_tasks_task_id,
BTEqualStrategyNumber, F_INT8EQ, Int64GetDatum(cJobid)); BTEqualStrategyNumber, F_INT8EQ, Int64GetDatum(cJobid));
const bool indexOK = true; const bool indexOK = true;
SysScanDesc scanDescriptor = systable_beginscan(pgDistRebalanceJobs, SysScanDesc scanDescriptor = systable_beginscan(pgDistRebalanceJobs,
DistRebalanceJobsJobsIdIndexId(), DistBackgroundTasksTaskIdIndexId(),
indexOK, indexOK,
NULL, scanKeyCount, scanKey); NULL, scanKeyCount, scanKey);
@ -2827,14 +2830,14 @@ UnscheduleDependantJobs(int64 jobid)
UINT64_FORMAT, cJobid))); UINT64_FORMAT, cJobid)));
} }
Datum values[Natts_pg_dist_rebalance_jobs] = { 0 }; Datum values[Natts_pg_dist_background_tasks] = { 0 };
bool isnull[Natts_pg_dist_rebalance_jobs] = { 0 }; bool isnull[Natts_pg_dist_background_tasks] = { 0 };
bool replace[Natts_pg_dist_rebalance_jobs] = { 0 }; bool replace[Natts_pg_dist_background_tasks] = { 0 };
values[Anum_pg_dist_rebalance_jobs_status - 1] = values[Anum_pg_dist_background_tasks_status - 1] =
ObjectIdGetDatum(JobStatusUnscheduledId()); ObjectIdGetDatum(CitusTaskStatusSnscheduledId());
isnull[Anum_pg_dist_rebalance_jobs_status - 1] = false; isnull[Anum_pg_dist_background_tasks_status - 1] = false;
replace[Anum_pg_dist_rebalance_jobs_status - 1] = true; replace[Anum_pg_dist_background_tasks_status - 1] = true;
heapTuple = heap_modify_tuple(heapTuple, tupleDescriptor, values, isnull, heapTuple = heap_modify_tuple(heapTuple, tupleDescriptor, values, isnull,
replace); replace);

View File

@ -841,6 +841,17 @@ RegisterCitusConfigVariables(void)
GUC_NO_SHOW_ALL, GUC_NO_SHOW_ALL,
NULL, NULL, NULL); NULL, NULL, NULL);
/* TODO remove before merge */
DefineCustomBoolVariable(
"citus.background_task_monitor_debug_delay",
NULL,
NULL,
&BackgroundTaskMonitorDebugDelay,
false,
PGC_SIGHUP,
GUC_UNIT_MS,
NULL, NULL, NULL);
DefineCustomBoolVariable( DefineCustomBoolVariable(
"citus.check_available_space_before_move", "citus.check_available_space_before_move",
gettext_noop("When enabled will check free disk space before a shard move"), gettext_noop("When enabled will check free disk space before a shard move"),
@ -1893,17 +1904,6 @@ RegisterCitusConfigVariables(void)
GUC_UNIT_MS, GUC_UNIT_MS,
NULL, NULL, NULL); NULL, NULL, NULL);
/* TODO remove before merge */
DefineCustomBoolVariable(
"citus.rebalance_job_debug_delay",
NULL,
NULL,
&RebalanceJobDebugDelay,
false,
PGC_SIGHUP,
GUC_UNIT_MS,
NULL, NULL, NULL);
DefineCustomIntVariable( DefineCustomIntVariable(
"citus.recover_2pc_interval", "citus.recover_2pc_interval",
gettext_noop("Sets the time to wait between recovering 2PCs."), gettext_noop("Sets the time to wait between recovering 2PCs."),

View File

@ -77,31 +77,31 @@ DROP FUNCTION pg_catalog.get_all_active_transactions(OUT datid oid, OUT process_
DROP FUNCTION pg_catalog.isolate_tenant_to_new_shard(table_name regclass, tenant_id "any", cascade_option text); DROP FUNCTION pg_catalog.isolate_tenant_to_new_shard(table_name regclass, tenant_id "any", cascade_option text);
#include "udfs/isolate_tenant_to_new_shard/11.1-1.sql" #include "udfs/isolate_tenant_to_new_shard/11.1-1.sql"
CREATE TYPE citus.citus_job_status AS ENUM ('scheduled', 'running', 'done', 'error', 'unscheduled'); CREATE TYPE citus.citus_task_status AS ENUM ('scheduled', 'running', 'done', 'error', 'unscheduled');
ALTER TYPE citus.citus_job_status SET SCHEMA pg_catalog; ALTER TYPE citus.citus_task_status SET SCHEMA pg_catalog;
CREATE TABLE citus.pg_dist_rebalance_jobs( CREATE TABLE citus.pg_dist_background_tasks(
jobid bigserial NOT NULL, task_id bigserial NOT NULL,
pid integer, pid integer,
status pg_catalog.citus_job_status default 'scheduled' NOT NULL, status pg_catalog.citus_task_status default 'scheduled' NOT NULL,
command text NOT NULL, command text NOT NULL,
retry_count integer, retry_count integer,
message text message text
); );
ALTER TABLE citus.pg_dist_rebalance_jobs SET SCHEMA pg_catalog; ALTER TABLE citus.pg_dist_background_tasks SET SCHEMA pg_catalog;
CREATE UNIQUE INDEX pg_dist_rebalance_jobs_jobid_index ON pg_catalog.pg_dist_rebalance_jobs using btree(jobid); CREATE UNIQUE INDEX pg_dist_background_tasks_task_id_index ON pg_catalog.pg_dist_background_tasks using btree(task_id);
CREATE INDEX pg_dist_rebalance_jobs_status_jobid_index ON pg_catalog.pg_dist_rebalance_jobs using btree(status, jobid); CREATE INDEX pg_dist_background_tasks_status_task_id_index ON pg_catalog.pg_dist_background_tasks using btree(status, task_id);
CREATE TABLE citus.pg_dist_rebalance_jobs_depend( CREATE TABLE citus.pg_dist_background_tasks_depend(
jobid bigint NOT NULL REFERENCES pg_catalog.pg_dist_rebalance_jobs(jobid) ON DELETE CASCADE, task_id bigint NOT NULL REFERENCES pg_catalog.pg_dist_background_tasks(task_id) ON DELETE CASCADE,
depends_on bigint NOT NULL REFERENCES pg_catalog.pg_dist_rebalance_jobs(jobid) ON DELETE CASCADE, depends_on bigint NOT NULL REFERENCES pg_catalog.pg_dist_background_tasks(task_id) ON DELETE CASCADE,
UNIQUE(jobid, depends_on) UNIQUE(task_id, depends_on)
); );
ALTER TABLE citus.pg_dist_rebalance_jobs_depend SET SCHEMA pg_catalog; ALTER TABLE citus.pg_dist_background_tasks_depend SET SCHEMA pg_catalog;
CREATE INDEX pg_dist_rebalance_jobs_depend_jobid ON pg_catalog.pg_dist_rebalance_jobs_depend USING btree(jobid); CREATE INDEX pg_dist_background_tasks_depend_task_id ON pg_catalog.pg_dist_background_tasks_depend USING btree(task_id);
CREATE INDEX pg_dist_rebalance_jobs_depend_depends_on ON pg_catalog.pg_dist_rebalance_jobs_depend USING btree(depends_on); CREATE INDEX pg_dist_background_tasks_depend_depends_on ON pg_catalog.pg_dist_background_tasks_depend USING btree(depends_on);
#include "udfs/citus_wait_for_rebalance_job/11.1-1.sql" #include "udfs/citus_wait_for_rebalance_job/11.1-1.sql"

View File

@ -29,7 +29,7 @@
#include "distributed/metadata_utility.h" #include "distributed/metadata_utility.h"
#include "distributed/shard_cleaner.h" #include "distributed/shard_cleaner.h"
bool RebalanceJobDebugDelay = false; bool BackgroundTaskMonitorDebugDelay = false;
/* Table-of-contents constants for our dynamic shared memory segment. */ /* Table-of-contents constants for our dynamic shared memory segment. */
#define CITUS_BACKGROUND_JOB_MAGIC 0x51028081 #define CITUS_BACKGROUND_JOB_MAGIC 0x51028081
@ -45,7 +45,7 @@ static BackgroundWorkerHandle * StartCitusBackgroundJobExecuter(char *database,
static void ExecuteSqlString(const char *sql); static void ExecuteSqlString(const char *sql);
BackgroundWorkerHandle * BackgroundWorkerHandle *
StartCitusBackgroundJobWorker(Oid database, Oid extensionOwner) StartCitusBackgroundTaskMonitorWorker(Oid database, Oid extensionOwner)
{ {
BackgroundWorker worker = { 0 }; BackgroundWorker worker = { 0 };
BackgroundWorkerHandle *handle = NULL; BackgroundWorkerHandle *handle = NULL;
@ -53,7 +53,7 @@ StartCitusBackgroundJobWorker(Oid database, Oid extensionOwner)
/* Configure a worker. */ /* Configure a worker. */
memset(&worker, 0, sizeof(worker)); memset(&worker, 0, sizeof(worker));
SafeSnprintf(worker.bgw_name, BGW_MAXLEN, SafeSnprintf(worker.bgw_name, BGW_MAXLEN,
"Citus Rebalance Jobs Worker: %u/%u", "Citus Background Task Monitor: %u/%u",
database, extensionOwner); database, extensionOwner);
worker.bgw_flags = worker.bgw_flags =
BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION; BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION;
@ -63,7 +63,7 @@ StartCitusBackgroundJobWorker(Oid database, Oid extensionOwner)
worker.bgw_restart_time = BGW_NEVER_RESTART; worker.bgw_restart_time = BGW_NEVER_RESTART;
strcpy_s(worker.bgw_library_name, sizeof(worker.bgw_library_name), "citus"); strcpy_s(worker.bgw_library_name, sizeof(worker.bgw_library_name), "citus");
strcpy_s(worker.bgw_function_name, sizeof(worker.bgw_library_name), strcpy_s(worker.bgw_function_name, sizeof(worker.bgw_library_name),
"CitusBackgroundJobMain"); "CitusBackgroundTaskMonitorMain");
worker.bgw_main_arg = ObjectIdGetDatum(MyDatabaseId); worker.bgw_main_arg = ObjectIdGetDatum(MyDatabaseId);
memcpy_s(worker.bgw_extra, sizeof(worker.bgw_extra), &extensionOwner, memcpy_s(worker.bgw_extra, sizeof(worker.bgw_extra), &extensionOwner,
sizeof(Oid)); sizeof(Oid));
@ -82,7 +82,7 @@ StartCitusBackgroundJobWorker(Oid database, Oid extensionOwner)
void void
CitusBackgroundJobMain(Datum arg) CitusBackgroundTaskMonitorMain(Datum arg)
{ {
Oid databaseOid = DatumGetObjectId(arg); Oid databaseOid = DatumGetObjectId(arg);
@ -96,21 +96,24 @@ CitusBackgroundJobMain(Datum arg)
/* connect to database, after that we can actually access catalogs */ /* connect to database, after that we can actually access catalogs */
BackgroundWorkerInitializeConnectionByOid(databaseOid, extensionOwner, 0); BackgroundWorkerInitializeConnectionByOid(databaseOid, extensionOwner, 0);
/* TODO get lock to make sure there is only one worker running per database */
/* make worker recognizable in pg_stat_activity */ /* make worker recognizable in pg_stat_activity */
pgstat_report_appname("rebalance jobs worker"); pgstat_report_appname("citus background task monitor");
ereport(LOG, (errmsg("background jobs runner"))); ereport(LOG, (errmsg("citus background task monitor")));
if (RebalanceJobDebugDelay) if (BackgroundTaskMonitorDebugDelay)
{ {
pg_usleep(30 * 1000 * 1000); pg_usleep(30 * 1000 * 1000);
} }
MemoryContext perJobContext = AllocSetContextCreateExtended(CurrentMemoryContext, MemoryContext perTaskContext =
"PerJobContext", AllocSetContextCreateExtended(CurrentMemoryContext,
ALLOCSET_DEFAULT_MINSIZE, "PerTaskContext",
ALLOCSET_DEFAULT_INITSIZE, ALLOCSET_DEFAULT_MINSIZE,
ALLOCSET_DEFAULT_MAXSIZE); ALLOCSET_DEFAULT_INITSIZE,
ALLOCSET_DEFAULT_MAXSIZE);
/* /*
* First we find all jobs that are running, we need to check if they are still running * First we find all jobs that are running, we need to check if they are still running
@ -121,18 +124,18 @@ CitusBackgroundJobMain(Datum arg)
PushActiveSnapshot(GetTransactionSnapshot()); PushActiveSnapshot(GetTransactionSnapshot());
/* TODO have an actual function to check if the worker is still running */ /* TODO have an actual function to check if the worker is still running */
ResetRunningJobs(); ResetRunningBackgroundTasks();
PopActiveSnapshot(); PopActiveSnapshot();
CommitTransactionCommand(); CommitTransactionCommand();
} }
MemoryContext oldContextPerJob = MemoryContextSwitchTo(perJobContext); MemoryContext oldContextPerJob = MemoryContextSwitchTo(perTaskContext);
bool hasJobs = true; bool hasJobs = true;
while (hasJobs) while (hasJobs)
{ {
MemoryContextReset(perJobContext); MemoryContextReset(perTaskContext);
CHECK_FOR_INTERRUPTS(); CHECK_FOR_INTERRUPTS();
@ -141,10 +144,10 @@ CitusBackgroundJobMain(Datum arg)
PushActiveSnapshot(GetTransactionSnapshot()); PushActiveSnapshot(GetTransactionSnapshot());
/* /*
* We need to load the job into the perJobContext as we will switch contexts * We need to load the job into the perTaskContext as we will switch contexts
* later due to the committing and starting of new transactions * later due to the committing and starting of new transactions
*/ */
MemoryContext oldContext = MemoryContextSwitchTo(perJobContext); MemoryContext oldContext = MemoryContextSwitchTo(perTaskContext);
RebalanceJob *job = GetRunableRebalanceJob(); RebalanceJob *job = GetRunableRebalanceJob();
MemoryContextSwitchTo(oldContext); MemoryContextSwitchTo(oldContext);
@ -160,7 +163,7 @@ CitusBackgroundJobMain(Datum arg)
PopActiveSnapshot(); PopActiveSnapshot();
CommitTransactionCommand(); CommitTransactionCommand();
MemoryContextSwitchTo(perJobContext); MemoryContextSwitchTo(perTaskContext);
/* TODO find the actual database and username */ /* TODO find the actual database and username */
BackgroundWorkerHandle *handle = BackgroundWorkerHandle *handle =
@ -181,12 +184,12 @@ CitusBackgroundJobMain(Datum arg)
PushActiveSnapshot(GetTransactionSnapshot()); PushActiveSnapshot(GetTransactionSnapshot());
/* Update job status to indicate it is running */ /* Update job status to indicate it is running */
UpdateJobStatus(job->jobid, &pid, REBALANCE_JOB_STATUS_RUNNING, NULL, NULL); UpdateJobStatus(job->jobid, &pid, BACKGROUND_TASK_STATUS_RUNNING, NULL, NULL);
PopActiveSnapshot(); PopActiveSnapshot();
CommitTransactionCommand(); CommitTransactionCommand();
MemoryContextSwitchTo(perJobContext); MemoryContextSwitchTo(perTaskContext);
/* TODO keep polling the job */ /* TODO keep polling the job */
while (GetBackgroundWorkerPid(handle, &pid) != BGWH_STOPPED) while (GetBackgroundWorkerPid(handle, &pid) != BGWH_STOPPED)
@ -211,14 +214,14 @@ CitusBackgroundJobMain(Datum arg)
PushActiveSnapshot(GetTransactionSnapshot()); PushActiveSnapshot(GetTransactionSnapshot());
/* TODO job can actually also have failed*/ /* TODO job can actually also have failed*/
UpdateJobStatus(job->jobid, NULL, REBALANCE_JOB_STATUS_DONE, NULL, NULL); UpdateJobStatus(job->jobid, NULL, BACKGROUND_TASK_STATUS_DONE, NULL, NULL);
PopActiveSnapshot(); PopActiveSnapshot();
CommitTransactionCommand(); CommitTransactionCommand();
} }
MemoryContextSwitchTo(oldContextPerJob); MemoryContextSwitchTo(oldContextPerJob);
MemoryContextDelete(perJobContext); MemoryContextDelete(perTaskContext);
} }

View File

@ -730,8 +730,8 @@ CitusMaintenanceDaemonMain(Datum main_arg)
"Starting background worker for execution."))); "Starting background worker for execution.")));
rebalanceBgwHandle = rebalanceBgwHandle =
StartCitusBackgroundJobWorker(MyDatabaseId, StartCitusBackgroundTaskMonitorWorker(MyDatabaseId,
myDbData->userOid); myDbData->userOid);
if (!rebalanceBgwHandle || if (!rebalanceBgwHandle ||
GetBackgroundWorkerPid(rebalanceBgwHandle, &rebalanceWorkerPid) == GetBackgroundWorkerPid(rebalanceBgwHandle, &rebalanceWorkerPid) ==

View File

@ -5,11 +5,11 @@
#include "postmaster/bgworker.h" #include "postmaster/bgworker.h"
extern BackgroundWorkerHandle * StartCitusBackgroundJobWorker(Oid database, Oid extern BackgroundWorkerHandle * StartCitusBackgroundTaskMonitorWorker(Oid database, Oid
extensionOwner); extensionOwner);
extern void CitusBackgroundJobMain(Datum arg); extern void CitusBackgroundTaskMonitorMain(Datum arg);
extern void CitusBackgroundJobExecuter(Datum main_arg); extern void CitusBackgroundJobExecuter(Datum main_arg);
extern bool RebalanceJobDebugDelay; extern bool BackgroundTaskMonitorDebugDelay;
#endif /*CITUS_BACKGROUND_JOBS_H */ #endif /*CITUS_BACKGROUND_JOBS_H */

View File

@ -227,7 +227,7 @@ extern Oid DistPartitionRelationId(void);
extern Oid DistShardRelationId(void); extern Oid DistShardRelationId(void);
extern Oid DistPlacementRelationId(void); extern Oid DistPlacementRelationId(void);
extern Oid DistNodeRelationId(void); extern Oid DistNodeRelationId(void);
extern Oid DistRebalanceJobsRelationId(void); extern Oid DistBackgroundTasksRelationId(void);
extern Oid DistRebalanceStrategyRelationId(void); extern Oid DistRebalanceStrategyRelationId(void);
extern Oid DistLocalGroupIdRelationId(void); extern Oid DistLocalGroupIdRelationId(void);
extern Oid DistObjectRelationId(void); extern Oid DistObjectRelationId(void);
@ -237,11 +237,11 @@ extern Oid DistEnabledCustomAggregatesId(void);
extern Oid DistNodeNodeIdIndexId(void); extern Oid DistNodeNodeIdIndexId(void);
extern Oid DistPartitionLogicalRelidIndexId(void); extern Oid DistPartitionLogicalRelidIndexId(void);
extern Oid DistPartitionColocationidIndexId(void); extern Oid DistPartitionColocationidIndexId(void);
extern Oid DistRebalanceJobsJobsIdIndexId(void); extern Oid DistBackgroundTasksTaskIdIndexId(void);
extern Oid DistRebalanceJobsStatusJobsIdIndexId(void); extern Oid DistBackgroundTasksStatusTaskIdIndexId(void);
extern Oid DistRebalanceJobsDependRelationId(void); extern Oid DistBackgroundTaskssDependRelationId(void);
extern Oid DistRebalanceJobsDependJobIdIndexId(void); extern Oid DistBackgroundTasksDependTaskIdIndexId(void);
extern Oid DistRebalanceJobsDependDependsOnIndexId(void); extern Oid DistBackgroundTasksDependDependsOnIndexId(void);
extern Oid DistShardLogicalRelidIndexId(void); extern Oid DistShardLogicalRelidIndexId(void);
extern Oid DistShardShardidIndexId(void); extern Oid DistShardShardidIndexId(void);
extern Oid DistPlacementShardidIndexId(void); extern Oid DistPlacementShardidIndexId(void);
@ -274,11 +274,11 @@ extern Oid SecondaryNodeRoleId(void);
extern Oid CitusCopyFormatTypeId(void); extern Oid CitusCopyFormatTypeId(void);
extern Oid TextCopyFormatId(void); extern Oid TextCopyFormatId(void);
extern Oid BinaryCopyFormatId(void); extern Oid BinaryCopyFormatId(void);
extern Oid JobStatusScheduledId(void); extern Oid CitusTaskStatusScheduledId(void);
extern Oid JobStatusRunningId(void); extern Oid CitusTaskStatusRunningId(void);
extern Oid JobStatusDoneId(void); extern Oid CitusTaskStatusDoneId(void);
extern Oid JobStatusErrorId(void); extern Oid CitusTaskStatusErrorId(void);
extern Oid JobStatusUnscheduledId(void); extern Oid CitusTaskStatusSnscheduledId(void);
/* user related functions */ /* user related functions */
extern Oid CitusExtensionOwner(void); extern Oid CitusExtensionOwner(void);

View File

@ -24,7 +24,7 @@
#include "distributed/citus_nodes.h" #include "distributed/citus_nodes.h"
#include "distributed/connection_management.h" #include "distributed/connection_management.h"
#include "distributed/errormessage.h" #include "distributed/errormessage.h"
#include "distributed/pg_dist_rebalance_jobs.h" #include "distributed/pg_dist_background_tasks.h"
#include "distributed/relay_utility.h" #include "distributed/relay_utility.h"
#include "distributed/worker_manager.h" #include "distributed/worker_manager.h"
#include "utils/acl.h" #include "utils/acl.h"
@ -204,21 +204,21 @@ typedef enum SizeQueryType
TABLE_SIZE /* pg_table_size() */ TABLE_SIZE /* pg_table_size() */
} SizeQueryType; } SizeQueryType;
typedef enum RebalanceJobStatus typedef enum BackgroundTaskStatus
{ {
REBALANCE_JOB_STATUS_UNKNOWN, BACKGROUND_TASK_STATUS_UNKNOWN,
REBALANCE_JOB_STATUS_SCHEDULED, BACKGROUND_TASK_STATUS_SCHEDULED,
REBALANCE_JOB_STATUS_RUNNING, BACKGROUND_TASK_STATUS_RUNNING,
REBALANCE_JOB_STATUS_DONE, BACKGROUND_TASK_STATUS_DONE,
REBALANCE_JOB_STATUS_ERROR, BACKGROUND_TASK_STATUS_ERROR,
REBALANCE_JOB_STATUS_UNSCHEDULED BACKGROUND_TASK_STATUS_UNSCHEDULED
} RebalanceJobStatus; } BackgroundTaskStatus;
typedef struct RebalanceJob typedef struct RebalanceJob
{ {
int64 jobid; int64 jobid;
RebalanceJobStatus status; BackgroundTaskStatus status;
char *command; char *command;
} RebalanceJob; } RebalanceJob;
@ -331,17 +331,17 @@ extern void EnsureSequenceTypeSupported(Oid seqOid, Oid attributeTypeId, Oid
extern void AlterSequenceType(Oid seqOid, Oid typeOid); extern void AlterSequenceType(Oid seqOid, Oid typeOid);
extern void EnsureRelationHasCompatibleSequenceTypes(Oid relationId); extern void EnsureRelationHasCompatibleSequenceTypes(Oid relationId);
extern bool HasScheduledRebalanceJobs(void); extern bool HasScheduledRebalanceJobs(void);
extern int64 GetNextRebalanceJobId(void); extern int64 GetNextBackgroundTaskTaskId(void);
extern RebalanceJob * ScheduleBackgrounRebalanceJob(char *command, int dependingJobCount, extern RebalanceJob * ScheduleBackgrounRebalanceJob(char *command, int dependingJobCount,
int64 dependingJobIds[]); int64 dependingJobIds[]);
extern bool JobHasUmnetDependencies(int64 jobid); extern bool JobHasUmnetDependencies(int64 jobid);
extern RebalanceJob * GetRunableRebalanceJob(void); extern RebalanceJob * GetRunableRebalanceJob(void);
extern void ResetRunningJobs(void); extern void ResetRunningBackgroundTasks(void);
extern RebalanceJob * GetScheduledRebalanceJobByJobID(int64 jobId); extern RebalanceJob * GetScheduledRebalanceJobByJobID(int64 jobId);
extern void UpdateJobStatus(int64 jobid, pid_t *pid, RebalanceJobStatus status, extern void UpdateJobStatus(int64 jobid, pid_t *pid, BackgroundTaskStatus status,
int32 *retry_count, char *message); int32 *retry_count, char *message);
extern bool UpdateJobError(RebalanceJob *job, ErrorData *edata); extern bool UpdateJobError(RebalanceJob *job, ErrorData *edata);
extern void UnscheduleDependantJobs(int64 jobid); extern void UnscheduleDependantJobs(int64 jobid);
extern bool IsRebalanceJobStatusTerminal(RebalanceJobStatus status); extern bool IsRebalanceJobStatusTerminal(BackgroundTaskStatus status);
extern Oid RebalanceJobStatusOid(RebalanceJobStatus status); extern Oid RebalanceJobStatusOid(BackgroundTaskStatus status);
#endif /* METADATA_UTILITY_H */ #endif /* METADATA_UTILITY_H */

View File

@ -0,0 +1,20 @@
#ifndef CITUS_PG_DIST_BACKGROUND_TASKS_H
#define CITUS_PG_DIST_BACKGROUND_TASKS_H
/* ----------------
 * compiler constants for pg_dist_background_tasks
 *
 * Natts_* is the number of attributes (columns) of the catalog table and
 * each Anum_* is the 1-based attribute number of the column named in its
 * suffix. Code that fills values/isnull/replace arrays for a tuple indexes
 * them with (Anum_* - 1).
 *
 * These numbers have to stay in sync with the column order of the
 * CREATE TABLE statement for pg_dist_background_tasks in the extension's
 * SQL scripts.
 * ----------------
 */
#define Natts_pg_dist_background_tasks 6
#define Anum_pg_dist_background_tasks_task_id 1
#define Anum_pg_dist_background_tasks_pid 2
#define Anum_pg_dist_background_tasks_status 3
#define Anum_pg_dist_background_tasks_command 4
#define Anum_pg_dist_background_tasks_retry_count 5
#define Anum_pg_dist_background_tasks_message 6
/*
 * Schema-qualified name of the sequence used to generate task_id values.
 * NOTE(review): presumably the sequence implicitly created for the bigserial
 * task_id column -- confirm against the extension SQL.
 */
#define PG_DIST_BACKGROUND_TASK_TASK_ID_SEQUENCE_NAME \
"pg_catalog.pg_dist_background_tasks_task_id_seq"
#endif /* CITUS_PG_DIST_BACKGROUND_TASKS_H */

View File

@ -1,19 +0,0 @@
#ifndef CITUS_PG_DIST_REBALANCE_JOBS_H
#define CITUS_PG_DIST_REBALANCE_JOBS_H
/* ----------------
 * compiler constants for pg_dist_rebalance_jobs
 *
 * Natts_* is the number of attributes (columns) of the catalog table and
 * each Anum_* is the 1-based attribute number of the column named in its
 * suffix. Code that fills values/isnull/replace arrays for a tuple indexes
 * them with (Anum_* - 1).
 * ----------------
 */
#define Natts_pg_dist_rebalance_jobs 6
#define Anum_pg_dist_rebalance_jobs_jobid 1
#define Anum_pg_dist_rebalance_jobs_pid 2
#define Anum_pg_dist_rebalance_jobs_status 3
#define Anum_pg_dist_rebalance_jobs_command 4
#define Anum_pg_dist_rebalance_jobs_retry_count 5
#define Anum_pg_dist_rebalance_jobs_message 6
/*
 * Schema-qualified name of the sequence used to generate jobid values.
 * NOTE(review): presumably the sequence implicitly created for the bigserial
 * jobid column -- confirm against the extension SQL.
 */
#define REBALANCE_JOB_JOBID_SEQUENCE_NAME "pg_catalog.pg_dist_rebalance_jobs_jobid_seq"
#endif /* CITUS_PG_DIST_REBALANCE_JOBS_H */