diff --git a/src/backend/distributed/planner/multi_physical_planner.c b/src/backend/distributed/planner/multi_physical_planner.c index c81585087..0617a63b6 100644 --- a/src/backend/distributed/planner/multi_physical_planner.c +++ b/src/backend/distributed/planner/multi_physical_planner.c @@ -140,6 +140,7 @@ static ArrayType * SplitPointObject(ShardInterval **shardIntervalArray, static bool DistributedPlanRouterExecutable(DistributedPlan *distributedPlan); static Job * BuildJobTreeTaskList(Job *jobTree, PlannerRestrictionContext *plannerRestrictionContext); +static bool IsInnerTableOfOuterJoin(RelationRestriction *relationRestriction); static void ErrorIfUnsupportedShardDistribution(Query *query); static Task * QueryPushdownTaskCreate(Query *originalQuery, int shardIndex, RelationRestrictionContext *restrictionContext, @@ -2165,6 +2166,21 @@ QueryPushdownSqlTaskList(Query *query, uint64 jobId, maxShardOffset = -1; } + /* + * For left joins we don't care about the shards pruned for the right hand side. + * If the right hand side would prune to a smaller set we should still send it to + * all tables of the left hand side. However if the right hand side is bigger than + * the left hand side we don't have to send the query to any shard that is not + * matching anything on the left hand side. + * + * Instead we will simply skip any RelationRestriction if it is an OUTER join and + * the table is part of the non-outer side of the join. 
+ */ + if (IsInnerTableOfOuterJoin(relationRestriction)) + { + continue; + } + foreach(shardIntervalCell, prunedShardList) { ShardInterval *shardInterval = (ShardInterval *) lfirst(shardIntervalCell); @@ -2172,15 +2188,8 @@ QueryPushdownSqlTaskList(Query *query, uint64 jobId, taskRequiredForShardIndex[shardIndex] = true; - if (shardIndex < minShardOffset) - { - minShardOffset = shardIndex; - } - - if (shardIndex > maxShardOffset) - { - maxShardOffset = shardIndex; - } + minShardOffset = Min(minShardOffset, shardIndex); + maxShardOffset = Max(maxShardOffset, shardIndex); } } @@ -2230,6 +2239,45 @@ QueryPushdownSqlTaskList(Query *query, uint64 jobId, } +/* + * IsInnerTableOfOuterJoin tests based on the join information encoded in a + * RelationRestriction if the table accessed for this relation is + * a) in an outer join + * b) on the inner part of said join + * + * The function returns true only if both conditions above hold true + */ +static bool +IsInnerTableOfOuterJoin(RelationRestriction *relationRestriction) +{ + RestrictInfo *joinInfo = NULL; + foreach_ptr(joinInfo, relationRestriction->relOptInfo->joininfo) + { + if (joinInfo->outer_relids == NULL) + { + /* not an outer join */ + continue; + } + + /* + * This join restriction info describes an outer join, we need to figure out if + * our table is in the non outer part of this join. If that is the case this is a + * non outer table of an outer join. + */ + bool isInOuter = bms_is_member(relationRestriction->relOptInfo->relid, + joinInfo->outer_relids); + if (!isInOuter) + { + /* this table is joined in the inner part of an outer join */ + return true; + } + } + + /* we have not found any join clause that satisfies both requirements */ + return false; +} + + /* * ErrorIfUnsupportedShardDistribution gets list of relations in the given query * and checks if two conditions below hold for them, otherwise it errors out. 
diff --git a/src/test/regress/expected/multi_hash_pruning.out b/src/test/regress/expected/multi_hash_pruning.out index f61625c85..b976abdb7 100644 --- a/src/test/regress/expected/multi_hash_pruning.out +++ b/src/test/regress/expected/multi_hash_pruning.out @@ -29,6 +29,35 @@ SELECT create_distributed_table('orders_hash_partitioned', 'o_orderkey'); (1 row) +CREATE TABLE lineitem_hash_partitioned ( + l_orderkey integer not null, + l_partkey integer not null, + l_suppkey integer not null, + l_linenumber integer not null, + l_quantity decimal(15, 2) not null, + l_extendedprice decimal(15, 2) not null, + l_discount decimal(15, 2) not null, + l_tax decimal(15, 2) not null, + l_returnflag char(1) not null, + l_linestatus char(1) not null, + l_shipdate date not null, + l_commitdate date not null, + l_receiptdate date not null, + l_shipinstruct char(25) not null, + l_shipmode char(10) not null, + l_comment varchar(44) not null, + PRIMARY KEY(l_orderkey, l_linenumber) ); +SELECT create_distributed_table('lineitem_hash_partitioned', 'l_orderkey'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO orders_hash_partitioned (o_orderkey, o_custkey, o_totalprice, o_shippriority, o_clerk) VALUES + (1, 11, 10, 111, 'aaa'), + (2, 22, 20, 222, 'bbb'), + (3, 33, 30, 333, 'ccc'), + (4, 44, 40, 444, 'ddd'); SET client_min_messages TO DEBUG2; -- Check that we can prune shards for simple cases, boolean expressions and -- immutable functions. 
@@ -36,7 +65,7 @@ SELECT count(*) FROM orders_hash_partitioned; DEBUG: Router planner cannot handle multi-shard select queries count --------------------------------------------------------------------- - 0 + 4 (1 row) SELECT count(*) FROM orders_hash_partitioned WHERE o_orderkey = 1; @@ -45,7 +74,7 @@ DEBUG: Plan is router executable DETAIL: distribution column value: 1 count --------------------------------------------------------------------- - 0 + 1 (1 row) SELECT count(*) FROM orders_hash_partitioned WHERE o_orderkey = 2; @@ -54,7 +83,7 @@ DEBUG: Plan is router executable DETAIL: distribution column value: 2 count --------------------------------------------------------------------- - 0 + 1 (1 row) SELECT count(*) FROM orders_hash_partitioned WHERE o_orderkey = 3; @@ -63,7 +92,7 @@ DEBUG: Plan is router executable DETAIL: distribution column value: 3 count --------------------------------------------------------------------- - 0 + 1 (1 row) SELECT count(*) FROM orders_hash_partitioned WHERE o_orderkey = 4; @@ -72,7 +101,7 @@ DEBUG: Plan is router executable DETAIL: distribution column value: 4 count --------------------------------------------------------------------- - 0 + 1 (1 row) SELECT count(*) FROM orders_hash_partitioned @@ -82,7 +111,7 @@ DEBUG: Plan is router executable DETAIL: distribution column value: 1 count --------------------------------------------------------------------- - 0 + 1 (1 row) SELECT count(*) FROM orders_hash_partitioned WHERE o_orderkey = abs(-1); @@ -91,7 +120,7 @@ DEBUG: Plan is router executable DETAIL: distribution column value: 1 count --------------------------------------------------------------------- - 0 + 1 (1 row) -- disable router planning @@ -100,35 +129,35 @@ SELECT count(*) FROM orders_hash_partitioned; DEBUG: Router planner not enabled. 
count --------------------------------------------------------------------- - 0 + 4 (1 row) SELECT count(*) FROM orders_hash_partitioned WHERE o_orderkey = 1; DEBUG: Router planner not enabled. count --------------------------------------------------------------------- - 0 + 1 (1 row) SELECT count(*) FROM orders_hash_partitioned WHERE o_orderkey = 2; DEBUG: Router planner not enabled. count --------------------------------------------------------------------- - 0 + 1 (1 row) SELECT count(*) FROM orders_hash_partitioned WHERE o_orderkey = 3; DEBUG: Router planner not enabled. count --------------------------------------------------------------------- - 0 + 1 (1 row) SELECT count(*) FROM orders_hash_partitioned WHERE o_orderkey = 4; DEBUG: Router planner not enabled. count --------------------------------------------------------------------- - 0 + 1 (1 row) SELECT count(*) FROM orders_hash_partitioned @@ -136,14 +165,14 @@ SELECT count(*) FROM orders_hash_partitioned DEBUG: Router planner not enabled. count --------------------------------------------------------------------- - 0 + 1 (1 row) SELECT count(*) FROM orders_hash_partitioned WHERE o_orderkey = abs(-1); DEBUG: Router planner not enabled. 
count --------------------------------------------------------------------- - 0 + 1 (1 row) SET citus.enable_router_execution TO DEFAULT; @@ -158,14 +187,14 @@ SELECT count(*) FROM orders_hash_partitioned WHERE o_orderkey is not NULL; DEBUG: Router planner cannot handle multi-shard select queries count --------------------------------------------------------------------- - 0 + 4 (1 row) SELECT count(*) FROM orders_hash_partitioned WHERE o_orderkey > 2; DEBUG: Router planner cannot handle multi-shard select queries count --------------------------------------------------------------------- - 0 + 2 (1 row) SELECT count(*) FROM orders_hash_partitioned @@ -173,7 +202,7 @@ SELECT count(*) FROM orders_hash_partitioned DEBUG: Router planner cannot handle multi-shard select queries count --------------------------------------------------------------------- - 0 + 2 (1 row) SELECT count(*) FROM orders_hash_partitioned @@ -181,7 +210,7 @@ SELECT count(*) FROM orders_hash_partitioned DEBUG: Router planner cannot handle multi-shard select queries count --------------------------------------------------------------------- - 0 + 1 (1 row) SELECT count(*) FROM orders_hash_partitioned @@ -189,7 +218,7 @@ SELECT count(*) FROM orders_hash_partitioned DEBUG: Router planner cannot handle multi-shard select queries count --------------------------------------------------------------------- - 0 + 1 (1 row) SELECT count(*) FROM orders_hash_partitioned @@ -197,7 +226,7 @@ SELECT count(*) FROM orders_hash_partitioned DEBUG: Router planner cannot handle multi-shard select queries count --------------------------------------------------------------------- - 0 + 1 (1 row) SELECT count(*) FROM @@ -207,7 +236,7 @@ DEBUG: Plan is router executable DETAIL: distribution column value: 1 count --------------------------------------------------------------------- - 0 + 1 (1 row) SET client_min_messages TO DEFAULT; @@ -249,7 +278,7 @@ SELECT count(*) FROM lineitem_hash_part (1 row) SELECT count(*) FROM 
lineitem_hash_part - WHERE l_orderkey = ANY (NULL) OR TRUE; + WHERE l_orderkey = ANY (NULL) OR TRUE; count --------------------------------------------------------------------- 12000 @@ -284,7 +313,7 @@ SELECT count(*) FROM lineitem (1 row) SELECT count(*) FROM lineitem - WHERE l_orderkey = ANY(NULL) OR TRUE; + WHERE l_orderkey = ANY(NULL) OR TRUE; count --------------------------------------------------------------------- 12000 @@ -329,7 +358,7 @@ SELECT count(*) FROM orders_hash_partitioned DEBUG: Router planner cannot handle multi-shard select queries count --------------------------------------------------------------------- - 0 + 1 (1 row) -- Check that we cannot prune for mutable functions. @@ -345,7 +374,7 @@ SELECT count(*) FROM orders_hash_partitioned DEBUG: Router planner cannot handle multi-shard select queries count --------------------------------------------------------------------- - 0 + 1 (1 row) SELECT count(*) FROM orders_hash_partitioned @@ -377,7 +406,7 @@ DEBUG: join prunable for intervals [1073741824,2147483647] and [-1073741824,-1] DEBUG: join prunable for intervals [1073741824,2147483647] and [0,1073741823] count --------------------------------------------------------------------- - 0 + 4 (1 row) SELECT count(*) @@ -393,3 +422,128 @@ DETAIL: distribution column value: 1 0 (1 row) +SET client_min_messages TO DEFAULT; +SET citus.task_executor_type TO 'adaptive'; +-- left joins should prune shards based on the left hand side of the left join +-- it should only assign 2 tasks as there is a filter on the left table pruning to 2 +-- shards +EXPLAIN (COSTS OFF) +SELECT count(*) +FROM orders_hash_partitioned +LEFT JOIN lineitem_hash_partitioned ON (o_orderkey = l_orderkey) +WHERE o_orderkey IN (1, 2); + QUERY PLAN +--------------------------------------------------------------------- + Aggregate + -> Custom Scan (Citus Adaptive) + Task Count: 2 + Tasks Shown: One of 2 + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> Aggregate + -> 
Nested Loop Left Join + -> Seq Scan on orders_hash_partitioned_630000 orders_hash_partitioned + Filter: (o_orderkey = ANY ('{1,2}'::integer[])) + -> Index Only Scan using lineitem_hash_partitioned_pkey_630004 on lineitem_hash_partitioned_630004 lineitem_hash_partitioned + Index Cond: (l_orderkey = orders_hash_partitioned.o_orderkey) +(12 rows) + +EXPLAIN (COSTS OFF) +SELECT count(*) +FROM orders_hash_partitioned +INNER JOIN lineitem_hash_partitioned ON (o_orderkey = l_orderkey) +WHERE o_orderkey IN (1, 2); + QUERY PLAN +--------------------------------------------------------------------- + Aggregate + -> Custom Scan (Citus Adaptive) + Task Count: 2 + Tasks Shown: One of 2 + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> Aggregate + -> Nested Loop + -> Seq Scan on orders_hash_partitioned_630000 orders_hash_partitioned + Filter: (o_orderkey = ANY ('{1,2}'::integer[])) + -> Index Only Scan using lineitem_hash_partitioned_pkey_630004 on lineitem_hash_partitioned_630004 lineitem_hash_partitioned + Index Cond: (l_orderkey = orders_hash_partitioned.o_orderkey) +(12 rows) + +-- same principle but on a right join +EXPLAIN (COSTS OFF) +SELECT count(*) +FROM orders_hash_partitioned +RIGHT JOIN lineitem_hash_partitioned ON (o_orderkey = l_orderkey) +WHERE l_orderkey IN (1, 2); + QUERY PLAN +--------------------------------------------------------------------- + Aggregate + -> Custom Scan (Citus Adaptive) + Task Count: 2 + Tasks Shown: One of 2 + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> Aggregate + -> Hash Right Join + Hash Cond: (orders_hash_partitioned.o_orderkey = lineitem_hash_partitioned.l_orderkey) + -> Seq Scan on orders_hash_partitioned_630000 orders_hash_partitioned + -> Hash + -> Bitmap Heap Scan on lineitem_hash_partitioned_630004 lineitem_hash_partitioned + Recheck Cond: (l_orderkey = ANY ('{1,2}'::integer[])) + -> Bitmap Index Scan on lineitem_hash_partitioned_pkey_630004 + Index Cond: (l_orderkey = ANY 
('{1,2}'::integer[])) +(15 rows) + +-- full outerjoin should only prune partitions that will not return any rows. In short it +-- should cause a union of the FROM and FULL OUTER JOIN tables. +EXPLAIN (COSTS OFF) +SELECT count(*) +FROM orders_hash_partitioned +FULL OUTER JOIN lineitem_hash_partitioned ON (o_orderkey = l_orderkey) +WHERE o_orderkey IN (1, 2) + OR l_orderkey IN (2, 3); + QUERY PLAN +--------------------------------------------------------------------- + Aggregate + -> Custom Scan (Citus Adaptive) + Task Count: 4 + Tasks Shown: One of 4 + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> Aggregate + -> Hash Full Join + Hash Cond: (orders_hash_partitioned.o_orderkey = lineitem_hash_partitioned.l_orderkey) + Filter: ((orders_hash_partitioned.o_orderkey = ANY ('{1,2}'::integer[])) OR (lineitem_hash_partitioned.l_orderkey = ANY ('{2,3}'::integer[]))) + -> Seq Scan on orders_hash_partitioned_630000 orders_hash_partitioned + -> Hash + -> Seq Scan on lineitem_hash_partitioned_630004 lineitem_hash_partitioned +(13 rows) + +EXPLAIN (COSTS OFF) +SELECT count(*) +FROM orders_hash_partitioned +FULL OUTER JOIN lineitem_hash_partitioned ON (o_orderkey = l_orderkey) +WHERE o_orderkey IN (1, 2) + AND l_orderkey IN (2, 3); + QUERY PLAN +--------------------------------------------------------------------- + Aggregate + -> Custom Scan (Citus Adaptive) + Task Count: 3 + Tasks Shown: One of 3 + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> Aggregate + -> Nested Loop + Join Filter: (orders_hash_partitioned.o_orderkey = lineitem_hash_partitioned.l_orderkey) + -> Seq Scan on orders_hash_partitioned_630000 orders_hash_partitioned + Filter: (o_orderkey = ANY ('{1,2}'::integer[])) + -> Materialize + -> Bitmap Heap Scan on lineitem_hash_partitioned_630004 lineitem_hash_partitioned + Recheck Cond: (l_orderkey = ANY ('{2,3}'::integer[])) + -> Bitmap Index Scan on lineitem_hash_partitioned_pkey_630004 + Index Cond: (l_orderkey = ANY 
('{2,3}'::integer[])) +(16 rows) + +SET citus.task_executor_type TO DEFAULT; +DROP TABLE lineitem_hash_partitioned; diff --git a/src/test/regress/sql/multi_hash_pruning.sql b/src/test/regress/sql/multi_hash_pruning.sql index 2b5964ad9..10dbc94c2 100644 --- a/src/test/regress/sql/multi_hash_pruning.sql +++ b/src/test/regress/sql/multi_hash_pruning.sql @@ -32,6 +32,32 @@ CREATE TABLE orders_hash_partitioned ( o_comment varchar(79) ); SELECT create_distributed_table('orders_hash_partitioned', 'o_orderkey'); +CREATE TABLE lineitem_hash_partitioned ( + l_orderkey integer not null, + l_partkey integer not null, + l_suppkey integer not null, + l_linenumber integer not null, + l_quantity decimal(15, 2) not null, + l_extendedprice decimal(15, 2) not null, + l_discount decimal(15, 2) not null, + l_tax decimal(15, 2) not null, + l_returnflag char(1) not null, + l_linestatus char(1) not null, + l_shipdate date not null, + l_commitdate date not null, + l_receiptdate date not null, + l_shipinstruct char(25) not null, + l_shipmode char(10) not null, + l_comment varchar(44) not null, + PRIMARY KEY(l_orderkey, l_linenumber) ); +SELECT create_distributed_table('lineitem_hash_partitioned', 'l_orderkey'); + +INSERT INTO orders_hash_partitioned (o_orderkey, o_custkey, o_totalprice, o_shippriority, o_clerk) VALUES + (1, 11, 10, 111, 'aaa'), + (2, 22, 20, 222, 'bbb'), + (3, 33, 30, 333, 'ccc'), + (4, 44, 40, 444, 'ddd'); + SET client_min_messages TO DEBUG2; -- Check that we can prune shards for simple cases, boolean expressions and @@ -150,3 +176,48 @@ SELECT count(*) WHERE orders1.o_orderkey = orders2.o_orderkey AND orders1.o_orderkey = 1 AND orders2.o_orderkey is NULL; + +SET client_min_messages TO DEFAULT; +SET citus.task_executor_type TO 'adaptive'; + +-- left joins should prune shards based on the left hand side of the left join +-- it should only assign 2 tasks as there is a filter on the left table pruning to 2 +-- shards +EXPLAIN (COSTS OFF) +SELECT count(*) +FROM 
orders_hash_partitioned +LEFT JOIN lineitem_hash_partitioned ON (o_orderkey = l_orderkey) +WHERE o_orderkey IN (1, 2); + +EXPLAIN (COSTS OFF) +SELECT count(*) +FROM orders_hash_partitioned +INNER JOIN lineitem_hash_partitioned ON (o_orderkey = l_orderkey) +WHERE o_orderkey IN (1, 2); + +-- same principle but on a right join +EXPLAIN (COSTS OFF) +SELECT count(*) +FROM orders_hash_partitioned +RIGHT JOIN lineitem_hash_partitioned ON (o_orderkey = l_orderkey) +WHERE l_orderkey IN (1, 2); + +-- full outerjoin should only prune partitions that will not return any rows. In short it +-- should cause a union of the FROM and FULL OUTER JOIN tables. +EXPLAIN (COSTS OFF) +SELECT count(*) +FROM orders_hash_partitioned +FULL OUTER JOIN lineitem_hash_partitioned ON (o_orderkey = l_orderkey) +WHERE o_orderkey IN (1, 2) + OR l_orderkey IN (2, 3); + +EXPLAIN (COSTS OFF) +SELECT count(*) +FROM orders_hash_partitioned +FULL OUTER JOIN lineitem_hash_partitioned ON (o_orderkey = l_orderkey) +WHERE o_orderkey IN (1, 2) + AND l_orderkey IN (2, 3); + +SET citus.task_executor_type TO DEFAULT; + +DROP TABLE lineitem_hash_partitioned;