Introduce UDF to check worker connectivity

citus_check_connection_to_node runs a simple query on a remote node and
reports whether this attempt was successful.

This UDF will be used to make sure each worker node can connect to all
the worker nodes in the cluster.

parameters:
nodename: required
nodeport: optional (default: 5432)

return value:
boolean success
pull/5503/head
Hanefi Onaldi 2021-12-03 02:17:58 +03:00 committed by Hanefi Onaldi
parent e4ead8f408
commit 56e9b1b968
11 changed files with 390 additions and 20 deletions

View File

@ -11,27 +11,30 @@
*/
#include "postgres.h"
#include "funcapi.h"
#include "libpq-fe.h"
#include "miscadmin.h"
#include "access/htup_details.h"
#include "catalog/pg_type.h"
#include "distributed/argutils.h"
#include "distributed/connection_management.h"
#include "distributed/metadata_cache.h"
#include "distributed/multi_client_executor.h"
#include "distributed/multi_server_executor.h"
#include "distributed/remote_commands.h"
#include "distributed/worker_protocol.h"
#include "distributed/version_compat.h"
#include "distributed/worker_protocol.h"
#include "funcapi.h"
#include "lib/stringinfo.h"
#include "libpq-fe.h"
#include "miscadmin.h"
#include "utils/builtins.h"
#include "distributed/multi_client_executor.h"
/*
 * Simple query sent to a remote node to check connectivity; the check only
 * looks at whether the command succeeds, not at what it returns.
 */
#define CONNECTIVITY_CHECK_QUERY "SELECT 1"
/* register the SQL-callable entry points (PostgreSQL version-1 calling convention) */
PG_FUNCTION_INFO_V1(citus_check_connection_to_node);
PG_FUNCTION_INFO_V1(master_run_on_worker);
/* forward declarations of file-local helpers */
static bool CheckConnectionToNode(char *nodeName, uint32 nodePort);
static int ParseCommandParameters(FunctionCallInfo fcinfo, StringInfo **nodeNameArray,
int **nodePortsArray, StringInfo **commandStringArray,
bool *parallel);
@ -60,6 +63,36 @@ static Tuplestorestate * CreateTupleStore(TupleDesc tupleDescriptor,
StringInfo *resultArray, int commandCount);
/*
 * citus_check_connection_to_node is the SQL-callable entry point of the
 * connectivity check. It reads the target node name (argument 0) and port
 * (argument 1), runs the check from the node it is invoked on against that
 * target, and returns the outcome as a boolean.
 */
Datum
citus_check_connection_to_node(PG_FUNCTION_ARGS)
{
	char *targetNodeName = PG_GETARG_TEXT_TO_CSTRING(0);
	uint32 targetNodePort = PG_GETARG_UINT32(1);

	PG_RETURN_BOOL(CheckConnectionToNode(targetNodeName, targetNodePort));
}
/*
 * CheckConnectionToNode obtains a connection to nodeName:nodePort, runs
 * CONNECTIVITY_CHECK_QUERY over it and returns true only when the command
 * reports RESPONSE_OKAY; every other status is reported as false.
 */
static bool
CheckConnectionToNode(char *nodeName, uint32 nodePort)
{
	int noSpecialFlags = 0;
	MultiConnection *nodeConnection = GetNodeConnection(noSpecialFlags, nodeName,
														nodePort);

	/*
	 * Only the status code is inspected here; NULL is passed as the result
	 * argument (presumably meaning the result set itself is not wanted).
	 */
	int commandStatus = ExecuteOptionalRemoteCommand(nodeConnection,
													 CONNECTIVITY_CHECK_QUERY, NULL);
	if (commandStatus != RESPONSE_OKAY)
	{
		return false;
	}

	return true;
}
/*
* master_run_on_worker executes queries/commands to run on specified worker and
* returns success status and query/command result. Expected input is 3 arrays

View File

@ -3,6 +3,8 @@
-- bump version to 11.0-1
#include "udfs/citus_disable_node/11.0-1.sql"
#include "udfs/citus_check_connection_to_node/11.0-1.sql"
DROP FUNCTION IF EXISTS pg_catalog.master_apply_delete_command(text);
DROP FUNCTION pg_catalog.master_get_table_metadata(text);
DROP FUNCTION pg_catalog.master_append_table_to_shard(bigint, text, text, integer);

View File

@ -39,3 +39,5 @@ CREATE FUNCTION pg_catalog.citus_disable_node(nodename text, nodeport integer)
AS 'MODULE_PATHNAME', $$citus_disable_node$$;
COMMENT ON FUNCTION pg_catalog.citus_disable_node(nodename text, nodeport integer)
IS 'removes node from the cluster temporarily';
DROP FUNCTION pg_catalog.citus_check_connection_to_node (text, integer);

View File

@ -0,0 +1,11 @@
-- citus_check_connection_to_node(nodename, nodeport) returns a boolean that
-- tells whether the connectivity check against the given node succeeded.
-- nodeport is optional and defaults to 5432. The function is STRICT, so a
-- NULL argument yields NULL without calling the C implementation.
CREATE FUNCTION pg_catalog.citus_check_connection_to_node (
nodename text,
nodeport integer DEFAULT 5432)
RETURNS bool
LANGUAGE C
STRICT
AS 'MODULE_PATHNAME', $$citus_check_connection_to_node$$;
COMMENT ON FUNCTION pg_catalog.citus_check_connection_to_node (
nodename text, nodeport integer)
IS 'checks connection to another node';

View File

@ -0,0 +1,11 @@
-- citus_check_connection_to_node(nodename, nodeport) returns a boolean that
-- tells whether the connectivity check against the given node succeeded.
-- nodeport is optional and defaults to 5432. The function is STRICT, so a
-- NULL argument yields NULL without calling the C implementation.
CREATE FUNCTION pg_catalog.citus_check_connection_to_node (
nodename text,
nodeport integer DEFAULT 5432)
RETURNS bool
LANGUAGE C
STRICT
AS 'MODULE_PATHNAME', $$citus_check_connection_to_node$$;
COMMENT ON FUNCTION pg_catalog.citus_check_connection_to_node (
nodename text, nodeport integer)
IS 'checks connection to another node';

View File

@ -233,6 +233,88 @@ WARNING: connection to the remote node localhost:xxxxx failed
---------------------------------------------------------------------
(0 rows)
-- tests for connectivity checks
SET client_min_messages TO ERROR;
-- kill the connection after authentication is ok
SELECT citus.mitmproxy('conn.onAuthenticationOk().kill()');
mitmproxy
---------------------------------------------------------------------
(1 row)
SELECT * FROM citus_check_connection_to_node('localhost', :worker_2_proxy_port);
citus_check_connection_to_node
---------------------------------------------------------------------
f
(1 row)
-- cancel the connection after authentication is ok
SELECT citus.mitmproxy('conn.onAuthenticationOk().cancel(' || pg_backend_pid() || ')');
mitmproxy
---------------------------------------------------------------------
(1 row)
SELECT * FROM citus_check_connection_to_node('localhost', :worker_2_proxy_port);
ERROR: canceling statement due to user request
-- kill the connection after connectivity check query is sent
SELECT citus.mitmproxy('conn.onQuery(query="^SELECT 1$").kill()');
mitmproxy
---------------------------------------------------------------------
(1 row)
SELECT * FROM citus_check_connection_to_node('localhost', :worker_2_proxy_port);
citus_check_connection_to_node
---------------------------------------------------------------------
f
(1 row)
-- cancel the connection after connectivity check query is sent
SELECT citus.mitmproxy('conn.onQuery(query="^SELECT 1$").cancel(' || pg_backend_pid() || ')');
mitmproxy
---------------------------------------------------------------------
(1 row)
SELECT * FROM citus_check_connection_to_node('localhost', :worker_2_proxy_port);
ERROR: canceling statement due to user request
-- kill the connection after connectivity check command is complete
SELECT citus.mitmproxy('conn.onCommandComplete(command="SELECT 1").kill()');
mitmproxy
---------------------------------------------------------------------
(1 row)
SELECT * FROM citus_check_connection_to_node('localhost', :worker_2_proxy_port);
citus_check_connection_to_node
---------------------------------------------------------------------
f
(1 row)
-- cancel the connection after connectivity check command is complete
SELECT citus.mitmproxy('conn.onCommandComplete(command="SELECT 1").cancel(' || pg_backend_pid() || ')');
mitmproxy
---------------------------------------------------------------------
(1 row)
SELECT * FROM citus_check_connection_to_node('localhost', :worker_2_proxy_port);
ERROR: canceling statement due to user request
-- verify that the checks are not successful when timeouts happen on a connection
SELECT citus.mitmproxy('conn.delay(500)');
mitmproxy
---------------------------------------------------------------------
(1 row)
SELECT * FROM citus_check_connection_to_node('localhost', :worker_2_proxy_port);
citus_check_connection_to_node
---------------------------------------------------------------------
f
(1 row)
RESET client_min_messages;
SELECT citus.mitmproxy('conn.allow()');
mitmproxy
---------------------------------------------------------------------

View File

@ -3,14 +3,11 @@
--
-- tests UDFs created for citus tools
--
CREATE SCHEMA tools;
SET SEARCH_PATH TO 'tools';
SET citus.next_shard_id TO 1240000;
-- test with invalid port, prevent OS dependent warning from being displayed
SET client_min_messages to ERROR;
-- PG 9.5 does not show context for plpgsql raise
-- message whereas PG 9.6 shows. disabling it
-- for this test only to have consistent behavior
-- b/w PG 9.6+ and PG 9.5.
\set SHOW_CONTEXT never
SELECT * FROM master_run_on_worker(ARRAY['localhost']::text[], ARRAY['666']::int[],
ARRAY['select count(*) from pg_dist_shard']::text[],
false);
@ -429,6 +426,7 @@ SELECT create_distributed_table('second_table', 'key', 'hash');
SELECT * FROM run_command_on_colocated_placements('check_colocated', 'second_table',
'select 1');
ERROR: tables check_colocated and second_table are not co-located
CONTEXT: PL/pgSQL function run_command_on_colocated_placements(regclass,regclass,text,boolean) line XX at RAISE
-- even when the difference is in replication factor, an error is thrown
DROP TABLE second_table;
SET citus.shard_replication_factor TO 1;
@ -443,6 +441,7 @@ SELECT create_distributed_table('second_table', 'key', 'hash');
SELECT * FROM run_command_on_colocated_placements('check_colocated', 'second_table',
'select 1');
ERROR: tables check_colocated and second_table are not co-located
CONTEXT: PL/pgSQL function run_command_on_colocated_placements(regclass,regclass,text,boolean) line XX at RAISE
-- when everything matches, the command is run!
DROP TABLE second_table;
SET citus.shard_replication_factor TO 2;
@ -494,6 +493,7 @@ SELECT * FROM run_command_on_shards('check_shards', 'select 1');
UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid % 2 = 0;
SELECT * FROM run_command_on_shards('check_shards', 'select 1');
NOTICE: some shards do not have active placements
CONTEXT: PL/pgSQL function run_command_on_shards(regclass,text,boolean) line XX at RAISE
shardid | success | result
---------------------------------------------------------------------
1240025 | t | 1
@ -501,5 +501,136 @@ NOTICE: some shards do not have active placements
(2 rows)
DROP TABLE check_shards CASCADE;
-- test the connections to worker nodes
SELECT bool_and(success) AS all_nodes_are_successful FROM (
SELECT citus_check_connection_to_node(nodename, nodeport) AS success
FROM pg_dist_node
WHERE isactive = 't' AND noderole='primary'
) subquery;
all_nodes_are_successful
---------------------------------------------------------------------
t
(1 row)
-- verify that the coordinator can connect to itself
SELECT citus_check_connection_to_node('localhost', :master_port);
citus_check_connection_to_node
---------------------------------------------------------------------
t
(1 row)
-- verify that the connections are not successful for wrong port
-- test with invalid port, prevent OS dependent warning from being displayed
SET client_min_messages TO ERROR;
SELECT citus_check_connection_to_node('localhost', nodeport:=1234);
citus_check_connection_to_node
---------------------------------------------------------------------
f
(1 row)
-- verify that the connections are not successful due to timeouts
SET citus.node_connection_timeout TO 10;
SELECT citus_check_connection_to_node('www.citusdata.com');
citus_check_connection_to_node
---------------------------------------------------------------------
f
(1 row)
RESET citus.node_connection_timeout;
SET client_min_messages TO DEBUG;
-- check the connections in a transaction block
BEGIN;
SELECT citus_check_connection_to_node(nodename, nodeport)
FROM pg_dist_node
WHERE isactive = 't' AND noderole='primary';
citus_check_connection_to_node
---------------------------------------------------------------------
t
t
(2 rows)
CREATE TABLE distributed(id int, data text);
SELECT create_distributed_table('distributed', 'id');
create_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT count(*) FROM distributed;
DEBUG: Router planner cannot handle multi-shard select queries
count
---------------------------------------------------------------------
0
(1 row)
ROLLBACK;
-- create some roles for testing purposes
SET client_min_messages TO ERROR;
CREATE ROLE role_without_login WITH NOLOGIN;
SELECT 1 FROM run_command_on_workers($$CREATE ROLE role_without_login WITH NOLOGIN$$);
?column?
---------------------------------------------------------------------
1
1
(2 rows)
CREATE ROLE role_with_login WITH LOGIN;
SELECT 1 FROM run_command_on_workers($$CREATE ROLE role_with_login WITH LOGIN$$);
?column?
---------------------------------------------------------------------
1
1
(2 rows)
SET client_min_messages TO DEBUG;
-- verify that we can create connections only with users with login privileges.
SET ROLE role_without_login;
SELECT citus_check_connection_to_node('localhost', :worker_1_port);
WARNING: connection to the remote node localhost:xxxxx failed with the following error: FATAL: role "role_without_login" is not permitted to log in
citus_check_connection_to_node
---------------------------------------------------------------------
f
(1 row)
SET ROLE role_with_login;
SELECT citus_check_connection_to_node('localhost', :worker_1_port);
citus_check_connection_to_node
---------------------------------------------------------------------
t
(1 row)
RESET role;
DROP ROLE role_with_login, role_without_login;
SELECT 1 FROM run_command_on_workers($$DROP ROLE role_with_login, role_without_login$$);
?column?
---------------------------------------------------------------------
1
1
(2 rows)
-- check connections from a worker node
\c - - - :worker_1_port
SELECT citus_check_connection_to_node('localhost', :master_port);
citus_check_connection_to_node
---------------------------------------------------------------------
t
(1 row)
SELECT citus_check_connection_to_node('localhost', :worker_1_port);
citus_check_connection_to_node
---------------------------------------------------------------------
t
(1 row)
SELECT citus_check_connection_to_node('localhost', :worker_2_port);
citus_check_connection_to_node
---------------------------------------------------------------------
t
(1 row)
\c - - - :master_port
RESET client_min_messages;
DROP SCHEMA tools CASCADE;
RESET SEARCH_PATH;
-- set SHOW_CONTEXT back to default
\set SHOW_CONTEXT errors

View File

@ -966,14 +966,15 @@ SELECT * FROM multi_extension.print_extension_changes();
-- Snapshot of state at 11.0-1
ALTER EXTENSION citus UPDATE TO '11.0-1';
SELECT * FROM multi_extension.print_extension_changes();
previous_object | current_object
previous_object | current_object
---------------------------------------------------------------------
function citus_disable_node(text,integer) void |
function master_append_table_to_shard(bigint,text,text,integer) real |
function master_apply_delete_command(text) integer |
function master_get_table_metadata(text) record |
| function citus_check_connection_to_node(text,integer) boolean
| function citus_disable_node(text,integer,boolean) void
(5 rows)
(6 rows)
DROP TABLE multi_extension.prev_objects, multi_extension.extension_diff;
-- show running version

View File

@ -38,6 +38,7 @@ ORDER BY 1;
function citus_add_rebalance_strategy(name,regproc,regproc,regproc,real,real,real)
function citus_add_secondary_node(text,integer,text,integer,name)
function citus_blocking_pids(integer)
function citus_check_connection_to_node(text,integer)
function citus_cleanup_orphaned_shards()
function citus_conninfo_cache_invalidate()
function citus_copy_shard_placement(bigint,text,integer,text,integer,boolean,citus.shard_transfer_mode)
@ -259,5 +260,5 @@ ORDER BY 1;
view citus_worker_stat_activity
view pg_dist_shard_placement
view time_partitions
(243 rows)
(244 rows)

View File

@ -129,7 +129,38 @@ RESET client_min_messages;
-- verify get_global_active_transactions works when a timeout happens on a connection
SELECT get_global_active_transactions();
-- tests for connectivity checks
SET client_min_messages TO ERROR;
-- kill the connection after authentication is ok
SELECT citus.mitmproxy('conn.onAuthenticationOk().kill()');
SELECT * FROM citus_check_connection_to_node('localhost', :worker_2_proxy_port);
-- cancel the connection after authentication is ok
SELECT citus.mitmproxy('conn.onAuthenticationOk().cancel(' || pg_backend_pid() || ')');
SELECT * FROM citus_check_connection_to_node('localhost', :worker_2_proxy_port);
-- kill the connection after connectivity check query is sent
SELECT citus.mitmproxy('conn.onQuery(query="^SELECT 1$").kill()');
SELECT * FROM citus_check_connection_to_node('localhost', :worker_2_proxy_port);
-- cancel the connection after connectivity check query is sent
SELECT citus.mitmproxy('conn.onQuery(query="^SELECT 1$").cancel(' || pg_backend_pid() || ')');
SELECT * FROM citus_check_connection_to_node('localhost', :worker_2_proxy_port);
-- kill the connection after connectivity check command is complete
SELECT citus.mitmproxy('conn.onCommandComplete(command="SELECT 1").kill()');
SELECT * FROM citus_check_connection_to_node('localhost', :worker_2_proxy_port);
-- cancel the connection after connectivity check command is complete
SELECT citus.mitmproxy('conn.onCommandComplete(command="SELECT 1").cancel(' || pg_backend_pid() || ')');
SELECT * FROM citus_check_connection_to_node('localhost', :worker_2_proxy_port);
-- verify that the checks are not successful when timeouts happen on a connection
SELECT citus.mitmproxy('conn.delay(500)');
SELECT * FROM citus_check_connection_to_node('localhost', :worker_2_proxy_port);
RESET client_min_messages;
SELECT citus.mitmproxy('conn.allow()');
SET citus.node_connection_timeout TO DEFAULT;
DROP SCHEMA fail_connect CASCADE;

View File

@ -4,15 +4,12 @@
-- tests UDFs created for citus tools
--
CREATE SCHEMA tools;
SET SEARCH_PATH TO 'tools';
SET citus.next_shard_id TO 1240000;
-- test with invalid port, prevent OS dependent warning from being displayed
SET client_min_messages to ERROR;
-- PG 9.5 does not show context for plpgsql raise
-- message whereas PG 9.6 shows. disabling it
-- for this test only to have consistent behavior
-- b/w PG 9.6+ and PG 9.5.
\set SHOW_CONTEXT never
SELECT * FROM master_run_on_worker(ARRAY['localhost']::text[], ARRAY['666']::int[],
ARRAY['select count(*) from pg_dist_shard']::text[],
@ -257,5 +254,73 @@ UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid % 2 = 0;
SELECT * FROM run_command_on_shards('check_shards', 'select 1');
DROP TABLE check_shards CASCADE;
-- test the connections to worker nodes
SELECT bool_and(success) AS all_nodes_are_successful FROM (
SELECT citus_check_connection_to_node(nodename, nodeport) AS success
FROM pg_dist_node
WHERE isactive = 't' AND noderole='primary'
) subquery;
-- verify that the coordinator can connect to itself
SELECT citus_check_connection_to_node('localhost', :master_port);
-- verify that the connections are not successful for wrong port
-- test with invalid port, prevent OS dependent warning from being displayed
SET client_min_messages TO ERROR;
SELECT citus_check_connection_to_node('localhost', nodeport:=1234);
-- verify that the connections are not successful due to timeouts
SET citus.node_connection_timeout TO 10;
SELECT citus_check_connection_to_node('www.citusdata.com');
RESET citus.node_connection_timeout;
SET client_min_messages TO DEBUG;
-- check the connections in a transaction block
BEGIN;
SELECT citus_check_connection_to_node(nodename, nodeport)
FROM pg_dist_node
WHERE isactive = 't' AND noderole='primary';
CREATE TABLE distributed(id int, data text);
SELECT create_distributed_table('distributed', 'id');
SELECT count(*) FROM distributed;
ROLLBACK;
-- create some roles for testing purposes
SET client_min_messages TO ERROR;
CREATE ROLE role_without_login WITH NOLOGIN;
SELECT 1 FROM run_command_on_workers($$CREATE ROLE role_without_login WITH NOLOGIN$$);
CREATE ROLE role_with_login WITH LOGIN;
SELECT 1 FROM run_command_on_workers($$CREATE ROLE role_with_login WITH LOGIN$$);
SET client_min_messages TO DEBUG;
-- verify that we can create connections only with users with login privileges.
SET ROLE role_without_login;
SELECT citus_check_connection_to_node('localhost', :worker_1_port);
SET ROLE role_with_login;
SELECT citus_check_connection_to_node('localhost', :worker_1_port);
RESET role;
DROP ROLE role_with_login, role_without_login;
SELECT 1 FROM run_command_on_workers($$DROP ROLE role_with_login, role_without_login$$);
-- check connections from a worker node
\c - - - :worker_1_port
SELECT citus_check_connection_to_node('localhost', :master_port);
SELECT citus_check_connection_to_node('localhost', :worker_1_port);
SELECT citus_check_connection_to_node('localhost', :worker_2_port);
\c - - - :master_port
RESET client_min_messages;
DROP SCHEMA tools CASCADE;
RESET SEARCH_PATH;
-- set SHOW_CONTEXT back to default
\set SHOW_CONTEXT errors