Adjust the helper UDFs for isolation tests

And add new tests to show citus_lock_waits can detect non-tx commands as well
add_worker_query_again
Onder Kalaci 2022-02-10 10:00:35 +01:00
parent a46e8f29d1
commit 2c7040486f
8 changed files with 371 additions and 6 deletions

View File

@ -21,6 +21,8 @@
#include "udfs/dump_global_wait_edges/11.0-1.sql" #include "udfs/dump_global_wait_edges/11.0-1.sql"
#include "udfs/citus_worker_stat_activity/11.0-1.sql" #include "udfs/citus_worker_stat_activity/11.0-1.sql"
#include "udfs/citus_isolation_test_session_is_blocked/11.0-1.sql"
#include "udfs/citus_blocking_pids/11.0-1.sql"
CREATE VIEW citus.citus_worker_stat_activity AS CREATE VIEW citus.citus_worker_stat_activity AS
SELECT * FROM pg_catalog.citus_worker_stat_activity(); SELECT * FROM pg_catalog.citus_worker_stat_activity();

View File

@ -242,4 +242,96 @@ SELECT * FROM pg_catalog.citus_worker_stat_activity();
ALTER VIEW citus.citus_worker_stat_activity SET SCHEMA pg_catalog; ALTER VIEW citus.citus_worker_stat_activity SET SCHEMA pg_catalog;
GRANT SELECT ON pg_catalog.citus_worker_stat_activity TO PUBLIC; GRANT SELECT ON pg_catalog.citus_worker_stat_activity TO PUBLIC;
DROP FUNCTION pg_catalog.citus_isolation_test_session_is_blocked(pBlockedPid integer, pInterestingPids integer[]);
-- citus_isolation_test_session_is_blocked reports whether the session with
-- process id pBlockedPid is blocked.
--
-- It first delegates to old_pg_isolation_test_session_is_blocked for the local
-- check. When that reports "not blocked", it looks for a wait edge in
-- dump_global_wait_edges() for the session's transaction number, and for a
-- worker backend whose application name is
-- 'citus_shard_move_subscription_<pBlockedPid>' waiting on an ungranted lock.
--
--   pBlockedPid      - process id of the session that may be blocked
--   pInterestingPids - passed through to old_pg_isolation_test_session_is_blocked
--
-- Returns true when any of the checks finds the session waiting.
CREATE FUNCTION pg_catalog.citus_isolation_test_session_is_blocked(pBlockedPid integer, pInterestingPids integer[])
RETURNS boolean AS $$
  DECLARE
    mBlockedTransactionNum int8;
    workerProcessId integer := current_setting('citus.isolation_test_session_remote_process_id');
    coordinatorProcessId integer := current_setting('citus.isolation_test_session_process_id');
  BEGIN
    IF pg_catalog.old_pg_isolation_test_session_is_blocked(pBlockedPid, pInterestingPids) THEN
      RETURN true;
    END IF;

    -- pg says we're not blocked locally; check whether we're blocked globally.
    -- Note that the worker process may be blocked or waiting for a lock, so we
    -- need the transaction number for either case. The first lookup yields the
    -- transaction number when the worker process is waiting for another session.
    -- It is run only once: FOUND tells us whether it matched, so the result
    -- cannot change between an existence check and the actual read.
    SELECT transaction_number INTO mBlockedTransactionNum
    FROM get_global_active_transactions()
    WHERE process_id = workerProcessId AND pBlockedPid = coordinatorProcessId;

    IF NOT FOUND THEN
      -- Check whether transactions initiated from the coordinator get locked
      SELECT transaction_number INTO mBlockedTransactionNum
      FROM get_all_active_transactions()
      WHERE process_id = pBlockedPid;
    END IF;

    RETURN EXISTS (
      SELECT 1 FROM dump_global_wait_edges()
      WHERE waiting_transaction_num = mBlockedTransactionNum
    ) OR EXISTS (
      -- Check on the workers if any logical replication job spawned by the
      -- current PID is blocked, by checking its application name.
      -- Query is heavily based on: https://wiki.postgresql.org/wiki/Lock_Monitoring
      -- The dollar-quoted text below is the query sent to each worker.
      SELECT result FROM run_command_on_workers($two$
SELECT blocked_activity.application_name AS blocked_application
FROM pg_catalog.pg_locks blocked_locks
JOIN pg_catalog.pg_stat_activity blocked_activity ON blocked_activity.pid = blocked_locks.pid
JOIN pg_catalog.pg_locks blocking_locks
ON blocking_locks.locktype = blocked_locks.locktype
AND blocking_locks.DATABASE IS NOT DISTINCT FROM blocked_locks.DATABASE
AND blocking_locks.relation IS NOT DISTINCT FROM blocked_locks.relation
AND blocking_locks.page IS NOT DISTINCT FROM blocked_locks.page
AND blocking_locks.tuple IS NOT DISTINCT FROM blocked_locks.tuple
AND blocking_locks.virtualxid IS NOT DISTINCT FROM blocked_locks.virtualxid
AND blocking_locks.transactionid IS NOT DISTINCT FROM blocked_locks.transactionid
AND blocking_locks.classid IS NOT DISTINCT FROM blocked_locks.classid
AND blocking_locks.objid IS NOT DISTINCT FROM blocked_locks.objid
AND blocking_locks.objsubid IS NOT DISTINCT FROM blocked_locks.objsubid
AND blocking_locks.pid != blocked_locks.pid
JOIN pg_catalog.pg_stat_activity blocking_activity ON blocking_activity.pid = blocking_locks.pid
WHERE NOT blocked_locks.GRANTED AND blocked_activity.application_name LIKE 'citus_shard_move_subscription_%'
$two$) WHERE result='citus_shard_move_subscription_' || pBlockedPid);
  END;
