From 6826022ad9e3acd6abbd95dd1f17c49d73ca33b0 Mon Sep 17 00:00:00 2001 From: Andres Freund Date: Fri, 7 Oct 2016 14:48:25 -0700 Subject: [PATCH] Add beginning of new and central connection lifetime management. This is intended to eventually replace multi_client_executor.c and connection_cache.c, and to provide the basis of a centralized transaction management. The newly introduced transaction hook should, in the future, be the only one in citus, to allow for proper ordering between operations. For now this central handler is responsible for releasing connections and resetting XactModificationLevel after a transaction. --- .../executor/multi_client_executor.c | 1 + .../executor/multi_router_executor.c | 3 +- src/backend/distributed/shared_library_init.c | 7 + .../distributed/transaction/commit_protocol.c | 4 - .../transaction/multi_shard_transaction.c | 1 - .../transaction/transaction_management.c | 145 +++++ .../distributed/utils/connection_cache.c | 35 - .../distributed/utils/connection_management.c | 604 ++++++++++++++++++ src/include/distributed/commit_protocol.h | 12 +- src/include/distributed/connection_cache.h | 15 - .../distributed/connection_management.h | 140 ++++ .../distributed/transaction_management.h | 59 ++ 12 files changed, 959 insertions(+), 67 deletions(-) create mode 100644 src/backend/distributed/transaction/transaction_management.c create mode 100644 src/backend/distributed/utils/connection_management.c create mode 100644 src/include/distributed/connection_management.h create mode 100644 src/include/distributed/transaction_management.h diff --git a/src/backend/distributed/executor/multi_client_executor.c b/src/backend/distributed/executor/multi_client_executor.c index da852d5c1..50b25a6cf 100644 --- a/src/backend/distributed/executor/multi_client_executor.c +++ b/src/backend/distributed/executor/multi_client_executor.c @@ -20,6 +20,7 @@ #include "commands/dbcommands.h" #include "distributed/metadata_cache.h" #include "distributed/connection_cache.h" +#include "distributed/connection_management.h" #include "distributed/multi_client_executor.h" #include "distributed/multi_server_executor.h" diff --git a/src/backend/distributed/executor/multi_router_executor.c b/src/backend/distributed/executor/multi_router_executor.c index aee7e09a5..d51d047a4 100644 --- a/src/backend/distributed/executor/multi_router_executor.c +++ b/src/backend/distributed/executor/multi_router_executor.c @@ -30,6 +30,7 @@ #include "distributed/citus_ruleutils.h" #include "distributed/commit_protocol.h" #include "distributed/connection_cache.h" +#include "distributed/connection_management.h" #include "distributed/listutils.h" #include "distributed/master_metadata_utility.h" #include "distributed/metadata_cache.h" @@ -39,6 +40,7 @@ #include "distributed/multi_router_executor.h" #include "distributed/multi_shard_transaction.h" #include "distributed/relay_utility.h" +#include "distributed/remote_commands.h" #include "distributed/resource_lock.h" #include "executor/execdesc.h" #include "executor/executor.h" @@ -1797,7 +1799,6 @@ RouterTransactionCallback(XactEvent event, void *arg) } /* reset transaction state */ - XactModificationLevel = XACT_MODIFICATION_NONE; xactParticipantHash = NULL; xactShardConnSetList = NIL; subXactAbortAttempted = false; diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index 7e1ffa904..91f609d35 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -20,6 +20,7 @@ #include "executor/executor.h" #include "distributed/citus_nodefuncs.h" #include "distributed/commit_protocol.h" +#include "distributed/connection_management.h" #include "distributed/master_protocol.h" #include "distributed/multi_copy.h" #include "distributed/multi_executor.h" @@ -32,7 +33,9 @@ #include "distributed/multi_server_executor.h" #include "distributed/multi_shard_transaction.h" #include "distributed/multi_utility.h" +#include "distributed/remote_commands.h" #include "distributed/task_tracker.h" +#include "distributed/transaction_management.h" #include "distributed/worker_manager.h" #include "distributed/worker_protocol.h" #include "postmaster/postmaster.h" @@ -154,6 +157,10 @@ _PG_init(void) /* organize that task tracker is started once server is up */ TaskTrackerRegister(); + /* initialize coordinated transaction management */ + InitializeTransactionManagement(); + InitializeConnectionManagement(); + /* initialize transaction callbacks */ RegisterRouterExecutorXactCallbacks(); RegisterShardPlacementXactCallbacks(); diff --git a/src/backend/distributed/transaction/commit_protocol.c b/src/backend/distributed/transaction/commit_protocol.c index 1c5ac7417..dd81d6f5c 100644 --- a/src/backend/distributed/transaction/commit_protocol.c +++ b/src/backend/distributed/transaction/commit_protocol.c @@ -26,10 +26,6 @@ static uint32 DistributedTransactionId = 0; -/* the commit protocol to use for COPY commands */ -int MultiShardCommitProtocol = COMMIT_PROTOCOL_1PC; - - /* * InitializeDistributedTransaction prepares the distributed transaction ID * used in transaction names. diff --git a/src/backend/distributed/transaction/multi_shard_transaction.c b/src/backend/distributed/transaction/multi_shard_transaction.c index 3829746ba..c94b77abc 100644 --- a/src/backend/distributed/transaction/multi_shard_transaction.c +++ b/src/backend/distributed/transaction/multi_shard_transaction.c @@ -332,7 +332,6 @@ CompleteShardPlacementTransactions(XactEvent event, void *arg) CloseConnections(connectionList); shardConnectionHash = NULL; - XactModificationLevel = XACT_MODIFICATION_NONE; subXactAbortAttempted = false; } diff --git a/src/backend/distributed/transaction/transaction_management.c b/src/backend/distributed/transaction/transaction_management.c new file mode 100644 index 000000000..24ca5540c --- /dev/null +++ b/src/backend/distributed/transaction/transaction_management.c @@ -0,0 +1,145 @@ +/*------------------------------------------------------------------------- + * + * transaction_management.c + * + * Transaction management for Citus. Most of the work is delegated to other + * subsystems, this files, and especially CoordinatedTransactionCallback, + * coordinates the work between them. + * + * Copyright (c) 2016, Citus Data, Inc. + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "libpq-fe.h" + +#include "miscadmin.h" + +#include "access/xact.h" +#include "distributed/connection_management.h" +#include "distributed/hash_helpers.h" +#include "distributed/transaction_management.h" +#include "utils/hsearch.h" + + +CoordinatedTransactionState CurrentCoordinatedTransactionState = COORD_TRANS_NONE; + +/* GUC, the commit protocol to use for commands affecting more than one connection */ +int MultiShardCommitProtocol = COMMIT_PROTOCOL_1PC; + +/* state needed to keep track of operations used during a transaction */ +XactModificationType XactModificationLevel = XACT_MODIFICATION_NONE; + + +static bool subXactAbortAttempted = false; + + +/* transaction management functions */ +static void CoordinatedTransactionCallback(XactEvent event, void *arg); +static void CoordinatedSubTransactionCallback(SubXactEvent event, SubTransactionId subId, + SubTransactionId parentSubid, void *arg); + + +void +InitializeTransactionManagement(void) +{ + /* hook into transaction machinery */ + RegisterXactCallback(CoordinatedTransactionCallback, NULL); + RegisterSubXactCallback(CoordinatedSubTransactionCallback, NULL); +} + + +/* + * Transaction management callback, handling coordinated transaction, and + * transaction independent connection management. + * + * NB: There should only ever be a single transaction callback in citus, the + * ordering between the callbacks and thee actions within those callbacks + * otherwise becomes too undeterministic / hard to reason about. + */ +static void +CoordinatedTransactionCallback(XactEvent event, void *arg) +{ + switch (event) + { + case XACT_EVENT_COMMIT: + { + /* close connections etc. */ + if (CurrentCoordinatedTransactionState != COORD_TRANS_NONE) + { + AfterXactConnectionHandling(true); + } + + Assert(!subXactAbortAttempted); + CurrentCoordinatedTransactionState = COORD_TRANS_NONE; + XactModificationLevel = XACT_MODIFICATION_NONE; + } + break; + + case XACT_EVENT_ABORT: + { + /* close connections etc. */ + if (CurrentCoordinatedTransactionState != COORD_TRANS_NONE) + { + AfterXactConnectionHandling(false); + } + + CurrentCoordinatedTransactionState = COORD_TRANS_NONE; + XactModificationLevel = XACT_MODIFICATION_NONE; + subXactAbortAttempted = false; + } + break; + + case XACT_EVENT_PARALLEL_COMMIT: + case XACT_EVENT_PARALLEL_ABORT: + case XACT_EVENT_PREPARE: + { } + break; + + case XACT_EVENT_PRE_COMMIT: + { + if (subXactAbortAttempted) + { + subXactAbortAttempted = false; + + if (XactModificationLevel != XACT_MODIFICATION_NONE) + { + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot ROLLBACK TO SAVEPOINT in transactions " + "which modify distributed tables"))); + } + } + } + break; + + case XACT_EVENT_PARALLEL_PRE_COMMIT: + case XACT_EVENT_PRE_PREPARE: + { + /* + * FIXME: do we want to support this? Or error out? Might be + * annoying to error out as it could prevent experimentation. If + * we error out, we should only do so if a coordinated transaction + * has been started, so independent 2PC usage doesn't cause + * errors. + */ + } + break; + } +} + + +/* + * Subtransaction callback - currently only used to remember whether a + * savepoint has been rolled back, as we don't support that. + */ +static void +CoordinatedSubTransactionCallback(SubXactEvent event, SubTransactionId subId, + SubTransactionId parentSubid, void *arg) +{ + if (event == SUBXACT_EVENT_ABORT_SUB) + { + subXactAbortAttempted = true; + } +} diff --git a/src/backend/distributed/utils/connection_cache.c b/src/backend/distributed/utils/connection_cache.c index 28f97a8a7..db252d8c6 100644 --- a/src/backend/distributed/utils/connection_cache.c +++ b/src/backend/distributed/utils/connection_cache.c @@ -29,10 +29,6 @@ #include "utils/memutils.h" #include "utils/palloc.h" - -/* state needed to keep track of operations used during a transaction */ -XactModificationType XactModificationLevel = XACT_MODIFICATION_NONE; - /* * NodeConnectionHash is the connection hash itself. It begins uninitialized. * The first call to GetOrEstablishConnection triggers hash creation. @@ -197,37 +193,6 @@ PurgeConnectionByKey(NodeConnectionKey *nodeConnectionKey) } -/* - * SqlStateMatchesCategory returns true if the given sql state (which may be - * NULL if unknown) is in the given error category. Note that we use - * ERRCODE_TO_CATEGORY macro to determine error category of the sql state and - * expect the caller to use the same macro for the error category. - */ -bool -SqlStateMatchesCategory(char *sqlStateString, int category) -{ - bool sqlStateMatchesCategory = false; - int sqlState = 0; - int sqlStateCategory = 0; - - if (sqlStateString == NULL) - { - return false; - } - - sqlState = MAKE_SQLSTATE(sqlStateString[0], sqlStateString[1], sqlStateString[2], - sqlStateString[3], sqlStateString[4]); - - sqlStateCategory = ERRCODE_TO_CATEGORY(sqlState); - if (sqlStateCategory == category) - { - sqlStateMatchesCategory = true; - } - - return sqlStateMatchesCategory; -} - - /* * WarnRemoteError retrieves error fields from a remote result and produces an * error report at the WARNING level after amending the error with a CONTEXT diff --git a/src/backend/distributed/utils/connection_management.c b/src/backend/distributed/utils/connection_management.c new file mode 100644 index 000000000..852d747dc --- /dev/null +++ b/src/backend/distributed/utils/connection_management.c @@ -0,0 +1,604 @@ +/*------------------------------------------------------------------------- + * + * connection_management.c + * Central management of connections and their life-cycle + * + * Copyright (c) 2016, Citus Data, Inc. + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#ifdef HAVE_POLL_H +#include +#endif + +#include "libpq-fe.h" + +#include "miscadmin.h" + +#include "access/hash.h" +#include "commands/dbcommands.h" +#include "distributed/connection_management.h" +#include "distributed/metadata_cache.h" +#include "distributed/hash_helpers.h" +#include "mb/pg_wchar.h" +#include "utils/hsearch.h" +#include "utils/memutils.h" + + +HTAB *ConnectionHash = NULL; +MemoryContext ConnectionContext = NULL; + +static uint32 ConnectionHashHash(const void *key, Size keysize); +static int ConnectionHashCompare(const void *a, const void *b, Size keysize); +static MultiConnection * StartConnectionEstablishment(ConnectionHashKey *key); +static void AfterXactHostConnectionHandling(ConnectionHashEntry *entry, bool isCommit); + + +/* + * Initialize per-backend connection management infrastructure. + */ +void +InitializeConnectionManagement(void) +{ + HASHCTL info; + uint32 hashFlags = 0; + + + /* + * Create a single context for connection and transaction related memory + * management. Doing so, instead of allocating in TopMemoryContext, makes + * it easier to associate used memory. + */ + ConnectionContext = AllocSetContextCreate(TopMemoryContext, "Connection Context", + ALLOCSET_DEFAULT_MINSIZE, + ALLOCSET_DEFAULT_INITSIZE, + ALLOCSET_DEFAULT_MAXSIZE); + + /* create (host,port,user,database) -> [connection] hash */ + memset(&info, 0, sizeof(info)); + info.keysize = sizeof(ConnectionHashKey); + info.entrysize = sizeof(ConnectionHashEntry); + info.hash = ConnectionHashHash; + info.match = ConnectionHashCompare; + info.hcxt = ConnectionContext; + hashFlags = (HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT | HASH_COMPARE); + + ConnectionHash = hash_create("citus connection cache (host,port,user,database)", + 64, &info, hashFlags); +} + + +/* + * Perform connection management activity after the end of a transaction. Both + * COMMIT and ABORT paths are handled here. + * + * This is called by Citus' global transaction callback. + */ +void +AfterXactConnectionHandling(bool isCommit) +{ + HASH_SEQ_STATUS status; + ConnectionHashEntry *entry; + + hash_seq_init(&status, ConnectionHash); + while ((entry = (ConnectionHashEntry *) hash_seq_search(&status)) != 0) + { + AfterXactHostConnectionHandling(entry, isCommit); + + /* + * NB: We leave the hash entry in place, even if there's no individual + * connections in it anymore. There seems no benefit in deleting it, + * and it'll save a bit of work in the next transaction. + */ + } +} + + +/* + * GetNodeConnection() establishes a connection to remote node, using default + * user and database. + * + * See StartNodeUserDatabaseConnection for details. + */ +MultiConnection * +GetNodeConnection(uint32 flags, const char *hostname, int32 port) +{ + return GetNodeUserDatabaseConnection(flags, hostname, port, NULL, NULL); +} + + +/* + * StartNodeConnection initiate a connection to remote node, using default + * user and database. + * + * See StartNodeUserDatabaseConnection for details. + */ +MultiConnection * +StartNodeConnection(uint32 flags, const char *hostname, int32 port) +{ + return StartNodeUserDatabaseConnection(flags, hostname, port, NULL, NULL); +} + + +/* + * GetNodeUserDatabaseConnection establishes connection to remote node. + * + * See StartNodeUserDatabaseConnection for details. + */ +MultiConnection * +GetNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port, const + char *user, const char *database) +{ + MultiConnection *connection; + + connection = StartNodeUserDatabaseConnection(flags, hostname, port, user, database); + + FinishConnectionEstablishment(connection); + + return connection; +} + + +/* + * StartNodeUserDatabaseConnection() initiates a connection to a remote node. + * + * If user or database are NULL, the current session's defaults are used. The + * following flags influence connection establishment behaviour: + * - NEW_CONNECTION - it is permitted to establish a new connection + * - CACHED_CONNECTION - it is permitted to re-use an established connection + * - SESSION_LIFESPAN - the connection should persist after transaction end + * - FOR_DML - only meaningful for placement associated connections + * - FOR_DDL - only meaningful for placement associated connections + * - CRITICAL_CONNECTION - transaction failures on this connection fail the entire + * coordinated transaction + * + * The returned connection has only been initiated, not fully + * established. That's useful to allow parallel connection establishment. If + * that's not desired use the Get* variant. + */ +MultiConnection * +StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port, const + char *user, const char *database) +{ + ConnectionHashKey key; + ConnectionHashEntry *entry = NULL; + MultiConnection *connection; + MemoryContext oldContext; + bool found; + dlist_iter iter; + + strlcpy(key.hostname, hostname, MAX_NODE_LENGTH); + key.port = port; + if (user) + { + strlcpy(key.user, user, NAMEDATALEN); + } + else + { + strlcpy(key.user, CurrentUserName(), NAMEDATALEN); + } + if (database) + { + strlcpy(key.database, database, NAMEDATALEN); + } + else + { + strlcpy(key.database, get_database_name(MyDatabaseId), NAMEDATALEN); + } + + if (CurrentCoordinatedTransactionState == COORD_TRANS_NONE) + { + CurrentCoordinatedTransactionState = COORD_TRANS_IDLE; + } + + /* + * Lookup relevant hash entry. We always enter. If only a cached + * connection is desired, and there's none, we'll simply leave the + * connection list empty. + */ + + entry = hash_search(ConnectionHash, &key, HASH_ENTER, &found); + if (!found) + { + entry->connections = MemoryContextAlloc(ConnectionContext, + sizeof(dlist_head)); + dlist_init(entry->connections); + } + + /* if desired, check whether there's a usable connection */ + if (flags & CACHED_CONNECTION) + { + /* check connection cache for a connection that's not already in use */ + dlist_foreach(iter, entry->connections) + { + connection = dlist_container(MultiConnection, node, iter.cur); + + /* don't return claimed connections */ + if (connection->claimedExclusively) + { + continue; + } + + /* + * If we're not allowed to open new connections right now, and the + * current connection hasn't yet been used in this transaction, we + * can't use it. + */ + if (!connection->activeInTransaction && + XactModificationLevel > XACT_MODIFICATION_DATA) + { + continue; + } + + if (flags & SESSION_LIFESPAN) + { + connection->sessionLifespan = true; + } + connection->activeInTransaction = true; + + /* + * One could argue for erroring out when the connection is in a + * failed state. But that'd be a bad idea for two reasons: + * + * 1) Generally starting a connection might fail, after calling + * this function, so calling code needs to handle that anyway. + * 2) This might be used in code that transparently handles + * connection failure. + */ + return connection; + } + } + + /* no connection available, done if a new connection isn't desirable */ + if (!(flags & NEW_CONNECTION)) + { + return NULL; + } + + /* + * Check whether we're right now allowed to open new connections. + * + * FIXME: This should be removed soon, once all connections go through + * this API. + */ + if (XactModificationLevel > XACT_MODIFICATION_DATA) + { + ereport(ERROR, (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION), + errmsg("cannot open new connections after the first modification " + "command within a transaction"))); + } + + /* + * Either no caching desired, or no pre-established, non-claimed, + * connection present. Initiate connection establishment. + */ + connection = StartConnectionEstablishment(&key); + + oldContext = MemoryContextSwitchTo(ConnectionContext); + dlist_push_tail(entry->connections, &connection->node); + + MemoryContextSwitchTo(oldContext); + + if (flags & SESSION_LIFESPAN) + { + connection->sessionLifespan = true; + } + + connection->activeInTransaction = true; + + return connection; +} + + +/* + * Close a previously established connection. + */ +void +CloseConnection(MultiConnection *connection) +{ + ConnectionHashKey key; + bool found; + + /* close connection */ + PQfinish(connection->conn); + connection->conn = NULL; + + strlcpy(key.hostname, connection->hostname, MAX_NODE_LENGTH); + key.port = connection->port; + strlcpy(key.user, connection->user, NAMEDATALEN); + strlcpy(key.database, connection->database, NAMEDATALEN); + + hash_search(ConnectionHash, &key, HASH_FIND, &found); + + if (found) + { + /* unlink from list */ + dlist_delete(&connection->node); + + /* we leave the per-host entry alive */ + pfree(connection); + } + else + { + /* XXX: we could error out instead */ + ereport(WARNING, (errmsg("closing untracked connection"))); + } +} + + +/* + * Synchronously finish connection establishment of an individual connection. + * + * TODO: Replace with variant waiting for multiple connections. + */ +void +FinishConnectionEstablishment(MultiConnection *connection) +{ + static int checkIntervalMS = 200; + + /* + * Loop until connection is established, or failed (possibly just timed + * out). + */ + while (true) + { + ConnStatusType status = PQstatus(connection->conn); + PostgresPollingStatusType pollmode; + + if (status == CONNECTION_OK) + { + return; + } + + /* FIXME: retries? */ + if (status == CONNECTION_BAD) + { + return; + } + + pollmode = PQconnectPoll(connection->conn); + + /* + * FIXME: Do we want to add transparent retry support here? + */ + if (pollmode == PGRES_POLLING_FAILED) + { + return; + } + else if (pollmode == PGRES_POLLING_OK) + { + return; + } + else + { + Assert(pollmode == PGRES_POLLING_WRITING || + pollmode == PGRES_POLLING_READING); + } + + /* Loop, to handle poll() being interrupted by signals (EINTR) */ + while (true) + { + struct pollfd pollFileDescriptor; + int pollResult = 0; + + pollFileDescriptor.fd = PQsocket(connection->conn); + if (pollmode == PGRES_POLLING_READING) + { + pollFileDescriptor.events = POLLIN; + } + else + { + pollFileDescriptor.events = POLLOUT; + } + pollFileDescriptor.revents = 0; + + /* + * Only sleep for a limited amount of time, so we can react to + * interrupts in time, even if the platform doesn't interrupt + * poll() after signal arrival. + */ + pollResult = poll(&pollFileDescriptor, 1, checkIntervalMS); + + if (pollResult == 0) + { + /* + * Timeout exceeded. Two things to do: + * - check whether any interrupts arrived and handle them + * - check whether establishment for connection alreadly has + * lasted for too long, stop waiting if so. + */ + CHECK_FOR_INTERRUPTS(); + + if (TimestampDifferenceExceeds(connection->connectionStart, + GetCurrentTimestamp(), + CLIENT_CONNECT_TIMEOUT_SECONDS_INT * 1000)) + { + ereport(WARNING, (errmsg("could not establish connection after %u ms", + CLIENT_CONNECT_TIMEOUT_SECONDS_INT * 1000))); + + /* close connection, otherwise we take up resource on the other side */ + PQfinish(connection->conn); + connection->conn = NULL; + break; + } + } + else if (pollResult > 0) + { + /* + * IO possible, continue connection establishment. We could + * check for timeouts here as well, but if there's progress + * there seems little point. + */ + break; + } + else if (pollResult != EINTR) + { + /* Retrying, signal interrupted. So check. */ + CHECK_FOR_INTERRUPTS(); + } + else + { + /* + * We ERROR here, instead of just returning a failed + * connection, because this shouldn't happen, and indicates a + * programming error somewhere, not a network etc. issue. + */ + ereport(ERROR, (errcode_for_socket_access(), + errmsg("poll() failed: %m"))); + } + } + } +} + + +/* + * ClaimConnectionExclusively signals that this connection is actively being + * used. That means it'll not be, again, returned by + * StartNodeUserDatabaseConnection() et al until releases with + * UnclaimConnection(). + */ +void +ClaimConnectionExclusively(MultiConnection *connection) +{ + Assert(!connection->claimedExclusively); + connection->claimedExclusively = true; +} + + +/* + * UnclaimConnection signals that this connection is not being used + * anymore. That means it again may be returned by returned by + * StartNodeUserDatabaseConnection() et al. + */ +void +UnclaimConnection(MultiConnection *connection) +{ + connection->claimedExclusively = false; +} + + +static uint32 +ConnectionHashHash(const void *key, Size keysize) +{ + ConnectionHashKey *entry = (ConnectionHashKey *) key; + uint32 hash = 0; + + hash = string_hash(entry->hostname, NAMEDATALEN); + hash = hash_combine(hash, hash_uint32(entry->port)); + hash = hash_combine(hash, string_hash(entry->user, NAMEDATALEN)); + hash = hash_combine(hash, string_hash(entry->database, NAMEDATALEN)); + + return hash; +} + + +static int +ConnectionHashCompare(const void *a, const void *b, Size keysize) +{ + ConnectionHashKey *ca = (ConnectionHashKey *) a; + ConnectionHashKey *cb = (ConnectionHashKey *) b; + + if (strncmp(ca->hostname, cb->hostname, NAMEDATALEN) != 0 || + ca->port != cb->port || + strncmp(ca->user, cb->user, NAMEDATALEN) != 0 || + strncmp(ca->database, cb->database, NAMEDATALEN) != 0) + { + return 1; + } + else + { + return 0; + } +} + + +/* + * Asynchronously establish connection to a remote node, but don't wait for + * that to finish. DNS lookups etc. are performed synchronously though. + */ +static MultiConnection * +StartConnectionEstablishment(ConnectionHashKey *key) +{ + char nodePortString[12]; + const char *clientEncoding = GetDatabaseEncodingName(); + MultiConnection *connection = NULL; + + const char *keywords[] = { + "host", "port", "dbname", "user", + "client_encoding", "fallback_application_name", + NULL + }; + const char *values[] = { + key->hostname, nodePortString, key->database, key->user, + clientEncoding, "citus", NULL + }; + + connection = MemoryContextAllocZero(ConnectionContext, sizeof(MultiConnection)); + sprintf(nodePortString, "%d", key->port); + + strlcpy(connection->hostname, key->hostname, MAX_NODE_LENGTH); + connection->port = key->port; + strlcpy(connection->database, key->database, NAMEDATALEN); + strlcpy(connection->user, key->user, NAMEDATALEN); + + connection->conn = PQconnectStartParams(keywords, values, false); + connection->connectionStart = GetCurrentTimestamp(); + + return connection; +} + + +/* + * Close all remote connections if necessary anymore (i.e. not session + * lifetime), or if in a failed state. + */ +static void +AfterXactHostConnectionHandling(ConnectionHashEntry *entry, bool isCommit) +{ + dlist_mutable_iter iter; + + dlist_foreach_modify(iter, entry->connections) + { + MultiConnection *connection = dlist_container(MultiConnection, node, iter.cur); + + /* + * To avoid code leaking connections we warn if connections are + * still claimed exclusively. We can only do so if the transaction + * committed, as it's normal that code didn't have chance to clean + * up after errors. + */ + if (isCommit && connection->claimedExclusively) + { + ereport(WARNING, + (errmsg("connection claimed exclusively at transaction commit"))); + } + + /* + * Only let a connection life longer than a single transaction if + * instructed to do so by the caller. We also skip doing so if + * it's in a state that wouldn't allow us to run queries again. + */ + if (!connection->sessionLifespan || + PQstatus(connection->conn) != CONNECTION_OK || + PQtransactionStatus(connection->conn) != PQTRANS_IDLE) + { + PQfinish(connection->conn); + connection->conn = NULL; + + /* unlink from list */ + dlist_delete(iter.cur); + + pfree(connection); + } + else + { + /* reset per-transaction state */ + connection->activeInTransaction = false; + + UnclaimConnection(connection); + } + } +} diff --git a/src/include/distributed/commit_protocol.h b/src/include/distributed/commit_protocol.h index 140b0ade0..5805dfe9f 100644 --- a/src/include/distributed/commit_protocol.h +++ b/src/include/distributed/commit_protocol.h @@ -14,18 +14,12 @@ #include "access/xact.h" +#include "distributed/connection_management.h" #include "libpq-fe.h" #include "lib/stringinfo.h" #include "nodes/pg_list.h" -/* Enumeration that defines the different commit protocols available */ -typedef enum -{ - COMMIT_PROTOCOL_1PC = 0, - COMMIT_PROTOCOL_2PC = 1 -} CommitProtocolType; - /* Enumeration that defines different remote transaction states */ typedef enum { @@ -51,10 +45,6 @@ typedef struct TransactionConnection } TransactionConnection; -/* config variable managed via guc.c */ -extern int MultiShardCommitProtocol; - - /* Functions declarations for transaction and connection management */ extern void InitializeDistributedTransaction(void); extern void PrepareRemoteTransactions(List *connectionList); diff --git a/src/include/distributed/connection_cache.h b/src/include/distributed/connection_cache.h index 7623cafb2..3d9fa1260 100644 --- a/src/include/distributed/connection_cache.h +++ b/src/include/distributed/connection_cache.h @@ -52,26 +52,11 @@ typedef struct NodeConnectionEntry } NodeConnectionEntry; -/* describes what kind of modifications have occurred in the current transaction */ -typedef enum -{ - XACT_MODIFICATION_INVALID = 0, /* placeholder initial value */ - XACT_MODIFICATION_NONE, /* no modifications have taken place */ - XACT_MODIFICATION_DATA, /* data modifications (DML) have occurred */ - XACT_MODIFICATION_MULTI_SHARD /* multi-shard modifications have occurred */ -} XactModificationType; - - -/* state needed to prevent new connections during modifying transactions */ -extern XactModificationType XactModificationLevel; - - /* function declarations for obtaining and using a connection */ extern PGconn * GetOrEstablishConnection(char *nodeName, int32 nodePort); extern void PurgeConnection(PGconn *connection); extern void BuildKeyForConnection(PGconn *connection, NodeConnectionKey *connectionKey); extern PGconn * PurgeConnectionByKey(NodeConnectionKey *nodeConnectionKey); -extern bool SqlStateMatchesCategory(char *sqlStateString, int category); extern void WarnRemoteError(PGconn *connection, PGresult *result); extern void ReraiseRemoteError(PGconn *connection, PGresult *result); extern PGconn * ConnectToNode(char *nodeName, int nodePort, char *nodeUser); diff --git a/src/include/distributed/connection_management.h b/src/include/distributed/connection_management.h new file mode 100644 index 000000000..c37f9d516 --- /dev/null +++ b/src/include/distributed/connection_management.h @@ -0,0 +1,140 @@ +/*------------------------------------------------------------------------- + * + * connection_management.h + * Central management of connections and their life-cycle + * + * Copyright (c) 2016, Citus Data, Inc. + * + *------------------------------------------------------------------------- + */ + +#ifndef CONNECTION_MANAGMENT_H +#define CONNECTION_MANAGMENT_H + +#include "distributed/transaction_management.h" +#include "lib/ilist.h" +#include "utils/hsearch.h" +#include "utils/timestamp.h" + +/* maximum (textual) lengths of hostname and port */ +#define MAX_NODE_LENGTH 255 /* includes 0 byte */ + +#define CLIENT_CONNECT_TIMEOUT_SECONDS_INT 5 + +/* forward declare, to avoid forcing large headers on everyone */ +struct pg_conn; /* target of the PGconn typedef */ +struct MemoryContextData; + +/* + * Flags determining connection establishment behaviour. + */ +enum MultiConnectionMode +{ + /* allow establishment of new connections */ + NEW_CONNECTION = 1 << 0, + + /* allow use of pre-established connections */ + CACHED_CONNECTION = 1 << 1, + + /* mark returned connection having session lifespan */ + SESSION_LIFESPAN = 1 << 2, + + /* the connection will be used for DML */ + FOR_DML = 1 << 3, + + /* the connection will be used for DDL */ + FOR_DDL = 1 << 4, + + /* failures on this connection will fail entire coordinated transaction */ + CRITICAL_CONNECTION = 1 << 5 +}; + + +/* declaring this directly above makes uncrustify go crazy */ +typedef enum MultiConnectionMode MultiConnectionMode; + +typedef struct MultiConnection +{ + /* connection details, useful for error messages and such. */ + char hostname[MAX_NODE_LENGTH]; + int32 port; + char user[NAMEDATALEN]; + char database[NAMEDATALEN]; + + /* underlying libpq connection */ + struct pg_conn *conn; + + /* is the connection intended to be kept after transaction end */ + bool sessionLifespan; + + /* is the connection currently in use, and shouldn't be used by anything else */ + bool claimedExclusively; + + /* has the connection been used in the current coordinated transaction? */ + bool activeInTransaction; + + /* time connection establishment was started, for timeout */ + TimestampTz connectionStart; + + dlist_node node; +} MultiConnection; + + +/* + * Central connection management hash, mapping (host, port, user, database) to + * a list of connections. + * + * This hash is used to keep track of which connections are open to which + * node. Besides allowing connection reuse, that information is e.g. used to + * handle closing connections after the end of a transaction. + */ + +/* hash key */ +typedef struct ConnectionHashKey +{ + char hostname[MAX_NODE_LENGTH]; + int32 port; + char user[NAMEDATALEN]; + char database[NAMEDATALEN]; +} ConnectionHashKey; + +/* hash entry */ +typedef struct ConnectionHashEntry +{ + ConnectionHashKey key; + dlist_head *connections; +} ConnectionHashEntry; + +/* the hash table */ +extern HTAB *ConnectionHash; + +/* context for all connection and transaction related memory */ +extern struct MemoryContextData *ConnectionContext; + + +extern void AfterXactConnectionHandling(bool isCommit); +extern void InitializeConnectionManagement(void); + + +/* Low-level connection establishment APIs */ +extern MultiConnection * GetNodeConnection(uint32 flags, const char *hostname, + int32 port); +extern MultiConnection * StartNodeConnection(uint32 flags, const char *hostname, + int32 port); +extern MultiConnection * GetNodeUserDatabaseConnection(uint32 flags, const char *hostname, + int32 port, const char *user, const + char *database); +extern MultiConnection * StartNodeUserDatabaseConnection(uint32 flags, + const char *hostname, + int32 port, + const char *user, + const char *database); +extern void CloseConnection(MultiConnection *connection); + +/* dealing with a connection */ +extern void FinishConnectionEstablishment(MultiConnection *connection); +extern void ClaimConnectionExclusively(MultiConnection *connection); +extern void UnclaimConnection(MultiConnection *connection); + + +#endif /* CONNECTION_MANAGMENT_H */ diff --git a/src/include/distributed/transaction_management.h b/src/include/distributed/transaction_management.h new file mode 100644 index 000000000..8c154ac05 --- /dev/null +++ b/src/include/distributed/transaction_management.h @@ -0,0 +1,59 @@ +/*------------------------------------------------------------------------- + * transaction_management.h + * + * Copyright (c) 2016, Citus Data, Inc. + * + *------------------------------------------------------------------------- + */ + +#ifndef TRANSACTION_MANAGMENT_H +#define TRANSACTION_MANAGMENT_H + +/* describes what kind of modifications have occurred in the current transaction */ +typedef enum +{ + XACT_MODIFICATION_INVALID = 0, /* placeholder initial value */ + XACT_MODIFICATION_NONE, /* no modifications have taken place */ + XACT_MODIFICATION_DATA, /* data modifications (DML) have occurred */ + XACT_MODIFICATION_MULTI_SHARD /* multi-shard modifications have occurred */ +} XactModificationType; + + +/* + * Enum defining the state of a coordinated (i.e. a transaction potentially + * spanning several nodes). + */ +typedef enum CoordinatedTransactionState +{ + /* no coordinated transaction in progress, no connections established */ + COORD_TRANS_NONE, + + /* no coordinated transaction in progress, but connections established */ + COORD_TRANS_IDLE +} CoordinatedTransactionState; + + +/* Enumeration that defines the different commit protocols available */ +typedef enum +{ + COMMIT_PROTOCOL_1PC = 0, + COMMIT_PROTOCOL_2PC = 1 +} CommitProtocolType; + +/* config variable managed via guc.c */ +extern int MultiShardCommitProtocol; + +/* state needed to prevent new connections during modifying transactions */ +extern XactModificationType XactModificationLevel; + + +extern CoordinatedTransactionState CurrentCoordinatedTransactionState; + + +/* + * Initialization. + */ +extern void InitializeTransactionManagement(void); + + +#endif /* TRANSACTION_MANAGMENT_H */