diff --git a/src/backend/distributed/sql/citus--10.2-4--11.0-1.sql b/src/backend/distributed/sql/citus--10.2-4--11.0-1.sql index e39b7546b..0f7b57d05 100644 --- a/src/backend/distributed/sql/citus--10.2-4--11.0-1.sql +++ b/src/backend/distributed/sql/citus--10.2-4--11.0-1.sql @@ -26,6 +26,8 @@ #include "udfs/worker_create_or_replace_object/11.0-1.sql" #include "udfs/citus_isolation_test_session_is_blocked/11.0-1.sql" #include "udfs/citus_blocking_pids/11.0-1.sql" +#include "udfs/citus_calculate_gpid/11.0-1.sql" +#include "udfs/citus_backend_gpid/11.0-1.sql" CREATE VIEW citus.citus_worker_stat_activity AS SELECT * FROM pg_catalog.citus_worker_stat_activity(); diff --git a/src/backend/distributed/sql/downgrades/citus--11.0-1--10.2-4.sql b/src/backend/distributed/sql/downgrades/citus--11.0-1--10.2-4.sql index a14969d24..a3c992606 100644 --- a/src/backend/distributed/sql/downgrades/citus--11.0-1--10.2-4.sql +++ b/src/backend/distributed/sql/downgrades/citus--11.0-1--10.2-4.sql @@ -353,6 +353,8 @@ ALTER VIEW citus.citus_lock_waits SET SCHEMA pg_catalog; GRANT SELECT ON pg_catalog.citus_lock_waits TO PUBLIC; DROP FUNCTION pg_catalog.citus_finalize_upgrade_to_citus11(bool); +DROP FUNCTION pg_catalog.citus_calculate_gpid(integer,integer); +DROP FUNCTION pg_catalog.citus_backend_gpid(); RESET search_path; diff --git a/src/backend/distributed/sql/udfs/citus_backend_gpid/11.0-1.sql b/src/backend/distributed/sql/udfs/citus_backend_gpid/11.0-1.sql new file mode 100644 index 000000000..fd7222c04 --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_backend_gpid/11.0-1.sql @@ -0,0 +1,8 @@ +CREATE FUNCTION pg_catalog.citus_backend_gpid() + RETURNS BIGINT + LANGUAGE C STRICT + AS 'MODULE_PATHNAME',$$citus_backend_gpid$$; +COMMENT ON FUNCTION pg_catalog.citus_backend_gpid() + IS 'returns gpid of the current backend'; + +GRANT EXECUTE ON FUNCTION pg_catalog.citus_backend_gpid() TO PUBLIC; diff --git a/src/backend/distributed/sql/udfs/citus_backend_gpid/latest.sql b/src/backend/distributed/sql/udfs/citus_backend_gpid/latest.sql new file mode 100644 index 000000000..fd7222c04 --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_backend_gpid/latest.sql @@ -0,0 +1,8 @@ +CREATE FUNCTION pg_catalog.citus_backend_gpid() + RETURNS BIGINT + LANGUAGE C STRICT + AS 'MODULE_PATHNAME',$$citus_backend_gpid$$; +COMMENT ON FUNCTION pg_catalog.citus_backend_gpid() + IS 'returns gpid of the current backend'; + +GRANT EXECUTE ON FUNCTION pg_catalog.citus_backend_gpid() TO PUBLIC; diff --git a/src/backend/distributed/sql/udfs/citus_calculate_gpid/11.0-1.sql b/src/backend/distributed/sql/udfs/citus_calculate_gpid/11.0-1.sql new file mode 100644 index 000000000..a5eecc7ae --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_calculate_gpid/11.0-1.sql @@ -0,0 +1,9 @@ +CREATE FUNCTION pg_catalog.citus_calculate_gpid(nodeid integer, + pid integer) + RETURNS BIGINT + LANGUAGE C STRICT + AS 'MODULE_PATHNAME',$$citus_calculate_gpid$$; +COMMENT ON FUNCTION pg_catalog.citus_calculate_gpid(nodeid integer, pid integer) + IS 'calculate gpid of a backend running on any node'; + +GRANT EXECUTE ON FUNCTION pg_catalog.citus_calculate_gpid(integer, integer) TO PUBLIC; diff --git a/src/backend/distributed/sql/udfs/citus_calculate_gpid/latest.sql b/src/backend/distributed/sql/udfs/citus_calculate_gpid/latest.sql new file mode 100644 index 000000000..a5eecc7ae --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_calculate_gpid/latest.sql @@ -0,0 +1,9 @@ +CREATE FUNCTION pg_catalog.citus_calculate_gpid(nodeid integer, + pid integer) + RETURNS BIGINT + LANGUAGE C STRICT + AS 'MODULE_PATHNAME',$$citus_calculate_gpid$$; +COMMENT ON FUNCTION pg_catalog.citus_calculate_gpid(nodeid integer, pid integer) + IS 'calculate gpid of a backend running on any node'; + +GRANT EXECUTE ON FUNCTION pg_catalog.citus_calculate_gpid(integer, integer) TO PUBLIC; diff --git a/src/backend/distributed/transaction/backend_data.c b/src/backend/distributed/transaction/backend_data.c index f19fd9350..7cfe1fe51 100644 --- a/src/backend/distributed/transaction/backend_data.c +++ b/src/backend/distributed/transaction/backend_data.c @@ -82,6 +82,7 @@ typedef struct BackendManagementShmemData static void StoreAllActiveTransactions(Tuplestorestate *tupleStore, TupleDesc tupleDescriptor); static bool UserHasPermissionToViewStatsOf(Oid currentUserId, Oid backendOwnedId); +static uint64 CalculateGlobalPID(int32 nodeId, pid_t pid); static uint64 GenerateGlobalPID(void); static shmem_startup_hook_type prev_shmem_startup_hook = NULL; @@ -97,6 +98,8 @@ PG_FUNCTION_INFO_V1(assign_distributed_transaction_id); PG_FUNCTION_INFO_V1(get_current_transaction_id); PG_FUNCTION_INFO_V1(get_global_active_transactions); PG_FUNCTION_INFO_V1(get_all_active_transactions); +PG_FUNCTION_INFO_V1(citus_calculate_gpid); +PG_FUNCTION_INFO_V1(citus_backend_gpid); /* @@ -880,10 +883,27 @@ GetGlobalPID(void) /* - * GenerateGlobalPID generates the global process id for the current backend. + * citus_calculate_gpid calculates the gpid for any given process on any + * given node. + */ +Datum +citus_calculate_gpid(PG_FUNCTION_ARGS) +{ + CheckCitusVersion(ERROR); + + int32 nodeId = PG_GETARG_INT32(0); + int32 pid = PG_GETARG_INT32(1); + + PG_RETURN_UINT64(CalculateGlobalPID(nodeId, pid)); +} + + +/* + * CalculateGlobalPID gets a nodeId and pid, and returns the global pid + * that can be assigned for a process with the given input. */ static uint64 -GenerateGlobalPID(void) +CalculateGlobalPID(int32 nodeId, pid_t pid) { /* * We try to create a human readable global pid that consists of node id and process id. @@ -894,7 +914,31 @@ GenerateGlobalPID(void) * node ids might cause overflow. But even for the applications that scale around 50 nodes every * day it'd take about 100K years. So we are not worried. */ - return (((uint64) GetLocalNodeId()) * GLOBAL_PID_NODE_ID_MULTIPLIER) + getpid(); + return (((uint64) nodeId) * GLOBAL_PID_NODE_ID_MULTIPLIER) + pid; +} + + +/* + * GenerateGlobalPID generates the global process id for the current backend. + * See CalculateGlobalPID for the details. + */ +static uint64 +GenerateGlobalPID(void) +{ + return CalculateGlobalPID(GetLocalNodeId(), getpid()); +} + + +/* + * citus_backend_gpid similar to pg_backend_pid, but returns Citus + * assigned gpid. + */ +Datum +citus_backend_gpid(PG_FUNCTION_ARGS) +{ + CheckCitusVersion(ERROR); + + PG_RETURN_UINT64(GetGlobalPID()); } diff --git a/src/test/regress/expected/global_cancel.out b/src/test/regress/expected/global_cancel.out index 5ebc4098d..e8c016c56 100644 --- a/src/test/regress/expected/global_cancel.out +++ b/src/test/regress/expected/global_cancel.out @@ -45,7 +45,7 @@ SELECT 1 FROM run_command_on_workers('CREATE USER global_cancel_user'); (2 rows) RESET client_min_messages; -SET ROLE global_cancel_user; +\c - global_cancel_user - :master_port SELECT pg_typeof(:maintenance_daemon_gpid); pg_typeof --------------------------------------------------------------------- @@ -58,7 +58,10 @@ CONTEXT: while executing command on localhost:xxxxx SELECT pg_terminate_backend(:maintenance_daemon_gpid); ERROR: must be a superuser to terminate superuser process CONTEXT: while executing command on localhost:xxxxx -RESET ROLE; +-- we can cancel our own backend +SELECT pg_cancel_backend(citus_backend_gpid()); +ERROR: canceling statement due to user request +\c - postgres - :master_port SELECT nodeid AS coordinator_node_id FROM pg_dist_node WHERE nodeport = :master_port \gset SET client_min_messages TO DEBUG; -- 10000000000 is the node id multiplier for global pid @@ -79,5 +82,11 @@ DETAIL: from localhost:xxxxx (1 row) RESET client_min_messages; +SELECT citus_backend_gpid() = citus_calculate_gpid(:coordinator_node_id, pg_backend_pid()); + ?column? +--------------------------------------------------------------------- + t +(1 row) + DROP SCHEMA global_cancel CASCADE; -NOTICE: drop cascades to table dist_table +NOTICE: drop cascades to table global_cancel.dist_table diff --git a/src/test/regress/expected/multi_extension.out b/src/test/regress/expected/multi_extension.out index aa9960bbe..c43299e95 100644 --- a/src/test/regress/expected/multi_extension.out +++ b/src/test/regress/expected/multi_extension.out @@ -1007,6 +1007,8 @@ SELECT * FROM multi_extension.print_extension_changes(); function master_apply_delete_command(text) integer | function master_get_table_metadata(text) record | function worker_partition_query_result(text,text,integer,citus.distribution_type,text[],text[],boolean) SETOF record | + | function citus_backend_gpid() bigint + | function citus_calculate_gpid(integer,integer) bigint | function citus_check_cluster_node_health() SETOF record | function citus_check_connection_to_node(text,integer) boolean | function citus_disable_node(text,integer,boolean) void @@ -1029,7 +1031,7 @@ SELECT * FROM multi_extension.print_extension_changes(); | function worker_drop_shell_table(text) void | function worker_partition_query_result(text,text,integer,citus.distribution_type,text[],text[],boolean,boolean,boolean) SETOF record | view citus_stat_activity -(28 rows) +(30 rows) DROP TABLE multi_extension.prev_objects, multi_extension.extension_diff; -- show running version diff --git a/src/test/regress/expected/upgrade_list_citus_objects.out b/src/test/regress/expected/upgrade_list_citus_objects.out index 6adf0e990..a340b594b 100644 --- a/src/test/regress/expected/upgrade_list_citus_objects.out +++ b/src/test/regress/expected/upgrade_list_citus_objects.out @@ -37,7 +37,9 @@ ORDER BY 1; function citus_add_node(text,integer,integer,noderole,name) function citus_add_rebalance_strategy(name,regproc,regproc,regproc,real,real,real) function citus_add_secondary_node(text,integer,text,integer,name) + function citus_backend_gpid() function citus_blocking_pids(integer) + function citus_calculate_gpid(integer,integer) function citus_check_cluster_node_health() function citus_check_connection_to_node(text,integer) function citus_cleanup_orphaned_shards() @@ -278,5 +280,5 @@ ORDER BY 1; view citus_worker_stat_activity view pg_dist_shard_placement view time_partitions -(262 rows) +(264 rows) diff --git a/src/test/regress/sql/global_cancel.sql b/src/test/regress/sql/global_cancel.sql index edf380771..13371b2d0 100644 --- a/src/test/regress/sql/global_cancel.sql +++ b/src/test/regress/sql/global_cancel.sql @@ -29,14 +29,17 @@ CREATE USER global_cancel_user; SELECT 1 FROM run_command_on_workers('CREATE USER global_cancel_user'); RESET client_min_messages; -SET ROLE global_cancel_user; +\c - global_cancel_user - :master_port SELECT pg_typeof(:maintenance_daemon_gpid); SELECT pg_cancel_backend(:maintenance_daemon_gpid); SELECT pg_terminate_backend(:maintenance_daemon_gpid); -RESET ROLE; +-- we can cancel our own backend +SELECT pg_cancel_backend(citus_backend_gpid()); + +\c - postgres - :master_port SELECT nodeid AS coordinator_node_id FROM pg_dist_node WHERE nodeport = :master_port \gset @@ -48,4 +51,6 @@ SELECT pg_terminate_backend(10000000000 * :coordinator_node_id + 0); RESET client_min_messages; +SELECT citus_backend_gpid() = citus_calculate_gpid(:coordinator_node_id, pg_backend_pid()); + DROP SCHEMA global_cancel CASCADE;