From eeb8c81de208ee13a6fddf03840366098c0477b9 Mon Sep 17 00:00:00 2001 From: Onder Kalaci Date: Mon, 18 May 2020 21:52:43 +0200 Subject: [PATCH] Implement shared connection count reservation & enable `citus.max_shared_pool_size` for COPY With this patch, we introduce `locally_reserved_shared_connections.c/h` files which are responsible for reserving some space in shared memory counters upfront. We sometimes need to reserve connections, but not necessarily establish them. For example: - COPY command should reserve connections as it cannot know which connections it needs in which order. COPY establishes connections as any input data hits the workers. For example, for router COPY command, it only establishes 1 connection. As discussed here (https://github.com/citusdata/citus/pull/3849#pullrequestreview-431792473), COPY needs to reserve connections up-front, otherwise we can end up with resource starvation/un-detected deadlocks. --- src/backend/distributed/commands/multi_copy.c | 112 +++- .../connection/connection_management.c | 35 +- .../locally_reserved_shared_connections.c | 511 ++++++++++++++++++ .../connection/placement_connection.c | 53 +- .../connection/shared_connection_stats.c | 35 +- .../distributed/executor/adaptive_executor.c | 7 +- .../distributed/operations/delete_protocol.c | 6 + .../distributed/operations/stage_protocol.c | 18 +- .../distributed/planner/multi_explain.c | 6 + src/backend/distributed/shared_library_init.c | 9 + .../transaction/transaction_management.c | 16 +- .../distributed/connection_management.h | 11 +- .../locally_reserved_shared_connections.h | 26 + src/include/distributed/multi_executor.h | 1 + .../distributed/placement_connection.h | 1 + .../distributed/shared_connection_stats.h | 5 +- .../expected/set_role_in_transaction.out | 11 + .../expected/shared_connection_stats.out | 426 ++++++++++++++- .../regress/sql/set_role_in_transaction.sql | 18 + .../regress/sql/shared_connection_stats.sql | 320 +++++++++-- 20 files changed, 1540 insertions(+), 87 deletions(-) create mode 100644 src/backend/distributed/connection/locally_reserved_shared_connections.c create mode 100644 src/include/distributed/locally_reserved_shared_connections.h diff --git a/src/backend/distributed/commands/multi_copy.c b/src/backend/distributed/commands/multi_copy.c index 8d85b5ed9..f8580a6b9 100644 --- a/src/backend/distributed/commands/multi_copy.c +++ b/src/backend/distributed/commands/multi_copy.c @@ -79,12 +79,14 @@ #include "distributed/multi_physical_planner.h" #include "distributed/multi_router_planner.h" #include "distributed/multi_executor.h" +#include "distributed/locally_reserved_shared_connections.h" #include "distributed/placement_connection.h" #include "distributed/relation_access_tracking.h" #include "distributed/remote_commands.h" #include "distributed/remote_transaction.h" #include "distributed/resource_lock.h" #include "distributed/shard_pruning.h" +#include "distributed/shared_connection_stats.h" #include "distributed/version_compat.h" #include "distributed/worker_protocol.h" #include "distributed/local_multi_copy.h" @@ -259,7 +261,7 @@ static MultiConnection * GetLeastUtilisedCopyConnection(List *connectionStateLis char *nodeName, int nodePort); static List * ConnectionStateList(HTAB *connectionStateHash); static List * ConnectionStateListToNode(HTAB *connectionStateHash, - char *hostname, int port); + const char *hostname, int32 port); static void InitializeCopyShardState(CopyShardState *shardState, HTAB *connectionStateHash, uint64 shardId, bool stopOnFailure, bool 
@@ -815,6 +817,12 @@ OpenCopyConnectionsForNewShards(CopyStmt *copyStatement, MultiConnection *connection = GetPlacementConnection(connectionFlags, placement, nodeUser); + /* + * This code-path doesn't support optional connections, so we don't expect + * NULL connections. + */ + Assert(connection != NULL); + if (PQstatus(connection->pgConn) != CONNECTION_OK) { if (stopOnFailure) @@ -2265,6 +2273,15 @@ CitusCopyDestReceiverStartup(DestReceiver *dest, int operation, copyDest->connectionStateHash = CreateConnectionStateHash(TopTransactionContext); RecordRelationAccessIfReferenceTable(tableId, PLACEMENT_ACCESS_DML); + + /* + * For all the primary (e.g., writable) nodes, reserve a shared connection. + * We do this upfront because we cannot know which nodes are going to be + * accessed. Since the order of the reservation is important, we need to + * do it right here. For the details on why the order important, see + * the function. + */ + EnsureConnectionPossibilityForPrimaryNodes(); } @@ -2938,6 +2955,12 @@ CitusCopyTo(CopyStmt *copyStatement, char *completionTag) shardPlacement, userName); + /* + * This code-path doesn't support optional connections, so we don't expect + * NULL connections. + */ + Assert(connection != NULL); + if (placementIndex == list_length(shardPlacementList) - 1) { /* last chance for this shard */ @@ -3286,10 +3309,10 @@ ConnectionStateList(HTAB *connectionStateHash) /* * ConnectionStateListToNode returns all CopyConnectionState structures in - * the given hash. + * the given hash for a given hostname and port values. */ static List * -ConnectionStateListToNode(HTAB *connectionStateHash, char *hostname, int port) +ConnectionStateListToNode(HTAB *connectionStateHash, const char *hostname, int32 port) { List *connectionStateList = NIL; HASH_SEQ_STATUS status; @@ -3533,18 +3556,37 @@ CopyGetPlacementConnection(HTAB *connectionStateHash, ShardPlacement *placement, ConnectionStateListToNode(connectionStateHash, nodeName, nodePort); if (HasReachedAdaptiveExecutorPoolSize(copyConnectionStateList)) { - connection = - GetLeastUtilisedCopyConnection(copyConnectionStateList, nodeName, nodePort); - /* * If we've already reached the executor pool size, there should be at * least one connection to any given node. + * + * Note that we don't need to mark the connection as critical, since the + * connection was already returned by this function before. */ - Assert(connection != NULL); + connection = GetLeastUtilisedCopyConnection(copyConnectionStateList, + nodeName, + nodePort); return connection; } + if (IsReservationPossible()) + { + /* + * Enforce the requirements for adaptive connection management + * (a.k.a., throttle connections if citus.max_shared_pool_size + * reached). + * + * Given that we have done reservations per node, we do not ever + * need to pass WAIT_FOR_CONNECTION, we are sure that there is a + * connection either reserved for this backend or already established + * by the previous commands in the same transaction block. 
+ */ + int adaptiveConnectionManagementFlag = OPTIONAL_CONNECTION; + connectionFlags |= adaptiveConnectionManagementFlag; + } + + /* * For placements that haven't been assigned a connection by a previous command * in the current transaction, we use a separate connection per placement for @@ -3571,6 +3613,52 @@ CopyGetPlacementConnection(HTAB *connectionStateHash, ShardPlacement *placement, } connection = GetPlacementConnection(connectionFlags, placement, nodeUser); + if (connection == NULL) + { + if (list_length(copyConnectionStateList) > 0) + { + /* + * The connection manager throttled any new connections, so pick an existing + * connection with least utilization. + * + * Note that we don't need to mark the connection as critical, since the + * connection was already returned by this function before. + */ + connection = + GetLeastUtilisedCopyConnection(copyConnectionStateList, nodeName, + nodePort); + } + else + { + /* + * For this COPY command, we have not established any connections + * and adaptive connection management throttled the new connection + * request. This could only happen if this COPY command is the + * second (or later) COPY command in a transaction block as the + * first COPY command always gets a connection per node thanks to + * the connection reservation. + * + * As we know that there has been at least one COPY command happened + * earlier, we need to find the connection to that node, and use it. + */ + connection = + ConnectionAvailableToNode(nodeName, nodePort, CurrentUserName(), + CurrentDatabaseName()); + + /* + * We do not expect this to happen, but still instead of an assert, + * we prefer explicit error message. + */ + if (connection == NULL) + { + ereport(ERROR, (errmsg("could not find an available connection"), + errhint("Set citus.max_shared_pool_size TO -1 to let " + "COPY command finish"))); + } + } + + return connection; + } if (PQstatus(connection->pgConn) != CONNECTION_OK) { @@ -3628,6 +3716,8 @@ HasReachedAdaptiveExecutorPoolSize(List *connectionStateList) /* * GetLeastUtilisedCopyConnection returns a MultiConnection to the given node * with the least number of placements assigned to it. + * + * It is assumed that there exists at least one connection to the node. */ static MultiConnection * GetLeastUtilisedCopyConnection(List *connectionStateList, char *nodeName, @@ -3637,6 +3727,14 @@ GetLeastUtilisedCopyConnection(List *connectionStateList, char *nodeName, int minPlacementCount = PG_INT32_MAX; ListCell *connectionStateCell = NULL; + /* + * We only pick the least utilised connection when some connection limits are + * reached such as max_shared_pool_size or max_adaptive_executor_pool_size. + * + * Therefore there should be some connections to choose from. 
+	 */
+	Assert(list_length(connectionStateList) > 0);
+
 	foreach(connectionStateCell, connectionStateList)
 	{
 		CopyConnectionState *connectionState = lfirst(connectionStateCell);
diff --git a/src/backend/distributed/connection/connection_management.c b/src/backend/distributed/connection/connection_management.c
index 3cc8bb0c5..c25c0a67b 100644
--- a/src/backend/distributed/connection/connection_management.c
+++ b/src/backend/distributed/connection/connection_management.c
@@ -46,18 +46,19 @@ int MaxCachedConnectionsPerWorker = 1;
 HTAB *ConnectionHash = NULL;
 HTAB *ConnParamsHash = NULL;
+
 MemoryContext ConnectionContext = NULL;
 
 static uint32 ConnectionHashHash(const void *key, Size keysize);
 static int ConnectionHashCompare(const void *a, const void *b, Size keysize);
 static void StartConnectionEstablishment(MultiConnection *connectionn,
                                          ConnectionHashKey *key);
+static MultiConnection * FindAvailableConnection(dlist_head *connections, uint32 flags);
 static void FreeConnParamsHashEntryFields(ConnParamsHashEntry *entry);
 static void AfterXactHostConnectionHandling(ConnectionHashEntry *entry, bool isCommit);
 static bool ShouldShutdownConnection(MultiConnection *connection, const int
                                      cachedConnectionCount);
 static void ResetConnection(MultiConnection *connection);
-static MultiConnection * FindAvailableConnection(dlist_head *connections, uint32 flags);
 static bool RemoteTransactionIdle(MultiConnection *connection);
 static int EventSetSizeForConnectionList(List *connections);
 
@@ -497,6 +498,38 @@ CloseAllConnectionsAfterTransaction(void)
 }
 
 
+/*
+ * ConnectionAvailableToNode returns a MultiConnection if the session has at least
+ * one connection established and available to use to the given node. Otherwise,
+ * it returns NULL.
+ */
+MultiConnection *
+ConnectionAvailableToNode(char *hostName, int nodePort, const char *userName,
+                          const char *database)
+{
+	ConnectionHashKey key;
+	bool found = false;
+
+	strlcpy(key.hostname, hostName, MAX_NODE_LENGTH);
+	key.port = nodePort;
+	strlcpy(key.user, userName, NAMEDATALEN);
+	strlcpy(key.database, database, NAMEDATALEN);
+
+	ConnectionHashEntry *entry =
+		(ConnectionHashEntry *) hash_search(ConnectionHash, &key, HASH_FIND, &found);
+
+	if (!found)
+	{
+		return NULL;
+	}
+
+	int flags = 0;
+	MultiConnection *connection = FindAvailableConnection(entry->connections, flags);
+
+	return connection;
+}
+
+
 /*
  * CloseNodeConnectionsAfterTransaction sets the forceClose flag of the connections
  * to a particular node as true such that the connections are no longer cached. This
diff --git a/src/backend/distributed/connection/locally_reserved_shared_connections.c b/src/backend/distributed/connection/locally_reserved_shared_connections.c
new file mode 100644
index 000000000..a4dc337cd
--- /dev/null
+++ b/src/backend/distributed/connection/locally_reserved_shared_connections.c
@@ -0,0 +1,511 @@
+/*-------------------------------------------------------------------------
+ *
+ * locally_reserved_shared_connections.c
+ *
+ *   Keeps track of the number of reserved connections to remote nodes
+ *   for this backend. The primary goal is to complement the logic
+ *   implemented in shared_connection_stats.c, which aims to prevent an
+ *   excessive number of connections (typically > max_connections) to any
+ *   worker node. With these locally reserved connection stats, we enforce
+ *   the same constraints considering these locally reserved shared
+ *   connections.
+ *
+ *   To be more precise, shared connection stats are incremented only by two
+ *   operations: (a) Establishing a connection to a remote node
+ *               (b) Reserving connections, the logic that this
+ *                   file implements.
+ *
+ *   Finally, as the name already implies, once a backend has reserved a shared
+ *   connection, it is guaranteed to have the right to establish a connection
+ *   to the given remote node when needed.
+ *
+ *   For the COPY command, we use this fact to reserve connections to the remote
+ *   nodes in the same order as the adaptive executor in order to prevent any
+ *   resource starvation. We need to do this because COPY establishes connections
+ *   only when it receives a tuple that targets a remote node. This lazy approach
+ *   is a valuable optimization, as it avoids unnecessary connection
+ *   establishments, which are pretty expensive. With reservations, the COPY
+ *   command can reserve connections upfront and establish them only when they
+ *   are actually needed.
+ *
+ * Copyright (c) Citus Data, Inc.
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "miscadmin.h"
+
+#include "access/hash.h"
+#include "commands/dbcommands.h"
+#include "distributed/listutils.h"
+#include "distributed/locally_reserved_shared_connections.h"
+#include "distributed/metadata_cache.h"
+#include "distributed/multi_executor.h"
+#include "distributed/placement_connection.h"
+#include "distributed/shared_connection_stats.h"
+#include "distributed/tuplestore.h"
+#include "distributed/worker_manager.h"
+#include "utils/hashutils.h"
+#include "utils/builtins.h"
+
+
+#define RESERVED_CONNECTION_COLUMNS 4
+
+
+/* session-specific hash map */
+static HTAB *SessionLocalReservedConnections = NULL;
+
+
+/*
+ * Hash key for connection reservations
+ */
+typedef struct ReservedConnectionHashKey
+{
+	char hostname[MAX_NODE_LENGTH];
+	int32 port;
+	Oid databaseOid;
+	Oid userId;
+} ReservedConnectionHashKey;
+
+/*
+ * Hash entry for per-worker information. The rules are as follows:
+ *  - If there is no entry in the hash, we can make a reservation.
+ *  - If usedReservation is false, we have a reservation that we can use.
+ *  - If usedReservation is true, we used the reservation and cannot make
+ *    more reservations.
+ */
+typedef struct ReservedConnectionHashEntry
+{
+	ReservedConnectionHashKey key;
+
+	bool usedReservation;
+} ReservedConnectionHashEntry;
+
+
+static void StoreAllReservedConnections(Tuplestorestate *tupleStore,
+                                        TupleDesc tupleDescriptor);
+static ReservedConnectionHashEntry * AllocateOrGetReservedConectionEntry(char *hostName,
+                                                                         int nodePort,
+                                                                         Oid userId,
+                                                                         Oid databaseOid,
+                                                                         bool *found);
+static void EnsureConnectionPossibilityForNodeList(List *nodeList);
+static uint32 LocalConnectionReserveHashHash(const void *key, Size keysize);
+static int LocalConnectionReserveHashCompare(const void *a, const void *b, Size keysize);
+
+
+PG_FUNCTION_INFO_V1(citus_reserved_connection_stats);
+
+/*
+ * citus_reserved_connection_stats returns all the available information about all
+ * the reserved connections. This function is used mostly for testing.
+ */ +Datum +citus_reserved_connection_stats(PG_FUNCTION_ARGS) +{ + TupleDesc tupleDescriptor = NULL; + + CheckCitusVersion(ERROR); + Tuplestorestate *tupleStore = SetupTuplestore(fcinfo, &tupleDescriptor); + + StoreAllReservedConnections(tupleStore, tupleDescriptor); + + /* clean up and return the tuplestore */ + tuplestore_donestoring(tupleStore); + + PG_RETURN_VOID(); +} + + +/* + * StoreAllReservedConnections gets connections established from the current node + * and inserts them into the given tuplestore. + * + * We don't need to enforce any access privileges as the number of backends + * on any node is already visible on pg_stat_activity to all users. + */ +static void +StoreAllReservedConnections(Tuplestorestate *tupleStore, TupleDesc tupleDescriptor) +{ + Datum values[RESERVED_CONNECTION_COLUMNS]; + bool isNulls[RESERVED_CONNECTION_COLUMNS]; + + HASH_SEQ_STATUS status; + ReservedConnectionHashEntry *connectionEntry = NULL; + + hash_seq_init(&status, SessionLocalReservedConnections); + while ((connectionEntry = + (ReservedConnectionHashEntry *) hash_seq_search(&status)) != 0) + { + /* get ready for the next tuple */ + memset(values, 0, sizeof(values)); + memset(isNulls, false, sizeof(isNulls)); + + char *databaseName = get_database_name(connectionEntry->key.databaseOid); + if (databaseName == NULL) + { + /* database might have been dropped */ + continue; + } + + values[0] = PointerGetDatum(cstring_to_text(connectionEntry->key.hostname)); + values[1] = Int32GetDatum(connectionEntry->key.port); + values[2] = PointerGetDatum(cstring_to_text(databaseName)); + values[3] = BoolGetDatum(connectionEntry->usedReservation); + + tuplestore_putvalues(tupleStore, tupleDescriptor, values, isNulls); + } +} + + +/* + * InitializeLocallyReservedSharedConnections initializes the hashmap in + * ConnectionContext. + */ +void +InitializeLocallyReservedSharedConnections(void) +{ + HASHCTL reservedConnectionInfo; + + memset(&reservedConnectionInfo, 0, sizeof(reservedConnectionInfo)); + reservedConnectionInfo.keysize = sizeof(ReservedConnectionHashKey); + reservedConnectionInfo.entrysize = sizeof(ReservedConnectionHashEntry); + + /* + * ConnectionContext is the session local memory context that is used for + * tracking remote connections. + */ + reservedConnectionInfo.hcxt = ConnectionContext; + + reservedConnectionInfo.hash = LocalConnectionReserveHashHash; + reservedConnectionInfo.match = LocalConnectionReserveHashCompare; + + uint32 hashFlags = (HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT | HASH_COMPARE); + + SessionLocalReservedConnections = + hash_create("citus session level reserved connectios (host,port,database,user)", + 64, &reservedConnectionInfo, hashFlags); +} + + +/* + * CanUseReservedConnection returns true if we have already reserved at least + * one shared connection in this session that is not used. + */ +bool +CanUseReservedConnection(const char *hostName, int nodePort, Oid userId, + Oid databaseOid) +{ + ReservedConnectionHashKey key; + + strlcpy(key.hostname, hostName, MAX_NODE_LENGTH); + key.userId = userId; + key.port = nodePort; + key.databaseOid = databaseOid; + + bool found = false; + ReservedConnectionHashEntry *entry = + (ReservedConnectionHashEntry *) hash_search(SessionLocalReservedConnections, &key, + HASH_FIND, &found); + + if (!found || !entry) + { + return false; + } + + return !entry->usedReservation; +} + + +/* + * DeallocateReservedConnections is responsible for two things. 
First, if the operation
+ * has reserved a connection but has not used it, it gives the connection back to
+ * the shared memory pool. Second, in all cases, it deallocates the session-local
+ * entry from the hash.
+ */
+void
+DeallocateReservedConnections(void)
+{
+	HASH_SEQ_STATUS status;
+	ReservedConnectionHashEntry *entry;
+
+	hash_seq_init(&status, SessionLocalReservedConnections);
+	while ((entry = (ReservedConnectionHashEntry *) hash_seq_search(&status)) != 0)
+	{
+		if (!entry->usedReservation)
+		{
+			/*
+			 * We have not used this reservation, so make sure to clean up the
+			 * shared memory counter as well.
+			 */
+			DecrementSharedConnectionCounter(entry->key.hostname, entry->key.port);
+
+			/* for completeness, set it to true */
+			entry->usedReservation = true;
+		}
+
+		/*
+		 * We clean up all the entries because we may not need reserved connections
+		 * for the next command.
+		 */
+		bool found = false;
+		hash_search(SessionLocalReservedConnections, entry, HASH_REMOVE, &found);
+		Assert(found);
+	}
+}
+
+
+/*
+ * MarkReservedConnectionUsed marks in the local hash that the reservation has
+ * been used.
+ */
+void
+MarkReservedConnectionUsed(const char *hostName, int nodePort, Oid userId,
+                           Oid databaseOid)
+{
+	ReservedConnectionHashKey key;
+
+	strlcpy(key.hostname, hostName, MAX_NODE_LENGTH);
+	key.userId = userId;
+	key.port = nodePort;
+	key.databaseOid = databaseOid;
+
+	bool found = false;
+	ReservedConnectionHashEntry *entry =
+		(ReservedConnectionHashEntry *) hash_search(
+			SessionLocalReservedConnections, &key, HASH_FIND, &found);
+
+	if (!found)
+	{
+		ereport(ERROR, (errmsg("BUG: untracked reserved connection"),
+						errhint("Set citus.max_shared_pool_size TO -1 to "
+								"disable reserved connection counters")));
+	}
+
+	/* a reservation can only be used once */
+	Assert(!entry->usedReservation);
+
+	entry->usedReservation = true;
+}
+
+
+/*
+ * EnsureConnectionPossibilityForPrimaryNodes is a wrapper around
+ * EnsureConnectionPossibilityForNodeList.
+ */
+void
+EnsureConnectionPossibilityForPrimaryNodes(void)
+{
+	/*
+	 * By using NoLock there is a tiny risk that we fail to reserve a
+	 * connection for a concurrently added node. However, that doesn't
+	 * seem to cause any problems as none of the placements that we are
+	 * going to access would be on the new node.
+	 */
+	List *primaryNodeList = ActivePrimaryNodeList(NoLock);
+
+	EnsureConnectionPossibilityForNodeList(primaryNodeList);
+}
+
+
+/*
+ * EnsureConnectionPossibilityForNodeList reserves a shared connection
+ * counter per node in the nodeList unless:
+ *  - reservation is not possible (see IsReservationPossible()),
+ *  - there is already at least one connection to the node, so that we are
+ *    guaranteed to get a connection, or
+ *  - an earlier call already reserved a connection (i.e., we allow only a
+ *    single reservation per backend).
+ */
+static void
+EnsureConnectionPossibilityForNodeList(List *nodeList)
+{
+	if (!IsReservationPossible())
+	{
+		return;
+	}
+
+	/*
+	 * We sort the workerList because adaptive connection management
+	 * (e.g., OPTIONAL_CONNECTION) requires any concurrent executions
+	 * to wait for the connections in the same order to prevent any
+	 * starvation. If we don't sort, we might end up with:
+	 *  Execution 1: Get connection for worker 1, wait for worker 2
+	 *  Execution 2: Get connection for worker 2, wait for worker 1
+	 *
+	 * and neither could proceed. Instead, we enforce that every execution
+	 * establishes the required connections to workers in the same order.
+ */ + nodeList = SortList(nodeList, CompareWorkerNodes); + + char *databaseName = get_database_name(MyDatabaseId); + Oid userId = GetUserId(); + char *userName = GetUserNameFromId(userId, false); + + WorkerNode *workerNode = NULL; + foreach_ptr(workerNode, nodeList) + { + if (ConnectionAvailableToNode(workerNode->workerName, workerNode->workerPort, + userName, databaseName) != NULL) + { + /* + * The same user has already an active connection for the node. It + * means that the execution can use the same connection, so reservation + * is not necessary. + */ + continue; + } + + /* + * We are trying to be defensive here by ensuring that the required hash + * table entry can be allocated. The main goal is that we don't want to be + * in a situation where shared connection counter is incremented but not + * the local reserved counter due to out-of-memory. + * + * Note that shared connection stats operate on the shared memory, and we + * pre-allocate all the necessary memory. In other words, it would never + * throw out of memory error. + */ + bool found = false; + ReservedConnectionHashEntry *hashEntry = + AllocateOrGetReservedConectionEntry(workerNode->workerName, + workerNode->workerPort, + userId, MyDatabaseId, &found); + + if (found) + { + /* + * We have already reserved a connection for this user and database + * on the worker. We only allow a single reservation per + * transaction block. The reason is that the earlier command (either in + * a transaction block or a function call triggered by a single command) + * was able to reserve or establish a connection. That connection is + * guranteed to be avaliable for us. + */ + continue; + } + + /* + * Increment the shared counter, we may need to wait if there are + * no space left. + */ + WaitLoopForSharedConnection(workerNode->workerName, workerNode->workerPort); + + /* locally mark that we have one connection reserved */ + hashEntry->usedReservation = false; + } +} + + +/* + * IsReservationPossible returns true if the state of the current + * session is eligible for shared connection reservation. + */ +bool +IsReservationPossible(void) +{ + if (GetMaxSharedPoolSize() == DISABLE_CONNECTION_THROTTLING) + { + /* connection throttling disabled */ + return false; + } + + if (UseConnectionPerPlacement()) + { + /* + * For this case, we are not enforcing adaptive + * connection management anyway. + */ + return false; + } + + if (SessionLocalReservedConnections == NULL) + { + /* + * This is unexpected as SessionLocalReservedConnections hash table is + * created at startup. Still, let's be defensive. + */ + return false; + } + + return true; +} + + +/* + * AllocateReservedConectionEntry allocates the required entry in the hash + * map by HASH_ENTER. The function throws an error if it cannot allocate + * the entry. + */ +static ReservedConnectionHashEntry * +AllocateOrGetReservedConectionEntry(char *hostName, int nodePort, Oid userId, + Oid databaseOid, bool *found) +{ + ReservedConnectionHashKey key; + + *found = false; + + strlcpy(key.hostname, hostName, MAX_NODE_LENGTH); + key.userId = userId; + key.port = nodePort; + key.databaseOid = databaseOid; + + /* + * Entering a new entry with HASH_ENTER flag is enough as it would + * throw out-of-memory error as it internally does palloc. 
+ */ + ReservedConnectionHashEntry *entry = + (ReservedConnectionHashEntry *) hash_search(SessionLocalReservedConnections, + &key, HASH_ENTER, found); + + if (!*found) + { + /* + * Until we reserve connection in the shared memory, we treat + * as if have used the reservation. + */ + entry->usedReservation = true; + } + + return entry; +} + + +/* + * LocalConnectionReserveHashHash is a utilty function to calculate hash of + * ReservedConnectionHashKey. + */ +static uint32 +LocalConnectionReserveHashHash(const void *key, Size keysize) +{ + ReservedConnectionHashKey *entry = (ReservedConnectionHashKey *) key; + + uint32 hash = string_hash(entry->hostname, MAX_NODE_LENGTH); + hash = hash_combine(hash, hash_uint32(entry->userId)); + hash = hash_combine(hash, hash_uint32(entry->port)); + hash = hash_combine(hash, hash_uint32(entry->databaseOid)); + + return hash; +} + + +/* + * LocalConnectionReserveHashCompare is a utilty function to compare + * ReservedConnectionHashKeys. + */ +static int +LocalConnectionReserveHashCompare(const void *a, const void *b, Size keysize) +{ + ReservedConnectionHashKey *ca = (ReservedConnectionHashKey *) a; + ReservedConnectionHashKey *cb = (ReservedConnectionHashKey *) b; + + if (ca->port != cb->port || + ca->databaseOid != cb->databaseOid || + ca->userId != cb->userId || + strncmp(ca->hostname, cb->hostname, MAX_NODE_LENGTH) != 0) + { + return 1; + } + else + { + return 0; + } +} diff --git a/src/backend/distributed/connection/placement_connection.c b/src/backend/distributed/connection/placement_connection.c index 5cfd25e39..abaf334d9 100644 --- a/src/backend/distributed/connection/placement_connection.c +++ b/src/backend/distributed/connection/placement_connection.c @@ -18,6 +18,7 @@ #include "distributed/listutils.h" #include "distributed/coordinator_protocol.h" #include "distributed/metadata_cache.h" +#include "distributed/multi_executor.h" #include "distributed/distributed_planner.h" #include "distributed/multi_partitioning_utils.h" #include "distributed/placement_connection.h" @@ -209,6 +210,14 @@ GetPlacementConnection(uint32 flags, ShardPlacement *placement, const char *user { MultiConnection *connection = StartPlacementConnection(flags, placement, userName); + if (connection == NULL) + { + /* connection can only be NULL for optional connections */ + Assert((flags & OPTIONAL_CONNECTION)); + + return NULL; + } + FinishConnectionEstablishment(connection); return connection; } @@ -293,25 +302,25 @@ StartPlacementListConnection(uint32 flags, List *placementAccessList, */ chosenConnection = StartNodeUserDatabaseConnection(flags, nodeName, nodePort, userName, NULL); + if (chosenConnection == NULL) + { + /* connection can only be NULL for optional connections */ + Assert((flags & OPTIONAL_CONNECTION)); - /* - * chosenConnection can only be NULL for optional connections, which we - * don't support in this codepath. - */ - Assert((flags & OPTIONAL_CONNECTION) == 0); - Assert(chosenConnection != NULL); + return NULL; + } if ((flags & REQUIRE_CLEAN_CONNECTION) && ConnectionAccessedDifferentPlacement(chosenConnection, placement)) { /* * Cached connection accessed a non-co-located placement in the same - * table or co-location group, while the caller asked for a connection - * per placement. Open a new connection instead. + * table or co-location group, while the caller asked for a clean + * connection. Open a new connection instead. * * We use this for situations in which we want to use a different * connection for every placement, such as COPY. 
If we blindly returned - * a cached conection that already modified a different, non-co-located + * a cached connection that already modified a different, non-co-located * placement B in the same table or in a table with the same co-location * ID as the current placement, then we'd no longer able to write to * placement B later in the COPY. @@ -321,12 +330,13 @@ StartPlacementListConnection(uint32 flags, List *placementAccessList, nodeName, nodePort, userName, NULL); - /* - * chosenConnection can only be NULL for optional connections, - * which we don't support in this codepath. - */ - Assert((flags & OPTIONAL_CONNECTION) == 0); - Assert(chosenConnection != NULL); + if (chosenConnection == NULL) + { + /* connection can only be NULL for optional connections */ + Assert((flags & OPTIONAL_CONNECTION)); + + return NULL; + } Assert(!ConnectionAccessedDifferentPlacement(chosenConnection, placement)); } @@ -1175,6 +1185,19 @@ InitPlacementConnectionManagement(void) } +/* + * UseConnectionPerPlacement returns whether we should use as separate connection + * per placement even if another connection is idle. We mostly use this in testing + * scenarios. + */ +bool +UseConnectionPerPlacement(void) +{ + return ForceMaxQueryParallelization && + MultiShardConnectionType != SEQUENTIAL_CONNECTION; +} + + static uint32 ColocatedPlacementsHashHash(const void *key, Size keysize) { diff --git a/src/backend/distributed/connection/shared_connection_stats.c b/src/backend/distributed/connection/shared_connection_stats.c index 632891c7f..65a5536c4 100644 --- a/src/backend/distributed/connection/shared_connection_stats.c +++ b/src/backend/distributed/connection/shared_connection_stats.c @@ -24,9 +24,12 @@ #include "distributed/cancel_utils.h" #include "distributed/connection_management.h" #include "distributed/listutils.h" +#include "distributed/locally_reserved_shared_connections.h" #include "distributed/metadata_cache.h" #include "distributed/multi_executor.h" +#include "distributed/placement_connection.h" #include "distributed/shared_connection_stats.h" +#include "distributed/worker_manager.h" #include "distributed/time_constants.h" #include "distributed/tuplestore.h" #include "utils/builtins.h" @@ -37,8 +40,6 @@ #define REMOTE_CONNECTION_STATS_COLUMNS 4 -#define ADJUST_POOLSIZE_AUTOMATICALLY 0 -#define DISABLE_CONNECTION_THROTTLING -1 /* * The data structure used to store data in shared memory. This data structure is only @@ -55,6 +56,7 @@ typedef struct ConnectionStatsSharedData ConditionVariable waitersConditionVariable; } ConnectionStatsSharedData; + typedef struct SharedConnStatsHashKey { /* @@ -106,8 +108,8 @@ static void UnLockConnectionSharedMemory(void); static void SharedConnectionStatsShmemInit(void); static size_t SharedConnectionStatsShmemSize(void); static bool ShouldWaitForConnection(int currentConnectionCount); -static int SharedConnectionHashCompare(const void *a, const void *b, Size keysize); static uint32 SharedConnectionHashHash(const void *key, Size keysize); +static int SharedConnectionHashCompare(const void *a, const void *b, Size keysize); PG_FUNCTION_INFO_V1(citus_remote_connection_stats); @@ -247,6 +249,18 @@ TryToIncrementSharedConnectionCounter(const char *hostname, int port) MAX_NODE_LENGTH))); } + /* + * The local session might already have some reserved connections to the given + * node. In that case, we don't need to go through the shared memory. 
+ */ + Oid userId = GetUserId(); + if (CanUseReservedConnection(hostname, port, userId, MyDatabaseId)) + { + MarkReservedConnectionUsed(hostname, port, userId, MyDatabaseId); + + return true; + } + connKey.port = port; connKey.databaseOid = MyDatabaseId; @@ -363,7 +377,7 @@ IncrementSharedConnectionCounter(const char *hostname, int port) /* * DecrementSharedConnectionCounter decrements the shared counter - * for the given hostname and port. + * for the given hostname and port for the given count. */ void DecrementSharedConnectionCounter(const char *hostname, int port) @@ -634,19 +648,6 @@ AdaptiveConnectionManagementFlag(int activeConnectionCount) } -/* - * UseConnectionPerPlacement returns whether we should use a separate connection - * per placement even if another connection is idle. We mostly use this in testing - * scenarios. - */ -bool -UseConnectionPerPlacement(void) -{ - return ForceMaxQueryParallelization && - MultiShardConnectionType != SEQUENTIAL_CONNECTION; -} - - /* * ShouldWaitForConnection returns true if the workerPool should wait to * get the next connection until one slot is empty within diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c index 4348b697e..aa1998bf0 100644 --- a/src/backend/distributed/executor/adaptive_executor.c +++ b/src/backend/distributed/executor/adaptive_executor.c @@ -140,6 +140,7 @@ #include "distributed/connection_management.h" #include "distributed/commands/multi_copy.h" #include "distributed/deparse_shard_query.h" +#include "distributed/shared_connection_stats.h" #include "distributed/distributed_execution_locks.h" #include "distributed/listutils.h" #include "distributed/local_executor.h" @@ -1924,10 +1925,8 @@ AssignTasksToConnectionsOrWorkerPool(DistributedExecution *execution) /* - * WorkerPoolCompare is based on WorkerNodeCompare function. - * - * The function compares two worker nodes by their host name and port - * number. + * WorkerPoolCompare is based on WorkerNodeCompare function. The function + * compares two worker nodes by their host name and port number. */ static int WorkerPoolCompare(const void *lhsKey, const void *rhsKey) diff --git a/src/backend/distributed/operations/delete_protocol.c b/src/backend/distributed/operations/delete_protocol.c index 1fb6b54bc..95a72c3da 100644 --- a/src/backend/distributed/operations/delete_protocol.c +++ b/src/backend/distributed/operations/delete_protocol.c @@ -545,6 +545,12 @@ ExecuteDropShardPlacementCommandRemotely(ShardPlacement *shardPlacement, shardPlacement, NULL); + /* + * This code-path doesn't support optional connections, so we don't expect + * NULL connections. 
+ */ + Assert(connection != NULL); + RemoteTransactionBeginIfNecessary(connection); if (PQstatus(connection->pgConn) != CONNECTION_OK) diff --git a/src/backend/distributed/operations/stage_protocol.c b/src/backend/distributed/operations/stage_protocol.c index c551817a1..e1d33ecd8 100644 --- a/src/backend/distributed/operations/stage_protocol.c +++ b/src/backend/distributed/operations/stage_protocol.c @@ -292,8 +292,16 @@ master_append_table_to_shard(PG_FUNCTION_ARGS) ShardPlacement *shardPlacement = NULL; foreach_ptr(shardPlacement, shardPlacementList) { - MultiConnection *connection = GetPlacementConnection(FOR_DML, shardPlacement, - NULL); + int connectionFlags = FOR_DML; + MultiConnection *connection = + GetPlacementConnection(connectionFlags, shardPlacement, NULL); + + /* + * This code-path doesn't support optional connections, so we don't expect + * NULL connections. + */ + Assert(connection != NULL); + PGresult *queryResult = NULL; StringInfo workerAppendQuery = makeStringInfo(); @@ -862,6 +870,12 @@ WorkerShardStats(ShardPlacement *placement, Oid relationId, const char *shardNam MultiConnection *connection = GetPlacementConnection(connectionFlags, placement, NULL); + /* + * This code-path doesn't support optional connections, so we don't expect + * NULL connections. + */ + Assert(connection != NULL); + *shardSize = 0; *shardMinValue = NULL; *shardMaxValue = NULL; diff --git a/src/backend/distributed/planner/multi_explain.c b/src/backend/distributed/planner/multi_explain.c index e3020d1b4..13082b2ca 100644 --- a/src/backend/distributed/planner/multi_explain.c +++ b/src/backend/distributed/planner/multi_explain.c @@ -625,6 +625,12 @@ FetchRemoteExplainFromWorkers(Task *task, ExplainState *es) MultiConnection *connection = GetPlacementConnection(connectionFlags, taskPlacement, NULL); + /* + * This code-path doesn't support optional connections, so we don't expect + * NULL connections. + */ + Assert(connection != NULL); + /* try other placements if we fail to connect this one */ if (PQstatus(connection->pgConn) != CONNECTION_OK) { diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index 4262067bd..ffa811b85 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -38,6 +38,7 @@ #include "distributed/insert_select_executor.h" #include "distributed/intermediate_result_pruning.h" #include "distributed/local_executor.h" +#include "distributed/locally_reserved_shared_connections.h" #include "distributed/maintenanced.h" #include "distributed/metadata_utility.h" #include "distributed/coordinator_protocol.h" @@ -284,6 +285,7 @@ _PG_init(void) InitPlacementConnectionManagement(); InitializeCitusQueryStats(); InitializeSharedConnectionStats(); + InitializeLocallyReservedSharedConnections(); /* enable modification of pg_catalog tables during pg_upgrade */ if (IsBinaryUpgrade) @@ -426,6 +428,13 @@ CitusCleanupConnectionsAtExit(int code, Datum arg) { /* properly close all the cached connections */ ShutdownAllConnections(); + + /* + * Make sure that we give the shared connections back to the shared + * pool if any. This operation is a no-op if the reserved connections + * are already given away. 
+ */ + DeallocateReservedConnections(); } diff --git a/src/backend/distributed/transaction/transaction_management.c b/src/backend/distributed/transaction/transaction_management.c index 01b55d6bb..d54bd3edf 100644 --- a/src/backend/distributed/transaction/transaction_management.c +++ b/src/backend/distributed/transaction/transaction_management.c @@ -27,6 +27,7 @@ #include "distributed/intermediate_results.h" #include "distributed/listutils.h" #include "distributed/local_executor.h" +#include "distributed/locally_reserved_shared_connections.h" #include "distributed/multi_executor.h" #include "distributed/multi_explain.h" #include "distributed/repartition_join_execution.h" @@ -263,6 +264,13 @@ CoordinatedTransactionCallback(XactEvent event, void *arg) ResetGlobalVariables(); + /* + * Make sure that we give the shared connections back to the shared + * pool if any. This operation is a no-op if the reserved connections + * are already given away. + */ + DeallocateReservedConnections(); + UnSetDistributedTransactionId(); /* empty the CommitContext to ensure we're not leaking memory */ @@ -315,6 +323,13 @@ CoordinatedTransactionCallback(XactEvent event, void *arg) ResetGlobalVariables(); + /* + * Make sure that we give the shared connections back to the shared + * pool if any. This operation is a no-op if the reserved connections + * are already given away. + */ + DeallocateReservedConnections(); + /* * We reset these mainly for posterity. The only way we would normally * get here with ExecutorLevel or PlannerLevel > 0 is during a fatal @@ -459,7 +474,6 @@ ResetGlobalVariables() activeSetStmts = NULL; CoordinatedTransactionUses2PC = false; TransactionModifiedNodeMetadata = false; - ResetWorkerErrorIndication(); } diff --git a/src/include/distributed/connection_management.h b/src/include/distributed/connection_management.h index 875baee96..05e629fc6 100644 --- a/src/include/distributed/connection_management.h +++ b/src/include/distributed/connection_management.h @@ -46,7 +46,11 @@ enum MultiConnectionMode FOR_DML = 1 << 2, - /* connection must not have accessed any non-co-located placements */ + /* + * During COPY we do not want to use a connection that accessed non-co-located + * placements. If there is a connection that did not access another placement, + * then use it. Otherwise open a new clean connection. 
+	 */
 	REQUIRE_CLEAN_CONNECTION = 1 << 3,
 
 	OUTSIDE_TRANSACTION = 1 << 4,
@@ -195,7 +199,7 @@ extern int MaxCachedConnectionsPerWorker;
 /* parameters used for outbound connections */
 extern char *NodeConninfo;
 
-/* the hash table */
+/* the hash tables are externally accessible */
 extern HTAB *ConnectionHash;
 extern HTAB *ConnParamsHash;
 
@@ -234,6 +238,9 @@ extern MultiConnection * StartNodeUserDatabaseConnection(uint32 flags,
                                                           const char *database);
 extern void CloseAllConnectionsAfterTransaction(void);
 extern void CloseNodeConnectionsAfterTransaction(char *nodeName, int nodePort);
+extern MultiConnection * ConnectionAvailableToNode(char *hostName, int nodePort,
+                                                   const char *userName,
+                                                   const char *database);
 extern void CloseConnection(MultiConnection *connection);
 extern void ShutdownAllConnections(void);
 extern void ShutdownConnection(MultiConnection *connection);
diff --git a/src/include/distributed/locally_reserved_shared_connections.h b/src/include/distributed/locally_reserved_shared_connections.h
new file mode 100644
index 000000000..a282beac0
--- /dev/null
+++ b/src/include/distributed/locally_reserved_shared_connections.h
@@ -0,0 +1,26 @@
+/*-------------------------------------------------------------------------
+ *
+ * locally_reserved_shared_connections.h
+ *   Management of connection reservations in the shared memory connection pool
+ *
+ * Copyright (c) Citus Data, Inc.
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#ifndef LOCALLY_RESERVED_SHARED_CONNECTIONS_H_
+#define LOCALLY_RESERVED_SHARED_CONNECTIONS_H_
+
+#include "distributed/connection_management.h"
+
+
+extern void InitializeLocallyReservedSharedConnections(void);
+extern bool CanUseReservedConnection(const char *hostName, int nodePort,
+                                     Oid userId, Oid databaseOid);
+extern void MarkReservedConnectionUsed(const char *hostName, int nodePort,
+                                       Oid userId, Oid databaseOid);
+extern void DeallocateReservedConnections(void);
+extern void EnsureConnectionPossibilityForPrimaryNodes(void);
+extern bool IsReservationPossible(void);
+
+#endif /* LOCALLY_RESERVED_SHARED_CONNECTIONS_H_ */
diff --git a/src/include/distributed/multi_executor.h b/src/include/distributed/multi_executor.h
index a7d68b4e1..000859616 100644
--- a/src/include/distributed/multi_executor.h
+++ b/src/include/distributed/multi_executor.h
@@ -76,6 +76,7 @@ extern void CitusExecutorRun(QueryDesc *queryDesc, ScanDirection direction, uint
 extern void AdaptiveExecutorPreExecutorRun(CitusScanState *scanState);
 extern TupleTableSlot * AdaptiveExecutor(CitusScanState *scanState);
 
+
 /*
  * ExecutionParams contains parameters that are used during the execution.
  * Some of these can be the zero value if it is not needed during the execution.
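[Editor's note] The header above is the public surface of the reservation machinery. The sketch below is illustrative only and is not part of the patch: the wrapper function name and the literal host name and port are hypothetical, while the calls are the functions declared in locally_reserved_shared_connections.h and connection_management.h. It shows the lifecycle a command such as COPY is expected to drive: reserve before routing any tuples, consume the reservation when a node is actually hit, and return unused reservations at transaction end.

#include "postgres.h"
#include "miscadmin.h"

#include "distributed/locally_reserved_shared_connections.h"

/* hypothetical illustration of the reserve -> use -> release lifecycle */
static void
SketchReservationLifecycle(void)
{
	/* reserve one shared connection per primary worker before any tuple is routed */
	EnsureConnectionPossibilityForPrimaryNodes();

	/*
	 * When a tuple later targets a worker, the shared pool counter is not
	 * incremented again; the existing reservation is consumed instead (this is
	 * what TryToIncrementSharedConnectionCounter checks first).
	 */
	if (CanUseReservedConnection("localhost", 5432, GetUserId(), MyDatabaseId))
	{
		MarkReservedConnectionUsed("localhost", 5432, GetUserId(), MyDatabaseId);
	}

	/* at transaction end, any reservation that was never used is handed back */
	DeallocateReservedConnections();
}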
diff --git a/src/include/distributed/placement_connection.h b/src/include/distributed/placement_connection.h index 02beed1f5..df1688d63 100644 --- a/src/include/distributed/placement_connection.h +++ b/src/include/distributed/placement_connection.h @@ -42,5 +42,6 @@ extern void InitPlacementConnectionManagement(void); extern bool ConnectionModifiedPlacement(MultiConnection *connection); extern bool ConnectionUsedForAnyPlacements(MultiConnection *connection); +extern bool UseConnectionPerPlacement(void); #endif /* PLACEMENT_CONNECTION_H */ diff --git a/src/include/distributed/shared_connection_stats.h b/src/include/distributed/shared_connection_stats.h index 8db6256e3..9c31fdcf6 100644 --- a/src/include/distributed/shared_connection_stats.h +++ b/src/include/distributed/shared_connection_stats.h @@ -11,6 +11,10 @@ #ifndef SHARED_CONNECTION_STATS_H #define SHARED_CONNECTION_STATS_H +#define ADJUST_POOLSIZE_AUTOMATICALLY 0 +#define DISABLE_CONNECTION_THROTTLING -1 + + extern int MaxSharedPoolSize; @@ -23,6 +27,5 @@ extern void WaitLoopForSharedConnection(const char *hostname, int port); extern void DecrementSharedConnectionCounter(const char *hostname, int port); extern void IncrementSharedConnectionCounter(const char *hostname, int port); extern int AdaptiveConnectionManagementFlag(int activeConnectionCount); -extern bool UseConnectionPerPlacement(void); #endif /* SHARED_CONNECTION_STATS_H */ diff --git a/src/test/regress/expected/set_role_in_transaction.out b/src/test/regress/expected/set_role_in_transaction.out index c37d5322c..c7bf71e7b 100644 --- a/src/test/regress/expected/set_role_in_transaction.out +++ b/src/test/regress/expected/set_role_in_transaction.out @@ -77,6 +77,17 @@ SET search_path TO set_role_in_transaction; INSERT INTO t values (2); ERROR: cannot perform query on placements that were modified in this transaction by a different user ROLLBACK; +-- we cannot change role in between COPY commands as well +SET ROLE user1; +SET search_path TO set_role_in_transaction; +BEGIN; + COPY t FROM STDIN; + SET ROLE user2; + SET search_path TO set_role_in_transaction; + COPY t FROM STDIN; +ERROR: cannot perform query on placements that were modified in this transaction by a different user +CONTEXT: COPY t, line 1: "1" +ROLLBACK; RESET ROLE; REVOKE ALL ON SCHEMA set_role_in_transaction FROM user1; REVOKE ALL ON SCHEMA set_role_in_transaction FROM user2; diff --git a/src/test/regress/expected/shared_connection_stats.out b/src/test/regress/expected/shared_connection_stats.out index 3af0ed1ad..72b654488 100644 --- a/src/test/regress/expected/shared_connection_stats.out +++ b/src/test/regress/expected/shared_connection_stats.out @@ -1,5 +1,18 @@ CREATE SCHEMA shared_connection_stats; SET search_path TO shared_connection_stats; +SET citus.next_shard_id TO 14000000; +-- returns the reserved connections per backend +-- given that the code aggresively cleans up reserved connections +-- this function returns empty set in all the tests +-- In fact, we're testing that no reserved connections remain +CREATE OR REPLACE FUNCTION citus_reserved_connection_stats( + OUT hostname text, + OUT port int, + OUT database_name text, + OUT used_reserved_connection bool) +RETURNS SETOF RECORD +LANGUAGE C STRICT +AS 'citus', $$citus_reserved_connection_stats$$; -- set the cached connections to zero -- and execute a distributed query so that -- we end up with zero cached connections afterwards @@ -323,7 +336,7 @@ COMMIT; BEGIN; -- now allow at most 2 connections for COPY SET LOCAL citus.max_adaptive_executor_pool_size 
TO 2; -COPY test FROM STDIN; + COPY test FROM PROGRAM 'seq 32'; SELECT connection_count_to_node FROM @@ -340,6 +353,270 @@ COPY test FROM STDIN; (2 rows) ROLLBACK; +-- now, show that COPY doesn't open more connections than the shared_pool_size +-- now, decrease the shared pool size, and show that COPY doesn't exceed that +ALTER SYSTEM SET citus.max_shared_pool_size TO 3; +SELECT pg_reload_conf(); + pg_reload_conf +--------------------------------------------------------------------- + t +(1 row) + +SELECT pg_sleep(0.1); + pg_sleep +--------------------------------------------------------------------- + +(1 row) + +BEGIN; +COPY test FROM PROGRAM 'seq 32'; + SELECT + connection_count_to_node + FROM + citus_remote_connection_stats() + WHERE + port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND + database_name = 'regression' + ORDER BY + hostname, port; + connection_count_to_node +--------------------------------------------------------------------- + 3 + 3 +(2 rows) + +ROLLBACK; +BEGIN; + -- in this test, we trigger touching only one of the workers + -- the first copy touches 3 shards + COPY test FROM STDIN; + -- we see one worker has 3 connections, the other is 1, which is not + -- an already established connection, but a reservation + SELECT + connection_count_to_node + FROM + citus_remote_connection_stats() + WHERE + port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND + database_name = 'regression' + ORDER BY + hostname, port; + connection_count_to_node +--------------------------------------------------------------------- + 3 + 1 +(2 rows) + +-- in this second COPY, we access the same node but different shards +-- so we test the case where the second COPY cannot get any new connections +-- due to adaptive connection management, and can still continue +COPY test FROM STDIN; + SELECT + connection_count_to_node + FROM + citus_remote_connection_stats() + WHERE + port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND + database_name = 'regression' + ORDER BY + hostname, port; + connection_count_to_node +--------------------------------------------------------------------- + 3 + 1 +(2 rows) + +ROLLBACK; +BEGIN; + -- in this test, we trigger touching only one of the workers + -- the first copy touches 3 shards + SELECT count(*) FROM test WHERE a IN (2,4,5); + count +--------------------------------------------------------------------- + 3 +(1 row) + + -- we see one worker has 3 connections, the other is 1, which is not + -- an already established connection, but a reservation + SELECT + connection_count_to_node + FROM + citus_remote_connection_stats() + WHERE + port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND + database_name = 'regression' + ORDER BY + hostname, port; + connection_count_to_node +--------------------------------------------------------------------- + 3 +(1 row) + +-- in this second COPY, we access the same node but different shards +-- so we test the case where the second COPY cannot get any new connections +-- due to adaptive connection management, and can still continue +COPY test FROM STDIN; + SELECT + connection_count_to_node + FROM + citus_remote_connection_stats() + WHERE + port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND + database_name = 'regression' + ORDER BY + hostname, port; + connection_count_to_node +--------------------------------------------------------------------- + 3 + 1 +(2 rows) + +ROLLBACK; +BEGIN; + -- when COPY is used with _max_query_parallelization + -- it ignores 
the shared pool size + SET LOCAL citus.force_max_query_parallelization TO ON; + COPY test FROM PROGRAM 'seq 32'; + SELECT + connection_count_to_node + FROM + citus_remote_connection_stats() + WHERE + port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND + database_name = 'regression' + ORDER BY + hostname, port; + connection_count_to_node +--------------------------------------------------------------------- + 10 + 11 +(2 rows) + +ROLLBACK; +-- INSERT SELECT with RETURNING/ON CONFLICT clauses should honor shared_pool_size +-- in underlying COPY commands +BEGIN; + SELECT pg_sleep(0.1); + pg_sleep +--------------------------------------------------------------------- + +(1 row) + + INSERT INTO test SELECT i FROM generate_series(0,10) i RETURNING *; + a +--------------------------------------------------------------------- + 0 + 1 + 2 + 3 + 4 + 5 + 6 + 7 + 8 + 9 + 10 +(11 rows) + + SELECT + connection_count_to_node + FROM + citus_remote_connection_stats() + WHERE + port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND + database_name = 'regression' + ORDER BY + hostname, port; + connection_count_to_node +--------------------------------------------------------------------- + 3 + 3 +(2 rows) + +ROLLBACK; +-- COPY operations to range partitioned tables will honor max_shared_pool_size +-- as we use a single connection to each worker +CREATE TABLE range_table(a int); +SELECT create_distributed_table('range_table', 'a', 'range'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +CALL public.create_range_partitioned_shards('range_table', + '{0,25,50,76}', + '{24,49,75,200}'); +BEGIN; + SELECT pg_sleep(0.1); + pg_sleep +--------------------------------------------------------------------- + +(1 row) + + COPY range_table FROM PROGRAM 'seq 32'; + SELECT + connection_count_to_node + FROM + citus_remote_connection_stats() + WHERE + port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND + database_name = 'regression' + ORDER BY + hostname, port; + connection_count_to_node +--------------------------------------------------------------------- + 1 + 1 +(2 rows) + +ROLLBACK; +-- COPY operations to reference tables will use one connection per worker +-- so we will always honor max_shared_pool_size. +CREATE TABLE ref_table(a int); +SELECT create_reference_table('ref_table'); + create_reference_table +--------------------------------------------------------------------- + +(1 row) + +BEGIN; + SELECT pg_sleep(0.1); + pg_sleep +--------------------------------------------------------------------- + +(1 row) + + COPY ref_table FROM PROGRAM 'seq 32'; + SELECT + connection_count_to_node + FROM + citus_remote_connection_stats() + WHERE + port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND + database_name = 'regression' + ORDER BY + hostname, port; + connection_count_to_node +--------------------------------------------------------------------- + 1 + 1 +(2 rows) + +ROLLBACK; +-- reset max_shared_pool_size to default +ALTER SYSTEM RESET citus.max_shared_pool_size; +SELECT pg_reload_conf(); + pg_reload_conf +--------------------------------------------------------------------- + t +(1 row) + +SELECT pg_sleep(0.1); + pg_sleep +--------------------------------------------------------------------- + +(1 row) + -- now show that when max_cached_conns_per_worker > 1 -- Citus forces the first execution to open at least 2 -- connections that are cached. 
Later, that 2 cached @@ -388,6 +665,149 @@ BEGIN; t (2 rows) +COMMIT; +-- we should not have any reserved connection +-- as all of them have already been either used +-- or cleaned up +SELECT * FROM citus_reserved_connection_stats(); + hostname | port | database_name | used_reserved_connection +--------------------------------------------------------------------- +(0 rows) + +-- reconnect to get rid of cached connections +\c - - - :master_port +SET search_path TO shared_connection_stats; +BEGIN; + INSERT INTO test SELECT i FROM generate_series(0,10)i; + -- after COPY finishes, citus should see the used + -- reserved connections + SELECT * FROM citus_reserved_connection_stats() ORDER BY 1,2; + hostname | port | database_name | used_reserved_connection +--------------------------------------------------------------------- + localhost | 57637 | regression | t + localhost | 57638 | regression | t +(2 rows) + +ROLLBACK; +BEGIN; + -- even if we hit a single shard, all the other reserved + -- connections should be cleaned-up because we do not + -- reserve for the second call as we have the cached + -- connections + INSERT INTO test SELECT 1 FROM generate_series(0,100)i; + SELECT * FROM citus_reserved_connection_stats() ORDER BY 1,2; + hostname | port | database_name | used_reserved_connection +--------------------------------------------------------------------- + localhost | 57637 | regression | f + localhost | 57638 | regression | t +(2 rows) + +ROLLBACK; +BEGIN; + TRUNCATE test; + CREATE UNIQUE INDEX test_unique_index ON test(a); + -- even if we hit a single shard and later fail, all the + -- other reserved connections should be cleaned-up + INSERT INTO test SELECT 1 FROM generate_series(0,10)i; +ERROR: duplicate key value violates unique constraint "test_unique_index_14000001" +DETAIL: Key (a)=(1) already exists. 
+ROLLBACK; +SELECT * FROM citus_reserved_connection_stats() ORDER BY 1,2; + hostname | port | database_name | used_reserved_connection +--------------------------------------------------------------------- +(0 rows) + +BEGIN; + -- hits a single shard + INSERT INTO test SELECT 1 FROM generate_series(0,10)i; + -- if COPY hits a single shard, we should have reserved connections + -- to the other nodes + SELECT * FROM citus_reserved_connection_stats() ORDER BY 1,2; + hostname | port | database_name | used_reserved_connection +--------------------------------------------------------------------- + localhost | 57637 | regression | f + localhost | 57638 | regression | t +(2 rows) + + -- we should be able to see this again if the query hits + -- the same shard + INSERT INTO test SELECT 1 FROM generate_series(0,10)i; + SELECT * FROM citus_reserved_connection_stats() ORDER BY 1,2; + hostname | port | database_name | used_reserved_connection +--------------------------------------------------------------------- + localhost | 57637 | regression | f + localhost | 57638 | regression | t +(2 rows) + + -- but when the query hits the other shard(s), we should + -- see that all the reserved connections are used + INSERT INTO test SELECT i FROM generate_series(0,10)i; + SELECT * FROM citus_reserved_connection_stats() ORDER BY 1,2; + hostname | port | database_name | used_reserved_connection +--------------------------------------------------------------------- + localhost | 57637 | regression | t + localhost | 57638 | regression | t +(2 rows) + +ROLLBACK; +-- at the end of the transaction, all should be cleared +SELECT * FROM citus_reserved_connection_stats(); + hostname | port | database_name | used_reserved_connection +--------------------------------------------------------------------- +(0 rows) + +BEGIN; + SELECT count(*) FROM test; + count +--------------------------------------------------------------------- + 101 +(1 row) + + -- the above command used at least one connection per node + -- so the next commands would not need any reserved connections + INSERT INTO test SELECT 1 FROM generate_series(0,10)i; + SELECT * FROM citus_reserved_connection_stats() ORDER BY 1,2; + hostname | port | database_name | used_reserved_connection +--------------------------------------------------------------------- +(0 rows) + + INSERT INTO test SELECT i FROM generate_series(0,10)i; + SELECT * FROM citus_reserved_connection_stats() ORDER BY 1,2; + hostname | port | database_name | used_reserved_connection +--------------------------------------------------------------------- +(0 rows) + +COMMIT; +-- checkout the reserved connections with cached connections +ALTER SYSTEM SET citus.max_cached_conns_per_worker TO 1; +SELECT pg_reload_conf(); + pg_reload_conf +--------------------------------------------------------------------- + t +(1 row) + +SELECT pg_sleep(0.1); + pg_sleep +--------------------------------------------------------------------- + +(1 row) + +-- cache connections to the nodes +SELECT count(*) FROM test; + count +--------------------------------------------------------------------- + 123 +(1 row) + +BEGIN; + -- we should not have any reserved connections + -- because we already have available connections + COPY test FROM PROGRAM 'seq 32'; + SELECT * FROM citus_reserved_connection_stats() ORDER BY 1,2; + hostname | port | database_name | used_reserved_connection +--------------------------------------------------------------------- +(0 rows) + COMMIT; -- in case other tests relies on these setting, reset 
 ALTER SYSTEM RESET citus.distributed_deadlock_detection_factor;
@@ -399,5 +819,7 @@ SELECT pg_reload_conf();
         t
(1 row)

+BEGIN;
+SET LOCAL client_min_messages TO WARNING;
 DROP SCHEMA shared_connection_stats CASCADE;
-NOTICE: drop cascades to table test
+COMMIT;
diff --git a/src/test/regress/sql/set_role_in_transaction.sql b/src/test/regress/sql/set_role_in_transaction.sql
index c4dc9cee5..7d27ba3e2 100644
--- a/src/test/regress/sql/set_role_in_transaction.sql
+++ b/src/test/regress/sql/set_role_in_transaction.sql
@@ -48,6 +48,24 @@ SET search_path TO set_role_in_transaction;
 INSERT INTO t values (2);
 ROLLBACK;
+-- we cannot change role in between COPY commands as well
+SET ROLE user1;
+SET search_path TO set_role_in_transaction;
+BEGIN;
+    COPY t FROM STDIN;
+1
+2
+3
+\.
+    SET ROLE user2;
+    SET search_path TO set_role_in_transaction;
+    COPY t FROM STDIN;
+1
+2
+3
+\.
+ROLLBACK;
+
 RESET ROLE;
 REVOKE ALL ON SCHEMA set_role_in_transaction FROM user1;
diff --git a/src/test/regress/sql/shared_connection_stats.sql b/src/test/regress/sql/shared_connection_stats.sql
index a0cb1cef0..7488f2d88 100644
--- a/src/test/regress/sql/shared_connection_stats.sql
+++ b/src/test/regress/sql/shared_connection_stats.sql
@@ -1,6 +1,21 @@
 CREATE SCHEMA shared_connection_stats;
 SET search_path TO shared_connection_stats;
+SET citus.next_shard_id TO 14000000;
+
+-- returns the reserved connections per backend
+-- given that the code aggressively cleans up reserved connections
+-- this function returns an empty set in all the tests
+-- In fact, we're testing that no reserved connections remain
+CREATE OR REPLACE FUNCTION citus_reserved_connection_stats(
+    OUT hostname text,
+    OUT port int,
+    OUT database_name text,
+    OUT used_reserved_connection bool)
+RETURNS SETOF RECORD
+LANGUAGE C STRICT
+AS 'citus', $$citus_reserved_connection_stats$$;
+
 -- set the cached connections to zero
 -- and execute a distributed query so that
 -- we end up with zero cached connections afterwards
@@ -193,41 +208,7 @@ COMMIT;
 BEGIN;
     -- now allow at most 2 connections for COPY
     SET LOCAL citus.max_adaptive_executor_pool_size TO 2;
-
-COPY test FROM STDIN;
-1
-2
-3
-4
-5
-6
-7
-8
-9
-10
-11
-12
-13
-14
-15
-16
-17
-18
-19
-20
-21
-22
-23
-24
-25
-26
-27
-28
-29
-30
-31
-32
-\.
+    COPY test FROM PROGRAM 'seq 32';
     SELECT
         connection_count_to_node
@@ -240,6 +221,188 @@ COPY test FROM STDIN;
         hostname, port;
 ROLLBACK;
+-- now, show that COPY doesn't open more connections than the shared_pool_size
+
+-- now, decrease the shared pool size, and show that COPY doesn't exceed that
+ALTER SYSTEM SET citus.max_shared_pool_size TO 3;
+SELECT pg_reload_conf();
+SELECT pg_sleep(0.1);
+
+BEGIN;
+
+COPY test FROM PROGRAM 'seq 32';
+
+    SELECT
+        connection_count_to_node
+    FROM
+        citus_remote_connection_stats()
+    WHERE
+        port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND
+        database_name = 'regression'
+    ORDER BY
+        hostname, port;
+ROLLBACK;
+
+BEGIN;
+    -- in this test, we trigger touching only one of the workers
+    -- the first copy touches 3 shards
+    COPY test FROM STDIN;
+2
+4
+5
+\.
+
+    -- we see that one worker has 3 connections while the other has 1, which is not
+    -- an already established connection, but a reservation
+    SELECT
+        connection_count_to_node
+    FROM
+        citus_remote_connection_stats()
+    WHERE
+        port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND
+        database_name = 'regression'
+    ORDER BY
+        hostname, port;
+
+-- in this second COPY, we access the same node but different shards
+-- so we test the case where the second COPY cannot get any new connections
+-- due to adaptive connection management, and can still continue
+COPY test FROM STDIN;
+6
+8
+9
+\.
+    SELECT
+        connection_count_to_node
+    FROM
+        citus_remote_connection_stats()
+    WHERE
+        port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND
+        database_name = 'regression'
+    ORDER BY
+        hostname, port;
+ROLLBACK;
+
+
+BEGIN;
+    -- in this test, we trigger touching only one of the workers
+    -- the first command touches 3 shards
+    SELECT count(*) FROM test WHERE a IN (2,4,5);
+
+    -- we see that one worker has 3 connections while the other has 1, which is not
+    -- an already established connection, but a reservation
+    SELECT
+        connection_count_to_node
+    FROM
+        citus_remote_connection_stats()
+    WHERE
+        port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND
+        database_name = 'regression'
+    ORDER BY
+        hostname, port;
+
+-- in this second COPY, we access the same node but different shards
+-- so we test the case where the second COPY cannot get any new connections
+-- due to adaptive connection management, and can still continue
+COPY test FROM STDIN;
+6
+8
+9
+\.
+    SELECT
+        connection_count_to_node
+    FROM
+        citus_remote_connection_stats()
+    WHERE
+        port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND
+        database_name = 'regression'
+    ORDER BY
+        hostname, port;
+ROLLBACK;
+
+
+BEGIN;
+    -- when COPY is used with force_max_query_parallelization
+    -- it ignores the shared pool size
+    SET LOCAL citus.force_max_query_parallelization TO ON;
+    COPY test FROM PROGRAM 'seq 32';
+
+    SELECT
+        connection_count_to_node
+    FROM
+        citus_remote_connection_stats()
+    WHERE
+        port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND
+        database_name = 'regression'
+    ORDER BY
+        hostname, port;
+ROLLBACK;
+
+-- INSERT SELECT with RETURNING/ON CONFLICT clauses should honor shared_pool_size
+-- in underlying COPY commands
+BEGIN;
+    SELECT pg_sleep(0.1);
+    INSERT INTO test SELECT i FROM generate_series(0,10) i RETURNING *;
+
+    SELECT
+        connection_count_to_node
+    FROM
+        citus_remote_connection_stats()
+    WHERE
+        port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND
+        database_name = 'regression'
+    ORDER BY
+        hostname, port;
+ROLLBACK;
+
+-- COPY operations to range partitioned tables will honor max_shared_pool_size
+-- as we use a single connection to each worker
+CREATE TABLE range_table(a int);
+SELECT create_distributed_table('range_table', 'a', 'range');
+CALL public.create_range_partitioned_shards('range_table',
+    '{0,25,50,76}',
+    '{24,49,75,200}');
+BEGIN;
+    SELECT pg_sleep(0.1);
+    COPY range_table FROM PROGRAM 'seq 32';
+
+    SELECT
+        connection_count_to_node
+    FROM
+        citus_remote_connection_stats()
+    WHERE
+        port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND
+        database_name = 'regression'
+    ORDER BY
+        hostname, port;
+ROLLBACK;
+
+
+-- COPY operations to reference tables will use one connection per worker
+-- so we will always honor max_shared_pool_size.
+CREATE TABLE ref_table(a int);
+SELECT create_reference_table('ref_table');
+
+BEGIN;
+    SELECT pg_sleep(0.1);
+    COPY ref_table FROM PROGRAM 'seq 32';
+
+    SELECT
+        connection_count_to_node
+    FROM
+        citus_remote_connection_stats()
+    WHERE
+        port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND
+        database_name = 'regression'
+    ORDER BY
+        hostname, port;
+ROLLBACK;
+
+-- reset max_shared_pool_size to default
+ALTER SYSTEM RESET citus.max_shared_pool_size;
+SELECT pg_reload_conf();
+SELECT pg_sleep(0.1);
+
 -- now show that when max_cached_conns_per_worker > 1
 -- Citus forces the first execution to open at least 2
 -- connections that are cached. Later, that 2 cached
@@ -268,10 +431,97 @@ BEGIN;
         hostname, port;
 COMMIT;
+-- we should not have any reserved connection
+-- as all of them have already been either used
+-- or cleaned up
+SELECT * FROM citus_reserved_connection_stats();
+
+-- reconnect to get rid of cached connections
+\c - - - :master_port
+SET search_path TO shared_connection_stats;
+
+BEGIN;
+    INSERT INTO test SELECT i FROM generate_series(0,10)i;
+
+    -- after COPY finishes, citus should see the used
+    -- reserved connections
+    SELECT * FROM citus_reserved_connection_stats() ORDER BY 1,2;
+ROLLBACK;
+
+
+BEGIN;
+    -- even if we hit a single shard, all the other reserved
+    -- connections should be cleaned up because we do not
+    -- reserve for the second call as we have the cached
+    -- connections
+    INSERT INTO test SELECT 1 FROM generate_series(0,100)i;
+    SELECT * FROM citus_reserved_connection_stats() ORDER BY 1,2;
+ROLLBACK;
+
+BEGIN;
+    TRUNCATE test;
+    CREATE UNIQUE INDEX test_unique_index ON test(a);
+
+    -- even if we hit a single shard and later fail, all the
+    -- other reserved connections should be cleaned up
+    INSERT INTO test SELECT 1 FROM generate_series(0,10)i;
+ROLLBACK;
+SELECT * FROM citus_reserved_connection_stats() ORDER BY 1,2;
+
+BEGIN;
+    -- hits a single shard
+    INSERT INTO test SELECT 1 FROM generate_series(0,10)i;
+
+    -- if COPY hits a single shard, we should have reserved connections
+    -- to the other nodes
+    SELECT * FROM citus_reserved_connection_stats() ORDER BY 1,2;
+
+    -- we should be able to see this again if the query hits
+    -- the same shard
+    INSERT INTO test SELECT 1 FROM generate_series(0,10)i;
+    SELECT * FROM citus_reserved_connection_stats() ORDER BY 1,2;
+
+    -- but when the query hits the other shard(s), we should
+    -- see that all the reserved connections are used
+    INSERT INTO test SELECT i FROM generate_series(0,10)i;
+    SELECT * FROM citus_reserved_connection_stats() ORDER BY 1,2;
+ROLLBACK;
+
+-- at the end of the transaction, all should be cleared
+SELECT * FROM citus_reserved_connection_stats();
+
+BEGIN;
+    SELECT count(*) FROM test;
+
+    -- the above command used at least one connection per node
+    -- so the next commands would not need any reserved connections
+    INSERT INTO test SELECT 1 FROM generate_series(0,10)i;
+    SELECT * FROM citus_reserved_connection_stats() ORDER BY 1,2;
+    INSERT INTO test SELECT i FROM generate_series(0,10)i;
+    SELECT * FROM citus_reserved_connection_stats() ORDER BY 1,2;
+COMMIT;
+
+-- check the reserved connections when we have cached connections
+ALTER SYSTEM SET citus.max_cached_conns_per_worker TO 1;
+SELECT pg_reload_conf();
+SELECT pg_sleep(0.1);
+
+-- cache connections to the nodes
+SELECT count(*) FROM test;
+BEGIN;
+    -- we should not have any reserved connections
+    -- because we already have available connections
+    COPY test FROM PROGRAM 'seq 32';
+    SELECT * FROM citus_reserved_connection_stats() ORDER BY 1,2;
+COMMIT;
+
 -- in case other tests relies on these setting, reset them
 ALTER SYSTEM RESET citus.distributed_deadlock_detection_factor;
 ALTER SYSTEM RESET citus.recover_2pc_interval;
 ALTER SYSTEM RESET citus.max_cached_conns_per_worker;
 SELECT pg_reload_conf();
+BEGIN;
+SET LOCAL client_min_messages TO WARNING;
 DROP SCHEMA shared_connection_stats CASCADE;
+COMMIT;