$$ LANGUAGE plpgsql;
REVOKE ALL ON FUNCTION citus_isolation_test_session_is_blocked(integer,integer[]) FROM PUBLIC;
DROP FUNCTION pg_catalog.citus_blocking_pids(pBlockedPid integer);
-- citus_blocking_pids returns the process ids that block pBlockedPid.
--
-- The answer of old_pg_blocking_pids is returned as-is when it is non-empty.
-- Otherwise the transaction number of pBlockedPid is looked up and every
-- active transaction that dump_global_wait_edges() reports as blocking it
-- contributes its process id. The result is NULL when nothing matches,
-- because array_agg aggregates zero rows.
CREATE FUNCTION pg_catalog.citus_blocking_pids(pBlockedPid integer)
RETURNS int4[] AS $$
  DECLARE
    mLocalBlockers int4[];
    mRemoteBlockers int4[];
    mWaitingTxnNum int8;
  BEGIN
    mLocalBlockers := pg_catalog.old_pg_blocking_pids(pBlockedPid);

    IF array_length(mLocalBlockers, 1) > 0 THEN
      RETURN mLocalBlockers;
    END IF;

    -- pg says we're not blocked locally; check whether we're blocked globally.
    SELECT transaction_number INTO mWaitingTxnNum
    FROM get_all_active_transactions()
    WHERE process_id = pBlockedPid;

    -- Match each wait edge whose waiter is our transaction with the active
    -- transaction that holds the blocking side of that edge.
    SELECT array_agg(txn.process_id) INTO mRemoteBlockers
    FROM get_all_active_transactions() AS txn
    INNER JOIN dump_global_wait_edges() AS edge
      ON txn.transaction_number = edge.blocking_transaction_num
    WHERE edge.waiting_transaction_num = mWaitingTxnNum;

    RETURN mRemoteBlockers;
  END;
$$ LANGUAGE plpgsql;
REVOKE ALL ON FUNCTION citus_blocking_pids(integer) FROM PUBLIC;
RESET search_path; RESET search_path;

