Merge pull request #5702 from citusdata/improve_citus_lock_waits

Improve citus lock waits to show non-tx blocked distributed processes as well
pull/5716/head
Önder Kalacı 2022-02-21 18:00:51 +01:00 committed by GitHub
commit 0dd2ddaa70
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
23 changed files with 924 additions and 236 deletions

View File

@ -17,8 +17,13 @@
#include "udfs/get_all_active_transactions/11.0-1.sql" #include "udfs/get_all_active_transactions/11.0-1.sql"
#include "udfs/get_global_active_transactions/11.0-1.sql" #include "udfs/get_global_active_transactions/11.0-1.sql"
#include "udfs/citus_internal_local_blocked_processes/11.0-1.sql"
#include "udfs/citus_internal_global_blocked_processes/11.0-1.sql"
#include "udfs/citus_worker_stat_activity/11.0-1.sql" #include "udfs/citus_worker_stat_activity/11.0-1.sql"
#include "udfs/worker_create_or_replace_object/11.0-1.sql" #include "udfs/worker_create_or_replace_object/11.0-1.sql"
#include "udfs/citus_isolation_test_session_is_blocked/11.0-1.sql"
#include "udfs/citus_blocking_pids/11.0-1.sql"
CREATE VIEW citus.citus_worker_stat_activity AS CREATE VIEW citus.citus_worker_stat_activity AS
SELECT * FROM pg_catalog.citus_worker_stat_activity(); SELECT * FROM pg_catalog.citus_worker_stat_activity();

View File

