citus/src/backend/distributed/connection/connection_management.c

859 lines
22 KiB
C

/*-------------------------------------------------------------------------
*
* connection_management.c
* Central management of connections and their life-cycle
*
* Copyright (c) 2016, Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#ifdef HAVE_POLL_H
#include <poll.h>
#endif
#include "libpq-fe.h"
#include "miscadmin.h"
#include "access/hash.h"
#include "commands/dbcommands.h"
#include "distributed/connection_management.h"
#include "distributed/metadata_cache.h"
#include "distributed/hash_helpers.h"
#include "distributed/placement_connection.h"
#include "distributed/remote_commands.h"
#include "distributed/version_compat.h"
#include "mb/pg_wchar.h"
#include "utils/hsearch.h"
#include "utils/memutils.h"
/*
* Upper bound for connection establishment. FinishConnectionEstablishment()
* compares it against the elapsed time and reports it in milliseconds.
*/
int NodeConnectionTimeout = 5000;
/* (host,port,user,database) -> list of connections, see InitializeConnectionManagement() */
HTAB *ConnectionHash = NULL;
/* same key -> cached connection keywords/values, see StartConnectionEstablishment() */
HTAB *ConnParamsHash = NULL;
/* memory context that holds both hash tables and the connection structs */
MemoryContext ConnectionContext = NULL;
/* hash and comparison callbacks shared by both hash tables */
static uint32 ConnectionHashHash(const void *key, Size keysize);
static int ConnectionHashCompare(const void *a, const void *b, Size keysize);
/* file-local helpers, defined further down in this file */
static MultiConnection * StartConnectionEstablishment(ConnectionHashKey *key);
static void AfterXactHostConnectionHandling(ConnectionHashEntry *entry, bool isCommit);
static void DefaultCitusNoticeProcessor(void *arg, const char *message);
static MultiConnection * FindAvailableConnection(dlist_head *connections, uint32 flags);
/* level at which DefaultCitusNoticeProcessor() re-reports notices from remote nodes */
static int CitusNoticeLogLevel = DEFAULT_CITUS_NOTICE_LEVEL;
/*
 * InitializeConnectionManagement sets up the per-backend state used by this
 * file: the dedicated memory context and the two hash tables keyed by
 * (host,port,user,database).
 */
void
InitializeConnectionManagement(void)
{
	HASHCTL connectionInfo;
	HASHCTL paramsInfo;
	const uint32 tableFlags = (HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT | HASH_COMPARE);

	/*
	 * All connection and transaction related memory lives in one context
	 * below TopMemoryContext, which makes it easier to attribute memory
	 * usage than allocating in TopMemoryContext directly.
	 */
	ConnectionContext = AllocSetContextCreateExtended(TopMemoryContext,
													  "Connection Context",
													  ALLOCSET_DEFAULT_MINSIZE,
													  ALLOCSET_DEFAULT_INITSIZE,
													  ALLOCSET_DEFAULT_MAXSIZE);

	/* describe the (host,port,user,database) -> [connection] table */
	memset(&connectionInfo, 0, sizeof(connectionInfo));
	connectionInfo.keysize = sizeof(ConnectionHashKey);
	connectionInfo.entrysize = sizeof(ConnectionHashEntry);
	connectionInfo.hash = ConnectionHashHash;
	connectionInfo.match = ConnectionHashCompare;
	connectionInfo.hcxt = ConnectionContext;

	/* the parameter cache uses the same key; only the entry size differs */
	paramsInfo = connectionInfo;
	paramsInfo.entrysize = sizeof(ConnParamsHashEntry);

	ConnectionHash = hash_create("citus connection cache (host,port,user,database)",
								 64, &connectionInfo, tableFlags);
	ConnParamsHash = hash_create("citus connparams cache (host,port,user,database)",
								 64, &paramsInfo, tableFlags);
}
/*
 * AfterXactConnectionHandling runs the end-of-transaction connection
 * bookkeeping for every host entry in ConnectionHash. The same code path is
 * used for COMMIT and ABORT; isCommit tells them apart.
 *
 * This is called by Citus' global transaction callback.
 */
