Convert ExecuteRemoteCommand and ExecuteRemoteQuery to new connection api

pull/1229/head
Murat Tuncer 2017-02-13 15:57:47 +03:00 committed by Metin Doslu
parent 701aaccd9c
commit df00b725a4
1 changed files with 51 additions and 73 deletions

View File

@ -13,6 +13,7 @@
*/ */
#include "postgres.h" #include "postgres.h"
#include "libpq-fe.h"
#include "funcapi.h" #include "funcapi.h"
#include "miscadmin.h" #include "miscadmin.h"
#include <unistd.h> #include <unistd.h>
@ -26,12 +27,14 @@
#include "commands/extension.h" #include "commands/extension.h"
#include "commands/sequence.h" #include "commands/sequence.h"
#include "distributed/citus_ruleutils.h" #include "distributed/citus_ruleutils.h"
#include "distributed/connection_management.h"
#include "distributed/master_protocol.h" #include "distributed/master_protocol.h"
#include "distributed/metadata_cache.h" #include "distributed/metadata_cache.h"
#include "distributed/multi_client_executor.h" #include "distributed/multi_client_executor.h"
#include "distributed/multi_logical_optimizer.h" #include "distributed/multi_logical_optimizer.h"
#include "distributed/multi_server_executor.h" #include "distributed/multi_server_executor.h"
#include "distributed/relay_utility.h" #include "distributed/relay_utility.h"
#include "distributed/remote_commands.h"
#include "distributed/resource_lock.h" #include "distributed/resource_lock.h"
#include "distributed/task_tracker.h" #include "distributed/task_tracker.h"
#include "distributed/worker_protocol.h" #include "distributed/worker_protocol.h"
@ -1015,59 +1018,44 @@ List *
ExecuteRemoteQuery(const char *nodeName, uint32 nodePort, char *runAsUser, ExecuteRemoteQuery(const char *nodeName, uint32 nodePort, char *runAsUser,
StringInfo queryString) StringInfo queryString)
{ {
int32 connectionId = -1; int querySent = 0;
bool querySent = false;
bool queryReady = false;
bool queryOK = false;
void *queryResult = NULL;
int rowCount = 0; int rowCount = 0;
int rowIndex = 0; int rowIndex = 0;
int columnCount = 0; int PG_USED_FOR_ASSERTS_ONLY columnCount = 0;
List *resultList = NIL; List *resultList = NIL;
int connectionFlags = FORCE_NEW_CONNECTION;
MultiConnection *multiConnection = GetNodeUserDatabaseConnection(connectionFlags,
nodeName, nodePort,
runAsUser, NULL);
PGresult *queryResult = NULL;
ExecStatusType resultStatus = PGRES_COMMAND_OK;
connectionId = MultiClientConnect(nodeName, nodePort, NULL, runAsUser); querySent = ExecuteOptionalRemoteCommand(multiConnection, queryString->data,
if (connectionId == INVALID_CONNECTION_ID) &queryResult);
if (querySent != 0)
{ {
CloseConnection(multiConnection);
return NIL; return NIL;
} }
querySent = MultiClientSendQuery(connectionId, queryString->data); resultStatus = PQresultStatus(queryResult);
if (!querySent) if (resultStatus == PGRES_TUPLES_OK)
{ {
MultiClientDisconnect(connectionId); rowCount = PQntuples(queryResult);
return NIL; columnCount = PQnfields(queryResult);
} }
else
while (!queryReady)
{ {
ResultStatus resultStatus = MultiClientResultStatus(connectionId); ReportResultError(multiConnection, queryResult, WARNING);
if (resultStatus == CLIENT_RESULT_READY) PQclear(queryResult);
{ CloseConnection(multiConnection);
queryReady = true;
}
else if (resultStatus == CLIENT_RESULT_BUSY)
{
long sleepIntervalPerCycle = RemoteTaskCheckInterval * 1000L;
pg_usleep(sleepIntervalPerCycle);
}
else
{
MultiClientDisconnect(connectionId);
return NIL;
}
}
queryOK = MultiClientQueryResult(connectionId, &queryResult, &rowCount, &columnCount);
if (!queryOK)
{
MultiClientDisconnect(connectionId);
return NIL; return NIL;
} }
for (rowIndex = 0; rowIndex < rowCount; rowIndex++) for (rowIndex = 0; rowIndex < rowCount; rowIndex++)
{ {
const int columnIndex = 0; const int columnIndex = 0;
char *rowValue = MultiClientGetValue(queryResult, rowIndex, columnIndex); char *rowValue = PQgetvalue(queryResult, rowIndex, columnIndex);
StringInfo rowValueString = makeStringInfo(); StringInfo rowValueString = makeStringInfo();
appendStringInfoString(rowValueString, rowValue); appendStringInfoString(rowValueString, rowValue);
@ -1076,8 +1064,9 @@ ExecuteRemoteQuery(const char *nodeName, uint32 nodePort, char *runAsUser,
resultList = lappend(resultList, rowValueString); resultList = lappend(resultList, rowValueString);
} }
MultiClientClearResult(queryResult); PQclear(queryResult);
MultiClientDisconnect(connectionId); ForgetResults(multiConnection);
CloseConnection(multiConnection);
return resultList; return resultList;
} }
@ -1092,52 +1081,41 @@ ExecuteRemoteQuery(const char *nodeName, uint32 nodePort, char *runAsUser,
bool bool
ExecuteRemoteCommand(const char *nodeName, uint32 nodePort, StringInfo queryString) ExecuteRemoteCommand(const char *nodeName, uint32 nodePort, StringInfo queryString)
{ {
char *nodeDatabase = get_database_name(MyDatabaseId);
int32 connectionId = -1;
QueryStatus queryStatus = CLIENT_INVALID_QUERY;
bool querySent = false; bool querySent = false;
bool queryReady = false;
bool queryDone = false; bool queryDone = false;
int connectionFlags = FORCE_NEW_CONNECTION;
MultiConnection *multiConnection =
GetNodeConnection(connectionFlags, nodeName, nodePort);
PGresult *queryResult = NULL;
ExecStatusType resultStatus = PGRES_COMMAND_OK;
connectionId = MultiClientConnect(nodeName, nodePort, nodeDatabase, NULL); querySent = ExecuteOptionalRemoteCommand(multiConnection, queryString->data,
if (connectionId == INVALID_CONNECTION_ID) &queryResult);
if (querySent != 0)
{ {
CloseConnection(multiConnection);
return false; return false;
} }
querySent = MultiClientSendQuery(connectionId, queryString->data); resultStatus = PQresultStatus(queryResult);
if (!querySent) if (resultStatus == PGRES_COMMAND_OK)
{
MultiClientDisconnect(connectionId);
return false;
}
while (!queryReady)
{
ResultStatus resultStatus = MultiClientResultStatus(connectionId);
if (resultStatus == CLIENT_RESULT_READY)
{
queryReady = true;
}
else if (resultStatus == CLIENT_RESULT_BUSY)
{
long sleepIntervalPerCycle = RemoteTaskCheckInterval * 1000L;
pg_usleep(sleepIntervalPerCycle);
}
else
{
MultiClientDisconnect(connectionId);
return false;
}
}
queryStatus = MultiClientQueryStatus(connectionId);
if (queryStatus == CLIENT_QUERY_DONE)
{ {
queryDone = true; queryDone = true;
} }
else if (resultStatus == PGRES_TUPLES_OK)
{
/*
* We use the client executor to only issue a select query that returns
* a void value. We therefore should not have more than one value here.
*/
int PG_USED_FOR_ASSERTS_ONLY tupleCount = PQntuples(queryResult);
Assert(tupleCount <= 1);
queryDone = true;
}
CloseConnection(multiConnection);
MultiClientDisconnect(connectionId);
return queryDone; return queryDone;
} }