View File

@ -0,0 +1,34 @@
DROP FUNCTION pg_catalog.citus_blocking_pids;
-- citus_blocking_pids returns the identifiers of the sessions that block
-- pBlockedPid.
--
-- The answer of old_pg_blocking_pids is returned as-is when it is non-empty.
-- Otherwise the global pid of pBlockedPid is looked up and every active
-- transaction that dump_global_wait_edges(distributed_tx_only:=false) reports
-- as blocking it contributes its global pid. The result is NULL when nothing
-- matches, because array_agg aggregates zero rows.
CREATE FUNCTION pg_catalog.citus_blocking_pids(pBlockedPid integer)
RETURNS int4[] AS $$
  DECLARE
    mLocalBlockers int4[];
    mRemoteBlockers int4[];
    mWaitingGlobalPid int8;
  BEGIN
    mLocalBlockers := pg_catalog.old_pg_blocking_pids(pBlockedPid);

    IF array_length(mLocalBlockers, 1) > 0 THEN
      RETURN mLocalBlockers;
    END IF;

    -- pg says we're not blocked locally; check whether we're blocked globally.
    SELECT global_pid INTO mWaitingGlobalPid
    FROM get_all_active_transactions()
    WHERE process_id = pBlockedPid;

    -- Match each wait edge whose waiter is our session with the active
    -- transaction that holds the blocking side of that edge.
    -- NOTE(review): global_pid is read into an int8 variable above, while the
    -- aggregated values are stored in an int4[] -- confirm they always fit.
    SELECT array_agg(txn.global_pid) INTO mRemoteBlockers
    FROM get_all_active_transactions() AS txn
    INNER JOIN dump_global_wait_edges(distributed_tx_only:=false) AS edge
      ON txn.global_pid = edge.blocking_global_pid
    WHERE edge.waiting_global_pid = mWaitingGlobalPid;

    RETURN mRemoteBlockers;
  END;
$$ LANGUAGE plpgsql;
REVOKE ALL ON FUNCTION citus_blocking_pids(integer) FROM PUBLIC;

View File

@ -0,0 +1,34 @@
DROP FUNCTION pg_catalog.citus_blocking_pids;
-- citus_blocking_pids returns the identifiers of the sessions that block
-- pBlockedPid.
--
-- The answer of old_pg_blocking_pids is returned as-is when it is non-empty.
-- Otherwise the global pid of pBlockedPid is looked up and every active
-- transaction that dump_global_wait_edges(distributed_tx_only:=false) reports
-- as blocking it contributes its global pid. The result is NULL when nothing
-- matches, because array_agg aggregates zero rows.
CREATE FUNCTION pg_catalog.citus_blocking_pids(pBlockedPid integer)
RETURNS int4[] AS $$
  DECLARE
    mLocalBlockers int4[];
    mRemoteBlockers int4[];
    mWaitingGlobalPid int8;
  BEGIN
    mLocalBlockers := pg_catalog.old_pg_blocking_pids(pBlockedPid);

    IF array_length(mLocalBlockers, 1) > 0 THEN
      RETURN mLocalBlockers;
    END IF;

    -- pg says we're not blocked locally; check whether we're blocked globally.
    SELECT global_pid INTO mWaitingGlobalPid
    FROM get_all_active_transactions()
    WHERE process_id = pBlockedPid;

    -- Match each wait edge whose waiter is our session with the active
    -- transaction that holds the blocking side of that edge.
    -- NOTE(review): global_pid is read into an int8 variable above, while the
    -- aggregated values are stored in an int4[] -- confirm they always fit.
    SELECT array_agg(txn.global_pid) INTO mRemoteBlockers
    FROM get_all_active_transactions() AS txn
    INNER JOIN dump_global_wait_edges(distributed_tx_only:=false) AS edge
      ON txn.global_pid = edge.blocking_global_pid
    WHERE edge.waiting_global_pid = mWaitingGlobalPid;

    RETURN mRemoteBlockers;
  END;
