diff --git a/src/backend/distributed/metadata/node_metadata.c b/src/backend/distributed/metadata/node_metadata.c index 214fb2dec..e857e1dda 100644 --- a/src/backend/distributed/metadata/node_metadata.c +++ b/src/backend/distributed/metadata/node_metadata.c @@ -126,6 +126,7 @@ static void ErrorIfCoordinatorMetadataSetFalse(WorkerNode *workerNode, Datum val char *field); static WorkerNode * SetShouldHaveShards(WorkerNode *workerNode, bool shouldHaveShards); static void RemoveOldShardPlacementForNodeGroup(int groupId); +static int FindCoordinatorNodeId(void); /* declarations for dynamic loading */ PG_FUNCTION_INFO_V1(citus_set_coordinator_host); @@ -148,6 +149,7 @@ PG_FUNCTION_INFO_V1(master_update_node); PG_FUNCTION_INFO_V1(get_shard_id_for_distribution_column); PG_FUNCTION_INFO_V1(citus_nodename_for_nodeid); PG_FUNCTION_INFO_V1(citus_nodeport_for_nodeid); +PG_FUNCTION_INFO_V1(citus_coordinator_nodeid); /* @@ -1537,6 +1539,25 @@ citus_nodeport_for_nodeid(PG_FUNCTION_ARGS) } +/* + * citus_coordinator_nodeid returns the node id of the coordinator node + */ +Datum +citus_coordinator_nodeid(PG_FUNCTION_ARGS) +{ + CheckCitusVersion(ERROR); + + int coordinatorNodeId = FindCoordinatorNodeId(); + + if (coordinatorNodeId == -1) + { + PG_RETURN_INT32(0); + } + + PG_RETURN_INT32(coordinatorNodeId); +} + + /* * FindWorkerNode searches over the worker nodes and returns the workerNode * if it already exists. Else, the function returns NULL. @@ -1635,6 +1656,28 @@ FindNodeWithNodeId(int nodeId, bool missingOk) } +/* + * FindCoordinatorNodeId returns the node id of the coordinator node + */ +static int +FindCoordinatorNodeId() +{ + bool includeNodesFromOtherClusters = false; + List *nodeList = ReadDistNode(includeNodesFromOtherClusters); + WorkerNode *node = NULL; + + foreach_ptr(node, nodeList) + { + if (NodeIsCoordinator(node)) + { + return node->nodeId; + } + } + + return -1; +} + + /* * ReadDistNode iterates over pg_dist_node table, converts each row * into it's memory representation (i.e., WorkerNode) and adds them into diff --git a/src/backend/distributed/sql/citus--10.2-4--11.0-1.sql b/src/backend/distributed/sql/citus--10.2-4--11.0-1.sql index f341976c9..688cfad8c 100644 --- a/src/backend/distributed/sql/citus--10.2-4--11.0-1.sql +++ b/src/backend/distributed/sql/citus--10.2-4--11.0-1.sql @@ -103,3 +103,5 @@ GRANT SELECT ON pg_catalog.pg_dist_object TO public; #include "udfs/citus_nodeid_for_gpid/11.0-1.sql" #include "udfs/citus_pid_for_gpid/11.0-1.sql" + +#include "udfs/citus_coordinator_nodeid/11.0-1.sql" diff --git a/src/backend/distributed/sql/downgrades/citus--11.0-1--10.2-4.sql b/src/backend/distributed/sql/downgrades/citus--11.0-1--10.2-4.sql index 19d5bb22d..b7f4018d9 100644 --- a/src/backend/distributed/sql/downgrades/citus--11.0-1--10.2-4.sql +++ b/src/backend/distributed/sql/downgrades/citus--11.0-1--10.2-4.sql @@ -370,3 +370,5 @@ DROP FUNCTION pg_catalog.citus_nodeport_for_nodeid(integer); DROP FUNCTION pg_catalog.citus_nodeid_for_gpid(bigint); DROP FUNCTION pg_catalog.citus_pid_for_gpid(bigint); + +DROP FUNCTION pg_catalog.citus_coordinator_nodeid(); diff --git a/src/backend/distributed/sql/udfs/citus_coordinator_nodeid/11.0-1.sql b/src/backend/distributed/sql/udfs/citus_coordinator_nodeid/11.0-1.sql new file mode 100644 index 000000000..2bf0dd250 --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_coordinator_nodeid/11.0-1.sql @@ -0,0 +1,7 @@ +CREATE OR REPLACE FUNCTION pg_catalog.citus_coordinator_nodeid() + RETURNS integer + LANGUAGE C STABLE STRICT + AS 'MODULE_PATHNAME', $$citus_coordinator_nodeid$$; + +COMMENT ON FUNCTION pg_catalog.citus_coordinator_nodeid() + IS 'returns node id of the coordinator node'; diff --git a/src/backend/distributed/sql/udfs/citus_coordinator_nodeid/latest.sql b/src/backend/distributed/sql/udfs/citus_coordinator_nodeid/latest.sql new file mode 100644 index 000000000..2bf0dd250 --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_coordinator_nodeid/latest.sql @@ -0,0 +1,7 @@ +CREATE OR REPLACE FUNCTION pg_catalog.citus_coordinator_nodeid() + RETURNS integer + LANGUAGE C STABLE STRICT + AS 'MODULE_PATHNAME', $$citus_coordinator_nodeid$$; + +COMMENT ON FUNCTION pg_catalog.citus_coordinator_nodeid() + IS 'returns node id of the coordinator node'; diff --git a/src/test/regress/expected/global_cancel.out b/src/test/regress/expected/global_cancel.out index ede4bc7e3..6a206b0ad 100644 --- a/src/test/regress/expected/global_cancel.out +++ b/src/test/regress/expected/global_cancel.out @@ -62,10 +62,9 @@ CONTEXT: while executing command on localhost:xxxxx SELECT pg_cancel_backend(citus_backend_gpid()); ERROR: canceling statement due to user request \c - postgres - :master_port -SELECT nodeid AS coordinator_node_id FROM pg_dist_node WHERE nodeport = :master_port \gset SET client_min_messages TO DEBUG; -- 10000000000 is the node id multiplier for global pid -SELECT pg_cancel_backend(10000000000 * :coordinator_node_id + 0); +SELECT pg_cancel_backend(10000000000 * citus_coordinator_nodeid() + 0); DEBUG: PID 0 is not a PostgreSQL server process DETAIL: from localhost:xxxxx pg_cancel_backend @@ -73,7 +72,7 @@ DETAIL: from localhost:xxxxx f (1 row) -SELECT pg_terminate_backend(10000000000 * :coordinator_node_id + 0); +SELECT pg_terminate_backend(10000000000 * citus_coordinator_nodeid() + 0); DEBUG: PID 0 is not a PostgreSQL server process DETAIL: from localhost:xxxxx pg_terminate_backend @@ -82,7 +81,7 @@ DETAIL: from localhost:xxxxx (1 row) RESET client_min_messages; -SELECT citus_backend_gpid() = citus_calculate_gpid(:coordinator_node_id, pg_backend_pid()); +SELECT citus_backend_gpid() = citus_calculate_gpid(citus_coordinator_nodeid(), pg_backend_pid()); ?column? --------------------------------------------------------------------- t diff --git a/src/test/regress/expected/multi_extension.out b/src/test/regress/expected/multi_extension.out index 31f79c83a..2e23f6c63 100644 --- a/src/test/regress/expected/multi_extension.out +++ b/src/test/regress/expected/multi_extension.out @@ -1002,6 +1002,7 @@ SELECT * FROM multi_extension.print_extension_changes(); | function citus_calculate_gpid(integer,integer) bigint | function citus_check_cluster_node_health() SETOF record | function citus_check_connection_to_node(text,integer) boolean + | function citus_coordinator_nodeid() integer | function citus_disable_node(text,integer,boolean) void | function citus_finalize_upgrade_to_citus11(boolean) boolean | function citus_internal_add_colocation_metadata(integer,integer,integer,regtype,oid) void @@ -1028,7 +1029,7 @@ SELECT * FROM multi_extension.print_extension_changes(); | function worker_partition_query_result(text,text,integer,citus.distribution_type,text[],text[],boolean,boolean,boolean) SETOF record | table pg_dist_object | view citus_stat_activity -(40 rows) +(41 rows) DROP TABLE multi_extension.prev_objects, multi_extension.extension_diff; -- show running version diff --git a/src/test/regress/expected/upgrade_list_citus_objects.out b/src/test/regress/expected/upgrade_list_citus_objects.out index afd30523c..f84553317 100644 --- a/src/test/regress/expected/upgrade_list_citus_objects.out +++ b/src/test/regress/expected/upgrade_list_citus_objects.out @@ -44,6 +44,7 @@ ORDER BY 1; function citus_check_connection_to_node(text,integer) function citus_cleanup_orphaned_shards() function citus_conninfo_cache_invalidate() + function citus_coordinator_nodeid() function citus_copy_shard_placement(bigint,text,integer,text,integer,boolean,citus.shard_transfer_mode) function citus_create_restore_point(text) function citus_disable_node(text,integer,boolean) @@ -282,5 +283,5 @@ ORDER BY 1; view citus_stat_statements view pg_dist_shard_placement view time_partitions -(266 rows) +(267 rows) diff --git a/src/test/regress/sql/global_cancel.sql b/src/test/regress/sql/global_cancel.sql index 6c4341877..6dc58aa2d 100644 --- a/src/test/regress/sql/global_cancel.sql +++ b/src/test/regress/sql/global_cancel.sql @@ -41,17 +41,15 @@ SELECT pg_cancel_backend(citus_backend_gpid()); \c - postgres - :master_port -SELECT nodeid AS coordinator_node_id FROM pg_dist_node WHERE nodeport = :master_port \gset - SET client_min_messages TO DEBUG; -- 10000000000 is the node id multiplier for global pid -SELECT pg_cancel_backend(10000000000 * :coordinator_node_id + 0); -SELECT pg_terminate_backend(10000000000 * :coordinator_node_id + 0); +SELECT pg_cancel_backend(10000000000 * citus_coordinator_nodeid() + 0); +SELECT pg_terminate_backend(10000000000 * citus_coordinator_nodeid() + 0); RESET client_min_messages; -SELECT citus_backend_gpid() = citus_calculate_gpid(:coordinator_node_id, pg_backend_pid()); +SELECT citus_backend_gpid() = citus_calculate_gpid(citus_coordinator_nodeid(), pg_backend_pid()); SELECT nodename = citus_nodename_for_nodeid(nodeid) AND nodeport = citus_nodeport_for_nodeid(nodeid) FROM pg_dist_node