From 0722b3f003f295db069d0a3f39203413576b6be3 Mon Sep 17 00:00:00 2001 From: Andres Freund Date: Fri, 30 Jun 2017 18:20:54 -0700 Subject: [PATCH] Move multi_client_executor to interrupt aware libpq wrappers. --- .../executor/multi_client_executor.c | 36 ++++++------------- 1 file changed, 11 insertions(+), 25 deletions(-) diff --git a/src/backend/distributed/executor/multi_client_executor.c b/src/backend/distributed/executor/multi_client_executor.c index 022281e3b..f75b94708 100644 --- a/src/backend/distributed/executor/multi_client_executor.c +++ b/src/backend/distributed/executor/multi_client_executor.c @@ -49,7 +49,6 @@ static PostgresPollingStatusType ClientPollingStatusArray[MAX_CONNECTION_COUNT]; /* Local functions forward declarations */ -static void ClearRemainingResults(MultiConnection *connection); static bool ClientConnectionReady(MultiConnection *connection, PostgresPollingStatusType pollingStatus); @@ -406,6 +405,7 @@ MultiClientQueryResult(int32 connectionId, void **queryResult, int *rowCount, PGresult *result = NULL; ConnStatusType connStatusType = CONNECTION_OK; ExecStatusType resultStatus = PGRES_COMMAND_OK; + bool raiseInterrupts = true; Assert(connectionId != INVALID_CONNECTION_ID); connection = ClientConnectionArray[connectionId]; @@ -418,7 +418,7 @@ MultiClientQueryResult(int32 connectionId, void **queryResult, int *rowCount, return false; } - result = PQgetResult(connection->pgConn); + result = GetRemoteCommandResult(connection, raiseInterrupts); resultStatus = PQresultStatus(result); if (resultStatus == PGRES_TUPLES_OK) { @@ -435,7 +435,7 @@ MultiClientQueryResult(int32 connectionId, void **queryResult, int *rowCount, } /* clear extra result objects */ - ClearRemainingResults(connection); + ForgetResults(connection); return true; } @@ -459,6 +459,7 @@ MultiClientBatchResult(int32 connectionId, void **queryResult, int *rowCount, ConnStatusType connStatusType = CONNECTION_OK; ExecStatusType resultStatus = PGRES_COMMAND_OK; BatchQueryStatus queryStatus = CLIENT_INVALID_BATCH_QUERY; + bool raiseInterrupts = true; Assert(connectionId != INVALID_CONNECTION_ID); connection = ClientConnectionArray[connectionId]; @@ -476,7 +477,7 @@ MultiClientBatchResult(int32 connectionId, void **queryResult, int *rowCount, return CLIENT_BATCH_QUERY_FAILED; } - result = PQgetResult(connection->pgConn); + result = GetRemoteCommandResult(connection, raiseInterrupts); if (result == NULL) { return CLIENT_BATCH_QUERY_DONE; @@ -543,6 +544,7 @@ MultiClientQueryStatus(int32 connectionId) ConnStatusType connStatusType = CONNECTION_OK; ExecStatusType resultStatus = PGRES_COMMAND_OK; QueryStatus queryStatus = CLIENT_INVALID_QUERY; + bool raiseInterrupts = true; Assert(connectionId != INVALID_CONNECTION_ID); connection = ClientConnectionArray[connectionId]; @@ -560,7 +562,7 @@ MultiClientQueryStatus(int32 connectionId) * isn't ready yet (the caller didn't wait for the connection to be ready), * we will block on this call. */ - result = PQgetResult(connection->pgConn); + result = GetRemoteCommandResult(connection, raiseInterrupts); resultStatus = PQresultStatus(result); if (resultStatus == PGRES_COMMAND_OK) @@ -603,7 +605,7 @@ MultiClientQueryStatus(int32 connectionId) */ if (!copyResults) { - ClearRemainingResults(connection); + ForgetResults(connection); } return queryStatus; @@ -670,7 +672,8 @@ MultiClientCopyData(int32 connectionId, int32 fileDescriptor) else if (receiveLength == -1) { /* received copy done message */ - PGresult *result = PQgetResult(connection->pgConn); + bool raiseInterrupts = true; + PGresult *result = GetRemoteCommandResult(connection, raiseInterrupts); ExecStatusType resultStatus = PQresultStatus(result); if (resultStatus == PGRES_COMMAND_OK) @@ -697,7 +700,7 @@ MultiClientCopyData(int32 connectionId, int32 fileDescriptor) /* if copy out completed, make sure we drain all results from libpq */ if (receiveLength < 0) { - ClearRemainingResults(connection); + ForgetResults(connection); } return copyStatus; @@ -858,23 +861,6 @@ MultiClientWait(WaitInfo *waitInfo) } -/* - * ClearRemainingResults reads result objects from the connection until we get - * null, and clears these results. This is the last step in completing an async - * query. - */ -static void -ClearRemainingResults(MultiConnection *connection) -{ - PGresult *result = PQgetResult(connection->pgConn); - while (result != NULL) - { - PQclear(result); - result = PQgetResult(connection->pgConn); - } -} - - /* * ClientConnectionReady checks if the given connection is ready for non-blocking * reads or writes. This function is loosely based on pqSocketCheck() at fe-misc.c