@ -113,6 +113,9 @@ CREATE FUNCTION get_global_active_transactions(OUT datid oid, OUT process_id int
RESET search_path; RESET search_path;
DROP FUNCTION citus_internal_local_blocked_processes CASCADE;
DROP FUNCTION citus_internal_global_blocked_processes CASCADE;
DROP FUNCTION pg_catalog.citus_dist_stat_activity CASCADE; DROP FUNCTION pg_catalog.citus_dist_stat_activity CASCADE;
CREATE OR REPLACE FUNCTION pg_catalog.citus_dist_stat_activity(OUT query_hostname text, OUT query_hostport int, OUT distributed_query_host_name text, OUT distributed_query_host_port int, CREATE OR REPLACE FUNCTION pg_catalog.citus_dist_stat_activity(OUT query_hostname text, OUT query_hostport int, OUT distributed_query_host_name text, OUT distributed_query_host_port int,
@ -139,6 +142,162 @@ ALTER VIEW citus.citus_dist_stat_activity SET SCHEMA pg_catalog;
GRANT SELECT ON pg_catalog.citus_dist_stat_activity TO PUBLIC; GRANT SELECT ON pg_catalog.citus_dist_stat_activity TO PUBLIC;
SET search_path = 'pg_catalog'; SET search_path = 'pg_catalog';
DROP FUNCTION citus_worker_stat_activity CASCADE;
CREATE OR REPLACE FUNCTION citus_worker_stat_activity(OUT query_hostname text, OUT query_hostport int, OUT distributed_query_host_name text, OUT distributed_query_host_port int,
OUT transaction_number int8, OUT transaction_stamp timestamptz, OUT datid oid, OUT datname name,
OUT pid int, OUT usesysid oid, OUT usename name, OUT application_name text, OUT client_addr INET,
OUT client_hostname TEXT, OUT client_port int, OUT backend_start timestamptz, OUT xact_start timestamptz,
OUT query_start timestamptz, OUT state_change timestamptz, OUT wait_event_type text, OUT wait_event text,
OUT state text, OUT backend_xid xid, OUT backend_xmin xid, OUT query text, OUT backend_type text)
RETURNS SETOF RECORD
LANGUAGE C STRICT AS 'MODULE_PATHNAME',
$$citus_worker_stat_activity$$;
COMMENT ON FUNCTION citus_worker_stat_activity(OUT query_hostname text, OUT query_hostport int, OUT distributed_query_host_name text, OUT distributed_query_host_port int,
OUT transaction_number int8, OUT transaction_stamp timestamptz, OUT datid oid, OUT datname name,
OUT pid int, OUT usesysid oid, OUT usename name, OUT application_name text, OUT client_addr INET,
OUT client_hostname TEXT, OUT client_port int, OUT backend_start timestamptz, OUT xact_start timestamptz,
OUT query_start timestamptz, OUT state_change timestamptz, OUT wait_event_type text, OUT wait_event text,
OUT state text, OUT backend_xid xid, OUT backend_xmin xid, OUT query text, OUT backend_type text)
IS 'returns distributed transaction activity on shards of distributed tables';
DROP FUNCTION pg_catalog.worker_create_or_replace_object(text[]);
#include "../udfs/worker_create_or_replace_object/9.0-1.sql"
DROP FUNCTION IF EXISTS pg_catalog.pg_cancel_backend(bigint) CASCADE;
DROP FUNCTION IF EXISTS pg_catalog.pg_terminate_backend(bigint, bigint) CASCADE;
DROP FUNCTION pg_catalog.dump_local_wait_edges CASCADE;
CREATE FUNCTION pg_catalog.dump_local_wait_edges(
OUT waiting_pid int4,
OUT waiting_node_id int4,
OUT waiting_transaction_num int8,
OUT waiting_transaction_stamp timestamptz,
OUT blocking_pid int4,
OUT blocking_node_id int4,
OUT blocking_transaction_num int8,
OUT blocking_transaction_stamp timestamptz,
OUT blocking_transaction_waiting bool)
RETURNS SETOF RECORD
LANGUAGE C STRICT
AS $$MODULE_PATHNAME$$, $$dump_local_wait_edges$$;
COMMENT ON FUNCTION pg_catalog.dump_local_wait_edges()
IS 'returns all local lock wait chains, that start from distributed transactions';
DROP FUNCTION pg_catalog.dump_global_wait_edges CASCADE;
CREATE FUNCTION pg_catalog.dump_global_wait_edges(
OUT waiting_pid int4,
OUT waiting_node_id int4,
OUT waiting_transaction_num int8,
OUT waiting_transaction_stamp timestamptz,
OUT blocking_pid int4,
OUT blocking_node_id int4,
OUT blocking_transaction_num int8,
OUT blocking_transaction_stamp timestamptz,
OUT blocking_transaction_waiting bool)
RETURNS SETOF RECORD
LANGUAGE 'c' STRICT
AS $$MODULE_PATHNAME$$, $$dump_global_wait_edges$$;
COMMENT ON FUNCTION pg_catalog.dump_global_wait_edges()
IS 'returns a global list of blocked transactions originating from this node';
DROP FUNCTION pg_catalog.citus_isolation_test_session_is_blocked(pBlockedPid integer, pInterestingPids integer[]);
CREATE FUNCTION pg_catalog.citus_isolation_test_session_is_blocked(pBlockedPid integer, pInterestingPids integer[])
RETURNS boolean AS $$
DECLARE
mBlockedTransactionNum int8;
workerProcessId integer := current_setting('citus.isolation_test_session_remote_process_id');
coordinatorProcessId integer := current_setting('citus.isolation_test_session_process_id');
BEGIN
IF pg_catalog.old_pg_isolation_test_session_is_blocked(pBlockedPid, pInterestingPids) THEN
RETURN true;
END IF;
-- pg says we're not blocked locally; check whether we're blocked globally.
-- Note that worker process may be blocked or waiting for a lock. So we need to
-- get transaction number for both of them. Following IF provides the transaction
-- number when the worker process waiting for other session.
IF EXISTS (SELECT transaction_number FROM get_global_active_transactions()
WHERE process_id = workerProcessId AND pBlockedPid = coordinatorProcessId) THEN
SELECT transaction_number INTO mBlockedTransactionNum FROM get_global_active_transactions()
WHERE process_id = workerProcessId AND pBlockedPid = coordinatorProcessId;
ELSE
-- Check whether transactions initiated from the coordinator get locked
SELECT transaction_number INTO mBlockedTransactionNum
FROM get_all_active_transactions() WHERE process_id = pBlockedPid;
END IF;
RETURN EXISTS (
SELECT 1 FROM dump_global_wait_edges()
WHERE waiting_transaction_num = mBlockedTransactionNum
) OR EXISTS (
-- Check on the workers if any logical replication job spawned by the
-- current PID is blocked, by checking it's application name
-- Query is heavily based on: https://wiki.postgresql.org/wiki/Lock_Monitoring
SELECT result FROM run_command_on_workers($two$
SELECT blocked_activity.application_name AS blocked_application
FROM pg_catalog.pg_locks blocked_locks
JOIN pg_catalog.pg_stat_activity blocked_activity ON blocked_activity.pid = blocked_locks.pid
JOIN pg_catalog.pg_locks blocking_locks
ON blocking_locks.locktype = blocked_locks.locktype
AND blocking_locks.DATABASE IS NOT DISTINCT FROM blocked_locks.DATABASE
AND blocking_locks.relation IS NOT DISTINCT FROM blocked_locks.relation
AND blocking_locks.page IS NOT DISTINCT FROM blocked_locks.page
AND blocking_locks.tuple IS NOT DISTINCT FROM blocked_locks.tuple
AND blocking_locks.virtualxid IS NOT DISTINCT FROM blocked_locks.virtualxid
AND blocking_locks.transactionid IS NOT DISTINCT FROM blocked_locks.transactionid
AND blocking_locks.classid IS NOT DISTINCT FROM blocked_locks.classid
AND blocking_locks.objid IS NOT DISTINCT FROM blocked_locks.objid
AND blocking_locks.objsubid IS NOT DISTINCT FROM blocked_locks.objsubid
AND blocking_locks.pid != blocked_locks.pid
JOIN pg_catalog.pg_stat_activity blocking_activity ON blocking_activity.pid = blocking_locks.pid
WHERE NOT blocked_locks.GRANTED AND blocked_activity.application_name LIKE 'citus_shard_move_subscription_%'
$two$) where result='citus_shard_move_subscription_' || pBlockedPid);
END;
$$ LANGUAGE plpgsql;
REVOKE ALL ON FUNCTION citus_isolation_test_session_is_blocked(integer,integer[]) FROM PUBLIC;
DROP FUNCTION pg_catalog.citus_blocking_pids(pBlockedPid integer);
CREATE FUNCTION pg_catalog.citus_blocking_pids(pBlockedPid integer)
RETURNS int4[] AS $$
DECLARE
mLocalBlockingPids int4[];
mRemoteBlockingPids int4[];
mLocalTransactionNum int8;
BEGIN
SELECT pg_catalog.old_pg_blocking_pids(pBlockedPid) INTO mLocalBlockingPids;
IF (array_length(mLocalBlockingPids, 1) > 0) THEN
RETURN mLocalBlockingPids;
END IF;
-- pg says we're not blocked locally; check whether we're blocked globally.
SELECT transaction_number INTO mLocalTransactionNum
FROM get_all_active_transactions() WHERE process_id = pBlockedPid;
SELECT array_agg(process_id) INTO mRemoteBlockingPids FROM (
WITH activeTransactions AS (
SELECT process_id, transaction_number FROM get_all_active_transactions()
), blockingTransactions AS (
SELECT blocking_transaction_num AS txn_num FROM dump_global_wait_edges()
WHERE waiting_transaction_num = mLocalTransactionNum
)
SELECT activeTransactions.process_id FROM activeTransactions, blockingTransactions
WHERE activeTransactions.transaction_number = blockingTransactions.txn_num
) AS sub;
RETURN mRemoteBlockingPids;
END;
$$ LANGUAGE plpgsql;
REVOKE ALL ON FUNCTION citus_blocking_pids(integer) FROM PUBLIC;
CREATE VIEW citus.citus_worker_stat_activity AS
SELECT * FROM pg_catalog.citus_worker_stat_activity();
ALTER VIEW citus.citus_worker_stat_activity SET SCHEMA pg_catalog;
GRANT SELECT ON pg_catalog.citus_worker_stat_activity TO PUBLIC;
-- we have to recreate this view because we drop citus_dist_stat_activity that this view depends -- we have to recreate this view because we drop citus_dist_stat_activity that this view depends
CREATE VIEW citus.citus_lock_waits AS CREATE VIEW citus.citus_lock_waits AS
@ -183,35 +342,4 @@ JOIN
ALTER VIEW citus.citus_lock_waits SET SCHEMA pg_catalog; ALTER VIEW citus.citus_lock_waits SET SCHEMA pg_catalog;
GRANT SELECT ON pg_catalog.citus_lock_waits TO PUBLIC; GRANT SELECT ON pg_catalog.citus_lock_waits TO PUBLIC;
DROP FUNCTION citus_worker_stat_activity CASCADE;
CREATE OR REPLACE FUNCTION citus_worker_stat_activity(OUT query_hostname text, OUT query_hostport int, OUT distributed_query_host_name text, OUT distributed_query_host_port int,
OUT transaction_number int8, OUT transaction_stamp timestamptz, OUT datid oid, OUT datname name,
OUT pid int, OUT usesysid oid, OUT usename name, OUT application_name text, OUT client_addr INET,
OUT client_hostname TEXT, OUT client_port int, OUT backend_start timestamptz, OUT xact_start timestamptz,
OUT query_start timestamptz, OUT state_change timestamptz, OUT wait_event_type text, OUT wait_event text,
OUT state text, OUT backend_xid xid, OUT backend_xmin xid, OUT query text, OUT backend_type text)
RETURNS SETOF RECORD
LANGUAGE C STRICT AS 'MODULE_PATHNAME',
$$citus_worker_stat_activity$$;
COMMENT ON FUNCTION citus_worker_stat_activity(OUT query_hostname text, OUT query_hostport int, OUT distributed_query_host_name text, OUT distributed_query_host_port int,
OUT transaction_number int8, OUT transaction_stamp timestamptz, OUT datid oid, OUT datname name,
OUT pid int, OUT usesysid oid, OUT usename name, OUT application_name text, OUT client_addr INET,
OUT client_hostname TEXT, OUT client_port int, OUT backend_start timestamptz, OUT xact_start timestamptz,
OUT query_start timestamptz, OUT state_change timestamptz, OUT wait_event_type text, OUT wait_event text,
OUT state text, OUT backend_xid xid, OUT backend_xmin xid, OUT query text, OUT backend_type text)
IS 'returns distributed transaction activity on shards of distributed tables';
CREATE VIEW citus.citus_worker_stat_activity AS
SELECT * FROM pg_catalog.citus_worker_stat_activity();
ALTER VIEW citus.citus_worker_stat_activity SET SCHEMA pg_catalog;
GRANT SELECT ON pg_catalog.citus_worker_stat_activity TO PUBLIC;
DROP FUNCTION pg_catalog.worker_create_or_replace_object(text[]);
#include "../udfs/worker_create_or_replace_object/9.0-1.sql"
RESET search_path; RESET search_path;
DROP FUNCTION IF EXISTS pg_catalog.pg_cancel_backend(bigint) CASCADE;
DROP FUNCTION IF EXISTS pg_catalog.pg_terminate_backend(bigint, bigint) CASCADE;

View File

@ -0,0 +1,34 @@
DROP FUNCTION pg_catalog.citus_blocking_pids;
CREATE FUNCTION pg_catalog.citus_blocking_pids(pBlockedPid integer)
RETURNS int4[] AS $$
DECLARE
mLocalBlockingPids int4[];
mRemoteBlockingPids int4[];
mLocalGlobalPid int8;
BEGIN
SELECT pg_catalog.old_pg_blocking_pids(pBlockedPid) INTO mLocalBlockingPids;
IF (array_length(mLocalBlockingPids, 1) > 0) THEN
RETURN mLocalBlockingPids;
END IF;
-- pg says we're not blocked locally; check whether we're blocked globally.
SELECT global_pid INTO mLocalGlobalPid
FROM get_all_active_transactions() WHERE process_id = pBlockedPid;
SELECT array_agg(global_pid) INTO mRemoteBlockingPids FROM (
WITH activeTransactions AS (
SELECT global_pid FROM get_all_active_transactions()
), blockingTransactions AS (
SELECT blocking_global_pid FROM citus_internal_global_blocked_processes()
WHERE waiting_global_pid = mLocalGlobalPid
)
SELECT activeTransactions.global_pid FROM activeTransactions, blockingTransactions
WHERE activeTransactions.global_pid = blockingTransactions.blocking_global_pid
) AS sub;
RETURN mRemoteBlockingPids;
END;
$$ LANGUAGE plpgsql;
REVOKE ALL ON FUNCTION citus_blocking_pids(integer) FROM PUBLIC;

View File

@ -0,0 +1,34 @@
DROP FUNCTION pg_catalog.citus_blocking_pids;
CREATE FUNCTION pg_catalog.citus_blocking_pids(pBlockedPid integer)
RETURNS int4[] AS $$
DECLARE
mLocalBlockingPids int4[];
mRemoteBlockingPids int4[];
mLocalGlobalPid int8;
BEGIN
SELECT pg_catalog.old_pg_blocking_pids(pBlockedPid) INTO mLocalBlockingPids;
IF (array_length(mLocalBlockingPids, 1) > 0) THEN
RETURN mLocalBlockingPids;
END IF;
-- pg says we're not blocked locally; check whether we're blocked globally.
SELECT global_pid INTO mLocalGlobalPid
FROM get_all_active_transactions() WHERE process_id = pBlockedPid;
SELECT array_agg(global_pid) INTO mRemoteBlockingPids FROM (
WITH activeTransactions AS (
SELECT global_pid FROM get_all_active_transactions()
), blockingTransactions AS (
SELECT blocking_global_pid FROM citus_internal_global_blocked_processes()
WHERE waiting_global_pid = mLocalGlobalPid
)
SELECT activeTransactions.global_pid FROM activeTransactions, blockingTransactions
WHERE activeTransactions.global_pid = blockingTransactions.blocking_global_pid
) AS sub;
RETURN mRemoteBlockingPids;
END;
$$ LANGUAGE plpgsql;
REVOKE ALL ON FUNCTION citus_blocking_pids(integer) FROM PUBLIC;

View File

@ -0,0 +1,17 @@
CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_global_blocked_processes(
OUT waiting_global_pid int8,
OUT waiting_pid int4,
OUT waiting_node_id int4,
OUT waiting_transaction_num int8,
OUT waiting_transaction_stamp timestamptz,
OUT blocking_global_pid int8,
OUT blocking_pid int4,
OUT blocking_node_id int4,
OUT blocking_transaction_num int8,
OUT blocking_transaction_stamp timestamptz,
OUT blocking_transaction_waiting bool)
RETURNS SETOF RECORD
LANGUAGE C STRICT
AS $$MODULE_PATHNAME$$, $$citus_internal_global_blocked_processes$$;
COMMENT ON FUNCTION pg_catalog.citus_internal_global_blocked_processes()
IS 'returns a global list of blocked backends originating from this node';

View File

@ -0,0 +1,17 @@
CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_global_blocked_processes(
OUT waiting_global_pid int8,
OUT waiting_pid int4,
OUT waiting_node_id int4,
OUT waiting_transaction_num int8,
OUT waiting_transaction_stamp timestamptz,
OUT blocking_global_pid int8,
OUT blocking_pid int4,
OUT blocking_node_id int4,
OUT blocking_transaction_num int8,
OUT blocking_transaction_stamp timestamptz,
OUT blocking_transaction_waiting bool)
RETURNS SETOF RECORD
LANGUAGE C STRICT
AS $$MODULE_PATHNAME$$, $$citus_internal_global_blocked_processes$$;
COMMENT ON FUNCTION pg_catalog.citus_internal_global_blocked_processes()
IS 'returns a global list of blocked backends originating from this node';

View File

@ -0,0 +1,17 @@
CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_local_blocked_processes(
OUT waiting_global_pid int8,
OUT waiting_pid int4,
OUT waiting_node_id int4,
OUT waiting_transaction_num int8,
OUT waiting_transaction_stamp timestamptz,
OUT blocking_global_pid int8,
OUT blocking_pid int4,
OUT blocking_node_id int4,
OUT blocking_transaction_num int8,
OUT blocking_transaction_stamp timestamptz,
OUT blocking_transaction_waiting bool)
RETURNS SETOF RECORD
LANGUAGE C STRICT
AS $$MODULE_PATHNAME$$, $$citus_internal_local_blocked_processes$$;
COMMENT ON FUNCTION pg_catalog.citus_internal_local_blocked_processes()
IS 'returns all local lock wait chains, that start from any citus backend';

View File

@ -0,0 +1,17 @@
CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_local_blocked_processes(
OUT waiting_global_pid int8,
OUT waiting_pid int4,
OUT waiting_node_id int4,
OUT waiting_transaction_num int8,
OUT waiting_transaction_stamp timestamptz,
OUT blocking_global_pid int8,
OUT blocking_pid int4,
OUT blocking_node_id int4,
OUT blocking_transaction_num int8,
OUT blocking_transaction_stamp timestamptz,
OUT blocking_transaction_waiting bool)
RETURNS SETOF RECORD
LANGUAGE C STRICT
AS $$MODULE_PATHNAME$$, $$citus_internal_local_blocked_processes$$;
COMMENT ON FUNCTION pg_catalog.citus_internal_local_blocked_processes()
IS 'returns all local lock wait chains, that start from any citus backend';

View File

@ -0,0 +1,56 @@
CREATE OR REPLACE FUNCTION pg_catalog.citus_isolation_test_session_is_blocked(pBlockedPid integer, pInterestingPids integer[])
RETURNS boolean AS $$
DECLARE
mBlockedGlobalPid int8;
workerProcessId integer := current_setting('citus.isolation_test_session_remote_process_id');
coordinatorProcessId integer := current_setting('citus.isolation_test_session_process_id');
BEGIN
IF pg_catalog.old_pg_isolation_test_session_is_blocked(pBlockedPid, pInterestingPids) THEN
RETURN true;
END IF;
-- pg says we're not blocked locally; check whether we're blocked globally.
-- Note that worker process may be blocked or waiting for a lock. So we need to
-- get transaction number for both of them. Following IF provides the transaction
-- number when the worker process waiting for other session.
IF EXISTS (SELECT 1 FROM get_global_active_transactions()
WHERE process_id = workerProcessId AND pBlockedPid = coordinatorProcessId) THEN
SELECT global_pid INTO mBlockedGlobalPid FROM get_global_active_transactions()
WHERE process_id = workerProcessId AND pBlockedPid = coordinatorProcessId;
ELSE
-- Check whether transactions initiated from the coordinator get locked
SELECT global_pid INTO mBlockedGlobalPid
FROM get_all_active_transactions() WHERE process_id = pBlockedPid;
END IF;
RETURN EXISTS (
SELECT 1 FROM citus_internal_global_blocked_processes()
WHERE waiting_global_pid = mBlockedGlobalPid
) OR EXISTS (
-- Check on the workers if any logical replication job spawned by the
-- current PID is blocked, by checking it's application name
-- Query is heavily based on: https://wiki.postgresql.org/wiki/Lock_Monitoring
SELECT result FROM run_command_on_workers($two$
SELECT blocked_activity.application_name AS blocked_application
FROM pg_catalog.pg_locks blocked_locks
JOIN pg_catalog.pg_stat_activity blocked_activity ON blocked_activity.pid = blocked_locks.pid
JOIN pg_catalog.pg_locks blocking_locks
ON blocking_locks.locktype = blocked_locks.locktype
AND blocking_locks.DATABASE IS NOT DISTINCT FROM blocked_locks.DATABASE
AND blocking_locks.relation IS NOT DISTINCT FROM blocked_locks.relation
AND blocking_locks.page IS NOT DISTINCT FROM blocked_locks.page
AND blocking_locks.tuple IS NOT DISTINCT FROM blocked_locks.tuple
AND blocking_locks.virtualxid IS NOT DISTINCT FROM blocked_locks.virtualxid
AND blocking_locks.transactionid IS NOT DISTINCT FROM blocked_locks.transactionid
AND blocking_locks.classid IS NOT DISTINCT FROM blocked_locks.classid
AND blocking_locks.objid IS NOT DISTINCT FROM blocked_locks.objid
AND blocking_locks.objsubid IS NOT DISTINCT FROM blocked_locks.objsubid
AND blocking_locks.pid != blocked_locks.pid
JOIN pg_catalog.pg_stat_activity blocking_activity ON blocking_activity.pid = blocking_locks.pid
WHERE NOT blocked_locks.GRANTED AND blocked_activity.application_name LIKE 'citus_shard_move_subscription_%'
$two$) where result='citus_shard_move_subscription_' || pBlockedPid);
END;
$$ LANGUAGE plpgsql;
REVOKE ALL ON FUNCTION citus_isolation_test_session_is_blocked(integer,integer[]) FROM PUBLIC;

View File

@ -1,7 +1,7 @@
CREATE OR REPLACE FUNCTION pg_catalog.citus_isolation_test_session_is_blocked(pBlockedPid integer, pInterestingPids integer[]) CREATE OR REPLACE FUNCTION pg_catalog.citus_isolation_test_session_is_blocked(pBlockedPid integer, pInterestingPids integer[])
RETURNS boolean AS $$ RETURNS boolean AS $$
DECLARE DECLARE
mBlockedTransactionNum int8; mBlockedGlobalPid int8;
workerProcessId integer := current_setting('citus.isolation_test_session_remote_process_id'); workerProcessId integer := current_setting('citus.isolation_test_session_remote_process_id');
coordinatorProcessId integer := current_setting('citus.isolation_test_session_process_id'); coordinatorProcessId integer := current_setting('citus.isolation_test_session_process_id');
BEGIN BEGIN
@ -13,19 +13,19 @@ RETURNS boolean AS $$
-- Note that worker process may be blocked or waiting for a lock. So we need to -- Note that worker process may be blocked or waiting for a lock. So we need to
-- get transaction number for both of them. Following IF provides the transaction -- get transaction number for both of them. Following IF provides the transaction
-- number when the worker process waiting for other session. -- number when the worker process waiting for other session.
IF EXISTS (SELECT transaction_number FROM get_global_active_transactions() IF EXISTS (SELECT 1 FROM get_global_active_transactions()
WHERE process_id = workerProcessId AND pBlockedPid = coordinatorProcessId) THEN WHERE process_id = workerProcessId AND pBlockedPid = coordinatorProcessId) THEN
SELECT transaction_number INTO mBlockedTransactionNum FROM get_global_active_transactions() SELECT global_pid INTO mBlockedGlobalPid FROM get_global_active_transactions()
WHERE process_id = workerProcessId AND pBlockedPid = coordinatorProcessId; WHERE process_id = workerProcessId AND pBlockedPid = coordinatorProcessId;
ELSE ELSE
-- Check whether transactions initiated from the coordinator get locked -- Check whether transactions initiated from the coordinator get locked
SELECT transaction_number INTO mBlockedTransactionNum SELECT global_pid INTO mBlockedGlobalPid
FROM get_all_active_transactions() WHERE process_id = pBlockedPid; FROM get_all_active_transactions() WHERE process_id = pBlockedPid;
END IF; END IF;
RETURN EXISTS ( RETURN EXISTS (
SELECT 1 FROM dump_global_wait_edges() SELECT 1 FROM citus_internal_global_blocked_processes()
WHERE waiting_transaction_num = mBlockedTransactionNum WHERE waiting_global_pid = mBlockedGlobalPid
) OR EXISTS ( ) OR EXISTS (
-- Check on the workers if any logical replication job spawned by the -- Check on the workers if any logical replication job spawned by the
-- current PID is blocked, by checking it's application name -- current PID is blocked, by checking it's application name

View File

@ -8,7 +8,7 @@ citus_dist_stat_activity AS
), ),
unique_global_wait_edges AS unique_global_wait_edges AS
( (
SELECT DISTINCT ON(waiting_node_id, waiting_transaction_num, blocking_node_id, blocking_transaction_num) * FROM dump_global_wait_edges() SELECT DISTINCT ON(waiting_global_pid, blocking_global_pid) * FROM citus_internal_global_blocked_processes()
), ),
citus_dist_stat_activity_with_node_id AS citus_dist_stat_activity_with_node_id AS
( (
@ -21,6 +21,8 @@ citus_dist_stat_activity_with_node_id AS
citus_dist_stat_activity.distributed_query_host_port = pg_dist_node.nodeport citus_dist_stat_activity.distributed_query_host_port = pg_dist_node.nodeport
) )
SELECT SELECT
waiting.global_pid as waiting_gpid,
blocking.global_pid as blocking_gpid,
waiting.pid AS waiting_pid, waiting.pid AS waiting_pid,
blocking.pid AS blocking_pid, blocking.pid AS blocking_pid,
waiting.query AS blocked_statement, waiting.query AS blocked_statement,
@ -34,9 +36,9 @@ SELECT
FROM FROM
unique_global_wait_edges unique_global_wait_edges
JOIN JOIN
citus_dist_stat_activity_with_node_id waiting ON (unique_global_wait_edges.waiting_transaction_num = waiting.transaction_number AND unique_global_wait_edges.waiting_node_id = waiting.initiator_node_id) citus_dist_stat_activity_with_node_id waiting ON (unique_global_wait_edges.waiting_global_pid = waiting.global_pid)
JOIN JOIN
citus_dist_stat_activity_with_node_id blocking ON (unique_global_wait_edges.blocking_transaction_num = blocking.transaction_number AND unique_global_wait_edges.blocking_node_id = blocking.initiator_node_id); citus_dist_stat_activity_with_node_id blocking ON (unique_global_wait_edges.blocking_global_pid = blocking.global_pid);
ALTER VIEW citus.citus_lock_waits SET SCHEMA pg_catalog; ALTER VIEW citus.citus_lock_waits SET SCHEMA pg_catalog;
GRANT SELECT ON pg_catalog.citus_lock_waits TO PUBLIC; GRANT SELECT ON pg_catalog.citus_lock_waits TO PUBLIC;

View File

@ -8,7 +8,7 @@ citus_dist_stat_activity AS
), ),
unique_global_wait_edges AS unique_global_wait_edges AS
( (
SELECT DISTINCT ON(waiting_node_id, waiting_transaction_num, blocking_node_id, blocking_transaction_num) * FROM dump_global_wait_edges() SELECT DISTINCT ON(waiting_global_pid, blocking_global_pid) * FROM citus_internal_global_blocked_processes()
), ),
citus_dist_stat_activity_with_node_id AS citus_dist_stat_activity_with_node_id AS
( (
@ -21,6 +21,8 @@ citus_dist_stat_activity_with_node_id AS
citus_dist_stat_activity.distributed_query_host_port = pg_dist_node.nodeport citus_dist_stat_activity.distributed_query_host_port = pg_dist_node.nodeport
) )
SELECT SELECT
waiting.global_pid as waiting_gpid,
blocking.global_pid as blocking_gpid,
waiting.pid AS waiting_pid, waiting.pid AS waiting_pid,
blocking.pid AS blocking_pid, blocking.pid AS blocking_pid,
waiting.query AS blocked_statement, waiting.query AS blocked_statement,
@ -34,9 +36,9 @@ SELECT
FROM FROM
unique_global_wait_edges unique_global_wait_edges
JOIN JOIN
citus_dist_stat_activity_with_node_id waiting ON (unique_global_wait_edges.waiting_transaction_num = waiting.transaction_number AND unique_global_wait_edges.waiting_node_id = waiting.initiator_node_id) citus_dist_stat_activity_with_node_id waiting ON (unique_global_wait_edges.waiting_global_pid = waiting.global_pid)
JOIN JOIN
citus_dist_stat_activity_with_node_id blocking ON (unique_global_wait_edges.blocking_transaction_num = blocking.transaction_number AND unique_global_wait_edges.blocking_node_id = blocking.initiator_node_id); citus_dist_stat_activity_with_node_id blocking ON (unique_global_wait_edges.blocking_global_pid = blocking.global_pid);
ALTER VIEW citus.citus_lock_waits SET SCHEMA pg_catalog; ALTER VIEW citus.citus_lock_waits SET SCHEMA pg_catalog;
GRANT SELECT ON pg_catalog.citus_lock_waits TO PUBLIC; GRANT SELECT ON pg_catalog.citus_lock_waits TO PUBLIC;

View File

@ -50,7 +50,10 @@ get_adjacency_list_wait_graph(PG_FUNCTION_ARGS)
bool isNulls[2]; bool isNulls[2];
Tuplestorestate *tupleStore = SetupTuplestore(fcinfo, &tupleDescriptor); Tuplestorestate *tupleStore = SetupTuplestore(fcinfo, &tupleDescriptor);
WaitGraph *waitGraph = BuildGlobalWaitGraph();
/* distributed deadlock detection only considers distributed txs */
bool onlyDistributedTx = true;
WaitGraph *waitGraph = BuildGlobalWaitGraph(onlyDistributedTx);
HTAB *adjacencyList = BuildAdjacencyListsForWaitGraph(waitGraph); HTAB *adjacencyList = BuildAdjacencyListsForWaitGraph(waitGraph);
/* iterate on all nodes */ /* iterate on all nodes */

View File

@ -119,7 +119,9 @@ CheckForDistributedDeadlocks(void)
return false; return false;
} }
WaitGraph *waitGraph = BuildGlobalWaitGraph(); /* distributed deadlock detection only considers distributed txs */
bool onlyDistributedTx = true;
WaitGraph *waitGraph = BuildGlobalWaitGraph(onlyDistributedTx);
HTAB *adjacencyLists = BuildAdjacencyListsForWaitGraph(waitGraph); HTAB *adjacencyLists = BuildAdjacencyListsForWaitGraph(waitGraph);
int edgeCount = waitGraph->edgeCount; int edgeCount = waitGraph->edgeCount;

