mirror of https://github.com/citusdata/citus.git
Do not wait by default, only executor waits
parent
24ba897b17
commit
3e6180367b
|
@ -3425,15 +3425,6 @@ CopyGetPlacementConnection(ShardPlacement *placement, bool stopOnFailure)
|
||||||
MultiShardConnectionType != SEQUENTIAL_CONNECTION)
|
MultiShardConnectionType != SEQUENTIAL_CONNECTION)
|
||||||
{
|
{
|
||||||
connectionFlags |= CONNECTION_PER_PLACEMENT;
|
connectionFlags |= CONNECTION_PER_PLACEMENT;
|
||||||
|
|
||||||
/*
|
|
||||||
* Via connection throttling, the connection establishments may be suspended
|
|
||||||
* until a connection slot is empty to the remote host. When forced to use
|
|
||||||
* one connection per placement, do not enforce this restriction as it could
|
|
||||||
* deadlock against concurrent operation where each operation is blocked on
|
|
||||||
* waiting for others.
|
|
||||||
*/
|
|
||||||
connectionFlags |= NEVER_WAIT_FOR_CONNECTION;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
connection = GetPlacementConnection(connectionFlags, placement, nodeUser);
|
connection = GetPlacementConnection(connectionFlags, placement, nodeUser);
|
||||||
|
|
|
@ -319,18 +319,9 @@ StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
if (flags & NEVER_WAIT_FOR_CONNECTION)
|
if (flags & WAIT_FOR_CONNECTION)
|
||||||
{
|
{
|
||||||
/*
|
WaitLoopForSharedConnection(hostname, port);
|
||||||
* The caller doesn't want the connection manager to wait
|
|
||||||
* until a connection slot is available on the remote node.
|
|
||||||
* In the end, we might fail to establish connection to the
|
|
||||||
* remote node as it might not have any space in
|
|
||||||
* max_connections for this connection establishment.
|
|
||||||
*
|
|
||||||
* Still, we keep track of the connnection counter.
|
|
||||||
*/
|
|
||||||
IncrementSharedConnectionCounter(hostname, port);
|
|
||||||
}
|
}
|
||||||
else if (flags & OPTIONAL_CONNECTION)
|
else if (flags & OPTIONAL_CONNECTION)
|
||||||
{
|
{
|
||||||
|
@ -347,7 +338,16 @@ StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port,
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
WaitOrErrorForSharedConnection(hostname, port);
|
/*
|
||||||
|
* The caller doesn't want the connection manager to wait
|
||||||
|
* until a connection slot is available on the remote node.
|
||||||
|
* In the end, we might fail to establish connection to the
|
||||||
|
* remote node as it might not have any space in
|
||||||
|
* max_connections for this connection establishment.
|
||||||
|
*
|
||||||
|
* Still, we keep track of the connnection counter.
|
||||||
|
*/
|
||||||
|
IncrementSharedConnectionCounter(hostname, port);
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
|
|
@ -263,15 +263,14 @@ GetMaxSharedPoolSize(void)
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* WaitOrErrorForSharedConnection tries to increment the shared connection
|
* WaitLoopForSharedConnection tries to increment the shared connection
|
||||||
* counter for the given hostname/port and the current database in
|
* counter for the given hostname/port and the current database in
|
||||||
* SharedConnStatsHash.
|
* SharedConnStatsHash.
|
||||||
*
|
*
|
||||||
* The function implements a retry mechanism. If the function cannot increment
|
* The function implements a retry mechanism via a condition variable.
|
||||||
* the counter withing the specificed amount of the time, it throws an error.
|
|
||||||
*/
|
*/
|
||||||
void
|
void
|
||||||
WaitOrErrorForSharedConnection(const char *hostname, int port)
|
WaitLoopForSharedConnection(const char *hostname, int port)
|
||||||
{
|
{
|
||||||
while (!TryToIncrementSharedConnectionCounter(hostname, port))
|
while (!TryToIncrementSharedConnectionCounter(hostname, port))
|
||||||
{
|
{
|
||||||
|
|
|
@ -2395,16 +2395,15 @@ ManageWorkerPool(WorkerPool *workerPool)
|
||||||
*/
|
*/
|
||||||
connectionFlags |= OPTIONAL_CONNECTION;
|
connectionFlags |= OPTIONAL_CONNECTION;
|
||||||
}
|
}
|
||||||
else if (UseConnectionPerPlacement())
|
else if (!UseConnectionPerPlacement())
|
||||||
{
|
{
|
||||||
/*
|
/*
|
||||||
* Via connection throttling, the connection establishments may be suspended
|
* Via connection throttling, the connection establishments may be suspended
|
||||||
* until a connection slot is empty to the remote host. When forced to use
|
* until a connection slot is empty to the remote host. Adaptive executor can
|
||||||
* one connection per placement, do not enforce this restriction as it could
|
* always finish the execution with a single connection, so wait until we get
|
||||||
* deadlock against concurrent operation where each operation is blocked on
|
* one connection.
|
||||||
* waiting for others.
|
|
||||||
*/
|
*/
|
||||||
connectionFlags |= NEVER_WAIT_FOR_CONNECTION;
|
connectionFlags |= WAIT_FOR_CONNECTION;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* open a new connection to the worker */
|
/* open a new connection to the worker */
|
||||||
|
|
|
@ -67,7 +67,7 @@ enum MultiConnectionMode
|
||||||
* until a connection slot is empty to the remote host. When this flag is passed,
|
* until a connection slot is empty to the remote host. When this flag is passed,
|
||||||
* the connection manager skips waiting.
|
* the connection manager skips waiting.
|
||||||
*/
|
*/
|
||||||
NEVER_WAIT_FOR_CONNECTION = 1 << 6
|
WAIT_FOR_CONNECTION = 1 << 6
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -21,7 +21,7 @@ extern void WakeupWaiterBackendsForSharedConnection(void);
|
||||||
extern void RemoveInactiveNodesFromSharedConnections(void);
|
extern void RemoveInactiveNodesFromSharedConnections(void);
|
||||||
extern int GetMaxSharedPoolSize(void);
|
extern int GetMaxSharedPoolSize(void);
|
||||||
extern bool TryToIncrementSharedConnectionCounter(const char *hostname, int port);
|
extern bool TryToIncrementSharedConnectionCounter(const char *hostname, int port);
|
||||||
extern void WaitOrErrorForSharedConnection(const char *hostname, int port);
|
extern void WaitLoopForSharedConnection(const char *hostname, int port);
|
||||||
extern void DecrementSharedConnectionCounter(const char *hostname, int port);
|
extern void DecrementSharedConnectionCounter(const char *hostname, int port);
|
||||||
extern void IncrementSharedConnectionCounter(const char *hostname, int port);
|
extern void IncrementSharedConnectionCounter(const char *hostname, int port);
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue