mirror of https://github.com/citusdata/citus.git
Add regression test for zombie replication slot cleanup during job cancellation
parent
5b6b7b847e
commit
23a4671a68
|
@ -1,46 +1,161 @@
|
|||
---------------------------------------------------------------------
|
||||
-- Regression Test: Simulate zombie replication slot when
|
||||
-- citus_rebalance_wait() is canceled.
|
||||
--
|
||||
-- In the buggy behavior, canceling citus_rebalance_wait()
|
||||
-- (via a short statement_timeout or Ctrl+C) leaves behind an active logical
|
||||
-- replication slot on a worker. This, in turn, prevents DROP DATABASE
|
||||
-- (with FORCE) from succeeding.
|
||||
--
|
||||
-- With the fix applied, the underlying rebalance job is canceled,
|
||||
-- no zombie slot remains, and DROP DATABASE succeeds.
|
||||
---------------------------------------------------------------------
|
||||
---------------------------------------------------------------------
|
||||
-- 1) Create an isolated schema for this test.
|
||||
---------------------------------------------------------------------
|
||||
CREATE SCHEMA issue_7896;
|
||||
SET search_path TO issue_7896;
|
||||
-- Create a temporary table to simulate the background job catalog.
|
||||
-- (In production this would be the actual catalog table.)
|
||||
CREATE TEMP TABLE pg_dist_background_job
|
||||
(
|
||||
job_id int8 PRIMARY KEY,
|
||||
job_state text,
|
||||
started_at timestamptz,
|
||||
finished_at timestamptz
|
||||
);
|
||||
-- Insert a dummy job record with job_state set to 'running'
|
||||
INSERT INTO pg_dist_background_job (job_id, job_state, started_at)
|
||||
VALUES (1001, 'running', now());
|
||||
-- Set a short statement timeout so that citus_rebalance_wait times out quickly.
|
||||
SET statement_timeout = '1000ms';
|
||||
---------------------------------------------------------------------
|
||||
-- 2) Set cluster parameters and initialize environment.
|
||||
---------------------------------------------------------------------
|
||||
-- We assume a coordinator with at least two workers.
|
||||
-- Set replication factor to 2 and enable repartition joins.
|
||||
SET citus.shard_replication_factor TO 2;
|
||||
SET citus.enable_repartition_joins TO ON;
|
||||
-- For faster background task processing, set a short background task queue interval.
|
||||
ALTER SYSTEM SET citus.background_task_queue_interval TO '1s';
|
||||
SELECT pg_reload_conf();
|
||||
pg_reload_conf
|
||||
---------------------------------------------------------------------
|
||||
t
|
||||
(1 row)
|
||||
|
||||
---------------------------------------------------------------------
|
||||
-- 3) Create a distributed table.
|
||||
---------------------------------------------------------------------
|
||||
DROP TABLE IF EXISTS t1;
|
||||
NOTICE: table "t1" does not exist, skipping
|
||||
CREATE TABLE t1 (a int PRIMARY KEY);
|
||||
SELECT create_distributed_table('t1', 'a', shard_count => 4, colocate_with => 'none');
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
---------------------------------------------------------------------
|
||||
-- 4) Insert enough data so that a rebalance has measurable work.
|
||||
---------------------------------------------------------------------
|
||||
INSERT INTO t1
|
||||
SELECT generate_series(1, 1000000);
|
||||
---------------------------------------------------------------------
|
||||
-- 5) Verify that a rebalance on a balanced cluster is a no-op.
|
||||
---------------------------------------------------------------------
|
||||
SELECT 1 FROM citus_rebalance_start();
|
||||
NOTICE: No moves available for rebalancing
|
||||
?column?
|
||||
---------------------------------------------------------------------
|
||||
1
|
||||
(1 row)
|
||||
|
||||
-- Expected: NOTICE "No moves available for rebalancing".
|
||||
SELECT citus_rebalance_wait();
|
||||
WARNING: no ongoing rebalance that can be waited on
|
||||
citus_rebalance_wait
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
-- Expected: WARNING "no ongoing rebalance that can be waited on".
|
||||
---------------------------------------------------------------------
|
||||
-- 6) Force a shard movement so that a rebalance job is scheduled.
|
||||
-- Remove and re-add a worker using a parameter placeholder.
|
||||
---------------------------------------------------------------------
|
||||
SELECT citus_remove_node('localhost', :worker_2_port);
|
||||
citus_remove_node
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT citus_add_node('localhost', :worker_2_port);
|
||||
citus_add_node
|
||||
---------------------------------------------------------------------
|
||||
30
|
||||
(1 row)
|
||||
|
||||
---------------------------------------------------------------------
|
||||
-- 7) Start a rebalance job that will do actual work.
|
||||
---------------------------------------------------------------------
|
||||
SELECT citus_rebalance_start(
|
||||
rebalance_strategy := 'by_disk_size',
|
||||
shard_transfer_mode := 'force_logical'
|
||||
);
|
||||
NOTICE: Scheduled 2 moves as job xxx
|
||||
DETAIL: Rebalance scheduled as background job
|
||||
HINT: To monitor progress, run: SELECT * FROM citus_rebalance_status();
|
||||
citus_rebalance_start
|
||||
---------------------------------------------------------------------
|
||||
1
|
||||
(1 row)
|
||||
|
||||
-- Expected: Notice that moves are scheduled as a background job.
|
||||
-- (You may verify with: SELECT * FROM citus_rebalance_status();)
|
||||
---------------------------------------------------------------------
|
||||
-- 8) Attempt to wait on the rebalance with a short timeout so that the wait
|
||||
-- is canceled. The PG_CATCH block in citus_job_wait_internal should then
|
||||
-- cancel the underlying job (cleaning up temporary replication slots).
|
||||
---------------------------------------------------------------------
|
||||
SET statement_timeout = '2s';
|
||||
DO $$
|
||||
BEGIN
|
||||
BEGIN
|
||||
-- Call the wait function.
|
||||
-- Note: The public function citus_rebalance_wait() takes no arguments.
|
||||
RAISE NOTICE 'Waiting on rebalance with a 2-second timeout...';
|
||||
-- Public function citus_rebalance_wait() takes no arguments.
|
||||
PERFORM citus_rebalance_wait();
|
||||
EXCEPTION
|
||||
WHEN query_canceled THEN
|
||||
RAISE NOTICE 'Query canceled as expected';
|
||||
-- Swallow the error so the transaction continues.
|
||||
RAISE NOTICE 'Rebalance wait canceled as expected';
|
||||
    -- The fix should cancel the underlying rebalance job.
