Merge pull request #5755 from citusdata/calculate_gpid

Introduce citus_calculate_gpid and citus_backend_gpid
pull/5760/head
Önder Kalacı 2022-03-04 11:46:36 +01:00 committed by GitHub
commit 28443aee0c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 110 additions and 10 deletions

View File

@ -26,6 +26,8 @@
#include "udfs/worker_create_or_replace_object/11.0-1.sql" #include "udfs/worker_create_or_replace_object/11.0-1.sql"
#include "udfs/citus_isolation_test_session_is_blocked/11.0-1.sql" #include "udfs/citus_isolation_test_session_is_blocked/11.0-1.sql"
#include "udfs/citus_blocking_pids/11.0-1.sql" #include "udfs/citus_blocking_pids/11.0-1.sql"
#include "udfs/citus_calculate_gpid/11.0-1.sql"
#include "udfs/citus_backend_gpid/11.0-1.sql"
CREATE VIEW citus.citus_worker_stat_activity AS CREATE VIEW citus.citus_worker_stat_activity AS
SELECT * FROM pg_catalog.citus_worker_stat_activity(); SELECT * FROM pg_catalog.citus_worker_stat_activity();

View File

@ -353,6 +353,8 @@ ALTER VIEW citus.citus_lock_waits SET SCHEMA pg_catalog;
GRANT SELECT ON pg_catalog.citus_lock_waits TO PUBLIC; GRANT SELECT ON pg_catalog.citus_lock_waits TO PUBLIC;
DROP FUNCTION pg_catalog.citus_finalize_upgrade_to_citus11(bool); DROP FUNCTION pg_catalog.citus_finalize_upgrade_to_citus11(bool);
DROP FUNCTION pg_catalog.citus_calculate_gpid(integer,integer);
DROP FUNCTION pg_catalog.citus_backend_gpid();
RESET search_path; RESET search_path;

View File

@ -0,0 +1,8 @@
CREATE FUNCTION pg_catalog.citus_backend_gpid()
RETURNS BIGINT
LANGUAGE C STRICT
AS 'MODULE_PATHNAME',$$citus_backend_gpid$$;
COMMENT ON FUNCTION pg_catalog.citus_backend_gpid()
IS 'returns gpid of the current backend';
GRANT EXECUTE ON FUNCTION pg_catalog.citus_backend_gpid() TO PUBLIC;

View File

@ -0,0 +1,8 @@
CREATE FUNCTION pg_catalog.citus_backend_gpid()
RETURNS BIGINT
LANGUAGE C STRICT
AS 'MODULE_PATHNAME',$$citus_backend_gpid$$;
COMMENT ON FUNCTION pg_catalog.citus_backend_gpid()
IS 'returns gpid of the current backend';
GRANT EXECUTE ON FUNCTION pg_catalog.citus_backend_gpid() TO PUBLIC;

View File

@ -0,0 +1,9 @@
CREATE FUNCTION pg_catalog.citus_calculate_gpid(nodeid integer,
pid integer)
RETURNS BIGINT
LANGUAGE C STRICT
AS 'MODULE_PATHNAME',$$citus_calculate_gpid$$;
COMMENT ON FUNCTION pg_catalog.citus_calculate_gpid(nodeid integer, pid integer)
IS 'calculate gpid of a backend running on any node';
GRANT EXECUTE ON FUNCTION pg_catalog.citus_calculate_gpid(integer, integer) TO PUBLIC;

View File

@ -0,0 +1,9 @@
CREATE FUNCTION pg_catalog.citus_calculate_gpid(nodeid integer,
pid integer)
RETURNS BIGINT
LANGUAGE C STRICT
AS 'MODULE_PATHNAME',$$citus_calculate_gpid$$;
COMMENT ON FUNCTION pg_catalog.citus_calculate_gpid(nodeid integer, pid integer)
IS 'calculate gpid of a backend running on any node';
GRANT EXECUTE ON FUNCTION pg_catalog.citus_calculate_gpid(integer, integer) TO PUBLIC;

View File

