Intermediate result size limitation

This commit introduces a new GUC, citus.max_intermediate_result_size,
to limit the size of the intermediate results we handle through the
read_intermediate_result function for CTEs and complex subqueries.
pull/1883/head
mehmet furkan şahin 2017-12-15 17:20:01 +03:00
parent 54ccfb24be
commit fd546cf322
16 changed files with 720 additions and 23 deletions
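
As a quick illustration of the new setting, here is a minimal usage sketch based on the regression tests added in this commit (users_table comes from those tests; the unit is kB, and -1 disables the check):

  -- cap intermediate results pulled to the coordinator at 3 kB
  SET citus.max_intermediate_result_size TO 3;
  WITH cte AS (SELECT * FROM users_table)
  SELECT count(*) FROM cte;  -- errors once the materialized CTE exceeds 3 kB
  -- raise the limit, or disable the check entirely
  SET citus.max_intermediate_result_size TO -1;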

View File

@ -24,6 +24,7 @@
#include "distributed/multi_server_executor.h"
#include "distributed/placement_connection.h"
#include "distributed/remote_commands.h"
#include "distributed/subplan_execution.h"
#include <errno.h>
#include <unistd.h>
@ -703,7 +704,7 @@ MultiClientQueryStatus(int32 connectionId)
/* MultiClientCopyData copies data from the file. */
CopyStatus
MultiClientCopyData(int32 connectionId, int32 fileDescriptor)
MultiClientCopyData(int32 connectionId, int32 fileDescriptor, uint64 *returnBytesReceived)
{
MultiConnection *connection = NULL;
char *receiveBuffer = NULL;
@ -735,6 +736,11 @@ MultiClientCopyData(int32 connectionId, int32 fileDescriptor)
int appended = -1;
errno = 0;
if (returnBytesReceived)
{
*returnBytesReceived += receiveLength;
}
appended = write(fileDescriptor, receiveBuffer, receiveLength);
if (appended != receiveLength)
{

View File

@ -19,7 +19,6 @@
#include "postgres.h"
#include "miscadmin.h"
#include <sys/stat.h>
#include <unistd.h>
#include "access/xact.h"
@ -42,7 +41,8 @@
/* Local functions forward declarations */
static ConnectAction ManageTaskExecution(Task *task, TaskExecution *taskExecution,
TaskExecutionStatus *executionStatus);
TaskExecutionStatus *executionStatus,
DistributedExecutionStats *executionStats);
static bool TaskExecutionReadyToStart(TaskExecution *taskExecution);
static bool TaskExecutionCompleted(TaskExecution *taskExecution);
static void CancelTaskExecutionIfActive(TaskExecution *taskExecution);
@ -83,6 +83,8 @@ MultiRealTimeExecute(Job *job)
bool allTasksCompleted = false;
bool taskCompleted = false;
bool taskFailed = false;
bool sizeLimitIsExceeded = false;
DistributedExecutionStats executionStats = { 0 };
List *workerNodeList = NIL;
HTAB *workerHash = NULL;
@ -107,7 +109,8 @@ MultiRealTimeExecute(Job *job)
}
/* loop around until all tasks complete, one task fails, or user cancels */
while (!(allTasksCompleted || taskFailed || QueryCancelPending))
while (!(allTasksCompleted || taskFailed || QueryCancelPending ||
sizeLimitIsExceeded))
{
uint32 taskCount = list_length(taskList);
uint32 completedTaskCount = 0;
@ -137,7 +140,8 @@ MultiRealTimeExecute(Job *job)
}
/* call the function that performs the core task execution logic */
connectAction = ManageTaskExecution(task, taskExecution, &executionStatus);
connectAction = ManageTaskExecution(task, taskExecution, &executionStatus,
&executionStats);
/* update the connection counter for throttling */
UpdateConnectionCounter(workerNodeState, connectAction);
@ -173,6 +177,13 @@ MultiRealTimeExecute(Job *job)
}
}
/* check whether the task's intermediate results pushed us over the size limit */
if (CheckIfSizeLimitIsExceeded(&executionStats))
{
sizeLimitIsExceeded = true;
break;
}
/*
* Check if all tasks completed; otherwise wait as appropriate to
* avoid a tight loop. That means we immediately continue if tasks are
@ -235,7 +246,11 @@ MultiRealTimeExecute(Job *job)
* user cancellation request, we can now safely emit an error message (all
* client-side resources have been cleared).
*/
if (taskFailed)
if (sizeLimitIsExceeded)
{
ErrorSizeLimitIsExceeded();
}
else if (taskFailed)
{
ereport(ERROR, (errmsg("failed to execute task %u", failedTaskId)));
}
@ -258,7 +273,8 @@ MultiRealTimeExecute(Job *job)
*/
static ConnectAction
ManageTaskExecution(Task *task, TaskExecution *taskExecution,
TaskExecutionStatus *executionStatus)
TaskExecutionStatus *executionStatus,
DistributedExecutionStats *executionStats)
{
TaskExecStatus *taskStatusArray = taskExecution->taskStatusArray;
int32 *connectionIdArray = taskExecution->connectionIdArray;
@ -660,9 +676,16 @@ ManageTaskExecution(Task *task, TaskExecution *taskExecution,
int32 connectionId = connectionIdArray[currentIndex];
int32 fileDesc = fileDescriptorArray[currentIndex];
int closed = -1;
uint64 bytesReceived = 0;
/* copy data from worker node, and write to local file */
CopyStatus copyStatus = MultiClientCopyData(connectionId, fileDesc);
CopyStatus copyStatus = MultiClientCopyData(connectionId, fileDesc,
&bytesReceived);
if (SubPlanLevel > 0)
{
executionStats->totalIntermediateResultSize += bytesReceived;
}
/* if worker node will continue to send more data, keep reading */
if (copyStatus == CLIENT_COPY_MORE)

View File

@ -97,7 +97,8 @@ static void ExtractParametersFromParamListInfo(ParamListInfo paramListInfo,
static bool SendQueryInSingleRowMode(MultiConnection *connection, char *query,
ParamListInfo paramListInfo);
static bool StoreQueryResult(CitusScanState *scanState, MultiConnection *connection,
bool failOnError, int64 *rows);
bool failOnError, int64 *rows,
DistributedExecutionStats *executionStats);
static bool ConsumeQueryResult(MultiConnection *connection, bool failOnError,
int64 *rows);
@ -577,6 +578,7 @@ ExecuteSingleSelectTask(CitusScanState *scanState, Task *task)
ListCell *taskPlacementCell = NULL;
char *queryString = task->queryString;
List *relationShardList = task->relationShardList;
DistributedExecutionStats executionStats = { 0 };
/*
* Try to run the query to completion on one placement. If the query fails
@ -638,7 +640,14 @@ ExecuteSingleSelectTask(CitusScanState *scanState, Task *task)
}
queryOK = StoreQueryResult(scanState, connection, dontFailOnError,
&currentAffectedTupleCount);
&currentAffectedTupleCount,
&executionStats);
if (CheckIfSizeLimitIsExceeded(&executionStats))
{
ErrorSizeLimitIsExceeded();
}
if (queryOK)
{
return;
@ -821,7 +830,7 @@ ExecuteSingleModifyTask(CitusScanState *scanState, Task *task, bool multipleTask
if (!gotResults && expectResults)
{
queryOK = StoreQueryResult(scanState, connection, failOnError,
&currentAffectedTupleCount);
&currentAffectedTupleCount, NULL);
}
else
{
@ -1156,7 +1165,7 @@ ExecuteModifyTasks(List *taskList, bool expectResults, ParamListInfo paramListIn
Assert(scanState != NULL);
queryOK = StoreQueryResult(scanState, connection, failOnError,
&currentAffectedTupleCount);
&currentAffectedTupleCount, NULL);
}
else
{
@ -1342,7 +1351,8 @@ ExtractParametersFromParamListInfo(ParamListInfo paramListInfo, Oid **parameterT
*/
static bool
StoreQueryResult(CitusScanState *scanState, MultiConnection *connection,
bool failOnError, int64 *rows)
bool failOnError, int64 *rows,
DistributedExecutionStats *executionStats)
{
TupleDesc tupleDescriptor =
scanState->customScanState.ss.ps.ps_ResultTupleSlot->tts_tupleDescriptor;
@ -1442,6 +1452,12 @@ StoreQueryResult(CitusScanState *scanState, MultiConnection *connection,
else
{
columnArray[columnIndex] = PQgetvalue(result, rowIndex, columnIndex);
if (SubPlanLevel > 0)
{
executionStats->totalIntermediateResultSize += PQgetlength(result,
rowIndex,
columnIndex);
}
}
}

View File

@ -23,9 +23,9 @@
#include "distributed/multi_physical_planner.h"
#include "distributed/multi_resowner.h"
#include "distributed/multi_server_executor.h"
#include "distributed/subplan_execution.h"
#include "distributed/worker_protocol.h"
int RemoteTaskCheckInterval = 100; /* per cycle sleep interval in millisecs */
int TaskExecutorType = MULTI_EXECUTOR_REAL_TIME; /* distributed executor type */
bool BinaryMasterCopyFormat = false; /* copy data from workers in binary format */
@ -281,3 +281,47 @@ AdjustStateForFailure(TaskExecution *taskExecution)
taskExecution->dataFetchTaskIndex = -1; /* reset data fetch counter */
taskExecution->failureCount++; /* record failure */
}
/*
* CheckIfSizeLimitIsExceeded checks whether the intermediate results accumulated
* so far exceed citus.max_intermediate_result_size, if a limit is in effect.
*/
bool
CheckIfSizeLimitIsExceeded(DistributedExecutionStats *executionStats)
{
uint64 maxIntermediateResultInBytes = 0;
if (!SubPlanLevel || MaxIntermediateResult < 0)
{
return false;
}
maxIntermediateResultInBytes = MaxIntermediateResult * 1024L;
if (executionStats->totalIntermediateResultSize < maxIntermediateResultInBytes)
{
return false;
}
return true;
}
/*
* ErrorSizeLimitIsExceeded errors out with a detailed explanation and hint
* when the intermediate result size limit is exceeded.
*/
void
ErrorSizeLimitIsExceeded(void)
{
ereport(ERROR, (errmsg("the intermediate result size exceeds "
"citus.max_intermediate_result_size (currently %d kB)",
MaxIntermediateResult),
errdetail("Citus restricts the size of intermediate "
"results of complex subqueries and CTEs to "
"avoid accidentally pulling large result sets "
"into one place."),
errhint("To run the current query, set "
"citus.max_intermediate_result_size to a higher"
" value or -1 to disable.")));
}

View File

@ -35,6 +35,7 @@
#include "distributed/multi_server_executor.h"
#include "distributed/pg_dist_partition.h"
#include "distributed/resource_lock.h"
#include "distributed/subplan_execution.h"
#include "distributed/worker_protocol.h"
#include "distributed/version_compat.h"
#include "storage/fd.h"
@ -96,7 +97,9 @@ static TaskExecStatus ManageTaskExecution(TaskTracker *taskTracker,
Task *task, TaskExecution *taskExecution);
static TransmitExecStatus ManageTransmitExecution(TaskTracker *transmitTracker,
Task *task,
TaskExecution *taskExecution);
TaskExecution *taskExecution,
DistributedExecutionStats *
executionStats);
static bool TaskExecutionsCompleted(List *taskList);
static StringInfo MapFetchTaskQueryString(Task *mapFetchTask, Task *mapTask);
static void TrackerQueueSqlTask(TaskTracker *taskTracker, Task *task);
@ -161,7 +164,9 @@ MultiTaskTrackerExecute(Job *job)
bool taskFailed = false;
bool taskTransmitFailed = false;
bool clusterFailed = false;
bool sizeLimitIsExceeded = false;
DistributedExecutionStats executionStats = { 0 };
List *workerNodeList = NIL;
HTAB *taskTrackerHash = NULL;
HTAB *transmitTrackerHash = NULL;
@ -219,7 +224,7 @@ MultiTaskTrackerExecute(Job *job)
/* loop around until all tasks complete, one task fails, or user cancels */
while (!(allTasksCompleted || taskFailed || taskTransmitFailed ||
clusterFailed || QueryCancelPending))
clusterFailed || QueryCancelPending || sizeLimitIsExceeded))
{
TaskTracker *taskTracker = NULL;
TaskTracker *transmitTracker = NULL;
@ -328,7 +333,8 @@ MultiTaskTrackerExecute(Job *job)
/* call the function that fetches results for completed SQL tasks */
transmitExecutionStatus = ManageTransmitExecution(execTransmitTracker,
task, taskExecution);
task, taskExecution,
&executionStats);
/*
* If we cannot transmit SQL task's results to the master, we first
@ -363,6 +369,13 @@ MultiTaskTrackerExecute(Job *job)
}
}
if (CheckIfSizeLimitIsExceeded(&executionStats))
{
sizeLimitIsExceeded = true;
break;
}
/* third, loop around task trackers and manage them */
hash_seq_init(&taskStatus, taskTrackerHash);
hash_seq_init(&transmitStatus, transmitTrackerHash);
@ -428,7 +441,11 @@ MultiTaskTrackerExecute(Job *job)
* If we previously broke out of the execution loop due to a task failure or
* user cancellation request, we can now safely emit an error message.
*/
if (taskFailed)
if (sizeLimitIsExceeded)
{
ErrorSizeLimitIsExceeded();
}
else if (taskFailed)
{
ereport(ERROR, (errmsg("failed to execute task %u", failedTaskId)));
}
@ -1273,7 +1290,8 @@ ManageTaskExecution(TaskTracker *taskTracker, TaskTracker *sourceTaskTracker,
*/
static TransmitExecStatus
ManageTransmitExecution(TaskTracker *transmitTracker,
Task *task, TaskExecution *taskExecution)
Task *task, TaskExecution *taskExecution,
DistributedExecutionStats *executionStats)
{
int32 *fileDescriptorArray = taskExecution->fileDescriptorArray;
uint32 currentNodeIndex = taskExecution->currentNodeIndex;
@ -1397,12 +1415,20 @@ ManageTransmitExecution(TaskTracker *transmitTracker,
int32 fileDescriptor = fileDescriptorArray[currentNodeIndex];
CopyStatus copyStatus = CLIENT_INVALID_COPY;
int closed = -1;
uint64 bytesReceived = 0;
/* the open connection belongs to this task */
int32 connectionId = TransmitTrackerConnectionId(transmitTracker, task);
Assert(connectionId != INVALID_CONNECTION_ID);
copyStatus = MultiClientCopyData(connectionId, fileDescriptor);
copyStatus = MultiClientCopyData(connectionId, fileDescriptor,
&bytesReceived);
if (SubPlanLevel > 0)
{
executionStats->totalIntermediateResultSize += bytesReceived;
}
if (copyStatus == CLIENT_COPY_MORE)
{
/* worker node continues to send more data, keep reading */

View File

@ -19,6 +19,11 @@
#include "executor/executor.h"
int MaxIntermediateResult = 1048576; /* maximum size in KB the intermediate result can grow to */
/* when greater than zero, a subplan is being executed and the intermediate result size limit is enforced in all executors */
int SubPlanLevel = 0;
/*
* ExecuteSubPlans executes a list of subplans from a distributed plan
* by sequentially executing each plan from the top.
@ -43,6 +48,7 @@ ExecuteSubPlans(DistributedPlan *distributedPlan)
char *resultId = GenerateResultId(planId, subPlanId);
SubPlanLevel++;
estate = CreateExecutorState();
copyDest = (DestReceiver *) CreateRemoteFileDestReceiver(resultId, estate,
nodeList,
@ -50,6 +56,7 @@ ExecuteSubPlans(DistributedPlan *distributedPlan)
ExecutePlanIntoDestReceiver(plannedStmt, params, copyDest);
SubPlanLevel--;
FreeExecutorState(estate);
}
}

View File

@ -41,6 +41,7 @@
#include "distributed/remote_commands.h"
#include "distributed/shared_library_init.h"
#include "distributed/statistics_collection.h"
#include "distributed/subplan_execution.h"
#include "distributed/task_tracker.h"
#include "distributed/transaction_management.h"
#include "distributed/transaction_recovery.h"
@ -547,6 +548,17 @@ RegisterCitusConfigVariables(void)
GUC_UNIT_KB,
NULL, NULL, NULL);
DefineCustomIntVariable(
"citus.max_intermediate_result_size",
gettext_noop("Sets the maximum size of the intermediate results in KB for "
"CTEs and complex subqueries."),
NULL,
&MaxIntermediateResult,
1048576, -1, MAX_KILOBYTES,
PGC_USERSET,
GUC_UNIT_KB,
NULL, NULL, NULL);
DefineCustomIntVariable(
"citus.max_worker_nodes_tracked",
gettext_noop("Sets the maximum number of worker nodes that are tracked."),
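
Since citus.max_intermediate_result_size is registered with GUC_UNIT_KB, the usual PostgreSQL memory units are accepted when setting it, and the default of 1048576 kB corresponds to 1 GB. A small illustrative sketch:

  SET citus.max_intermediate_result_size TO '10MB';  -- stored internally as 10240 kB
  SHOW citus.max_intermediate_result_size;
  SET citus.max_intermediate_result_size TO -1;      -- -1 disables the limit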

View File

@ -26,6 +26,7 @@
#include "distributed/multi_shard_transaction.h"
#include "distributed/transaction_management.h"
#include "distributed/placement_connection.h"
#include "distributed/subplan_execution.h"
#include "utils/hsearch.h"
#include "utils/guc.h"
#include "utils/memutils.h"
@ -213,6 +214,16 @@ CoordinatedTransactionCallback(XactEvent event, void *arg)
XactModificationLevel = XACT_MODIFICATION_NONE;
dlist_init(&InProgressTransactions);
CoordinatedTransactionUses2PC = false;
/*
* We should reset SubPlanLevel in case a transaction is aborted;
* otherwise this variable would stay positive if the transaction is
* aborted in the middle of a CTE/complex subquery execution, which
* would cause subsequent queries to error out whenever the copied
* size exceeds citus.max_intermediate_result_size.
*/
SubPlanLevel = 0;
UnSetDistributedTransactionId();
break;
}
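
To illustrate why this reset matters, here is an illustrative sketch (not a test shipped in this commit; users_table follows the regression tests added here):

  SET citus.max_intermediate_result_size TO 1;
  BEGIN;
  WITH cte AS (SELECT * FROM users_table)
  SELECT count(*) FROM cte;  -- exceeds 1 kB; the ERROR aborts the transaction while SubPlanLevel > 0
  ROLLBACK;
  -- because the abort callback resets SubPlanLevel to 0, this plain query
  -- is not treated as being inside a subplan and is not size-checked
  SELECT count(*) FROM users_table;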

View File

@ -20,6 +20,7 @@
#include "utils/memutils.h"
#include "utils/resowner_private.h"
#include "distributed/multi_resowner.h"
#include "distributed/subplan_execution.h"
typedef struct JobDirectoryEntry

View File

@ -343,7 +343,7 @@ ReceiveRegularFile(const char *nodeName, uint32 nodePort, const char *nodeUser,
/* loop until we receive and append all the data from remote node */
while (!copyDone)
{
CopyStatus copyStatus = MultiClientCopyData(connectionId, fileDescriptor);
CopyStatus copyStatus = MultiClientCopyData(connectionId, fileDescriptor, NULL);
if (copyStatus == CLIENT_COPY_DONE)
{
copyDone = true;

View File

@ -117,7 +117,8 @@ extern bool MultiClientSendQuery(int32 connectionId, const char *query);
extern bool MultiClientCancel(int32 connectionId);
extern ResultStatus MultiClientResultStatus(int32 connectionId);
extern QueryStatus MultiClientQueryStatus(int32 connectionId);
extern CopyStatus MultiClientCopyData(int32 connectionId, int32 fileDescriptor);
extern CopyStatus MultiClientCopyData(int32 connectionId, int32 fileDescriptor,
uint64 *returnBytesReceived);
extern bool MultiClientQueryResult(int32 connectionId, void **queryResult,
int *rowCount, int *columnCount);
extern BatchQueryStatus MultiClientBatchResult(int32 connectionId, void **queryResult,

View File

@ -110,6 +110,19 @@ typedef enum
} ConnectAction;
/*
* DistributedExecutionStats holds execution-related statistics.
*
* totalIntermediateResultSize tracks the total size of the intermediate
* results of complex subqueries and CTEs so that we can enforce a limit
* on that size.
*/
typedef struct DistributedExecutionStats
{
uint64 totalIntermediateResultSize;
} DistributedExecutionStats;
/*
* TaskExecution holds state that relates to a task's execution. In the case of
* the real-time executor, this struct encapsulates all information necessary to
@ -203,7 +216,9 @@ extern void MultiTaskTrackerExecute(Job *job);
extern MultiExecutorType JobExecutorType(DistributedPlan *distributedPlan);
extern void RemoveJobDirectory(uint64 jobId);
extern TaskExecution * InitTaskExecution(Task *task, TaskExecStatus initialStatus);
extern bool CheckIfSizeLimitIsExceeded(DistributedExecutionStats *executionStats);
extern void CleanupTaskExecution(TaskExecution *taskExecution);
extern void ErrorSizeLimitIsExceeded(void);
extern bool TaskExecutionFailed(TaskExecution *taskExecution);
extern void AdjustStateForFailure(TaskExecution *taskExecution);
extern int MaxMasterConnectionCount(void);

View File

@ -14,6 +14,8 @@
#include "distributed/multi_physical_planner.h"
extern int MaxIntermediateResult;
extern int SubPlanLevel;
extern void ExecuteSubPlans(DistributedPlan *distributedPlan);

View File

@ -0,0 +1,304 @@
SET citus.enable_repartition_joins to ON;
SET citus.max_intermediate_result_size TO 3;
-- should fail because the copy size is ~4kB for each cte
WITH cte AS
(
SELECT * FROM users_table
),
cte2 AS (
SELECT * FROM events_table
)
SELECT cte.user_id, cte.value_2 FROM cte,cte2 ORDER BY 1,2 LIMIT 10;
ERROR: the intermediate result size exceeds citus.max_intermediate_result_size (currently 3 kB)
DETAIL: Citus restricts the size of intermediate results of complex subqueries and CTEs to avoid accidentally pulling large result sets into one place.
HINT: To run the current query, set citus.max_intermediate_result_size to a higher value or -1 to disable.
SET citus.max_intermediate_result_size TO 9;
-- regular task-tracker CTE should fail
WITH cte AS
(
SELECT
users_table.user_id, users_table.value_1, users_table.value_2
FROM
users_table
join
events_table
on
(users_table.value_3=events_table.value_3)
),
cte2 AS (
SELECT * FROM events_table
)
SELECT
cte.user_id, cte2.value_2
FROM
cte JOIN cte2 ON (cte.value_1 = cte2.event_type)
ORDER BY
1,2
LIMIT 10;
ERROR: the intermediate result size exceeds citus.max_intermediate_result_size (currently 9 kB)
DETAIL: Citus restricts the size of intermediate results of complex subqueries and CTEs to avoid accidentally pulling large result sets into one place.
HINT: To run the current query, set citus.max_intermediate_result_size to a higher value or -1 to disable.
-- router queries should be able to get limited too
SET citus.max_intermediate_result_size TO 3;
-- this should pass, since we fetch small portions in each subplan
with cte as (select * from users_table where user_id=1),
cte2 as (select * from users_table where user_id=2),
cte3 as (select * from users_table where user_id=3),
cte4 as (select * from users_table where user_id=4),
cte5 as (select * from users_table where user_id=5)
SELECT * FROM (
(select * from cte)
UNION
(select * from cte2)
UNION
(select * from cte3)
UNION
(select * from cte4)
UNION
(select * from cte5)
)a ORDER BY 1,2,3,4,5 LIMIT 10;
user_id | time | value_1 | value_2 | value_3 | value_4
---------+---------------------------------+---------+---------+---------+---------
1 | Wed Nov 22 22:51:43.132261 2017 | 4 | 0 | 3 |
1 | Thu Nov 23 03:32:50.803031 2017 | 3 | 2 | 1 |
1 | Thu Nov 23 09:26:42.145043 2017 | 1 | 3 | 3 |
1 | Thu Nov 23 11:11:24.40789 2017 | 3 | 4 | 0 |
1 | Thu Nov 23 11:44:57.515981 2017 | 4 | 3 | 4 |
1 | Thu Nov 23 17:23:03.441394 2017 | 5 | 4 | 3 |
1 | Thu Nov 23 17:30:34.635085 2017 | 3 | 4 | 4 |
2 | Wed Nov 22 18:19:49.944985 2017 | 3 | 5 | 1 |
2 | Thu Nov 23 00:19:14.138058 2017 | 3 | 4 | 0 |
2 | Thu Nov 23 01:04:26.198826 2017 | 4 | 3 | 4 |
(10 rows)
-- if we fetch the same amount of data at once, it should fail
WITH cte AS (SELECT * FROM users_table WHERE user_id IN (1,2,3,4,5))
SELECT * FROM cte ORDER BY 1,2,3,4,5 LIMIT 10;
ERROR: the intermediate result size exceeds citus.max_intermediate_result_size (currently 3 kB)
DETAIL: Citus restricts the size of intermediate results of complex subqueries and CTEs to avoid accidentally pulling large result sets into one place.
HINT: To run the current query, set citus.max_intermediate_result_size to a higher value or -1 to disable.
SET citus.max_intermediate_result_size TO 0;
-- this should fail
WITH cte AS (SELECT * FROM users_table WHERE user_id=1),
cte2 AS (SELECT * FROM users_table WHERE user_id=2),
cte3 AS (SELECT * FROM users_table WHERE user_id=3),
cte4 AS (SELECT * FROM users_table WHERE user_id=4),
cte5 AS (SELECT * FROM users_table WHERE user_id=5)
SELECT * FROM (
(SELECT * FROM cte)
UNION
(SELECT * FROM cte2)
UNION
(SELECT * FROM cte3)
UNION
(SELECT * FROM cte4)
UNION
(SELECT * FROM cte5)
)a ORDER BY 1,2,3,4,5 LIMIT 10;
ERROR: the intermediate result size exceeds citus.max_intermediate_result_size (currently 0 kB)
DETAIL: Citus restricts the size of intermediate results of complex subqueries and CTEs to avoid accidentally pulling large result sets into one place.
HINT: To run the current query, set citus.max_intermediate_result_size to a higher value or -1 to disable.
-- this should fail since the cte-subplan exceeds the limit even if
-- cte2 and cte3 do not
SET citus.max_intermediate_result_size TO 4;
WITH cte AS (
WITH cte2 AS (
SELECT * FROM users_table
),
cte3 AS (
SELECT * FROM events_table
)
SELECT * FROM cte2, cte3 WHERE cte2.user_id = cte3.user_id AND cte2.user_id = 1
)
SELECT * FROM cte;
ERROR: the intermediate result size exceeds citus.max_intermediate_result_size (currently 4 kB)
DETAIL: Citus restricts the size of intermediate results of complex subqueries and CTEs to avoid accidentally pulling large result sets into one place.
HINT: To run the current query, set citus.max_intermediate_result_size to a higher value or -1 to disable.
SET citus.max_intermediate_result_size TO 3;
-- this should fail since the cte-subplan exceeds the limit even if
-- cte2 and cte3 do not
WITH cte AS (
WITH cte2 AS (
SELECT * FROM users_table WHERE user_id IN (3,4,5,6)
),
cte3 AS (
SELECT * FROM events_table WHERE event_type = 1
)
SELECT * FROM cte2, cte3 WHERE cte2.value_1 IN (SELECT value_2 FROM cte3)
)
SELECT * FROM cte;
ERROR: the intermediate result size exceeds citus.max_intermediate_result_size (currently 3 kB)
DETAIL: Citus restricts the size of intermediate results of complex subqueries and CTEs to avoid accidentally pulling large result sets into one place.
HINT: To run the current query, set citus.max_intermediate_result_size to a higher value or -1 to disable.
-- this will fail in real_time_executor
WITH cte AS (
WITH cte2 AS (
SELECT * FROM users_table WHERE user_id IN (1, 2)
),
cte3 AS (
SELECT * FROM users_table WHERE user_id = 3
)
SELECT * FROM cte2 UNION (SELECT * FROM cte3)
),
cte4 AS (
SELECT * FROM events_table
)
SELECT * FROM cte UNION ALL
SELECT * FROM cte4 ORDER BY 1,2,3,4,5 LIMIT 5;
ERROR: the intermediate result size exceeds citus.max_intermediate_result_size (currently 3 kB)
DETAIL: Citus restricts the size of intermediate results of complex subqueries and CTEs to avoid accidentally pulling large result sets into one place.
HINT: To run the current query, set citus.max_intermediate_result_size to a higher value or -1 to disable.
SET citus.max_intermediate_result_size TO 1;
-- this will fail in router_executor
WITH cte AS (
WITH cte2 AS (
SELECT * FROM users_table WHERE user_id IN (1, 2)
),
cte3 AS (
SELECT * FROM users_table WHERE user_id = 3
)
SELECT * FROM cte2 UNION (SELECT * FROM cte3)
),
cte4 AS (
SELECT * FROM events_table
)
SELECT * FROM cte UNION ALL
SELECT * FROM cte4 ORDER BY 1,2,3,4,5 LIMIT 5;
ERROR: the intermediate result size exceeds citus.max_intermediate_result_size (currently 1 kB)
DETAIL: Citus restricts the size of intermediate results of complex subqueries and CTEs to avoid accidentally pulling large result sets into one place.
HINT: To run the current query, set citus.max_intermediate_result_size to a higher value or -1 to disable.
-- From this point on, all queries should pass since -1 disables the limit
SET citus.max_intermediate_result_size TO -1;
-- real_time_executor + router_executor + real_time_executor will pass
WITH cte AS (
WITH cte2 AS (
SELECT * FROM users_table WHERE user_id IN (1, 2)
),
cte3 AS (
SELECT * FROM users_table WHERE user_id = 3
)
SELECT * FROM cte2 UNION (SELECT * FROM cte3)
),
cte4 AS (
SELECT * FROM events_table
)
SELECT * FROM cte UNION ALL
SELECT * FROM cte4 ORDER BY 1,2,3,4,5 LIMIT 5;
user_id | time | value_1 | value_2 | value_3 | value_4
---------+---------------------------------+---------+---------+---------+---------
1 | Wed Nov 22 18:49:42.327403 2017 | 3 | 2 | 1 |
1 | Wed Nov 22 19:03:01.772353 2017 | 4 | 1 | 2 |
1 | Wed Nov 22 19:07:03.846437 2017 | 1 | 2 | 5 |
1 | Wed Nov 22 20:56:21.122638 2017 | 2 | 4 | 4 |
1 | Wed Nov 22 21:06:57.457147 2017 | 4 | 3 | 2 |
(5 rows)
-- regular task-tracker CTE, should work since -1 disables the limit
WITH cte AS
(
SELECT
users_table.user_id, users_table.value_1, users_table.value_2
FROM
users_table
join
events_table
on
(users_table.value_2=events_table.value_2)
),
cte2 AS (
SELECT * FROM events_table
)
SELECT
cte.user_id, cte2.value_2
FROM
cte JOIN cte2 ON (cte.value_1 = cte2.event_type)
ORDER BY
1,2
LIMIT 10;
user_id | value_2
---------+---------
1 | 0
1 | 0
1 | 0
1 | 0
1 | 0
1 | 0
1 | 0
1 | 0
1 | 0
1 | 0
(10 rows)
-- regular real-time CTE fetches ~4kB of data in each subplan
WITH cte AS
(
SELECT * FROM users_table
),
cte2 AS (
SELECT * FROM events_table
)
SELECT cte.user_id, cte.value_2 FROM cte,cte2 ORDER BY 1,2 LIMIT 10;
user_id | value_2
---------+---------
1 | 0
1 | 0
1 | 0
1 | 0
1 | 0
1 | 0
1 | 0
1 | 0
1 | 0
1 | 0
(10 rows)
-- regular real-time query fetches ~4kB
WITH cte AS
(
SELECT * FROM users_table WHERE user_id IN (1,2,3,4,5)
)
SELECT * FROM cte ORDER BY 1,2,3,4,5 LIMIT 10;
user_id | time | value_1 | value_2 | value_3 | value_4
---------+---------------------------------+---------+---------+---------+---------
1 | Wed Nov 22 22:51:43.132261 2017 | 4 | 0 | 3 |
1 | Thu Nov 23 03:32:50.803031 2017 | 3 | 2 | 1 |
1 | Thu Nov 23 09:26:42.145043 2017 | 1 | 3 | 3 |
1 | Thu Nov 23 11:11:24.40789 2017 | 3 | 4 | 0 |
1 | Thu Nov 23 11:44:57.515981 2017 | 4 | 3 | 4 |
1 | Thu Nov 23 17:23:03.441394 2017 | 5 | 4 | 3 |
1 | Thu Nov 23 17:30:34.635085 2017 | 3 | 4 | 4 |
2 | Wed Nov 22 18:19:49.944985 2017 | 3 | 5 | 1 |
2 | Thu Nov 23 00:19:14.138058 2017 | 3 | 4 | 0 |
2 | Thu Nov 23 01:04:26.198826 2017 | 4 | 3 | 4 |
(10 rows)
-- nested CTEs
WITH cte AS (
WITH cte2 AS (
SELECT * FROM users_table
),
cte3 AS (
SELECT * FROM events_table
)
SELECT
cte2.user_id, cte2.time, cte3.event_type, cte3.value_2, cte3.value_3
FROM
cte2, cte3
WHERE
cte2.user_id = cte3.user_id AND cte2.user_id = 1
)
SELECT * FROM cte ORDER BY 1,2,3,4,5 LIMIT 10;
user_id | time | event_type | value_2 | value_3
---------+---------------------------------+------------+---------+---------
1 | Wed Nov 22 22:51:43.132261 2017 | 0 | 2 | 0
1 | Wed Nov 22 22:51:43.132261 2017 | 0 | 5 | 1
1 | Wed Nov 22 22:51:43.132261 2017 | 1 | 1 | 1
1 | Wed Nov 22 22:51:43.132261 2017 | 1 | 2 | 5
1 | Wed Nov 22 22:51:43.132261 2017 | 2 | 4 | 3
1 | Wed Nov 22 22:51:43.132261 2017 | 2 | 4 | 4
1 | Wed Nov 22 22:51:43.132261 2017 | 3 | 1 | 1
1 | Wed Nov 22 22:51:43.132261 2017 | 3 | 2 | 1
1 | Wed Nov 22 22:51:43.132261 2017 | 3 | 4 | 1
1 | Wed Nov 22 22:51:43.132261 2017 | 4 | 1 | 2
(10 rows)

View File

@ -49,7 +49,7 @@ test: subquery_prepared_statements
# ----------
# Miscellaneous tests to check our query planning behavior
# ----------
test: multi_deparse_shard_query multi_distributed_transaction_id multi_real_time_transaction intermediate_results
test: multi_deparse_shard_query multi_distributed_transaction_id multi_real_time_transaction intermediate_results limit_intermediate_size
test: multi_explain
test: multi_basic_queries multi_complex_expressions multi_subquery multi_subquery_complex_queries multi_subquery_behavioral_analytics
test: multi_subquery_complex_reference_clause multi_subquery_window_functions multi_view multi_sql_function multi_prepare_sql

View File

@ -0,0 +1,229 @@
SET citus.enable_repartition_joins to ON;
SET citus.max_intermediate_result_size TO 3;
-- should fail because the copy size is ~4kB for each cte
WITH cte AS
(
SELECT * FROM users_table
),
cte2 AS (
SELECT * FROM events_table
)
SELECT cte.user_id, cte.value_2 FROM cte,cte2 ORDER BY 1,2 LIMIT 10;
SET citus.max_intermediate_result_size TO 9;
-- regular task-tracker CTE should fail
WITH cte AS
(
SELECT
users_table.user_id, users_table.value_1, users_table.value_2
FROM
users_table
join
events_table
on
(users_table.value_3=events_table.value_3)
),
cte2 AS (
SELECT * FROM events_table
)
SELECT
cte.user_id, cte2.value_2
FROM
cte JOIN cte2 ON (cte.value_1 = cte2.event_type)
ORDER BY
1,2
LIMIT 10;
-- router queries should be able to get limited too
SET citus.max_intermediate_result_size TO 3;
-- this should pass, since we fetch small portions in each subplan
with cte as (select * from users_table where user_id=1),
cte2 as (select * from users_table where user_id=2),
cte3 as (select * from users_table where user_id=3),
cte4 as (select * from users_table where user_id=4),
cte5 as (select * from users_table where user_id=5)
SELECT * FROM (
(select * from cte)
UNION
(select * from cte2)
UNION
(select * from cte3)
UNION
(select * from cte4)
UNION
(select * from cte5)
)a ORDER BY 1,2,3,4,5 LIMIT 10;
-- if we fetch the same amount of data at once, it should fail
WITH cte AS (SELECT * FROM users_table WHERE user_id IN (1,2,3,4,5))
SELECT * FROM cte ORDER BY 1,2,3,4,5 LIMIT 10;
SET citus.max_intermediate_result_size TO 0;
-- this should fail
WITH cte AS (SELECT * FROM users_table WHERE user_id=1),
cte2 AS (SELECT * FROM users_table WHERE user_id=2),
cte3 AS (SELECT * FROM users_table WHERE user_id=3),
cte4 AS (SELECT * FROM users_table WHERE user_id=4),
cte5 AS (SELECT * FROM users_table WHERE user_id=5)
SELECT * FROM (
(SELECT * FROM cte)
UNION
(SELECT * FROM cte2)
UNION
(SELECT * FROM cte3)
UNION
(SELECT * FROM cte4)
UNION
(SELECT * FROM cte5)
)a ORDER BY 1,2,3,4,5 LIMIT 10;
-- this should fail since the cte-subplan exceeds the limit even if
-- cte2 and cte3 do not
SET citus.max_intermediate_result_size TO 4;
WITH cte AS (
WITH cte2 AS (
SELECT * FROM users_table
),
cte3 AS (
SELECT * FROM events_table
)
SELECT * FROM cte2, cte3 WHERE cte2.user_id = cte3.user_id AND cte2.user_id = 1
)
SELECT * FROM cte;
SET citus.max_intermediate_result_size TO 3;
-- this should fail since the cte-subplan exceeds the limit even if
-- cte2 and cte3 do not
WITH cte AS (
WITH cte2 AS (
SELECT * FROM users_table WHERE user_id IN (3,4,5,6)
),
cte3 AS (
SELECT * FROM events_table WHERE event_type = 1
)
SELECT * FROM cte2, cte3 WHERE cte2.value_1 IN (SELECT value_2 FROM cte3)
)
SELECT * FROM cte;
-- this will fail in real_time_executor
WITH cte AS (
WITH cte2 AS (
SELECT * FROM users_table WHERE user_id IN (1, 2)
),
cte3 AS (
SELECT * FROM users_table WHERE user_id = 3
)
SELECT * FROM cte2 UNION (SELECT * FROM cte3)
),
cte4 AS (
SELECT * FROM events_table
)
SELECT * FROM cte UNION ALL
SELECT * FROM cte4 ORDER BY 1,2,3,4,5 LIMIT 5;
SET citus.max_intermediate_result_size TO 1;
-- this will fail in router_executor
WITH cte AS (
WITH cte2 AS (
SELECT * FROM users_table WHERE user_id IN (1, 2)
),
cte3 AS (
SELECT * FROM users_table WHERE user_id = 3
)
SELECT * FROM cte2 UNION (SELECT * FROM cte3)
),
cte4 AS (
SELECT * FROM events_table
)
SELECT * FROM cte UNION ALL
SELECT * FROM cte4 ORDER BY 1,2,3,4,5 LIMIT 5;
-- From this point on, all queries should pass since -1 disables the limit
SET citus.max_intermediate_result_size TO -1;
-- real_time_executor + router_executor + real_time_executor will pass
WITH cte AS (
WITH cte2 AS (
SELECT * FROM users_table WHERE user_id IN (1, 2)
),
cte3 AS (
SELECT * FROM users_table WHERE user_id = 3
)
SELECT * FROM cte2 UNION (SELECT * FROM cte3)
),
cte4 AS (
SELECT * FROM events_table
)
SELECT * FROM cte UNION ALL
SELECT * FROM cte4 ORDER BY 1,2,3,4,5 LIMIT 5;
-- regular task-tracker CTE, should work since -1 disables the limit
WITH cte AS
(
SELECT
users_table.user_id, users_table.value_1, users_table.value_2
FROM
users_table
join
events_table
on
(users_table.value_2=events_table.value_2)
),
cte2 AS (
SELECT * FROM events_table
)
SELECT
cte.user_id, cte2.value_2
FROM
cte JOIN cte2 ON (cte.value_1 = cte2.event_type)
ORDER BY
1,2
LIMIT 10;
-- regular real-time CTE fetches ~4kB of data in each subplan
WITH cte AS
(
SELECT * FROM users_table
),
cte2 AS (
SELECT * FROM events_table
)
SELECT cte.user_id, cte.value_2 FROM cte,cte2 ORDER BY 1,2 LIMIT 10;
-- regular real-time query fetches ~4kB
WITH cte AS
(
SELECT * FROM users_table WHERE user_id IN (1,2,3,4,5)
)
SELECT * FROM cte ORDER BY 1,2,3,4,5 LIMIT 10;
-- nested CTEs
WITH cte AS (
WITH cte2 AS (
SELECT * FROM users_table
),
cte3 AS (
SELECT * FROM events_table
)
SELECT
cte2.user_id, cte2.time, cte3.event_type, cte3.value_2, cte3.value_3
FROM
cte2, cte3
WHERE
cte2.user_id = cte3.user_id AND cte2.user_id = 1
)
SELECT * FROM cte ORDER BY 1,2,3,4,5 LIMIT 10;