From eca9fd358d73fc49f8b61ecbc3519270c73399d5 Mon Sep 17 00:00:00 2001 From: Onder Kalaci Date: Tue, 30 Aug 2022 18:51:05 +0300 Subject: [PATCH] Fix bugs --- .../connection/connection_management.c | 168 +++++++++++------- .../regress/expected/detect_conn_close.out | 148 +++++++++++++++ .../regress/expected/detect_conn_close_0.out | 9 + src/test/regress/sql/detect_conn_close.sql | 85 +++++++++ 4 files changed, 341 insertions(+), 69 deletions(-) create mode 100644 src/test/regress/expected/detect_conn_close.out create mode 100644 src/test/regress/expected/detect_conn_close_0.out create mode 100644 src/test/regress/sql/detect_conn_close.sql diff --git a/src/backend/distributed/connection/connection_management.c b/src/backend/distributed/connection/connection_management.c index bf23cd26a..3a9dcf0ca 100644 --- a/src/backend/distributed/connection/connection_management.c +++ b/src/backend/distributed/connection/connection_management.c @@ -58,9 +58,13 @@ static int ConnectionHashCompare(const void *a, const void *b, Size keysize); static void StartConnectionEstablishment(MultiConnection *connectionn, ConnectionHashKey *key); static MultiConnection * FindAvailableConnection(List *connections, uint32 flags); -static List * EnsureAndGetHealthyConnections(dlist_head *connections, bool - checkTransactionState); +static List * EnsureTxStateAndGetHealthyConnections(dlist_head *connections, + bool checkTransactionState); static bool RemoteSocketClosed(MultiConnection *connection); +#if PG_VERSION_NUM >= PG_VERSION_15 +static bool ProcessWaitEventsForSocketClose(WaitEvent *events, int eventCount, + bool *socketClosed); +#endif static void ErrorIfMultipleMetadataConnectionExists(List *connections); static void FreeConnParamsHashEntryFields(ConnParamsHashEntry *entry); static void AfterXactHostConnectionHandling(ConnectionHashEntry *entry, bool isCommit); @@ -338,7 +342,8 @@ StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port, { bool checkTransactionState 
= !(flags & OUTSIDE_TRANSACTION); List *healthyConnections = - EnsureAndGetHealthyConnections(entry->connections, checkTransactionState); + EnsureTxStateAndGetHealthyConnections(entry->connections, + checkTransactionState); /* check connection cache for a connection that's not already in use */ MultiConnection *connection = FindAvailableConnection(healthyConnections, flags); @@ -530,18 +535,19 @@ FindAvailableConnection(List *connections, uint32 flags) /* - * EnsureAndGetHealthyConnections is a helper function that goes over the - * input connections, and: - * - Errors for the connections that are not healthy but still part of a - * remote transaction when checkTransactionState=true - * - Skips connections that are not healthy, marks them to be closed + * EnsureTxStateAndGetHealthyConnections is a helper function that goes + * over the input connections, and: + * - Errors for the connections whose remote transaction is broken + * when checkTransactionState=true + * - Skips connections that are not healthy, marks those to be closed * at the end of the transaction such that they do not linger around * * And, finally returns the list (unless already errored out) of connections - * that can be reused. + * that are healthy and can be reused.
*/ static List * -EnsureAndGetHealthyConnections(dlist_head *connections, bool checkTransactionState) +EnsureTxStateAndGetHealthyConnections(dlist_head *connections, + bool checkTransactionState) { List *healthyConnections = NIL; @@ -550,16 +556,6 @@ EnsureAndGetHealthyConnections(dlist_head *connections, bool checkTransactionSta { MultiConnection *connection = dlist_container(MultiConnection, connectionNode, iter.cur); - bool healthyConnection = true; - - if (connection->pgConn == NULL || PQsocket(connection->pgConn) == -1 || - PQstatus(connection->pgConn) != CONNECTION_OK || - connection->connectionState != MULTI_CONNECTION_CONNECTED || - RemoteSocketClosed(connection)) - { - /* connection is not healthy */ - healthyConnection = false; - } RemoteTransaction *transaction = &connection->remoteTransaction; bool inRemoteTrasaction = @@ -574,24 +570,23 @@ EnsureAndGetHealthyConnections(dlist_head *connections, bool checkTransactionSta { transaction->transactionFailed = true; } + + if (transaction->transactionFailed && + (transaction->transactionCritical || + !dlist_is_empty(&connection->referencedPlacements))) + { + /* + * If a connection is executing a critical transaction or accessed any + * placements, we should not continue the execution. + */ + ReportConnectionError(connection, ERROR); + } } - if (healthyConnection) - { - healthyConnections = lappend(healthyConnections, connection); - } - else if (checkTransactionState && inRemoteTrasaction && - transaction->transactionFailed && - (transaction->transactionCritical || - !dlist_is_empty(&connection->referencedPlacements))) - { - /* - * If a connection is executing a critical transaction or accessed any - * placements, we should not continue the execution. 
- */ - ReportConnectionError(connection, ERROR); - } - else + if (connection->pgConn == NULL || PQsocket(connection->pgConn) == -1 || + PQstatus(connection->pgConn) == CONNECTION_BAD || + connection->connectionState == MULTI_CONNECTION_FAILED || + RemoteSocketClosed(connection)) { /* * If a connection is not usable and we do not need to throw an error, @@ -599,6 +594,11 @@ EnsureAndGetHealthyConnections(dlist_head *connections, bool checkTransactionSta */ connection->forceCloseAtTransactionEnd = true; } + else + { + /* connection is healthy */ + healthyConnections = lappend(healthyConnections, connection); + } } return healthyConnections; @@ -623,9 +623,7 @@ RemoteSocketClosed(MultiConnection *connection) return false; } - int eventCount = 0; WaitEvent events[3]; - WaitEventSet *waitEventSet = CreateWaitEventSet(CurrentMemoryContext, 3); int sock = PQsocket(connection->pgConn); @@ -656,13 +654,45 @@ RemoteSocketClosed(MultiConnection *connection) CitusAddWaitEventSetToSet(waitEventSet, WL_LATCH_SET, PGINVALID_SOCKET, MyLatch, NULL); - long timeout = 0; /* do not wait at all */ - int eventIndex = 0; + bool retry = false; + do { + long timeout = 0; /* do not wait at all */ + int eventCount = + WaitEventSetWait(waitEventSet, timeout, events, + 3, WAIT_EVENT_CLIENT_READ); -retry: - eventCount = - WaitEventSetWait(waitEventSet, timeout, events, - 3, WAIT_EVENT_CLIENT_READ); + retry = ProcessWaitEventsForSocketClose(events, eventCount, &socketClosed); + if (socketClosed) + { + /* socket is closed, no further information needed */ + break; + } + } while (retry); + + + FreeWaitEventSet(waitEventSet); +#endif + + return socketClosed; +} + + +#if PG_VERSION_NUM >= PG_VERSION_15 + +/* + * ProcessWaitEventsForSocketClose goes over the events and mainly looks for + * WL_SOCKET_CLOSED event. When found, socketClosed is set to true. + * + * The function also returns true if we are required to re-check the events. We + * might need to re-check if latch is set. 
A latch event might be preventing + * other events from being reported. Poll and check again. + */ +static bool +ProcessWaitEventsForSocketClose(WaitEvent *events, int eventCount, bool *socketClosed) +{ + *socketClosed = false; + + int eventIndex = 0; for (; eventIndex < eventCount; eventIndex++) { WaitEvent *event = &events[eventIndex]; @@ -674,33 +704,27 @@ retry: if (event->events & WL_SOCKET_CLOSED) { - socketClosed = true; + *socketClosed = true; /* we found what we are searching for */ - break; + return false; } if (event->events & WL_LATCH_SET) { - /* - * A latch event might be preventing other events from being - * reported. Reset it and poll again. No need to restore it - * because no code should expect latches to survive across - * CHECK_FOR_INTERRUPTS(). - */ ResetLatch(MyLatch); - goto retry; + /* the caller should retry */ + return true; } } - FreeWaitEventSet(waitEventSet); -#endif - - return socketClosed; + return false; } +#endif + /* * ErrorIfMultipleMetadataConnectionExists throws an error if the * input connection dlist contains more than one metadata connections. 
@@ -790,7 +814,7 @@ ConnectionAvailableToNode(char *hostName, int nodePort, const char *userName, bool checkTransactionState = true; List *healthyConnections = - EnsureAndGetHealthyConnections(entry->connections, checkTransactionState); + EnsureTxStateAndGetHealthyConnections(entry->connections, checkTransactionState); int flags = 0; MultiConnection *connection = FindAvailableConnection(healthyConnections, flags); @@ -1026,7 +1050,8 @@ WaitEventSetFromMultiConnectionStates(List *connections, int *waitCount) *waitCount = 0; } - WaitEventSet *waitEventSet = CreateWaitEventSet(CurrentMemoryContext, eventSetSize); + WaitEventSet *waitEventSet = CreateWaitEventSet(CurrentMemoryContext, + eventSetSize); EnsureReleaseResource((MemoryContextCallbackFunction) (&FreeWaitEventSet), waitEventSet); @@ -1034,7 +1059,8 @@ WaitEventSetFromMultiConnectionStates(List *connections, int *waitCount) * Put the wait events for the signal latch and postmaster death at the end such that * event index + pendingConnectionsStartIndex = the connection index in the array. 
*/ - AddWaitEventToSet(waitEventSet, WL_POSTMASTER_DEATH, PGINVALID_SOCKET, NULL, NULL); + AddWaitEventToSet(waitEventSet, WL_POSTMASTER_DEATH, PGINVALID_SOCKET, NULL, + NULL); AddWaitEventToSet(waitEventSet, WL_LATCH_SET, PGINVALID_SOCKET, MyLatch, NULL); numEventsAdded += 2; @@ -1066,8 +1092,9 @@ WaitEventSetFromMultiConnectionStates(List *connections, int *waitCount) errmsg("connection establishment for node %s:%d failed", connectionState->connection->hostname, connectionState->connection->port), - errhint("Check both the local and remote server logs for the " - "connection establishment errors."))); + errhint( + "Check both the local and remote server logs for the " + "connection establishment errors."))); } numEventsAdded++; @@ -1225,12 +1252,14 @@ FinishConnectionListEstablishment(List *multiConnectionList) if (!success) { ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE), - errmsg("connection establishment for node %s:%d " - "failed", connection->hostname, - connection->port), - errhint("Check both the local and remote server " - "logs for the connection establishment " - "errors."))); + errmsg( + "connection establishment for node %s:%d " + "failed", connection->hostname, + connection->port), + errhint( + "Check both the local and remote server " + "logs for the connection establishment " + "errors."))); } } @@ -1515,7 +1544,8 @@ FindOrCreateConnParamsEntry(ConnectionHashKey *key) } /* if not found or not valid, compute them from GUC, runtime, etc. 
*/ - GetConnParams(key, &entry->keywords, &entry->values, &entry->runtimeParamStart, + GetConnParams(key, &entry->keywords, &entry->values, + &entry->runtimeParamStart, ConnectionContext); entry->isValid = true; diff --git a/src/test/regress/expected/detect_conn_close.out b/src/test/regress/expected/detect_conn_close.out new file mode 100644 index 000000000..79054cfa1 --- /dev/null +++ b/src/test/regress/expected/detect_conn_close.out @@ -0,0 +1,148 @@ +-- +-- PG15+ test as WL_SOCKET_CLOSED exposed for PG15+ +-- +SHOW server_version \gset +SELECT substring(:'server_version', '\d+')::int >= 15 AS server_version_ge_15 +\gset +\if :server_version_ge_15 +\else +\q +\endif +CREATE SCHEMA socket_close; +SET search_path TO socket_close; +SET citus.shard_count TO 32; +SET citus.shard_replication_factor TO 1; +CREATE TABLE socket_test_table(id bigserial, value text); +SELECT create_distributed_table('socket_test_table', 'id'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO socket_test_table (value) SELECT i::text FROM generate_series(0,100)i; +-- first, simulate that we only have one cached connection per node +SET citus.max_adaptive_executor_pool_size TO 1; +SET citus.max_cached_conns_per_worker TO 1; +SELECT count(*) FROM socket_test_table; + count +--------------------------------------------------------------------- + 101 +(1 row) + +-- kill all the cached backends on the workers +SELECT result FROM run_command_on_workers($$SELECT count(*) FROM (SELECT pg_terminate_backend(pid) FROM pg_stat_activity WHERE query ilike '%socket_test_table%' AND pid !=pg_backend_pid()) as foo$$); + result +--------------------------------------------------------------------- + 1 + 1 +(2 rows) + +-- show that none remains +SELECT result FROM run_command_on_workers($$SELECT count(*) FROM (SELECT pid FROM pg_stat_activity WHERE query ilike '%socket_test_table%' AND pid !=pg_backend_pid()) as foo$$); + result 
+--------------------------------------------------------------------- + 0 + 0 +(2 rows) + +-- even though the cached connections closed, the execution recovers and establishes new connections +SELECT count(*) FROM socket_test_table; + count +--------------------------------------------------------------------- + 101 +(1 row) + +-- now, use 16 connections per worker, we can still recover all connections +SET citus.max_adaptive_executor_pool_size TO 16; +SET citus.max_cached_conns_per_worker TO 16; +SET citus.force_max_query_parallelization TO ON; +SELECT count(*) FROM socket_test_table; + count +--------------------------------------------------------------------- + 101 +(1 row) + +SELECT result FROM run_command_on_workers($$SELECT count(*) FROM (SELECT pg_terminate_backend(pid) FROM pg_stat_activity WHERE query ilike '%socket_test_table%' AND pid !=pg_backend_pid()) as foo$$); + result +--------------------------------------------------------------------- + 16 + 16 +(2 rows) + +-- show that none remains +SELECT result FROM run_command_on_workers($$SELECT count(*) FROM (SELECT pid FROM pg_stat_activity WHERE query ilike '%socket_test_table%' AND pid !=pg_backend_pid()) as foo$$); + result +--------------------------------------------------------------------- + 0 + 0 +(2 rows) + +SELECT count(*) FROM socket_test_table; + count +--------------------------------------------------------------------- + 101 +(1 row) + +-- now, get back to sane defaults +SET citus.max_adaptive_executor_pool_size TO 1; +SET citus.max_cached_conns_per_worker TO 1; +SET citus.force_max_query_parallelization TO OFF; +-- we can recover for modification queries as well +-- single row INSERT +INSERT INTO socket_test_table VALUES (1); +SELECT result FROM run_command_on_workers($$SELECT count(*) FROM (SELECT pg_terminate_backend(pid) FROM pg_stat_activity WHERE query ilike '%socket_test_table%' AND pid !=pg_backend_pid()) as foo$$) ORDER BY result; + result 
+--------------------------------------------------------------------- + 1 + 1 +(2 rows) + +-- show that none remains +SELECT result FROM run_command_on_workers($$SELECT count(*) FROM (SELECT pid FROM pg_stat_activity WHERE query ilike '%socket_test_table%' AND pid !=pg_backend_pid()) as foo$$); + result +--------------------------------------------------------------------- + 0 + 0 +(2 rows) + +INSERT INTO socket_test_table VALUES (1); +-- single row UPDATE +UPDATE socket_test_table SET value = 15 WHERE id = 1; +SELECT result FROM run_command_on_workers($$SELECT count(*) FROM (SELECT pg_terminate_backend(pid) FROM pg_stat_activity WHERE query ilike '%socket_test_table%' AND pid !=pg_backend_pid()) as foo$$) ORDER BY result; + result +--------------------------------------------------------------------- + 0 + 1 +(2 rows) + +-- show that none remains +SELECT result FROM run_command_on_workers($$SELECT count(*) FROM (SELECT pid FROM pg_stat_activity WHERE query ilike '%socket_test_table%' AND pid !=pg_backend_pid()) as foo$$); + result +--------------------------------------------------------------------- + 0 + 0 +(2 rows) + +UPDATE socket_test_table SET value = 15 WHERE id = 1; +-- we cannot recover in a transaction block +BEGIN; + SELECT count(*) FROM socket_test_table; + count +--------------------------------------------------------------------- + 103 +(1 row) + + -- kill all the cached backends on the workers + SELECT result FROM run_command_on_workers($$SELECT count(*) FROM (SELECT pg_terminate_backend(pid) FROM pg_stat_activity WHERE query ilike '%socket_test_table%' AND pid !=pg_backend_pid()) as foo$$); + result +--------------------------------------------------------------------- + 1 + 1 +(2 rows) + + SELECT count(*) FROM socket_test_table; +ERROR: terminating connection due to administrator command +CONTEXT: while executing command on localhost:xxxxx +ROLLBACK; +SET client_min_messages TO ERROR; +DROP SCHEMA socket_close CASCADE; diff --git 
a/src/test/regress/expected/detect_conn_close_0.out b/src/test/regress/expected/detect_conn_close_0.out new file mode 100644 index 000000000..27e9787c6 --- /dev/null +++ b/src/test/regress/expected/detect_conn_close_0.out @@ -0,0 +1,9 @@ +-- +-- PG15+ test as WL_SOCKET_CLOSED exposed for PG15+ +-- +SHOW server_version \gset +SELECT substring(:'server_version', '\d+')::int >= 15 AS server_version_ge_15 +\gset +\if :server_version_ge_15 +\else +\q diff --git a/src/test/regress/sql/detect_conn_close.sql b/src/test/regress/sql/detect_conn_close.sql new file mode 100644 index 000000000..f4765a568 --- /dev/null +++ b/src/test/regress/sql/detect_conn_close.sql @@ -0,0 +1,85 @@ +-- +-- PG15+ test as WL_SOCKET_CLOSED exposed for PG15+ +-- +SHOW server_version \gset +SELECT substring(:'server_version', '\d+')::int >= 15 AS server_version_ge_15 +\gset +\if :server_version_ge_15 +\else +\q +\endif + +CREATE SCHEMA socket_close; +SET search_path TO socket_close; + +SET citus.shard_count TO 32; +SET citus.shard_replication_factor TO 1; + +CREATE TABLE socket_test_table(id bigserial, value text); +SELECT create_distributed_table('socket_test_table', 'id'); +INSERT INTO socket_test_table (value) SELECT i::text FROM generate_series(0,100)i; + +-- first, simulate that we only have one cached connection per node +SET citus.max_adaptive_executor_pool_size TO 1; +SET citus.max_cached_conns_per_worker TO 1; +SELECT count(*) FROM socket_test_table; + +-- kill all the cached backends on the workers +SELECT result FROM run_command_on_workers($$SELECT count(*) FROM (SELECT pg_terminate_backend(pid) FROM pg_stat_activity WHERE query ilike '%socket_test_table%' AND pid !=pg_backend_pid()) as foo$$); + +-- show that none remains +SELECT result FROM run_command_on_workers($$SELECT count(*) FROM (SELECT pid FROM pg_stat_activity WHERE query ilike '%socket_test_table%' AND pid !=pg_backend_pid()) as foo$$); + +-- even though the cached connections closed, the execution recovers and establishes 
new connections +SELECT count(*) FROM socket_test_table; + + +-- now, use 16 connections per worker, we can still recover all connections +SET citus.max_adaptive_executor_pool_size TO 16; +SET citus.max_cached_conns_per_worker TO 16; +SET citus.force_max_query_parallelization TO ON; +SELECT count(*) FROM socket_test_table; +SELECT result FROM run_command_on_workers($$SELECT count(*) FROM (SELECT pg_terminate_backend(pid) FROM pg_stat_activity WHERE query ilike '%socket_test_table%' AND pid !=pg_backend_pid()) as foo$$); +-- show that none remains +SELECT result FROM run_command_on_workers($$SELECT count(*) FROM (SELECT pid FROM pg_stat_activity WHERE query ilike '%socket_test_table%' AND pid !=pg_backend_pid()) as foo$$); + +SELECT count(*) FROM socket_test_table; + +-- now, get back to sane defaults +SET citus.max_adaptive_executor_pool_size TO 1; +SET citus.max_cached_conns_per_worker TO 1; +SET citus.force_max_query_parallelization TO OFF; + +-- we can recover for modification queries as well + +-- single row INSERT +INSERT INTO socket_test_table VALUES (1); +SELECT result FROM run_command_on_workers($$SELECT count(*) FROM (SELECT pg_terminate_backend(pid) FROM pg_stat_activity WHERE query ilike '%socket_test_table%' AND pid !=pg_backend_pid()) as foo$$) ORDER BY result; +-- show that none remains +SELECT result FROM run_command_on_workers($$SELECT count(*) FROM (SELECT pid FROM pg_stat_activity WHERE query ilike '%socket_test_table%' AND pid !=pg_backend_pid()) as foo$$); + +INSERT INTO socket_test_table VALUES (1); + + +-- single row UPDATE +UPDATE socket_test_table SET value = 15 WHERE id = 1; +SELECT result FROM run_command_on_workers($$SELECT count(*) FROM (SELECT pg_terminate_backend(pid) FROM pg_stat_activity WHERE query ilike '%socket_test_table%' AND pid !=pg_backend_pid()) as foo$$) ORDER BY result; +-- show that none remains +SELECT result FROM run_command_on_workers($$SELECT count(*) FROM (SELECT pid FROM pg_stat_activity WHERE query ilike 
'%socket_test_table%' AND pid !=pg_backend_pid()) as foo$$); + +UPDATE socket_test_table SET value = 15 WHERE id = 1; + + +-- we cannot recover in a transaction block +BEGIN; + SELECT count(*) FROM socket_test_table; + + -- kill all the cached backends on the workers + SELECT result FROM run_command_on_workers($$SELECT count(*) FROM (SELECT pg_terminate_backend(pid) FROM pg_stat_activity WHERE query ilike '%socket_test_table%' AND pid !=pg_backend_pid()) as foo$$); + + SELECT count(*) FROM socket_test_table; +ROLLBACK; + + +SET client_min_messages TO ERROR; +DROP SCHEMA socket_close CASCADE;