reset job statuses on restart of background worker

background-job-details
Nils Dijk 2022-07-20 17:07:11 +02:00 committed by Jelte Fennema
parent 2fbcba6c2a
commit 6fa08619cf
5 changed files with 85 additions and 2 deletions

View File

@ -2396,6 +2396,57 @@ ScheduleBackgrounRebalanceJob(char *command)
}
void
ResetRunningJobs(void)
{
const int scanKeyCount = 1;
ScanKeyData scanKey[1];
const bool indexOK = true;
Relation pgDistRebalanceJobs = table_open(DistRebalanceJobsRelationId(),
AccessShareLock);
/* pg_dist_rebalance_jobs.status == 'running' */
ScanKeyInit(&scanKey[0], Anum_pg_dist_rebalance_jobs_status,
BTEqualStrategyNumber, F_OIDEQ, ObjectIdGetDatum(JobStatusRunningId()));
SysScanDesc scanDescriptor = systable_beginscan(pgDistRebalanceJobs,
DistRebalanceJobsStatusJobsIdIndexId(),
indexOK, NULL, scanKeyCount,
scanKey);
HeapTuple jobTuple = NULL;
while (HeapTupleIsValid(jobTuple = systable_getnext(scanDescriptor)))
{
Datum values[Natts_pg_dist_rebalance_jobs] = { 0 };
bool isnull[Natts_pg_dist_rebalance_jobs] = { 0 };
bool replace[Natts_pg_dist_rebalance_jobs] = { 0 };
TupleDesc tupleDescriptor = RelationGetDescr(pgDistRebalanceJobs);
heap_deform_tuple(jobTuple, tupleDescriptor, values, isnull);
values[Anum_pg_dist_rebalance_jobs_status - 1] =
ObjectIdGetDatum(JobStatusScheduledId());
isnull[Anum_pg_dist_rebalance_jobs_status - 1] = false;
replace[Anum_pg_dist_rebalance_jobs_status - 1] = true;
values[Anum_pg_dist_rebalance_jobs_pid - 1] = InvalidOid;
isnull[Anum_pg_dist_rebalance_jobs_pid - 1] = true;
replace[Anum_pg_dist_rebalance_jobs_pid - 1] = true;
jobTuple = heap_modify_tuple(jobTuple, tupleDescriptor, values, isnull, replace);
CatalogTupleUpdate(pgDistRebalanceJobs, &jobTuple->t_self, jobTuple);
}
CommandCounterIncrement();
systable_endscan(scanDescriptor);
table_close(pgDistRebalanceJobs, AccessShareLock);
}
RebalanceJob *
GetScheduledRebalanceJob(void)
{
@ -2407,7 +2458,6 @@ GetScheduledRebalanceJob(void)
AccessShareLock);
RebalanceJobStatus jobStatus[] = {
REBALANCE_JOB_STATUS_RUNNING,
REBALANCE_JOB_STATUS_SCHEDULED
};

View File

@ -1892,6 +1892,17 @@ RegisterCitusConfigVariables(void)
GUC_UNIT_MS,
NULL, NULL, NULL);
/* TODO remove before merge */
DefineCustomBoolVariable(
"citus.rebalance_job_debug_delay",
NULL,
NULL,
&RebalanceJobDebugDelay,
false,
PGC_SIGHUP,
GUC_UNIT_MS,
NULL, NULL, NULL);
DefineCustomIntVariable(
"citus.recover_2pc_interval",
gettext_noop("Sets the time to wait between recovering 2PCs."),

View File

@ -97,6 +97,7 @@ double DistributedDeadlockDetectionTimeoutFactor = 2.0;
int Recover2PCInterval = 60000;
int DeferShardDeleteInterval = 15000;
int RebalanceCheckInterval = 1000;

/*
 * Debug-only switch behind the citus.rebalance_job_debug_delay setting. When true,
 * the rebalance jobs background worker sleeps for 30 seconds before it starts
 * processing jobs. TODO remove before merge.
 */
bool RebalanceJobDebugDelay = false;
/* config variables for metadata sync timeout */
int MetadataSyncInterval = 60000;
@ -856,7 +857,10 @@ RebalanceJobsBackgroundWorkerMain(Datum arg)
ereport(LOG, (errmsg("background jobs runner")));
/* pg_usleep(30 * 1000 * 1000); */
if (RebalanceJobDebugDelay)
{
pg_usleep(30 * 1000 * 1000);
}
MemoryContext perJobContext = AllocSetContextCreateExtended(CurrentMemoryContext,
"PerJobContext",
@ -864,6 +868,22 @@ RebalanceJobsBackgroundWorkerMain(Datum arg)
ALLOCSET_DEFAULT_INITSIZE,
ALLOCSET_DEFAULT_MAXSIZE);
/*
 * First we find all jobs that are marked as running. Ideally we would check whether
 * they are actually still running; for now every such job is reset back to scheduled.
 */
{
StartTransactionCommand();
PushActiveSnapshot(GetTransactionSnapshot());
/* TODO have an actual function to check if the worker is still running */
ResetRunningJobs();
PopActiveSnapshot();
CommitTransactionCommand();
}
MemoryContext oldContextPerJob = MemoryContextSwitchTo(perJobContext);
bool hasJobs = true;
while (hasJobs)

View File

@ -333,6 +333,7 @@ extern bool HasScheduledRebalanceJobs(void);
extern int64 GetNextRebalanceJobId(void);
extern RebalanceJob * ScheduleBackgrounRebalanceJob(char *command);
extern RebalanceJob * GetScheduledRebalanceJob(void);

/* moves all jobs with status 'running' back to 'scheduled' and clears their pid */
extern void ResetRunningJobs(void);
extern RebalanceJob * GetScheduledRebalanceJobByJobID(int64 jobId);
extern void UpdateJobStatus(RebalanceJob *job, RebalanceJobStatus newStatus);
extern void UpdateJobError(RebalanceJob *job, ErrorData *edata);

View File

@ -14,6 +14,7 @@
/* GUC to configure deferred shard deletion */
extern int DeferShardDeleteInterval;
extern int RebalanceCheckInterval;

/* debug-only: delays the rebalance jobs background worker; TODO remove before merge */
extern bool RebalanceJobDebugDelay;
extern bool DeferShardDeleteOnMove;
extern double DesiredPercentFreeAfterMove;
extern bool CheckAvailableSpaceBeforeMove;