add more tests, fix failed conn tracking and always enable the feat for tests

pull/7917/head
Onur Tirtir 2025-04-22 12:26:50 +03:00
parent 75fe86a0a5
commit 3c6240639c
12 changed files with 322 additions and 35 deletions

View File

@ -1001,6 +1001,14 @@ FinishConnectionListEstablishment(List *multiConnectionList)
{
waitCount++;
}
else if (connectionState->phase == MULTI_CONNECTION_PHASE_ERROR)
{
/*
* Here we count the connection establishments that failed and that
* we won't wait for anymore.
*/
IncrementStatCounterForMyDb(STAT_CONNECTION_ESTABLISHMENT_FAILED);
}
}
/* prepare space for socket events */

View File

@ -2812,21 +2812,21 @@ CheckConnectionTimeout(WorkerPool *workerPool)
logLevel = ERROR;
}
ereport(logLevel, (errcode(ERRCODE_CONNECTION_FAILURE),
errmsg("could not establish any connections to the node "
"%s:%d after %u ms", workerPool->nodeName,
workerPool->nodePort,
NodeConnectionTimeout)));
/*
* We hit the connection timeout. In that case, we should not let the
* connection establishment continue because the execution logic
* pretends that failed sessions are not going to be used anymore.
*
* That's why we mark the connection as timed out to trigger the state
* changes in the executor.
* changes in the executor, if we don't throw an error below.
*/
MarkEstablishingSessionsTimedOut(workerPool);
ereport(logLevel, (errcode(ERRCODE_CONNECTION_FAILURE),
errmsg("could not establish any connections to the node "
"%s:%d after %u ms", workerPool->nodeName,
workerPool->nodePort,
NodeConnectionTimeout)));
}
else
{
@ -2854,6 +2854,7 @@ MarkEstablishingSessionsTimedOut(WorkerPool *workerPool)
connection->connectionState == MULTI_CONNECTION_INITIAL)
{
connection->connectionState = MULTI_CONNECTION_TIMED_OUT;
IncrementStatCounterForMyDb(STAT_CONNECTION_ESTABLISHMENT_FAILED);
}
}
}
@ -3011,9 +3012,12 @@ ConnectionStateMachine(WorkerSession *session)
* the state machines might have already progressed and used
* new pools/sessions instead. That's why we terminate the
* connection, clear any state associated with it.
*
* Note that here we don't increment the failed connection
* stat counter because MarkEstablishingSessionsTimedOut()
* already did that.
*/
connection->connectionState = MULTI_CONNECTION_FAILED;
IncrementStatCounterForMyDb(STAT_CONNECTION_ESTABLISHMENT_FAILED);
break;
}

View File

@ -119,6 +119,13 @@ NonPushableInsertSelectExecScan(CustomScanState *node)
if (list_length(distSelectTaskList) <= 1)
{
/*
* Probably we will never get here for a repartitioned
* INSERT..SELECT because when the source is a single shard
* table, we should most probably choose to use
* MODIFY_WITH_SELECT_VIA_COORDINATOR, but we still keep this
* here.
*/
IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_SINGLE_SHARD);
}
else

View File

@ -92,6 +92,69 @@ SELECT citus.mitmproxy('conn.allow()');
(1 row)
SET citus.enable_stat_counters TO true;
SET citus.node_connection_timeout TO 900;
SELECT citus.mitmproxy('conn.connect_delay(1400)');
mitmproxy
---------------------------------------------------------------------
(1 row)
SELECT (citus_stat_counters(oid)).connection_establishment_failed AS old_connection_establishment_failed
FROM pg_database WHERE datname = current_database() \gset
SELECT * FROM products;
WARNING: could not establish any connections to the node localhost:xxxxx after 900 ms
product_no | name | price
---------------------------------------------------------------------
(0 rows)
SELECT (citus_stat_counters(oid)).connection_establishment_failed - :old_connection_establishment_failed > 0
FROM pg_database WHERE datname = current_database();
?column?
---------------------------------------------------------------------
t
(1 row)
RESET citus.node_connection_timeout;
SELECT citus.mitmproxy('conn.allow()');
mitmproxy
---------------------------------------------------------------------
(1 row)
-- this time set citus.force_max_query_parallelization set to on
SET citus.force_max_query_parallelization TO ON;
SET citus.node_connection_timeout TO 900;
SELECT citus.mitmproxy('conn.connect_delay(1400)');
mitmproxy
---------------------------------------------------------------------
(1 row)
SELECT (citus_stat_counters(oid)).connection_establishment_failed AS old_connection_establishment_failed
FROM pg_database WHERE datname = current_database() \gset
SELECT * FROM products;
WARNING: could not establish any connections to the node localhost:xxxxx after 900 ms
product_no | name | price
---------------------------------------------------------------------
(0 rows)
SELECT (citus_stat_counters(oid)).connection_establishment_failed - :old_connection_establishment_failed > 0
FROM pg_database WHERE datname = current_database();
?column?
---------------------------------------------------------------------
t
(1 row)
RESET citus.node_connection_timeout;
SELECT citus.mitmproxy('conn.allow()');
mitmproxy
---------------------------------------------------------------------
(1 row)
RESET citus.force_max_query_parallelization;
RESET citus.enable_stat_counters;
-- Make sure that we fall back to a working node for reads, even if it's not
-- the first choice in our task assignment policy.
SET citus.node_connection_timeout TO 900;
@ -168,6 +231,46 @@ SELECT citus.mitmproxy('conn.allow()');
(1 row)
SET citus.force_max_query_parallelization TO ON;
SET citus.node_connection_timeout TO 900;
SELECT citus.mitmproxy('conn.connect_delay(1400)');
mitmproxy
---------------------------------------------------------------------
(1 row)
-- test insert into a single replicated table
SELECT (citus_stat_counters(oid)).connection_establishment_failed AS old_connection_establishment_failed
FROM pg_database WHERE datname = current_database() \gset
INSERT INTO single_replicatated VALUES (100);
ERROR: could not establish any connections to the node localhost:xxxxx after 900 ms
SELECT (citus_stat_counters(oid)).connection_establishment_failed - :old_connection_establishment_failed > 0
FROM pg_database WHERE datname = current_database();
?column?
---------------------------------------------------------------------
t
(1 row)
-- test select from a single replicated table
SELECT (citus_stat_counters(oid)).connection_establishment_failed AS old_connection_establishment_failed
FROM pg_database WHERE datname = current_database() \gset
SELECT count(*) FROM single_replicatated;
ERROR: could not establish any connections to the node localhost:xxxxx after 900 ms
SELECT (citus_stat_counters(oid)).connection_establishment_failed - :old_connection_establishment_failed > 0
FROM pg_database WHERE datname = current_database();
?column?
---------------------------------------------------------------------
t
(1 row)
RESET citus.force_max_query_parallelization;
RESET citus.node_connection_timeout;
SELECT citus.mitmproxy('conn.allow()');
mitmproxy
---------------------------------------------------------------------
(1 row)
-- one similar test, and this time on modification queries
-- to see that connection establishment failures could
-- fail the transaction (but not mark any placements as INVALID)

View File

@ -48,6 +48,43 @@ show citus.node_conninfo;
-- Should give a connection error because of bad sslmode
select count(*) from test where a = 0;
ERROR: connection to the remote node postgres@localhost:xxxxx failed with the following error: invalid sslmode value: "doesnotexist"
SELECT (citus_stat_counters(oid)).connection_establishment_failed AS old_connection_establishment_failed
FROM pg_database WHERE datname = current_database() \gset
SET citus.enable_stat_counters TO true;
select count(*) from test;
WARNING: connection to the remote node postgres@localhost:xxxxx failed with the following error: invalid sslmode value: "doesnotexist"
ERROR: connection to the remote node postgres@localhost:xxxxx failed with the following error: invalid sslmode value: "doesnotexist"
RESET citus.enable_stat_counters;
-- make sure that we properly updated the connection_establishment_failed counter
SELECT (citus_stat_counters(oid)).connection_establishment_failed - :old_connection_establishment_failed > 0
FROM pg_database WHERE datname = current_database();
?column?
---------------------------------------------------------------------
t
(1 row)
SELECT (citus_stat_counters(oid)).connection_establishment_failed AS old_connection_establishment_failed
FROM pg_database WHERE datname = current_database() \gset
-- Test a function that tries to establish parallel node connections.
SET citus.enable_stat_counters TO true;
-- we don't care about the result, hence make it always return true
SELECT COUNT(*) > -1 FROM get_global_active_transactions();
WARNING: connection to the remote node postgres@localhost:xxxxx failed with the following error: invalid sslmode value: "doesnotexist"
WARNING: connection to the remote node postgres@localhost:xxxxx failed with the following error: invalid sslmode value: "doesnotexist"
?column?
---------------------------------------------------------------------
t
(1 row)
RESET citus.enable_stat_counters;
-- make sure that we properly updated the connection_establishment_failed counter
SELECT (citus_stat_counters(oid)).connection_establishment_failed - :old_connection_establishment_failed = 2
FROM pg_database WHERE datname = current_database();
?column?
---------------------------------------------------------------------
t
(1 row)
-- Reset it again
ALTER SYSTEM RESET citus.node_conninfo;
select pg_reload_conf();

