Merge pull request #4052 from citusdata/refactor_adaptive_flags

Move executor specific logic to a function
pull/3298/head
Önder Kalacı 2020-07-22 16:31:15 +02:00 committed by GitHub
commit 2c8066a313
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 106 additions and 81 deletions

View File

@ -25,6 +25,7 @@
#include "distributed/connection_management.h"
#include "distributed/listutils.h"
#include "distributed/metadata_cache.h"
#include "distributed/multi_executor.h"
#include "distributed/shared_connection_stats.h"
#include "distributed/time_constants.h"
#include "distributed/tuplestore.h"
@ -104,6 +105,7 @@ static void LockConnectionSharedMemory(LWLockMode lockMode);
static void UnLockConnectionSharedMemory(void);
static void SharedConnectionStatsShmemInit(void);
static size_t SharedConnectionStatsShmemSize(void);
static bool ShouldWaitForConnection(int currentConnectionCount);
static int SharedConnectionHashCompare(const void *a, const void *b, Size keysize);
static uint32 SharedConnectionHashHash(const void *key, Size keysize);
@ -587,6 +589,100 @@ SharedConnectionStatsShmemInit(void)
}
/*
* AdaptiveConnectionManagementFlag returns the appropriate connection flag,
* regarding the adaptive connection management, based on the given
* activeConnectionCount to remote nodes.
*
* This function should only be called if the code-path is capable of handling
* optional connections.
*/
int
AdaptiveConnectionManagementFlag(int activeConnectionCount)
{
if (UseConnectionPerPlacement())
{
/*
* User wants one connection per placement, so no throttling is desired
* and we do not set any flags.
*
* The primary reason for this is that allowing multiple backends to use
* connection per placement could lead to unresolved self deadlocks. In other
* words, each backend may stuck waiting for other backends to get a slot
* in the shared connection counters.
*/
return 0;
}
else if (ShouldWaitForConnection(activeConnectionCount))
{
/*
* We need this connection to finish the execution. If it is not
* available based on the current number of connections to the worker
* then wait for it.
*/
return WAIT_FOR_CONNECTION;
}
else
{
/*
* The execution can be finished the execution with a single connection,
* remaining are optional. If the execution can get more connections,
* it can increase the parallelism.
*/
return OPTIONAL_CONNECTION;
}
}
/*
* UseConnectionPerPlacement returns whether we should use a separate connection
* per placement even if another connection is idle. We mostly use this in testing
* scenarios.
*/
bool
UseConnectionPerPlacement(void)
{
return ForceMaxQueryParallelization &&
MultiShardConnectionType != SEQUENTIAL_CONNECTION;
}
/*
* ShouldWaitForConnection returns true if the workerPool should wait to
* get the next connection until one slot is empty within
* citus.max_shared_pool_size on the worker. Note that, if there is an
* empty slot, the connection will not wait anyway.
*/
static bool
ShouldWaitForConnection(int currentConnectionCount)
{
if (currentConnectionCount == 0)
{
/*
* We definitely need at least 1 connection to finish the execution.
* All single shard queries hit here with the default settings.
*/
return true;
}
if (currentConnectionCount < MaxCachedConnectionsPerWorker)
{
/*
* Until this session caches MaxCachedConnectionsPerWorker connections,
* this might lead some optional connections to be considered as non-optional
* when MaxCachedConnectionsPerWorker > 1.
*
* However, once the session caches MaxCachedConnectionsPerWorker (which is
* the second transaction executed in the session), Citus would utilize the
* cached connections as much as possible.
*/
return true;
}
return false;
}
static uint32
SharedConnectionHashHash(const void *key, Size keysize)
{

View File

@ -155,6 +155,7 @@
#include "distributed/remote_commands.h"
#include "distributed/repartition_join_execution.h"
#include "distributed/resource_lock.h"
#include "distributed/shared_connection_stats.h"
#include "distributed/subplan_execution.h"
#include "distributed/transaction_management.h"
#include "distributed/tuple_destination.h"
@ -596,14 +597,12 @@ static bool TaskListRequires2PC(List *taskList);
static bool SelectForUpdateOnReferenceTable(List *taskList);
static void AssignTasksToConnectionsOrWorkerPool(DistributedExecution *execution);
static void UnclaimAllSessionConnections(List *sessionList);
static bool UseConnectionPerPlacement(void);
static PlacementExecutionOrder ExecutionOrderForTask(RowModifyLevel modLevel, Task *task);
static WorkerPool * FindOrCreateWorkerPool(DistributedExecution *execution,
char *nodeName, int nodePort);
static WorkerSession * FindOrCreateWorkerSession(WorkerPool *workerPool,
MultiConnection *connection);
static void ManageWorkerPool(WorkerPool *workerPool);
static bool ShouldWaitForConnection(WorkerPool *workerPool);
static void CheckConnectionTimeout(WorkerPool *workerPool);
static int UsableConnectionCount(WorkerPool *workerPool);
static long NextEventTimeout(DistributedExecution *execution);
@ -1983,19 +1982,6 @@ SetAttributeInputMetadata(DistributedExecution *execution,
}
/*
* UseConnectionPerPlacement returns whether we should use a separate connection
* per placement even if another connection is idle. We mostly use this in testing
* scenarios.
*/
static bool
UseConnectionPerPlacement(void)
{
return ForceMaxQueryParallelization &&
MultiShardConnectionType != SEQUENTIAL_CONNECTION;
}
/*
* ExecutionOrderForTask gives the appropriate execution order for a task.
*/
@ -2508,36 +2494,13 @@ ManageWorkerPool(WorkerPool *workerPool)
connectionFlags |= OUTSIDE_TRANSACTION;
}
if (UseConnectionPerPlacement())
{
/*
* User wants one connection per placement, so no throttling is desired
* and we do not set any flags.
*
* The primary reason for this is that allowing multiple backends to use
* connection per placement could lead to unresolved self deadlocks. In other
* words, each backend may stuck waiting for other backends to get a slot
* in the shared connection counters.
*/
}
else if (ShouldWaitForConnection(workerPool))
{
/*
* We need this connection to finish the execution. If it is not
* available based on the current number of connections to the worker
* then wait for it.
*/
connectionFlags |= WAIT_FOR_CONNECTION;
}
else
{
/*
* The executor can finish the execution with a single connection,
* remaining are optional. If the executor can get more connections,
* it can increase the parallelism.
*/
connectionFlags |= OPTIONAL_CONNECTION;
}
/*
* Enforce the requirements for adaptive connection management (a.k.a.,
* throttle connections if citus.max_shared_pool_size reached)
*/
int adaptiveConnectionManagementFlag =
AdaptiveConnectionManagementFlag(list_length(workerPool->sessionList));
connectionFlags |= adaptiveConnectionManagementFlag;
/* open a new connection to the worker */
MultiConnection *connection = StartNodeUserDatabaseConnection(connectionFlags,
@ -2587,42 +2550,6 @@ ManageWorkerPool(WorkerPool *workerPool)
}
/*
* ShouldWaitForConnection returns true if the workerPool should wait to
* get the next connection until one slot is empty within
* citus.max_shared_pool_size on the worker. Note that, if there is an
* empty slot, the connection will not wait anyway.
*/
static bool
ShouldWaitForConnection(WorkerPool *workerPool)
{
if (list_length(workerPool->sessionList) == 0)
{
/*
* We definitely need at least 1 connection to finish the execution.
* All single shard queries hit here with the default settings.
*/
return true;
}
if (list_length(workerPool->sessionList) < MaxCachedConnectionsPerWorker)
{
/*
* Until this session caches MaxCachedConnectionsPerWorker connections,
* this might lead some optional connections to be considered as non-optional
* when MaxCachedConnectionsPerWorker > 1.
*
* However, once the session caches MaxCachedConnectionsPerWorker (which is
* the second transaction executed in the session), Citus would utilize the
* cached connections as much as possible.
*/
return true;
}
return false;
}
/*
* CheckConnectionTimeout makes sure that the execution enforces the connection
* establishment timeout defined by the user (NodeConnectionTimeout).

View File

@ -22,5 +22,7 @@ extern bool TryToIncrementSharedConnectionCounter(const char *hostname, int port
extern void WaitLoopForSharedConnection(const char *hostname, int port);
extern void DecrementSharedConnectionCounter(const char *hostname, int port);
extern void IncrementSharedConnectionCounter(const char *hostname, int port);
extern int AdaptiveConnectionManagementFlag(int activeConnectionCount);
extern bool UseConnectionPerPlacement(void);
#endif /* SHARED_CONNECTION_STATS_H */