diff --git a/src/backend/distributed/executor/multi_client_executor.c b/src/backend/distributed/executor/multi_client_executor.c index 5a3680cbc..cbc2fd572 100644 --- a/src/backend/distributed/executor/multi_client_executor.c +++ b/src/backend/distributed/executor/multi_client_executor.c @@ -24,6 +24,7 @@ #include "distributed/multi_server_executor.h" #include "distributed/placement_connection.h" #include "distributed/remote_commands.h" +#include "distributed/subplan_execution.h" #include #include @@ -703,7 +704,7 @@ MultiClientQueryStatus(int32 connectionId) /* MultiClientCopyData copies data from the file. */ CopyStatus -MultiClientCopyData(int32 connectionId, int32 fileDescriptor) +MultiClientCopyData(int32 connectionId, int32 fileDescriptor, uint64 *returnBytesReceived) { MultiConnection *connection = NULL; char *receiveBuffer = NULL; @@ -735,6 +736,11 @@ MultiClientCopyData(int32 connectionId, int32 fileDescriptor) int appended = -1; errno = 0; + if (returnBytesReceived) + { + *returnBytesReceived += receiveLength; + } + appended = write(fileDescriptor, receiveBuffer, receiveLength); if (appended != receiveLength) { diff --git a/src/backend/distributed/executor/multi_real_time_executor.c b/src/backend/distributed/executor/multi_real_time_executor.c index 735f75a34..ca1a40c62 100644 --- a/src/backend/distributed/executor/multi_real_time_executor.c +++ b/src/backend/distributed/executor/multi_real_time_executor.c @@ -19,7 +19,6 @@ #include "postgres.h" #include "miscadmin.h" -#include #include #include "access/xact.h" @@ -42,7 +41,8 @@ /* Local functions forward declarations */ static ConnectAction ManageTaskExecution(Task *task, TaskExecution *taskExecution, - TaskExecutionStatus *executionStatus); + TaskExecutionStatus *executionStatus, + DistributedExecutionStats *executionStats); static bool TaskExecutionReadyToStart(TaskExecution *taskExecution); static bool TaskExecutionCompleted(TaskExecution *taskExecution); static void CancelTaskExecutionIfActive(TaskExecution *taskExecution); @@ -83,6 +83,8 @@ MultiRealTimeExecute(Job *job) bool allTasksCompleted = false; bool taskCompleted = false; bool taskFailed = false; + bool sizeLimitIsExceeded = false; + DistributedExecutionStats executionStats = { 0 }; List *workerNodeList = NIL; HTAB *workerHash = NULL; @@ -107,7 +109,8 @@ MultiRealTimeExecute(Job *job) } /* loop around until all tasks complete, one task fails, or user cancels */ - while (!(allTasksCompleted || taskFailed || QueryCancelPending)) + while (!(allTasksCompleted || taskFailed || QueryCancelPending || + sizeLimitIsExceeded)) { uint32 taskCount = list_length(taskList); uint32 completedTaskCount = 0; @@ -137,7 +140,8 @@ MultiRealTimeExecute(Job *job) } /* call the function that performs the core task execution logic */ - connectAction = ManageTaskExecution(task, taskExecution, &executionStatus); + connectAction = ManageTaskExecution(task, taskExecution, &executionStatus, + &executionStats); /* update the connection counter for throttling */ UpdateConnectionCounter(workerNodeState, connectAction); @@ -173,6 +177,13 @@ MultiRealTimeExecute(Job *job) } } + /* in case the task has intermediate results */ + if (CheckIfSizeLimitIsExceeded(&executionStats)) + { + sizeLimitIsExceeded = true; + break; + } + /* * Check if all tasks completed; otherwise wait as appropriate to * avoid a tight loop. 
That means we immediately continue if tasks are @@ -235,7 +246,11 @@ MultiRealTimeExecute(Job *job) * user cancellation request, we can now safely emit an error message (all * client-side resources have been cleared). */ - if (taskFailed) + if (sizeLimitIsExceeded) + { + ErrorSizeLimitIsExceeded(); + } + else if (taskFailed) { ereport(ERROR, (errmsg("failed to execute task %u", failedTaskId))); } @@ -258,7 +273,8 @@ MultiRealTimeExecute(Job *job) */ static ConnectAction ManageTaskExecution(Task *task, TaskExecution *taskExecution, - TaskExecutionStatus *executionStatus) + TaskExecutionStatus *executionStatus, + DistributedExecutionStats *executionStats) { TaskExecStatus *taskStatusArray = taskExecution->taskStatusArray; int32 *connectionIdArray = taskExecution->connectionIdArray; @@ -660,9 +676,16 @@ ManageTaskExecution(Task *task, TaskExecution *taskExecution, int32 connectionId = connectionIdArray[currentIndex]; int32 fileDesc = fileDescriptorArray[currentIndex]; int closed = -1; + uint64 bytesReceived = 0; /* copy data from worker node, and write to local file */ - CopyStatus copyStatus = MultiClientCopyData(connectionId, fileDesc); + CopyStatus copyStatus = MultiClientCopyData(connectionId, fileDesc, + &bytesReceived); + + if (SubPlanLevel > 0) + { + executionStats->totalIntermediateResultSize += bytesReceived; + } /* if worker node will continue to send more data, keep reading */ if (copyStatus == CLIENT_COPY_MORE) diff --git a/src/backend/distributed/executor/multi_router_executor.c b/src/backend/distributed/executor/multi_router_executor.c index 795bb2fdc..6ac238d88 100644 --- a/src/backend/distributed/executor/multi_router_executor.c +++ b/src/backend/distributed/executor/multi_router_executor.c @@ -97,7 +97,8 @@ static void ExtractParametersFromParamListInfo(ParamListInfo paramListInfo, static bool SendQueryInSingleRowMode(MultiConnection *connection, char *query, ParamListInfo paramListInfo); static bool StoreQueryResult(CitusScanState *scanState, MultiConnection *connection, - bool failOnError, int64 *rows); + bool failOnError, int64 *rows, + DistributedExecutionStats *executionStats); static bool ConsumeQueryResult(MultiConnection *connection, bool failOnError, int64 *rows); @@ -577,6 +578,7 @@ ExecuteSingleSelectTask(CitusScanState *scanState, Task *task) ListCell *taskPlacementCell = NULL; char *queryString = task->queryString; List *relationShardList = task->relationShardList; + DistributedExecutionStats executionStats = { 0 }; /* * Try to run the query to completion on one placement. 
If the query fails @@ -638,7 +640,14 @@ ExecuteSingleSelectTask(CitusScanState *scanState, Task *task) } queryOK = StoreQueryResult(scanState, connection, dontFailOnError, - &currentAffectedTupleCount); + &currentAffectedTupleCount, + &executionStats); + + if (CheckIfSizeLimitIsExceeded(&executionStats)) + { + ErrorSizeLimitIsExceeded(); + } + if (queryOK) { return; @@ -821,7 +830,7 @@ ExecuteSingleModifyTask(CitusScanState *scanState, Task *task, bool multipleTask if (!gotResults && expectResults) { queryOK = StoreQueryResult(scanState, connection, failOnError, - &currentAffectedTupleCount); + &currentAffectedTupleCount, NULL); } else { @@ -1156,7 +1165,7 @@ ExecuteModifyTasks(List *taskList, bool expectResults, ParamListInfo paramListIn Assert(scanState != NULL); queryOK = StoreQueryResult(scanState, connection, failOnError, - &currentAffectedTupleCount); + &currentAffectedTupleCount, NULL); } else { @@ -1342,7 +1351,8 @@ ExtractParametersFromParamListInfo(ParamListInfo paramListInfo, Oid **parameterT */ static bool StoreQueryResult(CitusScanState *scanState, MultiConnection *connection, - bool failOnError, int64 *rows) + bool failOnError, int64 *rows, + DistributedExecutionStats *executionStats) { TupleDesc tupleDescriptor = scanState->customScanState.ss.ps.ps_ResultTupleSlot->tts_tupleDescriptor; @@ -1442,6 +1452,12 @@ StoreQueryResult(CitusScanState *scanState, MultiConnection *connection, else { columnArray[columnIndex] = PQgetvalue(result, rowIndex, columnIndex); + if (SubPlanLevel > 0) + { + executionStats->totalIntermediateResultSize += PQgetlength(result, + rowIndex, + columnIndex); + } } } diff --git a/src/backend/distributed/executor/multi_server_executor.c b/src/backend/distributed/executor/multi_server_executor.c index 57cb46f40..b1196eff2 100644 --- a/src/backend/distributed/executor/multi_server_executor.c +++ b/src/backend/distributed/executor/multi_server_executor.c @@ -23,9 +23,9 @@ #include "distributed/multi_physical_planner.h" #include "distributed/multi_resowner.h" #include "distributed/multi_server_executor.h" +#include "distributed/subplan_execution.h" #include "distributed/worker_protocol.h" - int RemoteTaskCheckInterval = 100; /* per cycle sleep interval in millisecs */ int TaskExecutorType = MULTI_EXECUTOR_REAL_TIME; /* distributed executor type */ bool BinaryMasterCopyFormat = false; /* copy data from workers in binary format */ @@ -281,3 +281,47 @@ AdjustStateForFailure(TaskExecution *taskExecution) taskExecution->dataFetchTaskIndex = -1; /* reset data fetch counter */ taskExecution->failureCount++; /* record failure */ } + + +/* + * CheckIfSizeLimitIsExceeded checks whether the accumulated intermediate + * results, if there are any, exceed the configured size limit. + */ +bool +CheckIfSizeLimitIsExceeded(DistributedExecutionStats *executionStats) +{ + uint64 maxIntermediateResultInBytes = 0; + + if (!SubPlanLevel || MaxIntermediateResult < 0) + { + return false; + } + + maxIntermediateResultInBytes = MaxIntermediateResult * 1024L; + if (executionStats->totalIntermediateResultSize < maxIntermediateResultInBytes) + { + return false; + } + + return true; +} + + +/* + * ErrorSizeLimitIsExceeded is called when the intermediate result size + * limitation is exceeded. It errors out with a detailed explanation. 
+ */ +void +ErrorSizeLimitIsExceeded(void) +{ + ereport(ERROR, (errmsg("the intermediate result size exceeds " + "citus.max_intermediate_result_size (currently %d kB)", + MaxIntermediateResult), + errdetail("Citus restricts the size of intermediate " + "results of complex subqueries and CTEs to " + "avoid accidentally pulling large result sets " + "into one place."), + errhint("To run the current query, set " + "citus.max_intermediate_result_size to a higher" + " value or -1 to disable."))); +} diff --git a/src/backend/distributed/executor/multi_task_tracker_executor.c b/src/backend/distributed/executor/multi_task_tracker_executor.c index 3dc4f3111..47c05e9e3 100644 --- a/src/backend/distributed/executor/multi_task_tracker_executor.c +++ b/src/backend/distributed/executor/multi_task_tracker_executor.c @@ -35,6 +35,7 @@ #include "distributed/multi_server_executor.h" #include "distributed/pg_dist_partition.h" #include "distributed/resource_lock.h" +#include "distributed/subplan_execution.h" #include "distributed/worker_protocol.h" #include "distributed/version_compat.h" #include "storage/fd.h" @@ -96,7 +97,9 @@ static TaskExecStatus ManageTaskExecution(TaskTracker *taskTracker, Task *task, TaskExecution *taskExecution); static TransmitExecStatus ManageTransmitExecution(TaskTracker *transmitTracker, Task *task, - TaskExecution *taskExecution); + TaskExecution *taskExecution, + DistributedExecutionStats * + executionStats); static bool TaskExecutionsCompleted(List *taskList); static StringInfo MapFetchTaskQueryString(Task *mapFetchTask, Task *mapTask); static void TrackerQueueSqlTask(TaskTracker *taskTracker, Task *task); @@ -161,7 +164,9 @@ MultiTaskTrackerExecute(Job *job) bool taskFailed = false; bool taskTransmitFailed = false; bool clusterFailed = false; + bool sizeLimitIsExceeded = false; + DistributedExecutionStats executionStats = { 0 }; List *workerNodeList = NIL; HTAB *taskTrackerHash = NULL; HTAB *transmitTrackerHash = NULL; @@ -219,7 +224,7 @@ /* loop around until all tasks complete, one task fails, or user cancels */ while (!(allTasksCompleted || taskFailed || taskTransmitFailed || - clusterFailed || QueryCancelPending)) + clusterFailed || QueryCancelPending || sizeLimitIsExceeded)) { TaskTracker *taskTracker = NULL; TaskTracker *transmitTracker = NULL; @@ -328,7 +333,8 @@ /* call the function that fetches results for completed SQL tasks */ transmitExecutionStatus = ManageTransmitExecution(execTransmitTracker, - task, taskExecution); + task, taskExecution, + &executionStats); /* * If we cannot transmit SQL task's results to the master, we first @@ -363,6 +369,13 @@ } } + + if (CheckIfSizeLimitIsExceeded(&executionStats)) + { + sizeLimitIsExceeded = true; + break; + } + /* third, loop around task trackers and manage them */ hash_seq_init(&taskStatus, taskTrackerHash); hash_seq_init(&transmitStatus, transmitTrackerHash); @@ -428,7 +441,11 @@ * If we previously broke out of the execution loop due to a task failure or * user cancellation request, we can now safely emit an error message. 
*/ - if (taskFailed) + if (sizeLimitIsExceeded) + { + ErrorSizeLimitIsExceeded(); + } + else if (taskFailed) { ereport(ERROR, (errmsg("failed to execute task %u", failedTaskId))); } @@ -1273,7 +1290,8 @@ ManageTaskExecution(TaskTracker *taskTracker, TaskTracker *sourceTaskTracker, */ static TransmitExecStatus ManageTransmitExecution(TaskTracker *transmitTracker, - Task *task, TaskExecution *taskExecution) + Task *task, TaskExecution *taskExecution, + DistributedExecutionStats *executionStats) { int32 *fileDescriptorArray = taskExecution->fileDescriptorArray; uint32 currentNodeIndex = taskExecution->currentNodeIndex; @@ -1397,12 +1415,20 @@ ManageTransmitExecution(TaskTracker *transmitTracker, int32 fileDescriptor = fileDescriptorArray[currentNodeIndex]; CopyStatus copyStatus = CLIENT_INVALID_COPY; int closed = -1; + uint64 bytesReceived = 0; /* the open connection belongs to this task */ int32 connectionId = TransmitTrackerConnectionId(transmitTracker, task); Assert(connectionId != INVALID_CONNECTION_ID); - copyStatus = MultiClientCopyData(connectionId, fileDescriptor); + copyStatus = MultiClientCopyData(connectionId, fileDescriptor, + &bytesReceived); + + if (SubPlanLevel > 0) + { + executionStats->totalIntermediateResultSize += bytesReceived; + } + if (copyStatus == CLIENT_COPY_MORE) { /* worker node continues to send more data, keep reading */ diff --git a/src/backend/distributed/executor/subplan_execution.c b/src/backend/distributed/executor/subplan_execution.c index e61820d46..5a3449dfe 100644 --- a/src/backend/distributed/executor/subplan_execution.c +++ b/src/backend/distributed/executor/subplan_execution.c @@ -19,6 +19,11 @@ #include "executor/executor.h" +int MaxIntermediateResult = 1048576; /* maximum size in KB the intermediate result can grow to */ +/* while this is greater than zero, we are executing a subplan and enforce the intermediate result size limit in all executors */ +int SubPlanLevel = 0; + + /* + * ExecuteSubPlans executes a list of subplans from a distributed plan + * by sequentially executing each plan from the top. 
@@ -43,6 +48,7 @@ ExecuteSubPlans(DistributedPlan *distributedPlan) char *resultId = GenerateResultId(planId, subPlanId); + SubPlanLevel++; estate = CreateExecutorState(); copyDest = (DestReceiver *) CreateRemoteFileDestReceiver(resultId, estate, nodeList, @@ -50,6 +56,7 @@ ExecutePlanIntoDestReceiver(plannedStmt, params, copyDest); + SubPlanLevel--; FreeExecutorState(estate); } } diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index ab6cb2633..4a1054d0a 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -41,6 +41,7 @@ #include "distributed/remote_commands.h" #include "distributed/shared_library_init.h" #include "distributed/statistics_collection.h" +#include "distributed/subplan_execution.h" #include "distributed/task_tracker.h" #include "distributed/transaction_management.h" #include "distributed/transaction_recovery.h" @@ -547,6 +548,17 @@ RegisterCitusConfigVariables(void) GUC_UNIT_KB, NULL, NULL, NULL); + DefineCustomIntVariable( + "citus.max_intermediate_result_size", + gettext_noop("Sets the maximum size of the intermediate results in KB for " + "CTEs and complex subqueries."), + NULL, + &MaxIntermediateResult, + 1048576, -1, MAX_KILOBYTES, + PGC_USERSET, + GUC_UNIT_KB, + NULL, NULL, NULL); + DefineCustomIntVariable( + "citus.max_worker_nodes_tracked", gettext_noop("Sets the maximum number of worker nodes that are tracked."), diff --git a/src/backend/distributed/transaction/transaction_management.c b/src/backend/distributed/transaction/transaction_management.c index c73d4b603..f666da70c 100644 --- a/src/backend/distributed/transaction/transaction_management.c +++ b/src/backend/distributed/transaction/transaction_management.c @@ -26,6 +26,7 @@ #include "distributed/multi_shard_transaction.h" #include "distributed/transaction_management.h" #include "distributed/placement_connection.h" +#include "distributed/subplan_execution.h" #include "utils/hsearch.h" #include "utils/guc.h" #include "utils/memutils.h" @@ -213,6 +214,16 @@ CoordinatedTransactionCallback(XactEvent event, void *arg) XactModificationLevel = XACT_MODIFICATION_NONE; dlist_init(&InProgressTransactions); CoordinatedTransactionUses2PC = false; + + /* + * We should reset SubPlanLevel when a transaction is aborted; + * otherwise this variable would stay positive if the transaction + * is aborted in the middle of a CTE/complex subquery execution, + * which would cause subsequent queries to error out whenever the + * copied size is greater than + * citus.max_intermediate_result_size. + */ + SubPlanLevel = 0; UnSetDistributedTransactionId(); break; } diff --git a/src/backend/distributed/utils/multi_resowner.c b/src/backend/distributed/utils/multi_resowner.c index 0e0c1ccaf..0e674bd46 100644 --- a/src/backend/distributed/utils/multi_resowner.c +++ b/src/backend/distributed/utils/multi_resowner.c @@ -20,6 +20,7 @@ #include "utils/memutils.h" #include "utils/resowner_private.h" #include "distributed/multi_resowner.h" +#include "distributed/subplan_execution.h" typedef struct JobDirectoryEntry diff --git a/src/backend/distributed/worker/worker_data_fetch_protocol.c b/src/backend/distributed/worker/worker_data_fetch_protocol.c index 69641bfa4..fdb82c56c 100644 --- a/src/backend/distributed/worker/worker_data_fetch_protocol.c +++ b/src/backend/distributed/worker/worker_data_fetch_protocol.c @@ -343,7 +343,7 @@ ReceiveRegularFile(const char *nodeName, uint32 nodePort, 
const char *nodeUser, /* loop until we receive and append all the data from remote node */ while (!copyDone) { - CopyStatus copyStatus = MultiClientCopyData(connectionId, fileDescriptor); + CopyStatus copyStatus = MultiClientCopyData(connectionId, fileDescriptor, NULL); if (copyStatus == CLIENT_COPY_DONE) { copyDone = true; diff --git a/src/include/distributed/multi_client_executor.h b/src/include/distributed/multi_client_executor.h index 474e787a5..70a29243e 100644 --- a/src/include/distributed/multi_client_executor.h +++ b/src/include/distributed/multi_client_executor.h @@ -117,7 +117,8 @@ extern bool MultiClientSendQuery(int32 connectionId, const char *query); extern bool MultiClientCancel(int32 connectionId); extern ResultStatus MultiClientResultStatus(int32 connectionId); extern QueryStatus MultiClientQueryStatus(int32 connectionId); -extern CopyStatus MultiClientCopyData(int32 connectionId, int32 fileDescriptor); +extern CopyStatus MultiClientCopyData(int32 connectionId, int32 fileDescriptor, + uint64 *returnBytesReceived); extern bool MultiClientQueryResult(int32 connectionId, void **queryResult, int *rowCount, int *columnCount); extern BatchQueryStatus MultiClientBatchResult(int32 connectionId, void **queryResult, diff --git a/src/include/distributed/multi_server_executor.h b/src/include/distributed/multi_server_executor.h index a22176d95..6f2ef0c28 100644 --- a/src/include/distributed/multi_server_executor.h +++ b/src/include/distributed/multi_server_executor.h @@ -110,6 +110,19 @@ typedef enum } ConnectAction; +/* + * DistributedExecutionStats holds execution-related statistics. + * + * totalIntermediateResultSize is a counter that tracks the total size + * of the intermediate results of complex subqueries and CTEs, + * so that we can enforce a limit on that size. + */ +typedef struct DistributedExecutionStats +{ + uint64 totalIntermediateResultSize; +} DistributedExecutionStats; + + /* + * TaskExecution holds state that relates to a task's execution. 
In the case of * the real-time executor, this struct encapsulates all information necessary to @@ -203,7 +216,9 @@ extern void MultiTaskTrackerExecute(Job *job); extern MultiExecutorType JobExecutorType(DistributedPlan *distributedPlan); extern void RemoveJobDirectory(uint64 jobId); extern TaskExecution * InitTaskExecution(Task *task, TaskExecStatus initialStatus); +extern bool CheckIfSizeLimitIsExceeded(DistributedExecutionStats *executionStats); extern void CleanupTaskExecution(TaskExecution *taskExecution); +extern void ErrorSizeLimitIsExceeded(void); extern bool TaskExecutionFailed(TaskExecution *taskExecution); extern void AdjustStateForFailure(TaskExecution *taskExecution); extern int MaxMasterConnectionCount(void); diff --git a/src/include/distributed/subplan_execution.h b/src/include/distributed/subplan_execution.h index 5f5b45114..33c2bda7b 100644 --- a/src/include/distributed/subplan_execution.h +++ b/src/include/distributed/subplan_execution.h @@ -14,6 +14,8 @@ #include "distributed/multi_physical_planner.h" +extern int MaxIntermediateResult; +extern int SubPlanLevel; extern void ExecuteSubPlans(DistributedPlan *distributedPlan); diff --git a/src/test/regress/expected/limit_intermediate_size.out b/src/test/regress/expected/limit_intermediate_size.out new file mode 100644 index 000000000..044bc6c03 --- /dev/null +++ b/src/test/regress/expected/limit_intermediate_size.out @@ -0,0 +1,304 @@ +SET citus.enable_repartition_joins to ON; +SET citus.max_intermediate_result_size TO 3; +-- should fail because the copy size is ~4kB for each cte +WITH cte AS +( + SELECT * FROM users_table +), +cte2 AS ( + SELECT * FROM events_table +) +SELECT cte.user_id, cte.value_2 FROM cte,cte2 ORDER BY 1,2 LIMIT 10; +ERROR: the intermediate result size exceeds citus.max_intermediate_result_size (currently 3 kB) +DETAIL: Citus restricts the size of intermediate results of complex subqueries and CTEs to avoid accidentally pulling large result sets into one place. +HINT: To run the current query, set citus.max_intermediate_result_size to a higher value or -1 to disable. +SET citus.max_intermediate_result_size TO 9; +-- regular task-tracker CTE should fail +WITH cte AS +( + SELECT + users_table.user_id, users_table.value_1, users_table.value_2 + FROM + users_table + join + events_table + on + (users_table.value_3=events_table.value_3) +), +cte2 AS ( + SELECT * FROM events_table +) +SELECT + cte.user_id, cte2.value_2 +FROM + cte JOIN cte2 ON (cte.value_1 = cte2.event_type) +ORDER BY + 1,2 +LIMIT 10; +ERROR: the intermediate result size exceeds citus.max_intermediate_result_size (currently 9 kB) +DETAIL: Citus restricts the size of intermediate results of complex subqueries and CTEs to avoid accidentally pulling large result sets into one place. +HINT: To run the current query, set citus.max_intermediate_result_size to a higher value or -1 to disable. 
+-- router queries should be able to get limited too +SET citus.max_intermediate_result_size TO 3; +-- this should pass, since we fetch small portions in each subplan +with cte as (select * from users_table where user_id=1), +cte2 as (select * from users_table where user_id=2), +cte3 as (select * from users_table where user_id=3), +cte4 as (select * from users_table where user_id=4), +cte5 as (select * from users_table where user_id=5) +SELECT * FROM ( +(select * from cte) +UNION +(select * from cte2) +UNION +(select * from cte3) +UNION +(select * from cte4) +UNION +(select * from cte5) +)a ORDER BY 1,2,3,4,5 LIMIT 10; + user_id | time | value_1 | value_2 | value_3 | value_4 +---------+---------------------------------+---------+---------+---------+--------- + 1 | Wed Nov 22 22:51:43.132261 2017 | 4 | 0 | 3 | + 1 | Thu Nov 23 03:32:50.803031 2017 | 3 | 2 | 1 | + 1 | Thu Nov 23 09:26:42.145043 2017 | 1 | 3 | 3 | + 1 | Thu Nov 23 11:11:24.40789 2017 | 3 | 4 | 0 | + 1 | Thu Nov 23 11:44:57.515981 2017 | 4 | 3 | 4 | + 1 | Thu Nov 23 17:23:03.441394 2017 | 5 | 4 | 3 | + 1 | Thu Nov 23 17:30:34.635085 2017 | 3 | 4 | 4 | + 2 | Wed Nov 22 18:19:49.944985 2017 | 3 | 5 | 1 | + 2 | Thu Nov 23 00:19:14.138058 2017 | 3 | 4 | 0 | + 2 | Thu Nov 23 01:04:26.198826 2017 | 4 | 3 | 4 | +(10 rows) + +-- if we fetch the same amount of data at once, it should fail +WITH cte AS (SELECT * FROM users_table WHERE user_id IN (1,2,3,4,5)) +SELECT * FROM cte ORDER BY 1,2,3,4,5 LIMIT 10; +ERROR: the intermediate result size exceeds citus.max_intermediate_result_size (currently 3 kB) +DETAIL: Citus restricts the size of intermediate results of complex subqueries and CTEs to avoid accidentally pulling large result sets into one place. +HINT: To run the current query, set citus.max_intermediate_result_size to a higher value or -1 to disable. +SET citus.max_intermediate_result_size TO 0; +-- this should fail +WITH cte AS (SELECT * FROM users_table WHERE user_id=1), +cte2 AS (SELECT * FROM users_table WHERE user_id=2), +cte3 AS (SELECT * FROM users_table WHERE user_id=3), +cte4 AS (SELECT * FROM users_table WHERE user_id=4), +cte5 AS (SELECT * FROM users_table WHERE user_id=5) +SELECT * FROM ( +(SELECT * FROM cte) +UNION +(SELECT * FROM cte2) +UNION +(SELECT * FROM cte3) +UNION +(SELECT * FROM cte4) +UNION +(SELECT * FROM cte5) +)a ORDER BY 1,2,3,4,5 LIMIT 10; +ERROR: the intermediate result size exceeds citus.max_intermediate_result_size (currently 0 kB) +DETAIL: Citus restricts the size of intermediate results of complex subqueries and CTEs to avoid accidentally pulling large result sets into one place. +HINT: To run the current query, set citus.max_intermediate_result_size to a higher value or -1 to disable. +-- this should fail since the cte-subplan exceeds the limit even if +-- cte2 and cte3 do not +SET citus.max_intermediate_result_size TO 4; +WITH cte AS ( + WITH cte2 AS ( + SELECT * FROM users_table + ), + cte3 AS ( + SELECT * FROM events_table + ) + SELECT * FROM cte2, cte3 WHERE cte2.user_id = cte3.user_id AND cte2.user_id = 1 +) +SELECT * FROM cte; +ERROR: the intermediate result size exceeds citus.max_intermediate_result_size (currently 4 kB) +DETAIL: Citus restricts the size of intermediate results of complex subqueries and CTEs to avoid accidentally pulling large result sets into one place. +HINT: To run the current query, set citus.max_intermediate_result_size to a higher value or -1 to disable. 
+SET citus.max_intermediate_result_size TO 3; +-- this should fail since the cte-subplan exceeds the limit even if +-- cte2 and cte3 do not +WITH cte AS ( + WITH cte2 AS ( + SELECT * FROM users_table WHERE user_id IN (3,4,5,6) + ), + cte3 AS ( + SELECT * FROM events_table WHERE event_type = 1 + ) + SELECT * FROM cte2, cte3 WHERE cte2.value_1 IN (SELECT value_2 FROM cte3) +) +SELECT * FROM cte; +ERROR: the intermediate result size exceeds citus.max_intermediate_result_size (currently 3 kB) +DETAIL: Citus restricts the size of intermediate results of complex subqueries and CTEs to avoid accidentally pulling large result sets into one place. +HINT: To run the current query, set citus.max_intermediate_result_size to a higher value or -1 to disable. +-- this will fail in real_time_executor +WITH cte AS ( + WITH cte2 AS ( + SELECT * FROM users_table WHERE user_id IN (1, 2) + ), + cte3 AS ( + SELECT * FROM users_table WHERE user_id = 3 + ) + SELECT * FROM cte2 UNION (SELECT * FROM cte3) +), +cte4 AS ( + SELECT * FROM events_table +) +SELECT * FROM cte UNION ALL +SELECT * FROM cte4 ORDER BY 1,2,3,4,5 LIMIT 5; +ERROR: the intermediate result size exceeds citus.max_intermediate_result_size (currently 3 kB) +DETAIL: Citus restricts the size of intermediate results of complex subqueries and CTEs to avoid accidentally pulling large result sets into one place. +HINT: To run the current query, set citus.max_intermediate_result_size to a higher value or -1 to disable. +SET citus.max_intermediate_result_size TO 1; +-- this will fail in router_executor +WITH cte AS ( + WITH cte2 AS ( + SELECT * FROM users_table WHERE user_id IN (1, 2) + ), + cte3 AS ( + SELECT * FROM users_table WHERE user_id = 3 + ) + SELECT * FROM cte2 UNION (SELECT * FROM cte3) +), +cte4 AS ( + SELECT * FROM events_table +) +SELECT * FROM cte UNION ALL +SELECT * FROM cte4 ORDER BY 1,2,3,4,5 LIMIT 5; +ERROR: the intermediate result size exceeds citus.max_intermediate_result_size (currently 1 kB) +DETAIL: Citus restricts the size of intermediate results of complex subqueries and CTEs to avoid accidentally pulling large result sets into one place. +HINT: To run the current query, set citus.max_intermediate_result_size to a higher value or -1 to disable. 
+-- All queries below should pass since -1 disables the limit +SET citus.max_intermediate_result_size TO -1; +-- real_time_executor + router_executor + real_time_executor will pass +WITH cte AS ( + WITH cte2 AS ( + SELECT * FROM users_table WHERE user_id IN (1, 2) + ), + cte3 AS ( + SELECT * FROM users_table WHERE user_id = 3 + ) + SELECT * FROM cte2 UNION (SELECT * FROM cte3) +), +cte4 AS ( + SELECT * FROM events_table +) +SELECT * FROM cte UNION ALL +SELECT * FROM cte4 ORDER BY 1,2,3,4,5 LIMIT 5; + user_id | time | value_1 | value_2 | value_3 | value_4 +---------+---------------------------------+---------+---------+---------+--------- + 1 | Wed Nov 22 18:49:42.327403 2017 | 3 | 2 | 1 | + 1 | Wed Nov 22 19:03:01.772353 2017 | 4 | 1 | 2 | + 1 | Wed Nov 22 19:07:03.846437 2017 | 1 | 2 | 5 | + 1 | Wed Nov 22 20:56:21.122638 2017 | 2 | 4 | 4 | + 1 | Wed Nov 22 21:06:57.457147 2017 | 4 | 3 | 2 | +(5 rows) + +-- regular task-tracker CTE, should work since -1 disables the limit +WITH cte AS +( + SELECT + users_table.user_id, users_table.value_1, users_table.value_2 + FROM + users_table + join + events_table + on + (users_table.value_2=events_table.value_2) +), +cte2 AS ( + SELECT * FROM events_table +) +SELECT + cte.user_id, cte2.value_2 +FROM + cte JOIN cte2 ON (cte.value_1 = cte2.event_type) +ORDER BY + 1,2 +LIMIT 10; + user_id | value_2 +---------+--------- + 1 | 0 + 1 | 0 + 1 | 0 + 1 | 0 + 1 | 0 + 1 | 0 + 1 | 0 + 1 | 0 + 1 | 0 + 1 | 0 +(10 rows) + +-- regular real-time CTE fetches ~4 kB of data in each subplan +WITH cte AS +( + SELECT * FROM users_table +), +cte2 AS ( + SELECT * FROM events_table +) +SELECT cte.user_id, cte.value_2 FROM cte,cte2 ORDER BY 1,2 LIMIT 10; + user_id | value_2 +---------+--------- + 1 | 0 + 1 | 0 + 1 | 0 + 1 | 0 + 1 | 0 + 1 | 0 + 1 | 0 + 1 | 0 + 1 | 0 + 1 | 0 +(10 rows) + +-- regular real-time query fetches ~4kB +WITH cte AS +( + SELECT * FROM users_table WHERE user_id IN (1,2,3,4,5) +) +SELECT * FROM cte ORDER BY 1,2,3,4,5 LIMIT 10; + user_id | time | value_1 | value_2 | value_3 | value_4 +---------+---------------------------------+---------+---------+---------+--------- + 1 | Wed Nov 22 22:51:43.132261 2017 | 4 | 0 | 3 | + 1 | Thu Nov 23 03:32:50.803031 2017 | 3 | 2 | 1 | + 1 | Thu Nov 23 09:26:42.145043 2017 | 1 | 3 | 3 | + 1 | Thu Nov 23 11:11:24.40789 2017 | 3 | 4 | 0 | + 1 | Thu Nov 23 11:44:57.515981 2017 | 4 | 3 | 4 | + 1 | Thu Nov 23 17:23:03.441394 2017 | 5 | 4 | 3 | + 1 | Thu Nov 23 17:30:34.635085 2017 | 3 | 4 | 4 | + 2 | Wed Nov 22 18:19:49.944985 2017 | 3 | 5 | 1 | + 2 | Thu Nov 23 00:19:14.138058 2017 | 3 | 4 | 0 | + 2 | Thu Nov 23 01:04:26.198826 2017 | 4 | 3 | 4 | +(10 rows) + +-- nested CTEs +WITH cte AS ( + WITH cte2 AS ( + SELECT * FROM users_table + ), + cte3 AS ( + SELECT * FROM events_table + ) + SELECT + cte2.user_id, cte2.time, cte3.event_type, cte3.value_2, cte3.value_3 + FROM + cte2, cte3 + WHERE + cte2.user_id = cte3.user_id AND cte2.user_id = 1 +) +SELECT * FROM cte ORDER BY 1,2,3,4,5 LIMIT 10; + user_id | time | event_type | value_2 | value_3 +---------+---------------------------------+------------+---------+--------- + 1 | Wed Nov 22 22:51:43.132261 2017 | 0 | 2 | 0 + 1 | Wed Nov 22 22:51:43.132261 2017 | 0 | 5 | 1 + 1 | Wed Nov 22 22:51:43.132261 2017 | 1 | 1 | 1 + 1 | Wed Nov 22 22:51:43.132261 2017 | 1 | 2 | 5 + 1 | Wed Nov 22 22:51:43.132261 2017 | 2 | 4 | 3 + 1 | Wed Nov 22 22:51:43.132261 2017 | 2 | 4 | 4 + 1 | Wed Nov 22 22:51:43.132261 2017 | 3 | 1 | 1 + 1 | Wed Nov 22 22:51:43.132261 2017 | 3 | 2 | 1 + 1 | Wed Nov 22 
22:51:43.132261 2017 | 3 | 4 | 1 + 1 | Wed Nov 22 22:51:43.132261 2017 | 4 | 1 | 2 +(10 rows) + diff --git a/src/test/regress/multi_schedule b/src/test/regress/multi_schedule index e27935aa3..7c31a3981 100644 --- a/src/test/regress/multi_schedule +++ b/src/test/regress/multi_schedule @@ -49,7 +49,7 @@ test: subquery_prepared_statements # ---------- # Miscellaneous tests to check our query planning behavior # ---------- -test: multi_deparse_shard_query multi_distributed_transaction_id multi_real_time_transaction intermediate_results +test: multi_deparse_shard_query multi_distributed_transaction_id multi_real_time_transaction intermediate_results limit_intermediate_size test: multi_explain test: multi_basic_queries multi_complex_expressions multi_subquery multi_subquery_complex_queries multi_subquery_behavioral_analytics test: multi_subquery_complex_reference_clause multi_subquery_window_functions multi_view multi_sql_function multi_prepare_sql diff --git a/src/test/regress/sql/limit_intermediate_size.sql b/src/test/regress/sql/limit_intermediate_size.sql new file mode 100644 index 000000000..eef2f4301 --- /dev/null +++ b/src/test/regress/sql/limit_intermediate_size.sql @@ -0,0 +1,229 @@ +SET citus.enable_repartition_joins to ON; + + +SET citus.max_intermediate_result_size TO 3; +-- should fail because the copy size is ~4kB for each cte +WITH cte AS +( + SELECT * FROM users_table +), +cte2 AS ( + SELECT * FROM events_table +) +SELECT cte.user_id, cte.value_2 FROM cte,cte2 ORDER BY 1,2 LIMIT 10; + + +SET citus.max_intermediate_result_size TO 9; +-- regular task-tracker CTE should fail +WITH cte AS +( + SELECT + users_table.user_id, users_table.value_1, users_table.value_2 + FROM + users_table + join + events_table + on + (users_table.value_3=events_table.value_3) +), +cte2 AS ( + SELECT * FROM events_table +) +SELECT + cte.user_id, cte2.value_2 +FROM + cte JOIN cte2 ON (cte.value_1 = cte2.event_type) +ORDER BY + 1,2 +LIMIT 10; + + +-- router queries should be able to get limited too +SET citus.max_intermediate_result_size TO 3; +-- this should pass, since we fetch small portions in each subplan +with cte as (select * from users_table where user_id=1), +cte2 as (select * from users_table where user_id=2), +cte3 as (select * from users_table where user_id=3), +cte4 as (select * from users_table where user_id=4), +cte5 as (select * from users_table where user_id=5) +SELECT * FROM ( +(select * from cte) +UNION +(select * from cte2) +UNION +(select * from cte3) +UNION +(select * from cte4) +UNION +(select * from cte5) +)a ORDER BY 1,2,3,4,5 LIMIT 10; + + +-- if we fetch the same amount of data at once, it should fail +WITH cte AS (SELECT * FROM users_table WHERE user_id IN (1,2,3,4,5)) +SELECT * FROM cte ORDER BY 1,2,3,4,5 LIMIT 10; + + +SET citus.max_intermediate_result_size TO 0; +-- this should fail +WITH cte AS (SELECT * FROM users_table WHERE user_id=1), +cte2 AS (SELECT * FROM users_table WHERE user_id=2), +cte3 AS (SELECT * FROM users_table WHERE user_id=3), +cte4 AS (SELECT * FROM users_table WHERE user_id=4), +cte5 AS (SELECT * FROM users_table WHERE user_id=5) +SELECT * FROM ( +(SELECT * FROM cte) +UNION +(SELECT * FROM cte2) +UNION +(SELECT * FROM cte3) +UNION +(SELECT * FROM cte4) +UNION +(SELECT * FROM cte5) +)a ORDER BY 1,2,3,4,5 LIMIT 10; + + +-- this should fail since the cte-subplan exceeds the limit even if +-- cte2 and cte3 do not +SET citus.max_intermediate_result_size TO 4; +WITH cte AS ( + WITH cte2 AS ( + SELECT * FROM users_table + ), + cte3 AS ( + SELECT * FROM 
events_table + ) + SELECT * FROM cte2, cte3 WHERE cte2.user_id = cte3.user_id AND cte2.user_id = 1 +) +SELECT * FROM cte; + + +SET citus.max_intermediate_result_size TO 3; +-- this should fail since the cte-subplan exceeds the limit even if +-- cte2 and cte3 do not +WITH cte AS ( + WITH cte2 AS ( + SELECT * FROM users_table WHERE user_id IN (3,4,5,6) + ), + cte3 AS ( + SELECT * FROM events_table WHERE event_type = 1 + ) + SELECT * FROM cte2, cte3 WHERE cte2.value_1 IN (SELECT value_2 FROM cte3) +) +SELECT * FROM cte; + + +-- this will fail in real_time_executor +WITH cte AS ( + WITH cte2 AS ( + SELECT * FROM users_table WHERE user_id IN (1, 2) + ), + cte3 AS ( + SELECT * FROM users_table WHERE user_id = 3 + ) + SELECT * FROM cte2 UNION (SELECT * FROM cte3) +), +cte4 AS ( + SELECT * FROM events_table +) +SELECT * FROM cte UNION ALL +SELECT * FROM cte4 ORDER BY 1,2,3,4,5 LIMIT 5; + + +SET citus.max_intermediate_result_size TO 1; +-- this will fail in router_executor +WITH cte AS ( + WITH cte2 AS ( + SELECT * FROM users_table WHERE user_id IN (1, 2) + ), + cte3 AS ( + SELECT * FROM users_table WHERE user_id = 3 + ) + SELECT * FROM cte2 UNION (SELECT * FROM cte3) +), +cte4 AS ( + SELECT * FROM events_table +) +SELECT * FROM cte UNION ALL +SELECT * FROM cte4 ORDER BY 1,2,3,4,5 LIMIT 5; + + +-- All queries below should pass since -1 disables the limit +SET citus.max_intermediate_result_size TO -1; + +-- real_time_executor + router_executor + real_time_executor will pass +WITH cte AS ( + WITH cte2 AS ( + SELECT * FROM users_table WHERE user_id IN (1, 2) + ), + cte3 AS ( + SELECT * FROM users_table WHERE user_id = 3 + ) + SELECT * FROM cte2 UNION (SELECT * FROM cte3) +), +cte4 AS ( + SELECT * FROM events_table +) +SELECT * FROM cte UNION ALL +SELECT * FROM cte4 ORDER BY 1,2,3,4,5 LIMIT 5; + +-- regular task-tracker CTE, should work since -1 disables the limit +WITH cte AS +( + SELECT + users_table.user_id, users_table.value_1, users_table.value_2 + FROM + users_table + join + events_table + on + (users_table.value_2=events_table.value_2) +), +cte2 AS ( + SELECT * FROM events_table +) +SELECT + cte.user_id, cte2.value_2 +FROM + cte JOIN cte2 ON (cte.value_1 = cte2.event_type) +ORDER BY + 1,2 +LIMIT 10; + + +-- regular real-time CTE fetches ~4 kB of data in each subplan +WITH cte AS +( + SELECT * FROM users_table +), +cte2 AS ( + SELECT * FROM events_table +) +SELECT cte.user_id, cte.value_2 FROM cte,cte2 ORDER BY 1,2 LIMIT 10; + + +-- regular real-time query fetches ~4kB +WITH cte AS +( + SELECT * FROM users_table WHERE user_id IN (1,2,3,4,5) +) +SELECT * FROM cte ORDER BY 1,2,3,4,5 LIMIT 10; + + +-- nested CTEs +WITH cte AS ( + WITH cte2 AS ( + SELECT * FROM users_table + ), + cte3 AS ( + SELECT * FROM events_table + ) + SELECT + cte2.user_id, cte2.time, cte3.event_type, cte3.value_2, cte3.value_3 + FROM + cte2, cte3 + WHERE + cte2.user_id = cte3.user_id AND cte2.user_id = 1 +) +SELECT * FROM cte ORDER BY 1,2,3,4,5 LIMIT 10; \ No newline at end of file
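Reviewer note (not part of the patch): a minimal usage sketch of the new GUC, assuming a session on a Citus coordinator and a distributed table named my_dist_table (a placeholder name, not from this patch). The limit is configured in kB and compared against the bytes accumulated per subplan (MaxIntermediateResult * 1024), so a 3 kB limit trips once a single subplan pulls more than 3072 bytes to the coordinator; -1 disables the check, and the default is 1048576 kB (1 GB).

SHOW citus.max_intermediate_result_size;       -- should display the 1 GB default
SET citus.max_intermediate_result_size TO 16;  -- cap each subplan's intermediate result at 16 kB
WITH big_cte AS (SELECT * FROM my_dist_table)
SELECT count(*) FROM big_cte;                  -- expected to error once the CTE's result exceeds 16 kB
SET citus.max_intermediate_result_size TO -1;  -- disable the limit again

Since the GUC is registered as PGC_USERSET, it can also be scoped to a single transaction with SET LOCAL.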