test_fixunion
Sait Talha Nisanci 2021-06-19 00:51:27 +03:00
parent c7d04e7f40
commit f45f247459
9 changed files with 198 additions and 46 deletions

View File

@ -373,8 +373,7 @@ AssignRTEIdentities(List *rangeTableList, int rteIdCounter)
* Note that we're only interested in RTE_RELATIONs and thus assigning
* identifiers to those RTEs only.
*/
if (rangeTableEntry->rtekind == RTE_RELATION &&
rangeTableEntry->values_lists == NIL)
if (rangeTableEntry->rtekind == RTE_RELATION && rangeTableEntry->values_lists == NIL)
{
AssignRTEIdentity(rangeTableEntry, rteIdCounter++);
}

View File

@ -582,7 +582,7 @@ DeferErrorIfUnsupportedSubqueryPushdown(Query *originalQuery,
*/
if (ContainsUnionSubquery(originalQuery))
{
if (!SafeToPushdownUnionSubquery(plannerRestrictionContext))
if (!SafeToPushdownUnionSubquery(originalQuery, plannerRestrictionContext))
{
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
"cannot pushdown the subquery since not all subqueries "

View File

@ -1013,7 +1013,7 @@ ShouldRecursivelyPlanSetOperation(Query *query, RecursivePlanningContext *contex
PlannerRestrictionContext *filteredRestrictionContext =
FilterPlannerRestrictionForQuery(context->plannerRestrictionContext, query);
if (!SafeToPushdownUnionSubquery(filteredRestrictionContext))
if (!SafeToPushdownUnionSubquery(query, filteredRestrictionContext))
{
/*
* The distribution column is not in the same place in all sides

View File

@ -59,6 +59,11 @@ typedef struct AttributeEquivalenceClass
Index unionQueryPartitionKeyIndex;
} AttributeEquivalenceClass;
/*
 * FindRteIdentityContext is the walker context of findRteIdentityWalker.
 *
 * rteIdentity is the RTE identity that is searched for. query is set by the
 * walker to the (sub)query whose range table contains an RTE_RELATION entry
 * with that identity; the walker never resets it, so callers are expected to
 * start with query == NULL.
 */
typedef struct FindRteIdentityContext
{
	uint32 rteIdentity;
	Query *query;
} FindRteIdentityContext;
/*
* AttributeEquivalenceClassMember - one member expression of an
* AttributeEquivalenceClass. The important thing to consider is that
@ -142,8 +147,8 @@ static void ListConcatUniqueAttributeClassMemberLists(AttributeEquivalenceClass
firstClass,
AttributeEquivalenceClass *
secondClass);
static Index RelationRestrictionPartitionKeyIndex(RelationRestriction *
relationRestriction);
static Var* RelationRestrictionPartitionKeyIndex(Query* query, RelationRestriction *
relationRestriction, Index* partitionKeyIndex);
static bool AllRelationsInRestrictionContextColocated(RelationRestrictionContext *
restrictionContext);
static bool IsNotSafeRestrictionToRecursivelyPlan(Node *node);
@ -155,6 +160,9 @@ static bool RangeTableArrayContainsAnyRTEIdentities(RangeTblEntry **rangeTableEn
queryRteIdentities);
static Relids QueryRteIdentities(Query *queryTree);
static bool
findRteIdentityWalker(Node *node, FindRteIdentityContext* context);
#if PG_VERSION_NUM >= PG_VERSION_13
static int ParentCountPriorToAppendRel(List *appendRelList, AppendRelInfo *appendRelInfo);
#endif
@ -194,7 +202,7 @@ AllDistributionKeysInQueryAreEqual(Query *originalQuery,
if (originalQuery->setOperations || ContainsUnionSubquery(originalQuery))
{
return SafeToPushdownUnionSubquery(plannerRestrictionContext);
return SafeToPushdownUnionSubquery(originalQuery, plannerRestrictionContext);
}
return false;
@ -244,7 +252,7 @@ ContextContainsLocalRelation(RelationRestrictionContext *restrictionContext)
* safe to push down, the function would fail to return true.
*/
bool
SafeToPushdownUnionSubquery(PlannerRestrictionContext *plannerRestrictionContext)
SafeToPushdownUnionSubquery(Query* originalQuery, PlannerRestrictionContext *plannerRestrictionContext)
{
RelationRestrictionContext *restrictionContext =
plannerRestrictionContext->relationRestrictionContext;
@ -267,10 +275,8 @@ SafeToPushdownUnionSubquery(PlannerRestrictionContext *plannerRestrictionContext
RelationRestriction *relationRestriction = lfirst(relationRestrictionCell);
Index partitionKeyIndex = InvalidAttrNumber;
PlannerInfo *relationPlannerRoot = relationRestriction->plannerInfo;
List *targetList = relationPlannerRoot->parse->targetList;
List *appendRelList = relationPlannerRoot->append_rel_list;
Var *varToBeAdded = NULL;
TargetEntry *targetEntryToAdd = NULL;
/*
* We first check whether UNION ALLs are pulled up or not. Note that Postgres
@ -294,8 +300,8 @@ SafeToPushdownUnionSubquery(PlannerRestrictionContext *plannerRestrictionContext
}
else
{
partitionKeyIndex =
RelationRestrictionPartitionKeyIndex(relationRestriction);
varToBeAdded =
RelationRestrictionPartitionKeyIndex(originalQuery, relationRestriction, &partitionKeyIndex);
/* union does not have partition key in the target list */
if (partitionKeyIndex == 0)
@ -303,13 +309,12 @@ SafeToPushdownUnionSubquery(PlannerRestrictionContext *plannerRestrictionContext
continue;
}
targetEntryToAdd = list_nth(targetList, partitionKeyIndex - 1);
if (!IsA(targetEntryToAdd->expr, Var))
{
continue;
}
varToBeAdded = (Var *) targetEntryToAdd->expr;
/*
* we update the varno because we use the original parse tree for finding the
* var. However the rest of the code relies on a query tree that might be different
* than the original parse tree because of postgres optimizations.
*/
varToBeAdded->varno = relationRestriction->index;
}
/*
@ -1760,15 +1765,23 @@ ContainsUnionSubquery(Query *queryTree)
* index that the partition key of the relation exists in the query. The query is
* found in the planner info of the relation restriction.
*/
static Index
RelationRestrictionPartitionKeyIndex(RelationRestriction *relationRestriction)
static Var*
RelationRestrictionPartitionKeyIndex(Query* originalQuery, RelationRestriction *relationRestriction, Index* partitionKeyIndex)
{
ListCell *targetEntryCell = NULL;
Index partitionKeyTargetAttrIndex = 0;
PlannerInfo *relationPlannerRoot = relationRestriction->plannerInfo;
Query *relationPlannerParseQuery = relationPlannerRoot->parse;
List *relationTargetList = relationPlannerParseQuery->targetList;
FindRteIdentityContext* findRteIdentityContext = palloc0(sizeof(FindRteIdentityContext));
findRteIdentityContext->rteIdentity = GetRTEIdentity(relationRestriction->rte);
findRteIdentityWalker((Node*) originalQuery, findRteIdentityContext);
if (findRteIdentityContext->query == NULL) {
return NULL;
}
List *relationTargetList = findRteIdentityContext->query->targetList;
Query *relationPlannerParseQuery = findRteIdentityContext->query;
foreach(targetEntryCell, relationTargetList)
{
@ -1782,15 +1795,47 @@ RelationRestrictionPartitionKeyIndex(RelationRestriction *relationRestriction)
IsPartitionColumn(targetExpression, relationPlannerParseQuery))
{
Var *targetColumn = (Var *) targetExpression;
if (targetColumn->varno == relationRestriction->index)
RangeTblEntry* res = (RangeTblEntry*) list_nth(relationPlannerParseQuery->rtable, targetColumn->varno - 1);
if (res->rtekind == RTE_RELATION && GetRTEIdentity(res) == findRteIdentityContext->rteIdentity)
{
return partitionKeyTargetAttrIndex;
*partitionKeyIndex = partitionKeyTargetAttrIndex;
return (Var*)copyObject(targetColumn);
}
}
}
return InvalidAttrNumber;
return NULL;
}
/*
 * findRteIdentityWalker walks on the given node to find a query
 * which has an RTE_RELATION entry with the rteIdentity given in the context.
 *
 * When such a query is found, context->query is set to it and the function
 * returns true, which stops the walk at the first match. When nothing under
 * the node matches, the function returns false and leaves context->query
 * untouched.
 *
 * The walk covers subqueries in the range table as well as queries that are
 * only reachable through expressions (e.g. sublinks) or the CTE list.
 */
static bool
findRteIdentityWalker(Node *node, FindRteIdentityContext *context)
{
	if (node == NULL)
	{
		return false;
	}

	if (IsA(node, Query))
	{
		Query *query = (Query *) node;
		RangeTblEntry *rte = NULL;

		foreach_ptr(rte, query->rtable)
		{
			if (rte->rtekind == RTE_RELATION &&
				GetRTEIdentity(rte) == context->rteIdentity)
			{
				context->query = query;
				return true;
			}
		}

		/*
		 * Propagate the result instead of dropping it, so that a match in a
		 * nested query aborts the walk and cannot be overwritten later on.
		 */
		return query_tree_walker(query, findRteIdentityWalker, context,
								 QTW_EXAMINE_RTES_BEFORE);
	}

	/*
	 * With QTW_EXAMINE_RTES_BEFORE the range table entries themselves are
	 * handed to us. They are already checked via the rtable of their query
	 * above, and expression_tree_walker does not know how to handle them, so
	 * we simply let the range table walker continue (it descends into
	 * subquery RTEs on its own).
	 */
	if (IsA(node, RangeTblEntry))
	{
		return false;
	}

	/* descend into expressions to reach the queries nested inside them */
	return expression_tree_walker(node, findRteIdentityWalker, context);
}

View File

@ -20,7 +20,7 @@
extern bool AllDistributionKeysInQueryAreEqual(Query *originalQuery,
PlannerRestrictionContext *
plannerRestrictionContext);
extern bool SafeToPushdownUnionSubquery(PlannerRestrictionContext *
extern bool SafeToPushdownUnionSubquery(Query* originalQuery, PlannerRestrictionContext *
plannerRestrictionContext);
extern bool ContainsUnionSubquery(Query *queryTree);
extern bool RestrictionEquivalenceForPartitionKeys(PlannerRestrictionContext *

View File

@ -740,10 +740,7 @@ DEBUG: Wrapping relation "local" to a subquery
DEBUG: generating subplan XXX_1 for subquery SELECT id FROM local_dist_join_mixed.local WHERE true
DEBUG: Wrapping relation "local" to a subquery
DEBUG: generating subplan XXX_2 for subquery SELECT id FROM local_dist_join_mixed.local WHERE true
DEBUG: generating subplan XXX_3 for subquery SELECT id, name, created_at FROM (SELECT distributed.id, distributed.name, distributed.created_at FROM ((SELECT NULL::integer AS "dummy-1", local_1.id, NULL::integer AS "dummy-3", NULL::text AS title, NULL::integer AS "dummy-5" FROM (SELECT intermediate_result.id FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(id bigint)) local_1) local JOIN local_dist_join_mixed.distributed USING (id))) fo
DEBUG: generating subplan XXX_4 for subquery SELECT id, name, created_at FROM (SELECT distributed.id, distributed.name, distributed.created_at FROM ((SELECT NULL::integer AS "dummy-1", local_1.id, NULL::integer AS "dummy-3", NULL::text AS title, NULL::integer AS "dummy-5" FROM (SELECT intermediate_result.id FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(id bigint)) local_1) local JOIN local_dist_join_mixed.distributed USING (id))) ba
DEBUG: generating subplan XXX_5 for subquery SELECT intermediate_result.id, intermediate_result.name, intermediate_result.created_at FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(id bigint, name text, created_at timestamp with time zone) UNION ALL SELECT intermediate_result.id, intermediate_result.name, intermediate_result.created_at FROM read_intermediate_result('XXX_4'::text, 'binary'::citus_copy_format) intermediate_result(id bigint, name text, created_at timestamp with time zone)
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (SELECT intermediate_result.id, intermediate_result.name, intermediate_result.created_at FROM read_intermediate_result('XXX_5'::text, 'binary'::citus_copy_format) intermediate_result(id bigint, name text, created_at timestamp with time zone)) bar
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (SELECT fo.id, fo.name, fo.created_at FROM (SELECT distributed.id, distributed.name, distributed.created_at FROM ((SELECT NULL::integer AS "dummy-1", local_1.id, NULL::integer AS "dummy-3", NULL::text AS title, NULL::integer AS "dummy-5" FROM (SELECT intermediate_result.id FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(id bigint)) local_1) local JOIN local_dist_join_mixed.distributed USING (id))) fo UNION ALL SELECT ba.id, ba.name, ba.created_at FROM (SELECT distributed.id, distributed.name, distributed.created_at FROM ((SELECT NULL::integer AS "dummy-1", local_1.id, NULL::integer AS "dummy-3", NULL::text AS title, NULL::integer AS "dummy-5" FROM (SELECT intermediate_result.id FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(id bigint)) local_1) local JOIN local_dist_join_mixed.distributed USING (id))) ba) bar
count
---------------------------------------------------------------------
202

View File

@ -236,12 +236,8 @@ FROM
UNION ALL SELECT user_id AS user_id FROM users_table_part JOIN events_table_part USING (user_id) WHERE users_table_part.value_1 = 8 GROUP BY user_id)) AS bar
WHERE user_id < 2000 ) as level_1 ) as level_2 ) as level_3
ORDER BY 1 DESC LIMIT 10;
DEBUG: push down of limit count: 10
count
---------------------------------------------------------------------
78
(1 row)
DEBUG: generating subplan XXX_1 for subquery SELECT user_id FROM union_pushdown.users_table_part WHERE (value_2 OPERATOR(pg_catalog.=) 3)
ERROR: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns
-- safe to pushdown
SELECT DISTINCT user_id FROM (
SELECT * FROM
@ -912,14 +908,13 @@ $$);
f
(1 row)
-- cannot pushed down because postgres optimizes fields, needs to be fixed with #4781
SELECT public.explain_has_distributed_subplan($$
EXPLAIN
SELECT COUNT(k) FROM v;
$$);
explain_has_distributed_subplan
---------------------------------------------------------------------
t
f
(1 row)
SELECT public.explain_has_distributed_subplan($$
@ -928,7 +923,7 @@ SELECT AVG(k) FROM v;
$$);
explain_has_distributed_subplan
---------------------------------------------------------------------
t
f
(1 row)
SELECT public.explain_has_distributed_subplan($$
@ -937,7 +932,7 @@ SELECT SUM(k) FROM v;
$$);
explain_has_distributed_subplan
---------------------------------------------------------------------
t
f
(1 row)
SELECT public.explain_has_distributed_subplan($$
@ -946,7 +941,81 @@ SELECT MAX(k) FROM v;
$$);
explain_has_distributed_subplan
---------------------------------------------------------------------
t
f
(1 row)
CREATE OR REPLACE VIEW v AS (SELECT * from (SELECT * from test_a)a1 where k>1) UNION ALL SELECT * from (SELECT * from test_b)b1 where k<1;
SELECT public.explain_has_distributed_subplan($$
EXPLAIN
SELECT COUNT(*) FROM v;
$$);
explain_has_distributed_subplan
---------------------------------------------------------------------
f
(1 row)
SELECT public.explain_has_distributed_subplan($$
EXPLAIN
SELECT AVG(k) FROM v;
$$);
explain_has_distributed_subplan
---------------------------------------------------------------------
f
(1 row)
SELECT public.explain_has_distributed_subplan($$
EXPLAIN
SELECT SUM(k) FROM v;
$$);
explain_has_distributed_subplan
---------------------------------------------------------------------
f
(1 row)
SELECT public.explain_has_distributed_subplan($$
EXPLAIN
SELECT MAX(k) FROM v;
$$);
explain_has_distributed_subplan
---------------------------------------------------------------------
f
(1 row)
CREATE OR REPLACE VIEW v AS SELECT * from test_a where k<1 UNION ALL SELECT * from test_b where k<1;
SELECT public.explain_has_distributed_subplan($$
EXPLAIN
SELECT COUNT(*) FROM v;
$$);
explain_has_distributed_subplan
---------------------------------------------------------------------
f
(1 row)
SELECT public.explain_has_distributed_subplan($$
EXPLAIN
SELECT AVG(k) FROM v;
$$);
explain_has_distributed_subplan
---------------------------------------------------------------------
f
(1 row)
SELECT public.explain_has_distributed_subplan($$
EXPLAIN
SELECT SUM(k) FROM v;
$$);
explain_has_distributed_subplan
---------------------------------------------------------------------
f
(1 row)
SELECT public.explain_has_distributed_subplan($$
EXPLAIN
SELECT MAX(k) FROM v;
$$);
explain_has_distributed_subplan
---------------------------------------------------------------------
f
(1 row)
-- order by prevents postgres from optimizing fields so can be pushed down
@ -966,7 +1035,7 @@ SELECT k, COUNT(*) FROM v GROUP BY k ORDER BY k;
$$);
explain_has_distributed_subplan
---------------------------------------------------------------------
t
f
(1 row)
RESET client_min_messages;

View File

@ -682,7 +682,6 @@ EXPLAIN
SELECT MAX(id) FROM v;
$$);
-- cannot pushed down because postgres optimizes fields, needs to be fixed with #4781
SELECT public.explain_has_distributed_subplan($$
EXPLAIN
SELECT COUNT(k) FROM v;
@ -703,6 +702,49 @@ EXPLAIN
SELECT MAX(k) FROM v;
$$);
CREATE OR REPLACE VIEW v AS (SELECT * from (SELECT * from test_a)a1 where k>1) UNION ALL SELECT * from (SELECT * from test_b)b1 where k<1;
SELECT public.explain_has_distributed_subplan($$
EXPLAIN
SELECT COUNT(*) FROM v;
$$);
SELECT public.explain_has_distributed_subplan($$
EXPLAIN
SELECT AVG(k) FROM v;
$$);
SELECT public.explain_has_distributed_subplan($$
EXPLAIN
SELECT SUM(k) FROM v;
$$);
SELECT public.explain_has_distributed_subplan($$
EXPLAIN
SELECT MAX(k) FROM v;
$$);
CREATE OR REPLACE VIEW v AS SELECT * from test_a where k<1 UNION ALL SELECT * from test_b where k<1;
SELECT public.explain_has_distributed_subplan($$
EXPLAIN
SELECT COUNT(*) FROM v;
$$);
SELECT public.explain_has_distributed_subplan($$
EXPLAIN
SELECT AVG(k) FROM v;
$$);
SELECT public.explain_has_distributed_subplan($$
EXPLAIN
SELECT SUM(k) FROM v;
$$);
SELECT public.explain_has_distributed_subplan($$
EXPLAIN
SELECT MAX(k) FROM v;
$$);
-- order by prevents postgres from optimizing fields so can be pushed down
SELECT public.explain_has_distributed_subplan($$
EXPLAIN