Add regression tests for COPY into colocated intermediate results

To add the tests without loading too much data, make the copy
switchover threshold configurable.
pull/4683/head
Onder Kalaci 2021-02-10 16:46:31 +01:00
parent 5d5a357487
commit f297c96ec5
5 changed files with 75 additions and 2 deletions
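
The crux of the change: the previously hard-coded 4MB switchover threshold becomes a GUC, so the regression tests can drop it to 1 byte and exercise the connection-switchover path with only a few hundred rows. A condensed sketch of the idea, using only statements that appear in the regression tests below:

-- drop the switchover threshold to its minimum so that even a tiny
-- load makes COPY switch the active placement between connections
SET citus.copy_switchover_threshold TO 1;

-- ~500 rows are now enough to trigger switchover (instead of >4MB)
INSERT INTO target_table SELECT i, 0 FROM generate_series(0, 500) i;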


@@ -121,7 +121,7 @@ static const char BinarySignature[11] = "PGCOPY\n\377\r\n\0";
  * 4MB is a good balance between memory usage and performance. Note that this
  * is irrelevant in the common case where we open one connection per placement.
  */
-#define COPY_SWITCH_OVER_THRESHOLD (4 * 1024 * 1024)
+int CopySwitchOverThresholdBytes = 4 * 1024 * 1024;

 #define FILE_IS_OPEN(x) (x > -1)
@@ -2513,7 +2513,7 @@ CitusSendTupleToPlacements(TupleTableSlot *slot, CitusCopyDestReceiver *copyDest
 		switchToCurrentPlacement = true;
 	}
 	else if (currentPlacementState != activePlacementState &&
-			 currentPlacementState->data->len > COPY_SWITCH_OVER_THRESHOLD)
+			 currentPlacementState->data->len > CopySwitchOverThresholdBytes)
 	{
 		switchToCurrentPlacement = true;


@@ -575,6 +575,22 @@ RegisterCitusConfigVariables(void)
 		GUC_STANDARD,
 		NULL, NULL, NULL);

+	DefineCustomIntVariable(
+		"citus.copy_switchover_threshold",
+		gettext_noop("Sets the threshold for copy to be switched "
+					 "over per connection."),
+		gettext_noop("Data size threshold to switch over the active placement for "
+					 "a connection. If this is too low, overhead of starting COPY "
+					 "commands will hurt the performance. If this is too high, "
+					 "buffered data will use lots of memory. 4MB is a good balance "
+					 "between memory usage and performance. Note that this is irrelevant "
+					 "in the common case where we open one connection per placement."),
+		&CopySwitchOverThresholdBytes,
+		4 * 1024 * 1024, 1, INT_MAX,
+		PGC_USERSET,
+		GUC_UNIT_BYTE | GUC_NO_SHOW_ALL,
+		NULL, NULL, NULL);
+
 	DefineCustomBoolVariable(
 		"citus.enable_local_execution",
 		gettext_noop("Enables queries on shards that are local to the current node "


@@ -141,6 +141,10 @@ typedef struct CitusCopyDestReceiver
 } CitusCopyDestReceiver;

+
+/* managed via GUC, the default is 4MB */
+extern int CopySwitchOverThresholdBytes;
+
 /* function declarations for copying into a distributed table */
 extern CitusCopyDestReceiver * CreateCitusCopyDestReceiver(Oid relationId,
 														   List *columnNameList,


@@ -543,6 +543,40 @@ SELECT * FROM target_table ORDER BY 1;
  10 |     0
 (10 rows)

+-- make sure that even if COPY switchover happens
+-- the results are correct
+SET citus.copy_switchover_threshold TO 1;
+TRUNCATE target_table;
+-- load some data to make sure copy commands switch over connections
+INSERT INTO target_table SELECT i,0 FROM generate_series(0,500)i;
+DEBUG: distributed INSERT ... SELECT can only select from distributed tables
+DEBUG: Collecting INSERT ... SELECT results on coordinator
+-- make sure that SELECT only uses 1 connection 1 node
+-- yet still COPY commands use 1 connection per co-located
+-- intermediate result file
+SET citus.max_adaptive_executor_pool_size TO 1;
+INSERT INTO target_table SELECT * FROM target_table LIMIT 10000 ON CONFLICT(col_1) DO UPDATE SET col_2 = EXCLUDED.col_2 + 1;
+DEBUG: LIMIT clauses are not allowed in distributed INSERT ... SELECT queries
+DEBUG: push down of limit count: 10000
+DEBUG: Collecting INSERT ... SELECT results on coordinator
+SELECT DISTINCT col_2 FROM target_table;
+ col_2
+---------------------------------------------------------------------
+     1
+(1 row)
+
+WITH cte_1 AS (INSERT INTO target_table SELECT * FROM target_table LIMIT 10000 ON CONFLICT(col_1) DO UPDATE SET col_2 = EXCLUDED.col_2 + 1 RETURNING *)
+SELECT DISTINCT col_2 FROM cte_1;
+DEBUG: generating subplan XXX_1 for CTE cte_1: INSERT INTO on_conflict.target_table (col_1, col_2) SELECT col_1, col_2 FROM on_conflict.target_table LIMIT 10000 ON CONFLICT(col_1) DO UPDATE SET col_2 = (excluded.col_2 OPERATOR(pg_catalog.+) 1) RETURNING target_table.col_1, target_table.col_2
+DEBUG: LIMIT clauses are not allowed in distributed INSERT ... SELECT queries
+DEBUG: push down of limit count: 10000
+DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT DISTINCT col_2 FROM (SELECT intermediate_result.col_1, intermediate_result.col_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(col_1 integer, col_2 integer)) cte_1
+DEBUG: Collecting INSERT ... SELECT results on coordinator
+ col_2
+---------------------------------------------------------------------
+     2
+(1 row)
+
 RESET client_min_messages;
 DROP SCHEMA on_conflict CASCADE;
 NOTICE: drop cascades to 7 other objects


@@ -314,5 +314,24 @@ WITH cte AS(
 INSERT INTO target_table SELECT * FROM cte_2 ON CONFLICT(col_1) DO UPDATE SET col_2 = EXCLUDED.col_2 + 1;
 SELECT * FROM target_table ORDER BY 1;

+-- make sure that even if COPY switchover happens
+-- the results are correct
+SET citus.copy_switchover_threshold TO 1;
+TRUNCATE target_table;
+
+-- load some data to make sure copy commands switch over connections
+INSERT INTO target_table SELECT i,0 FROM generate_series(0,500)i;
+
+-- make sure that SELECT only uses 1 connection 1 node
+-- yet still COPY commands use 1 connection per co-located
+-- intermediate result file
+SET citus.max_adaptive_executor_pool_size TO 1;
+
+INSERT INTO target_table SELECT * FROM target_table LIMIT 10000 ON CONFLICT(col_1) DO UPDATE SET col_2 = EXCLUDED.col_2 + 1;
+SELECT DISTINCT col_2 FROM target_table;
+
+WITH cte_1 AS (INSERT INTO target_table SELECT * FROM target_table LIMIT 10000 ON CONFLICT(col_1) DO UPDATE SET col_2 = EXCLUDED.col_2 + 1 RETURNING *)
+SELECT DISTINCT col_2 FROM cte_1;
+
 RESET client_min_messages;
 DROP SCHEMA on_conflict CASCADE;