Minimal citus tools conversion to new connection API.

pull/1155/head
Andres Freund 2017-01-20 22:49:34 -08:00
parent 92254ac6b2
commit 52c3369f79
1 changed files with 20 additions and 27 deletions

View File

@ -18,7 +18,6 @@
#include "access/htup_details.h" #include "access/htup_details.h"
#include "catalog/pg_type.h" #include "catalog/pg_type.h"
#include "distributed/connection_cache.h"
#include "distributed/connection_management.h" #include "distributed/connection_management.h"
#include "distributed/metadata_cache.h" #include "distributed/metadata_cache.h"
#include "distributed/multi_server_executor.h" #include "distributed/multi_server_executor.h"
@ -222,7 +221,6 @@ ExecuteCommandsInParallelAndStoreResults(StringInfo *nodeNameArray, int *nodePor
int commmandCount) int commmandCount)
{ {
int commandIndex = 0; int commandIndex = 0;
char *nodeUser = CurrentUserName();
PGconn **connectionArray = palloc0(commmandCount * sizeof(PGconn *)); PGconn **connectionArray = palloc0(commmandCount * sizeof(PGconn *));
int finishedCount = 0; int finishedCount = 0;
@ -231,20 +229,25 @@ ExecuteCommandsInParallelAndStoreResults(StringInfo *nodeNameArray, int *nodePor
{ {
char *nodeName = nodeNameArray[commandIndex]->data; char *nodeName = nodeNameArray[commandIndex]->data;
int nodePort = nodePortArray[commandIndex]; int nodePort = nodePortArray[commandIndex];
PGconn *connection = ConnectToNode(nodeName, nodePort, nodeUser); int connectionFlags = FORCE_NEW_CONNECTION;
MultiConnection *multiConnection =
GetNodeConnection(connectionFlags, nodeName, nodePort);
PGconn *connection = multiConnection->pgConn;
StringInfo queryResultString = resultStringArray[commandIndex]; StringInfo queryResultString = resultStringArray[commandIndex];
statusArray[commandIndex] = true; statusArray[commandIndex] = true;
connectionArray[commandIndex] = connection; if (PQstatus(multiConnection->pgConn) != CONNECTION_OK)
if (connection == NULL)
{ {
appendStringInfo(queryResultString, "failed to connect to %s:%d", nodeName, appendStringInfo(queryResultString, "failed to connect to %s:%d", nodeName,
(int) nodePort); (int) nodePort);
statusArray[commandIndex] = false; statusArray[commandIndex] = false;
finishedCount++; finishedCount++;
} }
else
{
connectionArray[commandIndex] = connection;
}
} }
/* send queries at once */ /* send queries at once */
@ -491,37 +494,27 @@ static bool
ExecuteRemoteQueryOrCommand(char *nodeName, uint32 nodePort, char *queryString, ExecuteRemoteQueryOrCommand(char *nodeName, uint32 nodePort, char *queryString,
StringInfo queryResultString) StringInfo queryResultString)
{ {
char *nodeUser = CurrentUserName(); int connectionFlags = FORCE_NEW_CONNECTION;
PGconn *nodeConnection = ConnectToNode(nodeName, nodePort, nodeUser); MultiConnection *multiConnection =
GetNodeConnection(connectionFlags, nodeName, nodePort);
PGconn *nodeConnection = multiConnection->pgConn;
bool success = false; bool success = false;
PGresult *queryResult = NULL;
if (nodeConnection == NULL) if (PQstatus(multiConnection->pgConn) != CONNECTION_OK)
{ {
appendStringInfo(queryResultString, "failed to connect to %s:%d", nodeName, appendStringInfo(queryResultString, "failed to connect to %s:%d", nodeName,
(int) nodePort); (int) nodePort);
return false; return false;
} }
PG_TRY(); queryResult = PQexec(nodeConnection, queryString);
{
PGresult *queryResult = PQexec(nodeConnection, queryString);
success = EvaluateQueryResult(nodeConnection, queryResult, queryResultString); success = EvaluateQueryResult(nodeConnection, queryResult, queryResultString);
PQclear(queryResult); PQclear(queryResult);
/* close the connection */ /* close the connection */
CloseConnectionByPGconn(nodeConnection); CloseConnection(multiConnection);
nodeConnection = NULL;
}
PG_CATCH();
{
StoreErrorMessage(nodeConnection, queryResultString);
/* close the connection */
CloseConnectionByPGconn(nodeConnection);
nodeConnection = NULL;
}
PG_END_TRY();
return success; return success;
} }