Leaf level recursive planning for non colocated subqueries

With this commit, we enable recursive planning for the subqueries
that are not joined on the distribution keys.
pull/2016/head
Onder Kalaci 2018-02-13 16:22:35 +02:00
parent e998703ff8
commit 4d70c86645
8 changed files with 279 additions and 49 deletions

View File

@ -86,6 +86,7 @@ typedef struct RecursivePlanningContext
{
int level;
uint64 planId;
bool queryContainsDistributionKeyEquality; /* used for some optimizations */
List *subPlanList;
PlannerRestrictionContext *plannerRestrictionContext;
} RecursivePlanningContext;
@ -121,7 +122,11 @@ static bool RecursivelyPlanAllSubqueries(Node *node,
static DeferredErrorMessage * RecursivelyPlanCTEs(Query *query,
RecursivePlanningContext *context);
static bool RecursivelyPlanSubqueryWalker(Node *node, RecursivePlanningContext *context);
static bool ShouldRecursivelyPlanSubquery(Query *subquery);
static bool ShouldRecursivelyPlanSubquery(Query *subquery,
RecursivePlanningContext *context);
static bool SubqueryContainsDistributionKeyEquality(Query *subquery,
PlannerRestrictionContext *
restrictionContext);
static bool ShouldRecursivelyPlanSetOperation(Query *query,
RecursivePlanningContext *context);
static void RecursivelyPlanSetOperations(Query *query, Node *node,
@ -162,6 +167,21 @@ GenerateSubplansForSubqueriesAndCTEs(uint64 planId, Query *originalQuery,
context.subPlanList = NIL;
context.plannerRestrictionContext = plannerRestrictionContext;
/*
* Calculating the distribution key equality upfront is a trade-off for us.
*
* When the originalQuery contains the distribution key equality, we'd be
* able to skip further checks for each lower level subqueries (i.e., if the
* all query contains distribution key equality, each subquery also contains
* distribution key equality.)
*
* When the originalQuery doesn't contain the distribution key equality,
* calculating this wouldn't help us at all, we should individually check
* each each subquery and subquery joins among subqueries.
*/
context.queryContainsDistributionKeyEquality =
QueryContainsDistributionKeyEquality(plannerRestrictionContext, originalQuery);
error = RecursivelyPlanSubqueriesAndCTEs(originalQuery, &context);
if (error != NULL)
{
@ -497,7 +517,7 @@ RecursivelyPlanSubqueryWalker(Node *node, RecursivePlanningContext *context)
* Recursively plan this subquery if it cannot be pushed down and is
* eligible for recursive planning.
*/
if (ShouldRecursivelyPlanSubquery(query))
if (ShouldRecursivelyPlanSubquery(query, context))
{
RecursivelyPlanSubquery(query, context);
}
@ -517,7 +537,7 @@ RecursivelyPlanSubqueryWalker(Node *node, RecursivePlanningContext *context)
* For the details, see the cases in the function.
*/
static bool
ShouldRecursivelyPlanSubquery(Query *subquery)
ShouldRecursivelyPlanSubquery(Query *subquery, RecursivePlanningContext *context)
{
if (FindNodeCheckInRangeTableList(subquery->rtable, IsLocalTableRTE))
{
@ -533,6 +553,22 @@ ShouldRecursivelyPlanSubquery(Query *subquery)
}
else if (DeferErrorIfCannotPushdownSubquery(subquery, false) == NULL)
{
/*
* We should do one more check for the distribution key equality.
*
* If the input query to the planner doesn't contain distribution key equality,
* we should further check whether this individual subquery contains or not.
*
* If all the relations are not joined on their distribution keys for the
* subquery, we cannot pushdown the it, thus, recursively plan it.
*/
if (!context->queryContainsDistributionKeyEquality &&
!SubqueryContainsDistributionKeyEquality(subquery,
context->plannerRestrictionContext))
{
return true;
}
/*
* Citus can pushdown this subquery, no need to recursively
* plan which is much expensive than pushdown.
@ -553,6 +589,39 @@ ShouldRecursivelyPlanSubquery(Query *subquery)
}
/*
* SubqueryContainsDistributionKeyEquality is a wrapper function
* for QueryContainsDistributionKeyEquality(). Here, we filter the
* planner restrictions for the given subquery and do the restriction
* equality checks on the filtered restriction.
*/
static bool
SubqueryContainsDistributionKeyEquality(Query *subquery,
PlannerRestrictionContext *restrictionContext)
{
bool queryContainsDistributionKeyEquality = false;
PlannerRestrictionContext *filteredRestrictionContext = NULL;
/* we don't support distribution eq. checks for CTEs yet */
if (subquery->cteList != NIL)
{
return false;
}
filteredRestrictionContext =
FilterPlannerRestrictionForQuery(restrictionContext, subquery);
queryContainsDistributionKeyEquality =
QueryContainsDistributionKeyEquality(filteredRestrictionContext, subquery);
if (!queryContainsDistributionKeyEquality)
{
return false;
}
return true;
}
/*
* ShouldRecursivelyPlanSetOperation determines whether the leaf queries of a
* set operations tree need to be recursively planned in order to support the

View File

@ -63,6 +63,7 @@ typedef struct AttributeEquivalenceClassMember
} AttributeEquivalenceClassMember;
static bool ContextContainsLocalRelation(RelationRestrictionContext *restrictionContext);
static Var * FindTranslatedVar(List *appendRelList, Oid relationOid,
Index relationRteIndex, Index *partitionKeyIndex);
static bool EquivalenceListContainsRelationsEquality(List *attributeEquivalenceList,
@ -154,9 +155,24 @@ bool
QueryContainsDistributionKeyEquality(PlannerRestrictionContext *plannerRestrictionContext,
Query *originalQuery)
{
bool restrictionEquivalenceForPartitionKeys =
RestrictionEquivalenceForPartitionKeys(plannerRestrictionContext);
bool restrictionEquivalenceForPartitionKeys = false;
RelationRestrictionContext *restrictionContext = NULL;
/* we don't support distribution key equality checks for CTEs yet */
if (originalQuery->cteList != NIL)
{
return false;
}
/* we don't support distribution key equality checks for local tables */
restrictionContext = plannerRestrictionContext->relationRestrictionContext;
if (ContextContainsLocalRelation(restrictionContext))
{
return false;
}
restrictionEquivalenceForPartitionKeys =
RestrictionEquivalenceForPartitionKeys(plannerRestrictionContext);
if (restrictionEquivalenceForPartitionKeys)
{
return true;
@ -171,6 +187,29 @@ QueryContainsDistributionKeyEquality(PlannerRestrictionContext *plannerRestricti
}
/*
* ContextContainsLocalRelation determines whether the given
* RelationRestrictionContext contains any local tables.
*/
static bool
ContextContainsLocalRelation(RelationRestrictionContext *restrictionContext)
{
ListCell *relationRestrictionCell = NULL;
foreach(relationRestrictionCell, restrictionContext->relationRestrictionList)
{
RelationRestriction *relationRestriction = lfirst(relationRestrictionCell);
if (!relationRestriction->distributedRelation)
{
return true;
}
}
return false;
}
/*
* SafeToPushdownUnionSubquery returns true if all the relations are returns
* partition keys in the same ordinal position and there is no reference table
@ -215,7 +254,6 @@ SafeToPushdownUnionSubquery(PlannerRestrictionContext *plannerRestrictionContext
foreach(relationRestrictionCell, restrictionContext->relationRestrictionList)
{
RelationRestriction *relationRestriction = lfirst(relationRestrictionCell);
Oid relationId = relationRestriction->relationId;
Index partitionKeyIndex = InvalidAttrNumber;
PlannerInfo *relationPlannerRoot = relationRestriction->plannerInfo;
List *targetList = relationPlannerRoot->parse->targetList;
@ -223,20 +261,6 @@ SafeToPushdownUnionSubquery(PlannerRestrictionContext *plannerRestrictionContext
Var *varToBeAdded = NULL;
TargetEntry *targetEntryToAdd = NULL;
/*
* Although it is not the best place to error out when facing with reference
* tables, we decide to error out here. Otherwise, we need to add equality
* for each reference table and it is more complex to implement. In the
* future implementation all checks will be gathered to single point.
*/
if (PartitionMethod(relationId) == DISTRIBUTE_BY_NONE)
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot pushdown this query"),
errdetail(
"Reference tables are not allowed with set operations")));
}
/*
* We first check whether UNION ALLs are pulled up or not. Note that Postgres
* planner creates AppendRelInfos per each UNION ALL query that is pulled up.

View File

@ -27,6 +27,7 @@ SET
shardmaxvalue = '14947'
WHERE
shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'orders_subquery'::regclass ORDER BY shardid DESC LIMIT 1);
SET client_min_messages TO DEBUG1;
-- If group by is not on partition column then we recursively plan
SELECT
avg(order_count)
@ -38,12 +39,14 @@ FROM
lineitem_subquery
GROUP BY
l_suppkey) AS order_counts;
DEBUG: generating subplan 2_1 for subquery SELECT l_suppkey, count(*) AS order_count FROM public.lineitem_subquery GROUP BY l_suppkey
DEBUG: Plan 2 query after replacing subqueries and CTEs: SELECT avg(order_count) AS avg FROM (SELECT intermediate_result.l_suppkey, intermediate_result.order_count FROM read_intermediate_result('2_1'::text, 'binary'::citus_copy_format) intermediate_result(l_suppkey integer, order_count bigint)) order_counts
avg
--------------------
1.7199369356456930
(1 row)
-- Check that we error out if join is not on partition columns.
-- Check that we recursively plan if join is not on partition columns.
SELECT
avg(unit_price)
FROM
@ -55,7 +58,35 @@ FROM
orders_subquery
GROUP BY
l_orderkey) AS unit_prices;
ERROR: complex joins are only supported when all distributed tables are joined on their distribution columns with equal operator
ERROR: cannot perform distributed planning on this query
DETAIL: Cartesian products are currently unsupported
-- this query is only required to execute
-- the following query given that recursive planning
-- (in general real-time queries in transactions)
-- do not execute shard fetch tasks and the next
-- query relies on that
SELECT
l_orderkey,
avg(o_totalprice / l_quantity) AS unit_price
FROM
lineitem_subquery,
orders_subquery
WHERE
l_orderkey = o_custkey
GROUP BY
l_orderkey
ORDER BY 2 DESC, 1 DESC
LIMIT 5;
DEBUG: push down of limit count: 5
l_orderkey | unit_price
------------+------------------------
1124 | 56102.2804738959822181
230 | 54613.8568599033816703
935 | 51688.6111227238944448
451 | 51673.9297867063492063
646 | 50919.3957476807927619
(5 rows)
SELECT
avg(unit_price)
FROM
@ -69,7 +100,14 @@ FROM
l_orderkey = o_custkey
GROUP BY
l_orderkey) AS unit_prices;
ERROR: complex joins are only supported when all distributed tables are joined on their distribution columns with equal operator
DEBUG: generating subplan 7_1 for subquery SELECT lineitem_subquery.l_orderkey, avg((orders_subquery.o_totalprice / lineitem_subquery.l_quantity)) AS unit_price FROM public.lineitem_subquery, public.orders_subquery WHERE (lineitem_subquery.l_orderkey = orders_subquery.o_custkey) GROUP BY lineitem_subquery.l_orderkey
DEBUG: Plan 7 query after replacing subqueries and CTEs: SELECT avg(unit_price) AS avg FROM (SELECT intermediate_result.l_orderkey, intermediate_result.unit_price FROM read_intermediate_result('7_1'::text, 'binary'::citus_copy_format) intermediate_result(l_orderkey bigint, unit_price numeric)) unit_prices
avg
------------------------
12973.7343244916919367
(1 row)
RESET client_min_messages;
-- Subqueries without relation with a volatile functions (non-constant) are planned recursively
SELECT count(*) FROM (
SELECT l_orderkey FROM lineitem_subquery JOIN (SELECT random()::int r) sub ON (l_orderkey = r) WHERE r > 10
@ -86,11 +124,11 @@ SELECT count(*) FROM
(SELECT l_orderkey FROM lineitem_subquery) UNION ALL
(SELECT 1::bigint)
) b;
DEBUG: generating subplan 7_1 for subquery SELECT l_orderkey FROM public.lineitem_subquery
DEBUG: generating subplan 10_1 for subquery SELECT l_orderkey FROM public.lineitem_subquery
DEBUG: Creating router plan
DEBUG: Plan is router executable
DEBUG: generating subplan 7_2 for subquery SELECT intermediate_result.l_orderkey FROM read_intermediate_result('7_1'::text, 'binary'::citus_copy_format) intermediate_result(l_orderkey bigint) UNION ALL SELECT (1)::bigint AS int8
DEBUG: Plan 7 query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (SELECT intermediate_result.l_orderkey FROM read_intermediate_result('7_2'::text, 'binary'::citus_copy_format) intermediate_result(l_orderkey bigint)) b
DEBUG: generating subplan 10_2 for subquery SELECT intermediate_result.l_orderkey FROM read_intermediate_result('10_1'::text, 'binary'::citus_copy_format) intermediate_result(l_orderkey bigint) UNION ALL SELECT (1)::bigint AS int8
DEBUG: Plan 10 query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (SELECT intermediate_result.l_orderkey FROM read_intermediate_result('10_2'::text, 'binary'::citus_copy_format) intermediate_result(l_orderkey bigint)) b
DEBUG: Creating router plan
DEBUG: Plan is router executable
count
@ -104,12 +142,12 @@ SELECT count(*) FROM
(SELECT l_orderkey FROM lineitem_subquery) UNION
(SELECT l_partkey FROM lineitem_subquery)
) b;
DEBUG: generating subplan 10_1 for subquery SELECT l_orderkey FROM public.lineitem_subquery
DEBUG: generating subplan 10_2 for subquery SELECT l_partkey FROM public.lineitem_subquery
DEBUG: generating subplan 13_1 for subquery SELECT l_orderkey FROM public.lineitem_subquery
DEBUG: generating subplan 13_2 for subquery SELECT l_partkey FROM public.lineitem_subquery
DEBUG: Creating router plan
DEBUG: Plan is router executable
DEBUG: generating subplan 10_3 for subquery SELECT intermediate_result.l_orderkey FROM read_intermediate_result('10_1'::text, 'binary'::citus_copy_format) intermediate_result(l_orderkey bigint) UNION SELECT intermediate_result.l_partkey FROM read_intermediate_result('10_2'::text, 'binary'::citus_copy_format) intermediate_result(l_partkey integer)
DEBUG: Plan 10 query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (SELECT intermediate_result.l_orderkey FROM read_intermediate_result('10_3'::text, 'binary'::citus_copy_format) intermediate_result(l_orderkey bigint)) b
DEBUG: generating subplan 13_3 for subquery SELECT intermediate_result.l_orderkey FROM read_intermediate_result('13_1'::text, 'binary'::citus_copy_format) intermediate_result(l_orderkey bigint) UNION SELECT intermediate_result.l_partkey FROM read_intermediate_result('13_2'::text, 'binary'::citus_copy_format) intermediate_result(l_partkey integer)
DEBUG: Plan 13 query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (SELECT intermediate_result.l_orderkey FROM read_intermediate_result('13_3'::text, 'binary'::citus_copy_format) intermediate_result(l_orderkey bigint)) b
DEBUG: Creating router plan
DEBUG: Plan is router executable
count

View File

@ -352,7 +352,9 @@ ORDER BY
3 | 75
(3 rows)
-- not supported since events_subquery_5 is not joined on partition key
SET citus.enable_repartition_joins to ON;
SET client_min_messages TO DEBUG1;
-- recursively planned since events_subquery_5 is not joined on partition key
SELECT ("final_query"."event_types") as types, count(*) AS sumOfEventType
FROM
( SELECT
@ -423,7 +425,23 @@ GROUP BY
types
ORDER BY
types;
ERROR: complex joins are only supported when all distributed tables are joined on their distribution columns with equal operator
DEBUG: cannot use real time executor with repartition jobs
HINT: Since you enabled citus.enable_repartition_joins Citus chose to use task-tracker.
DEBUG: generating subplan 16_1 for subquery SELECT max(events."time") AS max, 0 AS event, events.user_id FROM public.events_table events, public.users_table users WHERE ((events.user_id = users.value_2) AND (events.event_type = ANY (ARRAY[1, 2]))) GROUP BY events.user_id
DEBUG: generating subplan 16_2 for subquery SELECT "time", event, user_id FROM (SELECT events."time", 0 AS event, events.user_id FROM public.events_table events WHERE (events.event_type = ANY (ARRAY[1, 2]))) events_subquery_1
DEBUG: generating subplan 16_3 for subquery SELECT "time", event, user_id FROM (SELECT events."time", 2 AS event, events.user_id FROM public.events_table events WHERE (events.event_type = ANY (ARRAY[3, 4]))) events_subquery_3
DEBUG: generating subplan 16_4 for subquery SELECT "time", event, user_id FROM (SELECT events."time", 3 AS event, events.user_id FROM public.events_table events WHERE (events.event_type = ANY (ARRAY[5, 6]))) events_subquery_4
DEBUG: generating subplan 16_5 for subquery SELECT intermediate_result."time", intermediate_result.event, intermediate_result.user_id FROM read_intermediate_result('16_2'::text, 'binary'::citus_copy_format) intermediate_result("time" timestamp without time zone, event integer, user_id integer) UNION SELECT events_subquery_2.max, events_subquery_2.event, events_subquery_2.user_id FROM (SELECT events_subquery_5.max, events_subquery_5.event, events_subquery_5.user_id FROM (SELECT intermediate_result.max, intermediate_result.event, intermediate_result.user_id FROM read_intermediate_result('16_1'::text, 'binary'::citus_copy_format) intermediate_result(max timestamp without time zone, event integer, user_id integer)) events_subquery_5) events_subquery_2 UNION SELECT intermediate_result."time", intermediate_result.event, intermediate_result.user_id FROM read_intermediate_result('16_3'::text, 'binary'::citus_copy_format) intermediate_result("time" timestamp without time zone, event integer, user_id integer) UNION SELECT intermediate_result."time", intermediate_result.event, intermediate_result.user_id FROM read_intermediate_result('16_4'::text, 'binary'::citus_copy_format) intermediate_result("time" timestamp without time zone, event integer, user_id integer)
DEBUG: Plan 16 query after replacing subqueries and CTEs: SELECT event_types AS types, count(*) AS sumofeventtype FROM (SELECT q.user_id, q."time", q.event_types, t.user_id, random() AS random FROM ((SELECT t_1.user_id, t_1."time", unnest(t_1.collected_events) AS event_types FROM (SELECT t1.user_id, min(t1."time") AS "time", array_agg(t1.event ORDER BY t1."time", t1.event DESC) AS collected_events FROM (SELECT intermediate_result."time", intermediate_result.event, intermediate_result.user_id FROM read_intermediate_result('16_5'::text, 'binary'::citus_copy_format) intermediate_result("time" timestamp without time zone, event integer, user_id integer)) t1 GROUP BY t1.user_id) t_1) q JOIN (SELECT users.user_id FROM public.users_table users WHERE ((users.value_1 > 0) AND (users.value_1 < 4))) t ON ((t.user_id = q.user_id)))) final_query(user_id, "time", event_types, user_id_1, random) GROUP BY event_types ORDER BY event_types
types | sumofeventtype
-------+----------------
0 | 449
2 | 433
3 | 75
(3 rows)
RESET client_min_messages;
SET citus.enable_repartition_joins to OFF;
-- not supported since the join is not equi join
SELECT ("final_query"."event_types") as types, count(*) AS sumOfEventType
FROM
@ -1461,7 +1479,9 @@ ORDER BY
user_id DESC
LIMIT 10;
ERROR: complex joins are only supported when all distributed tables are joined on their distribution columns with equal operator
-- not supported since the inner JOIN is not on the partition key
SET citus.enable_repartition_joins to ON;
SET client_min_messages TO DEBUG1;
-- recursively planner since the inner JOIN is not on the partition key
SELECT user_id, lastseen
FROM
(SELECT
@ -1514,7 +1534,13 @@ FROM
ORDER BY
user_id DESC
LIMIT 10;
ERROR: complex joins are only supported when all distributed tables are joined on their distribution columns with equal operator
DEBUG: cannot use real time executor with repartition jobs
HINT: Since you enabled citus.enable_repartition_joins Citus chose to use task-tracker.
DEBUG: generating subplan 55_1 for subquery SELECT user_where_1_1.user_id FROM ((SELECT users.user_id FROM public.users_table users WHERE ((users.user_id > 1) AND (users.user_id < 4) AND (users.value_1 > 2))) user_where_1_1 JOIN (SELECT users.user_id, users.value_1 FROM public.users_table users WHERE ((users.user_id > 1) AND (users.user_id < 4) AND (users.value_2 > 3))) user_where_1_join_1 ON ((user_where_1_1.user_id = user_where_1_join_1.value_1)))
ERROR: cannot push down this subquery
DETAIL: Limit in subquery is currently unsupported when a subquery references a column from another query
SET citus.enable_repartition_joins to OFF;
RESET client_min_messages;
-- not supported since upper LATERAL JOIN is not equi join
SELECT user_id, lastseen
FROM
@ -1676,7 +1702,9 @@ ORDER BY
1 | 0
(4 rows)
-- not supported since the first inner join is not on the partition key
SET citus.enable_repartition_joins to ON;
SET client_min_messages TO DEBUG1;
-- recursively planned since the first inner join is not on the partition key
SELECT
count(*) AS value, "generated_group_field"
FROM
@ -1718,8 +1746,20 @@ GROUP BY
"generated_group_field"
ORDER BY
generated_group_field DESC, value DESC;
ERROR: complex joins are only supported when all distributed tables are joined on their distribution columns with equal operator
-- not supported since the first inner join is not an equi join
DEBUG: cannot use real time executor with repartition jobs
HINT: Since you enabled citus.enable_repartition_joins Citus chose to use task-tracker.
DEBUG: generating subplan 63_1 for subquery SELECT user_where_1_1.real_user_id FROM ((SELECT users.user_id AS real_user_id FROM public.users_table users WHERE ((users.user_id > 1) AND (users.user_id < 4) AND (users.value_2 > 3))) user_where_1_1 JOIN (SELECT users.user_id, users.value_2 FROM public.users_table users WHERE ((users.user_id > 1) AND (users.user_id < 4) AND (users.value_3 > (3)::double precision))) user_where_1_join_1 ON ((user_where_1_1.real_user_id = user_where_1_join_1.value_2)))
DEBUG: generating subplan 63_2 for subquery SELECT DISTINCT real_user_id, generated_group_field FROM (SELECT "eventQuery".real_user_id, "eventQuery"."time", random() AS random, "eventQuery".value_2 AS generated_group_field FROM (SELECT temp_data_queries."time", temp_data_queries.user_id, temp_data_queries.value_2, user_filters_1.real_user_id FROM ((SELECT events."time", events.user_id, events.value_2 FROM public.events_table events WHERE ((events.user_id > 1) AND (events.user_id < 4) AND (events.event_type = ANY (ARRAY[4, 5])))) temp_data_queries JOIN (SELECT intermediate_result.real_user_id FROM read_intermediate_result('63_1'::text, 'binary'::citus_copy_format) intermediate_result(real_user_id integer)) user_filters_1 ON ((temp_data_queries.user_id = user_filters_1.real_user_id)))) "eventQuery") "pushedDownQuery"
DEBUG: Plan 63 query after replacing subqueries and CTEs: SELECT count(*) AS value, generated_group_field FROM (SELECT intermediate_result.real_user_id, intermediate_result.generated_group_field FROM read_intermediate_result('63_2'::text, 'binary'::citus_copy_format) intermediate_result(real_user_id integer, generated_group_field integer)) "pushedDownQuery" GROUP BY generated_group_field ORDER BY generated_group_field DESC, (count(*)) DESC
value | generated_group_field
-------+-----------------------
1 | 5
2 | 2
2 | 1
1 | 0
(4 rows)
-- recursive planning didn't kick-in since the non-equi join is among subqueries
SELECT
count(*) AS value, "generated_group_field"
FROM
@ -1762,6 +1802,9 @@ GROUP BY
ORDER BY
generated_group_field DESC, value DESC;
ERROR: complex joins are only supported when all distributed tables are joined on their distribution columns with equal operator
SET citus.enable_repartition_joins to OFF;
RESET client_min_messages;
-- single level inner joins
SELECT
"value_3", count(*) AS cnt
@ -1809,7 +1852,10 @@ ORDER BY cnt, value_3 DESC LIMIT 10;
2 | 35
(6 rows)
-- not supported since there is no partition column equality at all
SET citus.enable_repartition_joins to ON;
SET client_min_messages TO DEBUG1;
-- not supported since there is no column equality at all
-- but still recursive planning is tried
SELECT
"value_3", count(*) AS cnt
FROM
@ -1846,7 +1892,10 @@ FROM
) segmentalias_1) "tempQuery"
GROUP BY "value_3"
ORDER BY cnt, value_3 DESC LIMIT 10;
ERROR: complex joins are only supported when all distributed tables are joined on their distribution columns with equal operator
ERROR: cannot perform distributed planning on this query
DETAIL: Cartesian products are currently unsupported
SET citus.enable_repartition_joins to OFF;
RESET client_min_messages;
-- nested LATERAL JOINs
SET citus.subquery_pushdown to ON;
SELECT *

View File

@ -845,7 +845,7 @@ EXPLAIN (COSTS FALSE) SELECT et.* FROM recent_10_users JOIN events_table et USIN
-> Sort
Sort Key: remote_scan."time" DESC
-> Custom Scan (Citus Real-Time)
-> Distributed Subplan 91_1
-> Distributed Subplan 93_1
-> Limit
-> Sort
Sort Key: max((max(remote_scan.lastseen))) DESC

View File

@ -776,7 +776,6 @@ DEBUG: Plan is router executable
-- repartition is recursively planned with the set operation
(SELECT x FROM test) INTERSECT (SELECT t1.x FROM test as t1, test as t2 WHERE t1.x = t2.y) ORDER BY 1 DESC;
DEBUG: generating subplan 164_1 for subquery SELECT x FROM recursive_union.test
DEBUG: join prunable for task partitionId 0 and 1
DEBUG: join prunable for task partitionId 0 and 2
DEBUG: join prunable for task partitionId 0 and 3
@ -807,8 +806,9 @@ DEBUG: pruning merge fetch taskId 11
DETAIL: Creating dependency on merge taskId 24
DEBUG: cannot use real time executor with repartition jobs
HINT: Since you enabled citus.enable_repartition_joins Citus chose to use task-tracker.
DEBUG: generating subplan 164_2 for subquery SELECT t1.x FROM recursive_union.test t1, recursive_union.test t2 WHERE (t1.x = t2.y)
DEBUG: Plan 164 query after replacing subqueries and CTEs: SELECT intermediate_result.x FROM read_intermediate_result('164_1'::text, 'binary'::citus_copy_format) intermediate_result(x integer) INTERSECT SELECT intermediate_result.x FROM read_intermediate_result('164_2'::text, 'binary'::citus_copy_format) intermediate_result(x integer) ORDER BY 1 DESC
DEBUG: generating subplan 164_1 for subquery SELECT t1.x FROM recursive_union.test t1, recursive_union.test t2 WHERE (t1.x = t2.y)
DEBUG: generating subplan 164_2 for subquery SELECT x FROM recursive_union.test
DEBUG: Plan 164 query after replacing subqueries and CTEs: SELECT intermediate_result.x FROM read_intermediate_result('164_2'::text, 'binary'::citus_copy_format) intermediate_result(x integer) INTERSECT SELECT intermediate_result.x FROM read_intermediate_result('164_1'::text, 'binary'::citus_copy_format) intermediate_result(x integer) ORDER BY 1 DESC
DEBUG: Creating router plan
DEBUG: Plan is router executable
x

View File

@ -29,6 +29,7 @@ SET
WHERE
shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'orders_subquery'::regclass ORDER BY shardid DESC LIMIT 1);
SET client_min_messages TO DEBUG1;
-- If group by is not on partition column then we recursively plan
SELECT
avg(order_count)
@ -41,7 +42,7 @@ FROM
GROUP BY
l_suppkey) AS order_counts;
-- Check that we error out if join is not on partition columns.
-- Check that we recursively plan if join is not on partition columns.
SELECT
avg(unit_price)
FROM
@ -54,6 +55,24 @@ FROM
GROUP BY
l_orderkey) AS unit_prices;
-- this query is only required to execute
-- the following query given that recursive planning
-- (in general real-time queries in transactions)
-- do not execute shard fetch tasks and the next
-- query relies on that
SELECT
l_orderkey,
avg(o_totalprice / l_quantity) AS unit_price
FROM
lineitem_subquery,
orders_subquery
WHERE
l_orderkey = o_custkey
GROUP BY
l_orderkey
ORDER BY 2 DESC, 1 DESC
LIMIT 5;
SELECT
avg(unit_price)
FROM
@ -68,6 +87,8 @@ FROM
GROUP BY
l_orderkey) AS unit_prices;
RESET client_min_messages;
-- Subqueries without relation with a volatile functions (non-constant) are planned recursively
SELECT count(*) FROM (
SELECT l_orderkey FROM lineitem_subquery JOIN (SELECT random()::int r) sub ON (l_orderkey = r) WHERE r > 10

View File

@ -319,7 +319,10 @@ GROUP BY
ORDER BY
types;
-- not supported since events_subquery_5 is not joined on partition key
SET citus.enable_repartition_joins to ON;
SET client_min_messages TO DEBUG1;
-- recursively planned since events_subquery_5 is not joined on partition key
SELECT ("final_query"."event_types") as types, count(*) AS sumOfEventType
FROM
( SELECT
@ -391,6 +394,9 @@ GROUP BY
ORDER BY
types;
RESET client_min_messages;
SET citus.enable_repartition_joins to OFF;
-- not supported since the join is not equi join
SELECT ("final_query"."event_types") as types, count(*) AS sumOfEventType
FROM
@ -1357,7 +1363,11 @@ ORDER BY
user_id DESC
LIMIT 10;
-- not supported since the inner JOIN is not on the partition key
SET citus.enable_repartition_joins to ON;
SET client_min_messages TO DEBUG1;
-- recursively planner since the inner JOIN is not on the partition key
SELECT user_id, lastseen
FROM
(SELECT
@ -1412,6 +1422,9 @@ ORDER BY
LIMIT 10;
SET citus.enable_repartition_joins to OFF;
RESET client_min_messages;
-- not supported since upper LATERAL JOIN is not equi join
SELECT user_id, lastseen
FROM
@ -1565,7 +1578,11 @@ GROUP BY
ORDER BY
generated_group_field DESC, value DESC;
-- not supported since the first inner join is not on the partition key
SET citus.enable_repartition_joins to ON;
SET client_min_messages TO DEBUG1;
-- recursively planned since the first inner join is not on the partition key
SELECT
count(*) AS value, "generated_group_field"
FROM
@ -1608,7 +1625,7 @@ GROUP BY
ORDER BY
generated_group_field DESC, value DESC;
-- not supported since the first inner join is not an equi join
-- recursive planning didn't kick-in since the non-equi join is among subqueries
SELECT
count(*) AS value, "generated_group_field"
FROM
@ -1651,6 +1668,10 @@ GROUP BY
ORDER BY
generated_group_field DESC, value DESC;
SET citus.enable_repartition_joins to OFF;
RESET client_min_messages;
-- single level inner joins
SELECT
"value_3", count(*) AS cnt
@ -1690,7 +1711,12 @@ GROUP BY "value_3"
ORDER BY cnt, value_3 DESC LIMIT 10;
-- not supported since there is no partition column equality at all
SET citus.enable_repartition_joins to ON;
SET client_min_messages TO DEBUG1;
-- not supported since there is no column equality at all
-- but still recursive planning is tried
SELECT
"value_3", count(*) AS cnt
FROM
@ -1728,6 +1754,9 @@ FROM
GROUP BY "value_3"
ORDER BY cnt, value_3 DESC LIMIT 10;
SET citus.enable_repartition_joins to OFF;
RESET client_min_messages;
-- nested LATERAL JOINs
SET citus.subquery_pushdown to ON;
SELECT *