diff --git a/src/backend/distributed/worker/worker_data_fetch_protocol.c b/src/backend/distributed/worker/worker_data_fetch_protocol.c index 4e1045f57..cf5ff7806 100644 --- a/src/backend/distributed/worker/worker_data_fetch_protocol.c +++ b/src/backend/distributed/worker/worker_data_fetch_protocol.c @@ -15,6 +15,7 @@ #include "postgres.h" #include "funcapi.h" #include "miscadmin.h" +#include "libpq-fe.h" #include #include @@ -26,12 +27,14 @@ #include "commands/extension.h" #include "commands/sequence.h" #include "distributed/citus_ruleutils.h" +#include "distributed/connection_management.h" #include "distributed/master_protocol.h" #include "distributed/metadata_cache.h" #include "distributed/multi_client_executor.h" #include "distributed/multi_logical_optimizer.h" #include "distributed/multi_server_executor.h" #include "distributed/relay_utility.h" +#include "distributed/remote_commands.h" #include "distributed/resource_lock.h" #include "distributed/task_tracker.h" #include "distributed/worker_protocol.h" @@ -1024,78 +1027,75 @@ ForeignFilePath(const char *nodeName, uint32 nodePort, const char *tableName) /* * ExecuteRemoteQuery executes the given query, copies the query's results to a * sorted list, and returns this list. The function assumes that query results - * have a single column, and asserts on that assumption. If results are empty, - * or an error occurs during query runtime, the function returns an empty list. - * If asUser is NULL the connection is established as the current user, - * otherwise as the specified user. + * have a single column and ignores other columns there are more than one. If + * an error occurs during query runtime, the function errors out. If userName is NULL, + * the connection is established as the current user, otherwise as the specified user. */ List * -ExecuteRemoteQuery(const char *nodeName, uint32 nodePort, char *runAsUser, +ExecuteRemoteQuery(const char *nodeName, uint32 nodePort, char *userName, StringInfo queryString) { - int32 connectionId = -1; - bool querySent = false; - bool queryReady = false; - bool queryOK = false; - void *queryResult = NULL; + MultiConnection *connection = NULL; + uint32 connectionFlags = 0; + PGresult *queryResult = NULL; + ExecStatusType resultStatus = PGRES_COMMAND_OK; int rowCount = 0; int rowIndex = 0; - int columnCount = 0; + int querySent = 0; List *resultList = NIL; - connectionId = MultiClientConnect(nodeName, nodePort, NULL, runAsUser); - if (connectionId == INVALID_CONNECTION_ID) + /* get the connection to the node */ + connection = GetNodeUserDatabaseConnection(connectionFlags, nodeName, nodePort, + userName, NULL); + + RemoteTransactionBeginIfNecessary(connection); + + /* check the status of the connection */ + if (PQstatus(connection->pgConn) != CONNECTION_OK) { - return NIL; + ereport(ERROR, (errmsg("could not connect to node \"%s:%u\"", + nodeName, nodePort))); } - querySent = MultiClientSendQuery(connectionId, queryString->data); - if (!querySent) + /* send the query and check if it is sent properly */ + querySent = SendRemoteCommand(connection, queryString->data); + if (querySent == 0) { - MultiClientDisconnect(connectionId); - return NIL; + ReportConnectionError(connection, ERROR); } - while (!queryReady) + /* check if the results are fetched without a problem */ + queryResult = GetRemoteCommandResult(connection, false); + if (!IsResponseOK(queryResult)) { - ResultStatus resultStatus = MultiClientResultStatus(connectionId); - if (resultStatus == CLIENT_RESULT_READY) - { - queryReady = true; - } - else if (resultStatus == CLIENT_RESULT_BUSY) - { - long sleepIntervalPerCycle = RemoteTaskCheckInterval * 1000L; - pg_usleep(sleepIntervalPerCycle); - } - else - { - MultiClientDisconnect(connectionId); - return NIL; - } + ReportResultError(connection, queryResult, ERROR); } - queryOK = MultiClientQueryResult(connectionId, &queryResult, &rowCount, &columnCount); - if (!queryOK) + /* treat the bad results as an empty result ret */ + resultStatus = PQresultStatus(queryResult); + if (resultStatus != PGRES_TUPLES_OK) { - MultiClientDisconnect(connectionId); - return NIL; + ereport(ERROR, (errmsg("invalid query results from node \"%s:%u\"", + nodeName, nodePort), + errdetail("Failed query: %s", queryString->data))); } + /* iterate over the results and add them to the result list as strings */ + rowCount = PQntuples(queryResult); for (rowIndex = 0; rowIndex < rowCount; rowIndex++) { const int columnIndex = 0; - char *rowValue = MultiClientGetValue(queryResult, rowIndex, columnIndex); + char *rowValue = PQgetvalue(queryResult, rowIndex, columnIndex); StringInfo rowValueString = makeStringInfo(); appendStringInfoString(rowValueString, rowValue); - Assert(columnCount == 1); resultList = lappend(resultList, rowValueString); } - MultiClientClearResult(queryResult); - MultiClientDisconnect(connectionId); + /* clean-up the connection to allow future queries */ + PQclear(queryResult); + ForgetResults(connection); return resultList; } diff --git a/src/test/regress/output/multi_copy.source b/src/test/regress/output/multi_copy.source index 3a68f0f21..f4071f37c 100644 --- a/src/test/regress/output/multi_copy.source +++ b/src/test/regress/output/multi_copy.source @@ -472,9 +472,8 @@ COPY customer_worker_copy_append FROM '@abs_srcdir@/data/customer.1.data' with ( COPY customer_worker_copy_append FROM '@abs_srcdir@/data/customer.2.data' with (delimiter '|', master_host 'localhost', master_port 57636); -- Test if there is no relation to copy data with the worker copy COPY lineitem_copy_none FROM '@abs_srcdir@/data/lineitem.1.data' with (delimiter '|', master_host 'localhost', master_port 57636); -WARNING: relation "lineitem_copy_none" does not exist +ERROR: relation "lineitem_copy_none" does not exist CONTEXT: while executing command on localhost:57636 -ERROR: could not run copy from the worker node -- Connect back to the master node \c - - - 57636 -- Test the content of the table