|
||||
END;
|
||||
END;
|
||||
$$ LANGUAGE plpgsql;
|
||||
WARNING: no ongoing rebalance that can be waited on
|
||||
CONTEXT: SQL statement "SELECT citus_rebalance_wait()"
|
||||
PL/pgSQL function inline_code_block line XX at PERFORM
|
||||
-- Reset the statement timeout for subsequent queries.
|
||||
NOTICE: Waiting on rebalance with a 2-second timeout...
|
||||
CONTEXT: PL/pgSQL function inline_code_block line XX at RAISE
|
||||
NOTICE: Rebalance wait canceled as expected
|
||||
CONTEXT: PL/pgSQL function inline_code_block line XX at RAISE
|
||||
SET statement_timeout = '0';
|
||||
-- Verify that the job's state has been updated to 'cancelled'
|
||||
-- (the expected outcome after a cancellation).
|
||||
SELECT job_state
|
||||
FROM pg_dist_background_job
|
||||
WHERE job_id = 1001;
|
||||
job_state
|
||||
---------------------------------------------------------------------
|
||||
running
|
||||
(1 row)
|
||||
-- 9) Cleanup orphaned background resources (if any).
|
||||
---------------------------------------------------------------------
|
||||
CALL citus_cleanup_orphaned_resources();
|
||||
NOTICE: cleaned up 5 orphaned resources
|
||||
---------------------------------------------------------------------
|
||||
-- 10) Traverse nodes and check for active replication slots.
|
||||
--
|
||||
-- Connect to the coordinator and worker nodes, then query for replication slots.
|
||||
-- Expected Outcome (with the fix applied): No active replication slots.
|
||||
---------------------------------------------------------------------
|
||||
\c - - - :master_port
|
||||
SELECT * FROM pg_replication_slots;
|
||||
slot_name | plugin | slot_type | datoid | database | temporary | active | active_pid | xmin | catalog_xmin | restart_lsn | confirmed_flush_lsn | wal_status | safe_wal_size | two_phase | inactive_since | conflicting | invalidation_reason | failover | synced
|
||||
---------------------------------------------------------------------
|
||||
(0 rows)
|
||||
|
||||
SET client_min_messages TO WARNING;
|
||||
DROP SCHEMA IF EXISTS issue_7896 CASCADE;
|
||||
\c - - - :worker_1_port
|
||||
SELECT * FROM pg_replication_slots;
|
||||
slot_name | plugin | slot_type | datoid | database | temporary | active | active_pid | xmin | catalog_xmin | restart_lsn | confirmed_flush_lsn | wal_status | safe_wal_size | two_phase | inactive_since | conflicting | invalidation_reason | failover | synced
|
||||
---------------------------------------------------------------------
|
||||
(0 rows)
|
||||
|
||||
\c - - - :worker_2_port
|
||||
SELECT * FROM pg_replication_slots;
|
||||
slot_name | plugin | slot_type | datoid | database | temporary | active | active_pid | xmin | catalog_xmin | restart_lsn | confirmed_flush_lsn | wal_status | safe_wal_size | two_phase | inactive_since | conflicting | invalidation_reason | failover | synced
|
||||
---------------------------------------------------------------------
|
||||
(0 rows)
|
||||
|
||||
---------------------------------------------------------------------
|
||||
-- 11) Cleanup: Drop the test schema.
|
||||
---------------------------------------------------------------------
|
||||
\c - - - :master_port
|
||||
SET search_path TO issue_7896;
|
||||
DROP SCHEMA IF EXISTS issue_7896 CASCADE;
|
||||
NOTICE: drop cascades to table t1
|
||||
|
|
|
@ -104,7 +104,7 @@ test: multi_dropped_column_aliases foreign_key_restriction_enforcement
|
|||
test: binary_protocol
|
||||
test: alter_table_set_access_method
|
||||
test: alter_distributed_table
|
||||
test: issue_5248 issue_5099 issue_5763 issue_6543 issue_6758 issue_7477 issue_7891
|
||||
test: issue_5248 issue_5099 issue_5763 issue_6543 issue_6758 issue_7477 issue_7891 issue_7896
|
||||
test: object_propagation_debug
|
||||
test: undistribute_table
|
||||
test: run_command_on_all_nodes
|
||||
|
|
|
@ -1,45 +1,117 @@
|
|||
---------------------------------------------------------------------
|
||||
-- Regression Test: Simulate zombie replication slot when
|
||||
-- citus_rebalance_wait() is canceled.
|
||||
--
|
||||
-- In the buggy behavior, canceling citus_rebalance_wait()
|
||||
-- (via a short statement_timeout or Ctrl+C) leaves behind an active logical
|
||||
-- replication slot on a worker. This, in turn, prevents DROP DATABASE
|
||||
-- (with FORCE) from succeeding.
|
||||
--
|
||||
-- With the fix applied, the underlying rebalance job is canceled,
|
||||
-- no zombie slot remains, and DROP DATABASE succeeds.
|
||||
---------------------------------------------------------------------
|
||||
|
||||
---------------------------------------------------------------------
|
||||
-- 1) Create an isolated schema for this test.
|
||||
---------------------------------------------------------------------
|
||||
CREATE SCHEMA issue_7896;
|
||||
SET search_path TO issue_7896;
|
||||
|
||||
-- Create a temporary table to simulate the background job catalog.
|
||||
-- (In production this would be the actual catalog table.)
|
||||
CREATE TEMP TABLE pg_dist_background_job
|
||||
(
|
||||
job_id int8 PRIMARY KEY,
|
||||
job_state text,
|
||||
started_at timestamptz,
|
||||
finished_at timestamptz
|
||||
);
|
||||
---------------------------------------------------------------------
|
||||
-- 2) Set cluster parameters and initialize environment.
|
||||
---------------------------------------------------------------------
|
||||
-- We assume a coordinator with at least two workers.
|
||||
-- Set replication factor to 2 and enable repartition joins.
|
||||
SET citus.shard_replication_factor TO 2;
|
||||
SET citus.enable_repartition_joins TO ON;
|
||||
-- For faster background task processing, set a short background task queue interval.
|
||||
ALTER SYSTEM SET citus.background_task_queue_interval TO '1s';
|
||||
SELECT pg_reload_conf();
|
||||
|
||||
-- Insert a dummy job record with job_state set to 'running'
|
||||
INSERT INTO pg_dist_background_job (job_id, job_state, started_at)
|
||||
VALUES (1001, 'running', now());
|
||||
---------------------------------------------------------------------
|
||||
-- 3) Create a distributed table.
|
||||
---------------------------------------------------------------------
|
||||
DROP TABLE IF EXISTS t1;
|
||||
CREATE TABLE t1 (a int PRIMARY KEY);
|
||||
SELECT create_distributed_table('t1', 'a', shard_count => 4, colocate_with => 'none');
|
||||
|
||||
-- Set a short statement timeout so that citus_rebalance_wait times out quickly.
|
||||
SET statement_timeout = '1000ms';
|
||||
---------------------------------------------------------------------
|
||||
-- 4) Insert enough data so that a rebalance has measurable work.
|
||||
---------------------------------------------------------------------
|
||||
INSERT INTO t1
|
||||
SELECT generate_series(1, 1000000);
|
||||
|
||||
---------------------------------------------------------------------
|
||||
-- 5) Verify that a rebalance on a balanced cluster is a no-op.
|
||||
---------------------------------------------------------------------
|
||||
SELECT 1 FROM citus_rebalance_start();
|
||||
-- Expected: NOTICE "No moves available for rebalancing".
|
||||
SELECT citus_rebalance_wait();
|
||||
-- Expected: WARNING "no ongoing rebalance that can be waited on".
|
||||
|
||||
---------------------------------------------------------------------
|
||||
-- 6) Force a shard movement so that a rebalance job is scheduled.
|
||||
-- Remove and re-add a worker using a parameter placeholder.
|
||||
---------------------------------------------------------------------
|
||||
SELECT citus_remove_node('localhost', :worker_2_port);
|
||||
SELECT citus_add_node('localhost', :worker_2_port);
|
||||
|
||||
---------------------------------------------------------------------
|
||||
-- 7) Start a rebalance job that will do actual work.
|
||||
---------------------------------------------------------------------
|
||||
SELECT citus_rebalance_start(
|
||||
rebalance_strategy := 'by_disk_size',
|
||||
shard_transfer_mode := 'force_logical'
|
||||
);
|
||||
-- Expected: Notice that moves are scheduled as a background job.
|
||||
-- (You may verify with: SELECT * FROM citus_rebalance_status();)
|
||||
|
||||
---------------------------------------------------------------------
|
||||
-- 8) Attempt to wait on the rebalance with a short timeout so that the wait
|
||||
-- is canceled. The PG_CATCH block in citus_job_wait_internal should then
|
||||
-- cancel the underlying job (cleaning up temporary replication slots).
|
||||
---------------------------------------------------------------------
|
||||
SET statement_timeout = '2s';
|
||||
DO $$
|
||||
BEGIN
|
||||
BEGIN
|
||||
-- Call the wait function.
|
||||
-- Note: The public function citus_rebalance_wait() takes no arguments.
|
||||
RAISE NOTICE 'Waiting on rebalance with a 2-second timeout...';
|
||||
-- Public function citus_rebalance_wait() takes no arguments.
|
||||
PERFORM citus_rebalance_wait();
|
||||
EXCEPTION
|
||||
WHEN query_canceled THEN
|
||||
RAISE NOTICE 'Query canceled as expected';
|
||||
-- Swallow the error so the transaction continues.
|
||||
RAISE NOTICE 'Rebalance wait canceled as expected';
|
||||
    -- The fix should cancel the underlying rebalance job.
