Do not re-use connections for intermediate results

/*
 * Colocated intermediate results are just files, so they are not required
 * to use the same connections as their co-located shards. Hence, we are
 * free to use any connection we can get.
 *
 * Also, the current connection re-use logic does not know how to handle
 * intermediate results, as intermediate results always truncate the
 * existing files. That's why we use one connection per intermediate
 * result.
 */
pull/4683/head
Onder Kalaci 2021-02-10 13:42:58 +01:00
parent 01fb8f8124
commit 5d5a357487
3 changed files with 86 additions and 29 deletions
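
In code terms, the new policy boils down to the following shape (a minimal sketch distilled from the diff below, not the patch itself; the wrapper name GetConnectionForIntermediateResult is hypothetical, while GetNodeConnection, ClaimConnectionExclusively, and MarkRemoteTransactionCritical are the Citus connection-management calls the patch actually uses):

/*
 * Sketch only, against Citus internals (MultiConnection, ShardPlacement).
 * The wrapper name is hypothetical; the calls are the ones in the diff.
 */
static MultiConnection *
GetConnectionForIntermediateResult(ShardPlacement *placement)
{
    /* any connection will do: a result is a plain file, not a shard */
    uint32 connectionFlags = 0;
    MultiConnection *connection =
        GetNodeConnection(connectionFlags, placement->nodeName,
                          placement->nodePort);

    /* one connection per result; re-use would truncate the file */
    ClaimConnectionExclusively(connection);

    /* a failure here cannot be retried safely, so treat it as fatal */
    MarkRemoteTransactionCritical(connection);

    return connection;
}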

File 1/3 (C source: COPY connection management)

@@ -280,7 +280,8 @@ static CopyShardState * GetShardState(uint64 shardId, HTAB *shardStateHash,
                                       copyOutState, bool isColocatedIntermediateResult);
 static MultiConnection * CopyGetPlacementConnection(HTAB *connectionStateHash,
                                                     ShardPlacement *placement,
-                                                    bool stopOnFailure);
+                                                    bool stopOnFailure,
+                                                    bool colocatedIntermediateResult);
 static bool HasReachedAdaptiveExecutorPoolSize(List *connectionStateHash);
 static MultiConnection * GetLeastUtilisedCopyConnection(List *connectionStateList,
                                                         char *nodeName, int nodePort);
@@ -2291,7 +2292,9 @@ CitusCopyDestReceiverStartup(DestReceiver *dest, int operation,
     /* define the template for the COPY statement that is sent to workers */
     CopyStmt *copyStatement = makeNode(CopyStmt);
-    if (copyDest->colocatedIntermediateResultIdPrefix != NULL)
+    bool colocatedIntermediateResults =
+        copyDest->colocatedIntermediateResultIdPrefix != NULL;
+    if (colocatedIntermediateResults)
     {
         copyStatement->relation = makeRangeVar(NULL,
                                                copyDest->
@@ -2329,6 +2332,13 @@ CitusCopyDestReceiverStartup(DestReceiver *dest, int operation,
     RecordRelationAccessIfNonDistTable(tableId, PLACEMENT_ACCESS_DML);

+    /*
+     * Colocated intermediate results do not honor citus.max_shared_pool_size,
+     * so we don't need to reserve any connections. Each result file is sent
+     * over a single connection.
+     */
+    if (!colocatedIntermediateResults)
+    {
     /*
      * For all the primary (e.g., writable) remote nodes, reserve a shared
      * connection. We do this upfront because we cannot know which nodes
@@ -2343,6 +2353,7 @@ CitusCopyDestReceiverStartup(DestReceiver *dest, int operation,
      * COPY would fail hinting the user to change the relevant settiing.
      */
     EnsureConnectionPossibilityForRemotePrimaryNodes();
+    }

     LocalCopyStatus localCopyStatus = GetLocalCopyStatus();
     if (localCopyStatus == LOCAL_COPY_DISABLED)
@@ -3580,7 +3591,8 @@ InitializeCopyShardState(CopyShardState *shardState,
     }

     MultiConnection *connection =
-        CopyGetPlacementConnection(connectionStateHash, placement, stopOnFailure);
+        CopyGetPlacementConnection(connectionStateHash, placement, stopOnFailure,
+                                   colocatedIntermediateResult);
     if (connection == NULL)
     {
         failedPlacementCount++;
@@ -3691,11 +3703,40 @@ LogLocalCopyToFileExecution(uint64 shardId)
  * then it reuses the connection. Otherwise, it requests a connection for placement.
  */
 static MultiConnection *
-CopyGetPlacementConnection(HTAB *connectionStateHash, ShardPlacement *placement, bool
-                           stopOnFailure)
+CopyGetPlacementConnection(HTAB *connectionStateHash, ShardPlacement *placement,
+                           bool stopOnFailure, bool colocatedIntermediateResult)
 {
-    uint32 connectionFlags = FOR_DML;
-    char *nodeUser = CurrentUserName();
+    if (colocatedIntermediateResult)
+    {
+        /*
+         * Colocated intermediate results are just files, so they are not
+         * required to use the same connections as their co-located shards.
+         * Hence, we are free to use any connection we can get.
+         *
+         * Also, the current connection re-use logic does not know how to
+         * handle intermediate results, as intermediate results always
+         * truncate the existing files. That's why we use one connection
+         * per intermediate result.
+         *
+         * Also note that we are breaking the guarantees of
+         * citus.shared_pool_size as we cannot rely on optional connections.
+         */
+        uint32 connectionFlagsForIntermediateResult = 0;
+        MultiConnection *connection =
+            GetNodeConnection(connectionFlagsForIntermediateResult,
+                              placement->nodeName, placement->nodePort);
+
+        /*
+         * As noted above, we want each intermediate file to go over
+         * a separate connection.
+         */
+        ClaimConnectionExclusively(connection);
+
+        /* and we cannot afford to handle failures when anything goes wrong */
+        MarkRemoteTransactionCritical(connection);
+
+        return connection;
+    }

     /*
      * Determine whether the task has to be assigned to a particular connection
@@ -3703,6 +3744,7 @@ CopyGetPlacementConnection(HTAB *connectionStateHash, ShardPlacement *placement,
      */
     ShardPlacementAccess *placementAccess = CreatePlacementAccess(placement,
                                                                   PLACEMENT_ACCESS_DML);
+    uint32 connectionFlags = FOR_DML;
     MultiConnection *connection =
         GetConnectionIfPlacementAccessedInXact(connectionFlags,
                                                list_make1(placementAccess), NULL);
@@ -3791,6 +3833,7 @@ CopyGetPlacementConnection(HTAB *connectionStateHash, ShardPlacement *placement,
         connectionFlags |= REQUIRE_CLEAN_CONNECTION;
     }

+    char *nodeUser = CurrentUserName();
     connection = GetPlacementConnection(connectionFlags, placement, nodeUser);
     if (connection == NULL)
     {
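
The "breaking the guarantees of citus.shared_pool_size" comment above follows directly from the flags: connectionFlags = 0 requests a mandatory connection. A sketch of the alternative the comment rules out, assuming Citus's OPTIONAL_CONNECTION flag behaves as described (honors citus.max_shared_pool_size and yields NULL once the pool is exhausted):

/*
 * Sketch only: what an optional connection would look like here, and why
 * it does not work. OPTIONAL_CONNECTION is assumed to make GetNodeConnection
 * respect citus.max_shared_pool_size and return NULL beyond the limit.
 */
uint32 optionalFlags = OPTIONAL_CONNECTION;
MultiConnection *connection =
    GetNodeConnection(optionalFlags, placement->nodeName, placement->nodePort);
if (connection == NULL)
{
    /*
     * The only fallback would be re-using an existing connection, which
     * would truncate the intermediate result file already written over it.
     * Hence the patch requests a mandatory connection (flags = 0) and
     * accepts exceeding the shared pool size.
     */
}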

File 2/3 (regression test expected output)

@@ -495,7 +495,7 @@ BEGIN;
 (2 rows)

 ROLLBACK;
--- INSERT SELECT with RETURNING/ON CONFLICT clauses should honor shared_pool_size
+-- INSERT SELECT with RETURNING/ON CONFLICT clauses does not honor shared_pool_size
 -- in underlying COPY commands
 BEGIN;
 SELECT pg_sleep(0.1);
@@ -504,7 +504,9 @@ BEGIN;
 (1 row)

-INSERT INTO test SELECT i FROM generate_series(0,10) i RETURNING *;
+-- make sure that we hit at least 4 shards per node, where 20 rows
+-- is enough
+INSERT INTO test SELECT i FROM generate_series(0,20) i RETURNING *;
  a
 ---------------------------------------------------------------------
  0
@@ -518,10 +520,20 @@ BEGIN;
   8
   9
  10
-(11 rows)
+ 11
+ 12
+ 13
+ 14
+ 15
+ 16
+ 17
+ 18
+ 19
+ 20
+(21 rows)

 SELECT
-    connection_count_to_node
+    connection_count_to_node > current_setting('citus.max_shared_pool_size')::int
 FROM
     citus_remote_connection_stats()
 WHERE
@@ -529,10 +541,10 @@ BEGIN;
     database_name = 'regression'
 ORDER BY
     hostname, port;
- connection_count_to_node
+ ?column?
 ---------------------------------------------------------------------
- 3
- 3
+ t
+ t
 (2 rows)

 ROLLBACK;

File 3/3 (regression test SQL)

@@ -340,14 +340,16 @@ BEGIN;
     hostname, port;
 ROLLBACK;

--- INSERT SELECT with RETURNING/ON CONFLICT clauses should honor shared_pool_size
+-- INSERT SELECT with RETURNING/ON CONFLICT clauses does not honor shared_pool_size
 -- in underlying COPY commands
 BEGIN;
 SELECT pg_sleep(0.1);
-INSERT INTO test SELECT i FROM generate_series(0,10) i RETURNING *;
+-- make sure that we hit at least 4 shards per node, where 20 rows
+-- is enough
+INSERT INTO test SELECT i FROM generate_series(0,20) i RETURNING *;
 SELECT
-    connection_count_to_node
+    connection_count_to_node > current_setting('citus.max_shared_pool_size')::int
 FROM
     citus_remote_connection_stats()
 WHERE