Merge pull request #2848 from citusdata/adaptive_executor_tuning

Adaptive executor performance improvements
pull/2852/head
Marco Slot 2019-07-25 16:52:36 +02:00 committed by GitHub
commit c471d9680c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 140 additions and 83 deletions

View File

@ -333,7 +333,6 @@ StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port,
connection = StartConnectionEstablishment(&key); connection = StartConnectionEstablishment(&key);
dlist_push_tail(entry->connections, &connection->connectionNode); dlist_push_tail(entry->connections, &connection->connectionNode);
entry->connectionCount++;
ResetShardPlacementAssociation(connection); ResetShardPlacementAssociation(connection);
@ -1065,8 +1064,6 @@ AfterXactHostConnectionHandling(ConnectionHashEntry *entry, bool isCommit)
cachedConnectionCount++; cachedConnectionCount++;
} }
} }
entry->connectionCount = cachedConnectionCount;
} }
@ -1172,29 +1169,3 @@ TrimLogLevel(const char *message)
return chompedMessage + n; return chompedMessage + n;
} }
/*
* NodeConnectionCount gets the number of connections to the given node
* for the current username and database.
*/
int
NodeConnectionCount(char *hostname, int port)
{
ConnectionHashKey key;
ConnectionHashEntry *entry = NULL;
bool found = false;
strlcpy(key.hostname, hostname, MAX_NODE_LENGTH);
key.port = port;
strlcpy(key.user, CurrentUserName(), NAMEDATALEN);
strlcpy(key.database, CurrentDatabaseName(), NAMEDATALEN);
entry = (ConnectionHashEntry *) hash_search(ConnectionHash, &key, HASH_FIND, &found);
if (!found)
{
return 0;
}
return entry->connectionCount;
}

View File