void
AfterXactConnectionHandling(bool isCommit)
{
	HASH_SEQ_STATUS scan;
	ConnectionHashEntry *hostEntry = NULL;

	hash_seq_init(&scan, ConnectionHash);

	/*
	 * NB: Hash entries are kept even when their connection list ends up
	 * empty. Deleting them has no apparent benefit, and keeping them saves a
	 * bit of work in the next transaction.
	 */
	for (hostEntry = hash_seq_search(&scan);
		 hostEntry != NULL;
		 hostEntry = hash_seq_search(&scan))
	{
		AfterXactHostConnectionHandling(hostEntry, isCommit);
	}
}
/*
 * GetNodeConnection() returns a connection to the given remote node, using
 * the default user and database (both passed on as NULL).
 *
 * See StartNodeUserDatabaseConnection for details.
 */
MultiConnection *
GetNodeConnection(uint32 flags, const char *hostname, int32 port)
{
	const char *defaultUser = NULL;
	const char *defaultDatabase = NULL;

	return GetNodeUserDatabaseConnection(flags, hostname, port,
										 defaultUser, defaultDatabase);
}
/*
 * GetNonDataAccessConnection() returns a connection to the remote node, using
 * the default user and database, that is guaranteed to not have been used
 * for any data access over any placements. It waits for connection
 * establishment to finish before returning.
 *
 * See StartNonDataAccessConnection for details.
 */
MultiConnection *
GetNonDataAccessConnection(const char *hostname, int32 port)
{
	MultiConnection *connection = StartNonDataAccessConnection(hostname, port);

	FinishConnectionEstablishment(connection);

	return connection;
}
/*
 * StartNonDataAccessConnection() initiates a connection that is guaranteed
 * to not have been used for any data access over any placements.
 *
 * A regular (possibly cached) connection is tried first. Only if that one
 * has already been used for placements is a brand new connection forced.
 * The returned connection is started with the default user and database.
 */
MultiConnection *
StartNonDataAccessConnection(const char *hostname, int32 port)
{
	MultiConnection *candidate = StartNodeConnection(0, hostname, port);

	if (!ConnectionUsedForAnyPlacements(candidate))
	{
		return candidate;
	}

	return StartNodeConnection(FORCE_NEW_CONNECTION, hostname, port);
}
/*
 * StartNodeConnection initiates a connection to the given remote node, using
 * the default user and database (both passed on as NULL).
 *
 * See StartNodeUserDatabaseConnection for details.
 */
MultiConnection *
StartNodeConnection(uint32 flags, const char *hostname, int32 port)
{
	const char *defaultUser = NULL;
	const char *defaultDatabase = NULL;

	return StartNodeUserDatabaseConnection(flags, hostname, port,
										   defaultUser, defaultDatabase);
}
/*
 * GetNodeUserDatabaseConnection starts a connection to the remote node and
 * then waits for establishment to finish before handing it back.
 *
 * See StartNodeUserDatabaseConnection for details.
 */
MultiConnection *
GetNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port, const
							  char *user, const char *database)
{
	MultiConnection *connection =
		StartNodeUserDatabaseConnection(flags, hostname, port, user, database);

	FinishConnectionEstablishment(connection);

	return connection;
}
/*
 * StartNodeUserDatabaseConnection() initiates a connection to a remote node.
 *
 * If user or database are NULL, the current session's defaults are used. The
 * following flags influence connection establishment behaviour:
 * - SESSION_LIFESPAN - the connection should persist after transaction end
 * - FORCE_NEW_CONNECTION - a new connection is required
 *
 * Unless FORCE_NEW_CONNECTION is given, an existing connection for the same
 * (host,port,user,database) that is not claimed exclusively is returned if
 * there is one. Otherwise a new connection is started and appended to the
 * list kept in ConnectionHash.
 *
 * The returned connection has only been initiated, not fully
 * established. That's useful to allow parallel connection establishment. If
 * that's not desired use the Get* variant.
 *
 * Raises an ERROR if hostname does not fit into the hash key.
 */
