mirror of https://github.com/citusdata/citus.git
Reduce the frequency of FinishConnectionIO calls during COPY (#1864)
parent
b1d670bdb6
commit
f91ea6fc59
|
@ -719,6 +719,9 @@ AfterXactHostConnectionHandling(ConnectionHashEntry *entry, bool isCommit)
|
|||
ResetRemoteTransaction(connection);
|
||||
ResetShardPlacementAssociation(connection);
|
||||
|
||||
/* reset copy state */
|
||||
connection->copyBytesWrittenSinceLastFlush = 0;
|
||||
|
||||
UnclaimConnection(connection);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -21,6 +21,9 @@
|
|||
#include "utils/palloc.h"
|
||||
|
||||
|
||||
#define MAX_PUT_COPY_DATA_BUFFER_SIZE (8 * 1024 * 1024)
|
||||
|
||||
|
||||
/* GUC, determining whether statements sent to remote nodes are logged */
|
||||
bool LogRemoteCommands = false;
|
||||
|
||||
|
@ -580,13 +583,22 @@ PutRemoteCopyData(MultiConnection *connection, const char *buffer, int nbytes)
|
|||
* until the socket is writable to prevent the internal libpq buffers
|
||||
* from growing excessively.
|
||||
*
|
||||
* In the future, we could reduce the frequency of these pushbacks to
|
||||
* achieve higher throughput.
|
||||
* We currently allow the internal buffer to grow to 8MB before
|
||||
* providing back pressure based on experimentation that showed
|
||||
* throughput get worse at 4MB and lower due to the number of CPU
|
||||
* cycles spent in networking system calls.
|
||||
*/
|
||||
|
||||
connection->copyBytesWrittenSinceLastFlush += nbytes;
|
||||
if (connection->copyBytesWrittenSinceLastFlush > MAX_PUT_COPY_DATA_BUFFER_SIZE)
|
||||
{
|
||||
connection->copyBytesWrittenSinceLastFlush = 0;
|
||||
return FinishConnectionIO(connection, allowInterrupts);
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* PutRemoteCopyEnd is a wrapper around PQputCopyEnd() that handles
|
||||
|
@ -616,6 +628,8 @@ PutRemoteCopyEnd(MultiConnection *connection, const char *errormsg)
|
|||
|
||||
/* see PutRemoteCopyData() */
|
||||
|
||||
connection->copyBytesWrittenSinceLastFlush = 0;
|
||||
|
||||
return FinishConnectionIO(connection, allowInterrupts);
|
||||
}
|
||||
|
||||
|
|
|
@ -79,6 +79,9 @@ typedef struct MultiConnection
|
|||
|
||||
/* list of all placements referenced by this connection */
|
||||
dlist_head referencedPlacements;
|
||||
|
||||
/* number of bytes sent to PQputCopyData() since last flush */
|
||||
uint64 copyBytesWrittenSinceLastFlush;
|
||||
} MultiConnection;
|
||||
|
||||
|
||||
|
|
|
@ -805,3 +805,10 @@ DROP TABLE drop_copy_test_table;
|
|||
\c - - - :worker_1_port
|
||||
SELECT relname FROM pg_class WHERE relname LIKE 'tt1%';
|
||||
\c - - - :master_port
|
||||
|
||||
-- copy >8MB to a single worker to trigger a flush in PutRemoteCopyData
|
||||
BEGIN;
|
||||
CREATE UNLOGGED TABLE trigger_flush AS
|
||||
SELECT 1 AS a, s AS b, s AS c, s AS d, s AS e, s AS f, s AS g, s AS h FROM generate_series(1,150000) s;
|
||||
SELECT create_distributed_table('trigger_flush','a');
|
||||
ABORT;
|
||||
|
|
|
@ -1079,3 +1079,15 @@ SELECT relname FROM pg_class WHERE relname LIKE 'tt1%';
|
|||
(0 rows)
|
||||
|
||||
\c - - - :master_port
|
||||
-- copy >8MB to a single worker to trigger a flush in PutRemoteCopyData
|
||||
BEGIN;
|
||||
CREATE UNLOGGED TABLE trigger_flush AS
|
||||
SELECT 1 AS a, s AS b, s AS c, s AS d, s AS e, s AS f, s AS g, s AS h FROM generate_series(1,150000) s;
|
||||
SELECT create_distributed_table('trigger_flush','a');
|
||||
NOTICE: Copying data from local table...
|
||||
create_distributed_table
|
||||
--------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
ABORT;
|
||||
|
|
Loading…
Reference in New Issue