@ -290,7 +290,8 @@ typedef struct WorkerPool
DistributedExecution *distributedExecution; DistributedExecution *distributedExecution;
/* worker node on which we have a pool of sessions */ /* worker node on which we have a pool of sessions */
WorkerNode *node; char *nodeName;
int nodePort;
/* all sessions on the worker that are part of the current execution */ /* all sessions on the worker that are part of the current execution */
List *sessionList; List *sessionList;
@ -386,6 +387,12 @@ typedef struct WorkerSession
* distributed transaction related commands such as BEGIN/COMMIT etc. * distributed transaction related commands such as BEGIN/COMMIT etc.
*/ */
uint64 commandsSent; uint64 commandsSent;
/* index in the wait event set */
int waitEventSetIndex;
/* events reported by the latest call to WaitEventSetWait */
int latestUnconsumedWaitEvents;
} WorkerSession; } WorkerSession;
@ -531,7 +538,7 @@ static void UnclaimAllSessionConnections(List *sessionList);
static bool UseConnectionPerPlacement(void); static bool UseConnectionPerPlacement(void);
static PlacementExecutionOrder ExecutionOrderForTask(RowModifyLevel modLevel, Task *task); static PlacementExecutionOrder ExecutionOrderForTask(RowModifyLevel modLevel, Task *task);
static WorkerPool * FindOrCreateWorkerPool(DistributedExecution *execution, static WorkerPool * FindOrCreateWorkerPool(DistributedExecution *execution,
WorkerNode *workerNode); char *nodeName, int nodePort);
static WorkerSession * FindOrCreateWorkerSession(WorkerPool *workerPool, static WorkerSession * FindOrCreateWorkerSession(WorkerPool *workerPool,
MultiConnection *connection); MultiConnection *connection);
static void ManageWorkerPool(WorkerPool *workerPool); static void ManageWorkerPool(WorkerPool *workerPool);
@ -540,6 +547,7 @@ static int UsableConnectionCount(WorkerPool *workerPool);
static long NextEventTimeout(DistributedExecution *execution); static long NextEventTimeout(DistributedExecution *execution);
static long MillisecondsBetweenTimestamps(TimestampTz startTime, TimestampTz endTime); static long MillisecondsBetweenTimestamps(TimestampTz startTime, TimestampTz endTime);
static WaitEventSet * BuildWaitEventSet(List *sessionList); static WaitEventSet * BuildWaitEventSet(List *sessionList);
static void UpdateWaitEventSetFlags(WaitEventSet *waitEventSet, List *sessionList);
static TaskPlacementExecution * PopPlacementExecution(WorkerSession *session); static TaskPlacementExecution * PopPlacementExecution(WorkerSession *session);
static TaskPlacementExecution * PopAssignedPlacementExecution(WorkerSession *session); static TaskPlacementExecution * PopAssignedPlacementExecution(WorkerSession *session);
static TaskPlacementExecution * PopUnassignedPlacementExecution(WorkerPool *workerPool); static TaskPlacementExecution * PopUnassignedPlacementExecution(WorkerPool *workerPool);
@ -1119,8 +1127,8 @@ CleanUpSessions(DistributedExecution *execution)
MultiConnection *connection = session->connection; MultiConnection *connection = session->connection;
elog(DEBUG4, "Total number of commands sent over the session %ld: %ld", ereport(DEBUG4, (errmsg("Total number of commands sent over the session %ld: %ld",
session->sessionId, session->commandsSent); session->sessionId, session->commandsSent)));
UnclaimConnection(connection); UnclaimConnection(connection);
@ -1168,8 +1176,8 @@ CleanUpSessions(DistributedExecution *execution)
* the connection and everything is cleared up. Otherwise, we'd have been * the connection and everything is cleared up. Otherwise, we'd have been
* on MULTI_CONNECTION_FAILED state. * on MULTI_CONNECTION_FAILED state.
*/ */
elog(WARNING, "unexpected transaction state at the end of execution: %d", ereport(WARNING, (errmsg("unexpected transaction state at the end of "
transactionState); "execution: %d", transactionState)));
} }
/* get ready for the next executions if we need use the same connection */ /* get ready for the next executions if we need use the same connection */
@ -1177,8 +1185,8 @@ CleanUpSessions(DistributedExecution *execution)
} }
else else
{ {
elog(WARNING, "unexpected connection state at the end of execution: %d", ereport(WARNING, (errmsg("unexpected connection state at the end of "
connection->connectionState); "execution: %d", connection->connectionState)));
} }
} }
} }
@ -1251,9 +1259,10 @@ AssignTasksToConnections(DistributedExecution *execution)
MultiConnection *connection = NULL; MultiConnection *connection = NULL;
int connectionFlags = 0; int connectionFlags = 0;
TaskPlacementExecution *placementExecution = NULL; TaskPlacementExecution *placementExecution = NULL;
WorkerNode *node = FindWorkerNode(taskPlacement->nodeName, char *nodeName = taskPlacement->nodeName;
taskPlacement->nodePort); int nodePort = taskPlacement->nodePort;
WorkerPool *workerPool = FindOrCreateWorkerPool(execution, node); WorkerPool *workerPool = FindOrCreateWorkerPool(execution, nodeName,
nodePort);
/* /*
* Execution of a command on a shard placement, which may not always * Execution of a command on a shard placement, which may not always
@ -1299,8 +1308,9 @@ AssignTasksToConnections(DistributedExecution *execution)
WorkerSession *session = WorkerSession *session =
FindOrCreateWorkerSession(workerPool, connection); FindOrCreateWorkerSession(workerPool, connection);
elog(DEBUG4, "Session %ld (%s:%d) has an assigned task", ereport(DEBUG4, (errmsg("Session %ld (%s:%d) has an assigned task",
session->sessionId, connection->hostname, connection->port); session->sessionId, connection->hostname,
connection->port)));
placementExecution->assignedSession = session; placementExecution->assignedSession = session;
@ -1433,7 +1443,8 @@ ExecutionOrderForTask(RowModifyLevel modLevel, Task *task)
case MERGE_FETCH_TASK: case MERGE_FETCH_TASK:
default: default:
{ {
elog(ERROR, "unsupported task type %d in adaptive executor", task->taskType); ereport(ERROR, (errmsg("unsupported task type %d in adaptive executor",
task->taskType)));
} }
} }
} }
@ -1443,7 +1454,7 @@ ExecutionOrderForTask(RowModifyLevel modLevel, Task *task)
* FindOrCreateWorkerPool gets the pool of connections for a particular worker. * FindOrCreateWorkerPool gets the pool of connections for a particular worker.
*/ */
static WorkerPool * static WorkerPool *
FindOrCreateWorkerPool(DistributedExecution *execution, WorkerNode *workerNode) FindOrCreateWorkerPool(DistributedExecution *execution, char *nodeName, int nodePort)
{ {
WorkerPool *workerPool = NULL; WorkerPool *workerPool = NULL;
ListCell *workerCell = NULL; ListCell *workerCell = NULL;
@ -1453,20 +1464,21 @@ FindOrCreateWorkerPool(DistributedExecution *execution, WorkerNode *workerNode)
{ {
workerPool = lfirst(workerCell); workerPool = lfirst(workerCell);
if (WorkerNodeCompare(workerPool->node, workerNode, 0) == 0) if (strncmp(nodeName, workerPool->nodeName, WORKER_LENGTH) == 0 &&
nodePort == workerPool->nodePort)
{ {
return workerPool; return workerPool;
} }
} }
workerPool = (WorkerPool *) palloc0(sizeof(WorkerPool)); workerPool = (WorkerPool *) palloc0(sizeof(WorkerPool));
workerPool->node = workerNode; workerPool->nodeName = pstrdup(nodeName);
workerPool->nodePort = nodePort;
workerPool->poolStartTime = 0; workerPool->poolStartTime = 0;
workerPool->distributedExecution = execution; workerPool->distributedExecution = execution;
/* "open" connections aggressively when there are cached connections */ /* "open" connections aggressively when there are cached connections */
nodeConnectionCount = NodeConnectionCount(workerNode->workerName, nodeConnectionCount = MaxCachedConnectionsPerWorker;
workerNode->workerPort);
workerPool->maxNewConnectionsPerCycle = Max(1, nodeConnectionCount); workerPool->maxNewConnectionsPerCycle = Max(1, nodeConnectionCount);
dlist_init(&workerPool->pendingTaskQueue); dlist_init(&workerPool->pendingTaskQueue);
@ -1631,8 +1643,8 @@ RunDistributedExecution(DistributedExecution *execution)
/* additional 2 is for postmaster and latch */ /* additional 2 is for postmaster and latch */
int eventSetSize = list_length(execution->sessionList) + 2; int eventSetSize = list_length(execution->sessionList) + 2;
execution->waitEventSet = BuildWaitEventSet(execution->sessionList); /* always (re)build the wait event set the first time */
events = palloc0(eventSetSize * sizeof(WaitEvent)); execution->connectionSetChanged = true;
while (execution->unfinishedTaskCount > 0 && !cancellationReceived) while (execution->unfinishedTaskCount > 0 && !cancellationReceived)
{ {
@ -1647,29 +1659,39 @@ RunDistributedExecution(DistributedExecution *execution)
ManageWorkerPool(workerPool); ManageWorkerPool(workerPool);
} }
if (execution->connectionSetChanged || execution->waitFlagsChanged) if (execution->connectionSetChanged)
{ {
FreeWaitEventSet(execution->waitEventSet); if (execution->waitEventSet != NULL)
{
FreeWaitEventSet(execution->waitEventSet);
execution->waitEventSet = NULL;
}
execution->waitEventSet = BuildWaitEventSet(execution->sessionList); if (events != NULL)
if (execution->connectionSetChanged)
{ {
/* /*
* The execution might take a while, so explicitly free at this point * The execution might take a while, so explicitly free at this point
* because we don't need anymore. * because we don't need anymore.
*/ */
pfree(events); pfree(events);
events = NULL;
/* recalculate (and allocate) since the sessions have changed */
eventSetSize = list_length(execution->sessionList) + 2;
events = palloc0(eventSetSize * sizeof(WaitEvent));
} }
execution->waitEventSet = BuildWaitEventSet(execution->sessionList);
/* recalculate (and allocate) since the sessions have changed */
eventSetSize = list_length(execution->sessionList) + 2;
events = palloc0(eventSetSize * sizeof(WaitEvent));
execution->connectionSetChanged = false; execution->connectionSetChanged = false;
execution->waitFlagsChanged = false; execution->waitFlagsChanged = false;
} }
else if (execution->waitFlagsChanged)
{
UpdateWaitEventSetFlags(execution->waitEventSet, execution->sessionList);
execution->waitFlagsChanged = false;
}
/* wait for I/O events */ /* wait for I/O events */
#if (PG_VERSION_NUM >= 100000) #if (PG_VERSION_NUM >= 100000)
@ -1716,13 +1738,22 @@ RunDistributedExecution(DistributedExecution *execution)
} }
session = (WorkerSession *) event->user_data; session = (WorkerSession *) event->user_data;
session->latestUnconsumedWaitEvents = event->events;
ConnectionStateMachine(session); ConnectionStateMachine(session);
} }
} }
pfree(events); if (events != NULL)
FreeWaitEventSet(execution->waitEventSet); {
pfree(events);
}
if (execution->waitEventSet != NULL)
{
FreeWaitEventSet(execution->waitEventSet);
execution->waitEventSet = NULL;
}
CleanUpSessions(execution); CleanUpSessions(execution);
} }
@ -1734,7 +1765,11 @@ RunDistributedExecution(DistributedExecution *execution)
*/ */
UnclaimAllSessionConnections(execution->sessionList); UnclaimAllSessionConnections(execution->sessionList);
FreeWaitEventSet(execution->waitEventSet); if (execution->waitEventSet != NULL)
{
FreeWaitEventSet(execution->waitEventSet);
execution->waitEventSet = NULL;
}
PG_RE_THROW(); PG_RE_THROW();
} }
@ -1750,7 +1785,6 @@ static void
ManageWorkerPool(WorkerPool *workerPool) ManageWorkerPool(WorkerPool *workerPool)
{ {
DistributedExecution *execution = workerPool->distributedExecution; DistributedExecution *execution = workerPool->distributedExecution;
WorkerNode *workerNode = workerPool->node;
int targetPoolSize = execution->targetPoolSize; int targetPoolSize = execution->targetPoolSize;
int initiatedConnectionCount = list_length(workerPool->sessionList); int initiatedConnectionCount = list_length(workerPool->sessionList);
int activeConnectionCount PG_USED_FOR_ASSERTS_ONLY = int activeConnectionCount PG_USED_FOR_ASSERTS_ONLY =
@ -1778,7 +1812,10 @@ ManageWorkerPool(WorkerPool *workerPool)
} }
/* we might fail the execution or warn the user about connection timeouts */ /* we might fail the execution or warn the user about connection timeouts */
CheckConnectionTimeout(workerPool); if (workerPool->checkForPoolTimeout)
{
CheckConnectionTimeout(workerPool);
}
if (failedConnectionCount >= 1) if (failedConnectionCount >= 1)
{ {
@ -1844,8 +1881,8 @@ ManageWorkerPool(WorkerPool *workerPool)
return; return;
} }
elog(DEBUG4, "opening %d new connections to %s:%d", newConnectionCount, ereport(DEBUG4, (errmsg("opening %d new connections to %s:%d", newConnectionCount,
workerNode->workerName, workerNode->workerPort); workerPool->nodeName, workerPool->nodePort)));
for (connectionIndex = 0; connectionIndex < newConnectionCount; connectionIndex++) for (connectionIndex = 0; connectionIndex < newConnectionCount; connectionIndex++)
{ {
@ -1857,8 +1894,8 @@ ManageWorkerPool(WorkerPool *workerPool)
/* open a new connection to the worker */ /* open a new connection to the worker */
connection = StartNodeUserDatabaseConnection(connectionFlags, connection = StartNodeUserDatabaseConnection(connectionFlags,
workerNode->workerName, workerPool->nodeName,
workerNode->workerPort, workerPool->nodePort,
NULL, NULL); NULL, NULL);
/* /*
@ -1952,8 +1989,8 @@ CheckConnectionTimeout(WorkerPool *workerPool)
ereport(logLevel, (errcode(ERRCODE_CONNECTION_FAILURE), ereport(logLevel, (errcode(ERRCODE_CONNECTION_FAILURE),
errmsg("could not establish any connections to the node " errmsg("could not establish any connections to the node "
"%s:%d after %u ms", workerPool->node->workerName, "%s:%d after %u ms", workerPool->nodeName,
workerPool->node->workerPort, workerPool->nodePort,
NodeConnectionTimeout))); NodeConnectionTimeout)));
} }
else else
@ -2108,8 +2145,10 @@ ConnectionStateMachine(WorkerSession *session)
ConnStatusType status = PQstatus(connection->pgConn); ConnStatusType status = PQstatus(connection->pgConn);
if (status == CONNECTION_OK) if (status == CONNECTION_OK)
{ {
elog(DEBUG4, "established connection to %s:%d for session %ld", ereport(DEBUG4, (errmsg("established connection to %s:%d for "
connection->hostname, connection->port, session->sessionId); "session %ld",
connection->hostname, connection->port,
session->sessionId)));
workerPool->activeConnectionCount++; workerPool->activeConnectionCount++;
workerPool->idleConnectionCount++; workerPool->idleConnectionCount++;
@ -2141,8 +2180,10 @@ ConnectionStateMachine(WorkerSession *session)
} }
else else
{ {
elog(DEBUG4, "established connection to %s:%d for session %ld", ereport(DEBUG4, (errmsg("established connection to %s:%d for "
connection->hostname, connection->port, session->sessionId); "session %ld",
connection->hostname, connection->port,
session->sessionId)));
workerPool->activeConnectionCount++; workerPool->activeConnectionCount++;
workerPool->idleConnectionCount++; workerPool->idleConnectionCount++;
@ -2567,11 +2608,13 @@ CheckConnectionReady(WorkerSession *session)
waitFlags = waitFlags | WL_SOCKET_WRITEABLE; waitFlags = waitFlags | WL_SOCKET_WRITEABLE;
} }
/* if reading fails, there's not much we can do */ if ((session->latestUnconsumedWaitEvents & WL_SOCKET_READABLE) != 0)
if (PQconsumeInput(connection->pgConn) == 0)
{ {
connection->connectionState = MULTI_CONNECTION_LOST; if (PQconsumeInput(connection->pgConn) == 0)
return false; {
connection->connectionState = MULTI_CONNECTION_LOST;
return false;
}
} }
if (!PQisBusy(connection->pgConn)) if (!PQisBusy(connection->pgConn))
@ -2581,6 +2624,9 @@ CheckConnectionReady(WorkerSession *session)
UpdateConnectionWaitFlags(session, waitFlags); UpdateConnectionWaitFlags(session, waitFlags);
/* don't consume input redundantly if we cycle back into CheckConnectionReady */
session->latestUnconsumedWaitEvents = 0;
return connectionReady; return connectionReady;
} }
@ -3413,6 +3459,7 @@ BuildWaitEventSet(List *sessionList)
WorkerSession *session = lfirst(sessionCell); WorkerSession *session = lfirst(sessionCell);
MultiConnection *connection = session->connection; MultiConnection *connection = session->connection;
int socket = 0; int socket = 0;
int waitEventSetIndex = 0;
if (connection->pgConn == NULL) if (connection->pgConn == NULL)
{ {
@ -3433,8 +3480,9 @@ BuildWaitEventSet(List *sessionList)
continue; continue;
} }
AddWaitEventToSet(waitEventSet, connection->waitFlags, socket, NULL, waitEventSetIndex = AddWaitEventToSet(waitEventSet, connection->waitFlags, socket,
(void *) session); NULL, (void *) session);
session->waitEventSetIndex = waitEventSetIndex;
} }
AddWaitEventToSet(waitEventSet, WL_POSTMASTER_DEATH, PGINVALID_SOCKET, NULL, NULL); AddWaitEventToSet(waitEventSet, WL_POSTMASTER_DEATH, PGINVALID_SOCKET, NULL, NULL);
@ -3444,6 +3492,46 @@ BuildWaitEventSet(List *sessionList)
} }
/*
* UpdateWaitEventSetFlags modifies the given waitEventSet with the wait flags
* for connections in the sessionList.
*/
static void
UpdateWaitEventSetFlags(WaitEventSet *waitEventSet, List *sessionList)
{
ListCell *sessionCell = NULL;
foreach(sessionCell, sessionList)
{
WorkerSession *session = lfirst(sessionCell);
MultiConnection *connection = session->connection;
int socket = 0;
int waitEventSetIndex = session->waitEventSetIndex;
if (connection->pgConn == NULL)
{
/* connection died earlier in the transaction */
continue;
}
if (connection->waitFlags == 0)
{
/* not currently waiting for this connection */
continue;
}
socket = PQsocket(connection->pgConn);
if (socket == -1)
{
/* connection was closed */
continue;
}
ModifyWaitEvent(waitEventSet, waitEventSetIndex, connection->waitFlags, NULL);
}
}
/* /*
* SetLocalForceMaxQueryParallelization simply a C interface for * SetLocalForceMaxQueryParallelization simply a C interface for
* setting the following: * setting the following:

View File

@ -128,7 +128,6 @@ typedef struct ConnectionHashEntry
{ {
ConnectionHashKey key; ConnectionHashKey key;
dlist_head *connections; dlist_head *connections;
int connectionCount;
} ConnectionHashEntry; } ConnectionHashEntry;
/* hash entry for cached connection parameters */ /* hash entry for cached connection parameters */
@ -191,7 +190,6 @@ extern MultiConnection * StartNodeUserDatabaseConnection(uint32 flags,
extern void CloseNodeConnectionsAfterTransaction(char *nodeName, int nodePort); extern void CloseNodeConnectionsAfterTransaction(char *nodeName, int nodePort);
extern void CloseConnection(MultiConnection *connection); extern void CloseConnection(MultiConnection *connection);
extern void ShutdownConnection(MultiConnection *connection); extern void ShutdownConnection(MultiConnection *connection);
extern int NodeConnectionCount(char *nodeName, int nodePort);
/* dealing with a connection */ /* dealing with a connection */
extern void FinishConnectionListEstablishment(List *multiConnectionList); extern void FinishConnectionListEstablishment(List *multiConnectionList);