MultiConnection *
StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port, const
								char *user, const char *database)
{
	ConnectionHashKey key;
	ConnectionHashEntry *entry = NULL;
	MultiConnection *connection = NULL;
	bool found = false;

	/*
	 * Do some minimal input checks. strlcpy() stores at most
	 * MAX_NODE_LENGTH - 1 bytes plus the terminator, so a hostname of
	 * MAX_NODE_LENGTH bytes or more would be silently truncated in the key.
	 * Reject those before copying instead of connecting to a different name.
	 */
	if (strlen(hostname) >= MAX_NODE_LENGTH)
	{
		ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
						errmsg("hostname exceeds the maximum length of %d",
							   MAX_NODE_LENGTH - 1)));
	}
	strlcpy(key.hostname, hostname, MAX_NODE_LENGTH);

	key.port = port;

	if (user)
	{
		strlcpy(key.user, user, NAMEDATALEN);
	}
	else
	{
		strlcpy(key.user, CurrentUserName(), NAMEDATALEN);
	}

	if (database)
	{
		strlcpy(key.database, database, NAMEDATALEN);
	}
	else
	{
		strlcpy(key.database, get_database_name(MyDatabaseId), NAMEDATALEN);
	}

	/* starting a connection moves a not-yet-started coordinated transaction to idle */
	if (CurrentCoordinatedTransactionState == COORD_TRANS_NONE)
	{
		CurrentCoordinatedTransactionState = COORD_TRANS_IDLE;
	}

	/*
	 * Lookup relevant hash entry. We always enter. If only a cached
	 * connection is desired, and there's none, we'll simply leave the
	 * connection list empty.
	 */
	entry = hash_search(ConnectionHash, &key, HASH_ENTER, &found);
	if (!found)
	{
		entry->connections = MemoryContextAlloc(ConnectionContext,
												sizeof(dlist_head));
		dlist_init(entry->connections);
	}

	/* if desired, check whether there's a usable connection */
	if (!(flags & FORCE_NEW_CONNECTION))
	{
		/* check connection cache for a connection that's not already in use */
		connection = FindAvailableConnection(entry->connections, flags);
		if (connection)
		{
			if (flags & SESSION_LIFESPAN)
			{
				connection->sessionLifespan = true;
			}

			return connection;
		}
	}

	/*
	 * Either no caching desired, or no pre-established, non-claimed,
	 * connection present. Initiate connection establishment.
	 */
	connection = StartConnectionEstablishment(&key);

	dlist_push_tail(entry->connections, &connection->connectionNode);
	ResetShardPlacementAssociation(connection);

	if (flags & SESSION_LIFESPAN)
	{
		connection->sessionLifespan = true;
	}

	return connection;
}
/*
 * FindAvailableConnection is a helper for StartNodeUserDatabaseConnection().
 * It returns the first connection in the list that is not claimed
 * exclusively, or NULL if there is none. The flags argument is currently not
 * consulted.
 */
static MultiConnection *
FindAvailableConnection(dlist_head *connections, uint32 flags)
{
	dlist_iter iter;

	dlist_foreach(iter, connections)
	{
		MultiConnection *candidate =
			dlist_container(MultiConnection, connectionNode, iter.cur);

		/* claimed connections are skipped, anything else is usable */
		if (!candidate->claimedExclusively)
		{
			return candidate;
		}
	}

	return NULL;
}
/*
 * CloseNodeConnectionsAfterTransaction clears the sessionLifespan flag of
 * every connection to the given node, for all users and databases. This is
 * mainly used when a worker leaves the cluster.
 */
void
CloseNodeConnectionsAfterTransaction(char *nodeName, int nodePort)
{
	HASH_SEQ_STATUS scan;
	ConnectionHashEntry *hostEntry = NULL;

	hash_seq_init(&scan, ConnectionHash);
	while ((hostEntry = hash_seq_search(&scan)) != NULL)
	{
		bool sameHost = (strcmp(hostEntry->key.hostname, nodeName) == 0);
		bool samePort = (hostEntry->key.port == nodePort);

		if (sameHost && samePort)
		{
			dlist_iter iter;

			dlist_foreach(iter, hostEntry->connections)
			{
				MultiConnection *nodeConnection =
					dlist_container(MultiConnection, connectionNode, iter.cur);

				nodeConnection->sessionLifespan = false;
			}
		}
	}
}
/*
 * CloseConnection closes a previously established connection and frees the
 * MultiConnection struct.
 *
 * The libpq connection is closed first. If no hash entry exists for the
 * connection's (host,port,user,database), an ERROR is raised afterwards.
 */
