From 5d5a3574875994bf35feedfd775ff4fdb6bf6d02 Mon Sep 17 00:00:00 2001 From: Onder Kalaci Date: Wed, 10 Feb 2021 13:42:58 +0100 Subject: [PATCH 1/2] Do not connection re-use for intermediate results /* * Colocated intermediate results are just files and not required to use * the same connections with their co-located shards. So, we are free to * use any connection we can get. * * Also, the current connection re-use logic does not know how to handle * intermediate results as the intermediate results always truncates the * existing files. That's why, we use one connection per intermediate * result. */ --- src/backend/distributed/commands/multi_copy.c | 81 ++++++++++++++----- .../expected/shared_connection_stats.out | 26 ++++-- .../regress/sql/shared_connection_stats.sql | 8 +- 3 files changed, 86 insertions(+), 29 deletions(-) diff --git a/src/backend/distributed/commands/multi_copy.c b/src/backend/distributed/commands/multi_copy.c index 9824a1fe5..c912e67ba 100644 --- a/src/backend/distributed/commands/multi_copy.c +++ b/src/backend/distributed/commands/multi_copy.c @@ -280,7 +280,8 @@ static CopyShardState * GetShardState(uint64 shardId, HTAB *shardStateHash, copyOutState, bool isColocatedIntermediateResult); static MultiConnection * CopyGetPlacementConnection(HTAB *connectionStateHash, ShardPlacement *placement, - bool stopOnFailure); + bool stopOnFailure, + bool colocatedIntermediateResult); static bool HasReachedAdaptiveExecutorPoolSize(List *connectionStateHash); static MultiConnection * GetLeastUtilisedCopyConnection(List *connectionStateList, char *nodeName, int nodePort); @@ -2291,7 +2292,9 @@ CitusCopyDestReceiverStartup(DestReceiver *dest, int operation, /* define the template for the COPY statement that is sent to workers */ CopyStmt *copyStatement = makeNode(CopyStmt); - if (copyDest->colocatedIntermediateResultIdPrefix != NULL) + bool colocatedIntermediateResults = + copyDest->colocatedIntermediateResultIdPrefix != NULL; + if 
(colocatedIntermediateResults) { copyStatement->relation = makeRangeVar(NULL, copyDest-> @@ -2330,19 +2333,27 @@ CitusCopyDestReceiverStartup(DestReceiver *dest, int operation, RecordRelationAccessIfNonDistTable(tableId, PLACEMENT_ACCESS_DML); /* - * For all the primary (e.g., writable) remote nodes, reserve a shared - * connection. We do this upfront because we cannot know which nodes - * are going to be accessed. Since the order of the reservation is - * important, we need to do it right here. For the details on why the - * order important, see EnsureConnectionPossibilityForNodeList(). - * - * We don't need to care about local node because we either get a - * connection or use local connection, so it cannot be part of - * the starvation. As an edge case, if it cannot get a connection - * and cannot switch to local execution (e.g., disabled by user), - * COPY would fail hinting the user to change the relevant settiing. + * Colocated intermediate results do not honor citus.max_shared_pool_size, + * so we don't need to reserve any connections. Each result file is sent + * over a single connection. */ - EnsureConnectionPossibilityForRemotePrimaryNodes(); + if (!colocatedIntermediateResults) + { + /* + * For all the primary (e.g., writable) remote nodes, reserve a shared + * connection. We do this upfront because we cannot know which nodes + * are going to be accessed. Since the order of the reservation is + * important, we need to do it right here. For the details on why the + * order is important, see EnsureConnectionPossibilityForNodeList(). + * + * We don't need to care about local node because we either get a + * connection or use local connection, so it cannot be part of + * the starvation. As an edge case, if it cannot get a connection + * and cannot switch to local execution (e.g., disabled by user), + * COPY would fail hinting the user to change the relevant setting. 
+ */ + EnsureConnectionPossibilityForRemotePrimaryNodes(); + } LocalCopyStatus localCopyStatus = GetLocalCopyStatus(); if (localCopyStatus == LOCAL_COPY_DISABLED) @@ -3580,7 +3591,8 @@ InitializeCopyShardState(CopyShardState *shardState, } MultiConnection *connection = - CopyGetPlacementConnection(connectionStateHash, placement, stopOnFailure); + CopyGetPlacementConnection(connectionStateHash, placement, stopOnFailure, + colocatedIntermediateResult); if (connection == NULL) { failedPlacementCount++; @@ -3691,11 +3703,40 @@ LogLocalCopyToFileExecution(uint64 shardId) * then it reuses the connection. Otherwise, it requests a connection for placement. */ static MultiConnection * -CopyGetPlacementConnection(HTAB *connectionStateHash, ShardPlacement *placement, bool - stopOnFailure) +CopyGetPlacementConnection(HTAB *connectionStateHash, ShardPlacement *placement, + bool stopOnFailure, bool colocatedIntermediateResult) { - uint32 connectionFlags = FOR_DML; - char *nodeUser = CurrentUserName(); + if (colocatedIntermediateResult) + { + /* + * Colocated intermediate results are just files and not required to use + * the same connections with their co-located shards. So, we are free to + * use any connection we can get. + * + * Also, the current connection re-use logic does not know how to handle + * intermediate results as the intermediate results always truncate the + * existing files. That's why we use one connection per intermediate + * result. + * + * Also note that we are breaking the guarantees of citus.max_shared_pool_size + * as we cannot rely on optional connections. + */ + uint32 connectionFlagsForIntermediateResult = 0; + MultiConnection *connection = + GetNodeConnection(connectionFlagsForIntermediateResult, placement->nodeName, + placement->nodePort); + + /* + * As noted above, we want each intermediate file to go over + * a separate connection. 
+ */ + ClaimConnectionExclusively(connection); + + /* and, we cannot afford to handle failures when anything goes wrong */ + MarkRemoteTransactionCritical(connection); + + return connection; + } /* * Determine whether the task has to be assigned to a particular connection @@ -3703,6 +3744,7 @@ CopyGetPlacementConnection(HTAB *connectionStateHash, ShardPlacement *placement, */ ShardPlacementAccess *placementAccess = CreatePlacementAccess(placement, PLACEMENT_ACCESS_DML); + uint32 connectionFlags = FOR_DML; MultiConnection *connection = GetConnectionIfPlacementAccessedInXact(connectionFlags, list_make1(placementAccess), NULL); @@ -3791,6 +3833,7 @@ CopyGetPlacementConnection(HTAB *connectionStateHash, ShardPlacement *placement, connectionFlags |= REQUIRE_CLEAN_CONNECTION; } + char *nodeUser = CurrentUserName(); connection = GetPlacementConnection(connectionFlags, placement, nodeUser); if (connection == NULL) { diff --git a/src/test/regress/expected/shared_connection_stats.out b/src/test/regress/expected/shared_connection_stats.out index 68be0f760..7f9c84da5 100644 --- a/src/test/regress/expected/shared_connection_stats.out +++ b/src/test/regress/expected/shared_connection_stats.out @@ -495,7 +495,7 @@ BEGIN; (2 rows) ROLLBACK; --- INSERT SELECT with RETURNING/ON CONFLICT clauses should honor shared_pool_size +-- INSERT SELECT with RETURNING/ON CONFLICT clauses does not honor shared_pool_size -- in underlying COPY commands BEGIN; SELECT pg_sleep(0.1); @@ -504,7 +504,9 @@ BEGIN; (1 row) - INSERT INTO test SELECT i FROM generate_series(0,10) i RETURNING *; + -- make sure that we hit at least 4 shards per node, where 20 rows + -- is enough + INSERT INTO test SELECT i FROM generate_series(0,20) i RETURNING *; a --------------------------------------------------------------------- 0 @@ -518,10 +520,20 @@ BEGIN; 8 9 10 -(11 rows) + 11 + 12 + 13 + 14 + 15 + 16 + 17 + 18 + 19 + 20 +(21 rows) SELECT - connection_count_to_node + connection_count_to_node > 
current_setting('citus.max_shared_pool_size')::int FROM citus_remote_connection_stats() WHERE @@ -529,10 +541,10 @@ BEGIN; database_name = 'regression' ORDER BY hostname, port; - connection_count_to_node + ?column? --------------------------------------------------------------------- - 3 - 3 + t + t (2 rows) ROLLBACK; diff --git a/src/test/regress/sql/shared_connection_stats.sql b/src/test/regress/sql/shared_connection_stats.sql index 1aa904ff2..dbb988315 100644 --- a/src/test/regress/sql/shared_connection_stats.sql +++ b/src/test/regress/sql/shared_connection_stats.sql @@ -340,14 +340,16 @@ BEGIN; hostname, port; ROLLBACK; --- INSERT SELECT with RETURNING/ON CONFLICT clauses should honor shared_pool_size +-- INSERT SELECT with RETURNING/ON CONFLICT clauses does not honor shared_pool_size -- in underlying COPY commands BEGIN; SELECT pg_sleep(0.1); - INSERT INTO test SELECT i FROM generate_series(0,10) i RETURNING *; + -- make sure that we hit at least 4 shards per node, where 20 rows + -- is enough + INSERT INTO test SELECT i FROM generate_series(0,20) i RETURNING *; SELECT - connection_count_to_node + connection_count_to_node > current_setting('citus.max_shared_pool_size')::int FROM citus_remote_connection_stats() WHERE From f297c96ec56df51303ac057c000a9f03a42951e0 Mon Sep 17 00:00:00 2001 From: Onder Kalaci Date: Wed, 10 Feb 2021 16:46:31 +0100 Subject: [PATCH 2/2] Add regression tests for COPY into colocated intermediate results To add the tests without too much data, make the copy switchover configurable. 
--- src/backend/distributed/commands/multi_copy.c | 4 +-- src/backend/distributed/shared_library_init.c | 16 +++++++++ src/include/distributed/commands/multi_copy.h | 4 +++ .../expected/multi_insert_select_conflict.out | 34 +++++++++++++++++++ .../sql/multi_insert_select_conflict.sql | 19 +++++++++++ 5 files changed, 75 insertions(+), 2 deletions(-) diff --git a/src/backend/distributed/commands/multi_copy.c b/src/backend/distributed/commands/multi_copy.c index c912e67ba..c584ac96f 100644 --- a/src/backend/distributed/commands/multi_copy.c +++ b/src/backend/distributed/commands/multi_copy.c @@ -121,7 +121,7 @@ static const char BinarySignature[11] = "PGCOPY\n\377\r\n\0"; * 4MB is a good balance between memory usage and performance. Note that this * is irrelevant in the common case where we open one connection per placement. */ -#define COPY_SWITCH_OVER_THRESHOLD (4 * 1024 * 1024) +int CopySwitchOverThresholdBytes = 4 * 1024 * 1024; #define FILE_IS_OPEN(x) (x > -1) @@ -2513,7 +2513,7 @@ CitusSendTupleToPlacements(TupleTableSlot *slot, CitusCopyDestReceiver *copyDest switchToCurrentPlacement = true; } else if (currentPlacementState != activePlacementState && - currentPlacementState->data->len > COPY_SWITCH_OVER_THRESHOLD) + currentPlacementState->data->len > CopySwitchOverThresholdBytes) { switchToCurrentPlacement = true; diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index eec8356db..e5de6c689 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -575,6 +575,22 @@ RegisterCitusConfigVariables(void) GUC_STANDARD, NULL, NULL, NULL); + DefineCustomIntVariable( + "citus.copy_switchover_threshold", + gettext_noop("Sets the threshold for copy to be switched " + "over per connection."), + gettext_noop("Data size threshold to switch over the active placement for " + "a connection. 
If this is too low, overhead of starting COPY " + "commands will hurt the performance. If this is too high, " + "buffered data will use lots of memory. 4MB is a good balance " + "between memory usage and performance. Note that this is irrelevant " + "in the common case where we open one connection per placement."), + &CopySwitchOverThresholdBytes, + 4 * 1024 * 1024, 1, INT_MAX, + PGC_USERSET, + GUC_UNIT_BYTE | GUC_NO_SHOW_ALL, + NULL, NULL, NULL); + DefineCustomBoolVariable( "citus.enable_local_execution", gettext_noop("Enables queries on shards that are local to the current node " diff --git a/src/include/distributed/commands/multi_copy.h b/src/include/distributed/commands/multi_copy.h index 996cb8c97..a5f414208 100644 --- a/src/include/distributed/commands/multi_copy.h +++ b/src/include/distributed/commands/multi_copy.h @@ -141,6 +141,10 @@ typedef struct CitusCopyDestReceiver } CitusCopyDestReceiver; +/* managed via GUC, the default is 4MB */ +extern int CopySwitchOverThresholdBytes; + + /* function declarations for copying into a distributed table */ extern CitusCopyDestReceiver * CreateCitusCopyDestReceiver(Oid relationId, List *columnNameList, diff --git a/src/test/regress/expected/multi_insert_select_conflict.out b/src/test/regress/expected/multi_insert_select_conflict.out index f9145b0a6..1810fa69f 100644 --- a/src/test/regress/expected/multi_insert_select_conflict.out +++ b/src/test/regress/expected/multi_insert_select_conflict.out @@ -543,6 +543,40 @@ SELECT * FROM target_table ORDER BY 1; 10 | 0 (10 rows) +-- make sure that even if COPY switchover happens +-- the results are correct +SET citus.copy_switchover_threshold TO 1; +TRUNCATE target_table; +-- load some data to make sure copy commands switch over connections +INSERT INTO target_table SELECT i,0 FROM generate_series(0,500)i; +DEBUG: distributed INSERT ... SELECT can only select from distributed tables +DEBUG: Collecting INSERT ... 
SELECT results on coordinator +-- make sure that SELECT only uses 1 connection 1 node +-- yet still COPY commands use 1 connection per co-located +-- intermediate result file +SET citus.max_adaptive_executor_pool_size TO 1; +INSERT INTO target_table SELECT * FROM target_table LIMIT 10000 ON CONFLICT(col_1) DO UPDATE SET col_2 = EXCLUDED.col_2 + 1; +DEBUG: LIMIT clauses are not allowed in distributed INSERT ... SELECT queries +DEBUG: push down of limit count: 10000 +DEBUG: Collecting INSERT ... SELECT results on coordinator +SELECT DISTINCT col_2 FROM target_table; + col_2 +--------------------------------------------------------------------- + 1 +(1 row) + +WITH cte_1 AS (INSERT INTO target_table SELECT * FROM target_table LIMIT 10000 ON CONFLICT(col_1) DO UPDATE SET col_2 = EXCLUDED.col_2 + 1 RETURNING *) +SELECT DISTINCT col_2 FROM cte_1; +DEBUG: generating subplan XXX_1 for CTE cte_1: INSERT INTO on_conflict.target_table (col_1, col_2) SELECT col_1, col_2 FROM on_conflict.target_table LIMIT 10000 ON CONFLICT(col_1) DO UPDATE SET col_2 = (excluded.col_2 OPERATOR(pg_catalog.+) 1) RETURNING target_table.col_1, target_table.col_2 +DEBUG: LIMIT clauses are not allowed in distributed INSERT ... SELECT queries +DEBUG: push down of limit count: 10000 +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT DISTINCT col_2 FROM (SELECT intermediate_result.col_1, intermediate_result.col_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(col_1 integer, col_2 integer)) cte_1 +DEBUG: Collecting INSERT ... 
SELECT results on coordinator + col_2 +--------------------------------------------------------------------- + 2 +(1 row) + RESET client_min_messages; DROP SCHEMA on_conflict CASCADE; NOTICE: drop cascades to 7 other objects diff --git a/src/test/regress/sql/multi_insert_select_conflict.sql b/src/test/regress/sql/multi_insert_select_conflict.sql index cd0057fe0..bcbe278e9 100644 --- a/src/test/regress/sql/multi_insert_select_conflict.sql +++ b/src/test/regress/sql/multi_insert_select_conflict.sql @@ -314,5 +314,24 @@ WITH cte AS( INSERT INTO target_table SELECT * FROM cte_2 ON CONFLICT(col_1) DO UPDATE SET col_2 = EXCLUDED.col_2 + 1; SELECT * FROM target_table ORDER BY 1; +-- make sure that even if COPY switchover happens +-- the results are correct +SET citus.copy_switchover_threshold TO 1; +TRUNCATE target_table; + +-- load some data to make sure copy commands switch over connections +INSERT INTO target_table SELECT i,0 FROM generate_series(0,500)i; + +-- make sure that SELECT only uses 1 connection 1 node +-- yet still COPY commands use 1 connection per co-located +-- intermediate result file +SET citus.max_adaptive_executor_pool_size TO 1; + +INSERT INTO target_table SELECT * FROM target_table LIMIT 10000 ON CONFLICT(col_1) DO UPDATE SET col_2 = EXCLUDED.col_2 + 1; +SELECT DISTINCT col_2 FROM target_table; + +WITH cte_1 AS (INSERT INTO target_table SELECT * FROM target_table LIMIT 10000 ON CONFLICT(col_1) DO UPDATE SET col_2 = EXCLUDED.col_2 + 1 RETURNING *) +SELECT DISTINCT col_2 FROM cte_1; + RESET client_min_messages; DROP SCHEMA on_conflict CASCADE;