diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c index 5ecdad20a..d87be78d8 100644 --- a/src/backend/distributed/executor/adaptive_executor.c +++ b/src/backend/distributed/executor/adaptive_executor.c @@ -275,9 +275,6 @@ typedef struct DistributedExecution */ uint64 rowsProcessed; - /* statistics on distributed execution */ - DistributedExecutionStats *executionStats; - /* * The following fields are used while receiving results from remote nodes. * We store this information here to avoid re-allocating it every time. @@ -1085,8 +1082,6 @@ CreateDistributedExecution(RowModifyLevel modLevel, List *taskList, execution->localTaskList = NIL; execution->remoteTaskList = NIL; - execution->executionStats = - (DistributedExecutionStats *) palloc0(sizeof(DistributedExecutionStats)); execution->paramListInfo = paramListInfo; execution->workerList = NIL; execution->sessionList = NIL; @@ -3681,7 +3676,6 @@ ReceiveResults(WorkerSession *session, bool storeRows) MultiConnection *connection = session->connection; WorkerPool *workerPool = session->workerPool; DistributedExecution *execution = workerPool->distributedExecution; - DistributedExecutionStats *executionStats = execution->executionStats; TaskPlacementExecution *placementExecution = session->currentTask; ShardCommandExecution *shardCommandExecution = placementExecution->shardCommandExecution; @@ -3822,10 +3816,10 @@ ReceiveResults(WorkerSession *session, bool storeRows) */ Assert(EnableBinaryProtocol || !binaryResults); - uint64 tupleLibpqSize = 0; - for (uint32 rowIndex = 0; rowIndex < rowsProcessed; rowIndex++) { + uint64 tupleLibpqSize = 0; + /* * Switch to a temporary memory context that we reset after each * tuple. 
This protects us from any memory leaks that might be @@ -3864,10 +3858,7 @@ ReceiveResults(WorkerSession *session, bool storeRows) } columnArray[columnIndex] = value; } - if (SubPlanLevel > 0 && executionStats != NULL) - { - executionStats->totalIntermediateResultSize += valueLength; - } + tupleLibpqSize += valueLength; } } @@ -3898,11 +3889,6 @@ ReceiveResults(WorkerSession *session, bool storeRows) } PQclear(result); - - if (executionStats != NULL && CheckIfSizeLimitIsExceeded(executionStats)) - { - ErrorSizeLimitIsExceeded(); - } } /* the context is local to the function, so not needed anymore */ diff --git a/src/backend/distributed/executor/local_executor.c b/src/backend/distributed/executor/local_executor.c index bcbdf823f..cb461fa94 100644 --- a/src/backend/distributed/executor/local_executor.c +++ b/src/backend/distributed/executor/local_executor.c @@ -88,9 +88,10 @@ #include "distributed/listutils.h" #include "distributed/local_executor.h" #include "distributed/local_plan_cache.h" -#include "distributed/multi_executor.h" #include "distributed/coordinator_protocol.h" #include "distributed/metadata_cache.h" +#include "distributed/multi_executor.h" +#include "distributed/multi_server_executor.h" #include "distributed/relation_access_tracking.h" #include "distributed/remote_commands.h" /* to access LogRemoteCommands */ #include "distributed/transaction_management.h" @@ -284,10 +285,9 @@ ExecuteLocalTaskListExtended(List *taskList, if (GetTaskQueryType(task) == TASK_QUERY_TEXT_LIST) { List *queryStringList = task->taskQuery.data.queryStringList; - totalRowsProcessed += LocallyPlanAndExecuteMultipleQueries( - queryStringList, - tupleDest, - task); + totalRowsProcessed += + LocallyPlanAndExecuteMultipleQueries(queryStringList, tupleDest, + task); continue; } diff --git a/src/backend/distributed/executor/multi_server_executor.c b/src/backend/distributed/executor/multi_server_executor.c index 4c60d8342..5348a0b9c 100644 --- 
a/src/backend/distributed/executor/multi_server_executor.c +++ b/src/backend/distributed/executor/multi_server_executor.c @@ -27,6 +27,7 @@ #include "distributed/multi_server_executor.h" #include "distributed/coordinator_protocol.h" #include "distributed/subplan_execution.h" +#include "distributed/tuple_destination.h" #include "distributed/worker_protocol.h" #include "utils/lsyscache.h" @@ -93,45 +94,3 @@ JobExecutorType(DistributedPlan *distributedPlan) return MULTI_EXECUTOR_ADAPTIVE; } - - -/* - * CheckIfSizeLimitIsExceeded checks if the limit is exceeded by intermediate - * results, if there is any. - */ -bool -CheckIfSizeLimitIsExceeded(DistributedExecutionStats *executionStats) -{ - if (!SubPlanLevel || MaxIntermediateResult < 0) - { - return false; - } - - uint64 maxIntermediateResultInBytes = MaxIntermediateResult * 1024L; - if (executionStats->totalIntermediateResultSize < maxIntermediateResultInBytes) - { - return false; - } - - return true; -} - - -/* - * This function is called when the intermediate result size limitation is - * exceeded. It basically errors out with a detailed explanation. 
- */ -void -ErrorSizeLimitIsExceeded() -{ - ereport(ERROR, (errmsg("the intermediate result size exceeds " - "citus.max_intermediate_result_size (currently %d kB)", - MaxIntermediateResult), - errdetail("Citus restricts the size of intermediate " - "results of complex subqueries and CTEs to " - "avoid accidentally pulling large result sets " - "into once place."), - errhint("To run the current query, set " - "citus.max_intermediate_result_size to a higher" - " value or -1 to disable."))); -} diff --git a/src/backend/distributed/executor/tuple_destination.c b/src/backend/distributed/executor/tuple_destination.c index a6f5420d4..0fe1442fa 100644 --- a/src/backend/distributed/executor/tuple_destination.c +++ b/src/backend/distributed/executor/tuple_destination.c @@ -7,8 +7,12 @@ #include #include +#include "access/htup_details.h" +#include "distributed/multi_server_executor.h" +#include "distributed/subplan_execution.h" #include "distributed/tuple_destination.h" + /* * TupleStoreTupleDestination is internal representation of a TupleDestination * which forwards tuples to a tuple store. 
@@ -43,6 +47,8 @@ typedef struct TupleDestDestReceiver static void TupleStoreTupleDestPutTuple(TupleDestination *self, Task *task, int placementIndex, int queryNumber, HeapTuple heapTuple, uint64 tupleLibpqSize); +static void EnsureIntermediateSizeLimitNotExceeded(TupleDestinationStats * + tupleDestinationStats); static TupleDesc TupleStoreTupleDestTupleDescForQuery(TupleDestination *self, int queryNumber); static void TupleDestNonePutTuple(TupleDestination *self, Task *task, @@ -66,12 +72,17 @@ CreateTupleStoreTupleDest(Tuplestorestate *tupleStore, TupleDesc tupleDescriptor { TupleStoreTupleDestination *tupleStoreTupleDest = palloc0( sizeof(TupleStoreTupleDestination)); + tupleStoreTupleDest->tupleStore = tupleStore; tupleStoreTupleDest->tupleDesc = tupleDescriptor; tupleStoreTupleDest->pub.putTuple = TupleStoreTupleDestPutTuple; tupleStoreTupleDest->pub.tupleDescForQuery = TupleStoreTupleDestTupleDescForQuery; + TupleDestination *tupleDestination = &tupleStoreTupleDest->pub; + tupleDestination->tupleDestinationStats = + (TupleDestinationStats *) palloc0(sizeof(TupleDestinationStats)); + return (TupleDestination *) tupleStoreTupleDest; } @@ -86,11 +97,83 @@ TupleStoreTupleDestPutTuple(TupleDestination *self, Task *task, HeapTuple heapTuple, uint64 tupleLibpqSize) { TupleStoreTupleDestination *tupleDest = (TupleStoreTupleDestination *) self; + + /* + * Remote execution sets tupleLibpqSize, however it is 0 for local execution. We prefer + * to use tupleLibpqSize for the remote execution because that reflects the exact data + * transfer size over the network. For local execution, we rely on heapTuple->t_len; + * HeapTupleHeaderGetDatumLength() expects a HeapTupleHeader, not a HeapTuple. + */ + uint64 tupleSize = tupleLibpqSize; + if (tupleSize == 0) + { + tupleSize = heapTuple->t_len; + } + + /* + * Enforce citus.max_intermediate_result_size for subPlans if + * the caller requested. 
+ */ + TupleDestinationStats *tupleDestinationStats = self->tupleDestinationStats; + if (SubPlanLevel > 0 && tupleDestinationStats != NULL) + { + tupleDestinationStats->totalIntermediateResultSize += tupleSize; + EnsureIntermediateSizeLimitNotExceeded(tupleDestinationStats); + } + + /* do the actual work */ tuplestore_puttuple(tupleDest->tupleStore, heapTuple); + + /* we record tuples received over network */ task->totalReceivedTupleData += tupleLibpqSize; } +/* + * EnsureIntermediateSizeLimitNotExceeded is a helper function for checking the current + * state of the tupleDestinationStats and throws error if necessary. + */ +static void +EnsureIntermediateSizeLimitNotExceeded(TupleDestinationStats *tupleDestinationStats) +{ + if (!tupleDestinationStats) + { + /* unexpected, still prefer defensive approach */ + return; + } + + /* + * We only care about subPlans. Also, if user disabled, no need to + * check further. + */ + if (SubPlanLevel == 0 || MaxIntermediateResult < 0) + { + return; + } + + uint64 maxIntermediateResultInBytes = MaxIntermediateResult * 1024L; + if (tupleDestinationStats->totalIntermediateResultSize < maxIntermediateResultInBytes) + { + /* + * We have not reached the size limit that the user requested, so + * nothing to do for now. + */ + return; + } + + ereport(ERROR, (errmsg("the intermediate result size exceeds " + "citus.max_intermediate_result_size (currently %d kB)", + MaxIntermediateResult), + errdetail("Citus restricts the size of intermediate " + "results of complex subqueries and CTEs to " + "avoid accidentally pulling large result sets " + "into once place."), + errhint("To run the current query, set " + "citus.max_intermediate_result_size to a higher" + " value or -1 to disable."))); +} + + /* * TupleStoreTupleDestTupleDescForQuery implements TupleDestination->TupleDescForQuery * for TupleStoreTupleDestination. 
@@ -203,7 +286,10 @@ TupleDestDestReceiverReceive(TupleTableSlot *slot, HeapTuple heapTuple = ExecFetchSlotTuple(slot); #endif - tupleDest->putTuple(tupleDest, task, placementIndex, queryNumber, heapTuple, 0); + uint64 tupleLibpqSize = 0; + + tupleDest->putTuple(tupleDest, task, placementIndex, queryNumber, heapTuple, + tupleLibpqSize); return true; } diff --git a/src/include/distributed/multi_physical_planner.h b/src/include/distributed/multi_physical_planner.h index b8a94e929..1518756fa 100644 --- a/src/include/distributed/multi_physical_planner.h +++ b/src/include/distributed/multi_physical_planner.h @@ -311,7 +311,8 @@ typedef struct Task /* * totalReceivedTupleData only counts the data for a single placement. So * for RETURNING DML this is not really correct. This is used by - * EXPLAIN ANALYZE, to display the amount of received bytes. + * EXPLAIN ANALYZE, to display the amount of received bytes. The local execution + * does not increment this value, so only used for remote execution. */ uint64 totalReceivedTupleData; diff --git a/src/include/distributed/multi_server_executor.h b/src/include/distributed/multi_server_executor.h index 52b260880..fab0d31d6 100644 --- a/src/include/distributed/multi_server_executor.h +++ b/src/include/distributed/multi_server_executor.h @@ -15,7 +15,6 @@ #define MULTI_SERVER_EXECUTOR_H #include "distributed/multi_physical_planner.h" - #include "distributed/worker_manager.h" /* Adaptive executor repartioning related defines */ @@ -34,18 +33,6 @@ typedef enum } MultiExecutorType; -/* - * DistributedExecutionStats holds the execution related stats. - * - * totalIntermediateResultSize is a counter to keep the size - * of the intermediate results of complex subqueries and CTEs - * so that we can put a limit on the size. 
- */ -typedef struct DistributedExecutionStats -{ - uint64 totalIntermediateResultSize; -} DistributedExecutionStats; - /* Config variable managed via guc.c */ extern int RemoteTaskCheckInterval; extern int TaskExecutorType; @@ -55,7 +42,5 @@ extern int MultiTaskQueryLogLevel; /* Function declarations common to more than one executor */ extern MultiExecutorType JobExecutorType(DistributedPlan *distributedPlan); -extern bool CheckIfSizeLimitIsExceeded(DistributedExecutionStats *executionStats); -extern void ErrorSizeLimitIsExceeded(void); #endif /* MULTI_SERVER_EXECUTOR_H */ diff --git a/src/include/distributed/tuple_destination.h b/src/include/distributed/tuple_destination.h index 24e4b4aad..0480ffdc4 100644 --- a/src/include/distributed/tuple_destination.h +++ b/src/include/distributed/tuple_destination.h @@ -15,8 +15,23 @@ #include "tcop/dest.h" #include "utils/tuplestore.h" + typedef struct TupleDestination TupleDestination; + +/* + * TupleDestinationStats holds the size related stats. + * + * totalIntermediateResultSize is a counter to keep the size + * of the intermediate results of complex subqueries and CTEs + * so that we can put a limit on the size. + */ +typedef struct TupleDestinationStats +{ + uint64 totalIntermediateResultSize; +} TupleDestinationStats; + + /* * TupleDestination provides a generic interface for where to send tuples. * @@ -36,6 +51,12 @@ struct TupleDestination /* tupleDescForQuery returns tuple descriptor for a query number. Can return NULL. */ TupleDesc (*tupleDescForQuery)(TupleDestination *self, int queryNumber); + + /* + * Used to enforce citus.max_intermediate_result_size, could be NULL + * if the caller is not interested in the size. 
+ */ + TupleDestinationStats *tupleDestinationStats; }; extern TupleDestination * CreateTupleStoreTupleDest(Tuplestorestate *tupleStore, TupleDesc diff --git a/src/test/regress/expected/limit_intermediate_size.out b/src/test/regress/expected/limit_intermediate_size.out index 1bdda90be..66b2342fd 100644 --- a/src/test/regress/expected/limit_intermediate_size.out +++ b/src/test/regress/expected/limit_intermediate_size.out @@ -133,12 +133,10 @@ WITH cte AS ( SELECT * FROM cte2, cte3 WHERE cte2.value_1 IN (SELECT value_2 FROM cte3) ) SELECT count(*) FROM cte; - count ---------------------------------------------------------------------- - 1824 -(1 row) - --- this will fail in real_time_executor +ERROR: the intermediate result size exceeds citus.max_intermediate_result_size (currently 3 kB) +DETAIL: Citus restricts the size of intermediate results of complex subqueries and CTEs to avoid accidentally pulling large result sets into once place. +HINT: To run the current query, set citus.max_intermediate_result_size to a higher value or -1 to disable. +-- this will fail in remote execution SET citus.max_intermediate_result_size TO 2; WITH cte AS ( WITH cte2 AS ( diff --git a/src/test/regress/expected/single_node.out b/src/test/regress/expected/single_node.out index 75b7eec03..b2f6b01d3 100644 --- a/src/test/regress/expected/single_node.out +++ b/src/test/regress/expected/single_node.out @@ -93,6 +93,34 @@ WITH cte_1 AS (UPDATE test SET y = y - 1 RETURNING *) SELECT * FROM cte_1 ORDER 5 | 6 (5 rows) +-- we should be able to limit intermediate results +BEGIN; + SET LOCAL citus.max_intermediate_result_size TO 0; + WITH cte_1 AS (SELECT * FROM test OFFSET 0) SELECT * FROM cte_1; +ERROR: the intermediate result size exceeds citus.max_intermediate_result_size (currently 0 kB) +DETAIL: Citus restricts the size of intermediate results of complex subqueries and CTEs to avoid accidentally pulling large result sets into once place. 
+HINT: To run the current query, set citus.max_intermediate_result_size to a higher value or -1 to disable. +ROLLBACK; +-- the first cte (cte_1) does not exceed the limit +-- but the second (cte_2) exceeds, so we error out +BEGIN; + SET LOCAL citus.max_intermediate_result_size TO '1kB'; + INSERT INTO test SELECT i,i from generate_series(0,1000)i; + -- only pulls 1 row, should not hit the limit + WITH cte_1 AS (SELECT * FROM test LIMIT 1) SELECT count(*) FROM cte_1; + count +--------------------------------------------------------------------- + 1 +(1 row) + + -- cte_1 only pulls 1 row, but cte_2 all rows + WITH cte_1 AS (SELECT * FROM test LIMIT 1), + cte_2 AS (SELECT * FROM test OFFSET 0) + SELECT count(*) FROM cte_1, cte_2; +ERROR: the intermediate result size exceeds citus.max_intermediate_result_size (currently 1 kB) +DETAIL: Citus restricts the size of intermediate results of complex subqueries and CTEs to avoid accidentally pulling large result sets into once place. +HINT: To run the current query, set citus.max_intermediate_result_size to a higher value or -1 to disable. 
+ROLLBACK; -- single shard and multi-shard delete -- inside a transaction block BEGIN; diff --git a/src/test/regress/sql/limit_intermediate_size.sql b/src/test/regress/sql/limit_intermediate_size.sql index 6d4862443..09922a10b 100644 --- a/src/test/regress/sql/limit_intermediate_size.sql +++ b/src/test/regress/sql/limit_intermediate_size.sql @@ -116,7 +116,7 @@ WITH cte AS ( SELECT count(*) FROM cte; --- this will fail in real_time_executor +-- this will fail in remote execution SET citus.max_intermediate_result_size TO 2; WITH cte AS ( WITH cte2 AS ( diff --git a/src/test/regress/sql/single_node.sql b/src/test/regress/sql/single_node.sql index c6a734e8a..a0ab64b35 100644 --- a/src/test/regress/sql/single_node.sql +++ b/src/test/regress/sql/single_node.sql @@ -39,6 +39,27 @@ SELECT * FROM test ORDER BY x; UPDATE test SET y = y + 1 RETURNING *; WITH cte_1 AS (UPDATE test SET y = y - 1 RETURNING *) SELECT * FROM cte_1 ORDER BY 1,2; +-- we should be able to limit intermediate results +BEGIN; + SET LOCAL citus.max_intermediate_result_size TO 0; + WITH cte_1 AS (SELECT * FROM test OFFSET 0) SELECT * FROM cte_1; +ROLLBACK; + +-- the first cte (cte_1) does not exceed the limit +-- but the second (cte_2) exceeds, so we error out +BEGIN; + SET LOCAL citus.max_intermediate_result_size TO '1kB'; + INSERT INTO test SELECT i,i from generate_series(0,1000)i; + + -- only pulls 1 row, should not hit the limit + WITH cte_1 AS (SELECT * FROM test LIMIT 1) SELECT count(*) FROM cte_1; + + -- cte_1 only pulls 1 row, but cte_2 all rows + WITH cte_1 AS (SELECT * FROM test LIMIT 1), + cte_2 AS (SELECT * FROM test OFFSET 0) + SELECT count(*) FROM cte_1, cte_2; +ROLLBACK; + -- single shard and multi-shard delete -- inside a transaction block BEGIN;