void
CloseConnection(MultiConnection *connection)
{
	ConnectionHashKey lookupKey;
	bool tracked = false;

	/* close connection */
	PQfinish(connection->pgConn);
	connection->pgConn = NULL;

	/* rebuild the hash key this connection was registered under */
	strlcpy(lookupKey.hostname, connection->hostname, MAX_NODE_LENGTH);
	lookupKey.port = connection->port;
	strlcpy(lookupKey.user, connection->user, NAMEDATALEN);
	strlcpy(lookupKey.database, connection->database, NAMEDATALEN);

	hash_search(ConnectionHash, &lookupKey, HASH_FIND, &tracked);
	if (!tracked)
	{
		ereport(ERROR, (errmsg("closing untracked connection")));
	}

	/* unlink from list of open connections */
	dlist_delete(&connection->connectionNode);

	/* same for transaction state and shard/placement machinery */
	CloseRemoteTransaction(connection);
	CloseShardPlacementAssociation(connection);

	/* we leave the per-host entry alive */
	pfree(connection);
}
/*
 * ShutdownConnection cancels the currently running statement if there is
 * one, and then closes the underlying libpq connection. The MultiConnection
 * struct itself is left intact, with pgConn reset to NULL.
 *
 * NB: Cancelling a statement requires network IO, and currently is not
 * interruptible. Unfortunately libpq does not provide a non-blocking
 * implementation of PQcancel(), so we don't have much choice for now.
 */
void
ShutdownConnection(MultiConnection *connection)
{
	/*
	 * A cancel is only sent when the connection is in an OK state and a
	 * statement is actually running on it.
	 */
	bool statementRunning =
		PQstatus(connection->pgConn) == CONNECTION_OK &&
		PQtransactionStatus(connection->pgConn) == PQTRANS_ACTIVE;

	if (statementRunning)
	{
		char cancelError[256] = { 0 };
		PGcancel *cancelHandle = PQgetCancel(connection->pgConn);
		int cancelSent = PQcancel(cancelHandle, cancelError, sizeof(cancelError));

		if (!cancelSent)
		{
			ereport(WARNING, (errmsg("could not cancel connection: %s",
									 cancelError)));
		}

		PQfreeCancel(cancelHandle);
	}

	PQfinish(connection->pgConn);
	connection->pgConn = NULL;
}
/*
 * FinishConnectionListEstablishment finishes connection establishment for
 * every MultiConnection in multiConnectionList, one after the other, by
 * calling FinishConnectionEstablishment on each.
 */
