diff --git a/src/backend/distributed/planner/multi_physical_planner.c b/src/backend/distributed/planner/multi_physical_planner.c index 93ad31ac5..c19cfa64c 100644 --- a/src/backend/distributed/planner/multi_physical_planner.c +++ b/src/backend/distributed/planner/multi_physical_planner.c @@ -144,6 +144,7 @@ static ArrayType * SplitPointObject(ShardInterval **shardIntervalArray, static bool DistributedPlanRouterExecutable(DistributedPlan *distributedPlan); static Job * BuildJobTreeTaskList(Job *jobTree, PlannerRestrictionContext *plannerRestrictionContext); +static bool IsInnerTableOfOuterJoin(RelationRestriction *relationRestriction); static void ErrorIfUnsupportedShardDistribution(Query *query); static Task * QueryPushdownTaskCreate(Query *originalQuery, int shardIndex, RelationRestrictionContext *restrictionContext, @@ -2201,6 +2202,21 @@ QueryPushdownSqlTaskList(Query *query, uint64 jobId, maxShardOffset = -1; } + /* + * For left joins we don't care about the shards pruned for the right hand side. + * If the right hand side would prune to a smaller set we should still send it to + * all tables of the left hand side. However if the right hand side is bigger than + * the left hand side we don't have to send the query to any shard that is not + * matching anything on the left hand side. + * + * Instead we will simply skip any RelationRestriction if it is an OUTER join and + * the table is part of the non-outer side of the join. + */ + if (IsInnerTableOfOuterJoin(relationRestriction)) + { + continue; + } + foreach(shardIntervalCell, prunedShardList) { ShardInterval *shardInterval = (ShardInterval *) lfirst(shardIntervalCell); @@ -2208,15 +2224,8 @@ QueryPushdownSqlTaskList(Query *query, uint64 jobId, taskRequiredForShardIndex[shardIndex] = true; - if (shardIndex < minShardOffset) - { - minShardOffset = shardIndex; - } - - if (shardIndex > maxShardOffset) - { - maxShardOffset = shardIndex; - } + minShardOffset = Min(minShardOffset, shardIndex); + maxShardOffset = Max(maxShardOffset, shardIndex); } } @@ -2266,6 +2275,45 @@ QueryPushdownSqlTaskList(Query *query, uint64 jobId, } +/* + * IsInnerTableOfOuterJoin tests based on the join information envoded in a + * RelationRestriction if the table accessed for this relation is + * a) in an outer join + * b) on the inner part of said join + * + * The function returns true only if both conditions above hold true + */ +static bool +IsInnerTableOfOuterJoin(RelationRestriction *relationRestriction) +{ + RestrictInfo *joinInfo = NULL; + foreach_ptr(joinInfo, relationRestriction->relOptInfo->joininfo) + { + if (joinInfo->outer_relids == NULL) + { + /* not an outer join */ + continue; + } + + /* + * This join restriction info describes an outer join, we need to figure out if + * our table is in the non outer part of this join. If that is the case this is a + * non outer table of an outer join. + */ + bool isInOuter = bms_is_member(relationRestriction->relOptInfo->relid, + joinInfo->outer_relids); + if (!isInOuter) + { + /* this table is joined in the inner part of an outer join */ + return true; + } + } + + /* we have not found any join clause that satisfies both requirements */ + return false; +} + + /* * ErrorIfUnsupportedShardDistribution gets list of relations in the given query * and checks if two conditions below hold for them, otherwise it errors out. diff --git a/src/test/regress/expected/multi_hash_pruning.out b/src/test/regress/expected/multi_hash_pruning.out index e7afee5a4..e1c8a51b4 100644 --- a/src/test/regress/expected/multi_hash_pruning.out +++ b/src/test/regress/expected/multi_hash_pruning.out @@ -29,6 +29,30 @@ SELECT create_distributed_table('orders_hash_partitioned', 'o_orderkey'); (1 row) +CREATE TABLE lineitem_hash_partitioned ( + l_orderkey integer not null, + l_partkey integer not null, + l_suppkey integer not null, + l_linenumber integer not null, + l_quantity decimal(15, 2) not null, + l_extendedprice decimal(15, 2) not null, + l_discount decimal(15, 2) not null, + l_tax decimal(15, 2) not null, + l_returnflag char(1) not null, + l_linestatus char(1) not null, + l_shipdate date not null, + l_commitdate date not null, + l_receiptdate date not null, + l_shipinstruct char(25) not null, + l_shipmode char(10) not null, + l_comment varchar(44) not null, + PRIMARY KEY(l_orderkey, l_linenumber) ); +SELECT create_distributed_table('lineitem_hash_partitioned', 'l_orderkey'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + INSERT INTO orders_hash_partitioned (o_orderkey, o_custkey, o_totalprice, o_shippriority, o_clerk) VALUES (1, 11, 10, 111, 'aaa'), (2, 22, 20, 222, 'bbb'), @@ -1116,5 +1140,127 @@ DEBUG: assigned task to node localhost:xxxxx 3 (1 row) -SET citus.task_executor_type TO DEFAULT; SET client_min_messages TO DEFAULT; +-- left joins should prune shards based on the left hand side of the left join +-- it should only assign 2 tasks as there is a filter on the left table pruning to 2 +-- shards +EXPLAIN (COSTS OFF) +SELECT count(*) +FROM orders_hash_partitioned +LEFT JOIN lineitem_hash_partitioned ON (o_orderkey = l_orderkey) +WHERE o_orderkey IN (1, 2); + QUERY PLAN +--------------------------------------------------------------------- + Aggregate + -> Custom Scan (Citus Adaptive) + Task Count: 2 + Tasks Shown: One of 2 + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> Aggregate + -> Nested Loop Left Join + -> Seq Scan on orders_hash_partitioned_630000 orders_hash_partitioned + Filter: (o_orderkey = ANY ('{1,2}'::integer[])) + -> Index Only Scan using lineitem_hash_partitioned_pkey_630004 on lineitem_hash_partitioned_630004 lineitem_hash_partitioned + Index Cond: (l_orderkey = orders_hash_partitioned.o_orderkey) +(12 rows) + +EXPLAIN (COSTS OFF) +SELECT count(*) +FROM orders_hash_partitioned +INNER JOIN lineitem_hash_partitioned ON (o_orderkey = l_orderkey) +WHERE o_orderkey IN (1, 2); + QUERY PLAN +--------------------------------------------------------------------- + Aggregate + -> Custom Scan (Citus Adaptive) + Task Count: 2 + Tasks Shown: One of 2 + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> Aggregate + -> Nested Loop + -> Seq Scan on orders_hash_partitioned_630000 orders_hash_partitioned + Filter: (o_orderkey = ANY ('{1,2}'::integer[])) + -> Index Only Scan using lineitem_hash_partitioned_pkey_630004 on lineitem_hash_partitioned_630004 lineitem_hash_partitioned + Index Cond: (l_orderkey = orders_hash_partitioned.o_orderkey) +(12 rows) + +-- same principle but on a right join +EXPLAIN (COSTS OFF) +SELECT count(*) +FROM orders_hash_partitioned +RIGHT JOIN lineitem_hash_partitioned ON (o_orderkey = l_orderkey) +WHERE l_orderkey IN (1, 2); + QUERY PLAN +--------------------------------------------------------------------- + Aggregate + -> Custom Scan (Citus Adaptive) + Task Count: 2 + Tasks Shown: One of 2 + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> Aggregate + -> Hash Right Join + Hash Cond: (orders_hash_partitioned.o_orderkey = lineitem_hash_partitioned.l_orderkey) + -> Seq Scan on orders_hash_partitioned_630000 orders_hash_partitioned + -> Hash + -> Bitmap Heap Scan on lineitem_hash_partitioned_630004 lineitem_hash_partitioned + Recheck Cond: (l_orderkey = ANY ('{1,2}'::integer[])) + -> Bitmap Index Scan on lineitem_hash_partitioned_pkey_630004 + Index Cond: (l_orderkey = ANY ('{1,2}'::integer[])) +(15 rows) + +-- full outerjoin should only prune partitions that will not return any rows. In short it +-- should cause a union of the FROM and FULL OUTER JOIN tables. +EXPLAIN (COSTS OFF) +SELECT count(*) +FROM orders_hash_partitioned +FULL OUTER JOIN lineitem_hash_partitioned ON (o_orderkey = l_orderkey) +WHERE o_orderkey IN (1, 2) + OR l_orderkey IN (2, 3); + QUERY PLAN +--------------------------------------------------------------------- + Aggregate + -> Custom Scan (Citus Adaptive) + Task Count: 4 + Tasks Shown: One of 4 + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> Aggregate + -> Hash Full Join + Hash Cond: (orders_hash_partitioned.o_orderkey = lineitem_hash_partitioned.l_orderkey) + Filter: ((orders_hash_partitioned.o_orderkey = ANY ('{1,2}'::integer[])) OR (lineitem_hash_partitioned.l_orderkey = ANY ('{2,3}'::integer[]))) + -> Seq Scan on orders_hash_partitioned_630000 orders_hash_partitioned + -> Hash + -> Seq Scan on lineitem_hash_partitioned_630004 lineitem_hash_partitioned +(13 rows) + +EXPLAIN (COSTS OFF) +SELECT count(*) +FROM orders_hash_partitioned +FULL OUTER JOIN lineitem_hash_partitioned ON (o_orderkey = l_orderkey) +WHERE o_orderkey IN (1, 2) + AND l_orderkey IN (2, 3); + QUERY PLAN +--------------------------------------------------------------------- + Aggregate + -> Custom Scan (Citus Adaptive) + Task Count: 3 + Tasks Shown: One of 3 + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> Aggregate + -> Nested Loop + Join Filter: (orders_hash_partitioned.o_orderkey = lineitem_hash_partitioned.l_orderkey) + -> Seq Scan on orders_hash_partitioned_630000 orders_hash_partitioned + Filter: (o_orderkey = ANY ('{1,2}'::integer[])) + -> Materialize + -> Bitmap Heap Scan on lineitem_hash_partitioned_630004 lineitem_hash_partitioned + Recheck Cond: (l_orderkey = ANY ('{2,3}'::integer[])) + -> Bitmap Index Scan on lineitem_hash_partitioned_pkey_630004 + Index Cond: (l_orderkey = ANY ('{2,3}'::integer[])) +(16 rows) + +SET citus.task_executor_type TO DEFAULT; +DROP TABLE lineitem_hash_partitioned; diff --git a/src/test/regress/sql/multi_hash_pruning.sql b/src/test/regress/sql/multi_hash_pruning.sql index 1d366f02c..af881be0b 100644 --- a/src/test/regress/sql/multi_hash_pruning.sql +++ b/src/test/regress/sql/multi_hash_pruning.sql @@ -32,6 +32,26 @@ CREATE TABLE orders_hash_partitioned ( o_comment varchar(79) ); SELECT create_distributed_table('orders_hash_partitioned', 'o_orderkey'); +CREATE TABLE lineitem_hash_partitioned ( + l_orderkey integer not null, + l_partkey integer not null, + l_suppkey integer not null, + l_linenumber integer not null, + l_quantity decimal(15, 2) not null, + l_extendedprice decimal(15, 2) not null, + l_discount decimal(15, 2) not null, + l_tax decimal(15, 2) not null, + l_returnflag char(1) not null, + l_linestatus char(1) not null, + l_shipdate date not null, + l_commitdate date not null, + l_receiptdate date not null, + l_shipinstruct char(25) not null, + l_shipmode char(10) not null, + l_comment varchar(44) not null, + PRIMARY KEY(l_orderkey, l_linenumber) ); +SELECT create_distributed_table('lineitem_hash_partitioned', 'l_orderkey'); + INSERT INTO orders_hash_partitioned (o_orderkey, o_custkey, o_totalprice, o_shippriority, o_clerk) VALUES (1, 11, 10, 111, 'aaa'), (2, 22, 20, 222, 'bbb'), @@ -284,5 +304,46 @@ SELECT count(*) FROM orders_hash_partitioned SELECT count(*) FROM orders_hash_partitioned WHERE o_orderkey IN (1,2) OR o_custkey NOT IN (SELECT o_custkey FROM orders_hash_partitioned WHERE o_orderkey = 3); -SET citus.task_executor_type TO DEFAULT; SET client_min_messages TO DEFAULT; + +-- left joins should prune shards based on the left hand side of the left join +-- it should only assign 2 tasks as there is a filter on the left table pruning to 2 +-- shards +EXPLAIN (COSTS OFF) +SELECT count(*) +FROM orders_hash_partitioned +LEFT JOIN lineitem_hash_partitioned ON (o_orderkey = l_orderkey) +WHERE o_orderkey IN (1, 2); + +EXPLAIN (COSTS OFF) +SELECT count(*) +FROM orders_hash_partitioned +INNER JOIN lineitem_hash_partitioned ON (o_orderkey = l_orderkey) +WHERE o_orderkey IN (1, 2); + +-- same principle but on a right join +EXPLAIN (COSTS OFF) +SELECT count(*) +FROM orders_hash_partitioned +RIGHT JOIN lineitem_hash_partitioned ON (o_orderkey = l_orderkey) +WHERE l_orderkey IN (1, 2); + +-- full outerjoin should only prune partitions that will not return any rows. In short it +-- should cause a union of the FROM and FULL OUTER JOIN tables. +EXPLAIN (COSTS OFF) +SELECT count(*) +FROM orders_hash_partitioned +FULL OUTER JOIN lineitem_hash_partitioned ON (o_orderkey = l_orderkey) +WHERE o_orderkey IN (1, 2) + OR l_orderkey IN (2, 3); + +EXPLAIN (COSTS OFF) +SELECT count(*) +FROM orders_hash_partitioned +FULL OUTER JOIN lineitem_hash_partitioned ON (o_orderkey = l_orderkey) +WHERE o_orderkey IN (1, 2) + AND l_orderkey IN (2, 3); + +SET citus.task_executor_type TO DEFAULT; + +DROP TABLE lineitem_hash_partitioned;