mirror of https://github.com/citusdata/citus.git
Update vars in quals while wrapping RTE to subquery
When we wrap an RTE to subquery we are updating the variables varno's as 1, however we should also update the varno's of vars in quals. Also some other small code quality improvements are done.pull/4358/head
parent
0689f2ac1a
commit
7e9204eba9
|
@ -60,7 +60,7 @@ typedef struct RangeTableEntryDetails
|
|||
Index rteIndex;
|
||||
List *restrictionList;
|
||||
List *requiredAttributeNumbers;
|
||||
bool hasUniqueIndex;
|
||||
bool hasConstantFilterOnUniqueColumn;
|
||||
} RangeTableEntryDetails;
|
||||
|
||||
typedef struct ConversionCandidates
|
||||
|
@ -69,13 +69,13 @@ typedef struct ConversionCandidates
|
|||
List *localTableList; /* local or citus local table */
|
||||
}ConversionCandidates;
|
||||
|
||||
static Oid GetResultRelationId(Query *query);
|
||||
static bool ShouldConvertLocalTableJoinsToSubqueries(Query *query, List *rangeTableList,
|
||||
Oid resultRelationId,
|
||||
PlannerRestrictionContext *
|
||||
plannerRestrictionContext);
|
||||
static bool HasUniqueIndex(FromExpr *joinTree,
|
||||
RangeTblEntry *rangeTableEntry, Index rteIndex);
|
||||
static bool HasConstantFilterOnUniqueColumn(FromExpr *joinTree,
|
||||
RangeTblEntry *rangeTableEntry, Index
|
||||
rteIndex);
|
||||
static List * RequiredAttrNumbersForRelation(RangeTblEntry *relationRte,
|
||||
PlannerRestrictionContext *
|
||||
plannerRestrictionContext);
|
||||
|
@ -113,7 +113,12 @@ ConvertUnplannableTableJoinsToSubqueries(Query *query,
|
|||
GetRangeTableEntriesFromJoinTree((Node *) query->jointree, query->rtable,
|
||||
&rangeTableList);
|
||||
|
||||
Oid resultRelationId = GetResultRelationId(query);
|
||||
Oid resultRelationId = InvalidOid;
|
||||
if (IsModifyCommand(query))
|
||||
{
|
||||
resultRelationId = ModifyQueryResultRelationId(query);
|
||||
}
|
||||
|
||||
if (!ShouldConvertLocalTableJoinsToSubqueries(query, rangeTableList, resultRelationId,
|
||||
plannerRestrictionContext))
|
||||
{
|
||||
|
@ -125,7 +130,7 @@ ConvertUnplannableTableJoinsToSubqueries(Query *query,
|
|||
|
||||
RangeTableEntryDetails *rangeTableEntryDetails =
|
||||
GetNextRTEToConvertToSubquery(query->jointree, conversionCandidates,
|
||||
context->plannerRestrictionContext,
|
||||
plannerRestrictionContext,
|
||||
resultRelationId);
|
||||
|
||||
while (ShouldConvertLocalTableJoinsToSubqueries(query, rangeTableList,
|
||||
|
@ -135,38 +140,20 @@ ConvertUnplannableTableJoinsToSubqueries(Query *query,
|
|||
ReplaceRTERelationWithRteSubquery(
|
||||
rangeTableEntryDetails->rangeTableEntry,
|
||||
rangeTableEntryDetails->restrictionList,
|
||||
rangeTableEntryDetails->
|
||||
requiredAttributeNumbers,
|
||||
rangeTableEntryDetails->requiredAttributeNumbers,
|
||||
context);
|
||||
RemoveFromConversionCandidates(conversionCandidates,
|
||||
rangeTableEntryDetails->
|
||||
rangeTableEntry->relid);
|
||||
RemoveFromConversionCandidates(
|
||||
conversionCandidates,
|
||||
rangeTableEntryDetails->rangeTableEntry->relid);
|
||||
|
||||
rangeTableEntryDetails =
|
||||
GetNextRTEToConvertToSubquery(query->jointree, conversionCandidates,
|
||||
context->plannerRestrictionContext,
|
||||
plannerRestrictionContext,
|
||||
resultRelationId);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* GetResultRelationId gets the result relation id from query
|
||||
* if it exists.
|
||||
*/
|
||||
static Oid
|
||||
GetResultRelationId(Query *query)
|
||||
{
|
||||
RangeTblEntry *resultRelation = ExtractResultRelationRTE(query);
|
||||
Oid resultRelationId = InvalidOid;
|
||||
if (resultRelation)
|
||||
{
|
||||
resultRelationId = resultRelation->relid;
|
||||
}
|
||||
return resultRelationId;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* GetRangeTableEntriesFromJoinTree gets the range table entries that are
|
||||
* on the given join tree.
|
||||
|
@ -269,7 +256,7 @@ AllRangeTableEntriesHaveUniqueIndex(List *rangeTableEntryDetailsList)
|
|||
RangeTableEntryDetails *rangeTableEntryDetails = NULL;
|
||||
foreach_ptr(rangeTableEntryDetails, rangeTableEntryDetailsList)
|
||||
{
|
||||
if (!rangeTableEntryDetails->hasUniqueIndex)
|
||||
if (!rangeTableEntryDetails->hasConstantFilterOnUniqueColumn)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
@ -329,11 +316,12 @@ ShouldConvertLocalTableJoinsToSubqueries(Query *query, List *rangeTableList,
|
|||
|
||||
|
||||
/*
|
||||
* HasUniqueIndex returns true if the given rangeTableEntry has a constant
|
||||
* HasConstantFilterOnUniqueColumn returns true if the given rangeTableEntry has a constant
|
||||
* filter on a unique column.
|
||||
*/
|
||||
static bool
|
||||
HasUniqueIndex(FromExpr *joinTree, RangeTblEntry *rangeTableEntry, Index rteIndex)
|
||||
HasConstantFilterOnUniqueColumn(FromExpr *joinTree, RangeTblEntry *rangeTableEntry, Index
|
||||
rteIndex)
|
||||
{
|
||||
if (rangeTableEntry == NULL)
|
||||
{
|
||||
|
@ -348,8 +336,8 @@ HasUniqueIndex(FromExpr *joinTree, RangeTblEntry *rangeTableEntry, Index rteInde
|
|||
if (IsA(join, JoinExpr))
|
||||
{
|
||||
JoinExpr *joinExpr = (JoinExpr *) join;
|
||||
List *joinExprEqualityQuals = FetchEqualityAttrNumsForRTEFromQuals(
|
||||
joinExpr->quals, rteIndex);
|
||||
List *joinExprEqualityQuals =
|
||||
FetchEqualityAttrNumsForRTEFromQuals(joinExpr->quals, rteIndex);
|
||||
rteEqualityQuals = list_concat(rteEqualityQuals, joinExprEqualityQuals);
|
||||
}
|
||||
}
|
||||
|
@ -483,7 +471,8 @@ CreateConversionCandidates(FromExpr *joinTree,
|
|||
rangeTableEntry, plannerRestrictionContext, 1);
|
||||
rangeTableEntryDetails->requiredAttributeNumbers = RequiredAttrNumbersForRelation(
|
||||
rangeTableEntry, plannerRestrictionContext);
|
||||
rangeTableEntryDetails->hasUniqueIndex = HasUniqueIndex(joinTree,
|
||||
rangeTableEntryDetails->hasConstantFilterOnUniqueColumn =
|
||||
HasConstantFilterOnUniqueColumn(joinTree,
|
||||
rangeTableEntry,
|
||||
rteIndex);
|
||||
|
||||
|
|
|
@ -185,6 +185,7 @@ static Query * BuildReadIntermediateResultsQuery(List *targetEntryList,
|
|||
List *columnAliasList,
|
||||
Const *resultIdConst, Oid functionOid,
|
||||
bool useBinaryCopyFormat);
|
||||
static void UpdateVarNosInQualForSubquery(Node *node);
|
||||
|
||||
/*
|
||||
* GenerateSubplansForSubqueriesAndCTEs is a wrapper around RecursivelyPlanSubqueriesAndCTEs.
|
||||
|
@ -1360,6 +1361,7 @@ ReplaceRTERelationWithRteSubquery(RangeTblEntry *rangeTableEntry, List *restrict
|
|||
Query *subquery = WrapRteRelationIntoSubquery(rangeTableEntry, requiredAttrNumbers);
|
||||
Expr *andedBoundExpressions = make_ands_explicit(restrictionList);
|
||||
subquery->jointree->quals = (Node *) andedBoundExpressions;
|
||||
UpdateVarNosInQualForSubquery(subquery->jointree->quals);
|
||||
|
||||
/* force recursively planning of the newly created subquery */
|
||||
subquery->limitOffset = (Node *) MakeIntegerConst(0);
|
||||
|
@ -1379,7 +1381,8 @@ ReplaceRTERelationWithRteSubquery(RangeTblEntry *rangeTableEntry, List *restrict
|
|||
{
|
||||
StringInfo subqueryString = makeStringInfo();
|
||||
|
||||
pg_get_query_def(subquery, subqueryString);
|
||||
pg_get_query_def(subquery,
|
||||
subqueryString);
|
||||
|
||||
ereport(DEBUG1, (errmsg("Wrapping local relation \"%s\" to a subquery: %s ",
|
||||
get_rel_name(rangeTableEntry->relid),
|
||||
|
@ -1389,6 +1392,48 @@ ReplaceRTERelationWithRteSubquery(RangeTblEntry *rangeTableEntry, List *restrict
|
|||
}
|
||||
|
||||
|
||||
/*
|
||||
* UpdateVarNosInQualForSubquery iterates the Vars in the
|
||||
* given quals node and updates the varno's as 1 as there
|
||||
* will be only one RTE in rtable, which is the subquery.
|
||||
*/
|
||||
static void
|
||||
UpdateVarNosInQualForSubquery(Node *node)
|
||||
{
|
||||
if (node == NULL)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
if (IsA(node, Var))
|
||||
{
|
||||
Var *var = (Var *) node;
|
||||
|
||||
/* we update the varno as 1 as there is only one subquery */
|
||||
var->varno = 1;
|
||||
}
|
||||
else if (IsA(node, OpExpr))
|
||||
{
|
||||
OpExpr *opExpr = (OpExpr *) node;
|
||||
Var *leftColumn = LeftColumnOrNULL(opExpr);
|
||||
Var *rightColumn = RightColumnOrNULL(opExpr);
|
||||
UpdateVarNosInQualForSubquery((Node *) leftColumn);
|
||||
UpdateVarNosInQualForSubquery((Node *) rightColumn);
|
||||
}
|
||||
else if (IsA(node, BoolExpr))
|
||||
{
|
||||
BoolExpr *boolExpr = (BoolExpr *) node;
|
||||
List *argumentList = boolExpr->args;
|
||||
|
||||
Node *arg = NULL;
|
||||
foreach_ptr(arg, argumentList)
|
||||
{
|
||||
UpdateVarNosInQualForSubquery(arg);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* ContainsTableToBeConvertedToSubquery checks if the given range table list contains
|
||||
* any table that should be converted to a subquery, which otherwise is not plannable.
|
||||
|
|
|
@ -334,6 +334,33 @@ DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS c
|
|||
0
|
||||
(1 row)
|
||||
|
||||
SELECT count(*) FROM (SELECT *, random() FROM distributed_table) as d1 JOIN postgres_table ON (postgres_table.key = d1.key AND d1.key < postgres_table.key) WHERE d1.key = 1 AND false;
|
||||
DEBUG: Wrapping local relation "postgres_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE ((key OPERATOR(pg_catalog.<) key) AND false) OFFSET 0
|
||||
DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE ((key OPERATOR(pg_catalog.<) key) AND false) OFFSET 0
|
||||
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM ((SELECT distributed_table.key, distributed_table.value, distributed_table.value_2, random() AS random FROM local_table_join.distributed_table) d1 JOIN (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) postgres_table ON (((postgres_table.key OPERATOR(pg_catalog.=) d1.key) AND (d1.key OPERATOR(pg_catalog.<) postgres_table.key)))) WHERE ((d1.key OPERATOR(pg_catalog.=) 1) AND false)
|
||||
count
|
||||
---------------------------------------------------------------------
|
||||
0
|
||||
(1 row)
|
||||
|
||||
SELECT count(*) FROM (SELECT *, random() FROM distributed_table_pkey) as d1 JOIN postgres_table ON (postgres_table.key = d1.key AND d1.key < postgres_table.key) WHERE d1.key = 1 AND false;
|
||||
DEBUG: Wrapping local relation "postgres_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE ((key OPERATOR(pg_catalog.<) key) AND false) OFFSET 0
|
||||
DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE ((key OPERATOR(pg_catalog.<) key) AND false) OFFSET 0
|
||||
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM ((SELECT distributed_table_pkey.key, distributed_table_pkey.value, distributed_table_pkey.value_2, random() AS random FROM local_table_join.distributed_table_pkey) d1 JOIN (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) postgres_table ON (((postgres_table.key OPERATOR(pg_catalog.=) d1.key) AND (d1.key OPERATOR(pg_catalog.<) postgres_table.key)))) WHERE ((d1.key OPERATOR(pg_catalog.=) 1) AND false)
|
||||
count
|
||||
---------------------------------------------------------------------
|
||||
0
|
||||
(1 row)
|
||||
|
||||
SELECT count(*) FROM (SELECT *, random() FROM distributed_partitioned_table) as d1 JOIN postgres_table ON (postgres_table.key = d1.key AND d1.key < postgres_table.key) WHERE d1.key = 1 AND false;
|
||||
DEBUG: Wrapping local relation "postgres_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE ((key OPERATOR(pg_catalog.<) key) AND false) OFFSET 0
|
||||
DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE ((key OPERATOR(pg_catalog.<) key) AND false) OFFSET 0
|
||||
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM ((SELECT distributed_partitioned_table.key, distributed_partitioned_table.value, random() AS random FROM local_table_join.distributed_partitioned_table) d1 JOIN (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) postgres_table ON (((postgres_table.key OPERATOR(pg_catalog.=) d1.key) AND (d1.key OPERATOR(pg_catalog.<) postgres_table.key)))) WHERE ((d1.key OPERATOR(pg_catalog.=) 1) AND false)
|
||||
count
|
||||
---------------------------------------------------------------------
|
||||
0
|
||||
(1 row)
|
||||
|
||||
-- TODO:: We should probably recursively plan postgres table here because primary key is on key,value not key.
|
||||
SELECT count(*) FROM distributed_table_composite JOIN postgres_table USING(key) WHERE distributed_table_composite.key = 10;
|
||||
DEBUG: Wrapping local relation "distributed_table_composite" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_composite WHERE (key OPERATOR(pg_catalog.=) 10) OFFSET 0
|
||||
|
|
|
@ -91,6 +91,11 @@ SELECT COUNT(*) FROM postgres_table join local_partitioned_table using(key) join
|
|||
SELECT COUNT(*) FROM postgres_table join distributed_table using(key) join local_partitioned_table using(key) join distributed_table_pkey using(key) where distributed_table_pkey.key = 5;
|
||||
SELECT COUNT(*) FROM distributed_table_pkey join distributed_table using(key) join postgres_table using(key) join local_partitioned_table using(key) where distributed_table_pkey.key = 5;
|
||||
|
||||
SELECT count(*) FROM (SELECT *, random() FROM distributed_table) as d1 JOIN postgres_table ON (postgres_table.key = d1.key AND d1.key < postgres_table.key) WHERE d1.key = 1 AND false;
|
||||
SELECT count(*) FROM (SELECT *, random() FROM distributed_table_pkey) as d1 JOIN postgres_table ON (postgres_table.key = d1.key AND d1.key < postgres_table.key) WHERE d1.key = 1 AND false;
|
||||
SELECT count(*) FROM (SELECT *, random() FROM distributed_partitioned_table) as d1 JOIN postgres_table ON (postgres_table.key = d1.key AND d1.key < postgres_table.key) WHERE d1.key = 1 AND false;
|
||||
|
||||
|
||||
-- TODO:: We should probably recursively plan postgres table here because primary key is on key,value not key.
|
||||
SELECT count(*) FROM distributed_table_composite JOIN postgres_table USING(key) WHERE distributed_table_composite.key = 10;
|
||||
|
||||
|
|
Loading…
Reference in New Issue