void
FinishConnectionListEstablishment(List *multiConnectionList)
{
	ListCell *cell = NULL;

	/* TODO: consider making connection establishment fully in parallel */
	foreach(cell, multiConnectionList)
	{
		FinishConnectionEstablishment((MultiConnection *) lfirst(cell));
	}
}
/*
* Synchronously finish connection establishment of an individual connection.
*
* The function returns once the connection is established, has failed, or
* has not made progress within NodeConnectionTimeout. It does not report
* which of those happened; callers have to inspect the connection. On
* timeout a WARNING is emitted and connection->pgConn is closed and set to
* NULL. An unexpected poll()/select() failure raises an ERROR.
*
* TODO: Replace with variant waiting for multiple connections.
*/
void
FinishConnectionEstablishment(MultiConnection *connection)
{
/* longest single wait in poll()/select(), so interrupts are noticed regularly */
static int checkIntervalMS = 200;
/*
* Loop until connection is established, or failed (possibly just timed
* out).
*/
while (true)
{
ConnStatusType status = PQstatus(connection->pgConn);
PostgresPollingStatusType pollmode;
if (status == CONNECTION_OK)
{
return;
}
/* FIXME: retries? */
if (status == CONNECTION_BAD)
{
return;
}
/* advance the connection state machine; the result says what to wait for next */
pollmode = PQconnectPoll(connection->pgConn);
/*
* FIXME: Do we want to add transparent retry support here?
*/
if (pollmode == PGRES_POLLING_FAILED)
{
return;
}
else if (pollmode == PGRES_POLLING_OK)
{
return;
}
else
{
Assert(pollmode == PGRES_POLLING_WRITING ||
pollmode == PGRES_POLLING_READING);
}
/* Loop, to handle poll() being interrupted by signals (EINTR) */
while (true)
{
int pollResult = 0;
/* we use poll(2) if available, otherwise select(2) */
#ifdef HAVE_POLL
struct pollfd pollFileDescriptor;
pollFileDescriptor.fd = PQsocket(connection->pgConn);
if (pollmode == PGRES_POLLING_READING)
{
pollFileDescriptor.events = POLLIN;
}
else
{
pollFileDescriptor.events = POLLOUT;
}
pollFileDescriptor.revents = 0;
/*
* Only sleep for a limited amount of time, so we can react to
* interrupts in time, even if the platform doesn't interrupt
* poll() after signal arrival.
*/
pollResult = poll(&pollFileDescriptor, 1, checkIntervalMS);
#else /* !HAVE_POLL */
fd_set readFileDescriptorSet;
fd_set writeFileDescriptorSet;
fd_set exceptionFileDescriptorSet;
/* the timeout is rebuilt on every iteration, so each select() waits the full interval */
int selectTimeoutUS = checkIntervalMS * 1000;
struct timeval selectTimeout = { 0, selectTimeoutUS };
int selectFileDescriptor = PQsocket(connection->pgConn);
FD_ZERO(&readFileDescriptorSet);
FD_ZERO(&writeFileDescriptorSet);
FD_ZERO(&exceptionFileDescriptorSet);
if (pollmode == PGRES_POLLING_READING)
{
FD_SET(selectFileDescriptor, &readFileDescriptorSet);
}
else if (pollmode == PGRES_POLLING_WRITING)
{
FD_SET(selectFileDescriptor, &writeFileDescriptorSet);
}
pollResult = (select) (selectFileDescriptor + 1, &readFileDescriptorSet,
&writeFileDescriptorSet, &exceptionFileDescriptorSet,
&selectTimeout);
#endif /* HAVE_POLL */
if (pollResult == 0)
{
/*
* Timeout exceeded. Two things to do:
* - check whether any interrupts arrived and handle them
* - check whether establishment for connection already has
* lasted for too long, stop waiting if so.
*/
CHECK_FOR_INTERRUPTS();
if (TimestampDifferenceExceeds(connection->connectionStart,
GetCurrentTimestamp(),
NodeConnectionTimeout))
{
/* NOTE(review): NodeConnectionTimeout is an int but is formatted with %u */
ereport(WARNING, (errmsg("could not establish connection after %u ms",
NodeConnectionTimeout)));
/* close connection, otherwise we take up resource on the other side */
PQfinish(connection->pgConn);
connection->pgConn = NULL;
/*
* This only leaves the inner loop. The outer loop then calls
* PQstatus() on the NULL pgConn; presumably libpq reports
* CONNECTION_BAD for that, which ends the function - TODO
* confirm against libpq.
*/
break;
}
}
else if (pollResult > 0)
{
/*
* IO possible, continue connection establishment. We could
* check for timeouts here as well, but if there's progress
* there seems little point.
*/
break;
}
else
{
int errorCode = errno;
#ifdef WIN32
errorCode = WSAGetLastError();
#endif
if (errorCode == EINTR)
{
/* Retrying, signal interrupted. So check. */
CHECK_FOR_INTERRUPTS();
}
else
{
/*
* We ERROR here, instead of just returning a failed
* connection, because this shouldn't happen, and indicates a
* programming error somewhere, not a network etc. issue.
*/
ereport(ERROR, (errcode_for_socket_access(),
errmsg("poll()/select() failed: %m")));
}
}
}
}
}
/*
 * ClaimConnectionExclusively marks the connection as actively in use. While
 * claimed, it is skipped by the cached-connection lookup done for
 * StartNodeUserDatabaseConnection() et al, until it is released again with
 * UnclaimConnection().
 *
 * Claiming an already claimed connection trips an assertion.
 */
void
ClaimConnectionExclusively(MultiConnection *connection)
{
	Assert(connection->claimedExclusively == false);

	connection->claimedExclusively = true;
}
/*
 * UnclaimConnection marks the connection as no longer in use, so that it may
 * again be returned by StartNodeUserDatabaseConnection() et al. Calling it
 * on a connection that is not claimed has no effect.
 */
void
UnclaimConnection(MultiConnection *connection)
{
	MultiConnection *released = connection;

	released->claimedExclusively = false;
}
/*
 * ConnectionHashHash is the hash callback of ConnectionHash and
 * ConnParamsHash. It combines the hashes of hostname, port, user and
 * database of a ConnectionHashKey. keysize is unused; each string field is
 * hashed with its own maximum length.
 */
