diff --git a/src/backend/distributed/executor/multi_client_executor.c b/src/backend/distributed/executor/multi_client_executor.c index e7252ef62..827387489 100644 --- a/src/backend/distributed/executor/multi_client_executor.c +++ b/src/backend/distributed/executor/multi_client_executor.c @@ -44,7 +44,6 @@ static PostgresPollingStatusType ClientPollingStatusArray[MAX_CONNECTION_COUNT]; /* Local functions forward declarations */ -static void ClearRemainingResults(MultiConnection *connection); static bool ClientConnectionReady(MultiConnection *connection, PostgresPollingStatusType pollingStatus); @@ -401,6 +400,7 @@ MultiClientQueryResult(int32 connectionId, void **queryResult, int *rowCount, PGresult *result = NULL; ConnStatusType connStatusType = CONNECTION_OK; ExecStatusType resultStatus = PGRES_COMMAND_OK; + bool raiseInterrupts = true; Assert(connectionId != INVALID_CONNECTION_ID); connection = ClientConnectionArray[connectionId]; @@ -413,7 +413,7 @@ MultiClientQueryResult(int32 connectionId, void **queryResult, int *rowCount, return false; } - result = PQgetResult(connection->pgConn); + result = GetRemoteCommandResult(connection, raiseInterrupts); resultStatus = PQresultStatus(result); if (resultStatus == PGRES_TUPLES_OK) { @@ -430,7 +430,7 @@ MultiClientQueryResult(int32 connectionId, void **queryResult, int *rowCount, } /* clear extra result objects */ - ClearRemainingResults(connection); + ForgetResults(connection); return true; } @@ -454,6 +454,7 @@ MultiClientBatchResult(int32 connectionId, void **queryResult, int *rowCount, ConnStatusType connStatusType = CONNECTION_OK; ExecStatusType resultStatus = PGRES_COMMAND_OK; BatchQueryStatus queryStatus = CLIENT_INVALID_BATCH_QUERY; + bool raiseInterrupts = true; Assert(connectionId != INVALID_CONNECTION_ID); connection = ClientConnectionArray[connectionId]; @@ -471,7 +472,7 @@ MultiClientBatchResult(int32 connectionId, void **queryResult, int *rowCount, return CLIENT_BATCH_QUERY_FAILED; } - result = PQgetResult(connection->pgConn); + result = GetRemoteCommandResult(connection, raiseInterrupts); if (result == NULL) { return CLIENT_BATCH_QUERY_DONE; @@ -538,6 +539,7 @@ MultiClientQueryStatus(int32 connectionId) ConnStatusType connStatusType = CONNECTION_OK; ExecStatusType resultStatus = PGRES_COMMAND_OK; QueryStatus queryStatus = CLIENT_INVALID_QUERY; + bool raiseInterrupts = true; Assert(connectionId != INVALID_CONNECTION_ID); connection = ClientConnectionArray[connectionId]; @@ -555,7 +557,7 @@ MultiClientQueryStatus(int32 connectionId) * isn't ready yet (the caller didn't wait for the connection to be ready), * we will block on this call. */ - result = PQgetResult(connection->pgConn); + result = GetRemoteCommandResult(connection, raiseInterrupts); resultStatus = PQresultStatus(result); if (resultStatus == PGRES_COMMAND_OK) @@ -598,7 +600,7 @@ MultiClientQueryStatus(int32 connectionId) */ if (!copyResults) { - ClearRemainingResults(connection); + ForgetResults(connection); } return queryStatus; @@ -665,7 +667,8 @@ MultiClientCopyData(int32 connectionId, int32 fileDescriptor) else if (receiveLength == -1) { /* received copy done message */ - PGresult *result = PQgetResult(connection->pgConn); + bool raiseInterrupts = true; + PGresult *result = GetRemoteCommandResult(connection, raiseInterrupts); ExecStatusType resultStatus = PQresultStatus(result); if (resultStatus == PGRES_COMMAND_OK) @@ -692,7 +695,7 @@ MultiClientCopyData(int32 connectionId, int32 fileDescriptor) /* if copy out completed, make sure we drain all results from libpq */ if (receiveLength < 0) { - ClearRemainingResults(connection); + ForgetResults(connection); } return copyStatus; @@ -853,23 +856,6 @@ MultiClientWait(WaitInfo *waitInfo) } -/* - * ClearRemainingResults reads result objects from the connection until we get - * null, and clears these results. This is the last step in completing an async - * query. - */ -static void -ClearRemainingResults(MultiConnection *connection) -{ - PGresult *result = PQgetResult(connection->pgConn); - while (result != NULL) - { - PQclear(result); - result = PQgetResult(connection->pgConn); - } -} - - /* * ClientConnectionReady checks if the given connection is ready for non-blocking * reads or writes. This function is loosely based on pqSocketCheck() at fe-misc.c