@ -82,6 +82,7 @@ typedef struct BackendManagementShmemData
static void StoreAllActiveTransactions(Tuplestorestate *tupleStore, TupleDesc static void StoreAllActiveTransactions(Tuplestorestate *tupleStore, TupleDesc
tupleDescriptor); tupleDescriptor);
static bool UserHasPermissionToViewStatsOf(Oid currentUserId, Oid backendOwnedId); static bool UserHasPermissionToViewStatsOf(Oid currentUserId, Oid backendOwnedId);
static uint64 CalculateGlobalPID(int32 nodeId, pid_t pid);
static uint64 GenerateGlobalPID(void); static uint64 GenerateGlobalPID(void);
static shmem_startup_hook_type prev_shmem_startup_hook = NULL; static shmem_startup_hook_type prev_shmem_startup_hook = NULL;
@ -97,6 +98,8 @@ PG_FUNCTION_INFO_V1(assign_distributed_transaction_id);
PG_FUNCTION_INFO_V1(get_current_transaction_id); PG_FUNCTION_INFO_V1(get_current_transaction_id);
PG_FUNCTION_INFO_V1(get_global_active_transactions); PG_FUNCTION_INFO_V1(get_global_active_transactions);
PG_FUNCTION_INFO_V1(get_all_active_transactions); PG_FUNCTION_INFO_V1(get_all_active_transactions);
PG_FUNCTION_INFO_V1(citus_calculate_gpid);
PG_FUNCTION_INFO_V1(citus_backend_gpid);
/* /*
@ -880,10 +883,27 @@ GetGlobalPID(void)
/* /*
* GenerateGlobalPID generates the global process id for the current backend. * citus_calculate_gpid calculates the gpid for any given process on any
* given node.
*/
Datum
citus_calculate_gpid(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);
int32 nodeId = PG_GETARG_INT32(0);
int32 pid = PG_GETARG_INT32(1);
PG_RETURN_UINT64(CalculateGlobalPID(nodeId, pid));
}
/*
* CalculateGlobalPID gets a nodeId and pid, and returns the global pid
* that can be assigned for a process with the given input.
*/ */
static uint64 static uint64
GenerateGlobalPID(void) CalculateGlobalPID(int32 nodeId, pid_t pid)
{ {
/* /*
* We try to create a human readable global pid that consists of node id and process id. * We try to create a human readable global pid that consists of node id and process id.
@ -894,7 +914,31 @@ GenerateGlobalPID(void)
* node ids might cause overflow. But even for the applications that scale around 50 nodes every * node ids might cause overflow. But even for the applications that scale around 50 nodes every
* day it'd take about 100K years. So we are not worried. * day it'd take about 100K years. So we are not worried.
*/ */
return (((uint64) GetLocalNodeId()) * GLOBAL_PID_NODE_ID_MULTIPLIER) + getpid(); return (((uint64) nodeId) * GLOBAL_PID_NODE_ID_MULTIPLIER) + pid;
}
/*
* GenerateGlobalPID generates the global process id for the current backend.
* See CalculateGlobalPID for the details.
*/
static uint64
GenerateGlobalPID(void)
{
return CalculateGlobalPID(GetLocalNodeId(), getpid());
}
/*
* citus_backend_gpid similar to pg_backend_pid, but returns Citus
* assigned gpid.
*/
Datum
citus_backend_gpid(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);
PG_RETURN_UINT64(GetGlobalPID());
} }

View File

@ -45,7 +45,7 @@ SELECT 1 FROM run_command_on_workers('CREATE USER global_cancel_user');
(2 rows) (2 rows)
RESET client_min_messages; RESET client_min_messages;
SET ROLE global_cancel_user; \c - global_cancel_user - :master_port
SELECT pg_typeof(:maintenance_daemon_gpid); SELECT pg_typeof(:maintenance_daemon_gpid);
pg_typeof pg_typeof
--------------------------------------------------------------------- ---------------------------------------------------------------------
@ -58,7 +58,10 @@ CONTEXT: while executing command on localhost:xxxxx
SELECT pg_terminate_backend(:maintenance_daemon_gpid); SELECT pg_terminate_backend(:maintenance_daemon_gpid);
ERROR: must be a superuser to terminate superuser process ERROR: must be a superuser to terminate superuser process
CONTEXT: while executing command on localhost:xxxxx CONTEXT: while executing command on localhost:xxxxx
RESET ROLE; -- we can cancel our own backend
SELECT pg_cancel_backend(citus_backend_gpid());
ERROR: canceling statement due to user request
\c - postgres - :master_port
SELECT nodeid AS coordinator_node_id FROM pg_dist_node WHERE nodeport = :master_port \gset SELECT nodeid AS coordinator_node_id FROM pg_dist_node WHERE nodeport = :master_port \gset
SET client_min_messages TO DEBUG; SET client_min_messages TO DEBUG;
-- 10000000000 is the node id multiplier for global pid -- 10000000000 is the node id multiplier for global pid
@ -79,5 +82,11 @@ DETAIL: from localhost:xxxxx
(1 row) (1 row)
RESET client_min_messages; RESET client_min_messages;
SELECT citus_backend_gpid() = citus_calculate_gpid(:coordinator_node_id, pg_backend_pid());
?column?
---------------------------------------------------------------------
t
(1 row)
DROP SCHEMA global_cancel CASCADE; DROP SCHEMA global_cancel CASCADE;
NOTICE: drop cascades to table dist_table NOTICE: drop cascades to table global_cancel.dist_table

