From dffcafc0967de66429458d5910bcc61478d40824 Mon Sep 17 00:00:00 2001 From: Onder Kalaci Date: Wed, 9 Feb 2022 17:35:42 +0100 Subject: [PATCH] Use global pids in citus_lock_waits --- .../distributed/sql/citus--10.2-4--11.0-1.sql | 6 +- .../sql/downgrades/citus--11.0-1--10.2-4.sql | 258 ++++++++----- .../sql/udfs/citus_blocking_pids/11.0-1.sql | 34 ++ .../sql/udfs/citus_blocking_pids/latest.sql | 34 ++ .../11.0-1.sql | 12 +- .../latest.sql | 12 +- .../11.0-1.sql | 10 +- .../latest.sql | 10 +- .../11.0-1.sql | 56 +++ .../latest.sql | 12 +- .../sql/udfs/citus_lock_waits/11.0-1.sql | 8 +- .../sql/udfs/citus_lock_waits/latest.sql | 8 +- .../distributed/transaction/lock_graph.c | 188 ++++++++- ...lation_get_distributed_wait_queries_mx.out | 361 ++++++++++++------ .../isolation_rebalancer_deferred_drop.out | 7 +- ...licate_reference_tables_to_coordinator.out | 7 +- src/test/regress/expected/multi_extension.out | 118 +++--- .../expected/upgrade_list_citus_objects.out | 8 +- ...ation_get_distributed_wait_queries_mx.spec | 18 +- ...icate_reference_tables_to_coordinator.spec | 3 +- 20 files changed, 833 insertions(+), 337 deletions(-) create mode 100644 src/backend/distributed/sql/udfs/citus_blocking_pids/11.0-1.sql create mode 100644 src/backend/distributed/sql/udfs/citus_blocking_pids/latest.sql rename src/backend/distributed/sql/udfs/{dump_global_wait_edges => citus_internal_global_blocked_processes}/11.0-1.sql (62%) rename src/backend/distributed/sql/udfs/{dump_global_wait_edges => citus_internal_global_blocked_processes}/latest.sql (62%) rename src/backend/distributed/sql/udfs/{dump_local_wait_edges => citus_internal_local_blocked_processes}/11.0-1.sql (67%) rename src/backend/distributed/sql/udfs/{dump_local_wait_edges => citus_internal_local_blocked_processes}/latest.sql (67%) create mode 100644 src/backend/distributed/sql/udfs/citus_isolation_test_session_is_blocked/11.0-1.sql diff --git a/src/backend/distributed/sql/citus--10.2-4--11.0-1.sql b/src/backend/distributed/sql/citus--10.2-4--11.0-1.sql index de6748c12..849b28761 100644 --- a/src/backend/distributed/sql/citus--10.2-4--11.0-1.sql +++ b/src/backend/distributed/sql/citus--10.2-4--11.0-1.sql @@ -17,11 +17,13 @@ #include "udfs/get_all_active_transactions/11.0-1.sql" #include "udfs/get_global_active_transactions/11.0-1.sql" -#include "udfs/dump_local_wait_edges/11.0-1.sql" -#include "udfs/dump_global_wait_edges/11.0-1.sql" +#include "udfs/citus_internal_local_blocked_processes/11.0-1.sql" +#include "udfs/citus_internal_global_blocked_processes/11.0-1.sql" #include "udfs/citus_worker_stat_activity/11.0-1.sql" #include "udfs/worker_create_or_replace_object/11.0-1.sql" +#include "udfs/citus_isolation_test_session_is_blocked/11.0-1.sql" +#include "udfs/citus_blocking_pids/11.0-1.sql" CREATE VIEW citus.citus_worker_stat_activity AS SELECT * FROM pg_catalog.citus_worker_stat_activity(); diff --git a/src/backend/distributed/sql/downgrades/citus--11.0-1--10.2-4.sql b/src/backend/distributed/sql/downgrades/citus--11.0-1--10.2-4.sql index 53d03f986..ba13b134a 100644 --- a/src/backend/distributed/sql/downgrades/citus--11.0-1--10.2-4.sql +++ b/src/backend/distributed/sql/downgrades/citus--11.0-1--10.2-4.sql @@ -113,39 +113,8 @@ CREATE FUNCTION get_global_active_transactions(OUT datid oid, OUT process_id int RESET search_path; -DROP FUNCTION dump_local_wait_edges CASCADE; -CREATE FUNCTION pg_catalog.dump_local_wait_edges( - OUT waiting_pid int4, - OUT waiting_node_id int4, - OUT waiting_transaction_num int8, - OUT waiting_transaction_stamp timestamptz, - OUT blocking_pid int4, - OUT blocking_node_id int4, - OUT blocking_transaction_num int8, - OUT blocking_transaction_stamp timestamptz, - OUT blocking_transaction_waiting bool) -RETURNS SETOF RECORD -LANGUAGE C STRICT -AS $$MODULE_PATHNAME$$, $$dump_local_wait_edges$$; -COMMENT ON FUNCTION pg_catalog.dump_local_wait_edges() -IS 'returns all local lock wait chains, that start from distributed transactions'; - -DROP FUNCTION dump_global_wait_edges CASCADE; -CREATE FUNCTION pg_catalog.dump_global_wait_edges( - OUT waiting_pid int4, - OUT waiting_node_id int4, - OUT waiting_transaction_num int8, - OUT waiting_transaction_stamp timestamptz, - OUT blocking_pid int4, - OUT blocking_node_id int4, - OUT blocking_transaction_num int8, - OUT blocking_transaction_stamp timestamptz, - OUT blocking_transaction_waiting bool) -RETURNS SETOF RECORD -LANGUAGE 'c' STRICT -AS $$MODULE_PATHNAME$$, $$dump_global_wait_edges$$; -COMMENT ON FUNCTION pg_catalog.dump_global_wait_edges() -IS 'returns a global list of blocked transactions originating from this node'; +DROP FUNCTION citus_internal_local_blocked_processes CASCADE; +DROP FUNCTION citus_internal_global_blocked_processes CASCADE; DROP FUNCTION pg_catalog.citus_dist_stat_activity CASCADE; @@ -173,6 +142,162 @@ ALTER VIEW citus.citus_dist_stat_activity SET SCHEMA pg_catalog; GRANT SELECT ON pg_catalog.citus_dist_stat_activity TO PUBLIC; SET search_path = 'pg_catalog'; +DROP FUNCTION citus_worker_stat_activity CASCADE; + +CREATE OR REPLACE FUNCTION citus_worker_stat_activity(OUT query_hostname text, OUT query_hostport int, OUT distributed_query_host_name text, OUT distributed_query_host_port int, + OUT transaction_number int8, OUT transaction_stamp timestamptz, OUT datid oid, OUT datname name, + OUT pid int, OUT usesysid oid, OUT usename name, OUT application_name text, OUT client_addr INET, + OUT client_hostname TEXT, OUT client_port int, OUT backend_start timestamptz, OUT xact_start timestamptz, + OUT query_start timestamptz, OUT state_change timestamptz, OUT wait_event_type text, OUT wait_event text, + OUT state text, OUT backend_xid xid, OUT backend_xmin xid, OUT query text, OUT backend_type text) +RETURNS SETOF RECORD +LANGUAGE C STRICT AS 'MODULE_PATHNAME', +$$citus_worker_stat_activity$$; + +COMMENT ON FUNCTION citus_worker_stat_activity(OUT query_hostname text, OUT query_hostport int, OUT distributed_query_host_name text, OUT distributed_query_host_port int, + OUT transaction_number int8, OUT transaction_stamp timestamptz, OUT datid oid, OUT datname name, + OUT pid int, OUT usesysid oid, OUT usename name, OUT application_name text, OUT client_addr INET, + OUT client_hostname TEXT, OUT client_port int, OUT backend_start timestamptz, OUT xact_start timestamptz, + OUT query_start timestamptz, OUT state_change timestamptz, OUT wait_event_type text, OUT wait_event text, + OUT state text, OUT backend_xid xid, OUT backend_xmin xid, OUT query text, OUT backend_type text) +IS 'returns distributed transaction activity on shards of distributed tables'; + +DROP FUNCTION pg_catalog.worker_create_or_replace_object(text[]); +#include "../udfs/worker_create_or_replace_object/9.0-1.sql" + +DROP FUNCTION IF EXISTS pg_catalog.pg_cancel_backend(bigint) CASCADE; +DROP FUNCTION IF EXISTS pg_catalog.pg_terminate_backend(bigint, bigint) CASCADE; + +DROP FUNCTION pg_catalog.dump_local_wait_edges CASCADE; +CREATE FUNCTION pg_catalog.dump_local_wait_edges( + OUT waiting_pid int4, + OUT waiting_node_id int4, + OUT waiting_transaction_num int8, + OUT waiting_transaction_stamp timestamptz, + OUT blocking_pid int4, + OUT blocking_node_id int4, + OUT blocking_transaction_num int8, + OUT blocking_transaction_stamp timestamptz, + OUT blocking_transaction_waiting bool) +RETURNS SETOF RECORD +LANGUAGE C STRICT +AS $$MODULE_PATHNAME$$, $$dump_local_wait_edges$$; +COMMENT ON FUNCTION pg_catalog.dump_local_wait_edges() +IS 'returns all local lock wait chains, that start from distributed transactions'; + +DROP FUNCTION pg_catalog.dump_global_wait_edges CASCADE; +CREATE FUNCTION pg_catalog.dump_global_wait_edges( + OUT waiting_pid int4, + OUT waiting_node_id int4, + OUT waiting_transaction_num int8, + OUT waiting_transaction_stamp timestamptz, + OUT blocking_pid int4, + OUT blocking_node_id int4, + OUT blocking_transaction_num int8, + OUT blocking_transaction_stamp timestamptz, + OUT blocking_transaction_waiting bool) +RETURNS SETOF RECORD +LANGUAGE 'c' STRICT +AS $$MODULE_PATHNAME$$, $$dump_global_wait_edges$$; +COMMENT ON FUNCTION pg_catalog.dump_global_wait_edges() +IS 'returns a global list of blocked transactions originating from this node'; + +DROP FUNCTION pg_catalog.citus_isolation_test_session_is_blocked(pBlockedPid integer, pInterestingPids integer[]); +CREATE FUNCTION pg_catalog.citus_isolation_test_session_is_blocked(pBlockedPid integer, pInterestingPids integer[]) +RETURNS boolean AS $$ + DECLARE + mBlockedTransactionNum int8; + workerProcessId integer := current_setting('citus.isolation_test_session_remote_process_id'); + coordinatorProcessId integer := current_setting('citus.isolation_test_session_process_id'); + BEGIN + IF pg_catalog.old_pg_isolation_test_session_is_blocked(pBlockedPid, pInterestingPids) THEN + RETURN true; + END IF; + + -- pg says we're not blocked locally; check whether we're blocked globally. + -- Note that worker process may be blocked or waiting for a lock. So we need to + -- get transaction number for both of them. Following IF provides the transaction + -- number when the worker process waiting for other session. + IF EXISTS (SELECT transaction_number FROM get_global_active_transactions() + WHERE process_id = workerProcessId AND pBlockedPid = coordinatorProcessId) THEN + SELECT transaction_number INTO mBlockedTransactionNum FROM get_global_active_transactions() + WHERE process_id = workerProcessId AND pBlockedPid = coordinatorProcessId; + ELSE + -- Check whether transactions initiated from the coordinator get locked + SELECT transaction_number INTO mBlockedTransactionNum + FROM get_all_active_transactions() WHERE process_id = pBlockedPid; + END IF; + + RETURN EXISTS ( + SELECT 1 FROM dump_global_wait_edges() + WHERE waiting_transaction_num = mBlockedTransactionNum + ) OR EXISTS ( + -- Check on the workers if any logical replication job spawned by the + -- current PID is blocked, by checking it's application name + -- Query is heavily based on: https://wiki.postgresql.org/wiki/Lock_Monitoring + SELECT result FROM run_command_on_workers($two$ + SELECT blocked_activity.application_name AS blocked_application + FROM pg_catalog.pg_locks blocked_locks + JOIN pg_catalog.pg_stat_activity blocked_activity ON blocked_activity.pid = blocked_locks.pid + JOIN pg_catalog.pg_locks blocking_locks + ON blocking_locks.locktype = blocked_locks.locktype + AND blocking_locks.DATABASE IS NOT DISTINCT FROM blocked_locks.DATABASE + AND blocking_locks.relation IS NOT DISTINCT FROM blocked_locks.relation + AND blocking_locks.page IS NOT DISTINCT FROM blocked_locks.page + AND blocking_locks.tuple IS NOT DISTINCT FROM blocked_locks.tuple + AND blocking_locks.virtualxid IS NOT DISTINCT FROM blocked_locks.virtualxid + AND blocking_locks.transactionid IS NOT DISTINCT FROM blocked_locks.transactionid + AND blocking_locks.classid IS NOT DISTINCT FROM blocked_locks.classid + AND blocking_locks.objid IS NOT DISTINCT FROM blocked_locks.objid + AND blocking_locks.objsubid IS NOT DISTINCT FROM blocked_locks.objsubid + AND blocking_locks.pid != blocked_locks.pid + JOIN pg_catalog.pg_stat_activity blocking_activity ON blocking_activity.pid = blocking_locks.pid + WHERE NOT blocked_locks.GRANTED AND blocked_activity.application_name LIKE 'citus_shard_move_subscription_%' + $two$) where result='citus_shard_move_subscription_' || pBlockedPid); + + END; +$$ LANGUAGE plpgsql; + +REVOKE ALL ON FUNCTION citus_isolation_test_session_is_blocked(integer,integer[]) FROM PUBLIC; + +DROP FUNCTION pg_catalog.citus_blocking_pids(pBlockedPid integer); +CREATE FUNCTION pg_catalog.citus_blocking_pids(pBlockedPid integer) +RETURNS int4[] AS $$ + DECLARE + mLocalBlockingPids int4[]; + mRemoteBlockingPids int4[]; + mLocalTransactionNum int8; + BEGIN + SELECT pg_catalog.old_pg_blocking_pids(pBlockedPid) INTO mLocalBlockingPids; + + IF (array_length(mLocalBlockingPids, 1) > 0) THEN + RETURN mLocalBlockingPids; + END IF; + + -- pg says we're not blocked locally; check whether we're blocked globally. + SELECT transaction_number INTO mLocalTransactionNum + FROM get_all_active_transactions() WHERE process_id = pBlockedPid; + + SELECT array_agg(process_id) INTO mRemoteBlockingPids FROM ( + WITH activeTransactions AS ( + SELECT process_id, transaction_number FROM get_all_active_transactions() + ), blockingTransactions AS ( + SELECT blocking_transaction_num AS txn_num FROM dump_global_wait_edges() + WHERE waiting_transaction_num = mLocalTransactionNum + ) + SELECT activeTransactions.process_id FROM activeTransactions, blockingTransactions + WHERE activeTransactions.transaction_number = blockingTransactions.txn_num + ) AS sub; + + RETURN mRemoteBlockingPids; + END; +$$ LANGUAGE plpgsql; +REVOKE ALL ON FUNCTION citus_blocking_pids(integer) FROM PUBLIC; + +CREATE VIEW citus.citus_worker_stat_activity AS +SELECT * FROM pg_catalog.citus_worker_stat_activity(); +ALTER VIEW citus.citus_worker_stat_activity SET SCHEMA pg_catalog; +GRANT SELECT ON pg_catalog.citus_worker_stat_activity TO PUBLIC; -- we have to recreate this view because we drop citus_dist_stat_activity that this view depends CREATE VIEW citus.citus_lock_waits AS @@ -184,7 +309,7 @@ citus_dist_stat_activity AS ), unique_global_wait_edges AS ( - SELECT DISTINCT ON(waiting_node_id, waiting_transaction_num, blocking_node_id, blocking_transaction_num) * FROM dump_global_wait_edges() + SELECT DISTINCT ON(waiting_node_id, waiting_transaction_num, blocking_node_id, blocking_transaction_num) * FROM dump_global_wait_edges() ), citus_dist_stat_activity_with_node_id AS ( @@ -217,67 +342,4 @@ JOIN ALTER VIEW citus.citus_lock_waits SET SCHEMA pg_catalog; GRANT SELECT ON pg_catalog.citus_lock_waits TO PUBLIC; -DROP FUNCTION citus_worker_stat_activity CASCADE; - -CREATE OR REPLACE FUNCTION citus_worker_stat_activity(OUT query_hostname text, OUT query_hostport int, OUT distributed_query_host_name text, OUT distributed_query_host_port int, - OUT transaction_number int8, OUT transaction_stamp timestamptz, OUT datid oid, OUT datname name, - OUT pid int, OUT usesysid oid, OUT usename name, OUT application_name text, OUT client_addr INET, - OUT client_hostname TEXT, OUT client_port int, OUT backend_start timestamptz, OUT xact_start timestamptz, - OUT query_start timestamptz, OUT state_change timestamptz, OUT wait_event_type text, OUT wait_event text, - OUT state text, OUT backend_xid xid, OUT backend_xmin xid, OUT query text, OUT backend_type text) -RETURNS SETOF RECORD -LANGUAGE C STRICT AS 'MODULE_PATHNAME', -$$citus_worker_stat_activity$$; - -COMMENT ON FUNCTION citus_worker_stat_activity(OUT query_hostname text, OUT query_hostport int, OUT distributed_query_host_name text, OUT distributed_query_host_port int, - OUT transaction_number int8, OUT transaction_stamp timestamptz, OUT datid oid, OUT datname name, - OUT pid int, OUT usesysid oid, OUT usename name, OUT application_name text, OUT client_addr INET, - OUT client_hostname TEXT, OUT client_port int, OUT backend_start timestamptz, OUT xact_start timestamptz, - OUT query_start timestamptz, OUT state_change timestamptz, OUT wait_event_type text, OUT wait_event text, - OUT state text, OUT backend_xid xid, OUT backend_xmin xid, OUT query text, OUT backend_type text) -IS 'returns distributed transaction activity on shards of distributed tables'; - -CREATE VIEW citus.citus_worker_stat_activity AS -SELECT * FROM pg_catalog.citus_worker_stat_activity(); -ALTER VIEW citus.citus_worker_stat_activity SET SCHEMA pg_catalog; -GRANT SELECT ON pg_catalog.citus_worker_stat_activity TO PUBLIC; - -DROP FUNCTION pg_catalog.worker_create_or_replace_object(text[]); -#include "../udfs/worker_create_or_replace_object/9.0-1.sql" - -DROP FUNCTION IF EXISTS pg_catalog.pg_cancel_backend(bigint) CASCADE; -DROP FUNCTION IF EXISTS pg_catalog.pg_terminate_backend(bigint, bigint) CASCADE; - -DROP FUNCTION pg_catalog.dump_global_wait_edges; -CREATE FUNCTION pg_catalog.dump_local_wait_edges( - OUT waiting_pid int4, - OUT waiting_node_id int4, - OUT waiting_transaction_num int8, - OUT waiting_transaction_stamp timestamptz, - OUT blocking_pid int4, - OUT blocking_node_id int4, - OUT blocking_transaction_num int8, - OUT blocking_transaction_stamp timestamptz, - OUT blocking_transaction_waiting bool) -RETURNS SETOF RECORD -LANGUAGE C STRICT -AS $$MODULE_PATHNAME$$, $$dump_local_wait_edges$$; -COMMENT ON FUNCTION pg_catalog.dump_local_wait_edges() -IS 'returns all local lock wait chains, that start from distributed transactions'; - -DROP FUNCTION pg_catalog.dump_global_wait_edges; -CREATE FUNCTION pg_catalog.dump_global_wait_edges( - OUT waiting_pid int4, - OUT waiting_node_id int4, - OUT waiting_transaction_num int8, - OUT waiting_transaction_stamp timestamptz, - OUT blocking_pid int4, - OUT blocking_node_id int4, - OUT blocking_transaction_num int8, - OUT blocking_transaction_stamp timestamptz, - OUT blocking_transaction_waiting bool) -RETURNS SETOF RECORD -LANGUAGE 'c' STRICT -AS $$MODULE_PATHNAME$$, $$dump_global_wait_edges$$; -COMMENT ON FUNCTION pg_catalog.dump_global_wait_edges() -IS 'returns a global list of blocked transactions originating from this node'; +RESET search_path; diff --git a/src/backend/distributed/sql/udfs/citus_blocking_pids/11.0-1.sql b/src/backend/distributed/sql/udfs/citus_blocking_pids/11.0-1.sql new file mode 100644 index 000000000..c7e607c1c --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_blocking_pids/11.0-1.sql @@ -0,0 +1,34 @@ +DROP FUNCTION pg_catalog.citus_blocking_pids; +CREATE FUNCTION pg_catalog.citus_blocking_pids(pBlockedPid integer) +RETURNS int4[] AS $$ + DECLARE + mLocalBlockingPids int4[]; + mRemoteBlockingPids int4[]; + mLocalGlobalPid int8; + BEGIN + SELECT pg_catalog.old_pg_blocking_pids(pBlockedPid) INTO mLocalBlockingPids; + + IF (array_length(mLocalBlockingPids, 1) > 0) THEN + RETURN mLocalBlockingPids; + END IF; + + -- pg says we're not blocked locally; check whether we're blocked globally. + SELECT global_pid INTO mLocalGlobalPid + FROM get_all_active_transactions() WHERE process_id = pBlockedPid; + + SELECT array_agg(global_pid) INTO mRemoteBlockingPids FROM ( + WITH activeTransactions AS ( + SELECT global_pid FROM get_all_active_transactions() + ), blockingTransactions AS ( + SELECT blocking_global_pid FROM citus_internal_global_blocked_processes() + WHERE waiting_global_pid = mLocalGlobalPid + ) + SELECT activeTransactions.global_pid FROM activeTransactions, blockingTransactions + WHERE activeTransactions.global_pid = blockingTransactions.blocking_global_pid + ) AS sub; + + RETURN mRemoteBlockingPids; + END; +$$ LANGUAGE plpgsql; + +REVOKE ALL ON FUNCTION citus_blocking_pids(integer) FROM PUBLIC; diff --git a/src/backend/distributed/sql/udfs/citus_blocking_pids/latest.sql b/src/backend/distributed/sql/udfs/citus_blocking_pids/latest.sql new file mode 100644 index 000000000..c7e607c1c --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_blocking_pids/latest.sql @@ -0,0 +1,34 @@ +DROP FUNCTION pg_catalog.citus_blocking_pids; +CREATE FUNCTION pg_catalog.citus_blocking_pids(pBlockedPid integer) +RETURNS int4[] AS $$ + DECLARE + mLocalBlockingPids int4[]; + mRemoteBlockingPids int4[]; + mLocalGlobalPid int8; + BEGIN + SELECT pg_catalog.old_pg_blocking_pids(pBlockedPid) INTO mLocalBlockingPids; + + IF (array_length(mLocalBlockingPids, 1) > 0) THEN + RETURN mLocalBlockingPids; + END IF; + + -- pg says we're not blocked locally; check whether we're blocked globally. + SELECT global_pid INTO mLocalGlobalPid + FROM get_all_active_transactions() WHERE process_id = pBlockedPid; + + SELECT array_agg(global_pid) INTO mRemoteBlockingPids FROM ( + WITH activeTransactions AS ( + SELECT global_pid FROM get_all_active_transactions() + ), blockingTransactions AS ( + SELECT blocking_global_pid FROM citus_internal_global_blocked_processes() + WHERE waiting_global_pid = mLocalGlobalPid + ) + SELECT activeTransactions.global_pid FROM activeTransactions, blockingTransactions + WHERE activeTransactions.global_pid = blockingTransactions.blocking_global_pid + ) AS sub; + + RETURN mRemoteBlockingPids; + END; +$$ LANGUAGE plpgsql; + +REVOKE ALL ON FUNCTION citus_blocking_pids(integer) FROM PUBLIC; diff --git a/src/backend/distributed/sql/udfs/dump_global_wait_edges/11.0-1.sql b/src/backend/distributed/sql/udfs/citus_internal_global_blocked_processes/11.0-1.sql similarity index 62% rename from src/backend/distributed/sql/udfs/dump_global_wait_edges/11.0-1.sql rename to src/backend/distributed/sql/udfs/citus_internal_global_blocked_processes/11.0-1.sql index 32e25e217..510cdf93d 100644 --- a/src/backend/distributed/sql/udfs/dump_global_wait_edges/11.0-1.sql +++ b/src/backend/distributed/sql/udfs/citus_internal_global_blocked_processes/11.0-1.sql @@ -1,12 +1,10 @@ -DROP FUNCTION pg_catalog.dump_global_wait_edges CASCADE; -CREATE OR REPLACE FUNCTION pg_catalog.dump_global_wait_edges( - distributed_tx_only boolean DEFAULT true, - OUT waiting_global_pid int8, +CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_global_blocked_processes( + OUT waiting_global_pid int8, OUT waiting_pid int4, OUT waiting_node_id int4, OUT waiting_transaction_num int8, OUT waiting_transaction_stamp timestamptz, - OUT blocking_global_pid int8, + OUT blocking_global_pid int8, OUT blocking_pid int4, OUT blocking_node_id int4, OUT blocking_transaction_num int8, @@ -14,6 +12,6 @@ CREATE OR REPLACE FUNCTION pg_catalog.dump_global_wait_edges( OUT blocking_transaction_waiting bool) RETURNS SETOF RECORD LANGUAGE C STRICT -AS $$MODULE_PATHNAME$$, $$dump_global_wait_edges$$; -COMMENT ON FUNCTION pg_catalog.dump_global_wait_edges(bool) +AS $$MODULE_PATHNAME$$, $$citus_internal_global_blocked_processes$$; +COMMENT ON FUNCTION pg_catalog.citus_internal_global_blocked_processes() IS 'returns a global list of blocked backends originating from this node'; diff --git a/src/backend/distributed/sql/udfs/dump_global_wait_edges/latest.sql b/src/backend/distributed/sql/udfs/citus_internal_global_blocked_processes/latest.sql similarity index 62% rename from src/backend/distributed/sql/udfs/dump_global_wait_edges/latest.sql rename to src/backend/distributed/sql/udfs/citus_internal_global_blocked_processes/latest.sql index 32e25e217..510cdf93d 100644 --- a/src/backend/distributed/sql/udfs/dump_global_wait_edges/latest.sql +++ b/src/backend/distributed/sql/udfs/citus_internal_global_blocked_processes/latest.sql @@ -1,12 +1,10 @@ -DROP FUNCTION pg_catalog.dump_global_wait_edges CASCADE; -CREATE OR REPLACE FUNCTION pg_catalog.dump_global_wait_edges( - distributed_tx_only boolean DEFAULT true, - OUT waiting_global_pid int8, +CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_global_blocked_processes( + OUT waiting_global_pid int8, OUT waiting_pid int4, OUT waiting_node_id int4, OUT waiting_transaction_num int8, OUT waiting_transaction_stamp timestamptz, - OUT blocking_global_pid int8, + OUT blocking_global_pid int8, OUT blocking_pid int4, OUT blocking_node_id int4, OUT blocking_transaction_num int8, @@ -14,6 +12,6 @@ CREATE OR REPLACE FUNCTION pg_catalog.dump_global_wait_edges( OUT blocking_transaction_waiting bool) RETURNS SETOF RECORD LANGUAGE C STRICT -AS $$MODULE_PATHNAME$$, $$dump_global_wait_edges$$; -COMMENT ON FUNCTION pg_catalog.dump_global_wait_edges(bool) +AS $$MODULE_PATHNAME$$, $$citus_internal_global_blocked_processes$$; +COMMENT ON FUNCTION pg_catalog.citus_internal_global_blocked_processes() IS 'returns a global list of blocked backends originating from this node'; diff --git a/src/backend/distributed/sql/udfs/dump_local_wait_edges/11.0-1.sql b/src/backend/distributed/sql/udfs/citus_internal_local_blocked_processes/11.0-1.sql similarity index 67% rename from src/backend/distributed/sql/udfs/dump_local_wait_edges/11.0-1.sql rename to src/backend/distributed/sql/udfs/citus_internal_local_blocked_processes/11.0-1.sql index 190e4c0d5..3157a9aad 100644 --- a/src/backend/distributed/sql/udfs/dump_local_wait_edges/11.0-1.sql +++ b/src/backend/distributed/sql/udfs/citus_internal_local_blocked_processes/11.0-1.sql @@ -1,7 +1,5 @@ -DROP FUNCTION pg_catalog.dump_local_wait_edges CASCADE; -CREATE OR REPLACE FUNCTION pg_catalog.dump_local_wait_edges( - distributed_tx_only boolean DEFAULT true, - OUT waiting_global_pid int8, +CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_local_blocked_processes( + OUT waiting_global_pid int8, OUT waiting_pid int4, OUT waiting_node_id int4, OUT waiting_transaction_num int8, @@ -14,6 +12,6 @@ CREATE OR REPLACE FUNCTION pg_catalog.dump_local_wait_edges( OUT blocking_transaction_waiting bool) RETURNS SETOF RECORD LANGUAGE C STRICT -AS $$MODULE_PATHNAME$$, $$dump_local_wait_edges$$; -COMMENT ON FUNCTION pg_catalog.dump_local_wait_edges(bool) +AS $$MODULE_PATHNAME$$, $$citus_internal_local_blocked_processes$$; +COMMENT ON FUNCTION pg_catalog.citus_internal_local_blocked_processes() IS 'returns all local lock wait chains, that start from any citus backend'; diff --git a/src/backend/distributed/sql/udfs/dump_local_wait_edges/latest.sql b/src/backend/distributed/sql/udfs/citus_internal_local_blocked_processes/latest.sql similarity index 67% rename from src/backend/distributed/sql/udfs/dump_local_wait_edges/latest.sql rename to src/backend/distributed/sql/udfs/citus_internal_local_blocked_processes/latest.sql index 190e4c0d5..3157a9aad 100644 --- a/src/backend/distributed/sql/udfs/dump_local_wait_edges/latest.sql +++ b/src/backend/distributed/sql/udfs/citus_internal_local_blocked_processes/latest.sql @@ -1,7 +1,5 @@ -DROP FUNCTION pg_catalog.dump_local_wait_edges CASCADE; -CREATE OR REPLACE FUNCTION pg_catalog.dump_local_wait_edges( - distributed_tx_only boolean DEFAULT true, - OUT waiting_global_pid int8, +CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_local_blocked_processes( + OUT waiting_global_pid int8, OUT waiting_pid int4, OUT waiting_node_id int4, OUT waiting_transaction_num int8, @@ -14,6 +12,6 @@ CREATE OR REPLACE FUNCTION pg_catalog.dump_local_wait_edges( OUT blocking_transaction_waiting bool) RETURNS SETOF RECORD LANGUAGE C STRICT -AS $$MODULE_PATHNAME$$, $$dump_local_wait_edges$$; -COMMENT ON FUNCTION pg_catalog.dump_local_wait_edges(bool) +AS $$MODULE_PATHNAME$$, $$citus_internal_local_blocked_processes$$; +COMMENT ON FUNCTION pg_catalog.citus_internal_local_blocked_processes() IS 'returns all local lock wait chains, that start from any citus backend'; diff --git a/src/backend/distributed/sql/udfs/citus_isolation_test_session_is_blocked/11.0-1.sql b/src/backend/distributed/sql/udfs/citus_isolation_test_session_is_blocked/11.0-1.sql new file mode 100644 index 000000000..64b89ec0e --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_isolation_test_session_is_blocked/11.0-1.sql @@ -0,0 +1,56 @@ +CREATE OR REPLACE FUNCTION pg_catalog.citus_isolation_test_session_is_blocked(pBlockedPid integer, pInterestingPids integer[]) +RETURNS boolean AS $$ + DECLARE + mBlockedGlobalPid int8; + workerProcessId integer := current_setting('citus.isolation_test_session_remote_process_id'); + coordinatorProcessId integer := current_setting('citus.isolation_test_session_process_id'); + BEGIN + IF pg_catalog.old_pg_isolation_test_session_is_blocked(pBlockedPid, pInterestingPids) THEN + RETURN true; + END IF; + + -- pg says we're not blocked locally; check whether we're blocked globally. + -- Note that worker process may be blocked or waiting for a lock. So we need to + -- get transaction number for both of them. Following IF provides the transaction + -- number when the worker process waiting for other session. + IF EXISTS (SELECT 1 FROM get_global_active_transactions() + WHERE process_id = workerProcessId AND pBlockedPid = coordinatorProcessId) THEN + SELECT global_pid INTO mBlockedGlobalPid FROM get_global_active_transactions() + WHERE process_id = workerProcessId AND pBlockedPid = coordinatorProcessId; + ELSE + -- Check whether transactions initiated from the coordinator get locked + SELECT global_pid INTO mBlockedGlobalPid + FROM get_all_active_transactions() WHERE process_id = pBlockedPid; + END IF; + + RETURN EXISTS ( + SELECT 1 FROM citus_internal_global_blocked_processes() + WHERE waiting_global_pid = mBlockedGlobalPid + ) OR EXISTS ( + -- Check on the workers if any logical replication job spawned by the + -- current PID is blocked, by checking it's application name + -- Query is heavily based on: https://wiki.postgresql.org/wiki/Lock_Monitoring + SELECT result FROM run_command_on_workers($two$ + SELECT blocked_activity.application_name AS blocked_application + FROM pg_catalog.pg_locks blocked_locks + JOIN pg_catalog.pg_stat_activity blocked_activity ON blocked_activity.pid = blocked_locks.pid + JOIN pg_catalog.pg_locks blocking_locks + ON blocking_locks.locktype = blocked_locks.locktype + AND blocking_locks.DATABASE IS NOT DISTINCT FROM blocked_locks.DATABASE + AND blocking_locks.relation IS NOT DISTINCT FROM blocked_locks.relation + AND blocking_locks.page IS NOT DISTINCT FROM blocked_locks.page + AND blocking_locks.tuple IS NOT DISTINCT FROM blocked_locks.tuple + AND blocking_locks.virtualxid IS NOT DISTINCT FROM blocked_locks.virtualxid + AND blocking_locks.transactionid IS NOT DISTINCT FROM blocked_locks.transactionid + AND blocking_locks.classid IS NOT DISTINCT FROM blocked_locks.classid + AND blocking_locks.objid IS NOT DISTINCT FROM blocked_locks.objid + AND blocking_locks.objsubid IS NOT DISTINCT FROM blocked_locks.objsubid + AND blocking_locks.pid != blocked_locks.pid + JOIN pg_catalog.pg_stat_activity blocking_activity ON blocking_activity.pid = blocking_locks.pid + WHERE NOT blocked_locks.GRANTED AND blocked_activity.application_name LIKE 'citus_shard_move_subscription_%' + $two$) where result='citus_shard_move_subscription_' || pBlockedPid); + + END; +$$ LANGUAGE plpgsql; + +REVOKE ALL ON FUNCTION citus_isolation_test_session_is_blocked(integer,integer[]) FROM PUBLIC; diff --git a/src/backend/distributed/sql/udfs/citus_isolation_test_session_is_blocked/latest.sql b/src/backend/distributed/sql/udfs/citus_isolation_test_session_is_blocked/latest.sql index 0b91cc37c..64b89ec0e 100644 --- a/src/backend/distributed/sql/udfs/citus_isolation_test_session_is_blocked/latest.sql +++ b/src/backend/distributed/sql/udfs/citus_isolation_test_session_is_blocked/latest.sql @@ -1,7 +1,7 @@ CREATE OR REPLACE FUNCTION pg_catalog.citus_isolation_test_session_is_blocked(pBlockedPid integer, pInterestingPids integer[]) RETURNS boolean AS $$ DECLARE - mBlockedTransactionNum int8; + mBlockedGlobalPid int8; workerProcessId integer := current_setting('citus.isolation_test_session_remote_process_id'); coordinatorProcessId integer := current_setting('citus.isolation_test_session_process_id'); BEGIN @@ -13,19 +13,19 @@ RETURNS boolean AS $$ -- Note that worker process may be blocked or waiting for a lock. So we need to -- get transaction number for both of them. Following IF provides the transaction -- number when the worker process waiting for other session. - IF EXISTS (SELECT transaction_number FROM get_global_active_transactions() + IF EXISTS (SELECT 1 FROM get_global_active_transactions() WHERE process_id = workerProcessId AND pBlockedPid = coordinatorProcessId) THEN - SELECT transaction_number INTO mBlockedTransactionNum FROM get_global_active_transactions() + SELECT global_pid INTO mBlockedGlobalPid FROM get_global_active_transactions() WHERE process_id = workerProcessId AND pBlockedPid = coordinatorProcessId; ELSE -- Check whether transactions initiated from the coordinator get locked - SELECT transaction_number INTO mBlockedTransactionNum + SELECT global_pid INTO mBlockedGlobalPid FROM get_all_active_transactions() WHERE process_id = pBlockedPid; END IF; RETURN EXISTS ( - SELECT 1 FROM dump_global_wait_edges() - WHERE waiting_transaction_num = mBlockedTransactionNum + SELECT 1 FROM citus_internal_global_blocked_processes() + WHERE waiting_global_pid = mBlockedGlobalPid ) OR EXISTS ( -- Check on the workers if any logical replication job spawned by the -- current PID is blocked, by checking it's application name diff --git a/src/backend/distributed/sql/udfs/citus_lock_waits/11.0-1.sql b/src/backend/distributed/sql/udfs/citus_lock_waits/11.0-1.sql index 2ae40374a..779341657 100644 --- a/src/backend/distributed/sql/udfs/citus_lock_waits/11.0-1.sql +++ b/src/backend/distributed/sql/udfs/citus_lock_waits/11.0-1.sql @@ -8,7 +8,7 @@ citus_dist_stat_activity AS ), unique_global_wait_edges AS ( - SELECT DISTINCT ON(waiting_node_id, waiting_transaction_num, blocking_node_id, blocking_transaction_num) * FROM dump_global_wait_edges() + SELECT DISTINCT ON(waiting_global_pid, blocking_global_pid) * FROM citus_internal_global_blocked_processes() ), citus_dist_stat_activity_with_node_id AS ( @@ -21,6 +21,8 @@ citus_dist_stat_activity_with_node_id AS citus_dist_stat_activity.distributed_query_host_port = pg_dist_node.nodeport ) SELECT + waiting.global_pid as waiting_gpid, + blocking.global_pid as blocking_gpid, waiting.pid AS waiting_pid, blocking.pid AS blocking_pid, waiting.query AS blocked_statement, @@ -34,9 +36,9 @@ SELECT FROM unique_global_wait_edges JOIN - citus_dist_stat_activity_with_node_id waiting ON (unique_global_wait_edges.waiting_transaction_num = waiting.transaction_number AND unique_global_wait_edges.waiting_node_id = waiting.initiator_node_id) + citus_dist_stat_activity_with_node_id waiting ON (unique_global_wait_edges.waiting_global_pid = waiting.global_pid) JOIN - citus_dist_stat_activity_with_node_id blocking ON (unique_global_wait_edges.blocking_transaction_num = blocking.transaction_number AND unique_global_wait_edges.blocking_node_id = blocking.initiator_node_id); + citus_dist_stat_activity_with_node_id blocking ON (unique_global_wait_edges.blocking_global_pid = blocking.global_pid); ALTER VIEW citus.citus_lock_waits SET SCHEMA pg_catalog; GRANT SELECT ON pg_catalog.citus_lock_waits TO PUBLIC; diff --git a/src/backend/distributed/sql/udfs/citus_lock_waits/latest.sql b/src/backend/distributed/sql/udfs/citus_lock_waits/latest.sql index 2ae40374a..779341657 100644 --- a/src/backend/distributed/sql/udfs/citus_lock_waits/latest.sql +++ b/src/backend/distributed/sql/udfs/citus_lock_waits/latest.sql @@ -8,7 +8,7 @@ citus_dist_stat_activity AS ), unique_global_wait_edges AS ( - SELECT DISTINCT ON(waiting_node_id, waiting_transaction_num, blocking_node_id, blocking_transaction_num) * FROM dump_global_wait_edges() + SELECT DISTINCT ON(waiting_global_pid, blocking_global_pid) * FROM citus_internal_global_blocked_processes() ), citus_dist_stat_activity_with_node_id AS ( @@ -21,6 +21,8 @@ citus_dist_stat_activity_with_node_id AS citus_dist_stat_activity.distributed_query_host_port = pg_dist_node.nodeport ) SELECT + waiting.global_pid as waiting_gpid, + blocking.global_pid as blocking_gpid, waiting.pid AS waiting_pid, blocking.pid AS blocking_pid, waiting.query AS blocked_statement, @@ -34,9 +36,9 @@ SELECT FROM unique_global_wait_edges JOIN - citus_dist_stat_activity_with_node_id waiting ON (unique_global_wait_edges.waiting_transaction_num = waiting.transaction_number AND unique_global_wait_edges.waiting_node_id = waiting.initiator_node_id) + citus_dist_stat_activity_with_node_id waiting ON (unique_global_wait_edges.waiting_global_pid = waiting.global_pid) JOIN - citus_dist_stat_activity_with_node_id blocking ON (unique_global_wait_edges.blocking_transaction_num = blocking.transaction_number AND unique_global_wait_edges.blocking_node_id = blocking.initiator_node_id); + citus_dist_stat_activity_with_node_id blocking ON (unique_global_wait_edges.blocking_global_pid = blocking.global_pid); ALTER VIEW citus.citus_lock_waits SET SCHEMA pg_catalog; GRANT SELECT ON pg_catalog.citus_lock_waits TO PUBLIC; diff --git a/src/backend/distributed/transaction/lock_graph.c b/src/backend/distributed/transaction/lock_graph.c index 18f391d74..62b5e4e04 100644 --- a/src/backend/distributed/transaction/lock_graph.c +++ b/src/backend/distributed/transaction/lock_graph.c @@ -47,6 +47,9 @@ typedef struct PROCStack static void AddWaitEdgeFromResult(WaitGraph *waitGraph, PGresult *result, int rowIndex); static void ReturnWaitGraph(WaitGraph *waitGraph, FunctionCallInfo fcinfo); +static void AddWaitEdgeFromBlockedProcessResult(WaitGraph *waitGraph, PGresult *result, + int rowIndex); +static void ReturnBlockedProcessGraph(WaitGraph *waitGraph, FunctionCallInfo fcinfo); static WaitGraph * BuildLocalWaitGraph(bool onlyDistributedTx); static bool IsProcessWaitingForSafeOperations(PGPROC *proc); static void LockLockData(void); @@ -62,10 +65,30 @@ static void AddProcToVisit(PROCStack *remaining, PGPROC *proc); static bool IsSameLockGroup(PGPROC *leftProc, PGPROC *rightProc); static bool IsConflictingLockMask(int holdMask, int conflictMask); - +/* + * We almost have 2 sets of identical functions. The first set (e.g., dump_wait_edges) + * functions are intended for distributed deadlock detection purposes. + * + * The second set of functions (e.g., citus_internal_local_blocked_processes) are + * intended for citus_lock_waits view. + * + * The main difference is that the former functions only show processes that are blocked + * inside a distributed transaction (e.g., see AssignDistributedTransactionId()). + * The latter functions return a superset, where any blocked process is returned. + * + * We kept two different set of functions for two purposes. First, the deadlock detection + * is a performance critical code-path happening very frequently and we don't add any + * performance overhead. Secondly, to be able to do rolling upgrades, we cannot change + * the API of dump_global_wait_edges/dump_local_wait_edges such that they take a boolean + * parameter. If we do that, until all nodes are upgraded, the deadlock detection would fail, + * which is not acceptable. + */ PG_FUNCTION_INFO_V1(dump_local_wait_edges); PG_FUNCTION_INFO_V1(dump_global_wait_edges); +PG_FUNCTION_INFO_V1(citus_internal_local_blocked_processes); +PG_FUNCTION_INFO_V1(citus_internal_global_blocked_processes); + /* * dump_global_wait_edges returns global wait edges for distributed transactions @@ -74,7 +97,7 @@ PG_FUNCTION_INFO_V1(dump_global_wait_edges); Datum dump_global_wait_edges(PG_FUNCTION_ARGS) { - bool onlyDistributedTx = PG_GETARG_BOOL(0); + bool onlyDistributedTx = true; WaitGraph *waitGraph = BuildGlobalWaitGraph(onlyDistributedTx); @@ -84,6 +107,23 @@ dump_global_wait_edges(PG_FUNCTION_ARGS) } +/* + * citus_internal_global_blocked_processes returns global wait edges + * including all processes running on the cluster. + */ +Datum +citus_internal_global_blocked_processes(PG_FUNCTION_ARGS) +{ + bool onlyDistributedTx = false; + + WaitGraph *waitGraph = BuildGlobalWaitGraph(onlyDistributedTx); + + ReturnBlockedProcessGraph(waitGraph, fcinfo); + + return (Datum) 0; +} + + /* * BuildGlobalWaitGraph builds a wait graph for distributed transactions * that originate from this node, including edges from all (other) worker @@ -103,7 +143,7 @@ BuildGlobalWaitGraph(bool onlyDistributedTx) List *connectionList = NIL; int32 localGroupId = GetLocalGroupId(); - /* deadlock detection is only interested in */ + /* deadlock detection is only interested in distributed transactions */ WaitGraph *waitGraph = BuildLocalWaitGraph(onlyDistributedTx); /* open connections in parallel */ @@ -134,11 +174,25 @@ BuildGlobalWaitGraph(bool onlyDistributedTx) foreach_ptr(connection, connectionList) { StringInfo queryString = makeStringInfo(); - const char *onlyDistributedTxStr = onlyDistributedTx ? "true" : "false"; - appendStringInfo(queryString, - "SELECT * FROM dump_local_wait_edges(%s)", - onlyDistributedTxStr); + if (onlyDistributedTx) + { + appendStringInfo(queryString, + "SELECT waiting_pid, waiting_node_id, " + "waiting_transaction_num, waiting_transaction_stamp, " + "blocking_pid, blocking_node_id, blocking_transaction_num, " + "blocking_transaction_stamp, blocking_transaction_waiting " + "FROM dump_local_wait_edges()"); + } + else + { + appendStringInfo(queryString, + "SELECT waiting_global_pid, waiting_pid, " + "waiting_node_id, waiting_transaction_num, waiting_transaction_stamp, " + "blocking_global_pid,blocking_pid, blocking_node_id, " + "blocking_transaction_num, blocking_transaction_stamp, blocking_transaction_waiting " + "FROM citus_internal_local_blocked_processes()"); + } int querySent = SendRemoteCommand(connection, queryString->data); if (querySent == 0) @@ -162,16 +216,29 @@ BuildGlobalWaitGraph(bool onlyDistributedTx) int64 rowCount = PQntuples(result); int64 colCount = PQnfields(result); - if (colCount != 11) + if (onlyDistributedTx && colCount != 9) { ereport(WARNING, (errmsg("unexpected number of columns from " "dump_local_wait_edges"))); continue; } + else if (!onlyDistributedTx && colCount != 11) + { + ereport(WARNING, (errmsg("unexpected number of columns from " + "citus_internal_local_blocked_processes"))); + continue; + } for (int64 rowIndex = 0; rowIndex < rowCount; rowIndex++) { - AddWaitEdgeFromResult(waitGraph, result, rowIndex); + if (onlyDistributedTx) + { + AddWaitEdgeFromResult(waitGraph, result, rowIndex); + } + else + { + AddWaitEdgeFromBlockedProcessResult(waitGraph, result, rowIndex); + } } PQclear(result); @@ -191,6 +258,29 @@ AddWaitEdgeFromResult(WaitGraph *waitGraph, PGresult *result, int rowIndex) { WaitEdge *waitEdge = AllocWaitEdge(waitGraph); + waitEdge->waitingGPid = 0; /* not requested for deadlock detection */ + waitEdge->waitingPid = ParseIntField(result, rowIndex, 0); + waitEdge->waitingNodeId = ParseIntField(result, rowIndex, 1); + waitEdge->waitingTransactionNum = ParseIntField(result, rowIndex, 2); + waitEdge->waitingTransactionStamp = ParseTimestampTzField(result, rowIndex, 3); + waitEdge->blockingGPid = 0; /* not requested for deadlock detection */ + waitEdge->blockingPid = ParseIntField(result, rowIndex, 4); + waitEdge->blockingNodeId = ParseIntField(result, rowIndex, 5); + waitEdge->blockingTransactionNum = ParseIntField(result, rowIndex, 6); + waitEdge->blockingTransactionStamp = ParseTimestampTzField(result, rowIndex, 7); + waitEdge->isBlockingXactWaiting = ParseBoolField(result, rowIndex, 8); +} + + +/* + * AddWaitEdgeFromBlockedProcessResult adds an edge to the wait graph that + * is read from a PGresult. + */ +static void +AddWaitEdgeFromBlockedProcessResult(WaitGraph *waitGraph, PGresult *result, int rowIndex) +{ + WaitEdge *waitEdge = AllocWaitEdge(waitGraph); + waitEdge->waitingGPid = ParseIntField(result, rowIndex, 0); waitEdge->waitingPid = ParseIntField(result, rowIndex, 1); waitEdge->waitingNodeId = ParseIntField(result, rowIndex, 2); @@ -272,7 +362,7 @@ ParseTimestampTzField(PGresult *result, int rowIndex, int colIndex) Datum dump_local_wait_edges(PG_FUNCTION_ARGS) { - bool onlyDistributedTx = PG_GETARG_BOOL(0); + bool onlyDistributedTx = true; WaitGraph *waitGraph = BuildLocalWaitGraph(onlyDistributedTx); ReturnWaitGraph(waitGraph, fcinfo); @@ -281,6 +371,22 @@ dump_local_wait_edges(PG_FUNCTION_ARGS) } +/* + * citus_internal_local_blocked_processes returns global wait edges + * including all processes running on the node. + */ +Datum +citus_internal_local_blocked_processes(PG_FUNCTION_ARGS) +{ + bool onlyDistributedTx = false; + + WaitGraph *waitGraph = BuildLocalWaitGraph(onlyDistributedTx); + ReturnBlockedProcessGraph(waitGraph, fcinfo); + + return (Datum) 0; +} + + /* * ReturnWaitGraph returns a wait graph for a set returning function. */ @@ -290,6 +396,68 @@ ReturnWaitGraph(WaitGraph *waitGraph, FunctionCallInfo fcinfo) TupleDesc tupleDesc; Tuplestorestate *tupleStore = SetupTuplestore(fcinfo, &tupleDesc); + /* + * Columns: + * 00: waiting_pid + * 01: waiting_node_id + * 02: waiting_transaction_num + * 03: waiting_transaction_stamp + * 04: blocking_pid + * 05: blocking__node_id + * 06: blocking_transaction_num + * 07: blocking_transaction_stamp + * 08: blocking_transaction_waiting + */ + for (size_t curEdgeNum = 0; curEdgeNum < waitGraph->edgeCount; curEdgeNum++) + { + Datum values[9]; + bool nulls[9]; + WaitEdge *curEdge = &waitGraph->edges[curEdgeNum]; + + memset(values, 0, sizeof(values)); + memset(nulls, 0, sizeof(nulls)); + + values[0] = Int32GetDatum(curEdge->waitingPid); + values[1] = Int32GetDatum(curEdge->waitingNodeId); + if (curEdge->waitingTransactionNum != 0) + { + values[2] = Int64GetDatum(curEdge->waitingTransactionNum); + values[3] = TimestampTzGetDatum(curEdge->waitingTransactionStamp); + } + else + { + nulls[2] = true; + nulls[3] = true; + } + + values[4] = Int32GetDatum(curEdge->blockingPid); + values[5] = Int32GetDatum(curEdge->blockingNodeId); + if (curEdge->blockingTransactionNum != 0) + { + values[6] = Int64GetDatum(curEdge->blockingTransactionNum); + values[7] = TimestampTzGetDatum(curEdge->blockingTransactionStamp); + } + else + { + nulls[6] = true; + nulls[7] = true; + } + values[8] = BoolGetDatum(curEdge->isBlockingXactWaiting); + + tuplestore_putvalues(tupleStore, tupleDesc, values, nulls); + } +} + + +/* + * ReturnBlockedProcessGraph returns a wait graph for a set returning function. + */ +static void +ReturnBlockedProcessGraph(WaitGraph *waitGraph, FunctionCallInfo fcinfo) +{ + TupleDesc tupleDesc; + Tuplestorestate *tupleStore = SetupTuplestore(fcinfo, &tupleDesc); + /* * Columns: * 00: waiting_global_pid diff --git a/src/test/regress/expected/isolation_get_distributed_wait_queries_mx.out b/src/test/regress/expected/isolation_get_distributed_wait_queries_mx.out index 27e13263d..cae2222ed 100644 --- a/src/test/regress/expected/isolation_get_distributed_wait_queries_mx.out +++ b/src/test/regress/expected/isolation_get_distributed_wait_queries_mx.out @@ -2,13 +2,13 @@ Parsed test spec with 4 sessions starting permutation: s1-begin s1-update-ref-table-from-coordinator s2-start-session-level-connection s2-begin-on-worker s2-update-ref-table s3-select-distributed-waiting-queries s1-commit s2-commit-worker s2-stop-connection step s1-begin: - BEGIN; + BEGIN; step s1-update-ref-table-from-coordinator: - UPDATE ref_table SET value_1 = 15; + UPDATE ref_table SET value_1 = 15; step s2-start-session-level-connection: - SELECT start_session_level_connection_to_node('localhost', 57638); + SELECT start_session_level_connection_to_node('localhost', 57638); start_session_level_connection_to_node --------------------------------------------------------------------- @@ -16,7 +16,7 @@ start_session_level_connection_to_node (1 row) step s2-begin-on-worker: - SELECT run_commands_on_session_level_connection_to_node('BEGIN'); + SELECT run_commands_on_session_level_connection_to_node('BEGIN'); run_commands_on_session_level_connection_to_node --------------------------------------------------------------------- @@ -24,20 +24,20 @@ run_commands_on_session_level_connection_to_node (1 row) step s2-update-ref-table: - SELECT run_commands_on_session_level_connection_to_node('UPDATE ref_table SET value_1 = 12 WHERE user_id = 1'); + SELECT run_commands_on_session_level_connection_to_node('UPDATE ref_table SET value_1 = 12 WHERE user_id = 1'); step s3-select-distributed-waiting-queries: - SELECT blocked_statement, current_statement_in_blocking_process, waiting_node_name, blocking_node_name, waiting_node_port, blocking_node_port FROM citus_lock_waits; + SELECT blocked_statement, current_statement_in_blocking_process, waiting_node_name, blocking_node_name, waiting_node_port, blocking_node_port FROM citus_lock_waits WHERE blocked_statement NOT ILIKE '%run_commands_on_session_level_connection_to_node%' AND current_statement_in_blocking_process NOT ILIKE '%run_commands_on_session_level_connection_to_node%'; blocked_statement |current_statement_in_blocking_process|waiting_node_name|blocking_node_name|waiting_node_port|blocking_node_port --------------------------------------------------------------------- UPDATE ref_table SET value_1 = 12 WHERE user_id = 1| - UPDATE ref_table SET value_1 = 15; + UPDATE ref_table SET value_1 = 15; |localhost |coordinator_host | 57638| 57636 (1 row) step s1-commit: - COMMIT; + COMMIT; step s2-update-ref-table: <... completed> run_commands_on_session_level_connection_to_node @@ -54,7 +54,7 @@ run_commands_on_session_level_connection_to_node (1 row) step s2-stop-connection: - SELECT stop_session_level_connection_to_node(); + SELECT stop_session_level_connection_to_node(); stop_session_level_connection_to_node --------------------------------------------------------------------- @@ -69,7 +69,7 @@ restore_isolation_tester_func starting permutation: s1-start-session-level-connection s1-begin-on-worker s1-update-ref-table s2-start-session-level-connection s2-begin-on-worker s2-update-ref-table s3-select-distributed-waiting-queries s1-commit-worker s2-commit-worker s1-stop-connection s2-stop-connection step s1-start-session-level-connection: - SELECT start_session_level_connection_to_node('localhost', 57637); + SELECT start_session_level_connection_to_node('localhost', 57637); start_session_level_connection_to_node --------------------------------------------------------------------- @@ -77,7 +77,7 @@ start_session_level_connection_to_node (1 row) step s1-begin-on-worker: - SELECT run_commands_on_session_level_connection_to_node('BEGIN'); + SELECT run_commands_on_session_level_connection_to_node('BEGIN'); run_commands_on_session_level_connection_to_node --------------------------------------------------------------------- @@ -85,7 +85,7 @@ run_commands_on_session_level_connection_to_node (1 row) step s1-update-ref-table: - SELECT run_commands_on_session_level_connection_to_node('UPDATE ref_table SET value_1 = 12 WHERE user_id = 1'); + SELECT run_commands_on_session_level_connection_to_node('UPDATE ref_table SET value_1 = 12 WHERE user_id = 1'); run_commands_on_session_level_connection_to_node --------------------------------------------------------------------- @@ -93,7 +93,7 @@ run_commands_on_session_level_connection_to_node (1 row) step s2-start-session-level-connection: - SELECT start_session_level_connection_to_node('localhost', 57638); + SELECT start_session_level_connection_to_node('localhost', 57638); start_session_level_connection_to_node --------------------------------------------------------------------- @@ -101,7 +101,7 @@ start_session_level_connection_to_node (1 row) step s2-begin-on-worker: - SELECT run_commands_on_session_level_connection_to_node('BEGIN'); + SELECT run_commands_on_session_level_connection_to_node('BEGIN'); run_commands_on_session_level_connection_to_node --------------------------------------------------------------------- @@ -109,10 +109,10 @@ run_commands_on_session_level_connection_to_node (1 row) step s2-update-ref-table: - SELECT run_commands_on_session_level_connection_to_node('UPDATE ref_table SET value_1 = 12 WHERE user_id = 1'); + SELECT run_commands_on_session_level_connection_to_node('UPDATE ref_table SET value_1 = 12 WHERE user_id = 1'); step s3-select-distributed-waiting-queries: - SELECT blocked_statement, current_statement_in_blocking_process, waiting_node_name, blocking_node_name, waiting_node_port, blocking_node_port FROM citus_lock_waits; + SELECT blocked_statement, current_statement_in_blocking_process, waiting_node_name, blocking_node_name, waiting_node_port, blocking_node_port FROM citus_lock_waits WHERE blocked_statement NOT ILIKE '%run_commands_on_session_level_connection_to_node%' AND current_statement_in_blocking_process NOT ILIKE '%run_commands_on_session_level_connection_to_node%'; blocked_statement |current_statement_in_blocking_process |waiting_node_name|blocking_node_name|waiting_node_port|blocking_node_port --------------------------------------------------------------------- @@ -142,7 +142,7 @@ run_commands_on_session_level_connection_to_node (1 row) step s1-stop-connection: - SELECT stop_session_level_connection_to_node(); + SELECT stop_session_level_connection_to_node(); stop_session_level_connection_to_node --------------------------------------------------------------------- @@ -150,7 +150,7 @@ stop_session_level_connection_to_node (1 row) step s2-stop-connection: - SELECT stop_session_level_connection_to_node(); + SELECT stop_session_level_connection_to_node(); stop_session_level_connection_to_node --------------------------------------------------------------------- @@ -165,7 +165,7 @@ restore_isolation_tester_func starting permutation: s1-start-session-level-connection s1-begin-on-worker s1-update-dist-table s2-start-session-level-connection s2-begin-on-worker s2-update-dist-table s3-select-distributed-waiting-queries s1-commit-worker s2-commit-worker s1-stop-connection s2-stop-connection step s1-start-session-level-connection: - SELECT start_session_level_connection_to_node('localhost', 57637); + SELECT start_session_level_connection_to_node('localhost', 57637); start_session_level_connection_to_node --------------------------------------------------------------------- @@ -173,7 +173,7 @@ start_session_level_connection_to_node (1 row) step s1-begin-on-worker: - SELECT run_commands_on_session_level_connection_to_node('BEGIN'); + SELECT run_commands_on_session_level_connection_to_node('BEGIN'); run_commands_on_session_level_connection_to_node --------------------------------------------------------------------- @@ -181,7 +181,7 @@ run_commands_on_session_level_connection_to_node (1 row) step s1-update-dist-table: - SELECT run_commands_on_session_level_connection_to_node('UPDATE tt1 SET value_1 = 4'); + SELECT run_commands_on_session_level_connection_to_node('UPDATE tt1 SET value_1 = 4'); run_commands_on_session_level_connection_to_node --------------------------------------------------------------------- @@ -189,7 +189,7 @@ run_commands_on_session_level_connection_to_node (1 row) step s2-start-session-level-connection: - SELECT start_session_level_connection_to_node('localhost', 57638); + SELECT start_session_level_connection_to_node('localhost', 57638); start_session_level_connection_to_node --------------------------------------------------------------------- @@ -197,7 +197,7 @@ start_session_level_connection_to_node (1 row) step s2-begin-on-worker: - SELECT run_commands_on_session_level_connection_to_node('BEGIN'); + SELECT run_commands_on_session_level_connection_to_node('BEGIN'); run_commands_on_session_level_connection_to_node --------------------------------------------------------------------- @@ -205,10 +205,10 @@ run_commands_on_session_level_connection_to_node (1 row) step s2-update-dist-table: - SELECT run_commands_on_session_level_connection_to_node('UPDATE tt1 SET value_1 = 5'); + SELECT run_commands_on_session_level_connection_to_node('UPDATE tt1 SET value_1 = 5'); step s3-select-distributed-waiting-queries: - SELECT blocked_statement, current_statement_in_blocking_process, waiting_node_name, blocking_node_name, waiting_node_port, blocking_node_port FROM citus_lock_waits; + SELECT blocked_statement, current_statement_in_blocking_process, waiting_node_name, blocking_node_name, waiting_node_port, blocking_node_port FROM citus_lock_waits WHERE blocked_statement NOT ILIKE '%run_commands_on_session_level_connection_to_node%' AND current_statement_in_blocking_process NOT ILIKE '%run_commands_on_session_level_connection_to_node%'; blocked_statement |current_statement_in_blocking_process|waiting_node_name|blocking_node_name|waiting_node_port|blocking_node_port --------------------------------------------------------------------- @@ -238,7 +238,7 @@ run_commands_on_session_level_connection_to_node (1 row) step s1-stop-connection: - SELECT stop_session_level_connection_to_node(); + SELECT stop_session_level_connection_to_node(); stop_session_level_connection_to_node --------------------------------------------------------------------- @@ -246,7 +246,7 @@ stop_session_level_connection_to_node (1 row) step s2-stop-connection: - SELECT stop_session_level_connection_to_node(); + SELECT stop_session_level_connection_to_node(); stop_session_level_connection_to_node --------------------------------------------------------------------- @@ -261,7 +261,7 @@ restore_isolation_tester_func starting permutation: s1-start-session-level-connection s1-begin-on-worker s1-delete-from-ref-table s2-start-session-level-connection s2-begin-on-worker s2-update-ref-table s3-select-distributed-waiting-queries s1-commit-worker s2-commit-worker s1-stop-connection s2-stop-connection step s1-start-session-level-connection: - SELECT start_session_level_connection_to_node('localhost', 57637); + SELECT start_session_level_connection_to_node('localhost', 57637); start_session_level_connection_to_node --------------------------------------------------------------------- @@ -269,7 +269,7 @@ start_session_level_connection_to_node (1 row) step s1-begin-on-worker: - SELECT run_commands_on_session_level_connection_to_node('BEGIN'); + SELECT run_commands_on_session_level_connection_to_node('BEGIN'); run_commands_on_session_level_connection_to_node --------------------------------------------------------------------- @@ -277,7 +277,7 @@ run_commands_on_session_level_connection_to_node (1 row) step s1-delete-from-ref-table: - SELECT run_commands_on_session_level_connection_to_node('DELETE FROM ref_table WHERE user_id = 1'); + SELECT run_commands_on_session_level_connection_to_node('DELETE FROM ref_table WHERE user_id = 1'); run_commands_on_session_level_connection_to_node --------------------------------------------------------------------- @@ -285,7 +285,7 @@ run_commands_on_session_level_connection_to_node (1 row) step s2-start-session-level-connection: - SELECT start_session_level_connection_to_node('localhost', 57638); + SELECT start_session_level_connection_to_node('localhost', 57638); start_session_level_connection_to_node --------------------------------------------------------------------- @@ -293,7 +293,7 @@ start_session_level_connection_to_node (1 row) step s2-begin-on-worker: - SELECT run_commands_on_session_level_connection_to_node('BEGIN'); + SELECT run_commands_on_session_level_connection_to_node('BEGIN'); run_commands_on_session_level_connection_to_node --------------------------------------------------------------------- @@ -301,10 +301,10 @@ run_commands_on_session_level_connection_to_node (1 row) step s2-update-ref-table: - SELECT run_commands_on_session_level_connection_to_node('UPDATE ref_table SET value_1 = 12 WHERE user_id = 1'); + SELECT run_commands_on_session_level_connection_to_node('UPDATE ref_table SET value_1 = 12 WHERE user_id = 1'); step s3-select-distributed-waiting-queries: - SELECT blocked_statement, current_statement_in_blocking_process, waiting_node_name, blocking_node_name, waiting_node_port, blocking_node_port FROM citus_lock_waits; + SELECT blocked_statement, current_statement_in_blocking_process, waiting_node_name, blocking_node_name, waiting_node_port, blocking_node_port FROM citus_lock_waits WHERE blocked_statement NOT ILIKE '%run_commands_on_session_level_connection_to_node%' AND current_statement_in_blocking_process NOT ILIKE '%run_commands_on_session_level_connection_to_node%'; blocked_statement |current_statement_in_blocking_process |waiting_node_name|blocking_node_name|waiting_node_port|blocking_node_port --------------------------------------------------------------------- @@ -334,7 +334,7 @@ run_commands_on_session_level_connection_to_node (1 row) step s1-stop-connection: - SELECT stop_session_level_connection_to_node(); + SELECT stop_session_level_connection_to_node(); stop_session_level_connection_to_node --------------------------------------------------------------------- @@ -342,7 +342,7 @@ stop_session_level_connection_to_node (1 row) step s2-stop-connection: - SELECT stop_session_level_connection_to_node(); + SELECT stop_session_level_connection_to_node(); stop_session_level_connection_to_node --------------------------------------------------------------------- @@ -357,7 +357,7 @@ restore_isolation_tester_func starting permutation: s1-start-session-level-connection s1-begin-on-worker s1-insert-into-ref-table s2-start-session-level-connection s2-begin-on-worker s2-update-ref-table s3-select-distributed-waiting-queries s1-commit-worker s2-commit-worker s1-stop-connection s2-stop-connection step s1-start-session-level-connection: - SELECT start_session_level_connection_to_node('localhost', 57637); + SELECT start_session_level_connection_to_node('localhost', 57637); start_session_level_connection_to_node --------------------------------------------------------------------- @@ -365,7 +365,7 @@ start_session_level_connection_to_node (1 row) step s1-begin-on-worker: - SELECT run_commands_on_session_level_connection_to_node('BEGIN'); + SELECT run_commands_on_session_level_connection_to_node('BEGIN'); run_commands_on_session_level_connection_to_node --------------------------------------------------------------------- @@ -373,7 +373,7 @@ run_commands_on_session_level_connection_to_node (1 row) step s1-insert-into-ref-table: - SELECT run_commands_on_session_level_connection_to_node('INSERT INTO ref_table VALUES(8,81),(9,91)'); + SELECT run_commands_on_session_level_connection_to_node('INSERT INTO ref_table VALUES(8,81),(9,91)'); run_commands_on_session_level_connection_to_node --------------------------------------------------------------------- @@ -381,7 +381,7 @@ run_commands_on_session_level_connection_to_node (1 row) step s2-start-session-level-connection: - SELECT start_session_level_connection_to_node('localhost', 57638); + SELECT start_session_level_connection_to_node('localhost', 57638); start_session_level_connection_to_node --------------------------------------------------------------------- @@ -389,7 +389,7 @@ start_session_level_connection_to_node (1 row) step s2-begin-on-worker: - SELECT run_commands_on_session_level_connection_to_node('BEGIN'); + SELECT run_commands_on_session_level_connection_to_node('BEGIN'); run_commands_on_session_level_connection_to_node --------------------------------------------------------------------- @@ -397,10 +397,10 @@ run_commands_on_session_level_connection_to_node (1 row) step s2-update-ref-table: - SELECT run_commands_on_session_level_connection_to_node('UPDATE ref_table SET value_1 = 12 WHERE user_id = 1'); + SELECT run_commands_on_session_level_connection_to_node('UPDATE ref_table SET value_1 = 12 WHERE user_id = 1'); step s3-select-distributed-waiting-queries: - SELECT blocked_statement, current_statement_in_blocking_process, waiting_node_name, blocking_node_name, waiting_node_port, blocking_node_port FROM citus_lock_waits; + SELECT blocked_statement, current_statement_in_blocking_process, waiting_node_name, blocking_node_name, waiting_node_port, blocking_node_port FROM citus_lock_waits WHERE blocked_statement NOT ILIKE '%run_commands_on_session_level_connection_to_node%' AND current_statement_in_blocking_process NOT ILIKE '%run_commands_on_session_level_connection_to_node%'; blocked_statement |current_statement_in_blocking_process |waiting_node_name|blocking_node_name|waiting_node_port|blocking_node_port --------------------------------------------------------------------- @@ -430,7 +430,7 @@ run_commands_on_session_level_connection_to_node (1 row) step s1-stop-connection: - SELECT stop_session_level_connection_to_node(); + SELECT stop_session_level_connection_to_node(); stop_session_level_connection_to_node --------------------------------------------------------------------- @@ -438,7 +438,7 @@ stop_session_level_connection_to_node (1 row) step s2-stop-connection: - SELECT stop_session_level_connection_to_node(); + SELECT stop_session_level_connection_to_node(); stop_session_level_connection_to_node --------------------------------------------------------------------- @@ -453,7 +453,7 @@ restore_isolation_tester_func starting permutation: s1-start-session-level-connection s1-begin-on-worker s1-insert-into-ref-table s2-start-session-level-connection s2-begin-on-worker s2-insert-into-ref-table s3-select-distributed-waiting-queries s1-commit-worker s2-commit-worker s1-stop-connection s2-stop-connection step s1-start-session-level-connection: - SELECT start_session_level_connection_to_node('localhost', 57637); + SELECT start_session_level_connection_to_node('localhost', 57637); start_session_level_connection_to_node --------------------------------------------------------------------- @@ -461,7 +461,7 @@ start_session_level_connection_to_node (1 row) step s1-begin-on-worker: - SELECT run_commands_on_session_level_connection_to_node('BEGIN'); + SELECT run_commands_on_session_level_connection_to_node('BEGIN'); run_commands_on_session_level_connection_to_node --------------------------------------------------------------------- @@ -469,7 +469,7 @@ run_commands_on_session_level_connection_to_node (1 row) step s1-insert-into-ref-table: - SELECT run_commands_on_session_level_connection_to_node('INSERT INTO ref_table VALUES(8,81),(9,91)'); + SELECT run_commands_on_session_level_connection_to_node('INSERT INTO ref_table VALUES(8,81),(9,91)'); run_commands_on_session_level_connection_to_node --------------------------------------------------------------------- @@ -477,7 +477,7 @@ run_commands_on_session_level_connection_to_node (1 row) step s2-start-session-level-connection: - SELECT start_session_level_connection_to_node('localhost', 57638); + SELECT start_session_level_connection_to_node('localhost', 57638); start_session_level_connection_to_node --------------------------------------------------------------------- @@ -485,7 +485,7 @@ start_session_level_connection_to_node (1 row) step s2-begin-on-worker: - SELECT run_commands_on_session_level_connection_to_node('BEGIN'); + SELECT run_commands_on_session_level_connection_to_node('BEGIN'); run_commands_on_session_level_connection_to_node --------------------------------------------------------------------- @@ -493,7 +493,7 @@ run_commands_on_session_level_connection_to_node (1 row) step s2-insert-into-ref-table: - SELECT run_commands_on_session_level_connection_to_node('INSERT INTO ref_table VALUES(8,81),(9,91)'); + SELECT run_commands_on_session_level_connection_to_node('INSERT INTO ref_table VALUES(8,81),(9,91)'); run_commands_on_session_level_connection_to_node --------------------------------------------------------------------- @@ -501,7 +501,7 @@ run_commands_on_session_level_connection_to_node (1 row) step s3-select-distributed-waiting-queries: - SELECT blocked_statement, current_statement_in_blocking_process, waiting_node_name, blocking_node_name, waiting_node_port, blocking_node_port FROM citus_lock_waits; + SELECT blocked_statement, current_statement_in_blocking_process, waiting_node_name, blocking_node_name, waiting_node_port, blocking_node_port FROM citus_lock_waits WHERE blocked_statement NOT ILIKE '%run_commands_on_session_level_connection_to_node%' AND current_statement_in_blocking_process NOT ILIKE '%run_commands_on_session_level_connection_to_node%'; blocked_statement|current_statement_in_blocking_process|waiting_node_name|blocking_node_name|waiting_node_port|blocking_node_port --------------------------------------------------------------------- @@ -524,7 +524,7 @@ run_commands_on_session_level_connection_to_node (1 row) step s1-stop-connection: - SELECT stop_session_level_connection_to_node(); + SELECT stop_session_level_connection_to_node(); stop_session_level_connection_to_node --------------------------------------------------------------------- @@ -532,7 +532,7 @@ stop_session_level_connection_to_node (1 row) step s2-stop-connection: - SELECT stop_session_level_connection_to_node(); + SELECT stop_session_level_connection_to_node(); stop_session_level_connection_to_node --------------------------------------------------------------------- @@ -547,7 +547,7 @@ restore_isolation_tester_func starting permutation: s1-start-session-level-connection s1-begin-on-worker s1-copy-to-ref-table s2-start-session-level-connection s2-begin-on-worker s2-update-ref-table s3-select-distributed-waiting-queries s1-commit-worker s2-commit-worker s1-stop-connection s2-stop-connection step s1-start-session-level-connection: - SELECT start_session_level_connection_to_node('localhost', 57637); + SELECT start_session_level_connection_to_node('localhost', 57637); start_session_level_connection_to_node --------------------------------------------------------------------- @@ -555,7 +555,7 @@ start_session_level_connection_to_node (1 row) step s1-begin-on-worker: - SELECT run_commands_on_session_level_connection_to_node('BEGIN'); + SELECT run_commands_on_session_level_connection_to_node('BEGIN'); run_commands_on_session_level_connection_to_node --------------------------------------------------------------------- @@ -563,7 +563,7 @@ run_commands_on_session_level_connection_to_node (1 row) step s1-copy-to-ref-table: - SELECT run_commands_on_session_level_connection_to_node('COPY ref_table FROM PROGRAM ''echo 10, 101 && echo 11, 111'' WITH CSV'); + SELECT run_commands_on_session_level_connection_to_node('COPY ref_table FROM PROGRAM ''echo 10, 101 && echo 11, 111'' WITH CSV'); run_commands_on_session_level_connection_to_node --------------------------------------------------------------------- @@ -571,7 +571,7 @@ run_commands_on_session_level_connection_to_node (1 row) step s2-start-session-level-connection: - SELECT start_session_level_connection_to_node('localhost', 57638); + SELECT start_session_level_connection_to_node('localhost', 57638); start_session_level_connection_to_node --------------------------------------------------------------------- @@ -579,7 +579,7 @@ start_session_level_connection_to_node (1 row) step s2-begin-on-worker: - SELECT run_commands_on_session_level_connection_to_node('BEGIN'); + SELECT run_commands_on_session_level_connection_to_node('BEGIN'); run_commands_on_session_level_connection_to_node --------------------------------------------------------------------- @@ -587,10 +587,10 @@ run_commands_on_session_level_connection_to_node (1 row) step s2-update-ref-table: - SELECT run_commands_on_session_level_connection_to_node('UPDATE ref_table SET value_1 = 12 WHERE user_id = 1'); + SELECT run_commands_on_session_level_connection_to_node('UPDATE ref_table SET value_1 = 12 WHERE user_id = 1'); step s3-select-distributed-waiting-queries: - SELECT blocked_statement, current_statement_in_blocking_process, waiting_node_name, blocking_node_name, waiting_node_port, blocking_node_port FROM citus_lock_waits; + SELECT blocked_statement, current_statement_in_blocking_process, waiting_node_name, blocking_node_name, waiting_node_port, blocking_node_port FROM citus_lock_waits WHERE blocked_statement NOT ILIKE '%run_commands_on_session_level_connection_to_node%' AND current_statement_in_blocking_process NOT ILIKE '%run_commands_on_session_level_connection_to_node%'; blocked_statement |current_statement_in_blocking_process |waiting_node_name|blocking_node_name|waiting_node_port|blocking_node_port --------------------------------------------------------------------- @@ -620,7 +620,7 @@ run_commands_on_session_level_connection_to_node (1 row) step s1-stop-connection: - SELECT stop_session_level_connection_to_node(); + SELECT stop_session_level_connection_to_node(); stop_session_level_connection_to_node --------------------------------------------------------------------- @@ -628,7 +628,7 @@ stop_session_level_connection_to_node (1 row) step s2-stop-connection: - SELECT stop_session_level_connection_to_node(); + SELECT stop_session_level_connection_to_node(); stop_session_level_connection_to_node --------------------------------------------------------------------- @@ -643,7 +643,7 @@ restore_isolation_tester_func starting permutation: s1-start-session-level-connection s1-begin-on-worker s1-copy-to-ref-table s2-start-session-level-connection s2-begin-on-worker s2-insert-into-ref-table s3-select-distributed-waiting-queries s1-commit-worker s2-commit-worker s1-stop-connection s2-stop-connection step s1-start-session-level-connection: - SELECT start_session_level_connection_to_node('localhost', 57637); + SELECT start_session_level_connection_to_node('localhost', 57637); start_session_level_connection_to_node --------------------------------------------------------------------- @@ -651,7 +651,7 @@ start_session_level_connection_to_node (1 row) step s1-begin-on-worker: - SELECT run_commands_on_session_level_connection_to_node('BEGIN'); + SELECT run_commands_on_session_level_connection_to_node('BEGIN'); run_commands_on_session_level_connection_to_node --------------------------------------------------------------------- @@ -659,7 +659,7 @@ run_commands_on_session_level_connection_to_node (1 row) step s1-copy-to-ref-table: - SELECT run_commands_on_session_level_connection_to_node('COPY ref_table FROM PROGRAM ''echo 10, 101 && echo 11, 111'' WITH CSV'); + SELECT run_commands_on_session_level_connection_to_node('COPY ref_table FROM PROGRAM ''echo 10, 101 && echo 11, 111'' WITH CSV'); run_commands_on_session_level_connection_to_node --------------------------------------------------------------------- @@ -667,7 +667,7 @@ run_commands_on_session_level_connection_to_node (1 row) step s2-start-session-level-connection: - SELECT start_session_level_connection_to_node('localhost', 57638); + SELECT start_session_level_connection_to_node('localhost', 57638); start_session_level_connection_to_node --------------------------------------------------------------------- @@ -675,7 +675,7 @@ start_session_level_connection_to_node (1 row) step s2-begin-on-worker: - SELECT run_commands_on_session_level_connection_to_node('BEGIN'); + SELECT run_commands_on_session_level_connection_to_node('BEGIN'); run_commands_on_session_level_connection_to_node --------------------------------------------------------------------- @@ -683,7 +683,7 @@ run_commands_on_session_level_connection_to_node (1 row) step s2-insert-into-ref-table: - SELECT run_commands_on_session_level_connection_to_node('INSERT INTO ref_table VALUES(8,81),(9,91)'); + SELECT run_commands_on_session_level_connection_to_node('INSERT INTO ref_table VALUES(8,81),(9,91)'); run_commands_on_session_level_connection_to_node --------------------------------------------------------------------- @@ -691,7 +691,7 @@ run_commands_on_session_level_connection_to_node (1 row) step s3-select-distributed-waiting-queries: - SELECT blocked_statement, current_statement_in_blocking_process, waiting_node_name, blocking_node_name, waiting_node_port, blocking_node_port FROM citus_lock_waits; + SELECT blocked_statement, current_statement_in_blocking_process, waiting_node_name, blocking_node_name, waiting_node_port, blocking_node_port FROM citus_lock_waits WHERE blocked_statement NOT ILIKE '%run_commands_on_session_level_connection_to_node%' AND current_statement_in_blocking_process NOT ILIKE '%run_commands_on_session_level_connection_to_node%'; blocked_statement|current_statement_in_blocking_process|waiting_node_name|blocking_node_name|waiting_node_port|blocking_node_port --------------------------------------------------------------------- @@ -714,7 +714,7 @@ run_commands_on_session_level_connection_to_node (1 row) step s1-stop-connection: - SELECT stop_session_level_connection_to_node(); + SELECT stop_session_level_connection_to_node(); stop_session_level_connection_to_node --------------------------------------------------------------------- @@ -722,7 +722,7 @@ stop_session_level_connection_to_node (1 row) step s2-stop-connection: - SELECT stop_session_level_connection_to_node(); + SELECT stop_session_level_connection_to_node(); stop_session_level_connection_to_node --------------------------------------------------------------------- @@ -737,7 +737,7 @@ restore_isolation_tester_func starting permutation: s1-start-session-level-connection s1-begin-on-worker s1-copy-to-ref-table s2-start-session-level-connection s2-begin-on-worker s2-copy-to-ref-table s3-select-distributed-waiting-queries s1-commit-worker s2-commit-worker s1-stop-connection s2-stop-connection step s1-start-session-level-connection: - SELECT start_session_level_connection_to_node('localhost', 57637); + SELECT start_session_level_connection_to_node('localhost', 57637); start_session_level_connection_to_node --------------------------------------------------------------------- @@ -745,7 +745,7 @@ start_session_level_connection_to_node (1 row) step s1-begin-on-worker: - SELECT run_commands_on_session_level_connection_to_node('BEGIN'); + SELECT run_commands_on_session_level_connection_to_node('BEGIN'); run_commands_on_session_level_connection_to_node --------------------------------------------------------------------- @@ -753,7 +753,7 @@ run_commands_on_session_level_connection_to_node (1 row) step s1-copy-to-ref-table: - SELECT run_commands_on_session_level_connection_to_node('COPY ref_table FROM PROGRAM ''echo 10, 101 && echo 11, 111'' WITH CSV'); + SELECT run_commands_on_session_level_connection_to_node('COPY ref_table FROM PROGRAM ''echo 10, 101 && echo 11, 111'' WITH CSV'); run_commands_on_session_level_connection_to_node --------------------------------------------------------------------- @@ -761,7 +761,7 @@ run_commands_on_session_level_connection_to_node (1 row) step s2-start-session-level-connection: - SELECT start_session_level_connection_to_node('localhost', 57638); + SELECT start_session_level_connection_to_node('localhost', 57638); start_session_level_connection_to_node --------------------------------------------------------------------- @@ -769,7 +769,7 @@ start_session_level_connection_to_node (1 row) step s2-begin-on-worker: - SELECT run_commands_on_session_level_connection_to_node('BEGIN'); + SELECT run_commands_on_session_level_connection_to_node('BEGIN'); run_commands_on_session_level_connection_to_node --------------------------------------------------------------------- @@ -777,7 +777,7 @@ run_commands_on_session_level_connection_to_node (1 row) step s2-copy-to-ref-table: - SELECT run_commands_on_session_level_connection_to_node('COPY ref_table FROM PROGRAM ''echo 10, 101 && echo 11, 111'' WITH CSV'); + SELECT run_commands_on_session_level_connection_to_node('COPY ref_table FROM PROGRAM ''echo 10, 101 && echo 11, 111'' WITH CSV'); run_commands_on_session_level_connection_to_node --------------------------------------------------------------------- @@ -785,7 +785,7 @@ run_commands_on_session_level_connection_to_node (1 row) step s3-select-distributed-waiting-queries: - SELECT blocked_statement, current_statement_in_blocking_process, waiting_node_name, blocking_node_name, waiting_node_port, blocking_node_port FROM citus_lock_waits; + SELECT blocked_statement, current_statement_in_blocking_process, waiting_node_name, blocking_node_name, waiting_node_port, blocking_node_port FROM citus_lock_waits WHERE blocked_statement NOT ILIKE '%run_commands_on_session_level_connection_to_node%' AND current_statement_in_blocking_process NOT ILIKE '%run_commands_on_session_level_connection_to_node%'; blocked_statement|current_statement_in_blocking_process|waiting_node_name|blocking_node_name|waiting_node_port|blocking_node_port --------------------------------------------------------------------- @@ -808,7 +808,7 @@ run_commands_on_session_level_connection_to_node (1 row) step s1-stop-connection: - SELECT stop_session_level_connection_to_node(); + SELECT stop_session_level_connection_to_node(); stop_session_level_connection_to_node --------------------------------------------------------------------- @@ -816,7 +816,7 @@ stop_session_level_connection_to_node (1 row) step s2-stop-connection: - SELECT stop_session_level_connection_to_node(); + SELECT stop_session_level_connection_to_node(); stop_session_level_connection_to_node --------------------------------------------------------------------- @@ -831,7 +831,7 @@ restore_isolation_tester_func starting permutation: s1-start-session-level-connection s1-begin-on-worker s1-select-for-update s2-start-session-level-connection s2-begin-on-worker s2-update-ref-table s3-select-distributed-waiting-queries s1-commit-worker s2-commit-worker s1-stop-connection s2-stop-connection step s1-start-session-level-connection: - SELECT start_session_level_connection_to_node('localhost', 57637); + SELECT start_session_level_connection_to_node('localhost', 57637); start_session_level_connection_to_node --------------------------------------------------------------------- @@ -839,7 +839,7 @@ start_session_level_connection_to_node (1 row) step s1-begin-on-worker: - SELECT run_commands_on_session_level_connection_to_node('BEGIN'); + SELECT run_commands_on_session_level_connection_to_node('BEGIN'); run_commands_on_session_level_connection_to_node --------------------------------------------------------------------- @@ -847,7 +847,7 @@ run_commands_on_session_level_connection_to_node (1 row) step s1-select-for-update: - SELECT run_commands_on_session_level_connection_to_node('SELECT * FROM ref_table FOR UPDATE'); + SELECT run_commands_on_session_level_connection_to_node('SELECT * FROM ref_table FOR UPDATE'); run_commands_on_session_level_connection_to_node --------------------------------------------------------------------- @@ -855,7 +855,7 @@ run_commands_on_session_level_connection_to_node (1 row) step s2-start-session-level-connection: - SELECT start_session_level_connection_to_node('localhost', 57638); + SELECT start_session_level_connection_to_node('localhost', 57638); start_session_level_connection_to_node --------------------------------------------------------------------- @@ -863,7 +863,7 @@ start_session_level_connection_to_node (1 row) step s2-begin-on-worker: - SELECT run_commands_on_session_level_connection_to_node('BEGIN'); + SELECT run_commands_on_session_level_connection_to_node('BEGIN'); run_commands_on_session_level_connection_to_node --------------------------------------------------------------------- @@ -871,10 +871,10 @@ run_commands_on_session_level_connection_to_node (1 row) step s2-update-ref-table: - SELECT run_commands_on_session_level_connection_to_node('UPDATE ref_table SET value_1 = 12 WHERE user_id = 1'); + SELECT run_commands_on_session_level_connection_to_node('UPDATE ref_table SET value_1 = 12 WHERE user_id = 1'); step s3-select-distributed-waiting-queries: - SELECT blocked_statement, current_statement_in_blocking_process, waiting_node_name, blocking_node_name, waiting_node_port, blocking_node_port FROM citus_lock_waits; + SELECT blocked_statement, current_statement_in_blocking_process, waiting_node_name, blocking_node_name, waiting_node_port, blocking_node_port FROM citus_lock_waits WHERE blocked_statement NOT ILIKE '%run_commands_on_session_level_connection_to_node%' AND current_statement_in_blocking_process NOT ILIKE '%run_commands_on_session_level_connection_to_node%'; blocked_statement |current_statement_in_blocking_process|waiting_node_name|blocking_node_name|waiting_node_port|blocking_node_port --------------------------------------------------------------------- @@ -904,7 +904,7 @@ run_commands_on_session_level_connection_to_node (1 row) step s1-stop-connection: - SELECT stop_session_level_connection_to_node(); + SELECT stop_session_level_connection_to_node(); stop_session_level_connection_to_node --------------------------------------------------------------------- @@ -912,7 +912,7 @@ stop_session_level_connection_to_node (1 row) step s2-stop-connection: - SELECT stop_session_level_connection_to_node(); + SELECT stop_session_level_connection_to_node(); stop_session_level_connection_to_node --------------------------------------------------------------------- @@ -927,7 +927,7 @@ restore_isolation_tester_func starting permutation: s2-start-session-level-connection s2-begin-on-worker s2-insert-into-ref-table s1-begin s1-alter-table s3-select-distributed-waiting-queries s2-commit-worker s1-commit s2-stop-connection step s2-start-session-level-connection: - SELECT start_session_level_connection_to_node('localhost', 57638); + SELECT start_session_level_connection_to_node('localhost', 57638); start_session_level_connection_to_node --------------------------------------------------------------------- @@ -935,7 +935,7 @@ start_session_level_connection_to_node (1 row) step s2-begin-on-worker: - SELECT run_commands_on_session_level_connection_to_node('BEGIN'); + SELECT run_commands_on_session_level_connection_to_node('BEGIN'); run_commands_on_session_level_connection_to_node --------------------------------------------------------------------- @@ -943,7 +943,7 @@ run_commands_on_session_level_connection_to_node (1 row) step s2-insert-into-ref-table: - SELECT run_commands_on_session_level_connection_to_node('INSERT INTO ref_table VALUES(8,81),(9,91)'); + SELECT run_commands_on_session_level_connection_to_node('INSERT INTO ref_table VALUES(8,81),(9,91)'); run_commands_on_session_level_connection_to_node --------------------------------------------------------------------- @@ -951,18 +951,18 @@ run_commands_on_session_level_connection_to_node (1 row) step s1-begin: - BEGIN; + BEGIN; step s1-alter-table: - ALTER TABLE ref_table ADD CONSTRAINT rf_p_key PRIMARY KEY(user_id); + ALTER TABLE ref_table ADD CONSTRAINT rf_p_key PRIMARY KEY(user_id); step s3-select-distributed-waiting-queries: - SELECT blocked_statement, current_statement_in_blocking_process, waiting_node_name, blocking_node_name, waiting_node_port, blocking_node_port FROM citus_lock_waits; + SELECT blocked_statement, current_statement_in_blocking_process, waiting_node_name, blocking_node_name, waiting_node_port, blocking_node_port FROM citus_lock_waits WHERE blocked_statement NOT ILIKE '%run_commands_on_session_level_connection_to_node%' AND current_statement_in_blocking_process NOT ILIKE '%run_commands_on_session_level_connection_to_node%'; blocked_statement |current_statement_in_blocking_process |waiting_node_name|blocking_node_name|waiting_node_port|blocking_node_port --------------------------------------------------------------------- - ALTER TABLE ref_table ADD CONSTRAINT rf_p_key PRIMARY KEY(user_id); + ALTER TABLE ref_table ADD CONSTRAINT rf_p_key PRIMARY KEY(user_id); |INSERT INTO ref_table VALUES(8,81),(9,91)|coordinator_host |localhost | 57636| 57638 (1 row) @@ -976,10 +976,10 @@ run_commands_on_session_level_connection_to_node step s1-alter-table: <... completed> step s1-commit: - COMMIT; + COMMIT; step s2-stop-connection: - SELECT stop_session_level_connection_to_node(); + SELECT stop_session_level_connection_to_node(); stop_session_level_connection_to_node --------------------------------------------------------------------- @@ -994,28 +994,28 @@ restore_isolation_tester_func starting permutation: s1-begin s1-update-on-the-coordinator s2-update-on-the-coordinator s3-select-distributed-waiting-queries s1-commit step s1-begin: - BEGIN; + BEGIN; step s1-update-on-the-coordinator: - UPDATE tt1 SET value_1 = 4; + UPDATE tt1 SET value_1 = 4; step s2-update-on-the-coordinator: - UPDATE tt1 SET value_1 = 4; + UPDATE tt1 SET value_1 = 4; step s3-select-distributed-waiting-queries: - SELECT blocked_statement, current_statement_in_blocking_process, waiting_node_name, blocking_node_name, waiting_node_port, blocking_node_port FROM citus_lock_waits; + SELECT blocked_statement, current_statement_in_blocking_process, waiting_node_name, blocking_node_name, waiting_node_port, blocking_node_port FROM citus_lock_waits WHERE blocked_statement NOT ILIKE '%run_commands_on_session_level_connection_to_node%' AND current_statement_in_blocking_process NOT ILIKE '%run_commands_on_session_level_connection_to_node%'; blocked_statement |current_statement_in_blocking_process|waiting_node_name|blocking_node_name|waiting_node_port|blocking_node_port --------------------------------------------------------------------- - UPDATE tt1 SET value_1 = 4; + UPDATE tt1 SET value_1 = 4; | - UPDATE tt1 SET value_1 = 4; + UPDATE tt1 SET value_1 = 4; |coordinator_host |coordinator_host | 57636| 57636 (1 row) step s1-commit: - COMMIT; + COMMIT; step s2-update-on-the-coordinator: <... completed> restore_isolation_tester_func @@ -1026,7 +1026,7 @@ restore_isolation_tester_func starting permutation: s1-start-session-level-connection s1-begin-on-worker s1-update-dist-table s4-start-session-level-connection s4-begin-on-worker s4-update-dist-table s3-select-distributed-waiting-queries s1-commit-worker s4-commit-worker s1-stop-connection s4-stop-connection step s1-start-session-level-connection: - SELECT start_session_level_connection_to_node('localhost', 57637); + SELECT start_session_level_connection_to_node('localhost', 57637); start_session_level_connection_to_node --------------------------------------------------------------------- @@ -1034,7 +1034,7 @@ start_session_level_connection_to_node (1 row) step s1-begin-on-worker: - SELECT run_commands_on_session_level_connection_to_node('BEGIN'); + SELECT run_commands_on_session_level_connection_to_node('BEGIN'); run_commands_on_session_level_connection_to_node --------------------------------------------------------------------- @@ -1042,7 +1042,7 @@ run_commands_on_session_level_connection_to_node (1 row) step s1-update-dist-table: - SELECT run_commands_on_session_level_connection_to_node('UPDATE tt1 SET value_1 = 4'); + SELECT run_commands_on_session_level_connection_to_node('UPDATE tt1 SET value_1 = 4'); run_commands_on_session_level_connection_to_node --------------------------------------------------------------------- @@ -1050,7 +1050,7 @@ run_commands_on_session_level_connection_to_node (1 row) step s4-start-session-level-connection: - SELECT start_session_level_connection_to_node('localhost', 57637); + SELECT start_session_level_connection_to_node('localhost', 57637); start_session_level_connection_to_node --------------------------------------------------------------------- @@ -1058,7 +1058,7 @@ start_session_level_connection_to_node (1 row) step s4-begin-on-worker: - SELECT run_commands_on_session_level_connection_to_node('BEGIN'); + SELECT run_commands_on_session_level_connection_to_node('BEGIN'); run_commands_on_session_level_connection_to_node --------------------------------------------------------------------- @@ -1066,10 +1066,10 @@ run_commands_on_session_level_connection_to_node (1 row) step s4-update-dist-table: - SELECT run_commands_on_session_level_connection_to_node('UPDATE tt1 SET value_1 = 5'); + SELECT run_commands_on_session_level_connection_to_node('UPDATE tt1 SET value_1 = 5'); step s3-select-distributed-waiting-queries: - SELECT blocked_statement, current_statement_in_blocking_process, waiting_node_name, blocking_node_name, waiting_node_port, blocking_node_port FROM citus_lock_waits; + SELECT blocked_statement, current_statement_in_blocking_process, waiting_node_name, blocking_node_name, waiting_node_port, blocking_node_port FROM citus_lock_waits WHERE blocked_statement NOT ILIKE '%run_commands_on_session_level_connection_to_node%' AND current_statement_in_blocking_process NOT ILIKE '%run_commands_on_session_level_connection_to_node%'; blocked_statement |current_statement_in_blocking_process|waiting_node_name|blocking_node_name|waiting_node_port|blocking_node_port --------------------------------------------------------------------- @@ -1099,7 +1099,7 @@ run_commands_on_session_level_connection_to_node (1 row) step s1-stop-connection: - SELECT stop_session_level_connection_to_node(); + SELECT stop_session_level_connection_to_node(); stop_session_level_connection_to_node --------------------------------------------------------------------- @@ -1107,7 +1107,138 @@ stop_session_level_connection_to_node (1 row) step s4-stop-connection: - SELECT stop_session_level_connection_to_node(); + SELECT stop_session_level_connection_to_node(); + +stop_session_level_connection_to_node +--------------------------------------------------------------------- + +(1 row) + +restore_isolation_tester_func +--------------------------------------------------------------------- + +(1 row) + + +starting permutation: s1-start-session-level-connection s1-begin-on-worker s1-update-dist-table-id-1 s2-start-session-level-connection s2-update-dist-table-id-1 s3-select-distributed-waiting-queries s1-commit-worker s1-stop-connection s2-stop-connection +step s1-start-session-level-connection: + SELECT start_session_level_connection_to_node('localhost', 57637); + +start_session_level_connection_to_node +--------------------------------------------------------------------- + +(1 row) + +step s1-begin-on-worker: + SELECT run_commands_on_session_level_connection_to_node('BEGIN'); + +run_commands_on_session_level_connection_to_node +--------------------------------------------------------------------- + +(1 row) + +step s1-update-dist-table-id-1: + SELECT run_commands_on_session_level_connection_to_node('UPDATE tt1 SET value_1 = 4 WHERE user_id = 1'); + +run_commands_on_session_level_connection_to_node +--------------------------------------------------------------------- + +(1 row) + +step s2-start-session-level-connection: + SELECT start_session_level_connection_to_node('localhost', 57638); + +start_session_level_connection_to_node +--------------------------------------------------------------------- + +(1 row) + +step s2-update-dist-table-id-1: + SELECT run_commands_on_session_level_connection_to_node('UPDATE tt1 SET value_1 = 4 WHERE user_id = 1'); + +step s3-select-distributed-waiting-queries: + SELECT blocked_statement, current_statement_in_blocking_process, waiting_node_name, blocking_node_name, waiting_node_port, blocking_node_port FROM citus_lock_waits WHERE blocked_statement NOT ILIKE '%run_commands_on_session_level_connection_to_node%' AND current_statement_in_blocking_process NOT ILIKE '%run_commands_on_session_level_connection_to_node%'; + +blocked_statement |current_statement_in_blocking_process |waiting_node_name|blocking_node_name|waiting_node_port|blocking_node_port +--------------------------------------------------------------------- +UPDATE tt1 SET value_1 = 4 WHERE user_id = 1|UPDATE tt1 SET value_1 = 4 WHERE user_id = 1|localhost |localhost | 57638| 57637 +(1 row) + +step s1-commit-worker: + SELECT run_commands_on_session_level_connection_to_node('COMMIT'); + +run_commands_on_session_level_connection_to_node +--------------------------------------------------------------------- + +(1 row) + +step s2-update-dist-table-id-1: <... completed> +run_commands_on_session_level_connection_to_node +--------------------------------------------------------------------- + +(1 row) + +step s1-stop-connection: + SELECT stop_session_level_connection_to_node(); + +stop_session_level_connection_to_node +--------------------------------------------------------------------- + +(1 row) + +step s2-stop-connection: + SELECT stop_session_level_connection_to_node(); + +stop_session_level_connection_to_node +--------------------------------------------------------------------- + +(1 row) + +restore_isolation_tester_func +--------------------------------------------------------------------- + +(1 row) + + +starting permutation: s1-begin s1-update-ref-table-from-coordinator s2-start-session-level-connection s2-update-ref-table s3-select-distributed-waiting-queries s1-commit s2-stop-connection +step s1-begin: + BEGIN; + +step s1-update-ref-table-from-coordinator: + UPDATE ref_table SET value_1 = 15; + +step s2-start-session-level-connection: + SELECT start_session_level_connection_to_node('localhost', 57638); + +start_session_level_connection_to_node +--------------------------------------------------------------------- + +(1 row) + +step s2-update-ref-table: + SELECT run_commands_on_session_level_connection_to_node('UPDATE ref_table SET value_1 = 12 WHERE user_id = 1'); + +step s3-select-distributed-waiting-queries: + SELECT blocked_statement, current_statement_in_blocking_process, waiting_node_name, blocking_node_name, waiting_node_port, blocking_node_port FROM citus_lock_waits WHERE blocked_statement NOT ILIKE '%run_commands_on_session_level_connection_to_node%' AND current_statement_in_blocking_process NOT ILIKE '%run_commands_on_session_level_connection_to_node%'; + +blocked_statement |current_statement_in_blocking_process|waiting_node_name|blocking_node_name|waiting_node_port|blocking_node_port +--------------------------------------------------------------------- +UPDATE ref_table SET value_1 = 12 WHERE user_id = 1| + UPDATE ref_table SET value_1 = 15; +|localhost |coordinator_host | 57638| 57636 +(1 row) + +step s1-commit: + COMMIT; + +step s2-update-ref-table: <... completed> +run_commands_on_session_level_connection_to_node +--------------------------------------------------------------------- + +(1 row) + +step s2-stop-connection: + SELECT stop_session_level_connection_to_node(); stop_session_level_connection_to_node --------------------------------------------------------------------- diff --git a/src/test/regress/expected/isolation_rebalancer_deferred_drop.out b/src/test/regress/expected/isolation_rebalancer_deferred_drop.out index d89a71ece..6963e9122 100644 --- a/src/test/regress/expected/isolation_rebalancer_deferred_drop.out +++ b/src/test/regress/expected/isolation_rebalancer_deferred_drop.out @@ -96,7 +96,7 @@ step s1-commit: COMMIT; step s2-stop-connection: - SELECT stop_session_level_connection_to_node(); + SELECT stop_session_level_connection_to_node(); stop_session_level_connection_to_node --------------------------------------------------------------------- @@ -148,7 +148,8 @@ step s1-move-placement-back: SET client_min_messages to NOTICE; SHOW log_error_verbosity; SELECT master_move_shard_placement((SELECT * FROM selected_shard), 'localhost', 57638, 'localhost', 57637); - + +step s1-move-placement-back: <... completed> log_error_verbosity --------------------------------------------------------------------- verbose @@ -159,7 +160,7 @@ step s1-commit: COMMIT; step s2-stop-connection: - SELECT stop_session_level_connection_to_node(); + SELECT stop_session_level_connection_to_node(); stop_session_level_connection_to_node --------------------------------------------------------------------- diff --git a/src/test/regress/expected/isolation_replicate_reference_tables_to_coordinator.out b/src/test/regress/expected/isolation_replicate_reference_tables_to_coordinator.out index e83e71919..2d49f8586 100644 --- a/src/test/regress/expected/isolation_replicate_reference_tables_to_coordinator.out +++ b/src/test/regress/expected/isolation_replicate_reference_tables_to_coordinator.out @@ -101,13 +101,14 @@ step s2-view-worker: FROM citus_worker_stat_activity WHERE query NOT ILIKE '%pg_prepared_xacts%' AND query NOT ILIKE '%COMMIT%' AND - query NOT ILIKE '%dump_local_wait_edges%' + query NOT ILIKE '%dump_local_%' AND + query NOT ILIKE '%citus_internal_local_blocked_processes%' ORDER BY query, query_hostport DESC; query |query_hostname|query_hostport|distributed_query_host_name|distributed_query_host_port|state |wait_event_type|wait_event|usename |datname --------------------------------------------------------------------- -UPDATE public.ref_table_1500767 ref_table SET a = (a OPERATOR(pg_catalog.+) 1)|localhost | 57638|coordinator_host | 57636|idle in transaction|Client |ClientRead|postgres|regression -UPDATE public.ref_table_1500767 ref_table SET a = (a OPERATOR(pg_catalog.+) 1)|localhost | 57637|coordinator_host | 57636|idle in transaction|Client |ClientRead|postgres|regression +UPDATE public.ref_table_1500777 ref_table SET a = (a OPERATOR(pg_catalog.+) 1)|localhost | 57638|coordinator_host | 57636|idle in transaction|Client |ClientRead|postgres|regression +UPDATE public.ref_table_1500777 ref_table SET a = (a OPERATOR(pg_catalog.+) 1)|localhost | 57637|coordinator_host | 57636|idle in transaction|Client |ClientRead|postgres|regression (2 rows) step s2-end: diff --git a/src/test/regress/expected/multi_extension.out b/src/test/regress/expected/multi_extension.out index 2746d8ea4..e5cbcd994 100644 --- a/src/test/regress/expected/multi_extension.out +++ b/src/test/regress/expected/multi_extension.out @@ -428,20 +428,20 @@ SELECT prosrc FROM pg_proc WHERE proname = 'master_update_table_statistics' ORDE ALTER EXTENSION citus UPDATE TO '9.4-2'; -- should see the old source code SELECT prosrc FROM pg_proc WHERE proname = 'master_update_table_statistics' ORDER BY 1; - prosrc + prosrc --------------------------------------------------------------------- - + - DECLARE + - colocated_tables regclass[]; + - BEGIN + - SELECT get_colocated_table_array(relation) INTO colocated_tables;+ - PERFORM + - master_update_shard_statistics(shardid) + - FROM + - pg_dist_shard + - WHERE + - logicalrelid = ANY (colocated_tables); + - END; + + + + DECLARE + + colocated_tables regclass[]; + + BEGIN + + SELECT get_colocated_table_array(relation) INTO colocated_tables;+ + PERFORM + + master_update_shard_statistics(shardid) + + FROM + + pg_dist_shard + + WHERE + + logicalrelid = ANY (colocated_tables); + + END; + (1 row) @@ -469,20 +469,20 @@ SELECT * FROM multi_extension.print_extension_changes(); ALTER EXTENSION citus UPDATE TO '9.4-1'; -- should see the old source code SELECT prosrc FROM pg_proc WHERE proname = 'master_update_table_statistics' ORDER BY 1; - prosrc + prosrc --------------------------------------------------------------------- - + - DECLARE + - colocated_tables regclass[]; + - BEGIN + - SELECT get_colocated_table_array(relation) INTO colocated_tables;+ - PERFORM + - master_update_shard_statistics(shardid) + - FROM + - pg_dist_shard + - WHERE + - logicalrelid = ANY (colocated_tables); + - END; + + + + DECLARE + + colocated_tables regclass[]; + + BEGIN + + SELECT get_colocated_table_array(relation) INTO colocated_tables;+ + PERFORM + + master_update_shard_statistics(shardid) + + FROM + + pg_dist_shard + + WHERE + + logicalrelid = ANY (colocated_tables); + + END; + (1 row) @@ -578,20 +578,20 @@ SELECT prosrc FROM pg_proc WHERE proname = 'master_update_table_statistics' ORDE ALTER EXTENSION citus UPDATE TO '9.5-2'; -- should see the old source code SELECT prosrc FROM pg_proc WHERE proname = 'master_update_table_statistics' ORDER BY 1; - prosrc + prosrc --------------------------------------------------------------------- - + - DECLARE + - colocated_tables regclass[]; + - BEGIN + - SELECT get_colocated_table_array(relation) INTO colocated_tables;+ - PERFORM + - master_update_shard_statistics(shardid) + - FROM + - pg_dist_shard + - WHERE + - logicalrelid = ANY (colocated_tables); + - END; + + + + DECLARE + + colocated_tables regclass[]; + + BEGIN + + SELECT get_colocated_table_array(relation) INTO colocated_tables;+ + PERFORM + + master_update_shard_statistics(shardid) + + FROM + + pg_dist_shard + + WHERE + + logicalrelid = ANY (colocated_tables); + + END; + (1 row) @@ -619,20 +619,20 @@ SELECT * FROM multi_extension.print_extension_changes(); ALTER EXTENSION citus UPDATE TO '9.5-1'; -- should see the old source code SELECT prosrc FROM pg_proc WHERE proname = 'master_update_table_statistics' ORDER BY 1; - prosrc + prosrc --------------------------------------------------------------------- - + - DECLARE + - colocated_tables regclass[]; + - BEGIN + - SELECT get_colocated_table_array(relation) INTO colocated_tables;+ - PERFORM + - master_update_shard_statistics(shardid) + - FROM + - pg_dist_shard + - WHERE + - logicalrelid = ANY (colocated_tables); + - END; + + + + DECLARE + + colocated_tables regclass[]; + + BEGIN + + SELECT get_colocated_table_array(relation) INTO colocated_tables;+ + PERFORM + + master_update_shard_statistics(shardid) + + FROM + + pg_dist_shard + + WHERE + + logicalrelid = ANY (colocated_tables); + + END; + (1 row) @@ -1005,8 +1005,6 @@ SELECT * FROM multi_extension.print_extension_changes(); --------------------------------------------------------------------- function citus_disable_node(text,integer) void | function create_distributed_function(regprocedure,text,text) void | - function dump_global_wait_edges() SETOF record | - function dump_local_wait_edges() SETOF record | function master_append_table_to_shard(bigint,text,text,integer) real | function master_apply_delete_command(text) integer | function master_get_table_metadata(text) record | @@ -1014,24 +1012,18 @@ SELECT * FROM multi_extension.print_extension_changes(); | function citus_check_connection_to_node(text,integer) boolean | function citus_disable_node(text,integer,boolean) void | function citus_internal_add_object_metadata(text,text[],text[],integer,integer,boolean) void + | function citus_internal_global_blocked_processes() SETOF record + | function citus_internal_local_blocked_processes() SETOF record | function citus_run_local_command(text) void | function citus_shard_indexes_on_worker() SETOF record | function citus_shards_on_worker() SETOF record | function create_distributed_function(regprocedure,text,text,boolean) void -<<<<<<< HEAD | function pg_cancel_backend(bigint) boolean | function pg_terminate_backend(bigint,bigint) boolean | function worker_create_or_replace_object(text[]) boolean | function worker_drop_sequence_dependency(text) void | function worker_drop_shell_table(text) void -(18 rows) -======= - | function dump_global_wait_edges(boolean) SETOF record - | function dump_local_wait_edges(boolean) SETOF record - | function worker_drop_sequence_dependency(text) void - | function worker_drop_shell_table(text) void -(19 rows) ->>>>>>> d4b956c7f (Use the optional APIs introduced for dump global/local wait edges) +(20 rows) DROP TABLE multi_extension.prev_objects, multi_extension.extension_diff; -- show running version diff --git a/src/test/regress/expected/upgrade_list_citus_objects.out b/src/test/regress/expected/upgrade_list_citus_objects.out index 2071dca18..185bf19c5 100644 --- a/src/test/regress/expected/upgrade_list_citus_objects.out +++ b/src/test/regress/expected/upgrade_list_citus_objects.out @@ -74,6 +74,8 @@ ORDER BY 1; function citus_internal_add_placement_metadata(bigint,integer,bigint,integer,bigint) function citus_internal_add_shard_metadata(regclass,bigint,"char",text,text) function citus_internal_delete_shard_metadata(bigint) + function citus_internal_global_blocked_processes() + function citus_internal_local_blocked_processes() function citus_internal_update_placement_metadata(bigint,integer,integer) function citus_internal_update_relation_colocation(oid,integer) function citus_isolation_test_session_is_blocked(integer,integer[]) @@ -127,8 +129,8 @@ ORDER BY 1; function create_time_partitions(regclass,interval,timestamp with time zone,timestamp with time zone) function distributed_tables_colocated(regclass,regclass) function drop_old_time_partitions(regclass,timestamp with time zone) - function dump_global_wait_edges(boolean) - function dump_local_wait_edges(boolean) + function dump_global_wait_edges() + function dump_local_wait_edges() function fetch_intermediate_results(text[],text,integer) function fix_all_partition_shard_index_names() function fix_partition_shard_index_names(regclass) @@ -270,5 +272,5 @@ ORDER BY 1; view citus_worker_stat_activity view pg_dist_shard_placement view time_partitions -(254 rows) +(256 rows) diff --git a/src/test/regress/spec/isolation_get_distributed_wait_queries_mx.spec b/src/test/regress/spec/isolation_get_distributed_wait_queries_mx.spec index b43708ab1..84daaf792 100644 --- a/src/test/regress/spec/isolation_get_distributed_wait_queries_mx.spec +++ b/src/test/regress/spec/isolation_get_distributed_wait_queries_mx.spec @@ -73,6 +73,11 @@ step "s1-select-for-update" SELECT run_commands_on_session_level_connection_to_node('SELECT * FROM ref_table FOR UPDATE'); } +step "s1-update-dist-table-id-1" +{ + SELECT run_commands_on_session_level_connection_to_node('UPDATE tt1 SET value_1 = 4 WHERE user_id = 1'); +} + step "s1-commit-worker" { SELECT run_commands_on_session_level_connection_to_node('COMMIT'); @@ -115,6 +120,11 @@ step "s2-update-dist-table" SELECT run_commands_on_session_level_connection_to_node('UPDATE tt1 SET value_1 = 5'); } +step "s2-update-dist-table-id-1" +{ + SELECT run_commands_on_session_level_connection_to_node('UPDATE tt1 SET value_1 = 4 WHERE user_id = 1'); +} + step "s2-update-ref-table" { SELECT run_commands_on_session_level_connection_to_node('UPDATE ref_table SET value_1 = 12 WHERE user_id = 1'); @@ -149,7 +159,7 @@ session "s3" step "s3-select-distributed-waiting-queries" { - SELECT blocked_statement, current_statement_in_blocking_process, waiting_node_name, blocking_node_name, waiting_node_port, blocking_node_port FROM citus_lock_waits; + SELECT blocked_statement, current_statement_in_blocking_process, waiting_node_name, blocking_node_name, waiting_node_port, blocking_node_port FROM citus_lock_waits WHERE blocked_statement NOT ILIKE '%run_commands_on_session_level_connection_to_node%' AND current_statement_in_blocking_process NOT ILIKE '%run_commands_on_session_level_connection_to_node%'; } // session s1 and s4 executes the commands on the same worker node @@ -196,3 +206,9 @@ permutation "s2-start-session-level-connection" "s2-begin-on-worker" "s2-insert- // blocked on the same node permutation "s1-begin" "s1-update-on-the-coordinator" "s2-update-on-the-coordinator" "s3-select-distributed-waiting-queries" "s1-commit" permutation "s1-start-session-level-connection" "s1-begin-on-worker" "s1-update-dist-table" "s4-start-session-level-connection" "s4-begin-on-worker" "s4-update-dist-table" "s3-select-distributed-waiting-queries" "s1-commit-worker" "s4-commit-worker" "s1-stop-connection" "s4-stop-connection" + + +// show that even if the commands are not in a transaction block +// we can find the blocking relationship +permutation "s1-start-session-level-connection" "s1-begin-on-worker" "s1-update-dist-table-id-1" "s2-start-session-level-connection" "s2-update-dist-table-id-1" "s3-select-distributed-waiting-queries" "s1-commit-worker" "s1-stop-connection" "s2-stop-connection" +permutation "s1-begin" "s1-update-ref-table-from-coordinator" "s2-start-session-level-connection" "s2-update-ref-table" "s3-select-distributed-waiting-queries" "s1-commit" "s2-stop-connection" diff --git a/src/test/regress/spec/isolation_replicate_reference_tables_to_coordinator.spec b/src/test/regress/spec/isolation_replicate_reference_tables_to_coordinator.spec index 0defcf549..fa2079ba5 100644 --- a/src/test/regress/spec/isolation_replicate_reference_tables_to_coordinator.spec +++ b/src/test/regress/spec/isolation_replicate_reference_tables_to_coordinator.spec @@ -93,7 +93,8 @@ step "s2-view-worker" FROM citus_worker_stat_activity WHERE query NOT ILIKE '%pg_prepared_xacts%' AND query NOT ILIKE '%COMMIT%' AND - query NOT ILIKE '%dump_local_wait_edges%' + query NOT ILIKE '%dump_local_%' AND + query NOT ILIKE '%citus_internal_local_blocked_processes%' ORDER BY query, query_hostport DESC; }