From ddb06519670b8916356b744da44c178cfba297a0 Mon Sep 17 00:00:00 2001 From: Andres Freund Date: Fri, 30 Jun 2017 18:20:54 -0700 Subject: [PATCH] Move citus tools to interrupt aware libpq wrappers. --- .../distributed/master/master_citus_tools.c | 60 ++++++++++--------- 1 file changed, 33 insertions(+), 27 deletions(-) diff --git a/src/backend/distributed/master/master_citus_tools.c b/src/backend/distributed/master/master_citus_tools.c index 946ec16cf..62679a83f 100644 --- a/src/backend/distributed/master/master_citus_tools.c +++ b/src/backend/distributed/master/master_citus_tools.c @@ -21,6 +21,7 @@ #include "distributed/connection_management.h" #include "distributed/metadata_cache.h" #include "distributed/multi_server_executor.h" +#include "distributed/remote_commands.h" #include "distributed/worker_protocol.h" #include "lib/stringinfo.h" #include "utils/builtins.h" @@ -39,11 +40,11 @@ static void ExecuteCommandsInParallelAndStoreResults(StringInfo *nodeNameArray, bool *statusArray, StringInfo *resultStringArray, int commmandCount); -static bool GetConnectionStatusAndResult(PGconn *connection, bool *resultStatus, +static bool GetConnectionStatusAndResult(MultiConnection *connection, bool *resultStatus, StringInfo queryResultString); -static bool EvaluateQueryResult(PGconn *connection, PGresult *queryResult, StringInfo - queryResultString); -static void StoreErrorMessage(PGconn *connection, StringInfo queryResultString); +static bool EvaluateQueryResult(MultiConnection *connection, PGresult *queryResult, + StringInfo queryResultString); +static void StoreErrorMessage(MultiConnection *connection, StringInfo queryResultString); static void ExecuteCommandsAndStoreResults(StringInfo *nodeNameArray, int *nodePortArray, StringInfo *commandStringArray, @@ -223,7 +224,8 @@ ExecuteCommandsInParallelAndStoreResults(StringInfo *nodeNameArray, int *nodePor int commmandCount) { int commandIndex = 0; - PGconn **connectionArray = palloc0(commmandCount * sizeof(PGconn *)); + MultiConnection **connectionArray = + palloc0(commmandCount * sizeof(MultiConnection *)); int finishedCount = 0; /* establish connections */ @@ -232,14 +234,13 @@ ExecuteCommandsInParallelAndStoreResults(StringInfo *nodeNameArray, int *nodePor char *nodeName = nodeNameArray[commandIndex]->data; int nodePort = nodePortArray[commandIndex]; int connectionFlags = FORCE_NEW_CONNECTION; - MultiConnection *multiConnection = + MultiConnection *connection = GetNodeConnection(connectionFlags, nodeName, nodePort); - PGconn *connection = multiConnection->pgConn; StringInfo queryResultString = resultStringArray[commandIndex]; statusArray[commandIndex] = true; - if (PQstatus(multiConnection->pgConn) != CONNECTION_OK) + if (PQstatus(connection->pgConn) != CONNECTION_OK) { appendStringInfo(queryResultString, "failed to connect to %s:%d", nodeName, (int) nodePort); @@ -256,7 +257,7 @@ ExecuteCommandsInParallelAndStoreResults(StringInfo *nodeNameArray, int *nodePor for (commandIndex = 0; commandIndex < commmandCount; commandIndex++) { int querySent = 0; - PGconn *connection = connectionArray[commandIndex]; + MultiConnection *connection = connectionArray[commandIndex]; char *queryString = commandStringArray[commandIndex]->data; StringInfo queryResultString = resultStringArray[commandIndex]; @@ -269,13 +270,17 @@ ExecuteCommandsInParallelAndStoreResults(StringInfo *nodeNameArray, int *nodePor continue; } - querySent = PQsendQuery(connection, queryString); + /* + * NB: this intentionally uses PQsendQuery rather than + * SendRemoteCommand as multiple commands are allowed. + */ + querySent = PQsendQuery(connection->pgConn, queryString); if (querySent == 0) { StoreErrorMessage(connection, queryResultString); statusArray[commandIndex] = false; - CloseConnectionByPGconn(connection); + CloseConnection(connection); connectionArray[commandIndex] = NULL; finishedCount++; } @@ -286,7 +291,7 @@ ExecuteCommandsInParallelAndStoreResults(StringInfo *nodeNameArray, int *nodePor { for (commandIndex = 0; commandIndex < commmandCount; commandIndex++) { - PGconn *connection = connectionArray[commandIndex]; + MultiConnection *connection = connectionArray[commandIndex]; StringInfo queryResultString = resultStringArray[commandIndex]; bool success = false; bool queryFinished = false; @@ -304,7 +309,7 @@ ExecuteCommandsInParallelAndStoreResults(StringInfo *nodeNameArray, int *nodePor finishedCount++; statusArray[commandIndex] = success; connectionArray[commandIndex] = NULL; - CloseConnectionByPGconn(connection); + CloseConnection(connection); } } @@ -328,11 +333,11 @@ ExecuteCommandsInParallelAndStoreResults(StringInfo *nodeNameArray, int *nodePor * reported upon completion of the query. */ static bool -GetConnectionStatusAndResult(PGconn *connection, bool *resultStatus, +GetConnectionStatusAndResult(MultiConnection *connection, bool *resultStatus, StringInfo queryResultString) { bool finished = true; - ConnStatusType connectionStatus = PQstatus(connection); + ConnStatusType connectionStatus = PQstatus(connection->pgConn); int consumeInput = 0; PGresult *queryResult = NULL; bool success = false; @@ -346,7 +351,7 @@ GetConnectionStatusAndResult(PGconn *connection, bool *resultStatus, return finished; } - consumeInput = PQconsumeInput(connection); + consumeInput = PQconsumeInput(connection->pgConn); if (consumeInput == 0) { appendStringInfo(queryResultString, "query result unavailable"); @@ -354,14 +359,14 @@ GetConnectionStatusAndResult(PGconn *connection, bool *resultStatus, } /* check later if busy */ - if (PQisBusy(connection) != 0) + if (PQisBusy(connection->pgConn) != 0) { finished = false; return finished; } /* query result is available at this point */ - queryResult = PQgetResult(connection); + queryResult = PQgetResult(connection->pgConn); success = EvaluateQueryResult(connection, queryResult, queryResultString); PQclear(queryResult); @@ -379,7 +384,7 @@ GetConnectionStatusAndResult(PGconn *connection, bool *resultStatus, * error otherwise. */ static bool -EvaluateQueryResult(PGconn *connection, PGresult *queryResult, +EvaluateQueryResult(MultiConnection *connection, PGresult *queryResult, StringInfo queryResultString) { bool success = false; @@ -434,9 +439,9 @@ EvaluateQueryResult(PGconn *connection, PGresult *queryResult, * otherwise it would return a default error message. */ static void -StoreErrorMessage(PGconn *connection, StringInfo queryResultString) +StoreErrorMessage(MultiConnection *connection, StringInfo queryResultString) { - char *errorMessage = PQerrorMessage(connection); + char *errorMessage = PQerrorMessage(connection->pgConn); if (errorMessage != NULL) { char *firstNewlineIndex = strchr(errorMessage, '\n'); @@ -499,26 +504,27 @@ ExecuteRemoteQueryOrCommand(char *nodeName, uint32 nodePort, char *queryString, StringInfo queryResultString) { int connectionFlags = FORCE_NEW_CONNECTION; - MultiConnection *multiConnection = + MultiConnection *connection = GetNodeConnection(connectionFlags, nodeName, nodePort); - PGconn *nodeConnection = multiConnection->pgConn; bool success = false; PGresult *queryResult = NULL; + bool raiseInterrupts = true; - if (PQstatus(multiConnection->pgConn) != CONNECTION_OK) + if (PQstatus(connection->pgConn) != CONNECTION_OK) { appendStringInfo(queryResultString, "failed to connect to %s:%d", nodeName, (int) nodePort); return false; } - queryResult = PQexec(nodeConnection, queryString); - success = EvaluateQueryResult(nodeConnection, queryResult, queryResultString); + SendRemoteCommand(connection, queryString); + queryResult = GetRemoteCommandResult(connection, raiseInterrupts); + success = EvaluateQueryResult(connection, queryResult, queryResultString); PQclear(queryResult); /* close the connection */ - CloseConnection(multiConnection); + CloseConnection(connection); return success; }