diff --git a/src/backend/distributed/sql/citus--10.2-4--11.0-1.sql b/src/backend/distributed/sql/citus--10.2-4--11.0-1.sql index 0b0c0ca39..e39b7546b 100644 --- a/src/backend/distributed/sql/citus--10.2-4--11.0-1.sql +++ b/src/backend/distributed/sql/citus--10.2-4--11.0-1.sql @@ -85,3 +85,5 @@ $$; #include "udfs/citus_finalize_upgrade_to_citus11/11.0-1.sql" +#include "udfs/run_command_on_all_nodes/11.0-1.sql" +#include "udfs/citus_stat_activity/11.0-1.sql" diff --git a/src/backend/distributed/sql/downgrades/citus--11.0-1--10.2-4.sql b/src/backend/distributed/sql/downgrades/citus--11.0-1--10.2-4.sql index 62b683cc0..a14969d24 100644 --- a/src/backend/distributed/sql/downgrades/citus--11.0-1--10.2-4.sql +++ b/src/backend/distributed/sql/downgrades/citus--11.0-1--10.2-4.sql @@ -119,10 +119,13 @@ CREATE FUNCTION get_global_active_transactions(OUT datid oid, OUT process_id int RESET search_path; -DROP FUNCTION citus_internal_local_blocked_processes CASCADE; -DROP FUNCTION citus_internal_global_blocked_processes CASCADE; +DROP VIEW pg_catalog.citus_lock_waits; -DROP FUNCTION pg_catalog.citus_dist_stat_activity CASCADE; +DROP FUNCTION citus_internal_local_blocked_processes; +DROP FUNCTION citus_internal_global_blocked_processes; + +DROP VIEW pg_catalog.citus_dist_stat_activity; +DROP FUNCTION pg_catalog.citus_dist_stat_activity; CREATE OR REPLACE FUNCTION pg_catalog.citus_dist_stat_activity(OUT query_hostname text, OUT query_hostport int, OUT distributed_query_host_name text, OUT distributed_query_host_port int, OUT transaction_number int8, OUT transaction_stamp timestamptz, OUT datid oid, OUT datname name, @@ -148,7 +151,8 @@ ALTER VIEW citus.citus_dist_stat_activity SET SCHEMA pg_catalog; GRANT SELECT ON pg_catalog.citus_dist_stat_activity TO PUBLIC; SET search_path = 'pg_catalog'; -DROP FUNCTION citus_worker_stat_activity CASCADE; +DROP VIEW citus_worker_stat_activity; +DROP FUNCTION citus_worker_stat_activity; CREATE OR REPLACE FUNCTION citus_worker_stat_activity(OUT 
query_hostname text, OUT query_hostport int, OUT distributed_query_host_name text, OUT distributed_query_host_port int, OUT transaction_number int8, OUT transaction_stamp timestamptz, OUT datid oid, OUT datname name, @@ -171,10 +175,10 @@ IS 'returns distributed transaction activity on shards of distributed tables'; DROP FUNCTION pg_catalog.worker_create_or_replace_object(text[]); #include "../udfs/worker_create_or_replace_object/9.0-1.sql" -DROP FUNCTION IF EXISTS pg_catalog.pg_cancel_backend(bigint) CASCADE; -DROP FUNCTION IF EXISTS pg_catalog.pg_terminate_backend(bigint, bigint) CASCADE; +DROP FUNCTION IF EXISTS pg_catalog.pg_cancel_backend(bigint); +DROP FUNCTION IF EXISTS pg_catalog.pg_terminate_backend(bigint, bigint); -DROP FUNCTION pg_catalog.dump_local_wait_edges CASCADE; +DROP FUNCTION pg_catalog.dump_local_wait_edges; CREATE FUNCTION pg_catalog.dump_local_wait_edges( OUT waiting_pid int4, OUT waiting_node_id int4, @@ -191,7 +195,7 @@ AS $$MODULE_PATHNAME$$, $$dump_local_wait_edges$$; COMMENT ON FUNCTION pg_catalog.dump_local_wait_edges() IS 'returns all local lock wait chains, that start from distributed transactions'; -DROP FUNCTION pg_catalog.dump_global_wait_edges CASCADE; +DROP FUNCTION pg_catalog.dump_global_wait_edges; CREATE FUNCTION pg_catalog.dump_global_wait_edges( OUT waiting_pid int4, OUT waiting_node_id int4, @@ -351,3 +355,7 @@ GRANT SELECT ON pg_catalog.citus_lock_waits TO PUBLIC; DROP FUNCTION pg_catalog.citus_finalize_upgrade_to_citus11(bool); RESET search_path; + +DROP VIEW IF EXISTS pg_catalog.citus_stat_activity; +DROP FUNCTION IF EXISTS pg_catalog.citus_stat_activity; +DROP FUNCTION IF EXISTS pg_catalog.run_command_on_all_nodes; diff --git a/src/backend/distributed/sql/udfs/citus_stat_activity/11.0-1.sql b/src/backend/distributed/sql/udfs/citus_stat_activity/11.0-1.sql new file mode 100644 index 000000000..5dea5a903 --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_stat_activity/11.0-1.sql @@ -0,0 +1,42 @@ +-- 
citus_stat_activity combines the pg_stat_activity views from all nodes and adds global_pid, nodeid and is_worker_query columns. +-- The columns of citus_stat_activity don't change based on the Postgres version; however, pg_stat_activity's columns do. +-- Postgres 13 and 14 each added one more column to pg_stat_activity (leader_pid and query_id, respectively). +-- citus_stat_activity has the most expansive column set, including the newly added columns. +-- If citus_stat_activity is queried in a Postgres version where pg_stat_activity doesn't have some of the columns citus_stat_activity has, +-- the values for those columns will be NULL (jsonb_to_recordset yields NULL for keys missing from the JSON). + +CREATE OR REPLACE FUNCTION pg_catalog.citus_stat_activity(OUT global_pid bigint, OUT nodeid int, OUT is_worker_query boolean, OUT datid oid, OUT datname name, OUT pid integer, + OUT leader_pid integer, OUT usesysid oid, OUT usename name, OUT application_name text, OUT client_addr inet, OUT client_hostname text, + OUT client_port integer, OUT backend_start timestamp with time zone, OUT xact_start timestamp with time zone, + OUT query_start timestamp with time zone, OUT state_change timestamp with time zone, OUT wait_event_type text, OUT wait_event text, + OUT state text, OUT backend_xid xid, OUT backend_xmin xid, OUT query_id bigint, OUT query text, OUT backend_type text) + RETURNS SETOF record + LANGUAGE plpgsql + AS $function$ +BEGIN + RETURN QUERY SELECT * FROM jsonb_to_recordset(( + SELECT jsonb_agg(all_csa_rows_as_jsonb.csa_row_as_jsonb)::JSONB FROM ( + SELECT jsonb_array_elements(run_command_on_all_nodes.result::JSONB)::JSONB || ('{"nodeid":' || run_command_on_all_nodes.nodeid || '}')::JSONB AS csa_row_as_jsonb + FROM run_command_on_all_nodes($$ + SELECT coalesce(to_jsonb(array_agg(csa_from_one_node.*)), '[{}]'::JSONB) + FROM ( + SELECT global_pid, worker_query AS is_worker_query, pg_stat_activity.* FROM + pg_stat_activity LEFT JOIN get_all_active_transactions() ON process_id = pid + ) AS csa_from_one_node; + $$, parallel:=true, 
give_warning_for_connection_errors:=true) + WHERE success = 't' + ) AS all_csa_rows_as_jsonb + )) + AS (global_pid bigint, nodeid int, is_worker_query boolean, datid oid, datname name, pid integer, + leader_pid integer, usesysid oid, usename name, application_name text, client_addr inet, client_hostname text, + client_port integer, backend_start timestamp with time zone, xact_start timestamp with time zone, + query_start timestamp with time zone, state_change timestamp with time zone, wait_event_type text, wait_event text, + state text, backend_xid xid, backend_xmin xid, query_id bigint, query text, backend_type text); +END; +$function$; + +CREATE OR REPLACE VIEW citus.citus_stat_activity AS +SELECT * FROM pg_catalog.citus_stat_activity(); + +ALTER VIEW citus.citus_stat_activity SET SCHEMA pg_catalog; +GRANT SELECT ON pg_catalog.citus_stat_activity TO PUBLIC; diff --git a/src/backend/distributed/sql/udfs/citus_stat_activity/latest.sql b/src/backend/distributed/sql/udfs/citus_stat_activity/latest.sql new file mode 100644 index 000000000..5dea5a903 --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_stat_activity/latest.sql @@ -0,0 +1,42 @@ +-- citus_stat_activity combines the pg_stat_activity views from all nodes and adds global_pid, nodeid and is_worker_query columns. +-- The columns of citus_stat_activity don't change based on the Postgres version; however, pg_stat_activity's columns do. +-- Postgres 13 and 14 each added one more column to pg_stat_activity (leader_pid and query_id, respectively). +-- citus_stat_activity has the most expansive column set, including the newly added columns. 
+-- If citus_stat_activity is queried in a Postgres version where pg_stat_activity doesn't have some of the columns citus_stat_activity has, +-- the values for those columns will be NULL (jsonb_to_recordset yields NULL for keys missing from the JSON). + +CREATE OR REPLACE FUNCTION pg_catalog.citus_stat_activity(OUT global_pid bigint, OUT nodeid int, OUT is_worker_query boolean, OUT datid oid, OUT datname name, OUT pid integer, + OUT leader_pid integer, OUT usesysid oid, OUT usename name, OUT application_name text, OUT client_addr inet, OUT client_hostname text, + OUT client_port integer, OUT backend_start timestamp with time zone, OUT xact_start timestamp with time zone, + OUT query_start timestamp with time zone, OUT state_change timestamp with time zone, OUT wait_event_type text, OUT wait_event text, + OUT state text, OUT backend_xid xid, OUT backend_xmin xid, OUT query_id bigint, OUT query text, OUT backend_type text) + RETURNS SETOF record + LANGUAGE plpgsql + AS $function$ +BEGIN + RETURN QUERY SELECT * FROM jsonb_to_recordset(( + SELECT jsonb_agg(all_csa_rows_as_jsonb.csa_row_as_jsonb)::JSONB FROM ( + SELECT jsonb_array_elements(run_command_on_all_nodes.result::JSONB)::JSONB || ('{"nodeid":' || run_command_on_all_nodes.nodeid || '}')::JSONB AS csa_row_as_jsonb + FROM run_command_on_all_nodes($$ + SELECT coalesce(to_jsonb(array_agg(csa_from_one_node.*)), '[{}]'::JSONB) + FROM ( + SELECT global_pid, worker_query AS is_worker_query, pg_stat_activity.* FROM + pg_stat_activity LEFT JOIN get_all_active_transactions() ON process_id = pid + ) AS csa_from_one_node; + $$, parallel:=true, give_warning_for_connection_errors:=true) + WHERE success = 't' + ) AS all_csa_rows_as_jsonb + )) + AS (global_pid bigint, nodeid int, is_worker_query boolean, datid oid, datname name, pid integer, + leader_pid integer, usesysid oid, usename name, application_name text, client_addr inet, client_hostname text, + client_port integer, backend_start timestamp with time zone, xact_start timestamp with time zone, + query_start timestamp with time zone, 
state_change timestamp with time zone, wait_event_type text, wait_event text, + state text, backend_xid xid, backend_xmin xid, query_id bigint, query text, backend_type text); +END; +$function$; + +CREATE OR REPLACE VIEW citus.citus_stat_activity AS +SELECT * FROM pg_catalog.citus_stat_activity(); + +ALTER VIEW citus.citus_stat_activity SET SCHEMA pg_catalog; +GRANT SELECT ON pg_catalog.citus_stat_activity TO PUBLIC; diff --git a/src/backend/distributed/sql/udfs/run_command_on_all_nodes/11.0-1.sql b/src/backend/distributed/sql/udfs/run_command_on_all_nodes/11.0-1.sql new file mode 100644 index 000000000..1361ef6aa --- /dev/null +++ b/src/backend/distributed/sql/udfs/run_command_on_all_nodes/11.0-1.sql @@ -0,0 +1,62 @@ +-- run_command_on_all_nodes runs the given command, through master_run_on_worker, on the active nodes of the current cluster +-- whose role matches citus.use_secondary_nodes, plus the local node when no active pg_dist_node row belongs to the local group. +-- It returns (nodeid, success, result) rows (nodeid is 0 for a node missing from pg_dist_node) and, when give_warning_for_connection_errors is true, raises a WARNING for every unsuccessful row. +DROP FUNCTION IF EXISTS pg_catalog.run_command_on_all_nodes; + +CREATE FUNCTION pg_catalog.run_command_on_all_nodes(command text, parallel bool default true, give_warning_for_connection_errors bool default false, + OUT nodeid int, OUT success bool, OUT result text) + RETURNS SETOF record + LANGUAGE plpgsql + AS $function$ +DECLARE + nodenames text[]; + ports int[]; + commands text[]; + current_node_is_in_metadata boolean; +BEGIN + WITH citus_nodes AS ( + SELECT * FROM pg_dist_node + WHERE isactive = 't' AND nodecluster = current_setting('citus.cluster_name') + AND ( + (current_setting('citus.use_secondary_nodes') = 'never' AND noderole = 'primary') + OR + (current_setting('citus.use_secondary_nodes') = 'always' AND noderole = 'secondary') + ) + ORDER BY nodename, nodeport + ) + SELECT array_agg(citus_nodes.nodename), array_agg(citus_nodes.nodeport), array_agg(command) + INTO nodenames, ports, commands + FROM citus_nodes; + + SELECT count(*) > 0 FROM pg_dist_node + WHERE isactive = 't' + AND nodecluster = current_setting('citus.cluster_name') + AND groupid IN (SELECT groupid FROM pg_dist_local_group) + INTO current_node_is_in_metadata; + + -- This will happen when we call this function on coordinator and + -- the 
coordinator is not added to the metadata. + -- We'll manually add current node to the lists to actually run on all nodes. + -- But when the coordinator is not added to metadata and this function + -- is called from a worker node, this will not be enough and we'll + -- not be able to run on all nodes. + IF NOT current_node_is_in_metadata THEN + SELECT + array_append(nodenames, current_setting('citus.local_hostname')), + array_append(ports, current_setting('port')::int), + array_append(commands, command) + INTO nodenames, ports, commands; + END IF; + + FOR nodeid, success, result IN + SELECT coalesce(pg_dist_node.nodeid, 0) AS nodeid, mrow.success, mrow.result + FROM master_run_on_worker(nodenames, ports, commands, parallel) mrow + LEFT JOIN pg_dist_node ON mrow.node_name = pg_dist_node.nodename AND mrow.node_port = pg_dist_node.nodeport + LOOP + IF give_warning_for_connection_errors AND NOT success THEN + RAISE WARNING 'Error on node with node id %: %', nodeid, result; + END IF; + RETURN NEXT; + END LOOP; +END; +$function$; diff --git a/src/backend/distributed/sql/udfs/run_command_on_all_nodes/latest.sql b/src/backend/distributed/sql/udfs/run_command_on_all_nodes/latest.sql new file mode 100644 index 000000000..1361ef6aa --- /dev/null +++ b/src/backend/distributed/sql/udfs/run_command_on_all_nodes/latest.sql @@ -0,0 +1,62 @@ +-- run_command_on_all_nodes runs the given command, through master_run_on_worker, on the active nodes of the current cluster +-- whose role matches citus.use_secondary_nodes, plus the local node when no active pg_dist_node row belongs to the local group. +-- It returns (nodeid, success, result) rows (nodeid is 0 for a node missing from pg_dist_node) and, when give_warning_for_connection_errors is true, raises a WARNING for every unsuccessful row. +DROP FUNCTION IF EXISTS pg_catalog.run_command_on_all_nodes; + +CREATE FUNCTION pg_catalog.run_command_on_all_nodes(command text, parallel bool default true, give_warning_for_connection_errors bool default false, + OUT nodeid int, OUT success bool, OUT result text) + RETURNS SETOF record + LANGUAGE plpgsql + AS $function$ +DECLARE + nodenames text[]; + ports int[]; + commands text[]; + current_node_is_in_metadata boolean; +BEGIN + WITH citus_nodes AS ( + SELECT * FROM pg_dist_node + WHERE isactive = 't' AND nodecluster = current_setting('citus.cluster_name') + AND ( + 
(current_setting('citus.use_secondary_nodes') = 'never' AND noderole = 'primary') + OR + (current_setting('citus.use_secondary_nodes') = 'always' AND noderole = 'secondary') + ) + ORDER BY nodename, nodeport + ) + SELECT array_agg(citus_nodes.nodename), array_agg(citus_nodes.nodeport), array_agg(command) + INTO nodenames, ports, commands + FROM citus_nodes; + + SELECT count(*) > 0 FROM pg_dist_node + WHERE isactive = 't' + AND nodecluster = current_setting('citus.cluster_name') + AND groupid IN (SELECT groupid FROM pg_dist_local_group) + INTO current_node_is_in_metadata; + + -- This will happen when we call this function on coordinator and + -- the coordinator is not added to the metadata. + -- We'll manually add current node to the lists to actually run on all nodes. + -- But when the coordinator is not added to metadata and this function + -- is called from a worker node, this will not be enough and we'll + -- not be able to run on all nodes. + IF NOT current_node_is_in_metadata THEN + SELECT + array_append(nodenames, current_setting('citus.local_hostname')), + array_append(ports, current_setting('port')::int), + array_append(commands, command) + INTO nodenames, ports, commands; + END IF; + + FOR nodeid, success, result IN + SELECT coalesce(pg_dist_node.nodeid, 0) AS nodeid, mrow.success, mrow.result + FROM master_run_on_worker(nodenames, ports, commands, parallel) mrow + LEFT JOIN pg_dist_node ON mrow.node_name = pg_dist_node.nodename AND mrow.node_port = pg_dist_node.nodeport + LOOP + IF give_warning_for_connection_errors AND NOT success THEN + RAISE WARNING 'Error on node with node id %: %', nodeid, result; + END IF; + RETURN NEXT; + END LOOP; +END; +$function$; diff --git a/src/test/regress/after_pg_upgrade_schedule b/src/test/regress/after_pg_upgrade_schedule index c8a1b6b40..f8e4e66ae 100644 --- a/src/test/regress/after_pg_upgrade_schedule +++ b/src/test/regress/after_pg_upgrade_schedule @@ -1,4 +1,4 @@ -test: upgrade_basic_after upgrade_type_after 
upgrade_ref2ref_after upgrade_distributed_function_after upgrade_rebalance_strategy_after upgrade_list_citus_objects upgrade_autoconverted_after +test: upgrade_basic_after upgrade_type_after upgrade_ref2ref_after upgrade_distributed_function_after upgrade_rebalance_strategy_after upgrade_list_citus_objects upgrade_autoconverted_after upgrade_citus_stat_activity # This attempts dropping citus extension (and rollbacks), so please do # not run in parallel with any other tests. diff --git a/src/test/regress/before_pg_upgrade_schedule b/src/test/regress/before_pg_upgrade_schedule index a5292e0ce..880b25e0a 100644 --- a/src/test/regress/before_pg_upgrade_schedule +++ b/src/test/regress/before_pg_upgrade_schedule @@ -6,6 +6,7 @@ test: upgrade_ref2ref_before test: upgrade_type_before test: upgrade_distributed_function_before upgrade_rebalance_strategy_before test: upgrade_autoconverted_before +test: upgrade_citus_stat_activity # upgrade_columnar_before renames public schema to citus_schema, so let's # run this test as the last one. 
diff --git a/src/test/regress/bin/normalize.sed b/src/test/regress/bin/normalize.sed index 916d62afa..d7ffe31c3 100644 --- a/src/test/regress/bin/normalize.sed +++ b/src/test/regress/bin/normalize.sed @@ -260,3 +260,6 @@ s/NOTICE: issuing WITH placement_data\(shardid, shardstate, shardlength, groupi # global_pid when pg_cancel_backend is sent to workers s/pg_cancel_backend\('[0-9]+'::bigint\)/pg_cancel_backend('xxxxx'::bigint)/g s/issuing SELECT pg_cancel_backend\([0-9]+::integer\)/issuing SELECT pg_cancel_backend(xxxxx::integer)/g + +# node id in run_command_on_all_nodes warning +s/Error on node with node id [0-9]+/Error on node with node id xxxxx/g diff --git a/src/test/regress/expected/isolation_global_pid.out b/src/test/regress/expected/isolation_global_pid.out index 99ca48693..ad8630fb5 100644 --- a/src/test/regress/expected/isolation_global_pid.out +++ b/src/test/regress/expected/isolation_global_pid.out @@ -1,6 +1,6 @@ Parsed test spec with 2 sessions -starting permutation: s1-start-session-level-connection s1-worker-begin s1-worker-select s2-coordinator-citus_dist_stat_activity s2-coordinator-citus_worker_stat_activity s1-worker-commit s1-stop-session-level-connection +starting permutation: s1-start-session-level-connection s1-worker-begin s1-worker-select s2-coordinator-citus_stat_activity s2-coordinator-citus_dist_stat_activity s2-coordinator-citus_worker_stat_activity s1-worker-commit s1-stop-session-level-connection create_distributed_table --------------------------------------------------------------------- @@ -30,17 +30,29 @@ run_commands_on_session_level_connection_to_node (1 row) -step s2-coordinator-citus_dist_stat_activity: - SELECT global_pid != 0 FROM citus_dist_stat_activity() WHERE query LIKE '%SELECT * FROM dist\_table%' and query NOT ILIKE '%run_commands_on_session_level_connection_to_node%'; +step s2-coordinator-citus_stat_activity: + SELECT global_pid != 0 FROM citus_stat_activity WHERE query LIKE '%SELECT * FROM dist\_table%' AND query 
NOT ILIKE '%run_commands_on_session_level_connection_to_node%'; ?column? --------------------------------------------------------------------- t (1 row) +step s2-coordinator-citus_dist_stat_activity: + SELECT query FROM citus_dist_stat_activity() WHERE global_pid IN ( + SELECT global_pid FROM citus_stat_activity WHERE query LIKE '%SELECT * FROM dist\_table%' + ) + AND query NOT ILIKE '%run_commands_on_session_level_connection_to_node%' + ORDER BY 1; + +query +--------------------------------------------------------------------- +SET citus.enable_local_execution TO off; SET citus.force_max_query_parallelization TO ON; SELECT * FROM dist_table +(1 row) + step s2-coordinator-citus_worker_stat_activity: SELECT query FROM citus_worker_stat_activity() WHERE global_pid IN ( - SELECT global_pid FROM citus_dist_stat_activity() WHERE query LIKE '%SELECT * FROM dist\_table%' + SELECT global_pid FROM citus_stat_activity WHERE query LIKE '%SELECT * FROM dist\_table%' ) ORDER BY 1; @@ -68,13 +80,13 @@ stop_session_level_connection_to_node (1 row) -restore_isolation_tester_func +citus_remove_node --------------------------------------------------------------------- (1 row) -starting permutation: s1-coordinator-begin s1-coordinator-select s2-coordinator-citus_dist_stat_activity s2-coordinator-citus_worker_stat_activity s2-coordinator-get_all_active_transactions s2-coordinator-get_global_active_transactions s1-coordinator-commit +starting permutation: s1-coordinator-begin s1-coordinator-select s2-coordinator-citus_stat_activity s2-coordinator-citus_dist_stat_activity s2-coordinator-citus_worker_stat_activity s2-coordinator-get_all_active_transactions s2-coordinator-get_global_active_transactions s1-coordinator-commit create_distributed_table --------------------------------------------------------------------- @@ -92,17 +104,33 @@ a|b --------------------------------------------------------------------- (0 rows) -step s2-coordinator-citus_dist_stat_activity: - SELECT global_pid != 
0 FROM citus_dist_stat_activity() WHERE query LIKE '%SELECT * FROM dist\_table%' and query NOT ILIKE '%run_commands_on_session_level_connection_to_node%'; +step s2-coordinator-citus_stat_activity: + SELECT global_pid != 0 FROM citus_stat_activity WHERE query LIKE '%SELECT * FROM dist\_table%' AND query NOT ILIKE '%run_commands_on_session_level_connection_to_node%'; ?column? --------------------------------------------------------------------- t (1 row) +step s2-coordinator-citus_dist_stat_activity: + SELECT query FROM citus_dist_stat_activity() WHERE global_pid IN ( + SELECT global_pid FROM citus_stat_activity WHERE query LIKE '%SELECT * FROM dist\_table%' + ) + AND query NOT ILIKE '%run_commands_on_session_level_connection_to_node%' + ORDER BY 1; + +query +--------------------------------------------------------------------- + + SET citus.enable_local_execution TO off; + SET citus.force_max_query_parallelization TO ON; + SELECT * FROM dist_table; + +(1 row) + step s2-coordinator-citus_worker_stat_activity: SELECT query FROM citus_worker_stat_activity() WHERE global_pid IN ( - SELECT global_pid FROM citus_dist_stat_activity() WHERE query LIKE '%SELECT * FROM dist\_table%' + SELECT global_pid FROM citus_stat_activity WHERE query LIKE '%SELECT * FROM dist\_table%' ) ORDER BY 1; @@ -116,7 +144,7 @@ SELECT a, b FROM public.dist_table_12345003 dist_table WHERE true step s2-coordinator-get_all_active_transactions: SELECT count(*) FROM get_all_active_transactions() WHERE global_pid IN ( - SELECT global_pid FROM citus_dist_stat_activity() WHERE query LIKE '%SELECT * FROM dist\_table%' + SELECT global_pid FROM citus_stat_activity WHERE query LIKE '%SELECT * FROM dist\_table%' ); count @@ -126,7 +154,7 @@ count step s2-coordinator-get_global_active_transactions: SELECT count(*) FROM get_global_active_transactions() WHERE global_pid IN ( - SELECT global_pid FROM citus_dist_stat_activity() WHERE query LIKE '%SELECT * FROM dist\_table%' + SELECT global_pid FROM 
citus_stat_activity WHERE query LIKE '%SELECT * FROM dist\_table%' ) AND transaction_number != 0; @@ -138,7 +166,7 @@ count step s1-coordinator-commit: COMMIT; -restore_isolation_tester_func +citus_remove_node --------------------------------------------------------------------- (1 row) diff --git a/src/test/regress/expected/multi_extension.out b/src/test/regress/expected/multi_extension.out index 6d2408a21..f378d2635 100644 --- a/src/test/regress/expected/multi_extension.out +++ b/src/test/regress/expected/multi_extension.out @@ -1021,14 +1021,17 @@ SELECT * FROM multi_extension.print_extension_changes(); | function citus_run_local_command(text) void | function citus_shard_indexes_on_worker() SETOF record | function citus_shards_on_worker() SETOF record + | function citus_stat_activity() SETOF record | function create_distributed_function(regprocedure,text,text,boolean) void | function pg_cancel_backend(bigint) boolean | function pg_terminate_backend(bigint,bigint) boolean + | function run_command_on_all_nodes(text,boolean,boolean) SETOF record | function worker_create_or_replace_object(text[]) boolean | function worker_drop_sequence_dependency(text) void | function worker_drop_shell_table(text) void | function worker_partition_query_result(text,text,integer,citus.distribution_type,text[],text[],boolean,boolean,boolean) SETOF record -(25 rows) + | view citus_stat_activity +(28 rows) DROP TABLE multi_extension.prev_objects, multi_extension.extension_diff; -- show running version diff --git a/src/test/regress/expected/run_command_on_all_nodes.out b/src/test/regress/expected/run_command_on_all_nodes.out new file mode 100644 index 000000000..9cf838c83 --- /dev/null +++ b/src/test/regress/expected/run_command_on_all_nodes.out @@ -0,0 +1,88 @@ +CREATE SCHEMA run_command_on_all_nodes; +SET search_path TO run_command_on_all_nodes; +-- check coordinator isn't in metadata +SELECT count(*) != 0 AS "Coordinator is in Metadata" +FROM pg_dist_node +WHERE groupid IN ( + SELECT 
groupid FROM pg_dist_local_group +); + Coordinator is in Metadata +--------------------------------------------------------------------- + f +(1 row) + +-- run a simple select query and check it also runs in coordinator +SELECT nodeid NOT IN (SELECT nodeid FROM pg_dist_node) AS "Is Coordinator", success, result FROM run_command_on_all_nodes('SELECT 1') ORDER BY 1; + Is Coordinator | success | result +--------------------------------------------------------------------- + f | t | 1 + f | t | 1 + t | t | 1 +(3 rows) + +-- check that when coordinator is not in metadata and run_command_on_all_nodes is called from +-- a worker node, command is not run on the coordinator +\c - - - :worker_1_port +SELECT nodeid NOT IN (SELECT nodeid FROM pg_dist_node) AS "Is Coordinator", success, result FROM run_command_on_all_nodes('SELECT 1') ORDER BY 1; + Is Coordinator | success | result +--------------------------------------------------------------------- + f | t | 1 + f | t | 1 +(2 rows) + +\c - - - :master_port +-- create a table +SELECT result FROM run_command_on_all_nodes('CREATE TABLE run_command_on_all_nodes.tbl (a INT)'); + result +--------------------------------------------------------------------- + CREATE TABLE + CREATE TABLE + CREATE TABLE +(3 rows) + +SELECT tablename FROM pg_tables WHERE schemaname = 'run_command_on_all_nodes'; + tablename +--------------------------------------------------------------------- + tbl +(1 row) + +\c - - - :worker_1_port +SELECT tablename FROM pg_tables WHERE schemaname = 'run_command_on_all_nodes'; + tablename +--------------------------------------------------------------------- + tbl +(1 row) + +\c - - - :master_port +SELECT result FROM run_command_on_all_nodes('SELECT tablename FROM pg_tables WHERE schemaname = ''run_command_on_all_nodes'';'); + result +--------------------------------------------------------------------- + tbl + tbl + tbl +(3 rows) + +-- break a node and check messages +SELECT nodeid AS worker_1_nodeid FROM 
pg_dist_node WHERE nodeport = :worker_1_port \gset +UPDATE pg_dist_node SET nodeport = 0 WHERE nodeid = :worker_1_nodeid; +SELECT nodeid = :worker_1_nodeid AS "Is Worker 1", success, result FROM run_command_on_all_nodes('SELECT 1') ORDER BY 1; + Is Worker 1 | success | result +--------------------------------------------------------------------- + f | t | 1 + f | t | 1 + t | f | failed to connect to localhost:xxxxx +(3 rows) + +SELECT nodeid = :worker_1_nodeid AS "Is Worker 1", success, result FROM run_command_on_all_nodes('SELECT 1', give_warning_for_connection_errors:=true) ORDER BY 1; +WARNING: Error on node with node id xxxxx: failed to connect to localhost:xxxxx +CONTEXT: PL/pgSQL function run_command_on_all_nodes(text,boolean,boolean) line XX at RAISE + Is Worker 1 | success | result +--------------------------------------------------------------------- + f | t | 1 + f | t | 1 + t | f | failed to connect to localhost:xxxxx +(3 rows) + +UPDATE pg_dist_node SET nodeport = :worker_1_port WHERE nodeid = :worker_1_nodeid; +DROP SCHEMA run_command_on_all_nodes CASCADE; +NOTICE: drop cascades to table run_command_on_all_nodes.tbl diff --git a/src/test/regress/expected/upgrade_citus_stat_activity.out b/src/test/regress/expected/upgrade_citus_stat_activity.out new file mode 100644 index 000000000..6564e9b40 --- /dev/null +++ b/src/test/regress/expected/upgrade_citus_stat_activity.out @@ -0,0 +1,17 @@ +SELECT column_name FROM information_schema.columns WHERE table_name = 'citus_stat_activity' AND column_name NOT IN ('leader_pid', 'query_id') +EXCEPT SELECT column_name FROM information_schema.columns WHERE table_name = 'pg_stat_activity' +ORDER BY 1; + column_name +--------------------------------------------------------------------- + global_pid + is_worker_query + nodeid +(3 rows) + +SELECT column_name FROM information_schema.columns WHERE table_name = 'pg_stat_activity' +EXCEPT SELECT column_name FROM information_schema.columns WHERE table_name = 
'citus_stat_activity' +ORDER BY 1; + column_name +--------------------------------------------------------------------- + (0 rows) + diff --git a/src/test/regress/expected/upgrade_list_citus_objects.out b/src/test/regress/expected/upgrade_list_citus_objects.out index b1c5c0088..6adf0e990 100644 --- a/src/test/regress/expected/upgrade_list_citus_objects.out +++ b/src/test/regress/expected/upgrade_list_citus_objects.out @@ -105,6 +105,7 @@ ORDER BY 1; function citus_shard_indexes_on_worker() function citus_shard_sizes() function citus_shards_on_worker() + function citus_stat_activity() function citus_stat_statements() function citus_stat_statements_reset() function citus_table_is_visible(oid) @@ -189,6 +190,7 @@ ORDER BY 1; function replicate_reference_tables() function replicate_table_shards(regclass,integer,integer,bigint[],citus.shard_transfer_mode) function role_exists(name) + function run_command_on_all_nodes(text,boolean,boolean) function run_command_on_colocated_placements(regclass,regclass,text,boolean) function run_command_on_placements(regclass,text,boolean) function run_command_on_shards(regclass,text,boolean) @@ -271,9 +273,10 @@ ORDER BY 1; view citus_shard_indexes_on_worker view citus_shards view citus_shards_on_worker + view citus_stat_activity view citus_stat_statements view citus_worker_stat_activity view pg_dist_shard_placement view time_partitions -(259 rows) +(262 rows) diff --git a/src/test/regress/multi_schedule b/src/test/regress/multi_schedule index ee6a52da5..48e166deb 100644 --- a/src/test/regress/multi_schedule +++ b/src/test/regress/multi_schedule @@ -94,6 +94,7 @@ test: alter_distributed_table test: issue_5248 issue_5099 test: object_propagation_debug test: undistribute_table +test: run_command_on_all_nodes # --------- diff --git a/src/test/regress/spec/isolation_global_pid.spec b/src/test/regress/spec/isolation_global_pid.spec index 62f45d1e9..a7804dd71 100644 --- a/src/test/regress/spec/isolation_global_pid.spec +++ 
b/src/test/regress/spec/isolation_global_pid.spec @@ -2,6 +2,7 @@ setup { + SELECT citus_add_node('localhost', 57636, groupid:=0); SET citus.next_shard_id TO 12345000; CREATE TABLE dist_table (a INT, b INT); SELECT create_distributed_table('dist_table', 'a', shard_count:=4); @@ -11,6 +12,7 @@ teardown { DROP TABLE dist_table; SELECT citus_internal.restore_isolation_tester_func(); + SELECT citus_remove_node('localhost', 57636); } session "s1" @@ -60,15 +62,24 @@ step "s1-stop-session-level-connection" session "s2" +step "s2-coordinator-citus_stat_activity" +{ + SELECT global_pid != 0 FROM citus_stat_activity WHERE query LIKE '%SELECT * FROM dist\_table%' AND query NOT ILIKE '%run_commands_on_session_level_connection_to_node%'; +} + step "s2-coordinator-citus_dist_stat_activity" { - SELECT global_pid != 0 FROM citus_dist_stat_activity() WHERE query LIKE '%SELECT * FROM dist\_table%' and query NOT ILIKE '%run_commands_on_session_level_connection_to_node%'; + SELECT query FROM citus_dist_stat_activity() WHERE global_pid IN ( + SELECT global_pid FROM citus_stat_activity WHERE query LIKE '%SELECT * FROM dist\_table%' + ) + AND query NOT ILIKE '%run_commands_on_session_level_connection_to_node%' + ORDER BY 1; } step "s2-coordinator-citus_worker_stat_activity" { SELECT query FROM citus_worker_stat_activity() WHERE global_pid IN ( - SELECT global_pid FROM citus_dist_stat_activity() WHERE query LIKE '%SELECT * FROM dist\_table%' + SELECT global_pid FROM citus_stat_activity WHERE query LIKE '%SELECT * FROM dist\_table%' ) ORDER BY 1; } @@ -76,21 +87,21 @@ step "s2-coordinator-citus_worker_stat_activity" step "s2-coordinator-get_all_active_transactions" { SELECT count(*) FROM get_all_active_transactions() WHERE global_pid IN ( - SELECT global_pid FROM citus_dist_stat_activity() WHERE query LIKE '%SELECT * FROM dist\_table%' + SELECT global_pid FROM citus_stat_activity WHERE query LIKE '%SELECT * FROM dist\_table%' ); } step "s2-coordinator-get_global_active_transactions" { 
SELECT count(*) FROM get_global_active_transactions() WHERE global_pid IN ( - SELECT global_pid FROM citus_dist_stat_activity() WHERE query LIKE '%SELECT * FROM dist\_table%' + SELECT global_pid FROM citus_stat_activity WHERE query LIKE '%SELECT * FROM dist\_table%' ) AND transaction_number != 0; } // worker - coordinator -permutation "s1-start-session-level-connection" "s1-worker-begin" "s1-worker-select" "s2-coordinator-citus_dist_stat_activity" "s2-coordinator-citus_worker_stat_activity" "s1-worker-commit" "s1-stop-session-level-connection" +permutation "s1-start-session-level-connection" "s1-worker-begin" "s1-worker-select" "s2-coordinator-citus_stat_activity" "s2-coordinator-citus_dist_stat_activity" "s2-coordinator-citus_worker_stat_activity" "s1-worker-commit" "s1-stop-session-level-connection" // coordinator - coordinator -permutation "s1-coordinator-begin" "s1-coordinator-select" "s2-coordinator-citus_dist_stat_activity" "s2-coordinator-citus_worker_stat_activity" "s2-coordinator-get_all_active_transactions" "s2-coordinator-get_global_active_transactions" "s1-coordinator-commit" +permutation "s1-coordinator-begin" "s1-coordinator-select" "s2-coordinator-citus_stat_activity" "s2-coordinator-citus_dist_stat_activity" "s2-coordinator-citus_worker_stat_activity" "s2-coordinator-get_all_active_transactions" "s2-coordinator-get_global_active_transactions" "s1-coordinator-commit" diff --git a/src/test/regress/sql/run_command_on_all_nodes.sql b/src/test/regress/sql/run_command_on_all_nodes.sql new file mode 100644 index 000000000..46f838eb5 --- /dev/null +++ b/src/test/regress/sql/run_command_on_all_nodes.sql @@ -0,0 +1,41 @@ +CREATE SCHEMA run_command_on_all_nodes; +SET search_path TO run_command_on_all_nodes; + +-- check coordinator isn't in metadata +SELECT count(*) != 0 AS "Coordinator is in Metadata" +FROM pg_dist_node +WHERE groupid IN ( + SELECT groupid FROM pg_dist_local_group +); + +-- run a simple select query and check it also runs in coordinator 
+SELECT nodeid NOT IN (SELECT nodeid FROM pg_dist_node) AS "Is Coordinator", success, result FROM run_command_on_all_nodes('SELECT 1') ORDER BY 1; + +-- check that when coordinator is not in metadata and run_command_on_all_nodes is called from +-- a worker node, command is not run on the coordinator +\c - - - :worker_1_port +SELECT nodeid NOT IN (SELECT nodeid FROM pg_dist_node) AS "Is Coordinator", success, result FROM run_command_on_all_nodes('SELECT 1') ORDER BY 1; + +\c - - - :master_port + +-- create a table +SELECT result FROM run_command_on_all_nodes('CREATE TABLE run_command_on_all_nodes.tbl (a INT)'); + +SELECT tablename FROM pg_tables WHERE schemaname = 'run_command_on_all_nodes'; + +\c - - - :worker_1_port +SELECT tablename FROM pg_tables WHERE schemaname = 'run_command_on_all_nodes'; + +\c - - - :master_port +SELECT result FROM run_command_on_all_nodes('SELECT tablename FROM pg_tables WHERE schemaname = ''run_command_on_all_nodes'';'); + +-- break a node and check messages +SELECT nodeid AS worker_1_nodeid FROM pg_dist_node WHERE nodeport = :worker_1_port \gset +UPDATE pg_dist_node SET nodeport = 0 WHERE nodeid = :worker_1_nodeid; + +SELECT nodeid = :worker_1_nodeid AS "Is Worker 1", success, result FROM run_command_on_all_nodes('SELECT 1') ORDER BY 1; +SELECT nodeid = :worker_1_nodeid AS "Is Worker 1", success, result FROM run_command_on_all_nodes('SELECT 1', give_warning_for_connection_errors:=true) ORDER BY 1; + +UPDATE pg_dist_node SET nodeport = :worker_1_port WHERE nodeid = :worker_1_nodeid; + +DROP SCHEMA run_command_on_all_nodes CASCADE; diff --git a/src/test/regress/sql/upgrade_citus_stat_activity.sql b/src/test/regress/sql/upgrade_citus_stat_activity.sql new file mode 100644 index 000000000..1d97b59b6 --- /dev/null +++ b/src/test/regress/sql/upgrade_citus_stat_activity.sql @@ -0,0 +1,7 @@ +SELECT column_name FROM information_schema.columns WHERE table_name = 'citus_stat_activity' AND column_name NOT IN ('leader_pid', 'query_id') +EXCEPT SELECT 
column_name FROM information_schema.columns WHERE table_name = 'pg_stat_activity' +ORDER BY 1; + +SELECT column_name FROM information_schema.columns WHERE table_name = 'pg_stat_activity' +EXCEPT SELECT column_name FROM information_schema.columns WHERE table_name = 'citus_stat_activity' +ORDER BY 1;