citus/src/backend/distributed/sql/citus--8.0-5--8.0-6.sql

53 lines
2.5 KiB
PL/PgSQL

-- citus--8.0-5--8.0-6
SET search_path = 'pg_catalog';
CREATE FUNCTION get_global_active_transactions(OUT datid oid, OUT process_id int, OUT initiator_node_identifier int4, OUT worker_query BOOL, OUT transaction_number int8, OUT transaction_stamp timestamptz)
RETURNS SETOF RECORD
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$get_global_active_transactions$$;
COMMENT ON FUNCTION get_global_active_transactions(OUT database_id oid, OUT process_id int, OUT initiator_node_identifier int4, OUT transaction_number int8, OUT transaction_stamp timestamptz)
IS 'returns distributed transaction ids of active distributed transactions from each node of the cluster';
CREATE OR REPLACE FUNCTION pg_catalog.citus_blocking_pids(pBlockedPid integer)
RETURNS int4[] AS $$
DECLARE
mLocalBlockingPids int4[];
mRemoteBlockingPids int4[];
mLocalTransactionNum int8;
workerProcessId integer := current_setting('citus.isolation_test_session_remote_process_id');
coordinatorProcessId integer := current_setting('citus.isolation_test_session_process_id');
BEGIN
SELECT pg_catalog.old_pg_blocking_pids(pBlockedPid) INTO mLocalBlockingPids;
IF (array_length(mLocalBlockingPids, 1) > 0) THEN
RETURN mLocalBlockingPids;
END IF;
-- pg says we're not blocked locally; check whether we're blocked globally.
-- Note that worker process may be blocked or waiting for a lock. So we need to
-- get transaction number for both of them. Following IF provides the transaction
-- number when the worker process waiting for other session.
IF EXISTS (SELECT transaction_number FROM get_global_active_transactions()
WHERE process_id = workerProcessId AND pBlockedPid = coordinatorProcessId) THEN
SELECT transaction_number INTO mLocalTransactionNum
FROM get_global_active_transactions() WHERE process_id = workerProcessId AND pBlockedPid = coordinatorProcessId;
ELSE
-- Check whether transactions initiated from the coordinator get locked
SELECT transaction_number INTO mLocalTransactionNum
FROM get_all_active_transactions() WHERE process_id = pBlockedPid;
END IF;
IF EXISTS (SELECT waiting_transaction_num FROM dump_global_wait_edges()
WHERE waiting_transaction_num = mLocalTransactionNum) THEN
SELECT array_agg(pBlockedPid) INTO mRemoteBlockingPids;
END IF;
RETURN mRemoteBlockingPids;
END;
$$ LANGUAGE plpgsql;
#include "udfs/citus_isolation_test_session_is_blocked/8.0-6.sql"
RESET search_path;