Check outer table partition column (#8092)

DESCRIPTION: Introduce a new check to push down a query including union
and outer join to fix #8091 .

In "SafeToPushdownUnionSubquery", we check if the distribution column of
the outer relation is in the target list.
pull/7670/head^2
eaydingol 2025-08-06 16:13:14 +03:00 committed by GitHub
parent f0789bd388
commit 3d8fd337e5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
11 changed files with 278 additions and 11 deletions

View File

@ -171,6 +171,14 @@ static bool FindQueryContainingRTEIdentityInternal(Node *node,
static int ParentCountPriorToAppendRel(List *appendRelList, AppendRelInfo *appendRelInfo); static int ParentCountPriorToAppendRel(List *appendRelList, AppendRelInfo *appendRelInfo);
static bool PartitionColumnSelectedForOuterJoin(Query *query,
RelationRestrictionContext *
restrictionContext,
JoinRestrictionContext *
joinRestrictionContext);
static bool PartitionColumnIsInTargetList(Query *query, JoinRestriction *joinRestriction,
RelationRestrictionContext *restrictionContext);
/* /*
* AllDistributionKeysInQueryAreEqual returns true if either * AllDistributionKeysInQueryAreEqual returns true if either
@ -391,6 +399,80 @@ SafeToPushdownUnionSubquery(Query *originalQuery,
return false; return false;
} }
if (!PartitionColumnSelectedForOuterJoin(originalQuery,
restrictionContext,
joinRestrictionContext))
{
/* outer join does not select partition column of outer relation */
return false;
}
return true;
}
/*
* PartitionColumnSelectedForOuterJoin checks whether the partition column of
* the outer relation is selected in the target list of the query.
*
* If there is no outer join, it returns true.
*/
static bool
PartitionColumnSelectedForOuterJoin(Query *query,
RelationRestrictionContext *restrictionContext,
JoinRestrictionContext *joinRestrictionContext)
{
ListCell *joinRestrictionCell;
foreach(joinRestrictionCell, joinRestrictionContext->joinRestrictionList)
{
JoinRestriction *joinRestriction = (JoinRestriction *) lfirst(
joinRestrictionCell);
/* Restriction context includes alternative plans, sufficient to check for left joins.*/
if (joinRestriction->joinType != JOIN_LEFT)
{
continue;
}
if (!PartitionColumnIsInTargetList(query, joinRestriction, restrictionContext))
{
/* outer join does not select partition column of outer relation */
return false;
}
}
return true;
}
/*
* PartitionColumnIsInTargetList checks whether the partition column of
* the given relation is included in the target list of the query.
*/
static bool
PartitionColumnIsInTargetList(Query *query, JoinRestriction *joinRestriction,
RelationRestrictionContext *restrictionContext)
{
Relids relids = joinRestriction->outerrelRelids;
int relationId = -1;
Index partitionKeyIndex = InvalidAttrNumber;
while ((relationId = bms_next_member(relids, relationId)) >= 0)
{
RangeTblEntry *rte = joinRestriction->plannerInfo->simple_rte_array[relationId];
if (rte->rtekind != RTE_RELATION)
{
/* skip if it is not a relation */
continue;
}
int targetRTEIndex = GetRTEIdentity(rte);
PartitionKeyForRTEIdentityInQuery(query, targetRTEIndex,
&partitionKeyIndex);
if (partitionKeyIndex == 0)
{
/* partition key is not in the target list */
return false;
}
}
return true; return true;
} }

View File

@ -226,6 +226,15 @@ DEPS = {
repeatable=False, repeatable=False,
), ),
"pg17": TestDeps("minimal_schedule", ["multi_behavioral_analytics_create_table"]), "pg17": TestDeps("minimal_schedule", ["multi_behavioral_analytics_create_table"]),
"multi_subquery_misc": TestDeps(
"minimal_schedule", ["multi_behavioral_analytics_create_table"]
),
"multi_subquery_union": TestDeps(
"minimal_schedule", ["multi_behavioral_analytics_create_table"]
),
"multi_subquery_in_where_clause": TestDeps(
"minimal_schedule", ["multi_behavioral_analytics_create_table"]
),
} }

View File

