From 640717bea242177921721a88df4e92f5b5242558 Mon Sep 17 00:00:00 2001 From: Onder Kalaci Date: Fri, 15 May 2020 17:11:40 +0200 Subject: [PATCH] Copy doesn't use more than MaxAdaptiveExecutor MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Hanefi Önaldı --- src/backend/distributed/commands/multi_copy.c | 174 +++++++++++++++++- .../expected/shared_connection_stats.out | 20 ++ .../regress/sql/shared_connection_stats.sql | 49 +++++ 3 files changed, 235 insertions(+), 8 deletions(-) diff --git a/src/backend/distributed/commands/multi_copy.c b/src/backend/distributed/commands/multi_copy.c index 344c538c0..6e871cd47 100644 --- a/src/backend/distributed/commands/multi_copy.c +++ b/src/backend/distributed/commands/multi_copy.c @@ -155,6 +155,9 @@ typedef struct CopyConnectionState * In this case, old activePlacementState isn't NULL, is added to this list. */ dlist_head bufferedPlacementList; + + /* length of bufferedPlacementList, to avoid iterations over the list when needed */ + int bufferedPlacementCount; } CopyConnectionState; @@ -247,9 +250,15 @@ static CopyShardState * GetShardState(uint64 shardId, HTAB *shardStateHash, HTAB *connectionStateHash, bool stopOnFailure, bool *found, bool shouldUseLocalCopy, CopyOutState copyOutState, bool isCopyToIntermediateFile); -static MultiConnection * CopyGetPlacementConnection(ShardPlacement *placement, +static MultiConnection * CopyGetPlacementConnection(HTAB *connectionStateHash, + ShardPlacement *placement, bool stopOnFailure); +static bool HasReachedAdaptiveExecutorPoolSize(List *connectionStateHash); +static MultiConnection * GetLeastUtilisedCopyConnection(List *connectionStateList, + char *nodeName, int nodePort); static List * ConnectionStateList(HTAB *connectionStateHash); +static List * ConnectionStateListToNode(HTAB *connectionStateHash, + char *hostname, int port); static void InitializeCopyShardState(CopyShardState *shardState, HTAB *connectionStateHash, uint64 
shardId, bool stopOnFailure, bool @@ -282,6 +291,14 @@ static void CopyAttributeOutText(CopyOutState outputState, char *string); static inline void CopyFlushOutput(CopyOutState outputState, char *start, char *pointer); static bool CitusSendTupleToPlacements(TupleTableSlot *slot, CitusCopyDestReceiver *copyDest); +static void AddPlacementStateToCopyConnectionStateBuffer(CopyConnectionState * + connectionState, + CopyPlacementState * + placementState); +static void RemovePlacementStateFromCopyConnectionStateBuffer(CopyConnectionState * + connectionState, + CopyPlacementState * + placementState); static uint64 ShardIdForTuple(CitusCopyDestReceiver *copyDest, Datum *columnValues, bool *columnNulls); @@ -2348,8 +2365,8 @@ CitusSendTupleToPlacements(TupleTableSlot *slot, CitusCopyDestReceiver *copyDest /* before switching, make sure to finish the copy */ EndPlacementStateCopyCommand(activePlacementState, copyOutState); - dlist_push_head(&connectionState->bufferedPlacementList, - &activePlacementState->bufferedPlacementNode); + AddPlacementStateToCopyConnectionStateBuffer(connectionState, + activePlacementState); } if (switchToCurrentPlacement) @@ -2357,7 +2374,9 @@ CitusSendTupleToPlacements(TupleTableSlot *slot, CitusCopyDestReceiver *copyDest StartPlacementStateCopyCommand(currentPlacementState, copyStatement, copyOutState); - dlist_delete(&currentPlacementState->bufferedPlacementNode); + RemovePlacementStateFromCopyConnectionStateBuffer(connectionState, + currentPlacementState); + connectionState->activePlacementState = currentPlacementState; /* send previously buffered tuples */ @@ -2411,6 +2430,35 @@ CitusSendTupleToPlacements(TupleTableSlot *slot, CitusCopyDestReceiver *copyDest } +/* + * AddPlacementStateToCopyConnectionStateBuffer is a helper function to add a placement + * state to connection state's placement buffer. In addition to that, it keeps the counter + * up to date. 
+ */ +static void +AddPlacementStateToCopyConnectionStateBuffer(CopyConnectionState *connectionState, + CopyPlacementState *placementState) +{ + dlist_push_head(&connectionState->bufferedPlacementList, + &placementState->bufferedPlacementNode); + connectionState->bufferedPlacementCount++; +} + + +/* + * RemovePlacementStateFromCopyConnectionStateBuffer is a helper function that removes a placement + * state from connection state's placement buffer. In addition to that, it keeps the counter + * up to date. + */ +static void +RemovePlacementStateFromCopyConnectionStateBuffer(CopyConnectionState *connectionState, + CopyPlacementState *placementState) +{ + dlist_delete(&placementState->bufferedPlacementNode); + connectionState->bufferedPlacementCount--; +} + + /* * ContainsLocalPlacement returns true if the current node has * a local placement for the given shard id. @@ -3230,6 +3278,7 @@ GetConnectionState(HTAB *connectionStateHash, MultiConnection *connection) connectionState->socket = sock; connectionState->connection = connection; connectionState->activePlacementState = NULL; + connectionState->bufferedPlacementCount = 0; dlist_init(&connectionState->bufferedPlacementList); } @@ -3262,6 +3311,36 @@ ConnectionStateList(HTAB *connectionStateHash) } +/* + * ConnectionStateListToNode returns all CopyConnectionState structures in + * the given hash whose connection is to the given hostname and port. 
+ */ +static List * +ConnectionStateListToNode(HTAB *connectionStateHash, char *hostname, int port) +{ + List *connectionStateList = NIL; + HASH_SEQ_STATUS status; + + hash_seq_init(&status, connectionStateHash); + + CopyConnectionState *connectionState = + (CopyConnectionState *) hash_seq_search(&status); + while (connectionState != NULL) + { + char *connectionHostname = connectionState->connection->hostname; + if (strncmp(connectionHostname, hostname, MAX_NODE_LENGTH) == 0 && + connectionState->connection->port == port) + { + connectionStateList = lappend(connectionStateList, connectionState); + } + + connectionState = (CopyConnectionState *) hash_seq_search(&status); + } + + return connectionStateList; +} + + /* * GetShardState finds existing CopyShardState for a shard in the provided * hash. If not found, then a new shard state is returned with all related @@ -3347,7 +3426,7 @@ InitializeCopyShardState(CopyShardState *shardState, } MultiConnection *connection = - CopyGetPlacementConnection(placement, stopOnFailure); + CopyGetPlacementConnection(connectionStateHash, placement, stopOnFailure); if (connection == NULL) { failedPlacementCount++; @@ -3379,8 +3458,7 @@ InitializeCopyShardState(CopyShardState *shardState, * same time as calling StartPlacementStateCopyCommand() so we actually * know the COPY operation for the placement is ongoing. */ - dlist_push_head(&connectionState->bufferedPlacementList, - &placementState->bufferedPlacementNode); + AddPlacementStateToCopyConnectionStateBuffer(connectionState, placementState); shardState->placementStateList = lappend(shardState->placementStateList, placementState); } @@ -3444,7 +3522,8 @@ LogLocalCopyExecution(uint64 shardId) * then it reuses the connection. Otherwise, it requests a connection for placement. 
*/ static MultiConnection * -CopyGetPlacementConnection(ShardPlacement *placement, bool stopOnFailure) +CopyGetPlacementConnection(HTAB *connectionStateHash, ShardPlacement *placement, bool + stopOnFailure) { uint32 connectionFlags = FOR_DML; char *nodeUser = CurrentUserName(); @@ -3464,6 +3543,29 @@ CopyGetPlacementConnection(ShardPlacement *placement, bool stopOnFailure) return connection; } + /* + * If we've reached citus.max_adaptive_executor_pool_size, we should re-use the + * existing connections to multiplex multiple COPY commands on shards over a + * single connection. + */ + char *nodeName = placement->nodeName; + int nodePort = placement->nodePort; + List *copyConnectionStateList = + ConnectionStateListToNode(connectionStateHash, nodeName, nodePort); + if (HasReachedAdaptiveExecutorPoolSize(copyConnectionStateList)) + { + connection = + GetLeastUtilisedCopyConnection(copyConnectionStateList, nodeName, nodePort); + + /* + * If we've already reached the executor pool size, there should be at + * least one connection to any given node. + */ + Assert(connection != NULL); + + return connection; + } + /* * For placements that haven't been assigned a connection by a previous command * in the current transaction, we use a separate connection per placement for @@ -3509,6 +3611,62 @@ CopyGetPlacementConnection(ShardPlacement *placement, bool stopOnFailure) } +/* + * HasReachedAdaptiveExecutorPoolSize returns true if the number of entries in the input + * connection list is greater than or equal to citus.max_adaptive_executor_pool_size. + */ +static bool +HasReachedAdaptiveExecutorPoolSize(List *connectionStateList) +{ + if (list_length(connectionStateList) >= MaxAdaptiveExecutorPoolSize) + { + /* + * We've already reached MaxAdaptiveExecutorPoolSize number of + * connections, so the caller should re-use an existing + * connection instead of establishing a new one. 
+ */ + return true; + } + + return false; +} + + +/* + * GetLeastUtilisedCopyConnection returns a MultiConnection to the given node + * with the least number of placements assigned to it. + */ +static MultiConnection * +GetLeastUtilisedCopyConnection(List *connectionStateList, char *nodeName, + int nodePort) +{ + MultiConnection *connection = NULL; + int minPlacementCount = INT32_MAX; + ListCell *connectionStateCell = NULL; + + foreach(connectionStateCell, connectionStateList) + { + CopyConnectionState *connectionState = lfirst(connectionStateCell); + int currentConnectionPlacementCount = connectionState->bufferedPlacementCount; + + if (connectionState->activePlacementState != NULL) + { + currentConnectionPlacementCount++; + } + + Assert(currentConnectionPlacementCount > 0); + + if (currentConnectionPlacementCount < minPlacementCount) + { + minPlacementCount = currentConnectionPlacementCount; + connection = connectionState->connection; + } + } + + return connection; +} + + /* * StartPlacementStateCopyCommand sends the COPY for the given placement. It also * sends binary headers if this is a binary COPY. 
diff --git a/src/test/regress/expected/shared_connection_stats.out b/src/test/regress/expected/shared_connection_stats.out index 262c8a6b6..3af0ed1ad 100644 --- a/src/test/regress/expected/shared_connection_stats.out +++ b/src/test/regress/expected/shared_connection_stats.out @@ -320,6 +320,26 @@ BEGIN; (2 rows) COMMIT; +BEGIN; + -- now allow at most 2 connections for COPY + SET LOCAL citus.max_adaptive_executor_pool_size TO 2; +COPY test FROM STDIN; + SELECT + connection_count_to_node + FROM + citus_remote_connection_stats() + WHERE + port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND + database_name = 'regression' + ORDER BY + hostname, port; + connection_count_to_node +--------------------------------------------------------------------- + 2 + 2 +(2 rows) + +ROLLBACK; -- now show that when max_cached_conns_per_worker > 1 -- Citus forces the first execution to open at least 2 -- connections that are cached. Later, that 2 cached diff --git a/src/test/regress/sql/shared_connection_stats.sql b/src/test/regress/sql/shared_connection_stats.sql index 8200e416d..a0cb1cef0 100644 --- a/src/test/regress/sql/shared_connection_stats.sql +++ b/src/test/regress/sql/shared_connection_stats.sql @@ -190,6 +190,55 @@ BEGIN; hostname, port; COMMIT; +BEGIN; + -- now allow at most 2 connections for COPY + SET LOCAL citus.max_adaptive_executor_pool_size TO 2; + +COPY test FROM STDIN; +1 +2 +3 +4 +5 +6 +7 +8 +9 +10 +11 +12 +13 +14 +15 +16 +17 +18 +19 +20 +21 +22 +23 +24 +25 +26 +27 +28 +29 +30 +31 +32 +\. + + SELECT + connection_count_to_node + FROM + citus_remote_connection_stats() + WHERE + port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND + database_name = 'regression' + ORDER BY + hostname, port; +ROLLBACK; -- now show that when max_cached_conns_per_worker > 1 -- Citus forces the first execution to open at least 2