static uint32
ConnectionHashHash(const void *key, Size keysize)
{
	const ConnectionHashKey *entry = (const ConnectionHashKey *) key;
	uint32 hash = 0;

	/*
	 * Hostnames are bounded by MAX_NODE_LENGTH, the same bound
	 * ConnectionHashCompare uses. Hashing with NAMEDATALEN here would ignore
	 * the tail of longer hostnames and make all hosts sharing such a prefix
	 * collide.
	 */
	hash = string_hash(entry->hostname, MAX_NODE_LENGTH);
	hash = hash_combine(hash, hash_uint32(entry->port));
	hash = hash_combine(hash, string_hash(entry->user, NAMEDATALEN));
	hash = hash_combine(hash, string_hash(entry->database, NAMEDATALEN));

	return hash;
}
/*
 * ConnectionHashCompare is the match callback of ConnectionHash and
 * ConnParamsHash. It returns 0 when both keys have the same hostname, port,
 * user and database, and 1 otherwise. keysize is unused.
 */
static int
ConnectionHashCompare(const void *a, const void *b, Size keysize)
{
	ConnectionHashKey *left = (ConnectionHashKey *) a;
	ConnectionHashKey *right = (ConnectionHashKey *) b;

	bool keysMatch =
		strncmp(left->hostname, right->hostname, MAX_NODE_LENGTH) == 0 &&
		left->port == right->port &&
		strncmp(left->user, right->user, NAMEDATALEN) == 0 &&
		strncmp(left->database, right->database, NAMEDATALEN) == 0;

	return keysMatch ? 0 : 1;
}
/*
 * Asynchronously establish connection to a remote node, but don't wait for
 * that to finish. DNS lookups etc. are performed synchronously though.
 *
 * The connection parameters for the key are taken from ConnParamsHash and
 * computed via GetConnParams() on first use or when the cached entry is
 * marked invalid. The returned MultiConnection is allocated in
 * ConnectionContext; it is not linked into ConnectionHash here, the caller
 * does that.
 */
static MultiConnection *
StartConnectionEstablishment(ConnectionHashKey *key)
{
	bool found = false;
	MultiConnection *connection = NULL;
	ConnParamsHashEntry *entry = NULL;

	/* search our cache for precomputed connection settings */
	entry = hash_search(ConnParamsHash, key, HASH_ENTER, &found);
	if (!found)
	{
		/*
		 * A freshly entered hash entry only has its key initialized. Mark it
		 * invalid and clear the parameter arrays right away: if computing
		 * the parameters below errors out, the entry stays in the table and
		 * must not be mistaken for a valid one by the next lookup.
		 */
		entry->isValid = false;
		entry->keywords = NULL;
		entry->values = NULL;
	}

	if (!entry->isValid)
	{
		/* if they're not cached, compute them from GUC, runtime, etc. */
		GetConnParams(key, &entry->keywords, &entry->values, ConnectionContext);

		entry->isValid = true;
	}

	connection = MemoryContextAllocZero(ConnectionContext, sizeof(MultiConnection));

	strlcpy(connection->hostname, key->hostname, MAX_NODE_LENGTH);
	connection->port = key->port;
	strlcpy(connection->database, key->database, NAMEDATALEN);
	strlcpy(connection->user, key->user, NAMEDATALEN);

	connection->pgConn = PQconnectStartParams((const char **) entry->keywords,
											  (const char **) entry->values,
											  false);
	/* start of the interval FinishConnectionEstablishment() checks against the timeout */
	connection->connectionStart = GetCurrentTimestamp();

	/*
	 * To avoid issues with interrupts not getting caught all our connections
	 * are managed in a non-blocking manner. remote_commands.c provides
	 * wrappers emulating blocking behaviour.
	 */
	PQsetnonblocking(connection->pgConn, true);

	SetCitusNoticeProcessor(connection);

	return connection;
}
/*
 * AfterXactHostConnectionHandling walks all connections of one hash entry at
 * transaction end. Connections that are not session-lifespan, or that are
 * not healthy and idle, are shut down and freed. The remaining ones get
 * their per-transaction state reset and are unclaimed.
 */
