diff --git a/src/backend/distributed/utils/background_jobs.c b/src/backend/distributed/utils/background_jobs.c
index 911880dc7..e00a7a85b 100644
--- a/src/backend/distributed/utils/background_jobs.c
+++ b/src/backend/distributed/utils/background_jobs.c
@@ -244,85 +244,110 @@ citus_task_wait(PG_FUNCTION_ARGS)
 /*
- * citus_job_wait_internal implements the waiting on a job for reuse in other areas where
- * we want to wait on jobs. eg the background rebalancer.
+ * citus_job_wait_internal implements waiting on a job, e.g. for the background
+ * rebalancer. If desiredStatus is provided, we throw an error if we reach a
+ * different terminal state that can never transition to the desired state.
  *
- * When a desiredStatus is provided it will provide an error when a different state is
- * reached and the state cannot ever reach the desired state anymore.
+ * With the PG_TRY/PG_CATCH block, if the user cancels this SQL statement
+ * (Ctrl+C, statement_timeout, etc.), we cancel the job in progress so that
+ * it doesn't remain running in the background.
  */
 void
 citus_job_wait_internal(int64 jobid, BackgroundJobStatus *desiredStatus)
 {
-	/*
-	 * Since we are wait polling we will actually allocate memory on every poll. To make
-	 * sure we don't put unneeded pressure on the memory we create a context that we clear
-	 * every iteration.
-	 */
-	MemoryContext waitContext = AllocSetContextCreate(CurrentMemoryContext,
-	                                                  "JobsWaitContext",
-	                                                  ALLOCSET_DEFAULT_MINSIZE,
-	                                                  ALLOCSET_DEFAULT_INITSIZE,
-	                                                  ALLOCSET_DEFAULT_MAXSIZE);
-	MemoryContext oldContext = MemoryContextSwitchTo(waitContext);
-
-	while (true)
+	PG_TRY();
 	{
-		MemoryContextReset(waitContext);
+		/*
+		 * Since we are wait polling, we actually allocate memory on every poll. To avoid
+		 * putting unneeded pressure on memory, we create a context that we reset
+		 * every iteration.
+		 */
+		MemoryContext waitContext = AllocSetContextCreate(CurrentMemoryContext,
+		                                                  "JobsWaitContext",
+		                                                  ALLOCSET_DEFAULT_MINSIZE,
+		                                                  ALLOCSET_DEFAULT_INITSIZE,
+		                                                  ALLOCSET_DEFAULT_MAXSIZE);
+		MemoryContext oldContext = MemoryContextSwitchTo(waitContext);
 
-		BackgroundJob *job = GetBackgroundJobByJobId(jobid);
-		if (!job)
+		while (true)
 		{
-			ereport(ERROR, (errmsg("no job found for job with jobid: %ld", jobid)));
-		}
+			MemoryContextReset(waitContext);
 
-		if (desiredStatus && job->state == *desiredStatus)
-		{
-			/* job has reached its desired status, done waiting */
-			break;
-		}
-
-		if (IsBackgroundJobStatusTerminal(job->state))
-		{
-			if (desiredStatus)
+			BackgroundJob *job = GetBackgroundJobByJobId(jobid);
+			if (!job)
 			{
-				/*
-				 * We have reached a terminal state, which is not the desired state we
-				 * were waiting for, otherwise we would have escaped earlier. Since it is
-				 * a terminal state we know that we can never reach the desired state.
-				 */
-
-				Oid reachedStatusOid = BackgroundJobStatusOid(job->state);
-				Datum reachedStatusNameDatum = DirectFunctionCall1(enum_out,
-				                                                   reachedStatusOid);
-				char *reachedStatusName = DatumGetCString(reachedStatusNameDatum);
-
-				Oid desiredStatusOid = BackgroundJobStatusOid(*desiredStatus);
-				Datum desiredStatusNameDatum = DirectFunctionCall1(enum_out,
-				                                                   desiredStatusOid);
-				char *desiredStatusName = DatumGetCString(desiredStatusNameDatum);
-				ereport(ERROR,
-				        (errmsg("Job reached terminal state \"%s\" instead of desired "
-				                "state \"%s\"", reachedStatusName, desiredStatusName)));
+				ereport(ERROR,
+				        (errmsg("no job found for job with jobid: %ld", jobid)));
 			}
 
-			/* job has reached its terminal state, done waiting */
-			break;
+			/* If we have a desiredStatus and we've reached it, we're done. */
+			if (desiredStatus && job->state == *desiredStatus)
+			{
+				break;
+			}
+
+			/*
+			 * If the job is in a terminal state (e.g. SUCCEEDED, FAILED, or CANCELED)
+			 * but not the desired state, throw an error or stop waiting.
+			 */
+			if (IsBackgroundJobStatusTerminal(job->state))
+			{
+				if (desiredStatus)
+				{
+					Oid reachedStatusOid = BackgroundJobStatusOid(job->state);
+					Datum reachedStatusNameDatum = DirectFunctionCall1(enum_out,
+					                                                   reachedStatusOid);
+					char *reachedStatusName = DatumGetCString(reachedStatusNameDatum);
+
+					Oid desiredStatusOid = BackgroundJobStatusOid(*desiredStatus);
+					Datum desiredStatusNameDatum = DirectFunctionCall1(enum_out,
+					                                                   desiredStatusOid);
+					char *desiredStatusName = DatumGetCString(desiredStatusNameDatum);
+
+					ereport(ERROR,
+					        (errmsg(
+					            "Job reached terminal state \"%s\" instead of desired "
+					            "state \"%s\"", reachedStatusName,
+					            desiredStatusName)));
+				}
+
+				/* Otherwise, if no desiredStatus was given, we accept this terminal state. */
+				break;
+			}
+
+			/* Before sleeping, check for user interrupts (Ctrl+C, statement_timeout, etc.). */
+			CHECK_FOR_INTERRUPTS();
+
+			/* Sleep 1 second before re-checking the job status. */
+			const long delay_ms = 1000;
+			(void) WaitLatch(MyLatch,
+			                 WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
+			                 delay_ms,
+			                 WAIT_EVENT_PG_SLEEP);
+
+			ResetLatch(MyLatch);
 		}
 
-		/* sleep for a while, before rechecking the job status */
-		CHECK_FOR_INTERRUPTS();
-		const long delay_ms = 1000;
-		(void) WaitLatch(MyLatch,
-		                 WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
-		                 delay_ms,
-		                 WAIT_EVENT_PG_SLEEP);
-
-		ResetLatch(MyLatch);
+		MemoryContextSwitchTo(oldContext);
+		MemoryContextDelete(waitContext);
 	}
+	PG_CATCH();
+	{
+		/*
+		 * If we get here, the user canceled the statement or an ERROR occurred.
+		 * We forcibly cancel the job so that it doesn't remain running in the
+		 * background. This ensures no "zombie" shard moves or leftover
+		 * replication slots remain.
+		 */
 
-	MemoryContextSwitchTo(oldContext);
-	MemoryContextDelete(waitContext);
+		/* Switch out of the waitContext so we can safely do cleanup in TopMemoryContext. */
+		MemoryContextSwitchTo(TopMemoryContext);
+
+		/* Attempt to cancel the job; if it's already in a terminal state, that's okay. */
+		(void) DirectFunctionCall1(citus_job_cancel, Int64GetDatum(jobid));
+
+		/*
+		 * Re-throw the original error so Postgres knows this statement was canceled.
+		 */
+		PG_RE_THROW();
+	}
+	PG_END_TRY();
 }
diff --git a/src/test/regress/expected/issue_7896.out b/src/test/regress/expected/issue_7896.out
new file mode 100644
index 000000000..7eee1898c
--- /dev/null
+++ b/src/test/regress/expected/issue_7896.out
@@ -0,0 +1,155 @@
+---------------------------------------------------------------------
+-- Regression Test: Simulate zombie replication slot when
+-- citus_rebalance_wait() is canceled.
+--
+-- In the buggy behavior, canceling citus_rebalance_wait()
+-- (via a short statement_timeout or Ctrl+C) leaves behind an active logical
+-- replication slot on a worker. This, in turn, prevents DROP DATABASE
+-- (with FORCE) from succeeding.
+--
+-- With the fix applied, the underlying rebalance job is canceled and
+-- no zombie slot remains.
+---------------------------------------------------------------------
+---------------------------------------------------------------------
+-- 1) Set up the test environment.
+---------------------------------------------------------------------
+SET citus.next_shard_id TO 17560000;
+CREATE SCHEMA issue_7896;
+SET search_path TO issue_7896;
+SET client_min_messages TO ERROR;
+---------------------------------------------------------------------
+-- 2) Set cluster parameters and initialize the environment.
+---------------------------------------------------------------------
+SET citus.shard_replication_factor TO 2;
+SET citus.enable_repartition_joins TO ON;
+-- For faster background task processing, set a short background task queue interval.
+ALTER SYSTEM SET citus.background_task_queue_interval TO '1s';
+SELECT pg_reload_conf();
+ pg_reload_conf
+---------------------------------------------------------------------
+ t
+(1 row)
+
+---------------------------------------------------------------------
+-- 3) Create a distributed table.
+---------------------------------------------------------------------
+DROP TABLE IF EXISTS t1;
+CREATE TABLE t1 (a int PRIMARY KEY);
+SELECT create_distributed_table('t1', 'a', shard_count => 4, colocate_with => 'none');
+ create_distributed_table
+---------------------------------------------------------------------
+
+(1 row)
+
+---------------------------------------------------------------------
+-- 4) Insert enough data so that a rebalance has measurable work.
+---------------------------------------------------------------------
+INSERT INTO t1
+  SELECT generate_series(1, 1000000);
+---------------------------------------------------------------------
+-- 5) Verify that a rebalance on a balanced cluster is a no-op.
+---------------------------------------------------------------------
+SELECT 1 FROM citus_rebalance_start();
+ ?column?
+---------------------------------------------------------------------
+        1
+(1 row)
+
+-- Expected: NOTICE "No moves available for rebalancing".
+SELECT citus_rebalance_wait();
+ citus_rebalance_wait
+---------------------------------------------------------------------
+
+(1 row)
+
+-- Expected: WARNING "no ongoing rebalance that can be waited on".
+---------------------------------------------------------------------
+-- 6) Force a shard movement so that a rebalance job is scheduled.
+-- Remove and re-add a worker using the :worker_2_port placeholder.
+---------------------------------------------------------------------
+SELECT citus_remove_node('localhost', :worker_2_port);
+ citus_remove_node
+---------------------------------------------------------------------
+
+(1 row)
+
+SELECT citus_add_node('localhost', :worker_2_port);
+ citus_add_node
+---------------------------------------------------------------------
+             30
+(1 row)
+
+---------------------------------------------------------------------
+-- 7) Start a rebalance job that will do actual work.
+---------------------------------------------------------------------
+SELECT citus_rebalance_start(
+    rebalance_strategy := 'by_disk_size',
+    shard_transfer_mode := 'force_logical'
+  );
+ citus_rebalance_start
+---------------------------------------------------------------------
+                     1
+(1 row)
+
+-- Expected: Notice that moves are scheduled as a background job.
+---------------------------------------------------------------------
+-- 8) Attempt to wait on the rebalance with a short timeout so that the wait
+-- is canceled. The PG_CATCH block in citus_job_wait_internal should then
+-- cancel the underlying job (cleaning up temporary replication slots).
+---------------------------------------------------------------------
+SET statement_timeout = '2s';
+SET client_min_messages TO NOTICE;
+DO $$
+BEGIN
+    BEGIN
+        RAISE NOTICE 'Waiting on rebalance with a 2-second timeout...';
+        -- Public function citus_rebalance_wait() takes no arguments.
+        PERFORM citus_rebalance_wait();
+    EXCEPTION
+        WHEN query_canceled THEN
+            RAISE NOTICE 'Rebalance wait canceled as expected';
+            -- The fix should cancel the underlying rebalance job.
+    END;
+END;
+$$ LANGUAGE plpgsql;
+NOTICE:  Waiting on rebalance with a 2-second timeout...
+CONTEXT:  PL/pgSQL function inline_code_block line XX at RAISE
+NOTICE:  Rebalance wait canceled as expected
+CONTEXT:  PL/pgSQL function inline_code_block line XX at RAISE
+SET statement_timeout = '0';
+SET client_min_messages TO ERROR;
+---------------------------------------------------------------------
+-- 9) Clean up orphaned background resources (if any).
+---------------------------------------------------------------------
+CALL citus_cleanup_orphaned_resources();
+---------------------------------------------------------------------
+-- 10) Traverse nodes and check for active replication slots.
+--
+-- Connect to the coordinator and worker nodes, then query for replication slots.
+-- Expected Outcome (with the fix applied): No active replication slots.
+---------------------------------------------------------------------
+\c - - - :master_port
+SELECT * FROM pg_replication_slots;
+ slot_name | plugin | slot_type | datoid | database | temporary | active | active_pid | xmin | catalog_xmin | restart_lsn | confirmed_flush_lsn | wal_status | safe_wal_size | two_phase | inactive_since | conflicting | invalidation_reason | failover | synced
+---------------------------------------------------------------------
+(0 rows)
+
+\c - - - :worker_1_port
+SELECT * FROM pg_replication_slots;
+ slot_name | plugin | slot_type | datoid | database | temporary | active | active_pid | xmin | catalog_xmin | restart_lsn | confirmed_flush_lsn | wal_status | safe_wal_size | two_phase | inactive_since | conflicting | invalidation_reason | failover | synced
+---------------------------------------------------------------------
+(0 rows)
+
+\c - - - :worker_2_port
+SELECT * FROM pg_replication_slots;
+ slot_name | plugin | slot_type | datoid | database | temporary | active | active_pid | xmin | catalog_xmin | restart_lsn | confirmed_flush_lsn | wal_status | safe_wal_size | two_phase | inactive_since | conflicting | invalidation_reason | failover | synced
+---------------------------------------------------------------------
+(0 rows)
+
+---------------------------------------------------------------------
+-- 11) Cleanup: Drop the test schema.
+---------------------------------------------------------------------
+\c - - - :master_port
+SET search_path TO issue_7896;
+SET client_min_messages TO WARNING;
+DROP SCHEMA IF EXISTS issue_7896 CASCADE;
diff --git a/src/test/regress/multi_mx_schedule b/src/test/regress/multi_mx_schedule
index 6654b4ab0..af9e57ffc 100644
--- a/src/test/regress/multi_mx_schedule
+++ b/src/test/regress/multi_mx_schedule
@@ -69,6 +69,7 @@ test: local_shard_execution_dropped_column
 test: metadata_sync_helpers
 test: issue_6592
+test: issue_7896
 test: executor_local_failure
 
 # test that no tests leaked intermediate results. This should always be last
diff --git a/src/test/regress/sql/issue_7896.sql b/src/test/regress/sql/issue_7896.sql
new file mode 100644
index 000000000..c003c352c
--- /dev/null
+++ b/src/test/regress/sql/issue_7896.sql
@@ -0,0 +1,119 @@
+---------------------------------------------------------------------
+-- Regression Test: Simulate zombie replication slot when
+-- citus_rebalance_wait() is canceled.
+--
+-- In the buggy behavior, canceling citus_rebalance_wait()
+-- (via a short statement_timeout or Ctrl+C) leaves behind an active logical
+-- replication slot on a worker. This, in turn, prevents DROP DATABASE
+-- (with FORCE) from succeeding.
+--
+-- With the fix applied, the underlying rebalance job is canceled and
+-- no zombie slot remains.
+---------------------------------------------------------------------
+
+---------------------------------------------------------------------
+-- 1) Set up the test environment.
+---------------------------------------------------------------------
+SET citus.next_shard_id TO 17560000;
+CREATE SCHEMA issue_7896;
+SET search_path TO issue_7896;
+SET client_min_messages TO ERROR;
+
+---------------------------------------------------------------------
+-- 2) Set cluster parameters and initialize the environment.
+---------------------------------------------------------------------
+SET citus.shard_replication_factor TO 2;
+SET citus.enable_repartition_joins TO ON;
+-- For faster background task processing, set a short background task queue interval.
+ALTER SYSTEM SET citus.background_task_queue_interval TO '1s';
+SELECT pg_reload_conf();
+
+---------------------------------------------------------------------
+-- 3) Create a distributed table.
+---------------------------------------------------------------------
+DROP TABLE IF EXISTS t1;
+CREATE TABLE t1 (a int PRIMARY KEY);
+SELECT create_distributed_table('t1', 'a', shard_count => 4, colocate_with => 'none');
+
+---------------------------------------------------------------------
+-- 4) Insert enough data so that a rebalance has measurable work.
+---------------------------------------------------------------------
+INSERT INTO t1
+  SELECT generate_series(1, 1000000);
+
+---------------------------------------------------------------------
+-- 5) Verify that a rebalance on a balanced cluster is a no-op.
+---------------------------------------------------------------------
+SELECT 1 FROM citus_rebalance_start();
+-- Expected: NOTICE "No moves available for rebalancing".
+SELECT citus_rebalance_wait();
+-- Expected: WARNING "no ongoing rebalance that can be waited on".
+
+---------------------------------------------------------------------
+-- 6) Force a shard movement so that a rebalance job is scheduled.
+-- Remove and re-add a worker using the :worker_2_port placeholder.
+---------------------------------------------------------------------
+SELECT citus_remove_node('localhost', :worker_2_port);
+SELECT citus_add_node('localhost', :worker_2_port);
+
+---------------------------------------------------------------------
+-- 7) Start a rebalance job that will do actual work.
+---------------------------------------------------------------------
+SELECT citus_rebalance_start(
+    rebalance_strategy := 'by_disk_size',
+    shard_transfer_mode := 'force_logical'
+  );
+-- Expected: Notice that moves are scheduled as a background job.
+
+---------------------------------------------------------------------
+-- 8) Attempt to wait on the rebalance with a short timeout so that the wait
+-- is canceled. The PG_CATCH block in citus_job_wait_internal should then
+-- cancel the underlying job (cleaning up temporary replication slots).
+---------------------------------------------------------------------
+SET statement_timeout = '2s';
+SET client_min_messages TO NOTICE;
+DO $$
+BEGIN
+    BEGIN
+        RAISE NOTICE 'Waiting on rebalance with a 2-second timeout...';
+        -- Public function citus_rebalance_wait() takes no arguments.
+        PERFORM citus_rebalance_wait();
+    EXCEPTION
+        WHEN query_canceled THEN
+            RAISE NOTICE 'Rebalance wait canceled as expected';
+            -- The fix should cancel the underlying rebalance job.
+    END;
+END;
+$$ LANGUAGE plpgsql;
+SET statement_timeout = '0';
+SET client_min_messages TO ERROR;
+
+---------------------------------------------------------------------
+-- 9) Clean up orphaned background resources (if any).
+---------------------------------------------------------------------
+CALL citus_cleanup_orphaned_resources();
+
+---------------------------------------------------------------------
+-- 10) Traverse nodes and check for active replication slots.
+--
+-- Connect to the coordinator and worker nodes, then query for replication slots.
+-- Expected Outcome (with the fix applied): No active replication slots.
+---------------------------------------------------------------------
+\c - - - :master_port
+SELECT * FROM pg_replication_slots;
+
+\c - - - :worker_1_port
+SELECT * FROM pg_replication_slots;
+
+\c - - - :worker_2_port
+SELECT * FROM pg_replication_slots;
+
+
+---------------------------------------------------------------------
+-- 11) Cleanup: Drop the test schema.
+---------------------------------------------------------------------
+\c - - - :master_port
+SET search_path TO issue_7896;
+SET client_min_messages TO WARNING;
+DROP SCHEMA IF EXISTS issue_7896 CASCADE;
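
Possible follow-up for the test, not part of the patch above: besides verifying that no replication slots are left behind, the test could also assert that the canceled wait leaves no background job still running. The sketch below is a minimal, hedged example in SQL; it assumes the pg_dist_background_job catalog and its state column as shipped with recent Citus releases, and if it were added to issue_7896.sql a matching block would also be needed in issue_7896.out.

-- Hypothetical extra assertion (not included in the patch above): after the
-- canceled citus_rebalance_wait() and citus_cleanup_orphaned_resources(),
-- no background rebalance job should still be reported as running.
SELECT count(*) AS running_jobs
FROM pg_dist_background_job
WHERE state = 'running';

With the PG_CATCH path canceling the job, this query would be expected to return 0.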