Move citus_internal_local_blocked_processes and citus_internal_global_blocked_processes. Update the udfs referencing to them.

move_internal_functions
eaydingol 2024-01-31 22:12:01 +00:00
parent 89228fcb4a
commit bbd855ede3
17 changed files with 259 additions and 11 deletions

View File

@ -38,3 +38,9 @@ REVOKE ALL ON FUNCTION citus_internal.start_management_transaction FROM PUBLIC;
#include "udfs/citus_internal_delete_placement_metadata/12.2-1.sql"
#include "udfs/citus_internal_delete_shard_metadata/12.2-1.sql"
#include "udfs/citus_internal_delete_tenant_schema/12.2-1.sql"
#include "udfs/citus_internal_local_blocked_processes/12.2-1.sql"
#include "udfs/citus_internal_global_blocked_processes/12.2-1.sql"
#include "udfs/citus_blocking_pids/12.2-1.sql"
#include "udfs/citus_isolation_test_session_is_blocked/12.2-1.sql"
DROP VIEW IF EXISTS pg_catalog.citus_lock_waits;
#include "udfs/citus_lock_waits/12.2-1.sql"

View File

@ -35,3 +35,9 @@ DROP FUNCTION citus_internal.delete_partition_metadata(regclass);
DROP FUNCTION citus_internal.delete_placement_metadata(bigint);
DROP FUNCTION citus_internal.delete_shard_metadata(bigint);
DROP FUNCTION citus_internal.delete_tenant_schema(oid);
DROP FUNCTION citus_internal.local_blocked_processes();
#include "../udfs/citus_blocking_pids/11.0-1.sql"
#include "../udfs/citus_isolation_test_session_is_blocked/11.1-1.sql"
DROP VIEW IF EXISTS pg_catalog.citus_lock_waits;
#include "../udfs/citus_lock_waits/11.0-1.sql"
DROP FUNCTION citus_internal.global_blocked_processes;

View File

@ -0,0 +1,34 @@
DROP FUNCTION pg_catalog.citus_blocking_pids;
CREATE FUNCTION pg_catalog.citus_blocking_pids(pBlockedPid integer)
RETURNS int4[] AS $$
DECLARE
mLocalBlockingPids int4[];
mRemoteBlockingPids int4[];
mLocalGlobalPid int8;
BEGIN
SELECT pg_catalog.old_pg_blocking_pids(pBlockedPid) INTO mLocalBlockingPids;
IF (array_length(mLocalBlockingPids, 1) > 0) THEN
RETURN mLocalBlockingPids;
END IF;
-- pg says we're not blocked locally; check whether we're blocked globally.
SELECT global_pid INTO mLocalGlobalPid
FROM get_all_active_transactions() WHERE process_id = pBlockedPid;
SELECT array_agg(global_pid) INTO mRemoteBlockingPids FROM (
WITH activeTransactions AS (
SELECT global_pid FROM get_all_active_transactions()
), blockingTransactions AS (
SELECT blocking_global_pid FROM citus_internal.global_blocked_processes()
WHERE waiting_global_pid = mLocalGlobalPid
)
SELECT activeTransactions.global_pid FROM activeTransactions, blockingTransactions
WHERE activeTransactions.global_pid = blockingTransactions.blocking_global_pid
) AS sub;
RETURN mRemoteBlockingPids;
END;
$$ LANGUAGE plpgsql;
REVOKE ALL ON FUNCTION citus_blocking_pids(integer) FROM PUBLIC;

View File

@ -20,7 +20,7 @@ RETURNS int4[] AS $$
WITH activeTransactions AS (
SELECT global_pid FROM get_all_active_transactions()
), blockingTransactions AS (
SELECT blocking_global_pid FROM citus_internal_global_blocked_processes()
SELECT blocking_global_pid FROM citus_internal.global_blocked_processes()
WHERE waiting_global_pid = mLocalGlobalPid
)
SELECT activeTransactions.global_pid FROM activeTransactions, blockingTransactions

View File

