mirror of https://github.com/citusdata/citus.git
Replace MAX_PUT_COPY_DATA_BUFFER_SIZE by citus.remote_copy_flush_threshold GUC
parent
43f600bc46
commit
fbc2147e11
|
@ -25,7 +25,11 @@
|
||||||
#include "utils/palloc.h"
|
#include "utils/palloc.h"
|
||||||
|
|
||||||
|
|
||||||
#define MAX_PUT_COPY_DATA_BUFFER_SIZE (8 * 1024 * 1024)
|
/*
|
||||||
|
* Setting that controls how many bytes of COPY data libpq is allowed to buffer
|
||||||
|
* internally before we force a flush.
|
||||||
|
*/
|
||||||
|
int RemoteCopyFlushThreshold = 8 * 1024 * 1024;
|
||||||
|
|
||||||
|
|
||||||
/* GUC, determining whether statements sent to remote nodes are logged */
|
/* GUC, determining whether statements sent to remote nodes are logged */
|
||||||
|
@ -620,7 +624,7 @@ PutRemoteCopyData(MultiConnection *connection, const char *buffer, int nbytes)
|
||||||
*/
|
*/
|
||||||
|
|
||||||
connection->copyBytesWrittenSinceLastFlush += nbytes;
|
connection->copyBytesWrittenSinceLastFlush += nbytes;
|
||||||
if (connection->copyBytesWrittenSinceLastFlush > MAX_PUT_COPY_DATA_BUFFER_SIZE)
|
if (connection->copyBytesWrittenSinceLastFlush > RemoteCopyFlushThreshold)
|
||||||
{
|
{
|
||||||
connection->copyBytesWrittenSinceLastFlush = 0;
|
connection->copyBytesWrittenSinceLastFlush = 0;
|
||||||
return FinishConnectionIO(connection, allowInterrupts);
|
return FinishConnectionIO(connection, allowInterrupts);
|
||||||
|
|
|
@ -701,6 +701,19 @@ RegisterCitusConfigVariables(void)
|
||||||
GUC_NO_SHOW_ALL,
|
GUC_NO_SHOW_ALL,
|
||||||
NoticeIfSubqueryPushdownEnabled, NULL, NULL);
|
NoticeIfSubqueryPushdownEnabled, NULL, NULL);
|
||||||
|
|
||||||
|
DefineCustomIntVariable(
|
||||||
|
"citus.remote_copy_flush_threshold",
|
||||||
|
gettext_noop("Sets the threshold for remote copy to be flushed."),
|
||||||
|
gettext_noop("When sending data over remote connections via the COPY protocol, "
|
||||||
|
"bytes are first buffered internally by libpq. If the number of "
|
||||||
|
"bytes buffered exceeds the threshold, Citus waits for all the "
|
||||||
|
"bytes to flush."),
|
||||||
|
&RemoteCopyFlushThreshold,
|
||||||
|
8 * 1024 * 1024, 0, INT_MAX,
|
||||||
|
PGC_USERSET,
|
||||||
|
GUC_UNIT_BYTE | GUC_NO_SHOW_ALL,
|
||||||
|
NULL, NULL, NULL);
|
||||||
|
|
||||||
DefineCustomIntVariable(
|
DefineCustomIntVariable(
|
||||||
"citus.local_copy_flush_threshold",
|
"citus.local_copy_flush_threshold",
|
||||||
gettext_noop("Sets the threshold for local copy to be flushed."),
|
gettext_noop("Sets the threshold for local copy to be flushed."),
|
||||||
|
|
|
@ -21,6 +21,9 @@
|
||||||
/* GUC, determining whether statements sent to remote nodes are logged */
|
/* GUC, determining whether statements sent to remote nodes are logged */
|
||||||
extern bool LogRemoteCommands;
|
extern bool LogRemoteCommands;
|
||||||
|
|
||||||
|
/* GUC that determines the number of bytes after which remote COPY is flushed */
|
||||||
|
extern int RemoteCopyFlushThreshold;
|
||||||
|
|
||||||
|
|
||||||
/* simple helpers */
|
/* simple helpers */
|
||||||
extern bool IsResponseOK(PGresult *result);
|
extern bool IsResponseOK(PGresult *result);
|
||||||
|
|
|
@ -3059,6 +3059,13 @@ DO UPDATE
|
||||||
SET user_id = 42
|
SET user_id = 42
|
||||||
RETURNING user_id, value_1_agg;
|
RETURNING user_id, value_1_agg;
|
||||||
ERROR: modifying the partition value of rows is not allowed
|
ERROR: modifying the partition value of rows is not allowed
|
||||||
|
-- test a small citus.remote_copy_flush_threshold
|
||||||
|
BEGIN;
|
||||||
|
SET LOCAL citus.remote_copy_flush_threshold TO 1;
|
||||||
|
INSERT INTO raw_events_first
|
||||||
|
SELECT * FROM raw_events_first OFFSET 0
|
||||||
|
ON CONFLICT DO NOTHING;
|
||||||
|
ABORT;
|
||||||
-- wrap in a transaction to improve performance
|
-- wrap in a transaction to improve performance
|
||||||
BEGIN;
|
BEGIN;
|
||||||
DROP TABLE coerce_events;
|
DROP TABLE coerce_events;
|
||||||
|
|
|
@ -2251,6 +2251,14 @@ DO UPDATE
|
||||||
SET user_id = 42
|
SET user_id = 42
|
||||||
RETURNING user_id, value_1_agg;
|
RETURNING user_id, value_1_agg;
|
||||||
|
|
||||||
|
-- test a small citus.remote_copy_flush_threshold
|
||||||
|
BEGIN;
|
||||||
|
SET LOCAL citus.remote_copy_flush_threshold TO 1;
|
||||||
|
INSERT INTO raw_events_first
|
||||||
|
SELECT * FROM raw_events_first OFFSET 0
|
||||||
|
ON CONFLICT DO NOTHING;
|
||||||
|
ABORT;
|
||||||
|
|
||||||
-- wrap in a transaction to improve performance
|
-- wrap in a transaction to improve performance
|
||||||
BEGIN;
|
BEGIN;
|
||||||
DROP TABLE coerce_events;
|
DROP TABLE coerce_events;
|
||||||
|
|
Loading…
Reference in New Issue