diff --git a/src/backend/distributed/executor/multi_client_executor.c b/src/backend/distributed/executor/multi_client_executor.c index 50b25a6cf..539b8060d 100644 --- a/src/backend/distributed/executor/multi_client_executor.c +++ b/src/backend/distributed/executor/multi_client_executor.c @@ -23,6 +23,7 @@ #include "distributed/connection_management.h" #include "distributed/multi_client_executor.h" #include "distributed/multi_server_executor.h" +#include "distributed/remote_commands.h" #include #include @@ -39,7 +40,7 @@ /* Local pool to track active connections */ -static PGconn *ClientConnectionArray[MAX_CONNECTION_COUNT]; +static MultiConnection *ClientConnectionArray[MAX_CONNECTION_COUNT]; /* * The value at any position on ClientPollingStatusArray is only defined when @@ -49,8 +50,8 @@ static PostgresPollingStatusType ClientPollingStatusArray[MAX_CONNECTION_COUNT]; /* Local functions forward declarations */ -static void ClearRemainingResults(PGconn *connection); -static bool ClientConnectionReady(PGconn *connection, +static void ClearRemainingResults(MultiConnection *connection); +static bool ClientConnectionReady(MultiConnection *connection, PostgresPollingStatusType pollingStatus); @@ -64,7 +65,7 @@ AllocateConnectionId(void) /* allocate connectionId from connection pool */ for (connIndex = 0; connIndex < MAX_CONNECTION_COUNT; connIndex++) { - PGconn *connection = ClientConnectionArray[connIndex]; + MultiConnection *connection = ClientConnectionArray[connIndex]; if (connection == NULL) { connectionId = connIndex; @@ -88,12 +89,16 @@ int32 MultiClientConnect(const char *nodeName, uint32 nodePort, const char *nodeDatabase, const char *userName) { - PGconn *connection = NULL; - char connInfoString[STRING_BUFFER_SIZE]; + MultiConnection *connection = NULL; ConnStatusType connStatusType = CONNECTION_OK; int32 connectionId = AllocateConnectionId(); - char *effectiveDatabaseName = NULL; - char *effectiveUserName = NULL; + int connectionFlags = FORCE_NEW_CONNECTION; /* no cached connections for now */ + + if (connectionId == INVALID_CONNECTION_ID) + { + ereport(WARNING, (errmsg("could not allocate connection in connection pool"))); + return connectionId; + } if (XactModificationLevel > XACT_MODIFICATION_NONE) { @@ -102,46 +107,11 @@ MultiClientConnect(const char *nodeName, uint32 nodePort, const char *nodeDataba "command within a transaction"))); } - if (connectionId == INVALID_CONNECTION_ID) - { - ereport(WARNING, (errmsg("could not allocate connection in connection pool"))); - return connectionId; - } - - if (nodeDatabase == NULL) - { - effectiveDatabaseName = get_database_name(MyDatabaseId); - } - else - { - effectiveDatabaseName = pstrdup(nodeDatabase); - } - - if (userName == NULL) - { - effectiveUserName = CurrentUserName(); - } - else - { - effectiveUserName = pstrdup(userName); - } - - /* - * FIXME: This code is bad on several levels. It completely forgoes any - * escaping, it misses setting a number of parameters, it works with a - * limited string size without erroring when it's too long. We shouldn't - * even build a query string this way, there's PQconnectdbParams()! - */ - - /* transcribe connection paremeters to string */ - snprintf(connInfoString, STRING_BUFFER_SIZE, CONN_INFO_TEMPLATE, - nodeName, nodePort, - effectiveDatabaseName, effectiveUserName, - CLIENT_CONNECT_TIMEOUT); - /* establish synchronous connection to worker node */ - connection = PQconnectdb(connInfoString); - connStatusType = PQstatus(connection); + connection = GetNodeUserDatabaseConnection(connectionFlags, nodeName, nodePort, + userName, nodeDatabase); + + connStatusType = PQstatus(connection->pgConn); if (connStatusType == CONNECTION_OK) { @@ -149,15 +119,11 @@ MultiClientConnect(const char *nodeName, uint32 nodePort, const char *nodeDataba } else { - WarnRemoteError(connection, NULL); - - PQfinish(connection); + ReportConnectionError(connection, WARNING); + CloseConnection(connection); connectionId = INVALID_CONNECTION_ID; } - pfree(effectiveDatabaseName); - pfree(effectiveUserName); - return connectionId; } @@ -170,12 +136,11 @@ MultiClientConnect(const char *nodeName, uint32 nodePort, const char *nodeDataba int32 MultiClientConnectStart(const char *nodeName, uint32 nodePort, const char *nodeDatabase) { - PGconn *connection = NULL; - char connInfoString[STRING_BUFFER_SIZE]; - ConnStatusType connStatusType = CONNECTION_BAD; - char *userName = CurrentUserName(); - + MultiConnection *connection = NULL; + ConnStatusType connStatusType = CONNECTION_OK; int32 connectionId = AllocateConnectionId(); + int connectionFlags = FORCE_NEW_CONNECTION; /* no cached connections for now */ + if (connectionId == INVALID_CONNECTION_ID) { ereport(WARNING, (errmsg("could not allocate connection in connection pool"))); @@ -189,13 +154,9 @@ MultiClientConnectStart(const char *nodeName, uint32 nodePort, const char *nodeD "command within a transaction"))); } - /* transcribe connection paremeters to string */ - snprintf(connInfoString, STRING_BUFFER_SIZE, CONN_INFO_TEMPLATE, - nodeName, nodePort, nodeDatabase, userName, CLIENT_CONNECT_TIMEOUT); - /* prepare asynchronous request for worker node connection */ - connection = PQconnectStart(connInfoString); - connStatusType = PQstatus(connection); + connection = StartNodeConnection(connectionFlags, nodeName, nodePort); + connStatusType = PQstatus(connection->pgConn); /* * If prepared, we save the connection, and set its initial polling status @@ -209,9 +170,9 @@ MultiClientConnectStart(const char *nodeName, uint32 nodePort, const char *nodeD } else { - WarnRemoteError(connection, NULL); + ReportConnectionError(connection, WARNING); + CloseConnection(connection); - PQfinish(connection); connectionId = INVALID_CONNECTION_ID; } @@ -223,7 +184,7 @@ MultiClientConnectStart(const char *nodeName, uint32 nodePort, const char *nodeD ConnectStatus MultiClientConnectPoll(int32 connectionId) { - PGconn *connection = NULL; + MultiConnection *connection = NULL; PostgresPollingStatusType pollingStatus = PGRES_POLLING_OK; ConnectStatus connectStatus = CLIENT_INVALID_CONNECT; @@ -241,7 +202,7 @@ MultiClientConnectPoll(int32 connectionId) bool readReady = ClientConnectionReady(connection, PGRES_POLLING_READING); if (readReady) { - ClientPollingStatusArray[connectionId] = PQconnectPoll(connection); + ClientPollingStatusArray[connectionId] = PQconnectPoll(connection->pgConn); connectStatus = CLIENT_CONNECTION_BUSY; } else @@ -254,7 +215,7 @@ MultiClientConnectPoll(int32 connectionId) bool writeReady = ClientConnectionReady(connection, PGRES_POLLING_WRITING); if (writeReady) { - ClientPollingStatusArray[connectionId] = PQconnectPoll(connection); + ClientPollingStatusArray[connectionId] = PQconnectPoll(connection->pgConn); connectStatus = CLIENT_CONNECTION_BUSY; } else @@ -264,7 +225,7 @@ MultiClientConnectPoll(int32 connectionId) } else if (pollingStatus == PGRES_POLLING_FAILED) { - WarnRemoteError(connection, NULL); + ReportConnectionError(connection, WARNING); connectStatus = CLIENT_CONNECTION_BAD; } @@ -277,14 +238,14 @@ MultiClientConnectPoll(int32 connectionId) void MultiClientDisconnect(int32 connectionId) { - PGconn *connection = NULL; + MultiConnection *connection = NULL; const int InvalidPollingStatus = -1; Assert(connectionId != INVALID_CONNECTION_ID); connection = ClientConnectionArray[connectionId]; Assert(connection != NULL); - PQfinish(connection); + CloseConnection(connection); ClientConnectionArray[connectionId] = NULL; ClientPollingStatusArray[connectionId] = InvalidPollingStatus; @@ -298,7 +259,7 @@ MultiClientDisconnect(int32 connectionId) bool MultiClientConnectionUp(int32 connectionId) { - PGconn *connection = NULL; + MultiConnection *connection = NULL; ConnStatusType connStatusType = CONNECTION_OK; bool connectionUp = true; @@ -306,7 +267,7 @@ MultiClientConnectionUp(int32 connectionId) connection = ClientConnectionArray[connectionId]; Assert(connection != NULL); - connStatusType = PQstatus(connection); + connStatusType = PQstatus(connection->pgConn); if (connStatusType == CONNECTION_BAD) { connectionUp = false; @@ -340,7 +301,7 @@ MultiClientExecute(int32 connectionId, const char *query, void **queryResult, bool MultiClientSendQuery(int32 connectionId, const char *query) { - PGconn *connection = NULL; + MultiConnection *connection = NULL; bool success = true; int querySent = 0; @@ -348,10 +309,10 @@ MultiClientSendQuery(int32 connectionId, const char *query) connection = ClientConnectionArray[connectionId]; Assert(connection != NULL); - querySent = PQsendQuery(connection, query); + querySent = PQsendQuery(connection->pgConn, query); if (querySent == 0) { - char *errorMessage = PQerrorMessage(connection); + char *errorMessage = PQerrorMessage(connection->pgConn); ereport(WARNING, (errmsg("could not send remote query \"%s\"", query), errdetail("Client error: %s", errorMessage))); @@ -366,7 +327,7 @@ MultiClientSendQuery(int32 connectionId, const char *query) bool MultiClientCancel(int32 connectionId) { - PGconn *connection = NULL; + MultiConnection *connection = NULL; PGcancel *cancelObject = NULL; int cancelSent = 0; bool canceled = true; @@ -376,7 +337,7 @@ MultiClientCancel(int32 connectionId) connection = ClientConnectionArray[connectionId]; Assert(connection != NULL); - cancelObject = PQgetCancel(connection); + cancelObject = PQgetCancel(connection->pgConn); cancelSent = PQcancel(cancelObject, errorBuffer, sizeof(errorBuffer)); if (cancelSent == 0) @@ -397,7 +358,7 @@ MultiClientCancel(int32 connectionId) ResultStatus MultiClientResultStatus(int32 connectionId) { - PGconn *connection = NULL; + MultiConnection *connection = NULL; int consumed = 0; ConnStatusType connStatusType = CONNECTION_OK; ResultStatus resultStatus = CLIENT_INVALID_RESULT_STATUS; @@ -406,7 +367,7 @@ MultiClientResultStatus(int32 connectionId) connection = ClientConnectionArray[connectionId]; Assert(connection != NULL); - connStatusType = PQstatus(connection); + connStatusType = PQstatus(connection->pgConn); if (connStatusType == CONNECTION_BAD) { ereport(WARNING, (errmsg("could not maintain connection to worker node"))); @@ -414,10 +375,10 @@ MultiClientResultStatus(int32 connectionId) } /* consume input to allow status change */ - consumed = PQconsumeInput(connection); + consumed = PQconsumeInput(connection->pgConn); if (consumed != 0) { - int connectionBusy = PQisBusy(connection); + int connectionBusy = PQisBusy(connection->pgConn); if (connectionBusy == 0) { resultStatus = CLIENT_RESULT_READY; @@ -442,7 +403,7 @@ bool MultiClientQueryResult(int32 connectionId, void **queryResult, int *rowCount, int *columnCount) { - PGconn *connection = NULL; + MultiConnection *connection = NULL; PGresult *result = NULL; ConnStatusType connStatusType = CONNECTION_OK; ExecStatusType resultStatus = PGRES_COMMAND_OK; @@ -451,14 +412,14 @@ MultiClientQueryResult(int32 connectionId, void **queryResult, int *rowCount, connection = ClientConnectionArray[connectionId]; Assert(connection != NULL); - connStatusType = PQstatus(connection); + connStatusType = PQstatus(connection->pgConn); if (connStatusType == CONNECTION_BAD) { ereport(WARNING, (errmsg("could not maintain connection to worker node"))); return false; } - result = PQgetResult(connection); + result = PQgetResult(connection->pgConn); resultStatus = PQresultStatus(result); if (resultStatus == PGRES_TUPLES_OK) { @@ -468,7 +429,7 @@ MultiClientQueryResult(int32 connectionId, void **queryResult, int *rowCount, } else { - WarnRemoteError(connection, result); + ReportResultError(connection, result, WARNING); PQclear(result); return false; @@ -494,7 +455,7 @@ BatchQueryStatus MultiClientBatchResult(int32 connectionId, void **queryResult, int *rowCount, int *columnCount) { - PGconn *connection = NULL; + MultiConnection *connection = NULL; PGresult *result = NULL; ConnStatusType connStatusType = CONNECTION_OK; ExecStatusType resultStatus = PGRES_COMMAND_OK; @@ -509,14 +470,14 @@ MultiClientBatchResult(int32 connectionId, void **queryResult, int *rowCount, (*rowCount) = -1; (*columnCount) = -1; - connStatusType = PQstatus(connection); + connStatusType = PQstatus(connection->pgConn); if (connStatusType == CONNECTION_BAD) { ereport(WARNING, (errmsg("could not maintain connection to worker node"))); return CLIENT_BATCH_QUERY_FAILED; } - result = PQgetResult(connection); + result = PQgetResult(connection->pgConn); if (result == NULL) { return CLIENT_BATCH_QUERY_DONE; @@ -537,7 +498,7 @@ MultiClientBatchResult(int32 connectionId, void **queryResult, int *rowCount, } else { - WarnRemoteError(connection, result); + ReportResultError(connection, result, WARNING); PQclear(result); queryStatus = CLIENT_BATCH_QUERY_FAILED; } @@ -576,7 +537,7 @@ MultiClientClearResult(void *queryResult) QueryStatus MultiClientQueryStatus(int32 connectionId) { - PGconn *connection = NULL; + MultiConnection *connection = NULL; PGresult *result = NULL; int tupleCount PG_USED_FOR_ASSERTS_ONLY = 0; bool copyResults = false; @@ -588,7 +549,7 @@ MultiClientQueryStatus(int32 connectionId) connection = ClientConnectionArray[connectionId]; Assert(connection != NULL); - connStatusType = PQstatus(connection); + connStatusType = PQstatus(connection->pgConn); if (connStatusType == CONNECTION_BAD) { ereport(WARNING, (errmsg("could not maintain connection to worker node"))); @@ -600,7 +561,7 @@ MultiClientQueryStatus(int32 connectionId) * isn't ready yet (the caller didn't wait for the connection to be ready), * we will block on this call. */ - result = PQgetResult(connection); + result = PQgetResult(connection->pgConn); resultStatus = PQresultStatus(result); if (resultStatus == PGRES_COMMAND_OK) @@ -631,7 +592,7 @@ MultiClientQueryStatus(int32 connectionId) copyResults = true; } - WarnRemoteError(connection, result); + ReportResultError(connection, result, WARNING); } /* clear the result object */ @@ -654,7 +615,7 @@ MultiClientQueryStatus(int32 connectionId) CopyStatus MultiClientCopyData(int32 connectionId, int32 fileDescriptor) { - PGconn *connection = NULL; + MultiConnection *connection = NULL; char *receiveBuffer = NULL; int consumed = 0; int receiveLength = 0; @@ -669,7 +630,7 @@ MultiClientCopyData(int32 connectionId, int32 fileDescriptor) * Consume input to handle the case where previous copy operation might have * received zero bytes. */ - consumed = PQconsumeInput(connection); + consumed = PQconsumeInput(connection->pgConn); if (consumed == 0) { ereport(WARNING, (errmsg("could not read data from worker node"))); @@ -677,7 +638,7 @@ MultiClientCopyData(int32 connectionId, int32 fileDescriptor) } /* receive copy data message in an asynchronous manner */ - receiveLength = PQgetCopyData(connection, &receiveBuffer, asynchronous); + receiveLength = PQgetCopyData(connection->pgConn, &receiveBuffer, asynchronous); while (receiveLength > 0) { /* received copy data; append these data to file */ @@ -698,7 +659,7 @@ MultiClientCopyData(int32 connectionId, int32 fileDescriptor) PQfreemem(receiveBuffer); - receiveLength = PQgetCopyData(connection, &receiveBuffer, asynchronous); + receiveLength = PQgetCopyData(connection->pgConn, &receiveBuffer, asynchronous); } /* we now check the last received length returned by copy data */ @@ -710,7 +671,7 @@ MultiClientCopyData(int32 connectionId, int32 fileDescriptor) else if (receiveLength == -1) { /* received copy done message */ - PGresult *result = PQgetResult(connection); + PGresult *result = PQgetResult(connection->pgConn); ExecStatusType resultStatus = PQresultStatus(result); if (resultStatus == PGRES_COMMAND_OK) @@ -721,7 +682,7 @@ MultiClientCopyData(int32 connectionId, int32 fileDescriptor) { copyStatus = CLIENT_COPY_FAILED; - WarnRemoteError(connection, result); + ReportResultError(connection, result, WARNING); } PQclear(result); @@ -731,7 +692,7 @@ MultiClientCopyData(int32 connectionId, int32 fileDescriptor) /* received an error */ copyStatus = CLIENT_COPY_FAILED; - WarnRemoteError(connection, NULL); + ReportConnectionError(connection, WARNING); } /* if copy out completed, make sure we drain all results from libpq */ @@ -794,7 +755,7 @@ void MultiClientRegisterWait(WaitInfo *waitInfo, TaskExecutionStatus executionStatus, int32 connectionId) { - PGconn *connection = NULL; + MultiConnection *connection = NULL; struct pollfd *pollfd = NULL; Assert(waitInfo->registeredWaiters < waitInfo->maxWaiters); @@ -812,7 +773,7 @@ MultiClientRegisterWait(WaitInfo *waitInfo, TaskExecutionStatus executionStatus, connection = ClientConnectionArray[connectionId]; pollfd = &waitInfo->pollfds[waitInfo->registeredWaiters]; - pollfd->fd = PQsocket(connection); + pollfd->fd = PQsocket(connection->pgConn); if (executionStatus == TASK_STATUS_SOCKET_READ) { pollfd->events = POLLERR | POLLIN; @@ -904,13 +865,13 @@ MultiClientWait(WaitInfo *waitInfo) * query. */ static void -ClearRemainingResults(PGconn *connection) +ClearRemainingResults(MultiConnection *connection) { - PGresult *result = PQgetResult(connection); + PGresult *result = PQgetResult(connection->pgConn); while (result != NULL) { PQclear(result); - result = PQgetResult(connection); + result = PQgetResult(connection->pgConn); } } @@ -921,7 +882,8 @@ ClearRemainingResults(PGconn *connection) * and libpq_select() at libpqwalreceiver.c. */ static bool -ClientConnectionReady(PGconn *connection, PostgresPollingStatusType pollingStatus) +ClientConnectionReady(MultiConnection *connection, + PostgresPollingStatusType pollingStatus) { bool clientConnectionReady = false; int pollResult = 0; @@ -942,7 +904,7 @@ ClientConnectionReady(PGconn *connection, PostgresPollingStatusType pollingStatu pollEventMask = POLLERR | POLLOUT; } - pollFileDescriptor.fd = PQsocket(connection); + pollFileDescriptor.fd = PQsocket(connection->pgConn); pollFileDescriptor.events = pollEventMask; pollFileDescriptor.revents = 0; diff --git a/src/include/distributed/multi_client_executor.h b/src/include/distributed/multi_client_executor.h index e9658ee22..56d889f4d 100644 --- a/src/include/distributed/multi_client_executor.h +++ b/src/include/distributed/multi_client_executor.h @@ -15,10 +15,8 @@ #define MULTI_CLIENT_EXECUTOR_H #define INVALID_CONNECTION_ID -1 /* identifies an invalid connection */ -#define CLIENT_CONNECT_TIMEOUT 5 /* connection timeout in seconds */ #define MAX_CONNECTION_COUNT 2048 /* simultaneous client connection count */ #define STRING_BUFFER_SIZE 1024 /* buffer size for character arrays */ -#define CONN_INFO_TEMPLATE "host=%s port=%u dbname=%s user=%s connect_timeout=%u" /* Enumeration to track one client connection's status */