Apply feedback

test_av
Onder Kalaci 2022-09-01 14:48:38 +03:00
parent 6dcc960665
commit dc0ee895fc
1 changed files with 61 additions and 112 deletions

View File

@ -63,7 +63,7 @@ static List * EnsureTxStateAndGetHealthyConnections(dlist_head *connections,
static bool RemoteSocketClosed(MultiConnection *connection); static bool RemoteSocketClosed(MultiConnection *connection);
#if PG_VERSION_NUM >= PG_VERSION_15 #if PG_VERSION_NUM >= PG_VERSION_15
static bool ProcessWaitEventsForSocketClose(WaitEvent *events, int eventCount, static bool ProcessWaitEventsForSocketClose(WaitEvent *events, int eventCount,
bool *socketClosed); List **closedConnectionList);
#endif #endif
static void ErrorIfMultipleMetadataConnectionExists(List *connections); static void ErrorIfMultipleMetadataConnectionExists(List *connections);
static void FreeConnParamsHashEntryFields(ConnParamsHashEntry *entry); static void FreeConnParamsHashEntryFields(ConnParamsHashEntry *entry);
@ -549,116 +549,59 @@ static List *
EnsureTxStateAndGetHealthyConnections(dlist_head *connections, EnsureTxStateAndGetHealthyConnections(dlist_head *connections,
bool checkTransactionState) bool checkTransactionState)
{ {
List *healthyConnections = NIL; List *connectionList = NIL;
bool waitEventSetCanReportClosed = false;
#if PG_VERSION_NUM >= PG_VERSION_15
/* some kernel builds do not support WL_SOCKET_CLOSED */
waitEventSetCanReportClosed = WaitEventSetCanReportClosed();
#endif
dlist_iter iter; dlist_iter iter;
dlist_foreach(iter, connections) dlist_foreach(iter, connections)
{ {
MultiConnection *connection = MultiConnection *connection =
dlist_container(MultiConnection, connectionNode, iter.cur); dlist_container(MultiConnection, connectionNode, iter.cur);
RemoteTransaction *transaction = &connection->remoteTransaction; connectionList = lappend(connectionList, connection);
bool inRemoteTrasaction = }
transaction->transactionState != REMOTE_TRANS_NOT_STARTED;
if (checkTransactionState && inRemoteTrasaction) if (!waitEventSetCanReportClosed)
{ {
PGTransactionStatusType status = PQtransactionStatus(connection->pgConn); return connectionList;
}
/* if the connection is in a bad state, so is the transaction's state */ int eventSetSize = EventSetSizeForConnectionList(connectionList);
if (status == PQTRANS_INERROR || status == PQTRANS_UNKNOWN) WaitEvent *events = palloc(sizeof(WaitEvent) * eventSetSize);
{ WaitEventSet *waitEventSet =
transaction->transactionFailed = true; CreateWaitEventSet(CurrentMemoryContext, eventSetSize);
}
if (transaction->transactionFailed &&
(transaction->transactionCritical ||
!dlist_is_empty(&connection->referencedPlacements)))
{
/*
* If a connection is executing a critical transaction or accessed any
* placements, we should not skip this connection (or return as
* healthy connection).
*
* Critical transaction means that the caller -- or the initiator
* of the transaction -- cannot afford to handle any failures
* within the transaction, so better fail right now.
*
* If a placement is accessed inside a transaction and the
* transaction has failed, we cannot proceed. Otherwise,
* another connection in the same transaction might try
* to access the same placement over a different connection.
* That could cause self-deadlocks or break read-your-own-writes
* consistency.
*/
ReportConnectionError(connection, ERROR);
}
}
if (connection->pgConn == NULL || PQsocket(connection->pgConn) == -1 || dlist_foreach(iter, connections)
PQstatus(connection->pgConn) == CONNECTION_BAD || {
connection->connectionState == MULTI_CONNECTION_FAILED || MultiConnection *connection =
RemoteSocketClosed(connection)) dlist_container(MultiConnection, connectionNode, iter.cur);
int sock = PQsocket(connection->pgConn);
int waitEventSetIndex =
CitusAddWaitEventSetToSet(waitEventSet, WL_SOCKET_CLOSED, sock,
NULL, (void *) connection);
if (waitEventSetIndex == WAIT_EVENT_SET_INDEX_FAILED)
{ {
/* /*
* If a connection is not usable and we do not need to throw an error, * Inform failed to add to wait event set with a debug message as this
* close at the end of the transaction. * is too detailed information for users. We let the code flow just in
* case it can successfully finish the execution.
*/ */
connection->forceCloseAtTransactionEnd = true; ereport(DEBUG1, (errcode(ERRCODE_CONNECTION_FAILURE),
errmsg("Adding wait event for node %s:%d failed. "
"The socket was: %d",
connection->hostname,
connection->port, sock)));
} }
else
{
/* connection is healthy */
healthyConnections = lappend(healthyConnections, connection);
}
}
return healthyConnections;
}
/*
* RemoteSocketClosed returns true if the remote socket is closed.
*
* Note that we rely on WL_SOCKET_CLOSED, which is only available
* for PG 15+ and for kernels that support the feature.
*/
static bool
RemoteSocketClosed(MultiConnection *connection)
{
bool socketClosed = false;
#if PG_VERSION_NUM >= PG_VERSION_15
if (!WaitEventSetCanReportClosed())
{
/* the kernel build does not support WL_SOCKET_CLOSED */
return false;
}
WaitEvent events[3];
WaitEventSet *waitEventSet =
CreateWaitEventSet(CurrentMemoryContext, 3);
int sock = PQsocket(connection->pgConn);
int waitEventSetIndex =
CitusAddWaitEventSetToSet(waitEventSet, WL_SOCKET_CLOSED, sock,
NULL, (void *) connection);
if (waitEventSetIndex == WAIT_EVENT_SET_INDEX_FAILED)
{
/*
* Inform failed to add to wait event set with a debug message as this
* is too detailed information for users. We let the code flow just in
* case it can successfully finish the execution.
*/
ereport(DEBUG1, (errcode(ERRCODE_CONNECTION_FAILURE),
errmsg("Adding wait event for node %s:%d failed. "
"The socket was: %d",
connection->hostname,
connection->port, sock)));
FreeWaitEventSet(waitEventSet);
return false;
} }
CitusAddWaitEventSetToSet(waitEventSet, WL_POSTMASTER_DEATH, PGINVALID_SOCKET, NULL, CitusAddWaitEventSetToSet(waitEventSet, WL_POSTMASTER_DEATH, PGINVALID_SOCKET, NULL,
@ -666,26 +609,32 @@ RemoteSocketClosed(MultiConnection *connection)
CitusAddWaitEventSetToSet(waitEventSet, WL_LATCH_SET, PGINVALID_SOCKET, MyLatch, CitusAddWaitEventSetToSet(waitEventSet, WL_LATCH_SET, PGINVALID_SOCKET, MyLatch,
NULL); NULL);
List *closedConnectionList = NIL;
bool retry = false; bool retry = false;
do { do {
long timeout = 0; /* do not wait at all */ long timeout = 0; /* do not wait at all */
int eventCount = int eventCount =
WaitEventSetWait(waitEventSet, timeout, events, WaitEventSetWait(waitEventSet, timeout, events,
3, WAIT_EVENT_CLIENT_READ); eventSetSize, WAIT_EVENT_CLIENT_READ);
retry = ProcessWaitEventsForSocketClose(events, eventCount, &socketClosed); retry =
if (socketClosed) ProcessWaitEventsForSocketClose(events, eventCount,
{ &closedConnectionList);
/* socket is closed, no further information needed */
break;
}
} while (retry); } while (retry);
FreeWaitEventSet(waitEventSet); FreeWaitEventSet(waitEventSet);
#endif
return socketClosed; List *healthyConnectionList = list_difference_ptr(connectionList,
closedConnectionList);
MultiConnection *connection = NULL;
foreach_ptr(connection, closedConnectionList)
{
MarkRemoteTransactionFailed(connection, checkTransactionState);
CloseConnection(connection);
}
return healthyConnectionList;
} }
@ -700,10 +649,9 @@ RemoteSocketClosed(MultiConnection *connection)
* other events from being reported. Poll and check again. * other events from being reported. Poll and check again.
*/ */
static bool static bool
ProcessWaitEventsForSocketClose(WaitEvent *events, int eventCount, bool *socketClosed) ProcessWaitEventsForSocketClose(WaitEvent *events, int eventCount,
List **closedConnectionList)
{ {
*socketClosed = false;
for (int eventIndex = 0; eventIndex < eventCount; eventIndex++) for (int eventIndex = 0; eventIndex < eventCount; eventIndex++)
{ {
WaitEvent *event = &events[eventIndex]; WaitEvent *event = &events[eventIndex];
@ -715,16 +663,17 @@ ProcessWaitEventsForSocketClose(WaitEvent *events, int eventCount, bool *socketC
if (event->events & WL_SOCKET_CLOSED) if (event->events & WL_SOCKET_CLOSED)
{ {
*socketClosed = true; MultiConnection *connection = (MultiConnection *) event->user_data;
/* we found what we are searching for */ *closedConnectionList = lappend(*closedConnectionList, connection);
return false;
} }
if (event->events & WL_LATCH_SET) if (event->events & WL_LATCH_SET)
{ {
ResetLatch(MyLatch); ResetLatch(MyLatch);
CHECK_FOR_INTERRUPTS();
/* the caller should retry */ /* the caller should retry */
return true; return true;
} }