Add running state to rebalance job with pid reported

background-job-details
Nils Dijk 2022-07-18 17:11:48 +02:00 committed by Jelte Fennema
parent 3cf14ee816
commit 836950ff07
8 changed files with 137 additions and 44 deletions

View File

@ -145,6 +145,7 @@ typedef struct MetadataCacheData
Oid distRebalanceJobsJobsIndexId;
Oid distRebalanceJobsStatusJobsIndexId;
Oid jobStatusScheduledId;
Oid jobStatusRunningId;
Oid jobStatusDoneId;
Oid jobStatusErrorId;
Oid distRebalanceStrategyRelationId;
@ -3135,6 +3136,19 @@ JobStatusScheduledId(void)
}
/*
 * JobStatusRunningId returns the value LookupStringEnumValueId resolves for the
 * 'running' label of citus_job_status. The result is kept in
 * MetadataCache.jobStatusRunningId, so the lookup is only performed while that
 * field is still zero.
 */
Oid
JobStatusRunningId(void)
{
	/* fast path: a non-zero cached value is handed back as is */
	if (MetadataCache.jobStatusRunningId)
	{
		return MetadataCache.jobStatusRunningId;
	}

	/* nothing cached yet, resolve the enum label and remember the result */
	MetadataCache.jobStatusRunningId = LookupStringEnumValueId("citus_job_status",
															   "running");

	return MetadataCache.jobStatusRunningId;
}
Oid
JobStatusDoneId(void)
{

View File

@ -2230,23 +2230,35 @@ HasScheduledRebalanceJobs()
Relation pgDistRebalanceJobs = table_open(DistRebalanceJobsRelationId(),
AccessShareLock);
/* pg_dist_rebalance_jobs.status == 'scheduled' */
ScanKeyInit(&scanKey[0], Anum_pg_dist_rebalance_jobs_status,
BTEqualStrategyNumber, F_OIDEQ, ObjectIdGetDatum(JobStatusScheduledId()));
SysScanDesc scanDescriptor = systable_beginscan(pgDistRebalanceJobs,
DistRebalanceJobsStatusJobsIdIndexId(),
indexOK, NULL, scanKeyCount, scanKey);
HeapTuple jobTuple = systable_getnext(scanDescriptor);
/* find any job in states listed here */
RebalanceJobStatus jobs[] = {
REBALANCE_JOB_STATUS_RUNNING,
REBALANCE_JOB_STATUS_SCHEDULED
};
bool hasScheduledJob = false;
if (HeapTupleIsValid(jobTuple))
for (int i = 0; !hasScheduledJob && i < sizeof(jobs) / sizeof(jobs[0]); i++)
{
hasScheduledJob = true;
/* pg_dist_rebalance_jobs.status == jobs[i] */
ScanKeyInit(&scanKey[0], Anum_pg_dist_rebalance_jobs_status,
BTEqualStrategyNumber, F_OIDEQ,
ObjectIdGetDatum(RebalanceJobStatusOid(jobs[i])));
SysScanDesc scanDescriptor = systable_beginscan(
pgDistRebalanceJobs,
DistRebalanceJobsStatusJobsIdIndexId(),
indexOK, NULL, scanKeyCount,
scanKey);
HeapTuple jobTuple = systable_getnext(scanDescriptor);
if (HeapTupleIsValid(jobTuple))
{
hasScheduledJob = true;
}
systable_endscan(scanDescriptor);
}
systable_endscan(scanDescriptor);
table_close(pgDistRebalanceJobs, AccessShareLock);
return hasScheduledJob;
@ -2264,6 +2276,10 @@ RebalanceJobStatusByOid(Oid enumOid)
{
return REBALANCE_JOB_STATUS_SCHEDULED;
}
else if (enumOid == JobStatusRunningId())
{
return REBALANCE_JOB_STATUS_RUNNING;
}
else if (enumOid == JobStatusErrorId())
{
return REBALANCE_JOB_STATUS_ERROR;
@ -2292,7 +2308,7 @@ IsRebalanceJobStatusTerminal(RebalanceJobStatus status)
}
static Oid
Oid
RebalanceJobStatusOid(RebalanceJobStatus status)
{
switch (status)
@ -2302,6 +2318,11 @@ RebalanceJobStatusOid(RebalanceJobStatus status)
return JobStatusScheduledId();
}
case REBALANCE_JOB_STATUS_RUNNING:
{
return JobStatusRunningId();
}
case REBALANCE_JOB_STATUS_DONE:
{
return JobStatusDoneId();
@ -2385,36 +2406,47 @@ GetScheduledRebalanceJob(void)
Relation pgDistRebalanceJobs = table_open(DistRebalanceJobsRelationId(),
AccessShareLock);
/* pg_dist_rebalance_jobs.status == 'scheduled' */
ScanKeyInit(&scanKey[0], Anum_pg_dist_rebalance_jobs_status,
BTEqualStrategyNumber, F_OIDEQ, ObjectIdGetDatum(JobStatusScheduledId()));
RebalanceJobStatus jobStatus[] = {
REBALANCE_JOB_STATUS_RUNNING,
REBALANCE_JOB_STATUS_SCHEDULED
};
SysScanDesc scanDescriptor = systable_beginscan(pgDistRebalanceJobs,
DistRebalanceJobsStatusJobsIdIndexId(),
indexOK, NULL, scanKeyCount, scanKey);
HeapTuple jobTuple = systable_getnext(scanDescriptor);
RebalanceJob *job = NULL;
if (HeapTupleIsValid(jobTuple))
for (int i = 0; !job && i < sizeof(jobStatus) / sizeof(jobStatus[0]); i++)
{
Form_pg_dist_rebalance_job jobData = NULL;
jobData = (Form_pg_dist_rebalance_job) GETSTRUCT(jobTuple);
/* pg_dist_rebalance_jobs.status == jobStatus[i] */
ScanKeyInit(&scanKey[0], Anum_pg_dist_rebalance_jobs_status,
BTEqualStrategyNumber, F_OIDEQ, ObjectIdGetDatum(
RebalanceJobStatusOid(jobStatus[i])));
job = palloc0(sizeof(RebalanceJob));
job->jobid = jobData->jobid;
job->status = RebalanceJobStatusByOid(jobData->status);
SysScanDesc scanDescriptor = systable_beginscan(pgDistRebalanceJobs,
DistRebalanceJobsStatusJobsIdIndexId(),
indexOK, NULL, scanKeyCount,
scanKey);
/* TODO parse the actual job */
Datum datumArray[Natts_pg_dist_rebalance_jobs];
bool isNullArray[Natts_pg_dist_rebalance_jobs];
TupleDesc tupleDescriptor = RelationGetDescr(pgDistRebalanceJobs);
heap_deform_tuple(jobTuple, tupleDescriptor, datumArray, isNullArray);
HeapTuple jobTuple = systable_getnext(scanDescriptor);
if (HeapTupleIsValid(jobTuple))
{
Form_pg_dist_rebalance_job jobData = NULL;
jobData = (Form_pg_dist_rebalance_job) GETSTRUCT(jobTuple);
job->command = text_to_cstring(
DatumGetTextP(datumArray[Anum_pg_dist_rebalance_jobs_command - 1]));
job = palloc0(sizeof(RebalanceJob));
job->jobid = jobData->jobid;
job->status = RebalanceJobStatusByOid(jobData->status);
/* TODO parse the actual job */
Datum datumArray[Natts_pg_dist_rebalance_jobs];
bool isNullArray[Natts_pg_dist_rebalance_jobs];
TupleDesc tupleDescriptor = RelationGetDescr(pgDistRebalanceJobs);
heap_deform_tuple(jobTuple, tupleDescriptor, datumArray, isNullArray);
job->command = text_to_cstring(
DatumGetTextP(datumArray[Anum_pg_dist_rebalance_jobs_command - 1]));
}
systable_endscan(scanDescriptor);
}
systable_endscan(scanDescriptor);
table_close(pgDistRebalanceJobs, AccessShareLock);
return job;
@ -2422,7 +2454,7 @@ GetScheduledRebalanceJob(void)
RebalanceJob *
GetScheduledRebalanceJobyJobID(int64 jobId)
GetScheduledRebalanceJobByJobID(int64 jobId)
{
const int scanKeyCount = 1;
ScanKeyData scanKey[1];
@ -2503,6 +2535,21 @@ UpdateJobStatus(RebalanceJob *job, RebalanceJobStatus newStatus)
isnull[Anum_pg_dist_rebalance_jobs_status - 1] = false;
replace[Anum_pg_dist_rebalance_jobs_status - 1] = true;
/* TODO figure out a nice way on how to update a tuple selectively */
if (newStatus == REBALANCE_JOB_STATUS_RUNNING)
{
/* update pid for running status */
values[Anum_pg_dist_rebalance_jobs_pid - 1] = Int32GetDatum((int32) MyProcPid);
isnull[Anum_pg_dist_rebalance_jobs_pid - 1] = false;
replace[Anum_pg_dist_rebalance_jobs_pid - 1] = true;
}
else
{
values[Anum_pg_dist_rebalance_jobs_pid - 1] = 0;
isnull[Anum_pg_dist_rebalance_jobs_pid - 1] = true;
replace[Anum_pg_dist_rebalance_jobs_pid - 1] = true;
}
heapTuple = heap_modify_tuple(heapTuple, tupleDescriptor, values, isnull, replace);
CatalogTupleUpdate(pgDistRebalanceJobs, &heapTuple->t_self, heapTuple);

View File

@ -77,11 +77,12 @@ DROP FUNCTION pg_catalog.get_all_active_transactions(OUT datid oid, OUT process_
DROP FUNCTION pg_catalog.isolate_tenant_to_new_shard(table_name regclass, tenant_id "any", cascade_option text);
#include "udfs/isolate_tenant_to_new_shard/11.1-1.sql"
CREATE TYPE citus.citus_job_status AS ENUM ('scheduled', 'done', 'error');
CREATE TYPE citus.citus_job_status AS ENUM ('scheduled', 'running', 'done', 'error');
ALTER TYPE citus.citus_job_status SET SCHEMA pg_catalog;
CREATE TABLE citus.pg_dist_rebalance_jobs(
jobid bigserial NOT NULL,
pid integer,
status pg_catalog.citus_job_status default 'scheduled' NOT NULL,
command text NOT NULL,
retry_count integer,

View File

@ -858,9 +858,18 @@ RebalanceJobsBackgroundWorkerMain(Datum arg)
/* pg_usleep(30 * 1000 * 1000); */
MemoryContext perJobContext = AllocSetContextCreateExtended(CurrentMemoryContext,
"PerJobContext",
ALLOCSET_DEFAULT_MINSIZE,
ALLOCSET_DEFAULT_INITSIZE,
ALLOCSET_DEFAULT_MAXSIZE);
MemoryContext oldContextPerJob = MemoryContextSwitchTo(perJobContext);
bool hasJobs = true;
while (hasJobs)
{
MemoryContextReset(perJobContext);
CHECK_FOR_INTERRUPTS();
InvalidateMetadataSystemCache();
@ -875,11 +884,26 @@ RebalanceJobsBackgroundWorkerMain(Datum arg)
}
else if (CheckCitusVersion(DEBUG1) && CitusHasBeenLoaded())
{
/*
* We need to load the job into the perJobContext as we will switch contexts
* later due to the committing and starting of new transactions
*/
MemoryContext oldContext = MemoryContextSwitchTo(perJobContext);
RebalanceJob *job = GetScheduledRebalanceJob();
MemoryContextSwitchTo(oldContext);
if (job)
{
ereport(LOG, (errmsg("found job with jobid: %ld", job->jobid)));
MemoryContext savedContext = CurrentMemoryContext;
UpdateJobStatus(job, REBALANCE_JOB_STATUS_RUNNING);
PopActiveSnapshot();
CommitTransactionCommand();
StartTransactionCommand();
PushActiveSnapshot(GetTransactionSnapshot());
BeginInternalSubTransaction(NULL);
PG_TRY();
@ -919,6 +943,8 @@ RebalanceJobsBackgroundWorkerMain(Datum arg)
CommitTransactionCommand();
ProcessCompletedNotifies();
}
MemoryContextSwitchTo(oldContextPerJob);
MemoryContextDelete(perJobContext);
}

View File

@ -25,7 +25,7 @@ citus_wait_for_rebalance_job(PG_FUNCTION_ARGS)
{
CHECK_FOR_INTERRUPTS();
RebalanceJob *job = GetScheduledRebalanceJobyJobID(jobid);
RebalanceJob *job = GetScheduledRebalanceJobByJobID(jobid);
if (!job)
{
ereport(ERROR, (errmsg("unkown job with jobid: %ld", jobid)));

View File

@ -272,6 +272,7 @@ extern Oid CitusCopyFormatTypeId(void);
extern Oid TextCopyFormatId(void);
extern Oid BinaryCopyFormatId(void);
extern Oid JobStatusScheduledId(void);
extern Oid JobStatusRunningId(void);
extern Oid JobStatusDoneId(void);
extern Oid JobStatusErrorId(void);

View File

@ -208,6 +208,7 @@ typedef enum RebalanceJobStatus
{
/*
 * NOTE(review): no citus_job_status label is mapped to this value in the
 * visible code; presumably the fallback for an unrecognized status oid - confirm.
 */
REBALANCE_JOB_STATUS_UNKNOWN,
/* presumably the counterpart of the 'scheduled' label (JobStatusScheduledId) - confirm */
REBALANCE_JOB_STATUS_SCHEDULED,
/* counterpart of JobStatusRunningId(), i.e. the 'running' label */
REBALANCE_JOB_STATUS_RUNNING,
/* counterpart of JobStatusDoneId() */
REBALANCE_JOB_STATUS_DONE,
/* counterpart of JobStatusErrorId() */
REBALANCE_JOB_STATUS_ERROR
} RebalanceJobStatus;
@ -332,8 +333,9 @@ extern bool HasScheduledRebalanceJobs(void);
extern int64 GetNextRebalanceJobId(void);
extern RebalanceJob * ScheduleBackgrounRebalanceJob(char *command);
extern RebalanceJob * GetScheduledRebalanceJob(void);
extern RebalanceJob * GetScheduledRebalanceJobyJobID(int64 jobId);
extern RebalanceJob * GetScheduledRebalanceJobByJobID(int64 jobId);
extern void UpdateJobStatus(RebalanceJob *job, RebalanceJobStatus newStatus);
extern void UpdateJobError(RebalanceJob *job, ErrorData *edata);
extern bool IsRebalanceJobStatusTerminal(RebalanceJobStatus status);
extern Oid RebalanceJobStatusOid(RebalanceJobStatus status);
#endif /* METADATA_UTILITY_H */

View File

@ -9,6 +9,7 @@
typedef struct FormData_pg_dist_rebalance_job
{
int64 jobid;
int32 pid;
Oid status;
#ifdef CATALOG_VARLEN /* variable-length fields start here */
text command;
@ -28,12 +29,13 @@ typedef FormData_pg_dist_rebalance_job *Form_pg_dist_rebalance_job;
* compiler constants for pg_dist_rebalance_jobs
* ----------------
*/
#define Natts_pg_dist_rebalance_jobs 5
#define Natts_pg_dist_rebalance_jobs 6
#define Anum_pg_dist_rebalance_jobs_jobid 1
#define Anum_pg_dist_rebalance_jobs_status 2
#define Anum_pg_dist_rebalance_jobs_command 3
#define Anum_pg_dist_rebalance_jobs_retry_count 4
#define Anum_pg_dist_rebalance_jobs_message 5
#define Anum_pg_dist_rebalance_jobs_pid 2
#define Anum_pg_dist_rebalance_jobs_status 3
#define Anum_pg_dist_rebalance_jobs_command 4
#define Anum_pg_dist_rebalance_jobs_retry_count 5
#define Anum_pg_dist_rebalance_jobs_message 6
#define REBALANCE_JOB_JOBID_SEQUENCE_NAME "pg_catalog.pg_dist_rebalance_jobs_jobid_seq"