From 883af02b54503e3f65836b991c91b71cea35cbfa Mon Sep 17 00:00:00 2001 From: Andres Freund Date: Fri, 7 Oct 2016 17:35:20 -0700 Subject: [PATCH 1/5] Add some basic helpers to make use of dynahash hashtables easier. --- src/backend/distributed/utils/hash_helpers.c | 34 ++++++++++++++++++++ src/include/distributed/hash_helpers.h | 30 +++++++++++++++++ 2 files changed, 64 insertions(+) create mode 100644 src/backend/distributed/utils/hash_helpers.c create mode 100644 src/include/distributed/hash_helpers.h diff --git a/src/backend/distributed/utils/hash_helpers.c b/src/backend/distributed/utils/hash_helpers.c new file mode 100644 index 000000000..e74616b37 --- /dev/null +++ b/src/backend/distributed/utils/hash_helpers.c @@ -0,0 +1,34 @@ +/*------------------------------------------------------------------------- + * + * hash_helpers.c + * Helpers for dynahash.c style hash tables. + * + * Copyright (c) 2016, Citus Data, Inc. + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "distributed/hash_helpers.h" +#include "utils/hsearch.h" + + +/* + * Empty a hash, without destroying the hash table itself. + */ +void +hash_delete_all(HTAB *htab) +{ + HASH_SEQ_STATUS status; + void *entry = NULL; + + hash_seq_init(&status, htab); + while ((entry = hash_seq_search(&status)) != 0) + { + bool found = false; + + hash_search(htab, entry, HASH_REMOVE, &found); + Assert(found); + } +} diff --git a/src/include/distributed/hash_helpers.h b/src/include/distributed/hash_helpers.h new file mode 100644 index 000000000..0fecf0c03 --- /dev/null +++ b/src/include/distributed/hash_helpers.h @@ -0,0 +1,30 @@ +/*------------------------------------------------------------------------- + * hash_helpers.h + * + * Copyright (c) 2016, Citus Data, Inc. 
+ * + *------------------------------------------------------------------------- + */ + +#ifndef HASH_HELPERS_H +#define HASH_HELPERS_H + +#include "utils/hsearch.h" + +/* + * Combine two hash values, resulting in another hash value, with decent bit + * mixing. + * + * Similar to boost's hash_combine(). + */ +static inline uint32 +hash_combine(uint32 a, uint32 b) +{ + a ^= b + 0x9e3779b9 + (a << 6) + (a >> 2); + return a; +} + + +extern void hash_delete_all(HTAB *htab); + +#endif From 3223b3c92d67b23f18763c9b3b7503f481f71cee Mon Sep 17 00:00:00 2001 From: Andres Freund Date: Fri, 7 Oct 2016 14:48:25 -0700 Subject: [PATCH 2/5] Centralized Connection Lifetime Management. Connections are tracked and released by integrating into postgres' transaction handling. That allows to to use connections without having to resort to having to disable interrupts or using PG_TRY/CATCH blocks to avoid leaking connections. This is intended to eventually replace multi_client_executor.c and connection_cache.c, and to provide the basis of a centralized transaction management. The newly introduced transaction hook should, in the future, be the only one in citus, to allow for proper ordering between operations. For now this central handler is responsible for releasing connections and resetting XactModificationLevel after a transaction. 
--- src/backend/distributed/Makefile | 2 +- .../connection/connection_management.c | 638 ++++++++++++++++++ .../executor/multi_client_executor.c | 1 + .../executor/multi_router_executor.c | 3 +- src/backend/distributed/shared_library_init.c | 7 + .../distributed/transaction/commit_protocol.c | 4 - .../transaction/multi_shard_transaction.c | 1 - .../transaction/transaction_management.c | 144 ++++ .../distributed/utils/connection_cache.c | 4 - src/include/distributed/commit_protocol.h | 12 +- src/include/distributed/connection_cache.h | 14 - .../distributed/connection_management.h | 128 ++++ .../distributed/transaction_management.h | 57 ++ 13 files changed, 979 insertions(+), 36 deletions(-) create mode 100644 src/backend/distributed/connection/connection_management.c create mode 100644 src/backend/distributed/transaction/transaction_management.c create mode 100644 src/include/distributed/connection_management.h create mode 100644 src/include/distributed/transaction_management.h diff --git a/src/backend/distributed/Makefile b/src/backend/distributed/Makefile index c0210b92d..ee5b115ef 100644 --- a/src/backend/distributed/Makefile +++ b/src/backend/distributed/Makefile @@ -16,7 +16,7 @@ DATA = $(patsubst $(citus_abs_srcdir)/%.sql,%.sql,$(wildcard $(citus_abs_srcdir) DATA_built = $(foreach v,$(EXTVERSIONS),$(EXTENSION)--$(v).sql) # directories with source files -SUBDIRS = . commands executor master metadata planner relay test transaction utils worker +SUBDIRS = . 
commands connection executor master metadata planner relay test transaction utils worker # That patsubst rule searches all directories listed in SUBDIRS for .c # files, and adds the corresponding .o files to OBJS diff --git a/src/backend/distributed/connection/connection_management.c b/src/backend/distributed/connection/connection_management.c new file mode 100644 index 000000000..b023fcbec --- /dev/null +++ b/src/backend/distributed/connection/connection_management.c @@ -0,0 +1,638 @@ +/*------------------------------------------------------------------------- + * + * connection_management.c + * Central management of connections and their life-cycle + * + * Copyright (c) 2016, Citus Data, Inc. + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#ifdef HAVE_POLL_H +#include <poll.h> +#endif + +#include "libpq-fe.h" + +#include "miscadmin.h" + +#include "access/hash.h" +#include "commands/dbcommands.h" +#include "distributed/connection_management.h" +#include "distributed/metadata_cache.h" +#include "distributed/hash_helpers.h" +#include "mb/pg_wchar.h" +#include "utils/hsearch.h" +#include "utils/memutils.h" + + +HTAB *ConnectionHash = NULL; +MemoryContext ConnectionContext = NULL; + +static uint32 ConnectionHashHash(const void *key, Size keysize); +static int ConnectionHashCompare(const void *a, const void *b, Size keysize); +static MultiConnection * StartConnectionEstablishment(ConnectionHashKey *key); +static void AfterXactHostConnectionHandling(ConnectionHashEntry *entry, bool isCommit); +static MultiConnection * FindAvailableConnection(dlist_head *connections, uint32 flags); + + +/* + * Initialize per-backend connection management infrastructure. + */ +void +InitializeConnectionManagement(void) +{ + HASHCTL info; + uint32 hashFlags = 0; + + + /* + * Create a single context for connection and transaction related memory + * management. 
Doing so, instead of allocating in TopMemoryContext, makes + * it easier to associate used memory. + */ + ConnectionContext = AllocSetContextCreate(TopMemoryContext, "Connection Context", + ALLOCSET_DEFAULT_MINSIZE, + ALLOCSET_DEFAULT_INITSIZE, + ALLOCSET_DEFAULT_MAXSIZE); + + /* create (host,port,user,database) -> [connection] hash */ + memset(&info, 0, sizeof(info)); + info.keysize = sizeof(ConnectionHashKey); + info.entrysize = sizeof(ConnectionHashEntry); + info.hash = ConnectionHashHash; + info.match = ConnectionHashCompare; + info.hcxt = ConnectionContext; + hashFlags = (HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT | HASH_COMPARE); + + ConnectionHash = hash_create("citus connection cache (host,port,user,database)", + 64, &info, hashFlags); +} + + +/* + * Perform connection management activity after the end of a transaction. Both + * COMMIT and ABORT paths are handled here. + * + * This is called by Citus' global transaction callback. + */ +void +AfterXactConnectionHandling(bool isCommit) +{ + HASH_SEQ_STATUS status; + ConnectionHashEntry *entry; + + hash_seq_init(&status, ConnectionHash); + while ((entry = (ConnectionHashEntry *) hash_seq_search(&status)) != 0) + { + AfterXactHostConnectionHandling(entry, isCommit); + + /* + * NB: We leave the hash entry in place, even if there's no individual + * connections in it anymore. There seems no benefit in deleting it, + * and it'll save a bit of work in the next transaction. + */ + } +} + + +/* + * GetNodeConnection() establishes a connection to remote node, using default + * user and database. + * + * See StartNodeUserDatabaseConnection for details. + */ +MultiConnection * +GetNodeConnection(uint32 flags, const char *hostname, int32 port) +{ + return GetNodeUserDatabaseConnection(flags, hostname, port, NULL, NULL); +} + + +/* + * StartNodeConnection initiates a connection to remote node, using default + * user and database. + * + * See StartNodeUserDatabaseConnection for details. 
+ */ +MultiConnection * +StartNodeConnection(uint32 flags, const char *hostname, int32 port) +{ + return StartNodeUserDatabaseConnection(flags, hostname, port, NULL, NULL); +} + + +/* + * GetNodeUserDatabaseConnection establishes connection to remote node. + * + * See StartNodeUserDatabaseConnection for details. + */ +MultiConnection * +GetNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port, const + char *user, const char *database) +{ + MultiConnection *connection; + + connection = StartNodeUserDatabaseConnection(flags, hostname, port, user, database); + + FinishConnectionEstablishment(connection); + + return connection; +} + + +/* + * StartNodeUserDatabaseConnection() initiates a connection to a remote node. + * + * If user or database are NULL, the current session's defaults are used. The + * following flags influence connection establishment behaviour: + * - SESSION_LIFESPAN - the connection should persist after transaction end + * - FORCE_NEW_CONNECTION - a new connection is required + * + * The returned connection has only been initiated, not fully + * established. That's useful to allow parallel connection establishment. If + * that's not desired use the Get* variant. 
+ */ +MultiConnection * +StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port, const + char *user, const char *database) +{ + ConnectionHashKey key; + ConnectionHashEntry *entry = NULL; + MultiConnection *connection; + MemoryContext oldContext; + bool found; + + /* reject hostnames strlcpy() would truncate; MAX_NODE_LENGTH includes the 0 byte */ + strlcpy(key.hostname, hostname, MAX_NODE_LENGTH); + if (strlen(hostname) >= MAX_NODE_LENGTH) + { + ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("hostname exceeds the maximum length of %d", + MAX_NODE_LENGTH - 1))); + } + + key.port = port; + if (user) + { + strlcpy(key.user, user, NAMEDATALEN); + } + else + { + strlcpy(key.user, CurrentUserName(), NAMEDATALEN); + } + if (database) + { + strlcpy(key.database, database, NAMEDATALEN); + } + else + { + strlcpy(key.database, get_database_name(MyDatabaseId), NAMEDATALEN); + } + + if (CurrentCoordinatedTransactionState == COORD_TRANS_NONE) + { + CurrentCoordinatedTransactionState = COORD_TRANS_IDLE; + } + + /* + * Lookup relevant hash entry. We always enter. If only a cached + * connection is desired, and there's none, we'll simply leave the + * connection list empty. + */ + + entry = hash_search(ConnectionHash, &key, HASH_ENTER, &found); + if (!found) + { + entry->connections = MemoryContextAlloc(ConnectionContext, + sizeof(dlist_head)); + dlist_init(entry->connections); + } + + /* if desired, check whether there's a usable connection */ + if (!(flags & FORCE_NEW_CONNECTION)) + { + /* check connection cache for a connection that's not already in use */ + connection = FindAvailableConnection(entry->connections, flags); + if (connection) + { + if (flags & SESSION_LIFESPAN) + { + connection->sessionLifespan = true; + } + + return connection; + } + } + + /* + * Either no caching desired, or no pre-established, non-claimed, + * connection present. Initiate connection establishment. 
+ */ + connection = StartConnectionEstablishment(&key); + + oldContext = MemoryContextSwitchTo(ConnectionContext); + dlist_push_tail(entry->connections, &connection->connectionNode); + + MemoryContextSwitchTo(oldContext); + + if (flags & SESSION_LIFESPAN) + { + connection->sessionLifespan = true; + } + + return connection; +} + + +/* StartNodeUserDatabaseConnection() helper: returns the first connection in the list that is not claimed exclusively, or NULL; flags is currently not consulted */ +static MultiConnection * +FindAvailableConnection(dlist_head *connections, uint32 flags) +{ + dlist_iter iter; + + dlist_foreach(iter, connections) + { + MultiConnection *connection = + dlist_container(MultiConnection, connectionNode, iter.cur); + + /* don't return claimed connections */ + if (connection->claimedExclusively) + { + continue; + } + + return connection; + } + + return NULL; +} + + +/* + * Return MultiConnection associated with the libpq connection, or NULL if + * no tracked connection wraps pqConn. + * + * Note that this is comparatively expensive, as it walks every entry of + * ConnectionHash and every connection in each entry. Should only be used for + * backward-compatibility purposes. + */ +MultiConnection * +GetConnectionFromPGconn(struct pg_conn *pqConn) +{ + HASH_SEQ_STATUS status; + ConnectionHashEntry *entry; + + hash_seq_init(&status, ConnectionHash); + while ((entry = (ConnectionHashEntry *) hash_seq_search(&status)) != 0) + { + dlist_head *connections = entry->connections; + dlist_iter iter; + + /* scan this entry's connection list for the one wrapping pqConn */ + dlist_foreach(iter, connections) + { + MultiConnection *connection = + dlist_container(MultiConnection, connectionNode, iter.cur); + + if (connection->pgConn == pqConn) + { + hash_seq_term(&status); /* leaving the hash scan before it is exhausted */ + return connection; + } + } + } + + return NULL; +} + + +/* + * Close a previously established connection. 
+ */ +void +CloseConnection(MultiConnection *connection) +{ + ConnectionHashKey key; + bool found; + + /* close connection */ + PQfinish(connection->pgConn); + connection->pgConn = NULL; + + strlcpy(key.hostname, connection->hostname, MAX_NODE_LENGTH); + key.port = connection->port; + strlcpy(key.user, connection->user, NAMEDATALEN); + strlcpy(key.database, connection->database, NAMEDATALEN); + + hash_search(ConnectionHash, &key, HASH_FIND, &found); + + if (found) + { + /* unlink from list */ + dlist_delete(&connection->connectionNode); + + /* we leave the per-host entry alive */ + pfree(connection); + } + else + { + ereport(ERROR, (errmsg("closing untracked connection"))); + } +} + + +/* + * Close a previously established connection. + * + * This function closes the MultiConnection associatated with the libpq + * connection. + * + * Note that this is comparatively expensive. Should only be used for + * backward-compatibility purposes. + */ +void +CloseConnectionByPGconn(PGconn *pqConn) +{ + MultiConnection *connection = GetConnectionFromPGconn(pqConn); + + if (connection) + { + CloseConnection(connection); + } + else + { + ereport(WARNING, (errmsg("could not find connection to close"))); + } +} + + +/* + * Synchronously finish connection establishment of an individual connection. + * + * TODO: Replace with variant waiting for multiple connections. + */ +void +FinishConnectionEstablishment(MultiConnection *connection) +{ + static int checkIntervalMS = 200; + + /* + * Loop until connection is established, or failed (possibly just timed + * out). + */ + while (true) + { + ConnStatusType status = PQstatus(connection->pgConn); + PostgresPollingStatusType pollmode; + + if (status == CONNECTION_OK) + { + return; + } + + /* FIXME: retries? */ + if (status == CONNECTION_BAD) + { + return; + } + + pollmode = PQconnectPoll(connection->pgConn); + + /* + * FIXME: Do we want to add transparent retry support here? 
+ */ + if (pollmode == PGRES_POLLING_FAILED) + { + return; + } + else if (pollmode == PGRES_POLLING_OK) + { + return; + } + else + { + Assert(pollmode == PGRES_POLLING_WRITING || + pollmode == PGRES_POLLING_READING); + } + + /* Loop, to handle poll() being interrupted by signals (EINTR) */ + while (true) + { + struct pollfd pollFileDescriptor; + int pollResult = 0; + + pollFileDescriptor.fd = PQsocket(connection->pgConn); + if (pollmode == PGRES_POLLING_READING) + { + pollFileDescriptor.events = POLLIN; + } + else + { + pollFileDescriptor.events = POLLOUT; + } + pollFileDescriptor.revents = 0; + + /* + * Only sleep for a limited amount of time, so we can react to + * interrupts in time, even if the platform doesn't interrupt + * poll() after signal arrival. + */ + pollResult = poll(&pollFileDescriptor, 1, checkIntervalMS); + + if (pollResult == 0) + { + /* + * Timeout exceeded. Two things to do: + * - check whether any interrupts arrived and handle them + * - check whether establishment for connection already has + * lasted for too long, stop waiting if so. + */ + CHECK_FOR_INTERRUPTS(); + + if (TimestampDifferenceExceeds(connection->connectionStart, + GetCurrentTimestamp(), + CLIENT_CONNECT_TIMEOUT_SECONDS_INT * 1000)) + { + ereport(WARNING, (errmsg("could not establish connection after %u ms", + CLIENT_CONNECT_TIMEOUT_SECONDS_INT * 1000))); + + /* close connection, otherwise we take up resource on the other side */ + PQfinish(connection->pgConn); + connection->pgConn = NULL; + break; + } + } + else if (pollResult > 0) + { + /* + * IO possible, continue connection establishment. We could + * check for timeouts here as well, but if there's progress + * there seems little point. + */ + break; + } + else if (errno == EINTR) + { + /* poll() failed (returned < 0) with errno EINTR, i.e. a signal interrupted it; retry, but check for interrupts first. 
*/ + CHECK_FOR_INTERRUPTS(); + } + else + { + /* + * We ERROR here, instead of just returning a failed + * connection, because this shouldn't happen, and indicates a + * programming error somewhere, not a network etc. issue. + */ + ereport(ERROR, (errcode_for_socket_access(), + errmsg("poll() failed: %m"))); + } + } + } +} + + +/* + * ClaimConnectionExclusively signals that this connection is actively being + * used. That means it'll not be, again, returned by + * StartNodeUserDatabaseConnection() et al until released with + * UnclaimConnection(). + */ +void +ClaimConnectionExclusively(MultiConnection *connection) +{ + Assert(!connection->claimedExclusively); + connection->claimedExclusively = true; +} + + +/* + * UnclaimConnection signals that this connection is not being used + * anymore. That means it again may be returned by + * StartNodeUserDatabaseConnection() et al. + */ +void +UnclaimConnection(MultiConnection *connection) +{ + connection->claimedExclusively = false; +} + + +static uint32 +ConnectionHashHash(const void *key, Size keysize) +{ + ConnectionHashKey *entry = (ConnectionHashKey *) key; + uint32 hash = 0; + + hash = string_hash(entry->hostname, MAX_NODE_LENGTH); /* hostname is MAX_NODE_LENGTH wide, not NAMEDATALEN */ + hash = hash_combine(hash, hash_uint32(entry->port)); + hash = hash_combine(hash, string_hash(entry->user, NAMEDATALEN)); + hash = hash_combine(hash, string_hash(entry->database, NAMEDATALEN)); + + return hash; +} + + +static int +ConnectionHashCompare(const void *a, const void *b, Size keysize) +{ + ConnectionHashKey *ca = (ConnectionHashKey *) a; + ConnectionHashKey *cb = (ConnectionHashKey *) b; + + if (strncmp(ca->hostname, cb->hostname, MAX_NODE_LENGTH) != 0 || /* compare the full hostname field; returns 0 on match, 1 otherwise */ + ca->port != cb->port || + strncmp(ca->user, cb->user, NAMEDATALEN) != 0 || + strncmp(ca->database, cb->database, NAMEDATALEN) != 0) + { + return 1; + } + else + { + return 0; + } +} + + +/* + * Asynchronously establish connection to a remote node, but don't wait for + * that to finish. DNS lookups etc. 
are performed synchronously though. + */ +static MultiConnection * +StartConnectionEstablishment(ConnectionHashKey *key) +{ + char nodePortString[12]; /* room for "-2147483648" plus the 0 byte */ + const char *clientEncoding = GetDatabaseEncodingName(); + MultiConnection *connection = NULL; + + const char *keywords[] = { + "host", "port", "dbname", "user", + "client_encoding", "fallback_application_name", + NULL + }; + const char *values[] = { + key->hostname, nodePortString, key->database, key->user, + clientEncoding, "citus", NULL + }; + + connection = MemoryContextAllocZero(ConnectionContext, sizeof(MultiConnection)); + sprintf(nodePortString, "%d", key->port); /* values[] above only holds a pointer to this buffer, so filling it here is fine */ + + strlcpy(connection->hostname, key->hostname, MAX_NODE_LENGTH); + connection->port = key->port; + strlcpy(connection->database, key->database, NAMEDATALEN); + strlcpy(connection->user, key->user, NAMEDATALEN); + + connection->pgConn = PQconnectStartParams(keywords, values, false); + connection->connectionStart = GetCurrentTimestamp(); + + return connection; +} + + +/* + * Close all remote connections of this entry that are not necessary anymore + * (i.e. not session lifetime), or that are in a failed or non-idle state. + */ +static void +AfterXactHostConnectionHandling(ConnectionHashEntry *entry, bool isCommit) +{ + dlist_mutable_iter iter; + + dlist_foreach_modify(iter, entry->connections) + { + MultiConnection *connection = + dlist_container(MultiConnection, connectionNode, iter.cur); + + /* + * To avoid code leaking connections we warn if connections are + * still claimed exclusively. We can only do so if the transaction + * committed, as it's normal that code didn't have chance to clean + * up after errors. + */ + if (isCommit && connection->claimedExclusively) + { + ereport(WARNING, + (errmsg("connection claimed exclusively at transaction commit"))); + } + + /* + * Preserve session lifespan connections if they are still healthy. 
+ */ + if (!connection->sessionLifespan || + PQstatus(connection->pgConn) != CONNECTION_OK || + PQtransactionStatus(connection->pgConn) != PQTRANS_IDLE) + { + PQfinish(connection->pgConn); + connection->pgConn = NULL; + + /* unlink from list */ + dlist_delete(iter.cur); + + pfree(connection); + } + else + { + UnclaimConnection(connection); + } + } +} diff --git a/src/backend/distributed/executor/multi_client_executor.c b/src/backend/distributed/executor/multi_client_executor.c index da852d5c1..50b25a6cf 100644 --- a/src/backend/distributed/executor/multi_client_executor.c +++ b/src/backend/distributed/executor/multi_client_executor.c @@ -20,6 +20,7 @@ #include "commands/dbcommands.h" #include "distributed/metadata_cache.h" #include "distributed/connection_cache.h" +#include "distributed/connection_management.h" #include "distributed/multi_client_executor.h" #include "distributed/multi_server_executor.h" diff --git a/src/backend/distributed/executor/multi_router_executor.c b/src/backend/distributed/executor/multi_router_executor.c index aee7e09a5..d51d047a4 100644 --- a/src/backend/distributed/executor/multi_router_executor.c +++ b/src/backend/distributed/executor/multi_router_executor.c @@ -30,6 +30,7 @@ #include "distributed/citus_ruleutils.h" #include "distributed/commit_protocol.h" #include "distributed/connection_cache.h" +#include "distributed/connection_management.h" #include "distributed/listutils.h" #include "distributed/master_metadata_utility.h" #include "distributed/metadata_cache.h" @@ -39,6 +40,7 @@ #include "distributed/multi_router_executor.h" #include "distributed/multi_shard_transaction.h" #include "distributed/relay_utility.h" +#include "distributed/remote_commands.h" #include "distributed/resource_lock.h" #include "executor/execdesc.h" #include "executor/executor.h" @@ -1797,7 +1799,6 @@ RouterTransactionCallback(XactEvent event, void *arg) } /* reset transaction state */ - XactModificationLevel = XACT_MODIFICATION_NONE; xactParticipantHash = 
NULL; xactShardConnSetList = NIL; subXactAbortAttempted = false; diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index 7e1ffa904..91f609d35 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -20,6 +20,7 @@ #include "executor/executor.h" #include "distributed/citus_nodefuncs.h" #include "distributed/commit_protocol.h" +#include "distributed/connection_management.h" #include "distributed/master_protocol.h" #include "distributed/multi_copy.h" #include "distributed/multi_executor.h" @@ -32,7 +33,9 @@ #include "distributed/multi_server_executor.h" #include "distributed/multi_shard_transaction.h" #include "distributed/multi_utility.h" +#include "distributed/remote_commands.h" #include "distributed/task_tracker.h" +#include "distributed/transaction_management.h" #include "distributed/worker_manager.h" #include "distributed/worker_protocol.h" #include "postmaster/postmaster.h" @@ -154,6 +157,10 @@ _PG_init(void) /* organize that task tracker is started once server is up */ TaskTrackerRegister(); + /* initialize coordinated transaction management */ + InitializeTransactionManagement(); + InitializeConnectionManagement(); + /* initialize transaction callbacks */ RegisterRouterExecutorXactCallbacks(); RegisterShardPlacementXactCallbacks(); diff --git a/src/backend/distributed/transaction/commit_protocol.c b/src/backend/distributed/transaction/commit_protocol.c index 1c5ac7417..dd81d6f5c 100644 --- a/src/backend/distributed/transaction/commit_protocol.c +++ b/src/backend/distributed/transaction/commit_protocol.c @@ -26,10 +26,6 @@ static uint32 DistributedTransactionId = 0; -/* the commit protocol to use for COPY commands */ -int MultiShardCommitProtocol = COMMIT_PROTOCOL_1PC; - - /* * InitializeDistributedTransaction prepares the distributed transaction ID * used in transaction names. 
diff --git a/src/backend/distributed/transaction/multi_shard_transaction.c b/src/backend/distributed/transaction/multi_shard_transaction.c index 3829746ba..c94b77abc 100644 --- a/src/backend/distributed/transaction/multi_shard_transaction.c +++ b/src/backend/distributed/transaction/multi_shard_transaction.c @@ -332,7 +332,6 @@ CompleteShardPlacementTransactions(XactEvent event, void *arg) CloseConnections(connectionList); shardConnectionHash = NULL; - XactModificationLevel = XACT_MODIFICATION_NONE; subXactAbortAttempted = false; } diff --git a/src/backend/distributed/transaction/transaction_management.c b/src/backend/distributed/transaction/transaction_management.c new file mode 100644 index 000000000..ef0dcc2e3 --- /dev/null +++ b/src/backend/distributed/transaction/transaction_management.c @@ -0,0 +1,144 @@ +/*------------------------------------------------------------------------- + * + * transaction_management.c + * + * Transaction management for Citus. Most of the work is delegated to other + * subsystems, this files, and especially CoordinatedTransactionCallback, + * coordinates the work between them. + * + * Copyright (c) 2016, Citus Data, Inc. 
+ * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "libpq-fe.h" + +#include "miscadmin.h" + +#include "access/xact.h" +#include "distributed/connection_management.h" +#include "distributed/hash_helpers.h" +#include "distributed/transaction_management.h" +#include "utils/hsearch.h" + + +CoordinatedTransactionState CurrentCoordinatedTransactionState = COORD_TRANS_NONE; + +/* GUC, the commit protocol to use for commands affecting more than one connection */ +int MultiShardCommitProtocol = COMMIT_PROTOCOL_1PC; + +/* state needed to keep track of operations used during a transaction */ +XactModificationType XactModificationLevel = XACT_MODIFICATION_NONE; + + +static bool subXactAbortAttempted = false; + + +/* transaction management functions */ +static void CoordinatedTransactionCallback(XactEvent event, void *arg); +static void CoordinatedSubTransactionCallback(SubXactEvent event, SubTransactionId subId, + SubTransactionId parentSubid, void *arg); + + +void +InitializeTransactionManagement(void) +{ + /* hook into transaction machinery */ + RegisterXactCallback(CoordinatedTransactionCallback, NULL); + RegisterSubXactCallback(CoordinatedSubTransactionCallback, NULL); +} + + +/* + * Transaction management callback, handling coordinated transaction, and + * transaction independent connection management. + * + * NB: There should only ever be a single transaction callback in citus, the + * ordering between the callbacks and thee actions within those callbacks + * otherwise becomes too undeterministic / hard to reason about. + */ +static void +CoordinatedTransactionCallback(XactEvent event, void *arg) +{ + switch (event) + { + case XACT_EVENT_COMMIT: + { + /* close connections etc. 
*/ + if (CurrentCoordinatedTransactionState != COORD_TRANS_NONE) + { + AfterXactConnectionHandling(true); + } + + Assert(!subXactAbortAttempted); + CurrentCoordinatedTransactionState = COORD_TRANS_NONE; + XactModificationLevel = XACT_MODIFICATION_NONE; + } + break; + + case XACT_EVENT_ABORT: + { + /* close connections etc. */ + if (CurrentCoordinatedTransactionState != COORD_TRANS_NONE) + { + AfterXactConnectionHandling(false); + } + + CurrentCoordinatedTransactionState = COORD_TRANS_NONE; + XactModificationLevel = XACT_MODIFICATION_NONE; + subXactAbortAttempted = false; + } + break; + + case XACT_EVENT_PARALLEL_COMMIT: + case XACT_EVENT_PARALLEL_ABORT: + case XACT_EVENT_PREPARE: + { } + break; + + case XACT_EVENT_PRE_COMMIT: + { + if (subXactAbortAttempted) + { + subXactAbortAttempted = false; + + if (XactModificationLevel != XACT_MODIFICATION_NONE) + { + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot ROLLBACK TO SAVEPOINT in transactions " + "which modify distributed tables"))); + } + } + } + break; + + case XACT_EVENT_PARALLEL_PRE_COMMIT: + case XACT_EVENT_PRE_PREPARE: + { + if (CurrentCoordinatedTransactionState > COORD_TRANS_NONE) + { + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot use 2PC in transactions involving " + "multiple servers"))); + } + } + break; + } +} + + +/* + * Subtransaction callback - currently only used to remember whether a + * savepoint has been rolled back, as we don't support that. 
+ */ +static void +CoordinatedSubTransactionCallback(SubXactEvent event, SubTransactionId subId, + SubTransactionId parentSubid, void *arg) +{ + if (event == SUBXACT_EVENT_ABORT_SUB) + { + subXactAbortAttempted = true; + } +} diff --git a/src/backend/distributed/utils/connection_cache.c b/src/backend/distributed/utils/connection_cache.c index 28f97a8a7..3ad76f307 100644 --- a/src/backend/distributed/utils/connection_cache.c +++ b/src/backend/distributed/utils/connection_cache.c @@ -29,10 +29,6 @@ #include "utils/memutils.h" #include "utils/palloc.h" - -/* state needed to keep track of operations used during a transaction */ -XactModificationType XactModificationLevel = XACT_MODIFICATION_NONE; - /* * NodeConnectionHash is the connection hash itself. It begins uninitialized. * The first call to GetOrEstablishConnection triggers hash creation. diff --git a/src/include/distributed/commit_protocol.h b/src/include/distributed/commit_protocol.h index 140b0ade0..5805dfe9f 100644 --- a/src/include/distributed/commit_protocol.h +++ b/src/include/distributed/commit_protocol.h @@ -14,18 +14,12 @@ #include "access/xact.h" +#include "distributed/connection_management.h" #include "libpq-fe.h" #include "lib/stringinfo.h" #include "nodes/pg_list.h" -/* Enumeration that defines the different commit protocols available */ -typedef enum -{ - COMMIT_PROTOCOL_1PC = 0, - COMMIT_PROTOCOL_2PC = 1 -} CommitProtocolType; - /* Enumeration that defines different remote transaction states */ typedef enum { @@ -51,10 +45,6 @@ typedef struct TransactionConnection } TransactionConnection; -/* config variable managed via guc.c */ -extern int MultiShardCommitProtocol; - - /* Functions declarations for transaction and connection management */ extern void InitializeDistributedTransaction(void); extern void PrepareRemoteTransactions(List *connectionList); diff --git a/src/include/distributed/connection_cache.h b/src/include/distributed/connection_cache.h index 7623cafb2..187fe88da 100644 --- 
a/src/include/distributed/connection_cache.h +++ b/src/include/distributed/connection_cache.h @@ -52,20 +52,6 @@ typedef struct NodeConnectionEntry } NodeConnectionEntry; -/* describes what kind of modifications have occurred in the current transaction */ -typedef enum -{ - XACT_MODIFICATION_INVALID = 0, /* placeholder initial value */ - XACT_MODIFICATION_NONE, /* no modifications have taken place */ - XACT_MODIFICATION_DATA, /* data modifications (DML) have occurred */ - XACT_MODIFICATION_MULTI_SHARD /* multi-shard modifications have occurred */ -} XactModificationType; - - -/* state needed to prevent new connections during modifying transactions */ -extern XactModificationType XactModificationLevel; - - /* function declarations for obtaining and using a connection */ extern PGconn * GetOrEstablishConnection(char *nodeName, int32 nodePort); extern void PurgeConnection(PGconn *connection); diff --git a/src/include/distributed/connection_management.h b/src/include/distributed/connection_management.h new file mode 100644 index 000000000..be88c79a3 --- /dev/null +++ b/src/include/distributed/connection_management.h @@ -0,0 +1,128 @@ +/*------------------------------------------------------------------------- + * + * connection_management.h + * Central management of connections and their life-cycle + * + * Copyright (c) 2016, Citus Data, Inc. 
+ * + *------------------------------------------------------------------------- + */ + +#ifndef CONNECTION_MANAGMENT_H +#define CONNECTION_MANAGMENT_H + +#include "distributed/transaction_management.h" +#include "lib/ilist.h" +#include "utils/hsearch.h" +#include "utils/timestamp.h" + +/* maximum (textual) lengths of hostname and port */ +#define MAX_NODE_LENGTH 255 /* includes 0 byte */ + +#define CLIENT_CONNECT_TIMEOUT_SECONDS_INT 5 + +/* forward declare, to avoid forcing large headers on everyone */ +struct pg_conn; /* target of the PGconn typedef */ +struct MemoryContextData; + +/* + * Flags determining connection establishment behaviour. + */ +enum MultiConnectionMode +{ + /* force establishment of a new connection */ + FORCE_NEW_CONNECTION = 1 << 0, + + /* mark returned connection as having session lifespan */ + SESSION_LIFESPAN = 1 << 1 +}; + + +/* declaring this directly above makes uncrustify go crazy */ +typedef enum MultiConnectionMode MultiConnectionMode; + +typedef struct MultiConnection +{ + /* connection details, useful for error messages and such. */ + char hostname[MAX_NODE_LENGTH]; + int32 port; + char user[NAMEDATALEN]; + char database[NAMEDATALEN]; + + /* underlying libpq connection */ + struct pg_conn *pgConn; + + /* is the connection intended to be kept after transaction end */ + bool sessionLifespan; + + /* is the connection currently in use, and shouldn't be used by anything else */ + bool claimedExclusively; + + /* time connection establishment was started, for timeout */ + TimestampTz connectionStart; + + /* membership in list of connections in ConnectionHashEntry */ + dlist_node connectionNode; +} MultiConnection; + + +/* + * Central connection management hash, mapping (host, port, user, database) to + * a list of connections. + * + * This hash is used to keep track of which connections are open to which + * node. Besides allowing connection reuse, that information is e.g. 
used to + * handle closing connections after the end of a transaction. + */ + +/* hash key */ +typedef struct ConnectionHashKey +{ + char hostname[MAX_NODE_LENGTH]; + int32 port; + char user[NAMEDATALEN]; + char database[NAMEDATALEN]; +} ConnectionHashKey; + +/* hash entry */ +typedef struct ConnectionHashEntry +{ + ConnectionHashKey key; + dlist_head *connections; +} ConnectionHashEntry; + +/* the hash table */ +extern HTAB *ConnectionHash; + +/* context for all connection and transaction related memory */ +extern struct MemoryContextData *ConnectionContext; + + +extern void AfterXactConnectionHandling(bool isCommit); +extern void InitializeConnectionManagement(void); + + +/* Low-level connection establishment APIs */ +extern MultiConnection * GetNodeConnection(uint32 flags, const char *hostname, + int32 port); +extern MultiConnection * StartNodeConnection(uint32 flags, const char *hostname, + int32 port); +extern MultiConnection * GetNodeUserDatabaseConnection(uint32 flags, const char *hostname, + int32 port, const char *user, const + char *database); +extern MultiConnection * StartNodeUserDatabaseConnection(uint32 flags, + const char *hostname, + int32 port, + const char *user, + const char *database); +extern MultiConnection * GetConnectionFromPGconn(struct pg_conn *pqConn); +extern void CloseConnection(MultiConnection *connection); +extern void CloseConnectionByPGconn(struct pg_conn *pqConn); + +/* dealing with a connection */ +extern void FinishConnectionEstablishment(MultiConnection *connection); +extern void ClaimConnectionExclusively(MultiConnection *connection); +extern void UnclaimConnection(MultiConnection *connection); + + +#endif /* CONNECTION_MANAGMENT_H */ diff --git a/src/include/distributed/transaction_management.h b/src/include/distributed/transaction_management.h new file mode 100644 index 000000000..91a5cd181 --- /dev/null +++ b/src/include/distributed/transaction_management.h @@ -0,0 +1,57 @@ 
+/*------------------------------------------------------------------------- + * transaction_management.h + * + * Copyright (c) 2016, Citus Data, Inc. + * + *------------------------------------------------------------------------- + */ + +#ifndef TRANSACTION_MANAGMENT_H +#define TRANSACTION_MANAGMENT_H + +/* describes what kind of modifications have occurred in the current transaction */ +typedef enum +{ + XACT_MODIFICATION_INVALID = 0, /* placeholder initial value */ + XACT_MODIFICATION_NONE, /* no modifications have taken place */ + XACT_MODIFICATION_DATA, /* data modifications (DML) have occurred */ + XACT_MODIFICATION_MULTI_SHARD /* multi-shard modifications have occurred */ +} XactModificationType; + + +/* + * Enum defining the state of a coordinated transaction (i.e. a transaction + * potentially spanning several nodes). + */ +typedef enum CoordinatedTransactionState +{ + /* no coordinated transaction in progress, no connections established */ + COORD_TRANS_NONE, + + /* no coordinated transaction in progress, but connections established */ + COORD_TRANS_IDLE +} CoordinatedTransactionState; + + +/* Enumeration that defines the different commit protocols available */ +typedef enum +{ + COMMIT_PROTOCOL_1PC = 0, + COMMIT_PROTOCOL_2PC = 1 +} CommitProtocolType; + +/* config variable managed via guc.c */ +extern int MultiShardCommitProtocol; + +/* state needed to prevent new connections during modifying transactions */ +extern XactModificationType XactModificationLevel; + + +extern CoordinatedTransactionState CurrentCoordinatedTransactionState; + + +/* initialization function(s) */ +extern void InitializeTransactionManagement(void); + + +#endif /* TRANSACTION_MANAGMENT_H */ From 3505d431cd2fe841436cde8e2ab56f785b2ef12a Mon Sep 17 00:00:00 2001 From: Andres Freund Date: Tue, 6 Dec 2016 14:38:58 -0800 Subject: [PATCH 3/5] Add initial helpers to make interactions with MultiConnection et al. easier. 
This includes basic infrastructure for logging of commands sent to remote/worker nodes. Note that this has no effect as of yet, since no callers are converted to the new infrastructure. --- .../distributed/connection/remote_commands.c | 195 ++++++++++++++++++ src/backend/distributed/shared_library_init.c | 10 + .../distributed/utils/connection_cache.c | 31 --- src/include/distributed/connection_cache.h | 1 - src/include/distributed/remote_commands.h | 36 ++++ 5 files changed, 241 insertions(+), 32 deletions(-) create mode 100644 src/backend/distributed/connection/remote_commands.c create mode 100644 src/include/distributed/remote_commands.h diff --git a/src/backend/distributed/connection/remote_commands.c b/src/backend/distributed/connection/remote_commands.c new file mode 100644 index 000000000..22ba1c9f7 --- /dev/null +++ b/src/backend/distributed/connection/remote_commands.c @@ -0,0 +1,195 @@ +/*------------------------------------------------------------------------- + * + * remote_commands.c + * Helpers to make it easier to execute commands on remote nodes. + * + * Copyright (c) 2016, Citus Data, Inc. + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "libpq-fe.h" + +#include "distributed/connection_management.h" +#include "distributed/remote_commands.h" + + +/* GUC, determining whether statements sent to remote nodes are logged */ +bool LogRemoteCommands = false; + + +/* simple helpers */ + +/* + * IsResponseOK checks whether the result is a successful one. + */ +bool +IsResponseOK(PGresult *result) +{ + ExecStatusType resultStatus = PQresultStatus(result); + + if (resultStatus == PGRES_SINGLE_TUPLE || resultStatus == PGRES_TUPLES_OK || + resultStatus == PGRES_COMMAND_OK) + { + return true; + } + + return false; +} + + +/* + * ForgetResults clears a connection from pending activity. 
+ * + * XXX: In the future it might be a good idea to use PQcancel() if results + * would require network IO. + */ +void +ForgetResults(MultiConnection *connection) +{ + while (true) + { + PGresult *result = NULL; + result = PQgetResult(connection->pgConn); + if (result == NULL) + { + break; + } + if (PQresultStatus(result) == PGRES_COPY_IN) + { + PQputCopyEnd(connection->pgConn, NULL); + + /* TODO: mark transaction as failed, once we can. */ + } + PQclear(result); + } +} + + +/* + * SqlStateMatchesCategory returns true if the given sql state (which may be + * NULL if unknown) is in the given error category. Note that we use + * ERRCODE_TO_CATEGORY macro to determine error category of the sql state and + * expect the caller to use the same macro for the error category. + */ +bool +SqlStateMatchesCategory(char *sqlStateString, int category) +{ + bool sqlStateMatchesCategory = false; + int sqlState = 0; + int sqlStateCategory = 0; + + if (sqlStateString == NULL) + { + return false; + } + + sqlState = MAKE_SQLSTATE(sqlStateString[0], sqlStateString[1], sqlStateString[2], + sqlStateString[3], sqlStateString[4]); + + sqlStateCategory = ERRCODE_TO_CATEGORY(sqlState); + if (sqlStateCategory == category) + { + sqlStateMatchesCategory = true; + } + + return sqlStateMatchesCategory; +} + + +/* report errors & warnings */ + +/* + * Report libpq failure that's not associated with a result. + */ +void +ReportConnectionError(MultiConnection *connection, int elevel) +{ + char *nodeName = connection->hostname; + int nodePort = connection->port; + + ereport(elevel, (errmsg("connection error: %s:%d", nodeName, nodePort), + errdetail("%s", PQerrorMessage(connection->pgConn)))); +} + + +/* + * ReportResultError reports libpq failure associated with a result. 
+ */ +void +ReportResultError(MultiConnection *connection, PGresult *result, int elevel) +{ + char *sqlStateString = PQresultErrorField(result, PG_DIAG_SQLSTATE); + char *messagePrimary = PQresultErrorField(result, PG_DIAG_MESSAGE_PRIMARY); + char *messageDetail = PQresultErrorField(result, PG_DIAG_MESSAGE_DETAIL); + char *messageHint = PQresultErrorField(result, PG_DIAG_MESSAGE_HINT); + char *messageContext = PQresultErrorField(result, PG_DIAG_CONTEXT); + + char *nodeName = connection->hostname; + int nodePort = connection->port; + int sqlState = ERRCODE_INTERNAL_ERROR; + + if (sqlStateString != NULL) + { + sqlState = MAKE_SQLSTATE(sqlStateString[0], sqlStateString[1], sqlStateString[2], + sqlStateString[3], sqlStateString[4]); + } + + /* + * If the PGresult did not contain a message, the connection may provide a + * suitable top level one. At worst, this is an empty string. + */ + if (messagePrimary == NULL) + { + char *lastNewlineIndex = NULL; + + messagePrimary = pstrdup(PQerrorMessage(connection->pgConn)); + lastNewlineIndex = strrchr(messagePrimary, '\n'); + + /* trim trailing newline in our copy, leaving libpq's buffer intact */ + if (lastNewlineIndex != NULL) + { + *lastNewlineIndex = '\0'; + } + } + + ereport(elevel, (errcode(sqlState), errmsg("%s", messagePrimary), + messageDetail ? errdetail("%s", messageDetail) : 0, + messageHint ? errhint("%s", messageHint) : 0, + messageContext ? errcontext("%s", messageContext) : 0, + errcontext("while executing command on %s:%d", + nodeName, nodePort))); +} + + +/* + * LogRemoteCommand logs commands sent to remote nodes if + * citus.log_remote_commands wants us to do so. 
+ */ +void +LogRemoteCommand(MultiConnection *connection, const char *command) +{ + if (!LogRemoteCommands) + { + return; + } + + ereport(LOG, (errmsg("issuing %s", command), + errdetail("on server %s:%d", connection->hostname, connection->port))); +} + + +/* wrappers around libpq functions, with command logging support */ + +/* + * SendRemoteCommand is a tiny PQsendQuery wrapper that logs remote commands, + * and accepts a MultiConnection instead of a plain PGconn. + */ +int +SendRemoteCommand(MultiConnection *connection, const char *command) +{ + LogRemoteCommand(connection, command); + return PQsendQuery(connection->pgConn, command); +} diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index 91f609d35..60c4f3656 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -277,6 +277,16 @@ RegisterCitusConfigVariables(void) GUC_NO_SHOW_ALL, NULL, NULL, NULL); + DefineCustomBoolVariable( + "citus.log_remote_commands", + gettext_noop("Log queries sent to other nodes in the server log"), + NULL, + &LogRemoteCommands, + false, + PGC_USERSET, + 0, + NULL, NULL, NULL); + DefineCustomBoolVariable( "citus.explain_multi_logical_plan", gettext_noop("Enables Explain to print out distributed logical plans."), diff --git a/src/backend/distributed/utils/connection_cache.c b/src/backend/distributed/utils/connection_cache.c index 3ad76f307..db252d8c6 100644 --- a/src/backend/distributed/utils/connection_cache.c +++ b/src/backend/distributed/utils/connection_cache.c @@ -193,37 +193,6 @@ PurgeConnectionByKey(NodeConnectionKey *nodeConnectionKey) } -/* - * SqlStateMatchesCategory returns true if the given sql state (which may be - * NULL if unknown) is in the given error category. Note that we use - * ERRCODE_TO_CATEGORY macro to determine error category of the sql state and - * expect the caller to use the same macro for the error category. 
- */ -bool -SqlStateMatchesCategory(char *sqlStateString, int category) -{ - bool sqlStateMatchesCategory = false; - int sqlState = 0; - int sqlStateCategory = 0; - - if (sqlStateString == NULL) - { - return false; - } - - sqlState = MAKE_SQLSTATE(sqlStateString[0], sqlStateString[1], sqlStateString[2], - sqlStateString[3], sqlStateString[4]); - - sqlStateCategory = ERRCODE_TO_CATEGORY(sqlState); - if (sqlStateCategory == category) - { - sqlStateMatchesCategory = true; - } - - return sqlStateMatchesCategory; -} - - /* * WarnRemoteError retrieves error fields from a remote result and produces an * error report at the WARNING level after amending the error with a CONTEXT diff --git a/src/include/distributed/connection_cache.h b/src/include/distributed/connection_cache.h index 187fe88da..3d9fa1260 100644 --- a/src/include/distributed/connection_cache.h +++ b/src/include/distributed/connection_cache.h @@ -57,7 +57,6 @@ extern PGconn * GetOrEstablishConnection(char *nodeName, int32 nodePort); extern void PurgeConnection(PGconn *connection); extern void BuildKeyForConnection(PGconn *connection, NodeConnectionKey *connectionKey); extern PGconn * PurgeConnectionByKey(NodeConnectionKey *nodeConnectionKey); -extern bool SqlStateMatchesCategory(char *sqlStateString, int category); extern void WarnRemoteError(PGconn *connection, PGresult *result); extern void ReraiseRemoteError(PGconn *connection, PGresult *result); extern PGconn * ConnectToNode(char *nodeName, int nodePort, char *nodeUser); diff --git a/src/include/distributed/remote_commands.h b/src/include/distributed/remote_commands.h new file mode 100644 index 000000000..6ce25ccfd --- /dev/null +++ b/src/include/distributed/remote_commands.h @@ -0,0 +1,36 @@ +/*------------------------------------------------------------------------- + * + * remote_commands.h + * Helpers to execute commands on remote nodes, over libpq. + * + * Copyright (c) 2016, Citus Data, Inc. 
+ * + *------------------------------------------------------------------------- + */ + +#ifndef REMOTE_COMMAND_H +#define REMOTE_COMMAND_H + +#include "distributed/connection_management.h" + + +/* GUC, determining whether statements sent to remote nodes are logged */ +extern bool LogRemoteCommands; + + +/* simple helpers */ +extern bool IsResponseOK(struct pg_result *result); +extern void ForgetResults(MultiConnection *connection); +extern bool SqlStateMatchesCategory(char *sqlStateString, int category); + +/* report errors & warnings */ +extern void ReportConnectionError(MultiConnection *connection, int elevel); +extern void ReportResultError(MultiConnection *connection, struct pg_result *result, + int elevel); +extern void LogRemoteCommand(MultiConnection *connection, const char *command); + +/* wrappers around libpq functions, with command logging support */ +extern int SendRemoteCommand(MultiConnection *connection, const char *command); + + +#endif /* REMOTE_COMMAND_H */ From a77cf36778ff537a5570fb97aea40c0ad0a9d5be Mon Sep 17 00:00:00 2001 From: Andres Freund Date: Thu, 1 Dec 2016 02:06:11 -0800 Subject: [PATCH 4/5] Use connection_management.c from within connection_cache.c. This is a temporary step towards removing connection_cache.c. 
--- src/backend/distributed/commands/multi_copy.c | 4 +- .../executor/multi_router_executor.c | 32 ++- .../distributed/master/master_citus_tools.c | 9 +- .../distributed/test/connection_cache.c | 5 +- .../transaction/multi_shard_transaction.c | 3 +- .../transaction/worker_transaction.c | 6 +- .../distributed/utils/connection_cache.c | 182 +++--------------- src/include/distributed/connection_cache.h | 17 +- .../expected/multi_connection_cache.out | 2 +- 9 files changed, 60 insertions(+), 200 deletions(-) diff --git a/src/backend/distributed/commands/multi_copy.c b/src/backend/distributed/commands/multi_copy.c index f8745d466..a36423881 100644 --- a/src/backend/distributed/commands/multi_copy.c +++ b/src/backend/distributed/commands/multi_copy.c @@ -329,13 +329,13 @@ CopyFromWorkerNode(CopyStmt *copyStatement, char *completionTag) PQclear(queryResult); /* close the connection */ - PQfinish(masterConnection); + CloseConnectionByPGconn(masterConnection); masterConnection = NULL; } PG_CATCH(); { /* close the connection */ - PQfinish(masterConnection); + CloseConnectionByPGconn(masterConnection); masterConnection = NULL; PG_RE_THROW(); diff --git a/src/backend/distributed/executor/multi_router_executor.c b/src/backend/distributed/executor/multi_router_executor.c index d51d047a4..f2294fca0 100644 --- a/src/backend/distributed/executor/multi_router_executor.c +++ b/src/backend/distributed/executor/multi_router_executor.c @@ -111,7 +111,7 @@ static uint64 ReturnRowsFromTuplestore(uint64 tupleCount, TupleDesc tupleDescrip Tuplestorestate *tupleStore); static PGconn * GetConnectionForPlacement(ShardPlacement *placement, bool isModificationQuery); -static void PurgeConnectionForPlacement(ShardPlacement *placement); +static void PurgeConnectionForPlacement(PGconn *connection, ShardPlacement *placement); static void RemoveXactConnection(PGconn *connection); static void ExtractParametersFromParamListInfo(ParamListInfo paramListInfo, Oid **parameterTypes, @@ -277,7 +277,7 @@ 
InitTransactionStateForTask(Task *task) if (PQresultStatus(result) != PGRES_COMMAND_OK) { WarnRemoteError(connection, result); - PurgeConnection(connection); + CloseConnectionByPGconn(connection); connection = NULL; } @@ -794,7 +794,7 @@ ExecuteSingleTask(QueryDesc *queryDesc, Task *task, queryOK = SendQueryInSingleRowMode(connection, queryString, paramListInfo); if (!queryOK) { - PurgeConnectionForPlacement(taskPlacement); + PurgeConnectionForPlacement(connection, taskPlacement); failedPlacementList = lappend(failedPlacementList, taskPlacement); continue; } @@ -852,7 +852,7 @@ ExecuteSingleTask(QueryDesc *queryDesc, Task *task, } else { - PurgeConnectionForPlacement(taskPlacement); + PurgeConnectionForPlacement(connection, taskPlacement); failedPlacementList = lappend(failedPlacementList, taskPlacement); @@ -956,8 +956,6 @@ ExecuteModifyTasks(List *taskList, bool expectResults, ParamListInfo paramListIn "commands"))); } - XactModificationLevel = XACT_MODIFICATION_MULTI_SHARD; - shardIntervalList = TaskShardIntervalList(taskList); /* ensure that there are no concurrent modifications on the same shards */ @@ -966,6 +964,8 @@ ExecuteModifyTasks(List *taskList, bool expectResults, ParamListInfo paramListIn /* open connection to all relevant placements, if not already open */ OpenTransactionsToAllShardPlacements(shardIntervalList, userName); + XactModificationLevel = XACT_MODIFICATION_MULTI_SHARD; + /* iterate over placements in rounds, to ensure in-order execution */ while (tasksPending) { @@ -1234,17 +1234,9 @@ GetConnectionForPlacement(ShardPlacement *placement, bool isModificationQuery) * for the transaction in addition to purging the connection cache's entry. 
*/ static void -PurgeConnectionForPlacement(ShardPlacement *placement) +PurgeConnectionForPlacement(PGconn *connection, ShardPlacement *placement) { - NodeConnectionKey nodeKey; - char *currentUser = CurrentUserName(); - - MemSet(&nodeKey, 0, sizeof(NodeConnectionKey)); - strlcpy(nodeKey.nodeName, placement->nodeName, MAX_NODE_LENGTH + 1); - nodeKey.nodePort = placement->nodePort; - strlcpy(nodeKey.nodeUser, currentUser, NAMEDATALEN); - - PurgeConnectionByKey(&nodeKey); + CloseConnectionByPGconn(connection); /* * The following is logically identical to RemoveXactConnection, but since @@ -1256,7 +1248,13 @@ PurgeConnectionForPlacement(ShardPlacement *placement) { NodeConnectionEntry *participantEntry = NULL; bool entryFound = false; + NodeConnectionKey nodeKey; + char *currentUser = CurrentUserName(); + MemSet(&nodeKey, 0, sizeof(NodeConnectionKey)); + strlcpy(nodeKey.nodeName, placement->nodeName, MAX_NODE_LENGTH + 1); + nodeKey.nodePort = placement->nodePort; + strlcpy(nodeKey.nodeUser, currentUser, NAMEDATALEN); Assert(IsTransactionBlock()); /* the participant hash doesn't use the user field */ @@ -1862,7 +1860,7 @@ ExecuteTransactionEnd(bool commit) else { WarnRemoteError(connection, result); - PurgeConnection(participant->connection); + CloseConnectionByPGconn(participant->connection); participant->connection = NULL; } diff --git a/src/backend/distributed/master/master_citus_tools.c b/src/backend/distributed/master/master_citus_tools.c index d7d571e97..f8632d6a2 100644 --- a/src/backend/distributed/master/master_citus_tools.c +++ b/src/backend/distributed/master/master_citus_tools.c @@ -19,6 +19,7 @@ #include "access/htup_details.h" #include "catalog/pg_type.h" #include "distributed/connection_cache.h" +#include "distributed/connection_management.h" #include "distributed/metadata_cache.h" #include "distributed/multi_server_executor.h" #include "distributed/worker_protocol.h" @@ -269,7 +270,7 @@ ExecuteCommandsInParallelAndStoreResults(StringInfo *nodeNameArray, 
int *nodePor { StoreErrorMessage(connection, queryResultString); statusArray[commandIndex] = false; - PQfinish(connection); + CloseConnectionByPGconn(connection); connectionArray[commandIndex] = NULL; finishedCount++; } @@ -298,7 +299,7 @@ ExecuteCommandsInParallelAndStoreResults(StringInfo *nodeNameArray, int *nodePor finishedCount++; statusArray[commandIndex] = success; connectionArray[commandIndex] = NULL; - PQfinish(connection); + CloseConnectionByPGconn(connection); } } @@ -509,7 +510,7 @@ ExecuteRemoteQueryOrCommand(char *nodeName, uint32 nodePort, char *queryString, PQclear(queryResult); /* close the connection */ - PQfinish(nodeConnection); + CloseConnectionByPGconn(nodeConnection); nodeConnection = NULL; } PG_CATCH(); @@ -517,7 +518,7 @@ ExecuteRemoteQueryOrCommand(char *nodeName, uint32 nodePort, char *queryString, StoreErrorMessage(nodeConnection, queryResultString); /* close the connection */ - PQfinish(nodeConnection); + CloseConnectionByPGconn(nodeConnection); nodeConnection = NULL; } PG_END_TRY(); diff --git a/src/backend/distributed/test/connection_cache.c b/src/backend/distributed/test/connection_cache.c index 8c46d0dd5..e88204c5a 100644 --- a/src/backend/distributed/test/connection_cache.c +++ b/src/backend/distributed/test/connection_cache.c @@ -21,6 +21,7 @@ #include "catalog/pg_type.h" #include "distributed/connection_cache.h" +#include "distributed/connection_management.h" #include "distributed/metadata_cache.h" #include "distributed/test_helper_functions.h" /* IWYU pragma: keep */ #include "utils/elog.h" @@ -124,7 +125,7 @@ get_and_purge_connection(PG_FUNCTION_ARGS) PG_RETURN_BOOL(false); } - PurgeConnection(connection); + CloseConnectionByPGconn(connection); PG_RETURN_BOOL(true); } @@ -148,7 +149,7 @@ connect_and_purge_connection(PG_FUNCTION_ARGS) PG_RETURN_BOOL(false); } - PurgeConnection(connection); + CloseConnectionByPGconn(connection); PG_RETURN_BOOL(true); } diff --git a/src/backend/distributed/transaction/multi_shard_transaction.c 
b/src/backend/distributed/transaction/multi_shard_transaction.c index c94b77abc..9f3d15c0f 100644 --- a/src/backend/distributed/transaction/multi_shard_transaction.c +++ b/src/backend/distributed/transaction/multi_shard_transaction.c @@ -16,6 +16,7 @@ #include "distributed/colocation_utils.h" #include "distributed/commit_protocol.h" #include "distributed/connection_cache.h" +#include "distributed/connection_management.h" #include "distributed/master_metadata_utility.h" #include "distributed/metadata_cache.h" #include "distributed/multi_shard_transaction.h" @@ -361,6 +362,6 @@ CloseConnections(List *connectionList) (TransactionConnection *) lfirst(connectionCell); PGconn *connection = transactionConnection->connection; - PQfinish(connection); + CloseConnectionByPGconn(connection); } } diff --git a/src/backend/distributed/transaction/worker_transaction.c b/src/backend/distributed/transaction/worker_transaction.c index ba5cebdb4..faca72802 100644 --- a/src/backend/distributed/transaction/worker_transaction.c +++ b/src/backend/distributed/transaction/worker_transaction.c @@ -233,12 +233,12 @@ SendCommandListToWorkerInSingleTransaction(char *nodeName, int32 nodePort, char PQgetResult(workerConnection); /* we no longer need this connection */ - PQfinish(workerConnection); + CloseConnectionByPGconn(workerConnection); } PG_CATCH(); { /* close the connection */ - PQfinish(workerConnection); + CloseConnectionByPGconn(workerConnection); PG_RE_THROW(); } @@ -280,7 +280,7 @@ RemoveWorkerTransaction(char *nodeName, int32 nodePort) PGconn *connection = transactionConnection->connection; /* closing the connection will rollback all uncommited transactions */ - PQfinish(connection); + CloseConnectionByPGconn(connection); workerConnectionList = list_delete(workerConnectionList, transactionConnection); } diff --git a/src/backend/distributed/utils/connection_cache.c b/src/backend/distributed/utils/connection_cache.c index db252d8c6..7db3c35e4 100644 --- 
a/src/backend/distributed/utils/connection_cache.c +++ b/src/backend/distributed/utils/connection_cache.c @@ -2,7 +2,7 @@ * * connection_cache.c * - * This file contains functions to implement a connection hash. + * Legacy connection caching layer. Will be removed entirely. * * Copyright (c) 2014-2016, Citus Data, Inc. * @@ -19,8 +19,10 @@ #include #include "commands/dbcommands.h" +#include "distributed/connection_management.h" #include "distributed/connection_cache.h" #include "distributed/metadata_cache.h" +#include "distributed/remote_commands.h" #include "mb/pg_wchar.h" #include "utils/builtins.h" #include "utils/elog.h" @@ -29,15 +31,8 @@ #include "utils/memutils.h" #include "utils/palloc.h" -/* - * NodeConnectionHash is the connection hash itself. It begins uninitialized. - * The first call to GetOrEstablishConnection triggers hash creation. - */ -static HTAB *NodeConnectionHash = NULL; - /* local function forward declarations */ -static HTAB * CreateNodeConnectionHash(void); static void ReportRemoteError(PGconn *connection, PGresult *result, bool raiseError); @@ -56,77 +51,26 @@ static void ReportRemoteError(PGconn *connection, PGresult *result, bool raiseEr PGconn * GetOrEstablishConnection(char *nodeName, int32 nodePort) { + int connectionFlags = SESSION_LIFESPAN; PGconn *connection = NULL; - NodeConnectionKey nodeConnectionKey; - NodeConnectionEntry *nodeConnectionEntry = NULL; - bool entryFound = false; - bool needNewConnection = true; - char *userName = CurrentUserName(); + MultiConnection *multiConnection = + GetNodeConnection(connectionFlags, nodeName, nodePort); - /* check input */ - if (strnlen(nodeName, MAX_NODE_LENGTH + 1) > MAX_NODE_LENGTH) + if (PQstatus(multiConnection->pgConn) == CONNECTION_OK) { - ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), - errmsg("hostname exceeds the maximum length of %d", - MAX_NODE_LENGTH))); + connection = multiConnection->pgConn; } - - /* if first call, initialize the connection hash */ - if 
(NodeConnectionHash == NULL) + else { - NodeConnectionHash = CreateNodeConnectionHash(); - } - - memset(&nodeConnectionKey, 0, sizeof(nodeConnectionKey)); - strlcpy(nodeConnectionKey.nodeName, nodeName, MAX_NODE_LENGTH + 1); - nodeConnectionKey.nodePort = nodePort; - strlcpy(nodeConnectionKey.nodeUser, userName, NAMEDATALEN); - - nodeConnectionEntry = hash_search(NodeConnectionHash, &nodeConnectionKey, - HASH_FIND, &entryFound); - if (entryFound) - { - connection = nodeConnectionEntry->connection; - if (PQstatus(connection) == CONNECTION_OK) - { - needNewConnection = false; - } - else - { - PurgeConnection(connection); - } - } - - if (needNewConnection) - { - connection = ConnectToNode(nodeName, nodePort, nodeConnectionKey.nodeUser); - if (connection != NULL) - { - nodeConnectionEntry = hash_search(NodeConnectionHash, &nodeConnectionKey, - HASH_ENTER, &entryFound); - nodeConnectionEntry->connection = connection; - } + ReportConnectionError(multiConnection, WARNING); + CloseConnection(multiConnection); + connection = NULL; } return connection; } -/* - * PurgeConnection removes the given connection from the connection hash and - * closes it using PQfinish. If our hash does not contain the given connection, - * this method simply prints a warning and exits. - */ -void -PurgeConnection(PGconn *connection) -{ - NodeConnectionKey nodeConnectionKey; - - BuildKeyForConnection(connection, &nodeConnectionKey); - PurgeConnectionByKey(&nodeConnectionKey); -} - - /* * Utility method to simplify populating a connection cache key with relevant * fields from a provided connection. 
@@ -170,29 +114,6 @@ BuildKeyForConnection(PGconn *connection, NodeConnectionKey *connectionKey) } -PGconn * -PurgeConnectionByKey(NodeConnectionKey *nodeConnectionKey) -{ - bool entryFound = false; - NodeConnectionEntry *nodeConnectionEntry = NULL; - PGconn *connection = NULL; - - if (NodeConnectionHash != NULL) - { - nodeConnectionEntry = hash_search(NodeConnectionHash, nodeConnectionKey, - HASH_REMOVE, &entryFound); - } - - if (entryFound) - { - connection = nodeConnectionEntry->connection; - PQfinish(nodeConnectionEntry->connection); - } - - return connection; -} - - /* * WarnRemoteError retrieves error fields from a remote result and produces an * error report at the WARNING level after amending the error with a CONTEXT @@ -261,13 +182,11 @@ ReportRemoteError(PGconn *connection, PGresult *result, bool raiseError) } /* - * If requested, actually raise an error. This necessitates purging the - * connection so it doesn't remain in the hash in an invalid state. + * If requested, actually raise an error. */ if (raiseError) { errorLevel = ERROR; - PurgeConnection(connection); } if (sqlState == ERRCODE_CONNECTION_FAILURE) @@ -288,79 +207,34 @@ ReportRemoteError(PGconn *connection, PGresult *result, bool raiseError) } -/* - * CreateNodeConnectionHash returns a newly created hash table suitable for - * storing unlimited connections indexed by node name and port. - */ -static HTAB * -CreateNodeConnectionHash(void) -{ - HTAB *nodeConnectionHash = NULL; - HASHCTL info; - int hashFlags = 0; - - memset(&info, 0, sizeof(info)); - info.keysize = sizeof(NodeConnectionKey); - info.entrysize = sizeof(NodeConnectionEntry); - info.hash = tag_hash; - info.hcxt = CacheMemoryContext; - hashFlags = (HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT); - - nodeConnectionHash = hash_create("citus connection cache", 32, &info, hashFlags); - - return nodeConnectionHash; -} - - /* * ConnectToNode opens a connection to a remote PostgreSQL server. 
The function * configures the connection's fallback application name to 'citus' and sets * the remote encoding to match the local one. All parameters are required to * be non NULL. * - * We attempt to connect up to MAX_CONNECT_ATTEMPT times. After that we give up - * and return NULL. + * This is only a thin layer over connection_management.[ch], and will be + * removed soon. */ PGconn * ConnectToNode(char *nodeName, int32 nodePort, char *nodeUser) { + /* don't want already established connections */ + int connectionFlags = FORCE_NEW_CONNECTION; PGconn *connection = NULL; - const char *clientEncoding = GetDatabaseEncodingName(); - const char *dbname = get_database_name(MyDatabaseId); - int attemptIndex = 0; + MultiConnection *multiConnection = + GetNodeUserDatabaseConnection(connectionFlags, nodeName, nodePort, nodeUser, + NULL); - const char *keywordArray[] = { - "host", "port", "fallback_application_name", - "client_encoding", "connect_timeout", "dbname", "user", NULL - }; - char nodePortString[12]; - const char *valueArray[] = { - nodeName, nodePortString, "citus", clientEncoding, - CLIENT_CONNECT_TIMEOUT_SECONDS, dbname, nodeUser, NULL - }; - - sprintf(nodePortString, "%d", nodePort); - - Assert(sizeof(keywordArray) == sizeof(valueArray)); - - for (attemptIndex = 0; attemptIndex < MAX_CONNECT_ATTEMPTS; attemptIndex++) + if (PQstatus(multiConnection->pgConn) == CONNECTION_OK) { - connection = PQconnectdbParams(keywordArray, valueArray, false); - if (PQstatus(connection) == CONNECTION_OK) - { - break; - } - else - { - /* warn if still erroring on final attempt */ - if (attemptIndex == MAX_CONNECT_ATTEMPTS - 1) - { - WarnRemoteError(connection, NULL); - } - - PQfinish(connection); - connection = NULL; - } + connection = multiConnection->pgConn; + } + else + { + ReportConnectionError(multiConnection, WARNING); + CloseConnection(multiConnection); + connection = NULL; } return connection; diff --git a/src/include/distributed/connection_cache.h 
b/src/include/distributed/connection_cache.h index 3d9fa1260..363042a94 100644 --- a/src/include/distributed/connection_cache.h +++ b/src/include/distributed/connection_cache.h @@ -16,21 +16,8 @@ #include "c.h" #include "libpq-fe.h" -#include "nodes/pg_list.h" #include "utils/hsearch.h" - -/* maximum duration to wait for connection */ -#define CLIENT_CONNECT_TIMEOUT_SECONDS "5" - -/* maximum (textual) lengths of hostname and port */ -#define MAX_NODE_LENGTH 255 -#define MAX_PORT_LENGTH 10 - -/* times to attempt connection (or reconnection) */ -#define MAX_CONNECT_ATTEMPTS 2 - -/* SQL statement for testing */ -#define TEST_SQL "DO $$ BEGIN RAISE EXCEPTION 'Raised remotely!'; END $$" +#include "distributed/connection_management.h" /* * NodeConnectionKey acts as the key to index into the (process-local) hash @@ -54,9 +41,7 @@ typedef struct NodeConnectionEntry /* function declarations for obtaining and using a connection */ extern PGconn * GetOrEstablishConnection(char *nodeName, int32 nodePort); -extern void PurgeConnection(PGconn *connection); extern void BuildKeyForConnection(PGconn *connection, NodeConnectionKey *connectionKey); -extern PGconn * PurgeConnectionByKey(NodeConnectionKey *nodeConnectionKey); extern void WarnRemoteError(PGconn *connection, PGresult *result); extern void ReraiseRemoteError(PGconn *connection, PGresult *result); extern PGconn * ConnectToNode(char *nodeName, int nodePort, char *nodeUser); diff --git a/src/test/regress/expected/multi_connection_cache.out b/src/test/regress/expected/multi_connection_cache.out index 79cf59dd2..9ac26c029 100644 --- a/src/test/regress/expected/multi_connection_cache.out +++ b/src/test/regress/expected/multi_connection_cache.out @@ -32,7 +32,7 @@ CREATE FUNCTION set_connection_status_bad(cstring, integer) \set VERBOSITY terse -- connect to non-existent host SELECT initialize_remote_temp_table('dummy-host-name', 12345); -WARNING: connection failed to dummy-host-name:12345 +WARNING: connection error: 
dummy-host-name:12345 initialize_remote_temp_table ------------------------------ f From 2374905c8958725fa2845aecbd39b3927ffd9a59 Mon Sep 17 00:00:00 2001 From: Andres Freund Date: Thu, 1 Dec 2016 14:36:42 -0800 Subject: [PATCH 5/5] Move multi_client_executor.[ch] on top of connection_management.[ch]. That way connections can be automatically closed after errors and such, and the connection management infrastructure gets wider testing. It also fixes a few issues around connection string building. --- .../executor/multi_client_executor.c | 180 +++++++----------- .../distributed/multi_client_executor.h | 2 - 2 files changed, 71 insertions(+), 111 deletions(-) diff --git a/src/backend/distributed/executor/multi_client_executor.c b/src/backend/distributed/executor/multi_client_executor.c index 50b25a6cf..539b8060d 100644 --- a/src/backend/distributed/executor/multi_client_executor.c +++ b/src/backend/distributed/executor/multi_client_executor.c @@ -23,6 +23,7 @@ #include "distributed/connection_management.h" #include "distributed/multi_client_executor.h" #include "distributed/multi_server_executor.h" +#include "distributed/remote_commands.h" #include #include @@ -39,7 +40,7 @@ /* Local pool to track active connections */ -static PGconn *ClientConnectionArray[MAX_CONNECTION_COUNT]; +static MultiConnection *ClientConnectionArray[MAX_CONNECTION_COUNT]; /* * The value at any position on ClientPollingStatusArray is only defined when @@ -49,8 +50,8 @@ static PostgresPollingStatusType ClientPollingStatusArray[MAX_CONNECTION_COUNT]; /* Local functions forward declarations */ -static void ClearRemainingResults(PGconn *connection); -static bool ClientConnectionReady(PGconn *connection, +static void ClearRemainingResults(MultiConnection *connection); +static bool ClientConnectionReady(MultiConnection *connection, PostgresPollingStatusType pollingStatus); @@ -64,7 +65,7 @@ AllocateConnectionId(void) /* allocate connectionId from connection pool */ for (connIndex = 0; connIndex <
MAX_CONNECTION_COUNT; connIndex++) { - PGconn *connection = ClientConnectionArray[connIndex]; + MultiConnection *connection = ClientConnectionArray[connIndex]; if (connection == NULL) { connectionId = connIndex; @@ -88,12 +89,16 @@ int32 MultiClientConnect(const char *nodeName, uint32 nodePort, const char *nodeDatabase, const char *userName) { - PGconn *connection = NULL; - char connInfoString[STRING_BUFFER_SIZE]; + MultiConnection *connection = NULL; ConnStatusType connStatusType = CONNECTION_OK; int32 connectionId = AllocateConnectionId(); - char *effectiveDatabaseName = NULL; - char *effectiveUserName = NULL; + int connectionFlags = FORCE_NEW_CONNECTION; /* no cached connections for now */ + + if (connectionId == INVALID_CONNECTION_ID) + { + ereport(WARNING, (errmsg("could not allocate connection in connection pool"))); + return connectionId; + } if (XactModificationLevel > XACT_MODIFICATION_NONE) { @@ -102,46 +107,11 @@ MultiClientConnect(const char *nodeName, uint32 nodePort, const char *nodeDataba "command within a transaction"))); } - if (connectionId == INVALID_CONNECTION_ID) - { - ereport(WARNING, (errmsg("could not allocate connection in connection pool"))); - return connectionId; - } - - if (nodeDatabase == NULL) - { - effectiveDatabaseName = get_database_name(MyDatabaseId); - } - else - { - effectiveDatabaseName = pstrdup(nodeDatabase); - } - - if (userName == NULL) - { - effectiveUserName = CurrentUserName(); - } - else - { - effectiveUserName = pstrdup(userName); - } - - /* - * FIXME: This code is bad on several levels. It completely forgoes any - * escaping, it misses setting a number of parameters, it works with a - * limited string size without erroring when it's too long. We shouldn't - * even build a query string this way, there's PQconnectdbParams()! 
- */ - - /* transcribe connection paremeters to string */ - snprintf(connInfoString, STRING_BUFFER_SIZE, CONN_INFO_TEMPLATE, - nodeName, nodePort, - effectiveDatabaseName, effectiveUserName, - CLIENT_CONNECT_TIMEOUT); - /* establish synchronous connection to worker node */ - connection = PQconnectdb(connInfoString); - connStatusType = PQstatus(connection); + connection = GetNodeUserDatabaseConnection(connectionFlags, nodeName, nodePort, + userName, nodeDatabase); + + connStatusType = PQstatus(connection->pgConn); if (connStatusType == CONNECTION_OK) { @@ -149,15 +119,11 @@ MultiClientConnect(const char *nodeName, uint32 nodePort, const char *nodeDataba } else { - WarnRemoteError(connection, NULL); - - PQfinish(connection); + ReportConnectionError(connection, WARNING); + CloseConnection(connection); connectionId = INVALID_CONNECTION_ID; } - pfree(effectiveDatabaseName); - pfree(effectiveUserName); - return connectionId; } @@ -170,12 +136,11 @@ MultiClientConnect(const char *nodeName, uint32 nodePort, const char *nodeDataba int32 MultiClientConnectStart(const char *nodeName, uint32 nodePort, const char *nodeDatabase) { - PGconn *connection = NULL; - char connInfoString[STRING_BUFFER_SIZE]; - ConnStatusType connStatusType = CONNECTION_BAD; - char *userName = CurrentUserName(); - + MultiConnection *connection = NULL; + ConnStatusType connStatusType = CONNECTION_OK; int32 connectionId = AllocateConnectionId(); + int connectionFlags = FORCE_NEW_CONNECTION; /* no cached connections for now */ + if (connectionId == INVALID_CONNECTION_ID) { ereport(WARNING, (errmsg("could not allocate connection in connection pool"))); @@ -189,13 +154,9 @@ MultiClientConnectStart(const char *nodeName, uint32 nodePort, const char *nodeD "command within a transaction"))); } - /* transcribe connection paremeters to string */ - snprintf(connInfoString, STRING_BUFFER_SIZE, CONN_INFO_TEMPLATE, - nodeName, nodePort, nodeDatabase, userName, CLIENT_CONNECT_TIMEOUT); - /* prepare asynchronous request 
for worker node connection */ - connection = PQconnectStart(connInfoString); - connStatusType = PQstatus(connection); + connection = StartNodeConnection(connectionFlags, nodeName, nodePort); + connStatusType = PQstatus(connection->pgConn); /* * If prepared, we save the connection, and set its initial polling status @@ -209,9 +170,9 @@ MultiClientConnectStart(const char *nodeName, uint32 nodePort, const char *nodeD } else { - WarnRemoteError(connection, NULL); + ReportConnectionError(connection, WARNING); + CloseConnection(connection); - PQfinish(connection); connectionId = INVALID_CONNECTION_ID; } @@ -223,7 +184,7 @@ MultiClientConnectStart(const char *nodeName, uint32 nodePort, const char *nodeD ConnectStatus MultiClientConnectPoll(int32 connectionId) { - PGconn *connection = NULL; + MultiConnection *connection = NULL; PostgresPollingStatusType pollingStatus = PGRES_POLLING_OK; ConnectStatus connectStatus = CLIENT_INVALID_CONNECT; @@ -241,7 +202,7 @@ MultiClientConnectPoll(int32 connectionId) bool readReady = ClientConnectionReady(connection, PGRES_POLLING_READING); if (readReady) { - ClientPollingStatusArray[connectionId] = PQconnectPoll(connection); + ClientPollingStatusArray[connectionId] = PQconnectPoll(connection->pgConn); connectStatus = CLIENT_CONNECTION_BUSY; } else @@ -254,7 +215,7 @@ MultiClientConnectPoll(int32 connectionId) bool writeReady = ClientConnectionReady(connection, PGRES_POLLING_WRITING); if (writeReady) { - ClientPollingStatusArray[connectionId] = PQconnectPoll(connection); + ClientPollingStatusArray[connectionId] = PQconnectPoll(connection->pgConn); connectStatus = CLIENT_CONNECTION_BUSY; } else @@ -264,7 +225,7 @@ MultiClientConnectPoll(int32 connectionId) } else if (pollingStatus == PGRES_POLLING_FAILED) { - WarnRemoteError(connection, NULL); + ReportConnectionError(connection, WARNING); connectStatus = CLIENT_CONNECTION_BAD; } @@ -277,14 +238,14 @@ MultiClientConnectPoll(int32 connectionId) void MultiClientDisconnect(int32 connectionId) 
{ - PGconn *connection = NULL; + MultiConnection *connection = NULL; const int InvalidPollingStatus = -1; Assert(connectionId != INVALID_CONNECTION_ID); connection = ClientConnectionArray[connectionId]; Assert(connection != NULL); - PQfinish(connection); + CloseConnection(connection); ClientConnectionArray[connectionId] = NULL; ClientPollingStatusArray[connectionId] = InvalidPollingStatus; @@ -298,7 +259,7 @@ MultiClientDisconnect(int32 connectionId) bool MultiClientConnectionUp(int32 connectionId) { - PGconn *connection = NULL; + MultiConnection *connection = NULL; ConnStatusType connStatusType = CONNECTION_OK; bool connectionUp = true; @@ -306,7 +267,7 @@ MultiClientConnectionUp(int32 connectionId) connection = ClientConnectionArray[connectionId]; Assert(connection != NULL); - connStatusType = PQstatus(connection); + connStatusType = PQstatus(connection->pgConn); if (connStatusType == CONNECTION_BAD) { connectionUp = false; @@ -340,7 +301,7 @@ MultiClientExecute(int32 connectionId, const char *query, void **queryResult, bool MultiClientSendQuery(int32 connectionId, const char *query) { - PGconn *connection = NULL; + MultiConnection *connection = NULL; bool success = true; int querySent = 0; @@ -348,10 +309,10 @@ MultiClientSendQuery(int32 connectionId, const char *query) connection = ClientConnectionArray[connectionId]; Assert(connection != NULL); - querySent = PQsendQuery(connection, query); + querySent = PQsendQuery(connection->pgConn, query); if (querySent == 0) { - char *errorMessage = PQerrorMessage(connection); + char *errorMessage = PQerrorMessage(connection->pgConn); ereport(WARNING, (errmsg("could not send remote query \"%s\"", query), errdetail("Client error: %s", errorMessage))); @@ -366,7 +327,7 @@ MultiClientSendQuery(int32 connectionId, const char *query) bool MultiClientCancel(int32 connectionId) { - PGconn *connection = NULL; + MultiConnection *connection = NULL; PGcancel *cancelObject = NULL; int cancelSent = 0; bool canceled = true; @@ -376,7 
+337,7 @@ MultiClientCancel(int32 connectionId) connection = ClientConnectionArray[connectionId]; Assert(connection != NULL); - cancelObject = PQgetCancel(connection); + cancelObject = PQgetCancel(connection->pgConn); cancelSent = PQcancel(cancelObject, errorBuffer, sizeof(errorBuffer)); if (cancelSent == 0) @@ -397,7 +358,7 @@ MultiClientCancel(int32 connectionId) ResultStatus MultiClientResultStatus(int32 connectionId) { - PGconn *connection = NULL; + MultiConnection *connection = NULL; int consumed = 0; ConnStatusType connStatusType = CONNECTION_OK; ResultStatus resultStatus = CLIENT_INVALID_RESULT_STATUS; @@ -406,7 +367,7 @@ MultiClientResultStatus(int32 connectionId) connection = ClientConnectionArray[connectionId]; Assert(connection != NULL); - connStatusType = PQstatus(connection); + connStatusType = PQstatus(connection->pgConn); if (connStatusType == CONNECTION_BAD) { ereport(WARNING, (errmsg("could not maintain connection to worker node"))); @@ -414,10 +375,10 @@ MultiClientResultStatus(int32 connectionId) } /* consume input to allow status change */ - consumed = PQconsumeInput(connection); + consumed = PQconsumeInput(connection->pgConn); if (consumed != 0) { - int connectionBusy = PQisBusy(connection); + int connectionBusy = PQisBusy(connection->pgConn); if (connectionBusy == 0) { resultStatus = CLIENT_RESULT_READY; @@ -442,7 +403,7 @@ bool MultiClientQueryResult(int32 connectionId, void **queryResult, int *rowCount, int *columnCount) { - PGconn *connection = NULL; + MultiConnection *connection = NULL; PGresult *result = NULL; ConnStatusType connStatusType = CONNECTION_OK; ExecStatusType resultStatus = PGRES_COMMAND_OK; @@ -451,14 +412,14 @@ MultiClientQueryResult(int32 connectionId, void **queryResult, int *rowCount, connection = ClientConnectionArray[connectionId]; Assert(connection != NULL); - connStatusType = PQstatus(connection); + connStatusType = PQstatus(connection->pgConn); if (connStatusType == CONNECTION_BAD) { ereport(WARNING, (errmsg("could 
not maintain connection to worker node"))); return false; } - result = PQgetResult(connection); + result = PQgetResult(connection->pgConn); resultStatus = PQresultStatus(result); if (resultStatus == PGRES_TUPLES_OK) { @@ -468,7 +429,7 @@ MultiClientQueryResult(int32 connectionId, void **queryResult, int *rowCount, } else { - WarnRemoteError(connection, result); + ReportResultError(connection, result, WARNING); PQclear(result); return false; @@ -494,7 +455,7 @@ BatchQueryStatus MultiClientBatchResult(int32 connectionId, void **queryResult, int *rowCount, int *columnCount) { - PGconn *connection = NULL; + MultiConnection *connection = NULL; PGresult *result = NULL; ConnStatusType connStatusType = CONNECTION_OK; ExecStatusType resultStatus = PGRES_COMMAND_OK; @@ -509,14 +470,14 @@ MultiClientBatchResult(int32 connectionId, void **queryResult, int *rowCount, (*rowCount) = -1; (*columnCount) = -1; - connStatusType = PQstatus(connection); + connStatusType = PQstatus(connection->pgConn); if (connStatusType == CONNECTION_BAD) { ereport(WARNING, (errmsg("could not maintain connection to worker node"))); return CLIENT_BATCH_QUERY_FAILED; } - result = PQgetResult(connection); + result = PQgetResult(connection->pgConn); if (result == NULL) { return CLIENT_BATCH_QUERY_DONE; @@ -537,7 +498,7 @@ MultiClientBatchResult(int32 connectionId, void **queryResult, int *rowCount, } else { - WarnRemoteError(connection, result); + ReportResultError(connection, result, WARNING); PQclear(result); queryStatus = CLIENT_BATCH_QUERY_FAILED; } @@ -576,7 +537,7 @@ MultiClientClearResult(void *queryResult) QueryStatus MultiClientQueryStatus(int32 connectionId) { - PGconn *connection = NULL; + MultiConnection *connection = NULL; PGresult *result = NULL; int tupleCount PG_USED_FOR_ASSERTS_ONLY = 0; bool copyResults = false; @@ -588,7 +549,7 @@ MultiClientQueryStatus(int32 connectionId) connection = ClientConnectionArray[connectionId]; Assert(connection != NULL); - connStatusType = 
PQstatus(connection); + connStatusType = PQstatus(connection->pgConn); if (connStatusType == CONNECTION_BAD) { ereport(WARNING, (errmsg("could not maintain connection to worker node"))); @@ -600,7 +561,7 @@ MultiClientQueryStatus(int32 connectionId) * isn't ready yet (the caller didn't wait for the connection to be ready), * we will block on this call. */ - result = PQgetResult(connection); + result = PQgetResult(connection->pgConn); resultStatus = PQresultStatus(result); if (resultStatus == PGRES_COMMAND_OK) @@ -631,7 +592,7 @@ MultiClientQueryStatus(int32 connectionId) copyResults = true; } - WarnRemoteError(connection, result); + ReportResultError(connection, result, WARNING); } /* clear the result object */ @@ -654,7 +615,7 @@ MultiClientQueryStatus(int32 connectionId) CopyStatus MultiClientCopyData(int32 connectionId, int32 fileDescriptor) { - PGconn *connection = NULL; + MultiConnection *connection = NULL; char *receiveBuffer = NULL; int consumed = 0; int receiveLength = 0; @@ -669,7 +630,7 @@ MultiClientCopyData(int32 connectionId, int32 fileDescriptor) * Consume input to handle the case where previous copy operation might have * received zero bytes. 
*/ - consumed = PQconsumeInput(connection); + consumed = PQconsumeInput(connection->pgConn); if (consumed == 0) { ereport(WARNING, (errmsg("could not read data from worker node"))); @@ -677,7 +638,7 @@ MultiClientCopyData(int32 connectionId, int32 fileDescriptor) } /* receive copy data message in an asynchronous manner */ - receiveLength = PQgetCopyData(connection, &receiveBuffer, asynchronous); + receiveLength = PQgetCopyData(connection->pgConn, &receiveBuffer, asynchronous); while (receiveLength > 0) { /* received copy data; append these data to file */ @@ -698,7 +659,7 @@ MultiClientCopyData(int32 connectionId, int32 fileDescriptor) PQfreemem(receiveBuffer); - receiveLength = PQgetCopyData(connection, &receiveBuffer, asynchronous); + receiveLength = PQgetCopyData(connection->pgConn, &receiveBuffer, asynchronous); } /* we now check the last received length returned by copy data */ @@ -710,7 +671,7 @@ MultiClientCopyData(int32 connectionId, int32 fileDescriptor) else if (receiveLength == -1) { /* received copy done message */ - PGresult *result = PQgetResult(connection); + PGresult *result = PQgetResult(connection->pgConn); ExecStatusType resultStatus = PQresultStatus(result); if (resultStatus == PGRES_COMMAND_OK) @@ -721,7 +682,7 @@ MultiClientCopyData(int32 connectionId, int32 fileDescriptor) { copyStatus = CLIENT_COPY_FAILED; - WarnRemoteError(connection, result); + ReportResultError(connection, result, WARNING); } PQclear(result); @@ -731,7 +692,7 @@ MultiClientCopyData(int32 connectionId, int32 fileDescriptor) /* received an error */ copyStatus = CLIENT_COPY_FAILED; - WarnRemoteError(connection, NULL); + ReportConnectionError(connection, WARNING); } /* if copy out completed, make sure we drain all results from libpq */ @@ -794,7 +755,7 @@ void MultiClientRegisterWait(WaitInfo *waitInfo, TaskExecutionStatus executionStatus, int32 connectionId) { - PGconn *connection = NULL; + MultiConnection *connection = NULL; struct pollfd *pollfd = NULL; 
Assert(waitInfo->registeredWaiters < waitInfo->maxWaiters); @@ -812,7 +773,7 @@ MultiClientRegisterWait(WaitInfo *waitInfo, TaskExecutionStatus executionStatus, connection = ClientConnectionArray[connectionId]; pollfd = &waitInfo->pollfds[waitInfo->registeredWaiters]; - pollfd->fd = PQsocket(connection); + pollfd->fd = PQsocket(connection->pgConn); if (executionStatus == TASK_STATUS_SOCKET_READ) { pollfd->events = POLLERR | POLLIN; @@ -904,13 +865,13 @@ MultiClientWait(WaitInfo *waitInfo) * query. */ static void -ClearRemainingResults(PGconn *connection) +ClearRemainingResults(MultiConnection *connection) { - PGresult *result = PQgetResult(connection); + PGresult *result = PQgetResult(connection->pgConn); while (result != NULL) { PQclear(result); - result = PQgetResult(connection); + result = PQgetResult(connection->pgConn); } } @@ -921,7 +882,8 @@ ClearRemainingResults(PGconn *connection) * and libpq_select() at libpqwalreceiver.c. */ static bool -ClientConnectionReady(PGconn *connection, PostgresPollingStatusType pollingStatus) +ClientConnectionReady(MultiConnection *connection, + PostgresPollingStatusType pollingStatus) { bool clientConnectionReady = false; int pollResult = 0; @@ -942,7 +904,7 @@ ClientConnectionReady(PGconn *connection, PostgresPollingStatusType pollingStatu pollEventMask = POLLERR | POLLOUT; } - pollFileDescriptor.fd = PQsocket(connection); + pollFileDescriptor.fd = PQsocket(connection->pgConn); pollFileDescriptor.events = pollEventMask; pollFileDescriptor.revents = 0; diff --git a/src/include/distributed/multi_client_executor.h b/src/include/distributed/multi_client_executor.h index e9658ee22..56d889f4d 100644 --- a/src/include/distributed/multi_client_executor.h +++ b/src/include/distributed/multi_client_executor.h @@ -15,10 +15,8 @@ #define MULTI_CLIENT_EXECUTOR_H #define INVALID_CONNECTION_ID -1 /* identifies an invalid connection */ -#define CLIENT_CONNECT_TIMEOUT 5 /* connection timeout in seconds */ #define MAX_CONNECTION_COUNT 2048 /* 
simultaneous client connection count */ #define STRING_BUFFER_SIZE 1024 /* buffer size for character arrays */ -#define CONN_INFO_TEMPLATE "host=%s port=%u dbname=%s user=%s connect_timeout=%u" /* Enumeration to track one client connection's status */