diff --git a/src/backend/distributed/connection/connection_management.c b/src/backend/distributed/connection/connection_management.c index 577378803..516424748 100644 --- a/src/backend/distributed/connection/connection_management.c +++ b/src/backend/distributed/connection/connection_management.c @@ -1482,6 +1482,30 @@ ShouldShutdownConnection(MultiConnection *connection, const int cachedConnection } +/* + * RestartConnection shuts down the given MultiConnection and starts a new + * connection attempt on the same struct, using a key rebuilt from its own + * hostname, port, user, database and requiresReplication fields. + * + * We assume the caller already went through the other initialization steps + * in StartNodeUserDatabaseConnection, such as shared connection counters. + */ +void +RestartConnection(MultiConnection *connection) +{ + ShutdownConnection(connection); + + ConnectionHashKey key; + strlcpy(key.hostname, connection->hostname, MAX_NODE_LENGTH); + key.port = connection->port; + strlcpy(key.user, connection->user, NAMEDATALEN); + strlcpy(key.database, connection->database, NAMEDATALEN); + key.replicationConnParam = connection->requiresReplication; + + StartConnectionEstablishment(connection, &key); +} + + /* * ResetConnection preserves the given connection for later usage by * resetting its states. 
diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c index a0e16876a..8f9577d7f 100644 --- a/src/backend/distributed/executor/adaptive_executor.c +++ b/src/backend/distributed/executor/adaptive_executor.c @@ -476,6 +476,9 @@ typedef struct WorkerSession /* events reported by the latest call to WaitEventSetWait */ int latestUnconsumedWaitEvents; + + /* number of times we retried establishing the connection */ + int connectionRetryCount; } WorkerSession; @@ -3656,6 +3659,26 @@ ConnectionStateMachine(WorkerSession *session) workerPool->idleConnectionCount--; } + /* TODO: refine this check */ + RemoteTransaction *transaction = &connection->remoteTransaction; + if (!transaction->transactionCritical && + session->connectionRetryCount == 0) + { + session->connectionRetryCount++; + + /* + * Try to connect again, we will reuse the same MultiConnection + * and keep it as claimed. + */ + RestartConnection(connection); + + /* socket will have changed */ + execution->rebuildWaitEventSet = true; + + connection->connectionState = MULTI_CONNECTION_INITIAL; + break; + } + connection->connectionState = MULTI_CONNECTION_FAILED; break; } @@ -4164,7 +4187,11 @@ UpdateConnectionWaitFlags(WorkerSession *session, int waitFlags) return; } +#if PG_VERSION_NUM >= PG_VERSION_15 + connection->waitFlags = waitFlags | WL_SOCKET_CLOSED; +#else connection->waitFlags = waitFlags; +#endif /* without signalling the execution, the flag changes won't be reflected */ execution->waitFlagsChanged = true; @@ -4202,6 +4229,14 @@ CheckConnectionReady(WorkerSession *session) waitFlags = waitFlags | WL_SOCKET_WRITEABLE; } +#if PG_VERSION_NUM >= PG_VERSION_15 + if ((session->latestUnconsumedWaitEvents & WL_SOCKET_CLOSED) != 0) + { + connection->connectionState = MULTI_CONNECTION_LOST; + return false; + } +#endif + if ((session->latestUnconsumedWaitEvents & WL_SOCKET_READABLE) != 0) { if (PQconsumeInput(connection->pgConn) == 0) diff --git 
a/src/include/distributed/connection_management.h b/src/include/distributed/connection_management.h index 6c3b8ae8d..9615374e7 100644 --- a/src/include/distributed/connection_management.h +++ b/src/include/distributed/connection_management.h @@ -294,6 +294,7 @@ extern MultiConnection * StartNodeUserDatabaseConnection(uint32 flags, int32 port, const char *user, const char *database); +extern void RestartConnection(MultiConnection *connection); extern void CloseAllConnectionsAfterTransaction(void); extern void CloseNodeConnectionsAfterTransaction(char *nodeName, int nodePort); extern MultiConnection * ConnectionAvailableToNode(char *hostName, int nodePort,