mirror of https://github.com/citusdata/citus.git
Executor takes connection establishment and task execution costs into account
With this commit, the executor becomes smarter about refrain to open new connections. The very basic example is that, if the connection establishments take 1000ms and task executions as 5 msecs, the executor becomes smart enough to not establish new connections.pull/4895/head
parent
28b0b4ebd1
commit
995adf1a19
|
@ -124,6 +124,7 @@
|
|||
#include "miscadmin.h"
|
||||
#include "pgstat.h"
|
||||
|
||||
#include <math.h>
|
||||
#include <sys/stat.h>
|
||||
#include <unistd.h>
|
||||
|
||||
|
@ -414,6 +415,10 @@ typedef struct WorkerPool
|
|||
* use it anymore.
|
||||
*/
|
||||
WorkerPoolFailureState failureState;
|
||||
|
||||
/* execution statistics per pool, in microseconds */
|
||||
uint64 totalTaskExecutionTime;
|
||||
int totalExecutedTasks;
|
||||
} WorkerPool;
|
||||
|
||||
struct TaskPlacementExecution;
|
||||
|
@ -473,6 +478,7 @@ bool EnableBinaryProtocol = false;
|
|||
|
||||
/* GUC, number of ms to wait between opening connections to the same worker */
|
||||
int ExecutorSlowStartInterval = 10;
|
||||
bool EnableCostBasedConnectionEstablishment = true;
|
||||
|
||||
|
||||
/*
|
||||
|
@ -635,6 +641,12 @@ static WorkerSession * FindOrCreateWorkerSession(WorkerPool *workerPool,
|
|||
static void ManageWorkerPool(WorkerPool *workerPool);
|
||||
static bool ShouldWaitForSlowStart(WorkerPool *workerPool);
|
||||
static int CalculateNewConnectionCount(WorkerPool *workerPool);
|
||||
static bool UsingExistingSessionsCheaperThanEstablishingNewConnections(int
|
||||
readyTaskCount,
|
||||
WorkerPool *
|
||||
workerPool);
|
||||
static double AvgTaskExecutionTimeApproximation(WorkerPool *workerPool);
|
||||
static double AvgConnectionEstablishmentTime(WorkerPool *workerPool);
|
||||
static void OpenNewConnections(WorkerPool *workerPool, int newConnectionCount,
|
||||
TransactionProperties *transactionProperties);
|
||||
static void CheckConnectionTimeout(WorkerPool *workerPool);
|
||||
|
@ -680,6 +692,7 @@ static int RebuildWaitEventSet(DistributedExecution *execution);
|
|||
static void ProcessWaitEvents(DistributedExecution *execution, WaitEvent *events, int
|
||||
eventCount, bool *cancellationReceived);
|
||||
static long MillisecondsBetweenTimestamps(instr_time startTime, instr_time endTime);
|
||||
static uint64 MicrosecondsBetweenTimestamps(instr_time startTime, instr_time endTime);
|
||||
static HeapTuple BuildTupleFromBytes(AttInMetadata *attinmeta, fmStringInfo *values);
|
||||
static AttInMetadata * TupleDescGetAttBinaryInMetadata(TupleDesc tupdesc);
|
||||
static int WorkerPoolCompare(const void *lhsKey, const void *rhsKey);
|
||||
|
@ -1642,8 +1655,10 @@ CleanUpSessions(DistributedExecution *execution)
|
|||
{
|
||||
MultiConnection *connection = session->connection;
|
||||
|
||||
ereport(DEBUG4, (errmsg("Total number of commands sent over the session %ld: %ld",
|
||||
session->sessionId, session->commandsSent)));
|
||||
ereport(DEBUG4, (errmsg("Total number of commands sent over the session %ld: %ld "
|
||||
"to node %s:%d", session->sessionId,
|
||||
session->commandsSent,
|
||||
connection->hostname, connection->port)));
|
||||
|
||||
UnclaimConnection(connection);
|
||||
|
||||
|
@ -2643,13 +2658,176 @@ CalculateNewConnectionCount(WorkerPool *workerPool)
|
|||
* than the target pool size.
|
||||
*/
|
||||
newConnectionCount = Min(newConnectionsForReadyTasks, maxNewConnectionCount);
|
||||
if (newConnectionCount > 0)
|
||||
{ }
|
||||
if (EnableCostBasedConnectionEstablishment && newConnectionCount > 0 &&
|
||||
initiatedConnectionCount <= MaxCachedConnectionsPerWorker &&
|
||||
UsingExistingSessionsCheaperThanEstablishingNewConnections(
|
||||
readyTaskCount, workerPool))
|
||||
{
|
||||
/*
|
||||
* Before giving the decision, we do one more check. If the cost of
|
||||
* executing the remaining tasks over the existing sessions in the
|
||||
* pool is cheaper than establishing new connections and executing
|
||||
* the tasks over the new connections, we prefer the former.
|
||||
*
|
||||
* For cached connections we should ignore any optimizations as
|
||||
* cached connections are almost free to get. In other words,
|
||||
* as long as there are cached connections that the pool has
|
||||
* not used yet, aggressively use these already established
|
||||
* connections.
|
||||
*
|
||||
* Note that until MaxCachedConnectionsPerWorker has already been
|
||||
* established within the session, we still need to establish
|
||||
* the connections right now.
|
||||
*
|
||||
* Also remember that we are not trying to find the optimal number
|
||||
* of connections for the remaining tasks here. Our goal is to prevent
|
||||
* connection establishments that are absolutely unnecessary. In the
|
||||
* future, we may improve the calculations below to find the optimal
|
||||
* number of new connections required.
|
||||
*/
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
return newConnectionCount;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* UsingExistingSessionsCheaperThanEstablishingNewConnections returns true if
|
||||
* using the already established connections takes less time compared to opening
|
||||
* new connections based on the current execution's stats.
|
||||
*
|
||||
* The function returns false if the current execution has not established any connections
|
||||
* or finished any tasks (e.g., no stats to act on).
|
||||
*/
|
||||
static bool
|
||||
UsingExistingSessionsCheaperThanEstablishingNewConnections(int readyTaskCount,
|
||||
WorkerPool *workerPool)
|
||||
{
|
||||
int activeConnectionCount = workerPool->activeConnectionCount;
|
||||
if (workerPool->totalExecutedTasks < 1 || activeConnectionCount < 1)
|
||||
{
|
||||
/*
|
||||
* The pool has not finished any connection establishment or
|
||||
* task yet. So, we refrain from optimizing the execution.
|
||||
*/
|
||||
return false;
|
||||
}
|
||||
|
||||
double avgTaskExecutionTime = AvgTaskExecutionTimeApproximation(workerPool);
|
||||
double avgConnectionEstablishmentTime = AvgConnectionEstablishmentTime(workerPool);
|
||||
|
||||
/* we assume that we are halfway through the execution */
|
||||
double remainingTimeForActiveTaskExecutionsToFinish = avgTaskExecutionTime / 2;
|
||||
|
||||
/*
|
||||
* We use "newConnectionCount" as if it is the task count as
|
||||
* we are only interested in this iteration of CalculateNewConnectionCount().
|
||||
*/
|
||||
double totalTimeToExecuteNewTasks = avgTaskExecutionTime * readyTaskCount;
|
||||
|
||||
double estimatedExecutionTimeForNewTasks =
|
||||
floor(totalTimeToExecuteNewTasks / activeConnectionCount);
|
||||
|
||||
/*
|
||||
* First finish the already running tasks, and then use the connections
|
||||
* to execute the new tasks.
|
||||
*/
|
||||
double costOfExecutingTheTasksOverExistingConnections =
|
||||
remainingTimeForActiveTaskExecutionsToFinish +
|
||||
estimatedExecutionTimeForNewTasks;
|
||||
|
||||
/*
|
||||
* For every task, the executor is supposed to establish one
|
||||
* connection and then execute the task over the connection.
|
||||
*/
|
||||
double costOfExecutingTheTasksOverNewConnection =
|
||||
(avgTaskExecutionTime + avgConnectionEstablishmentTime);
|
||||
|
||||
return (costOfExecutingTheTasksOverExistingConnections <=
|
||||
costOfExecutingTheTasksOverNewConnection);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* AvgTaskExecutionTimeApproximation returns the approximation of the average task
|
||||
* execution times on the workerPool.
|
||||
*/
|
||||
static double
|
||||
AvgTaskExecutionTimeApproximation(WorkerPool *workerPool)
|
||||
{
|
||||
uint64 totalTaskExecutionTime = workerPool->totalTaskExecutionTime;
|
||||
int taskCount = workerPool->totalExecutedTasks;
|
||||
|
||||
instr_time now;
|
||||
INSTR_TIME_SET_CURRENT(now);
|
||||
|
||||
WorkerSession *session = NULL;
|
||||
foreach_ptr(session, workerPool->sessionList)
|
||||
{
|
||||
/*
|
||||
* Involve the tasks that are currently running. We do this to
|
||||
* make sure that the execution responds with new connections
|
||||
* quickly if the actively running tasks
|
||||
*/
|
||||
TaskPlacementExecution *placementExecution = session->currentTask;
|
||||
if (placementExecution != NULL &&
|
||||
placementExecution->executionState == PLACEMENT_EXECUTION_RUNNING)
|
||||
{
|
||||
uint64 durationInMicroSecs =
|
||||
MicrosecondsBetweenTimestamps(placementExecution->startTime, now);
|
||||
|
||||
/*
|
||||
* Our approximation is that we assume that the task execution is
|
||||
* just in the halfway through.
|
||||
*/
|
||||
totalTaskExecutionTime += (2 * durationInMicroSecs);
|
||||
taskCount += 1;
|
||||
}
|
||||
}
|
||||
|
||||
return taskCount == 0 ? 0 : ((double) totalTaskExecutionTime / taskCount);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* AvgConnectionEstablishmentTime calculates the average connection establishment times
|
||||
* for the input workerPool.
|
||||
*/
|
||||
static double
|
||||
AvgConnectionEstablishmentTime(WorkerPool *workerPool)
|
||||
{
|
||||
double totalTimeMicrosec = 0;
|
||||
int sessionCount = 0;
|
||||
|
||||
WorkerSession *session = NULL;
|
||||
foreach_ptr(session, workerPool->sessionList)
|
||||
{
|
||||
MultiConnection *connection = session->connection;
|
||||
|
||||
/*
|
||||
* There could be MaxCachedConnectionsPerWorker connections that are
|
||||
* already connected. Those connections might skew the average
|
||||
* connection establishment times for the current execution. The reason
|
||||
* is that they are established earlier and the connection establishment
|
||||
* times might be different at the moment those connections are established.
|
||||
*/
|
||||
if (connection->connectionState == MULTI_CONNECTION_CONNECTED)
|
||||
{
|
||||
long connectionEstablishmentTime =
|
||||
MicrosecondsBetweenTimestamps(connection->connectionEstablishmentStart,
|
||||
connection->connectionEstablishmentEnd);
|
||||
|
||||
totalTimeMicrosec += connectionEstablishmentTime;
|
||||
++sessionCount;
|
||||
}
|
||||
}
|
||||
|
||||
return (sessionCount == 0) ? 0 : (totalTimeMicrosec / sessionCount);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* OpenNewConnections opens the given amount of connections for the given workerPool.
|
||||
*/
|
||||
|
@ -2954,6 +3132,18 @@ MillisecondsBetweenTimestamps(instr_time startTime, instr_time endTime)
|
|||
}
|
||||
|
||||
|
||||
/*
|
||||
* MicrosecondsBetweenTimestamps is a helper to get the number of microseconds
|
||||
* between timestamps.
|
||||
*/
|
||||
static uint64
|
||||
MicrosecondsBetweenTimestamps(instr_time startTime, instr_time endTime)
|
||||
{
|
||||
INSTR_TIME_SUBTRACT(endTime, startTime);
|
||||
return INSTR_TIME_GET_MICROSEC(endTime);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* ConnectionStateMachine opens a connection and descends into the transaction
|
||||
* state machine when ready.
|
||||
|
@ -3180,10 +3370,10 @@ HandleMultiConnectionSuccess(WorkerSession *session)
|
|||
MarkConnectionConnected(connection);
|
||||
|
||||
ereport(DEBUG4, (errmsg("established connection to %s:%d for "
|
||||
"session %ld in %ld msecs",
|
||||
"session %ld in %ld microseconds",
|
||||
connection->hostname, connection->port,
|
||||
session->sessionId,
|
||||
MillisecondsBetweenTimestamps(
|
||||
MicrosecondsBetweenTimestamps(
|
||||
connection->connectionEstablishmentStart,
|
||||
connection->connectionEstablishmentEnd))));
|
||||
|
||||
|
@ -4326,19 +4516,20 @@ PlacementExecutionDone(TaskPlacementExecution *placementExecution, bool succeede
|
|||
|
||||
Assert(INSTR_TIME_IS_ZERO(placementExecution->endTime));
|
||||
INSTR_TIME_SET_CURRENT(placementExecution->endTime);
|
||||
uint64 durationMicrosecs =
|
||||
MicrosecondsBetweenTimestamps(placementExecution->startTime,
|
||||
placementExecution->endTime);
|
||||
workerPool->totalTaskExecutionTime += durationMicrosecs;
|
||||
workerPool->totalExecutedTasks += 1;
|
||||
|
||||
if (IsLoggableLevel(DEBUG4))
|
||||
{
|
||||
long durationMillisecs =
|
||||
MillisecondsBetweenTimestamps(placementExecution->startTime,
|
||||
placementExecution->endTime);
|
||||
|
||||
ereport(DEBUG4, (errmsg("task execution (%d) for placement (%ld) on anchor "
|
||||
"shard (%ld) finished in %ld msecs on worker "
|
||||
"shard (%ld) finished in %ld microseconds on worker "
|
||||
"node %s:%d", shardCommandExecution->task->taskId,
|
||||
placementExecution->shardPlacement->placementId,
|
||||
shardCommandExecution->task->anchorShardId,
|
||||
durationMillisecs, workerPool->nodeName,
|
||||
durationMicrosecs, workerPool->nodeName,
|
||||
workerPool->nodePort)));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -965,6 +965,18 @@ RegisterCitusConfigVariables(void)
|
|||
GUC_STANDARD,
|
||||
NULL, NULL, NULL);
|
||||
|
||||
DefineCustomBoolVariable(
|
||||
"citus.enable_cost_based_connection_establishment",
|
||||
gettext_noop("When enabled the connection establishment times "
|
||||
"and task execution times into account for deciding "
|
||||
"whether or not to establish new connections."),
|
||||
NULL,
|
||||
&EnableCostBasedConnectionEstablishment,
|
||||
true,
|
||||
PGC_USERSET,
|
||||
GUC_NO_SHOW_ALL,
|
||||
NULL, NULL, NULL);
|
||||
|
||||
DefineCustomBoolVariable(
|
||||
"citus.explain_distributed_queries",
|
||||
gettext_noop("Enables Explain for distributed queries."),
|
||||
|
|
|
@ -8,8 +8,10 @@ extern bool ForceMaxQueryParallelization;
|
|||
extern int MaxAdaptiveExecutorPoolSize;
|
||||
extern bool EnableBinaryProtocol;
|
||||
|
||||
|
||||
/* GUC, number of ms to wait between opening connections to the same worker */
|
||||
extern int ExecutorSlowStartInterval;
|
||||
extern bool EnableCostBasedConnectionEstablishment;
|
||||
|
||||
extern bool ShouldRunTasksSequentially(List *taskList);
|
||||
extern uint64 ExecuteUtilityTaskList(List *utilityTaskList, bool localExecutionSupported);
|
||||
|
|
Loading…
Reference in New Issue