mirror of https://github.com/citusdata/citus.git
Error early
parent
65816467f2
commit
c281a819e5
|
@ -57,8 +57,10 @@ static uint32 ConnectionHashHash(const void *key, Size keysize);
|
||||||
static int ConnectionHashCompare(const void *a, const void *b, Size keysize);
|
static int ConnectionHashCompare(const void *a, const void *b, Size keysize);
|
||||||
static void StartConnectionEstablishment(MultiConnection *connectionn,
|
static void StartConnectionEstablishment(MultiConnection *connectionn,
|
||||||
ConnectionHashKey *key);
|
ConnectionHashKey *key);
|
||||||
static MultiConnection * FindAvailableConnection(dlist_head *connections, uint32 flags);
|
static MultiConnection * FindAvailableConnection(List *connections, uint32 flags);
|
||||||
static void ErrorIfMultipleMetadataConnectionExists(dlist_head *connections);
|
static List * EnsureAndGetHealthyConnections(dlist_head *connections);
|
||||||
|
static bool RemoteSocketClosed(MultiConnection *connection);
|
||||||
|
static void ErrorIfMultipleMetadataConnectionExists(List *connections);
|
||||||
static void FreeConnParamsHashEntryFields(ConnParamsHashEntry *entry);
|
static void FreeConnParamsHashEntryFields(ConnParamsHashEntry *entry);
|
||||||
static void AfterXactHostConnectionHandling(ConnectionHashEntry *entry, bool isCommit);
|
static void AfterXactHostConnectionHandling(ConnectionHashEntry *entry, bool isCommit);
|
||||||
static bool ShouldShutdownConnection(MultiConnection *connection, const int
|
static bool ShouldShutdownConnection(MultiConnection *connection, const int
|
||||||
|
@ -66,7 +68,6 @@ static bool ShouldShutdownConnection(MultiConnection *connection, const int
|
||||||
static void ResetConnection(MultiConnection *connection);
|
static void ResetConnection(MultiConnection *connection);
|
||||||
static bool RemoteTransactionIdle(MultiConnection *connection);
|
static bool RemoteTransactionIdle(MultiConnection *connection);
|
||||||
static int EventSetSizeForConnectionList(List *connections);
|
static int EventSetSizeForConnectionList(List *connections);
|
||||||
static bool RemoteConnectionClosed(MultiConnection *connection);
|
|
||||||
|
|
||||||
/* types for async connection management */
|
/* types for async connection management */
|
||||||
enum MultiConnectionPhase
|
enum MultiConnectionPhase
|
||||||
|
@ -334,8 +335,10 @@ StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port,
|
||||||
/* if desired, check whether there's a usable connection */
|
/* if desired, check whether there's a usable connection */
|
||||||
if (!(flags & FORCE_NEW_CONNECTION))
|
if (!(flags & FORCE_NEW_CONNECTION))
|
||||||
{
|
{
|
||||||
|
List *healthyConnections = EnsureAndGetHealthyConnections(entry->connections);
|
||||||
|
|
||||||
/* check connection cache for a connection that's not already in use */
|
/* check connection cache for a connection that's not already in use */
|
||||||
MultiConnection *connection = FindAvailableConnection(entry->connections, flags);
|
MultiConnection *connection = FindAvailableConnection(healthyConnections, flags);
|
||||||
if (connection)
|
if (connection)
|
||||||
{
|
{
|
||||||
return connection;
|
return connection;
|
||||||
|
@ -427,16 +430,13 @@ StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port,
|
||||||
* If no connection is available, FindAvailableConnection returns NULL.
|
* If no connection is available, FindAvailableConnection returns NULL.
|
||||||
*/
|
*/
|
||||||
static MultiConnection *
|
static MultiConnection *
|
||||||
FindAvailableConnection(dlist_head *connections, uint32 flags)
|
FindAvailableConnection(List *connections, uint32 flags)
|
||||||
{
|
{
|
||||||
List *metadataConnectionCandidateList = NIL;
|
List *metadataConnectionCandidateList = NIL;
|
||||||
|
|
||||||
dlist_iter iter;
|
MultiConnection *connection = NULL;
|
||||||
dlist_foreach(iter, connections)
|
foreach_ptr(connection, connections)
|
||||||
{
|
{
|
||||||
MultiConnection *connection =
|
|
||||||
dlist_container(MultiConnection, connectionNode, iter.cur);
|
|
||||||
|
|
||||||
if (flags & OUTSIDE_TRANSACTION)
|
if (flags & OUTSIDE_TRANSACTION)
|
||||||
{
|
{
|
||||||
/* don't return connections that are used in transactions */
|
/* don't return connections that are used in transactions */
|
||||||
|
@ -492,11 +492,6 @@ FindAvailableConnection(dlist_head *connections, uint32 flags)
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (RemoteConnectionClosed(connection))
|
|
||||||
{
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
return connection;
|
return connection;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -531,8 +526,92 @@ FindAvailableConnection(dlist_head *connections, uint32 flags)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* EnsureAndGetHealthyConnections is a helper function that goes over the
|
||||||
|
* input connections, and:
|
||||||
|
* - Errors for the connections that are not healthy but still part of a
|
||||||
|
* remote transaction
|
||||||
|
* - Skips connections that are not healthy, marks them to be closed
|
||||||
|
* at the end of the transaction such that they do not linger around
|
||||||
|
*
|
||||||
|
* And, finally returns the list (unless already errored out) of connections
|
||||||
|
* that can be reused.
|
||||||
|
*/
|
||||||
|
static List *
|
||||||
|
EnsureAndGetHealthyConnections(dlist_head *connections)
|
||||||
|
{
|
||||||
|
List *healthyConnections = NIL;
|
||||||
|
|
||||||
|
dlist_iter iter;
|
||||||
|
dlist_foreach(iter, connections)
|
||||||
|
{
|
||||||
|
MultiConnection *connection =
|
||||||
|
dlist_container(MultiConnection, connectionNode, iter.cur);
|
||||||
|
bool healthyConnection = true;
|
||||||
|
|
||||||
|
if (connection->pgConn == NULL || PQsocket(connection->pgConn) == -1 ||
|
||||||
|
PQstatus(connection->pgConn) != CONNECTION_OK ||
|
||||||
|
connection->connectionState != MULTI_CONNECTION_CONNECTED ||
|
||||||
|
RemoteSocketClosed(connection))
|
||||||
|
{
|
||||||
|
/* connection is not healthy */
|
||||||
|
healthyConnection = false;
|
||||||
|
}
|
||||||
|
|
||||||
|
RemoteTransaction *transaction = &connection->remoteTransaction;
|
||||||
|
bool inRemoteTrasaction =
|
||||||
|
transaction->transactionState != REMOTE_TRANS_NOT_STARTED;
|
||||||
|
|
||||||
|
if (inRemoteTrasaction)
|
||||||
|
{
|
||||||
|
PGTransactionStatusType status = PQtransactionStatus(connection->pgConn);
|
||||||
|
|
||||||
|
/* if the connection is in a bad state, so is the transaction's state */
|
||||||
|
if (status == PQTRANS_INERROR || status == PQTRANS_UNKNOWN)
|
||||||
|
{
|
||||||
|
transaction->transactionFailed = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (healthyConnection)
|
||||||
|
{
|
||||||
|
healthyConnections = lappend(healthyConnections, connection);
|
||||||
|
}
|
||||||
|
else if (inRemoteTrasaction &&
|
||||||
|
(transaction->transactionCritical ||
|
||||||
|
!dlist_is_empty(&connection->referencedPlacements)))
|
||||||
|
{
|
||||||
|
ReportConnectionError(connection, ERROR);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* If a connection is executing a critical transaction or accessed any
|
||||||
|
* placements, we should not continue the execution.
|
||||||
|
*/
|
||||||
|
ereport(ERROR, (errmsg("failure on connection marked as essential: %s:%d",
|
||||||
|
connection->hostname, connection->port)));
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
/*
|
||||||
|
* If a connection is not usable and we do not need to throw an error,
|
||||||
|
* close at the end of the transaction.
|
||||||
|
*/
|
||||||
|
connection->forceCloseAtTransactionEnd = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return healthyConnections;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* RemoteSocketClosed returns true if the remote socket is closed.
|
||||||
|
*
|
||||||
|
* Note that we rely on WL_SOCKET_CLOSED, which is only available
|
||||||
|
* for PG 15+ and for kernels that support the feature.
|
||||||
|
*/
|
||||||
static bool
|
static bool
|
||||||
RemoteConnectionClosed(MultiConnection *connection)
|
RemoteSocketClosed(MultiConnection *connection)
|
||||||
{
|
{
|
||||||
bool socketClosed = false;
|
bool socketClosed = false;
|
||||||
#if PG_VERSION_NUM >= PG_VERSION_15
|
#if PG_VERSION_NUM >= PG_VERSION_15
|
||||||
|
@ -569,15 +648,12 @@ RemoteConnectionClosed(MultiConnection *connection)
|
||||||
* input connection dlist contains more than one metadata connections.
|
* input connection dlist contains more than one metadata connections.
|
||||||
*/
|
*/
|
||||||
static void
|
static void
|
||||||
ErrorIfMultipleMetadataConnectionExists(dlist_head *connections)
|
ErrorIfMultipleMetadataConnectionExists(List *connections)
|
||||||
{
|
{
|
||||||
bool foundMetadataConnection = false;
|
bool foundMetadataConnection = false;
|
||||||
dlist_iter iter;
|
MultiConnection *connection = NULL;
|
||||||
dlist_foreach(iter, connections)
|
foreach_ptr(connection, connections)
|
||||||
{
|
{
|
||||||
MultiConnection *connection =
|
|
||||||
dlist_container(MultiConnection, connectionNode, iter.cur);
|
|
||||||
|
|
||||||
if (connection->useForMetadataOperations)
|
if (connection->useForMetadataOperations)
|
||||||
{
|
{
|
||||||
if (foundMetadataConnection)
|
if (foundMetadataConnection)
|
||||||
|
@ -655,7 +731,9 @@ ConnectionAvailableToNode(char *hostName, int nodePort, const char *userName,
|
||||||
}
|
}
|
||||||
|
|
||||||
int flags = 0;
|
int flags = 0;
|
||||||
MultiConnection *connection = FindAvailableConnection(entry->connections, flags);
|
List *healthyConnections = EnsureAndGetHealthyConnections(entry->connections);
|
||||||
|
|
||||||
|
MultiConnection *connection = FindAvailableConnection(healthyConnections, flags);
|
||||||
|
|
||||||
return connection;
|
return connection;
|
||||||
}
|
}
|
||||||
|
@ -1517,7 +1595,7 @@ ShouldShutdownConnection(MultiConnection *connection, const int cachedConnection
|
||||||
(MaxCachedConnectionLifetime >= 0 &&
|
(MaxCachedConnectionLifetime >= 0 &&
|
||||||
MillisecondsToTimeout(connection->connectionEstablishmentStart,
|
MillisecondsToTimeout(connection->connectionEstablishmentStart,
|
||||||
MaxCachedConnectionLifetime) <= 0) ||
|
MaxCachedConnectionLifetime) <= 0) ||
|
||||||
RemoteConnectionClosed(connection);
|
RemoteSocketClosed(connection);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue