diff --git a/src/backend/distributed/connection/connection_management.c b/src/backend/distributed/connection/connection_management.c index 20d0353b6..51c85673f 100644 --- a/src/backend/distributed/connection/connection_management.c +++ b/src/backend/distributed/connection/connection_management.c @@ -63,7 +63,7 @@ static List * EnsureTxStateAndGetHealthyConnections(dlist_head *connections, static bool RemoteSocketClosed(MultiConnection *connection); #if PG_VERSION_NUM >= PG_VERSION_15 static bool ProcessWaitEventsForSocketClose(WaitEvent *events, int eventCount, - bool *socketClosed); + List **closedConnectionList); #endif static void ErrorIfMultipleMetadataConnectionExists(List *connections); static void FreeConnParamsHashEntryFields(ConnParamsHashEntry *entry); @@ -549,116 +549,59 @@ static List * EnsureTxStateAndGetHealthyConnections(dlist_head *connections, bool checkTransactionState) { - List *healthyConnections = NIL; + List *connectionList = NIL; + bool waitEventSetCanReportClosed = false; + +#if PG_VERSION_NUM >= PG_VERSION_15 + + /* some kernel builds do not support WL_SOCKET_CLOSED */ + waitEventSetCanReportClosed = WaitEventSetCanReportClosed(); +#endif dlist_iter iter; + dlist_foreach(iter, connections) { MultiConnection *connection = dlist_container(MultiConnection, connectionNode, iter.cur); - RemoteTransaction *transaction = &connection->remoteTransaction; - bool inRemoteTrasaction = - transaction->transactionState != REMOTE_TRANS_NOT_STARTED; + connectionList = lappend(connectionList, connection); + } - if (checkTransactionState && inRemoteTrasaction) - { - PGTransactionStatusType status = PQtransactionStatus(connection->pgConn); + if (!waitEventSetCanReportClosed) + { + return connectionList; + } - /* if the connection is in a bad state, so is the transaction's state */ - if (status == PQTRANS_INERROR || status == PQTRANS_UNKNOWN) - { - transaction->transactionFailed = true; - } + int eventSetSize = EventSetSizeForConnectionList(connectionList); + WaitEvent *events = palloc(sizeof(WaitEvent) * eventSetSize); + WaitEventSet *waitEventSet = + CreateWaitEventSet(CurrentMemoryContext, eventSetSize); - if (transaction->transactionFailed && - (transaction->transactionCritical || - !dlist_is_empty(&connection->referencedPlacements))) - { - /* - * If a connection is executing a critical transaction or accessed any - * placements, we should not skip this connection (or return as - * healthy connection). - * - * Critical transaction means that the caller -- or the initiator - * of the transaction -- cannot afford to handle any failures - * within the transaction, so better fail right now. - * - * If a placement is accessed inside a transaction and the - * transaction has failed, we cannot proceed. Otherwise, - * another connection in the same transaction might try - * to access the same placement over a different connection. - * That could cause self-deadlocks or break read-your-own-writes - * consistency. - */ - ReportConnectionError(connection, ERROR); - } - } - if (connection->pgConn == NULL || PQsocket(connection->pgConn) == -1 || - PQstatus(connection->pgConn) == CONNECTION_BAD || - connection->connectionState == MULTI_CONNECTION_FAILED || - RemoteSocketClosed(connection)) + dlist_foreach(iter, connections) + { + MultiConnection *connection = + dlist_container(MultiConnection, connectionNode, iter.cur); + + int sock = PQsocket(connection->pgConn); + int waitEventSetIndex = + CitusAddWaitEventSetToSet(waitEventSet, WL_SOCKET_CLOSED, sock, + NULL, (void *) connection); + + if (waitEventSetIndex == WAIT_EVENT_SET_INDEX_FAILED) { /* - * If a connection is not usable and we do not need to throw an error, - * close at the end of the transaction. + * Inform failed to add to wait event set with a debug message as this + * is too detailed information for users. We let the code flow just in + * case it can successfully finish the execution. */ - connection->forceCloseAtTransactionEnd = true; + ereport(DEBUG1, (errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("Adding wait event for node %s:%d failed. " + "The socket was: %d", + connection->hostname, + connection->port, sock))); } - else - { - /* connection is healthy */ - healthyConnections = lappend(healthyConnections, connection); - } - } - - return healthyConnections; -} - - -/* - * RemoteSocketClosed returns true if the remote socket is closed. - * - * Note that we rely on WL_SOCKET_CLOSED, which is only available - * for PG 15+ and for kernels that support the feature. - */ -static bool -RemoteSocketClosed(MultiConnection *connection) -{ - bool socketClosed = false; -#if PG_VERSION_NUM >= PG_VERSION_15 - - if (!WaitEventSetCanReportClosed()) - { - /* the kernel build does not support WL_SOCKET_CLOSED */ - return false; - } - - WaitEvent events[3]; - WaitEventSet *waitEventSet = - CreateWaitEventSet(CurrentMemoryContext, 3); - int sock = PQsocket(connection->pgConn); - int waitEventSetIndex = - CitusAddWaitEventSetToSet(waitEventSet, WL_SOCKET_CLOSED, sock, - NULL, (void *) connection); - - if (waitEventSetIndex == WAIT_EVENT_SET_INDEX_FAILED) - { - /* - * Inform failed to add to wait event set with a debug message as this - * is too detailed information for users. We let the code flow just in - * case it can successfully finish the execution. - */ - ereport(DEBUG1, (errcode(ERRCODE_CONNECTION_FAILURE), - errmsg("Adding wait event for node %s:%d failed. " - "The socket was: %d", - connection->hostname, - connection->port, sock))); - - FreeWaitEventSet(waitEventSet); - - return false; } CitusAddWaitEventSetToSet(waitEventSet, WL_POSTMASTER_DEATH, PGINVALID_SOCKET, NULL, @@ -666,26 +609,32 @@ RemoteSocketClosed(MultiConnection *connection) CitusAddWaitEventSetToSet(waitEventSet, WL_LATCH_SET, PGINVALID_SOCKET, MyLatch, NULL); + List *closedConnectionList = NIL; bool retry = false; do { long timeout = 0; /* do not wait at all */ int eventCount = WaitEventSetWait(waitEventSet, timeout, events, - 3, WAIT_EVENT_CLIENT_READ); + eventSetSize, WAIT_EVENT_CLIENT_READ); - retry = ProcessWaitEventsForSocketClose(events, eventCount, &socketClosed); - if (socketClosed) - { - /* socket is closed, no further information needed */ - break; - } + retry = + ProcessWaitEventsForSocketClose(events, eventCount, + &closedConnectionList); } while (retry); - FreeWaitEventSet(waitEventSet); -#endif - return socketClosed; + List *healthyConnectionList = list_difference_ptr(connectionList, + closedConnectionList); + + MultiConnection *connection = NULL; + foreach_ptr(connection, closedConnectionList) + { + MarkRemoteTransactionFailed(connection, checkTransactionState); + CloseConnection(connection); + } + + return healthyConnectionList; } @@ -700,10 +649,9 @@ RemoteSocketClosed(MultiConnection *connection) * other events from being reported. Poll and check again. */ static bool -ProcessWaitEventsForSocketClose(WaitEvent *events, int eventCount, bool *socketClosed) +ProcessWaitEventsForSocketClose(WaitEvent *events, int eventCount, + List **closedConnectionList) { - *socketClosed = false; - for (int eventIndex = 0; eventIndex < eventCount; eventIndex++) { WaitEvent *event = &events[eventIndex]; @@ -715,16 +663,17 @@ ProcessWaitEventsForSocketClose(WaitEvent *events, int eventCount, bool *socketC if (event->events & WL_SOCKET_CLOSED) { - *socketClosed = true; + MultiConnection *connection = (MultiConnection *) event->user_data; - /* we found what we are searching for */ - return false; + *closedConnectionList = lappend(*closedConnectionList, connection); } if (event->events & WL_LATCH_SET) { ResetLatch(MyLatch); + CHECK_FOR_INTERRUPTS(); + /* the caller should retry */ return true; }