mirror of https://github.com/citusdata/citus.git
Fix left join shard pruning (#3569)
DESCRIPTION: Fix left join shard pruning in pushdown planner Due to #2481 which moves outer join planning through the pushdown planner we caused a regression on the shard pruning behaviour for outer joins. In the pushdown planner we make a union of the placement groups for all shards accessed by a query based on the filters we see during planning. Unfortunately implicit filters for left joins are not available during this part. This causes the inner part of an outer join to not prune any shards away. When we take the union of the placement groups it shows the behaviour of not having any shards pruned. Since the inner part of an outer query will not return any rows if the outer part does not contain any rows we have observed we do not have to add the shard intervals of the inner part of an outer query to the list of shard intervals to query. Fixes: #3512pull/3614/head
parent
a14739f808
commit
e5237b9e20
|
@ -144,6 +144,7 @@ static ArrayType * SplitPointObject(ShardInterval **shardIntervalArray,
|
|||
static bool DistributedPlanRouterExecutable(DistributedPlan *distributedPlan);
|
||||
static Job * BuildJobTreeTaskList(Job *jobTree,
|
||||
PlannerRestrictionContext *plannerRestrictionContext);
|
||||
static bool IsInnerTableOfOuterJoin(RelationRestriction *relationRestriction);
|
||||
static void ErrorIfUnsupportedShardDistribution(Query *query);
|
||||
static Task * QueryPushdownTaskCreate(Query *originalQuery, int shardIndex,
|
||||
RelationRestrictionContext *restrictionContext,
|
||||
|
@ -2201,6 +2202,21 @@ QueryPushdownSqlTaskList(Query *query, uint64 jobId,
|
|||
maxShardOffset = -1;
|
||||
}
|
||||
|
||||
/*
|
||||
* For left joins we don't care about the shards pruned for the right hand side.
|
||||
* If the right hand side would prune to a smaller set we should still send it to
|
||||
* all tables of the left hand side. However if the right hand side is bigger than
|
||||
* the left hand side we don't have to send the query to any shard that is not
|
||||
* matching anything on the left hand side.
|
||||
*
|
||||
* Instead we will simply skip any RelationRestriction if it is an OUTER join and
|
||||
* the table is part of the non-outer side of the join.
|
||||
*/
|
||||
if (IsInnerTableOfOuterJoin(relationRestriction))
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
foreach(shardIntervalCell, prunedShardList)
|
||||
{
|
||||
ShardInterval *shardInterval = (ShardInterval *) lfirst(shardIntervalCell);
|
||||
|
@ -2208,15 +2224,8 @@ QueryPushdownSqlTaskList(Query *query, uint64 jobId,
|
|||
|
||||
taskRequiredForShardIndex[shardIndex] = true;
|
||||
|
||||
if (shardIndex < minShardOffset)
|
||||
{
|
||||
minShardOffset = shardIndex;
|
||||
}
|
||||
|
||||
if (shardIndex > maxShardOffset)
|
||||
{
|
||||
maxShardOffset = shardIndex;
|
||||
}
|
||||
minShardOffset = Min(minShardOffset, shardIndex);
|
||||
maxShardOffset = Max(maxShardOffset, shardIndex);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -2266,6 +2275,45 @@ QueryPushdownSqlTaskList(Query *query, uint64 jobId,
|
|||
}
|
||||
|
||||
|
||||
/*
|
||||
* IsInnerTableOfOuterJoin tests based on the join information envoded in a
|
||||
* RelationRestriction if the table accessed for this relation is
|
||||
* a) in an outer join
|
||||
* b) on the inner part of said join
|
||||
*
|
||||
* The function returns true only if both conditions above hold true
|
||||
*/
|
||||
static bool
|
||||
IsInnerTableOfOuterJoin(RelationRestriction *relationRestriction)
|
||||
{
|
||||
RestrictInfo *joinInfo = NULL;
|
||||
foreach_ptr(joinInfo, relationRestriction->relOptInfo->joininfo)
|
||||
{
|
||||
if (joinInfo->outer_relids == NULL)
|
||||
{
|
||||
/* not an outer join */
|
||||
continue;
|
||||
}
|
||||
|
||||
/*
|
||||
* This join restriction info describes an outer join, we need to figure out if
|
||||
* our table is in the non outer part of this join. If that is the case this is a
|
||||
* non outer table of an outer join.
|
||||
*/
|
||||
bool isInOuter = bms_is_member(relationRestriction->relOptInfo->relid,
|
||||
joinInfo->outer_relids);
|
||||
if (!isInOuter)
|
||||
{
|
||||
/* this table is joined in the inner part of an outer join */
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
/* we have not found any join clause that satisfies both requirements */
|
||||
return false;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* ErrorIfUnsupportedShardDistribution gets list of relations in the given query
|
||||
* and checks if two conditions below hold for them, otherwise it errors out.
|
||||
|
|
|
@ -29,6 +29,30 @@ SELECT create_distributed_table('orders_hash_partitioned', 'o_orderkey');
|
|||
|
||||
(1 row)
|
||||
|
||||
CREATE TABLE lineitem_hash_partitioned (
|
||||
l_orderkey integer not null,
|
||||
l_partkey integer not null,
|
||||
l_suppkey integer not null,
|
||||
l_linenumber integer not null,
|
||||
l_quantity decimal(15, 2) not null,
|
||||
l_extendedprice decimal(15, 2) not null,
|
||||
l_discount decimal(15, 2) not null,
|
||||
l_tax decimal(15, 2) not null,
|
||||
l_returnflag char(1) not null,
|
||||
l_linestatus char(1) not null,
|
||||
l_shipdate date not null,
|
||||
l_commitdate date not null,
|
||||
l_receiptdate date not null,
|
||||
l_shipinstruct char(25) not null,
|
||||
l_shipmode char(10) not null,
|
||||
l_comment varchar(44) not null,
|
||||
PRIMARY KEY(l_orderkey, l_linenumber) );
|
||||
SELECT create_distributed_table('lineitem_hash_partitioned', 'l_orderkey');
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
INSERT INTO orders_hash_partitioned (o_orderkey, o_custkey, o_totalprice, o_shippriority, o_clerk) VALUES
|
||||
(1, 11, 10, 111, 'aaa'),
|
||||
(2, 22, 20, 222, 'bbb'),
|
||||
|
@ -1116,5 +1140,127 @@ DEBUG: assigned task to node localhost:xxxxx
|
|||
3
|
||||
(1 row)
|
||||
|
||||
SET citus.task_executor_type TO DEFAULT;
|
||||
SET client_min_messages TO DEFAULT;
|
||||
-- left joins should prune shards based on the left hand side of the left join
|
||||
-- it should only assign 2 tasks as there is a filter on the left table pruning to 2
|
||||
-- shards
|
||||
EXPLAIN (COSTS OFF)
|
||||
SELECT count(*)
|
||||
FROM orders_hash_partitioned
|
||||
LEFT JOIN lineitem_hash_partitioned ON (o_orderkey = l_orderkey)
|
||||
WHERE o_orderkey IN (1, 2);
|
||||
QUERY PLAN
|
||||
---------------------------------------------------------------------
|
||||
Aggregate
|
||||
-> Custom Scan (Citus Adaptive)
|
||||
Task Count: 2
|
||||
Tasks Shown: One of 2
|
||||
-> Task
|
||||
Node: host=localhost port=xxxxx dbname=regression
|
||||
-> Aggregate
|
||||
-> Nested Loop Left Join
|
||||
-> Seq Scan on orders_hash_partitioned_630000 orders_hash_partitioned
|
||||
Filter: (o_orderkey = ANY ('{1,2}'::integer[]))
|
||||
-> Index Only Scan using lineitem_hash_partitioned_pkey_630004 on lineitem_hash_partitioned_630004 lineitem_hash_partitioned
|
||||
Index Cond: (l_orderkey = orders_hash_partitioned.o_orderkey)
|
||||
(12 rows)
|
||||
|
||||
EXPLAIN (COSTS OFF)
|
||||
SELECT count(*)
|
||||
FROM orders_hash_partitioned
|
||||
INNER JOIN lineitem_hash_partitioned ON (o_orderkey = l_orderkey)
|
||||
WHERE o_orderkey IN (1, 2);
|
||||
QUERY PLAN
|
||||
---------------------------------------------------------------------
|
||||
Aggregate
|
||||
-> Custom Scan (Citus Adaptive)
|
||||
Task Count: 2
|
||||
Tasks Shown: One of 2
|
||||
-> Task
|
||||
Node: host=localhost port=xxxxx dbname=regression
|
||||
-> Aggregate
|
||||
-> Nested Loop
|
||||
-> Seq Scan on orders_hash_partitioned_630000 orders_hash_partitioned
|
||||
Filter: (o_orderkey = ANY ('{1,2}'::integer[]))
|
||||
-> Index Only Scan using lineitem_hash_partitioned_pkey_630004 on lineitem_hash_partitioned_630004 lineitem_hash_partitioned
|
||||
Index Cond: (l_orderkey = orders_hash_partitioned.o_orderkey)
|
||||
(12 rows)
|
||||
|
||||
-- same principle but on a right join
|
||||
EXPLAIN (COSTS OFF)
|
||||
SELECT count(*)
|
||||
FROM orders_hash_partitioned
|
||||
RIGHT JOIN lineitem_hash_partitioned ON (o_orderkey = l_orderkey)
|
||||
WHERE l_orderkey IN (1, 2);
|
||||
QUERY PLAN
|
||||
---------------------------------------------------------------------
|
||||
Aggregate
|
||||
-> Custom Scan (Citus Adaptive)
|
||||
Task Count: 2
|
||||
Tasks Shown: One of 2
|
||||
-> Task
|
||||
Node: host=localhost port=xxxxx dbname=regression
|
||||
-> Aggregate
|
||||
-> Hash Right Join
|
||||
Hash Cond: (orders_hash_partitioned.o_orderkey = lineitem_hash_partitioned.l_orderkey)
|
||||
-> Seq Scan on orders_hash_partitioned_630000 orders_hash_partitioned
|
||||
-> Hash
|
||||
-> Bitmap Heap Scan on lineitem_hash_partitioned_630004 lineitem_hash_partitioned
|
||||
Recheck Cond: (l_orderkey = ANY ('{1,2}'::integer[]))
|
||||
-> Bitmap Index Scan on lineitem_hash_partitioned_pkey_630004
|
||||
Index Cond: (l_orderkey = ANY ('{1,2}'::integer[]))
|
||||
(15 rows)
|
||||
|
||||
-- full outerjoin should only prune partitions that will not return any rows. In short it
|
||||
-- should cause a union of the FROM and FULL OUTER JOIN tables.
|
||||
EXPLAIN (COSTS OFF)
|
||||
SELECT count(*)
|
||||
FROM orders_hash_partitioned
|
||||
FULL OUTER JOIN lineitem_hash_partitioned ON (o_orderkey = l_orderkey)
|
||||
WHERE o_orderkey IN (1, 2)
|
||||
OR l_orderkey IN (2, 3);
|
||||
QUERY PLAN
|
||||
---------------------------------------------------------------------
|
||||
Aggregate
|
||||
-> Custom Scan (Citus Adaptive)
|
||||
Task Count: 4
|
||||
Tasks Shown: One of 4
|
||||
-> Task
|
||||
Node: host=localhost port=xxxxx dbname=regression
|
||||
-> Aggregate
|
||||
-> Hash Full Join
|
||||
Hash Cond: (orders_hash_partitioned.o_orderkey = lineitem_hash_partitioned.l_orderkey)
|
||||
Filter: ((orders_hash_partitioned.o_orderkey = ANY ('{1,2}'::integer[])) OR (lineitem_hash_partitioned.l_orderkey = ANY ('{2,3}'::integer[])))
|
||||
-> Seq Scan on orders_hash_partitioned_630000 orders_hash_partitioned
|
||||
-> Hash
|
||||
-> Seq Scan on lineitem_hash_partitioned_630004 lineitem_hash_partitioned
|
||||
(13 rows)
|
||||
|
||||
EXPLAIN (COSTS OFF)
|
||||
SELECT count(*)
|
||||
FROM orders_hash_partitioned
|
||||
FULL OUTER JOIN lineitem_hash_partitioned ON (o_orderkey = l_orderkey)
|
||||
WHERE o_orderkey IN (1, 2)
|
||||
AND l_orderkey IN (2, 3);
|
||||
QUERY PLAN
|
||||
---------------------------------------------------------------------
|
||||
Aggregate
|
||||
-> Custom Scan (Citus Adaptive)
|
||||
Task Count: 3
|
||||
Tasks Shown: One of 3
|
||||
-> Task
|
||||
Node: host=localhost port=xxxxx dbname=regression
|
||||
-> Aggregate
|
||||
-> Nested Loop
|
||||
Join Filter: (orders_hash_partitioned.o_orderkey = lineitem_hash_partitioned.l_orderkey)
|
||||
-> Seq Scan on orders_hash_partitioned_630000 orders_hash_partitioned
|
||||
Filter: (o_orderkey = ANY ('{1,2}'::integer[]))
|
||||
-> Materialize
|
||||
-> Bitmap Heap Scan on lineitem_hash_partitioned_630004 lineitem_hash_partitioned
|
||||
Recheck Cond: (l_orderkey = ANY ('{2,3}'::integer[]))
|
||||
-> Bitmap Index Scan on lineitem_hash_partitioned_pkey_630004
|
||||
Index Cond: (l_orderkey = ANY ('{2,3}'::integer[]))
|
||||
(16 rows)
|
||||
|
||||
SET citus.task_executor_type TO DEFAULT;
|
||||
DROP TABLE lineitem_hash_partitioned;
|
||||
|
|
|
@ -32,6 +32,26 @@ CREATE TABLE orders_hash_partitioned (
|
|||
o_comment varchar(79) );
|
||||
SELECT create_distributed_table('orders_hash_partitioned', 'o_orderkey');
|
||||
|
||||
CREATE TABLE lineitem_hash_partitioned (
|
||||
l_orderkey integer not null,
|
||||
l_partkey integer not null,
|
||||
l_suppkey integer not null,
|
||||
l_linenumber integer not null,
|
||||
l_quantity decimal(15, 2) not null,
|
||||
l_extendedprice decimal(15, 2) not null,
|
||||
l_discount decimal(15, 2) not null,
|
||||
l_tax decimal(15, 2) not null,
|
||||
l_returnflag char(1) not null,
|
||||
l_linestatus char(1) not null,
|
||||
l_shipdate date not null,
|
||||
l_commitdate date not null,
|
||||
l_receiptdate date not null,
|
||||
l_shipinstruct char(25) not null,
|
||||
l_shipmode char(10) not null,
|
||||
l_comment varchar(44) not null,
|
||||
PRIMARY KEY(l_orderkey, l_linenumber) );
|
||||
SELECT create_distributed_table('lineitem_hash_partitioned', 'l_orderkey');
|
||||
|
||||
INSERT INTO orders_hash_partitioned (o_orderkey, o_custkey, o_totalprice, o_shippriority, o_clerk) VALUES
|
||||
(1, 11, 10, 111, 'aaa'),
|
||||
(2, 22, 20, 222, 'bbb'),
|
||||
|
@ -284,5 +304,46 @@ SELECT count(*) FROM orders_hash_partitioned
|
|||
SELECT count(*) FROM orders_hash_partitioned
|
||||
WHERE o_orderkey IN (1,2) OR o_custkey NOT IN (SELECT o_custkey FROM orders_hash_partitioned WHERE o_orderkey = 3);
|
||||
|
||||
SET citus.task_executor_type TO DEFAULT;
|
||||
SET client_min_messages TO DEFAULT;
|
||||
|
||||
-- left joins should prune shards based on the left hand side of the left join
|
||||
-- it should only assign 2 tasks as there is a filter on the left table pruning to 2
|
||||
-- shards
|
||||
EXPLAIN (COSTS OFF)
|
||||
SELECT count(*)
|
||||
FROM orders_hash_partitioned
|
||||
LEFT JOIN lineitem_hash_partitioned ON (o_orderkey = l_orderkey)
|
||||
WHERE o_orderkey IN (1, 2);
|
||||
|
||||
EXPLAIN (COSTS OFF)
|
||||
SELECT count(*)
|
||||
FROM orders_hash_partitioned
|
||||
INNER JOIN lineitem_hash_partitioned ON (o_orderkey = l_orderkey)
|
||||
WHERE o_orderkey IN (1, 2);
|
||||
|
||||
-- same principle but on a right join
|
||||
EXPLAIN (COSTS OFF)
|
||||
SELECT count(*)
|
||||
FROM orders_hash_partitioned
|
||||
RIGHT JOIN lineitem_hash_partitioned ON (o_orderkey = l_orderkey)
|
||||
WHERE l_orderkey IN (1, 2);
|
||||
|
||||
-- full outerjoin should only prune partitions that will not return any rows. In short it
|
||||
-- should cause a union of the FROM and FULL OUTER JOIN tables.
|
||||
EXPLAIN (COSTS OFF)
|
||||
SELECT count(*)
|
||||
FROM orders_hash_partitioned
|
||||
FULL OUTER JOIN lineitem_hash_partitioned ON (o_orderkey = l_orderkey)
|
||||
WHERE o_orderkey IN (1, 2)
|
||||
OR l_orderkey IN (2, 3);
|
||||
|
||||
EXPLAIN (COSTS OFF)
|
||||
SELECT count(*)
|
||||
FROM orders_hash_partitioned
|
||||
FULL OUTER JOIN lineitem_hash_partitioned ON (o_orderkey = l_orderkey)
|
||||
WHERE o_orderkey IN (1, 2)
|
||||
AND l_orderkey IN (2, 3);
|
||||
|
||||
SET citus.task_executor_type TO DEFAULT;
|
||||
|
||||
DROP TABLE lineitem_hash_partitioned;
|
||||
|
|
Loading…
Reference in New Issue