Rebalance Progress Reporting API (#6576)

citus_job_list() lists all background jobs by simply showing the records
in pg_dist_background_job.

citus_job_status(job_id bigint, raw boolean default false) shows the
status of a single background job by appending a jsonb details column to
the associated row from pg_dist_background_job. If the raw argument is
set, machine readable sizes are used instead of human readable
alternatives.

citus_rebalance_status(raw boolean default false) shows the status of
the last rebalance operation. If the raw argument is set, machine
readable sizes are used instead of human readable alternatives.
pull/6625/head
Hanefi Onaldi 2023-01-16 16:17:31 +03:00 committed by GitHub
parent 92689a8362
commit f21dfd5fae
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 336 additions and 18 deletions

View File

@ -2312,7 +2312,7 @@ IsForeignTable(Oid relationId)
* For a task to be able to run the following conditions apply:
* - Task is in Running state. This could happen when a Background Tasks Queue Monitor
* had crashed or is otherwise restarted. To recover from such a failure tasks in
* Running state are deeed Runnable.
* Running state are deemed Runnable.
* - Task is in Runnable state with either _no_ value set in not_before, or a value that
* has currently passed. If the not_before field is set to a time in the future the
* task is currently not ready to be started.

View File

@ -1853,9 +1853,7 @@ ErrorOnConcurrentRebalance(RebalanceOptions *options)
errmsg("A rebalance is already running as job %ld", jobId),
errdetail("A rebalance was already scheduled as background job"),
errhint("To monitor progress, run: SELECT * FROM "
"pg_dist_background_task WHERE job_id = %ld ORDER BY task_id "
"ASC; or SELECT * FROM get_rebalance_progress();",
jobId)));
"citus_rebalance_status();")));
}
}
@ -1983,10 +1981,8 @@ RebalanceTableShardsBackground(RebalanceOptions *options, Oid shardReplicationMo
(errmsg("Scheduled %d moves as job %ld",
list_length(placementUpdateList), jobId),
errdetail("Rebalance scheduled as background job"),
errhint("To monitor progress, run: "
"SELECT * FROM pg_dist_background_task WHERE job_id = %ld ORDER BY "
"task_id ASC; or SELECT * FROM get_rebalance_progress();",
jobId)));
errhint("To monitor progress, run: SELECT * FROM "
"citus_rebalance_status();")));
return jobId;
}

View File

@ -9,6 +9,9 @@ DROP FUNCTION pg_catalog.worker_append_table_to_shard(text, text, text, integer)
#include "udfs/citus_get_transaction_clock/11.2-1.sql"
#include "udfs/citus_is_clock_after/11.2-1.sql"
#include "udfs/citus_internal_adjust_local_clock_to_remote/11.2-1.sql"
#include "udfs/citus_job_list/11.2-1.sql"
#include "udfs/citus_job_status/11.2-1.sql"
#include "udfs/citus_rebalance_status/11.2-1.sql"
#include "udfs/worker_split_shard_replication_setup/11.2-1.sql"
#include "udfs/citus_task_wait/11.2-1.sql"
#include "udfs/citus_prepare_pg_upgrade/11.2-1.sql"

View File

@ -5,6 +5,9 @@ DROP FUNCTION pg_catalog.citus_get_node_clock();
DROP FUNCTION pg_catalog.citus_get_transaction_clock();
DROP FUNCTION pg_catalog.citus_internal_adjust_local_clock_to_remote(cluster_clock);
DROP FUNCTION pg_catalog.citus_is_clock_after(cluster_clock, cluster_clock);
DROP FUNCTION pg_catalog.citus_job_list();
DROP FUNCTION pg_catalog.citus_job_status(bigint,boolean);
DROP FUNCTION pg_catalog.citus_rebalance_status(boolean);
DROP FUNCTION pg_catalog.cluster_clock_logical(cluster_clock);
DROP SEQUENCE pg_catalog.pg_dist_clock_logical_seq;
DROP OPERATOR CLASS pg_catalog.cluster_clock_ops USING btree CASCADE;

View File

