Move multi_client_executor.[ch] ontop of connection_management.[ch].

That way connections can be automatically closed after errors and such,
and the connection management infrastructure gets wider testing.  It
also fixes a few issues around connection string building.
pull/863/head
Andres Freund 2016-12-01 14:36:42 -08:00
parent a77cf36778
commit 2374905c89
2 changed files with 71 additions and 111 deletions

View File

@ -23,6 +23,7 @@
#include "distributed/connection_management.h"
#include "distributed/multi_client_executor.h"
#include "distributed/multi_server_executor.h"
#include "distributed/remote_commands.h"
#include <errno.h>
#include <unistd.h>
@ -39,7 +40,7 @@
/* Local pool to track active connections */
static PGconn *ClientConnectionArray[MAX_CONNECTION_COUNT];
static MultiConnection *ClientConnectionArray[MAX_CONNECTION_COUNT];
/*
* The value at any position on ClientPollingStatusArray is only defined when
@ -49,8 +50,8 @@ static PostgresPollingStatusType ClientPollingStatusArray[MAX_CONNECTION_COUNT];
/* Local functions forward declarations */
static void ClearRemainingResults(PGconn *connection);
static bool ClientConnectionReady(PGconn *connection,
static void ClearRemainingResults(MultiConnection *connection);
static bool ClientConnectionReady(MultiConnection *connection,
PostgresPollingStatusType pollingStatus);
@ -64,7 +65,7 @@ AllocateConnectionId(void)
/* allocate connectionId from connection pool */
for (connIndex = 0; connIndex < MAX_CONNECTION_COUNT; connIndex++)
{
PGconn *connection = ClientConnectionArray[connIndex];
MultiConnection *connection = ClientConnectionArray[connIndex];
if (connection == NULL)
{
connectionId = connIndex;
@ -88,12 +89,16 @@ int32
MultiClientConnect(const char *nodeName, uint32 nodePort, const char *nodeDatabase,
const char *userName)
{
PGconn *connection = NULL;
char connInfoString[STRING_BUFFER_SIZE];
MultiConnection *connection = NULL;
ConnStatusType connStatusType = CONNECTION_OK;
int32 connectionId = AllocateConnectionId();
char *effectiveDatabaseName = NULL;
char *effectiveUserName = NULL;
int connectionFlags = FORCE_NEW_CONNECTION; /* no cached connections for now */
if (connectionId == INVALID_CONNECTION_ID)
{
ereport(WARNING, (errmsg("could not allocate connection in connection pool")));
return connectionId;
}
if (XactModificationLevel > XACT_MODIFICATION_NONE)
{
@ -102,46 +107,11 @@ MultiClientConnect(const char *nodeName, uint32 nodePort, const char *nodeDataba
"command within a transaction")));
}
if (connectionId == INVALID_CONNECTION_ID)
{
ereport(WARNING, (errmsg("could not allocate connection in connection pool")));
return connectionId;
}
if (nodeDatabase == NULL)
{
effectiveDatabaseName = get_database_name(MyDatabaseId);
}
else
{
effectiveDatabaseName = pstrdup(nodeDatabase);
}
if (userName == NULL)
{
effectiveUserName = CurrentUserName();
}
else
{
effectiveUserName = pstrdup(userName);
}
/*
* FIXME: This code is bad on several levels. It completely forgoes any
* escaping, it misses setting a number of parameters, it works with a
* limited string size without erroring when it's too long. We shouldn't
* even build a query string this way, there's PQconnectdbParams()!
*/
/* transcribe connection paremeters to string */
snprintf(connInfoString, STRING_BUFFER_SIZE, CONN_INFO_TEMPLATE,
nodeName, nodePort,
effectiveDatabaseName, effectiveUserName,
CLIENT_CONNECT_TIMEOUT);
/* establish synchronous connection to worker node */
connection = PQconnectdb(connInfoString);
connStatusType = PQstatus(connection);
connection = GetNodeUserDatabaseConnection(connectionFlags, nodeName, nodePort,
userName, nodeDatabase);
connStatusType = PQstatus(connection->pgConn);
if (connStatusType == CONNECTION_OK)
{
@ -149,15 +119,11 @@ MultiClientConnect(const char *nodeName, uint32 nodePort, const char *nodeDataba
}
else
{
WarnRemoteError(connection, NULL);
PQfinish(connection);
ReportConnectionError(connection, WARNING);
CloseConnection(connection);
connectionId = INVALID_CONNECTION_ID;
}
pfree(effectiveDatabaseName);
pfree(effectiveUserName);
return connectionId;
}
@ -170,12 +136,11 @@ MultiClientConnect(const char *nodeName, uint32 nodePort, const char *nodeDataba
int32
MultiClientConnectStart(const char *nodeName, uint32 nodePort, const char *nodeDatabase)
{
PGconn *connection = NULL;
char connInfoString[STRING_BUFFER_SIZE];
ConnStatusType connStatusType = CONNECTION_BAD;
char *userName = CurrentUserName();
MultiConnection *connection = NULL;
ConnStatusType connStatusType = CONNECTION_OK;
int32 connectionId = AllocateConnectionId();
int connectionFlags = FORCE_NEW_CONNECTION; /* no cached connections for now */
if (connectionId == INVALID_CONNECTION_ID)
{
ereport(WARNING, (errmsg("could not allocate connection in connection pool")));
@ -189,13 +154,9 @@ MultiClientConnectStart(const char *nodeName, uint32 nodePort, const char *nodeD
"command within a transaction")));
}
/* transcribe connection paremeters to string */
snprintf(connInfoString, STRING_BUFFER_SIZE, CONN_INFO_TEMPLATE,
nodeName, nodePort, nodeDatabase, userName, CLIENT_CONNECT_TIMEOUT);
/* prepare asynchronous request for worker node connection */
connection = PQconnectStart(connInfoString);
connStatusType = PQstatus(connection);
connection = StartNodeConnection(connectionFlags, nodeName, nodePort);
connStatusType = PQstatus(connection->pgConn);
/*
* If prepared, we save the connection, and set its initial polling status
@ -209,9 +170,9 @@ MultiClientConnectStart(const char *nodeName, uint32 nodePort, const char *nodeD
}
else
{
WarnRemoteError(connection, NULL);
ReportConnectionError(connection, WARNING);
CloseConnection(connection);
PQfinish(connection);
connectionId = INVALID_CONNECTION_ID;
}
@ -223,7 +184,7 @@ MultiClientConnectStart(const char *nodeName, uint32 nodePort, const char *nodeD
ConnectStatus
MultiClientConnectPoll(int32 connectionId)
{
PGconn *connection = NULL;
MultiConnection *connection = NULL;
PostgresPollingStatusType pollingStatus = PGRES_POLLING_OK;
ConnectStatus connectStatus = CLIENT_INVALID_CONNECT;
@ -241,7 +202,7 @@ MultiClientConnectPoll(int32 connectionId)
bool readReady = ClientConnectionReady(connection, PGRES_POLLING_READING);
if (readReady)
{
ClientPollingStatusArray[connectionId] = PQconnectPoll(connection);
ClientPollingStatusArray[connectionId] = PQconnectPoll(connection->pgConn);
connectStatus = CLIENT_CONNECTION_BUSY;
}
else
@ -254,7 +215,7 @@ MultiClientConnectPoll(int32 connectionId)
bool writeReady = ClientConnectionReady(connection, PGRES_POLLING_WRITING);
if (writeReady)
{
ClientPollingStatusArray[connectionId] = PQconnectPoll(connection);
ClientPollingStatusArray[connectionId] = PQconnectPoll(connection->pgConn);
connectStatus = CLIENT_CONNECTION_BUSY;
}
else
@ -264,7 +225,7 @@ MultiClientConnectPoll(int32 connectionId)
}
else if (pollingStatus == PGRES_POLLING_FAILED)
{
WarnRemoteError(connection, NULL);
ReportConnectionError(connection, WARNING);
connectStatus = CLIENT_CONNECTION_BAD;
}
@ -277,14 +238,14 @@ MultiClientConnectPoll(int32 connectionId)
void
MultiClientDisconnect(int32 connectionId)
{
PGconn *connection = NULL;
MultiConnection *connection = NULL;
const int InvalidPollingStatus = -1;
Assert(connectionId != INVALID_CONNECTION_ID);
connection = ClientConnectionArray[connectionId];
Assert(connection != NULL);
PQfinish(connection);
CloseConnection(connection);
ClientConnectionArray[connectionId] = NULL;
ClientPollingStatusArray[connectionId] = InvalidPollingStatus;
@ -298,7 +259,7 @@ MultiClientDisconnect(int32 connectionId)
bool
MultiClientConnectionUp(int32 connectionId)
{
PGconn *connection = NULL;
MultiConnection *connection = NULL;
ConnStatusType connStatusType = CONNECTION_OK;
bool connectionUp = true;
@ -306,7 +267,7 @@ MultiClientConnectionUp(int32 connectionId)
connection = ClientConnectionArray[connectionId];
Assert(connection != NULL);
connStatusType = PQstatus(connection);
connStatusType = PQstatus(connection->pgConn);
if (connStatusType == CONNECTION_BAD)
{
connectionUp = false;
@ -340,7 +301,7 @@ MultiClientExecute(int32 connectionId, const char *query, void **queryResult,
bool
MultiClientSendQuery(int32 connectionId, const char *query)
{
PGconn *connection = NULL;
MultiConnection *connection = NULL;
bool success = true;
int querySent = 0;
@ -348,10 +309,10 @@ MultiClientSendQuery(int32 connectionId, const char *query)
connection = ClientConnectionArray[connectionId];
Assert(connection != NULL);
querySent = PQsendQuery(connection, query);
querySent = PQsendQuery(connection->pgConn, query);
if (querySent == 0)
{
char *errorMessage = PQerrorMessage(connection);
char *errorMessage = PQerrorMessage(connection->pgConn);
ereport(WARNING, (errmsg("could not send remote query \"%s\"", query),
errdetail("Client error: %s", errorMessage)));
@ -366,7 +327,7 @@ MultiClientSendQuery(int32 connectionId, const char *query)
bool
MultiClientCancel(int32 connectionId)
{
PGconn *connection = NULL;
MultiConnection *connection = NULL;
PGcancel *cancelObject = NULL;
int cancelSent = 0;
bool canceled = true;
@ -376,7 +337,7 @@ MultiClientCancel(int32 connectionId)
connection = ClientConnectionArray[connectionId];
Assert(connection != NULL);
cancelObject = PQgetCancel(connection);
cancelObject = PQgetCancel(connection->pgConn);
cancelSent = PQcancel(cancelObject, errorBuffer, sizeof(errorBuffer));
if (cancelSent == 0)
@ -397,7 +358,7 @@ MultiClientCancel(int32 connectionId)
ResultStatus
MultiClientResultStatus(int32 connectionId)
{
PGconn *connection = NULL;
MultiConnection *connection = NULL;
int consumed = 0;
ConnStatusType connStatusType = CONNECTION_OK;
ResultStatus resultStatus = CLIENT_INVALID_RESULT_STATUS;
@ -406,7 +367,7 @@ MultiClientResultStatus(int32 connectionId)
connection = ClientConnectionArray[connectionId];
Assert(connection != NULL);
connStatusType = PQstatus(connection);
connStatusType = PQstatus(connection->pgConn);
if (connStatusType == CONNECTION_BAD)
{
ereport(WARNING, (errmsg("could not maintain connection to worker node")));
@ -414,10 +375,10 @@ MultiClientResultStatus(int32 connectionId)
}
/* consume input to allow status change */
consumed = PQconsumeInput(connection);
consumed = PQconsumeInput(connection->pgConn);
if (consumed != 0)
{
int connectionBusy = PQisBusy(connection);
int connectionBusy = PQisBusy(connection->pgConn);
if (connectionBusy == 0)
{
resultStatus = CLIENT_RESULT_READY;
@ -442,7 +403,7 @@ bool
MultiClientQueryResult(int32 connectionId, void **queryResult, int *rowCount,
int *columnCount)
{
PGconn *connection = NULL;
MultiConnection *connection = NULL;
PGresult *result = NULL;
ConnStatusType connStatusType = CONNECTION_OK;
ExecStatusType resultStatus = PGRES_COMMAND_OK;
@ -451,14 +412,14 @@ MultiClientQueryResult(int32 connectionId, void **queryResult, int *rowCount,
connection = ClientConnectionArray[connectionId];
Assert(connection != NULL);
connStatusType = PQstatus(connection);
connStatusType = PQstatus(connection->pgConn);
if (connStatusType == CONNECTION_BAD)
{
ereport(WARNING, (errmsg("could not maintain connection to worker node")));
return false;
}
result = PQgetResult(connection);
result = PQgetResult(connection->pgConn);
resultStatus = PQresultStatus(result);
if (resultStatus == PGRES_TUPLES_OK)
{
@ -468,7 +429,7 @@ MultiClientQueryResult(int32 connectionId, void **queryResult, int *rowCount,
}
else
{
WarnRemoteError(connection, result);
ReportResultError(connection, result, WARNING);
PQclear(result);
return false;
@ -494,7 +455,7 @@ BatchQueryStatus
MultiClientBatchResult(int32 connectionId, void **queryResult, int *rowCount,
int *columnCount)
{
PGconn *connection = NULL;
MultiConnection *connection = NULL;
PGresult *result = NULL;
ConnStatusType connStatusType = CONNECTION_OK;
ExecStatusType resultStatus = PGRES_COMMAND_OK;
@ -509,14 +470,14 @@ MultiClientBatchResult(int32 connectionId, void **queryResult, int *rowCount,
(*rowCount) = -1;
(*columnCount) = -1;
connStatusType = PQstatus(connection);
connStatusType = PQstatus(connection->pgConn);
if (connStatusType == CONNECTION_BAD)
{
ereport(WARNING, (errmsg("could not maintain connection to worker node")));
return CLIENT_BATCH_QUERY_FAILED;
}
result = PQgetResult(connection);
result = PQgetResult(connection->pgConn);
if (result == NULL)
{
return CLIENT_BATCH_QUERY_DONE;
@ -537,7 +498,7 @@ MultiClientBatchResult(int32 connectionId, void **queryResult, int *rowCount,
}
else
{
WarnRemoteError(connection, result);
ReportResultError(connection, result, WARNING);
PQclear(result);
queryStatus = CLIENT_BATCH_QUERY_FAILED;
}
@ -576,7 +537,7 @@ MultiClientClearResult(void *queryResult)
QueryStatus
MultiClientQueryStatus(int32 connectionId)
{
PGconn *connection = NULL;
MultiConnection *connection = NULL;
PGresult *result = NULL;
int tupleCount PG_USED_FOR_ASSERTS_ONLY = 0;
bool copyResults = false;
@ -588,7 +549,7 @@ MultiClientQueryStatus(int32 connectionId)
connection = ClientConnectionArray[connectionId];
Assert(connection != NULL);
connStatusType = PQstatus(connection);
connStatusType = PQstatus(connection->pgConn);
if (connStatusType == CONNECTION_BAD)
{
ereport(WARNING, (errmsg("could not maintain connection to worker node")));
@ -600,7 +561,7 @@ MultiClientQueryStatus(int32 connectionId)
* isn't ready yet (the caller didn't wait for the connection to be ready),
* we will block on this call.
*/
result = PQgetResult(connection);
result = PQgetResult(connection->pgConn);
resultStatus = PQresultStatus(result);
if (resultStatus == PGRES_COMMAND_OK)
@ -631,7 +592,7 @@ MultiClientQueryStatus(int32 connectionId)
copyResults = true;
}
WarnRemoteError(connection, result);
ReportResultError(connection, result, WARNING);
}
/* clear the result object */
@ -654,7 +615,7 @@ MultiClientQueryStatus(int32 connectionId)
CopyStatus
MultiClientCopyData(int32 connectionId, int32 fileDescriptor)
{
PGconn *connection = NULL;
MultiConnection *connection = NULL;
char *receiveBuffer = NULL;
int consumed = 0;
int receiveLength = 0;
@ -669,7 +630,7 @@ MultiClientCopyData(int32 connectionId, int32 fileDescriptor)
* Consume input to handle the case where previous copy operation might have
* received zero bytes.
*/
consumed = PQconsumeInput(connection);
consumed = PQconsumeInput(connection->pgConn);
if (consumed == 0)
{
ereport(WARNING, (errmsg("could not read data from worker node")));
@ -677,7 +638,7 @@ MultiClientCopyData(int32 connectionId, int32 fileDescriptor)
}
/* receive copy data message in an asynchronous manner */
receiveLength = PQgetCopyData(connection, &receiveBuffer, asynchronous);
receiveLength = PQgetCopyData(connection->pgConn, &receiveBuffer, asynchronous);
while (receiveLength > 0)
{
/* received copy data; append these data to file */
@ -698,7 +659,7 @@ MultiClientCopyData(int32 connectionId, int32 fileDescriptor)
PQfreemem(receiveBuffer);
receiveLength = PQgetCopyData(connection, &receiveBuffer, asynchronous);
receiveLength = PQgetCopyData(connection->pgConn, &receiveBuffer, asynchronous);
}
/* we now check the last received length returned by copy data */
@ -710,7 +671,7 @@ MultiClientCopyData(int32 connectionId, int32 fileDescriptor)
else if (receiveLength == -1)
{
/* received copy done message */
PGresult *result = PQgetResult(connection);
PGresult *result = PQgetResult(connection->pgConn);
ExecStatusType resultStatus = PQresultStatus(result);
if (resultStatus == PGRES_COMMAND_OK)
@ -721,7 +682,7 @@ MultiClientCopyData(int32 connectionId, int32 fileDescriptor)
{
copyStatus = CLIENT_COPY_FAILED;
WarnRemoteError(connection, result);
ReportResultError(connection, result, WARNING);
}
PQclear(result);
@ -731,7 +692,7 @@ MultiClientCopyData(int32 connectionId, int32 fileDescriptor)
/* received an error */
copyStatus = CLIENT_COPY_FAILED;
WarnRemoteError(connection, NULL);
ReportConnectionError(connection, WARNING);
}
/* if copy out completed, make sure we drain all results from libpq */
@ -794,7 +755,7 @@ void
MultiClientRegisterWait(WaitInfo *waitInfo, TaskExecutionStatus executionStatus,
int32 connectionId)
{
PGconn *connection = NULL;
MultiConnection *connection = NULL;
struct pollfd *pollfd = NULL;
Assert(waitInfo->registeredWaiters < waitInfo->maxWaiters);
@ -812,7 +773,7 @@ MultiClientRegisterWait(WaitInfo *waitInfo, TaskExecutionStatus executionStatus,
connection = ClientConnectionArray[connectionId];
pollfd = &waitInfo->pollfds[waitInfo->registeredWaiters];
pollfd->fd = PQsocket(connection);
pollfd->fd = PQsocket(connection->pgConn);
if (executionStatus == TASK_STATUS_SOCKET_READ)
{
pollfd->events = POLLERR | POLLIN;
@ -904,13 +865,13 @@ MultiClientWait(WaitInfo *waitInfo)
* query.
*/
static void
ClearRemainingResults(PGconn *connection)
ClearRemainingResults(MultiConnection *connection)
{
PGresult *result = PQgetResult(connection);
PGresult *result = PQgetResult(connection->pgConn);
while (result != NULL)
{
PQclear(result);
result = PQgetResult(connection);
result = PQgetResult(connection->pgConn);
}
}
@ -921,7 +882,8 @@ ClearRemainingResults(PGconn *connection)
* and libpq_select() at libpqwalreceiver.c.
*/
static bool
ClientConnectionReady(PGconn *connection, PostgresPollingStatusType pollingStatus)
ClientConnectionReady(MultiConnection *connection,
PostgresPollingStatusType pollingStatus)
{
bool clientConnectionReady = false;
int pollResult = 0;
@ -942,7 +904,7 @@ ClientConnectionReady(PGconn *connection, PostgresPollingStatusType pollingStatu
pollEventMask = POLLERR | POLLOUT;
}
pollFileDescriptor.fd = PQsocket(connection);
pollFileDescriptor.fd = PQsocket(connection->pgConn);
pollFileDescriptor.events = pollEventMask;
pollFileDescriptor.revents = 0;

View File

@ -15,10 +15,8 @@
#define MULTI_CLIENT_EXECUTOR_H
#define INVALID_CONNECTION_ID -1 /* identifies an invalid connection */
#define CLIENT_CONNECT_TIMEOUT 5 /* connection timeout in seconds */
#define MAX_CONNECTION_COUNT 2048 /* simultaneous client connection count */
#define STRING_BUFFER_SIZE 1024 /* buffer size for character arrays */
#define CONN_INFO_TEMPLATE "host=%s port=%u dbname=%s user=%s connect_timeout=%u"
/* Enumeration to track one client connection's status */