From 3c23f897e46cdaf82121687368f9bf5f10b6d956 Mon Sep 17 00:00:00 2001 From: Mehmet Yilmaz Date: Wed, 2 Apr 2025 13:18:22 +0000 Subject: [PATCH] Implement job cancellation mechanism in background job processing --- .../distributed/utils/background_jobs.c | 186 ++++++++++++------ 1 file changed, 123 insertions(+), 63 deletions(-) diff --git a/src/backend/distributed/utils/background_jobs.c b/src/backend/distributed/utils/background_jobs.c index 911880dc7..ba17e68a6 100644 --- a/src/backend/distributed/utils/background_jobs.c +++ b/src/backend/distributed/utils/background_jobs.c @@ -117,6 +117,7 @@ static void QueueMonitorSigTermHandler(SIGNAL_ARGS); static void QueueMonitorSigIntHandler(SIGNAL_ARGS); static void QueueMonitorSigHupHandler(SIGNAL_ARGS); static void DecrementParallelTaskCountForNodesInvolved(BackgroundTask *task); +static bool citus_cancel_job(int64 jobid); /* flags set by signal handlers */ static volatile sig_atomic_t GotSigterm = false; @@ -244,88 +245,147 @@ citus_task_wait(PG_FUNCTION_ARGS) /* - * citus_job_wait_internal implements the waiting on a job for reuse in other areas where - * we want to wait on jobs. eg the background rebalancer. + * citus_job_wait_internal implements the waiting on a job, e.g. for the background + * rebalancer. If desiredStatus is provided, we throw an error if we reach a + * different terminal state that can never transition to the desired state. * - * When a desiredStatus is provided it will provide an error when a different state is - * reached and the state cannot ever reach the desired state anymore. + * With the PG_TRY/PG_CATCH block, if the user cancels this SQL statement + * (Ctrl+C, statement_timeout, etc.), we will cancel the job in progress + * so it doesn't remain running in background. */ void citus_job_wait_internal(int64 jobid, BackgroundJobStatus *desiredStatus) { - /* - * Since we are wait polling we will actually allocate memory on every poll. To make - * sure we don't put unneeded pressure on the memory we create a context that we clear - * every iteration. - */ - MemoryContext waitContext = AllocSetContextCreate(CurrentMemoryContext, - "JobsWaitContext", - ALLOCSET_DEFAULT_MINSIZE, - ALLOCSET_DEFAULT_INITSIZE, - ALLOCSET_DEFAULT_MAXSIZE); - MemoryContext oldContext = MemoryContextSwitchTo(waitContext); + PG_TRY(); + { + /* + * Since we are wait polling, we actually allocate memory on every poll. To avoid + * putting unneeded pressure on memory, we create a context that we reset + * every iteration. + */ + MemoryContext waitContext = AllocSetContextCreate(CurrentMemoryContext, + "JobsWaitContext", + ALLOCSET_DEFAULT_MINSIZE, + ALLOCSET_DEFAULT_INITSIZE, + ALLOCSET_DEFAULT_MAXSIZE); + MemoryContext oldContext = MemoryContextSwitchTo(waitContext); - while (true) - { - MemoryContextReset(waitContext); + while (true) + { + MemoryContextReset(waitContext); - BackgroundJob *job = GetBackgroundJobByJobId(jobid); - if (!job) - { - ereport(ERROR, (errmsg("no job found for job with jobid: %ld", jobid))); - } + BackgroundJob *job = GetBackgroundJobByJobId(jobid); + if (!job) + { + ereport(ERROR, + (errmsg("no job found for job with jobid: %ld", jobid))); + } - if (desiredStatus && job->state == *desiredStatus) - { - /* job has reached its desired status, done waiting */ - break; - } + /* If we have a desiredStatus and we've reached it, we're done */ + if (desiredStatus && job->state == *desiredStatus) + { + break; + } - if (IsBackgroundJobStatusTerminal(job->state)) - { - if (desiredStatus) - { - /* - * We have reached a terminal state, which is not the desired state we - * were waiting for, otherwise we would have escaped earlier. Since it is - * a terminal state we know that we can never reach the desired state. - */ + /* If the job is in a terminal state (e.g. SUCCEEDED, FAILED, or CANCELED), + * but not the desired state, throw an error or stop waiting. + */ + if (IsBackgroundJobStatusTerminal(job->state)) + { + if (desiredStatus) + { + Oid reachedStatusOid = BackgroundJobStatusOid(job->state); + Datum reachedStatusNameDatum = DirectFunctionCall1(enum_out, reachedStatusOid); + char *reachedStatusName = DatumGetCString(reachedStatusNameDatum); - Oid reachedStatusOid = BackgroundJobStatusOid(job->state); - Datum reachedStatusNameDatum = DirectFunctionCall1(enum_out, - reachedStatusOid); - char *reachedStatusName = DatumGetCString(reachedStatusNameDatum); + Oid desiredStatusOid = BackgroundJobStatusOid(*desiredStatus); + Datum desiredStatusNameDatum = DirectFunctionCall1(enum_out, desiredStatusOid); + char *desiredStatusName = DatumGetCString(desiredStatusNameDatum); - Oid desiredStatusOid = BackgroundJobStatusOid(*desiredStatus); - Datum desiredStatusNameDatum = DirectFunctionCall1(enum_out, - desiredStatusOid); - char *desiredStatusName = DatumGetCString(desiredStatusNameDatum); + ereport(ERROR, + (errmsg("Job reached terminal state \"%s\" instead of desired " + "state \"%s\"", reachedStatusName, desiredStatusName))); + } - ereport(ERROR, - (errmsg("Job reached terminal state \"%s\" instead of desired " - "state \"%s\"", reachedStatusName, desiredStatusName))); - } + /* Otherwise, if no desiredStatus was given, we accept this terminal state. */ + break; + } - /* job has reached its terminal state, done waiting */ - break; - } + /* Before sleeping, check for user interrupts (Ctrl+C, statement_timeout, etc.) */ + CHECK_FOR_INTERRUPTS(); - /* sleep for a while, before rechecking the job status */ - CHECK_FOR_INTERRUPTS(); - const long delay_ms = 1000; - (void) WaitLatch(MyLatch, - WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, - delay_ms, - WAIT_EVENT_PG_SLEEP); + /* Sleep 1 second before re-checking the job status */ + const long delay_ms = 1000; + (void) WaitLatch(MyLatch, + WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, + delay_ms, + WAIT_EVENT_PG_SLEEP); - ResetLatch(MyLatch); - } + ResetLatch(MyLatch); + } - MemoryContextSwitchTo(oldContext); - MemoryContextDelete(waitContext); + MemoryContextSwitchTo(oldContext); + MemoryContextDelete(waitContext); + } + PG_CATCH(); + { + /* + * If we get here, the user canceled the statement or an ERROR occurred. + * We forcibly cancel the job so that it doesn't remain running in background. + * This ensures no "zombie" shard moves or leftover replication slots. + */ + + /* Switch out of the waitContext so we can safely do cleanup in TopMemoryContext. */ + MemoryContextSwitchTo(TopMemoryContext); + + /* Attempt to cancel the job; if it's already in a terminal state, that's okay. */ + citus_cancel_job(jobid); + + /* Re-throw the original error so Postgres knows this statement was canceled. */ + PG_RE_THROW(); + } + PG_END_TRY(); } +/* + * citus_cancel_job - forcibly cancels a background job by setting its status + * to BACKGROUND_JOB_STATUS_CANCELLED in memory, then updates the + * pg_dist_background_job table. + */ +bool +citus_cancel_job(int64 jobId) +{ + BackgroundJob *job = GetBackgroundJobByJobId(jobId); + if (!job) + { + /* No such job ID */ + return false; + } + + /* + * If the job is already in a terminal state, or is scheduled, + * decide if you want to do anything special. + * But typically you just check if it is still "running" or "cancelling". + */ + if (IsBackgroundJobStatusTerminal(job->state)) + { + return false; + } + + /* Mark job as canceled, then update the catalog */ + job->state = BACKGROUND_JOB_STATUS_CANCELLED; + + /* This projects the tasks states into the job's new state, + * and updates the row in pg_dist_background_job. + */ + UpdateBackgroundJob(job->jobid); + + return true; +} + + + /* * citus_task_wait_internal implements the waiting on a task for reuse in other areas where * we want to wait on tasks.