Fix UNION not being pushed down

Postgres optimizes away column fields that are not needed in the output. We
were relying on these fields to decide whether it is safe to push down a
union query.

This fix looks at the original parse tree, which still has the original
column fields, to detect whether it is safe to push down a union query.
test_hll
Sait Talha Nisanci 2021-06-19 00:51:27 +03:00
parent 2954fb0ee8
commit 4e3c6224fb
7 changed files with 245 additions and 40 deletions

View File

@ -582,7 +582,7 @@ DeferErrorIfUnsupportedSubqueryPushdown(Query *originalQuery,
*/ */
if (ContainsUnionSubquery(originalQuery)) if (ContainsUnionSubquery(originalQuery))
{ {
if (!SafeToPushdownUnionSubquery(plannerRestrictionContext)) if (!SafeToPushdownUnionSubquery(originalQuery, plannerRestrictionContext))
{ {
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
"cannot pushdown the subquery since not all subqueries " "cannot pushdown the subquery since not all subqueries "

View File

@ -1013,7 +1013,7 @@ ShouldRecursivelyPlanSetOperation(Query *query, RecursivePlanningContext *contex
PlannerRestrictionContext *filteredRestrictionContext = PlannerRestrictionContext *filteredRestrictionContext =
FilterPlannerRestrictionForQuery(context->plannerRestrictionContext, query); FilterPlannerRestrictionForQuery(context->plannerRestrictionContext, query);
if (!SafeToPushdownUnionSubquery(filteredRestrictionContext)) if (!SafeToPushdownUnionSubquery(query, filteredRestrictionContext))
{ {
/* /*
* The distribution column is not in the same place in all sides * The distribution column is not in the same place in all sides

View File

@ -59,6 +59,13 @@ typedef struct AttributeEquivalenceClass
Index unionQueryPartitionKeyIndex; Index unionQueryPartitionKeyIndex;
} AttributeEquivalenceClass; } AttributeEquivalenceClass;
/*
 * FindQueryContainingRteIdentityContext is the walker context used by
 * FindQueryContainingRTEIdentityInternal while it searches a query tree
 * for the query that contains a relation RTE with a given RTE identity.
 */
typedef struct FindQueryContainingRteIdentityContext
{
/* RTE identity that relation RTEs are compared against (input) */
int targetRTEIdentity;
/* query holding the matching relation RTE; stays NULL if none is found (output) */
Query *query;
/* query whose tree is being walked at the moment (internal bookkeeping) */
Query *tempQuery;
}FindQueryContainingRteIdentityContext;
/* /*
* AttributeEquivalenceClassMember - one member expression of an * AttributeEquivalenceClassMember - one member expression of an
* AttributeEquivalenceClass. The important thing to consider is that * AttributeEquivalenceClass. The important thing to consider is that
@ -142,8 +149,9 @@ static void ListConcatUniqueAttributeClassMemberLists(AttributeEquivalenceClass
firstClass, firstClass,
AttributeEquivalenceClass * AttributeEquivalenceClass *
secondClass); secondClass);
static Index RelationRestrictionPartitionKeyIndex(RelationRestriction * static Var * RelationRestrictionPartitionKeyIndex(Query *query, RelationRestriction *
relationRestriction); relationRestriction,
Index *partitionKeyIndex);
static bool AllRelationsInRestrictionContextColocated(RelationRestrictionContext * static bool AllRelationsInRestrictionContextColocated(RelationRestrictionContext *
restrictionContext); restrictionContext);
static bool IsNotSafeRestrictionToRecursivelyPlan(Node *node); static bool IsNotSafeRestrictionToRecursivelyPlan(Node *node);
@ -155,6 +163,11 @@ static bool RangeTableArrayContainsAnyRTEIdentities(RangeTblEntry **rangeTableEn
queryRteIdentities); queryRteIdentities);
static Relids QueryRteIdentities(Query *queryTree); static Relids QueryRteIdentities(Query *queryTree);
static bool FindQueryContainingRTEIdentityInternal(Node *node,
FindQueryContainingRteIdentityContext *
context);
static Query * FindQueryContainingRTEIdentity(Query *mainQuery, int rteIndex);
#if PG_VERSION_NUM >= PG_VERSION_13 #if PG_VERSION_NUM >= PG_VERSION_13
static int ParentCountPriorToAppendRel(List *appendRelList, AppendRelInfo *appendRelInfo); static int ParentCountPriorToAppendRel(List *appendRelList, AppendRelInfo *appendRelInfo);
#endif #endif
@ -194,7 +207,7 @@ AllDistributionKeysInQueryAreEqual(Query *originalQuery,
if (originalQuery->setOperations || ContainsUnionSubquery(originalQuery)) if (originalQuery->setOperations || ContainsUnionSubquery(originalQuery))
{ {
return SafeToPushdownUnionSubquery(plannerRestrictionContext); return SafeToPushdownUnionSubquery(originalQuery, plannerRestrictionContext);
} }
return false; return false;
@ -244,7 +257,8 @@ ContextContainsLocalRelation(RelationRestrictionContext *restrictionContext)
* safe to push down, the function would fail to return true. * safe to push down, the function would fail to return true.
*/ */
bool bool
SafeToPushdownUnionSubquery(PlannerRestrictionContext *plannerRestrictionContext) SafeToPushdownUnionSubquery(Query *originalQuery,
PlannerRestrictionContext *plannerRestrictionContext)
{ {
RelationRestrictionContext *restrictionContext = RelationRestrictionContext *restrictionContext =
plannerRestrictionContext->relationRestrictionContext; plannerRestrictionContext->relationRestrictionContext;
@ -267,10 +281,8 @@ SafeToPushdownUnionSubquery(PlannerRestrictionContext *plannerRestrictionContext
RelationRestriction *relationRestriction = lfirst(relationRestrictionCell); RelationRestriction *relationRestriction = lfirst(relationRestrictionCell);
Index partitionKeyIndex = InvalidAttrNumber; Index partitionKeyIndex = InvalidAttrNumber;
PlannerInfo *relationPlannerRoot = relationRestriction->plannerInfo; PlannerInfo *relationPlannerRoot = relationRestriction->plannerInfo;
List *targetList = relationPlannerRoot->parse->targetList;
List *appendRelList = relationPlannerRoot->append_rel_list; List *appendRelList = relationPlannerRoot->append_rel_list;
Var *varToBeAdded = NULL; Var *varToBeAdded = NULL;
TargetEntry *targetEntryToAdd = NULL;
/* /*
* We first check whether UNION ALLs are pulled up or not. Note that Postgres * We first check whether UNION ALLs are pulled up or not. Note that Postgres
@ -294,8 +306,9 @@ SafeToPushdownUnionSubquery(PlannerRestrictionContext *plannerRestrictionContext
} }
else else
{ {
partitionKeyIndex = varToBeAdded =
RelationRestrictionPartitionKeyIndex(relationRestriction); RelationRestrictionPartitionKeyIndex(originalQuery, relationRestriction,
&partitionKeyIndex);
/* union does not have partition key in the target list */ /* union does not have partition key in the target list */
if (partitionKeyIndex == 0) if (partitionKeyIndex == 0)
@ -303,13 +316,12 @@ SafeToPushdownUnionSubquery(PlannerRestrictionContext *plannerRestrictionContext
continue; continue;
} }
targetEntryToAdd = list_nth(targetList, partitionKeyIndex - 1); /*
if (!IsA(targetEntryToAdd->expr, Var)) * we update the varno because we use the original parse tree for finding the
{ * var. However the rest of the code relies on a query tree that might be different
continue; * than the original parse tree because of postgres optimizations.
} */
varToBeAdded->varno = relationRestriction->index;
varToBeAdded = (Var *) targetEntryToAdd->expr;
} }
/* /*
@ -1760,16 +1772,29 @@ ContainsUnionSubquery(Query *queryTree)
* index that the partition key of the relation exists in the query. The query is * index that the partition key of the relation exists in the query. The query is
* found in the planner info of the relation restriction. * found in the planner info of the relation restriction.
*/ */
static Index static Var *
RelationRestrictionPartitionKeyIndex(RelationRestriction *relationRestriction) RelationRestrictionPartitionKeyIndex(Query *originalQuery,
RelationRestriction *relationRestriction,
Index *partitionKeyIndex)
{ {
int targetRTEIndex = GetRTEIdentity(relationRestriction->rte);
Query *originalQueryContainingRTEIdentity =
FindQueryContainingRTEIdentity(originalQuery, targetRTEIndex);
if (!originalQueryContainingRTEIdentity)
{
/*
* We should always find the query but we have this check for sanity.
* This check makes sure that if there is a bug while finding the query,
* we don't get a crash etc. and the only downside will be we might be recursively
* planning a query that could be pushed down.
*/
return NULL;
}
List *relationTargetList = originalQueryContainingRTEIdentity->targetList;
ListCell *targetEntryCell = NULL; ListCell *targetEntryCell = NULL;
Index partitionKeyTargetAttrIndex = 0; Index partitionKeyTargetAttrIndex = 0;
PlannerInfo *relationPlannerRoot = relationRestriction->plannerInfo;
Query *relationPlannerParseQuery = relationPlannerRoot->parse;
List *relationTargetList = relationPlannerParseQuery->targetList;
foreach(targetEntryCell, relationTargetList) foreach(targetEntryCell, relationTargetList)
{ {
TargetEntry *targetEntry = (TargetEntry *) lfirst(targetEntryCell); TargetEntry *targetEntry = (TargetEntry *) lfirst(targetEntryCell);
@ -1779,18 +1804,86 @@ RelationRestrictionPartitionKeyIndex(RelationRestriction *relationRestriction)
if (!targetEntry->resjunk && if (!targetEntry->resjunk &&
IsA(targetExpression, Var) && IsA(targetExpression, Var) &&
IsPartitionColumn(targetExpression, relationPlannerParseQuery)) IsPartitionColumn(targetExpression, originalQueryContainingRTEIdentity))
{ {
Var *targetColumn = (Var *) targetExpression; Var *targetColumn = (Var *) targetExpression;
if (targetColumn->varno == relationRestriction->index) Oid relationId = InvalidOid;
Var *column = NULL;
FindReferencedTableColumn(targetExpression, NIL,
originalQueryContainingRTEIdentity, &relationId,
&column);
targetColumn = column;
RangeTblEntry *resRTE =
(RangeTblEntry *) list_nth(originalQueryContainingRTEIdentity->rtable,
targetColumn->varno - 1);
if (resRTE->rtekind == RTE_RELATION && GetRTEIdentity(resRTE) ==
targetRTEIndex)
{ {
return partitionKeyTargetAttrIndex; *partitionKeyIndex = partitionKeyTargetAttrIndex;
return (Var *) copyObject(targetColumn);
} }
} }
} }
return InvalidAttrNumber; return NULL;
}
/*
 * FindQueryContainingRTEIdentity finds the query/subquery under mainQuery
 * that has a relation RTE whose RTE identity equals rteIndex.
 *
 * Note that, despite its name, rteIndex is compared against the value
 * returned by GetRTEIdentity() for each relation RTE; it is not used as a
 * position in any rtable.
 *
 * Returns the query that contains the matching RTE, or NULL when no relation
 * RTE with that identity is found.
 */
static Query *
FindQueryContainingRTEIdentity(Query *mainQuery, int rteIndex)
{
	/*
	 * The context is only needed for the duration of the walk, so keep it on
	 * the stack instead of allocating memory that is never freed explicitly.
	 */
	FindQueryContainingRteIdentityContext findRteIdentityContext = { 0 };

	findRteIdentityContext.targetRTEIdentity = rteIndex;

	FindQueryContainingRTEIdentityInternal((Node *) mainQuery, &findRteIdentityContext);

	return findRteIdentityContext.query;
}
/*
 * FindQueryContainingRTEIdentityInternal walks on the given node to find a query
 * which has a relation RTE that has the RTE identity stored in
 * context->targetRTEIdentity.
 *
 * While walking, context->tempQuery points to the query whose tree is being
 * examined. When a matching relation RTE is found, that query is saved into
 * context->query and the function returns true. The result is propagated
 * through enclosing queries so that the walk stops at the first match instead
 * of continuing over the rest of the tree.
 *
 * Returns false when the node (and everything below it) has no matching RTE.
 */
static bool
FindQueryContainingRTEIdentityInternal(Node *node,
									   FindQueryContainingRteIdentityContext *context)
{
	if (node == NULL)
	{
		return false;
	}

	if (IsA(node, Query))
	{
		Query *query = (Query *) node;
		Query *prevQuery = context->tempQuery;

		/* RTEs visited below belong to this query until we are done with it */
		context->tempQuery = query;

		/*
		 * QTW_EXAMINE_RTES_BEFORE makes the walker visit the range table
		 * entries themselves, which is where the identity check happens.
		 */
		bool found = query_tree_walker(query, FindQueryContainingRTEIdentityInternal,
									   context, QTW_EXAMINE_RTES_BEFORE);

		context->tempQuery = prevQuery;

		/* propagate a match so that the enclosing walkers stop as well */
		return found;
	}

	if (!IsA(node, RangeTblEntry))
	{
		return expression_tree_walker(node, FindQueryContainingRTEIdentityInternal,
									  context);
	}

	RangeTblEntry *rte = (RangeTblEntry *) node;
	if (rte->rtekind == RTE_RELATION &&
		GetRTEIdentity(rte) == context->targetRTEIdentity)
	{
		context->query = context->tempQuery;
		return true;
	}

	return false;
}

View File

@ -20,7 +20,7 @@
extern bool AllDistributionKeysInQueryAreEqual(Query *originalQuery, extern bool AllDistributionKeysInQueryAreEqual(Query *originalQuery,
PlannerRestrictionContext * PlannerRestrictionContext *
plannerRestrictionContext); plannerRestrictionContext);
extern bool SafeToPushdownUnionSubquery(PlannerRestrictionContext * extern bool SafeToPushdownUnionSubquery(Query *originalQuery, PlannerRestrictionContext *
plannerRestrictionContext); plannerRestrictionContext);
extern bool ContainsUnionSubquery(Query *queryTree); extern bool ContainsUnionSubquery(Query *queryTree);
extern bool RestrictionEquivalenceForPartitionKeys(PlannerRestrictionContext * extern bool RestrictionEquivalenceForPartitionKeys(PlannerRestrictionContext *

View File

@ -740,10 +740,7 @@ DEBUG: Wrapping relation "local" to a subquery
DEBUG: generating subplan XXX_1 for subquery SELECT id FROM local_dist_join_mixed.local WHERE true DEBUG: generating subplan XXX_1 for subquery SELECT id FROM local_dist_join_mixed.local WHERE true
DEBUG: Wrapping relation "local" to a subquery DEBUG: Wrapping relation "local" to a subquery
DEBUG: generating subplan XXX_2 for subquery SELECT id FROM local_dist_join_mixed.local WHERE true DEBUG: generating subplan XXX_2 for subquery SELECT id FROM local_dist_join_mixed.local WHERE true
DEBUG: generating subplan XXX_3 for subquery SELECT id, name, created_at FROM (SELECT distributed.id, distributed.name, distributed.created_at FROM ((SELECT NULL::integer AS "dummy-1", local_1.id, NULL::integer AS "dummy-3", NULL::text AS title, NULL::integer AS "dummy-5" FROM (SELECT intermediate_result.id FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(id bigint)) local_1) local JOIN local_dist_join_mixed.distributed USING (id))) fo DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (SELECT fo.id, fo.name, fo.created_at FROM (SELECT distributed.id, distributed.name, distributed.created_at FROM ((SELECT NULL::integer AS "dummy-1", local_1.id, NULL::integer AS "dummy-3", NULL::text AS title, NULL::integer AS "dummy-5" FROM (SELECT intermediate_result.id FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(id bigint)) local_1) local JOIN local_dist_join_mixed.distributed USING (id))) fo UNION ALL SELECT ba.id, ba.name, ba.created_at FROM (SELECT distributed.id, distributed.name, distributed.created_at FROM ((SELECT NULL::integer AS "dummy-1", local_1.id, NULL::integer AS "dummy-3", NULL::text AS title, NULL::integer AS "dummy-5" FROM (SELECT intermediate_result.id FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(id bigint)) local_1) local JOIN local_dist_join_mixed.distributed USING (id))) ba) bar
DEBUG: generating subplan XXX_4 for subquery SELECT id, name, created_at FROM (SELECT distributed.id, distributed.name, distributed.created_at FROM ((SELECT NULL::integer AS "dummy-1", local_1.id, NULL::integer AS "dummy-3", NULL::text AS title, NULL::integer AS "dummy-5" FROM (SELECT intermediate_result.id FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(id bigint)) local_1) local JOIN local_dist_join_mixed.distributed USING (id))) ba
DEBUG: generating subplan XXX_5 for subquery SELECT intermediate_result.id, intermediate_result.name, intermediate_result.created_at FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(id bigint, name text, created_at timestamp with time zone) UNION ALL SELECT intermediate_result.id, intermediate_result.name, intermediate_result.created_at FROM read_intermediate_result('XXX_4'::text, 'binary'::citus_copy_format) intermediate_result(id bigint, name text, created_at timestamp with time zone)
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (SELECT intermediate_result.id, intermediate_result.name, intermediate_result.created_at FROM read_intermediate_result('XXX_5'::text, 'binary'::citus_copy_format) intermediate_result(id bigint, name text, created_at timestamp with time zone)) bar
count count
--------------------------------------------------------------------- ---------------------------------------------------------------------
202 202

View File

@ -912,14 +912,13 @@ $$);
f f
(1 row) (1 row)
-- cannot pushed down because postgres optimizes fields, needs to be fixed with #4781
SELECT public.explain_has_distributed_subplan($$ SELECT public.explain_has_distributed_subplan($$
EXPLAIN EXPLAIN
SELECT COUNT(k) FROM v; SELECT COUNT(k) FROM v;
$$); $$);
explain_has_distributed_subplan explain_has_distributed_subplan
--------------------------------------------------------------------- ---------------------------------------------------------------------
t f
(1 row) (1 row)
SELECT public.explain_has_distributed_subplan($$ SELECT public.explain_has_distributed_subplan($$
@ -928,7 +927,7 @@ SELECT AVG(k) FROM v;
$$); $$);
explain_has_distributed_subplan explain_has_distributed_subplan
--------------------------------------------------------------------- ---------------------------------------------------------------------
t f
(1 row) (1 row)
SELECT public.explain_has_distributed_subplan($$ SELECT public.explain_has_distributed_subplan($$
@ -937,7 +936,7 @@ SELECT SUM(k) FROM v;
$$); $$);
explain_has_distributed_subplan explain_has_distributed_subplan
--------------------------------------------------------------------- ---------------------------------------------------------------------
t f
(1 row) (1 row)
SELECT public.explain_has_distributed_subplan($$ SELECT public.explain_has_distributed_subplan($$
@ -946,7 +945,81 @@ SELECT MAX(k) FROM v;
$$); $$);
explain_has_distributed_subplan explain_has_distributed_subplan
--------------------------------------------------------------------- ---------------------------------------------------------------------
t f
(1 row)
CREATE OR REPLACE VIEW v AS (SELECT * from (SELECT * from test_a)a1 where k>1) UNION ALL SELECT * from (SELECT * from test_b)b1 where k<1;
SELECT public.explain_has_distributed_subplan($$
EXPLAIN
SELECT COUNT(*) FROM v;
$$);
explain_has_distributed_subplan
---------------------------------------------------------------------
f
(1 row)
SELECT public.explain_has_distributed_subplan($$
EXPLAIN
SELECT AVG(k) FROM v;
$$);
explain_has_distributed_subplan
---------------------------------------------------------------------
f
(1 row)
SELECT public.explain_has_distributed_subplan($$
EXPLAIN
SELECT SUM(k) FROM v;
$$);
explain_has_distributed_subplan
---------------------------------------------------------------------
f
(1 row)
SELECT public.explain_has_distributed_subplan($$
EXPLAIN
SELECT MAX(k) FROM v;
$$);
explain_has_distributed_subplan
---------------------------------------------------------------------
f
(1 row)
CREATE OR REPLACE VIEW v AS SELECT * from test_a where k<1 UNION ALL SELECT * from test_b where k<1;
SELECT public.explain_has_distributed_subplan($$
EXPLAIN
SELECT COUNT(*) FROM v;
$$);
explain_has_distributed_subplan
---------------------------------------------------------------------
f
(1 row)
SELECT public.explain_has_distributed_subplan($$
EXPLAIN
SELECT AVG(k) FROM v;
$$);
explain_has_distributed_subplan
---------------------------------------------------------------------
f
(1 row)
SELECT public.explain_has_distributed_subplan($$
EXPLAIN
SELECT SUM(k) FROM v;
$$);
explain_has_distributed_subplan
---------------------------------------------------------------------
f
(1 row)
SELECT public.explain_has_distributed_subplan($$
EXPLAIN
SELECT MAX(k) FROM v;
$$);
explain_has_distributed_subplan
---------------------------------------------------------------------
f
(1 row) (1 row)
-- order by prevents postgres from optimizing fields so can be pushed down -- order by prevents postgres from optimizing fields so can be pushed down
@ -966,7 +1039,7 @@ SELECT k, COUNT(*) FROM v GROUP BY k ORDER BY k;
$$); $$);
explain_has_distributed_subplan explain_has_distributed_subplan
--------------------------------------------------------------------- ---------------------------------------------------------------------
t f
(1 row) (1 row)
RESET client_min_messages; RESET client_min_messages;

View File

@ -682,7 +682,6 @@ EXPLAIN
SELECT MAX(id) FROM v; SELECT MAX(id) FROM v;
$$); $$);
-- cannot pushed down because postgres optimizes fields, needs to be fixed with #4781
SELECT public.explain_has_distributed_subplan($$ SELECT public.explain_has_distributed_subplan($$
EXPLAIN EXPLAIN
SELECT COUNT(k) FROM v; SELECT COUNT(k) FROM v;
@ -703,6 +702,49 @@ EXPLAIN
SELECT MAX(k) FROM v; SELECT MAX(k) FROM v;
$$); $$);
CREATE OR REPLACE VIEW v AS (SELECT * from (SELECT * from test_a)a1 where k>1) UNION ALL SELECT * from (SELECT * from test_b)b1 where k<1;
SELECT public.explain_has_distributed_subplan($$
EXPLAIN
SELECT COUNT(*) FROM v;
$$);
SELECT public.explain_has_distributed_subplan($$
EXPLAIN
SELECT AVG(k) FROM v;
$$);
SELECT public.explain_has_distributed_subplan($$
EXPLAIN
SELECT SUM(k) FROM v;
$$);
SELECT public.explain_has_distributed_subplan($$
EXPLAIN
SELECT MAX(k) FROM v;
$$);
CREATE OR REPLACE VIEW v AS SELECT * from test_a where k<1 UNION ALL SELECT * from test_b where k<1;
SELECT public.explain_has_distributed_subplan($$
EXPLAIN
SELECT COUNT(*) FROM v;
$$);
SELECT public.explain_has_distributed_subplan($$
EXPLAIN
SELECT AVG(k) FROM v;
$$);
SELECT public.explain_has_distributed_subplan($$
EXPLAIN
SELECT SUM(k) FROM v;
$$);
SELECT public.explain_has_distributed_subplan($$
EXPLAIN
SELECT MAX(k) FROM v;
$$);
-- order by prevents postgres from optimizing fields so can be pushed down -- order by prevents postgres from optimizing fields so can be pushed down
SELECT public.explain_has_distributed_subplan($$ SELECT public.explain_has_distributed_subplan($$
EXPLAIN EXPLAIN