Refactor the update part

onur-leftjoin_push-improvements
eaydingol 2025-08-01 17:59:28 +03:00
parent 7a89dad904
commit a850c3e3a5
4 changed files with 94 additions and 171 deletions

View File

@ -208,75 +208,12 @@ UpdateTaskQueryString(Query *query, Task *task)
/*
* UpdateWhereClauseForOuterJoin walks over the query tree and appends quals
* to the WHERE clause to filter w.r.to the distribution column of the corresponding shard.
* DefineQualsForShardInterval creates the necessary qual conditions over the
* given attnum and rtindex for the given shard interval.
*/
bool
UpdateWhereClauseForOuterJoin(Node *node, List *relationShardList)
Node *
DefineQualsForShardInterval(RelationShard *relationShard, int attnum, int rtindex)
{
if (node == NULL)
{
return false;
}
if (!IsA(node, Query))
{
return expression_tree_walker(node, UpdateWhereClauseForOuterJoin,
relationShardList);
}
Query *query = (Query *) node;
if (query->jointree == NULL)
{
return query_tree_walker(query, UpdateWhereClauseForOuterJoin, relationShardList,
0);
}
FromExpr *fromExpr = query->jointree;
if (fromExpr == NULL || fromExpr->fromlist == NIL)
{
return query_tree_walker(query, UpdateWhereClauseForOuterJoin, relationShardList,
0);
}
/* TODO: generalize to the list */
Node *firstFromItem = linitial(fromExpr->fromlist);
if (!IsA(firstFromItem, JoinExpr))
{
return query_tree_walker(query, UpdateWhereClauseForOuterJoin, relationShardList,
0);
}
JoinExpr *joinExpr = (JoinExpr *) firstFromItem;
/*
* We need to find the outer table in the join clause to add the constraints w.r.to the shard
* intervals of the inner table.
* A representative inner table is sufficient as long as it is colocated with all other
* distributed tables in the join clause.
*/
RangeTblEntry *innerRte = NULL;
RangeTblEntry *outerRte = NULL;
int outerRtIndex = -1;
int attnum;
if (!CheckPushDownFeasibilityAndComputeIndexes(joinExpr, query, &outerRtIndex,
&outerRte, &innerRte, &attnum))
{
return query_tree_walker(query, UpdateWhereClauseForOuterJoin, relationShardList,
0);
}
if (attnum == InvalidAttrNumber)
{
return query_tree_walker(query, UpdateWhereClauseForOuterJoin, relationShardList,
0);
}
ereport(DEBUG5, (errmsg(
"Distributed table from the inner part of the outer join: %s.",
innerRte->eref->aliasname)));
RelationShard *relationShard = FindRelationShard(innerRte->relid, relationShardList);
uint64 shardId = relationShard->shardId;
Oid relationId = relationShard->relationId;
@ -284,12 +221,13 @@ UpdateWhereClauseForOuterJoin(Node *node, List *relationShardList)
Var *partitionColumnVar = cacheEntry->partitionColumn;
/*
* we will add constraints for the outer table,
* we create a Var node for the outer table's column that is compared with the distribution column.
* Add constraints for the relation identified by rtindex, specifically on its column at attnum.
* Create a Var node representing this column, which will be used to compare against the partition
* column for shard interval qualification.
*/
Var *outerTablePartitionColumnVar = makeVar(
outerRtIndex, attnum, partitionColumnVar->vartype,
rtindex, attnum, partitionColumnVar->vartype,
partitionColumnVar->vartypmod,
partitionColumnVar->varcollid,
0);
@ -377,18 +315,79 @@ UpdateWhereClauseForOuterJoin(Node *node, List *relationShardList)
shardIntervalBoundQuals = (Node *) make_orclause(list_make2(nullTest,
shardIntervalBoundQuals));
}
return shardIntervalBoundQuals;
}
if (fromExpr->quals == NULL)
/*
* UpdateWhereClauseForOuterJoin walks over the query tree and appends quals
* to the WHERE clause to filter w.r.to the distribution column of the corresponding shard.
*/
void
UpdateWhereClauseForOuterJoin(Query *query, List *relationShardList)
{
if (query == NULL || query->jointree == NULL || query->jointree->fromlist == NIL)
{
fromExpr->quals = (Node *) shardIntervalBoundQuals;
}
else
{
fromExpr->quals = make_and_qual(fromExpr->quals, shardIntervalBoundQuals);
return;
}
/* We need to continue the recursive walk for the nested join statements.*/
return query_tree_walker(query, UpdateWhereClauseForOuterJoin, relationShardList, 0);
FromExpr *fromExpr = query->jointree;
if (fromExpr == NULL || fromExpr->fromlist == NIL)
{
return;
}
ListCell *fromExprCell;
foreach(fromExprCell, fromExpr->fromlist)
{
Node *fromItem = (Node *) lfirst(fromExprCell);
if (!IsA(fromItem, JoinExpr))
{
continue;
}
JoinExpr *joinExpr = (JoinExpr *) fromItem;
/*
* We will check if we need to add constraints to the WHERE clause.
*/
RangeTblEntry *innerRte = NULL;
RangeTblEntry *outerRte = NULL;
int outerRtIndex = -1;
int attnum;
if (!CheckPushDownFeasibilityAndComputeIndexes(joinExpr, query, &outerRtIndex,
&outerRte, &innerRte, &attnum))
{
continue;
}
if (attnum == InvalidAttrNumber)
{
continue;
}
ereport(DEBUG5, (errmsg(
"Distributed table from the inner part of the outer join: %s.",
innerRte->eref->aliasname)));
RelationShard *relationShard = FindRelationShard(innerRte->relid,
relationShardList);
if (relationShard == NULL || relationShard->shardId == INVALID_SHARD_ID)
{
continue;
}
Node *shardIntervalBoundQuals = DefineQualsForShardInterval(relationShard, attnum,
outerRtIndex);
if (fromExpr->quals == NULL)
{
fromExpr->quals = (Node *) shardIntervalBoundQuals;
}
else
{
fromExpr->quals = make_and_qual(fromExpr->quals, shardIntervalBoundQuals);
}
}
return;
}
@ -414,6 +413,7 @@ UpdateRelationToShardNames(Node *node, List *relationShardList)
/* want to look at all RTEs, even in subqueries, CTEs and such */
if (IsA(node, Query))
{
UpdateWhereClauseForOuterJoin((Query *) node, relationShardList); // TODO, check this again, we might want to skip this for fast path queries
return query_tree_walker((Query *) node, UpdateRelationToShardNames,
relationShardList, QTW_EXAMINE_RTES_BEFORE);
}

View File

@ -168,15 +168,13 @@ static uint32 HashPartitionCount(void);
static Job * BuildJobTreeTaskList(Job *jobTree,
PlannerRestrictionContext *plannerRestrictionContext);
static bool IsInnerTableOfOuterJoin(RelationRestriction *relationRestriction,
Bitmapset *distributedTables,
bool *outerPartHasDistributedTable);
Bitmapset *distributedTables);
static void ErrorIfUnsupportedShardDistribution(Query *query);
static Task * QueryPushdownTaskCreate(Query *originalQuery, int shardIndex,
RelationRestrictionContext *restrictionContext,
uint32 taskId,
TaskType taskType,
bool modifyRequiresCoordinatorEvaluation,
bool innerTableOfOuterJoin,
DeferredErrorMessage **planningError);
static List * SqlTaskList(Job *job);
static bool DependsOnHashPartitionJob(Job *job);
@ -2202,8 +2200,6 @@ QueryPushdownSqlTaskList(Query *query, uint64 jobId,
int minShardOffset = INT_MAX;
int prevShardCount = 0;
Bitmapset *taskRequiredForShardIndex = NULL;
bool innerTableOfOuterJoin = false;
bool outerPartHasDistributedTable = false;
Bitmapset *distributedTableIndex = NULL;
/* error if shards are not co-partitioned */
@ -2274,16 +2270,9 @@ QueryPushdownSqlTaskList(Query *query, uint64 jobId,
* the table is part of the non-outer side of the join and the outer side has a
* distributed table.
*/
if (IsInnerTableOfOuterJoin(relationRestriction, distributedTableIndex,
&outerPartHasDistributedTable))
if (IsInnerTableOfOuterJoin(relationRestriction, distributedTableIndex))
{
innerTableOfOuterJoin = true;
/* Skip this relation only if a relation from the outer part is distributed */
if (outerPartHasDistributedTable)
{
continue;
}
continue;
}
ShardInterval *shardInterval = NULL;
@ -2316,7 +2305,6 @@ QueryPushdownSqlTaskList(Query *query, uint64 jobId,
taskIdIndex,
taskType,
modifyRequiresCoordinatorEvaluation,
innerTableOfOuterJoin,
planningError);
if (*planningError != NULL)
{
@ -2349,14 +2337,12 @@ QueryPushdownSqlTaskList(Query *query, uint64 jobId,
* RelationRestriction if the table accessed for this relation is
* a) in an outer join
* b) on the inner part of said join
* c) the outer part of the join has a distributed table
*
* The function returns true only if both conditions above hold true
* and sets the outerPartHasDistributedTable to true if the outer part of the
* join contains a distributed table.
* The function returns true only if all three conditions above hold true.
*/
static bool
IsInnerTableOfOuterJoin(RelationRestriction *relationRestriction,
Bitmapset *distributedTables, bool *outerPartHasDistributedTable)
IsInnerTableOfOuterJoin(RelationRestriction *relationRestriction, Bitmapset *distributedTables)
{
RestrictInfo *joinInfo = NULL;
foreach_declared_ptr(joinInfo, relationRestriction->relOptInfo->joininfo)
@ -2377,9 +2363,13 @@ IsInnerTableOfOuterJoin(RelationRestriction *relationRestriction,
if (!isInOuter)
{
/* this table is joined in the inner part of an outer join */
*outerPartHasDistributedTable = bms_overlap(joinInfo->outer_relids,
distributedTables);
return true;
/* check if the outer part has a distributed relation */
bool outerPartHasDistributedTable = bms_overlap(joinInfo->outer_relids, distributedTables);
if (outerPartHasDistributedTable)
{
/* this is an inner table of an outer join with a distributed table */
return true;
}
}
}
@ -2495,7 +2485,6 @@ static Task *
QueryPushdownTaskCreate(Query *originalQuery, int shardIndex,
RelationRestrictionContext *restrictionContext, uint32 taskId,
TaskType taskType, bool modifyRequiresCoordinatorEvaluation,
bool innerTableOfOuterJoin,
DeferredErrorMessage **planningError)
{
Query *taskQuery = copyObject(originalQuery);
@ -2588,14 +2577,6 @@ QueryPushdownTaskCreate(Query *originalQuery, int shardIndex,
*/
UpdateRelationToShardNames((Node *) taskQuery, relationShardList);
/*
* Augment the where clause with the shard intervals for inner table of outer
* joins.
*/
if (innerTableOfOuterJoin)
{
UpdateWhereClauseForOuterJoin((Node *) taskQuery, relationShardList);
}
/*
* Ands are made implicit during shard pruning, as predicate comparison and

View File

@ -25,7 +25,9 @@
extern void RebuildQueryStrings(Job *workerJob);
extern bool UpdateRelationToShardNames(Node *node, List *relationShardList);
extern bool UpdateWhereClauseForOuterJoin(Node *node, List *relationShardList);
extern void UpdateWhereClauseForOuterJoin(Query *query, List *relationShardList);
Node * DefineQualsForShardInterval(RelationShard *relationShard, int attnum, int
outerRtIndex);
extern void SetTaskQueryIfShouldLazyDeparse(Task *task, Query *query);
extern void SetTaskQueryString(Task *task, char *queryString);
extern void SetTaskQueryStringList(Task *task, List *queryStringList);

View File

@ -24,9 +24,6 @@ INSERT INTO dist_1 VALUES
(7, 41),
(7, 42);
CREATE TABLE dist_1_local(LIKE dist_1);
INSERT INTO dist_1_local SELECT * FROM dist_1;
CREATE TABLE dist_2_columnar(LIKE dist_1) USING columnar;
INSERT INTO dist_2_columnar SELECT * FROM dist_1;
SELECT create_distributed_table('dist_2_columnar', 'a');
@ -38,28 +35,6 @@ CREATE TABLE dist_3_partitioned_p3 PARTITION OF dist_3_partitioned FOR VALUES FR
SELECT create_distributed_table('dist_3_partitioned', 'a');
INSERT INTO dist_3_partitioned SELECT * FROM dist_1;
CREATE TABLE dist_4 (a int, b int);
SELECT create_distributed_table('dist_4', 'a');
INSERT INTO dist_4 VALUES
(1, 100),
(1, 101),
(1, 300),
(2, 20),
(2, 21),
(2, 400),
(2, 23),
(3, 102),
(3, 301),
(3, 300),
(3, null),
(3, 34),
(7, 40),
(7, null),
(7, 11);
CREATE TABLE dist_4_local(LIKE dist_4);
INSERT INTO dist_4_local SELECT * FROM dist_4;
CREATE TABLE ref_1 (a int, b int);
SELECT create_reference_table('ref_1');
INSERT INTO ref_1 VALUES
@ -79,33 +54,6 @@ INSERT INTO ref_1 VALUES
(null, 401),
(null, 402);
CREATE TABLE ref_1_local(LIKE ref_1);
INSERT INTO ref_1_local SELECT * FROM ref_1;
--- We create a second reference table that does not have the distribution column
CREATE TABLE ref_2 (a2 int, b int);
SELECT create_reference_table('ref_2');
INSERT INTO ref_2 VALUES
(1, null),
(1, 100),
(1, 11),
(null, 102),
(2, 200),
(2, 21),
(null, 202),
(2, 203),
(4, 300),
(4, 301),
(null, 302),
(4, 303),
(4, 304),
(null, 400),
(null, 401),
(null, 402);
CREATE TABLE ref_2_local(LIKE ref_2);
INSERT INTO ref_2_local SELECT * FROM ref_2;
CREATE TABLE local_1 (a int, b int);
INSERT INTO local_1 VALUES
(null, 1000),
@ -147,16 +95,8 @@ ALTER TABLE dist_5_with_pkey ADD CONSTRAINT pkey_1 PRIMARY KEY (a);
--
SELECT COUNT(*) FROM ref_1 LEFT JOIN dist_1 USING (a);
SELECT COUNT(*) FROM ref_1_local LEFT JOIN dist_1_local USING (a);
SELECT COUNT(*) FROM ref_1 LEFT JOIN dist_1 USING (a,b);
SELECT COUNT(*) FROM ref_1_local LEFT JOIN dist_1_local USING (a,b);
SELECT COUNT(*) FROM ref_1 LEFT JOIN dist_4 USING (b);
SELECT COUNT(*) FROM ref_1_local LEFT JOIN dist_4_local USING (b);
SELECT * FROM ref_2 LEFT JOIN dist_4 USING (b) ORDER BY b, a2, a;
SELECT * FROM ref_2_local LEFT JOIN dist_4_local USING (b) ORDER BY b, a2, a;
SELECT COUNT(*) FROM dist_1 RIGHT JOIN ref_1 USING (a);