Local execution considers intermediate result size limit

With this commit, we make sure that local execution adds the
intermediate result size as the distributed execution adds. Plus,
it enforces the citus.max_intermediate_result_size value.
pull/4236/head
Onder Kalaci 2020-10-13 09:36:04 +02:00
parent ded4561661
commit fe3caf3bc8
11 changed files with 173 additions and 88 deletions

View File

@ -275,9 +275,6 @@ typedef struct DistributedExecution
*/
uint64 rowsProcessed;
/* statistics on distributed execution */
DistributedExecutionStats *executionStats;
/*
* The following fields are used while receiving results from remote nodes.
* We store this information here to avoid re-allocating it every time.
@ -1085,8 +1082,6 @@ CreateDistributedExecution(RowModifyLevel modLevel, List *taskList,
execution->localTaskList = NIL;
execution->remoteTaskList = NIL;
execution->executionStats =
(DistributedExecutionStats *) palloc0(sizeof(DistributedExecutionStats));
execution->paramListInfo = paramListInfo;
execution->workerList = NIL;
execution->sessionList = NIL;
@ -3681,7 +3676,6 @@ ReceiveResults(WorkerSession *session, bool storeRows)
MultiConnection *connection = session->connection;
WorkerPool *workerPool = session->workerPool;
DistributedExecution *execution = workerPool->distributedExecution;
DistributedExecutionStats *executionStats = execution->executionStats;
TaskPlacementExecution *placementExecution = session->currentTask;
ShardCommandExecution *shardCommandExecution =
placementExecution->shardCommandExecution;
@ -3822,10 +3816,10 @@ ReceiveResults(WorkerSession *session, bool storeRows)
*/
Assert(EnableBinaryProtocol || !binaryResults);
uint64 tupleLibpqSize = 0;
for (uint32 rowIndex = 0; rowIndex < rowsProcessed; rowIndex++)
{
uint64 tupleLibpqSize = 0;
/*
* Switch to a temporary memory context that we reset after each
* tuple. This protects us from any memory leaks that might be
@ -3864,10 +3858,7 @@ ReceiveResults(WorkerSession *session, bool storeRows)
}
columnArray[columnIndex] = value;
}
if (SubPlanLevel > 0 && executionStats != NULL)
{
executionStats->totalIntermediateResultSize += valueLength;
}
tupleLibpqSize += valueLength;
}
}
@ -3898,11 +3889,6 @@ ReceiveResults(WorkerSession *session, bool storeRows)
}
PQclear(result);
if (executionStats != NULL && CheckIfSizeLimitIsExceeded(executionStats))
{
ErrorSizeLimitIsExceeded();
}
}
/* the context is local to the function, so not needed anymore */

View File

@ -88,9 +88,10 @@
#include "distributed/listutils.h"
#include "distributed/local_executor.h"
#include "distributed/local_plan_cache.h"
#include "distributed/multi_executor.h"
#include "distributed/coordinator_protocol.h"
#include "distributed/metadata_cache.h"
#include "distributed/multi_executor.h"
#include "distributed/multi_server_executor.h"
#include "distributed/relation_access_tracking.h"
#include "distributed/remote_commands.h" /* to access LogRemoteCommands */
#include "distributed/transaction_management.h"
@ -284,10 +285,9 @@ ExecuteLocalTaskListExtended(List *taskList,
if (GetTaskQueryType(task) == TASK_QUERY_TEXT_LIST)
{
List *queryStringList = task->taskQuery.data.queryStringList;
totalRowsProcessed += LocallyPlanAndExecuteMultipleQueries(
queryStringList,
tupleDest,
task);
totalRowsProcessed +=
LocallyPlanAndExecuteMultipleQueries(queryStringList, tupleDest,
task);
continue;
}

View File

@ -27,6 +27,7 @@
#include "distributed/multi_server_executor.h"
#include "distributed/coordinator_protocol.h"
#include "distributed/subplan_execution.h"
#include "distributed/tuple_destination.h"
#include "distributed/worker_protocol.h"
#include "utils/lsyscache.h"
@ -93,45 +94,3 @@ JobExecutorType(DistributedPlan *distributedPlan)
return MULTI_EXECUTOR_ADAPTIVE;
}
/*
* CheckIfSizeLimitIsExceeded checks if the limit is exceeded by intermediate
* results, if there is any.
*/
bool
CheckIfSizeLimitIsExceeded(DistributedExecutionStats *executionStats)
{
if (!SubPlanLevel || MaxIntermediateResult < 0)
{
return false;
}
uint64 maxIntermediateResultInBytes = MaxIntermediateResult * 1024L;
if (executionStats->totalIntermediateResultSize < maxIntermediateResultInBytes)
{
return false;
}
return true;
}
/*
* This function is called when the intermediate result size limitation is
* exceeded. It basically errors out with a detailed explanation.
*/
void
ErrorSizeLimitIsExceeded()
{
ereport(ERROR, (errmsg("the intermediate result size exceeds "
"citus.max_intermediate_result_size (currently %d kB)",
MaxIntermediateResult),
errdetail("Citus restricts the size of intermediate "
"results of complex subqueries and CTEs to "
"avoid accidentally pulling large result sets "
"into once place."),
errhint("To run the current query, set "
"citus.max_intermediate_result_size to a higher"
" value or -1 to disable.")));
}

View File

@ -7,8 +7,12 @@
#include <sys/stat.h>
#include <unistd.h>
#include "access/htup_details.h"
#include "distributed/multi_server_executor.h"
#include "distributed/subplan_execution.h"
#include "distributed/tuple_destination.h"
/*
* TupleStoreTupleDestination is internal representation of a TupleDestination
* which forwards tuples to a tuple store.
@ -43,6 +47,8 @@ typedef struct TupleDestDestReceiver
static void TupleStoreTupleDestPutTuple(TupleDestination *self, Task *task,
int placementIndex, int queryNumber,
HeapTuple heapTuple, uint64 tupleLibpqSize);
static void EnsureIntermediateSizeLimitNotExceeded(TupleDestinationStats *
tupleDestinationStats);
static TupleDesc TupleStoreTupleDestTupleDescForQuery(TupleDestination *self, int
queryNumber);
static void TupleDestNonePutTuple(TupleDestination *self, Task *task,
@ -66,12 +72,17 @@ CreateTupleStoreTupleDest(Tuplestorestate *tupleStore, TupleDesc tupleDescriptor
{
TupleStoreTupleDestination *tupleStoreTupleDest = palloc0(
sizeof(TupleStoreTupleDestination));
tupleStoreTupleDest->tupleStore = tupleStore;
tupleStoreTupleDest->tupleDesc = tupleDescriptor;
tupleStoreTupleDest->pub.putTuple = TupleStoreTupleDestPutTuple;
tupleStoreTupleDest->pub.tupleDescForQuery =
TupleStoreTupleDestTupleDescForQuery;
TupleDestination *tupleDestination = &tupleStoreTupleDest->pub;
tupleDestination->tupleDestinationStats =
(TupleDestinationStats *) palloc0(sizeof(TupleDestinationStats));
return (TupleDestination *) tupleStoreTupleDest;
}
@ -86,11 +97,83 @@ TupleStoreTupleDestPutTuple(TupleDestination *self, Task *task,
HeapTuple heapTuple, uint64 tupleLibpqSize)
{
TupleStoreTupleDestination *tupleDest = (TupleStoreTupleDestination *) self;
/*
* Remote execution sets tupleLibpqSize, however it is 0 for local execution. We prefer
* to use tupleLibpqSize for the remote execution because that reflects the exact data
* transfer size over the network. For local execution, we rely on the size of the
* tuple.
*/
uint64 tupleSize = tupleLibpqSize;
if (tupleSize == 0)
{
tupleSize = HeapTupleHeaderGetDatumLength(heapTuple);
}
/*
* Enfoce citus.max_intermediate_result_size for subPlans if
* the caller requested.
*/
TupleDestinationStats *tupleDestinationStats = self->tupleDestinationStats;
if (SubPlanLevel > 0 && tupleDestinationStats != NULL)
{
tupleDestinationStats->totalIntermediateResultSize += tupleSize;
EnsureIntermediateSizeLimitNotExceeded(tupleDestinationStats);
}
/* do the actual work */
tuplestore_puttuple(tupleDest->tupleStore, heapTuple);
/* we record tuples received over network */
task->totalReceivedTupleData += tupleLibpqSize;
}
/*
* EnsureIntermediateSizeLimitNotExceeded is a helper function for checking the current
* state of the tupleDestinationStats and throws error if necessary.
*/
static void
EnsureIntermediateSizeLimitNotExceeded(TupleDestinationStats *tupleDestinationStats)
{
if (!tupleDestinationStats)
{
/* unexpected, still prefer defensive approach */
return;
}
/*
* We only care about subPlans. Also, if user disabled, no need to
* check further.
*/
if (SubPlanLevel == 0 || MaxIntermediateResult < 0)
{
return;
}
uint64 maxIntermediateResultInBytes = MaxIntermediateResult * 1024L;
if (tupleDestinationStats->totalIntermediateResultSize < maxIntermediateResultInBytes)
{
/*
* We have not reached the size limit that the user requested, so
* nothing to do for now.
*/
return;
}
ereport(ERROR, (errmsg("the intermediate result size exceeds "
"citus.max_intermediate_result_size (currently %d kB)",
MaxIntermediateResult),
errdetail("Citus restricts the size of intermediate "
"results of complex subqueries and CTEs to "
"avoid accidentally pulling large result sets "
"into once place."),
errhint("To run the current query, set "
"citus.max_intermediate_result_size to a higher"
" value or -1 to disable.")));
}
/*
* TupleStoreTupleDestTupleDescForQuery implements TupleDestination->TupleDescForQuery
* for TupleStoreTupleDestination.
@ -203,7 +286,10 @@ TupleDestDestReceiverReceive(TupleTableSlot *slot,
HeapTuple heapTuple = ExecFetchSlotTuple(slot);
#endif
tupleDest->putTuple(tupleDest, task, placementIndex, queryNumber, heapTuple, 0);
uint64 tupleLibpqSize = 0;
tupleDest->putTuple(tupleDest, task, placementIndex, queryNumber, heapTuple,
tupleLibpqSize);
return true;
}

View File

@ -311,7 +311,8 @@ typedef struct Task
/*
* totalReceivedTupleData only counts the data for a single placement. So
* for RETURNING DML this is not really correct. This is used by
* EXPLAIN ANALYZE, to display the amount of received bytes.
* EXPLAIN ANALYZE, to display the amount of received bytes. The local execution
* does not increment this value, so only used for remote execution.
*/
uint64 totalReceivedTupleData;

View File

@ -15,7 +15,6 @@
#define MULTI_SERVER_EXECUTOR_H
#include "distributed/multi_physical_planner.h"
#include "distributed/worker_manager.h"
/* Adaptive executor repartioning related defines */
@ -34,18 +33,6 @@ typedef enum
} MultiExecutorType;
/*
* DistributedExecutionStats holds the execution related stats.
*
* totalIntermediateResultSize is a counter to keep the size
* of the intermediate results of complex subqueries and CTEs
* so that we can put a limit on the size.
*/
typedef struct DistributedExecutionStats
{
uint64 totalIntermediateResultSize;
} DistributedExecutionStats;
/* Config variable managed via guc.c */
extern int RemoteTaskCheckInterval;
extern int TaskExecutorType;
@ -55,7 +42,5 @@ extern int MultiTaskQueryLogLevel;
/* Function declarations common to more than one executor */
extern MultiExecutorType JobExecutorType(DistributedPlan *distributedPlan);
extern bool CheckIfSizeLimitIsExceeded(DistributedExecutionStats *executionStats);
extern void ErrorSizeLimitIsExceeded(void);
#endif /* MULTI_SERVER_EXECUTOR_H */

View File

@ -15,8 +15,23 @@
#include "tcop/dest.h"
#include "utils/tuplestore.h"
typedef struct TupleDestination TupleDestination;
/*
* TupleDestinationStats holds the size related stats.
*
* totalIntermediateResultSize is a counter to keep the size
* of the intermediate results of complex subqueries and CTEs
* so that we can put a limit on the size.
*/
typedef struct TupleDestinationStats
{
uint64 totalIntermediateResultSize;
} TupleDestinationStats;
/*
* TupleDestination provides a generic interface for where to send tuples.
*
@ -36,6 +51,12 @@ struct TupleDestination
/* tupleDescForQuery returns tuple descriptor for a query number. Can return NULL. */
TupleDesc (*tupleDescForQuery)(TupleDestination *self, int queryNumber);
/*
* Used to enforce citus.max_intermediate_result_size, could be NULL
* if the caller is not interested in the size.
*/
TupleDestinationStats *tupleDestinationStats;
};
extern TupleDestination * CreateTupleStoreTupleDest(Tuplestorestate *tupleStore, TupleDesc

View File

@ -133,12 +133,10 @@ WITH cte AS (
SELECT * FROM cte2, cte3 WHERE cte2.value_1 IN (SELECT value_2 FROM cte3)
)
SELECT count(*) FROM cte;
count
---------------------------------------------------------------------
1824
(1 row)
-- this will fail in real_time_executor
ERROR: the intermediate result size exceeds citus.max_intermediate_result_size (currently 3 kB)
DETAIL: Citus restricts the size of intermediate results of complex subqueries and CTEs to avoid accidentally pulling large result sets into once place.
HINT: To run the current query, set citus.max_intermediate_result_size to a higher value or -1 to disable.
-- this will fail in remote execution
SET citus.max_intermediate_result_size TO 2;
WITH cte AS (
WITH cte2 AS (

View File

@ -93,6 +93,34 @@ WITH cte_1 AS (UPDATE test SET y = y - 1 RETURNING *) SELECT * FROM cte_1 ORDER
5 | 6
(5 rows)
-- we should be able to limit intermediate results
BEGIN;
SET LOCAL citus.max_intermediate_result_size TO 0;
WITH cte_1 AS (SELECT * FROM test OFFSET 0) SELECT * FROM cte_1;
ERROR: the intermediate result size exceeds citus.max_intermediate_result_size (currently 0 kB)
DETAIL: Citus restricts the size of intermediate results of complex subqueries and CTEs to avoid accidentally pulling large result sets into once place.
HINT: To run the current query, set citus.max_intermediate_result_size to a higher value or -1 to disable.
ROLLBACK;
-- the first cte (cte_1) does not exceed the limit
-- but the second (cte_2) exceeds, so we error out
BEGIN;
SET LOCAL citus.max_intermediate_result_size TO '1kB';
INSERT INTO test SELECT i,i from generate_series(0,1000)i;
-- only pulls 1 row, should not hit the limit
WITH cte_1 AS (SELECT * FROM test LIMIT 1) SELECT count(*) FROM cte_1;
count
---------------------------------------------------------------------
1
(1 row)
-- cte_1 only pulls 1 row, but cte_2 all rows
WITH cte_1 AS (SELECT * FROM test LIMIT 1),
cte_2 AS (SELECT * FROM test OFFSET 0)
SELECT count(*) FROM cte_1, cte_2;
ERROR: the intermediate result size exceeds citus.max_intermediate_result_size (currently 1 kB)
DETAIL: Citus restricts the size of intermediate results of complex subqueries and CTEs to avoid accidentally pulling large result sets into once place.
HINT: To run the current query, set citus.max_intermediate_result_size to a higher value or -1 to disable.
ROLLBACK;
-- single shard and multi-shard delete
-- inside a transaction block
BEGIN;

View File

@ -116,7 +116,7 @@ WITH cte AS (
SELECT count(*) FROM cte;
-- this will fail in real_time_executor
-- this will fail in remote execution
SET citus.max_intermediate_result_size TO 2;
WITH cte AS (
WITH cte2 AS (

View File

@ -39,6 +39,27 @@ SELECT * FROM test ORDER BY x;
UPDATE test SET y = y + 1 RETURNING *;
WITH cte_1 AS (UPDATE test SET y = y - 1 RETURNING *) SELECT * FROM cte_1 ORDER BY 1,2;
-- we should be able to limit intermediate results
BEGIN;
SET LOCAL citus.max_intermediate_result_size TO 0;
WITH cte_1 AS (SELECT * FROM test OFFSET 0) SELECT * FROM cte_1;
ROLLBACK;
-- the first cte (cte_1) does not exceed the limit
-- but the second (cte_2) exceeds, so we error out
BEGIN;
SET LOCAL citus.max_intermediate_result_size TO '1kB';
INSERT INTO test SELECT i,i from generate_series(0,1000)i;
-- only pulls 1 row, should not hit the limit
WITH cte_1 AS (SELECT * FROM test LIMIT 1) SELECT count(*) FROM cte_1;
-- cte_1 only pulls 1 row, but cte_2 all rows
WITH cte_1 AS (SELECT * FROM test LIMIT 1),
cte_2 AS (SELECT * FROM test OFFSET 0)
SELECT count(*) FROM cte_1, cte_2;
ROLLBACK;
-- single shard and multi-shard delete
-- inside a transaction block
BEGIN;