Adapt implementation to changing MaintenanceQuota on a fly

pull/7286/head
ivyazmitinov 2023-10-29 01:40:38 +07:00
parent 481aa99205
commit b3bfca9ac6
1 changed files with 21 additions and 10 deletions

View File

@ -425,8 +425,7 @@ IncrementSharedConnectionCounterInternal(uint32 externalFlags,
/* Increment counter if a slot available */ /* Increment counter if a slot available */
bool connectionSlotAvailable = true; bool connectionSlotAvailable = true;
/* When GetSharedPoolSizeMaintenanceQuota() == 0, treat maintenance connections as regular */ bool maintenanceConnection = externalFlags & MAINTENANCE_CONNECTION;
bool maintenanceConnection = (GetSharedPoolSizeMaintenanceQuota() > 0 && (externalFlags & MAINTENANCE_CONNECTION));
if (checkLimits) if (checkLimits)
{ {
WorkerNode *workerNode = FindWorkerNode(hostname, port); WorkerNode *workerNode = FindWorkerNode(hostname, port);
@ -434,15 +433,28 @@ IncrementSharedConnectionCounterInternal(uint32 externalFlags,
int currentConnectionsLimit = connectionToLocalNode int currentConnectionsLimit = connectionToLocalNode
? GetLocalSharedPoolSize() ? GetLocalSharedPoolSize()
: GetMaxSharedPoolSize(); : GetMaxSharedPoolSize();
int currentConnectionsCount;
if (GetSharedPoolSizeMaintenanceQuota() > 0)
{
int maintenanceQuota = (int) ceil((double) currentConnectionsLimit * GetSharedPoolSizeMaintenanceQuota()); int maintenanceQuota = (int) ceil((double) currentConnectionsLimit * GetSharedPoolSizeMaintenanceQuota());
/* Connections limit should never go below 1 */ /* Connections limit should never go below 1 */
currentConnectionsLimit = Max(maintenanceConnection currentConnectionsLimit = Max(maintenanceConnection
? maintenanceQuota ? maintenanceQuota
: currentConnectionsLimit - maintenanceQuota, 1); : currentConnectionsLimit - maintenanceQuota, 1);
int currentConnectionsCount = maintenanceConnection currentConnectionsCount = maintenanceConnection
? workerNodeConnectionEntry->maintenanceConnectionsCount ? workerNodeConnectionEntry->maintenanceConnectionsCount
: workerNodeConnectionEntry->regularConnectionsCount; : workerNodeConnectionEntry->regularConnectionsCount;
}
else
{
/* When maintenance quota disabled, all connections treated equally*/
currentConnectionsCount = (workerNodeConnectionEntry->maintenanceConnectionsCount +
workerNodeConnectionEntry->regularConnectionsCount);
}
bool remoteNodeLimitExceeded = currentConnectionsCount + 1 > currentConnectionsLimit; bool remoteNodeLimitExceeded = currentConnectionsCount + 1 > currentConnectionsLimit;
/* /*
* For local nodes, solely relying on citus.max_shared_pool_size or * For local nodes, solely relying on citus.max_shared_pool_size or
* max_connections might not be sufficient. The former gives us * max_connections might not be sufficient. The former gives us
@ -543,8 +555,7 @@ DecrementSharedConnectionCounterInternal(uint32 externalFlags,
Assert(workerNodeConnectionEntry->regularConnectionsCount > 0 || Assert(workerNodeConnectionEntry->regularConnectionsCount > 0 ||
workerNodeConnectionEntry->maintenanceConnectionsCount > 0); workerNodeConnectionEntry->maintenanceConnectionsCount > 0);
/* When GetSharedPoolSizeMaintenanceQuota() == 0, treat maintenance connections as regular */ if (externalFlags & MAINTENANCE_CONNECTION)
if ((GetSharedPoolSizeMaintenanceQuota() > 0 && (externalFlags & MAINTENANCE_CONNECTION)))
{ {
workerNodeConnectionEntry->maintenanceConnectionsCount -= 1; workerNodeConnectionEntry->maintenanceConnectionsCount -= 1;
} }