Merge pull request #2716 from citusdata/max_cached_connections

Replace session lifespan flag with a configurable number of connections
pull/2745/head
Önder Kalacı 2019-05-29 15:05:09 +02:00 committed by GitHub
commit 27b0f0023c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
19 changed files with 111 additions and 51 deletions

View File

@ -32,6 +32,8 @@
int NodeConnectionTimeout = 5000; int NodeConnectionTimeout = 5000;
int MaxCachedConnectionsPerWorker = 1;
HTAB *ConnectionHash = NULL; HTAB *ConnectionHash = NULL;
HTAB *ConnParamsHash = NULL; HTAB *ConnParamsHash = NULL;
MemoryContext ConnectionContext = NULL; MemoryContext ConnectionContext = NULL;
@ -253,7 +255,6 @@ GetNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port, co
* *
* If user or database are NULL, the current session's defaults are used. The * If user or database are NULL, the current session's defaults are used. The
* following flags influence connection establishment behaviour: * following flags influence connection establishment behaviour:
* - SESSION_LIFESPAN - the connection should persist after transaction end
* - FORCE_NEW_CONNECTION - a new connection is required * - FORCE_NEW_CONNECTION - a new connection is required
* *
* The returned connection has only been initiated, not fully * The returned connection has only been initiated, not fully
@ -322,11 +323,6 @@ StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port,
connection = FindAvailableConnection(entry->connections, flags); connection = FindAvailableConnection(entry->connections, flags);
if (connection) if (connection)
{ {
if (flags & SESSION_LIFESPAN)
{
connection->sessionLifespan = true;
}
return connection; return connection;
} }
} }
@ -340,11 +336,6 @@ StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port,
dlist_push_tail(entry->connections, &connection->connectionNode); dlist_push_tail(entry->connections, &connection->connectionNode);
ResetShardPlacementAssociation(connection); ResetShardPlacementAssociation(connection);
if (flags & SESSION_LIFESPAN)
{
connection->sessionLifespan = true;
}
return connection; return connection;
} }
@ -374,8 +365,9 @@ FindAvailableConnection(dlist_head *connections, uint32 flags)
/* /*
* CloseNodeConnectionsAfterTransaction sets the sessionLifespan flag of the connections * CloseNodeConnectionsAfterTransaction sets the forceClose flag of the connections
* to a particular node as false. This is mainly used when a worker leaves the cluster. * to a particular node as true such that the connections are no longer cached. This
* is mainly used when a worker leaves the cluster.
*/ */
void void
CloseNodeConnectionsAfterTransaction(char *nodeName, int nodePort) CloseNodeConnectionsAfterTransaction(char *nodeName, int nodePort)
@ -400,7 +392,7 @@ CloseNodeConnectionsAfterTransaction(char *nodeName, int nodePort)
MultiConnection *connection = MultiConnection *connection =
dlist_container(MultiConnection, connectionNode, iter.cur); dlist_container(MultiConnection, connectionNode, iter.cur);
connection->sessionLifespan = false; connection->forceCloseAtTransactionEnd = true;
} }
} }
} }
@ -1002,6 +994,7 @@ static void
AfterXactHostConnectionHandling(ConnectionHashEntry *entry, bool isCommit) AfterXactHostConnectionHandling(ConnectionHashEntry *entry, bool isCommit)
{ {
dlist_mutable_iter iter; dlist_mutable_iter iter;
int cachedConnectionCount = 0;
dlist_foreach_modify(iter, entry->connections) dlist_foreach_modify(iter, entry->connections)
{ {
@ -1023,7 +1016,8 @@ AfterXactHostConnectionHandling(ConnectionHashEntry *entry, bool isCommit)
/* /*
* Preserve session lifespan connections if they are still healthy. * Preserve session lifespan connections if they are still healthy.
*/ */
if (!connection->sessionLifespan || if (cachedConnectionCount >= MaxCachedConnectionsPerWorker ||
connection->forceCloseAtTransactionEnd ||
PQstatus(connection->pgConn) != CONNECTION_OK || PQstatus(connection->pgConn) != CONNECTION_OK ||
!RemoteTransactionIdle(connection)) !RemoteTransactionIdle(connection))
{ {
@ -1044,6 +1038,8 @@ AfterXactHostConnectionHandling(ConnectionHashEntry *entry, bool isCommit)
connection->copyBytesWrittenSinceLastFlush = 0; connection->copyBytesWrittenSinceLastFlush = 0;
UnclaimConnection(connection); UnclaimConnection(connection);
cachedConnectionCount++;
} }
} }
} }

