Merge pull request #1454 from citusdata/phase_out_execute_remote_query

Phase out execute remote query and command
pull/1416/head
Burak Velioglu 2017-06-16 11:48:12 +03:00 committed by GitHub
commit bfde7fcd5a
4 changed files with 45 additions and 68 deletions

View File

@ -15,12 +15,15 @@
#include "postgres.h" #include "postgres.h"
#include "funcapi.h" #include "funcapi.h"
#include "libpq-fe.h"
#include "catalog/pg_class.h" #include "catalog/pg_class.h"
#include "distributed/connection_management.h"
#include "distributed/master_protocol.h" #include "distributed/master_protocol.h"
#include "distributed/metadata_cache.h" #include "distributed/metadata_cache.h"
#include "distributed/multi_join_order.h" #include "distributed/multi_join_order.h"
#include "distributed/pg_dist_shard.h" #include "distributed/pg_dist_shard.h"
#include "distributed/remote_commands.h"
#include "distributed/worker_manager.h" #include "distributed/worker_manager.h"
#include "distributed/worker_protocol.h" #include "distributed/worker_protocol.h"
#include "utils/builtins.h" #include "utils/builtins.h"
@ -154,6 +157,9 @@ DropShardsFromWorker(WorkerNode *workerNode, Oid relationId, List *shardInterval
StringInfo workerCommand = makeStringInfo(); StringInfo workerCommand = makeStringInfo();
StringInfo shardNames = makeStringInfo(); StringInfo shardNames = makeStringInfo();
ListCell *shardIntervalCell = NULL; ListCell *shardIntervalCell = NULL;
MultiConnection *connection = NULL;
int connectionFlag = FORCE_NEW_CONNECTION;
PGresult *result = NULL;
if (shardIntervalList == NIL) if (shardIntervalList == NIL)
{ {
@ -191,5 +197,7 @@ DropShardsFromWorker(WorkerNode *workerNode, Oid relationId, List *shardInterval
errmsg("expire target is not a regular or foreign table"))); errmsg("expire target is not a regular or foreign table")));
} }
ExecuteRemoteCommand(workerNode->workerName, workerNode->workerPort, workerCommand); connection = GetNodeConnection(connectionFlag, workerNode->workerName,
workerNode->workerPort);
ExecuteOptionalRemoteCommand(connection, workerCommand->data, &result);
} }

View File

@ -190,8 +190,12 @@ DistributedTableSizeOnWorker(WorkerNode *workerNode, Oid relationId, char *sizeQ
char *workerNodeName = workerNode->workerName; char *workerNodeName = workerNode->workerName;
uint32 workerNodePort = workerNode->workerPort; uint32 workerNodePort = workerNode->workerPort;
char *tableSizeString; char *tableSizeString;
List *sizeList = NIL;
uint64 tableSize = 0; uint64 tableSize = 0;
MultiConnection *connection = NULL;
uint32 connectionFlag = FORCE_NEW_CONNECTION;
PGresult *result = NULL;
int queryResult = 0;
List *sizeList = NIL;
List *shardIntervalsOnNode = ShardIntervalsOnWorkerNode(workerNode, relationId); List *shardIntervalsOnNode = ShardIntervalsOnWorkerNode(workerNode, relationId);
@ -199,14 +203,16 @@ DistributedTableSizeOnWorker(WorkerNode *workerNode, Oid relationId, char *sizeQ
shardIntervalsOnNode, shardIntervalsOnNode,
sizeQuery); sizeQuery);
sizeList = ExecuteRemoteQuery(workerNodeName, workerNodePort, NULL, tableSizeQuery); connection = GetNodeConnection(connectionFlag, workerNodeName, workerNodePort);
queryResult = ExecuteOptionalRemoteCommand(connection, tableSizeQuery->data, &result);
if (sizeList == NIL) if (queryResult != 0)
{ {
ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE), ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE),
errmsg("cannot get the size because of a connection error"))); errmsg("cannot get the size because of a connection error")));
} }
sizeList = ReadFirstColumnAsText(result);
tableSizeStringInfo = (StringInfo) linitial(sizeList); tableSizeStringInfo = (StringInfo) linitial(sizeList);
tableSizeString = tableSizeStringInfo->data; tableSizeString = tableSizeStringInfo->data;
tableSize = atol(tableSizeString); tableSize = atol(tableSizeString);

View File

@ -14,6 +14,7 @@
#include "postgres.h" #include "postgres.h"
#include "funcapi.h" #include "funcapi.h"
#include "libpq-fe.h"
#include "miscadmin.h" #include "miscadmin.h"
#include <unistd.h> #include <unistd.h>
#include <sys/stat.h> #include <sys/stat.h>
@ -26,12 +27,14 @@
#include "commands/extension.h" #include "commands/extension.h"
#include "commands/sequence.h" #include "commands/sequence.h"
#include "distributed/citus_ruleutils.h" #include "distributed/citus_ruleutils.h"
#include "distributed/connection_management.h"
#include "distributed/master_protocol.h" #include "distributed/master_protocol.h"
#include "distributed/metadata_cache.h" #include "distributed/metadata_cache.h"
#include "distributed/multi_client_executor.h" #include "distributed/multi_client_executor.h"
#include "distributed/multi_logical_optimizer.h" #include "distributed/multi_logical_optimizer.h"
#include "distributed/multi_server_executor.h" #include "distributed/multi_server_executor.h"
#include "distributed/relay_utility.h" #include "distributed/relay_utility.h"
#include "distributed/remote_commands.h"
#include "distributed/resource_lock.h" #include "distributed/resource_lock.h"
#include "distributed/task_tracker.h" #include "distributed/task_tracker.h"
#include "distributed/worker_protocol.h" #include "distributed/worker_protocol.h"
@ -961,11 +964,17 @@ RemoteTableOwner(const char *nodeName, uint32 nodePort, const char *tableName)
List *ownerList = NIL; List *ownerList = NIL;
StringInfo queryString = NULL; StringInfo queryString = NULL;
StringInfo relationOwner; StringInfo relationOwner;
MultiConnection *connection = NULL;
uint32 connectionFlag = FORCE_NEW_CONNECTION;
PGresult *result = NULL;
queryString = makeStringInfo(); queryString = makeStringInfo();
appendStringInfo(queryString, GET_TABLE_OWNER, tableName); appendStringInfo(queryString, GET_TABLE_OWNER, tableName);
connection = GetNodeConnection(connectionFlag, nodeName, nodePort);
ownerList = ExecuteRemoteQuery(nodeName, nodePort, NULL, queryString); ExecuteOptionalRemoteCommand(connection, queryString->data, &result);
ownerList = ReadFirstColumnAsText(result);
if (list_length(ownerList) != 1) if (list_length(ownerList) != 1)
{ {
return NULL; return NULL;
@ -987,11 +996,20 @@ TableDDLCommandList(const char *nodeName, uint32 nodePort, const char *tableName
{ {
List *ddlCommandList = NIL; List *ddlCommandList = NIL;
StringInfo queryString = NULL; StringInfo queryString = NULL;
MultiConnection *connection = NULL;
PGresult *result = NULL;
uint32 connectionFlag = FORCE_NEW_CONNECTION;
queryString = makeStringInfo(); queryString = makeStringInfo();
appendStringInfo(queryString, GET_TABLE_DDL_EVENTS, tableName); appendStringInfo(queryString, GET_TABLE_DDL_EVENTS, tableName);
connection = GetNodeConnection(connectionFlag, nodeName, nodePort);
ExecuteOptionalRemoteCommand(connection, queryString->data, &result);
ddlCommandList = ReadFirstColumnAsText(result);
ForgetResults(connection);
CloseConnection(connection);
ddlCommandList = ExecuteRemoteQuery(nodeName, nodePort, NULL, queryString);
return ddlCommandList; return ddlCommandList;
} }
@ -1007,11 +1025,17 @@ ForeignFilePath(const char *nodeName, uint32 nodePort, const char *tableName)
List *foreignPathList = NIL; List *foreignPathList = NIL;
StringInfo foreignPathCommand = NULL; StringInfo foreignPathCommand = NULL;
StringInfo foreignPath = NULL; StringInfo foreignPath = NULL;
MultiConnection *connection = NULL;
PGresult *result = NULL;
int connectionFlag = FORCE_NEW_CONNECTION;
foreignPathCommand = makeStringInfo(); foreignPathCommand = makeStringInfo();
appendStringInfo(foreignPathCommand, FOREIGN_FILE_PATH_COMMAND, tableName); appendStringInfo(foreignPathCommand, FOREIGN_FILE_PATH_COMMAND, tableName);
connection = GetNodeConnection(connectionFlag, nodeName, nodePort);
foreignPathList = ExecuteRemoteQuery(nodeName, nodePort, NULL, foreignPathCommand); ExecuteOptionalRemoteCommand(connection, foreignPathCommand->data, &result);
foreignPathList = ReadFirstColumnAsText(result);
if (foreignPathList != NIL) if (foreignPathList != NIL)
{ {
foreignPath = (StringInfo) linitial(foreignPathList); foreignPath = (StringInfo) linitial(foreignPathList);
@ -1101,65 +1125,6 @@ ExecuteRemoteQuery(const char *nodeName, uint32 nodePort, char *runAsUser,
} }
/*
* ExecuteRemoteCommand executes the given SQL command. This command could be an
* Insert, Update, or Delete statement, or a utility command that returns
* nothing. If query is successfuly executed, the function returns true.
* Otherwise, it returns false.
*/
bool
ExecuteRemoteCommand(const char *nodeName, uint32 nodePort, StringInfo queryString)
{
char *nodeDatabase = get_database_name(MyDatabaseId);
int32 connectionId = -1;
QueryStatus queryStatus = CLIENT_INVALID_QUERY;
bool querySent = false;
bool queryReady = false;
bool queryDone = false;
connectionId = MultiClientConnect(nodeName, nodePort, nodeDatabase, NULL);
if (connectionId == INVALID_CONNECTION_ID)
{
return false;
}
querySent = MultiClientSendQuery(connectionId, queryString->data);
if (!querySent)
{
MultiClientDisconnect(connectionId);
return false;
}
while (!queryReady)
{
ResultStatus resultStatus = MultiClientResultStatus(connectionId);
if (resultStatus == CLIENT_RESULT_READY)
{
queryReady = true;
}
else if (resultStatus == CLIENT_RESULT_BUSY)
{
long sleepIntervalPerCycle = RemoteTaskCheckInterval * 1000L;
pg_usleep(sleepIntervalPerCycle);
}
else
{
MultiClientDisconnect(connectionId);
return false;
}
}
queryStatus = MultiClientQueryStatus(connectionId);
if (queryStatus == CLIENT_QUERY_DONE)
{
queryDone = true;
}
MultiClientDisconnect(connectionId);
return queryDone;
}
/* /*
* Parses the given DDL command, and returns the tree node for parsed command. * Parses the given DDL command, and returns the tree node for parsed command.
*/ */

View File

@ -128,8 +128,6 @@ extern List * TableDDLCommandList(const char *nodeName, uint32 nodePort,
extern StringInfo TaskFilename(StringInfo directoryName, uint32 taskId); extern StringInfo TaskFilename(StringInfo directoryName, uint32 taskId);
extern List * ExecuteRemoteQuery(const char *nodeName, uint32 nodePort, char *runAsUser, extern List * ExecuteRemoteQuery(const char *nodeName, uint32 nodePort, char *runAsUser,
StringInfo queryString); StringInfo queryString);
extern bool ExecuteRemoteCommand(const char *nodeName, uint32 nodePort,
StringInfo queryString);
extern List * ColumnDefinitionList(List *columnNameList, List *columnTypeList); extern List * ColumnDefinitionList(List *columnNameList, List *columnTypeList);
extern CreateStmt * CreateStatement(RangeVar *relation, List *columnDefinitionList); extern CreateStmt * CreateStatement(RangeVar *relation, List *columnDefinitionList);
extern CopyStmt * CopyStatement(RangeVar *relation, char *sourceFilename); extern CopyStmt * CopyStatement(RangeVar *relation, char *sourceFilename);