mirror of https://github.com/citusdata/citus.git
Add a single connection retry in MULTI_CONNECTION_LOST
parent
9e2b96caa5
commit
8f560b6031
|
@ -1482,6 +1482,30 @@ ShouldShutdownConnection(MultiConnection *connection, const int cachedConnection
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* RestartConnection starts a new connection attempt for the given
|
||||||
|
* MultiConnection.
|
||||||
|
*
|
||||||
|
* We assume that we already went through all the other initialization steps in
|
||||||
|
* StartNodeUserDatabaseConnection, such as incrementing shared connection
|
||||||
|
* counters.
|
||||||
|
*/
|
||||||
|
void
|
||||||
|
RestartConnection(MultiConnection *connection)
|
||||||
|
{
|
||||||
|
ShutdownConnection(connection);
|
||||||
|
|
||||||
|
ConnectionHashKey key;
|
||||||
|
strlcpy(key.hostname, connection->hostname, MAX_NODE_LENGTH);
|
||||||
|
key.port = connection->port;
|
||||||
|
strlcpy(key.user, connection->user, NAMEDATALEN);
|
||||||
|
strlcpy(key.database, connection->database, NAMEDATALEN);
|
||||||
|
key.replicationConnParam = connection->requiresReplication;
|
||||||
|
|
||||||
|
StartConnectionEstablishment(connection, &key);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* ResetConnection preserves the given connection for later usage by
|
* ResetConnection preserves the given connection for later usage by
|
||||||
* resetting its states.
|
* resetting its states.
|
||||||
|
|
|
@ -476,6 +476,9 @@ typedef struct WorkerSession
|
||||||
|
|
||||||
/* events reported by the latest call to WaitEventSetWait */
|
/* events reported by the latest call to WaitEventSetWait */
|
||||||
int latestUnconsumedWaitEvents;
|
int latestUnconsumedWaitEvents;
|
||||||
|
|
||||||
|
/* number of times we retried establishing the connection */
|
||||||
|
int connectionRetryCount;
|
||||||
} WorkerSession;
|
} WorkerSession;
|
||||||
|
|
||||||
|
|
||||||
|
@ -3656,6 +3659,26 @@ ConnectionStateMachine(WorkerSession *session)
|
||||||
workerPool->idleConnectionCount--;
|
workerPool->idleConnectionCount--;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* TODO: refine this check */
|
||||||
|
RemoteTransaction *transaction = &connection->remoteTransaction;
|
||||||
|
if (!transaction->transactionCritical &&
|
||||||
|
session->connectionRetryCount == 0)
|
||||||
|
{
|
||||||
|
session->connectionRetryCount++;
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Try to connect again, we will reuse the same MultiConnection
|
||||||
|
* and keep it as claimed.
|
||||||
|
*/
|
||||||
|
RestartConnection(connection);
|
||||||
|
|
||||||
|
/* socket will have changed */
|
||||||
|
execution->rebuildWaitEventSet = true;
|
||||||
|
|
||||||
|
connection->connectionState = MULTI_CONNECTION_INITIAL;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
connection->connectionState = MULTI_CONNECTION_FAILED;
|
connection->connectionState = MULTI_CONNECTION_FAILED;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -4164,7 +4187,11 @@ UpdateConnectionWaitFlags(WorkerSession *session, int waitFlags)
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#if PG_VERSION_NUM >= PG_VERSION_15
|
||||||
|
connection->waitFlags = waitFlags | WL_SOCKET_CLOSED;
|
||||||
|
#else
|
||||||
connection->waitFlags = waitFlags;
|
connection->waitFlags = waitFlags;
|
||||||
|
#endif
|
||||||
|
|
||||||
/* without signalling the execution, the flag changes won't be reflected */
|
/* without signalling the execution, the flag changes won't be reflected */
|
||||||
execution->waitFlagsChanged = true;
|
execution->waitFlagsChanged = true;
|
||||||
|
@ -4202,6 +4229,14 @@ CheckConnectionReady(WorkerSession *session)
|
||||||
waitFlags = waitFlags | WL_SOCKET_WRITEABLE;
|
waitFlags = waitFlags | WL_SOCKET_WRITEABLE;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#if PG_VERSION_NUM >= PG_VERSION_15
|
||||||
|
if ((session->latestUnconsumedWaitEvents & WL_SOCKET_CLOSED) != 0)
|
||||||
|
{
|
||||||
|
connection->connectionState = MULTI_CONNECTION_LOST;
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
if ((session->latestUnconsumedWaitEvents & WL_SOCKET_READABLE) != 0)
|
if ((session->latestUnconsumedWaitEvents & WL_SOCKET_READABLE) != 0)
|
||||||
{
|
{
|
||||||
if (PQconsumeInput(connection->pgConn) == 0)
|
if (PQconsumeInput(connection->pgConn) == 0)
|
||||||
|
|
|
@ -294,6 +294,7 @@ extern MultiConnection * StartNodeUserDatabaseConnection(uint32 flags,
|
||||||
int32 port,
|
int32 port,
|
||||||
const char *user,
|
const char *user,
|
||||||
const char *database);
|
const char *database);
|
||||||
|
extern void RestartConnection(MultiConnection *connection);
|
||||||
extern void CloseAllConnectionsAfterTransaction(void);
|
extern void CloseAllConnectionsAfterTransaction(void);
|
||||||
extern void CloseNodeConnectionsAfterTransaction(char *nodeName, int nodePort);
|
extern void CloseNodeConnectionsAfterTransaction(char *nodeName, int nodePort);
|
||||||
extern MultiConnection * ConnectionAvailableToNode(char *hostName, int nodePort,
|
extern MultiConnection * ConnectionAvailableToNode(char *hostName, int nodePort,
|
||||||
|
|
Loading…
Reference in New Issue