From 4f7c902c6cd1ec7df75197c357e86662355ac48a Mon Sep 17 00:00:00 2001 From: Onder Kalaci Date: Fri, 3 Apr 2020 12:04:07 +0200 Subject: [PATCH] Move connection establishment for intermediate results after query execution When we have a query like the following: ```SQL WITH a AS (SELECT * FROM foo LIMIT 10) SELECT max(x) FROM a JOIN bar 2 USING (y); ``` Citus currently opens side channels for doing the `COPY "1_1"` FROM STDIN (format 'result') before starting the execution of `SELECT * FROM foo LIMIT 10` Since we need at least 1 connection per worker to do `SELECT * FROM foo LIMIT 10` We need to have 2 connections to worker in order to broadcast the results. However, we don't actually send a single row over the side channel until the execution of `SELECT * FROM foo LIMIT 10` is completely done (and connections unclaimed) and the results are written to a tuple store. We could actually reuse the same connection for doing the `COPY "1_1"` FROM STDIN (format 'result'). This also fixes the issue that Citus doesn't obey `citus.max_adaptive_executor_pool_size` when the query includes an intermediate result. --- .../executor/intermediate_results.c | 22 +++++++++++++++++-- 1 file changed, 20 insertions(+), 2 deletions(-) diff --git a/src/backend/distributed/executor/intermediate_results.c b/src/backend/distributed/executor/intermediate_results.c index 9330400f8..ca7b20f08 100644 --- a/src/backend/distributed/executor/intermediate_results.c +++ b/src/backend/distributed/executor/intermediate_results.c @@ -251,8 +251,6 @@ RemoteFileDestReceiverStartup(DestReceiver *dest, int operation, resultDest->columnOutputFunctions = ColumnOutputFunctions(inputTupleDescriptor, copyOutState->binary); - - PrepareIntermediateResultBroadcast(resultDest); } @@ -372,6 +370,15 @@ RemoteFileDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest) { RemoteFileDestReceiver *resultDest = (RemoteFileDestReceiver *) dest; + if (resultDest->tuplesSent == 0) + { + /* + * We get the first tuple, lets initialize the remote connections + * and/or the local file. + */ + PrepareIntermediateResultBroadcast(resultDest); + } + TupleDesc tupleDescriptor = resultDest->tupleDescriptor; List *connectionList = resultDest->connectionList; @@ -441,6 +448,17 @@ RemoteFileDestReceiverShutdown(DestReceiver *destReceiver) { RemoteFileDestReceiver *resultDest = (RemoteFileDestReceiver *) destReceiver; + if (resultDest->tuplesSent == 0) + { + /* + * We have not received any tuples (when the intermediate result + * returns zero rows). Still, we want to create the necessary + * intermediate result files even if they are empty, as the query + * execution requires the files to be present. + */ + PrepareIntermediateResultBroadcast(resultDest); + } + List *connectionList = resultDest->connectionList; CopyOutState copyOutState = resultDest->copyOutState;