diff --git a/src/backend/distributed/commands/multi_copy.c b/src/backend/distributed/commands/multi_copy.c index 8d85b5ed9..f8580a6b9 100644 --- a/src/backend/distributed/commands/multi_copy.c +++ b/src/backend/distributed/commands/multi_copy.c @@ -79,12 +79,14 @@ #include "distributed/multi_physical_planner.h" #include "distributed/multi_router_planner.h" #include "distributed/multi_executor.h" +#include "distributed/locally_reserved_shared_connections.h" #include "distributed/placement_connection.h" #include "distributed/relation_access_tracking.h" #include "distributed/remote_commands.h" #include "distributed/remote_transaction.h" #include "distributed/resource_lock.h" #include "distributed/shard_pruning.h" +#include "distributed/shared_connection_stats.h" #include "distributed/version_compat.h" #include "distributed/worker_protocol.h" #include "distributed/local_multi_copy.h" @@ -259,7 +261,7 @@ static MultiConnection * GetLeastUtilisedCopyConnection(List *connectionStateLis char *nodeName, int nodePort); static List * ConnectionStateList(HTAB *connectionStateHash); static List * ConnectionStateListToNode(HTAB *connectionStateHash, - char *hostname, int port); + const char *hostname, int32 port); static void InitializeCopyShardState(CopyShardState *shardState, HTAB *connectionStateHash, uint64 shardId, bool stopOnFailure, bool @@ -815,6 +817,12 @@ OpenCopyConnectionsForNewShards(CopyStmt *copyStatement, MultiConnection *connection = GetPlacementConnection(connectionFlags, placement, nodeUser); + /* + * This code-path doesn't support optional connections, so we don't expect + * NULL connections. + */ + Assert(connection != NULL); + if (PQstatus(connection->pgConn) != CONNECTION_OK) { if (stopOnFailure) @@ -2265,6 +2273,15 @@ CitusCopyDestReceiverStartup(DestReceiver *dest, int operation, copyDest->connectionStateHash = CreateConnectionStateHash(TopTransactionContext); RecordRelationAccessIfReferenceTable(tableId, PLACEMENT_ACCESS_DML); + + /* + * For all the primary (e.g., writable) nodes, reserve a shared connection. + * We do this upfront because we cannot know which nodes are going to be + * accessed. Since the order of the reservation is important, we need to + * do it right here. For the details on why the order important, see + * the function. + */ + EnsureConnectionPossibilityForPrimaryNodes(); } @@ -2938,6 +2955,12 @@ CitusCopyTo(CopyStmt *copyStatement, char *completionTag) shardPlacement, userName); + /* + * This code-path doesn't support optional connections, so we don't expect + * NULL connections. + */ + Assert(connection != NULL); + if (placementIndex == list_length(shardPlacementList) - 1) { /* last chance for this shard */ @@ -3286,10 +3309,10 @@ ConnectionStateList(HTAB *connectionStateHash) /* * ConnectionStateListToNode returns all CopyConnectionState structures in - * the given hash. + * the given hash for a given hostname and port values. 
*/ static List * -ConnectionStateListToNode(HTAB *connectionStateHash, char *hostname, int port) +ConnectionStateListToNode(HTAB *connectionStateHash, const char *hostname, int32 port) { List *connectionStateList = NIL; HASH_SEQ_STATUS status; @@ -3533,18 +3556,37 @@ CopyGetPlacementConnection(HTAB *connectionStateHash, ShardPlacement *placement, ConnectionStateListToNode(connectionStateHash, nodeName, nodePort); if (HasReachedAdaptiveExecutorPoolSize(copyConnectionStateList)) { - connection = - GetLeastUtilisedCopyConnection(copyConnectionStateList, nodeName, nodePort); - /* * If we've already reached the executor pool size, there should be at * least one connection to any given node. + * + * Note that we don't need to mark the connection as critical, since the + * connection was already returned by this function before. */ - Assert(connection != NULL); + connection = GetLeastUtilisedCopyConnection(copyConnectionStateList, + nodeName, + nodePort); return connection; } + if (IsReservationPossible()) + { + /* + * Enforce the requirements for adaptive connection management + * (a.k.a., throttle connections if citus.max_shared_pool_size + * reached). + * + * Given that we have done reservations per node, we do not ever + * need to pass WAIT_FOR_CONNECTION, we are sure that there is a + * connection either reserved for this backend or already established + * by the previous commands in the same transaction block. + */ + int adaptiveConnectionManagementFlag = OPTIONAL_CONNECTION; + connectionFlags |= adaptiveConnectionManagementFlag; + } + + /* * For placements that haven't been assigned a connection by a previous command * in the current transaction, we use a separate connection per placement for @@ -3571,6 +3613,52 @@ CopyGetPlacementConnection(HTAB *connectionStateHash, ShardPlacement *placement, } connection = GetPlacementConnection(connectionFlags, placement, nodeUser); + if (connection == NULL) + { + if (list_length(copyConnectionStateList) > 0) + { + /* + * The connection manager throttled any new connections, so pick an existing + * connection with least utilization. + * + * Note that we don't need to mark the connection as critical, since the + * connection was already returned by this function before. + */ + connection = + GetLeastUtilisedCopyConnection(copyConnectionStateList, nodeName, + nodePort); + } + else + { + /* + * For this COPY command, we have not established any connections + * and adaptive connection management throttled the new connection + * request. This could only happen if this COPY command is the + * second (or later) COPY command in a transaction block as the + * first COPY command always gets a connection per node thanks to + * the connection reservation. + * + * As we know that there has been at least one COPY command happened + * earlier, we need to find the connection to that node, and use it. + */ + connection = + ConnectionAvailableToNode(nodeName, nodePort, CurrentUserName(), + CurrentDatabaseName()); + + /* + * We do not expect this to happen, but still instead of an assert, + * we prefer explicit error message. 
+	 */
+		if (connection == NULL)
+		{
+			ereport(ERROR, (errmsg("could not find an available connection"),
+							errhint("Set citus.max_shared_pool_size TO -1 to let "
+									"COPY command finish")));
+		}
+	}
+
+	return connection;
+	}
 
 	if (PQstatus(connection->pgConn) != CONNECTION_OK)
 	{
@@ -3628,6 +3716,8 @@ HasReachedAdaptiveExecutorPoolSize(List *connectionStateList)
 /*
  * GetLeastUtilisedCopyConnection returns a MultiConnection to the given node
  * with the least number of placements assigned to it.
+ *
+ * It is assumed that there exists at least one connection to the node.
  */
 static MultiConnection *
 GetLeastUtilisedCopyConnection(List *connectionStateList, char *nodeName,
@@ -3637,6 +3727,14 @@ GetLeastUtilisedCopyConnection(List *connectionStateList, char *nodeName,
 	int minPlacementCount = PG_INT32_MAX;
 	ListCell *connectionStateCell = NULL;
 
+	/*
+	 * We only pick the least utilised connection when some connection limits are
+	 * reached such as max_shared_pool_size or max_adaptive_executor_pool_size.
+	 *
+	 * Therefore there should be some connections to choose from.
+	 */
+	Assert(list_length(connectionStateList) > 0);
+
 	foreach(connectionStateCell, connectionStateList)
 	{
 		CopyConnectionState *connectionState = lfirst(connectionStateCell);
diff --git a/src/backend/distributed/connection/connection_management.c b/src/backend/distributed/connection/connection_management.c
index 3cc8bb0c5..c25c0a67b 100644
--- a/src/backend/distributed/connection/connection_management.c
+++ b/src/backend/distributed/connection/connection_management.c
@@ -46,18 +46,19 @@ int MaxCachedConnectionsPerWorker = 1;
 HTAB *ConnectionHash = NULL;
 HTAB *ConnParamsHash = NULL;
+
 MemoryContext ConnectionContext = NULL;
 
 static uint32 ConnectionHashHash(const void *key, Size keysize);
 static int ConnectionHashCompare(const void *a, const void *b, Size keysize);
 static void StartConnectionEstablishment(MultiConnection *connectionn,
 										 ConnectionHashKey *key);
+static MultiConnection * FindAvailableConnection(dlist_head *connections, uint32 flags);
 static void FreeConnParamsHashEntryFields(ConnParamsHashEntry *entry);
 static void AfterXactHostConnectionHandling(ConnectionHashEntry *entry, bool isCommit);
 static bool ShouldShutdownConnection(MultiConnection *connection, const int
 									 cachedConnectionCount);
 static void ResetConnection(MultiConnection *connection);
-static MultiConnection * FindAvailableConnection(dlist_head *connections, uint32 flags);
 static bool RemoteTransactionIdle(MultiConnection *connection);
 static int EventSetSizeForConnectionList(List *connections);
@@ -497,6 +498,38 @@ CloseAllConnectionsAfterTransaction(void)
 }
 
+
+/*
+ * ConnectionAvailableToNode returns a MultiConnection if the session has at least
+ * one connection established and available to the given node. Otherwise, it
+ * returns NULL.
+ */
+MultiConnection *
+ConnectionAvailableToNode(char *hostName, int nodePort, const char *userName,
+						  const char *database)
+{
+	ConnectionHashKey key;
+	bool found = false;
+
+	strlcpy(key.hostname, hostName, MAX_NODE_LENGTH);
+	key.port = nodePort;
+	strlcpy(key.user, userName, NAMEDATALEN);
+	strlcpy(key.database, database, NAMEDATALEN);
+
+	ConnectionHashEntry *entry =
+		(ConnectionHashEntry *) hash_search(ConnectionHash, &key, HASH_FIND, &found);
+
+	if (!found)
+	{
+		return NULL;
+	}
+
+	int flags = 0;
+	MultiConnection *connection = FindAvailableConnection(entry->connections, flags);
+
+	return connection;
+}
+
+
 /*
  * CloseNodeConnectionsAfterTransaction sets the forceClose flag of the connections
  * to a particular node as true such that the connections are no longer cached. This
diff --git a/src/backend/distributed/connection/locally_reserved_shared_connections.c b/src/backend/distributed/connection/locally_reserved_shared_connections.c
new file mode 100644
index 000000000..a4dc337cd
--- /dev/null
+++ b/src/backend/distributed/connection/locally_reserved_shared_connections.c
@@ -0,0 +1,511 @@
+/*-------------------------------------------------------------------------
+ *
+ * locally_reserved_shared_connections.c
+ *
+ * Keeps track of the number of reserved connections to remote nodes
+ * for this backend. The primary goal is to complement the logic
+ * implemented in shared_connection_stats.c, which aims to prevent an excessive
+ * number of connections (typically > max_connections) to any worker node.
+ * With these locally reserved connections, we enforce the same
+ * constraints while also accounting for connections that are reserved
+ * but not yet established.
+ *
+ * To be more precise, shared connection stats are incremented by only two
+ * operations: (a) Establishing a connection to a remote node
+ *              (b) Reserving connections, the logic that this
+ *                  file implements.
+ *
+ * Finally, as the name already implies, once a backend has reserved a shared
+ * connection, it is guaranteed to have the right to establish a connection
+ * to the given remote node when needed.
+ *
+ * For the COPY command, we use this fact to reserve connections to the remote nodes
+ * in the same order as the adaptive executor in order to prevent any resource
+ * starvation. We need to do this because COPY establishes connections when it
+ * receives a tuple that targets a remote node. This is a valuable optimization
+ * to prevent unnecessary connection establishments, which are pretty expensive.
+ * Instead, the COPY command can reserve connections upfront, and utilize them when
+ * they are actually needed.
+ *
+ * Copyright (c) Citus Data, Inc.
+ 
+ * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "miscadmin.h" + +#include "access/hash.h" +#include "commands/dbcommands.h" +#include "distributed/listutils.h" +#include "distributed/locally_reserved_shared_connections.h" +#include "distributed/metadata_cache.h" +#include "distributed/multi_executor.h" +#include "distributed/placement_connection.h" +#include "distributed/shared_connection_stats.h" +#include "distributed/tuplestore.h" +#include "distributed/worker_manager.h" +#include "utils/hashutils.h" +#include "utils/builtins.h" + + +#define RESERVED_CONNECTION_COLUMNS 4 + + +/* session specific hash map*/ +static HTAB *SessionLocalReservedConnections = NULL; + + +/* + * Hash key for connection reservations + */ +typedef struct ReservedConnectionHashKey +{ + char hostname[MAX_NODE_LENGTH]; + int32 port; + Oid databaseOid; + Oid userId; +} ReservedConnectionHashKey; + +/* + * Hash entry for per worker information. The rules are as follows: + * - If there is no entry in the hash, we can make a reservation. + * - If usedReservation is false, we have a reservation that we can use. + * - If usedReservation is true, we used the reservation and cannot make more reservations. + */ +typedef struct ReservedConnectionHashEntry +{ + ReservedConnectionHashKey key; + + bool usedReservation; +} ReservedConnectionHashEntry; + + +static void StoreAllReservedConnections(Tuplestorestate *tupleStore, + TupleDesc tupleDescriptor); +static ReservedConnectionHashEntry * AllocateOrGetReservedConectionEntry(char *hostName, + int nodePort, Oid + userId, Oid + databaseOid, + bool *found); +static void EnsureConnectionPossibilityForNodeList(List *nodeList); +static uint32 LocalConnectionReserveHashHash(const void *key, Size keysize); +static int LocalConnectionReserveHashCompare(const void *a, const void *b, Size keysize); + + +PG_FUNCTION_INFO_V1(citus_reserved_connection_stats); + +/* + * citus_reserved_connection_stats returns all the avaliable information about all + * the reserved connections. This function is used mostly for testing. + */ +Datum +citus_reserved_connection_stats(PG_FUNCTION_ARGS) +{ + TupleDesc tupleDescriptor = NULL; + + CheckCitusVersion(ERROR); + Tuplestorestate *tupleStore = SetupTuplestore(fcinfo, &tupleDescriptor); + + StoreAllReservedConnections(tupleStore, tupleDescriptor); + + /* clean up and return the tuplestore */ + tuplestore_donestoring(tupleStore); + + PG_RETURN_VOID(); +} + + +/* + * StoreAllReservedConnections gets connections established from the current node + * and inserts them into the given tuplestore. + * + * We don't need to enforce any access privileges as the number of backends + * on any node is already visible on pg_stat_activity to all users. 
+ */ +static void +StoreAllReservedConnections(Tuplestorestate *tupleStore, TupleDesc tupleDescriptor) +{ + Datum values[RESERVED_CONNECTION_COLUMNS]; + bool isNulls[RESERVED_CONNECTION_COLUMNS]; + + HASH_SEQ_STATUS status; + ReservedConnectionHashEntry *connectionEntry = NULL; + + hash_seq_init(&status, SessionLocalReservedConnections); + while ((connectionEntry = + (ReservedConnectionHashEntry *) hash_seq_search(&status)) != 0) + { + /* get ready for the next tuple */ + memset(values, 0, sizeof(values)); + memset(isNulls, false, sizeof(isNulls)); + + char *databaseName = get_database_name(connectionEntry->key.databaseOid); + if (databaseName == NULL) + { + /* database might have been dropped */ + continue; + } + + values[0] = PointerGetDatum(cstring_to_text(connectionEntry->key.hostname)); + values[1] = Int32GetDatum(connectionEntry->key.port); + values[2] = PointerGetDatum(cstring_to_text(databaseName)); + values[3] = BoolGetDatum(connectionEntry->usedReservation); + + tuplestore_putvalues(tupleStore, tupleDescriptor, values, isNulls); + } +} + + +/* + * InitializeLocallyReservedSharedConnections initializes the hashmap in + * ConnectionContext. + */ +void +InitializeLocallyReservedSharedConnections(void) +{ + HASHCTL reservedConnectionInfo; + + memset(&reservedConnectionInfo, 0, sizeof(reservedConnectionInfo)); + reservedConnectionInfo.keysize = sizeof(ReservedConnectionHashKey); + reservedConnectionInfo.entrysize = sizeof(ReservedConnectionHashEntry); + + /* + * ConnectionContext is the session local memory context that is used for + * tracking remote connections. + */ + reservedConnectionInfo.hcxt = ConnectionContext; + + reservedConnectionInfo.hash = LocalConnectionReserveHashHash; + reservedConnectionInfo.match = LocalConnectionReserveHashCompare; + + uint32 hashFlags = (HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT | HASH_COMPARE); + + SessionLocalReservedConnections = + hash_create("citus session level reserved connectios (host,port,database,user)", + 64, &reservedConnectionInfo, hashFlags); +} + + +/* + * CanUseReservedConnection returns true if we have already reserved at least + * one shared connection in this session that is not used. + */ +bool +CanUseReservedConnection(const char *hostName, int nodePort, Oid userId, + Oid databaseOid) +{ + ReservedConnectionHashKey key; + + strlcpy(key.hostname, hostName, MAX_NODE_LENGTH); + key.userId = userId; + key.port = nodePort; + key.databaseOid = databaseOid; + + bool found = false; + ReservedConnectionHashEntry *entry = + (ReservedConnectionHashEntry *) hash_search(SessionLocalReservedConnections, &key, + HASH_FIND, &found); + + if (!found || !entry) + { + return false; + } + + return !entry->usedReservation; +} + + +/* + * DeallocateReservedConnections is responsible for two things. First, if the operation + * has reserved a connection but not used, it gives back the connection back to the + * shared memory pool. Second, for all cases, it deallocates the session local entry from + * the hash. + */ +void +DeallocateReservedConnections(void) +{ + HASH_SEQ_STATUS status; + ReservedConnectionHashEntry *entry; + + hash_seq_init(&status, SessionLocalReservedConnections); + while ((entry = (ReservedConnectionHashEntry *) hash_seq_search(&status)) != 0) + { + if (!entry->usedReservation) + { + /* + * We have not used this reservation, make sure to clean-up from + * the shared memory as well. 
+ */ + DecrementSharedConnectionCounter(entry->key.hostname, entry->key.port); + + /* for completeness, set it to true */ + entry->usedReservation = true; + } + + /* + * We cleaned up all the entries because we may not need reserved connections + * in the next iteration. + */ + bool found = false; + hash_search(SessionLocalReservedConnections, entry, HASH_REMOVE, &found); + Assert(found); + } +} + + +/* + * MarkReservedConnectionUsed sets the local hash that the reservation is used. + */ +void +MarkReservedConnectionUsed(const char *hostName, int nodePort, Oid userId, + Oid databaseOid) +{ + ReservedConnectionHashKey key; + + strlcpy(key.hostname, hostName, MAX_NODE_LENGTH); + key.userId = userId; + key.port = nodePort; + key.databaseOid = databaseOid; + + bool found = false; + ReservedConnectionHashEntry *entry = + (ReservedConnectionHashEntry *) hash_search( + SessionLocalReservedConnections, &key, HASH_FIND, &found); + + if (!found) + { + ereport(ERROR, (errmsg("BUG: untracked reserved connection"), + errhint("Set citus.max_shared_pool_size TO -1 to " + "disable reserved connection counters"))); + } + + /* a reservation can only be used once */ + Assert(!entry->usedReservation); + + entry->usedReservation = true; +} + + +/* + * EnsureConnectionPossibilityForPrimaryNodes is a wrapper around + * EnsureConnectionPossibilityForNodeList. + */ +void +EnsureConnectionPossibilityForPrimaryNodes(void) +{ + /* + * By using NoLock there is a tiny risk of that we miss to reserve a + * connection for a concurrently added node. However, that doesn't + * seem to cause any problems as none of the placements that we are + * going to access would be on the new node. + */ + List *primaryNodeList = ActivePrimaryNodeList(NoLock); + + EnsureConnectionPossibilityForNodeList(primaryNodeList); +} + + +/* + * EnsureConnectionPossibilityForNodeList reserves a shared connection + * counter per node in the nodeList unless: + * - Reservation is needed (see IsReservationPossible()) + * - there is at least one connection to the node so that we are guranteed + * to get a connection + * - An earlier call already reserved a connection (e.g., we allow only a + * single reservation per backend) + */ +static void +EnsureConnectionPossibilityForNodeList(List *nodeList) +{ + if (!IsReservationPossible()) + { + return; + } + + /* + * We sort the workerList because adaptive connection management + * (e.g., OPTIONAL_CONNECTION) requires any concurrent executions + * to wait for the connections in the same order to prevent any + * starvation. If we don't sort, we might end up with: + * Execution 1: Get connection for worker 1, wait for worker 2 + * Execution 2: Get connection for worker 2, wait for worker 1 + * + * and, none could proceed. Instead, we enforce every execution establish + * the required connections to workers in the same order. + */ + nodeList = SortList(nodeList, CompareWorkerNodes); + + char *databaseName = get_database_name(MyDatabaseId); + Oid userId = GetUserId(); + char *userName = GetUserNameFromId(userId, false); + + WorkerNode *workerNode = NULL; + foreach_ptr(workerNode, nodeList) + { + if (ConnectionAvailableToNode(workerNode->workerName, workerNode->workerPort, + userName, databaseName) != NULL) + { + /* + * The same user has already an active connection for the node. It + * means that the execution can use the same connection, so reservation + * is not necessary. + */ + continue; + } + + /* + * We are trying to be defensive here by ensuring that the required hash + * table entry can be allocated. 
The main goal is that we don't want to be + * in a situation where shared connection counter is incremented but not + * the local reserved counter due to out-of-memory. + * + * Note that shared connection stats operate on the shared memory, and we + * pre-allocate all the necessary memory. In other words, it would never + * throw out of memory error. + */ + bool found = false; + ReservedConnectionHashEntry *hashEntry = + AllocateOrGetReservedConectionEntry(workerNode->workerName, + workerNode->workerPort, + userId, MyDatabaseId, &found); + + if (found) + { + /* + * We have already reserved a connection for this user and database + * on the worker. We only allow a single reservation per + * transaction block. The reason is that the earlier command (either in + * a transaction block or a function call triggered by a single command) + * was able to reserve or establish a connection. That connection is + * guranteed to be avaliable for us. + */ + continue; + } + + /* + * Increment the shared counter, we may need to wait if there are + * no space left. + */ + WaitLoopForSharedConnection(workerNode->workerName, workerNode->workerPort); + + /* locally mark that we have one connection reserved */ + hashEntry->usedReservation = false; + } +} + + +/* + * IsReservationPossible returns true if the state of the current + * session is eligible for shared connection reservation. + */ +bool +IsReservationPossible(void) +{ + if (GetMaxSharedPoolSize() == DISABLE_CONNECTION_THROTTLING) + { + /* connection throttling disabled */ + return false; + } + + if (UseConnectionPerPlacement()) + { + /* + * For this case, we are not enforcing adaptive + * connection management anyway. + */ + return false; + } + + if (SessionLocalReservedConnections == NULL) + { + /* + * This is unexpected as SessionLocalReservedConnections hash table is + * created at startup. Still, let's be defensive. + */ + return false; + } + + return true; +} + + +/* + * AllocateReservedConectionEntry allocates the required entry in the hash + * map by HASH_ENTER. The function throws an error if it cannot allocate + * the entry. + */ +static ReservedConnectionHashEntry * +AllocateOrGetReservedConectionEntry(char *hostName, int nodePort, Oid userId, + Oid databaseOid, bool *found) +{ + ReservedConnectionHashKey key; + + *found = false; + + strlcpy(key.hostname, hostName, MAX_NODE_LENGTH); + key.userId = userId; + key.port = nodePort; + key.databaseOid = databaseOid; + + /* + * Entering a new entry with HASH_ENTER flag is enough as it would + * throw out-of-memory error as it internally does palloc. + */ + ReservedConnectionHashEntry *entry = + (ReservedConnectionHashEntry *) hash_search(SessionLocalReservedConnections, + &key, HASH_ENTER, found); + + if (!*found) + { + /* + * Until we reserve connection in the shared memory, we treat + * as if have used the reservation. + */ + entry->usedReservation = true; + } + + return entry; +} + + +/* + * LocalConnectionReserveHashHash is a utilty function to calculate hash of + * ReservedConnectionHashKey. 
+ */ +static uint32 +LocalConnectionReserveHashHash(const void *key, Size keysize) +{ + ReservedConnectionHashKey *entry = (ReservedConnectionHashKey *) key; + + uint32 hash = string_hash(entry->hostname, MAX_NODE_LENGTH); + hash = hash_combine(hash, hash_uint32(entry->userId)); + hash = hash_combine(hash, hash_uint32(entry->port)); + hash = hash_combine(hash, hash_uint32(entry->databaseOid)); + + return hash; +} + + +/* + * LocalConnectionReserveHashCompare is a utilty function to compare + * ReservedConnectionHashKeys. + */ +static int +LocalConnectionReserveHashCompare(const void *a, const void *b, Size keysize) +{ + ReservedConnectionHashKey *ca = (ReservedConnectionHashKey *) a; + ReservedConnectionHashKey *cb = (ReservedConnectionHashKey *) b; + + if (ca->port != cb->port || + ca->databaseOid != cb->databaseOid || + ca->userId != cb->userId || + strncmp(ca->hostname, cb->hostname, MAX_NODE_LENGTH) != 0) + { + return 1; + } + else + { + return 0; + } +} diff --git a/src/backend/distributed/connection/placement_connection.c b/src/backend/distributed/connection/placement_connection.c index 5cfd25e39..abaf334d9 100644 --- a/src/backend/distributed/connection/placement_connection.c +++ b/src/backend/distributed/connection/placement_connection.c @@ -18,6 +18,7 @@ #include "distributed/listutils.h" #include "distributed/coordinator_protocol.h" #include "distributed/metadata_cache.h" +#include "distributed/multi_executor.h" #include "distributed/distributed_planner.h" #include "distributed/multi_partitioning_utils.h" #include "distributed/placement_connection.h" @@ -209,6 +210,14 @@ GetPlacementConnection(uint32 flags, ShardPlacement *placement, const char *user { MultiConnection *connection = StartPlacementConnection(flags, placement, userName); + if (connection == NULL) + { + /* connection can only be NULL for optional connections */ + Assert((flags & OPTIONAL_CONNECTION)); + + return NULL; + } + FinishConnectionEstablishment(connection); return connection; } @@ -293,25 +302,25 @@ StartPlacementListConnection(uint32 flags, List *placementAccessList, */ chosenConnection = StartNodeUserDatabaseConnection(flags, nodeName, nodePort, userName, NULL); + if (chosenConnection == NULL) + { + /* connection can only be NULL for optional connections */ + Assert((flags & OPTIONAL_CONNECTION)); - /* - * chosenConnection can only be NULL for optional connections, which we - * don't support in this codepath. - */ - Assert((flags & OPTIONAL_CONNECTION) == 0); - Assert(chosenConnection != NULL); + return NULL; + } if ((flags & REQUIRE_CLEAN_CONNECTION) && ConnectionAccessedDifferentPlacement(chosenConnection, placement)) { /* * Cached connection accessed a non-co-located placement in the same - * table or co-location group, while the caller asked for a connection - * per placement. Open a new connection instead. + * table or co-location group, while the caller asked for a clean + * connection. Open a new connection instead. * * We use this for situations in which we want to use a different * connection for every placement, such as COPY. If we blindly returned - * a cached conection that already modified a different, non-co-located + * a cached connection that already modified a different, non-co-located * placement B in the same table or in a table with the same co-location * ID as the current placement, then we'd no longer able to write to * placement B later in the COPY. 
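
The hunks above and below change the contract of these connection helpers: when OPTIONAL_CONNECTION is part of the flags, a NULL result now means the shared connection pool throttled the request and the caller must fall back to an already established connection, while callers that do not pass the flag keep asserting a non-NULL result. The following standalone sketch is a hypothetical illustration of that caller-side pattern only; FakeConnection, StartFakeConnection, and the one-slot pool are invented stand-ins, not Citus APIs.

/*
 * Hypothetical sketch, not Citus code: shows how an OPTIONAL_CONNECTION
 * request tolerates a NULL result and falls back, while a mandatory
 * request can simply assert that a connection was returned.
 */
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>

#define OPTIONAL_CONNECTION (1 << 0)

typedef struct FakeConnection
{
	int id;
} FakeConnection;

static int remainingPoolSlots = 1;	/* pretend the shared pool has one slot */

static FakeConnection *
StartFakeConnection(unsigned int flags)
{
	if (remainingPoolSlots <= 0 && (flags & OPTIONAL_CONNECTION))
	{
		/* throttled: optional callers must cope with NULL */
		return NULL;
	}

	remainingPoolSlots--;

	FakeConnection *connection = malloc(sizeof(FakeConnection));
	connection->id = remainingPoolSlots;
	return connection;
}

int
main(void)
{
	FakeConnection *first = StartFakeConnection(OPTIONAL_CONNECTION);

	/* a non-optional caller would simply assert the result, as the diff does */
	assert(first != NULL);

	/* the pool is now exhausted, so this optional request gets throttled */
	FakeConnection *second = StartFakeConnection(OPTIONAL_CONNECTION);
	if (second == NULL)
	{
		printf("throttled; falling back to existing connection %d\n", first->id);
	}

	free(second);
	free(first);
	return 0;
}

In the actual COPY path shown earlier in multi_copy.c, the fallback is an existing connection to the same node: either the least-utilised one or the one found through ConnectionAvailableToNode().
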
@@ -321,12 +330,13 @@ StartPlacementListConnection(uint32 flags, List *placementAccessList, nodeName, nodePort, userName, NULL); - /* - * chosenConnection can only be NULL for optional connections, - * which we don't support in this codepath. - */ - Assert((flags & OPTIONAL_CONNECTION) == 0); - Assert(chosenConnection != NULL); + if (chosenConnection == NULL) + { + /* connection can only be NULL for optional connections */ + Assert((flags & OPTIONAL_CONNECTION)); + + return NULL; + } Assert(!ConnectionAccessedDifferentPlacement(chosenConnection, placement)); } @@ -1175,6 +1185,19 @@ InitPlacementConnectionManagement(void) } +/* + * UseConnectionPerPlacement returns whether we should use as separate connection + * per placement even if another connection is idle. We mostly use this in testing + * scenarios. + */ +bool +UseConnectionPerPlacement(void) +{ + return ForceMaxQueryParallelization && + MultiShardConnectionType != SEQUENTIAL_CONNECTION; +} + + static uint32 ColocatedPlacementsHashHash(const void *key, Size keysize) { diff --git a/src/backend/distributed/connection/shared_connection_stats.c b/src/backend/distributed/connection/shared_connection_stats.c index 632891c7f..65a5536c4 100644 --- a/src/backend/distributed/connection/shared_connection_stats.c +++ b/src/backend/distributed/connection/shared_connection_stats.c @@ -24,9 +24,12 @@ #include "distributed/cancel_utils.h" #include "distributed/connection_management.h" #include "distributed/listutils.h" +#include "distributed/locally_reserved_shared_connections.h" #include "distributed/metadata_cache.h" #include "distributed/multi_executor.h" +#include "distributed/placement_connection.h" #include "distributed/shared_connection_stats.h" +#include "distributed/worker_manager.h" #include "distributed/time_constants.h" #include "distributed/tuplestore.h" #include "utils/builtins.h" @@ -37,8 +40,6 @@ #define REMOTE_CONNECTION_STATS_COLUMNS 4 -#define ADJUST_POOLSIZE_AUTOMATICALLY 0 -#define DISABLE_CONNECTION_THROTTLING -1 /* * The data structure used to store data in shared memory. This data structure is only @@ -55,6 +56,7 @@ typedef struct ConnectionStatsSharedData ConditionVariable waitersConditionVariable; } ConnectionStatsSharedData; + typedef struct SharedConnStatsHashKey { /* @@ -106,8 +108,8 @@ static void UnLockConnectionSharedMemory(void); static void SharedConnectionStatsShmemInit(void); static size_t SharedConnectionStatsShmemSize(void); static bool ShouldWaitForConnection(int currentConnectionCount); -static int SharedConnectionHashCompare(const void *a, const void *b, Size keysize); static uint32 SharedConnectionHashHash(const void *key, Size keysize); +static int SharedConnectionHashCompare(const void *a, const void *b, Size keysize); PG_FUNCTION_INFO_V1(citus_remote_connection_stats); @@ -247,6 +249,18 @@ TryToIncrementSharedConnectionCounter(const char *hostname, int port) MAX_NODE_LENGTH))); } + /* + * The local session might already have some reserved connections to the given + * node. In that case, we don't need to go through the shared memory. + */ + Oid userId = GetUserId(); + if (CanUseReservedConnection(hostname, port, userId, MyDatabaseId)) + { + MarkReservedConnectionUsed(hostname, port, userId, MyDatabaseId); + + return true; + } + connKey.port = port; connKey.databaseOid = MyDatabaseId; @@ -363,7 +377,7 @@ IncrementSharedConnectionCounter(const char *hostname, int port) /* * DecrementSharedConnectionCounter decrements the shared counter - * for the given hostname and port. 
+ * for the given hostname and port for the given count. */ void DecrementSharedConnectionCounter(const char *hostname, int port) @@ -634,19 +648,6 @@ AdaptiveConnectionManagementFlag(int activeConnectionCount) } -/* - * UseConnectionPerPlacement returns whether we should use a separate connection - * per placement even if another connection is idle. We mostly use this in testing - * scenarios. - */ -bool -UseConnectionPerPlacement(void) -{ - return ForceMaxQueryParallelization && - MultiShardConnectionType != SEQUENTIAL_CONNECTION; -} - - /* * ShouldWaitForConnection returns true if the workerPool should wait to * get the next connection until one slot is empty within diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c index 4348b697e..aa1998bf0 100644 --- a/src/backend/distributed/executor/adaptive_executor.c +++ b/src/backend/distributed/executor/adaptive_executor.c @@ -140,6 +140,7 @@ #include "distributed/connection_management.h" #include "distributed/commands/multi_copy.h" #include "distributed/deparse_shard_query.h" +#include "distributed/shared_connection_stats.h" #include "distributed/distributed_execution_locks.h" #include "distributed/listutils.h" #include "distributed/local_executor.h" @@ -1924,10 +1925,8 @@ AssignTasksToConnectionsOrWorkerPool(DistributedExecution *execution) /* - * WorkerPoolCompare is based on WorkerNodeCompare function. - * - * The function compares two worker nodes by their host name and port - * number. + * WorkerPoolCompare is based on WorkerNodeCompare function. The function + * compares two worker nodes by their host name and port number. */ static int WorkerPoolCompare(const void *lhsKey, const void *rhsKey) diff --git a/src/backend/distributed/operations/delete_protocol.c b/src/backend/distributed/operations/delete_protocol.c index 1fb6b54bc..95a72c3da 100644 --- a/src/backend/distributed/operations/delete_protocol.c +++ b/src/backend/distributed/operations/delete_protocol.c @@ -545,6 +545,12 @@ ExecuteDropShardPlacementCommandRemotely(ShardPlacement *shardPlacement, shardPlacement, NULL); + /* + * This code-path doesn't support optional connections, so we don't expect + * NULL connections. + */ + Assert(connection != NULL); + RemoteTransactionBeginIfNecessary(connection); if (PQstatus(connection->pgConn) != CONNECTION_OK) diff --git a/src/backend/distributed/operations/stage_protocol.c b/src/backend/distributed/operations/stage_protocol.c index c551817a1..e1d33ecd8 100644 --- a/src/backend/distributed/operations/stage_protocol.c +++ b/src/backend/distributed/operations/stage_protocol.c @@ -292,8 +292,16 @@ master_append_table_to_shard(PG_FUNCTION_ARGS) ShardPlacement *shardPlacement = NULL; foreach_ptr(shardPlacement, shardPlacementList) { - MultiConnection *connection = GetPlacementConnection(FOR_DML, shardPlacement, - NULL); + int connectionFlags = FOR_DML; + MultiConnection *connection = + GetPlacementConnection(connectionFlags, shardPlacement, NULL); + + /* + * This code-path doesn't support optional connections, so we don't expect + * NULL connections. + */ + Assert(connection != NULL); + PGresult *queryResult = NULL; StringInfo workerAppendQuery = makeStringInfo(); @@ -862,6 +870,12 @@ WorkerShardStats(ShardPlacement *placement, Oid relationId, const char *shardNam MultiConnection *connection = GetPlacementConnection(connectionFlags, placement, NULL); + /* + * This code-path doesn't support optional connections, so we don't expect + * NULL connections. 
+ */ + Assert(connection != NULL); + *shardSize = 0; *shardMinValue = NULL; *shardMaxValue = NULL; diff --git a/src/backend/distributed/planner/multi_explain.c b/src/backend/distributed/planner/multi_explain.c index e3020d1b4..13082b2ca 100644 --- a/src/backend/distributed/planner/multi_explain.c +++ b/src/backend/distributed/planner/multi_explain.c @@ -625,6 +625,12 @@ FetchRemoteExplainFromWorkers(Task *task, ExplainState *es) MultiConnection *connection = GetPlacementConnection(connectionFlags, taskPlacement, NULL); + /* + * This code-path doesn't support optional connections, so we don't expect + * NULL connections. + */ + Assert(connection != NULL); + /* try other placements if we fail to connect this one */ if (PQstatus(connection->pgConn) != CONNECTION_OK) { diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index 4262067bd..ffa811b85 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -38,6 +38,7 @@ #include "distributed/insert_select_executor.h" #include "distributed/intermediate_result_pruning.h" #include "distributed/local_executor.h" +#include "distributed/locally_reserved_shared_connections.h" #include "distributed/maintenanced.h" #include "distributed/metadata_utility.h" #include "distributed/coordinator_protocol.h" @@ -284,6 +285,7 @@ _PG_init(void) InitPlacementConnectionManagement(); InitializeCitusQueryStats(); InitializeSharedConnectionStats(); + InitializeLocallyReservedSharedConnections(); /* enable modification of pg_catalog tables during pg_upgrade */ if (IsBinaryUpgrade) @@ -426,6 +428,13 @@ CitusCleanupConnectionsAtExit(int code, Datum arg) { /* properly close all the cached connections */ ShutdownAllConnections(); + + /* + * Make sure that we give the shared connections back to the shared + * pool if any. This operation is a no-op if the reserved connections + * are already given away. + */ + DeallocateReservedConnections(); } diff --git a/src/backend/distributed/transaction/transaction_management.c b/src/backend/distributed/transaction/transaction_management.c index 01b55d6bb..d54bd3edf 100644 --- a/src/backend/distributed/transaction/transaction_management.c +++ b/src/backend/distributed/transaction/transaction_management.c @@ -27,6 +27,7 @@ #include "distributed/intermediate_results.h" #include "distributed/listutils.h" #include "distributed/local_executor.h" +#include "distributed/locally_reserved_shared_connections.h" #include "distributed/multi_executor.h" #include "distributed/multi_explain.h" #include "distributed/repartition_join_execution.h" @@ -263,6 +264,13 @@ CoordinatedTransactionCallback(XactEvent event, void *arg) ResetGlobalVariables(); + /* + * Make sure that we give the shared connections back to the shared + * pool if any. This operation is a no-op if the reserved connections + * are already given away. + */ + DeallocateReservedConnections(); + UnSetDistributedTransactionId(); /* empty the CommitContext to ensure we're not leaking memory */ @@ -315,6 +323,13 @@ CoordinatedTransactionCallback(XactEvent event, void *arg) ResetGlobalVariables(); + /* + * Make sure that we give the shared connections back to the shared + * pool if any. This operation is a no-op if the reserved connections + * are already given away. + */ + DeallocateReservedConnections(); + /* * We reset these mainly for posterity. 
The only way we would normally
 	 * get here with ExecutorLevel or PlannerLevel > 0 is during a fatal
@@ -459,7 +474,6 @@ ResetGlobalVariables()
 	activeSetStmts = NULL;
 	CoordinatedTransactionUses2PC = false;
 	TransactionModifiedNodeMetadata = false;
-	ResetWorkerErrorIndication();
 }
 
diff --git a/src/include/distributed/connection_management.h b/src/include/distributed/connection_management.h
index 875baee96..05e629fc6 100644
--- a/src/include/distributed/connection_management.h
+++ b/src/include/distributed/connection_management.h
@@ -46,7 +46,11 @@ enum MultiConnectionMode
 	FOR_DML = 1 << 2,
 
-	/* connection must not have accessed any non-co-located placements */
+	/*
+	 * During COPY we do not want to use a connection that accessed non-co-located
+	 * placements. If there is a connection that did not access another placement,
+	 * then use it. Otherwise, open a new clean connection.
+	 */
 	REQUIRE_CLEAN_CONNECTION = 1 << 3,
 
 	OUTSIDE_TRANSACTION = 1 << 4,
@@ -195,7 +199,7 @@ extern int MaxCachedConnectionsPerWorker;
 /* parameters used for outbound connections */
 extern char *NodeConninfo;
 
-/* the hash table */
+/* the hash tables are externally accessible */
 extern HTAB *ConnectionHash;
 extern HTAB *ConnParamsHash;
 
@@ -234,6 +238,9 @@ extern MultiConnection * StartNodeUserDatabaseConnection(uint32 flags,
 														  const char *database);
 extern void CloseAllConnectionsAfterTransaction(void);
 extern void CloseNodeConnectionsAfterTransaction(char *nodeName, int nodePort);
+extern MultiConnection * ConnectionAvailableToNode(char *hostName, int nodePort,
+												   const char *userName,
+												   const char *database);
 extern void CloseConnection(MultiConnection *connection);
 extern void ShutdownAllConnections(void);
 extern void ShutdownConnection(MultiConnection *connection);
diff --git a/src/include/distributed/locally_reserved_shared_connections.h b/src/include/distributed/locally_reserved_shared_connections.h
new file mode 100644
index 000000000..a282beac0
--- /dev/null
+++ b/src/include/distributed/locally_reserved_shared_connections.h
@@ -0,0 +1,26 @@
+/*-------------------------------------------------------------------------
+ *
+ * locally_reserved_shared_connections.h
+ *   Management of connection reservations in the shared connection pool
+ *
+ * Copyright (c) Citus Data, Inc.
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#ifndef LOCALLY_RESERVED_SHARED_CONNECTIONS_H_
+#define LOCALLY_RESERVED_SHARED_CONNECTIONS_H_
+
+#include "distributed/connection_management.h"
+
+
+extern void InitializeLocallyReservedSharedConnections(void);
+extern bool CanUseReservedConnection(const char *hostName, int nodePort,
+									 Oid userId, Oid databaseOid);
+extern void MarkReservedConnectionUsed(const char *hostName, int nodePort,
+									   Oid userId, Oid databaseOid);
+extern void DeallocateReservedConnections(void);
+extern void EnsureConnectionPossibilityForPrimaryNodes(void);
+extern bool IsReservationPossible(void);
+
+#endif /* LOCALLY_RESERVED_SHARED_CONNECTIONS_H_ */
diff --git a/src/include/distributed/multi_executor.h b/src/include/distributed/multi_executor.h
index a7d68b4e1..000859616 100644
--- a/src/include/distributed/multi_executor.h
+++ b/src/include/distributed/multi_executor.h
@@ -76,6 +76,7 @@ extern void CitusExecutorRun(QueryDesc *queryDesc, ScanDirection direction, uint
 extern void AdaptiveExecutorPreExecutorRun(CitusScanState *scanState);
 extern TupleTableSlot * AdaptiveExecutor(CitusScanState *scanState);
 
+
 /*
  * ExecutionParams contains parameters that are used during the execution.
* Some of these can be the zero value if it is not needed during the execution. diff --git a/src/include/distributed/placement_connection.h b/src/include/distributed/placement_connection.h index 02beed1f5..df1688d63 100644 --- a/src/include/distributed/placement_connection.h +++ b/src/include/distributed/placement_connection.h @@ -42,5 +42,6 @@ extern void InitPlacementConnectionManagement(void); extern bool ConnectionModifiedPlacement(MultiConnection *connection); extern bool ConnectionUsedForAnyPlacements(MultiConnection *connection); +extern bool UseConnectionPerPlacement(void); #endif /* PLACEMENT_CONNECTION_H */ diff --git a/src/include/distributed/shared_connection_stats.h b/src/include/distributed/shared_connection_stats.h index 8db6256e3..9c31fdcf6 100644 --- a/src/include/distributed/shared_connection_stats.h +++ b/src/include/distributed/shared_connection_stats.h @@ -11,6 +11,10 @@ #ifndef SHARED_CONNECTION_STATS_H #define SHARED_CONNECTION_STATS_H +#define ADJUST_POOLSIZE_AUTOMATICALLY 0 +#define DISABLE_CONNECTION_THROTTLING -1 + + extern int MaxSharedPoolSize; @@ -23,6 +27,5 @@ extern void WaitLoopForSharedConnection(const char *hostname, int port); extern void DecrementSharedConnectionCounter(const char *hostname, int port); extern void IncrementSharedConnectionCounter(const char *hostname, int port); extern int AdaptiveConnectionManagementFlag(int activeConnectionCount); -extern bool UseConnectionPerPlacement(void); #endif /* SHARED_CONNECTION_STATS_H */ diff --git a/src/test/regress/expected/set_role_in_transaction.out b/src/test/regress/expected/set_role_in_transaction.out index c37d5322c..c7bf71e7b 100644 --- a/src/test/regress/expected/set_role_in_transaction.out +++ b/src/test/regress/expected/set_role_in_transaction.out @@ -77,6 +77,17 @@ SET search_path TO set_role_in_transaction; INSERT INTO t values (2); ERROR: cannot perform query on placements that were modified in this transaction by a different user ROLLBACK; +-- we cannot change role in between COPY commands as well +SET ROLE user1; +SET search_path TO set_role_in_transaction; +BEGIN; + COPY t FROM STDIN; + SET ROLE user2; + SET search_path TO set_role_in_transaction; + COPY t FROM STDIN; +ERROR: cannot perform query on placements that were modified in this transaction by a different user +CONTEXT: COPY t, line 1: "1" +ROLLBACK; RESET ROLE; REVOKE ALL ON SCHEMA set_role_in_transaction FROM user1; REVOKE ALL ON SCHEMA set_role_in_transaction FROM user2; diff --git a/src/test/regress/expected/shared_connection_stats.out b/src/test/regress/expected/shared_connection_stats.out index 3af0ed1ad..72b654488 100644 --- a/src/test/regress/expected/shared_connection_stats.out +++ b/src/test/regress/expected/shared_connection_stats.out @@ -1,5 +1,18 @@ CREATE SCHEMA shared_connection_stats; SET search_path TO shared_connection_stats; +SET citus.next_shard_id TO 14000000; +-- returns the reserved connections per backend +-- given that the code aggresively cleans up reserved connections +-- this function returns empty set in all the tests +-- In fact, we're testing that no reserved connections remain +CREATE OR REPLACE FUNCTION citus_reserved_connection_stats( + OUT hostname text, + OUT port int, + OUT database_name text, + OUT used_reserved_connection bool) +RETURNS SETOF RECORD +LANGUAGE C STRICT +AS 'citus', $$citus_reserved_connection_stats$$; -- set the cached connections to zero -- and execute a distributed query so that -- we end up with zero cached connections afterwards @@ -323,7 +336,7 @@ COMMIT; BEGIN; -- now 
allow at most 2 connections for COPY SET LOCAL citus.max_adaptive_executor_pool_size TO 2; -COPY test FROM STDIN; + COPY test FROM PROGRAM 'seq 32'; SELECT connection_count_to_node FROM @@ -340,6 +353,270 @@ COPY test FROM STDIN; (2 rows) ROLLBACK; +-- now, show that COPY doesn't open more connections than the shared_pool_size +-- now, decrease the shared pool size, and show that COPY doesn't exceed that +ALTER SYSTEM SET citus.max_shared_pool_size TO 3; +SELECT pg_reload_conf(); + pg_reload_conf +--------------------------------------------------------------------- + t +(1 row) + +SELECT pg_sleep(0.1); + pg_sleep +--------------------------------------------------------------------- + +(1 row) + +BEGIN; +COPY test FROM PROGRAM 'seq 32'; + SELECT + connection_count_to_node + FROM + citus_remote_connection_stats() + WHERE + port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND + database_name = 'regression' + ORDER BY + hostname, port; + connection_count_to_node +--------------------------------------------------------------------- + 3 + 3 +(2 rows) + +ROLLBACK; +BEGIN; + -- in this test, we trigger touching only one of the workers + -- the first copy touches 3 shards + COPY test FROM STDIN; + -- we see one worker has 3 connections, the other is 1, which is not + -- an already established connection, but a reservation + SELECT + connection_count_to_node + FROM + citus_remote_connection_stats() + WHERE + port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND + database_name = 'regression' + ORDER BY + hostname, port; + connection_count_to_node +--------------------------------------------------------------------- + 3 + 1 +(2 rows) + +-- in this second COPY, we access the same node but different shards +-- so we test the case where the second COPY cannot get any new connections +-- due to adaptive connection management, and can still continue +COPY test FROM STDIN; + SELECT + connection_count_to_node + FROM + citus_remote_connection_stats() + WHERE + port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND + database_name = 'regression' + ORDER BY + hostname, port; + connection_count_to_node +--------------------------------------------------------------------- + 3 + 1 +(2 rows) + +ROLLBACK; +BEGIN; + -- in this test, we trigger touching only one of the workers + -- the first copy touches 3 shards + SELECT count(*) FROM test WHERE a IN (2,4,5); + count +--------------------------------------------------------------------- + 3 +(1 row) + + -- we see one worker has 3 connections, the other is 1, which is not + -- an already established connection, but a reservation + SELECT + connection_count_to_node + FROM + citus_remote_connection_stats() + WHERE + port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND + database_name = 'regression' + ORDER BY + hostname, port; + connection_count_to_node +--------------------------------------------------------------------- + 3 +(1 row) + +-- in this second COPY, we access the same node but different shards +-- so we test the case where the second COPY cannot get any new connections +-- due to adaptive connection management, and can still continue +COPY test FROM STDIN; + SELECT + connection_count_to_node + FROM + citus_remote_connection_stats() + WHERE + port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND + database_name = 'regression' + ORDER BY + hostname, port; + connection_count_to_node +--------------------------------------------------------------------- + 3 + 1 +(2 rows) + 
+ROLLBACK; +BEGIN; + -- when COPY is used with _max_query_parallelization + -- it ignores the shared pool size + SET LOCAL citus.force_max_query_parallelization TO ON; + COPY test FROM PROGRAM 'seq 32'; + SELECT + connection_count_to_node + FROM + citus_remote_connection_stats() + WHERE + port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND + database_name = 'regression' + ORDER BY + hostname, port; + connection_count_to_node +--------------------------------------------------------------------- + 10 + 11 +(2 rows) + +ROLLBACK; +-- INSERT SELECT with RETURNING/ON CONFLICT clauses should honor shared_pool_size +-- in underlying COPY commands +BEGIN; + SELECT pg_sleep(0.1); + pg_sleep +--------------------------------------------------------------------- + +(1 row) + + INSERT INTO test SELECT i FROM generate_series(0,10) i RETURNING *; + a +--------------------------------------------------------------------- + 0 + 1 + 2 + 3 + 4 + 5 + 6 + 7 + 8 + 9 + 10 +(11 rows) + + SELECT + connection_count_to_node + FROM + citus_remote_connection_stats() + WHERE + port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND + database_name = 'regression' + ORDER BY + hostname, port; + connection_count_to_node +--------------------------------------------------------------------- + 3 + 3 +(2 rows) + +ROLLBACK; +-- COPY operations to range partitioned tables will honor max_shared_pool_size +-- as we use a single connection to each worker +CREATE TABLE range_table(a int); +SELECT create_distributed_table('range_table', 'a', 'range'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +CALL public.create_range_partitioned_shards('range_table', + '{0,25,50,76}', + '{24,49,75,200}'); +BEGIN; + SELECT pg_sleep(0.1); + pg_sleep +--------------------------------------------------------------------- + +(1 row) + + COPY range_table FROM PROGRAM 'seq 32'; + SELECT + connection_count_to_node + FROM + citus_remote_connection_stats() + WHERE + port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND + database_name = 'regression' + ORDER BY + hostname, port; + connection_count_to_node +--------------------------------------------------------------------- + 1 + 1 +(2 rows) + +ROLLBACK; +-- COPY operations to reference tables will use one connection per worker +-- so we will always honor max_shared_pool_size. 
+CREATE TABLE ref_table(a int); +SELECT create_reference_table('ref_table'); + create_reference_table +--------------------------------------------------------------------- + +(1 row) + +BEGIN; + SELECT pg_sleep(0.1); + pg_sleep +--------------------------------------------------------------------- + +(1 row) + + COPY ref_table FROM PROGRAM 'seq 32'; + SELECT + connection_count_to_node + FROM + citus_remote_connection_stats() + WHERE + port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND + database_name = 'regression' + ORDER BY + hostname, port; + connection_count_to_node +--------------------------------------------------------------------- + 1 + 1 +(2 rows) + +ROLLBACK; +-- reset max_shared_pool_size to default +ALTER SYSTEM RESET citus.max_shared_pool_size; +SELECT pg_reload_conf(); + pg_reload_conf +--------------------------------------------------------------------- + t +(1 row) + +SELECT pg_sleep(0.1); + pg_sleep +--------------------------------------------------------------------- + +(1 row) + -- now show that when max_cached_conns_per_worker > 1 -- Citus forces the first execution to open at least 2 -- connections that are cached. Later, that 2 cached @@ -388,6 +665,149 @@ BEGIN; t (2 rows) +COMMIT; +-- we should not have any reserved connection +-- as all of them have already been either used +-- or cleaned up +SELECT * FROM citus_reserved_connection_stats(); + hostname | port | database_name | used_reserved_connection +--------------------------------------------------------------------- +(0 rows) + +-- reconnect to get rid of cached connections +\c - - - :master_port +SET search_path TO shared_connection_stats; +BEGIN; + INSERT INTO test SELECT i FROM generate_series(0,10)i; + -- after COPY finishes, citus should see the used + -- reserved connections + SELECT * FROM citus_reserved_connection_stats() ORDER BY 1,2; + hostname | port | database_name | used_reserved_connection +--------------------------------------------------------------------- + localhost | 57637 | regression | t + localhost | 57638 | regression | t +(2 rows) + +ROLLBACK; +BEGIN; + -- even if we hit a single shard, all the other reserved + -- connections should be cleaned-up because we do not + -- reserve for the second call as we have the cached + -- connections + INSERT INTO test SELECT 1 FROM generate_series(0,100)i; + SELECT * FROM citus_reserved_connection_stats() ORDER BY 1,2; + hostname | port | database_name | used_reserved_connection +--------------------------------------------------------------------- + localhost | 57637 | regression | f + localhost | 57638 | regression | t +(2 rows) + +ROLLBACK; +BEGIN; + TRUNCATE test; + CREATE UNIQUE INDEX test_unique_index ON test(a); + -- even if we hit a single shard and later fail, all the + -- other reserved connections should be cleaned-up + INSERT INTO test SELECT 1 FROM generate_series(0,10)i; +ERROR: duplicate key value violates unique constraint "test_unique_index_14000001" +DETAIL: Key (a)=(1) already exists. 
+ROLLBACK; +SELECT * FROM citus_reserved_connection_stats() ORDER BY 1,2; + hostname | port | database_name | used_reserved_connection +--------------------------------------------------------------------- +(0 rows) + +BEGIN; + -- hits a single shard + INSERT INTO test SELECT 1 FROM generate_series(0,10)i; + -- if COPY hits a single shard, we should have reserved connections + -- to the other nodes + SELECT * FROM citus_reserved_connection_stats() ORDER BY 1,2; + hostname | port | database_name | used_reserved_connection +--------------------------------------------------------------------- + localhost | 57637 | regression | f + localhost | 57638 | regression | t +(2 rows) + + -- we should be able to see this again if the query hits + -- the same shard + INSERT INTO test SELECT 1 FROM generate_series(0,10)i; + SELECT * FROM citus_reserved_connection_stats() ORDER BY 1,2; + hostname | port | database_name | used_reserved_connection +--------------------------------------------------------------------- + localhost | 57637 | regression | f + localhost | 57638 | regression | t +(2 rows) + + -- but when the query hits the other shard(s), we should + -- see that all the reserved connections are used + INSERT INTO test SELECT i FROM generate_series(0,10)i; + SELECT * FROM citus_reserved_connection_stats() ORDER BY 1,2; + hostname | port | database_name | used_reserved_connection +--------------------------------------------------------------------- + localhost | 57637 | regression | t + localhost | 57638 | regression | t +(2 rows) + +ROLLBACK; +-- at the end of the transaction, all should be cleared +SELECT * FROM citus_reserved_connection_stats(); + hostname | port | database_name | used_reserved_connection +--------------------------------------------------------------------- +(0 rows) + +BEGIN; + SELECT count(*) FROM test; + count +--------------------------------------------------------------------- + 101 +(1 row) + + -- the above command used at least one connection per node + -- so the next commands would not need any reserved connections + INSERT INTO test SELECT 1 FROM generate_series(0,10)i; + SELECT * FROM citus_reserved_connection_stats() ORDER BY 1,2; + hostname | port | database_name | used_reserved_connection +--------------------------------------------------------------------- +(0 rows) + + INSERT INTO test SELECT i FROM generate_series(0,10)i; + SELECT * FROM citus_reserved_connection_stats() ORDER BY 1,2; + hostname | port | database_name | used_reserved_connection +--------------------------------------------------------------------- +(0 rows) + +COMMIT; +-- checkout the reserved connections with cached connections +ALTER SYSTEM SET citus.max_cached_conns_per_worker TO 1; +SELECT pg_reload_conf(); + pg_reload_conf +--------------------------------------------------------------------- + t +(1 row) + +SELECT pg_sleep(0.1); + pg_sleep +--------------------------------------------------------------------- + +(1 row) + +-- cache connections to the nodes +SELECT count(*) FROM test; + count +--------------------------------------------------------------------- + 123 +(1 row) + +BEGIN; + -- we should not have any reserved connections + -- because we already have available connections + COPY test FROM PROGRAM 'seq 32'; + SELECT * FROM citus_reserved_connection_stats() ORDER BY 1,2; + hostname | port | database_name | used_reserved_connection +--------------------------------------------------------------------- +(0 rows) + COMMIT; -- in case other tests relies on these setting, reset 
them
ALTER SYSTEM RESET citus.distributed_deadlock_detection_factor;
@@ -399,5 +819,7 @@ SELECT pg_reload_conf();
 t
(1 row)

+BEGIN;
+SET LOCAL client_min_messages TO WARNING;
 DROP SCHEMA shared_connection_stats CASCADE;
-NOTICE: drop cascades to table test
+COMMIT;
diff --git a/src/test/regress/sql/set_role_in_transaction.sql b/src/test/regress/sql/set_role_in_transaction.sql
index c4dc9cee5..7d27ba3e2 100644
--- a/src/test/regress/sql/set_role_in_transaction.sql
+++ b/src/test/regress/sql/set_role_in_transaction.sql
@@ -48,6 +48,24 @@ SET search_path TO set_role_in_transaction;
     INSERT INTO t values (2);
 ROLLBACK;

+-- we cannot change role in between COPY commands as well
+SET ROLE user1;
+SET search_path TO set_role_in_transaction;
+BEGIN;
+    COPY t FROM STDIN;
+1
+2
+3
+\.
+    SET ROLE user2;
+    SET search_path TO set_role_in_transaction;
+    COPY t FROM STDIN;
+1
+2
+3
+\.
+ROLLBACK;
+
 RESET ROLE;

 REVOKE ALL ON SCHEMA set_role_in_transaction FROM user1;
diff --git a/src/test/regress/sql/shared_connection_stats.sql b/src/test/regress/sql/shared_connection_stats.sql
index a0cb1cef0..7488f2d88 100644
--- a/src/test/regress/sql/shared_connection_stats.sql
+++ b/src/test/regress/sql/shared_connection_stats.sql
@@ -1,6 +1,21 @@
 CREATE SCHEMA shared_connection_stats;
 SET search_path TO shared_connection_stats;

+SET citus.next_shard_id TO 14000000;
+
+-- returns the reserved connections per backend
+-- given that the code aggressively cleans up reserved connections
+-- this function returns an empty set in all the tests
+-- In fact, we're testing that no reserved connections remain
+CREATE OR REPLACE FUNCTION citus_reserved_connection_stats(
+    OUT hostname text,
+    OUT port int,
+    OUT database_name text,
+    OUT used_reserved_connection bool)
+RETURNS SETOF RECORD
+LANGUAGE C STRICT
+AS 'citus', $$citus_reserved_connection_stats$$;
+
 -- set the cached connections to zero
 -- and execute a distributed query so that
 -- we end up with zero cached connections afterwards
@@ -193,41 +208,7 @@ COMMIT;
 BEGIN;
 -- now allow at most 2 connections for COPY
 SET LOCAL citus.max_adaptive_executor_pool_size TO 2;
-
-COPY test FROM STDIN;
-1
-2
-3
-4
-5
-6
-7
-8
-9
-10
-11
-12
-13
-14
-15
-16
-17
-18
-19
-20
-21
-22
-23
-24
-25
-26
-27
-28
-29
-30
-31
-32
-\.
+ COPY test FROM PROGRAM 'seq 32';

 SELECT
 connection_count_to_node
@@ -240,6 +221,188 @@ COPY test FROM STDIN;
 hostname, port;
 ROLLBACK;

+-- now, show that COPY doesn't open more connections than the shared_pool_size
+
+-- now, decrease the shared pool size, and show that COPY doesn't exceed that
+ALTER SYSTEM SET citus.max_shared_pool_size TO 3;
+SELECT pg_reload_conf();
+SELECT pg_sleep(0.1);
+
+BEGIN;
+
+COPY test FROM PROGRAM 'seq 32';
+
+ SELECT
+ connection_count_to_node
+ FROM
+ citus_remote_connection_stats()
+ WHERE
+ port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND
+ database_name = 'regression'
+ ORDER BY
+ hostname, port;
+ROLLBACK;
+
+BEGIN;
+ -- in this test, we touch only one of the workers
+ -- the first copy touches 3 shards
+ COPY test FROM STDIN;
+2
+4
+5
+\.
+
+ -- we see that one worker has 3 connections and the other has 1, which is not
+ -- an already established connection, but a reservation
+ SELECT
+ connection_count_to_node
+ FROM
+ citus_remote_connection_stats()
+ WHERE
+ port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND
+ database_name = 'regression'
+ ORDER BY
+ hostname, port;
+
+-- in this second COPY, we access the same node but different shards
+-- so we test the case where the second COPY cannot get any new connections
+-- due to adaptive connection management, and can still continue
+COPY test FROM STDIN;
+6
+8
+9
+\.
+ SELECT
+ connection_count_to_node
+ FROM
+ citus_remote_connection_stats()
+ WHERE
+ port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND
+ database_name = 'regression'
+ ORDER BY
+ hostname, port;
+ROLLBACK;
+
+
+BEGIN;
+ -- in this test, we touch only one of the workers
+ -- the first command touches 3 shards
+ SELECT count(*) FROM test WHERE a IN (2,4,5);
+
+ -- we see that one worker has 3 connections and the other has 1, which is not
+ -- an already established connection, but a reservation
+ SELECT
+ connection_count_to_node
+ FROM
+ citus_remote_connection_stats()
+ WHERE
+ port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND
+ database_name = 'regression'
+ ORDER BY
+ hostname, port;
+
+-- in this second COPY, we access the same node but different shards
+-- so we test the case where the second COPY cannot get any new connections
+-- due to adaptive connection management, and can still continue
+COPY test FROM STDIN;
+6
+8
+9
+\.
+ SELECT
+ connection_count_to_node
+ FROM
+ citus_remote_connection_stats()
+ WHERE
+ port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND
+ database_name = 'regression'
+ ORDER BY
+ hostname, port;
+ROLLBACK;
+
+
+BEGIN;
+ -- when COPY is used with force_max_query_parallelization
+ -- it ignores the shared pool size
+ SET LOCAL citus.force_max_query_parallelization TO ON;
+ COPY test FROM PROGRAM 'seq 32';
+
+ SELECT
+ connection_count_to_node
+ FROM
+ citus_remote_connection_stats()
+ WHERE
+ port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND
+ database_name = 'regression'
+ ORDER BY
+ hostname, port;
+ROLLBACK;
+
+-- INSERT SELECT with RETURNING/ON CONFLICT clauses should honor shared_pool_size
+-- in underlying COPY commands
+BEGIN;
+ SELECT pg_sleep(0.1);
+ INSERT INTO test SELECT i FROM generate_series(0,10) i RETURNING *;
+
+ SELECT
+ connection_count_to_node
+ FROM
+ citus_remote_connection_stats()
+ WHERE
+ port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND
+ database_name = 'regression'
+ ORDER BY
+ hostname, port;
+ROLLBACK;
+
+-- COPY operations to range partitioned tables will honor max_shared_pool_size
+-- as we use a single connection to each worker
+CREATE TABLE range_table(a int);
+SELECT create_distributed_table('range_table', 'a', 'range');
+CALL public.create_range_partitioned_shards('range_table',
+ '{0,25,50,76}',
+ '{24,49,75,200}');
+BEGIN;
+ SELECT pg_sleep(0.1);
+ COPY range_table FROM PROGRAM 'seq 32';
+
+ SELECT
+ connection_count_to_node
+ FROM
+ citus_remote_connection_stats()
+ WHERE
+ port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND
+ database_name = 'regression'
+ ORDER BY
+ hostname, port;
+ROLLBACK;
+
+
+-- COPY operations to reference tables will use one connection per worker
+-- so we will always honor max_shared_pool_size.
+CREATE TABLE ref_table(a int); +SELECT create_reference_table('ref_table'); + +BEGIN; + SELECT pg_sleep(0.1); + COPY ref_table FROM PROGRAM 'seq 32'; + + SELECT + connection_count_to_node + FROM + citus_remote_connection_stats() + WHERE + port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND + database_name = 'regression' + ORDER BY + hostname, port; +ROLLBACK; + +-- reset max_shared_pool_size to default +ALTER SYSTEM RESET citus.max_shared_pool_size; +SELECT pg_reload_conf(); +SELECT pg_sleep(0.1); + -- now show that when max_cached_conns_per_worker > 1 -- Citus forces the first execution to open at least 2 -- connections that are cached. Later, that 2 cached @@ -268,10 +431,97 @@ BEGIN; hostname, port; COMMIT; +-- we should not have any reserved connection +-- as all of them have already been either used +-- or cleaned up +SELECT * FROM citus_reserved_connection_stats(); + +-- reconnect to get rid of cached connections +\c - - - :master_port +SET search_path TO shared_connection_stats; + +BEGIN; + INSERT INTO test SELECT i FROM generate_series(0,10)i; + + -- after COPY finishes, citus should see the used + -- reserved connections + SELECT * FROM citus_reserved_connection_stats() ORDER BY 1,2; +ROLLBACK; + + +BEGIN; + -- even if we hit a single shard, all the other reserved + -- connections should be cleaned-up because we do not + -- reserve for the second call as we have the cached + -- connections + INSERT INTO test SELECT 1 FROM generate_series(0,100)i; + SELECT * FROM citus_reserved_connection_stats() ORDER BY 1,2; +ROLLBACK; + +BEGIN; + TRUNCATE test; + CREATE UNIQUE INDEX test_unique_index ON test(a); + + -- even if we hit a single shard and later fail, all the + -- other reserved connections should be cleaned-up + INSERT INTO test SELECT 1 FROM generate_series(0,10)i; +ROLLBACK; +SELECT * FROM citus_reserved_connection_stats() ORDER BY 1,2; + +BEGIN; + -- hits a single shard + INSERT INTO test SELECT 1 FROM generate_series(0,10)i; + + -- if COPY hits a single shard, we should have reserved connections + -- to the other nodes + SELECT * FROM citus_reserved_connection_stats() ORDER BY 1,2; + + -- we should be able to see this again if the query hits + -- the same shard + INSERT INTO test SELECT 1 FROM generate_series(0,10)i; + SELECT * FROM citus_reserved_connection_stats() ORDER BY 1,2; + + -- but when the query hits the other shard(s), we should + -- see that all the reserved connections are used + INSERT INTO test SELECT i FROM generate_series(0,10)i; + SELECT * FROM citus_reserved_connection_stats() ORDER BY 1,2; +ROLLBACK; + +-- at the end of the transaction, all should be cleared +SELECT * FROM citus_reserved_connection_stats(); + +BEGIN; + SELECT count(*) FROM test; + + -- the above command used at least one connection per node + -- so the next commands would not need any reserved connections + INSERT INTO test SELECT 1 FROM generate_series(0,10)i; + SELECT * FROM citus_reserved_connection_stats() ORDER BY 1,2; + INSERT INTO test SELECT i FROM generate_series(0,10)i; + SELECT * FROM citus_reserved_connection_stats() ORDER BY 1,2; +COMMIT; + +-- checkout the reserved connections with cached connections +ALTER SYSTEM SET citus.max_cached_conns_per_worker TO 1; +SELECT pg_reload_conf(); +SELECT pg_sleep(0.1); + +-- cache connections to the nodes +SELECT count(*) FROM test; +BEGIN; + -- we should not have any reserved connections + -- because we already have available connections + COPY test FROM PROGRAM 'seq 32'; + SELECT * FROM citus_reserved_connection_stats() 
ORDER BY 1,2; +COMMIT; + -- in case other tests relies on these setting, reset them ALTER SYSTEM RESET citus.distributed_deadlock_detection_factor; ALTER SYSTEM RESET citus.recover_2pc_interval; ALTER SYSTEM RESET citus.max_cached_conns_per_worker; SELECT pg_reload_conf(); +BEGIN; +SET LOCAL client_min_messages TO WARNING; DROP SCHEMA shared_connection_stats CASCADE; +COMMIT;