diff --git a/src/backend/distributed/connection/connection_management.c b/src/backend/distributed/connection/connection_management.c
index 12b5da547..c5b300fd4 100644
--- a/src/backend/distributed/connection/connection_management.c
+++ b/src/backend/distributed/connection/connection_management.c
@@ -1285,7 +1285,12 @@ StartConnectionEstablishment(MultiConnection *connection, ConnectionHashKey *key
                                              (const char **) entry->values,
                                              false);
     INSTR_TIME_SET_CURRENT(connection->connectionEstablishmentStart);
-    connection->connectionId = connectionId++;
+
+    /* do not increment for restarted connections */
+    if (connection->connectionId == 0)
+    {
+        connection->connectionId = connectionId++;
+    }
 
     /*
      * To avoid issues with interrupts not getting caught all our connections
@@ -1501,6 +1506,57 @@ ShouldShutdownConnection(MultiConnection *connection, const int cachedConnection
 }
 
 
+/*
+ * RestartConnection starts a new connection attempt for the given
+ * MultiConnection.
+ *
+ * The internal state of the MultiConnection is preserved. For example, we
+ * assume that we already went through all the other initialization steps in
+ * StartNodeUserDatabaseConnection, such as incrementing shared connection
+ * counters.
+ *
+ * This function should be used cautiously. If a connection is already
+ * involved in a remote transaction, we cannot restart the underlying
+ * connection. The caller is responsible for enforcing the restrictions
+ * on this.
+ */
+void
+RestartConnection(MultiConnection *connection)
+{
+    /* we cannot restart any connection that refers to a placement */
+    Assert(dlist_is_empty(&connection->referencedPlacements));
+
+    /* we cannot restart any connection that is part of a transaction */
+    Assert(connection->remoteTransaction.transactionState == REMOTE_TRANS_NOT_STARTED);
+
+    ConnectionHashKey key;
+    strlcpy(key.hostname, connection->hostname, MAX_NODE_LENGTH);
+    key.port = connection->port;
+    strlcpy(key.user, connection->user, NAMEDATALEN);
+    strlcpy(key.database, connection->database, NAMEDATALEN);
+    key.replicationConnParam = connection->requiresReplication;
+
+    /*
+     * With low-level APIs, we shut down and restart the connection.
+     * The main trick here is that we are using the same MultiConnection *
+     * such that all the state of the connection is preserved.
+     */
+    ShutdownConnection(connection);
+    StartConnectionEstablishment(connection, &key);
+
+    /*
+     * We are restarting an already initialized connection which has
+     * gone through StartNodeUserDatabaseConnection(). That's why we
+     * can safely mark the state initialized.
+     *
+     * Note that we have to do this because ShutdownConnection() sets the
+     * state to not initialized.
+     */
+    connection->initilizationState = POOL_STATE_INITIALIZED;
+    connection->connectionState = MULTI_CONNECTION_CONNECTING;
+}
+
+
 /*
  * RemoteTransactionIdle function returns true if we manually
  * set flag on run_commands_on_session_level_connection_to_node to true to
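For illustration, a minimal sketch of how a caller is expected to drive RestartConnection. The snippet is not part of the patch; the helper name is hypothetical, and its guard conditions simply restate the Asserts above.

/* hypothetical helper: retry a cached connection that appears to be lost */
static void
RetryLostConnectionOnce(MultiConnection *connection)
{
    /* only safe when no remote transaction or placement uses the connection */
    if (connection->remoteTransaction.transactionState != REMOTE_TRANS_NOT_STARTED ||
        !dlist_is_empty(&connection->referencedPlacements))
    {
        return;
    }

    /* reuses the same MultiConnection; shared connection counters stay intact */
    RestartConnection(connection);

    /* the connection is now MULTI_CONNECTION_CONNECTING and must be polled again */
}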
diff --git a/src/backend/distributed/connection/remote_commands.c b/src/backend/distributed/connection/remote_commands.c
index c5aeefd51..2f4ec99a3 100644
--- a/src/backend/distributed/connection/remote_commands.c
+++ b/src/backend/distributed/connection/remote_commands.c
@@ -254,6 +254,11 @@ ReportConnectionError(MultiConnection *connection, int elevel)
     if (pgConn != NULL)
     {
         messageDetail = pchomp(PQerrorMessage(pgConn));
+        if (messageDetail == NULL || messageDetail[0] == '\0')
+        {
+            /* give a similar message to Postgres */
+            messageDetail = "connection not open";
+        }
     }
 
     if (messageDetail)
diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c
index 948d65791..4531245c1 100644
--- a/src/backend/distributed/executor/adaptive_executor.c
+++ b/src/backend/distributed/executor/adaptive_executor.c
@@ -476,6 +476,12 @@ typedef struct WorkerSession
 
     /* events reported by the latest call to WaitEventSetWait */
     int latestUnconsumedWaitEvents;
+
+    /* for some restricted scenarios, we allow a single connection retry */
+    bool connectionRetried;
+
+    /* keep track of whether the session has an active connection */
+    bool sessionHasActiveConnection;
 } WorkerSession;
 
 
@@ -659,6 +665,7 @@ static WorkerPool * FindOrCreateWorkerPool(DistributedExecution *execution,
                                            char *nodeName, int nodePort);
 static WorkerSession * FindOrCreateWorkerSession(WorkerPool *workerPool,
                                                  MultiConnection *connection);
+static bool RemoteSocketClosedForNewSession(WorkerSession *session);
 static void ManageWorkerPool(WorkerPool *workerPool);
 static bool ShouldWaitForSlowStart(WorkerPool *workerPool);
 static int CalculateNewConnectionCount(WorkerPool *workerPool);
@@ -675,6 +682,8 @@ static void MarkEstablishingSessionsTimedOut(WorkerPool *workerPool);
 static int UsableConnectionCount(WorkerPool *workerPool);
 static long NextEventTimeout(DistributedExecution *execution);
 static WaitEventSet * BuildWaitEventSet(List *sessionList);
+static void AddSessionToWaitEventSet(WorkerSession *session,
+                                     WaitEventSet *waitEventSet);
 static void RebuildWaitEventSetFlags(WaitEventSet *waitEventSet, List *sessionList);
 static TaskPlacementExecution * PopPlacementExecution(WorkerSession *session);
 static TaskPlacementExecution * PopAssignedPlacementExecution(WorkerSession *session);
@@ -715,6 +724,10 @@ static bool HasIncompleteConnectionEstablishment(DistributedExecution *execution
 static int RebuildWaitEventSet(DistributedExecution *execution);
 static void ProcessWaitEvents(DistributedExecution *execution, WaitEvent *events,
                               int eventCount, bool *cancellationReceived);
+#if PG_VERSION_NUM >= PG_VERSION_15
+
+static bool ProcessWaitEventsForSocketClosed(WaitEvent *events, int eventCount);
+#endif
 static long MillisecondsBetweenTimestamps(instr_time startTime, instr_time endTime);
 static uint64 MicrosecondsBetweenTimestamps(instr_time startTime, instr_time endTime);
 static HeapTuple BuildTupleFromBytes(AttInMetadata *attinmeta, fmStringInfo *values);
@@ -2424,11 +2437,25 @@ FindOrCreateWorkerSession(WorkerPool *workerPool, MultiConnection *connection)
     dlist_init(&session->pendingTaskQueue);
     dlist_init(&session->readyTaskQueue);
 
-    /* keep track of how many connections are ready */
+    /*
+     * Before using this connection in the distributed execution, we check
+     * whether the remote connection is closed/lost. This is common when we
+     * have a cached connection and the remote server has restarted (e.g.,
+     * due to a failover or restart). We do this because we can retry the
+     * connection a single time.
+     */
+    if (RemoteSocketClosedForNewSession(session))
+    {
+        connection->connectionState = MULTI_CONNECTION_LOST;
+    }
+
     if (connection->connectionState == MULTI_CONNECTION_CONNECTED)
     {
+        /* keep track of how many connections are ready */
         workerPool->activeConnectionCount++;
         workerPool->idleConnectionCount++;
+
+        session->sessionHasActiveConnection = true;
     }
 
     workerPool->unusedConnectionCount++;
@@ -2450,6 +2477,76 @@ FindOrCreateWorkerSession(WorkerPool *workerPool, MultiConnection *connection)
 }
 
 
+/*
+ * RemoteSocketClosedForNewSession is a helper function for detecting whether
+ * the remote socket corresponding to the input session is closed. This is
+ * most common when there is a cached connection and the remote server has
+ * restarted (e.g., due to a failover or restart).
+ *
+ * The function is not a generic function that can be called at the start of
+ * the execution, because it does not check all the events; it even ignores
+ * cancellation events. Future callers of this function should consider its
+ * limitations.
+ */
+static bool
+RemoteSocketClosedForNewSession(WorkerSession *session)
+{
+    bool socketClosed = false;
+
+#if PG_VERSION_NUM >= PG_VERSION_15
+
+    if (!WaitEventSetCanReportClosed())
+    {
+        /* we cannot detect closed sockets on this OS */
+        return socketClosed;
+    }
+
+    MultiConnection *connection = session->connection;
+    long timeout = 0; /* don't wait */
+    int eventSetSize = 2;
+    WaitEvent *events = palloc0(eventSetSize * sizeof(WaitEvent));
+
+    /*
+     * Only wait for WL_SOCKET_CLOSED and postmaster death, do not even check
+     * for cancellations. Everything else is going to be checked soon in the
+     * main event processing. At this point, our only goal is to understand
+     * whether the remote socket is closed or not.
+     */
+    int originalWaitFlags = connection->waitFlags;
+    connection->waitFlags = WL_SOCKET_CLOSED;
+    WaitEventSet *waitEventSet =
+        CreateWaitEventSet(CurrentMemoryContext, eventSetSize);
+
+    AddSessionToWaitEventSet(session, waitEventSet);
+
+    /* always good to wait for postmaster death */
+    CitusAddWaitEventSetToSet(waitEventSet, WL_POSTMASTER_DEATH, PGINVALID_SOCKET, NULL,
+                              NULL);
+
+    int eventCount = WaitEventSetWait(waitEventSet, timeout, events,
+                                      eventSetSize, WAIT_EVENT_CLIENT_READ);
+    socketClosed =
+        ProcessWaitEventsForSocketClosed(events, eventCount);
+
+    /* we can at most receive a single event, which is WL_SOCKET_CLOSED */
+    Assert(eventCount <= 1);
+
+    FreeWaitEventSet(waitEventSet);
+    pfree(events);
+
+    /*
+     * We only searched for WL_SOCKET_CLOSED, and we processed the
+     * event already. Now, set back to the original flags.
+     */
+    UpdateConnectionWaitFlags(session, originalWaitFlags);
+    session->latestUnconsumedWaitEvents = 0;
+
+#endif
+
+    return socketClosed;
+}
+
+
 /*
  * ShouldRunTasksSequentially returns true if each of the individual tasks
  * should be executed one by one. Note that this is different than
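To make the mechanism above easier to follow, here is a distilled, hypothetical sketch of the PG15 primitive that RemoteSocketClosedForNewSession builds on: probing a libpq socket for WL_SOCKET_CLOSED with a zero timeout. It deliberately bypasses the Citus wrappers (CitusAddWaitEventSetToSet, session bookkeeping), assumes the usual PostgreSQL headers (postgres.h, storage/latch.h, utils/wait_event.h, libpq-fe.h) are included, and is not part of this patch.

/* hypothetical, PG15-only: does the remote end of this libpq socket look closed? */
static bool
SocketLooksClosed(PGconn *pgConn)
{
    if (!WaitEventSetCanReportClosed())
    {
        /* the platform cannot report closed sockets; assume it is still open */
        return false;
    }

    WaitEventSet *waitEventSet = CreateWaitEventSet(CurrentMemoryContext, 1);
    AddWaitEventToSet(waitEventSet, WL_SOCKET_CLOSED, PQsocket(pgConn), NULL, NULL);

    WaitEvent event;
    int eventCount = WaitEventSetWait(waitEventSet, 0 /* don't block */, &event, 1,
                                      WAIT_EVENT_CLIENT_READ);
    FreeWaitEventSet(waitEventSet);

    return eventCount > 0 && (event.events & WL_SOCKET_CLOSED) != 0;
}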
@@ -2813,6 +2910,44 @@ ProcessWaitEvents(DistributedExecution *execution, WaitEvent *events, int eventC
 }
 
 
+#if PG_VERSION_NUM >= PG_VERSION_15
+
+/*
+ * ProcessWaitEventsForSocketClosed mainly checks for the WL_SOCKET_CLOSED event.
+ * If WL_SOCKET_CLOSED is found, the function returns true so that the caller
+ * can mark the underlying connection as MULTI_CONNECTION_LOST.
+ */
+static bool
+ProcessWaitEventsForSocketClosed(WaitEvent *events, int eventCount)
+{
+    int eventIndex = 0;
+
+    /* process I/O events */
+    for (; eventIndex < eventCount; eventIndex++)
+    {
+        WaitEvent *event = &events[eventIndex];
+
+        if (event->events & WL_POSTMASTER_DEATH)
+        {
+            ereport(ERROR, (errmsg("postmaster was shut down, exiting")));
+        }
+
+        WorkerSession *session = (WorkerSession *) event->user_data;
+        session->latestUnconsumedWaitEvents = event->events;
+
+        if (session->latestUnconsumedWaitEvents & WL_SOCKET_CLOSED)
+        {
+            return true;
+        }
+    }
+
+    return false;
+}
+
+
+#endif
+
+
 /*
  * ManageWorkerPool ensures the worker pool has the appropriate number of connections
  * based on the number of pending tasks.
@@ -3647,13 +3782,38 @@ ConnectionStateMachine(WorkerSession *session)
 
         case MULTI_CONNECTION_LOST:
         {
-            /* managed to connect, but connection was lost */
-            workerPool->activeConnectionCount--;
-
-            if (session->currentTask == NULL)
+            /*
+             * If a connection is lost, we retry the connection for some very
+             * restricted scenarios. The main use case is to retry connection
+             * establishment when a cached connection is used in the executor
+             * while the remote server has restarted or failed over in the
+             * meantime.
+             *
+             * For simplicity, we only allow retrying connection establishment
+             * a single time.
+             *
+             * We can only retry the connection when the remote transaction has
+             * not started over the connection. Otherwise, we'd have to deal
+             * with restoring the transaction state, which is beyond our
+             * purpose at this time.
+             */
+            RemoteTransaction *transaction = &connection->remoteTransaction;
+            if (!session->connectionRetried &&
+                transaction->transactionState == REMOTE_TRANS_NOT_STARTED)
             {
-                /* this was an idle connection */
-                workerPool->idleConnectionCount--;
+                /*
+                 * Try to connect again; we will reuse the same MultiConnection
+                 * and keep it as claimed.
+                 */
+                RestartConnection(connection);
+
+                /* the socket has changed */
+                execution->rebuildWaitEventSet = true;
+                session->latestUnconsumedWaitEvents = 0;
+
+                session->connectionRetried = true;
+
+                break;
             }
 
             connection->connectionState = MULTI_CONNECTION_FAILED;
@@ -3662,6 +3822,20 @@ ConnectionStateMachine(WorkerSession *session)
 
         case MULTI_CONNECTION_FAILED:
         {
+            /* managed to connect, but the connection was lost */
+            if (session->sessionHasActiveConnection)
+            {
+                workerPool->activeConnectionCount--;
+
+                if (session->currentTask == NULL)
+                {
+                    /* this was an idle connection */
+                    workerPool->idleConnectionCount--;
+                }
+
+                session->sessionHasActiveConnection = false;
+            }
+
             /* connection failed or was lost */
             int totalConnectionCount = list_length(workerPool->sessionList);
 
@@ -3821,6 +3995,7 @@ HandleMultiConnectionSuccess(WorkerSession *session)
 
     workerPool->activeConnectionCount++;
     workerPool->idleConnectionCount++;
+    session->sessionHasActiveConnection = true;
 }
 
 
@@ -4164,7 +4339,13 @@ UpdateConnectionWaitFlags(WorkerSession *session, int waitFlags)
         return;
     }
 
+#if PG_VERSION_NUM >= PG_VERSION_15
+
+    /* always detect closed sockets */
+    connection->waitFlags = waitFlags | WL_SOCKET_CLOSED;
+#else
     connection->waitFlags = waitFlags;
+#endif
 
     /* without signalling the execution, the flag changes won't be reflected */
     execution->waitFlagsChanged = true;
@@ -4189,6 +4370,14 @@ CheckConnectionReady(WorkerSession *session)
         return false;
     }
 
+#if PG_VERSION_NUM >= PG_VERSION_15
+    if ((session->latestUnconsumedWaitEvents & WL_SOCKET_CLOSED) != 0)
+    {
+        connection->connectionState = MULTI_CONNECTION_LOST;
+        return false;
+    }
+#endif
+
     /* try to send all pending data */
     int sendStatus = PQflush(connection->pgConn);
     if (sendStatus == -1)
@@ -5341,44 +5530,7 @@ BuildWaitEventSet(List *sessionList)
     WorkerSession *session = NULL;
     foreach_ptr(session, sessionList)
     {
-        MultiConnection *connection = session->connection;
-
-        if (connection->pgConn == NULL)
-        {
-            /* connection died earlier in the transaction */
-            continue;
-        }
-
-        if (connection->waitFlags == 0)
-        {
-            /* not currently waiting for this connection */
-            continue;
-        }
-
-        int sock = PQsocket(connection->pgConn);
-        if (sock == -1)
-        {
-            /* connection was closed */
-            continue;
-        }
-
-        int waitEventSetIndex =
-            CitusAddWaitEventSetToSet(waitEventSet, connection->waitFlags, sock,
-                                      NULL, (void *) session);
-        session->waitEventSetIndex = waitEventSetIndex;
-
-        /*
-         * Inform failed to add to wait event set with a debug message as this
-         * is too detailed information for users.
-         */
-        if (session->waitEventSetIndex == WAIT_EVENT_SET_INDEX_FAILED)
-        {
-            ereport(DEBUG1, (errcode(ERRCODE_CONNECTION_FAILURE),
-                             errmsg("Adding wait event for node %s:%d failed. "
-                                    "The socket was: %d",
-                                    session->workerPool->nodeName,
-                                    session->workerPool->nodePort, sock)));
-        }
+        AddSessionToWaitEventSet(session, waitEventSet);
     }
 
     CitusAddWaitEventSetToSet(waitEventSet, WL_POSTMASTER_DEATH, PGINVALID_SOCKET, NULL,
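As a reading aid, the retry decision implemented in the MULTI_CONNECTION_LOST case above can be condensed into the following hypothetical helper. The helper name is invented, the snippet is not part of the diff, and it assumes the MultiConnectionState enum from connection_management.h.

/* condensed illustration of the retry-once decision for a lost connection */
static MultiConnectionState
NextStateForLostConnection(WorkerSession *session)
{
    MultiConnection *connection = session->connection;

    if (!session->connectionRetried &&
        connection->remoteTransaction.transactionState == REMOTE_TRANS_NOT_STARTED)
    {
        /* retry once: reuse the MultiConnection and go back to polling it */
        RestartConnection(connection);
        session->connectionRetried = true;

        return MULTI_CONNECTION_CONNECTING;
    }

    /* otherwise give up; the FAILED case adjusts the pool counters */
    return MULTI_CONNECTION_FAILED;
}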
@@ -5390,6 +5542,55 @@
 }
 
 
+/*
+ * AddSessionToWaitEventSet is a helper function which adds the session to
+ * the given waitEventSet, skipping sessions whose connection has already
+ * died or is not currently being waited on.
+ */
+static void
+AddSessionToWaitEventSet(WorkerSession *session, WaitEventSet *waitEventSet)
+{
+    MultiConnection *connection = session->connection;
+
+    if (connection->pgConn == NULL)
+    {
+        /* connection died earlier in the transaction */
+        return;
+    }
+
+    if (connection->waitFlags == 0)
+    {
+        /* not currently waiting for this connection */
+        return;
+    }
+
+    int sock = PQsocket(connection->pgConn);
+    if (sock == -1)
+    {
+        /* connection was closed */
+        return;
+    }
+
+    int waitEventSetIndex =
+        CitusAddWaitEventSetToSet(waitEventSet, connection->waitFlags, sock,
+                                  NULL, (void *) session);
+    session->waitEventSetIndex = waitEventSetIndex;
+
+    /*
+     * Inform failed to add to wait event set with a debug message as this
+     * is too detailed information for users.
+     */
+    if (session->waitEventSetIndex == WAIT_EVENT_SET_INDEX_FAILED)
+    {
+        ereport(DEBUG1, (errcode(ERRCODE_CONNECTION_FAILURE),
+                         errmsg("Adding wait event for node %s:%d failed. "
+                                "The socket was: %d",
+                                session->workerPool->nodeName,
+                                session->workerPool->nodePort, sock)));
+    }
+}
+
+
 /*
  * GetEventSetSize returns the event set size for a list of sessions.
  */
diff --git a/src/include/distributed/connection_management.h b/src/include/distributed/connection_management.h
index b64785ec7..9a4e8a134 100644
--- a/src/include/distributed/connection_management.h
+++ b/src/include/distributed/connection_management.h
@@ -295,6 +295,7 @@ extern MultiConnection * StartNodeUserDatabaseConnection(uint32 flags,
                                                          int32 port,
                                                          const char *user,
                                                          const char *database);
+extern void RestartConnection(MultiConnection *connection);
 extern void CloseAllConnectionsAfterTransaction(void);
 extern void CloseNodeConnectionsAfterTransaction(char *nodeName, int nodePort);
 extern MultiConnection * ConnectionAvailableToNode(char *hostName, int nodePort,
diff --git a/src/test/regress/expected/detect_conn_close.out b/src/test/regress/expected/detect_conn_close.out
new file mode 100644
index 000000000..1bc264f65
--- /dev/null
+++ b/src/test/regress/expected/detect_conn_close.out
@@ -0,0 +1,220 @@
+--
+-- PG15+ test as WL_SOCKET_CLOSED exposed for PG15+
+--
+SHOW server_version \gset
+SELECT substring(:'server_version', '\d+')::int >= 15 AS server_version_ge_15
+\gset
+\if :server_version_ge_15
+\else
+\q
+\endif
+CREATE SCHEMA socket_close;
+SET search_path TO socket_close;
+CREATE OR REPLACE FUNCTION kill_all_cached_internal_conns(gpid bigint)
+RETURNS bool LANGUAGE plpgsql
+AS $function$
+DECLARE
+  killed_backend_ct int;
+BEGIN
+    -- kill all the cached backends on the workers
+    WITH command AS (SELECT 'SELECT count(*) FROM (SELECT pg_terminate_backend(pid, 1000) FROM pg_stat_activity WHERE application_name ILIKE ''%citus_internal gpid=' || gpid::text ||''' AND pid !=pg_backend_pid()) as foo')
+    SELECT sum(result::int) INTO killed_backend_ct FROM command JOIN LATERAL run_command_on_workers((SELECT * FROM command)) on (true);
+
+    RETURN killed_backend_ct > 0;
+END;
+$function$;
+SET citus.shard_count TO 8;
+SET citus.shard_replication_factor TO 1;
+CREATE TABLE socket_test_table(id bigserial, value text);
+SELECT create_distributed_table('socket_test_table', 'id');
+ create_distributed_table
+---------------------------------------------------------------------
+
+(1 row)
+
+INSERT INTO socket_test_table (value) SELECT i::text FROM generate_series(0,100)i;
+-- first, simulate that we only have one cached connection per node
+SET citus.max_adaptive_executor_pool_size TO 1;
+SET citus.max_cached_conns_per_worker TO 1;
+SELECT count(*) FROM socket_test_table;
+ count
+---------------------------------------------------------------------
+   101
+(1 row)
+
+-- kill all the cached backends on the workers initiated by the current gpid
+select kill_all_cached_internal_conns(citus_backend_gpid());
+ kill_all_cached_internal_conns
+---------------------------------------------------------------------
+ t
+(1 row)
+
+-- show that none remains
+SELECT result FROM run_command_on_workers($$SELECT count(*) FROM (SELECT pid FROM pg_stat_activity WHERE query ilike '%socket_test_table%' AND pid !=pg_backend_pid()) as foo$$);
+ result
+---------------------------------------------------------------------
+ 0
+ 0
+(2 rows)
+
+-- even though the cached connections closed, the execution recovers and establishes new connections
+SELECT count(*) FROM socket_test_table;
+ count
+---------------------------------------------------------------------
+   101
+(1 row)
+
+-- now, use 16 connections per worker, we can still recover all connections
+SET citus.max_adaptive_executor_pool_size TO 16;
+SET citus.max_cached_conns_per_worker TO 16;
+SET citus.force_max_query_parallelization TO ON;
+SELECT count(*) FROM socket_test_table;
+ count
+---------------------------------------------------------------------
+   101
+(1 row)
+
+-- kill all the cached backends on the workers initiated by the current gpid
+select kill_all_cached_internal_conns(citus_backend_gpid());
+ kill_all_cached_internal_conns
+---------------------------------------------------------------------
+ t
+(1 row)
+
+SELECT count(*) FROM socket_test_table;
+ count
+---------------------------------------------------------------------
+   101
+(1 row)
+
+-- now, get back to sane defaults
+SET citus.max_adaptive_executor_pool_size TO 1;
+SET citus.max_cached_conns_per_worker TO 1;
+SET citus.force_max_query_parallelization TO OFF;
+-- we can recover for modification queries as well
+-- single row INSERT
+INSERT INTO socket_test_table VALUES (1);
+-- kill all the cached backends on the workers initiated by the current gpid
+select kill_all_cached_internal_conns(citus_backend_gpid());
+ kill_all_cached_internal_conns
+---------------------------------------------------------------------
+ t
+(1 row)
+
+INSERT INTO socket_test_table VALUES (1);
+-- single row UPDATE
+UPDATE socket_test_table SET value = 15 WHERE id = 1;
+-- kill all the cached backends on the workers initiated by the current gpid
+select kill_all_cached_internal_conns(citus_backend_gpid());
+ kill_all_cached_internal_conns
+---------------------------------------------------------------------
+ t
+(1 row)
+
+UPDATE socket_test_table SET value = 15 WHERE id = 1;
+-- we cannot recover in a transaction block
+BEGIN;
+    SELECT count(*) FROM socket_test_table;
+ count
+---------------------------------------------------------------------
+   103
+(1 row)
+
+    -- kill all the cached backends on the workers initiated by the current gpid
+    select kill_all_cached_internal_conns(citus_backend_gpid());
+ kill_all_cached_internal_conns
+---------------------------------------------------------------------
+ t
+(1 row)
+
+    SELECT count(*) FROM socket_test_table;
+ERROR:  connection to the remote node localhost:xxxxx failed with the following error: connection not open
+WARNING:  terminating connection due to administrator command
+CONTEXT:  while executing command on localhost:xxxxx
+WARNING:  terminating connection due to administrator command
+CONTEXT:  while executing command on localhost:xxxxx
+ROLLBACK;
+-- repartition joins also can recover
+SET citus.enable_repartition_joins TO on;
+SET citus.max_adaptive_executor_pool_size TO 1;
+SET citus.max_cached_conns_per_worker TO 1;
+SELECT count(*) FROM socket_test_table t1 JOIN socket_test_table t2 USING(value);
+ count
+---------------------------------------------------------------------
+   115
+(1 row)
+
+-- kill all the cached backends on the workers initiated by the current gpid
+select kill_all_cached_internal_conns(citus_backend_gpid());
+ kill_all_cached_internal_conns
+---------------------------------------------------------------------
+ t
+(1 row)
+
+-- even though the cached connections closed, the execution recovers and establishes new connections
+SELECT count(*) FROM socket_test_table t1 JOIN socket_test_table t2 USING(value);
+ count
+---------------------------------------------------------------------
+   115
+(1 row)
+
+-- also, recover insert .. select repartitioning
+INSERT INTO socket_test_table SELECT value::bigint, value FROM socket_test_table;
+-- kill all the cached backends on the workers initiated by the current gpid
+select kill_all_cached_internal_conns(citus_backend_gpid());
+ kill_all_cached_internal_conns
+---------------------------------------------------------------------
+ t
+(1 row)
+
+-- even though the cached connections closed, the execution recovers and establishes new connections
+INSERT INTO socket_test_table SELECT value::bigint, value FROM socket_test_table;
+-- also, recover with intermediate results
+WITH cte_1 AS (SELECT * FROM socket_test_table LIMIT 1) SELECT count(*) FROM cte_1;
+ count
+---------------------------------------------------------------------
+     1
+(1 row)
+
+-- kill all the cached backends on the workers initiated by the current gpid
+select kill_all_cached_internal_conns(citus_backend_gpid());
+ kill_all_cached_internal_conns
+---------------------------------------------------------------------
+ t
+(1 row)
+
+-- even though the cached connections closed, the execution recovers and establishes new connections
+WITH cte_1 AS (SELECT * FROM socket_test_table LIMIT 1) SELECT count(*) FROM cte_1;
+ count
+---------------------------------------------------------------------
+     1
+(1 row)
+
+-- although should have no difference, we can recover from the failures on the workers as well
+\c - - - :worker_1_port
+SET search_path TO socket_close;
+SET citus.max_adaptive_executor_pool_size TO 1;
+SET citus.max_cached_conns_per_worker TO 1;
+SET citus.force_max_query_parallelization TO ON;
+SELECT count(*) FROM socket_test_table;
+ count
+---------------------------------------------------------------------
+   412
+(1 row)
+
+-- kill all the cached backends on the workers initiated by the current gpid
+select kill_all_cached_internal_conns(citus_backend_gpid());
+ kill_all_cached_internal_conns
+---------------------------------------------------------------------
+ t
+(1 row)
+
+SELECT count(*) FROM socket_test_table;
+ count
+---------------------------------------------------------------------
+   412
+(1 row)
+
+\c - - - :master_port
+SET client_min_messages TO ERROR;
+DROP SCHEMA socket_close CASCADE;
diff --git a/src/test/regress/expected/detect_conn_close_0.out b/src/test/regress/expected/detect_conn_close_0.out
new file mode 100644
index 000000000..27e9787c6
--- /dev/null
+++ b/src/test/regress/expected/detect_conn_close_0.out
@@ -0,0 +1,9 @@
+--
+-- PG15+ test as WL_SOCKET_CLOSED exposed for PG15+
+--
+SHOW server_version \gset
+SELECT substring(:'server_version', '\d+')::int >= 15 AS server_version_ge_15
+\gset
+\if :server_version_ge_15
+\else
+\q
diff --git a/src/test/regress/multi_schedule b/src/test/regress/multi_schedule
index 9abab954c..14b02d7a4 100644
--- a/src/test/regress/multi_schedule
+++ b/src/test/regress/multi_schedule
@@ -58,7 +58,7 @@ test: cte_inline recursive_view_local_table values sequences_with_different_type
 test: pg13 pg12
 # run pg14 sequentially as it syncs metadata
 test: pg14
-test: pg15 pg15_jsonpath
+test: pg15 pg15_jsonpath detect_conn_close
 test: drop_column_partitioned_table
 test: tableam
diff --git a/src/test/regress/sql/detect_conn_close.sql b/src/test/regress/sql/detect_conn_close.sql
new file mode 100644
index 000000000..56ec9bd1d
--- /dev/null
+++ b/src/test/regress/sql/detect_conn_close.sql
@@ -0,0 +1,139 @@
+--
+-- PG15+ test as WL_SOCKET_CLOSED exposed for PG15+
+--
+SHOW server_version \gset
+SELECT substring(:'server_version', '\d+')::int >= 15 AS server_version_ge_15
+\gset
+\if :server_version_ge_15
+\else
+\q
+\endif
+
+CREATE SCHEMA socket_close;
+SET search_path TO socket_close;
+
+CREATE OR REPLACE FUNCTION kill_all_cached_internal_conns(gpid bigint)
+RETURNS bool LANGUAGE plpgsql
+AS $function$
+DECLARE
+  killed_backend_ct int;
+BEGIN
+    -- kill all the cached backends on the workers
+    WITH command AS (SELECT 'SELECT count(*) FROM (SELECT pg_terminate_backend(pid, 1000) FROM pg_stat_activity WHERE application_name ILIKE ''%citus_internal gpid=' || gpid::text ||''' AND pid !=pg_backend_pid()) as foo')
+    SELECT sum(result::int) INTO killed_backend_ct FROM command JOIN LATERAL run_command_on_workers((SELECT * FROM command)) on (true);
+
+    RETURN killed_backend_ct > 0;
+END;
+$function$;
+
+
+SET citus.shard_count TO 8;
+SET citus.shard_replication_factor TO 1;
+
+CREATE TABLE socket_test_table(id bigserial, value text);
+SELECT create_distributed_table('socket_test_table', 'id');
+INSERT INTO socket_test_table (value) SELECT i::text FROM generate_series(0,100)i;
+
+-- first, simulate that we only have one cached connection per node
+SET citus.max_adaptive_executor_pool_size TO 1;
+SET citus.max_cached_conns_per_worker TO 1;
+SELECT count(*) FROM socket_test_table;
+
+-- kill all the cached backends on the workers initiated by the current gpid
+select kill_all_cached_internal_conns(citus_backend_gpid());
+
+-- show that none remains
+SELECT result FROM run_command_on_workers($$SELECT count(*) FROM (SELECT pid FROM pg_stat_activity WHERE query ilike '%socket_test_table%' AND pid !=pg_backend_pid()) as foo$$);
+
+-- even though the cached connections closed, the execution recovers and establishes new connections
+SELECT count(*) FROM socket_test_table;
+
+-- now, use 16 connections per worker, we can still recover all connections
+SET citus.max_adaptive_executor_pool_size TO 16;
+SET citus.max_cached_conns_per_worker TO 16;
+SET citus.force_max_query_parallelization TO ON;
+SELECT count(*) FROM socket_test_table;
+
+-- kill all the cached backends on the workers initiated by the current gpid
+select kill_all_cached_internal_conns(citus_backend_gpid());
+
+SELECT count(*) FROM socket_test_table;
+
+-- now, get back to sane defaults
+SET citus.max_adaptive_executor_pool_size TO 1;
+SET citus.max_cached_conns_per_worker TO 1;
+SET citus.force_max_query_parallelization TO OFF;
+
+-- we can recover for modification queries as well
+
+-- single row INSERT
+INSERT INTO socket_test_table VALUES (1);
+
+-- kill all the cached backends on the workers initiated by the current gpid
+select kill_all_cached_internal_conns(citus_backend_gpid());
+
+INSERT INTO socket_test_table VALUES (1);
+
+-- single row UPDATE
+UPDATE socket_test_table SET value = 15 WHERE id = 1;
+-- kill all the cached backends on the workers initiated by the current gpid
+select kill_all_cached_internal_conns(citus_backend_gpid());
+UPDATE socket_test_table SET value = 15 WHERE id = 1;
+
+-- we cannot recover in a transaction block
+BEGIN;
+    SELECT count(*) FROM socket_test_table;
+
+    -- kill all the cached backends on the workers initiated by the current gpid
+    select kill_all_cached_internal_conns(citus_backend_gpid());
+
+    SELECT count(*) FROM socket_test_table;
+ROLLBACK;
+
+
+-- repartition joins also can recover
+SET citus.enable_repartition_joins TO on;
+SET citus.max_adaptive_executor_pool_size TO 1;
+SET citus.max_cached_conns_per_worker TO 1;
+SELECT count(*) FROM socket_test_table t1 JOIN socket_test_table t2 USING(value);
+
+-- kill all the cached backends on the workers initiated by the current gpid
+select kill_all_cached_internal_conns(citus_backend_gpid());
+
+-- even though the cached connections closed, the execution recovers and establishes new connections
+SELECT count(*) FROM socket_test_table t1 JOIN socket_test_table t2 USING(value);
+
+-- also, recover insert .. select repartitioning
+INSERT INTO socket_test_table SELECT value::bigint, value FROM socket_test_table;
+
+-- kill all the cached backends on the workers initiated by the current gpid
+select kill_all_cached_internal_conns(citus_backend_gpid());
+
+-- even though the cached connections closed, the execution recovers and establishes new connections
+INSERT INTO socket_test_table SELECT value::bigint, value FROM socket_test_table;
+
+-- also, recover with intermediate results
+WITH cte_1 AS (SELECT * FROM socket_test_table LIMIT 1) SELECT count(*) FROM cte_1;
+
+-- kill all the cached backends on the workers initiated by the current gpid
+select kill_all_cached_internal_conns(citus_backend_gpid());
+
+-- even though the cached connections closed, the execution recovers and establishes new connections
+WITH cte_1 AS (SELECT * FROM socket_test_table LIMIT 1) SELECT count(*) FROM cte_1;
+
+-- although should have no difference, we can recover from the failures on the workers as well
+\c - - - :worker_1_port
+SET search_path TO socket_close;
+
+SET citus.max_adaptive_executor_pool_size TO 1;
+SET citus.max_cached_conns_per_worker TO 1;
+SET citus.force_max_query_parallelization TO ON;
+SELECT count(*) FROM socket_test_table;
+-- kill all the cached backends on the workers initiated by the current gpid
+select kill_all_cached_internal_conns(citus_backend_gpid());
+SELECT count(*) FROM socket_test_table;
+
+\c - - - :master_port
+
+SET client_min_messages TO ERROR;
+DROP SCHEMA socket_close CASCADE;