diff --git a/src/backend/distributed/connection/connection_management.c b/src/backend/distributed/connection/connection_management.c
index 6066ef4bf..6e39c8589 100644
--- a/src/backend/distributed/connection/connection_management.c
+++ b/src/backend/distributed/connection/connection_management.c
@@ -61,8 +61,8 @@ static MultiConnection * FindAvailableConnection(dlist_head *connections, uint32
 static void ErrorIfMultipleMetadataConnectionExists(dlist_head *connections);
 static void FreeConnParamsHashEntryFields(ConnParamsHashEntry *entry);
 static void AfterXactHostConnectionHandling(ConnectionHashEntry *entry, bool isCommit);
-static bool ShouldShutdownConnection(MultiConnection *connection, const int
-									 cachedConnectionCount);
+static bool ShouldShutdownConnection(MultiConnection *connection,
+									 const int cachedConnectionCount);
 static bool RemoteTransactionIdle(MultiConnection *connection);
 static int EventSetSizeForConnectionList(List *connections);
 
@@ -427,10 +427,14 @@ StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port,
 
 	ResetShardPlacementAssociation(connection);
 
-	if ((flags & REQUIRE_METADATA_CONNECTION))
+	if (flags & REQUIRE_METADATA_CONNECTION)
 	{
 		connection->useForMetadataOperations = true;
-	}
+	}
+	else if (flags & REQUIRE_MAINTENANCE_CONNECTION)
+	{
+		connection->useForMaintenanceOperations = true;
+	}
 
 	/* fully initialized the connection, record it */
 	connection->initializationState = POOL_STATE_INITIALIZED;
@@ -1194,7 +1198,10 @@ CitusPQFinish(MultiConnection *connection)
 	/* behave idempotently, there is no gurantee that CitusPQFinish() is called once */
 	if (connection->initializationState >= POOL_STATE_COUNTER_INCREMENTED)
 	{
-		DecrementSharedConnectionCounter(connection->hostname, connection->port);
+		int sharedCounterFlags = (connection->useForMaintenanceOperations)
+								 ? MAINTENANCE_CONNECTION
+								 : 0;
+		DecrementSharedConnectionCounter(sharedCounterFlags, connection->hostname, connection->port);
 		connection->initializationState = POOL_STATE_NOT_INITIALIZED;
 	}
 }
diff --git a/src/backend/distributed/connection/locally_reserved_shared_connections.c b/src/backend/distributed/connection/locally_reserved_shared_connections.c
index a2d2fac24..dc31ad080 100644
--- a/src/backend/distributed/connection/locally_reserved_shared_connections.c
+++ b/src/backend/distributed/connection/locally_reserved_shared_connections.c
@@ -240,7 +240,8 @@ DeallocateReservedConnections(void)
 			 * We have not used this reservation, make sure to clean-up from
 			 * the shared memory as well.
 			 */
-			DecrementSharedConnectionCounter(entry->key.hostname, entry->key.port);
+			int sharedCounterFlags = 0;
+			DecrementSharedConnectionCounter(sharedCounterFlags, entry->key.hostname, entry->key.port);
 
 			/* for completeness, set it to true */
 			entry->usedReservation = true;
diff --git a/src/backend/distributed/connection/shared_connection_stats.c b/src/backend/distributed/connection/shared_connection_stats.c
index 4ea94abdb..55df40680 100644
--- a/src/backend/distributed/connection/shared_connection_stats.c
+++ b/src/backend/distributed/connection/shared_connection_stats.c
@@ -84,7 +84,8 @@ typedef struct SharedWorkerNodeConnStatsHashEntry
 {
 	SharedWorkerNodeConnStatsHashKey key;
 
-	int count;
+	int regularConnectionsCount;
+	int maintenanceConnectionsCount;
 } SharedWorkerNodeConnStatsHashEntry;
 
 /* hash entry for per database on worker stats */
@@ -141,9 +142,7 @@ static uint32 SharedConnectionHashHash(const void *key, Size keysize);
 static int SharedConnectionHashCompare(const void *a, const void *b, Size keysize);
 static uint32 SharedWorkerNodeDatabaseHashHash(const void *key, Size keysize);
 static int SharedWorkerNodeDatabaseHashCompare(const void *a, const void *b, Size keysize);
-static bool IsConnectionToLocalNode(SharedWorkerNodeConnStatsHashKey *connKey);
-static bool isConnectionSlotAvailable(uint32 flags, SharedWorkerNodeConnStatsHashKey *connKey,
-									  const SharedWorkerNodeConnStatsHashEntry *connectionEntry);
+static bool isConnectionThrottlingDisabled(void);
 static bool IncrementSharedConnectionCounterInternal(uint32 externalFlags, bool checkLimits,
 													 const char *hostname, int port, Oid database);
 
@@ -152,7 +151,7 @@ static SharedWorkerNodeDatabaseConnStatsHashKey PrepareWorkerNodeDatabaseHashKey
 																				 int port,
 																				 Oid database);
 static void
-DecrementSharedConnectionCounterInternal(const char *hostname, int port);
+DecrementSharedConnectionCounterInternal(uint32 externalFlags, const char *hostname, int port);
 
 
 PG_FUNCTION_INFO_V1(citus_remote_connection_stats);
@@ -316,11 +315,10 @@ WaitLoopForSharedConnection(uint32 flags, const char *hostname, int port)
 bool
 TryToIncrementSharedConnectionCounter(uint32 flags, const char *hostname, int port)
 {
-	if (GetMaxSharedPoolSize() == DISABLE_CONNECTION_THROTTLING)
-	{
-		/* connection throttling disabled */
-		return true;
-	}
+	if (isConnectionThrottlingDisabled())
+	{
+		return true;
+	}
 
 	/*
 	 * The local session might already have some reserved connections to the given
@@ -334,7 +332,11 @@ TryToIncrementSharedConnectionCounter(uint32 flags, const char *hostname, int po
 		return true;
 	}
 
-	return IncrementSharedConnectionCounterInternal(flags, true, hostname, port, MyDatabaseId);
+	return IncrementSharedConnectionCounterInternal(flags,
+													true,
+													hostname,
+													port,
+													MyDatabaseId);
 }
 
 
@@ -345,13 +347,16 @@ TryToIncrementSharedConnectionCounter(uint32 flags, const char *hostname, int po
 void
 IncrementSharedConnectionCounter(uint32 flags, const char *hostname, int port)
 {
-	if (MaxSharedPoolSize == DISABLE_CONNECTION_THROTTLING)
+	if (isConnectionThrottlingDisabled())
 	{
-		/* connection throttling disabled */
 		return;
 	}
 
-	IncrementSharedConnectionCounterInternal(flags, false, hostname, port, MyDatabaseId);
+	IncrementSharedConnectionCounterInternal(flags,
+											 false,
+											 hostname,
+											 port,
+											 MyDatabaseId);
 }
 
 
