From 23951c562e803a00d298b5b74fab761100b4c3b4 Mon Sep 17 00:00:00 2001 From: Onder Kalaci Date: Wed, 10 Feb 2021 13:42:58 +0100 Subject: [PATCH] Do not connection re-use for intermediate results /* * Colocated intermediate results are just files and not required to use * the same connections with their co-located shards. So, we are free to * use any connection we can get. * * Also, the current connection re-use logic does not know how to handle * intermediate results as the intermediate results always truncates the * existing files. That's why, we use one connection per intermediate * result. */ (cherry picked from commit 5d5a3574875994bf35feedfd775ff4fdb6bf6d02) --- src/backend/distributed/commands/multi_copy.c | 70 +++++++++++++++---- .../expected/shared_connection_stats.out | 26 +++++-- .../regress/sql/shared_connection_stats.sql | 8 ++- 3 files changed, 80 insertions(+), 24 deletions(-) diff --git a/src/backend/distributed/commands/multi_copy.c b/src/backend/distributed/commands/multi_copy.c index 50ca597e3..593a3feb8 100644 --- a/src/backend/distributed/commands/multi_copy.c +++ b/src/backend/distributed/commands/multi_copy.c @@ -261,7 +261,8 @@ static CopyShardState * GetShardState(uint64 shardId, HTAB *shardStateHash, copyOutState, bool isCopyToIntermediateFile); static MultiConnection * CopyGetPlacementConnection(HTAB *connectionStateHash, ShardPlacement *placement, - bool stopOnFailure); + bool stopOnFailure, + bool colocatedIntermediateResult); static bool HasReachedAdaptiveExecutorPoolSize(List *connectionStateHash); static MultiConnection * GetLeastUtilisedCopyConnection(List *connectionStateList, char *nodeName, int nodePort); @@ -2253,8 +2254,9 @@ CitusCopyDestReceiverStartup(DestReceiver *dest, int operation, /* define the template for the COPY statement that is sent to workers */ CopyStmt *copyStatement = makeNode(CopyStmt); - - if (copyDest->intermediateResultIdPrefix != NULL) + bool colocatedIntermediateResults = + 
copyDest->intermediateResultIdPrefix != NULL; + if (colocatedIntermediateResults) { copyStatement->relation = makeRangeVar(NULL, copyDest->intermediateResultIdPrefix, -1); @@ -2291,13 +2293,21 @@ CitusCopyDestReceiverStartup(DestReceiver *dest, int operation, RecordRelationAccessIfNonDistTable(tableId, PLACEMENT_ACCESS_DML); /* - * For all the primary (e.g., writable) nodes, reserve a shared connection. - * We do this upfront because we cannot know which nodes are going to be - * accessed. Since the order of the reservation is important, we need to - * do it right here. For the details on why the order important, see - * the function. + * Colocated intermediate results do not honor citus.max_shared_pool_size, + * so we don't need to reserve any connections. Each result file is sent + * over a single connection. */ - EnsureConnectionPossibilityForPrimaryNodes(); + if (!colocatedIntermediateResults) + { + /* + * For all the primary (e.g., writable) nodes, reserve a shared connection. + * We do this upfront because we cannot know which nodes are going to be + * accessed. Since the order of the reservation is important, we need to + * do it right here. For the details on why the order important, see + * the function. + */ + EnsureConnectionPossibilityForPrimaryNodes(); + } } @@ -3438,7 +3448,8 @@ InitializeCopyShardState(CopyShardState *shardState, } MultiConnection *connection = - CopyGetPlacementConnection(connectionStateHash, placement, stopOnFailure); + CopyGetPlacementConnection(connectionStateHash, placement, stopOnFailure, + isCopyToIntermediateFile); if (connection == NULL) { failedPlacementCount++; @@ -3534,11 +3545,40 @@ LogLocalCopyExecution(uint64 shardId) * then it reuses the connection. Otherwise, it requests a connection for placement. 
*/ static MultiConnection * -CopyGetPlacementConnection(HTAB *connectionStateHash, ShardPlacement *placement, bool - stopOnFailure) +CopyGetPlacementConnection(HTAB *connectionStateHash, ShardPlacement *placement, + bool stopOnFailure, bool colocatedIntermediateResult) { - uint32 connectionFlags = FOR_DML; - char *nodeUser = CurrentUserName(); + if (colocatedIntermediateResult) + { + /* + * Colocated intermediate results are just files and not required to use + * the same connections as their co-located shards. So, we are free to + * use any connection we can get. + * + * Also, the current connection re-use logic does not know how to handle + * intermediate results as the intermediate results always truncate the + * existing files. That's why we use one connection per intermediate + * result. + * + * Also note that we are breaking the guarantees of citus.max_shared_pool_size + * as we cannot rely on optional connections. + */ + uint32 connectionFlagsForIntermediateResult = 0; + MultiConnection *connection = + GetNodeConnection(connectionFlagsForIntermediateResult, placement->nodeName, + placement->nodePort); + + /* + * As noted above, we want each intermediate file to go over + * a separate connection. 
+ */ + ClaimConnectionExclusively(connection); + + /* and, we cannot afford to handle failures when anything goes wrong */ + MarkRemoteTransactionCritical(connection); + + return connection; + } /* * Determine whether the task has to be assigned to a particular connection @@ -3546,6 +3586,7 @@ CopyGetPlacementConnection(HTAB *connectionStateHash, ShardPlacement *placement, */ ShardPlacementAccess *placementAccess = CreatePlacementAccess(placement, PLACEMENT_ACCESS_DML); + uint32 connectionFlags = FOR_DML; MultiConnection *connection = GetConnectionIfPlacementAccessedInXact(connectionFlags, list_make1(placementAccess), NULL); @@ -3634,6 +3675,7 @@ CopyGetPlacementConnection(HTAB *connectionStateHash, ShardPlacement *placement, connectionFlags |= REQUIRE_CLEAN_CONNECTION; } + char *nodeUser = CurrentUserName(); connection = GetPlacementConnection(connectionFlags, placement, nodeUser); if (connection == NULL) { diff --git a/src/test/regress/expected/shared_connection_stats.out b/src/test/regress/expected/shared_connection_stats.out index 72b654488..04876601f 100644 --- a/src/test/regress/expected/shared_connection_stats.out +++ b/src/test/regress/expected/shared_connection_stats.out @@ -493,7 +493,7 @@ BEGIN; (2 rows) ROLLBACK; --- INSERT SELECT with RETURNING/ON CONFLICT clauses should honor shared_pool_size +-- INSERT SELECT with RETURNING/ON CONFLICT clauses does not honor shared_pool_size -- in underlying COPY commands BEGIN; SELECT pg_sleep(0.1); @@ -502,7 +502,9 @@ BEGIN; (1 row) - INSERT INTO test SELECT i FROM generate_series(0,10) i RETURNING *; + -- make sure that we hit at least 4 shards per node, where 20 rows + -- is enough + INSERT INTO test SELECT i FROM generate_series(0,20) i RETURNING *; a --------------------------------------------------------------------- 0 @@ -516,10 +518,20 @@ BEGIN; 8 9 10 -(11 rows) + 11 + 12 + 13 + 14 + 15 + 16 + 17 + 18 + 19 + 20 +(21 rows) SELECT - connection_count_to_node + connection_count_to_node > 
current_setting('citus.max_shared_pool_size')::int FROM citus_remote_connection_stats() WHERE @@ -527,10 +539,10 @@ BEGIN; database_name = 'regression' ORDER BY hostname, port; - connection_count_to_node + ?column? --------------------------------------------------------------------- - 3 - 3 + t + t (2 rows) ROLLBACK; diff --git a/src/test/regress/sql/shared_connection_stats.sql b/src/test/regress/sql/shared_connection_stats.sql index 7488f2d88..45a89ebb0 100644 --- a/src/test/regress/sql/shared_connection_stats.sql +++ b/src/test/regress/sql/shared_connection_stats.sql @@ -338,14 +338,16 @@ BEGIN; hostname, port; ROLLBACK; --- INSERT SELECT with RETURNING/ON CONFLICT clauses should honor shared_pool_size +-- INSERT SELECT with RETURNING/ON CONFLICT clauses does not honor shared_pool_size -- in underlying COPY commands BEGIN; SELECT pg_sleep(0.1); - INSERT INTO test SELECT i FROM generate_series(0,10) i RETURNING *; + -- make sure that we hit at least 4 shards per node, where 20 rows + -- is enough + INSERT INTO test SELECT i FROM generate_series(0,20) i RETURNING *; SELECT - connection_count_to_node + connection_count_to_node > current_setting('citus.max_shared_pool_size')::int FROM citus_remote_connection_stats() WHERE