View File

@ -114,8 +114,7 @@ SELECT stats_reset IS NOT NULL FROM citus_stat_counters WHERE name = current_dat
(1 row)
-- multi_1_schedule has this test in an individual line, so there cannot be any other backends
-- -except Citus maintenance daemon- that can update the stat counters other than us. We also
-- know that Citus maintenance daemon cannot update query related stats.
-- that can update the stat counters other than us.
--
-- So, no one could have incremented query related stats so far.
SELECT query_execution_single_shard = 0, query_execution_multi_shard = 0 FROM citus_stat_counters;
@ -130,10 +129,6 @@ SELECT query_execution_single_shard = 0, query_execution_multi_shard = 0 FROM ci
-- Even further, for the databases that don't have Citus extension installed,
-- we should get 0 for other stats too.
--
-- For the databases that have Citus extension installed, we might or might not
-- get 0 for connection related stats, depending on whether the Citus maintenance
-- daemon has done any work so far, so we don't check them.
SELECT connection_establishment_succeeded = 0,
connection_establishment_failed = 0,
connection_reused = 0
@ -336,16 +331,12 @@ SELECT citus_stat_counters_reset(oid) FROM pg_database WHERE datname = current_d
(1 row)
-- no one could have incremented query related stats so far
SELECT query_execution_single_shard = 0, query_execution_multi_shard = 0 FROM citus_stat_counters;
?column? | ?column?
-- No one could have incremented query related stats and connection_reused so far.
SELECT query_execution_single_shard = 0, query_execution_multi_shard = 0, connection_reused = 0 FROM citus_stat_counters WHERE name = current_database();
?column? | ?column? | ?column?
---------------------------------------------------------------------
t | t
t | t
t | t
t | t
t | t
(5 rows)
t | t | t
(1 row)
SET citus.enable_stat_counters TO true;
SELECT * FROM stat_counters.dist_table WHERE a = 1;
@ -358,6 +349,13 @@ SELECT * FROM stat_counters.dist_table WHERE a = 1;
---------------------------------------------------------------------
(0 rows)
-- first one establishes a connection, the second one reuses it
SELECT connection_reused = 1 FROM citus_stat_counters WHERE name = current_database();
?column?
---------------------------------------------------------------------
t
(1 row)
SET citus.force_max_query_parallelization TO ON;
SELECT * FROM stat_counters.dist_table;
a | b
@ -726,6 +724,25 @@ CALL exec_query_and_check_query_counters($$
$$,
1, 1
);
-- safe to push-down
CALL exec_query_and_check_query_counters($$
SELECT * FROM (SELECT * FROM dist_table UNION SELECT * FROM dist_table) as foo
$$,
0, 1
);
-- weird, but not safe to push down because the set operation is NOT wrapped in a subquery.
CALL exec_query_and_check_query_counters($$
SELECT * FROM dist_table UNION SELECT * FROM dist_table
$$,
1, 2
);
SET citus.local_table_join_policy TO "prefer-local";
CALL exec_query_and_check_query_counters($$
SELECT * FROM dist_table, local_table WHERE dist_table.a = local_table.a
$$,
0, 1
);
RESET citus.local_table_join_policy;
-- citus_stat_counters lists all the databases that currently exist,
-- so we should get 5 rows here.
SELECT COUNT(*) = 5 FROM citus_stat_counters;

View File

@ -33,6 +33,8 @@ test: failure_savepoints
test: failure_multi_row_insert
test: failure_mx_metadata_sync
test: failure_mx_metadata_sync_multi_trans
# Do not parallelize with others because this measures stat counters
# for failed connections for a few queries.
test: failure_connection_establishment
test: failure_create_database

View File

