mirror of https://github.com/citusdata/citus.git
Introduce citus_check_cluster_node_health UDF
This UDF coordinates connectivity checks across the whole cluster. It gets the list of active readable nodes in the cluster and coordinates all connectivity checks in sequential order. The algorithm is:

    for sourceNode in activeReadableWorkerList:
        c = connectToNode(sourceNode)
        for targetNode in activeReadableWorkerList:
            result = c.execute(
                "SELECT citus_check_connection_to_node(targetNode.name, targetNode.port)")
            emit sourceNode.name, sourceNode.port, targetNode.name, targetNode.port, result

- result -> true  -> connection attempt from source to target succeeded
- result -> false -> connection attempt from source to target failed
- result -> NULL  -> connection attempt from the current node to source node failed

I suggest you use the following query to get an overview of the connectivity:

    SELECT bool_and(COALESCE(result, false)) FROM citus_check_cluster_node_health();

Whenever this query returns false, there is a connectivity issue; check the individual rows for details.
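For a per-pair view, a query along these lines (a suggestion, not part of this commit) lists exactly the node pairs whose status is failed or unknown:

    SELECT from_nodename, from_nodeport, to_nodename, to_nodeport, result
    FROM citus_check_cluster_node_health()
    WHERE result IS DISTINCT FROM true;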
parent 13fff9c37a
commit 29e4516642
@@ -20,6 +20,9 @@
#include "distributed/multi_client_executor.h"
#include "distributed/multi_server_executor.h"
#include "distributed/remote_commands.h"
#include "distributed/listutils.h"
#include "distributed/lock_graph.h"
#include "distributed/tuplestore.h"
#include "distributed/version_compat.h"
#include "distributed/worker_protocol.h"
#include "funcapi.h"

@@ -30,8 +33,10 @@

/* simple query to run on workers to check connectivity */
#define CONNECTIVITY_CHECK_QUERY "SELECT 1"
#define CONNECTIVITY_CHECK_COLUMNS 5

PG_FUNCTION_INFO_V1(citus_check_connection_to_node);
PG_FUNCTION_INFO_V1(citus_check_cluster_node_health);
PG_FUNCTION_INFO_V1(master_run_on_worker);

static bool CheckConnectionToNode(char *nodeName, uint32 nodePort);

@@ -61,6 +66,9 @@ static Tuplestorestate * CreateTupleStore(TupleDesc tupleDescriptor,
                                          StringInfo *nodeNameArray, int *nodePortArray,
                                          bool *statusArray,
                                          StringInfo *resultArray, int commandCount);
static void StoreAllConnectivityChecks(Tuplestorestate *tupleStore,
                                       TupleDesc tupleDescriptor);
static char * GetConnectivityCheckCommand(const char *nodeName, const uint32 nodePort);


/*

@@ -93,6 +101,138 @@ CheckConnectionToNode(char *nodeName, uint32 nodePort)
}


/*
 * citus_check_cluster_node_health UDF performs connectivity checks from all the nodes to
 * all the nodes, and reports the success status.
 */
Datum
citus_check_cluster_node_health(PG_FUNCTION_ARGS)
{
	CheckCitusVersion(ERROR);

	TupleDesc tupleDescriptor = NULL;
	Tuplestorestate *tupleStore = SetupTuplestore(fcinfo, &tupleDescriptor);

	StoreAllConnectivityChecks(tupleStore, tupleDescriptor);

	PG_RETURN_VOID();
}


/*
 * GetConnectivityCheckCommand returns the command to check connections to a node
 */
static char *
GetConnectivityCheckCommand(const char *nodeName, const uint32 nodePort)
{
	StringInfo connectivityCheckCommand = makeStringInfo();
	appendStringInfo(connectivityCheckCommand,
					 "SELECT citus_check_connection_to_node('%s', %d)",
					 nodeName, nodePort);

	return connectivityCheckCommand->data;
}


/*
 * StoreAllConnectivityChecks performs connectivity checks from all the nodes to all the
 * nodes, and reports the success status.
 *
 * The algorithm is:
 * for sourceNode in activeReadableNodeList:
 *     c = connectToNode(sourceNode)
 *     for targetNode in activeReadableNodeList:
 *         result = c.execute("SELECT citus_check_connection_to_node(targetNode.name, targetNode.port)")
 *         emit sourceNode.name, sourceNode.port, targetNode.name, targetNode.port, result
 *
 * -- result -> true  -> connection attempt from source to target succeeded
 * -- result -> false -> connection attempt from source to target failed
 * -- result -> NULL  -> connection attempt from the current node to source node failed
 */
static void
StoreAllConnectivityChecks(Tuplestorestate *tupleStore, TupleDesc tupleDescriptor)
{
	Datum values[CONNECTIVITY_CHECK_COLUMNS];
	bool isNulls[CONNECTIVITY_CHECK_COLUMNS];

	/*
	 * Get all the readable node list so that we will check connectivity to followers in
	 * the cluster as well.
	 */
	List *workerNodeList = ActiveReadableNodeList();

	/* we want to check for connectivity in a deterministic order */
	workerNodeList = SortList(workerNodeList, CompareWorkerNodes);

	/*
	 * We iterate over the workerNodeList twice, for source and target worker nodes. This
	 * operation is safe for the foreach_ptr macro, as long as we use different variables
	 * for each iteration.
	 */
	WorkerNode *sourceWorkerNode = NULL;
	foreach_ptr(sourceWorkerNode, workerNodeList)
	{
		const char *sourceNodeName = sourceWorkerNode->workerName;
		const int sourceNodePort = sourceWorkerNode->workerPort;
		int32 connectionFlags = 0;

		/* open a connection to the source node using the synchronous api */
		MultiConnection *connectionToSourceNode =
			GetNodeConnection(connectionFlags, sourceNodeName, sourceNodePort);

		/* the second iteration over workerNodeList for the target worker nodes */
		WorkerNode *targetWorkerNode = NULL;
		foreach_ptr(targetWorkerNode, workerNodeList)
		{
			const char *targetNodeName = targetWorkerNode->workerName;
			const int targetNodePort = targetWorkerNode->workerPort;

			char *connectivityCheckCommandToTargetNode =
				GetConnectivityCheckCommand(targetNodeName, targetNodePort);

			PGresult *result = NULL;
			int executionResult =
				ExecuteOptionalRemoteCommand(connectionToSourceNode,
											 connectivityCheckCommandToTargetNode,
											 &result);

			/* get ready for the next tuple */
			memset(values, 0, sizeof(values));
			memset(isNulls, false, sizeof(isNulls));

			values[0] = PointerGetDatum(cstring_to_text(sourceNodeName));
			values[1] = Int32GetDatum(sourceNodePort);
			values[2] = PointerGetDatum(cstring_to_text(targetNodeName));
			values[3] = Int32GetDatum(targetNodePort);

			/*
			 * If we could not send the query or the result was not ok, set the success
			 * field to NULL. This may indicate connection errors to a worker node,
			 * however that node can potentially connect to other nodes.
			 *
			 * Therefore, we mark the success as NULL to indicate that the connectivity
			 * status is unknown.
			 */
			if (executionResult != RESPONSE_OKAY)
			{
				isNulls[4] = true;
			}
			else
			{
				int rowIndex = 0;
				int columnIndex = 0;
				values[4] = BoolGetDatum(ParseBoolField(result, rowIndex, columnIndex));
			}

			tuplestore_putvalues(tupleStore, tupleDescriptor, values, isNulls);

			PQclear(result);
			ForgetResults(connectionToSourceNode);
		}
	}
}


/*
 * master_run_on_worker executes queries/commands to run on specified worker and
 * returns success status and query/command result. Expected input is 3 arrays
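GetConnectivityCheckCommand above only formats the per-target probe that is executed over the source node's connection. For a target at localhost:5432 (illustrative values), the generated statement is:

    SELECT citus_check_connection_to_node('localhost', 5432);

which, per the CONNECTIVITY_CHECK_QUERY define above, issues a simple SELECT 1 against the target and returns a boolean.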
@@ -4,6 +4,8 @@
#include "udfs/citus_disable_node/11.0-1.sql"

#include "udfs/citus_check_connection_to_node/11.0-1.sql"
#include "udfs/citus_check_cluster_node_health/11.0-1.sql"

#include "udfs/citus_internal_add_object_metadata/11.0-1.sql"

DROP FUNCTION IF EXISTS pg_catalog.master_apply_delete_command(text);

@@ -41,4 +41,6 @@ COMMENT ON FUNCTION pg_catalog.citus_disable_node(nodename text, nodeport integer)
    IS 'removes node from the cluster temporarily';

DROP FUNCTION pg_catalog.citus_check_connection_to_node (text, integer);
DROP FUNCTION pg_catalog.citus_check_cluster_node_health ();

DROP FUNCTION pg_catalog.citus_internal_add_object_metadata(text, text[], text[], integer, integer);
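These #include'd fragments are assembled into the versioned extension scripts, so the new UDFs arrive through a normal extension upgrade; a sketch of the corresponding call, with the version string inferred from the script names:

    ALTER EXTENSION citus UPDATE TO '11.0-1';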
src/backend/distributed/sql/udfs/citus_check_cluster_node_health/11.0-1.sql (generated, new file, 13 lines)
@@ -0,0 +1,13 @@
CREATE FUNCTION pg_catalog.citus_check_cluster_node_health (
    OUT from_nodename text,
    OUT from_nodeport int,
    OUT to_nodename text,
    OUT to_nodeport int,
    OUT result bool )
    RETURNS SETOF RECORD
    LANGUAGE C
    STRICT
    AS 'MODULE_PATHNAME', $$citus_check_cluster_node_health$$;

COMMENT ON FUNCTION pg_catalog.citus_check_cluster_node_health ()
    IS 'checks connections between all nodes in the cluster';
@@ -3,3 +3,4 @@ test: prepared_statements_create_load ch_benchmarks_create_load
test: dropped_columns_create_load distributed_planning_create_load
test: local_dist_join_load
test: partitioned_indexes_create
test: connectivity_checks
@@ -0,0 +1,6 @@
SELECT bool_and(coalesce(result, false)) FROM citus_check_cluster_node_health();
 bool_and
---------------------------------------------------------------------
 t
(1 row)

@@ -319,6 +319,121 @@ SELECT * FROM citus_check_connection_to_node('localhost', :worker_2_proxy_port);
 f
(1 row)

-- tests for citus_check_cluster_node_health
-- kill all connectivity checks that originate from this node
SELECT citus.mitmproxy('conn.onQuery(query="^SELECT citus_check_connection_to_node").kill()');
 mitmproxy
---------------------------------------------------------------------

(1 row)

SELECT * FROM citus_check_cluster_node_health();
 from_nodename | from_nodeport | to_nodename | to_nodeport | result
---------------------------------------------------------------------
 localhost | 9060 | localhost | 9060 |
 localhost | 9060 | localhost | 57637 |
 localhost | 57637 | localhost | 9060 | t
 localhost | 57637 | localhost | 57637 | t
(4 rows)

-- suggested summary queries for connectivity checks
SELECT bool_and(coalesce(result, false)) FROM citus_check_cluster_node_health();
 bool_and
---------------------------------------------------------------------
 f
(1 row)

SELECT result, count(*) FROM citus_check_cluster_node_health() GROUP BY result ORDER BY 1;
 result | count
---------------------------------------------------------------------
 t | 2
   | 2
(2 rows)

-- cancel all connectivity checks that originate from this node
SELECT citus.mitmproxy('conn.onQuery(query="^SELECT citus_check_connection_to_node").cancel(' || pg_backend_pid() || ')');
 mitmproxy
---------------------------------------------------------------------

(1 row)

SELECT * FROM citus_check_cluster_node_health();
ERROR:  canceling statement due to user request
-- kill all but first connectivity checks that originate from this node
SELECT citus.mitmproxy('conn.onQuery(query="^SELECT citus_check_connection_to_node").after(1).kill()');
 mitmproxy
---------------------------------------------------------------------

(1 row)

SELECT * FROM citus_check_cluster_node_health();
 from_nodename | from_nodeport | to_nodename | to_nodeport | result
---------------------------------------------------------------------
 localhost | 9060 | localhost | 9060 | t
 localhost | 9060 | localhost | 57637 |
 localhost | 57637 | localhost | 9060 | t
 localhost | 57637 | localhost | 57637 | t
(4 rows)

-- cancel all but first connectivity checks that originate from this node
SELECT citus.mitmproxy('conn.onQuery(query="^SELECT citus_check_connection_to_node").after(1).cancel(' || pg_backend_pid() || ')');
 mitmproxy
---------------------------------------------------------------------

(1 row)

SELECT * FROM citus_check_cluster_node_health();
ERROR:  canceling statement due to user request
-- kill all connections to this node
SELECT citus.mitmproxy('conn.onAuthenticationOk().kill()');
 mitmproxy
---------------------------------------------------------------------

(1 row)

SELECT * FROM citus_check_cluster_node_health();
 from_nodename | from_nodeport | to_nodename | to_nodeport | result
---------------------------------------------------------------------
 localhost | 9060 | localhost | 9060 |
 localhost | 9060 | localhost | 57637 |
 localhost | 57637 | localhost | 9060 | f
 localhost | 57637 | localhost | 57637 | t
(4 rows)

-- cancel all connections to this node
SELECT citus.mitmproxy('conn.onAuthenticationOk().cancel(' || pg_backend_pid() || ')');
 mitmproxy
---------------------------------------------------------------------

(1 row)

SELECT * FROM citus_check_cluster_node_health();
ERROR:  canceling statement due to user request
-- kill connection checks to this node
SELECT citus.mitmproxy('conn.onQuery(query="^SELECT 1$").kill()');
 mitmproxy
---------------------------------------------------------------------

(1 row)

SELECT * FROM citus_check_cluster_node_health();
 from_nodename | from_nodeport | to_nodename | to_nodeport | result
---------------------------------------------------------------------
 localhost | 9060 | localhost | 9060 | f
 localhost | 9060 | localhost | 57637 | t
 localhost | 57637 | localhost | 9060 | f
 localhost | 57637 | localhost | 57637 | t
(4 rows)

-- cancel connection checks to this node
SELECT citus.mitmproxy('conn.onQuery(query="^SELECT 1$").cancel(' || pg_backend_pid() || ')');
 mitmproxy
---------------------------------------------------------------------

(1 row)

SELECT * FROM citus_check_cluster_node_health();
ERROR:  canceling statement due to user request
RESET client_min_messages;
SELECT citus.mitmproxy('conn.allow()');
 mitmproxy
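In the matrices above, f marks a definite failure from source to target, while an empty result cell is NULL: the check could not even be issued from that source node. A sketch that spells out the three documented states:

    SELECT from_nodename, from_nodeport, to_nodename, to_nodeport,
           CASE WHEN result IS NULL THEN 'unknown: source unreachable'
                WHEN result THEN 'ok'
                ELSE 'source cannot reach target'
           END AS status
    FROM citus_check_cluster_node_health();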
@@ -629,6 +629,45 @@ SELECT citus_check_connection_to_node('localhost', :worker_2_port);
(1 row)

\c - - - :master_port
SELECT * FROM citus_check_cluster_node_health() ORDER BY 1,2,3,4;
 from_nodename | from_nodeport | to_nodename | to_nodeport | result
---------------------------------------------------------------------
 localhost | 57637 | localhost | 57637 | t
 localhost | 57637 | localhost | 57638 | t
 localhost | 57638 | localhost | 57637 | t
 localhost | 57638 | localhost | 57638 | t
(4 rows)

-- test cluster connectivity when we have broken nodes
SET client_min_messages TO ERROR;
SET citus.node_connection_timeout TO 10;
BEGIN;
INSERT INTO pg_dist_node VALUES
    (123456789, 123456789, 'localhost', 123456789),
    (1234567890, 1234567890, 'www.citusdata.com', 5432);
SELECT * FROM citus_check_cluster_node_health() ORDER BY 5,1,2,3,4;
 from_nodename | from_nodeport | to_nodename | to_nodeport | result
---------------------------------------------------------------------
 localhost | 57637 | localhost | 123456789 | f
 localhost | 57637 | www.citusdata.com | 5432 | f
 localhost | 57638 | localhost | 123456789 | f
 localhost | 57638 | www.citusdata.com | 5432 | f
 localhost | 57637 | localhost | 57637 | t
 localhost | 57637 | localhost | 57638 | t
 localhost | 57638 | localhost | 57637 | t
 localhost | 57638 | localhost | 57638 | t
 localhost | 123456789 | localhost | 57637 |
 localhost | 123456789 | localhost | 57638 |
 localhost | 123456789 | localhost | 123456789 |
 localhost | 123456789 | www.citusdata.com | 5432 |
 www.citusdata.com | 5432 | localhost | 57637 |
 www.citusdata.com | 5432 | localhost | 57638 |
 www.citusdata.com | 5432 | localhost | 123456789 |
 www.citusdata.com | 5432 | www.citusdata.com | 5432 |
(16 rows)

ROLLBACK;
RESET citus.node_connection_timeout;
RESET client_min_messages;
DROP SCHEMA tools CASCADE;
RESET SEARCH_PATH;
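The test wraps the bogus pg_dist_node rows in BEGIN ... ROLLBACK so the fake nodes never persist, and lowers citus.node_connection_timeout (a millisecond-valued setting) so probes against unreachable hosts fail fast. A sketch of the same pattern for ad-hoc use:

    SET citus.node_connection_timeout TO 1000;  -- cap each connection attempt at 1s
    SELECT * FROM citus_check_cluster_node_health() ORDER BY 5, 1, 2, 3, 4;
    RESET citus.node_connection_timeout;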
@@ -973,10 +973,11 @@ SELECT * FROM multi_extension.print_extension_changes();
 function master_append_table_to_shard(bigint,text,text,integer) real |
 function master_apply_delete_command(text) integer |
 function master_get_table_metadata(text) record |
 | function citus_check_cluster_node_health() SETOF record
 | function citus_check_connection_to_node(text,integer) boolean
 | function citus_disable_node(text,integer,boolean) void
 | function citus_internal_add_object_metadata(text,text[],text[],integer,integer) void
(7 rows)
(8 rows)

DROP TABLE multi_extension.prev_objects, multi_extension.extension_diff;
-- show running version
@@ -159,6 +159,16 @@ SELECT * FROM the_table;
 1 | 2
(2 rows)

-- Check for connectivity in the cluster
SELECT * FROM citus_check_cluster_node_health();
 from_nodename | from_nodeport | to_nodename | to_nodeport | result
---------------------------------------------------------------------
 localhost | 9071 | localhost | 9071 | t
 localhost | 9071 | localhost | 9072 | t
 localhost | 9072 | localhost | 9071 | t
 localhost | 9072 | localhost | 9072 | t
(4 rows)

-- clean up after ourselves
\c -reuse-previous=off regression - - :master_port
DROP TABLE the_table;
@@ -1,9 +1,5 @@
CREATE SCHEMA mx_add_coordinator;
SET search_path TO mx_add_coordinator,public;
SET citus.shard_replication_factor TO 1;
SET citus.shard_count TO 8;
SET citus.next_shard_id TO 7000000;
SET citus.next_placement_id TO 7000000;
SET client_min_messages TO WARNING;
CREATE USER reprefuser WITH LOGIN;
SELECT run_command_on_workers('CREATE USER reprefuser WITH LOGIN');

@@ -16,12 +12,43 @@ SELECT run_command_on_workers('CREATE USER reprefuser WITH LOGIN');
SET citus.enable_alter_role_propagation TO ON;
-- alter role for other than the extension owner works in enterprise, output differs accordingly
ALTER ROLE reprefuser WITH CREATEDB;
-- check connectivity in the cluster
-- verify that we test for 4 node pairs before we add coordinator to metadata
SELECT bool_and(coalesce(result, false)), count(*) FROM citus_check_cluster_node_health();
 bool_and | count
---------------------------------------------------------------------
 t | 4
(1 row)

SELECT 1 FROM master_add_node('localhost', :master_port, groupId => 0);
 ?column?
---------------------------------------------------------------------
 1
(1 row)

-- verify that we test for 9 node pairs after we add coordinator to metadata
SELECT bool_and(coalesce(result, false)), count(*) FROM citus_check_cluster_node_health();
 bool_and | count
---------------------------------------------------------------------
 t | 9
(1 row)

-- test that we can test for connectivity when connected to worker nodes as well
\c - - - :worker_1_port
SELECT bool_and(coalesce(result, false)), count(*) FROM citus_check_cluster_node_health();
 bool_and | count
---------------------------------------------------------------------
 t | 9
(1 row)

\c - - - :master_port
-- set the configs after reconnecting to coordinator
SET search_path TO mx_add_coordinator,public;
SET citus.shard_replication_factor TO 1;
SET citus.shard_count TO 8;
SET citus.next_shard_id TO 7000000;
SET citus.next_placement_id TO 7000000;
SET client_min_messages TO WARNING;
-- test that coordinator pg_dist_node entry is synced to the workers
SELECT wait_until_metadata_sync(30000);
 wait_until_metadata_sync
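The pair counts above follow from the full cross product: with n active readable nodes the UDF emits n * n rows, hence 4 before the coordinator is added to the metadata and 9 after. A cross-check sketch, assuming every pg_dist_node entry is active and readable:

    SELECT (SELECT count(*) * count(*) FROM pg_dist_node) AS expected_pairs,
           (SELECT count(*) FROM citus_check_cluster_node_health()) AS actual_pairs;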
@@ -38,6 +38,7 @@ ORDER BY 1;
 function citus_add_rebalance_strategy(name,regproc,regproc,regproc,real,real,real)
 function citus_add_secondary_node(text,integer,text,integer,name)
 function citus_blocking_pids(integer)
 function citus_check_cluster_node_health()
 function citus_check_connection_to_node(text,integer)
 function citus_cleanup_orphaned_shards()
 function citus_conninfo_cache_invalidate()

@@ -261,5 +262,5 @@ ORDER BY 1;
 view citus_worker_stat_activity
 view pg_dist_shard_placement
 view time_partitions
(245 rows)
(246 rows)

@@ -0,0 +1 @@
SELECT bool_and(coalesce(result, false)) FROM citus_check_cluster_node_health();
@@ -161,6 +161,45 @@ SELECT * FROM citus_check_connection_to_node('localhost', :worker_2_proxy_port);
SELECT citus.mitmproxy('conn.delay(500)');
SELECT * FROM citus_check_connection_to_node('localhost', :worker_2_proxy_port);

-- tests for citus_check_cluster_node_health

-- kill all connectivity checks that originate from this node
SELECT citus.mitmproxy('conn.onQuery(query="^SELECT citus_check_connection_to_node").kill()');
SELECT * FROM citus_check_cluster_node_health();

-- suggested summary queries for connectivity checks
SELECT bool_and(coalesce(result, false)) FROM citus_check_cluster_node_health();
SELECT result, count(*) FROM citus_check_cluster_node_health() GROUP BY result ORDER BY 1;

-- cancel all connectivity checks that originate from this node
SELECT citus.mitmproxy('conn.onQuery(query="^SELECT citus_check_connection_to_node").cancel(' || pg_backend_pid() || ')');
SELECT * FROM citus_check_cluster_node_health();

-- kill all but first connectivity checks that originate from this node
SELECT citus.mitmproxy('conn.onQuery(query="^SELECT citus_check_connection_to_node").after(1).kill()');
SELECT * FROM citus_check_cluster_node_health();

-- cancel all but first connectivity checks that originate from this node
SELECT citus.mitmproxy('conn.onQuery(query="^SELECT citus_check_connection_to_node").after(1).cancel(' || pg_backend_pid() || ')');
SELECT * FROM citus_check_cluster_node_health();

-- kill all connections to this node
SELECT citus.mitmproxy('conn.onAuthenticationOk().kill()');
SELECT * FROM citus_check_cluster_node_health();

-- cancel all connections to this node
SELECT citus.mitmproxy('conn.onAuthenticationOk().cancel(' || pg_backend_pid() || ')');
SELECT * FROM citus_check_cluster_node_health();

-- kill connection checks to this node
SELECT citus.mitmproxy('conn.onQuery(query="^SELECT 1$").kill()');
SELECT * FROM citus_check_cluster_node_health();

-- cancel connection checks to this node
SELECT citus.mitmproxy('conn.onQuery(query="^SELECT 1$").cancel(' || pg_backend_pid() || ')');
SELECT * FROM citus_check_cluster_node_health();

RESET client_min_messages;
SELECT citus.mitmproxy('conn.allow()');
SET citus.node_connection_timeout TO DEFAULT;
@@ -319,6 +319,20 @@ SELECT citus_check_connection_to_node('localhost', :worker_2_port);

\c - - - :master_port

SELECT * FROM citus_check_cluster_node_health() ORDER BY 1,2,3,4;

-- test cluster connectivity when we have broken nodes
SET client_min_messages TO ERROR;
SET citus.node_connection_timeout TO 10;

BEGIN;
INSERT INTO pg_dist_node VALUES
    (123456789, 123456789, 'localhost', 123456789),
    (1234567890, 1234567890, 'www.citusdata.com', 5432);
SELECT * FROM citus_check_cluster_node_health() ORDER BY 5,1,2,3,4;
ROLLBACK;

RESET citus.node_connection_timeout;
RESET client_min_messages;
DROP SCHEMA tools CASCADE;
RESET SEARCH_PATH;
@@ -111,6 +111,9 @@ UPDATE pg_dist_node SET nodecluster = 'second-cluster' WHERE noderole = 'secondary'
\c "port=9070 dbname=regression options='-c\ citus.use_secondary_nodes=always\ -c\ citus.cluster_name=second-cluster'"
SELECT * FROM the_table;

-- Check for connectivity in the cluster
SELECT * FROM citus_check_cluster_node_health();

-- clean up after ourselves
\c -reuse-previous=off regression - - :master_port
DROP TABLE the_table;
@@ -1,9 +1,5 @@
CREATE SCHEMA mx_add_coordinator;
SET search_path TO mx_add_coordinator,public;
SET citus.shard_replication_factor TO 1;
SET citus.shard_count TO 8;
SET citus.next_shard_id TO 7000000;
SET citus.next_placement_id TO 7000000;
SET client_min_messages TO WARNING;

CREATE USER reprefuser WITH LOGIN;

@@ -12,8 +8,29 @@ SET citus.enable_alter_role_propagation TO ON;
-- alter role for other than the extension owner works in enterprise, output differs accordingly
ALTER ROLE reprefuser WITH CREATEDB;

-- check connectivity in the cluster
-- verify that we test for 4 node pairs before we add coordinator to metadata
SELECT bool_and(coalesce(result, false)), count(*) FROM citus_check_cluster_node_health();

SELECT 1 FROM master_add_node('localhost', :master_port, groupId => 0);

-- verify that we test for 9 node pairs after we add coordinator to metadata
SELECT bool_and(coalesce(result, false)), count(*) FROM citus_check_cluster_node_health();

-- test that we can test for connectivity when connected to worker nodes as well
\c - - - :worker_1_port
SELECT bool_and(coalesce(result, false)), count(*) FROM citus_check_cluster_node_health();

\c - - - :master_port

-- set the configs after reconnecting to coordinator
SET search_path TO mx_add_coordinator,public;
SET citus.shard_replication_factor TO 1;
SET citus.shard_count TO 8;
SET citus.next_shard_id TO 7000000;
SET citus.next_placement_id TO 7000000;
SET client_min_messages TO WARNING;

-- test that coordinator pg_dist_node entry is synced to the workers
SELECT wait_until_metadata_sync(30000);
@@ -5,3 +5,4 @@ test: ch_benchmarks_4 ch_benchmarks_5 ch_benchmarks_6
test: intermediate_result_pruning_queries_1 intermediate_result_pruning_queries_2
test: dropped_columns_1 distributed_planning
test: local_dist_join
test: connectivity_checks