From 0500a62515214917fa09dd6c44a58a45bc8a6055 Mon Sep 17 00:00:00 2001 From: Halil Ozan Akgul Date: Thu, 3 Mar 2022 17:36:20 +0300 Subject: [PATCH] Updates citus_dist_stat_activity to use citus_stat_activity --- .../distributed/sql/citus--10.2-4--11.0-1.sql | 20 +- .../sql/downgrades/citus--11.0-1--10.2-4.sql | 8 +- .../udfs/citus_dist_stat_activity/11.0-1.sql | 23 +- .../udfs/citus_dist_stat_activity/latest.sql | 23 +- .../sql/udfs/citus_lock_waits/11.0-1.sql | 26 +- .../sql/udfs/citus_lock_waits/latest.sql | 26 +- .../citus_worker_stat_activity/11.0-1.sql | 19 - .../citus_worker_stat_activity/latest.sql | 19 - .../transaction/citus_dist_stat_activity.c | 1094 +---------------- .../isolation_citus_dist_activity.out | 60 +- ...lation_get_distributed_wait_queries_mx.out | 84 +- .../regress/expected/isolation_global_pid.out | 20 +- ...licate_reference_tables_to_coordinator.out | 37 +- src/test/regress/expected/multi_extension.out | 7 +- .../expected/upgrade_list_citus_objects.out | 5 +- .../spec/isolation_citus_dist_activity.spec | 4 +- ...ation_get_distributed_wait_queries_mx.spec | 2 +- .../regress/spec/isolation_global_pid.spec | 12 +- ...icate_reference_tables_to_coordinator.spec | 25 +- 19 files changed, 170 insertions(+), 1344 deletions(-) delete mode 100644 src/backend/distributed/sql/udfs/citus_worker_stat_activity/11.0-1.sql delete mode 100644 src/backend/distributed/sql/udfs/citus_worker_stat_activity/latest.sql diff --git a/src/backend/distributed/sql/citus--10.2-4--11.0-1.sql b/src/backend/distributed/sql/citus--10.2-4--11.0-1.sql index 0f7b57d05..6525322bf 100644 --- a/src/backend/distributed/sql/citus--10.2-4--11.0-1.sql +++ b/src/backend/distributed/sql/citus--10.2-4--11.0-1.sql @@ -22,26 +22,19 @@ #include "udfs/citus_internal_local_blocked_processes/11.0-1.sql" #include "udfs/citus_internal_global_blocked_processes/11.0-1.sql" -#include "udfs/citus_worker_stat_activity/11.0-1.sql" +#include "udfs/run_command_on_all_nodes/11.0-1.sql" +#include "udfs/citus_stat_activity/11.0-1.sql" + #include "udfs/worker_create_or_replace_object/11.0-1.sql" #include "udfs/citus_isolation_test_session_is_blocked/11.0-1.sql" #include "udfs/citus_blocking_pids/11.0-1.sql" #include "udfs/citus_calculate_gpid/11.0-1.sql" #include "udfs/citus_backend_gpid/11.0-1.sql" -CREATE VIEW citus.citus_worker_stat_activity AS -SELECT * FROM pg_catalog.citus_worker_stat_activity(); -ALTER VIEW citus.citus_worker_stat_activity SET SCHEMA pg_catalog; -GRANT SELECT ON pg_catalog.citus_worker_stat_activity TO PUBLIC; - +DROP FUNCTION IF EXISTS pg_catalog.citus_dist_stat_activity() CASCADE; +DROP FUNCTION IF EXISTS pg_catalog.citus_worker_stat_activity() CASCADE; #include "udfs/citus_dist_stat_activity/11.0-1.sql" -CREATE VIEW citus.citus_dist_stat_activity AS -SELECT * FROM pg_catalog.citus_dist_stat_activity(); -ALTER VIEW citus.citus_dist_stat_activity SET SCHEMA pg_catalog; -GRANT SELECT ON pg_catalog.citus_dist_stat_activity TO PUBLIC; - --- we have to recreate this view because recreated citus_dist_stat_activity that this view depends #include "udfs/citus_lock_waits/11.0-1.sql" #include "udfs/pg_cancel_backend/11.0-1.sql" @@ -86,6 +79,3 @@ END; $$; #include "udfs/citus_finalize_upgrade_to_citus11/11.0-1.sql" - -#include "udfs/run_command_on_all_nodes/11.0-1.sql" -#include "udfs/citus_stat_activity/11.0-1.sql" diff --git a/src/backend/distributed/sql/downgrades/citus--11.0-1--10.2-4.sql b/src/backend/distributed/sql/downgrades/citus--11.0-1--10.2-4.sql index a3c992606..5c66e063f 100644 --- a/src/backend/distributed/sql/downgrades/citus--11.0-1--10.2-4.sql +++ b/src/backend/distributed/sql/downgrades/citus--11.0-1--10.2-4.sql @@ -124,8 +124,8 @@ DROP VIEW pg_catalog.citus_lock_waits; DROP FUNCTION citus_internal_local_blocked_processes; DROP FUNCTION citus_internal_global_blocked_processes; -DROP VIEW pg_catalog.citus_dist_stat_activity; -DROP FUNCTION pg_catalog.citus_dist_stat_activity; +DROP VIEW IF EXISTS pg_catalog.citus_dist_stat_activity; +DROP FUNCTION IF EXISTS pg_catalog.citus_dist_stat_activity; CREATE OR REPLACE FUNCTION pg_catalog.citus_dist_stat_activity(OUT query_hostname text, OUT query_hostport int, OUT distributed_query_host_name text, OUT distributed_query_host_port int, OUT transaction_number int8, OUT transaction_stamp timestamptz, OUT datid oid, OUT datname name, @@ -151,8 +151,8 @@ ALTER VIEW citus.citus_dist_stat_activity SET SCHEMA pg_catalog; GRANT SELECT ON pg_catalog.citus_dist_stat_activity TO PUBLIC; SET search_path = 'pg_catalog'; -DROP VIEW citus_worker_stat_activity; -DROP FUNCTION citus_worker_stat_activity; +DROP VIEW IF EXISTS citus_worker_stat_activity; +DROP FUNCTION IF EXISTS citus_worker_stat_activity; CREATE OR REPLACE FUNCTION citus_worker_stat_activity(OUT query_hostname text, OUT query_hostport int, OUT distributed_query_host_name text, OUT distributed_query_host_port int, OUT transaction_number int8, OUT transaction_stamp timestamptz, OUT datid oid, OUT datname name, diff --git a/src/backend/distributed/sql/udfs/citus_dist_stat_activity/11.0-1.sql b/src/backend/distributed/sql/udfs/citus_dist_stat_activity/11.0-1.sql index 7b38f627d..97dc387b3 100644 --- a/src/backend/distributed/sql/udfs/citus_dist_stat_activity/11.0-1.sql +++ b/src/backend/distributed/sql/udfs/citus_dist_stat_activity/11.0-1.sql @@ -1,19 +1,8 @@ -DROP FUNCTION IF EXISTS pg_catalog.citus_dist_stat_activity CASCADE; +DROP VIEW IF EXISTS pg_catalog.citus_dist_stat_activity; -CREATE OR REPLACE FUNCTION pg_catalog.citus_dist_stat_activity(OUT query_hostname text, OUT query_hostport int, OUT distributed_query_host_name text, OUT distributed_query_host_port int, - OUT transaction_number int8, OUT transaction_stamp timestamptz, OUT datid oid, OUT datname name, - OUT pid int, OUT usesysid oid, OUT usename name, OUT application_name text, OUT client_addr INET, - OUT client_hostname TEXT, OUT client_port int, OUT backend_start timestamptz, OUT xact_start timestamptz, - OUT query_start timestamptz, OUT state_change timestamptz, OUT wait_event_type text, OUT wait_event text, - OUT state text, OUT backend_xid xid, OUT backend_xmin xid, OUT query text, OUT backend_type text, OUT global_pid int8) -RETURNS SETOF RECORD -LANGUAGE C STRICT AS 'MODULE_PATHNAME', -$$citus_dist_stat_activity$$; +CREATE OR REPLACE VIEW citus.citus_dist_stat_activity AS +SELECT * FROM citus_stat_activity +WHERE is_worker_query = false; -COMMENT ON FUNCTION pg_catalog.citus_dist_stat_activity(OUT query_hostname text, OUT query_hostport int, OUT distributed_query_host_name text, OUT distributed_query_host_port int, - OUT transaction_number int8, OUT transaction_stamp timestamptz, OUT datid oid, OUT datname name, - OUT pid int, OUT usesysid oid, OUT usename name, OUT application_name text, OUT client_addr INET, - OUT client_hostname TEXT, OUT client_port int, OUT backend_start timestamptz, OUT xact_start timestamptz, - OUT query_start timestamptz, OUT state_change timestamptz, OUT wait_event_type text, OUT wait_event text, - OUT state text, OUT backend_xid xid, OUT backend_xmin xid, OUT query text, OUT backend_type text, OUT global_pid int8) -IS 'returns distributed transaction activity on distributed tables'; +ALTER VIEW citus.citus_dist_stat_activity SET SCHEMA pg_catalog; +GRANT SELECT ON pg_catalog.citus_dist_stat_activity TO PUBLIC; diff --git a/src/backend/distributed/sql/udfs/citus_dist_stat_activity/latest.sql b/src/backend/distributed/sql/udfs/citus_dist_stat_activity/latest.sql index 7b38f627d..97dc387b3 100644 --- a/src/backend/distributed/sql/udfs/citus_dist_stat_activity/latest.sql +++ b/src/backend/distributed/sql/udfs/citus_dist_stat_activity/latest.sql @@ -1,19 +1,8 @@ -DROP FUNCTION IF EXISTS pg_catalog.citus_dist_stat_activity CASCADE; +DROP VIEW IF EXISTS pg_catalog.citus_dist_stat_activity; -CREATE OR REPLACE FUNCTION pg_catalog.citus_dist_stat_activity(OUT query_hostname text, OUT query_hostport int, OUT distributed_query_host_name text, OUT distributed_query_host_port int, - OUT transaction_number int8, OUT transaction_stamp timestamptz, OUT datid oid, OUT datname name, - OUT pid int, OUT usesysid oid, OUT usename name, OUT application_name text, OUT client_addr INET, - OUT client_hostname TEXT, OUT client_port int, OUT backend_start timestamptz, OUT xact_start timestamptz, - OUT query_start timestamptz, OUT state_change timestamptz, OUT wait_event_type text, OUT wait_event text, - OUT state text, OUT backend_xid xid, OUT backend_xmin xid, OUT query text, OUT backend_type text, OUT global_pid int8) -RETURNS SETOF RECORD -LANGUAGE C STRICT AS 'MODULE_PATHNAME', -$$citus_dist_stat_activity$$; +CREATE OR REPLACE VIEW citus.citus_dist_stat_activity AS +SELECT * FROM citus_stat_activity +WHERE is_worker_query = false; -COMMENT ON FUNCTION pg_catalog.citus_dist_stat_activity(OUT query_hostname text, OUT query_hostport int, OUT distributed_query_host_name text, OUT distributed_query_host_port int, - OUT transaction_number int8, OUT transaction_stamp timestamptz, OUT datid oid, OUT datname name, - OUT pid int, OUT usesysid oid, OUT usename name, OUT application_name text, OUT client_addr INET, - OUT client_hostname TEXT, OUT client_port int, OUT backend_start timestamptz, OUT xact_start timestamptz, - OUT query_start timestamptz, OUT state_change timestamptz, OUT wait_event_type text, OUT wait_event text, - OUT state text, OUT backend_xid xid, OUT backend_xmin xid, OUT query text, OUT backend_type text, OUT global_pid int8) -IS 'returns distributed transaction activity on distributed tables'; +ALTER VIEW citus.citus_dist_stat_activity SET SCHEMA pg_catalog; +GRANT SELECT ON pg_catalog.citus_dist_stat_activity TO PUBLIC; diff --git a/src/backend/distributed/sql/udfs/citus_lock_waits/11.0-1.sql b/src/backend/distributed/sql/udfs/citus_lock_waits/11.0-1.sql index 779341657..8770f2b1d 100644 --- a/src/backend/distributed/sql/udfs/citus_lock_waits/11.0-1.sql +++ b/src/backend/distributed/sql/udfs/citus_lock_waits/11.0-1.sql @@ -2,23 +2,9 @@ SET search_path = 'pg_catalog'; CREATE VIEW citus.citus_lock_waits AS WITH -citus_dist_stat_activity AS -( - SELECT * FROM citus_dist_stat_activity -), unique_global_wait_edges AS ( SELECT DISTINCT ON(waiting_global_pid, blocking_global_pid) * FROM citus_internal_global_blocked_processes() -), -citus_dist_stat_activity_with_node_id AS -( - SELECT - citus_dist_stat_activity.*, (CASE citus_dist_stat_activity.distributed_query_host_name WHEN 'coordinator_host' THEN 0 ELSE pg_dist_node.nodeid END) as initiator_node_id - FROM - citus_dist_stat_activity LEFT JOIN pg_dist_node - ON - citus_dist_stat_activity.distributed_query_host_name = pg_dist_node.nodename AND - citus_dist_stat_activity.distributed_query_host_port = pg_dist_node.nodeport ) SELECT waiting.global_pid as waiting_gpid, @@ -27,18 +13,14 @@ SELECT blocking.pid AS blocking_pid, waiting.query AS blocked_statement, blocking.query AS current_statement_in_blocking_process, - waiting.initiator_node_id AS waiting_node_id, - blocking.initiator_node_id AS blocking_node_id, - waiting.distributed_query_host_name AS waiting_node_name, - blocking.distributed_query_host_name AS blocking_node_name, - waiting.distributed_query_host_port AS waiting_node_port, - blocking.distributed_query_host_port AS blocking_node_port + waiting.nodeid AS waiting_nodeid, + blocking.nodeid AS blocking_nodeid FROM unique_global_wait_edges JOIN - citus_dist_stat_activity_with_node_id waiting ON (unique_global_wait_edges.waiting_global_pid = waiting.global_pid) + citus_dist_stat_activity waiting ON (unique_global_wait_edges.waiting_global_pid = waiting.global_pid) JOIN - citus_dist_stat_activity_with_node_id blocking ON (unique_global_wait_edges.blocking_global_pid = blocking.global_pid); + citus_dist_stat_activity blocking ON (unique_global_wait_edges.blocking_global_pid = blocking.global_pid); ALTER VIEW citus.citus_lock_waits SET SCHEMA pg_catalog; GRANT SELECT ON pg_catalog.citus_lock_waits TO PUBLIC; diff --git a/src/backend/distributed/sql/udfs/citus_lock_waits/latest.sql b/src/backend/distributed/sql/udfs/citus_lock_waits/latest.sql index 779341657..8770f2b1d 100644 --- a/src/backend/distributed/sql/udfs/citus_lock_waits/latest.sql +++ b/src/backend/distributed/sql/udfs/citus_lock_waits/latest.sql @@ -2,23 +2,9 @@ SET search_path = 'pg_catalog'; CREATE VIEW citus.citus_lock_waits AS WITH -citus_dist_stat_activity AS -( - SELECT * FROM citus_dist_stat_activity -), unique_global_wait_edges AS ( SELECT DISTINCT ON(waiting_global_pid, blocking_global_pid) * FROM citus_internal_global_blocked_processes() -), -citus_dist_stat_activity_with_node_id AS -( - SELECT - citus_dist_stat_activity.*, (CASE citus_dist_stat_activity.distributed_query_host_name WHEN 'coordinator_host' THEN 0 ELSE pg_dist_node.nodeid END) as initiator_node_id - FROM - citus_dist_stat_activity LEFT JOIN pg_dist_node - ON - citus_dist_stat_activity.distributed_query_host_name = pg_dist_node.nodename AND - citus_dist_stat_activity.distributed_query_host_port = pg_dist_node.nodeport ) SELECT waiting.global_pid as waiting_gpid, @@ -27,18 +13,14 @@ SELECT blocking.pid AS blocking_pid, waiting.query AS blocked_statement, blocking.query AS current_statement_in_blocking_process, - waiting.initiator_node_id AS waiting_node_id, - blocking.initiator_node_id AS blocking_node_id, - waiting.distributed_query_host_name AS waiting_node_name, - blocking.distributed_query_host_name AS blocking_node_name, - waiting.distributed_query_host_port AS waiting_node_port, - blocking.distributed_query_host_port AS blocking_node_port + waiting.nodeid AS waiting_nodeid, + blocking.nodeid AS blocking_nodeid FROM unique_global_wait_edges JOIN - citus_dist_stat_activity_with_node_id waiting ON (unique_global_wait_edges.waiting_global_pid = waiting.global_pid) + citus_dist_stat_activity waiting ON (unique_global_wait_edges.waiting_global_pid = waiting.global_pid) JOIN - citus_dist_stat_activity_with_node_id blocking ON (unique_global_wait_edges.blocking_global_pid = blocking.global_pid); + citus_dist_stat_activity blocking ON (unique_global_wait_edges.blocking_global_pid = blocking.global_pid); ALTER VIEW citus.citus_lock_waits SET SCHEMA pg_catalog; GRANT SELECT ON pg_catalog.citus_lock_waits TO PUBLIC; diff --git a/src/backend/distributed/sql/udfs/citus_worker_stat_activity/11.0-1.sql b/src/backend/distributed/sql/udfs/citus_worker_stat_activity/11.0-1.sql deleted file mode 100644 index 6f585b2e8..000000000 --- a/src/backend/distributed/sql/udfs/citus_worker_stat_activity/11.0-1.sql +++ /dev/null @@ -1,19 +0,0 @@ -DROP FUNCTION IF EXISTS pg_catalog.citus_worker_stat_activity CASCADE; - -CREATE OR REPLACE FUNCTION pg_catalog.citus_worker_stat_activity(OUT query_hostname text, OUT query_hostport int, OUT distributed_query_host_name text, OUT distributed_query_host_port int, - OUT transaction_number int8, OUT transaction_stamp timestamptz, OUT datid oid, OUT datname name, - OUT pid int, OUT usesysid oid, OUT usename name, OUT application_name text, OUT client_addr INET, - OUT client_hostname TEXT, OUT client_port int, OUT backend_start timestamptz, OUT xact_start timestamptz, - OUT query_start timestamptz, OUT state_change timestamptz, OUT wait_event_type text, OUT wait_event text, - OUT state text, OUT backend_xid xid, OUT backend_xmin xid, OUT query text, OUT backend_type text, OUT global_pid int8) -RETURNS SETOF RECORD -LANGUAGE C STRICT AS 'MODULE_PATHNAME', -$$citus_worker_stat_activity$$; - -COMMENT ON FUNCTION pg_catalog.citus_worker_stat_activity(OUT query_hostname text, OUT query_hostport int, OUT distributed_query_host_name text, OUT distributed_query_host_port int, - OUT transaction_number int8, OUT transaction_stamp timestamptz, OUT datid oid, OUT datname name, - OUT pid int, OUT usesysid oid, OUT usename name, OUT application_name text, OUT client_addr INET, - OUT client_hostname TEXT, OUT client_port int, OUT backend_start timestamptz, OUT xact_start timestamptz, - OUT query_start timestamptz, OUT state_change timestamptz, OUT wait_event_type text, OUT wait_event text, - OUT state text, OUT backend_xid xid, OUT backend_xmin xid, OUT query text, OUT backend_type text, OUT global_pid int8) -IS 'returns distributed transaction activity on shards of distributed tables'; diff --git a/src/backend/distributed/sql/udfs/citus_worker_stat_activity/latest.sql b/src/backend/distributed/sql/udfs/citus_worker_stat_activity/latest.sql deleted file mode 100644 index 6f585b2e8..000000000 --- a/src/backend/distributed/sql/udfs/citus_worker_stat_activity/latest.sql +++ /dev/null @@ -1,19 +0,0 @@ -DROP FUNCTION IF EXISTS pg_catalog.citus_worker_stat_activity CASCADE; - -CREATE OR REPLACE FUNCTION pg_catalog.citus_worker_stat_activity(OUT query_hostname text, OUT query_hostport int, OUT distributed_query_host_name text, OUT distributed_query_host_port int, - OUT transaction_number int8, OUT transaction_stamp timestamptz, OUT datid oid, OUT datname name, - OUT pid int, OUT usesysid oid, OUT usename name, OUT application_name text, OUT client_addr INET, - OUT client_hostname TEXT, OUT client_port int, OUT backend_start timestamptz, OUT xact_start timestamptz, - OUT query_start timestamptz, OUT state_change timestamptz, OUT wait_event_type text, OUT wait_event text, - OUT state text, OUT backend_xid xid, OUT backend_xmin xid, OUT query text, OUT backend_type text, OUT global_pid int8) -RETURNS SETOF RECORD -LANGUAGE C STRICT AS 'MODULE_PATHNAME', -$$citus_worker_stat_activity$$; - -COMMENT ON FUNCTION pg_catalog.citus_worker_stat_activity(OUT query_hostname text, OUT query_hostport int, OUT distributed_query_host_name text, OUT distributed_query_host_port int, - OUT transaction_number int8, OUT transaction_stamp timestamptz, OUT datid oid, OUT datname name, - OUT pid int, OUT usesysid oid, OUT usename name, OUT application_name text, OUT client_addr INET, - OUT client_hostname TEXT, OUT client_port int, OUT backend_start timestamptz, OUT xact_start timestamptz, - OUT query_start timestamptz, OUT state_change timestamptz, OUT wait_event_type text, OUT wait_event text, - OUT state text, OUT backend_xid xid, OUT backend_xmin xid, OUT query text, OUT backend_type text, OUT global_pid int8) -IS 'returns distributed transaction activity on shards of distributed tables'; diff --git a/src/backend/distributed/transaction/citus_dist_stat_activity.c b/src/backend/distributed/transaction/citus_dist_stat_activity.c index 0ee3925fb..3aa6372e6 100644 --- a/src/backend/distributed/transaction/citus_dist_stat_activity.c +++ b/src/backend/distributed/transaction/citus_dist_stat_activity.c @@ -2,8 +2,7 @@ * * citus_dist_stat_activity.c * - * This file contains functions for monitoring the distributed transactions - * across the cluster. + * The methods in the file are deprecated. * * Copyright (c) Citus Data, Inc. * @@ -11,1106 +10,27 @@ */ #include "postgres.h" -#include "libpq-fe.h" #include "miscadmin.h" -#include "fmgr.h" -#include "postmaster/postmaster.h" - #include "funcapi.h" -#include "access/htup_details.h" -#include "catalog/pg_type.h" -#include "datatype/timestamp.h" -#include "distributed/backend_data.h" -#include "distributed/connection_management.h" -#include "distributed/listutils.h" -#include "distributed/lock_graph.h" -#include "distributed/coordinator_protocol.h" -#include "distributed/metadata_cache.h" -#include "distributed/remote_commands.h" -#include "distributed/transaction_identifier.h" -#include "distributed/tuplestore.h" -#include "executor/spi.h" -#include "nodes/execnodes.h" -#include "storage/ipc.h" -#include "storage/lwlock.h" -#include "storage/proc.h" -#include "storage/spin.h" -#include "storage/s_lock.h" -#include "utils/builtins.h" -#include "utils/fmgrprotos.h" -#include "utils/inet.h" -#include "utils/timestamp.h" - - -/* - * citus_dist_stat_activity() and citus_worker_stat_activity() is similar to - * pg_stat_activity. Those functions basically return join of - * pg_stat_activity and get_all_active_transactions() on each node - * in the cluster. The only difference is that citus_dist_stat_activity() - * gets transactions where worker_query = false and citus_worker_stat_activity() - * gets transactions where worker_query = true. - * - * In other words, citus_dist_stat_activity returns only the queries that are the - * distributed queries. citus_worker_stat_activity returns only the queries that - * are worker queries (e.g., queries on the shards) initiated by those distributed - * queries. To understand this better, let us give an example. If a users starts - * a query like "UPDATE table SET value = 1", this query would show up on - * citus_dist_stat_activity. The same query would generate #shard worker queries, - * all of which would show up on citus_worker_stat_activity. - * - * An important note on this views is that they only show the activity - * that are inside distributed transactions. Distributed transactions - * cover the following: - * - All multi-shard modifications (DDLs, COPY, UPDATE, DELETE, INSERT .. SELECT) - * - All multi-shard queries with CTEs (modifying CTEs, read-only CTEs) - * - All recursively planned subqueries - * - All queries within transaction blocks (BEGIN; query; COMMIT;) - * - * In other words, the following types of queries won't be observed in these - * views: - * - Single-shard queries that are not inside transaction blocks - * - Multi-shard select queries that are not inside transaction blocks - * - Task-tracker queries - * - * - * The following information for all the distributed transactions: - * query_host_name text - * query_host_port int - * database_id oid - * databaese_name name - * process_id integer - * initiator_node_host text - * initiator_node_port int - * distributed_transaction_number bigint - * distributed_transaction_stamp timestamp with time zone - * usesysid oid - * usename name - * application_name text - * client_addr inet - * client_hostname text - * client_port integer - * backend_start timestamp with time zone - * xact_start timestamp with time zone - * query_start timestamp with time zone - * state_change timestamp with time zone - * wait_event_type text - * wait_event text - * state text - * backend_xid xid - * backend_xmin xid - * query text - * backend_type text - */ - -/* - * We get CITUS_DIST_STAT_ACTIVITY_QUERY_COLS from workers and manually add - * CITUS_DIST_STAT_ADDITIONAL_COLS for hostname and hostport. Also, instead of - * showing the initiator_node_id we expand it to initiator_node_host and - * initiator_node_port. - */ -#define CITUS_DIST_STAT_ACTIVITY_QUERY_COLS 24 -#define CITUS_DIST_STAT_ADDITIONAL_COLS 3 -#define CITUS_DIST_STAT_ACTIVITY_COLS \ - CITUS_DIST_STAT_ACTIVITY_QUERY_COLS + CITUS_DIST_STAT_ADDITIONAL_COLS - - -#define coordinator_host_name "coordinator_host" - -/* - * We get the query_host_name and query_host_port while opening the connection to - * the node. We also replace initiator_node_identifier with initiator_node_host - * and initiator_node_port. Thus, they are not in the query below. - */ - -#define CITUS_DIST_STAT_ACTIVITY_QUERY \ - "\ -SELECT \ - dist_txs.initiator_node_identifier, \ - dist_txs.transaction_number, \ - dist_txs.transaction_stamp, \ - pg_stat_activity.datid, \ - pg_stat_activity.datname, \ - pg_stat_activity.pid, \ - pg_stat_activity.usesysid, \ - pg_stat_activity.usename, \ - pg_stat_activity.application_name, \ - pg_stat_activity.client_addr, \ - pg_stat_activity.client_hostname, \ - pg_stat_activity.client_port, \ - pg_stat_activity.backend_start, \ - pg_stat_activity.xact_start, \ - pg_stat_activity.query_start, \ - pg_stat_activity.state_change, \ - pg_stat_activity.wait_event_type, \ - pg_stat_activity.wait_event, \ - pg_stat_activity.state, \ - pg_stat_activity.backend_xid, \ - pg_stat_activity.backend_xmin, \ - pg_stat_activity.query, \ - pg_stat_activity.backend_type, \ - dist_txs.global_pid \ -FROM \ - pg_stat_activity \ - INNER JOIN \ - get_all_active_transactions() AS dist_txs(database_id, process_id, initiator_node_identifier, worker_query, transaction_number, transaction_stamp, global_pid) \ - ON pg_stat_activity.pid = dist_txs.process_id \ -WHERE \ - backend_type = 'client backend' \ - AND \ - worker_query = False \ - AND \ - pg_stat_activity.query NOT ILIKE '%stat_activity%';" - -#define CITUS_WORKER_STAT_ACTIVITY_QUERY \ - "\ -SELECT \ - dist_txs.initiator_node_identifier, \ - dist_txs.transaction_number, \ - dist_txs.transaction_stamp, \ - pg_stat_activity.datid, \ - pg_stat_activity.datname, \ - pg_stat_activity.pid, \ - pg_stat_activity.usesysid, \ - pg_stat_activity.usename, \ - pg_stat_activity.application_name, \ - pg_stat_activity.client_addr, \ - pg_stat_activity.client_hostname, \ - pg_stat_activity.client_port, \ - pg_stat_activity.backend_start, \ - pg_stat_activity.xact_start, \ - pg_stat_activity.query_start, \ - pg_stat_activity.state_change, \ - pg_stat_activity.wait_event_type, \ - pg_stat_activity.wait_event, \ - pg_stat_activity.state, \ - pg_stat_activity.backend_xid, \ - pg_stat_activity.backend_xmin, \ - pg_stat_activity.query, \ - pg_stat_activity.backend_type, \ - dist_txs.global_id \ -FROM \ - pg_stat_activity \ - JOIN \ - get_all_active_transactions() AS dist_txs(database_id, process_id, initiator_node_identifier, worker_query, transaction_number, transaction_stamp, global_id) \ - ON pg_stat_activity.pid = dist_txs.process_id \ -WHERE \ - worker_query = True \ - AND \ - pg_stat_activity.query NOT ILIKE '%stat_activity%';" - -typedef struct CitusDistStat -{ - text *query_host_name; - int query_host_port; - - text *master_query_host_name; - int master_query_host_port; - uint64 distributed_transaction_number; - TimestampTz distributed_transaction_stamp; - - /* fields from pg_stat_statement */ - Oid database_id; - Name databaese_name; - int process_id; - Oid usesysid; - Name usename; - text *application_name; - inet *client_addr; - text *client_hostname; - int client_port; - TimestampTz backend_start; - TimestampTz xact_start; - TimestampTz query_start; - TimestampTz state_change; - text *wait_event_type; - text *wait_event; - text *state; - TransactionId backend_xid; - TransactionId backend_xmin; - text *query; - text *backend_type; - uint64 global_pid; -} CitusDistStat; - - -/* local forward declarations */ -static List * CitusStatActivity(const char *statQuery); -static void ReturnCitusDistStats(List *citusStatsList, FunctionCallInfo fcinfo); -static CitusDistStat * ParseCitusDistStat(PGresult *result, int64 rowIndex); -static void ReplaceInitiatorNodeIdentifier(int initiator_node_identifier, - CitusDistStat *citusDistStat); - -/* utility functions to parse the fields from PGResult */ -static text * ParseTextField(PGresult *result, int rowIndex, int colIndex); -static Name ParseNameField(PGresult *result, int rowIndex, int colIndex); -static inet * ParseInetField(PGresult *result, int rowIndex, int colIndex); -static TransactionId ParseXIDField(PGresult *result, int rowIndex, int colIndex); - -/* utility functions to fetch the fields from heapTuple */ -static List * GetLocalNodeCitusDistStat(const char *statQuery); -static List * LocalNodeCitusDistStat(const char *statQuery, const char *hostname, int - port); -static CitusDistStat * HeapTupleToCitusDistStat(HeapTuple result, TupleDesc - rowDescriptor); -static int64 ParseIntFieldFromHeapTuple(HeapTuple tuple, TupleDesc tupdesc, int colIndex); -static text * ParseTextFieldFromHeapTuple(HeapTuple tuple, TupleDesc tupdesc, int - colIndex); -static Name ParseNameFieldFromHeapTuple(HeapTuple tuple, TupleDesc tupdesc, int colIndex); -static inet * ParseInetFieldFromHeapTuple(HeapTuple tuple, TupleDesc tupdesc, int - colIndex); -static TimestampTz ParseTimestampTzFieldFromHeapTuple(HeapTuple tuple, TupleDesc tupdesc, - int colIndex); -static TransactionId ParseXIDFieldFromHeapTuple(HeapTuple tuple, TupleDesc tupdesc, int - colIndex); - PG_FUNCTION_INFO_V1(citus_dist_stat_activity); PG_FUNCTION_INFO_V1(citus_worker_stat_activity); - -/* - * citus_dist_stat_activity connects to all nodes in the cluster and returns - * pg_stat_activity like result set but only consisting of queries that are - * on the distributed tables and inside distributed transactions. - */ +/* This UDF is deprecated. */ Datum citus_dist_stat_activity(PG_FUNCTION_ARGS) { - CheckCitusVersion(ERROR); + ereport(ERROR, (errmsg("This UDF is deprecated."))); - List *citusDistStatStatements = CitusStatActivity(CITUS_DIST_STAT_ACTIVITY_QUERY); - - ReturnCitusDistStats(citusDistStatStatements, fcinfo); - - PG_RETURN_VOID(); + PG_RETURN_NULL(); } -/* - * citus_worker_stat_activity connects to all nodes in the cluster and returns - * pg_stat_activity like result set but only consisting of queries that are - * on the shards of distributed tables and inside distributed transactions. - */ +/* This UDF is deprecated. */ Datum citus_worker_stat_activity(PG_FUNCTION_ARGS) { - CheckCitusVersion(ERROR); + ereport(ERROR, (errmsg("This UDF is deprecated."))); - List *citusWorkerStatStatements = CitusStatActivity(CITUS_WORKER_STAT_ACTIVITY_QUERY); - - ReturnCitusDistStats(citusWorkerStatStatements, fcinfo); - - PG_RETURN_VOID(); -} - - -/* - * CitusStatActivity gets the stats query, connects to each node in the - * cluster, executes the query and parses the results. The function returns - * list of CitusDistStat struct for further processing. - * - * The function connects to each active primary node in the pg_dist_node. Plus, - * if the query is being executed on the coordinator, the function connects to - * localhost as well. The implication of this is that whenever the query is executed - * from a MX worker node, it wouldn't be able to get information from the queries - * executed on the coordinator given that there is not metadata information about that. - */ -static List * -CitusStatActivity(const char *statQuery) -{ - List *workerNodeList = ActivePrimaryNonCoordinatorNodeList(NoLock); - List *connectionList = NIL; - - /* - * For the local node, we can avoid opening connections. This might be - * important when we're on the coordinator since it requires configuring - * the authentication for self-connection via any user who calls the citus - * stat activity functions. - */ - List *citusStatsList = GetLocalNodeCitusDistStat(statQuery); - - /* - * We prefer to connect with the current user to the remote nodes. This will - * ensure that we have the same privilage restrictions that pg_stat_activity - * enforces. - */ - char *nodeUser = CurrentUserName(); - - int32 localGroupId = GetLocalGroupId(); - - /* open connections in parallel */ - WorkerNode *workerNode = NULL; - foreach_ptr(workerNode, workerNodeList) - { - const char *nodeName = workerNode->workerName; - int nodePort = workerNode->workerPort; - int connectionFlags = 0; - - if (workerNode->groupId == localGroupId) - { - /* we already get these stats via GetLocalNodeCitusDistStat() */ - continue; - } - - MultiConnection *connection = StartNodeUserDatabaseConnection(connectionFlags, - nodeName, nodePort, - nodeUser, NULL); - - connectionList = lappend(connectionList, connection); - } - - FinishConnectionListEstablishment(connectionList); - - /* send commands in parallel */ - MultiConnection *connection = NULL; - foreach_ptr(connection, connectionList) - { - int querySent = SendRemoteCommand(connection, statQuery); - if (querySent == 0) - { - ReportConnectionError(connection, WARNING); - } - } - - /* receive query results */ - foreach_ptr(connection, connectionList) - { - bool raiseInterrupts = true; - - PGresult *result = GetRemoteCommandResult(connection, raiseInterrupts); - if (!IsResponseOK(result)) - { - ReportResultError(connection, result, WARNING); - continue; - } - - int64 rowCount = PQntuples(result); - int64 colCount = PQnfields(result); - - if (colCount != CITUS_DIST_STAT_ACTIVITY_QUERY_COLS) - { - /* - * We don't expect to hit this error, but keep it here in case there - * is a version mistmatch. - */ - ereport(WARNING, (errmsg("unexpected number of columns from " - "citus stat query"))); - continue; - } - - for (int64 rowIndex = 0; rowIndex < rowCount; rowIndex++) - { - CitusDistStat *citusDistStat = ParseCitusDistStat(result, rowIndex); - - /* - * Add the query_host_name and query_host_port which denote where - * the query is being running. - */ - citusDistStat->query_host_name = cstring_to_text(connection->hostname); - citusDistStat->query_host_port = connection->port; - - citusStatsList = lappend(citusStatsList, citusDistStat); - } - - PQclear(result); - ForgetResults(connection); - } - - return citusStatsList; -} - - -/* - * GetLocalNodeCitusDistStat simple executes the given query with SPI to get - * the result of the given stat query on the local node. - */ -static List * -GetLocalNodeCitusDistStat(const char *statQuery) -{ - List *citusStatsList = NIL; - - if (IsCoordinator()) - { - /* - * Coordinator's nodename and nodeport doesn't show-up in the metadata, - * so mark it manually as executing from the coordinator. - */ - citusStatsList = LocalNodeCitusDistStat(statQuery, coordinator_host_name, - PostPortNumber); - - return citusStatsList; - } - - int32 localGroupId = GetLocalGroupId(); - - /* get the current worker's node stats */ - List *workerNodeList = ActivePrimaryNonCoordinatorNodeList(NoLock); - WorkerNode *workerNode = NULL; - foreach_ptr(workerNode, workerNodeList) - { - if (workerNode->groupId == localGroupId) - { - const char *nodeName = workerNode->workerName; - int nodePort = workerNode->workerPort; - - citusStatsList = LocalNodeCitusDistStat(statQuery, nodeName, nodePort); - - break; - } - } - - return citusStatsList; -} - - -/* - * ParseCitusDistStat is a helper function which basically gets a PGresult - * and parses the results for rowIndex. Finally, returns CitusDistStat for - * further processing of the data retrieved. - * - * HeapTupleToCitusDistStat() and ParseCitusDistStat() are doing the same thing on - * different input data structures. Thus, any change to here should be reflected in - * the other function as well. - */ -static CitusDistStat * -ParseCitusDistStat(PGresult *result, int64 rowIndex) -{ - CitusDistStat *citusDistStat = (CitusDistStat *) palloc0(sizeof(CitusDistStat)); - - - int initiator_node_identifier = - PQgetisnull(result, rowIndex, 0) ? -1 : ParseIntField(result, rowIndex, 0); - - ReplaceInitiatorNodeIdentifier(initiator_node_identifier, citusDistStat); - - citusDistStat->distributed_transaction_number = ParseIntField(result, rowIndex, 1); - citusDistStat->distributed_transaction_stamp = - ParseTimestampTzField(result, rowIndex, 2); - - /* fields from pg_stat_statement */ - citusDistStat->database_id = ParseIntField(result, rowIndex, 3); - citusDistStat->databaese_name = ParseNameField(result, rowIndex, 4); - citusDistStat->process_id = ParseIntField(result, rowIndex, 5); - citusDistStat->usesysid = ParseIntField(result, rowIndex, 6); - citusDistStat->usename = ParseNameField(result, rowIndex, 7); - citusDistStat->application_name = ParseTextField(result, rowIndex, 8); - citusDistStat->client_addr = ParseInetField(result, rowIndex, 9); - citusDistStat->client_hostname = ParseTextField(result, rowIndex, 10); - citusDistStat->client_port = ParseIntField(result, rowIndex, 11); - citusDistStat->backend_start = ParseTimestampTzField(result, rowIndex, 12); - citusDistStat->xact_start = ParseTimestampTzField(result, rowIndex, 13); - citusDistStat->query_start = ParseTimestampTzField(result, rowIndex, 14); - citusDistStat->state_change = ParseTimestampTzField(result, rowIndex, 15); - citusDistStat->wait_event_type = ParseTextField(result, rowIndex, 16); - citusDistStat->wait_event = ParseTextField(result, rowIndex, 17); - citusDistStat->state = ParseTextField(result, rowIndex, 18); - citusDistStat->backend_xid = ParseXIDField(result, rowIndex, 19); - citusDistStat->backend_xmin = ParseXIDField(result, rowIndex, 20); - citusDistStat->query = ParseTextField(result, rowIndex, 21); - citusDistStat->backend_type = ParseTextField(result, rowIndex, 22); - citusDistStat->global_pid = ParseIntField(result, rowIndex, 23); - - return citusDistStat; -} - - -static void -ReplaceInitiatorNodeIdentifier(int initiator_node_identifier, - CitusDistStat *citusDistStat) -{ - WorkerNode *initiatorWorkerNode = NULL; - - /* - * Replace initiator_node_identifier with initiator_node_hostname - * and initiator_node_port given that those are a lot more useful. - * - * The rules are following: - * - If initiator_node_identifier belongs to a worker, simply get it - * from the metadata - * - If the initiator_node_identifier belongs to the coordinator and - * we're executing the function on the coordinator, get the localhost - * and port - * - If the initiator_node_identifier belongs to the coordinator and - * we're executing the function on a worker node, manually mark it - * as "coordinator_host" given that we cannot know the host and port - * - If the initiator_node_identifier doesn't equal to zero, we know that - * it is a worker query initiated outside of a distributed - * transaction. However, we cannot know which node has initiated - * the worker query. - */ - if (initiator_node_identifier == GLOBAL_PID_NODE_ID_FOR_NODES_NOT_IN_METADATA && - IsCoordinator()) - { - citusDistStat->master_query_host_name = cstring_to_text(coordinator_host_name); - citusDistStat->master_query_host_port = PostPortNumber; - } - else if (initiator_node_identifier == GLOBAL_PID_NODE_ID_FOR_NODES_NOT_IN_METADATA) - { - citusDistStat->master_query_host_name = cstring_to_text(coordinator_host_name); - citusDistStat->master_query_host_port = 0; - } - else if (initiator_node_identifier > 0) - { - /* a query should run on an existing node, but lets be defensive */ - bool missingOk = true; - initiatorWorkerNode = FindNodeWithNodeId(initiator_node_identifier, missingOk); - - if (initiatorWorkerNode) - { - citusDistStat->master_query_host_name = - cstring_to_text(initiatorWorkerNode->workerName); - citusDistStat->master_query_host_port = initiatorWorkerNode->workerPort; - } - else - { - citusDistStat->master_query_host_name = NULL; - citusDistStat->master_query_host_port = 0; - } - } - else - { - citusDistStat->master_query_host_name = NULL; - citusDistStat->master_query_host_port = 0; - } -} - - -/* - * LocalNodeCitusDistStat simply executes the given query via SPI and parses - * the results back in a list for further processing. - * - * hostname and port is provided for filling the fields on the return list, obviously - * not for executing the SPI. - */ -static List * -LocalNodeCitusDistStat(const char *statQuery, const char *hostname, int port) -{ - List *localNodeCitusDistStatList = NIL; - bool readOnly = true; - - MemoryContext upperContext = CurrentMemoryContext, oldContext = NULL; - - int spiConnectionResult = SPI_connect(); - if (spiConnectionResult != SPI_OK_CONNECT) - { - ereport(WARNING, (errmsg("could not connect to SPI manager to get " - "the local stat activity"))); - - SPI_finish(); - - return NIL; - } - - int spiQueryResult = SPI_execute(statQuery, readOnly, 0); - if (spiQueryResult != SPI_OK_SELECT) - { - ereport(WARNING, (errmsg("execution was not successful while trying to get " - "the local stat activity"))); - - SPI_finish(); - - return NIL; - } - - /* - * SPI_connect switches to its own memory context, which is destroyed by - * the call to SPI_finish. SPI_palloc is provided to allocate memory in - * the previous ("upper") context, but that is inadequate when we need to - * call other functions that themselves use the normal palloc (such as - * lappend). So we switch to the upper context ourselves as needed. - */ - oldContext = MemoryContextSwitchTo(upperContext); - - for (uint64 rowIndex = 0; rowIndex < SPI_processed; rowIndex++) - { - TupleDesc rowDescriptor = SPI_tuptable->tupdesc; - - /* we use pointers from the tuple, so copy it before processing */ - HeapTuple row = SPI_copytuple(SPI_tuptable->vals[rowIndex]); - CitusDistStat *citusDistStat = HeapTupleToCitusDistStat(row, rowDescriptor); - - /* - * Add the query_host_name and query_host_port which denote where - * the query is being running. - */ - citusDistStat->query_host_name = cstring_to_text(hostname); - citusDistStat->query_host_port = port; - - localNodeCitusDistStatList = lappend(localNodeCitusDistStatList, citusDistStat); - } - - MemoryContextSwitchTo(oldContext); - - SPI_finish(); - - return localNodeCitusDistStatList; -} - - -/* - * HeapTupleToCitusDistStat is a helper function which basically gets a heapTuple - * and fetches the results for the given tuple. Finally, returns CitusDistStat for - * further processing of the data retrieved. - * - * HeapTupleToCitusDistStat() and ParseCitusDistStat() are doing the same thing on - * different input data structures. Thus, any change to here should be reflected in - * the other function as well. - */ -static CitusDistStat * -HeapTupleToCitusDistStat(HeapTuple result, TupleDesc rowDescriptor) -{ - CitusDistStat *citusDistStat = (CitusDistStat *) palloc0(sizeof(CitusDistStat)); - - int initiator_node_identifier = ParseIntFieldFromHeapTuple(result, rowDescriptor, 1); - - ReplaceInitiatorNodeIdentifier(initiator_node_identifier, citusDistStat); - - citusDistStat->distributed_transaction_number = - ParseIntFieldFromHeapTuple(result, rowDescriptor, 2); - citusDistStat->distributed_transaction_stamp = - ParseTimestampTzFieldFromHeapTuple(result, rowDescriptor, 3); - - /* fields from pg_stat_statement */ - citusDistStat->database_id = ParseIntFieldFromHeapTuple(result, rowDescriptor, 4); - citusDistStat->databaese_name = ParseNameFieldFromHeapTuple(result, rowDescriptor, 5); - citusDistStat->process_id = ParseIntFieldFromHeapTuple(result, rowDescriptor, 6); - citusDistStat->usesysid = ParseIntFieldFromHeapTuple(result, rowDescriptor, 7); - citusDistStat->usename = ParseNameFieldFromHeapTuple(result, rowDescriptor, 8); - citusDistStat->application_name = - ParseTextFieldFromHeapTuple(result, rowDescriptor, 9); - citusDistStat->client_addr = ParseInetFieldFromHeapTuple(result, rowDescriptor, 10); - citusDistStat->client_hostname = - ParseTextFieldFromHeapTuple(result, rowDescriptor, 11); - citusDistStat->client_port = ParseIntFieldFromHeapTuple(result, rowDescriptor, 12); - citusDistStat->backend_start = - ParseTimestampTzFieldFromHeapTuple(result, rowDescriptor, 13); - citusDistStat->xact_start = - ParseTimestampTzFieldFromHeapTuple(result, rowDescriptor, 14); - citusDistStat->query_start = - ParseTimestampTzFieldFromHeapTuple(result, rowDescriptor, 15); - citusDistStat->state_change = - ParseTimestampTzFieldFromHeapTuple(result, rowDescriptor, 16); - citusDistStat->wait_event_type = - ParseTextFieldFromHeapTuple(result, rowDescriptor, 17); - citusDistStat->wait_event = ParseTextFieldFromHeapTuple(result, rowDescriptor, 18); - citusDistStat->state = ParseTextFieldFromHeapTuple(result, rowDescriptor, 19); - citusDistStat->backend_xid = ParseXIDFieldFromHeapTuple(result, rowDescriptor, 20); - citusDistStat->backend_xmin = ParseXIDFieldFromHeapTuple(result, rowDescriptor, 21); - citusDistStat->query = ParseTextFieldFromHeapTuple(result, rowDescriptor, 22); - citusDistStat->backend_type = ParseTextFieldFromHeapTuple(result, rowDescriptor, 23); - citusDistStat->global_pid = ParseIntFieldFromHeapTuple(result, rowDescriptor, 24); - - return citusDistStat; -} - - -/* - * ParseIntFieldFromHeapTuple fetches an int64 from a heapTuple or returns 0 if the - * result is NULL. - */ -static int64 -ParseIntFieldFromHeapTuple(HeapTuple tuple, TupleDesc tupdesc, int colIndex) -{ - bool isNull = false; - - Datum resultDatum = SPI_getbinval(tuple, tupdesc, colIndex, &isNull); - if (isNull) - { - return 0; - } - - return DatumGetInt64(resultDatum); -} - - -/* - * ParseTextFieldFromHeapTuple parses a text from a heapTuple or returns - * NULL if the result is NULL. - */ -static text * -ParseTextFieldFromHeapTuple(HeapTuple tuple, TupleDesc tupdesc, int colIndex) -{ - bool isNull = false; - - Datum resultDatum = SPI_getbinval(tuple, tupdesc, colIndex, &isNull); - if (isNull) - { - return NULL; - } - - return (text *) DatumGetPointer(resultDatum); -} - - -/* - * ParseNameFieldFromHeapTuple fetches a name from a heapTuple result or returns NULL if the - * result is NULL. - */ -static Name -ParseNameFieldFromHeapTuple(HeapTuple tuple, TupleDesc tupdesc, int colIndex) -{ - bool isNull = false; - - Datum resultDatum = SPI_getbinval(tuple, tupdesc, colIndex, &isNull); - if (isNull) - { - return NULL; - } - - return (Name) DatumGetPointer(resultDatum); -} - - -/* - * ParseInetFieldFromHeapTuple fetcges an inet from a heapTuple or returns NULL if the - * result is NULL. - */ -static inet * -ParseInetFieldFromHeapTuple(HeapTuple tuple, TupleDesc tupdesc, int colIndex) -{ - bool isNull = false; - - Datum resultDatum = SPI_getbinval(tuple, tupdesc, colIndex, &isNull); - if (isNull) - { - return NULL; - } - - return DatumGetInetP(resultDatum); -} - - -/* - * ParseTimestampTzFieldFromHeapTuple parses a timestamptz from a heapTuple or returns - * DT_NOBEGIN if the result is NULL. - */ -static TimestampTz -ParseTimestampTzFieldFromHeapTuple(HeapTuple tuple, TupleDesc tupdesc, int colIndex) -{ - bool isNull = false; - - Datum resultDatum = SPI_getbinval(tuple, tupdesc, colIndex, &isNull); - if (isNull) - { - return DT_NOBEGIN; - } - - return DatumGetTimestampTz(resultDatum); -} - - -/* - * ParseXIDFieldFromHeapTuple parses a XID from a heapTuple or returns - * PG_UINT32_MAX if the result is NULL. - */ -static TransactionId -ParseXIDFieldFromHeapTuple(HeapTuple tuple, TupleDesc tupdesc, int colIndex) -{ - bool isNull = false; - - Datum resultDatum = SPI_getbinval(tuple, tupdesc, colIndex, &isNull); - if (isNull) - { - /* - * We'd show NULL if user hits the max transaction id, but that should be - * one of the minor problems they'd probably hit. - */ - return PG_UINT32_MAX; - } - - return DatumGetTransactionId(resultDatum); -} - - -/* - * ParseTextField parses a text from a remote result or returns NULL if the - * result is NULL. - */ -static text * -ParseTextField(PGresult *result, int rowIndex, int colIndex) -{ - if (PQgetisnull(result, rowIndex, colIndex)) - { - return NULL; - } - - char *resultString = PQgetvalue(result, rowIndex, colIndex); - Datum resultStringDatum = CStringGetDatum(resultString); - Datum textDatum = DirectFunctionCall1(textin, resultStringDatum); - - return (text *) DatumGetPointer(textDatum); -} - - -/* - * ParseNameField parses a name from a remote result or returns NULL if the - * result is NULL. - */ -static Name -ParseNameField(PGresult *result, int rowIndex, int colIndex) -{ - Datum nameDatum = 0; - - if (PQgetisnull(result, rowIndex, colIndex)) - { - return (Name) nameDatum; - } - - char *resultString = PQgetvalue(result, rowIndex, colIndex); - Datum resultStringDatum = CStringGetDatum(resultString); - nameDatum = DirectFunctionCall1(namein, resultStringDatum); - - return (Name) DatumGetPointer(nameDatum); -} - - -/* - * ParseInetField parses an inet from a remote result or returns NULL if the - * result is NULL. - */ -static inet * -ParseInetField(PGresult *result, int rowIndex, int colIndex) -{ - if (PQgetisnull(result, rowIndex, colIndex)) - { - return NULL; - } - - char *resultString = PQgetvalue(result, rowIndex, colIndex); - Datum resultStringDatum = CStringGetDatum(resultString); - Datum inetDatum = DirectFunctionCall1(inet_in, resultStringDatum); - - return DatumGetInetP(inetDatum); -} - - -/* - * ParseXIDField parses an XID from a remote result or returns 0 if the - * result is NULL. - */ -static TransactionId -ParseXIDField(PGresult *result, int rowIndex, int colIndex) -{ - if (PQgetisnull(result, rowIndex, colIndex)) - { - /* - * We'd show NULL if user hits the max transaction id, but that should be - * one of the minor problems they'd probably hit. - */ - return PG_UINT32_MAX; - } - - char *resultString = PQgetvalue(result, rowIndex, colIndex); - Datum resultStringDatum = CStringGetDatum(resultString); - Datum XIDDatum = DirectFunctionCall1(xidin, resultStringDatum); - - return DatumGetTransactionId(XIDDatum); -} - - -/* - * ReturnCitusDistStats returns the stats for a set returning function. - */ -static void -ReturnCitusDistStats(List *citusStatsList, FunctionCallInfo fcinfo) -{ - TupleDesc tupleDesc; - Tuplestorestate *tupleStore = SetupTuplestore(fcinfo, &tupleDesc); - - CitusDistStat *citusDistStat = NULL; - foreach_ptr(citusDistStat, citusStatsList) - { - Datum values[CITUS_DIST_STAT_ACTIVITY_COLS]; - bool nulls[CITUS_DIST_STAT_ACTIVITY_COLS]; - - memset(values, 0, sizeof(values)); - memset(nulls, 0, sizeof(nulls)); - - if (citusDistStat->query_host_name != NULL) - { - values[0] = PointerGetDatum(citusDistStat->query_host_name); - } - else - { - nulls[0] = true; - } - - values[1] = Int32GetDatum(citusDistStat->query_host_port); - - if (citusDistStat->master_query_host_name != NULL) - { - values[2] = PointerGetDatum(citusDistStat->master_query_host_name); - } - else - { - nulls[2] = true; - } - - values[3] = Int32GetDatum(citusDistStat->master_query_host_port); - values[4] = UInt64GetDatum(citusDistStat->distributed_transaction_number); - - if (citusDistStat->distributed_transaction_stamp != DT_NOBEGIN) - { - values[5] = TimestampTzGetDatum(citusDistStat->distributed_transaction_stamp); - } - else - { - nulls[5] = true; - } - - values[6] = ObjectIdGetDatum(citusDistStat->database_id); - - if (citusDistStat->databaese_name != NULL) - { - values[7] = CStringGetDatum(NameStr(*citusDistStat->databaese_name)); - } - else - { - nulls[7] = true; - } - - values[8] = Int32GetDatum(citusDistStat->process_id); - values[9] = ObjectIdGetDatum(citusDistStat->usesysid); - - if (citusDistStat->usename != NULL) - { - values[10] = CStringGetDatum(NameStr(*citusDistStat->usename)); - } - else - { - nulls[10] = true; - } - - if (citusDistStat->application_name != NULL) - { - values[11] = PointerGetDatum(citusDistStat->application_name); - } - else - { - nulls[11] = true; - } - - if (citusDistStat->client_addr != NULL) - { - values[12] = InetPGetDatum(citusDistStat->client_addr); - } - else - { - nulls[12] = true; - } - - if (citusDistStat->client_hostname != NULL) - { - values[13] = PointerGetDatum(citusDistStat->client_hostname); - } - else - { - nulls[13] = true; - } - - values[14] = Int32GetDatum(citusDistStat->client_port); - - if (citusDistStat->backend_start != DT_NOBEGIN) - { - values[15] = TimestampTzGetDatum(citusDistStat->backend_start); - } - else - { - nulls[15] = true; - } - - if (citusDistStat->xact_start != DT_NOBEGIN) - { - values[16] = TimestampTzGetDatum(citusDistStat->xact_start); - } - else - { - nulls[16] = true; - } - - if (citusDistStat->query_start != DT_NOBEGIN) - { - values[17] = TimestampTzGetDatum(citusDistStat->query_start); - } - else - { - nulls[17] = true; - } - - if (citusDistStat->state_change != DT_NOBEGIN) - { - values[18] = TimestampTzGetDatum(citusDistStat->state_change); - } - else - { - nulls[18] = true; - } - - if (citusDistStat->wait_event_type != NULL) - { - values[19] = PointerGetDatum(citusDistStat->wait_event_type); - } - else - { - nulls[19] = true; - } - - if (citusDistStat->wait_event != NULL) - { - values[20] = PointerGetDatum(citusDistStat->wait_event); - } - else - { - nulls[20] = true; - } - - if (citusDistStat->state != NULL) - { - values[21] = PointerGetDatum(citusDistStat->state); - } - else - { - nulls[21] = true; - } - - if (citusDistStat->backend_xid != PG_UINT32_MAX) - { - values[22] = TransactionIdGetDatum(citusDistStat->backend_xid); - } - else - { - nulls[22] = true; - } - - if (citusDistStat->backend_xmin != PG_UINT32_MAX) - { - values[23] = TransactionIdGetDatum(citusDistStat->backend_xmin); - } - else - { - nulls[23] = true; - } - - if (citusDistStat->query != NULL) - { - values[24] = PointerGetDatum(citusDistStat->query); - } - else - { - nulls[24] = true; - } - - if (citusDistStat->backend_type != NULL) - { - values[25] = PointerGetDatum(citusDistStat->backend_type); - } - else - { - nulls[25] = true; - } - - values[26] = Int32GetDatum(citusDistStat->global_pid); - - tuplestore_putvalues(tupleStore, tupleDesc, values, nulls); - } + PG_RETURN_NULL(); } diff --git a/src/test/regress/expected/isolation_citus_dist_activity.out b/src/test/regress/expected/isolation_citus_dist_activity.out index aa536e17f..4777da59c 100644 --- a/src/test/regress/expected/isolation_citus_dist_activity.out +++ b/src/test/regress/expected/isolation_citus_dist_activity.out @@ -32,32 +32,32 @@ pg_sleep (1 row) step s2-view-dist: - SELECT query, query_hostname, query_hostport, distributed_query_host_name, distributed_query_host_port, state, wait_event_type, wait_event, usename, datname FROM citus_dist_stat_activity WHERE query NOT ILIKE '%pg_prepared_xacts%' AND query NOT ILIKE '%COMMIT%' and query not ILIKE '%BEGIN%' and query NOT ILIKE '%pg_catalog.pg_isolation_test_session_is_blocked%' ORDER BY query DESC; + SELECT query, state, wait_event_type, wait_event, usename, datname FROM citus_dist_stat_activity WHERE query NOT ILIKE ALL(VALUES('%pg_prepared_xacts%'), ('%COMMIT%'), ('%BEGIN%'), ('%pg_catalog.pg_isolation_test_session_is_blocked%')) AND backend_type = 'client backend' ORDER BY query DESC; -query |query_hostname |query_hostport|distributed_query_host_name|distributed_query_host_port|state |wait_event_type|wait_event|usename |datname +query |state |wait_event_type|wait_event|usename |datname --------------------------------------------------------------------- ALTER TABLE test_table ADD COLUMN x INT; -|coordinator_host| 57636|coordinator_host | 57636|idle in transaction|Client |ClientRead|postgres|regression +|idle in transaction|Client |ClientRead|postgres|regression (1 row) step s3-view-worker: - SELECT query, query_hostname, query_hostport, distributed_query_host_name, distributed_query_host_port, state, wait_event_type, wait_event, usename, datname FROM citus_worker_stat_activity WHERE query NOT ILIKE '%pg_prepared_xacts%' AND query NOT ILIKE '%COMMIT%' AND backend_type = 'client backend' ORDER BY query DESC; + SELECT query, state, wait_event_type, wait_event, usename, datname FROM citus_stat_activity WHERE query NOT ILIKE ALL(VALUES('%pg_prepared_xacts%'), ('%COMMIT%'), ('%csa_from_one_node%')) AND is_worker_query = true AND backend_type = 'client backend' ORDER BY query DESC; -query |query_hostname|query_hostport|distributed_query_host_name|distributed_query_host_port|state |wait_event_type|wait_event|usename |datname +query |state |wait_event_type|wait_event|usename |datname --------------------------------------------------------------------- SELECT worker_apply_shard_ddl_command (1300004, 'public', ' ALTER TABLE test_table ADD COLUMN x INT; -')|localhost | 57638|coordinator_host | 57636|idle in transaction|Client |ClientRead|postgres|regression +')|idle in transaction|Client |ClientRead|postgres|regression SELECT worker_apply_shard_ddl_command (1300003, 'public', ' ALTER TABLE test_table ADD COLUMN x INT; -')|localhost | 57637|coordinator_host | 57636|idle in transaction|Client |ClientRead|postgres|regression +')|idle in transaction|Client |ClientRead|postgres|regression SELECT worker_apply_shard_ddl_command (1300002, 'public', ' ALTER TABLE test_table ADD COLUMN x INT; -')|localhost | 57638|coordinator_host | 57636|idle in transaction|Client |ClientRead|postgres|regression +')|idle in transaction|Client |ClientRead|postgres|regression SELECT worker_apply_shard_ddl_command (1300001, 'public', ' ALTER TABLE test_table ADD COLUMN x INT; -')|localhost | 57637|coordinator_host | 57636|idle in transaction|Client |ClientRead|postgres|regression +')|idle in transaction|Client |ClientRead|postgres|regression (4 rows) step s2-rollback: @@ -102,21 +102,21 @@ pg_sleep (1 row) step s2-view-dist: - SELECT query, query_hostname, query_hostport, distributed_query_host_name, distributed_query_host_port, state, wait_event_type, wait_event, usename, datname FROM citus_dist_stat_activity WHERE query NOT ILIKE '%pg_prepared_xacts%' AND query NOT ILIKE '%COMMIT%' and query not ILIKE '%BEGIN%' and query NOT ILIKE '%pg_catalog.pg_isolation_test_session_is_blocked%' ORDER BY query DESC; + SELECT query, state, wait_event_type, wait_event, usename, datname FROM citus_dist_stat_activity WHERE query NOT ILIKE ALL(VALUES('%pg_prepared_xacts%'), ('%COMMIT%'), ('%BEGIN%'), ('%pg_catalog.pg_isolation_test_session_is_blocked%')) AND backend_type = 'client backend' ORDER BY query DESC; -query |query_hostname |query_hostport|distributed_query_host_name|distributed_query_host_port|state |wait_event_type|wait_event|usename |datname +query |state |wait_event_type|wait_event|usename |datname --------------------------------------------------------------------- INSERT INTO test_table VALUES (100, 100); -|coordinator_host| 57636|coordinator_host | 57636|idle in transaction|Client |ClientRead|postgres|regression +|idle in transaction|Client |ClientRead|postgres|regression (1 row) step s3-view-worker: - SELECT query, query_hostname, query_hostport, distributed_query_host_name, distributed_query_host_port, state, wait_event_type, wait_event, usename, datname FROM citus_worker_stat_activity WHERE query NOT ILIKE '%pg_prepared_xacts%' AND query NOT ILIKE '%COMMIT%' AND backend_type = 'client backend' ORDER BY query DESC; + SELECT query, state, wait_event_type, wait_event, usename, datname FROM citus_stat_activity WHERE query NOT ILIKE ALL(VALUES('%pg_prepared_xacts%'), ('%COMMIT%'), ('%csa_from_one_node%')) AND is_worker_query = true AND backend_type = 'client backend' ORDER BY query DESC; -query |query_hostname|query_hostport|distributed_query_host_name|distributed_query_host_port|state |wait_event_type|wait_event|usename |datname +query |state |wait_event_type|wait_event|usename |datname --------------------------------------------------------------------- -INSERT INTO public.test_table_1300008 (column1, column2) VALUES (100, 100)|localhost | 57637|coordinator_host | 57636|idle in transaction|Client |ClientRead|postgres|regression +INSERT INTO public.test_table_1300008 (column1, column2) VALUES (100, 100)|idle in transaction|Client |ClientRead|postgres|regression (1 row) step s2-rollback: @@ -166,24 +166,24 @@ pg_sleep (1 row) step s2-view-dist: - SELECT query, query_hostname, query_hostport, distributed_query_host_name, distributed_query_host_port, state, wait_event_type, wait_event, usename, datname FROM citus_dist_stat_activity WHERE query NOT ILIKE '%pg_prepared_xacts%' AND query NOT ILIKE '%COMMIT%' and query not ILIKE '%BEGIN%' and query NOT ILIKE '%pg_catalog.pg_isolation_test_session_is_blocked%' ORDER BY query DESC; + SELECT query, state, wait_event_type, wait_event, usename, datname FROM citus_dist_stat_activity WHERE query NOT ILIKE ALL(VALUES('%pg_prepared_xacts%'), ('%COMMIT%'), ('%BEGIN%'), ('%pg_catalog.pg_isolation_test_session_is_blocked%')) AND backend_type = 'client backend' ORDER BY query DESC; -query |query_hostname |query_hostport|distributed_query_host_name|distributed_query_host_port|state |wait_event_type|wait_event|usename |datname +query |state |wait_event_type|wait_event|usename |datname --------------------------------------------------------------------- SELECT count(*) FROM test_table; -|coordinator_host| 57636|coordinator_host | 57636|idle in transaction|Client |ClientRead|postgres|regression +|idle in transaction|Client |ClientRead|postgres|regression (1 row) step s3-view-worker: - SELECT query, query_hostname, query_hostport, distributed_query_host_name, distributed_query_host_port, state, wait_event_type, wait_event, usename, datname FROM citus_worker_stat_activity WHERE query NOT ILIKE '%pg_prepared_xacts%' AND query NOT ILIKE '%COMMIT%' AND backend_type = 'client backend' ORDER BY query DESC; + SELECT query, state, wait_event_type, wait_event, usename, datname FROM citus_stat_activity WHERE query NOT ILIKE ALL(VALUES('%pg_prepared_xacts%'), ('%COMMIT%'), ('%csa_from_one_node%')) AND is_worker_query = true AND backend_type = 'client backend' ORDER BY query DESC; -query |query_hostname|query_hostport|distributed_query_host_name|distributed_query_host_port|state |wait_event_type|wait_event|usename |datname +query |state |wait_event_type|wait_event|usename |datname --------------------------------------------------------------------- -SELECT count(*) AS count FROM public.test_table_1300014 test_table WHERE true|localhost | 57638|coordinator_host | 57636|idle in transaction|Client |ClientRead|postgres|regression -SELECT count(*) AS count FROM public.test_table_1300013 test_table WHERE true|localhost | 57637|coordinator_host | 57636|idle in transaction|Client |ClientRead|postgres|regression -SELECT count(*) AS count FROM public.test_table_1300012 test_table WHERE true|localhost | 57638|coordinator_host | 57636|idle in transaction|Client |ClientRead|postgres|regression -SELECT count(*) AS count FROM public.test_table_1300011 test_table WHERE true|localhost | 57637|coordinator_host | 57636|idle in transaction|Client |ClientRead|postgres|regression +SELECT count(*) AS count FROM public.test_table_1300014 test_table WHERE true|idle in transaction|Client |ClientRead|postgres|regression +SELECT count(*) AS count FROM public.test_table_1300013 test_table WHERE true|idle in transaction|Client |ClientRead|postgres|regression +SELECT count(*) AS count FROM public.test_table_1300012 test_table WHERE true|idle in transaction|Client |ClientRead|postgres|regression +SELECT count(*) AS count FROM public.test_table_1300011 test_table WHERE true|idle in transaction|Client |ClientRead|postgres|regression (4 rows) step s2-rollback: @@ -233,21 +233,21 @@ pg_sleep (1 row) step s2-view-dist: - SELECT query, query_hostname, query_hostport, distributed_query_host_name, distributed_query_host_port, state, wait_event_type, wait_event, usename, datname FROM citus_dist_stat_activity WHERE query NOT ILIKE '%pg_prepared_xacts%' AND query NOT ILIKE '%COMMIT%' and query not ILIKE '%BEGIN%' and query NOT ILIKE '%pg_catalog.pg_isolation_test_session_is_blocked%' ORDER BY query DESC; + SELECT query, state, wait_event_type, wait_event, usename, datname FROM citus_dist_stat_activity WHERE query NOT ILIKE ALL(VALUES('%pg_prepared_xacts%'), ('%COMMIT%'), ('%BEGIN%'), ('%pg_catalog.pg_isolation_test_session_is_blocked%')) AND backend_type = 'client backend' ORDER BY query DESC; -query |query_hostname |query_hostport|distributed_query_host_name|distributed_query_host_port|state |wait_event_type|wait_event|usename |datname +query |state |wait_event_type|wait_event|usename |datname --------------------------------------------------------------------- SELECT count(*) FROM test_table WHERE column1 = 55; -|coordinator_host| 57636|coordinator_host | 57636|idle in transaction|Client |ClientRead|postgres|regression +|idle in transaction|Client |ClientRead|postgres|regression (1 row) step s3-view-worker: - SELECT query, query_hostname, query_hostport, distributed_query_host_name, distributed_query_host_port, state, wait_event_type, wait_event, usename, datname FROM citus_worker_stat_activity WHERE query NOT ILIKE '%pg_prepared_xacts%' AND query NOT ILIKE '%COMMIT%' AND backend_type = 'client backend' ORDER BY query DESC; + SELECT query, state, wait_event_type, wait_event, usename, datname FROM citus_stat_activity WHERE query NOT ILIKE ALL(VALUES('%pg_prepared_xacts%'), ('%COMMIT%'), ('%csa_from_one_node%')) AND is_worker_query = true AND backend_type = 'client backend' ORDER BY query DESC; -query |query_hostname|query_hostport|distributed_query_host_name|distributed_query_host_port|state |wait_event_type|wait_event|usename |datname +query |state |wait_event_type|wait_event|usename |datname --------------------------------------------------------------------- -SELECT count(*) AS count FROM public.test_table_1300017 test_table WHERE (column1 OPERATOR(pg_catalog.=) 55)|localhost | 57638|coordinator_host | 57636|idle in transaction|Client |ClientRead|postgres|regression +SELECT count(*) AS count FROM public.test_table_1300017 test_table WHERE (column1 OPERATOR(pg_catalog.=) 55)|idle in transaction|Client |ClientRead|postgres|regression (1 row) step s2-rollback: diff --git a/src/test/regress/expected/isolation_get_distributed_wait_queries_mx.out b/src/test/regress/expected/isolation_get_distributed_wait_queries_mx.out index 109c61186..ce7e4f7fa 100644 --- a/src/test/regress/expected/isolation_get_distributed_wait_queries_mx.out +++ b/src/test/regress/expected/isolation_get_distributed_wait_queries_mx.out @@ -27,13 +27,13 @@ step s2-update-ref-table: SELECT run_commands_on_session_level_connection_to_node('UPDATE ref_table SET value_1 = 12 WHERE user_id = 1'); step s3-select-distributed-waiting-queries: - SELECT blocked_statement, current_statement_in_blocking_process, waiting_node_name, blocking_node_name, waiting_node_port, blocking_node_port FROM citus_lock_waits WHERE blocked_statement NOT ILIKE '%run_commands_on_session_level_connection_to_node%' AND current_statement_in_blocking_process NOT ILIKE '%run_commands_on_session_level_connection_to_node%'; + SELECT blocked_statement, current_statement_in_blocking_process FROM citus_lock_waits WHERE blocked_statement NOT ILIKE '%run_commands_on_session_level_connection_to_node%' AND current_statement_in_blocking_process NOT ILIKE '%run_commands_on_session_level_connection_to_node%'; -blocked_statement |current_statement_in_blocking_process|waiting_node_name|blocking_node_name|waiting_node_port|blocking_node_port +blocked_statement |current_statement_in_blocking_process --------------------------------------------------------------------- UPDATE ref_table SET value_1 = 12 WHERE user_id = 1| UPDATE ref_table SET value_1 = 15; -|coordinator_host |coordinator_host | 57636| 57636 + (1 row) step s1-commit: @@ -112,11 +112,11 @@ step s2-update-ref-table: SELECT run_commands_on_session_level_connection_to_node('UPDATE ref_table SET value_1 = 12 WHERE user_id = 1'); step s3-select-distributed-waiting-queries: - SELECT blocked_statement, current_statement_in_blocking_process, waiting_node_name, blocking_node_name, waiting_node_port, blocking_node_port FROM citus_lock_waits WHERE blocked_statement NOT ILIKE '%run_commands_on_session_level_connection_to_node%' AND current_statement_in_blocking_process NOT ILIKE '%run_commands_on_session_level_connection_to_node%'; + SELECT blocked_statement, current_statement_in_blocking_process FROM citus_lock_waits WHERE blocked_statement NOT ILIKE '%run_commands_on_session_level_connection_to_node%' AND current_statement_in_blocking_process NOT ILIKE '%run_commands_on_session_level_connection_to_node%'; -blocked_statement |current_statement_in_blocking_process |waiting_node_name|blocking_node_name|waiting_node_port|blocking_node_port +blocked_statement |current_statement_in_blocking_process --------------------------------------------------------------------- -UPDATE ref_table SET value_1 = 12 WHERE user_id = 1|UPDATE ref_table SET value_1 = 12 WHERE user_id = 1|coordinator_host |coordinator_host | 57636| 57636 +UPDATE ref_table SET value_1 = 12 WHERE user_id = 1|UPDATE ref_table SET value_1 = 12 WHERE user_id = 1 (1 row) step s1-commit-worker: @@ -208,11 +208,11 @@ step s2-update-dist-table: SELECT run_commands_on_session_level_connection_to_node('UPDATE tt1 SET value_1 = 5'); step s3-select-distributed-waiting-queries: - SELECT blocked_statement, current_statement_in_blocking_process, waiting_node_name, blocking_node_name, waiting_node_port, blocking_node_port FROM citus_lock_waits WHERE blocked_statement NOT ILIKE '%run_commands_on_session_level_connection_to_node%' AND current_statement_in_blocking_process NOT ILIKE '%run_commands_on_session_level_connection_to_node%'; + SELECT blocked_statement, current_statement_in_blocking_process FROM citus_lock_waits WHERE blocked_statement NOT ILIKE '%run_commands_on_session_level_connection_to_node%' AND current_statement_in_blocking_process NOT ILIKE '%run_commands_on_session_level_connection_to_node%'; -blocked_statement |current_statement_in_blocking_process|waiting_node_name|blocking_node_name|waiting_node_port|blocking_node_port +blocked_statement |current_statement_in_blocking_process --------------------------------------------------------------------- -UPDATE tt1 SET value_1 = 5|UPDATE tt1 SET value_1 = 4 |coordinator_host |coordinator_host | 57636| 57636 +UPDATE tt1 SET value_1 = 5|UPDATE tt1 SET value_1 = 4 (1 row) step s1-commit-worker: @@ -304,11 +304,11 @@ step s2-update-ref-table: SELECT run_commands_on_session_level_connection_to_node('UPDATE ref_table SET value_1 = 12 WHERE user_id = 1'); step s3-select-distributed-waiting-queries: - SELECT blocked_statement, current_statement_in_blocking_process, waiting_node_name, blocking_node_name, waiting_node_port, blocking_node_port FROM citus_lock_waits WHERE blocked_statement NOT ILIKE '%run_commands_on_session_level_connection_to_node%' AND current_statement_in_blocking_process NOT ILIKE '%run_commands_on_session_level_connection_to_node%'; + SELECT blocked_statement, current_statement_in_blocking_process FROM citus_lock_waits WHERE blocked_statement NOT ILIKE '%run_commands_on_session_level_connection_to_node%' AND current_statement_in_blocking_process NOT ILIKE '%run_commands_on_session_level_connection_to_node%'; -blocked_statement |current_statement_in_blocking_process |waiting_node_name|blocking_node_name|waiting_node_port|blocking_node_port +blocked_statement |current_statement_in_blocking_process --------------------------------------------------------------------- -UPDATE ref_table SET value_1 = 12 WHERE user_id = 1|DELETE FROM ref_table WHERE user_id = 1|coordinator_host |coordinator_host | 57636| 57636 +UPDATE ref_table SET value_1 = 12 WHERE user_id = 1|DELETE FROM ref_table WHERE user_id = 1 (1 row) step s1-commit-worker: @@ -400,11 +400,11 @@ step s2-update-ref-table: SELECT run_commands_on_session_level_connection_to_node('UPDATE ref_table SET value_1 = 12 WHERE user_id = 1'); step s3-select-distributed-waiting-queries: - SELECT blocked_statement, current_statement_in_blocking_process, waiting_node_name, blocking_node_name, waiting_node_port, blocking_node_port FROM citus_lock_waits WHERE blocked_statement NOT ILIKE '%run_commands_on_session_level_connection_to_node%' AND current_statement_in_blocking_process NOT ILIKE '%run_commands_on_session_level_connection_to_node%'; + SELECT blocked_statement, current_statement_in_blocking_process FROM citus_lock_waits WHERE blocked_statement NOT ILIKE '%run_commands_on_session_level_connection_to_node%' AND current_statement_in_blocking_process NOT ILIKE '%run_commands_on_session_level_connection_to_node%'; -blocked_statement |current_statement_in_blocking_process |waiting_node_name|blocking_node_name|waiting_node_port|blocking_node_port +blocked_statement |current_statement_in_blocking_process --------------------------------------------------------------------- -UPDATE ref_table SET value_1 = 12 WHERE user_id = 1|INSERT INTO ref_table VALUES(8,81),(9,91)|coordinator_host |coordinator_host | 57636| 57636 +UPDATE ref_table SET value_1 = 12 WHERE user_id = 1|INSERT INTO ref_table VALUES(8,81),(9,91) (1 row) step s1-commit-worker: @@ -501,9 +501,9 @@ run_commands_on_session_level_connection_to_node (1 row) step s3-select-distributed-waiting-queries: - SELECT blocked_statement, current_statement_in_blocking_process, waiting_node_name, blocking_node_name, waiting_node_port, blocking_node_port FROM citus_lock_waits WHERE blocked_statement NOT ILIKE '%run_commands_on_session_level_connection_to_node%' AND current_statement_in_blocking_process NOT ILIKE '%run_commands_on_session_level_connection_to_node%'; + SELECT blocked_statement, current_statement_in_blocking_process FROM citus_lock_waits WHERE blocked_statement NOT ILIKE '%run_commands_on_session_level_connection_to_node%' AND current_statement_in_blocking_process NOT ILIKE '%run_commands_on_session_level_connection_to_node%'; -blocked_statement|current_statement_in_blocking_process|waiting_node_name|blocking_node_name|waiting_node_port|blocking_node_port +blocked_statement|current_statement_in_blocking_process --------------------------------------------------------------------- (0 rows) @@ -590,11 +590,11 @@ step s2-update-ref-table: SELECT run_commands_on_session_level_connection_to_node('UPDATE ref_table SET value_1 = 12 WHERE user_id = 1'); step s3-select-distributed-waiting-queries: - SELECT blocked_statement, current_statement_in_blocking_process, waiting_node_name, blocking_node_name, waiting_node_port, blocking_node_port FROM citus_lock_waits WHERE blocked_statement NOT ILIKE '%run_commands_on_session_level_connection_to_node%' AND current_statement_in_blocking_process NOT ILIKE '%run_commands_on_session_level_connection_to_node%'; + SELECT blocked_statement, current_statement_in_blocking_process FROM citus_lock_waits WHERE blocked_statement NOT ILIKE '%run_commands_on_session_level_connection_to_node%' AND current_statement_in_blocking_process NOT ILIKE '%run_commands_on_session_level_connection_to_node%'; -blocked_statement |current_statement_in_blocking_process |waiting_node_name|blocking_node_name|waiting_node_port|blocking_node_port +blocked_statement |current_statement_in_blocking_process --------------------------------------------------------------------- -UPDATE ref_table SET value_1 = 12 WHERE user_id = 1|COPY ref_table FROM PROGRAM 'echo 10, 101 && echo 11, 111' WITH CSV|coordinator_host |coordinator_host | 57636| 57636 +UPDATE ref_table SET value_1 = 12 WHERE user_id = 1|COPY ref_table FROM PROGRAM 'echo 10, 101 && echo 11, 111' WITH CSV (1 row) step s1-commit-worker: @@ -691,9 +691,9 @@ run_commands_on_session_level_connection_to_node (1 row) step s3-select-distributed-waiting-queries: - SELECT blocked_statement, current_statement_in_blocking_process, waiting_node_name, blocking_node_name, waiting_node_port, blocking_node_port FROM citus_lock_waits WHERE blocked_statement NOT ILIKE '%run_commands_on_session_level_connection_to_node%' AND current_statement_in_blocking_process NOT ILIKE '%run_commands_on_session_level_connection_to_node%'; + SELECT blocked_statement, current_statement_in_blocking_process FROM citus_lock_waits WHERE blocked_statement NOT ILIKE '%run_commands_on_session_level_connection_to_node%' AND current_statement_in_blocking_process NOT ILIKE '%run_commands_on_session_level_connection_to_node%'; -blocked_statement|current_statement_in_blocking_process|waiting_node_name|blocking_node_name|waiting_node_port|blocking_node_port +blocked_statement|current_statement_in_blocking_process --------------------------------------------------------------------- (0 rows) @@ -785,9 +785,9 @@ run_commands_on_session_level_connection_to_node (1 row) step s3-select-distributed-waiting-queries: - SELECT blocked_statement, current_statement_in_blocking_process, waiting_node_name, blocking_node_name, waiting_node_port, blocking_node_port FROM citus_lock_waits WHERE blocked_statement NOT ILIKE '%run_commands_on_session_level_connection_to_node%' AND current_statement_in_blocking_process NOT ILIKE '%run_commands_on_session_level_connection_to_node%'; + SELECT blocked_statement, current_statement_in_blocking_process FROM citus_lock_waits WHERE blocked_statement NOT ILIKE '%run_commands_on_session_level_connection_to_node%' AND current_statement_in_blocking_process NOT ILIKE '%run_commands_on_session_level_connection_to_node%'; -blocked_statement|current_statement_in_blocking_process|waiting_node_name|blocking_node_name|waiting_node_port|blocking_node_port +blocked_statement|current_statement_in_blocking_process --------------------------------------------------------------------- (0 rows) @@ -874,11 +874,11 @@ step s2-update-ref-table: SELECT run_commands_on_session_level_connection_to_node('UPDATE ref_table SET value_1 = 12 WHERE user_id = 1'); step s3-select-distributed-waiting-queries: - SELECT blocked_statement, current_statement_in_blocking_process, waiting_node_name, blocking_node_name, waiting_node_port, blocking_node_port FROM citus_lock_waits WHERE blocked_statement NOT ILIKE '%run_commands_on_session_level_connection_to_node%' AND current_statement_in_blocking_process NOT ILIKE '%run_commands_on_session_level_connection_to_node%'; + SELECT blocked_statement, current_statement_in_blocking_process FROM citus_lock_waits WHERE blocked_statement NOT ILIKE '%run_commands_on_session_level_connection_to_node%' AND current_statement_in_blocking_process NOT ILIKE '%run_commands_on_session_level_connection_to_node%'; -blocked_statement |current_statement_in_blocking_process|waiting_node_name|blocking_node_name|waiting_node_port|blocking_node_port +blocked_statement |current_statement_in_blocking_process --------------------------------------------------------------------- -UPDATE ref_table SET value_1 = 12 WHERE user_id = 1|SELECT * FROM ref_table FOR UPDATE |coordinator_host |coordinator_host | 57636| 57636 +UPDATE ref_table SET value_1 = 12 WHERE user_id = 1|SELECT * FROM ref_table FOR UPDATE (1 row) step s1-commit-worker: @@ -957,13 +957,13 @@ step s1-alter-table: ALTER TABLE ref_table ADD CONSTRAINT rf_p_key PRIMARY KEY(user_id); step s3-select-distributed-waiting-queries: - SELECT blocked_statement, current_statement_in_blocking_process, waiting_node_name, blocking_node_name, waiting_node_port, blocking_node_port FROM citus_lock_waits WHERE blocked_statement NOT ILIKE '%run_commands_on_session_level_connection_to_node%' AND current_statement_in_blocking_process NOT ILIKE '%run_commands_on_session_level_connection_to_node%'; + SELECT blocked_statement, current_statement_in_blocking_process FROM citus_lock_waits WHERE blocked_statement NOT ILIKE '%run_commands_on_session_level_connection_to_node%' AND current_statement_in_blocking_process NOT ILIKE '%run_commands_on_session_level_connection_to_node%'; -blocked_statement |current_statement_in_blocking_process |waiting_node_name|blocking_node_name|waiting_node_port|blocking_node_port +blocked_statement |current_statement_in_blocking_process --------------------------------------------------------------------- ALTER TABLE ref_table ADD CONSTRAINT rf_p_key PRIMARY KEY(user_id); -|INSERT INTO ref_table VALUES(8,81),(9,91)|coordinator_host |coordinator_host | 57636| 57636 +|INSERT INTO ref_table VALUES(8,81),(9,91) (1 row) step s2-commit-worker: @@ -1003,15 +1003,15 @@ step s2-update-on-the-coordinator: UPDATE tt1 SET value_1 = 4; step s3-select-distributed-waiting-queries: - SELECT blocked_statement, current_statement_in_blocking_process, waiting_node_name, blocking_node_name, waiting_node_port, blocking_node_port FROM citus_lock_waits WHERE blocked_statement NOT ILIKE '%run_commands_on_session_level_connection_to_node%' AND current_statement_in_blocking_process NOT ILIKE '%run_commands_on_session_level_connection_to_node%'; + SELECT blocked_statement, current_statement_in_blocking_process FROM citus_lock_waits WHERE blocked_statement NOT ILIKE '%run_commands_on_session_level_connection_to_node%' AND current_statement_in_blocking_process NOT ILIKE '%run_commands_on_session_level_connection_to_node%'; -blocked_statement |current_statement_in_blocking_process|waiting_node_name|blocking_node_name|waiting_node_port|blocking_node_port +blocked_statement |current_statement_in_blocking_process --------------------------------------------------------------------- UPDATE tt1 SET value_1 = 4; | UPDATE tt1 SET value_1 = 4; - |coordinator_host |coordinator_host | 57636| 57636 + (1 row) step s1-commit: @@ -1069,11 +1069,11 @@ step s4-update-dist-table: SELECT run_commands_on_session_level_connection_to_node('UPDATE tt1 SET value_1 = 5'); step s3-select-distributed-waiting-queries: - SELECT blocked_statement, current_statement_in_blocking_process, waiting_node_name, blocking_node_name, waiting_node_port, blocking_node_port FROM citus_lock_waits WHERE blocked_statement NOT ILIKE '%run_commands_on_session_level_connection_to_node%' AND current_statement_in_blocking_process NOT ILIKE '%run_commands_on_session_level_connection_to_node%'; + SELECT blocked_statement, current_statement_in_blocking_process FROM citus_lock_waits WHERE blocked_statement NOT ILIKE '%run_commands_on_session_level_connection_to_node%' AND current_statement_in_blocking_process NOT ILIKE '%run_commands_on_session_level_connection_to_node%'; -blocked_statement |current_statement_in_blocking_process|waiting_node_name|blocking_node_name|waiting_node_port|blocking_node_port +blocked_statement |current_statement_in_blocking_process --------------------------------------------------------------------- -UPDATE tt1 SET value_1 = 5|UPDATE tt1 SET value_1 = 4 |coordinator_host |coordinator_host | 57636| 57636 +UPDATE tt1 SET value_1 = 5|UPDATE tt1 SET value_1 = 4 (1 row) step s1-commit-worker: @@ -1157,11 +1157,11 @@ step s2-update-dist-table-id-1: SELECT run_commands_on_session_level_connection_to_node('UPDATE tt1 SET value_1 = 4 WHERE user_id = 1'); step s3-select-distributed-waiting-queries: - SELECT blocked_statement, current_statement_in_blocking_process, waiting_node_name, blocking_node_name, waiting_node_port, blocking_node_port FROM citus_lock_waits WHERE blocked_statement NOT ILIKE '%run_commands_on_session_level_connection_to_node%' AND current_statement_in_blocking_process NOT ILIKE '%run_commands_on_session_level_connection_to_node%'; + SELECT blocked_statement, current_statement_in_blocking_process FROM citus_lock_waits WHERE blocked_statement NOT ILIKE '%run_commands_on_session_level_connection_to_node%' AND current_statement_in_blocking_process NOT ILIKE '%run_commands_on_session_level_connection_to_node%'; -blocked_statement |current_statement_in_blocking_process |waiting_node_name|blocking_node_name|waiting_node_port|blocking_node_port +blocked_statement |current_statement_in_blocking_process --------------------------------------------------------------------- -UPDATE tt1 SET value_1 = 4 WHERE user_id = 1|UPDATE tt1 SET value_1 = 4 WHERE user_id = 1|coordinator_host |coordinator_host | 57636| 57636 +UPDATE tt1 SET value_1 = 4 WHERE user_id = 1|UPDATE tt1 SET value_1 = 4 WHERE user_id = 1 (1 row) step s1-commit-worker: @@ -1219,13 +1219,13 @@ step s2-update-ref-table: SELECT run_commands_on_session_level_connection_to_node('UPDATE ref_table SET value_1 = 12 WHERE user_id = 1'); step s3-select-distributed-waiting-queries: - SELECT blocked_statement, current_statement_in_blocking_process, waiting_node_name, blocking_node_name, waiting_node_port, blocking_node_port FROM citus_lock_waits WHERE blocked_statement NOT ILIKE '%run_commands_on_session_level_connection_to_node%' AND current_statement_in_blocking_process NOT ILIKE '%run_commands_on_session_level_connection_to_node%'; + SELECT blocked_statement, current_statement_in_blocking_process FROM citus_lock_waits WHERE blocked_statement NOT ILIKE '%run_commands_on_session_level_connection_to_node%' AND current_statement_in_blocking_process NOT ILIKE '%run_commands_on_session_level_connection_to_node%'; -blocked_statement |current_statement_in_blocking_process|waiting_node_name|blocking_node_name|waiting_node_port|blocking_node_port +blocked_statement |current_statement_in_blocking_process --------------------------------------------------------------------- UPDATE ref_table SET value_1 = 12 WHERE user_id = 1| UPDATE ref_table SET value_1 = 15; -|coordinator_host |coordinator_host | 57636| 57636 + (1 row) step s1-commit: diff --git a/src/test/regress/expected/isolation_global_pid.out b/src/test/regress/expected/isolation_global_pid.out index ad8630fb5..21573fd16 100644 --- a/src/test/regress/expected/isolation_global_pid.out +++ b/src/test/regress/expected/isolation_global_pid.out @@ -1,6 +1,6 @@ Parsed test spec with 2 sessions -starting permutation: s1-start-session-level-connection s1-worker-begin s1-worker-select s2-coordinator-citus_stat_activity s2-coordinator-citus_dist_stat_activity s2-coordinator-citus_worker_stat_activity s1-worker-commit s1-stop-session-level-connection +starting permutation: s1-start-session-level-connection s1-worker-begin s1-worker-select s2-coordinator-citus_stat_activity s2-coordinator-citus_dist_stat_activity s2-coordinator-citus_stat_activity-in-workers s1-worker-commit s1-stop-session-level-connection create_distributed_table --------------------------------------------------------------------- @@ -39,7 +39,7 @@ t (1 row) step s2-coordinator-citus_dist_stat_activity: - SELECT query FROM citus_dist_stat_activity() WHERE global_pid IN ( + SELECT query FROM citus_dist_stat_activity WHERE global_pid IN ( SELECT global_pid FROM citus_stat_activity WHERE query LIKE '%SELECT * FROM dist\_table%' ) AND query NOT ILIKE '%run_commands_on_session_level_connection_to_node%' @@ -50,10 +50,12 @@ query SET citus.enable_local_execution TO off; SET citus.force_max_query_parallelization TO ON; SELECT * FROM dist_table (1 row) -step s2-coordinator-citus_worker_stat_activity: - SELECT query FROM citus_worker_stat_activity() WHERE global_pid IN ( +step s2-coordinator-citus_stat_activity-in-workers: + SELECT query FROM citus_stat_activity WHERE global_pid IN ( SELECT global_pid FROM citus_stat_activity WHERE query LIKE '%SELECT * FROM dist\_table%' ) + AND is_worker_query = true + AND backend_type = 'client backend' ORDER BY 1; query @@ -86,7 +88,7 @@ citus_remove_node (1 row) -starting permutation: s1-coordinator-begin s1-coordinator-select s2-coordinator-citus_stat_activity s2-coordinator-citus_dist_stat_activity s2-coordinator-citus_worker_stat_activity s2-coordinator-get_all_active_transactions s2-coordinator-get_global_active_transactions s1-coordinator-commit +starting permutation: s1-coordinator-begin s1-coordinator-select s2-coordinator-citus_stat_activity s2-coordinator-citus_dist_stat_activity s2-coordinator-citus_stat_activity-in-workers s2-coordinator-get_all_active_transactions s2-coordinator-get_global_active_transactions s1-coordinator-commit create_distributed_table --------------------------------------------------------------------- @@ -113,7 +115,7 @@ t (1 row) step s2-coordinator-citus_dist_stat_activity: - SELECT query FROM citus_dist_stat_activity() WHERE global_pid IN ( + SELECT query FROM citus_dist_stat_activity WHERE global_pid IN ( SELECT global_pid FROM citus_stat_activity WHERE query LIKE '%SELECT * FROM dist\_table%' ) AND query NOT ILIKE '%run_commands_on_session_level_connection_to_node%' @@ -128,10 +130,12 @@ query (1 row) -step s2-coordinator-citus_worker_stat_activity: - SELECT query FROM citus_worker_stat_activity() WHERE global_pid IN ( +step s2-coordinator-citus_stat_activity-in-workers: + SELECT query FROM citus_stat_activity WHERE global_pid IN ( SELECT global_pid FROM citus_stat_activity WHERE query LIKE '%SELECT * FROM dist\_table%' ) + AND is_worker_query = true + AND backend_type = 'client backend' ORDER BY 1; query diff --git a/src/test/regress/expected/isolation_replicate_reference_tables_to_coordinator.out b/src/test/regress/expected/isolation_replicate_reference_tables_to_coordinator.out index 8fef72010..a2792d0c4 100644 --- a/src/test/regress/expected/isolation_replicate_reference_tables_to_coordinator.out +++ b/src/test/regress/expected/isolation_replicate_reference_tables_to_coordinator.out @@ -99,34 +99,37 @@ pg_sleep (1 row) step s2-view-dist: - SELECT query, query_hostname, query_hostport, distributed_query_host_name, distributed_query_host_port, state, wait_event_type, wait_event, usename, datname FROM citus_dist_stat_activity WHERE query NOT ILIKE '%pg_prepared_xacts%' AND query NOT ILIKE '%COMMIT%' AND query NOT ILIKE '%pg_isolation_test_session_is_blocked%' AND query NOT ILIKE '%BEGIN%' AND query NOT ILIKE '%add_node%' ORDER BY query DESC; + SELECT query, state, wait_event_type, wait_event, usename, datname FROM citus_dist_stat_activity WHERE backend_type = 'client backend' AND query NOT ILIKE ALL(VALUES('%pg_prepared_xacts%'), ('%COMMIT%'), ('%pg_isolation_test_session_is_blocked%'), ('%BEGIN%'), ('%add_node%')) ORDER BY query DESC; -query |query_hostname |query_hostport|distributed_query_host_name|distributed_query_host_port|state |wait_event_type|wait_event|usename |datname +query |state |wait_event_type|wait_event|usename |datname --------------------------------------------------------------------- SELECT check_distributed_deadlocks(); -|coordinator_host| 57636| | 0|idle |Client |ClientRead|postgres|regression +|idle |Client |ClientRead|postgres|regression update ref_table set a = a + 1; - |coordinator_host| 57636| | 0|idle in transaction|Client |ClientRead|postgres|regression + |idle in transaction|Client |ClientRead|postgres|regression (2 rows) step s2-view-worker: - SELECT query, query_hostname, query_hostport, distributed_query_host_name, - distributed_query_host_port, state, wait_event_type, wait_event, usename, datname - FROM citus_worker_stat_activity - WHERE query NOT ILIKE '%pg_prepared_xacts%' AND - query NOT ILIKE '%COMMIT%' AND - query NOT ILIKE '%dump_local_%' AND - query NOT ILIKE '%citus_internal_local_blocked_processes%' AND - query NOT ILIKE '%add_node%' AND - backend_type = 'client backend' - ORDER BY query, query_hostport DESC; + SELECT query, state, wait_event_type, wait_event, usename, datname + FROM citus_stat_activity + WHERE query NOT ILIKE ALL(VALUES + ('%pg_prepared_xacts%'), + ('%COMMIT%'), + ('%dump_local_%'), + ('%citus_internal_local_blocked_processes%'), + ('%add_node%'), + ('%csa_from_one_node%')) + AND is_worker_query = true + AND backend_type = 'client backend' + AND query != '' + ORDER BY query DESC; -query |query_hostname|query_hostport|distributed_query_host_name|distributed_query_host_port|state |wait_event_type|wait_event|usename |datname +query |state |wait_event_type|wait_event|usename |datname --------------------------------------------------------------------- -UPDATE public.ref_table_1500777 ref_table SET a = (a OPERATOR(pg_catalog.+) 1)|localhost | 57638| | 0|idle in transaction|Client |ClientRead|postgres|regression -UPDATE public.ref_table_1500777 ref_table SET a = (a OPERATOR(pg_catalog.+) 1)|localhost | 57637| | 0|idle in transaction|Client |ClientRead|postgres|regression +UPDATE public.ref_table_1500777 ref_table SET a = (a OPERATOR(pg_catalog.+) 1)|idle in transaction|Client |ClientRead|postgres|regression +UPDATE public.ref_table_1500777 ref_table SET a = (a OPERATOR(pg_catalog.+) 1)|idle in transaction|Client |ClientRead|postgres|regression (2 rows) step s2-end: diff --git a/src/test/regress/expected/multi_extension.out b/src/test/regress/expected/multi_extension.out index c43299e95..7c9484619 100644 --- a/src/test/regress/expected/multi_extension.out +++ b/src/test/regress/expected/multi_extension.out @@ -1002,12 +1002,15 @@ SELECT * FROM multi_extension.print_extension_changes(); previous_object | current_object --------------------------------------------------------------------- function citus_disable_node(text,integer) void | + function citus_dist_stat_activity() SETOF record | + function citus_worker_stat_activity() SETOF record | function create_distributed_function(regprocedure,text,text) void | function master_append_table_to_shard(bigint,text,text,integer) real | function master_apply_delete_command(text) integer | function master_get_table_metadata(text) record | function worker_partition_query_result(text,text,integer,citus.distribution_type,text[],text[],boolean) SETOF record | - | function citus_backend_gpid() bigint + view citus_worker_stat_activity | + | function citus_backend_gpid() bigint | function citus_calculate_gpid(integer,integer) bigint | function citus_check_cluster_node_health() SETOF record | function citus_check_connection_to_node(text,integer) boolean @@ -1031,7 +1034,7 @@ SELECT * FROM multi_extension.print_extension_changes(); | function worker_drop_shell_table(text) void | function worker_partition_query_result(text,text,integer,citus.distribution_type,text[],text[],boolean,boolean,boolean) SETOF record | view citus_stat_activity -(30 rows) +(33 rows) DROP TABLE multi_extension.prev_objects, multi_extension.extension_diff; -- show running version diff --git a/src/test/regress/expected/upgrade_list_citus_objects.out b/src/test/regress/expected/upgrade_list_citus_objects.out index a340b594b..4a7a2301c 100644 --- a/src/test/regress/expected/upgrade_list_citus_objects.out +++ b/src/test/regress/expected/upgrade_list_citus_objects.out @@ -53,7 +53,6 @@ ORDER BY 1; function citus_dist_partition_cache_invalidate() function citus_dist_placement_cache_invalidate() function citus_dist_shard_cache_invalidate() - function citus_dist_stat_activity() function citus_drain_node(text,integer,citus.shard_transfer_mode,name) function citus_drop_all_shards(regclass,text,text,boolean) function citus_drop_trigger() @@ -121,7 +120,6 @@ ORDER BY 1; function citus_update_table_statistics(regclass) function citus_validate_rebalance_strategy_functions(regproc,regproc,regproc) function citus_version() - function citus_worker_stat_activity() function column_name_to_column(regclass,text) function column_to_column_name(regclass,text) function columnar.columnar_handler(internal) @@ -277,8 +275,7 @@ ORDER BY 1; view citus_shards_on_worker view citus_stat_activity view citus_stat_statements - view citus_worker_stat_activity view pg_dist_shard_placement view time_partitions -(264 rows) +(261 rows) diff --git a/src/test/regress/spec/isolation_citus_dist_activity.spec b/src/test/regress/spec/isolation_citus_dist_activity.spec index 5047a656c..9a4e148a1 100644 --- a/src/test/regress/spec/isolation_citus_dist_activity.spec +++ b/src/test/regress/spec/isolation_citus_dist_activity.spec @@ -71,7 +71,7 @@ step "s2-sleep" step "s2-view-dist" { - SELECT query, query_hostname, query_hostport, distributed_query_host_name, distributed_query_host_port, state, wait_event_type, wait_event, usename, datname FROM citus_dist_stat_activity WHERE query NOT ILIKE '%pg_prepared_xacts%' AND query NOT ILIKE '%COMMIT%' and query not ILIKE '%BEGIN%' and query NOT ILIKE '%pg_catalog.pg_isolation_test_session_is_blocked%' ORDER BY query DESC; + SELECT query, state, wait_event_type, wait_event, usename, datname FROM citus_dist_stat_activity WHERE query NOT ILIKE ALL(VALUES('%pg_prepared_xacts%'), ('%COMMIT%'), ('%BEGIN%'), ('%pg_catalog.pg_isolation_test_session_is_blocked%')) AND backend_type = 'client backend' ORDER BY query DESC; } @@ -89,7 +89,7 @@ step "s3-rollback" step "s3-view-worker" { - SELECT query, query_hostname, query_hostport, distributed_query_host_name, distributed_query_host_port, state, wait_event_type, wait_event, usename, datname FROM citus_worker_stat_activity WHERE query NOT ILIKE '%pg_prepared_xacts%' AND query NOT ILIKE '%COMMIT%' AND backend_type = 'client backend' ORDER BY query DESC; + SELECT query, state, wait_event_type, wait_event, usename, datname FROM citus_stat_activity WHERE query NOT ILIKE ALL(VALUES('%pg_prepared_xacts%'), ('%COMMIT%'), ('%csa_from_one_node%')) AND is_worker_query = true AND backend_type = 'client backend' ORDER BY query DESC; } // we prefer to sleep before "s2-view-dist" so that we can ensure diff --git a/src/test/regress/spec/isolation_get_distributed_wait_queries_mx.spec b/src/test/regress/spec/isolation_get_distributed_wait_queries_mx.spec index 84daaf792..5954f2979 100644 --- a/src/test/regress/spec/isolation_get_distributed_wait_queries_mx.spec +++ b/src/test/regress/spec/isolation_get_distributed_wait_queries_mx.spec @@ -159,7 +159,7 @@ session "s3" step "s3-select-distributed-waiting-queries" { - SELECT blocked_statement, current_statement_in_blocking_process, waiting_node_name, blocking_node_name, waiting_node_port, blocking_node_port FROM citus_lock_waits WHERE blocked_statement NOT ILIKE '%run_commands_on_session_level_connection_to_node%' AND current_statement_in_blocking_process NOT ILIKE '%run_commands_on_session_level_connection_to_node%'; + SELECT blocked_statement, current_statement_in_blocking_process FROM citus_lock_waits WHERE blocked_statement NOT ILIKE '%run_commands_on_session_level_connection_to_node%' AND current_statement_in_blocking_process NOT ILIKE '%run_commands_on_session_level_connection_to_node%'; } // session s1 and s4 executes the commands on the same worker node diff --git a/src/test/regress/spec/isolation_global_pid.spec b/src/test/regress/spec/isolation_global_pid.spec index a7804dd71..690d1768a 100644 --- a/src/test/regress/spec/isolation_global_pid.spec +++ b/src/test/regress/spec/isolation_global_pid.spec @@ -69,18 +69,20 @@ step "s2-coordinator-citus_stat_activity" step "s2-coordinator-citus_dist_stat_activity" { - SELECT query FROM citus_dist_stat_activity() WHERE global_pid IN ( + SELECT query FROM citus_dist_stat_activity WHERE global_pid IN ( SELECT global_pid FROM citus_stat_activity WHERE query LIKE '%SELECT * FROM dist\_table%' ) AND query NOT ILIKE '%run_commands_on_session_level_connection_to_node%' ORDER BY 1; } -step "s2-coordinator-citus_worker_stat_activity" +step "s2-coordinator-citus_stat_activity-in-workers" { - SELECT query FROM citus_worker_stat_activity() WHERE global_pid IN ( + SELECT query FROM citus_stat_activity WHERE global_pid IN ( SELECT global_pid FROM citus_stat_activity WHERE query LIKE '%SELECT * FROM dist\_table%' ) + AND is_worker_query = true + AND backend_type = 'client backend' ORDER BY 1; } @@ -101,7 +103,7 @@ step "s2-coordinator-get_global_active_transactions" // worker - coordinator -permutation "s1-start-session-level-connection" "s1-worker-begin" "s1-worker-select" "s2-coordinator-citus_stat_activity" "s2-coordinator-citus_dist_stat_activity" "s2-coordinator-citus_worker_stat_activity" "s1-worker-commit" "s1-stop-session-level-connection" +permutation "s1-start-session-level-connection" "s1-worker-begin" "s1-worker-select" "s2-coordinator-citus_stat_activity" "s2-coordinator-citus_dist_stat_activity" "s2-coordinator-citus_stat_activity-in-workers" "s1-worker-commit" "s1-stop-session-level-connection" // coordinator - coordinator -permutation "s1-coordinator-begin" "s1-coordinator-select" "s2-coordinator-citus_stat_activity" "s2-coordinator-citus_dist_stat_activity" "s2-coordinator-citus_worker_stat_activity" "s2-coordinator-get_all_active_transactions" "s2-coordinator-get_global_active_transactions" "s1-coordinator-commit" +permutation "s1-coordinator-begin" "s1-coordinator-select" "s2-coordinator-citus_stat_activity" "s2-coordinator-citus_dist_stat_activity" "s2-coordinator-citus_stat_activity-in-workers" "s2-coordinator-get_all_active_transactions" "s2-coordinator-get_global_active_transactions" "s1-coordinator-commit" diff --git a/src/test/regress/spec/isolation_replicate_reference_tables_to_coordinator.spec b/src/test/regress/spec/isolation_replicate_reference_tables_to_coordinator.spec index 09da5970d..f55f869dd 100644 --- a/src/test/regress/spec/isolation_replicate_reference_tables_to_coordinator.spec +++ b/src/test/regress/spec/isolation_replicate_reference_tables_to_coordinator.spec @@ -81,21 +81,24 @@ step "s2-lock-ref-table-placement-on-coordinator" step "s2-view-dist" { - SELECT query, query_hostname, query_hostport, distributed_query_host_name, distributed_query_host_port, state, wait_event_type, wait_event, usename, datname FROM citus_dist_stat_activity WHERE query NOT ILIKE '%pg_prepared_xacts%' AND query NOT ILIKE '%COMMIT%' AND query NOT ILIKE '%pg_isolation_test_session_is_blocked%' AND query NOT ILIKE '%BEGIN%' AND query NOT ILIKE '%add_node%' ORDER BY query DESC; + SELECT query, state, wait_event_type, wait_event, usename, datname FROM citus_dist_stat_activity WHERE backend_type = 'client backend' AND query NOT ILIKE ALL(VALUES('%pg_prepared_xacts%'), ('%COMMIT%'), ('%pg_isolation_test_session_is_blocked%'), ('%BEGIN%'), ('%add_node%')) ORDER BY query DESC; } step "s2-view-worker" { - SELECT query, query_hostname, query_hostport, distributed_query_host_name, - distributed_query_host_port, state, wait_event_type, wait_event, usename, datname - FROM citus_worker_stat_activity - WHERE query NOT ILIKE '%pg_prepared_xacts%' AND - query NOT ILIKE '%COMMIT%' AND - query NOT ILIKE '%dump_local_%' AND - query NOT ILIKE '%citus_internal_local_blocked_processes%' AND - query NOT ILIKE '%add_node%' AND - backend_type = 'client backend' - ORDER BY query, query_hostport DESC; + SELECT query, state, wait_event_type, wait_event, usename, datname + FROM citus_stat_activity + WHERE query NOT ILIKE ALL(VALUES + ('%pg_prepared_xacts%'), + ('%COMMIT%'), + ('%dump_local_%'), + ('%citus_internal_local_blocked_processes%'), + ('%add_node%'), + ('%csa_from_one_node%')) + AND is_worker_query = true + AND backend_type = 'client backend' + AND query != '' + ORDER BY query DESC; }