diff --git a/src/backend/distributed/commands/utility_hook.c b/src/backend/distributed/commands/utility_hook.c index 2dfd4386f..0b5e3d109 100644 --- a/src/backend/distributed/commands/utility_hook.c +++ b/src/backend/distributed/commands/utility_hook.c @@ -64,7 +64,6 @@ #include "utils/lsyscache.h" #include "utils/syscache.h" - bool EnableDDLPropagation = true; /* ddl propagation is enabled */ PropSetCmdBehavior PropagateSetCommands = PROPSETCMD_NONE; /* SET prop off */ static bool shouldInvalidateForeignKeyGraph = false; diff --git a/src/backend/distributed/connection/connection_management.c b/src/backend/distributed/connection/connection_management.c index f9b13f8ce..fcdceea8b 100644 --- a/src/backend/distributed/connection/connection_management.c +++ b/src/backend/distributed/connection/connection_management.c @@ -28,11 +28,13 @@ #include "distributed/hash_helpers.h" #include "distributed/placement_connection.h" #include "distributed/run_from_same_connection.h" +#include "distributed/shared_connection_stats.h" #include "distributed/cancel_utils.h" #include "distributed/remote_commands.h" #include "distributed/version_compat.h" #include "mb/pg_wchar.h" #include "portability/instr_time.h" +#include "storage/ipc.h" #include "utils/hsearch.h" #include "utils/memutils.h" @@ -46,7 +48,8 @@ MemoryContext ConnectionContext = NULL; static uint32 ConnectionHashHash(const void *key, Size keysize); static int ConnectionHashCompare(const void *a, const void *b, Size keysize); -static MultiConnection * StartConnectionEstablishment(ConnectionHashKey *key); +static void StartConnectionEstablishment(MultiConnection *connectionn, + ConnectionHashKey *key); static void FreeConnParamsHashEntryFields(ConnParamsHashEntry *entry); static void AfterXactHostConnectionHandling(ConnectionHashEntry *entry, bool isCommit); static bool ShouldShutdownConnection(MultiConnection *connection, const int @@ -78,7 +81,7 @@ static WaitEventSet * WaitEventSetFromMultiConnectionStates(List *connections, int *waitCount); static void CloseNotReadyMultiConnectionStates(List *connectionStates); static uint32 MultiConnectionStateEventMask(MultiConnectionPollState *connectionState); - +static void CitusPQFinish(MultiConnection *connection); static int CitusNoticeLogLevel = DEFAULT_CITUS_NOTICE_LEVEL; @@ -257,7 +260,6 @@ StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port, const char *user, const char *database) { ConnectionHashKey key; - MultiConnection *connection; bool found; /* do some minimal input checks */ @@ -310,24 +312,75 @@ StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port, if (!(flags & FORCE_NEW_CONNECTION)) { /* check connection cache for a connection that's not already in use */ - connection = FindAvailableConnection(entry->connections, flags); + MultiConnection *connection = FindAvailableConnection(entry->connections, flags); if (connection) { return connection; } } + /* * Either no caching desired, or no pre-established, non-claimed, * connection present. Initiate connection establishment. 
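+	 *
+	 * Before doing so, the shared connection counter for the target worker is
+	 * incremented (or, for optional connections, we bail out) so that the
+	 * node-wide total stays within citus.max_shared_pool_size, see
+	 * shared_connection_stats.c.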
*/ - - connection = StartConnectionEstablishment(&key); - + MultiConnection *connection = MemoryContextAllocZero(ConnectionContext, + sizeof(MultiConnection)); + connection->initilizationState = POOL_STATE_NOT_INITIALIZED; dlist_push_tail(entry->connections, &connection->connectionNode); + /* these two flags are by nature cannot happen at the same time */ + Assert(!((flags & WAIT_FOR_CONNECTION) && (flags & OPTIONAL_CONNECTION))); + + if (flags & WAIT_FOR_CONNECTION) + { + WaitLoopForSharedConnection(hostname, port); + } + else if (flags & OPTIONAL_CONNECTION) + { + /* + * We can afford to skip establishing an optional connection. For + * non-optional connections, we first retry for some time. If we still + * cannot reserve the right to establish a connection, we prefer to + * error out. + */ + if (!TryToIncrementSharedConnectionCounter(hostname, port)) + { + /* do not track the connection anymore */ + dlist_delete(&connection->connectionNode); + pfree(connection); + + return NULL; + } + } + else + { + /* + * The caller doesn't want the connection manager to wait + * until a connection slot is available on the remote node. + * In the end, we might fail to establish connection to the + * remote node as it might not have any space in + * max_connections for this connection establishment. + * + * Still, we keep track of the connnection counter. + */ + IncrementSharedConnectionCounter(hostname, port); + } + + + /* + * We've already incremented the counter above, so we should decrement + * when we're done with the connection. + */ + connection->initilizationState = POOL_STATE_COUNTER_INCREMENTED; + + StartConnectionEstablishment(connection, &key); + ResetShardPlacementAssociation(connection); + /* fully initialized the connection, record it */ + connection->initilizationState = POOL_STATE_INITIALIZED; + return connection; } @@ -456,8 +509,7 @@ CloseConnection(MultiConnection *connection) bool found; /* close connection */ - PQfinish(connection->pgConn); - connection->pgConn = NULL; + CitusPQFinish(connection); strlcpy(key.hostname, connection->hostname, MAX_NODE_LENGTH); key.port = connection->port; @@ -537,8 +589,7 @@ ShutdownConnection(MultiConnection *connection) { SendCancelationRequest(connection); } - PQfinish(connection->pgConn); - connection->pgConn = NULL; + CitusPQFinish(connection); } @@ -903,9 +954,30 @@ CloseNotReadyMultiConnectionStates(List *connectionStates) } /* close connection, otherwise we take up resource on the other side */ + CitusPQFinish(connection); + } +} + + +/* + * CitusPQFinish is a wrapper around PQfinish and does book keeping on shared connection + * counters. + */ +static void +CitusPQFinish(MultiConnection *connection) +{ + if (connection->pgConn != NULL) + { PQfinish(connection->pgConn); connection->pgConn = NULL; } + + /* behave idempotently, there is no gurantee that CitusPQFinish() is called once */ + if (connection->initilizationState >= POOL_STATE_COUNTER_INCREMENTED) + { + DecrementSharedConnectionCounter(connection->hostname, connection->port); + connection->initilizationState = POOL_STATE_NOT_INITIALIZED; + } } @@ -985,8 +1057,8 @@ ConnectionHashCompare(const void *a, const void *b, Size keysize) * Asynchronously establish connection to a remote node, but don't wait for * that to finish. DNS lookups etc. are performed synchronously though. 
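+ * The caller passes in an already-allocated (and already-tracked)
+ * MultiConnection; this function only fills it in, so the shared connection
+ * counter bookkeeping can be done before establishment starts.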
*/ -static MultiConnection * -StartConnectionEstablishment(ConnectionHashKey *key) +static void +StartConnectionEstablishment(MultiConnection *connection, ConnectionHashKey *key) { bool found = false; static uint64 connectionId = 1; @@ -1018,9 +1090,6 @@ StartConnectionEstablishment(ConnectionHashKey *key) entry->isValid = true; } - MultiConnection *connection = MemoryContextAllocZero(ConnectionContext, - sizeof(MultiConnection)); - strlcpy(connection->hostname, key->hostname, MAX_NODE_LENGTH); connection->port = key->port; strlcpy(connection->database, key->database, NAMEDATALEN); @@ -1040,8 +1109,6 @@ StartConnectionEstablishment(ConnectionHashKey *key) PQsetnonblocking(connection->pgConn, true); SetCitusNoticeProcessor(connection); - - return connection; } @@ -1166,6 +1233,7 @@ ShouldShutdownConnection(MultiConnection *connection, const int cachedConnection } return isCitusInitiatedBackend || + connection->initilizationState != POOL_STATE_INITIALIZED || cachedConnectionCount >= MaxCachedConnectionsPerWorker || connection->forceCloseAtTransactionEnd || PQstatus(connection->pgConn) != CONNECTION_OK || diff --git a/src/backend/distributed/connection/shared_connection_stats.c b/src/backend/distributed/connection/shared_connection_stats.c index 7fb9750bc..ffcd3ca30 100644 --- a/src/backend/distributed/connection/shared_connection_stats.c +++ b/src/backend/distributed/connection/shared_connection_stats.c @@ -20,20 +20,27 @@ #include "access/hash.h" #include "access/htup_details.h" #include "catalog/pg_authid.h" +#include "commands/dbcommands.h" +#include "distributed/cancel_utils.h" #include "distributed/connection_management.h" +#include "distributed/listutils.h" #include "distributed/metadata_cache.h" #include "distributed/shared_connection_stats.h" +#include "distributed/time_constants.h" #include "distributed/tuplestore.h" +#include "utils/builtins.h" #include "utils/hashutils.h" #include "utils/hsearch.h" #include "storage/ipc.h" -#define REMOTE_CONNECTION_STATS_COLUMNS 6 +#define REMOTE_CONNECTION_STATS_COLUMNS 4 +#define ADJUST_POOLSIZE_AUTOMATICALLY 0 +#define DISABLE_CONNECTION_THROTTLING -1 /* - * The data structure used to store data in shared memory. This data structure only + * The data structure used to store data in shared memory. This data structure is only * used for storing the lock. The actual statistics about the connections are stored * in the hashmap, which is allocated separately, as Postgres provides different APIs * for allocating hashmaps in the shared memory. @@ -42,22 +49,26 @@ typedef struct ConnectionStatsSharedData { int sharedConnectionHashTrancheId; char *sharedConnectionHashTrancheName; + LWLock sharedConnectionHashLock; + ConditionVariable waitersConditionVariable; } ConnectionStatsSharedData; typedef struct SharedConnStatsHashKey { /* - * Using nodeId (over hostname/hostport) make the tracking resiliant to - * master_update_node(). Plus, requires a little less memory. + * We keep the entries in the shared memory even after master_update_node() + * as there might be some cached connections to the old node. + * That's why, we prefer to use "hostname/port" over nodeId. */ - uint32 nodeId; + char hostname[MAX_NODE_LENGTH]; + int32 port; /* * Given that citus.shared_max_pool_size can be defined per database, we * should keep track of shared connections per database. 
*/ - char database[NAMEDATALEN]; + Oid databaseOid; } SharedConnStatsHashKey; /* hash entry for per worker stats */ @@ -70,16 +81,15 @@ typedef struct SharedConnStatsHashEntry /* - * Controlled via a GUC. - * - * By default, Citus tracks 1024 worker nodes, which is already - * very unlikely number of worker nodes. Given that the shared - * memory required per worker is pretty small (~120 Bytes), we think it - * is a good default that wouldn't hurt any users in any dimension. + * Controlled via a GUC, never access directly, use GetMaxSharedPoolSize(). + * "0" means adjust MaxSharedPoolSize automatically by using MaxConnections. + * "-1" means do not apply connection throttling + * Anything else means use that number */ -int MaxTrackedWorkerNodes = 1024; +int MaxSharedPoolSize = 0; -/* the following two structs used for accessing shared memory */ + +/* the following two structs are used for accessing shared memory */ static HTAB *SharedConnStatsHash = NULL; static ConnectionStatsSharedData *ConnectionStatsSharedState = NULL; @@ -88,9 +98,10 @@ static shmem_startup_hook_type prev_shmem_startup_hook = NULL; /* local function declarations */ -static void StoreAllConnections(Tuplestorestate *tupleStore, TupleDesc tupleDescriptor); -static void UnLockConnectionSharedMemory(void); +static void StoreAllRemoteConnectionStats(Tuplestorestate *tupleStore, TupleDesc + tupleDescriptor); static void LockConnectionSharedMemory(LWLockMode lockMode); +static void UnLockConnectionSharedMemory(void); static void SharedConnectionStatsShmemInit(void); static size_t SharedConnectionStatsShmemSize(void); static int SharedConnectionHashCompare(const void *a, const void *b, Size keysize); @@ -112,7 +123,7 @@ citus_remote_connection_stats(PG_FUNCTION_ARGS) CheckCitusVersion(ERROR); Tuplestorestate *tupleStore = SetupTuplestore(fcinfo, &tupleDescriptor); - StoreAllConnections(tupleStore, tupleDescriptor); + StoreAllRemoteConnectionStats(tupleStore, tupleDescriptor); /* clean up and return the tuplestore */ tuplestore_donestoring(tupleStore); @@ -122,19 +133,19 @@ citus_remote_connection_stats(PG_FUNCTION_ARGS) /* - * StoreAllConnections gets connections established from the current node + * StoreAllRemoteConnectionStats gets connections established from the current node * and inserts them into the given tuplestore. * * We don't need to enforce any access privileges as the number of backends * on any node is already visible on pg_stat_activity to all users. 
*/ static void -StoreAllConnections(Tuplestorestate *tupleStore, TupleDesc tupleDescriptor) +StoreAllRemoteConnectionStats(Tuplestorestate *tupleStore, TupleDesc tupleDescriptor) { Datum values[REMOTE_CONNECTION_STATS_COLUMNS]; bool isNulls[REMOTE_CONNECTION_STATS_COLUMNS]; - /* we're reading all distributed transactions, prevent new backends */ + /* we're reading all shared connections, prevent any changes */ LockConnectionSharedMemory(LW_SHARED); HASH_SEQ_STATUS status; @@ -147,9 +158,17 @@ StoreAllConnections(Tuplestorestate *tupleStore, TupleDesc tupleDescriptor) memset(values, 0, sizeof(values)); memset(isNulls, false, sizeof(isNulls)); - values[0] = Int32GetDatum(connectionEntry->key.nodeId); - values[1] = PointerGetDatum(connectionEntry->key.database); - values[2] = Int32GetDatum(connectionEntry->connectionCount); + char *databaseName = get_database_name(connectionEntry->key.databaseOid); + if (databaseName == NULL) + { + /* database might have been dropped */ + continue; + } + + values[0] = PointerGetDatum(cstring_to_text(connectionEntry->key.hostname)); + values[1] = Int32GetDatum(connectionEntry->key.port); + values[2] = PointerGetDatum(cstring_to_text(databaseName)); + values[3] = Int32GetDatum(connectionEntry->connectionCount); tuplestore_putvalues(tupleStore, tupleDescriptor, values, isNulls); } @@ -158,6 +177,313 @@ StoreAllConnections(Tuplestorestate *tupleStore, TupleDesc tupleDescriptor) } +/* + * RemoveInactiveNodesFromSharedConnections goes over the SharedConnStatsHash + * and removes the inactive entries. + */ +void +RemoveInactiveNodesFromSharedConnections(void) +{ + /* we're modifying connections, prevent any changes */ + LockConnectionSharedMemory(LW_EXCLUSIVE); + + HASH_SEQ_STATUS status; + SharedConnStatsHashEntry *connectionEntry = NULL; + + /* + * In the first iteration, try to remove worker nodes that doesn't have any active + * conections and the node does not exits in the metadata anymore. + */ + hash_seq_init(&status, SharedConnStatsHash); + while ((connectionEntry = (SharedConnStatsHashEntry *) hash_seq_search(&status)) != 0) + { + SharedConnStatsHashKey connectionKey = connectionEntry->key; + WorkerNode *workerNode = + FindWorkerNode(connectionKey.hostname, connectionKey.port); + + if (connectionEntry->connectionCount == 0 && + (workerNode == NULL || !workerNode->isActive)) + { + hash_search(SharedConnStatsHash, &connectionKey, HASH_REMOVE, NULL); + } + } + + int entryCount = hash_get_num_entries(SharedConnStatsHash); + if (entryCount + 1 < MaxWorkerNodesTracked) + { + /* we're good, we have at least one more space for a new worker */ + UnLockConnectionSharedMemory(); + + return; + } + + /* + * We aimed to remove nodes that don't have any open connections. If we + * failed to find one, we have to be more aggressive and remove at least + * one of the inactive ones. + */ + hash_seq_init(&status, SharedConnStatsHash); + while ((connectionEntry = (SharedConnStatsHashEntry *) hash_seq_search(&status)) != 0) + { + SharedConnStatsHashKey connectionKey = connectionEntry->key; + WorkerNode *workerNode = + FindWorkerNode(connectionKey.hostname, connectionKey.port); + + if (workerNode == NULL || !workerNode->isActive) + { + hash_search(SharedConnStatsHash, &connectionKey, HASH_REMOVE, NULL); + + hash_seq_term(&status); + + break; + } + } + + + UnLockConnectionSharedMemory(); +} + + +/* + * GetMaxSharedPoolSize is a wrapper around MaxSharedPoolSize which is controlled + * via a GUC. 
+ * "0" means adjust MaxSharedPoolSize automatically by using MaxConnections + * "-1" means do not apply connection throttling + * Anything else means use that number + */ +int +GetMaxSharedPoolSize(void) +{ + if (MaxSharedPoolSize == ADJUST_POOLSIZE_AUTOMATICALLY) + { + return MaxConnections; + } + + return MaxSharedPoolSize; +} + + +/* + * WaitLoopForSharedConnection tries to increment the shared connection + * counter for the given hostname/port and the current database in + * SharedConnStatsHash. + * + * The function implements a retry mechanism via a condition variable. + */ +void +WaitLoopForSharedConnection(const char *hostname, int port) +{ + while (!TryToIncrementSharedConnectionCounter(hostname, port)) + { + CHECK_FOR_INTERRUPTS(); + + WaitForSharedConnection(); + } + + ConditionVariableCancelSleep(); +} + + +/* + * TryToIncrementSharedConnectionCounter tries to increment the shared + * connection counter for the given nodeId and the current database in + * SharedConnStatsHash. + * + * If the function returns true, the caller is allowed (and expected) + * to establish a new connection to the given node. Else, the caller + * is not allowed to establish a new connection. + */ +bool +TryToIncrementSharedConnectionCounter(const char *hostname, int port) +{ + if (GetMaxSharedPoolSize() == DISABLE_CONNECTION_THROTTLING) + { + /* connection throttling disabled */ + return true; + } + + bool counterIncremented = false; + SharedConnStatsHashKey connKey; + + strlcpy(connKey.hostname, hostname, MAX_NODE_LENGTH); + if (strlen(hostname) > MAX_NODE_LENGTH) + { + ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("hostname exceeds the maximum length of %d", + MAX_NODE_LENGTH))); + } + + connKey.port = port; + connKey.databaseOid = MyDatabaseId; + + LockConnectionSharedMemory(LW_EXCLUSIVE); + + /* + * As the hash map is allocated in shared memory, it doesn't rely on palloc for + * memory allocation, so we could get NULL via HASH_ENTER_NULL when there is no + * space in the shared memory. That's why we prefer continuing the execution + * instead of throwing an error. + */ + bool entryFound = false; + SharedConnStatsHashEntry *connectionEntry = + hash_search(SharedConnStatsHash, &connKey, HASH_ENTER_NULL, &entryFound); + + /* + * It is possible to throw an error at this point, but that doesn't help us in anyway. + * Instead, we try our best, let the connection establishment continue by-passing the + * connection throttling. + */ + if (!connectionEntry) + { + UnLockConnectionSharedMemory(); + return true; + } + + if (!entryFound) + { + /* we successfully allocated the entry for the first time, so initialize it */ + connectionEntry->connectionCount = 1; + + counterIncremented = true; + } + else if (connectionEntry->connectionCount + 1 > GetMaxSharedPoolSize()) + { + /* there is no space left for this connection */ + counterIncremented = false; + } + else + { + connectionEntry->connectionCount++; + counterIncremented = true; + } + + UnLockConnectionSharedMemory(); + + return counterIncremented; +} + + +/* + * IncrementSharedConnectionCounter increments the shared counter + * for the given hostname and port. 
+ */ +void +IncrementSharedConnectionCounter(const char *hostname, int port) +{ + SharedConnStatsHashKey connKey; + + if (GetMaxSharedPoolSize() == DISABLE_CONNECTION_THROTTLING) + { + /* connection throttling disabled */ + return; + } + + strlcpy(connKey.hostname, hostname, MAX_NODE_LENGTH); + if (strlen(hostname) > MAX_NODE_LENGTH) + { + ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("hostname exceeds the maximum length of %d", + MAX_NODE_LENGTH))); + } + + connKey.port = port; + connKey.databaseOid = MyDatabaseId; + + LockConnectionSharedMemory(LW_EXCLUSIVE); + + /* + * As the hash map is allocated in shared memory, it doesn't rely on palloc for + * memory allocation, so we could get NULL via HASH_ENTER_NULL. That's why we prefer + * continuing the execution instead of throwing an error. + */ + bool entryFound = false; + SharedConnStatsHashEntry *connectionEntry = + hash_search(SharedConnStatsHash, &connKey, HASH_ENTER_NULL, &entryFound); + + /* + * It is possible to throw an error at this point, but that doesn't help us in anyway. + * Instead, we try our best, let the connection establishment continue by-passing the + * connection throttling. + */ + if (!connectionEntry) + { + UnLockConnectionSharedMemory(); + + ereport(DEBUG4, (errmsg("No entry found for node %s:%d while incrementing " + "connection counter", hostname, port))); + + return; + } + + if (!entryFound) + { + /* we successfully allocated the entry for the first time, so initialize it */ + connectionEntry->connectionCount = 0; + } + + connectionEntry->connectionCount += 1; + + UnLockConnectionSharedMemory(); +} + + +/* + * DecrementSharedConnectionCounter decrements the shared counter + * for the given hostname and port. + */ +void +DecrementSharedConnectionCounter(const char *hostname, int port) +{ + SharedConnStatsHashKey connKey; + + if (GetMaxSharedPoolSize() == DISABLE_CONNECTION_THROTTLING) + { + /* connection throttling disabled */ + return; + } + + strlcpy(connKey.hostname, hostname, MAX_NODE_LENGTH); + if (strlen(hostname) > MAX_NODE_LENGTH) + { + ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("hostname exceeds the maximum length of %d", + MAX_NODE_LENGTH))); + } + + connKey.port = port; + connKey.databaseOid = MyDatabaseId; + + LockConnectionSharedMemory(LW_EXCLUSIVE); + + bool entryFound = false; + SharedConnStatsHashEntry *connectionEntry = + hash_search(SharedConnStatsHash, &connKey, HASH_FIND, &entryFound); + + /* this worker node is removed or updated, no need to care */ + if (!entryFound) + { + UnLockConnectionSharedMemory(); + + /* wake up any waiters in case any backend is waiting for this node */ + WakeupWaiterBackendsForSharedConnection(); + + ereport(DEBUG4, (errmsg("No entry found for node %s:%d while decrementing " + "connection counter", hostname, port))); + + return; + } + + /* we should never go below 0 */ + Assert(connectionEntry->connectionCount > 0); + + connectionEntry->connectionCount -= 1; + + UnLockConnectionSharedMemory(); + + WakeupWaiterBackendsForSharedConnection(); +} + + /* * LockConnectionSharedMemory is a utility function that should be used when * accessing to the SharedConnStatsHash, which is in the shared memory. @@ -180,6 +506,43 @@ UnLockConnectionSharedMemory(void) } +/* + * WakeupWaiterBackendsForSharedConnection is a wrapper around the condition variable + * broadcast operation. + * + * We use a single condition variable, for all worker nodes, to implement the connection + * throttling mechanism. 
Combination of all the backends are allowed to establish + * MaxSharedPoolSize number of connections per worker node. If a backend requires a + * non-optional connection (see WAIT_FOR_CONNECTION for details), it is not allowed + * to establish it immediately if the total connections are equal to MaxSharedPoolSize. + * Instead, the backend waits on the condition variable. When any other backend + * terminates an existing connection to any remote node, this function is called. + * The main goal is to trigger all waiting backends to try getting a connection slot + * in MaxSharedPoolSize. The ones which can get connection slot are allowed to continue + * with the connection establishments. Others should wait another backend to call + * this function. + */ +void +WakeupWaiterBackendsForSharedConnection(void) +{ + ConditionVariableBroadcast(&ConnectionStatsSharedState->waitersConditionVariable); +} + + +/* + * WaitForSharedConnection is a wrapper around the condition variable sleep operation. + * + * For the details of the use of the condition variable, see + * WakeupWaiterBackendsForSharedConnection(). + */ +void +WaitForSharedConnection(void) +{ + ConditionVariableSleep(&ConnectionStatsSharedState->waitersConditionVariable, + PG_WAIT_EXTENSION); +} + + /* * InitializeSharedConnectionStats requests the necessary shared memory * from Postgres and sets up the shared memory startup hook. @@ -208,9 +571,8 @@ SharedConnectionStatsShmemSize(void) Size size = 0; size = add_size(size, sizeof(ConnectionStatsSharedData)); - size = add_size(size, mul_size(sizeof(LWLock), MaxTrackedWorkerNodes)); - Size hashSize = hash_estimate_size(MaxTrackedWorkerNodes, + Size hashSize = hash_estimate_size(MaxWorkerNodesTracked, sizeof(SharedConnStatsHashEntry)); size = add_size(size, hashSize); @@ -229,7 +591,7 @@ SharedConnectionStatsShmemInit(void) bool alreadyInitialized = false; HASHCTL info; - /* create (nodeId,database) -> [counter] */ + /* create (hostname, port, database) -> [counter] */ memset(&info, 0, sizeof(info)); info.keysize = sizeof(SharedConnStatsHashKey); info.entrysize = sizeof(SharedConnStatsHashEntry); @@ -260,12 +622,14 @@ SharedConnectionStatsShmemInit(void) LWLockInitialize(&ConnectionStatsSharedState->sharedConnectionHashLock, ConnectionStatsSharedState->sharedConnectionHashTrancheId); + + ConditionVariableInit(&ConnectionStatsSharedState->waitersConditionVariable); } /* allocate hash table */ SharedConnStatsHash = - ShmemInitHash("Shared Conn. Stats Hash", MaxTrackedWorkerNodes, - MaxTrackedWorkerNodes, &info, hashFlags); + ShmemInitHash("Shared Conn. 
Stats Hash", MaxWorkerNodesTracked, + MaxWorkerNodesTracked, &info, hashFlags); LWLockRelease(AddinShmemInitLock); @@ -284,8 +648,9 @@ SharedConnectionHashHash(const void *key, Size keysize) { SharedConnStatsHashKey *entry = (SharedConnStatsHashKey *) key; - uint32 hash = hash_uint32(entry->nodeId); - hash = hash_combine(hash, string_hash(entry->database, NAMEDATALEN)); + uint32 hash = string_hash(entry->hostname, NAMEDATALEN); + hash = hash_combine(hash, hash_uint32(entry->port)); + hash = hash_combine(hash, hash_uint32(entry->databaseOid)); return hash; } @@ -297,8 +662,9 @@ SharedConnectionHashCompare(const void *a, const void *b, Size keysize) SharedConnStatsHashKey *ca = (SharedConnStatsHashKey *) a; SharedConnStatsHashKey *cb = (SharedConnStatsHashKey *) b; - if (ca->nodeId != cb->nodeId || - strncmp(ca->database, cb->database, NAMEDATALEN) != 0) + if (strncmp(ca->hostname, cb->hostname, MAX_NODE_LENGTH) != 0 || + ca->port != cb->port || + ca->databaseOid != cb->databaseOid) { return 1; } diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c index 4f8863edc..10d411bea 100644 --- a/src/backend/distributed/executor/adaptive_executor.c +++ b/src/backend/distributed/executor/adaptive_executor.c @@ -589,6 +589,7 @@ static WorkerPool * FindOrCreateWorkerPool(DistributedExecution *execution, static WorkerSession * FindOrCreateWorkerSession(WorkerPool *workerPool, MultiConnection *connection); static void ManageWorkerPool(WorkerPool *workerPool); +static bool ShouldWaitForConnection(WorkerPool *workerPool); static void CheckConnectionTimeout(WorkerPool *workerPool); static int UsableConnectionCount(WorkerPool *workerPool); static long NextEventTimeout(DistributedExecution *execution); @@ -1944,9 +1945,6 @@ FindOrCreateWorkerPool(DistributedExecution *execution, char *nodeName, int node workerPool->nodeName = pstrdup(nodeName); workerPool->nodePort = nodePort; - INSTR_TIME_SET_ZERO(workerPool->poolStartTime); - workerPool->distributedExecution = execution; - /* "open" connections aggressively when there are cached connections */ int nodeConnectionCount = MaxCachedConnectionsPerWorker; workerPool->maxNewConnectionsPerCycle = Max(1, nodeConnectionCount); @@ -1954,6 +1952,8 @@ FindOrCreateWorkerPool(DistributedExecution *execution, char *nodeName, int node dlist_init(&workerPool->pendingTaskQueue); dlist_init(&workerPool->readyTaskQueue); + workerPool->distributedExecution = execution; + execution->workerList = lappend(execution->workerList, workerPool); return workerPool; @@ -2385,11 +2385,48 @@ ManageWorkerPool(WorkerPool *workerPool) connectionFlags |= OUTSIDE_TRANSACTION; } + if (UseConnectionPerPlacement()) + { + /* + * User wants one connection per placement, so no throttling is desired + * and we do not set any flags. + * + * The primary reason for this is that allowing multiple backends to use + * connection per placement could lead to unresolved self deadlocks. In other + * words, each backend may stuck waiting for other backends to get a slot + * in the shared connection counters. + */ + } + else if (ShouldWaitForConnection(workerPool)) + { + /* + * We need this connection to finish the execution. If it is not + * available based on the current number of connections to the worker + * then wait for it. + */ + connectionFlags |= WAIT_FOR_CONNECTION; + } + else + { + /* + * The executor can finish the execution with a single connection, + * remaining are optional. 
If the executor can get more connections, + * it can increase the parallelism. + */ + connectionFlags |= OPTIONAL_CONNECTION; + } + /* open a new connection to the worker */ MultiConnection *connection = StartNodeUserDatabaseConnection(connectionFlags, workerPool->nodeName, workerPool->nodePort, NULL, NULL); + if (!connection) + { + /* connection can only be NULL for optional connections */ + Assert((connectionFlags & OPTIONAL_CONNECTION)); + continue; + } /* * Assign the initial state in the connection state machine. The connection @@ -2404,6 +2441,17 @@ ManageWorkerPool(WorkerPool *workerPool) */ connection->claimedExclusively = true; + if (list_length(workerPool->sessionList) == 0) + { + /* + * The worker pool has just started to establish connections. We need to + * defer this initilization after StartNodeUserDatabaseConnection() + * because for non-optional connections, we have some logic to wait + * until a connection is allowed to be established. + */ + INSTR_TIME_SET_ZERO(workerPool->poolStartTime); + } + /* create a session for the connection */ WorkerSession *session = FindOrCreateWorkerSession(workerPool, connection); @@ -2416,6 +2464,42 @@ ManageWorkerPool(WorkerPool *workerPool) } +/* + * ShouldWaitForConnection returns true if the workerPool should wait to + * get the next connection until one slot is empty within + * citus.max_shared_pool_size on the worker. Note that, if there is an + * empty slot, the connection will not wait anyway. + */ +static bool +ShouldWaitForConnection(WorkerPool *workerPool) +{ + if (list_length(workerPool->sessionList) == 0) + { + /* + * We definitely need at least 1 connection to finish the execution. + * All single shard queries hit here with the default settings. + */ + return true; + } + + if (list_length(workerPool->sessionList) < MaxCachedConnectionsPerWorker) + { + /* + * Until this session caches MaxCachedConnectionsPerWorker connections, + * this might lead some optional connections to be considered as non-optional + * when MaxCachedConnectionsPerWorker > 1. + * + * However, once the session caches MaxCachedConnectionsPerWorker (which is + * the second transaction executed in the session), Citus would utilize the + * cached connections as much as possible. + */ + return true; + } + + return false; +} + + /* * CheckConnectionTimeout makes sure that the execution enforces the connection * establishment timeout defined by the user (NodeConnectionTimeout). 
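To make the calling convention concrete, the sketch below (not part of the patch; the helper name AcquireThrottledConnection is hypothetical) shows how a caller is expected to combine the new flags with StartNodeUserDatabaseConnection(): request the one connection it cannot do without using WAIT_FOR_CONNECTION, request extra parallel connections with OPTIONAL_CONNECTION, and treat a NULL result, which only optional connections can produce, as "no slot available, continue with the connections already at hand".

/* sketch only: assumes distributed/connection_management.h is included */
static MultiConnection *
AcquireThrottledConnection(const char *nodeName, int32 nodePort, bool required)
{
	/* required connections wait for a slot; optional ones may be refused */
	uint32 connectionFlags = required ? WAIT_FOR_CONNECTION : OPTIONAL_CONNECTION;

	MultiConnection *connection =
		StartNodeUserDatabaseConnection(connectionFlags, nodeName, nodePort,
										NULL, NULL);

	if (connection == NULL)
	{
		/* only possible for OPTIONAL_CONNECTION: the shared pool is full */
		Assert(!required);
	}

	return connection;
}

This mirrors what ManageWorkerPool() does in adaptive_executor.c, where the first session per worker is treated as required and the rest as optional.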
diff --git a/src/backend/distributed/metadata/node_metadata.c b/src/backend/distributed/metadata/node_metadata.c index 768a9e1df..1489f97ac 100644 --- a/src/backend/distributed/metadata/node_metadata.c +++ b/src/backend/distributed/metadata/node_metadata.c @@ -39,6 +39,7 @@ #include "distributed/remote_commands.h" #include "distributed/resource_lock.h" #include "distributed/shardinterval_utils.h" +#include "distributed/shared_connection_stats.h" #include "distributed/worker_manager.h" #include "distributed/worker_transaction.h" #include "lib/stringinfo.h" @@ -293,6 +294,9 @@ master_disable_node(PG_FUNCTION_ARGS) bool onlyConsiderActivePlacements = false; MemoryContext savedContext = CurrentMemoryContext; + /* remove the shared connection counters to have some space */ + RemoveInactiveNodesFromSharedConnections(); + PG_TRY(); { if (NodeIsPrimary(workerNode)) @@ -604,6 +608,9 @@ ActivateNode(char *nodeName, int nodePort) { bool isActive = true; + /* remove the shared connection counters to have some space */ + RemoveInactiveNodesFromSharedConnections(); + /* take an exclusive lock on pg_dist_node to serialize pg_dist_node changes */ LockRelationOid(DistNodeRelationId(), ExclusiveLock); @@ -646,6 +653,9 @@ master_update_node(PG_FUNCTION_ARGS) CheckCitusVersion(ERROR); + /* remove the shared connection counters to have some space */ + RemoveInactiveNodesFromSharedConnections(); + WorkerNode *workerNodeWithSameAddress = FindWorkerNodeAnyCluster(newNodeNameString, newNodePort); if (workerNodeWithSameAddress != NULL) @@ -672,6 +682,7 @@ master_update_node(PG_FUNCTION_ARGS) errmsg("node %u not found", nodeId))); } + /* * If the node is a primary node we block reads and writes. * @@ -715,8 +726,9 @@ master_update_node(PG_FUNCTION_ARGS) UpdateNodeLocation(nodeId, newNodeNameString, newNodePort); - strlcpy(workerNode->workerName, newNodeNameString, WORKER_LENGTH); - workerNode->workerPort = newNodePort; + /* we should be able to find the new node from the metadata */ + workerNode = FindWorkerNode(newNodeNameString, newNodePort); + Assert(workerNode->nodeId == nodeId); /* * Propagate the updated pg_dist_node entry to all metadata workers. 
@@ -1012,8 +1024,10 @@ ReadDistNode(bool includeNodesFromOtherClusters) static void RemoveNodeFromCluster(char *nodeName, int32 nodePort) { - WorkerNode *workerNode = ModifiableWorkerNode(nodeName, nodePort); + /* remove the shared connection counters to have some space */ + RemoveInactiveNodesFromSharedConnections(); + WorkerNode *workerNode = ModifiableWorkerNode(nodeName, nodePort); if (NodeIsPrimary(workerNode)) { bool onlyConsiderActivePlacements = false; diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index 156acf630..7811b1236 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -73,6 +73,7 @@ #include "distributed/adaptive_executor.h" #include "port/atomics.h" #include "postmaster/postmaster.h" +#include "storage/ipc.h" #include "optimizer/planner.h" #include "optimizer/paths.h" #include "tcop/tcopprot.h" @@ -89,9 +90,10 @@ static char *CitusVersion = CITUS_VERSION; void _PG_init(void); -static void CitusBackendAtExit(void); static void ResizeStackToMaximumDepth(void); static void multi_log_hook(ErrorData *edata); +static void RegisterConnectionCleanup(void); +static void CitusCleanupConnectionsAtExit(int code, Datum arg); static void CreateRequiredDirectories(void); static void RegisterCitusConfigVariables(void); static bool ErrorIfNotASuitableDeadlockFactor(double *newval, void **extra, @@ -99,6 +101,7 @@ static bool ErrorIfNotASuitableDeadlockFactor(double *newval, void **extra, static bool WarnIfDeprecatedExecutorUsed(int *newval, void **extra, GucSource source); static bool NodeConninfoGucCheckHook(char **newval, void **extra, GucSource source); static void NodeConninfoGucAssignHook(const char *newval, void *extra); +static const char * MaxSharedPoolSizeGucShowHook(void); static bool StatisticsCollectionGucCheckHook(bool *newval, void **extra, GucSource source); @@ -274,8 +277,6 @@ _PG_init(void) InitializeCitusQueryStats(); InitializeSharedConnectionStats(); - atexit(CitusBackendAtExit); - /* enable modification of pg_catalog tables during pg_upgrade */ if (IsBinaryUpgrade) { @@ -285,18 +286,6 @@ _PG_init(void) } -/* - * CitusBackendAtExit is called atexit of the backend for the purposes of - * any clean-up needed. - */ -static void -CitusBackendAtExit(void) -{ - /* properly close all the cached connections */ - ShutdownAllConnections(); -} - - /* * Stack size increase during high memory load may cause unexpected crashes. * With this alloca call, we are increasing stack size explicitly, so that if @@ -382,6 +371,37 @@ StartupCitusBackend(void) { InitializeMaintenanceDaemonBackend(); InitializeBackendData(); + RegisterConnectionCleanup(); +} + + +/* + * RegisterConnectionCleanup cleans up any resources left at the end of the + * session. We prefer to cleanup before shared memory exit to make sure that + * this session properly releases anything hold in the shared memory. + */ +static void +RegisterConnectionCleanup(void) +{ + static bool registeredCleanup = false; + if (registeredCleanup == false) + { + before_shmem_exit(CitusCleanupConnectionsAtExit, 0); + + registeredCleanup = true; + } +} + + +/* + * CitusCleanupConnectionsAtExit is called before_shmem_exit() of the + * backend for the purposes of any clean-up needed. 
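+ * Registering the callback via before_shmem_exit() (rather than atexit())
+ * ensures shared memory is still attached at that point, so the shared
+ * connection counters held by this backend can be released properly.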
+ */ +static void +CitusCleanupConnectionsAtExit(int code, Datum arg) +{ + /* properly close all the cached connections */ + ShutdownAllConnections(); } @@ -930,6 +950,20 @@ RegisterCitusConfigVariables(void) GUC_STANDARD, NULL, NULL, NULL); + DefineCustomIntVariable( + "citus.max_shared_pool_size", + gettext_noop("Sets the maximum number of connections allowed per worker node " + "across all the backends from this node. Setting to -1 disables " + "connections throttling. Setting to 0 makes it auto-adjust, meaning " + "equal to max_connections on the coordinator."), + gettext_noop("As a rule of thumb, the value should be at most equal to the " + "max_connections on the remote nodes."), + &MaxSharedPoolSize, + 0, -1, INT_MAX, + PGC_SIGHUP, + GUC_SUPERUSER_ONLY, + NULL, NULL, MaxSharedPoolSizeGucShowHook); + DefineCustomIntVariable( "citus.max_worker_nodes_tracked", gettext_noop("Sets the maximum number of worker nodes that are tracked."), @@ -937,9 +971,14 @@ RegisterCitusConfigVariables(void) "health status are tracked in a shared hash table on " "the master node. This configuration value limits the " "size of the hash table, and consequently the maximum " - "number of worker nodes that can be tracked."), + "number of worker nodes that can be tracked." + "Citus keeps some information about the worker nodes " + "in the shared memory for certain optimizations. The " + "optimizations are enforced up to this number of worker " + "nodes. Any additional worker nodes may not benefit from" + "the optimizations."), &MaxWorkerNodesTracked, - 2048, 8, INT_MAX, + 2048, 1024, INT_MAX, PGC_POSTMASTER, GUC_STANDARD, NULL, NULL, NULL); @@ -1013,23 +1052,6 @@ RegisterCitusConfigVariables(void) GUC_STANDARD, NULL, NULL, NULL); - DefineCustomIntVariable( - "citus.max_tracked_worker_nodes", - gettext_noop("Sets the maximum number of worker tracked."), - gettext_noop("Citus doesn't have any limitations in terms of the " - "number of worker nodes allowed in the cluster. But, " - "Citus keeps some information about the worker nodes " - "in the shared memory for certain optimizations. The " - "optimizations are enforced up to this number of worker " - "nodes. Any additional worker nodes may not benefit from" - "the optimizations."), - &MaxTrackedWorkerNodes, - 1024, 256, INT_MAX, - PGC_POSTMASTER, - GUC_STANDARD, - NULL, NULL, NULL); - - DefineCustomIntVariable( "citus.max_running_tasks_per_node", gettext_noop("Sets the maximum number of tasks to run concurrently per node."), @@ -1539,6 +1561,28 @@ NodeConninfoGucAssignHook(const char *newval, void *extra) } +/* + * MaxSharedPoolSizeGucShowHook overrides the value that is shown to the + * user when the default value has not been set. 
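+ * In other words, while citus.max_shared_pool_size is 0 (the default),
+ * SHOW citus.max_shared_pool_size reports the effective limit returned by
+ * GetMaxSharedPoolSize(), i.e. max_connections, rather than the raw 0.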
+ */ +static const char * +MaxSharedPoolSizeGucShowHook(void) +{ + StringInfo newvalue = makeStringInfo(); + + if (MaxSharedPoolSize == 0) + { + appendStringInfo(newvalue, "%d", GetMaxSharedPoolSize()); + } + else + { + appendStringInfo(newvalue, "%d", MaxSharedPoolSize); + } + + return (const char *) newvalue->data; +} + + static bool StatisticsCollectionGucCheckHook(bool *newval, void **extra, GucSource source) { diff --git a/src/backend/distributed/sql/citus--9.2-4--9.3-2.sql b/src/backend/distributed/sql/citus--9.2-4--9.3-2.sql index f8c3b14f9..31b6af6b0 100644 --- a/src/backend/distributed/sql/citus--9.2-4--9.3-2.sql +++ b/src/backend/distributed/sql/citus--9.2-4--9.3-2.sql @@ -5,3 +5,4 @@ #include "udfs/citus_extradata_container/9.3-2.sql" #include "udfs/update_distributed_table_colocation/9.3-2.sql" #include "udfs/replicate_reference_tables/9.3-2.sql" +#include "udfs/citus_remote_connection_stats/9.3-2.sql" diff --git a/src/backend/distributed/sql/citus--9.3-1--9.3-2.sql b/src/backend/distributed/sql/citus--9.3-1--9.3-2.sql deleted file mode 100644 index d1f8f5185..000000000 --- a/src/backend/distributed/sql/citus--9.3-1--9.3-2.sql +++ /dev/null @@ -1 +0,0 @@ -#include "udfs/citus_remote_connection_stats/9.3-2.sql" diff --git a/src/backend/distributed/sql/udfs/citus_remote_connection_stats/9.3-2.sql b/src/backend/distributed/sql/udfs/citus_remote_connection_stats/9.3-2.sql index 3369c1ac7..bb4dccd87 100644 --- a/src/backend/distributed/sql/udfs/citus_remote_connection_stats/9.3-2.sql +++ b/src/backend/distributed/sql/udfs/citus_remote_connection_stats/9.3-2.sql @@ -1,6 +1,22 @@ -CREATE OR REPLACE FUNCTION citus_remote_connection_stats(OUT node_id int, OUT database_name text, OUT connection_count_to_node int) - RETURNS SETOF RECORD - LANGUAGE C STRICT - AS 'MODULE_PATHNAME', $$citus_remote_connection_stats$$; - COMMENT ON FUNCTION citus_remote_connection_stats(OUT node_id int, OUT database_name text, OUT connection_count_to_node int) +CREATE OR REPLACE FUNCTION pg_catalog.citus_remote_connection_stats( + OUT hostname text, + OUT port int, + OUT database_name text, + OUT connection_count_to_node int) +RETURNS SETOF RECORD +LANGUAGE C STRICT +AS 'MODULE_PATHNAME', $$citus_remote_connection_stats$$; + +COMMENT ON FUNCTION pg_catalog.citus_remote_connection_stats( + OUT hostname text, + OUT port int, + OUT database_name text, + OUT connection_count_to_node int) IS 'returns statistics about remote connections'; + +REVOKE ALL ON FUNCTION pg_catalog.citus_remote_connection_stats( + OUT hostname text, + OUT port int, + OUT database_name text, + OUT connection_count_to_node int) +FROM PUBLIC; diff --git a/src/backend/distributed/sql/udfs/citus_remote_connection_stats/latest.sql b/src/backend/distributed/sql/udfs/citus_remote_connection_stats/latest.sql index 3369c1ac7..bb4dccd87 100644 --- a/src/backend/distributed/sql/udfs/citus_remote_connection_stats/latest.sql +++ b/src/backend/distributed/sql/udfs/citus_remote_connection_stats/latest.sql @@ -1,6 +1,22 @@ -CREATE OR REPLACE FUNCTION citus_remote_connection_stats(OUT node_id int, OUT database_name text, OUT connection_count_to_node int) - RETURNS SETOF RECORD - LANGUAGE C STRICT - AS 'MODULE_PATHNAME', $$citus_remote_connection_stats$$; - COMMENT ON FUNCTION citus_remote_connection_stats(OUT node_id int, OUT database_name text, OUT connection_count_to_node int) +CREATE OR REPLACE FUNCTION pg_catalog.citus_remote_connection_stats( + OUT hostname text, + OUT port int, + OUT database_name text, + OUT connection_count_to_node int) +RETURNS 
SETOF RECORD +LANGUAGE C STRICT +AS 'MODULE_PATHNAME', $$citus_remote_connection_stats$$; + +COMMENT ON FUNCTION pg_catalog.citus_remote_connection_stats( + OUT hostname text, + OUT port int, + OUT database_name text, + OUT connection_count_to_node int) IS 'returns statistics about remote connections'; + +REVOKE ALL ON FUNCTION pg_catalog.citus_remote_connection_stats( + OUT hostname text, + OUT port int, + OUT database_name text, + OUT connection_count_to_node int) +FROM PUBLIC; diff --git a/src/backend/distributed/test/shared_connection_counters.c b/src/backend/distributed/test/shared_connection_counters.c new file mode 100644 index 000000000..e95a2ccbb --- /dev/null +++ b/src/backend/distributed/test/shared_connection_counters.c @@ -0,0 +1,66 @@ +/*------------------------------------------------------------------------- + * + * test/src/sequential_execution.c + * + * This file contains functions to test setting citus.multi_shard_modify_mode + * GUC. + * + * Copyright (c) Citus Data, Inc. + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" +#include "miscadmin.h" +#include "fmgr.h" + +#include "distributed/shared_connection_stats.h" +#include "distributed/listutils.h" +#include "nodes/parsenodes.h" +#include "utils/guc.h" + +/* exports for SQL callable functions */ +PG_FUNCTION_INFO_V1(wake_up_connection_pool_waiters); +PG_FUNCTION_INFO_V1(set_max_shared_pool_size); + + +/* + * wake_up_waiters_backends is a SQL + * interface for testing WakeupWaiterBackendsForSharedConnection(). + */ +Datum +wake_up_connection_pool_waiters(PG_FUNCTION_ARGS) +{ + WakeupWaiterBackendsForSharedConnection(); + + PG_RETURN_VOID(); +} + + +/* + * set_max_shared_pool_size is a SQL + * interface for setting MaxSharedPoolSize. We use this function in isolation + * tester where ALTER SYSTEM is not allowed. 
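+ * The value is applied with ALTER SYSTEM followed by a SIGHUP to the
+ * postmaster, e.g.: SELECT set_max_shared_pool_size(5);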
+ */ +Datum +set_max_shared_pool_size(PG_FUNCTION_ARGS) +{ + int value = PG_GETARG_INT32(0); + + AlterSystemStmt *alterSystemStmt = palloc0(sizeof(AlterSystemStmt)); + + A_Const *aConstValue = makeNode(A_Const); + + aConstValue->val = *makeInteger(value); + alterSystemStmt->setstmt = makeNode(VariableSetStmt); + alterSystemStmt->setstmt->name = "citus.max_shared_pool_size"; + alterSystemStmt->setstmt->is_local = false; + alterSystemStmt->setstmt->kind = VAR_SET_VALUE; + alterSystemStmt->setstmt->args = list_make1(aConstValue); + + AlterSystemSetConfigFile(alterSystemStmt); + + kill(PostmasterPid, SIGHUP); + + PG_RETURN_VOID(); +} diff --git a/src/backend/distributed/transaction/transaction_management.c b/src/backend/distributed/transaction/transaction_management.c index 1783d4a73..b6a541807 100644 --- a/src/backend/distributed/transaction/transaction_management.c +++ b/src/backend/distributed/transaction/transaction_management.c @@ -31,6 +31,7 @@ #include "distributed/repartition_join_execution.h" #include "distributed/transaction_management.h" #include "distributed/placement_connection.h" +#include "distributed/shared_connection_stats.h" #include "distributed/subplan_execution.h" #include "distributed/version_compat.h" #include "utils/hsearch.h" diff --git a/src/include/distributed/connection_management.h b/src/include/distributed/connection_management.h index 015546158..5471bc3cc 100644 --- a/src/include/distributed/connection_management.h +++ b/src/include/distributed/connection_management.h @@ -52,10 +52,29 @@ enum MultiConnectionMode /* open a connection per (co-located set of) placement(s) */ CONNECTION_PER_PLACEMENT = 1 << 3, - OUTSIDE_TRANSACTION = 1 << 4 + OUTSIDE_TRANSACTION = 1 << 4, + + /* + * Some connections are optional such as when adaptive executor is executing + * a multi-shard command and requires the second (or further) connections + * per node. In that case, the connection manager may decide not to allow the + * connection. + */ + OPTIONAL_CONNECTION = 1 << 5, + + /* + * When this flag is passed, via connection throttling, the connection + * establishments may be suspended until a connection slot is available to + * the remote host. + */ + WAIT_FOR_CONNECTION = 1 << 6 }; +/* + * This state is used for keeping track of the initilization + * of the underlying pg_conn struct. + */ typedef enum MultiConnectionState { MULTI_CONNECTION_INITIAL, @@ -65,6 +84,21 @@ typedef enum MultiConnectionState MULTI_CONNECTION_LOST } MultiConnectionState; + +/* + * This state is used for keeping track of the initilization + * of MultiConnection struct, not specifically the underlying + * pg_conn. The state is useful to determine the action during + * clean-up of connections. 
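+ * The expected transitions are POOL_STATE_NOT_INITIALIZED ->
+ * POOL_STATE_COUNTER_INCREMENTED -> POOL_STATE_INITIALIZED; CitusPQFinish()
+ * only decrements the shared connection counter once the state has reached
+ * POOL_STATE_COUNTER_INCREMENTED.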
+ */ +typedef enum MultiConnectionStructInitializationState +{ + POOL_STATE_NOT_INITIALIZED, + POOL_STATE_COUNTER_INCREMENTED, + POOL_STATE_INITIALIZED +} MultiConnectionStructInitializationState; + + /* declaring this directly above makes uncrustify go crazy */ typedef enum MultiConnectionMode MultiConnectionMode; @@ -114,6 +148,8 @@ typedef struct MultiConnection /* number of bytes sent to PQputCopyData() since last flush */ uint64 copyBytesWrittenSinceLastFlush; + + MultiConnectionStructInitializationState initilizationState; } MultiConnection; diff --git a/src/include/distributed/shared_connection_stats.h b/src/include/distributed/shared_connection_stats.h index 9e9e2102e..1964d50b7 100644 --- a/src/include/distributed/shared_connection_stats.h +++ b/src/include/distributed/shared_connection_stats.h @@ -11,8 +11,17 @@ #ifndef SHARED_CONNECTION_STATS_H #define SHARED_CONNECTION_STATS_H -extern int MaxTrackedWorkerNodes; +extern int MaxSharedPoolSize; + extern void InitializeSharedConnectionStats(void); +extern void WaitForSharedConnection(void); +extern void WakeupWaiterBackendsForSharedConnection(void); +extern void RemoveInactiveNodesFromSharedConnections(void); +extern int GetMaxSharedPoolSize(void); +extern bool TryToIncrementSharedConnectionCounter(const char *hostname, int port); +extern void WaitLoopForSharedConnection(const char *hostname, int port); +extern void DecrementSharedConnectionCounter(const char *hostname, int port); +extern void IncrementSharedConnectionCounter(const char *hostname, int port); #endif /* SHARED_CONNECTION_STATS_H */ diff --git a/src/test/regress/expected/ensure_no_shared_connection_leak.out b/src/test/regress/expected/ensure_no_shared_connection_leak.out new file mode 100644 index 000000000..060313c60 --- /dev/null +++ b/src/test/regress/expected/ensure_no_shared_connection_leak.out @@ -0,0 +1,160 @@ +-- this test file is intended to be called at the end +-- of any test schedule, ensuring that there is not +-- leak/wrong calculation of the connection stats +-- in the shared memory +CREATE SCHEMA ensure_no_shared_connection_leak; +SET search_path TO ensure_no_shared_connection_leak; +-- set the cached connections to zero +-- and execute a distributed query so that +-- we end up with zero cached connections afterwards +ALTER SYSTEM SET citus.max_cached_conns_per_worker TO 0; +SELECT pg_reload_conf(); + pg_reload_conf +--------------------------------------------------------------------- + t +(1 row) + +-- disable deadlock detection and re-trigger 2PC recovery +-- once more when citus.max_cached_conns_per_worker is zero +-- so that we can be sure that the connections established for +-- maintanince daemon is closed properly. 
+-- this is to prevent random failures in the tests (otherwise, we +-- might see connections established for this operations) +ALTER SYSTEM SET citus.distributed_deadlock_detection_factor TO -1; +ALTER SYSTEM SET citus.recover_2pc_interval TO '1ms'; +SELECT pg_reload_conf(); + pg_reload_conf +--------------------------------------------------------------------- + t +(1 row) + +SELECT pg_sleep(0.1); + pg_sleep +--------------------------------------------------------------------- + +(1 row) + +-- now that last 2PC recovery is done, we're good to disable it +ALTER SYSTEM SET citus.recover_2pc_interval TO '-1'; +SELECT pg_reload_conf(); + pg_reload_conf +--------------------------------------------------------------------- + t +(1 row) + +CREATE TABLE test (a int); +SELECT create_distributed_table('test', 'a'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SELECT count(*) FROM test; + count +--------------------------------------------------------------------- + 0 +(1 row) + +-- in case of MX, we should prevent deadlock detection and +-- 2PC recover from the workers as well +\c - - - :worker_1_port +ALTER SYSTEM SET citus.max_cached_conns_per_worker TO 0; +SELECT pg_reload_conf(); + pg_reload_conf +--------------------------------------------------------------------- + t +(1 row) + +ALTER SYSTEM SET citus.distributed_deadlock_detection_factor TO -1; +ALTER SYSTEM SET citus.recover_2pc_interval TO '1ms'; +SELECT pg_reload_conf(); + pg_reload_conf +--------------------------------------------------------------------- + t +(1 row) + +SELECT pg_sleep(0.1); + pg_sleep +--------------------------------------------------------------------- + +(1 row) + +ALTER SYSTEM SET citus.recover_2pc_interval TO '-1'; +SELECT pg_reload_conf(); + pg_reload_conf +--------------------------------------------------------------------- + t +(1 row) + +\c - - - :worker_2_port +ALTER SYSTEM SET citus.max_cached_conns_per_worker TO 0; +SELECT pg_reload_conf(); + pg_reload_conf +--------------------------------------------------------------------- + t +(1 row) + +ALTER SYSTEM SET citus.distributed_deadlock_detection_factor TO -1; +ALTER SYSTEM SET citus.recover_2pc_interval TO '1ms'; +SELECT pg_reload_conf(); + pg_reload_conf +--------------------------------------------------------------------- + t +(1 row) + +SELECT pg_sleep(0.1); + pg_sleep +--------------------------------------------------------------------- + +(1 row) + +ALTER SYSTEM SET citus.recover_2pc_interval TO '-1'; +SELECT pg_reload_conf(); + pg_reload_conf +--------------------------------------------------------------------- + t +(1 row) + +\c - - - :master_port +SET search_path TO ensure_no_shared_connection_leak; +-- ensure that we only have at most citus.max_cached_conns_per_worker +-- connections per node +select + (connection_count_to_node = 0) as no_connection_to_node +FROM + citus_remote_connection_stats() +WHERE + port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND + database_name = 'regression' +ORDER BY 1; + no_connection_to_node +--------------------------------------------------------------------- + t + t +(2 rows) + +-- now, ensure this from the workers perspective +-- we should only see the connection/backend that is running the command below +SELECT + result, success +FROM + run_command_on_workers($$select count(*) from pg_stat_activity WHERE backend_type = 'client backend';$$) +ORDER BY 1, 2; + result | success 
+--------------------------------------------------------------------- + 1 | t + 1 | t +(2 rows) + +-- in case other tests relies on these setting, reset them +ALTER SYSTEM RESET citus.distributed_deadlock_detection_factor; +ALTER SYSTEM RESET citus.recover_2pc_interval; +ALTER SYSTEM RESET citus.max_cached_conns_per_worker; +SELECT pg_reload_conf(); + pg_reload_conf +--------------------------------------------------------------------- + t +(1 row) + +DROP SCHEMA ensure_no_shared_connection_leak CASCADE; +NOTICE: drop cascades to table test diff --git a/src/test/regress/expected/shared_connection_stats.out b/src/test/regress/expected/shared_connection_stats.out new file mode 100644 index 000000000..229a00aff --- /dev/null +++ b/src/test/regress/expected/shared_connection_stats.out @@ -0,0 +1,391 @@ +CREATE SCHEMA shared_connection_stats; +SET search_path TO shared_connection_stats; +-- set the cached connections to zero +-- and execute a distributed query so that +-- we end up with zero cached connections afterwards +ALTER SYSTEM SET citus.max_cached_conns_per_worker TO 0; +SELECT pg_reload_conf(); + pg_reload_conf +--------------------------------------------------------------------- + t +(1 row) + +-- disable deadlock detection and re-trigger 2PC recovery +-- once more when citus.max_cached_conns_per_worker is zero +-- so that we can be sure that the connections established for +-- maintanince daemon is closed properly. +-- this is to prevent random failures in the tests (otherwise, we +-- might see connections established for this operations) +ALTER SYSTEM SET citus.distributed_deadlock_detection_factor TO -1; +ALTER SYSTEM SET citus.recover_2pc_interval TO '1ms'; +SELECT pg_reload_conf(); + pg_reload_conf +--------------------------------------------------------------------- + t +(1 row) + +SELECT pg_sleep(0.1); + pg_sleep +--------------------------------------------------------------------- + +(1 row) + +-- now that last 2PC recovery is done, we're good to disable it +ALTER SYSTEM SET citus.recover_2pc_interval TO '1h'; +SELECT pg_reload_conf(); + pg_reload_conf +--------------------------------------------------------------------- + t +(1 row) + +SET citus.shard_count TO 32; +SET citus.shard_replication_factor TO 1; +CREATE TABLE test (a int); +SELECT create_distributed_table('test', 'a'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO test SELECT i FROM generate_series(0,100)i; +-- show that no connections are cached +SELECT + connection_count_to_node +FROM + citus_remote_connection_stats() +WHERE + port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND + database_name = 'regression' +ORDER BY + hostname, port; + connection_count_to_node +--------------------------------------------------------------------- + 0 + 0 +(2 rows) + +-- single shard queries require single connection per node +BEGIN; + SELECT count(*) FROM test WHERE a = 1; + count +--------------------------------------------------------------------- + 1 +(1 row) + + SELECT count(*) FROM test WHERE a = 2; + count +--------------------------------------------------------------------- + 1 +(1 row) + + SELECT + connection_count_to_node + FROM + citus_remote_connection_stats() + WHERE + port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND + database_name = 'regression' + ORDER BY + hostname, port; + connection_count_to_node +--------------------------------------------------------------------- + 1 + 1 +(2 rows) + 
+COMMIT; +-- show that no connections are cached +SELECT + connection_count_to_node +FROM + citus_remote_connection_stats() +WHERE + port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND + database_name = 'regression' +ORDER BY + hostname, port; + connection_count_to_node +--------------------------------------------------------------------- + 0 + 0 +(2 rows) + +-- executor is only allowed to establish a single connection per node +BEGIN; + SET LOCAL citus.max_adaptive_executor_pool_size TO 1; + SELECT count(*) FROM test; + count +--------------------------------------------------------------------- + 101 +(1 row) + + SELECT + connection_count_to_node + FROM + citus_remote_connection_stats() + WHERE + port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND + database_name = 'regression' + ORDER BY + hostname, port; + connection_count_to_node +--------------------------------------------------------------------- + 1 + 1 +(2 rows) + +COMMIT; +-- show that no connections are cached +SELECT + connection_count_to_node +FROM + citus_remote_connection_stats() +WHERE + port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND + database_name = 'regression' +ORDER BY + hostname, port; + connection_count_to_node +--------------------------------------------------------------------- + 0 + 0 +(2 rows) + +-- sequential mode is allowed to establish a single connection per node +BEGIN; + SET LOCAL citus.multi_shard_modify_mode TO 'sequential'; + SELECT count(*) FROM test; + count +--------------------------------------------------------------------- + 101 +(1 row) + + SELECT + connection_count_to_node + FROM + citus_remote_connection_stats() + WHERE + port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND + database_name = 'regression' + ORDER BY + hostname, port; + connection_count_to_node +--------------------------------------------------------------------- + 1 + 1 +(2 rows) + +COMMIT; +-- show that no connections are cached +SELECT + connection_count_to_node +FROM + citus_remote_connection_stats() +WHERE + port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND + database_name = 'regression' +ORDER BY + hostname, port; + connection_count_to_node +--------------------------------------------------------------------- + 0 + 0 +(2 rows) + +-- now, decrease the shared pool size, and still force +-- one connection per placement +ALTER SYSTEM SET citus.max_shared_pool_size TO 5; +SELECT pg_reload_conf(); + pg_reload_conf +--------------------------------------------------------------------- + t +(1 row) + +SELECT pg_sleep(0.1); + pg_sleep +--------------------------------------------------------------------- + +(1 row) + +BEGIN; + SET LOCAL citus.node_connection_timeout TO 1000; + SET LOCAL citus.force_max_query_parallelization TO ON; + SELECT count(*) FROM test; + count +--------------------------------------------------------------------- + 101 +(1 row) + +COMMIT; +-- pg_sleep forces almost 1 connection per placement +-- now, some of the optional connections would be skipped, +-- and only 5 connections are used per node +BEGIN; + SELECT count(*), pg_sleep(0.1) FROM test; + count | pg_sleep +--------------------------------------------------------------------- + 101 | +(1 row) + + SELECT + connection_count_to_node + FROM + citus_remote_connection_stats() + WHERE + port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND + database_name = 'regression' + ORDER BY + hostname, port; + connection_count_to_node 
+--------------------------------------------------------------------- + 5 + 5 +(2 rows) + +COMMIT; +SHOW citus.max_shared_pool_size; + citus.max_shared_pool_size +--------------------------------------------------------------------- + 5 +(1 row) + +-- by default max_shared_pool_size equals to max_connections; +ALTER SYSTEM RESET citus.max_shared_pool_size; +SELECT pg_reload_conf(); + pg_reload_conf +--------------------------------------------------------------------- + t +(1 row) + +SELECT pg_sleep(0.1); + pg_sleep +--------------------------------------------------------------------- + +(1 row) + +SHOW citus.max_shared_pool_size; + citus.max_shared_pool_size +--------------------------------------------------------------------- + 100 +(1 row) + +SHOW max_connections; + max_connections +--------------------------------------------------------------------- + 100 +(1 row) + +-- now, each node gets 16 connections as we force 1 connection per placement +BEGIN; + SET LOCAL citus.force_max_query_parallelization TO ON; + SELECT count(*) FROM test; + count +--------------------------------------------------------------------- + 101 +(1 row) + + SELECT + connection_count_to_node + FROM + citus_remote_connection_stats() + WHERE + port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND + database_name = 'regression' + ORDER BY + hostname, port; + connection_count_to_node +--------------------------------------------------------------------- + 16 + 16 +(2 rows) + +COMMIT; +BEGIN; + -- now allow at most 1 connection, and ensure that intermediate + -- results don't require any extra connections + SET LOCAL citus.max_adaptive_executor_pool_size TO 1; + SET LOCAL citus.task_assignment_policy TO "round-robin"; + SELECT cnt FROM (SELECT count(*) as cnt, random() FROM test LIMIT 1) as foo; + cnt +--------------------------------------------------------------------- + 101 +(1 row) + + -- queries with intermediate results don't use any extra connections + SELECT + connection_count_to_node + FROM + citus_remote_connection_stats() + WHERE + port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND + database_name = 'regression' + ORDER BY + hostname, port; + connection_count_to_node +--------------------------------------------------------------------- + 1 + 1 +(2 rows) + +COMMIT; +-- now show that when max_cached_conns_per_worker > 1 +-- Citus forces the first execution to open at least 2 +-- connections that are cached. Later, that 2 cached +-- connections are user +BEGIN; + SET LOCAL citus.max_cached_conns_per_worker TO 2; + SELECT count(*) FROM test; + count +--------------------------------------------------------------------- + 101 +(1 row) + + SELECT + connection_count_to_node >= 2 + FROM + citus_remote_connection_stats() + WHERE + port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND + database_name = 'regression' + ORDER BY + hostname, port; + ?column? +--------------------------------------------------------------------- + t + t +(2 rows) + + SELECT count(*) FROM test; + count +--------------------------------------------------------------------- + 101 +(1 row) + + SELECT + connection_count_to_node >= 2 + FROM + citus_remote_connection_stats() + WHERE + port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND + database_name = 'regression' + ORDER BY + hostname, port; + ?column? 
+--------------------------------------------------------------------- + t + t +(2 rows) + +COMMIT; +-- in case other tests relies on these setting, reset them +ALTER SYSTEM RESET citus.distributed_deadlock_detection_factor; +ALTER SYSTEM RESET citus.recover_2pc_interval; +ALTER SYSTEM RESET citus.max_cached_conns_per_worker; +SELECT pg_reload_conf(); + pg_reload_conf +--------------------------------------------------------------------- + t +(1 row) + +DROP SCHEMA shared_connection_stats CASCADE; +NOTICE: drop cascades to table test diff --git a/src/test/regress/expected/shared_connection_waits.out b/src/test/regress/expected/shared_connection_waits.out new file mode 100644 index 000000000..b42419d48 --- /dev/null +++ b/src/test/regress/expected/shared_connection_waits.out @@ -0,0 +1,33 @@ +Parsed test spec with 3 sessions + +starting permutation: s3-lower-pool-size s1-begin s1-count-slow s3-increase-pool-size s2-select s1-commit +step s3-lower-pool-size: + SELECT set_max_shared_pool_size(5); + +set_max_shared_pool_size + + +step s1-begin: + BEGIN; + +step s1-count-slow: + SELECT pg_sleep(0.1), count(*) FROM test; + +pg_sleep count + + 101 +step s3-increase-pool-size: + SELECT set_max_shared_pool_size(100); + +set_max_shared_pool_size + + +step s2-select: + SELECT count(*) FROM test; + +count + +101 +step s1-commit: + COMMIT; + diff --git a/src/test/regress/failure_schedule b/src/test/regress/failure_schedule index 4d6236858..e7d183399 100644 --- a/src/test/regress/failure_schedule +++ b/src/test/regress/failure_schedule @@ -37,3 +37,9 @@ test: failure_connection_establishment # test that no tests leaked intermediate results. This should always be last test: ensure_no_intermediate_data_leak + +# --------- +# ensures that we never leak any connection counts +# in the shared memory +# -------- +test: ensure_no_shared_connection_leak diff --git a/src/test/regress/isolation_schedule b/src/test/regress/isolation_schedule index 91f0a8065..4451812e5 100644 --- a/src/test/regress/isolation_schedule +++ b/src/test/regress/isolation_schedule @@ -62,6 +62,7 @@ test: isolation_validate_vs_insert test: isolation_insert_select_conflict test: isolation_ref2ref_foreign_keys test: isolation_multiuser_locking +test: shared_connection_waits # MX tests test: isolation_reference_on_mx diff --git a/src/test/regress/multi_mx_schedule b/src/test/regress/multi_mx_schedule index 97c8ffdd4..8c6f81536 100644 --- a/src/test/regress/multi_mx_schedule +++ b/src/test/regress/multi_mx_schedule @@ -50,3 +50,9 @@ test: locally_execute_intermediate_results # test that no tests leaked intermediate results. This should always be last test: ensure_no_intermediate_data_leak + +# --------- +# ensures that we never leak any connection counts +# in the shared memory +# -------- +test: ensure_no_shared_connection_leak diff --git a/src/test/regress/multi_schedule b/src/test/regress/multi_schedule index 71814636e..394652f6f 100644 --- a/src/test/regress/multi_schedule +++ b/src/test/regress/multi_schedule @@ -341,7 +341,19 @@ test: distributed_procedure # --------- test: multi_deparse_function multi_deparse_procedure +# -------- +# cannot be run in parallel with any other tests as it checks +# statistics across sessions +# -------- +test: shared_connection_stats + # --------- # test that no tests leaked intermediate results. 
This should always be last # --------- test: ensure_no_intermediate_data_leak + +# --------- +# ensures that we never leak any connection counts +# in the shared memory +# -------- +test: ensure_no_shared_connection_leak diff --git a/src/test/regress/multi_task_tracker_extra_schedule b/src/test/regress/multi_task_tracker_extra_schedule index 2e65ee286..e5f155cc1 100644 --- a/src/test/regress/multi_task_tracker_extra_schedule +++ b/src/test/regress/multi_task_tracker_extra_schedule @@ -115,3 +115,9 @@ test: multi_schema_support # test that no tests leaked intermediate results. This should always be last # ---------- test: ensure_no_intermediate_data_leak + +# --------- +# ensures that we never leak any connection counts +# in the shared memory +# -------- +test: ensure_no_shared_connection_leak diff --git a/src/test/regress/spec/shared_connection_waits.spec b/src/test/regress/spec/shared_connection_waits.spec new file mode 100644 index 000000000..80a8cb9e3 --- /dev/null +++ b/src/test/regress/spec/shared_connection_waits.spec @@ -0,0 +1,65 @@ +setup +{ + CREATE OR REPLACE FUNCTION wake_up_connection_pool_waiters() + RETURNS void + LANGUAGE C STABLE STRICT + AS 'citus', $$wake_up_connection_pool_waiters$$; + + CREATE OR REPLACE FUNCTION set_max_shared_pool_size(int) + RETURNS void + LANGUAGE C STABLE STRICT + AS 'citus', $$set_max_shared_pool_size$$; + + CREATE TABLE test (a int, b int); + SET citus.shard_count TO 32; + SELECT create_distributed_table('test', 'a'); + INSERT INTO test SELECT i, i FROM generate_series(0,100)i; +} + +teardown +{ + + SELECT set_max_shared_pool_size(100); + DROP FUNCTION wake_up_connection_pool_waiters(); + DROP FUNCTION set_max_shared_pool_size(int); +} + +session "s1" + + +step "s1-begin" +{ + BEGIN; +} + +step "s1-count-slow" +{ + SELECT pg_sleep(0.1), count(*) FROM test; +} + +step "s1-commit" +{ + COMMIT; +} + +session "s2" + +step "s2-select" +{ + SELECT count(*) FROM test; +} + +session "s3" + +step "s3-lower-pool-size" +{ + SELECT set_max_shared_pool_size(5); +} + +step "s3-increase-pool-size" +{ + SELECT set_max_shared_pool_size(100); +} + +permutation "s3-lower-pool-size" "s1-begin" "s1-count-slow" "s3-increase-pool-size" "s2-select" "s1-commit" + diff --git a/src/test/regress/sql/ensure_no_shared_connection_leak.sql b/src/test/regress/sql/ensure_no_shared_connection_leak.sql new file mode 100644 index 000000000..df4847cfb --- /dev/null +++ b/src/test/regress/sql/ensure_no_shared_connection_leak.sql @@ -0,0 +1,83 @@ +-- this test file is intended to be called at the end +-- of any test schedule, ensuring that there is not +-- leak/wrong calculation of the connection stats +-- in the shared memory +CREATE SCHEMA ensure_no_shared_connection_leak; +SET search_path TO ensure_no_shared_connection_leak; + +-- set the cached connections to zero +-- and execute a distributed query so that +-- we end up with zero cached connections afterwards +ALTER SYSTEM SET citus.max_cached_conns_per_worker TO 0; +SELECT pg_reload_conf(); + +-- disable deadlock detection and re-trigger 2PC recovery +-- once more when citus.max_cached_conns_per_worker is zero +-- so that we can be sure that the connections established for +-- maintanince daemon is closed properly. 
+-- this is to prevent random failures in the tests (otherwise, we +-- might see connections established for this operations) +ALTER SYSTEM SET citus.distributed_deadlock_detection_factor TO -1; +ALTER SYSTEM SET citus.recover_2pc_interval TO '1ms'; +SELECT pg_reload_conf(); +SELECT pg_sleep(0.1); + +-- now that last 2PC recovery is done, we're good to disable it +ALTER SYSTEM SET citus.recover_2pc_interval TO '-1'; +SELECT pg_reload_conf(); + +CREATE TABLE test (a int); +SELECT create_distributed_table('test', 'a'); +SELECT count(*) FROM test; + +-- in case of MX, we should prevent deadlock detection and +-- 2PC recover from the workers as well +\c - - - :worker_1_port +ALTER SYSTEM SET citus.max_cached_conns_per_worker TO 0; +SELECT pg_reload_conf(); +ALTER SYSTEM SET citus.distributed_deadlock_detection_factor TO -1; +ALTER SYSTEM SET citus.recover_2pc_interval TO '1ms'; +SELECT pg_reload_conf(); +SELECT pg_sleep(0.1); +ALTER SYSTEM SET citus.recover_2pc_interval TO '-1'; +SELECT pg_reload_conf(); +\c - - - :worker_2_port +ALTER SYSTEM SET citus.max_cached_conns_per_worker TO 0; +SELECT pg_reload_conf(); +ALTER SYSTEM SET citus.distributed_deadlock_detection_factor TO -1; +ALTER SYSTEM SET citus.recover_2pc_interval TO '1ms'; +SELECT pg_reload_conf(); +SELECT pg_sleep(0.1); +ALTER SYSTEM SET citus.recover_2pc_interval TO '-1'; +SELECT pg_reload_conf(); + +\c - - - :master_port +SET search_path TO ensure_no_shared_connection_leak; + +-- ensure that we only have at most citus.max_cached_conns_per_worker +-- connections per node +select + (connection_count_to_node = 0) as no_connection_to_node +FROM + citus_remote_connection_stats() +WHERE + port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND + database_name = 'regression' +ORDER BY 1; + +-- now, ensure this from the workers perspective +-- we should only see the connection/backend that is running the command below +SELECT + result, success +FROM + run_command_on_workers($$select count(*) from pg_stat_activity WHERE backend_type = 'client backend';$$) +ORDER BY 1, 2; + + +-- in case other tests relies on these setting, reset them +ALTER SYSTEM RESET citus.distributed_deadlock_detection_factor; +ALTER SYSTEM RESET citus.recover_2pc_interval; +ALTER SYSTEM RESET citus.max_cached_conns_per_worker; +SELECT pg_reload_conf(); + +DROP SCHEMA ensure_no_shared_connection_leak CASCADE; diff --git a/src/test/regress/sql/node_conninfo_reload.sql b/src/test/regress/sql/node_conninfo_reload.sql index dda8dae21..3754130a7 100644 --- a/src/test/regress/sql/node_conninfo_reload.sql +++ b/src/test/regress/sql/node_conninfo_reload.sql @@ -10,7 +10,6 @@ select create_distributed_table('test', 'a'); -- Make sure a connection is opened and cached select count(*) from test where a = 0; - show citus.node_conninfo; -- Set sslmode to something that does not work when connecting @@ -30,8 +29,8 @@ show citus.node_conninfo; -- Should work again select count(*) from test where a = 0; - ALTER SYSTEM SET citus.node_conninfo = 'sslmode=doesnotexist'; + BEGIN; -- Should still work (no SIGHUP yet); select count(*) from test where a = 0; @@ -43,19 +42,15 @@ show citus.node_conninfo; -- query select count(*) from test where a = 0; COMMIT; - -- Should fail now with connection error, when transaction is finished select count(*) from test where a = 0; - -- Reset it again ALTER SYSTEM RESET citus.node_conninfo; select pg_reload_conf(); select pg_sleep(0.1); -- wait for config reload to apply show citus.node_conninfo; - -- Should work again select count(*) from 
test where a = 0; - ALTER SYSTEM SET citus.node_conninfo = 'sslmode=doesnotexist'; BEGIN; -- Should still work (no SIGHUP yet); @@ -68,10 +63,8 @@ show citus.node_conninfo; -- query select count(*) from test where a = 0; COMMIT; - -- Should fail now, when transaction is finished select count(*) from test where a = 0; - -- Reset it again ALTER SYSTEM RESET citus.node_conninfo; select pg_reload_conf(); diff --git a/src/test/regress/sql/shared_connection_stats.sql b/src/test/regress/sql/shared_connection_stats.sql new file mode 100644 index 000000000..8200e416d --- /dev/null +++ b/src/test/regress/sql/shared_connection_stats.sql @@ -0,0 +1,228 @@ +CREATE SCHEMA shared_connection_stats; +SET search_path TO shared_connection_stats; + +-- set the cached connections to zero +-- and execute a distributed query so that +-- we end up with zero cached connections afterwards +ALTER SYSTEM SET citus.max_cached_conns_per_worker TO 0; +SELECT pg_reload_conf(); + +-- disable deadlock detection and re-trigger 2PC recovery +-- once more when citus.max_cached_conns_per_worker is zero +-- so that we can be sure that the connections established for +-- maintanince daemon is closed properly. +-- this is to prevent random failures in the tests (otherwise, we +-- might see connections established for this operations) +ALTER SYSTEM SET citus.distributed_deadlock_detection_factor TO -1; +ALTER SYSTEM SET citus.recover_2pc_interval TO '1ms'; +SELECT pg_reload_conf(); +SELECT pg_sleep(0.1); + +-- now that last 2PC recovery is done, we're good to disable it +ALTER SYSTEM SET citus.recover_2pc_interval TO '1h'; +SELECT pg_reload_conf(); + +SET citus.shard_count TO 32; +SET citus.shard_replication_factor TO 1; +CREATE TABLE test (a int); +SELECT create_distributed_table('test', 'a'); +INSERT INTO test SELECT i FROM generate_series(0,100)i; + +-- show that no connections are cached +SELECT + connection_count_to_node +FROM + citus_remote_connection_stats() +WHERE + port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND + database_name = 'regression' +ORDER BY + hostname, port; + +-- single shard queries require single connection per node +BEGIN; + SELECT count(*) FROM test WHERE a = 1; + SELECT count(*) FROM test WHERE a = 2; + SELECT + connection_count_to_node + FROM + citus_remote_connection_stats() + WHERE + port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND + database_name = 'regression' + ORDER BY + hostname, port; +COMMIT; + +-- show that no connections are cached +SELECT + connection_count_to_node +FROM + citus_remote_connection_stats() +WHERE + port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND + database_name = 'regression' +ORDER BY + hostname, port; + +-- executor is only allowed to establish a single connection per node +BEGIN; + SET LOCAL citus.max_adaptive_executor_pool_size TO 1; + SELECT count(*) FROM test; + SELECT + connection_count_to_node + FROM + citus_remote_connection_stats() + WHERE + port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND + database_name = 'regression' + ORDER BY + hostname, port; +COMMIT; + +-- show that no connections are cached +SELECT + connection_count_to_node +FROM + citus_remote_connection_stats() +WHERE + port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND + database_name = 'regression' +ORDER BY + hostname, port; + +-- sequential mode is allowed to establish a single connection per node +BEGIN; + SET LOCAL citus.multi_shard_modify_mode TO 'sequential'; + SELECT count(*) FROM test; + SELECT + 
connection_count_to_node + FROM + citus_remote_connection_stats() + WHERE + port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND + database_name = 'regression' + ORDER BY + hostname, port; +COMMIT; + +-- show that no connections are cached +SELECT + connection_count_to_node +FROM + citus_remote_connection_stats() +WHERE + port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND + database_name = 'regression' +ORDER BY + hostname, port; + +-- now, decrease the shared pool size, and still force +-- one connection per placement +ALTER SYSTEM SET citus.max_shared_pool_size TO 5; +SELECT pg_reload_conf(); +SELECT pg_sleep(0.1); + +BEGIN; + SET LOCAL citus.node_connection_timeout TO 1000; + SET LOCAL citus.force_max_query_parallelization TO ON; + SELECT count(*) FROM test; +COMMIT; + +-- pg_sleep forces almost 1 connection per placement +-- now, some of the optional connections would be skipped, +-- and only 5 connections are used per node +BEGIN; + SELECT count(*), pg_sleep(0.1) FROM test; + SELECT + connection_count_to_node + FROM + citus_remote_connection_stats() + WHERE + port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND + database_name = 'regression' + ORDER BY + hostname, port; +COMMIT; + + +SHOW citus.max_shared_pool_size; + +-- by default max_shared_pool_size equals to max_connections; +ALTER SYSTEM RESET citus.max_shared_pool_size; +SELECT pg_reload_conf(); +SELECT pg_sleep(0.1); + +SHOW citus.max_shared_pool_size; +SHOW max_connections; + +-- now, each node gets 16 connections as we force 1 connection per placement +BEGIN; + SET LOCAL citus.force_max_query_parallelization TO ON; + SELECT count(*) FROM test; + SELECT + connection_count_to_node + FROM + citus_remote_connection_stats() + WHERE + port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND + database_name = 'regression' + ORDER BY + hostname, port; +COMMIT; + +BEGIN; + -- now allow at most 1 connection, and ensure that intermediate + -- results don't require any extra connections + SET LOCAL citus.max_adaptive_executor_pool_size TO 1; + SET LOCAL citus.task_assignment_policy TO "round-robin"; + SELECT cnt FROM (SELECT count(*) as cnt, random() FROM test LIMIT 1) as foo; + + -- queries with intermediate results don't use any extra connections + SELECT + connection_count_to_node + FROM + citus_remote_connection_stats() + WHERE + port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND + database_name = 'regression' + ORDER BY + hostname, port; +COMMIT; + + +-- now show that when max_cached_conns_per_worker > 1 +-- Citus forces the first execution to open at least 2 +-- connections that are cached. 
Later, those 2 cached +-- connections are used +BEGIN; + SET LOCAL citus.max_cached_conns_per_worker TO 2; + SELECT count(*) FROM test; + SELECT + connection_count_to_node >= 2 + FROM + citus_remote_connection_stats() + WHERE + port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND + database_name = 'regression' + ORDER BY + hostname, port; + SELECT count(*) FROM test; + SELECT + connection_count_to_node >= 2 + FROM + citus_remote_connection_stats() + WHERE + port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND + database_name = 'regression' + ORDER BY + hostname, port; +COMMIT; + +-- in case other tests rely on these settings, reset them +ALTER SYSTEM RESET citus.distributed_deadlock_detection_factor; +ALTER SYSTEM RESET citus.recover_2pc_interval; +ALTER SYSTEM RESET citus.max_cached_conns_per_worker; +SELECT pg_reload_conf(); + +DROP SCHEMA shared_connection_stats CASCADE;
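For readers who want to try the pool-size throttling outside the regression schedules, the sketch below mirrors the shared_connection_stats test above, using only settings and functions that appear in this patch (citus.max_shared_pool_size, citus.force_max_query_parallelization, citus_remote_connection_stats()). The table name demo_test, the INSERT, and the sleep duration are illustrative assumptions rather than part of the change:

-- cap the shared pool, run a force-parallelized query, and inspect the counters
ALTER SYSTEM SET citus.max_shared_pool_size TO 5;
SELECT pg_reload_conf();
SELECT pg_sleep(0.1); -- give the config reload a moment to apply

SET citus.shard_count TO 32;
SET citus.shard_replication_factor TO 1;
CREATE TABLE demo_test (a int); -- hypothetical table, not part of the patch
SELECT create_distributed_table('demo_test', 'a');
INSERT INTO demo_test SELECT i FROM generate_series(0, 100) i;

BEGIN;
    SET LOCAL citus.force_max_query_parallelization TO ON;
    SELECT count(*) FROM demo_test;
    -- with the cap in place, each worker is expected to report at most 5 connections
    SELECT hostname, port, connection_count_to_node
    FROM citus_remote_connection_stats()
    WHERE database_name = current_database()
    ORDER BY hostname, port;
COMMIT;

ALTER SYSTEM RESET citus.max_shared_pool_size;
SELECT pg_reload_conf();

This is the same pattern the shared_connection_waits isolation spec drives through set_max_shared_pool_size(): the pool is lowered to 5 while s1 holds its connections, then raised back to 100 before s2 runs its query.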