Merge pull request #4236 from citusdata/fix_intermediate_size

Local execution enforces citus.max_intermediate_result_size
pull/4245/head
Önder Kalacı 2020-10-15 17:26:51 +02:00 committed by GitHub
commit 3e5a92d33b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 173 additions and 88 deletions

View File

@ -275,9 +275,6 @@ typedef struct DistributedExecution
*/
uint64 rowsProcessed;
/* statistics on distributed execution */
DistributedExecutionStats *executionStats;
/*
* The following fields are used while receiving results from remote nodes.
* We store this information here to avoid re-allocating it every time.
@ -1085,8 +1082,6 @@ CreateDistributedExecution(RowModifyLevel modLevel, List *taskList,
execution->localTaskList = NIL;
execution->remoteTaskList = NIL;
execution->executionStats =
(DistributedExecutionStats *) palloc0(sizeof(DistributedExecutionStats));
execution->paramListInfo = paramListInfo;
execution->workerList = NIL;
execution->sessionList = NIL;
@ -3681,7 +3676,6 @@ ReceiveResults(WorkerSession *session, bool storeRows)
MultiConnection *connection = session->connection;
WorkerPool *workerPool = session->workerPool;
DistributedExecution *execution = workerPool->distributedExecution;
DistributedExecutionStats *executionStats = execution->executionStats;
TaskPlacementExecution *placementExecution = session->currentTask;
ShardCommandExecution *shardCommandExecution =
placementExecution->shardCommandExecution;
@ -3822,10 +3816,10 @@ ReceiveResults(WorkerSession *session, bool storeRows)
*/
Assert(EnableBinaryProtocol || !binaryResults);
uint64 tupleLibpqSize = 0;
for (uint32 rowIndex = 0; rowIndex < rowsProcessed; rowIndex++)
{
uint64 tupleLibpqSize = 0;
/*
* Switch to a temporary memory context that we reset after each
* tuple. This protects us from any memory leaks that might be
@ -3864,10 +3858,7 @@ ReceiveResults(WorkerSession *session, bool storeRows)
}
columnArray[columnIndex] = value;
}
if (SubPlanLevel > 0 && executionStats != NULL)
{
executionStats->totalIntermediateResultSize += valueLength;
}
tupleLibpqSize += valueLength;
}
}
@ -3898,11 +3889,6 @@ ReceiveResults(WorkerSession *session, bool storeRows)
}
PQclear(result);
if (executionStats != NULL && CheckIfSizeLimitIsExceeded(executionStats))
{
ErrorSizeLimitIsExceeded();
}
}
/* the context is local to the function, so not needed anymore */

View File

@ -88,9 +88,10 @@
#include "distributed/listutils.h"
#include "distributed/local_executor.h"
#include "distributed/local_plan_cache.h"
#include "distributed/multi_executor.h"
#include "distributed/coordinator_protocol.h"
#include "distributed/metadata_cache.h"
#include "distributed/multi_executor.h"
#include "distributed/multi_server_executor.h"
#include "distributed/relation_access_tracking.h"
#include "distributed/remote_commands.h" /* to access LogRemoteCommands */
#include "distributed/transaction_management.h"
@ -284,10 +285,9 @@ ExecuteLocalTaskListExtended(List *taskList,
if (GetTaskQueryType(task) == TASK_QUERY_TEXT_LIST)
{
List *queryStringList = task->taskQuery.data.queryStringList;
totalRowsProcessed += LocallyPlanAndExecuteMultipleQueries(
queryStringList,
tupleDest,
task);
totalRowsProcessed +=
LocallyPlanAndExecuteMultipleQueries(queryStringList, tupleDest,
task);
continue;
}

View File

@ -27,6 +27,7 @@
#include "distributed/multi_server_executor.h"
#include "distributed/coordinator_protocol.h"
#include "distributed/subplan_execution.h"
#include "distributed/tuple_destination.h"
#include "distributed/worker_protocol.h"
#include "utils/lsyscache.h"
@ -93,45 +94,3 @@ JobExecutorType(DistributedPlan *distributedPlan)
return MULTI_EXECUTOR_ADAPTIVE;
}
/*
* CheckIfSizeLimitIsExceeded checks if the limit is exceeded by intermediate
* results, if there is any.
*/
bool
CheckIfSizeLimitIsExceeded(DistributedExecutionStats *executionStats)
{
if (!SubPlanLevel || MaxIntermediateResult < 0)
{
return false;
}
uint64 maxIntermediateResultInBytes = MaxIntermediateResult * 1024L;
if (executionStats->totalIntermediateResultSize < maxIntermediateResultInBytes)
{
return false;
}
return true;
}
/*
* This function is called when the intermediate result size limitation is
* exceeded. It basically errors out with a detailed explanation.
*/
void
ErrorSizeLimitIsExceeded()
{
ereport(ERROR, (errmsg("the intermediate result size exceeds "
"citus.max_intermediate_result_size (currently %d kB)",
MaxIntermediateResult),
errdetail("Citus restricts the size of intermediate "
"results of complex subqueries and CTEs to "
"avoid accidentally pulling large result sets "
"into once place."),
errhint("To run the current query, set "
"citus.max_intermediate_result_size to a higher"
" value or -1 to disable.")));
}

View File

@ -7,8 +7,12 @@
#include <sys/stat.h>
#include <unistd.h>
#include "access/htup_details.h"
#include "distributed/multi_server_executor.h"
#include "distributed/subplan_execution.h"
#include "distributed/tuple_destination.h"
/*
* TupleStoreTupleDestination is internal representation of a TupleDestination
* which forwards tuples to a tuple store.
@ -43,6 +47,8 @@ typedef struct TupleDestDestReceiver
static void TupleStoreTupleDestPutTuple(TupleDestination *self, Task *task,
int placementIndex, int queryNumber,
HeapTuple heapTuple, uint64 tupleLibpqSize);
static void EnsureIntermediateSizeLimitNotExceeded(TupleDestinationStats *
tupleDestinationStats);
static TupleDesc TupleStoreTupleDestTupleDescForQuery(TupleDestination *self, int
queryNumber);
static void TupleDestNonePutTuple(TupleDestination *self, Task *task,
@ -66,12 +72,17 @@ CreateTupleStoreTupleDest(Tuplestorestate *tupleStore, TupleDesc tupleDescriptor
{
TupleStoreTupleDestination *tupleStoreTupleDest = palloc0(
sizeof(TupleStoreTupleDestination));
tupleStoreTupleDest->tupleStore = tupleStore;
tupleStoreTupleDest->tupleDesc = tupleDescriptor;
tupleStoreTupleDest->pub.putTuple = TupleStoreTupleDestPutTuple;
tupleStoreTupleDest->pub.tupleDescForQuery =
TupleStoreTupleDestTupleDescForQuery;
TupleDestination *tupleDestination = &tupleStoreTupleDest->pub;
tupleDestination->tupleDestinationStats =
(TupleDestinationStats *) palloc0(sizeof(TupleDestinationStats));
return (TupleDestination *) tupleStoreTupleDest;
}
@ -86,11 +97,83 @@ TupleStoreTupleDestPutTuple(TupleDestination *self, Task *task,
HeapTuple heapTuple, uint64 tupleLibpqSize)
{
TupleStoreTupleDestination *tupleDest = (TupleStoreTupleDestination *) self;
/*
* Remote execution sets tupleLibpqSize, however it is 0 for local execution. We prefer
* to use tupleLibpqSize for the remote execution because that reflects the exact data
* transfer size over the network. For local execution, we rely on the size of the
* tuple.
*/
uint64 tupleSize = tupleLibpqSize;
if (tupleSize == 0)
{
tupleSize = HeapTupleHeaderGetDatumLength(heapTuple);
}
/*
* Enfoce citus.max_intermediate_result_size for subPlans if
* the caller requested.
*/
TupleDestinationStats *tupleDestinationStats = self->tupleDestinationStats;
if (SubPlanLevel > 0 && tupleDestinationStats != NULL)
{
tupleDestinationStats->totalIntermediateResultSize += tupleSize;
EnsureIntermediateSizeLimitNotExceeded(tupleDestinationStats);
}
/* do the actual work */
tuplestore_puttuple(tupleDest->tupleStore, heapTuple);
/* we record tuples received over network */
task->totalReceivedTupleData += tupleLibpqSize;
}
/*
* EnsureIntermediateSizeLimitNotExceeded is a helper function for checking the current
* state of the tupleDestinationStats and throws error if necessary.
*/
static void
EnsureIntermediateSizeLimitNotExceeded(TupleDestinationStats *tupleDestinationStats)
{
if (!tupleDestinationStats)
{
/* unexpected, still prefer defensive approach */
return;
}
/*
* We only care about subPlans. Also, if user disabled, no need to
* check further.
*/
if (SubPlanLevel == 0 || MaxIntermediateResult < 0)
{
return;
}
uint64 maxIntermediateResultInBytes = MaxIntermediateResult * 1024L;
if (tupleDestinationStats->totalIntermediateResultSize < maxIntermediateResultInBytes)
{
/*
* We have not reached the size limit that the user requested, so
* nothing to do for now.
*/
return;
}
ereport(ERROR, (errmsg("the intermediate result size exceeds "
"citus.max_intermediate_result_size (currently %d kB)",
MaxIntermediateResult),
errdetail("Citus restricts the size of intermediate "
"results of complex subqueries and CTEs to "
"avoid accidentally pulling large result sets "
"into once place."),
errhint("To run the current query, set "
"citus.max_intermediate_result_size to a higher"
" value or -1 to disable.")));
}
/*
* TupleStoreTupleDestTupleDescForQuery implements TupleDestination->TupleDescForQuery
* for TupleStoreTupleDestination.
@ -203,7 +286,10 @@ TupleDestDestReceiverReceive(TupleTableSlot *slot,
HeapTuple heapTuple = ExecFetchSlotTuple(slot);
#endif
tupleDest->putTuple(tupleDest, task, placementIndex, queryNumber, heapTuple, 0);
uint64 tupleLibpqSize = 0;
tupleDest->putTuple(tupleDest, task, placementIndex, queryNumber, heapTuple,
tupleLibpqSize);
return true;
}

View File

@ -311,7 +311,8 @@ typedef struct Task
/*
* totalReceivedTupleData only counts the data for a single placement. So
* for RETURNING DML this is not really correct. This is used by
* EXPLAIN ANALYZE, to display the amount of received bytes.
* EXPLAIN ANALYZE, to display the amount of received bytes. The local execution
* does not increment this value, so only used for remote execution.
*/
uint64 totalReceivedTupleData;

