diff --git a/src/backend/distributed/connection/shared_connection_stats.c b/src/backend/distributed/connection/shared_connection_stats.c index f09ee9b1c..632891c7f 100644 --- a/src/backend/distributed/connection/shared_connection_stats.c +++ b/src/backend/distributed/connection/shared_connection_stats.c @@ -25,6 +25,7 @@ #include "distributed/connection_management.h" #include "distributed/listutils.h" #include "distributed/metadata_cache.h" +#include "distributed/multi_executor.h" #include "distributed/shared_connection_stats.h" #include "distributed/time_constants.h" #include "distributed/tuplestore.h" @@ -104,6 +105,7 @@ static void LockConnectionSharedMemory(LWLockMode lockMode); static void UnLockConnectionSharedMemory(void); static void SharedConnectionStatsShmemInit(void); static size_t SharedConnectionStatsShmemSize(void); +static bool ShouldWaitForConnection(int currentConnectionCount); static int SharedConnectionHashCompare(const void *a, const void *b, Size keysize); static uint32 SharedConnectionHashHash(const void *key, Size keysize); @@ -587,6 +589,100 @@ SharedConnectionStatsShmemInit(void) } +/* + * AdaptiveConnectionManagementFlag returns the appropriate connection flag, + * regarding the adaptive connection management, based on the given + * activeConnectionCount to remote nodes. + * + * This function should only be called if the code-path is capable of handling + * optional connections. + */ +int +AdaptiveConnectionManagementFlag(int activeConnectionCount) +{ + if (UseConnectionPerPlacement()) + { + /* + * User wants one connection per placement, so no throttling is desired + * and we do not set any flags. + * + * The primary reason for this is that allowing multiple backends to use + * connection per placement could lead to unresolved self deadlocks. In other + * words, each backend may get stuck waiting for other backends to get a slot + * in the shared connection counters. 
+ */ + return 0; + } + else if (ShouldWaitForConnection(activeConnectionCount)) + { + /* + * We need this connection to finish the execution. If it is not + * available based on the current number of connections to the worker + * then wait for it. + */ + return WAIT_FOR_CONNECTION; + } + else + { + /* + * The execution can be finished with a single connection, + * remaining are optional. If the execution can get more connections, + * it can increase the parallelism. + */ + return OPTIONAL_CONNECTION; + } +} + + +/* + * UseConnectionPerPlacement returns whether we should use a separate connection + * per placement even if another connection is idle. We mostly use this in testing + * scenarios. + */ +bool +UseConnectionPerPlacement(void) +{ + return ForceMaxQueryParallelization && + MultiShardConnectionType != SEQUENTIAL_CONNECTION; +} + + +/* + * ShouldWaitForConnection returns true if the workerPool should wait to + * get the next connection until one slot is empty within + * citus.max_shared_pool_size on the worker. Note that, if there is an + * empty slot, the connection will not wait anyway. + */ +static bool +ShouldWaitForConnection(int currentConnectionCount) +{ + if (currentConnectionCount == 0) + { + /* + * We definitely need at least 1 connection to finish the execution. + * All single shard queries hit here with the default settings. + */ + return true; + } + + if (currentConnectionCount < MaxCachedConnectionsPerWorker) + { + /* + * Until this session caches MaxCachedConnectionsPerWorker connections, + * this might lead some optional connections to be considered as non-optional + * when MaxCachedConnectionsPerWorker > 1. + * + * However, once the session caches MaxCachedConnectionsPerWorker (which is + * the second transaction executed in the session), Citus would utilize the + * cached connections as much as possible. 
+ */ + return true; + } + + return false; +} + + static uint32 SharedConnectionHashHash(const void *key, Size keysize) { diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c index 54838f60c..9dcabfb86 100644 --- a/src/backend/distributed/executor/adaptive_executor.c +++ b/src/backend/distributed/executor/adaptive_executor.c @@ -155,6 +155,7 @@ #include "distributed/remote_commands.h" #include "distributed/repartition_join_execution.h" #include "distributed/resource_lock.h" +#include "distributed/shared_connection_stats.h" #include "distributed/subplan_execution.h" #include "distributed/transaction_management.h" #include "distributed/tuple_destination.h" @@ -596,14 +597,12 @@ static bool TaskListRequires2PC(List *taskList); static bool SelectForUpdateOnReferenceTable(List *taskList); static void AssignTasksToConnectionsOrWorkerPool(DistributedExecution *execution); static void UnclaimAllSessionConnections(List *sessionList); -static bool UseConnectionPerPlacement(void); static PlacementExecutionOrder ExecutionOrderForTask(RowModifyLevel modLevel, Task *task); static WorkerPool * FindOrCreateWorkerPool(DistributedExecution *execution, char *nodeName, int nodePort); static WorkerSession * FindOrCreateWorkerSession(WorkerPool *workerPool, MultiConnection *connection); static void ManageWorkerPool(WorkerPool *workerPool); -static bool ShouldWaitForConnection(WorkerPool *workerPool); static void CheckConnectionTimeout(WorkerPool *workerPool); static int UsableConnectionCount(WorkerPool *workerPool); static long NextEventTimeout(DistributedExecution *execution); @@ -1983,19 +1982,6 @@ SetAttributeInputMetadata(DistributedExecution *execution, } -/* - * UseConnectionPerPlacement returns whether we should use a separate connection - * per placement even if another connection is idle. We mostly use this in testing - * scenarios. 
- */ -static bool -UseConnectionPerPlacement(void) -{ - return ForceMaxQueryParallelization && - MultiShardConnectionType != SEQUENTIAL_CONNECTION; -} - - /* * ExecutionOrderForTask gives the appropriate execution order for a task. */ @@ -2508,36 +2494,13 @@ ManageWorkerPool(WorkerPool *workerPool) connectionFlags |= OUTSIDE_TRANSACTION; } - if (UseConnectionPerPlacement()) - { - /* - * User wants one connection per placement, so no throttling is desired - * and we do not set any flags. - * - * The primary reason for this is that allowing multiple backends to use - * connection per placement could lead to unresolved self deadlocks. In other - * words, each backend may stuck waiting for other backends to get a slot - * in the shared connection counters. - */ - } - else if (ShouldWaitForConnection(workerPool)) - { - /* - * We need this connection to finish the execution. If it is not - * available based on the current number of connections to the worker - * then wait for it. - */ - connectionFlags |= WAIT_FOR_CONNECTION; - } - else - { - /* - * The executor can finish the execution with a single connection, - * remaining are optional. If the executor can get more connections, - * it can increase the parallelism. - */ - connectionFlags |= OPTIONAL_CONNECTION; - } + /* + * Enforce the requirements for adaptive connection management (a.k.a., + * throttle connections if citus.max_shared_pool_size reached) + */ + int adaptiveConnectionManagementFlag = + AdaptiveConnectionManagementFlag(list_length(workerPool->sessionList)); + connectionFlags |= adaptiveConnectionManagementFlag; /* open a new connection to the worker */ MultiConnection *connection = StartNodeUserDatabaseConnection(connectionFlags, @@ -2587,42 +2550,6 @@ ManageWorkerPool(WorkerPool *workerPool) } -/* - * ShouldWaitForConnection returns true if the workerPool should wait to - * get the next connection until one slot is empty within - * citus.max_shared_pool_size on the worker. 
Note that, if there is an - * empty slot, the connection will not wait anyway. - */ -static bool -ShouldWaitForConnection(WorkerPool *workerPool) -{ - if (list_length(workerPool->sessionList) == 0) - { - /* - * We definitely need at least 1 connection to finish the execution. - * All single shard queries hit here with the default settings. - */ - return true; - } - - if (list_length(workerPool->sessionList) < MaxCachedConnectionsPerWorker) - { - /* - * Until this session caches MaxCachedConnectionsPerWorker connections, - * this might lead some optional connections to be considered as non-optional - * when MaxCachedConnectionsPerWorker > 1. - * - * However, once the session caches MaxCachedConnectionsPerWorker (which is - * the second transaction executed in the session), Citus would utilize the - * cached connections as much as possible. - */ - return true; - } - - return false; -} - - /* * CheckConnectionTimeout makes sure that the execution enforces the connection * establishment timeout defined by the user (NodeConnectionTimeout). diff --git a/src/include/distributed/shared_connection_stats.h b/src/include/distributed/shared_connection_stats.h index 29e050114..8db6256e3 100644 --- a/src/include/distributed/shared_connection_stats.h +++ b/src/include/distributed/shared_connection_stats.h @@ -22,5 +22,7 @@ extern bool TryToIncrementSharedConnectionCounter(const char *hostname, int port extern void WaitLoopForSharedConnection(const char *hostname, int port); extern void DecrementSharedConnectionCounter(const char *hostname, int port); extern void IncrementSharedConnectionCounter(const char *hostname, int port); +extern int AdaptiveConnectionManagementFlag(int activeConnectionCount); +extern bool UseConnectionPerPlacement(void); #endif /* SHARED_CONNECTION_STATS_H */