@ -1,7 +1,7 @@
-- --
-- multi subquery in where queries aims to expand existing subquery pushdown -- multi subquery in where queries aims to expand existing subquery pushdown
-- regression tests to cover more cases specifically subqueries in WHERE clause -- regression tests to cover more cases specifically subqueries in WHERE clause
-- the tables that are used depends to multi_insert_select_behavioral_analytics_create_table.sql -- the tables that are used depends to multi_behavioral_analytics_create_table.sql
-- --
-- We don't need shard id sequence here, so commented out to prevent conflicts with concurrent tests -- We don't need shard id sequence here, so commented out to prevent conflicts with concurrent tests
-- subqueries in WHERE with greater operator -- subqueries in WHERE with greater operator

View File

@ -2,7 +2,7 @@
-- (i) Prepared statements -- (i) Prepared statements
-- (ii) PL/PGSQL functions -- (ii) PL/PGSQL functions
-- (iii) SQL functions -- (iii) SQL functions
-- the tables that are used depends to multi_insert_select_behavioral_analytics_create_table.sql -- the tables that are used depends to multi_behavioral_analytics_create_table.sql
-- We don't need shard id sequence here, so commented out to prevent conflicts with concurrent tests -- We don't need shard id sequence here, so commented out to prevent conflicts with concurrent tests
SET citus.enable_router_execution TO false; SET citus.enable_router_execution TO false;
PREPARE prepared_subquery_1 AS PREPARE prepared_subquery_1 AS
@ -352,6 +352,7 @@ ORDER BY 2 DESC;
-- Similar to the above queries, but -- Similar to the above queries, but
-- this time the joins are not removed because -- this time the joins are not removed because
-- target list contains all the entries -- target list contains all the entries
SET citus.enable_router_execution TO true;
SELECT SELECT
* *
FROM users_table t1 FROM users_table t1

View File

@ -1,7 +1,7 @@
-- --
-- multi subquery toplevel union queries aims to expand existing subquery pushdown -- multi subquery toplevel union queries aims to expand existing subquery pushdown
-- regression tests to cover more cases -- regression tests to cover more cases
-- the tables that are used depends to multi_insert_select_behavioral_analytics_create_table.sql -- the tables that are used depends to multi_behavioral_analytics_create_table.sql
-- We don't need shard id sequence here, so commented out to prevent conflicts with concurrent tests -- We don't need shard id sequence here, so commented out to prevent conflicts with concurrent tests
-- SET citus.next_shard_id TO 1400000; -- SET citus.next_shard_id TO 1400000;
-- a very simple union query -- a very simple union query
@ -1246,5 +1246,3 @@ SELECT user_id FROM users_table
UNION SELECT u.user_id FROM users_table, users_udf() u; UNION SELECT u.user_id FROM users_table, users_udf() u;
ERROR: cannot perform distributed planning on this query because parameterized queries for SQL functions referencing distributed tables are not supported ERROR: cannot perform distributed planning on this query because parameterized queries for SQL functions referencing distributed tables are not supported
HINT: Consider using PL/pgSQL functions instead. HINT: Consider using PL/pgSQL functions instead.
DROP TABLE events_reference_table;
DROP TABLE users_reference_table;

View File

