From 7b4eb9611bf0243ff79f36939511164df6d5ed62 Mon Sep 17 00:00:00 2001 From: Onder Kalaci Date: Fri, 20 Mar 2020 10:09:43 +0100 Subject: [PATCH] Properly terminate connections at the end session Citus coordinator (or MX nodes) caches `citus.max_cached_conns_per_worker` connections per node. This means that, those connections are not terminated after each statement. Instead, cached to avoid the cost of re-establishment. This is crucial for OLTP performance. The problem with that approach is that, we never properly handle the termnation of those cached connections. For instance, when a session on the coordinator disconnects, you'd see the following logs on the workers: ``` 2020-03-20 09:13:39.454 CET [64028] LOG: could not receive data from client: Connection reset by peer ``` With this patch, we're terminating the cached connections properly at the end of the connection. --- .../connection/connection_management.c | 31 +++++++++++++++++++ src/backend/distributed/shared_library_init.c | 15 +++++++++ .../distributed/connection_management.h | 1 + 3 files changed, 47 insertions(+) diff --git a/src/backend/distributed/connection/connection_management.c b/src/backend/distributed/connection/connection_management.c index 9f34e5ac0..900f304a3 100644 --- a/src/backend/distributed/connection/connection_management.c +++ b/src/backend/distributed/connection/connection_management.c @@ -494,6 +494,37 @@ CloseConnection(MultiConnection *connection) } +/* + * ShutdownAllConnections shutdowns all the MultiConnections in the + * ConnectionHash. + * + * This function is intended to be called atexit() of the backend, so + * that the cached connections are closed properly. Calling this function + * at another point in the code could be dangerous, so think twice if you + * need to call this function. + */ +void +ShutdownAllConnections(void) +{ + ConnectionHashEntry *entry = NULL; + HASH_SEQ_STATUS status; + + hash_seq_init(&status, ConnectionHash); + while ((entry = (ConnectionHashEntry *) hash_seq_search(&status)) != NULL) + { + dlist_iter iter; + + dlist_foreach(iter, entry->connections) + { + MultiConnection *connection = + dlist_container(MultiConnection, connectionNode, iter.cur); + + ShutdownConnection(connection); + } + } +} + + /* * ShutdownConnection, if necessary cancels the currently running statement, * and then closes the underlying libpq connection. The MultiConnection diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index c82f64553..3c12eec8c 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -87,6 +87,7 @@ static char *CitusVersion = CITUS_VERSION; void _PG_init(void); +static void CitusBackendAtExit(void); static void ResizeStackToMaximumDepth(void); static void multi_log_hook(ErrorData *edata); static void CreateRequiredDirectories(void); @@ -270,6 +271,8 @@ _PG_init(void) InitPlacementConnectionManagement(); InitializeCitusQueryStats(); + atexit(CitusBackendAtExit); + /* enable modification of pg_catalog tables during pg_upgrade */ if (IsBinaryUpgrade) { @@ -279,6 +282,18 @@ _PG_init(void) } +/* + * CitusBackendAtExit is called atexit of the backend for the purposes of + * any clean-up needed. + */ +static void +CitusBackendAtExit(void) +{ + /* properly close all the cached connections */ + ShutdownAllConnections(); +} + + /* * Stack size increase during high memory load may cause unexpected crashes. * With this alloca call, we are increasing stack size explicitly, so that if diff --git a/src/include/distributed/connection_management.h b/src/include/distributed/connection_management.h index 571e03c38..ac25e8662 100644 --- a/src/include/distributed/connection_management.h +++ b/src/include/distributed/connection_management.h @@ -221,6 +221,7 @@ extern MultiConnection * StartNodeUserDatabaseConnection(uint32 flags, const char *database); extern void CloseNodeConnectionsAfterTransaction(char *nodeName, int nodePort); extern void CloseConnection(MultiConnection *connection); +extern void ShutdownAllConnections(void); extern void ShutdownConnection(MultiConnection *connection); /* dealing with a connection */