diff --git a/src/backend/distributed/master/master_citus_tools.c b/src/backend/distributed/master/master_citus_tools.c index b46b60739..6ed438c45 100644 --- a/src/backend/distributed/master/master_citus_tools.c +++ b/src/backend/distributed/master/master_citus_tools.c @@ -21,6 +21,7 @@ #include "distributed/connection_management.h" #include "distributed/metadata_cache.h" #include "distributed/multi_server_executor.h" +#include "distributed/remote_commands.h" #include "distributed/worker_protocol.h" #include "lib/stringinfo.h" #include "utils/builtins.h" @@ -39,11 +40,11 @@ static void ExecuteCommandsInParallelAndStoreResults(StringInfo *nodeNameArray, bool *statusArray, StringInfo *resultStringArray, int commmandCount); -static bool GetConnectionStatusAndResult(PGconn *connection, bool *resultStatus, +static bool GetConnectionStatusAndResult(MultiConnection *connection, bool *resultStatus, StringInfo queryResultString); -static bool EvaluateQueryResult(PGconn *connection, PGresult *queryResult, StringInfo - queryResultString); -static void StoreErrorMessage(PGconn *connection, StringInfo queryResultString); +static bool EvaluateQueryResult(MultiConnection *connection, PGresult *queryResult, + StringInfo queryResultString); +static void StoreErrorMessage(MultiConnection *connection, StringInfo queryResultString); static void ExecuteCommandsAndStoreResults(StringInfo *nodeNameArray, int *nodePortArray, StringInfo *commandStringArray, @@ -221,7 +222,8 @@ ExecuteCommandsInParallelAndStoreResults(StringInfo *nodeNameArray, int *nodePor int commmandCount) { int commandIndex = 0; - PGconn **connectionArray = palloc0(commmandCount * sizeof(PGconn *)); + MultiConnection **connectionArray = + palloc0(commmandCount * sizeof(MultiConnection *)); int finishedCount = 0; /* establish connections */ @@ -230,14 +232,13 @@ ExecuteCommandsInParallelAndStoreResults(StringInfo *nodeNameArray, int *nodePor char *nodeName = nodeNameArray[commandIndex]->data; int nodePort = nodePortArray[commandIndex]; int connectionFlags = FORCE_NEW_CONNECTION; - MultiConnection *multiConnection = + MultiConnection *connection = GetNodeConnection(connectionFlags, nodeName, nodePort); - PGconn *connection = multiConnection->pgConn; StringInfo queryResultString = resultStringArray[commandIndex]; statusArray[commandIndex] = true; - if (PQstatus(multiConnection->pgConn) != CONNECTION_OK) + if (PQstatus(connection->pgConn) != CONNECTION_OK) { appendStringInfo(queryResultString, "failed to connect to %s:%d", nodeName, (int) nodePort); @@ -254,7 +255,7 @@ ExecuteCommandsInParallelAndStoreResults(StringInfo *nodeNameArray, int *nodePor for (commandIndex = 0; commandIndex < commmandCount; commandIndex++) { int querySent = 0; - PGconn *connection = connectionArray[commandIndex]; + MultiConnection *connection = connectionArray[commandIndex]; char *queryString = commandStringArray[commandIndex]->data; StringInfo queryResultString = resultStringArray[commandIndex]; @@ -267,13 +268,17 @@ ExecuteCommandsInParallelAndStoreResults(StringInfo *nodeNameArray, int *nodePor continue; } - querySent = PQsendQuery(connection, queryString); + /* + * NB: this intentionally uses PQsendQuery rather than + * SendRemoteCommand as multiple commands are allowed. + */ + querySent = PQsendQuery(connection->pgConn, queryString); if (querySent == 0) { StoreErrorMessage(connection, queryResultString); statusArray[commandIndex] = false; - CloseConnectionByPGconn(connection); + CloseConnection(connection); connectionArray[commandIndex] = NULL; finishedCount++; } @@ -284,7 +289,7 @@ ExecuteCommandsInParallelAndStoreResults(StringInfo *nodeNameArray, int *nodePor { for (commandIndex = 0; commandIndex < commmandCount; commandIndex++) { - PGconn *connection = connectionArray[commandIndex]; + MultiConnection *connection = connectionArray[commandIndex]; StringInfo queryResultString = resultStringArray[commandIndex]; bool success = false; bool queryFinished = false; @@ -302,7 +307,7 @@ ExecuteCommandsInParallelAndStoreResults(StringInfo *nodeNameArray, int *nodePor finishedCount++; statusArray[commandIndex] = success; connectionArray[commandIndex] = NULL; - CloseConnectionByPGconn(connection); + CloseConnection(connection); } } @@ -326,11 +331,11 @@ ExecuteCommandsInParallelAndStoreResults(StringInfo *nodeNameArray, int *nodePor * reported upon completion of the query. */ static bool -GetConnectionStatusAndResult(PGconn *connection, bool *resultStatus, +GetConnectionStatusAndResult(MultiConnection *connection, bool *resultStatus, StringInfo queryResultString) { bool finished = true; - ConnStatusType connectionStatus = PQstatus(connection); + ConnStatusType connectionStatus = PQstatus(connection->pgConn); int consumeInput = 0; PGresult *queryResult = NULL; bool success = false; @@ -344,7 +349,7 @@ GetConnectionStatusAndResult(PGconn *connection, bool *resultStatus, return finished; } - consumeInput = PQconsumeInput(connection); + consumeInput = PQconsumeInput(connection->pgConn); if (consumeInput == 0) { appendStringInfo(queryResultString, "query result unavailable"); @@ -352,14 +357,14 @@ GetConnectionStatusAndResult(PGconn *connection, bool *resultStatus, } /* check later if busy */ - if (PQisBusy(connection) != 0) + if (PQisBusy(connection->pgConn) != 0) { finished = false; return finished; } /* query result is available at this point */ - queryResult = PQgetResult(connection); + queryResult = PQgetResult(connection->pgConn); success = EvaluateQueryResult(connection, queryResult, queryResultString); PQclear(queryResult); @@ -377,7 +382,7 @@ GetConnectionStatusAndResult(PGconn *connection, bool *resultStatus, * error otherwise. */ static bool -EvaluateQueryResult(PGconn *connection, PGresult *queryResult, +EvaluateQueryResult(MultiConnection *connection, PGresult *queryResult, StringInfo queryResultString) { bool success = false; @@ -432,9 +437,9 @@ EvaluateQueryResult(PGconn *connection, PGresult *queryResult, * otherwise it would return a default error message. */ static void -StoreErrorMessage(PGconn *connection, StringInfo queryResultString) +StoreErrorMessage(MultiConnection *connection, StringInfo queryResultString) { - char *errorMessage = PQerrorMessage(connection); + char *errorMessage = PQerrorMessage(connection->pgConn); if (errorMessage != NULL) { char *firstNewlineIndex = strchr(errorMessage, '\n'); @@ -497,26 +502,27 @@ ExecuteRemoteQueryOrCommand(char *nodeName, uint32 nodePort, char *queryString, StringInfo queryResultString) { int connectionFlags = FORCE_NEW_CONNECTION; - MultiConnection *multiConnection = + MultiConnection *connection = GetNodeConnection(connectionFlags, nodeName, nodePort); - PGconn *nodeConnection = multiConnection->pgConn; bool success = false; PGresult *queryResult = NULL; + bool raiseInterrupts = true; - if (PQstatus(multiConnection->pgConn) != CONNECTION_OK) + if (PQstatus(connection->pgConn) != CONNECTION_OK) { appendStringInfo(queryResultString, "failed to connect to %s:%d", nodeName, (int) nodePort); return false; } - queryResult = PQexec(nodeConnection, queryString); - success = EvaluateQueryResult(nodeConnection, queryResult, queryResultString); + SendRemoteCommand(connection, queryString); + queryResult = GetRemoteCommandResult(connection, raiseInterrupts); + success = EvaluateQueryResult(connection, queryResult, queryResultString); PQclear(queryResult); /* close the connection */ - CloseConnection(multiConnection); + CloseConnection(connection); return success; }