@ -0,0 +1,23 @@
-- citus_job_list returns one row per record in pg_dist_background_job,
-- ordered by job_id. The output columns mirror the selected catalog columns.
CREATE OR REPLACE FUNCTION pg_catalog.citus_job_list ()
    RETURNS TABLE (
        job_id bigint,
        state pg_catalog.citus_job_status,
        job_type name,
        description text,
        started_at timestamptz,
        finished_at timestamptz
    )
    LANGUAGE SQL
    AS $fn$
    -- Columns are alias-qualified so that they are never confused with the
    -- identically named output columns of this function.
    SELECT
        j.job_id,
        j.state,
        j.job_type,
        j.description,
        j.started_at,
        j.finished_at
    FROM
        pg_dist_background_job AS j
    ORDER BY
        j.job_id
$fn$;

View File

@ -0,0 +1,23 @@
-- citus_job_list returns one row per record in pg_dist_background_job,
-- ordered by job_id. The output columns mirror the selected catalog columns.
CREATE OR REPLACE FUNCTION pg_catalog.citus_job_list ()
    RETURNS TABLE (
        job_id bigint,
        state pg_catalog.citus_job_status,
        job_type name,
        description text,
        started_at timestamptz,
        finished_at timestamptz
    )
    LANGUAGE SQL
    AS $fn$
    -- Columns are alias-qualified so that they are never confused with the
    -- identically named output columns of this function.
    SELECT
        j.job_id,
        j.state,
        j.job_type,
        j.description,
        j.started_at,
        j.finished_at
    FROM
        pg_dist_background_job AS j
    ORDER BY
        j.job_id
$fn$;

View File

@ -0,0 +1,103 @@
-- citus_job_status returns the pg_dist_background_job row of one job plus a
-- jsonb "details" column with two keys:
--   task_state_counts: number of tasks of the job per task status
--   tasks:             running tasks that have a matching session in
--                      get_rebalance_progress(), followed by tasks whose
--                      status is error
--
-- Arguments:
--   job_id ($1): id of the job to report on
--   raw    ($2): when true, shard sizes and the LSN lag are emitted as plain
--                numbers; otherwise they are formatted with pg_size_pretty
--
-- The function is STRICT, so the body is not executed for a NULL argument.
-- The body uses $1 and $2 because job_id is also a column name and an output
-- column name.
CREATE OR REPLACE FUNCTION pg_catalog.citus_job_status (
    job_id bigint,
    raw boolean DEFAULT FALSE
)
    RETURNS TABLE (
        job_id bigint,
        state pg_catalog.citus_job_status,
        job_type name,
        description text,
        started_at timestamptz,
        finished_at timestamptz,
        details jsonb
    )
    LANGUAGE SQL
    STRICT
    AS $fn$
    -- Rebalance progress collapsed to one row per session. MATERIALIZED makes
    -- sure get_rebalance_progress() is evaluated once although rp is
    -- referenced by two later CTEs.
    WITH rp AS MATERIALIZED (
        SELECT
            p.sessionid,
            sum(p.source_shard_size) AS source_shard_size,
            sum(p.target_shard_size) AS target_shard_size,
            any_value(p.status) AS status,
            any_value(p.sourcename) AS sourcename,
            any_value(p.sourceport) AS sourceport,
            any_value(p.targetname) AS targetname,
            any_value(p.targetport) AS targetport,
            max(p.source_lsn) AS source_lsn,
            min(p.target_lsn) AS target_lsn
        FROM get_rebalance_progress() AS p
        GROUP BY p.sessionid
    ),
    -- Number of tasks of the requested job per task status.
    task_state_occurrence_counts AS (
        SELECT t.status, count(t.task_id)
        FROM pg_dist_background_job AS j
            INNER JOIN pg_dist_background_task AS t ON t.job_id = j.job_id
        WHERE j.job_id = $1
        GROUP BY t.status
    ),
    -- Running tasks of the job, matched to their progress row through the
    -- pid of the task. The size and LSN keys come from the CASE object only:
    -- an earlier version also built a size key in the first object, but it
    -- was always overwritten by the right-hand operand of the || operator.
    running_task_details AS (
        SELECT jsonb_agg(
            jsonb_build_object(
                'state', t.status,
                'retried', coalesce(t.retry_count, 0),
                'phase', rp.status,
                'hosts', jsonb_build_object(
                    'source', rp.sourcename || ':' || rp.sourceport,
                    'target', rp.targetname || ':' || rp.targetport),
                'message', t.message,
                'command', t.command,
                'task_id', t.task_id)
            ||
            CASE
                -- raw: machine readable numbers
                WHEN ($2) THEN jsonb_build_object(
                    'size', jsonb_build_object(
                        'source', rp.source_shard_size,
                        'target', rp.target_shard_size),
                    'LSN', jsonb_build_object(
                        'source', rp.source_lsn,
                        'target', rp.target_lsn,
                        'lag', rp.source_lsn - rp.target_lsn))
                -- default: human readable sizes
                ELSE jsonb_build_object(
                    'size', jsonb_build_object(
                        'source', pg_size_pretty(rp.source_shard_size),
                        'target', pg_size_pretty(rp.target_shard_size)),
                    'LSN', jsonb_build_object(
                        'source', rp.source_lsn,
                        'target', rp.target_lsn,
                        'lag', pg_size_pretty(rp.source_lsn - rp.target_lsn)))
            END) AS tasks
        FROM rp
            INNER JOIN pg_dist_background_task AS t ON rp.sessionid = t.pid
            INNER JOIN pg_dist_background_job AS j ON t.job_id = j.job_id
        WHERE j.job_id = $1
            AND t.status = 'running'
    ),
    -- Tasks of the job in the error state that have no progress row.
    errored_task_details AS (
        SELECT jsonb_agg(
            jsonb_build_object(
                'state', t.status,
                'retried', coalesce(t.retry_count, 0),
                'message', t.message,
                'command', t.command,
                'task_id', t.task_id)) AS tasks
        FROM pg_dist_background_task AS t
            INNER JOIN pg_dist_background_job AS j ON t.job_id = j.job_id
        WHERE j.job_id = $1
            AND NOT EXISTS (SELECT 1 FROM rp WHERE rp.sessionid = t.pid)
            AND t.status = 'error'
    )
    SELECT
        j.job_id,
        j.state,
        j.job_type,
        j.description,
        j.started_at,
        j.finished_at,
        jsonb_build_object(
            'task_state_counts', (SELECT jsonb_object_agg(status, count) FROM task_state_occurrence_counts),
            -- jsonb_agg yields NULL for zero rows, hence the empty-array fallback
            'tasks', (COALESCE((SELECT tasks FROM running_task_details), '[]'::jsonb) ||
                      COALESCE((SELECT tasks FROM errored_task_details), '[]'::jsonb))) AS details
    FROM pg_dist_background_job AS j
    WHERE j.job_id = $1
