From f91ea6fc59b040b68faaf45236f94a163ae3f4bb Mon Sep 17 00:00:00 2001 From: Marco Slot Date: Thu, 14 Dec 2017 19:21:59 +0100 Subject: [PATCH] Reduce the frequency of FinishConnectionIO calls during COPY (#1864) --- .../connection/connection_management.c | 3 +++ .../distributed/connection/remote_commands.c | 20 ++++++++++++++++--- .../distributed/connection_management.h | 3 +++ src/test/regress/input/multi_copy.source | 7 +++++++ src/test/regress/output/multi_copy.source | 12 +++++++++++ 5 files changed, 42 insertions(+), 3 deletions(-) diff --git a/src/backend/distributed/connection/connection_management.c b/src/backend/distributed/connection/connection_management.c index 68c1084b2..23dca4b47 100644 --- a/src/backend/distributed/connection/connection_management.c +++ b/src/backend/distributed/connection/connection_management.c @@ -719,6 +719,9 @@ AfterXactHostConnectionHandling(ConnectionHashEntry *entry, bool isCommit) ResetRemoteTransaction(connection); ResetShardPlacementAssociation(connection); + /* reset copy state */ + connection->copyBytesWrittenSinceLastFlush = 0; + UnclaimConnection(connection); } } diff --git a/src/backend/distributed/connection/remote_commands.c b/src/backend/distributed/connection/remote_commands.c index 662fdb555..e5467fbcb 100644 --- a/src/backend/distributed/connection/remote_commands.c +++ b/src/backend/distributed/connection/remote_commands.c @@ -21,6 +21,9 @@ #include "utils/palloc.h" +#define MAX_PUT_COPY_DATA_BUFFER_SIZE (8 * 1024 * 1024) + + /* GUC, determining whether statements sent to remote nodes are logged */ bool LogRemoteCommands = false; @@ -580,11 +583,20 @@ PutRemoteCopyData(MultiConnection *connection, const char *buffer, int nbytes) * until the socket is writable to prevent the internal libpq buffers * from growing excessively. * - * In the future, we could reduce the frequency of these pushbacks to - * achieve higher throughput. + * We currently allow the internal buffer to grow to 8MB before + * providing back pressure based on experimentation that showed + * throughput get worse at 4MB and lower due to the number of CPU + * cycles spent in networking system calls. */ - return FinishConnectionIO(connection, allowInterrupts); + connection->copyBytesWrittenSinceLastFlush += nbytes; + if (connection->copyBytesWrittenSinceLastFlush > MAX_PUT_COPY_DATA_BUFFER_SIZE) + { + connection->copyBytesWrittenSinceLastFlush = 0; + return FinishConnectionIO(connection, allowInterrupts); + } + + return true; } @@ -616,6 +628,8 @@ PutRemoteCopyEnd(MultiConnection *connection, const char *errormsg) /* see PutRemoteCopyData() */ + connection->copyBytesWrittenSinceLastFlush = 0; + return FinishConnectionIO(connection, allowInterrupts); } diff --git a/src/include/distributed/connection_management.h b/src/include/distributed/connection_management.h index 323e5ed4f..95b11170b 100644 --- a/src/include/distributed/connection_management.h +++ b/src/include/distributed/connection_management.h @@ -79,6 +79,9 @@ typedef struct MultiConnection /* list of all placements referenced by this connection */ dlist_head referencedPlacements; + + /* number of bytes sent to PQputCopyData() since last flush */ + uint64 copyBytesWrittenSinceLastFlush; } MultiConnection; diff --git a/src/test/regress/input/multi_copy.source b/src/test/regress/input/multi_copy.source index 97a0ba017..093757713 100644 --- a/src/test/regress/input/multi_copy.source +++ b/src/test/regress/input/multi_copy.source @@ -805,3 +805,10 @@ DROP TABLE drop_copy_test_table; \c - - - :worker_1_port SELECT relname FROM pg_class WHERE relname LIKE 'tt1%'; \c - - - :master_port + +-- copy >8MB to a single worker to trigger a flush in PutRemoteCopyData +BEGIN; +CREATE UNLOGGED TABLE trigger_flush AS +SELECT 1 AS a, s AS b, s AS c, s AS d, s AS e, s AS f, s AS g, s AS h FROM generate_series(1,150000) s; +SELECT create_distributed_table('trigger_flush','a'); +ABORT; diff --git a/src/test/regress/output/multi_copy.source b/src/test/regress/output/multi_copy.source index 6953d9b2f..098587375 100644 --- a/src/test/regress/output/multi_copy.source +++ b/src/test/regress/output/multi_copy.source @@ -1079,3 +1079,15 @@ SELECT relname FROM pg_class WHERE relname LIKE 'tt1%'; (0 rows) \c - - - :master_port +-- copy >8MB to a single worker to trigger a flush in PutRemoteCopyData +BEGIN; +CREATE UNLOGGED TABLE trigger_flush AS +SELECT 1 AS a, s AS b, s AS c, s AS d, s AS e, s AS f, s AS g, s AS h FROM generate_series(1,150000) s; +SELECT create_distributed_table('trigger_flush','a'); +NOTICE: Copying data from local table... + create_distributed_table +-------------------------- + +(1 row) + +ABORT;