mirror of https://github.com/citusdata/citus.git
Allow outside tx connections freely
parent
25d93bd69a
commit
376797be5d
|
@ -58,7 +58,7 @@ static int ConnectionHashCompare(const void *a, const void *b, Size keysize);
|
||||||
static void StartConnectionEstablishment(MultiConnection *connectionn,
|
static void StartConnectionEstablishment(MultiConnection *connectionn,
|
||||||
ConnectionHashKey *key);
|
ConnectionHashKey *key);
|
||||||
static MultiConnection * FindAvailableConnection(List *connections, uint32 flags);
|
static MultiConnection * FindAvailableConnection(List *connections, uint32 flags);
|
||||||
static List * EnsureAndGetHealthyConnections(dlist_head *connections);
|
static List * EnsureAndGetHealthyConnections(dlist_head *connections, uint32 flags);
|
||||||
static bool RemoteSocketClosed(MultiConnection *connection);
|
static bool RemoteSocketClosed(MultiConnection *connection);
|
||||||
static void ErrorIfMultipleMetadataConnectionExists(List *connections);
|
static void ErrorIfMultipleMetadataConnectionExists(List *connections);
|
||||||
static void FreeConnParamsHashEntryFields(ConnParamsHashEntry *entry);
|
static void FreeConnParamsHashEntryFields(ConnParamsHashEntry *entry);
|
||||||
|
@ -335,7 +335,8 @@ StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port,
|
||||||
/* if desired, check whether there's a usable connection */
|
/* if desired, check whether there's a usable connection */
|
||||||
if (!(flags & FORCE_NEW_CONNECTION))
|
if (!(flags & FORCE_NEW_CONNECTION))
|
||||||
{
|
{
|
||||||
List *healthyConnections = EnsureAndGetHealthyConnections(entry->connections);
|
List *healthyConnections = EnsureAndGetHealthyConnections(entry->connections,
|
||||||
|
flags);
|
||||||
|
|
||||||
/* check connection cache for a connection that's not already in use */
|
/* check connection cache for a connection that's not already in use */
|
||||||
MultiConnection *connection = FindAvailableConnection(healthyConnections, flags);
|
MultiConnection *connection = FindAvailableConnection(healthyConnections, flags);
|
||||||
|
@ -538,7 +539,7 @@ FindAvailableConnection(List *connections, uint32 flags)
|
||||||
* that can be reused.
|
* that can be reused.
|
||||||
*/
|
*/
|
||||||
static List *
|
static List *
|
||||||
EnsureAndGetHealthyConnections(dlist_head *connections)
|
EnsureAndGetHealthyConnections(dlist_head *connections, uint32 flags)
|
||||||
{
|
{
|
||||||
List *healthyConnections = NIL;
|
List *healthyConnections = NIL;
|
||||||
|
|
||||||
|
@ -577,19 +578,17 @@ EnsureAndGetHealthyConnections(dlist_head *connections)
|
||||||
{
|
{
|
||||||
healthyConnections = lappend(healthyConnections, connection);
|
healthyConnections = lappend(healthyConnections, connection);
|
||||||
}
|
}
|
||||||
else if (inRemoteTrasaction &&
|
else if ((inRemoteTrasaction && !(flags & OUTSIDE_TRANSACTION)) &&
|
||||||
(transaction->transactionCritical ||
|
(transaction->transactionCritical ||
|
||||||
!dlist_is_empty(&connection->referencedPlacements)))
|
!dlist_is_empty(&connection->referencedPlacements)))
|
||||||
{
|
{
|
||||||
CHECK_FOR_INTERRUPTS();
|
|
||||||
|
|
||||||
ReportConnectionError(connection, ERROR);
|
ReportConnectionError(connection, ERROR);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* If a connection is executing a critical transaction or accessed any
|
* If a connection is executing a critical transaction or accessed any
|
||||||
* placements, we should not continue the execution.
|
* placements, we should not continue the execution.
|
||||||
*/
|
*/
|
||||||
ereport(ERROR, (errmsg("failure on connection marked as essential: %s:%d",
|
ereport(WARNING, (errmsg("failure on connection marked as essential: %s:%d",
|
||||||
connection->hostname, connection->port)));
|
connection->hostname, connection->port)));
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
|
@ -733,7 +732,7 @@ ConnectionAvailableToNode(char *hostName, int nodePort, const char *userName,
|
||||||
}
|
}
|
||||||
|
|
||||||
int flags = 0;
|
int flags = 0;
|
||||||
List *healthyConnections = EnsureAndGetHealthyConnections(entry->connections);
|
List *healthyConnections = EnsureAndGetHealthyConnections(entry->connections, flags);
|
||||||
|
|
||||||
MultiConnection *connection = FindAvailableConnection(healthyConnections, flags);
|
MultiConnection *connection = FindAvailableConnection(healthyConnections, flags);
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue