mirror of https://github.com/citusdata/citus.git
- Use CloseConnection() over ShutdownConnection(): the latter only partially closes the connection; for example, it does not remove the connection from InProgressTXes
- Use StartNodeUserDatabaseConnection over StartConnectionEstablishment: we should honor the connectionFlags properly
- Move task back to sessionReadyQueueNode: let the state machines decide what should happen
- Add some tests backonder_exec_retry
parent
06affc7467
commit
befb5693c8
|
@ -1483,17 +1483,18 @@ ShouldShutdownConnection(MultiConnection *connection, const int cachedConnection
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* RestartConnection starts a new connection attempt for the given
|
* RestartPQConnection first closes the pgConn of the input connection,
|
||||||
* MultiConnection.
|
* and starts a new connection attempt to the node
|
||||||
*
|
* the oldConnection connected to.
|
||||||
* We assume that we already went through all the other initialization steps in
|
|
||||||
* StartNodeUserDatabaseConnection, such as incrementing shared connection
|
|
||||||
* counters.
|
|
||||||
*/
|
*/
|
||||||
void
|
void
|
||||||
RestartConnection(MultiConnection *connection)
|
RestartPQConnection(MultiConnection *connection)
|
||||||
{
|
{
|
||||||
ShutdownConnection(connection);
|
if (connection->pgConn != NULL)
|
||||||
|
{
|
||||||
|
PQfinish(connection->pgConn);
|
||||||
|
connection->pgConn = NULL;
|
||||||
|
}
|
||||||
|
|
||||||
ConnectionHashKey key;
|
ConnectionHashKey key;
|
||||||
strlcpy(key.hostname, connection->hostname, MAX_NODE_LENGTH);
|
strlcpy(key.hostname, connection->hostname, MAX_NODE_LENGTH);
|
||||||
|
|
|
@ -3659,7 +3659,6 @@ ConnectionStateMachine(WorkerSession *session)
|
||||||
workerPool->idleConnectionCount--;
|
workerPool->idleConnectionCount--;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* TODO: refine this check */
|
|
||||||
RemoteTransaction *transaction = &connection->remoteTransaction;
|
RemoteTransaction *transaction = &connection->remoteTransaction;
|
||||||
if (!transaction->transactionCritical &&
|
if (!transaction->transactionCritical &&
|
||||||
session->connectionRetryCount == 0)
|
session->connectionRetryCount == 0)
|
||||||
|
@ -3667,15 +3666,39 @@ ConnectionStateMachine(WorkerSession *session)
|
||||||
session->connectionRetryCount++;
|
session->connectionRetryCount++;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Try to connect again, we will reuse the same MultiConnection
|
* Close the underlying pgConn connection, and establish
|
||||||
* and keep it as claimed.
|
* a new one. We refrain from using ShutdownConnection() or
|
||||||
|
* CloseConnection() as at this point we already have a
|
||||||
|
* connection with proper flags (e.g., WAIT_FOR_CONNECTION vs
|
||||||
|
* OPTIONAL_CONNECTION or metadata connection etc.).
|
||||||
|
*
|
||||||
|
* Also, it is sufficient just to re-connect for the underlying
|
||||||
|
* pgConn and keep the executor state as-is.
|
||||||
|
*
|
||||||
|
* Most commonly this would be used when a remote socket of
|
||||||
|
* a cached connection is closed, and we are at the start of
|
||||||
|
* the execution.
|
||||||
*/
|
*/
|
||||||
RestartConnection(connection);
|
RestartPQConnection(connection);
|
||||||
|
|
||||||
/* socket will have changed */
|
/*
|
||||||
|
* The currentTask should be re-assigned to the newConnection,
|
||||||
|
* but let the transaction state machine do that.
|
||||||
|
*/
|
||||||
|
if (session->currentTask)
|
||||||
|
{
|
||||||
|
/* the task could not have ended */
|
||||||
|
Assert(INSTR_TIME_IS_ZERO(session->currentTask->endTime));
|
||||||
|
|
||||||
|
/* we are going to re-start the task execution */
|
||||||
|
INSTR_TIME_SET_ZERO(session->currentTask->startTime);
|
||||||
|
}
|
||||||
|
|
||||||
|
/* connection changed, so we need to rebuild */
|
||||||
execution->rebuildWaitEventSet = true;
|
execution->rebuildWaitEventSet = true;
|
||||||
|
|
||||||
connection->connectionState = MULTI_CONNECTION_INITIAL;
|
connection->connectionState = MULTI_CONNECTION_INITIAL;
|
||||||
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3843,7 +3866,10 @@ HandleMultiConnectionSuccess(WorkerSession *session)
|
||||||
connection->connectionEstablishmentEnd))));
|
connection->connectionEstablishmentEnd))));
|
||||||
|
|
||||||
workerPool->activeConnectionCount++;
|
workerPool->activeConnectionCount++;
|
||||||
workerPool->idleConnectionCount++;
|
if (session->currentTask == NULL)
|
||||||
|
{
|
||||||
|
workerPool->idleConnectionCount++;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -3958,8 +3984,8 @@ TransactionStateMachine(WorkerSession *session)
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
TaskPlacementExecution *placementExecution = PopPlacementExecution(
|
TaskPlacementExecution *placementExecution =
|
||||||
session);
|
PopPlacementExecution(session);
|
||||||
if (placementExecution == NULL)
|
if (placementExecution == NULL)
|
||||||
{
|
{
|
||||||
/*
|
/*
|
||||||
|
|
|
@ -294,7 +294,7 @@ extern MultiConnection * StartNodeUserDatabaseConnection(uint32 flags,
|
||||||
int32 port,
|
int32 port,
|
||||||
const char *user,
|
const char *user,
|
||||||
const char *database);
|
const char *database);
|
||||||
extern void RestartConnection(MultiConnection *connection);
|
extern void RestartPQConnection(MultiConnection *oldConnection);
|
||||||
extern void CloseAllConnectionsAfterTransaction(void);
|
extern void CloseAllConnectionsAfterTransaction(void);
|
||||||
extern void CloseNodeConnectionsAfterTransaction(char *nodeName, int nodePort);
|
extern void CloseNodeConnectionsAfterTransaction(char *nodeName, int nodePort);
|
||||||
extern MultiConnection * ConnectionAvailableToNode(char *hostName, int nodePort,
|
extern MultiConnection * ConnectionAvailableToNode(char *hostName, int nodePort,
|
||||||
|
|
|
@ -0,0 +1,149 @@
|
||||||
|
--
|
||||||
|
-- PG15+ test as WL_SOCKET_CLOSED exposed for PG15+
|
||||||
|
--
|
||||||
|
SHOW server_version \gset
|
||||||
|
SELECT substring(:'server_version', '\d+')::int >= 15 AS server_version_ge_15
|
||||||
|
\gset
|
||||||
|
\if :server_version_ge_15
|
||||||
|
\else
|
||||||
|
\q
|
||||||
|
\endif
|
||||||
|
CREATE SCHEMA socket_close;
|
||||||
|
SET search_path TO socket_close;
|
||||||
|
SET citus.shard_count TO 32;
|
||||||
|
SET citus.shard_replication_factor TO 1;
|
||||||
|
CREATE TABLE socket_test_table(id bigserial, value text);
|
||||||
|
SELECT create_distributed_table('socket_test_table', 'id');
|
||||||
|
create_distributed_table
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
INSERT INTO socket_test_table (value) SELECT i::text FROM generate_series(0,100)i;
|
||||||
|
-- first, simulate that we only have one cached connection per node
|
||||||
|
SET citus.max_adaptive_executor_pool_size TO 1;
|
||||||
|
SET citus.max_cached_conns_per_worker TO 1;
|
||||||
|
SELECT count(*) FROM socket_test_table;
|
||||||
|
count
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
101
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- kill all the cached backends on the workers
|
||||||
|
SELECT result FROM run_command_on_workers($$SELECT count(*) FROM (SELECT pg_terminate_backend(pid, 1000) FROM pg_stat_activity WHERE query ilike '%socket_test_table%' AND pid !=pg_backend_pid()) as foo$$);
|
||||||
|
result
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
1
|
||||||
|
1
|
||||||
|
(2 rows)
|
||||||
|
|
||||||
|
-- show that none remains
|
||||||
|
SELECT result FROM run_command_on_workers($$SELECT count(*) FROM (SELECT pid FROM pg_stat_activity WHERE query ilike '%socket_test_table%' AND pid !=pg_backend_pid()) as foo$$);
|
||||||
|
result
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
0
|
||||||
|
0
|
||||||
|
(2 rows)
|
||||||
|
|
||||||
|
-- even though the cached connections closed, the execution recovers and establishes new connections
|
||||||
|
SELECT count(*) FROM socket_test_table;
|
||||||
|
count
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
101
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- now, use 16 connections per worker, we can still recover all connections
|
||||||
|
SET citus.max_adaptive_executor_pool_size TO 16;
|
||||||
|
SET citus.max_cached_conns_per_worker TO 16;
|
||||||
|
SET citus.force_max_query_parallelization TO ON;
|
||||||
|
SELECT count(*) FROM socket_test_table;
|
||||||
|
count
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
101
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT result FROM run_command_on_workers($$SELECT count(*) FROM (SELECT pg_terminate_backend(pid, 1000) FROM pg_stat_activity WHERE query ilike '%socket_test_table%' AND pid !=pg_backend_pid()) as foo$$);
|
||||||
|
result
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
16
|
||||||
|
16
|
||||||
|
(2 rows)
|
||||||
|
|
||||||
|
SELECT count(*) FROM socket_test_table;
|
||||||
|
count
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
101
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- now, get back to sane defaults
|
||||||
|
SET citus.max_adaptive_executor_pool_size TO 1;
|
||||||
|
SET citus.max_cached_conns_per_worker TO 1;
|
||||||
|
SET citus.force_max_query_parallelization TO OFF;
|
||||||
|
-- we can recover for modification queries as well
|
||||||
|
-- single row INSERT
|
||||||
|
INSERT INTO socket_test_table VALUES (1);
|
||||||
|
SELECT result FROM run_command_on_workers($$SELECT count(*) FROM (SELECT pg_terminate_backend(pid, 1000) FROM pg_stat_activity WHERE query ilike '%socket_test_table%' AND pid !=pg_backend_pid()) as foo$$) ORDER BY result;
|
||||||
|
result
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
1
|
||||||
|
1
|
||||||
|
(2 rows)
|
||||||
|
|
||||||
|
INSERT INTO socket_test_table VALUES (1);
|
||||||
|
-- single row UPDATE
|
||||||
|
UPDATE socket_test_table SET value = 15 WHERE id = 1;
|
||||||
|
SELECT result FROM run_command_on_workers($$SELECT count(*) FROM (SELECT pg_terminate_backend(pid, 1000) FROM pg_stat_activity WHERE query ilike '%socket_test_table%' AND pid !=pg_backend_pid()) as foo$$) ORDER BY result;
|
||||||
|
result
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
0
|
||||||
|
1
|
||||||
|
(2 rows)
|
||||||
|
|
||||||
|
UPDATE socket_test_table SET value = 15 WHERE id = 1;
|
||||||
|
-- we cannot recover in a transaction block
|
||||||
|
BEGIN;
|
||||||
|
SELECT count(*) FROM socket_test_table;
|
||||||
|
count
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
103
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- kill all the cached backends on the workers
|
||||||
|
SELECT result FROM run_command_on_workers($$SELECT count(*) FROM (SELECT pg_terminate_backend(pid, 1000) FROM pg_stat_activity WHERE query ilike '%socket_test_table%' AND pid !=pg_backend_pid()) as foo$$);
|
||||||
|
result
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
1
|
||||||
|
1
|
||||||
|
(2 rows)
|
||||||
|
|
||||||
|
SELECT count(*) FROM socket_test_table;
|
||||||
|
ERROR: connection to the remote node localhost:xxxxx failed
|
||||||
|
ROLLBACK;
|
||||||
|
-- although should have no difference, we can recover from the failures on the workers as well
|
||||||
|
\c - - - :worker_1_port
|
||||||
|
SET search_path TO socket_close;
|
||||||
|
SET citus.max_adaptive_executor_pool_size TO 1;
|
||||||
|
SET citus.max_cached_conns_per_worker TO 1;
|
||||||
|
SET citus.force_max_query_parallelization TO ON;
|
||||||
|
SELECT count(*) FROM socket_test_table;
|
||||||
|
count
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
103
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT result FROM run_command_on_workers($$SELECT count(*) FROM (SELECT pg_terminate_backend(pid, 1000) FROM pg_stat_activity WHERE query ilike '%socket_test_table_%' AND query not ilike '%pg_stat_activity%' AND pid !=pg_backend_pid()) as foo$$);
|
||||||
|
result
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
1
|
||||||
|
1
|
||||||
|
(2 rows)
|
||||||
|
|
||||||
|
SELECT count(*) FROM socket_test_table;
|
||||||
|
count
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
103
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
\c - - - :master_port
|
||||||
|
SET client_min_messages TO ERROR;
|
||||||
|
DROP SCHEMA socket_close CASCADE;
|
|
@ -0,0 +1,9 @@
|
||||||
|
--
|
||||||
|
-- PG15+ test as WL_SOCKET_CLOSED exposed for PG15+
|
||||||
|
--
|
||||||
|
SHOW server_version \gset
|
||||||
|
SELECT substring(:'server_version', '\d+')::int >= 15 AS server_version_ge_15
|
||||||
|
\gset
|
||||||
|
\if :server_version_ge_15
|
||||||
|
\else
|
||||||
|
\q
|
|
@ -55,7 +55,7 @@ test: subquery_in_targetlist subquery_in_where subquery_complex_target_list subq
|
||||||
test: subquery_prepared_statements
|
test: subquery_prepared_statements
|
||||||
test: non_colocated_leaf_subquery_joins non_colocated_subquery_joins
|
test: non_colocated_leaf_subquery_joins non_colocated_subquery_joins
|
||||||
test: cte_inline recursive_view_local_table values sequences_with_different_types
|
test: cte_inline recursive_view_local_table values sequences_with_different_types
|
||||||
test: pg13 pg12 pg15_json json_table_select_only
|
test: pg13 pg12 pg15_json json_table_select_only detect_conn_close
|
||||||
# run pg14 sequentially as it syncs metadata
|
# run pg14 sequentially as it syncs metadata
|
||||||
test: pg14
|
test: pg14
|
||||||
test: pg15
|
test: pg15
|
||||||
|
|
|
@ -0,0 +1,88 @@
|
||||||
|
--
|
||||||
|
-- PG15+ test as WL_SOCKET_CLOSED exposed for PG15+
|
||||||
|
--
|
||||||
|
SHOW server_version \gset
|
||||||
|
SELECT substring(:'server_version', '\d+')::int >= 15 AS server_version_ge_15
|
||||||
|
\gset
|
||||||
|
\if :server_version_ge_15
|
||||||
|
\else
|
||||||
|
\q
|
||||||
|
\endif
|
||||||
|
|
||||||
|
CREATE SCHEMA socket_close;
|
||||||
|
SET search_path TO socket_close;
|
||||||
|
|
||||||
|
SET citus.shard_count TO 32;
|
||||||
|
SET citus.shard_replication_factor TO 1;
|
||||||
|
|
||||||
|
CREATE TABLE socket_test_table(id bigserial, value text);
|
||||||
|
SELECT create_distributed_table('socket_test_table', 'id');
|
||||||
|
INSERT INTO socket_test_table (value) SELECT i::text FROM generate_series(0,100)i;
|
||||||
|
|
||||||
|
-- first, simulate that we only have one cached connection per node
|
||||||
|
SET citus.max_adaptive_executor_pool_size TO 1;
|
||||||
|
SET citus.max_cached_conns_per_worker TO 1;
|
||||||
|
SELECT count(*) FROM socket_test_table;
|
||||||
|
|
||||||
|
-- kill all the cached backends on the workers
|
||||||
|
SELECT result FROM run_command_on_workers($$SELECT count(*) FROM (SELECT pg_terminate_backend(pid, 1000) FROM pg_stat_activity WHERE query ilike '%socket_test_table%' AND pid !=pg_backend_pid()) as foo$$);
|
||||||
|
|
||||||
|
-- show that none remains
|
||||||
|
SELECT result FROM run_command_on_workers($$SELECT count(*) FROM (SELECT pid FROM pg_stat_activity WHERE query ilike '%socket_test_table%' AND pid !=pg_backend_pid()) as foo$$);
|
||||||
|
|
||||||
|
-- even though the cached connections closed, the execution recovers and establishes new connections
|
||||||
|
SELECT count(*) FROM socket_test_table;
|
||||||
|
|
||||||
|
|
||||||
|
-- now, use 16 connections per worker, we can still recover all connections
|
||||||
|
SET citus.max_adaptive_executor_pool_size TO 16;
|
||||||
|
SET citus.max_cached_conns_per_worker TO 16;
|
||||||
|
SET citus.force_max_query_parallelization TO ON;
|
||||||
|
SELECT count(*) FROM socket_test_table;
|
||||||
|
SELECT result FROM run_command_on_workers($$SELECT count(*) FROM (SELECT pg_terminate_backend(pid, 1000) FROM pg_stat_activity WHERE query ilike '%socket_test_table%' AND pid !=pg_backend_pid()) as foo$$);
|
||||||
|
|
||||||
|
SELECT count(*) FROM socket_test_table;
|
||||||
|
|
||||||
|
-- now, get back to sane defaults
|
||||||
|
SET citus.max_adaptive_executor_pool_size TO 1;
|
||||||
|
SET citus.max_cached_conns_per_worker TO 1;
|
||||||
|
SET citus.force_max_query_parallelization TO OFF;
|
||||||
|
|
||||||
|
-- we can recover for modification queries as well
|
||||||
|
|
||||||
|
-- single row INSERT
|
||||||
|
INSERT INTO socket_test_table VALUES (1);
|
||||||
|
SELECT result FROM run_command_on_workers($$SELECT count(*) FROM (SELECT pg_terminate_backend(pid, 1000) FROM pg_stat_activity WHERE query ilike '%socket_test_table%' AND pid !=pg_backend_pid()) as foo$$) ORDER BY result;
|
||||||
|
INSERT INTO socket_test_table VALUES (1);
|
||||||
|
|
||||||
|
-- single row UPDATE
|
||||||
|
UPDATE socket_test_table SET value = 15 WHERE id = 1;
|
||||||
|
SELECT result FROM run_command_on_workers($$SELECT count(*) FROM (SELECT pg_terminate_backend(pid, 1000) FROM pg_stat_activity WHERE query ilike '%socket_test_table%' AND pid !=pg_backend_pid()) as foo$$) ORDER BY result;
|
||||||
|
UPDATE socket_test_table SET value = 15 WHERE id = 1;
|
||||||
|
|
||||||
|
-- we cannot recover in a transaction block
|
||||||
|
BEGIN;
|
||||||
|
SELECT count(*) FROM socket_test_table;
|
||||||
|
|
||||||
|
-- kill all the cached backends on the workers
|
||||||
|
SELECT result FROM run_command_on_workers($$SELECT count(*) FROM (SELECT pg_terminate_backend(pid, 1000) FROM pg_stat_activity WHERE query ilike '%socket_test_table%' AND pid !=pg_backend_pid()) as foo$$);
|
||||||
|
|
||||||
|
SELECT count(*) FROM socket_test_table;
|
||||||
|
ROLLBACK;
|
||||||
|
|
||||||
|
-- although should have no difference, we can recover from the failures on the workers as well
|
||||||
|
|
||||||
|
\c - - - :worker_1_port
|
||||||
|
SET search_path TO socket_close;
|
||||||
|
|
||||||
|
SET citus.max_adaptive_executor_pool_size TO 1;
|
||||||
|
SET citus.max_cached_conns_per_worker TO 1;
|
||||||
|
SET citus.force_max_query_parallelization TO ON;
|
||||||
|
SELECT count(*) FROM socket_test_table;
|
||||||
|
SELECT result FROM run_command_on_workers($$SELECT count(*) FROM (SELECT pg_terminate_backend(pid, 1000) FROM pg_stat_activity WHERE query ilike '%socket_test_table_%' AND query not ilike '%pg_stat_activity%' AND pid !=pg_backend_pid()) as foo$$);
|
||||||
|
SELECT count(*) FROM socket_test_table;
|
||||||
|
|
||||||
|
\c - - - :master_port
|
||||||
|
|
||||||
|
SET client_min_messages TO ERROR;
|
||||||
|
DROP SCHEMA socket_close CASCADE;
|
Loading…
Reference in New Issue