diff --git a/src/backend/distributed/planner/local_distributed_join_planner.c b/src/backend/distributed/planner/local_distributed_join_planner.c index a9c4b64e6..e0b74b409 100644 --- a/src/backend/distributed/planner/local_distributed_join_planner.c +++ b/src/backend/distributed/planner/local_distributed_join_planner.c @@ -60,6 +60,7 @@ typedef struct RangeTableEntryDetails Index rteIndex; List *restrictionList; List *requiredAttributeNumbers; + bool hasUniqueIndex; } RangeTableEntryDetails; typedef struct ConversionCandidates @@ -71,26 +72,30 @@ typedef struct ConversionCandidates static Oid GetResultRelationId(Query *query); static bool ShouldConvertLocalTableJoinsToSubqueries(Query *query, List *rangeTableList, Oid resultRelationId, - PlannerRestrictionContext *plannerRestrictionContext); -static bool HasUniqueFilter(RangeTblEntry *distRTE, List *distRTERestrictionList, - List *requiredAttrNumbersForDistRTE); -static bool ShouldConvertDistributedTable(FromExpr *joinTree, - RangeTableEntryDetails *distRTEContext); + PlannerRestrictionContext * + plannerRestrictionContext); +static bool HasUniqueIndex(FromExpr *joinTree, + RangeTblEntry *rangeTableEntry, Index rteIndex); static List * RequiredAttrNumbersForRelation(RangeTblEntry *relationRte, - PlannerRestrictionContext *plannerRestrictionContext); -static ConversionCandidates * CreateConversionCandidates( - PlannerRestrictionContext *plannerRestrictionContext, - List *rangeTableList, - Oid resultRelationId); + PlannerRestrictionContext * + plannerRestrictionContext); +static ConversionCandidates * CreateConversionCandidates(FromExpr *joinTree, + PlannerRestrictionContext * + plannerRestrictionContext, + List *rangeTableList, + Oid resultRelationId); static void GetAllUniqueIndexes(Form_pg_index indexForm, List **uniqueIndexes); static RangeTableEntryDetails * GetNextRTEToConvertToSubquery(FromExpr *joinTree, - ConversionCandidates *conversionCandidates, - PlannerRestrictionContext* plannerRestrictionContext, + ConversionCandidates * + conversionCandidates, + PlannerRestrictionContext * + plannerRestrictionContext, Oid resultRelationId); static void GetRangeTableEntriesFromJoinTree(Node *joinNode, List *rangeTableList, List **joinRangeTableEntries); static void RemoveFromConversionCandidates(ConversionCandidates *conversionCandidates, Oid relationId); +static bool AllRangeTableEntriesHaveUniqueIndex(List *rangeTableEntryDetailsList); /* * ConvertUnplannableTableJoinsToSubqueries gets a query and the planner @@ -101,18 +106,22 @@ void ConvertUnplannableTableJoinsToSubqueries(Query *query, RecursivePlanningContext *context) { - PlannerRestrictionContext* plannerRestrictionContext = context->plannerRestrictionContext; + PlannerRestrictionContext *plannerRestrictionContext = + context->plannerRestrictionContext; List *rangeTableList = NIL; GetRangeTableEntriesFromJoinTree((Node *) query->jointree, query->rtable, &rangeTableList); Oid resultRelationId = GetResultRelationId(query); - if (!ShouldConvertLocalTableJoinsToSubqueries(query, rangeTableList, resultRelationId, plannerRestrictionContext)) { + if (!ShouldConvertLocalTableJoinsToSubqueries(query, rangeTableList, resultRelationId, + plannerRestrictionContext)) + { return; } ConversionCandidates *conversionCandidates = - CreateConversionCandidates(plannerRestrictionContext, rangeTableList, resultRelationId); + CreateConversionCandidates(query->jointree, plannerRestrictionContext, + rangeTableList, resultRelationId); RangeTableEntryDetails *rangeTableEntryDetails = GetNextRTEToConvertToSubquery(query->jointree, conversionCandidates, @@ -120,7 +129,8 @@ ConvertUnplannableTableJoinsToSubqueries(Query *query, resultRelationId); while (ShouldConvertLocalTableJoinsToSubqueries(query, rangeTableList, - resultRelationId, plannerRestrictionContext)) + resultRelationId, + plannerRestrictionContext)) { ReplaceRTERelationWithRteSubquery( rangeTableEntryDetails->rangeTableEntry, @@ -219,7 +229,6 @@ GetNextRTEToConvertToSubquery(FromExpr *joinTree, { localRTECandidate = linitial(conversionCandidates->localTableList); } - if (list_length(conversionCandidates->distributedTableList) > 0) { distributedRTECandidate = linitial(conversionCandidates->distributedTableList); @@ -227,41 +236,48 @@ GetNextRTEToConvertToSubquery(FromExpr *joinTree, if (LocalTableJoinPolicy == LOCAL_JOIN_POLICY_PREFER_LOCAL) { - if (localRTECandidate) - { - return localRTECandidate; - } - else - { - return distributedRTECandidate; - } + return localRTECandidate ? localRTECandidate : distributedRTECandidate; } else if (LocalTableJoinPolicy == LOCAL_JOIN_POLICY_PREFER_DISTRIBUTED) { - if (distributedRTECandidate) - { - return distributedRTECandidate; - } - else - { - return localRTECandidate; - } + return distributedRTECandidate ? distributedRTECandidate : localRTECandidate; } else { - if (ShouldConvertDistributedTable(joinTree, distributedRTECandidate) || - localRTECandidate == NULL) + bool allRangeTableEntriesHaveUniqueIndex = AllRangeTableEntriesHaveUniqueIndex( + conversionCandidates->distributedTableList); + + if (allRangeTableEntriesHaveUniqueIndex) { - return distributedRTECandidate; + return distributedRTECandidate ? distributedRTECandidate : localRTECandidate; } else { - return localRTECandidate; + return localRTECandidate ? localRTECandidate : distributedRTECandidate; } } } +/* + * AllRangeTableEntriesHaveUniqueIndex returns true if all of the RTE's in the given + * list have a unique index. + */ +static bool +AllRangeTableEntriesHaveUniqueIndex(List *rangeTableEntryDetailsList) +{ + RangeTableEntryDetails *rangeTableEntryDetails = NULL; + foreach_ptr(rangeTableEntryDetails, rangeTableEntryDetailsList) + { + if (!rangeTableEntryDetails->hasUniqueIndex) + { + return false; + } + } + return true; +} + + /* * RemoveFromConversionCandidates removes an element from * the relevant list based on the relation id. @@ -289,7 +305,8 @@ RemoveFromConversionCandidates(ConversionCandidates *conversionCandidates, Oid r static bool ShouldConvertLocalTableJoinsToSubqueries(Query *query, List *rangeTableList, Oid resultRelationId, - PlannerRestrictionContext *plannerRestrictionContext) + PlannerRestrictionContext * + plannerRestrictionContext) { if (LocalTableJoinPolicy == LOCAL_JOIN_POLICY_NEVER) { @@ -301,7 +318,8 @@ ShouldConvertLocalTableJoinsToSubqueries(Query *query, List *rangeTableList, return false; } - plannerRestrictionContext = FilterPlannerRestrictionForQuery(plannerRestrictionContext, query); + plannerRestrictionContext = FilterPlannerRestrictionForQuery( + plannerRestrictionContext, query); if (IsRouterPlannable(query, plannerRestrictionContext)) { return false; @@ -311,22 +329,18 @@ ShouldConvertLocalTableJoinsToSubqueries(Query *query, List *rangeTableList, /* - * ShouldConvertDistributedTable returns true if we should convert the - * distributed table rte to a subquery. This will be the case if the distributed - * table has a unique index on a column that appears in filter. + * HasUniqueIndex returns true if the given rangeTableEntry has a constant + * filter on a unique column. */ static bool -ShouldConvertDistributedTable(FromExpr *joinTree, - RangeTableEntryDetails * - rangeTableEntryDetails) +HasUniqueIndex(FromExpr *joinTree, RangeTblEntry *rangeTableEntry, Index rteIndex) { - if (rangeTableEntryDetails == NULL) + if (rangeTableEntry == NULL) { return false; } - List *distRTEEqualityQuals = - FetchEqualityAttrNumsForRTEFromQuals(joinTree->quals, - rangeTableEntryDetails->rteIndex); + List *rteEqualityQuals = + FetchEqualityAttrNumsForRTEFromQuals(joinTree->quals, rteIndex); Node *join = NULL; foreach_ptr(join, joinTree->fromlist) @@ -334,37 +348,18 @@ ShouldConvertDistributedTable(FromExpr *joinTree, if (IsA(join, JoinExpr)) { JoinExpr *joinExpr = (JoinExpr *) join; - distRTEEqualityQuals = list_concat(distRTEEqualityQuals, - FetchEqualityAttrNumsForRTEFromQuals( - joinExpr->quals, - rangeTableEntryDetails-> - rteIndex) - ); + List *joinExprEqualityQuals = FetchEqualityAttrNumsForRTEFromQuals( + joinExpr->quals, rteIndex); + rteEqualityQuals = list_concat(rteEqualityQuals, joinExprEqualityQuals); } } - bool hasUniqueFilter = HasUniqueFilter( - rangeTableEntryDetails->rangeTableEntry, - rangeTableEntryDetails-> - restrictionList, distRTEEqualityQuals); - return hasUniqueFilter; -} - - -/* - * HasUniqueFilter returns true if the given RTE has a unique filter - * on a column, which is a member of the given requiredAttrNumbersForDistRTE. - */ -static bool -HasUniqueFilter(RangeTblEntry *distRTE, List *distRTERestrictionList, - List *requiredAttrNumbersForDistRTE) -{ - List *uniqueIndexes = ExecuteFunctionOnEachTableIndex(distRTE->relid, + List *uniqueIndexes = ExecuteFunctionOnEachTableIndex(rangeTableEntry->relid, GetAllUniqueIndexes); int columnNumber = 0; foreach_int(columnNumber, uniqueIndexes) { - if (list_member_int(requiredAttrNumbersForDistRTE, columnNumber)) + if (list_member_int(rteEqualityQuals, columnNumber)) { return true; } @@ -450,7 +445,8 @@ RequiredAttrNumbersForRelation(RangeTblEntry *relationRte, * be converted to a subquery so that citus planners can work. */ static ConversionCandidates * -CreateConversionCandidates(PlannerRestrictionContext *plannerRestrictionContext, +CreateConversionCandidates(FromExpr *joinTree, + PlannerRestrictionContext *plannerRestrictionContext, List *rangeTableList, Oid resultRelationId) { ConversionCandidates *conversionCandidates = palloc0( @@ -487,6 +483,9 @@ CreateConversionCandidates(PlannerRestrictionContext *plannerRestrictionContext, rangeTableEntry, plannerRestrictionContext, 1); rangeTableEntryDetails->requiredAttributeNumbers = RequiredAttrNumbersForRelation( rangeTableEntry, plannerRestrictionContext); + rangeTableEntryDetails->hasUniqueIndex = HasUniqueIndex(joinTree, + rangeTableEntry, + rteIndex); if (referenceOrDistributedTable) { diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c index 406333f8e..dec40c41a 100644 --- a/src/backend/distributed/planner/multi_router_planner.c +++ b/src/backend/distributed/planner/multi_router_planner.c @@ -221,8 +221,8 @@ CreateModifyPlan(Query *originalQuery, Query *query, distributedPlan->modLevel = RowModifyLevelForQuery(query); distributedPlan->planningError = ModifyQuerySupported(query, originalQuery, - multiShardQuery, - plannerRestrictionContext); + multiShardQuery, + plannerRestrictionContext); if (distributedPlan->planningError != NULL) { diff --git a/src/backend/distributed/planner/recursive_planning.c b/src/backend/distributed/planner/recursive_planning.c index cac80f824..03c4f1e32 100644 --- a/src/backend/distributed/planner/recursive_planning.c +++ b/src/backend/distributed/planner/recursive_planning.c @@ -1506,7 +1506,7 @@ ContainsLocalTableDistributedTableJoin(List *rangeTableList) { containsDistributedTable = true; } - else if (IsLocalTableRteOrMatView((Node*) rangeTableEntry)) + else if (IsLocalTableRteOrMatView((Node *) rangeTableEntry)) { /* we consider citus local tables as local table */ containsLocalTable = true; @@ -1542,7 +1542,8 @@ ContainsLocalTableSubqueryJoin(List *rangeTableList, Oid resultRelationId) continue; } - if (IsLocalTableRteOrMatView((Node*) rangeTableEntry) && rangeTableEntry->relid != resultRelationId) + if (IsLocalTableRteOrMatView((Node *) rangeTableEntry) && + rangeTableEntry->relid != resultRelationId) { containsLocalTable = true; } @@ -1675,6 +1676,7 @@ TransformFunctionRTE(RangeTblEntry *rangeTblEntry) subquery->targetList = lappend(subquery->targetList, targetEntry); } } + /* * If tupleDesc is NULL we have 2 different cases: * @@ -1724,6 +1726,7 @@ TransformFunctionRTE(RangeTblEntry *rangeTblEntry) columnType = list_nth_oid(rangeTblFunction->funccoltypes, targetColumnIndex); } + /* use the types in the function definition otherwise */ else { diff --git a/src/test/regress/expected/local_table_join.out b/src/test/regress/expected/local_table_join.out index b9a6018e0..84f19a909 100644 --- a/src/test/regress/expected/local_table_join.out +++ b/src/test/regress/expected/local_table_join.out @@ -289,6 +289,51 @@ DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS c 0 (1 row) +-- the conversions should be independent from the order of table entries in the query +SELECT COUNT(*) FROM postgres_table join distributed_table_pkey using(key) join local_partitioned_table using(key) join distributed_table using(key) where distributed_table_pkey.key = 5; +DEBUG: Wrapping local relation "postgres_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE (key OPERATOR(pg_catalog.=) 5) OFFSET 0 +DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE (key OPERATOR(pg_catalog.=) 5) OFFSET 0 +DEBUG: Wrapping local relation "local_partitioned_table" to a subquery: SELECT NULL::integer AS key, NULL::text AS value FROM local_table_join.local_partitioned_table WHERE true OFFSET 0 +DEBUG: generating subplan XXX_2 for subquery SELECT NULL::integer AS key, NULL::text AS value FROM local_table_join.local_partitioned_table WHERE true OFFSET 0 +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM ((((SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) postgres_table JOIN local_table_join.distributed_table_pkey USING (key)) JOIN (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) local_partitioned_table USING (key)) JOIN local_table_join.distributed_table USING (key)) WHERE (distributed_table_pkey.key OPERATOR(pg_catalog.=) 5) + count +--------------------------------------------------------------------- + 0 +(1 row) + +SELECT COUNT(*) FROM postgres_table join local_partitioned_table using(key) join distributed_table_pkey using(key) join distributed_table using(key) where distributed_table_pkey.key = 5; +DEBUG: Wrapping local relation "postgres_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE (key OPERATOR(pg_catalog.=) 5) OFFSET 0 +DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE (key OPERATOR(pg_catalog.=) 5) OFFSET 0 +DEBUG: Wrapping local relation "local_partitioned_table" to a subquery: SELECT NULL::integer AS key, NULL::text AS value FROM local_table_join.local_partitioned_table WHERE true OFFSET 0 +DEBUG: generating subplan XXX_2 for subquery SELECT NULL::integer AS key, NULL::text AS value FROM local_table_join.local_partitioned_table WHERE true OFFSET 0 +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM ((((SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) postgres_table JOIN (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) local_partitioned_table USING (key)) JOIN local_table_join.distributed_table_pkey USING (key)) JOIN local_table_join.distributed_table USING (key)) WHERE (distributed_table_pkey.key OPERATOR(pg_catalog.=) 5) + count +--------------------------------------------------------------------- + 0 +(1 row) + +SELECT COUNT(*) FROM postgres_table join distributed_table using(key) join local_partitioned_table using(key) join distributed_table_pkey using(key) where distributed_table_pkey.key = 5; +DEBUG: Wrapping local relation "postgres_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE (key OPERATOR(pg_catalog.=) 5) OFFSET 0 +DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE (key OPERATOR(pg_catalog.=) 5) OFFSET 0 +DEBUG: Wrapping local relation "local_partitioned_table" to a subquery: SELECT NULL::integer AS key, NULL::text AS value FROM local_table_join.local_partitioned_table WHERE true OFFSET 0 +DEBUG: generating subplan XXX_2 for subquery SELECT NULL::integer AS key, NULL::text AS value FROM local_table_join.local_partitioned_table WHERE true OFFSET 0 +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM ((((SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) postgres_table JOIN local_table_join.distributed_table USING (key)) JOIN (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) local_partitioned_table USING (key)) JOIN local_table_join.distributed_table_pkey USING (key)) WHERE (distributed_table_pkey.key OPERATOR(pg_catalog.=) 5) + count +--------------------------------------------------------------------- + 0 +(1 row) + +SELECT COUNT(*) FROM distributed_table_pkey join distributed_table using(key) join postgres_table using(key) join local_partitioned_table using(key) where distributed_table_pkey.key = 5; +DEBUG: Wrapping local relation "postgres_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE (key OPERATOR(pg_catalog.=) 5) OFFSET 0 +DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE (key OPERATOR(pg_catalog.=) 5) OFFSET 0 +DEBUG: Wrapping local relation "local_partitioned_table" to a subquery: SELECT NULL::integer AS key, NULL::text AS value FROM local_table_join.local_partitioned_table WHERE true OFFSET 0 +DEBUG: generating subplan XXX_2 for subquery SELECT NULL::integer AS key, NULL::text AS value FROM local_table_join.local_partitioned_table WHERE true OFFSET 0 +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (((local_table_join.distributed_table_pkey JOIN local_table_join.distributed_table USING (key)) JOIN (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) postgres_table USING (key)) JOIN (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) local_partitioned_table USING (key)) WHERE (distributed_table_pkey.key OPERATOR(pg_catalog.=) 5) + count +--------------------------------------------------------------------- + 0 +(1 row) + -- TODO:: We should probably recursively plan postgres table here because primary key is on key,value not key. SELECT count(*) FROM distributed_table_composite JOIN postgres_table USING(key) WHERE distributed_table_composite.key = 10; DEBUG: Wrapping local relation "distributed_table_composite" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_composite WHERE (key OPERATOR(pg_catalog.=) 10) OFFSET 0 diff --git a/src/test/regress/sql/local_table_join.sql b/src/test/regress/sql/local_table_join.sql index 788d053bb..422c64f95 100644 --- a/src/test/regress/sql/local_table_join.sql +++ b/src/test/regress/sql/local_table_join.sql @@ -85,6 +85,12 @@ SELECT count(*) FROM distributed_partitioned_table JOIN local_partitioned_table SELECT count(*) FROM distributed_partitioned_table JOIN local_partitioned_table USING(key) WHERE distributed_partitioned_table.key = 10; SELECT count(*) FROM distributed_partitioned_table JOIN local_partitioned_table USING(key) JOIN reference_table USING (key); +-- the conversions should be independent from the order of table entries in the query +SELECT COUNT(*) FROM postgres_table join distributed_table_pkey using(key) join local_partitioned_table using(key) join distributed_table using(key) where distributed_table_pkey.key = 5; +SELECT COUNT(*) FROM postgres_table join local_partitioned_table using(key) join distributed_table_pkey using(key) join distributed_table using(key) where distributed_table_pkey.key = 5; +SELECT COUNT(*) FROM postgres_table join distributed_table using(key) join local_partitioned_table using(key) join distributed_table_pkey using(key) where distributed_table_pkey.key = 5; +SELECT COUNT(*) FROM distributed_table_pkey join distributed_table using(key) join postgres_table using(key) join local_partitioned_table using(key) where distributed_table_pkey.key = 5; + -- TODO:: We should probably recursively plan postgres table here because primary key is on key,value not key. SELECT count(*) FROM distributed_table_composite JOIN postgres_table USING(key) WHERE distributed_table_composite.key = 10;