View File

@ -924,7 +924,7 @@ ExecuteSingleSelectTask(CitusScanState *scanState, Task *task)
bool queryOK = false; bool queryOK = false;
bool dontFailOnError = false; bool dontFailOnError = false;
int64 currentAffectedTupleCount = 0; int64 currentAffectedTupleCount = 0;
int connectionFlags = SESSION_LIFESPAN; int connectionFlags = 0;
List *placementAccessList = NIL; List *placementAccessList = NIL;
MultiConnection *connection = NULL; MultiConnection *connection = NULL;
@ -1262,7 +1262,7 @@ GetModifyConnections(Task *task, bool markCritical)
foreach(taskPlacementCell, taskPlacementList) foreach(taskPlacementCell, taskPlacementList)
{ {
ShardPlacement *taskPlacement = (ShardPlacement *) lfirst(taskPlacementCell); ShardPlacement *taskPlacement = (ShardPlacement *) lfirst(taskPlacementCell);
int connectionFlags = SESSION_LIFESPAN; int connectionFlags = 0;
MultiConnection *multiConnection = NULL; MultiConnection *multiConnection = NULL;
List *placementAccessList = NIL; List *placementAccessList = NIL;
ShardPlacementAccess *placementModification = NULL; ShardPlacementAccess *placementModification = NULL;

View File

@ -745,6 +745,20 @@ RegisterCitusConfigVariables(void)
GUC_UNIT_MS, GUC_UNIT_MS,
NULL, NULL, NULL); NULL, NULL, NULL);
DefineCustomIntVariable(
"citus.max_cached_conns_per_worker",
gettext_noop("Sets the maximum number of connections to cache per worker."),
gettext_noop("Each backend opens connections to the workers to query the "
"shards. At the end of the transaction, the configurated number "
"of connections is kept open to speed up subsequent commands. "
"Increasing this value will reduce the latency of multi-shard "
"queries, but increases overhead on the workers"),
&MaxCachedConnectionsPerWorker,
1, 0, INT_MAX,
PGC_USERSET,
GUC_NO_SHOW_ALL,
NULL, NULL, NULL);
DefineCustomIntVariable( DefineCustomIntVariable(
"citus.max_assign_task_batch_size", "citus.max_assign_task_batch_size",
gettext_noop("Sets the maximum number of tasks to assign per round."), gettext_noop("Sets the maximum number of tasks to assign per round."),

View File

@ -85,6 +85,7 @@ start_session_level_connection_to_node(PG_FUNCTION_ARGS)
text *nodeName = PG_GETARG_TEXT_P(0); text *nodeName = PG_GETARG_TEXT_P(0);
uint32 nodePort = PG_GETARG_UINT32(1); uint32 nodePort = PG_GETARG_UINT32(1);
char *nodeNameString = text_to_cstring(nodeName); char *nodeNameString = text_to_cstring(nodeName);
int connectionFlags = 0;
CheckCitusVersion(ERROR); CheckCitusVersion(ERROR);
@ -102,7 +103,7 @@ start_session_level_connection_to_node(PG_FUNCTION_ARGS)
*/ */
if (singleConnection == NULL) if (singleConnection == NULL)
{ {
singleConnection = GetNodeConnection(SESSION_LIFESPAN, nodeNameString, nodePort); singleConnection = GetNodeConnection(connectionFlags, nodeNameString, nodePort);
allowNonIdleRemoteTransactionOnXactHandling = true; allowNonIdleRemoteTransactionOnXactHandling = true;
} }

View File

@ -167,7 +167,7 @@ RecoverWorkerTransactions(WorkerNode *workerNode)
MemoryContext oldContext = NULL; MemoryContext oldContext = NULL;
bool recoveryFailed = false; bool recoveryFailed = false;
int connectionFlags = SESSION_LIFESPAN; int connectionFlags = 0;
MultiConnection *connection = GetNodeConnection(connectionFlags, nodeName, nodePort); MultiConnection *connection = GetNodeConnection(connectionFlags, nodeName, nodePort);
if (connection->pgConn == NULL || PQstatus(connection->pgConn) != CONNECTION_OK) if (connection->pgConn == NULL || PQstatus(connection->pgConn) != CONNECTION_OK)
{ {

View File

@ -39,15 +39,12 @@ enum MultiConnectionMode
/* force establishment of a new connection */ /* force establishment of a new connection */
FORCE_NEW_CONNECTION = 1 << 0, FORCE_NEW_CONNECTION = 1 << 0,
/* mark returned connection as having session lifespan */ FOR_DDL = 1 << 1,
SESSION_LIFESPAN = 1 << 1,
FOR_DDL = 1 << 2, FOR_DML = 1 << 2,
FOR_DML = 1 << 3,
/* open a connection per (co-located set of) placement(s) */ /* open a connection per (co-located set of) placement(s) */
CONNECTION_PER_PLACEMENT = 1 << 4 CONNECTION_PER_PLACEMENT = 1 << 3
}; };
@ -65,8 +62,8 @@ typedef struct MultiConnection
/* underlying libpq connection */ /* underlying libpq connection */
struct pg_conn *pgConn; struct pg_conn *pgConn;
/* is the connection intended to be kept after transaction end */ /* force the connection to be closed at the end of the transaction */
bool sessionLifespan; bool forceCloseAtTransactionEnd;
/* is the connection currently in use, and shouldn't be used by anything else */ /* is the connection currently in use, and shouldn't be used by anything else */
bool claimedExclusively; bool claimedExclusively;
@ -130,6 +127,9 @@ typedef struct ConnParamsHashEntry
/* maximum duration to wait for connection */ /* maximum duration to wait for connection */
extern int NodeConnectionTimeout; extern int NodeConnectionTimeout;
/* maximum number of connections to cache per worker per session */
extern int MaxCachedConnectionsPerWorker;
/* parameters used for outbound connections */ /* parameters used for outbound connections */
extern char *NodeConninfo; extern char *NodeConninfo;

View File

@ -50,11 +50,9 @@ SELECT citus.dump_network_traffic();
(0,worker,"[""RowDescription(fieldcount=2,fields=['F(name=min,tableoid=0,colattrnum=0,typoid=23,typlen=4,typmod=-1,format_code=0)', 'F(name=max,tableoid=0,colattrnum=0,typoid=23,typlen=4,typmod=-1,format_code=0)'])"", 'DataRow(columncount=2,columns=[""C(length=0,value=b\\'\\')"", ""C(length=1,value=b\\'0\\')""])', 'CommandComplete(command=SELECT 1)', 'ReadyForQuery(state=in_transaction_block)']") (0,worker,"[""RowDescription(fieldcount=2,fields=['F(name=min,tableoid=0,colattrnum=0,typoid=23,typlen=4,typmod=-1,format_code=0)', 'F(name=max,tableoid=0,colattrnum=0,typoid=23,typlen=4,typmod=-1,format_code=0)'])"", 'DataRow(columncount=2,columns=[""C(length=0,value=b\\'\\')"", ""C(length=1,value=b\\'0\\')""])', 'CommandComplete(command=SELECT 1)', 'ReadyForQuery(state=in_transaction_block)']")
(0,coordinator,"['Query(query=COMMIT)']") (0,coordinator,"['Query(query=COMMIT)']")
(0,worker,"['CommandComplete(command=COMMIT)', 'ReadyForQuery(state=idle)']") (0,worker,"['CommandComplete(command=COMMIT)', 'ReadyForQuery(state=idle)']")
(1,coordinator,"[initial message]") (0,coordinator,"['Query(query=COPY (SELECT count(1) AS count FROM copy_test_100400 copy_test WHERE true) TO STDOUT)']")
(1,worker,"['AuthenticationOk()', 'ParameterStatus(application_name=citus)', 'ParameterStatus(client_encoding=UTF8)', 'ParameterStatus(DateStyle=ISO, MDY)', 'ParameterStatus(integer_datetimes=on)', 'ParameterStatus(IntervalStyle=postgres)', 'ParameterStatus(is_superuser=on)', 'ParameterStatus(server_encoding=UTF8)', 'ParameterStatus(server_version=XXX)', 'ParameterStatus(session_authorization=postgres)', 'ParameterStatus(standard_conforming_strings=on)', 'ParameterStatus(TimeZone=XXX)', 'BackendKeyData(XXX)', 'ReadyForQuery(state=idle)']") (0,worker,"[""CopyOutResponse(format=0,columncount=1,columns=['Anonymous(format=0)'])"", ""CopyData(data=b'4\\\\n')"", 'CopyDone()', 'CommandComplete(command=COPY 1)', 'ReadyForQuery(state=idle)']")
(1,coordinator,"['Query(query=COPY (SELECT count(1) AS count FROM copy_test_100400 copy_test WHERE true) TO STDOUT)']") (20 rows)
(1,worker,"[""CopyOutResponse(format=0,columncount=1,columns=['Anonymous(format=0)'])"", ""CopyData(data=b'4\\\\n')"", 'CopyDone()', 'CommandComplete(command=COPY 1)', 'ReadyForQuery(state=idle)']")
(22 rows)
---- all of the following tests test behavior with 2 shard placements ---- ---- all of the following tests test behavior with 2 shard placements ----
SHOW citus.shard_replication_factor; SHOW citus.shard_replication_factor;

View File

@ -8,6 +8,7 @@ SET citus.shard_count = 1;
SET citus.shard_replication_factor = 2; -- one shard per worker SET citus.shard_replication_factor = 2; -- one shard per worker
SET citus.multi_shard_commit_protocol TO '1pc'; SET citus.multi_shard_commit_protocol TO '1pc';
SET citus.next_shard_id TO 100400; SET citus.next_shard_id TO 100400;
SET citus.max_cached_conns_per_worker TO 0;
ALTER SEQUENCE pg_catalog.pg_dist_placement_placementid_seq RESTART 100; ALTER SEQUENCE pg_catalog.pg_dist_placement_placementid_seq RESTART 100;
CREATE TABLE copy_test (key int, value int); CREATE TABLE copy_test (key int, value int);
SELECT create_distributed_table('copy_test', 'key'); SELECT create_distributed_table('copy_test', 'key');
@ -212,10 +213,12 @@ SELECT citus.mitmproxy('conn.killall()');
(1 row) (1 row)
COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV; COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV;
WARNING: connection not open WARNING: connection error: localhost:9060
CONTEXT: while executing command on localhost:9060 DETAIL: server closed the connection unexpectedly
COPY copy_test, line 1: "0, 0" This probably means the server terminated abnormally
ERROR: failure on connection marked as essential: localhost:9060 before or while processing the request.
CONTEXT: COPY copy_test, line 1: "0, 0"
ERROR: could not connect to any active placements
CONTEXT: COPY copy_test, line 1: "0, 0" CONTEXT: COPY copy_test, line 1: "0, 0"
SELECT citus.mitmproxy('conn.allow()'); SELECT citus.mitmproxy('conn.allow()');
mitmproxy mitmproxy

View File

@ -14,6 +14,7 @@ SELECT citus.mitmproxy('conn.allow()');
CREATE SCHEMA fail_connect; CREATE SCHEMA fail_connect;
SET search_path TO 'fail_connect'; SET search_path TO 'fail_connect';
SET citus.shard_count TO 4; SET citus.shard_count TO 4;
SET citus.max_cached_conns_per_worker TO 0;
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1450000; ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1450000;
ALTER SEQUENCE pg_catalog.pg_dist_placement_placementid_seq RESTART 1450000; ALTER SEQUENCE pg_catalog.pg_dist_placement_placementid_seq RESTART 1450000;
CREATE TABLE products ( CREATE TABLE products (

View File

@ -13,6 +13,7 @@ SELECT citus.mitmproxy('conn.allow()');
-- With one placement COPY should error out and placement should stay healthy. -- With one placement COPY should error out and placement should stay healthy.
SET citus.shard_replication_factor TO 1; SET citus.shard_replication_factor TO 1;
SET citus.shard_count to 4; SET citus.shard_count to 4;
SET citus.max_cached_conns_per_worker to 0;
CREATE TABLE test_table(id int, value_1 int); CREATE TABLE test_table(id int, value_1 int);
SELECT create_distributed_table('test_table','id'); SELECT create_distributed_table('test_table','id');
create_distributed_table create_distributed_table

View File

@ -35,10 +35,7 @@ SELECT citus.mitmproxy('conn.kill()');
(1 row) (1 row)
\copy test_table FROM STDIN DELIMITER ',' \copy test_table FROM STDIN DELIMITER ','
ERROR: connection error: localhost:9060 ERROR: failure on connection marked as essential: localhost:9060
DETAIL: server closed the connection unexpectedly
This probably means the server terminated abnormally
before or while processing the request.
CONTEXT: COPY test_table, line 1: "1,2" CONTEXT: COPY test_table, line 1: "1,2"
SELECT citus.mitmproxy('conn.allow()'); SELECT citus.mitmproxy('conn.allow()');
mitmproxy mitmproxy

View File

@ -595,14 +595,10 @@ SELECT citus.mitmproxy('conn.kill()');
(1 row) (1 row)
SELECT master_create_worker_shards('test_table_2', 4, 2); SELECT master_create_worker_shards('test_table_2', 4, 2);
WARNING: connection error: localhost:9060 WARNING: connection not open
DETAIL: server closed the connection unexpectedly CONTEXT: while executing command on localhost:9060
This probably means the server terminated abnormally
before or while processing the request.
ERROR: connection error: localhost:9060 ERROR: connection error: localhost:9060
DETAIL: server closed the connection unexpectedly DETAIL: connection not open
This probably means the server terminated abnormally
before or while processing the request.
SELECT count(*) FROM pg_dist_shard; SELECT count(*) FROM pg_dist_shard;
count count
------- -------

View File

@ -197,6 +197,7 @@ ERROR: server closed the connection unexpectedly
before or while processing the request. before or while processing the request.
CONTEXT: while executing command on localhost:9060 CONTEXT: while executing command on localhost:9060
-- bug from https://github.com/citusdata/citus/issues/1926 -- bug from https://github.com/citusdata/citus/issues/1926
SET citus.max_cached_conns_per_worker TO 0; -- purge cache
DROP TABLE select_test; DROP TABLE select_test;
SET citus.shard_count = 2; SET citus.shard_count = 2;
SET citus.shard_replication_factor = 1; SET citus.shard_replication_factor = 1;
@ -207,6 +208,7 @@ SELECT create_distributed_table('select_test', 'key');
(1 row) (1 row)
SET citus.max_cached_conns_per_worker TO 1; -- allow connection to be cached
INSERT INTO select_test VALUES (1, 'test data'); INSERT INTO select_test VALUES (1, 'test data');
SELECT citus.mitmproxy('conn.onQuery(query="^SELECT").after(1).kill()'); SELECT citus.mitmproxy('conn.onQuery(query="^SELECT").after(1).kill()');
mitmproxy mitmproxy

View File

@ -7,12 +7,21 @@ create_distributed_table
step s1-begin: step s1-begin:
BEGIN; BEGIN;
-- we don't want to see any entries related to 2PC recovery
SET citus.max_cached_conns_per_worker TO 0;
step s2-begin: step s2-begin:
BEGIN; BEGIN;
-- we don't want to see any entries related to 2PC recovery
SET citus.max_cached_conns_per_worker TO 0;
step s3-begin: step s3-begin:
BEGIN; BEGIN;
-- we don't want to see any entries related to 2PC recovery
SET citus.max_cached_conns_per_worker TO 0;
step s1-alter-table: step s1-alter-table:
ALTER TABLE test_table ADD COLUMN x INT; ALTER TABLE test_table ADD COLUMN x INT;
@ -67,12 +76,21 @@ create_distributed_table
step s1-begin: step s1-begin:
BEGIN; BEGIN;
-- we don't want to see any entries related to 2PC recovery
SET citus.max_cached_conns_per_worker TO 0;
step s2-begin: step s2-begin:
BEGIN; BEGIN;
-- we don't want to see any entries related to 2PC recovery
SET citus.max_cached_conns_per_worker TO 0;
step s3-begin: step s3-begin:
BEGIN; BEGIN;
-- we don't want to see any entries related to 2PC recovery
SET citus.max_cached_conns_per_worker TO 0;
step s1-insert: step s1-insert:
INSERT INTO test_table VALUES (100, 100); INSERT INTO test_table VALUES (100, 100);
@ -116,12 +134,21 @@ create_distributed_table
step s1-begin: step s1-begin:
BEGIN; BEGIN;
-- we don't want to see any entries related to 2PC recovery
SET citus.max_cached_conns_per_worker TO 0;
step s2-begin: step s2-begin:
BEGIN; BEGIN;
-- we don't want to see any entries related to 2PC recovery
SET citus.max_cached_conns_per_worker TO 0;
step s3-begin: step s3-begin:
BEGIN; BEGIN;
-- we don't want to see any entries related to 2PC recovery
SET citus.max_cached_conns_per_worker TO 0;
step s1-select: step s1-select:
SELECT count(*) FROM test_table; SELECT count(*) FROM test_table;
@ -171,12 +198,21 @@ create_distributed_table
step s1-begin: step s1-begin:
BEGIN; BEGIN;
-- we don't want to see any entries related to 2PC recovery
SET citus.max_cached_conns_per_worker TO 0;
step s2-begin: step s2-begin:
BEGIN; BEGIN;
-- we don't want to see any entries related to 2PC recovery
SET citus.max_cached_conns_per_worker TO 0;
step s3-begin: step s3-begin:
BEGIN; BEGIN;
-- we don't want to see any entries related to 2PC recovery
SET citus.max_cached_conns_per_worker TO 0;
step s1-select-router: step s1-select-router:
SELECT count(*) FROM test_table WHERE column1 = 55; SELECT count(*) FROM test_table WHERE column1 = 55;
@ -206,7 +242,6 @@ query query_hostname query_hostport master_query_host_namemaster_query_
SELECT gid FROM pg_prepared_xacts WHERE gid LIKE 'citus\_0\_%'localhost 57638 0 idle Client ClientRead postgres regression SELECT gid FROM pg_prepared_xacts WHERE gid LIKE 'citus\_0\_%'localhost 57638 0 idle Client ClientRead postgres regression
SELECT gid FROM pg_prepared_xacts WHERE gid LIKE 'citus\_0\_%'localhost 57637 0 idle Client ClientRead postgres regression SELECT gid FROM pg_prepared_xacts WHERE gid LIKE 'citus\_0\_%'localhost 57637 0 idle Client ClientRead postgres regression
SELECT count(*) AS count FROM public.test_table_105951 test_table WHERE (column1 OPERATOR(pg_catalog.=) 55)localhost 57638 0 idle Client ClientRead postgres regression SELECT count(*) AS count FROM public.test_table_105951 test_table WHERE (column1 OPERATOR(pg_catalog.=) 55)localhost 57638 0 idle Client ClientRead postgres regression
COMMIT localhost 57637 0 idle Client ClientRead postgres regression
step s2-rollback: step s2-rollback:
ROLLBACK; ROLLBACK;

View File

@ -1,8 +1,9 @@
setup setup
{ {
SET citus.shard_replication_factor TO 1; SET citus.shard_replication_factor TO 1;
SET citus.shard_count TO 4; SET citus.shard_count TO 4;
-- we don't want to see any entries related to 2PC recovery
SET citus.max_cached_conns_per_worker TO 0;
CREATE TABLE test_table(column1 int, column2 int); CREATE TABLE test_table(column1 int, column2 int);
SELECT create_distributed_table('test_table', 'column1'); SELECT create_distributed_table('test_table', 'column1');
} }
@ -17,6 +18,9 @@ session "s1"
step "s1-begin" step "s1-begin"
{ {
BEGIN; BEGIN;
-- we don't want to see any entries related to 2PC recovery
SET citus.max_cached_conns_per_worker TO 0;
} }
step "s1-alter-table" step "s1-alter-table"
@ -49,6 +53,9 @@ session "s2"
step "s2-begin" step "s2-begin"
{ {
BEGIN; BEGIN;
-- we don't want to see any entries related to 2PC recovery
SET citus.max_cached_conns_per_worker TO 0;
} }
step "s2-rollback" step "s2-rollback"
@ -72,6 +79,9 @@ session "s3"
step "s3-begin" step "s3-begin"
{ {
BEGIN; BEGIN;
-- we don't want to see any entries related to 2PC recovery
SET citus.max_cached_conns_per_worker TO 0;
} }
step "s3-rollback" step "s3-rollback"

View File

@ -4,6 +4,7 @@ SET citus.shard_count = 1;
SET citus.shard_replication_factor = 2; -- one shard per worker SET citus.shard_replication_factor = 2; -- one shard per worker
SET citus.multi_shard_commit_protocol TO '1pc'; SET citus.multi_shard_commit_protocol TO '1pc';
SET citus.next_shard_id TO 100400; SET citus.next_shard_id TO 100400;
SET citus.max_cached_conns_per_worker TO 0;
ALTER SEQUENCE pg_catalog.pg_dist_placement_placementid_seq RESTART 100; ALTER SEQUENCE pg_catalog.pg_dist_placement_placementid_seq RESTART 100;
CREATE TABLE copy_test (key int, value int); CREATE TABLE copy_test (key int, value int);

View File

@ -12,6 +12,7 @@ CREATE SCHEMA fail_connect;
SET search_path TO 'fail_connect'; SET search_path TO 'fail_connect';
SET citus.shard_count TO 4; SET citus.shard_count TO 4;
SET citus.max_cached_conns_per_worker TO 0;
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1450000; ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1450000;
ALTER SEQUENCE pg_catalog.pg_dist_placement_placementid_seq RESTART 1450000; ALTER SEQUENCE pg_catalog.pg_dist_placement_placementid_seq RESTART 1450000;

View File

@ -11,6 +11,7 @@ SELECT citus.mitmproxy('conn.allow()');
-- With one placement COPY should error out and placement should stay healthy. -- With one placement COPY should error out and placement should stay healthy.
SET citus.shard_replication_factor TO 1; SET citus.shard_replication_factor TO 1;
SET citus.shard_count to 4; SET citus.shard_count to 4;
SET citus.max_cached_conns_per_worker to 0;
CREATE TABLE test_table(id int, value_1 int); CREATE TABLE test_table(id int, value_1 int);
SELECT create_distributed_table('test_table','id'); SELECT create_distributed_table('test_table','id');

View File

@ -81,12 +81,15 @@ SELECT recover_prepared_transactions();
SELECT recover_prepared_transactions(); SELECT recover_prepared_transactions();
-- bug from https://github.com/citusdata/citus/issues/1926 -- bug from https://github.com/citusdata/citus/issues/1926
SET citus.max_cached_conns_per_worker TO 0; -- purge cache
DROP TABLE select_test; DROP TABLE select_test;
SET citus.shard_count = 2; SET citus.shard_count = 2;
SET citus.shard_replication_factor = 1; SET citus.shard_replication_factor = 1;
CREATE TABLE select_test (key int, value text); CREATE TABLE select_test (key int, value text);
SELECT create_distributed_table('select_test', 'key'); SELECT create_distributed_table('select_test', 'key');
SET citus.max_cached_conns_per_worker TO 1; -- allow connection to be cached
INSERT INTO select_test VALUES (1, 'test data'); INSERT INTO select_test VALUES (1, 'test data');
SELECT citus.mitmproxy('conn.onQuery(query="^SELECT").after(1).kill()'); SELECT citus.mitmproxy('conn.onQuery(query="^SELECT").after(1).kill()');