Merge pull request #3841 from citusdata/copy_max_adaptive_executor_pool_size

pull/3893/head^2
Hanefi Onaldi 2020-06-10 17:25:12 +03:00 committed by GitHub
commit 6e9324e99d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 235 additions and 8 deletions

View File

@ -155,6 +155,9 @@ typedef struct CopyConnectionState
* In this case, old activePlacementState isn't NULL, is added to this list. * In this case, old activePlacementState isn't NULL, is added to this list.
*/ */
dlist_head bufferedPlacementList; dlist_head bufferedPlacementList;
/* length of bufferedPlacementList, to avoid iterations over the list when needed */
int bufferedPlacementCount;
} CopyConnectionState; } CopyConnectionState;
@ -247,9 +250,15 @@ static CopyShardState * GetShardState(uint64 shardId, HTAB *shardStateHash,
HTAB *connectionStateHash, bool stopOnFailure, HTAB *connectionStateHash, bool stopOnFailure,
bool *found, bool shouldUseLocalCopy, CopyOutState bool *found, bool shouldUseLocalCopy, CopyOutState
copyOutState, bool isCopyToIntermediateFile); copyOutState, bool isCopyToIntermediateFile);
static MultiConnection * CopyGetPlacementConnection(ShardPlacement *placement, static MultiConnection * CopyGetPlacementConnection(HTAB *connectionStateHash,
ShardPlacement *placement,
bool stopOnFailure); bool stopOnFailure);
static bool HasReachedAdaptiveExecutorPoolSize(List *connectionStateHash);
static MultiConnection * GetLeastUtilisedCopyConnection(List *connectionStateList,
char *nodeName, int nodePort);
static List * ConnectionStateList(HTAB *connectionStateHash); static List * ConnectionStateList(HTAB *connectionStateHash);
static List * ConnectionStateListToNode(HTAB *connectionStateHash,
char *hostname, int port);
static void InitializeCopyShardState(CopyShardState *shardState, static void InitializeCopyShardState(CopyShardState *shardState,
HTAB *connectionStateHash, HTAB *connectionStateHash,
uint64 shardId, bool stopOnFailure, bool uint64 shardId, bool stopOnFailure, bool
@ -282,6 +291,14 @@ static void CopyAttributeOutText(CopyOutState outputState, char *string);
static inline void CopyFlushOutput(CopyOutState outputState, char *start, char *pointer); static inline void CopyFlushOutput(CopyOutState outputState, char *start, char *pointer);
static bool CitusSendTupleToPlacements(TupleTableSlot *slot, static bool CitusSendTupleToPlacements(TupleTableSlot *slot,
CitusCopyDestReceiver *copyDest); CitusCopyDestReceiver *copyDest);
static void AddPlacementStateToCopyConnectionStateBuffer(CopyConnectionState *
connectionState,
CopyPlacementState *
placementState);
static void RemovePlacementStateFromCopyConnectionStateBuffer(CopyConnectionState *
connectionState,
CopyPlacementState *
placementState);
static uint64 ShardIdForTuple(CitusCopyDestReceiver *copyDest, Datum *columnValues, static uint64 ShardIdForTuple(CitusCopyDestReceiver *copyDest, Datum *columnValues,
bool *columnNulls); bool *columnNulls);
@ -2348,8 +2365,8 @@ CitusSendTupleToPlacements(TupleTableSlot *slot, CitusCopyDestReceiver *copyDest
/* before switching, make sure to finish the copy */ /* before switching, make sure to finish the copy */
EndPlacementStateCopyCommand(activePlacementState, copyOutState); EndPlacementStateCopyCommand(activePlacementState, copyOutState);
dlist_push_head(&connectionState->bufferedPlacementList, AddPlacementStateToCopyConnectionStateBuffer(connectionState,
&activePlacementState->bufferedPlacementNode); activePlacementState);
} }
if (switchToCurrentPlacement) if (switchToCurrentPlacement)
@ -2357,7 +2374,9 @@ CitusSendTupleToPlacements(TupleTableSlot *slot, CitusCopyDestReceiver *copyDest
StartPlacementStateCopyCommand(currentPlacementState, copyStatement, StartPlacementStateCopyCommand(currentPlacementState, copyStatement,
copyOutState); copyOutState);
dlist_delete(&currentPlacementState->bufferedPlacementNode); RemovePlacementStateFromCopyConnectionStateBuffer(connectionState,
currentPlacementState);
connectionState->activePlacementState = currentPlacementState; connectionState->activePlacementState = currentPlacementState;
/* send previously buffered tuples */ /* send previously buffered tuples */
@ -2411,6 +2430,35 @@ CitusSendTupleToPlacements(TupleTableSlot *slot, CitusCopyDestReceiver *copyDest
} }
/*
* AddPlacementStateToCopyConnectionStateBuffer is a helper function to add a placement
* state to connection state's placement buffer. In addition to that, keep the counter
* up to date.
*/
static void
AddPlacementStateToCopyConnectionStateBuffer(CopyConnectionState *connectionState,
CopyPlacementState *placementState)
{
dlist_push_head(&connectionState->bufferedPlacementList,
&placementState->bufferedPlacementNode);
connectionState->bufferedPlacementCount++;
}
/*
* RemovePlacementStateFromCopyConnectionStateBuffer is a helper function to removes a placement
* state from connection state's placement buffer. In addition to that, keep the counter
* up to date.
*/
static void
RemovePlacementStateFromCopyConnectionStateBuffer(CopyConnectionState *connectionState,
CopyPlacementState *placementState)
{
dlist_delete(&placementState->bufferedPlacementNode);
connectionState->bufferedPlacementCount--;
}
/* /*
* ContainsLocalPlacement returns true if the current node has * ContainsLocalPlacement returns true if the current node has
* a local placement for the given shard id. * a local placement for the given shard id.
@ -3230,6 +3278,7 @@ GetConnectionState(HTAB *connectionStateHash, MultiConnection *connection)
connectionState->socket = sock; connectionState->socket = sock;
connectionState->connection = connection; connectionState->connection = connection;
connectionState->activePlacementState = NULL; connectionState->activePlacementState = NULL;
connectionState->bufferedPlacementCount = 0;
dlist_init(&connectionState->bufferedPlacementList); dlist_init(&connectionState->bufferedPlacementList);
} }
@ -3262,6 +3311,36 @@ ConnectionStateList(HTAB *connectionStateHash)
} }
/*
* ConnectionStateListToNode returns all CopyConnectionState structures in
* the given hash.
*/
static List *
ConnectionStateListToNode(HTAB *connectionStateHash, char *hostname, int port)
{
List *connectionStateList = NIL;
HASH_SEQ_STATUS status;
hash_seq_init(&status, connectionStateHash);
CopyConnectionState *connectionState =
(CopyConnectionState *) hash_seq_search(&status);
while (connectionState != NULL)
{
char *connectionHostname = connectionState->connection->hostname;
if (strncmp(connectionHostname, hostname, MAX_NODE_LENGTH) == 0 &&
connectionState->connection->port == port)
{
connectionStateList = lappend(connectionStateList, connectionState);
}
connectionState = (CopyConnectionState *) hash_seq_search(&status);
}
return connectionStateList;
}
/* /*
* GetShardState finds existing CopyShardState for a shard in the provided * GetShardState finds existing CopyShardState for a shard in the provided
* hash. If not found, then a new shard state is returned with all related * hash. If not found, then a new shard state is returned with all related
@ -3347,7 +3426,7 @@ InitializeCopyShardState(CopyShardState *shardState,
} }
MultiConnection *connection = MultiConnection *connection =
CopyGetPlacementConnection(placement, stopOnFailure); CopyGetPlacementConnection(connectionStateHash, placement, stopOnFailure);
if (connection == NULL) if (connection == NULL)
{ {
failedPlacementCount++; failedPlacementCount++;
@ -3379,8 +3458,7 @@ InitializeCopyShardState(CopyShardState *shardState,
* same time as calling StartPlacementStateCopyCommand() so we actually * same time as calling StartPlacementStateCopyCommand() so we actually
* know the COPY operation for the placement is ongoing. * know the COPY operation for the placement is ongoing.
*/ */
dlist_push_head(&connectionState->bufferedPlacementList, AddPlacementStateToCopyConnectionStateBuffer(connectionState, placementState);
&placementState->bufferedPlacementNode);
shardState->placementStateList = lappend(shardState->placementStateList, shardState->placementStateList = lappend(shardState->placementStateList,
placementState); placementState);
} }
@ -3444,7 +3522,8 @@ LogLocalCopyExecution(uint64 shardId)
* then it reuses the connection. Otherwise, it requests a connection for placement. * then it reuses the connection. Otherwise, it requests a connection for placement.
*/ */
static MultiConnection * static MultiConnection *
CopyGetPlacementConnection(ShardPlacement *placement, bool stopOnFailure) CopyGetPlacementConnection(HTAB *connectionStateHash, ShardPlacement *placement, bool
stopOnFailure)
{ {
uint32 connectionFlags = FOR_DML; uint32 connectionFlags = FOR_DML;
char *nodeUser = CurrentUserName(); char *nodeUser = CurrentUserName();
@ -3464,6 +3543,29 @@ CopyGetPlacementConnection(ShardPlacement *placement, bool stopOnFailure)
return connection; return connection;
} }
/*
* If we exceeded citus.max_adaptive_executor_pool_size, we should re-use the
* existing connections to multiplex multiple COPY commands on shards over a
* single connection.
*/
char *nodeName = placement->nodeName;
int nodePort = placement->nodePort;
List *copyConnectionStateList =
ConnectionStateListToNode(connectionStateHash, nodeName, nodePort);
if (HasReachedAdaptiveExecutorPoolSize(copyConnectionStateList))
{
connection =
GetLeastUtilisedCopyConnection(copyConnectionStateList, nodeName, nodePort);
/*
* If we've already reached the executor pool size, there should be at
* least one connection to any given node.
*/
Assert(connection != NULL);
return connection;
}
/* /*
* For placements that haven't been assigned a connection by a previous command * For placements that haven't been assigned a connection by a previous command
* in the current transaction, we use a separate connection per placement for * in the current transaction, we use a separate connection per placement for
@ -3509,6 +3611,62 @@ CopyGetPlacementConnection(ShardPlacement *placement, bool stopOnFailure)
} }
/*
* HasReachedAdaptiveExecutorPoolSize returns true if the number of entries in input
* connection list has greater than or equal to citus.max_adaptive_executor_pool_size.
*/
static bool
HasReachedAdaptiveExecutorPoolSize(List *connectionStateList)
{
if (list_length(connectionStateList) >= MaxAdaptiveExecutorPoolSize)
{
/*
* We've not reached MaxAdaptiveExecutorPoolSize number of
* connections, so we're allowed to establish a new
* connection to the given node.
*/
return true;
}
return false;
}
/*
* GetLeastUtilisedCopyConnection returns a MultiConnection to the given node
* with the least number of placements assigned to it.
*/
static MultiConnection *
GetLeastUtilisedCopyConnection(List *connectionStateList, char *nodeName,
int nodePort)
{
MultiConnection *connection = NULL;
int minPlacementCount = INT32_MAX;
ListCell *connectionStateCell = NULL;
foreach(connectionStateCell, connectionStateList)
{
CopyConnectionState *connectionState = lfirst(connectionStateCell);
int currentConnectionPlacementCount = connectionState->bufferedPlacementCount;
if (connectionState->activePlacementState != NULL)
{
currentConnectionPlacementCount++;
}
Assert(currentConnectionPlacementCount > 0);
if (currentConnectionPlacementCount < minPlacementCount)
{
minPlacementCount = currentConnectionPlacementCount;
connection = connectionState->connection;
}
}
return connection;
}
/* /*
* StartPlacementStateCopyCommand sends the COPY for the given placement. It also * StartPlacementStateCopyCommand sends the COPY for the given placement. It also
* sends binary headers if this is a binary COPY. * sends binary headers if this is a binary COPY.

View File

@ -320,6 +320,26 @@ BEGIN;
(2 rows) (2 rows)
COMMIT; COMMIT;
BEGIN;
-- now allow at most 2 connections for COPY
SET LOCAL citus.max_adaptive_executor_pool_size TO 2;
COPY test FROM STDIN;
SELECT
connection_count_to_node
FROM
citus_remote_connection_stats()
WHERE
port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND
database_name = 'regression'
ORDER BY
hostname, port;
connection_count_to_node
---------------------------------------------------------------------
2
2
(2 rows)
ROLLBACK;
-- now show that when max_cached_conns_per_worker > 1 -- now show that when max_cached_conns_per_worker > 1
-- Citus forces the first execution to open at least 2 -- Citus forces the first execution to open at least 2
-- connections that are cached. Later, that 2 cached -- connections that are cached. Later, that 2 cached

View File

@ -190,6 +190,55 @@ BEGIN;
hostname, port; hostname, port;
COMMIT; COMMIT;
BEGIN;
-- now allow at most 2 connections for COPY
SET LOCAL citus.max_adaptive_executor_pool_size TO 2;
COPY test FROM STDIN;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
\.
SELECT
connection_count_to_node
FROM
citus_remote_connection_stats()
WHERE
port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND
database_name = 'regression'
ORDER BY
hostname, port;
ROLLBACK;
-- now show that when max_cached_conns_per_worker > 1 -- now show that when max_cached_conns_per_worker > 1
-- Citus forces the first execution to open at least 2 -- Citus forces the first execution to open at least 2