From 2c040d2c8f5f06437b92896fb90cb2a030b0a571 Mon Sep 17 00:00:00 2001 From: SaitTalhaNisanci Date: Thu, 12 Dec 2019 17:55:38 +0300 Subject: [PATCH] use a function for duplicate code in connection state machine (#3209) --- .../distributed/executor/adaptive_executor.c | 38 +++++++++++-------- 1 file changed, 22 insertions(+), 16 deletions(-) diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c index fe1bdb9aa..edbef8821 100644 --- a/src/backend/distributed/executor/adaptive_executor.c +++ b/src/backend/distributed/executor/adaptive_executor.c @@ -581,6 +581,7 @@ static TaskPlacementExecution * PopUnassignedPlacementExecution(WorkerPool *work static bool StartPlacementExecutionOnSession(TaskPlacementExecution *placementExecution, WorkerSession *session); static void ConnectionStateMachine(WorkerSession *session); +static void HandleMultiConnectionSuccess(WorkerSession *session); static void Activate2PCIfModifyingTransactionExpandsToNewNode(WorkerSession *session); static bool TransactionModifiedDistributedTable(DistributedExecution *execution); static void TransactionStateMachine(WorkerSession *session); @@ -2334,14 +2335,7 @@ ConnectionStateMachine(WorkerSession *session) ConnStatusType status = PQstatus(connection->pgConn); if (status == CONNECTION_OK) { - ereport(DEBUG4, (errmsg("established connection to %s:%d for " - "session %ld", - connection->hostname, connection->port, - session->sessionId))); - - workerPool->activeConnectionCount++; - workerPool->idleConnectionCount++; - + HandleMultiConnectionSuccess(session); UpdateConnectionWaitFlags(session, WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE); @@ -2369,14 +2363,7 @@ ConnectionStateMachine(WorkerSession *session) } else { - ereport(DEBUG4, (errmsg("established connection to %s:%d for " - "session %ld", - connection->hostname, connection->port, - session->sessionId))); - - workerPool->activeConnectionCount++; - workerPool->idleConnectionCount++; - + HandleMultiConnectionSuccess(session); UpdateConnectionWaitFlags(session, WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE); @@ -2489,6 +2476,25 @@ ConnectionStateMachine(WorkerSession *session) } +/* + * HandleMultiConnectionSuccess logs the established connection and updates connection's state. + */ +static void +HandleMultiConnectionSuccess(WorkerSession *session) +{ + MultiConnection *connection = session->connection; + WorkerPool *workerPool = session->workerPool; + + ereport(DEBUG4, (errmsg("established connection to %s:%d for " + "session %ld", + connection->hostname, connection->port, + session->sessionId))); + + workerPool->activeConnectionCount++; + workerPool->idleConnectionCount++; +} + + /* * Activate2PCIfModifyingTransactionExpandsToNewNode sets the coordinated * transaction to use 2PC under the following circumstances: