/*
 * Patch summary: make the remote COPY flush threshold configurable.
 *
 * remote_commands.c
 *   - Removes the compile-time constant MAX_PUT_COPY_DATA_BUFFER_SIZE
 *     (8 * 1024 * 1024) and introduces the global int
 *     RemoteCopyFlushThreshold, initialized to the same value.
 *   - PutRemoteCopyData() now compares
 *     connection->copyBytesWrittenSinceLastFlush against
 *     RemoteCopyFlushThreshold; when the counter is strictly greater, it is
 *     reset to 0 and the function returns
 *     FinishConnectionIO(connection, allowInterrupts).
 *
 * shared_library_init.c
 *   - RegisterCitusConfigVariables() registers the integer setting
 *     "citus.remote_copy_flush_threshold" through DefineCustomIntVariable(),
 *     bound to RemoteCopyFlushThreshold: boot value 8 * 1024 * 1024,
 *     allowed range 0 .. INT_MAX, context PGC_USERSET,
 *     flags GUC_UNIT_BYTE | GUC_NO_SHOW_ALL, and no check/assign/show hooks
 *     (all three are NULL).
 *
 * remote_commands.h
 *   - Adds the extern declaration of RemoteCopyFlushThreshold.
 *
 * multi_insert_select (.sql and expected .out)
 *   - Adds a regression case that, inside a BEGIN ... ABORT transaction,
 *     runs SET LOCAL citus.remote_copy_flush_threshold TO 1 and then an
 *     INSERT INTO raw_events_first ... SELECT ... ON CONFLICT DO NOTHING.
 *     The expected output adds the same statements with no result lines.
 *
 * NOTE(review): in this copy of the patch the original line breaks and
 * indentation appear to have been collapsed into single spaces (several
 * hunks share one physical line). Presumably it will not apply with
 * git apply / patch in this form; confirm against the original commit
 * before use. The patch text below is left untouched.
 */
diff --git a/src/backend/distributed/connection/remote_commands.c b/src/backend/distributed/connection/remote_commands.c index da484a13b..b4d68299e 100644 --- a/src/backend/distributed/connection/remote_commands.c +++ b/src/backend/distributed/connection/remote_commands.c @@ -25,7 +25,11 @@ #include "utils/palloc.h" -#define MAX_PUT_COPY_DATA_BUFFER_SIZE (8 * 1024 * 1024) +/* + * Setting that controls how many bytes of COPY data libpq is allowed to buffer + * internally before we force a flush. + */ +int RemoteCopyFlushThreshold = 8 * 1024 * 1024; /* GUC, determining whether statements sent to remote nodes are logged */ @@ -620,7 +624,7 @@ PutRemoteCopyData(MultiConnection *connection, const char *buffer, int nbytes) */ connection->copyBytesWrittenSinceLastFlush += nbytes; - if (connection->copyBytesWrittenSinceLastFlush > MAX_PUT_COPY_DATA_BUFFER_SIZE) + if (connection->copyBytesWrittenSinceLastFlush > RemoteCopyFlushThreshold) { connection->copyBytesWrittenSinceLastFlush = 0; return FinishConnectionIO(connection, allowInterrupts); diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index efa467801..b0499c8d8 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -701,6 +701,19 @@ RegisterCitusConfigVariables(void) GUC_NO_SHOW_ALL, NoticeIfSubqueryPushdownEnabled, NULL, NULL); + DefineCustomIntVariable( + "citus.remote_copy_flush_threshold", + gettext_noop("Sets the threshold for remote copy to be flushed."), + gettext_noop("When sending data over remote connections via the COPY protocol, " + "bytes are first buffered internally by libpq. 
If the number of " + "bytes buffered exceeds the threshold, Citus waits for all the " + "bytes to flush."), + &RemoteCopyFlushThreshold, + 8 * 1024 * 1024, 0, INT_MAX, + PGC_USERSET, + GUC_UNIT_BYTE | GUC_NO_SHOW_ALL, + NULL, NULL, NULL); + DefineCustomIntVariable( "citus.local_copy_flush_threshold", gettext_noop("Sets the threshold for local copy to be flushed."), diff --git a/src/include/distributed/remote_commands.h b/src/include/distributed/remote_commands.h index b4b7c11e9..4463ad8f0 100644 --- a/src/include/distributed/remote_commands.h +++ b/src/include/distributed/remote_commands.h @@ -21,6 +21,9 @@ /* GUC, determining whether statements sent to remote nodes are logged */ extern bool LogRemoteCommands; +/* GUC that determines the number of bytes after which remote COPY is flushed */ +extern int RemoteCopyFlushThreshold; + /* simple helpers */ extern bool IsResponseOK(PGresult *result); diff --git a/src/test/regress/expected/multi_insert_select.out b/src/test/regress/expected/multi_insert_select.out index 6d2343960..e556d3e8e 100644 --- a/src/test/regress/expected/multi_insert_select.out +++ b/src/test/regress/expected/multi_insert_select.out @@ -3059,6 +3059,13 @@ DO UPDATE SET user_id = 42 RETURNING user_id, value_1_agg; ERROR: modifying the partition value of rows is not allowed +-- test a small citus.remote_copy_flush_threshold +BEGIN; +SET LOCAL citus.remote_copy_flush_threshold TO 1; +INSERT INTO raw_events_first +SELECT * FROM raw_events_first OFFSET 0 +ON CONFLICT DO NOTHING; +ABORT; -- wrap in a transaction to improve performance BEGIN; DROP TABLE coerce_events; diff --git a/src/test/regress/sql/multi_insert_select.sql b/src/test/regress/sql/multi_insert_select.sql index af63780b6..2ab82dad4 100644 --- a/src/test/regress/sql/multi_insert_select.sql +++ b/src/test/regress/sql/multi_insert_select.sql @@ -2251,6 +2251,14 @@ DO UPDATE SET user_id = 42 RETURNING user_id, value_1_agg; +-- test a small citus.remote_copy_flush_threshold +BEGIN; +SET LOCAL 
citus.remote_copy_flush_threshold TO 1; +INSERT INTO raw_events_first +SELECT * FROM raw_events_first OFFSET 0 +ON CONFLICT DO NOTHING; +ABORT; + -- wrap in a transaction to improve performance BEGIN; DROP TABLE coerce_events;