diff --git a/src/backend/distributed/operations/citus_tools.c b/src/backend/distributed/operations/citus_tools.c
index 19d198c0c..564a51412 100644
--- a/src/backend/distributed/operations/citus_tools.c
+++ b/src/backend/distributed/operations/citus_tools.c
@@ -20,6 +20,9 @@
 #include "distributed/multi_client_executor.h"
 #include "distributed/multi_server_executor.h"
 #include "distributed/remote_commands.h"
+#include "distributed/listutils.h"
+#include "distributed/lock_graph.h"
+#include "distributed/tuplestore.h"
 #include "distributed/version_compat.h"
 #include "distributed/worker_protocol.h"
 #include "funcapi.h"
@@ -30,8 +33,10 @@

 /* simple query to run on workers to check connectivity */
 #define CONNECTIVITY_CHECK_QUERY "SELECT 1"
+#define CONNECTIVITY_CHECK_COLUMNS 5

 PG_FUNCTION_INFO_V1(citus_check_connection_to_node);
+PG_FUNCTION_INFO_V1(citus_check_cluster_node_health);
 PG_FUNCTION_INFO_V1(master_run_on_worker);

 static bool CheckConnectionToNode(char *nodeName, uint32 nodePort);
@@ -61,6 +66,9 @@ static Tuplestorestate * CreateTupleStore(TupleDesc tupleDescriptor,
                                           StringInfo *nodeNameArray, int *nodePortArray,
                                           bool *statusArray, StringInfo *resultArray,
                                           int commandCount);
+static void StoreAllConnectivityChecks(Tuplestorestate *tupleStore,
+                                       TupleDesc tupleDescriptor);
+static char * GetConnectivityCheckCommand(const char *nodeName, const uint32 nodePort);


 /*
@@ -93,6 +101,138 @@ CheckConnectionToNode(char *nodeName, uint32 nodePort)
 }


+/*
+ * citus_check_cluster_node_health UDF performs connectivity checks from all the nodes to
+ * all the nodes, and reports the success status.
+ */
+Datum
+citus_check_cluster_node_health(PG_FUNCTION_ARGS)
+{
+	CheckCitusVersion(ERROR);
+
+	TupleDesc tupleDescriptor = NULL;
+	Tuplestorestate *tupleStore = SetupTuplestore(fcinfo, &tupleDescriptor);
+
+	StoreAllConnectivityChecks(tupleStore, tupleDescriptor);
+
+	PG_RETURN_VOID();
+}
+
+
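Note: the UDF above emits one row per ordered (source, target) node pair through the tuplestore it sets up. A minimal usage sketch (the column names come from the SQL definition later in this diff; the query itself is not part of the change):

    -- sketch: list node pairs whose connectivity is broken or unknown
    SELECT from_nodename, from_nodeport, to_nodename, to_nodeport, result
    FROM citus_check_cluster_node_health()
    WHERE result IS DISTINCT FROM true;
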
+/*
+ * GetConnectivityCheckCommand returns the command to check connections to a node.
+ */
+static char *
+GetConnectivityCheckCommand(const char *nodeName, const uint32 nodePort)
+{
+	StringInfo connectivityCheckCommand = makeStringInfo();
+	appendStringInfo(connectivityCheckCommand,
+					 "SELECT citus_check_connection_to_node('%s', %d)",
+					 nodeName, nodePort);
+
+	return connectivityCheckCommand->data;
+}
+
+
+/*
+ * StoreAllConnectivityChecks performs connectivity checks from all the nodes to all the
+ * nodes, and reports the success status.
+ *
+ * The algorithm is:
+ *   for sourceNode in activeReadableNodeList:
+ *       c = connectToNode(sourceNode)
+ *       for targetNode in activeReadableNodeList:
+ *           result = c.execute("SELECT citus_check_connection_to_node(targetNode.name, targetNode.port)")
+ *           emit sourceNode.name, sourceNode.port, targetNode.name, targetNode.port, result
+ *
+ * -- result -> true  -> connection attempt from source to target succeeded
+ * -- result -> false -> connection attempt from source to target failed
+ * -- result -> NULL  -> connection attempt from the current node to the source node failed
+ */
+static void
+StoreAllConnectivityChecks(Tuplestorestate *tupleStore, TupleDesc tupleDescriptor)
+{
+	Datum values[CONNECTIVITY_CHECK_COLUMNS];
+	bool isNulls[CONNECTIVITY_CHECK_COLUMNS];
+
+	/*
+	 * Get the list of all readable nodes so that we also check connectivity to the
+	 * followers in the cluster.
+	 */
+	List *workerNodeList = ActiveReadableNodeList();
+
+	/* we want to check for connectivity in a deterministic order */
+	workerNodeList = SortList(workerNodeList, CompareWorkerNodes);
+
+	/*
+	 * We iterate over the workerNodeList twice, for the source and target worker nodes.
+	 * This is safe for the foreach_ptr macro, as long as we use different variables for
+	 * each iteration.
+	 */
+	WorkerNode *sourceWorkerNode = NULL;
+	foreach_ptr(sourceWorkerNode, workerNodeList)
+	{
+		const char *sourceNodeName = sourceWorkerNode->workerName;
+		const int sourceNodePort = sourceWorkerNode->workerPort;
+		int32 connectionFlags = 0;
+
+		/* open a connection to the source node using the synchronous api */
+		MultiConnection *connectionToSourceNode =
+			GetNodeConnection(connectionFlags, sourceNodeName, sourceNodePort);
+
+		/* the second iteration over workerNodeList for the target worker nodes */
+		WorkerNode *targetWorkerNode = NULL;
+		foreach_ptr(targetWorkerNode, workerNodeList)
+		{
+			const char *targetNodeName = targetWorkerNode->workerName;
+			const int targetNodePort = targetWorkerNode->workerPort;
+
+			char *connectivityCheckCommandToTargetNode =
+				GetConnectivityCheckCommand(targetNodeName, targetNodePort);
+
+			PGresult *result = NULL;
+			int executionResult =
+				ExecuteOptionalRemoteCommand(connectionToSourceNode,
+											 connectivityCheckCommandToTargetNode,
+											 &result);
+
+			/* get ready for the next tuple */
+			memset(values, 0, sizeof(values));
+			memset(isNulls, false, sizeof(isNulls));
+
+			values[0] = PointerGetDatum(cstring_to_text(sourceNodeName));
+			values[1] = Int32GetDatum(sourceNodePort);
+			values[2] = PointerGetDatum(cstring_to_text(targetNodeName));
+			values[3] = Int32GetDatum(targetNodePort);
+
+			/*
+			 * If we could not send the query or the result was not ok, set the success
+			 * field to NULL. This may indicate connection errors to a worker node, but
+			 * that node can potentially still connect to other nodes.
+			 *
+			 * Therefore, we mark the success as NULL to indicate that the connectivity
+			 * status is unknown.
+			 */
+			if (executionResult != RESPONSE_OKAY)
+			{
+				isNulls[4] = true;
+			}
+			else
+			{
+				int rowIndex = 0;
+				int columnIndex = 0;
+				values[4] = BoolGetDatum(ParseBoolField(result, rowIndex, columnIndex));
+			}
+
+			tuplestore_putvalues(tupleStore, tupleDescriptor, values, isNulls);
+
+			PQclear(result);
+			ForgetResults(connectionToSourceNode);
+		}
+	}
+}
+
+
 /*
  * master_run_on_worker executes queries/commands to run on specified worker and
  * returns success status and query/command result. Expected input is 3 arrays
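The tri-state result documented above (true/false/NULL) can be folded into a single cluster-wide health flag by treating "unknown" the same as "failed"; the regression tests below use exactly this coalesce-based pattern:

    -- true only if every check succeeded and every source node was reachable
    SELECT bool_and(coalesce(result, false)) AS all_nodes_healthy
    FROM citus_check_cluster_node_health();
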
diff --git a/src/backend/distributed/sql/citus--10.2-4--11.0-1.sql b/src/backend/distributed/sql/citus--10.2-4--11.0-1.sql
index 3b7814c63..c5f333152 100644
--- a/src/backend/distributed/sql/citus--10.2-4--11.0-1.sql
+++ b/src/backend/distributed/sql/citus--10.2-4--11.0-1.sql
@@ -4,6 +4,8 @@
 #include "udfs/citus_disable_node/11.0-1.sql"
 #include "udfs/citus_check_connection_to_node/11.0-1.sql"

+#include "udfs/citus_check_cluster_node_health/11.0-1.sql"
+
 #include "udfs/citus_internal_add_object_metadata/11.0-1.sql"

 DROP FUNCTION IF EXISTS pg_catalog.master_apply_delete_command(text);
diff --git a/src/backend/distributed/sql/downgrades/citus--11.0-1--10.2-4.sql b/src/backend/distributed/sql/downgrades/citus--11.0-1--10.2-4.sql
index 0087ecf1c..e5a7cb2bd 100644
--- a/src/backend/distributed/sql/downgrades/citus--11.0-1--10.2-4.sql
+++ b/src/backend/distributed/sql/downgrades/citus--11.0-1--10.2-4.sql
@@ -41,4 +41,6 @@ COMMENT ON FUNCTION pg_catalog.citus_disable_node(nodename text, nodeport intege
     IS 'removes node from the cluster temporarily';

 DROP FUNCTION pg_catalog.citus_check_connection_to_node (text, integer);
+DROP FUNCTION pg_catalog.citus_check_cluster_node_health ();
+
 DROP FUNCTION pg_catalog.citus_internal_add_object_metadata(text, text[], text[], integer, integer);
diff --git a/src/backend/distributed/sql/udfs/citus_check_cluster_node_health/11.0-1.sql b/src/backend/distributed/sql/udfs/citus_check_cluster_node_health/11.0-1.sql
new file mode 100644
index 000000000..378e510ed
--- /dev/null
+++ b/src/backend/distributed/sql/udfs/citus_check_cluster_node_health/11.0-1.sql
@@ -0,0 +1,13 @@
+CREATE FUNCTION pg_catalog.citus_check_cluster_node_health (
+    OUT from_nodename text,
+    OUT from_nodeport int,
+    OUT to_nodename text,
+    OUT to_nodeport int,
+    OUT result bool )
+    RETURNS SETOF RECORD
+    LANGUAGE C
+    STRICT
+    AS 'MODULE_PATHNAME', $$citus_check_cluster_node_health$$;
+
+COMMENT ON FUNCTION pg_catalog.citus_check_cluster_node_health ()
+    IS 'checks connections between all nodes in the cluster';
diff --git a/src/backend/distributed/sql/udfs/citus_check_cluster_node_health/latest.sql b/src/backend/distributed/sql/udfs/citus_check_cluster_node_health/latest.sql
new file mode 100644
index 000000000..378e510ed
--- /dev/null
+++ b/src/backend/distributed/sql/udfs/citus_check_cluster_node_health/latest.sql
@@ -0,0 +1,13 @@
+CREATE FUNCTION pg_catalog.citus_check_cluster_node_health (
+    OUT from_nodename text,
+    OUT from_nodeport int,
+    OUT to_nodename text,
+    OUT to_nodeport int,
+    OUT result bool )
+    RETURNS SETOF RECORD
+    LANGUAGE C
+    STRICT
+    AS 'MODULE_PATHNAME', $$citus_check_cluster_node_health$$;
+
+COMMENT ON FUNCTION pg_catalog.citus_check_cluster_node_health ()
+    IS 'checks connections between all nodes in the cluster';
diff --git a/src/test/regress/create_schedule b/src/test/regress/create_schedule
index 50a299d7e..17fc6559b 100644
--- a/src/test/regress/create_schedule
+++ b/src/test/regress/create_schedule
@@ -3,3 +3,4 @@ test: prepared_statements_create_load ch_benchmarks_create_load
 test: dropped_columns_create_load distributed_planning_create_load
 test: local_dist_join_load
 test: partitioned_indexes_create
+test: connectivity_checks
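After ALTER EXTENSION citus UPDATE runs the upgrade script above, the new UDF should be visible in the catalog; a hypothetical sanity check (not part of this diff):

    -- confirm the function and its OUT columns were installed
    SELECT proname, pg_get_function_result(oid)
    FROM pg_proc
    WHERE proname = 'citus_check_cluster_node_health';
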
diff --git a/src/test/regress/expected/connectivity_checks.out b/src/test/regress/expected/connectivity_checks.out
new file mode 100644
index 000000000..9fbc61482
--- /dev/null
+++ b/src/test/regress/expected/connectivity_checks.out
@@ -0,0 +1,6 @@
+SELECT bool_and(coalesce(result, false)) FROM citus_check_cluster_node_health();
+ bool_and
+---------------------------------------------------------------------
+ t
+(1 row)
+
diff --git a/src/test/regress/expected/failure_connection_establishment.out b/src/test/regress/expected/failure_connection_establishment.out
index 199fb1d50..9a60f8302 100644
--- a/src/test/regress/expected/failure_connection_establishment.out
+++ b/src/test/regress/expected/failure_connection_establishment.out
@@ -319,6 +319,121 @@ SELECT * FROM citus_check_connection_to_node('localhost', :worker_2_proxy_port);
  f
 (1 row)

+-- tests for citus_check_cluster_node_health
+-- kill all connectivity checks that originate from this node
+SELECT citus.mitmproxy('conn.onQuery(query="^SELECT citus_check_connection_to_node").kill()');
+ mitmproxy
+---------------------------------------------------------------------
+
+(1 row)
+
+SELECT * FROM citus_check_cluster_node_health();
+ from_nodename | from_nodeport | to_nodename | to_nodeport | result
+---------------------------------------------------------------------
+ localhost     |          9060 | localhost   |        9060 |
+ localhost     |          9060 | localhost   |       57637 |
+ localhost     |         57637 | localhost   |        9060 | t
+ localhost     |         57637 | localhost   |       57637 | t
+(4 rows)
+
+-- suggested summary queries for connectivity checks
+SELECT bool_and(coalesce(result, false)) FROM citus_check_cluster_node_health();
+ bool_and
+---------------------------------------------------------------------
+ f
+(1 row)
+
+SELECT result, count(*) FROM citus_check_cluster_node_health() GROUP BY result ORDER BY 1;
+ result | count
+---------------------------------------------------------------------
+ t      |     2
+        |     2
+(2 rows)
+
+-- cancel all connectivity checks that originate from this node
+SELECT citus.mitmproxy('conn.onQuery(query="^SELECT citus_check_connection_to_node").cancel(' || pg_backend_pid() || ')');
+ mitmproxy
+---------------------------------------------------------------------
+
+(1 row)
+
+SELECT * FROM citus_check_cluster_node_health();
+ERROR:  canceling statement due to user request
+-- kill all but the first connectivity check that originates from this node
+SELECT citus.mitmproxy('conn.onQuery(query="^SELECT citus_check_connection_to_node").after(1).kill()');
+ mitmproxy
+---------------------------------------------------------------------
+
+(1 row)
+
+SELECT * FROM citus_check_cluster_node_health();
+ from_nodename | from_nodeport | to_nodename | to_nodeport | result
+---------------------------------------------------------------------
+ localhost     |          9060 | localhost   |        9060 | t
+ localhost     |          9060 | localhost   |       57637 |
+ localhost     |         57637 | localhost   |        9060 | t
+ localhost     |         57637 | localhost   |       57637 | t
+(4 rows)
+
+-- cancel all but the first connectivity check that originates from this node
+SELECT citus.mitmproxy('conn.onQuery(query="^SELECT citus_check_connection_to_node").after(1).cancel(' || pg_backend_pid() || ')');
+ mitmproxy
+---------------------------------------------------------------------
+
+(1 row)
+
+SELECT * FROM citus_check_cluster_node_health();
+ERROR:  canceling statement due to user request
+-- kill all connections to this node
+SELECT citus.mitmproxy('conn.onAuthenticationOk().kill()');
+ mitmproxy
+---------------------------------------------------------------------
+
+(1 row)
+
+SELECT * FROM citus_check_cluster_node_health();
+ from_nodename | from_nodeport | to_nodename | to_nodeport | result
+---------------------------------------------------------------------
+ localhost     |          9060 | localhost   |        9060 |
+ localhost     |          9060 | localhost   |       57637 |
+ localhost     |         57637 | localhost   |        9060 | f
+ localhost     |         57637 | localhost   |       57637 | t
+(4 rows)
+
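On larger clusters the raw matrix above gets hard to scan; one possible way to label the tri-state result, assuming only the columns defined earlier in this diff:

    -- sketch: human-readable status per node pair
    SELECT from_nodename || ':' || from_nodeport AS source,
           to_nodename || ':' || to_nodeport AS target,
           CASE WHEN result THEN 'ok'
                WHEN NOT result THEN 'failed'
                ELSE 'unknown (source unreachable)' END AS status
    FROM citus_check_cluster_node_health();
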
+-- cancel all connections to this node
+SELECT citus.mitmproxy('conn.onAuthenticationOk().cancel(' || pg_backend_pid() || ')');
+ mitmproxy
+---------------------------------------------------------------------
+
+(1 row)
+
+SELECT * FROM citus_check_cluster_node_health();
+ERROR:  canceling statement due to user request
+-- kill connection checks to this node
+SELECT citus.mitmproxy('conn.onQuery(query="^SELECT 1$").kill()');
+ mitmproxy
+---------------------------------------------------------------------
+
+(1 row)
+
+SELECT * FROM citus_check_cluster_node_health();
+ from_nodename | from_nodeport | to_nodename | to_nodeport | result
+---------------------------------------------------------------------
+ localhost     |          9060 | localhost   |        9060 | f
+ localhost     |          9060 | localhost   |       57637 | t
+ localhost     |         57637 | localhost   |        9060 | f
+ localhost     |         57637 | localhost   |       57637 | t
+(4 rows)
+
+-- cancel connection checks to this node
+SELECT citus.mitmproxy('conn.onQuery(query="^SELECT 1$").cancel(' || pg_backend_pid() || ')');
+ mitmproxy
+---------------------------------------------------------------------
+
+(1 row)
+
+SELECT * FROM citus_check_cluster_node_health();
+ERROR:  canceling statement due to user request
 RESET client_min_messages;
 SELECT citus.mitmproxy('conn.allow()');
  mitmproxy
diff --git a/src/test/regress/expected/multi_citus_tools.out b/src/test/regress/expected/multi_citus_tools.out
index 7c091a0ba..792839d87 100644
--- a/src/test/regress/expected/multi_citus_tools.out
+++ b/src/test/regress/expected/multi_citus_tools.out
@@ -629,6 +629,45 @@ SELECT citus_check_connection_to_node('localhost', :worker_2_port);
 (1 row)

 \c - - - :master_port
+SELECT * FROM citus_check_cluster_node_health() ORDER BY 1,2,3,4;
+ from_nodename | from_nodeport | to_nodename | to_nodeport | result
+---------------------------------------------------------------------
+ localhost     |         57637 | localhost   |       57637 | t
+ localhost     |         57637 | localhost   |       57638 | t
+ localhost     |         57638 | localhost   |       57637 | t
+ localhost     |         57638 | localhost   |       57638 | t
+(4 rows)
+
+-- test cluster connectivity when we have broken nodes
+SET client_min_messages TO ERROR;
+SET citus.node_connection_timeout TO 10;
+BEGIN;
+INSERT INTO pg_dist_node VALUES
+  (123456789, 123456789, 'localhost', 123456789),
+  (1234567890, 1234567890, 'www.citusdata.com', 5432);
+SELECT * FROM citus_check_cluster_node_health() ORDER BY 5,1,2,3,4;
+ from_nodename     | from_nodeport | to_nodename       | to_nodeport | result
+---------------------------------------------------------------------
+ localhost         |         57637 | localhost         |   123456789 | f
+ localhost         |         57637 | www.citusdata.com |        5432 | f
+ localhost         |         57638 | localhost         |   123456789 | f
+ localhost         |         57638 | www.citusdata.com |        5432 | f
+ localhost         |         57637 | localhost         |       57637 | t
+ localhost         |         57637 | localhost         |       57638 | t
+ localhost         |         57638 | localhost         |       57637 | t
+ localhost         |         57638 | localhost         |       57638 | t
+ localhost         |     123456789 | localhost         |       57637 |
+ localhost         |     123456789 | localhost         |       57638 |
+ localhost         |     123456789 | localhost         |   123456789 |
+ localhost         |     123456789 | www.citusdata.com |        5432 |
+ www.citusdata.com |          5432 | localhost         |       57637 |
+ www.citusdata.com |          5432 | localhost         |       57638 |
+ www.citusdata.com |          5432 | localhost         |   123456789 |
+ www.citusdata.com |          5432 | www.citusdata.com |        5432 |
+(16 rows)
+
+ROLLBACK;
+RESET citus.node_connection_timeout;
 RESET client_min_messages;
 DROP SCHEMA tools CASCADE;
 RESET SEARCH_PATH;
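The test above registers two unreachable node entries inside a transaction and lowers citus.node_connection_timeout to 10 ms so the checks against them fail fast instead of hanging. The same pattern can be reused to probe a suspect node without permanently changing metadata; a sketch with a hypothetical hostname:

    -- sketch: temporarily register a node, probe it, then roll everything back
    BEGIN;
    SET LOCAL citus.node_connection_timeout TO 10;
    INSERT INTO pg_dist_node VALUES (987654321, 987654321, 'unreachable.example.com', 5432);
    SELECT * FROM citus_check_cluster_node_health() ORDER BY 5,1,2,3,4;
    ROLLBACK;
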
diff --git a/src/test/regress/expected/multi_extension.out b/src/test/regress/expected/multi_extension.out
index af21ebb7f..fe48f175d 100644
--- a/src/test/regress/expected/multi_extension.out
+++ b/src/test/regress/expected/multi_extension.out
@@ -973,10 +973,11 @@ SELECT * FROM multi_extension.print_extension_changes();
  function master_append_table_to_shard(bigint,text,text,integer) real |
  function master_apply_delete_command(text) integer |
  function master_get_table_metadata(text) record |
+ | function citus_check_cluster_node_health() SETOF record
  | function citus_check_connection_to_node(text,integer) boolean
  | function citus_disable_node(text,integer,boolean) void
  | function citus_internal_add_object_metadata(text,text[],text[],integer,integer) void
-(7 rows)
+(8 rows)

 DROP TABLE multi_extension.prev_objects, multi_extension.extension_diff;
 -- show running version
diff --git a/src/test/regress/expected/multi_follower_select_statements.out b/src/test/regress/expected/multi_follower_select_statements.out
index 13e09a6ca..9f92db197 100644
--- a/src/test/regress/expected/multi_follower_select_statements.out
+++ b/src/test/regress/expected/multi_follower_select_statements.out
@@ -159,6 +159,16 @@ SELECT * FROM the_table;
  1 | 2
 (2 rows)

+-- Check for connectivity in the cluster
+SELECT * FROM citus_check_cluster_node_health();
+ from_nodename | from_nodeport | to_nodename | to_nodeport | result
+---------------------------------------------------------------------
+ localhost     |          9071 | localhost   |        9071 | t
+ localhost     |          9071 | localhost   |        9072 | t
+ localhost     |          9072 | localhost   |        9071 | t
+ localhost     |          9072 | localhost   |        9072 | t
+(4 rows)
+
 -- clean up after ourselves
 \c -reuse-previous=off regression - - :master_port
 DROP TABLE the_table;
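In the follower test above, the checks run between the two secondary nodes (ports 9071 and 9072): StoreAllConnectivityChecks uses ActiveReadableNodeList, and when the cluster is accessed with citus.use_secondary_nodes=always the secondaries are the readable nodes. A hypothetical query to see which nodes would participate:

    -- sketch: nodes eligible for the connectivity matrix
    SELECT nodename, nodeport, noderole, nodecluster
    FROM pg_dist_node
    ORDER BY nodename, nodeport;
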
diff --git a/src/test/regress/expected/multi_mx_add_coordinator.out b/src/test/regress/expected/multi_mx_add_coordinator.out
index 88ca84b9c..f08fc78b9 100644
--- a/src/test/regress/expected/multi_mx_add_coordinator.out
+++ b/src/test/regress/expected/multi_mx_add_coordinator.out
@@ -1,9 +1,5 @@
 CREATE SCHEMA mx_add_coordinator;
 SET search_path TO mx_add_coordinator,public;
-SET citus.shard_replication_factor TO 1;
-SET citus.shard_count TO 8;
-SET citus.next_shard_id TO 7000000;
-SET citus.next_placement_id TO 7000000;
 SET client_min_messages TO WARNING;
 CREATE USER reprefuser WITH LOGIN;
 SELECT run_command_on_workers('CREATE USER reprefuser WITH LOGIN');
@@ -16,12 +12,43 @@ SELECT run_command_on_workers('CREATE USER reprefuser WITH LOGIN');
 SET citus.enable_alter_role_propagation TO ON;
 -- alter role for other than the extension owner works in enterprise, output differs accordingly
 ALTER ROLE reprefuser WITH CREATEDB;
+-- check connectivity in the cluster
+-- verify that we test for 4 node pairs before we add the coordinator to the metadata
+SELECT bool_and(coalesce(result, false)), count(*) FROM citus_check_cluster_node_health();
+ bool_and | count
+---------------------------------------------------------------------
+ t        |     4
+(1 row)
+
 SELECT 1 FROM master_add_node('localhost', :master_port, groupId => 0);
  ?column?
 ---------------------------------------------------------------------
         1
 (1 row)

+-- verify that we test for 9 node pairs after we add the coordinator to the metadata
+SELECT bool_and(coalesce(result, false)), count(*) FROM citus_check_cluster_node_health();
+ bool_and | count
+---------------------------------------------------------------------
+ t        |     9
+(1 row)
+
+-- test that we can check for connectivity when connected to worker nodes as well
+\c - - - :worker_1_port
+SELECT bool_and(coalesce(result, false)), count(*) FROM citus_check_cluster_node_health();
+ bool_and | count
+---------------------------------------------------------------------
+ t        |     9
+(1 row)
+
+\c - - - :master_port
+-- set the configs after reconnecting to the coordinator
+SET search_path TO mx_add_coordinator,public;
+SET citus.shard_replication_factor TO 1;
+SET citus.shard_count TO 8;
+SET citus.next_shard_id TO 7000000;
+SET citus.next_placement_id TO 7000000;
+SET client_min_messages TO WARNING;
 -- test that coordinator pg_dist_node entry is synced to the workers
 SELECT wait_until_metadata_sync(30000);
  wait_until_metadata_sync
diff --git a/src/test/regress/expected/upgrade_list_citus_objects.out b/src/test/regress/expected/upgrade_list_citus_objects.out
index 53992aa93..9ffef1f70 100644
--- a/src/test/regress/expected/upgrade_list_citus_objects.out
+++ b/src/test/regress/expected/upgrade_list_citus_objects.out
@@ -38,6 +38,7 @@ ORDER BY 1;
  function citus_add_rebalance_strategy(name,regproc,regproc,regproc,real,real,real)
  function citus_add_secondary_node(text,integer,text,integer,name)
  function citus_blocking_pids(integer)
+ function citus_check_cluster_node_health()
  function citus_check_connection_to_node(text,integer)
  function citus_cleanup_orphaned_shards()
  function citus_conninfo_cache_invalidate()
@@ -261,5 +262,5 @@ ORDER BY 1;
  view citus_worker_stat_activity
  view pg_dist_shard_placement
  view time_partitions
-(245 rows)
+(246 rows)

diff --git a/src/test/regress/sql/connectivity_checks.sql b/src/test/regress/sql/connectivity_checks.sql
new file mode 100644
index 000000000..070cf8024
--- /dev/null
+++ b/src/test/regress/sql/connectivity_checks.sql
@@ -0,0 +1 @@
+SELECT bool_and(coalesce(result, false)) FROM citus_check_cluster_node_health();
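Because every readable node checks every readable node, including itself, the row count grows quadratically: the tests above expect 4 pairs with 2 nodes and 9 pairs once the coordinator is added to the metadata. A hypothetical assertion of that invariant, assuming all entries in pg_dist_node are active and readable:

    -- sketch: the health matrix should have (number of nodes)^2 rows
    SELECT (SELECT count(*) FROM pg_dist_node) ^ 2 AS expected_pairs,
           (SELECT count(*) FROM citus_check_cluster_node_health()) AS actual_pairs;
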
diff --git a/src/test/regress/sql/failure_connection_establishment.sql b/src/test/regress/sql/failure_connection_establishment.sql
index e76296cc9..a1c2c1e23 100644
--- a/src/test/regress/sql/failure_connection_establishment.sql
+++ b/src/test/regress/sql/failure_connection_establishment.sql
@@ -161,6 +161,45 @@ SELECT * FROM citus_check_connection_to_node('localhost', :worker_2_proxy_port);
 SELECT citus.mitmproxy('conn.delay(500)');
 SELECT * FROM citus_check_connection_to_node('localhost', :worker_2_proxy_port);

+-- tests for citus_check_cluster_node_health
+
+-- kill all connectivity checks that originate from this node
+SELECT citus.mitmproxy('conn.onQuery(query="^SELECT citus_check_connection_to_node").kill()');
+SELECT * FROM citus_check_cluster_node_health();
+
+-- suggested summary queries for connectivity checks
+SELECT bool_and(coalesce(result, false)) FROM citus_check_cluster_node_health();
+SELECT result, count(*) FROM citus_check_cluster_node_health() GROUP BY result ORDER BY 1;
+
+-- cancel all connectivity checks that originate from this node
+SELECT citus.mitmproxy('conn.onQuery(query="^SELECT citus_check_connection_to_node").cancel(' || pg_backend_pid() || ')');
+SELECT * FROM citus_check_cluster_node_health();
+
+-- kill all but the first connectivity check that originates from this node
+SELECT citus.mitmproxy('conn.onQuery(query="^SELECT citus_check_connection_to_node").after(1).kill()');
+SELECT * FROM citus_check_cluster_node_health();
+
+-- cancel all but the first connectivity check that originates from this node
+SELECT citus.mitmproxy('conn.onQuery(query="^SELECT citus_check_connection_to_node").after(1).cancel(' || pg_backend_pid() || ')');
+SELECT * FROM citus_check_cluster_node_health();
+
+-- kill all connections to this node
+SELECT citus.mitmproxy('conn.onAuthenticationOk().kill()');
+SELECT * FROM citus_check_cluster_node_health();
+
+-- cancel all connections to this node
+SELECT citus.mitmproxy('conn.onAuthenticationOk().cancel(' || pg_backend_pid() || ')');
+SELECT * FROM citus_check_cluster_node_health();
+
+-- kill connection checks to this node
+SELECT citus.mitmproxy('conn.onQuery(query="^SELECT 1$").kill()');
+SELECT * FROM citus_check_cluster_node_health();
+
+-- cancel connection checks to this node
+SELECT citus.mitmproxy('conn.onQuery(query="^SELECT 1$").cancel(' || pg_backend_pid() || ')');
+SELECT * FROM citus_check_cluster_node_health();
+
+
 RESET client_min_messages;
 SELECT citus.mitmproxy('conn.allow()');
 SET citus.node_connection_timeout TO DEFAULT;
diff --git a/src/test/regress/sql/multi_citus_tools.sql b/src/test/regress/sql/multi_citus_tools.sql
index 6db60c8d4..76a4a93b1 100644
--- a/src/test/regress/sql/multi_citus_tools.sql
+++ b/src/test/regress/sql/multi_citus_tools.sql
@@ -319,6 +319,20 @@ SELECT citus_check_connection_to_node('localhost', :worker_2_port);

 \c - - - :master_port

+SELECT * FROM citus_check_cluster_node_health() ORDER BY 1,2,3,4;
+
+-- test cluster connectivity when we have broken nodes
+SET client_min_messages TO ERROR;
+SET citus.node_connection_timeout TO 10;
+
+BEGIN;
+INSERT INTO pg_dist_node VALUES
+  (123456789, 123456789, 'localhost', 123456789),
+  (1234567890, 1234567890, 'www.citusdata.com', 5432);
+SELECT * FROM citus_check_cluster_node_health() ORDER BY 5,1,2,3,4;
+ROLLBACK;
+
+RESET citus.node_connection_timeout;
 RESET client_min_messages;
 DROP SCHEMA tools CASCADE;
 RESET SEARCH_PATH;
diff --git a/src/test/regress/sql/multi_follower_select_statements.sql b/src/test/regress/sql/multi_follower_select_statements.sql
index f0e7bd404..edaccc869 100644
--- a/src/test/regress/sql/multi_follower_select_statements.sql
+++ b/src/test/regress/sql/multi_follower_select_statements.sql
@@ -111,6 +111,9 @@ UPDATE pg_dist_node SET nodecluster = 'second-cluster' WHERE noderole = 'seconda
 \c "port=9070 dbname=regression options='-c\ citus.use_secondary_nodes=always\ -c\ citus.cluster_name=second-cluster'"
 SELECT * FROM the_table;

+-- Check for connectivity in the cluster
+SELECT * FROM citus_check_cluster_node_health();
+
 -- clean up after ourselves
 \c -reuse-previous=off regression - - :master_port
 DROP TABLE the_table;
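The two mitmproxy patterns used in the failure tests above intercept different legs of a check: "^SELECT citus_check_connection_to_node" matches the per-pair command built by GetConnectivityCheckCommand, while "^SELECT 1$" matches CONNECTIVITY_CHECK_QUERY, the probe that citus_check_connection_to_node runs on the target. A hypothetical variant that fails only the checks aimed at one specific target:

    -- sketch: kill only the checks that target port 9060 (hypothetical pattern)
    SELECT citus.mitmproxy('conn.onQuery(query="citus_check_connection_to_node\(''localhost'', 9060\)").kill()');
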
diff --git a/src/test/regress/sql/multi_mx_add_coordinator.sql b/src/test/regress/sql/multi_mx_add_coordinator.sql
index 86e17b3db..c318e78aa 100644
--- a/src/test/regress/sql/multi_mx_add_coordinator.sql
+++ b/src/test/regress/sql/multi_mx_add_coordinator.sql
@@ -1,9 +1,5 @@
 CREATE SCHEMA mx_add_coordinator;
 SET search_path TO mx_add_coordinator,public;
-SET citus.shard_replication_factor TO 1;
-SET citus.shard_count TO 8;
-SET citus.next_shard_id TO 7000000;
-SET citus.next_placement_id TO 7000000;
 SET client_min_messages TO WARNING;

 CREATE USER reprefuser WITH LOGIN;
@@ -12,8 +8,29 @@ SET citus.enable_alter_role_propagation TO ON;
 -- alter role for other than the extension owner works in enterprise, output differs accordingly
 ALTER ROLE reprefuser WITH CREATEDB;

+-- check connectivity in the cluster
+-- verify that we test for 4 node pairs before we add the coordinator to the metadata
+SELECT bool_and(coalesce(result, false)), count(*) FROM citus_check_cluster_node_health();
+
 SELECT 1 FROM master_add_node('localhost', :master_port, groupId => 0);

+-- verify that we test for 9 node pairs after we add the coordinator to the metadata
+SELECT bool_and(coalesce(result, false)), count(*) FROM citus_check_cluster_node_health();
+
+-- test that we can check for connectivity when connected to worker nodes as well
+\c - - - :worker_1_port
+SELECT bool_and(coalesce(result, false)), count(*) FROM citus_check_cluster_node_health();
+
+\c - - - :master_port
+
+-- set the configs after reconnecting to the coordinator
+SET search_path TO mx_add_coordinator,public;
+SET citus.shard_replication_factor TO 1;
+SET citus.shard_count TO 8;
+SET citus.next_shard_id TO 7000000;
+SET citus.next_placement_id TO 7000000;
+SET client_min_messages TO WARNING;
+
 -- test that coordinator pg_dist_node entry is synced to the workers
 SELECT wait_until_metadata_sync(30000);
diff --git a/src/test/regress/sql_schedule b/src/test/regress/sql_schedule
index 40395bcbf..a8073825a 100644
--- a/src/test/regress/sql_schedule
+++ b/src/test/regress/sql_schedule
@@ -5,3 +5,4 @@ test: ch_benchmarks_4 ch_benchmarks_5 ch_benchmarks_6
 test: intermediate_result_pruning_queries_1 intermediate_result_pruning_queries_2
 test: dropped_columns_1 distributed_planning
 test: local_dist_join
+test: connectivity_checks
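Beyond the schedule entries above, the one-line check from connectivity_checks.sql also lends itself to ad-hoc monitoring; a hypothetical per-source summary:

    -- sketch: count healthy targets per source node
    SELECT from_nodename, from_nodeport,
           count(*) FILTER (WHERE result) AS healthy_targets,
           count(*) AS total_targets
    FROM citus_check_cluster_node_health()
    GROUP BY from_nodename, from_nodeport
    ORDER BY 1, 2;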