diff --git a/src/backend/distributed/commands/multi_copy.c b/src/backend/distributed/commands/multi_copy.c index 23847ac01..0db780d1b 100644 --- a/src/backend/distributed/commands/multi_copy.c +++ b/src/backend/distributed/commands/multi_copy.c @@ -2160,6 +2160,7 @@ CitusCopyDestReceiverStartup(DestReceiver *dest, int operation, copyDest->connectionStateHash = CreateConnectionStateHash(TopTransactionContext); RecordRelationAccessIfNonDistTable(tableId, PLACEMENT_ACCESS_DML); + uint32 connectionFlags = 0; /* * Colocated intermediate results do not honor citus.max_shared_pool_size, @@ -2181,7 +2182,7 @@ CitusCopyDestReceiverStartup(DestReceiver *dest, int operation, * and cannot switch to local execution (e.g., disabled by user), * COPY would fail hinting the user to change the relevant settiing. */ - EnsureConnectionPossibilityForRemotePrimaryNodes(); + EnsureConnectionPossibilityForRemotePrimaryNodes(connectionFlags); } LocalCopyStatus localCopyStatus = GetLocalCopyStatus(); @@ -2211,7 +2212,7 @@ CitusCopyDestReceiverStartup(DestReceiver *dest, int operation, */ if (ShardIntervalListHasLocalPlacements(shardIntervalList)) { - bool reservedConnection = TryConnectionPossibilityForLocalPrimaryNode(); + bool reservedConnection = TryConnectionPossibilityForLocalPrimaryNode(connectionFlags); copyDest->shouldUseLocalCopy = !reservedConnection; } } @@ -3634,7 +3635,7 @@ CopyGetPlacementConnection(HTAB *connectionStateHash, ShardPlacement *placement, return connection; } - if (IsReservationPossible()) + if (IsReservationPossible(connectionFlags)) { /* * Enforce the requirements for adaptive connection management diff --git a/src/backend/distributed/connection/locally_reserved_shared_connections.c b/src/backend/distributed/connection/locally_reserved_shared_connections.c index 023c9b7c0..a9079f752 100644 --- a/src/backend/distributed/connection/locally_reserved_shared_connections.c +++ b/src/backend/distributed/connection/locally_reserved_shared_connections.c @@ -92,9 +92,10 @@ static ReservedConnectionHashEntry * AllocateOrGetReservedConnectionEntry(char * userId, Oid databaseOid, bool *found); -static void EnsureConnectionPossibilityForNodeList(List *nodeList); +static void EnsureConnectionPossibilityForNodeList(List *nodeList, uint32 connectionFlags); static bool EnsureConnectionPossibilityForNode(WorkerNode *workerNode, - bool waitForConnection); + bool waitForConnection, + uint32 connectionFlags); static uint32 LocalConnectionReserveHashHash(const void *key, Size keysize); static int LocalConnectionReserveHashCompare(const void *a, const void *b, Size keysize); @@ -296,7 +297,7 @@ MarkReservedConnectionUsed(const char *hostName, int nodePort, Oid userId, * EnsureConnectionPossibilityForNodeList. */ void -EnsureConnectionPossibilityForRemotePrimaryNodes(void) +EnsureConnectionPossibilityForRemotePrimaryNodes(uint32 connectionFlags) { /* * By using NoLock there is a tiny risk of that we miss to reserve a @@ -305,7 +306,7 @@ EnsureConnectionPossibilityForRemotePrimaryNodes(void) * going to access would be on the new node. */ List *remoteNodeList = ActivePrimaryRemoteNodeList(NoLock); - EnsureConnectionPossibilityForNodeList(remoteNodeList); + EnsureConnectionPossibilityForNodeList(remoteNodeList, connectionFlags); } @@ -315,7 +316,7 @@ EnsureConnectionPossibilityForRemotePrimaryNodes(void) * If not, the function returns false. */ bool -TryConnectionPossibilityForLocalPrimaryNode(void) +TryConnectionPossibilityForLocalPrimaryNode(uint32 connectionFlags) { bool nodeIsInMetadata = false; WorkerNode *localNode = @@ -331,7 +332,7 @@ TryConnectionPossibilityForLocalPrimaryNode(void) } bool waitForConnection = false; - return EnsureConnectionPossibilityForNode(localNode, waitForConnection); + return EnsureConnectionPossibilityForNode(localNode, waitForConnection, connectionFlags); } @@ -345,7 +346,7 @@ TryConnectionPossibilityForLocalPrimaryNode(void) * single reservation per backend) */ static void -EnsureConnectionPossibilityForNodeList(List *nodeList) +EnsureConnectionPossibilityForNodeList(List *nodeList, uint32 connectionFlags) { /* * We sort the workerList because adaptive connection management @@ -364,7 +365,7 @@ EnsureConnectionPossibilityForNodeList(List *nodeList) foreach_ptr(workerNode, nodeList) { bool waitForConnection = true; - EnsureConnectionPossibilityForNode(workerNode, waitForConnection); + EnsureConnectionPossibilityForNode(workerNode, waitForConnection, connectionFlags); } } @@ -383,9 +384,9 @@ EnsureConnectionPossibilityForNodeList(List *nodeList) * return false. */ static bool -EnsureConnectionPossibilityForNode(WorkerNode *workerNode, bool waitForConnection) +EnsureConnectionPossibilityForNode(WorkerNode *workerNode, bool waitForConnection, uint32 connectionFlags) { - if (!IsReservationPossible()) + if (!IsReservationPossible(connectionFlags)) { return false; } @@ -479,10 +480,13 @@ EnsureConnectionPossibilityForNode(WorkerNode *workerNode, bool waitForConnectio * session is eligible for shared connection reservation. */ bool -IsReservationPossible(void) +IsReservationPossible(uint32 connectionFlags) { - // TODO add check for maintenance connection - if (GetMaxSharedPoolSize() == DISABLE_CONNECTION_THROTTLING) + bool connectionThrottlingDisabled = + connectionFlags & REQUIRE_MAINTENANCE_CONNECTION + ? GetMaxMaintenanceSharedPoolSize() <= 0 + : GetMaxSharedPoolSize() == DISABLE_CONNECTION_THROTTLING; + if (connectionThrottlingDisabled) { /* connection throttling disabled */ return false; diff --git a/src/include/distributed/locally_reserved_shared_connections.h b/src/include/distributed/locally_reserved_shared_connections.h index adec8c9c4..637cbce47 100644 --- a/src/include/distributed/locally_reserved_shared_connections.h +++ b/src/include/distributed/locally_reserved_shared_connections.h @@ -20,8 +20,8 @@ extern bool CanUseReservedConnection(const char *hostName, int nodePort, extern void MarkReservedConnectionUsed(const char *hostName, int nodePort, Oid userId, Oid databaseOid); extern void DeallocateReservedConnections(void); -extern void EnsureConnectionPossibilityForRemotePrimaryNodes(void); -extern bool TryConnectionPossibilityForLocalPrimaryNode(void); -extern bool IsReservationPossible(void); +extern void EnsureConnectionPossibilityForRemotePrimaryNodes(uint32 connectionFlags); +extern bool TryConnectionPossibilityForLocalPrimaryNode(uint32 connectionFlags); +extern bool IsReservationPossible(uint32 connectionFlags); #endif /* LOCALLY_RESERVED_SHARED_CONNECTIONS_H_ */ diff --git a/src/test/regress/expected/shared_connection_stats.out b/src/test/regress/expected/shared_connection_stats.out index 91d35db5d..0ce22548f 100644 --- a/src/test/regress/expected/shared_connection_stats.out +++ b/src/test/regress/expected/shared_connection_stats.out @@ -224,7 +224,7 @@ BEGIN; COMMIT; -- pg_sleep forces almost 1 connection per placement -- now, some of the optional connections would be skipped, --- and only 4 connections (5 minus the maintenance quota) are used per node +-- and only 5 connections are used per node BEGIN; SET LOCAL citus.max_adaptive_executor_pool_size TO 16; with cte_1 as (select pg_sleep(0.1) is null, a from test) SELECT a from cte_1 ORDER By 1 LIMIT 1; @@ -244,8 +244,8 @@ BEGIN; hostname, port; connection_count_to_node --------------------------------------------------------------------- - 4 - 4 + 5 + 5 (2 rows) COMMIT; @@ -382,8 +382,8 @@ COPY test FROM PROGRAM 'seq 32'; hostname, port; connection_count_to_node --------------------------------------------------------------------- - 2 - 2 + 3 + 3 (2 rows) ROLLBACK; @@ -404,7 +404,7 @@ BEGIN; hostname, port; connection_count_to_node --------------------------------------------------------------------- - 2 + 3 1 (2 rows) @@ -423,7 +423,7 @@ COPY test FROM STDIN; hostname, port; connection_count_to_node --------------------------------------------------------------------- - 2 + 3 1 (2 rows) @@ -450,7 +450,7 @@ BEGIN; hostname, port; connection_count_to_node --------------------------------------------------------------------- - 2 + 3 (1 row) -- in this second COPY, we access the same node but different shards @@ -468,7 +468,7 @@ COPY test FROM STDIN; hostname, port; connection_count_to_node --------------------------------------------------------------------- - 2 + 3 1 (2 rows) diff --git a/src/test/regress/sql/shared_connection_stats.sql b/src/test/regress/sql/shared_connection_stats.sql index 7c040af5c..7c653e788 100644 --- a/src/test/regress/sql/shared_connection_stats.sql +++ b/src/test/regress/sql/shared_connection_stats.sql @@ -146,7 +146,7 @@ COMMIT; -- pg_sleep forces almost 1 connection per placement -- now, some of the optional connections would be skipped, --- and only 4 connections (5 minus the maintenance quota) are used per node +-- and only 5 connections are used per node BEGIN; SET LOCAL citus.max_adaptive_executor_pool_size TO 16; with cte_1 as (select pg_sleep(0.1) is null, a from test) SELECT a from cte_1 ORDER By 1 LIMIT 1;