From 85a1c730ce1647d3a3dde4fbc0f2ee976fc9012a Mon Sep 17 00:00:00 2001 From: Mehmet Yilmaz Date: Fri, 4 Apr 2025 11:02:32 +0000 Subject: [PATCH] Refactor job cancellation logic in background job processing for improved clarity and maintainability --- .../distributed/utils/background_jobs.c | 199 +++++++++--------- 1 file changed, 101 insertions(+), 98 deletions(-) diff --git a/src/backend/distributed/utils/background_jobs.c b/src/backend/distributed/utils/background_jobs.c index ba17e68a6..cfa5baca5 100644 --- a/src/backend/distributed/utils/background_jobs.c +++ b/src/backend/distributed/utils/background_jobs.c @@ -256,95 +256,99 @@ citus_task_wait(PG_FUNCTION_ARGS) void citus_job_wait_internal(int64 jobid, BackgroundJobStatus *desiredStatus) { - PG_TRY(); - { - /* - * Since we are wait polling, we actually allocate memory on every poll. To avoid - * putting unneeded pressure on memory, we create a context that we reset - * every iteration. - */ - MemoryContext waitContext = AllocSetContextCreate(CurrentMemoryContext, - "JobsWaitContext", - ALLOCSET_DEFAULT_MINSIZE, - ALLOCSET_DEFAULT_INITSIZE, - ALLOCSET_DEFAULT_MAXSIZE); - MemoryContext oldContext = MemoryContextSwitchTo(waitContext); + PG_TRY(); + { + /* + * Since we are wait polling, we actually allocate memory on every poll. To avoid + * putting unneeded pressure on memory, we create a context that we reset + * every iteration. + */ + MemoryContext waitContext = AllocSetContextCreate(CurrentMemoryContext, + "JobsWaitContext", + ALLOCSET_DEFAULT_MINSIZE, + ALLOCSET_DEFAULT_INITSIZE, + ALLOCSET_DEFAULT_MAXSIZE); + MemoryContext oldContext = MemoryContextSwitchTo(waitContext); - while (true) - { - MemoryContextReset(waitContext); + while (true) + { + MemoryContextReset(waitContext); - BackgroundJob *job = GetBackgroundJobByJobId(jobid); - if (!job) - { - ereport(ERROR, - (errmsg("no job found for job with jobid: %ld", jobid))); - } + BackgroundJob *job = GetBackgroundJobByJobId(jobid); + if (!job) + { + ereport(ERROR, + (errmsg("no job found for job with jobid: %ld", jobid))); + } - /* If we have a desiredStatus and we've reached it, we're done */ - if (desiredStatus && job->state == *desiredStatus) - { - break; - } + /* If we have a desiredStatus and we've reached it, we're done */ + if (desiredStatus && job->state == *desiredStatus) + { + break; + } - /* If the job is in a terminal state (e.g. SUCCEEDED, FAILED, or CANCELED), - * but not the desired state, throw an error or stop waiting. - */ - if (IsBackgroundJobStatusTerminal(job->state)) - { - if (desiredStatus) - { - Oid reachedStatusOid = BackgroundJobStatusOid(job->state); - Datum reachedStatusNameDatum = DirectFunctionCall1(enum_out, reachedStatusOid); - char *reachedStatusName = DatumGetCString(reachedStatusNameDatum); + /* If the job is in a terminal state (e.g. SUCCEEDED, FAILED, or CANCELED), + * but not the desired state, throw an error or stop waiting. + */ + if (IsBackgroundJobStatusTerminal(job->state)) + { + if (desiredStatus) + { + Oid reachedStatusOid = BackgroundJobStatusOid(job->state); + Datum reachedStatusNameDatum = DirectFunctionCall1(enum_out, + reachedStatusOid); + char *reachedStatusName = DatumGetCString(reachedStatusNameDatum); - Oid desiredStatusOid = BackgroundJobStatusOid(*desiredStatus); - Datum desiredStatusNameDatum = DirectFunctionCall1(enum_out, desiredStatusOid); - char *desiredStatusName = DatumGetCString(desiredStatusNameDatum); + Oid desiredStatusOid = BackgroundJobStatusOid(*desiredStatus); + Datum desiredStatusNameDatum = DirectFunctionCall1(enum_out, + desiredStatusOid); + char *desiredStatusName = DatumGetCString(desiredStatusNameDatum); - ereport(ERROR, - (errmsg("Job reached terminal state \"%s\" instead of desired " - "state \"%s\"", reachedStatusName, desiredStatusName))); - } + ereport(ERROR, + (errmsg( + "Job reached terminal state \"%s\" instead of desired " + "state \"%s\"", reachedStatusName, + desiredStatusName))); + } - /* Otherwise, if no desiredStatus was given, we accept this terminal state. */ - break; - } + /* Otherwise, if no desiredStatus was given, we accept this terminal state. */ + break; + } - /* Before sleeping, check for user interrupts (Ctrl+C, statement_timeout, etc.) */ - CHECK_FOR_INTERRUPTS(); + /* Before sleeping, check for user interrupts (Ctrl+C, statement_timeout, etc.) */ + CHECK_FOR_INTERRUPTS(); - /* Sleep 1 second before re-checking the job status */ - const long delay_ms = 1000; - (void) WaitLatch(MyLatch, - WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, - delay_ms, - WAIT_EVENT_PG_SLEEP); + /* Sleep 1 second before re-checking the job status */ + const long delay_ms = 1000; + (void) WaitLatch(MyLatch, + WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, + delay_ms, + WAIT_EVENT_PG_SLEEP); - ResetLatch(MyLatch); - } + ResetLatch(MyLatch); + } - MemoryContextSwitchTo(oldContext); - MemoryContextDelete(waitContext); - } - PG_CATCH(); - { - /* - * If we get here, the user canceled the statement or an ERROR occurred. - * We forcibly cancel the job so that it doesn't remain running in background. - * This ensures no "zombie" shard moves or leftover replication slots. - */ + MemoryContextSwitchTo(oldContext); + MemoryContextDelete(waitContext); + } + PG_CATCH(); + { + /* + * If we get here, the user canceled the statement or an ERROR occurred. + * We forcibly cancel the job so that it doesn't remain running in background. + * This ensures no "zombie" shard moves or leftover replication slots. + */ - /* Switch out of the waitContext so we can safely do cleanup in TopMemoryContext. */ - MemoryContextSwitchTo(TopMemoryContext); + /* Switch out of the waitContext so we can safely do cleanup in TopMemoryContext. */ + MemoryContextSwitchTo(TopMemoryContext); - /* Attempt to cancel the job; if it's already in a terminal state, that's okay. */ - citus_cancel_job(jobid); + /* Attempt to cancel the job; if it's already in a terminal state, that's okay. */ + citus_cancel_job(jobid); - /* Re-throw the original error so Postgres knows this statement was canceled. */ - PG_RE_THROW(); - } - PG_END_TRY(); + /* Re-throw the original error so Postgres knows this statement was canceled. */ + PG_RE_THROW(); + } + PG_END_TRY(); } @@ -356,36 +360,35 @@ citus_job_wait_internal(int64 jobid, BackgroundJobStatus *desiredStatus) bool citus_cancel_job(int64 jobId) { - BackgroundJob *job = GetBackgroundJobByJobId(jobId); - if (!job) - { - /* No such job ID */ - return false; - } + BackgroundJob *job = GetBackgroundJobByJobId(jobId); + if (!job) + { + /* No such job ID */ + return false; + } - /* - * If the job is already in a terminal state, or is scheduled, - * decide if you want to do anything special. - * But typically you just check if it is still "running" or "cancelling". - */ - if (IsBackgroundJobStatusTerminal(job->state)) - { - return false; - } + /* + * If the job is already in a terminal state, or is scheduled, + * decide if you want to do anything special. + * But typically you just check if it is still "running" or "cancelling". + */ + if (IsBackgroundJobStatusTerminal(job->state)) + { + return false; + } - /* Mark job as canceled, then update the catalog */ - job->state = BACKGROUND_JOB_STATUS_CANCELLED; + /* Mark job as canceled, then update the catalog */ + job->state = BACKGROUND_JOB_STATUS_CANCELLED; - /* This projects the tasks states into the job's new state, - * and updates the row in pg_dist_background_job. - */ - UpdateBackgroundJob(job->jobid); + /* This projects the tasks states into the job's new state, + * and updates the row in pg_dist_background_job. + */ + UpdateBackgroundJob(job->jobid); - return true; + return true; } - /* * citus_task_wait_internal implements the waiting on a task for reuse in other areas where * we want to wait on tasks.