pull/7945/merge
Mehmet YILMAZ 2025-06-09 16:13:45 -04:00 committed by GitHub
commit af470e1d0f
4 changed files with 362 additions and 62 deletions

@@ -244,85 +244,110 @@ citus_task_wait(PG_FUNCTION_ARGS)
/*
 * citus_job_wait_internal implements the waiting on a job, e.g. for the background
 * rebalancer. If desiredStatus is provided, we throw an error if we reach a
 * different terminal state that can never transition to the desired state.
 *
 * With the PG_TRY/PG_CATCH block, if the user cancels this SQL statement
 * (Ctrl+C, statement_timeout, etc.), we will cancel the job in progress
 * so it doesn't remain running in the background.
 */
void
citus_job_wait_internal(int64 jobid, BackgroundJobStatus *desiredStatus)
{
    /*
     * Since we are wait polling, we actually allocate memory on every poll. To avoid
     * putting unneeded pressure on memory, we create a context that we reset
     * every iteration. We create it before PG_TRY so that it is also in scope
     * in the PG_CATCH cleanup path.
     */
    MemoryContext waitContext = AllocSetContextCreate(CurrentMemoryContext,
                                                      "JobsWaitContext",
                                                      ALLOCSET_DEFAULT_MINSIZE,
                                                      ALLOCSET_DEFAULT_INITSIZE,
                                                      ALLOCSET_DEFAULT_MAXSIZE);
    MemoryContext oldContext = MemoryContextSwitchTo(waitContext);

    PG_TRY();
    {
        while (true)
        {
            MemoryContextReset(waitContext);

            BackgroundJob *job = GetBackgroundJobByJobId(jobid);
            if (!job)
            {
                ereport(ERROR,
                        (errmsg("no job found for job with jobid: %ld", jobid)));
            }

            /* If we have a desiredStatus and we've reached it, we're done */
            if (desiredStatus && job->state == *desiredStatus)
            {
                break;
            }

            /*
             * If the job is in a terminal state (e.g. SUCCEEDED, FAILED, or
             * CANCELED) but not the desired state, throw an error or stop waiting.
             */
            if (IsBackgroundJobStatusTerminal(job->state))
            {
                if (desiredStatus)
                {
                    Oid reachedStatusOid = BackgroundJobStatusOid(job->state);
                    Datum reachedStatusNameDatum = DirectFunctionCall1(enum_out,
                                                                       reachedStatusOid);
                    char *reachedStatusName = DatumGetCString(reachedStatusNameDatum);

                    Oid desiredStatusOid = BackgroundJobStatusOid(*desiredStatus);
                    Datum desiredStatusNameDatum = DirectFunctionCall1(enum_out,
                                                                       desiredStatusOid);
                    char *desiredStatusName = DatumGetCString(desiredStatusNameDatum);

                    ereport(ERROR,
                            (errmsg("Job reached terminal state \"%s\" instead of "
                                    "desired state \"%s\"", reachedStatusName,
                                    desiredStatusName)));
                }

                /* Otherwise, if no desiredStatus was given, we accept this terminal state. */
                break;
            }

            /* Before sleeping, check for user interrupts (Ctrl+C, statement_timeout, etc.) */
            CHECK_FOR_INTERRUPTS();

            /* Sleep 1 second before re-checking the job status */
            const long delay_ms = 1000;
            (void) WaitLatch(MyLatch,
                             WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
                             delay_ms,
                             WAIT_EVENT_PG_SLEEP);
            ResetLatch(MyLatch);
        }

        MemoryContextSwitchTo(oldContext);
        MemoryContextDelete(waitContext);
    }
    PG_CATCH();
    {
        /*
         * If we get here, the user canceled the statement or an ERROR occurred.
         * We forcibly cancel the job so that it doesn't remain running in the
         * background. This ensures no "zombie" shard moves or leftover
         * replication slots.
         */
        MemoryContextSwitchTo(oldContext);
        MemoryContextDelete(waitContext);

        /* Switch to TopMemoryContext so the cleanup runs outside the deleted context. */
        MemoryContextSwitchTo(TopMemoryContext);

        /* Attempt to cancel the job; if it's already in a terminal state, that's okay. */
        (void) DirectFunctionCall1(citus_job_cancel, Int64GetDatum(jobid));

        /* Re-throw the original error so Postgres knows this statement was canceled. */
        PG_RE_THROW();
    }
    PG_END_TRY();
}
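
The user-visible effect of the PG_TRY/PG_CATCH wrapper can be sketched from SQL. The following is illustrative only and not part of this diff; it assumes the pg_dist_background_job catalog that Citus uses to track background jobs and its job state values:

-- Sketch: canceling the wait now also cancels the underlying job.
SELECT citus_rebalance_start();   -- schedules a background job
SET statement_timeout = '2s';
SELECT citus_rebalance_wait();    -- canceled by the timeout; PG_CATCH
                                  -- then calls citus_job_cancel internally
SET statement_timeout = '0';
-- With the fix, the most recent job should be in (or moving toward) a
-- terminal state such as 'cancelling'/'cancelled' instead of staying 'running'.
SELECT job_id, state FROM pg_dist_background_job ORDER BY job_id DESC LIMIT 1;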

@@ -0,0 +1,155 @@
---------------------------------------------------------------------
-- Regression Test: Simulate zombie replication slot when
-- citus_rebalance_wait() is canceled.
--
-- In the buggy behavior, canceling citus_rebalance_wait()
-- (via a short statement_timeout or Ctrl+C) leaves behind an active logical
-- replication slot on a worker. This, in turn, prevents DROP DATABASE
-- (with FORCE) from succeeding.
--
-- With the fix applied, the underlying rebalance job is canceled and
-- no zombie slot remains.
---------------------------------------------------------------------
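-- Background (illustrative, not part of this test file): the failure mode a
-- zombie slot causes looks roughly like the following, which is why the
-- header above mentions DROP DATABASE (with FORCE):
--   DROP DATABASE mydb WITH (FORCE);
--   ERROR:  database "mydb" is used by an active logical replication slot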
---------------------------------------------------------------------
-- 1) Setup the test environment.
---------------------------------------------------------------------
SET citus.next_shard_id TO 17560000;
CREATE SCHEMA issue_7896;
SET search_path TO issue_7896;
SET client_min_messages TO ERROR;
---------------------------------------------------------------------
-- 2) Set cluster parameters and initialize environment.
---------------------------------------------------------------------
SET citus.shard_replication_factor TO 2;
SET citus.enable_repartition_joins TO ON;
-- For faster background task processing, set a short background task queue interval.
ALTER SYSTEM SET citus.background_task_queue_interval TO '1s';
SELECT pg_reload_conf();
pg_reload_conf
---------------------------------------------------------------------
t
(1 row)
---------------------------------------------------------------------
-- 3) Create a distributed table.
---------------------------------------------------------------------
DROP TABLE IF EXISTS t1;
CREATE TABLE t1 (a int PRIMARY KEY);
SELECT create_distributed_table('t1', 'a', shard_count => 4, colocate_with => 'none');
create_distributed_table
---------------------------------------------------------------------
(1 row)
---------------------------------------------------------------------
-- 4) Insert enough data so that a rebalance has measurable work.
---------------------------------------------------------------------
INSERT INTO t1
SELECT generate_series(1, 1000000);
---------------------------------------------------------------------
-- 5) Verify that a rebalance on a balanced cluster is a no-op.
---------------------------------------------------------------------
SELECT 1 FROM citus_rebalance_start();
?column?
---------------------------------------------------------------------
1
(1 row)
-- Expected: NOTICE "No moves available for rebalancing".
SELECT citus_rebalance_wait();
citus_rebalance_wait
---------------------------------------------------------------------
(1 row)
-- Expected: WARNING "no ongoing rebalance that can be waited on".
---------------------------------------------------------------------
-- 6) Force a shard movement so that a rebalance job is scheduled.
-- Remove and re-add a worker (referenced via the :worker_2_port psql variable).
---------------------------------------------------------------------
SELECT citus_remove_node('localhost', :worker_2_port);
citus_remove_node
---------------------------------------------------------------------
(1 row)
SELECT citus_add_node('localhost', :worker_2_port);
citus_add_node
---------------------------------------------------------------------
30
(1 row)
---------------------------------------------------------------------
-- 7) Start a rebalance job that will do actual work.
---------------------------------------------------------------------
SELECT citus_rebalance_start(
rebalance_strategy := 'by_disk_size',
shard_transfer_mode := 'force_logical'
);
citus_rebalance_start
---------------------------------------------------------------------
1
(1 row)
-- Expected: Notice that moves are scheduled as a background job.
---------------------------------------------------------------------
-- 8) Attempt to wait on the rebalance with a short timeout so that the wait
-- is canceled. The PG_CATCH block in citus_job_wait_internal should then
-- cancel the underlying job (cleaning up temporary replication slots).
---------------------------------------------------------------------
SET statement_timeout = '2s';
SET client_min_messages TO NOTICE;
DO $$
BEGIN
BEGIN
RAISE NOTICE 'Waiting on rebalance with a 2-second timeout...';
-- Public function citus_rebalance_wait() takes no arguments.
PERFORM citus_rebalance_wait();
EXCEPTION
WHEN query_canceled THEN
RAISE NOTICE 'Rebalance wait canceled as expected';
-- The fix should cancel the underlying rebalance job.
END;
END;
$$ LANGUAGE plpgsql;
NOTICE: Waiting on rebalance with a 2-second timeout...
CONTEXT: PL/pgSQL function inline_code_block line XX at RAISE
NOTICE: Rebalance wait canceled as expected
CONTEXT: PL/pgSQL function inline_code_block line XX at RAISE
SET statement_timeout = '0';
SET client_min_messages TO ERROR;
---------------------------------------------------------------------
-- 9) Cleanup orphaned background resources (if any).
---------------------------------------------------------------------
CALL citus_cleanup_orphaned_resources();
---------------------------------------------------------------------
-- 10) Traverse nodes and check for active replication slots.
--
-- Connect to the coordinator and worker nodes, then query for replication slots.
-- Expected Outcome (with the fix applied): No active replication slots.
---------------------------------------------------------------------
\c - - - :master_port
SELECT * FROM pg_replication_slots;
slot_name | plugin | slot_type | datoid | database | temporary | active | active_pid | xmin | catalog_xmin | restart_lsn | confirmed_flush_lsn | wal_status | safe_wal_size | two_phase | inactive_since | conflicting | invalidation_reason | failover | synced
---------------------------------------------------------------------
(0 rows)
\c - - - :worker_1_port
SELECT * FROM pg_replication_slots;
slot_name | plugin | slot_type | datoid | database | temporary | active | active_pid | xmin | catalog_xmin | restart_lsn | confirmed_flush_lsn | wal_status | safe_wal_size | two_phase | inactive_since | conflicting | invalidation_reason | failover | synced
---------------------------------------------------------------------
(0 rows)
\c - - - :worker_2_port
SELECT * FROM pg_replication_slots;
slot_name | plugin | slot_type | datoid | database | temporary | active | active_pid | xmin | catalog_xmin | restart_lsn | confirmed_flush_lsn | wal_status | safe_wal_size | two_phase | inactive_since | conflicting | invalidation_reason | failover | synced
---------------------------------------------------------------------
(0 rows)
---------------------------------------------------------------------
-- 11) Cleanup: Drop the test schema.
---------------------------------------------------------------------
\c - - - :master_port
SET search_path TO issue_7896;
SET client_min_messages TO WARNING;
DROP SCHEMA IF EXISTS issue_7896 CASCADE;
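
Beyond replication slots, a rebalance job that survived the canceled wait would also linger in the background task queue. A follow-up check along these lines (a sketch against the pg_dist_background_task catalog; it is not part of the committed test) can confirm nothing is left running:

-- Sketch: after the canceled wait and the orphaned-resource cleanup,
-- no background task should remain in a non-terminal state.
SELECT count(*) AS active_tasks
FROM pg_dist_background_task
WHERE status IN ('runnable', 'running', 'cancelling');
-- Expected with the fix applied: 0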

@@ -69,6 +69,7 @@ test: local_shard_execution_dropped_column
test: metadata_sync_helpers
test: issue_6592
test: issue_7896
test: executor_local_failure
# test that no tests leaked intermediate results. This should always be last

@@ -0,0 +1,119 @@
---------------------------------------------------------------------
-- Regression Test: Simulate zombie replication slot when
-- citus_rebalance_wait() is canceled.
--
-- In the buggy behavior, canceling citus_rebalance_wait()
-- (via a short statement_timeout or Ctrl+C) leaves behind an active logical
-- replication slot on a worker. This, in turn, prevents DROP DATABASE
-- (with FORCE) from succeeding.
--
-- With the fix applied, the underlying rebalance job is canceled and
-- no zombie slot remains.
---------------------------------------------------------------------
---------------------------------------------------------------------
-- 1) Setup the test environment.
---------------------------------------------------------------------
SET citus.next_shard_id TO 17560000;
CREATE SCHEMA issue_7896;
SET search_path TO issue_7896;
SET client_min_messages TO ERROR;
---------------------------------------------------------------------
-- 2) Set cluster parameters and initialize environment.
---------------------------------------------------------------------
SET citus.shard_replication_factor TO 2;
SET citus.enable_repartition_joins TO ON;
-- For faster background task processing, set a short background task queue interval.
ALTER SYSTEM SET citus.background_task_queue_interval TO '1s';
SELECT pg_reload_conf();
---------------------------------------------------------------------
-- 3) Create a distributed table.
---------------------------------------------------------------------
DROP TABLE IF EXISTS t1;
CREATE TABLE t1 (a int PRIMARY KEY);
SELECT create_distributed_table('t1', 'a', shard_count => 4, colocate_with => 'none');
---------------------------------------------------------------------
-- 4) Insert enough data so that a rebalance has measurable work.
---------------------------------------------------------------------
INSERT INTO t1
SELECT generate_series(1, 1000000);
---------------------------------------------------------------------
-- 5) Verify that a rebalance on a balanced cluster is a no-op.
---------------------------------------------------------------------
SELECT 1 FROM citus_rebalance_start();
-- Expected: NOTICE "No moves available for rebalancing".
SELECT citus_rebalance_wait();
-- Expected: WARNING "no ongoing rebalance that can be waited on".
---------------------------------------------------------------------
-- 6) Force a shard movement so that a rebalance job is scheduled.
-- Remove and re-add a worker (referenced via the :worker_2_port psql variable).
---------------------------------------------------------------------
SELECT citus_remove_node('localhost', :worker_2_port);
SELECT citus_add_node('localhost', :worker_2_port);
---------------------------------------------------------------------
-- 7) Start a rebalance job that will do actual work.
---------------------------------------------------------------------
SELECT citus_rebalance_start(
rebalance_strategy := 'by_disk_size',
shard_transfer_mode := 'force_logical'
);
-- Expected: Notice that moves are scheduled as a background job.
---------------------------------------------------------------------
-- 8) Attempt to wait on the rebalance with a short timeout so that the wait
-- is canceled. The PG_CATCH block in citus_job_wait_internal should then
-- cancel the underlying job (cleaning up temporary replication slots).
---------------------------------------------------------------------
SET statement_timeout = '2s';
SET client_min_messages TO NOTICE;
DO $$
BEGIN
BEGIN
RAISE NOTICE 'Waiting on rebalance with a 2-second timeout...';
-- Public function citus_rebalance_wait() takes no arguments.
PERFORM citus_rebalance_wait();
EXCEPTION
WHEN query_canceled THEN
RAISE NOTICE 'Rebalance wait canceled as expected';
-- The fix should cancel the underlying rebalance job.
END;
END;
$$ LANGUAGE plpgsql;
SET statement_timeout = '0';
SET client_min_messages TO ERROR;
---------------------------------------------------------------------
-- 9) Cleanup orphaned background resources (if any).
---------------------------------------------------------------------
CALL citus_cleanup_orphaned_resources();
---------------------------------------------------------------------
-- 10) Traverse nodes and check for active replication slots.
--
-- Connect to the coordinator and worker nodes, then query for replication slots.
-- Expected Outcome (with the fix applied): No active replication slots.
---------------------------------------------------------------------
\c - - - :master_port
SELECT * FROM pg_replication_slots;
\c - - - :worker_1_port
SELECT * FROM pg_replication_slots;
\c - - - :worker_2_port
SELECT * FROM pg_replication_slots;
---------------------------------------------------------------------
-- 11) Cleanup: Drop the test schema.
---------------------------------------------------------------------
\c - - - :master_port
SET search_path TO issue_7896;
SET client_min_messages TO WARNING;
DROP SCHEMA IF EXISTS issue_7896 CASCADE;