From a850c3e3a56e502d4fe705af4142975eef0d9562 Mon Sep 17 00:00:00 2001 From: eaydingol Date: Fri, 1 Aug 2025 17:59:28 +0300 Subject: [PATCH] Refactor the update part --- .../distributed/planner/deparse_shard_query.c | 156 +++++++++--------- .../planner/multi_physical_planner.c | 45 ++--- src/include/distributed/deparse_shard_query.h | 4 +- src/test/regress/sql/recurring_outer_join.sql | 60 ------- 4 files changed, 94 insertions(+), 171 deletions(-) diff --git a/src/backend/distributed/planner/deparse_shard_query.c b/src/backend/distributed/planner/deparse_shard_query.c index d06d740c4..dc58ddc1b 100644 --- a/src/backend/distributed/planner/deparse_shard_query.c +++ b/src/backend/distributed/planner/deparse_shard_query.c @@ -208,75 +208,12 @@ UpdateTaskQueryString(Query *query, Task *task) /* - * UpdateWhereClauseForOuterJoin walks over the query tree and appends quals - * to the WHERE clause to filter w.r.to the distribution column of the corresponding shard. + * DefineQualsForShardInterval creates the necessary qual conditions over the + * given attnum and rtindex for the given shard interval. */ -bool -UpdateWhereClauseForOuterJoin(Node *node, List *relationShardList) +Node * +DefineQualsForShardInterval(RelationShard *relationShard, int attnum, int rtindex) { - if (node == NULL) - { - return false; - } - - if (!IsA(node, Query)) - { - return expression_tree_walker(node, UpdateWhereClauseForOuterJoin, - relationShardList); - } - - Query *query = (Query *) node; - - if (query->jointree == NULL) - { - return query_tree_walker(query, UpdateWhereClauseForOuterJoin, relationShardList, - 0); - } - - FromExpr *fromExpr = query->jointree; - if (fromExpr == NULL || fromExpr->fromlist == NIL) - { - return query_tree_walker(query, UpdateWhereClauseForOuterJoin, relationShardList, - 0); - } - - /* TODO: generalize to the list */ - Node *firstFromItem = linitial(fromExpr->fromlist); - if (!IsA(firstFromItem, JoinExpr)) - { - return query_tree_walker(query, UpdateWhereClauseForOuterJoin, relationShardList, - 0); - } - - JoinExpr *joinExpr = (JoinExpr *) firstFromItem; - - /* - * We need to find the outer table in the join clause to add the constraints w.r.to the shard - * intervals of the inner table. - * A representative inner table is sufficient as long as it is colocated with all other - * distributed tables in the join clause. - */ - RangeTblEntry *innerRte = NULL; - RangeTblEntry *outerRte = NULL; - int outerRtIndex = -1; - int attnum; - if (!CheckPushDownFeasibilityAndComputeIndexes(joinExpr, query, &outerRtIndex, - &outerRte, &innerRte, &attnum)) - { - return query_tree_walker(query, UpdateWhereClauseForOuterJoin, relationShardList, - 0); - } - if (attnum == InvalidAttrNumber) - { - return query_tree_walker(query, UpdateWhereClauseForOuterJoin, relationShardList, - 0); - } - ereport(DEBUG5, (errmsg( - "Distributed table from the inner part of the outer join: %s.", - innerRte->eref->aliasname))); - - - RelationShard *relationShard = FindRelationShard(innerRte->relid, relationShardList); uint64 shardId = relationShard->shardId; Oid relationId = relationShard->relationId; @@ -284,12 +221,13 @@ UpdateWhereClauseForOuterJoin(Node *node, List *relationShardList) Var *partitionColumnVar = cacheEntry->partitionColumn; /* - * we will add constraints for the outer table, - * we create a Var node for the outer table's column that is compared with the distribution column. + * Add constraints for the relation identified by rtindex, specifically on its column at attnum. + * Create a Var node representing this column, which will be used to compare against the partition + * column for shard interval qualification. */ Var *outerTablePartitionColumnVar = makeVar( - outerRtIndex, attnum, partitionColumnVar->vartype, + rtindex, attnum, partitionColumnVar->vartype, partitionColumnVar->vartypmod, partitionColumnVar->varcollid, 0); @@ -377,18 +315,79 @@ UpdateWhereClauseForOuterJoin(Node *node, List *relationShardList) shardIntervalBoundQuals = (Node *) make_orclause(list_make2(nullTest, shardIntervalBoundQuals)); } + return shardIntervalBoundQuals; +} - if (fromExpr->quals == NULL) + +/* + * UpdateWhereClauseForOuterJoin walks over the query tree and appends quals + * to the WHERE clause to filter w.r.to the distribution column of the corresponding shard. + */ +void +UpdateWhereClauseForOuterJoin(Query *query, List *relationShardList) +{ + if (query == NULL || query->jointree == NULL || query->jointree->fromlist == NIL) { - fromExpr->quals = (Node *) shardIntervalBoundQuals; - } - else - { - fromExpr->quals = make_and_qual(fromExpr->quals, shardIntervalBoundQuals); + return; } - /* We need to continue the recursive walk for the nested join statements.*/ - return query_tree_walker(query, UpdateWhereClauseForOuterJoin, relationShardList, 0); + FromExpr *fromExpr = query->jointree; + if (fromExpr == NULL || fromExpr->fromlist == NIL) + { + return; + } + + ListCell *fromExprCell; + foreach(fromExprCell, fromExpr->fromlist) + { + Node *fromItem = (Node *) lfirst(fromExprCell); + if (!IsA(fromItem, JoinExpr)) + { + continue; + } + JoinExpr *joinExpr = (JoinExpr *) fromItem; + + /* + * We will check if we need to add constraints to the WHERE clause. + */ + RangeTblEntry *innerRte = NULL; + RangeTblEntry *outerRte = NULL; + int outerRtIndex = -1; + int attnum; + if (!CheckPushDownFeasibilityAndComputeIndexes(joinExpr, query, &outerRtIndex, + &outerRte, &innerRte, &attnum)) + { + continue; + } + + if (attnum == InvalidAttrNumber) + { + continue; + } + ereport(DEBUG5, (errmsg( + "Distributed table from the inner part of the outer join: %s.", + innerRte->eref->aliasname))); + + RelationShard *relationShard = FindRelationShard(innerRte->relid, + relationShardList); + + if (relationShard == NULL || relationShard->shardId == INVALID_SHARD_ID) + { + continue; + } + + Node *shardIntervalBoundQuals = DefineQualsForShardInterval(relationShard, attnum, + outerRtIndex); + if (fromExpr->quals == NULL) + { + fromExpr->quals = (Node *) shardIntervalBoundQuals; + } + else + { + fromExpr->quals = make_and_qual(fromExpr->quals, shardIntervalBoundQuals); + } + } + return; } @@ -414,6 +413,7 @@ UpdateRelationToShardNames(Node *node, List *relationShardList) /* want to look at all RTEs, even in subqueries, CTEs and such */ if (IsA(node, Query)) { + UpdateWhereClauseForOuterJoin((Query *) node, relationShardList); // TODO, check this again, we might want to skip this for fast path queries return query_tree_walker((Query *) node, UpdateRelationToShardNames, relationShardList, QTW_EXAMINE_RTES_BEFORE); } diff --git a/src/backend/distributed/planner/multi_physical_planner.c b/src/backend/distributed/planner/multi_physical_planner.c index fa016b0e2..bad9a861b 100644 --- a/src/backend/distributed/planner/multi_physical_planner.c +++ b/src/backend/distributed/planner/multi_physical_planner.c @@ -168,15 +168,13 @@ static uint32 HashPartitionCount(void); static Job * BuildJobTreeTaskList(Job *jobTree, PlannerRestrictionContext *plannerRestrictionContext); static bool IsInnerTableOfOuterJoin(RelationRestriction *relationRestriction, - Bitmapset *distributedTables, - bool *outerPartHasDistributedTable); + Bitmapset *distributedTables); static void ErrorIfUnsupportedShardDistribution(Query *query); static Task * QueryPushdownTaskCreate(Query *originalQuery, int shardIndex, RelationRestrictionContext *restrictionContext, uint32 taskId, TaskType taskType, bool modifyRequiresCoordinatorEvaluation, - bool innerTableOfOuterJoin, DeferredErrorMessage **planningError); static List * SqlTaskList(Job *job); static bool DependsOnHashPartitionJob(Job *job); @@ -2202,8 +2200,6 @@ QueryPushdownSqlTaskList(Query *query, uint64 jobId, int minShardOffset = INT_MAX; int prevShardCount = 0; Bitmapset *taskRequiredForShardIndex = NULL; - bool innerTableOfOuterJoin = false; - bool outerPartHasDistributedTable = false; Bitmapset *distributedTableIndex = NULL; /* error if shards are not co-partitioned */ @@ -2274,16 +2270,9 @@ QueryPushdownSqlTaskList(Query *query, uint64 jobId, * the table is part of the non-outer side of the join and the outer side has a * distributed table. */ - if (IsInnerTableOfOuterJoin(relationRestriction, distributedTableIndex, - &outerPartHasDistributedTable)) + if (IsInnerTableOfOuterJoin(relationRestriction, distributedTableIndex)) { - innerTableOfOuterJoin = true; - - /* Skip this relation only if a relation from the outer part is distributed */ - if (outerPartHasDistributedTable) - { - continue; - } + continue; } ShardInterval *shardInterval = NULL; @@ -2316,7 +2305,6 @@ QueryPushdownSqlTaskList(Query *query, uint64 jobId, taskIdIndex, taskType, modifyRequiresCoordinatorEvaluation, - innerTableOfOuterJoin, planningError); if (*planningError != NULL) { @@ -2349,14 +2337,12 @@ QueryPushdownSqlTaskList(Query *query, uint64 jobId, * RelationRestriction if the table accessed for this relation is * a) in an outer join * b) on the inner part of said join + * c) the outer part of the join has a distributed table * - * The function returns true only if both conditions above hold true - * and sets the outerPartHasDistributedTable to true if the outer part of the - * join contains a distributed table. + * The function returns true only if all three conditions above hold true. */ static bool -IsInnerTableOfOuterJoin(RelationRestriction *relationRestriction, - Bitmapset *distributedTables, bool *outerPartHasDistributedTable) +IsInnerTableOfOuterJoin(RelationRestriction *relationRestriction, Bitmapset *distributedTables) { RestrictInfo *joinInfo = NULL; foreach_declared_ptr(joinInfo, relationRestriction->relOptInfo->joininfo) @@ -2377,9 +2363,13 @@ IsInnerTableOfOuterJoin(RelationRestriction *relationRestriction, if (!isInOuter) { /* this table is joined in the inner part of an outer join */ - *outerPartHasDistributedTable = bms_overlap(joinInfo->outer_relids, - distributedTables); - return true; + /* check if the outer part has a distributed relation */ + bool outerPartHasDistributedTable = bms_overlap(joinInfo->outer_relids, distributedTables); + if (outerPartHasDistributedTable) + { + /* this is an inner table of an outer join with a distributed table */ + return true; + } } } @@ -2495,7 +2485,6 @@ static Task * QueryPushdownTaskCreate(Query *originalQuery, int shardIndex, RelationRestrictionContext *restrictionContext, uint32 taskId, TaskType taskType, bool modifyRequiresCoordinatorEvaluation, - bool innerTableOfOuterJoin, DeferredErrorMessage **planningError) { Query *taskQuery = copyObject(originalQuery); @@ -2588,14 +2577,6 @@ QueryPushdownTaskCreate(Query *originalQuery, int shardIndex, */ UpdateRelationToShardNames((Node *) taskQuery, relationShardList); - /* - * Augment the where clause with the shard intervals for inner table of outer - * joins. - */ - if (innerTableOfOuterJoin) - { - UpdateWhereClauseForOuterJoin((Node *) taskQuery, relationShardList); - } /* * Ands are made implicit during shard pruning, as predicate comparison and diff --git a/src/include/distributed/deparse_shard_query.h b/src/include/distributed/deparse_shard_query.h index 13e11c2e0..fcb83c70a 100644 --- a/src/include/distributed/deparse_shard_query.h +++ b/src/include/distributed/deparse_shard_query.h @@ -25,7 +25,9 @@ extern void RebuildQueryStrings(Job *workerJob); extern bool UpdateRelationToShardNames(Node *node, List *relationShardList); -extern bool UpdateWhereClauseForOuterJoin(Node *node, List *relationShardList); +extern void UpdateWhereClauseForOuterJoin(Query *query, List *relationShardList); +Node * DefineQualsForShardInterval(RelationShard *relationShard, int attnum, int + outerRtIndex); extern void SetTaskQueryIfShouldLazyDeparse(Task *task, Query *query); extern void SetTaskQueryString(Task *task, char *queryString); extern void SetTaskQueryStringList(Task *task, List *queryStringList); diff --git a/src/test/regress/sql/recurring_outer_join.sql b/src/test/regress/sql/recurring_outer_join.sql index 5bef6b9be..d33309817 100644 --- a/src/test/regress/sql/recurring_outer_join.sql +++ b/src/test/regress/sql/recurring_outer_join.sql @@ -24,9 +24,6 @@ INSERT INTO dist_1 VALUES (7, 41), (7, 42); -CREATE TABLE dist_1_local(LIKE dist_1); -INSERT INTO dist_1_local SELECT * FROM dist_1; - CREATE TABLE dist_2_columnar(LIKE dist_1) USING columnar; INSERT INTO dist_2_columnar SELECT * FROM dist_1; SELECT create_distributed_table('dist_2_columnar', 'a'); @@ -38,28 +35,6 @@ CREATE TABLE dist_3_partitioned_p3 PARTITION OF dist_3_partitioned FOR VALUES FR SELECT create_distributed_table('dist_3_partitioned', 'a'); INSERT INTO dist_3_partitioned SELECT * FROM dist_1; -CREATE TABLE dist_4 (a int, b int); -SELECT create_distributed_table('dist_4', 'a'); -INSERT INTO dist_4 VALUES -(1, 100), -(1, 101), -(1, 300), -(2, 20), -(2, 21), -(2, 400), -(2, 23), -(3, 102), -(3, 301), -(3, 300), -(3, null), -(3, 34), -(7, 40), -(7, null), -(7, 11); - -CREATE TABLE dist_4_local(LIKE dist_4); -INSERT INTO dist_4_local SELECT * FROM dist_4; - CREATE TABLE ref_1 (a int, b int); SELECT create_reference_table('ref_1'); INSERT INTO ref_1 VALUES @@ -79,33 +54,6 @@ INSERT INTO ref_1 VALUES (null, 401), (null, 402); -CREATE TABLE ref_1_local(LIKE ref_1); -INSERT INTO ref_1_local SELECT * FROM ref_1; - ---- We create a second reference table that does not have the distribution column -CREATE TABLE ref_2 (a2 int, b int); -SELECT create_reference_table('ref_2'); -INSERT INTO ref_2 VALUES -(1, null), -(1, 100), -(1, 11), -(null, 102), -(2, 200), -(2, 21), -(null, 202), -(2, 203), -(4, 300), -(4, 301), -(null, 302), -(4, 303), -(4, 304), -(null, 400), -(null, 401), -(null, 402); - -CREATE TABLE ref_2_local(LIKE ref_2); -INSERT INTO ref_2_local SELECT * FROM ref_2; - CREATE TABLE local_1 (a int, b int); INSERT INTO local_1 VALUES (null, 1000), @@ -147,16 +95,8 @@ ALTER TABLE dist_5_with_pkey ADD CONSTRAINT pkey_1 PRIMARY KEY (a); -- SELECT COUNT(*) FROM ref_1 LEFT JOIN dist_1 USING (a); -SELECT COUNT(*) FROM ref_1_local LEFT JOIN dist_1_local USING (a); SELECT COUNT(*) FROM ref_1 LEFT JOIN dist_1 USING (a,b); -SELECT COUNT(*) FROM ref_1_local LEFT JOIN dist_1_local USING (a,b); - -SELECT COUNT(*) FROM ref_1 LEFT JOIN dist_4 USING (b); -SELECT COUNT(*) FROM ref_1_local LEFT JOIN dist_4_local USING (b); - -SELECT * FROM ref_2 LEFT JOIN dist_4 USING (b) ORDER BY b, a2, a; -SELECT * FROM ref_2_local LEFT JOIN dist_4_local USING (b) ORDER BY b, a2, a; SELECT COUNT(*) FROM dist_1 RIGHT JOIN ref_1 USING (a);