@@ -392,7 +397,8 @@ IncrementSharedConnectionCounterInternal(uint32 externalFlags,
 	if (!workerNodeEntryFound)
 	{
 		/* we successfully allocated the entry for the first time, so initialize it */
-		workerNodeConnectionEntry->count = 0;
+		workerNodeConnectionEntry->regularConnectionsCount = 0;
+		workerNodeConnectionEntry->maintenanceConnectionsCount = 0;
 	}
 
 	/* Initialize SharedWorkerNodeDatabaseConnStatsHash the same way */
@@ -418,15 +424,57 @@ IncrementSharedConnectionCounterInternal(uint32 externalFlags,
 	/* Increment counter if a slot available */
 	bool connectionSlotAvailable = true;
-	connectionSlotAvailable =
-		!checkLimits ||
-		isConnectionSlotAvailable(externalFlags,
-								  &workerNodeKey,
-								  workerNodeConnectionEntry);
+
+	/* When GetSharedPoolSizeMaintenanceQuota() == 0, treat maintenance connections as regular */
+	bool maintenanceConnection = (GetSharedPoolSizeMaintenanceQuota() > 0 &&
+								  (externalFlags & MAINTENANCE_CONNECTION));
+	if (checkLimits)
+	{
+		WorkerNode *workerNode = FindWorkerNode(hostname, port);
+		bool connectionToLocalNode = workerNode && (workerNode->groupId == GetLocalGroupId());
+		int currentConnectionsLimit = connectionToLocalNode
+									  ? GetLocalSharedPoolSize()
+									  : GetMaxSharedPoolSize();
+		int maintenanceQuota = (int) ceil((double) currentConnectionsLimit * GetSharedPoolSizeMaintenanceQuota());
+		/* Connections limit should never go below 1 */
+		currentConnectionsLimit = Max(maintenanceConnection
+									  ? maintenanceQuota
+									  : currentConnectionsLimit - maintenanceQuota, 1);
+		int currentConnectionsCount = maintenanceConnection
+									  ? workerNodeConnectionEntry->maintenanceConnectionsCount
+									  : workerNodeConnectionEntry->regularConnectionsCount;
+		bool remoteNodeLimitExceeded = currentConnectionsCount + 1 > currentConnectionsLimit;
+
+		/*
+		 * For local nodes, solely relying on citus.max_shared_pool_size or
+		 * max_connections might not be sufficient. The former gives us
+		 * a preview of the future (e.g., we let the new connections to establish,
+		 * but they are not established yet). The latter gives us the close to
+		 * precise view of the past (e.g., the active number of client backends).
+		 *
+		 * Overall, we want to limit both of the metrics. The former limit typically
+		 * kicks in under regular loads, where the load of the database increases in
+		 * a reasonable pace. The latter limit typically kicks in when the database
+		 * is issued lots of concurrent sessions at the same time, such as benchmarks.
+		 */
+		bool localNodeLimitExceeded =
+			connectionToLocalNode &&
+			(GetLocalSharedPoolSize() == DISABLE_REMOTE_CONNECTIONS_FOR_LOCAL_QUERIES ||
+			 GetExternalClientBackendCount() + 1 > currentConnectionsLimit);
+		if (remoteNodeLimitExceeded || localNodeLimitExceeded)
+		{
+			connectionSlotAvailable = false;
+		}
+	}
 
 	if (connectionSlotAvailable)
 	{
-		workerNodeConnectionEntry->count += 1;
+		if (maintenanceConnection)
+		{
+			workerNodeConnectionEntry->maintenanceConnectionsCount += 1;
+		}
+		else
+		{
+			workerNodeConnectionEntry->regularConnectionsCount += 1;
+		}
 		workerNodeDatabaseEntry->count += 1;
 	}
 
@@ -435,86 +483,29 @@ IncrementSharedConnectionCounterInternal(uint32 externalFlags,
 	return connectionSlotAvailable;
 }
 
-static bool IsConnectionToLocalNode(SharedWorkerNodeConnStatsHashKey *connKey)
-{
-	WorkerNode *workerNode = FindWorkerNode(connKey->hostname, connKey->port);
-	return workerNode && (workerNode->groupId == GetLocalGroupId());
-}
-
-
-static bool isConnectionSlotAvailable(uint32 flags,
-									  SharedWorkerNodeConnStatsHashKey *connKey,
-									  const SharedWorkerNodeConnStatsHashEntry *connectionEntry)
-{
-	bool connectionSlotAvailable = true;
-	bool connectionToLocalNode = IsConnectionToLocalNode(connKey);
-
-	/*
-	 * Use full capacity for maintenance connections,
-	 */
-	int maintenanceConnectionsQuota =
-		(flags & MAINTENANCE_CONNECTION)
-		? 0
-		: (int) floor((double) GetMaxSharedPoolSize() * GetSharedPoolSizeMaintenanceQuota());
-	if (connectionToLocalNode)
-	{
-		bool remoteConnectionsForLocalQueriesDisabled =
-			GetLocalSharedPoolSize() == DISABLE_REMOTE_CONNECTIONS_FOR_LOCAL_QUERIES;
-
-		/*
-		 * For local nodes, solely relying on citus.max_shared_pool_size or
-		 * max_connections might not be sufficient. The former gives us
-		 * a preview of the future (e.g., we let the new connections to establish,
-		 * but they are not established yet). The latter gives us the close to
-		 * precise view of the past (e.g., the active number of client backends).
-		 *
-		 * Overall, we want to limit both of the metrics. The former limit typically
-		 * kicks in under regular loads, where the load of the database increases in
-		 * a reasonable pace. The latter limit typically kicks in when the database
-		 * is issued lots of concurrent sessions at the same time, such as benchmarks.
-		 */
-		bool localConnectionLimitExceeded =
-			GetExternalClientBackendCount() + 1 > GetLocalSharedPoolSize() ||
-			connectionEntry->count + 1 > GetLocalSharedPoolSize();
-		if (remoteConnectionsForLocalQueriesDisabled || localConnectionLimitExceeded)
-		{
-			connectionSlotAvailable = false;
-		}
-	}
-	else if (connectionEntry->count + 1 > (GetMaxSharedPoolSize() - maintenanceConnectionsQuota))
-	{
-		connectionSlotAvailable = false;
-	}
-
-	return connectionSlotAvailable;
-}
-
-
 /*
  * DecrementSharedConnectionCounter decrements the shared counter
  * for the given hostname and port for the given count.
  */
 void
-DecrementSharedConnectionCounter(const char *hostname, int port)
+DecrementSharedConnectionCounter(uint32 externalFlags, const char *hostname, int port)
 {
-	/*
-	 * Do not call GetMaxSharedPoolSize() here, since it may read from
-	 * the catalog and we may be in the process exit handler.
-	 */
-	if (MaxSharedPoolSize == DISABLE_CONNECTION_THROTTLING)
+	if (isConnectionThrottlingDisabled())
 	{
-		/* connection throttling disabled */
 		return;
 	}
 
 	LockConnectionSharedMemory(LW_EXCLUSIVE);
 
-	DecrementSharedConnectionCounterInternal(hostname, port);
+	DecrementSharedConnectionCounterInternal(externalFlags, hostname, port);
 
 	UnLockConnectionSharedMemory();
 	WakeupWaiterBackendsForSharedConnection();
 }
 
 
 static void
-DecrementSharedConnectionCounterInternal(const char *hostname, int port)
+DecrementSharedConnectionCounterInternal(uint32 externalFlags, const char *hostname, int port)
 {
 	bool workerNodeEntryFound = false;
 	SharedWorkerNodeConnStatsHashKey workerNodeKey = PrepareWorkerNodeHashKey(hostname, port);
@@ -530,10 +521,17 @@ DecrementSharedConnectionCounterInternal(const char *hostname, int port)
 	}
 
 	/* we should never go below 0 */
-	Assert(workerNodeEntry->count > 0);
+	/* When GetSharedPoolSizeMaintenanceQuota() == 0, treat maintenance connections as regular */
+	bool maintenanceConnection = (GetSharedPoolSizeMaintenanceQuota() > 0 && (externalFlags & MAINTENANCE_CONNECTION));
+	Assert((maintenanceConnection ? workerNodeEntry->maintenanceConnectionsCount : workerNodeEntry->regularConnectionsCount) > 0);
 
-
-	workerNodeEntry->count -= 1;
+	if (maintenanceConnection)
+	{
+		workerNodeEntry->maintenanceConnectionsCount -= 1;
+	}
+	else
+	{
+		workerNodeEntry->regularConnectionsCount -= 1;
+	}
 
 	/*
@@ -543,7 +541,7 @@ DecrementSharedConnectionCounterInternal(const char *hostname, int port)
 	 * not busy, and given the default value of MaxCachedConnectionsPerWorker = 1,
 	 * we're unlikely to trigger this often.
 	 */
-	if (workerNodeEntry->count == 0)
+	if (workerNodeEntry->regularConnectionsCount == 0 && workerNodeEntry->maintenanceConnectionsCount == 0)
 	{
 		hash_search(SharedWorkerNodeConnStatsHash, &workerNodeKey, HASH_REMOVE, NULL);
 	}
@@ -920,3 +918,12 @@ SharedWorkerNodeDatabaseHashCompare(const void *a, const void *b, Size keysize)
 		   ca->workerNodeKey.port != cb->workerNodeKey.port ||
 		   ca->database != cb->database;
 }
+
+static bool isConnectionThrottlingDisabled(void)
+{
+	/*
+	 * Do not call Get*PoolSize() functions here, since it may read from
+	 * the catalog and we may be in the process exit handler.
+	 */
+	return MaxSharedPoolSize == DISABLE_CONNECTION_THROTTLING;
+}
diff --git a/src/backend/distributed/operations/worker_node_manager.c b/src/backend/distributed/operations/worker_node_manager.c
index 594e9f23f..32eb28b94 100644
--- a/src/backend/distributed/operations/worker_node_manager.c
+++ b/src/backend/distributed/operations/worker_node_manager.c
@@ -39,7 +39,7 @@
 /* Config variables managed via guc.c */
 char *WorkerListFileName;
 int MaxWorkerNodesTracked = 2048;    /* determines worker node hash table size */
-int DatabasesPerWorker = 10;  /* determine database per worker hash table size */
+int DatabasesPerWorker = 1;  /* determine database per worker hash table size */
 
 
 /* Local functions forward declarations */
diff --git a/src/backend/distributed/transaction/transaction_recovery.c b/src/backend/distributed/transaction/transaction_recovery.c
index 653b962db..bc960796d 100644
--- a/src/backend/distributed/transaction/transaction_recovery.c
+++ b/src/backend/distributed/transaction/transaction_recovery.c
@@ -159,7 +159,7 @@ RecoverWorkerTransactions(WorkerNode *workerNode)
 
 	bool recoveryFailed = false;
 
-	int connectionFlags = 0;
+	int connectionFlags = WAIT_FOR_CONNECTION | REQUIRE_MAINTENANCE_CONNECTION;
 	MultiConnection *connection = GetNodeConnection(connectionFlags, nodeName, nodePort);
 	if (connection->pgConn == NULL || PQstatus(connection->pgConn) != CONNECTION_OK)
 	{
diff --git a/src/include/distributed/connection_management.h b/src/include/distributed/connection_management.h
index 487b5f12f..91e1e9222 100644
--- a/src/include/distributed/connection_management.h
+++ b/src/include/distributed/connection_management.h
@@ -127,8 +127,8 @@ enum MultiConnectionMode
 
 	/*
 	 * This flag specifies that connection is required for maintenance operations, e.g.
-	 * transaction recovery, distributed deadlock detection. Such connections may have
-	 * special treatment, like dedicated share of pool, etc.
+	 * transaction recovery, distributed deadlock detection. Such connections have
+	 * a reserved quota of the MaxSharedPoolSize.
 	 */
 	REQUIRE_MAINTENANCE_CONNECTION = 1 << 9
 };
@@ -230,6 +230,9 @@ typedef struct MultiConnection
 	/* replication option */
 	bool requiresReplication;
 
+	/* See REQUIRE_MAINTENANCE_CONNECTION */
+	bool useForMaintenanceOperations;
+
 	MultiConnectionStructInitializationState initializationState;
 } MultiConnection;
 
