diff --git a/src/backend/distributed/master/master_expire_table_cache.c b/src/backend/distributed/master/master_expire_table_cache.c index 8d4f38b03..ce090a495 100644 --- a/src/backend/distributed/master/master_expire_table_cache.c +++ b/src/backend/distributed/master/master_expire_table_cache.c @@ -15,12 +15,15 @@ #include "postgres.h" #include "funcapi.h" +#include "libpq-fe.h" #include "catalog/pg_class.h" +#include "distributed/connection_management.h" #include "distributed/master_protocol.h" #include "distributed/metadata_cache.h" #include "distributed/multi_join_order.h" #include "distributed/pg_dist_shard.h" +#include "distributed/remote_commands.h" #include "distributed/worker_manager.h" #include "distributed/worker_protocol.h" #include "utils/builtins.h" @@ -154,6 +157,9 @@ DropShardsFromWorker(WorkerNode *workerNode, Oid relationId, List *shardInterval StringInfo workerCommand = makeStringInfo(); StringInfo shardNames = makeStringInfo(); ListCell *shardIntervalCell = NULL; + MultiConnection *connection = NULL; + int connectionFlag = FORCE_NEW_CONNECTION; + PGresult *result = NULL; if (shardIntervalList == NIL) { @@ -191,5 +197,7 @@ DropShardsFromWorker(WorkerNode *workerNode, Oid relationId, List *shardInterval errmsg("expire target is not a regular or foreign table"))); } - ExecuteRemoteCommand(workerNode->workerName, workerNode->workerPort, workerCommand); + connection = GetNodeConnection(connectionFlag, workerNode->workerName, + workerNode->workerPort); + ExecuteOptionalRemoteCommand(connection, workerCommand->data, &result); } diff --git a/src/backend/distributed/master/master_metadata_utility.c b/src/backend/distributed/master/master_metadata_utility.c index 4081ad35b..ad7e293c0 100644 --- a/src/backend/distributed/master/master_metadata_utility.c +++ b/src/backend/distributed/master/master_metadata_utility.c @@ -190,8 +190,12 @@ DistributedTableSizeOnWorker(WorkerNode *workerNode, Oid relationId, char *sizeQ char *workerNodeName = workerNode->workerName; uint32 workerNodePort = workerNode->workerPort; char *tableSizeString; - List *sizeList = NIL; uint64 tableSize = 0; + MultiConnection *connection = NULL; + uint32 connectionFlag = FORCE_NEW_CONNECTION; + PGresult *result = NULL; + int queryResult = 0; + List *sizeList = NIL; List *shardIntervalsOnNode = ShardIntervalsOnWorkerNode(workerNode, relationId); @@ -199,14 +203,16 @@ DistributedTableSizeOnWorker(WorkerNode *workerNode, Oid relationId, char *sizeQ shardIntervalsOnNode, sizeQuery); - sizeList = ExecuteRemoteQuery(workerNodeName, workerNodePort, NULL, tableSizeQuery); + connection = GetNodeConnection(connectionFlag, workerNodeName, workerNodePort); + queryResult = ExecuteOptionalRemoteCommand(connection, tableSizeQuery->data, &result); - if (sizeList == NIL) + if (queryResult != 0) { ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE), errmsg("cannot get the size because of a connection error"))); } + sizeList = ReadFirstColumnAsText(result); tableSizeStringInfo = (StringInfo) linitial(sizeList); tableSizeString = tableSizeStringInfo->data; tableSize = atol(tableSizeString); diff --git a/src/backend/distributed/worker/worker_data_fetch_protocol.c b/src/backend/distributed/worker/worker_data_fetch_protocol.c index 4e1045f57..30c197242 100644 --- a/src/backend/distributed/worker/worker_data_fetch_protocol.c +++ b/src/backend/distributed/worker/worker_data_fetch_protocol.c @@ -14,6 +14,7 @@ #include "postgres.h" #include "funcapi.h" +#include "libpq-fe.h" #include "miscadmin.h" #include #include @@ -26,12 +27,14 @@ #include "commands/extension.h" #include "commands/sequence.h" #include "distributed/citus_ruleutils.h" +#include "distributed/connection_management.h" #include "distributed/master_protocol.h" #include "distributed/metadata_cache.h" #include "distributed/multi_client_executor.h" #include "distributed/multi_logical_optimizer.h" #include "distributed/multi_server_executor.h" #include "distributed/relay_utility.h" +#include "distributed/remote_commands.h" #include "distributed/resource_lock.h" #include "distributed/task_tracker.h" #include "distributed/worker_protocol.h" @@ -961,11 +964,17 @@ RemoteTableOwner(const char *nodeName, uint32 nodePort, const char *tableName) List *ownerList = NIL; StringInfo queryString = NULL; StringInfo relationOwner; + MultiConnection *connection = NULL; + uint32 connectionFlag = FORCE_NEW_CONNECTION; + PGresult *result = NULL; queryString = makeStringInfo(); appendStringInfo(queryString, GET_TABLE_OWNER, tableName); + connection = GetNodeConnection(connectionFlag, nodeName, nodePort); - ownerList = ExecuteRemoteQuery(nodeName, nodePort, NULL, queryString); + ExecuteOptionalRemoteCommand(connection, queryString->data, &result); + + ownerList = ReadFirstColumnAsText(result); if (list_length(ownerList) != 1) { return NULL; @@ -987,11 +996,20 @@ TableDDLCommandList(const char *nodeName, uint32 nodePort, const char *tableName { List *ddlCommandList = NIL; StringInfo queryString = NULL; + MultiConnection *connection = NULL; + PGresult *result = NULL; + uint32 connectionFlag = FORCE_NEW_CONNECTION; queryString = makeStringInfo(); appendStringInfo(queryString, GET_TABLE_DDL_EVENTS, tableName); + connection = GetNodeConnection(connectionFlag, nodeName, nodePort); + + ExecuteOptionalRemoteCommand(connection, queryString->data, &result); + ddlCommandList = ReadFirstColumnAsText(result); + + ForgetResults(connection); + CloseConnection(connection); - ddlCommandList = ExecuteRemoteQuery(nodeName, nodePort, NULL, queryString); return ddlCommandList; } @@ -1007,11 +1025,17 @@ ForeignFilePath(const char *nodeName, uint32 nodePort, const char *tableName) List *foreignPathList = NIL; StringInfo foreignPathCommand = NULL; StringInfo foreignPath = NULL; + MultiConnection *connection = NULL; + PGresult *result = NULL; + int connectionFlag = FORCE_NEW_CONNECTION; foreignPathCommand = makeStringInfo(); appendStringInfo(foreignPathCommand, FOREIGN_FILE_PATH_COMMAND, tableName); + connection = GetNodeConnection(connectionFlag, nodeName, nodePort); - foreignPathList = ExecuteRemoteQuery(nodeName, nodePort, NULL, foreignPathCommand); + ExecuteOptionalRemoteCommand(connection, foreignPathCommand->data, &result); + + foreignPathList = ReadFirstColumnAsText(result); if (foreignPathList != NIL) { foreignPath = (StringInfo) linitial(foreignPathList); @@ -1101,65 +1125,6 @@ ExecuteRemoteQuery(const char *nodeName, uint32 nodePort, char *runAsUser, } -/* - * ExecuteRemoteCommand executes the given SQL command. This command could be an - * Insert, Update, or Delete statement, or a utility command that returns - * nothing. If query is successfuly executed, the function returns true. - * Otherwise, it returns false. - */ -bool -ExecuteRemoteCommand(const char *nodeName, uint32 nodePort, StringInfo queryString) -{ - char *nodeDatabase = get_database_name(MyDatabaseId); - int32 connectionId = -1; - QueryStatus queryStatus = CLIENT_INVALID_QUERY; - bool querySent = false; - bool queryReady = false; - bool queryDone = false; - - connectionId = MultiClientConnect(nodeName, nodePort, nodeDatabase, NULL); - if (connectionId == INVALID_CONNECTION_ID) - { - return false; - } - - querySent = MultiClientSendQuery(connectionId, queryString->data); - if (!querySent) - { - MultiClientDisconnect(connectionId); - return false; - } - - while (!queryReady) - { - ResultStatus resultStatus = MultiClientResultStatus(connectionId); - if (resultStatus == CLIENT_RESULT_READY) - { - queryReady = true; - } - else if (resultStatus == CLIENT_RESULT_BUSY) - { - long sleepIntervalPerCycle = RemoteTaskCheckInterval * 1000L; - pg_usleep(sleepIntervalPerCycle); - } - else - { - MultiClientDisconnect(connectionId); - return false; - } - } - - queryStatus = MultiClientQueryStatus(connectionId); - if (queryStatus == CLIENT_QUERY_DONE) - { - queryDone = true; - } - - MultiClientDisconnect(connectionId); - return queryDone; -} - - /* * Parses the given DDL command, and returns the tree node for parsed command. */ diff --git a/src/include/distributed/worker_protocol.h b/src/include/distributed/worker_protocol.h index a8f074be3..bf0760159 100644 --- a/src/include/distributed/worker_protocol.h +++ b/src/include/distributed/worker_protocol.h @@ -128,8 +128,6 @@ extern List * TableDDLCommandList(const char *nodeName, uint32 nodePort, extern StringInfo TaskFilename(StringInfo directoryName, uint32 taskId); extern List * ExecuteRemoteQuery(const char *nodeName, uint32 nodePort, char *runAsUser, StringInfo queryString); -extern bool ExecuteRemoteCommand(const char *nodeName, uint32 nodePort, - StringInfo queryString); extern List * ColumnDefinitionList(List *columnNameList, List *columnTypeList); extern CreateStmt * CreateStatement(RangeVar *relation, List *columnDefinitionList); extern CopyStmt * CopyStatement(RangeVar *relation, char *sourceFilename);