diff --git a/src/backend/distributed/sql/citus--11.0-1--11.0-2.sql b/src/backend/distributed/sql/citus--11.0-1--11.0-2.sql
index 6263b3154..d926ee5e3 100644
--- a/src/backend/distributed/sql/citus--11.0-1--11.0-2.sql
+++ b/src/backend/distributed/sql/citus--11.0-1--11.0-2.sql
@@ -2,3 +2,4 @@
 #include "udfs/citus_shard_indexes_on_worker/11.0-2.sql"
 #include "udfs/citus_is_coordinator/11.0-2.sql"
 #include "udfs/citus_disable_node/11.0-2.sql"
+#include "udfs/run_command_on_coordinator/11.0-2.sql"
diff --git a/src/backend/distributed/sql/downgrades/citus--11.0-2--11.0-1.sql b/src/backend/distributed/sql/downgrades/citus--11.0-2--11.0-1.sql
index 3b557beb1..270bbb52e 100644
--- a/src/backend/distributed/sql/downgrades/citus--11.0-2--11.0-1.sql
+++ b/src/backend/distributed/sql/downgrades/citus--11.0-2--11.0-1.sql
@@ -12,3 +12,4 @@ COMMENT ON FUNCTION pg_catalog.citus_disable_node(nodename text, nodeport intege
 REVOKE ALL ON FUNCTION pg_catalog.citus_disable_node(text,int, bool) FROM PUBLIC;
 DROP FUNCTION pg_catalog.citus_is_coordinator();
+DROP FUNCTION pg_catalog.run_command_on_coordinator(text,boolean);
diff --git a/src/backend/distributed/sql/udfs/run_command_on_coordinator/11.0-2.sql b/src/backend/distributed/sql/udfs/run_command_on_coordinator/11.0-2.sql
new file mode 100644
index 000000000..d8fb3c35c
--- /dev/null
+++ b/src/backend/distributed/sql/udfs/run_command_on_coordinator/11.0-2.sql
@@ -0,0 +1,59 @@
+-- run_command_on_coordinator closely follows the semantics of run_command_on_all_nodes,
+-- but runs the command only on the coordinator
+CREATE FUNCTION pg_catalog.run_command_on_coordinator(command text, give_warning_for_connection_errors bool default false,
+                                                      OUT nodeid int, OUT success bool, OUT result text)
+    RETURNS SETOF record
+    LANGUAGE plpgsql
+    AS $function$
+DECLARE
+    nodenames text[];
+    ports int[];
+    commands text[];
+    coordinator_is_in_metadata boolean;
+    parallel boolean := false;
+BEGIN
+    WITH citus_nodes AS (
+        SELECT * FROM pg_dist_node
+        WHERE isactive AND nodecluster = current_setting('citus.cluster_name') AND groupid = 0
+        AND (
+            (current_setting('citus.use_secondary_nodes') = 'never' AND noderole = 'primary')
+            OR
+            (current_setting('citus.use_secondary_nodes') = 'always' AND noderole = 'secondary')
+        )
+        ORDER BY nodename, nodeport
+    )
+    SELECT array_agg(citus_nodes.nodename), array_agg(citus_nodes.nodeport), array_agg(command), count(*) > 0
+    FROM citus_nodes
+    INTO nodenames, ports, commands, coordinator_is_in_metadata;
+
+    IF NOT coordinator_is_in_metadata THEN
+        -- This happens when the coordinator has not been added to the
+        -- metadata. If we are running on the coordinator itself, we can
+        -- manually append the current node to the lists so the command
+        -- still runs on the coordinator. But when the coordinator is
+        -- missing from the metadata and this function is called from a
+        -- worker node, that is not enough, so we error out below.
+        IF citus_is_coordinator() THEN
+            SELECT
+                array_append(nodenames, current_setting('citus.local_hostname')),
+                array_append(ports, current_setting('port')::int),
+                array_append(commands, command)
+            INTO nodenames, ports, commands;
+        ELSE
+            RAISE EXCEPTION 'the coordinator is not added to the metadata'
+            USING HINT = 'Add the node as a coordinator by using: SELECT citus_set_coordinator_host(''<hostname>'')';
+        END IF;
+    END IF;
+
+    FOR nodeid, success, result IN
+        SELECT coalesce(pg_dist_node.nodeid, 0) AS nodeid, mrow.success, mrow.result
+        FROM master_run_on_worker(nodenames, ports, commands, parallel) mrow
+        LEFT JOIN pg_dist_node ON mrow.node_name = pg_dist_node.nodename AND mrow.node_port = pg_dist_node.nodeport
+    LOOP
+        IF give_warning_for_connection_errors AND NOT success THEN
+            RAISE WARNING 'Error on node with node id %: %', nodeid, result;
+        END IF;
+        RETURN NEXT;
+    END LOOP;
+END;
+$function$;
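The file below, latest.sql, is intentionally byte-identical to 11.0-2.sql above: by Citus convention, latest.sql always mirrors the newest versioned script for a UDF. As a quick smoke test of the new function, something like the following could be run on the coordinator after the upgrade; the command string and the comments on the expected shape are illustrative, not part of this patch:

    -- illustrative only: any command returning a single value works here
    SELECT nodeid, success, result
    FROM run_command_on_coordinator('SELECT count(*) FROM pg_dist_node');
    -- returns one row: nodeid is the coordinator's pg_dist_node.nodeid, or 0 when the
    -- coordinator is absent from the metadata; success and result come straight from
    -- master_run_on_worker and carry the command's outcome as text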
diff --git a/src/backend/distributed/sql/udfs/run_command_on_coordinator/latest.sql b/src/backend/distributed/sql/udfs/run_command_on_coordinator/latest.sql
new file mode 100644
index 000000000..d8fb3c35c
--- /dev/null
+++ b/src/backend/distributed/sql/udfs/run_command_on_coordinator/latest.sql
@@ -0,0 +1,59 @@
+-- run_command_on_coordinator closely follows the semantics of run_command_on_all_nodes,
+-- but runs the command only on the coordinator
+CREATE FUNCTION pg_catalog.run_command_on_coordinator(command text, give_warning_for_connection_errors bool default false,
+                                                      OUT nodeid int, OUT success bool, OUT result text)
+    RETURNS SETOF record
+    LANGUAGE plpgsql
+    AS $function$
+DECLARE
+    nodenames text[];
+    ports int[];
+    commands text[];
+    coordinator_is_in_metadata boolean;
+    parallel boolean := false;
+BEGIN
+    WITH citus_nodes AS (
+        SELECT * FROM pg_dist_node
+        WHERE isactive AND nodecluster = current_setting('citus.cluster_name') AND groupid = 0
+        AND (
+            (current_setting('citus.use_secondary_nodes') = 'never' AND noderole = 'primary')
+            OR
+            (current_setting('citus.use_secondary_nodes') = 'always' AND noderole = 'secondary')
+        )
+        ORDER BY nodename, nodeport
+    )
+    SELECT array_agg(citus_nodes.nodename), array_agg(citus_nodes.nodeport), array_agg(command), count(*) > 0
+    FROM citus_nodes
+    INTO nodenames, ports, commands, coordinator_is_in_metadata;
+
+    IF NOT coordinator_is_in_metadata THEN
+        -- This happens when the coordinator has not been added to the
+        -- metadata. If we are running on the coordinator itself, we can
+        -- manually append the current node to the lists so the command
+        -- still runs on the coordinator. But when the coordinator is
+        -- missing from the metadata and this function is called from a
+        -- worker node, that is not enough, so we error out below.
+        IF citus_is_coordinator() THEN
+            SELECT
+                array_append(nodenames, current_setting('citus.local_hostname')),
+                array_append(ports, current_setting('port')::int),
+                array_append(commands, command)
+            INTO nodenames, ports, commands;
+        ELSE
+            RAISE EXCEPTION 'the coordinator is not added to the metadata'
+            USING HINT = 'Add the node as a coordinator by using: SELECT citus_set_coordinator_host(''<hostname>'')';
+        END IF;
+    END IF;
+
+    FOR nodeid, success, result IN
+        SELECT coalesce(pg_dist_node.nodeid, 0) AS nodeid, mrow.success, mrow.result
+        FROM master_run_on_worker(nodenames, ports, commands, parallel) mrow
+        LEFT JOIN pg_dist_node ON mrow.node_name = pg_dist_node.nodename AND mrow.node_port = pg_dist_node.nodeport
+    LOOP
+        IF give_warning_for_connection_errors AND NOT success THEN
+            RAISE WARNING 'Error on node with node id %: %', nodeid, result;
+        END IF;
+        RETURN NEXT;
+    END LOOP;
+END;
+$function$;
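The multi_extension test below snapshots the upgrade; the downgrade leg can be sanity-checked the same way. A rough round-trip sketch, assuming a cluster already on 11.0-2 (not part of this patch):

    ALTER EXTENSION citus UPDATE TO '11.0-1';  -- runs the DROP FUNCTION added to the downgrade script
    ALTER EXTENSION citus UPDATE TO '11.0-2';  -- re-creates pg_catalog.run_command_on_coordinator
    SELECT count(*) FROM pg_proc WHERE proname = 'run_command_on_coordinator';  -- expect 1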
diff --git a/src/test/regress/expected/multi_extension.out b/src/test/regress/expected/multi_extension.out
index 3f58f9efc..8f211bd97 100644
--- a/src/test/regress/expected/multi_extension.out
+++ b/src/test/regress/expected/multi_extension.out
@@ -1034,10 +1034,11 @@ SELECT * FROM multi_extension.print_extension_changes();
 -- Snapshot of state at 11.0-2
 ALTER EXTENSION citus UPDATE TO '11.0-2';
 SELECT * FROM multi_extension.print_extension_changes();
- previous_object |             current_object
+ previous_object |                         current_object
 ---------------------------------------------------------------------
  | function citus_is_coordinator() boolean
-(1 row)
+ | function run_command_on_coordinator(text,boolean) SETOF record
+(2 rows)
 
 -- Test downgrade script (result should be empty)
 ALTER EXTENSION citus UPDATE TO '11.0-1';
diff --git a/src/test/regress/expected/run_command_on_all_nodes.out b/src/test/regress/expected/run_command_on_all_nodes.out
index 9cf838c83..3e37b7c60 100644
--- a/src/test/regress/expected/run_command_on_all_nodes.out
+++ b/src/test/regress/expected/run_command_on_all_nodes.out
@@ -63,6 +63,7 @@ SELECT result FROM run_command_on_all_nodes('SELECT tablename FROM pg_tables WHE
 (3 rows)
 
 -- break a node and check messages
+BEGIN;
 SELECT nodeid AS worker_1_nodeid FROM pg_dist_node WHERE nodeport = :worker_1_port \gset
 UPDATE pg_dist_node SET nodeport = 0 WHERE nodeid = :worker_1_nodeid;
 SELECT nodeid = :worker_1_nodeid AS "Is Worker 1", success, result FROM run_command_on_all_nodes('SELECT 1') ORDER BY 1;
@@ -83,6 +84,62 @@ CONTEXT: PL/pgSQL function run_command_on_all_nodes(text,boolean,boolean) line
  t | f | failed to connect to localhost:xxxxx
 (3 rows)
 
-UPDATE pg_dist_node SET nodeport = :worker_1_port WHERE nodeid = :worker_1_nodeid;
+ROLLBACK;
+-- break connection to localhost
+BEGIN;
+UPDATE pg_dist_node SET nodeport = 0 WHERE groupid = 0;
+SELECT success, result
+FROM run_command_on_coordinator('SELECT inet_server_port()') ORDER BY 1;
+ success | result
+---------------------------------------------------------------------
+ t       | 57636
+(1 row)
+
+SELECT success, result
+FROM run_command_on_coordinator('SELECT inet_server_port()', give_warning_for_connection_errors:=true) ORDER BY 1;
+ success | result
+---------------------------------------------------------------------
+ t       | 57636
+(1 row)
+
+ROLLBACK;
+-- we cannot use run_command_on_coordinator from workers if coordinator is not in the metadata
+SELECT success, result FROM run_command_on_all_nodes($$select result from run_command_on_coordinator('select inet_server_port()')$$);
+ success |                        result
+---------------------------------------------------------------------
+ f       | ERROR: the coordinator is not added to the metadata
+ f       | ERROR: the coordinator is not added to the metadata
+ t       | 57636
+(3 rows)
+
+-- we can use run_command_on_coordinator from any node if the coordinator is in the metadata
+SELECT citus_set_coordinator_host('localhost');
+ citus_set_coordinator_host
+---------------------------------------------------------------------
+
+(1 row)
+
+SELECT success, result FROM run_command_on_all_nodes($$select result from run_command_on_coordinator('select inet_server_port()')$$);
+ success | result
+---------------------------------------------------------------------
+ t       | 57636
+ t       | 57636
+ t       | 57636
+(3 rows)
+
+SELECT citus_remove_node(nodename, nodeport) FROM pg_dist_node WHERE groupid = 0;
+ citus_remove_node
+---------------------------------------------------------------------
+
+(1 row)
+
+-- check that we fail when pg_dist_node is empty
+BEGIN;
+DELETE FROM pg_dist_node;
+SELECT success, result FROM run_command_on_coordinator('select inet_server_port()');
+ERROR: the coordinator is not added to the metadata
+HINT: Add the node as a coordinator by using: SELECT citus_set_coordinator_host('<hostname>')
+CONTEXT: PL/pgSQL function run_command_on_coordinator(text,boolean) line XX at RAISE
+ROLLBACK;
 DROP SCHEMA run_command_on_all_nodes CASCADE;
 NOTICE: drop cascades to table run_command_on_all_nodes.tbl
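Which branch the function takes depends on whether the coordinator is registered in the metadata; the coordinator always occupies group 0 of pg_dist_node. A quick manual check, with 'localhost' and 5432 standing in for the real hostname and port:

    -- is the coordinator in the metadata?
    SELECT nodeid, nodename, nodeport FROM pg_dist_node WHERE groupid = 0;
    -- if this returns no rows, calls from workers hit the RAISE EXCEPTION path above;
    -- registering the coordinator lifts that restriction
    SELECT citus_set_coordinator_host('localhost', 5432);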
diff --git a/src/test/regress/expected/upgrade_list_citus_objects.out b/src/test/regress/expected/upgrade_list_citus_objects.out
index 613cab1f9..0315c411e 100644
--- a/src/test/regress/expected/upgrade_list_citus_objects.out
+++ b/src/test/regress/expected/upgrade_list_citus_objects.out
@@ -199,6 +199,7 @@ ORDER BY 1;
 function role_exists(name)
 function run_command_on_all_nodes(text,boolean,boolean)
 function run_command_on_colocated_placements(regclass,regclass,text,boolean)
+function run_command_on_coordinator(text,boolean)
 function run_command_on_placements(regclass,text,boolean)
 function run_command_on_shards(regclass,text,boolean)
 function run_command_on_workers(text,boolean)
@@ -276,5 +277,5 @@ ORDER BY 1;
 view citus_stat_statements
 view pg_dist_shard_placement
 view time_partitions
-(260 rows)
+(261 rows)
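The object list above pins the function's identity arguments: OUT parameters are not part of the signature, so only (text,boolean) appears. The same signature can be read back from the catalog, e.g.:

    SELECT oid::regprocedure FROM pg_proc WHERE proname = 'run_command_on_coordinator';
    -- expected (with pg_catalog on the search path): run_command_on_coordinator(text,boolean)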
diff --git a/src/test/regress/sql/run_command_on_all_nodes.sql b/src/test/regress/sql/run_command_on_all_nodes.sql
index 46f838eb5..2f6327ce2 100644
--- a/src/test/regress/sql/run_command_on_all_nodes.sql
+++ b/src/test/regress/sql/run_command_on_all_nodes.sql
@@ -30,12 +30,39 @@ SELECT tablename FROM pg_tables WHERE schemaname = 'run_command_on_all_nodes';
 SELECT result FROM run_command_on_all_nodes('SELECT tablename FROM pg_tables WHERE schemaname = ''run_command_on_all_nodes'';');
 
 -- break a node and check messages
+BEGIN;
 SELECT nodeid AS worker_1_nodeid FROM pg_dist_node WHERE nodeport = :worker_1_port \gset
 UPDATE pg_dist_node SET nodeport = 0 WHERE nodeid = :worker_1_nodeid;
 
 SELECT nodeid = :worker_1_nodeid AS "Is Worker 1", success, result FROM run_command_on_all_nodes('SELECT 1') ORDER BY 1;
 SELECT nodeid = :worker_1_nodeid AS "Is Worker 1", success, result FROM run_command_on_all_nodes('SELECT 1', give_warning_for_connection_errors:=true) ORDER BY 1;
 
-UPDATE pg_dist_node SET nodeport = :worker_1_port WHERE nodeid = :worker_1_nodeid;
+ROLLBACK;
+
+-- break connection to localhost
+BEGIN;
+UPDATE pg_dist_node SET nodeport = 0 WHERE groupid = 0;
+
+SELECT success, result
+FROM run_command_on_coordinator('SELECT inet_server_port()') ORDER BY 1;
+
+SELECT success, result
+FROM run_command_on_coordinator('SELECT inet_server_port()', give_warning_for_connection_errors:=true) ORDER BY 1;
+
+ROLLBACK;
+
+-- we cannot use run_command_on_coordinator from workers if coordinator is not in the metadata
+SELECT success, result FROM run_command_on_all_nodes($$select result from run_command_on_coordinator('select inet_server_port()')$$);
+
+-- we can use run_command_on_coordinator from any node if the coordinator is in the metadata
+SELECT citus_set_coordinator_host('localhost');
+SELECT success, result FROM run_command_on_all_nodes($$select result from run_command_on_coordinator('select inet_server_port()')$$);
+SELECT citus_remove_node(nodename, nodeport) FROM pg_dist_node WHERE groupid = 0;
+
+-- check that we fail when pg_dist_node is empty
+BEGIN;
+DELETE FROM pg_dist_node;
+SELECT success, result FROM run_command_on_coordinator('select inet_server_port()');
+ROLLBACK;
 
 DROP SCHEMA run_command_on_all_nodes CASCADE;