@ -0,0 +1,35 @@
CREATE OR REPLACE FUNCTION citus_internal.global_blocked_processes(
OUT waiting_global_pid int8,
OUT waiting_pid int4,
OUT waiting_node_id int4,
OUT waiting_transaction_num int8,
OUT waiting_transaction_stamp timestamptz,
OUT blocking_global_pid int8,
OUT blocking_pid int4,
OUT blocking_node_id int4,
OUT blocking_transaction_num int8,
OUT blocking_transaction_stamp timestamptz,
OUT blocking_transaction_waiting bool)
RETURNS SETOF RECORD
LANGUAGE C STRICT
AS $$MODULE_PATHNAME$$, $$citus_internal_global_blocked_processes$$;
COMMENT ON FUNCTION citus_internal.global_blocked_processes()
IS 'returns a global list of blocked backends originating from this node';
CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_global_blocked_processes(
OUT waiting_global_pid int8,
OUT waiting_pid int4,
OUT waiting_node_id int4,
OUT waiting_transaction_num int8,
OUT waiting_transaction_stamp timestamptz,
OUT blocking_global_pid int8,
OUT blocking_pid int4,
OUT blocking_node_id int4,
OUT blocking_transaction_num int8,
OUT blocking_transaction_stamp timestamptz,
OUT blocking_transaction_waiting bool)
RETURNS SETOF RECORD
LANGUAGE C STRICT
AS $$MODULE_PATHNAME$$, $$citus_internal_global_blocked_processes$$;
COMMENT ON FUNCTION pg_catalog.citus_internal_global_blocked_processes()
IS 'returns a global list of blocked backends originating from this node';

View File