$$ LANGUAGE plpgsql;
REVOKE ALL ON FUNCTION citus_blocking_pids(integer) FROM PUBLIC;

View File

@ -0,0 +1,56 @@
-- citus_isolation_test_session_is_blocked reports whether the session with
-- process id pBlockedPid is blocked.
--
-- It first delegates to old_pg_isolation_test_session_is_blocked for the local
-- check. When that reports "not blocked", it looks for a wait edge in
-- dump_global_wait_edges(distributed_tx_only:=false) for the session's global
-- pid, and for a worker backend whose application name is
-- 'citus_shard_move_subscription_<pBlockedPid>' waiting on an ungranted lock.
--
--   pBlockedPid      - process id of the session that may be blocked
--   pInterestingPids - passed through to old_pg_isolation_test_session_is_blocked
--
-- Returns true when any of the checks finds the session waiting.
CREATE OR REPLACE FUNCTION pg_catalog.citus_isolation_test_session_is_blocked(pBlockedPid integer, pInterestingPids integer[])
RETURNS boolean AS $$
  DECLARE
    mBlockedGlobalPid int8;
    workerProcessId integer := current_setting('citus.isolation_test_session_remote_process_id');
    coordinatorProcessId integer := current_setting('citus.isolation_test_session_process_id');
  BEGIN
    IF pg_catalog.old_pg_isolation_test_session_is_blocked(pBlockedPid, pInterestingPids) THEN
      RETURN true;
    END IF;

    -- pg says we're not blocked locally; check whether we're blocked globally.
    -- Note that the worker process may be blocked or waiting for a lock, so we
    -- need the global pid for either case. The first lookup yields the global
    -- pid when the worker process is waiting for another session.
    -- It is run only once: FOUND tells us whether it matched, so the result
    -- cannot change between an existence check and the actual read.
    SELECT global_pid INTO mBlockedGlobalPid
    FROM get_global_active_transactions()
    WHERE process_id = workerProcessId AND pBlockedPid = coordinatorProcessId;

    IF NOT FOUND THEN
      -- Check whether transactions initiated from the coordinator get locked
      SELECT global_pid INTO mBlockedGlobalPid
      FROM get_all_active_transactions()
      WHERE process_id = pBlockedPid;
    END IF;

    RETURN EXISTS (
      SELECT 1 FROM dump_global_wait_edges(distributed_tx_only:=false)
      WHERE waiting_global_pid = mBlockedGlobalPid
    ) OR EXISTS (
      -- Check on the workers if any logical replication job spawned by the
      -- current PID is blocked, by checking its application name.
      -- Query is heavily based on: https://wiki.postgresql.org/wiki/Lock_Monitoring
      -- The dollar-quoted text below is the query sent to each worker.
      SELECT result FROM run_command_on_workers($two$
SELECT blocked_activity.application_name AS blocked_application
FROM pg_catalog.pg_locks blocked_locks
JOIN pg_catalog.pg_stat_activity blocked_activity ON blocked_activity.pid = blocked_locks.pid
JOIN pg_catalog.pg_locks blocking_locks
ON blocking_locks.locktype = blocked_locks.locktype
AND blocking_locks.DATABASE IS NOT DISTINCT FROM blocked_locks.DATABASE
AND blocking_locks.relation IS NOT DISTINCT FROM blocked_locks.relation
AND blocking_locks.page IS NOT DISTINCT FROM blocked_locks.page
AND blocking_locks.tuple IS NOT DISTINCT FROM blocked_locks.tuple
AND blocking_locks.virtualxid IS NOT DISTINCT FROM blocked_locks.virtualxid
AND blocking_locks.transactionid IS NOT DISTINCT FROM blocked_locks.transactionid
AND blocking_locks.classid IS NOT DISTINCT FROM blocked_locks.classid
AND blocking_locks.objid IS NOT DISTINCT FROM blocked_locks.objid
AND blocking_locks.objsubid IS NOT DISTINCT FROM blocked_locks.objsubid
AND blocking_locks.pid != blocked_locks.pid
JOIN pg_catalog.pg_stat_activity blocking_activity ON blocking_activity.pid = blocking_locks.pid
WHERE NOT blocked_locks.GRANTED AND blocked_activity.application_name LIKE 'citus_shard_move_subscription_%'
$two$) WHERE result='citus_shard_move_subscription_' || pBlockedPid);
  END;
