diff --git a/src/backend/distributed/operations/citus_tools.c b/src/backend/distributed/operations/citus_tools.c index de267a822..77f5190e3 100644 --- a/src/backend/distributed/operations/citus_tools.c +++ b/src/backend/distributed/operations/citus_tools.c @@ -11,27 +11,30 @@ */ #include "postgres.h" -#include "funcapi.h" -#include "libpq-fe.h" -#include "miscadmin.h" - #include "access/htup_details.h" #include "catalog/pg_type.h" +#include "distributed/argutils.h" #include "distributed/connection_management.h" #include "distributed/metadata_cache.h" +#include "distributed/multi_client_executor.h" #include "distributed/multi_server_executor.h" #include "distributed/remote_commands.h" -#include "distributed/worker_protocol.h" #include "distributed/version_compat.h" +#include "distributed/worker_protocol.h" +#include "funcapi.h" #include "lib/stringinfo.h" +#include "libpq-fe.h" +#include "miscadmin.h" #include "utils/builtins.h" -#include "distributed/multi_client_executor.h" - +/* simple query to run on workers to check connectivity */ +#define CONNECTIVITY_CHECK_QUERY "SELECT 1" +PG_FUNCTION_INFO_V1(citus_check_connection_to_node); PG_FUNCTION_INFO_V1(master_run_on_worker); +static bool CheckConnectionToNode(char *nodeName, uint32 nodePort); static int ParseCommandParameters(FunctionCallInfo fcinfo, StringInfo **nodeNameArray, int **nodePortsArray, StringInfo **commandStringArray, bool *parallel); @@ -60,6 +63,36 @@ static Tuplestorestate * CreateTupleStore(TupleDesc tupleDescriptor, StringInfo *resultArray, int commandCount); +/* + * citus_check_connection_to_node sends a simple query from a worker node to another + * node, and returns success status. + */ +Datum +citus_check_connection_to_node(PG_FUNCTION_ARGS) +{ + char *nodeName = PG_GETARG_TEXT_TO_CSTRING(0); + uint32 nodePort = PG_GETARG_UINT32(1); + + bool success = CheckConnectionToNode(nodeName, nodePort); + PG_RETURN_BOOL(success); +} + + +/* + * CheckConnectionToNode sends a simple query to a node and returns success status + */ +static bool +CheckConnectionToNode(char *nodeName, uint32 nodePort) +{ + int connectionFlags = 0; + MultiConnection *connection = GetNodeConnection(connectionFlags, nodeName, nodePort); + int responseStatus = ExecuteOptionalRemoteCommand(connection, + CONNECTIVITY_CHECK_QUERY, NULL); + + return responseStatus == RESPONSE_OKAY; +} + + /* * master_run_on_worker executes queries/commands to run on specified worker and * returns success status and query/command result. Expected input is 3 arrays diff --git a/src/backend/distributed/sql/citus--10.2-4--11.0-1.sql b/src/backend/distributed/sql/citus--10.2-4--11.0-1.sql index 97bea41a3..03db03933 100644 --- a/src/backend/distributed/sql/citus--10.2-4--11.0-1.sql +++ b/src/backend/distributed/sql/citus--10.2-4--11.0-1.sql @@ -3,6 +3,8 @@ -- bump version to 11.0-1 #include "udfs/citus_disable_node/11.0-1.sql" +#include "udfs/citus_check_connection_to_node/11.0-1.sql" + DROP FUNCTION IF EXISTS pg_catalog.master_apply_delete_command(text); DROP FUNCTION pg_catalog.master_get_table_metadata(text); DROP FUNCTION pg_catalog.master_append_table_to_shard(bigint, text, text, integer); diff --git a/src/backend/distributed/sql/downgrades/citus--11.0-1--10.2-4.sql b/src/backend/distributed/sql/downgrades/citus--11.0-1--10.2-4.sql index f171a407b..3d39a8808 100644 --- a/src/backend/distributed/sql/downgrades/citus--11.0-1--10.2-4.sql +++ b/src/backend/distributed/sql/downgrades/citus--11.0-1--10.2-4.sql @@ -39,3 +39,5 @@ CREATE FUNCTION pg_catalog.citus_disable_node(nodename text, nodeport integer) AS 'MODULE_PATHNAME', $$citus_disable_node$$; COMMENT ON FUNCTION pg_catalog.citus_disable_node(nodename text, nodeport integer) IS 'removes node from the cluster temporarily'; + +DROP FUNCTION pg_catalog.citus_check_connection_to_node (text, integer); diff --git a/src/backend/distributed/sql/udfs/citus_check_connection_to_node/11.0-1.sql b/src/backend/distributed/sql/udfs/citus_check_connection_to_node/11.0-1.sql new file mode 100644 index 000000000..3c96d8aa8 --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_check_connection_to_node/11.0-1.sql @@ -0,0 +1,11 @@ +CREATE FUNCTION pg_catalog.citus_check_connection_to_node ( + nodename text, + nodeport integer DEFAULT 5432) + RETURNS bool + LANGUAGE C + STRICT + AS 'MODULE_PATHNAME', $$citus_check_connection_to_node$$; + +COMMENT ON FUNCTION pg_catalog.citus_check_connection_to_node ( + nodename text, nodeport integer) + IS 'checks connection to another node'; diff --git a/src/backend/distributed/sql/udfs/citus_check_connection_to_node/latest.sql b/src/backend/distributed/sql/udfs/citus_check_connection_to_node/latest.sql new file mode 100644 index 000000000..3c96d8aa8 --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_check_connection_to_node/latest.sql @@ -0,0 +1,11 @@ +CREATE FUNCTION pg_catalog.citus_check_connection_to_node ( + nodename text, + nodeport integer DEFAULT 5432) + RETURNS bool + LANGUAGE C + STRICT + AS 'MODULE_PATHNAME', $$citus_check_connection_to_node$$; + +COMMENT ON FUNCTION pg_catalog.citus_check_connection_to_node ( + nodename text, nodeport integer) + IS 'checks connection to another node'; diff --git a/src/test/regress/expected/failure_connection_establishment.out b/src/test/regress/expected/failure_connection_establishment.out index 4bec0f23d..bb9b27681 100644 --- a/src/test/regress/expected/failure_connection_establishment.out +++ b/src/test/regress/expected/failure_connection_establishment.out @@ -233,6 +233,88 @@ WARNING: connection to the remote node localhost:xxxxx failed --------------------------------------------------------------------- (0 rows) +-- tests for connectivity checks +SET client_min_messages TO ERROR; +-- kill the connection after authentication is ok +SELECT citus.mitmproxy('conn.onAuthenticationOk().kill()'); + mitmproxy +--------------------------------------------------------------------- + +(1 row) + +SELECT * FROM citus_check_connection_to_node('localhost', :worker_2_proxy_port); + citus_check_connection_to_node +--------------------------------------------------------------------- + f +(1 row) + +-- cancel the connection after authentication is ok +SELECT citus.mitmproxy('conn.onAuthenticationOk().cancel(' || pg_backend_pid() || ')'); + mitmproxy +--------------------------------------------------------------------- + +(1 row) + +SELECT * FROM citus_check_connection_to_node('localhost', :worker_2_proxy_port); +ERROR: canceling statement due to user request +-- kill the connection after connectivity check query is sent +SELECT citus.mitmproxy('conn.onQuery(query="^SELECT 1$").kill()'); + mitmproxy +--------------------------------------------------------------------- + +(1 row) + +SELECT * FROM citus_check_connection_to_node('localhost', :worker_2_proxy_port); + citus_check_connection_to_node +--------------------------------------------------------------------- + f +(1 row) + +-- cancel the connection after connectivity check query is sent +SELECT citus.mitmproxy('conn.onQuery(query="^SELECT 1$").cancel(' || pg_backend_pid() || ')'); + mitmproxy +--------------------------------------------------------------------- + +(1 row) + +SELECT * FROM citus_check_connection_to_node('localhost', :worker_2_proxy_port); +ERROR: canceling statement due to user request +-- kill the connection after connectivity check command is complete +SELECT citus.mitmproxy('conn.onCommandComplete(command="SELECT 1").kill()'); + mitmproxy +--------------------------------------------------------------------- + +(1 row) + +SELECT * FROM citus_check_connection_to_node('localhost', :worker_2_proxy_port); + citus_check_connection_to_node +--------------------------------------------------------------------- + f +(1 row) + +-- cancel the connection after connectivity check command is complete +SELECT citus.mitmproxy('conn.onCommandComplete(command="SELECT 1").cancel(' || pg_backend_pid() || ')'); + mitmproxy +--------------------------------------------------------------------- + +(1 row) + +SELECT * FROM citus_check_connection_to_node('localhost', :worker_2_proxy_port); +ERROR: canceling statement due to user request +-- verify that the checks are not successful when timeouts happen on a connection +SELECT citus.mitmproxy('conn.delay(500)'); + mitmproxy +--------------------------------------------------------------------- + +(1 row) + +SELECT * FROM citus_check_connection_to_node('localhost', :worker_2_proxy_port); + citus_check_connection_to_node +--------------------------------------------------------------------- + f +(1 row) + +RESET client_min_messages; SELECT citus.mitmproxy('conn.allow()'); mitmproxy --------------------------------------------------------------------- diff --git a/src/test/regress/expected/multi_citus_tools.out b/src/test/regress/expected/multi_citus_tools.out index a1c073530..7c091a0ba 100644 --- a/src/test/regress/expected/multi_citus_tools.out +++ b/src/test/regress/expected/multi_citus_tools.out @@ -3,14 +3,11 @@ -- -- tests UDFs created for citus tools -- +CREATE SCHEMA tools; +SET SEARCH_PATH TO 'tools'; SET citus.next_shard_id TO 1240000; -- test with invalid port, prevent OS dependent warning from being displayed SET client_min_messages to ERROR; --- PG 9.5 does not show context for plpgsql raise --- message whereas PG 9.6 shows. disabling it --- for this test only to have consistent behavior --- b/w PG 9.6+ and PG 9.5. -\set SHOW_CONTEXT never SELECT * FROM master_run_on_worker(ARRAY['localhost']::text[], ARRAY['666']::int[], ARRAY['select count(*) from pg_dist_shard']::text[], false); @@ -429,6 +426,7 @@ SELECT create_distributed_table('second_table', 'key', 'hash'); SELECT * FROM run_command_on_colocated_placements('check_colocated', 'second_table', 'select 1'); ERROR: tables check_colocated and second_table are not co-located +CONTEXT: PL/pgSQL function run_command_on_colocated_placements(regclass,regclass,text,boolean) line XX at RAISE -- even when the difference is in replication factor, an error is thrown DROP TABLE second_table; SET citus.shard_replication_factor TO 1; @@ -443,6 +441,7 @@ SELECT create_distributed_table('second_table', 'key', 'hash'); SELECT * FROM run_command_on_colocated_placements('check_colocated', 'second_table', 'select 1'); ERROR: tables check_colocated and second_table are not co-located +CONTEXT: PL/pgSQL function run_command_on_colocated_placements(regclass,regclass,text,boolean) line XX at RAISE -- when everything matches, the command is run! DROP TABLE second_table; SET citus.shard_replication_factor TO 2; @@ -494,6 +493,7 @@ SELECT * FROM run_command_on_shards('check_shards', 'select 1'); UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid % 2 = 0; SELECT * FROM run_command_on_shards('check_shards', 'select 1'); NOTICE: some shards do not have active placements +CONTEXT: PL/pgSQL function run_command_on_shards(regclass,text,boolean) line XX at RAISE shardid | success | result --------------------------------------------------------------------- 1240025 | t | 1 @@ -501,5 +501,136 @@ NOTICE: some shards do not have active placements (2 rows) DROP TABLE check_shards CASCADE; +-- test the connections to worker nodes +SELECT bool_and(success) AS all_nodes_are_successful FROM ( + SELECT citus_check_connection_to_node(nodename, nodeport) AS success + FROM pg_dist_node + WHERE isactive = 't' AND noderole='primary' +) subquery; + all_nodes_are_successful +--------------------------------------------------------------------- + t +(1 row) + +-- verify that the coordinator can connect to itself +SELECT citus_check_connection_to_node('localhost', :master_port); + citus_check_connection_to_node +--------------------------------------------------------------------- + t +(1 row) + +-- verify that the connections are not successful for wrong port +-- test with invalid port, prevent OS dependent warning from being displayed +SET client_min_messages TO ERROR; +SELECT citus_check_connection_to_node('localhost', nodeport:=1234); + citus_check_connection_to_node +--------------------------------------------------------------------- + f +(1 row) + +-- verify that the connections are not successful due to timeouts +SET citus.node_connection_timeout TO 10; +SELECT citus_check_connection_to_node('www.citusdata.com'); + citus_check_connection_to_node +--------------------------------------------------------------------- + f +(1 row) + +RESET citus.node_connection_timeout; +SET client_min_messages TO DEBUG; +-- check the connections in a transaction block +BEGIN; +SELECT citus_check_connection_to_node(nodename, nodeport) +FROM pg_dist_node +WHERE isactive = 't' AND noderole='primary'; + citus_check_connection_to_node +--------------------------------------------------------------------- + t + t +(2 rows) + +CREATE TABLE distributed(id int, data text); +SELECT create_distributed_table('distributed', 'id'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SELECT count(*) FROM distributed; +DEBUG: Router planner cannot handle multi-shard select queries + count +--------------------------------------------------------------------- + 0 +(1 row) + +ROLLBACK; +-- create some roles for testing purposes +SET client_min_messages TO ERROR; +CREATE ROLE role_without_login WITH NOLOGIN; +SELECT 1 FROM run_command_on_workers($$CREATE ROLE role_without_login WITH NOLOGIN$$); + ?column? +--------------------------------------------------------------------- + 1 + 1 +(2 rows) + +CREATE ROLE role_with_login WITH LOGIN; +SELECT 1 FROM run_command_on_workers($$CREATE ROLE role_with_login WITH LOGIN$$); + ?column? +--------------------------------------------------------------------- + 1 + 1 +(2 rows) + +SET client_min_messages TO DEBUG; +-- verify that we can create connections only with users with login privileges. +SET ROLE role_without_login; +SELECT citus_check_connection_to_node('localhost', :worker_1_port); +WARNING: connection to the remote node localhost:xxxxx failed with the following error: FATAL: role "role_without_login" is not permitted to log in + citus_check_connection_to_node +--------------------------------------------------------------------- + f +(1 row) + +SET ROLE role_with_login; +SELECT citus_check_connection_to_node('localhost', :worker_1_port); + citus_check_connection_to_node +--------------------------------------------------------------------- + t +(1 row) + +RESET role; +DROP ROLE role_with_login, role_without_login; +SELECT 1 FROM run_command_on_workers($$DROP ROLE role_with_login, role_without_login$$); + ?column? +--------------------------------------------------------------------- + 1 + 1 +(2 rows) + +-- check connections from a worker node +\c - - - :worker_1_port +SELECT citus_check_connection_to_node('localhost', :master_port); + citus_check_connection_to_node +--------------------------------------------------------------------- + t +(1 row) + +SELECT citus_check_connection_to_node('localhost', :worker_1_port); + citus_check_connection_to_node +--------------------------------------------------------------------- + t +(1 row) + +SELECT citus_check_connection_to_node('localhost', :worker_2_port); + citus_check_connection_to_node +--------------------------------------------------------------------- + t +(1 row) + +\c - - - :master_port +RESET client_min_messages; +DROP SCHEMA tools CASCADE; +RESET SEARCH_PATH; -- set SHOW_CONTEXT back to default \set SHOW_CONTEXT errors diff --git a/src/test/regress/expected/multi_extension.out b/src/test/regress/expected/multi_extension.out index 2ba1470b6..c59f67b95 100644 --- a/src/test/regress/expected/multi_extension.out +++ b/src/test/regress/expected/multi_extension.out @@ -966,14 +966,15 @@ SELECT * FROM multi_extension.print_extension_changes(); -- Snapshot of state at 11.0-1 ALTER EXTENSION citus UPDATE TO '11.0-1'; SELECT * FROM multi_extension.print_extension_changes(); - previous_object | current_object + previous_object | current_object --------------------------------------------------------------------- function citus_disable_node(text,integer) void | function master_append_table_to_shard(bigint,text,text,integer) real | function master_apply_delete_command(text) integer | function master_get_table_metadata(text) record | + | function citus_check_connection_to_node(text,integer) boolean | function citus_disable_node(text,integer,boolean) void -(5 rows) +(6 rows) DROP TABLE multi_extension.prev_objects, multi_extension.extension_diff; -- show running version diff --git a/src/test/regress/expected/upgrade_list_citus_objects.out b/src/test/regress/expected/upgrade_list_citus_objects.out index 10c8f82fb..8bdd5cdab 100644 --- a/src/test/regress/expected/upgrade_list_citus_objects.out +++ b/src/test/regress/expected/upgrade_list_citus_objects.out @@ -38,6 +38,7 @@ ORDER BY 1; function citus_add_rebalance_strategy(name,regproc,regproc,regproc,real,real,real) function citus_add_secondary_node(text,integer,text,integer,name) function citus_blocking_pids(integer) + function citus_check_connection_to_node(text,integer) function citus_cleanup_orphaned_shards() function citus_conninfo_cache_invalidate() function citus_copy_shard_placement(bigint,text,integer,text,integer,boolean,citus.shard_transfer_mode) @@ -259,5 +260,5 @@ ORDER BY 1; view citus_worker_stat_activity view pg_dist_shard_placement view time_partitions -(243 rows) +(244 rows) diff --git a/src/test/regress/sql/failure_connection_establishment.sql b/src/test/regress/sql/failure_connection_establishment.sql index 95c4ee20e..70812e56a 100644 --- a/src/test/regress/sql/failure_connection_establishment.sql +++ b/src/test/regress/sql/failure_connection_establishment.sql @@ -129,7 +129,38 @@ RESET client_min_messages; -- verify get_global_active_transactions works when a timeout happens on a connection SELECT get_global_active_transactions(); +-- tests for connectivity checks +SET client_min_messages TO ERROR; +-- kill the connection after authentication is ok +SELECT citus.mitmproxy('conn.onAuthenticationOk().kill()'); +SELECT * FROM citus_check_connection_to_node('localhost', :worker_2_proxy_port); + +-- cancel the connection after authentication is ok +SELECT citus.mitmproxy('conn.onAuthenticationOk().cancel(' || pg_backend_pid() || ')'); +SELECT * FROM citus_check_connection_to_node('localhost', :worker_2_proxy_port); + +-- kill the connection after connectivity check query is sent +SELECT citus.mitmproxy('conn.onQuery(query="^SELECT 1$").kill()'); +SELECT * FROM citus_check_connection_to_node('localhost', :worker_2_proxy_port); + +-- cancel the connection after connectivity check query is sent +SELECT citus.mitmproxy('conn.onQuery(query="^SELECT 1$").cancel(' || pg_backend_pid() || ')'); +SELECT * FROM citus_check_connection_to_node('localhost', :worker_2_proxy_port); + +-- kill the connection after connectivity check command is complete +SELECT citus.mitmproxy('conn.onCommandComplete(command="SELECT 1").kill()'); +SELECT * FROM citus_check_connection_to_node('localhost', :worker_2_proxy_port); + +-- cancel the connection after connectivity check command is complete +SELECT citus.mitmproxy('conn.onCommandComplete(command="SELECT 1").cancel(' || pg_backend_pid() || ')'); +SELECT * FROM citus_check_connection_to_node('localhost', :worker_2_proxy_port); + +-- verify that the checks are not successful when timeouts happen on a connection +SELECT citus.mitmproxy('conn.delay(500)'); +SELECT * FROM citus_check_connection_to_node('localhost', :worker_2_proxy_port); + +RESET client_min_messages; SELECT citus.mitmproxy('conn.allow()'); SET citus.node_connection_timeout TO DEFAULT; DROP SCHEMA fail_connect CASCADE; diff --git a/src/test/regress/sql/multi_citus_tools.sql b/src/test/regress/sql/multi_citus_tools.sql index 4ec9c1307..6db60c8d4 100644 --- a/src/test/regress/sql/multi_citus_tools.sql +++ b/src/test/regress/sql/multi_citus_tools.sql @@ -4,15 +4,12 @@ -- tests UDFs created for citus tools -- +CREATE SCHEMA tools; +SET SEARCH_PATH TO 'tools'; SET citus.next_shard_id TO 1240000; -- test with invalid port, prevent OS dependent warning from being displayed SET client_min_messages to ERROR; --- PG 9.5 does not show context for plpgsql raise --- message whereas PG 9.6 shows. disabling it --- for this test only to have consistent behavior --- b/w PG 9.6+ and PG 9.5. -\set SHOW_CONTEXT never SELECT * FROM master_run_on_worker(ARRAY['localhost']::text[], ARRAY['666']::int[], ARRAY['select count(*) from pg_dist_shard']::text[], @@ -257,5 +254,73 @@ UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid % 2 = 0; SELECT * FROM run_command_on_shards('check_shards', 'select 1'); DROP TABLE check_shards CASCADE; +-- test the connections to worker nodes +SELECT bool_and(success) AS all_nodes_are_successful FROM ( + SELECT citus_check_connection_to_node(nodename, nodeport) AS success + FROM pg_dist_node + WHERE isactive = 't' AND noderole='primary' +) subquery; + +-- verify that the coordinator can connect to itself +SELECT citus_check_connection_to_node('localhost', :master_port); + +-- verify that the connections are not successful for wrong port +-- test with invalid port, prevent OS dependent warning from being displayed +SET client_min_messages TO ERROR; +SELECT citus_check_connection_to_node('localhost', nodeport:=1234); + +-- verify that the connections are not successful due to timeouts +SET citus.node_connection_timeout TO 10; +SELECT citus_check_connection_to_node('www.citusdata.com'); +RESET citus.node_connection_timeout; + +SET client_min_messages TO DEBUG; + +-- check the connections in a transaction block +BEGIN; +SELECT citus_check_connection_to_node(nodename, nodeport) +FROM pg_dist_node +WHERE isactive = 't' AND noderole='primary'; + +CREATE TABLE distributed(id int, data text); +SELECT create_distributed_table('distributed', 'id'); +SELECT count(*) FROM distributed; + +ROLLBACK; + +-- create some roles for testing purposes +SET client_min_messages TO ERROR; + +CREATE ROLE role_without_login WITH NOLOGIN; +SELECT 1 FROM run_command_on_workers($$CREATE ROLE role_without_login WITH NOLOGIN$$); + +CREATE ROLE role_with_login WITH LOGIN; +SELECT 1 FROM run_command_on_workers($$CREATE ROLE role_with_login WITH LOGIN$$); + +SET client_min_messages TO DEBUG; + +-- verify that we can create connections only with users with login privileges. +SET ROLE role_without_login; +SELECT citus_check_connection_to_node('localhost', :worker_1_port); + +SET ROLE role_with_login; +SELECT citus_check_connection_to_node('localhost', :worker_1_port); + +RESET role; + +DROP ROLE role_with_login, role_without_login; +SELECT 1 FROM run_command_on_workers($$DROP ROLE role_with_login, role_without_login$$); + +-- check connections from a worker node +\c - - - :worker_1_port +SELECT citus_check_connection_to_node('localhost', :master_port); +SELECT citus_check_connection_to_node('localhost', :worker_1_port); +SELECT citus_check_connection_to_node('localhost', :worker_2_port); + +\c - - - :master_port + +RESET client_min_messages; +DROP SCHEMA tools CASCADE; +RESET SEARCH_PATH; -- set SHOW_CONTEXT back to default \set SHOW_CONTEXT errors