@ -45,10 +45,10 @@ test: single_shard_table_udfs
test: schema_based_sharding
test: citus_schema_distribute_undistribute
# Don't parallelize stat_counters with others because we don't want statistics
# to be updated by other tests.
# to be updated by other tests concurrently.
#
# Also, this needs to be the first test that queries stats_reset column on
# citus_stat_counters since it checks the value of that column.
# Also, this needs to be the first test that calls citus_stat_counters()
# because it checks the value of stats_reset column before calling the function.
test: stat_counters
test: multi_test_catalog_views
@ -296,6 +296,9 @@ test: multi_colocation_utils
# ----------
# node_conninfo_reload tests that node_conninfo changes take effect
#
# Do not parallelize with others because this measures stat counters
# for failed connections for a few queries.
# ----------
test: node_conninfo_reload

View File

@ -492,6 +492,7 @@ push(@pgOptions, "citus.stat_statements_track = 'all'");
push(@pgOptions, "citus.enable_change_data_capture=on");
push(@pgOptions, "citus.stat_tenants_limit = 2");
push(@pgOptions, "citus.stat_tenants_track = 'ALL'");
push(@pgOptions, "citus.enable_stat_counters=on");
push(@pgOptions, "citus.superuser = 'postgres'");
# Some tests look at shards in pg_class, make sure we can usually see them:
@ -1199,4 +1200,3 @@ else {
die "Failed in ". ($endTime - $startTime)." seconds. \n";
}

View File

@ -55,6 +55,43 @@ ALTER TABLE products ADD CONSTRAINT p_key PRIMARY KEY(product_no);
RESET citus.node_connection_timeout;
SELECT citus.mitmproxy('conn.allow()');
SET citus.enable_stat_counters TO true;
SET citus.node_connection_timeout TO 900;
SELECT citus.mitmproxy('conn.connect_delay(1400)');
SELECT (citus_stat_counters(oid)).connection_establishment_failed AS old_connection_establishment_failed
FROM pg_database WHERE datname = current_database() \gset
SELECT * FROM products;
SELECT (citus_stat_counters(oid)).connection_establishment_failed - :old_connection_establishment_failed > 0
FROM pg_database WHERE datname = current_database();
RESET citus.node_connection_timeout;
SELECT citus.mitmproxy('conn.allow()');
-- this time set citus.force_max_query_parallelization set to on
SET citus.force_max_query_parallelization TO ON;
SET citus.node_connection_timeout TO 900;
SELECT citus.mitmproxy('conn.connect_delay(1400)');
SELECT (citus_stat_counters(oid)).connection_establishment_failed AS old_connection_establishment_failed
FROM pg_database WHERE datname = current_database() \gset
SELECT * FROM products;
SELECT (citus_stat_counters(oid)).connection_establishment_failed - :old_connection_establishment_failed > 0
FROM pg_database WHERE datname = current_database();
RESET citus.node_connection_timeout;
SELECT citus.mitmproxy('conn.allow()');
RESET citus.force_max_query_parallelization;
RESET citus.enable_stat_counters;
-- Make sure that we fall back to a working node for reads, even if it's not
-- the first choice in our task assignment policy.
SET citus.node_connection_timeout TO 900;
@ -87,6 +124,31 @@ RESET citus.force_max_query_parallelization;
RESET citus.node_connection_timeout;
SELECT citus.mitmproxy('conn.allow()');
SET citus.force_max_query_parallelization TO ON;
SET citus.node_connection_timeout TO 900;
SELECT citus.mitmproxy('conn.connect_delay(1400)');
-- test insert into a single replicated table
SELECT (citus_stat_counters(oid)).connection_establishment_failed AS old_connection_establishment_failed
FROM pg_database WHERE datname = current_database() \gset
INSERT INTO single_replicatated VALUES (100);
SELECT (citus_stat_counters(oid)).connection_establishment_failed - :old_connection_establishment_failed > 0
FROM pg_database WHERE datname = current_database();
-- test select from a single replicated table
SELECT (citus_stat_counters(oid)).connection_establishment_failed AS old_connection_establishment_failed
FROM pg_database WHERE datname = current_database() \gset
SELECT count(*) FROM single_replicatated;
SELECT (citus_stat_counters(oid)).connection_establishment_failed - :old_connection_establishment_failed > 0
FROM pg_database WHERE datname = current_database();
RESET citus.force_max_query_parallelization;
RESET citus.node_connection_timeout;
SELECT citus.mitmproxy('conn.allow()');
-- one similar test, and this time on modification queries
-- to see that connection establishment failures could

