Merge pull request #4895 from citusdata/conservative_connection_establishment

Executor takes connection establishment and task execution costs into account
pull/4923/head
Önder Kalacı 2021-05-19 15:57:22 +02:00 committed by GitHub
commit 61977a3c09
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 218 additions and 13 deletions

View File

@ -124,6 +124,7 @@
#include "miscadmin.h" #include "miscadmin.h"
#include "pgstat.h" #include "pgstat.h"
#include <math.h>
#include <sys/stat.h> #include <sys/stat.h>
#include <unistd.h> #include <unistd.h>
@ -414,6 +415,10 @@ typedef struct WorkerPool
* use it anymore. * use it anymore.
*/ */
WorkerPoolFailureState failureState; WorkerPoolFailureState failureState;
/* execution statistics per pool, in microseconds */
uint64 totalTaskExecutionTime;
int totalExecutedTasks;
} WorkerPool; } WorkerPool;
struct TaskPlacementExecution; struct TaskPlacementExecution;
@ -473,6 +478,7 @@ bool EnableBinaryProtocol = false;
/* GUC, number of ms to wait between opening connections to the same worker */ /* GUC, number of ms to wait between opening connections to the same worker */
int ExecutorSlowStartInterval = 10; int ExecutorSlowStartInterval = 10;
bool EnableCostBasedConnectionEstablishment = true;
/* /*
@ -635,6 +641,12 @@ static WorkerSession * FindOrCreateWorkerSession(WorkerPool *workerPool,
static void ManageWorkerPool(WorkerPool *workerPool); static void ManageWorkerPool(WorkerPool *workerPool);
static bool ShouldWaitForSlowStart(WorkerPool *workerPool); static bool ShouldWaitForSlowStart(WorkerPool *workerPool);
static int CalculateNewConnectionCount(WorkerPool *workerPool); static int CalculateNewConnectionCount(WorkerPool *workerPool);
static bool UsingExistingSessionsCheaperThanEstablishingNewConnections(int
readyTaskCount,
WorkerPool *
workerPool);
static double AvgTaskExecutionTimeApproximation(WorkerPool *workerPool);
static double AvgConnectionEstablishmentTime(WorkerPool *workerPool);
static void OpenNewConnections(WorkerPool *workerPool, int newConnectionCount, static void OpenNewConnections(WorkerPool *workerPool, int newConnectionCount,
TransactionProperties *transactionProperties); TransactionProperties *transactionProperties);
static void CheckConnectionTimeout(WorkerPool *workerPool); static void CheckConnectionTimeout(WorkerPool *workerPool);
@ -680,6 +692,7 @@ static int RebuildWaitEventSet(DistributedExecution *execution);
static void ProcessWaitEvents(DistributedExecution *execution, WaitEvent *events, int static void ProcessWaitEvents(DistributedExecution *execution, WaitEvent *events, int
eventCount, bool *cancellationReceived); eventCount, bool *cancellationReceived);
static long MillisecondsBetweenTimestamps(instr_time startTime, instr_time endTime); static long MillisecondsBetweenTimestamps(instr_time startTime, instr_time endTime);
static uint64 MicrosecondsBetweenTimestamps(instr_time startTime, instr_time endTime);
static HeapTuple BuildTupleFromBytes(AttInMetadata *attinmeta, fmStringInfo *values); static HeapTuple BuildTupleFromBytes(AttInMetadata *attinmeta, fmStringInfo *values);
static AttInMetadata * TupleDescGetAttBinaryInMetadata(TupleDesc tupdesc); static AttInMetadata * TupleDescGetAttBinaryInMetadata(TupleDesc tupdesc);
static int WorkerPoolCompare(const void *lhsKey, const void *rhsKey); static int WorkerPoolCompare(const void *lhsKey, const void *rhsKey);
@ -1642,8 +1655,10 @@ CleanUpSessions(DistributedExecution *execution)
{ {
MultiConnection *connection = session->connection; MultiConnection *connection = session->connection;
ereport(DEBUG4, (errmsg("Total number of commands sent over the session %ld: %ld", ereport(DEBUG4, (errmsg("Total number of commands sent over the session %ld: %ld "
session->sessionId, session->commandsSent))); "to node %s:%d", session->sessionId,
session->commandsSent,
connection->hostname, connection->port)));
UnclaimConnection(connection); UnclaimConnection(connection);
@ -2454,6 +2469,9 @@ ManageWorkerPool(WorkerPool *workerPool)
return; return;
} }
/* increase the open rate every cycle (like TCP slow start) */
workerPool->maxNewConnectionsPerCycle += 1;
OpenNewConnections(workerPool, newConnectionCount, execution->transactionProperties); OpenNewConnections(workerPool, newConnectionCount, execution->transactionProperties);
/* /*
@ -2640,16 +2658,176 @@ CalculateNewConnectionCount(WorkerPool *workerPool)
* than the target pool size. * than the target pool size.
*/ */
newConnectionCount = Min(newConnectionsForReadyTasks, maxNewConnectionCount); newConnectionCount = Min(newConnectionsForReadyTasks, maxNewConnectionCount);
if (newConnectionCount > 0) if (EnableCostBasedConnectionEstablishment && newConnectionCount > 0 &&
initiatedConnectionCount <= MaxCachedConnectionsPerWorker &&
UsingExistingSessionsCheaperThanEstablishingNewConnections(
readyTaskCount, workerPool))
{ {
/* increase the open rate every cycle (like TCP slow start) */ /*
workerPool->maxNewConnectionsPerCycle += 1; * Before giving the decision, we do one more check. If the cost of
* executing the remaining tasks over the existing sessions in the
* pool is cheaper than establishing new connections and executing
* the tasks over the new connections, we prefer the former.
*
* For cached connections we should ignore any optimizations as
* cached connections are almost free to get. In other words,
* as long as there are cached connections that the pool has
* not used yet, aggressively use these already established
* connections.
*
* Note that until MaxCachedConnectionsPerWorker has already been
* established within the session, we still need to establish
* the connections right now.
*
* Also remember that we are not trying to find the optimal number
* of connections for the remaining tasks here. Our goal is to prevent
* connection establishments that are absolutely unnecessary. In the
* future, we may improve the calculations below to find the optimal
* number of new connections required.
*/
return 0;
} }
} }
return newConnectionCount; return newConnectionCount;
} }
/*
* UsingExistingSessionsCheaperThanEstablishingNewConnections returns true if
* using the already established connections takes less time compared to opening
* new connections based on the current execution's stats.
*
* The function returns false if the current execution has not established any connections
* or finished any tasks (e.g., no stats to act on).
*/
static bool
UsingExistingSessionsCheaperThanEstablishingNewConnections(int readyTaskCount,
WorkerPool *workerPool)
{
int activeConnectionCount = workerPool->activeConnectionCount;
if (workerPool->totalExecutedTasks < 1 || activeConnectionCount < 1)
{
/*
* The pool has not finished any connection establishment or
* task yet. So, we refrain from optimizing the execution.
*/
return false;
}
double avgTaskExecutionTime = AvgTaskExecutionTimeApproximation(workerPool);
double avgConnectionEstablishmentTime = AvgConnectionEstablishmentTime(workerPool);
/* we assume that we are halfway through the execution */
double remainingTimeForActiveTaskExecutionsToFinish = avgTaskExecutionTime / 2;
/*
* We use "newConnectionCount" as if it is the task count as
* we are only interested in this iteration of CalculateNewConnectionCount().
*/
double totalTimeToExecuteNewTasks = avgTaskExecutionTime * readyTaskCount;
double estimatedExecutionTimeForNewTasks =
floor(totalTimeToExecuteNewTasks / activeConnectionCount);
/*
* First finish the already running tasks, and then use the connections
* to execute the new tasks.
*/
double costOfExecutingTheTasksOverExistingConnections =
remainingTimeForActiveTaskExecutionsToFinish +
estimatedExecutionTimeForNewTasks;
/*
* For every task, the executor is supposed to establish one
* connection and then execute the task over the connection.
*/
double costOfExecutingTheTasksOverNewConnection =
(avgTaskExecutionTime + avgConnectionEstablishmentTime);
return (costOfExecutingTheTasksOverExistingConnections <=
costOfExecutingTheTasksOverNewConnection);
}
/*
* AvgTaskExecutionTimeApproximation returns the approximation of the average task
* execution times on the workerPool.
*/
static double
AvgTaskExecutionTimeApproximation(WorkerPool *workerPool)
{
uint64 totalTaskExecutionTime = workerPool->totalTaskExecutionTime;
int taskCount = workerPool->totalExecutedTasks;
instr_time now;
INSTR_TIME_SET_CURRENT(now);
WorkerSession *session = NULL;
foreach_ptr(session, workerPool->sessionList)
{
/*
* Involve the tasks that are currently running. We do this to
* make sure that the execution responds with new connections
* quickly if the actively running tasks
*/
TaskPlacementExecution *placementExecution = session->currentTask;
if (placementExecution != NULL &&
placementExecution->executionState == PLACEMENT_EXECUTION_RUNNING)
{
uint64 durationInMicroSecs =
MicrosecondsBetweenTimestamps(placementExecution->startTime, now);
/*
* Our approximation is that we assume that the task execution is
* just in the halfway through.
*/
totalTaskExecutionTime += (2 * durationInMicroSecs);
taskCount += 1;
}
}
return taskCount == 0 ? 0 : ((double) totalTaskExecutionTime / taskCount);
}
/*
* AvgConnectionEstablishmentTime calculates the average connection establishment times
* for the input workerPool.
*/
static double
AvgConnectionEstablishmentTime(WorkerPool *workerPool)
{
double totalTimeMicrosec = 0;
int sessionCount = 0;
WorkerSession *session = NULL;
foreach_ptr(session, workerPool->sessionList)
{
MultiConnection *connection = session->connection;
/*
* There could be MaxCachedConnectionsPerWorker connections that are
* already connected. Those connections might skew the average
* connection establishment times for the current execution. The reason
* is that they are established earlier and the connection establishment
* times might be different at the moment those connections are established.
*/
if (connection->connectionState == MULTI_CONNECTION_CONNECTED)
{
long connectionEstablishmentTime =
MicrosecondsBetweenTimestamps(connection->connectionEstablishmentStart,
connection->connectionEstablishmentEnd);
totalTimeMicrosec += connectionEstablishmentTime;
++sessionCount;
}
}
return (sessionCount == 0) ? 0 : (totalTimeMicrosec / sessionCount);
}
/* /*
* OpenNewConnections opens the given amount of connections for the given workerPool. * OpenNewConnections opens the given amount of connections for the given workerPool.
*/ */
@ -2954,6 +3132,18 @@ MillisecondsBetweenTimestamps(instr_time startTime, instr_time endTime)
} }
/*
* MicrosecondsBetweenTimestamps is a helper to get the number of microseconds
* between timestamps.
*/
static uint64
MicrosecondsBetweenTimestamps(instr_time startTime, instr_time endTime)
{
INSTR_TIME_SUBTRACT(endTime, startTime);
return INSTR_TIME_GET_MICROSEC(endTime);
}
/* /*
* ConnectionStateMachine opens a connection and descends into the transaction * ConnectionStateMachine opens a connection and descends into the transaction
* state machine when ready. * state machine when ready.
@ -3180,10 +3370,10 @@ HandleMultiConnectionSuccess(WorkerSession *session)
MarkConnectionConnected(connection); MarkConnectionConnected(connection);
ereport(DEBUG4, (errmsg("established connection to %s:%d for " ereport(DEBUG4, (errmsg("established connection to %s:%d for "
"session %ld in %ld msecs", "session %ld in %ld microseconds",
connection->hostname, connection->port, connection->hostname, connection->port,
session->sessionId, session->sessionId,
MillisecondsBetweenTimestamps( MicrosecondsBetweenTimestamps(
connection->connectionEstablishmentStart, connection->connectionEstablishmentStart,
connection->connectionEstablishmentEnd)))); connection->connectionEstablishmentEnd))));
@ -4326,19 +4516,20 @@ PlacementExecutionDone(TaskPlacementExecution *placementExecution, bool succeede
Assert(INSTR_TIME_IS_ZERO(placementExecution->endTime)); Assert(INSTR_TIME_IS_ZERO(placementExecution->endTime));
INSTR_TIME_SET_CURRENT(placementExecution->endTime); INSTR_TIME_SET_CURRENT(placementExecution->endTime);
uint64 durationMicrosecs =
MicrosecondsBetweenTimestamps(placementExecution->startTime,
placementExecution->endTime);
workerPool->totalTaskExecutionTime += durationMicrosecs;
workerPool->totalExecutedTasks += 1;
if (IsLoggableLevel(DEBUG4)) if (IsLoggableLevel(DEBUG4))
{ {
long durationMillisecs =
MillisecondsBetweenTimestamps(placementExecution->startTime,
placementExecution->endTime);
ereport(DEBUG4, (errmsg("task execution (%d) for placement (%ld) on anchor " ereport(DEBUG4, (errmsg("task execution (%d) for placement (%ld) on anchor "
"shard (%ld) finished in %ld msecs on worker " "shard (%ld) finished in %ld microseconds on worker "
"node %s:%d", shardCommandExecution->task->taskId, "node %s:%d", shardCommandExecution->task->taskId,
placementExecution->shardPlacement->placementId, placementExecution->shardPlacement->placementId,
shardCommandExecution->task->anchorShardId, shardCommandExecution->task->anchorShardId,
durationMillisecs, workerPool->nodeName, durationMicrosecs, workerPool->nodeName,
workerPool->nodePort))); workerPool->nodePort)));
} }
} }

View File

@ -965,6 +965,18 @@ RegisterCitusConfigVariables(void)
GUC_STANDARD, GUC_STANDARD,
NULL, NULL, NULL); NULL, NULL, NULL);
DefineCustomBoolVariable(
"citus.enable_cost_based_connection_establishment",
gettext_noop("When enabled the connection establishment times "
"and task execution times into account for deciding "
"whether or not to establish new connections."),
NULL,
&EnableCostBasedConnectionEstablishment,
true,
PGC_USERSET,
GUC_NO_SHOW_ALL,
NULL, NULL, NULL);
DefineCustomBoolVariable( DefineCustomBoolVariable(
"citus.explain_distributed_queries", "citus.explain_distributed_queries",
gettext_noop("Enables Explain for distributed queries."), gettext_noop("Enables Explain for distributed queries."),

View File

@ -8,8 +8,10 @@ extern bool ForceMaxQueryParallelization;
extern int MaxAdaptiveExecutorPoolSize; extern int MaxAdaptiveExecutorPoolSize;
extern bool EnableBinaryProtocol; extern bool EnableBinaryProtocol;
/* GUC, number of ms to wait between opening connections to the same worker */ /* GUC, number of ms to wait between opening connections to the same worker */
extern int ExecutorSlowStartInterval; extern int ExecutorSlowStartInterval;
extern bool EnableCostBasedConnectionEstablishment;
extern bool ShouldRunTasksSequentially(List *taskList); extern bool ShouldRunTasksSequentially(List *taskList);
extern uint64 ExecuteUtilityTaskList(List *utilityTaskList, bool localExecutionSupported); extern uint64 ExecuteUtilityTaskList(List *utilityTaskList, bool localExecutionSupported);