From 38579d52d07ace1ae03cce3858b338bf294c0936 Mon Sep 17 00:00:00 2001 From: Hadi Moshayedi Date: Mon, 24 Dec 2018 08:47:01 -0500 Subject: [PATCH] Speed-up run_command_on_shards(). (#2564) We were establishing connections synchronously. Establishing connections asynchronously results in some parallelization, saving hundreds of milliseconds. In a test I did, this decreased the query time from 150ms to 40ms. --- .../distributed/master/master_citus_tools.c | 21 +++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) diff --git a/src/backend/distributed/master/master_citus_tools.c b/src/backend/distributed/master/master_citus_tools.c index f04cae685..2c8f7175e 100644 --- a/src/backend/distributed/master/master_citus_tools.c +++ b/src/backend/distributed/master/master_citus_tools.c @@ -229,28 +229,37 @@ ExecuteCommandsInParallelAndStoreResults(StringInfo *nodeNameArray, int *nodePor palloc0(commmandCount * sizeof(MultiConnection *)); int finishedCount = 0; - /* establish connections */ + /* start connections asynchronously */ for (commandIndex = 0; commandIndex < commmandCount; commandIndex++) { char *nodeName = nodeNameArray[commandIndex]->data; int nodePort = nodePortArray[commandIndex]; int connectionFlags = FORCE_NEW_CONNECTION; - MultiConnection *connection = - GetNodeConnection(connectionFlags, nodeName, nodePort); - StringInfo queryResultString = resultStringArray[commandIndex]; + connectionArray[commandIndex] = + StartNodeConnection(connectionFlags, nodeName, nodePort); + } - statusArray[commandIndex] = true; + /* establish connections */ + for (commandIndex = 0; commandIndex < commmandCount; commandIndex++) + { + MultiConnection *connection = connectionArray[commandIndex]; + StringInfo queryResultString = resultStringArray[commandIndex]; + char *nodeName = nodeNameArray[commandIndex]->data; + int nodePort = nodePortArray[commandIndex]; + + FinishConnectionEstablishment(connection); if (PQstatus(connection->pgConn) != CONNECTION_OK) { appendStringInfo(queryResultString, "failed to connect to %s:%d", nodeName, (int) nodePort); statusArray[commandIndex] = false; + connectionArray[commandIndex] = NULL; finishedCount++; } else { - connectionArray[commandIndex] = connection; + statusArray[commandIndex] = true; } }