diff --git a/src/backend/distributed/connection/connection_management.c b/src/backend/distributed/connection/connection_management.c index 0e9ddd7c1..df9717aa2 100644 --- a/src/backend/distributed/connection/connection_management.c +++ b/src/backend/distributed/connection/connection_management.c @@ -333,7 +333,6 @@ StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port, connection = StartConnectionEstablishment(&key); dlist_push_tail(entry->connections, &connection->connectionNode); - entry->connectionCount++; ResetShardPlacementAssociation(connection); @@ -1065,8 +1064,6 @@ AfterXactHostConnectionHandling(ConnectionHashEntry *entry, bool isCommit) cachedConnectionCount++; } } - - entry->connectionCount = cachedConnectionCount; } @@ -1172,29 +1169,3 @@ TrimLogLevel(const char *message) return chompedMessage + n; } - - -/* - * NodeConnectionCount gets the number of connections to the given node - * for the current username and database. - */ -int -NodeConnectionCount(char *hostname, int port) -{ - ConnectionHashKey key; - ConnectionHashEntry *entry = NULL; - bool found = false; - - strlcpy(key.hostname, hostname, MAX_NODE_LENGTH); - key.port = port; - strlcpy(key.user, CurrentUserName(), NAMEDATALEN); - strlcpy(key.database, CurrentDatabaseName(), NAMEDATALEN); - - entry = (ConnectionHashEntry *) hash_search(ConnectionHash, &key, HASH_FIND, &found); - if (!found) - { - return 0; - } - - return entry->connectionCount; -} diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c index 72b7e3ae4..49e37596c 100644 --- a/src/backend/distributed/executor/adaptive_executor.c +++ b/src/backend/distributed/executor/adaptive_executor.c @@ -290,7 +290,8 @@ typedef struct WorkerPool DistributedExecution *distributedExecution; /* worker node on which we have a pool of sessions */ - WorkerNode *node; + char *nodeName; + int nodePort; /* all sessions on the worker that are part of the current execution */ List *sessionList; @@ -386,6 +387,12 @@ typedef struct WorkerSession * distributed transaction related commands such as BEGIN/COMMIT etc. */ uint64 commandsSent; + + /* index in the wait event set */ + int waitEventSetIndex; + + /* events reported by the latest call to WaitEventSetWait */ + int latestUnconsumedWaitEvents; } WorkerSession; @@ -531,7 +538,7 @@ static void UnclaimAllSessionConnections(List *sessionList); static bool UseConnectionPerPlacement(void); static PlacementExecutionOrder ExecutionOrderForTask(RowModifyLevel modLevel, Task *task); static WorkerPool * FindOrCreateWorkerPool(DistributedExecution *execution, - WorkerNode *workerNode); + char *nodeName, int nodePort); static WorkerSession * FindOrCreateWorkerSession(WorkerPool *workerPool, MultiConnection *connection); static void ManageWorkerPool(WorkerPool *workerPool); @@ -540,6 +547,7 @@ static int UsableConnectionCount(WorkerPool *workerPool); static long NextEventTimeout(DistributedExecution *execution); static long MillisecondsBetweenTimestamps(TimestampTz startTime, TimestampTz endTime); static WaitEventSet * BuildWaitEventSet(List *sessionList); +static void UpdateWaitEventSetFlags(WaitEventSet *waitEventSet, List *sessionList); static TaskPlacementExecution * PopPlacementExecution(WorkerSession *session); static TaskPlacementExecution * PopAssignedPlacementExecution(WorkerSession *session); static TaskPlacementExecution * PopUnassignedPlacementExecution(WorkerPool *workerPool); @@ -1119,8 +1127,8 @@ CleanUpSessions(DistributedExecution *execution) MultiConnection *connection = session->connection; - elog(DEBUG4, "Total number of commands sent over the session %ld: %ld", - session->sessionId, session->commandsSent); + ereport(DEBUG4, (errmsg("Total number of commands sent over the session %ld: %ld", + session->sessionId, session->commandsSent))); UnclaimConnection(connection); @@ -1168,8 +1176,8 @@ CleanUpSessions(DistributedExecution *execution) * the connection and everything is cleared up. Otherwise, we'd have been * on MULTI_CONNECTION_FAILED state. */ - elog(WARNING, "unexpected transaction state at the end of execution: %d", - transactionState); + ereport(WARNING, (errmsg("unexpected transaction state at the end of " + "execution: %d", transactionState))); } /* get ready for the next executions if we need use the same connection */ @@ -1177,8 +1185,8 @@ CleanUpSessions(DistributedExecution *execution) } else { - elog(WARNING, "unexpected connection state at the end of execution: %d", - connection->connectionState); + ereport(WARNING, (errmsg("unexpected connection state at the end of " + "execution: %d", connection->connectionState))); } } } @@ -1251,9 +1259,10 @@ AssignTasksToConnections(DistributedExecution *execution) MultiConnection *connection = NULL; int connectionFlags = 0; TaskPlacementExecution *placementExecution = NULL; - WorkerNode *node = FindWorkerNode(taskPlacement->nodeName, - taskPlacement->nodePort); - WorkerPool *workerPool = FindOrCreateWorkerPool(execution, node); + char *nodeName = taskPlacement->nodeName; + int nodePort = taskPlacement->nodePort; + WorkerPool *workerPool = FindOrCreateWorkerPool(execution, nodeName, + nodePort); /* * Execution of a command on a shard placement, which may not always @@ -1299,8 +1308,9 @@ AssignTasksToConnections(DistributedExecution *execution) WorkerSession *session = FindOrCreateWorkerSession(workerPool, connection); - elog(DEBUG4, "Session %ld (%s:%d) has an assigned task", - session->sessionId, connection->hostname, connection->port); + ereport(DEBUG4, (errmsg("Session %ld (%s:%d) has an assigned task", + session->sessionId, connection->hostname, + connection->port))); placementExecution->assignedSession = session; @@ -1433,7 +1443,8 @@ ExecutionOrderForTask(RowModifyLevel modLevel, Task *task) case MERGE_FETCH_TASK: default: { - elog(ERROR, "unsupported task type %d in adaptive executor", task->taskType); + ereport(ERROR, (errmsg("unsupported task type %d in adaptive executor", + task->taskType))); } } } @@ -1443,7 +1454,7 @@ ExecutionOrderForTask(RowModifyLevel modLevel, Task *task) * FindOrCreateWorkerPool gets the pool of connections for a particular worker. */ static WorkerPool * -FindOrCreateWorkerPool(DistributedExecution *execution, WorkerNode *workerNode) +FindOrCreateWorkerPool(DistributedExecution *execution, char *nodeName, int nodePort) { WorkerPool *workerPool = NULL; ListCell *workerCell = NULL; @@ -1453,20 +1464,21 @@ FindOrCreateWorkerPool(DistributedExecution *execution, WorkerNode *workerNode) { workerPool = lfirst(workerCell); - if (WorkerNodeCompare(workerPool->node, workerNode, 0) == 0) + if (strncmp(nodeName, workerPool->nodeName, WORKER_LENGTH) == 0 && + nodePort == workerPool->nodePort) { return workerPool; } } workerPool = (WorkerPool *) palloc0(sizeof(WorkerPool)); - workerPool->node = workerNode; + workerPool->nodeName = pstrdup(nodeName); + workerPool->nodePort = nodePort; workerPool->poolStartTime = 0; workerPool->distributedExecution = execution; /* "open" connections aggressively when there are cached connections */ - nodeConnectionCount = NodeConnectionCount(workerNode->workerName, - workerNode->workerPort); + nodeConnectionCount = MaxCachedConnectionsPerWorker; workerPool->maxNewConnectionsPerCycle = Max(1, nodeConnectionCount); dlist_init(&workerPool->pendingTaskQueue); @@ -1631,8 +1643,8 @@ RunDistributedExecution(DistributedExecution *execution) /* additional 2 is for postmaster and latch */ int eventSetSize = list_length(execution->sessionList) + 2; - execution->waitEventSet = BuildWaitEventSet(execution->sessionList); - events = palloc0(eventSetSize * sizeof(WaitEvent)); + /* always (re)build the wait event set the first time */ + execution->connectionSetChanged = true; while (execution->unfinishedTaskCount > 0 && !cancellationReceived) { @@ -1647,29 +1659,39 @@ RunDistributedExecution(DistributedExecution *execution) ManageWorkerPool(workerPool); } - if (execution->connectionSetChanged || execution->waitFlagsChanged) + if (execution->connectionSetChanged) { - FreeWaitEventSet(execution->waitEventSet); + if (execution->waitEventSet != NULL) + { + FreeWaitEventSet(execution->waitEventSet); + execution->waitEventSet = NULL; + } - execution->waitEventSet = BuildWaitEventSet(execution->sessionList); - - if (execution->connectionSetChanged) + if (events != NULL) { /* * The execution might take a while, so explicitly free at this point * because we don't need anymore. */ pfree(events); - - /* recalculate (and allocate) since the sessions have changed */ - eventSetSize = list_length(execution->sessionList) + 2; - - events = palloc0(eventSetSize * sizeof(WaitEvent)); + events = NULL; } + execution->waitEventSet = BuildWaitEventSet(execution->sessionList); + + /* recalculate (and allocate) since the sessions have changed */ + eventSetSize = list_length(execution->sessionList) + 2; + + events = palloc0(eventSetSize * sizeof(WaitEvent)); + execution->connectionSetChanged = false; execution->waitFlagsChanged = false; } + else if (execution->waitFlagsChanged) + { + UpdateWaitEventSetFlags(execution->waitEventSet, execution->sessionList); + execution->waitFlagsChanged = false; + } /* wait for I/O events */ #if (PG_VERSION_NUM >= 100000) @@ -1716,13 +1738,22 @@ RunDistributedExecution(DistributedExecution *execution) } session = (WorkerSession *) event->user_data; + session->latestUnconsumedWaitEvents = event->events; ConnectionStateMachine(session); } } - pfree(events); - FreeWaitEventSet(execution->waitEventSet); + if (events != NULL) + { + pfree(events); + } + + if (execution->waitEventSet != NULL) + { + FreeWaitEventSet(execution->waitEventSet); + execution->waitEventSet = NULL; + } CleanUpSessions(execution); } @@ -1734,7 +1765,11 @@ RunDistributedExecution(DistributedExecution *execution) */ UnclaimAllSessionConnections(execution->sessionList); - FreeWaitEventSet(execution->waitEventSet); + if (execution->waitEventSet != NULL) + { + FreeWaitEventSet(execution->waitEventSet); + execution->waitEventSet = NULL; + } PG_RE_THROW(); } @@ -1750,7 +1785,6 @@ static void ManageWorkerPool(WorkerPool *workerPool) { DistributedExecution *execution = workerPool->distributedExecution; - WorkerNode *workerNode = workerPool->node; int targetPoolSize = execution->targetPoolSize; int initiatedConnectionCount = list_length(workerPool->sessionList); int activeConnectionCount PG_USED_FOR_ASSERTS_ONLY = @@ -1778,7 +1812,10 @@ ManageWorkerPool(WorkerPool *workerPool) } /* we might fail the execution or warn the user about connection timeouts */ - CheckConnectionTimeout(workerPool); + if (workerPool->checkForPoolTimeout) + { + CheckConnectionTimeout(workerPool); + } if (failedConnectionCount >= 1) { @@ -1844,8 +1881,8 @@ ManageWorkerPool(WorkerPool *workerPool) return; } - elog(DEBUG4, "opening %d new connections to %s:%d", newConnectionCount, - workerNode->workerName, workerNode->workerPort); + ereport(DEBUG4, (errmsg("opening %d new connections to %s:%d", newConnectionCount, + workerPool->nodeName, workerPool->nodePort))); for (connectionIndex = 0; connectionIndex < newConnectionCount; connectionIndex++) { @@ -1857,8 +1894,8 @@ ManageWorkerPool(WorkerPool *workerPool) /* open a new connection to the worker */ connection = StartNodeUserDatabaseConnection(connectionFlags, - workerNode->workerName, - workerNode->workerPort, + workerPool->nodeName, + workerPool->nodePort, NULL, NULL); /* @@ -1952,8 +1989,8 @@ CheckConnectionTimeout(WorkerPool *workerPool) ereport(logLevel, (errcode(ERRCODE_CONNECTION_FAILURE), errmsg("could not establish any connections to the node " - "%s:%d after %u ms", workerPool->node->workerName, - workerPool->node->workerPort, + "%s:%d after %u ms", workerPool->nodeName, + workerPool->nodePort, NodeConnectionTimeout))); } else @@ -2108,8 +2145,10 @@ ConnectionStateMachine(WorkerSession *session) ConnStatusType status = PQstatus(connection->pgConn); if (status == CONNECTION_OK) { - elog(DEBUG4, "established connection to %s:%d for session %ld", - connection->hostname, connection->port, session->sessionId); + ereport(DEBUG4, (errmsg("established connection to %s:%d for " + "session %ld", + connection->hostname, connection->port, + session->sessionId))); workerPool->activeConnectionCount++; workerPool->idleConnectionCount++; @@ -2141,8 +2180,10 @@ ConnectionStateMachine(WorkerSession *session) } else { - elog(DEBUG4, "established connection to %s:%d for session %ld", - connection->hostname, connection->port, session->sessionId); + ereport(DEBUG4, (errmsg("established connection to %s:%d for " + "session %ld", + connection->hostname, connection->port, + session->sessionId))); workerPool->activeConnectionCount++; workerPool->idleConnectionCount++; @@ -2567,11 +2608,13 @@ CheckConnectionReady(WorkerSession *session) waitFlags = waitFlags | WL_SOCKET_WRITEABLE; } - /* if reading fails, there's not much we can do */ - if (PQconsumeInput(connection->pgConn) == 0) + if ((session->latestUnconsumedWaitEvents & WL_SOCKET_READABLE) != 0) { - connection->connectionState = MULTI_CONNECTION_LOST; - return false; + if (PQconsumeInput(connection->pgConn) == 0) + { + connection->connectionState = MULTI_CONNECTION_LOST; + return false; + } } if (!PQisBusy(connection->pgConn)) @@ -2581,6 +2624,9 @@ CheckConnectionReady(WorkerSession *session) UpdateConnectionWaitFlags(session, waitFlags); + /* don't consume input redundantly if we cycle back into CheckConnectionReady */ + session->latestUnconsumedWaitEvents = 0; + return connectionReady; } @@ -3413,6 +3459,7 @@ BuildWaitEventSet(List *sessionList) WorkerSession *session = lfirst(sessionCell); MultiConnection *connection = session->connection; int socket = 0; + int waitEventSetIndex = 0; if (connection->pgConn == NULL) { @@ -3433,8 +3480,9 @@ BuildWaitEventSet(List *sessionList) continue; } - AddWaitEventToSet(waitEventSet, connection->waitFlags, socket, NULL, - (void *) session); + waitEventSetIndex = AddWaitEventToSet(waitEventSet, connection->waitFlags, socket, + NULL, (void *) session); + session->waitEventSetIndex = waitEventSetIndex; } AddWaitEventToSet(waitEventSet, WL_POSTMASTER_DEATH, PGINVALID_SOCKET, NULL, NULL); @@ -3444,6 +3492,46 @@ BuildWaitEventSet(List *sessionList) } +/* + * UpdateWaitEventSetFlags modifies the given waitEventSet with the wait flags + * for connections in the sessionList. + */ +static void +UpdateWaitEventSetFlags(WaitEventSet *waitEventSet, List *sessionList) +{ + ListCell *sessionCell = NULL; + + foreach(sessionCell, sessionList) + { + WorkerSession *session = lfirst(sessionCell); + MultiConnection *connection = session->connection; + int socket = 0; + int waitEventSetIndex = session->waitEventSetIndex; + + if (connection->pgConn == NULL) + { + /* connection died earlier in the transaction */ + continue; + } + + if (connection->waitFlags == 0) + { + /* not currently waiting for this connection */ + continue; + } + + socket = PQsocket(connection->pgConn); + if (socket == -1) + { + /* connection was closed */ + continue; + } + + ModifyWaitEvent(waitEventSet, waitEventSetIndex, connection->waitFlags, NULL); + } +} + + /* * SetLocalForceMaxQueryParallelization simply a C interface for * setting the following: diff --git a/src/include/distributed/connection_management.h b/src/include/distributed/connection_management.h index d7396067e..27d7f8b32 100644 --- a/src/include/distributed/connection_management.h +++ b/src/include/distributed/connection_management.h @@ -128,7 +128,6 @@ typedef struct ConnectionHashEntry { ConnectionHashKey key; dlist_head *connections; - int connectionCount; } ConnectionHashEntry; /* hash entry for cached connection parameters */ @@ -191,7 +190,6 @@ extern MultiConnection * StartNodeUserDatabaseConnection(uint32 flags, extern void CloseNodeConnectionsAfterTransaction(char *nodeName, int nodePort); extern void CloseConnection(MultiConnection *connection); extern void ShutdownConnection(MultiConnection *connection); -extern int NodeConnectionCount(char *nodeName, int nodePort); /* dealing with a connection */ extern void FinishConnectionListEstablishment(List *multiConnectionList);