- Adapt locally_reserved_shared_connections to the maintenance connection pool

- Fix tests
pull/7286/head
ivyazmitinov 2023-11-30 15:16:48 +03:00
parent 09917f846f
commit 115ed00c06
5 changed files with 34 additions and 29 deletions


@@ -2160,6 +2160,7 @@ CitusCopyDestReceiverStartup(DestReceiver *dest, int operation,
copyDest->connectionStateHash = CreateConnectionStateHash(TopTransactionContext);
RecordRelationAccessIfNonDistTable(tableId, PLACEMENT_ACCESS_DML);
uint32 connectionFlags = 0;
/*
* Colocated intermediate results do not honor citus.max_shared_pool_size,
@@ -2181,7 +2182,7 @@ CitusCopyDestReceiverStartup(DestReceiver *dest, int operation,
* and cannot switch to local execution (e.g., disabled by user),
* COPY would fail, hinting the user to change the relevant setting.
*/
EnsureConnectionPossibilityForRemotePrimaryNodes();
EnsureConnectionPossibilityForRemotePrimaryNodes(connectionFlags);
}
LocalCopyStatus localCopyStatus = GetLocalCopyStatus();
@@ -2211,7 +2212,7 @@ CitusCopyDestReceiverStartup(DestReceiver *dest, int operation,
*/
if (ShardIntervalListHasLocalPlacements(shardIntervalList))
{
bool reservedConnection = TryConnectionPossibilityForLocalPrimaryNode();
bool reservedConnection = TryConnectionPossibilityForLocalPrimaryNode(connectionFlags);
copyDest->shouldUseLocalCopy = !reservedConnection;
}
}
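
For orientation, a condensed sketch of the reservation flow in CitusCopyDestReceiverStartup after this change. It is simplified from the hunks above, not a verbatim excerpt; guard conditions and error handling are omitted.

/* COPY reserves regular, non-maintenance connections */
uint32 connectionFlags = 0;

/* unless writing colocated intermediate results (see the comment above),
 * block until a connection can be reserved on every remote primary node */
EnsureConnectionPossibilityForRemotePrimaryNodes(connectionFlags);

/* for the local node only try to reserve; when no reservation is possible,
 * fall back to local copy instead of opening a connection to ourselves */
if (ShardIntervalListHasLocalPlacements(shardIntervalList))
{
	bool reservedConnection = TryConnectionPossibilityForLocalPrimaryNode(connectionFlags);
	copyDest->shouldUseLocalCopy = !reservedConnection;
}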
@@ -3634,7 +3635,7 @@ CopyGetPlacementConnection(HTAB *connectionStateHash, ShardPlacement *placement,
return connection;
}
if (IsReservationPossible())
if (IsReservationPossible(connectionFlags))
{
/*
* Enforce the requirements for adaptive connection management


@@ -92,9 +92,10 @@ static ReservedConnectionHashEntry * AllocateOrGetReservedConnectionEntry(char *
userId, Oid
databaseOid,
bool *found);
static void EnsureConnectionPossibilityForNodeList(List *nodeList);
static void EnsureConnectionPossibilityForNodeList(List *nodeList, uint32 connectionFlags);
static bool EnsureConnectionPossibilityForNode(WorkerNode *workerNode,
bool waitForConnection);
bool waitForConnection,
uint32 connectionFlags);
static uint32 LocalConnectionReserveHashHash(const void *key, Size keysize);
static int LocalConnectionReserveHashCompare(const void *a, const void *b, Size keysize);
@@ -296,7 +297,7 @@ MarkReservedConnectionUsed(const char *hostName, int nodePort, Oid userId,
* EnsureConnectionPossibilityForNodeList.
*/
void
EnsureConnectionPossibilityForRemotePrimaryNodes(void)
EnsureConnectionPossibilityForRemotePrimaryNodes(uint32 connectionFlags)
{
/*
* By using NoLock there is a tiny risk of that we miss to reserve a
@@ -305,7 +306,7 @@ EnsureConnectionPossibilityForRemotePrimaryNodes(void)
* going to access would be on the new node.
*/
List *remoteNodeList = ActivePrimaryRemoteNodeList(NoLock);
EnsureConnectionPossibilityForNodeList(remoteNodeList);
EnsureConnectionPossibilityForNodeList(remoteNodeList, connectionFlags);
}
@@ -315,7 +316,7 @@ EnsureConnectionPossibilityForRemotePrimaryNodes(void)
* If not, the function returns false.
*/
bool
TryConnectionPossibilityForLocalPrimaryNode(void)
TryConnectionPossibilityForLocalPrimaryNode(uint32 connectionFlags)
{
bool nodeIsInMetadata = false;
WorkerNode *localNode =
@@ -331,7 +332,7 @@ TryConnectionPossibilityForLocalPrimaryNode(void)
}
bool waitForConnection = false;
return EnsureConnectionPossibilityForNode(localNode, waitForConnection);
return EnsureConnectionPossibilityForNode(localNode, waitForConnection, connectionFlags);
}
@@ -345,7 +346,7 @@ TryConnectionPossibilityForLocalPrimaryNode(void)
* single reservation per backend)
*/
static void
EnsureConnectionPossibilityForNodeList(List *nodeList)
EnsureConnectionPossibilityForNodeList(List *nodeList, uint32 connectionFlags)
{
/*
* We sort the workerList because adaptive connection management
@@ -364,7 +365,7 @@ EnsureConnectionPossibilityForNodeList(List *nodeList)
foreach_ptr(workerNode, nodeList)
{
bool waitForConnection = true;
EnsureConnectionPossibilityForNode(workerNode, waitForConnection);
EnsureConnectionPossibilityForNode(workerNode, waitForConnection, connectionFlags);
}
}
@@ -383,9 +384,9 @@ EnsureConnectionPossibilityForNodeList(List *nodeList)
* return false.
*/
static bool
EnsureConnectionPossibilityForNode(WorkerNode *workerNode, bool waitForConnection)
EnsureConnectionPossibilityForNode(WorkerNode *workerNode, bool waitForConnection, uint32 connectionFlags)
{
if (!IsReservationPossible())
if (!IsReservationPossible(connectionFlags))
{
return false;
}
@@ -479,10 +480,13 @@ EnsureConnectionPossibilityForNode(WorkerNode *workerNode, bool waitForConnectio
* session is eligible for shared connection reservation.
*/
bool
IsReservationPossible(void)
IsReservationPossible(uint32 connectionFlags)
{
// TODO add check for maintenance connection
if (GetMaxSharedPoolSize() == DISABLE_CONNECTION_THROTTLING)
bool connectionThrottlingDisabled =
connectionFlags & REQUIRE_MAINTENANCE_CONNECTION
? GetMaxMaintenanceSharedPoolSize() <= 0
: GetMaxSharedPoolSize() == DISABLE_CONNECTION_THROTTLING;
if (connectionThrottlingDisabled)
{
/* connection throttling disabled */
return false;
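
The updated check above selects which pool governs the reservation: reservations flagged with REQUIRE_MAINTENANCE_CONNECTION are throttled by the maintenance pool size, all other reservations keep using citus.max_shared_pool_size. Below is a minimal sketch of the same decision written out branch by branch, assuming a non-positive GetMaxMaintenanceSharedPoolSize() means the maintenance pool is unthrottled, mirroring DISABLE_CONNECTION_THROTTLING for the regular pool.

/* Sketch only: equivalent to the conditional expression above. */
static bool
ConnectionThrottlingDisabledFor(uint32 connectionFlags)
{
	if (connectionFlags & REQUIRE_MAINTENANCE_CONNECTION)
	{
		/* maintenance connections are limited by their own pool size */
		return GetMaxMaintenanceSharedPoolSize() <= 0;
	}

	/* regular connections are limited by citus.max_shared_pool_size */
	return GetMaxSharedPoolSize() == DISABLE_CONNECTION_THROTTLING;
}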


@@ -20,8 +20,8 @@ extern bool CanUseReservedConnection(const char *hostName, int nodePort,
extern void MarkReservedConnectionUsed(const char *hostName, int nodePort,
Oid userId, Oid databaseOid);
extern void DeallocateReservedConnections(void);
extern void EnsureConnectionPossibilityForRemotePrimaryNodes(void);
extern bool TryConnectionPossibilityForLocalPrimaryNode(void);
extern bool IsReservationPossible(void);
extern void EnsureConnectionPossibilityForRemotePrimaryNodes(uint32 connectionFlags);
extern bool TryConnectionPossibilityForLocalPrimaryNode(uint32 connectionFlags);
extern bool IsReservationPossible(uint32 connectionFlags);
#endif /* LOCALLY_RESERVED_SHARED_CONNECTIONS_H_ */
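
With the flag threaded through the public declarations above, a maintenance code path could request its reservations against the maintenance pool. A hypothetical call site, for illustration only; this commit changes the signatures but adds no such caller.

/* hypothetical maintenance caller; not part of this commit */
uint32 connectionFlags = REQUIRE_MAINTENANCE_CONNECTION;

/* reservations made with this flag are checked against the maintenance
 * pool size rather than citus.max_shared_pool_size */
EnsureConnectionPossibilityForRemotePrimaryNodes(connectionFlags);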


@@ -224,7 +224,7 @@ BEGIN;
COMMIT;
-- pg_sleep forces almost 1 connection per placement
-- now, some of the optional connections would be skipped,
-- and only 4 connections (5 minus the maintenance quota) are used per node
-- and only 5 connections are used per node
BEGIN;
SET LOCAL citus.max_adaptive_executor_pool_size TO 16;
with cte_1 as (select pg_sleep(0.1) is null, a from test) SELECT a from cte_1 ORDER By 1 LIMIT 1;
@@ -244,8 +244,8 @@ BEGIN;
hostname, port;
connection_count_to_node
---------------------------------------------------------------------
4
4
5
5
(2 rows)
COMMIT;
@@ -382,8 +382,8 @@ COPY test FROM PROGRAM 'seq 32';
hostname, port;
connection_count_to_node
---------------------------------------------------------------------
2
2
3
3
(2 rows)
ROLLBACK;
@@ -404,7 +404,7 @@ BEGIN;
hostname, port;
connection_count_to_node
---------------------------------------------------------------------
2
3
1
(2 rows)
@@ -423,7 +423,7 @@ COPY test FROM STDIN;
hostname, port;
connection_count_to_node
---------------------------------------------------------------------
2
3
1
(2 rows)
@@ -450,7 +450,7 @@ BEGIN;
hostname, port;
connection_count_to_node
---------------------------------------------------------------------
2
3
(1 row)
-- in this second COPY, we access the same node but different shards
@@ -468,7 +468,7 @@ COPY test FROM STDIN;
hostname, port;
connection_count_to_node
---------------------------------------------------------------------
2
3
1
(2 rows)


@@ -146,7 +146,7 @@ COMMIT;
-- pg_sleep forces almost 1 connection per placement
-- now, some of the optional connections would be skipped,
-- and only 4 connections (5 minus the maintenance quota) are used per node
-- and only 5 connections are used per node
BEGIN;
SET LOCAL citus.max_adaptive_executor_pool_size TO 16;
with cte_1 as (select pg_sleep(0.1) is null, a from test) SELECT a from cte_1 ORDER By 1 LIMIT 1;