View File

@ -47,7 +47,10 @@ typedef struct PROCStack
static void AddWaitEdgeFromResult(WaitGraph *waitGraph, PGresult *result, int rowIndex); static void AddWaitEdgeFromResult(WaitGraph *waitGraph, PGresult *result, int rowIndex);
static void ReturnWaitGraph(WaitGraph *waitGraph, FunctionCallInfo fcinfo); static void ReturnWaitGraph(WaitGraph *waitGraph, FunctionCallInfo fcinfo);
static WaitGraph * BuildLocalWaitGraph(void); static void AddWaitEdgeFromBlockedProcessResult(WaitGraph *waitGraph, PGresult *result,
int rowIndex);
static void ReturnBlockedProcessGraph(WaitGraph *waitGraph, FunctionCallInfo fcinfo);
static WaitGraph * BuildLocalWaitGraph(bool onlyDistributedTx);
static bool IsProcessWaitingForSafeOperations(PGPROC *proc); static bool IsProcessWaitingForSafeOperations(PGPROC *proc);
static void LockLockData(void); static void LockLockData(void);
static void UnlockLockData(void); static void UnlockLockData(void);
@ -62,10 +65,30 @@ static void AddProcToVisit(PROCStack *remaining, PGPROC *proc);
static bool IsSameLockGroup(PGPROC *leftProc, PGPROC *rightProc); static bool IsSameLockGroup(PGPROC *leftProc, PGPROC *rightProc);
static bool IsConflictingLockMask(int holdMask, int conflictMask); static bool IsConflictingLockMask(int holdMask, int conflictMask);
/*
* We almost have 2 sets of identical functions. The first set (e.g., dump_wait_edges)
* functions are intended for distributed deadlock detection purposes.
*
* The second set of functions (e.g., citus_internal_local_blocked_processes) are
* intended for citus_lock_waits view.
*
* The main difference is that the former functions only show processes that are blocked
* inside a distributed transaction (e.g., see AssignDistributedTransactionId()).
* The latter functions return a superset, where any blocked process is returned.
*
* We kept two different set of functions for two purposes. First, the deadlock detection
* is a performance critical code-path happening very frequently and we don't add any
* performance overhead. Secondly, to be able to do rolling upgrades, we cannot change
* the API of dump_global_wait_edges/dump_local_wait_edges such that they take a boolean
* parameter. If we do that, until all nodes are upgraded, the deadlock detection would fail,
* which is not acceptable.
*/
PG_FUNCTION_INFO_V1(dump_local_wait_edges); PG_FUNCTION_INFO_V1(dump_local_wait_edges);
PG_FUNCTION_INFO_V1(dump_global_wait_edges); PG_FUNCTION_INFO_V1(dump_global_wait_edges);
PG_FUNCTION_INFO_V1(citus_internal_local_blocked_processes);
PG_FUNCTION_INFO_V1(citus_internal_global_blocked_processes);
/* /*
* dump_global_wait_edges returns global wait edges for distributed transactions * dump_global_wait_edges returns global wait edges for distributed transactions
@ -74,7 +97,9 @@ PG_FUNCTION_INFO_V1(dump_global_wait_edges);
Datum Datum
dump_global_wait_edges(PG_FUNCTION_ARGS) dump_global_wait_edges(PG_FUNCTION_ARGS)
{ {
WaitGraph *waitGraph = BuildGlobalWaitGraph(); bool onlyDistributedTx = true;
WaitGraph *waitGraph = BuildGlobalWaitGraph(onlyDistributedTx);
ReturnWaitGraph(waitGraph, fcinfo); ReturnWaitGraph(waitGraph, fcinfo);
@ -82,20 +107,44 @@ dump_global_wait_edges(PG_FUNCTION_ARGS)
} }
/*
* citus_internal_global_blocked_processes returns global wait edges
* including all processes running on the cluster.
*/
Datum
citus_internal_global_blocked_processes(PG_FUNCTION_ARGS)
{
bool onlyDistributedTx = false;
WaitGraph *waitGraph = BuildGlobalWaitGraph(onlyDistributedTx);
ReturnBlockedProcessGraph(waitGraph, fcinfo);
return (Datum) 0;
}
/* /*
* BuildGlobalWaitGraph builds a wait graph for distributed transactions * BuildGlobalWaitGraph builds a wait graph for distributed transactions
* that originate from this node, including edges from all (other) worker * that originate from this node, including edges from all (other) worker
* nodes. * nodes.
*
*
* If onlyDistributedTx is true, we only return distributed transactions
* (e.g., AssignDistributedTransaction() or assign_distributed_transactions())
* has been called for the process. Distributed deadlock detection only
* interested in these processes.
*/ */
WaitGraph * WaitGraph *
BuildGlobalWaitGraph(void) BuildGlobalWaitGraph(bool onlyDistributedTx)
{ {
List *workerNodeList = ActiveReadableNodeList(); List *workerNodeList = ActiveReadableNodeList();
char *nodeUser = CitusExtensionOwnerName(); char *nodeUser = CitusExtensionOwnerName();
List *connectionList = NIL; List *connectionList = NIL;
int32 localGroupId = GetLocalGroupId(); int32 localGroupId = GetLocalGroupId();
WaitGraph *waitGraph = BuildLocalWaitGraph(); /* deadlock detection is only interested in distributed transactions */
WaitGraph *waitGraph = BuildLocalWaitGraph(onlyDistributedTx);
/* open connections in parallel */ /* open connections in parallel */
WorkerNode *workerNode = NULL; WorkerNode *workerNode = NULL;
@ -124,9 +173,28 @@ BuildGlobalWaitGraph(void)
MultiConnection *connection = NULL; MultiConnection *connection = NULL;
foreach_ptr(connection, connectionList) foreach_ptr(connection, connectionList)
{ {
const char *command = "SELECT * FROM dump_local_wait_edges()"; StringInfo queryString = makeStringInfo();
int querySent = SendRemoteCommand(connection, command); if (onlyDistributedTx)
{
appendStringInfo(queryString,
"SELECT waiting_pid, waiting_node_id, "
"waiting_transaction_num, waiting_transaction_stamp, "
"blocking_pid, blocking_node_id, blocking_transaction_num, "
"blocking_transaction_stamp, blocking_transaction_waiting "
"FROM dump_local_wait_edges()");
}
else
{
appendStringInfo(queryString,
"SELECT waiting_global_pid, waiting_pid, "
"waiting_node_id, waiting_transaction_num, waiting_transaction_stamp, "
"blocking_global_pid,blocking_pid, blocking_node_id, "
"blocking_transaction_num, blocking_transaction_stamp, blocking_transaction_waiting "
"FROM citus_internal_local_blocked_processes()");
}
int querySent = SendRemoteCommand(connection, queryString->data);
if (querySent == 0) if (querySent == 0)
{ {
ReportConnectionError(connection, WARNING); ReportConnectionError(connection, WARNING);
@ -148,17 +216,30 @@ BuildGlobalWaitGraph(void)
int64 rowCount = PQntuples(result); int64 rowCount = PQntuples(result);
int64 colCount = PQnfields(result); int64 colCount = PQnfields(result);
if (colCount != 9) if (onlyDistributedTx && colCount != 9)
{ {
ereport(WARNING, (errmsg("unexpected number of columns from " ereport(WARNING, (errmsg("unexpected number of columns from "
"dump_local_wait_edges"))); "dump_local_wait_edges")));
continue; continue;
} }
else if (!onlyDistributedTx && colCount != 11)
{
ereport(WARNING, (errmsg("unexpected number of columns from "
"citus_internal_local_blocked_processes")));
continue;
}
for (int64 rowIndex = 0; rowIndex < rowCount; rowIndex++) for (int64 rowIndex = 0; rowIndex < rowCount; rowIndex++)
{
if (onlyDistributedTx)
{ {
AddWaitEdgeFromResult(waitGraph, result, rowIndex); AddWaitEdgeFromResult(waitGraph, result, rowIndex);
} }
else
{
AddWaitEdgeFromBlockedProcessResult(waitGraph, result, rowIndex);
}
}
PQclear(result); PQclear(result);
ForgetResults(connection); ForgetResults(connection);
@ -177,10 +258,12 @@ AddWaitEdgeFromResult(WaitGraph *waitGraph, PGresult *result, int rowIndex)
{ {
WaitEdge *waitEdge = AllocWaitEdge(waitGraph); WaitEdge *waitEdge = AllocWaitEdge(waitGraph);
waitEdge->waitingGPid = 0; /* not requested for deadlock detection */
waitEdge->waitingPid = ParseIntField(result, rowIndex, 0); waitEdge->waitingPid = ParseIntField(result, rowIndex, 0);
waitEdge->waitingNodeId = ParseIntField(result, rowIndex, 1); waitEdge->waitingNodeId = ParseIntField(result, rowIndex, 1);
waitEdge->waitingTransactionNum = ParseIntField(result, rowIndex, 2); waitEdge->waitingTransactionNum = ParseIntField(result, rowIndex, 2);
waitEdge->waitingTransactionStamp = ParseTimestampTzField(result, rowIndex, 3); waitEdge->waitingTransactionStamp = ParseTimestampTzField(result, rowIndex, 3);
waitEdge->blockingGPid = 0; /* not requested for deadlock detection */
waitEdge->blockingPid = ParseIntField(result, rowIndex, 4); waitEdge->blockingPid = ParseIntField(result, rowIndex, 4);
waitEdge->blockingNodeId = ParseIntField(result, rowIndex, 5); waitEdge->blockingNodeId = ParseIntField(result, rowIndex, 5);
waitEdge->blockingTransactionNum = ParseIntField(result, rowIndex, 6); waitEdge->blockingTransactionNum = ParseIntField(result, rowIndex, 6);
@ -189,6 +272,29 @@ AddWaitEdgeFromResult(WaitGraph *waitGraph, PGresult *result, int rowIndex)
} }
/*
* AddWaitEdgeFromBlockedProcessResult adds an edge to the wait graph that
* is read from a PGresult.
*/
static void
AddWaitEdgeFromBlockedProcessResult(WaitGraph *waitGraph, PGresult *result, int rowIndex)
{
WaitEdge *waitEdge = AllocWaitEdge(waitGraph);
waitEdge->waitingGPid = ParseIntField(result, rowIndex, 0);
waitEdge->waitingPid = ParseIntField(result, rowIndex, 1);
waitEdge->waitingNodeId = ParseIntField(result, rowIndex, 2);
waitEdge->waitingTransactionNum = ParseIntField(result, rowIndex, 3);
waitEdge->waitingTransactionStamp = ParseTimestampTzField(result, rowIndex, 4);
waitEdge->blockingGPid = ParseIntField(result, rowIndex, 5);
waitEdge->blockingPid = ParseIntField(result, rowIndex, 6);
waitEdge->blockingNodeId = ParseIntField(result, rowIndex, 7);
waitEdge->blockingTransactionNum = ParseIntField(result, rowIndex, 8);
waitEdge->blockingTransactionStamp = ParseTimestampTzField(result, rowIndex, 9);
waitEdge->isBlockingXactWaiting = ParseBoolField(result, rowIndex, 10);
}
/* /*
* ParseIntField parses a int64 from a remote result or returns 0 if the * ParseIntField parses a int64 from a remote result or returns 0 if the
* result is NULL. * result is NULL.
@ -256,13 +362,31 @@ ParseTimestampTzField(PGresult *result, int rowIndex, int colIndex)
Datum Datum
dump_local_wait_edges(PG_FUNCTION_ARGS) dump_local_wait_edges(PG_FUNCTION_ARGS)
{ {
WaitGraph *waitGraph = BuildLocalWaitGraph(); bool onlyDistributedTx = true;
WaitGraph *waitGraph = BuildLocalWaitGraph(onlyDistributedTx);
ReturnWaitGraph(waitGraph, fcinfo); ReturnWaitGraph(waitGraph, fcinfo);
return (Datum) 0; return (Datum) 0;
} }
/*
* citus_internal_local_blocked_processes returns global wait edges
* including all processes running on the node.
*/
Datum
citus_internal_local_blocked_processes(PG_FUNCTION_ARGS)
{
bool onlyDistributedTx = false;
WaitGraph *waitGraph = BuildLocalWaitGraph(onlyDistributedTx);
ReturnBlockedProcessGraph(waitGraph, fcinfo);
return (Datum) 0;
}
/* /*
* ReturnWaitGraph returns a wait graph for a set returning function. * ReturnWaitGraph returns a wait graph for a set returning function.
*/ */
@ -325,12 +449,83 @@ ReturnWaitGraph(WaitGraph *waitGraph, FunctionCallInfo fcinfo)
} }
/*
* ReturnBlockedProcessGraph returns a wait graph for a set returning function.
*/
static void
ReturnBlockedProcessGraph(WaitGraph *waitGraph, FunctionCallInfo fcinfo)
{
TupleDesc tupleDesc;
Tuplestorestate *tupleStore = SetupTuplestore(fcinfo, &tupleDesc);
/*
* Columns:
* 00: waiting_global_pid
* 01: waiting_pid
* 02: waiting_node_id
* 03: waiting_transaction_num
* 04: waiting_transaction_stamp
* 05: blocking_global_pid
* 06: blocking_pid
* 07: blocking__node_id
* 08: blocking_transaction_num
* 09: blocking_transaction_stamp
* 10: blocking_transaction_waiting
*/
for (size_t curEdgeNum = 0; curEdgeNum < waitGraph->edgeCount; curEdgeNum++)
{
Datum values[11];
bool nulls[11];
WaitEdge *curEdge = &waitGraph->edges[curEdgeNum];
memset(values, 0, sizeof(values));
memset(nulls, 0, sizeof(nulls));
values[0] = UInt64GetDatum(curEdge->waitingGPid);
values[1] = Int32GetDatum(curEdge->waitingPid);
values[2] = Int32GetDatum(curEdge->waitingNodeId);
if (curEdge->waitingTransactionNum != 0)
{
values[3] = Int64GetDatum(curEdge->waitingTransactionNum);
values[4] = TimestampTzGetDatum(curEdge->waitingTransactionStamp);
}
else
{
nulls[3] = true;
nulls[4] = true;
}
values[5] = UInt64GetDatum(curEdge->blockingGPid);
values[6] = Int32GetDatum(curEdge->blockingPid);
values[7] = Int32GetDatum(curEdge->blockingNodeId);
if (curEdge->blockingTransactionNum != 0)
{
values[8] = Int64GetDatum(curEdge->blockingTransactionNum);
values[9] = TimestampTzGetDatum(curEdge->blockingTransactionStamp);
}
else
{
nulls[8] = true;
nulls[9] = true;
}
values[10] = BoolGetDatum(curEdge->isBlockingXactWaiting);
tuplestore_putvalues(tupleStore, tupleDesc, values, nulls);
}
}
/* /*
* BuildLocalWaitGraph builds a wait graph for distributed transactions * BuildLocalWaitGraph builds a wait graph for distributed transactions
* that originate from the local node. * that originate from the local node.
*
* If onlyDistributedTx is true, we only return distributed transactions
* (e.g., AssignDistributedTransaction() or assign_distributed_transactions())
* has been called for the process. Distributed deadlock detection only
* interested in these processes.
*/ */
static WaitGraph * static WaitGraph *
BuildLocalWaitGraph(void) BuildLocalWaitGraph(bool onlyDistributedTx)
{ {
PROCStack remaining; PROCStack remaining;
int totalProcs = TotalProcCount(); int totalProcs = TotalProcCount();
@ -379,7 +574,8 @@ BuildLocalWaitGraph(void)
* care about distributed transactions for the purpose of distributed * care about distributed transactions for the purpose of distributed
* deadlock detection. * deadlock detection.
*/ */
if (!IsInDistributedTransaction(&currentBackendData)) if (onlyDistributedTx &&
!IsInDistributedTransaction(&currentBackendData))
{ {
continue; continue;
} }
@ -627,6 +823,7 @@ AddWaitEdge(WaitGraph *waitGraph, PGPROC *waitingProc, PGPROC *blockingProc,
} }
curEdge->waitingPid = waitingProc->pid; curEdge->waitingPid = waitingProc->pid;
curEdge->waitingGPid = waitingBackendData.globalPID;
if (IsInDistributedTransaction(&waitingBackendData)) if (IsInDistributedTransaction(&waitingBackendData))
{ {
@ -645,6 +842,7 @@ AddWaitEdge(WaitGraph *waitGraph, PGPROC *waitingProc, PGPROC *blockingProc,
} }
curEdge->blockingPid = blockingProc->pid; curEdge->blockingPid = blockingProc->pid;
curEdge->blockingGPid = blockingBackendData.globalPID;
if (IsInDistributedTransaction(&blockingBackendData)) if (IsInDistributedTransaction(&blockingBackendData))
{ {

View File

@ -31,11 +31,13 @@
*/ */
typedef struct WaitEdge typedef struct WaitEdge
{ {
uint64 waitingGPid;
int waitingPid; int waitingPid;
int waitingNodeId; int waitingNodeId;
int64 waitingTransactionNum; int64 waitingTransactionNum;
TimestampTz waitingTransactionStamp; TimestampTz waitingTransactionStamp;
uint64 blockingGPid;
int blockingPid; int blockingPid;
int blockingNodeId; int blockingNodeId;
int64 blockingTransactionNum; int64 blockingTransactionNum;
@ -58,7 +60,7 @@ typedef struct WaitGraph
} WaitGraph; } WaitGraph;
extern WaitGraph * BuildGlobalWaitGraph(void); extern WaitGraph * BuildGlobalWaitGraph(bool onlyDistributedTx);
extern bool IsProcessWaitingForLock(PGPROC *proc); extern bool IsProcessWaitingForLock(PGPROC *proc);
extern bool IsInDistributedTransaction(BackendData *backendData); extern bool IsInDistributedTransaction(BackendData *backendData);
extern TimestampTz ParseTimestampTzField(PGresult *result, int rowIndex, int colIndex); extern TimestampTz ParseTimestampTzField(PGresult *result, int rowIndex, int colIndex);

View File

@ -27,7 +27,7 @@ step s2-update-ref-table:
SELECT run_commands_on_session_level_connection_to_node('UPDATE ref_table SET value_1 = 12 WHERE user_id = 1'); SELECT run_commands_on_session_level_connection_to_node('UPDATE ref_table SET value_1 = 12 WHERE user_id = 1');
<waiting ...> <waiting ...>
step s3-select-distributed-waiting-queries: step s3-select-distributed-waiting-queries:
SELECT blocked_statement, current_statement_in_blocking_process, waiting_node_name, blocking_node_name, waiting_node_port, blocking_node_port FROM citus_lock_waits; SELECT blocked_statement, current_statement_in_blocking_process, waiting_node_name, blocking_node_name, waiting_node_port, blocking_node_port FROM citus_lock_waits WHERE blocked_statement NOT ILIKE '%run_commands_on_session_level_connection_to_node%' AND current_statement_in_blocking_process NOT ILIKE '%run_commands_on_session_level_connection_to_node%';
blocked_statement |current_statement_in_blocking_process|waiting_node_name|blocking_node_name|waiting_node_port|blocking_node_port blocked_statement |current_statement_in_blocking_process|waiting_node_name|blocking_node_name|waiting_node_port|blocking_node_port
--------------------------------------------------------------------- ---------------------------------------------------------------------
@ -112,7 +112,7 @@ step s2-update-ref-table:
SELECT run_commands_on_session_level_connection_to_node('UPDATE ref_table SET value_1 = 12 WHERE user_id = 1'); SELECT run_commands_on_session_level_connection_to_node('UPDATE ref_table SET value_1 = 12 WHERE user_id = 1');
<waiting ...> <waiting ...>
step s3-select-distributed-waiting-queries: step s3-select-distributed-waiting-queries:
SELECT blocked_statement, current_statement_in_blocking_process, waiting_node_name, blocking_node_name, waiting_node_port, blocking_node_port FROM citus_lock_waits; SELECT blocked_statement, current_statement_in_blocking_process, waiting_node_name, blocking_node_name, waiting_node_port, blocking_node_port FROM citus_lock_waits WHERE blocked_statement NOT ILIKE '%run_commands_on_session_level_connection_to_node%' AND current_statement_in_blocking_process NOT ILIKE '%run_commands_on_session_level_connection_to_node%';
blocked_statement |current_statement_in_blocking_process |waiting_node_name|blocking_node_name|waiting_node_port|blocking_node_port blocked_statement |current_statement_in_blocking_process |waiting_node_name|blocking_node_name|waiting_node_port|blocking_node_port
--------------------------------------------------------------------- ---------------------------------------------------------------------
@ -208,7 +208,7 @@ step s2-update-dist-table:
SELECT run_commands_on_session_level_connection_to_node('UPDATE tt1 SET value_1 = 5'); SELECT run_commands_on_session_level_connection_to_node('UPDATE tt1 SET value_1 = 5');
<waiting ...> <waiting ...>
step s3-select-distributed-waiting-queries: step s3-select-distributed-waiting-queries:
SELECT blocked_statement, current_statement_in_blocking_process, waiting_node_name, blocking_node_name, waiting_node_port, blocking_node_port FROM citus_lock_waits; SELECT blocked_statement, current_statement_in_blocking_process, waiting_node_name, blocking_node_name, waiting_node_port, blocking_node_port FROM citus_lock_waits WHERE blocked_statement NOT ILIKE '%run_commands_on_session_level_connection_to_node%' AND current_statement_in_blocking_process NOT ILIKE '%run_commands_on_session_level_connection_to_node%';
blocked_statement |current_statement_in_blocking_process|waiting_node_name|blocking_node_name|waiting_node_port|blocking_node_port blocked_statement |current_statement_in_blocking_process|waiting_node_name|blocking_node_name|waiting_node_port|blocking_node_port
--------------------------------------------------------------------- ---------------------------------------------------------------------
@ -304,7 +304,7 @@ step s2-update-ref-table:
SELECT run_commands_on_session_level_connection_to_node('UPDATE ref_table SET value_1 = 12 WHERE user_id = 1'); SELECT run_commands_on_session_level_connection_to_node('UPDATE ref_table SET value_1 = 12 WHERE user_id = 1');
<waiting ...> <waiting ...>
step s3-select-distributed-waiting-queries: step s3-select-distributed-waiting-queries:
SELECT blocked_statement, current_statement_in_blocking_process, waiting_node_name, blocking_node_name, waiting_node_port, blocking_node_port FROM citus_lock_waits; SELECT blocked_statement, current_statement_in_blocking_process, waiting_node_name, blocking_node_name, waiting_node_port, blocking_node_port FROM citus_lock_waits WHERE blocked_statement NOT ILIKE '%run_commands_on_session_level_connection_to_node%' AND current_statement_in_blocking_process NOT ILIKE '%run_commands_on_session_level_connection_to_node%';
blocked_statement |current_statement_in_blocking_process |waiting_node_name|blocking_node_name|waiting_node_port|blocking_node_port blocked_statement |current_statement_in_blocking_process |waiting_node_name|blocking_node_name|waiting_node_port|blocking_node_port
--------------------------------------------------------------------- ---------------------------------------------------------------------
@ -400,7 +400,7 @@ step s2-update-ref-table:
SELECT run_commands_on_session_level_connection_to_node('UPDATE ref_table SET value_1 = 12 WHERE user_id = 1'); SELECT run_commands_on_session_level_connection_to_node('UPDATE ref_table SET value_1 = 12 WHERE user_id = 1');
<waiting ...> <waiting ...>
step s3-select-distributed-waiting-queries: step s3-select-distributed-waiting-queries:
SELECT blocked_statement, current_statement_in_blocking_process, waiting_node_name, blocking_node_name, waiting_node_port, blocking_node_port FROM citus_lock_waits; SELECT blocked_statement, current_statement_in_blocking_process, waiting_node_name, blocking_node_name, waiting_node_port, blocking_node_port FROM citus_lock_waits WHERE blocked_statement NOT ILIKE '%run_commands_on_session_level_connection_to_node%' AND current_statement_in_blocking_process NOT ILIKE '%run_commands_on_session_level_connection_to_node%';
blocked_statement |current_statement_in_blocking_process |waiting_node_name|blocking_node_name|waiting_node_port|blocking_node_port blocked_statement |current_statement_in_blocking_process |waiting_node_name|blocking_node_name|waiting_node_port|blocking_node_port
--------------------------------------------------------------------- ---------------------------------------------------------------------
@ -501,7 +501,7 @@ run_commands_on_session_level_connection_to_node
(1 row) (1 row)
step s3-select-distributed-waiting-queries: step s3-select-distributed-waiting-queries:
SELECT blocked_statement, current_statement_in_blocking_process, waiting_node_name, blocking_node_name, waiting_node_port, blocking_node_port FROM citus_lock_waits; SELECT blocked_statement, current_statement_in_blocking_process, waiting_node_name, blocking_node_name, waiting_node_port, blocking_node_port FROM citus_lock_waits WHERE blocked_statement NOT ILIKE '%run_commands_on_session_level_connection_to_node%' AND current_statement_in_blocking_process NOT ILIKE '%run_commands_on_session_level_connection_to_node%';
blocked_statement|current_statement_in_blocking_process|waiting_node_name|blocking_node_name|waiting_node_port|blocking_node_port blocked_statement|current_statement_in_blocking_process|waiting_node_name|blocking_node_name|waiting_node_port|blocking_node_port
--------------------------------------------------------------------- ---------------------------------------------------------------------
@ -590,7 +590,7 @@ step s2-update-ref-table:
SELECT run_commands_on_session_level_connection_to_node('UPDATE ref_table SET value_1 = 12 WHERE user_id = 1'); SELECT run_commands_on_session_level_connection_to_node('UPDATE ref_table SET value_1 = 12 WHERE user_id = 1');
<waiting ...> <waiting ...>
step s3-select-distributed-waiting-queries: step s3-select-distributed-waiting-queries:
SELECT blocked_statement, current_statement_in_blocking_process, waiting_node_name, blocking_node_name, waiting_node_port, blocking_node_port FROM citus_lock_waits; SELECT blocked_statement, current_statement_in_blocking_process, waiting_node_name, blocking_node_name, waiting_node_port, blocking_node_port FROM citus_lock_waits WHERE blocked_statement NOT ILIKE '%run_commands_on_session_level_connection_to_node%' AND current_statement_in_blocking_process NOT ILIKE '%run_commands_on_session_level_connection_to_node%';
blocked_statement |current_statement_in_blocking_process |waiting_node_name|blocking_node_name|waiting_node_port|blocking_node_port blocked_statement |current_statement_in_blocking_process |waiting_node_name|blocking_node_name|waiting_node_port|blocking_node_port
--------------------------------------------------------------------- ---------------------------------------------------------------------
@ -691,7 +691,7 @@ run_commands_on_session_level_connection_to_node
(1 row) (1 row)
step s3-select-distributed-waiting-queries: step s3-select-distributed-waiting-queries:
SELECT blocked_statement, current_statement_in_blocking_process, waiting_node_name, blocking_node_name, waiting_node_port, blocking_node_port FROM citus_lock_waits; SELECT blocked_statement, current_statement_in_blocking_process, waiting_node_name, blocking_node_name, waiting_node_port, blocking_node_port FROM citus_lock_waits WHERE blocked_statement NOT ILIKE '%run_commands_on_session_level_connection_to_node%' AND current_statement_in_blocking_process NOT ILIKE '%run_commands_on_session_level_connection_to_node%';
blocked_statement|current_statement_in_blocking_process|waiting_node_name|blocking_node_name|waiting_node_port|blocking_node_port blocked_statement|current_statement_in_blocking_process|waiting_node_name|blocking_node_name|waiting_node_port|blocking_node_port
--------------------------------------------------------------------- ---------------------------------------------------------------------
@ -785,7 +785,7 @@ run_commands_on_session_level_connection_to_node
(1 row) (1 row)
step s3-select-distributed-waiting-queries: step s3-select-distributed-waiting-queries:
SELECT blocked_statement, current_statement_in_blocking_process, waiting_node_name, blocking_node_name, waiting_node_port, blocking_node_port FROM citus_lock_waits; SELECT blocked_statement, current_statement_in_blocking_process, waiting_node_name, blocking_node_name, waiting_node_port, blocking_node_port FROM citus_lock_waits WHERE blocked_statement NOT ILIKE '%run_commands_on_session_level_connection_to_node%' AND current_statement_in_blocking_process NOT ILIKE '%run_commands_on_session_level_connection_to_node%';
blocked_statement|current_statement_in_blocking_process|waiting_node_name|blocking_node_name|waiting_node_port|blocking_node_port blocked_statement|current_statement_in_blocking_process|waiting_node_name|blocking_node_name|waiting_node_port|blocking_node_port
--------------------------------------------------------------------- ---------------------------------------------------------------------
@ -874,7 +874,7 @@ step s2-update-ref-table:
SELECT run_commands_on_session_level_connection_to_node('UPDATE ref_table SET value_1 = 12 WHERE user_id = 1'); SELECT run_commands_on_session_level_connection_to_node('UPDATE ref_table SET value_1 = 12 WHERE user_id = 1');
<waiting ...> <waiting ...>
step s3-select-distributed-waiting-queries: step s3-select-distributed-waiting-queries:
SELECT blocked_statement, current_statement_in_blocking_process, waiting_node_name, blocking_node_name, waiting_node_port, blocking_node_port FROM citus_lock_waits; SELECT blocked_statement, current_statement_in_blocking_process, waiting_node_name, blocking_node_name, waiting_node_port, blocking_node_port FROM citus_lock_waits WHERE blocked_statement NOT ILIKE '%run_commands_on_session_level_connection_to_node%' AND current_statement_in_blocking_process NOT ILIKE '%run_commands_on_session_level_connection_to_node%';
blocked_statement |current_statement_in_blocking_process|waiting_node_name|blocking_node_name|waiting_node_port|blocking_node_port blocked_statement |current_statement_in_blocking_process|waiting_node_name|blocking_node_name|waiting_node_port|blocking_node_port
--------------------------------------------------------------------- ---------------------------------------------------------------------
@ -957,7 +957,7 @@ step s1-alter-table:
ALTER TABLE ref_table ADD CONSTRAINT rf_p_key PRIMARY KEY(user_id); ALTER TABLE ref_table ADD CONSTRAINT rf_p_key PRIMARY KEY(user_id);
<waiting ...> <waiting ...>
step s3-select-distributed-waiting-queries: step s3-select-distributed-waiting-queries:
SELECT blocked_statement, current_statement_in_blocking_process, waiting_node_name, blocking_node_name, waiting_node_port, blocking_node_port FROM citus_lock_waits; SELECT blocked_statement, current_statement_in_blocking_process, waiting_node_name, blocking_node_name, waiting_node_port, blocking_node_port FROM citus_lock_waits WHERE blocked_statement NOT ILIKE '%run_commands_on_session_level_connection_to_node%' AND current_statement_in_blocking_process NOT ILIKE '%run_commands_on_session_level_connection_to_node%';
blocked_statement |current_statement_in_blocking_process |waiting_node_name|blocking_node_name|waiting_node_port|blocking_node_port blocked_statement |current_statement_in_blocking_process |waiting_node_name|blocking_node_name|waiting_node_port|blocking_node_port
--------------------------------------------------------------------- ---------------------------------------------------------------------
@ -1003,7 +1003,7 @@ step s2-update-on-the-coordinator:
UPDATE tt1 SET value_1 = 4; UPDATE tt1 SET value_1 = 4;
<waiting ...> <waiting ...>
step s3-select-distributed-waiting-queries: step s3-select-distributed-waiting-queries:
SELECT blocked_statement, current_statement_in_blocking_process, waiting_node_name, blocking_node_name, waiting_node_port, blocking_node_port FROM citus_lock_waits; SELECT blocked_statement, current_statement_in_blocking_process, waiting_node_name, blocking_node_name, waiting_node_port, blocking_node_port FROM citus_lock_waits WHERE blocked_statement NOT ILIKE '%run_commands_on_session_level_connection_to_node%' AND current_statement_in_blocking_process NOT ILIKE '%run_commands_on_session_level_connection_to_node%';
blocked_statement |current_statement_in_blocking_process|waiting_node_name|blocking_node_name|waiting_node_port|blocking_node_port blocked_statement |current_statement_in_blocking_process|waiting_node_name|blocking_node_name|waiting_node_port|blocking_node_port
--------------------------------------------------------------------- ---------------------------------------------------------------------
@ -1069,7 +1069,7 @@ step s4-update-dist-table:
SELECT run_commands_on_session_level_connection_to_node('UPDATE tt1 SET value_1 = 5'); SELECT run_commands_on_session_level_connection_to_node('UPDATE tt1 SET value_1 = 5');
<waiting ...> <waiting ...>
step s3-select-distributed-waiting-queries: step s3-select-distributed-waiting-queries:
SELECT blocked_statement, current_statement_in_blocking_process, waiting_node_name, blocking_node_name, waiting_node_port, blocking_node_port FROM citus_lock_waits; SELECT blocked_statement, current_statement_in_blocking_process, waiting_node_name, blocking_node_name, waiting_node_port, blocking_node_port FROM citus_lock_waits WHERE blocked_statement NOT ILIKE '%run_commands_on_session_level_connection_to_node%' AND current_statement_in_blocking_process NOT ILIKE '%run_commands_on_session_level_connection_to_node%';
blocked_statement |current_statement_in_blocking_process|waiting_node_name|blocking_node_name|waiting_node_port|blocking_node_port blocked_statement |current_statement_in_blocking_process|waiting_node_name|blocking_node_name|waiting_node_port|blocking_node_port
--------------------------------------------------------------------- ---------------------------------------------------------------------
@ -1119,3 +1119,134 @@ restore_isolation_tester_func
(1 row) (1 row)
starting permutation: s1-start-session-level-connection s1-begin-on-worker s1-update-dist-table-id-1 s2-start-session-level-connection s2-update-dist-table-id-1 s3-select-distributed-waiting-queries s1-commit-worker s1-stop-connection s2-stop-connection
step s1-start-session-level-connection:
SELECT start_session_level_connection_to_node('localhost', 57637);
start_session_level_connection_to_node
---------------------------------------------------------------------
(1 row)
step s1-begin-on-worker:
SELECT run_commands_on_session_level_connection_to_node('BEGIN');
run_commands_on_session_level_connection_to_node
---------------------------------------------------------------------
(1 row)
step s1-update-dist-table-id-1:
SELECT run_commands_on_session_level_connection_to_node('UPDATE tt1 SET value_1 = 4 WHERE user_id = 1');
run_commands_on_session_level_connection_to_node
---------------------------------------------------------------------
(1 row)
step s2-start-session-level-connection:
SELECT start_session_level_connection_to_node('localhost', 57638);
start_session_level_connection_to_node
---------------------------------------------------------------------
(1 row)
step s2-update-dist-table-id-1:
SELECT run_commands_on_session_level_connection_to_node('UPDATE tt1 SET value_1 = 4 WHERE user_id = 1');
<waiting ...>
step s3-select-distributed-waiting-queries:
SELECT blocked_statement, current_statement_in_blocking_process, waiting_node_name, blocking_node_name, waiting_node_port, blocking_node_port FROM citus_lock_waits WHERE blocked_statement NOT ILIKE '%run_commands_on_session_level_connection_to_node%' AND current_statement_in_blocking_process NOT ILIKE '%run_commands_on_session_level_connection_to_node%';
blocked_statement |current_statement_in_blocking_process |waiting_node_name|blocking_node_name|waiting_node_port|blocking_node_port
---------------------------------------------------------------------
UPDATE tt1 SET value_1 = 4 WHERE user_id = 1|UPDATE tt1 SET value_1 = 4 WHERE user_id = 1|localhost |localhost | 57638| 57637
(1 row)
step s1-commit-worker:
SELECT run_commands_on_session_level_connection_to_node('COMMIT');
run_commands_on_session_level_connection_to_node
---------------------------------------------------------------------
(1 row)
step s2-update-dist-table-id-1: <... completed>
run_commands_on_session_level_connection_to_node
---------------------------------------------------------------------
(1 row)
step s1-stop-connection:
SELECT stop_session_level_connection_to_node();
stop_session_level_connection_to_node
---------------------------------------------------------------------
(1 row)
step s2-stop-connection:
SELECT stop_session_level_connection_to_node();
stop_session_level_connection_to_node
---------------------------------------------------------------------
(1 row)
restore_isolation_tester_func
---------------------------------------------------------------------
(1 row)
starting permutation: s1-begin s1-update-ref-table-from-coordinator s2-start-session-level-connection s2-update-ref-table s3-select-distributed-waiting-queries s1-commit s2-stop-connection
step s1-begin:
BEGIN;
step s1-update-ref-table-from-coordinator:
UPDATE ref_table SET value_1 = 15;
step s2-start-session-level-connection:
SELECT start_session_level_connection_to_node('localhost', 57638);
start_session_level_connection_to_node
---------------------------------------------------------------------
(1 row)
step s2-update-ref-table:
SELECT run_commands_on_session_level_connection_to_node('UPDATE ref_table SET value_1 = 12 WHERE user_id = 1');
<waiting ...>
step s3-select-distributed-waiting-queries:
SELECT blocked_statement, current_statement_in_blocking_process, waiting_node_name, blocking_node_name, waiting_node_port, blocking_node_port FROM citus_lock_waits WHERE blocked_statement NOT ILIKE '%run_commands_on_session_level_connection_to_node%' AND current_statement_in_blocking_process NOT ILIKE '%run_commands_on_session_level_connection_to_node%';
blocked_statement |current_statement_in_blocking_process|waiting_node_name|blocking_node_name|waiting_node_port|blocking_node_port
---------------------------------------------------------------------
UPDATE ref_table SET value_1 = 12 WHERE user_id = 1|
UPDATE ref_table SET value_1 = 15;
|localhost |coordinator_host | 57638| 57636
(1 row)
step s1-commit:
COMMIT;
step s2-update-ref-table: <... completed>
run_commands_on_session_level_connection_to_node
---------------------------------------------------------------------
(1 row)
step s2-stop-connection:
SELECT stop_session_level_connection_to_node();
stop_session_level_connection_to_node
---------------------------------------------------------------------
(1 row)
restore_isolation_tester_func
---------------------------------------------------------------------
(1 row)

View File

@ -148,7 +148,8 @@ step s1-move-placement-back:
SET client_min_messages to NOTICE; SET client_min_messages to NOTICE;
SHOW log_error_verbosity; SHOW log_error_verbosity;
SELECT master_move_shard_placement((SELECT * FROM selected_shard), 'localhost', 57638, 'localhost', 57637); SELECT master_move_shard_placement((SELECT * FROM selected_shard), 'localhost', 57638, 'localhost', 57637);
<waiting ...>
step s1-move-placement-back: <... completed>
log_error_verbosity log_error_verbosity
--------------------------------------------------------------------- ---------------------------------------------------------------------
verbose verbose

View File

@ -101,13 +101,14 @@ step s2-view-worker:
FROM citus_worker_stat_activity FROM citus_worker_stat_activity
WHERE query NOT ILIKE '%pg_prepared_xacts%' AND WHERE query NOT ILIKE '%pg_prepared_xacts%' AND
query NOT ILIKE '%COMMIT%' AND query NOT ILIKE '%COMMIT%' AND
query NOT ILIKE '%dump_local_wait_edges%' query NOT ILIKE '%dump_local_%' AND
query NOT ILIKE '%citus_internal_local_blocked_processes%'
ORDER BY query, query_hostport DESC; ORDER BY query, query_hostport DESC;
query |query_hostname|query_hostport|distributed_query_host_name|distributed_query_host_port|state |wait_event_type|wait_event|usename |datname query |query_hostname|query_hostport|distributed_query_host_name|distributed_query_host_port|state |wait_event_type|wait_event|usename |datname
--------------------------------------------------------------------- ---------------------------------------------------------------------
UPDATE public.ref_table_1500767 ref_table SET a = (a OPERATOR(pg_catalog.+) 1)|localhost | 57638|coordinator_host | 57636|idle in transaction|Client |ClientRead|postgres|regression UPDATE public.ref_table_1500777 ref_table SET a = (a OPERATOR(pg_catalog.+) 1)|localhost | 57638|coordinator_host | 57636|idle in transaction|Client |ClientRead|postgres|regression
UPDATE public.ref_table_1500767 ref_table SET a = (a OPERATOR(pg_catalog.+) 1)|localhost | 57637|coordinator_host | 57636|idle in transaction|Client |ClientRead|postgres|regression UPDATE public.ref_table_1500777 ref_table SET a = (a OPERATOR(pg_catalog.+) 1)|localhost | 57637|coordinator_host | 57636|idle in transaction|Client |ClientRead|postgres|regression
(2 rows) (2 rows)
step s2-end: step s2-end:

View File

@ -1012,6 +1012,8 @@ SELECT * FROM multi_extension.print_extension_changes();
| function citus_check_connection_to_node(text,integer) boolean | function citus_check_connection_to_node(text,integer) boolean
| function citus_disable_node(text,integer,boolean) void | function citus_disable_node(text,integer,boolean) void
| function citus_internal_add_object_metadata(text,text[],text[],integer,integer,boolean) void | function citus_internal_add_object_metadata(text,text[],text[],integer,integer,boolean) void
| function citus_internal_global_blocked_processes() SETOF record
| function citus_internal_local_blocked_processes() SETOF record
| function citus_run_local_command(text) void | function citus_run_local_command(text) void
| function citus_shard_indexes_on_worker() SETOF record | function citus_shard_indexes_on_worker() SETOF record
| function citus_shards_on_worker() SETOF record | function citus_shards_on_worker() SETOF record
@ -1021,7 +1023,7 @@ SELECT * FROM multi_extension.print_extension_changes();
| function worker_create_or_replace_object(text[]) boolean | function worker_create_or_replace_object(text[]) boolean
| function worker_drop_sequence_dependency(text) void | function worker_drop_sequence_dependency(text) void
| function worker_drop_shell_table(text) void | function worker_drop_shell_table(text) void
(18 rows) (20 rows)
DROP TABLE multi_extension.prev_objects, multi_extension.extension_diff; DROP TABLE multi_extension.prev_objects, multi_extension.extension_diff;
-- show running version -- show running version

View File

@ -74,6 +74,8 @@ ORDER BY 1;
function citus_internal_add_placement_metadata(bigint,integer,bigint,integer,bigint) function citus_internal_add_placement_metadata(bigint,integer,bigint,integer,bigint)
function citus_internal_add_shard_metadata(regclass,bigint,"char",text,text) function citus_internal_add_shard_metadata(regclass,bigint,"char",text,text)
function citus_internal_delete_shard_metadata(bigint) function citus_internal_delete_shard_metadata(bigint)
function citus_internal_global_blocked_processes()
function citus_internal_local_blocked_processes()
function citus_internal_update_placement_metadata(bigint,integer,integer) function citus_internal_update_placement_metadata(bigint,integer,integer)
function citus_internal_update_relation_colocation(oid,integer) function citus_internal_update_relation_colocation(oid,integer)
function citus_isolation_test_session_is_blocked(integer,integer[]) function citus_isolation_test_session_is_blocked(integer,integer[])
@ -270,5 +272,5 @@ ORDER BY 1;
view citus_worker_stat_activity view citus_worker_stat_activity
view pg_dist_shard_placement view pg_dist_shard_placement
view time_partitions view time_partitions
(254 rows) (256 rows)

View File

@ -73,6 +73,11 @@ step "s1-select-for-update"
SELECT run_commands_on_session_level_connection_to_node('SELECT * FROM ref_table FOR UPDATE'); SELECT run_commands_on_session_level_connection_to_node('SELECT * FROM ref_table FOR UPDATE');
} }
step "s1-update-dist-table-id-1"
{
SELECT run_commands_on_session_level_connection_to_node('UPDATE tt1 SET value_1 = 4 WHERE user_id = 1');
}
step "s1-commit-worker" step "s1-commit-worker"
{ {
SELECT run_commands_on_session_level_connection_to_node('COMMIT'); SELECT run_commands_on_session_level_connection_to_node('COMMIT');
@ -115,6 +120,11 @@ step "s2-update-dist-table"
SELECT run_commands_on_session_level_connection_to_node('UPDATE tt1 SET value_1 = 5'); SELECT run_commands_on_session_level_connection_to_node('UPDATE tt1 SET value_1 = 5');
} }
step "s2-update-dist-table-id-1"
{
SELECT run_commands_on_session_level_connection_to_node('UPDATE tt1 SET value_1 = 4 WHERE user_id = 1');
}
step "s2-update-ref-table" step "s2-update-ref-table"
{ {
SELECT run_commands_on_session_level_connection_to_node('UPDATE ref_table SET value_1 = 12 WHERE user_id = 1'); SELECT run_commands_on_session_level_connection_to_node('UPDATE ref_table SET value_1 = 12 WHERE user_id = 1');
@ -149,7 +159,7 @@ session "s3"
step "s3-select-distributed-waiting-queries" step "s3-select-distributed-waiting-queries"
{ {
SELECT blocked_statement, current_statement_in_blocking_process, waiting_node_name, blocking_node_name, waiting_node_port, blocking_node_port FROM citus_lock_waits; SELECT blocked_statement, current_statement_in_blocking_process, waiting_node_name, blocking_node_name, waiting_node_port, blocking_node_port FROM citus_lock_waits WHERE blocked_statement NOT ILIKE '%run_commands_on_session_level_connection_to_node%' AND current_statement_in_blocking_process NOT ILIKE '%run_commands_on_session_level_connection_to_node%';
} }
// session s1 and s4 executes the commands on the same worker node // session s1 and s4 executes the commands on the same worker node
@ -196,3 +206,9 @@ permutation "s2-start-session-level-connection" "s2-begin-on-worker" "s2-insert-
// blocked on the same node // blocked on the same node
permutation "s1-begin" "s1-update-on-the-coordinator" "s2-update-on-the-coordinator" "s3-select-distributed-waiting-queries" "s1-commit" permutation "s1-begin" "s1-update-on-the-coordinator" "s2-update-on-the-coordinator" "s3-select-distributed-waiting-queries" "s1-commit"
permutation "s1-start-session-level-connection" "s1-begin-on-worker" "s1-update-dist-table" "s4-start-session-level-connection" "s4-begin-on-worker" "s4-update-dist-table" "s3-select-distributed-waiting-queries" "s1-commit-worker" "s4-commit-worker" "s1-stop-connection" "s4-stop-connection" permutation "s1-start-session-level-connection" "s1-begin-on-worker" "s1-update-dist-table" "s4-start-session-level-connection" "s4-begin-on-worker" "s4-update-dist-table" "s3-select-distributed-waiting-queries" "s1-commit-worker" "s4-commit-worker" "s1-stop-connection" "s4-stop-connection"
// show that even if the commands are not in a transaction block
// we can find the blocking relationship
permutation "s1-start-session-level-connection" "s1-begin-on-worker" "s1-update-dist-table-id-1" "s2-start-session-level-connection" "s2-update-dist-table-id-1" "s3-select-distributed-waiting-queries" "s1-commit-worker" "s1-stop-connection" "s2-stop-connection"
permutation "s1-begin" "s1-update-ref-table-from-coordinator" "s2-start-session-level-connection" "s2-update-ref-table" "s3-select-distributed-waiting-queries" "s1-commit" "s2-stop-connection"

View File

@ -93,7 +93,8 @@ step "s2-view-worker"
FROM citus_worker_stat_activity FROM citus_worker_stat_activity
WHERE query NOT ILIKE '%pg_prepared_xacts%' AND WHERE query NOT ILIKE '%pg_prepared_xacts%' AND
query NOT ILIKE '%COMMIT%' AND query NOT ILIKE '%COMMIT%' AND
query NOT ILIKE '%dump_local_wait_edges%' query NOT ILIKE '%dump_local_%' AND
query NOT ILIKE '%citus_internal_local_blocked_processes%'
ORDER BY query, query_hostport DESC; ORDER BY query, query_hostport DESC;
} }