From 71ad5c095bfc199eb0c40cb0963925131841d602 Mon Sep 17 00:00:00 2001 From: Marco Slot Date: Sun, 21 Jul 2019 01:35:04 +0200 Subject: [PATCH 1/7] Use ModifyWaitEvent when only wait flags changed --- .../distributed/executor/adaptive_executor.c | 76 +++++++++++++++---- 1 file changed, 62 insertions(+), 14 deletions(-) diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c index 72b7e3ae4..ffab29354 100644 --- a/src/backend/distributed/executor/adaptive_executor.c +++ b/src/backend/distributed/executor/adaptive_executor.c @@ -386,6 +386,9 @@ typedef struct WorkerSession * distributed transaction related commands such as BEGIN/COMMIT etc. */ uint64 commandsSent; + + /* index in the wait event set */ + int waitEventSetIndex; } WorkerSession; @@ -540,6 +543,7 @@ static int UsableConnectionCount(WorkerPool *workerPool); static long NextEventTimeout(DistributedExecution *execution); static long MillisecondsBetweenTimestamps(TimestampTz startTime, TimestampTz endTime); static WaitEventSet * BuildWaitEventSet(List *sessionList); +static void UpdateWaitEventSetFlags(WaitEventSet *waitEventSet, List *sessionList); static TaskPlacementExecution * PopPlacementExecution(WorkerSession *session); static TaskPlacementExecution * PopAssignedPlacementExecution(WorkerSession *session); static TaskPlacementExecution * PopUnassignedPlacementExecution(WorkerPool *workerPool); @@ -1647,29 +1651,31 @@ RunDistributedExecution(DistributedExecution *execution) ManageWorkerPool(workerPool); } - if (execution->connectionSetChanged || execution->waitFlagsChanged) + if (execution->connectionSetChanged) { FreeWaitEventSet(execution->waitEventSet); execution->waitEventSet = BuildWaitEventSet(execution->sessionList); - if (execution->connectionSetChanged) - { - /* - * The execution might take a while, so explicitly free at this point - * because we don't need anymore. - */ - pfree(events); + /* + * The execution might take a while, so explicitly free at this point + * because we don't need anymore. + */ + pfree(events); - /* recalculate (and allocate) since the sessions have changed */ - eventSetSize = list_length(execution->sessionList) + 2; + /* recalculate (and allocate) since the sessions have changed */ + eventSetSize = list_length(execution->sessionList) + 2; - events = palloc0(eventSetSize * sizeof(WaitEvent)); - } + events = palloc0(eventSetSize * sizeof(WaitEvent)); execution->connectionSetChanged = false; execution->waitFlagsChanged = false; } + else if (execution->waitFlagsChanged) + { + UpdateWaitEventSetFlags(execution->waitEventSet, execution->sessionList); + execution->waitFlagsChanged = false; + } /* wait for I/O events */ #if (PG_VERSION_NUM >= 100000) @@ -3413,6 +3419,7 @@ BuildWaitEventSet(List *sessionList) WorkerSession *session = lfirst(sessionCell); MultiConnection *connection = session->connection; int socket = 0; + int waitEventSetIndex = 0; if (connection->pgConn == NULL) { @@ -3433,8 +3440,9 @@ BuildWaitEventSet(List *sessionList) continue; } - AddWaitEventToSet(waitEventSet, connection->waitFlags, socket, NULL, - (void *) session); + waitEventSetIndex = AddWaitEventToSet(waitEventSet, connection->waitFlags, socket, + NULL, (void *) session); + session->waitEventSetIndex = waitEventSetIndex; } AddWaitEventToSet(waitEventSet, WL_POSTMASTER_DEATH, PGINVALID_SOCKET, NULL, NULL); @@ -3444,6 +3452,46 @@ BuildWaitEventSet(List *sessionList) } +/* + * UpdateWaitEventSetFlags modifies the given waitEventSet with the wait flags + * for connections in the sessionList. + */ +static void +UpdateWaitEventSetFlags(WaitEventSet *waitEventSet, List *sessionList) +{ + ListCell *sessionCell = NULL; + + foreach(sessionCell, sessionList) + { + WorkerSession *session = lfirst(sessionCell); + MultiConnection *connection = session->connection; + int socket = 0; + int waitEventSetIndex = session->waitEventSetIndex; + + if (connection->pgConn == NULL) + { + /* connection died earlier in the transaction */ + continue; + } + + if (connection->waitFlags == 0) + { + /* not currently waiting for this connection */ + continue; + } + + socket = PQsocket(connection->pgConn); + if (socket == -1) + { + /* connection was closed */ + continue; + } + + ModifyWaitEvent(waitEventSet, waitEventSetIndex, connection->waitFlags, NULL); + } +} + + /* * SetLocalForceMaxQueryParallelization simply a C interface for * setting the following: From 32e7a80960616b987166fc0e0b37e4d2520656af Mon Sep 17 00:00:00 2001 From: Marco Slot Date: Sun, 21 Jul 2019 01:52:44 +0200 Subject: [PATCH 2/7] Avoid unnecessary calls to PQconsumeInput --- .../distributed/executor/adaptive_executor.c | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c index ffab29354..093ad22f5 100644 --- a/src/backend/distributed/executor/adaptive_executor.c +++ b/src/backend/distributed/executor/adaptive_executor.c @@ -389,6 +389,9 @@ typedef struct WorkerSession /* index in the wait event set */ int waitEventSetIndex; + + /* events reported by the latest call to WaitEventSetWait */ + int latestUnconsumedWaitEvents; } WorkerSession; @@ -1722,6 +1725,7 @@ RunDistributedExecution(DistributedExecution *execution) } session = (WorkerSession *) event->user_data; + session->latestUnconsumedWaitEvents = event->events; ConnectionStateMachine(session); } @@ -2573,11 +2577,13 @@ CheckConnectionReady(WorkerSession *session) waitFlags = waitFlags | WL_SOCKET_WRITEABLE; } - /* if reading fails, there's not much we can do */ - if (PQconsumeInput(connection->pgConn) == 0) + if ((session->latestUnconsumedWaitEvents & WL_SOCKET_READABLE) != 0) { - connection->connectionState = MULTI_CONNECTION_LOST; - return false; + if (PQconsumeInput(connection->pgConn) == 0) + { + connection->connectionState = MULTI_CONNECTION_LOST; + return false; + } } if (!PQisBusy(connection->pgConn)) @@ -2587,6 +2593,9 @@ CheckConnectionReady(WorkerSession *session) UpdateConnectionWaitFlags(session, waitFlags); + /* don't consume input redundantly if we cycle back into CheckConnectionReady */ + session->latestUnconsumedWaitEvents = 0; + return connectionReady; } From 4c0c33365e61a451402c27beff177552fcbda4f0 Mon Sep 17 00:00:00 2001 From: Marco Slot Date: Sun, 21 Jul 2019 01:57:49 +0200 Subject: [PATCH 3/7] Avoid creating a redundant event set at the start --- .../distributed/executor/adaptive_executor.c | 44 ++++++++++++++----- 1 file changed, 32 insertions(+), 12 deletions(-) diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c index 093ad22f5..f7b9eabf7 100644 --- a/src/backend/distributed/executor/adaptive_executor.c +++ b/src/backend/distributed/executor/adaptive_executor.c @@ -1638,8 +1638,8 @@ RunDistributedExecution(DistributedExecution *execution) /* additional 2 is for postmaster and latch */ int eventSetSize = list_length(execution->sessionList) + 2; - execution->waitEventSet = BuildWaitEventSet(execution->sessionList); - events = palloc0(eventSetSize * sizeof(WaitEvent)); + /* always (re)build the wait event set the first time */ + execution->connectionSetChanged = true; while (execution->unfinishedTaskCount > 0 && !cancellationReceived) { @@ -1656,16 +1656,24 @@ RunDistributedExecution(DistributedExecution *execution) if (execution->connectionSetChanged) { - FreeWaitEventSet(execution->waitEventSet); + if (execution->waitEventSet != NULL) + { + FreeWaitEventSet(execution->waitEventSet); + execution->waitEventSet = NULL; + } + + if (events != NULL) + { + /* + * The execution might take a while, so explicitly free at this point + * because we don't need anymore. + */ + pfree(events); + events = NULL; + } execution->waitEventSet = BuildWaitEventSet(execution->sessionList); - /* - * The execution might take a while, so explicitly free at this point - * because we don't need anymore. - */ - pfree(events); - /* recalculate (and allocate) since the sessions have changed */ eventSetSize = list_length(execution->sessionList) + 2; @@ -1731,8 +1739,16 @@ RunDistributedExecution(DistributedExecution *execution) } } - pfree(events); - FreeWaitEventSet(execution->waitEventSet); + if (events != NULL) + { + pfree(events); + } + + if (execution->waitEventSet != NULL) + { + FreeWaitEventSet(execution->waitEventSet); + execution->waitEventSet = NULL; + } CleanUpSessions(execution); } @@ -1744,7 +1760,11 @@ RunDistributedExecution(DistributedExecution *execution) */ UnclaimAllSessionConnections(execution->sessionList); - FreeWaitEventSet(execution->waitEventSet); + if (execution->waitEventSet != NULL) + { + FreeWaitEventSet(execution->waitEventSet); + execution->waitEventSet = NULL; + } PG_RE_THROW(); } From 4444d92dbceabbab046b13711f123e053b71dee2 Mon Sep 17 00:00:00 2001 From: Marco Slot Date: Sun, 21 Jul 2019 14:50:43 +0200 Subject: [PATCH 4/7] Set initial pool size to cached connection count --- .../connection/connection_management.c | 29 ------------------- .../distributed/executor/adaptive_executor.c | 3 +- .../distributed/connection_management.h | 2 -- 3 files changed, 1 insertion(+), 33 deletions(-) diff --git a/src/backend/distributed/connection/connection_management.c b/src/backend/distributed/connection/connection_management.c index 0e9ddd7c1..df9717aa2 100644 --- a/src/backend/distributed/connection/connection_management.c +++ b/src/backend/distributed/connection/connection_management.c @@ -333,7 +333,6 @@ StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port, connection = StartConnectionEstablishment(&key); dlist_push_tail(entry->connections, &connection->connectionNode); - entry->connectionCount++; ResetShardPlacementAssociation(connection); @@ -1065,8 +1064,6 @@ AfterXactHostConnectionHandling(ConnectionHashEntry *entry, bool isCommit) cachedConnectionCount++; } } - - entry->connectionCount = cachedConnectionCount; } @@ -1172,29 +1169,3 @@ TrimLogLevel(const char *message) return chompedMessage + n; } - - -/* - * NodeConnectionCount gets the number of connections to the given node - * for the current username and database. - */ -int -NodeConnectionCount(char *hostname, int port) -{ - ConnectionHashKey key; - ConnectionHashEntry *entry = NULL; - bool found = false; - - strlcpy(key.hostname, hostname, MAX_NODE_LENGTH); - key.port = port; - strlcpy(key.user, CurrentUserName(), NAMEDATALEN); - strlcpy(key.database, CurrentDatabaseName(), NAMEDATALEN); - - entry = (ConnectionHashEntry *) hash_search(ConnectionHash, &key, HASH_FIND, &found); - if (!found) - { - return 0; - } - - return entry->connectionCount; -} diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c index f7b9eabf7..815de8cc6 100644 --- a/src/backend/distributed/executor/adaptive_executor.c +++ b/src/backend/distributed/executor/adaptive_executor.c @@ -1472,8 +1472,7 @@ FindOrCreateWorkerPool(DistributedExecution *execution, WorkerNode *workerNode) workerPool->distributedExecution = execution; /* "open" connections aggressively when there are cached connections */ - nodeConnectionCount = NodeConnectionCount(workerNode->workerName, - workerNode->workerPort); + nodeConnectionCount = MaxCachedConnectionsPerWorker; workerPool->maxNewConnectionsPerCycle = Max(1, nodeConnectionCount); dlist_init(&workerPool->pendingTaskQueue); diff --git a/src/include/distributed/connection_management.h b/src/include/distributed/connection_management.h index d7396067e..27d7f8b32 100644 --- a/src/include/distributed/connection_management.h +++ b/src/include/distributed/connection_management.h @@ -128,7 +128,6 @@ typedef struct ConnectionHashEntry { ConnectionHashKey key; dlist_head *connections; - int connectionCount; } ConnectionHashEntry; /* hash entry for cached connection parameters */ @@ -191,7 +190,6 @@ extern MultiConnection * StartNodeUserDatabaseConnection(uint32 flags, extern void CloseNodeConnectionsAfterTransaction(char *nodeName, int nodePort); extern void CloseConnection(MultiConnection *connection); extern void ShutdownConnection(MultiConnection *connection); -extern int NodeConnectionCount(char *nodeName, int nodePort); /* dealing with a connection */ extern void FinishConnectionListEstablishment(List *multiConnectionList); From a3811b1e5517b8c0c5ae3a25f8753c4d9768331b Mon Sep 17 00:00:00 2001 From: Marco Slot Date: Sun, 21 Jul 2019 16:44:47 +0200 Subject: [PATCH 5/7] Avoid FindWorkerNode calls in adaptive executor --- .../distributed/executor/adaptive_executor.c | 31 ++++++++++--------- 1 file changed, 17 insertions(+), 14 deletions(-) diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c index 815de8cc6..dfbcae051 100644 --- a/src/backend/distributed/executor/adaptive_executor.c +++ b/src/backend/distributed/executor/adaptive_executor.c @@ -290,7 +290,8 @@ typedef struct WorkerPool DistributedExecution *distributedExecution; /* worker node on which we have a pool of sessions */ - WorkerNode *node; + char *nodeName; + int nodePort; /* all sessions on the worker that are part of the current execution */ List *sessionList; @@ -537,7 +538,7 @@ static void UnclaimAllSessionConnections(List *sessionList); static bool UseConnectionPerPlacement(void); static PlacementExecutionOrder ExecutionOrderForTask(RowModifyLevel modLevel, Task *task); static WorkerPool * FindOrCreateWorkerPool(DistributedExecution *execution, - WorkerNode *workerNode); + char *nodeName, int nodePort); static WorkerSession * FindOrCreateWorkerSession(WorkerPool *workerPool, MultiConnection *connection); static void ManageWorkerPool(WorkerPool *workerPool); @@ -1258,9 +1259,10 @@ AssignTasksToConnections(DistributedExecution *execution) MultiConnection *connection = NULL; int connectionFlags = 0; TaskPlacementExecution *placementExecution = NULL; - WorkerNode *node = FindWorkerNode(taskPlacement->nodeName, - taskPlacement->nodePort); - WorkerPool *workerPool = FindOrCreateWorkerPool(execution, node); + char *nodeName = taskPlacement->nodeName; + int nodePort = taskPlacement->nodePort; + WorkerPool *workerPool = FindOrCreateWorkerPool(execution, nodeName, + nodePort); /* * Execution of a command on a shard placement, which may not always @@ -1450,7 +1452,7 @@ ExecutionOrderForTask(RowModifyLevel modLevel, Task *task) * FindOrCreateWorkerPool gets the pool of connections for a particular worker. */ static WorkerPool * -FindOrCreateWorkerPool(DistributedExecution *execution, WorkerNode *workerNode) +FindOrCreateWorkerPool(DistributedExecution *execution, char *nodeName, int nodePort) { WorkerPool *workerPool = NULL; ListCell *workerCell = NULL; @@ -1460,14 +1462,16 @@ FindOrCreateWorkerPool(DistributedExecution *execution, WorkerNode *workerNode) { workerPool = lfirst(workerCell); - if (WorkerNodeCompare(workerPool->node, workerNode, 0) == 0) + if (strncmp(nodeName, workerPool->nodeName, WORKER_LENGTH) == 0 && + nodePort == workerPool->nodePort) { return workerPool; } } workerPool = (WorkerPool *) palloc0(sizeof(WorkerPool)); - workerPool->node = workerNode; + workerPool->nodeName = pstrdup(nodeName); + workerPool->nodePort = nodePort; workerPool->poolStartTime = 0; workerPool->distributedExecution = execution; @@ -1779,7 +1783,6 @@ static void ManageWorkerPool(WorkerPool *workerPool) { DistributedExecution *execution = workerPool->distributedExecution; - WorkerNode *workerNode = workerPool->node; int targetPoolSize = execution->targetPoolSize; int initiatedConnectionCount = list_length(workerPool->sessionList); int activeConnectionCount PG_USED_FOR_ASSERTS_ONLY = @@ -1874,7 +1877,7 @@ ManageWorkerPool(WorkerPool *workerPool) } elog(DEBUG4, "opening %d new connections to %s:%d", newConnectionCount, - workerNode->workerName, workerNode->workerPort); + workerPool->nodeName, workerPool->nodePort); for (connectionIndex = 0; connectionIndex < newConnectionCount; connectionIndex++) { @@ -1886,8 +1889,8 @@ ManageWorkerPool(WorkerPool *workerPool) /* open a new connection to the worker */ connection = StartNodeUserDatabaseConnection(connectionFlags, - workerNode->workerName, - workerNode->workerPort, + workerPool->nodeName, + workerPool->nodePort, NULL, NULL); /* @@ -1981,8 +1984,8 @@ CheckConnectionTimeout(WorkerPool *workerPool) ereport(logLevel, (errcode(ERRCODE_CONNECTION_FAILURE), errmsg("could not establish any connections to the node " - "%s:%d after %u ms", workerPool->node->workerName, - workerPool->node->workerPort, + "%s:%d after %u ms", workerPool->nodeName, + workerPool->nodePort, NodeConnectionTimeout))); } else From bd111366b044a3028d13de16db606b7b42a866c3 Mon Sep 17 00:00:00 2001 From: Marco Slot Date: Mon, 22 Jul 2019 17:12:58 +0200 Subject: [PATCH 6/7] Skip CheckConnectionTimeout when checkForPoolTimeout is false --- src/backend/distributed/executor/adaptive_executor.c | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c index dfbcae051..8619f8d86 100644 --- a/src/backend/distributed/executor/adaptive_executor.c +++ b/src/backend/distributed/executor/adaptive_executor.c @@ -1810,7 +1810,10 @@ ManageWorkerPool(WorkerPool *workerPool) } /* we might fail the execution or warn the user about connection timeouts */ - CheckConnectionTimeout(workerPool); + if (workerPool->checkForPoolTimeout) + { + CheckConnectionTimeout(workerPool); + } if (failedConnectionCount >= 1) { From e2bc09838e9e08f8beac77fd63acc474aea510c1 Mon Sep 17 00:00:00 2001 From: Marco Slot Date: Mon, 22 Jul 2019 18:45:12 +0200 Subject: [PATCH 7/7] Use ereport instead of elog in adaptive executor --- .../distributed/executor/adaptive_executor.c | 36 +++++++++++-------- 1 file changed, 21 insertions(+), 15 deletions(-) diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c index 8619f8d86..49e37596c 100644 --- a/src/backend/distributed/executor/adaptive_executor.c +++ b/src/backend/distributed/executor/adaptive_executor.c @@ -1127,8 +1127,8 @@ CleanUpSessions(DistributedExecution *execution) MultiConnection *connection = session->connection; - elog(DEBUG4, "Total number of commands sent over the session %ld: %ld", - session->sessionId, session->commandsSent); + ereport(DEBUG4, (errmsg("Total number of commands sent over the session %ld: %ld", + session->sessionId, session->commandsSent))); UnclaimConnection(connection); @@ -1176,8 +1176,8 @@ CleanUpSessions(DistributedExecution *execution) * the connection and everything is cleared up. Otherwise, we'd have been * on MULTI_CONNECTION_FAILED state. */ - elog(WARNING, "unexpected transaction state at the end of execution: %d", - transactionState); + ereport(WARNING, (errmsg("unexpected transaction state at the end of " + "execution: %d", transactionState))); } /* get ready for the next executions if we need use the same connection */ @@ -1185,8 +1185,8 @@ CleanUpSessions(DistributedExecution *execution) } else { - elog(WARNING, "unexpected connection state at the end of execution: %d", - connection->connectionState); + ereport(WARNING, (errmsg("unexpected connection state at the end of " + "execution: %d", connection->connectionState))); } } } @@ -1308,8 +1308,9 @@ AssignTasksToConnections(DistributedExecution *execution) WorkerSession *session = FindOrCreateWorkerSession(workerPool, connection); - elog(DEBUG4, "Session %ld (%s:%d) has an assigned task", - session->sessionId, connection->hostname, connection->port); + ereport(DEBUG4, (errmsg("Session %ld (%s:%d) has an assigned task", + session->sessionId, connection->hostname, + connection->port))); placementExecution->assignedSession = session; @@ -1442,7 +1443,8 @@ ExecutionOrderForTask(RowModifyLevel modLevel, Task *task) case MERGE_FETCH_TASK: default: { - elog(ERROR, "unsupported task type %d in adaptive executor", task->taskType); + ereport(ERROR, (errmsg("unsupported task type %d in adaptive executor", + task->taskType))); } } } @@ -1879,8 +1881,8 @@ ManageWorkerPool(WorkerPool *workerPool) return; } - elog(DEBUG4, "opening %d new connections to %s:%d", newConnectionCount, - workerPool->nodeName, workerPool->nodePort); + ereport(DEBUG4, (errmsg("opening %d new connections to %s:%d", newConnectionCount, + workerPool->nodeName, workerPool->nodePort))); for (connectionIndex = 0; connectionIndex < newConnectionCount; connectionIndex++) { @@ -2143,8 +2145,10 @@ ConnectionStateMachine(WorkerSession *session) ConnStatusType status = PQstatus(connection->pgConn); if (status == CONNECTION_OK) { - elog(DEBUG4, "established connection to %s:%d for session %ld", - connection->hostname, connection->port, session->sessionId); + ereport(DEBUG4, (errmsg("established connection to %s:%d for " + "session %ld", + connection->hostname, connection->port, + session->sessionId))); workerPool->activeConnectionCount++; workerPool->idleConnectionCount++; @@ -2176,8 +2180,10 @@ ConnectionStateMachine(WorkerSession *session) } else { - elog(DEBUG4, "established connection to %s:%d for session %ld", - connection->hostname, connection->port, session->sessionId); + ereport(DEBUG4, (errmsg("established connection to %s:%d for " + "session %ld", + connection->hostname, connection->port, + session->sessionId))); workerPool->activeConnectionCount++; workerPool->idleConnectionCount++;