Add citus_backend_gpid()

And also citus_calculate_gpid(nodeId,pid). These UDFs are just
wrappers for the existing functions. Useful for testing and simple
manipulation of citus_stat_activity.
pull/5755/head
Onder Kalaci 2022-03-03 15:08:31 +01:00
parent 90974fdc8f
commit c7b67ba0ea
11 changed files with 110 additions and 10 deletions

View File

@ -26,6 +26,8 @@
#include "udfs/worker_create_or_replace_object/11.0-1.sql"
#include "udfs/citus_isolation_test_session_is_blocked/11.0-1.sql"
#include "udfs/citus_blocking_pids/11.0-1.sql"
#include "udfs/citus_calculate_gpid/11.0-1.sql"
#include "udfs/citus_backend_gpid/11.0-1.sql"
CREATE VIEW citus.citus_worker_stat_activity AS
SELECT * FROM pg_catalog.citus_worker_stat_activity();

View File

@ -353,6 +353,8 @@ ALTER VIEW citus.citus_lock_waits SET SCHEMA pg_catalog;
GRANT SELECT ON pg_catalog.citus_lock_waits TO PUBLIC;
DROP FUNCTION pg_catalog.citus_finalize_upgrade_to_citus11(bool);
DROP FUNCTION pg_catalog.citus_calculate_gpid(integer,integer);
DROP FUNCTION pg_catalog.citus_backend_gpid();
RESET search_path;

View File

@ -0,0 +1,8 @@
CREATE FUNCTION pg_catalog.citus_backend_gpid()
RETURNS BIGINT
LANGUAGE C STRICT
AS 'MODULE_PATHNAME',$$citus_backend_gpid$$;
COMMENT ON FUNCTION pg_catalog.citus_backend_gpid()
IS 'returns gpid of the current backend';
GRANT EXECUTE ON FUNCTION pg_catalog.citus_backend_gpid() TO PUBLIC;

View File

@ -0,0 +1,8 @@
CREATE FUNCTION pg_catalog.citus_backend_gpid()
RETURNS BIGINT
LANGUAGE C STRICT
AS 'MODULE_PATHNAME',$$citus_backend_gpid$$;
COMMENT ON FUNCTION pg_catalog.citus_backend_gpid()
IS 'returns gpid of the current backend';
GRANT EXECUTE ON FUNCTION pg_catalog.citus_backend_gpid() TO PUBLIC;

View File

@ -0,0 +1,9 @@
CREATE FUNCTION pg_catalog.citus_calculate_gpid(nodeid integer,
pid integer)
RETURNS BIGINT
LANGUAGE C STRICT
AS 'MODULE_PATHNAME',$$citus_calculate_gpid$$;
COMMENT ON FUNCTION pg_catalog.citus_calculate_gpid(nodeid integer, pid integer)
IS 'calculate gpid of a backend running on any node';
GRANT EXECUTE ON FUNCTION pg_catalog.citus_calculate_gpid(integer, integer) TO PUBLIC;

View File

@ -0,0 +1,9 @@
CREATE FUNCTION pg_catalog.citus_calculate_gpid(nodeid integer,
pid integer)
RETURNS BIGINT
LANGUAGE C STRICT
AS 'MODULE_PATHNAME',$$citus_calculate_gpid$$;
COMMENT ON FUNCTION pg_catalog.citus_calculate_gpid(nodeid integer, pid integer)
IS 'calculate gpid of a backend running on any node';
GRANT EXECUTE ON FUNCTION pg_catalog.citus_calculate_gpid(integer, integer) TO PUBLIC;

View File

@ -82,6 +82,7 @@ typedef struct BackendManagementShmemData
static void StoreAllActiveTransactions(Tuplestorestate *tupleStore, TupleDesc
tupleDescriptor);
static bool UserHasPermissionToViewStatsOf(Oid currentUserId, Oid backendOwnedId);
static uint64 CalculateGlobalPID(int32 nodeId, pid_t pid);
static uint64 GenerateGlobalPID(void);
static shmem_startup_hook_type prev_shmem_startup_hook = NULL;
@ -97,6 +98,8 @@ PG_FUNCTION_INFO_V1(assign_distributed_transaction_id);
PG_FUNCTION_INFO_V1(get_current_transaction_id);
PG_FUNCTION_INFO_V1(get_global_active_transactions);
PG_FUNCTION_INFO_V1(get_all_active_transactions);
PG_FUNCTION_INFO_V1(citus_calculate_gpid);
PG_FUNCTION_INFO_V1(citus_backend_gpid);
/*
@ -880,10 +883,27 @@ GetGlobalPID(void)
/*
* GenerateGlobalPID generates the global process id for the current backend.
* citus_calculate_gpid calculates the gpid for any given process on any
* given node.
*/
Datum
citus_calculate_gpid(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);
int32 nodeId = PG_GETARG_INT32(0);
int32 pid = PG_GETARG_INT32(1);
PG_RETURN_UINT64(CalculateGlobalPID(nodeId, pid));
}
/*
* CalculateGlobalPID gets a nodeId and pid, and returns the global pid
* that can be assigned for a process with the given input.
*/
static uint64
GenerateGlobalPID(void)
CalculateGlobalPID(int32 nodeId, pid_t pid)
{
/*
* We try to create a human readable global pid that consists of node id and process id.
@ -894,7 +914,31 @@ GenerateGlobalPID(void)
* node ids might cause overflow. But even for the applications that scale around 50 nodes every
* day it'd take about 100K years. So we are not worried.
*/
return (((uint64) GetLocalNodeId()) * GLOBAL_PID_NODE_ID_MULTIPLIER) + getpid();
return (((uint64) nodeId) * GLOBAL_PID_NODE_ID_MULTIPLIER) + pid;
}
/*
* GenerateGlobalPID generates the global process id for the current backend.
* See CalculateGlobalPID for the details.
*/
static uint64
GenerateGlobalPID(void)
{
return CalculateGlobalPID(GetLocalNodeId(), getpid());
}
/*
* citus_backend_gpid similar to pg_backend_pid, but returns Citus
* assigned gpid.
*/
Datum
citus_backend_gpid(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);
PG_RETURN_UINT64(GetGlobalPID());
}

