From fe3caf3bc82a306b86d3e1151d8e80f3b4fd80ea Mon Sep 17 00:00:00 2001
From: Onder Kalaci
Date: Tue, 13 Oct 2020 09:36:04 +0200
Subject: [PATCH] Local execution considers intermediate result size limit

With this commit, we make sure that local execution accounts for the
intermediate result size just as distributed execution does. Plus, it
enforces the citus.max_intermediate_result_size value.
---
 .../distributed/executor/adaptive_executor.c  | 20 +----
 .../distributed/executor/local_executor.c     | 10 +--
 .../executor/multi_server_executor.c          | 43 +--------
 .../distributed/executor/tuple_destination.c  | 88 ++++++++++++++++++-
 .../distributed/multi_physical_planner.h      |  3 +-
 .../distributed/multi_server_executor.h       | 15 ----
 src/include/distributed/tuple_destination.h   | 21 +++++
 .../expected/limit_intermediate_size.out      | 10 +--
 src/test/regress/expected/single_node.out     | 28 ++++++
 .../regress/sql/limit_intermediate_size.sql   |  2 +-
 src/test/regress/sql/single_node.sql          | 21 +++++
 11 files changed, 173 insertions(+), 88 deletions(-)

diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c
index 5ecdad20a..d87be78d8 100644
--- a/src/backend/distributed/executor/adaptive_executor.c
+++ b/src/backend/distributed/executor/adaptive_executor.c
@@ -275,9 +275,6 @@ typedef struct DistributedExecution
 	 */
 	uint64 rowsProcessed;
 
-	/* statistics on distributed execution */
-	DistributedExecutionStats *executionStats;
-
 	/*
 	 * The following fields are used while receiving results from remote nodes.
 	 * We store this information here to avoid re-allocating it every time.
@@ -1085,8 +1082,6 @@ CreateDistributedExecution(RowModifyLevel modLevel, List *taskList,
 	execution->localTaskList = NIL;
 	execution->remoteTaskList = NIL;
 
-	execution->executionStats =
-		(DistributedExecutionStats *) palloc0(sizeof(DistributedExecutionStats));
 	execution->paramListInfo = paramListInfo;
 	execution->workerList = NIL;
 	execution->sessionList = NIL;
@@ -3681,7 +3676,6 @@ ReceiveResults(WorkerSession *session, bool storeRows)
 	MultiConnection *connection = session->connection;
 	WorkerPool *workerPool = session->workerPool;
 	DistributedExecution *execution = workerPool->distributedExecution;
-	DistributedExecutionStats *executionStats = execution->executionStats;
 	TaskPlacementExecution *placementExecution = session->currentTask;
 	ShardCommandExecution *shardCommandExecution =
 		placementExecution->shardCommandExecution;
@@ -3822,10 +3816,10 @@ ReceiveResults(WorkerSession *session, bool storeRows)
 		 */
 		Assert(EnableBinaryProtocol || !binaryResults);
 
-		uint64 tupleLibpqSize = 0;
-
 		for (uint32 rowIndex = 0; rowIndex < rowsProcessed; rowIndex++)
 		{
+			uint64 tupleLibpqSize = 0;
+
 			/*
 			 * Switch to a temporary memory context that we reset after each
 			 * tuple. This protects us from any memory leaks that might be
@@ -3864,10 +3858,7 @@ ReceiveResults(WorkerSession *session, bool storeRows)
 					}
 					columnArray[columnIndex] = value;
 				}
 
-				if (SubPlanLevel > 0 && executionStats != NULL)
-				{
-					executionStats->totalIntermediateResultSize += valueLength;
-				}
+				tupleLibpqSize += valueLength;
 			}
 		}
@@ -3898,11 +3889,6 @@ ReceiveResults(WorkerSession *session, bool storeRows)
 		}
 
 		PQclear(result);
-
-		if (executionStats != NULL && CheckIfSizeLimitIsExceeded(executionStats))
-		{
-			ErrorSizeLimitIsExceeded();
-		}
 	}
 
 	/* the context is local to the function, so not needed anymore */
diff --git a/src/backend/distributed/executor/local_executor.c b/src/backend/distributed/executor/local_executor.c
index bcbdf823f..cb461fa94 100644
--- a/src/backend/distributed/executor/local_executor.c
+++ b/src/backend/distributed/executor/local_executor.c
@@ -88,9 +88,10 @@
 #include "distributed/listutils.h"
 #include "distributed/local_executor.h"
 #include "distributed/local_plan_cache.h"
-#include "distributed/multi_executor.h"
 #include "distributed/coordinator_protocol.h"
 #include "distributed/metadata_cache.h"
+#include "distributed/multi_executor.h"
+#include "distributed/multi_server_executor.h"
 #include "distributed/relation_access_tracking.h"
 #include "distributed/remote_commands.h" /* to access LogRemoteCommands */
 #include "distributed/transaction_management.h"
@@ -284,10 +285,9 @@ ExecuteLocalTaskListExtended(List *taskList,
 		if (GetTaskQueryType(task) == TASK_QUERY_TEXT_LIST)
 		{
 			List *queryStringList = task->taskQuery.data.queryStringList;
-			totalRowsProcessed += LocallyPlanAndExecuteMultipleQueries(
-				queryStringList,
-				tupleDest,
-				task);
+			totalRowsProcessed +=
+				LocallyPlanAndExecuteMultipleQueries(queryStringList, tupleDest,
+													 task);
 			continue;
 		}
 
diff --git a/src/backend/distributed/executor/multi_server_executor.c b/src/backend/distributed/executor/multi_server_executor.c
index 4c60d8342..5348a0b9c 100644
--- a/src/backend/distributed/executor/multi_server_executor.c
+++ b/src/backend/distributed/executor/multi_server_executor.c
@@ -27,6 +27,7 @@
 #include "distributed/multi_server_executor.h"
 #include "distributed/coordinator_protocol.h"
 #include "distributed/subplan_execution.h"
+#include "distributed/tuple_destination.h"
 #include "distributed/worker_protocol.h"
 #include "utils/lsyscache.h"
 
@@ -93,45 +94,3 @@ JobExecutorType(DistributedPlan *distributedPlan)
 
 	return MULTI_EXECUTOR_ADAPTIVE;
 }
-
-
-/*
- * CheckIfSizeLimitIsExceeded checks if the limit is exceeded by intermediate
- * results, if there is any.
- */
-bool
-CheckIfSizeLimitIsExceeded(DistributedExecutionStats *executionStats)
-{
-	if (!SubPlanLevel || MaxIntermediateResult < 0)
-	{
-		return false;
-	}
-
-	uint64 maxIntermediateResultInBytes = MaxIntermediateResult * 1024L;
-	if (executionStats->totalIntermediateResultSize < maxIntermediateResultInBytes)
-	{
-		return false;
-	}
-
-	return true;
-}
-
-
-/*
- * This function is called when the intermediate result size limitation is
- * exceeded. It basically errors out with a detailed explanation.
- */
-void
-ErrorSizeLimitIsExceeded()
-{
-	ereport(ERROR, (errmsg("the intermediate result size exceeds "
-						   "citus.max_intermediate_result_size (currently %d kB)",
-						   MaxIntermediateResult),
-					errdetail("Citus restricts the size of intermediate "
-							  "results of complex subqueries and CTEs to "
-							  "avoid accidentally pulling large result sets "
-							  "into once place."),
-					errhint("To run the current query, set "
-							"citus.max_intermediate_result_size to a higher"
-							" value or -1 to disable.")));
-}
diff --git a/src/backend/distributed/executor/tuple_destination.c b/src/backend/distributed/executor/tuple_destination.c
index a6f5420d4..0fe1442fa 100644
--- a/src/backend/distributed/executor/tuple_destination.c
+++ b/src/backend/distributed/executor/tuple_destination.c
@@ -7,8 +7,12 @@
 #include
 #include
 
+#include "access/htup_details.h"
+#include "distributed/multi_server_executor.h"
+#include "distributed/subplan_execution.h"
 #include "distributed/tuple_destination.h"
 
+
 /*
  * TupleStoreTupleDestination is internal representation of a TupleDestination
  * which forwards tuples to a tuple store.
@@ -43,6 +47,8 @@ typedef struct TupleDestDestReceiver
 static void TupleStoreTupleDestPutTuple(TupleDestination *self, Task *task,
 										int placementIndex, int queryNumber,
 										HeapTuple heapTuple, uint64 tupleLibpqSize);
+static void EnsureIntermediateSizeLimitNotExceeded(TupleDestinationStats *
+												   tupleDestinationStats);
 static TupleDesc TupleStoreTupleDestTupleDescForQuery(TupleDestination *self,
 													  int queryNumber);
 static void TupleDestNonePutTuple(TupleDestination *self, Task *task,
@@ -66,12 +72,17 @@ CreateTupleStoreTupleDest(Tuplestorestate *tupleStore, TupleDesc tupleDescriptor
 {
 	TupleStoreTupleDestination *tupleStoreTupleDest = palloc0(
 		sizeof(TupleStoreTupleDestination));
+
 	tupleStoreTupleDest->tupleStore = tupleStore;
 	tupleStoreTupleDest->tupleDesc = tupleDescriptor;
 	tupleStoreTupleDest->pub.putTuple = TupleStoreTupleDestPutTuple;
 	tupleStoreTupleDest->pub.tupleDescForQuery = TupleStoreTupleDestTupleDescForQuery;
 
+	TupleDestination *tupleDestination = &tupleStoreTupleDest->pub;
+	tupleDestination->tupleDestinationStats =
+		(TupleDestinationStats *) palloc0(sizeof(TupleDestinationStats));
+
 	return (TupleDestination *) tupleStoreTupleDest;
 }
 
 
@@ -86,11 +97,83 @@ TupleStoreTupleDestPutTuple(TupleDestination *self, Task *task,
 							HeapTuple heapTuple, uint64 tupleLibpqSize)
 {
 	TupleStoreTupleDestination *tupleDest = (TupleStoreTupleDestination *) self;
+
+	/*
+	 * Remote execution sets tupleLibpqSize, but it is 0 for local execution. We prefer
+	 * to use tupleLibpqSize for remote execution because it reflects the exact data
+	 * transfer size over the network. For local execution, we rely on the size of the
+	 * tuple.
+	 */
+	uint64 tupleSize = tupleLibpqSize;
+	if (tupleSize == 0)
+	{
+		tupleSize = HeapTupleHeaderGetDatumLength(heapTuple);
+	}
+
+	/*
+	 * Enforce citus.max_intermediate_result_size for subPlans if
+	 * the caller requested.
+	 */
+	TupleDestinationStats *tupleDestinationStats = self->tupleDestinationStats;
+	if (SubPlanLevel > 0 && tupleDestinationStats != NULL)
+	{
+		tupleDestinationStats->totalIntermediateResultSize += tupleSize;
+		EnsureIntermediateSizeLimitNotExceeded(tupleDestinationStats);
+	}
+
+	/* do the actual work */
 	tuplestore_puttuple(tupleDest->tupleStore, heapTuple);
+
+	/* we record tuples received over network */
 	task->totalReceivedTupleData += tupleLibpqSize;
 }
 
 
+/*
+ * EnsureIntermediateSizeLimitNotExceeded is a helper function that checks the
+ * current state of the tupleDestinationStats and throws an error if necessary.
+ */
+static void
+EnsureIntermediateSizeLimitNotExceeded(TupleDestinationStats *tupleDestinationStats)
+{
+	if (!tupleDestinationStats)
+	{
+		/* unexpected, still prefer defensive approach */
+		return;
+	}
+
+	/*
+	 * We only care about subPlans. Also, if the user disabled the limit,
+	 * no need to check further.
+	 */
+	if (SubPlanLevel == 0 || MaxIntermediateResult < 0)
+	{
+		return;
+	}
+
+	uint64 maxIntermediateResultInBytes = MaxIntermediateResult * 1024L;
+	if (tupleDestinationStats->totalIntermediateResultSize < maxIntermediateResultInBytes)
+	{
+		/*
+		 * We have not reached the size limit that the user requested, so
+		 * nothing to do for now.
+		 */
+		return;
+	}
+
+	ereport(ERROR, (errmsg("the intermediate result size exceeds "
+						   "citus.max_intermediate_result_size (currently %d kB)",
+						   MaxIntermediateResult),
+					errdetail("Citus restricts the size of intermediate "
+							  "results of complex subqueries and CTEs to "
+							  "avoid accidentally pulling large result sets "
+							  "into once place."),
+					errhint("To run the current query, set "
+							"citus.max_intermediate_result_size to a higher"
+							" value or -1 to disable.")));
+}
+
+
 /*
  * TupleStoreTupleDestTupleDescForQuery implements TupleDestination->TupleDescForQuery
  * for TupleStoreTupleDestination.
@@ -203,7 +286,10 @@ TupleDestDestReceiverReceive(TupleTableSlot *slot,
 	HeapTuple heapTuple = ExecFetchSlotTuple(slot);
 #endif
 
-	tupleDest->putTuple(tupleDest, task, placementIndex, queryNumber, heapTuple, 0);
+	uint64 tupleLibpqSize = 0;
+
+	tupleDest->putTuple(tupleDest, task, placementIndex, queryNumber, heapTuple,
+						tupleLibpqSize);
 
 	return true;
 }
diff --git a/src/include/distributed/multi_physical_planner.h b/src/include/distributed/multi_physical_planner.h
index b8a94e929..1518756fa 100644
--- a/src/include/distributed/multi_physical_planner.h
+++ b/src/include/distributed/multi_physical_planner.h
@@ -311,7 +311,8 @@ typedef struct Task
 	/*
 	 * totalReceivedTupleData only counts the data for a single placement. So
 	 * for RETURNING DML this is not really correct. This is used by
-	 * EXPLAIN ANALYZE, to display the amount of received bytes.
+	 * EXPLAIN ANALYZE, to display the amount of received bytes. Local execution
+	 * does not increment this value, so it is only used for remote execution.
 	 */
 	uint64 totalReceivedTupleData;
 
diff --git a/src/include/distributed/multi_server_executor.h b/src/include/distributed/multi_server_executor.h
index 52b260880..fab0d31d6 100644
--- a/src/include/distributed/multi_server_executor.h
+++ b/src/include/distributed/multi_server_executor.h
@@ -15,7 +15,6 @@
 #define MULTI_SERVER_EXECUTOR_H
 
 #include "distributed/multi_physical_planner.h"
-
 #include "distributed/worker_manager.h"
 
 /* Adaptive executor repartioning related defines */
@@ -34,18 +33,6 @@ typedef enum
 } MultiExecutorType;
 
 
-/*
- * DistributedExecutionStats holds the execution related stats.
- *
- * totalIntermediateResultSize is a counter to keep the size
- * of the intermediate results of complex subqueries and CTEs
- * so that we can put a limit on the size.
- */
-typedef struct DistributedExecutionStats
-{
-	uint64 totalIntermediateResultSize;
-} DistributedExecutionStats;
-
 /* Config variable managed via guc.c */
 extern int RemoteTaskCheckInterval;
 extern int TaskExecutorType;
@@ -55,7 +42,5 @@ extern int MultiTaskQueryLogLevel;
 
 /* Function declarations common to more than one executor */
 extern MultiExecutorType JobExecutorType(DistributedPlan *distributedPlan);
-extern bool CheckIfSizeLimitIsExceeded(DistributedExecutionStats *executionStats);
-extern void ErrorSizeLimitIsExceeded(void);
 
 #endif /* MULTI_SERVER_EXECUTOR_H */
diff --git a/src/include/distributed/tuple_destination.h b/src/include/distributed/tuple_destination.h
index 24e4b4aad..0480ffdc4 100644
--- a/src/include/distributed/tuple_destination.h
+++ b/src/include/distributed/tuple_destination.h
@@ -15,8 +15,23 @@
 #include "tcop/dest.h"
 #include "utils/tuplestore.h"
 
+
 typedef struct TupleDestination TupleDestination;
 
+
+/*
+ * TupleDestinationStats holds the size related stats.
+ *
+ * totalIntermediateResultSize is a counter to keep the size
+ * of the intermediate results of complex subqueries and CTEs
+ * so that we can put a limit on the size.
+ */
+typedef struct TupleDestinationStats
+{
+	uint64 totalIntermediateResultSize;
+} TupleDestinationStats;
+
+
 /*
  * TupleDestination provides a generic interface for where to send tuples.
  *
@@ -36,6 +51,12 @@ struct TupleDestination
 
 	/* tupleDescForQuery returns tuple descriptor for a query number. Can return NULL. */
 	TupleDesc (*tupleDescForQuery)(TupleDestination *self, int queryNumber);
+
+	/*
+	 * Used to enforce citus.max_intermediate_result_size; can be NULL
+	 * if the caller is not interested in the size.
+	 */
+	TupleDestinationStats *tupleDestinationStats;
 };
 
 extern TupleDestination * CreateTupleStoreTupleDest(Tuplestorestate *tupleStore, TupleDesc
diff --git a/src/test/regress/expected/limit_intermediate_size.out b/src/test/regress/expected/limit_intermediate_size.out
index 1bdda90be..66b2342fd 100644
--- a/src/test/regress/expected/limit_intermediate_size.out
+++ b/src/test/regress/expected/limit_intermediate_size.out
@@ -133,12 +133,10 @@ WITH cte AS (
 	SELECT * FROM cte2, cte3 WHERE cte2.value_1 IN (SELECT value_2 FROM cte3)
 )
 SELECT count(*) FROM cte;
- count
----------------------------------------------------------------------
-  1824
-(1 row)
-
--- this will fail in real_time_executor
+ERROR:  the intermediate result size exceeds citus.max_intermediate_result_size (currently 3 kB)
+DETAIL:  Citus restricts the size of intermediate results of complex subqueries and CTEs to avoid accidentally pulling large result sets into once place.
+HINT:  To run the current query, set citus.max_intermediate_result_size to a higher value or -1 to disable.
+-- this will fail in remote execution
 SET citus.max_intermediate_result_size TO 2;
 WITH cte AS (
 	WITH cte2 AS (
diff --git a/src/test/regress/expected/single_node.out b/src/test/regress/expected/single_node.out
index 75b7eec03..b2f6b01d3 100644
--- a/src/test/regress/expected/single_node.out
+++ b/src/test/regress/expected/single_node.out
@@ -93,6 +93,34 @@ WITH cte_1 AS (UPDATE test SET y = y - 1 RETURNING *) SELECT * FROM cte_1 ORDER
  5 | 6
 (5 rows)
 
+-- we should be able to limit intermediate results
+BEGIN;
+	SET LOCAL citus.max_intermediate_result_size TO 0;
+	WITH cte_1 AS (SELECT * FROM test OFFSET 0) SELECT * FROM cte_1;
+ERROR:  the intermediate result size exceeds citus.max_intermediate_result_size (currently 0 kB)
+DETAIL:  Citus restricts the size of intermediate results of complex subqueries and CTEs to avoid accidentally pulling large result sets into once place.
+HINT:  To run the current query, set citus.max_intermediate_result_size to a higher value or -1 to disable.
+ROLLBACK;
+-- the first cte (cte_1) does not exceed the limit
+-- but the second (cte_2) exceeds, so we error out
+BEGIN;
+	SET LOCAL citus.max_intermediate_result_size TO '1kB';
+	INSERT INTO test SELECT i,i from generate_series(0,1000)i;
+	-- only pulls 1 row, should not hit the limit
+	WITH cte_1 AS (SELECT * FROM test LIMIT 1) SELECT count(*) FROM cte_1;
+ count
+---------------------------------------------------------------------
+     1
+(1 row)
+
+	-- cte_1 only pulls 1 row, but cte_2 all rows
+	WITH cte_1 AS (SELECT * FROM test LIMIT 1),
+		cte_2 AS (SELECT * FROM test OFFSET 0)
+	SELECT count(*) FROM cte_1, cte_2;
+ERROR:  the intermediate result size exceeds citus.max_intermediate_result_size (currently 1 kB)
+DETAIL:  Citus restricts the size of intermediate results of complex subqueries and CTEs to avoid accidentally pulling large result sets into once place.
+HINT:  To run the current query, set citus.max_intermediate_result_size to a higher value or -1 to disable.
+ROLLBACK;
 -- single shard and multi-shard delete
 -- inside a transaction block
 BEGIN;
diff --git a/src/test/regress/sql/limit_intermediate_size.sql b/src/test/regress/sql/limit_intermediate_size.sql
index 6d4862443..09922a10b 100644
--- a/src/test/regress/sql/limit_intermediate_size.sql
+++ b/src/test/regress/sql/limit_intermediate_size.sql
@@ -116,7 +116,7 @@ WITH cte AS (
 SELECT count(*) FROM cte;
 
 
--- this will fail in real_time_executor
+-- this will fail in remote execution
 SET citus.max_intermediate_result_size TO 2;
 WITH cte AS (
 	WITH cte2 AS (
diff --git a/src/test/regress/sql/single_node.sql b/src/test/regress/sql/single_node.sql
index c6a734e8a..a0ab64b35 100644
--- a/src/test/regress/sql/single_node.sql
+++ b/src/test/regress/sql/single_node.sql
@@ -39,6 +39,27 @@ SELECT * FROM test ORDER BY x;
 UPDATE test SET y = y + 1 RETURNING *;
 WITH cte_1 AS (UPDATE test SET y = y - 1 RETURNING *) SELECT * FROM cte_1 ORDER BY 1,2;
 
+-- we should be able to limit intermediate results
+BEGIN;
+	SET LOCAL citus.max_intermediate_result_size TO 0;
+	WITH cte_1 AS (SELECT * FROM test OFFSET 0) SELECT * FROM cte_1;
+ROLLBACK;
+
+-- the first cte (cte_1) does not exceed the limit
+-- but the second (cte_2) exceeds, so we error out
+BEGIN;
+	SET LOCAL citus.max_intermediate_result_size TO '1kB';
+	INSERT INTO test SELECT i,i from generate_series(0,1000)i;
+
+	-- only pulls 1 row, should not hit the limit
+	WITH cte_1 AS (SELECT * FROM test LIMIT 1) SELECT count(*) FROM cte_1;
+
+	-- cte_1 only pulls 1 row, but cte_2 all rows
+	WITH cte_1 AS (SELECT * FROM test LIMIT 1),
+		cte_2 AS (SELECT * FROM test OFFSET 0)
+	SELECT count(*) FROM cte_1, cte_2;
+ROLLBACK;
+
 -- single shard and multi-shard delete
 -- inside a transaction block
 BEGIN;
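
For anyone who wants to exercise the new local-execution check by hand, here is a
minimal session sketch. It is not part of the patch; it assumes a Citus table named
test with a few rows (as created earlier in the single_node test), and it uses a
limit of 0 so that any materialized CTE trips the check added in
TupleStoreTupleDestPutTuple.

	SET citus.max_intermediate_result_size TO 0;
	-- OFFSET 0 keeps cte_1 from being inlined, so it is materialized as an
	-- intermediate result and the size check fires during local execution
	WITH cte_1 AS (SELECT * FROM test OFFSET 0) SELECT * FROM cte_1;
	-- expected: ERROR:  the intermediate result size exceeds
	--           citus.max_intermediate_result_size (currently 0 kB)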