Fix task computation

onur-leftjoin_push-improvements
eaydingol 2025-08-01 15:29:49 +03:00
parent 188043c5e7
commit 7a89dad904
1 changed files with 50 additions and 20 deletions

View File

@ -167,7 +167,9 @@ static uint32 HashPartitionCount(void);
/* Local functions forward declarations for task list creation and helper functions */ /* Local functions forward declarations for task list creation and helper functions */
static Job * BuildJobTreeTaskList(Job *jobTree, static Job * BuildJobTreeTaskList(Job *jobTree,
PlannerRestrictionContext *plannerRestrictionContext); PlannerRestrictionContext *plannerRestrictionContext);
static bool IsInnerTableOfOuterJoin(RelationRestriction *relationRestriction); static bool IsInnerTableOfOuterJoin(RelationRestriction *relationRestriction,
Bitmapset *distributedTables,
bool *outerPartHasDistributedTable);
static void ErrorIfUnsupportedShardDistribution(Query *query); static void ErrorIfUnsupportedShardDistribution(Query *query);
static Task * QueryPushdownTaskCreate(Query *originalQuery, int shardIndex, static Task * QueryPushdownTaskCreate(Query *originalQuery, int shardIndex,
RelationRestrictionContext *restrictionContext, RelationRestrictionContext *restrictionContext,
@ -2200,8 +2202,9 @@ QueryPushdownSqlTaskList(Query *query, uint64 jobId,
int minShardOffset = INT_MAX; int minShardOffset = INT_MAX;
int prevShardCount = 0; int prevShardCount = 0;
Bitmapset *taskRequiredForShardIndex = NULL; Bitmapset *taskRequiredForShardIndex = NULL;
Bitmapset *innerTableOfOuterJoinSet = NULL;
bool innerTableOfOuterJoin = false; bool innerTableOfOuterJoin = false;
bool outerPartHasDistributedTable = false;
Bitmapset *distributedTableIndex = NULL;
/* error if shards are not co-partitioned */ /* error if shards are not co-partitioned */
ErrorIfUnsupportedShardDistribution(query); ErrorIfUnsupportedShardDistribution(query);
@ -2218,8 +2221,12 @@ QueryPushdownSqlTaskList(Query *query, uint64 jobId,
RelationRestriction *relationRestriction = NULL; RelationRestriction *relationRestriction = NULL;
List *prunedShardList = NULL; List *prunedShardList = NULL;
forboth_ptr(prunedShardList, prunedRelationShardList, /* First loop, gather the indexes of distributed tables
relationRestriction, relationRestrictionContext->relationRestrictionList) * this is required to decide whether we can skip shards
* from inner tables of outer joins
*/
foreach_declared_ptr(relationRestriction,
relationRestrictionContext->relationRestrictionList)
{ {
Oid relationId = relationRestriction->relationId; Oid relationId = relationRestriction->relationId;
@ -2239,16 +2246,44 @@ QueryPushdownSqlTaskList(Query *query, uint64 jobId,
return NIL; return NIL;
} }
prevShardCount = cacheEntry->shardIntervalArrayLength; prevShardCount = cacheEntry->shardIntervalArrayLength;
innerTableOfOuterJoin = false;
distributedTableIndex = bms_add_member(distributedTableIndex,
relationRestriction->index);
}
/* In the second loop, populate taskRequiredForShardIndex */
forboth_ptr(prunedShardList, prunedRelationShardList,
relationRestriction, relationRestrictionContext->relationRestrictionList)
{
Oid relationId = relationRestriction->relationId;
CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(relationId);
if (!HasDistributionKeyCacheEntry(cacheEntry))
{
continue;
}
/* /*
* For left outer joins, we need to check if the table is in the inner * For left joins we don't care about the shards pruned for the right hand side.
* part of the join. If it is, we need to mark this shard and add interval * If the right hand side would prune to a smaller set we should still send it to
* constraints to the join. * all tables of the left hand side. However if the right hand side is bigger than
* the left hand side we don't have to send the query to any shard that is not
* matching anything on the left hand side.
*
* Therefore we skip a RelationRestriction when all of the following hold:
* the join is an OUTER join, the table is on the inner (non-outer) side of
* that join, and the outer side contains a distributed table.
*/ */
if (IsInnerTableOfOuterJoin(relationRestriction)) if (IsInnerTableOfOuterJoin(relationRestriction, distributedTableIndex,
&outerPartHasDistributedTable))
{ {
innerTableOfOuterJoin = true; innerTableOfOuterJoin = true;
/* Skip this relation only if a relation from the outer part is distributed */
if (outerPartHasDistributedTable)
{
continue;
}
} }
ShardInterval *shardInterval = NULL; ShardInterval *shardInterval = NULL;
@ -2259,15 +2294,6 @@ QueryPushdownSqlTaskList(Query *query, uint64 jobId,
taskRequiredForShardIndex = taskRequiredForShardIndex =
bms_add_member(taskRequiredForShardIndex, shardIndex); bms_add_member(taskRequiredForShardIndex, shardIndex);
minShardOffset = Min(minShardOffset, shardIndex); minShardOffset = Min(minShardOffset, shardIndex);
if (innerTableOfOuterJoin)
{
/*
* We need to keep track of the inner table of outer join
* shards so that we can process them later.
*/
innerTableOfOuterJoinSet =
bms_add_member(innerTableOfOuterJoinSet, shardIndex);
}
} }
} }
@ -2285,7 +2311,6 @@ QueryPushdownSqlTaskList(Query *query, uint64 jobId,
int shardOffset = minShardOffset - 1; int shardOffset = minShardOffset - 1;
while ((shardOffset = bms_next_member(taskRequiredForShardIndex, shardOffset)) >= 0) while ((shardOffset = bms_next_member(taskRequiredForShardIndex, shardOffset)) >= 0)
{ {
innerTableOfOuterJoin = bms_is_member(shardOffset, innerTableOfOuterJoinSet);
Task *subqueryTask = QueryPushdownTaskCreate(query, shardOffset, Task *subqueryTask = QueryPushdownTaskCreate(query, shardOffset,
relationRestrictionContext, relationRestrictionContext,
taskIdIndex, taskIdIndex,
@ -2326,9 +2351,12 @@ QueryPushdownSqlTaskList(Query *query, uint64 jobId,
* b) on the inner part of said join * b) on the inner part of said join
* *
* The function returns true only if both conditions above hold true * The function returns true only if both conditions above hold true
* and, in that case, sets *outerPartHasDistributedTable to indicate whether
* the outer part of the join contains a distributed table. The output
* parameter is only written when the function returns true.
*/ */
static bool static bool
IsInnerTableOfOuterJoin(RelationRestriction *relationRestriction) IsInnerTableOfOuterJoin(RelationRestriction *relationRestriction,
Bitmapset *distributedTables, bool *outerPartHasDistributedTable)
{ {
RestrictInfo *joinInfo = NULL; RestrictInfo *joinInfo = NULL;
foreach_declared_ptr(joinInfo, relationRestriction->relOptInfo->joininfo) foreach_declared_ptr(joinInfo, relationRestriction->relOptInfo->joininfo)
@ -2349,6 +2377,8 @@ IsInnerTableOfOuterJoin(RelationRestriction *relationRestriction)
if (!isInOuter) if (!isInOuter)
{ {
/* this table is joined in the inner part of an outer join */ /* this table is joined in the inner part of an outer join */
*outerPartHasDistributedTable = bms_overlap(joinInfo->outer_relids,
distributedTables);
return true; return true;
} }
} }