diff --git a/src/backend/distributed/executor/multi_router_executor.c b/src/backend/distributed/executor/multi_router_executor.c index 48084569d..b92333d20 100644 --- a/src/backend/distributed/executor/multi_router_executor.c +++ b/src/backend/distributed/executor/multi_router_executor.c @@ -111,7 +111,7 @@ static uint64 ReturnRowsFromTuplestore(uint64 tupleCount, TupleDesc tupleDescrip Tuplestorestate *tupleStore); static PGconn * GetConnectionForPlacement(ShardPlacement *placement, bool isModificationQuery); -static void PurgeConnectionForPlacement(ShardPlacement *placement); +static void PurgeConnectionForPlacement(PGconn *connection, ShardPlacement *placement); static void RemoveXactConnection(PGconn *connection); static void ExtractParametersFromParamListInfo(ParamListInfo paramListInfo, Oid **parameterTypes, @@ -794,7 +794,7 @@ ExecuteSingleTask(QueryDesc *queryDesc, Task *task, queryOK = SendQueryInSingleRowMode(connection, queryString, paramListInfo); if (!queryOK) { - PurgeConnectionForPlacement(taskPlacement); + PurgeConnectionForPlacement(connection, taskPlacement); failedPlacementList = lappend(failedPlacementList, taskPlacement); continue; } @@ -852,7 +852,7 @@ ExecuteSingleTask(QueryDesc *queryDesc, Task *task, } else { - PurgeConnectionForPlacement(taskPlacement); + PurgeConnectionForPlacement(connection, taskPlacement); failedPlacementList = lappend(failedPlacementList, taskPlacement); @@ -1234,17 +1234,9 @@ GetConnectionForPlacement(ShardPlacement *placement, bool isModificationQuery) * for the transaction in addition to purging the connection cache's entry. 
*/ static void -PurgeConnectionForPlacement(ShardPlacement *placement) +PurgeConnectionForPlacement(PGconn *connection, ShardPlacement *placement) { - NodeConnectionKey nodeKey; - char *currentUser = CurrentUserName(); - - MemSet(&nodeKey, 0, sizeof(NodeConnectionKey)); - strlcpy(nodeKey.nodeName, placement->nodeName, MAX_NODE_LENGTH + 1); - nodeKey.nodePort = placement->nodePort; - strlcpy(nodeKey.nodeUser, currentUser, NAMEDATALEN); - - PurgeConnectionByKey(&nodeKey); + PurgeConnection(connection); /* * The following is logically identical to RemoveXactConnection, but since @@ -1256,6 +1248,13 @@ PurgeConnectionForPlacement(ShardPlacement *placement) { NodeConnectionEntry *participantEntry = NULL; bool entryFound = false; + NodeConnectionKey nodeKey; + char *currentUser = CurrentUserName(); + + MemSet(&nodeKey, 0, sizeof(NodeConnectionKey)); + strlcpy(nodeKey.nodeName, placement->nodeName, MAX_NODE_LENGTH + 1); + nodeKey.nodePort = placement->nodePort; + strlcpy(nodeKey.nodeUser, currentUser, NAMEDATALEN); Assert(IsTransactionBlock()); diff --git a/src/backend/distributed/transaction/transaction_management.c b/src/backend/distributed/transaction/transaction_management.c index 24ca5540c..14ae9abd6 100644 --- a/src/backend/distributed/transaction/transaction_management.c +++ b/src/backend/distributed/transaction/transaction_management.c @@ -69,7 +69,7 @@ CoordinatedTransactionCallback(XactEvent event, void *arg) /* close connections etc. */ if (CurrentCoordinatedTransactionState != COORD_TRANS_NONE) { - AfterXactConnectionHandling(true); + AfterXactResetConnections(true); } Assert(!subXactAbortAttempted); @@ -83,7 +83,7 @@ CoordinatedTransactionCallback(XactEvent event, void *arg) /* close connections etc. 
*/ if (CurrentCoordinatedTransactionState != COORD_TRANS_NONE) { - AfterXactConnectionHandling(false); + AfterXactResetConnections(false); } CurrentCoordinatedTransactionState = COORD_TRANS_NONE; diff --git a/src/backend/distributed/utils/connection_cache.c b/src/backend/distributed/utils/connection_cache.c index f2a082651..c8377981e 100644 --- a/src/backend/distributed/utils/connection_cache.c +++ b/src/backend/distributed/utils/connection_cache.c @@ -51,19 +51,17 @@ static void ReportRemoteError(PGconn *connection, PGresult *result, bool raiseEr PGconn * GetOrEstablishConnection(char *nodeName, int32 nodePort) { - int connectionFlags = NEW_CONNECTION | CACHED_CONNECTION | SESSION_LIFESPAN; PGconn *connection = NULL; - MultiConnection *mconnection = - GetNodeConnection(connectionFlags, nodeName, nodePort); + MultiConnection *multiConnection = GetNodeConnection(nodeName, nodePort, 0); - if (PQstatus(mconnection->conn) == CONNECTION_OK) + if (PQstatus(multiConnection->conn) == CONNECTION_OK) { - connection = mconnection->conn; + connection = multiConnection->conn; } else { - ReportConnectionError(mconnection, WARNING); - CloseConnection(mconnection); + ReportConnectionError(multiConnection, WARNING); + CloseConnectionByPGconn(multiConnection->conn); connection = NULL; } @@ -79,11 +77,7 @@ GetOrEstablishConnection(char *nodeName, int32 nodePort) void PurgeConnection(PGconn *connection) { - NodeConnectionKey nodeConnectionKey; - - BuildKeyForConnection(connection, &nodeConnectionKey); - - PurgeConnectionByKey(&nodeConnectionKey); + CloseConnectionByPGconn(connection); } @@ -130,27 +124,6 @@ BuildKeyForConnection(PGconn *connection, NodeConnectionKey *connectionKey) } -void -PurgeConnectionByKey(NodeConnectionKey *nodeConnectionKey) -{ - int connectionFlags = CACHED_CONNECTION; - MultiConnection *connection; - - connection = - StartNodeUserDatabaseConnection( - connectionFlags, - nodeConnectionKey->nodeName, - nodeConnectionKey->nodePort, - nodeConnectionKey->nodeUser, 
- NULL); - - if (connection) - { - CloseConnection(connection); - } -} - - /* * WarnRemoteError retrieves error fields from a remote result and produces an * error report at the WARNING level after amending the error with a CONTEXT diff --git a/src/backend/distributed/utils/connection_management.c b/src/backend/distributed/utils/connection_management.c index 98d60b469..08f7e5c86 100644 --- a/src/backend/distributed/utils/connection_management.c +++ b/src/backend/distributed/utils/connection_management.c @@ -34,7 +34,8 @@ MemoryContext ConnectionContext = NULL; static uint32 ConnectionHashHash(const void *key, Size keysize); static int ConnectionHashCompare(const void *a, const void *b, Size keysize); static MultiConnection * StartConnectionEstablishment(ConnectionHashKey *key); -static void AfterXactHostConnectionHandling(ConnectionHashEntry *entry, bool isCommit); +static MultiConnection * FindAvailableConnection(dlist_head *connections, uint32 flags); +static void AfterXactResetHostConnections(ConnectionHashEntry *entry, bool isCommit); /* @@ -71,32 +72,6 @@ InitializeConnectionManagement(void) } -/* - * Perform connection management activity after the end of a transaction. Both - * COMMIT and ABORT paths are handled here. - * - * This is called by Citus' global transaction callback. - */ -void -AfterXactConnectionHandling(bool isCommit) -{ - HASH_SEQ_STATUS status; - ConnectionHashEntry *entry; - - hash_seq_init(&status, ConnectionHash); - while ((entry = (ConnectionHashEntry *) hash_seq_search(&status)) != 0) - { - AfterXactHostConnectionHandling(entry, isCommit); - - /* - * NB: We leave the hash entry in place, even if there's no individual - * connections in it anymore. There seems no benefit in deleting it, - * and it'll save a bit of work in the next transaction. - */ - } -} - - /* * GetNodeConnection() establishes a connection to remote node, using default * user and database. 
@@ -104,9 +79,9 @@ AfterXactConnectionHandling(bool isCommit) * See StartNodeUserDatabaseConnection for details. */ MultiConnection * -GetNodeConnection(uint32 flags, const char *hostname, int32 port) +GetNodeConnection(const char *hostname, int32 port, uint32 flags) { - return GetNodeUserDatabaseConnection(flags, hostname, port, NULL, NULL); + return GetNodeUserDatabaseConnection(hostname, port, NULL, NULL, flags); } @@ -117,9 +92,9 @@ GetNodeConnection(uint32 flags, const char *hostname, int32 port) * See StartNodeUserDatabaseConnection for details. */ MultiConnection * -StartNodeConnection(uint32 flags, const char *hostname, int32 port) +StartNodeConnection(const char *hostname, int32 port, uint32 flags) { - return StartNodeUserDatabaseConnection(flags, hostname, port, NULL, NULL); + return StartNodeUserDatabaseConnection(hostname, port, NULL, NULL, flags); } @@ -129,12 +104,12 @@ StartNodeConnection(uint32 flags, const char *hostname, int32 port) * See StartNodeUserDatabaseConnection for details. */ MultiConnection * -GetNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port, const - char *user, const char *database) +GetNodeUserDatabaseConnection(const char *hostname, int32 port, const + char *user, const char *database, uint32 flags) { MultiConnection *connection; - connection = StartNodeUserDatabaseConnection(flags, hostname, port, user, database); + connection = StartNodeUserDatabaseConnection(hostname, port, user, database, flags); FinishConnectionEstablishment(connection); @@ -145,30 +120,18 @@ GetNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port, co /* * StartNodeUserDatabaseConnection() initiates a connection to a remote node. * - * If user or database are NULL, the current session's defaults are used. 
The - * following flags influence connection establishment behaviour: - * - NEW_CONNECTION - it is permitted to establish a new connection - * - CACHED_CONNECTION - it is permitted to re-use an established connection - * - SESSION_LIFESPAN - the connection should persist after transaction end - * - FOR_DML - only meaningful for placement associated connections - * - FOR_DDL - only meaningful for placement associated connections - * - CRITICAL_CONNECTION - transaction failures on this connection fail the entire - * coordinated transaction - * * The returned connection has only been initiated, not fully * established. That's useful to allow parallel connection establishment. If * that's not desired use the Get* variant. */ MultiConnection * -StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port, const - char *user, const char *database) +StartNodeUserDatabaseConnection(const char *hostname, int32 port, const + char *user, const char *database, uint32 flags) { ConnectionHashKey key; ConnectionHashEntry *entry = NULL; - MultiConnection *connection; - MemoryContext oldContext; - bool found; - dlist_iter iter; + MultiConnection *connection = NULL; + bool found = false; /* do some minimal input checks */ strlcpy(key.hostname, hostname, MAX_NODE_LENGTH); @@ -197,17 +160,9 @@ StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port, strlcpy(key.database, get_database_name(MyDatabaseId), NAMEDATALEN); } - if (CurrentCoordinatedTransactionState == COORD_TRANS_NONE) - { - CurrentCoordinatedTransactionState = COORD_TRANS_IDLE; - } - /* - * Lookup relevant hash entry. We always enter. If only a cached - * connection is desired, and there's none, we'll simply leave the - * connection list empty. + * Lookup relevant hash entry or enter a new one. 
*/ - entry = hash_search(ConnectionHash, &key, HASH_ENTER, &found); if (!found) { @@ -216,123 +171,106 @@ StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port, dlist_init(entry->connections); } - /* if desired, check whether there's a usable connection */ - if (flags & CACHED_CONNECTION) + connection = FindAvailableConnection(entry->connections, flags); + if (connection == NULL) { - /* check connection cache for a connection that's not already in use */ - dlist_foreach(iter, entry->connections) - { - connection = dlist_container(MultiConnection, node, iter.cur); + MemoryContext oldContext = NULL; - /* don't return claimed connections */ - if (connection->claimedExclusively) - { - continue; - } + /* + * Either no caching desired, or no pre-established, non-claimed, + * connection present. Initiate connection establishment. + */ + connection = StartConnectionEstablishment(&key); - /* - * If we're not allowed to open new connections right now, and the - * current connection hasn't yet been used in this transaction, we - * can't use it. - */ - if (!connection->activeInTransaction && - XactModificationLevel > XACT_MODIFICATION_DATA) - { - continue; - } + oldContext = MemoryContextSwitchTo(ConnectionContext); + dlist_push_tail(entry->connections, &connection->node); - if (flags & SESSION_LIFESPAN) - { - connection->sessionLifespan = true; - } - connection->activeInTransaction = true; - - /* - * One could argue for erroring out when the connection is in a - * failed state. But that'd be a bad idea for two reasons: - * - * 1) Generally starting a connection might fail, after calling - * this function, so calling code needs to handle that anyway. - * 2) This might be used in code that transparently handles - * connection failure. 
- */ - return connection; - } + MemoryContextSwitchTo(oldContext); } - /* no connection available, done if a new connection isn't desirable */ - if (!(flags & NEW_CONNECTION)) + if (flags & IN_TRANSACTION) { - return NULL; + connection->activeInTransaction = true; } - /* - * Check whether we're right now allowed to open new connections. - * - * FIXME: This should be removed soon, once all connections go through - * this API. - */ - if (XactModificationLevel > XACT_MODIFICATION_DATA) + if (flags & CLAIM_EXCLUSIVELY) { - ereport(ERROR, (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION), - errmsg("cannot open new connections after the first modification " - "command within a transaction"))); + ClaimConnectionExclusively(connection); } - /* - * Either no caching desired, or no pre-established, non-claimed, - * connection present. Initiate connection establishment. - */ - connection = StartConnectionEstablishment(&key); - - oldContext = MemoryContextSwitchTo(ConnectionContext); - dlist_push_tail(entry->connections, &connection->node); - - MemoryContextSwitchTo(oldContext); - - if (flags & SESSION_LIFESPAN) - { - connection->sessionLifespan = true; - } - - connection->activeInTransaction = true; - return connection; } +/* + * FindAvailableConnection finds an available connection from the given list + * of connections. A connection is available if it has not been claimed + * exclusively and, if the IN_TRANSACTION flag is not set, is not in a + * transaction. + */ +static MultiConnection * +FindAvailableConnection(dlist_head *connections, uint32 flags) +{ + dlist_iter iter; + + /* check connection cache for a connection that's not already in use */ + dlist_foreach(iter, connections) + { + MultiConnection *connection = + dlist_container(MultiConnection, node, iter.cur); + + /* don't return claimed connections */ + if (connection->claimedExclusively) + { + continue; + } + + /* + * Don't return connections that are active in the coordinated transaction if + * not explicitly requested. 
+ */ + if (!(flags & IN_TRANSACTION) && connection->activeInTransaction) + { + continue; + } + + return connection; + } + + return NULL; +} + + /* * Close a previously established connection. */ void -CloseConnection(MultiConnection *connection) +CloseConnectionByPGconn(PGconn *pqConn) { - ConnectionHashKey key; - bool found; + HASH_SEQ_STATUS status; + ConnectionHashEntry *entry; - /* close connection */ - PQfinish(connection->conn); - connection->conn = NULL; - - strlcpy(key.hostname, connection->hostname, MAX_NODE_LENGTH); - key.port = connection->port; - strlcpy(key.user, connection->user, NAMEDATALEN); - strlcpy(key.database, connection->database, NAMEDATALEN); - - hash_search(ConnectionHash, &key, HASH_FIND, &found); - - if (found) + hash_seq_init(&status, ConnectionHash); + while ((entry = (ConnectionHashEntry *) hash_seq_search(&status)) != 0) { - /* unlink from list */ - dlist_delete(&connection->node); + dlist_head *connections = entry->connections; + dlist_mutable_iter iter; - /* we leave the per-host entry alive */ - pfree(connection); - } - else - { - /* XXX: we could error out instead */ - ereport(WARNING, (errmsg("closing untracked connection"))); + /* entries are freed while iterating, so use the deletion-safe iterator */ + dlist_foreach_modify(iter, connections) + { + MultiConnection *connection = + dlist_container(MultiConnection, node, iter.cur); + + if (connection->conn == pqConn) + { + /* unlink from list, then close the libpq connection itself */ + dlist_delete(&connection->node); + PQfinish(connection->conn); + /* we leave the per-host entry alive */ + pfree(connection); + } + } } } @@ -560,13 +498,40 @@ StartConnectionEstablishment(ConnectionHashKey *key) /* - * Close all remote connections if necessary anymore (i.e. not session - * lifetime), or if in a failed state. + * Perform connection management activity after the end of a transaction. Both + * COMMIT and ABORT paths are handled here. + * + * This is called by Citus' global transaction callback. 
+ */ +void +AfterXactResetConnections(bool isCommit) +{ + HASH_SEQ_STATUS status; + ConnectionHashEntry *entry; + + hash_seq_init(&status, ConnectionHash); + while ((entry = (ConnectionHashEntry *) hash_seq_search(&status)) != 0) + { + AfterXactResetHostConnections(entry, isCommit); + + /* + * NB: We leave the hash entry in place, even if there are no individual + * connections in it anymore. There seems to be no benefit in deleting it, + * and it'll save a bit of work in the next transaction. + */ + } +} + + +/* + * Close remote connections that are not needed anymore (i.e. not cached), + * or that are in a failed state. */ static void -AfterXactHostConnectionHandling(ConnectionHashEntry *entry, bool isCommit) +AfterXactResetHostConnections(ConnectionHashEntry *entry, bool isCommit) { dlist_mutable_iter iter; + bool cachedConnection = false; dlist_foreach_modify(iter, entry->connections) { @@ -585,11 +550,10 @@ AfterXactHostConnectionHandling(ConnectionHashEntry *entry, bool isCommit) } /* - * Only let a connection life longer than a single transaction if - * instructed to do so by the caller. We also skip doing so if - * it's in a state that wouldn't allow us to run queries again. + * Close connection if there was an error or we already cached + * a connection for this node. 
*/ - if (!connection->sessionLifespan || + if (cachedConnection || PQstatus(connection->conn) != CONNECTION_OK || PQtransactionStatus(connection->conn) != PQTRANS_IDLE) { @@ -603,10 +567,12 @@ AfterXactHostConnectionHandling(ConnectionHashEntry *entry, bool isCommit) } else { - /* reset per-transaction state */ + /* reset connection state */ + UnclaimConnection(connection); connection->activeInTransaction = false; - UnclaimConnection(connection); + /* close remaining connections */ + cachedConnection = true; } } } diff --git a/src/include/distributed/connection_cache.h b/src/include/distributed/connection_cache.h index 073b4290f..6696c8664 100644 --- a/src/include/distributed/connection_cache.h +++ b/src/include/distributed/connection_cache.h @@ -56,7 +56,6 @@ typedef struct NodeConnectionEntry extern PGconn * GetOrEstablishConnection(char *nodeName, int32 nodePort); extern void PurgeConnection(PGconn *connection); extern void BuildKeyForConnection(PGconn *connection, NodeConnectionKey *connectionKey); -extern void PurgeConnectionByKey(NodeConnectionKey *nodeConnectionKey); extern void WarnRemoteError(PGconn *connection, PGresult *result); extern void ReraiseRemoteError(PGconn *connection, PGresult *result); extern PGconn * ConnectToNode(char *nodeName, int nodePort, char *nodeUser); diff --git a/src/include/distributed/connection_management.h b/src/include/distributed/connection_management.h index c37f9d516..64ec16a5d 100644 --- a/src/include/distributed/connection_management.h +++ b/src/include/distributed/connection_management.h @@ -30,23 +30,11 @@ struct MemoryContextData; */ enum MultiConnectionMode { - /* allow establishment of new connections */ - NEW_CONNECTION = 1 << 0, + /* connection should be part of the coordinated transaction */ + IN_TRANSACTION = 1 << 1, - /* allow use of pre-established connections */ - CACHED_CONNECTION = 1 << 1, - - /* mark returned connection having session lifespan */ - SESSION_LIFESPAN = 1 << 2, - - /* the connection will be 
used for DML */ - FOR_DML = 1 << 3, - - /* the connection will be used for DDL */ - FOR_DDL = 1 << 4, - - /* failures on this connection will fail entire coordinated transaction */ - CRITICAL_CONNECTION = 1 << 5 + /* connection should be claimed exclusively for the caller */ + CLAIM_EXCLUSIVELY = 1 << 2, }; @@ -64,13 +52,10 @@ typedef struct MultiConnection /* underlying libpq connection */ struct pg_conn *conn; - /* is the connection intended to be kept after transaction end */ - bool sessionLifespan; - /* is the connection currently in use, and shouldn't be used by anything else */ bool claimedExclusively; - /* has the connection been used in the current coordinated transaction? */ + /* is the connection currently part of the coordinated transaction */ bool activeInTransaction; /* time connection establishment was started, for timeout */ @@ -112,24 +97,25 @@ extern HTAB *ConnectionHash; extern struct MemoryContextData *ConnectionContext; -extern void AfterXactConnectionHandling(bool isCommit); +extern void AfterXactResetConnections(bool isCommit); extern void InitializeConnectionManagement(void); /* Low-level connection establishment APIs */ -extern MultiConnection * GetNodeConnection(uint32 flags, const char *hostname, - int32 port); -extern MultiConnection * StartNodeConnection(uint32 flags, const char *hostname, - int32 port); -extern MultiConnection * GetNodeUserDatabaseConnection(uint32 flags, const char *hostname, - int32 port, const char *user, const - char *database); -extern MultiConnection * StartNodeUserDatabaseConnection(uint32 flags, - const char *hostname, +extern MultiConnection * GetNodeConnection(const char *hostname, int32 port, + uint32 flags); +extern MultiConnection * StartNodeConnection(const char *hostname, int32 port, + uint32 flags); +extern MultiConnection * GetNodeUserDatabaseConnection(const char *hostname, int32 port, + const char *user, + const char *database, + uint32 flags); +extern MultiConnection * 
StartNodeUserDatabaseConnection(const char *hostname, int32 port, const char *user, - const char *database); -extern void CloseConnection(MultiConnection *connection); + const char *database, + uint32 flags); +extern void CloseConnectionByPGconn(struct pg_conn *pqConn); /* dealing with a connection */ extern void FinishConnectionEstablishment(MultiConnection *connection);