Add a citus_is_coordinator function

velioglu/release-11.0-2005
Marco Slot 2022-04-07 12:17:39 +02:00
parent 5e4c0e4bea
commit 33dede5b75
10 changed files with 84 additions and 3 deletions

View File

@ -150,6 +150,7 @@ PG_FUNCTION_INFO_V1(get_shard_id_for_distribution_column);
PG_FUNCTION_INFO_V1(citus_nodename_for_nodeid); PG_FUNCTION_INFO_V1(citus_nodename_for_nodeid);
PG_FUNCTION_INFO_V1(citus_nodeport_for_nodeid); PG_FUNCTION_INFO_V1(citus_nodeport_for_nodeid);
PG_FUNCTION_INFO_V1(citus_coordinator_nodeid); PG_FUNCTION_INFO_V1(citus_coordinator_nodeid);
PG_FUNCTION_INFO_V1(citus_is_coordinator);
/* /*
@ -1558,6 +1559,29 @@ citus_coordinator_nodeid(PG_FUNCTION_ARGS)
} }
/*
* citus_is_coordinator returns whether the current node is a coordinator.
* We consider the node a coordinator if its group ID is 0 and it has
* pg_dist_node entries (only group ID 0 could indicate a worker without
* metadata).
*/
Datum
citus_is_coordinator(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);
bool isCoordinator = false;
if (GetLocalGroupId() == COORDINATOR_GROUP_ID &&
ActivePrimaryNodeCount() > 0)
{
isCoordinator = true;
}
PG_RETURN_BOOL(isCoordinator);
}
/* /*
* FindWorkerNode searches over the worker nodes and returns the workerNode * FindWorkerNode searches over the worker nodes and returns the workerNode
* if it already exists. Else, the function returns NULL. * if it already exists. Else, the function returns NULL.

View File

@ -1,2 +1,3 @@
#include "udfs/citus_shards_on_worker/11.0-2.sql" #include "udfs/citus_shards_on_worker/11.0-2.sql"
#include "udfs/citus_shard_indexes_on_worker/11.0-2.sql" #include "udfs/citus_shard_indexes_on_worker/11.0-2.sql"
#include "udfs/citus_is_coordinator/11.0-2.sql"

View File

@ -0,0 +1,7 @@
CREATE FUNCTION pg_catalog.citus_is_coordinator()
RETURNS bool
LANGUAGE c
STRICT
AS 'MODULE_PATHNAME', $$citus_is_coordinator$$;
COMMENT ON FUNCTION pg_catalog.citus_is_coordinator()
IS 'returns whether the current node is a coordinator';

View File

@ -0,0 +1,7 @@
CREATE FUNCTION pg_catalog.citus_is_coordinator()
RETURNS bool
LANGUAGE c
STRICT
AS 'MODULE_PATHNAME', $$citus_is_coordinator$$;
COMMENT ON FUNCTION pg_catalog.citus_is_coordinator()
IS 'returns whether the current node is a coordinator';

View File

@ -15,6 +15,21 @@ SELECT 1 FROM master_add_node('localhost', :worker_2_port);
1 1
(1 row) (1 row)
-- I am coordinator
SELECT citus_is_coordinator();
citus_is_coordinator
---------------------------------------------------------------------
t
(1 row)
-- workers are not coordinator
SELECT result FROM run_command_on_workers('SELECT citus_is_coordinator()');
result
---------------------------------------------------------------------
f
f
(2 rows)
-- get the active nodes -- get the active nodes
SELECT master_get_active_worker_nodes(); SELECT master_get_active_worker_nodes();
master_get_active_worker_nodes master_get_active_worker_nodes

View File

@ -1034,9 +1034,10 @@ SELECT * FROM multi_extension.print_extension_changes();
-- Snapshot of state at 11.0-2 -- Snapshot of state at 11.0-2
ALTER EXTENSION citus UPDATE TO '11.0-2'; ALTER EXTENSION citus UPDATE TO '11.0-2';
SELECT * FROM multi_extension.print_extension_changes(); SELECT * FROM multi_extension.print_extension_changes();
previous_object | current_object previous_object | current_object
--------------------------------------------------------------------- ---------------------------------------------------------------------
(0 rows) | function citus_is_coordinator() boolean
(1 row)
DROP TABLE multi_extension.prev_objects, multi_extension.extension_diff; DROP TABLE multi_extension.prev_objects, multi_extension.extension_diff;
-- show running version -- show running version

View File

@ -10,6 +10,13 @@ ALTER SYSTEM SET citus.max_cached_conns_per_worker TO 0;
-- adding the coordinator as inactive is disallowed -- adding the coordinator as inactive is disallowed
SELECT 1 FROM master_add_inactive_node('localhost', :master_port, groupid => 0); SELECT 1 FROM master_add_inactive_node('localhost', :master_port, groupid => 0);
ERROR: coordinator node cannot be added as inactive node ERROR: coordinator node cannot be added as inactive node
-- before adding a node we are not officially a coordinator
SELECT citus_is_coordinator();
citus_is_coordinator
---------------------------------------------------------------------
f
(1 row)
-- idempotently add node to allow this test to run without add_coordinator -- idempotently add node to allow this test to run without add_coordinator
SET client_min_messages TO WARNING; SET client_min_messages TO WARNING;
SELECT 1 FROM citus_set_coordinator_host('localhost', :master_port); SELECT 1 FROM citus_set_coordinator_host('localhost', :master_port);
@ -18,6 +25,13 @@ SELECT 1 FROM citus_set_coordinator_host('localhost', :master_port);
1 1
(1 row) (1 row)
-- after adding a node we are officially a coordinator
SELECT citus_is_coordinator();
citus_is_coordinator
---------------------------------------------------------------------
t
(1 row)
-- coordinator cannot be disabled -- coordinator cannot be disabled
SELECT 1 FROM citus_disable_node('localhost', :master_port); SELECT 1 FROM citus_disable_node('localhost', :master_port);
ERROR: cannot change "isactive" field of the coordinator node ERROR: cannot change "isactive" field of the coordinator node

View File

@ -83,6 +83,7 @@ ORDER BY 1;
function citus_internal_local_blocked_processes() function citus_internal_local_blocked_processes()
function citus_internal_update_placement_metadata(bigint,integer,integer) function citus_internal_update_placement_metadata(bigint,integer,integer)
function citus_internal_update_relation_colocation(oid,integer) function citus_internal_update_relation_colocation(oid,integer)
function citus_is_coordinator()
function citus_isolation_test_session_is_blocked(integer,integer[]) function citus_isolation_test_session_is_blocked(integer,integer[])
function citus_json_concatenate(json,json) function citus_json_concatenate(json,json)
function citus_json_concatenate_final(json) function citus_json_concatenate_final(json)
@ -283,5 +284,5 @@ ORDER BY 1;
view citus_stat_statements view citus_stat_statements
view pg_dist_shard_placement view pg_dist_shard_placement
view time_partitions view time_partitions
(267 rows) (268 rows)

View File

@ -8,6 +8,11 @@ ALTER SEQUENCE pg_catalog.pg_dist_groupid_seq RESTART 1;
SELECT 1 FROM master_add_node('localhost', :worker_1_port); SELECT 1 FROM master_add_node('localhost', :worker_1_port);
SELECT 1 FROM master_add_node('localhost', :worker_2_port); SELECT 1 FROM master_add_node('localhost', :worker_2_port);
-- I am coordinator
SELECT citus_is_coordinator();
-- workers are not coordinator
SELECT result FROM run_command_on_workers('SELECT citus_is_coordinator()');
-- get the active nodes -- get the active nodes
SELECT master_get_active_worker_nodes(); SELECT master_get_active_worker_nodes();

View File

@ -13,10 +13,16 @@ ALTER SYSTEM SET citus.max_cached_conns_per_worker TO 0;
-- adding the coordinator as inactive is disallowed -- adding the coordinator as inactive is disallowed
SELECT 1 FROM master_add_inactive_node('localhost', :master_port, groupid => 0); SELECT 1 FROM master_add_inactive_node('localhost', :master_port, groupid => 0);
-- before adding a node we are not officially a coordinator
SELECT citus_is_coordinator();
-- idempotently add node to allow this test to run without add_coordinator -- idempotently add node to allow this test to run without add_coordinator
SET client_min_messages TO WARNING; SET client_min_messages TO WARNING;
SELECT 1 FROM citus_set_coordinator_host('localhost', :master_port); SELECT 1 FROM citus_set_coordinator_host('localhost', :master_port);
-- after adding a node we are officially a coordinator
SELECT citus_is_coordinator();
-- coordinator cannot be disabled -- coordinator cannot be disabled
SELECT 1 FROM citus_disable_node('localhost', :master_port); SELECT 1 FROM citus_disable_node('localhost', :master_port);