mirror of https://github.com/citusdata/citus.git
better style triall
parent
d377dce852
commit
f1d347d762
|
@ -58,7 +58,8 @@ static int ConnectionHashCompare(const void *a, const void *b, Size keysize);
|
||||||
static void StartConnectionEstablishment(MultiConnection *connectionn,
|
static void StartConnectionEstablishment(MultiConnection *connectionn,
|
||||||
ConnectionHashKey *key);
|
ConnectionHashKey *key);
|
||||||
static MultiConnection * FindAvailableConnection(List *connections, uint32 flags);
|
static MultiConnection * FindAvailableConnection(List *connections, uint32 flags);
|
||||||
static List * EnsureAndGetHealthyConnections(dlist_head *connections, uint32 flags);
|
static List * EnsureAndGetHealthyConnections(dlist_head *connections, bool
|
||||||
|
checkTransactionState);
|
||||||
static bool RemoteSocketClosed(MultiConnection *connection);
|
static bool RemoteSocketClosed(MultiConnection *connection);
|
||||||
static void ErrorIfMultipleMetadataConnectionExists(List *connections);
|
static void ErrorIfMultipleMetadataConnectionExists(List *connections);
|
||||||
static void FreeConnParamsHashEntryFields(ConnParamsHashEntry *entry);
|
static void FreeConnParamsHashEntryFields(ConnParamsHashEntry *entry);
|
||||||
|
@ -335,8 +336,9 @@ StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port,
|
||||||
/* if desired, check whether there's a usable connection */
|
/* if desired, check whether there's a usable connection */
|
||||||
if (!(flags & FORCE_NEW_CONNECTION))
|
if (!(flags & FORCE_NEW_CONNECTION))
|
||||||
{
|
{
|
||||||
List *healthyConnections = EnsureAndGetHealthyConnections(entry->connections,
|
bool checkTransactionState = !(flags & OUTSIDE_TRANSACTION);
|
||||||
flags);
|
List *healthyConnections =
|
||||||
|
EnsureAndGetHealthyConnections(entry->connections, checkTransactionState);
|
||||||
|
|
||||||
/* check connection cache for a connection that's not already in use */
|
/* check connection cache for a connection that's not already in use */
|
||||||
MultiConnection *connection = FindAvailableConnection(healthyConnections, flags);
|
MultiConnection *connection = FindAvailableConnection(healthyConnections, flags);
|
||||||
|
@ -531,7 +533,7 @@ FindAvailableConnection(List *connections, uint32 flags)
|
||||||
* EnsureAndGetHealthyConnections is a helper function that goes over the
|
* EnsureAndGetHealthyConnections is a helper function that goes over the
|
||||||
* input connections, and:
|
* input connections, and:
|
||||||
* - Errors for the connections that are not healthy but still part of a
|
* - Errors for the connections that are not healthy but still part of a
|
||||||
* remote transaction
|
* remote transaction when checkTransactionState=true
|
||||||
* - Skips connections that are not healthy, marks them to be closed
|
* - Skips connections that are not healthy, marks them to be closed
|
||||||
* at the end of the transaction such that they do not linger around
|
* at the end of the transaction such that they do not linger around
|
||||||
*
|
*
|
||||||
|
@ -539,7 +541,7 @@ FindAvailableConnection(List *connections, uint32 flags)
|
||||||
* that can be reused.
|
* that can be reused.
|
||||||
*/
|
*/
|
||||||
static List *
|
static List *
|
||||||
EnsureAndGetHealthyConnections(dlist_head *connections, uint32 flags)
|
EnsureAndGetHealthyConnections(dlist_head *connections, bool checkTransactionState)
|
||||||
{
|
{
|
||||||
List *healthyConnections = NIL;
|
List *healthyConnections = NIL;
|
||||||
|
|
||||||
|
@ -563,7 +565,7 @@ EnsureAndGetHealthyConnections(dlist_head *connections, uint32 flags)
|
||||||
bool inRemoteTrasaction =
|
bool inRemoteTrasaction =
|
||||||
transaction->transactionState != REMOTE_TRANS_NOT_STARTED;
|
transaction->transactionState != REMOTE_TRANS_NOT_STARTED;
|
||||||
|
|
||||||
if (inRemoteTrasaction)
|
if (inRemoteTrasaction && checkTransactionState)
|
||||||
{
|
{
|
||||||
PGTransactionStatusType status = PQtransactionStatus(connection->pgConn);
|
PGTransactionStatusType status = PQtransactionStatus(connection->pgConn);
|
||||||
|
|
||||||
|
@ -578,18 +580,15 @@ EnsureAndGetHealthyConnections(dlist_head *connections, uint32 flags)
|
||||||
{
|
{
|
||||||
healthyConnections = lappend(healthyConnections, connection);
|
healthyConnections = lappend(healthyConnections, connection);
|
||||||
}
|
}
|
||||||
else if ((inRemoteTrasaction && !(flags & OUTSIDE_TRANSACTION)) &&
|
else if (inRemoteTrasaction && checkTransactionState &&
|
||||||
(transaction->transactionCritical ||
|
(transaction->transactionCritical ||
|
||||||
!dlist_is_empty(&connection->referencedPlacements)))
|
!dlist_is_empty(&connection->referencedPlacements)))
|
||||||
{
|
{
|
||||||
ReportConnectionError(connection, ERROR);
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* If a connection is executing a critical transaction or accessed any
|
* If a connection is executing a critical transaction or accessed any
|
||||||
* placements, we should not continue the execution.
|
* placements, we should not continue the execution.
|
||||||
*/
|
*/
|
||||||
ereport(WARNING, (errmsg("failure on connection marked as essential: %s:%d",
|
ReportConnectionError(connection, ERROR);
|
||||||
connection->hostname, connection->port)));
|
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
|
@ -731,9 +730,11 @@ ConnectionAvailableToNode(char *hostName, int nodePort, const char *userName,
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
int flags = 0;
|
bool checkTransactionState = true;
|
||||||
List *healthyConnections = EnsureAndGetHealthyConnections(entry->connections, flags);
|
List *healthyConnections =
|
||||||
|
EnsureAndGetHealthyConnections(entry->connections, checkTransactionState);
|
||||||
|
|
||||||
|
int flags = 0;
|
||||||
MultiConnection *connection = FindAvailableConnection(healthyConnections, flags);
|
MultiConnection *connection = FindAvailableConnection(healthyConnections, flags);
|
||||||
|
|
||||||
return connection;
|
return connection;
|
||||||
|
|
Loading…
Reference in New Issue