|
||||
END;
|
||||
END;
|
||||
$$ LANGUAGE plpgsql;
|
||||
|
||||
-- Reset the statement timeout for subsequent queries.
|
||||
SET statement_timeout = '0';
|
||||
|
||||
-- Verify that the job's state has been updated to 'cancelled'
|
||||
-- (the expected outcome after a cancellation).
|
||||
SELECT job_state
|
||||
FROM pg_dist_background_job
|
||||
WHERE job_id = 1001;
|
||||
---------------------------------------------------------------------
|
||||
-- 9) Cleanup orphaned background resources (if any).
|
||||
---------------------------------------------------------------------
|
||||
CALL citus_cleanup_orphaned_resources();
|
||||
|
||||
---------------------------------------------------------------------
|
||||
-- 10) Traverse nodes and check for active replication slots.
|
||||
--
|
||||
-- Connect to the coordinator and worker nodes, then query for replication slots.
|
||||
-- Expected Outcome (with the fix applied): No active replication slots.
|
||||
---------------------------------------------------------------------
|
||||
\c - - - :master_port
|
||||
SELECT * FROM pg_replication_slots;
|
||||
|
||||
\c - - - :worker_1_port
|
||||
SELECT * FROM pg_replication_slots;
|
||||
|
||||
\c - - - :worker_2_port
|
||||
SELECT * FROM pg_replication_slots;
|
||||
|
||||
|
||||
---------------------------------------------------------------------
|
||||
-- 11) Cleanup: Drop the test schema.
|
||||
---------------------------------------------------------------------
|
||||
\c - - - :master_port
|
||||
SET search_path TO issue_7896;
|
||||
DROP SCHEMA IF EXISTS issue_7896 CASCADE;
|
||||
|
||||
SET client_min_messages TO WARNING;
|
||||
DROP SCHEMA IF EXISTS issue_7896 CASCADE;
|
||||
|
|
Loading…
Reference in New Issue