diff --git a/src/backend/distributed/executor/multi_client_executor.c b/src/backend/distributed/executor/multi_client_executor.c index da852d5c1..50b25a6cf 100644 --- a/src/backend/distributed/executor/multi_client_executor.c +++ b/src/backend/distributed/executor/multi_client_executor.c @@ -20,6 +20,7 @@ #include "commands/dbcommands.h" #include "distributed/metadata_cache.h" #include "distributed/connection_cache.h" +#include "distributed/connection_management.h" #include "distributed/multi_client_executor.h" #include "distributed/multi_server_executor.h" diff --git a/src/backend/distributed/executor/multi_router_executor.c b/src/backend/distributed/executor/multi_router_executor.c index 3056bd9f9..caf4eeb2a 100644 --- a/src/backend/distributed/executor/multi_router_executor.c +++ b/src/backend/distributed/executor/multi_router_executor.c @@ -29,6 +29,7 @@ #include "distributed/citus_clauses.h" #include "distributed/citus_ruleutils.h" #include "distributed/connection_cache.h" +#include "distributed/connection_management.h" #include "distributed/master_metadata_utility.h" #include "distributed/metadata_cache.h" #include "distributed/multi_executor.h" @@ -36,6 +37,7 @@ #include "distributed/multi_planner.h" #include "distributed/multi_router_executor.h" #include "distributed/relay_utility.h" +#include "distributed/remote_commands.h" #include "distributed/resource_lock.h" #include "executor/execdesc.h" #include "executor/executor.h" @@ -1272,7 +1274,6 @@ RouterTransactionCallback(XactEvent event, void *arg) } /* reset transaction state */ - XactModificationLevel = XACT_MODIFICATION_NONE; xactParticipantHash = NULL; xactShardConnSetList = NIL; subXactAbortAttempted = false; diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index 549d20958..f2e75291a 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -19,6 +19,7 @@ #include "commands/explain.h" 
#include "executor/executor.h" #include "distributed/commit_protocol.h" +#include "distributed/connection_management.h" #include "distributed/master_protocol.h" #include "distributed/multi_copy.h" #include "distributed/multi_executor.h" @@ -31,7 +32,9 @@ #include "distributed/multi_server_executor.h" #include "distributed/multi_shard_transaction.h" #include "distributed/multi_utility.h" +#include "distributed/remote_commands.h" #include "distributed/task_tracker.h" +#include "distributed/transaction_management.h" #include "distributed/worker_manager.h" #include "distributed/worker_protocol.h" #include "postmaster/postmaster.h" @@ -150,6 +153,10 @@ _PG_init(void) /* organize that task tracker is started once server is up */ TaskTrackerRegister(); + /* initialize coordinated transaction management */ + InitializeTransactionManagement(); + InitializeConnectionManagement(); + /* initialize transaction callbacks */ RegisterRouterExecutorXactCallbacks(); RegisterShardPlacementXactCallbacks(); diff --git a/src/backend/distributed/transaction/commit_protocol.c b/src/backend/distributed/transaction/commit_protocol.c index 728f672f4..329512749 100644 --- a/src/backend/distributed/transaction/commit_protocol.c +++ b/src/backend/distributed/transaction/commit_protocol.c @@ -30,10 +30,6 @@ static uint32 DistributedTransactionId = 0; static StringInfo BuildTransactionName(int connectionId); -/* the commit protocol to use for COPY commands */ -int MultiShardCommitProtocol = COMMIT_PROTOCOL_1PC; - - /* * InitializeDistributedTransaction prepares the distributed transaction ID * used in transaction names. 
diff --git a/src/backend/distributed/transaction/multi_shard_transaction.c b/src/backend/distributed/transaction/multi_shard_transaction.c index 712beaa5e..cfbb44ec3 100644 --- a/src/backend/distributed/transaction/multi_shard_transaction.c +++ b/src/backend/distributed/transaction/multi_shard_transaction.c @@ -307,7 +307,6 @@ CompleteShardPlacementTransactions(XactEvent event, void *arg) CloseConnections(connectionList); shardConnectionHash = NULL; - XactModificationLevel = XACT_MODIFICATION_NONE; subXactAbortAttempted = false; } diff --git a/src/backend/distributed/transaction/transaction_management.c b/src/backend/distributed/transaction/transaction_management.c new file mode 100644 index 000000000..e03f1476b --- /dev/null +++ b/src/backend/distributed/transaction/transaction_management.c @@ -0,0 +1,142 @@ +/*------------------------------------------------------------------------- + * + * transaction_management.c + * + * Transaction management for Citus. Most of the work is delegated to other + * subsystems; this file, and especially CoordinatedTransactionCallback, + * coordinates the work between them. + * + * Copyright (c) 2016, Citus Data, Inc. 
+ * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "libpq-fe.h" + +#include "miscadmin.h" + +#include "access/xact.h" +#include "distributed/connection_management.h" +#include "distributed/hash_helpers.h" +#include "distributed/transaction_management.h" +#include "utils/hsearch.h" + + +CoordinatedTransactionState CurrentCoordinatedTransactionState = COORD_TRANS_NONE; + +/* GUC, the commit protocol to use for commands affecting more than one connection */ +int MultiShardCommitProtocol = COMMIT_PROTOCOL_1PC; + +/* state needed to keep track of operations used during a transaction */ +XactModificationType XactModificationLevel = XACT_MODIFICATION_NONE; + + +/* remembers that a subtransaction abort was observed in the current transaction */ +static bool subXactAbortAttempted = false; + + +/* transaction management functions */ +static void CoordinatedTransactionCallback(XactEvent event, void *arg); +static void CoordinatedSubTransactionCallback(SubXactEvent event, SubTransactionId subId, + SubTransactionId parentSubid, void *arg); + + +/* + * InitializeTransactionManagement registers the transaction and + * subtransaction callbacks defined in this file with PostgreSQL. + */ +void +InitializeTransactionManagement(void) +{ + /* hook into transaction machinery */ + RegisterXactCallback(CoordinatedTransactionCallback, NULL); + RegisterSubXactCallback(CoordinatedSubTransactionCallback, NULL); +} + + +/* + * Transaction management callback, handling coordinated transaction, and + * transaction independent connection management. + * + * NB: There should only ever be a single transaction callback in citus, the + * ordering between the callbacks and the actions within those callbacks + * otherwise becomes too nondeterministic / hard to reason about. + */ +static void +CoordinatedTransactionCallback(XactEvent event, void *arg) +{ + switch (event) + { + case XACT_EVENT_COMMIT: + { + /* close connections etc. 
*/ + if (CurrentCoordinatedTransactionState != COORD_TRANS_NONE) + { + AtEOXact_Connections(true); + } + + Assert(!subXactAbortAttempted); + CurrentCoordinatedTransactionState = COORD_TRANS_NONE; + XactModificationLevel = XACT_MODIFICATION_NONE; + } + break; + + case XACT_EVENT_ABORT: + { + /* close connections etc. */ + if (CurrentCoordinatedTransactionState != COORD_TRANS_NONE) + { + AtEOXact_Connections(false); + } + + CurrentCoordinatedTransactionState = COORD_TRANS_NONE; + XactModificationLevel = XACT_MODIFICATION_NONE; + subXactAbortAttempted = false; + } + break; + + case XACT_EVENT_PARALLEL_COMMIT: + case XACT_EVENT_PARALLEL_ABORT: + case XACT_EVENT_PREPARE: + { } + break; + + case XACT_EVENT_PRE_COMMIT: + { + if (subXactAbortAttempted) + { + subXactAbortAttempted = false; + + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot ROLLBACK TO SAVEPOINT in transactions " + "which modify distributed tables"))); + } + } + break; + + case XACT_EVENT_PARALLEL_PRE_COMMIT: + case XACT_EVENT_PRE_PREPARE: + { + /* + * FIXME: do we want to support this? Or error out? Might be + * annoying to error out as it could prevent experimentation. If + * we error out, we should only do so if a coordinated transaction + * has been started, so independent 2PC usage doesn't cause + * errors. + */ + } + break; + } +} + + +/* + * Subtransaction callback - currently only used to remember whether a + * savepoint has been rolled back, as we don't support that. 
+ */ +static void +CoordinatedSubTransactionCallback(SubXactEvent event, SubTransactionId subId, + SubTransactionId parentSubid, void *arg) +{ + if (event == SUBXACT_EVENT_ABORT_SUB) + { + subXactAbortAttempted = true; + } +} diff --git a/src/backend/distributed/utils/connection_cache.c b/src/backend/distributed/utils/connection_cache.c index 8f852fdd4..efdaf46c9 100644 --- a/src/backend/distributed/utils/connection_cache.c +++ b/src/backend/distributed/utils/connection_cache.c @@ -29,10 +29,6 @@ #include "utils/memutils.h" #include "utils/palloc.h" - -/* state needed to keep track of operations used during a transaction */ -XactModificationType XactModificationLevel = XACT_MODIFICATION_NONE; - /* * NodeConnectionHash is the connection hash itself. It begins uninitialized. * The first call to GetOrEstablishConnection triggers hash creation. @@ -209,37 +205,6 @@ PurgeConnectionByKey(NodeConnectionKey *nodeConnectionKey) } -/* - * SqlStateMatchesCategory returns true if the given sql state (which may be - * NULL if unknown) is in the given error category. Note that we use - * ERRCODE_TO_CATEGORY macro to determine error category of the sql state and - * expect the caller to use the same macro for the error category. 
- */ -bool -SqlStateMatchesCategory(char *sqlStateString, int category) -{ - bool sqlStateMatchesCategory = false; - int sqlState = 0; - int sqlStateCategory = 0; - - if (sqlStateString == NULL) - { - return false; - } - - sqlState = MAKE_SQLSTATE(sqlStateString[0], sqlStateString[1], sqlStateString[2], - sqlStateString[3], sqlStateString[4]); - - sqlStateCategory = ERRCODE_TO_CATEGORY(sqlState); - if (sqlStateCategory == category) - { - sqlStateMatchesCategory = true; - } - - return sqlStateMatchesCategory; -} - - /* * WarnRemoteError retrieves error fields from a remote result and produces an * error report at the WARNING level after amending the error with a CONTEXT diff --git a/src/backend/distributed/utils/connection_management.c b/src/backend/distributed/utils/connection_management.c new file mode 100644 index 000000000..462cc5061 --- /dev/null +++ b/src/backend/distributed/utils/connection_management.c @@ -0,0 +1,541 @@ +/*------------------------------------------------------------------------- + * + * connection_management.c + * Central management of connections and their life-cycle + * + * Copyright (c) 2016, Citus Data, Inc. 
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#ifdef HAVE_POLL_H
+#include <poll.h>
+#endif
+
+#include "libpq-fe.h"
+
+#include "miscadmin.h"
+
+#include "access/hash.h"
+#include "commands/dbcommands.h"
+#include "distributed/connection_management.h"
+#include "distributed/metadata_cache.h"
+#include "distributed/hash_helpers.h"
+#include "mb/pg_wchar.h"
+#include "utils/hsearch.h"
+#include "utils/memutils.h"
+
+
+/* (host,port,user,database) -> list of connections, see connection_management.h */
+HTAB *ConnectionHash = NULL;
+
+/* memory context holding the hash and all MultiConnection structs */
+MemoryContext ConnectionContext = NULL;
+
+static uint32 ConnectionHashHash(const void *key, Size keysize);
+static int ConnectionHashCompare(const void *a, const void *b, Size keysize);
+static MultiConnection * StartConnectionEstablishment(ConnectionHashKey *key);
+
+
+/*
+ * InitializeConnectionManagement sets up the per-backend connection
+ * management infrastructure: it creates ConnectionContext as a child of
+ * TopMemoryContext and builds ConnectionHash inside of it, using
+ * ConnectionHashHash / ConnectionHashCompare for key handling.
+ */
+void
+InitializeConnectionManagement(void)
+{
+	HASHCTL info;
+	uint32 hashFlags = 0;
+
+	/*
+	 * Create a single context for connection and transaction related memory
+	 * management. Doing so, instead of allocating in TopMemoryContext, makes
+	 * it easier to associate used memory.
+	 */
+	ConnectionContext = AllocSetContextCreate(TopMemoryContext, "Connection Context",
+											  ALLOCSET_DEFAULT_MINSIZE,
+											  ALLOCSET_DEFAULT_INITSIZE,
+											  ALLOCSET_DEFAULT_MAXSIZE);
+
+	/* create (host,port,user,database) -> [connection] hash */
+	memset(&info, 0, sizeof(info));
+	info.keysize = sizeof(ConnectionHashKey);
+	info.entrysize = sizeof(ConnectionHashEntry);
+	info.hash = ConnectionHashHash;
+	info.match = ConnectionHashCompare;
+	info.hcxt = ConnectionContext;
+	hashFlags = (HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT | HASH_COMPARE);
+
+	ConnectionHash = hash_create("citus connection cache (host,port,user,database)",
+								 64, &info, hashFlags);
+}
+
+
+/*
+ * Perform connection management activity after the end of a transaction. Both
+ * COMMIT and ABORT paths are handled here.
+ *
+ * This is called by Citus' global transaction callback.
+ */ +void +AtEOXact_Connections(bool isCommit) +{ + HASH_SEQ_STATUS status; + ConnectionHashEntry *entry; + + /* + * Close all remote connections if not necessary anymore (i.e. not session + * lifetime), or if in a failed state. + */ + hash_seq_init(&status, ConnectionHash); + while ((entry = (ConnectionHashEntry *) hash_seq_search(&status)) != 0) + { + ListCell *previousCell = NULL; + ListCell *nextCell = NULL; + ListCell *connectionCell = NULL; + + /* + * Have to iterate "manually", to be able to delete connections in the + * middle of the list. + */ + for (connectionCell = list_head(entry->connections); + connectionCell != NULL; + connectionCell = nextCell) + { + MultiConnection *connection = (MultiConnection *) lfirst(connectionCell); + + nextCell = lnext(connectionCell); + + /* + * To avoid code leaking connections we warn if connections are + * still claimed exclusively. We can only do so if the transaction + * committed, as it's normal that code didn't have a chance to clean + * up after errors. + */ + if (isCommit && connection->claimedExclusively) + { + ereport(WARNING, + (errmsg("connection claimed exclusively at transaction commit"))); + } + + /* + * Only let a connection live longer than a single transaction if + * instructed to do so by the caller. We also skip doing so if + * it's in a state that wouldn't allow us to run queries again. + */ + if (!connection->sessionLifespan || + PQstatus(connection->conn) != CONNECTION_OK || + PQtransactionStatus(connection->conn) != PQTRANS_IDLE) + { + PQfinish(connection->conn); + connection->conn = NULL; + + entry->connections = + list_delete_cell(entry->connections, connectionCell, previousCell); + + pfree(connection); + } + else + { + /* reset per-transaction state */ + connection->activeInTransaction = false; + + UnclaimConnection(connection); + + previousCell = connectionCell; + } + } + + /* + * NB: We leave the hash entry in place, even if there's no individual + * connections in it anymore. 
There seems no benefit in deleting it, + * and it'll save a bit of work in the next transaction. + */ + } +} + + +/* + * GetNodeConnection() establishes a connection to remote node, using default + * user and database. + * + * See StartNodeUserDatabaseConnection for details. + */ +MultiConnection * +GetNodeConnection(uint32 flags, const char *hostname, int32 port) +{ + return GetNodeUserDatabaseConnection(flags, hostname, port, NULL, NULL); +} + + +/* + * StartNodeConnection initiate a connection to remote node, using default + * user and database. + * + * See StartNodeUserDatabaseConnection for details. + */ +MultiConnection * +StartNodeConnection(uint32 flags, const char *hostname, int32 port) +{ + return StartNodeUserDatabaseConnection(flags, hostname, port, NULL, NULL); +} + + +/* + * GetNodeUserDatabaseConnection establishes connection to remote node. + * + * See StartNodeUserDatabaseConnection for details. + */ +MultiConnection * +GetNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port, const + char *user, const char *database) +{ + MultiConnection *connection; + + connection = StartNodeUserDatabaseConnection(flags, hostname, port, user, database); + + FinishConnectionEstablishment(connection); + + return connection; +} + + +/* + * StartNodeUserDatabaseConnection() initiates a connection to a remote node. + * + * If user or database are NULL, the current session's defaults are used. 
The + * following flags influence connection establishment behaviour: + * - NEW_CONNECTION - it is permitted to establish a new connection + * - CACHED_CONNECTION - it is permitted to re-use an established connection + * - SESSION_LIFESPAN - the connection should persist after transaction end + * - FOR_DML - only meaningful for placement associated connections + * - FOR_DDL - only meaningful for placement associated connections + * - CRITICAL_CONNECTION - transaction failures on this connection fail the entire + * coordinated transaction + * + * The returned connection has only been initiated, not fully + * established. That's useful to allow parallel connection establishment. If + * that's not desired use the Get* variant. + */ +MultiConnection * +StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port, const + char *user, const char *database) +{ + ConnectionHashKey key; + ConnectionHashEntry *entry = NULL; + MultiConnection *connection; + MemoryContext oldContext; + bool found; + + strlcpy(key.hostname, hostname, MAX_NODE_LENGTH); + key.port = port; + if (user) + { + strlcpy(key.user, user, NAMEDATALEN); + } + else + { + strlcpy(key.user, CurrentUserName(), NAMEDATALEN); + } + if (database) + { + strlcpy(key.database, database, NAMEDATALEN); + } + else + { + strlcpy(key.database, get_database_name(MyDatabaseId), NAMEDATALEN); + } + + if (CurrentCoordinatedTransactionState == COORD_TRANS_NONE) + { + CurrentCoordinatedTransactionState = COORD_TRANS_IDLE; + } + + /* + * Lookup relevant hash entry. We always enter. If only a cached + * connection is desired, and there's none, we'll simply leave the + * connection list empty. 
+	 */
+
+	entry = hash_search(ConnectionHash, &key, HASH_ENTER, &found);
+	if (!found)
+	{
+		entry->connections = NIL;
+	}
+
+	if (flags & CACHED_CONNECTION)
+	{
+		ListCell *connectionCell = NULL;
+
+		/* check connection cache for a connection that's not already in use */
+		foreach(connectionCell, entry->connections)
+		{
+			connection = (MultiConnection *) lfirst(connectionCell);
+
+			/* don't return claimed connections */
+			if (!connection->claimedExclusively)
+			{
+				/*
+				 * Check whether we're right now allowed to open new
+				 * connections. A cached connection counts as new if it hasn't
+				 * been used in this transaction.
+				 *
+				 * This has to be tested before activeInTransaction is set
+				 * below, otherwise the condition can never be true. It is
+				 * also done before touching sessionLifespan, so that a
+				 * rejected request leaves the cached connection unmodified.
+				 *
+				 * FIXME: This should be removed soon, once all connections go
+				 * through this API.
+				 */
+				if (!connection->activeInTransaction &&
+					XactModificationLevel > XACT_MODIFICATION_DATA)
+				{
+					ereport(ERROR, (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
+									errmsg("cannot open new connections after the first "
+										   "modification command within a transaction")));
+				}
+
+				if (flags & SESSION_LIFESPAN)
+				{
+					connection->sessionLifespan = true;
+				}
+				connection->activeInTransaction = true;
+
+				return connection;
+			}
+
+			/*
+			 * One could argue for erroring out when the connection is in a
+			 * failed state. But that'd be a bad idea for two reasons:
+			 *
+			 * 1) Generally starting a connection might fail, after calling
+			 *    this function, so calling code needs to handle that anyway.
+			 * 2) This might be used in code that transparently handles
+			 *    connection failure.
+			 */
+		}
+
+		/* no connection available, done if a new connection isn't desirable */
+		if (!(flags & NEW_CONNECTION))
+		{
+			return NULL;
+		}
+	}
+
+	/*
+	 * Check whether we're right now allowed to open new connections.
+	 *
+	 * FIXME: This should be removed soon, once all connections go through
+	 * this API.
+ */ + if (XactModificationLevel > XACT_MODIFICATION_DATA) + { + ereport(ERROR, (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION), + errmsg("cannot open new connections after the first modification " + "command within a transaction"))); + } + + /* + * Either no caching desired, or no pre-established, non-claimed, + * connection present. Initiate connection establishment. + */ + connection = StartConnectionEstablishment(&key); + + oldContext = MemoryContextSwitchTo(ConnectionContext); + entry->connections = lappend(entry->connections, connection); + MemoryContextSwitchTo(oldContext); + + if (flags & SESSION_LIFESPAN) + { + connection->sessionLifespan = true; + } + + connection->activeInTransaction = true; + + return connection; +} + + +/* + * Synchronously finish connection establishment of an individual connection. + * + * TODO: Replace with variant waiting for multiple connections. + */ +void +FinishConnectionEstablishment(MultiConnection *connection) +{ + /* + * Loop until connection is established, or failed (possibly just timed + * out). + */ + while (true) + { + ConnStatusType status = PQstatus(connection->conn); + PostgresPollingStatusType pollmode; + + if (status == CONNECTION_OK) + { + return; + } + + /* FIXME: retries? */ + if (status == CONNECTION_BAD) + { + return; + } + + pollmode = PQconnectPoll(connection->conn); + + /* + * FIXME: Do we want to add transparent retry support here? 
+		 */
+		if (pollmode == PGRES_POLLING_FAILED)
+		{
+			return;
+		}
+		else if (pollmode == PGRES_POLLING_OK)
+		{
+			return;
+		}
+		else
+		{
+			Assert(pollmode == PGRES_POLLING_WRITING ||
+				   pollmode == PGRES_POLLING_READING);
+		}
+
+		/* Loop, to handle poll() being interrupted by signals (EINTR) */
+		while (true)
+		{
+			struct pollfd pollFileDescriptor;
+			int pollResult = 0;
+
+			pollFileDescriptor.fd = PQsocket(connection->conn);
+			if (pollmode == PGRES_POLLING_READING)
+			{
+				pollFileDescriptor.events = POLLIN;
+			}
+			else
+			{
+				pollFileDescriptor.events = POLLOUT;
+			}
+			pollFileDescriptor.revents = 0;
+
+			/* poll() takes its timeout in milliseconds, the constant is in seconds */
+			pollResult = poll(&pollFileDescriptor, 1,
+							  CLIENT_CONNECT_TIMEOUT_SECONDS_INT * 1000);
+
+			if (pollResult == 0)
+			{
+				/*
+				 * Timeout exceeded. Treat this like any other establishment
+				 * failure: return with the connection not in CONNECTION_OK
+				 * state, and leave it to the caller to inspect PQstatus().
+				 */
+				return;
+			}
+			else if (pollResult > 0)
+			{
+				/* IO possible, continue connection establishment */
+				break;
+			}
+			else if (errno == EINTR)
+			{
+				/*
+				 * Interrupted by a signal, poll again. NB: this restarts the
+				 * wait with the full timeout.
+				 */
+			}
+			else
+			{
+				/*
+				 * We ERROR here, instead of just returning a failed
+				 * connection, because this shouldn't happen, and indicates a
+				 * programming error somewhere, not a network etc. issue.
+				 */
+				ereport(ERROR, (errcode_for_socket_access(),
+								errmsg("poll() failed: %m")));
+			}
+		}
+	}
+}
+
+
+/*
+ * ClaimConnectionExclusively signals that this connection is actively being
+ * used. That means it will not be returned again by
+ * StartNodeUserDatabaseConnection() et al. until it is released with
+ * UnclaimConnection().
+ */
+void
+ClaimConnectionExclusively(MultiConnection *connection)
+{
+	Assert(!connection->claimedExclusively);
+	connection->claimedExclusively = true;
+}
+
+
+/*
+ * UnclaimConnection signals that this connection is not being used
+ * anymore. That means it may again be returned by
+ * StartNodeUserDatabaseConnection() et al.
+ */
+void
+UnclaimConnection(MultiConnection *connection)
+{
+	connection->claimedExclusively = false;
+}
+
+
+/*
+ * ConnectionHashHash computes the hash value of a ConnectionHashKey by
+ * combining the hashes of its hostname, port, user and database fields.
+ *
+ * Each string is hashed with the size of its own field; hostname is
+ * MAX_NODE_LENGTH bytes, not NAMEDATALEN.
+ */
+static uint32
+ConnectionHashHash(const void *key, Size keysize)
+{
+	ConnectionHashKey *entry = (ConnectionHashKey *) key;
+	uint32 hash = 0;
+
+	hash = string_hash(entry->hostname, MAX_NODE_LENGTH);
+	hash = hash_combine(hash, hash_uint32(entry->port));
+	hash = hash_combine(hash, string_hash(entry->user, NAMEDATALEN));
+	hash = hash_combine(hash, string_hash(entry->database, NAMEDATALEN));
+
+	return hash;
+}
+
+
+/*
+ * ConnectionHashCompare is the match function of ConnectionHash. It returns
+ * 0 if both keys refer to the same (host, port, user, database) and 1
+ * otherwise.
+ *
+ * The hostname has to be compared over its full MAX_NODE_LENGTH; limiting
+ * the comparison to NAMEDATALEN would treat different long hostnames that
+ * share a prefix as the same node.
+ */
+static int
+ConnectionHashCompare(const void *a, const void *b, Size keysize)
+{
+	ConnectionHashKey *ca = (ConnectionHashKey *) a;
+	ConnectionHashKey *cb = (ConnectionHashKey *) b;
+
+	if (strncmp(ca->hostname, cb->hostname, MAX_NODE_LENGTH) != 0 ||
+		ca->port != cb->port ||
+		strncmp(ca->user, cb->user, NAMEDATALEN) != 0 ||
+		strncmp(ca->database, cb->database, NAMEDATALEN) != 0)
+	{
+		return 1;
+	}
+	else
+	{
+		return 0;
+	}
+}
+
+
+/*
+ * Asynchronously establish connection to a remote node, but don't wait for
+ * that to finish. DNS lookups etc. are performed synchronously though.
+ */ +static MultiConnection * +StartConnectionEstablishment(ConnectionHashKey *key) +{ + char nodePortString[12]; + const char *clientEncoding = GetDatabaseEncodingName(); + MultiConnection *connection = NULL; + + const char *keywords[] = { + "host", "port", "dbname", "user", + "client_encoding", "fallback_application_name", + NULL + }; + const char *values[] = { + key->hostname, nodePortString, key->database, key->user, + clientEncoding, "citus", NULL + }; + + connection = MemoryContextAllocZero(ConnectionContext, sizeof(MultiConnection)); + sprintf(nodePortString, "%d", key->port); + + strlcpy(connection->hostname, key->hostname, MAX_NODE_LENGTH); + connection->port = key->port; + strlcpy(connection->database, key->database, NAMEDATALEN); + strlcpy(connection->user, key->user, NAMEDATALEN); + + connection->conn = PQconnectStartParams(keywords, values, false); + + return connection; +} diff --git a/src/include/distributed/commit_protocol.h b/src/include/distributed/commit_protocol.h index df590bd48..43fbb6328 100644 --- a/src/include/distributed/commit_protocol.h +++ b/src/include/distributed/commit_protocol.h @@ -14,18 +14,12 @@ #include "access/xact.h" +#include "distributed/connection_management.h" #include "libpq-fe.h" #include "lib/stringinfo.h" #include "nodes/pg_list.h" -/* Enumeration that defines the different commit protocols available */ -typedef enum -{ - COMMIT_PROTOCOL_1PC = 0, - COMMIT_PROTOCOL_2PC = 1 -} CommitProtocolType; - /* Enumeration that defines different remote transaction states */ typedef enum { @@ -48,10 +42,6 @@ typedef struct TransactionConnection } TransactionConnection; -/* config variable managed via guc.c */ -extern int MultiShardCommitProtocol; - - /* Functions declarations for transaction and connection management */ extern void InitializeDistributedTransaction(void); extern void PrepareRemoteTransactions(List *connectionList); diff --git a/src/include/distributed/connection_cache.h b/src/include/distributed/connection_cache.h 
index 54fb97a22..3d9fa1260 100644 --- a/src/include/distributed/connection_cache.h +++ b/src/include/distributed/connection_cache.h @@ -52,26 +52,11 @@ typedef struct NodeConnectionEntry } NodeConnectionEntry; -/* describes what kind of modifications have occurred in the current transaction */ -typedef enum -{ - XACT_MODIFICATION_INVALID = 0, /* placeholder initial value */ - XACT_MODIFICATION_NONE, /* no modifications have taken place */ - XACT_MODIFICATION_DATA, /* data modifications (DML) have occurred */ - XACT_MODIFICATION_SCHEMA /* schema modifications (DDL) have occurred */ -} XactModificationType; - - -/* state needed to prevent new connections during modifying transactions */ -extern XactModificationType XactModificationLevel; - - /* function declarations for obtaining and using a connection */ extern PGconn * GetOrEstablishConnection(char *nodeName, int32 nodePort); extern void PurgeConnection(PGconn *connection); extern void BuildKeyForConnection(PGconn *connection, NodeConnectionKey *connectionKey); extern PGconn * PurgeConnectionByKey(NodeConnectionKey *nodeConnectionKey); -extern bool SqlStateMatchesCategory(char *sqlStateString, int category); extern void WarnRemoteError(PGconn *connection, PGresult *result); extern void ReraiseRemoteError(PGconn *connection, PGresult *result); extern PGconn * ConnectToNode(char *nodeName, int nodePort, char *nodeUser); diff --git a/src/include/distributed/connection_management.h b/src/include/distributed/connection_management.h new file mode 100644 index 000000000..cc22b7794 --- /dev/null +++ b/src/include/distributed/connection_management.h @@ -0,0 +1,133 @@ +/*------------------------------------------------------------------------- + * + * connection_management.h + * Central management of connections and their life-cycle + * + * Copyright (c) 2016, Citus Data, Inc. 
+ * + *------------------------------------------------------------------------- + */ + +#ifndef CONNECTION_MANAGMENT_H +#define CONNECTION_MANAGMENT_H + +#include "distributed/transaction_management.h" +#include "nodes/pg_list.h" +#include "utils/hsearch.h" + +/* maximum (textual) lengths of hostname and port */ +#define MAX_NODE_LENGTH 255 /* includes 0 byte */ + +#define CLIENT_CONNECT_TIMEOUT_SECONDS_INT 5 + +/* forward declare, to avoid forcing large headers on everyone */ +struct pg_conn; /* target of the PGconn typedef */ +struct MemoryContextData; + +/* + * Flags determining connection establishment behaviour. + */ +enum MultiConnectionMode +{ + /* allow establishment of new connections */ + NEW_CONNECTION = 1 << 0, + + /* allow use of pre-established connections */ + CACHED_CONNECTION = 1 << 1, + + /* mark returned connection having session lifespan */ + SESSION_LIFESPAN = 1 << 2, + + /* the connection will be used for DML */ + FOR_DML = 1 << 3, + + /* the connection will be used for DDL */ + FOR_DDL = 1 << 4, + + /* failures on this connection will fail entire coordinated transaction */ + CRITICAL_CONNECTION = 1 << 5 +}; + + +/* declaring this directly above makes uncrustify go crazy */ +typedef enum MultiConnectionMode MultiConnectionMode; + +typedef struct MultiConnection +{ + /* connection details, useful for error messages and such. */ + char hostname[MAX_NODE_LENGTH]; + int32 port; + char user[NAMEDATALEN]; + char database[NAMEDATALEN]; + + /* underlying libpq connection */ + struct pg_conn *conn; + + /* is the connection intended to be kept after transaction end */ + bool sessionLifespan; + + /* is the connection currently in use, and shouldn't be used by anything else */ + bool claimedExclusively; + + /* has the connection been used in the current coordinated transaction? */ + bool activeInTransaction; +} MultiConnection; + + +/* + * Central connection management hash, mapping (host, port, user, database) to + * a list of connections. 
+ * + * This hash is used to keep track of which connections are open to which + * node. Besides allowing connection reuse, that information is e.g. used to + * handle closing connections after the end of a transaction. + */ + +/* hash key */ +typedef struct ConnectionHashKey +{ + char hostname[MAX_NODE_LENGTH]; + int32 port; + char user[NAMEDATALEN]; + char database[NAMEDATALEN]; +} ConnectionHashKey; + +/* hash entry */ +typedef struct ConnectionHashEntry +{ + ConnectionHashKey key; + List *connections; +} ConnectionHashEntry; + +/* the hash table */ +extern HTAB *ConnectionHash; + +/* context for all connection and transaction related memory */ +extern struct MemoryContextData *ConnectionContext; + + +extern void AtEOXact_Connections(bool isCommit); +extern void InitializeConnectionManagement(void); + + +/* Low-level connection establishment APIs */ +extern MultiConnection * GetNodeConnection(uint32 flags, const char *hostname, + int32 port); +extern MultiConnection * StartNodeConnection(uint32 flags, const char *hostname, + int32 port); +extern MultiConnection * GetNodeUserDatabaseConnection(uint32 flags, const char *hostname, + int32 port, const char *user, const + char *database); +extern MultiConnection * StartNodeUserDatabaseConnection(uint32 flags, + const char *hostname, + int32 port, + const char *user, + const char *database); + +/* dealing with a connection */ +extern void FinishConnectionEstablishment(MultiConnection *connection); +extern void ClaimConnectionExclusively(MultiConnection *connection); +extern void UnclaimConnection(MultiConnection *connection); + + +#endif /* CONNECTION_MANAGMENT_H */ diff --git a/src/include/distributed/transaction_management.h b/src/include/distributed/transaction_management.h new file mode 100644 index 000000000..77f4cfa37 --- /dev/null +++ b/src/include/distributed/transaction_management.h @@ -0,0 +1,59 @@ +/*------------------------------------------------------------------------- + * transaction_management.h + * + 
* Copyright (c) 2016, Citus Data, Inc. + * + *------------------------------------------------------------------------- + */ + +#ifndef TRANSACTION_MANAGMENT_H +#define TRANSACTION_MANAGMENT_H + +/* describes what kind of modifications have occurred in the current transaction */ +typedef enum +{ + XACT_MODIFICATION_INVALID = 0, /* placeholder initial value */ + XACT_MODIFICATION_NONE, /* no modifications have taken place */ + XACT_MODIFICATION_DATA, /* data modifications (DML) have occurred */ + XACT_MODIFICATION_SCHEMA /* schema modifications (DDL) have occurred */ +} XactModificationType; + + +/* + * Enum defining the state of a coordinated (i.e. a transaction potentially + * spanning several nodes). + */ +typedef enum CoordinatedTransactionState +{ + /* no coordinated transaction in progress, no connections established */ + COORD_TRANS_NONE, + + /* no coordinated transaction in progress, but connections established */ + COORD_TRANS_IDLE +} CoordinatedTransactionState; + + +/* Enumeration that defines the different commit protocols available */ +typedef enum +{ + COMMIT_PROTOCOL_1PC = 0, + COMMIT_PROTOCOL_2PC = 1 +} CommitProtocolType; + +/* config variable managed via guc.c */ +extern int MultiShardCommitProtocol; + +/* state needed to prevent new connections during modifying transactions */ +extern XactModificationType XactModificationLevel; + + +extern CoordinatedTransactionState CurrentCoordinatedTransactionState; + + +/* + * Initialization. + */ +extern void InitializeTransactionManagement(void); + + +#endif /* TRANSACTION_MANAGMENT_H */