diff --git a/src/backend/distributed/worker/worker_data_fetch_protocol.c b/src/backend/distributed/worker/worker_data_fetch_protocol.c index f86b20726..c89458211 100644 --- a/src/backend/distributed/worker/worker_data_fetch_protocol.c +++ b/src/backend/distributed/worker/worker_data_fetch_protocol.c @@ -13,6 +13,7 @@ */ #include "postgres.h" +#include "libpq-fe.h" #include "funcapi.h" #include "miscadmin.h" #include @@ -26,12 +27,14 @@ #include "commands/extension.h" #include "commands/sequence.h" #include "distributed/citus_ruleutils.h" +#include "distributed/connection_management.h" #include "distributed/master_protocol.h" #include "distributed/metadata_cache.h" #include "distributed/multi_client_executor.h" #include "distributed/multi_logical_optimizer.h" #include "distributed/multi_server_executor.h" #include "distributed/relay_utility.h" +#include "distributed/remote_commands.h" #include "distributed/resource_lock.h" #include "distributed/task_tracker.h" #include "distributed/worker_protocol.h" @@ -1015,59 +1018,44 @@ List * ExecuteRemoteQuery(const char *nodeName, uint32 nodePort, char *runAsUser, StringInfo queryString) { - int32 connectionId = -1; - bool querySent = false; - bool queryReady = false; - bool queryOK = false; - void *queryResult = NULL; + int querySent = 0; int rowCount = 0; int rowIndex = 0; - int columnCount = 0; + int PG_USED_FOR_ASSERTS_ONLY columnCount = 0; List *resultList = NIL; + int connectionFlags = FORCE_NEW_CONNECTION; + MultiConnection *multiConnection = GetNodeUserDatabaseConnection(connectionFlags, + nodeName, nodePort, + runAsUser, NULL); + PGresult *queryResult = NULL; + ExecStatusType resultStatus = PGRES_COMMAND_OK; - connectionId = MultiClientConnect(nodeName, nodePort, NULL, runAsUser); - if (connectionId == INVALID_CONNECTION_ID) + querySent = ExecuteOptionalRemoteCommand(multiConnection, queryString->data, + &queryResult); + if (querySent != 0) { + CloseConnection(multiConnection); return NIL; } - querySent = MultiClientSendQuery(connectionId, queryString->data); - if (!querySent) + resultStatus = PQresultStatus(queryResult); + if (resultStatus == PGRES_TUPLES_OK) { - MultiClientDisconnect(connectionId); - return NIL; + rowCount = PQntuples(queryResult); + columnCount = PQnfields(queryResult); } - - while (!queryReady) + else { - ResultStatus resultStatus = MultiClientResultStatus(connectionId); - if (resultStatus == CLIENT_RESULT_READY) - { - queryReady = true; - } - else if (resultStatus == CLIENT_RESULT_BUSY) - { - long sleepIntervalPerCycle = RemoteTaskCheckInterval * 1000L; - pg_usleep(sleepIntervalPerCycle); - } - else - { - MultiClientDisconnect(connectionId); - return NIL; - } - } - - queryOK = MultiClientQueryResult(connectionId, &queryResult, &rowCount, &columnCount); - if (!queryOK) - { - MultiClientDisconnect(connectionId); + ReportResultError(multiConnection, queryResult, WARNING); + PQclear(queryResult); + CloseConnection(multiConnection); return NIL; } for (rowIndex = 0; rowIndex < rowCount; rowIndex++) { const int columnIndex = 0; - char *rowValue = MultiClientGetValue(queryResult, rowIndex, columnIndex); + char *rowValue = PQgetvalue(queryResult, rowIndex, columnIndex); StringInfo rowValueString = makeStringInfo(); appendStringInfoString(rowValueString, rowValue); @@ -1076,8 +1064,9 @@ ExecuteRemoteQuery(const char *nodeName, uint32 nodePort, char *runAsUser, resultList = lappend(resultList, rowValueString); } - MultiClientClearResult(queryResult); - MultiClientDisconnect(connectionId); + PQclear(queryResult); + ForgetResults(multiConnection); + CloseConnection(multiConnection); return resultList; } @@ -1092,52 +1081,41 @@ ExecuteRemoteQuery(const char *nodeName, uint32 nodePort, char *runAsUser, bool ExecuteRemoteCommand(const char *nodeName, uint32 nodePort, StringInfo queryString) { - char *nodeDatabase = get_database_name(MyDatabaseId); - int32 connectionId = -1; - QueryStatus queryStatus = CLIENT_INVALID_QUERY; bool querySent = false; - bool queryReady = false; bool queryDone = false; + int connectionFlags = FORCE_NEW_CONNECTION; + MultiConnection *multiConnection = + GetNodeConnection(connectionFlags, nodeName, nodePort); + PGresult *queryResult = NULL; + ExecStatusType resultStatus = PGRES_COMMAND_OK; - connectionId = MultiClientConnect(nodeName, nodePort, nodeDatabase, NULL); - if (connectionId == INVALID_CONNECTION_ID) + querySent = ExecuteOptionalRemoteCommand(multiConnection, queryString->data, + &queryResult); + if (querySent != 0) { + CloseConnection(multiConnection); return false; } - querySent = MultiClientSendQuery(connectionId, queryString->data); - if (!querySent) - { - MultiClientDisconnect(connectionId); - return false; - } - - while (!queryReady) - { - ResultStatus resultStatus = MultiClientResultStatus(connectionId); - if (resultStatus == CLIENT_RESULT_READY) - { - queryReady = true; - } - else if (resultStatus == CLIENT_RESULT_BUSY) - { - long sleepIntervalPerCycle = RemoteTaskCheckInterval * 1000L; - pg_usleep(sleepIntervalPerCycle); - } - else - { - MultiClientDisconnect(connectionId); - return false; - } - } - - queryStatus = MultiClientQueryStatus(connectionId); - if (queryStatus == CLIENT_QUERY_DONE) + resultStatus = PQresultStatus(queryResult); + if (resultStatus == PGRES_COMMAND_OK) { queryDone = true; } + else if (resultStatus == PGRES_TUPLES_OK) + { + /* + * We use the client executor to only issue a select query that returns + * a void value. We therefore should not have more than one value here. + */ + int PG_USED_FOR_ASSERTS_ONLY tupleCount = PQntuples(queryResult); + Assert(tupleCount <= 1); + + queryDone = true; + } + + CloseConnection(multiConnection); - MultiClientDisconnect(connectionId); return queryDone; }