mirror of https://github.com/citusdata/citus.git
use a function for duplicate code in connection state machine (#3209)
parent
a0fe8646e0
commit
2c040d2c8f
|
@ -581,6 +581,7 @@ static TaskPlacementExecution * PopUnassignedPlacementExecution(WorkerPool *work
|
||||||
static bool StartPlacementExecutionOnSession(TaskPlacementExecution *placementExecution,
|
static bool StartPlacementExecutionOnSession(TaskPlacementExecution *placementExecution,
|
||||||
WorkerSession *session);
|
WorkerSession *session);
|
||||||
static void ConnectionStateMachine(WorkerSession *session);
|
static void ConnectionStateMachine(WorkerSession *session);
|
||||||
|
static void HandleMultiConnectionSuccess(WorkerSession *session);
|
||||||
static void Activate2PCIfModifyingTransactionExpandsToNewNode(WorkerSession *session);
|
static void Activate2PCIfModifyingTransactionExpandsToNewNode(WorkerSession *session);
|
||||||
static bool TransactionModifiedDistributedTable(DistributedExecution *execution);
|
static bool TransactionModifiedDistributedTable(DistributedExecution *execution);
|
||||||
static void TransactionStateMachine(WorkerSession *session);
|
static void TransactionStateMachine(WorkerSession *session);
|
||||||
|
@ -2334,14 +2335,7 @@ ConnectionStateMachine(WorkerSession *session)
|
||||||
ConnStatusType status = PQstatus(connection->pgConn);
|
ConnStatusType status = PQstatus(connection->pgConn);
|
||||||
if (status == CONNECTION_OK)
|
if (status == CONNECTION_OK)
|
||||||
{
|
{
|
||||||
ereport(DEBUG4, (errmsg("established connection to %s:%d for "
|
HandleMultiConnectionSuccess(session);
|
||||||
"session %ld",
|
|
||||||
connection->hostname, connection->port,
|
|
||||||
session->sessionId)));
|
|
||||||
|
|
||||||
workerPool->activeConnectionCount++;
|
|
||||||
workerPool->idleConnectionCount++;
|
|
||||||
|
|
||||||
UpdateConnectionWaitFlags(session,
|
UpdateConnectionWaitFlags(session,
|
||||||
WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE);
|
WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE);
|
||||||
|
|
||||||
|
@ -2369,14 +2363,7 @@ ConnectionStateMachine(WorkerSession *session)
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
ereport(DEBUG4, (errmsg("established connection to %s:%d for "
|
HandleMultiConnectionSuccess(session);
|
||||||
"session %ld",
|
|
||||||
connection->hostname, connection->port,
|
|
||||||
session->sessionId)));
|
|
||||||
|
|
||||||
workerPool->activeConnectionCount++;
|
|
||||||
workerPool->idleConnectionCount++;
|
|
||||||
|
|
||||||
UpdateConnectionWaitFlags(session,
|
UpdateConnectionWaitFlags(session,
|
||||||
WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE);
|
WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE);
|
||||||
|
|
||||||
|
@ -2489,6 +2476,25 @@ ConnectionStateMachine(WorkerSession *session)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* HandleMultiConnectionSuccess logs the established connection and updates connection's state.
|
||||||
|
*/
|
||||||
|
static void
|
||||||
|
HandleMultiConnectionSuccess(WorkerSession *session)
|
||||||
|
{
|
||||||
|
MultiConnection *connection = session->connection;
|
||||||
|
WorkerPool *workerPool = session->workerPool;
|
||||||
|
|
||||||
|
ereport(DEBUG4, (errmsg("established connection to %s:%d for "
|
||||||
|
"session %ld",
|
||||||
|
connection->hostname, connection->port,
|
||||||
|
session->sessionId)));
|
||||||
|
|
||||||
|
workerPool->activeConnectionCount++;
|
||||||
|
workerPool->idleConnectionCount++;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Activate2PCIfModifyingTransactionExpandsToNewNode sets the coordinated
|
* Activate2PCIfModifyingTransactionExpandsToNewNode sets the coordinated
|
||||||
* transaction to use 2PC under the following circumstances:
|
* transaction to use 2PC under the following circumstances:
|
||||||
|
|
Loading…
Reference in New Issue