Add support for filters

Ensures filter clauses are stripped from master query, and pushed
down to worker queries.
pull/1002/head
Murat Tuncer 2016-11-29 19:00:56 +03:00
parent 19eaf3885f
commit 45762006f3
10 changed files with 151 additions and 6 deletions

View File

@ -1469,6 +1469,7 @@ MasterAggregateExpression(Aggref *originalAggregate,
unionAggregate->aggtype = hllType; unionAggregate->aggtype = hllType;
unionAggregate->args = list_make1(hllTargetEntry); unionAggregate->args = list_make1(hllTargetEntry);
unionAggregate->aggkind = AGGKIND_NORMAL; unionAggregate->aggkind = AGGKIND_NORMAL;
unionAggregate->aggfilter = NULL;
#if (PG_VERSION_NUM >= 90600) #if (PG_VERSION_NUM >= 90600)
unionAggregate->aggtranstype = InvalidOid; unionAggregate->aggtranstype = InvalidOid;
unionAggregate->aggargtypes = list_make1_oid(unionAggregate->aggtype); unionAggregate->aggargtypes = list_make1_oid(unionAggregate->aggtype);
@ -1533,6 +1534,7 @@ MasterAggregateExpression(Aggref *originalAggregate,
newMasterAggregate->aggdistinct = NULL; newMasterAggregate->aggdistinct = NULL;
newMasterAggregate->aggfnoid = sumFunctionId; newMasterAggregate->aggfnoid = sumFunctionId;
newMasterAggregate->aggtype = masterReturnType; newMasterAggregate->aggtype = masterReturnType;
newMasterAggregate->aggfilter = NULL;
#if (PG_VERSION_NUM >= 90600) #if (PG_VERSION_NUM >= 90600)
newMasterAggregate->aggtranstype = InvalidOid; newMasterAggregate->aggtranstype = InvalidOid;
newMasterAggregate->aggargtypes = list_make1_oid(newMasterAggregate->aggtype); newMasterAggregate->aggargtypes = list_make1_oid(newMasterAggregate->aggtype);
@ -1602,6 +1604,7 @@ MasterAggregateExpression(Aggref *originalAggregate,
newMasterAggregate = copyObject(originalAggregate); newMasterAggregate = copyObject(originalAggregate);
newMasterAggregate->aggfnoid = aggregateFunctionId; newMasterAggregate->aggfnoid = aggregateFunctionId;
newMasterAggregate->args = list_make1(arrayCatAggArgument); newMasterAggregate->args = list_make1(arrayCatAggArgument);
newMasterAggregate->aggfilter = NULL;
#if (PG_VERSION_NUM >= 90600) #if (PG_VERSION_NUM >= 90600)
newMasterAggregate->aggtranstype = InvalidOid; newMasterAggregate->aggtranstype = InvalidOid;
newMasterAggregate->aggargtypes = list_make1_oid(ANYARRAYOID); newMasterAggregate->aggargtypes = list_make1_oid(ANYARRAYOID);
@ -1632,6 +1635,7 @@ MasterAggregateExpression(Aggref *originalAggregate,
newMasterAggregate->aggdistinct = NULL; newMasterAggregate->aggdistinct = NULL;
newMasterAggregate->aggfnoid = aggregateFunctionId; newMasterAggregate->aggfnoid = aggregateFunctionId;
newMasterAggregate->aggtype = masterReturnType; newMasterAggregate->aggtype = masterReturnType;
newMasterAggregate->aggfilter = NULL;
column = makeVar(masterTableId, walkerContext->columnId, workerReturnType, column = makeVar(masterTableId, walkerContext->columnId, workerReturnType,
workerReturnTypeMod, workerCollationId, columnLevelsUp); workerReturnTypeMod, workerCollationId, columnLevelsUp);
@ -2073,6 +2077,8 @@ WorkerAggregateExpressionList(Aggref *originalAggregate,
addAggregateFunction->aggtype = hllType; addAggregateFunction->aggtype = hllType;
addAggregateFunction->args = addAggregateArgumentList; addAggregateFunction->args = addAggregateArgumentList;
addAggregateFunction->aggkind = AGGKIND_NORMAL; addAggregateFunction->aggkind = AGGKIND_NORMAL;
addAggregateFunction->aggfilter = (Expr *) copyObject(
originalAggregate->aggfilter);
workerAggregateList = lappend(workerAggregateList, addAggregateFunction); workerAggregateList = lappend(workerAggregateList, addAggregateFunction);
} }

View File

@ -176,6 +176,29 @@ SELECT l_returnflag, count(DISTINCT l_shipdate) as count_distinct, count(*) as t
N | 1265 | 6155 N | 1265 | 6155
(3 rows) (3 rows)
SELECT
l_orderkey,
count(l_partkey) FILTER (WHERE l_shipmode = 'AIR'),
count(DISTINCT l_partkey) FILTER (WHERE l_shipmode = 'AIR'),
count(DISTINCT CASE WHEN l_shipmode = 'AIR' THEN l_partkey ELSE NULL END)
FROM lineitem
GROUP BY l_orderkey
ORDER BY 2 DESC, 1 DESC
LIMIT 10;
l_orderkey | count | count | count
------------+-------+-------+-------
12005 | 4 | 4 | 4
5409 | 4 | 4 | 4
4964 | 4 | 4 | 4
14848 | 3 | 3 | 3
14496 | 3 | 3 | 3
13473 | 3 | 3 | 3
13122 | 3 | 3 | 3
12929 | 3 | 3 | 3
12645 | 3 | 3 | 3
12417 | 3 | 3 | 3
(10 rows)
-- Check that we can revert config and disable count(distinct) approximations -- Check that we can revert config and disable count(distinct) approximations
SET citus.count_distinct_error_rate = 0.0; SET citus.count_distinct_error_rate = 0.0;
SELECT count(distinct l_orderkey) FROM lineitem; SELECT count(distinct l_orderkey) FROM lineitem;

View File

@ -135,6 +135,17 @@ SELECT l_returnflag, count(DISTINCT l_shipdate) as count_distinct, count(*) as t
LIMIT 10; LIMIT 10;
ERROR: cannot compute count (distinct) approximation ERROR: cannot compute count (distinct) approximation
HINT: You need to have the hll extension loaded. HINT: You need to have the hll extension loaded.
SELECT
l_orderkey,
count(l_partkey) FILTER (WHERE l_shipmode = 'AIR'),
count(DISTINCT l_partkey) FILTER (WHERE l_shipmode = 'AIR'),
count(DISTINCT CASE WHEN l_shipmode = 'AIR' THEN l_partkey ELSE NULL END)
FROM lineitem
GROUP BY l_orderkey
ORDER BY 2 DESC, 1 DESC
LIMIT 10;
ERROR: cannot compute count (distinct) approximation
HINT: You need to have the hll extension loaded.
-- Check that we can revert config and disable count(distinct) approximations -- Check that we can revert config and disable count(distinct) approximations
SET citus.count_distinct_error_rate = 0.0; SET citus.count_distinct_error_rate = 0.0;
SELECT count(distinct l_orderkey) FROM lineitem; SELECT count(distinct l_orderkey) FROM lineitem;

View File

@ -457,3 +457,57 @@ DEBUG: push down of limit count: 30
(10 rows) (10 rows)
RESET client_min_messages; RESET client_min_messages;
-- FILTERs
SELECT
l_orderkey,
sum(l_extendedprice),
sum(l_extendedprice) FILTER (WHERE l_shipmode = 'AIR'),
count(*),
count(*) FILTER (WHERE l_shipmode = 'AIR'),
max(l_extendedprice),
max(l_extendedprice) FILTER (WHERE l_quantity < 30)
FROM lineitem
GROUP BY l_orderkey
ORDER BY 2 DESC, 1 DESC
LIMIT 10;
l_orderkey | sum | sum | count | count | max | max
------------+-----------+-----------+-------+-------+-----------+----------
12804 | 440012.71 | 45788.16 | 7 | 1 | 94398.00 | 45788.16
9863 | 412560.63 | 175647.63 | 7 | 3 | 85723.77 | 50769.14
2567 | 412076.77 | 59722.26 | 7 | 1 | 94894.00 | 9784.02
11142 | 410502.38 | 44965.95 | 7 | 1 | 83989.44 | 44965.95
12039 | 407048.94 | 76406.30 | 7 | 2 | 94471.02 | 19679.30
2306 | 405629.96 | 28032.60 | 7 | 1 | 92838.00 | 44384.50
5606 | 403595.91 | 36531.51 | 7 | 2 | 94890.18 | 30582.75
11296 | 399079.89 | | 6 | 0 | 102449.00 | 33122.93
11046 | 391163.26 | 31436.34 | 7 | 2 | 94506.24 | 47519.76
4421 | 387313.12 | | 7 | 0 | 67301.52 | 23783.40
(10 rows)
SELECT
l_orderkey,
sum(l_extendedprice),
sum(l_extendedprice) FILTER (WHERE l_shipmode = 'AIR'),
count(*),
count(*) FILTER (WHERE l_shipmode = 'AIR'),
max(l_extendedprice),
max(l_extendedprice) FILTER (WHERE l_quantity < 30)
FROM lineitem
GROUP BY l_orderkey
HAVING count(*) FILTER (WHERE l_shipmode = 'AIR') > 1
ORDER BY 2 DESC, 1 DESC
LIMIT 10;
l_orderkey | sum | sum | count | count | max | max
------------+-----------+-----------+-------+-------+----------+----------
9863 | 412560.63 | 175647.63 | 7 | 3 | 85723.77 | 50769.14
12039 | 407048.94 | 76406.30 | 7 | 2 | 94471.02 | 19679.30
5606 | 403595.91 | 36531.51 | 7 | 2 | 94890.18 | 30582.75
11046 | 391163.26 | 31436.34 | 7 | 2 | 94506.24 | 47519.76
14499 | 384140.30 | 67867.08 | 7 | 2 | 84335.36 | 46169.75
11623 | 380598.48 | 133709.82 | 7 | 2 | 93701.54 | 21487.65
10787 | 375688.09 | 99424.78 | 7 | 2 | 76732.67 | 50946.91
12902 | 358191.24 | 76891.00 | 7 | 2 | 82008.08 | 35602.08
3747 | 353701.23 | 68592.23 | 7 | 2 | 67181.10 | 46252.77
5158 | 349889.05 | 159753.19 | 7 | 3 | 78714.67 | 29729.20
(10 rows)

View File

@ -1681,13 +1681,13 @@ DEBUG: Plan is router executable
5 | 1 5 | 1
(1 row) (1 row)
-- non-router plannable queries do not support filters -- non-router plannable queries also support filters
SELECT count(*), count(*) FILTER (WHERE id < 3) SELECT count(*), count(*) FILTER (WHERE id < 3)
FROM articles_hash FROM articles_hash
WHERE author_id = 1 or author_id = 2; WHERE author_id = 1 or author_id = 2;
count | count count | count
-------+------- -------+-------
10 | 0 10 | 2
(1 row) (1 row)
-- prepare queries can be router plannable -- prepare queries can be router plannable

View File

@ -76,7 +76,7 @@ SELECT *
ORDER BY 2 DESC, 1 DESC ORDER BY 2 DESC, 1 DESC
LIMIT 10; LIMIT 10;
-- there is a known issue with aggregates with filters in non-repartition queries (#395) -- count distinct with filters
SELECT SELECT
l_orderkey, count(DISTINCT l_partkey) FILTER (WHERE l_shipmode = 'AIR') l_orderkey, count(DISTINCT l_partkey) FILTER (WHERE l_shipmode = 'AIR')
FROM lineitem_hash FROM lineitem_hash

View File

@ -122,14 +122,27 @@ SELECT *
1927 | 3 1927 | 3
(10 rows) (10 rows)
-- there is a known issue with aggregates with filters in non-repartition queries (#395) -- count distinct with filters
SELECT SELECT
l_orderkey, count(DISTINCT l_partkey) FILTER (WHERE l_shipmode = 'AIR') l_orderkey, count(DISTINCT l_partkey) FILTER (WHERE l_shipmode = 'AIR')
FROM lineitem_hash FROM lineitem_hash
GROUP BY l_orderkey GROUP BY l_orderkey
ORDER BY 2 DESC, 1 DESC ORDER BY 2 DESC, 1 DESC
LIMIT 10; LIMIT 10;
ERROR: attribute number 15 exceeds number of columns 2 l_orderkey | count
------------+-------
12005 | 4
5409 | 4
4964 | 4
14848 | 3
14496 | 3
13473 | 3
13122 | 3
12929 | 3
12645 | 3
12417 | 3
(10 rows)
-- filter column already exists in target list -- filter column already exists in target list
SELECT * SELECT *
FROM ( FROM (

View File

@ -116,6 +116,16 @@ SELECT l_returnflag, count(DISTINCT l_shipdate) as count_distinct, count(*) as t
ORDER BY total ORDER BY total
LIMIT 10; LIMIT 10;
SELECT
l_orderkey,
count(l_partkey) FILTER (WHERE l_shipmode = 'AIR'),
count(DISTINCT l_partkey) FILTER (WHERE l_shipmode = 'AIR'),
count(DISTINCT CASE WHEN l_shipmode = 'AIR' THEN l_partkey ELSE NULL END)
FROM lineitem
GROUP BY l_orderkey
ORDER BY 2 DESC, 1 DESC
LIMIT 10;
-- Check that we can revert config and disable count(distinct) approximations -- Check that we can revert config and disable count(distinct) approximations
SET citus.count_distinct_error_rate = 0.0; SET citus.count_distinct_error_rate = 0.0;

View File

@ -225,3 +225,31 @@ ORDER BY
LIMIT 10 OFFSET 20; LIMIT 10 OFFSET 20;
RESET client_min_messages; RESET client_min_messages;
-- FILTERs
SELECT
l_orderkey,
sum(l_extendedprice),
sum(l_extendedprice) FILTER (WHERE l_shipmode = 'AIR'),
count(*),
count(*) FILTER (WHERE l_shipmode = 'AIR'),
max(l_extendedprice),
max(l_extendedprice) FILTER (WHERE l_quantity < 30)
FROM lineitem
GROUP BY l_orderkey
ORDER BY 2 DESC, 1 DESC
LIMIT 10;
SELECT
l_orderkey,
sum(l_extendedprice),
sum(l_extendedprice) FILTER (WHERE l_shipmode = 'AIR'),
count(*),
count(*) FILTER (WHERE l_shipmode = 'AIR'),
max(l_extendedprice),
max(l_extendedprice) FILTER (WHERE l_quantity < 30)
FROM lineitem
GROUP BY l_orderkey
HAVING count(*) FILTER (WHERE l_shipmode = 'AIR') > 1
ORDER BY 2 DESC, 1 DESC
LIMIT 10;

View File

@ -719,7 +719,7 @@ SELECT count(*), count(*) FILTER (WHERE id < 3)
FROM articles_hash FROM articles_hash
WHERE author_id = 1; WHERE author_id = 1;
-- non-router plannable queries do not support filters -- non-router plannable queries also support filters
SELECT count(*), count(*) FILTER (WHERE id < 3) SELECT count(*), count(*) FILTER (WHERE id < 3)
FROM articles_hash FROM articles_hash
WHERE author_id = 1 or author_id = 2; WHERE author_id = 1 or author_id = 2;