$$ LANGUAGE plpgsql;
REVOKE ALL ON FUNCTION citus_isolation_test_session_is_blocked(integer,integer[]) FROM PUBLIC;

View File

@ -1,7 +1,7 @@
CREATE OR REPLACE FUNCTION pg_catalog.citus_isolation_test_session_is_blocked(pBlockedPid integer, pInterestingPids integer[]) CREATE OR REPLACE FUNCTION pg_catalog.citus_isolation_test_session_is_blocked(pBlockedPid integer, pInterestingPids integer[])
RETURNS boolean AS $$ RETURNS boolean AS $$
DECLARE DECLARE
mBlockedTransactionNum int8; mBlockedGlobalPid int8;
workerProcessId integer := current_setting('citus.isolation_test_session_remote_process_id'); workerProcessId integer := current_setting('citus.isolation_test_session_remote_process_id');
coordinatorProcessId integer := current_setting('citus.isolation_test_session_process_id'); coordinatorProcessId integer := current_setting('citus.isolation_test_session_process_id');
BEGIN BEGIN
@ -13,19 +13,19 @@ RETURNS boolean AS $$
-- Note that worker process may be blocked or waiting for a lock. So we need to -- Note that worker process may be blocked or waiting for a lock. So we need to
-- get transaction number for both of them. Following IF provides the transaction -- get transaction number for both of them. Following IF provides the transaction
-- number when the worker process waiting for other session. -- number when the worker process waiting for other session.
IF EXISTS (SELECT transaction_number FROM get_global_active_transactions() IF EXISTS (SELECT 1 FROM get_global_active_transactions()
WHERE process_id = workerProcessId AND pBlockedPid = coordinatorProcessId) THEN WHERE process_id = workerProcessId AND pBlockedPid = coordinatorProcessId) THEN
SELECT transaction_number INTO mBlockedTransactionNum FROM get_global_active_transactions() SELECT global_pid INTO mBlockedGlobalPid FROM get_global_active_transactions()
WHERE process_id = workerProcessId AND pBlockedPid = coordinatorProcessId; WHERE process_id = workerProcessId AND pBlockedPid = coordinatorProcessId;
ELSE ELSE
-- Check whether transactions initiated from the coordinator get locked -- Check whether transactions initiated from the coordinator get locked
SELECT transaction_number INTO mBlockedTransactionNum SELECT global_pid INTO mBlockedGlobalPid
FROM get_all_active_transactions() WHERE process_id = pBlockedPid; FROM get_all_active_transactions() WHERE process_id = pBlockedPid;
END IF; END IF;
RETURN EXISTS ( RETURN EXISTS (
SELECT 1 FROM dump_global_wait_edges() SELECT 1 FROM dump_global_wait_edges(distributed_tx_only:=false)
WHERE waiting_transaction_num = mBlockedTransactionNum WHERE waiting_global_pid = mBlockedGlobalPid
) OR EXISTS ( ) OR EXISTS (
-- Check on the workers if any logical replication job spawned by the -- Check on the workers if any logical replication job spawned by the
-- current PID is blocked, by checking it's application name -- current PID is blocked, by checking it's application name

View File

@ -1119,3 +1119,134 @@ restore_isolation_tester_func
(1 row) (1 row)
starting permutation: s1-start-session-level-connection s1-begin-on-worker s1-update-dist-table-id-1 s2-start-session-level-connection s2-update-dist-table-id-1 s3-select-distributed-waiting-queries s1-commit-worker s1-stop-connection s2-stop-connection
step s1-start-session-level-connection:
SELECT start_session_level_connection_to_node('localhost', 57637);
start_session_level_connection_to_node
---------------------------------------------------------------------
(1 row)
step s1-begin-on-worker:
SELECT run_commands_on_session_level_connection_to_node('BEGIN');
run_commands_on_session_level_connection_to_node
---------------------------------------------------------------------
(1 row)
step s1-update-dist-table-id-1:
SELECT run_commands_on_session_level_connection_to_node('UPDATE tt1 SET value_1 = 4 WHERE user_id = 1');
run_commands_on_session_level_connection_to_node
---------------------------------------------------------------------
(1 row)
step s2-start-session-level-connection:
SELECT start_session_level_connection_to_node('localhost', 57638);
start_session_level_connection_to_node
---------------------------------------------------------------------
(1 row)
step s2-update-dist-table-id-1:
SELECT run_commands_on_session_level_connection_to_node('UPDATE tt1 SET value_1 = 4 WHERE user_id = 1');
<waiting ...>
step s3-select-distributed-waiting-queries:
SELECT blocked_statement, current_statement_in_blocking_process, waiting_node_name, blocking_node_name, waiting_node_port, blocking_node_port FROM citus_lock_waits WHERE blocked_statement NOT ILIKE '%run_commands_on_session_level_connection_to_node%' AND current_statement_in_blocking_process NOT ILIKE '%run_commands_on_session_level_connection_to_node%';
blocked_statement |current_statement_in_blocking_process |waiting_node_name|blocking_node_name|waiting_node_port|blocking_node_port
---------------------------------------------------------------------
UPDATE tt1 SET value_1 = 4 WHERE user_id = 1|UPDATE tt1 SET value_1 = 4 WHERE user_id = 1|localhost |localhost | 57638| 57637
(1 row)
step s1-commit-worker:
SELECT run_commands_on_session_level_connection_to_node('COMMIT');
run_commands_on_session_level_connection_to_node
---------------------------------------------------------------------
(1 row)
step s2-update-dist-table-id-1: <... completed>
run_commands_on_session_level_connection_to_node
---------------------------------------------------------------------
(1 row)
step s1-stop-connection:
SELECT stop_session_level_connection_to_node();
stop_session_level_connection_to_node
---------------------------------------------------------------------
(1 row)
step s2-stop-connection:
SELECT stop_session_level_connection_to_node();
stop_session_level_connection_to_node
---------------------------------------------------------------------
(1 row)
restore_isolation_tester_func
---------------------------------------------------------------------
(1 row)
starting permutation: s1-begin s1-update-ref-table-from-coordinator s2-start-session-level-connection s2-update-ref-table s3-select-distributed-waiting-queries s1-commit s2-stop-connection
step s1-begin:
BEGIN;
step s1-update-ref-table-from-coordinator:
UPDATE ref_table SET value_1 = 15;
step s2-start-session-level-connection:
SELECT start_session_level_connection_to_node('localhost', 57638);
start_session_level_connection_to_node
---------------------------------------------------------------------
(1 row)
step s2-update-ref-table:
SELECT run_commands_on_session_level_connection_to_node('UPDATE ref_table SET value_1 = 12 WHERE user_id = 1');
<waiting ...>
step s3-select-distributed-waiting-queries:
SELECT blocked_statement, current_statement_in_blocking_process, waiting_node_name, blocking_node_name, waiting_node_port, blocking_node_port FROM citus_lock_waits WHERE blocked_statement NOT ILIKE '%run_commands_on_session_level_connection_to_node%' AND current_statement_in_blocking_process NOT ILIKE '%run_commands_on_session_level_connection_to_node%';
blocked_statement |current_statement_in_blocking_process|waiting_node_name|blocking_node_name|waiting_node_port|blocking_node_port
---------------------------------------------------------------------
UPDATE ref_table SET value_1 = 12 WHERE user_id = 1|
UPDATE ref_table SET value_1 = 15;
|localhost |coordinator_host | 57638| 57636
(1 row)
step s1-commit:
COMMIT;
step s2-update-ref-table: <... completed>
run_commands_on_session_level_connection_to_node
---------------------------------------------------------------------
(1 row)
step s2-stop-connection:
SELECT stop_session_level_connection_to_node();
stop_session_level_connection_to_node
---------------------------------------------------------------------
(1 row)
restore_isolation_tester_func
---------------------------------------------------------------------
(1 row)

View File

@ -73,6 +73,11 @@ step "s1-select-for-update"
SELECT run_commands_on_session_level_connection_to_node('SELECT * FROM ref_table FOR UPDATE'); SELECT run_commands_on_session_level_connection_to_node('SELECT * FROM ref_table FOR UPDATE');
} }
step "s1-update-dist-table-id-1"
{
SELECT run_commands_on_session_level_connection_to_node('UPDATE tt1 SET value_1 = 4 WHERE user_id = 1');
}
step "s1-commit-worker" step "s1-commit-worker"
{ {
SELECT run_commands_on_session_level_connection_to_node('COMMIT'); SELECT run_commands_on_session_level_connection_to_node('COMMIT');
@ -115,6 +120,11 @@ step "s2-update-dist-table"
SELECT run_commands_on_session_level_connection_to_node('UPDATE tt1 SET value_1 = 5'); SELECT run_commands_on_session_level_connection_to_node('UPDATE tt1 SET value_1 = 5');
} }
step "s2-update-dist-table-id-1"
{
SELECT run_commands_on_session_level_connection_to_node('UPDATE tt1 SET value_1 = 4 WHERE user_id = 1');
}
step "s2-update-ref-table" step "s2-update-ref-table"
{ {
SELECT run_commands_on_session_level_connection_to_node('UPDATE ref_table SET value_1 = 12 WHERE user_id = 1'); SELECT run_commands_on_session_level_connection_to_node('UPDATE ref_table SET value_1 = 12 WHERE user_id = 1');
@ -196,3 +206,9 @@ permutation "s2-start-session-level-connection" "s2-begin-on-worker" "s2-insert-
// blocked on the same node // blocked on the same node
permutation "s1-begin" "s1-update-on-the-coordinator" "s2-update-on-the-coordinator" "s3-select-distributed-waiting-queries" "s1-commit" permutation "s1-begin" "s1-update-on-the-coordinator" "s2-update-on-the-coordinator" "s3-select-distributed-waiting-queries" "s1-commit"
permutation "s1-start-session-level-connection" "s1-begin-on-worker" "s1-update-dist-table" "s4-start-session-level-connection" "s4-begin-on-worker" "s4-update-dist-table" "s3-select-distributed-waiting-queries" "s1-commit-worker" "s4-commit-worker" "s1-stop-connection" "s4-stop-connection" permutation "s1-start-session-level-connection" "s1-begin-on-worker" "s1-update-dist-table" "s4-start-session-level-connection" "s4-begin-on-worker" "s4-update-dist-table" "s3-select-distributed-waiting-queries" "s1-commit-worker" "s4-commit-worker" "s1-stop-connection" "s4-stop-connection"
// show that even if the commands are not in a transaction block
// we can find the blocking relationship
permutation "s1-start-session-level-connection" "s1-begin-on-worker" "s1-update-dist-table-id-1" "s2-start-session-level-connection" "s2-update-dist-table-id-1" "s3-select-distributed-waiting-queries" "s1-commit-worker" "s1-stop-connection" "s2-stop-connection"
permutation "s1-begin" "s1-update-ref-table-from-coordinator" "s2-start-session-level-connection" "s2-update-ref-table" "s3-select-distributed-waiting-queries" "s1-commit" "s2-stop-connection"