From befb5693c8dac866d4fd04f46a76966a38d8ee52 Mon Sep 17 00:00:00 2001
From: Onder Kalaci
Date: Fri, 2 Sep 2022 08:08:50 +0300
Subject: [PATCH] - Use CloseConnection() over ShutdownConnection(), because
 the latter only partially closes the connection; for example, it does not
 remove the connection from InProgressTXes
 - Use StartNodeUserDatabaseConnection() over StartConnectionEstablishment(),
 because we should use the connectionFlags properly
 - Move the task back to sessionReadyQueueNode: let the state machines decide
 what should happen
 - Add some tests back

---
 .../connection/connection_management.c       |  17 +-
 .../distributed/executor/adaptive_executor.c |  42 ++++-
 .../distributed/connection_management.h      |   2 +-
 .../regress/expected/detect_conn_close.out   | 149 ++++++++++++++++++
 .../regress/expected/detect_conn_close_0.out |   9 ++
 src/test/regress/multi_schedule              |   2 +-
 src/test/regress/sql/detect_conn_close.sql   |  88 +++++++++++
 7 files changed, 291 insertions(+), 18 deletions(-)
 create mode 100644 src/test/regress/expected/detect_conn_close.out
 create mode 100644 src/test/regress/expected/detect_conn_close_0.out
 create mode 100644 src/test/regress/sql/detect_conn_close.sql

diff --git a/src/backend/distributed/connection/connection_management.c b/src/backend/distributed/connection/connection_management.c
index 516424748..37a4f4177 100644
--- a/src/backend/distributed/connection/connection_management.c
+++ b/src/backend/distributed/connection/connection_management.c
@@ -1483,17 +1483,18 @@ ShouldShutdownConnection(MultiConnection *connection, const int cachedConnection


 /*
- * RestartConnection starts a new connection attempt for the given
- * MultiConnection.
- *
- * We assume that we already went through all the other initialization steps in
- * StartNodeUserDatabaseConnection, such as incrementing shared connection
- * counters.
+ * RestartPQConnection first closes the underlying libpq connection of the
+ * input MultiConnection, and then starts a new connection attempt to the
+ * node that the connection was connected to.
  */
 void
-RestartConnection(MultiConnection *connection)
+RestartPQConnection(MultiConnection *connection)
 {
-	ShutdownConnection(connection);
+	if (connection->pgConn != NULL)
+	{
+		PQfinish(connection->pgConn);
+		connection->pgConn = NULL;
+	}

 	ConnectionHashKey key;
 	strlcpy(key.hostname, connection->hostname, MAX_NODE_LENGTH);
diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c
index 8f9577d7f..932cac641 100644
--- a/src/backend/distributed/executor/adaptive_executor.c
+++ b/src/backend/distributed/executor/adaptive_executor.c
@@ -3659,7 +3659,6 @@ ConnectionStateMachine(WorkerSession *session)
 				workerPool->idleConnectionCount--;
 			}

-			/* TODO: refine this check */
 			RemoteTransaction *transaction = &connection->remoteTransaction;
 			if (!transaction->transactionCritical && session->connectionRetryCount == 0)
 			{
@@ -3667,15 +3666,39 @@
 				session->connectionRetryCount++;

 				/*
-				 * Try to connect again, we will reuse the same MultiConnection
-				 * and keep it as claimed.
+				 * Close the underlying pgConn connection, and establish
+				 * a new one. We refrain from using ShutdownConnection() or
+				 * CloseConnection(), as at this point we already have a
+				 * connection with the proper flags (e.g., WAIT_FOR_CONNECTION vs
+				 * OPTIONAL_CONNECTION, or a metadata connection, etc.).
+				 *
+				 * Also, it is sufficient to just reconnect the underlying
+				 * pgConn and keep the executor state as-is.
+ * + * Most commonly this would be used when a remote socket of + * a cached connection is closed, and we are at the start of + * the execution. */ - RestartConnection(connection); + RestartPQConnection(connection); - /* socket will have changed */ + /* + * The currentTask should be re-assigned to the newConnection, + * but let the transaction state machine do that. + */ + if (session->currentTask) + { + /* the task could not have ended */ + Assert(INSTR_TIME_IS_ZERO(session->currentTask->endTime)); + + /* we are going to re-start the task execution */ + INSTR_TIME_SET_ZERO(session->currentTask->startTime); + } + + /* connection changed, so we need to rebuild */ execution->rebuildWaitEventSet = true; connection->connectionState = MULTI_CONNECTION_INITIAL; + break; } @@ -3843,7 +3866,10 @@ HandleMultiConnectionSuccess(WorkerSession *session) connection->connectionEstablishmentEnd)))); workerPool->activeConnectionCount++; - workerPool->idleConnectionCount++; + if (session->currentTask == NULL) + { + workerPool->idleConnectionCount++; + } } @@ -3958,8 +3984,8 @@ TransactionStateMachine(WorkerSession *session) } else { - TaskPlacementExecution *placementExecution = PopPlacementExecution( - session); + TaskPlacementExecution *placementExecution = + PopPlacementExecution(session); if (placementExecution == NULL) { /* diff --git a/src/include/distributed/connection_management.h b/src/include/distributed/connection_management.h index 9615374e7..0bedab5ba 100644 --- a/src/include/distributed/connection_management.h +++ b/src/include/distributed/connection_management.h @@ -294,7 +294,7 @@ extern MultiConnection * StartNodeUserDatabaseConnection(uint32 flags, int32 port, const char *user, const char *database); -extern void RestartConnection(MultiConnection *connection); +extern void RestartPQConnection(MultiConnection *oldConnection); extern void CloseAllConnectionsAfterTransaction(void); extern void CloseNodeConnectionsAfterTransaction(char *nodeName, int nodePort); extern MultiConnection * ConnectionAvailableToNode(char *hostName, int nodePort, diff --git a/src/test/regress/expected/detect_conn_close.out b/src/test/regress/expected/detect_conn_close.out new file mode 100644 index 000000000..172955d0f --- /dev/null +++ b/src/test/regress/expected/detect_conn_close.out @@ -0,0 +1,149 @@ +-- +-- PG15+ test as WL_SOCKET_CLOSED exposed for PG15+ +-- +SHOW server_version \gset +SELECT substring(:'server_version', '\d+')::int >= 15 AS server_version_ge_15 +\gset +\if :server_version_ge_15 +\else +\q +\endif +CREATE SCHEMA socket_close; +SET search_path TO socket_close; +SET citus.shard_count TO 32; +SET citus.shard_replication_factor TO 1; +CREATE TABLE socket_test_table(id bigserial, value text); +SELECT create_distributed_table('socket_test_table', 'id'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO socket_test_table (value) SELECT i::text FROM generate_series(0,100)i; +-- first, simulate that we only have one cached connection per node +SET citus.max_adaptive_executor_pool_size TO 1; +SET citus.max_cached_conns_per_worker TO 1; +SELECT count(*) FROM socket_test_table; + count +--------------------------------------------------------------------- + 101 +(1 row) + +-- kill all the cached backends on the workers +SELECT result FROM run_command_on_workers($$SELECT count(*) FROM (SELECT pg_terminate_backend(pid, 1000) FROM pg_stat_activity WHERE query ilike '%socket_test_table%' AND pid !=pg_backend_pid()) as foo$$); + result 
+--------------------------------------------------------------------- + 1 + 1 +(2 rows) + +-- show that none remains +SELECT result FROM run_command_on_workers($$SELECT count(*) FROM (SELECT pid FROM pg_stat_activity WHERE query ilike '%socket_test_table%' AND pid !=pg_backend_pid()) as foo$$); + result +--------------------------------------------------------------------- + 0 + 0 +(2 rows) + +-- even though the cached connections closed, the execution recovers and establishes new connections +SELECT count(*) FROM socket_test_table; + count +--------------------------------------------------------------------- + 101 +(1 row) + +-- now, use 16 connections per worker, we can still recover all connections +SET citus.max_adaptive_executor_pool_size TO 16; +SET citus.max_cached_conns_per_worker TO 16; +SET citus.force_max_query_parallelization TO ON; +SELECT count(*) FROM socket_test_table; + count +--------------------------------------------------------------------- + 101 +(1 row) + +SELECT result FROM run_command_on_workers($$SELECT count(*) FROM (SELECT pg_terminate_backend(pid, 1000) FROM pg_stat_activity WHERE query ilike '%socket_test_table%' AND pid !=pg_backend_pid()) as foo$$); + result +--------------------------------------------------------------------- + 16 + 16 +(2 rows) + +SELECT count(*) FROM socket_test_table; + count +--------------------------------------------------------------------- + 101 +(1 row) + +-- now, get back to sane defaults +SET citus.max_adaptive_executor_pool_size TO 1; +SET citus.max_cached_conns_per_worker TO 1; +SET citus.force_max_query_parallelization TO OFF; +-- we can recover for modification queries as well +-- single row INSERT +INSERT INTO socket_test_table VALUES (1); +SELECT result FROM run_command_on_workers($$SELECT count(*) FROM (SELECT pg_terminate_backend(pid, 1000) FROM pg_stat_activity WHERE query ilike '%socket_test_table%' AND pid !=pg_backend_pid()) as foo$$) ORDER BY result; + result +--------------------------------------------------------------------- + 1 + 1 +(2 rows) + +INSERT INTO socket_test_table VALUES (1); +-- single row UPDATE +UPDATE socket_test_table SET value = 15 WHERE id = 1; +SELECT result FROM run_command_on_workers($$SELECT count(*) FROM (SELECT pg_terminate_backend(pid, 1000) FROM pg_stat_activity WHERE query ilike '%socket_test_table%' AND pid !=pg_backend_pid()) as foo$$) ORDER BY result; + result +--------------------------------------------------------------------- + 0 + 1 +(2 rows) + +UPDATE socket_test_table SET value = 15 WHERE id = 1; +-- we cannot recover in a transaction block +BEGIN; + SELECT count(*) FROM socket_test_table; + count +--------------------------------------------------------------------- + 103 +(1 row) + + -- kill all the cached backends on the workers + SELECT result FROM run_command_on_workers($$SELECT count(*) FROM (SELECT pg_terminate_backend(pid, 1000) FROM pg_stat_activity WHERE query ilike '%socket_test_table%' AND pid !=pg_backend_pid()) as foo$$); + result +--------------------------------------------------------------------- + 1 + 1 +(2 rows) + + SELECT count(*) FROM socket_test_table; +ERROR: connection to the remote node localhost:xxxxx failed +ROLLBACK; +-- although should have no difference, we can recover from the failures on the workers as well +\c - - - :worker_1_port +SET search_path TO socket_close; +SET citus.max_adaptive_executor_pool_size TO 1; +SET citus.max_cached_conns_per_worker TO 1; +SET citus.force_max_query_parallelization TO ON; +SELECT count(*) FROM 
socket_test_table; + count +--------------------------------------------------------------------- + 103 +(1 row) + +SELECT result FROM run_command_on_workers($$SELECT count(*) FROM (SELECT pg_terminate_backend(pid, 1000) FROM pg_stat_activity WHERE query ilike '%socket_test_table_%' AND query not ilike '%pg_stat_activity%' AND pid !=pg_backend_pid()) as foo$$); + result +--------------------------------------------------------------------- + 1 + 1 +(2 rows) + +SELECT count(*) FROM socket_test_table; + count +--------------------------------------------------------------------- + 103 +(1 row) + +\c - - - :master_port +SET client_min_messages TO ERROR; +DROP SCHEMA socket_close CASCADE; diff --git a/src/test/regress/expected/detect_conn_close_0.out b/src/test/regress/expected/detect_conn_close_0.out new file mode 100644 index 000000000..d3cd58377 --- /dev/null +++ b/src/test/regress/expected/detect_conn_close_0.out @@ -0,0 +1,9 @@ +-- +-- PG15+ test as WL_SOCKET_CLOSED exposed for PG15+ +-- +SHOW server_version \gset +SELECT substring(:'server_version', '\d+')::int >= 15 AS server_version_ge_15 +\gset +\if :server_version_ge_15 +\else +\q \ No newline at end of file diff --git a/src/test/regress/multi_schedule b/src/test/regress/multi_schedule index a92da42fd..3c837f8ed 100644 --- a/src/test/regress/multi_schedule +++ b/src/test/regress/multi_schedule @@ -55,7 +55,7 @@ test: subquery_in_targetlist subquery_in_where subquery_complex_target_list subq test: subquery_prepared_statements test: non_colocated_leaf_subquery_joins non_colocated_subquery_joins test: cte_inline recursive_view_local_table values sequences_with_different_types -test: pg13 pg12 pg15_json json_table_select_only +test: pg13 pg12 pg15_json json_table_select_only detect_conn_close # run pg14 sequentially as it syncs metadata test: pg14 test: pg15 diff --git a/src/test/regress/sql/detect_conn_close.sql b/src/test/regress/sql/detect_conn_close.sql new file mode 100644 index 000000000..be80a6bd0 --- /dev/null +++ b/src/test/regress/sql/detect_conn_close.sql @@ -0,0 +1,88 @@ +-- +-- PG15+ test as WL_SOCKET_CLOSED exposed for PG15+ +-- +SHOW server_version \gset +SELECT substring(:'server_version', '\d+')::int >= 15 AS server_version_ge_15 +\gset +\if :server_version_ge_15 +\else +\q +\endif + +CREATE SCHEMA socket_close; +SET search_path TO socket_close; + +SET citus.shard_count TO 32; +SET citus.shard_replication_factor TO 1; + +CREATE TABLE socket_test_table(id bigserial, value text); +SELECT create_distributed_table('socket_test_table', 'id'); +INSERT INTO socket_test_table (value) SELECT i::text FROM generate_series(0,100)i; + +-- first, simulate that we only have one cached connection per node +SET citus.max_adaptive_executor_pool_size TO 1; +SET citus.max_cached_conns_per_worker TO 1; +SELECT count(*) FROM socket_test_table; + +-- kill all the cached backends on the workers +SELECT result FROM run_command_on_workers($$SELECT count(*) FROM (SELECT pg_terminate_backend(pid, 1000) FROM pg_stat_activity WHERE query ilike '%socket_test_table%' AND pid !=pg_backend_pid()) as foo$$); + +-- show that none remains +SELECT result FROM run_command_on_workers($$SELECT count(*) FROM (SELECT pid FROM pg_stat_activity WHERE query ilike '%socket_test_table%' AND pid !=pg_backend_pid()) as foo$$); + +-- even though the cached connections closed, the execution recovers and establishes new connections +SELECT count(*) FROM socket_test_table; + + +-- now, use 16 connections per worker, we can still recover all connections +SET 
citus.max_adaptive_executor_pool_size TO 16; +SET citus.max_cached_conns_per_worker TO 16; +SET citus.force_max_query_parallelization TO ON; +SELECT count(*) FROM socket_test_table; +SELECT result FROM run_command_on_workers($$SELECT count(*) FROM (SELECT pg_terminate_backend(pid, 1000) FROM pg_stat_activity WHERE query ilike '%socket_test_table%' AND pid !=pg_backend_pid()) as foo$$); + +SELECT count(*) FROM socket_test_table; + +-- now, get back to sane defaults +SET citus.max_adaptive_executor_pool_size TO 1; +SET citus.max_cached_conns_per_worker TO 1; +SET citus.force_max_query_parallelization TO OFF; + +-- we can recover for modification queries as well + +-- single row INSERT +INSERT INTO socket_test_table VALUES (1); +SELECT result FROM run_command_on_workers($$SELECT count(*) FROM (SELECT pg_terminate_backend(pid, 1000) FROM pg_stat_activity WHERE query ilike '%socket_test_table%' AND pid !=pg_backend_pid()) as foo$$) ORDER BY result; +INSERT INTO socket_test_table VALUES (1); + +-- single row UPDATE +UPDATE socket_test_table SET value = 15 WHERE id = 1; +SELECT result FROM run_command_on_workers($$SELECT count(*) FROM (SELECT pg_terminate_backend(pid, 1000) FROM pg_stat_activity WHERE query ilike '%socket_test_table%' AND pid !=pg_backend_pid()) as foo$$) ORDER BY result; +UPDATE socket_test_table SET value = 15 WHERE id = 1; + +-- we cannot recover in a transaction block +BEGIN; + SELECT count(*) FROM socket_test_table; + + -- kill all the cached backends on the workers + SELECT result FROM run_command_on_workers($$SELECT count(*) FROM (SELECT pg_terminate_backend(pid, 1000) FROM pg_stat_activity WHERE query ilike '%socket_test_table%' AND pid !=pg_backend_pid()) as foo$$); + + SELECT count(*) FROM socket_test_table; +ROLLBACK; + +-- although should have no difference, we can recover from the failures on the workers as well + +\c - - - :worker_1_port +SET search_path TO socket_close; + +SET citus.max_adaptive_executor_pool_size TO 1; +SET citus.max_cached_conns_per_worker TO 1; +SET citus.force_max_query_parallelization TO ON; +SELECT count(*) FROM socket_test_table; +SELECT result FROM run_command_on_workers($$SELECT count(*) FROM (SELECT pg_terminate_backend(pid, 1000) FROM pg_stat_activity WHERE query ilike '%socket_test_table_%' AND query not ilike '%pg_stat_activity%' AND pid !=pg_backend_pid()) as foo$$); +SELECT count(*) FROM socket_test_table; + +\c - - - :master_port + +SET client_min_messages TO ERROR; +DROP SCHEMA socket_close CASCADE;
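
The new detect_conn_close test relies on the executor noticing that the remote
end of a cached connection has already closed its socket, which is why it is
gated on PG15+ (WL_SOCKET_CLOSED is only exposed there). Below is a minimal,
self-contained sketch of how such a probe can be written against the standard
PG15 WaitEventSet API; the helper name RemoteSocketClosed and its placement are
illustrative only and are not the exact code added by this patch.

/*
 * Minimal sketch (PG15+): probe whether the remote end of a libpq
 * connection has already closed its socket, using WL_SOCKET_CLOSED.
 * A zero timeout makes WaitEventSetWait() return immediately, so this
 * only polls the current socket state and never blocks.
 *
 * RemoteSocketClosed is a hypothetical helper name used for illustration.
 */
#include "postgres.h"

#include "libpq-fe.h"
#include "storage/latch.h"
#include "utils/wait_event.h"

static bool
RemoteSocketClosed(PGconn *pgConn)
{
#if PG_VERSION_NUM >= 150000
	WaitEvent event;
	bool socketClosed = false;

	/* a throwaway wait event set with room for a single socket event */
	WaitEventSet *waitEventSet = CreateWaitEventSet(CurrentMemoryContext, 1);
	AddWaitEventToSet(waitEventSet, WL_SOCKET_CLOSED, PQsocket(pgConn),
					  NULL, NULL);

	/* timeout 0: poll once, do not wait */
	int eventCount = WaitEventSetWait(waitEventSet, 0, &event, 1,
									  PG_WAIT_EXTENSION);
	if (eventCount > 0 && (event.events & WL_SOCKET_CLOSED))
	{
		socketClosed = true;
	}

	FreeWaitEventSet(waitEventSet);

	return socketClosed;
#else
	/* WL_SOCKET_CLOSED does not exist before PG15; assume the socket is usable */
	return false;
#endif
}

When a probe like this reports that a cached connection has gone away at the
start of an execution, the connection state machine in this patch calls
RestartPQConnection() and re-runs the task on the fresh connection instead of
erroring out, unless the remote transaction is critical or we are inside an
explicit transaction block (see the "we cannot recover in a transaction block"
case in the test above).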