mirror of https://github.com/citusdata/citus.git
Merge pull request #2656 from citusdata/get_ready_for_unified_executor
Rename MultiConnectionState to MultiConnectionPollStatepull/2657/head
commit
085b3dd6cb
|
@ -53,21 +53,21 @@ enum MultiConnectionPhase
|
||||||
MULTI_CONNECTION_PHASE_CONNECTED,
|
MULTI_CONNECTION_PHASE_CONNECTED,
|
||||||
MULTI_CONNECTION_PHASE_ERROR,
|
MULTI_CONNECTION_PHASE_ERROR,
|
||||||
};
|
};
|
||||||
typedef struct MultiConnectionState
|
typedef struct MultiConnectionPollState
|
||||||
{
|
{
|
||||||
enum MultiConnectionPhase phase;
|
enum MultiConnectionPhase phase;
|
||||||
MultiConnection *connection;
|
MultiConnection *connection;
|
||||||
PostgresPollingStatusType pollmode;
|
PostgresPollingStatusType pollmode;
|
||||||
} MultiConnectionState;
|
} MultiConnectionPollState;
|
||||||
|
|
||||||
|
|
||||||
/* helper functions for async connection management */
|
/* helper functions for async connection management */
|
||||||
static bool MultiConnectionStatePoll(MultiConnectionState *connectionState);
|
static bool MultiConnectionStatePoll(MultiConnectionPollState *connectionState);
|
||||||
static WaitEventSet * WaitEventSetFromMultiConnectionStates(List *connections,
|
static WaitEventSet * WaitEventSetFromMultiConnectionStates(List *connections,
|
||||||
int *waitCount);
|
int *waitCount);
|
||||||
static long DeadlineTimestampTzToTimeout(TimestampTz deadline);
|
static long DeadlineTimestampTzToTimeout(TimestampTz deadline);
|
||||||
static void CloseNotReadyMultiConnectionStates(List *connectionStates);
|
static void CloseNotReadyMultiConnectionStates(List *connectionStates);
|
||||||
static uint32 MultiConnectionStateEventMask(MultiConnectionState *connectionState);
|
static uint32 MultiConnectionStateEventMask(MultiConnectionPollState *connectionState);
|
||||||
|
|
||||||
|
|
||||||
static int CitusNoticeLogLevel = DEFAULT_CITUS_NOTICE_LEVEL;
|
static int CitusNoticeLogLevel = DEFAULT_CITUS_NOTICE_LEVEL;
|
||||||
|
@ -482,10 +482,10 @@ ShutdownConnection(MultiConnection *connection)
|
||||||
/*
|
/*
|
||||||
* MultiConnectionStatePoll executes a PQconnectPoll on the connection to progres the
|
* MultiConnectionStatePoll executes a PQconnectPoll on the connection to progres the
|
||||||
* connection establishment. The return value of this function indicates if the
|
* connection establishment. The return value of this function indicates if the
|
||||||
* MultiConnectionState has been changed, which could require a change to the WaitEventSet
|
* MultiConnectionPollState has been changed, which could require a change to the WaitEventSet
|
||||||
*/
|
*/
|
||||||
static bool
|
static bool
|
||||||
MultiConnectionStatePoll(MultiConnectionState *connectionState)
|
MultiConnectionStatePoll(MultiConnectionPollState *connectionState)
|
||||||
{
|
{
|
||||||
MultiConnection *connection = connectionState->connection;
|
MultiConnection *connection = connectionState->connection;
|
||||||
ConnStatusType status = PQstatus(connection->pgConn);
|
ConnStatusType status = PQstatus(connection->pgConn);
|
||||||
|
@ -583,7 +583,7 @@ WaitEventSetFromMultiConnectionStates(List *connections, int *waitCount)
|
||||||
|
|
||||||
foreach(connectionCell, connections)
|
foreach(connectionCell, connections)
|
||||||
{
|
{
|
||||||
MultiConnectionState *connectionState = (MultiConnectionState *) lfirst(
|
MultiConnectionPollState *connectionState = (MultiConnectionPollState *) lfirst(
|
||||||
connectionCell);
|
connectionCell);
|
||||||
int socket = 0;
|
int socket = 0;
|
||||||
int eventMask = 0;
|
int eventMask = 0;
|
||||||
|
@ -623,7 +623,7 @@ WaitEventSetFromMultiConnectionStates(List *connections, int *waitCount)
|
||||||
* returned in its last invocation
|
* returned in its last invocation
|
||||||
*/
|
*/
|
||||||
static uint32
|
static uint32
|
||||||
MultiConnectionStateEventMask(MultiConnectionState *connectionState)
|
MultiConnectionStateEventMask(MultiConnectionPollState *connectionState)
|
||||||
{
|
{
|
||||||
uint32 eventMask = 0;
|
uint32 eventMask = 0;
|
||||||
if (connectionState->pollmode == PGRES_POLLING_READING)
|
if (connectionState->pollmode == PGRES_POLLING_READING)
|
||||||
|
@ -661,14 +661,15 @@ FinishConnectionListEstablishment(List *multiConnectionList)
|
||||||
foreach(multiConnectionCell, multiConnectionList)
|
foreach(multiConnectionCell, multiConnectionList)
|
||||||
{
|
{
|
||||||
MultiConnection *connection = (MultiConnection *) lfirst(multiConnectionCell);
|
MultiConnection *connection = (MultiConnection *) lfirst(multiConnectionCell);
|
||||||
MultiConnectionState *connectionState = palloc0(sizeof(MultiConnectionState));
|
MultiConnectionPollState *connectionState =
|
||||||
|
palloc0(sizeof(MultiConnectionPollState));
|
||||||
|
|
||||||
connectionState->connection = connection;
|
connectionState->connection = connection;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* before we can build the waitset to wait for asynchronous IO we need to know the
|
* before we can build the waitset to wait for asynchronous IO we need to know the
|
||||||
* pollmode to use for the sockets. This is populated by executing one round of
|
* pollmode to use for the sockets. This is populated by executing one round of
|
||||||
* PQconnectPoll. This updates the MultiConnectionState struct with its phase and
|
* PQconnectPoll. This updates the MultiConnectionPollState struct with its phase and
|
||||||
* its next poll mode.
|
* its next poll mode.
|
||||||
*/
|
*/
|
||||||
MultiConnectionStatePoll(connectionState);
|
MultiConnectionStatePoll(connectionState);
|
||||||
|
@ -719,8 +720,8 @@ FinishConnectionListEstablishment(List *multiConnectionList)
|
||||||
{
|
{
|
||||||
WaitEvent *event = &events[eventIndex];
|
WaitEvent *event = &events[eventIndex];
|
||||||
bool connectionStateChanged = false;
|
bool connectionStateChanged = false;
|
||||||
MultiConnectionState *connectionState =
|
MultiConnectionPollState *connectionState =
|
||||||
(MultiConnectionState *) event->user_data;
|
(MultiConnectionPollState *) event->user_data;
|
||||||
|
|
||||||
if (event->events & WL_POSTMASTER_DEATH)
|
if (event->events & WL_POSTMASTER_DEATH)
|
||||||
{
|
{
|
||||||
|
@ -811,7 +812,7 @@ DeadlineTimestampTzToTimeout(TimestampTz deadline)
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* CloseNotReadyMultiConnectionStates calls CloseConnection for all MultiConnection's
|
* CloseNotReadyMultiConnectionStates calls CloseConnection for all MultiConnection's
|
||||||
* tracked in the MultiConnectionState list passed in, only if the connection is not yet
|
* tracked in the MultiConnectionPollState list passed in, only if the connection is not yet
|
||||||
* fully established.
|
* fully established.
|
||||||
*
|
*
|
||||||
* This function removes the pointer to the MultiConnection data after the Connections are
|
* This function removes the pointer to the MultiConnection data after the Connections are
|
||||||
|
@ -823,7 +824,7 @@ CloseNotReadyMultiConnectionStates(List *connectionStates)
|
||||||
ListCell *connectionStateCell = NULL;
|
ListCell *connectionStateCell = NULL;
|
||||||
foreach(connectionStateCell, connectionStates)
|
foreach(connectionStateCell, connectionStates)
|
||||||
{
|
{
|
||||||
MultiConnectionState *connectionState = lfirst(connectionStateCell);
|
MultiConnectionPollState *connectionState = lfirst(connectionStateCell);
|
||||||
MultiConnection *connection = connectionState->connection;
|
MultiConnection *connection = connectionState->connection;
|
||||||
|
|
||||||
if (connectionState->phase != MULTI_CONNECTION_PHASE_CONNECTING)
|
if (connectionState->phase != MULTI_CONNECTION_PHASE_CONNECTING)
|
||||||
|
|
Loading…
Reference in New Issue