mirror of https://github.com/citusdata/citus.git
Introduce citus_locks view
citus_locks combines the pg_locks views from all nodes and adds global_pid, nodeid, and relation_name. The columns of citus_locks don't change based on the Postgres version, however the pg_locks's columns do. Postgres 14 added one more column to pg_locks (waitstart timestamptz). citus_locks has the most expansive column set, including the newly added column. If citus_locks is queried in a Postgres version where pg_locks doesn't have some columns, the values for those columns in citus_locks will be NULLpull/6073/head
parent
3d569cc49a
commit
eb3e5ee227
|
@ -1,3 +1,5 @@
|
|||
#include "udfs/citus_locks/11.1-1.sql"
|
||||
|
||||
DROP FUNCTION pg_catalog.worker_create_schema(bigint,text);
|
||||
DROP FUNCTION pg_catalog.worker_cleanup_job_schema_cache();
|
||||
DROP FUNCTION pg_catalog.worker_fetch_foreign_file(text, text, bigint, text[], integer[]);
|
||||
|
|
|
@ -77,3 +77,6 @@ DROP FUNCTION pg_catalog.get_all_active_transactions(OUT datid oid, OUT process_
|
|||
OUT worker_query BOOL, OUT transaction_number int8, OUT transaction_stamp timestamptz,
|
||||
OUT global_pid int8);
|
||||
#include "../udfs/get_all_active_transactions/11.0-1.sql"
|
||||
|
||||
DROP VIEW pg_catalog.citus_locks;
|
||||
DROP FUNCTION pg_catalog.citus_locks();
|
||||
|
|
|
@ -0,0 +1,86 @@
|
|||
-- citus_locks combines the pg_locks views from all nodes and adds global_pid, nodeid, and
|
||||
-- relation_name. The columns of citus_locks don't change based on the Postgres version,
|
||||
-- however the pg_locks's columns do. Postgres 14 added one more column to pg_locks
|
||||
-- (waitstart timestamptz). citus_locks has the most expansive column set, including the
|
||||
-- newly added column. If citus_locks is queried in a Postgres version where pg_locks
|
||||
-- doesn't have some columns, the values for those columns in citus_locks will be NULL
|
||||
CREATE OR REPLACE FUNCTION pg_catalog.citus_locks (
|
||||
OUT global_pid bigint,
|
||||
OUT nodeid int,
|
||||
OUT locktype text,
|
||||
OUT database oid,
|
||||
OUT relation oid,
|
||||
OUT relation_name text,
|
||||
OUT page integer,
|
||||
OUT tuple smallint,
|
||||
OUT virtualxid text,
|
||||
OUT transactionid xid,
|
||||
OUT classid oid,
|
||||
OUT objid oid,
|
||||
OUT objsubid smallint,
|
||||
OUT virtualtransaction text,
|
||||
OUT pid integer,
|
||||
OUT mode text,
|
||||
OUT granted boolean,
|
||||
OUT fastpath boolean,
|
||||
OUT waitstart timestamp with time zone
|
||||
)
|
||||
RETURNS SETOF record
|
||||
LANGUAGE plpgsql
|
||||
AS $function$
|
||||
BEGIN
|
||||
RETURN QUERY
|
||||
SELECT *
|
||||
FROM jsonb_to_recordset((
|
||||
SELECT
|
||||
jsonb_agg(all_citus_locks_rows_as_jsonb.citus_locks_row_as_jsonb)::jsonb
|
||||
FROM (
|
||||
SELECT
|
||||
jsonb_array_elements(run_command_on_all_nodes.result::jsonb)::jsonb ||
|
||||
('{"nodeid":' || run_command_on_all_nodes.nodeid || '}')::jsonb AS citus_locks_row_as_jsonb
|
||||
FROM
|
||||
run_command_on_all_nodes (
|
||||
$$
|
||||
SELECT
|
||||
coalesce(to_jsonb (array_agg(citus_locks_from_one_node.*)), '[{}]'::jsonb)
|
||||
FROM (
|
||||
SELECT
|
||||
global_pid, pg_locks.relation::regclass::text AS relation_name, pg_locks.*
|
||||
FROM pg_locks
|
||||
LEFT JOIN get_all_active_transactions () ON process_id = pid) AS citus_locks_from_one_node;
|
||||
$$,
|
||||
parallel:= TRUE,
|
||||
give_warning_for_connection_errors:= TRUE)
|
||||
WHERE
|
||||
success = 't')
|
||||
AS all_citus_locks_rows_as_jsonb))
|
||||
AS (
|
||||
global_pid bigint,
|
||||
nodeid int,
|
||||
locktype text,
|
||||
database oid,
|
||||
relation oid,
|
||||
relation_name text,
|
||||
page integer,
|
||||
tuple smallint,
|
||||
virtualxid text,
|
||||
transactionid xid,
|
||||
classid oid,
|
||||
objid oid,
|
||||
objsubid smallint,
|
||||
virtualtransaction text,
|
||||
pid integer,
|
||||
mode text,
|
||||
granted boolean,
|
||||
fastpath boolean,
|
||||
waitstart timestamp with time zone
|
||||
);
|
||||
END;
|
||||
$function$;
|
||||
|
||||
CREATE OR REPLACE VIEW citus.citus_locks AS
|
||||
SELECT * FROM pg_catalog.citus_locks();
|
||||
|
||||
ALTER VIEW citus.citus_locks SET SCHEMA pg_catalog;
|
||||
|
||||
GRANT SELECT ON pg_catalog.citus_locks TO PUBLIC;
|
|
@ -0,0 +1,86 @@
|
|||
-- citus_locks combines the pg_locks views from all nodes and adds global_pid, nodeid, and
|
||||
-- relation_name. The columns of citus_locks don't change based on the Postgres version,
|
||||
-- however the pg_locks's columns do. Postgres 14 added one more column to pg_locks
|
||||
-- (waitstart timestamptz). citus_locks has the most expansive column set, including the
|
||||
-- newly added column. If citus_locks is queried in a Postgres version where pg_locks
|
||||
-- doesn't have some columns, the values for those columns in citus_locks will be NULL
|
||||
CREATE OR REPLACE FUNCTION pg_catalog.citus_locks (
|
||||
OUT global_pid bigint,
|
||||
OUT nodeid int,
|
||||
OUT locktype text,
|
||||
OUT database oid,
|
||||
OUT relation oid,
|
||||
OUT relation_name text,
|
||||
OUT page integer,
|
||||
OUT tuple smallint,
|
||||
OUT virtualxid text,
|
||||
OUT transactionid xid,
|
||||
OUT classid oid,
|
||||
OUT objid oid,
|
||||
OUT objsubid smallint,
|
||||
OUT virtualtransaction text,
|
||||
OUT pid integer,
|
||||
OUT mode text,
|
||||
OUT granted boolean,
|
||||
OUT fastpath boolean,
|
||||
OUT waitstart timestamp with time zone
|
||||
)
|
||||
RETURNS SETOF record
|
||||
LANGUAGE plpgsql
|
||||
AS $function$
|
||||
BEGIN
|
||||
RETURN QUERY
|
||||
SELECT *
|
||||
FROM jsonb_to_recordset((
|
||||
SELECT
|
||||
jsonb_agg(all_citus_locks_rows_as_jsonb.citus_locks_row_as_jsonb)::jsonb
|
||||
FROM (
|
||||
SELECT
|
||||
jsonb_array_elements(run_command_on_all_nodes.result::jsonb)::jsonb ||
|
||||
('{"nodeid":' || run_command_on_all_nodes.nodeid || '}')::jsonb AS citus_locks_row_as_jsonb
|
||||
FROM
|
||||
run_command_on_all_nodes (
|
||||
$$
|
||||
SELECT
|
||||
coalesce(to_jsonb (array_agg(citus_locks_from_one_node.*)), '[{}]'::jsonb)
|
||||
FROM (
|
||||
SELECT
|
||||
global_pid, pg_locks.relation::regclass::text AS relation_name, pg_locks.*
|
||||
FROM pg_locks
|
||||
LEFT JOIN get_all_active_transactions () ON process_id = pid) AS citus_locks_from_one_node;
|
||||
$$,
|
||||
parallel:= TRUE,
|
||||
give_warning_for_connection_errors:= TRUE)
|
||||
WHERE
|
||||
success = 't')
|
||||
AS all_citus_locks_rows_as_jsonb))
|
||||
AS (
|
||||
global_pid bigint,
|
||||
nodeid int,
|
||||
locktype text,
|
||||
database oid,
|
||||
relation oid,
|
||||
relation_name text,
|
||||
page integer,
|
||||
tuple smallint,
|
||||
virtualxid text,
|
||||
transactionid xid,
|
||||
classid oid,
|
||||
objid oid,
|
||||
objsubid smallint,
|
||||
virtualtransaction text,
|
||||
pid integer,
|
||||
mode text,
|
||||
granted boolean,
|
||||
fastpath boolean,
|
||||
waitstart timestamp with time zone
|
||||
);
|
||||
END;
|
||||
$function$;
|
||||
|
||||
CREATE OR REPLACE VIEW citus.citus_locks AS
|
||||
SELECT * FROM pg_catalog.citus_locks();
|
||||
|
||||
ALTER VIEW citus.citus_locks SET SCHEMA pg_catalog;
|
||||
|
||||
GRANT SELECT ON pg_catalog.citus_locks TO PUBLIC;
|
|
@ -1,4 +1,4 @@
|
|||
test: upgrade_basic_after upgrade_type_after upgrade_ref2ref_after upgrade_distributed_function_after upgrade_rebalance_strategy_after upgrade_list_citus_objects upgrade_autoconverted_after upgrade_citus_stat_activity
|
||||
test: upgrade_basic_after upgrade_type_after upgrade_ref2ref_after upgrade_distributed_function_after upgrade_rebalance_strategy_after upgrade_list_citus_objects upgrade_autoconverted_after upgrade_citus_stat_activity upgrade_citus_locks
|
||||
|
||||
# This attempts dropping citus extension (and rollbacks), so please do
|
||||
# not run in parallel with any other tests.
|
||||
|
|
|
@ -7,6 +7,7 @@ test: upgrade_type_before
|
|||
test: upgrade_distributed_function_before upgrade_rebalance_strategy_before
|
||||
test: upgrade_autoconverted_before
|
||||
test: upgrade_citus_stat_activity
|
||||
test: upgrade_citus_locks
|
||||
|
||||
# upgrade_columnar_before renames public schema to citus_schema, so let's
|
||||
# run this test as the last one.
|
||||
|
|
|
@ -0,0 +1,47 @@
|
|||
CREATE SCHEMA citus_locks;
|
||||
SET search_path TO citus_locks;
|
||||
SET citus.next_shard_id TO 1000;
|
||||
CREATE TABLE dist_locked_table(id int, data text);
|
||||
SELECT create_distributed_table('dist_locked_table', 'id');
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
BEGIN;
|
||||
-- Alter a distributed table so that we get some locks
|
||||
ALTER TABLE dist_locked_table ADD COLUMN new_data_column text;
|
||||
-- list the locks on relations for current distributed transaction
|
||||
SELECT relation_name, citus_nodename_for_nodeid(nodeid), citus_nodeport_for_nodeid(nodeid), mode, granted
|
||||
FROM citus_locks
|
||||
WHERE global_pid = citus_backend_gpid() AND locktype = 'relation' AND relation_name LIKE '%dist_locked_table%'
|
||||
ORDER BY 1, 2, 3, 4;
|
||||
relation_name | citus_nodename_for_nodeid | citus_nodeport_for_nodeid | mode | granted
|
||||
---------------------------------------------------------------------
|
||||
citus_locks.dist_locked_table | localhost | 57636 | AccessExclusiveLock | t
|
||||
citus_locks.dist_locked_table | localhost | 57636 | AccessShareLock | t
|
||||
citus_locks.dist_locked_table | localhost | 57637 | AccessExclusiveLock | t
|
||||
citus_locks.dist_locked_table | localhost | 57637 | AccessShareLock | t
|
||||
citus_locks.dist_locked_table | localhost | 57638 | AccessExclusiveLock | t
|
||||
citus_locks.dist_locked_table | localhost | 57638 | AccessShareLock | t
|
||||
citus_locks.dist_locked_table_1000 | localhost | 57637 | AccessExclusiveLock | t
|
||||
citus_locks.dist_locked_table_1000 | localhost | 57637 | AccessShareLock | t
|
||||
citus_locks.dist_locked_table_1000 | localhost | 57638 | AccessExclusiveLock | t
|
||||
citus_locks.dist_locked_table_1000 | localhost | 57638 | AccessShareLock | t
|
||||
citus_locks.dist_locked_table_1001 | localhost | 57637 | AccessExclusiveLock | t
|
||||
citus_locks.dist_locked_table_1001 | localhost | 57637 | AccessShareLock | t
|
||||
citus_locks.dist_locked_table_1001 | localhost | 57638 | AccessExclusiveLock | t
|
||||
citus_locks.dist_locked_table_1001 | localhost | 57638 | AccessShareLock | t
|
||||
citus_locks.dist_locked_table_1002 | localhost | 57637 | AccessExclusiveLock | t
|
||||
citus_locks.dist_locked_table_1002 | localhost | 57637 | AccessShareLock | t
|
||||
citus_locks.dist_locked_table_1002 | localhost | 57638 | AccessExclusiveLock | t
|
||||
citus_locks.dist_locked_table_1002 | localhost | 57638 | AccessShareLock | t
|
||||
citus_locks.dist_locked_table_1003 | localhost | 57637 | AccessExclusiveLock | t
|
||||
citus_locks.dist_locked_table_1003 | localhost | 57637 | AccessShareLock | t
|
||||
citus_locks.dist_locked_table_1003 | localhost | 57638 | AccessExclusiveLock | t
|
||||
citus_locks.dist_locked_table_1003 | localhost | 57638 | AccessShareLock | t
|
||||
(22 rows)
|
||||
|
||||
ROLLBACK;
|
||||
DROP SCHEMA citus_locks CASCADE;
|
||||
NOTICE: drop cascades to table dist_locked_table
|
|
@ -0,0 +1,69 @@
|
|||
Parsed test spec with 2 sessions
|
||||
|
||||
starting permutation: s1-record-gpid s1-begin s2-show-locks s1-alter-dist-table s2-show-locks s1-commit s2-show-locks
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
step s1-record-gpid:
|
||||
SELECT citus_backend_gpid() INTO selected_gpid;
|
||||
|
||||
step s1-begin:
|
||||
BEGIN;
|
||||
|
||||
step s2-show-locks:
|
||||
SELECT relation_name, citus_nodename_for_nodeid(nodeid), citus_nodeport_for_nodeid(nodeid), mode
|
||||
FROM citus_locks
|
||||
WHERE global_pid IN (SELECT * FROM selected_gpid) AND relation_name LIKE 'dist_table%'
|
||||
ORDER BY 1, 2, 3, 4;
|
||||
|
||||
relation_name|citus_nodename_for_nodeid|citus_nodeport_for_nodeid|mode
|
||||
---------------------------------------------------------------------
|
||||
(0 rows)
|
||||
|
||||
step s1-alter-dist-table:
|
||||
ALTER TABLE dist_table ADD COLUMN data text;
|
||||
|
||||
step s2-show-locks:
|
||||
SELECT relation_name, citus_nodename_for_nodeid(nodeid), citus_nodeport_for_nodeid(nodeid), mode
|
||||
FROM citus_locks
|
||||
WHERE global_pid IN (SELECT * FROM selected_gpid) AND relation_name LIKE 'dist_table%'
|
||||
ORDER BY 1, 2, 3, 4;
|
||||
|
||||
relation_name |citus_nodename_for_nodeid|citus_nodeport_for_nodeid|mode
|
||||
---------------------------------------------------------------------
|
||||
dist_table |localhost | 57636|AccessExclusiveLock
|
||||
dist_table |localhost | 57636|AccessShareLock
|
||||
dist_table |localhost | 57637|AccessExclusiveLock
|
||||
dist_table |localhost | 57637|AccessShareLock
|
||||
dist_table |localhost | 57638|AccessExclusiveLock
|
||||
dist_table |localhost | 57638|AccessShareLock
|
||||
dist_table_12345000|localhost | 57637|AccessExclusiveLock
|
||||
dist_table_12345000|localhost | 57637|AccessShareLock
|
||||
dist_table_12345001|localhost | 57638|AccessExclusiveLock
|
||||
dist_table_12345001|localhost | 57638|AccessShareLock
|
||||
dist_table_12345002|localhost | 57637|AccessExclusiveLock
|
||||
dist_table_12345002|localhost | 57637|AccessShareLock
|
||||
dist_table_12345003|localhost | 57638|AccessExclusiveLock
|
||||
dist_table_12345003|localhost | 57638|AccessShareLock
|
||||
(14 rows)
|
||||
|
||||
step s1-commit:
|
||||
COMMIT;
|
||||
|
||||
step s2-show-locks:
|
||||
SELECT relation_name, citus_nodename_for_nodeid(nodeid), citus_nodeport_for_nodeid(nodeid), mode
|
||||
FROM citus_locks
|
||||
WHERE global_pid IN (SELECT * FROM selected_gpid) AND relation_name LIKE 'dist_table%'
|
||||
ORDER BY 1, 2, 3, 4;
|
||||
|
||||
relation_name|citus_nodename_for_nodeid|citus_nodeport_for_nodeid|mode
|
||||
---------------------------------------------------------------------
|
||||
(0 rows)
|
||||
|
||||
citus_remove_node
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
|
@ -1095,10 +1095,12 @@ SELECT * FROM multi_extension.print_extension_changes();
|
|||
table columnar.chunk_group |
|
||||
table columnar.options |
|
||||
table columnar.stripe |
|
||||
| function citus_locks() SETOF record
|
||||
| function citus_split_shard_by_split_points(bigint,text[],integer[],citus.shard_transfer_mode) void
|
||||
| function worker_split_copy(bigint,split_copy_info[]) void
|
||||
| type split_copy_info
|
||||
(24 rows)
|
||||
| view citus_locks
|
||||
(26 rows)
|
||||
|
||||
DROP TABLE multi_extension.prev_objects, multi_extension.extension_diff;
|
||||
-- show running version
|
||||
|
|
|
@ -0,0 +1,17 @@
|
|||
SELECT column_name FROM information_schema.columns WHERE table_name = 'citus_locks' AND column_name NOT IN ('waitstart')
|
||||
EXCEPT SELECT column_name FROM information_schema.columns WHERE table_name = 'pg_locks'
|
||||
ORDER BY 1;
|
||||
column_name
|
||||
---------------------------------------------------------------------
|
||||
global_pid
|
||||
nodeid
|
||||
relation_name
|
||||
(3 rows)
|
||||
|
||||
SELECT column_name FROM information_schema.columns WHERE table_name = 'pg_locks'
|
||||
EXCEPT SELECT column_name FROM information_schema.columns WHERE table_name = 'citus_locks'
|
||||
ORDER BY 1;
|
||||
column_name
|
||||
---------------------------------------------------------------------
|
||||
(0 rows)
|
||||
|
|
@ -77,6 +77,7 @@ ORDER BY 1;
|
|||
function citus_jsonb_concatenate(jsonb,jsonb)
|
||||
function citus_jsonb_concatenate_final(jsonb)
|
||||
function citus_local_disk_space_stats()
|
||||
function citus_locks()
|
||||
function citus_move_shard_placement(bigint,text,integer,text,integer,citus.shard_transfer_mode)
|
||||
function citus_node_capacity_1(integer)
|
||||
function citus_nodeid_for_gpid(bigint)
|
||||
|
@ -253,6 +254,7 @@ ORDER BY 1;
|
|||
type split_copy_info
|
||||
view citus_dist_stat_activity
|
||||
view citus_lock_waits
|
||||
view citus_locks
|
||||
view citus_schema.citus_tables
|
||||
view citus_shard_indexes_on_worker
|
||||
view citus_shards
|
||||
|
@ -261,5 +263,5 @@ ORDER BY 1;
|
|||
view citus_stat_statements
|
||||
view pg_dist_shard_placement
|
||||
view time_partitions
|
||||
(253 rows)
|
||||
(255 rows)
|
||||
|
||||
|
|
|
@ -67,6 +67,7 @@ test: isolation_max_client_connections
|
|||
test: isolation_undistribute_table
|
||||
test: isolation_fix_partition_shard_index_names
|
||||
test: isolation_global_pid
|
||||
test: isolation_citus_locks
|
||||
|
||||
# Rebalancer
|
||||
test: isolation_blocking_move_single_shard_commands
|
||||
|
|
|
@ -287,6 +287,7 @@ test: create_citus_local_table_cascade
|
|||
test: fkeys_between_local_ref
|
||||
test: auto_undist_citus_local
|
||||
test: mx_regular_user
|
||||
test: citus_locks
|
||||
test: global_cancel
|
||||
test: remove_coordinator
|
||||
|
||||
|
|
|
@ -0,0 +1,49 @@
|
|||
#include "isolation_mx_common.include.spec"
|
||||
|
||||
setup
|
||||
{
|
||||
SELECT citus_add_node('localhost', 57636, groupid:=0);
|
||||
SET citus.next_shard_id TO 12345000;
|
||||
CREATE TABLE dist_table (a INT, b INT);
|
||||
SELECT create_distributed_table('dist_table', 'a', shard_count:=4);
|
||||
}
|
||||
|
||||
teardown
|
||||
{
|
||||
DROP TABLE dist_table, selected_gpid;
|
||||
SELECT citus_remove_node('localhost', 57636);
|
||||
}
|
||||
|
||||
session "s1"
|
||||
|
||||
step "s1-begin"
|
||||
{
|
||||
BEGIN;
|
||||
}
|
||||
|
||||
step "s1-alter-dist-table"
|
||||
{
|
||||
ALTER TABLE dist_table ADD COLUMN data text;
|
||||
}
|
||||
|
||||
step "s1-record-gpid"
|
||||
{
|
||||
SELECT citus_backend_gpid() INTO selected_gpid;
|
||||
}
|
||||
|
||||
step "s1-commit"
|
||||
{
|
||||
COMMIT;
|
||||
}
|
||||
|
||||
session "s2"
|
||||
|
||||
step "s2-show-locks"
|
||||
{
|
||||
SELECT relation_name, citus_nodename_for_nodeid(nodeid), citus_nodeport_for_nodeid(nodeid), mode
|
||||
FROM citus_locks
|
||||
WHERE global_pid IN (SELECT * FROM selected_gpid) AND relation_name LIKE 'dist_table%'
|
||||
ORDER BY 1, 2, 3, 4;
|
||||
}
|
||||
|
||||
permutation "s1-record-gpid" "s1-begin" "s2-show-locks" "s1-alter-dist-table" "s2-show-locks" "s1-commit" "s2-show-locks"
|
|
@ -0,0 +1,20 @@
|
|||
CREATE SCHEMA citus_locks;
|
||||
SET search_path TO citus_locks;
|
||||
SET citus.next_shard_id TO 1000;
|
||||
|
||||
CREATE TABLE dist_locked_table(id int, data text);
|
||||
SELECT create_distributed_table('dist_locked_table', 'id');
|
||||
|
||||
BEGIN;
|
||||
-- Alter a distributed table so that we get some locks
|
||||
ALTER TABLE dist_locked_table ADD COLUMN new_data_column text;
|
||||
|
||||
-- list the locks on relations for current distributed transaction
|
||||
SELECT relation_name, citus_nodename_for_nodeid(nodeid), citus_nodeport_for_nodeid(nodeid), mode, granted
|
||||
FROM citus_locks
|
||||
WHERE global_pid = citus_backend_gpid() AND locktype = 'relation' AND relation_name LIKE '%dist_locked_table%'
|
||||
ORDER BY 1, 2, 3, 4;
|
||||
|
||||
ROLLBACK;
|
||||
|
||||
DROP SCHEMA citus_locks CASCADE;
|
|
@ -0,0 +1,7 @@
|
|||
SELECT column_name FROM information_schema.columns WHERE table_name = 'citus_locks' AND column_name NOT IN ('waitstart')
|
||||
EXCEPT SELECT column_name FROM information_schema.columns WHERE table_name = 'pg_locks'
|
||||
ORDER BY 1;
|
||||
|
||||
SELECT column_name FROM information_schema.columns WHERE table_name = 'pg_locks'
|
||||
EXCEPT SELECT column_name FROM information_schema.columns WHERE table_name = 'citus_locks'
|
||||
ORDER BY 1;
|
Loading…
Reference in New Issue