diff --git a/src/backend/distributed/planner/distributed_planner.c b/src/backend/distributed/planner/distributed_planner.c index 8c182f871..6259bf2e4 100644 --- a/src/backend/distributed/planner/distributed_planner.c +++ b/src/backend/distributed/planner/distributed_planner.c @@ -593,9 +593,7 @@ CreateDistributedSelectPlan(uint64 planId, Query *originalQuery, Query *query, DistributedPlan *distributedPlan = NULL; MultiTreeRoot *logicalPlan = NULL; - DeferredErrorMessage *error = NULL; List *subPlanList = NIL; - RecursivePlanningContext context; /* * For select queries we, if router executor is enabled, first try to @@ -647,18 +645,10 @@ CreateDistributedSelectPlan(uint64 planId, Query *originalQuery, Query *query, /* * Plan subqueries and CTEs that cannot be pushed down by recursively - * calling the planner and add the resulting plans to subPlanList. + * calling the planner and return the resulting plans to subPlanList. */ - context.level = 0; - context.planId = planId; - context.subPlanList = NIL; - context.plannerRestrictionContext = plannerRestrictionContext; - - error = RecursivelyPlanSubqueriesAndCTEs(originalQuery, &context); - if (error != NULL) - { - RaiseDeferredError(error, ERROR); - } + subPlanList = GenerateSubplansForSubqueriesAndCTEs(planId, originalQuery, + plannerRestrictionContext); /* * If subqueries were recursively planned then we need to replan the query @@ -668,7 +658,6 @@ CreateDistributedSelectPlan(uint64 planId, Query *originalQuery, Query *query, * with an original query. In that case, we would only have to filter the * planner restriction context. */ - subPlanList = context.subPlanList; if (list_length(subPlanList) > 0) { Query *newQuery = copyObject(originalQuery); diff --git a/src/backend/distributed/planner/multi_explain.c b/src/backend/distributed/planner/multi_explain.c index cb7906ae9..052cc63d5 100644 --- a/src/backend/distributed/planner/multi_explain.c +++ b/src/backend/distributed/planner/multi_explain.c @@ -33,6 +33,7 @@ #include "distributed/distributed_planner.h" #include "distributed/multi_server_executor.h" #include "distributed/remote_commands.h" +#include "distributed/recursive_planning.h" #include "distributed/placement_connection.h" #include "distributed/worker_protocol.h" #include "lib/stringinfo.h" diff --git a/src/backend/distributed/planner/multi_logical_planner.c b/src/backend/distributed/planner/multi_logical_planner.c index 938a52064..1f6cc9eb5 100644 --- a/src/backend/distributed/planner/multi_logical_planner.c +++ b/src/backend/distributed/planner/multi_logical_planner.c @@ -80,7 +80,6 @@ typedef MultiNode *(*RuleApplyFunction) (MultiNode *leftNode, MultiNode *rightNo static RuleApplyFunction RuleApplyFunctionArray[JOIN_RULE_LAST] = { 0 }; /* join rules */ /* Local functions forward declarations */ -static bool SingleRelationRepartitionSubquery(Query *queryTree); static DeferredErrorMessage * DeferErrorIfUnsupportedSubqueryPushdown(Query * originalQuery, PlannerRestrictionContext @@ -98,9 +97,6 @@ static bool RangeTableArrayContainsAnyRTEIdentities(RangeTblEntry **rangeTableEn queryRteIdentities); static Relids QueryRteIdentities(Query *queryTree); static DeferredErrorMessage * DeferErrorIfFromClauseRecurs(Query *queryTree); -static DeferredErrorMessage * DeferErrorIfCannotPushdownSubquery(Query *subqueryTree, - bool - outerMostQueryHasLimit); static DeferredErrorMessage * DeferErrorIfUnsupportedUnionQuery(Query *queryTree, bool outerMostQueryHasLimit); @@ -108,7 +104,6 @@ static bool ExtractSetOperationStatmentWalker(Node *node, List **setOperationLis static DeferredErrorMessage * DeferErrorIfUnsupportedTableCombination(Query *queryTree); static bool WindowPartitionOnDistributionColumn(Query *query); static bool AllTargetExpressionsAreColumnReferences(List *targetEntryList); -static bool FindNodeCheckInRangeTableList(List *rtable, bool (*check)(Node *)); static bool IsDistributedTableRTE(Node *node); static FieldSelect * CompositeFieldRecursive(Expr *expression, Query *query); static bool FullCompositeFieldList(List *compositeFieldList); @@ -472,7 +467,7 @@ SubqueryMultiNodeTree(Query *originalQuery, Query *queryTree, * to ensure that Citus supports the subquery. Also, this function is designed to run * on the original query. */ -static bool +bool SingleRelationRepartitionSubquery(Query *queryTree) { List *rangeTableIndexList = NULL; @@ -930,7 +925,7 @@ DeferErrorIfFromClauseRecurs(Query *queryTree) * limit, we let this query to run, but results could be wrong depending on the * features of underlying tables. */ -static DeferredErrorMessage * +DeferredErrorMessage * DeferErrorIfCannotPushdownSubquery(Query *subqueryTree, bool outerMostQueryHasLimit) { bool preconditionsSatisfied = true; @@ -1517,7 +1512,7 @@ AllTargetExpressionsAreColumnReferences(List *targetEntryList) * FindNodeCheckInRangeTableList relies on FindNodeCheck() but only * considers the range table entries. */ -static bool +bool FindNodeCheckInRangeTableList(List *rtable, bool (*check)(Node *)) { return range_table_walker(rtable, FindNodeCheck, check, QTW_EXAMINE_RTES); @@ -2259,8 +2254,12 @@ DeferErrorIfQueryNotSupported(Query *queryTree) if (queryTree->hasWindowFuncs) { preconditionsSatisfied = false; - errorMessage = "could not run distributed query with window functions"; - errorHint = filterHint; + errorMessage = "could not run distributed query because the window " + "function that is used cannot be pushed down"; + errorHint = "Window functions are supported in two ways. Either add " + "an equality filter on the distributed tables' partition " + "column or use the window functions inside a subquery with " + "a PARTITION BY clause containing the distribution column"; } if (queryTree->setOperations) @@ -2642,10 +2641,12 @@ HasComplexRangeTableType(Query *queryTree) /* * Check if the range table in the join tree is a simple relation or a - * subquery. + * subquery or a function. Note that RTE_FUNCTIONs are handled via (sub)query + * pushdown. */ if (rangeTableEntry->rtekind != RTE_RELATION && - rangeTableEntry->rtekind != RTE_SUBQUERY) + rangeTableEntry->rtekind != RTE_SUBQUERY && + rangeTableEntry->rtekind != RTE_FUNCTION) { hasComplexRangeTableType = true; } diff --git a/src/backend/distributed/planner/multi_physical_planner.c b/src/backend/distributed/planner/multi_physical_planner.c index 9f334545d..2ba9dceb5 100644 --- a/src/backend/distributed/planner/multi_physical_planner.c +++ b/src/backend/distributed/planner/multi_physical_planner.c @@ -2093,7 +2093,19 @@ SubquerySqlTaskList(Job *job, PlannerRestrictionContext *plannerRestrictionConte */ if (targetCacheEntry == NULL) { - RangeTblEntry *rangeTableEntry = (RangeTblEntry *) linitial(rangeTableList); + RangeTblEntry *rangeTableEntry = NULL; + + if (list_length(rangeTableList) == 0) + { + /* + * User disabled the router planner and forced planner go through + * subquery pushdown, but we cannot continue anymore. + */ + ereport(ERROR, (errmsg("cannot handle complex subqueries when the " + "router executor is disabled"))); + } + + rangeTableEntry = (RangeTblEntry *) linitial(rangeTableList); relationId = rangeTableEntry->relid; targetCacheEntry = DistributedTableCacheEntry(relationId); } diff --git a/src/backend/distributed/planner/recursive_planning.c b/src/backend/distributed/planner/recursive_planning.c index 03182412f..cd2871ec4 100644 --- a/src/backend/distributed/planner/recursive_planning.c +++ b/src/backend/distributed/planner/recursive_planning.c @@ -52,6 +52,7 @@ #include "postgres.h" #include "catalog/pg_type.h" +#include "catalog/pg_class.h" #include "distributed/citus_nodes.h" #include "distributed/citus_ruleutils.h" #include "distributed/distributed_planner.h" @@ -62,6 +63,7 @@ #include "distributed/multi_router_planner.h" #include "distributed/multi_physical_planner.h" #include "distributed/recursive_planning.h" +#include "distributed/multi_server_executor.h" #include "distributed/relation_restriction_equivalence.h" #include "lib/stringinfo.h" #include "optimizer/planner.h" @@ -73,6 +75,20 @@ #include "utils/guc.h" +/* + * RecursivePlanningContext is used to recursively plan subqueries + * and CTEs, pull results to the coordinator, and push it back into + * the workers. + */ +typedef struct RecursivePlanningContext +{ + int level; + uint64 planId; + List *subPlanList; + PlannerRestrictionContext *plannerRestrictionContext; +} RecursivePlanningContext; + + /* * CteReferenceWalkerContext is used to collect CTE references in * CteReferenceListWalker. @@ -94,8 +110,16 @@ typedef struct VarLevelsUpWalkerContext /* local function forward declarations */ +static DeferredErrorMessage * RecursivelyPlanSubqueriesAndCTEs(Query *query, + RecursivePlanningContext * + context); static DeferredErrorMessage * RecursivelyPlanCTEs(Query *query, RecursivePlanningContext *context); +static bool RecursivelyPlanSubqueryWalker(Node *node, RecursivePlanningContext *context); +static bool ShouldRecursivelyPlanSubquery(Query *subquery); +static bool IsLocalTableRTE(Node *node); +static void RecursivelyPlanSubquery(Query *subquery, + RecursivePlanningContext *planningContext); static DistributedSubPlan * CreateDistributedSubPlan(uint32 subPlanId, Query *subPlanQuery); static bool CteReferenceListWalker(Node *node, CteReferenceWalkerContext *context); @@ -105,6 +129,39 @@ static bool ContainsReferencesToOuterQueryWalker(Node *node, static Query * BuildSubPlanResultQuery(Query *subquery, uint64 planId, uint32 subPlanId); +/* + * GenerateSubplansForSubqueriesAndCTEs is a wrapper around RecursivelyPlanSubqueriesAndCTEs. + * The function returns the subplans if necessary. For the details of when/how subplans are + * generated, see RecursivelyPlanSubqueriesAndCTEs(). + * + * Note that the input originalQuery query is modified if any subplans are generated. + */ +List * +GenerateSubplansForSubqueriesAndCTEs(uint64 planId, Query *originalQuery, + PlannerRestrictionContext *plannerRestrictionContext) +{ + RecursivePlanningContext context; + DeferredErrorMessage *error = NULL; + + /* + * Plan subqueries and CTEs that cannot be pushed down by recursively + * calling the planner and add the resulting plans to subPlanList. + */ + context.level = 0; + context.planId = planId; + context.subPlanList = NIL; + context.plannerRestrictionContext = plannerRestrictionContext; + + error = RecursivelyPlanSubqueriesAndCTEs(originalQuery, &context); + if (error != NULL) + { + RaiseDeferredError(error, ERROR); + } + + return context.subPlanList; +} + + /* * RecursivelyPlanSubqueriesAndCTEs finds subqueries and CTEs that cannot be pushed down to * workers directly and instead plans them by recursively calling the planner and @@ -119,12 +176,12 @@ static Query * BuildSubPlanResultQuery(Query *subquery, uint64 planId, uint32 su * If recursive planning results in an error then the error is returned. Otherwise, the * subplans will be added to subPlanList. */ -DeferredErrorMessage * +static DeferredErrorMessage * RecursivelyPlanSubqueriesAndCTEs(Query *query, RecursivePlanningContext *context) { DeferredErrorMessage *error = NULL; - error = RecursivelyPlanCTEs(query, &context); + error = RecursivelyPlanCTEs(query, context); if (error != NULL) { return error; @@ -224,7 +281,7 @@ RecursivelyPlanCTEs(Query *query, RecursivePlanningContext *planningContext) subPlanId = list_length(planningContext->subPlanList) + 1; - if (log_min_messages >= DEBUG1) + if (log_min_messages <= DEBUG1 || client_min_messages <= DEBUG1) { StringInfo subPlanString = makeStringInfo(); pg_get_query_def(subquery, subPlanString); @@ -294,6 +351,201 @@ RecursivelyPlanCTEs(Query *query, RecursivePlanningContext *planningContext) } +/* + * RecursivelyPlanSubqueryWalker recursively finds all the Query nodes and + * recursively plans if necessary. + */ +static bool +RecursivelyPlanSubqueryWalker(Node *node, RecursivePlanningContext *context) +{ + if (node == NULL) + { + return false; + } + + if (IsA(node, Query)) + { + Query *query = (Query *) node; + DeferredErrorMessage *error = NULL; + + context->level += 1; + + /* + * First, make sure any subqueries and CTEs within this subquery + * are recursively planned if necessary. + */ + error = RecursivelyPlanSubqueriesAndCTEs(query, context); + if (error != NULL) + { + RaiseDeferredError(error, ERROR); + } + context->level -= 1; + + /* + * Recursively plan this subquery if it cannot be pushed down and is + * eligible for recursive planning. + */ + if (ShouldRecursivelyPlanSubquery(query)) + { + RecursivelyPlanSubquery(query, context); + } + + /* we're done, no need to recurse anymore for this query */ + return false; + } + + return expression_tree_walker(node, RecursivelyPlanSubqueryWalker, context); +} + + +/* + * ShouldRecursivelyPlanSubquery decides whether the input subquery should be recursively + * planned or not. + * + * For the details, see the cases in the function. + */ +static bool +ShouldRecursivelyPlanSubquery(Query *subquery) +{ + if (FindNodeCheckInRangeTableList(subquery->rtable, IsLocalTableRTE)) + { + /* + * Postgres can always plan queries that don't require distributed planning. + * Note that we need to check this first, otherwise the calls to the many other + * Citus planner functions would error our due to local relations. + * + * TODO: We could only successfully create distributed plans with local tables + * when the local tables are on the leaf queries and the upper level queries + * do not contain any other local tables. + */ + } + else if (DeferErrorIfCannotPushdownSubquery(subquery, false) == NULL) + { + /* + * Citus can pushdown this subquery, no need to recursively + * plan which is much expensive than pushdown. + */ + return false; + } + else if (TaskExecutorType == MULTI_EXECUTOR_TASK_TRACKER && + SingleRelationRepartitionSubquery(subquery)) + { + /* + * Citus can plan this and execute via repartitioning. Thus, + * no need to recursively plan. + */ + return false; + } + + /* + * Even if we could recursively plan the subquery, we should ensure + * that the subquery doesn't contain any references to the outer + * queries. + */ + if (ContainsReferencesToOuterQuery(subquery)) + { + elog(DEBUG2, "skipping recursive planning for the subquery since it " + "contains references to outer queries"); + + return false; + } + + return true; +} + + +/* + * IsLocalTableRTE gets a node and returns true if the node + * is a range table relation entry that points to a local + * relation (i.e., not a distributed relation). + */ +static bool +IsLocalTableRTE(Node *node) +{ + RangeTblEntry *rangeTableEntry = NULL; + Oid relationId = InvalidOid; + + if (node == NULL) + { + return false; + } + + if (!IsA(node, RangeTblEntry)) + { + return false; + } + + rangeTableEntry = (RangeTblEntry *) node; + if (rangeTableEntry->rtekind != RTE_RELATION) + { + return false; + } + + if (rangeTableEntry->relkind == RELKIND_VIEW) + { + return false; + } + + relationId = rangeTableEntry->relid; + if (IsDistributedTable(relationId)) + { + return false; + } + + /* local table found */ + return true; +} + + +/* + * RecursivelyPlanQuery recursively plans a query, replaces it with a + * result query and returns the subplan. + */ +static void +RecursivelyPlanSubquery(Query *subquery, RecursivePlanningContext *planningContext) +{ + DistributedSubPlan *subPlan = NULL; + uint64 planId = planningContext->planId; + int subPlanId = 0; + + Query *resultQuery = NULL; + Query *debugQuery = NULL; + + /* + * Subquery will go through the standard planner, thus to properly deparse it + * we keep its copy: debugQuery. + */ + if (log_min_messages <= DEBUG1 || client_min_messages <= DEBUG1) + { + debugQuery = copyObject(subquery); + } + + /* + * Create the subplan and append it to the list in the planning context. + */ + subPlanId = list_length(planningContext->subPlanList) + 1; + + subPlan = CreateDistributedSubPlan(subPlanId, subquery); + planningContext->subPlanList = lappend(planningContext->subPlanList, subPlan); + + resultQuery = BuildSubPlanResultQuery(subquery, planId, subPlanId); + + if (log_min_messages <= DEBUG1 || client_min_messages <= DEBUG1) + { + StringInfo subqueryString = makeStringInfo(); + + pg_get_query_def(debugQuery, subqueryString); + + ereport(DEBUG1, (errmsg("generating subplan " UINT64_FORMAT "_%u for " + "subquery %s", + planId, subPlanId, subqueryString->data))); + } + + /* finally update the input subquery to point the result query */ + memcpy(subquery, resultQuery, sizeof(Query)); +} + + /* * CreateDistributedSubPlan creates a distributed subplan by recursively calling * the planner from the top, which may either generate a local plan or another diff --git a/src/include/distributed/multi_logical_planner.h b/src/include/distributed/multi_logical_planner.h index 15daed0ef..b1e1d55da 100644 --- a/src/include/distributed/multi_logical_planner.h +++ b/src/include/distributed/multi_logical_planner.h @@ -187,11 +187,16 @@ extern bool SubqueryPushdown; extern MultiTreeRoot * MultiLogicalPlanCreate(Query *originalQuery, Query *queryTree, PlannerRestrictionContext * plannerRestrictionContext); +extern bool SingleRelationRepartitionSubquery(Query *queryTree); +extern DeferredErrorMessage * DeferErrorIfCannotPushdownSubquery(Query *subqueryTree, + bool + outerMostQueryHasLimit); extern PlannerRestrictionContext * FilterPlannerRestrictionForQuery( PlannerRestrictionContext *plannerRestrictionContext, Query *query); extern bool SafeToPushdownWindowFunction(Query *query, StringInfo *errorDetail); extern bool TargetListOnPartitionColumn(Query *query, List *targetEntryList); +extern bool FindNodeCheckInRangeTableList(List *rtable, bool (*check)(Node *)); extern bool ContainsReadIntermediateResultFunction(Node *node); extern MultiNode * ParentNode(MultiNode *multiNode); extern MultiNode * ChildNode(MultiUnaryNode *multiNode); diff --git a/src/include/distributed/recursive_planning.h b/src/include/distributed/recursive_planning.h index 1f6ca5112..8e2d6b2e3 100644 --- a/src/include/distributed/recursive_planning.h +++ b/src/include/distributed/recursive_planning.h @@ -18,23 +18,9 @@ #include "nodes/relation.h" -/* - * RecursivePlanningContext is used to recursively plan subqueries - * and CTEs, pull results to the coordinator, and push it back into - * the workers. - */ -typedef struct RecursivePlanningContext -{ - int level; - uint64 planId; - List *subPlanList; - PlannerRestrictionContext *plannerRestrictionContext; -} RecursivePlanningContext; - - -extern DeferredErrorMessage * RecursivelyPlanSubqueriesAndCTEs(Query *query, - RecursivePlanningContext * - context); +extern List * GenerateSubplansForSubqueriesAndCTEs(uint64 planId, Query *originalQuery, + PlannerRestrictionContext * + plannerRestrictionContext); extern char * GenerateResultId(uint64 planId, uint32 subPlanId); diff --git a/src/test/regress/expected/multi_insert_select.out b/src/test/regress/expected/multi_insert_select.out index ab5c97c65..b8c1553dc 100644 --- a/src/test/regress/expected/multi_insert_select.out +++ b/src/test/regress/expected/multi_insert_select.out @@ -726,8 +726,8 @@ INSERT INTO (SELECT user_id FROM raw_events_first); DEBUG: Set operations are not allowed in distributed INSERT ... SELECT queries DEBUG: Collecting INSERT ... SELECT results on coordinator -ERROR: cannot pushdown the subquery since all relations are not joined using distribution keys -DETAIL: Each relation should be joined with at least one another relation using distribution keys and equality operator. +ERROR: could not run distributed query with UNION, INTERSECT, or EXCEPT +HINT: Consider using an equality filter on the distributed table's partition column. -- If the query is router plannable then it is executed via the coordinator INSERT INTO raw_events_first(user_id) @@ -1066,6 +1066,7 @@ DEBUG: Group by list without distribution column is not allowed in distributed DEBUG: Collecting INSERT ... SELECT results on coordinator ERROR: cannot push down this subquery DETAIL: Group by list without partition column is currently unsupported +DEBUG: generating subplan 86_1 for subquery SELECT sum(raw_events_second.value_4) AS v4, sum(raw_events_first.value_1) AS v1, raw_events_second.value_3 AS id FROM public.raw_events_first, public.raw_events_second WHERE (raw_events_first.user_id = raw_events_second.user_id) GROUP BY raw_events_second.value_3 -- error cases -- no part column at all INSERT INTO raw_events_second @@ -1183,6 +1184,7 @@ DEBUG: Group by list without distribution column is not allowed in distributed DEBUG: Collecting INSERT ... SELECT results on coordinator ERROR: cannot pushdown the subquery since all relations are not joined using distribution keys DETAIL: Each relation should be joined with at least one another relation using distribution keys and equality operator. +DEBUG: generating subplan 105_1 for subquery SELECT sum(raw_events_second.value_4) AS v4, raw_events_second.value_1 AS v1, sum(raw_events_second.user_id) AS id FROM public.raw_events_first, public.raw_events_second WHERE (raw_events_first.user_id = raw_events_second.user_id) GROUP BY raw_events_second.value_1 HAVING (sum(raw_events_second.value_4) > (10)::numeric) -- the second part of the query is not routable since -- GROUP BY not on the partition column (i.e., value_1) and thus join -- on f.id = f2.id is not on the partition key (instead on the sum of partition key) @@ -1212,6 +1214,7 @@ DEBUG: Group by list without distribution column is not allowed in distributed DEBUG: Collecting INSERT ... SELECT results on coordinator ERROR: cannot pushdown the subquery since all relations are not joined using distribution keys DETAIL: Each relation should be joined with at least one another relation using distribution keys and equality operator. +DEBUG: generating subplan 108_1 for subquery SELECT sum(raw_events_second.value_4) AS v4, raw_events_second.value_1 AS v1, sum(raw_events_second.user_id) AS id FROM public.raw_events_first, public.raw_events_second WHERE (raw_events_first.user_id = raw_events_second.user_id) GROUP BY raw_events_second.value_1 HAVING (sum(raw_events_second.value_4) > (10)::numeric) -- cannot pushdown the query since the JOIN is not equi JOIN INSERT INTO agg_events (user_id, value_4_agg) diff --git a/src/test/regress/expected/multi_insert_select_window.out b/src/test/regress/expected/multi_insert_select_window.out index cc1e5c38a..f0ba6d581 100644 --- a/src/test/regress/expected/multi_insert_select_window.out +++ b/src/test/regress/expected/multi_insert_select_window.out @@ -653,8 +653,8 @@ ORDER BY 3 DESC, 1 DESC, 2 DESC LIMIT 10; -ERROR: cannot push down this subquery -DETAIL: Window functions with PARTITION BY list missing distribution column is currently unsupported +ERROR: could not run distributed query because the window function that is used cannot be pushed down +HINT: Window functions are supported in two ways. Either add an equality filter on the distributed tables' partition column or use the window functions inside a subquery with a PARTITION BY clause containing the distribution column -- user needs to supply partition by which should -- include the distribution key INSERT INTO agg_results_window (user_id, agg_time, value_2_agg) @@ -672,8 +672,8 @@ ORDER BY 3 DESC, 1 DESC, 2 DESC LIMIT 10; -ERROR: cannot push down this subquery -DETAIL: Window functions without PARTITION BY on distribution column is currently unsupported +ERROR: could not run distributed query because the window function that is used cannot be pushed down +HINT: Window functions are supported in two ways. Either add an equality filter on the distributed tables' partition column or use the window functions inside a subquery with a PARTITION BY clause containing the distribution column -- user needs to supply partition by which should -- include the distribution key INSERT INTO agg_results_window (user_id, agg_time, value_2_agg) @@ -691,8 +691,8 @@ ORDER BY 3 DESC, 1 DESC, 2 DESC LIMIT 10; -ERROR: cannot push down this subquery -DETAIL: Window functions without PARTITION BY on distribution column is currently unsupported +ERROR: could not run distributed query because the window function that is used cannot be pushed down +HINT: Window functions are supported in two ways. Either add an equality filter on the distributed tables' partition column or use the window functions inside a subquery with a PARTITION BY clause containing the distribution column -- w2 should not be pushed down INSERT INTO agg_results_window (user_id, value_1_agg, value_2_agg) SELECT * FROM @@ -709,8 +709,8 @@ SELECT * FROM ) as foo LIMIT 10; -ERROR: cannot push down this subquery -DETAIL: Window functions with PARTITION BY list missing distribution column is currently unsupported +ERROR: could not run distributed query because the window function that is used cannot be pushed down +HINT: Window functions are supported in two ways. Either add an equality filter on the distributed tables' partition column or use the window functions inside a subquery with a PARTITION BY clause containing the distribution column -- GROUP BY includes the partition key, but not the WINDOW function INSERT INTO agg_results_window (user_id, agg_time, value_2_agg) SELECT @@ -727,8 +727,8 @@ FROM ) as foo WHERE my_rank > 125; -ERROR: cannot push down this subquery -DETAIL: Window functions without PARTITION BY on distribution column is currently unsupported +ERROR: could not run distributed query because the window function that is used cannot be pushed down +HINT: Window functions are supported in two ways. Either add an equality filter on the distributed tables' partition column or use the window functions inside a subquery with a PARTITION BY clause containing the distribution column -- GROUP BY includes the partition key, but not the WINDOW function INSERT INTO agg_results_window (user_id, agg_time, value_2_agg) SELECT @@ -745,8 +745,8 @@ FROM ) as foo WHERE my_rank > 125; -ERROR: cannot push down this subquery -DETAIL: Window functions with PARTITION BY list missing distribution column is currently unsupported +ERROR: could not run distributed query because the window function that is used cannot be pushed down +HINT: Window functions are supported in two ways. Either add an equality filter on the distributed tables' partition column or use the window functions inside a subquery with a PARTITION BY clause containing the distribution column -- w2 should not be allowed INSERT INTO agg_results_window (user_id, value_2_agg, value_3_agg) SELECT * FROM @@ -761,8 +761,8 @@ SELECT * FROM WINDOW w1 AS (PARTITION BY users_table.user_id, events_table.event_type ORDER BY events_table.time), w2 AS (ORDER BY events_table.time) ) as foo; -ERROR: cannot push down this subquery -DETAIL: Window functions without PARTITION BY on distribution column is currently unsupported +ERROR: could not run distributed query because the window function that is used cannot be pushed down +HINT: Window functions are supported in two ways. Either add an equality filter on the distributed tables' partition column or use the window functions inside a subquery with a PARTITION BY clause containing the distribution column -- unsupported window function with an override INSERT INTO agg_results_window(user_id, agg_time, value_2_agg) SELECT * FROM ( @@ -779,8 +779,8 @@ SELECT * FROM ( WINDOW w2 as (PARTITION BY user_id, time) ) a; -ERROR: cannot push down this subquery -DETAIL: Window functions with PARTITION BY list missing distribution column is currently unsupported +ERROR: could not run distributed query because the window function that is used cannot be pushed down +HINT: Window functions are supported in two ways. Either add an equality filter on the distributed tables' partition column or use the window functions inside a subquery with a PARTITION BY clause containing the distribution column -- Subquery in where with unsupported window function INSERT INTO agg_results_window(user_id) SELECT @@ -811,8 +811,8 @@ SELECT * FROM ( GROUP BY user_id ) a; -ERROR: cannot push down this subquery -DETAIL: Window functions with PARTITION BY list missing distribution column is currently unsupported +ERROR: could not run distributed query because the window function that is used cannot be pushed down +HINT: Window functions are supported in two ways. Either add an equality filter on the distributed tables' partition column or use the window functions inside a subquery with a PARTITION BY clause containing the distribution column -- UNION with only one subquery which has a partition on non-distribution column should -- error out INSERT INTO agg_results_window(user_id, value_1_agg) @@ -849,6 +849,6 @@ FROM ( user_id ) ) AS ftop; -ERROR: cannot push down this subquery -DETAIL: Window functions with PARTITION BY list missing distribution column is currently unsupported +ERROR: could not run distributed query because the window function that is used cannot be pushed down +HINT: Window functions are supported in two ways. Either add an equality filter on the distributed tables' partition column or use the window functions inside a subquery with a PARTITION BY clause containing the distribution column DROP VIEW view_with_window_func; diff --git a/src/test/regress/expected/multi_mx_router_planner.out b/src/test/regress/expected/multi_mx_router_planner.out index 62868945b..97c295a8e 100644 --- a/src/test/regress/expected/multi_mx_router_planner.out +++ b/src/test/regress/expected/multi_mx_router_planner.out @@ -425,10 +425,18 @@ DEBUG: Plan is router executable 43 | 3 | affixal | 12723 | 3 (10 rows) --- they are not supported if multiple workers are involved -SELECT * FROM articles_hash_mx, position('om' in 'Thomas') WHERE author_id = 1 or author_id = 2; -ERROR: could not run distributed query with complex table expressions -HINT: Consider using an equality filter on the distributed table's partition column. +-- they are supported via (sub)query pushdown if multiple workers are involved +SELECT * FROM articles_hash_mx, position('om' in 'Thomas') WHERE author_id = 1 or author_id = 2 ORDER BY 4 DESC, 1 DESC, 2 DESC LIMIT 5; +DEBUG: push down of limit count: 5 + id | author_id | title | word_count | position +----+-----------+------------+------------+---------- + 12 | 2 | archiblast | 18185 | 3 + 42 | 2 | ausable | 15885 | 3 + 2 | 2 | abducing | 13642 | 3 + 41 | 1 | aznavour | 11814 | 3 + 32 | 2 | amazon | 11342 | 3 +(5 rows) + -- subqueries are supported in FROM clause but they are not router plannable SELECT articles_hash_mx.id,test.word_count FROM articles_hash_mx, (SELECT id, word_count FROM articles_hash_mx) AS test WHERE test.id = articles_hash_mx.id @@ -1137,13 +1145,13 @@ DEBUG: Plan is router executable SELECT id, MIN(id) over (order by word_count) FROM articles_hash_mx WHERE author_id = 1 or author_id = 2; -ERROR: could not run distributed query with window functions -HINT: Consider using an equality filter on the distributed table's partition column. +ERROR: could not run distributed query because the window function that is used cannot be pushed down +HINT: Window functions are supported in two ways. Either add an equality filter on the distributed tables' partition column or use the window functions inside a subquery with a PARTITION BY clause containing the distribution column SELECT LAG(title, 1) over (ORDER BY word_count) prev, title, word_count FROM articles_hash_mx WHERE author_id = 5 or author_id = 2; -ERROR: could not run distributed query with window functions -HINT: Consider using an equality filter on the distributed table's partition column. +ERROR: could not run distributed query because the window function that is used cannot be pushed down +HINT: Window functions are supported in two ways. Either add an equality filter on the distributed tables' partition column or use the window functions inside a subquery with a PARTITION BY clause containing the distribution column -- complex query hitting a single shard SELECT count(DISTINCT CASE diff --git a/src/test/regress/expected/multi_recursive_subquery_partitioning_0.out b/src/test/regress/expected/multi_recursive_subquery_partitioning_0.out new file mode 100644 index 000000000..3e3771511 --- /dev/null +++ b/src/test/regress/expected/multi_recursive_subquery_partitioning_0.out @@ -0,0 +1,246 @@ +-- =================================================================== +-- test recursive planning functionality on partitioned tables +-- =================================================================== +CREATE SCHEMA subquery_and_partitioning; +SET search_path TO subquery_and_partitioning, public; +CREATE TABLE users_table_local AS SELECT * FROM users_table; +CREATE TABLE events_table_local AS SELECT * FROM events_table; +CREATE TABLE partitioning_test(id int, value_1 int, time date) PARTITION BY RANGE (time); +ERROR: syntax error at or near "PARTITION" +LINE 1: ...partitioning_test(id int, value_1 int, time date) PARTITION ... + ^ + +-- create its partitions +CREATE TABLE partitioning_test_2017 PARTITION OF partitioning_test FOR VALUES FROM ('2017-01-01') TO ('2018-01-01'); +ERROR: syntax error at or near "PARTITION" +LINE 1: CREATE TABLE partitioning_test_2017 PARTITION OF partitionin... + ^ +CREATE TABLE partitioning_test_2010 PARTITION OF partitioning_test FOR VALUES FROM ('2010-01-01') TO ('2011-01-01'); +ERROR: syntax error at or near "PARTITION" +LINE 1: CREATE TABLE partitioning_test_2010 PARTITION OF partitionin... + ^ +-- load some data and distribute tables +INSERT INTO partitioning_test VALUES (1, 1, '2017-11-23'); +ERROR: relation "partitioning_test" does not exist +LINE 1: INSERT INTO partitioning_test VALUES (1, 1, '2017-11-23'); + ^ +INSERT INTO partitioning_test VALUES (2, 1, '2010-07-07'); +ERROR: relation "partitioning_test" does not exist +LINE 1: INSERT INTO partitioning_test VALUES (2, 1, '2010-07-07'); + ^ +INSERT INTO partitioning_test_2017 VALUES (3, 3, '2017-11-22'); +ERROR: relation "partitioning_test_2017" does not exist +LINE 1: INSERT INTO partitioning_test_2017 VALUES (3, 3, '2017-11-22... + ^ +INSERT INTO partitioning_test_2010 VALUES (4, 4, '2010-03-03'); +ERROR: relation "partitioning_test_2010" does not exist +LINE 1: INSERT INTO partitioning_test_2010 VALUES (4, 4, '2010-03-03... + ^ +-- distribute partitioned table +SET citus.shard_replication_factor TO 1; +SELECT create_distributed_table('partitioning_test', 'id'); +ERROR: relation "partitioning_test" does not exist +LINE 1: SELECT create_distributed_table('partitioning_test', 'id'); + ^ +SET client_min_messages TO DEBUG1; +-- subplan for partitioned tables +SELECT + id +FROM + (SELECT + DISTINCT partitioning_test.id + FROM + partitioning_test + LIMIT 5 + ) as foo + ORDER BY 1 DESC; +ERROR: relation "partitioning_test" does not exist +LINE 7: partitioning_test + ^ +-- final query is router on partitioned tables +SELECT + * +FROM + (SELECT + DISTINCT partitioning_test.id + FROM + partitioning_test + LIMIT 5 + ) as foo, + (SELECT + DISTINCT partitioning_test.time + FROM + partitioning_test + LIMIT 5 + ) as bar + WHERE foo.id = date_part('day', bar.time) + ORDER BY 2 DESC, 1; +ERROR: relation "partitioning_test" does not exist +LINE 7: partitioning_test + ^ +-- final query is real-time +SELECT + * +FROM + (SELECT + DISTINCT partitioning_test.time + FROM + partitioning_test + ORDER BY 1 DESC + LIMIT 5 + ) as foo, + ( + SELECT + DISTINCT partitioning_test.id + FROM + partitioning_test + ) as bar + WHERE date_part('day', foo.time) = bar.id + ORDER BY 2 DESC, 1 DESC + LIMIT 3; +ERROR: relation "partitioning_test" does not exist +LINE 7: partitioning_test + ^ +-- final query is real-time that is joined with partitioned table +SELECT + * +FROM + (SELECT + DISTINCT partitioning_test.time + FROM + partitioning_test + ORDER BY 1 DESC + LIMIT 5 + ) as foo, + ( + SELECT + DISTINCT partitioning_test.id + FROM + partitioning_test + ) as bar, + partitioning_test + WHERE date_part('day', foo.time) = bar.id AND partitioning_test.id = bar.id + ORDER BY 2 DESC, 1 DESC + LIMIT 3; +ERROR: relation "partitioning_test" does not exist +LINE 7: partitioning_test + ^ +-- subquery in WHERE clause +SELECT DISTINCT id +FROM partitioning_test +WHERE + id IN (SELECT DISTINCT date_part('day', time) FROM partitioning_test); +ERROR: relation "partitioning_test" does not exist +LINE 2: FROM partitioning_test + ^ +-- repartition subquery +SET citus.enable_repartition_joins to ON; +SELECT + count(*) +FROM +( + SELECT DISTINCT p1.value_1 FROM partitioning_test as p1, partitioning_test as p2 WHERE p1.id = p2.value_1 +) as foo, +( + SELECT user_id FROM users_table +) as bar +WHERE foo.value_1 = bar.user_id; +ERROR: relation "partitioning_test" does not exist +LINE 5: SELECT DISTINCT p1.value_1 FROM partitioning_test as p1, pa... + ^ +SET citus.enable_repartition_joins to OFF; +-- subquery, cte, view and non-partitioned tables +CREATE VIEW subquery_and_ctes AS +SELECT + * +FROM +( + WITH cte AS ( + WITH local_cte AS ( + SELECT * FROM users_table_local + ), + dist_cte AS ( + SELECT + user_id + FROM + events_table, + (SELECT DISTINCT value_1 FROM partitioning_test OFFSET 0) as foo + WHERE + events_table.user_id = foo.value_1 AND + events_table.user_id IN (SELECT DISTINCT value_1 FROM users_table ORDER BY 1 LIMIT 3) + ) + SELECT dist_cte.user_id FROM local_cte join dist_cte on dist_cte.user_id=local_cte.user_id +) +SELECT + count(*) as cnt +FROM + cte, + (SELECT + DISTINCT events_table.user_id + FROM + partitioning_test, events_table + WHERE + events_table.user_id = partitioning_test.id AND + event_type IN (1,2,3,4) + ORDER BY 1 DESC LIMIT 5 + ) as foo + WHERE foo.user_id = cte.user_id +) as foo, users_table WHERE foo.cnt > users_table.value_2; +ERROR: relation "partitioning_test" does not exist +LINE 15: (SELECT DISTINCT value_1 FROM partitioning_test OFFSET 0)... + ^ +SELECT * FROM subquery_and_ctes +ORDER BY 3 DESC, 1 DESC, 2 DESC, 4 DESC +LIMIT 5; +ERROR: relation "subquery_and_ctes" does not exist +LINE 1: SELECT * FROM subquery_and_ctes + ^ +-- deep subquery, partitioned and non-partitioned tables together +SELECT count(*) +FROM +( + SELECT avg(min) FROM + ( + SELECT min(partitioning_test.value_1) FROM + ( + SELECT avg(event_type) as avg_ev_type FROM + ( + SELECT + max(value_1) as mx_val_1 + FROM ( + SELECT + avg(event_type) as avg + FROM + ( + SELECT + cnt + FROM + (SELECT count(*) as cnt, value_1 FROM partitioning_test GROUP BY value_1) as level_1, users_table + WHERE + users_table.user_id = level_1.cnt + ) as level_2, events_table + WHERE events_table.user_id = level_2.cnt + GROUP BY level_2.cnt + ) as level_3, users_table + WHERE user_id = level_3.avg + GROUP BY level_3.avg + ) as level_4, events_table + WHERE level_4.mx_val_1 = events_table.user_id + GROUP BY level_4.mx_val_1 + ) as level_5, partitioning_test + WHERE + level_5.avg_ev_type = partitioning_test.id + GROUP BY + level_5.avg_ev_type + ) as level_6, users_table WHERE users_table.user_id = level_6.min + GROUP BY users_table.value_1 + ) as bar; +ERROR: relation "partitioning_test" does not exist +LINE 20: (SELECT count(*) as cnt, value_1 FROM partitioning_... + ^ +SET client_min_messages TO DEFAULT; +DROP SCHEMA subquery_and_partitioning CASCADE; +NOTICE: drop cascades to 2 other objects +DETAIL: drop cascades to table users_table_local +drop cascades to table events_table_local +SET search_path TO public; diff --git a/src/test/regress/expected/multi_router_planner.out b/src/test/regress/expected/multi_router_planner.out index 0c7221d15..4a40f6cd8 100644 --- a/src/test/regress/expected/multi_router_planner.out +++ b/src/test/regress/expected/multi_router_planner.out @@ -525,10 +525,18 @@ DEBUG: Plan is router executable 43 | 3 | affixal | 12723 | 3 (10 rows) --- they are not supported if multiple workers are involved -SELECT * FROM articles_hash, position('om' in 'Thomas') WHERE author_id = 1 or author_id = 2; -ERROR: could not run distributed query with complex table expressions -HINT: Consider using an equality filter on the distributed table's partition column. +-- they are supported via (sub)query pushdown if multiple workers are involved +SELECT * FROM articles_hash, position('om' in 'Thomas') WHERE author_id = 1 or author_id = 2 ORDER BY 4 DESC, 1 DESC, 2 DESC LIMIT 5; +DEBUG: push down of limit count: 5 + id | author_id | title | word_count | position +----+-----------+------------+------------+---------- + 12 | 2 | archiblast | 18185 | 3 + 42 | 2 | ausable | 15885 | 3 + 2 | 2 | abducing | 13642 | 3 + 41 | 1 | aznavour | 11814 | 3 + 32 | 2 | amazon | 11342 | 3 +(5 rows) + -- unless the query can be transformed into a join SELECT * FROM articles_hash WHERE author_id IN (SELECT author_id FROM articles_hash WHERE author_id = 2) @@ -1267,13 +1275,13 @@ DEBUG: Plan is router executable SELECT id, MIN(id) over (order by word_count) FROM articles_hash WHERE author_id = 1 or author_id = 2; -ERROR: could not run distributed query with window functions -HINT: Consider using an equality filter on the distributed table's partition column. +ERROR: could not run distributed query because the window function that is used cannot be pushed down +HINT: Window functions are supported in two ways. Either add an equality filter on the distributed tables' partition column or use the window functions inside a subquery with a PARTITION BY clause containing the distribution column SELECT LAG(title, 1) over (ORDER BY word_count) prev, title, word_count FROM articles_hash WHERE author_id = 5 or author_id = 2; -ERROR: could not run distributed query with window functions -HINT: Consider using an equality filter on the distributed table's partition column. +ERROR: could not run distributed query because the window function that is used cannot be pushed down +HINT: Window functions are supported in two ways. Either add an equality filter on the distributed tables' partition column or use the window functions inside a subquery with a PARTITION BY clause containing the distribution column -- where false queries are router plannable SELECT * FROM articles_hash diff --git a/src/test/regress/expected/multi_simple_queries.out b/src/test/regress/expected/multi_simple_queries.out index b997589ee..06d1e68c5 100644 --- a/src/test/regress/expected/multi_simple_queries.out +++ b/src/test/regress/expected/multi_simple_queries.out @@ -184,11 +184,17 @@ SELECT title FROM articles ORDER BY 1 LIMIT 5; ablation (5 rows) --- queries which involve functions in FROM clause are unsupported. -SELECT * FROM articles, position('om' in 'Thomas'); -ERROR: could not run distributed query with complex table expressions -HINT: Consider using an equality filter on the distributed table's partition column. --- subqueries are not supported in WHERE clause in Citus +-- queries which involve functions in FROM clause are recursively planned +SELECT * FROM articles, position('om' in 'Thomas') ORDER BY 2 DESC, 1 DESC, 3 DESC LIMIT 5; + id | author_id | title | word_count | position +----+-----------+------------+------------+---------- + 50 | 10 | anjanette | 19519 | 3 + 40 | 10 | attemper | 14976 | 3 + 30 | 10 | andelee | 6363 | 3 + 20 | 10 | absentness | 1820 | 3 + 10 | 10 | aggrandize | 17277 | 3 +(5 rows) + SELECT * FROM articles WHERE author_id IN (SELECT id FROM authors WHERE name LIKE '%a'); ERROR: relation authors is not distributed -- subqueries are supported in FROM clause diff --git a/src/test/regress/expected/multi_simple_queries_0.out b/src/test/regress/expected/multi_simple_queries_0.out new file mode 100644 index 000000000..c04e36c28 --- /dev/null +++ b/src/test/regress/expected/multi_simple_queries_0.out @@ -0,0 +1,595 @@ +SET citus.next_shard_id TO 850000; +-- =================================================================== +-- test end-to-end query functionality +-- =================================================================== +CREATE TABLE articles ( + id bigint NOT NULL, + author_id bigint NOT NULL, + title varchar(20) NOT NULL, + word_count integer NOT NULL CHECK (word_count > 0) +); +-- this table is used in a CTE test +CREATE TABLE authors ( name text, id bigint ); +-- this table is used in router executor tests +CREATE TABLE articles_single_shard (LIKE articles); +SELECT master_create_distributed_table('articles', 'author_id', 'hash'); + master_create_distributed_table +--------------------------------- + +(1 row) + +SELECT master_create_distributed_table('articles_single_shard', 'author_id', 'hash'); + master_create_distributed_table +--------------------------------- + +(1 row) + +SELECT master_create_worker_shards('articles', 2, 1); + master_create_worker_shards +----------------------------- + +(1 row) + +SELECT master_create_worker_shards('articles_single_shard', 1, 1); + master_create_worker_shards +----------------------------- + +(1 row) + +-- create a bunch of test data +INSERT INTO articles VALUES ( 1, 1, 'arsenous', 9572); +INSERT INTO articles VALUES ( 2, 2, 'abducing', 13642); +INSERT INTO articles VALUES ( 3, 3, 'asternal', 10480); +INSERT INTO articles VALUES ( 4, 4, 'altdorfer', 14551); +INSERT INTO articles VALUES ( 5, 5, 'aruru', 11389); +INSERT INTO articles VALUES ( 6, 6, 'atlases', 15459); +INSERT INTO articles VALUES ( 7, 7, 'aseptic', 12298); +INSERT INTO articles VALUES ( 8, 8, 'agatized', 16368); +INSERT INTO articles VALUES ( 9, 9, 'alligate', 438); +INSERT INTO articles VALUES (10, 10, 'aggrandize', 17277); +INSERT INTO articles VALUES (11, 1, 'alamo', 1347); +INSERT INTO articles VALUES (12, 2, 'archiblast', 18185); +INSERT INTO articles VALUES (13, 3, 'aseyev', 2255); +INSERT INTO articles VALUES (14, 4, 'andesite', 19094); +INSERT INTO articles VALUES (15, 5, 'adversa', 3164); +INSERT INTO articles VALUES (16, 6, 'allonym', 2); +INSERT INTO articles VALUES (17, 7, 'auriga', 4073); +INSERT INTO articles VALUES (18, 8, 'assembly', 911); +INSERT INTO articles VALUES (19, 9, 'aubergiste', 4981); +INSERT INTO articles VALUES (20, 10, 'absentness', 1820); +INSERT INTO articles VALUES (21, 1, 'arcading', 5890); +INSERT INTO articles VALUES (22, 2, 'antipope', 2728); +INSERT INTO articles VALUES (23, 3, 'abhorring', 6799); +INSERT INTO articles VALUES (24, 4, 'audacious', 3637); +INSERT INTO articles VALUES (25, 5, 'antehall', 7707); +INSERT INTO articles VALUES (26, 6, 'abington', 4545); +INSERT INTO articles VALUES (27, 7, 'arsenous', 8616); +INSERT INTO articles VALUES (28, 8, 'aerophyte', 5454); +INSERT INTO articles VALUES (29, 9, 'amateur', 9524); +INSERT INTO articles VALUES (30, 10, 'andelee', 6363); +INSERT INTO articles VALUES (31, 1, 'athwartships', 7271); +INSERT INTO articles VALUES (32, 2, 'amazon', 11342); +INSERT INTO articles VALUES (33, 3, 'autochrome', 8180); +INSERT INTO articles VALUES (34, 4, 'amnestied', 12250); +INSERT INTO articles VALUES (35, 5, 'aminate', 9089); +INSERT INTO articles VALUES (36, 6, 'ablation', 13159); +INSERT INTO articles VALUES (37, 7, 'archduchies', 9997); +INSERT INTO articles VALUES (38, 8, 'anatine', 14067); +INSERT INTO articles VALUES (39, 9, 'anchises', 10906); +INSERT INTO articles VALUES (40, 10, 'attemper', 14976); +INSERT INTO articles VALUES (41, 1, 'aznavour', 11814); +INSERT INTO articles VALUES (42, 2, 'ausable', 15885); +INSERT INTO articles VALUES (43, 3, 'affixal', 12723); +INSERT INTO articles VALUES (44, 4, 'anteport', 16793); +INSERT INTO articles VALUES (45, 5, 'afrasia', 864); +INSERT INTO articles VALUES (46, 6, 'atlanta', 17702); +INSERT INTO articles VALUES (47, 7, 'abeyance', 1772); +INSERT INTO articles VALUES (48, 8, 'alkylic', 18610); +INSERT INTO articles VALUES (49, 9, 'anyone', 2681); +INSERT INTO articles VALUES (50, 10, 'anjanette', 19519); +-- insert a single row for the test +INSERT INTO articles_single_shard VALUES (50, 10, 'anjanette', 19519); +-- zero-shard modifications should succeed +UPDATE articles SET title = '' WHERE author_id = 1 AND author_id = 2; +UPDATE articles SET title = '' WHERE 0 = 1; +DELETE FROM articles WHERE author_id = 1 AND author_id = 2; +-- single-shard tests +-- test simple select for a single row +SELECT * FROM articles WHERE author_id = 10 AND id = 50; + id | author_id | title | word_count +----+-----------+-----------+------------ + 50 | 10 | anjanette | 19519 +(1 row) + +-- get all titles by a single author +SELECT title FROM articles WHERE author_id = 10; + title +------------ + aggrandize + absentness + andelee + attemper + anjanette +(5 rows) + +-- try ordering them by word count +SELECT title, word_count FROM articles + WHERE author_id = 10 + ORDER BY word_count DESC NULLS LAST; + title | word_count +------------+------------ + anjanette | 19519 + aggrandize | 17277 + attemper | 14976 + andelee | 6363 + absentness | 1820 +(5 rows) + +-- look at last two articles by an author +SELECT title, id FROM articles + WHERE author_id = 5 + ORDER BY id + LIMIT 2; + title | id +---------+---- + aruru | 5 + adversa | 15 +(2 rows) + +-- find all articles by two authors in same shard +SELECT title, author_id FROM articles + WHERE author_id = 7 OR author_id = 8 + ORDER BY author_id ASC, id; + title | author_id +-------------+----------- + aseptic | 7 + auriga | 7 + arsenous | 7 + archduchies | 7 + abeyance | 7 + agatized | 8 + assembly | 8 + aerophyte | 8 + anatine | 8 + alkylic | 8 +(10 rows) + +-- add in some grouping expressions +SELECT author_id, sum(word_count) AS corpus_size FROM articles + WHERE author_id = 1 OR author_id = 2 OR author_id = 8 OR author_id = 10 + GROUP BY author_id + HAVING sum(word_count) > 40000 + ORDER BY sum(word_count) DESC; + author_id | corpus_size +-----------+------------- + 2 | 61782 + 10 | 59955 + 8 | 55410 +(3 rows) + +-- UNION/INTERSECT queries are unsupported if on multiple shards +SELECT * FROM articles WHERE author_id = 10 UNION +SELECT * FROM articles WHERE author_id = 2; +ERROR: could not run distributed query with UNION, INTERSECT, or EXCEPT +HINT: Consider using an equality filter on the distributed table's partition column. +-- queries using CTEs are supported +WITH long_names AS ( SELECT id FROM authors WHERE char_length(name) > 15 ) +SELECT title FROM articles ORDER BY 1 LIMIT 5; + title +----------- + abducing + abeyance + abhorring + abington + ablation +(5 rows) + +-- queries which involve functions in FROM clause are recursively planned +SELECT * FROM articles, position('om' in 'Thomas') ORDER BY 2 DESC, 1 DESC, 3 DESC LIMIT 5; + id | author_id | title | word_count | position +----+-----------+------------+------------+---------- + 50 | 10 | anjanette | 19519 | 3 + 40 | 10 | attemper | 14976 | 3 + 30 | 10 | andelee | 6363 | 3 + 20 | 10 | absentness | 1820 | 3 + 10 | 10 | aggrandize | 17277 | 3 +(5 rows) + +-- subqueries are supported in WHERE clause in Citus even if the relations are not distributed +SELECT * FROM articles WHERE author_id IN (SELECT id FROM authors WHERE name LIKE '%a'); +ERROR: Complex subqueries and CTEs are not supported when task_executor_type is set to 'task-tracker' +-- subqueries are supported in FROM clause +SELECT articles.id,test.word_count +FROM articles, (SELECT id, word_count FROM articles) AS test WHERE test.id = articles.id +ORDER BY articles.id; + id | word_count +----+------------ + 1 | 9572 + 2 | 13642 + 3 | 10480 + 4 | 14551 + 5 | 11389 + 6 | 15459 + 7 | 12298 + 8 | 16368 + 9 | 438 + 10 | 17277 + 11 | 1347 + 12 | 18185 + 13 | 2255 + 14 | 19094 + 15 | 3164 + 16 | 2 + 17 | 4073 + 18 | 911 + 19 | 4981 + 20 | 1820 + 21 | 5890 + 22 | 2728 + 23 | 6799 + 24 | 3637 + 25 | 7707 + 26 | 4545 + 27 | 8616 + 28 | 5454 + 29 | 9524 + 30 | 6363 + 31 | 7271 + 32 | 11342 + 33 | 8180 + 34 | 12250 + 35 | 9089 + 36 | 13159 + 37 | 9997 + 38 | 14067 + 39 | 10906 + 40 | 14976 + 41 | 11814 + 42 | 15885 + 43 | 12723 + 44 | 16793 + 45 | 864 + 46 | 17702 + 47 | 1772 + 48 | 18610 + 49 | 2681 + 50 | 19519 +(50 rows) + +-- subqueries are not supported in SELECT clause +SELECT a.title AS name, (SELECT a2.id FROM articles_single_shard a2 WHERE a.id = a2.id LIMIT 1) + AS special_price FROM articles a; +ERROR: could not run distributed query with subquery outside the FROM and WHERE clauses +HINT: Consider using an equality filter on the distributed table's partition column. +-- joins are not supported between local and distributed tables +SELECT title, authors.name FROM authors, articles WHERE authors.id = articles.author_id; +ERROR: relation authors is not distributed +-- inner joins are not supported (I think) +SELECT * FROM (articles INNER JOIN authors ON articles.id = authors.id); +ERROR: relation authors is not distributed +-- test use of EXECUTE statements within plpgsql +DO $sharded_execute$ + BEGIN + EXECUTE 'SELECT COUNT(*) FROM articles ' || + 'WHERE author_id = $1 AND author_id = $2' USING 1, 2; + END +$sharded_execute$; +-- test use of bare SQL within plpgsql +DO $sharded_sql$ + BEGIN + SELECT COUNT(*) FROM articles WHERE author_id = 1 AND author_id = 2; + END +$sharded_sql$; +ERROR: query has no destination for result data +HINT: If you want to discard the results of a SELECT, use PERFORM instead. +CONTEXT: PL/pgSQL function inline_code_block line 3 at SQL statement +-- test cross-shard queries +SELECT COUNT(*) FROM articles; + count +------- + 50 +(1 row) + +-- test with empty target list +SELECT FROM articles; +-- +(50 rows) + +SELECT FROM articles WHERE author_id = 3737; +-- +(0 rows) + +SELECT FROM articles WHERE word_count = 65500; +-- +(0 rows) + +-- having queries supported in Citus +SELECT author_id, sum(word_count) AS corpus_size FROM articles + GROUP BY author_id + HAVING sum(word_count) > 25000 + ORDER BY sum(word_count) DESC + LIMIT 5; + author_id | corpus_size +-----------+------------- + 4 | 66325 + 2 | 61782 + 10 | 59955 + 8 | 55410 + 6 | 50867 +(5 rows) + +SELECT author_id FROM articles + GROUP BY author_id + HAVING sum(word_count) > 50000 + ORDER BY author_id; + author_id +----------- + 2 + 4 + 6 + 8 + 10 +(5 rows) + +SELECT author_id FROM articles + GROUP BY author_id + HAVING sum(word_count) > 50000 AND author_id < 5 + ORDER BY author_id; + author_id +----------- + 2 + 4 +(2 rows) + +SELECT author_id FROM articles + GROUP BY author_id + HAVING sum(word_count) > 50000 OR author_id < 5 + ORDER BY author_id; + author_id +----------- + 1 + 2 + 3 + 4 + 6 + 8 + 10 +(7 rows) + +SELECT author_id FROM articles + GROUP BY author_id + HAVING author_id <= 2 OR author_id = 8 + ORDER BY author_id; + author_id +----------- + 1 + 2 + 8 +(3 rows) + +SELECT o_orderstatus, count(*), avg(o_totalprice) FROM orders + GROUP BY o_orderstatus + HAVING count(*) > 1450 OR avg(o_totalprice) > 150000 + ORDER BY o_orderstatus; + o_orderstatus | count | avg +---------------+-------+--------------------- + O | 1460 | 143355.847013698630 + P | 75 | 164847.914533333333 +(2 rows) + +SELECT o_orderstatus, sum(l_linenumber), avg(l_linenumber) FROM lineitem, orders + WHERE l_orderkey = o_orderkey AND l_orderkey > 9030 + GROUP BY o_orderstatus + HAVING sum(l_linenumber) > 1000 + ORDER BY o_orderstatus; + o_orderstatus | sum | avg +---------------+------+-------------------- + F | 8559 | 3.0126715945089757 + O | 8901 | 3.0050641458474004 +(2 rows) + +-- now, test the cases where Citus do or do not need to create +-- the master queries +SET citus.large_table_shard_count TO 2; +SET client_min_messages TO 'DEBUG2'; +SET citus.task_executor_type TO 'real-time'; +-- start with the simple lookup query +SELECT * + FROM articles + WHERE author_id = 1; +DEBUG: Creating router plan +DEBUG: Plan is router executable + id | author_id | title | word_count +----+-----------+--------------+------------ + 1 | 1 | arsenous | 9572 + 11 | 1 | alamo | 1347 + 21 | 1 | arcading | 5890 + 31 | 1 | athwartships | 7271 + 41 | 1 | aznavour | 11814 +(5 rows) + +-- below query hits a single shard, so no need to create the master query +SELECT * + FROM articles + WHERE author_id = 1 OR author_id = 17; +DEBUG: Creating router plan +DEBUG: Plan is router executable + id | author_id | title | word_count +----+-----------+--------------+------------ + 1 | 1 | arsenous | 9572 + 11 | 1 | alamo | 1347 + 21 | 1 | arcading | 5890 + 31 | 1 | athwartships | 7271 + 41 | 1 | aznavour | 11814 +(5 rows) + +-- below query hits two shards, so needs to create the master query +SELECT * + FROM articles + WHERE author_id = 1 OR author_id = 18; + id | author_id | title | word_count +----+-----------+--------------+------------ + 1 | 1 | arsenous | 9572 + 11 | 1 | alamo | 1347 + 21 | 1 | arcading | 5890 + 31 | 1 | athwartships | 7271 + 41 | 1 | aznavour | 11814 +(5 rows) + +-- rename the output columns on a no master query case +SELECT id as article_id, word_count * id as random_value + FROM articles + WHERE author_id = 1; +DEBUG: Creating router plan +DEBUG: Plan is router executable + article_id | random_value +------------+-------------- + 1 | 9572 + 11 | 14817 + 21 | 123690 + 31 | 225401 + 41 | 484374 +(5 rows) + +-- we can push down co-located joins to a single worker without the +-- master query being required for only the same tables +SELECT a.author_id as first_author, b.word_count as second_word_count + FROM articles a, articles b + WHERE a.author_id = 10 and a.author_id = b.author_id + LIMIT 3; +DEBUG: Creating router plan +DEBUG: Plan is router executable + first_author | second_word_count +--------------+------------------- + 10 | 17277 + 10 | 1820 + 10 | 6363 +(3 rows) + +-- now show that JOINs with multiple tables are not router executable +-- they are executed by real-time executor +SELECT a.author_id as first_author, b.word_count as second_word_count + FROM articles a, articles_single_shard b + WHERE a.author_id = 10 and a.author_id = b.author_id + LIMIT 3; +DEBUG: Creating router plan +DEBUG: Plan is router executable + first_author | second_word_count +--------------+------------------- + 10 | 19519 + 10 | 19519 + 10 | 19519 +(3 rows) + +-- do not create the master query for LIMIT on a single shard SELECT +SELECT * + FROM articles + WHERE author_id = 1 + LIMIT 2; +DEBUG: Creating router plan +DEBUG: Plan is router executable + id | author_id | title | word_count +----+-----------+----------+------------ + 1 | 1 | arsenous | 9572 + 11 | 1 | alamo | 1347 +(2 rows) + +-- This query hits a single shard. So GROUP BY can be +-- pushed down to the workers directly. This query is +-- equivalent to SELECT DISTINCT on a single shard. +SELECT id + FROM articles + WHERE author_id = 1 + GROUP BY id + ORDER BY id; +DEBUG: Creating router plan +DEBUG: Plan is router executable + id +---- + 1 + 11 + 21 + 31 + 41 +(5 rows) + +-- copying from a single shard table does not require the master query +COPY articles_single_shard TO stdout; +DEBUG: Creating router plan +DEBUG: Plan is router executable +50 10 anjanette 19519 +-- error out for queries with aggregates +SELECT avg(word_count) + FROM articles + WHERE author_id = 2; +DEBUG: Creating router plan +DEBUG: Plan is router executable + avg +-------------------- + 12356.400000000000 +(1 row) + +-- max, min, sum, count is somehow implemented +-- differently in distributed planning +SELECT max(word_count) as max, min(word_count) as min, + sum(word_count) as sum, count(word_count) as cnt + FROM articles + WHERE author_id = 2; +DEBUG: Creating router plan +DEBUG: Plan is router executable + max | min | sum | cnt +-------+------+-------+----- + 18185 | 2728 | 61782 | 5 +(1 row) + +-- error out for queries with repartition jobs +SELECT * + FROM articles a, articles b + WHERE a.id = b.id AND a.author_id = 1; +DEBUG: join prunable for task partitionId 0 and 1 +DEBUG: join prunable for task partitionId 0 and 2 +DEBUG: join prunable for task partitionId 0 and 3 +DEBUG: join prunable for task partitionId 1 and 0 +DEBUG: join prunable for task partitionId 1 and 2 +DEBUG: join prunable for task partitionId 1 and 3 +DEBUG: join prunable for task partitionId 2 and 0 +DEBUG: join prunable for task partitionId 2 and 1 +DEBUG: join prunable for task partitionId 2 and 3 +DEBUG: join prunable for task partitionId 3 and 0 +DEBUG: join prunable for task partitionId 3 and 1 +DEBUG: join prunable for task partitionId 3 and 2 +DEBUG: pruning merge fetch taskId 1 +DETAIL: Creating dependency on merge taskId 3 +DEBUG: pruning merge fetch taskId 2 +DETAIL: Creating dependency on merge taskId 5 +DEBUG: pruning merge fetch taskId 4 +DETAIL: Creating dependency on merge taskId 5 +DEBUG: pruning merge fetch taskId 5 +DETAIL: Creating dependency on merge taskId 8 +DEBUG: pruning merge fetch taskId 7 +DETAIL: Creating dependency on merge taskId 7 +DEBUG: pruning merge fetch taskId 8 +DETAIL: Creating dependency on merge taskId 11 +DEBUG: pruning merge fetch taskId 10 +DETAIL: Creating dependency on merge taskId 9 +DEBUG: pruning merge fetch taskId 11 +DETAIL: Creating dependency on merge taskId 14 +ERROR: the query contains a join that requires repartitioning +HINT: Set citus.enable_repartition_joins to on to enable repartitioning +-- system columns from shard tables can be queried and retrieved +SELECT count(*) FROM ( + SELECT tableoid, ctid, cmin, cmax, xmin, xmax + FROM articles + WHERE tableoid IS NOT NULL OR + ctid IS NOT NULL OR + cmin IS NOT NULL OR + cmax IS NOT NULL OR + xmin IS NOT NULL OR + xmax IS NOT NULL +) x; + count +------- + 50 +(1 row) + +SET client_min_messages to 'NOTICE'; diff --git a/src/test/regress/expected/multi_single_relation_subquery.out b/src/test/regress/expected/multi_single_relation_subquery.out index 090d81ca7..6dec98669 100644 --- a/src/test/regress/expected/multi_single_relation_subquery.out +++ b/src/test/regress/expected/multi_single_relation_subquery.out @@ -95,7 +95,7 @@ from group by suppkey_bin order by - avg_count desc + avg_count desc, suppkey_bin DESC limit 20; suppkey_bin | avg_count -------------+-------------------- @@ -179,7 +179,7 @@ from 1.00083402835696413678 (1 row) --- Check that we don't support subqueries with limit. +-- we don't support subqueries with limit. select l_suppkey, sum(suppkey_count) as total_suppkey_count @@ -195,19 +195,23 @@ from l_suppkey limit 100) as distributed_table group by - l_suppkey; + l_suppkey + ORDER BY 2 DESC, 1 DESC +LIMIT 5; ERROR: cannot perform distributed planning on this query DETAIL: Subqueries with limit are not supported yet -- Check that we don't support subqueries without aggregates. select - rounded_tax + DISTINCT rounded_tax from (select round(l_tax) as rounded_tax from lineitem group by - l_tax) as distributed_table; + l_tax) as distributed_table + ORDER BY 1 DESC + LIMIT 5; ERROR: cannot perform distributed planning on this query DETAIL: Subqueries without aggregates are not supported yet -- Check that we support subqueries with count(distinct). diff --git a/src/test/regress/expected/multi_subquery.out b/src/test/regress/expected/multi_subquery.out index 8bd197c4d..9e1421acf 100644 --- a/src/test/regress/expected/multi_subquery.out +++ b/src/test/regress/expected/multi_subquery.out @@ -83,8 +83,7 @@ SELECT count(*) FROM (SELECT l_orderkey FROM lineitem_subquery) UNION ALL (SELECT 1::bigint) ) b; -ERROR: cannot push down this subquery -DETAIL: Subqueries without a FROM clause are not supported with union operator +ERROR: could not run distributed query with UNION, INTERSECT, or EXCEPT -- Check that we error out if queries in union do not include partition columns. SELECT count(*) FROM ( diff --git a/src/test/regress/expected/multi_subquery_complex_queries.out b/src/test/regress/expected/multi_subquery_complex_queries.out index 21bfd8555..bd92b5c0b 100644 --- a/src/test/regress/expected/multi_subquery_complex_queries.out +++ b/src/test/regress/expected/multi_subquery_complex_queries.out @@ -2270,8 +2270,8 @@ GROUP BY types ORDER BY types; -ERROR: cannot push down this subquery -DETAIL: Intersect and Except are currently unsupported +ERROR: could not run distributed query with UNION, INTERSECT, or EXCEPT +HINT: Consider using an equality filter on the distributed table's partition column. -- not supported due to offset SELECT ("final_query"."event_types") as types, count(*) AS sumOfEventType FROM @@ -2333,8 +2333,8 @@ GROUP BY types ORDER BY types; -ERROR: cannot push down this subquery -DETAIL: Offset clause is currently unsupported +ERROR: could not run distributed query with UNION, INTERSECT, or EXCEPT +HINT: Consider using an equality filter on the distributed table's partition column. -- not supported due to non relation rte SELECT ("final_query"."event_types") as types, count(*) AS sumOfEventType FROM @@ -2393,8 +2393,8 @@ GROUP BY types ORDER BY types; -ERROR: cannot push down this subquery -DETAIL: Subqueries without a FROM clause are not supported with union operator +ERROR: could not run distributed query with UNION, INTERSECT, or EXCEPT +HINT: Consider using an equality filter on the distributed table's partition column. -- similar to the above, but constant rte is on the right side of the query SELECT ("final_query"."event_types") as types, count(*) AS sumOfEventType FROM @@ -2448,6 +2448,6 @@ GROUP BY types ORDER BY types; -ERROR: cannot push down this subquery -DETAIL: Subqueries without a FROM clause are not supported with union operator +ERROR: could not run distributed query with UNION, INTERSECT, or EXCEPT +HINT: Consider using an equality filter on the distributed table's partition column. SET citus.enable_router_execution TO TRUE; diff --git a/src/test/regress/expected/multi_subquery_complex_reference_clause.out b/src/test/regress/expected/multi_subquery_complex_reference_clause.out index 97dd96cba..4b3d10889 100644 --- a/src/test/regress/expected/multi_subquery_complex_reference_clause.out +++ b/src/test/regress/expected/multi_subquery_complex_reference_clause.out @@ -230,8 +230,11 @@ SELECT count(*) FROM -- table function cannot be used without subquery pushdown SELECT count(*) FROM user_buy_test_table JOIN generate_series(1,10) AS users_ref_test_table(id) ON user_buy_test_table.item_id = users_ref_test_table.id; -ERROR: could not run distributed query with complex table expressions -HINT: Consider using an equality filter on the distributed table's partition column. + count +------- + 4 +(1 row) + -- table function can be the inner relationship in an outer join SELECT count(*) FROM (SELECT random() FROM user_buy_test_table LEFT JOIN generate_series(1,10) AS users_ref_test_table(id) @@ -262,21 +265,19 @@ DETAIL: There exist a table function in the outer part of the outer join SELECT count(*) FROM (SELECT random() FROM user_buy_test_table JOIN random() AS users_ref_test_table(id) ON user_buy_test_table.item_id > users_ref_test_table.id) subquery_1; -ERROR: cannot push down this subquery -DETAIL: Only immutable functions can be used as a table expressions in a multi-shard query +ERROR: cannot handle complex subqueries when the router executor is disabled -- cannot sneak in a volatile function as a parameter SELECT count(*) FROM (SELECT random() FROM user_buy_test_table JOIN generate_series(random()::int,10) AS users_ref_test_table(id) ON user_buy_test_table.item_id > users_ref_test_table.id) subquery_1; -ERROR: cannot push down this subquery -DETAIL: Only immutable functions can be used as a table expressions in a multi-shard query +ERROR: cannot handle complex subqueries when the router executor is disabled -- cannot perform a union with table function SELECT count(*) FROM (SELECT user_id FROM user_buy_test_table UNION ALL SELECT id FROM generate_series(1,10) AS users_ref_test_table(id)) subquery_1; -ERROR: cannot push down this subquery -DETAIL: Table functions are not supported with union operator +ERROR: could not run distributed query with UNION, INTERSECT, or EXCEPT +HINT: Consider using an equality filter on the distributed table's partition column. -- subquery without FROM can be the inner relationship in a join SELECT count(*) FROM (SELECT random() FROM user_buy_test_table JOIN (SELECT 4 AS id) users_ref_test_table @@ -312,8 +313,8 @@ SELECT count(*) FROM (SELECT user_id FROM user_buy_test_table UNION ALL SELECT id FROM (SELECT 5 AS id) users_ref_test_table) subquery_1; -ERROR: cannot push down this subquery -DETAIL: Subqueries without a FROM clause are not supported with union operator +ERROR: could not run distributed query with UNION, INTERSECT, or EXCEPT +HINT: Consider using an equality filter on the distributed table's partition column. -- should be able to pushdown since reference table is in the -- inner part of the left join SELECT @@ -983,8 +984,8 @@ INNER JOIN ON (t.user_id = q.user_id)) as final_query ORDER BY types; -ERROR: cannot push down this subquery -DETAIL: Reference tables are not supported with union operator +ERROR: could not run distributed query with UNION, INTERSECT, or EXCEPT +HINT: Consider using an equality filter on the distributed table's partition column. -- reference table exist in the subquery of union, should error out SELECT ("final_query"."event_types") as types, count(*) AS sumOfEventType FROM @@ -1056,8 +1057,8 @@ GROUP BY types ORDER BY types; -ERROR: cannot push down this subquery -DETAIL: Reference tables are not supported with union operator +ERROR: could not run distributed query with UNION, INTERSECT, or EXCEPT +HINT: Consider using an equality filter on the distributed table's partition column. -- -- Should error out with UNION ALL Queries on reference tables -- @@ -1111,8 +1112,8 @@ INNER JOIN WHERE value_1 > 2 and value_1 < 4) AS t ON (t.user_id = q.user_id)) as final_query GROUP BY types ORDER BY types; -ERROR: cannot push down this subquery -DETAIL: Reference tables are not supported with union operator +ERROR: could not run distributed query with UNION, INTERSECT, or EXCEPT +HINT: Consider using an equality filter on the distributed table's partition column. -- just a sanity check that we don't allow this if the reference table is on the -- left part of the left join SELECT count(*) FROM @@ -1483,7 +1484,7 @@ LIMIT 5; 5 (5 rows) --- should error out since there is a distributed table and +-- should recursively plan since -- there are no columns on the GROUP BY from the distributed table SELECT DISTINCT user_id @@ -1499,8 +1500,12 @@ JOIN ON (mx = user_id) ORDER BY 1 LIMIT 5; -ERROR: cannot push down this subquery -DETAIL: Group by list without partition column is currently unsupported + user_id +--------- + 1 + 4 +(2 rows) + ROLLBACK; -- should work since we're using an immutable function as recurring tuple SELECT @@ -1521,7 +1526,7 @@ LIMIT 5; 6 (2 rows) --- should not work since we're +-- should recursively plan since we're -- using an immutable function as recurring tuple -- along with a distributed table, where GROUP BY is -- on the recurring tuple @@ -1539,8 +1544,11 @@ JOIN ON (mx = user_id) ORDER BY 1 LIMIT 5; -ERROR: cannot push down this subquery -DETAIL: Group by list without partition column is currently unsupported + user_id +--------- + 6 +(1 row) + DROP TABLE user_buy_test_table; DROP TABLE users_ref_test_table; DROP TABLE users_return_test_table; diff --git a/src/test/regress/expected/multi_subquery_union.out b/src/test/regress/expected/multi_subquery_union.out index 4adbf441a..59c7181d1 100644 --- a/src/test/regress/expected/multi_subquery_union.out +++ b/src/test/regress/expected/multi_subquery_union.out @@ -47,8 +47,8 @@ FROM ( ) user_id ORDER BY 2 DESC,1 LIMIT 5; -ERROR: cannot pushdown this query -DETAIL: Reference tables are not allowed with set operations +ERROR: could not run distributed query with UNION, INTERSECT, or EXCEPT +HINT: Consider using an equality filter on the distributed table's partition column. -- the same query with union all SELECT user_id, counter FROM ( @@ -76,8 +76,8 @@ FROM ( ) user_id ORDER BY 2 DESC,1 LIMIT 5; -ERROR: cannot pushdown this query -DETAIL: Reference tables are not allowed with set operations +ERROR: could not run distributed query with UNION, INTERSECT, or EXCEPT +HINT: Consider using an equality filter on the distributed table's partition column. -- the same query with group by SELECT user_id, sum(counter) FROM ( @@ -212,8 +212,8 @@ FROM ( SELECT user_id, sum(value_2) AS counter FROM users_table where value_1 < 5 and value_1 < 6 GROUP BY user_id HAVING sum(value_2) > 25 ) user_id GROUP BY user_id ORDER BY 1 DESC LIMIT 5; -ERROR: cannot pushdown this query -DETAIL: Reference tables are not allowed with set operations +ERROR: could not run distributed query with UNION, INTERSECT, or EXCEPT +HINT: Consider using an equality filter on the distributed table's partition column. -- similar query as above, with UNION ALL SELECT sum(counter) FROM ( @@ -329,8 +329,8 @@ FROM ( user_id)) AS ftop ORDER BY 2 DESC, 1 DESC LIMIT 5; -ERROR: cannot pushdown this query -DETAIL: Reference tables are not allowed with set operations +ERROR: could not run distributed query with UNION, INTERSECT, or EXCEPT +HINT: Consider using an equality filter on the distributed table's partition column. -- top level unions are wrapped into top level aggregations SELECT ("final_query"."event_types") as types, count(*) AS sumOfEventType FROM @@ -597,8 +597,8 @@ FROM UNION ALL (SELECT user_id FROM events_reference_table) ) b; -ERROR: cannot pushdown this query -DETAIL: Reference tables are not allowed with set operations +ERROR: could not run distributed query with UNION, INTERSECT, or EXCEPT +HINT: Consider using an equality filter on the distributed table's partition column. -- similar query without top level agg SELECT user_id @@ -746,8 +746,8 @@ FROM ( SELECT value_1 as user_id, sum(value_2) AS counter FROM users_table GROUP BY value_1 ) user_id GROUP BY user_id; -ERROR: cannot pushdown the subquery since not all subqueries in the UNION have the partition column in the same position -DETAIL: Each leaf query of the UNION should return the partition column in the same position and all joins must be on the partition column +ERROR: could not run distributed query with UNION, INTERSECT, or EXCEPT +HINT: Consider using an equality filter on the distributed table's partition column. -- partition key is not selected SELECT sum(counter) FROM ( @@ -786,8 +786,8 @@ UNION ) user_id_2 GROUP BY user_id) ) as ftop; -ERROR: cannot push down this subquery -DETAIL: Intersect and Except are currently unsupported +ERROR: could not run distributed query with UNION, INTERSECT, or EXCEPT +HINT: Consider using an equality filter on the distributed table's partition column. -- non-equi join are not supported since there is no equivalence between the partition column SELECT user_id, sum(counter) FROM ( @@ -943,8 +943,8 @@ FROM ( SELECT user_id, sum(value_2) AS counter FROM users_table GROUP BY user_id OFFSET 4 ) user_id GROUP BY user_id; -ERROR: cannot push down this subquery -DETAIL: Offset clause is currently unsupported +ERROR: could not run distributed query with UNION, INTERSECT, or EXCEPT +HINT: Consider using an equality filter on the distributed table's partition column. -- lower level union does not return partition key with the other relations SELECT * FROM ( @@ -1039,8 +1039,8 @@ FROM UNION ALL (SELECT 1) ) b; -ERROR: cannot push down this subquery -DETAIL: Subqueries without a FROM clause are not supported with union operator +ERROR: could not run distributed query with UNION, INTERSECT, or EXCEPT +HINT: Consider using an equality filter on the distributed table's partition column. -- we don't support subqueries without relations SELECT * @@ -1050,8 +1050,8 @@ FROM UNION ALL (SELECT (random() * 100)::int) ) b; -ERROR: cannot push down this subquery -DETAIL: Subqueries without a FROM clause are not supported with union operator +ERROR: could not run distributed query with UNION, INTERSECT, or EXCEPT +HINT: Consider using an equality filter on the distributed table's partition column. -- we don't support subqueries without relations SELECT user_id, value_3 @@ -1071,8 +1071,8 @@ FROM ) b ORDER BY 1 DESC, 2 DESC LIMIT 5; -ERROR: cannot push down this subquery -DETAIL: Subqueries without a FROM clause are not supported with union operator +ERROR: could not run distributed query with UNION, INTERSECT, or EXCEPT +HINT: Consider using an equality filter on the distributed table's partition column. SELECT ("final_query"."event_types") as types, count(*) AS sumOfEventType FROM ( SELECT *, random() @@ -1115,8 +1115,8 @@ FROM ) as final_query GROUP BY types ORDER BY types; -ERROR: cannot push down this subquery -DETAIL: Subqueries without a FROM clause are not supported with union operator +ERROR: could not run distributed query with UNION, INTERSECT, or EXCEPT +HINT: Consider using an equality filter on the distributed table's partition column. SET citus.enable_router_execution TO true; DROP TABLE events_reference_table; DROP TABLE users_reference_table; diff --git a/src/test/regress/expected/multi_subquery_window_functions.out b/src/test/regress/expected/multi_subquery_window_functions.out index 6e5a6d0e7..b43e97745 100644 --- a/src/test/regress/expected/multi_subquery_window_functions.out +++ b/src/test/regress/expected/multi_subquery_window_functions.out @@ -790,8 +790,8 @@ ORDER BY 3 DESC, 1 DESC, 2 DESC LIMIT 10; -ERROR: cannot push down this subquery -DETAIL: Window functions with PARTITION BY list missing distribution column is currently unsupported +ERROR: could not run distributed query because the window function that is used cannot be pushed down +HINT: Window functions are supported in two ways. Either add an equality filter on the distributed tables' partition column or use the window functions inside a subquery with a PARTITION BY clause containing the distribution column -- user needs to supply partition by which should -- include the distribution key SELECT @@ -808,8 +808,8 @@ ORDER BY 3 DESC, 1 DESC, 2 DESC LIMIT 10; -ERROR: cannot push down this subquery -DETAIL: Window functions without PARTITION BY on distribution column is currently unsupported +ERROR: could not run distributed query because the window function that is used cannot be pushed down +HINT: Window functions are supported in two ways. Either add an equality filter on the distributed tables' partition column or use the window functions inside a subquery with a PARTITION BY clause containing the distribution column -- user needs to supply partition by which should -- include the distribution key SELECT @@ -826,8 +826,8 @@ ORDER BY 3 DESC, 1 DESC, 2 DESC LIMIT 10; -ERROR: cannot push down this subquery -DETAIL: Window functions without PARTITION BY on distribution column is currently unsupported +ERROR: could not run distributed query because the window function that is used cannot be pushed down +HINT: Window functions are supported in two ways. Either add an equality filter on the distributed tables' partition column or use the window functions inside a subquery with a PARTITION BY clause containing the distribution column -- w2 should not be pushed down SELECT * FROM ( @@ -843,8 +843,8 @@ SELECT * FROM ) as foo ORDER BY 3 DESC, 1 DESC, 2 DESC NULLS LAST LIMIT 10; -ERROR: cannot push down this subquery -DETAIL: Window functions with PARTITION BY list missing distribution column is currently unsupported +ERROR: could not run distributed query because the window function that is used cannot be pushed down +HINT: Window functions are supported in two ways. Either add an equality filter on the distributed tables' partition column or use the window functions inside a subquery with a PARTITION BY clause containing the distribution column -- w2 should not be pushed down SELECT * FROM ( @@ -862,8 +862,8 @@ ORDER BY 3 DESC, 1 DESC, 2 DESC NULLS LAST LIMIT 10; -ERROR: cannot push down this subquery -DETAIL: Window functions without PARTITION BY on distribution column is currently unsupported +ERROR: could not run distributed query because the window function that is used cannot be pushed down +HINT: Window functions are supported in two ways. Either add an equality filter on the distributed tables' partition column or use the window functions inside a subquery with a PARTITION BY clause containing the distribution column -- GROUP BY includes the partition key, but not the WINDOW function SELECT user_id, time, my_rank @@ -883,8 +883,8 @@ ORDER BY 3 DESC, 1 DESC,2 DESC LIMIT 10; -ERROR: cannot push down this subquery -DETAIL: Window functions without PARTITION BY on distribution column is currently unsupported +ERROR: could not run distributed query because the window function that is used cannot be pushed down +HINT: Window functions are supported in two ways. Either add an equality filter on the distributed tables' partition column or use the window functions inside a subquery with a PARTITION BY clause containing the distribution column -- GROUP BY includes the partition key, but not the WINDOW function SELECT user_id, time, my_rank @@ -904,8 +904,8 @@ ORDER BY 3 DESC, 1 DESC,2 DESC LIMIT 10; -ERROR: cannot push down this subquery -DETAIL: Window functions with PARTITION BY list missing distribution column is currently unsupported +ERROR: could not run distributed query because the window function that is used cannot be pushed down +HINT: Window functions are supported in two ways. Either add an equality filter on the distributed tables' partition column or use the window functions inside a subquery with a PARTITION BY clause containing the distribution column -- Overriding window function but not supported SELECT * FROM ( SELECT @@ -923,8 +923,8 @@ SELECT * FROM ( ) a ORDER BY 1,2,3; -ERROR: cannot push down this subquery -DETAIL: Window functions with PARTITION BY list missing distribution column is currently unsupported +ERROR: could not run distributed query because the window function that is used cannot be pushed down +HINT: Window functions are supported in two ways. Either add an equality filter on the distributed tables' partition column or use the window functions inside a subquery with a PARTITION BY clause containing the distribution column -- Aggregate function on distribution column should error out SELECT * FROM ( SELECT @@ -936,8 +936,8 @@ SELECT * FROM ( ) a ORDER BY 1 DESC, 2 DESC; -ERROR: cannot push down this subquery -DETAIL: Window functions with PARTITION BY list missing distribution column is currently unsupported +ERROR: could not run distributed query because the window function that is used cannot be pushed down +HINT: Window functions are supported in two ways. Either add an equality filter on the distributed tables' partition column or use the window functions inside a subquery with a PARTITION BY clause containing the distribution column -- test with reference table partitioned on only a column from reference table SELECT * FROM @@ -951,8 +951,8 @@ ORDER BY 1, 2, 3 LIMIT 20; -ERROR: cannot push down this subquery -DETAIL: Window functions with PARTITION BY list missing distribution column is currently unsupported +ERROR: could not run distributed query because the window function that is used cannot be pushed down +HINT: Window functions are supported in two ways. Either add an equality filter on the distributed tables' partition column or use the window functions inside a subquery with a PARTITION BY clause containing the distribution column -- UNION ALL with only one of them is not partitioned over distribution column which -- should not be allowed. SELECT @@ -974,8 +974,8 @@ FROM GROUP BY user_id ORDER BY 1 DESC LIMIT 5; -ERROR: cannot push down this subquery -DETAIL: Window functions with PARTITION BY list missing distribution column is currently unsupported +ERROR: could not run distributed query because the window function that is used cannot be pushed down +HINT: Window functions are supported in two ways. Either add an equality filter on the distributed tables' partition column or use the window functions inside a subquery with a PARTITION BY clause containing the distribution column -- UNION with only one subquery which has a partition on non-distribution column should -- error out SELECT * @@ -1011,6 +1011,6 @@ FROM ( user_id)) AS ftop ORDER BY 2 DESC, 1 DESC LIMIT 5; -ERROR: cannot push down this subquery -DETAIL: Window functions with PARTITION BY list missing distribution column is currently unsupported +ERROR: could not run distributed query because the window function that is used cannot be pushed down +HINT: Window functions are supported in two ways. Either add an equality filter on the distributed tables' partition column or use the window functions inside a subquery with a PARTITION BY clause containing the distribution column DROP VIEW subq; diff --git a/src/test/regress/expected/multi_view.out b/src/test/regress/expected/multi_view.out index cb7c258ee..c09516ef9 100644 --- a/src/test/regress/expected/multi_view.out +++ b/src/test/regress/expected/multi_view.out @@ -794,6 +794,7 @@ EXPLAIN (COSTS FALSE) SELECT * EXPLAIN (COSTS FALSE) SELECT et.* FROM recent_10_users JOIN events_table et USING(user_id) ORDER BY et.time DESC LIMIT 10; ERROR: cannot push down this subquery DETAIL: Limit in subquery is currently unsupported + -> Distributed Subplan 83_1 SET citus.subquery_pushdown to ON; EXPLAIN (COSTS FALSE) SELECT et.* FROM recent_10_users JOIN events_table et USING(user_id) ORDER BY et.time DESC LIMIT 10; QUERY PLAN diff --git a/src/test/regress/expected/subquery_complex_target_list.out b/src/test/regress/expected/subquery_complex_target_list.out new file mode 100644 index 000000000..32cf90fcb --- /dev/null +++ b/src/test/regress/expected/subquery_complex_target_list.out @@ -0,0 +1,389 @@ +-- =================================================================== +-- test recursive planning functionality with complex target entries +-- and some utilities +-- =================================================================== +CREATE SCHEMA subquery_complex; +SET search_path TO subquery_complex, public; +SET client_min_messages TO DEBUG1; +-- COUNT DISTINCT at the top level query +SELECT + event_type, count(distinct value_2) +FROM + events_table +WHERE + user_id IN (SELECT user_id FROM users_table GROUP BY user_id ORDER BY count(*) DESC LIMIT 20) +GROUP BY + event_type +ORDER BY 1 DESC, 2 DESC +LIMIT 3; +DEBUG: push down of limit count: 20 +DEBUG: generating subplan 1_1 for subquery SELECT user_id FROM public.users_table GROUP BY user_id ORDER BY (count(*)) DESC LIMIT 20 + event_type | count +------------+------- + 6 | 1 + 5 | 3 + 4 | 6 +(3 rows) + +-- aggregate distinct in the subqueries + -- avg distinct on partition key + -- count distinct on partition key + -- count distinct on non-partition key + -- sum distinct on non-partition key when group by is partition key + -- and the final query is real-time query +SELECT + DISTINCT ON (avg) avg, cnt_1, cnt_2, sum +FROM + ( + SELECT avg(distinct user_id) as avg FROM users_table ORDER BY 1 DESC LIMIT 3 + ) as foo, + ( + SELECT count(distinct user_id) as cnt_1 FROM users_table ORDER BY 1 DESC LIMIT 3 + ) as bar, + ( + SELECT count(distinct value_2) as cnt_2 FROM users_table ORDER BY 1 DESC LIMIT 4 + ) as baz, + ( + SELECT user_id, sum(distinct value_2) as sum FROM users_table GROUP BY user_id ORDER BY 1 DESC LIMIT 4 + ) as bat, events_table + WHERE foo.avg != bar.cnt_1 AND baz.cnt_2 = events_table.event_type + ORDER BY 1 DESC; +DEBUG: push down of limit count: 3 +DEBUG: generating subplan 3_1 for subquery SELECT avg(DISTINCT user_id) AS avg FROM public.users_table ORDER BY (avg(DISTINCT user_id)) DESC LIMIT 3 +DEBUG: push down of limit count: 3 +DEBUG: generating subplan 3_2 for subquery SELECT count(DISTINCT user_id) AS cnt_1 FROM public.users_table ORDER BY (count(DISTINCT user_id)) DESC LIMIT 3 +DEBUG: generating subplan 3_3 for subquery SELECT count(DISTINCT value_2) AS cnt_2 FROM public.users_table ORDER BY (count(DISTINCT value_2)) DESC LIMIT 4 +DEBUG: push down of limit count: 4 +DEBUG: generating subplan 3_4 for subquery SELECT user_id, sum(DISTINCT value_2) AS sum FROM public.users_table GROUP BY user_id ORDER BY user_id DESC LIMIT 4 + avg | cnt_1 | cnt_2 | sum +--------------------+-------+-------+----- + 3.5000000000000000 | 6 | 6 | 10 +(1 row) + +-- Aggregate type conversions inside the subqueries +SELECT + * +FROM + ( + SELECT + min(user_id) * 2, max(user_id) / 2, sum(user_id), count(user_id)::float, avg(user_id)::bigint + FROM + users_table + ORDER BY 1 DESC + LIMIT 3 + ) as foo, + ( + SELECT + min(value_3) * 2, max(value_3) / 2, sum(value_3), count(value_3), avg(value_3) + FROM + users_table + ORDER BY 1 DESC + LIMIT 3 + ) as bar, + ( + SELECT + min(time), max(time), count(time), + count(*) FILTER (WHERE user_id = 3) as cnt_with_filter, + count(*) FILTER (WHERE user_id::text LIKE '%3%') as cnt_with_filter_2 + FROM + users_table + ORDER BY 1 DESC + LIMIT 3 + ) as baz + ORDER BY 1 DESC; +DEBUG: push down of limit count: 3 +DEBUG: generating subplan 8_1 for subquery SELECT (min(user_id) * 2), (max(user_id) / 2), sum(user_id) AS sum, (count(user_id))::double precision AS count, (avg(user_id))::bigint AS avg FROM public.users_table ORDER BY (min(user_id) * 2) DESC LIMIT 3 +DEBUG: push down of limit count: 3 +DEBUG: generating subplan 8_2 for subquery SELECT (min(value_3) * (2)::double precision), (max(value_3) / (2)::double precision), sum(value_3) AS sum, count(value_3) AS count, avg(value_3) AS avg FROM public.users_table ORDER BY (min(value_3) * (2)::double precision) DESC LIMIT 3 +DEBUG: push down of limit count: 3 +DEBUG: generating subplan 8_3 for subquery SELECT min("time") AS min, max("time") AS max, count("time") AS count, count(*) FILTER (WHERE (user_id = 3)) AS cnt_with_filter, count(*) FILTER (WHERE ((user_id)::text ~~ '%3%'::text)) AS cnt_with_filter_2 FROM public.users_table ORDER BY (min("time")) DESC LIMIT 3 + ?column? | ?column? | sum | count | avg | ?column? | ?column? | sum | count | avg | min | max | count | cnt_with_filter | cnt_with_filter_2 +----------+----------+-----+-------+-----+----------+----------+-----+-------+-----------------+---------------------------------+---------------------------------+-------+-----------------+------------------- + 2 | 3 | 376 | 101 | 4 | 0 | 2.5 | 273 | 101 | 2.7029702970297 | Wed Nov 22 18:19:49.944985 2017 | Thu Nov 23 17:30:34.635085 2017 | 101 | 17 | 17 +(1 row) + +-- Expressions inside the aggregates +-- parts of the query is inspired by TPCH queries +SELECT + DISTINCT ON (avg) avg, cnt_1, cnt_2, cnt_3, sum_1,l_year, pos, count_pay +FROM + ( + SELECT avg(user_id * (5.0 / (value_1 + 0.1))) as avg FROM users_table ORDER BY 1 DESC LIMIT 3 + ) as foo, + ( + SELECT sum(user_id * (5.0 / (value_1 + value_2 + 0.1)) * value_3) as cnt_1 FROM users_table ORDER BY 1 DESC LIMIT 3 + ) as bar, + ( + SELECT + avg(case + when user_id > 4 + then value_1 + end) as cnt_2, + avg(case + when user_id > 500 + then value_1 + end) as cnt_3, + sum(case + when value_1 = 1 + OR value_2 = 1 + then 1 + else 0 + end) as sum_1, + extract(year FROM max(time)) as l_year, + strpos(max(user_id)::text, '1') as pos + FROM + users_table + ORDER BY + 1 DESC + LIMIT 4 + ) as baz, + ( + SELECT COALESCE(value_3, 20) AS count_pay FROM users_table ORDER BY 1 OFFSET 20 LIMIT 5 + ) as tar, + events_table + WHERE foo.avg != bar.cnt_1 AND baz.cnt_2 != events_table.event_type + ORDER BY 1 DESC; +DEBUG: push down of limit count: 3 +DEBUG: generating subplan 12_1 for subquery SELECT avg(((user_id)::numeric * (5.0 / ((value_1)::numeric + 0.1)))) AS avg FROM public.users_table ORDER BY (avg(((user_id)::numeric * (5.0 / ((value_1)::numeric + 0.1))))) DESC LIMIT 3 +DEBUG: push down of limit count: 3 +DEBUG: generating subplan 12_2 for subquery SELECT sum(((((user_id)::numeric * (5.0 / (((value_1 + value_2))::numeric + 0.1))))::double precision * value_3)) AS cnt_1 FROM public.users_table ORDER BY (sum(((((user_id)::numeric * (5.0 / (((value_1 + value_2))::numeric + 0.1))))::double precision * value_3))) DESC LIMIT 3 +DEBUG: push down of limit count: 4 +DEBUG: generating subplan 12_3 for subquery SELECT avg(CASE WHEN (user_id > 4) THEN value_1 ELSE NULL::integer END) AS cnt_2, avg(CASE WHEN (user_id > 500) THEN value_1 ELSE NULL::integer END) AS cnt_3, sum(CASE WHEN ((value_1 = 1) OR (value_2 = 1)) THEN 1 ELSE 0 END) AS sum_1, date_part('year'::text, max("time")) AS l_year, strpos((max(user_id))::text, '1'::text) AS pos FROM public.users_table ORDER BY (avg(CASE WHEN (user_id > 4) THEN value_1 ELSE NULL::integer END)) DESC LIMIT 4 +DEBUG: push down of limit count: 25 +DEBUG: generating subplan 12_4 for subquery SELECT COALESCE(value_3, (20)::double precision) AS count_pay FROM public.users_table ORDER BY COALESCE(value_3, (20)::double precision) OFFSET 20 LIMIT 5 + avg | cnt_1 | cnt_2 | cnt_3 | sum_1 | l_year | pos | count_pay +-------------------------+------------------+--------------------+-------+-------+--------+-----+----------- + 30.14666771571734992301 | 3308.14619815793 | 2.5000000000000000 | | 31 | 2017 | 0 | 1 +(1 row) + +-- Multiple columns in GROUP BYs +-- foo needs to be recursively planned, bar can be pushded down +SELECT + DISTINCT ON (avg) avg, avg2 +FROM + ( + SELECT avg(value_3) as avg FROM users_table GROUP BY value_1, value_2 + ) as foo, + ( + SELECT avg(value_3) as avg2 FROM users_table GROUP BY value_1, value_2, user_id + ) as bar + WHERE foo.avg = bar.avg2 + ORDER BY 1 DESC, 2 DESC + LIMIT 3; +DEBUG: generating subplan 17_1 for subquery SELECT avg(value_3) AS avg FROM public.users_table GROUP BY value_1, value_2 +DEBUG: push down of limit count: 3 + avg | avg2 +-----+------ + 5 | 5 + 4 | 4 + 3.5 | 3.5 +(3 rows) + +-- HAVING and ORDER BY tests +SELECT a.user_id, b.value_2, c.avg +FROM ( + SELECT + user_id + FROM + users_table + WHERE + (value_1 > 2) + GROUP BY + user_id + HAVING + count(distinct value_1) > 2 + ORDER BY 1 DESC + LIMIT 3 + ) as a, + ( + SELECT + value_2 + FROM + users_table + WHERE + (value_1 > 2) + GROUP BY + value_2 + HAVING + count(distinct value_1) > 2 + ORDER BY 1 DESC + LIMIT 3 + ) as b, + ( + SELECT + avg(user_id) as avg + FROM + users_table + WHERE + (value_1 > 2) + GROUP BY + value_2 + HAVING + sum(value_1) > 10 + ORDER BY (sum(value_3) - avg(value_1) - COALESCE(array_upper(ARRAY[max(user_id)],1) * 5,0)) DESC + LIMIT 3 + ) as c + WHERE b.value_2 != a.user_id + ORDER BY 3 DESC, 2 DESC, 1 DESC + LIMIT 5; +DEBUG: push down of limit count: 3 +DEBUG: generating subplan 19_1 for subquery SELECT user_id FROM public.users_table WHERE (value_1 > 2) GROUP BY user_id HAVING (count(DISTINCT value_1) > 2) ORDER BY user_id DESC LIMIT 3 +DEBUG: generating subplan 19_2 for subquery SELECT value_2 FROM public.users_table WHERE (value_1 > 2) GROUP BY value_2 HAVING (count(DISTINCT value_1) > 2) ORDER BY value_2 DESC LIMIT 3 +DEBUG: generating subplan 19_3 for subquery SELECT avg(user_id) AS avg FROM public.users_table WHERE (value_1 > 2) GROUP BY value_2 HAVING (sum(value_1) > 10) ORDER BY ((sum(value_3) - (avg(value_1))::double precision) - (COALESCE((array_upper(ARRAY[max(user_id)], 1) * 5), 0))::double precision) DESC LIMIT 3 + user_id | value_2 | avg +---------+---------+-------------------- + 4 | 5 | 4.1666666666666667 + 3 | 5 | 4.1666666666666667 + 5 | 4 | 4.1666666666666667 + 3 | 4 | 4.1666666666666667 + 5 | 3 | 4.1666666666666667 +(5 rows) + +-- zero shard subquery joined with a regular one +SELECT + bar.user_id +FROM + (SELECT + DISTINCT users_table.user_id + FROM + users_table, events_table + WHERE + users_table.user_id = events_table.user_id AND + event_type IN (1,2,3,4) + ORDER BY 1 DESC LIMIT 5 + ) as foo, + (SELECT + DISTINCT users_table.user_id + FROM + users_table, events_table + WHERE + users_table.user_id = events_table.user_id AND false AND + event_type IN (1,2,3,4) + ORDER BY 1 DESC LIMIT 5 + ) as bar + WHERE foo.user_id > bar.user_id + ORDER BY 1 DESC; +DEBUG: push down of limit count: 5 +DEBUG: generating subplan 23_1 for subquery SELECT DISTINCT users_table.user_id FROM public.users_table, public.events_table WHERE ((users_table.user_id = events_table.user_id) AND (events_table.event_type = ANY (ARRAY[1, 2, 3, 4]))) ORDER BY users_table.user_id DESC LIMIT 5 +DEBUG: generating subplan 23_2 for subquery SELECT DISTINCT users_table.user_id FROM public.users_table, public.events_table WHERE ((users_table.user_id = events_table.user_id) AND false AND (events_table.event_type = ANY (ARRAY[1, 2, 3, 4]))) ORDER BY users_table.user_id DESC LIMIT 5 + user_id +--------- +(0 rows) + +-- window functions tests, both is recursively planned +SELECT * FROM +( + SELECT + user_id, time, rnk + FROM + ( + SELECT * FROM ( + SELECT + *, rank() OVER my_win as rnk + FROM + events_table + WINDOW my_win AS (PARTITION BY user_id ORDER BY time DESC) + ORDER BY rnk DESC + ) as foo_inner + LIMIT 4 + ) as foo + ORDER BY + 3 DESC, 1 DESC, 2 DESC +) foo, +( + SELECT + user_id, time, rnk + FROM + ( + SELECT + *, rank() OVER my_win as rnk + FROM + events_table + WHERE + user_id = 3 + WINDOW my_win AS (PARTITION BY event_type ORDER BY time DESC) + ) as foo + ORDER BY + 3 DESC, 1 DESC, 2 DESC +) bar WHERE foo.user_id = bar.user_id +ORDER BY foo.rnk DESC, foo.time DESC, bar.time LIMIT 5; +DEBUG: push down of limit count: 4 +DEBUG: generating subplan 26_1 for subquery SELECT user_id, "time", event_type, value_2, value_3, value_4, rnk FROM (SELECT events_table.user_id, events_table."time", events_table.event_type, events_table.value_2, events_table.value_3, events_table.value_4, rank() OVER my_win AS rnk FROM public.events_table WINDOW my_win AS (PARTITION BY events_table.user_id ORDER BY events_table."time" DESC) ORDER BY (rank() OVER my_win) DESC) foo_inner LIMIT 4 +DEBUG: generating subplan 26_2 for subquery SELECT user_id, "time", event_type, value_2, value_3, value_4, rank() OVER my_win AS rnk FROM public.events_table WHERE (user_id = 3) WINDOW my_win AS (PARTITION BY event_type ORDER BY "time" DESC) + user_id | time | rnk | user_id | time | rnk +---------+------+-----+---------+------+----- +(0 rows) + +-- cursor test +BEGIN; + + DECLARE recursive_subquery CURSOR FOR + SELECT + event_type, count(distinct value_2) + FROM + events_table + WHERE + user_id IN (SELECT user_id FROM users_table GROUP BY user_id ORDER BY count(*) DESC LIMIT 20) + GROUP BY + event_type + ORDER BY 1 DESC, 2 DESC + LIMIT 3; +DEBUG: push down of limit count: 20 +DEBUG: generating subplan 29_1 for subquery SELECT user_id FROM public.users_table GROUP BY user_id ORDER BY (count(*)) DESC LIMIT 20 + FETCH 1 FROM recursive_subquery; + event_type | count +------------+------- + 6 | 1 +(1 row) + + FETCH 1 FROM recursive_subquery; + event_type | count +------------+------- + 5 | 3 +(1 row) + + FETCH 1 FROM recursive_subquery; + event_type | count +------------+------- + 4 | 6 +(1 row) + + FETCH 1 FROM recursive_subquery; + event_type | count +------------+------- +(0 rows) + +COMMIT; +-- cursor test with FETCH ALL +BEGIN; + + DECLARE recursive_subquery CURSOR FOR + SELECT + event_type, count(distinct value_2) + FROM + events_table + WHERE + user_id IN (SELECT user_id FROM users_table GROUP BY user_id ORDER BY count(*) DESC LIMIT 20) + GROUP BY + event_type + ORDER BY 1 DESC, 2 DESC + LIMIT 3; +DEBUG: push down of limit count: 20 +DEBUG: generating subplan 31_1 for subquery SELECT user_id FROM public.users_table GROUP BY user_id ORDER BY (count(*)) DESC LIMIT 20 + FETCH ALL FROM recursive_subquery; + event_type | count +------------+------- + 6 | 1 + 5 | 3 + 4 | 6 +(3 rows) + + FETCH ALL FROM recursive_subquery; + event_type | count +------------+------- +(0 rows) + +COMMIT; +SET client_min_messages TO DEFAULT; +DROP SCHEMA subquery_complex CASCADE; +SET search_path TO public; diff --git a/src/test/regress/expected/with_basics.out b/src/test/regress/expected/with_basics.out index 5172ad302..86dab119c 100644 --- a/src/test/regress/expected/with_basics.out +++ b/src/test/regress/expected/with_basics.out @@ -218,8 +218,20 @@ FROM (SELECT min(user_id) AS user_id FROM top_users) top_users JOIN users_table USING (user_id); -ERROR: cannot push down this subquery -DETAIL: Aggregates without group by are currently unsupported + user_id +--------- + 6 + 6 + 6 + 6 + 6 + 6 + 6 + 6 + 6 + 6 +(10 rows) + -- FOR UPDATE in subquery on CTE WITH top_users AS ( SELECT user_id, value_2 FROM users_table ORDER BY user_id DESC LIMIT 10 @@ -257,8 +269,15 @@ ORDER BY user_id LIMIT 5; -ERROR: cannot push down this subquery -DETAIL: Limit in subquery is currently unsupported + user_id +--------- + 6 + 6 + 6 + 6 + 6 +(5 rows) + -- OFFSET in subquery on CTE WITH top_users AS ( SELECT user_id, value_2 FROM users_table ORDER BY user_id DESC LIMIT 10 @@ -273,8 +292,15 @@ ORDER BY user_id LIMIT 5; -ERROR: cannot push down this subquery -DETAIL: Offset clause is currently unsupported + user_id +--------- + 6 + 6 + 6 + 6 + 6 +(5 rows) + -- Unsupported join in CTE WITH top_users AS ( SELECT DISTINCT e.user_id FROM users_table u JOIN events_table e ON (u.user_id = e.user_id AND u.value_1 > e.value_2) @@ -330,8 +356,8 @@ ORDER BY user_id LIMIT 5; -ERROR: could not run distributed query with window functions -HINT: Consider using an equality filter on the distributed table's partition column. +ERROR: could not run distributed query because the window function that is used cannot be pushed down +HINT: Window functions are supported in two ways. Either add an equality filter on the distributed tables' partition column or use the window functions inside a subquery with a PARTITION BY clause containing the distribution column -- Window functions that partition by the distribution column in subqueries in CTEs are ok WITH top_users AS (SELECT * @@ -434,8 +460,8 @@ ORDER BY 1,2,3,4,5,6 LIMIT 10; -ERROR: cannot push down this subquery -DETAIL: Complex subqueries and CTEs are not supported within a UNION +ERROR: could not run distributed query with UNION, INTERSECT, or EXCEPT +HINT: Consider using an equality filter on the distributed table's partition column. SELECT * FROM ( SELECT * FROM (WITH cte AS ( SELECT * FROM users_table @@ -446,8 +472,8 @@ ORDER BY 1,2,3,4,5,6 LIMIT 10; -ERROR: cannot push down this subquery -DETAIL: CTEs in subqueries are currently unsupported +ERROR: could not run distributed query with UNION, INTERSECT, or EXCEPT +HINT: Consider using an equality filter on the distributed table's partition column. -- SELECT * FROM (SELECT * FROM cte UNION SELECT * FROM cte) a; should work WITH cte AS ( SELECT * FROM users_table WHERE user_id IN (1, 2) @@ -530,8 +556,7 @@ WHERE SELECT user_id FROM users_table WHERE user_id>1 ) SELECT * FROM basic_recursive ORDER BY user_id LIMIT 1); -ERROR: cannot pushdown the subquery since all relations are not joined using distribution keys -DETAIL: Each relation should be joined with at least one another relation using distribution keys and equality operator. +ERROR: recursive CTEs are not supported in distributed queries -- one recursive one regular CTE should error out WITH RECURSIVE basic_recursive(x) AS( VALUES (1) diff --git a/src/test/regress/expected/with_where.out b/src/test/regress/expected/with_where.out index 03fd85f8a..b13df9f97 100644 --- a/src/test/regress/expected/with_where.out +++ b/src/test/regress/expected/with_where.out @@ -141,8 +141,11 @@ IN 1) SELECT * FROM events LIMIT 10 ); -ERROR: cannot pushdown the subquery since all relations are not joined using distribution keys -DETAIL: Each relation should be joined with at least one another relation using distribution keys and equality operator. + count +------- + 101 +(1 row) + -- CTE with non-colocated join in WHERE SELECT count(*) @@ -164,5 +167,8 @@ WHERE ) SELECT * FROM users LIMIT 10 ); -ERROR: cannot pushdown the subquery since all relations are not joined using distribution keys -DETAIL: Each relation should be joined with at least one another relation using distribution keys and equality operator. + count +------- + 101 +(1 row) + diff --git a/src/test/regress/sql/multi_mx_router_planner.sql b/src/test/regress/sql/multi_mx_router_planner.sql index 7c0d840ad..b2b97040f 100644 --- a/src/test/regress/sql/multi_mx_router_planner.sql +++ b/src/test/regress/sql/multi_mx_router_planner.sql @@ -216,8 +216,8 @@ SELECT * FROM articles_hash_mx, position('om' in 'Thomas') WHERE author_id = 1; SELECT * FROM articles_hash_mx, position('om' in 'Thomas') WHERE author_id = 1 or author_id = 3; --- they are not supported if multiple workers are involved -SELECT * FROM articles_hash_mx, position('om' in 'Thomas') WHERE author_id = 1 or author_id = 2; +-- they are supported via (sub)query pushdown if multiple workers are involved +SELECT * FROM articles_hash_mx, position('om' in 'Thomas') WHERE author_id = 1 or author_id = 2 ORDER BY 4 DESC, 1 DESC, 2 DESC LIMIT 5; -- subqueries are supported in FROM clause but they are not router plannable SELECT articles_hash_mx.id,test.word_count diff --git a/src/test/regress/sql/multi_router_planner.sql b/src/test/regress/sql/multi_router_planner.sql index 1e1707dc9..01a6848cc 100644 --- a/src/test/regress/sql/multi_router_planner.sql +++ b/src/test/regress/sql/multi_router_planner.sql @@ -280,8 +280,8 @@ SELECT * FROM articles_hash, position('om' in 'Thomas') WHERE author_id = 1; SELECT * FROM articles_hash, position('om' in 'Thomas') WHERE author_id = 1 or author_id = 3; --- they are not supported if multiple workers are involved -SELECT * FROM articles_hash, position('om' in 'Thomas') WHERE author_id = 1 or author_id = 2; +-- they are supported via (sub)query pushdown if multiple workers are involved +SELECT * FROM articles_hash, position('om' in 'Thomas') WHERE author_id = 1 or author_id = 2 ORDER BY 4 DESC, 1 DESC, 2 DESC LIMIT 5; -- unless the query can be transformed into a join SELECT * FROM articles_hash diff --git a/src/test/regress/sql/multi_simple_queries.sql b/src/test/regress/sql/multi_simple_queries.sql index 4d8396885..1d1fdcbca 100644 --- a/src/test/regress/sql/multi_simple_queries.sql +++ b/src/test/regress/sql/multi_simple_queries.sql @@ -124,8 +124,8 @@ SELECT * FROM articles WHERE author_id = 2; WITH long_names AS ( SELECT id FROM authors WHERE char_length(name) > 15 ) SELECT title FROM articles ORDER BY 1 LIMIT 5; --- queries which involve functions in FROM clause are unsupported. -SELECT * FROM articles, position('om' in 'Thomas'); +-- queries which involve functions in FROM clause are recursively planned +SELECT * FROM articles, position('om' in 'Thomas') ORDER BY 2 DESC, 1 DESC, 3 DESC LIMIT 5; -- subqueries are not supported in WHERE clause in Citus SELECT * FROM articles WHERE author_id IN (SELECT id FROM authors WHERE name LIKE '%a'); diff --git a/src/test/regress/sql/multi_single_relation_subquery.sql b/src/test/regress/sql/multi_single_relation_subquery.sql index 3c034a7a8..48d52b6f1 100644 --- a/src/test/regress/sql/multi_single_relation_subquery.sql +++ b/src/test/regress/sql/multi_single_relation_subquery.sql @@ -74,7 +74,7 @@ from group by suppkey_bin order by - avg_count desc + avg_count desc, suppkey_bin DESC limit 20; select @@ -122,8 +122,7 @@ from (l_orderkey/4)::int, l_suppkey ) as distributed_table; --- Check that we don't support subqueries with limit. - +-- we don't support subqueries with limit. select l_suppkey, sum(suppkey_count) as total_suppkey_count @@ -139,19 +138,23 @@ from l_suppkey limit 100) as distributed_table group by - l_suppkey; + l_suppkey + ORDER BY 2 DESC, 1 DESC +LIMIT 5; -- Check that we don't support subqueries without aggregates. select - rounded_tax + DISTINCT rounded_tax from (select round(l_tax) as rounded_tax from lineitem group by - l_tax) as distributed_table; + l_tax) as distributed_table + ORDER BY 1 DESC + LIMIT 5; -- Check that we support subqueries with count(distinct). diff --git a/src/test/regress/sql/multi_subquery_complex_reference_clause.sql b/src/test/regress/sql/multi_subquery_complex_reference_clause.sql index c7949d742..842d81415 100644 --- a/src/test/regress/sql/multi_subquery_complex_reference_clause.sql +++ b/src/test/regress/sql/multi_subquery_complex_reference_clause.sql @@ -1172,7 +1172,7 @@ FROM ORDER BY 1 LIMIT 5; --- should error out since there is a distributed table and +-- should recursively plan since -- there are no columns on the GROUP BY from the distributed table SELECT DISTINCT user_id @@ -1206,7 +1206,7 @@ ORDER BY 1 LIMIT 5; --- should not work since we're +-- should recursively plan since we're -- using an immutable function as recurring tuple -- along with a distributed table, where GROUP BY is -- on the recurring tuple diff --git a/src/test/regress/sql/subquery_complex_target_list.sql b/src/test/regress/sql/subquery_complex_target_list.sql new file mode 100644 index 000000000..431f57b00 --- /dev/null +++ b/src/test/regress/sql/subquery_complex_target_list.sql @@ -0,0 +1,293 @@ +-- =================================================================== +-- test recursive planning functionality with complex target entries +-- and some utilities +-- =================================================================== +CREATE SCHEMA subquery_complex; +SET search_path TO subquery_complex, public; + +SET client_min_messages TO DEBUG1; + +-- COUNT DISTINCT at the top level query +SELECT + event_type, count(distinct value_2) +FROM + events_table +WHERE + user_id IN (SELECT user_id FROM users_table GROUP BY user_id ORDER BY count(*) DESC LIMIT 20) +GROUP BY + event_type +ORDER BY 1 DESC, 2 DESC +LIMIT 3; + + +-- aggregate distinct in the subqueries + -- avg distinct on partition key + -- count distinct on partition key + -- count distinct on non-partition key + -- sum distinct on non-partition key when group by is partition key + -- and the final query is real-time query +SELECT + DISTINCT ON (avg) avg, cnt_1, cnt_2, sum +FROM + ( + SELECT avg(distinct user_id) as avg FROM users_table ORDER BY 1 DESC LIMIT 3 + ) as foo, + ( + SELECT count(distinct user_id) as cnt_1 FROM users_table ORDER BY 1 DESC LIMIT 3 + ) as bar, + ( + SELECT count(distinct value_2) as cnt_2 FROM users_table ORDER BY 1 DESC LIMIT 4 + ) as baz, + ( + SELECT user_id, sum(distinct value_2) as sum FROM users_table GROUP BY user_id ORDER BY 1 DESC LIMIT 4 + ) as bat, events_table + WHERE foo.avg != bar.cnt_1 AND baz.cnt_2 = events_table.event_type + ORDER BY 1 DESC; + +-- Aggregate type conversions inside the subqueries +SELECT + * +FROM + ( + SELECT + min(user_id) * 2, max(user_id) / 2, sum(user_id), count(user_id)::float, avg(user_id)::bigint + FROM + users_table + ORDER BY 1 DESC + LIMIT 3 + ) as foo, + ( + SELECT + min(value_3) * 2, max(value_3) / 2, sum(value_3), count(value_3), avg(value_3) + FROM + users_table + ORDER BY 1 DESC + LIMIT 3 + ) as bar, + ( + SELECT + min(time), max(time), count(time), + count(*) FILTER (WHERE user_id = 3) as cnt_with_filter, + count(*) FILTER (WHERE user_id::text LIKE '%3%') as cnt_with_filter_2 + FROM + users_table + ORDER BY 1 DESC + LIMIT 3 + ) as baz + ORDER BY 1 DESC; + +-- Expressions inside the aggregates +-- parts of the query is inspired by TPCH queries +SELECT + DISTINCT ON (avg) avg, cnt_1, cnt_2, cnt_3, sum_1,l_year, pos, count_pay +FROM + ( + SELECT avg(user_id * (5.0 / (value_1 + 0.1))) as avg FROM users_table ORDER BY 1 DESC LIMIT 3 + ) as foo, + ( + SELECT sum(user_id * (5.0 / (value_1 + value_2 + 0.1)) * value_3) as cnt_1 FROM users_table ORDER BY 1 DESC LIMIT 3 + ) as bar, + ( + SELECT + avg(case + when user_id > 4 + then value_1 + end) as cnt_2, + avg(case + when user_id > 500 + then value_1 + end) as cnt_3, + sum(case + when value_1 = 1 + OR value_2 = 1 + then 1 + else 0 + end) as sum_1, + extract(year FROM max(time)) as l_year, + strpos(max(user_id)::text, '1') as pos + FROM + users_table + ORDER BY + 1 DESC + LIMIT 4 + ) as baz, + ( + SELECT COALESCE(value_3, 20) AS count_pay FROM users_table ORDER BY 1 OFFSET 20 LIMIT 5 + ) as tar, + events_table + WHERE foo.avg != bar.cnt_1 AND baz.cnt_2 != events_table.event_type + ORDER BY 1 DESC; + +-- Multiple columns in GROUP BYs +-- foo needs to be recursively planned, bar can be pushded down +SELECT + DISTINCT ON (avg) avg, avg2 +FROM + ( + SELECT avg(value_3) as avg FROM users_table GROUP BY value_1, value_2 + ) as foo, + ( + SELECT avg(value_3) as avg2 FROM users_table GROUP BY value_1, value_2, user_id + ) as bar + WHERE foo.avg = bar.avg2 + ORDER BY 1 DESC, 2 DESC + LIMIT 3; + +-- HAVING and ORDER BY tests +SELECT a.user_id, b.value_2, c.avg +FROM ( + SELECT + user_id + FROM + users_table + WHERE + (value_1 > 2) + GROUP BY + user_id + HAVING + count(distinct value_1) > 2 + ORDER BY 1 DESC + LIMIT 3 + ) as a, + ( + SELECT + value_2 + FROM + users_table + WHERE + (value_1 > 2) + GROUP BY + value_2 + HAVING + count(distinct value_1) > 2 + ORDER BY 1 DESC + LIMIT 3 + ) as b, + ( + SELECT + avg(user_id) as avg + FROM + users_table + WHERE + (value_1 > 2) + GROUP BY + value_2 + HAVING + sum(value_1) > 10 + ORDER BY (sum(value_3) - avg(value_1) - COALESCE(array_upper(ARRAY[max(user_id)],1) * 5,0)) DESC + LIMIT 3 + ) as c + + WHERE b.value_2 != a.user_id + ORDER BY 3 DESC, 2 DESC, 1 DESC + LIMIT 5; + +-- zero shard subquery joined with a regular one +SELECT + bar.user_id +FROM + (SELECT + DISTINCT users_table.user_id + FROM + users_table, events_table + WHERE + users_table.user_id = events_table.user_id AND + event_type IN (1,2,3,4) + ORDER BY 1 DESC LIMIT 5 + ) as foo, + (SELECT + DISTINCT users_table.user_id + FROM + users_table, events_table + WHERE + users_table.user_id = events_table.user_id AND false AND + event_type IN (1,2,3,4) + ORDER BY 1 DESC LIMIT 5 + ) as bar + WHERE foo.user_id > bar.user_id + ORDER BY 1 DESC; + +-- window functions tests, both is recursively planned +SELECT * FROM +( + SELECT + user_id, time, rnk + FROM + ( + SELECT * FROM ( + SELECT + *, rank() OVER my_win as rnk + FROM + events_table + WINDOW my_win AS (PARTITION BY user_id ORDER BY time DESC) + ORDER BY rnk DESC + ) as foo_inner + + LIMIT 4 + ) as foo + ORDER BY + 3 DESC, 1 DESC, 2 DESC +) foo, +( + SELECT + user_id, time, rnk + FROM + ( + SELECT + *, rank() OVER my_win as rnk + FROM + events_table + WHERE + user_id = 3 + WINDOW my_win AS (PARTITION BY event_type ORDER BY time DESC) + + ) as foo + ORDER BY + 3 DESC, 1 DESC, 2 DESC +) bar WHERE foo.user_id = bar.user_id +ORDER BY foo.rnk DESC, foo.time DESC, bar.time LIMIT 5; + +-- cursor test +BEGIN; + + DECLARE recursive_subquery CURSOR FOR + SELECT + event_type, count(distinct value_2) + FROM + events_table + WHERE + user_id IN (SELECT user_id FROM users_table GROUP BY user_id ORDER BY count(*) DESC LIMIT 20) + GROUP BY + event_type + ORDER BY 1 DESC, 2 DESC + LIMIT 3; + + FETCH 1 FROM recursive_subquery; + FETCH 1 FROM recursive_subquery; + FETCH 1 FROM recursive_subquery; + FETCH 1 FROM recursive_subquery; +COMMIT; + +-- cursor test with FETCH ALL +BEGIN; + + DECLARE recursive_subquery CURSOR FOR + SELECT + event_type, count(distinct value_2) + FROM + events_table + WHERE + user_id IN (SELECT user_id FROM users_table GROUP BY user_id ORDER BY count(*) DESC LIMIT 20) + GROUP BY + event_type + ORDER BY 1 DESC, 2 DESC + LIMIT 3; + + FETCH ALL FROM recursive_subquery; + FETCH ALL FROM recursive_subquery; +COMMIT; + +SET client_min_messages TO DEFAULT; + +DROP SCHEMA subquery_complex CASCADE; +SET search_path TO public;