mirror of https://github.com/citusdata/citus.git
Move multi_client_executor to interrupt aware libpq wrappers.
parent
1e5a8970db
commit
0722b3f003
|
@ -49,7 +49,6 @@ static PostgresPollingStatusType ClientPollingStatusArray[MAX_CONNECTION_COUNT];
|
||||||
|
|
||||||
|
|
||||||
/* Local functions forward declarations */
|
/* Local functions forward declarations */
|
||||||
static void ClearRemainingResults(MultiConnection *connection);
|
|
||||||
static bool ClientConnectionReady(MultiConnection *connection,
|
static bool ClientConnectionReady(MultiConnection *connection,
|
||||||
PostgresPollingStatusType pollingStatus);
|
PostgresPollingStatusType pollingStatus);
|
||||||
|
|
||||||
|
@ -406,6 +405,7 @@ MultiClientQueryResult(int32 connectionId, void **queryResult, int *rowCount,
|
||||||
PGresult *result = NULL;
|
PGresult *result = NULL;
|
||||||
ConnStatusType connStatusType = CONNECTION_OK;
|
ConnStatusType connStatusType = CONNECTION_OK;
|
||||||
ExecStatusType resultStatus = PGRES_COMMAND_OK;
|
ExecStatusType resultStatus = PGRES_COMMAND_OK;
|
||||||
|
bool raiseInterrupts = true;
|
||||||
|
|
||||||
Assert(connectionId != INVALID_CONNECTION_ID);
|
Assert(connectionId != INVALID_CONNECTION_ID);
|
||||||
connection = ClientConnectionArray[connectionId];
|
connection = ClientConnectionArray[connectionId];
|
||||||
|
@ -418,7 +418,7 @@ MultiClientQueryResult(int32 connectionId, void **queryResult, int *rowCount,
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
result = PQgetResult(connection->pgConn);
|
result = GetRemoteCommandResult(connection, raiseInterrupts);
|
||||||
resultStatus = PQresultStatus(result);
|
resultStatus = PQresultStatus(result);
|
||||||
if (resultStatus == PGRES_TUPLES_OK)
|
if (resultStatus == PGRES_TUPLES_OK)
|
||||||
{
|
{
|
||||||
|
@ -435,7 +435,7 @@ MultiClientQueryResult(int32 connectionId, void **queryResult, int *rowCount,
|
||||||
}
|
}
|
||||||
|
|
||||||
/* clear extra result objects */
|
/* clear extra result objects */
|
||||||
ClearRemainingResults(connection);
|
ForgetResults(connection);
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
@ -459,6 +459,7 @@ MultiClientBatchResult(int32 connectionId, void **queryResult, int *rowCount,
|
||||||
ConnStatusType connStatusType = CONNECTION_OK;
|
ConnStatusType connStatusType = CONNECTION_OK;
|
||||||
ExecStatusType resultStatus = PGRES_COMMAND_OK;
|
ExecStatusType resultStatus = PGRES_COMMAND_OK;
|
||||||
BatchQueryStatus queryStatus = CLIENT_INVALID_BATCH_QUERY;
|
BatchQueryStatus queryStatus = CLIENT_INVALID_BATCH_QUERY;
|
||||||
|
bool raiseInterrupts = true;
|
||||||
|
|
||||||
Assert(connectionId != INVALID_CONNECTION_ID);
|
Assert(connectionId != INVALID_CONNECTION_ID);
|
||||||
connection = ClientConnectionArray[connectionId];
|
connection = ClientConnectionArray[connectionId];
|
||||||
|
@ -476,7 +477,7 @@ MultiClientBatchResult(int32 connectionId, void **queryResult, int *rowCount,
|
||||||
return CLIENT_BATCH_QUERY_FAILED;
|
return CLIENT_BATCH_QUERY_FAILED;
|
||||||
}
|
}
|
||||||
|
|
||||||
result = PQgetResult(connection->pgConn);
|
result = GetRemoteCommandResult(connection, raiseInterrupts);
|
||||||
if (result == NULL)
|
if (result == NULL)
|
||||||
{
|
{
|
||||||
return CLIENT_BATCH_QUERY_DONE;
|
return CLIENT_BATCH_QUERY_DONE;
|
||||||
|
@ -543,6 +544,7 @@ MultiClientQueryStatus(int32 connectionId)
|
||||||
ConnStatusType connStatusType = CONNECTION_OK;
|
ConnStatusType connStatusType = CONNECTION_OK;
|
||||||
ExecStatusType resultStatus = PGRES_COMMAND_OK;
|
ExecStatusType resultStatus = PGRES_COMMAND_OK;
|
||||||
QueryStatus queryStatus = CLIENT_INVALID_QUERY;
|
QueryStatus queryStatus = CLIENT_INVALID_QUERY;
|
||||||
|
bool raiseInterrupts = true;
|
||||||
|
|
||||||
Assert(connectionId != INVALID_CONNECTION_ID);
|
Assert(connectionId != INVALID_CONNECTION_ID);
|
||||||
connection = ClientConnectionArray[connectionId];
|
connection = ClientConnectionArray[connectionId];
|
||||||
|
@ -560,7 +562,7 @@ MultiClientQueryStatus(int32 connectionId)
|
||||||
* isn't ready yet (the caller didn't wait for the connection to be ready),
|
* isn't ready yet (the caller didn't wait for the connection to be ready),
|
||||||
* we will block on this call.
|
* we will block on this call.
|
||||||
*/
|
*/
|
||||||
result = PQgetResult(connection->pgConn);
|
result = GetRemoteCommandResult(connection, raiseInterrupts);
|
||||||
resultStatus = PQresultStatus(result);
|
resultStatus = PQresultStatus(result);
|
||||||
|
|
||||||
if (resultStatus == PGRES_COMMAND_OK)
|
if (resultStatus == PGRES_COMMAND_OK)
|
||||||
|
@ -603,7 +605,7 @@ MultiClientQueryStatus(int32 connectionId)
|
||||||
*/
|
*/
|
||||||
if (!copyResults)
|
if (!copyResults)
|
||||||
{
|
{
|
||||||
ClearRemainingResults(connection);
|
ForgetResults(connection);
|
||||||
}
|
}
|
||||||
|
|
||||||
return queryStatus;
|
return queryStatus;
|
||||||
|
@ -670,7 +672,8 @@ MultiClientCopyData(int32 connectionId, int32 fileDescriptor)
|
||||||
else if (receiveLength == -1)
|
else if (receiveLength == -1)
|
||||||
{
|
{
|
||||||
/* received copy done message */
|
/* received copy done message */
|
||||||
PGresult *result = PQgetResult(connection->pgConn);
|
bool raiseInterrupts = true;
|
||||||
|
PGresult *result = GetRemoteCommandResult(connection, raiseInterrupts);
|
||||||
ExecStatusType resultStatus = PQresultStatus(result);
|
ExecStatusType resultStatus = PQresultStatus(result);
|
||||||
|
|
||||||
if (resultStatus == PGRES_COMMAND_OK)
|
if (resultStatus == PGRES_COMMAND_OK)
|
||||||
|
@ -697,7 +700,7 @@ MultiClientCopyData(int32 connectionId, int32 fileDescriptor)
|
||||||
/* if copy out completed, make sure we drain all results from libpq */
|
/* if copy out completed, make sure we drain all results from libpq */
|
||||||
if (receiveLength < 0)
|
if (receiveLength < 0)
|
||||||
{
|
{
|
||||||
ClearRemainingResults(connection);
|
ForgetResults(connection);
|
||||||
}
|
}
|
||||||
|
|
||||||
return copyStatus;
|
return copyStatus;
|
||||||
|
@ -858,23 +861,6 @@ MultiClientWait(WaitInfo *waitInfo)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
|
||||||
* ClearRemainingResults reads result objects from the connection until we get
|
|
||||||
* null, and clears these results. This is the last step in completing an async
|
|
||||||
* query.
|
|
||||||
*/
|
|
||||||
static void
|
|
||||||
ClearRemainingResults(MultiConnection *connection)
|
|
||||||
{
|
|
||||||
PGresult *result = PQgetResult(connection->pgConn);
|
|
||||||
while (result != NULL)
|
|
||||||
{
|
|
||||||
PQclear(result);
|
|
||||||
result = PQgetResult(connection->pgConn);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* ClientConnectionReady checks if the given connection is ready for non-blocking
|
* ClientConnectionReady checks if the given connection is ready for non-blocking
|
||||||
* reads or writes. This function is loosely based on pqSocketCheck() at fe-misc.c
|
* reads or writes. This function is loosely based on pqSocketCheck() at fe-misc.c
|
||||||
|
|
Loading…
Reference in New Issue