$fn$;

View File

@ -0,0 +1,103 @@
-- citus_job_status returns the pg_dist_background_job row of one job plus a
-- jsonb "details" column with two keys:
--   task_state_counts: number of tasks of the job per task status
--   tasks:             running tasks that have a matching session in
--                      get_rebalance_progress(), followed by tasks whose
--                      status is error
--
-- Arguments:
--   job_id ($1): id of the job to report on
--   raw    ($2): when true, shard sizes and the LSN lag are emitted as plain
--                numbers; otherwise they are formatted with pg_size_pretty
--
-- The function is STRICT, so the body is not executed for a NULL argument.
-- The body uses $1 and $2 because job_id is also a column name and an output
-- column name.
CREATE OR REPLACE FUNCTION pg_catalog.citus_job_status (
    job_id bigint,
    raw boolean DEFAULT FALSE
)
    RETURNS TABLE (
        job_id bigint,
        state pg_catalog.citus_job_status,
        job_type name,
        description text,
        started_at timestamptz,
        finished_at timestamptz,
        details jsonb
    )
    LANGUAGE SQL
    STRICT
    AS $fn$
    -- Rebalance progress collapsed to one row per session. MATERIALIZED makes
    -- sure get_rebalance_progress() is evaluated once although rp is
    -- referenced by two later CTEs.
    WITH rp AS MATERIALIZED (
        SELECT
            p.sessionid,
            sum(p.source_shard_size) AS source_shard_size,
            sum(p.target_shard_size) AS target_shard_size,
            any_value(p.status) AS status,
            any_value(p.sourcename) AS sourcename,
            any_value(p.sourceport) AS sourceport,
            any_value(p.targetname) AS targetname,
            any_value(p.targetport) AS targetport,
            max(p.source_lsn) AS source_lsn,
            min(p.target_lsn) AS target_lsn
        FROM get_rebalance_progress() AS p
        GROUP BY p.sessionid
    ),
    -- Number of tasks of the requested job per task status.
    task_state_occurrence_counts AS (
        SELECT t.status, count(t.task_id)
        FROM pg_dist_background_job AS j
            INNER JOIN pg_dist_background_task AS t ON t.job_id = j.job_id
        WHERE j.job_id = $1
        GROUP BY t.status
    ),
    -- Running tasks of the job, matched to their progress row through the
    -- pid of the task. The size and LSN keys come from the CASE object only:
    -- an earlier version also built a size key in the first object, but it
    -- was always overwritten by the right-hand operand of the || operator.
    running_task_details AS (
        SELECT jsonb_agg(
            jsonb_build_object(
                'state', t.status,
                'retried', coalesce(t.retry_count, 0),
                'phase', rp.status,
                'hosts', jsonb_build_object(
                    'source', rp.sourcename || ':' || rp.sourceport,
                    'target', rp.targetname || ':' || rp.targetport),
                'message', t.message,
                'command', t.command,
                'task_id', t.task_id)
            ||
            CASE
                -- raw: machine readable numbers
                WHEN ($2) THEN jsonb_build_object(
                    'size', jsonb_build_object(
                        'source', rp.source_shard_size,
                        'target', rp.target_shard_size),
                    'LSN', jsonb_build_object(
                        'source', rp.source_lsn,
                        'target', rp.target_lsn,
                        'lag', rp.source_lsn - rp.target_lsn))
                -- default: human readable sizes
                ELSE jsonb_build_object(
                    'size', jsonb_build_object(
                        'source', pg_size_pretty(rp.source_shard_size),
                        'target', pg_size_pretty(rp.target_shard_size)),
                    'LSN', jsonb_build_object(
                        'source', rp.source_lsn,
                        'target', rp.target_lsn,
                        'lag', pg_size_pretty(rp.source_lsn - rp.target_lsn)))
            END) AS tasks
        FROM rp
            INNER JOIN pg_dist_background_task AS t ON rp.sessionid = t.pid
            INNER JOIN pg_dist_background_job AS j ON t.job_id = j.job_id
        WHERE j.job_id = $1
            AND t.status = 'running'
    ),
    -- Tasks of the job in the error state that have no progress row.
    errored_task_details AS (
        SELECT jsonb_agg(
            jsonb_build_object(
                'state', t.status,
                'retried', coalesce(t.retry_count, 0),
                'message', t.message,
                'command', t.command,
                'task_id', t.task_id)) AS tasks
        FROM pg_dist_background_task AS t
            INNER JOIN pg_dist_background_job AS j ON t.job_id = j.job_id
        WHERE j.job_id = $1
            AND NOT EXISTS (SELECT 1 FROM rp WHERE rp.sessionid = t.pid)
            AND t.status = 'error'
    )
    SELECT
        j.job_id,
        j.state,
        j.job_type,
        j.description,
        j.started_at,
        j.finished_at,
        jsonb_build_object(
            'task_state_counts', (SELECT jsonb_object_agg(status, count) FROM task_state_occurrence_counts),
            -- jsonb_agg yields NULL for zero rows, hence the empty-array fallback
            'tasks', (COALESCE((SELECT tasks FROM running_task_details), '[]'::jsonb) ||
                      COALESCE((SELECT tasks FROM errored_task_details), '[]'::jsonb))) AS details
    FROM pg_dist_background_job AS j
    WHERE j.job_id = $1
$fn$;

View File

@ -0,0 +1,29 @@
-- citus_rebalance_status returns the citus_job_status() output for the job
-- with the highest job_id among jobs whose job_type is rebalance. The raw
-- argument ($1 in the body) is passed through to citus_job_status().
-- The function is STRICT, so the body is not executed for a NULL argument.
CREATE OR REPLACE FUNCTION pg_catalog.citus_rebalance_status (
    raw boolean DEFAULT FALSE
)
    RETURNS TABLE (
        job_id bigint,
        state pg_catalog.citus_job_status,
        job_type name,
        description text,
        started_at timestamptz,
        finished_at timestamptz,
        details jsonb
    )
    LANGUAGE SQL
    STRICT
    AS $fn$
    SELECT
        job_status.job_id,
        job_status.state,
        job_status.job_type,
        job_status.description,
        job_status.started_at,
        job_status.finished_at,
        job_status.details
    FROM
        pg_dist_background_job AS j
        -- evaluate citus_job_status once for every job row that survives the filter
        CROSS JOIN LATERAL citus_job_status (j.job_id, $1) AS job_status
    WHERE
        j.job_id IN (
            -- most recent rebalance job, i.e. the one with the highest job_id
            SELECT latest.job_id
            FROM pg_dist_background_job AS latest
            WHERE latest.job_type = 'rebalance'
            ORDER BY latest.job_id DESC
            LIMIT 1
        );
$fn$;

View File

@ -0,0 +1,29 @@
-- citus_rebalance_status returns the citus_job_status() output for the job
-- with the highest job_id among jobs whose job_type is rebalance. The raw
-- argument ($1 in the body) is passed through to citus_job_status().
-- The function is STRICT, so the body is not executed for a NULL argument.
CREATE OR REPLACE FUNCTION pg_catalog.citus_rebalance_status (
    raw boolean DEFAULT FALSE
)
    RETURNS TABLE (
        job_id bigint,
        state pg_catalog.citus_job_status,
        job_type name,
        description text,
        started_at timestamptz,
        finished_at timestamptz,
        details jsonb
    )
    LANGUAGE SQL
    STRICT
    AS $fn$
    SELECT
        job_status.job_id,
        job_status.state,
        job_status.job_type,
        job_status.description,
        job_status.started_at,
        job_status.finished_at,
        job_status.details
    FROM
        pg_dist_background_job AS j
        -- evaluate citus_job_status once for every job row that survives the filter
        CROSS JOIN LATERAL citus_job_status (j.job_id, $1) AS job_status
    WHERE
        j.job_id IN (
            -- most recent rebalance job, i.e. the one with the highest job_id
            SELECT latest.job_id
            FROM pg_dist_background_job AS latest
            WHERE latest.job_type = 'rebalance'
            ORDER BY latest.job_id DESC
            LIMIT 1
        );
$fn$;

View File

@ -42,7 +42,7 @@ SELECT citus_move_shard_placement(85674000, 'localhost', :worker_1_port, 'localh
SELECT 1 FROM citus_rebalance_start();
NOTICE: Scheduled 1 moves as job xxx
DETAIL: Rebalance scheduled as background job
HINT: To monitor progress, run: SELECT * FROM pg_dist_background_task WHERE job_id = xxx ORDER BY task_id ASC; or SELECT * FROM get_rebalance_progress();
HINT: To monitor progress, run: SELECT * FROM citus_rebalance_status();
?column?
---------------------------------------------------------------------
1
@ -75,7 +75,7 @@ HINT: If you wish to continue without a replica identity set the shard_transfer
SELECT 1 FROM citus_rebalance_start(shard_transfer_mode => 'block_writes');
NOTICE: Scheduled 1 moves as job xxx
DETAIL: Rebalance scheduled as background job
HINT: To monitor progress, run: SELECT * FROM pg_dist_background_task WHERE job_id = xxx ORDER BY task_id ASC; or SELECT * FROM get_rebalance_progress();
HINT: To monitor progress, run: SELECT * FROM citus_rebalance_status();
?column?
---------------------------------------------------------------------
1
@ -98,7 +98,7 @@ SELECT citus_move_shard_placement(85674000, 'localhost', :worker_1_port, 'localh
SELECT 1 FROM citus_rebalance_start();
NOTICE: Scheduled 1 moves as job xxx
DETAIL: Rebalance scheduled as background job
HINT: To monitor progress, run: SELECT * FROM pg_dist_background_task WHERE job_id = xxx ORDER BY task_id ASC; or SELECT * FROM get_rebalance_progress();
HINT: To monitor progress, run: SELECT * FROM citus_rebalance_status();
?column?
---------------------------------------------------------------------
1
@ -134,7 +134,7 @@ DETAIL: Move may have already completed.
SELECT 1 FROM citus_rebalance_start();
NOTICE: Scheduled 1 moves as job xxx
DETAIL: Rebalance scheduled as background job
HINT: To monitor progress, run: SELECT * FROM pg_dist_background_task WHERE job_id = xxx ORDER BY task_id ASC; or SELECT * FROM get_rebalance_progress();
HINT: To monitor progress, run: SELECT * FROM citus_rebalance_status();
?column?
---------------------------------------------------------------------
1
@ -143,7 +143,7 @@ HINT: To monitor progress, run: SELECT * FROM pg_dist_background_task WHERE job
SELECT 1 FROM citus_rebalance_start();
ERROR: A rebalance is already running as job xxx
DETAIL: A rebalance was already scheduled as background job
HINT: To monitor progress, run: SELECT * FROM pg_dist_background_task WHERE job_id = xxx ORDER BY task_id ASC; or SELECT * FROM get_rebalance_progress();
HINT: To monitor progress, run: SELECT * FROM citus_rebalance_status();
SELECT citus_rebalance_wait();
citus_rebalance_wait
---------------------------------------------------------------------
@ -160,7 +160,7 @@ SELECT citus_move_shard_placement(85674000, 'localhost', :worker_1_port, 'localh
SELECT 1 FROM citus_rebalance_start();
NOTICE: Scheduled 1 moves as job xxx
DETAIL: Rebalance scheduled as background job
HINT: To monitor progress, run: SELECT * FROM pg_dist_background_task WHERE job_id = xxx ORDER BY task_id ASC; or SELECT * FROM get_rebalance_progress();
HINT: To monitor progress, run: SELECT * FROM citus_rebalance_status();
?column?
---------------------------------------------------------------------
1
@ -169,7 +169,7 @@ HINT: To monitor progress, run: SELECT * FROM pg_dist_background_task WHERE job
SELECT rebalance_table_shards();
ERROR: A rebalance is already running as job xxx
DETAIL: A rebalance was already scheduled as background job
HINT: To monitor progress, run: SELECT * FROM pg_dist_background_task WHERE job_id = xxx ORDER BY task_id ASC; or SELECT * FROM get_rebalance_progress();
HINT: To monitor progress, run: SELECT * FROM citus_rebalance_status();
SELECT citus_rebalance_wait();
citus_rebalance_wait
---------------------------------------------------------------------
@ -197,7 +197,7 @@ SELECT citus_move_shard_placement(85674008, 'localhost', :worker_1_port, 'localh
SELECT 1 FROM citus_rebalance_start();
NOTICE: Scheduled 1 moves as job xxx
DETAIL: Rebalance scheduled as background job
HINT: To monitor progress, run: SELECT * FROM pg_dist_background_task WHERE job_id = xxx ORDER BY task_id ASC; or SELECT * FROM get_rebalance_progress();
HINT: To monitor progress, run: SELECT * FROM citus_rebalance_status();
?column?
---------------------------------------------------------------------
1

View File

@ -1271,7 +1271,10 @@ SELECT * FROM multi_extension.print_extension_changes();
| function citus_get_transaction_clock() cluster_clock
| function citus_internal_adjust_local_clock_to_remote(cluster_clock) void
| function citus_is_clock_after(cluster_clock,cluster_clock) boolean
| function citus_job_list() TABLE(job_id bigint, state citus_job_status, job_type name, description text, started_at timestamp with time zone, finished_at timestamp with time zone)
| function citus_job_status(bigint,boolean) TABLE(job_id bigint, state citus_job_status, job_type name, description text, started_at timestamp with time zone, finished_at timestamp with time zone, details jsonb)
| function citus_move_shard_placement(bigint,integer,integer,citus.shard_transfer_mode) void
| function citus_rebalance_status(boolean) TABLE(job_id bigint, state citus_job_status, job_type name, description text, started_at timestamp with time zone, finished_at timestamp with time zone, details jsonb)
| function citus_task_wait(bigint,citus_task_status) void
| function cluster_clock_cmp(cluster_clock,cluster_clock) integer
| function cluster_clock_eq(cluster_clock,cluster_clock) boolean
@ -1297,7 +1300,7 @@ SELECT * FROM multi_extension.print_extension_changes();
| operator family cluster_clock_ops for access method btree
| sequence pg_dist_clock_logical_seq
| type cluster_clock
(34 rows)
(37 rows)
DROP TABLE multi_extension.prev_objects, multi_extension.extension_diff;
-- show running version

View File

@ -80,6 +80,8 @@ ORDER BY 1;
function citus_is_coordinator()
function citus_isolation_test_session_is_blocked(integer,integer[])
function citus_job_cancel(bigint)
function citus_job_list()
function citus_job_status(bigint,boolean)
function citus_job_wait(bigint,citus_job_status)
function citus_json_concatenate(json,json)
function citus_json_concatenate_final(json)
@ -97,6 +99,7 @@ ORDER BY 1;
function citus_prepare_pg_upgrade()
function citus_query_stats()
function citus_rebalance_start(name,boolean,citus.shard_transfer_mode)
function citus_rebalance_status(boolean)
function citus_rebalance_stop()
function citus_rebalance_wait()
function citus_relation_size(regclass)
@ -314,5 +317,5 @@ ORDER BY 1;
view citus_stat_statements
view pg_dist_shard_placement
view time_partitions
(306 rows)
(309 rows)