diff --git a/src/include/distributed/shared_connection_stats.h b/src/include/distributed/shared_connection_stats.h
index 33e1e31b4..817f80b54 100644
--- a/src/include/distributed/shared_connection_stats.h
+++ b/src/include/distributed/shared_connection_stats.h
@@ -41,7 +41,7 @@ extern double GetSharedPoolSizeMaintenanceQuota(void);
 extern int GetLocalSharedPoolSize(void);
 extern bool TryToIncrementSharedConnectionCounter(uint32 flags, const char *hostname, int port);
 extern void WaitLoopForSharedConnection(uint32 flags, const char *hostname, int port);
-extern void DecrementSharedConnectionCounter(const char *hostname, int port);
+extern void DecrementSharedConnectionCounter(uint32 externalFlags, const char *hostname, int port);
 extern void IncrementSharedConnectionCounter(uint32 flags, const char *hostname, int port);
 extern int AdaptiveConnectionManagementFlag(bool connectToLocalNode, int activeConnectionCount);
 
diff --git a/src/test/regress/expected/shared_connection_stats.out b/src/test/regress/expected/shared_connection_stats.out
index 0ce22548f..91d35db5d 100644
--- a/src/test/regress/expected/shared_connection_stats.out
+++ b/src/test/regress/expected/shared_connection_stats.out
@@ -224,7 +224,7 @@ BEGIN;
 COMMIT;
 -- pg_sleep forces almost 1 connection per placement
 -- now, some of the optional connections would be skipped,
