mirror of https://github.com/citusdata/citus.git
Move connection establishment for intermediate results after query execution
When we have a query like the following: ```SQL WITH a AS (SELECT * FROM foo LIMIT 10) SELECT max(x) FROM a JOIN bar 2 USING (y); ``` Citus currently opens side channels for doing the `COPY "1_1"` FROM STDIN (format 'result') before starting the execution of `SELECT * FROM foo LIMIT 10` Since we need at least 1 connection per worker to do `SELECT * FROM foo LIMIT 10` We need to have 2 connections to worker in order to broadcast the results. However, we don't actually send a single row over the side channel until the execution of `SELECT * FROM foo LIMIT 10` is completely done (and connections unclaimed) and the results are written to a tuple store. We could actually reuse the same connection for doing the `COPY "1_1"` FROM STDIN (format 'result'). This also fixes the issue that Citus doesn't obey `citus.max_adaptive_executor_pool_size` when the query includes an intermediate result.pull/3703/head
parent
721daec9a5
commit
4f7c902c6c
|
@ -251,8 +251,6 @@ RemoteFileDestReceiverStartup(DestReceiver *dest, int operation,
|
||||||
|
|
||||||
resultDest->columnOutputFunctions = ColumnOutputFunctions(inputTupleDescriptor,
|
resultDest->columnOutputFunctions = ColumnOutputFunctions(inputTupleDescriptor,
|
||||||
copyOutState->binary);
|
copyOutState->binary);
|
||||||
|
|
||||||
PrepareIntermediateResultBroadcast(resultDest);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -372,6 +370,15 @@ RemoteFileDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest)
|
||||||
{
|
{
|
||||||
RemoteFileDestReceiver *resultDest = (RemoteFileDestReceiver *) dest;
|
RemoteFileDestReceiver *resultDest = (RemoteFileDestReceiver *) dest;
|
||||||
|
|
||||||
|
if (resultDest->tuplesSent == 0)
|
||||||
|
{
|
||||||
|
/*
|
||||||
|
* We get the first tuple, lets initialize the remote connections
|
||||||
|
* and/or the local file.
|
||||||
|
*/
|
||||||
|
PrepareIntermediateResultBroadcast(resultDest);
|
||||||
|
}
|
||||||
|
|
||||||
TupleDesc tupleDescriptor = resultDest->tupleDescriptor;
|
TupleDesc tupleDescriptor = resultDest->tupleDescriptor;
|
||||||
|
|
||||||
List *connectionList = resultDest->connectionList;
|
List *connectionList = resultDest->connectionList;
|
||||||
|
@ -441,6 +448,17 @@ RemoteFileDestReceiverShutdown(DestReceiver *destReceiver)
|
||||||
{
|
{
|
||||||
RemoteFileDestReceiver *resultDest = (RemoteFileDestReceiver *) destReceiver;
|
RemoteFileDestReceiver *resultDest = (RemoteFileDestReceiver *) destReceiver;
|
||||||
|
|
||||||
|
if (resultDest->tuplesSent == 0)
|
||||||
|
{
|
||||||
|
/*
|
||||||
|
* We have not received any tuples (when the intermediate result
|
||||||
|
* returns zero rows). Still, we want to create the necessary
|
||||||
|
* intermediate result files even if they are empty, as the query
|
||||||
|
* execution requires the files to be present.
|
||||||
|
*/
|
||||||
|
PrepareIntermediateResultBroadcast(resultDest);
|
||||||
|
}
|
||||||
|
|
||||||
List *connectionList = resultDest->connectionList;
|
List *connectionList = resultDest->connectionList;
|
||||||
CopyOutState copyOutState = resultDest->copyOutState;
|
CopyOutState copyOutState = resultDest->copyOutState;
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue