Merge pull request #863 from citusdata/feature/connection-lifecycle

Connection Lifecycle Management Infrastructure
pull/876/head
Andres Freund 2016-12-07 11:51:02 -08:00 committed by GitHub
commit f7d9074aa3
23 changed files with 1415 additions and 379 deletions

View File

@ -16,7 +16,7 @@ DATA = $(patsubst $(citus_abs_srcdir)/%.sql,%.sql,$(wildcard $(citus_abs_srcdir)
DATA_built = $(foreach v,$(EXTVERSIONS),$(EXTENSION)--$(v).sql) DATA_built = $(foreach v,$(EXTVERSIONS),$(EXTENSION)--$(v).sql)
# directories with source files # directories with source files
SUBDIRS = . commands executor master metadata planner relay test transaction utils worker SUBDIRS = . commands connection executor master metadata planner relay test transaction utils worker
# That patsubst rule searches all directories listed in SUBDIRS for .c # That patsubst rule searches all directories listed in SUBDIRS for .c
# files, and adds the corresponding .o files to OBJS # files, and adds the corresponding .o files to OBJS

View File

@ -329,13 +329,13 @@ CopyFromWorkerNode(CopyStmt *copyStatement, char *completionTag)
PQclear(queryResult); PQclear(queryResult);
/* close the connection */ /* close the connection */
PQfinish(masterConnection); CloseConnectionByPGconn(masterConnection);
masterConnection = NULL; masterConnection = NULL;
} }
PG_CATCH(); PG_CATCH();
{ {
/* close the connection */ /* close the connection */
PQfinish(masterConnection); CloseConnectionByPGconn(masterConnection);
masterConnection = NULL; masterConnection = NULL;
PG_RE_THROW(); PG_RE_THROW();

View File

@ -0,0 +1,638 @@
/*-------------------------------------------------------------------------
*
* connection_management.c
* Central management of connections and their life-cycle
*
* Copyright (c) 2016, Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#ifdef HAVE_POLL_H
#include <poll.h>
#endif
#include "libpq-fe.h"
#include "miscadmin.h"
#include "access/hash.h"
#include "commands/dbcommands.h"
#include "distributed/connection_management.h"
#include "distributed/metadata_cache.h"
#include "distributed/hash_helpers.h"
#include "mb/pg_wchar.h"
#include "utils/hsearch.h"
#include "utils/memutils.h"
HTAB *ConnectionHash = NULL;
MemoryContext ConnectionContext = NULL;
static uint32 ConnectionHashHash(const void *key, Size keysize);
static int ConnectionHashCompare(const void *a, const void *b, Size keysize);
static MultiConnection * StartConnectionEstablishment(ConnectionHashKey *key);
static void AfterXactHostConnectionHandling(ConnectionHashEntry *entry, bool isCommit);
static MultiConnection * FindAvailableConnection(dlist_head *connections, uint32 flags);
/*
* Initialize per-backend connection management infrastructure.
*/
void
InitializeConnectionManagement(void)
{
HASHCTL info;
uint32 hashFlags = 0;
/*
* Create a single context for connection and transaction related memory
* management. Doing so, instead of allocating in TopMemoryContext, makes
* it easier to associate used memory.
*/
ConnectionContext = AllocSetContextCreate(TopMemoryContext, "Connection Context",
ALLOCSET_DEFAULT_MINSIZE,
ALLOCSET_DEFAULT_INITSIZE,
ALLOCSET_DEFAULT_MAXSIZE);
/* create (host,port,user,database) -> [connection] hash */
memset(&info, 0, sizeof(info));
info.keysize = sizeof(ConnectionHashKey);
info.entrysize = sizeof(ConnectionHashEntry);
info.hash = ConnectionHashHash;
info.match = ConnectionHashCompare;
info.hcxt = ConnectionContext;
hashFlags = (HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT | HASH_COMPARE);
ConnectionHash = hash_create("citus connection cache (host,port,user,database)",
64, &info, hashFlags);
}
/*
* Perform connection management activity after the end of a transaction. Both
* COMMIT and ABORT paths are handled here.
*
* This is called by Citus' global transaction callback.
*/
void
AfterXactConnectionHandling(bool isCommit)
{
HASH_SEQ_STATUS status;
ConnectionHashEntry *entry;
hash_seq_init(&status, ConnectionHash);
while ((entry = (ConnectionHashEntry *) hash_seq_search(&status)) != 0)
{
AfterXactHostConnectionHandling(entry, isCommit);
/*
* NB: We leave the hash entry in place, even if there's no individual
* connections in it anymore. There seems no benefit in deleting it,
* and it'll save a bit of work in the next transaction.
*/
}
}
/*
* GetNodeConnection() establishes a connection to remote node, using default
* user and database.
*
* See StartNodeUserDatabaseConnection for details.
*/
MultiConnection *
GetNodeConnection(uint32 flags, const char *hostname, int32 port)
{
return GetNodeUserDatabaseConnection(flags, hostname, port, NULL, NULL);
}
/*
* StartNodeConnection initiates a connection to remote node, using default
* user and database.
*
* See StartNodeUserDatabaseConnection for details.
*/
MultiConnection *
StartNodeConnection(uint32 flags, const char *hostname, int32 port)
{
return StartNodeUserDatabaseConnection(flags, hostname, port, NULL, NULL);
}
/*
* GetNodeUserDatabaseConnection establishes connection to remote node.
*
* See StartNodeUserDatabaseConnection for details.
*/
MultiConnection *
GetNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port, const
char *user, const char *database)
{
MultiConnection *connection;
connection = StartNodeUserDatabaseConnection(flags, hostname, port, user, database);
FinishConnectionEstablishment(connection);
return connection;
}
/*
* StartNodeUserDatabaseConnection() initiates a connection to a remote node.
*
* If user or database are NULL, the current session's defaults are used. The
* following flags influence connection establishment behaviour:
* - SESSION_LIFESPAN - the connection should persist after transaction end
* - FORCE_NEW_CONNECTION - a new connection is required
*
* The returned connection has only been initiated, not fully
* established. That's useful to allow parallel connection establishment. If
* that's not desired use the Get* variant.
*/
MultiConnection *
StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port, const
char *user, const char *database)
{
ConnectionHashKey key;
ConnectionHashEntry *entry = NULL;
MultiConnection *connection;
MemoryContext oldContext;
bool found;
/* do some minimal input checks */
strlcpy(key.hostname, hostname, MAX_NODE_LENGTH);
if (strlen(hostname) > MAX_NODE_LENGTH)
{
ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("hostname exceeds the maximum length of %d",
MAX_NODE_LENGTH)));
}
key.port = port;
if (user)
{
strlcpy(key.user, user, NAMEDATALEN);
}
else
{
strlcpy(key.user, CurrentUserName(), NAMEDATALEN);
}
if (database)
{
strlcpy(key.database, database, NAMEDATALEN);
}
else
{
strlcpy(key.database, get_database_name(MyDatabaseId), NAMEDATALEN);
}
if (CurrentCoordinatedTransactionState == COORD_TRANS_NONE)
{
CurrentCoordinatedTransactionState = COORD_TRANS_IDLE;
}
/*
* Lookup relevant hash entry. We always enter. If only a cached
* connection is desired, and there's none, we'll simply leave the
* connection list empty.
*/
entry = hash_search(ConnectionHash, &key, HASH_ENTER, &found);
if (!found)
{
entry->connections = MemoryContextAlloc(ConnectionContext,
sizeof(dlist_head));
dlist_init(entry->connections);
}
/* if desired, check whether there's a usable connection */
if (!(flags & FORCE_NEW_CONNECTION))
{
/* check connection cache for a connection that's not already in use */
connection = FindAvailableConnection(entry->connections, flags);
if (connection)
{
if (flags & SESSION_LIFESPAN)
{
connection->sessionLifespan = true;
}
return connection;
}
}
/*
* Either no caching desired, or no pre-established, non-claimed,
* connection present. Initiate connection establishment.
*/
connection = StartConnectionEstablishment(&key);
oldContext = MemoryContextSwitchTo(ConnectionContext);
dlist_push_tail(entry->connections, &connection->connectionNode);
MemoryContextSwitchTo(oldContext);
if (flags & SESSION_LIFESPAN)
{
connection->sessionLifespan = true;
}
return connection;
}
/* StartNodeUserDatabaseConnection() helper */
static MultiConnection *
FindAvailableConnection(dlist_head *connections, uint32 flags)
{
dlist_iter iter;
dlist_foreach(iter, connections)
{
MultiConnection *connection =
dlist_container(MultiConnection, connectionNode, iter.cur);
/* don't return claimed connections */
if (connection->claimedExclusively)
{
continue;
}
return connection;
}
return NULL;
}
/*
* Return MultiConnection associated with the libpq connection.
*
* Note that this is comparatively expensive. Should only be used for
* backward-compatibility purposes.
*/
MultiConnection *
GetConnectionFromPGconn(struct pg_conn *pqConn)
{
HASH_SEQ_STATUS status;
ConnectionHashEntry *entry;
hash_seq_init(&status, ConnectionHash);
while ((entry = (ConnectionHashEntry *) hash_seq_search(&status)) != 0)
{
dlist_head *connections = entry->connections;
dlist_iter iter;
/* check connection cache for a connection that's not already in use */
dlist_foreach(iter, connections)
{
MultiConnection *connection =
dlist_container(MultiConnection, connectionNode, iter.cur);
if (connection->pgConn == pqConn)
{
hash_seq_term(&status);
return connection;
}
}
}
return NULL;
}
/*
* Close a previously established connection.
*/
void
CloseConnection(MultiConnection *connection)
{
ConnectionHashKey key;
bool found;
/* close connection */
PQfinish(connection->pgConn);
connection->pgConn = NULL;
strlcpy(key.hostname, connection->hostname, MAX_NODE_LENGTH);
key.port = connection->port;
strlcpy(key.user, connection->user, NAMEDATALEN);
strlcpy(key.database, connection->database, NAMEDATALEN);
hash_search(ConnectionHash, &key, HASH_FIND, &found);
if (found)
{
/* unlink from list */
dlist_delete(&connection->connectionNode);
/* we leave the per-host entry alive */
pfree(connection);
}
else
{
ereport(ERROR, (errmsg("closing untracked connection")));
}
}
/*
* Close a previously established connection.
*
* This function closes the MultiConnection associatated with the libpq
* connection.
*
* Note that this is comparatively expensive. Should only be used for
* backward-compatibility purposes.
*/
void
CloseConnectionByPGconn(PGconn *pqConn)
{
MultiConnection *connection = GetConnectionFromPGconn(pqConn);
if (connection)
{
CloseConnection(connection);
}
else
{
ereport(WARNING, (errmsg("could not find connection to close")));
}
}
/*
* Synchronously finish connection establishment of an individual connection.
*
* TODO: Replace with variant waiting for multiple connections.
*/
void
FinishConnectionEstablishment(MultiConnection *connection)
{
static int checkIntervalMS = 200;
/*
* Loop until connection is established, or failed (possibly just timed
* out).
*/
while (true)
{
ConnStatusType status = PQstatus(connection->pgConn);
PostgresPollingStatusType pollmode;
if (status == CONNECTION_OK)
{
return;
}
/* FIXME: retries? */
if (status == CONNECTION_BAD)
{
return;
}
pollmode = PQconnectPoll(connection->pgConn);
/*
* FIXME: Do we want to add transparent retry support here?
*/
if (pollmode == PGRES_POLLING_FAILED)
{
return;
}
else if (pollmode == PGRES_POLLING_OK)
{
return;
}
else
{
Assert(pollmode == PGRES_POLLING_WRITING ||
pollmode == PGRES_POLLING_READING);
}
/* Loop, to handle poll() being interrupted by signals (EINTR) */
while (true)
{
struct pollfd pollFileDescriptor;
int pollResult = 0;
pollFileDescriptor.fd = PQsocket(connection->pgConn);
if (pollmode == PGRES_POLLING_READING)
{
pollFileDescriptor.events = POLLIN;
}
else
{
pollFileDescriptor.events = POLLOUT;
}
pollFileDescriptor.revents = 0;
/*
* Only sleep for a limited amount of time, so we can react to
* interrupts in time, even if the platform doesn't interrupt
* poll() after signal arrival.
*/
pollResult = poll(&pollFileDescriptor, 1, checkIntervalMS);
if (pollResult == 0)
{
/*
* Timeout exceeded. Two things to do:
* - check whether any interrupts arrived and handle them
* - check whether establishment for connection already has
* lasted for too long, stop waiting if so.
*/
CHECK_FOR_INTERRUPTS();
if (TimestampDifferenceExceeds(connection->connectionStart,
GetCurrentTimestamp(),
CLIENT_CONNECT_TIMEOUT_SECONDS_INT * 1000))
{
ereport(WARNING, (errmsg("could not establish connection after %u ms",
CLIENT_CONNECT_TIMEOUT_SECONDS_INT * 1000)));
/* close connection, otherwise we take up resource on the other side */
PQfinish(connection->pgConn);
connection->pgConn = NULL;
break;
}
}
else if (pollResult > 0)
{
/*
* IO possible, continue connection establishment. We could
* check for timeouts here as well, but if there's progress
* there seems little point.
*/
break;
}
else if (pollResult != EINTR)
{
/* Retrying, signal interrupted. So check. */
CHECK_FOR_INTERRUPTS();
}
else
{
/*
* We ERROR here, instead of just returning a failed
* connection, because this shouldn't happen, and indicates a
* programming error somewhere, not a network etc. issue.
*/
ereport(ERROR, (errcode_for_socket_access(),
errmsg("poll() failed: %m")));
}
}
}
}
/*
* ClaimConnectionExclusively signals that this connection is actively being
* used. That means it'll not be, again, returned by
* StartNodeUserDatabaseConnection() et al until releases with
* UnclaimConnection().
*/
void
ClaimConnectionExclusively(MultiConnection *connection)
{
Assert(!connection->claimedExclusively);
connection->claimedExclusively = true;
}
/*
* UnclaimConnection signals that this connection is not being used
* anymore. That means it again may be returned by
* StartNodeUserDatabaseConnection() et al.
*/
void
UnclaimConnection(MultiConnection *connection)
{
connection->claimedExclusively = false;
}
static uint32
ConnectionHashHash(const void *key, Size keysize)
{
ConnectionHashKey *entry = (ConnectionHashKey *) key;
uint32 hash = 0;
hash = string_hash(entry->hostname, NAMEDATALEN);
hash = hash_combine(hash, hash_uint32(entry->port));
hash = hash_combine(hash, string_hash(entry->user, NAMEDATALEN));
hash = hash_combine(hash, string_hash(entry->database, NAMEDATALEN));
return hash;
}
static int
ConnectionHashCompare(const void *a, const void *b, Size keysize)
{
ConnectionHashKey *ca = (ConnectionHashKey *) a;
ConnectionHashKey *cb = (ConnectionHashKey *) b;
if (strncmp(ca->hostname, cb->hostname, NAMEDATALEN) != 0 ||
ca->port != cb->port ||
strncmp(ca->user, cb->user, NAMEDATALEN) != 0 ||
strncmp(ca->database, cb->database, NAMEDATALEN) != 0)
{
return 1;
}
else
{
return 0;
}
}
/*
* Asynchronously establish connection to a remote node, but don't wait for
* that to finish. DNS lookups etc. are performed synchronously though.
*/
static MultiConnection *
StartConnectionEstablishment(ConnectionHashKey *key)
{
char nodePortString[12];
const char *clientEncoding = GetDatabaseEncodingName();
MultiConnection *connection = NULL;
const char *keywords[] = {
"host", "port", "dbname", "user",
"client_encoding", "fallback_application_name",
NULL
};
const char *values[] = {
key->hostname, nodePortString, key->database, key->user,
clientEncoding, "citus", NULL
};
connection = MemoryContextAllocZero(ConnectionContext, sizeof(MultiConnection));
sprintf(nodePortString, "%d", key->port);
strlcpy(connection->hostname, key->hostname, MAX_NODE_LENGTH);
connection->port = key->port;
strlcpy(connection->database, key->database, NAMEDATALEN);
strlcpy(connection->user, key->user, NAMEDATALEN);
connection->pgConn = PQconnectStartParams(keywords, values, false);
connection->connectionStart = GetCurrentTimestamp();
return connection;
}
/*
* Close all remote connections if necessary anymore (i.e. not session
* lifetime), or if in a failed state.
*/
static void
AfterXactHostConnectionHandling(ConnectionHashEntry *entry, bool isCommit)
{
dlist_mutable_iter iter;
dlist_foreach_modify(iter, entry->connections)
{
MultiConnection *connection =
dlist_container(MultiConnection, connectionNode, iter.cur);
/*
* To avoid code leaking connections we warn if connections are
* still claimed exclusively. We can only do so if the transaction
* committed, as it's normal that code didn't have chance to clean
* up after errors.
*/
if (isCommit && connection->claimedExclusively)
{
ereport(WARNING,
(errmsg("connection claimed exclusively at transaction commit")));
}
/*
* Preserve session lifespan connections if they are still healthy.
*/
if (!connection->sessionLifespan ||
PQstatus(connection->pgConn) != CONNECTION_OK ||
PQtransactionStatus(connection->pgConn) != PQTRANS_IDLE)
{
PQfinish(connection->pgConn);
connection->pgConn = NULL;
/* unlink from list */
dlist_delete(iter.cur);
pfree(connection);
}
else
{
UnclaimConnection(connection);
}
}
}

View File

@ -0,0 +1,195 @@
/*-------------------------------------------------------------------------
*
* remote_commands.c
* Helpers to make it easier to execute command on remote nodes.
*
* Copyright (c) 2016, Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "libpq-fe.h"
#include "distributed/connection_management.h"
#include "distributed/remote_commands.h"
/* GUC, determining whether statements sent to remote nodes are logged */
bool LogRemoteCommands = false;
/* simple helpers */
/*
* IsResponseOK checks whether the result is a successful one.
*/
bool
IsResponseOK(PGresult *result)
{
ExecStatusType resultStatus = PQresultStatus(result);
if (resultStatus == PGRES_SINGLE_TUPLE || resultStatus == PGRES_TUPLES_OK ||
resultStatus == PGRES_COMMAND_OK)
{
return true;
}
return false;
}
/*
* ForgetResults clears a connection from pending activity.
*
* XXX: In the future it might be a good idea to use use PQcancel() if results
* would require network IO.
*/
void
ForgetResults(MultiConnection *connection)
{
while (true)
{
PGresult *result = NULL;
result = PQgetResult(connection->pgConn);
if (result == NULL)
{
break;
}
if (PQresultStatus(result) == PGRES_COPY_IN)
{
PQputCopyEnd(connection->pgConn, NULL);
/* TODO: mark transaction as failed, once we can. */
}
PQclear(result);
}
}
/*
* SqlStateMatchesCategory returns true if the given sql state (which may be
* NULL if unknown) is in the given error category. Note that we use
* ERRCODE_TO_CATEGORY macro to determine error category of the sql state and
* expect the caller to use the same macro for the error category.
*/
bool
SqlStateMatchesCategory(char *sqlStateString, int category)
{
bool sqlStateMatchesCategory = false;
int sqlState = 0;
int sqlStateCategory = 0;
if (sqlStateString == NULL)
{
return false;
}
sqlState = MAKE_SQLSTATE(sqlStateString[0], sqlStateString[1], sqlStateString[2],
sqlStateString[3], sqlStateString[4]);
sqlStateCategory = ERRCODE_TO_CATEGORY(sqlState);
if (sqlStateCategory == category)
{
sqlStateMatchesCategory = true;
}
return sqlStateMatchesCategory;
}
/* report errors & warnings */
/*
* Report libpq failure that's not associated with a result.
*/
void
ReportConnectionError(MultiConnection *connection, int elevel)
{
char *nodeName = connection->hostname;
int nodePort = connection->port;
ereport(elevel, (errmsg("connection error: %s:%d", nodeName, nodePort),
errdetail("%s", PQerrorMessage(connection->pgConn))));
}
/*
* ReportResultError reports libpq failure associated with a result.
*/
void
ReportResultError(MultiConnection *connection, PGresult *result, int elevel)
{
char *sqlStateString = PQresultErrorField(result, PG_DIAG_SQLSTATE);
char *messagePrimary = PQresultErrorField(result, PG_DIAG_MESSAGE_PRIMARY);
char *messageDetail = PQresultErrorField(result, PG_DIAG_MESSAGE_DETAIL);
char *messageHint = PQresultErrorField(result, PG_DIAG_MESSAGE_HINT);
char *messageContext = PQresultErrorField(result, PG_DIAG_CONTEXT);
char *nodeName = connection->hostname;
int nodePort = connection->port;
int sqlState = ERRCODE_INTERNAL_ERROR;
if (sqlStateString != NULL)
{
sqlState = MAKE_SQLSTATE(sqlStateString[0], sqlStateString[1], sqlStateString[2],
sqlStateString[3], sqlStateString[4]);
}
/*
* If the PGresult did not contain a message, the connection may provide a
* suitable top level one. At worst, this is an empty string.
*/
if (messagePrimary == NULL)
{
char *lastNewlineIndex = NULL;
messagePrimary = PQerrorMessage(connection->pgConn);
lastNewlineIndex = strrchr(messagePrimary, '\n');
/* trim trailing newline, if any */
if (lastNewlineIndex != NULL)
{
*lastNewlineIndex = '\0';
}
}
ereport(elevel, (errcode(sqlState), errmsg("%s", messagePrimary),
messageDetail ? errdetail("%s", messageDetail) : 0,
messageHint ? errhint("%s", messageHint) : 0,
messageContext ? errcontext("%s", messageContext) : 0,
errcontext("while executing command on %s:%d",
nodeName, nodePort)));
}
/*
* LogRemoteCommand logs commands send to remote nodes if
* citus.log_remote_commands wants us to do so.
*/
void
LogRemoteCommand(MultiConnection *connection, const char *command)
{
if (!LogRemoteCommands)
{
return;
}
ereport(LOG, (errmsg("issuing %s", command),
errdetail("on server %s:%d", connection->hostname, connection->port)));
}
/* wrappers around libpq functions, with command logging support */
/*
* SendRemoteCommand is a tiny PQsendQuery wrapper that logs remote commands,
* and accepts a MultiConnection instead of a plain PGconn.
*/
int
SendRemoteCommand(MultiConnection *connection, const char *command)
{
LogRemoteCommand(connection, command);
return PQsendQuery(connection->pgConn, command);
}

View File

@ -20,8 +20,10 @@
#include "commands/dbcommands.h" #include "commands/dbcommands.h"
#include "distributed/metadata_cache.h" #include "distributed/metadata_cache.h"
#include "distributed/connection_cache.h" #include "distributed/connection_cache.h"
#include "distributed/connection_management.h"
#include "distributed/multi_client_executor.h" #include "distributed/multi_client_executor.h"
#include "distributed/multi_server_executor.h" #include "distributed/multi_server_executor.h"
#include "distributed/remote_commands.h"
#include <errno.h> #include <errno.h>
#include <unistd.h> #include <unistd.h>
@ -38,7 +40,7 @@
/* Local pool to track active connections */ /* Local pool to track active connections */
static PGconn *ClientConnectionArray[MAX_CONNECTION_COUNT]; static MultiConnection *ClientConnectionArray[MAX_CONNECTION_COUNT];
/* /*
* The value at any position on ClientPollingStatusArray is only defined when * The value at any position on ClientPollingStatusArray is only defined when
@ -48,8 +50,8 @@ static PostgresPollingStatusType ClientPollingStatusArray[MAX_CONNECTION_COUNT];
/* Local functions forward declarations */ /* Local functions forward declarations */
static void ClearRemainingResults(PGconn *connection); static void ClearRemainingResults(MultiConnection *connection);
static bool ClientConnectionReady(PGconn *connection, static bool ClientConnectionReady(MultiConnection *connection,
PostgresPollingStatusType pollingStatus); PostgresPollingStatusType pollingStatus);
@ -63,7 +65,7 @@ AllocateConnectionId(void)
/* allocate connectionId from connection pool */ /* allocate connectionId from connection pool */
for (connIndex = 0; connIndex < MAX_CONNECTION_COUNT; connIndex++) for (connIndex = 0; connIndex < MAX_CONNECTION_COUNT; connIndex++)
{ {
PGconn *connection = ClientConnectionArray[connIndex]; MultiConnection *connection = ClientConnectionArray[connIndex];
if (connection == NULL) if (connection == NULL)
{ {
connectionId = connIndex; connectionId = connIndex;
@ -87,12 +89,16 @@ int32
MultiClientConnect(const char *nodeName, uint32 nodePort, const char *nodeDatabase, MultiClientConnect(const char *nodeName, uint32 nodePort, const char *nodeDatabase,
const char *userName) const char *userName)
{ {
PGconn *connection = NULL; MultiConnection *connection = NULL;
char connInfoString[STRING_BUFFER_SIZE];
ConnStatusType connStatusType = CONNECTION_OK; ConnStatusType connStatusType = CONNECTION_OK;
int32 connectionId = AllocateConnectionId(); int32 connectionId = AllocateConnectionId();
char *effectiveDatabaseName = NULL; int connectionFlags = FORCE_NEW_CONNECTION; /* no cached connections for now */
char *effectiveUserName = NULL;
if (connectionId == INVALID_CONNECTION_ID)
{
ereport(WARNING, (errmsg("could not allocate connection in connection pool")));
return connectionId;
}
if (XactModificationLevel > XACT_MODIFICATION_NONE) if (XactModificationLevel > XACT_MODIFICATION_NONE)
{ {
@ -101,46 +107,11 @@ MultiClientConnect(const char *nodeName, uint32 nodePort, const char *nodeDataba
"command within a transaction"))); "command within a transaction")));
} }
if (connectionId == INVALID_CONNECTION_ID)
{
ereport(WARNING, (errmsg("could not allocate connection in connection pool")));
return connectionId;
}
if (nodeDatabase == NULL)
{
effectiveDatabaseName = get_database_name(MyDatabaseId);
}
else
{
effectiveDatabaseName = pstrdup(nodeDatabase);
}
if (userName == NULL)
{
effectiveUserName = CurrentUserName();
}
else
{
effectiveUserName = pstrdup(userName);
}
/*
* FIXME: This code is bad on several levels. It completely forgoes any
* escaping, it misses setting a number of parameters, it works with a
* limited string size without erroring when it's too long. We shouldn't
* even build a query string this way, there's PQconnectdbParams()!
*/
/* transcribe connection paremeters to string */
snprintf(connInfoString, STRING_BUFFER_SIZE, CONN_INFO_TEMPLATE,
nodeName, nodePort,
effectiveDatabaseName, effectiveUserName,
CLIENT_CONNECT_TIMEOUT);
/* establish synchronous connection to worker node */ /* establish synchronous connection to worker node */
connection = PQconnectdb(connInfoString); connection = GetNodeUserDatabaseConnection(connectionFlags, nodeName, nodePort,
connStatusType = PQstatus(connection); userName, nodeDatabase);
connStatusType = PQstatus(connection->pgConn);
if (connStatusType == CONNECTION_OK) if (connStatusType == CONNECTION_OK)
{ {
@ -148,15 +119,11 @@ MultiClientConnect(const char *nodeName, uint32 nodePort, const char *nodeDataba
} }
else else
{ {
WarnRemoteError(connection, NULL); ReportConnectionError(connection, WARNING);
CloseConnection(connection);
PQfinish(connection);
connectionId = INVALID_CONNECTION_ID; connectionId = INVALID_CONNECTION_ID;
} }
pfree(effectiveDatabaseName);
pfree(effectiveUserName);
return connectionId; return connectionId;
} }
@ -169,12 +136,11 @@ MultiClientConnect(const char *nodeName, uint32 nodePort, const char *nodeDataba
int32 int32
MultiClientConnectStart(const char *nodeName, uint32 nodePort, const char *nodeDatabase) MultiClientConnectStart(const char *nodeName, uint32 nodePort, const char *nodeDatabase)
{ {
PGconn *connection = NULL; MultiConnection *connection = NULL;
char connInfoString[STRING_BUFFER_SIZE]; ConnStatusType connStatusType = CONNECTION_OK;
ConnStatusType connStatusType = CONNECTION_BAD;
char *userName = CurrentUserName();
int32 connectionId = AllocateConnectionId(); int32 connectionId = AllocateConnectionId();
int connectionFlags = FORCE_NEW_CONNECTION; /* no cached connections for now */
if (connectionId == INVALID_CONNECTION_ID) if (connectionId == INVALID_CONNECTION_ID)
{ {
ereport(WARNING, (errmsg("could not allocate connection in connection pool"))); ereport(WARNING, (errmsg("could not allocate connection in connection pool")));
@ -188,13 +154,9 @@ MultiClientConnectStart(const char *nodeName, uint32 nodePort, const char *nodeD
"command within a transaction"))); "command within a transaction")));
} }
/* transcribe connection paremeters to string */
snprintf(connInfoString, STRING_BUFFER_SIZE, CONN_INFO_TEMPLATE,
nodeName, nodePort, nodeDatabase, userName, CLIENT_CONNECT_TIMEOUT);
/* prepare asynchronous request for worker node connection */ /* prepare asynchronous request for worker node connection */
connection = PQconnectStart(connInfoString); connection = StartNodeConnection(connectionFlags, nodeName, nodePort);
connStatusType = PQstatus(connection); connStatusType = PQstatus(connection->pgConn);
/* /*
* If prepared, we save the connection, and set its initial polling status * If prepared, we save the connection, and set its initial polling status
@ -208,9 +170,9 @@ MultiClientConnectStart(const char *nodeName, uint32 nodePort, const char *nodeD
} }
else else
{ {
WarnRemoteError(connection, NULL); ReportConnectionError(connection, WARNING);
CloseConnection(connection);
PQfinish(connection);
connectionId = INVALID_CONNECTION_ID; connectionId = INVALID_CONNECTION_ID;
} }
@ -222,7 +184,7 @@ MultiClientConnectStart(const char *nodeName, uint32 nodePort, const char *nodeD
ConnectStatus ConnectStatus
MultiClientConnectPoll(int32 connectionId) MultiClientConnectPoll(int32 connectionId)
{ {
PGconn *connection = NULL; MultiConnection *connection = NULL;
PostgresPollingStatusType pollingStatus = PGRES_POLLING_OK; PostgresPollingStatusType pollingStatus = PGRES_POLLING_OK;
ConnectStatus connectStatus = CLIENT_INVALID_CONNECT; ConnectStatus connectStatus = CLIENT_INVALID_CONNECT;
@ -240,7 +202,7 @@ MultiClientConnectPoll(int32 connectionId)
bool readReady = ClientConnectionReady(connection, PGRES_POLLING_READING); bool readReady = ClientConnectionReady(connection, PGRES_POLLING_READING);
if (readReady) if (readReady)
{ {
ClientPollingStatusArray[connectionId] = PQconnectPoll(connection); ClientPollingStatusArray[connectionId] = PQconnectPoll(connection->pgConn);
connectStatus = CLIENT_CONNECTION_BUSY; connectStatus = CLIENT_CONNECTION_BUSY;
} }
else else
@ -253,7 +215,7 @@ MultiClientConnectPoll(int32 connectionId)
bool writeReady = ClientConnectionReady(connection, PGRES_POLLING_WRITING); bool writeReady = ClientConnectionReady(connection, PGRES_POLLING_WRITING);
if (writeReady) if (writeReady)
{ {
ClientPollingStatusArray[connectionId] = PQconnectPoll(connection); ClientPollingStatusArray[connectionId] = PQconnectPoll(connection->pgConn);
connectStatus = CLIENT_CONNECTION_BUSY; connectStatus = CLIENT_CONNECTION_BUSY;
} }
else else
@ -263,7 +225,7 @@ MultiClientConnectPoll(int32 connectionId)
} }
else if (pollingStatus == PGRES_POLLING_FAILED) else if (pollingStatus == PGRES_POLLING_FAILED)
{ {
WarnRemoteError(connection, NULL); ReportConnectionError(connection, WARNING);
connectStatus = CLIENT_CONNECTION_BAD; connectStatus = CLIENT_CONNECTION_BAD;
} }
@ -276,14 +238,14 @@ MultiClientConnectPoll(int32 connectionId)
void void
MultiClientDisconnect(int32 connectionId) MultiClientDisconnect(int32 connectionId)
{ {
PGconn *connection = NULL; MultiConnection *connection = NULL;
const int InvalidPollingStatus = -1; const int InvalidPollingStatus = -1;
Assert(connectionId != INVALID_CONNECTION_ID); Assert(connectionId != INVALID_CONNECTION_ID);
connection = ClientConnectionArray[connectionId]; connection = ClientConnectionArray[connectionId];
Assert(connection != NULL); Assert(connection != NULL);
PQfinish(connection); CloseConnection(connection);
ClientConnectionArray[connectionId] = NULL; ClientConnectionArray[connectionId] = NULL;
ClientPollingStatusArray[connectionId] = InvalidPollingStatus; ClientPollingStatusArray[connectionId] = InvalidPollingStatus;
@ -297,7 +259,7 @@ MultiClientDisconnect(int32 connectionId)
bool bool
MultiClientConnectionUp(int32 connectionId) MultiClientConnectionUp(int32 connectionId)
{ {
PGconn *connection = NULL; MultiConnection *connection = NULL;
ConnStatusType connStatusType = CONNECTION_OK; ConnStatusType connStatusType = CONNECTION_OK;
bool connectionUp = true; bool connectionUp = true;
@ -305,7 +267,7 @@ MultiClientConnectionUp(int32 connectionId)
connection = ClientConnectionArray[connectionId]; connection = ClientConnectionArray[connectionId];
Assert(connection != NULL); Assert(connection != NULL);
connStatusType = PQstatus(connection); connStatusType = PQstatus(connection->pgConn);
if (connStatusType == CONNECTION_BAD) if (connStatusType == CONNECTION_BAD)
{ {
connectionUp = false; connectionUp = false;
@ -339,7 +301,7 @@ MultiClientExecute(int32 connectionId, const char *query, void **queryResult,
bool bool
MultiClientSendQuery(int32 connectionId, const char *query) MultiClientSendQuery(int32 connectionId, const char *query)
{ {
PGconn *connection = NULL; MultiConnection *connection = NULL;
bool success = true; bool success = true;
int querySent = 0; int querySent = 0;
@ -347,10 +309,10 @@ MultiClientSendQuery(int32 connectionId, const char *query)
connection = ClientConnectionArray[connectionId]; connection = ClientConnectionArray[connectionId];
Assert(connection != NULL); Assert(connection != NULL);
querySent = PQsendQuery(connection, query); querySent = PQsendQuery(connection->pgConn, query);
if (querySent == 0) if (querySent == 0)
{ {
char *errorMessage = PQerrorMessage(connection); char *errorMessage = PQerrorMessage(connection->pgConn);
ereport(WARNING, (errmsg("could not send remote query \"%s\"", query), ereport(WARNING, (errmsg("could not send remote query \"%s\"", query),
errdetail("Client error: %s", errorMessage))); errdetail("Client error: %s", errorMessage)));
@ -365,7 +327,7 @@ MultiClientSendQuery(int32 connectionId, const char *query)
bool bool
MultiClientCancel(int32 connectionId) MultiClientCancel(int32 connectionId)
{ {
PGconn *connection = NULL; MultiConnection *connection = NULL;
PGcancel *cancelObject = NULL; PGcancel *cancelObject = NULL;
int cancelSent = 0; int cancelSent = 0;
bool canceled = true; bool canceled = true;
@ -375,7 +337,7 @@ MultiClientCancel(int32 connectionId)
connection = ClientConnectionArray[connectionId]; connection = ClientConnectionArray[connectionId];
Assert(connection != NULL); Assert(connection != NULL);
cancelObject = PQgetCancel(connection); cancelObject = PQgetCancel(connection->pgConn);
cancelSent = PQcancel(cancelObject, errorBuffer, sizeof(errorBuffer)); cancelSent = PQcancel(cancelObject, errorBuffer, sizeof(errorBuffer));
if (cancelSent == 0) if (cancelSent == 0)
@ -396,7 +358,7 @@ MultiClientCancel(int32 connectionId)
ResultStatus ResultStatus
MultiClientResultStatus(int32 connectionId) MultiClientResultStatus(int32 connectionId)
{ {
PGconn *connection = NULL; MultiConnection *connection = NULL;
int consumed = 0; int consumed = 0;
ConnStatusType connStatusType = CONNECTION_OK; ConnStatusType connStatusType = CONNECTION_OK;
ResultStatus resultStatus = CLIENT_INVALID_RESULT_STATUS; ResultStatus resultStatus = CLIENT_INVALID_RESULT_STATUS;
@ -405,7 +367,7 @@ MultiClientResultStatus(int32 connectionId)
connection = ClientConnectionArray[connectionId]; connection = ClientConnectionArray[connectionId];
Assert(connection != NULL); Assert(connection != NULL);
connStatusType = PQstatus(connection); connStatusType = PQstatus(connection->pgConn);
if (connStatusType == CONNECTION_BAD) if (connStatusType == CONNECTION_BAD)
{ {
ereport(WARNING, (errmsg("could not maintain connection to worker node"))); ereport(WARNING, (errmsg("could not maintain connection to worker node")));
@ -413,10 +375,10 @@ MultiClientResultStatus(int32 connectionId)
} }
/* consume input to allow status change */ /* consume input to allow status change */
consumed = PQconsumeInput(connection); consumed = PQconsumeInput(connection->pgConn);
if (consumed != 0) if (consumed != 0)
{ {
int connectionBusy = PQisBusy(connection); int connectionBusy = PQisBusy(connection->pgConn);
if (connectionBusy == 0) if (connectionBusy == 0)
{ {
resultStatus = CLIENT_RESULT_READY; resultStatus = CLIENT_RESULT_READY;
@ -441,7 +403,7 @@ bool
MultiClientQueryResult(int32 connectionId, void **queryResult, int *rowCount, MultiClientQueryResult(int32 connectionId, void **queryResult, int *rowCount,
int *columnCount) int *columnCount)
{ {
PGconn *connection = NULL; MultiConnection *connection = NULL;
PGresult *result = NULL; PGresult *result = NULL;
ConnStatusType connStatusType = CONNECTION_OK; ConnStatusType connStatusType = CONNECTION_OK;
ExecStatusType resultStatus = PGRES_COMMAND_OK; ExecStatusType resultStatus = PGRES_COMMAND_OK;
@ -450,14 +412,14 @@ MultiClientQueryResult(int32 connectionId, void **queryResult, int *rowCount,
connection = ClientConnectionArray[connectionId]; connection = ClientConnectionArray[connectionId];
Assert(connection != NULL); Assert(connection != NULL);
connStatusType = PQstatus(connection); connStatusType = PQstatus(connection->pgConn);
if (connStatusType == CONNECTION_BAD) if (connStatusType == CONNECTION_BAD)
{ {
ereport(WARNING, (errmsg("could not maintain connection to worker node"))); ereport(WARNING, (errmsg("could not maintain connection to worker node")));
return false; return false;
} }
result = PQgetResult(connection); result = PQgetResult(connection->pgConn);
resultStatus = PQresultStatus(result); resultStatus = PQresultStatus(result);
if (resultStatus == PGRES_TUPLES_OK) if (resultStatus == PGRES_TUPLES_OK)
{ {
@ -467,7 +429,7 @@ MultiClientQueryResult(int32 connectionId, void **queryResult, int *rowCount,
} }
else else
{ {
WarnRemoteError(connection, result); ReportResultError(connection, result, WARNING);
PQclear(result); PQclear(result);
return false; return false;
@ -493,7 +455,7 @@ BatchQueryStatus
MultiClientBatchResult(int32 connectionId, void **queryResult, int *rowCount, MultiClientBatchResult(int32 connectionId, void **queryResult, int *rowCount,
int *columnCount) int *columnCount)
{ {
PGconn *connection = NULL; MultiConnection *connection = NULL;
PGresult *result = NULL; PGresult *result = NULL;
ConnStatusType connStatusType = CONNECTION_OK; ConnStatusType connStatusType = CONNECTION_OK;
ExecStatusType resultStatus = PGRES_COMMAND_OK; ExecStatusType resultStatus = PGRES_COMMAND_OK;
@ -508,14 +470,14 @@ MultiClientBatchResult(int32 connectionId, void **queryResult, int *rowCount,
(*rowCount) = -1; (*rowCount) = -1;
(*columnCount) = -1; (*columnCount) = -1;
connStatusType = PQstatus(connection); connStatusType = PQstatus(connection->pgConn);
if (connStatusType == CONNECTION_BAD) if (connStatusType == CONNECTION_BAD)
{ {
ereport(WARNING, (errmsg("could not maintain connection to worker node"))); ereport(WARNING, (errmsg("could not maintain connection to worker node")));
return CLIENT_BATCH_QUERY_FAILED; return CLIENT_BATCH_QUERY_FAILED;
} }
result = PQgetResult(connection); result = PQgetResult(connection->pgConn);
if (result == NULL) if (result == NULL)
{ {
return CLIENT_BATCH_QUERY_DONE; return CLIENT_BATCH_QUERY_DONE;
@ -536,7 +498,7 @@ MultiClientBatchResult(int32 connectionId, void **queryResult, int *rowCount,
} }
else else
{ {
WarnRemoteError(connection, result); ReportResultError(connection, result, WARNING);
PQclear(result); PQclear(result);
queryStatus = CLIENT_BATCH_QUERY_FAILED; queryStatus = CLIENT_BATCH_QUERY_FAILED;
} }
@ -575,7 +537,7 @@ MultiClientClearResult(void *queryResult)
QueryStatus QueryStatus
MultiClientQueryStatus(int32 connectionId) MultiClientQueryStatus(int32 connectionId)
{ {
PGconn *connection = NULL; MultiConnection *connection = NULL;
PGresult *result = NULL; PGresult *result = NULL;
int tupleCount PG_USED_FOR_ASSERTS_ONLY = 0; int tupleCount PG_USED_FOR_ASSERTS_ONLY = 0;
bool copyResults = false; bool copyResults = false;
@ -587,7 +549,7 @@ MultiClientQueryStatus(int32 connectionId)
connection = ClientConnectionArray[connectionId]; connection = ClientConnectionArray[connectionId];
Assert(connection != NULL); Assert(connection != NULL);
connStatusType = PQstatus(connection); connStatusType = PQstatus(connection->pgConn);
if (connStatusType == CONNECTION_BAD) if (connStatusType == CONNECTION_BAD)
{ {
ereport(WARNING, (errmsg("could not maintain connection to worker node"))); ereport(WARNING, (errmsg("could not maintain connection to worker node")));
@ -599,7 +561,7 @@ MultiClientQueryStatus(int32 connectionId)
* isn't ready yet (the caller didn't wait for the connection to be ready), * isn't ready yet (the caller didn't wait for the connection to be ready),
* we will block on this call. * we will block on this call.
*/ */
result = PQgetResult(connection); result = PQgetResult(connection->pgConn);
resultStatus = PQresultStatus(result); resultStatus = PQresultStatus(result);
if (resultStatus == PGRES_COMMAND_OK) if (resultStatus == PGRES_COMMAND_OK)
@ -630,7 +592,7 @@ MultiClientQueryStatus(int32 connectionId)
copyResults = true; copyResults = true;
} }
WarnRemoteError(connection, result); ReportResultError(connection, result, WARNING);
} }
/* clear the result object */ /* clear the result object */
@ -653,7 +615,7 @@ MultiClientQueryStatus(int32 connectionId)
CopyStatus CopyStatus
MultiClientCopyData(int32 connectionId, int32 fileDescriptor) MultiClientCopyData(int32 connectionId, int32 fileDescriptor)
{ {
PGconn *connection = NULL; MultiConnection *connection = NULL;
char *receiveBuffer = NULL; char *receiveBuffer = NULL;
int consumed = 0; int consumed = 0;
int receiveLength = 0; int receiveLength = 0;
@ -668,7 +630,7 @@ MultiClientCopyData(int32 connectionId, int32 fileDescriptor)
* Consume input to handle the case where previous copy operation might have * Consume input to handle the case where previous copy operation might have
* received zero bytes. * received zero bytes.
*/ */
consumed = PQconsumeInput(connection); consumed = PQconsumeInput(connection->pgConn);
if (consumed == 0) if (consumed == 0)
{ {
ereport(WARNING, (errmsg("could not read data from worker node"))); ereport(WARNING, (errmsg("could not read data from worker node")));
@ -676,7 +638,7 @@ MultiClientCopyData(int32 connectionId, int32 fileDescriptor)
} }
/* receive copy data message in an asynchronous manner */ /* receive copy data message in an asynchronous manner */
receiveLength = PQgetCopyData(connection, &receiveBuffer, asynchronous); receiveLength = PQgetCopyData(connection->pgConn, &receiveBuffer, asynchronous);
while (receiveLength > 0) while (receiveLength > 0)
{ {
/* received copy data; append these data to file */ /* received copy data; append these data to file */
@ -697,7 +659,7 @@ MultiClientCopyData(int32 connectionId, int32 fileDescriptor)
PQfreemem(receiveBuffer); PQfreemem(receiveBuffer);
receiveLength = PQgetCopyData(connection, &receiveBuffer, asynchronous); receiveLength = PQgetCopyData(connection->pgConn, &receiveBuffer, asynchronous);
} }
/* we now check the last received length returned by copy data */ /* we now check the last received length returned by copy data */
@ -709,7 +671,7 @@ MultiClientCopyData(int32 connectionId, int32 fileDescriptor)
else if (receiveLength == -1) else if (receiveLength == -1)
{ {
/* received copy done message */ /* received copy done message */
PGresult *result = PQgetResult(connection); PGresult *result = PQgetResult(connection->pgConn);
ExecStatusType resultStatus = PQresultStatus(result); ExecStatusType resultStatus = PQresultStatus(result);
if (resultStatus == PGRES_COMMAND_OK) if (resultStatus == PGRES_COMMAND_OK)
@ -720,7 +682,7 @@ MultiClientCopyData(int32 connectionId, int32 fileDescriptor)
{ {
copyStatus = CLIENT_COPY_FAILED; copyStatus = CLIENT_COPY_FAILED;
WarnRemoteError(connection, result); ReportResultError(connection, result, WARNING);
} }
PQclear(result); PQclear(result);
@ -730,7 +692,7 @@ MultiClientCopyData(int32 connectionId, int32 fileDescriptor)
/* received an error */ /* received an error */
copyStatus = CLIENT_COPY_FAILED; copyStatus = CLIENT_COPY_FAILED;
WarnRemoteError(connection, NULL); ReportConnectionError(connection, WARNING);
} }
/* if copy out completed, make sure we drain all results from libpq */ /* if copy out completed, make sure we drain all results from libpq */
@ -793,7 +755,7 @@ void
MultiClientRegisterWait(WaitInfo *waitInfo, TaskExecutionStatus executionStatus, MultiClientRegisterWait(WaitInfo *waitInfo, TaskExecutionStatus executionStatus,
int32 connectionId) int32 connectionId)
{ {
PGconn *connection = NULL; MultiConnection *connection = NULL;
struct pollfd *pollfd = NULL; struct pollfd *pollfd = NULL;
Assert(waitInfo->registeredWaiters < waitInfo->maxWaiters); Assert(waitInfo->registeredWaiters < waitInfo->maxWaiters);
@ -811,7 +773,7 @@ MultiClientRegisterWait(WaitInfo *waitInfo, TaskExecutionStatus executionStatus,
connection = ClientConnectionArray[connectionId]; connection = ClientConnectionArray[connectionId];
pollfd = &waitInfo->pollfds[waitInfo->registeredWaiters]; pollfd = &waitInfo->pollfds[waitInfo->registeredWaiters];
pollfd->fd = PQsocket(connection); pollfd->fd = PQsocket(connection->pgConn);
if (executionStatus == TASK_STATUS_SOCKET_READ) if (executionStatus == TASK_STATUS_SOCKET_READ)
{ {
pollfd->events = POLLERR | POLLIN; pollfd->events = POLLERR | POLLIN;
@ -903,13 +865,13 @@ MultiClientWait(WaitInfo *waitInfo)
* query. * query.
*/ */
static void static void
ClearRemainingResults(PGconn *connection) ClearRemainingResults(MultiConnection *connection)
{ {
PGresult *result = PQgetResult(connection); PGresult *result = PQgetResult(connection->pgConn);
while (result != NULL) while (result != NULL)
{ {
PQclear(result); PQclear(result);
result = PQgetResult(connection); result = PQgetResult(connection->pgConn);
} }
} }
@ -920,7 +882,8 @@ ClearRemainingResults(PGconn *connection)
* and libpq_select() at libpqwalreceiver.c. * and libpq_select() at libpqwalreceiver.c.
*/ */
static bool static bool
ClientConnectionReady(PGconn *connection, PostgresPollingStatusType pollingStatus) ClientConnectionReady(MultiConnection *connection,
PostgresPollingStatusType pollingStatus)
{ {
bool clientConnectionReady = false; bool clientConnectionReady = false;
int pollResult = 0; int pollResult = 0;
@ -941,7 +904,7 @@ ClientConnectionReady(PGconn *connection, PostgresPollingStatusType pollingStatu
pollEventMask = POLLERR | POLLOUT; pollEventMask = POLLERR | POLLOUT;
} }
pollFileDescriptor.fd = PQsocket(connection); pollFileDescriptor.fd = PQsocket(connection->pgConn);
pollFileDescriptor.events = pollEventMask; pollFileDescriptor.events = pollEventMask;
pollFileDescriptor.revents = 0; pollFileDescriptor.revents = 0;

View File

@ -30,6 +30,7 @@
#include "distributed/citus_ruleutils.h" #include "distributed/citus_ruleutils.h"
#include "distributed/commit_protocol.h" #include "distributed/commit_protocol.h"
#include "distributed/connection_cache.h" #include "distributed/connection_cache.h"
#include "distributed/connection_management.h"
#include "distributed/listutils.h" #include "distributed/listutils.h"
#include "distributed/master_metadata_utility.h" #include "distributed/master_metadata_utility.h"
#include "distributed/metadata_cache.h" #include "distributed/metadata_cache.h"
@ -39,6 +40,7 @@
#include "distributed/multi_router_executor.h" #include "distributed/multi_router_executor.h"
#include "distributed/multi_shard_transaction.h" #include "distributed/multi_shard_transaction.h"
#include "distributed/relay_utility.h" #include "distributed/relay_utility.h"
#include "distributed/remote_commands.h"
#include "distributed/resource_lock.h" #include "distributed/resource_lock.h"
#include "executor/execdesc.h" #include "executor/execdesc.h"
#include "executor/executor.h" #include "executor/executor.h"
@ -109,7 +111,7 @@ static uint64 ReturnRowsFromTuplestore(uint64 tupleCount, TupleDesc tupleDescrip
Tuplestorestate *tupleStore); Tuplestorestate *tupleStore);
static PGconn * GetConnectionForPlacement(ShardPlacement *placement, static PGconn * GetConnectionForPlacement(ShardPlacement *placement,
bool isModificationQuery); bool isModificationQuery);
static void PurgeConnectionForPlacement(ShardPlacement *placement); static void PurgeConnectionForPlacement(PGconn *connection, ShardPlacement *placement);
static void RemoveXactConnection(PGconn *connection); static void RemoveXactConnection(PGconn *connection);
static void ExtractParametersFromParamListInfo(ParamListInfo paramListInfo, static void ExtractParametersFromParamListInfo(ParamListInfo paramListInfo,
Oid **parameterTypes, Oid **parameterTypes,
@ -275,7 +277,7 @@ InitTransactionStateForTask(Task *task)
if (PQresultStatus(result) != PGRES_COMMAND_OK) if (PQresultStatus(result) != PGRES_COMMAND_OK)
{ {
WarnRemoteError(connection, result); WarnRemoteError(connection, result);
PurgeConnection(connection); CloseConnectionByPGconn(connection);
connection = NULL; connection = NULL;
} }
@ -792,7 +794,7 @@ ExecuteSingleTask(QueryDesc *queryDesc, Task *task,
queryOK = SendQueryInSingleRowMode(connection, queryString, paramListInfo); queryOK = SendQueryInSingleRowMode(connection, queryString, paramListInfo);
if (!queryOK) if (!queryOK)
{ {
PurgeConnectionForPlacement(taskPlacement); PurgeConnectionForPlacement(connection, taskPlacement);
failedPlacementList = lappend(failedPlacementList, taskPlacement); failedPlacementList = lappend(failedPlacementList, taskPlacement);
continue; continue;
} }
@ -850,7 +852,7 @@ ExecuteSingleTask(QueryDesc *queryDesc, Task *task,
} }
else else
{ {
PurgeConnectionForPlacement(taskPlacement); PurgeConnectionForPlacement(connection, taskPlacement);
failedPlacementList = lappend(failedPlacementList, taskPlacement); failedPlacementList = lappend(failedPlacementList, taskPlacement);
@ -954,8 +956,6 @@ ExecuteModifyTasks(List *taskList, bool expectResults, ParamListInfo paramListIn
"commands"))); "commands")));
} }
XactModificationLevel = XACT_MODIFICATION_MULTI_SHARD;
shardIntervalList = TaskShardIntervalList(taskList); shardIntervalList = TaskShardIntervalList(taskList);
/* ensure that there are no concurrent modifications on the same shards */ /* ensure that there are no concurrent modifications on the same shards */
@ -964,6 +964,8 @@ ExecuteModifyTasks(List *taskList, bool expectResults, ParamListInfo paramListIn
/* open connection to all relevant placements, if not already open */ /* open connection to all relevant placements, if not already open */
OpenTransactionsToAllShardPlacements(shardIntervalList, userName); OpenTransactionsToAllShardPlacements(shardIntervalList, userName);
XactModificationLevel = XACT_MODIFICATION_MULTI_SHARD;
/* iterate over placements in rounds, to ensure in-order execution */ /* iterate over placements in rounds, to ensure in-order execution */
while (tasksPending) while (tasksPending)
{ {
@ -1232,17 +1234,9 @@ GetConnectionForPlacement(ShardPlacement *placement, bool isModificationQuery)
* for the transaction in addition to purging the connection cache's entry. * for the transaction in addition to purging the connection cache's entry.
*/ */
static void static void
PurgeConnectionForPlacement(ShardPlacement *placement) PurgeConnectionForPlacement(PGconn *connection, ShardPlacement *placement)
{ {
NodeConnectionKey nodeKey; CloseConnectionByPGconn(connection);
char *currentUser = CurrentUserName();
MemSet(&nodeKey, 0, sizeof(NodeConnectionKey));
strlcpy(nodeKey.nodeName, placement->nodeName, MAX_NODE_LENGTH + 1);
nodeKey.nodePort = placement->nodePort;
strlcpy(nodeKey.nodeUser, currentUser, NAMEDATALEN);
PurgeConnectionByKey(&nodeKey);
/* /*
* The following is logically identical to RemoveXactConnection, but since * The following is logically identical to RemoveXactConnection, but since
@ -1254,7 +1248,13 @@ PurgeConnectionForPlacement(ShardPlacement *placement)
{ {
NodeConnectionEntry *participantEntry = NULL; NodeConnectionEntry *participantEntry = NULL;
bool entryFound = false; bool entryFound = false;
NodeConnectionKey nodeKey;
char *currentUser = CurrentUserName();
MemSet(&nodeKey, 0, sizeof(NodeConnectionKey));
strlcpy(nodeKey.nodeName, placement->nodeName, MAX_NODE_LENGTH + 1);
nodeKey.nodePort = placement->nodePort;
strlcpy(nodeKey.nodeUser, currentUser, NAMEDATALEN);
Assert(IsTransactionBlock()); Assert(IsTransactionBlock());
/* the participant hash doesn't use the user field */ /* the participant hash doesn't use the user field */
@ -1797,7 +1797,6 @@ RouterTransactionCallback(XactEvent event, void *arg)
} }
/* reset transaction state */ /* reset transaction state */
XactModificationLevel = XACT_MODIFICATION_NONE;
xactParticipantHash = NULL; xactParticipantHash = NULL;
xactShardConnSetList = NIL; xactShardConnSetList = NIL;
subXactAbortAttempted = false; subXactAbortAttempted = false;
@ -1861,7 +1860,7 @@ ExecuteTransactionEnd(bool commit)
else else
{ {
WarnRemoteError(connection, result); WarnRemoteError(connection, result);
PurgeConnection(participant->connection); CloseConnectionByPGconn(participant->connection);
participant->connection = NULL; participant->connection = NULL;
} }

View File

@ -19,6 +19,7 @@
#include "access/htup_details.h" #include "access/htup_details.h"
#include "catalog/pg_type.h" #include "catalog/pg_type.h"
#include "distributed/connection_cache.h" #include "distributed/connection_cache.h"
#include "distributed/connection_management.h"
#include "distributed/metadata_cache.h" #include "distributed/metadata_cache.h"
#include "distributed/multi_server_executor.h" #include "distributed/multi_server_executor.h"
#include "distributed/worker_protocol.h" #include "distributed/worker_protocol.h"
@ -269,7 +270,7 @@ ExecuteCommandsInParallelAndStoreResults(StringInfo *nodeNameArray, int *nodePor
{ {
StoreErrorMessage(connection, queryResultString); StoreErrorMessage(connection, queryResultString);
statusArray[commandIndex] = false; statusArray[commandIndex] = false;
PQfinish(connection); CloseConnectionByPGconn(connection);
connectionArray[commandIndex] = NULL; connectionArray[commandIndex] = NULL;
finishedCount++; finishedCount++;
} }
@ -298,7 +299,7 @@ ExecuteCommandsInParallelAndStoreResults(StringInfo *nodeNameArray, int *nodePor
finishedCount++; finishedCount++;
statusArray[commandIndex] = success; statusArray[commandIndex] = success;
connectionArray[commandIndex] = NULL; connectionArray[commandIndex] = NULL;
PQfinish(connection); CloseConnectionByPGconn(connection);
} }
} }
@ -509,7 +510,7 @@ ExecuteRemoteQueryOrCommand(char *nodeName, uint32 nodePort, char *queryString,
PQclear(queryResult); PQclear(queryResult);
/* close the connection */ /* close the connection */
PQfinish(nodeConnection); CloseConnectionByPGconn(nodeConnection);
nodeConnection = NULL; nodeConnection = NULL;
} }
PG_CATCH(); PG_CATCH();
@ -517,7 +518,7 @@ ExecuteRemoteQueryOrCommand(char *nodeName, uint32 nodePort, char *queryString,
StoreErrorMessage(nodeConnection, queryResultString); StoreErrorMessage(nodeConnection, queryResultString);
/* close the connection */ /* close the connection */
PQfinish(nodeConnection); CloseConnectionByPGconn(nodeConnection);
nodeConnection = NULL; nodeConnection = NULL;
} }
PG_END_TRY(); PG_END_TRY();

View File

@ -20,6 +20,7 @@
#include "executor/executor.h" #include "executor/executor.h"
#include "distributed/citus_nodefuncs.h" #include "distributed/citus_nodefuncs.h"
#include "distributed/commit_protocol.h" #include "distributed/commit_protocol.h"
#include "distributed/connection_management.h"
#include "distributed/master_protocol.h" #include "distributed/master_protocol.h"
#include "distributed/multi_copy.h" #include "distributed/multi_copy.h"
#include "distributed/multi_executor.h" #include "distributed/multi_executor.h"
@ -32,7 +33,9 @@
#include "distributed/multi_server_executor.h" #include "distributed/multi_server_executor.h"
#include "distributed/multi_shard_transaction.h" #include "distributed/multi_shard_transaction.h"
#include "distributed/multi_utility.h" #include "distributed/multi_utility.h"
#include "distributed/remote_commands.h"
#include "distributed/task_tracker.h" #include "distributed/task_tracker.h"
#include "distributed/transaction_management.h"
#include "distributed/worker_manager.h" #include "distributed/worker_manager.h"
#include "distributed/worker_protocol.h" #include "distributed/worker_protocol.h"
#include "postmaster/postmaster.h" #include "postmaster/postmaster.h"
@ -154,6 +157,10 @@ _PG_init(void)
/* organize that task tracker is started once server is up */ /* organize that task tracker is started once server is up */
TaskTrackerRegister(); TaskTrackerRegister();
/* initialize coordinated transaction management */
InitializeTransactionManagement();
InitializeConnectionManagement();
/* initialize transaction callbacks */ /* initialize transaction callbacks */
RegisterRouterExecutorXactCallbacks(); RegisterRouterExecutorXactCallbacks();
RegisterShardPlacementXactCallbacks(); RegisterShardPlacementXactCallbacks();
@ -270,6 +277,16 @@ RegisterCitusConfigVariables(void)
GUC_NO_SHOW_ALL, GUC_NO_SHOW_ALL,
NULL, NULL, NULL); NULL, NULL, NULL);
DefineCustomBoolVariable(
"citus.log_remote_commands",
gettext_noop("Log queries sent to other nodes in the server log"),
NULL,
&LogRemoteCommands,
false,
PGC_USERSET,
0,
NULL, NULL, NULL);
DefineCustomBoolVariable( DefineCustomBoolVariable(
"citus.explain_multi_logical_plan", "citus.explain_multi_logical_plan",
gettext_noop("Enables Explain to print out distributed logical plans."), gettext_noop("Enables Explain to print out distributed logical plans."),

View File

@ -21,6 +21,7 @@
#include "catalog/pg_type.h" #include "catalog/pg_type.h"
#include "distributed/connection_cache.h" #include "distributed/connection_cache.h"
#include "distributed/connection_management.h"
#include "distributed/metadata_cache.h" #include "distributed/metadata_cache.h"
#include "distributed/test_helper_functions.h" /* IWYU pragma: keep */ #include "distributed/test_helper_functions.h" /* IWYU pragma: keep */
#include "utils/elog.h" #include "utils/elog.h"
@ -124,7 +125,7 @@ get_and_purge_connection(PG_FUNCTION_ARGS)
PG_RETURN_BOOL(false); PG_RETURN_BOOL(false);
} }
PurgeConnection(connection); CloseConnectionByPGconn(connection);
PG_RETURN_BOOL(true); PG_RETURN_BOOL(true);
} }
@ -148,7 +149,7 @@ connect_and_purge_connection(PG_FUNCTION_ARGS)
PG_RETURN_BOOL(false); PG_RETURN_BOOL(false);
} }
PurgeConnection(connection); CloseConnectionByPGconn(connection);
PG_RETURN_BOOL(true); PG_RETURN_BOOL(true);
} }

View File

@ -26,10 +26,6 @@
static uint32 DistributedTransactionId = 0; static uint32 DistributedTransactionId = 0;
/* the commit protocol to use for COPY commands */
int MultiShardCommitProtocol = COMMIT_PROTOCOL_1PC;
/* /*
* InitializeDistributedTransaction prepares the distributed transaction ID * InitializeDistributedTransaction prepares the distributed transaction ID
* used in transaction names. * used in transaction names.

View File

@ -16,6 +16,7 @@
#include "distributed/colocation_utils.h" #include "distributed/colocation_utils.h"
#include "distributed/commit_protocol.h" #include "distributed/commit_protocol.h"
#include "distributed/connection_cache.h" #include "distributed/connection_cache.h"
#include "distributed/connection_management.h"
#include "distributed/master_metadata_utility.h" #include "distributed/master_metadata_utility.h"
#include "distributed/metadata_cache.h" #include "distributed/metadata_cache.h"
#include "distributed/multi_shard_transaction.h" #include "distributed/multi_shard_transaction.h"
@ -332,7 +333,6 @@ CompleteShardPlacementTransactions(XactEvent event, void *arg)
CloseConnections(connectionList); CloseConnections(connectionList);
shardConnectionHash = NULL; shardConnectionHash = NULL;
XactModificationLevel = XACT_MODIFICATION_NONE;
subXactAbortAttempted = false; subXactAbortAttempted = false;
} }
@ -362,6 +362,6 @@ CloseConnections(List *connectionList)
(TransactionConnection *) lfirst(connectionCell); (TransactionConnection *) lfirst(connectionCell);
PGconn *connection = transactionConnection->connection; PGconn *connection = transactionConnection->connection;
PQfinish(connection); CloseConnectionByPGconn(connection);
} }
} }

View File

@ -0,0 +1,144 @@
/*-------------------------------------------------------------------------
*
* transaction_management.c
*
* Transaction management for Citus. Most of the work is delegated to other
* subsystems, this files, and especially CoordinatedTransactionCallback,
* coordinates the work between them.
*
* Copyright (c) 2016, Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "libpq-fe.h"
#include "miscadmin.h"
#include "access/xact.h"
#include "distributed/connection_management.h"
#include "distributed/hash_helpers.h"
#include "distributed/transaction_management.h"
#include "utils/hsearch.h"
CoordinatedTransactionState CurrentCoordinatedTransactionState = COORD_TRANS_NONE;
/* GUC, the commit protocol to use for commands affecting more than one connection */
int MultiShardCommitProtocol = COMMIT_PROTOCOL_1PC;
/* state needed to keep track of operations used during a transaction */
XactModificationType XactModificationLevel = XACT_MODIFICATION_NONE;
static bool subXactAbortAttempted = false;
/* transaction management functions */
static void CoordinatedTransactionCallback(XactEvent event, void *arg);
static void CoordinatedSubTransactionCallback(SubXactEvent event, SubTransactionId subId,
SubTransactionId parentSubid, void *arg);
void
InitializeTransactionManagement(void)
{
/* hook into transaction machinery */
RegisterXactCallback(CoordinatedTransactionCallback, NULL);
RegisterSubXactCallback(CoordinatedSubTransactionCallback, NULL);
}
/*
* Transaction management callback, handling coordinated transaction, and
* transaction independent connection management.
*
* NB: There should only ever be a single transaction callback in citus, the
* ordering between the callbacks and thee actions within those callbacks
* otherwise becomes too undeterministic / hard to reason about.
*/
static void
CoordinatedTransactionCallback(XactEvent event, void *arg)
{
switch (event)
{
case XACT_EVENT_COMMIT:
{
/* close connections etc. */
if (CurrentCoordinatedTransactionState != COORD_TRANS_NONE)
{
AfterXactConnectionHandling(true);
}
Assert(!subXactAbortAttempted);
CurrentCoordinatedTransactionState = COORD_TRANS_NONE;
XactModificationLevel = XACT_MODIFICATION_NONE;
}
break;
case XACT_EVENT_ABORT:
{
/* close connections etc. */
if (CurrentCoordinatedTransactionState != COORD_TRANS_NONE)
{
AfterXactConnectionHandling(false);
}
CurrentCoordinatedTransactionState = COORD_TRANS_NONE;
XactModificationLevel = XACT_MODIFICATION_NONE;
subXactAbortAttempted = false;
}
break;
case XACT_EVENT_PARALLEL_COMMIT:
case XACT_EVENT_PARALLEL_ABORT:
case XACT_EVENT_PREPARE:
{ }
break;
case XACT_EVENT_PRE_COMMIT:
{
if (subXactAbortAttempted)
{
subXactAbortAttempted = false;
if (XactModificationLevel != XACT_MODIFICATION_NONE)
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot ROLLBACK TO SAVEPOINT in transactions "
"which modify distributed tables")));
}
}
}
break;
case XACT_EVENT_PARALLEL_PRE_COMMIT:
case XACT_EVENT_PRE_PREPARE:
{
if (CurrentCoordinatedTransactionState > COORD_TRANS_NONE)
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot use 2PC in transactions involving "
"multiple servers")));
}
}
break;
}
}
/*
* Subtransaction callback - currently only used to remember whether a
* savepoint has been rolled back, as we don't support that.
*/
static void
CoordinatedSubTransactionCallback(SubXactEvent event, SubTransactionId subId,
SubTransactionId parentSubid, void *arg)
{
if (event == SUBXACT_EVENT_ABORT_SUB)
{
subXactAbortAttempted = true;
}
}

View File

@ -233,12 +233,12 @@ SendCommandListToWorkerInSingleTransaction(char *nodeName, int32 nodePort, char
PQgetResult(workerConnection); PQgetResult(workerConnection);
/* we no longer need this connection */ /* we no longer need this connection */
PQfinish(workerConnection); CloseConnectionByPGconn(workerConnection);
} }
PG_CATCH(); PG_CATCH();
{ {
/* close the connection */ /* close the connection */
PQfinish(workerConnection); CloseConnectionByPGconn(workerConnection);
PG_RE_THROW(); PG_RE_THROW();
} }
@ -280,7 +280,7 @@ RemoveWorkerTransaction(char *nodeName, int32 nodePort)
PGconn *connection = transactionConnection->connection; PGconn *connection = transactionConnection->connection;
/* closing the connection will rollback all uncommited transactions */ /* closing the connection will rollback all uncommited transactions */
PQfinish(connection); CloseConnectionByPGconn(connection);
workerConnectionList = list_delete(workerConnectionList, transactionConnection); workerConnectionList = list_delete(workerConnectionList, transactionConnection);
} }

View File

@ -2,7 +2,7 @@
* *
* connection_cache.c * connection_cache.c
* *
* This file contains functions to implement a connection hash. * Legacy connection caching layer. Will be removed entirely.
* *
* Copyright (c) 2014-2016, Citus Data, Inc. * Copyright (c) 2014-2016, Citus Data, Inc.
* *
@ -19,8 +19,10 @@
#include <string.h> #include <string.h>
#include "commands/dbcommands.h" #include "commands/dbcommands.h"
#include "distributed/connection_management.h"
#include "distributed/connection_cache.h" #include "distributed/connection_cache.h"
#include "distributed/metadata_cache.h" #include "distributed/metadata_cache.h"
#include "distributed/remote_commands.h"
#include "mb/pg_wchar.h" #include "mb/pg_wchar.h"
#include "utils/builtins.h" #include "utils/builtins.h"
#include "utils/elog.h" #include "utils/elog.h"
@ -30,18 +32,7 @@
#include "utils/palloc.h" #include "utils/palloc.h"
/* state needed to keep track of operations used during a transaction */
XactModificationType XactModificationLevel = XACT_MODIFICATION_NONE;
/*
* NodeConnectionHash is the connection hash itself. It begins uninitialized.
* The first call to GetOrEstablishConnection triggers hash creation.
*/
static HTAB *NodeConnectionHash = NULL;
/* local function forward declarations */ /* local function forward declarations */
static HTAB * CreateNodeConnectionHash(void);
static void ReportRemoteError(PGconn *connection, PGresult *result, bool raiseError); static void ReportRemoteError(PGconn *connection, PGresult *result, bool raiseError);
@ -60,77 +51,26 @@ static void ReportRemoteError(PGconn *connection, PGresult *result, bool raiseEr
PGconn * PGconn *
GetOrEstablishConnection(char *nodeName, int32 nodePort) GetOrEstablishConnection(char *nodeName, int32 nodePort)
{ {
int connectionFlags = SESSION_LIFESPAN;
PGconn *connection = NULL; PGconn *connection = NULL;
NodeConnectionKey nodeConnectionKey; MultiConnection *multiConnection =
NodeConnectionEntry *nodeConnectionEntry = NULL; GetNodeConnection(connectionFlags, nodeName, nodePort);
bool entryFound = false;
bool needNewConnection = true;
char *userName = CurrentUserName();
/* check input */ if (PQstatus(multiConnection->pgConn) == CONNECTION_OK)
if (strnlen(nodeName, MAX_NODE_LENGTH + 1) > MAX_NODE_LENGTH)
{ {
ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), connection = multiConnection->pgConn;
errmsg("hostname exceeds the maximum length of %d",
MAX_NODE_LENGTH)));
} }
else
/* if first call, initialize the connection hash */
if (NodeConnectionHash == NULL)
{ {
NodeConnectionHash = CreateNodeConnectionHash(); ReportConnectionError(multiConnection, WARNING);
} CloseConnection(multiConnection);
connection = NULL;
memset(&nodeConnectionKey, 0, sizeof(nodeConnectionKey));
strlcpy(nodeConnectionKey.nodeName, nodeName, MAX_NODE_LENGTH + 1);
nodeConnectionKey.nodePort = nodePort;
strlcpy(nodeConnectionKey.nodeUser, userName, NAMEDATALEN);
nodeConnectionEntry = hash_search(NodeConnectionHash, &nodeConnectionKey,
HASH_FIND, &entryFound);
if (entryFound)
{
connection = nodeConnectionEntry->connection;
if (PQstatus(connection) == CONNECTION_OK)
{
needNewConnection = false;
}
else
{
PurgeConnection(connection);
}
}
if (needNewConnection)
{
connection = ConnectToNode(nodeName, nodePort, nodeConnectionKey.nodeUser);
if (connection != NULL)
{
nodeConnectionEntry = hash_search(NodeConnectionHash, &nodeConnectionKey,
HASH_ENTER, &entryFound);
nodeConnectionEntry->connection = connection;
}
} }
return connection; return connection;
} }
/*
* PurgeConnection removes the given connection from the connection hash and
* closes it using PQfinish. If our hash does not contain the given connection,
* this method simply prints a warning and exits.
*/
void
PurgeConnection(PGconn *connection)
{
NodeConnectionKey nodeConnectionKey;
BuildKeyForConnection(connection, &nodeConnectionKey);
PurgeConnectionByKey(&nodeConnectionKey);
}
/* /*
* Utility method to simplify populating a connection cache key with relevant * Utility method to simplify populating a connection cache key with relevant
* fields from a provided connection. * fields from a provided connection.
@ -174,60 +114,6 @@ BuildKeyForConnection(PGconn *connection, NodeConnectionKey *connectionKey)
} }
PGconn *
PurgeConnectionByKey(NodeConnectionKey *nodeConnectionKey)
{
bool entryFound = false;
NodeConnectionEntry *nodeConnectionEntry = NULL;
PGconn *connection = NULL;
if (NodeConnectionHash != NULL)
{
nodeConnectionEntry = hash_search(NodeConnectionHash, nodeConnectionKey,
HASH_REMOVE, &entryFound);
}
if (entryFound)
{
connection = nodeConnectionEntry->connection;
PQfinish(nodeConnectionEntry->connection);
}
return connection;
}
/*
* SqlStateMatchesCategory returns true if the given sql state (which may be
* NULL if unknown) is in the given error category. Note that we use
* ERRCODE_TO_CATEGORY macro to determine error category of the sql state and
* expect the caller to use the same macro for the error category.
*/
bool
SqlStateMatchesCategory(char *sqlStateString, int category)
{
bool sqlStateMatchesCategory = false;
int sqlState = 0;
int sqlStateCategory = 0;
if (sqlStateString == NULL)
{
return false;
}
sqlState = MAKE_SQLSTATE(sqlStateString[0], sqlStateString[1], sqlStateString[2],
sqlStateString[3], sqlStateString[4]);
sqlStateCategory = ERRCODE_TO_CATEGORY(sqlState);
if (sqlStateCategory == category)
{
sqlStateMatchesCategory = true;
}
return sqlStateMatchesCategory;
}
/* /*
* WarnRemoteError retrieves error fields from a remote result and produces an * WarnRemoteError retrieves error fields from a remote result and produces an
* error report at the WARNING level after amending the error with a CONTEXT * error report at the WARNING level after amending the error with a CONTEXT
@ -296,13 +182,11 @@ ReportRemoteError(PGconn *connection, PGresult *result, bool raiseError)
} }
/* /*
* If requested, actually raise an error. This necessitates purging the * If requested, actually raise an error.
* connection so it doesn't remain in the hash in an invalid state.
*/ */
if (raiseError) if (raiseError)
{ {
errorLevel = ERROR; errorLevel = ERROR;
PurgeConnection(connection);
} }
if (sqlState == ERRCODE_CONNECTION_FAILURE) if (sqlState == ERRCODE_CONNECTION_FAILURE)
@ -323,79 +207,34 @@ ReportRemoteError(PGconn *connection, PGresult *result, bool raiseError)
} }
/*
* CreateNodeConnectionHash returns a newly created hash table suitable for
* storing unlimited connections indexed by node name and port.
*/
static HTAB *
CreateNodeConnectionHash(void)
{
HTAB *nodeConnectionHash = NULL;
HASHCTL info;
int hashFlags = 0;
memset(&info, 0, sizeof(info));
info.keysize = sizeof(NodeConnectionKey);
info.entrysize = sizeof(NodeConnectionEntry);
info.hash = tag_hash;
info.hcxt = CacheMemoryContext;
hashFlags = (HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT);
nodeConnectionHash = hash_create("citus connection cache", 32, &info, hashFlags);
return nodeConnectionHash;
}
/* /*
* ConnectToNode opens a connection to a remote PostgreSQL server. The function * ConnectToNode opens a connection to a remote PostgreSQL server. The function
* configures the connection's fallback application name to 'citus' and sets * configures the connection's fallback application name to 'citus' and sets
* the remote encoding to match the local one. All parameters are required to * the remote encoding to match the local one. All parameters are required to
* be non NULL. * be non NULL.
* *
* We attempt to connect up to MAX_CONNECT_ATTEMPT times. After that we give up * This is only a thin layer over connection_management.[ch], and will be
* and return NULL. * removed soon.
*/ */
PGconn * PGconn *
ConnectToNode(char *nodeName, int32 nodePort, char *nodeUser) ConnectToNode(char *nodeName, int32 nodePort, char *nodeUser)
{ {
/* don't want already established connections */
int connectionFlags = FORCE_NEW_CONNECTION;
PGconn *connection = NULL; PGconn *connection = NULL;
const char *clientEncoding = GetDatabaseEncodingName(); MultiConnection *multiConnection =
const char *dbname = get_database_name(MyDatabaseId); GetNodeUserDatabaseConnection(connectionFlags, nodeName, nodePort, nodeUser,
int attemptIndex = 0; NULL);
const char *keywordArray[] = { if (PQstatus(multiConnection->pgConn) == CONNECTION_OK)
"host", "port", "fallback_application_name",
"client_encoding", "connect_timeout", "dbname", "user", NULL
};
char nodePortString[12];
const char *valueArray[] = {
nodeName, nodePortString, "citus", clientEncoding,
CLIENT_CONNECT_TIMEOUT_SECONDS, dbname, nodeUser, NULL
};
sprintf(nodePortString, "%d", nodePort);
Assert(sizeof(keywordArray) == sizeof(valueArray));
for (attemptIndex = 0; attemptIndex < MAX_CONNECT_ATTEMPTS; attemptIndex++)
{ {
connection = PQconnectdbParams(keywordArray, valueArray, false); connection = multiConnection->pgConn;
if (PQstatus(connection) == CONNECTION_OK) }
{ else
break; {
} ReportConnectionError(multiConnection, WARNING);
else CloseConnection(multiConnection);
{ connection = NULL;
/* warn if still erroring on final attempt */
if (attemptIndex == MAX_CONNECT_ATTEMPTS - 1)
{
WarnRemoteError(connection, NULL);
}
PQfinish(connection);
connection = NULL;
}
} }
return connection; return connection;

View File

@ -0,0 +1,34 @@
/*-------------------------------------------------------------------------
*
* hash_helpers.c
* Helpers for dynahash.c style hash tables.
*
* Copyright (c) 2016, Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "distributed/hash_helpers.h"
#include "utils/hsearch.h"
/*
* Empty a hash, without destroying the hash table itself.
*/
void
hash_delete_all(HTAB *htab)
{
HASH_SEQ_STATUS status;
void *entry = NULL;
hash_seq_init(&status, htab);
while ((entry = hash_seq_search(&status)) != 0)
{
bool found = false;
hash_search(htab, entry, HASH_REMOVE, &found);
Assert(found);
}
}

View File

@ -14,18 +14,12 @@
#include "access/xact.h" #include "access/xact.h"
#include "distributed/connection_management.h"
#include "libpq-fe.h" #include "libpq-fe.h"
#include "lib/stringinfo.h" #include "lib/stringinfo.h"
#include "nodes/pg_list.h" #include "nodes/pg_list.h"
/* Enumeration that defines the different commit protocols available */
typedef enum
{
COMMIT_PROTOCOL_1PC = 0,
COMMIT_PROTOCOL_2PC = 1
} CommitProtocolType;
/* Enumeration that defines different remote transaction states */ /* Enumeration that defines different remote transaction states */
typedef enum typedef enum
{ {
@ -51,10 +45,6 @@ typedef struct TransactionConnection
} TransactionConnection; } TransactionConnection;
/* config variable managed via guc.c */
extern int MultiShardCommitProtocol;
/* Functions declarations for transaction and connection management */ /* Functions declarations for transaction and connection management */
extern void InitializeDistributedTransaction(void); extern void InitializeDistributedTransaction(void);
extern void PrepareRemoteTransactions(List *connectionList); extern void PrepareRemoteTransactions(List *connectionList);

View File

@ -16,21 +16,8 @@
#include "c.h" #include "c.h"
#include "libpq-fe.h" #include "libpq-fe.h"
#include "nodes/pg_list.h"
#include "utils/hsearch.h" #include "utils/hsearch.h"
#include "distributed/connection_management.h"
/* maximum duration to wait for connection */
#define CLIENT_CONNECT_TIMEOUT_SECONDS "5"
/* maximum (textual) lengths of hostname and port */
#define MAX_NODE_LENGTH 255
#define MAX_PORT_LENGTH 10
/* times to attempt connection (or reconnection) */
#define MAX_CONNECT_ATTEMPTS 2
/* SQL statement for testing */
#define TEST_SQL "DO $$ BEGIN RAISE EXCEPTION 'Raised remotely!'; END $$"
/* /*
* NodeConnectionKey acts as the key to index into the (process-local) hash * NodeConnectionKey acts as the key to index into the (process-local) hash
@ -52,26 +39,9 @@ typedef struct NodeConnectionEntry
} NodeConnectionEntry; } NodeConnectionEntry;
/* describes what kind of modifications have occurred in the current transaction */
typedef enum
{
XACT_MODIFICATION_INVALID = 0, /* placeholder initial value */
XACT_MODIFICATION_NONE, /* no modifications have taken place */
XACT_MODIFICATION_DATA, /* data modifications (DML) have occurred */
XACT_MODIFICATION_MULTI_SHARD /* multi-shard modifications have occurred */
} XactModificationType;
/* state needed to prevent new connections during modifying transactions */
extern XactModificationType XactModificationLevel;
/* function declarations for obtaining and using a connection */ /* function declarations for obtaining and using a connection */
extern PGconn * GetOrEstablishConnection(char *nodeName, int32 nodePort); extern PGconn * GetOrEstablishConnection(char *nodeName, int32 nodePort);
extern void PurgeConnection(PGconn *connection);
extern void BuildKeyForConnection(PGconn *connection, NodeConnectionKey *connectionKey); extern void BuildKeyForConnection(PGconn *connection, NodeConnectionKey *connectionKey);
extern PGconn * PurgeConnectionByKey(NodeConnectionKey *nodeConnectionKey);
extern bool SqlStateMatchesCategory(char *sqlStateString, int category);
extern void WarnRemoteError(PGconn *connection, PGresult *result); extern void WarnRemoteError(PGconn *connection, PGresult *result);
extern void ReraiseRemoteError(PGconn *connection, PGresult *result); extern void ReraiseRemoteError(PGconn *connection, PGresult *result);
extern PGconn * ConnectToNode(char *nodeName, int nodePort, char *nodeUser); extern PGconn * ConnectToNode(char *nodeName, int nodePort, char *nodeUser);

View File

@ -0,0 +1,128 @@
/*-------------------------------------------------------------------------
*
* connection_management.h
* Central management of connections and their life-cycle
*
* Copyright (c) 2016, Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#ifndef CONNECTION_MANAGMENT_H
#define CONNECTION_MANAGMENT_H
#include "distributed/transaction_management.h"
#include "lib/ilist.h"
#include "utils/hsearch.h"
#include "utils/timestamp.h"
/* maximum (textual) lengths of hostname and port */
#define MAX_NODE_LENGTH 255 /* includes 0 byte */
#define CLIENT_CONNECT_TIMEOUT_SECONDS_INT 5
/* forward declare, to avoid forcing large headers on everyone */
struct pg_conn; /* target of the PGconn typedef */
struct MemoryContextData;
/*
* Flags determining connection establishment behaviour.
*/
enum MultiConnectionMode
{
/* force establishment of a new connection */
FORCE_NEW_CONNECTION = 1 << 0,
/* mark returned connection as having session lifespan */
SESSION_LIFESPAN = 1 << 1
};
/* declaring this directly above makes uncrustify go crazy */
typedef enum MultiConnectionMode MultiConnectionMode;
typedef struct MultiConnection
{
/* connection details, useful for error messages and such. */
char hostname[MAX_NODE_LENGTH];
int32 port;
char user[NAMEDATALEN];
char database[NAMEDATALEN];
/* underlying libpq connection */
struct pg_conn *pgConn;
/* is the connection intended to be kept after transaction end */
bool sessionLifespan;
/* is the connection currently in use, and shouldn't be used by anything else */
bool claimedExclusively;
/* time connection establishment was started, for timeout */
TimestampTz connectionStart;
/* membership in list of list of connections in ConnectionHashEntry */
dlist_node connectionNode;
} MultiConnection;
/*
* Central connection management hash, mapping (host, port, user, database) to
* a list of connections.
*
* This hash is used to keep track of which connections are open to which
* node. Besides allowing connection reuse, that information is e.g. used to
* handle closing connections after the end of a transaction.
*/
/* hash key */
typedef struct ConnectionHashKey
{
char hostname[MAX_NODE_LENGTH];
int32 port;
char user[NAMEDATALEN];
char database[NAMEDATALEN];
} ConnectionHashKey;
/* hash entry */
typedef struct ConnectionHashEntry
{
ConnectionHashKey key;
dlist_head *connections;
} ConnectionHashEntry;
/* the hash table */
extern HTAB *ConnectionHash;
/* context for all connection and transaction related memory */
extern struct MemoryContextData *ConnectionContext;
extern void AfterXactConnectionHandling(bool isCommit);
extern void InitializeConnectionManagement(void);
/* Low-level connection establishment APIs */
extern MultiConnection * GetNodeConnection(uint32 flags, const char *hostname,
int32 port);
extern MultiConnection * StartNodeConnection(uint32 flags, const char *hostname,
int32 port);
extern MultiConnection * GetNodeUserDatabaseConnection(uint32 flags, const char *hostname,
int32 port, const char *user, const
char *database);
extern MultiConnection * StartNodeUserDatabaseConnection(uint32 flags,
const char *hostname,
int32 port,
const char *user,
const char *database);
extern MultiConnection * GetConnectionFromPGconn(struct pg_conn *pqConn);
extern void CloseConnection(MultiConnection *connection);
extern void CloseConnectionByPGconn(struct pg_conn *pqConn);
/* dealing with a connection */
extern void FinishConnectionEstablishment(MultiConnection *connection);
extern void ClaimConnectionExclusively(MultiConnection *connection);
extern void UnclaimConnection(MultiConnection *connection);
#endif /* CONNECTION_MANAGMENT_H */

View File

@ -0,0 +1,30 @@
/*-------------------------------------------------------------------------
* hash_helpers.h
*
* Copyright (c) 2016, Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#ifndef HASH_HELPERS_H
#define HASH_HELPERS_H
#include "utils/hsearch.h"
/*
* Combine two hash values, resulting in another hash value, with decent bit
* mixing.
*
* Similar to boost's hash_combine().
*/
static inline uint32
hash_combine(uint32 a, uint32 b)
{
a ^= b + 0x9e3779b9 + (a << 6) + (a >> 2);
return a;
}
extern void hash_delete_all(HTAB *htab);
#endif

View File

@ -15,10 +15,8 @@
#define MULTI_CLIENT_EXECUTOR_H #define MULTI_CLIENT_EXECUTOR_H
#define INVALID_CONNECTION_ID -1 /* identifies an invalid connection */ #define INVALID_CONNECTION_ID -1 /* identifies an invalid connection */
#define CLIENT_CONNECT_TIMEOUT 5 /* connection timeout in seconds */
#define MAX_CONNECTION_COUNT 2048 /* simultaneous client connection count */ #define MAX_CONNECTION_COUNT 2048 /* simultaneous client connection count */
#define STRING_BUFFER_SIZE 1024 /* buffer size for character arrays */ #define STRING_BUFFER_SIZE 1024 /* buffer size for character arrays */
#define CONN_INFO_TEMPLATE "host=%s port=%u dbname=%s user=%s connect_timeout=%u"
/* Enumeration to track one client connection's status */ /* Enumeration to track one client connection's status */

View File

@ -0,0 +1,36 @@
/*-------------------------------------------------------------------------
*
* remote_commands.h
* Helpers to execute commands on remote nodes, over libpq.
*
* Copyright (c) 2016, Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#ifndef REMOTE_COMMAND_H
#define REMOTE_COMMAND_H
#include "distributed/connection_management.h"
/* GUC, determining whether statements sent to remote nodes are logged */
extern bool LogRemoteCommands;
/* simple helpers */
extern bool IsResponseOK(struct pg_result *result);
extern void ForgetResults(MultiConnection *connection);
extern bool SqlStateMatchesCategory(char *sqlStateString, int category);
/* report errors & warnings */
extern void ReportConnectionError(MultiConnection *connection, int elevel);
extern void ReportResultError(MultiConnection *connection, struct pg_result *result,
int elevel);
extern void LogRemoteCommand(MultiConnection *connection, const char *command);
/* wrappers around libpq functions, with command logging support */
extern int SendRemoteCommand(MultiConnection *connection, const char *command);
#endif /* REMOTE_COMMAND_H */

View File

@ -0,0 +1,57 @@
/*-------------------------------------------------------------------------
* transaction_management.h
*
* Copyright (c) 2016, Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#ifndef TRANSACTION_MANAGMENT_H
#define TRANSACTION_MANAGMENT_H
/* describes what kind of modifications have occurred in the current transaction */
typedef enum
{
XACT_MODIFICATION_INVALID = 0, /* placeholder initial value */
XACT_MODIFICATION_NONE, /* no modifications have taken place */
XACT_MODIFICATION_DATA, /* data modifications (DML) have occurred */
XACT_MODIFICATION_MULTI_SHARD /* multi-shard modifications have occurred */
} XactModificationType;
/*
* Enum defining the state of a coordinated (i.e. a transaction potentially
* spanning several nodes).
*/
typedef enum CoordinatedTransactionState
{
/* no coordinated transaction in progress, no connections established */
COORD_TRANS_NONE,
/* no coordinated transaction in progress, but connections established */
COORD_TRANS_IDLE
} CoordinatedTransactionState;
/* Enumeration that defines the different commit protocols available */
typedef enum
{
COMMIT_PROTOCOL_1PC = 0,
COMMIT_PROTOCOL_2PC = 1
} CommitProtocolType;
/* config variable managed via guc.c */
extern int MultiShardCommitProtocol;
/* state needed to prevent new connections during modifying transactions */
extern XactModificationType XactModificationLevel;
extern CoordinatedTransactionState CurrentCoordinatedTransactionState;
/* initialization function(s) */
extern void InitializeTransactionManagement(void);
#endif /* TRANSACTION_MANAGMENT_H */

View File

@ -32,7 +32,7 @@ CREATE FUNCTION set_connection_status_bad(cstring, integer)
\set VERBOSITY terse \set VERBOSITY terse
-- connect to non-existent host -- connect to non-existent host
SELECT initialize_remote_temp_table('dummy-host-name', 12345); SELECT initialize_remote_temp_table('dummy-host-name', 12345);
WARNING: connection failed to dummy-host-name:12345 WARNING: connection error: dummy-host-name:12345
initialize_remote_temp_table initialize_remote_temp_table
------------------------------ ------------------------------
f f