View File

@ -22,6 +22,30 @@ show citus.node_conninfo;
-- Should give a connection error because of bad sslmode
select count(*) from test where a = 0;
SELECT (citus_stat_counters(oid)).connection_establishment_failed AS old_connection_establishment_failed
FROM pg_database WHERE datname = current_database() \gset
SET citus.enable_stat_counters TO true;
select count(*) from test;
RESET citus.enable_stat_counters;
-- make sure that we properly updated the connection_establishment_failed counter
SELECT (citus_stat_counters(oid)).connection_establishment_failed - :old_connection_establishment_failed > 0
FROM pg_database WHERE datname = current_database();
SELECT (citus_stat_counters(oid)).connection_establishment_failed AS old_connection_establishment_failed
FROM pg_database WHERE datname = current_database() \gset
-- Test a function that tries to establish parallel node connections.
SET citus.enable_stat_counters TO true;
-- we don't care about the result, hence make it always return true
SELECT COUNT(*) > -1 FROM get_global_active_transactions();
RESET citus.enable_stat_counters;
-- make sure that we properly updated the connection_establishment_failed counter
SELECT (citus_stat_counters(oid)).connection_establishment_failed - :old_connection_establishment_failed = 2
FROM pg_database WHERE datname = current_database();
-- Reset it again
ALTER SYSTEM RESET citus.node_conninfo;
select pg_reload_conf();

View File

@ -62,18 +62,13 @@ SELECT citus_stat_counters_reset(oid) FROM pg_database WHERE datname = current_d
SELECT stats_reset IS NOT NULL FROM citus_stat_counters WHERE name = current_database();
-- multi_1_schedule has this test in an individual line, so there cannot be any other backends
-- -except Citus maintenance daemon- that can update the stat counters other than us. We also
-- know that Citus maintenance daemon cannot update query related stats.
-- that can update the stat counters other than us.
--
-- So, no one could have incremented query related stats so far.
SELECT query_execution_single_shard = 0, query_execution_multi_shard = 0 FROM citus_stat_counters;
-- Even further, for the databases that don't have Citus extension installed,
-- we should get 0 for other stats too.
--
-- For the databases that have Citus extension installed, we might or might not
-- get 0 for connection related stats, depending on whether the Citus maintenance
-- daemon has done any work so far, so we don't check them.
SELECT connection_establishment_succeeded = 0,
connection_establishment_failed = 0,
connection_reused = 0
@ -175,14 +170,17 @@ SET client_min_messages TO NOTICE;
SELECT citus_stat_counters_reset(oid) FROM pg_database WHERE datname = current_database();
-- no one could have incremented query related stats so far
SELECT query_execution_single_shard = 0, query_execution_multi_shard = 0 FROM citus_stat_counters;
-- No one could have incremented query related stats and connection_reused so far.
SELECT query_execution_single_shard = 0, query_execution_multi_shard = 0, connection_reused = 0 FROM citus_stat_counters WHERE name = current_database();
SET citus.enable_stat_counters TO true;
SELECT * FROM stat_counters.dist_table WHERE a = 1;
SELECT * FROM stat_counters.dist_table WHERE a = 1;
-- first one establishes a connection, the second one reuses it
SELECT connection_reused = 1 FROM citus_stat_counters WHERE name = current_database();
SET citus.force_max_query_parallelization TO ON;
SELECT * FROM stat_counters.dist_table;
SELECT * FROM stat_counters.dist_table;
@ -516,6 +514,28 @@ CALL exec_query_and_check_query_counters($$
1, 1
);
-- safe to push-down
CALL exec_query_and_check_query_counters($$
SELECT * FROM (SELECT * FROM dist_table UNION SELECT * FROM dist_table) as foo
$$,
0, 1
);
-- weird, but not safe to push down because the set operation is NOT wrapped in a subquery.
CALL exec_query_and_check_query_counters($$
SELECT * FROM dist_table UNION SELECT * FROM dist_table
$$,
1, 2
);
SET citus.local_table_join_policy TO "prefer-local";
CALL exec_query_and_check_query_counters($$
SELECT * FROM dist_table, local_table WHERE dist_table.a = local_table.a
$$,
0, 1
);
RESET citus.local_table_join_policy;
-- citus_stat_counters lists all the databases that currently exist,
-- so we should get 5 rows here.
SELECT COUNT(*) = 5 FROM citus_stat_counters;