diff --git a/src/backend/distributed/planner/local_distributed_join_planner.c b/src/backend/distributed/planner/local_distributed_join_planner.c index d9535d1c6..c5206d0a8 100644 --- a/src/backend/distributed/planner/local_distributed_join_planner.c +++ b/src/backend/distributed/planner/local_distributed_join_planner.c @@ -66,14 +66,12 @@ typedef struct ConversionCandidates { List *distributedTableList; /* reference or distributed table */ List *localTableList; /* local or citus local table */ - bool hasSubqueryRTE; }ConversionCandidates; static Oid GetResultRelationId(Query *query); -static Oid GetRTEToSubqueryConverterReferenceRelId( - RangeTableEntryDetails *rangeTableEntryDetails); -static bool ShouldConvertLocalTableJoinsToSubqueries(List *rangeTableList, Oid - resultRelationId); +static bool ShouldConvertLocalTableJoinsToSubqueries(Query *query, List *rangeTableList, + Oid resultRelationId, + RecursivePlanningContext *context); static bool HasUniqueFilter(RangeTblEntry *distRTE, List *distRTERestrictionList, List *requiredAttrNumbersForDistRTE); static bool ShouldConvertDistributedTable(FromExpr *joinTree, @@ -82,8 +80,8 @@ static List * RequiredAttrNumbersForRelation(RangeTblEntry *relationRte, RecursivePlanningContext *planningContext); static ConversionCandidates * CreateConversionCandidates( RecursivePlanningContext *context, - List * - rangeTableList); + List *rangeTableList, + Oid resultRelationId); static void GetAllUniqueIndexes(Form_pg_index indexForm, List **uniqueIndexes); static RangeTableEntryDetails * GetNextRTEToConvertToSubquery(FromExpr *joinTree, ConversionCandidates @@ -98,12 +96,6 @@ static void GetRangeTableEntriesFromJoinTree(Node *joinNode, List *rangeTableLis List **joinRangeTableEntries); static void RemoveFromConversionCandidates(ConversionCandidates *conversionCandidates, Oid relationId); -static bool FillLocalAndDistributedRTECandidates( - ConversionCandidates *conversionCandidates, - RangeTableEntryDetails ** - localRTECandidate, - RangeTableEntryDetails ** - distributedRTECandidate); /* * ConvertUnplannableTableJoinsToSubqueries gets a query and the planner @@ -117,25 +109,18 @@ ConvertUnplannableTableJoinsToSubqueries(Query *query, List *rangeTableList = NIL; GetRangeTableEntriesFromJoinTree((Node *) query->jointree, query->rtable, &rangeTableList); - Oid resultRelationId = GetResultRelationId(query); - if (!ShouldConvertLocalTableJoinsToSubqueries(rangeTableList, resultRelationId)) - { - return; - } + Oid resultRelationId = GetResultRelationId(query); ConversionCandidates *conversionCandidates = - CreateConversionCandidates( - context, rangeTableList); + CreateConversionCandidates(context, rangeTableList, resultRelationId); RangeTableEntryDetails *rangeTableEntryDetails = GetNextRTEToConvertToSubquery(query->jointree, conversionCandidates, context->plannerRestrictionContext, resultRelationId); - PlannerRestrictionContext *plannerRestrictionContext = - FilterPlannerRestrictionForQuery(context->plannerRestrictionContext, query); - while (rangeTableEntryDetails && !IsRouterPlannable(query, - plannerRestrictionContext)) + while (ShouldConvertLocalTableJoinsToSubqueries(query, rangeTableList, + resultRelationId, context)) { ReplaceRTERelationWithRteSubquery( rangeTableEntryDetails->rangeTableEntry, @@ -146,6 +131,7 @@ ConvertUnplannableTableJoinsToSubqueries(Query *query, RemoveFromConversionCandidates(conversionCandidates, rangeTableEntryDetails-> rangeTableEntry->relid); + rangeTableEntryDetails = GetNextRTEToConvertToSubquery(query->jointree, conversionCandidates, context->plannerRestrictionContext, @@ -228,30 +214,27 @@ GetNextRTEToConvertToSubquery(FromExpr *joinTree, { RangeTableEntryDetails *localRTECandidate = NULL; RangeTableEntryDetails *distributedRTECandidate = NULL; - if (!FillLocalAndDistributedRTECandidates(conversionCandidates, - &localRTECandidate, - &distributedRTECandidate)) + + if (list_length(conversionCandidates->localTableList) > 0) { - return NULL; + localRTECandidate = linitial(conversionCandidates->localTableList); } - if (OidIsValid(resultRelationId)) + if (list_length(conversionCandidates->distributedTableList) > 0) { - if (resultRelationId == GetRTEToSubqueryConverterReferenceRelId( - localRTECandidate)) - { - return distributedRTECandidate; - } - if (resultRelationId == GetRTEToSubqueryConverterReferenceRelId( - distributedRTECandidate)) - { - return localRTECandidate; - } + distributedRTECandidate = linitial(conversionCandidates->distributedTableList); } if (LocalTableJoinPolicy == LOCAL_JOIN_POLICY_PREFER_LOCAL) { - return localRTECandidate; + if (localRTECandidate) + { + return localRTECandidate; + } + else + { + return distributedRTECandidate; + } } else if (LocalTableJoinPolicy == LOCAL_JOIN_POLICY_PREFER_DISTRIBUTED) { @@ -266,7 +249,8 @@ GetNextRTEToConvertToSubquery(FromExpr *joinTree, } else { - if (ShouldConvertDistributedTable(joinTree, distributedRTECandidate)) + if (ShouldConvertDistributedTable(joinTree, distributedRTECandidate) || + localRTECandidate == NULL) { return distributedRTECandidate; } @@ -278,51 +262,6 @@ GetNextRTEToConvertToSubquery(FromExpr *joinTree, } -/* - * FillLocalAndDistributedRTECandidates fills the local and distributed RTE candidates. - * It returns true if we should continue converting tables to subqueries. - */ -static bool -FillLocalAndDistributedRTECandidates(ConversionCandidates *conversionCandidates, - RangeTableEntryDetails **localRTECandidate, - RangeTableEntryDetails ** - distributedRTECandidate) -{ - if (list_length(conversionCandidates->localTableList) > 0) - { - *localRTECandidate = linitial(conversionCandidates->localTableList); - } - if (*localRTECandidate == NULL) - { - return false; - } - - if (list_length(conversionCandidates->distributedTableList) > 0) - { - *distributedRTECandidate = linitial( - conversionCandidates->distributedTableList); - } - return *distributedRTECandidate != NULL || - conversionCandidates->hasSubqueryRTE; -} - - -/* - * GetRTEToSubqueryConverterReferenceRelId returns the underlying relation id - * if it is a valid one. - */ -static Oid -GetRTEToSubqueryConverterReferenceRelId(RangeTableEntryDetails *rangeTableEntryDetails) -{ - if (rangeTableEntryDetails && - rangeTableEntryDetails->rangeTableEntry) - { - return rangeTableEntryDetails->rangeTableEntry->relid; - } - return InvalidOid; -} - - /* * RemoveFromConversionCandidates removes an element from * the relevant list based on the relation id. @@ -348,7 +287,9 @@ RemoveFromConversionCandidates(ConversionCandidates *conversionCandidates, Oid r * convert local-dist table joins to subqueries. */ static bool -ShouldConvertLocalTableJoinsToSubqueries(List *rangeTableList, Oid resultRelationId) +ShouldConvertLocalTableJoinsToSubqueries(Query *query, List *rangeTableList, + Oid resultRelationId, + RecursivePlanningContext *context) { if (LocalTableJoinPolicy == LOCAL_JOIN_POLICY_NEVER) { @@ -359,6 +300,13 @@ ShouldConvertLocalTableJoinsToSubqueries(List *rangeTableList, Oid resultRelatio { return false; } + + PlannerRestrictionContext *plannerRestrictionContext = + FilterPlannerRestrictionForQuery(context->plannerRestrictionContext, query); + if (IsRouterPlannable(query, plannerRestrictionContext)) + { + return false; + } return true; } @@ -507,7 +455,7 @@ RequiredAttrNumbersForRelation(RangeTblEntry *relationRte, */ static ConversionCandidates * CreateConversionCandidates(RecursivePlanningContext *context, - List *rangeTableList) + List *rangeTableList, Oid resultRelationId) { ConversionCandidates *conversionCandidates = palloc0( sizeof(ConversionCandidates)); @@ -517,10 +465,6 @@ CreateConversionCandidates(RecursivePlanningContext *context, foreach_ptr(rangeTableEntry, rangeTableList) { rteIndex++; - if (rangeTableEntry->rtekind == RTE_SUBQUERY) - { - conversionCandidates->hasSubqueryRTE = true; - } /* we're only interested in tables */ if (!IsRecursivelyPlannableRelation(rangeTableEntry)) @@ -528,6 +472,17 @@ CreateConversionCandidates(RecursivePlanningContext *context, continue; } + bool referenceOrDistributedTable = IsCitusTableType(rangeTableEntry->relid, + REFERENCE_TABLE) || + IsCitusTableType(rangeTableEntry->relid, + DISTRIBUTED_TABLE); + + /* result relation cannot converted to a subquery */ + if (resultRelationId == rangeTableEntry->relid) + { + continue; + } + RangeTableEntryDetails *rangeTableEntryDetails = palloc0( sizeof(RangeTableEntryDetails)); rangeTableEntryDetails->rangeTableEntry = rangeTableEntry; @@ -539,10 +494,6 @@ CreateConversionCandidates(RecursivePlanningContext *context, rangeTableEntryDetails->requiredAttributeNumbers = RequiredAttrNumbersForRelation( rangeTableEntry, context); - bool referenceOrDistributedTable = IsCitusTableType(rangeTableEntry->relid, - REFERENCE_TABLE) || - IsCitusTableType(rangeTableEntry->relid, - DISTRIBUTED_TABLE); if (referenceOrDistributedTable) { conversionCandidates->distributedTableList = diff --git a/src/test/regress/expected/local_table_join.out b/src/test/regress/expected/local_table_join.out index 9d2ebb12a..790f79c77 100644 --- a/src/test/regress/expected/local_table_join.out +++ b/src/test/regress/expected/local_table_join.out @@ -847,9 +847,7 @@ DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS c SELECT count(*) FROM citus_local JOIN distributed_table USING(key) JOIN postgres_table USING (key) JOIN reference_table USING(key); DEBUG: Wrapping local relation "distributed_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table WHERE true OFFSET 0 DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table WHERE true OFFSET 0 -DEBUG: Wrapping local relation "reference_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.reference_table WHERE true OFFSET 0 -DEBUG: generating subplan XXX_2 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.reference_table WHERE true OFFSET 0 -DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (((local_table_join.citus_local JOIN (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) distributed_table USING (key)) JOIN local_table_join.postgres_table USING (key)) JOIN (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) reference_table USING (key)) +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (((local_table_join.citus_local JOIN (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) distributed_table USING (key)) JOIN local_table_join.postgres_table USING (key)) JOIN local_table_join.reference_table USING (key)) count --------------------------------------------------------------------- 0 @@ -859,9 +857,7 @@ SELECT count(*) FROM distributed_partitioned_table JOIN postgres_table USING(key JOIN citus_local USING(key) WHERE distributed_partitioned_table.key > 10 and distributed_partitioned_table.key = 10; DEBUG: Wrapping local relation "distributed_partitioned_table" to a subquery: SELECT key, NULL::text AS value FROM local_table_join.distributed_partitioned_table WHERE ((key OPERATOR(pg_catalog.>) 10) AND (key OPERATOR(pg_catalog.=) 10)) OFFSET 0 DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value FROM local_table_join.distributed_partitioned_table WHERE ((key OPERATOR(pg_catalog.>) 10) AND (key OPERATOR(pg_catalog.=) 10)) OFFSET 0 -DEBUG: Wrapping local relation "reference_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.reference_table WHERE (key OPERATOR(pg_catalog.=) 10) OFFSET 0 -DEBUG: generating subplan XXX_2 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.reference_table WHERE (key OPERATOR(pg_catalog.=) 10) OFFSET 0 -DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM ((((SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) distributed_partitioned_table JOIN local_table_join.postgres_table USING (key)) JOIN (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) reference_table USING (key)) JOIN local_table_join.citus_local USING (key)) WHERE ((distributed_partitioned_table.key OPERATOR(pg_catalog.>) 10) AND (distributed_partitioned_table.key OPERATOR(pg_catalog.=) 10)) +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM ((((SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) distributed_partitioned_table JOIN local_table_join.postgres_table USING (key)) JOIN local_table_join.reference_table USING (key)) JOIN local_table_join.citus_local USING (key)) WHERE ((distributed_partitioned_table.key OPERATOR(pg_catalog.>) 10) AND (distributed_partitioned_table.key OPERATOR(pg_catalog.=) 10)) count --------------------------------------------------------------------- 0