View File

@ -15,7 +15,6 @@
#define MULTI_SERVER_EXECUTOR_H
#include "distributed/multi_physical_planner.h"
#include "distributed/worker_manager.h"
/* Adaptive executor repartioning related defines */
@ -34,18 +33,6 @@ typedef enum
} MultiExecutorType;
/*
* DistributedExecutionStats holds the execution related stats.
*
* totalIntermediateResultSize is a counter to keep the size
* of the intermediate results of complex subqueries and CTEs
* so that we can put a limit on the size.
*/
typedef struct DistributedExecutionStats
{
uint64 totalIntermediateResultSize;
} DistributedExecutionStats;
/* Config variable managed via guc.c */
extern int RemoteTaskCheckInterval;
extern int TaskExecutorType;
@ -55,7 +42,5 @@ extern int MultiTaskQueryLogLevel;
/* Function declarations common to more than one executor */
extern MultiExecutorType JobExecutorType(DistributedPlan *distributedPlan);
extern bool CheckIfSizeLimitIsExceeded(DistributedExecutionStats *executionStats);
extern void ErrorSizeLimitIsExceeded(void);
#endif /* MULTI_SERVER_EXECUTOR_H */

View File

@ -15,8 +15,23 @@
#include "tcop/dest.h"
#include "utils/tuplestore.h"
typedef struct TupleDestination TupleDestination;
/*
* TupleDestinationStats holds the size related stats.
*
* totalIntermediateResultSize is a counter to keep the size
* of the intermediate results of complex subqueries and CTEs
* so that we can put a limit on the size.
*/
typedef struct TupleDestinationStats
{
uint64 totalIntermediateResultSize;
} TupleDestinationStats;
/*
* TupleDestination provides a generic interface for where to send tuples.
*
@ -36,6 +51,12 @@ struct TupleDestination
/* tupleDescForQuery returns tuple descriptor for a query number. Can return NULL. */
TupleDesc (*tupleDescForQuery)(TupleDestination *self, int queryNumber);
/*
* Used to enforce citus.max_intermediate_result_size, could be NULL
* if the caller is not interested in the size.
*/
TupleDestinationStats *tupleDestinationStats;
};
extern TupleDestination * CreateTupleStoreTupleDest(Tuplestorestate *tupleStore, TupleDesc

View File

@ -133,12 +133,10 @@ WITH cte AS (
SELECT * FROM cte2, cte3 WHERE cte2.value_1 IN (SELECT value_2 FROM cte3)
)
SELECT count(*) FROM cte;
count
---------------------------------------------------------------------
1824
(1 row)
-- this will fail in real_time_executor
ERROR: the intermediate result size exceeds citus.max_intermediate_result_size (currently 3 kB)
DETAIL: Citus restricts the size of intermediate results of complex subqueries and CTEs to avoid accidentally pulling large result sets into once place.
HINT: To run the current query, set citus.max_intermediate_result_size to a higher value or -1 to disable.
-- this will fail in remote execution
SET citus.max_intermediate_result_size TO 2;
WITH cte AS (
WITH cte2 AS (

View File

@ -93,6 +93,34 @@ WITH cte_1 AS (UPDATE test SET y = y - 1 RETURNING *) SELECT * FROM cte_1 ORDER
5 | 6
(5 rows)
-- we should be able to limit intermediate results
BEGIN;
SET LOCAL citus.max_intermediate_result_size TO 0;
WITH cte_1 AS (SELECT * FROM test OFFSET 0) SELECT * FROM cte_1;
ERROR: the intermediate result size exceeds citus.max_intermediate_result_size (currently 0 kB)
DETAIL: Citus restricts the size of intermediate results of complex subqueries and CTEs to avoid accidentally pulling large result sets into once place.
HINT: To run the current query, set citus.max_intermediate_result_size to a higher value or -1 to disable.
ROLLBACK;
-- the first cte (cte_1) does not exceed the limit
-- but the second (cte_2) exceeds, so we error out
BEGIN;
SET LOCAL citus.max_intermediate_result_size TO '1kB';
INSERT INTO test SELECT i,i from generate_series(0,1000)i;
-- only pulls 1 row, should not hit the limit
WITH cte_1 AS (SELECT * FROM test LIMIT 1) SELECT count(*) FROM cte_1;
count
---------------------------------------------------------------------
1
(1 row)
-- cte_1 only pulls 1 row, but cte_2 all rows
WITH cte_1 AS (SELECT * FROM test LIMIT 1),
cte_2 AS (SELECT * FROM test OFFSET 0)
SELECT count(*) FROM cte_1, cte_2;
ERROR: the intermediate result size exceeds citus.max_intermediate_result_size (currently 1 kB)
DETAIL: Citus restricts the size of intermediate results of complex subqueries and CTEs to avoid accidentally pulling large result sets into once place.
HINT: To run the current query, set citus.max_intermediate_result_size to a higher value or -1 to disable.
ROLLBACK;
-- single shard and multi-shard delete
-- inside a transaction block
BEGIN;

View File

@ -116,7 +116,7 @@ WITH cte AS (
SELECT count(*) FROM cte;
-- this will fail in real_time_executor
-- this will fail in remote execution
SET citus.max_intermediate_result_size TO 2;
WITH cte AS (
WITH cte2 AS (

View File

@ -39,6 +39,27 @@ SELECT * FROM test ORDER BY x;
UPDATE test SET y = y + 1 RETURNING *;
WITH cte_1 AS (UPDATE test SET y = y - 1 RETURNING *) SELECT * FROM cte_1 ORDER BY 1,2;
-- we should be able to limit intermediate results
BEGIN;
SET LOCAL citus.max_intermediate_result_size TO 0;
WITH cte_1 AS (SELECT * FROM test OFFSET 0) SELECT * FROM cte_1;
ROLLBACK;
-- the first cte (cte_1) does not exceed the limit
-- but the second (cte_2) exceeds, so we error out
BEGIN;
SET LOCAL citus.max_intermediate_result_size TO '1kB';
INSERT INTO test SELECT i,i from generate_series(0,1000)i;
-- only pulls 1 row, should not hit the limit
WITH cte_1 AS (SELECT * FROM test LIMIT 1) SELECT count(*) FROM cte_1;
-- cte_1 only pulls 1 row, but cte_2 all rows
WITH cte_1 AS (SELECT * FROM test LIMIT 1),
cte_2 AS (SELECT * FROM test OFFSET 0)
SELECT count(*) FROM cte_1, cte_2;
ROLLBACK;
-- single shard and multi-shard delete
-- inside a transaction block
BEGIN;