Move citus tools to interrupt aware libpq wrappers.

pull/1650/head
Andres Freund 2017-06-30 18:20:54 -07:00 committed by Metin Doslu
parent 734921eca6
commit 1e5a8970db
1 changed files with 33 additions and 27 deletions

View File

@ -21,6 +21,7 @@
#include "distributed/connection_management.h" #include "distributed/connection_management.h"
#include "distributed/metadata_cache.h" #include "distributed/metadata_cache.h"
#include "distributed/multi_server_executor.h" #include "distributed/multi_server_executor.h"
#include "distributed/remote_commands.h"
#include "distributed/worker_protocol.h" #include "distributed/worker_protocol.h"
#include "lib/stringinfo.h" #include "lib/stringinfo.h"
#include "utils/builtins.h" #include "utils/builtins.h"
@ -39,11 +40,11 @@ static void ExecuteCommandsInParallelAndStoreResults(StringInfo *nodeNameArray,
bool *statusArray, bool *statusArray,
StringInfo *resultStringArray, StringInfo *resultStringArray,
int commmandCount); int commmandCount);
static bool GetConnectionStatusAndResult(PGconn *connection, bool *resultStatus, static bool GetConnectionStatusAndResult(MultiConnection *connection, bool *resultStatus,
StringInfo queryResultString); StringInfo queryResultString);
static bool EvaluateQueryResult(PGconn *connection, PGresult *queryResult, StringInfo static bool EvaluateQueryResult(MultiConnection *connection, PGresult *queryResult,
queryResultString); StringInfo queryResultString);
static void StoreErrorMessage(PGconn *connection, StringInfo queryResultString); static void StoreErrorMessage(MultiConnection *connection, StringInfo queryResultString);
static void ExecuteCommandsAndStoreResults(StringInfo *nodeNameArray, static void ExecuteCommandsAndStoreResults(StringInfo *nodeNameArray,
int *nodePortArray, int *nodePortArray,
StringInfo *commandStringArray, StringInfo *commandStringArray,
@ -221,7 +222,8 @@ ExecuteCommandsInParallelAndStoreResults(StringInfo *nodeNameArray, int *nodePor
int commmandCount) int commmandCount)
{ {
int commandIndex = 0; int commandIndex = 0;
PGconn **connectionArray = palloc0(commmandCount * sizeof(PGconn *)); MultiConnection **connectionArray =
palloc0(commmandCount * sizeof(MultiConnection *));
int finishedCount = 0; int finishedCount = 0;
/* establish connections */ /* establish connections */
@ -230,14 +232,13 @@ ExecuteCommandsInParallelAndStoreResults(StringInfo *nodeNameArray, int *nodePor
char *nodeName = nodeNameArray[commandIndex]->data; char *nodeName = nodeNameArray[commandIndex]->data;
int nodePort = nodePortArray[commandIndex]; int nodePort = nodePortArray[commandIndex];
int connectionFlags = FORCE_NEW_CONNECTION; int connectionFlags = FORCE_NEW_CONNECTION;
MultiConnection *multiConnection = MultiConnection *connection =
GetNodeConnection(connectionFlags, nodeName, nodePort); GetNodeConnection(connectionFlags, nodeName, nodePort);
PGconn *connection = multiConnection->pgConn;
StringInfo queryResultString = resultStringArray[commandIndex]; StringInfo queryResultString = resultStringArray[commandIndex];
statusArray[commandIndex] = true; statusArray[commandIndex] = true;
if (PQstatus(multiConnection->pgConn) != CONNECTION_OK) if (PQstatus(connection->pgConn) != CONNECTION_OK)
{ {
appendStringInfo(queryResultString, "failed to connect to %s:%d", nodeName, appendStringInfo(queryResultString, "failed to connect to %s:%d", nodeName,
(int) nodePort); (int) nodePort);
@ -254,7 +255,7 @@ ExecuteCommandsInParallelAndStoreResults(StringInfo *nodeNameArray, int *nodePor
for (commandIndex = 0; commandIndex < commmandCount; commandIndex++) for (commandIndex = 0; commandIndex < commmandCount; commandIndex++)
{ {
int querySent = 0; int querySent = 0;
PGconn *connection = connectionArray[commandIndex]; MultiConnection *connection = connectionArray[commandIndex];
char *queryString = commandStringArray[commandIndex]->data; char *queryString = commandStringArray[commandIndex]->data;
StringInfo queryResultString = resultStringArray[commandIndex]; StringInfo queryResultString = resultStringArray[commandIndex];
@ -267,13 +268,17 @@ ExecuteCommandsInParallelAndStoreResults(StringInfo *nodeNameArray, int *nodePor
continue; continue;
} }
querySent = PQsendQuery(connection, queryString); /*
* NB: this intentionally uses PQsendQuery rather than
* SendRemoteCommand as multiple commands are allowed.
*/
querySent = PQsendQuery(connection->pgConn, queryString);
if (querySent == 0) if (querySent == 0)
{ {
StoreErrorMessage(connection, queryResultString); StoreErrorMessage(connection, queryResultString);
statusArray[commandIndex] = false; statusArray[commandIndex] = false;
CloseConnectionByPGconn(connection); CloseConnection(connection);
connectionArray[commandIndex] = NULL; connectionArray[commandIndex] = NULL;
finishedCount++; finishedCount++;
} }
@ -284,7 +289,7 @@ ExecuteCommandsInParallelAndStoreResults(StringInfo *nodeNameArray, int *nodePor
{ {
for (commandIndex = 0; commandIndex < commmandCount; commandIndex++) for (commandIndex = 0; commandIndex < commmandCount; commandIndex++)
{ {
PGconn *connection = connectionArray[commandIndex]; MultiConnection *connection = connectionArray[commandIndex];
StringInfo queryResultString = resultStringArray[commandIndex]; StringInfo queryResultString = resultStringArray[commandIndex];
bool success = false; bool success = false;
bool queryFinished = false; bool queryFinished = false;
@ -302,7 +307,7 @@ ExecuteCommandsInParallelAndStoreResults(StringInfo *nodeNameArray, int *nodePor
finishedCount++; finishedCount++;
statusArray[commandIndex] = success; statusArray[commandIndex] = success;
connectionArray[commandIndex] = NULL; connectionArray[commandIndex] = NULL;
CloseConnectionByPGconn(connection); CloseConnection(connection);
} }
} }
@ -326,11 +331,11 @@ ExecuteCommandsInParallelAndStoreResults(StringInfo *nodeNameArray, int *nodePor
* reported upon completion of the query. * reported upon completion of the query.
*/ */
static bool static bool
GetConnectionStatusAndResult(PGconn *connection, bool *resultStatus, GetConnectionStatusAndResult(MultiConnection *connection, bool *resultStatus,
StringInfo queryResultString) StringInfo queryResultString)
{ {
bool finished = true; bool finished = true;
ConnStatusType connectionStatus = PQstatus(connection); ConnStatusType connectionStatus = PQstatus(connection->pgConn);
int consumeInput = 0; int consumeInput = 0;
PGresult *queryResult = NULL; PGresult *queryResult = NULL;
bool success = false; bool success = false;
@ -344,7 +349,7 @@ GetConnectionStatusAndResult(PGconn *connection, bool *resultStatus,
return finished; return finished;
} }
consumeInput = PQconsumeInput(connection); consumeInput = PQconsumeInput(connection->pgConn);
if (consumeInput == 0) if (consumeInput == 0)
{ {
appendStringInfo(queryResultString, "query result unavailable"); appendStringInfo(queryResultString, "query result unavailable");
@ -352,14 +357,14 @@ GetConnectionStatusAndResult(PGconn *connection, bool *resultStatus,
} }
/* check later if busy */ /* check later if busy */
if (PQisBusy(connection) != 0) if (PQisBusy(connection->pgConn) != 0)
{ {
finished = false; finished = false;
return finished; return finished;
} }
/* query result is available at this point */ /* query result is available at this point */
queryResult = PQgetResult(connection); queryResult = PQgetResult(connection->pgConn);
success = EvaluateQueryResult(connection, queryResult, queryResultString); success = EvaluateQueryResult(connection, queryResult, queryResultString);
PQclear(queryResult); PQclear(queryResult);
@ -377,7 +382,7 @@ GetConnectionStatusAndResult(PGconn *connection, bool *resultStatus,
* error otherwise. * error otherwise.
*/ */
static bool static bool
EvaluateQueryResult(PGconn *connection, PGresult *queryResult, EvaluateQueryResult(MultiConnection *connection, PGresult *queryResult,
StringInfo queryResultString) StringInfo queryResultString)
{ {
bool success = false; bool success = false;
@ -432,9 +437,9 @@ EvaluateQueryResult(PGconn *connection, PGresult *queryResult,
* otherwise it would return a default error message. * otherwise it would return a default error message.
*/ */
static void static void
StoreErrorMessage(PGconn *connection, StringInfo queryResultString) StoreErrorMessage(MultiConnection *connection, StringInfo queryResultString)
{ {
char *errorMessage = PQerrorMessage(connection); char *errorMessage = PQerrorMessage(connection->pgConn);
if (errorMessage != NULL) if (errorMessage != NULL)
{ {
char *firstNewlineIndex = strchr(errorMessage, '\n'); char *firstNewlineIndex = strchr(errorMessage, '\n');
@ -497,26 +502,27 @@ ExecuteRemoteQueryOrCommand(char *nodeName, uint32 nodePort, char *queryString,
StringInfo queryResultString) StringInfo queryResultString)
{ {
int connectionFlags = FORCE_NEW_CONNECTION; int connectionFlags = FORCE_NEW_CONNECTION;
MultiConnection *multiConnection = MultiConnection *connection =
GetNodeConnection(connectionFlags, nodeName, nodePort); GetNodeConnection(connectionFlags, nodeName, nodePort);
PGconn *nodeConnection = multiConnection->pgConn;
bool success = false; bool success = false;
PGresult *queryResult = NULL; PGresult *queryResult = NULL;
bool raiseInterrupts = true;
if (PQstatus(multiConnection->pgConn) != CONNECTION_OK) if (PQstatus(connection->pgConn) != CONNECTION_OK)
{ {
appendStringInfo(queryResultString, "failed to connect to %s:%d", nodeName, appendStringInfo(queryResultString, "failed to connect to %s:%d", nodeName,
(int) nodePort); (int) nodePort);
return false; return false;
} }
queryResult = PQexec(nodeConnection, queryString); SendRemoteCommand(connection, queryString);
success = EvaluateQueryResult(nodeConnection, queryResult, queryResultString); queryResult = GetRemoteCommandResult(connection, raiseInterrupts);
success = EvaluateQueryResult(connection, queryResult, queryResultString);
PQclear(queryResult); PQclear(queryResult);
/* close the connection */ /* close the connection */
CloseConnection(multiConnection); CloseConnection(connection);
return success; return success;
} }