Use ShouldConvertLocalTableJoinsToSubqueries

Remove FillLocalAndDistributedRTECandidates and use
ShouldConvertLocalTableJoinsToSubqueries, which simplifies things as we
rely on a single function to decide whether we should continue
converting RTE to subquery.
pull/4358/head
Sait Talha Nisanci 2020-12-01 18:09:50 +03:00
parent eebcd995b3
commit 3fe3c55023
2 changed files with 49 additions and 102 deletions

View File

@ -66,14 +66,12 @@ typedef struct ConversionCandidates
{ {
List *distributedTableList; /* reference or distributed table */ List *distributedTableList; /* reference or distributed table */
List *localTableList; /* local or citus local table */ List *localTableList; /* local or citus local table */
bool hasSubqueryRTE;
}ConversionCandidates; }ConversionCandidates;
static Oid GetResultRelationId(Query *query); static Oid GetResultRelationId(Query *query);
static Oid GetRTEToSubqueryConverterReferenceRelId( static bool ShouldConvertLocalTableJoinsToSubqueries(Query *query, List *rangeTableList,
RangeTableEntryDetails *rangeTableEntryDetails); Oid resultRelationId,
static bool ShouldConvertLocalTableJoinsToSubqueries(List *rangeTableList, Oid RecursivePlanningContext *context);
resultRelationId);
static bool HasUniqueFilter(RangeTblEntry *distRTE, List *distRTERestrictionList, static bool HasUniqueFilter(RangeTblEntry *distRTE, List *distRTERestrictionList,
List *requiredAttrNumbersForDistRTE); List *requiredAttrNumbersForDistRTE);
static bool ShouldConvertDistributedTable(FromExpr *joinTree, static bool ShouldConvertDistributedTable(FromExpr *joinTree,
@ -82,8 +80,8 @@ static List * RequiredAttrNumbersForRelation(RangeTblEntry *relationRte,
RecursivePlanningContext *planningContext); RecursivePlanningContext *planningContext);
static ConversionCandidates * CreateConversionCandidates( static ConversionCandidates * CreateConversionCandidates(
RecursivePlanningContext *context, RecursivePlanningContext *context,
List * List *rangeTableList,
rangeTableList); Oid resultRelationId);
static void GetAllUniqueIndexes(Form_pg_index indexForm, List **uniqueIndexes); static void GetAllUniqueIndexes(Form_pg_index indexForm, List **uniqueIndexes);
static RangeTableEntryDetails * GetNextRTEToConvertToSubquery(FromExpr *joinTree, static RangeTableEntryDetails * GetNextRTEToConvertToSubquery(FromExpr *joinTree,
ConversionCandidates ConversionCandidates
@ -98,12 +96,6 @@ static void GetRangeTableEntriesFromJoinTree(Node *joinNode, List *rangeTableLis
List **joinRangeTableEntries); List **joinRangeTableEntries);
static void RemoveFromConversionCandidates(ConversionCandidates *conversionCandidates, Oid static void RemoveFromConversionCandidates(ConversionCandidates *conversionCandidates, Oid
relationId); relationId);
static bool FillLocalAndDistributedRTECandidates(
ConversionCandidates *conversionCandidates,
RangeTableEntryDetails **
localRTECandidate,
RangeTableEntryDetails **
distributedRTECandidate);
/* /*
* ConvertUnplannableTableJoinsToSubqueries gets a query and the planner * ConvertUnplannableTableJoinsToSubqueries gets a query and the planner
@ -117,25 +109,18 @@ ConvertUnplannableTableJoinsToSubqueries(Query *query,
List *rangeTableList = NIL; List *rangeTableList = NIL;
GetRangeTableEntriesFromJoinTree((Node *) query->jointree, query->rtable, GetRangeTableEntriesFromJoinTree((Node *) query->jointree, query->rtable,
&rangeTableList); &rangeTableList);
Oid resultRelationId = GetResultRelationId(query);
if (!ShouldConvertLocalTableJoinsToSubqueries(rangeTableList, resultRelationId))
{
return;
}
Oid resultRelationId = GetResultRelationId(query);
ConversionCandidates *conversionCandidates = ConversionCandidates *conversionCandidates =
CreateConversionCandidates( CreateConversionCandidates(context, rangeTableList, resultRelationId);
context, rangeTableList);
RangeTableEntryDetails *rangeTableEntryDetails = RangeTableEntryDetails *rangeTableEntryDetails =
GetNextRTEToConvertToSubquery(query->jointree, conversionCandidates, GetNextRTEToConvertToSubquery(query->jointree, conversionCandidates,
context->plannerRestrictionContext, context->plannerRestrictionContext,
resultRelationId); resultRelationId);
PlannerRestrictionContext *plannerRestrictionContext = while (ShouldConvertLocalTableJoinsToSubqueries(query, rangeTableList,
FilterPlannerRestrictionForQuery(context->plannerRestrictionContext, query); resultRelationId, context))
while (rangeTableEntryDetails && !IsRouterPlannable(query,
plannerRestrictionContext))
{ {
ReplaceRTERelationWithRteSubquery( ReplaceRTERelationWithRteSubquery(
rangeTableEntryDetails->rangeTableEntry, rangeTableEntryDetails->rangeTableEntry,
@ -146,6 +131,7 @@ ConvertUnplannableTableJoinsToSubqueries(Query *query,
RemoveFromConversionCandidates(conversionCandidates, RemoveFromConversionCandidates(conversionCandidates,
rangeTableEntryDetails-> rangeTableEntryDetails->
rangeTableEntry->relid); rangeTableEntry->relid);
rangeTableEntryDetails = rangeTableEntryDetails =
GetNextRTEToConvertToSubquery(query->jointree, conversionCandidates, GetNextRTEToConvertToSubquery(query->jointree, conversionCandidates,
context->plannerRestrictionContext, context->plannerRestrictionContext,
@ -228,30 +214,27 @@ GetNextRTEToConvertToSubquery(FromExpr *joinTree,
{ {
RangeTableEntryDetails *localRTECandidate = NULL; RangeTableEntryDetails *localRTECandidate = NULL;
RangeTableEntryDetails *distributedRTECandidate = NULL; RangeTableEntryDetails *distributedRTECandidate = NULL;
if (!FillLocalAndDistributedRTECandidates(conversionCandidates,
&localRTECandidate, if (list_length(conversionCandidates->localTableList) > 0)
&distributedRTECandidate))
{ {
return NULL; localRTECandidate = linitial(conversionCandidates->localTableList);
} }
if (OidIsValid(resultRelationId)) if (list_length(conversionCandidates->distributedTableList) > 0)
{ {
if (resultRelationId == GetRTEToSubqueryConverterReferenceRelId( distributedRTECandidate = linitial(conversionCandidates->distributedTableList);
localRTECandidate))
{
return distributedRTECandidate;
}
if (resultRelationId == GetRTEToSubqueryConverterReferenceRelId(
distributedRTECandidate))
{
return localRTECandidate;
}
} }
if (LocalTableJoinPolicy == LOCAL_JOIN_POLICY_PREFER_LOCAL) if (LocalTableJoinPolicy == LOCAL_JOIN_POLICY_PREFER_LOCAL)
{ {
return localRTECandidate; if (localRTECandidate)
{
return localRTECandidate;
}
else
{
return distributedRTECandidate;
}
} }
else if (LocalTableJoinPolicy == LOCAL_JOIN_POLICY_PREFER_DISTRIBUTED) else if (LocalTableJoinPolicy == LOCAL_JOIN_POLICY_PREFER_DISTRIBUTED)
{ {
@ -266,7 +249,8 @@ GetNextRTEToConvertToSubquery(FromExpr *joinTree,
} }
else else
{ {
if (ShouldConvertDistributedTable(joinTree, distributedRTECandidate)) if (ShouldConvertDistributedTable(joinTree, distributedRTECandidate) ||
localRTECandidate == NULL)
{ {
return distributedRTECandidate; return distributedRTECandidate;
} }
@ -278,51 +262,6 @@ GetNextRTEToConvertToSubquery(FromExpr *joinTree,
} }
/*
* FillLocalAndDistributedRTECandidates fills the local and distributed RTE candidates.
* It returns true if we should continue converting tables to subqueries.
*/
static bool
FillLocalAndDistributedRTECandidates(ConversionCandidates *conversionCandidates,
RangeTableEntryDetails **localRTECandidate,
RangeTableEntryDetails **
distributedRTECandidate)
{
if (list_length(conversionCandidates->localTableList) > 0)
{
*localRTECandidate = linitial(conversionCandidates->localTableList);
}
if (*localRTECandidate == NULL)
{
return false;
}
if (list_length(conversionCandidates->distributedTableList) > 0)
{
*distributedRTECandidate = linitial(
conversionCandidates->distributedTableList);
}
return *distributedRTECandidate != NULL ||
conversionCandidates->hasSubqueryRTE;
}
/*
* GetRTEToSubqueryConverterReferenceRelId returns the underlying relation id
* if it is a valid one.
*/
static Oid
GetRTEToSubqueryConverterReferenceRelId(RangeTableEntryDetails *rangeTableEntryDetails)
{
if (rangeTableEntryDetails &&
rangeTableEntryDetails->rangeTableEntry)
{
return rangeTableEntryDetails->rangeTableEntry->relid;
}
return InvalidOid;
}
/* /*
* RemoveFromConversionCandidates removes an element from * RemoveFromConversionCandidates removes an element from
* the relevant list based on the relation id. * the relevant list based on the relation id.
@ -348,7 +287,9 @@ RemoveFromConversionCandidates(ConversionCandidates *conversionCandidates, Oid r
* convert local-dist table joins to subqueries. * convert local-dist table joins to subqueries.
*/ */
static bool static bool
ShouldConvertLocalTableJoinsToSubqueries(List *rangeTableList, Oid resultRelationId) ShouldConvertLocalTableJoinsToSubqueries(Query *query, List *rangeTableList,
Oid resultRelationId,
RecursivePlanningContext *context)
{ {
if (LocalTableJoinPolicy == LOCAL_JOIN_POLICY_NEVER) if (LocalTableJoinPolicy == LOCAL_JOIN_POLICY_NEVER)
{ {
@ -359,6 +300,13 @@ ShouldConvertLocalTableJoinsToSubqueries(List *rangeTableList, Oid resultRelatio
{ {
return false; return false;
} }
PlannerRestrictionContext *plannerRestrictionContext =
FilterPlannerRestrictionForQuery(context->plannerRestrictionContext, query);
if (IsRouterPlannable(query, plannerRestrictionContext))
{
return false;
}
return true; return true;
} }
@ -507,7 +455,7 @@ RequiredAttrNumbersForRelation(RangeTblEntry *relationRte,
*/ */
static ConversionCandidates * static ConversionCandidates *
CreateConversionCandidates(RecursivePlanningContext *context, CreateConversionCandidates(RecursivePlanningContext *context,
List *rangeTableList) List *rangeTableList, Oid resultRelationId)
{ {
ConversionCandidates *conversionCandidates = palloc0( ConversionCandidates *conversionCandidates = palloc0(
sizeof(ConversionCandidates)); sizeof(ConversionCandidates));
@ -517,10 +465,6 @@ CreateConversionCandidates(RecursivePlanningContext *context,
foreach_ptr(rangeTableEntry, rangeTableList) foreach_ptr(rangeTableEntry, rangeTableList)
{ {
rteIndex++; rteIndex++;
if (rangeTableEntry->rtekind == RTE_SUBQUERY)
{
conversionCandidates->hasSubqueryRTE = true;
}
/* we're only interested in tables */ /* we're only interested in tables */
if (!IsRecursivelyPlannableRelation(rangeTableEntry)) if (!IsRecursivelyPlannableRelation(rangeTableEntry))
@ -528,6 +472,17 @@ CreateConversionCandidates(RecursivePlanningContext *context,
continue; continue;
} }
bool referenceOrDistributedTable = IsCitusTableType(rangeTableEntry->relid,
REFERENCE_TABLE) ||
IsCitusTableType(rangeTableEntry->relid,
DISTRIBUTED_TABLE);
/* result relation cannot converted to a subquery */
if (resultRelationId == rangeTableEntry->relid)
{
continue;
}
RangeTableEntryDetails *rangeTableEntryDetails = palloc0( RangeTableEntryDetails *rangeTableEntryDetails = palloc0(
sizeof(RangeTableEntryDetails)); sizeof(RangeTableEntryDetails));
rangeTableEntryDetails->rangeTableEntry = rangeTableEntry; rangeTableEntryDetails->rangeTableEntry = rangeTableEntry;
@ -539,10 +494,6 @@ CreateConversionCandidates(RecursivePlanningContext *context,
rangeTableEntryDetails->requiredAttributeNumbers = RequiredAttrNumbersForRelation( rangeTableEntryDetails->requiredAttributeNumbers = RequiredAttrNumbersForRelation(
rangeTableEntry, context); rangeTableEntry, context);
bool referenceOrDistributedTable = IsCitusTableType(rangeTableEntry->relid,
REFERENCE_TABLE) ||
IsCitusTableType(rangeTableEntry->relid,
DISTRIBUTED_TABLE);
if (referenceOrDistributedTable) if (referenceOrDistributedTable)
{ {
conversionCandidates->distributedTableList = conversionCandidates->distributedTableList =

View File

@ -847,9 +847,7 @@ DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS c
SELECT count(*) FROM citus_local JOIN distributed_table USING(key) JOIN postgres_table USING (key) JOIN reference_table USING(key); SELECT count(*) FROM citus_local JOIN distributed_table USING(key) JOIN postgres_table USING (key) JOIN reference_table USING(key);
DEBUG: Wrapping local relation "distributed_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table WHERE true OFFSET 0 DEBUG: Wrapping local relation "distributed_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table WHERE true OFFSET 0
DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table WHERE true OFFSET 0 DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table WHERE true OFFSET 0
DEBUG: Wrapping local relation "reference_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.reference_table WHERE true OFFSET 0 DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (((local_table_join.citus_local JOIN (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) distributed_table USING (key)) JOIN local_table_join.postgres_table USING (key)) JOIN local_table_join.reference_table USING (key))
DEBUG: generating subplan XXX_2 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.reference_table WHERE true OFFSET 0
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (((local_table_join.citus_local JOIN (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) distributed_table USING (key)) JOIN local_table_join.postgres_table USING (key)) JOIN (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) reference_table USING (key))
count count
--------------------------------------------------------------------- ---------------------------------------------------------------------
0 0
@ -859,9 +857,7 @@ SELECT count(*) FROM distributed_partitioned_table JOIN postgres_table USING(key
JOIN citus_local USING(key) WHERE distributed_partitioned_table.key > 10 and distributed_partitioned_table.key = 10; JOIN citus_local USING(key) WHERE distributed_partitioned_table.key > 10 and distributed_partitioned_table.key = 10;
DEBUG: Wrapping local relation "distributed_partitioned_table" to a subquery: SELECT key, NULL::text AS value FROM local_table_join.distributed_partitioned_table WHERE ((key OPERATOR(pg_catalog.>) 10) AND (key OPERATOR(pg_catalog.=) 10)) OFFSET 0 DEBUG: Wrapping local relation "distributed_partitioned_table" to a subquery: SELECT key, NULL::text AS value FROM local_table_join.distributed_partitioned_table WHERE ((key OPERATOR(pg_catalog.>) 10) AND (key OPERATOR(pg_catalog.=) 10)) OFFSET 0
DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value FROM local_table_join.distributed_partitioned_table WHERE ((key OPERATOR(pg_catalog.>) 10) AND (key OPERATOR(pg_catalog.=) 10)) OFFSET 0 DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value FROM local_table_join.distributed_partitioned_table WHERE ((key OPERATOR(pg_catalog.>) 10) AND (key OPERATOR(pg_catalog.=) 10)) OFFSET 0
DEBUG: Wrapping local relation "reference_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.reference_table WHERE (key OPERATOR(pg_catalog.=) 10) OFFSET 0 DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM ((((SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) distributed_partitioned_table JOIN local_table_join.postgres_table USING (key)) JOIN local_table_join.reference_table USING (key)) JOIN local_table_join.citus_local USING (key)) WHERE ((distributed_partitioned_table.key OPERATOR(pg_catalog.>) 10) AND (distributed_partitioned_table.key OPERATOR(pg_catalog.=) 10))
DEBUG: generating subplan XXX_2 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.reference_table WHERE (key OPERATOR(pg_catalog.=) 10) OFFSET 0
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM ((((SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) distributed_partitioned_table JOIN local_table_join.postgres_table USING (key)) JOIN (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) reference_table USING (key)) JOIN local_table_join.citus_local USING (key)) WHERE ((distributed_partitioned_table.key OPERATOR(pg_catalog.>) 10) AND (distributed_partitioned_table.key OPERATOR(pg_catalog.=) 10))
count count
--------------------------------------------------------------------- ---------------------------------------------------------------------
0 0