diff --git a/src/test/regress/expected/issue_7896.out b/src/test/regress/expected/issue_7896.out index 09f19c94a..cbb1c31dc 100644 --- a/src/test/regress/expected/issue_7896.out +++ b/src/test/regress/expected/issue_7896.out @@ -1,46 +1,161 @@ +--------------------------------------------------------------------- +-- Regression Test: Simulate zombie replication slot when +-- citus_rebalance_wait() is canceled. +-- +-- In the buggy behavior, canceling citus_rebalance_wait() +-- (via a short statement_timeout or Ctrl+C) leaves behind an active logical +-- replication slot on a worker. This, in turn, prevents DROP DATABASE +-- (with FORCE) from succeeding. +-- +-- With your fix applied, the underlying rebalance job is canceled, +-- no zombie slot remains, and DROP DATABASE succeeds. +--------------------------------------------------------------------- +--------------------------------------------------------------------- +-- 1) Create an isolated schema for this test. +--------------------------------------------------------------------- CREATE SCHEMA issue_7896; SET search_path TO issue_7896; --- Create a temporary table to simulate the background job catalog. --- (In production this would be the actual catalog table.) -CREATE TEMP TABLE pg_dist_background_job -( - job_id int8 PRIMARY KEY, - job_state text, - started_at timestamptz, - finished_at timestamptz -); --- Insert a dummy job record with job_state set to 'running' -INSERT INTO pg_dist_background_job (job_id, job_state, started_at) -VALUES (1001, 'running', now()); --- Set a short statement timeout so that citus_rebalance_wait times out quickly. -SET statement_timeout = '1000ms'; +--------------------------------------------------------------------- +-- 2) Set cluster parameters and initialize environment. +--------------------------------------------------------------------- +-- We assume a coordinator with at least two workers. +-- Set replication factor to 2 and enable repartition joins. 
+SET citus.shard_replication_factor TO 2; +SET citus.enable_repartition_joins TO ON; +-- For faster background task processing, set a short background task queue interval. +ALTER SYSTEM SET citus.background_task_queue_interval TO '1s'; +SELECT pg_reload_conf(); + pg_reload_conf +--------------------------------------------------------------------- + t +(1 row) + +--------------------------------------------------------------------- +-- 3) Create a distributed table. +--------------------------------------------------------------------- +DROP TABLE IF EXISTS t1; +NOTICE: table "t1" does not exist, skipping +CREATE TABLE t1 (a int PRIMARY KEY); +SELECT create_distributed_table('t1', 'a', shard_count => 4, colocate_with => 'none'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +--------------------------------------------------------------------- +-- 4) Insert enough data so that a rebalance has measurable work. +--------------------------------------------------------------------- +INSERT INTO t1 + SELECT generate_series(1, 1000000); +--------------------------------------------------------------------- +-- 5) Verify that a rebalance on a balanced cluster is a no-op. +--------------------------------------------------------------------- +SELECT 1 FROM citus_rebalance_start(); +NOTICE: No moves available for rebalancing + ?column? +--------------------------------------------------------------------- + 1 +(1 row) + +-- Expected: NOTICE "No moves available for rebalancing". +SELECT citus_rebalance_wait(); +WARNING: no ongoing rebalance that can be waited on + citus_rebalance_wait +--------------------------------------------------------------------- + +(1 row) + +-- Expected: WARNING "no ongoing rebalance that can be waited on". +--------------------------------------------------------------------- +-- 6) Force a shard movement so that a rebalance job is scheduled. 
+-- Remove and re-add a worker using a parameter placeholder. +--------------------------------------------------------------------- +SELECT citus_remove_node('localhost', :worker_2_port); + citus_remove_node +--------------------------------------------------------------------- + +(1 row) + +SELECT citus_add_node('localhost', :worker_2_port); + citus_add_node +--------------------------------------------------------------------- + 30 +(1 row) + +--------------------------------------------------------------------- +-- 7) Start a rebalance job that will do actual work. +--------------------------------------------------------------------- +SELECT citus_rebalance_start( + rebalance_strategy := 'by_disk_size', + shard_transfer_mode := 'force_logical' + ); +NOTICE: Scheduled 2 moves as job xxx +DETAIL: Rebalance scheduled as background job +HINT: To monitor progress, run: SELECT * FROM citus_rebalance_status(); + citus_rebalance_start +--------------------------------------------------------------------- + 1 +(1 row) + +-- Expected: Notice that moves are scheduled as a background job. +-- (You may verify with: SELECT * FROM citus_rebalance_status();) +--------------------------------------------------------------------- +-- 8) Attempt to wait on the rebalance with a short timeout so that the wait +-- is canceled. The PG_CATCH block in citus_job_wait_internal should then +-- cancel the underlying job (cleaning up temporary replication slots). +--------------------------------------------------------------------- +SET statement_timeout = '2s'; DO $$ BEGIN BEGIN - -- Call the wait function. - -- Note: The public function citus_rebalance_wait() takes no arguments. + RAISE NOTICE 'Waiting on rebalance with a 2-second timeout...'; + -- Public function citus_rebalance_wait() takes no arguments. PERFORM citus_rebalance_wait(); EXCEPTION WHEN query_canceled THEN - RAISE NOTICE 'Query canceled as expected'; - -- Swallow the error so the transaction continues. 
+      RAISE NOTICE 'Rebalance wait canceled as expected'; +      -- Your fix should cancel the underlying rebalance job. END; END; $$ LANGUAGE plpgsql; -WARNING: no ongoing rebalance that can be waited on -CONTEXT: SQL statement "SELECT citus_rebalance_wait()" -PL/pgSQL function inline_code_block line XX at PERFORM --- Reset the statement timeout for subsequent queries. +NOTICE: Waiting on rebalance with a 2-second timeout... +CONTEXT: PL/pgSQL function inline_code_block line XX at RAISE +NOTICE: Rebalance wait canceled as expected +CONTEXT: PL/pgSQL function inline_code_block line XX at RAISE SET statement_timeout = '0'; --- Verify that the job's state has been updated to 'cancelled' --- (the expected outcome after a cancellation). -SELECT job_state -FROM pg_dist_background_job -WHERE job_id = 1001; - job_state --------------------------------------------------------------------- - running -(1 row) +-- 9) Cleanup orphaned background resources (if any). +--------------------------------------------------------------------- +CALL citus_cleanup_orphaned_resources(); +NOTICE: cleaned up 5 orphaned resources +--------------------------------------------------------------------- +-- 10) Traverse nodes and check for active replication slots. +-- +-- Connect to the coordinator and worker nodes, then query for replication slots. +-- Expected Outcome (with the fix applied): No active replication slots. 
+--------------------------------------------------------------------- +\c - - - :master_port +SELECT * FROM pg_replication_slots; + slot_name | plugin | slot_type | datoid | database | temporary | active | active_pid | xmin | catalog_xmin | restart_lsn | confirmed_flush_lsn | wal_status | safe_wal_size | two_phase | inactive_since | conflicting | invalidation_reason | failover | synced +--------------------------------------------------------------------- +(0 rows) -SET client_min_messages TO WARNING; -DROP SCHEMA issue_7896 CASCADE; +\c - - - :worker_1_port +SELECT * FROM pg_replication_slots; + slot_name | plugin | slot_type | datoid | database | temporary | active | active_pid | xmin | catalog_xmin | restart_lsn | confirmed_flush_lsn | wal_status | safe_wal_size | two_phase | inactive_since | conflicting | invalidation_reason | failover | synced +--------------------------------------------------------------------- +(0 rows) + +\c - - - :worker_2_port +SELECT * FROM pg_replication_slots; + slot_name | plugin | slot_type | datoid | database | temporary | active | active_pid | xmin | catalog_xmin | restart_lsn | confirmed_flush_lsn | wal_status | safe_wal_size | two_phase | inactive_since | conflicting | invalidation_reason | failover | synced +--------------------------------------------------------------------- +(0 rows) + +--------------------------------------------------------------------- +-- 11) Cleanup: Drop the test schema. 
+--------------------------------------------------------------------- +\c - - - :master_port +SET search_path TO issue_7896; +DROP SCHEMA IF EXISTS issue_7896 CASCADE; +NOTICE: drop cascades to table t1 diff --git a/src/test/regress/multi_schedule b/src/test/regress/multi_schedule index 3d7bd6e98..0a5c6a422 100644 --- a/src/test/regress/multi_schedule +++ b/src/test/regress/multi_schedule @@ -104,7 +104,7 @@ test: multi_dropped_column_aliases foreign_key_restriction_enforcement test: binary_protocol test: alter_table_set_access_method test: alter_distributed_table -test: issue_5248 issue_5099 issue_5763 issue_6543 issue_6758 issue_7477 issue_7891 +test: issue_5248 issue_5099 issue_5763 issue_6543 issue_6758 issue_7477 issue_7891 issue_7896 test: object_propagation_debug test: undistribute_table test: run_command_on_all_nodes diff --git a/src/test/regress/sql/issue_7896.sql b/src/test/regress/sql/issue_7896.sql index 12d5ff341..679a5373d 100644 --- a/src/test/regress/sql/issue_7896.sql +++ b/src/test/regress/sql/issue_7896.sql @@ -1,45 +1,117 @@ +--------------------------------------------------------------------- +-- Regression Test: Simulate zombie replication slot when +-- citus_rebalance_wait() is canceled. +-- +-- In the buggy behavior, canceling citus_rebalance_wait() +-- (via a short statement_timeout or Ctrl+C) leaves behind an active logical +-- replication slot on a worker. This, in turn, prevents DROP DATABASE +-- (with FORCE) from succeeding. +-- +-- With your fix applied, the underlying rebalance job is canceled, +-- no zombie slot remains, and DROP DATABASE succeeds. +--------------------------------------------------------------------- + +--------------------------------------------------------------------- +-- 1) Create an isolated schema for this test. +--------------------------------------------------------------------- CREATE SCHEMA issue_7896; SET search_path TO issue_7896; --- Create a temporary table to simulate the background job catalog. 
--- (In production this would be the actual catalog table.) -CREATE TEMP TABLE pg_dist_background_job -( - job_id int8 PRIMARY KEY, - job_state text, - started_at timestamptz, - finished_at timestamptz -); +--------------------------------------------------------------------- +-- 2) Set cluster parameters and initialize environment. +--------------------------------------------------------------------- +-- We assume a coordinator with at least two workers. +-- Set replication factor to 2 and enable repartition joins. +SET citus.shard_replication_factor TO 2; +SET citus.enable_repartition_joins TO ON; +-- For faster background task processing, set a short background task queue interval. +ALTER SYSTEM SET citus.background_task_queue_interval TO '1s'; +SELECT pg_reload_conf(); --- Insert a dummy job record with job_state set to 'running' -INSERT INTO pg_dist_background_job (job_id, job_state, started_at) -VALUES (1001, 'running', now()); +--------------------------------------------------------------------- +-- 3) Create a distributed table. +--------------------------------------------------------------------- +DROP TABLE IF EXISTS t1; +CREATE TABLE t1 (a int PRIMARY KEY); +SELECT create_distributed_table('t1', 'a', shard_count => 4, colocate_with => 'none'); --- Set a short statement timeout so that citus_rebalance_wait times out quickly. -SET statement_timeout = '1000ms'; +--------------------------------------------------------------------- +-- 4) Insert enough data so that a rebalance has measurable work. +--------------------------------------------------------------------- +INSERT INTO t1 + SELECT generate_series(1, 1000000); +--------------------------------------------------------------------- +-- 5) Verify that a rebalance on a balanced cluster is a no-op. +--------------------------------------------------------------------- +SELECT 1 FROM citus_rebalance_start(); +-- Expected: NOTICE "No moves available for rebalancing". 
+SELECT citus_rebalance_wait(); +-- Expected: WARNING "no ongoing rebalance that can be waited on". + +--------------------------------------------------------------------- +-- 6) Force a shard movement so that a rebalance job is scheduled. +-- Remove and re-add a worker using a parameter placeholder. +--------------------------------------------------------------------- +SELECT citus_remove_node('localhost', :worker_2_port); +SELECT citus_add_node('localhost', :worker_2_port); + +--------------------------------------------------------------------- +-- 7) Start a rebalance job that will do actual work. +--------------------------------------------------------------------- +SELECT citus_rebalance_start( + rebalance_strategy := 'by_disk_size', + shard_transfer_mode := 'force_logical' + ); +-- Expected: Notice that moves are scheduled as a background job. +-- (You may verify with: SELECT * FROM citus_rebalance_status();) + +--------------------------------------------------------------------- +-- 8) Attempt to wait on the rebalance with a short timeout so that the wait +-- is canceled. The PG_CATCH block in citus_job_wait_internal should then +-- cancel the underlying job (cleaning up temporary replication slots). +--------------------------------------------------------------------- +SET statement_timeout = '2s'; DO $$ BEGIN BEGIN - -- Call the wait function. - -- Note: The public function citus_rebalance_wait() takes no arguments. + RAISE NOTICE 'Waiting on rebalance with a 2-second timeout...'; + -- Public function citus_rebalance_wait() takes no arguments. PERFORM citus_rebalance_wait(); EXCEPTION WHEN query_canceled THEN - RAISE NOTICE 'Query canceled as expected'; - -- Swallow the error so the transaction continues. + RAISE NOTICE 'Rebalance wait canceled as expected'; + -- Your fix should cancel the underlying rebalance job. END; END; $$ LANGUAGE plpgsql; - --- Reset the statement timeout for subsequent queries. 
SET statement_timeout = '0'; --- Verify that the job's state has been updated to 'cancelled' --- (the expected outcome after a cancellation). -SELECT job_state -FROM pg_dist_background_job -WHERE job_id = 1001; +--------------------------------------------------------------------- +-- 9) Cleanup orphaned background resources (if any). +--------------------------------------------------------------------- +CALL citus_cleanup_orphaned_resources(); + +--------------------------------------------------------------------- +-- 10) Traverse nodes and check for active replication slots. +-- +-- Connect to the coordinator and worker nodes, then query for replication slots. +-- Expected Outcome (with the fix applied): No active replication slots. +--------------------------------------------------------------------- +\c - - - :master_port +SELECT * FROM pg_replication_slots; + +\c - - - :worker_1_port +SELECT * FROM pg_replication_slots; + +\c - - - :worker_2_port +SELECT * FROM pg_replication_slots; + + +--------------------------------------------------------------------- +-- 11) Cleanup: Drop the test schema. +--------------------------------------------------------------------- +\c - - - :master_port +SET search_path TO issue_7896; +DROP SCHEMA IF EXISTS issue_7896 CASCADE; -SET client_min_messages TO WARNING; -DROP SCHEMA issue_7896 CASCADE;