Introduces citus_coordinator_node_id

pull/5823/head
Halil Ozan Akgul 2022-03-18 13:55:12 +03:00
parent 9f204600af
commit 4dbc760603
9 changed files with 71 additions and 11 deletions

View File

@ -126,6 +126,7 @@ static void ErrorIfCoordinatorMetadataSetFalse(WorkerNode *workerNode, Datum val
char *field);
static WorkerNode * SetShouldHaveShards(WorkerNode *workerNode, bool shouldHaveShards);
static void RemoveOldShardPlacementForNodeGroup(int groupId);
static int FindCoordinatorNodeId(void);
/* declarations for dynamic loading */
PG_FUNCTION_INFO_V1(citus_set_coordinator_host);
@ -148,6 +149,7 @@ PG_FUNCTION_INFO_V1(master_update_node);
PG_FUNCTION_INFO_V1(get_shard_id_for_distribution_column);
PG_FUNCTION_INFO_V1(citus_nodename_for_nodeid);
PG_FUNCTION_INFO_V1(citus_nodeport_for_nodeid);
PG_FUNCTION_INFO_V1(citus_coordinator_nodeid);
/*
@ -1537,6 +1539,25 @@ citus_nodeport_for_nodeid(PG_FUNCTION_ARGS)
}
/*
* citus_coordinator_nodeid returns the node id of the coordinator node
*/
Datum
citus_coordinator_nodeid(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);
int coordinatorNodeId = FindCoordinatorNodeId();
if (coordinatorNodeId == -1)
{
PG_RETURN_INT32(0);
}
PG_RETURN_INT32(coordinatorNodeId);
}
/*
* FindWorkerNode searches over the worker nodes and returns the workerNode
* if it already exists. Else, the function returns NULL.
@ -1635,6 +1656,28 @@ FindNodeWithNodeId(int nodeId, bool missingOk)
}
/*
* FindCoordinatorNodeId returns the node id of the coordinator node
*/
static int
FindCoordinatorNodeId()
{
bool includeNodesFromOtherClusters = false;
List *nodeList = ReadDistNode(includeNodesFromOtherClusters);
WorkerNode *node = NULL;
foreach_ptr(node, nodeList)
{
if (NodeIsCoordinator(node))
{
return node->nodeId;
}
}
return -1;
}
/*
* ReadDistNode iterates over pg_dist_node table, converts each row
* into it's memory representation (i.e., WorkerNode) and adds them into

View File

@ -103,3 +103,5 @@ GRANT SELECT ON pg_catalog.pg_dist_object TO public;
#include "udfs/citus_nodeid_for_gpid/11.0-1.sql"
#include "udfs/citus_pid_for_gpid/11.0-1.sql"
#include "udfs/citus_coordinator_nodeid/11.0-1.sql"

View File

@ -370,3 +370,5 @@ DROP FUNCTION pg_catalog.citus_nodeport_for_nodeid(integer);
DROP FUNCTION pg_catalog.citus_nodeid_for_gpid(bigint);
DROP FUNCTION pg_catalog.citus_pid_for_gpid(bigint);
DROP FUNCTION pg_catalog.citus_coordinator_nodeid();

View File

@ -0,0 +1,7 @@
CREATE OR REPLACE FUNCTION pg_catalog.citus_coordinator_nodeid()
RETURNS integer
LANGUAGE C STABLE STRICT
AS 'MODULE_PATHNAME', $$citus_coordinator_nodeid$$;
COMMENT ON FUNCTION pg_catalog.citus_coordinator_nodeid()
IS 'returns node id of the coordinator node';

View File

@ -0,0 +1,7 @@
CREATE OR REPLACE FUNCTION pg_catalog.citus_coordinator_nodeid()
RETURNS integer
LANGUAGE C STABLE STRICT
AS 'MODULE_PATHNAME', $$citus_coordinator_nodeid$$;
COMMENT ON FUNCTION pg_catalog.citus_coordinator_nodeid()
IS 'returns node id of the coordinator node';

View File

@ -62,10 +62,9 @@ CONTEXT: while executing command on localhost:xxxxx
SELECT pg_cancel_backend(citus_backend_gpid());
ERROR: canceling statement due to user request
\c - postgres - :master_port
SELECT nodeid AS coordinator_node_id FROM pg_dist_node WHERE nodeport = :master_port \gset
SET client_min_messages TO DEBUG;
-- 10000000000 is the node id multiplier for global pid
SELECT pg_cancel_backend(10000000000 * :coordinator_node_id + 0);
SELECT pg_cancel_backend(10000000000 * citus_coordinator_nodeid() + 0);
DEBUG: PID 0 is not a PostgreSQL server process
DETAIL: from localhost:xxxxx
pg_cancel_backend
@ -73,7 +72,7 @@ DETAIL: from localhost:xxxxx
f
(1 row)
SELECT pg_terminate_backend(10000000000 * :coordinator_node_id + 0);
SELECT pg_terminate_backend(10000000000 * citus_coordinator_nodeid() + 0);
DEBUG: PID 0 is not a PostgreSQL server process
DETAIL: from localhost:xxxxx
pg_terminate_backend
@ -82,7 +81,7 @@ DETAIL: from localhost:xxxxx
(1 row)
RESET client_min_messages;
SELECT citus_backend_gpid() = citus_calculate_gpid(:coordinator_node_id, pg_backend_pid());
SELECT citus_backend_gpid() = citus_calculate_gpid(citus_coordinator_nodeid(), pg_backend_pid());
?column?
---------------------------------------------------------------------
t

View File

@ -1002,6 +1002,7 @@ SELECT * FROM multi_extension.print_extension_changes();
| function citus_calculate_gpid(integer,integer) bigint
| function citus_check_cluster_node_health() SETOF record
| function citus_check_connection_to_node(text,integer) boolean
| function citus_coordinator_nodeid() integer
| function citus_disable_node(text,integer,boolean) void
| function citus_finalize_upgrade_to_citus11(boolean) boolean
| function citus_internal_add_colocation_metadata(integer,integer,integer,regtype,oid) void
@ -1028,7 +1029,7 @@ SELECT * FROM multi_extension.print_extension_changes();
| function worker_partition_query_result(text,text,integer,citus.distribution_type,text[],text[],boolean,boolean,boolean) SETOF record
| table pg_dist_object
| view citus_stat_activity
(40 rows)
(41 rows)
DROP TABLE multi_extension.prev_objects, multi_extension.extension_diff;
-- show running version

View File

@ -44,6 +44,7 @@ ORDER BY 1;
function citus_check_connection_to_node(text,integer)
function citus_cleanup_orphaned_shards()
function citus_conninfo_cache_invalidate()
function citus_coordinator_nodeid()
function citus_copy_shard_placement(bigint,text,integer,text,integer,boolean,citus.shard_transfer_mode)
function citus_create_restore_point(text)
function citus_disable_node(text,integer,boolean)
@ -282,5 +283,5 @@ ORDER BY 1;
view citus_stat_statements
view pg_dist_shard_placement
view time_partitions
(266 rows)
(267 rows)

View File

@ -41,17 +41,15 @@ SELECT pg_cancel_backend(citus_backend_gpid());
\c - postgres - :master_port
SELECT nodeid AS coordinator_node_id FROM pg_dist_node WHERE nodeport = :master_port \gset
SET client_min_messages TO DEBUG;
-- 10000000000 is the node id multiplier for global pid
SELECT pg_cancel_backend(10000000000 * :coordinator_node_id + 0);
SELECT pg_terminate_backend(10000000000 * :coordinator_node_id + 0);
SELECT pg_cancel_backend(10000000000 * citus_coordinator_nodeid() + 0);
SELECT pg_terminate_backend(10000000000 * citus_coordinator_nodeid() + 0);
RESET client_min_messages;
SELECT citus_backend_gpid() = citus_calculate_gpid(:coordinator_node_id, pg_backend_pid());
SELECT citus_backend_gpid() = citus_calculate_gpid(citus_coordinator_nodeid(), pg_backend_pid());
SELECT nodename = citus_nodename_for_nodeid(nodeid) AND nodeport = citus_nodeport_for_nodeid(nodeid)
FROM pg_dist_node