mirror of https://github.com/citusdata/citus.git
Set initial pool size to cached connection count
parent
4c0c33365e
commit
4444d92dbc
|
@ -333,7 +333,6 @@ StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port,
|
||||||
connection = StartConnectionEstablishment(&key);
|
connection = StartConnectionEstablishment(&key);
|
||||||
|
|
||||||
dlist_push_tail(entry->connections, &connection->connectionNode);
|
dlist_push_tail(entry->connections, &connection->connectionNode);
|
||||||
entry->connectionCount++;
|
|
||||||
|
|
||||||
ResetShardPlacementAssociation(connection);
|
ResetShardPlacementAssociation(connection);
|
||||||
|
|
||||||
|
@ -1065,8 +1064,6 @@ AfterXactHostConnectionHandling(ConnectionHashEntry *entry, bool isCommit)
|
||||||
cachedConnectionCount++;
|
cachedConnectionCount++;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
entry->connectionCount = cachedConnectionCount;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -1172,29 +1169,3 @@ TrimLogLevel(const char *message)
|
||||||
|
|
||||||
return chompedMessage + n;
|
return chompedMessage + n;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
|
||||||
* NodeConnectionCount gets the number of connections to the given node
|
|
||||||
* for the current username and database.
|
|
||||||
*/
|
|
||||||
int
|
|
||||||
NodeConnectionCount(char *hostname, int port)
|
|
||||||
{
|
|
||||||
ConnectionHashKey key;
|
|
||||||
ConnectionHashEntry *entry = NULL;
|
|
||||||
bool found = false;
|
|
||||||
|
|
||||||
strlcpy(key.hostname, hostname, MAX_NODE_LENGTH);
|
|
||||||
key.port = port;
|
|
||||||
strlcpy(key.user, CurrentUserName(), NAMEDATALEN);
|
|
||||||
strlcpy(key.database, CurrentDatabaseName(), NAMEDATALEN);
|
|
||||||
|
|
||||||
entry = (ConnectionHashEntry *) hash_search(ConnectionHash, &key, HASH_FIND, &found);
|
|
||||||
if (!found)
|
|
||||||
{
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
return entry->connectionCount;
|
|
||||||
}
|
|
||||||
|
|
|
@ -1472,8 +1472,7 @@ FindOrCreateWorkerPool(DistributedExecution *execution, WorkerNode *workerNode)
|
||||||
workerPool->distributedExecution = execution;
|
workerPool->distributedExecution = execution;
|
||||||
|
|
||||||
/* "open" connections aggressively when there are cached connections */
|
/* "open" connections aggressively when there are cached connections */
|
||||||
nodeConnectionCount = NodeConnectionCount(workerNode->workerName,
|
nodeConnectionCount = MaxCachedConnectionsPerWorker;
|
||||||
workerNode->workerPort);
|
|
||||||
workerPool->maxNewConnectionsPerCycle = Max(1, nodeConnectionCount);
|
workerPool->maxNewConnectionsPerCycle = Max(1, nodeConnectionCount);
|
||||||
|
|
||||||
dlist_init(&workerPool->pendingTaskQueue);
|
dlist_init(&workerPool->pendingTaskQueue);
|
||||||
|
|
|
@ -128,7 +128,6 @@ typedef struct ConnectionHashEntry
|
||||||
{
|
{
|
||||||
ConnectionHashKey key;
|
ConnectionHashKey key;
|
||||||
dlist_head *connections;
|
dlist_head *connections;
|
||||||
int connectionCount;
|
|
||||||
} ConnectionHashEntry;
|
} ConnectionHashEntry;
|
||||||
|
|
||||||
/* hash entry for cached connection parameters */
|
/* hash entry for cached connection parameters */
|
||||||
|
@ -191,7 +190,6 @@ extern MultiConnection * StartNodeUserDatabaseConnection(uint32 flags,
|
||||||
extern void CloseNodeConnectionsAfterTransaction(char *nodeName, int nodePort);
|
extern void CloseNodeConnectionsAfterTransaction(char *nodeName, int nodePort);
|
||||||
extern void CloseConnection(MultiConnection *connection);
|
extern void CloseConnection(MultiConnection *connection);
|
||||||
extern void ShutdownConnection(MultiConnection *connection);
|
extern void ShutdownConnection(MultiConnection *connection);
|
||||||
extern int NodeConnectionCount(char *nodeName, int nodePort);
|
|
||||||
|
|
||||||
/* dealing with a connection */
|
/* dealing with a connection */
|
||||||
extern void FinishConnectionListEstablishment(List *multiConnectionList);
|
extern void FinishConnectionListEstablishment(List *multiConnectionList);
|
||||||
|
|
Loading…
Reference in New Issue