From 3e6180367b06f6ad79169b2ebcbbb12a4c392d1e Mon Sep 17 00:00:00 2001 From: Onder Kalaci Date: Wed, 8 Apr 2020 14:30:28 +0200 Subject: [PATCH] Do not wait by default, only executor waits --- src/backend/distributed/commands/multi_copy.c | 9 ------- .../connection/connection_management.c | 24 +++++++++---------- .../connection/shared_connection_stats.c | 7 +++--- .../distributed/executor/adaptive_executor.c | 11 ++++----- .../distributed/connection_management.h | 2 +- .../distributed/shared_connection_stats.h | 2 +- 6 files changed, 22 insertions(+), 33 deletions(-) diff --git a/src/backend/distributed/commands/multi_copy.c b/src/backend/distributed/commands/multi_copy.c index 926528688..7724d2018 100644 --- a/src/backend/distributed/commands/multi_copy.c +++ b/src/backend/distributed/commands/multi_copy.c @@ -3425,15 +3425,6 @@ CopyGetPlacementConnection(ShardPlacement *placement, bool stopOnFailure) MultiShardConnectionType != SEQUENTIAL_CONNECTION) { connectionFlags |= CONNECTION_PER_PLACEMENT; - - /* - * Via connection throttling, the connection establishments may be suspended - * until a connection slot is empty to the remote host. When forced to use - * one connection per placement, do not enforce this restriction as it could - * deadlock against concurrent operation where each operation is blocked on - * waiting for others. - */ - connectionFlags |= NEVER_WAIT_FOR_CONNECTION; } connection = GetPlacementConnection(connectionFlags, placement, nodeUser); diff --git a/src/backend/distributed/connection/connection_management.c b/src/backend/distributed/connection/connection_management.c index d76a76d1c..71af71de5 100644 --- a/src/backend/distributed/connection/connection_management.c +++ b/src/backend/distributed/connection/connection_management.c @@ -319,18 +319,9 @@ StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port, } - if (flags & NEVER_WAIT_FOR_CONNECTION) + if (flags & WAIT_FOR_CONNECTION) { - /* - * The caller doesn't want the connection manager to wait - * until a connection slot is available on the remote node. - * In the end, we might fail to establish connection to the - * remote node as it might not have any space in - * max_connections for this connection establishment. - * - * Still, we keep track of the connnection counter. - */ - IncrementSharedConnectionCounter(hostname, port); + WaitLoopForSharedConnection(hostname, port); } else if (flags & OPTIONAL_CONNECTION) { @@ -347,7 +338,16 @@ StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port, } else { - WaitOrErrorForSharedConnection(hostname, port); + /* + * The caller doesn't want the connection manager to wait + * until a connection slot is available on the remote node. + * In the end, we might fail to establish connection to the + * remote node as it might not have any space in + * max_connections for this connection establishment. + * + * Still, we keep track of the connnection counter. + */ + IncrementSharedConnectionCounter(hostname, port); } /* diff --git a/src/backend/distributed/connection/shared_connection_stats.c b/src/backend/distributed/connection/shared_connection_stats.c index e530f3cad..af4bac71f 100644 --- a/src/backend/distributed/connection/shared_connection_stats.c +++ b/src/backend/distributed/connection/shared_connection_stats.c @@ -263,15 +263,14 @@ GetMaxSharedPoolSize(void) /* - * WaitOrErrorForSharedConnection tries to increment the shared connection + * WaitLoopForSharedConnection tries to increment the shared connection * counter for the given hostname/port and the current database in * SharedConnStatsHash. * - * The function implements a retry mechanism. If the function cannot increment - * the counter withing the specificed amount of the time, it throws an error. + * The function implements a retry mechanism via a condition variable. */ void -WaitOrErrorForSharedConnection(const char *hostname, int port) +WaitLoopForSharedConnection(const char *hostname, int port) { while (!TryToIncrementSharedConnectionCounter(hostname, port)) { diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c index d48a5b8c9..3bf30e572 100644 --- a/src/backend/distributed/executor/adaptive_executor.c +++ b/src/backend/distributed/executor/adaptive_executor.c @@ -2395,16 +2395,15 @@ ManageWorkerPool(WorkerPool *workerPool) */ connectionFlags |= OPTIONAL_CONNECTION; } - else if (UseConnectionPerPlacement()) + else if (!UseConnectionPerPlacement()) { /* * Via connection throttling, the connection establishments may be suspended - * until a connection slot is empty to the remote host. When forced to use - * one connection per placement, do not enforce this restriction as it could - * deadlock against concurrent operation where each operation is blocked on - * waiting for others. + * until a connection slot is empty to the remote host. Adaptive executor can + * always finish the execution with a single connection, so wait until we get + * one connection. */ - connectionFlags |= NEVER_WAIT_FOR_CONNECTION; + connectionFlags |= WAIT_FOR_CONNECTION; } /* open a new connection to the worker */ diff --git a/src/include/distributed/connection_management.h b/src/include/distributed/connection_management.h index dc32b7e9d..2255063b7 100644 --- a/src/include/distributed/connection_management.h +++ b/src/include/distributed/connection_management.h @@ -67,7 +67,7 @@ enum MultiConnectionMode * until a connection slot is empty to the remote host. When this flag is passed, * the connection manager skips waiting. */ - NEVER_WAIT_FOR_CONNECTION = 1 << 6 + WAIT_FOR_CONNECTION = 1 << 6 }; diff --git a/src/include/distributed/shared_connection_stats.h b/src/include/distributed/shared_connection_stats.h index 8ff2c64f4..93368e28e 100644 --- a/src/include/distributed/shared_connection_stats.h +++ b/src/include/distributed/shared_connection_stats.h @@ -21,7 +21,7 @@ extern void WakeupWaiterBackendsForSharedConnection(void); extern void RemoveInactiveNodesFromSharedConnections(void); extern int GetMaxSharedPoolSize(void); extern bool TryToIncrementSharedConnectionCounter(const char *hostname, int port); -extern void WaitOrErrorForSharedConnection(const char *hostname, int port); +extern void WaitLoopForSharedConnection(const char *hostname, int port); extern void DecrementSharedConnectionCounter(const char *hostname, int port); extern void IncrementSharedConnectionCounter(const char *hostname, int port);