Introduces citus_stat_activity view

pull/5731/head
Halil Ozan Akgul 2022-02-22 19:03:11 +03:00
parent ab614194fd
commit 06a0509b1a
18 changed files with 446 additions and 29 deletions

View File

@ -85,3 +85,5 @@ $$;
#include "udfs/citus_finalize_upgrade_to_citus11/11.0-1.sql"
#include "udfs/run_command_on_all_nodes/11.0-1.sql"
#include "udfs/citus_stat_activity/11.0-1.sql"

View File

@ -119,10 +119,13 @@ CREATE FUNCTION get_global_active_transactions(OUT datid oid, OUT process_id int
RESET search_path;
DROP FUNCTION citus_internal_local_blocked_processes CASCADE;
DROP FUNCTION citus_internal_global_blocked_processes CASCADE;
DROP VIEW pg_catalog.citus_lock_waits;
DROP FUNCTION pg_catalog.citus_dist_stat_activity CASCADE;
DROP FUNCTION citus_internal_local_blocked_processes;
DROP FUNCTION citus_internal_global_blocked_processes;
DROP VIEW pg_catalog.citus_dist_stat_activity;
DROP FUNCTION pg_catalog.citus_dist_stat_activity;
CREATE OR REPLACE FUNCTION pg_catalog.citus_dist_stat_activity(OUT query_hostname text, OUT query_hostport int, OUT distributed_query_host_name text, OUT distributed_query_host_port int,
OUT transaction_number int8, OUT transaction_stamp timestamptz, OUT datid oid, OUT datname name,
@ -148,7 +151,8 @@ ALTER VIEW citus.citus_dist_stat_activity SET SCHEMA pg_catalog;
GRANT SELECT ON pg_catalog.citus_dist_stat_activity TO PUBLIC;
SET search_path = 'pg_catalog';
DROP FUNCTION citus_worker_stat_activity CASCADE;
DROP VIEW citus_worker_stat_activity;
DROP FUNCTION citus_worker_stat_activity;
CREATE OR REPLACE FUNCTION citus_worker_stat_activity(OUT query_hostname text, OUT query_hostport int, OUT distributed_query_host_name text, OUT distributed_query_host_port int,
OUT transaction_number int8, OUT transaction_stamp timestamptz, OUT datid oid, OUT datname name,
@ -171,10 +175,10 @@ IS 'returns distributed transaction activity on shards of distributed tables';
DROP FUNCTION pg_catalog.worker_create_or_replace_object(text[]);
#include "../udfs/worker_create_or_replace_object/9.0-1.sql"
DROP FUNCTION IF EXISTS pg_catalog.pg_cancel_backend(bigint) CASCADE;
DROP FUNCTION IF EXISTS pg_catalog.pg_terminate_backend(bigint, bigint) CASCADE;
DROP FUNCTION IF EXISTS pg_catalog.pg_cancel_backend(bigint);
DROP FUNCTION IF EXISTS pg_catalog.pg_terminate_backend(bigint, bigint);
DROP FUNCTION pg_catalog.dump_local_wait_edges CASCADE;
DROP FUNCTION pg_catalog.dump_local_wait_edges;
CREATE FUNCTION pg_catalog.dump_local_wait_edges(
OUT waiting_pid int4,
OUT waiting_node_id int4,
@ -191,7 +195,7 @@ AS $$MODULE_PATHNAME$$, $$dump_local_wait_edges$$;
COMMENT ON FUNCTION pg_catalog.dump_local_wait_edges()
IS 'returns all local lock wait chains, that start from distributed transactions';
DROP FUNCTION pg_catalog.dump_global_wait_edges CASCADE;
DROP FUNCTION pg_catalog.dump_global_wait_edges;
CREATE FUNCTION pg_catalog.dump_global_wait_edges(
OUT waiting_pid int4,
OUT waiting_node_id int4,
@ -351,3 +355,7 @@ GRANT SELECT ON pg_catalog.citus_lock_waits TO PUBLIC;
DROP FUNCTION pg_catalog.citus_finalize_upgrade_to_citus11(bool);
RESET search_path;
DROP VIEW IF EXISTS pg_catalog.citus_stat_activity;
DROP FUNCTION IF EXISTS pg_catalog.citus_stat_activity;
DROP FUNCTION IF EXISTS pg_catalog.run_command_on_all_nodes;

View File

@ -0,0 +1,42 @@
-- citus_stat_activity combines the pg_stat_activity views from all nodes and adds global_pid, nodeid and is_worker_query columns.
-- The columns of citus_stat_activity don't change based on the Postgres version, however the pg_stat_activity's columns do.
-- Both Postgres 13 and 14 added one more column to pg_stat_activity (leader_pid and query_id).
-- citus_stat_activity has the most expansive column set, including the newly added columns.
-- If citus_stat_activity is queried in a Postgres version where pg_stat_activity doesn't have some columns citus_stat_activity has
-- the values for those columns will be NULL
CREATE OR REPLACE FUNCTION pg_catalog.citus_stat_activity(OUT global_pid bigint, OUT nodeid int, OUT is_worker_query boolean, OUT datid oid, OUT datname name, OUT pid integer,
OUT leader_pid integer, OUT usesysid oid, OUT usename name, OUT application_name text, OUT client_addr inet, OUT client_hostname text,
OUT client_port integer, OUT backend_start timestamp with time zone, OUT xact_start timestamp with time zone,
OUT query_start timestamp with time zone, OUT state_change timestamp with time zone, OUT wait_event_type text, OUT wait_event text,
OUT state text, OUT backend_xid xid, OUT backend_xmin xid, OUT query_id bigint, OUT query text, OUT backend_type text)
RETURNS SETOF record
LANGUAGE plpgsql
AS $function$
BEGIN
RETURN QUERY SELECT * FROM jsonb_to_recordset((
SELECT jsonb_agg(all_csa_rows_as_jsonb.csa_row_as_jsonb)::JSONB FROM (
SELECT jsonb_array_elements(run_command_on_all_nodes.result::JSONB)::JSONB || ('{"nodeid":' || run_command_on_all_nodes.nodeid || '}')::JSONB AS csa_row_as_jsonb
FROM run_command_on_all_nodes($$
SELECT coalesce(to_jsonb(array_agg(csa_from_one_node.*)), '[{}]'::JSONB)
FROM (
SELECT global_pid, worker_query AS is_worker_query, pg_stat_activity.* FROM
pg_stat_activity LEFT JOIN get_all_active_transactions() ON process_id = pid
) AS csa_from_one_node;
$$, parallel:=true, give_warning_for_connection_errors:=true)
WHERE success = 't'
) AS all_csa_rows_as_jsonb
))
AS (global_pid bigint, nodeid int, is_worker_query boolean, datid oid, datname name, pid integer,
leader_pid integer, usesysid oid, usename name, application_name text, client_addr inet, client_hostname text,
client_port integer, backend_start timestamp with time zone, xact_start timestamp with time zone,
query_start timestamp with time zone, state_change timestamp with time zone, wait_event_type text, wait_event text,
state text, backend_xid xid, backend_xmin xid, query_id bigint, query text, backend_type text);
END;
$function$;
CREATE OR REPLACE VIEW citus.citus_stat_activity AS
SELECT * FROM pg_catalog.citus_stat_activity();
ALTER VIEW citus.citus_stat_activity SET SCHEMA pg_catalog;
GRANT SELECT ON pg_catalog.citus_stat_activity TO PUBLIC;

View File

@ -0,0 +1,42 @@
-- citus_stat_activity combines the pg_stat_activity views from all nodes and adds global_pid, nodeid and is_worker_query columns.
-- The columns of citus_stat_activity don't change based on the Postgres version, however the pg_stat_activity's columns do.
-- Both Postgres 13 and 14 added one more column to pg_stat_activity (leader_pid and query_id).
-- citus_stat_activity has the most expansive column set, including the newly added columns.
-- If citus_stat_activity is queried in a Postgres version where pg_stat_activity doesn't have some columns citus_stat_activity has
-- the values for those columns will be NULL
CREATE OR REPLACE FUNCTION pg_catalog.citus_stat_activity(OUT global_pid bigint, OUT nodeid int, OUT is_worker_query boolean, OUT datid oid, OUT datname name, OUT pid integer,
OUT leader_pid integer, OUT usesysid oid, OUT usename name, OUT application_name text, OUT client_addr inet, OUT client_hostname text,
OUT client_port integer, OUT backend_start timestamp with time zone, OUT xact_start timestamp with time zone,
OUT query_start timestamp with time zone, OUT state_change timestamp with time zone, OUT wait_event_type text, OUT wait_event text,
OUT state text, OUT backend_xid xid, OUT backend_xmin xid, OUT query_id bigint, OUT query text, OUT backend_type text)
RETURNS SETOF record
LANGUAGE plpgsql
AS $function$
BEGIN
RETURN QUERY SELECT * FROM jsonb_to_recordset((
SELECT jsonb_agg(all_csa_rows_as_jsonb.csa_row_as_jsonb)::JSONB FROM (
SELECT jsonb_array_elements(run_command_on_all_nodes.result::JSONB)::JSONB || ('{"nodeid":' || run_command_on_all_nodes.nodeid || '}')::JSONB AS csa_row_as_jsonb
FROM run_command_on_all_nodes($$
SELECT coalesce(to_jsonb(array_agg(csa_from_one_node.*)), '[{}]'::JSONB)
FROM (
SELECT global_pid, worker_query AS is_worker_query, pg_stat_activity.* FROM
pg_stat_activity LEFT JOIN get_all_active_transactions() ON process_id = pid
) AS csa_from_one_node;
$$, parallel:=true, give_warning_for_connection_errors:=true)
WHERE success = 't'
) AS all_csa_rows_as_jsonb
))
AS (global_pid bigint, nodeid int, is_worker_query boolean, datid oid, datname name, pid integer,
leader_pid integer, usesysid oid, usename name, application_name text, client_addr inet, client_hostname text,
client_port integer, backend_start timestamp with time zone, xact_start timestamp with time zone,
query_start timestamp with time zone, state_change timestamp with time zone, wait_event_type text, wait_event text,
state text, backend_xid xid, backend_xmin xid, query_id bigint, query text, backend_type text);
END;
$function$;
CREATE OR REPLACE VIEW citus.citus_stat_activity AS
SELECT * FROM pg_catalog.citus_stat_activity();
ALTER VIEW citus.citus_stat_activity SET SCHEMA pg_catalog;
GRANT SELECT ON pg_catalog.citus_stat_activity TO PUBLIC;

View File

@ -0,0 +1,60 @@
DROP FUNCTION IF EXISTS pg_catalog.run_command_on_all_nodes;
CREATE FUNCTION pg_catalog.run_command_on_all_nodes(command text, parallel bool default true, give_warning_for_connection_errors bool default false,
OUT nodeid int, OUT success bool, OUT result text)
RETURNS SETOF record
LANGUAGE plpgsql
AS $function$
DECLARE
nodenames text[];
ports int[];
commands text[];
current_node_is_in_metadata boolean;
command_result_of_current_node text;
BEGIN
WITH citus_nodes AS (
SELECT * FROM pg_dist_node
WHERE isactive = 't' AND nodecluster = current_setting('citus.cluster_name')
AND (
(current_setting('citus.use_secondary_nodes') = 'never' AND noderole = 'primary')
OR
(current_setting('citus.use_secondary_nodes') = 'always' AND noderole = 'secondary')
)
ORDER BY nodename, nodeport
)
SELECT array_agg(citus_nodes.nodename), array_agg(citus_nodes.nodeport), array_agg(command)
INTO nodenames, ports, commands
FROM citus_nodes;
SELECT count(*) > 0 FROM pg_dist_node
WHERE isactive = 't'
AND nodecluster = current_setting('citus.cluster_name')
AND groupid IN (SELECT groupid FROM pg_dist_local_group)
INTO current_node_is_in_metadata;
-- This will happen when we call this function on coordinator and
-- the coordinator is not added to the metadata.
-- We'll manually add current node to the lists to actually run on all nodes.
-- But when the coordinator is not added to metadata and this function
-- is called from a worker node, this will not be enough and we'll
-- not be able run on all nodes.
IF NOT current_node_is_in_metadata THEN
SELECT
array_append(nodenames, current_setting('citus.local_hostname')),
array_append(ports, current_setting('port')::int),
array_append(commands, command)
INTO nodenames, ports, commands;
END IF;
FOR nodeid, success, result IN
SELECT coalesce(pg_dist_node.nodeid, 0) AS nodeid, mrow.success, mrow.result
FROM master_run_on_worker(nodenames, ports, commands, parallel) mrow
LEFT JOIN pg_dist_node ON mrow.node_name = pg_dist_node.nodename AND mrow.node_port = pg_dist_node.nodeport
LOOP
IF give_warning_for_connection_errors AND NOT success THEN
RAISE WARNING 'Error on node with node id %: %', nodeid, result;
END IF;
RETURN NEXT;
END LOOP;
END;
$function$;

View File

@ -0,0 +1,60 @@
DROP FUNCTION IF EXISTS pg_catalog.run_command_on_all_nodes;
CREATE FUNCTION pg_catalog.run_command_on_all_nodes(command text, parallel bool default true, give_warning_for_connection_errors bool default false,
OUT nodeid int, OUT success bool, OUT result text)
RETURNS SETOF record
LANGUAGE plpgsql
AS $function$
DECLARE
nodenames text[];
ports int[];
commands text[];
current_node_is_in_metadata boolean;
command_result_of_current_node text;
BEGIN
WITH citus_nodes AS (
SELECT * FROM pg_dist_node
WHERE isactive = 't' AND nodecluster = current_setting('citus.cluster_name')
AND (
(current_setting('citus.use_secondary_nodes') = 'never' AND noderole = 'primary')
OR
(current_setting('citus.use_secondary_nodes') = 'always' AND noderole = 'secondary')
)
ORDER BY nodename, nodeport
)
SELECT array_agg(citus_nodes.nodename), array_agg(citus_nodes.nodeport), array_agg(command)
INTO nodenames, ports, commands
FROM citus_nodes;
SELECT count(*) > 0 FROM pg_dist_node
WHERE isactive = 't'
AND nodecluster = current_setting('citus.cluster_name')
AND groupid IN (SELECT groupid FROM pg_dist_local_group)
INTO current_node_is_in_metadata;
-- This will happen when we call this function on coordinator and
-- the coordinator is not added to the metadata.
-- We'll manually add current node to the lists to actually run on all nodes.
-- But when the coordinator is not added to metadata and this function
-- is called from a worker node, this will not be enough and we'll
-- not be able run on all nodes.
IF NOT current_node_is_in_metadata THEN
SELECT
array_append(nodenames, current_setting('citus.local_hostname')),
array_append(ports, current_setting('port')::int),
array_append(commands, command)
INTO nodenames, ports, commands;
END IF;
FOR nodeid, success, result IN
SELECT coalesce(pg_dist_node.nodeid, 0) AS nodeid, mrow.success, mrow.result
FROM master_run_on_worker(nodenames, ports, commands, parallel) mrow
LEFT JOIN pg_dist_node ON mrow.node_name = pg_dist_node.nodename AND mrow.node_port = pg_dist_node.nodeport
LOOP
IF give_warning_for_connection_errors AND NOT success THEN
RAISE WARNING 'Error on node with node id %: %', nodeid, result;
END IF;
RETURN NEXT;
END LOOP;
END;
$function$;

View File

@ -1,4 +1,4 @@
test: upgrade_basic_after upgrade_type_after upgrade_ref2ref_after upgrade_distributed_function_after upgrade_rebalance_strategy_after upgrade_list_citus_objects upgrade_autoconverted_after
test: upgrade_basic_after upgrade_type_after upgrade_ref2ref_after upgrade_distributed_function_after upgrade_rebalance_strategy_after upgrade_list_citus_objects upgrade_autoconverted_after upgrade_citus_stat_activity
# This attempts dropping citus extension (and rollbacks), so please do
# not run in parallel with any other tests.

View File

@ -6,6 +6,7 @@ test: upgrade_ref2ref_before
test: upgrade_type_before
test: upgrade_distributed_function_before upgrade_rebalance_strategy_before
test: upgrade_autoconverted_before
test: upgrade_citus_stat_activity
# upgrade_columnar_before renames public schema to citus_schema, so let's
# run this test as the last one.

View File

@ -260,3 +260,6 @@ s/NOTICE: issuing WITH placement_data\(shardid, shardstate, shardlength, groupi
# global_pid when pg_cancel_backend is sent to workers
s/pg_cancel_backend\('[0-9]+'::bigint\)/pg_cancel_backend('xxxxx'::bigint)/g
s/issuing SELECT pg_cancel_backend\([0-9]+::integer\)/issuing SELECT pg_cancel_backend(xxxxx::integer)/g
# node id in run_command_on_all_nodes warning
s/Error on node with node id [0-9]+/Error on node with node id xxxxx/g

View File

@ -1,6 +1,6 @@
Parsed test spec with 2 sessions
starting permutation: s1-start-session-level-connection s1-worker-begin s1-worker-select s2-coordinator-citus_dist_stat_activity s2-coordinator-citus_worker_stat_activity s1-worker-commit s1-stop-session-level-connection
starting permutation: s1-start-session-level-connection s1-worker-begin s1-worker-select s2-coordinator-citus_stat_activity s2-coordinator-citus_dist_stat_activity s2-coordinator-citus_worker_stat_activity s1-worker-commit s1-stop-session-level-connection
create_distributed_table
---------------------------------------------------------------------
@ -30,17 +30,29 @@ run_commands_on_session_level_connection_to_node
(1 row)
step s2-coordinator-citus_dist_stat_activity:
SELECT global_pid != 0 FROM citus_dist_stat_activity() WHERE query LIKE '%SELECT * FROM dist\_table%' and query NOT ILIKE '%run_commands_on_session_level_connection_to_node%';
step s2-coordinator-citus_stat_activity:
SELECT global_pid != 0 FROM citus_stat_activity WHERE query LIKE '%SELECT * FROM dist\_table%' AND query NOT ILIKE '%run_commands_on_session_level_connection_to_node%';
?column?
---------------------------------------------------------------------
t
(1 row)
step s2-coordinator-citus_dist_stat_activity:
SELECT query FROM citus_dist_stat_activity() WHERE global_pid IN (
SELECT global_pid FROM citus_stat_activity WHERE query LIKE '%SELECT * FROM dist\_table%'
)
AND query NOT ILIKE '%run_commands_on_session_level_connection_to_node%'
ORDER BY 1;
query
---------------------------------------------------------------------
SET citus.enable_local_execution TO off; SET citus.force_max_query_parallelization TO ON; SELECT * FROM dist_table
(1 row)
step s2-coordinator-citus_worker_stat_activity:
SELECT query FROM citus_worker_stat_activity() WHERE global_pid IN (
SELECT global_pid FROM citus_dist_stat_activity() WHERE query LIKE '%SELECT * FROM dist\_table%'
SELECT global_pid FROM citus_stat_activity WHERE query LIKE '%SELECT * FROM dist\_table%'
)
ORDER BY 1;
@ -68,13 +80,13 @@ stop_session_level_connection_to_node
(1 row)
restore_isolation_tester_func
citus_remove_node
---------------------------------------------------------------------
(1 row)
starting permutation: s1-coordinator-begin s1-coordinator-select s2-coordinator-citus_dist_stat_activity s2-coordinator-citus_worker_stat_activity s2-coordinator-get_all_active_transactions s2-coordinator-get_global_active_transactions s1-coordinator-commit
starting permutation: s1-coordinator-begin s1-coordinator-select s2-coordinator-citus_stat_activity s2-coordinator-citus_dist_stat_activity s2-coordinator-citus_worker_stat_activity s2-coordinator-get_all_active_transactions s2-coordinator-get_global_active_transactions s1-coordinator-commit
create_distributed_table
---------------------------------------------------------------------
@ -92,17 +104,33 @@ a|b
---------------------------------------------------------------------
(0 rows)
step s2-coordinator-citus_dist_stat_activity:
SELECT global_pid != 0 FROM citus_dist_stat_activity() WHERE query LIKE '%SELECT * FROM dist\_table%' and query NOT ILIKE '%run_commands_on_session_level_connection_to_node%';
step s2-coordinator-citus_stat_activity:
SELECT global_pid != 0 FROM citus_stat_activity WHERE query LIKE '%SELECT * FROM dist\_table%' AND query NOT ILIKE '%run_commands_on_session_level_connection_to_node%';
?column?
---------------------------------------------------------------------
t
(1 row)
step s2-coordinator-citus_dist_stat_activity:
SELECT query FROM citus_dist_stat_activity() WHERE global_pid IN (
SELECT global_pid FROM citus_stat_activity WHERE query LIKE '%SELECT * FROM dist\_table%'
)
AND query NOT ILIKE '%run_commands_on_session_level_connection_to_node%'
ORDER BY 1;
query
---------------------------------------------------------------------
SET citus.enable_local_execution TO off;
SET citus.force_max_query_parallelization TO ON;
SELECT * FROM dist_table;
(1 row)
step s2-coordinator-citus_worker_stat_activity:
SELECT query FROM citus_worker_stat_activity() WHERE global_pid IN (
SELECT global_pid FROM citus_dist_stat_activity() WHERE query LIKE '%SELECT * FROM dist\_table%'
SELECT global_pid FROM citus_stat_activity WHERE query LIKE '%SELECT * FROM dist\_table%'
)
ORDER BY 1;
@ -116,7 +144,7 @@ SELECT a, b FROM public.dist_table_12345003 dist_table WHERE true
step s2-coordinator-get_all_active_transactions:
SELECT count(*) FROM get_all_active_transactions() WHERE global_pid IN (
SELECT global_pid FROM citus_dist_stat_activity() WHERE query LIKE '%SELECT * FROM dist\_table%'
SELECT global_pid FROM citus_stat_activity WHERE query LIKE '%SELECT * FROM dist\_table%'
);
count
@ -126,7 +154,7 @@ count
step s2-coordinator-get_global_active_transactions:
SELECT count(*) FROM get_global_active_transactions() WHERE global_pid IN (
SELECT global_pid FROM citus_dist_stat_activity() WHERE query LIKE '%SELECT * FROM dist\_table%'
SELECT global_pid FROM citus_stat_activity WHERE query LIKE '%SELECT * FROM dist\_table%'
)
AND transaction_number != 0;
@ -138,7 +166,7 @@ count
step s1-coordinator-commit:
COMMIT;
restore_isolation_tester_func
citus_remove_node
---------------------------------------------------------------------
(1 row)

View File

@ -1021,14 +1021,17 @@ SELECT * FROM multi_extension.print_extension_changes();
| function citus_run_local_command(text) void
| function citus_shard_indexes_on_worker() SETOF record
| function citus_shards_on_worker() SETOF record
| function citus_stat_activity() SETOF record
| function create_distributed_function(regprocedure,text,text,boolean) void
| function pg_cancel_backend(bigint) boolean
| function pg_terminate_backend(bigint,bigint) boolean
| function run_command_on_all_nodes(text,boolean,boolean) SETOF record
| function worker_create_or_replace_object(text[]) boolean
| function worker_drop_sequence_dependency(text) void
| function worker_drop_shell_table(text) void
| function worker_partition_query_result(text,text,integer,citus.distribution_type,text[],text[],boolean,boolean,boolean) SETOF record
(25 rows)
| view citus_stat_activity
(28 rows)
DROP TABLE multi_extension.prev_objects, multi_extension.extension_diff;
-- show running version

View File

@ -0,0 +1,88 @@
CREATE SCHEMA run_command_on_all_nodes;
SET search_path TO run_command_on_all_nodes;
-- check coordinator isn't in metadata
SELECT count(*) != 0 AS "Coordinator is in Metadata"
FROM pg_dist_node
WHERE groupid IN (
SELECT groupid FROM pg_dist_local_group
);
Coordinator is in Metadata
---------------------------------------------------------------------
f
(1 row)
-- run a simple select query and check it also runs in coordinator
SELECT nodeid NOT IN (SELECT nodeid FROM pg_dist_node) AS "Is Coordinator", success, result FROM run_command_on_all_nodes('SELECT 1') ORDER BY 1;
Is Coordinator | success | result
---------------------------------------------------------------------
f | t | 1
f | t | 1
t | t | 1
(3 rows)
-- check that when coordinator is not in metadata and run_command_on_all_nodes is called from
-- a worker node, command is not run on the coordinator
\c - - - :worker_1_port
SELECT nodeid NOT IN (SELECT nodeid FROM pg_dist_node) AS "Is Coordinator", success, result FROM run_command_on_all_nodes('SELECT 1') ORDER BY 1;
Is Coordinator | success | result
---------------------------------------------------------------------
f | t | 1
f | t | 1
(2 rows)
\c - - - :master_port
-- create a table
SELECT result FROM run_command_on_all_nodes('CREATE TABLE run_command_on_all_nodes.tbl (a INT)');
result
---------------------------------------------------------------------
CREATE TABLE
CREATE TABLE
CREATE TABLE
(3 rows)
SELECT tablename FROM pg_tables WHERE schemaname = 'run_command_on_all_nodes';
tablename
---------------------------------------------------------------------
tbl
(1 row)
\c - - - :worker_1_port
SELECT tablename FROM pg_tables WHERE schemaname = 'run_command_on_all_nodes';
tablename
---------------------------------------------------------------------
tbl
(1 row)
\c - - - :master_port
SELECT result FROM run_command_on_all_nodes('SELECT tablename FROM pg_tables WHERE schemaname = ''run_command_on_all_nodes'';');
result
---------------------------------------------------------------------
tbl
tbl
tbl
(3 rows)
-- break a node and check messages
SELECT nodeid AS worker_1_nodeid FROM pg_dist_node WHERE nodeport = :worker_1_port \gset
UPDATE pg_dist_node SET nodeport = 0 WHERE nodeid = :worker_1_nodeid;
SELECT nodeid = :worker_1_nodeid AS "Is Worker 1", success, result FROM run_command_on_all_nodes('SELECT 1') ORDER BY 1;
Is Worker 1 | success | result
---------------------------------------------------------------------
f | t | 1
f | t | 1
t | f | failed to connect to localhost:xxxxx
(3 rows)
SELECT nodeid = :worker_1_nodeid AS "Is Worker 1", success, result FROM run_command_on_all_nodes('SELECT 1', give_warning_for_connection_errors:=true) ORDER BY 1;
WARNING: Error on node with node id xxxxx: failed to connect to localhost:xxxxx
CONTEXT: PL/pgSQL function run_command_on_all_nodes(text,boolean,boolean) line XX at RAISE
Is Worker 1 | success | result
---------------------------------------------------------------------
f | t | 1
f | t | 1
t | f | failed to connect to localhost:xxxxx
(3 rows)
UPDATE pg_dist_node SET nodeport = :worker_1_port WHERE nodeid = :worker_1_nodeid;
DROP SCHEMA run_command_on_all_nodes CASCADE;
NOTICE: drop cascades to table run_command_on_all_nodes.tbl

View File

@ -0,0 +1,17 @@
SELECT column_name FROM information_schema.columns WHERE table_name = 'citus_stat_activity' AND column_name NOT IN ('leader_pid', 'query_id')
EXCEPT SELECT column_name FROM information_schema.columns WHERE table_name = 'pg_stat_activity'
ORDER BY 1;
column_name
---------------------------------------------------------------------
global_pid
is_worker_query
nodeid
(3 rows)
SELECT column_name FROM information_schema.columns WHERE table_name = 'pg_stat_activity'
EXCEPT SELECT column_name FROM information_schema.columns WHERE table_name = 'citus_stat_activity'
ORDER BY 1;
column_name
---------------------------------------------------------------------
(0 rows)

View File

@ -105,6 +105,7 @@ ORDER BY 1;
function citus_shard_indexes_on_worker()
function citus_shard_sizes()
function citus_shards_on_worker()
function citus_stat_activity()
function citus_stat_statements()
function citus_stat_statements_reset()
function citus_table_is_visible(oid)
@ -189,6 +190,7 @@ ORDER BY 1;
function replicate_reference_tables()
function replicate_table_shards(regclass,integer,integer,bigint[],citus.shard_transfer_mode)
function role_exists(name)
function run_command_on_all_nodes(text,boolean,boolean)
function run_command_on_colocated_placements(regclass,regclass,text,boolean)
function run_command_on_placements(regclass,text,boolean)
function run_command_on_shards(regclass,text,boolean)
@ -271,9 +273,10 @@ ORDER BY 1;
view citus_shard_indexes_on_worker
view citus_shards
view citus_shards_on_worker
view citus_stat_activity
view citus_stat_statements
view citus_worker_stat_activity
view pg_dist_shard_placement
view time_partitions
(259 rows)
(262 rows)

View File

@ -94,6 +94,7 @@ test: alter_distributed_table
test: issue_5248 issue_5099
test: object_propagation_debug
test: undistribute_table
test: run_command_on_all_nodes
# ---------

View File

@ -2,6 +2,7 @@
setup
{
SELECT citus_add_node('localhost', 57636, groupid:=0);
SET citus.next_shard_id TO 12345000;
CREATE TABLE dist_table (a INT, b INT);
SELECT create_distributed_table('dist_table', 'a', shard_count:=4);
@ -11,6 +12,7 @@ teardown
{
DROP TABLE dist_table;
SELECT citus_internal.restore_isolation_tester_func();
SELECT citus_remove_node('localhost', 57636);
}
session "s1"
@ -60,15 +62,24 @@ step "s1-stop-session-level-connection"
session "s2"
step "s2-coordinator-citus_stat_activity"
{
SELECT global_pid != 0 FROM citus_stat_activity WHERE query LIKE '%SELECT * FROM dist\_table%' AND query NOT ILIKE '%run_commands_on_session_level_connection_to_node%';
}
step "s2-coordinator-citus_dist_stat_activity"
{
SELECT global_pid != 0 FROM citus_dist_stat_activity() WHERE query LIKE '%SELECT * FROM dist\_table%' and query NOT ILIKE '%run_commands_on_session_level_connection_to_node%';
SELECT query FROM citus_dist_stat_activity() WHERE global_pid IN (
SELECT global_pid FROM citus_stat_activity WHERE query LIKE '%SELECT * FROM dist\_table%'
)
AND query NOT ILIKE '%run_commands_on_session_level_connection_to_node%'
ORDER BY 1;
}
step "s2-coordinator-citus_worker_stat_activity"
{
SELECT query FROM citus_worker_stat_activity() WHERE global_pid IN (
SELECT global_pid FROM citus_dist_stat_activity() WHERE query LIKE '%SELECT * FROM dist\_table%'
SELECT global_pid FROM citus_stat_activity WHERE query LIKE '%SELECT * FROM dist\_table%'
)
ORDER BY 1;
}
@ -76,21 +87,21 @@ step "s2-coordinator-citus_worker_stat_activity"
step "s2-coordinator-get_all_active_transactions"
{
SELECT count(*) FROM get_all_active_transactions() WHERE global_pid IN (
SELECT global_pid FROM citus_dist_stat_activity() WHERE query LIKE '%SELECT * FROM dist\_table%'
SELECT global_pid FROM citus_stat_activity WHERE query LIKE '%SELECT * FROM dist\_table%'
);
}
step "s2-coordinator-get_global_active_transactions"
{
SELECT count(*) FROM get_global_active_transactions() WHERE global_pid IN (
SELECT global_pid FROM citus_dist_stat_activity() WHERE query LIKE '%SELECT * FROM dist\_table%'
SELECT global_pid FROM citus_stat_activity WHERE query LIKE '%SELECT * FROM dist\_table%'
)
AND transaction_number != 0;
}
// worker - coordinator
permutation "s1-start-session-level-connection" "s1-worker-begin" "s1-worker-select" "s2-coordinator-citus_dist_stat_activity" "s2-coordinator-citus_worker_stat_activity" "s1-worker-commit" "s1-stop-session-level-connection"
permutation "s1-start-session-level-connection" "s1-worker-begin" "s1-worker-select" "s2-coordinator-citus_stat_activity" "s2-coordinator-citus_dist_stat_activity" "s2-coordinator-citus_worker_stat_activity" "s1-worker-commit" "s1-stop-session-level-connection"
// coordinator - coordinator
permutation "s1-coordinator-begin" "s1-coordinator-select" "s2-coordinator-citus_dist_stat_activity" "s2-coordinator-citus_worker_stat_activity" "s2-coordinator-get_all_active_transactions" "s2-coordinator-get_global_active_transactions" "s1-coordinator-commit"
permutation "s1-coordinator-begin" "s1-coordinator-select" "s2-coordinator-citus_stat_activity" "s2-coordinator-citus_dist_stat_activity" "s2-coordinator-citus_worker_stat_activity" "s2-coordinator-get_all_active_transactions" "s2-coordinator-get_global_active_transactions" "s1-coordinator-commit"

View File

@ -0,0 +1,41 @@
CREATE SCHEMA run_command_on_all_nodes;
SET search_path TO run_command_on_all_nodes;
-- check coordinator isn't in metadata
SELECT count(*) != 0 AS "Coordinator is in Metadata"
FROM pg_dist_node
WHERE groupid IN (
SELECT groupid FROM pg_dist_local_group
);
-- run a simple select query and check it also runs in coordinator
SELECT nodeid NOT IN (SELECT nodeid FROM pg_dist_node) AS "Is Coordinator", success, result FROM run_command_on_all_nodes('SELECT 1') ORDER BY 1;
-- check that when coordinator is not in metadata and run_command_on_all_nodes is called from
-- a worker node, command is not run on the coordinator
\c - - - :worker_1_port
SELECT nodeid NOT IN (SELECT nodeid FROM pg_dist_node) AS "Is Coordinator", success, result FROM run_command_on_all_nodes('SELECT 1') ORDER BY 1;
\c - - - :master_port
-- create a table
SELECT result FROM run_command_on_all_nodes('CREATE TABLE run_command_on_all_nodes.tbl (a INT)');
SELECT tablename FROM pg_tables WHERE schemaname = 'run_command_on_all_nodes';
\c - - - :worker_1_port
SELECT tablename FROM pg_tables WHERE schemaname = 'run_command_on_all_nodes';
\c - - - :master_port
SELECT result FROM run_command_on_all_nodes('SELECT tablename FROM pg_tables WHERE schemaname = ''run_command_on_all_nodes'';');
-- break a node and check messages
SELECT nodeid AS worker_1_nodeid FROM pg_dist_node WHERE nodeport = :worker_1_port \gset
UPDATE pg_dist_node SET nodeport = 0 WHERE nodeid = :worker_1_nodeid;
SELECT nodeid = :worker_1_nodeid AS "Is Worker 1", success, result FROM run_command_on_all_nodes('SELECT 1') ORDER BY 1;
SELECT nodeid = :worker_1_nodeid AS "Is Worker 1", success, result FROM run_command_on_all_nodes('SELECT 1', give_warning_for_connection_errors:=true) ORDER BY 1;
UPDATE pg_dist_node SET nodeport = :worker_1_port WHERE nodeid = :worker_1_nodeid;
DROP SCHEMA run_command_on_all_nodes CASCADE;

View File

@ -0,0 +1,7 @@
SELECT column_name FROM information_schema.columns WHERE table_name = 'citus_stat_activity' AND column_name NOT IN ('leader_pid', 'query_id')
EXCEPT SELECT column_name FROM information_schema.columns WHERE table_name = 'pg_stat_activity'
ORDER BY 1;
SELECT column_name FROM information_schema.columns WHERE table_name = 'pg_stat_activity'
EXCEPT SELECT column_name FROM information_schema.columns WHERE table_name = 'citus_stat_activity'
ORDER BY 1;