@ -1469,5 +1469,140 @@ $$);
f f
(1 row) (1 row)
CREATE TABLE dist1 (a int, b int);
CREATE TABLE dist2 (a int, b int);
SET citus.shard_count to 4;
SELECT create_distributed_table('dist1', 'a');
create_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT create_distributed_table('dist2', 'a');
create_distributed_table
---------------------------------------------------------------------
(1 row)
INSERT INTO dist1 VALUES (1, 1), (2, 1), (3, 1), (4, 1), (5, 1);
INSERT INTO dist2 VALUES (5, 2), (6, 2), (7, 2), (8, 2), (9, 2);
-- safe to pushdown
SELECT * FROM
(
SELECT * FROM dist1 JOIN dist2 USING (a)
UNION
SELECT * FROM dist1 JOIN dist2 USING (a)
) AS t1 ORDER BY 1;
a | b | b
---------------------------------------------------------------------
5 | 1 | 2
(1 row)
-- not safe to pushdown, the distribution key from the outer part of the outer join is not in the target list
SELECT * FROM
(
SELECT dist2.a FROM dist1 LEFT JOIN dist2 USING (a)
UNION
SELECT dist2.a FROM dist2
) AS t1 ORDER BY 1;
a
---------------------------------------------------------------------
5
6
7
8
9
(6 rows)
set client_min_messages to DEBUG3;
-- not safe to pushdown, as is, sub-plan is generated
-- the distribution key from the outer part of the outer join is not in the target list
SELECT * FROM
(
SELECT dist1.a FROM dist1 RIGHT JOIN dist2 USING (a)
UNION
SELECT dist2.a FROM dist2
) AS t1 ORDER BY 1;
DEBUG: no shard pruning constraints on dist1 found
DEBUG: shard count after pruning for dist1: 4
DEBUG: no shard pruning constraints on dist2 found
DEBUG: shard count after pruning for dist2: 4
DEBUG: no shard pruning constraints on dist2 found
DEBUG: shard count after pruning for dist2: 4
DEBUG: Router planner cannot handle multi-shard select queries
DEBUG: no shard pruning constraints on dist1 found
DEBUG: shard count after pruning for dist1: 4
DEBUG: no shard pruning constraints on dist2 found
DEBUG: shard count after pruning for dist2: 4
DEBUG: Router planner cannot handle multi-shard select queries
DEBUG: no shard pruning constraints on dist1 found
DEBUG: shard count after pruning for dist1: 4
DEBUG: no shard pruning constraints on dist2 found
DEBUG: shard count after pruning for dist2: 4
DEBUG: assigned task to node localhost:xxxxx
DEBUG: assigned task to node localhost:xxxxx
DEBUG: assigned task to node localhost:xxxxx
DEBUG: assigned task to node localhost:xxxxx
DEBUG: generating subplan XXX_1 for subquery SELECT dist1.a FROM (union_pushdown.dist1 RIGHT JOIN union_pushdown.dist2 USING (a))
DEBUG: no shard pruning constraints on dist2 found
DEBUG: shard count after pruning for dist2: 4
DEBUG: Router planner cannot handle multi-shard select queries
DEBUG: no shard pruning constraints on dist2 found
DEBUG: shard count after pruning for dist2: 4
DEBUG: assigned task to node localhost:xxxxx
DEBUG: assigned task to node localhost:xxxxx
DEBUG: assigned task to node localhost:xxxxx
DEBUG: assigned task to node localhost:xxxxx
DEBUG: generating subplan XXX_2 for subquery SELECT a FROM union_pushdown.dist2
DEBUG: Creating router plan
DEBUG: generating subplan XXX_3 for subquery SELECT intermediate_result.a FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer) UNION SELECT intermediate_result.a FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(a integer)
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT a FROM (SELECT intermediate_result.a FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(a integer)) t1 ORDER BY a
DEBUG: Creating router plan
a
---------------------------------------------------------------------
5
6
7
8
9
(6 rows)
-- safe to pushdown, the distribution key from the outer side of the RIGHT join is in the target list
SELECT * FROM
(
SELECT dist2.a
FROM dist1 RIGHT JOIN dist2 USING (a)
UNION
SELECT dist2.a FROM dist2
) AS t1
ORDER BY 1;
DEBUG: no shard pruning constraints on dist1 found
DEBUG: shard count after pruning for dist1: 4
DEBUG: no shard pruning constraints on dist2 found
DEBUG: shard count after pruning for dist2: 4
DEBUG: no shard pruning constraints on dist2 found
DEBUG: shard count after pruning for dist2: 4
DEBUG: Router planner cannot handle multi-shard select queries
DEBUG: no shard pruning constraints on dist1 found
DEBUG: shard count after pruning for dist1: 4
DEBUG: no shard pruning constraints on dist2 found
DEBUG: shard count after pruning for dist2: 4
DEBUG: no shard pruning constraints on dist2 found
DEBUG: shard count after pruning for dist2: 4
DEBUG: assigned task to node localhost:xxxxx
DEBUG: assigned task to node localhost:xxxxx
DEBUG: assigned task to node localhost:xxxxx
DEBUG: assigned task to node localhost:xxxxx
a
---------------------------------------------------------------------
5
6
7
8
9
(5 rows)
SET client_min_messages TO WARNING; SET client_min_messages TO WARNING;
DROP SCHEMA union_pushdown CASCADE; DROP SCHEMA union_pushdown CASCADE;

View File

