diff --git a/src/backend/distributed/Makefile b/src/backend/distributed/Makefile index c0210b92d..ee5b115ef 100644 --- a/src/backend/distributed/Makefile +++ b/src/backend/distributed/Makefile @@ -16,7 +16,7 @@ DATA = $(patsubst $(citus_abs_srcdir)/%.sql,%.sql,$(wildcard $(citus_abs_srcdir) DATA_built = $(foreach v,$(EXTVERSIONS),$(EXTENSION)--$(v).sql) # directories with source files -SUBDIRS = . commands executor master metadata planner relay test transaction utils worker +SUBDIRS = . commands connection executor master metadata planner relay test transaction utils worker # That patsubst rule searches all directories listed in SUBDIRS for .c # files, and adds the corresponding .o files to OBJS diff --git a/src/backend/distributed/commands/multi_copy.c b/src/backend/distributed/commands/multi_copy.c index f8745d466..a36423881 100644 --- a/src/backend/distributed/commands/multi_copy.c +++ b/src/backend/distributed/commands/multi_copy.c @@ -329,13 +329,13 @@ CopyFromWorkerNode(CopyStmt *copyStatement, char *completionTag) PQclear(queryResult); /* close the connection */ - PQfinish(masterConnection); + CloseConnectionByPGconn(masterConnection); masterConnection = NULL; } PG_CATCH(); { /* close the connection */ - PQfinish(masterConnection); + CloseConnectionByPGconn(masterConnection); masterConnection = NULL; PG_RE_THROW(); diff --git a/src/backend/distributed/connection/connection_management.c b/src/backend/distributed/connection/connection_management.c new file mode 100644 index 000000000..b023fcbec --- /dev/null +++ b/src/backend/distributed/connection/connection_management.c @@ -0,0 +1,638 @@ +/*------------------------------------------------------------------------- + * + * connection_management.c + * Central management of connections and their life-cycle + * + * Copyright (c) 2016, Citus Data, Inc. + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#ifdef HAVE_POLL_H +#include +#endif + +#include "libpq-fe.h" + +#include "miscadmin.h" + +#include "access/hash.h" +#include "commands/dbcommands.h" +#include "distributed/connection_management.h" +#include "distributed/metadata_cache.h" +#include "distributed/hash_helpers.h" +#include "mb/pg_wchar.h" +#include "utils/hsearch.h" +#include "utils/memutils.h" + + +HTAB *ConnectionHash = NULL; +MemoryContext ConnectionContext = NULL; + +static uint32 ConnectionHashHash(const void *key, Size keysize); +static int ConnectionHashCompare(const void *a, const void *b, Size keysize); +static MultiConnection * StartConnectionEstablishment(ConnectionHashKey *key); +static void AfterXactHostConnectionHandling(ConnectionHashEntry *entry, bool isCommit); +static MultiConnection * FindAvailableConnection(dlist_head *connections, uint32 flags); + + +/* + * Initialize per-backend connection management infrastructure. + */ +void +InitializeConnectionManagement(void) +{ + HASHCTL info; + uint32 hashFlags = 0; + + + /* + * Create a single context for connection and transaction related memory + * management. Doing so, instead of allocating in TopMemoryContext, makes + * it easier to associate used memory. + */ + ConnectionContext = AllocSetContextCreate(TopMemoryContext, "Connection Context", + ALLOCSET_DEFAULT_MINSIZE, + ALLOCSET_DEFAULT_INITSIZE, + ALLOCSET_DEFAULT_MAXSIZE); + + /* create (host,port,user,database) -> [connection] hash */ + memset(&info, 0, sizeof(info)); + info.keysize = sizeof(ConnectionHashKey); + info.entrysize = sizeof(ConnectionHashEntry); + info.hash = ConnectionHashHash; + info.match = ConnectionHashCompare; + info.hcxt = ConnectionContext; + hashFlags = (HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT | HASH_COMPARE); + + ConnectionHash = hash_create("citus connection cache (host,port,user,database)", + 64, &info, hashFlags); +} + + +/* + * Perform connection management activity after the end of a transaction. Both + * COMMIT and ABORT paths are handled here. + * + * This is called by Citus' global transaction callback. + */ +void +AfterXactConnectionHandling(bool isCommit) +{ + HASH_SEQ_STATUS status; + ConnectionHashEntry *entry; + + hash_seq_init(&status, ConnectionHash); + while ((entry = (ConnectionHashEntry *) hash_seq_search(&status)) != 0) + { + AfterXactHostConnectionHandling(entry, isCommit); + + /* + * NB: We leave the hash entry in place, even if there's no individual + * connections in it anymore. There seems no benefit in deleting it, + * and it'll save a bit of work in the next transaction. + */ + } +} + + +/* + * GetNodeConnection() establishes a connection to remote node, using default + * user and database. + * + * See StartNodeUserDatabaseConnection for details. + */ +MultiConnection * +GetNodeConnection(uint32 flags, const char *hostname, int32 port) +{ + return GetNodeUserDatabaseConnection(flags, hostname, port, NULL, NULL); +} + + +/* + * StartNodeConnection initiates a connection to remote node, using default + * user and database. + * + * See StartNodeUserDatabaseConnection for details. + */ +MultiConnection * +StartNodeConnection(uint32 flags, const char *hostname, int32 port) +{ + return StartNodeUserDatabaseConnection(flags, hostname, port, NULL, NULL); +} + + +/* + * GetNodeUserDatabaseConnection establishes connection to remote node. + * + * See StartNodeUserDatabaseConnection for details. + */ +MultiConnection * +GetNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port, const + char *user, const char *database) +{ + MultiConnection *connection; + + connection = StartNodeUserDatabaseConnection(flags, hostname, port, user, database); + + FinishConnectionEstablishment(connection); + + return connection; +} + + +/* + * StartNodeUserDatabaseConnection() initiates a connection to a remote node. + * + * If user or database are NULL, the current session's defaults are used. The + * following flags influence connection establishment behaviour: + * - SESSION_LIFESPAN - the connection should persist after transaction end + * - FORCE_NEW_CONNECTION - a new connection is required + * + * The returned connection has only been initiated, not fully + * established. That's useful to allow parallel connection establishment. If + * that's not desired use the Get* variant. + */ +MultiConnection * +StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port, const + char *user, const char *database) +{ + ConnectionHashKey key; + ConnectionHashEntry *entry = NULL; + MultiConnection *connection; + MemoryContext oldContext; + bool found; + + /* do some minimal input checks */ + strlcpy(key.hostname, hostname, MAX_NODE_LENGTH); + if (strlen(hostname) > MAX_NODE_LENGTH) + { + ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("hostname exceeds the maximum length of %d", + MAX_NODE_LENGTH))); + } + + key.port = port; + if (user) + { + strlcpy(key.user, user, NAMEDATALEN); + } + else + { + strlcpy(key.user, CurrentUserName(), NAMEDATALEN); + } + if (database) + { + strlcpy(key.database, database, NAMEDATALEN); + } + else + { + strlcpy(key.database, get_database_name(MyDatabaseId), NAMEDATALEN); + } + + if (CurrentCoordinatedTransactionState == COORD_TRANS_NONE) + { + CurrentCoordinatedTransactionState = COORD_TRANS_IDLE; + } + + /* + * Lookup relevant hash entry. We always enter. If only a cached + * connection is desired, and there's none, we'll simply leave the + * connection list empty. + */ + + entry = hash_search(ConnectionHash, &key, HASH_ENTER, &found); + if (!found) + { + entry->connections = MemoryContextAlloc(ConnectionContext, + sizeof(dlist_head)); + dlist_init(entry->connections); + } + + /* if desired, check whether there's a usable connection */ + if (!(flags & FORCE_NEW_CONNECTION)) + { + /* check connection cache for a connection that's not already in use */ + connection = FindAvailableConnection(entry->connections, flags); + if (connection) + { + if (flags & SESSION_LIFESPAN) + { + connection->sessionLifespan = true; + } + + return connection; + } + } + + /* + * Either no caching desired, or no pre-established, non-claimed, + * connection present. Initiate connection establishment. + */ + connection = StartConnectionEstablishment(&key); + + oldContext = MemoryContextSwitchTo(ConnectionContext); + dlist_push_tail(entry->connections, &connection->connectionNode); + + MemoryContextSwitchTo(oldContext); + + if (flags & SESSION_LIFESPAN) + { + connection->sessionLifespan = true; + } + + return connection; +} + + +/* StartNodeUserDatabaseConnection() helper */ +static MultiConnection * +FindAvailableConnection(dlist_head *connections, uint32 flags) +{ + dlist_iter iter; + + dlist_foreach(iter, connections) + { + MultiConnection *connection = + dlist_container(MultiConnection, connectionNode, iter.cur); + + /* don't return claimed connections */ + if (connection->claimedExclusively) + { + continue; + } + + return connection; + } + + return NULL; +} + + +/* + * Return MultiConnection associated with the libpq connection. + * + * Note that this is comparatively expensive. Should only be used for + * backward-compatibility purposes. + */ +MultiConnection * +GetConnectionFromPGconn(struct pg_conn *pqConn) +{ + HASH_SEQ_STATUS status; + ConnectionHashEntry *entry; + + hash_seq_init(&status, ConnectionHash); + while ((entry = (ConnectionHashEntry *) hash_seq_search(&status)) != 0) + { + dlist_head *connections = entry->connections; + dlist_iter iter; + + /* check connection cache for a connection that's not already in use */ + dlist_foreach(iter, connections) + { + MultiConnection *connection = + dlist_container(MultiConnection, connectionNode, iter.cur); + + if (connection->pgConn == pqConn) + { + hash_seq_term(&status); + return connection; + } + } + } + + return NULL; +} + + +/* + * Close a previously established connection. + */ +void +CloseConnection(MultiConnection *connection) +{ + ConnectionHashKey key; + bool found; + + /* close connection */ + PQfinish(connection->pgConn); + connection->pgConn = NULL; + + strlcpy(key.hostname, connection->hostname, MAX_NODE_LENGTH); + key.port = connection->port; + strlcpy(key.user, connection->user, NAMEDATALEN); + strlcpy(key.database, connection->database, NAMEDATALEN); + + hash_search(ConnectionHash, &key, HASH_FIND, &found); + + if (found) + { + /* unlink from list */ + dlist_delete(&connection->connectionNode); + + /* we leave the per-host entry alive */ + pfree(connection); + } + else + { + ereport(ERROR, (errmsg("closing untracked connection"))); + } +} + + +/* + * Close a previously established connection. + * + * This function closes the MultiConnection associatated with the libpq + * connection. + * + * Note that this is comparatively expensive. Should only be used for + * backward-compatibility purposes. + */ +void +CloseConnectionByPGconn(PGconn *pqConn) +{ + MultiConnection *connection = GetConnectionFromPGconn(pqConn); + + if (connection) + { + CloseConnection(connection); + } + else + { + ereport(WARNING, (errmsg("could not find connection to close"))); + } +} + + +/* + * Synchronously finish connection establishment of an individual connection. + * + * TODO: Replace with variant waiting for multiple connections. + */ +void +FinishConnectionEstablishment(MultiConnection *connection) +{ + static int checkIntervalMS = 200; + + /* + * Loop until connection is established, or failed (possibly just timed + * out). + */ + while (true) + { + ConnStatusType status = PQstatus(connection->pgConn); + PostgresPollingStatusType pollmode; + + if (status == CONNECTION_OK) + { + return; + } + + /* FIXME: retries? */ + if (status == CONNECTION_BAD) + { + return; + } + + pollmode = PQconnectPoll(connection->pgConn); + + /* + * FIXME: Do we want to add transparent retry support here? + */ + if (pollmode == PGRES_POLLING_FAILED) + { + return; + } + else if (pollmode == PGRES_POLLING_OK) + { + return; + } + else + { + Assert(pollmode == PGRES_POLLING_WRITING || + pollmode == PGRES_POLLING_READING); + } + + /* Loop, to handle poll() being interrupted by signals (EINTR) */ + while (true) + { + struct pollfd pollFileDescriptor; + int pollResult = 0; + + pollFileDescriptor.fd = PQsocket(connection->pgConn); + if (pollmode == PGRES_POLLING_READING) + { + pollFileDescriptor.events = POLLIN; + } + else + { + pollFileDescriptor.events = POLLOUT; + } + pollFileDescriptor.revents = 0; + + /* + * Only sleep for a limited amount of time, so we can react to + * interrupts in time, even if the platform doesn't interrupt + * poll() after signal arrival. + */ + pollResult = poll(&pollFileDescriptor, 1, checkIntervalMS); + + if (pollResult == 0) + { + /* + * Timeout exceeded. Two things to do: + * - check whether any interrupts arrived and handle them + * - check whether establishment for connection already has + * lasted for too long, stop waiting if so. + */ + CHECK_FOR_INTERRUPTS(); + + if (TimestampDifferenceExceeds(connection->connectionStart, + GetCurrentTimestamp(), + CLIENT_CONNECT_TIMEOUT_SECONDS_INT * 1000)) + { + ereport(WARNING, (errmsg("could not establish connection after %u ms", + CLIENT_CONNECT_TIMEOUT_SECONDS_INT * 1000))); + + /* close connection, otherwise we take up resource on the other side */ + PQfinish(connection->pgConn); + connection->pgConn = NULL; + break; + } + } + else if (pollResult > 0) + { + /* + * IO possible, continue connection establishment. We could + * check for timeouts here as well, but if there's progress + * there seems little point. + */ + break; + } + else if (pollResult != EINTR) + { + /* Retrying, signal interrupted. So check. */ + CHECK_FOR_INTERRUPTS(); + } + else + { + /* + * We ERROR here, instead of just returning a failed + * connection, because this shouldn't happen, and indicates a + * programming error somewhere, not a network etc. issue. + */ + ereport(ERROR, (errcode_for_socket_access(), + errmsg("poll() failed: %m"))); + } + } + } +} + + +/* + * ClaimConnectionExclusively signals that this connection is actively being + * used. That means it'll not be, again, returned by + * StartNodeUserDatabaseConnection() et al until releases with + * UnclaimConnection(). + */ +void +ClaimConnectionExclusively(MultiConnection *connection) +{ + Assert(!connection->claimedExclusively); + connection->claimedExclusively = true; +} + + +/* + * UnclaimConnection signals that this connection is not being used + * anymore. That means it again may be returned by + * StartNodeUserDatabaseConnection() et al. + */ +void +UnclaimConnection(MultiConnection *connection) +{ + connection->claimedExclusively = false; +} + + +static uint32 +ConnectionHashHash(const void *key, Size keysize) +{ + ConnectionHashKey *entry = (ConnectionHashKey *) key; + uint32 hash = 0; + + hash = string_hash(entry->hostname, NAMEDATALEN); + hash = hash_combine(hash, hash_uint32(entry->port)); + hash = hash_combine(hash, string_hash(entry->user, NAMEDATALEN)); + hash = hash_combine(hash, string_hash(entry->database, NAMEDATALEN)); + + return hash; +} + + +static int +ConnectionHashCompare(const void *a, const void *b, Size keysize) +{ + ConnectionHashKey *ca = (ConnectionHashKey *) a; + ConnectionHashKey *cb = (ConnectionHashKey *) b; + + if (strncmp(ca->hostname, cb->hostname, NAMEDATALEN) != 0 || + ca->port != cb->port || + strncmp(ca->user, cb->user, NAMEDATALEN) != 0 || + strncmp(ca->database, cb->database, NAMEDATALEN) != 0) + { + return 1; + } + else + { + return 0; + } +} + + +/* + * Asynchronously establish connection to a remote node, but don't wait for + * that to finish. DNS lookups etc. are performed synchronously though. + */ +static MultiConnection * +StartConnectionEstablishment(ConnectionHashKey *key) +{ + char nodePortString[12]; + const char *clientEncoding = GetDatabaseEncodingName(); + MultiConnection *connection = NULL; + + const char *keywords[] = { + "host", "port", "dbname", "user", + "client_encoding", "fallback_application_name", + NULL + }; + const char *values[] = { + key->hostname, nodePortString, key->database, key->user, + clientEncoding, "citus", NULL + }; + + connection = MemoryContextAllocZero(ConnectionContext, sizeof(MultiConnection)); + sprintf(nodePortString, "%d", key->port); + + strlcpy(connection->hostname, key->hostname, MAX_NODE_LENGTH); + connection->port = key->port; + strlcpy(connection->database, key->database, NAMEDATALEN); + strlcpy(connection->user, key->user, NAMEDATALEN); + + connection->pgConn = PQconnectStartParams(keywords, values, false); + connection->connectionStart = GetCurrentTimestamp(); + + return connection; +} + + +/* + * Close all remote connections if necessary anymore (i.e. not session + * lifetime), or if in a failed state. + */ +static void +AfterXactHostConnectionHandling(ConnectionHashEntry *entry, bool isCommit) +{ + dlist_mutable_iter iter; + + dlist_foreach_modify(iter, entry->connections) + { + MultiConnection *connection = + dlist_container(MultiConnection, connectionNode, iter.cur); + + /* + * To avoid code leaking connections we warn if connections are + * still claimed exclusively. We can only do so if the transaction + * committed, as it's normal that code didn't have chance to clean + * up after errors. + */ + if (isCommit && connection->claimedExclusively) + { + ereport(WARNING, + (errmsg("connection claimed exclusively at transaction commit"))); + } + + /* + * Preserve session lifespan connections if they are still healthy. + */ + if (!connection->sessionLifespan || + PQstatus(connection->pgConn) != CONNECTION_OK || + PQtransactionStatus(connection->pgConn) != PQTRANS_IDLE) + { + PQfinish(connection->pgConn); + connection->pgConn = NULL; + + /* unlink from list */ + dlist_delete(iter.cur); + + pfree(connection); + } + else + { + UnclaimConnection(connection); + } + } +} diff --git a/src/backend/distributed/connection/remote_commands.c b/src/backend/distributed/connection/remote_commands.c new file mode 100644 index 000000000..22ba1c9f7 --- /dev/null +++ b/src/backend/distributed/connection/remote_commands.c @@ -0,0 +1,195 @@ +/*------------------------------------------------------------------------- + * + * remote_commands.c + * Helpers to make it easier to execute command on remote nodes. + * + * Copyright (c) 2016, Citus Data, Inc. + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "libpq-fe.h" + +#include "distributed/connection_management.h" +#include "distributed/remote_commands.h" + + +/* GUC, determining whether statements sent to remote nodes are logged */ +bool LogRemoteCommands = false; + + +/* simple helpers */ + +/* + * IsResponseOK checks whether the result is a successful one. + */ +bool +IsResponseOK(PGresult *result) +{ + ExecStatusType resultStatus = PQresultStatus(result); + + if (resultStatus == PGRES_SINGLE_TUPLE || resultStatus == PGRES_TUPLES_OK || + resultStatus == PGRES_COMMAND_OK) + { + return true; + } + + return false; +} + + +/* + * ForgetResults clears a connection from pending activity. + * + * XXX: In the future it might be a good idea to use use PQcancel() if results + * would require network IO. + */ +void +ForgetResults(MultiConnection *connection) +{ + while (true) + { + PGresult *result = NULL; + result = PQgetResult(connection->pgConn); + if (result == NULL) + { + break; + } + if (PQresultStatus(result) == PGRES_COPY_IN) + { + PQputCopyEnd(connection->pgConn, NULL); + + /* TODO: mark transaction as failed, once we can. */ + } + PQclear(result); + } +} + + +/* + * SqlStateMatchesCategory returns true if the given sql state (which may be + * NULL if unknown) is in the given error category. Note that we use + * ERRCODE_TO_CATEGORY macro to determine error category of the sql state and + * expect the caller to use the same macro for the error category. + */ +bool +SqlStateMatchesCategory(char *sqlStateString, int category) +{ + bool sqlStateMatchesCategory = false; + int sqlState = 0; + int sqlStateCategory = 0; + + if (sqlStateString == NULL) + { + return false; + } + + sqlState = MAKE_SQLSTATE(sqlStateString[0], sqlStateString[1], sqlStateString[2], + sqlStateString[3], sqlStateString[4]); + + sqlStateCategory = ERRCODE_TO_CATEGORY(sqlState); + if (sqlStateCategory == category) + { + sqlStateMatchesCategory = true; + } + + return sqlStateMatchesCategory; +} + + +/* report errors & warnings */ + +/* + * Report libpq failure that's not associated with a result. + */ +void +ReportConnectionError(MultiConnection *connection, int elevel) +{ + char *nodeName = connection->hostname; + int nodePort = connection->port; + + ereport(elevel, (errmsg("connection error: %s:%d", nodeName, nodePort), + errdetail("%s", PQerrorMessage(connection->pgConn)))); +} + + +/* + * ReportResultError reports libpq failure associated with a result. + */ +void +ReportResultError(MultiConnection *connection, PGresult *result, int elevel) +{ + char *sqlStateString = PQresultErrorField(result, PG_DIAG_SQLSTATE); + char *messagePrimary = PQresultErrorField(result, PG_DIAG_MESSAGE_PRIMARY); + char *messageDetail = PQresultErrorField(result, PG_DIAG_MESSAGE_DETAIL); + char *messageHint = PQresultErrorField(result, PG_DIAG_MESSAGE_HINT); + char *messageContext = PQresultErrorField(result, PG_DIAG_CONTEXT); + + char *nodeName = connection->hostname; + int nodePort = connection->port; + int sqlState = ERRCODE_INTERNAL_ERROR; + + if (sqlStateString != NULL) + { + sqlState = MAKE_SQLSTATE(sqlStateString[0], sqlStateString[1], sqlStateString[2], + sqlStateString[3], sqlStateString[4]); + } + + /* + * If the PGresult did not contain a message, the connection may provide a + * suitable top level one. At worst, this is an empty string. + */ + if (messagePrimary == NULL) + { + char *lastNewlineIndex = NULL; + + messagePrimary = PQerrorMessage(connection->pgConn); + lastNewlineIndex = strrchr(messagePrimary, '\n'); + + /* trim trailing newline, if any */ + if (lastNewlineIndex != NULL) + { + *lastNewlineIndex = '\0'; + } + } + + ereport(elevel, (errcode(sqlState), errmsg("%s", messagePrimary), + messageDetail ? errdetail("%s", messageDetail) : 0, + messageHint ? errhint("%s", messageHint) : 0, + messageContext ? errcontext("%s", messageContext) : 0, + errcontext("while executing command on %s:%d", + nodeName, nodePort))); +} + + +/* + * LogRemoteCommand logs commands send to remote nodes if + * citus.log_remote_commands wants us to do so. + */ +void +LogRemoteCommand(MultiConnection *connection, const char *command) +{ + if (!LogRemoteCommands) + { + return; + } + + ereport(LOG, (errmsg("issuing %s", command), + errdetail("on server %s:%d", connection->hostname, connection->port))); +} + + +/* wrappers around libpq functions, with command logging support */ + +/* + * SendRemoteCommand is a tiny PQsendQuery wrapper that logs remote commands, + * and accepts a MultiConnection instead of a plain PGconn. + */ +int +SendRemoteCommand(MultiConnection *connection, const char *command) +{ + LogRemoteCommand(connection, command); + return PQsendQuery(connection->pgConn, command); +} diff --git a/src/backend/distributed/executor/multi_client_executor.c b/src/backend/distributed/executor/multi_client_executor.c index da852d5c1..539b8060d 100644 --- a/src/backend/distributed/executor/multi_client_executor.c +++ b/src/backend/distributed/executor/multi_client_executor.c @@ -20,8 +20,10 @@ #include "commands/dbcommands.h" #include "distributed/metadata_cache.h" #include "distributed/connection_cache.h" +#include "distributed/connection_management.h" #include "distributed/multi_client_executor.h" #include "distributed/multi_server_executor.h" +#include "distributed/remote_commands.h" #include #include @@ -38,7 +40,7 @@ /* Local pool to track active connections */ -static PGconn *ClientConnectionArray[MAX_CONNECTION_COUNT]; +static MultiConnection *ClientConnectionArray[MAX_CONNECTION_COUNT]; /* * The value at any position on ClientPollingStatusArray is only defined when @@ -48,8 +50,8 @@ static PostgresPollingStatusType ClientPollingStatusArray[MAX_CONNECTION_COUNT]; /* Local functions forward declarations */ -static void ClearRemainingResults(PGconn *connection); -static bool ClientConnectionReady(PGconn *connection, +static void ClearRemainingResults(MultiConnection *connection); +static bool ClientConnectionReady(MultiConnection *connection, PostgresPollingStatusType pollingStatus); @@ -63,7 +65,7 @@ AllocateConnectionId(void) /* allocate connectionId from connection pool */ for (connIndex = 0; connIndex < MAX_CONNECTION_COUNT; connIndex++) { - PGconn *connection = ClientConnectionArray[connIndex]; + MultiConnection *connection = ClientConnectionArray[connIndex]; if (connection == NULL) { connectionId = connIndex; @@ -87,12 +89,16 @@ int32 MultiClientConnect(const char *nodeName, uint32 nodePort, const char *nodeDatabase, const char *userName) { - PGconn *connection = NULL; - char connInfoString[STRING_BUFFER_SIZE]; + MultiConnection *connection = NULL; ConnStatusType connStatusType = CONNECTION_OK; int32 connectionId = AllocateConnectionId(); - char *effectiveDatabaseName = NULL; - char *effectiveUserName = NULL; + int connectionFlags = FORCE_NEW_CONNECTION; /* no cached connections for now */ + + if (connectionId == INVALID_CONNECTION_ID) + { + ereport(WARNING, (errmsg("could not allocate connection in connection pool"))); + return connectionId; + } if (XactModificationLevel > XACT_MODIFICATION_NONE) { @@ -101,46 +107,11 @@ MultiClientConnect(const char *nodeName, uint32 nodePort, const char *nodeDataba "command within a transaction"))); } - if (connectionId == INVALID_CONNECTION_ID) - { - ereport(WARNING, (errmsg("could not allocate connection in connection pool"))); - return connectionId; - } - - if (nodeDatabase == NULL) - { - effectiveDatabaseName = get_database_name(MyDatabaseId); - } - else - { - effectiveDatabaseName = pstrdup(nodeDatabase); - } - - if (userName == NULL) - { - effectiveUserName = CurrentUserName(); - } - else - { - effectiveUserName = pstrdup(userName); - } - - /* - * FIXME: This code is bad on several levels. It completely forgoes any - * escaping, it misses setting a number of parameters, it works with a - * limited string size without erroring when it's too long. We shouldn't - * even build a query string this way, there's PQconnectdbParams()! - */ - - /* transcribe connection paremeters to string */ - snprintf(connInfoString, STRING_BUFFER_SIZE, CONN_INFO_TEMPLATE, - nodeName, nodePort, - effectiveDatabaseName, effectiveUserName, - CLIENT_CONNECT_TIMEOUT); - /* establish synchronous connection to worker node */ - connection = PQconnectdb(connInfoString); - connStatusType = PQstatus(connection); + connection = GetNodeUserDatabaseConnection(connectionFlags, nodeName, nodePort, + userName, nodeDatabase); + + connStatusType = PQstatus(connection->pgConn); if (connStatusType == CONNECTION_OK) { @@ -148,15 +119,11 @@ MultiClientConnect(const char *nodeName, uint32 nodePort, const char *nodeDataba } else { - WarnRemoteError(connection, NULL); - - PQfinish(connection); + ReportConnectionError(connection, WARNING); + CloseConnection(connection); connectionId = INVALID_CONNECTION_ID; } - pfree(effectiveDatabaseName); - pfree(effectiveUserName); - return connectionId; } @@ -169,12 +136,11 @@ MultiClientConnect(const char *nodeName, uint32 nodePort, const char *nodeDataba int32 MultiClientConnectStart(const char *nodeName, uint32 nodePort, const char *nodeDatabase) { - PGconn *connection = NULL; - char connInfoString[STRING_BUFFER_SIZE]; - ConnStatusType connStatusType = CONNECTION_BAD; - char *userName = CurrentUserName(); - + MultiConnection *connection = NULL; + ConnStatusType connStatusType = CONNECTION_OK; int32 connectionId = AllocateConnectionId(); + int connectionFlags = FORCE_NEW_CONNECTION; /* no cached connections for now */ + if (connectionId == INVALID_CONNECTION_ID) { ereport(WARNING, (errmsg("could not allocate connection in connection pool"))); @@ -188,13 +154,9 @@ MultiClientConnectStart(const char *nodeName, uint32 nodePort, const char *nodeD "command within a transaction"))); } - /* transcribe connection paremeters to string */ - snprintf(connInfoString, STRING_BUFFER_SIZE, CONN_INFO_TEMPLATE, - nodeName, nodePort, nodeDatabase, userName, CLIENT_CONNECT_TIMEOUT); - /* prepare asynchronous request for worker node connection */ - connection = PQconnectStart(connInfoString); - connStatusType = PQstatus(connection); + connection = StartNodeConnection(connectionFlags, nodeName, nodePort); + connStatusType = PQstatus(connection->pgConn); /* * If prepared, we save the connection, and set its initial polling status @@ -208,9 +170,9 @@ MultiClientConnectStart(const char *nodeName, uint32 nodePort, const char *nodeD } else { - WarnRemoteError(connection, NULL); + ReportConnectionError(connection, WARNING); + CloseConnection(connection); - PQfinish(connection); connectionId = INVALID_CONNECTION_ID; } @@ -222,7 +184,7 @@ MultiClientConnectStart(const char *nodeName, uint32 nodePort, const char *nodeD ConnectStatus MultiClientConnectPoll(int32 connectionId) { - PGconn *connection = NULL; + MultiConnection *connection = NULL; PostgresPollingStatusType pollingStatus = PGRES_POLLING_OK; ConnectStatus connectStatus = CLIENT_INVALID_CONNECT; @@ -240,7 +202,7 @@ MultiClientConnectPoll(int32 connectionId) bool readReady = ClientConnectionReady(connection, PGRES_POLLING_READING); if (readReady) { - ClientPollingStatusArray[connectionId] = PQconnectPoll(connection); + ClientPollingStatusArray[connectionId] = PQconnectPoll(connection->pgConn); connectStatus = CLIENT_CONNECTION_BUSY; } else @@ -253,7 +215,7 @@ MultiClientConnectPoll(int32 connectionId) bool writeReady = ClientConnectionReady(connection, PGRES_POLLING_WRITING); if (writeReady) { - ClientPollingStatusArray[connectionId] = PQconnectPoll(connection); + ClientPollingStatusArray[connectionId] = PQconnectPoll(connection->pgConn); connectStatus = CLIENT_CONNECTION_BUSY; } else @@ -263,7 +225,7 @@ MultiClientConnectPoll(int32 connectionId) } else if (pollingStatus == PGRES_POLLING_FAILED) { - WarnRemoteError(connection, NULL); + ReportConnectionError(connection, WARNING); connectStatus = CLIENT_CONNECTION_BAD; } @@ -276,14 +238,14 @@ MultiClientConnectPoll(int32 connectionId) void MultiClientDisconnect(int32 connectionId) { - PGconn *connection = NULL; + MultiConnection *connection = NULL; const int InvalidPollingStatus = -1; Assert(connectionId != INVALID_CONNECTION_ID); connection = ClientConnectionArray[connectionId]; Assert(connection != NULL); - PQfinish(connection); + CloseConnection(connection); ClientConnectionArray[connectionId] = NULL; ClientPollingStatusArray[connectionId] = InvalidPollingStatus; @@ -297,7 +259,7 @@ MultiClientDisconnect(int32 connectionId) bool MultiClientConnectionUp(int32 connectionId) { - PGconn *connection = NULL; + MultiConnection *connection = NULL; ConnStatusType connStatusType = CONNECTION_OK; bool connectionUp = true; @@ -305,7 +267,7 @@ MultiClientConnectionUp(int32 connectionId) connection = ClientConnectionArray[connectionId]; Assert(connection != NULL); - connStatusType = PQstatus(connection); + connStatusType = PQstatus(connection->pgConn); if (connStatusType == CONNECTION_BAD) { connectionUp = false; @@ -339,7 +301,7 @@ MultiClientExecute(int32 connectionId, const char *query, void **queryResult, bool MultiClientSendQuery(int32 connectionId, const char *query) { - PGconn *connection = NULL; + MultiConnection *connection = NULL; bool success = true; int querySent = 0; @@ -347,10 +309,10 @@ MultiClientSendQuery(int32 connectionId, const char *query) connection = ClientConnectionArray[connectionId]; Assert(connection != NULL); - querySent = PQsendQuery(connection, query); + querySent = PQsendQuery(connection->pgConn, query); if (querySent == 0) { - char *errorMessage = PQerrorMessage(connection); + char *errorMessage = PQerrorMessage(connection->pgConn); ereport(WARNING, (errmsg("could not send remote query \"%s\"", query), errdetail("Client error: %s", errorMessage))); @@ -365,7 +327,7 @@ MultiClientSendQuery(int32 connectionId, const char *query) bool MultiClientCancel(int32 connectionId) { - PGconn *connection = NULL; + MultiConnection *connection = NULL; PGcancel *cancelObject = NULL; int cancelSent = 0; bool canceled = true; @@ -375,7 +337,7 @@ MultiClientCancel(int32 connectionId) connection = ClientConnectionArray[connectionId]; Assert(connection != NULL); - cancelObject = PQgetCancel(connection); + cancelObject = PQgetCancel(connection->pgConn); cancelSent = PQcancel(cancelObject, errorBuffer, sizeof(errorBuffer)); if (cancelSent == 0) @@ -396,7 +358,7 @@ MultiClientCancel(int32 connectionId) ResultStatus MultiClientResultStatus(int32 connectionId) { - PGconn *connection = NULL; + MultiConnection *connection = NULL; int consumed = 0; ConnStatusType connStatusType = CONNECTION_OK; ResultStatus resultStatus = CLIENT_INVALID_RESULT_STATUS; @@ -405,7 +367,7 @@ MultiClientResultStatus(int32 connectionId) connection = ClientConnectionArray[connectionId]; Assert(connection != NULL); - connStatusType = PQstatus(connection); + connStatusType = PQstatus(connection->pgConn); if (connStatusType == CONNECTION_BAD) { ereport(WARNING, (errmsg("could not maintain connection to worker node"))); @@ -413,10 +375,10 @@ MultiClientResultStatus(int32 connectionId) } /* consume input to allow status change */ - consumed = PQconsumeInput(connection); + consumed = PQconsumeInput(connection->pgConn); if (consumed != 0) { - int connectionBusy = PQisBusy(connection); + int connectionBusy = PQisBusy(connection->pgConn); if (connectionBusy == 0) { resultStatus = CLIENT_RESULT_READY; @@ -441,7 +403,7 @@ bool MultiClientQueryResult(int32 connectionId, void **queryResult, int *rowCount, int *columnCount) { - PGconn *connection = NULL; + MultiConnection *connection = NULL; PGresult *result = NULL; ConnStatusType connStatusType = CONNECTION_OK; ExecStatusType resultStatus = PGRES_COMMAND_OK; @@ -450,14 +412,14 @@ MultiClientQueryResult(int32 connectionId, void **queryResult, int *rowCount, connection = ClientConnectionArray[connectionId]; Assert(connection != NULL); - connStatusType = PQstatus(connection); + connStatusType = PQstatus(connection->pgConn); if (connStatusType == CONNECTION_BAD) { ereport(WARNING, (errmsg("could not maintain connection to worker node"))); return false; } - result = PQgetResult(connection); + result = PQgetResult(connection->pgConn); resultStatus = PQresultStatus(result); if (resultStatus == PGRES_TUPLES_OK) { @@ -467,7 +429,7 @@ MultiClientQueryResult(int32 connectionId, void **queryResult, int *rowCount, } else { - WarnRemoteError(connection, result); + ReportResultError(connection, result, WARNING); PQclear(result); return false; @@ -493,7 +455,7 @@ BatchQueryStatus MultiClientBatchResult(int32 connectionId, void **queryResult, int *rowCount, int *columnCount) { - PGconn *connection = NULL; + MultiConnection *connection = NULL; PGresult *result = NULL; ConnStatusType connStatusType = CONNECTION_OK; ExecStatusType resultStatus = PGRES_COMMAND_OK; @@ -508,14 +470,14 @@ MultiClientBatchResult(int32 connectionId, void **queryResult, int *rowCount, (*rowCount) = -1; (*columnCount) = -1; - connStatusType = PQstatus(connection); + connStatusType = PQstatus(connection->pgConn); if (connStatusType == CONNECTION_BAD) { ereport(WARNING, (errmsg("could not maintain connection to worker node"))); return CLIENT_BATCH_QUERY_FAILED; } - result = PQgetResult(connection); + result = PQgetResult(connection->pgConn); if (result == NULL) { return CLIENT_BATCH_QUERY_DONE; @@ -536,7 +498,7 @@ MultiClientBatchResult(int32 connectionId, void **queryResult, int *rowCount, } else { - WarnRemoteError(connection, result); + ReportResultError(connection, result, WARNING); PQclear(result); queryStatus = CLIENT_BATCH_QUERY_FAILED; } @@ -575,7 +537,7 @@ MultiClientClearResult(void *queryResult) QueryStatus MultiClientQueryStatus(int32 connectionId) { - PGconn *connection = NULL; + MultiConnection *connection = NULL; PGresult *result = NULL; int tupleCount PG_USED_FOR_ASSERTS_ONLY = 0; bool copyResults = false; @@ -587,7 +549,7 @@ MultiClientQueryStatus(int32 connectionId) connection = ClientConnectionArray[connectionId]; Assert(connection != NULL); - connStatusType = PQstatus(connection); + connStatusType = PQstatus(connection->pgConn); if (connStatusType == CONNECTION_BAD) { ereport(WARNING, (errmsg("could not maintain connection to worker node"))); @@ -599,7 +561,7 @@ MultiClientQueryStatus(int32 connectionId) * isn't ready yet (the caller didn't wait for the connection to be ready), * we will block on this call. */ - result = PQgetResult(connection); + result = PQgetResult(connection->pgConn); resultStatus = PQresultStatus(result); if (resultStatus == PGRES_COMMAND_OK) @@ -630,7 +592,7 @@ MultiClientQueryStatus(int32 connectionId) copyResults = true; } - WarnRemoteError(connection, result); + ReportResultError(connection, result, WARNING); } /* clear the result object */ @@ -653,7 +615,7 @@ MultiClientQueryStatus(int32 connectionId) CopyStatus MultiClientCopyData(int32 connectionId, int32 fileDescriptor) { - PGconn *connection = NULL; + MultiConnection *connection = NULL; char *receiveBuffer = NULL; int consumed = 0; int receiveLength = 0; @@ -668,7 +630,7 @@ MultiClientCopyData(int32 connectionId, int32 fileDescriptor) * Consume input to handle the case where previous copy operation might have * received zero bytes. */ - consumed = PQconsumeInput(connection); + consumed = PQconsumeInput(connection->pgConn); if (consumed == 0) { ereport(WARNING, (errmsg("could not read data from worker node"))); @@ -676,7 +638,7 @@ MultiClientCopyData(int32 connectionId, int32 fileDescriptor) } /* receive copy data message in an asynchronous manner */ - receiveLength = PQgetCopyData(connection, &receiveBuffer, asynchronous); + receiveLength = PQgetCopyData(connection->pgConn, &receiveBuffer, asynchronous); while (receiveLength > 0) { /* received copy data; append these data to file */ @@ -697,7 +659,7 @@ MultiClientCopyData(int32 connectionId, int32 fileDescriptor) PQfreemem(receiveBuffer); - receiveLength = PQgetCopyData(connection, &receiveBuffer, asynchronous); + receiveLength = PQgetCopyData(connection->pgConn, &receiveBuffer, asynchronous); } /* we now check the last received length returned by copy data */ @@ -709,7 +671,7 @@ MultiClientCopyData(int32 connectionId, int32 fileDescriptor) else if (receiveLength == -1) { /* received copy done message */ - PGresult *result = PQgetResult(connection); + PGresult *result = PQgetResult(connection->pgConn); ExecStatusType resultStatus = PQresultStatus(result); if (resultStatus == PGRES_COMMAND_OK) @@ -720,7 +682,7 @@ MultiClientCopyData(int32 connectionId, int32 fileDescriptor) { copyStatus = CLIENT_COPY_FAILED; - WarnRemoteError(connection, result); + ReportResultError(connection, result, WARNING); } PQclear(result); @@ -730,7 +692,7 @@ MultiClientCopyData(int32 connectionId, int32 fileDescriptor) /* received an error */ copyStatus = CLIENT_COPY_FAILED; - WarnRemoteError(connection, NULL); + ReportConnectionError(connection, WARNING); } /* if copy out completed, make sure we drain all results from libpq */ @@ -793,7 +755,7 @@ void MultiClientRegisterWait(WaitInfo *waitInfo, TaskExecutionStatus executionStatus, int32 connectionId) { - PGconn *connection = NULL; + MultiConnection *connection = NULL; struct pollfd *pollfd = NULL; Assert(waitInfo->registeredWaiters < waitInfo->maxWaiters); @@ -811,7 +773,7 @@ MultiClientRegisterWait(WaitInfo *waitInfo, TaskExecutionStatus executionStatus, connection = ClientConnectionArray[connectionId]; pollfd = &waitInfo->pollfds[waitInfo->registeredWaiters]; - pollfd->fd = PQsocket(connection); + pollfd->fd = PQsocket(connection->pgConn); if (executionStatus == TASK_STATUS_SOCKET_READ) { pollfd->events = POLLERR | POLLIN; @@ -903,13 +865,13 @@ MultiClientWait(WaitInfo *waitInfo) * query. */ static void -ClearRemainingResults(PGconn *connection) +ClearRemainingResults(MultiConnection *connection) { - PGresult *result = PQgetResult(connection); + PGresult *result = PQgetResult(connection->pgConn); while (result != NULL) { PQclear(result); - result = PQgetResult(connection); + result = PQgetResult(connection->pgConn); } } @@ -920,7 +882,8 @@ ClearRemainingResults(PGconn *connection) * and libpq_select() at libpqwalreceiver.c. */ static bool -ClientConnectionReady(PGconn *connection, PostgresPollingStatusType pollingStatus) +ClientConnectionReady(MultiConnection *connection, + PostgresPollingStatusType pollingStatus) { bool clientConnectionReady = false; int pollResult = 0; @@ -941,7 +904,7 @@ ClientConnectionReady(PGconn *connection, PostgresPollingStatusType pollingStatu pollEventMask = POLLERR | POLLOUT; } - pollFileDescriptor.fd = PQsocket(connection); + pollFileDescriptor.fd = PQsocket(connection->pgConn); pollFileDescriptor.events = pollEventMask; pollFileDescriptor.revents = 0; diff --git a/src/backend/distributed/executor/multi_router_executor.c b/src/backend/distributed/executor/multi_router_executor.c index aee7e09a5..f2294fca0 100644 --- a/src/backend/distributed/executor/multi_router_executor.c +++ b/src/backend/distributed/executor/multi_router_executor.c @@ -30,6 +30,7 @@ #include "distributed/citus_ruleutils.h" #include "distributed/commit_protocol.h" #include "distributed/connection_cache.h" +#include "distributed/connection_management.h" #include "distributed/listutils.h" #include "distributed/master_metadata_utility.h" #include "distributed/metadata_cache.h" @@ -39,6 +40,7 @@ #include "distributed/multi_router_executor.h" #include "distributed/multi_shard_transaction.h" #include "distributed/relay_utility.h" +#include "distributed/remote_commands.h" #include "distributed/resource_lock.h" #include "executor/execdesc.h" #include "executor/executor.h" @@ -109,7 +111,7 @@ static uint64 ReturnRowsFromTuplestore(uint64 tupleCount, TupleDesc tupleDescrip Tuplestorestate *tupleStore); static PGconn * GetConnectionForPlacement(ShardPlacement *placement, bool isModificationQuery); -static void PurgeConnectionForPlacement(ShardPlacement *placement); +static void PurgeConnectionForPlacement(PGconn *connection, ShardPlacement *placement); static void RemoveXactConnection(PGconn *connection); static void ExtractParametersFromParamListInfo(ParamListInfo paramListInfo, Oid **parameterTypes, @@ -275,7 +277,7 @@ InitTransactionStateForTask(Task *task) if (PQresultStatus(result) != PGRES_COMMAND_OK) { WarnRemoteError(connection, result); - PurgeConnection(connection); + CloseConnectionByPGconn(connection); connection = NULL; } @@ -792,7 +794,7 @@ ExecuteSingleTask(QueryDesc *queryDesc, Task *task, queryOK = SendQueryInSingleRowMode(connection, queryString, paramListInfo); if (!queryOK) { - PurgeConnectionForPlacement(taskPlacement); + PurgeConnectionForPlacement(connection, taskPlacement); failedPlacementList = lappend(failedPlacementList, taskPlacement); continue; } @@ -850,7 +852,7 @@ ExecuteSingleTask(QueryDesc *queryDesc, Task *task, } else { - PurgeConnectionForPlacement(taskPlacement); + PurgeConnectionForPlacement(connection, taskPlacement); failedPlacementList = lappend(failedPlacementList, taskPlacement); @@ -954,8 +956,6 @@ ExecuteModifyTasks(List *taskList, bool expectResults, ParamListInfo paramListIn "commands"))); } - XactModificationLevel = XACT_MODIFICATION_MULTI_SHARD; - shardIntervalList = TaskShardIntervalList(taskList); /* ensure that there are no concurrent modifications on the same shards */ @@ -964,6 +964,8 @@ ExecuteModifyTasks(List *taskList, bool expectResults, ParamListInfo paramListIn /* open connection to all relevant placements, if not already open */ OpenTransactionsToAllShardPlacements(shardIntervalList, userName); + XactModificationLevel = XACT_MODIFICATION_MULTI_SHARD; + /* iterate over placements in rounds, to ensure in-order execution */ while (tasksPending) { @@ -1232,17 +1234,9 @@ GetConnectionForPlacement(ShardPlacement *placement, bool isModificationQuery) * for the transaction in addition to purging the connection cache's entry. */ static void -PurgeConnectionForPlacement(ShardPlacement *placement) +PurgeConnectionForPlacement(PGconn *connection, ShardPlacement *placement) { - NodeConnectionKey nodeKey; - char *currentUser = CurrentUserName(); - - MemSet(&nodeKey, 0, sizeof(NodeConnectionKey)); - strlcpy(nodeKey.nodeName, placement->nodeName, MAX_NODE_LENGTH + 1); - nodeKey.nodePort = placement->nodePort; - strlcpy(nodeKey.nodeUser, currentUser, NAMEDATALEN); - - PurgeConnectionByKey(&nodeKey); + CloseConnectionByPGconn(connection); /* * The following is logically identical to RemoveXactConnection, but since @@ -1254,7 +1248,13 @@ PurgeConnectionForPlacement(ShardPlacement *placement) { NodeConnectionEntry *participantEntry = NULL; bool entryFound = false; + NodeConnectionKey nodeKey; + char *currentUser = CurrentUserName(); + MemSet(&nodeKey, 0, sizeof(NodeConnectionKey)); + strlcpy(nodeKey.nodeName, placement->nodeName, MAX_NODE_LENGTH + 1); + nodeKey.nodePort = placement->nodePort; + strlcpy(nodeKey.nodeUser, currentUser, NAMEDATALEN); Assert(IsTransactionBlock()); /* the participant hash doesn't use the user field */ @@ -1797,7 +1797,6 @@ RouterTransactionCallback(XactEvent event, void *arg) } /* reset transaction state */ - XactModificationLevel = XACT_MODIFICATION_NONE; xactParticipantHash = NULL; xactShardConnSetList = NIL; subXactAbortAttempted = false; @@ -1861,7 +1860,7 @@ ExecuteTransactionEnd(bool commit) else { WarnRemoteError(connection, result); - PurgeConnection(participant->connection); + CloseConnectionByPGconn(participant->connection); participant->connection = NULL; } diff --git a/src/backend/distributed/master/master_citus_tools.c b/src/backend/distributed/master/master_citus_tools.c index d7d571e97..f8632d6a2 100644 --- a/src/backend/distributed/master/master_citus_tools.c +++ b/src/backend/distributed/master/master_citus_tools.c @@ -19,6 +19,7 @@ #include "access/htup_details.h" #include "catalog/pg_type.h" #include "distributed/connection_cache.h" +#include "distributed/connection_management.h" #include "distributed/metadata_cache.h" #include "distributed/multi_server_executor.h" #include "distributed/worker_protocol.h" @@ -269,7 +270,7 @@ ExecuteCommandsInParallelAndStoreResults(StringInfo *nodeNameArray, int *nodePor { StoreErrorMessage(connection, queryResultString); statusArray[commandIndex] = false; - PQfinish(connection); + CloseConnectionByPGconn(connection); connectionArray[commandIndex] = NULL; finishedCount++; } @@ -298,7 +299,7 @@ ExecuteCommandsInParallelAndStoreResults(StringInfo *nodeNameArray, int *nodePor finishedCount++; statusArray[commandIndex] = success; connectionArray[commandIndex] = NULL; - PQfinish(connection); + CloseConnectionByPGconn(connection); } } @@ -509,7 +510,7 @@ ExecuteRemoteQueryOrCommand(char *nodeName, uint32 nodePort, char *queryString, PQclear(queryResult); /* close the connection */ - PQfinish(nodeConnection); + CloseConnectionByPGconn(nodeConnection); nodeConnection = NULL; } PG_CATCH(); @@ -517,7 +518,7 @@ ExecuteRemoteQueryOrCommand(char *nodeName, uint32 nodePort, char *queryString, StoreErrorMessage(nodeConnection, queryResultString); /* close the connection */ - PQfinish(nodeConnection); + CloseConnectionByPGconn(nodeConnection); nodeConnection = NULL; } PG_END_TRY(); diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index 7e1ffa904..60c4f3656 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -20,6 +20,7 @@ #include "executor/executor.h" #include "distributed/citus_nodefuncs.h" #include "distributed/commit_protocol.h" +#include "distributed/connection_management.h" #include "distributed/master_protocol.h" #include "distributed/multi_copy.h" #include "distributed/multi_executor.h" @@ -32,7 +33,9 @@ #include "distributed/multi_server_executor.h" #include "distributed/multi_shard_transaction.h" #include "distributed/multi_utility.h" +#include "distributed/remote_commands.h" #include "distributed/task_tracker.h" +#include "distributed/transaction_management.h" #include "distributed/worker_manager.h" #include "distributed/worker_protocol.h" #include "postmaster/postmaster.h" @@ -154,6 +157,10 @@ _PG_init(void) /* organize that task tracker is started once server is up */ TaskTrackerRegister(); + /* initialize coordinated transaction management */ + InitializeTransactionManagement(); + InitializeConnectionManagement(); + /* initialize transaction callbacks */ RegisterRouterExecutorXactCallbacks(); RegisterShardPlacementXactCallbacks(); @@ -270,6 +277,16 @@ RegisterCitusConfigVariables(void) GUC_NO_SHOW_ALL, NULL, NULL, NULL); + DefineCustomBoolVariable( + "citus.log_remote_commands", + gettext_noop("Log queries sent to other nodes in the server log"), + NULL, + &LogRemoteCommands, + false, + PGC_USERSET, + 0, + NULL, NULL, NULL); + DefineCustomBoolVariable( "citus.explain_multi_logical_plan", gettext_noop("Enables Explain to print out distributed logical plans."), diff --git a/src/backend/distributed/test/connection_cache.c b/src/backend/distributed/test/connection_cache.c index 8c46d0dd5..e88204c5a 100644 --- a/src/backend/distributed/test/connection_cache.c +++ b/src/backend/distributed/test/connection_cache.c @@ -21,6 +21,7 @@ #include "catalog/pg_type.h" #include "distributed/connection_cache.h" +#include "distributed/connection_management.h" #include "distributed/metadata_cache.h" #include "distributed/test_helper_functions.h" /* IWYU pragma: keep */ #include "utils/elog.h" @@ -124,7 +125,7 @@ get_and_purge_connection(PG_FUNCTION_ARGS) PG_RETURN_BOOL(false); } - PurgeConnection(connection); + CloseConnectionByPGconn(connection); PG_RETURN_BOOL(true); } @@ -148,7 +149,7 @@ connect_and_purge_connection(PG_FUNCTION_ARGS) PG_RETURN_BOOL(false); } - PurgeConnection(connection); + CloseConnectionByPGconn(connection); PG_RETURN_BOOL(true); } diff --git a/src/backend/distributed/transaction/commit_protocol.c b/src/backend/distributed/transaction/commit_protocol.c index 1c5ac7417..dd81d6f5c 100644 --- a/src/backend/distributed/transaction/commit_protocol.c +++ b/src/backend/distributed/transaction/commit_protocol.c @@ -26,10 +26,6 @@ static uint32 DistributedTransactionId = 0; -/* the commit protocol to use for COPY commands */ -int MultiShardCommitProtocol = COMMIT_PROTOCOL_1PC; - - /* * InitializeDistributedTransaction prepares the distributed transaction ID * used in transaction names. diff --git a/src/backend/distributed/transaction/multi_shard_transaction.c b/src/backend/distributed/transaction/multi_shard_transaction.c index 3829746ba..9f3d15c0f 100644 --- a/src/backend/distributed/transaction/multi_shard_transaction.c +++ b/src/backend/distributed/transaction/multi_shard_transaction.c @@ -16,6 +16,7 @@ #include "distributed/colocation_utils.h" #include "distributed/commit_protocol.h" #include "distributed/connection_cache.h" +#include "distributed/connection_management.h" #include "distributed/master_metadata_utility.h" #include "distributed/metadata_cache.h" #include "distributed/multi_shard_transaction.h" @@ -332,7 +333,6 @@ CompleteShardPlacementTransactions(XactEvent event, void *arg) CloseConnections(connectionList); shardConnectionHash = NULL; - XactModificationLevel = XACT_MODIFICATION_NONE; subXactAbortAttempted = false; } @@ -362,6 +362,6 @@ CloseConnections(List *connectionList) (TransactionConnection *) lfirst(connectionCell); PGconn *connection = transactionConnection->connection; - PQfinish(connection); + CloseConnectionByPGconn(connection); } } diff --git a/src/backend/distributed/transaction/transaction_management.c b/src/backend/distributed/transaction/transaction_management.c new file mode 100644 index 000000000..ef0dcc2e3 --- /dev/null +++ b/src/backend/distributed/transaction/transaction_management.c @@ -0,0 +1,144 @@ +/*------------------------------------------------------------------------- + * + * transaction_management.c + * + * Transaction management for Citus. Most of the work is delegated to other + * subsystems, this files, and especially CoordinatedTransactionCallback, + * coordinates the work between them. + * + * Copyright (c) 2016, Citus Data, Inc. + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "libpq-fe.h" + +#include "miscadmin.h" + +#include "access/xact.h" +#include "distributed/connection_management.h" +#include "distributed/hash_helpers.h" +#include "distributed/transaction_management.h" +#include "utils/hsearch.h" + + +CoordinatedTransactionState CurrentCoordinatedTransactionState = COORD_TRANS_NONE; + +/* GUC, the commit protocol to use for commands affecting more than one connection */ +int MultiShardCommitProtocol = COMMIT_PROTOCOL_1PC; + +/* state needed to keep track of operations used during a transaction */ +XactModificationType XactModificationLevel = XACT_MODIFICATION_NONE; + + +static bool subXactAbortAttempted = false; + + +/* transaction management functions */ +static void CoordinatedTransactionCallback(XactEvent event, void *arg); +static void CoordinatedSubTransactionCallback(SubXactEvent event, SubTransactionId subId, + SubTransactionId parentSubid, void *arg); + + +void +InitializeTransactionManagement(void) +{ + /* hook into transaction machinery */ + RegisterXactCallback(CoordinatedTransactionCallback, NULL); + RegisterSubXactCallback(CoordinatedSubTransactionCallback, NULL); +} + + +/* + * Transaction management callback, handling coordinated transaction, and + * transaction independent connection management. + * + * NB: There should only ever be a single transaction callback in citus, the + * ordering between the callbacks and thee actions within those callbacks + * otherwise becomes too undeterministic / hard to reason about. + */ +static void +CoordinatedTransactionCallback(XactEvent event, void *arg) +{ + switch (event) + { + case XACT_EVENT_COMMIT: + { + /* close connections etc. */ + if (CurrentCoordinatedTransactionState != COORD_TRANS_NONE) + { + AfterXactConnectionHandling(true); + } + + Assert(!subXactAbortAttempted); + CurrentCoordinatedTransactionState = COORD_TRANS_NONE; + XactModificationLevel = XACT_MODIFICATION_NONE; + } + break; + + case XACT_EVENT_ABORT: + { + /* close connections etc. */ + if (CurrentCoordinatedTransactionState != COORD_TRANS_NONE) + { + AfterXactConnectionHandling(false); + } + + CurrentCoordinatedTransactionState = COORD_TRANS_NONE; + XactModificationLevel = XACT_MODIFICATION_NONE; + subXactAbortAttempted = false; + } + break; + + case XACT_EVENT_PARALLEL_COMMIT: + case XACT_EVENT_PARALLEL_ABORT: + case XACT_EVENT_PREPARE: + { } + break; + + case XACT_EVENT_PRE_COMMIT: + { + if (subXactAbortAttempted) + { + subXactAbortAttempted = false; + + if (XactModificationLevel != XACT_MODIFICATION_NONE) + { + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot ROLLBACK TO SAVEPOINT in transactions " + "which modify distributed tables"))); + } + } + } + break; + + case XACT_EVENT_PARALLEL_PRE_COMMIT: + case XACT_EVENT_PRE_PREPARE: + { + if (CurrentCoordinatedTransactionState > COORD_TRANS_NONE) + { + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot use 2PC in transactions involving " + "multiple servers"))); + } + } + break; + } +} + + +/* + * Subtransaction callback - currently only used to remember whether a + * savepoint has been rolled back, as we don't support that. + */ +static void +CoordinatedSubTransactionCallback(SubXactEvent event, SubTransactionId subId, + SubTransactionId parentSubid, void *arg) +{ + if (event == SUBXACT_EVENT_ABORT_SUB) + { + subXactAbortAttempted = true; + } +} diff --git a/src/backend/distributed/transaction/worker_transaction.c b/src/backend/distributed/transaction/worker_transaction.c index ba5cebdb4..faca72802 100644 --- a/src/backend/distributed/transaction/worker_transaction.c +++ b/src/backend/distributed/transaction/worker_transaction.c @@ -233,12 +233,12 @@ SendCommandListToWorkerInSingleTransaction(char *nodeName, int32 nodePort, char PQgetResult(workerConnection); /* we no longer need this connection */ - PQfinish(workerConnection); + CloseConnectionByPGconn(workerConnection); } PG_CATCH(); { /* close the connection */ - PQfinish(workerConnection); + CloseConnectionByPGconn(workerConnection); PG_RE_THROW(); } @@ -280,7 +280,7 @@ RemoveWorkerTransaction(char *nodeName, int32 nodePort) PGconn *connection = transactionConnection->connection; /* closing the connection will rollback all uncommited transactions */ - PQfinish(connection); + CloseConnectionByPGconn(connection); workerConnectionList = list_delete(workerConnectionList, transactionConnection); } diff --git a/src/backend/distributed/utils/connection_cache.c b/src/backend/distributed/utils/connection_cache.c index 28f97a8a7..7db3c35e4 100644 --- a/src/backend/distributed/utils/connection_cache.c +++ b/src/backend/distributed/utils/connection_cache.c @@ -2,7 +2,7 @@ * * connection_cache.c * - * This file contains functions to implement a connection hash. + * Legacy connection caching layer. Will be removed entirely. * * Copyright (c) 2014-2016, Citus Data, Inc. * @@ -19,8 +19,10 @@ #include #include "commands/dbcommands.h" +#include "distributed/connection_management.h" #include "distributed/connection_cache.h" #include "distributed/metadata_cache.h" +#include "distributed/remote_commands.h" #include "mb/pg_wchar.h" #include "utils/builtins.h" #include "utils/elog.h" @@ -30,18 +32,7 @@ #include "utils/palloc.h" -/* state needed to keep track of operations used during a transaction */ -XactModificationType XactModificationLevel = XACT_MODIFICATION_NONE; - -/* - * NodeConnectionHash is the connection hash itself. It begins uninitialized. - * The first call to GetOrEstablishConnection triggers hash creation. - */ -static HTAB *NodeConnectionHash = NULL; - - /* local function forward declarations */ -static HTAB * CreateNodeConnectionHash(void); static void ReportRemoteError(PGconn *connection, PGresult *result, bool raiseError); @@ -60,77 +51,26 @@ static void ReportRemoteError(PGconn *connection, PGresult *result, bool raiseEr PGconn * GetOrEstablishConnection(char *nodeName, int32 nodePort) { + int connectionFlags = SESSION_LIFESPAN; PGconn *connection = NULL; - NodeConnectionKey nodeConnectionKey; - NodeConnectionEntry *nodeConnectionEntry = NULL; - bool entryFound = false; - bool needNewConnection = true; - char *userName = CurrentUserName(); + MultiConnection *multiConnection = + GetNodeConnection(connectionFlags, nodeName, nodePort); - /* check input */ - if (strnlen(nodeName, MAX_NODE_LENGTH + 1) > MAX_NODE_LENGTH) + if (PQstatus(multiConnection->pgConn) == CONNECTION_OK) { - ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), - errmsg("hostname exceeds the maximum length of %d", - MAX_NODE_LENGTH))); + connection = multiConnection->pgConn; } - - /* if first call, initialize the connection hash */ - if (NodeConnectionHash == NULL) + else { - NodeConnectionHash = CreateNodeConnectionHash(); - } - - memset(&nodeConnectionKey, 0, sizeof(nodeConnectionKey)); - strlcpy(nodeConnectionKey.nodeName, nodeName, MAX_NODE_LENGTH + 1); - nodeConnectionKey.nodePort = nodePort; - strlcpy(nodeConnectionKey.nodeUser, userName, NAMEDATALEN); - - nodeConnectionEntry = hash_search(NodeConnectionHash, &nodeConnectionKey, - HASH_FIND, &entryFound); - if (entryFound) - { - connection = nodeConnectionEntry->connection; - if (PQstatus(connection) == CONNECTION_OK) - { - needNewConnection = false; - } - else - { - PurgeConnection(connection); - } - } - - if (needNewConnection) - { - connection = ConnectToNode(nodeName, nodePort, nodeConnectionKey.nodeUser); - if (connection != NULL) - { - nodeConnectionEntry = hash_search(NodeConnectionHash, &nodeConnectionKey, - HASH_ENTER, &entryFound); - nodeConnectionEntry->connection = connection; - } + ReportConnectionError(multiConnection, WARNING); + CloseConnection(multiConnection); + connection = NULL; } return connection; } -/* - * PurgeConnection removes the given connection from the connection hash and - * closes it using PQfinish. If our hash does not contain the given connection, - * this method simply prints a warning and exits. - */ -void -PurgeConnection(PGconn *connection) -{ - NodeConnectionKey nodeConnectionKey; - - BuildKeyForConnection(connection, &nodeConnectionKey); - PurgeConnectionByKey(&nodeConnectionKey); -} - - /* * Utility method to simplify populating a connection cache key with relevant * fields from a provided connection. @@ -174,60 +114,6 @@ BuildKeyForConnection(PGconn *connection, NodeConnectionKey *connectionKey) } -PGconn * -PurgeConnectionByKey(NodeConnectionKey *nodeConnectionKey) -{ - bool entryFound = false; - NodeConnectionEntry *nodeConnectionEntry = NULL; - PGconn *connection = NULL; - - if (NodeConnectionHash != NULL) - { - nodeConnectionEntry = hash_search(NodeConnectionHash, nodeConnectionKey, - HASH_REMOVE, &entryFound); - } - - if (entryFound) - { - connection = nodeConnectionEntry->connection; - PQfinish(nodeConnectionEntry->connection); - } - - return connection; -} - - -/* - * SqlStateMatchesCategory returns true if the given sql state (which may be - * NULL if unknown) is in the given error category. Note that we use - * ERRCODE_TO_CATEGORY macro to determine error category of the sql state and - * expect the caller to use the same macro for the error category. - */ -bool -SqlStateMatchesCategory(char *sqlStateString, int category) -{ - bool sqlStateMatchesCategory = false; - int sqlState = 0; - int sqlStateCategory = 0; - - if (sqlStateString == NULL) - { - return false; - } - - sqlState = MAKE_SQLSTATE(sqlStateString[0], sqlStateString[1], sqlStateString[2], - sqlStateString[3], sqlStateString[4]); - - sqlStateCategory = ERRCODE_TO_CATEGORY(sqlState); - if (sqlStateCategory == category) - { - sqlStateMatchesCategory = true; - } - - return sqlStateMatchesCategory; -} - - /* * WarnRemoteError retrieves error fields from a remote result and produces an * error report at the WARNING level after amending the error with a CONTEXT @@ -296,13 +182,11 @@ ReportRemoteError(PGconn *connection, PGresult *result, bool raiseError) } /* - * If requested, actually raise an error. This necessitates purging the - * connection so it doesn't remain in the hash in an invalid state. + * If requested, actually raise an error. */ if (raiseError) { errorLevel = ERROR; - PurgeConnection(connection); } if (sqlState == ERRCODE_CONNECTION_FAILURE) @@ -323,79 +207,34 @@ ReportRemoteError(PGconn *connection, PGresult *result, bool raiseError) } -/* - * CreateNodeConnectionHash returns a newly created hash table suitable for - * storing unlimited connections indexed by node name and port. - */ -static HTAB * -CreateNodeConnectionHash(void) -{ - HTAB *nodeConnectionHash = NULL; - HASHCTL info; - int hashFlags = 0; - - memset(&info, 0, sizeof(info)); - info.keysize = sizeof(NodeConnectionKey); - info.entrysize = sizeof(NodeConnectionEntry); - info.hash = tag_hash; - info.hcxt = CacheMemoryContext; - hashFlags = (HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT); - - nodeConnectionHash = hash_create("citus connection cache", 32, &info, hashFlags); - - return nodeConnectionHash; -} - - /* * ConnectToNode opens a connection to a remote PostgreSQL server. The function * configures the connection's fallback application name to 'citus' and sets * the remote encoding to match the local one. All parameters are required to * be non NULL. * - * We attempt to connect up to MAX_CONNECT_ATTEMPT times. After that we give up - * and return NULL. + * This is only a thin layer over connection_management.[ch], and will be + * removed soon. */ PGconn * ConnectToNode(char *nodeName, int32 nodePort, char *nodeUser) { + /* don't want already established connections */ + int connectionFlags = FORCE_NEW_CONNECTION; PGconn *connection = NULL; - const char *clientEncoding = GetDatabaseEncodingName(); - const char *dbname = get_database_name(MyDatabaseId); - int attemptIndex = 0; + MultiConnection *multiConnection = + GetNodeUserDatabaseConnection(connectionFlags, nodeName, nodePort, nodeUser, + NULL); - const char *keywordArray[] = { - "host", "port", "fallback_application_name", - "client_encoding", "connect_timeout", "dbname", "user", NULL - }; - char nodePortString[12]; - const char *valueArray[] = { - nodeName, nodePortString, "citus", clientEncoding, - CLIENT_CONNECT_TIMEOUT_SECONDS, dbname, nodeUser, NULL - }; - - sprintf(nodePortString, "%d", nodePort); - - Assert(sizeof(keywordArray) == sizeof(valueArray)); - - for (attemptIndex = 0; attemptIndex < MAX_CONNECT_ATTEMPTS; attemptIndex++) + if (PQstatus(multiConnection->pgConn) == CONNECTION_OK) { - connection = PQconnectdbParams(keywordArray, valueArray, false); - if (PQstatus(connection) == CONNECTION_OK) - { - break; - } - else - { - /* warn if still erroring on final attempt */ - if (attemptIndex == MAX_CONNECT_ATTEMPTS - 1) - { - WarnRemoteError(connection, NULL); - } - - PQfinish(connection); - connection = NULL; - } + connection = multiConnection->pgConn; + } + else + { + ReportConnectionError(multiConnection, WARNING); + CloseConnection(multiConnection); + connection = NULL; } return connection; diff --git a/src/backend/distributed/utils/hash_helpers.c b/src/backend/distributed/utils/hash_helpers.c new file mode 100644 index 000000000..e74616b37 --- /dev/null +++ b/src/backend/distributed/utils/hash_helpers.c @@ -0,0 +1,34 @@ +/*------------------------------------------------------------------------- + * + * hash_helpers.c + * Helpers for dynahash.c style hash tables. + * + * Copyright (c) 2016, Citus Data, Inc. + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "distributed/hash_helpers.h" +#include "utils/hsearch.h" + + +/* + * Empty a hash, without destroying the hash table itself. + */ +void +hash_delete_all(HTAB *htab) +{ + HASH_SEQ_STATUS status; + void *entry = NULL; + + hash_seq_init(&status, htab); + while ((entry = hash_seq_search(&status)) != 0) + { + bool found = false; + + hash_search(htab, entry, HASH_REMOVE, &found); + Assert(found); + } +} diff --git a/src/include/distributed/commit_protocol.h b/src/include/distributed/commit_protocol.h index 140b0ade0..5805dfe9f 100644 --- a/src/include/distributed/commit_protocol.h +++ b/src/include/distributed/commit_protocol.h @@ -14,18 +14,12 @@ #include "access/xact.h" +#include "distributed/connection_management.h" #include "libpq-fe.h" #include "lib/stringinfo.h" #include "nodes/pg_list.h" -/* Enumeration that defines the different commit protocols available */ -typedef enum -{ - COMMIT_PROTOCOL_1PC = 0, - COMMIT_PROTOCOL_2PC = 1 -} CommitProtocolType; - /* Enumeration that defines different remote transaction states */ typedef enum { @@ -51,10 +45,6 @@ typedef struct TransactionConnection } TransactionConnection; -/* config variable managed via guc.c */ -extern int MultiShardCommitProtocol; - - /* Functions declarations for transaction and connection management */ extern void InitializeDistributedTransaction(void); extern void PrepareRemoteTransactions(List *connectionList); diff --git a/src/include/distributed/connection_cache.h b/src/include/distributed/connection_cache.h index 7623cafb2..363042a94 100644 --- a/src/include/distributed/connection_cache.h +++ b/src/include/distributed/connection_cache.h @@ -16,21 +16,8 @@ #include "c.h" #include "libpq-fe.h" -#include "nodes/pg_list.h" #include "utils/hsearch.h" - -/* maximum duration to wait for connection */ -#define CLIENT_CONNECT_TIMEOUT_SECONDS "5" - -/* maximum (textual) lengths of hostname and port */ -#define MAX_NODE_LENGTH 255 -#define MAX_PORT_LENGTH 10 - -/* times to attempt connection (or reconnection) */ -#define MAX_CONNECT_ATTEMPTS 2 - -/* SQL statement for testing */ -#define TEST_SQL "DO $$ BEGIN RAISE EXCEPTION 'Raised remotely!'; END $$" +#include "distributed/connection_management.h" /* * NodeConnectionKey acts as the key to index into the (process-local) hash @@ -52,26 +39,9 @@ typedef struct NodeConnectionEntry } NodeConnectionEntry; -/* describes what kind of modifications have occurred in the current transaction */ -typedef enum -{ - XACT_MODIFICATION_INVALID = 0, /* placeholder initial value */ - XACT_MODIFICATION_NONE, /* no modifications have taken place */ - XACT_MODIFICATION_DATA, /* data modifications (DML) have occurred */ - XACT_MODIFICATION_MULTI_SHARD /* multi-shard modifications have occurred */ -} XactModificationType; - - -/* state needed to prevent new connections during modifying transactions */ -extern XactModificationType XactModificationLevel; - - /* function declarations for obtaining and using a connection */ extern PGconn * GetOrEstablishConnection(char *nodeName, int32 nodePort); -extern void PurgeConnection(PGconn *connection); extern void BuildKeyForConnection(PGconn *connection, NodeConnectionKey *connectionKey); -extern PGconn * PurgeConnectionByKey(NodeConnectionKey *nodeConnectionKey); -extern bool SqlStateMatchesCategory(char *sqlStateString, int category); extern void WarnRemoteError(PGconn *connection, PGresult *result); extern void ReraiseRemoteError(PGconn *connection, PGresult *result); extern PGconn * ConnectToNode(char *nodeName, int nodePort, char *nodeUser); diff --git a/src/include/distributed/connection_management.h b/src/include/distributed/connection_management.h new file mode 100644 index 000000000..be88c79a3 --- /dev/null +++ b/src/include/distributed/connection_management.h @@ -0,0 +1,128 @@ +/*------------------------------------------------------------------------- + * + * connection_management.h + * Central management of connections and their life-cycle + * + * Copyright (c) 2016, Citus Data, Inc. + * + *------------------------------------------------------------------------- + */ + +#ifndef CONNECTION_MANAGMENT_H +#define CONNECTION_MANAGMENT_H + +#include "distributed/transaction_management.h" +#include "lib/ilist.h" +#include "utils/hsearch.h" +#include "utils/timestamp.h" + +/* maximum (textual) lengths of hostname and port */ +#define MAX_NODE_LENGTH 255 /* includes 0 byte */ + +#define CLIENT_CONNECT_TIMEOUT_SECONDS_INT 5 + +/* forward declare, to avoid forcing large headers on everyone */ +struct pg_conn; /* target of the PGconn typedef */ +struct MemoryContextData; + +/* + * Flags determining connection establishment behaviour. + */ +enum MultiConnectionMode +{ + /* force establishment of a new connection */ + FORCE_NEW_CONNECTION = 1 << 0, + + /* mark returned connection as having session lifespan */ + SESSION_LIFESPAN = 1 << 1 +}; + + +/* declaring this directly above makes uncrustify go crazy */ +typedef enum MultiConnectionMode MultiConnectionMode; + +typedef struct MultiConnection +{ + /* connection details, useful for error messages and such. */ + char hostname[MAX_NODE_LENGTH]; + int32 port; + char user[NAMEDATALEN]; + char database[NAMEDATALEN]; + + /* underlying libpq connection */ + struct pg_conn *pgConn; + + /* is the connection intended to be kept after transaction end */ + bool sessionLifespan; + + /* is the connection currently in use, and shouldn't be used by anything else */ + bool claimedExclusively; + + /* time connection establishment was started, for timeout */ + TimestampTz connectionStart; + + /* membership in list of list of connections in ConnectionHashEntry */ + dlist_node connectionNode; +} MultiConnection; + + +/* + * Central connection management hash, mapping (host, port, user, database) to + * a list of connections. + * + * This hash is used to keep track of which connections are open to which + * node. Besides allowing connection reuse, that information is e.g. used to + * handle closing connections after the end of a transaction. + */ + +/* hash key */ +typedef struct ConnectionHashKey +{ + char hostname[MAX_NODE_LENGTH]; + int32 port; + char user[NAMEDATALEN]; + char database[NAMEDATALEN]; +} ConnectionHashKey; + +/* hash entry */ +typedef struct ConnectionHashEntry +{ + ConnectionHashKey key; + dlist_head *connections; +} ConnectionHashEntry; + +/* the hash table */ +extern HTAB *ConnectionHash; + +/* context for all connection and transaction related memory */ +extern struct MemoryContextData *ConnectionContext; + + +extern void AfterXactConnectionHandling(bool isCommit); +extern void InitializeConnectionManagement(void); + + +/* Low-level connection establishment APIs */ +extern MultiConnection * GetNodeConnection(uint32 flags, const char *hostname, + int32 port); +extern MultiConnection * StartNodeConnection(uint32 flags, const char *hostname, + int32 port); +extern MultiConnection * GetNodeUserDatabaseConnection(uint32 flags, const char *hostname, + int32 port, const char *user, const + char *database); +extern MultiConnection * StartNodeUserDatabaseConnection(uint32 flags, + const char *hostname, + int32 port, + const char *user, + const char *database); +extern MultiConnection * GetConnectionFromPGconn(struct pg_conn *pqConn); +extern void CloseConnection(MultiConnection *connection); +extern void CloseConnectionByPGconn(struct pg_conn *pqConn); + +/* dealing with a connection */ +extern void FinishConnectionEstablishment(MultiConnection *connection); +extern void ClaimConnectionExclusively(MultiConnection *connection); +extern void UnclaimConnection(MultiConnection *connection); + + +#endif /* CONNECTION_MANAGMENT_H */ diff --git a/src/include/distributed/hash_helpers.h b/src/include/distributed/hash_helpers.h new file mode 100644 index 000000000..0fecf0c03 --- /dev/null +++ b/src/include/distributed/hash_helpers.h @@ -0,0 +1,30 @@ +/*------------------------------------------------------------------------- + * hash_helpers.h + * + * Copyright (c) 2016, Citus Data, Inc. + * + *------------------------------------------------------------------------- + */ + +#ifndef HASH_HELPERS_H +#define HASH_HELPERS_H + +#include "utils/hsearch.h" + +/* + * Combine two hash values, resulting in another hash value, with decent bit + * mixing. + * + * Similar to boost's hash_combine(). + */ +static inline uint32 +hash_combine(uint32 a, uint32 b) +{ + a ^= b + 0x9e3779b9 + (a << 6) + (a >> 2); + return a; +} + + +extern void hash_delete_all(HTAB *htab); + +#endif diff --git a/src/include/distributed/multi_client_executor.h b/src/include/distributed/multi_client_executor.h index e9658ee22..56d889f4d 100644 --- a/src/include/distributed/multi_client_executor.h +++ b/src/include/distributed/multi_client_executor.h @@ -15,10 +15,8 @@ #define MULTI_CLIENT_EXECUTOR_H #define INVALID_CONNECTION_ID -1 /* identifies an invalid connection */ -#define CLIENT_CONNECT_TIMEOUT 5 /* connection timeout in seconds */ #define MAX_CONNECTION_COUNT 2048 /* simultaneous client connection count */ #define STRING_BUFFER_SIZE 1024 /* buffer size for character arrays */ -#define CONN_INFO_TEMPLATE "host=%s port=%u dbname=%s user=%s connect_timeout=%u" /* Enumeration to track one client connection's status */ diff --git a/src/include/distributed/remote_commands.h b/src/include/distributed/remote_commands.h new file mode 100644 index 000000000..6ce25ccfd --- /dev/null +++ b/src/include/distributed/remote_commands.h @@ -0,0 +1,36 @@ +/*------------------------------------------------------------------------- + * + * remote_commands.h + * Helpers to execute commands on remote nodes, over libpq. + * + * Copyright (c) 2016, Citus Data, Inc. + * + *------------------------------------------------------------------------- + */ + +#ifndef REMOTE_COMMAND_H +#define REMOTE_COMMAND_H + +#include "distributed/connection_management.h" + + +/* GUC, determining whether statements sent to remote nodes are logged */ +extern bool LogRemoteCommands; + + +/* simple helpers */ +extern bool IsResponseOK(struct pg_result *result); +extern void ForgetResults(MultiConnection *connection); +extern bool SqlStateMatchesCategory(char *sqlStateString, int category); + +/* report errors & warnings */ +extern void ReportConnectionError(MultiConnection *connection, int elevel); +extern void ReportResultError(MultiConnection *connection, struct pg_result *result, + int elevel); +extern void LogRemoteCommand(MultiConnection *connection, const char *command); + +/* wrappers around libpq functions, with command logging support */ +extern int SendRemoteCommand(MultiConnection *connection, const char *command); + + +#endif /* REMOTE_COMMAND_H */ diff --git a/src/include/distributed/transaction_management.h b/src/include/distributed/transaction_management.h new file mode 100644 index 000000000..91a5cd181 --- /dev/null +++ b/src/include/distributed/transaction_management.h @@ -0,0 +1,57 @@ +/*------------------------------------------------------------------------- + * transaction_management.h + * + * Copyright (c) 2016, Citus Data, Inc. + * + *------------------------------------------------------------------------- + */ + +#ifndef TRANSACTION_MANAGMENT_H +#define TRANSACTION_MANAGMENT_H + +/* describes what kind of modifications have occurred in the current transaction */ +typedef enum +{ + XACT_MODIFICATION_INVALID = 0, /* placeholder initial value */ + XACT_MODIFICATION_NONE, /* no modifications have taken place */ + XACT_MODIFICATION_DATA, /* data modifications (DML) have occurred */ + XACT_MODIFICATION_MULTI_SHARD /* multi-shard modifications have occurred */ +} XactModificationType; + + +/* + * Enum defining the state of a coordinated (i.e. a transaction potentially + * spanning several nodes). + */ +typedef enum CoordinatedTransactionState +{ + /* no coordinated transaction in progress, no connections established */ + COORD_TRANS_NONE, + + /* no coordinated transaction in progress, but connections established */ + COORD_TRANS_IDLE +} CoordinatedTransactionState; + + +/* Enumeration that defines the different commit protocols available */ +typedef enum +{ + COMMIT_PROTOCOL_1PC = 0, + COMMIT_PROTOCOL_2PC = 1 +} CommitProtocolType; + +/* config variable managed via guc.c */ +extern int MultiShardCommitProtocol; + +/* state needed to prevent new connections during modifying transactions */ +extern XactModificationType XactModificationLevel; + + +extern CoordinatedTransactionState CurrentCoordinatedTransactionState; + + +/* initialization function(s) */ +extern void InitializeTransactionManagement(void); + + +#endif /* TRANSACTION_MANAGMENT_H */ diff --git a/src/test/regress/expected/multi_connection_cache.out b/src/test/regress/expected/multi_connection_cache.out index 79cf59dd2..9ac26c029 100644 --- a/src/test/regress/expected/multi_connection_cache.out +++ b/src/test/regress/expected/multi_connection_cache.out @@ -32,7 +32,7 @@ CREATE FUNCTION set_connection_status_bad(cstring, integer) \set VERBOSITY terse -- connect to non-existent host SELECT initialize_remote_temp_table('dummy-host-name', 12345); -WARNING: connection failed to dummy-host-name:12345 +WARNING: connection error: dummy-host-name:12345 initialize_remote_temp_table ------------------------------ f