From 3c23f897e46cdaf82121687368f9bf5f10b6d956 Mon Sep 17 00:00:00 2001 From: Mehmet Yilmaz Date: Wed, 2 Apr 2025 13:18:22 +0000 Subject: [PATCH 01/11] Implement job cancellation mechanism in background job processing --- .../distributed/utils/background_jobs.c | 186 ++++++++++++------ 1 file changed, 123 insertions(+), 63 deletions(-) diff --git a/src/backend/distributed/utils/background_jobs.c b/src/backend/distributed/utils/background_jobs.c index 911880dc7..ba17e68a6 100644 --- a/src/backend/distributed/utils/background_jobs.c +++ b/src/backend/distributed/utils/background_jobs.c @@ -117,6 +117,7 @@ static void QueueMonitorSigTermHandler(SIGNAL_ARGS); static void QueueMonitorSigIntHandler(SIGNAL_ARGS); static void QueueMonitorSigHupHandler(SIGNAL_ARGS); static void DecrementParallelTaskCountForNodesInvolved(BackgroundTask *task); +static bool citus_cancel_job(int64 jobid); /* flags set by signal handlers */ static volatile sig_atomic_t GotSigterm = false; @@ -244,88 +245,147 @@ citus_task_wait(PG_FUNCTION_ARGS) /* - * citus_job_wait_internal implements the waiting on a job for reuse in other areas where - * we want to wait on jobs. eg the background rebalancer. + * citus_job_wait_internal implements the waiting on a job, e.g. for the background + * rebalancer. If desiredStatus is provided, we throw an error if we reach a + * different terminal state that can never transition to the desired state. * - * When a desiredStatus is provided it will provide an error when a different state is - * reached and the state cannot ever reach the desired state anymore. + * With the PG_TRY/PG_CATCH block, if the user cancels this SQL statement + * (Ctrl+C, statement_timeout, etc.), we will cancel the job in progress + * so it doesn't remain running in background. */ void citus_job_wait_internal(int64 jobid, BackgroundJobStatus *desiredStatus) { - /* - * Since we are wait polling we will actually allocate memory on every poll. To make - * sure we don't put unneeded pressure on the memory we create a context that we clear - * every iteration. - */ - MemoryContext waitContext = AllocSetContextCreate(CurrentMemoryContext, - "JobsWaitContext", - ALLOCSET_DEFAULT_MINSIZE, - ALLOCSET_DEFAULT_INITSIZE, - ALLOCSET_DEFAULT_MAXSIZE); - MemoryContext oldContext = MemoryContextSwitchTo(waitContext); + PG_TRY(); + { + /* + * Since we are wait polling, we actually allocate memory on every poll. To avoid + * putting unneeded pressure on memory, we create a context that we reset + * every iteration. 
+ */ + MemoryContext waitContext = AllocSetContextCreate(CurrentMemoryContext, + "JobsWaitContext", + ALLOCSET_DEFAULT_MINSIZE, + ALLOCSET_DEFAULT_INITSIZE, + ALLOCSET_DEFAULT_MAXSIZE); + MemoryContext oldContext = MemoryContextSwitchTo(waitContext); - while (true) - { - MemoryContextReset(waitContext); + while (true) + { + MemoryContextReset(waitContext); - BackgroundJob *job = GetBackgroundJobByJobId(jobid); - if (!job) - { - ereport(ERROR, (errmsg("no job found for job with jobid: %ld", jobid))); - } + BackgroundJob *job = GetBackgroundJobByJobId(jobid); + if (!job) + { + ereport(ERROR, + (errmsg("no job found for job with jobid: %ld", jobid))); + } - if (desiredStatus && job->state == *desiredStatus) - { - /* job has reached its desired status, done waiting */ - break; - } + /* If we have a desiredStatus and we've reached it, we're done */ + if (desiredStatus && job->state == *desiredStatus) + { + break; + } - if (IsBackgroundJobStatusTerminal(job->state)) - { - if (desiredStatus) - { - /* - * We have reached a terminal state, which is not the desired state we - * were waiting for, otherwise we would have escaped earlier. Since it is - * a terminal state we know that we can never reach the desired state. - */ + /* If the job is in a terminal state (e.g. SUCCEEDED, FAILED, or CANCELED), + * but not the desired state, throw an error or stop waiting. + */ + if (IsBackgroundJobStatusTerminal(job->state)) + { + if (desiredStatus) + { + Oid reachedStatusOid = BackgroundJobStatusOid(job->state); + Datum reachedStatusNameDatum = DirectFunctionCall1(enum_out, reachedStatusOid); + char *reachedStatusName = DatumGetCString(reachedStatusNameDatum); - Oid reachedStatusOid = BackgroundJobStatusOid(job->state); - Datum reachedStatusNameDatum = DirectFunctionCall1(enum_out, - reachedStatusOid); - char *reachedStatusName = DatumGetCString(reachedStatusNameDatum); + Oid desiredStatusOid = BackgroundJobStatusOid(*desiredStatus); + Datum desiredStatusNameDatum = DirectFunctionCall1(enum_out, desiredStatusOid); + char *desiredStatusName = DatumGetCString(desiredStatusNameDatum); - Oid desiredStatusOid = BackgroundJobStatusOid(*desiredStatus); - Datum desiredStatusNameDatum = DirectFunctionCall1(enum_out, - desiredStatusOid); - char *desiredStatusName = DatumGetCString(desiredStatusNameDatum); + ereport(ERROR, + (errmsg("Job reached terminal state \"%s\" instead of desired " + "state \"%s\"", reachedStatusName, desiredStatusName))); + } - ereport(ERROR, - (errmsg("Job reached terminal state \"%s\" instead of desired " - "state \"%s\"", reachedStatusName, desiredStatusName))); - } + /* Otherwise, if no desiredStatus was given, we accept this terminal state. */ + break; + } - /* job has reached its terminal state, done waiting */ - break; - } + /* Before sleeping, check for user interrupts (Ctrl+C, statement_timeout, etc.) 
*/ + CHECK_FOR_INTERRUPTS(); - /* sleep for a while, before rechecking the job status */ - CHECK_FOR_INTERRUPTS(); - const long delay_ms = 1000; - (void) WaitLatch(MyLatch, - WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, - delay_ms, - WAIT_EVENT_PG_SLEEP); + /* Sleep 1 second before re-checking the job status */ + const long delay_ms = 1000; + (void) WaitLatch(MyLatch, + WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, + delay_ms, + WAIT_EVENT_PG_SLEEP); - ResetLatch(MyLatch); - } + ResetLatch(MyLatch); + } - MemoryContextSwitchTo(oldContext); - MemoryContextDelete(waitContext); + MemoryContextSwitchTo(oldContext); + MemoryContextDelete(waitContext); + } + PG_CATCH(); + { + /* + * If we get here, the user canceled the statement or an ERROR occurred. + * We forcibly cancel the job so that it doesn't remain running in background. + * This ensures no "zombie" shard moves or leftover replication slots. + */ + + /* Switch out of the waitContext so we can safely do cleanup in TopMemoryContext. */ + MemoryContextSwitchTo(TopMemoryContext); + + /* Attempt to cancel the job; if it's already in a terminal state, that's okay. */ + citus_cancel_job(jobid); + + /* Re-throw the original error so Postgres knows this statement was canceled. */ + PG_RE_THROW(); + } + PG_END_TRY(); } +/* + * citus_cancel_job - forcibly cancels a background job by setting its status + * to BACKGROUND_JOB_STATUS_CANCELLED in memory, then updates the + * pg_dist_background_job table. + */ +bool +citus_cancel_job(int64 jobId) +{ + BackgroundJob *job = GetBackgroundJobByJobId(jobId); + if (!job) + { + /* No such job ID */ + return false; + } + + /* + * If the job is already in a terminal state, or is scheduled, + * decide if you want to do anything special. + * But typically you just check if it is still "running" or "cancelling". + */ + if (IsBackgroundJobStatusTerminal(job->state)) + { + return false; + } + + /* Mark job as canceled, then update the catalog */ + job->state = BACKGROUND_JOB_STATUS_CANCELLED; + + /* This projects the tasks states into the job's new state, + * and updates the row in pg_dist_background_job. + */ + UpdateBackgroundJob(job->jobid); + + return true; +} + + + /* * citus_task_wait_internal implements the waiting on a task for reuse in other areas where * we want to wait on tasks. From 85a1c730ce1647d3a3dde4fbc0f2ee976fc9012a Mon Sep 17 00:00:00 2001 From: Mehmet Yilmaz Date: Fri, 4 Apr 2025 11:02:32 +0000 Subject: [PATCH 02/11] Refactor job cancellation logic in background job processing for improved clarity and maintainability --- .../distributed/utils/background_jobs.c | 199 +++++++++--------- 1 file changed, 101 insertions(+), 98 deletions(-) diff --git a/src/backend/distributed/utils/background_jobs.c b/src/backend/distributed/utils/background_jobs.c index ba17e68a6..cfa5baca5 100644 --- a/src/backend/distributed/utils/background_jobs.c +++ b/src/backend/distributed/utils/background_jobs.c @@ -256,95 +256,99 @@ citus_task_wait(PG_FUNCTION_ARGS) void citus_job_wait_internal(int64 jobid, BackgroundJobStatus *desiredStatus) { - PG_TRY(); - { - /* - * Since we are wait polling, we actually allocate memory on every poll. To avoid - * putting unneeded pressure on memory, we create a context that we reset - * every iteration. 
- */ - MemoryContext waitContext = AllocSetContextCreate(CurrentMemoryContext, - "JobsWaitContext", - ALLOCSET_DEFAULT_MINSIZE, - ALLOCSET_DEFAULT_INITSIZE, - ALLOCSET_DEFAULT_MAXSIZE); - MemoryContext oldContext = MemoryContextSwitchTo(waitContext); + PG_TRY(); + { + /* + * Since we are wait polling, we actually allocate memory on every poll. To avoid + * putting unneeded pressure on memory, we create a context that we reset + * every iteration. + */ + MemoryContext waitContext = AllocSetContextCreate(CurrentMemoryContext, + "JobsWaitContext", + ALLOCSET_DEFAULT_MINSIZE, + ALLOCSET_DEFAULT_INITSIZE, + ALLOCSET_DEFAULT_MAXSIZE); + MemoryContext oldContext = MemoryContextSwitchTo(waitContext); - while (true) - { - MemoryContextReset(waitContext); + while (true) + { + MemoryContextReset(waitContext); - BackgroundJob *job = GetBackgroundJobByJobId(jobid); - if (!job) - { - ereport(ERROR, - (errmsg("no job found for job with jobid: %ld", jobid))); - } + BackgroundJob *job = GetBackgroundJobByJobId(jobid); + if (!job) + { + ereport(ERROR, + (errmsg("no job found for job with jobid: %ld", jobid))); + } - /* If we have a desiredStatus and we've reached it, we're done */ - if (desiredStatus && job->state == *desiredStatus) - { - break; - } + /* If we have a desiredStatus and we've reached it, we're done */ + if (desiredStatus && job->state == *desiredStatus) + { + break; + } - /* If the job is in a terminal state (e.g. SUCCEEDED, FAILED, or CANCELED), - * but not the desired state, throw an error or stop waiting. - */ - if (IsBackgroundJobStatusTerminal(job->state)) - { - if (desiredStatus) - { - Oid reachedStatusOid = BackgroundJobStatusOid(job->state); - Datum reachedStatusNameDatum = DirectFunctionCall1(enum_out, reachedStatusOid); - char *reachedStatusName = DatumGetCString(reachedStatusNameDatum); + /* If the job is in a terminal state (e.g. SUCCEEDED, FAILED, or CANCELED), + * but not the desired state, throw an error or stop waiting. + */ + if (IsBackgroundJobStatusTerminal(job->state)) + { + if (desiredStatus) + { + Oid reachedStatusOid = BackgroundJobStatusOid(job->state); + Datum reachedStatusNameDatum = DirectFunctionCall1(enum_out, + reachedStatusOid); + char *reachedStatusName = DatumGetCString(reachedStatusNameDatum); - Oid desiredStatusOid = BackgroundJobStatusOid(*desiredStatus); - Datum desiredStatusNameDatum = DirectFunctionCall1(enum_out, desiredStatusOid); - char *desiredStatusName = DatumGetCString(desiredStatusNameDatum); + Oid desiredStatusOid = BackgroundJobStatusOid(*desiredStatus); + Datum desiredStatusNameDatum = DirectFunctionCall1(enum_out, + desiredStatusOid); + char *desiredStatusName = DatumGetCString(desiredStatusNameDatum); - ereport(ERROR, - (errmsg("Job reached terminal state \"%s\" instead of desired " - "state \"%s\"", reachedStatusName, desiredStatusName))); - } + ereport(ERROR, + (errmsg( + "Job reached terminal state \"%s\" instead of desired " + "state \"%s\"", reachedStatusName, + desiredStatusName))); + } - /* Otherwise, if no desiredStatus was given, we accept this terminal state. */ - break; - } + /* Otherwise, if no desiredStatus was given, we accept this terminal state. */ + break; + } - /* Before sleeping, check for user interrupts (Ctrl+C, statement_timeout, etc.) */ - CHECK_FOR_INTERRUPTS(); + /* Before sleeping, check for user interrupts (Ctrl+C, statement_timeout, etc.) 
*/ + CHECK_FOR_INTERRUPTS(); - /* Sleep 1 second before re-checking the job status */ - const long delay_ms = 1000; - (void) WaitLatch(MyLatch, - WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, - delay_ms, - WAIT_EVENT_PG_SLEEP); + /* Sleep 1 second before re-checking the job status */ + const long delay_ms = 1000; + (void) WaitLatch(MyLatch, + WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, + delay_ms, + WAIT_EVENT_PG_SLEEP); - ResetLatch(MyLatch); - } + ResetLatch(MyLatch); + } - MemoryContextSwitchTo(oldContext); - MemoryContextDelete(waitContext); - } - PG_CATCH(); - { - /* - * If we get here, the user canceled the statement or an ERROR occurred. - * We forcibly cancel the job so that it doesn't remain running in background. - * This ensures no "zombie" shard moves or leftover replication slots. - */ + MemoryContextSwitchTo(oldContext); + MemoryContextDelete(waitContext); + } + PG_CATCH(); + { + /* + * If we get here, the user canceled the statement or an ERROR occurred. + * We forcibly cancel the job so that it doesn't remain running in background. + * This ensures no "zombie" shard moves or leftover replication slots. + */ - /* Switch out of the waitContext so we can safely do cleanup in TopMemoryContext. */ - MemoryContextSwitchTo(TopMemoryContext); + /* Switch out of the waitContext so we can safely do cleanup in TopMemoryContext. */ + MemoryContextSwitchTo(TopMemoryContext); - /* Attempt to cancel the job; if it's already in a terminal state, that's okay. */ - citus_cancel_job(jobid); + /* Attempt to cancel the job; if it's already in a terminal state, that's okay. */ + citus_cancel_job(jobid); - /* Re-throw the original error so Postgres knows this statement was canceled. */ - PG_RE_THROW(); - } - PG_END_TRY(); + /* Re-throw the original error so Postgres knows this statement was canceled. */ + PG_RE_THROW(); + } + PG_END_TRY(); } @@ -356,36 +360,35 @@ citus_job_wait_internal(int64 jobid, BackgroundJobStatus *desiredStatus) bool citus_cancel_job(int64 jobId) { - BackgroundJob *job = GetBackgroundJobByJobId(jobId); - if (!job) - { - /* No such job ID */ - return false; - } + BackgroundJob *job = GetBackgroundJobByJobId(jobId); + if (!job) + { + /* No such job ID */ + return false; + } - /* - * If the job is already in a terminal state, or is scheduled, - * decide if you want to do anything special. - * But typically you just check if it is still "running" or "cancelling". - */ - if (IsBackgroundJobStatusTerminal(job->state)) - { - return false; - } + /* + * If the job is already in a terminal state, or is scheduled, + * decide if you want to do anything special. + * But typically you just check if it is still "running" or "cancelling". + */ + if (IsBackgroundJobStatusTerminal(job->state)) + { + return false; + } - /* Mark job as canceled, then update the catalog */ - job->state = BACKGROUND_JOB_STATUS_CANCELLED; + /* Mark job as canceled, then update the catalog */ + job->state = BACKGROUND_JOB_STATUS_CANCELLED; - /* This projects the tasks states into the job's new state, - * and updates the row in pg_dist_background_job. - */ - UpdateBackgroundJob(job->jobid); + /* This projects the tasks states into the job's new state, + * and updates the row in pg_dist_background_job. + */ + UpdateBackgroundJob(job->jobid); - return true; + return true; } - /* * citus_task_wait_internal implements the waiting on a task for reuse in other areas where * we want to wait on tasks. 
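
The mechanism patches 01 and 02 introduce is easiest to see in isolation: wrap the polling loop in PG_TRY(), and in PG_CATCH() cancel the background job before re-throwing, so that a Ctrl+C or statement_timeout on the waiting session also stops the job instead of leaving it running. Below is a minimal sketch of that pattern using standard PostgreSQL backend APIs; JobHasFinished() and CancelJob() are hypothetical stand-ins for the Citus helpers, not real functions.

#include "postgres.h"
#include "miscadmin.h"
#include "storage/latch.h"
#include "utils/memutils.h"
#include "utils/wait_event.h"

static void
WaitForJobCancellingOnError(int64 jobid)
{
	PG_TRY();
	{
		for (;;)
		{
			if (JobHasFinished(jobid))	/* hypothetical status check */
			{
				break;
			}

			/* raises ERROR here if the user canceled or a timeout fired */
			CHECK_FOR_INTERRUPTS();

			/* sleep up to one second, waking early if the latch is set */
			(void) WaitLatch(MyLatch,
							 WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
							 1000L, WAIT_EVENT_PG_SLEEP);
			ResetLatch(MyLatch);
		}
	}
	PG_CATCH();
	{
		/* the error was thrown in an arbitrary context; move somewhere safe */
		MemoryContextSwitchTo(TopMemoryContext);

		CancelJob(jobid);	/* hypothetical; the real code goes through fmgr */

		/* hand the original error back to the caller unchanged */
		PG_RE_THROW();
	}
	PG_END_TRY();
}

The real implementation additionally resets a dedicated memory context on every poll iteration so repeated catalog lookups do not accumulate allocations; the sketch omits that detail.
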
From 62ef1c353628f115930270aaa150bf881d9db4ac Mon Sep 17 00:00:00 2001 From: Mehmet Yilmaz Date: Fri, 4 Apr 2025 11:09:32 +0000 Subject: [PATCH 03/11] Replace job cancellation call with DirectFunctionCall for improved error handling --- src/backend/distributed/utils/background_jobs.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/backend/distributed/utils/background_jobs.c b/src/backend/distributed/utils/background_jobs.c index cfa5baca5..b63efb5e6 100644 --- a/src/backend/distributed/utils/background_jobs.c +++ b/src/backend/distributed/utils/background_jobs.c @@ -343,7 +343,7 @@ citus_job_wait_internal(int64 jobid, BackgroundJobStatus *desiredStatus) MemoryContextSwitchTo(TopMemoryContext); /* Attempt to cancel the job; if it's already in a terminal state, that's okay. */ - citus_cancel_job(jobid); + (void) DirectFunctionCall1(citus_job_cancel, Int64GetDatum(jobid)); /* Re-throw the original error so Postgres knows this statement was canceled. */ PG_RE_THROW(); From 3ed1284794f8f60691ed9e68577f9f7476c2aff9 Mon Sep 17 00:00:00 2001 From: Mehmet Yilmaz Date: Fri, 4 Apr 2025 11:17:07 +0000 Subject: [PATCH 04/11] Remove unused citus_cancel_job function to streamline background job processing --- .../distributed/utils/background_jobs.c | 38 ------------------- 1 file changed, 38 deletions(-) diff --git a/src/backend/distributed/utils/background_jobs.c b/src/backend/distributed/utils/background_jobs.c index b63efb5e6..e00a7a85b 100644 --- a/src/backend/distributed/utils/background_jobs.c +++ b/src/backend/distributed/utils/background_jobs.c @@ -117,7 +117,6 @@ static void QueueMonitorSigTermHandler(SIGNAL_ARGS); static void QueueMonitorSigIntHandler(SIGNAL_ARGS); static void QueueMonitorSigHupHandler(SIGNAL_ARGS); static void DecrementParallelTaskCountForNodesInvolved(BackgroundTask *task); -static bool citus_cancel_job(int64 jobid); /* flags set by signal handlers */ static volatile sig_atomic_t GotSigterm = false; @@ -352,43 +351,6 @@ citus_job_wait_internal(int64 jobid, BackgroundJobStatus *desiredStatus) } -/* - * citus_cancel_job - forcibly cancels a background job by setting its status - * to BACKGROUND_JOB_STATUS_CANCELLED in memory, then updates the - * pg_dist_background_job table. - */ -bool -citus_cancel_job(int64 jobId) -{ - BackgroundJob *job = GetBackgroundJobByJobId(jobId); - if (!job) - { - /* No such job ID */ - return false; - } - - /* - * If the job is already in a terminal state, or is scheduled, - * decide if you want to do anything special. - * But typically you just check if it is still "running" or "cancelling". - */ - if (IsBackgroundJobStatusTerminal(job->state)) - { - return false; - } - - /* Mark job as canceled, then update the catalog */ - job->state = BACKGROUND_JOB_STATUS_CANCELLED; - - /* This projects the tasks states into the job's new state, - * and updates the row in pg_dist_background_job. - */ - UpdateBackgroundJob(job->jobid); - - return true; -} - - /* * citus_task_wait_internal implements the waiting on a task for reuse in other areas where * we want to wait on tasks. 
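
Patches 03 and 04 then replace the ad-hoc citus_cancel_job() helper with a call through the function manager to the existing SQL-facing citus_job_cancel(), so cancellation keeps a single code path and the helper's duplicated state handling can be deleted. A sketch of the call, mirroring the line patch 03 adds:

/*
 * DirectFunctionCall1 invokes the C implementation behind the SQL function
 * citus_job_cancel(bigint) directly; any checks it performs internally
 * still run, and any ERROR it raises propagates to this caller.
 */
(void) DirectFunctionCall1(citus_job_cancel, Int64GetDatum(jobid));

One subtlety: this call sits inside PG_CATCH(), so if citus_job_cancel() itself raises an ERROR, that new error propagates in place of the one that was about to be re-thrown.
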
From 5b6b7b847e278834254fdcd67341c51ac49aede7 Mon Sep 17 00:00:00 2001 From: Mehmet Yilmaz Date: Fri, 4 Apr 2025 11:34:07 +0000 Subject: [PATCH 05/11] Add regression tests for job cancellation behavior in background job processing --- citus-tools | 1 + src/test/regress/expected/issue_7896.out | 46 ++++++++++++++++++++++++ src/test/regress/sql/issue_7896.sql | 45 +++++++++++++++++++++++ 3 files changed, 92 insertions(+) create mode 160000 citus-tools create mode 100644 src/test/regress/expected/issue_7896.out create mode 100644 src/test/regress/sql/issue_7896.sql diff --git a/citus-tools b/citus-tools new file mode 160000 index 000000000..3376bd684 --- /dev/null +++ b/citus-tools @@ -0,0 +1 @@ +Subproject commit 3376bd6845f0614908ed304f5033bd644c82d3bf diff --git a/src/test/regress/expected/issue_7896.out b/src/test/regress/expected/issue_7896.out new file mode 100644 index 000000000..09f19c94a --- /dev/null +++ b/src/test/regress/expected/issue_7896.out @@ -0,0 +1,46 @@ +CREATE SCHEMA issue_7896; +SET search_path TO issue_7896; +-- Create a temporary table to simulate the background job catalog. +-- (In production this would be the actual catalog table.) +CREATE TEMP TABLE pg_dist_background_job +( + job_id int8 PRIMARY KEY, + job_state text, + started_at timestamptz, + finished_at timestamptz +); +-- Insert a dummy job record with job_state set to 'running' +INSERT INTO pg_dist_background_job (job_id, job_state, started_at) +VALUES (1001, 'running', now()); +-- Set a short statement timeout so that citus_rebalance_wait times out quickly. +SET statement_timeout = '1000ms'; +DO $$ +BEGIN + BEGIN + -- Call the wait function. + -- Note: The public function citus_rebalance_wait() takes no arguments. + PERFORM citus_rebalance_wait(); + EXCEPTION + WHEN query_canceled THEN + RAISE NOTICE 'Query canceled as expected'; + -- Swallow the error so the transaction continues. + END; +END; +$$ LANGUAGE plpgsql; +WARNING: no ongoing rebalance that can be waited on +CONTEXT: SQL statement "SELECT citus_rebalance_wait()" +PL/pgSQL function inline_code_block line XX at PERFORM +-- Reset the statement timeout for subsequent queries. +SET statement_timeout = '0'; +-- Verify that the job's state has been updated to 'cancelled' +-- (the expected outcome after a cancellation). +SELECT job_state +FROM pg_dist_background_job +WHERE job_id = 1001; + job_state +--------------------------------------------------------------------- + running +(1 row) + +SET client_min_messages TO WARNING; +DROP SCHEMA issue_7896 CASCADE; diff --git a/src/test/regress/sql/issue_7896.sql b/src/test/regress/sql/issue_7896.sql new file mode 100644 index 000000000..12d5ff341 --- /dev/null +++ b/src/test/regress/sql/issue_7896.sql @@ -0,0 +1,45 @@ +CREATE SCHEMA issue_7896; +SET search_path TO issue_7896; + +-- Create a temporary table to simulate the background job catalog. +-- (In production this would be the actual catalog table.) +CREATE TEMP TABLE pg_dist_background_job +( + job_id int8 PRIMARY KEY, + job_state text, + started_at timestamptz, + finished_at timestamptz +); + +-- Insert a dummy job record with job_state set to 'running' +INSERT INTO pg_dist_background_job (job_id, job_state, started_at) +VALUES (1001, 'running', now()); + +-- Set a short statement timeout so that citus_rebalance_wait times out quickly. +SET statement_timeout = '1000ms'; + +DO $$ +BEGIN + BEGIN + -- Call the wait function. + -- Note: The public function citus_rebalance_wait() takes no arguments. 
+ PERFORM citus_rebalance_wait(); + EXCEPTION + WHEN query_canceled THEN + RAISE NOTICE 'Query canceled as expected'; + -- Swallow the error so the transaction continues. + END; +END; +$$ LANGUAGE plpgsql; + +-- Reset the statement timeout for subsequent queries. +SET statement_timeout = '0'; + +-- Verify that the job's state has been updated to 'cancelled' +-- (the expected outcome after a cancellation). +SELECT job_state +FROM pg_dist_background_job +WHERE job_id = 1001; + +SET client_min_messages TO WARNING; +DROP SCHEMA issue_7896 CASCADE; From 23a4671a68c5d2332229e17cde04028410abc686 Mon Sep 17 00:00:00 2001 From: Mehmet Yilmaz Date: Tue, 8 Apr 2025 10:58:33 +0000 Subject: [PATCH 06/11] Add regression test for zombie replication slot cleanup during job cancellation --- src/test/regress/expected/issue_7896.out | 179 +++++++++++++++++++---- src/test/regress/multi_schedule | 2 +- src/test/regress/sql/issue_7896.sql | 126 ++++++++++++---- 3 files changed, 247 insertions(+), 60 deletions(-) diff --git a/src/test/regress/expected/issue_7896.out b/src/test/regress/expected/issue_7896.out index 09f19c94a..cbb1c31dc 100644 --- a/src/test/regress/expected/issue_7896.out +++ b/src/test/regress/expected/issue_7896.out @@ -1,46 +1,161 @@ +--------------------------------------------------------------------- +-- Regression Test: Simulate zombie replication slot when +-- citus_rebalance_wait() is canceled. +-- +-- In the buggy behavior, canceling citus_rebalance_wait() +-- (via a short statement_timeout or Ctrl+C) leaves behind an active logical +-- replication slot on a worker. This, in turn, prevents DROP DATABASE +-- (with FORCE) from succeeding. +-- +-- With your fix applied, the underlying rebalance job is canceled, +-- no zombie slot remains, and DROP DATABASE succeeds. +--------------------------------------------------------------------- +--------------------------------------------------------------------- +-- 1) Create an isolated schema for this test. +--------------------------------------------------------------------- CREATE SCHEMA issue_7896; SET search_path TO issue_7896; --- Create a temporary table to simulate the background job catalog. --- (In production this would be the actual catalog table.) -CREATE TEMP TABLE pg_dist_background_job -( - job_id int8 PRIMARY KEY, - job_state text, - started_at timestamptz, - finished_at timestamptz -); --- Insert a dummy job record with job_state set to 'running' -INSERT INTO pg_dist_background_job (job_id, job_state, started_at) -VALUES (1001, 'running', now()); --- Set a short statement timeout so that citus_rebalance_wait times out quickly. -SET statement_timeout = '1000ms'; +--------------------------------------------------------------------- +-- 2) Set cluster parameters and initialize environment. +--------------------------------------------------------------------- +-- We assume a coordinator with at least two workers. +-- Set replication factor to 2 and enable repartition joins. +SET citus.shard_replication_factor TO 2; +SET citus.enable_repartition_joins TO ON; +-- For faster background task processing, set a short background task queue interval. +ALTER SYSTEM SET citus.background_task_queue_interval TO '1s'; +SELECT pg_reload_conf(); + pg_reload_conf +--------------------------------------------------------------------- + t +(1 row) + +--------------------------------------------------------------------- +-- 3) Create a distributed table. 
+--------------------------------------------------------------------- +DROP TABLE IF EXISTS t1; +NOTICE: table "t1" does not exist, skipping +CREATE TABLE t1 (a int PRIMARY KEY); +SELECT create_distributed_table('t1', 'a', shard_count => 4, colocate_with => 'none'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +--------------------------------------------------------------------- +-- 4) Insert enough data so that a rebalance has measurable work. +--------------------------------------------------------------------- +INSERT INTO t1 + SELECT generate_series(1, 1000000); +--------------------------------------------------------------------- +-- 5) Verify that a rebalance on a balanced cluster is a no-op. +--------------------------------------------------------------------- +SELECT 1 FROM citus_rebalance_start(); +NOTICE: No moves available for rebalancing + ?column? +--------------------------------------------------------------------- + 1 +(1 row) + +-- Expected: NOTICE "No moves available for rebalancing". +SELECT citus_rebalance_wait(); +WARNING: no ongoing rebalance that can be waited on + citus_rebalance_wait +--------------------------------------------------------------------- + +(1 row) + +-- Expected: WARNING "no ongoing rebalance that can be waited on". +--------------------------------------------------------------------- +-- 6) Force a shard movement so that a rebalance job is scheduled. +-- Remove and re-add a worker using a parameter placeholder. +--------------------------------------------------------------------- +SELECT citus_remove_node('localhost', :worker_2_port); + citus_remove_node +--------------------------------------------------------------------- + +(1 row) + +SELECT citus_add_node('localhost', :worker_2_port); + citus_add_node +--------------------------------------------------------------------- + 30 +(1 row) + +--------------------------------------------------------------------- +-- 7) Start a rebalance job that will do actual work. +--------------------------------------------------------------------- +SELECT citus_rebalance_start( + rebalance_strategy := 'by_disk_size', + shard_transfer_mode := 'force_logical' + ); +NOTICE: Scheduled 2 moves as job xxx +DETAIL: Rebalance scheduled as background job +HINT: To monitor progress, run: SELECT * FROM citus_rebalance_status(); + citus_rebalance_start +--------------------------------------------------------------------- + 1 +(1 row) + +-- Expected: Notice that moves are scheduled as a background job. +-- (You may verify with: SELECT * FROM citus_rebalance_status();) +--------------------------------------------------------------------- +-- 8) Attempt to wait on the rebalance with a short timeout so that the wait +-- is canceled. The PG_CATCH block in citus_job_wait_internal should then +-- cancel the underlying job (cleaning up temporary replication slots). +--------------------------------------------------------------------- +SET statement_timeout = '2s'; DO $$ BEGIN BEGIN - -- Call the wait function. - -- Note: The public function citus_rebalance_wait() takes no arguments. + RAISE NOTICE 'Waiting on rebalance with a 2-second timeout...'; + -- Public function citus_rebalance_wait() takes no arguments. PERFORM citus_rebalance_wait(); EXCEPTION WHEN query_canceled THEN - RAISE NOTICE 'Query canceled as expected'; - -- Swallow the error so the transaction continues. 
+ RAISE NOTICE 'Rebalance wait canceled as expected'; + -- Your fix should cancel the underlying rebalance job. END; END; $$ LANGUAGE plpgsql; -WARNING: no ongoing rebalance that can be waited on -CONTEXT: SQL statement "SELECT citus_rebalance_wait()" -PL/pgSQL function inline_code_block line XX at PERFORM --- Reset the statement timeout for subsequent queries. +NOTICE: Waiting on rebalance with a 2-second timeout... +CONTEXT: PL/pgSQL function inline_code_block line XX at RAISE +NOTICE: Rebalance wait canceled as expected +CONTEXT: PL/pgSQL function inline_code_block line XX at RAISE SET statement_timeout = '0'; --- Verify that the job's state has been updated to 'cancelled' --- (the expected outcome after a cancellation). -SELECT job_state -FROM pg_dist_background_job -WHERE job_id = 1001; - job_state --------------------------------------------------------------------- - running -(1 row) +-- 9) Cleanup orphaned background resources (if any). +--------------------------------------------------------------------- +CALL citus_cleanup_orphaned_resources(); +NOTICE: cleaned up 5 orphaned resources +--------------------------------------------------------------------- +-- 12) Traverse nodes and check for active replication slots. +-- +-- Connect to the coordinator and worker nodes, then query for replication slots. +-- Expected Outcome (with the fix applied): No active replication slots. +--------------------------------------------------------------------- +\c - - - :master_port +SELECT * FROM pg_replication_slots; + slot_name | plugin | slot_type | datoid | database | temporary | active | active_pid | xmin | catalog_xmin | restart_lsn | confirmed_flush_lsn | wal_status | safe_wal_size | two_phase | inactive_since | conflicting | invalidation_reason | failover | synced +--------------------------------------------------------------------- +(0 rows) -SET client_min_messages TO WARNING; -DROP SCHEMA issue_7896 CASCADE; +\c - - - :worker_1_port +SELECT * FROM pg_replication_slots; + slot_name | plugin | slot_type | datoid | database | temporary | active | active_pid | xmin | catalog_xmin | restart_lsn | confirmed_flush_lsn | wal_status | safe_wal_size | two_phase | inactive_since | conflicting | invalidation_reason | failover | synced +--------------------------------------------------------------------- +(0 rows) + +\c - - - :worker_2_port +SELECT * FROM pg_replication_slots; + slot_name | plugin | slot_type | datoid | database | temporary | active | active_pid | xmin | catalog_xmin | restart_lsn | confirmed_flush_lsn | wal_status | safe_wal_size | two_phase | inactive_since | conflicting | invalidation_reason | failover | synced +--------------------------------------------------------------------- +(0 rows) + +--------------------------------------------------------------------- +-- 11) Cleanup: Drop the test schema. 
+--------------------------------------------------------------------- +\c - - - :master_port +SET search_path TO issue_7896; +DROP SCHEMA IF EXISTS issue_7896 CASCADE; +NOTICE: drop cascades to table t1 diff --git a/src/test/regress/multi_schedule b/src/test/regress/multi_schedule index 3d7bd6e98..0a5c6a422 100644 --- a/src/test/regress/multi_schedule +++ b/src/test/regress/multi_schedule @@ -104,7 +104,7 @@ test: multi_dropped_column_aliases foreign_key_restriction_enforcement test: binary_protocol test: alter_table_set_access_method test: alter_distributed_table -test: issue_5248 issue_5099 issue_5763 issue_6543 issue_6758 issue_7477 issue_7891 +test: issue_5248 issue_5099 issue_5763 issue_6543 issue_6758 issue_7477 issue_7891 issue_7896 test: object_propagation_debug test: undistribute_table test: run_command_on_all_nodes diff --git a/src/test/regress/sql/issue_7896.sql b/src/test/regress/sql/issue_7896.sql index 12d5ff341..679a5373d 100644 --- a/src/test/regress/sql/issue_7896.sql +++ b/src/test/regress/sql/issue_7896.sql @@ -1,45 +1,117 @@ +--------------------------------------------------------------------- +-- Regression Test: Simulate zombie replication slot when +-- citus_rebalance_wait() is canceled. +-- +-- In the buggy behavior, canceling citus_rebalance_wait() +-- (via a short statement_timeout or Ctrl+C) leaves behind an active logical +-- replication slot on a worker. This, in turn, prevents DROP DATABASE +-- (with FORCE) from succeeding. +-- +-- With your fix applied, the underlying rebalance job is canceled, +-- no zombie slot remains, and DROP DATABASE succeeds. +--------------------------------------------------------------------- + +--------------------------------------------------------------------- +-- 1) Create an isolated schema for this test. +--------------------------------------------------------------------- CREATE SCHEMA issue_7896; SET search_path TO issue_7896; --- Create a temporary table to simulate the background job catalog. --- (In production this would be the actual catalog table.) -CREATE TEMP TABLE pg_dist_background_job -( - job_id int8 PRIMARY KEY, - job_state text, - started_at timestamptz, - finished_at timestamptz -); +--------------------------------------------------------------------- +-- 2) Set cluster parameters and initialize environment. +--------------------------------------------------------------------- +-- We assume a coordinator with at least two workers. +-- Set replication factor to 2 and enable repartition joins. +SET citus.shard_replication_factor TO 2; +SET citus.enable_repartition_joins TO ON; +-- For faster background task processing, set a short background task queue interval. +ALTER SYSTEM SET citus.background_task_queue_interval TO '1s'; +SELECT pg_reload_conf(); --- Insert a dummy job record with job_state set to 'running' -INSERT INTO pg_dist_background_job (job_id, job_state, started_at) -VALUES (1001, 'running', now()); +--------------------------------------------------------------------- +-- 3) Create a distributed table. +--------------------------------------------------------------------- +DROP TABLE IF EXISTS t1; +CREATE TABLE t1 (a int PRIMARY KEY); +SELECT create_distributed_table('t1', 'a', shard_count => 4, colocate_with => 'none'); --- Set a short statement timeout so that citus_rebalance_wait times out quickly. -SET statement_timeout = '1000ms'; +--------------------------------------------------------------------- +-- 4) Insert enough data so that a rebalance has measurable work. 
+--------------------------------------------------------------------- +INSERT INTO t1 + SELECT generate_series(1, 1000000); +--------------------------------------------------------------------- +-- 5) Verify that a rebalance on a balanced cluster is a no-op. +--------------------------------------------------------------------- +SELECT 1 FROM citus_rebalance_start(); +-- Expected: NOTICE "No moves available for rebalancing". +SELECT citus_rebalance_wait(); +-- Expected: WARNING "no ongoing rebalance that can be waited on". + +--------------------------------------------------------------------- +-- 6) Force a shard movement so that a rebalance job is scheduled. +-- Remove and re-add a worker using a parameter placeholder. +--------------------------------------------------------------------- +SELECT citus_remove_node('localhost', :worker_2_port); +SELECT citus_add_node('localhost', :worker_2_port); + +--------------------------------------------------------------------- +-- 7) Start a rebalance job that will do actual work. +--------------------------------------------------------------------- +SELECT citus_rebalance_start( + rebalance_strategy := 'by_disk_size', + shard_transfer_mode := 'force_logical' + ); +-- Expected: Notice that moves are scheduled as a background job. +-- (You may verify with: SELECT * FROM citus_rebalance_status();) + +--------------------------------------------------------------------- +-- 8) Attempt to wait on the rebalance with a short timeout so that the wait +-- is canceled. The PG_CATCH block in citus_job_wait_internal should then +-- cancel the underlying job (cleaning up temporary replication slots). +--------------------------------------------------------------------- +SET statement_timeout = '2s'; DO $$ BEGIN BEGIN - -- Call the wait function. - -- Note: The public function citus_rebalance_wait() takes no arguments. + RAISE NOTICE 'Waiting on rebalance with a 2-second timeout...'; + -- Public function citus_rebalance_wait() takes no arguments. PERFORM citus_rebalance_wait(); EXCEPTION WHEN query_canceled THEN - RAISE NOTICE 'Query canceled as expected'; - -- Swallow the error so the transaction continues. + RAISE NOTICE 'Rebalance wait canceled as expected'; + -- Your fix should cancel the underlying rebalance job. END; END; $$ LANGUAGE plpgsql; - --- Reset the statement timeout for subsequent queries. SET statement_timeout = '0'; --- Verify that the job's state has been updated to 'cancelled' --- (the expected outcome after a cancellation). -SELECT job_state -FROM pg_dist_background_job -WHERE job_id = 1001; +--------------------------------------------------------------------- +-- 9) Cleanup orphaned background resources (if any). +--------------------------------------------------------------------- +CALL citus_cleanup_orphaned_resources(); + +--------------------------------------------------------------------- +-- 10) Traverse nodes and check for active replication slots. +-- +-- Connect to the coordinator and worker nodes, then query for replication slots. +-- Expected Outcome (with the fix applied): No active replication slots. +--------------------------------------------------------------------- +\c - - - :master_port +SELECT * FROM pg_replication_slots; + +\c - - - :worker_1_port +SELECT * FROM pg_replication_slots; + +\c - - - :worker_2_port +SELECT * FROM pg_replication_slots; + + +--------------------------------------------------------------------- +-- 11) Cleanup: Drop the test schema. 
+--------------------------------------------------------------------- +\c - - - :master_port +SET search_path TO issue_7896; +DROP SCHEMA IF EXISTS issue_7896 CASCADE; -SET client_min_messages TO WARNING; -DROP SCHEMA issue_7896 CASCADE; From a057fce149380d59ec0f601164756ad3fafa1e84 Mon Sep 17 00:00:00 2001 From: Mehmet Yilmaz Date: Tue, 8 Apr 2025 11:54:03 +0000 Subject: [PATCH 07/11] Remove unused citus-tools subproject and update regression test output for clarity --- citus-tools | 1 - src/test/regress/expected/issue_7896.out | 13 +++++-------- src/test/regress/sql/issue_7896.sql | 12 +++++------- 3 files changed, 10 insertions(+), 16 deletions(-) delete mode 160000 citus-tools diff --git a/citus-tools b/citus-tools deleted file mode 160000 index 3376bd684..000000000 --- a/citus-tools +++ /dev/null @@ -1 +0,0 @@ -Subproject commit 3376bd6845f0614908ed304f5033bd644c82d3bf diff --git a/src/test/regress/expected/issue_7896.out b/src/test/regress/expected/issue_7896.out index cbb1c31dc..23dcc9856 100644 --- a/src/test/regress/expected/issue_7896.out +++ b/src/test/regress/expected/issue_7896.out @@ -7,19 +7,17 @@ -- replication slot on a worker. This, in turn, prevents DROP DATABASE -- (with FORCE) from succeeding. -- --- With your fix applied, the underlying rebalance job is canceled, --- no zombie slot remains, and DROP DATABASE succeeds. +-- With the fix applied, the underlying rebalance job is canceled, +-- no zombie slot remains. --------------------------------------------------------------------- --------------------------------------------------------------------- --- 1) Create an isolated schema for this test. +-- 1) Setup the test environment. --------------------------------------------------------------------- CREATE SCHEMA issue_7896; SET search_path TO issue_7896; --------------------------------------------------------------------- -- 2) Set cluster parameters and initialize environment. --------------------------------------------------------------------- --- We assume a coordinator with at least two workers. --- Set replication factor to 2 and enable repartition joins. SET citus.shard_replication_factor TO 2; SET citus.enable_repartition_joins TO ON; -- For faster background task processing, set a short background task queue interval. @@ -98,7 +96,6 @@ HINT: To monitor progress, run: SELECT * FROM citus_rebalance_status(); (1 row) -- Expected: Notice that moves are scheduled as a background job. --- (You may verify with: SELECT * FROM citus_rebalance_status();) --------------------------------------------------------------------- -- 8) Attempt to wait on the rebalance with a short timeout so that the wait -- is canceled. The PG_CATCH block in citus_job_wait_internal should then @@ -129,7 +126,7 @@ SET statement_timeout = '0'; CALL citus_cleanup_orphaned_resources(); NOTICE: cleaned up 5 orphaned resources --------------------------------------------------------------------- --- 12) Traverse nodes and check for active replication slots. +-- 10) Traverse nodes and check for active replication slots. -- -- Connect to the coordinator and worker nodes, then query for replication slots. -- Expected Outcome (with the fix applied): No active replication slots. 
@@ -157,5 +154,5 @@ SELECT * FROM pg_replication_slots; --------------------------------------------------------------------- \c - - - :master_port SET search_path TO issue_7896; +SET client_min_messages TO WARNING; DROP SCHEMA IF EXISTS issue_7896 CASCADE; -NOTICE: drop cascades to table t1 diff --git a/src/test/regress/sql/issue_7896.sql b/src/test/regress/sql/issue_7896.sql index 679a5373d..2447aba81 100644 --- a/src/test/regress/sql/issue_7896.sql +++ b/src/test/regress/sql/issue_7896.sql @@ -1,5 +1,5 @@ --------------------------------------------------------------------- --- Regression Test: Simulate zombie replication slot when +-- Regression Test: Simulate zombie replication slot when -- citus_rebalance_wait() is canceled. -- -- In the buggy behavior, canceling citus_rebalance_wait() @@ -7,12 +7,12 @@ -- replication slot on a worker. This, in turn, prevents DROP DATABASE -- (with FORCE) from succeeding. -- --- With your fix applied, the underlying rebalance job is canceled, --- no zombie slot remains, and DROP DATABASE succeeds. +-- With the fix applied, the underlying rebalance job is canceled, +-- no zombie slot remains. --------------------------------------------------------------------- --------------------------------------------------------------------- --- 1) Create an isolated schema for this test. +-- 1) Setup the test environment. --------------------------------------------------------------------- CREATE SCHEMA issue_7896; SET search_path TO issue_7896; @@ -20,8 +20,6 @@ SET search_path TO issue_7896; --------------------------------------------------------------------- -- 2) Set cluster parameters and initialize environment. --------------------------------------------------------------------- --- We assume a coordinator with at least two workers. --- Set replication factor to 2 and enable repartition joins. SET citus.shard_replication_factor TO 2; SET citus.enable_repartition_joins TO ON; -- For faster background task processing, set a short background task queue interval. @@ -64,7 +62,6 @@ SELECT citus_rebalance_start( shard_transfer_mode := 'force_logical' ); -- Expected: Notice that moves are scheduled as a background job. 
--- (You may verify with: SELECT * FROM citus_rebalance_status();) --------------------------------------------------------------------- -- 8) Attempt to wait on the rebalance with a short timeout so that the wait @@ -113,5 +110,6 @@ SELECT * FROM pg_replication_slots; --------------------------------------------------------------------- \c - - - :master_port SET search_path TO issue_7896; +SET client_min_messages TO WARNING; DROP SCHEMA IF EXISTS issue_7896 CASCADE; From bc6a19ab42867827cb405e0c966ecc9fc1a3ca87 Mon Sep 17 00:00:00 2001 From: Mehmet Yilmaz Date: Tue, 8 Apr 2025 11:59:03 +0000 Subject: [PATCH 08/11] Update test cases to separate issue_7896 for improved clarity --- src/test/regress/multi_schedule | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/test/regress/multi_schedule b/src/test/regress/multi_schedule index 0a5c6a422..3c81b52dd 100644 --- a/src/test/regress/multi_schedule +++ b/src/test/regress/multi_schedule @@ -104,7 +104,8 @@ test: multi_dropped_column_aliases foreign_key_restriction_enforcement test: binary_protocol test: alter_table_set_access_method test: alter_distributed_table -test: issue_5248 issue_5099 issue_5763 issue_6543 issue_6758 issue_7477 issue_7891 issue_7896 +test: issue_5248 issue_5099 issue_5763 issue_6543 issue_6758 issue_7477 issue_7891 +test: issue_7896 test: object_propagation_debug test: undistribute_table test: run_command_on_all_nodes From 2f7f86530528500a979b4dc90e44b2b22b7a0fe7 Mon Sep 17 00:00:00 2001 From: Mehmet Yilmaz Date: Tue, 8 Apr 2025 12:20:18 +0000 Subject: [PATCH 09/11] Add issue_7896 test cases to regression suite for improved coverage --- src/test/regress/multi_mx_schedule | 1 + src/test/regress/multi_schedule | 2 +- src/test/regress/sql/issue_7896.sql | 1 + 3 files changed, 3 insertions(+), 1 deletion(-) diff --git a/src/test/regress/multi_mx_schedule b/src/test/regress/multi_mx_schedule index 6654b4ab0..af9e57ffc 100644 --- a/src/test/regress/multi_mx_schedule +++ b/src/test/regress/multi_mx_schedule @@ -69,6 +69,7 @@ test: local_shard_execution_dropped_column test: metadata_sync_helpers test: issue_6592 +test: issue_7896 test: executor_local_failure # test that no tests leaked intermediate results. This should always be last diff --git a/src/test/regress/multi_schedule b/src/test/regress/multi_schedule index 3c81b52dd..9af62f014 100644 --- a/src/test/regress/multi_schedule +++ b/src/test/regress/multi_schedule @@ -105,7 +105,7 @@ test: binary_protocol test: alter_table_set_access_method test: alter_distributed_table test: issue_5248 issue_5099 issue_5763 issue_6543 issue_6758 issue_7477 issue_7891 -test: issue_7896 + test: object_propagation_debug test: undistribute_table test: run_command_on_all_nodes diff --git a/src/test/regress/sql/issue_7896.sql b/src/test/regress/sql/issue_7896.sql index 2447aba81..04ae22bd0 100644 --- a/src/test/regress/sql/issue_7896.sql +++ b/src/test/regress/sql/issue_7896.sql @@ -14,6 +14,7 @@ --------------------------------------------------------------------- -- 1) Setup the test environment. 
--------------------------------------------------------------------- +SET citus.next_shard_id TO 17560000; CREATE SCHEMA issue_7896; SET search_path TO issue_7896; From f6eef651f961e73af74bba32acb47c60917e7e9b Mon Sep 17 00:00:00 2001 From: Mehmet Yilmaz Date: Tue, 8 Apr 2025 12:20:43 +0000 Subject: [PATCH 10/11] Remove redundant whitespace in multi_schedule test file for improved readability --- src/test/regress/multi_schedule | 1 - 1 file changed, 1 deletion(-) diff --git a/src/test/regress/multi_schedule b/src/test/regress/multi_schedule index 9af62f014..3d7bd6e98 100644 --- a/src/test/regress/multi_schedule +++ b/src/test/regress/multi_schedule @@ -105,7 +105,6 @@ test: binary_protocol test: alter_table_set_access_method test: alter_distributed_table test: issue_5248 issue_5099 issue_5763 issue_6543 issue_6758 issue_7477 issue_7891 - test: object_propagation_debug test: undistribute_table test: run_command_on_all_nodes From 121142257873493f749a4ac64429bbb1446dd0be Mon Sep 17 00:00:00 2001 From: Mehmet Yilmaz Date: Tue, 8 Apr 2025 12:34:19 +0000 Subject: [PATCH 11/11] Update expected output and SQL scripts for issue_7896 to adjust client_min_messages settings for improved clarity --- src/test/regress/expected/issue_7896.out | 11 ++++------- src/test/regress/sql/issue_7896.sql | 3 +++ 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/src/test/regress/expected/issue_7896.out b/src/test/regress/expected/issue_7896.out index 23dcc9856..7eee1898c 100644 --- a/src/test/regress/expected/issue_7896.out +++ b/src/test/regress/expected/issue_7896.out @@ -13,8 +13,10 @@ --------------------------------------------------------------------- -- 1) Setup the test environment. --------------------------------------------------------------------- +SET citus.next_shard_id TO 17560000; CREATE SCHEMA issue_7896; SET search_path TO issue_7896; +SET client_min_messages TO ERROR; --------------------------------------------------------------------- -- 2) Set cluster parameters and initialize environment. --------------------------------------------------------------------- @@ -32,7 +34,6 @@ SELECT pg_reload_conf(); -- 3) Create a distributed table. --------------------------------------------------------------------- DROP TABLE IF EXISTS t1; -NOTICE: table "t1" does not exist, skipping CREATE TABLE t1 (a int PRIMARY KEY); SELECT create_distributed_table('t1', 'a', shard_count => 4, colocate_with => 'none'); create_distributed_table @@ -49,7 +50,6 @@ INSERT INTO t1 -- 5) Verify that a rebalance on a balanced cluster is a no-op. --------------------------------------------------------------------- SELECT 1 FROM citus_rebalance_start(); -NOTICE: No moves available for rebalancing ?column? --------------------------------------------------------------------- 1 @@ -57,7 +57,6 @@ NOTICE: No moves available for rebalancing -- Expected: NOTICE "No moves available for rebalancing". 
SELECT citus_rebalance_wait(); -WARNING: no ongoing rebalance that can be waited on citus_rebalance_wait --------------------------------------------------------------------- @@ -87,9 +86,6 @@ SELECT citus_rebalance_start( rebalance_strategy := 'by_disk_size', shard_transfer_mode := 'force_logical' ); -NOTICE: Scheduled 2 moves as job xxx -DETAIL: Rebalance scheduled as background job -HINT: To monitor progress, run: SELECT * FROM citus_rebalance_status(); citus_rebalance_start --------------------------------------------------------------------- 1 @@ -102,6 +98,7 @@ HINT: To monitor progress, run: SELECT * FROM citus_rebalance_status(); -- cancel the underlying job (cleaning up temporary replication slots). --------------------------------------------------------------------- SET statement_timeout = '2s'; +SET client_min_messages TO NOTICE; DO $$ BEGIN BEGIN @@ -120,11 +117,11 @@ CONTEXT: PL/pgSQL function inline_code_block line XX at RAISE NOTICE: Rebalance wait canceled as expected CONTEXT: PL/pgSQL function inline_code_block line XX at RAISE SET statement_timeout = '0'; +SET client_min_messages TO ERROR; --------------------------------------------------------------------- -- 9) Cleanup orphaned background resources (if any). --------------------------------------------------------------------- CALL citus_cleanup_orphaned_resources(); -NOTICE: cleaned up 5 orphaned resources --------------------------------------------------------------------- -- 10) Traverse nodes and check for active replication slots. -- diff --git a/src/test/regress/sql/issue_7896.sql b/src/test/regress/sql/issue_7896.sql index 04ae22bd0..c003c352c 100644 --- a/src/test/regress/sql/issue_7896.sql +++ b/src/test/regress/sql/issue_7896.sql @@ -17,6 +17,7 @@ SET citus.next_shard_id TO 17560000; CREATE SCHEMA issue_7896; SET search_path TO issue_7896; +SET client_min_messages TO ERROR; --------------------------------------------------------------------- -- 2) Set cluster parameters and initialize environment. @@ -70,6 +71,7 @@ SELECT citus_rebalance_start( -- cancel the underlying job (cleaning up temporary replication slots). --------------------------------------------------------------------- SET statement_timeout = '2s'; +SET client_min_messages TO NOTICE; DO $$ BEGIN BEGIN @@ -84,6 +86,7 @@ BEGIN END; $$ LANGUAGE plpgsql; SET statement_timeout = '0'; +SET client_min_messages TO ERROR; --------------------------------------------------------------------- -- 9) Cleanup orphaned background resources (if any).
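
With the last few patches the test pins shard ids, silences noisy NOTICE/WARNING output via client_min_messages except around the cancellation itself, and runs from the MX schedule, so its expected output stays stable across runs. A condensed manual check along the same lines — assuming a cluster where a rebalance job is actually in flight, and using the same psql variables as the regression suite:

-- Cancel the wait via a short timeout; with the fix this cancels the job too.
SET statement_timeout = '2s';
SELECT citus_rebalance_wait();  -- fails with "canceling statement due to statement timeout"
SET statement_timeout = '0';

-- The background job should no longer be reported as running.
SELECT * FROM citus_rebalance_status();

-- And no zombie logical replication slot should survive on the workers.
\c - - - :worker_1_port
SELECT slot_name, active FROM pg_replication_slots;  -- expect zero rows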