From 29e45166427bab00f5301f3665f55818290b6dbb Mon Sep 17 00:00:00 2001 From: Hanefi Onaldi Date: Thu, 2 Dec 2021 01:41:54 +0300 Subject: [PATCH] Introduce citus_check_cluster_node_health UDF This UDF coordinates connectivity checks across the whole cluster. This UDF gets the list of active readable nodes in the cluster and coordinates all connectivity checks in sequential order. The algorithm is: for sourceNode in activeReadableWorkerList: c = connectToNode(sourceNode) for targetNode in activeReadableWorkerList: result = c.execute( "SELECT citus_check_connection_to_node(targetNode.name, targetNode.port)") emit sourceNode.name, sourceNode.port, targetNode.name, targetNode.port, result - result -> true -> connection attempt from source to target succeeded - result -> false -> connection attempt from source to target failed - result -> NULL -> connection attempt from the current node to source node failed I suggest you use the following query to get an overview of the connectivity: SELECT bool_and(COALESCE(result, false)) FROM citus_check_cluster_node_health(); Whenever this query returns false, there is a connectivity issue; check the full output to see which node pairs fail. --- .../distributed/operations/citus_tools.c | 140 ++++++++++++++++++ .../distributed/sql/citus--10.2-4--11.0-1.sql | 2 + .../sql/downgrades/citus--11.0-1--10.2-4.sql | 2 + .../11.0-1.sql | 13 ++ .../latest.sql | 13 ++ src/test/regress/create_schedule | 1 + .../regress/expected/connectivity_checks.out | 6 + .../failure_connection_establishment.out | 115 ++++++++++++++ .../regress/expected/multi_citus_tools.out | 39 +++++ src/test/regress/expected/multi_extension.out | 3 +- .../multi_follower_select_statements.out | 10 ++ .../expected/multi_mx_add_coordinator.out | 35 ++++- .../expected/upgrade_list_citus_objects.out | 3 +- src/test/regress/sql/connectivity_checks.sql | 1 + .../sql/failure_connection_establishment.sql | 39 +++++ src/test/regress/sql/multi_citus_tools.sql | 14 ++ .../sql/multi_follower_select_statements.sql | 3 + .../regress/sql/multi_mx_add_coordinator.sql | 25 +++- src/test/regress/sql_schedule | 1 + 19 files changed, 455 insertions(+), 10 deletions(-) create mode 100644 src/backend/distributed/sql/udfs/citus_check_cluster_node_health/11.0-1.sql create mode 100644 src/backend/distributed/sql/udfs/citus_check_cluster_node_health/latest.sql create mode 100644 src/test/regress/expected/connectivity_checks.out create mode 100644 src/test/regress/sql/connectivity_checks.sql diff --git a/src/backend/distributed/operations/citus_tools.c b/src/backend/distributed/operations/citus_tools.c index 19d198c0c..564a51412 100644 --- a/src/backend/distributed/operations/citus_tools.c +++ b/src/backend/distributed/operations/citus_tools.c @@ -20,6 +20,9 @@ #include "distributed/multi_client_executor.h" #include "distributed/multi_server_executor.h" #include "distributed/remote_commands.h" +#include "distributed/listutils.h" +#include "distributed/lock_graph.h" +#include "distributed/tuplestore.h" #include "distributed/version_compat.h" #include "distributed/worker_protocol.h" #include "funcapi.h" @@ -30,8 +33,10 @@ /* simple query to run on workers to check connectivity */ #define CONNECTIVITY_CHECK_QUERY "SELECT 1" +#define CONNECTIVITY_CHECK_COLUMNS 5 PG_FUNCTION_INFO_V1(citus_check_connection_to_node); +PG_FUNCTION_INFO_V1(citus_check_cluster_node_health); PG_FUNCTION_INFO_V1(master_run_on_worker); static bool CheckConnectionToNode(char *nodeName, uint32 nodePort); @@ -61,6 +66,9 @@ static Tuplestorestate * CreateTupleStore(TupleDesc tupleDescriptor,
StringInfo *nodeNameArray, int *nodePortArray, bool *statusArray, StringInfo *resultArray, int commandCount); +static void StoreAllConnectivityChecks(Tuplestorestate *tupleStore, + TupleDesc tupleDescriptor); +static char * GetConnectivityCheckCommand(const char *nodeName, const uint32 nodePort); /* @@ -93,6 +101,138 @@ CheckConnectionToNode(char *nodeName, uint32 nodePort) } +/* + * citus_check_cluster_node_health UDF performs connectivity checks from all the nodes to + * all the nodes, and reports the success status + */ +Datum +citus_check_cluster_node_health(PG_FUNCTION_ARGS) +{ + CheckCitusVersion(ERROR); + + TupleDesc tupleDescriptor = NULL; + Tuplestorestate *tupleStore = SetupTuplestore(fcinfo, &tupleDescriptor); + + StoreAllConnectivityChecks(tupleStore, tupleDescriptor); + + PG_RETURN_VOID(); +} + + +/* + * GetConnectivityCheckCommand returns the command to check connections to a node + */ +static char * +GetConnectivityCheckCommand(const char *nodeName, const uint32 nodePort) +{ + StringInfo connectivityCheckCommand = makeStringInfo(); + appendStringInfo(connectivityCheckCommand, + "SELECT citus_check_connection_to_node('%s', %d)", + nodeName, nodePort); + + return connectivityCheckCommand->data; +} + + +/* + * StoreAllConnectivityChecks performs connectivity checks from all the nodes to all the + * nodes, and reports the success status. + * + * The algorithm is: + * for sourceNode in activeReadableNodeList: + * c = connectToNode(sourceNode) + * for targetNode in activeReadableNodeList: + * result = c.execute("SELECT citus_check_connection_to_node(targetNode.name, targetNode.port)") + * emit sourceNode.name, sourceNode.port, targetNode.name, targetNode.port, result + * + * -- result -> true -> connection attempt from source to target succeeded + * -- result -> false -> connection attempt from source to target failed + * -- result -> NULL -> connection attempt from the current node to source node failed + */ +static void +StoreAllConnectivityChecks(Tuplestorestate *tupleStore, TupleDesc tupleDescriptor) +{ + Datum values[CONNECTIVITY_CHECK_COLUMNS]; + bool isNulls[CONNECTIVITY_CHECK_COLUMNS]; + + /* + * Get the list of all readable nodes so that we check connectivity to followers in + * the cluster as well. + */ + List *workerNodeList = ActiveReadableNodeList(); + + /* we want to check for connectivity in a deterministic order */ + workerNodeList = SortList(workerNodeList, CompareWorkerNodes); + + /* + * We iterate over the workerNodeList twice, for source and target worker nodes. This + * operation is safe with the foreach_ptr macro, as long as we use different variables + * for each iteration. + */ + WorkerNode *sourceWorkerNode = NULL; + foreach_ptr(sourceWorkerNode, workerNodeList) + { + const char *sourceNodeName = sourceWorkerNode->workerName; + const int sourceNodePort = sourceWorkerNode->workerPort; + int32 connectionFlags = 0; + + /* open a connection to the source node using the synchronous API */ + MultiConnection *connectionToSourceNode = + GetNodeConnection(connectionFlags, sourceNodeName, sourceNodePort); + + /* the second iteration over workerNodeList is for the target worker nodes.
*/ + WorkerNode *targetWorkerNode = NULL; + foreach_ptr(targetWorkerNode, workerNodeList) + { + const char *targetNodeName = targetWorkerNode->workerName; + const int targetNodePort = targetWorkerNode->workerPort; + + char *connectivityCheckCommandToTargetNode = + GetConnectivityCheckCommand(targetNodeName, targetNodePort); + + PGresult *result = NULL; + int executionResult = + ExecuteOptionalRemoteCommand(connectionToSourceNode, + connectivityCheckCommandToTargetNode, + &result); + + /* get ready for the next tuple */ + memset(values, 0, sizeof(values)); + memset(isNulls, false, sizeof(isNulls)); + + values[0] = PointerGetDatum(cstring_to_text(sourceNodeName)); + values[1] = Int32GetDatum(sourceNodePort); + values[2] = PointerGetDatum(cstring_to_text(targetNodeName)); + values[3] = Int32GetDatum(targetNodePort); + + /* + * If we could not send the query or the result was not ok, set the success field + * to NULL. This may indicate connection errors to a worker node; however, that + * node can potentially connect to other nodes. + * + * Therefore, we mark the success as NULL to indicate that the connectivity + * status is unknown. + */ + if (executionResult != RESPONSE_OKAY) + { + isNulls[4] = true; + } + else + { + int rowIndex = 0; + int columnIndex = 0; + values[4] = BoolGetDatum(ParseBoolField(result, rowIndex, columnIndex)); + } + + tuplestore_putvalues(tupleStore, tupleDescriptor, values, isNulls); + + PQclear(result); + ForgetResults(connectionToSourceNode); + } + } +} + + /* * master_run_on_worker executes queries/commands to run on specified worker and * returns success status and query/command result. Expected input is 3 arrays diff --git a/src/backend/distributed/sql/citus--10.2-4--11.0-1.sql b/src/backend/distributed/sql/citus--10.2-4--11.0-1.sql index 3b7814c63..c5f333152 100644 --- a/src/backend/distributed/sql/citus--10.2-4--11.0-1.sql +++ b/src/backend/distributed/sql/citus--10.2-4--11.0-1.sql @@ -4,6 +4,8 @@ #include "udfs/citus_disable_node/11.0-1.sql" #include "udfs/citus_check_connection_to_node/11.0-1.sql" +#include "udfs/citus_check_cluster_node_health/11.0-1.sql" + #include "udfs/citus_internal_add_object_metadata/11.0-1.sql" DROP FUNCTION IF EXISTS pg_catalog.master_apply_delete_command(text); diff --git a/src/backend/distributed/sql/downgrades/citus--11.0-1--10.2-4.sql b/src/backend/distributed/sql/downgrades/citus--11.0-1--10.2-4.sql index 0087ecf1c..e5a7cb2bd 100644 --- a/src/backend/distributed/sql/downgrades/citus--11.0-1--10.2-4.sql +++ b/src/backend/distributed/sql/downgrades/citus--11.0-1--10.2-4.sql @@ -41,4 +41,6 @@ COMMENT ON FUNCTION pg_catalog.citus_disable_node(nodename text, nodeport intege IS 'removes node from the cluster temporarily'; DROP FUNCTION pg_catalog.citus_check_connection_to_node (text, integer); +DROP FUNCTION pg_catalog.citus_check_cluster_node_health (); + DROP FUNCTION pg_catalog.citus_internal_add_object_metadata(text, text[], text[], integer, integer); diff --git a/src/backend/distributed/sql/udfs/citus_check_cluster_node_health/11.0-1.sql b/src/backend/distributed/sql/udfs/citus_check_cluster_node_health/11.0-1.sql new file mode 100644 index 000000000..378e510ed --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_check_cluster_node_health/11.0-1.sql @@ -0,0 +1,13 @@ +CREATE FUNCTION pg_catalog.citus_check_cluster_node_health ( + OUT from_nodename text, + OUT from_nodeport int, + OUT to_nodename text, + OUT to_nodeport int, + OUT result bool ) + RETURNS SETOF RECORD + LANGUAGE C + STRICT + AS 'MODULE_PATHNAME',
$$citus_check_cluster_node_health$$; + +COMMENT ON FUNCTION pg_catalog.citus_check_cluster_node_health () + IS 'checks connections between all nodes in the cluster'; diff --git a/src/backend/distributed/sql/udfs/citus_check_cluster_node_health/latest.sql b/src/backend/distributed/sql/udfs/citus_check_cluster_node_health/latest.sql new file mode 100644 index 000000000..378e510ed --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_check_cluster_node_health/latest.sql @@ -0,0 +1,13 @@ +CREATE FUNCTION pg_catalog.citus_check_cluster_node_health ( + OUT from_nodename text, + OUT from_nodeport int, + OUT to_nodename text, + OUT to_nodeport int, + OUT result bool ) + RETURNS SETOF RECORD + LANGUAGE C + STRICT + AS 'MODULE_PATHNAME', $$citus_check_cluster_node_health$$; + +COMMENT ON FUNCTION pg_catalog.citus_check_cluster_node_health () + IS 'checks connections between all nodes in the cluster'; diff --git a/src/test/regress/create_schedule b/src/test/regress/create_schedule index 50a299d7e..17fc6559b 100644 --- a/src/test/regress/create_schedule +++ b/src/test/regress/create_schedule @@ -3,3 +3,4 @@ test: prepared_statements_create_load ch_benchmarks_create_load test: dropped_columns_create_load distributed_planning_create_load test: local_dist_join_load test: partitioned_indexes_create +test: connectivity_checks diff --git a/src/test/regress/expected/connectivity_checks.out b/src/test/regress/expected/connectivity_checks.out new file mode 100644 index 000000000..9fbc61482 --- /dev/null +++ b/src/test/regress/expected/connectivity_checks.out @@ -0,0 +1,6 @@ +SELECT bool_and(coalesce(result, false)) FROM citus_check_cluster_node_health(); + bool_and +--------------------------------------------------------------------- + t +(1 row) + diff --git a/src/test/regress/expected/failure_connection_establishment.out b/src/test/regress/expected/failure_connection_establishment.out index 199fb1d50..9a60f8302 100644 --- a/src/test/regress/expected/failure_connection_establishment.out +++ b/src/test/regress/expected/failure_connection_establishment.out @@ -319,6 +319,121 @@ SELECT * FROM citus_check_connection_to_node('localhost', :worker_2_proxy_port); f (1 row) +-- tests for citus_check_cluster_node_health +-- kill all connectivity checks that originate from this node +SELECT citus.mitmproxy('conn.onQuery(query="^SELECT citus_check_connection_to_node").kill()'); + mitmproxy +--------------------------------------------------------------------- + +(1 row) + +SELECT * FROM citus_check_cluster_node_health(); + from_nodename | from_nodeport | to_nodename | to_nodeport | result +--------------------------------------------------------------------- + localhost | 9060 | localhost | 9060 | + localhost | 9060 | localhost | 57637 | + localhost | 57637 | localhost | 9060 | t + localhost | 57637 | localhost | 57637 | t +(4 rows) + +-- suggested summary queries for connectivity checks +SELECT bool_and(coalesce(result, false)) FROM citus_check_cluster_node_health(); + bool_and +--------------------------------------------------------------------- + f +(1 row) + +SELECT result, count(*) FROM citus_check_cluster_node_health() GROUP BY result ORDER BY 1; + result | count +--------------------------------------------------------------------- + t | 2 + | 2 +(2 rows) + +-- cancel all connectivity checks that originate from this node +SELECT citus.mitmproxy('conn.onQuery(query="^SELECT citus_check_connection_to_node").cancel(' || pg_backend_pid() || ')'); + mitmproxy 
+--------------------------------------------------------------------- + +(1 row) + +SELECT * FROM citus_check_cluster_node_health(); +ERROR: canceling statement due to user request +-- kill all but first connectivity checks that originate from this node +SELECT citus.mitmproxy('conn.onQuery(query="^SELECT citus_check_connection_to_node").after(1).kill()'); + mitmproxy +--------------------------------------------------------------------- + +(1 row) + +SELECT * FROM citus_check_cluster_node_health(); + from_nodename | from_nodeport | to_nodename | to_nodeport | result +--------------------------------------------------------------------- + localhost | 9060 | localhost | 9060 | t + localhost | 9060 | localhost | 57637 | + localhost | 57637 | localhost | 9060 | t + localhost | 57637 | localhost | 57637 | t +(4 rows) + +-- cancel all but first connectivity checks that originate from this node +SELECT citus.mitmproxy('conn.onQuery(query="^SELECT citus_check_connection_to_node").after(1).cancel(' || pg_backend_pid() || ')'); + mitmproxy +--------------------------------------------------------------------- + +(1 row) + +SELECT * FROM citus_check_cluster_node_health(); +ERROR: canceling statement due to user request +-- kill all connections to this node +SELECT citus.mitmproxy('conn.onAuthenticationOk().kill()'); + mitmproxy +--------------------------------------------------------------------- + +(1 row) + +SELECT * FROM citus_check_cluster_node_health(); + from_nodename | from_nodeport | to_nodename | to_nodeport | result +--------------------------------------------------------------------- + localhost | 9060 | localhost | 9060 | + localhost | 9060 | localhost | 57637 | + localhost | 57637 | localhost | 9060 | f + localhost | 57637 | localhost | 57637 | t +(4 rows) + +-- cancel all connections to this node +SELECT citus.mitmproxy('conn.onAuthenticationOk().cancel(' || pg_backend_pid() || ')'); + mitmproxy +--------------------------------------------------------------------- + +(1 row) + +SELECT * FROM citus_check_cluster_node_health(); +ERROR: canceling statement due to user request +-- kill connection checks to this node +SELECT citus.mitmproxy('conn.onQuery(query="^SELECT 1$").kill()'); + mitmproxy +--------------------------------------------------------------------- + +(1 row) + +SELECT * FROM citus_check_cluster_node_health(); + from_nodename | from_nodeport | to_nodename | to_nodeport | result +--------------------------------------------------------------------- + localhost | 9060 | localhost | 9060 | f + localhost | 9060 | localhost | 57637 | t + localhost | 57637 | localhost | 9060 | f + localhost | 57637 | localhost | 57637 | t +(4 rows) + +-- cancel connection checks to this node +SELECT citus.mitmproxy('conn.onQuery(query="^SELECT 1$").cancel(' || pg_backend_pid() || ')'); + mitmproxy +--------------------------------------------------------------------- + +(1 row) + +SELECT * FROM citus_check_cluster_node_health(); +ERROR: canceling statement due to user request RESET client_min_messages; SELECT citus.mitmproxy('conn.allow()'); mitmproxy diff --git a/src/test/regress/expected/multi_citus_tools.out b/src/test/regress/expected/multi_citus_tools.out index 7c091a0ba..792839d87 100644 --- a/src/test/regress/expected/multi_citus_tools.out +++ b/src/test/regress/expected/multi_citus_tools.out @@ -629,6 +629,45 @@ SELECT citus_check_connection_to_node('localhost', :worker_2_port); (1 row) \c - - - :master_port +SELECT * FROM citus_check_cluster_node_health() ORDER BY 1,2,3,4; + 
from_nodename | from_nodeport | to_nodename | to_nodeport | result +--------------------------------------------------------------------- + localhost | 57637 | localhost | 57637 | t + localhost | 57637 | localhost | 57638 | t + localhost | 57638 | localhost | 57637 | t + localhost | 57638 | localhost | 57638 | t +(4 rows) + +-- test cluster connectivity when we have broken nodes +SET client_min_messages TO ERROR; +SET citus.node_connection_timeout TO 10; +BEGIN; +INSERT INTO pg_dist_node VALUES + (123456789, 123456789, 'localhost', 123456789), + (1234567890, 1234567890, 'www.citusdata.com', 5432); +SELECT * FROM citus_check_cluster_node_health() ORDER BY 5,1,2,3,4; + from_nodename | from_nodeport | to_nodename | to_nodeport | result +--------------------------------------------------------------------- + localhost | 57637 | localhost | 123456789 | f + localhost | 57637 | www.citusdata.com | 5432 | f + localhost | 57638 | localhost | 123456789 | f + localhost | 57638 | www.citusdata.com | 5432 | f + localhost | 57637 | localhost | 57637 | t + localhost | 57637 | localhost | 57638 | t + localhost | 57638 | localhost | 57637 | t + localhost | 57638 | localhost | 57638 | t + localhost | 123456789 | localhost | 57637 | + localhost | 123456789 | localhost | 57638 | + localhost | 123456789 | localhost | 123456789 | + localhost | 123456789 | www.citusdata.com | 5432 | + www.citusdata.com | 5432 | localhost | 57637 | + www.citusdata.com | 5432 | localhost | 57638 | + www.citusdata.com | 5432 | localhost | 123456789 | + www.citusdata.com | 5432 | www.citusdata.com | 5432 | +(16 rows) + +ROLLBACK; +RESET citus.node_connection_timeout; RESET client_min_messages; DROP SCHEMA tools CASCADE; RESET SEARCH_PATH; diff --git a/src/test/regress/expected/multi_extension.out b/src/test/regress/expected/multi_extension.out index af21ebb7f..fe48f175d 100644 --- a/src/test/regress/expected/multi_extension.out +++ b/src/test/regress/expected/multi_extension.out @@ -973,10 +973,11 @@ SELECT * FROM multi_extension.print_extension_changes(); function master_append_table_to_shard(bigint,text,text,integer) real | function master_apply_delete_command(text) integer | function master_get_table_metadata(text) record | + | function citus_check_cluster_node_health() SETOF record | function citus_check_connection_to_node(text,integer) boolean | function citus_disable_node(text,integer,boolean) void | function citus_internal_add_object_metadata(text,text[],text[],integer,integer) void -(7 rows) +(8 rows) DROP TABLE multi_extension.prev_objects, multi_extension.extension_diff; -- show running version diff --git a/src/test/regress/expected/multi_follower_select_statements.out b/src/test/regress/expected/multi_follower_select_statements.out index 13e09a6ca..9f92db197 100644 --- a/src/test/regress/expected/multi_follower_select_statements.out +++ b/src/test/regress/expected/multi_follower_select_statements.out @@ -159,6 +159,16 @@ SELECT * FROM the_table; 1 | 2 (2 rows) +-- Check for connectivity in the cluster +SELECT * FROM citus_check_cluster_node_health(); + from_nodename | from_nodeport | to_nodename | to_nodeport | result +--------------------------------------------------------------------- + localhost | 9071 | localhost | 9071 | t + localhost | 9071 | localhost | 9072 | t + localhost | 9072 | localhost | 9071 | t + localhost | 9072 | localhost | 9072 | t +(4 rows) + -- clean up after ourselves \c -reuse-previous=off regression - - :master_port DROP TABLE the_table; diff --git 
a/src/test/regress/expected/multi_mx_add_coordinator.out b/src/test/regress/expected/multi_mx_add_coordinator.out index 88ca84b9c..f08fc78b9 100644 --- a/src/test/regress/expected/multi_mx_add_coordinator.out +++ b/src/test/regress/expected/multi_mx_add_coordinator.out @@ -1,9 +1,5 @@ CREATE SCHEMA mx_add_coordinator; SET search_path TO mx_add_coordinator,public; -SET citus.shard_replication_factor TO 1; -SET citus.shard_count TO 8; -SET citus.next_shard_id TO 7000000; -SET citus.next_placement_id TO 7000000; SET client_min_messages TO WARNING; CREATE USER reprefuser WITH LOGIN; SELECT run_command_on_workers('CREATE USER reprefuser WITH LOGIN'); @@ -16,12 +12,43 @@ SELECT run_command_on_workers('CREATE USER reprefuser WITH LOGIN'); SET citus.enable_alter_role_propagation TO ON; -- alter role for other than the extension owner works in enterprise, output differs accordingly ALTER ROLE reprefuser WITH CREATEDB; +-- check connectivity in the cluster +-- verify that we test for 4 node pairs before we add coordinator to metadata +SELECT bool_and(coalesce(result, false)), count(*) FROM citus_check_cluster_node_health(); + bool_and | count +--------------------------------------------------------------------- + t | 4 +(1 row) + SELECT 1 FROM master_add_node('localhost', :master_port, groupId => 0); ?column? --------------------------------------------------------------------- 1 (1 row) +-- verify that we test for 9 node pairs after we add coordinator to metadata +SELECT bool_and(coalesce(result, false)), count(*) FROM citus_check_cluster_node_health(); + bool_and | count +--------------------------------------------------------------------- + t | 9 +(1 row) + +-- test that we can test for connectivity when connected to worker nodes as well +\c - - - :worker_1_port +SELECT bool_and(coalesce(result, false)), count(*) FROM citus_check_cluster_node_health(); + bool_and | count +--------------------------------------------------------------------- + t | 9 +(1 row) + +\c - - - :master_port +-- set the configs after reconnecting to coordinator +SET search_path TO mx_add_coordinator,public; +SET citus.shard_replication_factor TO 1; +SET citus.shard_count TO 8; +SET citus.next_shard_id TO 7000000; +SET citus.next_placement_id TO 7000000; +SET client_min_messages TO WARNING; -- test that coordinator pg_dist_node entry is synced to the workers SELECT wait_until_metadata_sync(30000); wait_until_metadata_sync diff --git a/src/test/regress/expected/upgrade_list_citus_objects.out b/src/test/regress/expected/upgrade_list_citus_objects.out index 53992aa93..9ffef1f70 100644 --- a/src/test/regress/expected/upgrade_list_citus_objects.out +++ b/src/test/regress/expected/upgrade_list_citus_objects.out @@ -38,6 +38,7 @@ ORDER BY 1; function citus_add_rebalance_strategy(name,regproc,regproc,regproc,real,real,real) function citus_add_secondary_node(text,integer,text,integer,name) function citus_blocking_pids(integer) + function citus_check_cluster_node_health() function citus_check_connection_to_node(text,integer) function citus_cleanup_orphaned_shards() function citus_conninfo_cache_invalidate() @@ -261,5 +262,5 @@ ORDER BY 1; view citus_worker_stat_activity view pg_dist_shard_placement view time_partitions -(245 rows) +(246 rows) diff --git a/src/test/regress/sql/connectivity_checks.sql b/src/test/regress/sql/connectivity_checks.sql new file mode 100644 index 000000000..070cf8024 --- /dev/null +++ b/src/test/regress/sql/connectivity_checks.sql @@ -0,0 +1 @@ +SELECT bool_and(coalesce(result, false)) FROM 
citus_check_cluster_node_health(); diff --git a/src/test/regress/sql/failure_connection_establishment.sql b/src/test/regress/sql/failure_connection_establishment.sql index e76296cc9..a1c2c1e23 100644 --- a/src/test/regress/sql/failure_connection_establishment.sql +++ b/src/test/regress/sql/failure_connection_establishment.sql @@ -161,6 +161,45 @@ SELECT * FROM citus_check_connection_to_node('localhost', :worker_2_proxy_port); SELECT citus.mitmproxy('conn.delay(500)'); SELECT * FROM citus_check_connection_to_node('localhost', :worker_2_proxy_port); +-- tests for citus_check_cluster_node_health + +-- kill all connectivity checks that originate from this node +SELECT citus.mitmproxy('conn.onQuery(query="^SELECT citus_check_connection_to_node").kill()'); +SELECT * FROM citus_check_cluster_node_health(); + +-- suggested summary queries for connectivity checks +SELECT bool_and(coalesce(result, false)) FROM citus_check_cluster_node_health(); +SELECT result, count(*) FROM citus_check_cluster_node_health() GROUP BY result ORDER BY 1; + +-- cancel all connectivity checks that originate from this node +SELECT citus.mitmproxy('conn.onQuery(query="^SELECT citus_check_connection_to_node").cancel(' || pg_backend_pid() || ')'); +SELECT * FROM citus_check_cluster_node_health(); + +-- kill all but first connectivity checks that originate from this node +SELECT citus.mitmproxy('conn.onQuery(query="^SELECT citus_check_connection_to_node").after(1).kill()'); +SELECT * FROM citus_check_cluster_node_health(); + +-- cancel all but first connectivity checks that originate from this node +SELECT citus.mitmproxy('conn.onQuery(query="^SELECT citus_check_connection_to_node").after(1).cancel(' || pg_backend_pid() || ')'); +SELECT * FROM citus_check_cluster_node_health(); + +-- kill all connections to this node +SELECT citus.mitmproxy('conn.onAuthenticationOk().kill()'); +SELECT * FROM citus_check_cluster_node_health(); + +-- cancel all connections to this node +SELECT citus.mitmproxy('conn.onAuthenticationOk().cancel(' || pg_backend_pid() || ')'); +SELECT * FROM citus_check_cluster_node_health(); + +-- kill connection checks to this node +SELECT citus.mitmproxy('conn.onQuery(query="^SELECT 1$").kill()'); +SELECT * FROM citus_check_cluster_node_health(); + +-- cancel connection checks to this node +SELECT citus.mitmproxy('conn.onQuery(query="^SELECT 1$").cancel(' || pg_backend_pid() || ')'); +SELECT * FROM citus_check_cluster_node_health(); + + RESET client_min_messages; SELECT citus.mitmproxy('conn.allow()'); SET citus.node_connection_timeout TO DEFAULT; diff --git a/src/test/regress/sql/multi_citus_tools.sql b/src/test/regress/sql/multi_citus_tools.sql index 6db60c8d4..76a4a93b1 100644 --- a/src/test/regress/sql/multi_citus_tools.sql +++ b/src/test/regress/sql/multi_citus_tools.sql @@ -319,6 +319,20 @@ SELECT citus_check_connection_to_node('localhost', :worker_2_port); \c - - - :master_port +SELECT * FROM citus_check_cluster_node_health() ORDER BY 1,2,3,4; + +-- test cluster connectivity when we have broken nodes +SET client_min_messages TO ERROR; +SET citus.node_connection_timeout TO 10; + +BEGIN; +INSERT INTO pg_dist_node VALUES + (123456789, 123456789, 'localhost', 123456789), + (1234567890, 1234567890, 'www.citusdata.com', 5432); +SELECT * FROM citus_check_cluster_node_health() ORDER BY 5,1,2,3,4; +ROLLBACK; + +RESET citus.node_connection_timeout; RESET client_min_messages; DROP SCHEMA tools CASCADE; RESET SEARCH_PATH; diff --git a/src/test/regress/sql/multi_follower_select_statements.sql 
b/src/test/regress/sql/multi_follower_select_statements.sql index f0e7bd404..edaccc869 100644 --- a/src/test/regress/sql/multi_follower_select_statements.sql +++ b/src/test/regress/sql/multi_follower_select_statements.sql @@ -111,6 +111,9 @@ UPDATE pg_dist_node SET nodecluster = 'second-cluster' WHERE noderole = 'seconda \c "port=9070 dbname=regression options='-c\ citus.use_secondary_nodes=always\ -c\ citus.cluster_name=second-cluster'" SELECT * FROM the_table; +-- Check for connectivity in the cluster +SELECT * FROM citus_check_cluster_node_health(); + -- clean up after ourselves \c -reuse-previous=off regression - - :master_port DROP TABLE the_table; diff --git a/src/test/regress/sql/multi_mx_add_coordinator.sql b/src/test/regress/sql/multi_mx_add_coordinator.sql index 86e17b3db..c318e78aa 100644 --- a/src/test/regress/sql/multi_mx_add_coordinator.sql +++ b/src/test/regress/sql/multi_mx_add_coordinator.sql @@ -1,9 +1,5 @@ CREATE SCHEMA mx_add_coordinator; SET search_path TO mx_add_coordinator,public; -SET citus.shard_replication_factor TO 1; -SET citus.shard_count TO 8; -SET citus.next_shard_id TO 7000000; -SET citus.next_placement_id TO 7000000; SET client_min_messages TO WARNING; CREATE USER reprefuser WITH LOGIN; @@ -12,8 +8,29 @@ SET citus.enable_alter_role_propagation TO ON; -- alter role for other than the extension owner works in enterprise, output differs accordingly ALTER ROLE reprefuser WITH CREATEDB; +-- check connectivity in the cluster +-- verify that we test for 4 node pairs before we add coordinator to metadata +SELECT bool_and(coalesce(result, false)), count(*) FROM citus_check_cluster_node_health(); + SELECT 1 FROM master_add_node('localhost', :master_port, groupId => 0); +-- verify that we test for 9 node pairs after we add coordinator to metadata +SELECT bool_and(coalesce(result, false)), count(*) FROM citus_check_cluster_node_health(); + +-- test that we can test for connectivity when connected to worker nodes as well +\c - - - :worker_1_port +SELECT bool_and(coalesce(result, false)), count(*) FROM citus_check_cluster_node_health(); + +\c - - - :master_port + +-- set the configs after reconnecting to coordinator +SET search_path TO mx_add_coordinator,public; +SET citus.shard_replication_factor TO 1; +SET citus.shard_count TO 8; +SET citus.next_shard_id TO 7000000; +SET citus.next_placement_id TO 7000000; +SET client_min_messages TO WARNING; + -- test that coordinator pg_dist_node entry is synced to the workers SELECT wait_until_metadata_sync(30000); diff --git a/src/test/regress/sql_schedule b/src/test/regress/sql_schedule index 40395bcbf..a8073825a 100644 --- a/src/test/regress/sql_schedule +++ b/src/test/regress/sql_schedule @@ -5,3 +5,4 @@ test: ch_benchmarks_4 ch_benchmarks_5 ch_benchmarks_6 test: intermediate_result_pruning_queries_1 intermediate_result_pruning_queries_2 test: dropped_columns_1 distributed_planning test: local_dist_join +test: connectivity_checks
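As a usage sketch to complement the patch above, relying only on the output columns the new UDF defines (from_nodename, from_nodeport, to_nodename, to_nodeport, result): the first three queries below appear in the regression tests of this patch, while the last one is an illustrative addition for narrowing down failures, not taken from the patch itself.

-- full connectivity matrix, one row per (source node, target node) pair
SELECT * FROM citus_check_cluster_node_health() ORDER BY 1, 2, 3, 4;

-- single overall health flag; false as soon as any check fails or is unknown (NULL)
SELECT bool_and(coalesce(result, false)) FROM citus_check_cluster_node_health();

-- distribution of check outcomes (true / false / NULL)
SELECT result, count(*) FROM citus_check_cluster_node_health() GROUP BY result ORDER BY 1;

-- illustrative: list only the node pairs whose checks failed or are unknown
SELECT from_nodename, from_nodeport, to_nodename, to_nodeport, result
FROM citus_check_cluster_node_health()
WHERE result IS DISTINCT FROM true;

Following the semantics described in the commit message, a NULL result in the last query means the check could not even be issued from the source node, while false means the source node was reachable but its connection attempt to the target failed.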