mirror of https://github.com/citusdata/citus.git
Cache a configurable number of connections at xact end
parent
caa8fffbd0
commit
bb3a96eacb
|
@ -32,6 +32,8 @@
|
||||||
|
|
||||||
|
|
||||||
int NodeConnectionTimeout = 5000;
|
int NodeConnectionTimeout = 5000;
|
||||||
|
int MaxCachedConnectionsPerWorker = 1;
|
||||||
|
|
||||||
HTAB *ConnectionHash = NULL;
|
HTAB *ConnectionHash = NULL;
|
||||||
HTAB *ConnParamsHash = NULL;
|
HTAB *ConnParamsHash = NULL;
|
||||||
MemoryContext ConnectionContext = NULL;
|
MemoryContext ConnectionContext = NULL;
|
||||||
|
@ -253,7 +255,6 @@ GetNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port, co
|
||||||
*
|
*
|
||||||
* If user or database are NULL, the current session's defaults are used. The
|
* If user or database are NULL, the current session's defaults are used. The
|
||||||
* following flags influence connection establishment behaviour:
|
* following flags influence connection establishment behaviour:
|
||||||
* - SESSION_LIFESPAN - the connection should persist after transaction end
|
|
||||||
* - FORCE_NEW_CONNECTION - a new connection is required
|
* - FORCE_NEW_CONNECTION - a new connection is required
|
||||||
*
|
*
|
||||||
* The returned connection has only been initiated, not fully
|
* The returned connection has only been initiated, not fully
|
||||||
|
@ -322,11 +323,6 @@ StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port,
|
||||||
connection = FindAvailableConnection(entry->connections, flags);
|
connection = FindAvailableConnection(entry->connections, flags);
|
||||||
if (connection)
|
if (connection)
|
||||||
{
|
{
|
||||||
if (flags & SESSION_LIFESPAN)
|
|
||||||
{
|
|
||||||
connection->sessionLifespan = true;
|
|
||||||
}
|
|
||||||
|
|
||||||
return connection;
|
return connection;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -340,11 +336,6 @@ StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port,
|
||||||
dlist_push_tail(entry->connections, &connection->connectionNode);
|
dlist_push_tail(entry->connections, &connection->connectionNode);
|
||||||
ResetShardPlacementAssociation(connection);
|
ResetShardPlacementAssociation(connection);
|
||||||
|
|
||||||
if (flags & SESSION_LIFESPAN)
|
|
||||||
{
|
|
||||||
connection->sessionLifespan = true;
|
|
||||||
}
|
|
||||||
|
|
||||||
return connection;
|
return connection;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -374,8 +365,9 @@ FindAvailableConnection(dlist_head *connections, uint32 flags)
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* CloseNodeConnectionsAfterTransaction sets the sessionLifespan flag of the connections
|
* CloseNodeConnectionsAfterTransaction sets the forceClose flag of the connections
|
||||||
* to a particular node as false. This is mainly used when a worker leaves the cluster.
|
* to a particular node as true such that the connections are no longer cached. This
|
||||||
|
* is mainly used when a worker leaves the cluster.
|
||||||
*/
|
*/
|
||||||
void
|
void
|
||||||
CloseNodeConnectionsAfterTransaction(char *nodeName, int nodePort)
|
CloseNodeConnectionsAfterTransaction(char *nodeName, int nodePort)
|
||||||
|
@ -400,7 +392,7 @@ CloseNodeConnectionsAfterTransaction(char *nodeName, int nodePort)
|
||||||
MultiConnection *connection =
|
MultiConnection *connection =
|
||||||
dlist_container(MultiConnection, connectionNode, iter.cur);
|
dlist_container(MultiConnection, connectionNode, iter.cur);
|
||||||
|
|
||||||
connection->sessionLifespan = false;
|
connection->forceCloseAtTransactionEnd = true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1002,6 +994,7 @@ static void
|
||||||
AfterXactHostConnectionHandling(ConnectionHashEntry *entry, bool isCommit)
|
AfterXactHostConnectionHandling(ConnectionHashEntry *entry, bool isCommit)
|
||||||
{
|
{
|
||||||
dlist_mutable_iter iter;
|
dlist_mutable_iter iter;
|
||||||
|
int cachedConnectionCount = 0;
|
||||||
|
|
||||||
dlist_foreach_modify(iter, entry->connections)
|
dlist_foreach_modify(iter, entry->connections)
|
||||||
{
|
{
|
||||||
|
@ -1023,7 +1016,8 @@ AfterXactHostConnectionHandling(ConnectionHashEntry *entry, bool isCommit)
|
||||||
/*
|
/*
|
||||||
* Preserve session lifespan connections if they are still healthy.
|
* Preserve session lifespan connections if they are still healthy.
|
||||||
*/
|
*/
|
||||||
if (!connection->sessionLifespan ||
|
if (cachedConnectionCount >= MaxCachedConnectionsPerWorker ||
|
||||||
|
connection->forceCloseAtTransactionEnd ||
|
||||||
PQstatus(connection->pgConn) != CONNECTION_OK ||
|
PQstatus(connection->pgConn) != CONNECTION_OK ||
|
||||||
!RemoteTransactionIdle(connection))
|
!RemoteTransactionIdle(connection))
|
||||||
{
|
{
|
||||||
|
@ -1044,6 +1038,8 @@ AfterXactHostConnectionHandling(ConnectionHashEntry *entry, bool isCommit)
|
||||||
connection->copyBytesWrittenSinceLastFlush = 0;
|
connection->copyBytesWrittenSinceLastFlush = 0;
|
||||||
|
|
||||||
UnclaimConnection(connection);
|
UnclaimConnection(connection);
|
||||||
|
|
||||||
|
cachedConnectionCount++;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -924,7 +924,7 @@ ExecuteSingleSelectTask(CitusScanState *scanState, Task *task)
|
||||||
bool queryOK = false;
|
bool queryOK = false;
|
||||||
bool dontFailOnError = false;
|
bool dontFailOnError = false;
|
||||||
int64 currentAffectedTupleCount = 0;
|
int64 currentAffectedTupleCount = 0;
|
||||||
int connectionFlags = SESSION_LIFESPAN;
|
int connectionFlags = 0;
|
||||||
List *placementAccessList = NIL;
|
List *placementAccessList = NIL;
|
||||||
MultiConnection *connection = NULL;
|
MultiConnection *connection = NULL;
|
||||||
|
|
||||||
|
@ -1262,7 +1262,7 @@ GetModifyConnections(Task *task, bool markCritical)
|
||||||
foreach(taskPlacementCell, taskPlacementList)
|
foreach(taskPlacementCell, taskPlacementList)
|
||||||
{
|
{
|
||||||
ShardPlacement *taskPlacement = (ShardPlacement *) lfirst(taskPlacementCell);
|
ShardPlacement *taskPlacement = (ShardPlacement *) lfirst(taskPlacementCell);
|
||||||
int connectionFlags = SESSION_LIFESPAN;
|
int connectionFlags = 0;
|
||||||
MultiConnection *multiConnection = NULL;
|
MultiConnection *multiConnection = NULL;
|
||||||
List *placementAccessList = NIL;
|
List *placementAccessList = NIL;
|
||||||
ShardPlacementAccess *placementModification = NULL;
|
ShardPlacementAccess *placementModification = NULL;
|
||||||
|
|
|
@ -745,6 +745,20 @@ RegisterCitusConfigVariables(void)
|
||||||
GUC_UNIT_MS,
|
GUC_UNIT_MS,
|
||||||
NULL, NULL, NULL);
|
NULL, NULL, NULL);
|
||||||
|
|
||||||
|
DefineCustomIntVariable(
|
||||||
|
"citus.max_cached_conns_per_worker",
|
||||||
|
gettext_noop("Sets the maximum number of connections to cache per worker."),
|
||||||
|
gettext_noop("Each backend opens connections to the workers to query the "
|
||||||
|
"shards. At the end of the transaction, the configurated number "
|
||||||
|
"of connections is kept open to speed up subsequent commands. "
|
||||||
|
"Increasing this value will reduce the latency of multi-shard "
|
||||||
|
"queries, but increases overhead on the workers"),
|
||||||
|
&MaxCachedConnectionsPerWorker,
|
||||||
|
1, 0, INT_MAX,
|
||||||
|
PGC_USERSET,
|
||||||
|
GUC_NO_SHOW_ALL,
|
||||||
|
NULL, NULL, NULL);
|
||||||
|
|
||||||
DefineCustomIntVariable(
|
DefineCustomIntVariable(
|
||||||
"citus.max_assign_task_batch_size",
|
"citus.max_assign_task_batch_size",
|
||||||
gettext_noop("Sets the maximum number of tasks to assign per round."),
|
gettext_noop("Sets the maximum number of tasks to assign per round."),
|
||||||
|
|
|
@ -85,6 +85,7 @@ start_session_level_connection_to_node(PG_FUNCTION_ARGS)
|
||||||
text *nodeName = PG_GETARG_TEXT_P(0);
|
text *nodeName = PG_GETARG_TEXT_P(0);
|
||||||
uint32 nodePort = PG_GETARG_UINT32(1);
|
uint32 nodePort = PG_GETARG_UINT32(1);
|
||||||
char *nodeNameString = text_to_cstring(nodeName);
|
char *nodeNameString = text_to_cstring(nodeName);
|
||||||
|
int connectionFlags = 0;
|
||||||
|
|
||||||
CheckCitusVersion(ERROR);
|
CheckCitusVersion(ERROR);
|
||||||
|
|
||||||
|
@ -102,7 +103,7 @@ start_session_level_connection_to_node(PG_FUNCTION_ARGS)
|
||||||
*/
|
*/
|
||||||
if (singleConnection == NULL)
|
if (singleConnection == NULL)
|
||||||
{
|
{
|
||||||
singleConnection = GetNodeConnection(SESSION_LIFESPAN, nodeNameString, nodePort);
|
singleConnection = GetNodeConnection(connectionFlags, nodeNameString, nodePort);
|
||||||
allowNonIdleRemoteTransactionOnXactHandling = true;
|
allowNonIdleRemoteTransactionOnXactHandling = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -167,7 +167,7 @@ RecoverWorkerTransactions(WorkerNode *workerNode)
|
||||||
MemoryContext oldContext = NULL;
|
MemoryContext oldContext = NULL;
|
||||||
bool recoveryFailed = false;
|
bool recoveryFailed = false;
|
||||||
|
|
||||||
int connectionFlags = SESSION_LIFESPAN;
|
int connectionFlags = 0;
|
||||||
MultiConnection *connection = GetNodeConnection(connectionFlags, nodeName, nodePort);
|
MultiConnection *connection = GetNodeConnection(connectionFlags, nodeName, nodePort);
|
||||||
if (connection->pgConn == NULL || PQstatus(connection->pgConn) != CONNECTION_OK)
|
if (connection->pgConn == NULL || PQstatus(connection->pgConn) != CONNECTION_OK)
|
||||||
{
|
{
|
||||||
|
|
|
@ -39,15 +39,12 @@ enum MultiConnectionMode
|
||||||
/* force establishment of a new connection */
|
/* force establishment of a new connection */
|
||||||
FORCE_NEW_CONNECTION = 1 << 0,
|
FORCE_NEW_CONNECTION = 1 << 0,
|
||||||
|
|
||||||
/* mark returned connection as having session lifespan */
|
FOR_DDL = 1 << 1,
|
||||||
SESSION_LIFESPAN = 1 << 1,
|
|
||||||
|
|
||||||
FOR_DDL = 1 << 2,
|
FOR_DML = 1 << 2,
|
||||||
|
|
||||||
FOR_DML = 1 << 3,
|
|
||||||
|
|
||||||
/* open a connection per (co-located set of) placement(s) */
|
/* open a connection per (co-located set of) placement(s) */
|
||||||
CONNECTION_PER_PLACEMENT = 1 << 4
|
CONNECTION_PER_PLACEMENT = 1 << 3
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
||||||
|
@ -65,8 +62,8 @@ typedef struct MultiConnection
|
||||||
/* underlying libpq connection */
|
/* underlying libpq connection */
|
||||||
struct pg_conn *pgConn;
|
struct pg_conn *pgConn;
|
||||||
|
|
||||||
/* is the connection intended to be kept after transaction end */
|
/* force the connection to be closed at the end of the transaction */
|
||||||
bool sessionLifespan;
|
bool forceCloseAtTransactionEnd;
|
||||||
|
|
||||||
/* is the connection currently in use, and shouldn't be used by anything else */
|
/* is the connection currently in use, and shouldn't be used by anything else */
|
||||||
bool claimedExclusively;
|
bool claimedExclusively;
|
||||||
|
@ -130,6 +127,9 @@ typedef struct ConnParamsHashEntry
|
||||||
/* maximum duration to wait for connection */
|
/* maximum duration to wait for connection */
|
||||||
extern int NodeConnectionTimeout;
|
extern int NodeConnectionTimeout;
|
||||||
|
|
||||||
|
/* maximum number of connections to cache per worker per session */
|
||||||
|
extern int MaxCachedConnectionsPerWorker;
|
||||||
|
|
||||||
/* parameters used for outbound connections */
|
/* parameters used for outbound connections */
|
||||||
extern char *NodeConninfo;
|
extern char *NodeConninfo;
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue