mirror of https://github.com/citusdata/citus.git
Centralized Connection Lifetime Management.
Connections are tracked and released by integrating into postgres' transaction handling. That allows to to use connections without having to resort to having to disable interrupts or using PG_TRY/CATCH blocks to avoid leaking connections. This is intended to eventually replace multi_client_executor.c and connection_cache.c, and to provide the basis of a centralized transaction management. The newly introduced transaction hook should, in the future, be the only one in citus, to allow for proper ordering between operations. For now this central handler is responsible for releasing connections and resetting XactModificationLevel after a transaction.pull/863/head
parent
883af02b54
commit
3223b3c92d
|
@ -16,7 +16,7 @@ DATA = $(patsubst $(citus_abs_srcdir)/%.sql,%.sql,$(wildcard $(citus_abs_srcdir)
|
|||
DATA_built = $(foreach v,$(EXTVERSIONS),$(EXTENSION)--$(v).sql)
|
||||
|
||||
# directories with source files
|
||||
SUBDIRS = . commands executor master metadata planner relay test transaction utils worker
|
||||
SUBDIRS = . commands connection executor master metadata planner relay test transaction utils worker
|
||||
|
||||
# That patsubst rule searches all directories listed in SUBDIRS for .c
|
||||
# files, and adds the corresponding .o files to OBJS
|
||||
|
|
|
@ -0,0 +1,638 @@
|
|||
/*-------------------------------------------------------------------------
|
||||
*
|
||||
* connection_management.c
|
||||
* Central management of connections and their life-cycle
|
||||
*
|
||||
* Copyright (c) 2016, Citus Data, Inc.
|
||||
*
|
||||
*-------------------------------------------------------------------------
|
||||
*/
|
||||
|
||||
#include "postgres.h"
|
||||
|
||||
#ifdef HAVE_POLL_H
|
||||
#include <poll.h>
|
||||
#endif
|
||||
|
||||
#include "libpq-fe.h"
|
||||
|
||||
#include "miscadmin.h"
|
||||
|
||||
#include "access/hash.h"
|
||||
#include "commands/dbcommands.h"
|
||||
#include "distributed/connection_management.h"
|
||||
#include "distributed/metadata_cache.h"
|
||||
#include "distributed/hash_helpers.h"
|
||||
#include "mb/pg_wchar.h"
|
||||
#include "utils/hsearch.h"
|
||||
#include "utils/memutils.h"
|
||||
|
||||
|
||||
HTAB *ConnectionHash = NULL;
|
||||
MemoryContext ConnectionContext = NULL;
|
||||
|
||||
static uint32 ConnectionHashHash(const void *key, Size keysize);
|
||||
static int ConnectionHashCompare(const void *a, const void *b, Size keysize);
|
||||
static MultiConnection * StartConnectionEstablishment(ConnectionHashKey *key);
|
||||
static void AfterXactHostConnectionHandling(ConnectionHashEntry *entry, bool isCommit);
|
||||
static MultiConnection * FindAvailableConnection(dlist_head *connections, uint32 flags);
|
||||
|
||||
|
||||
/*
|
||||
* Initialize per-backend connection management infrastructure.
|
||||
*/
|
||||
void
|
||||
InitializeConnectionManagement(void)
|
||||
{
|
||||
HASHCTL info;
|
||||
uint32 hashFlags = 0;
|
||||
|
||||
|
||||
/*
|
||||
* Create a single context for connection and transaction related memory
|
||||
* management. Doing so, instead of allocating in TopMemoryContext, makes
|
||||
* it easier to associate used memory.
|
||||
*/
|
||||
ConnectionContext = AllocSetContextCreate(TopMemoryContext, "Connection Context",
|
||||
ALLOCSET_DEFAULT_MINSIZE,
|
||||
ALLOCSET_DEFAULT_INITSIZE,
|
||||
ALLOCSET_DEFAULT_MAXSIZE);
|
||||
|
||||
/* create (host,port,user,database) -> [connection] hash */
|
||||
memset(&info, 0, sizeof(info));
|
||||
info.keysize = sizeof(ConnectionHashKey);
|
||||
info.entrysize = sizeof(ConnectionHashEntry);
|
||||
info.hash = ConnectionHashHash;
|
||||
info.match = ConnectionHashCompare;
|
||||
info.hcxt = ConnectionContext;
|
||||
hashFlags = (HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT | HASH_COMPARE);
|
||||
|
||||
ConnectionHash = hash_create("citus connection cache (host,port,user,database)",
|
||||
64, &info, hashFlags);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* Perform connection management activity after the end of a transaction. Both
|
||||
* COMMIT and ABORT paths are handled here.
|
||||
*
|
||||
* This is called by Citus' global transaction callback.
|
||||
*/
|
||||
void
|
||||
AfterXactConnectionHandling(bool isCommit)
|
||||
{
|
||||
HASH_SEQ_STATUS status;
|
||||
ConnectionHashEntry *entry;
|
||||
|
||||
hash_seq_init(&status, ConnectionHash);
|
||||
while ((entry = (ConnectionHashEntry *) hash_seq_search(&status)) != 0)
|
||||
{
|
||||
AfterXactHostConnectionHandling(entry, isCommit);
|
||||
|
||||
/*
|
||||
* NB: We leave the hash entry in place, even if there's no individual
|
||||
* connections in it anymore. There seems no benefit in deleting it,
|
||||
* and it'll save a bit of work in the next transaction.
|
||||
*/
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* GetNodeConnection() establishes a connection to remote node, using default
|
||||
* user and database.
|
||||
*
|
||||
* See StartNodeUserDatabaseConnection for details.
|
||||
*/
|
||||
MultiConnection *
|
||||
GetNodeConnection(uint32 flags, const char *hostname, int32 port)
|
||||
{
|
||||
return GetNodeUserDatabaseConnection(flags, hostname, port, NULL, NULL);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* StartNodeConnection initiates a connection to remote node, using default
|
||||
* user and database.
|
||||
*
|
||||
* See StartNodeUserDatabaseConnection for details.
|
||||
*/
|
||||
MultiConnection *
|
||||
StartNodeConnection(uint32 flags, const char *hostname, int32 port)
|
||||
{
|
||||
return StartNodeUserDatabaseConnection(flags, hostname, port, NULL, NULL);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* GetNodeUserDatabaseConnection establishes connection to remote node.
|
||||
*
|
||||
* See StartNodeUserDatabaseConnection for details.
|
||||
*/
|
||||
MultiConnection *
|
||||
GetNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port, const
|
||||
char *user, const char *database)
|
||||
{
|
||||
MultiConnection *connection;
|
||||
|
||||
connection = StartNodeUserDatabaseConnection(flags, hostname, port, user, database);
|
||||
|
||||
FinishConnectionEstablishment(connection);
|
||||
|
||||
return connection;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* StartNodeUserDatabaseConnection() initiates a connection to a remote node.
|
||||
*
|
||||
* If user or database are NULL, the current session's defaults are used. The
|
||||
* following flags influence connection establishment behaviour:
|
||||
* - SESSION_LIFESPAN - the connection should persist after transaction end
|
||||
* - FORCE_NEW_CONNECTION - a new connection is required
|
||||
*
|
||||
* The returned connection has only been initiated, not fully
|
||||
* established. That's useful to allow parallel connection establishment. If
|
||||
* that's not desired use the Get* variant.
|
||||
*/
|
||||
MultiConnection *
|
||||
StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port, const
|
||||
char *user, const char *database)
|
||||
{
|
||||
ConnectionHashKey key;
|
||||
ConnectionHashEntry *entry = NULL;
|
||||
MultiConnection *connection;
|
||||
MemoryContext oldContext;
|
||||
bool found;
|
||||
|
||||
/* do some minimal input checks */
|
||||
strlcpy(key.hostname, hostname, MAX_NODE_LENGTH);
|
||||
if (strlen(hostname) > MAX_NODE_LENGTH)
|
||||
{
|
||||
ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
|
||||
errmsg("hostname exceeds the maximum length of %d",
|
||||
MAX_NODE_LENGTH)));
|
||||
}
|
||||
|
||||
key.port = port;
|
||||
if (user)
|
||||
{
|
||||
strlcpy(key.user, user, NAMEDATALEN);
|
||||
}
|
||||
else
|
||||
{
|
||||
strlcpy(key.user, CurrentUserName(), NAMEDATALEN);
|
||||
}
|
||||
if (database)
|
||||
{
|
||||
strlcpy(key.database, database, NAMEDATALEN);
|
||||
}
|
||||
else
|
||||
{
|
||||
strlcpy(key.database, get_database_name(MyDatabaseId), NAMEDATALEN);
|
||||
}
|
||||
|
||||
if (CurrentCoordinatedTransactionState == COORD_TRANS_NONE)
|
||||
{
|
||||
CurrentCoordinatedTransactionState = COORD_TRANS_IDLE;
|
||||
}
|
||||
|
||||
/*
|
||||
* Lookup relevant hash entry. We always enter. If only a cached
|
||||
* connection is desired, and there's none, we'll simply leave the
|
||||
* connection list empty.
|
||||
*/
|
||||
|
||||
entry = hash_search(ConnectionHash, &key, HASH_ENTER, &found);
|
||||
if (!found)
|
||||
{
|
||||
entry->connections = MemoryContextAlloc(ConnectionContext,
|
||||
sizeof(dlist_head));
|
||||
dlist_init(entry->connections);
|
||||
}
|
||||
|
||||
/* if desired, check whether there's a usable connection */
|
||||
if (!(flags & FORCE_NEW_CONNECTION))
|
||||
{
|
||||
/* check connection cache for a connection that's not already in use */
|
||||
connection = FindAvailableConnection(entry->connections, flags);
|
||||
if (connection)
|
||||
{
|
||||
if (flags & SESSION_LIFESPAN)
|
||||
{
|
||||
connection->sessionLifespan = true;
|
||||
}
|
||||
|
||||
return connection;
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Either no caching desired, or no pre-established, non-claimed,
|
||||
* connection present. Initiate connection establishment.
|
||||
*/
|
||||
connection = StartConnectionEstablishment(&key);
|
||||
|
||||
oldContext = MemoryContextSwitchTo(ConnectionContext);
|
||||
dlist_push_tail(entry->connections, &connection->connectionNode);
|
||||
|
||||
MemoryContextSwitchTo(oldContext);
|
||||
|
||||
if (flags & SESSION_LIFESPAN)
|
||||
{
|
||||
connection->sessionLifespan = true;
|
||||
}
|
||||
|
||||
return connection;
|
||||
}
|
||||
|
||||
|
||||
/* StartNodeUserDatabaseConnection() helper */
|
||||
static MultiConnection *
|
||||
FindAvailableConnection(dlist_head *connections, uint32 flags)
|
||||
{
|
||||
dlist_iter iter;
|
||||
|
||||
dlist_foreach(iter, connections)
|
||||
{
|
||||
MultiConnection *connection =
|
||||
dlist_container(MultiConnection, connectionNode, iter.cur);
|
||||
|
||||
/* don't return claimed connections */
|
||||
if (connection->claimedExclusively)
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
return connection;
|
||||
}
|
||||
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* Return MultiConnection associated with the libpq connection.
|
||||
*
|
||||
* Note that this is comparatively expensive. Should only be used for
|
||||
* backward-compatibility purposes.
|
||||
*/
|
||||
MultiConnection *
|
||||
GetConnectionFromPGconn(struct pg_conn *pqConn)
|
||||
{
|
||||
HASH_SEQ_STATUS status;
|
||||
ConnectionHashEntry *entry;
|
||||
|
||||
hash_seq_init(&status, ConnectionHash);
|
||||
while ((entry = (ConnectionHashEntry *) hash_seq_search(&status)) != 0)
|
||||
{
|
||||
dlist_head *connections = entry->connections;
|
||||
dlist_iter iter;
|
||||
|
||||
/* check connection cache for a connection that's not already in use */
|
||||
dlist_foreach(iter, connections)
|
||||
{
|
||||
MultiConnection *connection =
|
||||
dlist_container(MultiConnection, connectionNode, iter.cur);
|
||||
|
||||
if (connection->pgConn == pqConn)
|
||||
{
|
||||
hash_seq_term(&status);
|
||||
return connection;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* Close a previously established connection.
|
||||
*/
|
||||
void
|
||||
CloseConnection(MultiConnection *connection)
|
||||
{
|
||||
ConnectionHashKey key;
|
||||
bool found;
|
||||
|
||||
/* close connection */
|
||||
PQfinish(connection->pgConn);
|
||||
connection->pgConn = NULL;
|
||||
|
||||
strlcpy(key.hostname, connection->hostname, MAX_NODE_LENGTH);
|
||||
key.port = connection->port;
|
||||
strlcpy(key.user, connection->user, NAMEDATALEN);
|
||||
strlcpy(key.database, connection->database, NAMEDATALEN);
|
||||
|
||||
hash_search(ConnectionHash, &key, HASH_FIND, &found);
|
||||
|
||||
if (found)
|
||||
{
|
||||
/* unlink from list */
|
||||
dlist_delete(&connection->connectionNode);
|
||||
|
||||
/* we leave the per-host entry alive */
|
||||
pfree(connection);
|
||||
}
|
||||
else
|
||||
{
|
||||
ereport(ERROR, (errmsg("closing untracked connection")));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* Close a previously established connection.
|
||||
*
|
||||
* This function closes the MultiConnection associatated with the libpq
|
||||
* connection.
|
||||
*
|
||||
* Note that this is comparatively expensive. Should only be used for
|
||||
* backward-compatibility purposes.
|
||||
*/
|
||||
void
|
||||
CloseConnectionByPGconn(PGconn *pqConn)
|
||||
{
|
||||
MultiConnection *connection = GetConnectionFromPGconn(pqConn);
|
||||
|
||||
if (connection)
|
||||
{
|
||||
CloseConnection(connection);
|
||||
}
|
||||
else
|
||||
{
|
||||
ereport(WARNING, (errmsg("could not find connection to close")));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* Synchronously finish connection establishment of an individual connection.
|
||||
*
|
||||
* TODO: Replace with variant waiting for multiple connections.
|
||||
*/
|
||||
void
|
||||
FinishConnectionEstablishment(MultiConnection *connection)
|
||||
{
|
||||
static int checkIntervalMS = 200;
|
||||
|
||||
/*
|
||||
* Loop until connection is established, or failed (possibly just timed
|
||||
* out).
|
||||
*/
|
||||
while (true)
|
||||
{
|
||||
ConnStatusType status = PQstatus(connection->pgConn);
|
||||
PostgresPollingStatusType pollmode;
|
||||
|
||||
if (status == CONNECTION_OK)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
/* FIXME: retries? */
|
||||
if (status == CONNECTION_BAD)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
pollmode = PQconnectPoll(connection->pgConn);
|
||||
|
||||
/*
|
||||
* FIXME: Do we want to add transparent retry support here?
|
||||
*/
|
||||
if (pollmode == PGRES_POLLING_FAILED)
|
||||
{
|
||||
return;
|
||||
}
|
||||
else if (pollmode == PGRES_POLLING_OK)
|
||||
{
|
||||
return;
|
||||
}
|
||||
else
|
||||
{
|
||||
Assert(pollmode == PGRES_POLLING_WRITING ||
|
||||
pollmode == PGRES_POLLING_READING);
|
||||
}
|
||||
|
||||
/* Loop, to handle poll() being interrupted by signals (EINTR) */
|
||||
while (true)
|
||||
{
|
||||
struct pollfd pollFileDescriptor;
|
||||
int pollResult = 0;
|
||||
|
||||
pollFileDescriptor.fd = PQsocket(connection->pgConn);
|
||||
if (pollmode == PGRES_POLLING_READING)
|
||||
{
|
||||
pollFileDescriptor.events = POLLIN;
|
||||
}
|
||||
else
|
||||
{
|
||||
pollFileDescriptor.events = POLLOUT;
|
||||
}
|
||||
pollFileDescriptor.revents = 0;
|
||||
|
||||
/*
|
||||
* Only sleep for a limited amount of time, so we can react to
|
||||
* interrupts in time, even if the platform doesn't interrupt
|
||||
* poll() after signal arrival.
|
||||
*/
|
||||
pollResult = poll(&pollFileDescriptor, 1, checkIntervalMS);
|
||||
|
||||
if (pollResult == 0)
|
||||
{
|
||||
/*
|
||||
* Timeout exceeded. Two things to do:
|
||||
* - check whether any interrupts arrived and handle them
|
||||
* - check whether establishment for connection already has
|
||||
* lasted for too long, stop waiting if so.
|
||||
*/
|
||||
CHECK_FOR_INTERRUPTS();
|
||||
|
||||
if (TimestampDifferenceExceeds(connection->connectionStart,
|
||||
GetCurrentTimestamp(),
|
||||
CLIENT_CONNECT_TIMEOUT_SECONDS_INT * 1000))
|
||||
{
|
||||
ereport(WARNING, (errmsg("could not establish connection after %u ms",
|
||||
CLIENT_CONNECT_TIMEOUT_SECONDS_INT * 1000)));
|
||||
|
||||
/* close connection, otherwise we take up resource on the other side */
|
||||
PQfinish(connection->pgConn);
|
||||
connection->pgConn = NULL;
|
||||
break;
|
||||
}
|
||||
}
|
||||
else if (pollResult > 0)
|
||||
{
|
||||
/*
|
||||
* IO possible, continue connection establishment. We could
|
||||
* check for timeouts here as well, but if there's progress
|
||||
* there seems little point.
|
||||
*/
|
||||
break;
|
||||
}
|
||||
else if (pollResult != EINTR)
|
||||
{
|
||||
/* Retrying, signal interrupted. So check. */
|
||||
CHECK_FOR_INTERRUPTS();
|
||||
}
|
||||
else
|
||||
{
|
||||
/*
|
||||
* We ERROR here, instead of just returning a failed
|
||||
* connection, because this shouldn't happen, and indicates a
|
||||
* programming error somewhere, not a network etc. issue.
|
||||
*/
|
||||
ereport(ERROR, (errcode_for_socket_access(),
|
||||
errmsg("poll() failed: %m")));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* ClaimConnectionExclusively signals that this connection is actively being
|
||||
* used. That means it'll not be, again, returned by
|
||||
* StartNodeUserDatabaseConnection() et al until releases with
|
||||
* UnclaimConnection().
|
||||
*/
|
||||
void
|
||||
ClaimConnectionExclusively(MultiConnection *connection)
|
||||
{
|
||||
Assert(!connection->claimedExclusively);
|
||||
connection->claimedExclusively = true;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* UnclaimConnection signals that this connection is not being used
|
||||
* anymore. That means it again may be returned by
|
||||
* StartNodeUserDatabaseConnection() et al.
|
||||
*/
|
||||
void
|
||||
UnclaimConnection(MultiConnection *connection)
|
||||
{
|
||||
connection->claimedExclusively = false;
|
||||
}
|
||||
|
||||
|
||||
static uint32
|
||||
ConnectionHashHash(const void *key, Size keysize)
|
||||
{
|
||||
ConnectionHashKey *entry = (ConnectionHashKey *) key;
|
||||
uint32 hash = 0;
|
||||
|
||||
hash = string_hash(entry->hostname, NAMEDATALEN);
|
||||
hash = hash_combine(hash, hash_uint32(entry->port));
|
||||
hash = hash_combine(hash, string_hash(entry->user, NAMEDATALEN));
|
||||
hash = hash_combine(hash, string_hash(entry->database, NAMEDATALEN));
|
||||
|
||||
return hash;
|
||||
}
|
||||
|
||||
|
||||
static int
|
||||
ConnectionHashCompare(const void *a, const void *b, Size keysize)
|
||||
{
|
||||
ConnectionHashKey *ca = (ConnectionHashKey *) a;
|
||||
ConnectionHashKey *cb = (ConnectionHashKey *) b;
|
||||
|
||||
if (strncmp(ca->hostname, cb->hostname, NAMEDATALEN) != 0 ||
|
||||
ca->port != cb->port ||
|
||||
strncmp(ca->user, cb->user, NAMEDATALEN) != 0 ||
|
||||
strncmp(ca->database, cb->database, NAMEDATALEN) != 0)
|
||||
{
|
||||
return 1;
|
||||
}
|
||||
else
|
||||
{
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* Asynchronously establish connection to a remote node, but don't wait for
|
||||
* that to finish. DNS lookups etc. are performed synchronously though.
|
||||
*/
|
||||
static MultiConnection *
|
||||
StartConnectionEstablishment(ConnectionHashKey *key)
|
||||
{
|
||||
char nodePortString[12];
|
||||
const char *clientEncoding = GetDatabaseEncodingName();
|
||||
MultiConnection *connection = NULL;
|
||||
|
||||
const char *keywords[] = {
|
||||
"host", "port", "dbname", "user",
|
||||
"client_encoding", "fallback_application_name",
|
||||
NULL
|
||||
};
|
||||
const char *values[] = {
|
||||
key->hostname, nodePortString, key->database, key->user,
|
||||
clientEncoding, "citus", NULL
|
||||
};
|
||||
|
||||
connection = MemoryContextAllocZero(ConnectionContext, sizeof(MultiConnection));
|
||||
sprintf(nodePortString, "%d", key->port);
|
||||
|
||||
strlcpy(connection->hostname, key->hostname, MAX_NODE_LENGTH);
|
||||
connection->port = key->port;
|
||||
strlcpy(connection->database, key->database, NAMEDATALEN);
|
||||
strlcpy(connection->user, key->user, NAMEDATALEN);
|
||||
|
||||
connection->pgConn = PQconnectStartParams(keywords, values, false);
|
||||
connection->connectionStart = GetCurrentTimestamp();
|
||||
|
||||
return connection;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* Close all remote connections if necessary anymore (i.e. not session
|
||||
* lifetime), or if in a failed state.
|
||||
*/
|
||||
static void
|
||||
AfterXactHostConnectionHandling(ConnectionHashEntry *entry, bool isCommit)
|
||||
{
|
||||
dlist_mutable_iter iter;
|
||||
|
||||
dlist_foreach_modify(iter, entry->connections)
|
||||
{
|
||||
MultiConnection *connection =
|
||||
dlist_container(MultiConnection, connectionNode, iter.cur);
|
||||
|
||||
/*
|
||||
* To avoid code leaking connections we warn if connections are
|
||||
* still claimed exclusively. We can only do so if the transaction
|
||||
* committed, as it's normal that code didn't have chance to clean
|
||||
* up after errors.
|
||||
*/
|
||||
if (isCommit && connection->claimedExclusively)
|
||||
{
|
||||
ereport(WARNING,
|
||||
(errmsg("connection claimed exclusively at transaction commit")));
|
||||
}
|
||||
|
||||
/*
|
||||
* Preserve session lifespan connections if they are still healthy.
|
||||
*/
|
||||
if (!connection->sessionLifespan ||
|
||||
PQstatus(connection->pgConn) != CONNECTION_OK ||
|
||||
PQtransactionStatus(connection->pgConn) != PQTRANS_IDLE)
|
||||
{
|
||||
PQfinish(connection->pgConn);
|
||||
connection->pgConn = NULL;
|
||||
|
||||
/* unlink from list */
|
||||
dlist_delete(iter.cur);
|
||||
|
||||
pfree(connection);
|
||||
}
|
||||
else
|
||||
{
|
||||
UnclaimConnection(connection);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -20,6 +20,7 @@
|
|||
#include "commands/dbcommands.h"
|
||||
#include "distributed/metadata_cache.h"
|
||||
#include "distributed/connection_cache.h"
|
||||
#include "distributed/connection_management.h"
|
||||
#include "distributed/multi_client_executor.h"
|
||||
#include "distributed/multi_server_executor.h"
|
||||
|
||||
|
|
|
@ -30,6 +30,7 @@
|
|||
#include "distributed/citus_ruleutils.h"
|
||||
#include "distributed/commit_protocol.h"
|
||||
#include "distributed/connection_cache.h"
|
||||
#include "distributed/connection_management.h"
|
||||
#include "distributed/listutils.h"
|
||||
#include "distributed/master_metadata_utility.h"
|
||||
#include "distributed/metadata_cache.h"
|
||||
|
@ -39,6 +40,7 @@
|
|||
#include "distributed/multi_router_executor.h"
|
||||
#include "distributed/multi_shard_transaction.h"
|
||||
#include "distributed/relay_utility.h"
|
||||
#include "distributed/remote_commands.h"
|
||||
#include "distributed/resource_lock.h"
|
||||
#include "executor/execdesc.h"
|
||||
#include "executor/executor.h"
|
||||
|
@ -1797,7 +1799,6 @@ RouterTransactionCallback(XactEvent event, void *arg)
|
|||
}
|
||||
|
||||
/* reset transaction state */
|
||||
XactModificationLevel = XACT_MODIFICATION_NONE;
|
||||
xactParticipantHash = NULL;
|
||||
xactShardConnSetList = NIL;
|
||||
subXactAbortAttempted = false;
|
||||
|
|
|
@ -20,6 +20,7 @@
|
|||
#include "executor/executor.h"
|
||||
#include "distributed/citus_nodefuncs.h"
|
||||
#include "distributed/commit_protocol.h"
|
||||
#include "distributed/connection_management.h"
|
||||
#include "distributed/master_protocol.h"
|
||||
#include "distributed/multi_copy.h"
|
||||
#include "distributed/multi_executor.h"
|
||||
|
@ -32,7 +33,9 @@
|
|||
#include "distributed/multi_server_executor.h"
|
||||
#include "distributed/multi_shard_transaction.h"
|
||||
#include "distributed/multi_utility.h"
|
||||
#include "distributed/remote_commands.h"
|
||||
#include "distributed/task_tracker.h"
|
||||
#include "distributed/transaction_management.h"
|
||||
#include "distributed/worker_manager.h"
|
||||
#include "distributed/worker_protocol.h"
|
||||
#include "postmaster/postmaster.h"
|
||||
|
@ -154,6 +157,10 @@ _PG_init(void)
|
|||
/* organize that task tracker is started once server is up */
|
||||
TaskTrackerRegister();
|
||||
|
||||
/* initialize coordinated transaction management */
|
||||
InitializeTransactionManagement();
|
||||
InitializeConnectionManagement();
|
||||
|
||||
/* initialize transaction callbacks */
|
||||
RegisterRouterExecutorXactCallbacks();
|
||||
RegisterShardPlacementXactCallbacks();
|
||||
|
|
|
@ -26,10 +26,6 @@
|
|||
static uint32 DistributedTransactionId = 0;
|
||||
|
||||
|
||||
/* the commit protocol to use for COPY commands */
|
||||
int MultiShardCommitProtocol = COMMIT_PROTOCOL_1PC;
|
||||
|
||||
|
||||
/*
|
||||
* InitializeDistributedTransaction prepares the distributed transaction ID
|
||||
* used in transaction names.
|
||||
|
|
|
@ -332,7 +332,6 @@ CompleteShardPlacementTransactions(XactEvent event, void *arg)
|
|||
|
||||
CloseConnections(connectionList);
|
||||
shardConnectionHash = NULL;
|
||||
XactModificationLevel = XACT_MODIFICATION_NONE;
|
||||
subXactAbortAttempted = false;
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,144 @@
|
|||
/*-------------------------------------------------------------------------
|
||||
*
|
||||
* transaction_management.c
|
||||
*
|
||||
* Transaction management for Citus. Most of the work is delegated to other
|
||||
* subsystems, this files, and especially CoordinatedTransactionCallback,
|
||||
* coordinates the work between them.
|
||||
*
|
||||
* Copyright (c) 2016, Citus Data, Inc.
|
||||
*
|
||||
*-------------------------------------------------------------------------
|
||||
*/
|
||||
|
||||
#include "postgres.h"
|
||||
|
||||
#include "libpq-fe.h"
|
||||
|
||||
#include "miscadmin.h"
|
||||
|
||||
#include "access/xact.h"
|
||||
#include "distributed/connection_management.h"
|
||||
#include "distributed/hash_helpers.h"
|
||||
#include "distributed/transaction_management.h"
|
||||
#include "utils/hsearch.h"
|
||||
|
||||
|
||||
CoordinatedTransactionState CurrentCoordinatedTransactionState = COORD_TRANS_NONE;
|
||||
|
||||
/* GUC, the commit protocol to use for commands affecting more than one connection */
|
||||
int MultiShardCommitProtocol = COMMIT_PROTOCOL_1PC;
|
||||
|
||||
/* state needed to keep track of operations used during a transaction */
|
||||
XactModificationType XactModificationLevel = XACT_MODIFICATION_NONE;
|
||||
|
||||
|
||||
static bool subXactAbortAttempted = false;
|
||||
|
||||
|
||||
/* transaction management functions */
|
||||
static void CoordinatedTransactionCallback(XactEvent event, void *arg);
|
||||
static void CoordinatedSubTransactionCallback(SubXactEvent event, SubTransactionId subId,
|
||||
SubTransactionId parentSubid, void *arg);
|
||||
|
||||
|
||||
void
|
||||
InitializeTransactionManagement(void)
|
||||
{
|
||||
/* hook into transaction machinery */
|
||||
RegisterXactCallback(CoordinatedTransactionCallback, NULL);
|
||||
RegisterSubXactCallback(CoordinatedSubTransactionCallback, NULL);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* Transaction management callback, handling coordinated transaction, and
|
||||
* transaction independent connection management.
|
||||
*
|
||||
* NB: There should only ever be a single transaction callback in citus, the
|
||||
* ordering between the callbacks and thee actions within those callbacks
|
||||
* otherwise becomes too undeterministic / hard to reason about.
|
||||
*/
|
||||
static void
|
||||
CoordinatedTransactionCallback(XactEvent event, void *arg)
|
||||
{
|
||||
switch (event)
|
||||
{
|
||||
case XACT_EVENT_COMMIT:
|
||||
{
|
||||
/* close connections etc. */
|
||||
if (CurrentCoordinatedTransactionState != COORD_TRANS_NONE)
|
||||
{
|
||||
AfterXactConnectionHandling(true);
|
||||
}
|
||||
|
||||
Assert(!subXactAbortAttempted);
|
||||
CurrentCoordinatedTransactionState = COORD_TRANS_NONE;
|
||||
XactModificationLevel = XACT_MODIFICATION_NONE;
|
||||
}
|
||||
break;
|
||||
|
||||
case XACT_EVENT_ABORT:
|
||||
{
|
||||
/* close connections etc. */
|
||||
if (CurrentCoordinatedTransactionState != COORD_TRANS_NONE)
|
||||
{
|
||||
AfterXactConnectionHandling(false);
|
||||
}
|
||||
|
||||
CurrentCoordinatedTransactionState = COORD_TRANS_NONE;
|
||||
XactModificationLevel = XACT_MODIFICATION_NONE;
|
||||
subXactAbortAttempted = false;
|
||||
}
|
||||
break;
|
||||
|
||||
case XACT_EVENT_PARALLEL_COMMIT:
|
||||
case XACT_EVENT_PARALLEL_ABORT:
|
||||
case XACT_EVENT_PREPARE:
|
||||
{ }
|
||||
break;
|
||||
|
||||
case XACT_EVENT_PRE_COMMIT:
|
||||
{
|
||||
if (subXactAbortAttempted)
|
||||
{
|
||||
subXactAbortAttempted = false;
|
||||
|
||||
if (XactModificationLevel != XACT_MODIFICATION_NONE)
|
||||
{
|
||||
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||
errmsg("cannot ROLLBACK TO SAVEPOINT in transactions "
|
||||
"which modify distributed tables")));
|
||||
}
|
||||
}
|
||||
}
|
||||
break;
|
||||
|
||||
case XACT_EVENT_PARALLEL_PRE_COMMIT:
|
||||
case XACT_EVENT_PRE_PREPARE:
|
||||
{
|
||||
if (CurrentCoordinatedTransactionState > COORD_TRANS_NONE)
|
||||
{
|
||||
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||
errmsg("cannot use 2PC in transactions involving "
|
||||
"multiple servers")));
|
||||
}
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* Subtransaction callback - currently only used to remember whether a
|
||||
* savepoint has been rolled back, as we don't support that.
|
||||
*/
|
||||
static void
|
||||
CoordinatedSubTransactionCallback(SubXactEvent event, SubTransactionId subId,
|
||||
SubTransactionId parentSubid, void *arg)
|
||||
{
|
||||
if (event == SUBXACT_EVENT_ABORT_SUB)
|
||||
{
|
||||
subXactAbortAttempted = true;
|
||||
}
|
||||
}
|
|
@ -29,10 +29,6 @@
|
|||
#include "utils/memutils.h"
|
||||
#include "utils/palloc.h"
|
||||
|
||||
|
||||
/* state needed to keep track of operations used during a transaction */
|
||||
XactModificationType XactModificationLevel = XACT_MODIFICATION_NONE;
|
||||
|
||||
/*
|
||||
* NodeConnectionHash is the connection hash itself. It begins uninitialized.
|
||||
* The first call to GetOrEstablishConnection triggers hash creation.
|
||||
|
|
|
@ -14,18 +14,12 @@
|
|||
|
||||
|
||||
#include "access/xact.h"
|
||||
#include "distributed/connection_management.h"
|
||||
#include "libpq-fe.h"
|
||||
#include "lib/stringinfo.h"
|
||||
#include "nodes/pg_list.h"
|
||||
|
||||
|
||||
/* Enumeration that defines the different commit protocols available */
|
||||
typedef enum
|
||||
{
|
||||
COMMIT_PROTOCOL_1PC = 0,
|
||||
COMMIT_PROTOCOL_2PC = 1
|
||||
} CommitProtocolType;
|
||||
|
||||
/* Enumeration that defines different remote transaction states */
|
||||
typedef enum
|
||||
{
|
||||
|
@ -51,10 +45,6 @@ typedef struct TransactionConnection
|
|||
} TransactionConnection;
|
||||
|
||||
|
||||
/* config variable managed via guc.c */
|
||||
extern int MultiShardCommitProtocol;
|
||||
|
||||
|
||||
/* Functions declarations for transaction and connection management */
|
||||
extern void InitializeDistributedTransaction(void);
|
||||
extern void PrepareRemoteTransactions(List *connectionList);
|
||||
|
|
|
@ -52,20 +52,6 @@ typedef struct NodeConnectionEntry
|
|||
} NodeConnectionEntry;
|
||||
|
||||
|
||||
/* describes what kind of modifications have occurred in the current transaction */
|
||||
typedef enum
|
||||
{
|
||||
XACT_MODIFICATION_INVALID = 0, /* placeholder initial value */
|
||||
XACT_MODIFICATION_NONE, /* no modifications have taken place */
|
||||
XACT_MODIFICATION_DATA, /* data modifications (DML) have occurred */
|
||||
XACT_MODIFICATION_MULTI_SHARD /* multi-shard modifications have occurred */
|
||||
} XactModificationType;
|
||||
|
||||
|
||||
/* state needed to prevent new connections during modifying transactions */
|
||||
extern XactModificationType XactModificationLevel;
|
||||
|
||||
|
||||
/* function declarations for obtaining and using a connection */
|
||||
extern PGconn * GetOrEstablishConnection(char *nodeName, int32 nodePort);
|
||||
extern void PurgeConnection(PGconn *connection);
|
||||
|
|
|
@ -0,0 +1,128 @@
|
|||
/*-------------------------------------------------------------------------
|
||||
*
|
||||
* connection_management.h
|
||||
* Central management of connections and their life-cycle
|
||||
*
|
||||
* Copyright (c) 2016, Citus Data, Inc.
|
||||
*
|
||||
*-------------------------------------------------------------------------
|
||||
*/
|
||||
|
||||
#ifndef CONNECTION_MANAGMENT_H
|
||||
#define CONNECTION_MANAGMENT_H
|
||||
|
||||
#include "distributed/transaction_management.h"
|
||||
#include "lib/ilist.h"
|
||||
#include "utils/hsearch.h"
|
||||
#include "utils/timestamp.h"
|
||||
|
||||
/* maximum (textual) lengths of hostname and port */
|
||||
#define MAX_NODE_LENGTH 255 /* includes 0 byte */
|
||||
|
||||
#define CLIENT_CONNECT_TIMEOUT_SECONDS_INT 5
|
||||
|
||||
/* forward declare, to avoid forcing large headers on everyone */
|
||||
struct pg_conn; /* target of the PGconn typedef */
|
||||
struct MemoryContextData;
|
||||
|
||||
/*
|
||||
* Flags determining connection establishment behaviour.
|
||||
*/
|
||||
enum MultiConnectionMode
|
||||
{
|
||||
/* force establishment of a new connection */
|
||||
FORCE_NEW_CONNECTION = 1 << 0,
|
||||
|
||||
/* mark returned connection as having session lifespan */
|
||||
SESSION_LIFESPAN = 1 << 1
|
||||
};
|
||||
|
||||
|
||||
/* declaring this directly above makes uncrustify go crazy */
|
||||
typedef enum MultiConnectionMode MultiConnectionMode;
|
||||
|
||||
typedef struct MultiConnection
|
||||
{
|
||||
/* connection details, useful for error messages and such. */
|
||||
char hostname[MAX_NODE_LENGTH];
|
||||
int32 port;
|
||||
char user[NAMEDATALEN];
|
||||
char database[NAMEDATALEN];
|
||||
|
||||
/* underlying libpq connection */
|
||||
struct pg_conn *pgConn;
|
||||
|
||||
/* is the connection intended to be kept after transaction end */
|
||||
bool sessionLifespan;
|
||||
|
||||
/* is the connection currently in use, and shouldn't be used by anything else */
|
||||
bool claimedExclusively;
|
||||
|
||||
/* time connection establishment was started, for timeout */
|
||||
TimestampTz connectionStart;
|
||||
|
||||
/* membership in list of list of connections in ConnectionHashEntry */
|
||||
dlist_node connectionNode;
|
||||
} MultiConnection;
|
||||
|
||||
|
||||
/*
|
||||
* Central connection management hash, mapping (host, port, user, database) to
|
||||
* a list of connections.
|
||||
*
|
||||
* This hash is used to keep track of which connections are open to which
|
||||
* node. Besides allowing connection reuse, that information is e.g. used to
|
||||
* handle closing connections after the end of a transaction.
|
||||
*/
|
||||
|
||||
/* hash key */
|
||||
typedef struct ConnectionHashKey
|
||||
{
|
||||
char hostname[MAX_NODE_LENGTH];
|
||||
int32 port;
|
||||
char user[NAMEDATALEN];
|
||||
char database[NAMEDATALEN];
|
||||
} ConnectionHashKey;
|
||||
|
||||
/* hash entry */
|
||||
typedef struct ConnectionHashEntry
|
||||
{
|
||||
ConnectionHashKey key;
|
||||
dlist_head *connections;
|
||||
} ConnectionHashEntry;
|
||||
|
||||
/* the hash table */
|
||||
extern HTAB *ConnectionHash;
|
||||
|
||||
/* context for all connection and transaction related memory */
|
||||
extern struct MemoryContextData *ConnectionContext;
|
||||
|
||||
|
||||
extern void AfterXactConnectionHandling(bool isCommit);
|
||||
extern void InitializeConnectionManagement(void);
|
||||
|
||||
|
||||
/* Low-level connection establishment APIs */
|
||||
extern MultiConnection * GetNodeConnection(uint32 flags, const char *hostname,
|
||||
int32 port);
|
||||
extern MultiConnection * StartNodeConnection(uint32 flags, const char *hostname,
|
||||
int32 port);
|
||||
extern MultiConnection * GetNodeUserDatabaseConnection(uint32 flags, const char *hostname,
|
||||
int32 port, const char *user, const
|
||||
char *database);
|
||||
extern MultiConnection * StartNodeUserDatabaseConnection(uint32 flags,
|
||||
const char *hostname,
|
||||
int32 port,
|
||||
const char *user,
|
||||
const char *database);
|
||||
extern MultiConnection * GetConnectionFromPGconn(struct pg_conn *pqConn);
|
||||
extern void CloseConnection(MultiConnection *connection);
|
||||
extern void CloseConnectionByPGconn(struct pg_conn *pqConn);
|
||||
|
||||
/* dealing with a connection */
|
||||
extern void FinishConnectionEstablishment(MultiConnection *connection);
|
||||
extern void ClaimConnectionExclusively(MultiConnection *connection);
|
||||
extern void UnclaimConnection(MultiConnection *connection);
|
||||
|
||||
|
||||
#endif /* CONNECTION_MANAGMENT_H */
|
|
@ -0,0 +1,57 @@
|
|||
/*-------------------------------------------------------------------------
|
||||
* transaction_management.h
|
||||
*
|
||||
* Copyright (c) 2016, Citus Data, Inc.
|
||||
*
|
||||
*-------------------------------------------------------------------------
|
||||
*/
|
||||
|
||||
#ifndef TRANSACTION_MANAGMENT_H
|
||||
#define TRANSACTION_MANAGMENT_H
|
||||
|
||||
/* describes what kind of modifications have occurred in the current transaction */
|
||||
typedef enum
|
||||
{
|
||||
XACT_MODIFICATION_INVALID = 0, /* placeholder initial value */
|
||||
XACT_MODIFICATION_NONE, /* no modifications have taken place */
|
||||
XACT_MODIFICATION_DATA, /* data modifications (DML) have occurred */
|
||||
XACT_MODIFICATION_MULTI_SHARD /* multi-shard modifications have occurred */
|
||||
} XactModificationType;
|
||||
|
||||
|
||||
/*
|
||||
* Enum defining the state of a coordinated (i.e. a transaction potentially
|
||||
* spanning several nodes).
|
||||
*/
|
||||
typedef enum CoordinatedTransactionState
|
||||
{
|
||||
/* no coordinated transaction in progress, no connections established */
|
||||
COORD_TRANS_NONE,
|
||||
|
||||
/* no coordinated transaction in progress, but connections established */
|
||||
COORD_TRANS_IDLE
|
||||
} CoordinatedTransactionState;
|
||||
|
||||
|
||||
/* Enumeration that defines the different commit protocols available */
|
||||
typedef enum
|
||||
{
|
||||
COMMIT_PROTOCOL_1PC = 0,
|
||||
COMMIT_PROTOCOL_2PC = 1
|
||||
} CommitProtocolType;
|
||||
|
||||
/* config variable managed via guc.c */
|
||||
extern int MultiShardCommitProtocol;
|
||||
|
||||
/* state needed to prevent new connections during modifying transactions */
|
||||
extern XactModificationType XactModificationLevel;
|
||||
|
||||
|
||||
extern CoordinatedTransactionState CurrentCoordinatedTransactionState;
|
||||
|
||||
|
||||
/* initialization function(s) */
|
||||
extern void InitializeTransactionManagement(void);
|
||||
|
||||
|
||||
#endif /* TRANSACTION_MANAGMENT_H */
|
Loading…
Reference in New Issue