diff --git a/src/backend/distributed/planner/multi_physical_planner.c b/src/backend/distributed/planner/multi_physical_planner.c index dbdc8560a..fa016b0e2 100644 --- a/src/backend/distributed/planner/multi_physical_planner.c +++ b/src/backend/distributed/planner/multi_physical_planner.c @@ -167,7 +167,9 @@ static uint32 HashPartitionCount(void); /* Local functions forward declarations for task list creation and helper functions */ static Job * BuildJobTreeTaskList(Job *jobTree, PlannerRestrictionContext *plannerRestrictionContext); -static bool IsInnerTableOfOuterJoin(RelationRestriction *relationRestriction); +static bool IsInnerTableOfOuterJoin(RelationRestriction *relationRestriction, + Bitmapset *distributedTables, + bool *outerPartHasDistributedTable); static void ErrorIfUnsupportedShardDistribution(Query *query); static Task * QueryPushdownTaskCreate(Query *originalQuery, int shardIndex, RelationRestrictionContext *restrictionContext, @@ -2200,8 +2202,9 @@ QueryPushdownSqlTaskList(Query *query, uint64 jobId, int minShardOffset = INT_MAX; int prevShardCount = 0; Bitmapset *taskRequiredForShardIndex = NULL; - Bitmapset *innerTableOfOuterJoinSet = NULL; bool innerTableOfOuterJoin = false; + bool outerPartHasDistributedTable = false; + Bitmapset *distributedTableIndex = NULL; /* error if shards are not co-partitioned */ ErrorIfUnsupportedShardDistribution(query); @@ -2218,8 +2221,12 @@ QueryPushdownSqlTaskList(Query *query, uint64 jobId, RelationRestriction *relationRestriction = NULL; List *prunedShardList = NULL; - forboth_ptr(prunedShardList, prunedRelationShardList, - relationRestriction, relationRestrictionContext->relationRestrictionList) + /* First loop, gather the indexes of distributed tables + * this is required to decide whether we can skip shards + * from inner tables of outer joins + */ + foreach_declared_ptr(relationRestriction, + relationRestrictionContext->relationRestrictionList) { Oid relationId = relationRestriction->relationId; @@ -2239,16 +2246,44 @@ QueryPushdownSqlTaskList(Query *query, uint64 jobId, return NIL; } prevShardCount = cacheEntry->shardIntervalArrayLength; - innerTableOfOuterJoin = false; + + distributedTableIndex = bms_add_member(distributedTableIndex, + relationRestriction->index); + } + + /* In the second loop, populate taskRequiredForShardIndex */ + forboth_ptr(prunedShardList, prunedRelationShardList, + relationRestriction, relationRestrictionContext->relationRestrictionList) + { + Oid relationId = relationRestriction->relationId; + + CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(relationId); + if (!HasDistributionKeyCacheEntry(cacheEntry)) + { + continue; + } /* - * For left outer joins, we need to check if the table is in the inner - * part of the join. If it is, we need to mark this shard and add interval - * constraints to the join. + * For left joins we don't care about the shards pruned for the right hand side. + * If the right hand side would prune to a smaller set we should still send it to + * all tables of the left hand side. However if the right hand side is bigger than + * the left hand side we don't have to send the query to any shard that is not + * matching anything on the left hand side. + * + * Instead we will simply skip any RelationRestriction if it is an OUTER join, + * the table is part of the non-outer side of the join and the outer side has a + * distributed table. */ - if (IsInnerTableOfOuterJoin(relationRestriction)) + if (IsInnerTableOfOuterJoin(relationRestriction, distributedTableIndex, + &outerPartHasDistributedTable)) { innerTableOfOuterJoin = true; + + /* Skip this relation only if a relation from the outer part is distributed */ + if (outerPartHasDistributedTable) + { + continue; + } } ShardInterval *shardInterval = NULL; @@ -2259,15 +2294,6 @@ QueryPushdownSqlTaskList(Query *query, uint64 jobId, taskRequiredForShardIndex = bms_add_member(taskRequiredForShardIndex, shardIndex); minShardOffset = Min(minShardOffset, shardIndex); - if (innerTableOfOuterJoin) - { - /* - * We need to keep track of the inner table of outer join - * shards so that we can process them later. - */ - innerTableOfOuterJoinSet = - bms_add_member(innerTableOfOuterJoinSet, shardIndex); - } } } @@ -2285,7 +2311,6 @@ QueryPushdownSqlTaskList(Query *query, uint64 jobId, int shardOffset = minShardOffset - 1; while ((shardOffset = bms_next_member(taskRequiredForShardIndex, shardOffset)) >= 0) { - innerTableOfOuterJoin = bms_is_member(shardOffset, innerTableOfOuterJoinSet); Task *subqueryTask = QueryPushdownTaskCreate(query, shardOffset, relationRestrictionContext, taskIdIndex, @@ -2326,9 +2351,12 @@ QueryPushdownSqlTaskList(Query *query, uint64 jobId, * b) on the inner part of said join * * The function returns true only if both conditions above hold true + * and sets the outerPartHasDistributedTable to true if the outer part of the + * join contains a distributed table. */ static bool -IsInnerTableOfOuterJoin(RelationRestriction *relationRestriction) +IsInnerTableOfOuterJoin(RelationRestriction *relationRestriction, + Bitmapset *distributedTables, bool *outerPartHasDistributedTable) { RestrictInfo *joinInfo = NULL; foreach_declared_ptr(joinInfo, relationRestriction->relOptInfo->joininfo) @@ -2349,6 +2377,8 @@ IsInnerTableOfOuterJoin(RelationRestriction *relationRestriction) if (!isInOuter) { /* this table is joined in the inner part of an outer join */ + *outerPartHasDistributedTable = bms_overlap(joinInfo->outer_relids, + distributedTables); return true; } }