diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c
index 66c486f47..54ae7c625 100644
--- a/src/backend/distributed/executor/adaptive_executor.c
+++ b/src/backend/distributed/executor/adaptive_executor.c
@@ -124,6 +124,7 @@
 #include "miscadmin.h"
 #include "pgstat.h"
 
+#include <math.h>
 #include <sys/stat.h>
 #include <unistd.h>
 
@@ -414,6 +415,10 @@ typedef struct WorkerPool
 	 * use it anymore.
	 */
	WorkerPoolFailureState failureState;
+
+	/* execution statistics per pool, in microseconds */
+	uint64 totalTaskExecutionTime;
+	int totalExecutedTasks;
 } WorkerPool;
 
 struct TaskPlacementExecution;
@@ -473,6 +478,7 @@ bool EnableBinaryProtocol = false;
 
 /* GUC, number of ms to wait between opening connections to the same worker */
 int ExecutorSlowStartInterval = 10;
+bool EnableCostBasedConnectionEstablishment = true;
 
 
 /*
@@ -635,6 +641,12 @@ static WorkerSession * FindOrCreateWorkerSession(WorkerPool *workerPool,
 static void ManageWorkerPool(WorkerPool *workerPool);
 static bool ShouldWaitForSlowStart(WorkerPool *workerPool);
 static int CalculateNewConnectionCount(WorkerPool *workerPool);
+static bool UsingExistingSessionsCheaperThanEstablishingNewConnections(int
+																		readyTaskCount,
+																		WorkerPool *
+																		workerPool);
+static double AvgTaskExecutionTimeApproximation(WorkerPool *workerPool);
+static double AvgConnectionEstablishmentTime(WorkerPool *workerPool);
 static void OpenNewConnections(WorkerPool *workerPool, int newConnectionCount,
							   TransactionProperties *transactionProperties);
 static void CheckConnectionTimeout(WorkerPool *workerPool);
@@ -680,6 +692,7 @@ static int RebuildWaitEventSet(DistributedExecution *execution);
 static void ProcessWaitEvents(DistributedExecution *execution, WaitEvent *events, int
							  eventCount, bool *cancellationReceived);
 static long MillisecondsBetweenTimestamps(instr_time startTime, instr_time endTime);
+static uint64 MicrosecondsBetweenTimestamps(instr_time startTime, instr_time endTime);
 static HeapTuple BuildTupleFromBytes(AttInMetadata *attinmeta, fmStringInfo *values);
 static AttInMetadata * TupleDescGetAttBinaryInMetadata(TupleDesc tupdesc);
 static int WorkerPoolCompare(const void *lhsKey, const void *rhsKey);
@@ -1642,8 +1655,10 @@ CleanUpSessions(DistributedExecution *execution)
 	{
		MultiConnection *connection = session->connection;
 
-		ereport(DEBUG4, (errmsg("Total number of commands sent over the session %ld: %ld",
-								session->sessionId, session->commandsSent)));
+		ereport(DEBUG4, (errmsg("Total number of commands sent over the session %ld: %ld "
+								"to node %s:%d", session->sessionId,
+								session->commandsSent,
+								connection->hostname, connection->port)));
 
		UnclaimConnection(connection);
 
@@ -2454,6 +2469,9 @@ ManageWorkerPool(WorkerPool *workerPool)
		return;
	}
 
+	/* increase the open rate every cycle (like TCP slow start) */
+	workerPool->maxNewConnectionsPerCycle += 1;
+
	OpenNewConnections(workerPool, newConnectionCount, execution->transactionProperties);
 
	/*
@@ -2640,16 +2658,176 @@ CalculateNewConnectionCount(WorkerPool *workerPool)
		 * than the target pool size.
		 */
		newConnectionCount = Min(newConnectionsForReadyTasks, maxNewConnectionCount);
-		if (newConnectionCount > 0)
+		if (EnableCostBasedConnectionEstablishment && newConnectionCount > 0 &&
+			initiatedConnectionCount <= MaxCachedConnectionsPerWorker &&
+			UsingExistingSessionsCheaperThanEstablishingNewConnections(
+				readyTaskCount, workerPool))
		{
-			/* increase the open rate every cycle (like TCP slow start) */
-			workerPool->maxNewConnectionsPerCycle += 1;
+			/*
+			 * Before making the decision, we do one more check. If the cost of
+			 * executing the remaining tasks over the existing sessions in the
+			 * pool is cheaper than establishing new connections and executing
+			 * the tasks over the new connections, we prefer the former.
+			 *
+			 * For cached connections, we should ignore any such optimization as
+			 * cached connections are almost free to get. In other words,
+			 * as long as there are cached connections that the pool has
+			 * not used yet, aggressively use these already established
+			 * connections.
+			 *
+			 * Note that until MaxCachedConnectionsPerWorker connections have
+			 * been established within the session, we still need to establish
+			 * the connections right now.
+			 *
+			 * Also remember that we are not trying to find the optimal number
+			 * of connections for the remaining tasks here. Our goal is to prevent
+			 * connection establishments that are absolutely unnecessary. In the
+			 * future, we may improve the calculations below to find the optimal
+			 * number of new connections required.
+			 */
+			return 0;
		}
	}
 
	return newConnectionCount;
 }
 
 
+/*
+ * UsingExistingSessionsCheaperThanEstablishingNewConnections returns true if,
+ * based on the current execution's stats, executing the ready tasks over the
+ * already established connections takes less time than opening new connections.
+ *
+ * The function returns false if the current execution has not established any
+ * connections or finished any tasks yet (i.e., there are no stats to act on).
+ */
+static bool
+UsingExistingSessionsCheaperThanEstablishingNewConnections(int readyTaskCount,
+														   WorkerPool *workerPool)
+{
+	int activeConnectionCount = workerPool->activeConnectionCount;
+	if (workerPool->totalExecutedTasks < 1 || activeConnectionCount < 1)
+	{
+		/*
+		 * The pool has not finished any connection establishment or
+		 * task yet. So, we refrain from optimizing the execution.
+		 */
+		return false;
+	}
+
+	double avgTaskExecutionTime = AvgTaskExecutionTimeApproximation(workerPool);
+	double avgConnectionEstablishmentTime = AvgConnectionEstablishmentTime(workerPool);
+
+	/* we assume that we are halfway through the execution */
+	double remainingTimeForActiveTaskExecutionsToFinish = avgTaskExecutionTime / 2;
+
+	/*
+	 * We use readyTaskCount as the task count because we are only interested
+	 * in the tasks that are ready in this iteration of CalculateNewConnectionCount().
+	 */
+	double totalTimeToExecuteNewTasks = avgTaskExecutionTime * readyTaskCount;
+
+	double estimatedExecutionTimeForNewTasks =
+		floor(totalTimeToExecuteNewTasks / activeConnectionCount);
+
+	/*
+	 * First finish the already running tasks, and then use the connections
+	 * to execute the new tasks.
+	 */
+	double costOfExecutingTheTasksOverExistingConnections =
+		remainingTimeForActiveTaskExecutionsToFinish +
+		estimatedExecutionTimeForNewTasks;
+
+	/*
+	 * For every task, the executor is supposed to establish one
+	 * connection and then execute the task over the connection.
+	 */
+	double costOfExecutingTheTasksOverNewConnection =
+		(avgTaskExecutionTime + avgConnectionEstablishmentTime);
+
+	return (costOfExecutingTheTasksOverExistingConnections <=
+			costOfExecutingTheTasksOverNewConnection);
+}
+
+
+/*
+ * AvgTaskExecutionTimeApproximation returns an approximation of the average task
+ * execution time on the workerPool.
+ */
+static double
+AvgTaskExecutionTimeApproximation(WorkerPool *workerPool)
+{
+	uint64 totalTaskExecutionTime = workerPool->totalTaskExecutionTime;
+	int taskCount = workerPool->totalExecutedTasks;
+
+	instr_time now;
+	INSTR_TIME_SET_CURRENT(now);
+
+	WorkerSession *session = NULL;
+	foreach_ptr(session, workerPool->sessionList)
+	{
+		/*
+		 * Involve the tasks that are currently running. We do this to
+		 * make sure that the execution responds with new connections
+		 * quickly if the actively running tasks turn out to be slower
+		 * than the tasks that have already finished.
+		 */
+		TaskPlacementExecution *placementExecution = session->currentTask;
+		if (placementExecution != NULL &&
+			placementExecution->executionState == PLACEMENT_EXECUTION_RUNNING)
+		{
+			uint64 durationInMicroSecs =
+				MicrosecondsBetweenTimestamps(placementExecution->startTime, now);
+
+			/*
+			 * Our approximation assumes that the running task is just
+			 * halfway through its execution.
+			 */
+			totalTaskExecutionTime += (2 * durationInMicroSecs);
+			taskCount += 1;
+		}
+	}
+
+	return taskCount == 0 ? 0 : ((double) totalTaskExecutionTime / taskCount);
+}
+
+
+/*
+ * AvgConnectionEstablishmentTime calculates the average connection establishment
+ * time for the input workerPool.
+ */
+static double
+AvgConnectionEstablishmentTime(WorkerPool *workerPool)
+{
+	double totalTimeMicrosec = 0;
+	int sessionCount = 0;
+
+	WorkerSession *session = NULL;
+	foreach_ptr(session, workerPool->sessionList)
+	{
+		MultiConnection *connection = session->connection;
+
+		/*
+		 * There could be MaxCachedConnectionsPerWorker connections that are
+		 * already connected. Those connections might skew the average
+		 * connection establishment times for the current execution. The reason
+		 * is that they were established earlier, and the connection establishment
+		 * times might have been different at the time those connections were
+		 * established.
+		 */
+		if (connection->connectionState == MULTI_CONNECTION_CONNECTED)
+		{
+			long connectionEstablishmentTime =
+				MicrosecondsBetweenTimestamps(connection->connectionEstablishmentStart,
+											  connection->connectionEstablishmentEnd);
+
+			totalTimeMicrosec += connectionEstablishmentTime;
+			++sessionCount;
+		}
+	}
+
+	return (sessionCount == 0) ? 0 : (totalTimeMicrosec / sessionCount);
+}
+
+
 /*
  * OpenNewConnections opens the given amount of connections for the given workerPool.
  */
@@ -2954,6 +3132,18 @@ MillisecondsBetweenTimestamps(instr_time startTime, instr_time endTime)
 }
 
 
+/*
+ * MicrosecondsBetweenTimestamps is a helper to get the number of microseconds
+ * between timestamps.
+ */
+static uint64
+MicrosecondsBetweenTimestamps(instr_time startTime, instr_time endTime)
+{
+	INSTR_TIME_SUBTRACT(endTime, startTime);
+	return INSTR_TIME_GET_MICROSEC(endTime);
+}
+
+
 /*
  * ConnectionStateMachine opens a connection and descends into the transaction
  * state machine when ready.
@@ -3180,10 +3370,10 @@ HandleMultiConnectionSuccess(WorkerSession *session)
	MarkConnectionConnected(connection);
 
	ereport(DEBUG4, (errmsg("established connection to %s:%d for "
-							"session %ld in %ld msecs",
+							"session %ld in %ld microseconds",
							connection->hostname, connection->port,
							session->sessionId,
-							MillisecondsBetweenTimestamps(
+							MicrosecondsBetweenTimestamps(
								connection->connectionEstablishmentStart,
								connection->connectionEstablishmentEnd))));
 
@@ -4326,19 +4516,20 @@ PlacementExecutionDone(TaskPlacementExecution *placementExecution, bool succeede
	Assert(INSTR_TIME_IS_ZERO(placementExecution->endTime));
	INSTR_TIME_SET_CURRENT(placementExecution->endTime);
+	uint64 durationMicrosecs =
+		MicrosecondsBetweenTimestamps(placementExecution->startTime,
+									  placementExecution->endTime);
+	workerPool->totalTaskExecutionTime += durationMicrosecs;
+	workerPool->totalExecutedTasks += 1;
 
	if (IsLoggableLevel(DEBUG4))
	{
-		long durationMillisecs =
-			MillisecondsBetweenTimestamps(placementExecution->startTime,
-										  placementExecution->endTime);
-
		ereport(DEBUG4, (errmsg("task execution (%d) for placement (%ld) on anchor "
-								"shard (%ld) finished in %ld msecs on worker "
+								"shard (%ld) finished in %ld microseconds on worker "
								"node %s:%d",
								shardCommandExecution->task->taskId,
								placementExecution->shardPlacement->placementId,
								shardCommandExecution->task->anchorShardId,
-								durationMillisecs, workerPool->nodeName,
+								durationMicrosecs, workerPool->nodeName,
								workerPool->nodePort)));
	}
 }
diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c
index 17fad37a4..ad59e524c 100644
--- a/src/backend/distributed/shared_library_init.c
+++ b/src/backend/distributed/shared_library_init.c
@@ -965,6 +965,18 @@ RegisterCitusConfigVariables(void)
		GUC_STANDARD,
		NULL, NULL, NULL);
 
+	DefineCustomBoolVariable(
+		"citus.enable_cost_based_connection_establishment",
+		gettext_noop("When enabled, the connection establishment times "
+					 "and task execution times are taken into account when "
+					 "deciding whether or not to establish new connections."),
+		NULL,
+		&EnableCostBasedConnectionEstablishment,
+		true,
+		PGC_USERSET,
+		GUC_NO_SHOW_ALL,
+		NULL, NULL, NULL);
+
	DefineCustomBoolVariable(
		"citus.explain_distributed_queries",
		gettext_noop("Enables Explain for distributed queries."),
diff --git a/src/include/distributed/adaptive_executor.h b/src/include/distributed/adaptive_executor.h
index 3affd1877..111d886c0 100644
--- a/src/include/distributed/adaptive_executor.h
+++ b/src/include/distributed/adaptive_executor.h
@@ -8,8 +8,10 @@ extern bool ForceMaxQueryParallelization;
 extern int MaxAdaptiveExecutorPoolSize;
 extern bool EnableBinaryProtocol;
 
+
 /* GUC, number of ms to wait between opening connections to the same worker */
 extern int ExecutorSlowStartInterval;
+extern bool EnableCostBasedConnectionEstablishment;
 
 extern bool ShouldRunTasksSequentially(List *taskList);
 extern uint64 ExecuteUtilityTaskList(List *utilityTaskList, bool localExecutionSupported);
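The heuristic introduced by this patch compares two microsecond estimates: finishing the ready tasks on the sessions the pool already has (half an average task execution to drain the currently running tasks, plus the ready tasks spread over the active connections) versus paying the connection handshake and running a task over a fresh connection. The snippet below is a standalone sketch of that comparison, not part of the patch; the function and variable names are illustrative, and in the executor the averages come from the new WorkerPool counters (totalTaskExecutionTime, totalExecutedTasks) and from each connection's establishment timestamps.

/*
 * Standalone sketch (illustrative names, not the executor's code) of the
 * cost comparison performed by
 * UsingExistingSessionsCheaperThanEstablishingNewConnections().
 * All durations are in microseconds.
 */
#include <math.h>
#include <stdbool.h>
#include <stdio.h>

static bool
ExistingSessionsCheaper(double avgTaskExecutionTime,
						double avgConnectionEstablishmentTime,
						int readyTaskCount, int activeConnectionCount)
{
	/* assume the currently running tasks are halfway through */
	double remainingTimeForActiveTasks = avgTaskExecutionTime / 2;

	/* spread the ready tasks over the connections we already have */
	double estimatedTimeForNewTasks =
		floor(avgTaskExecutionTime * readyTaskCount / activeConnectionCount);

	double costOverExistingConnections =
		remainingTimeForActiveTasks + estimatedTimeForNewTasks;

	/* a new connection: pay the handshake, then run a task over it */
	double costOverNewConnection =
		avgTaskExecutionTime + avgConnectionEstablishmentTime;

	return costOverExistingConnections <= costOverNewConnection;
}

int
main(void)
{
	/* e.g. 2 ms tasks, 30 ms connection establishment, 4 ready tasks, 2 sessions */
	bool cheaper = ExistingSessionsCheaper(2000, 30000, 4, 2);

	printf("skip opening new connections: %s\n", cheaper ? "yes" : "no");
	return 0;
}

With short tasks and expensive connection establishment, as in the example values above, the comparison favors reusing the existing sessions, so CalculateNewConnectionCount() returns 0 and no new connections are opened in that cycle. Since the GUC is registered as PGC_USERSET, the behavior can be switched off per session with SET citus.enable_cost_based_connection_establishment TO off, which falls back to the plain slow-start behavior.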