pull/1201/merge
Eren Başak 2017-06-08 15:43:18 +00:00 committed by GitHub
commit ff79c51bbb
2 changed files with 43 additions and 44 deletions

View File

@ -15,6 +15,7 @@
#include "postgres.h" #include "postgres.h"
#include "funcapi.h" #include "funcapi.h"
#include "miscadmin.h" #include "miscadmin.h"
#include "libpq-fe.h"
#include <unistd.h> #include <unistd.h>
#include <sys/stat.h> #include <sys/stat.h>
@ -26,12 +27,14 @@
#include "commands/extension.h" #include "commands/extension.h"
#include "commands/sequence.h" #include "commands/sequence.h"
#include "distributed/citus_ruleutils.h" #include "distributed/citus_ruleutils.h"
#include "distributed/connection_management.h"
#include "distributed/master_protocol.h" #include "distributed/master_protocol.h"
#include "distributed/metadata_cache.h" #include "distributed/metadata_cache.h"
#include "distributed/multi_client_executor.h" #include "distributed/multi_client_executor.h"
#include "distributed/multi_logical_optimizer.h" #include "distributed/multi_logical_optimizer.h"
#include "distributed/multi_server_executor.h" #include "distributed/multi_server_executor.h"
#include "distributed/relay_utility.h" #include "distributed/relay_utility.h"
#include "distributed/remote_commands.h"
#include "distributed/resource_lock.h" #include "distributed/resource_lock.h"
#include "distributed/task_tracker.h" #include "distributed/task_tracker.h"
#include "distributed/worker_protocol.h" #include "distributed/worker_protocol.h"
@ -1024,78 +1027,75 @@ ForeignFilePath(const char *nodeName, uint32 nodePort, const char *tableName)
/* /*
* ExecuteRemoteQuery executes the given query, copies the query's results to a * ExecuteRemoteQuery executes the given query, copies the query's results to a
* sorted list, and returns this list. The function assumes that query results * sorted list, and returns this list. The function assumes that query results
* have a single column, and asserts on that assumption. If results are empty, * have a single column and ignores other columns there are more than one. If
* or an error occurs during query runtime, the function returns an empty list. * an error occurs during query runtime, the function errors out. If userName is NULL,
* If asUser is NULL the connection is established as the current user, * the connection is established as the current user, otherwise as the specified user.
* otherwise as the specified user.
*/ */
List * List *
ExecuteRemoteQuery(const char *nodeName, uint32 nodePort, char *runAsUser, ExecuteRemoteQuery(const char *nodeName, uint32 nodePort, char *userName,
StringInfo queryString) StringInfo queryString)
{ {
int32 connectionId = -1; MultiConnection *connection = NULL;
bool querySent = false; uint32 connectionFlags = 0;
bool queryReady = false; PGresult *queryResult = NULL;
bool queryOK = false; ExecStatusType resultStatus = PGRES_COMMAND_OK;
void *queryResult = NULL;
int rowCount = 0; int rowCount = 0;
int rowIndex = 0; int rowIndex = 0;
int columnCount = 0; int querySent = 0;
List *resultList = NIL; List *resultList = NIL;
connectionId = MultiClientConnect(nodeName, nodePort, NULL, runAsUser); /* get the connection to the node */
if (connectionId == INVALID_CONNECTION_ID) connection = GetNodeUserDatabaseConnection(connectionFlags, nodeName, nodePort,
userName, NULL);
RemoteTransactionBeginIfNecessary(connection);
/* check the status of the connection */
if (PQstatus(connection->pgConn) != CONNECTION_OK)
{ {
return NIL; ereport(ERROR, (errmsg("could not connect to node \"%s:%u\"",
nodeName, nodePort)));
} }
querySent = MultiClientSendQuery(connectionId, queryString->data); /* send the query and check if it is sent properly */
if (!querySent) querySent = SendRemoteCommand(connection, queryString->data);
if (querySent == 0)
{ {
MultiClientDisconnect(connectionId); ReportConnectionError(connection, ERROR);
return NIL;
} }
while (!queryReady) /* check if the results are fetched without a problem */
queryResult = GetRemoteCommandResult(connection, false);
if (!IsResponseOK(queryResult))
{ {
ResultStatus resultStatus = MultiClientResultStatus(connectionId); ReportResultError(connection, queryResult, ERROR);
if (resultStatus == CLIENT_RESULT_READY)
{
queryReady = true;
}
else if (resultStatus == CLIENT_RESULT_BUSY)
{
long sleepIntervalPerCycle = RemoteTaskCheckInterval * 1000L;
pg_usleep(sleepIntervalPerCycle);
}
else
{
MultiClientDisconnect(connectionId);
return NIL;
}
} }
queryOK = MultiClientQueryResult(connectionId, &queryResult, &rowCount, &columnCount); /* treat the bad results as an empty result ret */
if (!queryOK) resultStatus = PQresultStatus(queryResult);
if (resultStatus != PGRES_TUPLES_OK)
{ {
MultiClientDisconnect(connectionId); ereport(ERROR, (errmsg("invalid query results from node \"%s:%u\"",
return NIL; nodeName, nodePort),
errdetail("Failed query: %s", queryString->data)));
} }
/* iterate over the results and add them to the result list as strings */
rowCount = PQntuples(queryResult);
for (rowIndex = 0; rowIndex < rowCount; rowIndex++) for (rowIndex = 0; rowIndex < rowCount; rowIndex++)
{ {
const int columnIndex = 0; const int columnIndex = 0;
char *rowValue = MultiClientGetValue(queryResult, rowIndex, columnIndex); char *rowValue = PQgetvalue(queryResult, rowIndex, columnIndex);
StringInfo rowValueString = makeStringInfo(); StringInfo rowValueString = makeStringInfo();
appendStringInfoString(rowValueString, rowValue); appendStringInfoString(rowValueString, rowValue);
Assert(columnCount == 1);
resultList = lappend(resultList, rowValueString); resultList = lappend(resultList, rowValueString);
} }
MultiClientClearResult(queryResult); /* clean-up the connection to allow future queries */
MultiClientDisconnect(connectionId); PQclear(queryResult);
ForgetResults(connection);
return resultList; return resultList;
} }

View File

@ -472,9 +472,8 @@ COPY customer_worker_copy_append FROM '@abs_srcdir@/data/customer.1.data' with (
COPY customer_worker_copy_append FROM '@abs_srcdir@/data/customer.2.data' with (delimiter '|', master_host 'localhost', master_port 57636); COPY customer_worker_copy_append FROM '@abs_srcdir@/data/customer.2.data' with (delimiter '|', master_host 'localhost', master_port 57636);
-- Test if there is no relation to copy data with the worker copy -- Test if there is no relation to copy data with the worker copy
COPY lineitem_copy_none FROM '@abs_srcdir@/data/lineitem.1.data' with (delimiter '|', master_host 'localhost', master_port 57636); COPY lineitem_copy_none FROM '@abs_srcdir@/data/lineitem.1.data' with (delimiter '|', master_host 'localhost', master_port 57636);
WARNING: relation "lineitem_copy_none" does not exist ERROR: relation "lineitem_copy_none" does not exist
CONTEXT: while executing command on localhost:57636 CONTEXT: while executing command on localhost:57636
ERROR: could not run copy from the worker node
-- Connect back to the master node -- Connect back to the master node
\c - - - 57636 \c - - - 57636
-- Test the content of the table -- Test the content of the table