Detect remotely closed sockets and add a single connection retry in the executor (#6404)

PostgreSQL 15 exposes WL_SOCKET_CLOSED in the WaitEventSet API, which is
useful for detecting closed remote sockets. In this patch, we use this
new event to detect closed remote sockets in the executor.

When a closed socket is detected, the executor now has the ability to
retry the connection establishment. Note that the executor can retry
connection establishment only for connections that have not been used
yet. In practice, this patch is mostly useful for preventing the executor
from failing when a cached connection was closed because of a worker node
restart (or worker failover).

In other words, the executor cannot retry connection establishment if we
are in a distributed transaction AND any command has already been sent
over the connection. That would require a more sophisticated retry
mechanism. For now, fixing the above use case is enough.
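
For illustration, here is a minimal sketch (hypothetical helper, not the exact code in this patch; the real implementation is `RemoteSocketClosedForNewSession` in the diff below) of how PG15's `WaitEventSet` API can probe whether the remote end of a cached libpq connection has been closed, without blocking:

```c
#include "postgres.h"

#include "libpq-fe.h"
#include "storage/latch.h"
#include "utils/wait_event.h"

/*
 * Sketch: returns true if the remote end of the given libpq connection
 * appears closed. Uses a throwaway WaitEventSet with a zero timeout, so
 * it never blocks and never consumes other events (e.g., cancellations).
 */
static bool
RemoteSocketAppearsClosed(PGconn *pgConn)
{
	bool closed = false;

#if PG_VERSION_NUM >= 150000
	if (!WaitEventSetCanReportClosed())
	{
		/* the current OS cannot report closed sockets */
		return false;
	}

	WaitEventSet *waitEventSet = CreateWaitEventSet(CurrentMemoryContext, 1);
	AddWaitEventToSet(waitEventSet, WL_SOCKET_CLOSED, PQsocket(pgConn),
					  NULL, NULL);

	WaitEvent event;
	int eventCount = WaitEventSetWait(waitEventSet, 0 /* don't wait */,
									  &event, 1, WAIT_EVENT_CLIENT_READ);
	closed = (eventCount > 0 && (event.events & WL_SOCKET_CLOSED) != 0);

	FreeWaitEventSet(waitEventSet);
#endif

	return closed;
}
```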


Fixes #5538 

Earlier discussions: #5908, #6259 and #6283

### Summary of the current approach compared to earlier attempts

As noted, we explored some alternatives before arriving at this.
https://github.com/citusdata/citus/pull/6283 is simple, but lacks an
important property: we should be checking for `WL_SOCKET_CLOSED`
_before_ sending anything over the wire. Otherwise, it becomes very
tricky to understand which connection is actually safe to retry. For
example, in the current patch, we can safely check
`transaction->transactionState == REMOTE_TRANS_NOT_STARTED` before
restarting a connection.

#6259 does what we intend here (i.e., check before sending any command).
However, as @marcocitus noted, it is very tricky to handle
`WaitEventSets` in multiple places. Also, the executor is designed to
react to events, so adding anything `pre-executor` seemed too ugly.

In the end, I converged on this patch. It relies on the simplicity of
#6283 and only does a very limited handling of `WaitEventSets`, just for
our purpose: right before we add any connection to the execution, we
check whether the remote socket has already been closed. With that, we
briefly have two kinds of wait event processing with different purposes.
The new wait event processing we added does not even consider
cancellations; we leave those to the main event processing loop. A
condensed sketch of the resulting retry decision is shown below.
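
Condensed from the state machine change in this patch (a simplified excerpt, not compilable on its own; the full version is in the diff below), the retry decision looks roughly like this:

```c
case MULTI_CONNECTION_LOST:
{
	RemoteTransaction *transaction = &connection->remoteTransaction;

	/*
	 * Retry at most once, and only if nothing has been sent over the
	 * connection yet (i.e., the remote transaction has not started).
	 */
	if (!session->connectionRetried &&
		transaction->transactionState == REMOTE_TRANS_NOT_STARTED)
	{
		/* reuse the same MultiConnection, restart the underlying socket */
		RestartConnection(connection);

		/* the socket changed, so the wait event set must be rebuilt */
		execution->rebuildWaitEventSet = true;
		session->latestUnconsumedWaitEvents = 0;

		session->connectionRetried = true;
		break;
	}

	connection->connectionState = MULTI_CONNECTION_FAILED;
	break;
}
```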

Co-authored-by: Marco Slot <marco.slot@gmail.com>
Önder Kalacı 2022-10-14 15:08:49 +02:00 committed by GitHub
parent 4d037f03fe
commit 8b624b5c9d
8 changed files with 678 additions and 47 deletions


@ -1285,7 +1285,12 @@ StartConnectionEstablishment(MultiConnection *connection, ConnectionHashKey *key
(const char **) entry->values,
false);
INSTR_TIME_SET_CURRENT(connection->connectionEstablishmentStart);
/* do not increment for restarted connections */
if (connection->connectionId == 0)
{
connection->connectionId = connectionId++;
}
/*
* To avoid issues with interrupts not getting caught all our connections
@ -1501,6 +1506,57 @@ ShouldShutdownConnection(MultiConnection *connection, const int cachedConnection
}
/*
* RestartConnection starts a new connection attempt for the given
* MultiConnection.
*
* The internal state of the MultiConnection is preserved. For example, we
* assume that we already went through all the other initialization steps in
* StartNodeUserDatabaseConnection, such as incrementing shared connection
* counters.
*
* This function should be used cautiously. If a connection is already
* involved in a remote transaction, we cannot restart the underlying
* connection. The caller is responsible for enforcing the restrictions
* on this.
*/
void
RestartConnection(MultiConnection *connection)
{
/* we cannot restart any connection that refers to a placement */
Assert(dlist_is_empty(&connection->referencedPlacements));
/* we cannot restart any connection that is part of a transaction */
Assert(connection->remoteTransaction.transactionState == REMOTE_TRANS_NOT_STARTED);
ConnectionHashKey key;
strlcpy(key.hostname, connection->hostname, MAX_NODE_LENGTH);
key.port = connection->port;
strlcpy(key.user, connection->user, NAMEDATALEN);
strlcpy(key.database, connection->database, NAMEDATALEN);
key.replicationConnParam = connection->requiresReplication;
/*
* With low-level APIs, we shut down and restart the connection.
* The main trick here is that we are using the same MultiConnection *
* such that all the state of the connection is preserved.
*/
ShutdownConnection(connection);
StartConnectionEstablishment(connection, &key);
/*
* We are restarting an already initialized connection which has
* gone through StartNodeUserDatabaseConnection(). That's why we
* can safely mark the state initialized.
*
* Note that we have to do this because ShutdownConnection() sets the
* state to not initialized.
*/
connection->initilizationState = POOL_STATE_INITIALIZED;
connection->connectionState = MULTI_CONNECTION_CONNECTING;
}
/*
* RemoteTransactionIdle function returns true if we manually
* set flag on run_commands_on_session_level_connection_to_node to true to


@ -254,6 +254,11 @@ ReportConnectionError(MultiConnection *connection, int elevel)
if (pgConn != NULL)
{
messageDetail = pchomp(PQerrorMessage(pgConn));
if (messageDetail == NULL || messageDetail[0] == '\0')
{
/* give a similar message to Postgres */
messageDetail = "connection not open";
}
}
if (messageDetail)


@ -476,6 +476,12 @@ typedef struct WorkerSession
/* events reported by the latest call to WaitEventSetWait */
int latestUnconsumedWaitEvents;
/* for some restricted scenarios, we allow a single connection retry */
bool connectionRetried;
/* keep track of whether the session has an active connection */
bool sessionHasActiveConnection;
} WorkerSession;
@ -659,6 +665,7 @@ static WorkerPool * FindOrCreateWorkerPool(DistributedExecution *execution,
char *nodeName, int nodePort);
static WorkerSession * FindOrCreateWorkerSession(WorkerPool *workerPool,
MultiConnection *connection);
static bool RemoteSocketClosedForNewSession(WorkerSession *session);
static void ManageWorkerPool(WorkerPool *workerPool);
static bool ShouldWaitForSlowStart(WorkerPool *workerPool);
static int CalculateNewConnectionCount(WorkerPool *workerPool);
@ -675,6 +682,8 @@ static void MarkEstablishingSessionsTimedOut(WorkerPool *workerPool);
static int UsableConnectionCount(WorkerPool *workerPool);
static long NextEventTimeout(DistributedExecution *execution);
static WaitEventSet * BuildWaitEventSet(List *sessionList);
static void AddSessionToWaitEventSet(WorkerSession *session,
WaitEventSet *waitEventSet);
static void RebuildWaitEventSetFlags(WaitEventSet *waitEventSet, List *sessionList);
static TaskPlacementExecution * PopPlacementExecution(WorkerSession *session);
static TaskPlacementExecution * PopAssignedPlacementExecution(WorkerSession *session);
@ -715,6 +724,10 @@ static bool HasIncompleteConnectionEstablishment(DistributedExecution *execution
static int RebuildWaitEventSet(DistributedExecution *execution);
static void ProcessWaitEvents(DistributedExecution *execution, WaitEvent *events, int
eventCount, bool *cancellationReceived);
#if PG_VERSION_NUM >= PG_VERSION_15
static bool ProcessWaitEventsForSocketClosed(WaitEvent *events, int eventCount);
#endif
static long MillisecondsBetweenTimestamps(instr_time startTime, instr_time endTime);
static uint64 MicrosecondsBetweenTimestamps(instr_time startTime, instr_time endTime);
static HeapTuple BuildTupleFromBytes(AttInMetadata *attinmeta, fmStringInfo *values);
@ -2424,11 +2437,25 @@ FindOrCreateWorkerSession(WorkerPool *workerPool, MultiConnection *connection)
dlist_init(&session->pendingTaskQueue);
dlist_init(&session->readyTaskQueue);
/* keep track of how many connections are ready */
/*
* Before using this connection in the distributed execution, we check
* whether the remote connection is closed/lost. This is common
* when we have a cached connection and the remote server has
* restarted (due to failover, restart, etc.). We do this because
* we can retry the connection a single time.
*/
if (RemoteSocketClosedForNewSession(session))
{
connection->connectionState = MULTI_CONNECTION_LOST;
}
if (connection->connectionState == MULTI_CONNECTION_CONNECTED)
{
/* keep track of how many connections are ready */
workerPool->activeConnectionCount++;
workerPool->idleConnectionCount++;
session->sessionHasActiveConnection = true;
}
workerPool->unusedConnectionCount++;
@ -2450,6 +2477,76 @@ FindOrCreateWorkerSession(WorkerPool *workerPool, MultiConnection *connection)
}
/*
* RemoteSocketClosedForNewSession is a helper function for detecting whether
* the remote socket corresponding to the input session is closed. This is
* most common when there is a cached connection and the remote server has
* restarted (due to failover, restart, etc.).
*
* This is not a generic function that can be called at the start of the
* execution, because it does not check all the events and even ignores
* cancellation events. Future callers of this function should consider
* its limitations.
*/
static bool
RemoteSocketClosedForNewSession(WorkerSession *session)
{
bool socketClosed = false;
#if PG_VERSION_NUM >= PG_VERSION_15
if (!WaitEventSetCanReportClosed())
{
/* we cannot detect for this OS */
return socketClosed;
}
MultiConnection *connection = session->connection;
long timeout = 0; /* don't wait */
int eventSetSize = 2;
WaitEvent *events = palloc0(eventSetSize * sizeof(WaitEvent));
/*
* Only wait for WL_SOCKET_CLOSED and postmaster death, do not even check
* for cancellations. Everything else is going to be checked soon in the
* main event processing. At this point, our only goal is to understand
* whether the remote socket is closed or not.
*/
int originalWaitFlags = connection->waitFlags;
connection->waitFlags = WL_SOCKET_CLOSED;
WaitEventSet *waitEventSet =
CreateWaitEventSet(CurrentMemoryContext, eventSetSize);
AddSessionToWaitEventSet(session, waitEventSet);
/* always good to wait for postmaster death */
CitusAddWaitEventSetToSet(waitEventSet, WL_POSTMASTER_DEATH, PGINVALID_SOCKET, NULL,
NULL);
int eventCount = WaitEventSetWait(waitEventSet, timeout, events,
eventSetSize, WAIT_EVENT_CLIENT_READ);
socketClosed =
ProcessWaitEventsForSocketClosed(events, eventCount);
/* we can at most receive a single event, which is WL_SOCKET_CLOSED */
Assert(eventCount <= 1);
FreeWaitEventSet(waitEventSet);
pfree(events);
/*
* We only searched for WL_SOCKET_CLOSED, and we processed the
* event already. Now, set back to the original flags.
*/
UpdateConnectionWaitFlags(session, originalWaitFlags);
session->latestUnconsumedWaitEvents = 0;
#endif
return socketClosed;
}
/*
* ShouldRunTasksSequentially returns true if each of the individual tasks
* should be executed one by one. Note that this is different than
@ -2813,6 +2910,44 @@ ProcessWaitEvents(DistributedExecution *execution, WaitEvent *events, int eventC
}
#if PG_VERSION_NUM >= PG_VERSION_15
/*
* ProcessWaitEventsForSocketClosed mainly checks for the WL_SOCKET_CLOSED
* event. If WL_SOCKET_CLOSED is found, the function returns true so that
* the caller can mark the underlying connection as MULTI_CONNECTION_LOST.
*/
static bool
ProcessWaitEventsForSocketClosed(WaitEvent *events, int eventCount)
{
int eventIndex = 0;
/* process I/O events */
for (; eventIndex < eventCount; eventIndex++)
{
WaitEvent *event = &events[eventIndex];
if (event->events & WL_POSTMASTER_DEATH)
{
ereport(ERROR, (errmsg("postmaster was shut down, exiting")));
}
WorkerSession *session = (WorkerSession *) event->user_data;
session->latestUnconsumedWaitEvents = event->events;
if (session->latestUnconsumedWaitEvents & WL_SOCKET_CLOSED)
{
return true;
}
}
return false;
}
#endif
/*
* ManageWorkerPool ensures the worker pool has the appropriate number of connections
* based on the number of pending tasks.
@ -3647,13 +3782,38 @@ ConnectionStateMachine(WorkerSession *session)
case MULTI_CONNECTION_LOST:
{
/* managed to connect, but connection was lost */
workerPool->activeConnectionCount--;
if (session->currentTask == NULL)
/*
* If a connection is lost, we retry the connection only in some
* very restricted scenarios. The main use case is to retry
* connection establishment when a cached connection is used
* in the executor while the remote server has restarted / failed
* over, etc.
*
* For simplicity, we only allow retrying connection establishment
* a single time.
*
* We can only retry the connection when the remote transaction has
* not started over the connection. Otherwise, we'd have to deal
* with restoring the transaction state, which is beyond our
* purpose at this time.
*/
RemoteTransaction *transaction = &connection->remoteTransaction;
if (!session->connectionRetried &&
transaction->transactionState == REMOTE_TRANS_NOT_STARTED)
{
/* this was an idle connection */
workerPool->idleConnectionCount--;
/*
* Try to connect again, we will reuse the same MultiConnection
* and keep it as claimed.
*/
RestartConnection(connection);
/* the socket has changed */
execution->rebuildWaitEventSet = true;
session->latestUnconsumedWaitEvents = 0;
session->connectionRetried = true;
break;
}
connection->connectionState = MULTI_CONNECTION_FAILED;
@ -3662,6 +3822,20 @@ ConnectionStateMachine(WorkerSession *session)
case MULTI_CONNECTION_FAILED:
{
/* managed to connect, but connection was lost */
if (session->sessionHasActiveConnection)
{
workerPool->activeConnectionCount--;
if (session->currentTask == NULL)
{
/* this was an idle connection */
workerPool->idleConnectionCount--;
}
session->sessionHasActiveConnection = false;
}
/* connection failed or was lost */
int totalConnectionCount = list_length(workerPool->sessionList);
@ -3821,6 +3995,7 @@ HandleMultiConnectionSuccess(WorkerSession *session)
workerPool->activeConnectionCount++;
workerPool->idleConnectionCount++;
session->sessionHasActiveConnection = true;
}
@ -4164,7 +4339,13 @@ UpdateConnectionWaitFlags(WorkerSession *session, int waitFlags)
return;
}
#if PG_VERSION_NUM >= PG_VERSION_15
/* always detect closed sockets */
connection->waitFlags = waitFlags | WL_SOCKET_CLOSED;
#else
connection->waitFlags = waitFlags;
#endif
/* without signalling the execution, the flag changes won't be reflected */
execution->waitFlagsChanged = true;
@ -4189,6 +4370,14 @@ CheckConnectionReady(WorkerSession *session)
return false;
}
#if PG_VERSION_NUM >= PG_VERSION_15
if ((session->latestUnconsumedWaitEvents & WL_SOCKET_CLOSED) != 0)
{
connection->connectionState = MULTI_CONNECTION_LOST;
return false;
}
#endif
/* try to send all pending data */
int sendStatus = PQflush(connection->pgConn);
if (sendStatus == -1)
@ -5341,25 +5530,45 @@ BuildWaitEventSet(List *sessionList)
WorkerSession *session = NULL;
foreach_ptr(session, sessionList)
{
AddSessionToWaitEventSet(session, waitEventSet);
}
CitusAddWaitEventSetToSet(waitEventSet, WL_POSTMASTER_DEATH, PGINVALID_SOCKET, NULL,
NULL);
CitusAddWaitEventSetToSet(waitEventSet, WL_LATCH_SET, PGINVALID_SOCKET, MyLatch,
NULL);
return waitEventSet;
}
/*
* AddSessionToWaitEventSet is a helper function which adds the session to
* the waitEventSet. The function does certain checks before adding the session
* to the waitEventSet.
*/
static void
AddSessionToWaitEventSet(WorkerSession *session, WaitEventSet *waitEventSet)
{
MultiConnection *connection = session->connection;
if (connection->pgConn == NULL)
{
/* connection died earlier in the transaction */
continue;
return;
}
if (connection->waitFlags == 0)
{
/* not currently waiting for this connection */
continue;
return;
}
int sock = PQsocket(connection->pgConn);
if (sock == -1)
{
/* connection was closed */
continue;
return;
}
int waitEventSetIndex =
@ -5379,14 +5588,6 @@ BuildWaitEventSet(List *sessionList)
session->workerPool->nodeName,
session->workerPool->nodePort, sock)));
}
}
CitusAddWaitEventSetToSet(waitEventSet, WL_POSTMASTER_DEATH, PGINVALID_SOCKET, NULL,
NULL);
CitusAddWaitEventSetToSet(waitEventSet, WL_LATCH_SET, PGINVALID_SOCKET, MyLatch,
NULL);
return waitEventSet;
}


@ -295,6 +295,7 @@ extern MultiConnection * StartNodeUserDatabaseConnection(uint32 flags,
int32 port,
const char *user,
const char *database);
extern void RestartConnection(MultiConnection *connection);
extern void CloseAllConnectionsAfterTransaction(void);
extern void CloseNodeConnectionsAfterTransaction(char *nodeName, int nodePort);
extern MultiConnection * ConnectionAvailableToNode(char *hostName, int nodePort,


@ -0,0 +1,220 @@
--
-- PG15+ test as WL_SOCKET_CLOSED exposed for PG15+
--
SHOW server_version \gset
SELECT substring(:'server_version', '\d+')::int >= 15 AS server_version_ge_15
\gset
\if :server_version_ge_15
\else
\q
\endif
CREATE SCHEMA socket_close;
SET search_path TO socket_close;
CREATE OR REPLACE FUNCTION kill_all_cached_internal_conns(gpid bigint)
RETURNS bool LANGUAGE plpgsql
AS $function$
DECLARE
killed_backend_ct int;
BEGIN
-- kill all the cached backends on the workers
WITH command AS (SELECT 'SELECT count(*) FROM (SELECT pg_terminate_backend(pid, 1000) FROM pg_stat_activity WHERE application_name ILIKE ''%citus_internal gpid=' || gpid::text ||''' AND pid !=pg_backend_pid()) as foo')
SELECT sum(result::int) INTO killed_backend_ct FROM command JOIN LATERAL run_command_on_workers((SELECT * FROM command)) on (true);
RETURN killed_backend_ct > 0;
END;
$function$;
SET citus.shard_count TO 8;
SET citus.shard_replication_factor TO 1;
CREATE TABLE socket_test_table(id bigserial, value text);
SELECT create_distributed_table('socket_test_table', 'id');
create_distributed_table
---------------------------------------------------------------------
(1 row)
INSERT INTO socket_test_table (value) SELECT i::text FROM generate_series(0,100)i;
-- first, simulate that we only have one cached connection per node
SET citus.max_adaptive_executor_pool_size TO 1;
SET citus.max_cached_conns_per_worker TO 1;
SELECT count(*) FROM socket_test_table;
count
---------------------------------------------------------------------
101
(1 row)
-- kill all the cached backends on the workers initiated by the current gpid
select kill_all_cached_internal_conns(citus_backend_gpid());
kill_all_cached_internal_conns
---------------------------------------------------------------------
t
(1 row)
-- show that none remains
SELECT result FROM run_command_on_workers($$SELECT count(*) FROM (SELECT pid FROM pg_stat_activity WHERE query ilike '%socket_test_table%' AND pid !=pg_backend_pid()) as foo$$);
result
---------------------------------------------------------------------
0
0
(2 rows)
-- even though the cached connections closed, the execution recovers and establishes new connections
SELECT count(*) FROM socket_test_table;
count
---------------------------------------------------------------------
101
(1 row)
-- now, use 16 connections per worker, we can still recover all connections
SET citus.max_adaptive_executor_pool_size TO 16;
SET citus.max_cached_conns_per_worker TO 16;
SET citus.force_max_query_parallelization TO ON;
SELECT count(*) FROM socket_test_table;
count
---------------------------------------------------------------------
101
(1 row)
-- kill all the cached backends on the workers initiated by the current gpid
select kill_all_cached_internal_conns(citus_backend_gpid());
kill_all_cached_internal_conns
---------------------------------------------------------------------
t
(1 row)
SELECT count(*) FROM socket_test_table;
count
---------------------------------------------------------------------
101
(1 row)
-- now, get back to sane defaults
SET citus.max_adaptive_executor_pool_size TO 1;
SET citus.max_cached_conns_per_worker TO 1;
SET citus.force_max_query_parallelization TO OFF;
-- we can recover for modification queries as well
-- single row INSERT
INSERT INTO socket_test_table VALUES (1);
-- kill all the cached backends on the workers initiated by the current gpid
select kill_all_cached_internal_conns(citus_backend_gpid());
kill_all_cached_internal_conns
---------------------------------------------------------------------
t
(1 row)
INSERT INTO socket_test_table VALUES (1);
-- single row UPDATE
UPDATE socket_test_table SET value = 15 WHERE id = 1;
-- kill all the cached backends on the workers initiated by the current gpid
select kill_all_cached_internal_conns(citus_backend_gpid());
kill_all_cached_internal_conns
---------------------------------------------------------------------
t
(1 row)
UPDATE socket_test_table SET value = 15 WHERE id = 1;
-- we cannot recover in a transaction block
BEGIN;
SELECT count(*) FROM socket_test_table;
count
---------------------------------------------------------------------
103
(1 row)
-- kill all the cached backends on the workers initiated by the current gpid
select kill_all_cached_internal_conns(citus_backend_gpid());
kill_all_cached_internal_conns
---------------------------------------------------------------------
t
(1 row)
SELECT count(*) FROM socket_test_table;
ERROR: connection to the remote node localhost:xxxxx failed with the following error: connection not open
WARNING: terminating connection due to administrator command
CONTEXT: while executing command on localhost:xxxxx
WARNING: terminating connection due to administrator command
CONTEXT: while executing command on localhost:xxxxx
ROLLBACK;
-- repartition joins also can recover
SET citus.enable_repartition_joins TO on;
SET citus.max_adaptive_executor_pool_size TO 1;
SET citus.max_cached_conns_per_worker TO 1;
SELECT count(*) FROM socket_test_table t1 JOIN socket_test_table t2 USING(value);
count
---------------------------------------------------------------------
115
(1 row)
-- kill all the cached backends on the workers initiated by the current gpid
select kill_all_cached_internal_conns(citus_backend_gpid());
kill_all_cached_internal_conns
---------------------------------------------------------------------
t
(1 row)
-- even though the cached connections closed, the execution recovers and establishes new connections
SELECT count(*) FROM socket_test_table t1 JOIN socket_test_table t2 USING(value);
count
---------------------------------------------------------------------
115
(1 row)
-- also, recover insert .. select repartitioning
INSERT INTO socket_test_table SELECT value::bigint, value FROM socket_test_table;
-- kill all the cached backends on the workers initiated by the current gpid
select kill_all_cached_internal_conns(citus_backend_gpid());
kill_all_cached_internal_conns
---------------------------------------------------------------------
t
(1 row)
-- even though the cached connections closed, the execution recovers and establishes new connections
INSERT INTO socket_test_table SELECT value::bigint, value FROM socket_test_table;
-- also, recover with intermediate results
WITH cte_1 AS (SELECT * FROM socket_test_table LIMIT 1) SELECT count(*) FROM cte_1;
count
---------------------------------------------------------------------
1
(1 row)
-- kill all the cached backends on the workers initiated by the current gpid
select kill_all_cached_internal_conns(citus_backend_gpid());
kill_all_cached_internal_conns
---------------------------------------------------------------------
t
(1 row)
-- even though the cached connections closed, the execution recovers and establishes new connections
WITH cte_1 AS (SELECT * FROM socket_test_table LIMIT 1) SELECT count(*) FROM cte_1;
count
---------------------------------------------------------------------
1
(1 row)
-- although it should make no difference, we can recover from the failures on the workers as well
\c - - - :worker_1_port
SET search_path TO socket_close;
SET citus.max_adaptive_executor_pool_size TO 1;
SET citus.max_cached_conns_per_worker TO 1;
SET citus.force_max_query_parallelization TO ON;
SELECT count(*) FROM socket_test_table;
count
---------------------------------------------------------------------
412
(1 row)
-- kill all the cached backends on the workers initiated by the current gpid
select kill_all_cached_internal_conns(citus_backend_gpid());
kill_all_cached_internal_conns
---------------------------------------------------------------------
t
(1 row)
SELECT count(*) FROM socket_test_table;
count
---------------------------------------------------------------------
412
(1 row)
\c - - - :master_port
SET client_min_messages TO ERROR;
DROP SCHEMA socket_close CASCADE;


@ -0,0 +1,9 @@
--
-- PG15+ test as WL_SOCKET_CLOSED exposed for PG15+
--
SHOW server_version \gset
SELECT substring(:'server_version', '\d+')::int >= 15 AS server_version_ge_15
\gset
\if :server_version_ge_15
\else
\q


@ -58,7 +58,7 @@ test: cte_inline recursive_view_local_table values sequences_with_different_type
test: pg13 pg12
# run pg14 sequentially as it syncs metadata
test: pg14
test: pg15 pg15_jsonpath
test: pg15 pg15_jsonpath detect_conn_close
test: drop_column_partitioned_table
test: tableam


@ -0,0 +1,139 @@
--
-- PG15+ test as WL_SOCKET_CLOSED exposed for PG15+
--
SHOW server_version \gset
SELECT substring(:'server_version', '\d+')::int >= 15 AS server_version_ge_15
\gset
\if :server_version_ge_15
\else
\q
\endif
CREATE SCHEMA socket_close;
SET search_path TO socket_close;
CREATE OR REPLACE FUNCTION kill_all_cached_internal_conns(gpid bigint)
RETURNS bool LANGUAGE plpgsql
AS $function$
DECLARE
killed_backend_ct int;
BEGIN
-- kill all the cached backends on the workers
WITH command AS (SELECT 'SELECT count(*) FROM (SELECT pg_terminate_backend(pid, 1000) FROM pg_stat_activity WHERE application_name ILIKE ''%citus_internal gpid=' || gpid::text ||''' AND pid !=pg_backend_pid()) as foo')
SELECT sum(result::int) INTO killed_backend_ct FROM command JOIN LATERAL run_command_on_workers((SELECT * FROM command)) on (true);
RETURN killed_backend_ct > 0;
END;
$function$;
SET citus.shard_count TO 8;
SET citus.shard_replication_factor TO 1;
CREATE TABLE socket_test_table(id bigserial, value text);
SELECT create_distributed_table('socket_test_table', 'id');
INSERT INTO socket_test_table (value) SELECT i::text FROM generate_series(0,100)i;
-- first, simulate that we only have one cached connection per node
SET citus.max_adaptive_executor_pool_size TO 1;
SET citus.max_cached_conns_per_worker TO 1;
SELECT count(*) FROM socket_test_table;
-- kill all the cached backends on the workers initiated by the current gpid
select kill_all_cached_internal_conns(citus_backend_gpid());
-- show that none remains
SELECT result FROM run_command_on_workers($$SELECT count(*) FROM (SELECT pid FROM pg_stat_activity WHERE query ilike '%socket_test_table%' AND pid !=pg_backend_pid()) as foo$$);
-- even though the cached connections closed, the execution recovers and establishes new connections
SELECT count(*) FROM socket_test_table;
-- now, use 16 connections per worker, we can still recover all connections
SET citus.max_adaptive_executor_pool_size TO 16;
SET citus.max_cached_conns_per_worker TO 16;
SET citus.force_max_query_parallelization TO ON;
SELECT count(*) FROM socket_test_table;
-- kill all the cached backends on the workers initiated by the current gpid
select kill_all_cached_internal_conns(citus_backend_gpid());
SELECT count(*) FROM socket_test_table;
-- now, get back to sane defaults
SET citus.max_adaptive_executor_pool_size TO 1;
SET citus.max_cached_conns_per_worker TO 1;
SET citus.force_max_query_parallelization TO OFF;
-- we can recover for modification queries as well
-- single row INSERT
INSERT INTO socket_test_table VALUES (1);
-- kill all the cached backends on the workers initiated by the current gpid
select kill_all_cached_internal_conns(citus_backend_gpid());
INSERT INTO socket_test_table VALUES (1);
-- single row UPDATE
UPDATE socket_test_table SET value = 15 WHERE id = 1;
-- kill all the cached backends on the workers initiated by the current gpid
select kill_all_cached_internal_conns(citus_backend_gpid());
UPDATE socket_test_table SET value = 15 WHERE id = 1;
-- we cannot recover in a transaction block
BEGIN;
SELECT count(*) FROM socket_test_table;
-- kill all the cached backends on the workers initiated by the current gpid
select kill_all_cached_internal_conns(citus_backend_gpid());
SELECT count(*) FROM socket_test_table;
ROLLBACK;
-- repartition joins also can recover
SET citus.enable_repartition_joins TO on;
SET citus.max_adaptive_executor_pool_size TO 1;
SET citus.max_cached_conns_per_worker TO 1;
SELECT count(*) FROM socket_test_table t1 JOIN socket_test_table t2 USING(value);
-- kill all the cached backends on the workers initiated by the current gpid
select kill_all_cached_internal_conns(citus_backend_gpid());
-- even though the cached connections closed, the execution recovers and establishes new connections
SELECT count(*) FROM socket_test_table t1 JOIN socket_test_table t2 USING(value);
-- also, recover insert .. select repartitioning
INSERT INTO socket_test_table SELECT value::bigint, value FROM socket_test_table;
-- kill all the cached backends on the workers initiated by the current gpid
select kill_all_cached_internal_conns(citus_backend_gpid());
-- even though the cached connections closed, the execution recovers and establishes new connections
INSERT INTO socket_test_table SELECT value::bigint, value FROM socket_test_table;
-- also, recover with intermediate results
WITH cte_1 AS (SELECT * FROM socket_test_table LIMIT 1) SELECT count(*) FROM cte_1;
-- kill all the cached backends on the workers initiated by the current gpid
select kill_all_cached_internal_conns(citus_backend_gpid());
-- even though the cached connections closed, the execution recovers and establishes new connections
WITH cte_1 AS (SELECT * FROM socket_test_table LIMIT 1) SELECT count(*) FROM cte_1;
-- although it should make no difference, we can recover from the failures on the workers as well
\c - - - :worker_1_port
SET search_path TO socket_close;
SET citus.max_adaptive_executor_pool_size TO 1;
SET citus.max_cached_conns_per_worker TO 1;
SET citus.force_max_query_parallelization TO ON;
SELECT count(*) FROM socket_test_table;
-- kill all the cached backends on the workers initiated by the current gpid
select kill_all_cached_internal_conns(citus_backend_gpid());
SELECT count(*) FROM socket_test_table;
\c - - - :master_port
SET client_min_messages TO ERROR;
DROP SCHEMA socket_close CASCADE;