--- and only 5 connections are used per node
+-- and only 4 connections (5 minus the maintenance quota) are used per node
 BEGIN;
 SET LOCAL citus.max_adaptive_executor_pool_size TO 16;
 with cte_1 as (select pg_sleep(0.1) is null, a from test) SELECT a from cte_1 ORDER By 1 LIMIT 1;
@@ -244,8 +244,8 @@ BEGIN;
 	hostname, port;
  connection_count_to_node 
---------------------------------------------------------------------
-                        5
-                        5
+                        4
+                        4
 (2 rows)
 
 COMMIT;
@@ -382,8 +382,8 @@ COPY test FROM PROGRAM 'seq 32';
 	hostname, port;
  connection_count_to_node 
---------------------------------------------------------------------
-                        3
-                        3
+                        2
+                        2
 (2 rows)
 
 ROLLBACK;
@@ -404,7 +404,7 @@ BEGIN;
 	hostname, port;
  connection_count_to_node 
---------------------------------------------------------------------
-                        3
+                        2
                         1
 (2 rows)
 
@@ -423,7 +423,7 @@ COPY test FROM STDIN;
 	hostname, port;
  connection_count_to_node 
---------------------------------------------------------------------
-                        3
+                        2
                         1
 (2 rows)
 
@@ -450,7 +450,7 @@ BEGIN;
 	hostname, port;
  connection_count_to_node 
---------------------------------------------------------------------
-                        3
+                        2
 (1 row)
 
 -- in this second COPY, we access the same node but different shards
@@ -468,7 +468,7 @@ COPY test FROM STDIN;
 	hostname, port;
  connection_count_to_node 
---------------------------------------------------------------------
-                        3
+                        2
                         1
 (2 rows)
 
diff --git a/src/test/regress/sql/shared_connection_stats.sql b/src/test/regress/sql/shared_connection_stats.sql
index 7c653e788..7c040af5c 100644
--- a/src/test/regress/sql/shared_connection_stats.sql
+++ b/src/test/regress/sql/shared_connection_stats.sql
@@ -146,7 +146,7 @@ COMMIT;
 -- pg_sleep forces almost 1 connection per placement
 -- now, some of the optional connections would be skipped,
--- and only 5 connections are used per node
+-- and only 4 connections (5 minus the maintenance quota) are used per node
 BEGIN;
 SET LOCAL citus.max_adaptive_executor_pool_size TO 16;
 with cte_1 as (select pg_sleep(0.1) is null, a from test) SELECT a from cte_1 ORDER By 1 LIMIT 1;