@ -1,3 +1,21 @@
CREATE OR REPLACE FUNCTION citus_internal.global_blocked_processes(
OUT waiting_global_pid int8,
OUT waiting_pid int4,
OUT waiting_node_id int4,
OUT waiting_transaction_num int8,
OUT waiting_transaction_stamp timestamptz,
OUT blocking_global_pid int8,
OUT blocking_pid int4,
OUT blocking_node_id int4,
OUT blocking_transaction_num int8,
OUT blocking_transaction_stamp timestamptz,
OUT blocking_transaction_waiting bool)
RETURNS SETOF RECORD
LANGUAGE C STRICT
AS $$MODULE_PATHNAME$$, $$citus_internal_global_blocked_processes$$;
COMMENT ON FUNCTION citus_internal.global_blocked_processes()
IS 'returns a global list of blocked backends originating from this node';
CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_global_blocked_processes(
OUT waiting_global_pid int8,
OUT waiting_pid int4,

View File

@ -0,0 +1,35 @@
CREATE OR REPLACE FUNCTION citus_internal.local_blocked_processes(
OUT waiting_global_pid int8,
OUT waiting_pid int4,
OUT waiting_node_id int4,
OUT waiting_transaction_num int8,
OUT waiting_transaction_stamp timestamptz,
OUT blocking_global_pid int8,
OUT blocking_pid int4,
OUT blocking_node_id int4,
OUT blocking_transaction_num int8,
OUT blocking_transaction_stamp timestamptz,
OUT blocking_transaction_waiting bool)
RETURNS SETOF RECORD
LANGUAGE C STRICT
AS $$MODULE_PATHNAME$$, $$citus_internal_local_blocked_processes$$;
COMMENT ON FUNCTION citus_internal.local_blocked_processes()
IS 'returns all local lock wait chains, that start from any citus backend';
CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_local_blocked_processes(
OUT waiting_global_pid int8,
OUT waiting_pid int4,
OUT waiting_node_id int4,
OUT waiting_transaction_num int8,
OUT waiting_transaction_stamp timestamptz,
OUT blocking_global_pid int8,
OUT blocking_pid int4,
OUT blocking_node_id int4,
OUT blocking_transaction_num int8,
OUT blocking_transaction_stamp timestamptz,
OUT blocking_transaction_waiting bool)
RETURNS SETOF RECORD
LANGUAGE C STRICT
AS $$MODULE_PATHNAME$$, $$citus_internal_local_blocked_processes$$;
COMMENT ON FUNCTION pg_catalog.citus_internal_local_blocked_processes()
IS 'returns all local lock wait chains, that start from any citus backend';

View File

@ -1,3 +1,21 @@
CREATE OR REPLACE FUNCTION citus_internal.local_blocked_processes(
OUT waiting_global_pid int8,
OUT waiting_pid int4,
OUT waiting_node_id int4,
OUT waiting_transaction_num int8,
OUT waiting_transaction_stamp timestamptz,
OUT blocking_global_pid int8,
OUT blocking_pid int4,
OUT blocking_node_id int4,
OUT blocking_transaction_num int8,
OUT blocking_transaction_stamp timestamptz,
OUT blocking_transaction_waiting bool)
RETURNS SETOF RECORD
LANGUAGE C STRICT
AS $$MODULE_PATHNAME$$, $$citus_internal_local_blocked_processes$$;
COMMENT ON FUNCTION citus_internal.local_blocked_processes()
IS 'returns all local lock wait chains, that start from any citus backend';
CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_local_blocked_processes(
OUT waiting_global_pid int8,
OUT waiting_pid int4,

View File

@ -0,0 +1,45 @@
CREATE OR REPLACE FUNCTION pg_catalog.citus_isolation_test_session_is_blocked(pBlockedPid integer, pInterestingPids integer[])
RETURNS boolean AS $$
DECLARE
mBlockedGlobalPid int8;
workerProcessId integer := current_setting('citus.isolation_test_session_remote_process_id');
coordinatorProcessId integer := current_setting('citus.isolation_test_session_process_id');
BEGIN
IF pg_catalog.old_pg_isolation_test_session_is_blocked(pBlockedPid, pInterestingPids) THEN
RETURN true;
END IF;
-- pg says we're not blocked locally; check whether we're blocked globally.
-- Note that worker process may be blocked or waiting for a lock. So we need to
-- get transaction number for both of them. Following IF provides the transaction
-- number when the worker process waiting for other session.
IF EXISTS (SELECT 1 FROM get_global_active_transactions()
WHERE process_id = workerProcessId AND pBlockedPid = coordinatorProcessId) THEN
SELECT global_pid INTO mBlockedGlobalPid FROM get_global_active_transactions()
WHERE process_id = workerProcessId AND pBlockedPid = coordinatorProcessId;
ELSE
-- Check whether transactions initiated from the coordinator get locked
SELECT global_pid INTO mBlockedGlobalPid
FROM get_all_active_transactions() WHERE process_id = pBlockedPid;
END IF;
-- We convert the blocking_global_pid to a regular pid and only look at
-- blocks caused by the interesting pids, or the workerProcessPid. If we
-- don't do that we might find unrelated blocks caused by some random
-- other processes that are not involved in this isolation test. Because we
-- run our isolation tests on a single physical machine, the PID part of
-- the GPID is known to be unique within the whole cluster.
RETURN EXISTS (
SELECT 1 FROM citus_internal.global_blocked_processes()
WHERE waiting_global_pid = mBlockedGlobalPid
AND (
citus_pid_for_gpid(blocking_global_pid) in (
select * from unnest(pInterestingPids)
)
OR citus_pid_for_gpid(blocking_global_pid) = workerProcessId
)
);
END;
$$ LANGUAGE plpgsql;
REVOKE ALL ON FUNCTION citus_isolation_test_session_is_blocked(integer,integer[]) FROM PUBLIC;

View File

@ -30,7 +30,7 @@ RETURNS boolean AS $$
-- run our isolation tests on a single physical machine, the PID part of
-- the GPID is known to be unique within the whole cluster.
RETURN EXISTS (
SELECT 1 FROM citus_internal_global_blocked_processes()
SELECT 1 FROM citus_internal.global_blocked_processes()
WHERE waiting_global_pid = mBlockedGlobalPid
AND (
citus_pid_for_gpid(blocking_global_pid) in (

View File

@ -0,0 +1,47 @@
SET search_path = 'pg_catalog';
CREATE VIEW citus.citus_lock_waits AS
WITH
unique_global_wait_edges_with_calculated_gpids AS (
SELECT
-- if global_pid is NULL, it is most likely that a backend is blocked on a DDL
-- also for legacy reasons citus_internal.global_blocked_processes() returns groupId, we replace that with nodeIds
case WHEN waiting_global_pid !=0 THEN waiting_global_pid ELSE citus_calculate_gpid(get_nodeid_for_groupid(waiting_node_id), waiting_pid) END waiting_global_pid,
case WHEN blocking_global_pid !=0 THEN blocking_global_pid ELSE citus_calculate_gpid(get_nodeid_for_groupid(blocking_node_id), blocking_pid) END blocking_global_pid,
-- citus_internal.global_blocked_processes returns groupId, we replace it here with actual
-- nodeId to be consisten with the other views
get_nodeid_for_groupid(blocking_node_id) as blocking_node_id,
get_nodeid_for_groupid(waiting_node_id) as waiting_node_id,
blocking_transaction_waiting
FROM citus_internal.global_blocked_processes()
),
unique_global_wait_edges AS
(
SELECT DISTINCT ON(waiting_global_pid, blocking_global_pid) * FROM unique_global_wait_edges_with_calculated_gpids
),
citus_dist_stat_activity_with_calculated_gpids AS
(
-- if global_pid is NULL, it is most likely that a backend is blocked on a DDL
SELECT CASE WHEN global_pid != 0 THEN global_pid ELSE citus_calculate_gpid(nodeid, pid) END global_pid, nodeid, pid, query FROM citus_dist_stat_activity
)
SELECT
waiting.global_pid as waiting_gpid,
blocking.global_pid as blocking_gpid,
waiting.query AS blocked_statement,
blocking.query AS current_statement_in_blocking_process,
waiting.nodeid AS waiting_nodeid,
blocking.nodeid AS blocking_nodeid
FROM
unique_global_wait_edges
JOIN
citus_dist_stat_activity_with_calculated_gpids waiting ON (unique_global_wait_edges.waiting_global_pid = waiting.global_pid)
JOIN
citus_dist_stat_activity_with_calculated_gpids blocking ON (unique_global_wait_edges.blocking_global_pid = blocking.global_pid);
ALTER VIEW citus.citus_lock_waits SET SCHEMA pg_catalog;
GRANT SELECT ON pg_catalog.citus_lock_waits TO PUBLIC;
RESET search_path;

View File

@ -5,18 +5,18 @@ WITH
unique_global_wait_edges_with_calculated_gpids AS (
SELECT
-- if global_pid is NULL, it is most likely that a backend is blocked on a DDL
-- also for legacy reasons citus_internal_global_blocked_processes() returns groupId, we replace that with nodeIds
-- also for legacy reasons citus_internal.global_blocked_processes() returns groupId, we replace that with nodeIds
case WHEN waiting_global_pid !=0 THEN waiting_global_pid ELSE citus_calculate_gpid(get_nodeid_for_groupid(waiting_node_id), waiting_pid) END waiting_global_pid,
case WHEN blocking_global_pid !=0 THEN blocking_global_pid ELSE citus_calculate_gpid(get_nodeid_for_groupid(blocking_node_id), blocking_pid) END blocking_global_pid,
-- citus_internal_global_blocked_processes returns groupId, we replace it here with actual
-- citus_internal.global_blocked_processes returns groupId, we replace it here with actual
-- nodeId to be consisten with the other views
get_nodeid_for_groupid(blocking_node_id) as blocking_node_id,
get_nodeid_for_groupid(waiting_node_id) as waiting_node_id,
blocking_transaction_waiting
FROM citus_internal_global_blocked_processes()
FROM citus_internal.global_blocked_processes()
),
unique_global_wait_edges AS
(

View File

@ -192,7 +192,7 @@ BuildGlobalWaitGraph(bool onlyDistributedTx)
"waiting_node_id, waiting_transaction_num, waiting_transaction_stamp, "
"blocking_global_pid,blocking_pid, blocking_node_id, "
"blocking_transaction_num, blocking_transaction_stamp, blocking_transaction_waiting "
"FROM citus_internal_local_blocked_processes()");
"FROM citus_internal.local_blocked_processes()");
}
int querySent = SendRemoteCommand(connection, queryString->data);
@ -226,7 +226,7 @@ BuildGlobalWaitGraph(bool onlyDistributedTx)
else if (!onlyDistributedTx && colCount != 11)
{
ereport(WARNING, (errmsg("unexpected number of columns from "
"citus_internal_local_blocked_processes")));
"citus_internal.local_blocked_processes")));
continue;
}

