mirror of https://github.com/citusdata/citus.git
Refactor WrapRteRelationIntoSubquery
parent
0e53aa5d3b
commit
7951273f74
|
@ -47,7 +47,14 @@
|
||||||
static RangeTblEntry * AnchorRte(Query *subquery);
|
static RangeTblEntry * AnchorRte(Query *subquery);
|
||||||
static List * UnionRelationRestrictionLists(List *firstRelationList,
|
static List * UnionRelationRestrictionLists(List *firstRelationList,
|
||||||
List *secondRelationList);
|
List *secondRelationList);
|
||||||
static void MakeVarAttNosSequential(List *targetList);
|
static List * CreateFilteredTargetListForRelation(Oid relationId,
|
||||||
|
List *requiredAttributes);
|
||||||
|
static List * CreateDummyTargetList(Oid relationId, List *requiredAttributes);
|
||||||
|
static TargetEntry * CreateTargetEntryForColumn(Form_pg_attribute attributeTuple, Index
|
||||||
|
rteIndex,
|
||||||
|
int attributeNumber, int resno);
|
||||||
|
static TargetEntry * CreateTargetEntryForNullCol(Form_pg_attribute attributeTuple, int
|
||||||
|
resno);
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@ -78,9 +85,7 @@ CreateColocatedJoinChecker(Query *subquery, PlannerRestrictionContext *restricti
|
||||||
* functions (i.e., FilterPlannerRestrictionForQuery()) rely on queries
|
* functions (i.e., FilterPlannerRestrictionForQuery()) rely on queries
|
||||||
* not relations.
|
* not relations.
|
||||||
*/
|
*/
|
||||||
List *allTargetList = NIL;
|
anchorSubquery = WrapRteRelationIntoSubquery(anchorRangeTblEntry, NIL);
|
||||||
anchorSubquery = WrapRteRelationIntoSubquery(anchorRangeTblEntry, NIL,
|
|
||||||
&allTargetList);
|
|
||||||
}
|
}
|
||||||
else if (anchorRangeTblEntry->rtekind == RTE_SUBQUERY)
|
else if (anchorRangeTblEntry->rtekind == RTE_SUBQUERY)
|
||||||
{
|
{
|
||||||
|
@ -262,13 +267,9 @@ SubqueryColocated(Query *subquery, ColocatedJoinChecker *checker)
|
||||||
* Note that the query returned by this function does not contain any filters or
|
* Note that the query returned by this function does not contain any filters or
|
||||||
* projections. The returned query should be used cautiously and it is mostly
|
* projections. The returned query should be used cautiously and it is mostly
|
||||||
* designed for generating a stub query.
|
* designed for generating a stub query.
|
||||||
*
|
|
||||||
* allTargetList will contain all columns for the given rteRelation but for the ones
|
|
||||||
* that are not required, it will have NULL entries.
|
|
||||||
*/
|
*/
|
||||||
Query *
|
Query *
|
||||||
WrapRteRelationIntoSubquery(RangeTblEntry *rteRelation, List *requiredAttributes,
|
WrapRteRelationIntoSubquery(RangeTblEntry *rteRelation, List *requiredAttributes)
|
||||||
List **allTargetList)
|
|
||||||
{
|
{
|
||||||
Query *subquery = makeNode(Query);
|
Query *subquery = makeNode(Query);
|
||||||
RangeTblRef *newRangeTableRef = makeNode(RangeTblRef);
|
RangeTblRef *newRangeTableRef = makeNode(RangeTblRef);
|
||||||
|
@ -281,93 +282,150 @@ WrapRteRelationIntoSubquery(RangeTblEntry *rteRelation, List *requiredAttributes
|
||||||
|
|
||||||
/* set the FROM expression to the subquery */
|
/* set the FROM expression to the subquery */
|
||||||
newRangeTableRef = makeNode(RangeTblRef);
|
newRangeTableRef = makeNode(RangeTblRef);
|
||||||
newRangeTableRef->rtindex = 1;
|
newRangeTableRef->rtindex = SINGLE_RTE_INDEX;
|
||||||
subquery->jointree = makeFromExpr(list_make1(newRangeTableRef), NULL);
|
subquery->jointree = makeFromExpr(list_make1(newRangeTableRef), NULL);
|
||||||
|
|
||||||
Relation relation = relation_open(rteRelation->relid, AccessShareLock);
|
subquery->targetList =
|
||||||
int numberOfAttributes = RelationGetNumberOfAttributes(relation);
|
CreateFilteredTargetListForRelation(rteRelation->relid, requiredAttributes);
|
||||||
|
|
||||||
bool shouldAssignDummyNullColumn = list_length(requiredAttributes) == 0;
|
if (list_length(subquery->targetList) == 0)
|
||||||
bool assignedDummyNullColumn = false;
|
|
||||||
int attributeNumber = 1;
|
|
||||||
int resultNo = 1;
|
|
||||||
for (; attributeNumber <= numberOfAttributes; attributeNumber++)
|
|
||||||
{
|
{
|
||||||
Form_pg_attribute attributeTuple =
|
/*
|
||||||
TupleDescAttr(relation->rd_att, attributeNumber - 1);
|
* in case there is no required column, we assign one dummy NULL target entry
|
||||||
Var *targetColumn =
|
* to the subquery targetList so that it has at least one target.
|
||||||
makeVar(newRangeTableRef->rtindex, attributeNumber, attributeTuple->atttypid,
|
* (targetlist should have at least one element)
|
||||||
attributeTuple->atttypmod, attributeTuple->attcollation, 0);
|
*/
|
||||||
TargetEntry *targetEntry =
|
subquery->targetList = CreateDummyTargetList(rteRelation->relid,
|
||||||
makeTargetEntry((Expr *) targetColumn, attributeNumber,
|
requiredAttributes);
|
||||||
strdup(attributeTuple->attname.data), false);
|
|
||||||
|
|
||||||
if (shouldAssignDummyNullColumn && !assignedDummyNullColumn)
|
|
||||||
{
|
|
||||||
/*
|
|
||||||
* in case there is no required column, we assign one dummy NULL target entry
|
|
||||||
* to the subquery targetList so that it has at least one target.
|
|
||||||
* (targetlist should have at least one element)
|
|
||||||
*/
|
|
||||||
subquery->targetList = lappend(subquery->targetList, targetEntry);
|
|
||||||
assignedDummyNullColumn = true;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!list_member_int(requiredAttributes, attributeNumber))
|
|
||||||
{
|
|
||||||
/*
|
|
||||||
* We add a null target entry because we don't have an easy
|
|
||||||
* way of changing all the references to this column and
|
|
||||||
* we don't want to break postgres query.
|
|
||||||
*/
|
|
||||||
targetEntry->expr =
|
|
||||||
(Expr *) makeNullConst(attributeTuple->atttypid,
|
|
||||||
attributeTuple->atttypmod,
|
|
||||||
attributeTuple->attcollation);
|
|
||||||
*allTargetList = lappend(*allTargetList, targetEntry);
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
TargetEntry *copyTargetEntry = copyObject(targetEntry);
|
|
||||||
*allTargetList = lappend(*allTargetList, copyTargetEntry);
|
|
||||||
|
|
||||||
/* In the subquery with only required attribute numbers, the result no
|
|
||||||
* corresponds to the ordinal index of it in targetList.
|
|
||||||
*/
|
|
||||||
targetEntry->resno = resultNo++;
|
|
||||||
subquery->targetList = lappend(subquery->targetList, targetEntry);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
MakeVarAttNosSequential(*allTargetList);
|
|
||||||
|
|
||||||
relation_close(relation, NoLock);
|
|
||||||
|
|
||||||
return subquery;
|
return subquery;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* MakeVarAttNosSequential changes the attribute numbers of the given targetList
|
* CreateAllTargetListForRelation creates a target list which contains all the columns
|
||||||
* to sequential numbers, [1, 2, 3] ...
|
* of the given relation. If the column is not in required columns, then it is added
|
||||||
|
* as a NULL column.
|
||||||
*/
|
*/
|
||||||
static void
|
List *
|
||||||
MakeVarAttNosSequential(List *targetList)
|
CreateAllTargetListForRelation(Oid relationId, List *requiredAttributes)
|
||||||
{
|
{
|
||||||
TargetEntry *entry = NULL;
|
Relation relation = relation_open(relationId, AccessShareLock);
|
||||||
int attrNo = 1;
|
int numberOfAttributes = RelationGetNumberOfAttributes(relation);
|
||||||
foreach_ptr(entry, targetList)
|
|
||||||
{
|
|
||||||
if (IsA(entry->expr, Var))
|
|
||||||
{
|
|
||||||
Var *var = (Var *) entry->expr;
|
|
||||||
|
|
||||||
/*
|
List *targetList = NIL;
|
||||||
* the inner subquery is an intermediate result hence
|
int varAttrNo = 1;
|
||||||
* the attribute no's should be in ordinal order. [1, 2, 3...]
|
for (int attrNum = 1; attrNum <= numberOfAttributes; attrNum++)
|
||||||
*/
|
{
|
||||||
var->varattno = attrNo++;
|
Form_pg_attribute attributeTuple =
|
||||||
|
TupleDescAttr(relation->rd_att, attrNum - 1);
|
||||||
|
|
||||||
|
int resNo = attrNum;
|
||||||
|
if (!list_member_int(requiredAttributes, attrNum))
|
||||||
|
{
|
||||||
|
TargetEntry *nullTargetEntry =
|
||||||
|
CreateTargetEntryForNullCol(attributeTuple, resNo);
|
||||||
|
targetList = lappend(targetList, nullTargetEntry);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
TargetEntry *targetEntry =
|
||||||
|
CreateTargetEntryForColumn(attributeTuple, SINGLE_RTE_INDEX, varAttrNo++,
|
||||||
|
resNo);
|
||||||
|
targetList = lappend(targetList, targetEntry);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
relation_close(relation, NoLock);
|
||||||
|
return targetList;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* CreateFilteredTargetListForRelation creates a target list which contains
|
||||||
|
* only the required columns of the given relation. If there are no required
|
||||||
|
* columns, an empty list is returned (the caller substitutes a dummy NULL column).
|
||||||
|
*/
|
||||||
|
static List *
|
||||||
|
CreateFilteredTargetListForRelation(Oid relationId, List *requiredAttributes)
|
||||||
|
{
|
||||||
|
Relation relation = relation_open(relationId, AccessShareLock);
|
||||||
|
int numberOfAttributes = RelationGetNumberOfAttributes(relation);
|
||||||
|
|
||||||
|
List *targetList = NIL;
|
||||||
|
int resultNo = 1;
|
||||||
|
for (int attrNum = 1; attrNum <= numberOfAttributes; attrNum++)
|
||||||
|
{
|
||||||
|
Form_pg_attribute attributeTuple =
|
||||||
|
TupleDescAttr(relation->rd_att, attrNum - 1);
|
||||||
|
|
||||||
|
if (list_member_int(requiredAttributes, attrNum))
|
||||||
|
{
|
||||||
|
/* In the subquery with only required attribute numbers, the result no
|
||||||
|
* corresponds to the ordinal index of it in targetList.
|
||||||
|
*/
|
||||||
|
TargetEntry *targetEntry =
|
||||||
|
CreateTargetEntryForColumn(attributeTuple, SINGLE_RTE_INDEX, attrNum,
|
||||||
|
resultNo++);
|
||||||
|
targetList = lappend(targetList, targetEntry);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
relation_close(relation, NoLock);
|
||||||
|
return targetList;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* CreateDummyTargetList creates a target list which contains only a
|
||||||
|
* NULL entry.
|
||||||
|
*/
|
||||||
|
static List *
|
||||||
|
CreateDummyTargetList(Oid relationId, List *requiredAttributes)
|
||||||
|
{
|
||||||
|
Relation relation = relation_open(relationId, AccessShareLock);
|
||||||
|
|
||||||
|
Form_pg_attribute attributeTuple =
|
||||||
|
TupleDescAttr(relation->rd_att, 0);
|
||||||
|
TargetEntry *nullTargetEntry =
|
||||||
|
CreateTargetEntryForNullCol(attributeTuple, 1);
|
||||||
|
|
||||||
|
relation_close(relation, NoLock);
|
||||||
|
return list_make1(nullTargetEntry);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* CreateTargetEntryForColumn creates a target entry for the given
|
||||||
|
* column.
|
||||||
|
*/
|
||||||
|
static TargetEntry *
|
||||||
|
CreateTargetEntryForColumn(Form_pg_attribute attributeTuple, Index rteIndex,
|
||||||
|
int attributeNumber, int resno)
|
||||||
|
{
|
||||||
|
Var *targetColumn =
|
||||||
|
makeVar(rteIndex, attributeNumber, attributeTuple->atttypid,
|
||||||
|
attributeTuple->atttypmod, attributeTuple->attcollation, 0);
|
||||||
|
TargetEntry *targetEntry =
|
||||||
|
makeTargetEntry((Expr *) targetColumn, resno,
|
||||||
|
strdup(attributeTuple->attname.data), false);
|
||||||
|
return targetEntry;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* CreateTargetEntryForNullCol creates a target entry that has a NULL expression.
|
||||||
|
*/
|
||||||
|
static TargetEntry *
|
||||||
|
CreateTargetEntryForNullCol(Form_pg_attribute attributeTuple, int resno)
|
||||||
|
{
|
||||||
|
Expr *nullExpr = (Expr *) makeNullConst(attributeTuple->atttypid,
|
||||||
|
attributeTuple->atttypmod,
|
||||||
|
attributeTuple->attcollation);
|
||||||
|
TargetEntry *targetEntry =
|
||||||
|
makeTargetEntry(nullExpr, resno,
|
||||||
|
strdup(attributeTuple->attname.data), false);
|
||||||
|
return targetEntry;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -1403,9 +1403,10 @@ ReplaceRTERelationWithRteSubquery(RangeTblEntry *rangeTableEntry,
|
||||||
List *requiredAttrNumbers,
|
List *requiredAttrNumbers,
|
||||||
RecursivePlanningContext *context)
|
RecursivePlanningContext *context)
|
||||||
{
|
{
|
||||||
List *outerQueryTargetList = NIL;
|
Query *subquery = WrapRteRelationIntoSubquery(rangeTableEntry, requiredAttrNumbers);
|
||||||
Query *subquery = WrapRteRelationIntoSubquery(rangeTableEntry, requiredAttrNumbers,
|
List *outerQueryTargetList = CreateAllTargetListForRelation(rangeTableEntry->relid,
|
||||||
&outerQueryTargetList);
|
requiredAttrNumbers);
|
||||||
|
|
||||||
List *restrictionList =
|
List *restrictionList =
|
||||||
GetRestrictInfoListForRelation(rangeTableEntry,
|
GetRestrictInfoListForRelation(rangeTableEntry,
|
||||||
context->plannerRestrictionContext);
|
context->plannerRestrictionContext);
|
||||||
|
|
|
@ -35,8 +35,7 @@ extern ColocatedJoinChecker CreateColocatedJoinChecker(Query *subquery,
|
||||||
restrictionContext);
|
restrictionContext);
|
||||||
extern bool SubqueryColocated(Query *subquery, ColocatedJoinChecker *context);
|
extern bool SubqueryColocated(Query *subquery, ColocatedJoinChecker *context);
|
||||||
extern Query * WrapRteRelationIntoSubquery(RangeTblEntry *rteRelation,
|
extern Query * WrapRteRelationIntoSubquery(RangeTblEntry *rteRelation,
|
||||||
List *requiredAttributes,
|
List *requiredAttributes);
|
||||||
List **allTargetList);
|
extern List * CreateAllTargetListForRelation(Oid relationId, List *requiredAttributes);
|
||||||
|
|
||||||
|
|
||||||
#endif /* QUERY_COLOCATION_CHECKER_H */
|
#endif /* QUERY_COLOCATION_CHECKER_H */
|
||||||
|
|
|
@ -665,16 +665,20 @@ select (SELECT local.id) FROM local, distributed WHERE distributed.id != 1 LIMIT
|
||||||
DEBUG: Wrapping relation "local" to a subquery
|
DEBUG: Wrapping relation "local" to a subquery
|
||||||
DEBUG: generating subplan XXX_1 for subquery SELECT id FROM local_dist_join_mixed.local WHERE true
|
DEBUG: generating subplan XXX_1 for subquery SELECT id FROM local_dist_join_mixed.local WHERE true
|
||||||
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT (SELECT local.id) AS id FROM (SELECT local_1.id, NULL::text AS title FROM (SELECT intermediate_result.id FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(id bigint)) local_1) local, local_dist_join_mixed.distributed WHERE (distributed.id OPERATOR(pg_catalog.<>) 1) LIMIT 1
|
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT (SELECT local.id) AS id FROM (SELECT local_1.id, NULL::text AS title FROM (SELECT intermediate_result.id FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(id bigint)) local_1) local, local_dist_join_mixed.distributed WHERE (distributed.id OPERATOR(pg_catalog.<>) 1) LIMIT 1
|
||||||
ERROR: could not run distributed query with subquery outside the FROM, WHERE and HAVING clauses
|
DEBUG: push down of limit count: 1
|
||||||
HINT: Consider using an equality filter on the distributed table's partition column.
|
id
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
0
|
||||||
|
(1 row)
|
||||||
|
|
||||||
-- currently not supported, but should work with https://github.com/citusdata/citus/pull/4360/files
|
-- currently not supported, but should work with https://github.com/citusdata/citus/pull/4360/files
|
||||||
SELECT
|
SELECT
|
||||||
name, (SELECT id FROM local WHERE id = e.id)
|
name, (SELECT id FROM local WHERE id = e.id)
|
||||||
FROM
|
FROM
|
||||||
distributed e
|
distributed e
|
||||||
ORDER BY 1,2 LIMIT 1;
|
ORDER BY 1,2 LIMIT 1;
|
||||||
ERROR: could not run distributed query with subquery outside the FROM, WHERE and HAVING clauses
|
ERROR: direct joins between distributed and local tables are not supported
|
||||||
HINT: Consider using an equality filter on the distributed table's partition column.
|
HINT: Use CTE's or subqueries to select from local tables and use them in joins
|
||||||
-- set operations
|
-- set operations
|
||||||
SELECT local.* FROM distributed JOIN local USING (id)
|
SELECT local.* FROM distributed JOIN local USING (id)
|
||||||
EXCEPT
|
EXCEPT
|
||||||
|
|
|
@ -224,30 +224,30 @@ SELECT count(*) FROM postgres_table JOIN (SELECT * FROM (SELECT * FROM distribut
|
||||||
SELECT count(*) FROM postgres_table JOIN (SELECT * FROM (SELECT * FROM distributed_table LIMIT 1) d1) d2 using (key) JOIN reference_table USING(key) JOIN citus_local USING (key) JOIN (SELECT * FROM citus_local) c1 USING (key) WHERE d2.key > 10 AND d2.key = 10;
|
SELECT count(*) FROM postgres_table JOIN (SELECT * FROM (SELECT * FROM distributed_table LIMIT 1) d1) d2 using (key) JOIN reference_table USING(key) JOIN citus_local USING (key) JOIN (SELECT * FROM citus_local) c1 USING (key) WHERE d2.key > 10 AND d2.key = 10;
|
||||||
|
|
||||||
|
|
||||||
SELECT
|
SELECT
|
||||||
COUNT(*)
|
COUNT(*)
|
||||||
FROM
|
FROM
|
||||||
postgres_table p1
|
postgres_table p1
|
||||||
JOIN
|
JOIN
|
||||||
distributed_partitioned_table dp1
|
distributed_partitioned_table dp1
|
||||||
USING (key)
|
USING (key)
|
||||||
JOIN
|
JOIN
|
||||||
distributed_table d1
|
distributed_table d1
|
||||||
USING (key)
|
USING (key)
|
||||||
JOIN
|
JOIN
|
||||||
citus_local c1
|
citus_local c1
|
||||||
USING (key)
|
USING (key)
|
||||||
JOIN
|
JOIN
|
||||||
postgres_table p2
|
postgres_table p2
|
||||||
USING (key)
|
USING (key)
|
||||||
JOIN
|
JOIN
|
||||||
reference_table r1
|
reference_table r1
|
||||||
USING (key)
|
USING (key)
|
||||||
JOIN
|
JOIN
|
||||||
distributed_table d2
|
distributed_table d2
|
||||||
USING (key)
|
USING (key)
|
||||||
JOIN
|
JOIN
|
||||||
citus_local c2
|
citus_local c2
|
||||||
USING (key);
|
USING (key);
|
||||||
|
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue