Make simple IN queries on unique columns work with repartition join (#3171)

This is necessary to support Q20 of the CHbenCHmark: #2582.

To summarize the fix: The subquery is converted into an INNER JOIN on a
table. This fixes the issue, since an INNER JOIN on a table is already
supported by the repartition planner.

The replacement happens as follows:
1. Postgres replaces `col in (subquery)` with a SEMI JOIN (subquery) on col = subquery_result
2. If this subquery is simple enough Postgres will replace it with a
   regular read from a table
3. If the subquery returns unique results (e.g. a primary key) Postgres
   will convert the SEMI JOIN into an INNER JOIN during the planning. It
   will not change this in the rewritten query though.
4. We check if Postgres sends us any SEMI JOINs during its join order
   planning. If it doesn't, we replace all SEMI JOINs in the rewritten
   query with INNER JOINs (which we already support).
pull/3176/head^2
Jelte Fennema 2019-11-11 13:44:28 +01:00 committed by GitHub
parent 57380fd668
commit adc6ca6100
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 456 additions and 35 deletions

View File

@ -1331,6 +1331,14 @@ multi_join_restriction_hook(PlannerInfo *root,
joinRestrictionContext->joinRestrictionList =
lappend(joinRestrictionContext->joinRestrictionList, joinRestriction);
/*
* Keep track if we received any semi joins here. If we didn't we can
* later safely convert any semi joins in the rewritten query to inner
* joins.
*/
plannerRestrictionContext->hasSemiJoin = plannerRestrictionContext->hasSemiJoin ||
extra->sjinfo->jointype == JOIN_SEMI;
MemoryContextSwitchTo(oldMemoryContext);
}

View File

@ -134,7 +134,8 @@ MultiLogicalPlanCreate(Query *originalQuery, Query *queryTree,
MultiNode *multiQueryNode = NULL;
MultiTreeRoot *rootNode = NULL;
if (ShouldUseSubqueryPushDown(originalQuery, queryTree))
if (ShouldUseSubqueryPushDown(originalQuery, queryTree, plannerRestrictionContext))
{
multiQueryNode = SubqueryMultiNodeTree(originalQuery, queryTree,
plannerRestrictionContext);
@ -969,7 +970,7 @@ HasUnsupportedJoinWalker(Node *node, void *context)
JoinExpr *joinExpr = (JoinExpr *) node;
JoinType joinType = joinExpr->jointype;
bool outerJoin = IS_OUTER_JOIN(joinType);
if (!outerJoin && joinType != JOIN_INNER)
if (!outerJoin && joinType != JOIN_INNER && joinType != JOIN_SEMI)
{
hasUnsupportedJoin = true;
}
@ -1335,7 +1336,7 @@ ExtractFromExpressionWalker(Node *node, QualifierWalkerContext *walkerContext)
}
/* return outer join clauses in a separate list */
if (joinType == JOIN_INNER)
if (joinType == JOIN_INNER || joinType == JOIN_SEMI)
{
walkerContext->baseQualifierList =
list_concat(walkerContext->baseQualifierList, joinQualifierList);

View File

@ -70,7 +70,6 @@ bool SubqueryPushdown = false; /* is subquery pushdown enabled */
static bool JoinTreeContainsSubqueryWalker(Node *joinTreeNode, void *context);
static bool IsFunctionRTE(Node *node);
static bool IsNodeSubquery(Node *node);
static bool IsNodeSubqueryOrParamExec(Node *node);
static bool IsOuterJoinExpr(Node *node);
static bool WindowPartitionOnDistributionColumn(Query *query);
static DeferredErrorMessage * DeferErrorIfFromClauseRecurs(Query *queryTree);
@ -100,21 +99,21 @@ static List * CreateSubqueryTargetEntryList(List *columnList);
static bool RelationInfoContainsOnlyRecurringTuples(PlannerInfo *plannerInfo,
RelOptInfo *relationInfo);
/*
* ShouldUseSubqueryPushDown determines whether it's desirable to use
* subquery pushdown to plan the query based on the original and
* rewritten query.
*/
bool
ShouldUseSubqueryPushDown(Query *originalQuery, Query *rewrittenQuery)
ShouldUseSubqueryPushDown(Query *originalQuery, Query *rewrittenQuery,
PlannerRestrictionContext *plannerRestrictionContext)
{
List *qualifierList = NIL;
StringInfo errorMessage = NULL;
/*
* We check the existence of subqueries in FROM clause on the modified query
* given that if postgres already flattened the subqueries, MultiPlanTree()
* given that if postgres already flattened the subqueries, MultiNodeTree()
* can plan corresponding distributed plan.
*/
if (JoinTreeContainsSubquery(rewrittenQuery))
@ -123,16 +122,35 @@ ShouldUseSubqueryPushDown(Query *originalQuery, Query *rewrittenQuery)
}
/*
* We also check the existence of subqueries in WHERE clause. Note that
* this check needs to be done on the original query given that
* standard_planner() may replace the sublinks with anti/semi joins and
* MultiPlanTree() cannot plan such queries.
* We check the existence of subqueries in WHERE and HAVING clause on the
* modified query. In some cases subqueries in the original query are
* converted into inner joins and in those cases MultiNodeTree() can plan
* the rewritten plan.
*/
if (WhereOrHavingClauseContainsSubquery(originalQuery))
if (WhereOrHavingClauseContainsSubquery(rewrittenQuery))
{
return true;
}
/*
* We check if postgres planned any semi joins, MultiNodeTree doesn't
* support these so we fail. Postgres is able to replace some IN/ANY
* subqueries with semi joins and then replace those with inner joins (ones
* where the subquery returns unique results). This allows MultiNodeTree to
* execute these subqueries (because they are converted to inner joins).
* However, even in that case the rewrittenQuery still contains join nodes
* with jointype JOIN_SEMI because Postgres doesn't actually update these.
* The way we find out instead if it actually planned semi joins, is by
* checking the joins that were sent to multi_join_restriction_hook. If no
* joins of type JOIN_SEMI are sent it is safe to convert all JOIN_SEMI
* nodes to JOIN_INNER nodes (which is what is done in MultiNodeTree).
*/
if (plannerRestrictionContext->hasSemiJoin)
{
return true;
}
/*
* We process function RTEs as subqueries, since the join order planner
* does not know how to handle them.
@ -159,8 +177,6 @@ ShouldUseSubqueryPushDown(Query *originalQuery, Query *rewrittenQuery)
*/
if (FindNodeCheck((Node *) rewrittenQuery->jointree, IsOuterJoinExpr))
{
/* Assert what _should_ be only situation this occurs in. */
Assert(JoinTreeContainsSubquery(originalQuery));
return true;
}
@ -293,7 +309,7 @@ WhereOrHavingClauseContainsSubquery(Query *query)
bool
TargetListContainsSubquery(Query *query)
{
return FindNodeCheck((Node *) query->targetList, IsNodeSubqueryOrParamExec);
return FindNodeCheck((Node *) query->targetList, IsNodeSubquery);
}
@ -318,11 +334,15 @@ IsFunctionRTE(Node *node)
/*
* IsNodeSubquery returns true if the given node is a Query or SubPlan.
* IsNodeSubquery returns true if the given node is a Query or SubPlan or a
* Param node with paramkind PARAM_EXEC.
*
* The check for SubPlan is needed when this is used on an already rewritten
* query. Such a query has SubPlan nodes instead of SubLink nodes (which
* contain a Query node).
* The check for PARAM_EXEC is needed because some very simple subqueries like
* (select 1) are converted to init plans in the rewritten query. In this case
* the only thing left in the query tree is a Param node with type PARAM_EXEC.
*/
static bool
IsNodeSubquery(Node *node)
@ -332,23 +352,7 @@ IsNodeSubquery(Node *node)
return false;
}
return IsA(node, Query) || IsA(node, SubPlan);
}
/*
* IsNodeSubqueryOrParamExec returns true if the given node is a subquery or a
* Param node with paramkind PARAM_EXEC.
*/
static bool
IsNodeSubqueryOrParamExec(Node *node)
{
if (node == NULL)
{
return false;
}
if (IsNodeSubquery(node))
if (IsA(node, Query) || IsA(node, SubPlan))
{
return true;
}

View File

@ -10,6 +10,8 @@
#ifndef DISTRIBUTED_PLANNER_H
#define DISTRIBUTED_PLANNER_H
#include "postgres.h"
#include "nodes/plannodes.h"
#if PG_VERSION_NUM >= 120000
@ -81,6 +83,7 @@ typedef struct PlannerRestrictionContext
{
RelationRestrictionContext *relationRestrictionContext;
JoinRestrictionContext *joinRestrictionContext;
bool hasSemiJoin;
MemoryContext memoryContext;
} PlannerRestrictionContext;

View File

@ -22,7 +22,8 @@
extern bool SubqueryPushdown;
extern bool ShouldUseSubqueryPushDown(Query *originalQuery, Query *rewrittenQuery);
extern bool ShouldUseSubqueryPushDown(Query *originalQuery, Query *rewrittenQuery,
PlannerRestrictionContext *plannerRestrictionContext);
extern bool JoinTreeContainsSubquery(Query *query);
extern bool HasEmptyJoinTree(Query *query);
extern bool WhereOrHavingClauseContainsSubquery(Query *query);

View File

@ -0,0 +1,215 @@
SET citus.next_shard_id TO 1680000;
CREATE SCHEMA ch_bench_subquery_repartition;
SET search_path = ch_bench_subquery_repartition, public;
SET citus.enable_repartition_joins TO on;
CREATE TABLE order_line (
ol_w_id int NOT NULL,
ol_d_id int NOT NULL,
ol_o_id int NOT NULL,
ol_number int NOT NULL,
ol_i_id int NOT NULL,
ol_delivery_d timestamp NULL DEFAULT NULL,
ol_amount decimal(6,2) NOT NULL,
ol_supply_w_id int NOT NULL,
ol_quantity decimal(2,0) NOT NULL,
ol_dist_info char(24) NOT NULL,
PRIMARY KEY (ol_w_id,ol_d_id,ol_o_id,ol_number)
);
CREATE TABLE stock (
s_w_id int NOT NULL,
s_i_id int NOT NULL,
s_quantity decimal(4,0) NOT NULL,
s_ytd decimal(8,2) NOT NULL,
s_order_cnt int NOT NULL,
s_remote_cnt int NOT NULL,
s_data varchar(50) NOT NULL,
s_dist_01 char(24) NOT NULL,
s_dist_02 char(24) NOT NULL,
s_dist_03 char(24) NOT NULL,
s_dist_04 char(24) NOT NULL,
s_dist_05 char(24) NOT NULL,
s_dist_06 char(24) NOT NULL,
s_dist_07 char(24) NOT NULL,
s_dist_08 char(24) NOT NULL,
s_dist_09 char(24) NOT NULL,
s_dist_10 char(24) NOT NULL,
PRIMARY KEY (s_w_id,s_i_id)
);
CREATE TABLE item (
i_id int NOT NULL,
i_name varchar(24) NOT NULL,
i_price decimal(5,2) NOT NULL,
i_data varchar(50) NOT NULL,
i_im_id int NOT NULL,
PRIMARY KEY (i_id)
);
create table nation (
n_nationkey int not null,
n_name char(25) not null,
n_regionkey int not null,
n_comment char(152) not null,
PRIMARY KEY ( n_nationkey )
);
create table supplier (
su_suppkey int not null,
su_name char(25) not null,
su_address varchar(40) not null,
su_nationkey int not null,
su_phone char(15) not null,
su_acctbal numeric(12,2) not null,
su_comment char(101) not null,
PRIMARY KEY ( su_suppkey )
);
SELECT create_distributed_table('order_line','ol_w_id');
create_distributed_table
--------------------------
(1 row)
SELECT create_distributed_table('stock','s_w_id');
create_distributed_table
--------------------------
(1 row)
SELECT create_reference_table('item');
create_reference_table
------------------------
(1 row)
SELECT create_reference_table('nation');
create_reference_table
------------------------
(1 row)
SELECT create_reference_table('supplier');
create_reference_table
------------------------
(1 row)
INSERT INTO order_line SELECT c, c, c, c, c, NULL, c, c, c, 'abc' FROM generate_series(1, 10) as c;
INSERT INTO stock SELECT c, c, c, c, c, c, 'abc', c, c, c, c, c, c, c, c, c, c FROM generate_series(1, 5) as c;
INSERT INTO item SELECT c, 'abc', c, 'abc', c FROM generate_series(1, 3) as c;
INSERT INTO item SELECT 10+c, 'abc', c, 'abc', c FROM generate_series(1, 3) as c;
-- Subquery + repartion is supported when it is an IN query where the subquery
-- returns unique results (because it's converted to an INNER JOIN)
select s_i_id
from stock, order_line
where
s_i_id in (select i_id from item)
AND s_i_id = ol_i_id
order by s_i_id;
s_i_id
--------
1
2
3
(3 rows)
-- Subquery + repartion is not supported when it is an IN query where the
-- subquery doesn't return unique results
select s_i_id
from stock, order_line
where
s_i_id in (select i_im_id from item)
AND s_i_id = ol_i_id;
ERROR: complex joins are only supported when all distributed tables are joined on their distribution columns with equal operator
-- Subquery + repartion is supported when it is a NOT IN query where the subquery
-- returns unique results
select s_i_id
from stock, order_line
where
s_i_id not in (select i_id from item)
AND s_i_id = ol_i_id;
ERROR: complex joins are only supported when all distributed tables are joined on their distribution columns with equal operator
-- Subquery + repartion is not supported when it is a NOT IN where the subquery
-- doesn't return unique results
select s_i_id
from stock, order_line
where
s_i_id not in (select i_im_id from item)
AND s_i_id = ol_i_id;
ERROR: complex joins are only supported when all distributed tables are joined on their distribution columns with equal operator
-- Actual CHbenCHmark query is supported
select su_name, su_address
from supplier, nation
where su_suppkey in
(select mod(s_i_id * s_w_id, 10000)
from stock, order_line
where s_i_id in
(select i_id
from item
where i_data like 'co%')
and ol_i_id=s_i_id
and ol_delivery_d > '2010-05-23 12:00:00'
group by s_i_id, s_w_id, s_quantity
having 2*s_quantity > sum(ol_quantity))
and su_nationkey = n_nationkey
and n_name = 'Germany'
order by su_name;
su_name | su_address
---------+------------
(0 rows)
-- Fallback to public tables with prefilled data
DROP table ch_bench_subquery_repartition.supplier, ch_bench_subquery_repartition.nation;
TRUNCATE order_line, stock, item;
SET search_path = ch_bench_subquery_repartition, public;
insert into stock VALUES
(1, 33, 1000, 1, 1, 1, '', '','','','','','','','','',''),
(1, 44, 1000, 1, 1, 1, '', '','','','','','','','','',''),
(33, 1, 1000, 1, 1, 1, '', '','','','','','','','','',''),
(32, 1, 1000, 1, 1, 1, '', '','','','','','','','','','');
INSERT INTO order_line SELECT c, c, c, c, 33, '2011-01-01', c, c, c, 'abc' FROM generate_series(1, 3) as c;
INSERT INTO order_line SELECT c, c, c, c, 44, '2011-01-01', c, c, c, 'abc' FROM generate_series(4, 6) as c;
INSERT INTO item SELECT c, 'abc', c, 'noco_abc', c FROM generate_series(40, 45) as c;
INSERT INTO item SELECT c, 'abc', c, 'co_abc', c FROM generate_series(30, 33) as c;
-- Actual CHbenCHmark query is supported with data
select s_name, s_address
from supplier, nation
where s_suppkey in
(select mod(s_i_id * s_w_id, 10000)
from stock, order_line
where s_i_id in
(select i_id
from item
where i_data like 'co%')
and ol_i_id=s_i_id
and ol_delivery_d > '2010-05-23 12:00:00'
group by s_i_id, s_w_id, s_quantity
having 2*s_quantity > sum(ol_quantity))
and s_nationkey = n_nationkey
and n_name = 'GERMANY'
order by s_name;
s_name | s_address
---------------------------+-------------------------------------
Supplier#000000033 | gfeKpYw3400L0SDywXA6Ya1Qmq1w6YB9f3R
(1 row)
-- Confirm that like 'co%' filter filtered out item with id 44
select s_name, s_address
from supplier, nation
where s_suppkey in
(select mod(s_i_id * s_w_id, 10000)
from stock, order_line
where s_i_id in
(select i_id
from item)
and ol_i_id=s_i_id
and ol_delivery_d > '2010-05-23 12:00:00'
group by s_i_id, s_w_id, s_quantity
having 2*s_quantity > sum(ol_quantity))
and s_nationkey = n_nationkey
and n_name = 'GERMANY'
order by s_name;
s_name | s_address
---------------------------+-------------------------------------
Supplier#000000033 | gfeKpYw3400L0SDywXA6Ya1Qmq1w6YB9f3R
Supplier#000000044 | kERxlLDnlIZJdN66zAPHklyL
(2 rows)
SET client_min_messages TO WARNING;
DROP SCHEMA ch_bench_subquery_repartition CASCADE;

View File

@ -73,7 +73,7 @@ test: multi_reference_table multi_select_for_update relation_access_tracking
test: custom_aggregate_support
test: multi_average_expression multi_working_columns multi_having_pushdown
test: multi_array_agg multi_limit_clause multi_orderby_limit_pushdown
test: multi_jsonb_agg multi_jsonb_object_agg multi_json_agg multi_json_object_agg bool_agg ch_bench_having
test: multi_jsonb_agg multi_jsonb_object_agg multi_json_agg multi_json_object_agg bool_agg ch_bench_having ch_bench_subquery_repartition
test: multi_agg_type_conversion multi_count_type_conversion
test: multi_partition_pruning single_hash_repartition_join
test: multi_join_pruning multi_hash_pruning

View File

@ -0,0 +1,189 @@
SET citus.next_shard_id TO 1680000;
CREATE SCHEMA ch_bench_subquery_repartition;
SET search_path = ch_bench_subquery_repartition, public;
SET citus.enable_repartition_joins TO on;
CREATE TABLE order_line (
ol_w_id int NOT NULL,
ol_d_id int NOT NULL,
ol_o_id int NOT NULL,
ol_number int NOT NULL,
ol_i_id int NOT NULL,
ol_delivery_d timestamp NULL DEFAULT NULL,
ol_amount decimal(6,2) NOT NULL,
ol_supply_w_id int NOT NULL,
ol_quantity decimal(2,0) NOT NULL,
ol_dist_info char(24) NOT NULL,
PRIMARY KEY (ol_w_id,ol_d_id,ol_o_id,ol_number)
);
CREATE TABLE stock (
s_w_id int NOT NULL,
s_i_id int NOT NULL,
s_quantity decimal(4,0) NOT NULL,
s_ytd decimal(8,2) NOT NULL,
s_order_cnt int NOT NULL,
s_remote_cnt int NOT NULL,
s_data varchar(50) NOT NULL,
s_dist_01 char(24) NOT NULL,
s_dist_02 char(24) NOT NULL,
s_dist_03 char(24) NOT NULL,
s_dist_04 char(24) NOT NULL,
s_dist_05 char(24) NOT NULL,
s_dist_06 char(24) NOT NULL,
s_dist_07 char(24) NOT NULL,
s_dist_08 char(24) NOT NULL,
s_dist_09 char(24) NOT NULL,
s_dist_10 char(24) NOT NULL,
PRIMARY KEY (s_w_id,s_i_id)
);
CREATE TABLE item (
i_id int NOT NULL,
i_name varchar(24) NOT NULL,
i_price decimal(5,2) NOT NULL,
i_data varchar(50) NOT NULL,
i_im_id int NOT NULL,
PRIMARY KEY (i_id)
);
create table nation (
n_nationkey int not null,
n_name char(25) not null,
n_regionkey int not null,
n_comment char(152) not null,
PRIMARY KEY ( n_nationkey )
);
create table supplier (
su_suppkey int not null,
su_name char(25) not null,
su_address varchar(40) not null,
su_nationkey int not null,
su_phone char(15) not null,
su_acctbal numeric(12,2) not null,
su_comment char(101) not null,
PRIMARY KEY ( su_suppkey )
);
SELECT create_distributed_table('order_line','ol_w_id');
SELECT create_distributed_table('stock','s_w_id');
SELECT create_reference_table('item');
SELECT create_reference_table('nation');
SELECT create_reference_table('supplier');
INSERT INTO order_line SELECT c, c, c, c, c, NULL, c, c, c, 'abc' FROM generate_series(1, 10) as c;
INSERT INTO stock SELECT c, c, c, c, c, c, 'abc', c, c, c, c, c, c, c, c, c, c FROM generate_series(1, 5) as c;
INSERT INTO item SELECT c, 'abc', c, 'abc', c FROM generate_series(1, 3) as c;
INSERT INTO item SELECT 10+c, 'abc', c, 'abc', c FROM generate_series(1, 3) as c;
-- Subquery + repartion is supported when it is an IN query where the subquery
-- returns unique results (because it's converted to an INNER JOIN)
select s_i_id
from stock, order_line
where
s_i_id in (select i_id from item)
AND s_i_id = ol_i_id
order by s_i_id;
-- Subquery + repartion is not supported when it is an IN query where the
-- subquery doesn't return unique results
select s_i_id
from stock, order_line
where
s_i_id in (select i_im_id from item)
AND s_i_id = ol_i_id;
-- Subquery + repartion is supported when it is a NOT IN query where the subquery
-- returns unique results
select s_i_id
from stock, order_line
where
s_i_id not in (select i_id from item)
AND s_i_id = ol_i_id;
-- Subquery + repartion is not supported when it is a NOT IN where the subquery
-- doesn't return unique results
select s_i_id
from stock, order_line
where
s_i_id not in (select i_im_id from item)
AND s_i_id = ol_i_id;
-- Actual CHbenCHmark query is supported
select su_name, su_address
from supplier, nation
where su_suppkey in
(select mod(s_i_id * s_w_id, 10000)
from stock, order_line
where s_i_id in
(select i_id
from item
where i_data like 'co%')
and ol_i_id=s_i_id
and ol_delivery_d > '2010-05-23 12:00:00'
group by s_i_id, s_w_id, s_quantity
having 2*s_quantity > sum(ol_quantity))
and su_nationkey = n_nationkey
and n_name = 'Germany'
order by su_name;
-- Fallback to public tables with prefilled data
DROP table ch_bench_subquery_repartition.supplier, ch_bench_subquery_repartition.nation;
TRUNCATE order_line, stock, item;
SET search_path = ch_bench_subquery_repartition, public;
insert into stock VALUES
(1, 33, 1000, 1, 1, 1, '', '','','','','','','','','',''),
(1, 44, 1000, 1, 1, 1, '', '','','','','','','','','',''),
(33, 1, 1000, 1, 1, 1, '', '','','','','','','','','',''),
(32, 1, 1000, 1, 1, 1, '', '','','','','','','','','','');
INSERT INTO order_line SELECT c, c, c, c, 33, '2011-01-01', c, c, c, 'abc' FROM generate_series(1, 3) as c;
INSERT INTO order_line SELECT c, c, c, c, 44, '2011-01-01', c, c, c, 'abc' FROM generate_series(4, 6) as c;
INSERT INTO item SELECT c, 'abc', c, 'noco_abc', c FROM generate_series(40, 45) as c;
INSERT INTO item SELECT c, 'abc', c, 'co_abc', c FROM generate_series(30, 33) as c;
-- Actual CHbenCHmark query is supported with data
select s_name, s_address
from supplier, nation
where s_suppkey in
(select mod(s_i_id * s_w_id, 10000)
from stock, order_line
where s_i_id in
(select i_id
from item
where i_data like 'co%')
and ol_i_id=s_i_id
and ol_delivery_d > '2010-05-23 12:00:00'
group by s_i_id, s_w_id, s_quantity
having 2*s_quantity > sum(ol_quantity))
and s_nationkey = n_nationkey
and n_name = 'GERMANY'
order by s_name;
-- Confirm that like 'co%' filter filtered out item with id 44
select s_name, s_address
from supplier, nation
where s_suppkey in
(select mod(s_i_id * s_w_id, 10000)
from stock, order_line
where s_i_id in
(select i_id
from item)
and ol_i_id=s_i_id
and ol_delivery_d > '2010-05-23 12:00:00'
group by s_i_id, s_w_id, s_quantity
having 2*s_quantity > sum(ol_quantity))
and s_nationkey = n_nationkey
and n_name = 'GERMANY'
order by s_name;
SET client_min_messages TO WARNING;
DROP SCHEMA ch_bench_subquery_repartition CASCADE;