@ -1,7 +1,7 @@
-- --
-- multi subquery in where queries aims to expand existing subquery pushdown -- multi subquery in where queries aims to expand existing subquery pushdown
-- regression tests to cover more cases specifically subqueries in WHERE clause -- regression tests to cover more cases specifically subqueries in WHERE clause
-- the tables that are used depends to multi_insert_select_behavioral_analytics_create_table.sql -- the tables that are used depends to multi_behavioral_analytics_create_table.sql
-- --
-- We don't need shard id sequence here, so commented out to prevent conflicts with concurrent tests -- We don't need shard id sequence here, so commented out to prevent conflicts with concurrent tests

View File

@ -3,7 +3,7 @@
-- (ii) PL/PGSQL functions -- (ii) PL/PGSQL functions
-- (iii) SQL functions -- (iii) SQL functions
-- the tables that are used depends to multi_insert_select_behavioral_analytics_create_table.sql -- the tables that are used depends to multi_behavioral_analytics_create_table.sql
-- We don't need shard id sequence here, so commented out to prevent conflicts with concurrent tests -- We don't need shard id sequence here, so commented out to prevent conflicts with concurrent tests
SET citus.enable_router_execution TO false; SET citus.enable_router_execution TO false;
@ -213,6 +213,7 @@ ORDER BY 2 DESC;
-- Similar to the above queries, but -- Similar to the above queries, but
-- this time the joins are not removed because -- this time the joins are not removed because
-- target list contains all the entries -- target list contains all the entries
SET citus.enable_router_execution TO true;
SELECT SELECT
* *
FROM users_table t1 FROM users_table t1

View File

@ -1,7 +1,7 @@
-- --
-- multi subquery toplevel union queries aims to expand existing subquery pushdown -- multi subquery toplevel union queries aims to expand existing subquery pushdown
-- regression tests to cover more cases -- regression tests to cover more cases
-- the tables that are used depends to multi_insert_select_behavioral_analytics_create_table.sql -- the tables that are used depends to multi_behavioral_analytics_create_table.sql
-- We don't need shard id sequence here, so commented out to prevent conflicts with concurrent tests -- We don't need shard id sequence here, so commented out to prevent conflicts with concurrent tests
-- SET citus.next_shard_id TO 1400000; -- SET citus.next_shard_id TO 1400000;
@ -898,6 +898,3 @@ LANGUAGE sql stable;
SELECT user_id FROM users_table SELECT user_id FROM users_table
UNION SELECT u.user_id FROM users_table, users_udf() u; UNION SELECT u.user_id FROM users_table, users_udf() u;
DROP TABLE events_reference_table;
DROP TABLE users_reference_table;

View File

@ -1109,5 +1109,49 @@ SELECT k, COUNT(*) FROM v GROUP BY k ORDER BY k;
$$); $$);
CREATE TABLE dist1 (a int, b int);
CREATE TABLE dist2 (a int, b int);
SET citus.shard_count to 4;
SELECT create_distributed_table('dist1', 'a');
SELECT create_distributed_table('dist2', 'a');
INSERT INTO dist1 VALUES (1, 1), (2, 1), (3, 1), (4, 1), (5, 1);
INSERT INTO dist2 VALUES (5, 2), (6, 2), (7, 2), (8, 2), (9, 2);
-- safe to pushdown
SELECT * FROM
(
SELECT * FROM dist1 JOIN dist2 USING (a)
UNION
SELECT * FROM dist1 JOIN dist2 USING (a)
) AS t1 ORDER BY 1;
-- not safe to pushdown, the distribution key from the outer part of the outer join is not in the target list
SELECT * FROM
(
SELECT dist2.a FROM dist1 LEFT JOIN dist2 USING (a)
UNION
SELECT dist2.a FROM dist2
) AS t1 ORDER BY 1;
set client_min_messages to DEBUG3;
-- not safe to pushdown, as is, sub-plan is generated
-- the distribution key from the outer part of the outer join is not in the target list
SELECT * FROM
(
SELECT dist1.a FROM dist1 RIGHT JOIN dist2 USING (a)
UNION
SELECT dist2.a FROM dist2
) AS t1 ORDER BY 1;
-- safe to pushdown, the distribution key from the outer side of the RIGHT join is in the target list
SELECT * FROM
(
SELECT dist2.a
FROM dist1 RIGHT JOIN dist2 USING (a)
UNION
SELECT dist2.a FROM dist2
) AS t1
ORDER BY 1;
SET client_min_messages TO WARNING; SET client_min_messages TO WARNING;
DROP SCHEMA union_pushdown CASCADE; DROP SCHEMA union_pushdown CASCADE;