mirror of https://github.com/citusdata/citus.git
85 lines
4.0 KiB
PL/PgSQL
85 lines
4.0 KiB
PL/PgSQL
/* citus--8.0-5--8.0-6 */
|
|
-- Unqualified objects created by this script are resolved against pg_catalog
-- until the search_path is restored at the end of the file.
SET search_path TO 'pg_catalog';
|
|
|
|
-- SQL-level declaration of the C function get_global_active_transactions.
-- It takes no input arguments and returns a set of records, one per
-- transaction, shaped by the OUT columns listed below.
CREATE FUNCTION get_global_active_transactions(
    OUT datid oid,
    OUT process_id int,
    OUT initiator_node_identifier int4,
    OUT worker_query BOOL,
    OUT transaction_number int8,
    OUT transaction_stamp timestamptz)
  RETURNS SETOF RECORD
  LANGUAGE C STRICT
  AS 'MODULE_PATHNAME', $$get_global_active_transactions$$;
|
|
-- The argument list mirrors the CREATE FUNCTION declaration exactly.
-- PostgreSQL only uses input arguments to identify the function, so a stale
-- OUT list would still resolve, but it would misdocument the result columns.
COMMENT ON FUNCTION get_global_active_transactions(
    OUT datid oid,
    OUT process_id int,
    OUT initiator_node_identifier int4,
    OUT worker_query BOOL,
    OUT transaction_number int8,
    OUT transaction_stamp timestamptz)
  IS 'returns distributed transaction ids of active distributed transactions from each node of the cluster';
|
|
|
|
-- citus_blocking_pids reports which backends block the given backend.
--
-- Parameters:
--   pBlockedPid - process id of the backend that may be blocked.
--
-- Returns:
--   * the result of old_pg_blocking_pids(pBlockedPid) when it is non-empty;
--   * otherwise an array containing only pBlockedPid when the backend's
--     distributed transaction appears as a waiter in dump_global_wait_edges();
--   * otherwise NULL.
--
-- Reads the settings citus.isolation_test_session_remote_process_id and
-- citus.isolation_test_session_process_id; current_setting() raises an error
-- if either is not defined.
CREATE OR REPLACE FUNCTION pg_catalog.citus_blocking_pids(pBlockedPid integer)
RETURNS int4[] AS $$
  DECLARE
    mLocalBlockingPids int4[];
    mRemoteBlockingPids int4[];
    mLocalTransactionNum int8;
    workerProcessId integer := current_setting('citus.isolation_test_session_remote_process_id');
    coordinatorProcessId integer := current_setting('citus.isolation_test_session_process_id');
    mFoundWorkerTransaction boolean := false;
  BEGIN
    SELECT pg_catalog.old_pg_blocking_pids(pBlockedPid) INTO mLocalBlockingPids;

    IF (array_length(mLocalBlockingPids, 1) > 0) THEN
      RETURN mLocalBlockingPids;
    END IF;

    -- pg says we're not blocked locally; check whether we're blocked globally.
    -- Note that the worker process may be the one waiting for a lock, so the
    -- transaction number has to be looked up for either side.
    --
    -- The worker-side lookup only applies when the blocked pid is the
    -- coordinator session under test. That condition does not depend on any
    -- row, so it is tested once up front. The cluster-wide function is then
    -- queried a single time and FOUND records whether a row came back; asking
    -- twice (EXISTS, then SELECT INTO) could see two different answers.
    IF pBlockedPid = coordinatorProcessId THEN
      SELECT transaction_number INTO mLocalTransactionNum
        FROM get_global_active_transactions()
        WHERE process_id = workerProcessId;

      mFoundWorkerTransaction := FOUND;
    END IF;

    IF NOT mFoundWorkerTransaction THEN
      -- Check whether transactions initiated from the coordinator get locked
      SELECT transaction_number INTO mLocalTransactionNum
        FROM get_all_active_transactions()
        WHERE process_id = pBlockedPid;
    END IF;

    -- The wait edges are only checked for the waiter here, so the blocked pid
    -- itself is reported as the single array element.
    -- NOTE(review): presumably callers only need a non-empty result - confirm.
    IF EXISTS (SELECT 1 FROM dump_global_wait_edges()
               WHERE waiting_transaction_num = mLocalTransactionNum) THEN
      mRemoteBlockingPids := ARRAY[pBlockedPid];
    END IF;

    RETURN mRemoteBlockingPids;
  END;
$$ LANGUAGE plpgsql;
|
|
|
|
-- citus_isolation_test_session_is_blocked tells whether the given backend is
-- blocked, either locally or by a distributed transaction.
--
-- Parameters:
--   pBlockedPid      - process id of the backend that may be blocked.
--   pInterestingPids - passed through unchanged to
--                      old_pg_isolation_test_session_is_blocked.
--
-- Returns:
--   true when old_pg_isolation_test_session_is_blocked() says so, or when the
--   backend's distributed transaction appears as a waiter in
--   dump_global_wait_edges(); false otherwise.
--
-- Reads the settings citus.isolation_test_session_remote_process_id and
-- citus.isolation_test_session_process_id; current_setting() raises an error
-- if either is not defined.
CREATE OR REPLACE FUNCTION pg_catalog.citus_isolation_test_session_is_blocked(pBlockedPid integer, pInterestingPids integer[])
RETURNS boolean AS $$
  DECLARE
    mBlockedTransactionNum int8;
    workerProcessId integer := current_setting('citus.isolation_test_session_remote_process_id');
    coordinatorProcessId integer := current_setting('citus.isolation_test_session_process_id');
    mFoundWorkerTransaction boolean := false;
  BEGIN
    IF pg_catalog.old_pg_isolation_test_session_is_blocked(pBlockedPid, pInterestingPids) THEN
      RETURN true;
    END IF;

    -- pg says we're not blocked locally; check whether we're blocked globally.
    -- Note that the worker process may be the one waiting for a lock, so the
    -- transaction number has to be looked up for either side.
    --
    -- The worker-side lookup only applies when the blocked pid is the
    -- coordinator session under test. That condition does not depend on any
    -- row, so it is tested once up front. The cluster-wide function is then
    -- queried a single time and FOUND records whether a row came back; asking
    -- twice (EXISTS, then SELECT INTO) could see two different answers.
    IF pBlockedPid = coordinatorProcessId THEN
      SELECT transaction_number INTO mBlockedTransactionNum
        FROM get_global_active_transactions()
        WHERE process_id = workerProcessId;

      mFoundWorkerTransaction := FOUND;
    END IF;

    IF NOT mFoundWorkerTransaction THEN
      -- Check whether transactions initiated from the coordinator get locked
      SELECT transaction_number INTO mBlockedTransactionNum
        FROM get_all_active_transactions()
        WHERE process_id = pBlockedPid;
    END IF;

    -- A NULL transaction number matches no wait edge, so this yields false
    -- when no transaction was found for the backend.
    RETURN EXISTS (
      SELECT 1 FROM dump_global_wait_edges()
        WHERE waiting_transaction_num = mBlockedTransactionNum
    );
  END;
$$ LANGUAGE plpgsql;
|
|
|
|
-- Restore the search_path that was overridden at the top of this script.
SET search_path TO DEFAULT;
|