static void
AfterXactHostConnectionHandling(ConnectionHashEntry *entry, bool isCommit)
{
	dlist_mutable_iter iter;

	dlist_foreach_modify(iter, entry->connections)
	{
		MultiConnection *current =
			dlist_container(MultiConnection, connectionNode, iter.cur);
		bool keepConnection = false;

		/*
		 * A connection that is still claimed exclusively at commit points to
		 * code leaking connections, so warn about it. After an abort it's
		 * normal that code had no chance to clean up, hence no warning then.
		 */
		if (isCommit && current->claimedExclusively)
		{
			ereport(WARNING,
					(errmsg("connection claimed exclusively at transaction commit")));
		}

		/* only healthy, idle, session lifespan connections are preserved */
		keepConnection = current->sessionLifespan &&
						 PQstatus(current->pgConn) == CONNECTION_OK &&
						 PQtransactionStatus(current->pgConn) == PQTRANS_IDLE;

		if (keepConnection)
		{
			/* reset per-transaction state */
			ResetRemoteTransaction(current);
			ResetShardPlacementAssociation(current);

			/* reset copy state */
			current->copyBytesWrittenSinceLastFlush = 0;

			UnclaimConnection(current);
		}
		else
		{
			ShutdownConnection(current);

			/* unlink from list */
			dlist_delete(iter.cur);

			pfree(current);
		}
	}
}
/*
 * SetCitusNoticeProcessor installs DefaultCitusNoticeProcessor as the libpq
 * notice processor of the connection. The MultiConnection itself is handed
 * over as the callback argument.
 */
void
SetCitusNoticeProcessor(MultiConnection *connection)
{
	void *callbackArgument = connection;

	PQsetNoticeProcessor(connection->pgConn, DefaultCitusNoticeProcessor,
						 callbackArgument);
}
/*
 * SetCitusNoticeLevel changes the log level at which notices from remote
 * nodes are reported for distributed queries. The new level stays in effect
 * until it is changed again or reset with UnsetCitusNoticeLevel.
 */
void
SetCitusNoticeLevel(int level)
{
	int requestedLevel = level;

	CitusNoticeLogLevel = requestedLevel;
}
/*
 * UnsetCitusNoticeLevel sets the CitusNoticeLogLevel back to
 * its default value, DEFAULT_CITUS_NOTICE_LEVEL.
 *
 * The parameter list is spelled (void) so that the definition is a proper
 * prototype and calls with arguments are rejected by the compiler.
 */
void
UnsetCitusNoticeLevel(void)
{
	CitusNoticeLogLevel = DEFAULT_CITUS_NOTICE_LEVEL;
}
/*
 * DefaultCitusNoticeProcessor is used to redirect worker notices
 * from logfile to console.
 *
 * arg is the MultiConnection the notice arrived on and message is the raw
 * notice text. The text after the "LEVEL:" prefix is reported at
 * CitusNoticeLogLevel, and the prefix together with the node's host and
 * port is put in the detail line.
 */
static void
DefaultCitusNoticeProcessor(void *arg, const char *message)
{
	MultiConnection *connection = (MultiConnection *) arg;
	char *nodeName = connection->hostname;
	int nodePort = (int) connection->port;
	char *trimmedMessage = TrimLogLevel(message);

	/*
	 * Extract the level prefix from a private copy. The message is handed to
	 * us as const, so it must not be modified in place, and working on a
	 * copy also avoids strtok()'s hidden static state and its NULL result
	 * for an empty message.
	 */
	char *level = pstrdup(message);
	char *levelEnd = strchr(level, ':');

	if (levelEnd != NULL)
	{
		*levelEnd = '\0';
	}

	ereport(CitusNoticeLogLevel, (errmsg("%s", trimmedMessage),
								  errdetail("%s from %s:%d",
											level, nodeName, nodePort)));
}
/*
 * TrimLogLevel returns a copy of the string with the leading log level
 * and spaces removed such as
 *      From:
 *          INFO:  "normal2_102070": scanned 0 of 0 pages...
 *      To:
 *          "normal2_102070": scanned 0 of 0 pages...
 *
 * Everything up to and including the first ':' is dropped, as are the
 * spaces directly following it. A message without any ':' has no level
 * prefix and is returned as is (minus trailing newline).
 *
 * The result points into a copy made with pchomp(), possibly past its
 * start, so callers must not pfree() it.
 */
char *
TrimLogLevel(const char *message)
{
	char *chompedMessage = pchomp(message);
	char *remainder = strchr(chompedMessage, ':');

	if (remainder == NULL)
	{
		/* nothing to trim; stepping past a missing ':' would overrun the string */
		return chompedMessage;
	}

	/* skip the colon itself, then any spaces following it */
	remainder++;
	while (*remainder == ' ')
	{
		remainder++;
	}

	return remainder;
}