mirror of https://github.com/citusdata/citus.git
Properly terminate connections at the end session
Citus coordinator (or MX nodes) caches `citus.max_cached_conns_per_worker` connections per node. This means that, those connections are not terminated after each statement. Instead, cached to avoid the cost of re-establishment. This is crucial for OLTP performance. The problem with that approach is that, we never properly handle the termnation of those cached connections. For instance, when a session on the coordinator disconnects, you'd see the following logs on the workers: ``` 2020-03-20 09:13:39.454 CET [64028] LOG: could not receive data from client: Connection reset by peer ``` With this patch, we're terminating the cached connections properly at the end of the connection.pull/3631/head
parent
8deb805338
commit
7b4eb9611b
|
@ -494,6 +494,37 @@ CloseConnection(MultiConnection *connection)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* ShutdownAllConnections shutdowns all the MultiConnections in the
|
||||||
|
* ConnectionHash.
|
||||||
|
*
|
||||||
|
* This function is intended to be called atexit() of the backend, so
|
||||||
|
* that the cached connections are closed properly. Calling this function
|
||||||
|
* at another point in the code could be dangerous, so think twice if you
|
||||||
|
* need to call this function.
|
||||||
|
*/
|
||||||
|
void
|
||||||
|
ShutdownAllConnections(void)
|
||||||
|
{
|
||||||
|
ConnectionHashEntry *entry = NULL;
|
||||||
|
HASH_SEQ_STATUS status;
|
||||||
|
|
||||||
|
hash_seq_init(&status, ConnectionHash);
|
||||||
|
while ((entry = (ConnectionHashEntry *) hash_seq_search(&status)) != NULL)
|
||||||
|
{
|
||||||
|
dlist_iter iter;
|
||||||
|
|
||||||
|
dlist_foreach(iter, entry->connections)
|
||||||
|
{
|
||||||
|
MultiConnection *connection =
|
||||||
|
dlist_container(MultiConnection, connectionNode, iter.cur);
|
||||||
|
|
||||||
|
ShutdownConnection(connection);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* ShutdownConnection, if necessary cancels the currently running statement,
|
* ShutdownConnection, if necessary cancels the currently running statement,
|
||||||
* and then closes the underlying libpq connection. The MultiConnection
|
* and then closes the underlying libpq connection. The MultiConnection
|
||||||
|
|
|
@ -87,6 +87,7 @@ static char *CitusVersion = CITUS_VERSION;
|
||||||
|
|
||||||
void _PG_init(void);
|
void _PG_init(void);
|
||||||
|
|
||||||
|
static void CitusBackendAtExit(void);
|
||||||
static void ResizeStackToMaximumDepth(void);
|
static void ResizeStackToMaximumDepth(void);
|
||||||
static void multi_log_hook(ErrorData *edata);
|
static void multi_log_hook(ErrorData *edata);
|
||||||
static void CreateRequiredDirectories(void);
|
static void CreateRequiredDirectories(void);
|
||||||
|
@ -270,6 +271,8 @@ _PG_init(void)
|
||||||
InitPlacementConnectionManagement();
|
InitPlacementConnectionManagement();
|
||||||
InitializeCitusQueryStats();
|
InitializeCitusQueryStats();
|
||||||
|
|
||||||
|
atexit(CitusBackendAtExit);
|
||||||
|
|
||||||
/* enable modification of pg_catalog tables during pg_upgrade */
|
/* enable modification of pg_catalog tables during pg_upgrade */
|
||||||
if (IsBinaryUpgrade)
|
if (IsBinaryUpgrade)
|
||||||
{
|
{
|
||||||
|
@ -279,6 +282,18 @@ _PG_init(void)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* CitusBackendAtExit is called atexit of the backend for the purposes of
|
||||||
|
* any clean-up needed.
|
||||||
|
*/
|
||||||
|
static void
|
||||||
|
CitusBackendAtExit(void)
|
||||||
|
{
|
||||||
|
/* properly close all the cached connections */
|
||||||
|
ShutdownAllConnections();
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Stack size increase during high memory load may cause unexpected crashes.
|
* Stack size increase during high memory load may cause unexpected crashes.
|
||||||
* With this alloca call, we are increasing stack size explicitly, so that if
|
* With this alloca call, we are increasing stack size explicitly, so that if
|
||||||
|
|
|
@ -221,6 +221,7 @@ extern MultiConnection * StartNodeUserDatabaseConnection(uint32 flags,
|
||||||
const char *database);
|
const char *database);
|
||||||
extern void CloseNodeConnectionsAfterTransaction(char *nodeName, int nodePort);
|
extern void CloseNodeConnectionsAfterTransaction(char *nodeName, int nodePort);
|
||||||
extern void CloseConnection(MultiConnection *connection);
|
extern void CloseConnection(MultiConnection *connection);
|
||||||
|
extern void ShutdownAllConnections(void);
|
||||||
extern void ShutdownConnection(MultiConnection *connection);
|
extern void ShutdownConnection(MultiConnection *connection);
|
||||||
|
|
||||||
/* dealing with a connection */
|
/* dealing with a connection */
|
||||||
|
|
Loading…
Reference in New Issue