View File

@ -45,7 +45,7 @@ SELECT 1 FROM run_command_on_workers('CREATE USER global_cancel_user');
(2 rows)
RESET client_min_messages;
SET ROLE global_cancel_user;
\c - global_cancel_user - :master_port
SELECT pg_typeof(:maintenance_daemon_gpid);
pg_typeof
---------------------------------------------------------------------
@ -58,7 +58,10 @@ CONTEXT: while executing command on localhost:xxxxx
SELECT pg_terminate_backend(:maintenance_daemon_gpid);
ERROR: must be a superuser to terminate superuser process
CONTEXT: while executing command on localhost:xxxxx
RESET ROLE;
-- we can cancel our own backend
SELECT pg_cancel_backend(citus_backend_gpid());
ERROR: canceling statement due to user request
\c - postgres - :master_port
SELECT nodeid AS coordinator_node_id FROM pg_dist_node WHERE nodeport = :master_port \gset
SET client_min_messages TO DEBUG;
-- 10000000000 is the node id multiplier for global pid
@ -79,5 +82,11 @@ DETAIL: from localhost:xxxxx
(1 row)
RESET client_min_messages;
SELECT citus_backend_gpid() = citus_calculate_gpid(:coordinator_node_id, pg_backend_pid());
?column?
---------------------------------------------------------------------
t
(1 row)
DROP SCHEMA global_cancel CASCADE;
NOTICE: drop cascades to table dist_table
NOTICE: drop cascades to table global_cancel.dist_table

View File

@ -1009,6 +1009,8 @@ SELECT * FROM multi_extension.print_extension_changes();
function master_apply_delete_command(text) integer |
function master_get_table_metadata(text) record |
function worker_partition_query_result(text,text,integer,citus.distribution_type,text[],text[],boolean) SETOF record |
| function citus_backend_gpid() bigint
| function citus_calculate_gpid(integer,integer) bigint
| function citus_check_cluster_node_health() SETOF record
| function citus_check_connection_to_node(text,integer) boolean
| function citus_disable_node(text,integer,boolean) void
@ -1031,7 +1033,7 @@ SELECT * FROM multi_extension.print_extension_changes();
| function worker_drop_shell_table(text) void
| function worker_partition_query_result(text,text,integer,citus.distribution_type,text[],text[],boolean,boolean,boolean) SETOF record
| view citus_stat_activity
(28 rows)
(30 rows)
DROP TABLE multi_extension.prev_objects, multi_extension.extension_diff;
-- show running version

View File

@ -37,7 +37,9 @@ ORDER BY 1;
function citus_add_node(text,integer,integer,noderole,name)
function citus_add_rebalance_strategy(name,regproc,regproc,regproc,real,real,real)
function citus_add_secondary_node(text,integer,text,integer,name)
function citus_backend_gpid()
function citus_blocking_pids(integer)
function citus_calculate_gpid(integer,integer)
function citus_check_cluster_node_health()
function citus_check_connection_to_node(text,integer)
function citus_cleanup_orphaned_shards()
@ -278,5 +280,5 @@ ORDER BY 1;
view citus_worker_stat_activity
view pg_dist_shard_placement
view time_partitions
(262 rows)
(264 rows)

View File

@ -29,14 +29,17 @@ CREATE USER global_cancel_user;
SELECT 1 FROM run_command_on_workers('CREATE USER global_cancel_user');
RESET client_min_messages;
SET ROLE global_cancel_user;
\c - global_cancel_user - :master_port
SELECT pg_typeof(:maintenance_daemon_gpid);
SELECT pg_cancel_backend(:maintenance_daemon_gpid);
SELECT pg_terminate_backend(:maintenance_daemon_gpid);
RESET ROLE;
-- we can cancel our own backend
SELECT pg_cancel_backend(citus_backend_gpid());
\c - postgres - :master_port
SELECT nodeid AS coordinator_node_id FROM pg_dist_node WHERE nodeport = :master_port \gset
@ -48,4 +51,6 @@ SELECT pg_terminate_backend(10000000000 * :coordinator_node_id + 0);
RESET client_min_messages;
SELECT citus_backend_gpid() = citus_calculate_gpid(:coordinator_node_id, pg_backend_pid());
DROP SCHEMA global_cancel CASCADE;