From aa6b641828a017f3e6f037995b65150dbe974816 Mon Sep 17 00:00:00 2001 From: Onder Kalaci Date: Tue, 24 Mar 2020 13:20:40 +0100 Subject: [PATCH] Throttle connections to the worker nodes With this commit, we're introducing a new infrastructure to throttle connections to the worker nodes. This infrastructure is useful for multi-shard queries; router queries are not affected by it. The goal is to prevent establishing more than citus.max_shared_pool_size connections per worker node in total, across sessions. To do that, we've introduced a new connection flag OPTIONAL_CONNECTION. The idea is that some connections are optional, such as the second (and further) connections for the adaptive executor. A single connection is enough to finish the distributed execution; the others are useful to execute the query faster. Thus, they can be considered optional connections. When an optional connection is not allowed, the adaptive executor simply skips it and continues the execution with the already established connections. However, it keeps retrying to establish optional connections, in case some slots open up again. --- .../distributed/commands/utility_hook.c | 1 - .../connection/connection_management.c | 104 ++++- .../connection/shared_connection_stats.c | 430 ++++++++++++++++-- .../distributed/executor/adaptive_executor.c | 90 +++- .../distributed/metadata/node_metadata.c | 20 +- src/backend/distributed/shared_library_init.c | 112 +++-- .../distributed/sql/citus--9.2-4--9.3-2.sql | 1 + .../distributed/sql/citus--9.3-1--9.3-2.sql | 1 - .../citus_remote_connection_stats/9.3-2.sql | 26 +- .../citus_remote_connection_stats/latest.sql | 26 +- .../test/shared_connection_counters.c | 66 +++ .../transaction/transaction_management.c | 1 + .../distributed/connection_management.h | 38 +- .../distributed/shared_connection_stats.h | 11 +- .../ensure_no_shared_connection_leak.out | 160 +++++++ .../expected/shared_connection_stats.out | 391 ++++++++++++++++ .../expected/shared_connection_waits.out | 33 ++ src/test/regress/failure_schedule | 6 + src/test/regress/isolation_schedule | 1 + src/test/regress/multi_mx_schedule | 6 + src/test/regress/multi_schedule | 12 + .../regress/multi_task_tracker_extra_schedule | 6 + .../regress/spec/shared_connection_waits.spec | 65 +++ .../sql/ensure_no_shared_connection_leak.sql | 83 ++++ src/test/regress/sql/node_conninfo_reload.sql | 9 +- .../regress/sql/shared_connection_stats.sql | 228 ++++++++++ 26 files changed, 1815 insertions(+), 112 deletions(-) delete mode 100644 src/backend/distributed/sql/citus--9.3-1--9.3-2.sql create mode 100644 src/backend/distributed/test/shared_connection_counters.c create mode 100644 src/test/regress/expected/ensure_no_shared_connection_leak.out create mode 100644 src/test/regress/expected/shared_connection_stats.out create mode 100644 src/test/regress/expected/shared_connection_waits.out create mode 100644 src/test/regress/spec/shared_connection_waits.spec create mode 100644 src/test/regress/sql/ensure_no_shared_connection_leak.sql create mode 100644 src/test/regress/sql/shared_connection_stats.sql diff --git a/src/backend/distributed/commands/utility_hook.c b/src/backend/distributed/commands/utility_hook.c index 2dfd4386f..0b5e3d109 100644 --- a/src/backend/distributed/commands/utility_hook.c +++ b/src/backend/distributed/commands/utility_hook.c @@ -64,7 +64,6 @@ #include "utils/lsyscache.h" #include "utils/syscache.h" - bool EnableDDLPropagation = true; /* ddl propagation is enabled */ PropSetCmdBehavior
PropagateSetCommands = PROPSETCMD_NONE; /* SET prop off */ static bool shouldInvalidateForeignKeyGraph = false; diff --git a/src/backend/distributed/connection/connection_management.c b/src/backend/distributed/connection/connection_management.c index f9b13f8ce..fcdceea8b 100644 --- a/src/backend/distributed/connection/connection_management.c +++ b/src/backend/distributed/connection/connection_management.c @@ -28,11 +28,13 @@ #include "distributed/hash_helpers.h" #include "distributed/placement_connection.h" #include "distributed/run_from_same_connection.h" +#include "distributed/shared_connection_stats.h" #include "distributed/cancel_utils.h" #include "distributed/remote_commands.h" #include "distributed/version_compat.h" #include "mb/pg_wchar.h" #include "portability/instr_time.h" +#include "storage/ipc.h" #include "utils/hsearch.h" #include "utils/memutils.h" @@ -46,7 +48,8 @@ MemoryContext ConnectionContext = NULL; static uint32 ConnectionHashHash(const void *key, Size keysize); static int ConnectionHashCompare(const void *a, const void *b, Size keysize); -static MultiConnection * StartConnectionEstablishment(ConnectionHashKey *key); +static void StartConnectionEstablishment(MultiConnection *connectionn, + ConnectionHashKey *key); static void FreeConnParamsHashEntryFields(ConnParamsHashEntry *entry); static void AfterXactHostConnectionHandling(ConnectionHashEntry *entry, bool isCommit); static bool ShouldShutdownConnection(MultiConnection *connection, const int @@ -78,7 +81,7 @@ static WaitEventSet * WaitEventSetFromMultiConnectionStates(List *connections, int *waitCount); static void CloseNotReadyMultiConnectionStates(List *connectionStates); static uint32 MultiConnectionStateEventMask(MultiConnectionPollState *connectionState); - +static void CitusPQFinish(MultiConnection *connection); static int CitusNoticeLogLevel = DEFAULT_CITUS_NOTICE_LEVEL; @@ -257,7 +260,6 @@ StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port, const char *user, const char *database) { ConnectionHashKey key; - MultiConnection *connection; bool found; /* do some minimal input checks */ @@ -310,24 +312,75 @@ StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port, if (!(flags & FORCE_NEW_CONNECTION)) { /* check connection cache for a connection that's not already in use */ - connection = FindAvailableConnection(entry->connections, flags); + MultiConnection *connection = FindAvailableConnection(entry->connections, flags); if (connection) { return connection; } } + /* * Either no caching desired, or no pre-established, non-claimed, * connection present. Initiate connection establishment. */ - - connection = StartConnectionEstablishment(&key); - + MultiConnection *connection = MemoryContextAllocZero(ConnectionContext, + sizeof(MultiConnection)); + connection->initilizationState = POOL_STATE_NOT_INITIALIZED; dlist_push_tail(entry->connections, &connection->connectionNode); + /* these two flags are by nature cannot happen at the same time */ + Assert(!((flags & WAIT_FOR_CONNECTION) && (flags & OPTIONAL_CONNECTION))); + + if (flags & WAIT_FOR_CONNECTION) + { + WaitLoopForSharedConnection(hostname, port); + } + else if (flags & OPTIONAL_CONNECTION) + { + /* + * We can afford to skip establishing an optional connection. For + * non-optional connections, we first retry for some time. If we still + * cannot reserve the right to establish a connection, we prefer to + * error out. 
+ */ + if (!TryToIncrementSharedConnectionCounter(hostname, port)) + { + /* do not track the connection anymore */ + dlist_delete(&connection->connectionNode); + pfree(connection); + + return NULL; + } + } + else + { + /* + * The caller doesn't want the connection manager to wait + * until a connection slot is available on the remote node. + * In the end, we might fail to establish connection to the + * remote node as it might not have any space in + * max_connections for this connection establishment. + * + * Still, we keep track of the connnection counter. + */ + IncrementSharedConnectionCounter(hostname, port); + } + + + /* + * We've already incremented the counter above, so we should decrement + * when we're done with the connection. + */ + connection->initilizationState = POOL_STATE_COUNTER_INCREMENTED; + + StartConnectionEstablishment(connection, &key); + ResetShardPlacementAssociation(connection); + /* fully initialized the connection, record it */ + connection->initilizationState = POOL_STATE_INITIALIZED; + return connection; } @@ -456,8 +509,7 @@ CloseConnection(MultiConnection *connection) bool found; /* close connection */ - PQfinish(connection->pgConn); - connection->pgConn = NULL; + CitusPQFinish(connection); strlcpy(key.hostname, connection->hostname, MAX_NODE_LENGTH); key.port = connection->port; @@ -537,8 +589,7 @@ ShutdownConnection(MultiConnection *connection) { SendCancelationRequest(connection); } - PQfinish(connection->pgConn); - connection->pgConn = NULL; + CitusPQFinish(connection); } @@ -903,9 +954,30 @@ CloseNotReadyMultiConnectionStates(List *connectionStates) } /* close connection, otherwise we take up resource on the other side */ + CitusPQFinish(connection); + } +} + + +/* + * CitusPQFinish is a wrapper around PQfinish and does book keeping on shared connection + * counters. + */ +static void +CitusPQFinish(MultiConnection *connection) +{ + if (connection->pgConn != NULL) + { PQfinish(connection->pgConn); connection->pgConn = NULL; } + + /* behave idempotently, there is no gurantee that CitusPQFinish() is called once */ + if (connection->initilizationState >= POOL_STATE_COUNTER_INCREMENTED) + { + DecrementSharedConnectionCounter(connection->hostname, connection->port); + connection->initilizationState = POOL_STATE_NOT_INITIALIZED; + } } @@ -985,8 +1057,8 @@ ConnectionHashCompare(const void *a, const void *b, Size keysize) * Asynchronously establish connection to a remote node, but don't wait for * that to finish. DNS lookups etc. are performed synchronously though. 
*/ -static MultiConnection * -StartConnectionEstablishment(ConnectionHashKey *key) +static void +StartConnectionEstablishment(MultiConnection *connection, ConnectionHashKey *key) { bool found = false; static uint64 connectionId = 1; @@ -1018,9 +1090,6 @@ StartConnectionEstablishment(ConnectionHashKey *key) entry->isValid = true; } - MultiConnection *connection = MemoryContextAllocZero(ConnectionContext, - sizeof(MultiConnection)); - strlcpy(connection->hostname, key->hostname, MAX_NODE_LENGTH); connection->port = key->port; strlcpy(connection->database, key->database, NAMEDATALEN); @@ -1040,8 +1109,6 @@ StartConnectionEstablishment(ConnectionHashKey *key) PQsetnonblocking(connection->pgConn, true); SetCitusNoticeProcessor(connection); - - return connection; } @@ -1166,6 +1233,7 @@ ShouldShutdownConnection(MultiConnection *connection, const int cachedConnection } return isCitusInitiatedBackend || + connection->initilizationState != POOL_STATE_INITIALIZED || cachedConnectionCount >= MaxCachedConnectionsPerWorker || connection->forceCloseAtTransactionEnd || PQstatus(connection->pgConn) != CONNECTION_OK || diff --git a/src/backend/distributed/connection/shared_connection_stats.c b/src/backend/distributed/connection/shared_connection_stats.c index 7fb9750bc..ffcd3ca30 100644 --- a/src/backend/distributed/connection/shared_connection_stats.c +++ b/src/backend/distributed/connection/shared_connection_stats.c @@ -20,20 +20,27 @@ #include "access/hash.h" #include "access/htup_details.h" #include "catalog/pg_authid.h" +#include "commands/dbcommands.h" +#include "distributed/cancel_utils.h" #include "distributed/connection_management.h" +#include "distributed/listutils.h" #include "distributed/metadata_cache.h" #include "distributed/shared_connection_stats.h" +#include "distributed/time_constants.h" #include "distributed/tuplestore.h" +#include "utils/builtins.h" #include "utils/hashutils.h" #include "utils/hsearch.h" #include "storage/ipc.h" -#define REMOTE_CONNECTION_STATS_COLUMNS 6 +#define REMOTE_CONNECTION_STATS_COLUMNS 4 +#define ADJUST_POOLSIZE_AUTOMATICALLY 0 +#define DISABLE_CONNECTION_THROTTLING -1 /* - * The data structure used to store data in shared memory. This data structure only + * The data structure used to store data in shared memory. This data structure is only * used for storing the lock. The actual statistics about the connections are stored * in the hashmap, which is allocated separately, as Postgres provides different APIs * for allocating hashmaps in the shared memory. @@ -42,22 +49,26 @@ typedef struct ConnectionStatsSharedData { int sharedConnectionHashTrancheId; char *sharedConnectionHashTrancheName; + LWLock sharedConnectionHashLock; + ConditionVariable waitersConditionVariable; } ConnectionStatsSharedData; typedef struct SharedConnStatsHashKey { /* - * Using nodeId (over hostname/hostport) make the tracking resiliant to - * master_update_node(). Plus, requires a little less memory. + * We keep the entries in the shared memory even after master_update_node() + * as there might be some cached connections to the old node. + * That's why, we prefer to use "hostname/port" over nodeId. */ - uint32 nodeId; + char hostname[MAX_NODE_LENGTH]; + int32 port; /* * Given that citus.shared_max_pool_size can be defined per database, we * should keep track of shared connections per database. 
*/ - char database[NAMEDATALEN]; + Oid databaseOid; } SharedConnStatsHashKey; /* hash entry for per worker stats */ @@ -70,16 +81,15 @@ typedef struct SharedConnStatsHashEntry /* - * Controlled via a GUC. - * - * By default, Citus tracks 1024 worker nodes, which is already - * very unlikely number of worker nodes. Given that the shared - * memory required per worker is pretty small (~120 Bytes), we think it - * is a good default that wouldn't hurt any users in any dimension. + * Controlled via a GUC, never access directly, use GetMaxSharedPoolSize(). + * "0" means adjust MaxSharedPoolSize automatically by using MaxConnections. + * "-1" means do not apply connection throttling + * Anything else means use that number */ -int MaxTrackedWorkerNodes = 1024; +int MaxSharedPoolSize = 0; -/* the following two structs used for accessing shared memory */ + +/* the following two structs are used for accessing shared memory */ static HTAB *SharedConnStatsHash = NULL; static ConnectionStatsSharedData *ConnectionStatsSharedState = NULL; @@ -88,9 +98,10 @@ static shmem_startup_hook_type prev_shmem_startup_hook = NULL; /* local function declarations */ -static void StoreAllConnections(Tuplestorestate *tupleStore, TupleDesc tupleDescriptor); -static void UnLockConnectionSharedMemory(void); +static void StoreAllRemoteConnectionStats(Tuplestorestate *tupleStore, TupleDesc + tupleDescriptor); static void LockConnectionSharedMemory(LWLockMode lockMode); +static void UnLockConnectionSharedMemory(void); static void SharedConnectionStatsShmemInit(void); static size_t SharedConnectionStatsShmemSize(void); static int SharedConnectionHashCompare(const void *a, const void *b, Size keysize); @@ -112,7 +123,7 @@ citus_remote_connection_stats(PG_FUNCTION_ARGS) CheckCitusVersion(ERROR); Tuplestorestate *tupleStore = SetupTuplestore(fcinfo, &tupleDescriptor); - StoreAllConnections(tupleStore, tupleDescriptor); + StoreAllRemoteConnectionStats(tupleStore, tupleDescriptor); /* clean up and return the tuplestore */ tuplestore_donestoring(tupleStore); @@ -122,19 +133,19 @@ citus_remote_connection_stats(PG_FUNCTION_ARGS) /* - * StoreAllConnections gets connections established from the current node + * StoreAllRemoteConnectionStats gets connections established from the current node * and inserts them into the given tuplestore. * * We don't need to enforce any access privileges as the number of backends * on any node is already visible on pg_stat_activity to all users. 
*/ static void -StoreAllConnections(Tuplestorestate *tupleStore, TupleDesc tupleDescriptor) +StoreAllRemoteConnectionStats(Tuplestorestate *tupleStore, TupleDesc tupleDescriptor) { Datum values[REMOTE_CONNECTION_STATS_COLUMNS]; bool isNulls[REMOTE_CONNECTION_STATS_COLUMNS]; - /* we're reading all distributed transactions, prevent new backends */ + /* we're reading all shared connections, prevent any changes */ LockConnectionSharedMemory(LW_SHARED); HASH_SEQ_STATUS status; @@ -147,9 +158,17 @@ StoreAllConnections(Tuplestorestate *tupleStore, TupleDesc tupleDescriptor) memset(values, 0, sizeof(values)); memset(isNulls, false, sizeof(isNulls)); - values[0] = Int32GetDatum(connectionEntry->key.nodeId); - values[1] = PointerGetDatum(connectionEntry->key.database); - values[2] = Int32GetDatum(connectionEntry->connectionCount); + char *databaseName = get_database_name(connectionEntry->key.databaseOid); + if (databaseName == NULL) + { + /* database might have been dropped */ + continue; + } + + values[0] = PointerGetDatum(cstring_to_text(connectionEntry->key.hostname)); + values[1] = Int32GetDatum(connectionEntry->key.port); + values[2] = PointerGetDatum(cstring_to_text(databaseName)); + values[3] = Int32GetDatum(connectionEntry->connectionCount); tuplestore_putvalues(tupleStore, tupleDescriptor, values, isNulls); } @@ -158,6 +177,313 @@ StoreAllConnections(Tuplestorestate *tupleStore, TupleDesc tupleDescriptor) } +/* + * RemoveInactiveNodesFromSharedConnections goes over the SharedConnStatsHash + * and removes the inactive entries. + */ +void +RemoveInactiveNodesFromSharedConnections(void) +{ + /* we're modifying connections, prevent any changes */ + LockConnectionSharedMemory(LW_EXCLUSIVE); + + HASH_SEQ_STATUS status; + SharedConnStatsHashEntry *connectionEntry = NULL; + + /* + * In the first iteration, try to remove worker nodes that doesn't have any active + * conections and the node does not exits in the metadata anymore. + */ + hash_seq_init(&status, SharedConnStatsHash); + while ((connectionEntry = (SharedConnStatsHashEntry *) hash_seq_search(&status)) != 0) + { + SharedConnStatsHashKey connectionKey = connectionEntry->key; + WorkerNode *workerNode = + FindWorkerNode(connectionKey.hostname, connectionKey.port); + + if (connectionEntry->connectionCount == 0 && + (workerNode == NULL || !workerNode->isActive)) + { + hash_search(SharedConnStatsHash, &connectionKey, HASH_REMOVE, NULL); + } + } + + int entryCount = hash_get_num_entries(SharedConnStatsHash); + if (entryCount + 1 < MaxWorkerNodesTracked) + { + /* we're good, we have at least one more space for a new worker */ + UnLockConnectionSharedMemory(); + + return; + } + + /* + * We aimed to remove nodes that don't have any open connections. If we + * failed to find one, we have to be more aggressive and remove at least + * one of the inactive ones. + */ + hash_seq_init(&status, SharedConnStatsHash); + while ((connectionEntry = (SharedConnStatsHashEntry *) hash_seq_search(&status)) != 0) + { + SharedConnStatsHashKey connectionKey = connectionEntry->key; + WorkerNode *workerNode = + FindWorkerNode(connectionKey.hostname, connectionKey.port); + + if (workerNode == NULL || !workerNode->isActive) + { + hash_search(SharedConnStatsHash, &connectionKey, HASH_REMOVE, NULL); + + hash_seq_term(&status); + + break; + } + } + + + UnLockConnectionSharedMemory(); +} + + +/* + * GetMaxSharedPoolSize is a wrapper around MaxSharedPoolSize which is controlled + * via a GUC. 
+ * "0" means adjust MaxSharedPoolSize automatically by using MaxConnections + * "-1" means do not apply connection throttling + * Anything else means use that number + */ +int +GetMaxSharedPoolSize(void) +{ + if (MaxSharedPoolSize == ADJUST_POOLSIZE_AUTOMATICALLY) + { + return MaxConnections; + } + + return MaxSharedPoolSize; +} + + +/* + * WaitLoopForSharedConnection tries to increment the shared connection + * counter for the given hostname/port and the current database in + * SharedConnStatsHash. + * + * The function implements a retry mechanism via a condition variable. + */ +void +WaitLoopForSharedConnection(const char *hostname, int port) +{ + while (!TryToIncrementSharedConnectionCounter(hostname, port)) + { + CHECK_FOR_INTERRUPTS(); + + WaitForSharedConnection(); + } + + ConditionVariableCancelSleep(); +} + + +/* + * TryToIncrementSharedConnectionCounter tries to increment the shared + * connection counter for the given nodeId and the current database in + * SharedConnStatsHash. + * + * If the function returns true, the caller is allowed (and expected) + * to establish a new connection to the given node. Else, the caller + * is not allowed to establish a new connection. + */ +bool +TryToIncrementSharedConnectionCounter(const char *hostname, int port) +{ + if (GetMaxSharedPoolSize() == DISABLE_CONNECTION_THROTTLING) + { + /* connection throttling disabled */ + return true; + } + + bool counterIncremented = false; + SharedConnStatsHashKey connKey; + + strlcpy(connKey.hostname, hostname, MAX_NODE_LENGTH); + if (strlen(hostname) > MAX_NODE_LENGTH) + { + ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("hostname exceeds the maximum length of %d", + MAX_NODE_LENGTH))); + } + + connKey.port = port; + connKey.databaseOid = MyDatabaseId; + + LockConnectionSharedMemory(LW_EXCLUSIVE); + + /* + * As the hash map is allocated in shared memory, it doesn't rely on palloc for + * memory allocation, so we could get NULL via HASH_ENTER_NULL when there is no + * space in the shared memory. That's why we prefer continuing the execution + * instead of throwing an error. + */ + bool entryFound = false; + SharedConnStatsHashEntry *connectionEntry = + hash_search(SharedConnStatsHash, &connKey, HASH_ENTER_NULL, &entryFound); + + /* + * It is possible to throw an error at this point, but that doesn't help us in anyway. + * Instead, we try our best, let the connection establishment continue by-passing the + * connection throttling. + */ + if (!connectionEntry) + { + UnLockConnectionSharedMemory(); + return true; + } + + if (!entryFound) + { + /* we successfully allocated the entry for the first time, so initialize it */ + connectionEntry->connectionCount = 1; + + counterIncremented = true; + } + else if (connectionEntry->connectionCount + 1 > GetMaxSharedPoolSize()) + { + /* there is no space left for this connection */ + counterIncremented = false; + } + else + { + connectionEntry->connectionCount++; + counterIncremented = true; + } + + UnLockConnectionSharedMemory(); + + return counterIncremented; +} + + +/* + * IncrementSharedConnectionCounter increments the shared counter + * for the given hostname and port. 
+ */ +void +IncrementSharedConnectionCounter(const char *hostname, int port) +{ + SharedConnStatsHashKey connKey; + + if (GetMaxSharedPoolSize() == DISABLE_CONNECTION_THROTTLING) + { + /* connection throttling disabled */ + return; + } + + strlcpy(connKey.hostname, hostname, MAX_NODE_LENGTH); + if (strlen(hostname) > MAX_NODE_LENGTH) + { + ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("hostname exceeds the maximum length of %d", + MAX_NODE_LENGTH))); + } + + connKey.port = port; + connKey.databaseOid = MyDatabaseId; + + LockConnectionSharedMemory(LW_EXCLUSIVE); + + /* + * As the hash map is allocated in shared memory, it doesn't rely on palloc for + * memory allocation, so we could get NULL via HASH_ENTER_NULL. That's why we prefer + * continuing the execution instead of throwing an error. + */ + bool entryFound = false; + SharedConnStatsHashEntry *connectionEntry = + hash_search(SharedConnStatsHash, &connKey, HASH_ENTER_NULL, &entryFound); + + /* + * It is possible to throw an error at this point, but that doesn't help us in anyway. + * Instead, we try our best, let the connection establishment continue by-passing the + * connection throttling. + */ + if (!connectionEntry) + { + UnLockConnectionSharedMemory(); + + ereport(DEBUG4, (errmsg("No entry found for node %s:%d while incrementing " + "connection counter", hostname, port))); + + return; + } + + if (!entryFound) + { + /* we successfully allocated the entry for the first time, so initialize it */ + connectionEntry->connectionCount = 0; + } + + connectionEntry->connectionCount += 1; + + UnLockConnectionSharedMemory(); +} + + +/* + * DecrementSharedConnectionCounter decrements the shared counter + * for the given hostname and port. + */ +void +DecrementSharedConnectionCounter(const char *hostname, int port) +{ + SharedConnStatsHashKey connKey; + + if (GetMaxSharedPoolSize() == DISABLE_CONNECTION_THROTTLING) + { + /* connection throttling disabled */ + return; + } + + strlcpy(connKey.hostname, hostname, MAX_NODE_LENGTH); + if (strlen(hostname) > MAX_NODE_LENGTH) + { + ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("hostname exceeds the maximum length of %d", + MAX_NODE_LENGTH))); + } + + connKey.port = port; + connKey.databaseOid = MyDatabaseId; + + LockConnectionSharedMemory(LW_EXCLUSIVE); + + bool entryFound = false; + SharedConnStatsHashEntry *connectionEntry = + hash_search(SharedConnStatsHash, &connKey, HASH_FIND, &entryFound); + + /* this worker node is removed or updated, no need to care */ + if (!entryFound) + { + UnLockConnectionSharedMemory(); + + /* wake up any waiters in case any backend is waiting for this node */ + WakeupWaiterBackendsForSharedConnection(); + + ereport(DEBUG4, (errmsg("No entry found for node %s:%d while decrementing " + "connection counter", hostname, port))); + + return; + } + + /* we should never go below 0 */ + Assert(connectionEntry->connectionCount > 0); + + connectionEntry->connectionCount -= 1; + + UnLockConnectionSharedMemory(); + + WakeupWaiterBackendsForSharedConnection(); +} + + /* * LockConnectionSharedMemory is a utility function that should be used when * accessing to the SharedConnStatsHash, which is in the shared memory. @@ -180,6 +506,43 @@ UnLockConnectionSharedMemory(void) } +/* + * WakeupWaiterBackendsForSharedConnection is a wrapper around the condition variable + * broadcast operation. + * + * We use a single condition variable, for all worker nodes, to implement the connection + * throttling mechanism. 
Combination of all the backends are allowed to establish + * MaxSharedPoolSize number of connections per worker node. If a backend requires a + * non-optional connection (see WAIT_FOR_CONNECTION for details), it is not allowed + * to establish it immediately if the total connections are equal to MaxSharedPoolSize. + * Instead, the backend waits on the condition variable. When any other backend + * terminates an existing connection to any remote node, this function is called. + * The main goal is to trigger all waiting backends to try getting a connection slot + * in MaxSharedPoolSize. The ones which can get connection slot are allowed to continue + * with the connection establishments. Others should wait another backend to call + * this function. + */ +void +WakeupWaiterBackendsForSharedConnection(void) +{ + ConditionVariableBroadcast(&ConnectionStatsSharedState->waitersConditionVariable); +} + + +/* + * WaitForSharedConnection is a wrapper around the condition variable sleep operation. + * + * For the details of the use of the condition variable, see + * WakeupWaiterBackendsForSharedConnection(). + */ +void +WaitForSharedConnection(void) +{ + ConditionVariableSleep(&ConnectionStatsSharedState->waitersConditionVariable, + PG_WAIT_EXTENSION); +} + + /* * InitializeSharedConnectionStats requests the necessary shared memory * from Postgres and sets up the shared memory startup hook. @@ -208,9 +571,8 @@ SharedConnectionStatsShmemSize(void) Size size = 0; size = add_size(size, sizeof(ConnectionStatsSharedData)); - size = add_size(size, mul_size(sizeof(LWLock), MaxTrackedWorkerNodes)); - Size hashSize = hash_estimate_size(MaxTrackedWorkerNodes, + Size hashSize = hash_estimate_size(MaxWorkerNodesTracked, sizeof(SharedConnStatsHashEntry)); size = add_size(size, hashSize); @@ -229,7 +591,7 @@ SharedConnectionStatsShmemInit(void) bool alreadyInitialized = false; HASHCTL info; - /* create (nodeId,database) -> [counter] */ + /* create (hostname, port, database) -> [counter] */ memset(&info, 0, sizeof(info)); info.keysize = sizeof(SharedConnStatsHashKey); info.entrysize = sizeof(SharedConnStatsHashEntry); @@ -260,12 +622,14 @@ SharedConnectionStatsShmemInit(void) LWLockInitialize(&ConnectionStatsSharedState->sharedConnectionHashLock, ConnectionStatsSharedState->sharedConnectionHashTrancheId); + + ConditionVariableInit(&ConnectionStatsSharedState->waitersConditionVariable); } /* allocate hash table */ SharedConnStatsHash = - ShmemInitHash("Shared Conn. Stats Hash", MaxTrackedWorkerNodes, - MaxTrackedWorkerNodes, &info, hashFlags); + ShmemInitHash("Shared Conn. 
Stats Hash", MaxWorkerNodesTracked, + MaxWorkerNodesTracked, &info, hashFlags); LWLockRelease(AddinShmemInitLock); @@ -284,8 +648,9 @@ SharedConnectionHashHash(const void *key, Size keysize) { SharedConnStatsHashKey *entry = (SharedConnStatsHashKey *) key; - uint32 hash = hash_uint32(entry->nodeId); - hash = hash_combine(hash, string_hash(entry->database, NAMEDATALEN)); + uint32 hash = string_hash(entry->hostname, NAMEDATALEN); + hash = hash_combine(hash, hash_uint32(entry->port)); + hash = hash_combine(hash, hash_uint32(entry->databaseOid)); return hash; } @@ -297,8 +662,9 @@ SharedConnectionHashCompare(const void *a, const void *b, Size keysize) SharedConnStatsHashKey *ca = (SharedConnStatsHashKey *) a; SharedConnStatsHashKey *cb = (SharedConnStatsHashKey *) b; - if (ca->nodeId != cb->nodeId || - strncmp(ca->database, cb->database, NAMEDATALEN) != 0) + if (strncmp(ca->hostname, cb->hostname, MAX_NODE_LENGTH) != 0 || + ca->port != cb->port || + ca->databaseOid != cb->databaseOid) { return 1; } diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c index 4f8863edc..10d411bea 100644 --- a/src/backend/distributed/executor/adaptive_executor.c +++ b/src/backend/distributed/executor/adaptive_executor.c @@ -589,6 +589,7 @@ static WorkerPool * FindOrCreateWorkerPool(DistributedExecution *execution, static WorkerSession * FindOrCreateWorkerSession(WorkerPool *workerPool, MultiConnection *connection); static void ManageWorkerPool(WorkerPool *workerPool); +static bool ShouldWaitForConnection(WorkerPool *workerPool); static void CheckConnectionTimeout(WorkerPool *workerPool); static int UsableConnectionCount(WorkerPool *workerPool); static long NextEventTimeout(DistributedExecution *execution); @@ -1944,9 +1945,6 @@ FindOrCreateWorkerPool(DistributedExecution *execution, char *nodeName, int node workerPool->nodeName = pstrdup(nodeName); workerPool->nodePort = nodePort; - INSTR_TIME_SET_ZERO(workerPool->poolStartTime); - workerPool->distributedExecution = execution; - /* "open" connections aggressively when there are cached connections */ int nodeConnectionCount = MaxCachedConnectionsPerWorker; workerPool->maxNewConnectionsPerCycle = Max(1, nodeConnectionCount); @@ -1954,6 +1952,8 @@ FindOrCreateWorkerPool(DistributedExecution *execution, char *nodeName, int node dlist_init(&workerPool->pendingTaskQueue); dlist_init(&workerPool->readyTaskQueue); + workerPool->distributedExecution = execution; + execution->workerList = lappend(execution->workerList, workerPool); return workerPool; @@ -2385,11 +2385,48 @@ ManageWorkerPool(WorkerPool *workerPool) connectionFlags |= OUTSIDE_TRANSACTION; } + if (UseConnectionPerPlacement()) + { + /* + * User wants one connection per placement, so no throttling is desired + * and we do not set any flags. + * + * The primary reason for this is that allowing multiple backends to use + * connection per placement could lead to unresolved self deadlocks. In other + * words, each backend may stuck waiting for other backends to get a slot + * in the shared connection counters. + */ + } + else if (ShouldWaitForConnection(workerPool)) + { + /* + * We need this connection to finish the execution. If it is not + * available based on the current number of connections to the worker + * then wait for it. + */ + connectionFlags |= WAIT_FOR_CONNECTION; + } + else + { + /* + * The executor can finish the execution with a single connection, + * remaining are optional. 
If the executor can get more connections, + * it can increase the parallelism. + */ + connectionFlags |= OPTIONAL_CONNECTION; + } + /* open a new connection to the worker */ MultiConnection *connection = StartNodeUserDatabaseConnection(connectionFlags, workerPool->nodeName, workerPool->nodePort, NULL, NULL); + if (!connection) + { + /* connection can only be NULL for optional connections */ + Assert((connectionFlags & OPTIONAL_CONNECTION)); + continue; + } /* * Assign the initial state in the connection state machine. The connection @@ -2404,6 +2441,17 @@ ManageWorkerPool(WorkerPool *workerPool) */ connection->claimedExclusively = true; + if (list_length(workerPool->sessionList) == 0) + { + /* + * The worker pool has just started to establish connections. We need to + * defer this initilization after StartNodeUserDatabaseConnection() + * because for non-optional connections, we have some logic to wait + * until a connection is allowed to be established. + */ + INSTR_TIME_SET_ZERO(workerPool->poolStartTime); + } + /* create a session for the connection */ WorkerSession *session = FindOrCreateWorkerSession(workerPool, connection); @@ -2416,6 +2464,42 @@ ManageWorkerPool(WorkerPool *workerPool) } +/* + * ShouldWaitForConnection returns true if the workerPool should wait to + * get the next connection until one slot is empty within + * citus.max_shared_pool_size on the worker. Note that, if there is an + * empty slot, the connection will not wait anyway. + */ +static bool +ShouldWaitForConnection(WorkerPool *workerPool) +{ + if (list_length(workerPool->sessionList) == 0) + { + /* + * We definitely need at least 1 connection to finish the execution. + * All single shard queries hit here with the default settings. + */ + return true; + } + + if (list_length(workerPool->sessionList) < MaxCachedConnectionsPerWorker) + { + /* + * Until this session caches MaxCachedConnectionsPerWorker connections, + * this might lead some optional connections to be considered as non-optional + * when MaxCachedConnectionsPerWorker > 1. + * + * However, once the session caches MaxCachedConnectionsPerWorker (which is + * the second transaction executed in the session), Citus would utilize the + * cached connections as much as possible. + */ + return true; + } + + return false; +} + + /* * CheckConnectionTimeout makes sure that the execution enforces the connection * establishment timeout defined by the user (NodeConnectionTimeout). 
diff --git a/src/backend/distributed/metadata/node_metadata.c b/src/backend/distributed/metadata/node_metadata.c index 768a9e1df..1489f97ac 100644 --- a/src/backend/distributed/metadata/node_metadata.c +++ b/src/backend/distributed/metadata/node_metadata.c @@ -39,6 +39,7 @@ #include "distributed/remote_commands.h" #include "distributed/resource_lock.h" #include "distributed/shardinterval_utils.h" +#include "distributed/shared_connection_stats.h" #include "distributed/worker_manager.h" #include "distributed/worker_transaction.h" #include "lib/stringinfo.h" @@ -293,6 +294,9 @@ master_disable_node(PG_FUNCTION_ARGS) bool onlyConsiderActivePlacements = false; MemoryContext savedContext = CurrentMemoryContext; + /* remove the shared connection counters to have some space */ + RemoveInactiveNodesFromSharedConnections(); + PG_TRY(); { if (NodeIsPrimary(workerNode)) @@ -604,6 +608,9 @@ ActivateNode(char *nodeName, int nodePort) { bool isActive = true; + /* remove the shared connection counters to have some space */ + RemoveInactiveNodesFromSharedConnections(); + /* take an exclusive lock on pg_dist_node to serialize pg_dist_node changes */ LockRelationOid(DistNodeRelationId(), ExclusiveLock); @@ -646,6 +653,9 @@ master_update_node(PG_FUNCTION_ARGS) CheckCitusVersion(ERROR); + /* remove the shared connection counters to have some space */ + RemoveInactiveNodesFromSharedConnections(); + WorkerNode *workerNodeWithSameAddress = FindWorkerNodeAnyCluster(newNodeNameString, newNodePort); if (workerNodeWithSameAddress != NULL) @@ -672,6 +682,7 @@ master_update_node(PG_FUNCTION_ARGS) errmsg("node %u not found", nodeId))); } + /* * If the node is a primary node we block reads and writes. * @@ -715,8 +726,9 @@ master_update_node(PG_FUNCTION_ARGS) UpdateNodeLocation(nodeId, newNodeNameString, newNodePort); - strlcpy(workerNode->workerName, newNodeNameString, WORKER_LENGTH); - workerNode->workerPort = newNodePort; + /* we should be able to find the new node from the metadata */ + workerNode = FindWorkerNode(newNodeNameString, newNodePort); + Assert(workerNode->nodeId == nodeId); /* * Propagate the updated pg_dist_node entry to all metadata workers. 
@@ -1012,8 +1024,10 @@ ReadDistNode(bool includeNodesFromOtherClusters) static void RemoveNodeFromCluster(char *nodeName, int32 nodePort) { - WorkerNode *workerNode = ModifiableWorkerNode(nodeName, nodePort); + /* remove the shared connection counters to have some space */ + RemoveInactiveNodesFromSharedConnections(); + WorkerNode *workerNode = ModifiableWorkerNode(nodeName, nodePort); if (NodeIsPrimary(workerNode)) { bool onlyConsiderActivePlacements = false; diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index 156acf630..7811b1236 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -73,6 +73,7 @@ #include "distributed/adaptive_executor.h" #include "port/atomics.h" #include "postmaster/postmaster.h" +#include "storage/ipc.h" #include "optimizer/planner.h" #include "optimizer/paths.h" #include "tcop/tcopprot.h" @@ -89,9 +90,10 @@ static char *CitusVersion = CITUS_VERSION; void _PG_init(void); -static void CitusBackendAtExit(void); static void ResizeStackToMaximumDepth(void); static void multi_log_hook(ErrorData *edata); +static void RegisterConnectionCleanup(void); +static void CitusCleanupConnectionsAtExit(int code, Datum arg); static void CreateRequiredDirectories(void); static void RegisterCitusConfigVariables(void); static bool ErrorIfNotASuitableDeadlockFactor(double *newval, void **extra, @@ -99,6 +101,7 @@ static bool ErrorIfNotASuitableDeadlockFactor(double *newval, void **extra, static bool WarnIfDeprecatedExecutorUsed(int *newval, void **extra, GucSource source); static bool NodeConninfoGucCheckHook(char **newval, void **extra, GucSource source); static void NodeConninfoGucAssignHook(const char *newval, void *extra); +static const char * MaxSharedPoolSizeGucShowHook(void); static bool StatisticsCollectionGucCheckHook(bool *newval, void **extra, GucSource source); @@ -274,8 +277,6 @@ _PG_init(void) InitializeCitusQueryStats(); InitializeSharedConnectionStats(); - atexit(CitusBackendAtExit); - /* enable modification of pg_catalog tables during pg_upgrade */ if (IsBinaryUpgrade) { @@ -285,18 +286,6 @@ _PG_init(void) } -/* - * CitusBackendAtExit is called atexit of the backend for the purposes of - * any clean-up needed. - */ -static void -CitusBackendAtExit(void) -{ - /* properly close all the cached connections */ - ShutdownAllConnections(); -} - - /* * Stack size increase during high memory load may cause unexpected crashes. * With this alloca call, we are increasing stack size explicitly, so that if @@ -382,6 +371,37 @@ StartupCitusBackend(void) { InitializeMaintenanceDaemonBackend(); InitializeBackendData(); + RegisterConnectionCleanup(); +} + + +/* + * RegisterConnectionCleanup cleans up any resources left at the end of the + * session. We prefer to cleanup before shared memory exit to make sure that + * this session properly releases anything hold in the shared memory. + */ +static void +RegisterConnectionCleanup(void) +{ + static bool registeredCleanup = false; + if (registeredCleanup == false) + { + before_shmem_exit(CitusCleanupConnectionsAtExit, 0); + + registeredCleanup = true; + } +} + + +/* + * CitusCleanupConnectionsAtExit is called before_shmem_exit() of the + * backend for the purposes of any clean-up needed. 
+ */ +static void +CitusCleanupConnectionsAtExit(int code, Datum arg) +{ + /* properly close all the cached connections */ + ShutdownAllConnections(); } @@ -930,6 +950,20 @@ RegisterCitusConfigVariables(void) GUC_STANDARD, NULL, NULL, NULL); + DefineCustomIntVariable( + "citus.max_shared_pool_size", + gettext_noop("Sets the maximum number of connections allowed per worker node " + "across all the backends from this node. Setting to -1 disables " + "connections throttling. Setting to 0 makes it auto-adjust, meaning " + "equal to max_connections on the coordinator."), + gettext_noop("As a rule of thumb, the value should be at most equal to the " + "max_connections on the remote nodes."), + &MaxSharedPoolSize, + 0, -1, INT_MAX, + PGC_SIGHUP, + GUC_SUPERUSER_ONLY, + NULL, NULL, MaxSharedPoolSizeGucShowHook); + DefineCustomIntVariable( "citus.max_worker_nodes_tracked", gettext_noop("Sets the maximum number of worker nodes that are tracked."), @@ -937,9 +971,14 @@ RegisterCitusConfigVariables(void) "health status are tracked in a shared hash table on " "the master node. This configuration value limits the " "size of the hash table, and consequently the maximum " - "number of worker nodes that can be tracked."), + "number of worker nodes that can be tracked." + "Citus keeps some information about the worker nodes " + "in the shared memory for certain optimizations. The " + "optimizations are enforced up to this number of worker " + "nodes. Any additional worker nodes may not benefit from" + "the optimizations."), &MaxWorkerNodesTracked, - 2048, 8, INT_MAX, + 2048, 1024, INT_MAX, PGC_POSTMASTER, GUC_STANDARD, NULL, NULL, NULL); @@ -1013,23 +1052,6 @@ RegisterCitusConfigVariables(void) GUC_STANDARD, NULL, NULL, NULL); - DefineCustomIntVariable( - "citus.max_tracked_worker_nodes", - gettext_noop("Sets the maximum number of worker tracked."), - gettext_noop("Citus doesn't have any limitations in terms of the " - "number of worker nodes allowed in the cluster. But, " - "Citus keeps some information about the worker nodes " - "in the shared memory for certain optimizations. The " - "optimizations are enforced up to this number of worker " - "nodes. Any additional worker nodes may not benefit from" - "the optimizations."), - &MaxTrackedWorkerNodes, - 1024, 256, INT_MAX, - PGC_POSTMASTER, - GUC_STANDARD, - NULL, NULL, NULL); - - DefineCustomIntVariable( "citus.max_running_tasks_per_node", gettext_noop("Sets the maximum number of tasks to run concurrently per node."), @@ -1539,6 +1561,28 @@ NodeConninfoGucAssignHook(const char *newval, void *extra) } +/* + * MaxSharedPoolSizeGucShowHook overrides the value that is shown to the + * user when the default value has not been set. 
+ */ +static const char * +MaxSharedPoolSizeGucShowHook(void) +{ + StringInfo newvalue = makeStringInfo(); + + if (MaxSharedPoolSize == 0) + { + appendStringInfo(newvalue, "%d", GetMaxSharedPoolSize()); + } + else + { + appendStringInfo(newvalue, "%d", MaxSharedPoolSize); + } + + return (const char *) newvalue->data; +} + + static bool StatisticsCollectionGucCheckHook(bool *newval, void **extra, GucSource source) { diff --git a/src/backend/distributed/sql/citus--9.2-4--9.3-2.sql b/src/backend/distributed/sql/citus--9.2-4--9.3-2.sql index f8c3b14f9..31b6af6b0 100644 --- a/src/backend/distributed/sql/citus--9.2-4--9.3-2.sql +++ b/src/backend/distributed/sql/citus--9.2-4--9.3-2.sql @@ -5,3 +5,4 @@ #include "udfs/citus_extradata_container/9.3-2.sql" #include "udfs/update_distributed_table_colocation/9.3-2.sql" #include "udfs/replicate_reference_tables/9.3-2.sql" +#include "udfs/citus_remote_connection_stats/9.3-2.sql" diff --git a/src/backend/distributed/sql/citus--9.3-1--9.3-2.sql b/src/backend/distributed/sql/citus--9.3-1--9.3-2.sql deleted file mode 100644 index d1f8f5185..000000000 --- a/src/backend/distributed/sql/citus--9.3-1--9.3-2.sql +++ /dev/null @@ -1 +0,0 @@ -#include "udfs/citus_remote_connection_stats/9.3-2.sql" diff --git a/src/backend/distributed/sql/udfs/citus_remote_connection_stats/9.3-2.sql b/src/backend/distributed/sql/udfs/citus_remote_connection_stats/9.3-2.sql index 3369c1ac7..bb4dccd87 100644 --- a/src/backend/distributed/sql/udfs/citus_remote_connection_stats/9.3-2.sql +++ b/src/backend/distributed/sql/udfs/citus_remote_connection_stats/9.3-2.sql @@ -1,6 +1,22 @@ -CREATE OR REPLACE FUNCTION citus_remote_connection_stats(OUT node_id int, OUT database_name text, OUT connection_count_to_node int) - RETURNS SETOF RECORD - LANGUAGE C STRICT - AS 'MODULE_PATHNAME', $$citus_remote_connection_stats$$; - COMMENT ON FUNCTION citus_remote_connection_stats(OUT node_id int, OUT database_name text, OUT connection_count_to_node int) +CREATE OR REPLACE FUNCTION pg_catalog.citus_remote_connection_stats( + OUT hostname text, + OUT port int, + OUT database_name text, + OUT connection_count_to_node int) +RETURNS SETOF RECORD +LANGUAGE C STRICT +AS 'MODULE_PATHNAME', $$citus_remote_connection_stats$$; + +COMMENT ON FUNCTION pg_catalog.citus_remote_connection_stats( + OUT hostname text, + OUT port int, + OUT database_name text, + OUT connection_count_to_node int) IS 'returns statistics about remote connections'; + +REVOKE ALL ON FUNCTION pg_catalog.citus_remote_connection_stats( + OUT hostname text, + OUT port int, + OUT database_name text, + OUT connection_count_to_node int) +FROM PUBLIC; diff --git a/src/backend/distributed/sql/udfs/citus_remote_connection_stats/latest.sql b/src/backend/distributed/sql/udfs/citus_remote_connection_stats/latest.sql index 3369c1ac7..bb4dccd87 100644 --- a/src/backend/distributed/sql/udfs/citus_remote_connection_stats/latest.sql +++ b/src/backend/distributed/sql/udfs/citus_remote_connection_stats/latest.sql @@ -1,6 +1,22 @@ -CREATE OR REPLACE FUNCTION citus_remote_connection_stats(OUT node_id int, OUT database_name text, OUT connection_count_to_node int) - RETURNS SETOF RECORD - LANGUAGE C STRICT - AS 'MODULE_PATHNAME', $$citus_remote_connection_stats$$; - COMMENT ON FUNCTION citus_remote_connection_stats(OUT node_id int, OUT database_name text, OUT connection_count_to_node int) +CREATE OR REPLACE FUNCTION pg_catalog.citus_remote_connection_stats( + OUT hostname text, + OUT port int, + OUT database_name text, + OUT connection_count_to_node int) +RETURNS 
SETOF RECORD +LANGUAGE C STRICT +AS 'MODULE_PATHNAME', $$citus_remote_connection_stats$$; + +COMMENT ON FUNCTION pg_catalog.citus_remote_connection_stats( + OUT hostname text, + OUT port int, + OUT database_name text, + OUT connection_count_to_node int) IS 'returns statistics about remote connections'; + +REVOKE ALL ON FUNCTION pg_catalog.citus_remote_connection_stats( + OUT hostname text, + OUT port int, + OUT database_name text, + OUT connection_count_to_node int) +FROM PUBLIC; diff --git a/src/backend/distributed/test/shared_connection_counters.c b/src/backend/distributed/test/shared_connection_counters.c new file mode 100644 index 000000000..e95a2ccbb --- /dev/null +++ b/src/backend/distributed/test/shared_connection_counters.c @@ -0,0 +1,66 @@ +/*------------------------------------------------------------------------- + * + * test/shared_connection_counters.c + * + * This file contains functions to test the shared connection counters + * and the citus.max_shared_pool_size GUC. + * + * Copyright (c) Citus Data, Inc. + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" +#include "miscadmin.h" +#include "fmgr.h" + +#include "distributed/shared_connection_stats.h" +#include "distributed/listutils.h" +#include "nodes/parsenodes.h" +#include "utils/guc.h" + +/* exports for SQL callable functions */ +PG_FUNCTION_INFO_V1(wake_up_connection_pool_waiters); +PG_FUNCTION_INFO_V1(set_max_shared_pool_size); + + +/* + * wake_up_connection_pool_waiters is a SQL + * interface for testing WakeupWaiterBackendsForSharedConnection(). + */ +Datum +wake_up_connection_pool_waiters(PG_FUNCTION_ARGS) +{ + WakeupWaiterBackendsForSharedConnection(); + + PG_RETURN_VOID(); +} + + +/* + * set_max_shared_pool_size is a SQL + * interface for setting MaxSharedPoolSize. We use this function in isolation + * tester where ALTER SYSTEM is not allowed.
+ */ +Datum +set_max_shared_pool_size(PG_FUNCTION_ARGS) +{ + int value = PG_GETARG_INT32(0); + + AlterSystemStmt *alterSystemStmt = palloc0(sizeof(AlterSystemStmt)); + + A_Const *aConstValue = makeNode(A_Const); + + aConstValue->val = *makeInteger(value); + alterSystemStmt->setstmt = makeNode(VariableSetStmt); + alterSystemStmt->setstmt->name = "citus.max_shared_pool_size"; + alterSystemStmt->setstmt->is_local = false; + alterSystemStmt->setstmt->kind = VAR_SET_VALUE; + alterSystemStmt->setstmt->args = list_make1(aConstValue); + + AlterSystemSetConfigFile(alterSystemStmt); + + kill(PostmasterPid, SIGHUP); + + PG_RETURN_VOID(); +} diff --git a/src/backend/distributed/transaction/transaction_management.c b/src/backend/distributed/transaction/transaction_management.c index 1783d4a73..b6a541807 100644 --- a/src/backend/distributed/transaction/transaction_management.c +++ b/src/backend/distributed/transaction/transaction_management.c @@ -31,6 +31,7 @@ #include "distributed/repartition_join_execution.h" #include "distributed/transaction_management.h" #include "distributed/placement_connection.h" +#include "distributed/shared_connection_stats.h" #include "distributed/subplan_execution.h" #include "distributed/version_compat.h" #include "utils/hsearch.h" diff --git a/src/include/distributed/connection_management.h b/src/include/distributed/connection_management.h index 015546158..5471bc3cc 100644 --- a/src/include/distributed/connection_management.h +++ b/src/include/distributed/connection_management.h @@ -52,10 +52,29 @@ enum MultiConnectionMode /* open a connection per (co-located set of) placement(s) */ CONNECTION_PER_PLACEMENT = 1 << 3, - OUTSIDE_TRANSACTION = 1 << 4 + OUTSIDE_TRANSACTION = 1 << 4, + + /* + * Some connections are optional such as when adaptive executor is executing + * a multi-shard command and requires the second (or further) connections + * per node. In that case, the connection manager may decide not to allow the + * connection. + */ + OPTIONAL_CONNECTION = 1 << 5, + + /* + * When this flag is passed, via connection throttling, the connection + * establishments may be suspended until a connection slot is available to + * the remote host. + */ + WAIT_FOR_CONNECTION = 1 << 6 }; +/* + * This state is used for keeping track of the initilization + * of the underlying pg_conn struct. + */ typedef enum MultiConnectionState { MULTI_CONNECTION_INITIAL, @@ -65,6 +84,21 @@ typedef enum MultiConnectionState MULTI_CONNECTION_LOST } MultiConnectionState; + +/* + * This state is used for keeping track of the initilization + * of MultiConnection struct, not specifically the underlying + * pg_conn. The state is useful to determine the action during + * clean-up of connections. 
+ */ +typedef enum MultiConnectionStructInitializationState +{ + POOL_STATE_NOT_INITIALIZED, + POOL_STATE_COUNTER_INCREMENTED, + POOL_STATE_INITIALIZED +} MultiConnectionStructInitializationState; + + /* declaring this directly above makes uncrustify go crazy */ typedef enum MultiConnectionMode MultiConnectionMode; @@ -114,6 +148,8 @@ typedef struct MultiConnection /* number of bytes sent to PQputCopyData() since last flush */ uint64 copyBytesWrittenSinceLastFlush; + + MultiConnectionStructInitializationState initilizationState; } MultiConnection; diff --git a/src/include/distributed/shared_connection_stats.h b/src/include/distributed/shared_connection_stats.h index 9e9e2102e..1964d50b7 100644 --- a/src/include/distributed/shared_connection_stats.h +++ b/src/include/distributed/shared_connection_stats.h @@ -11,8 +11,17 @@ #ifndef SHARED_CONNECTION_STATS_H #define SHARED_CONNECTION_STATS_H -extern int MaxTrackedWorkerNodes; +extern int MaxSharedPoolSize; + extern void InitializeSharedConnectionStats(void); +extern void WaitForSharedConnection(void); +extern void WakeupWaiterBackendsForSharedConnection(void); +extern void RemoveInactiveNodesFromSharedConnections(void); +extern int GetMaxSharedPoolSize(void); +extern bool TryToIncrementSharedConnectionCounter(const char *hostname, int port); +extern void WaitLoopForSharedConnection(const char *hostname, int port); +extern void DecrementSharedConnectionCounter(const char *hostname, int port); +extern void IncrementSharedConnectionCounter(const char *hostname, int port); #endif /* SHARED_CONNECTION_STATS_H */ diff --git a/src/test/regress/expected/ensure_no_shared_connection_leak.out b/src/test/regress/expected/ensure_no_shared_connection_leak.out new file mode 100644 index 000000000..060313c60 --- /dev/null +++ b/src/test/regress/expected/ensure_no_shared_connection_leak.out @@ -0,0 +1,160 @@ +-- this test file is intended to be called at the end +-- of any test schedule, ensuring that there is not +-- leak/wrong calculation of the connection stats +-- in the shared memory +CREATE SCHEMA ensure_no_shared_connection_leak; +SET search_path TO ensure_no_shared_connection_leak; +-- set the cached connections to zero +-- and execute a distributed query so that +-- we end up with zero cached connections afterwards +ALTER SYSTEM SET citus.max_cached_conns_per_worker TO 0; +SELECT pg_reload_conf(); + pg_reload_conf +--------------------------------------------------------------------- + t +(1 row) + +-- disable deadlock detection and re-trigger 2PC recovery +-- once more when citus.max_cached_conns_per_worker is zero +-- so that we can be sure that the connections established for +-- maintanince daemon is closed properly. 
+-- this is to prevent random failures in the tests (otherwise, we +-- might see connections established for this operations) +ALTER SYSTEM SET citus.distributed_deadlock_detection_factor TO -1; +ALTER SYSTEM SET citus.recover_2pc_interval TO '1ms'; +SELECT pg_reload_conf(); + pg_reload_conf +--------------------------------------------------------------------- + t +(1 row) + +SELECT pg_sleep(0.1); + pg_sleep +--------------------------------------------------------------------- + +(1 row) + +-- now that last 2PC recovery is done, we're good to disable it +ALTER SYSTEM SET citus.recover_2pc_interval TO '-1'; +SELECT pg_reload_conf(); + pg_reload_conf +--------------------------------------------------------------------- + t +(1 row) + +CREATE TABLE test (a int); +SELECT create_distributed_table('test', 'a'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SELECT count(*) FROM test; + count +--------------------------------------------------------------------- + 0 +(1 row) + +-- in case of MX, we should prevent deadlock detection and +-- 2PC recover from the workers as well +\c - - - :worker_1_port +ALTER SYSTEM SET citus.max_cached_conns_per_worker TO 0; +SELECT pg_reload_conf(); + pg_reload_conf +--------------------------------------------------------------------- + t +(1 row) + +ALTER SYSTEM SET citus.distributed_deadlock_detection_factor TO -1; +ALTER SYSTEM SET citus.recover_2pc_interval TO '1ms'; +SELECT pg_reload_conf(); + pg_reload_conf +--------------------------------------------------------------------- + t +(1 row) + +SELECT pg_sleep(0.1); + pg_sleep +--------------------------------------------------------------------- + +(1 row) + +ALTER SYSTEM SET citus.recover_2pc_interval TO '-1'; +SELECT pg_reload_conf(); + pg_reload_conf +--------------------------------------------------------------------- + t +(1 row) + +\c - - - :worker_2_port +ALTER SYSTEM SET citus.max_cached_conns_per_worker TO 0; +SELECT pg_reload_conf(); + pg_reload_conf +--------------------------------------------------------------------- + t +(1 row) + +ALTER SYSTEM SET citus.distributed_deadlock_detection_factor TO -1; +ALTER SYSTEM SET citus.recover_2pc_interval TO '1ms'; +SELECT pg_reload_conf(); + pg_reload_conf +--------------------------------------------------------------------- + t +(1 row) + +SELECT pg_sleep(0.1); + pg_sleep +--------------------------------------------------------------------- + +(1 row) + +ALTER SYSTEM SET citus.recover_2pc_interval TO '-1'; +SELECT pg_reload_conf(); + pg_reload_conf +--------------------------------------------------------------------- + t +(1 row) + +\c - - - :master_port +SET search_path TO ensure_no_shared_connection_leak; +-- ensure that we only have at most citus.max_cached_conns_per_worker +-- connections per node +select + (connection_count_to_node = 0) as no_connection_to_node +FROM + citus_remote_connection_stats() +WHERE + port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND + database_name = 'regression' +ORDER BY 1; + no_connection_to_node +--------------------------------------------------------------------- + t + t +(2 rows) + +-- now, ensure this from the workers perspective +-- we should only see the connection/backend that is running the command below +SELECT + result, success +FROM + run_command_on_workers($$select count(*) from pg_stat_activity WHERE backend_type = 'client backend';$$) +ORDER BY 1, 2; + result | success 
+--------------------------------------------------------------------- + 1 | t + 1 | t +(2 rows) + +-- in case other tests relies on these setting, reset them +ALTER SYSTEM RESET citus.distributed_deadlock_detection_factor; +ALTER SYSTEM RESET citus.recover_2pc_interval; +ALTER SYSTEM RESET citus.max_cached_conns_per_worker; +SELECT pg_reload_conf(); + pg_reload_conf +--------------------------------------------------------------------- + t +(1 row) + +DROP SCHEMA ensure_no_shared_connection_leak CASCADE; +NOTICE: drop cascades to table test diff --git a/src/test/regress/expected/shared_connection_stats.out b/src/test/regress/expected/shared_connection_stats.out new file mode 100644 index 000000000..229a00aff --- /dev/null +++ b/src/test/regress/expected/shared_connection_stats.out @@ -0,0 +1,391 @@ +CREATE SCHEMA shared_connection_stats; +SET search_path TO shared_connection_stats; +-- set the cached connections to zero +-- and execute a distributed query so that +-- we end up with zero cached connections afterwards +ALTER SYSTEM SET citus.max_cached_conns_per_worker TO 0; +SELECT pg_reload_conf(); + pg_reload_conf +--------------------------------------------------------------------- + t +(1 row) + +-- disable deadlock detection and re-trigger 2PC recovery +-- once more when citus.max_cached_conns_per_worker is zero +-- so that we can be sure that the connections established for +-- maintanince daemon is closed properly. +-- this is to prevent random failures in the tests (otherwise, we +-- might see connections established for this operations) +ALTER SYSTEM SET citus.distributed_deadlock_detection_factor TO -1; +ALTER SYSTEM SET citus.recover_2pc_interval TO '1ms'; +SELECT pg_reload_conf(); + pg_reload_conf +--------------------------------------------------------------------- + t +(1 row) + +SELECT pg_sleep(0.1); + pg_sleep +--------------------------------------------------------------------- + +(1 row) + +-- now that last 2PC recovery is done, we're good to disable it +ALTER SYSTEM SET citus.recover_2pc_interval TO '1h'; +SELECT pg_reload_conf(); + pg_reload_conf +--------------------------------------------------------------------- + t +(1 row) + +SET citus.shard_count TO 32; +SET citus.shard_replication_factor TO 1; +CREATE TABLE test (a int); +SELECT create_distributed_table('test', 'a'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO test SELECT i FROM generate_series(0,100)i; +-- show that no connections are cached +SELECT + connection_count_to_node +FROM + citus_remote_connection_stats() +WHERE + port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND + database_name = 'regression' +ORDER BY + hostname, port; + connection_count_to_node +--------------------------------------------------------------------- + 0 + 0 +(2 rows) + +-- single shard queries require single connection per node +BEGIN; + SELECT count(*) FROM test WHERE a = 1; + count +--------------------------------------------------------------------- + 1 +(1 row) + + SELECT count(*) FROM test WHERE a = 2; + count +--------------------------------------------------------------------- + 1 +(1 row) + + SELECT + connection_count_to_node + FROM + citus_remote_connection_stats() + WHERE + port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND + database_name = 'regression' + ORDER BY + hostname, port; + connection_count_to_node +--------------------------------------------------------------------- + 1 + 1 +(2 rows) + 
+COMMIT; +-- show that no connections are cached +SELECT + connection_count_to_node +FROM + citus_remote_connection_stats() +WHERE + port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND + database_name = 'regression' +ORDER BY + hostname, port; + connection_count_to_node +--------------------------------------------------------------------- + 0 + 0 +(2 rows) + +-- executor is only allowed to establish a single connection per node +BEGIN; + SET LOCAL citus.max_adaptive_executor_pool_size TO 1; + SELECT count(*) FROM test; + count +--------------------------------------------------------------------- + 101 +(1 row) + + SELECT + connection_count_to_node + FROM + citus_remote_connection_stats() + WHERE + port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND + database_name = 'regression' + ORDER BY + hostname, port; + connection_count_to_node +--------------------------------------------------------------------- + 1 + 1 +(2 rows) + +COMMIT; +-- show that no connections are cached +SELECT + connection_count_to_node +FROM + citus_remote_connection_stats() +WHERE + port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND + database_name = 'regression' +ORDER BY + hostname, port; + connection_count_to_node +--------------------------------------------------------------------- + 0 + 0 +(2 rows) + +-- sequential mode is allowed to establish a single connection per node +BEGIN; + SET LOCAL citus.multi_shard_modify_mode TO 'sequential'; + SELECT count(*) FROM test; + count +--------------------------------------------------------------------- + 101 +(1 row) + + SELECT + connection_count_to_node + FROM + citus_remote_connection_stats() + WHERE + port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND + database_name = 'regression' + ORDER BY + hostname, port; + connection_count_to_node +--------------------------------------------------------------------- + 1 + 1 +(2 rows) + +COMMIT; +-- show that no connections are cached +SELECT + connection_count_to_node +FROM + citus_remote_connection_stats() +WHERE + port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND + database_name = 'regression' +ORDER BY + hostname, port; + connection_count_to_node +--------------------------------------------------------------------- + 0 + 0 +(2 rows) + +-- now, decrease the shared pool size, and still force +-- one connection per placement +ALTER SYSTEM SET citus.max_shared_pool_size TO 5; +SELECT pg_reload_conf(); + pg_reload_conf +--------------------------------------------------------------------- + t +(1 row) + +SELECT pg_sleep(0.1); + pg_sleep +--------------------------------------------------------------------- + +(1 row) + +BEGIN; + SET LOCAL citus.node_connection_timeout TO 1000; + SET LOCAL citus.force_max_query_parallelization TO ON; + SELECT count(*) FROM test; + count +--------------------------------------------------------------------- + 101 +(1 row) + +COMMIT; +-- pg_sleep forces almost 1 connection per placement +-- now, some of the optional connections would be skipped, +-- and only 5 connections are used per node +BEGIN; + SELECT count(*), pg_sleep(0.1) FROM test; + count | pg_sleep +--------------------------------------------------------------------- + 101 | +(1 row) + + SELECT + connection_count_to_node + FROM + citus_remote_connection_stats() + WHERE + port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND + database_name = 'regression' + ORDER BY + hostname, port; + connection_count_to_node 
+--------------------------------------------------------------------- + 5 + 5 +(2 rows) + +COMMIT; +SHOW citus.max_shared_pool_size; + citus.max_shared_pool_size +--------------------------------------------------------------------- + 5 +(1 row) + +-- by default max_shared_pool_size equals to max_connections; +ALTER SYSTEM RESET citus.max_shared_pool_size; +SELECT pg_reload_conf(); + pg_reload_conf +--------------------------------------------------------------------- + t +(1 row) + +SELECT pg_sleep(0.1); + pg_sleep +--------------------------------------------------------------------- + +(1 row) + +SHOW citus.max_shared_pool_size; + citus.max_shared_pool_size +--------------------------------------------------------------------- + 100 +(1 row) + +SHOW max_connections; + max_connections +--------------------------------------------------------------------- + 100 +(1 row) + +-- now, each node gets 16 connections as we force 1 connection per placement +BEGIN; + SET LOCAL citus.force_max_query_parallelization TO ON; + SELECT count(*) FROM test; + count +--------------------------------------------------------------------- + 101 +(1 row) + + SELECT + connection_count_to_node + FROM + citus_remote_connection_stats() + WHERE + port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND + database_name = 'regression' + ORDER BY + hostname, port; + connection_count_to_node +--------------------------------------------------------------------- + 16 + 16 +(2 rows) + +COMMIT; +BEGIN; + -- now allow at most 1 connection, and ensure that intermediate + -- results don't require any extra connections + SET LOCAL citus.max_adaptive_executor_pool_size TO 1; + SET LOCAL citus.task_assignment_policy TO "round-robin"; + SELECT cnt FROM (SELECT count(*) as cnt, random() FROM test LIMIT 1) as foo; + cnt +--------------------------------------------------------------------- + 101 +(1 row) + + -- queries with intermediate results don't use any extra connections + SELECT + connection_count_to_node + FROM + citus_remote_connection_stats() + WHERE + port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND + database_name = 'regression' + ORDER BY + hostname, port; + connection_count_to_node +--------------------------------------------------------------------- + 1 + 1 +(2 rows) + +COMMIT; +-- now show that when max_cached_conns_per_worker > 1 +-- Citus forces the first execution to open at least 2 +-- connections that are cached. Later, those 2 cached +-- connections are used +BEGIN; + SET LOCAL citus.max_cached_conns_per_worker TO 2; + SELECT count(*) FROM test; + count +--------------------------------------------------------------------- + 101 +(1 row) + + SELECT + connection_count_to_node >= 2 + FROM + citus_remote_connection_stats() + WHERE + port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND + database_name = 'regression' + ORDER BY + hostname, port; + ?column? +--------------------------------------------------------------------- + t + t +(2 rows) + + SELECT count(*) FROM test; + count +--------------------------------------------------------------------- + 101 +(1 row) + + SELECT + connection_count_to_node >= 2 + FROM + citus_remote_connection_stats() + WHERE + port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND + database_name = 'regression' + ORDER BY + hostname, port; + ?column? 
+--------------------------------------------------------------------- + t + t +(2 rows) + +COMMIT; +-- in case other tests relies on these setting, reset them +ALTER SYSTEM RESET citus.distributed_deadlock_detection_factor; +ALTER SYSTEM RESET citus.recover_2pc_interval; +ALTER SYSTEM RESET citus.max_cached_conns_per_worker; +SELECT pg_reload_conf(); + pg_reload_conf +--------------------------------------------------------------------- + t +(1 row) + +DROP SCHEMA shared_connection_stats CASCADE; +NOTICE: drop cascades to table test diff --git a/src/test/regress/expected/shared_connection_waits.out b/src/test/regress/expected/shared_connection_waits.out new file mode 100644 index 000000000..b42419d48 --- /dev/null +++ b/src/test/regress/expected/shared_connection_waits.out @@ -0,0 +1,33 @@ +Parsed test spec with 3 sessions + +starting permutation: s3-lower-pool-size s1-begin s1-count-slow s3-increase-pool-size s2-select s1-commit +step s3-lower-pool-size: + SELECT set_max_shared_pool_size(5); + +set_max_shared_pool_size + + +step s1-begin: + BEGIN; + +step s1-count-slow: + SELECT pg_sleep(0.1), count(*) FROM test; + +pg_sleep count + + 101 +step s3-increase-pool-size: + SELECT set_max_shared_pool_size(100); + +set_max_shared_pool_size + + +step s2-select: + SELECT count(*) FROM test; + +count + +101 +step s1-commit: + COMMIT; + diff --git a/src/test/regress/failure_schedule b/src/test/regress/failure_schedule index 4d6236858..e7d183399 100644 --- a/src/test/regress/failure_schedule +++ b/src/test/regress/failure_schedule @@ -37,3 +37,9 @@ test: failure_connection_establishment # test that no tests leaked intermediate results. This should always be last test: ensure_no_intermediate_data_leak + +# --------- +# ensures that we never leak any connection counts +# in the shared memory +# -------- +test: ensure_no_shared_connection_leak diff --git a/src/test/regress/isolation_schedule b/src/test/regress/isolation_schedule index 91f0a8065..4451812e5 100644 --- a/src/test/regress/isolation_schedule +++ b/src/test/regress/isolation_schedule @@ -62,6 +62,7 @@ test: isolation_validate_vs_insert test: isolation_insert_select_conflict test: isolation_ref2ref_foreign_keys test: isolation_multiuser_locking +test: shared_connection_waits # MX tests test: isolation_reference_on_mx diff --git a/src/test/regress/multi_mx_schedule b/src/test/regress/multi_mx_schedule index 97c8ffdd4..8c6f81536 100644 --- a/src/test/regress/multi_mx_schedule +++ b/src/test/regress/multi_mx_schedule @@ -50,3 +50,9 @@ test: locally_execute_intermediate_results # test that no tests leaked intermediate results. This should always be last test: ensure_no_intermediate_data_leak + +# --------- +# ensures that we never leak any connection counts +# in the shared memory +# -------- +test: ensure_no_shared_connection_leak diff --git a/src/test/regress/multi_schedule b/src/test/regress/multi_schedule index 71814636e..394652f6f 100644 --- a/src/test/regress/multi_schedule +++ b/src/test/regress/multi_schedule @@ -341,7 +341,19 @@ test: distributed_procedure # --------- test: multi_deparse_function multi_deparse_procedure +# -------- +# cannot be run in parallel with any other tests as it checks +# statistics across sessions +# -------- +test: shared_connection_stats + # --------- # test that no tests leaked intermediate results. 
This should always be last # --------- test: ensure_no_intermediate_data_leak + +# --------- +# ensures that we never leak any connection counts +# in the shared memory +# -------- +test: ensure_no_shared_connection_leak diff --git a/src/test/regress/multi_task_tracker_extra_schedule b/src/test/regress/multi_task_tracker_extra_schedule index 2e65ee286..e5f155cc1 100644 --- a/src/test/regress/multi_task_tracker_extra_schedule +++ b/src/test/regress/multi_task_tracker_extra_schedule @@ -115,3 +115,9 @@ test: multi_schema_support # test that no tests leaked intermediate results. This should always be last # ---------- test: ensure_no_intermediate_data_leak + +# --------- +# ensures that we never leak any connection counts +# in the shared memory +# -------- +test: ensure_no_shared_connection_leak diff --git a/src/test/regress/spec/shared_connection_waits.spec b/src/test/regress/spec/shared_connection_waits.spec new file mode 100644 index 000000000..80a8cb9e3 --- /dev/null +++ b/src/test/regress/spec/shared_connection_waits.spec @@ -0,0 +1,65 @@ +setup +{ + CREATE OR REPLACE FUNCTION wake_up_connection_pool_waiters() + RETURNS void + LANGUAGE C STABLE STRICT + AS 'citus', $$wake_up_connection_pool_waiters$$; + + CREATE OR REPLACE FUNCTION set_max_shared_pool_size(int) + RETURNS void + LANGUAGE C STABLE STRICT + AS 'citus', $$set_max_shared_pool_size$$; + + CREATE TABLE test (a int, b int); + SET citus.shard_count TO 32; + SELECT create_distributed_table('test', 'a'); + INSERT INTO test SELECT i, i FROM generate_series(0,100)i; +} + +teardown +{ + + SELECT set_max_shared_pool_size(100); + DROP FUNCTION wake_up_connection_pool_waiters(); + DROP FUNCTION set_max_shared_pool_size(int); +} + +session "s1" + + +step "s1-begin" +{ + BEGIN; +} + +step "s1-count-slow" +{ + SELECT pg_sleep(0.1), count(*) FROM test; +} + +step "s1-commit" +{ + COMMIT; +} + +session "s2" + +step "s2-select" +{ + SELECT count(*) FROM test; +} + +session "s3" + +step "s3-lower-pool-size" +{ + SELECT set_max_shared_pool_size(5); +} + +step "s3-increase-pool-size" +{ + SELECT set_max_shared_pool_size(100); +} + +permutation "s3-lower-pool-size" "s1-begin" "s1-count-slow" "s3-increase-pool-size" "s2-select" "s1-commit" + diff --git a/src/test/regress/sql/ensure_no_shared_connection_leak.sql b/src/test/regress/sql/ensure_no_shared_connection_leak.sql new file mode 100644 index 000000000..df4847cfb --- /dev/null +++ b/src/test/regress/sql/ensure_no_shared_connection_leak.sql @@ -0,0 +1,83 @@ +-- this test file is intended to be called at the end +-- of any test schedule, ensuring that there is not +-- leak/wrong calculation of the connection stats +-- in the shared memory +CREATE SCHEMA ensure_no_shared_connection_leak; +SET search_path TO ensure_no_shared_connection_leak; + +-- set the cached connections to zero +-- and execute a distributed query so that +-- we end up with zero cached connections afterwards +ALTER SYSTEM SET citus.max_cached_conns_per_worker TO 0; +SELECT pg_reload_conf(); + +-- disable deadlock detection and re-trigger 2PC recovery +-- once more when citus.max_cached_conns_per_worker is zero +-- so that we can be sure that the connections established for +-- maintanince daemon is closed properly. 
+-- this is to prevent random failures in the tests (otherwise, we +-- might see connections established for these operations) +ALTER SYSTEM SET citus.distributed_deadlock_detection_factor TO -1; +ALTER SYSTEM SET citus.recover_2pc_interval TO '1ms'; +SELECT pg_reload_conf(); +SELECT pg_sleep(0.1); + +-- now that last 2PC recovery is done, we're good to disable it +ALTER SYSTEM SET citus.recover_2pc_interval TO '-1'; +SELECT pg_reload_conf(); + +CREATE TABLE test (a int); +SELECT create_distributed_table('test', 'a'); +SELECT count(*) FROM test; + +-- in case of MX, we should prevent deadlock detection and +-- 2PC recovery from the workers as well +\c - - - :worker_1_port +ALTER SYSTEM SET citus.max_cached_conns_per_worker TO 0; +SELECT pg_reload_conf(); +ALTER SYSTEM SET citus.distributed_deadlock_detection_factor TO -1; +ALTER SYSTEM SET citus.recover_2pc_interval TO '1ms'; +SELECT pg_reload_conf(); +SELECT pg_sleep(0.1); +ALTER SYSTEM SET citus.recover_2pc_interval TO '-1'; +SELECT pg_reload_conf(); +\c - - - :worker_2_port +ALTER SYSTEM SET citus.max_cached_conns_per_worker TO 0; +SELECT pg_reload_conf(); +ALTER SYSTEM SET citus.distributed_deadlock_detection_factor TO -1; +ALTER SYSTEM SET citus.recover_2pc_interval TO '1ms'; +SELECT pg_reload_conf(); +SELECT pg_sleep(0.1); +ALTER SYSTEM SET citus.recover_2pc_interval TO '-1'; +SELECT pg_reload_conf(); + +\c - - - :master_port +SET search_path TO ensure_no_shared_connection_leak; + +-- ensure that we only have at most citus.max_cached_conns_per_worker +-- connections per node +select + (connection_count_to_node = 0) as no_connection_to_node +FROM + citus_remote_connection_stats() +WHERE + port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND + database_name = 'regression' +ORDER BY 1; + +-- now, ensure this from the workers' perspective +-- we should only see the connection/backend that is running the command below +SELECT + result, success +FROM + run_command_on_workers($$select count(*) from pg_stat_activity WHERE backend_type = 'client backend';$$) +ORDER BY 1, 2; + + +-- in case other tests rely on these settings, reset them +ALTER SYSTEM RESET citus.distributed_deadlock_detection_factor; +ALTER SYSTEM RESET citus.recover_2pc_interval; +ALTER SYSTEM RESET citus.max_cached_conns_per_worker; +SELECT pg_reload_conf(); + +DROP SCHEMA ensure_no_shared_connection_leak CASCADE; diff --git a/src/test/regress/sql/node_conninfo_reload.sql b/src/test/regress/sql/node_conninfo_reload.sql index dda8dae21..3754130a7 100644 --- a/src/test/regress/sql/node_conninfo_reload.sql +++ b/src/test/regress/sql/node_conninfo_reload.sql @@ -10,7 +10,6 @@ select create_distributed_table('test', 'a'); -- Make sure a connection is opened and cached select count(*) from test where a = 0; - show citus.node_conninfo; -- Set sslmode to something that does not work when connecting @@ -30,8 +29,8 @@ show citus.node_conninfo; -- Should work again select count(*) from test where a = 0; - ALTER SYSTEM SET citus.node_conninfo = 'sslmode=doesnotexist'; + BEGIN; -- Should still work (no SIGHUP yet); select count(*) from test where a = 0; @@ -43,19 +42,15 @@ show citus.node_conninfo; -- query select count(*) from test where a = 0; COMMIT; - -- Should fail now with connection error, when transaction is finished select count(*) from test where a = 0; - -- Reset it again ALTER SYSTEM RESET citus.node_conninfo; select pg_reload_conf(); select pg_sleep(0.1); -- wait for config reload to apply show citus.node_conninfo; - -- Should work again select count(*) from 
test where a = 0; - ALTER SYSTEM SET citus.node_conninfo = 'sslmode=doesnotexist'; BEGIN; -- Should still work (no SIGHUP yet); @@ -68,10 +63,8 @@ show citus.node_conninfo; -- query select count(*) from test where a = 0; COMMIT; - -- Should fail now, when transaction is finished select count(*) from test where a = 0; - -- Reset it again ALTER SYSTEM RESET citus.node_conninfo; select pg_reload_conf(); diff --git a/src/test/regress/sql/shared_connection_stats.sql b/src/test/regress/sql/shared_connection_stats.sql new file mode 100644 index 000000000..8200e416d --- /dev/null +++ b/src/test/regress/sql/shared_connection_stats.sql @@ -0,0 +1,228 @@ +CREATE SCHEMA shared_connection_stats; +SET search_path TO shared_connection_stats; + +-- set the cached connections to zero +-- and execute a distributed query so that +-- we end up with zero cached connections afterwards +ALTER SYSTEM SET citus.max_cached_conns_per_worker TO 0; +SELECT pg_reload_conf(); + +-- disable deadlock detection and re-trigger 2PC recovery +-- once more when citus.max_cached_conns_per_worker is zero +-- so that we can be sure that the connections established for +-- the maintenance daemon are closed properly. +-- this is to prevent random failures in the tests (otherwise, we +-- might see connections established for these operations) +ALTER SYSTEM SET citus.distributed_deadlock_detection_factor TO -1; +ALTER SYSTEM SET citus.recover_2pc_interval TO '1ms'; +SELECT pg_reload_conf(); +SELECT pg_sleep(0.1); + +-- now that last 2PC recovery is done, we're good to disable it +ALTER SYSTEM SET citus.recover_2pc_interval TO '1h'; +SELECT pg_reload_conf(); + +SET citus.shard_count TO 32; +SET citus.shard_replication_factor TO 1; +CREATE TABLE test (a int); +SELECT create_distributed_table('test', 'a'); +INSERT INTO test SELECT i FROM generate_series(0,100)i; + +-- show that no connections are cached +SELECT + connection_count_to_node +FROM + citus_remote_connection_stats() +WHERE + port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND + database_name = 'regression' +ORDER BY + hostname, port; + +-- single shard queries require single connection per node +BEGIN; + SELECT count(*) FROM test WHERE a = 1; + SELECT count(*) FROM test WHERE a = 2; + SELECT + connection_count_to_node + FROM + citus_remote_connection_stats() + WHERE + port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND + database_name = 'regression' + ORDER BY + hostname, port; +COMMIT; + +-- show that no connections are cached +SELECT + connection_count_to_node +FROM + citus_remote_connection_stats() +WHERE + port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND + database_name = 'regression' +ORDER BY + hostname, port; + +-- executor is only allowed to establish a single connection per node +BEGIN; + SET LOCAL citus.max_adaptive_executor_pool_size TO 1; + SELECT count(*) FROM test; + SELECT + connection_count_to_node + FROM + citus_remote_connection_stats() + WHERE + port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND + database_name = 'regression' + ORDER BY + hostname, port; +COMMIT; + +-- show that no connections are cached +SELECT + connection_count_to_node +FROM + citus_remote_connection_stats() +WHERE + port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND + database_name = 'regression' +ORDER BY + hostname, port; + +-- sequential mode is allowed to establish a single connection per node +BEGIN; + SET LOCAL citus.multi_shard_modify_mode TO 'sequential'; + SELECT count(*) FROM test; + SELECT + 
connection_count_to_node + FROM + citus_remote_connection_stats() + WHERE + port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND + database_name = 'regression' + ORDER BY + hostname, port; +COMMIT; + +-- show that no connections are cached +SELECT + connection_count_to_node +FROM + citus_remote_connection_stats() +WHERE + port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND + database_name = 'regression' +ORDER BY + hostname, port; + +-- now, decrease the shared pool size, and still force +-- one connection per placement +ALTER SYSTEM SET citus.max_shared_pool_size TO 5; +SELECT pg_reload_conf(); +SELECT pg_sleep(0.1); + +BEGIN; + SET LOCAL citus.node_connection_timeout TO 1000; + SET LOCAL citus.force_max_query_parallelization TO ON; + SELECT count(*) FROM test; +COMMIT; + +-- pg_sleep forces almost 1 connection per placement +-- now, some of the optional connections would be skipped, +-- and only 5 connections are used per node +BEGIN; + SELECT count(*), pg_sleep(0.1) FROM test; + SELECT + connection_count_to_node + FROM + citus_remote_connection_stats() + WHERE + port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND + database_name = 'regression' + ORDER BY + hostname, port; +COMMIT; + + +SHOW citus.max_shared_pool_size; + +-- by default max_shared_pool_size equals to max_connections; +ALTER SYSTEM RESET citus.max_shared_pool_size; +SELECT pg_reload_conf(); +SELECT pg_sleep(0.1); + +SHOW citus.max_shared_pool_size; +SHOW max_connections; + +-- now, each node gets 16 connections as we force 1 connection per placement +BEGIN; + SET LOCAL citus.force_max_query_parallelization TO ON; + SELECT count(*) FROM test; + SELECT + connection_count_to_node + FROM + citus_remote_connection_stats() + WHERE + port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND + database_name = 'regression' + ORDER BY + hostname, port; +COMMIT; + +BEGIN; + -- now allow at most 1 connection, and ensure that intermediate + -- results don't require any extra connections + SET LOCAL citus.max_adaptive_executor_pool_size TO 1; + SET LOCAL citus.task_assignment_policy TO "round-robin"; + SELECT cnt FROM (SELECT count(*) as cnt, random() FROM test LIMIT 1) as foo; + + -- queries with intermediate results don't use any extra connections + SELECT + connection_count_to_node + FROM + citus_remote_connection_stats() + WHERE + port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND + database_name = 'regression' + ORDER BY + hostname, port; +COMMIT; + + +-- now show that when max_cached_conns_per_worker > 1 +-- Citus forces the first execution to open at least 2 +-- connections that are cached. 
Later, those 2 cached +-- connections are used +BEGIN; + SET LOCAL citus.max_cached_conns_per_worker TO 2; + SELECT count(*) FROM test; + SELECT + connection_count_to_node >= 2 + FROM + citus_remote_connection_stats() + WHERE + port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND + database_name = 'regression' + ORDER BY + hostname, port; + SELECT count(*) FROM test; + SELECT + connection_count_to_node >= 2 + FROM + citus_remote_connection_stats() + WHERE + port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND + database_name = 'regression' + ORDER BY + hostname, port; +COMMIT; + +-- in case other tests relies on these setting, reset them +ALTER SYSTEM RESET citus.distributed_deadlock_detection_factor; +ALTER SYSTEM RESET citus.recover_2pc_interval; +ALTER SYSTEM RESET citus.max_cached_conns_per_worker; +SELECT pg_reload_conf(); + +DROP SCHEMA shared_connection_stats CASCADE;
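
The shared pool behavior exercised by these tests can also be inspected by hand. Below is a minimal sketch along the lines of the regression tests above; it assumes a running coordinator with the citus extension, at least one active worker node, and an arbitrarily chosen pool size of 20:

    -- cap the total number of connections Citus may open per worker node,
    -- counted across all sessions on this coordinator
    ALTER SYSTEM SET citus.max_shared_pool_size TO 20;
    SELECT pg_reload_conf();

    -- see how many connections are currently counted against the pool
    -- for each worker node
    SELECT hostname, port, database_name, connection_count_to_node
    FROM citus_remote_connection_stats()
    WHERE port IN (SELECT node_port FROM master_get_active_worker_nodes())
    ORDER BY hostname, port;

    -- back to the default, which follows max_connections
    ALTER SYSTEM RESET citus.max_shared_pool_size;
    SELECT pg_reload_conf();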