Speed-up run_command_on_shards(). (#2564)

We were establishing connections synchronously. Establishing
connections asynchronously results in some parallelization, saving
hundreds of milliseconds.

In a test I did, this decreased the query time from 150ms to 40ms.
pull/2568/head^2
Hadi Moshayedi 2018-12-24 08:47:01 -05:00 committed by GitHub
parent fb497ddad1
commit 38579d52d0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 15 additions and 6 deletions

View File

@ -229,28 +229,37 @@ ExecuteCommandsInParallelAndStoreResults(StringInfo *nodeNameArray, int *nodePor
palloc0(commmandCount * sizeof(MultiConnection *)); palloc0(commmandCount * sizeof(MultiConnection *));
int finishedCount = 0; int finishedCount = 0;
/* establish connections */ /* start connections asynchronously */
for (commandIndex = 0; commandIndex < commmandCount; commandIndex++) for (commandIndex = 0; commandIndex < commmandCount; commandIndex++)
{ {
char *nodeName = nodeNameArray[commandIndex]->data; char *nodeName = nodeNameArray[commandIndex]->data;
int nodePort = nodePortArray[commandIndex]; int nodePort = nodePortArray[commandIndex];
int connectionFlags = FORCE_NEW_CONNECTION; int connectionFlags = FORCE_NEW_CONNECTION;
MultiConnection *connection = connectionArray[commandIndex] =
GetNodeConnection(connectionFlags, nodeName, nodePort); StartNodeConnection(connectionFlags, nodeName, nodePort);
StringInfo queryResultString = resultStringArray[commandIndex]; }
statusArray[commandIndex] = true; /* establish connections */
for (commandIndex = 0; commandIndex < commmandCount; commandIndex++)
{
MultiConnection *connection = connectionArray[commandIndex];
StringInfo queryResultString = resultStringArray[commandIndex];
char *nodeName = nodeNameArray[commandIndex]->data;
int nodePort = nodePortArray[commandIndex];
FinishConnectionEstablishment(connection);
if (PQstatus(connection->pgConn) != CONNECTION_OK) if (PQstatus(connection->pgConn) != CONNECTION_OK)
{ {
appendStringInfo(queryResultString, "failed to connect to %s:%d", nodeName, appendStringInfo(queryResultString, "failed to connect to %s:%d", nodeName,
(int) nodePort); (int) nodePort);
statusArray[commandIndex] = false; statusArray[commandIndex] = false;
connectionArray[commandIndex] = NULL;
finishedCount++; finishedCount++;
} }
else else
{ {
connectionArray[commandIndex] = connection; statusArray[commandIndex] = true;
} }
} }