Refactor job cancellation logic in background job processing for improved clarity and maintainability

m3hm3t/issue_7896
Mehmet Yilmaz 2025-04-04 11:02:32 +00:00
parent 3c23f897e4
commit 85a1c730ce
1 changed files with 101 additions and 98 deletions

View File

@ -256,95 +256,99 @@ citus_task_wait(PG_FUNCTION_ARGS)
void void
citus_job_wait_internal(int64 jobid, BackgroundJobStatus *desiredStatus) citus_job_wait_internal(int64 jobid, BackgroundJobStatus *desiredStatus)
{ {
PG_TRY(); PG_TRY();
{ {
/* /*
* Since we are wait polling, we actually allocate memory on every poll. To avoid * Since we are wait polling, we actually allocate memory on every poll. To avoid
* putting unneeded pressure on memory, we create a context that we reset * putting unneeded pressure on memory, we create a context that we reset
* every iteration. * every iteration.
*/ */
MemoryContext waitContext = AllocSetContextCreate(CurrentMemoryContext, MemoryContext waitContext = AllocSetContextCreate(CurrentMemoryContext,
"JobsWaitContext", "JobsWaitContext",
ALLOCSET_DEFAULT_MINSIZE, ALLOCSET_DEFAULT_MINSIZE,
ALLOCSET_DEFAULT_INITSIZE, ALLOCSET_DEFAULT_INITSIZE,
ALLOCSET_DEFAULT_MAXSIZE); ALLOCSET_DEFAULT_MAXSIZE);
MemoryContext oldContext = MemoryContextSwitchTo(waitContext); MemoryContext oldContext = MemoryContextSwitchTo(waitContext);
while (true) while (true)
{ {
MemoryContextReset(waitContext); MemoryContextReset(waitContext);
BackgroundJob *job = GetBackgroundJobByJobId(jobid); BackgroundJob *job = GetBackgroundJobByJobId(jobid);
if (!job) if (!job)
{ {
ereport(ERROR, ereport(ERROR,
(errmsg("no job found for job with jobid: %ld", jobid))); (errmsg("no job found for job with jobid: %ld", jobid)));
} }
/* If we have a desiredStatus and we've reached it, we're done */ /* If we have a desiredStatus and we've reached it, we're done */
if (desiredStatus && job->state == *desiredStatus) if (desiredStatus && job->state == *desiredStatus)
{ {
break; break;
} }
/* If the job is in a terminal state (e.g. SUCCEEDED, FAILED, or CANCELED), /* If the job is in a terminal state (e.g. SUCCEEDED, FAILED, or CANCELED),
* but not the desired state, throw an error or stop waiting. * but not the desired state, throw an error or stop waiting.
*/ */
if (IsBackgroundJobStatusTerminal(job->state)) if (IsBackgroundJobStatusTerminal(job->state))
{ {
if (desiredStatus) if (desiredStatus)
{ {
Oid reachedStatusOid = BackgroundJobStatusOid(job->state); Oid reachedStatusOid = BackgroundJobStatusOid(job->state);
Datum reachedStatusNameDatum = DirectFunctionCall1(enum_out, reachedStatusOid); Datum reachedStatusNameDatum = DirectFunctionCall1(enum_out,
char *reachedStatusName = DatumGetCString(reachedStatusNameDatum); reachedStatusOid);
char *reachedStatusName = DatumGetCString(reachedStatusNameDatum);
Oid desiredStatusOid = BackgroundJobStatusOid(*desiredStatus); Oid desiredStatusOid = BackgroundJobStatusOid(*desiredStatus);
Datum desiredStatusNameDatum = DirectFunctionCall1(enum_out, desiredStatusOid); Datum desiredStatusNameDatum = DirectFunctionCall1(enum_out,
char *desiredStatusName = DatumGetCString(desiredStatusNameDatum); desiredStatusOid);
char *desiredStatusName = DatumGetCString(desiredStatusNameDatum);
ereport(ERROR, ereport(ERROR,
(errmsg("Job reached terminal state \"%s\" instead of desired " (errmsg(
"state \"%s\"", reachedStatusName, desiredStatusName))); "Job reached terminal state \"%s\" instead of desired "
} "state \"%s\"", reachedStatusName,
desiredStatusName)));
}
/* Otherwise, if no desiredStatus was given, we accept this terminal state. */ /* Otherwise, if no desiredStatus was given, we accept this terminal state. */
break; break;
} }
/* Before sleeping, check for user interrupts (Ctrl+C, statement_timeout, etc.) */ /* Before sleeping, check for user interrupts (Ctrl+C, statement_timeout, etc.) */
CHECK_FOR_INTERRUPTS(); CHECK_FOR_INTERRUPTS();
/* Sleep 1 second before re-checking the job status */ /* Sleep 1 second before re-checking the job status */
const long delay_ms = 1000; const long delay_ms = 1000;
(void) WaitLatch(MyLatch, (void) WaitLatch(MyLatch,
WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
delay_ms, delay_ms,
WAIT_EVENT_PG_SLEEP); WAIT_EVENT_PG_SLEEP);
ResetLatch(MyLatch); ResetLatch(MyLatch);
} }
MemoryContextSwitchTo(oldContext); MemoryContextSwitchTo(oldContext);
MemoryContextDelete(waitContext); MemoryContextDelete(waitContext);
} }
PG_CATCH(); PG_CATCH();
{ {
/* /*
* If we get here, the user canceled the statement or an ERROR occurred. * If we get here, the user canceled the statement or an ERROR occurred.
* We forcibly cancel the job so that it doesn't remain running in background. * We forcibly cancel the job so that it doesn't remain running in background.
* This ensures no "zombie" shard moves or leftover replication slots. * This ensures no "zombie" shard moves or leftover replication slots.
*/ */
/* Switch out of the waitContext so we can safely do cleanup in TopMemoryContext. */ /* Switch out of the waitContext so we can safely do cleanup in TopMemoryContext. */
MemoryContextSwitchTo(TopMemoryContext); MemoryContextSwitchTo(TopMemoryContext);
/* Attempt to cancel the job; if it's already in a terminal state, that's okay. */ /* Attempt to cancel the job; if it's already in a terminal state, that's okay. */
citus_cancel_job(jobid); citus_cancel_job(jobid);
/* Re-throw the original error so Postgres knows this statement was canceled. */ /* Re-throw the original error so Postgres knows this statement was canceled. */
PG_RE_THROW(); PG_RE_THROW();
} }
PG_END_TRY(); PG_END_TRY();
} }
@ -356,36 +360,35 @@ citus_job_wait_internal(int64 jobid, BackgroundJobStatus *desiredStatus)
bool bool
citus_cancel_job(int64 jobId) citus_cancel_job(int64 jobId)
{ {
BackgroundJob *job = GetBackgroundJobByJobId(jobId); BackgroundJob *job = GetBackgroundJobByJobId(jobId);
if (!job) if (!job)
{ {
/* No such job ID */ /* No such job ID */
return false; return false;
} }
/* /*
* If the job is already in a terminal state, or is scheduled, * If the job is already in a terminal state, or is scheduled,
* decide if you want to do anything special. * decide if you want to do anything special.
* But typically you just check if it is still "running" or "cancelling". * But typically you just check if it is still "running" or "cancelling".
*/ */
if (IsBackgroundJobStatusTerminal(job->state)) if (IsBackgroundJobStatusTerminal(job->state))
{ {
return false; return false;
} }
/* Mark job as canceled, then update the catalog */ /* Mark job as canceled, then update the catalog */
job->state = BACKGROUND_JOB_STATUS_CANCELLED; job->state = BACKGROUND_JOB_STATUS_CANCELLED;
/* This projects the tasks states into the job's new state, /* This projects the tasks states into the job's new state,
* and updates the row in pg_dist_background_job. * and updates the row in pg_dist_background_job.
*/ */
UpdateBackgroundJob(job->jobid); UpdateBackgroundJob(job->jobid);
return true; return true;
} }
/* /*
* citus_task_wait_internal implements the waiting on a task for reuse in other areas where * citus_task_wait_internal implements the waiting on a task for reuse in other areas where
* we want to wait on tasks. * we want to wait on tasks.