View File

@ -1007,6 +1007,8 @@ SELECT * FROM multi_extension.print_extension_changes();
function master_apply_delete_command(text) integer | function master_apply_delete_command(text) integer |
function master_get_table_metadata(text) record | function master_get_table_metadata(text) record |
function worker_partition_query_result(text,text,integer,citus.distribution_type,text[],text[],boolean) SETOF record | function worker_partition_query_result(text,text,integer,citus.distribution_type,text[],text[],boolean) SETOF record |
| function citus_backend_gpid() bigint
| function citus_calculate_gpid(integer,integer) bigint
| function citus_check_cluster_node_health() SETOF record | function citus_check_cluster_node_health() SETOF record
| function citus_check_connection_to_node(text,integer) boolean | function citus_check_connection_to_node(text,integer) boolean
| function citus_disable_node(text,integer,boolean) void | function citus_disable_node(text,integer,boolean) void
@ -1029,7 +1031,7 @@ SELECT * FROM multi_extension.print_extension_changes();
| function worker_drop_shell_table(text) void | function worker_drop_shell_table(text) void
| function worker_partition_query_result(text,text,integer,citus.distribution_type,text[],text[],boolean,boolean,boolean) SETOF record | function worker_partition_query_result(text,text,integer,citus.distribution_type,text[],text[],boolean,boolean,boolean) SETOF record
| view citus_stat_activity | view citus_stat_activity
(28 rows) (30 rows)
DROP TABLE multi_extension.prev_objects, multi_extension.extension_diff; DROP TABLE multi_extension.prev_objects, multi_extension.extension_diff;
-- show running version -- show running version

View File

@ -37,7 +37,9 @@ ORDER BY 1;
function citus_add_node(text,integer,integer,noderole,name) function citus_add_node(text,integer,integer,noderole,name)
function citus_add_rebalance_strategy(name,regproc,regproc,regproc,real,real,real) function citus_add_rebalance_strategy(name,regproc,regproc,regproc,real,real,real)
function citus_add_secondary_node(text,integer,text,integer,name) function citus_add_secondary_node(text,integer,text,integer,name)
function citus_backend_gpid()
function citus_blocking_pids(integer) function citus_blocking_pids(integer)
function citus_calculate_gpid(integer,integer)
function citus_check_cluster_node_health() function citus_check_cluster_node_health()
function citus_check_connection_to_node(text,integer) function citus_check_connection_to_node(text,integer)
function citus_cleanup_orphaned_shards() function citus_cleanup_orphaned_shards()
@ -278,5 +280,5 @@ ORDER BY 1;
view citus_worker_stat_activity view citus_worker_stat_activity
view pg_dist_shard_placement view pg_dist_shard_placement
view time_partitions view time_partitions
(262 rows) (264 rows)

View File

@ -29,14 +29,17 @@ CREATE USER global_cancel_user;
SELECT 1 FROM run_command_on_workers('CREATE USER global_cancel_user'); SELECT 1 FROM run_command_on_workers('CREATE USER global_cancel_user');
RESET client_min_messages; RESET client_min_messages;
SET ROLE global_cancel_user; \c - global_cancel_user - :master_port
SELECT pg_typeof(:maintenance_daemon_gpid); SELECT pg_typeof(:maintenance_daemon_gpid);
SELECT pg_cancel_backend(:maintenance_daemon_gpid); SELECT pg_cancel_backend(:maintenance_daemon_gpid);
SELECT pg_terminate_backend(:maintenance_daemon_gpid); SELECT pg_terminate_backend(:maintenance_daemon_gpid);
RESET ROLE; -- we can cancel our own backend
SELECT pg_cancel_backend(citus_backend_gpid());
\c - postgres - :master_port
SELECT nodeid AS coordinator_node_id FROM pg_dist_node WHERE nodeport = :master_port \gset SELECT nodeid AS coordinator_node_id FROM pg_dist_node WHERE nodeport = :master_port \gset
@ -48,4 +51,6 @@ SELECT pg_terminate_backend(10000000000 * :coordinator_node_id + 0);
RESET client_min_messages; RESET client_min_messages;
SELECT citus_backend_gpid() = citus_calculate_gpid(:coordinator_node_id, pg_backend_pid());
DROP SCHEMA global_cancel CASCADE; DROP SCHEMA global_cancel CASCADE;