From cbe94d26d5c92ba1fce38c52aae064e49cc7ac7b Mon Sep 17 00:00:00 2001 From: Halil Ozan Akgul Date: Wed, 23 Feb 2022 11:35:50 +0300 Subject: [PATCH] Introduces extract_node_id_from_global_pid UDF --- .../distributed/sql/citus--10.2-4--11.0-1.sql | 2 ++ .../sql/downgrades/citus--11.0-1--10.2-4.sql | 2 ++ .../extract_node_id_from_global_pid/11.0-1.sql | 7 +++++++ .../extract_node_id_from_global_pid/latest.sql | 7 +++++++ .../distributed/transaction/backend_data.c | 17 +++++++++++++++++ src/test/regress/expected/multi_extension.out | 3 ++- .../expected/upgrade_list_citus_objects.out | 3 ++- 7 files changed, 39 insertions(+), 2 deletions(-) create mode 100644 src/backend/distributed/sql/udfs/extract_node_id_from_global_pid/11.0-1.sql create mode 100644 src/backend/distributed/sql/udfs/extract_node_id_from_global_pid/latest.sql diff --git a/src/backend/distributed/sql/citus--10.2-4--11.0-1.sql b/src/backend/distributed/sql/citus--10.2-4--11.0-1.sql index 849b28761..060437765 100644 --- a/src/backend/distributed/sql/citus--10.2-4--11.0-1.sql +++ b/src/backend/distributed/sql/citus--10.2-4--11.0-1.sql @@ -79,3 +79,5 @@ BEGIN UPDATE pg_dist_node_metadata SET metadata=jsonb_set(metadata, '{partitioned_citus_table_exists_pre_11}', to_jsonb(partitioned_table_exists), true); END; $$; + +#include "udfs/extract_node_id_from_global_pid/11.0-1.sql" diff --git a/src/backend/distributed/sql/downgrades/citus--11.0-1--10.2-4.sql b/src/backend/distributed/sql/downgrades/citus--11.0-1--10.2-4.sql index ba13b134a..10764138c 100644 --- a/src/backend/distributed/sql/downgrades/citus--11.0-1--10.2-4.sql +++ b/src/backend/distributed/sql/downgrades/citus--11.0-1--10.2-4.sql @@ -343,3 +343,5 @@ ALTER VIEW citus.citus_lock_waits SET SCHEMA pg_catalog; GRANT SELECT ON pg_catalog.citus_lock_waits TO PUBLIC; RESET search_path; + +DROP FUNCTION IF EXISTS pg_catalog.extract_node_id_from_global_pid; diff --git a/src/backend/distributed/sql/udfs/extract_node_id_from_global_pid/11.0-1.sql b/src/backend/distributed/sql/udfs/extract_node_id_from_global_pid/11.0-1.sql new file mode 100644 index 000000000..1c6cd8a99 --- /dev/null +++ b/src/backend/distributed/sql/udfs/extract_node_id_from_global_pid/11.0-1.sql @@ -0,0 +1,7 @@ +CREATE OR REPLACE FUNCTION pg_catalog.extract_node_id_from_global_pid(global_pid bigint) + RETURNS INTEGER + LANGUAGE C +AS 'MODULE_PATHNAME', $$extract_node_id_from_global_pid$$; + +COMMENT ON FUNCTION pg_catalog.extract_node_id_from_global_pid(global_pid bigint) + IS 'returns the originator node id for the query with the given global pid'; diff --git a/src/backend/distributed/sql/udfs/extract_node_id_from_global_pid/latest.sql b/src/backend/distributed/sql/udfs/extract_node_id_from_global_pid/latest.sql new file mode 100644 index 000000000..1c6cd8a99 --- /dev/null +++ b/src/backend/distributed/sql/udfs/extract_node_id_from_global_pid/latest.sql @@ -0,0 +1,7 @@ +CREATE OR REPLACE FUNCTION pg_catalog.extract_node_id_from_global_pid(global_pid bigint) + RETURNS INTEGER + LANGUAGE C +AS 'MODULE_PATHNAME', $$extract_node_id_from_global_pid$$; + +COMMENT ON FUNCTION pg_catalog.extract_node_id_from_global_pid(global_pid bigint) + IS 'returns the originator node id for the query with the given global pid'; diff --git a/src/backend/distributed/transaction/backend_data.c b/src/backend/distributed/transaction/backend_data.c index 5ca517199..9718a7b23 100644 --- a/src/backend/distributed/transaction/backend_data.c +++ b/src/backend/distributed/transaction/backend_data.c @@ -98,6 +98,7 @@ PG_FUNCTION_INFO_V1(assign_distributed_transaction_id); PG_FUNCTION_INFO_V1(get_current_transaction_id); PG_FUNCTION_INFO_V1(get_global_active_transactions); PG_FUNCTION_INFO_V1(get_all_active_transactions); +PG_FUNCTION_INFO_V1(extract_node_id_from_global_pid); /* @@ -351,6 +352,22 @@ get_all_active_transactions(PG_FUNCTION_ARGS) } +/* + * extract_node_id_from_global_pid returns the originator node if for the given global pid + */ +Datum +extract_node_id_from_global_pid(PG_FUNCTION_ARGS) +{ + CheckCitusVersion(ERROR); + + uint64 globalPID = PG_GETARG_INT64(0); + + int nodeId = ExtractNodeIdFromGlobalPID(globalPID); + + PG_RETURN_INT32(nodeId); +} + + /* * StoreAllActiveTransactions gets active transaction from the local node and inserts * them into the given tuplestore. diff --git a/src/test/regress/expected/multi_extension.out b/src/test/regress/expected/multi_extension.out index e5cbcd994..a6f320dda 100644 --- a/src/test/regress/expected/multi_extension.out +++ b/src/test/regress/expected/multi_extension.out @@ -1018,12 +1018,13 @@ SELECT * FROM multi_extension.print_extension_changes(); | function citus_shard_indexes_on_worker() SETOF record | function citus_shards_on_worker() SETOF record | function create_distributed_function(regprocedure,text,text,boolean) void + | function extract_node_id_from_global_pid(bigint) integer | function pg_cancel_backend(bigint) boolean | function pg_terminate_backend(bigint,bigint) boolean | function worker_create_or_replace_object(text[]) boolean | function worker_drop_sequence_dependency(text) void | function worker_drop_shell_table(text) void -(20 rows) +(21 rows) DROP TABLE multi_extension.prev_objects, multi_extension.extension_diff; -- show running version diff --git a/src/test/regress/expected/upgrade_list_citus_objects.out b/src/test/regress/expected/upgrade_list_citus_objects.out index 185bf19c5..f67bd1476 100644 --- a/src/test/regress/expected/upgrade_list_citus_objects.out +++ b/src/test/regress/expected/upgrade_list_citus_objects.out @@ -131,6 +131,7 @@ ORDER BY 1; function drop_old_time_partitions(regclass,timestamp with time zone) function dump_global_wait_edges() function dump_local_wait_edges() + function extract_node_id_from_global_pid(bigint) function fetch_intermediate_results(text[],text,integer) function fix_all_partition_shard_index_names() function fix_partition_shard_index_names(regclass) @@ -272,5 +273,5 @@ ORDER BY 1; view citus_worker_stat_activity view pg_dist_shard_placement view time_partitions -(256 rows) +(257 rows)