Implement job cancellation mechanism in background job processing

m3hm3t/issue_7896
Mehmet Yilmaz 2025-04-02 13:18:22 +00:00
parent a7e686c106
commit 3c23f897e4
1 changed files with 123 additions and 63 deletions

View File

@ -117,6 +117,7 @@ static void QueueMonitorSigTermHandler(SIGNAL_ARGS);
static void QueueMonitorSigIntHandler(SIGNAL_ARGS); static void QueueMonitorSigIntHandler(SIGNAL_ARGS);
static void QueueMonitorSigHupHandler(SIGNAL_ARGS); static void QueueMonitorSigHupHandler(SIGNAL_ARGS);
static void DecrementParallelTaskCountForNodesInvolved(BackgroundTask *task); static void DecrementParallelTaskCountForNodesInvolved(BackgroundTask *task);
static bool citus_cancel_job(int64 jobid);
/* flags set by signal handlers */ /* flags set by signal handlers */
static volatile sig_atomic_t GotSigterm = false; static volatile sig_atomic_t GotSigterm = false;
@ -244,88 +245,147 @@ citus_task_wait(PG_FUNCTION_ARGS)
/* /*
* citus_job_wait_internal implements the waiting on a job for reuse in other areas where * citus_job_wait_internal implements the waiting on a job, e.g. for the background
* we want to wait on jobs. eg the background rebalancer. * rebalancer. If desiredStatus is provided, we throw an error if we reach a
* different terminal state that can never transition to the desired state.
* *
* When a desiredStatus is provided it will provide an error when a different state is * With the PG_TRY/PG_CATCH block, if the user cancels this SQL statement
* reached and the state cannot ever reach the desired state anymore. * (Ctrl+C, statement_timeout, etc.), we will cancel the job in progress
* so it doesn't remain running in background.
*/ */
void void
citus_job_wait_internal(int64 jobid, BackgroundJobStatus *desiredStatus) citus_job_wait_internal(int64 jobid, BackgroundJobStatus *desiredStatus)
{ {
/* PG_TRY();
* Since we are wait polling we will actually allocate memory on every poll. To make {
* sure we don't put unneeded pressure on the memory we create a context that we clear /*
* every iteration. * Since we are wait polling, we actually allocate memory on every poll. To avoid
*/ * putting unneeded pressure on memory, we create a context that we reset
MemoryContext waitContext = AllocSetContextCreate(CurrentMemoryContext, * every iteration.
"JobsWaitContext", */
ALLOCSET_DEFAULT_MINSIZE, MemoryContext waitContext = AllocSetContextCreate(CurrentMemoryContext,
ALLOCSET_DEFAULT_INITSIZE, "JobsWaitContext",
ALLOCSET_DEFAULT_MAXSIZE); ALLOCSET_DEFAULT_MINSIZE,
MemoryContext oldContext = MemoryContextSwitchTo(waitContext); ALLOCSET_DEFAULT_INITSIZE,
ALLOCSET_DEFAULT_MAXSIZE);
MemoryContext oldContext = MemoryContextSwitchTo(waitContext);
while (true) while (true)
{ {
MemoryContextReset(waitContext); MemoryContextReset(waitContext);
BackgroundJob *job = GetBackgroundJobByJobId(jobid); BackgroundJob *job = GetBackgroundJobByJobId(jobid);
if (!job) if (!job)
{ {
ereport(ERROR, (errmsg("no job found for job with jobid: %ld", jobid))); ereport(ERROR,
} (errmsg("no job found for job with jobid: %ld", jobid)));
}
if (desiredStatus && job->state == *desiredStatus) /* If we have a desiredStatus and we've reached it, we're done */
{ if (desiredStatus && job->state == *desiredStatus)
/* job has reached its desired status, done waiting */ {
break; break;
} }
if (IsBackgroundJobStatusTerminal(job->state)) /* If the job is in a terminal state (e.g. SUCCEEDED, FAILED, or CANCELED),
{ * but not the desired state, throw an error or stop waiting.
if (desiredStatus) */
{ if (IsBackgroundJobStatusTerminal(job->state))
/* {
* We have reached a terminal state, which is not the desired state we if (desiredStatus)
* were waiting for, otherwise we would have escaped earlier. Since it is {
* a terminal state we know that we can never reach the desired state. Oid reachedStatusOid = BackgroundJobStatusOid(job->state);
*/ Datum reachedStatusNameDatum = DirectFunctionCall1(enum_out, reachedStatusOid);
char *reachedStatusName = DatumGetCString(reachedStatusNameDatum);
Oid reachedStatusOid = BackgroundJobStatusOid(job->state); Oid desiredStatusOid = BackgroundJobStatusOid(*desiredStatus);
Datum reachedStatusNameDatum = DirectFunctionCall1(enum_out, Datum desiredStatusNameDatum = DirectFunctionCall1(enum_out, desiredStatusOid);
reachedStatusOid); char *desiredStatusName = DatumGetCString(desiredStatusNameDatum);
char *reachedStatusName = DatumGetCString(reachedStatusNameDatum);
Oid desiredStatusOid = BackgroundJobStatusOid(*desiredStatus); ereport(ERROR,
Datum desiredStatusNameDatum = DirectFunctionCall1(enum_out, (errmsg("Job reached terminal state \"%s\" instead of desired "
desiredStatusOid); "state \"%s\"", reachedStatusName, desiredStatusName)));
char *desiredStatusName = DatumGetCString(desiredStatusNameDatum); }
ereport(ERROR, /* Otherwise, if no desiredStatus was given, we accept this terminal state. */
(errmsg("Job reached terminal state \"%s\" instead of desired " break;
"state \"%s\"", reachedStatusName, desiredStatusName))); }
}
/* job has reached its terminal state, done waiting */ /* Before sleeping, check for user interrupts (Ctrl+C, statement_timeout, etc.) */
break; CHECK_FOR_INTERRUPTS();
}
/* sleep for a while, before rechecking the job status */ /* Sleep 1 second before re-checking the job status */
CHECK_FOR_INTERRUPTS(); const long delay_ms = 1000;
const long delay_ms = 1000; (void) WaitLatch(MyLatch,
(void) WaitLatch(MyLatch, WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, delay_ms,
delay_ms, WAIT_EVENT_PG_SLEEP);
WAIT_EVENT_PG_SLEEP);
ResetLatch(MyLatch); ResetLatch(MyLatch);
} }
MemoryContextSwitchTo(oldContext); MemoryContextSwitchTo(oldContext);
MemoryContextDelete(waitContext); MemoryContextDelete(waitContext);
}
PG_CATCH();
{
/*
* If we get here, the user canceled the statement or an ERROR occurred.
* We forcibly cancel the job so that it doesn't remain running in background.
* This ensures no "zombie" shard moves or leftover replication slots.
*/
/* Switch out of the waitContext so we can safely do cleanup in TopMemoryContext. */
MemoryContextSwitchTo(TopMemoryContext);
/* Attempt to cancel the job; if it's already in a terminal state, that's okay. */
citus_cancel_job(jobid);
/* Re-throw the original error so Postgres knows this statement was canceled. */
PG_RE_THROW();
}
PG_END_TRY();
} }
/*
* citus_cancel_job - forcibly cancels a background job by setting its status
* to BACKGROUND_JOB_STATUS_CANCELLED in memory, then updates the
* pg_dist_background_job table.
*/
bool
citus_cancel_job(int64 jobId)
{
BackgroundJob *job = GetBackgroundJobByJobId(jobId);
if (!job)
{
/* No such job ID */
return false;
}
/*
* If the job is already in a terminal state, or is scheduled,
* decide if you want to do anything special.
* But typically you just check if it is still "running" or "cancelling".
*/
if (IsBackgroundJobStatusTerminal(job->state))
{
return false;
}
/* Mark job as canceled, then update the catalog */
job->state = BACKGROUND_JOB_STATUS_CANCELLED;
/* This projects the tasks states into the job's new state,
* and updates the row in pg_dist_background_job.
*/
UpdateBackgroundJob(job->jobid);
return true;
}
/* /*
* citus_task_wait_internal implements the waiting on a task for reuse in other areas where * citus_task_wait_internal implements the waiting on a task for reuse in other areas where
* we want to wait on tasks. * we want to wait on tasks.