Replace MAX_PUT_COPY_DATA_BUFFER_SIZE by citus.remote_copy_flush_threshold GUC

(cherry picked from commit fbc2147e11)
pull/5009/head
Marco Slot 2021-03-15 21:01:55 +01:00 committed by Sait Talha Nisanci
parent f5608c2769
commit 61efc87c53
5 changed files with 37 additions and 2 deletions

View File

@ -25,7 +25,11 @@
#include "utils/palloc.h" #include "utils/palloc.h"
#define MAX_PUT_COPY_DATA_BUFFER_SIZE (8 * 1024 * 1024) /*
* Setting that controls how many bytes of COPY data libpq is allowed to buffer
* internally before we force a flush.
*/
int RemoteCopyFlushThreshold = 8 * 1024 * 1024;
/* GUC, determining whether statements sent to remote nodes are logged */ /* GUC, determining whether statements sent to remote nodes are logged */
@ -620,7 +624,7 @@ PutRemoteCopyData(MultiConnection *connection, const char *buffer, int nbytes)
*/ */
connection->copyBytesWrittenSinceLastFlush += nbytes; connection->copyBytesWrittenSinceLastFlush += nbytes;
if (connection->copyBytesWrittenSinceLastFlush > MAX_PUT_COPY_DATA_BUFFER_SIZE) if (connection->copyBytesWrittenSinceLastFlush > RemoteCopyFlushThreshold)
{ {
connection->copyBytesWrittenSinceLastFlush = 0; connection->copyBytesWrittenSinceLastFlush = 0;
return FinishConnectionIO(connection, allowInterrupts); return FinishConnectionIO(connection, allowInterrupts);

View File

@ -701,6 +701,19 @@ RegisterCitusConfigVariables(void)
GUC_NO_SHOW_ALL, GUC_NO_SHOW_ALL,
NoticeIfSubqueryPushdownEnabled, NULL, NULL); NoticeIfSubqueryPushdownEnabled, NULL, NULL);
DefineCustomIntVariable(
"citus.remote_copy_flush_threshold",
gettext_noop("Sets the threshold for remote copy to be flushed."),
gettext_noop("When sending data over remote connections via the COPY protocol, "
"bytes are first buffered internally by libpq. If the number of "
"bytes buffered exceeds the threshold, Citus waits for all the "
"bytes to flush."),
&RemoteCopyFlushThreshold,
8 * 1024 * 1024, 0, INT_MAX,
PGC_USERSET,
GUC_UNIT_BYTE | GUC_NO_SHOW_ALL,
NULL, NULL, NULL);
DefineCustomIntVariable( DefineCustomIntVariable(
"citus.local_copy_flush_threshold", "citus.local_copy_flush_threshold",
gettext_noop("Sets the threshold for local copy to be flushed."), gettext_noop("Sets the threshold for local copy to be flushed."),

View File

@ -21,6 +21,9 @@
/* GUC, determining whether statements sent to remote nodes are logged */ /* GUC, determining whether statements sent to remote nodes are logged */
extern bool LogRemoteCommands; extern bool LogRemoteCommands;
/* GUC that determines the number of bytes after which remote COPY is flushed */
extern int RemoteCopyFlushThreshold;
/* simple helpers */ /* simple helpers */
extern bool IsResponseOK(PGresult *result); extern bool IsResponseOK(PGresult *result);

View File

@ -3059,6 +3059,13 @@ DO UPDATE
SET user_id = 42 SET user_id = 42
RETURNING user_id, value_1_agg; RETURNING user_id, value_1_agg;
ERROR: modifying the partition value of rows is not allowed ERROR: modifying the partition value of rows is not allowed
-- test a small citus.remote_copy_flush_threshold
BEGIN;
SET LOCAL citus.remote_copy_flush_threshold TO 1;
INSERT INTO raw_events_first
SELECT * FROM raw_events_first OFFSET 0
ON CONFLICT DO NOTHING;
ABORT;
-- wrap in a transaction to improve performance -- wrap in a transaction to improve performance
BEGIN; BEGIN;
DROP TABLE coerce_events; DROP TABLE coerce_events;

View File

@ -2251,6 +2251,14 @@ DO UPDATE
SET user_id = 42 SET user_id = 42
RETURNING user_id, value_1_agg; RETURNING user_id, value_1_agg;
-- test a small citus.remote_copy_flush_threshold
BEGIN;
SET LOCAL citus.remote_copy_flush_threshold TO 1;
INSERT INTO raw_events_first
SELECT * FROM raw_events_first OFFSET 0
ON CONFLICT DO NOTHING;
ABORT;
-- wrap in a transaction to improve performance -- wrap in a transaction to improve performance
BEGIN; BEGIN;
DROP TABLE coerce_events; DROP TABLE coerce_events;