Implementation of a dedicated maintenance quota

pull/7286/head
ivyazmitinov 2023-10-17 21:08:57 +07:00
parent 4d775ab361
commit 76d10cc413
9 changed files with 125 additions and 107 deletions

View File

@@ -61,8 +61,8 @@ static MultiConnection * FindAvailableConnection(dlist_head *connections, uint32
 static void ErrorIfMultipleMetadataConnectionExists(dlist_head *connections);
 static void FreeConnParamsHashEntryFields(ConnParamsHashEntry *entry);
 static void AfterXactHostConnectionHandling(ConnectionHashEntry *entry, bool isCommit);
-static bool ShouldShutdownConnection(MultiConnection *connection, const int
-									 cachedConnectionCount);
+static bool ShouldShutdownConnection(MultiConnection *connection,
+									 const int cachedConnectionCount);
 static bool RemoteTransactionIdle(MultiConnection *connection);
 static int EventSetSizeForConnectionList(List *connections);
@@ -427,10 +427,14 @@ StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port,
 	ResetShardPlacementAssociation(connection);
 
-	if ((flags & REQUIRE_METADATA_CONNECTION))
+	if (flags & REQUIRE_METADATA_CONNECTION)
 	{
 		connection->useForMetadataOperations = true;
 	}
+	else if (flags & REQUIRE_MAINTENANCE_CONNECTION)
+	{
+		connection->useForMaintenanceOperations = true;
+	}
 
 	/* fully initialized the connection, record it */
 	connection->initializationState = POOL_STATE_INITIALIZED;
@@ -1194,7 +1198,10 @@ CitusPQFinish(MultiConnection *connection)
 	/* behave idempotently, there is no gurantee that CitusPQFinish() is called once */
 	if (connection->initializationState >= POOL_STATE_COUNTER_INCREMENTED)
 	{
-		DecrementSharedConnectionCounter(connection->hostname, connection->port);
+		int sharedCounterFlags = (connection->useForMaintenanceOperations)
+								 ? MAINTENANCE_CONNECTION
+								 : 0;
+		DecrementSharedConnectionCounter(sharedCounterFlags, connection->hostname, connection->port);
 		connection->initializationState = POOL_STATE_NOT_INITIALIZED;
 	}
 }

View File

@@ -240,7 +240,8 @@ DeallocateReservedConnections(void)
 			 * We have not used this reservation, make sure to clean-up from
 			 * the shared memory as well.
 			 */
-			DecrementSharedConnectionCounter(entry->key.hostname, entry->key.port);
+			int sharedCounterFlags = 0;
+			DecrementSharedConnectionCounter(sharedCounterFlags, entry->key.hostname, entry->key.port);
 
 			/* for completeness, set it to true */
 			entry->usedReservation = true;

View File

@@ -84,7 +84,8 @@ typedef struct SharedWorkerNodeConnStatsHashEntry
 {
 	SharedWorkerNodeConnStatsHashKey key;
 
-	int count;
+	int regularConnectionsCount;
+	int maintenanceConnectionsCount;
 } SharedWorkerNodeConnStatsHashEntry;
 
 /* hash entry for per database on worker stats */
@@ -141,9 +142,7 @@ static uint32 SharedConnectionHashHash(const void *key, Size keysize);
 static int SharedConnectionHashCompare(const void *a, const void *b, Size keysize);
 static uint32 SharedWorkerNodeDatabaseHashHash(const void *key, Size keysize);
 static int SharedWorkerNodeDatabaseHashCompare(const void *a, const void *b, Size keysize);
-static bool IsConnectionToLocalNode(SharedWorkerNodeConnStatsHashKey *connKey);
-static bool isConnectionSlotAvailable(uint32 flags, SharedWorkerNodeConnStatsHashKey *connKey,
-									  const SharedWorkerNodeConnStatsHashEntry *connectionEntry);
+static bool isConnectionThrottlingDisabled();
 static bool
 IncrementSharedConnectionCounterInternal(uint32 externalFlags, bool checkLimits, const char *hostname, int port,
 										 Oid database);
@@ -152,7 +151,7 @@ static SharedWorkerNodeDatabaseConnStatsHashKey PrepareWorkerNodeDatabaseHashKey
 																				  int port,
 																				  Oid database);
 static void
-DecrementSharedConnectionCounterInternal(const char *hostname, int port);
+DecrementSharedConnectionCounterInternal(uint32 externalFlags, const char *hostname, int port);
 
 PG_FUNCTION_INFO_V1(citus_remote_connection_stats);
@@ -316,11 +315,10 @@ WaitLoopForSharedConnection(uint32 flags, const char *hostname, int port)
 bool
 TryToIncrementSharedConnectionCounter(uint32 flags, const char *hostname, int port)
 {
-	if (GetMaxSharedPoolSize() == DISABLE_CONNECTION_THROTTLING)
+	if (isConnectionThrottlingDisabled())
 	{
-		/* connection throttling disabled */
 		return true;
 	}
 
 	/*
 	 * The local session might already have some reserved connections to the given
@@ -334,7 +332,11 @@ TryToIncrementSharedConnectionCounter(uint32 flags, const char *hostname, int po
 		return true;
 	}
 
-	return IncrementSharedConnectionCounterInternal(flags, true, hostname, port, MyDatabaseId);
+	return IncrementSharedConnectionCounterInternal(flags,
+													true,
+													hostname,
+													port,
+													MyDatabaseId);
 }
@@ -345,13 +347,16 @@ TryToIncrementSharedConnectionCounter(uint32 flags, const char *hostname, int po
 void
 IncrementSharedConnectionCounter(uint32 flags, const char *hostname, int port)
 {
-	if (MaxSharedPoolSize == DISABLE_CONNECTION_THROTTLING)
+	if (isConnectionThrottlingDisabled())
 	{
-		/* connection throttling disabled */
 		return;
 	}
 
-	IncrementSharedConnectionCounterInternal(flags, false, hostname, port, MyDatabaseId);
+	IncrementSharedConnectionCounterInternal(flags,
+											 false,
+											 hostname,
+											 port,
+											 MyDatabaseId);
 }
@@ -392,7 +397,8 @@ IncrementSharedConnectionCounterInternal(uint32 externalFlags,
 	if (!workerNodeEntryFound)
 	{
 		/* we successfully allocated the entry for the first time, so initialize it */
-		workerNodeConnectionEntry->count = 0;
+		workerNodeConnectionEntry->regularConnectionsCount = 0;
+		workerNodeConnectionEntry->maintenanceConnectionsCount = 0;
 	}
 
 	/* Initialize SharedWorkerNodeDatabaseConnStatsHash the same way */
@@ -418,15 +424,57 @@ IncrementSharedConnectionCounterInternal(uint32 externalFlags,
 	/* Increment counter if a slot available */
 	bool connectionSlotAvailable = true;
-	connectionSlotAvailable =
-		!checkLimits ||
-		isConnectionSlotAvailable(externalFlags,
-								  &workerNodeKey,
-								  workerNodeConnectionEntry);
+
+	/* When GetSharedPoolSizeMaintenanceQuota() == 0, treat maintenance connections as regular */
+	bool maintenanceConnection = (GetSharedPoolSizeMaintenanceQuota() > 0 && (externalFlags & MAINTENANCE_CONNECTION));
+	if (checkLimits)
+	{
+		WorkerNode *workerNode = FindWorkerNode(hostname, port);
+		bool connectionToLocalNode = workerNode && (workerNode->groupId == GetLocalGroupId());
+		int currentConnectionsLimit = connectionToLocalNode
+									  ? GetLocalSharedPoolSize()
+									  : GetMaxSharedPoolSize();
+		int maintenanceQuota = (int) ceil((double) currentConnectionsLimit * GetSharedPoolSizeMaintenanceQuota());
+
+		/* Connections limit should never go below 1 */
+		currentConnectionsLimit = Max(maintenanceConnection
+									  ? maintenanceQuota
+									  : currentConnectionsLimit - maintenanceQuota, 1);
+		int currentConnectionsCount = maintenanceConnection
+									  ? workerNodeConnectionEntry->maintenanceConnectionsCount
+									  : workerNodeConnectionEntry->regularConnectionsCount;
+		bool remoteNodeLimitExceeded = currentConnectionsCount + 1 > currentConnectionsLimit;
+
+		/*
+		 * For local nodes, solely relying on citus.max_shared_pool_size or
+		 * max_connections might not be sufficient. The former gives us
+		 * a preview of the future (e.g., we let the new connections to establish,
+		 * but they are not established yet). The latter gives us the close to
+		 * precise view of the past (e.g., the active number of client backends).
+		 *
+		 * Overall, we want to limit both of the metrics. The former limit typically
+		 * kicks in under regular loads, where the load of the database increases in
+		 * a reasonable pace. The latter limit typically kicks in when the database
+		 * is issued lots of concurrent sessions at the same time, such as benchmarks.
+		 */
+		bool localNodeLimitExceeded =
+			connectionToLocalNode &&
+			(GetLocalSharedPoolSize() == DISABLE_REMOTE_CONNECTIONS_FOR_LOCAL_QUERIES ||
+			 GetExternalClientBackendCount() + 1 > currentConnectionsLimit);
+		if (remoteNodeLimitExceeded || localNodeLimitExceeded)
+		{
+			connectionSlotAvailable = false;
+		}
+	}
 
 	if (connectionSlotAvailable)
 	{
-		workerNodeConnectionEntry->count += 1;
+		if (maintenanceConnection)
+		{
+			workerNodeConnectionEntry->maintenanceConnectionsCount += 1;
+		}
+		else
+		{
+			workerNodeConnectionEntry->regularConnectionsCount += 1;
+		}
+
 		workerNodeDatabaseEntry->count += 1;
 	}
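For reference, a minimal standalone sketch of the quota arithmetic the new checkLimits branch performs. The pool size of 100 and the 0.1 quota are hypothetical example values, not values taken from this commit; in Citus they would come from GetMaxSharedPoolSize() / GetLocalSharedPoolSize() and GetSharedPoolSizeMaintenanceQuota().

	#include <math.h>
	#include <stdio.h>

	/* Hypothetical example values, assumed for illustration only. */
	static const int poolSize = 100;
	static const double maintenanceQuotaFraction = 0.1;

	int
	main(void)
	{
		/* slots reserved for maintenance connections, rounded up */
		int maintenanceQuota = (int) ceil((double) poolSize * maintenanceQuotaFraction);

		/* per-category limits; neither is allowed to drop below 1 */
		int maintenanceLimit = maintenanceQuota < 1 ? 1 : maintenanceQuota;
		int regularLimit = (poolSize - maintenanceQuota) < 1 ? 1 : (poolSize - maintenanceQuota);

		/* prints "maintenance limit: 10, regular limit: 90" */
		printf("maintenance limit: %d, regular limit: %d\n", maintenanceLimit, regularLimit);
		return 0;
	}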
@@ -435,86 +483,29 @@ IncrementSharedConnectionCounterInternal(uint32 externalFlags,
 	return connectionSlotAvailable;
 }
 
-static bool IsConnectionToLocalNode(SharedWorkerNodeConnStatsHashKey *connKey)
-{
-	WorkerNode *workerNode = FindWorkerNode(connKey->hostname, connKey->port);
-	return workerNode && (workerNode->groupId == GetLocalGroupId());
-}
-
-static bool isConnectionSlotAvailable(uint32 flags,
-									  SharedWorkerNodeConnStatsHashKey *connKey,
-									  const SharedWorkerNodeConnStatsHashEntry *connectionEntry)
-{
-	bool connectionSlotAvailable = true;
-	bool connectionToLocalNode = IsConnectionToLocalNode(connKey);
-
-	/*
-	 * Use full capacity for maintenance connections,
-	 */
-	int maintenanceConnectionsQuota =
-		(flags & MAINTENANCE_CONNECTION)
-		? 0
-		: (int) floor((double) GetMaxSharedPoolSize() * GetSharedPoolSizeMaintenanceQuota());
-
-	if (connectionToLocalNode)
-	{
-		bool remoteConnectionsForLocalQueriesDisabled =
-			GetLocalSharedPoolSize() == DISABLE_REMOTE_CONNECTIONS_FOR_LOCAL_QUERIES;
-
-		/*
-		 * For local nodes, solely relying on citus.max_shared_pool_size or
-		 * max_connections might not be sufficient. The former gives us
-		 * a preview of the future (e.g., we let the new connections to establish,
-		 * but they are not established yet). The latter gives us the close to
-		 * precise view of the past (e.g., the active number of client backends).
-		 *
-		 * Overall, we want to limit both of the metrics. The former limit typically
-		 * kicks in under regular loads, where the load of the database increases in
-		 * a reasonable pace. The latter limit typically kicks in when the database
-		 * is issued lots of concurrent sessions at the same time, such as benchmarks.
-		 */
-		bool localConnectionLimitExceeded =
-			GetExternalClientBackendCount() + 1 > GetLocalSharedPoolSize() ||
-			connectionEntry->count + 1 > GetLocalSharedPoolSize();
-		if (remoteConnectionsForLocalQueriesDisabled || localConnectionLimitExceeded)
-		{
-			connectionSlotAvailable = false;
-		}
-	}
-	else if (connectionEntry->count + 1 > (GetMaxSharedPoolSize() - maintenanceConnectionsQuota))
-	{
-		connectionSlotAvailable = false;
-	}
-
-	return connectionSlotAvailable;
-}
-
 /*
  * DecrementSharedConnectionCounter decrements the shared counter
  * for the given hostname and port for the given count.
  */
 void
-DecrementSharedConnectionCounter(const char *hostname, int port)
+DecrementSharedConnectionCounter(uint32 externalFlags, const char *hostname, int port)
 {
-	/*
-	 * Do not call GetMaxSharedPoolSize() here, since it may read from
-	 * the catalog and we may be in the process exit handler.
-	 */
-	if (MaxSharedPoolSize == DISABLE_CONNECTION_THROTTLING)
+	if (isConnectionThrottlingDisabled())
 	{
-		/* connection throttling disabled */
 		return;
 	}
 
 	LockConnectionSharedMemory(LW_EXCLUSIVE);
 
-	DecrementSharedConnectionCounterInternal(hostname, port);
+	DecrementSharedConnectionCounterInternal(externalFlags, hostname, port);
 
 	UnLockConnectionSharedMemory();
 	WakeupWaiterBackendsForSharedConnection();
 }
 
 static void
-DecrementSharedConnectionCounterInternal(const char *hostname, int port)
+DecrementSharedConnectionCounterInternal(uint32 externalFlags, const char *hostname, int port)
 {
 	bool workerNodeEntryFound = false;
 	SharedWorkerNodeConnStatsHashKey workerNodeKey = PrepareWorkerNodeHashKey(hostname, port);
@@ -530,10 +521,17 @@ DecrementSharedConnectionCounterInternal(const char *hostname, int port)
 	}
 
 	/* we should never go below 0 */
-	Assert(workerNodeEntry->count > 0);
+	Assert(workerNodeEntry->regularConnectionsCount > 0 || workerNodeEntry->maintenanceConnectionsCount > 0);
 
-	workerNodeEntry->count -= 1;
+	/* When GetSharedPoolSizeMaintenanceQuota() == 0, treat maintenance connections as regular */
+	if ((GetSharedPoolSizeMaintenanceQuota() > 0 && (externalFlags & MAINTENANCE_CONNECTION)))
+	{
+		workerNodeEntry->maintenanceConnectionsCount -= 1;
+	}
+	else
+	{
+		workerNodeEntry->regularConnectionsCount -= 1;
+	}
 
 	/*
@@ -543,7 +541,7 @@ DecrementSharedConnectionCounterInternal(const char *hostname, int port)
 	 * not busy, and given the default value of MaxCachedConnectionsPerWorker = 1,
 	 * we're unlikely to trigger this often.
 	 */
-	if (workerNodeEntry->count == 0)
+	if (workerNodeEntry->regularConnectionsCount == 0 && workerNodeEntry->maintenanceConnectionsCount == 0)
 	{
 		hash_search(SharedWorkerNodeConnStatsHash, &workerNodeKey, HASH_REMOVE, NULL);
 	}
@@ -920,3 +918,12 @@ SharedWorkerNodeDatabaseHashCompare(const void *a, const void *b, Size keysize)
 		   ca->workerNodeKey.port != cb->workerNodeKey.port ||
 		   ca->database != cb->database;
 }
+
+static bool isConnectionThrottlingDisabled()
+{
+	/*
+	 * Do not call Get*PoolSize() functions here, since it may read from
+	 * the catalog and we may be in the process exit handler.
+	 */
+	return MaxSharedPoolSize == DISABLE_CONNECTION_THROTTLING;
+}

View File

@@ -39,7 +39,7 @@
 /* Config variables managed via guc.c */
 char *WorkerListFileName;
 int MaxWorkerNodesTracked = 2048;    /* determines worker node hash table size */
-int DatabasesPerWorker = 10;         /* determine database per worker hash table size */
+int DatabasesPerWorker = 1;          /* determine database per worker hash table size */
 
 /* Local functions forward declarations */

View File

@@ -159,7 +159,7 @@ RecoverWorkerTransactions(WorkerNode *workerNode)
 	bool recoveryFailed = false;
 
-	int connectionFlags = 0;
+	int connectionFlags = WAIT_FOR_CONNECTION | REQUIRE_MAINTENANCE_CONNECTION;
 	MultiConnection *connection = GetNodeConnection(connectionFlags, nodeName, nodePort);
 
 	if (connection->pgConn == NULL || PQstatus(connection->pgConn) != CONNECTION_OK)
 	{

View File

@@ -127,8 +127,8 @@ enum MultiConnectionMode
 	/*
 	 * This flag specifies that connection is required for maintenance operations, e.g.
-	 * transaction recovery, distributed deadlock detection. Such connections may have
-	 * special treatment, like dedicated share of pool, etc.
+	 * transaction recovery, distributed deadlock detection. Such connections have
+	 * a reserved quota of the MaxSharedPoolSize.
 	 */
 	REQUIRE_MAINTENANCE_CONNECTION = 1 << 9
 };
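A maintenance code path opts into this reserved quota by passing the new flag when it opens its connection, as the transaction recovery change above does. A minimal sketch of such a call site (a non-compilable fragment; the surrounding variables and error handling are illustrative only, the flag names are from this commit):

	/* request a connection that is accounted against the maintenance quota */
	int connectionFlags = WAIT_FOR_CONNECTION | REQUIRE_MAINTENANCE_CONNECTION;
	MultiConnection *connection = GetNodeConnection(connectionFlags, nodeName, nodePort);

	if (connection->pgConn == NULL || PQstatus(connection->pgConn) != CONNECTION_OK)
	{
		/* no maintenance slot, or the node is unreachable; back off and retry later */
	}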
@@ -230,6 +230,9 @@ typedef struct MultiConnection
 	/* replication option */
 	bool requiresReplication;
 
+	/* See REQUIRE_MAINTENANCE_CONNECTION */
+	bool useForMaintenanceOperations;
+
 	MultiConnectionStructInitializationState initializationState;
 } MultiConnection;

View File

@@ -41,7 +41,7 @@ extern double GetSharedPoolSizeMaintenanceQuota(void);
 extern int GetLocalSharedPoolSize(void);
 extern bool TryToIncrementSharedConnectionCounter(uint32 flags, const char *hostname, int port);
 extern void WaitLoopForSharedConnection(uint32 flags, const char *hostname, int port);
-extern void DecrementSharedConnectionCounter(const char *hostname, int port);
+extern void DecrementSharedConnectionCounter(uint32 externalFlags, const char *hostname, int port);
 extern void IncrementSharedConnectionCounter(uint32 flags, const char *hostname, int port);
 extern int AdaptiveConnectionManagementFlag(bool connectToLocalNode, int
 											activeConnectionCount);

View File

@@ -224,7 +224,7 @@ BEGIN;
 COMMIT;
 
 -- pg_sleep forces almost 1 connection per placement
 -- now, some of the optional connections would be skipped,
--- and only 5 connections are used per node
+-- and only 4 connections (5 minus the maintenance quota) are used per node
 BEGIN;
 	SET LOCAL citus.max_adaptive_executor_pool_size TO 16;
 	with cte_1 as (select pg_sleep(0.1) is null, a from test) SELECT a from cte_1 ORDER By 1 LIMIT 1;
@@ -244,8 +244,8 @@ BEGIN;
 		hostname, port;
  connection_count_to_node
 ---------------------------------------------------------------------
-                        5
-                        5
+                        4
+                        4
 (2 rows)
 
 COMMIT;
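The expected counts drop from 5 to 4 because one slot per node is now reserved for maintenance. Assuming this test runs with citus.max_shared_pool_size = 5 and a 10% maintenance quota (assumed values, not shown in this hunk), the arithmetic from IncrementSharedConnectionCounterInternal works out to:

	maintenance slots per node: ceil(5 * 0.1) = 1
	regular slots per node:     5 - 1         = 4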
@@ -382,8 +382,8 @@ COPY test FROM PROGRAM 'seq 32';
 		hostname, port;
  connection_count_to_node
 ---------------------------------------------------------------------
-                        3
-                        3
+                        2
+                        2
 (2 rows)
 
 ROLLBACK;
@@ -404,7 +404,7 @@ BEGIN;
 		hostname, port;
  connection_count_to_node
 ---------------------------------------------------------------------
-                        3
+                        2
                         1
 (2 rows)
@@ -423,7 +423,7 @@ COPY test FROM STDIN;
 		hostname, port;
  connection_count_to_node
 ---------------------------------------------------------------------
-                        3
+                        2
                         1
 (2 rows)
@@ -450,7 +450,7 @@ BEGIN;
 		hostname, port;
  connection_count_to_node
 ---------------------------------------------------------------------
-                        3
+                        2
 (1 row)
 
 -- in this second COPY, we access the same node but different shards
@@ -468,7 +468,7 @@ COPY test FROM STDIN;
 		hostname, port;
  connection_count_to_node
 ---------------------------------------------------------------------
-                        3
+                        2
                         1
 (2 rows)

View File

@@ -146,7 +146,7 @@ COMMIT;
 
 -- pg_sleep forces almost 1 connection per placement
 -- now, some of the optional connections would be skipped,
--- and only 5 connections are used per node
+-- and only 4 connections (5 minus the maintenance quota) are used per node
 BEGIN;
 	SET LOCAL citus.max_adaptive_executor_pool_size TO 16;
 	with cte_1 as (select pg_sleep(0.1) is null, a from test) SELECT a from cte_1 ORDER By 1 LIMIT 1;