mirror of https://github.com/citusdata/citus.git
Move multi_client_executor to interrupt aware libpq wrappers.
parent
ddb0651967
commit
21c25abbb1
|
@ -44,7 +44,6 @@ static PostgresPollingStatusType ClientPollingStatusArray[MAX_CONNECTION_COUNT];
|
||||||
|
|
||||||
|
|
||||||
/* Local functions forward declarations */
|
/* Local functions forward declarations */
|
||||||
static void ClearRemainingResults(MultiConnection *connection);
|
|
||||||
static bool ClientConnectionReady(MultiConnection *connection,
|
static bool ClientConnectionReady(MultiConnection *connection,
|
||||||
PostgresPollingStatusType pollingStatus);
|
PostgresPollingStatusType pollingStatus);
|
||||||
|
|
||||||
|
@ -401,6 +400,7 @@ MultiClientQueryResult(int32 connectionId, void **queryResult, int *rowCount,
|
||||||
PGresult *result = NULL;
|
PGresult *result = NULL;
|
||||||
ConnStatusType connStatusType = CONNECTION_OK;
|
ConnStatusType connStatusType = CONNECTION_OK;
|
||||||
ExecStatusType resultStatus = PGRES_COMMAND_OK;
|
ExecStatusType resultStatus = PGRES_COMMAND_OK;
|
||||||
|
bool raiseInterrupts = true;
|
||||||
|
|
||||||
Assert(connectionId != INVALID_CONNECTION_ID);
|
Assert(connectionId != INVALID_CONNECTION_ID);
|
||||||
connection = ClientConnectionArray[connectionId];
|
connection = ClientConnectionArray[connectionId];
|
||||||
|
@ -413,7 +413,7 @@ MultiClientQueryResult(int32 connectionId, void **queryResult, int *rowCount,
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
result = PQgetResult(connection->pgConn);
|
result = GetRemoteCommandResult(connection, raiseInterrupts);
|
||||||
resultStatus = PQresultStatus(result);
|
resultStatus = PQresultStatus(result);
|
||||||
if (resultStatus == PGRES_TUPLES_OK)
|
if (resultStatus == PGRES_TUPLES_OK)
|
||||||
{
|
{
|
||||||
|
@ -430,7 +430,7 @@ MultiClientQueryResult(int32 connectionId, void **queryResult, int *rowCount,
|
||||||
}
|
}
|
||||||
|
|
||||||
/* clear extra result objects */
|
/* clear extra result objects */
|
||||||
ClearRemainingResults(connection);
|
ForgetResults(connection);
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
@ -454,6 +454,7 @@ MultiClientBatchResult(int32 connectionId, void **queryResult, int *rowCount,
|
||||||
ConnStatusType connStatusType = CONNECTION_OK;
|
ConnStatusType connStatusType = CONNECTION_OK;
|
||||||
ExecStatusType resultStatus = PGRES_COMMAND_OK;
|
ExecStatusType resultStatus = PGRES_COMMAND_OK;
|
||||||
BatchQueryStatus queryStatus = CLIENT_INVALID_BATCH_QUERY;
|
BatchQueryStatus queryStatus = CLIENT_INVALID_BATCH_QUERY;
|
||||||
|
bool raiseInterrupts = true;
|
||||||
|
|
||||||
Assert(connectionId != INVALID_CONNECTION_ID);
|
Assert(connectionId != INVALID_CONNECTION_ID);
|
||||||
connection = ClientConnectionArray[connectionId];
|
connection = ClientConnectionArray[connectionId];
|
||||||
|
@ -471,7 +472,7 @@ MultiClientBatchResult(int32 connectionId, void **queryResult, int *rowCount,
|
||||||
return CLIENT_BATCH_QUERY_FAILED;
|
return CLIENT_BATCH_QUERY_FAILED;
|
||||||
}
|
}
|
||||||
|
|
||||||
result = PQgetResult(connection->pgConn);
|
result = GetRemoteCommandResult(connection, raiseInterrupts);
|
||||||
if (result == NULL)
|
if (result == NULL)
|
||||||
{
|
{
|
||||||
return CLIENT_BATCH_QUERY_DONE;
|
return CLIENT_BATCH_QUERY_DONE;
|
||||||
|
@ -538,6 +539,7 @@ MultiClientQueryStatus(int32 connectionId)
|
||||||
ConnStatusType connStatusType = CONNECTION_OK;
|
ConnStatusType connStatusType = CONNECTION_OK;
|
||||||
ExecStatusType resultStatus = PGRES_COMMAND_OK;
|
ExecStatusType resultStatus = PGRES_COMMAND_OK;
|
||||||
QueryStatus queryStatus = CLIENT_INVALID_QUERY;
|
QueryStatus queryStatus = CLIENT_INVALID_QUERY;
|
||||||
|
bool raiseInterrupts = true;
|
||||||
|
|
||||||
Assert(connectionId != INVALID_CONNECTION_ID);
|
Assert(connectionId != INVALID_CONNECTION_ID);
|
||||||
connection = ClientConnectionArray[connectionId];
|
connection = ClientConnectionArray[connectionId];
|
||||||
|
@ -555,7 +557,7 @@ MultiClientQueryStatus(int32 connectionId)
|
||||||
* isn't ready yet (the caller didn't wait for the connection to be ready),
|
* isn't ready yet (the caller didn't wait for the connection to be ready),
|
||||||
* we will block on this call.
|
* we will block on this call.
|
||||||
*/
|
*/
|
||||||
result = PQgetResult(connection->pgConn);
|
result = GetRemoteCommandResult(connection, raiseInterrupts);
|
||||||
resultStatus = PQresultStatus(result);
|
resultStatus = PQresultStatus(result);
|
||||||
|
|
||||||
if (resultStatus == PGRES_COMMAND_OK)
|
if (resultStatus == PGRES_COMMAND_OK)
|
||||||
|
@ -598,7 +600,7 @@ MultiClientQueryStatus(int32 connectionId)
|
||||||
*/
|
*/
|
||||||
if (!copyResults)
|
if (!copyResults)
|
||||||
{
|
{
|
||||||
ClearRemainingResults(connection);
|
ForgetResults(connection);
|
||||||
}
|
}
|
||||||
|
|
||||||
return queryStatus;
|
return queryStatus;
|
||||||
|
@ -665,7 +667,8 @@ MultiClientCopyData(int32 connectionId, int32 fileDescriptor)
|
||||||
else if (receiveLength == -1)
|
else if (receiveLength == -1)
|
||||||
{
|
{
|
||||||
/* received copy done message */
|
/* received copy done message */
|
||||||
PGresult *result = PQgetResult(connection->pgConn);
|
bool raiseInterrupts = true;
|
||||||
|
PGresult *result = GetRemoteCommandResult(connection, raiseInterrupts);
|
||||||
ExecStatusType resultStatus = PQresultStatus(result);
|
ExecStatusType resultStatus = PQresultStatus(result);
|
||||||
|
|
||||||
if (resultStatus == PGRES_COMMAND_OK)
|
if (resultStatus == PGRES_COMMAND_OK)
|
||||||
|
@ -692,7 +695,7 @@ MultiClientCopyData(int32 connectionId, int32 fileDescriptor)
|
||||||
/* if copy out completed, make sure we drain all results from libpq */
|
/* if copy out completed, make sure we drain all results from libpq */
|
||||||
if (receiveLength < 0)
|
if (receiveLength < 0)
|
||||||
{
|
{
|
||||||
ClearRemainingResults(connection);
|
ForgetResults(connection);
|
||||||
}
|
}
|
||||||
|
|
||||||
return copyStatus;
|
return copyStatus;
|
||||||
|
@ -853,23 +856,6 @@ MultiClientWait(WaitInfo *waitInfo)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
|
||||||
* ClearRemainingResults reads result objects from the connection until we get
|
|
||||||
* null, and clears these results. This is the last step in completing an async
|
|
||||||
* query.
|
|
||||||
*/
|
|
||||||
static void
|
|
||||||
ClearRemainingResults(MultiConnection *connection)
|
|
||||||
{
|
|
||||||
PGresult *result = PQgetResult(connection->pgConn);
|
|
||||||
while (result != NULL)
|
|
||||||
{
|
|
||||||
PQclear(result);
|
|
||||||
result = PQgetResult(connection->pgConn);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* ClientConnectionReady checks if the given connection is ready for non-blocking
|
* ClientConnectionReady checks if the given connection is ready for non-blocking
|
||||||
* reads or writes. This function is loosely based on pqSocketCheck() at fe-misc.c
|
* reads or writes. This function is loosely based on pqSocketCheck() at fe-misc.c
|
||||||
|
|
Loading…
Reference in New Issue