View File

@ -138,7 +138,7 @@ step s2-view-worker:
('%pg_prepared_xacts%'),
('%COMMIT%'),
('%dump_local_%'),
('%citus_internal_local_blocked_processes%'),
('%citus_internal.local_blocked_processes%'),
('%add_node%'),
('%csa_from_one_node%'),
('%pg_locks%'))

View File

@ -1438,9 +1438,11 @@ SELECT * FROM multi_extension.print_extension_changes();
| function citus_internal.delete_shard_metadata(bigint) void
| function citus_internal.delete_tenant_schema(oid) void
| function citus_internal.execute_command_on_remote_nodes_as_user(text,text) void
| function citus_internal.global_blocked_processes() SETOF record
| function citus_internal.local_blocked_processes() SETOF record
| function citus_internal.mark_object_distributed(oid,text,oid,text) void
| function citus_internal.start_management_transaction(xid8) void
(18 rows)
(20 rows)
DROP TABLE multi_extension.prev_objects, multi_extension.extension_diff;
-- show running version

View File

@ -73,6 +73,8 @@ ORDER BY 1;
function citus_internal.delete_tenant_schema(oid)
function citus_internal.execute_command_on_remote_nodes_as_user(text,text)
function citus_internal.find_groupid_for_node(text,integer)
function citus_internal.global_blocked_processes()
function citus_internal.local_blocked_processes()
function citus_internal.mark_object_distributed(oid,text,oid,text)
function citus_internal.pg_dist_node_trigger_func()
function citus_internal.pg_dist_rebalance_strategy_trigger_func()
@ -361,5 +363,5 @@ ORDER BY 1;
view citus_stat_tenants_local
view pg_dist_shard_placement
view time_partitions
(351 rows)
(353 rows)

View File

@ -90,7 +90,7 @@ step "s2-view-worker"
('%pg_prepared_xacts%'),
('%COMMIT%'),
('%dump_local_%'),
('%citus_internal_local_blocked_processes%'),
('%citus_internal.local_blocked_processes%'),
('%add_node%'),
('%csa_from_one_node%'),
('%pg_locks%'))