Add a run_command_on_coordinator function

pull/5941/head
Marco Slot 2022-05-13 12:15:43 +02:00
parent fa9cee409c
commit 79d7e860e6
8 changed files with 211 additions and 5 deletions

View File

@@ -2,3 +2,4 @@
#include "udfs/citus_shard_indexes_on_worker/11.0-2.sql"
#include "udfs/citus_is_coordinator/11.0-2.sql"
#include "udfs/citus_disable_node/11.0-2.sql"
#include "udfs/run_command_on_coordinator/11.0-2.sql"

View File

@@ -12,3 +12,4 @@ COMMENT ON FUNCTION pg_catalog.citus_disable_node(nodename text, nodeport intege
REVOKE ALL ON FUNCTION pg_catalog.citus_disable_node(text,int, bool) FROM PUBLIC;
DROP FUNCTION pg_catalog.citus_is_coordinator();
DROP FUNCTION pg_catalog.run_command_on_coordinator(text,boolean);

View File

@@ -0,0 +1,59 @@
-- run_command_on_coordinator tries to closely follow the semantics of run_command_on_all_nodes,
-- but only runs the command on the coordinator
CREATE FUNCTION pg_catalog.run_command_on_coordinator(command text, give_warning_for_connection_errors bool default false,
OUT nodeid int, OUT success bool, OUT result text)
RETURNS SETOF record
LANGUAGE plpgsql
AS $function$
DECLARE
nodenames text[];
ports int[];
commands text[];
coordinator_is_in_metadata boolean;
-- the coordinator is a single node, so there is nothing to parallelize over
parallel boolean := false;
BEGIN
-- look up the coordinator (groupid = 0) in the metadata, honouring
-- citus.cluster_name and citus.use_secondary_nodes the same way
-- run_command_on_all_nodes does
WITH citus_nodes AS (
SELECT * FROM pg_dist_node
WHERE isactive AND nodecluster = current_setting('citus.cluster_name') AND groupid = 0
AND (
(current_setting('citus.use_secondary_nodes') = 'never' AND noderole = 'primary')
OR
(current_setting('citus.use_secondary_nodes') = 'always' AND noderole = 'secondary')
)
ORDER BY nodename, nodeport
)
SELECT array_agg(citus_nodes.nodename), array_agg(citus_nodes.nodeport), array_agg(command), count(*) > 0
FROM citus_nodes
INTO nodenames, ports, commands, coordinator_is_in_metadata;
IF NOT coordinator_is_in_metadata THEN
-- This will happen when we call this function on the coordinator and
-- the coordinator is not added to the metadata.
-- We'll manually add the current node to the lists so the command still
-- runs on the coordinator.
-- But when the coordinator is not added to the metadata and this function
-- is called from a worker node, we have no way to reach the coordinator
-- and must error out.
IF citus_is_coordinator() THEN
SELECT
array_append(nodenames, current_setting('citus.local_hostname')),
array_append(ports, current_setting('port')::int),
array_append(commands, command)
INTO nodenames, ports, commands;
ELSE
RAISE EXCEPTION 'the coordinator is not added to the metadata'
USING HINT = 'Add the node as a coordinator by using: SELECT citus_set_coordinator_host(''<hostname>'')';
END IF;
END IF;
-- execute the command and map the connection back to the coordinator's
-- nodeid; coalesce to 0 when the coordinator is not in pg_dist_node
FOR nodeid, success, result IN
SELECT coalesce(pg_dist_node.nodeid, 0) AS nodeid, mrow.success, mrow.result
FROM master_run_on_worker(nodenames, ports, commands, parallel) mrow
LEFT JOIN pg_dist_node ON mrow.node_name = pg_dist_node.nodename AND mrow.node_port = pg_dist_node.nodeport
LOOP
IF give_warning_for_connection_errors AND NOT success THEN
RAISE WARNING 'Error on node with node id %: %', nodeid, result;
END IF;
RETURN NEXT;
END LOOP;
END;
$function$;

View File

@@ -0,0 +1,59 @@
-- run_command_on_coordinator tries to closely follow the semantics of run_command_on_all_nodes,
-- but only runs the command on the coordinator
CREATE FUNCTION pg_catalog.run_command_on_coordinator(command text, give_warning_for_connection_errors bool default false,
OUT nodeid int, OUT success bool, OUT result text)
RETURNS SETOF record
LANGUAGE plpgsql
AS $function$
DECLARE
nodenames text[];
ports int[];
commands text[];
coordinator_is_in_metadata boolean;
-- the coordinator is a single node, so there is nothing to parallelize over
parallel boolean := false;
BEGIN
-- look up the coordinator (groupid = 0) in the metadata, honouring
-- citus.cluster_name and citus.use_secondary_nodes the same way
-- run_command_on_all_nodes does
WITH citus_nodes AS (
SELECT * FROM pg_dist_node
WHERE isactive AND nodecluster = current_setting('citus.cluster_name') AND groupid = 0
AND (
(current_setting('citus.use_secondary_nodes') = 'never' AND noderole = 'primary')
OR
(current_setting('citus.use_secondary_nodes') = 'always' AND noderole = 'secondary')
)
ORDER BY nodename, nodeport
)
SELECT array_agg(citus_nodes.nodename), array_agg(citus_nodes.nodeport), array_agg(command), count(*) > 0
FROM citus_nodes
INTO nodenames, ports, commands, coordinator_is_in_metadata;
IF NOT coordinator_is_in_metadata THEN
-- This will happen when we call this function on the coordinator and
-- the coordinator is not added to the metadata.
-- We'll manually add the current node to the lists so the command still
-- runs on the coordinator.
-- But when the coordinator is not added to the metadata and this function
-- is called from a worker node, we have no way to reach the coordinator
-- and must error out.
IF citus_is_coordinator() THEN
SELECT
array_append(nodenames, current_setting('citus.local_hostname')),
array_append(ports, current_setting('port')::int),
array_append(commands, command)
INTO nodenames, ports, commands;
ELSE
RAISE EXCEPTION 'the coordinator is not added to the metadata'
USING HINT = 'Add the node as a coordinator by using: SELECT citus_set_coordinator_host(''<hostname>'')';
END IF;
END IF;
-- execute the command and map the connection back to the coordinator's
-- nodeid; coalesce to 0 when the coordinator is not in pg_dist_node
FOR nodeid, success, result IN
SELECT coalesce(pg_dist_node.nodeid, 0) AS nodeid, mrow.success, mrow.result
FROM master_run_on_worker(nodenames, ports, commands, parallel) mrow
LEFT JOIN pg_dist_node ON mrow.node_name = pg_dist_node.nodename AND mrow.node_port = pg_dist_node.nodeport
LOOP
IF give_warning_for_connection_errors AND NOT success THEN
RAISE WARNING 'Error on node with node id %: %', nodeid, result;
END IF;
RETURN NEXT;
END LOOP;
END;
$function$;

View File

@@ -1034,10 +1034,11 @@ SELECT * FROM multi_extension.print_extension_changes();
-- Snapshot of state at 11.0-2
ALTER EXTENSION citus UPDATE TO '11.0-2';
SELECT * FROM multi_extension.print_extension_changes();
previous_object | current_object
previous_object | current_object
---------------------------------------------------------------------
| function citus_is_coordinator() boolean
(1 row)
| function run_command_on_coordinator(text,boolean) SETOF record
(2 rows)
-- Test downgrade script (result should be empty)
ALTER EXTENSION citus UPDATE TO '11.0-1';

View File

@@ -63,6 +63,7 @@ SELECT result FROM run_command_on_all_nodes('SELECT tablename FROM pg_tables WHE
(3 rows)
-- break a node and check messages
BEGIN;
SELECT nodeid AS worker_1_nodeid FROM pg_dist_node WHERE nodeport = :worker_1_port \gset
UPDATE pg_dist_node SET nodeport = 0 WHERE nodeid = :worker_1_nodeid;
SELECT nodeid = :worker_1_nodeid AS "Is Worker 1", success, result FROM run_command_on_all_nodes('SELECT 1') ORDER BY 1;
@@ -83,6 +84,62 @@ CONTEXT: PL/pgSQL function run_command_on_all_nodes(text,boolean,boolean) line
t | f | failed to connect to localhost:xxxxx
(3 rows)
UPDATE pg_dist_node SET nodeport = :worker_1_port WHERE nodeid = :worker_1_nodeid;
ROLLBACK;
-- break connection to localhost
BEGIN;
UPDATE pg_dist_node SET nodeport = 0 WHERE groupid = 0;
SELECT success, result
FROM run_command_on_coordinator('SELECT inet_server_port()') ORDER BY 1;
success | result
---------------------------------------------------------------------
t | 57636
(1 row)
SELECT success, result
FROM run_command_on_coordinator('SELECT inet_server_port()', give_warning_for_connection_errors:=true) ORDER BY 1;
success | result
---------------------------------------------------------------------
t | 57636
(1 row)
ROLLBACK;
-- we cannot use run_command_on_coordinator from workers if coordinator is not in the metadata
SELECT success, result FROM run_command_on_all_nodes($$select result from run_command_on_coordinator('select inet_server_port()')$$);
success | result
---------------------------------------------------------------------
f | ERROR: the coordinator is not added to the metadata
f | ERROR: the coordinator is not added to the metadata
t | 57636
(3 rows)
-- we can use run_command_on_coordinator from any node if the coordinator is in the metadata
SELECT citus_set_coordinator_host('localhost');
citus_set_coordinator_host
---------------------------------------------------------------------
(1 row)
SELECT success, result FROM run_command_on_all_nodes($$select result from run_command_on_coordinator('select inet_server_port()')$$);
success | result
---------------------------------------------------------------------
t | 57636
t | 57636
t | 57636
(3 rows)
SELECT citus_remove_node(nodename, nodeport) FROM pg_dist_node WHERE groupid = 0;
citus_remove_node
---------------------------------------------------------------------
(1 row)
-- check that we fail when pg_dist_node is empty
BEGIN;
DELETE FROM pg_dist_node;
SELECT success, result FROM run_command_on_coordinator('select inet_server_port()');
ERROR: the coordinator is not added to the metadata
HINT: Add the node as a coordinator by using: SELECT citus_set_coordinator_host('<hostname>')
CONTEXT: PL/pgSQL function run_command_on_coordinator(text,boolean) line XX at RAISE
ROLLBACK;
DROP SCHEMA run_command_on_all_nodes CASCADE;
NOTICE: drop cascades to table run_command_on_all_nodes.tbl

View File

@@ -199,6 +199,7 @@ ORDER BY 1;
function role_exists(name)
function run_command_on_all_nodes(text,boolean,boolean)
function run_command_on_colocated_placements(regclass,regclass,text,boolean)
function run_command_on_coordinator(text,boolean)
function run_command_on_placements(regclass,text,boolean)
function run_command_on_shards(regclass,text,boolean)
function run_command_on_workers(text,boolean)
@@ -276,5 +277,5 @@ ORDER BY 1;
view citus_stat_statements
view pg_dist_shard_placement
view time_partitions
(260 rows)
(261 rows)

View File

@@ -30,12 +30,39 @@ SELECT tablename FROM pg_tables WHERE schemaname = 'run_command_on_all_nodes';
SELECT result FROM run_command_on_all_nodes('SELECT tablename FROM pg_tables WHERE schemaname = ''run_command_on_all_nodes'';');
-- break a node and check messages
BEGIN;
SELECT nodeid AS worker_1_nodeid FROM pg_dist_node WHERE nodeport = :worker_1_port \gset
UPDATE pg_dist_node SET nodeport = 0 WHERE nodeid = :worker_1_nodeid;
SELECT nodeid = :worker_1_nodeid AS "Is Worker 1", success, result FROM run_command_on_all_nodes('SELECT 1') ORDER BY 1;
SELECT nodeid = :worker_1_nodeid AS "Is Worker 1", success, result FROM run_command_on_all_nodes('SELECT 1', give_warning_for_connection_errors:=true) ORDER BY 1;
UPDATE pg_dist_node SET nodeport = :worker_1_port WHERE nodeid = :worker_1_nodeid;
ROLLBACK;
-- break connection to localhost
-- run_command_on_coordinator should still succeed, since it falls back to
-- citus.local_hostname/port when the coordinator is not reachable via metadata
BEGIN;
UPDATE pg_dist_node SET nodeport = 0 WHERE groupid = 0;
SELECT success, result
FROM run_command_on_coordinator('SELECT inet_server_port()') ORDER BY 1;
SELECT success, result
FROM run_command_on_coordinator('SELECT inet_server_port()', give_warning_for_connection_errors:=true) ORDER BY 1;
ROLLBACK;
-- we cannot use run_command_on_coordinator from workers if coordinator is not in the metadata
SELECT success, result FROM run_command_on_all_nodes($$select result from run_command_on_coordinator('select inet_server_port()')$$);
-- we can use run_command_on_coordinator from any node if the coordinator is in the metadata
SELECT citus_set_coordinator_host('localhost');
SELECT success, result FROM run_command_on_all_nodes($$select result from run_command_on_coordinator('select inet_server_port()')$$);
-- remove the coordinator from the metadata again to restore the initial state
SELECT citus_remove_node(nodename, nodeport) FROM pg_dist_node WHERE groupid = 0;
-- check that we fail when pg_dist_node is empty
BEGIN;
DELETE FROM pg_dist_node;
SELECT success, result FROM run_command_on_coordinator('select inet_server_port()');
ROLLBACK;
DROP SCHEMA run_command_on_all_nodes CASCADE;