diff --git a/src/backend/distributed/connection/connection_management.c b/src/backend/distributed/connection/connection_management.c index a6e23bb5b..47809da8e 100644 --- a/src/backend/distributed/connection/connection_management.c +++ b/src/backend/distributed/connection/connection_management.c @@ -32,6 +32,8 @@ int NodeConnectionTimeout = 5000; +int MaxCachedConnectionsPerWorker = 1; + HTAB *ConnectionHash = NULL; HTAB *ConnParamsHash = NULL; MemoryContext ConnectionContext = NULL; @@ -253,7 +255,6 @@ GetNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port, co * * If user or database are NULL, the current session's defaults are used. The * following flags influence connection establishment behaviour: - * - SESSION_LIFESPAN - the connection should persist after transaction end * - FORCE_NEW_CONNECTION - a new connection is required * * The returned connection has only been initiated, not fully @@ -322,11 +323,6 @@ StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port, connection = FindAvailableConnection(entry->connections, flags); if (connection) { - if (flags & SESSION_LIFESPAN) - { - connection->sessionLifespan = true; - } - return connection; } } @@ -340,11 +336,6 @@ StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port, dlist_push_tail(entry->connections, &connection->connectionNode); ResetShardPlacementAssociation(connection); - if (flags & SESSION_LIFESPAN) - { - connection->sessionLifespan = true; - } - return connection; } @@ -374,8 +365,9 @@ FindAvailableConnection(dlist_head *connections, uint32 flags) /* - * CloseNodeConnectionsAfterTransaction sets the sessionLifespan flag of the connections - * to a particular node as false. This is mainly used when a worker leaves the cluster. + * CloseNodeConnectionsAfterTransaction sets the forceClose flag of the connections + * to a particular node as true such that the connections are no longer cached. 
This + * is mainly used when a worker leaves the cluster. */ void CloseNodeConnectionsAfterTransaction(char *nodeName, int nodePort) @@ -400,7 +392,7 @@ CloseNodeConnectionsAfterTransaction(char *nodeName, int nodePort) MultiConnection *connection = dlist_container(MultiConnection, connectionNode, iter.cur); - connection->sessionLifespan = false; + connection->forceCloseAtTransactionEnd = true; } } } @@ -1002,6 +994,7 @@ static void AfterXactHostConnectionHandling(ConnectionHashEntry *entry, bool isCommit) { dlist_mutable_iter iter; + int cachedConnectionCount = 0; dlist_foreach_modify(iter, entry->connections) { @@ -1023,7 +1016,8 @@ AfterXactHostConnectionHandling(ConnectionHashEntry *entry, bool isCommit) /* * Preserve session lifespan connections if they are still healthy. */ - if (!connection->sessionLifespan || + if (cachedConnectionCount >= MaxCachedConnectionsPerWorker || + connection->forceCloseAtTransactionEnd || PQstatus(connection->pgConn) != CONNECTION_OK || !RemoteTransactionIdle(connection)) { @@ -1044,6 +1038,8 @@ AfterXactHostConnectionHandling(ConnectionHashEntry *entry, bool isCommit) connection->copyBytesWrittenSinceLastFlush = 0; UnclaimConnection(connection); + + cachedConnectionCount++; } } } diff --git a/src/backend/distributed/executor/multi_router_executor.c b/src/backend/distributed/executor/multi_router_executor.c index d3626d53f..dba69fe6f 100644 --- a/src/backend/distributed/executor/multi_router_executor.c +++ b/src/backend/distributed/executor/multi_router_executor.c @@ -924,7 +924,7 @@ ExecuteSingleSelectTask(CitusScanState *scanState, Task *task) bool queryOK = false; bool dontFailOnError = false; int64 currentAffectedTupleCount = 0; - int connectionFlags = SESSION_LIFESPAN; + int connectionFlags = 0; List *placementAccessList = NIL; MultiConnection *connection = NULL; @@ -1262,7 +1262,7 @@ GetModifyConnections(Task *task, bool markCritical) foreach(taskPlacementCell, taskPlacementList) { ShardPlacement *taskPlacement = 
(ShardPlacement *) lfirst(taskPlacementCell); - int connectionFlags = SESSION_LIFESPAN; + int connectionFlags = 0; MultiConnection *multiConnection = NULL; List *placementAccessList = NIL; ShardPlacementAccess *placementModification = NULL; diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index c453927a5..5239bc7f1 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -745,6 +745,20 @@ RegisterCitusConfigVariables(void) GUC_UNIT_MS, NULL, NULL, NULL); + DefineCustomIntVariable( + "citus.max_cached_conns_per_worker", + gettext_noop("Sets the maximum number of connections to cache per worker."), + gettext_noop("Each backend opens connections to the workers to query the " + "shards. At the end of the transaction, the configured number " + "of connections is kept open to speed up subsequent commands. " + "Increasing this value will reduce the latency of multi-shard " + "queries, but increases overhead on the workers."), + &MaxCachedConnectionsPerWorker, + 1, 0, INT_MAX, + PGC_USERSET, + GUC_NO_SHOW_ALL, + NULL, NULL, NULL); + DefineCustomIntVariable( "citus.max_assign_task_batch_size", gettext_noop("Sets the maximum number of tasks to assign per round."), diff --git a/src/backend/distributed/test/run_from_same_connection.c b/src/backend/distributed/test/run_from_same_connection.c index d472e1264..92135ab93 100644 --- a/src/backend/distributed/test/run_from_same_connection.c +++ b/src/backend/distributed/test/run_from_same_connection.c @@ -85,6 +85,7 @@ start_session_level_connection_to_node(PG_FUNCTION_ARGS) text *nodeName = PG_GETARG_TEXT_P(0); uint32 nodePort = PG_GETARG_UINT32(1); char *nodeNameString = text_to_cstring(nodeName); + int connectionFlags = 0; CheckCitusVersion(ERROR); @@ -102,7 +103,7 @@ start_session_level_connection_to_node(PG_FUNCTION_ARGS) */ if (singleConnection == NULL) { - singleConnection = GetNodeConnection(SESSION_LIFESPAN, 
nodeNameString, nodePort); + singleConnection = GetNodeConnection(connectionFlags, nodeNameString, nodePort); allowNonIdleRemoteTransactionOnXactHandling = true; } diff --git a/src/backend/distributed/transaction/transaction_recovery.c b/src/backend/distributed/transaction/transaction_recovery.c index 81746dcd0..8d28dd0b1 100644 --- a/src/backend/distributed/transaction/transaction_recovery.c +++ b/src/backend/distributed/transaction/transaction_recovery.c @@ -167,7 +167,7 @@ RecoverWorkerTransactions(WorkerNode *workerNode) MemoryContext oldContext = NULL; bool recoveryFailed = false; - int connectionFlags = SESSION_LIFESPAN; + int connectionFlags = 0; MultiConnection *connection = GetNodeConnection(connectionFlags, nodeName, nodePort); if (connection->pgConn == NULL || PQstatus(connection->pgConn) != CONNECTION_OK) { diff --git a/src/include/distributed/connection_management.h b/src/include/distributed/connection_management.h index 1cbf8719e..bd5314ca1 100644 --- a/src/include/distributed/connection_management.h +++ b/src/include/distributed/connection_management.h @@ -39,15 +39,12 @@ enum MultiConnectionMode /* force establishment of a new connection */ FORCE_NEW_CONNECTION = 1 << 0, - /* mark returned connection as having session lifespan */ - SESSION_LIFESPAN = 1 << 1, + FOR_DDL = 1 << 1, - FOR_DDL = 1 << 2, - - FOR_DML = 1 << 3, + FOR_DML = 1 << 2, /* open a connection per (co-located set of) placement(s) */ - CONNECTION_PER_PLACEMENT = 1 << 4 + CONNECTION_PER_PLACEMENT = 1 << 3 }; @@ -65,8 +62,8 @@ typedef struct MultiConnection /* underlying libpq connection */ struct pg_conn *pgConn; - /* is the connection intended to be kept after transaction end */ - bool sessionLifespan; + /* force the connection to be closed at the end of the transaction */ + bool forceCloseAtTransactionEnd; /* is the connection currently in use, and shouldn't be used by anything else */ bool claimedExclusively; @@ -130,6 +127,9 @@ typedef struct ConnParamsHashEntry /* maximum duration 
to wait for connection */ extern int NodeConnectionTimeout; +/* maximum number of connections to cache per worker per session */ +extern int MaxCachedConnectionsPerWorker; + /* parameters used for outbound connections */ extern char *NodeConninfo; diff --git a/src/test/regress/expected/failure_1pc_copy_append.out b/src/test/regress/expected/failure_1pc_copy_append.out index d8b39075c..810508d01 100644 --- a/src/test/regress/expected/failure_1pc_copy_append.out +++ b/src/test/regress/expected/failure_1pc_copy_append.out @@ -50,11 +50,9 @@ SELECT citus.dump_network_traffic(); (0,worker,"[""RowDescription(fieldcount=2,fields=['F(name=min,tableoid=0,colattrnum=0,typoid=23,typlen=4,typmod=-1,format_code=0)', 'F(name=max,tableoid=0,colattrnum=0,typoid=23,typlen=4,typmod=-1,format_code=0)'])"", 'DataRow(columncount=2,columns=[""C(length=0,value=b\\'\\')"", ""C(length=1,value=b\\'0\\')""])', 'CommandComplete(command=SELECT 1)', 'ReadyForQuery(state=in_transaction_block)']") (0,coordinator,"['Query(query=COMMIT)']") (0,worker,"['CommandComplete(command=COMMIT)', 'ReadyForQuery(state=idle)']") - (1,coordinator,"[initial message]") - (1,worker,"['AuthenticationOk()', 'ParameterStatus(application_name=citus)', 'ParameterStatus(client_encoding=UTF8)', 'ParameterStatus(DateStyle=ISO, MDY)', 'ParameterStatus(integer_datetimes=on)', 'ParameterStatus(IntervalStyle=postgres)', 'ParameterStatus(is_superuser=on)', 'ParameterStatus(server_encoding=UTF8)', 'ParameterStatus(server_version=XXX)', 'ParameterStatus(session_authorization=postgres)', 'ParameterStatus(standard_conforming_strings=on)', 'ParameterStatus(TimeZone=XXX)', 'BackendKeyData(XXX)', 'ReadyForQuery(state=idle)']") - (1,coordinator,"['Query(query=COPY (SELECT count(1) AS count FROM copy_test_100400 copy_test WHERE true) TO STDOUT)']") - (1,worker,"[""CopyOutResponse(format=0,columncount=1,columns=['Anonymous(format=0)'])"", ""CopyData(data=b'4\\\\n')"", 'CopyDone()', 'CommandComplete(command=COPY 1)', 
'ReadyForQuery(state=idle)']") -(22 rows) + (0,coordinator,"['Query(query=COPY (SELECT count(1) AS count FROM copy_test_100400 copy_test WHERE true) TO STDOUT)']") + (0,worker,"[""CopyOutResponse(format=0,columncount=1,columns=['Anonymous(format=0)'])"", ""CopyData(data=b'4\\\\n')"", 'CopyDone()', 'CommandComplete(command=COPY 1)', 'ReadyForQuery(state=idle)']") +(20 rows) ---- all of the following tests test behavior with 2 shard placements ---- SHOW citus.shard_replication_factor; diff --git a/src/test/regress/expected/failure_1pc_copy_hash.out b/src/test/regress/expected/failure_1pc_copy_hash.out index 8eaf3626f..0354b7569 100644 --- a/src/test/regress/expected/failure_1pc_copy_hash.out +++ b/src/test/regress/expected/failure_1pc_copy_hash.out @@ -8,6 +8,7 @@ SET citus.shard_count = 1; SET citus.shard_replication_factor = 2; -- one shard per worker SET citus.multi_shard_commit_protocol TO '1pc'; SET citus.next_shard_id TO 100400; +SET citus.max_cached_conns_per_worker TO 0; ALTER SEQUENCE pg_catalog.pg_dist_placement_placementid_seq RESTART 100; CREATE TABLE copy_test (key int, value int); SELECT create_distributed_table('copy_test', 'key'); @@ -212,10 +213,12 @@ SELECT citus.mitmproxy('conn.killall()'); (1 row) COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV; -WARNING: connection not open -CONTEXT: while executing command on localhost:9060 -COPY copy_test, line 1: "0, 0" -ERROR: failure on connection marked as essential: localhost:9060 +WARNING: connection error: localhost:9060 +DETAIL: server closed the connection unexpectedly + This probably means the server terminated abnormally + before or while processing the request. 
+CONTEXT: COPY copy_test, line 1: "0, 0" +ERROR: could not connect to any active placements CONTEXT: COPY copy_test, line 1: "0, 0" SELECT citus.mitmproxy('conn.allow()'); mitmproxy diff --git a/src/test/regress/expected/failure_connection_establishment.out b/src/test/regress/expected/failure_connection_establishment.out index 869d9eeb3..4bf383963 100644 --- a/src/test/regress/expected/failure_connection_establishment.out +++ b/src/test/regress/expected/failure_connection_establishment.out @@ -14,6 +14,7 @@ SELECT citus.mitmproxy('conn.allow()'); CREATE SCHEMA fail_connect; SET search_path TO 'fail_connect'; SET citus.shard_count TO 4; +SET citus.max_cached_conns_per_worker TO 0; ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1450000; ALTER SEQUENCE pg_catalog.pg_dist_placement_placementid_seq RESTART 1450000; CREATE TABLE products ( diff --git a/src/test/regress/expected/failure_copy_on_hash.out b/src/test/regress/expected/failure_copy_on_hash.out index d4c7d7059..9f413641b 100644 --- a/src/test/regress/expected/failure_copy_on_hash.out +++ b/src/test/regress/expected/failure_copy_on_hash.out @@ -13,6 +13,7 @@ SELECT citus.mitmproxy('conn.allow()'); -- With one placement COPY should error out and placement should stay healthy. 
SET citus.shard_replication_factor TO 1; SET citus.shard_count to 4; +SET citus.max_cached_conns_per_worker to 0; CREATE TABLE test_table(id int, value_1 int); SELECT create_distributed_table('test_table','id'); create_distributed_table diff --git a/src/test/regress/expected/failure_copy_to_reference.out b/src/test/regress/expected/failure_copy_to_reference.out index 2ca89f67e..77f854912 100644 --- a/src/test/regress/expected/failure_copy_to_reference.out +++ b/src/test/regress/expected/failure_copy_to_reference.out @@ -35,10 +35,7 @@ SELECT citus.mitmproxy('conn.kill()'); (1 row) \copy test_table FROM STDIN DELIMITER ',' -ERROR: connection error: localhost:9060 -DETAIL: server closed the connection unexpectedly - This probably means the server terminated abnormally - before or while processing the request. +ERROR: failure on connection marked as essential: localhost:9060 CONTEXT: COPY test_table, line 1: "1,2" SELECT citus.mitmproxy('conn.allow()'); mitmproxy diff --git a/src/test/regress/expected/failure_create_table.out b/src/test/regress/expected/failure_create_table.out index 8990e691f..de6695f0a 100644 --- a/src/test/regress/expected/failure_create_table.out +++ b/src/test/regress/expected/failure_create_table.out @@ -595,14 +595,10 @@ SELECT citus.mitmproxy('conn.kill()'); (1 row) SELECT master_create_worker_shards('test_table_2', 4, 2); -WARNING: connection error: localhost:9060 -DETAIL: server closed the connection unexpectedly - This probably means the server terminated abnormally - before or while processing the request. +WARNING: connection not open +CONTEXT: while executing command on localhost:9060 ERROR: connection error: localhost:9060 -DETAIL: server closed the connection unexpectedly - This probably means the server terminated abnormally - before or while processing the request. 
+DETAIL: connection not open SELECT count(*) FROM pg_dist_shard; count ------- diff --git a/src/test/regress/expected/failure_single_select.out b/src/test/regress/expected/failure_single_select.out index 25a55c9e6..c974e5036 100644 --- a/src/test/regress/expected/failure_single_select.out +++ b/src/test/regress/expected/failure_single_select.out @@ -197,6 +197,7 @@ ERROR: server closed the connection unexpectedly before or while processing the request. CONTEXT: while executing command on localhost:9060 -- bug from https://github.com/citusdata/citus/issues/1926 +SET citus.max_cached_conns_per_worker TO 0; -- purge cache DROP TABLE select_test; SET citus.shard_count = 2; SET citus.shard_replication_factor = 1; @@ -207,6 +208,7 @@ SELECT create_distributed_table('select_test', 'key'); (1 row) +SET citus.max_cached_conns_per_worker TO 1; -- allow connection to be cached INSERT INTO select_test VALUES (1, 'test data'); SELECT citus.mitmproxy('conn.onQuery(query="^SELECT").after(1).kill()'); mitmproxy diff --git a/src/test/regress/expected/isolation_citus_dist_activity.out b/src/test/regress/expected/isolation_citus_dist_activity.out index c363621fc..251de2973 100644 --- a/src/test/regress/expected/isolation_citus_dist_activity.out +++ b/src/test/regress/expected/isolation_citus_dist_activity.out @@ -7,12 +7,21 @@ create_distributed_table step s1-begin: BEGIN; + -- we don't want to see any entries related to 2PC recovery + SET citus.max_cached_conns_per_worker TO 0; + step s2-begin: BEGIN; + + -- we don't want to see any entries related to 2PC recovery + SET citus.max_cached_conns_per_worker TO 0; step s3-begin: BEGIN; + -- we don't want to see any entries related to 2PC recovery + SET citus.max_cached_conns_per_worker TO 0; + step s1-alter-table: ALTER TABLE test_table ADD COLUMN x INT; @@ -67,12 +76,21 @@ create_distributed_table step s1-begin: BEGIN; + -- we don't want to see any entries related to 2PC recovery + SET citus.max_cached_conns_per_worker TO 0; + step 
s2-begin: BEGIN; + + -- we don't want to see any entries related to 2PC recovery + SET citus.max_cached_conns_per_worker TO 0; step s3-begin: BEGIN; + -- we don't want to see any entries related to 2PC recovery + SET citus.max_cached_conns_per_worker TO 0; + step s1-insert: INSERT INTO test_table VALUES (100, 100); @@ -116,12 +134,21 @@ create_distributed_table step s1-begin: BEGIN; + -- we don't want to see any entries related to 2PC recovery + SET citus.max_cached_conns_per_worker TO 0; + step s2-begin: BEGIN; + + -- we don't want to see any entries related to 2PC recovery + SET citus.max_cached_conns_per_worker TO 0; step s3-begin: BEGIN; + -- we don't want to see any entries related to 2PC recovery + SET citus.max_cached_conns_per_worker TO 0; + step s1-select: SELECT count(*) FROM test_table; @@ -171,12 +198,21 @@ create_distributed_table step s1-begin: BEGIN; + -- we don't want to see any entries related to 2PC recovery + SET citus.max_cached_conns_per_worker TO 0; + step s2-begin: BEGIN; + + -- we don't want to see any entries related to 2PC recovery + SET citus.max_cached_conns_per_worker TO 0; step s3-begin: BEGIN; + -- we don't want to see any entries related to 2PC recovery + SET citus.max_cached_conns_per_worker TO 0; + step s1-select-router: SELECT count(*) FROM test_table WHERE column1 = 55; @@ -206,7 +242,6 @@ query query_hostname query_hostport master_query_host_namemaster_query_ SELECT gid FROM pg_prepared_xacts WHERE gid LIKE 'citus\_0\_%'localhost 57638 0 idle Client ClientRead postgres regression SELECT gid FROM pg_prepared_xacts WHERE gid LIKE 'citus\_0\_%'localhost 57637 0 idle Client ClientRead postgres regression SELECT count(*) AS count FROM public.test_table_105951 test_table WHERE (column1 OPERATOR(pg_catalog.=) 55)localhost 57638 0 idle Client ClientRead postgres regression -COMMIT localhost 57637 0 idle Client ClientRead postgres regression step s2-rollback: ROLLBACK; diff --git 
a/src/test/regress/specs/isolation_citus_dist_activity.spec b/src/test/regress/specs/isolation_citus_dist_activity.spec index 70a652b9c..c50d46582 100644 --- a/src/test/regress/specs/isolation_citus_dist_activity.spec +++ b/src/test/regress/specs/isolation_citus_dist_activity.spec @@ -1,8 +1,9 @@ setup { - SET citus.shard_replication_factor TO 1; - SET citus.shard_count TO 4; - + SET citus.shard_replication_factor TO 1; + SET citus.shard_count TO 4; + -- we don't want to see any entries related to 2PC recovery + SET citus.max_cached_conns_per_worker TO 0; CREATE TABLE test_table(column1 int, column2 int); SELECT create_distributed_table('test_table', 'column1'); } @@ -17,6 +18,9 @@ session "s1" step "s1-begin" { BEGIN; + + -- we don't want to see any entries related to 2PC recovery + SET citus.max_cached_conns_per_worker TO 0; } step "s1-alter-table" @@ -49,6 +53,9 @@ session "s2" step "s2-begin" { BEGIN; + + -- we don't want to see any entries related to 2PC recovery + SET citus.max_cached_conns_per_worker TO 0; } step "s2-rollback" @@ -72,6 +79,9 @@ session "s3" step "s3-begin" { BEGIN; + + -- we don't want to see any entries related to 2PC recovery + SET citus.max_cached_conns_per_worker TO 0; } step "s3-rollback" diff --git a/src/test/regress/sql/failure_1pc_copy_hash.sql b/src/test/regress/sql/failure_1pc_copy_hash.sql index 83cd1adbf..a15415089 100644 --- a/src/test/regress/sql/failure_1pc_copy_hash.sql +++ b/src/test/regress/sql/failure_1pc_copy_hash.sql @@ -4,6 +4,7 @@ SET citus.shard_count = 1; SET citus.shard_replication_factor = 2; -- one shard per worker SET citus.multi_shard_commit_protocol TO '1pc'; SET citus.next_shard_id TO 100400; +SET citus.max_cached_conns_per_worker TO 0; ALTER SEQUENCE pg_catalog.pg_dist_placement_placementid_seq RESTART 100; CREATE TABLE copy_test (key int, value int); diff --git a/src/test/regress/sql/failure_connection_establishment.sql b/src/test/regress/sql/failure_connection_establishment.sql index 2c6fa7ecd..dff93c430 
100644 --- a/src/test/regress/sql/failure_connection_establishment.sql +++ b/src/test/regress/sql/failure_connection_establishment.sql @@ -12,6 +12,7 @@ CREATE SCHEMA fail_connect; SET search_path TO 'fail_connect'; SET citus.shard_count TO 4; +SET citus.max_cached_conns_per_worker TO 0; ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1450000; ALTER SEQUENCE pg_catalog.pg_dist_placement_placementid_seq RESTART 1450000; diff --git a/src/test/regress/sql/failure_copy_on_hash.sql b/src/test/regress/sql/failure_copy_on_hash.sql index 45b54d9b5..c8863a408 100644 --- a/src/test/regress/sql/failure_copy_on_hash.sql +++ b/src/test/regress/sql/failure_copy_on_hash.sql @@ -11,6 +11,7 @@ SELECT citus.mitmproxy('conn.allow()'); -- With one placement COPY should error out and placement should stay healthy. SET citus.shard_replication_factor TO 1; SET citus.shard_count to 4; +SET citus.max_cached_conns_per_worker to 0; CREATE TABLE test_table(id int, value_1 int); SELECT create_distributed_table('test_table','id'); diff --git a/src/test/regress/sql/failure_single_select.sql b/src/test/regress/sql/failure_single_select.sql index 8c6461a94..565df402d 100644 --- a/src/test/regress/sql/failure_single_select.sql +++ b/src/test/regress/sql/failure_single_select.sql @@ -81,12 +81,15 @@ SELECT recover_prepared_transactions(); SELECT recover_prepared_transactions(); -- bug from https://github.com/citusdata/citus/issues/1926 +SET citus.max_cached_conns_per_worker TO 0; -- purge cache DROP TABLE select_test; SET citus.shard_count = 2; SET citus.shard_replication_factor = 1; CREATE TABLE select_test (key int, value text); SELECT create_distributed_table('select_test', 'key'); + +SET citus.max_cached_conns_per_worker TO 1; -- allow connection to be cached INSERT INTO select_test VALUES (1, 'test data'); SELECT citus.mitmproxy('conn.onQuery(query="^SELECT").after(1).kill()');