diff --git a/src/backend/distributed/planner/relation_restriction_equivalence.c b/src/backend/distributed/planner/relation_restriction_equivalence.c index 89516640a..94c99ef20 100644 --- a/src/backend/distributed/planner/relation_restriction_equivalence.c +++ b/src/backend/distributed/planner/relation_restriction_equivalence.c @@ -171,6 +171,14 @@ static bool FindQueryContainingRTEIdentityInternal(Node *node, static int ParentCountPriorToAppendRel(List *appendRelList, AppendRelInfo *appendRelInfo); +static bool PartitionColumnSelectedForOuterJoin(Query *query, + RelationRestrictionContext * + restrictionContext, + JoinRestrictionContext * + joinRestrictionContext); + +static bool PartitionColumnIsInTargetList(Query *query, JoinRestriction *joinRestriction, + RelationRestrictionContext *restrictionContext); /* * AllDistributionKeysInQueryAreEqual returns true if either @@ -391,6 +399,80 @@ SafeToPushdownUnionSubquery(Query *originalQuery, return false; } + if (!PartitionColumnSelectedForOuterJoin(originalQuery, + restrictionContext, + joinRestrictionContext)) + { + /* outer join does not select partition column of outer relation */ + return false; + } + return true; +} + + +/* + * PartitionColumnSelectedForOuterJoin checks whether the partition column of + * the outer relation is selected in the target list of the query. + * + * If there is no outer join, it returns true. + */ +static bool +PartitionColumnSelectedForOuterJoin(Query *query, + RelationRestrictionContext *restrictionContext, + JoinRestrictionContext *joinRestrictionContext) +{ + ListCell *joinRestrictionCell; + foreach(joinRestrictionCell, joinRestrictionContext->joinRestrictionList) + { + JoinRestriction *joinRestriction = (JoinRestriction *) lfirst( + joinRestrictionCell); + + /* Restriction context includes alternative plans, sufficient to check for left joins.*/ + if (joinRestriction->joinType != JOIN_LEFT) + { + continue; + } + + if (!PartitionColumnIsInTargetList(query, joinRestriction, restrictionContext)) + { + /* outer join does not select partition column of outer relation */ + return false; + } + } + + return true; +} + + +/* + * PartitionColumnIsInTargetList checks whether the partition column of + * the given relation is included in the target list of the query. + */ +static bool +PartitionColumnIsInTargetList(Query *query, JoinRestriction *joinRestriction, + RelationRestrictionContext *restrictionContext) +{ + Relids relids = joinRestriction->outerrelRelids; + int relationId = -1; + Index partitionKeyIndex = InvalidAttrNumber; + while ((relationId = bms_next_member(relids, relationId)) >= 0) + { + RangeTblEntry *rte = joinRestriction->plannerInfo->simple_rte_array[relationId]; + if (rte->rtekind != RTE_RELATION) + { + /* skip if it is not a relation */ + continue; + } + int targetRTEIndex = GetRTEIdentity(rte); + PartitionKeyForRTEIdentityInQuery(query, targetRTEIndex, + &partitionKeyIndex); + if (partitionKeyIndex == 0) + { + /* partition key is not in the target list */ + return false; + } + } + return true; } diff --git a/src/test/regress/citus_tests/run_test.py b/src/test/regress/citus_tests/run_test.py index 193bdf09f..fff261372 100755 --- a/src/test/regress/citus_tests/run_test.py +++ b/src/test/regress/citus_tests/run_test.py @@ -226,6 +226,15 @@ DEPS = { repeatable=False, ), "pg17": TestDeps("minimal_schedule", ["multi_behavioral_analytics_create_table"]), + "multi_subquery_misc": TestDeps( + "minimal_schedule", ["multi_behavioral_analytics_create_table"] + ), + "multi_subquery_union": TestDeps( + "minimal_schedule", ["multi_behavioral_analytics_create_table"] + ), + "multi_subquery_in_where_clause": TestDeps( + "minimal_schedule", ["multi_behavioral_analytics_create_table"] + ), } diff --git a/src/test/regress/expected/multi_insert_select_behavioral_analytics_create_table.out b/src/test/regress/expected/multi_insert_select_behavioral_analytics_create_table.out new file mode 100644 index 000000000..e69de29bb diff --git a/src/test/regress/expected/multi_subquery_in_where_clause.out b/src/test/regress/expected/multi_subquery_in_where_clause.out index 834cef505..c6c5a2b2a 100644 --- a/src/test/regress/expected/multi_subquery_in_where_clause.out +++ b/src/test/regress/expected/multi_subquery_in_where_clause.out @@ -1,7 +1,7 @@ -- -- multi subquery in where queries aims to expand existing subquery pushdown -- regression tests to cover more cases specifically subqueries in WHERE clause --- the tables that are used depends to multi_insert_select_behavioral_analytics_create_table.sql +-- the tables that are used depends to multi_behavioral_analytics_create_table.sql -- -- We don't need shard id sequence here, so commented out to prevent conflicts with concurrent tests -- subqueries in WHERE with greater operator diff --git a/src/test/regress/expected/multi_subquery_misc.out b/src/test/regress/expected/multi_subquery_misc.out index 3c8abc67d..32f5ab801 100644 --- a/src/test/regress/expected/multi_subquery_misc.out +++ b/src/test/regress/expected/multi_subquery_misc.out @@ -2,7 +2,7 @@ -- (i) Prepared statements -- (ii) PL/PGSQL functions -- (iii) SQL functions --- the tables that are used depends to multi_insert_select_behavioral_analytics_create_table.sql +-- the tables that are used depends to multi_behavioral_analytics_create_table.sql -- We don't need shard id sequence here, so commented out to prevent conflicts with concurrent tests SET citus.enable_router_execution TO false; PREPARE prepared_subquery_1 AS @@ -352,6 +352,7 @@ ORDER BY 2 DESC; -- Similar to the above queries, but -- this time the joins are not removed because -- target list contains all the entries +SET citus.enable_router_execution TO true; SELECT * FROM users_table t1 diff --git a/src/test/regress/expected/multi_subquery_union.out b/src/test/regress/expected/multi_subquery_union.out index 2206e5a4a..7dfd389b3 100644 --- a/src/test/regress/expected/multi_subquery_union.out +++ b/src/test/regress/expected/multi_subquery_union.out @@ -1,7 +1,7 @@ -- -- multi subquery toplevel union queries aims to expand existing subquery pushdown -- regression tests to cover more cases --- the tables that are used depends to multi_insert_select_behavioral_analytics_create_table.sql +-- the tables that are used depends to multi_behavioral_analytics_create_table.sql -- We don't need shard id sequence here, so commented out to prevent conflicts with concurrent tests -- SET citus.next_shard_id TO 1400000; -- a very simple union query @@ -1246,5 +1246,3 @@ SELECT user_id FROM users_table UNION SELECT u.user_id FROM users_table, users_udf() u; ERROR: cannot perform distributed planning on this query because parameterized queries for SQL functions referencing distributed tables are not supported HINT: Consider using PL/pgSQL functions instead. -DROP TABLE events_reference_table; -DROP TABLE users_reference_table; diff --git a/src/test/regress/expected/union_pushdown.out b/src/test/regress/expected/union_pushdown.out index 4ae83c972..bd078b1fb 100644 --- a/src/test/regress/expected/union_pushdown.out +++ b/src/test/regress/expected/union_pushdown.out @@ -1469,5 +1469,140 @@ $$); f (1 row) +CREATE TABLE dist1 (a int, b int); +CREATE TABLE dist2 (a int, b int); +SET citus.shard_count to 4; +SELECT create_distributed_table('dist1', 'a'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SELECT create_distributed_table('dist2', 'a'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO dist1 VALUES (1, 1), (2, 1), (3, 1), (4, 1), (5, 1); +INSERT INTO dist2 VALUES (5, 2), (6, 2), (7, 2), (8, 2), (9, 2); +-- safe to pushdown +SELECT * FROM +( + SELECT * FROM dist1 JOIN dist2 USING (a) + UNION + SELECT * FROM dist1 JOIN dist2 USING (a) +) AS t1 ORDER BY 1; + a | b | b +--------------------------------------------------------------------- + 5 | 1 | 2 +(1 row) + +-- not safe to pushdown, the distribution key from the outer part of the outer join is not in the target list +SELECT * FROM +( + SELECT dist2.a FROM dist1 LEFT JOIN dist2 USING (a) + UNION + SELECT dist2.a FROM dist2 +) AS t1 ORDER BY 1; + a +--------------------------------------------------------------------- + 5 + 6 + 7 + 8 + 9 + +(6 rows) + +set client_min_messages to DEBUG3; +-- not safe to pushdown, as is, sub-plan is generated +-- the distribution key from the outer part of the outer join is not in the target list +SELECT * FROM +( + SELECT dist1.a FROM dist1 RIGHT JOIN dist2 USING (a) + UNION + SELECT dist2.a FROM dist2 +) AS t1 ORDER BY 1; +DEBUG: no shard pruning constraints on dist1 found +DEBUG: shard count after pruning for dist1: 4 +DEBUG: no shard pruning constraints on dist2 found +DEBUG: shard count after pruning for dist2: 4 +DEBUG: no shard pruning constraints on dist2 found +DEBUG: shard count after pruning for dist2: 4 +DEBUG: Router planner cannot handle multi-shard select queries +DEBUG: no shard pruning constraints on dist1 found +DEBUG: shard count after pruning for dist1: 4 +DEBUG: no shard pruning constraints on dist2 found +DEBUG: shard count after pruning for dist2: 4 +DEBUG: Router planner cannot handle multi-shard select queries +DEBUG: no shard pruning constraints on dist1 found +DEBUG: shard count after pruning for dist1: 4 +DEBUG: no shard pruning constraints on dist2 found +DEBUG: shard count after pruning for dist2: 4 +DEBUG: assigned task to node localhost:xxxxx +DEBUG: assigned task to node localhost:xxxxx +DEBUG: assigned task to node localhost:xxxxx +DEBUG: assigned task to node localhost:xxxxx +DEBUG: generating subplan XXX_1 for subquery SELECT dist1.a FROM (union_pushdown.dist1 RIGHT JOIN union_pushdown.dist2 USING (a)) +DEBUG: no shard pruning constraints on dist2 found +DEBUG: shard count after pruning for dist2: 4 +DEBUG: Router planner cannot handle multi-shard select queries +DEBUG: no shard pruning constraints on dist2 found +DEBUG: shard count after pruning for dist2: 4 +DEBUG: assigned task to node localhost:xxxxx +DEBUG: assigned task to node localhost:xxxxx +DEBUG: assigned task to node localhost:xxxxx +DEBUG: assigned task to node localhost:xxxxx +DEBUG: generating subplan XXX_2 for subquery SELECT a FROM union_pushdown.dist2 +DEBUG: Creating router plan +DEBUG: generating subplan XXX_3 for subquery SELECT intermediate_result.a FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer) UNION SELECT intermediate_result.a FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(a integer) +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT a FROM (SELECT intermediate_result.a FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(a integer)) t1 ORDER BY a +DEBUG: Creating router plan + a +--------------------------------------------------------------------- + 5 + 6 + 7 + 8 + 9 + +(6 rows) + +-- safe to pushdown, the distribution key from the outer side of the RIGHT join is in the target list +SELECT * FROM +( + SELECT dist2.a + FROM dist1 RIGHT JOIN dist2 USING (a) + UNION + SELECT dist2.a FROM dist2 +) AS t1 +ORDER BY 1; +DEBUG: no shard pruning constraints on dist1 found +DEBUG: shard count after pruning for dist1: 4 +DEBUG: no shard pruning constraints on dist2 found +DEBUG: shard count after pruning for dist2: 4 +DEBUG: no shard pruning constraints on dist2 found +DEBUG: shard count after pruning for dist2: 4 +DEBUG: Router planner cannot handle multi-shard select queries +DEBUG: no shard pruning constraints on dist1 found +DEBUG: shard count after pruning for dist1: 4 +DEBUG: no shard pruning constraints on dist2 found +DEBUG: shard count after pruning for dist2: 4 +DEBUG: no shard pruning constraints on dist2 found +DEBUG: shard count after pruning for dist2: 4 +DEBUG: assigned task to node localhost:xxxxx +DEBUG: assigned task to node localhost:xxxxx +DEBUG: assigned task to node localhost:xxxxx +DEBUG: assigned task to node localhost:xxxxx + a +--------------------------------------------------------------------- + 5 + 6 + 7 + 8 + 9 +(5 rows) + SET client_min_messages TO WARNING; DROP SCHEMA union_pushdown CASCADE; diff --git a/src/test/regress/sql/multi_subquery_in_where_clause.sql b/src/test/regress/sql/multi_subquery_in_where_clause.sql index ecd4cbffa..0c7343627 100644 --- a/src/test/regress/sql/multi_subquery_in_where_clause.sql +++ b/src/test/regress/sql/multi_subquery_in_where_clause.sql @@ -1,7 +1,7 @@ -- -- multi subquery in where queries aims to expand existing subquery pushdown -- regression tests to cover more cases specifically subqueries in WHERE clause --- the tables that are used depends to multi_insert_select_behavioral_analytics_create_table.sql +-- the tables that are used depends to multi_behavioral_analytics_create_table.sql -- -- We don't need shard id sequence here, so commented out to prevent conflicts with concurrent tests diff --git a/src/test/regress/sql/multi_subquery_misc.sql b/src/test/regress/sql/multi_subquery_misc.sql index 4b81491b1..2cdcc810a 100644 --- a/src/test/regress/sql/multi_subquery_misc.sql +++ b/src/test/regress/sql/multi_subquery_misc.sql @@ -3,7 +3,7 @@ -- (ii) PL/PGSQL functions -- (iii) SQL functions --- the tables that are used depends to multi_insert_select_behavioral_analytics_create_table.sql +-- the tables that are used depends to multi_behavioral_analytics_create_table.sql -- We don't need shard id sequence here, so commented out to prevent conflicts with concurrent tests SET citus.enable_router_execution TO false; @@ -213,6 +213,7 @@ ORDER BY 2 DESC; -- Similar to the above queries, but -- this time the joins are not removed because -- target list contains all the entries +SET citus.enable_router_execution TO true; SELECT * FROM users_table t1 diff --git a/src/test/regress/sql/multi_subquery_union.sql b/src/test/regress/sql/multi_subquery_union.sql index 3d35609e5..d4407646a 100644 --- a/src/test/regress/sql/multi_subquery_union.sql +++ b/src/test/regress/sql/multi_subquery_union.sql @@ -1,7 +1,7 @@ -- -- multi subquery toplevel union queries aims to expand existing subquery pushdown -- regression tests to cover more cases --- the tables that are used depends to multi_insert_select_behavioral_analytics_create_table.sql +-- the tables that are used depends to multi_behavioral_analytics_create_table.sql -- We don't need shard id sequence here, so commented out to prevent conflicts with concurrent tests -- SET citus.next_shard_id TO 1400000; @@ -898,6 +898,3 @@ LANGUAGE sql stable; SELECT user_id FROM users_table UNION SELECT u.user_id FROM users_table, users_udf() u; - -DROP TABLE events_reference_table; -DROP TABLE users_reference_table; diff --git a/src/test/regress/sql/union_pushdown.sql b/src/test/regress/sql/union_pushdown.sql index 1bb63eb62..09bf218d3 100644 --- a/src/test/regress/sql/union_pushdown.sql +++ b/src/test/regress/sql/union_pushdown.sql @@ -1109,5 +1109,49 @@ SELECT k, COUNT(*) FROM v GROUP BY k ORDER BY k; $$); +CREATE TABLE dist1 (a int, b int); +CREATE TABLE dist2 (a int, b int); +SET citus.shard_count to 4; +SELECT create_distributed_table('dist1', 'a'); +SELECT create_distributed_table('dist2', 'a'); +INSERT INTO dist1 VALUES (1, 1), (2, 1), (3, 1), (4, 1), (5, 1); +INSERT INTO dist2 VALUES (5, 2), (6, 2), (7, 2), (8, 2), (9, 2); + +-- safe to pushdown +SELECT * FROM +( + SELECT * FROM dist1 JOIN dist2 USING (a) + UNION + SELECT * FROM dist1 JOIN dist2 USING (a) +) AS t1 ORDER BY 1; + +-- not safe to pushdown, the distribution key from the outer part of the outer join is not in the target list +SELECT * FROM +( + SELECT dist2.a FROM dist1 LEFT JOIN dist2 USING (a) + UNION + SELECT dist2.a FROM dist2 +) AS t1 ORDER BY 1; + +set client_min_messages to DEBUG3; +-- not safe to pushdown, as is, sub-plan is generated +-- the distribution key from the outer part of the outer join is not in the target list +SELECT * FROM +( + SELECT dist1.a FROM dist1 RIGHT JOIN dist2 USING (a) + UNION + SELECT dist2.a FROM dist2 +) AS t1 ORDER BY 1; + +-- safe to pushdown, the distribution key from the outer side of the RIGHT join is in the target list +SELECT * FROM +( + SELECT dist2.a + FROM dist1 RIGHT JOIN dist2 USING (a) + UNION + SELECT dist2.a FROM dist2 +) AS t1 +ORDER BY 1; + SET client_min_messages TO WARNING; DROP SCHEMA union_pushdown CASCADE;