mirror of https://github.com/citusdata/citus.git
Improve subquery pushdown regression tests
- Use native postgres function for composite key btree functions
- Move explain tests to multi_explain.sql (get rid of .out _0.out files)
- Get rid of input/output files for multi_subquery.sql by moving table creations
- Update some comments
pull/1389/head
parent
b072708802
commit
df494c0403
|
@ -397,6 +397,305 @@ HashAggregate
|
||||||
Group Key: lineitem.l_quantity
|
Group Key: lineitem.l_quantity
|
||||||
-> Seq Scan on public.lineitem_290001 lineitem
|
-> Seq Scan on public.lineitem_290001 lineitem
|
||||||
Output: l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment
|
Output: l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment
|
||||||
|
-- Subquery pushdown tests with explain
|
||||||
|
EXPLAIN (COSTS OFF)
|
||||||
|
SELECT
|
||||||
|
avg(array_length(events, 1)) AS event_average
|
||||||
|
FROM
|
||||||
|
(SELECT
|
||||||
|
tenant_id,
|
||||||
|
user_id,
|
||||||
|
array_agg(event_type ORDER BY event_time) AS events
|
||||||
|
FROM
|
||||||
|
(SELECT
|
||||||
|
(users.composite_id).tenant_id,
|
||||||
|
(users.composite_id).user_id,
|
||||||
|
event_type,
|
||||||
|
events.event_time
|
||||||
|
FROM
|
||||||
|
users,
|
||||||
|
events
|
||||||
|
WHERE
|
||||||
|
(users.composite_id) = (events.composite_id) AND
|
||||||
|
users.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND
|
||||||
|
users.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND
|
||||||
|
event_type IN ('click', 'submit', 'pay')) AS subquery
|
||||||
|
GROUP BY
|
||||||
|
tenant_id,
|
||||||
|
user_id) AS subquery;
|
||||||
|
Aggregate
|
||||||
|
-> Custom Scan (Citus Real-Time)
|
||||||
|
Task Count: 4
|
||||||
|
Tasks Shown: One of 4
|
||||||
|
-> Task
|
||||||
|
Node: host=localhost port=57637 dbname=regression
|
||||||
|
-> Aggregate
|
||||||
|
-> GroupAggregate
|
||||||
|
Group Key: (((NULL::user_composite_type)).tenant_id), (((NULL::user_composite_type)).user_id)
|
||||||
|
-> Sort
|
||||||
|
Sort Key: (((NULL::user_composite_type)).tenant_id), (((NULL::user_composite_type)).user_id)
|
||||||
|
-> Nested Loop
|
||||||
|
Join Filter: ((NULL::user_composite_type) = events.composite_id)
|
||||||
|
-> Result
|
||||||
|
One-Time Filter: false
|
||||||
|
-> Seq Scan on events_1400027 events
|
||||||
|
Filter: ((event_type)::text = ANY ('{click,submit,pay}'::text[]))
|
||||||
|
-- Union and left join subquery pushdown
|
||||||
|
EXPLAIN (COSTS OFF)
|
||||||
|
SELECT
|
||||||
|
avg(array_length(events, 1)) AS event_average,
|
||||||
|
hasdone
|
||||||
|
FROM
|
||||||
|
(SELECT
|
||||||
|
subquery_1.tenant_id,
|
||||||
|
subquery_1.user_id,
|
||||||
|
array_agg(event ORDER BY event_time) AS events,
|
||||||
|
COALESCE(hasdone, 'Has not done paying') AS hasdone
|
||||||
|
FROM
|
||||||
|
(
|
||||||
|
(SELECT
|
||||||
|
(users.composite_id).tenant_id,
|
||||||
|
(users.composite_id).user_id,
|
||||||
|
(users.composite_id) as composite_id,
|
||||||
|
'action=>1'AS event,
|
||||||
|
events.event_time
|
||||||
|
FROM
|
||||||
|
users,
|
||||||
|
events
|
||||||
|
WHERE
|
||||||
|
(users.composite_id) = (events.composite_id) AND
|
||||||
|
users.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND
|
||||||
|
users.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND
|
||||||
|
event_type = 'click')
|
||||||
|
UNION
|
||||||
|
(SELECT
|
||||||
|
(users.composite_id).tenant_id,
|
||||||
|
(users.composite_id).user_id,
|
||||||
|
(users.composite_id) as composite_id,
|
||||||
|
'action=>2'AS event,
|
||||||
|
events.event_time
|
||||||
|
FROM
|
||||||
|
users,
|
||||||
|
events
|
||||||
|
WHERE
|
||||||
|
(users.composite_id) = (events.composite_id) AND
|
||||||
|
users.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND
|
||||||
|
users.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND
|
||||||
|
event_type = 'submit')
|
||||||
|
) AS subquery_1
|
||||||
|
LEFT JOIN
|
||||||
|
(SELECT
|
||||||
|
DISTINCT ON ((composite_id).tenant_id, (composite_id).user_id) composite_id,
|
||||||
|
(composite_id).tenant_id,
|
||||||
|
(composite_id).user_id,
|
||||||
|
'Has done paying'::TEXT AS hasdone
|
||||||
|
FROM
|
||||||
|
events
|
||||||
|
WHERE
|
||||||
|
events.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND
|
||||||
|
events.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND
|
||||||
|
event_type = 'pay') AS subquery_2
|
||||||
|
ON
|
||||||
|
subquery_1.composite_id = subquery_2.composite_id
|
||||||
|
GROUP BY
|
||||||
|
subquery_1.tenant_id,
|
||||||
|
subquery_1.user_id,
|
||||||
|
hasdone) AS subquery_top
|
||||||
|
GROUP BY
|
||||||
|
hasdone;
|
||||||
|
HashAggregate
|
||||||
|
Group Key: remote_scan.hasdone
|
||||||
|
-> Custom Scan (Citus Real-Time)
|
||||||
|
Task Count: 4
|
||||||
|
Tasks Shown: One of 4
|
||||||
|
-> Task
|
||||||
|
Node: host=localhost port=57637 dbname=regression
|
||||||
|
-> GroupAggregate
|
||||||
|
Group Key: subquery_top.hasdone
|
||||||
|
-> Sort
|
||||||
|
Sort Key: subquery_top.hasdone
|
||||||
|
-> Subquery Scan on subquery_top
|
||||||
|
-> GroupAggregate
|
||||||
|
Group Key: (((NULL::user_composite_type)).tenant_id), (((NULL::user_composite_type)).user_id), subquery_2.hasdone
|
||||||
|
-> Sort
|
||||||
|
Sort Key: (((NULL::user_composite_type)).tenant_id), (((NULL::user_composite_type)).user_id), subquery_2.hasdone
|
||||||
|
-> Hash Left Join
|
||||||
|
Hash Cond: ((NULL::user_composite_type) = subquery_2.composite_id)
|
||||||
|
-> Unique
|
||||||
|
-> Sort
|
||||||
|
Sort Key: (((NULL::user_composite_type)).tenant_id), (((NULL::user_composite_type)).user_id), (NULL::user_composite_type), ('action=>1'::text), events.event_time
|
||||||
|
-> Append
|
||||||
|
-> Nested Loop
|
||||||
|
Join Filter: ((NULL::user_composite_type) = events.composite_id)
|
||||||
|
-> Result
|
||||||
|
One-Time Filter: false
|
||||||
|
-> Seq Scan on events_1400027 events
|
||||||
|
Filter: ((event_type)::text = 'click'::text)
|
||||||
|
-> Nested Loop
|
||||||
|
Join Filter: ((NULL::user_composite_type) = events_1.composite_id)
|
||||||
|
-> Result
|
||||||
|
One-Time Filter: false
|
||||||
|
-> Seq Scan on events_1400027 events_1
|
||||||
|
Filter: ((event_type)::text = 'submit'::text)
|
||||||
|
-> Hash
|
||||||
|
-> Subquery Scan on subquery_2
|
||||||
|
-> Unique
|
||||||
|
-> Sort
|
||||||
|
Sort Key: ((events_2.composite_id).tenant_id), ((events_2.composite_id).user_id)
|
||||||
|
-> Seq Scan on events_1400027 events_2
|
||||||
|
Filter: ((composite_id >= '(1,-9223372036854775808)'::user_composite_type) AND (composite_id <= '(1,9223372036854775807)'::user_composite_type) AND ((event_type)::text = 'pay'::text))
|
||||||
|
-- Union, left join and having subquery pushdown
|
||||||
|
EXPLAIN (COSTS OFF)
|
||||||
|
SELECT
|
||||||
|
avg(array_length(events, 1)) AS event_average,
|
||||||
|
count_pay
|
||||||
|
FROM (
|
||||||
|
SELECT
|
||||||
|
subquery_1.tenant_id,
|
||||||
|
subquery_1.user_id,
|
||||||
|
array_agg(event ORDER BY event_time) AS events,
|
||||||
|
COALESCE(count_pay, 0) AS count_pay
|
||||||
|
FROM
|
||||||
|
(
|
||||||
|
(SELECT
|
||||||
|
(users.composite_id).tenant_id,
|
||||||
|
(users.composite_id).user_id,
|
||||||
|
(users.composite_id),
|
||||||
|
'action=>1'AS event,
|
||||||
|
events.event_time
|
||||||
|
FROM
|
||||||
|
users,
|
||||||
|
events
|
||||||
|
WHERE
|
||||||
|
(users.composite_id) = (events.composite_id) AND
|
||||||
|
users.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND
|
||||||
|
users.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND
|
||||||
|
event_type = 'click')
|
||||||
|
UNION
|
||||||
|
(SELECT
|
||||||
|
(users.composite_id).tenant_id,
|
||||||
|
(users.composite_id).user_id,
|
||||||
|
(users.composite_id),
|
||||||
|
'action=>2'AS event,
|
||||||
|
events.event_time
|
||||||
|
FROM
|
||||||
|
users,
|
||||||
|
events
|
||||||
|
WHERE
|
||||||
|
(users.composite_id) = (events.composite_id) AND
|
||||||
|
users.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND
|
||||||
|
users.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND
|
||||||
|
event_type = 'submit')
|
||||||
|
) AS subquery_1
|
||||||
|
LEFT JOIN
|
||||||
|
(SELECT
|
||||||
|
(composite_id).tenant_id,
|
||||||
|
(composite_id).user_id,
|
||||||
|
composite_id,
|
||||||
|
COUNT(*) AS count_pay
|
||||||
|
FROM
|
||||||
|
events
|
||||||
|
WHERE
|
||||||
|
events.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND
|
||||||
|
events.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND
|
||||||
|
event_type = 'pay'
|
||||||
|
GROUP BY
|
||||||
|
composite_id
|
||||||
|
HAVING
|
||||||
|
COUNT(*) > 2) AS subquery_2
|
||||||
|
ON
|
||||||
|
subquery_1.composite_id = subquery_2.composite_id
|
||||||
|
GROUP BY
|
||||||
|
subquery_1.tenant_id,
|
||||||
|
subquery_1.user_id,
|
||||||
|
count_pay) AS subquery_top
|
||||||
|
WHERE
|
||||||
|
array_ndims(events) > 0
|
||||||
|
GROUP BY
|
||||||
|
count_pay
|
||||||
|
ORDER BY
|
||||||
|
count_pay;
|
||||||
|
ERROR: bogus varattno for OUTER_VAR var: 3
|
||||||
|
-- Lateral join subquery pushdown
|
||||||
|
-- set subquery_pushdown due to limit in the query
|
||||||
|
SET citus.subquery_pushdown to ON;
|
||||||
|
EXPLAIN (COSTS OFF)
|
||||||
|
SELECT
|
||||||
|
tenant_id,
|
||||||
|
user_id,
|
||||||
|
user_lastseen,
|
||||||
|
event_array
|
||||||
|
FROM
|
||||||
|
(SELECT
|
||||||
|
tenant_id,
|
||||||
|
user_id,
|
||||||
|
max(lastseen) as user_lastseen,
|
||||||
|
array_agg(event_type ORDER BY event_time) AS event_array
|
||||||
|
FROM
|
||||||
|
(SELECT
|
||||||
|
(composite_id).tenant_id,
|
||||||
|
(composite_id).user_id,
|
||||||
|
composite_id,
|
||||||
|
lastseen
|
||||||
|
FROM
|
||||||
|
users
|
||||||
|
WHERE
|
||||||
|
composite_id >= '(1, -9223372036854775808)'::user_composite_type AND
|
||||||
|
composite_id <= '(1, 9223372036854775807)'::user_composite_type
|
||||||
|
ORDER BY
|
||||||
|
lastseen DESC
|
||||||
|
LIMIT
|
||||||
|
10
|
||||||
|
) AS subquery_top
|
||||||
|
LEFT JOIN LATERAL
|
||||||
|
(SELECT
|
||||||
|
event_type,
|
||||||
|
event_time
|
||||||
|
FROM
|
||||||
|
events
|
||||||
|
WHERE
|
||||||
|
(composite_id) = subquery_top.composite_id
|
||||||
|
ORDER BY
|
||||||
|
event_time DESC
|
||||||
|
LIMIT
|
||||||
|
99) AS subquery_lateral
|
||||||
|
ON
|
||||||
|
true
|
||||||
|
GROUP BY
|
||||||
|
tenant_id,
|
||||||
|
user_id
|
||||||
|
) AS shard_union
|
||||||
|
ORDER BY
|
||||||
|
user_lastseen DESC
|
||||||
|
LIMIT
|
||||||
|
10;
|
||||||
|
Limit
|
||||||
|
-> Sort
|
||||||
|
Sort Key: remote_scan.user_lastseen DESC
|
||||||
|
-> Custom Scan (Citus Real-Time)
|
||||||
|
Task Count: 4
|
||||||
|
Tasks Shown: One of 4
|
||||||
|
-> Task
|
||||||
|
Node: host=localhost port=57637 dbname=regression
|
||||||
|
-> Limit
|
||||||
|
-> Sort
|
||||||
|
Sort Key: (max(users.lastseen)) DESC
|
||||||
|
-> GroupAggregate
|
||||||
|
Group Key: ((users.composite_id).tenant_id), ((users.composite_id).user_id)
|
||||||
|
-> Sort
|
||||||
|
Sort Key: ((users.composite_id).tenant_id), ((users.composite_id).user_id)
|
||||||
|
-> Nested Loop Left Join
|
||||||
|
-> Limit
|
||||||
|
-> Sort
|
||||||
|
Sort Key: users.lastseen DESC
|
||||||
|
-> Subquery Scan on users
|
||||||
|
-> Result
|
||||||
|
One-Time Filter: false
|
||||||
|
-> Limit
|
||||||
|
-> Sort
|
||||||
|
Sort Key: events.event_time DESC
|
||||||
|
-> Seq Scan on events_1400027 events
|
||||||
|
Filter: (composite_id = users.composite_id)
|
||||||
-- Test all tasks output
|
-- Test all tasks output
|
||||||
SET citus.explain_all_tasks TO on;
|
SET citus.explain_all_tasks TO on;
|
||||||
EXPLAIN (COSTS FALSE)
|
EXPLAIN (COSTS FALSE)
|
||||||
|
|
|
@ -376,6 +376,289 @@ HashAggregate
|
||||||
Group Key: lineitem.l_quantity
|
Group Key: lineitem.l_quantity
|
||||||
-> Seq Scan on public.lineitem_290001 lineitem
|
-> Seq Scan on public.lineitem_290001 lineitem
|
||||||
Output: l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment
|
Output: l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment
|
||||||
|
-- Subquery pushdown tests with explain
|
||||||
|
EXPLAIN (COSTS OFF)
|
||||||
|
SELECT
|
||||||
|
avg(array_length(events, 1)) AS event_average
|
||||||
|
FROM
|
||||||
|
(SELECT
|
||||||
|
tenant_id,
|
||||||
|
user_id,
|
||||||
|
array_agg(event_type ORDER BY event_time) AS events
|
||||||
|
FROM
|
||||||
|
(SELECT
|
||||||
|
(users.composite_id).tenant_id,
|
||||||
|
(users.composite_id).user_id,
|
||||||
|
event_type,
|
||||||
|
events.event_time
|
||||||
|
FROM
|
||||||
|
users,
|
||||||
|
events
|
||||||
|
WHERE
|
||||||
|
(users.composite_id) = (events.composite_id) AND
|
||||||
|
users.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND
|
||||||
|
users.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND
|
||||||
|
event_type IN ('click', 'submit', 'pay')) AS subquery
|
||||||
|
GROUP BY
|
||||||
|
tenant_id,
|
||||||
|
user_id) AS subquery;
|
||||||
|
Aggregate
|
||||||
|
-> Custom Scan (Citus Real-Time)
|
||||||
|
Task Count: 4
|
||||||
|
Tasks Shown: One of 4
|
||||||
|
-> Task
|
||||||
|
Node: host=localhost port=57637 dbname=regression
|
||||||
|
-> Aggregate
|
||||||
|
-> GroupAggregate
|
||||||
|
Group Key: ((users.composite_id).tenant_id), ((users.composite_id).user_id)
|
||||||
|
-> Sort
|
||||||
|
Sort Key: ((users.composite_id).tenant_id), ((users.composite_id).user_id)
|
||||||
|
-> Result
|
||||||
|
One-Time Filter: false
|
||||||
|
-- Union and left join subquery pushdown
|
||||||
|
EXPLAIN (COSTS OFF)
|
||||||
|
SELECT
|
||||||
|
avg(array_length(events, 1)) AS event_average,
|
||||||
|
hasdone
|
||||||
|
FROM
|
||||||
|
(SELECT
|
||||||
|
subquery_1.tenant_id,
|
||||||
|
subquery_1.user_id,
|
||||||
|
array_agg(event ORDER BY event_time) AS events,
|
||||||
|
COALESCE(hasdone, 'Has not done paying') AS hasdone
|
||||||
|
FROM
|
||||||
|
(
|
||||||
|
(SELECT
|
||||||
|
(users.composite_id).tenant_id,
|
||||||
|
(users.composite_id).user_id,
|
||||||
|
(users.composite_id) as composite_id,
|
||||||
|
'action=>1'AS event,
|
||||||
|
events.event_time
|
||||||
|
FROM
|
||||||
|
users,
|
||||||
|
events
|
||||||
|
WHERE
|
||||||
|
(users.composite_id) = (events.composite_id) AND
|
||||||
|
users.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND
|
||||||
|
users.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND
|
||||||
|
event_type = 'click')
|
||||||
|
UNION
|
||||||
|
(SELECT
|
||||||
|
(users.composite_id).tenant_id,
|
||||||
|
(users.composite_id).user_id,
|
||||||
|
(users.composite_id) as composite_id,
|
||||||
|
'action=>2'AS event,
|
||||||
|
events.event_time
|
||||||
|
FROM
|
||||||
|
users,
|
||||||
|
events
|
||||||
|
WHERE
|
||||||
|
(users.composite_id) = (events.composite_id) AND
|
||||||
|
users.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND
|
||||||
|
users.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND
|
||||||
|
event_type = 'submit')
|
||||||
|
) AS subquery_1
|
||||||
|
LEFT JOIN
|
||||||
|
(SELECT
|
||||||
|
DISTINCT ON ((composite_id).tenant_id, (composite_id).user_id) composite_id,
|
||||||
|
(composite_id).tenant_id,
|
||||||
|
(composite_id).user_id,
|
||||||
|
'Has done paying'::TEXT AS hasdone
|
||||||
|
FROM
|
||||||
|
events
|
||||||
|
WHERE
|
||||||
|
events.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND
|
||||||
|
events.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND
|
||||||
|
event_type = 'pay') AS subquery_2
|
||||||
|
ON
|
||||||
|
subquery_1.composite_id = subquery_2.composite_id
|
||||||
|
GROUP BY
|
||||||
|
subquery_1.tenant_id,
|
||||||
|
subquery_1.user_id,
|
||||||
|
hasdone) AS subquery_top
|
||||||
|
GROUP BY
|
||||||
|
hasdone;
|
||||||
|
HashAggregate
|
||||||
|
Group Key: remote_scan.hasdone
|
||||||
|
-> Custom Scan (Citus Real-Time)
|
||||||
|
Task Count: 4
|
||||||
|
Tasks Shown: One of 4
|
||||||
|
-> Task
|
||||||
|
Node: host=localhost port=57637 dbname=regression
|
||||||
|
-> HashAggregate
|
||||||
|
Group Key: COALESCE(subquery_2.hasdone, 'Has not done paying'::text)
|
||||||
|
-> GroupAggregate
|
||||||
|
Group Key: ((composite_id).tenant_id), ((composite_id).user_id), subquery_2.hasdone
|
||||||
|
-> Sort
|
||||||
|
Sort Key: ((composite_id).tenant_id), ((composite_id).user_id), subquery_2.hasdone
|
||||||
|
-> Hash Left Join
|
||||||
|
Hash Cond: (composite_id = subquery_2.composite_id)
|
||||||
|
-> Unique
|
||||||
|
-> Sort
|
||||||
|
Sort Key: ((composite_id).tenant_id), ((composite_id).user_id), composite_id, ('action=>1'::text), event_time
|
||||||
|
-> Append
|
||||||
|
-> Result
|
||||||
|
One-Time Filter: false
|
||||||
|
-> Result
|
||||||
|
One-Time Filter: false
|
||||||
|
-> Hash
|
||||||
|
-> Subquery Scan on subquery_2
|
||||||
|
-> Unique
|
||||||
|
-> Sort
|
||||||
|
Sort Key: ((events.composite_id).tenant_id), ((events.composite_id).user_id)
|
||||||
|
-> Seq Scan on events_1400027 events
|
||||||
|
Filter: ((composite_id >= '(1,-9223372036854775808)'::user_composite_type) AND (composite_id <= '(1,9223372036854775807)'::user_composite_type) AND ((event_type)::text = 'pay'::text))
|
||||||
|
-- Union, left join and having subquery pushdown
|
||||||
|
EXPLAIN (COSTS OFF)
|
||||||
|
SELECT
|
||||||
|
avg(array_length(events, 1)) AS event_average,
|
||||||
|
count_pay
|
||||||
|
FROM (
|
||||||
|
SELECT
|
||||||
|
subquery_1.tenant_id,
|
||||||
|
subquery_1.user_id,
|
||||||
|
array_agg(event ORDER BY event_time) AS events,
|
||||||
|
COALESCE(count_pay, 0) AS count_pay
|
||||||
|
FROM
|
||||||
|
(
|
||||||
|
(SELECT
|
||||||
|
(users.composite_id).tenant_id,
|
||||||
|
(users.composite_id).user_id,
|
||||||
|
(users.composite_id),
|
||||||
|
'action=>1'AS event,
|
||||||
|
events.event_time
|
||||||
|
FROM
|
||||||
|
users,
|
||||||
|
events
|
||||||
|
WHERE
|
||||||
|
(users.composite_id) = (events.composite_id) AND
|
||||||
|
users.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND
|
||||||
|
users.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND
|
||||||
|
event_type = 'click')
|
||||||
|
UNION
|
||||||
|
(SELECT
|
||||||
|
(users.composite_id).tenant_id,
|
||||||
|
(users.composite_id).user_id,
|
||||||
|
(users.composite_id),
|
||||||
|
'action=>2'AS event,
|
||||||
|
events.event_time
|
||||||
|
FROM
|
||||||
|
users,
|
||||||
|
events
|
||||||
|
WHERE
|
||||||
|
(users.composite_id) = (events.composite_id) AND
|
||||||
|
users.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND
|
||||||
|
users.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND
|
||||||
|
event_type = 'submit')
|
||||||
|
) AS subquery_1
|
||||||
|
LEFT JOIN
|
||||||
|
(SELECT
|
||||||
|
(composite_id).tenant_id,
|
||||||
|
(composite_id).user_id,
|
||||||
|
composite_id,
|
||||||
|
COUNT(*) AS count_pay
|
||||||
|
FROM
|
||||||
|
events
|
||||||
|
WHERE
|
||||||
|
events.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND
|
||||||
|
events.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND
|
||||||
|
event_type = 'pay'
|
||||||
|
GROUP BY
|
||||||
|
composite_id
|
||||||
|
HAVING
|
||||||
|
COUNT(*) > 2) AS subquery_2
|
||||||
|
ON
|
||||||
|
subquery_1.composite_id = subquery_2.composite_id
|
||||||
|
GROUP BY
|
||||||
|
subquery_1.tenant_id,
|
||||||
|
subquery_1.user_id,
|
||||||
|
count_pay) AS subquery_top
|
||||||
|
WHERE
|
||||||
|
array_ndims(events) > 0
|
||||||
|
GROUP BY
|
||||||
|
count_pay
|
||||||
|
ORDER BY
|
||||||
|
count_pay;
|
||||||
|
ERROR: bogus varattno for OUTER_VAR var: 3
|
||||||
|
-- Lateral join subquery pushdown
|
||||||
|
-- set subquery_pushdown due to limit in the query
|
||||||
|
SET citus.subquery_pushdown to ON;
|
||||||
|
EXPLAIN (COSTS OFF)
|
||||||
|
SELECT
|
||||||
|
tenant_id,
|
||||||
|
user_id,
|
||||||
|
user_lastseen,
|
||||||
|
event_array
|
||||||
|
FROM
|
||||||
|
(SELECT
|
||||||
|
tenant_id,
|
||||||
|
user_id,
|
||||||
|
max(lastseen) as user_lastseen,
|
||||||
|
array_agg(event_type ORDER BY event_time) AS event_array
|
||||||
|
FROM
|
||||||
|
(SELECT
|
||||||
|
(composite_id).tenant_id,
|
||||||
|
(composite_id).user_id,
|
||||||
|
composite_id,
|
||||||
|
lastseen
|
||||||
|
FROM
|
||||||
|
users
|
||||||
|
WHERE
|
||||||
|
composite_id >= '(1, -9223372036854775808)'::user_composite_type AND
|
||||||
|
composite_id <= '(1, 9223372036854775807)'::user_composite_type
|
||||||
|
ORDER BY
|
||||||
|
lastseen DESC
|
||||||
|
LIMIT
|
||||||
|
10
|
||||||
|
) AS subquery_top
|
||||||
|
LEFT JOIN LATERAL
|
||||||
|
(SELECT
|
||||||
|
event_type,
|
||||||
|
event_time
|
||||||
|
FROM
|
||||||
|
events
|
||||||
|
WHERE
|
||||||
|
(composite_id) = subquery_top.composite_id
|
||||||
|
ORDER BY
|
||||||
|
event_time DESC
|
||||||
|
LIMIT
|
||||||
|
99) AS subquery_lateral
|
||||||
|
ON
|
||||||
|
true
|
||||||
|
GROUP BY
|
||||||
|
tenant_id,
|
||||||
|
user_id
|
||||||
|
) AS shard_union
|
||||||
|
ORDER BY
|
||||||
|
user_lastseen DESC
|
||||||
|
LIMIT
|
||||||
|
10;
|
||||||
|
Limit
|
||||||
|
-> Sort
|
||||||
|
Sort Key: remote_scan.user_lastseen DESC
|
||||||
|
-> Custom Scan (Citus Real-Time)
|
||||||
|
Task Count: 4
|
||||||
|
Tasks Shown: One of 4
|
||||||
|
-> Task
|
||||||
|
Node: host=localhost port=57637 dbname=regression
|
||||||
|
-> Limit
|
||||||
|
-> Sort
|
||||||
|
Sort Key: (max(lastseen)) DESC
|
||||||
|
-> GroupAggregate
|
||||||
|
Group Key: ((composite_id).tenant_id), ((composite_id).user_id)
|
||||||
|
-> Sort
|
||||||
|
Sort Key: ((composite_id).tenant_id), ((composite_id).user_id)
|
||||||
|
-> Nested Loop Left Join
|
||||||
|
-> Limit
|
||||||
|
-> Sort
|
||||||
|
Sort Key: lastseen DESC
|
||||||
|
-> Result
|
||||||
|
One-Time Filter: false
|
||||||
|
-> Limit
|
||||||
|
-> Sort
|
||||||
|
Sort Key: events.event_time DESC
|
||||||
|
-> Seq Scan on events_1400027 events
|
||||||
|
Filter: (composite_id = composite_id)
|
||||||
-- Test all tasks output
|
-- Test all tasks output
|
||||||
SET citus.explain_all_tasks TO on;
|
SET citus.explain_all_tasks TO on;
|
||||||
EXPLAIN (COSTS FALSE)
|
EXPLAIN (COSTS FALSE)
|
||||||
|
|
|
@ -0,0 +1,609 @@
|
||||||
|
--
|
||||||
|
-- MULTI_SUBQUERY
|
||||||
|
--
|
||||||
|
-- no need to set shardid sequence given that we're not creating any shards
|
||||||
|
SET citus.enable_router_execution TO FALSE;
|
||||||
|
-- Check that we error out if shard min/max values are not exactly same.
|
||||||
|
SELECT
|
||||||
|
avg(unit_price)
|
||||||
|
FROM
|
||||||
|
(SELECT
|
||||||
|
l_orderkey,
|
||||||
|
avg(o_totalprice) AS unit_price
|
||||||
|
FROM
|
||||||
|
lineitem_subquery,
|
||||||
|
orders_subquery
|
||||||
|
WHERE
|
||||||
|
l_orderkey = o_orderkey
|
||||||
|
GROUP BY
|
||||||
|
l_orderkey) AS unit_prices;
|
||||||
|
ERROR: cannot push down this subquery
|
||||||
|
DETAIL: Shards of relations in subquery need to have 1-to-1 shard partitioning
|
||||||
|
-- Update metadata in order to make all shards equal
|
||||||
|
-- note that the table is created on multi_insert_select_create_table.sql
|
||||||
|
UPDATE
|
||||||
|
pg_dist_shard
|
||||||
|
SET
|
||||||
|
shardmaxvalue = '14947'
|
||||||
|
WHERE
|
||||||
|
shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'orders_subquery'::regclass ORDER BY shardid DESC LIMIT 1);
|
||||||
|
-- If group by is not on partition column then we error out from single table
|
||||||
|
-- repartition code path
|
||||||
|
SELECT
|
||||||
|
avg(order_count)
|
||||||
|
FROM
|
||||||
|
(SELECT
|
||||||
|
l_suppkey,
|
||||||
|
count(*) AS order_count
|
||||||
|
FROM
|
||||||
|
lineitem_subquery
|
||||||
|
GROUP BY
|
||||||
|
l_suppkey) AS order_counts;
|
||||||
|
ERROR: cannot use real time executor with repartition jobs
|
||||||
|
HINT: Set citus.task_executor_type to "task-tracker".
|
||||||
|
-- Check that we error out if join is not on partition columns.
|
||||||
|
SELECT
|
||||||
|
avg(unit_price)
|
||||||
|
FROM
|
||||||
|
(SELECT
|
||||||
|
l_orderkey,
|
||||||
|
avg(o_totalprice / l_quantity) AS unit_price
|
||||||
|
FROM
|
||||||
|
lineitem_subquery,
|
||||||
|
orders_subquery
|
||||||
|
GROUP BY
|
||||||
|
l_orderkey) AS unit_prices;
|
||||||
|
ERROR: cannot pushdown the subquery since all relations are not joined using distribution keys
|
||||||
|
DETAIL: Each relation should be joined with at least one another relation using distribution keys and equality operator.
|
||||||
|
SELECT
|
||||||
|
avg(unit_price)
|
||||||
|
FROM
|
||||||
|
(SELECT
|
||||||
|
l_orderkey,
|
||||||
|
avg(o_totalprice / l_quantity) AS unit_price
|
||||||
|
FROM
|
||||||
|
lineitem_subquery,
|
||||||
|
orders_subquery
|
||||||
|
WHERE
|
||||||
|
l_orderkey = o_custkey
|
||||||
|
GROUP BY
|
||||||
|
l_orderkey) AS unit_prices;
|
||||||
|
ERROR: cannot pushdown the subquery since all relations are not joined using distribution keys
|
||||||
|
DETAIL: Each relation should be joined with at least one another relation using distribution keys and equality operator.
|
||||||
|
-- Check that we error out if there is non relation subqueries
|
||||||
|
SELECT count(*) FROM
|
||||||
|
(
|
||||||
|
(SELECT l_orderkey FROM lineitem_subquery) UNION ALL
|
||||||
|
(SELECT 1::bigint)
|
||||||
|
) b;
|
||||||
|
ERROR: cannot push down this subquery
|
||||||
|
DETAIL: Subqueries without relations are unsupported
|
||||||
|
-- Check that we error out if queries in union do not include partition columns.
|
||||||
|
SELECT count(*) FROM
|
||||||
|
(
|
||||||
|
(SELECT l_orderkey FROM lineitem_subquery) UNION
|
||||||
|
(SELECT l_partkey FROM lineitem_subquery)
|
||||||
|
) b;
|
||||||
|
ERROR: cannot pushdown the subquery since all leaves of the UNION does not include partition key at the same position
|
||||||
|
DETAIL: Each leaf query of the UNION should return partition key at the same position on its target list.
|
||||||
|
-- Check that we run union queries if partition column is selected.
|
||||||
|
SELECT count(*) FROM
|
||||||
|
(
|
||||||
|
(SELECT l_orderkey FROM lineitem_subquery) UNION
|
||||||
|
(SELECT l_orderkey FROM lineitem_subquery)
|
||||||
|
) b;
|
||||||
|
count
|
||||||
|
-------
|
||||||
|
2985
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- Check that we error out if inner query has Limit but subquery_pushdown is not set
|
||||||
|
SELECT
|
||||||
|
avg(o_totalprice/l_quantity)
|
||||||
|
FROM
|
||||||
|
(SELECT
|
||||||
|
l_orderkey,
|
||||||
|
l_quantity
|
||||||
|
FROM
|
||||||
|
lineitem_subquery
|
||||||
|
ORDER BY
|
||||||
|
l_quantity
|
||||||
|
LIMIT 10
|
||||||
|
) lineitem_quantities
|
||||||
|
JOIN LATERAL
|
||||||
|
(SELECT
|
||||||
|
o_totalprice
|
||||||
|
FROM
|
||||||
|
orders_subquery
|
||||||
|
WHERE
|
||||||
|
lineitem_quantities.l_orderkey = o_orderkey) orders_price ON true;
|
||||||
|
ERROR: cannot push down this subquery
|
||||||
|
DETAIL: Limit in subquery is currently unsupported
|
||||||
|
-- Limit is only supported when subquery_pushdown is set
|
||||||
|
-- Check that we error out if inner query has limit but outer query has not.
|
||||||
|
SET citus.subquery_pushdown to ON;
|
||||||
|
SELECT
|
||||||
|
avg(o_totalprice/l_quantity)
|
||||||
|
FROM
|
||||||
|
(SELECT
|
||||||
|
l_orderkey,
|
||||||
|
l_quantity
|
||||||
|
FROM
|
||||||
|
lineitem_subquery
|
||||||
|
ORDER BY
|
||||||
|
l_quantity
|
||||||
|
LIMIT 10
|
||||||
|
) lineitem_quantities
|
||||||
|
JOIN LATERAL
|
||||||
|
(SELECT
|
||||||
|
o_totalprice
|
||||||
|
FROM
|
||||||
|
orders_subquery
|
||||||
|
WHERE
|
||||||
|
lineitem_quantities.l_orderkey = o_orderkey) orders_price ON true;
|
||||||
|
ERROR: cannot push down this subquery
|
||||||
|
DETAIL: Limit in subquery without limit in the outermost query is unsupported
|
||||||
|
-- reset the flag for next query
|
||||||
|
SET citus.subquery_pushdown to OFF;
|
||||||
|
-- Check that we error out if the outermost query is a distinct clause.
|
||||||
|
SELECT
|
||||||
|
count(DISTINCT a)
|
||||||
|
FROM (
|
||||||
|
SELECT
|
||||||
|
count(*) a
|
||||||
|
FROM
|
||||||
|
lineitem_subquery
|
||||||
|
GROUP BY
|
||||||
|
l_orderkey
|
||||||
|
) z;
|
||||||
|
ERROR: cannot push down this subquery
|
||||||
|
DETAIL: distinct in the outermost query is unsupported
|
||||||
|
-- Check supported subquery types.
|
||||||
|
SELECT
|
||||||
|
o_custkey,
|
||||||
|
sum(order_count) as total_order_count
|
||||||
|
FROM
|
||||||
|
(SELECT
|
||||||
|
o_orderkey,
|
||||||
|
o_custkey,
|
||||||
|
count(*) AS order_count
|
||||||
|
FROM
|
||||||
|
orders_subquery
|
||||||
|
WHERE
|
||||||
|
o_orderkey > 0 AND
|
||||||
|
o_orderkey < 12000
|
||||||
|
GROUP BY
|
||||||
|
o_orderkey, o_custkey) AS order_counts
|
||||||
|
GROUP BY
|
||||||
|
o_custkey
|
||||||
|
ORDER BY
|
||||||
|
total_order_count DESC,
|
||||||
|
o_custkey ASC
|
||||||
|
LIMIT 10;
|
||||||
|
o_custkey | total_order_count
|
||||||
|
-----------+-------------------
|
||||||
|
1462 | 9
|
||||||
|
619 | 8
|
||||||
|
643 | 8
|
||||||
|
1030 | 8
|
||||||
|
1486 | 8
|
||||||
|
79 | 7
|
||||||
|
304 | 7
|
||||||
|
319 | 7
|
||||||
|
343 | 7
|
||||||
|
448 | 7
|
||||||
|
(10 rows)
|
||||||
|
|
||||||
|
SELECT
|
||||||
|
avg(unit_price)
|
||||||
|
FROM
|
||||||
|
(SELECT
|
||||||
|
l_orderkey,
|
||||||
|
avg(o_totalprice / l_quantity) AS unit_price
|
||||||
|
FROM
|
||||||
|
lineitem_subquery,
|
||||||
|
orders_subquery
|
||||||
|
WHERE
|
||||||
|
l_orderkey = o_orderkey
|
||||||
|
GROUP BY
|
||||||
|
l_orderkey) AS unit_prices
|
||||||
|
WHERE
|
||||||
|
unit_price > 1000 AND
|
||||||
|
unit_price < 10000;
|
||||||
|
avg
|
||||||
|
-----------------------
|
||||||
|
4968.2889885208475549
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- Check that if subquery is pulled, we don't error and run query properly.
|
||||||
|
SELECT count(*) FROM
|
||||||
|
(
|
||||||
|
SELECT l_orderkey FROM (
|
||||||
|
(SELECT l_orderkey FROM lineitem_subquery) UNION
|
||||||
|
(SELECT l_orderkey FROM lineitem_subquery)
|
||||||
|
) a
|
||||||
|
WHERE l_orderkey = 1
|
||||||
|
) b;
|
||||||
|
count
|
||||||
|
-------
|
||||||
|
1
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT count(*) FROM
|
||||||
|
(
|
||||||
|
SELECT * FROM (
|
||||||
|
(SELECT * FROM lineitem_subquery) UNION
|
||||||
|
(SELECT * FROM lineitem_subquery)
|
||||||
|
) a
|
||||||
|
WHERE l_orderkey = 1
|
||||||
|
) b;
|
||||||
|
count
|
||||||
|
-------
|
||||||
|
6
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT max(l_orderkey) FROM
|
||||||
|
(
|
||||||
|
SELECT l_orderkey FROM (
|
||||||
|
SELECT
|
||||||
|
l_orderkey
|
||||||
|
FROM
|
||||||
|
lineitem_subquery
|
||||||
|
WHERE
|
||||||
|
l_orderkey < 20000
|
||||||
|
GROUP BY
|
||||||
|
l_orderkey
|
||||||
|
) z
|
||||||
|
) y;
|
||||||
|
max
|
||||||
|
-------
|
||||||
|
14947
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- Add one more shard to one relation, then test if we error out because of different
|
||||||
|
-- shard counts for joining relations.
|
||||||
|
SELECT master_create_empty_shard('orders_subquery') AS new_shard_id
|
||||||
|
\gset
|
||||||
|
UPDATE pg_dist_shard SET shardminvalue = 15000, shardmaxvalue = 20000
|
||||||
|
WHERE shardid = :new_shard_id;
|
||||||
|
SELECT
|
||||||
|
avg(unit_price)
|
||||||
|
FROM
|
||||||
|
(SELECT
|
||||||
|
l_orderkey,
|
||||||
|
avg(o_totalprice / l_quantity) AS unit_price
|
||||||
|
FROM
|
||||||
|
lineitem_subquery,
|
||||||
|
orders_subquery
|
||||||
|
WHERE
|
||||||
|
l_orderkey = o_orderkey
|
||||||
|
GROUP BY
|
||||||
|
l_orderkey) AS unit_prices;
|
||||||
|
ERROR: cannot push down this subquery
|
||||||
|
DETAIL: Shards of relations in subquery need to have 1-to-1 shard partitioning
|
||||||
|
-- Check that we can prune shards in subqueries with VARCHAR partition columns
|
||||||
|
CREATE TABLE subquery_pruning_varchar_test_table
|
||||||
|
(
|
||||||
|
a varchar,
|
||||||
|
b int
|
||||||
|
);
|
||||||
|
SELECT master_create_distributed_table('subquery_pruning_varchar_test_table', 'a', 'hash');
|
||||||
|
master_create_distributed_table
|
||||||
|
---------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT master_create_worker_shards('subquery_pruning_varchar_test_table', 4, 1);
|
||||||
|
master_create_worker_shards
|
||||||
|
-----------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SET client_min_messages TO DEBUG2;
|
||||||
|
SELECT * FROM
|
||||||
|
(SELECT count(*) FROM subquery_pruning_varchar_test_table WHERE a = 'onder' GROUP BY a)
|
||||||
|
AS foo;
|
||||||
|
DEBUG: Skipping the target shard interval 570033 because SELECT query is pruned away for the interval
|
||||||
|
DEBUG: Skipping the target shard interval 570034 because SELECT query is pruned away for the interval
|
||||||
|
DEBUG: Skipping the target shard interval 570036 because SELECT query is pruned away for the interval
|
||||||
|
count
|
||||||
|
-------
|
||||||
|
(0 rows)
|
||||||
|
|
||||||
|
SELECT * FROM
|
||||||
|
(SELECT count(*) FROM subquery_pruning_varchar_test_table WHERE 'eren' = a GROUP BY a)
|
||||||
|
AS foo;
|
||||||
|
DEBUG: Skipping the target shard interval 570033 because SELECT query is pruned away for the interval
|
||||||
|
DEBUG: Skipping the target shard interval 570035 because SELECT query is pruned away for the interval
|
||||||
|
DEBUG: Skipping the target shard interval 570036 because SELECT query is pruned away for the interval
|
||||||
|
count
|
||||||
|
-------
|
||||||
|
(0 rows)
|
||||||
|
|
||||||
|
SET client_min_messages TO NOTICE;
|
||||||
|
-- test subquery join on VARCHAR partition column
|
||||||
|
SELECT * FROM
|
||||||
|
(SELECT
|
||||||
|
a_inner AS a
|
||||||
|
FROM
|
||||||
|
(SELECT
|
||||||
|
subquery_pruning_varchar_test_table.a AS a_inner
|
||||||
|
FROM
|
||||||
|
subquery_pruning_varchar_test_table
|
||||||
|
GROUP BY
|
||||||
|
subquery_pruning_varchar_test_table.a
|
||||||
|
HAVING
|
||||||
|
count(subquery_pruning_varchar_test_table.a) < 3)
|
||||||
|
AS f1,
|
||||||
|
(SELECT
|
||||||
|
subquery_pruning_varchar_test_table.a
|
||||||
|
FROM
|
||||||
|
subquery_pruning_varchar_test_table
|
||||||
|
GROUP BY
|
||||||
|
subquery_pruning_varchar_test_table.a
|
||||||
|
HAVING
|
||||||
|
sum(coalesce(subquery_pruning_varchar_test_table.b,0)) > 20.0)
|
||||||
|
AS f2
|
||||||
|
WHERE
|
||||||
|
f1.a_inner = f2.a
|
||||||
|
GROUP BY
|
||||||
|
a_inner)
|
||||||
|
AS foo;
|
||||||
|
a
|
||||||
|
---
|
||||||
|
(0 rows)
|
||||||
|
|
||||||
|
DROP TABLE subquery_pruning_varchar_test_table;
|
||||||
|
-- Simple join subquery pushdown
|
||||||
|
SELECT
|
||||||
|
avg(array_length(events, 1)) AS event_average
|
||||||
|
FROM
|
||||||
|
(SELECT
|
||||||
|
tenant_id,
|
||||||
|
user_id,
|
||||||
|
array_agg(event_type ORDER BY event_time) AS events
|
||||||
|
FROM
|
||||||
|
(SELECT
|
||||||
|
(users.composite_id).tenant_id,
|
||||||
|
(users.composite_id).user_id,
|
||||||
|
event_type,
|
||||||
|
events.event_time
|
||||||
|
FROM
|
||||||
|
users,
|
||||||
|
events
|
||||||
|
WHERE
|
||||||
|
(users.composite_id) = (events.composite_id) AND
|
||||||
|
users.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND
|
||||||
|
users.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND
|
||||||
|
event_type IN ('click', 'submit', 'pay')) AS subquery
|
||||||
|
GROUP BY
|
||||||
|
tenant_id,
|
||||||
|
user_id) AS subquery;
|
||||||
|
event_average
|
||||||
|
--------------------
|
||||||
|
3.6666666666666667
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- Union and left join subquery pushdown
|
||||||
|
SELECT
|
||||||
|
avg(array_length(events, 1)) AS event_average,
|
||||||
|
hasdone
|
||||||
|
FROM
|
||||||
|
(SELECT
|
||||||
|
subquery_1.tenant_id,
|
||||||
|
subquery_1.user_id,
|
||||||
|
array_agg(event ORDER BY event_time) AS events,
|
||||||
|
COALESCE(hasdone, 'Has not done paying') AS hasdone
|
||||||
|
FROM
|
||||||
|
(
|
||||||
|
(SELECT
|
||||||
|
(users.composite_id).tenant_id,
|
||||||
|
(users.composite_id).user_id,
|
||||||
|
(users.composite_id) as composite_id,
|
||||||
|
'action=>1'AS event,
|
||||||
|
events.event_time
|
||||||
|
FROM
|
||||||
|
users,
|
||||||
|
events
|
||||||
|
WHERE
|
||||||
|
(users.composite_id) = (events.composite_id) AND
|
||||||
|
users.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND
|
||||||
|
users.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND
|
||||||
|
event_type = 'click')
|
||||||
|
UNION
|
||||||
|
(SELECT
|
||||||
|
(users.composite_id).tenant_id,
|
||||||
|
(users.composite_id).user_id,
|
||||||
|
(users.composite_id) as composite_id,
|
||||||
|
'action=>2'AS event,
|
||||||
|
events.event_time
|
||||||
|
FROM
|
||||||
|
users,
|
||||||
|
events
|
||||||
|
WHERE
|
||||||
|
(users.composite_id) = (events.composite_id) AND
|
||||||
|
users.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND
|
||||||
|
users.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND
|
||||||
|
event_type = 'submit')
|
||||||
|
) AS subquery_1
|
||||||
|
LEFT JOIN
|
||||||
|
(SELECT
|
||||||
|
DISTINCT ON ((composite_id).tenant_id, (composite_id).user_id) composite_id,
|
||||||
|
(composite_id).tenant_id,
|
||||||
|
(composite_id).user_id,
|
||||||
|
'Has done paying'::TEXT AS hasdone
|
||||||
|
FROM
|
||||||
|
events
|
||||||
|
WHERE
|
||||||
|
events.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND
|
||||||
|
events.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND
|
||||||
|
event_type = 'pay') AS subquery_2
|
||||||
|
ON
|
||||||
|
subquery_1.composite_id = subquery_2.composite_id
|
||||||
|
GROUP BY
|
||||||
|
subquery_1.tenant_id,
|
||||||
|
subquery_1.user_id,
|
||||||
|
hasdone) AS subquery_top
|
||||||
|
GROUP BY
|
||||||
|
hasdone;
|
||||||
|
event_average | hasdone
|
||||||
|
--------------------+---------------------
|
||||||
|
4.0000000000000000 | Has not done paying
|
||||||
|
2.5000000000000000 | Has done paying
|
||||||
|
(2 rows)
|
||||||
|
|
||||||
|
-- Union, left join and having subquery pushdown
|
||||||
|
SELECT
|
||||||
|
avg(array_length(events, 1)) AS event_average,
|
||||||
|
count_pay
|
||||||
|
FROM (
|
||||||
|
SELECT
|
||||||
|
subquery_1.tenant_id,
|
||||||
|
subquery_1.user_id,
|
||||||
|
array_agg(event ORDER BY event_time) AS events,
|
||||||
|
COALESCE(count_pay, 0) AS count_pay
|
||||||
|
FROM
|
||||||
|
(
|
||||||
|
(SELECT
|
||||||
|
(users.composite_id).tenant_id,
|
||||||
|
(users.composite_id).user_id,
|
||||||
|
(users.composite_id),
|
||||||
|
'action=>1'AS event,
|
||||||
|
events.event_time
|
||||||
|
FROM
|
||||||
|
users,
|
||||||
|
events
|
||||||
|
WHERE
|
||||||
|
(users.composite_id) = (events.composite_id) AND
|
||||||
|
users.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND
|
||||||
|
users.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND
|
||||||
|
event_type = 'click')
|
||||||
|
UNION
|
||||||
|
(SELECT
|
||||||
|
(users.composite_id).tenant_id,
|
||||||
|
(users.composite_id).user_id,
|
||||||
|
(users.composite_id),
|
||||||
|
'action=>2'AS event,
|
||||||
|
events.event_time
|
||||||
|
FROM
|
||||||
|
users,
|
||||||
|
events
|
||||||
|
WHERE
|
||||||
|
(users.composite_id) = (events.composite_id) AND
|
||||||
|
users.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND
|
||||||
|
users.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND
|
||||||
|
event_type = 'submit')
|
||||||
|
) AS subquery_1
|
||||||
|
LEFT JOIN
|
||||||
|
(SELECT
|
||||||
|
(composite_id).tenant_id,
|
||||||
|
(composite_id).user_id,
|
||||||
|
composite_id,
|
||||||
|
COUNT(*) AS count_pay
|
||||||
|
FROM
|
||||||
|
events
|
||||||
|
WHERE
|
||||||
|
events.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND
|
||||||
|
events.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND
|
||||||
|
event_type = 'pay'
|
||||||
|
GROUP BY
|
||||||
|
composite_id
|
||||||
|
HAVING
|
||||||
|
COUNT(*) > 2) AS subquery_2
|
||||||
|
ON
|
||||||
|
subquery_1.composite_id = subquery_2.composite_id
|
||||||
|
GROUP BY
|
||||||
|
subquery_1.tenant_id,
|
||||||
|
subquery_1.user_id,
|
||||||
|
count_pay) AS subquery_top
|
||||||
|
WHERE
|
||||||
|
array_ndims(events) > 0
|
||||||
|
GROUP BY
|
||||||
|
count_pay
|
||||||
|
ORDER BY
|
||||||
|
count_pay;
|
||||||
|
event_average | count_pay
|
||||||
|
--------------------+-----------
|
||||||
|
3.0000000000000000 | 0
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
|
||||||
|
-- Lateral join subquery pushdown
|
||||||
|
-- set subquery_pushdown since there is limit in the query
|
||||||
|
SET citus.subquery_pushdown to ON;
|
||||||
|
SELECT
|
||||||
|
tenant_id,
|
||||||
|
user_id,
|
||||||
|
user_lastseen,
|
||||||
|
event_array
|
||||||
|
FROM
|
||||||
|
(SELECT
|
||||||
|
tenant_id,
|
||||||
|
user_id,
|
||||||
|
max(lastseen) as user_lastseen,
|
||||||
|
array_agg(event_type ORDER BY event_time) AS event_array
|
||||||
|
FROM
|
||||||
|
(SELECT
|
||||||
|
(composite_id).tenant_id,
|
||||||
|
(composite_id).user_id,
|
||||||
|
composite_id,
|
||||||
|
lastseen
|
||||||
|
FROM
|
||||||
|
users
|
||||||
|
WHERE
|
||||||
|
composite_id >= '(1, -9223372036854775808)'::user_composite_type AND
|
||||||
|
composite_id <= '(1, 9223372036854775807)'::user_composite_type
|
||||||
|
ORDER BY
|
||||||
|
lastseen DESC
|
||||||
|
LIMIT
|
||||||
|
10
|
||||||
|
) AS subquery_top
|
||||||
|
LEFT JOIN LATERAL
|
||||||
|
(SELECT
|
||||||
|
event_type,
|
||||||
|
event_time
|
||||||
|
FROM
|
||||||
|
events
|
||||||
|
WHERE
|
||||||
|
(composite_id) = subquery_top.composite_id
|
||||||
|
ORDER BY
|
||||||
|
event_time DESC
|
||||||
|
LIMIT
|
||||||
|
99) AS subquery_lateral
|
||||||
|
ON
|
||||||
|
true
|
||||||
|
GROUP BY
|
||||||
|
tenant_id,
|
||||||
|
user_id
|
||||||
|
) AS shard_union
|
||||||
|
ORDER BY
|
||||||
|
user_lastseen DESC
|
||||||
|
LIMIT
|
||||||
|
10;
|
||||||
|
tenant_id | user_id | user_lastseen | event_array
|
||||||
|
-----------+---------+---------------+----------------------------
|
||||||
|
1 | 1003 | 1472807315 | {click,click,click,submit}
|
||||||
|
1 | 1002 | 1472807215 | {click,click,submit,pay}
|
||||||
|
1 | 1001 | 1472807115 | {click,submit,pay}
|
||||||
|
(3 rows)
|
||||||
|
|
||||||
|
-- cleanup the tables and the type & functions
|
||||||
|
-- also set the min messages to WARNING to skip
|
||||||
|
-- CASCADE NOTICE messages
|
||||||
|
SET client_min_messages TO WARNING;
|
||||||
|
DROP TABLE users, events;
|
||||||
|
SELECT run_command_on_master_and_workers($f$
|
||||||
|
|
||||||
|
DROP TYPE user_composite_type CASCADE;
|
||||||
|
|
||||||
|
$f$);
|
||||||
|
run_command_on_master_and_workers
|
||||||
|
-----------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- created in multi_behavioral_analytics_create_table
|
||||||
|
DROP FUNCTION run_command_on_master_and_workers(p_sql text);
|
||||||
|
SET client_min_messages TO DEFAULT;
|
||||||
|
SET citus.subquery_pushdown to OFF;
|
||||||
|
SET citus.enable_router_execution TO 'true';
|
|
@ -41,3 +41,323 @@ CREATE INDEX is_index4 ON events_table(event_type);
|
||||||
CREATE INDEX is_index5 ON users_table(value_2);
|
CREATE INDEX is_index5 ON users_table(value_2);
|
||||||
CREATE INDEX is_index6 ON events_table(value_2);
|
CREATE INDEX is_index6 ON events_table(value_2);
|
||||||
|
|
||||||
|
-- create a helper function to create types/functions on each node
|
||||||
|
CREATE FUNCTION run_command_on_master_and_workers(p_sql text)
|
||||||
|
RETURNS void LANGUAGE plpgsql AS $$
|
||||||
|
BEGIN
|
||||||
|
EXECUTE p_sql;
|
||||||
|
PERFORM run_command_on_workers(p_sql);
|
||||||
|
END;$$;
|
||||||
|
|
||||||
|
-- Create composite type to use in subquery pushdown
|
||||||
|
SELECT run_command_on_master_and_workers($f$
|
||||||
|
|
||||||
|
CREATE TYPE user_composite_type AS
|
||||||
|
(
|
||||||
|
tenant_id BIGINT,
|
||||||
|
user_id BIGINT
|
||||||
|
);
|
||||||
|
$f$);
|
||||||
|
|
||||||
|
SELECT run_command_on_master_and_workers($f$
|
||||||
|
|
||||||
|
CREATE FUNCTION cmp_user_composite_type_function(user_composite_type, user_composite_type) RETURNS int
|
||||||
|
LANGUAGE 'internal'
|
||||||
|
AS 'btrecordcmp'
|
||||||
|
IMMUTABLE
|
||||||
|
RETURNS NULL ON NULL INPUT;
|
||||||
|
$f$);
|
||||||
|
|
||||||
|
SELECT run_command_on_master_and_workers($f$
|
||||||
|
|
||||||
|
CREATE FUNCTION gt_user_composite_type_function(user_composite_type, user_composite_type) RETURNS boolean
|
||||||
|
LANGUAGE 'internal'
|
||||||
|
AS 'record_gt'
|
||||||
|
IMMUTABLE
|
||||||
|
RETURNS NULL ON NULL INPUT;
|
||||||
|
$f$);
|
||||||
|
|
||||||
|
SELECT run_command_on_master_and_workers($f$
|
||||||
|
|
||||||
|
CREATE FUNCTION ge_user_composite_type_function(user_composite_type, user_composite_type) RETURNS boolean
|
||||||
|
LANGUAGE 'internal'
|
||||||
|
AS 'record_ge'
|
||||||
|
IMMUTABLE
|
||||||
|
RETURNS NULL ON NULL INPUT;
|
||||||
|
$f$);
|
||||||
|
|
||||||
|
SELECT run_command_on_master_and_workers($f$
|
||||||
|
|
||||||
|
CREATE FUNCTION equal_user_composite_type_function(user_composite_type, user_composite_type) RETURNS boolean
|
||||||
|
LANGUAGE 'internal'
|
||||||
|
AS 'record_eq'
|
||||||
|
IMMUTABLE;
|
||||||
|
$f$);
|
||||||
|
|
||||||
|
SELECT run_command_on_master_and_workers($f$
|
||||||
|
|
||||||
|
CREATE FUNCTION lt_user_composite_type_function(user_composite_type, user_composite_type) RETURNS boolean
|
||||||
|
LANGUAGE 'internal'
|
||||||
|
AS 'record_lt'
|
||||||
|
IMMUTABLE
|
||||||
|
RETURNS NULL ON NULL INPUT;
|
||||||
|
$f$);
|
||||||
|
|
||||||
|
-- "<=" has to be backed by record_le; binding it to record_lt would make
-- "a <= b" return false when a and b are equal
SELECT run_command_on_master_and_workers($f$

	CREATE FUNCTION le_user_composite_type_function(user_composite_type, user_composite_type) RETURNS boolean
	LANGUAGE 'internal'
	AS 'record_le'
	IMMUTABLE
	RETURNS NULL ON NULL INPUT;
$f$);
|
||||||
|
|
||||||
|
SELECT run_command_on_master_and_workers($f$
|
||||||
|
|
||||||
|
CREATE OPERATOR > (
|
||||||
|
LEFTARG = user_composite_type,
|
||||||
|
RIGHTARG = user_composite_type,
|
||||||
|
PROCEDURE = gt_user_composite_type_function
|
||||||
|
);
|
||||||
|
$f$);
|
||||||
|
|
||||||
|
SELECT run_command_on_master_and_workers($f$
|
||||||
|
|
||||||
|
CREATE OPERATOR >= (
|
||||||
|
LEFTARG = user_composite_type,
|
||||||
|
RIGHTARG = user_composite_type,
|
||||||
|
PROCEDURE = ge_user_composite_type_function
|
||||||
|
);
|
||||||
|
$f$);
|
||||||
|
|
||||||
|
-- ... use that function to create a custom equality operator...
|
||||||
|
SELECT run_command_on_master_and_workers($f$
|
||||||
|
|
||||||
|
-- ... use that function to create a custom equality operator...
|
||||||
|
CREATE OPERATOR = (
|
||||||
|
LEFTARG = user_composite_type,
|
||||||
|
RIGHTARG = user_composite_type,
|
||||||
|
PROCEDURE = equal_user_composite_type_function,
|
||||||
|
commutator = =,
|
||||||
|
RESTRICT = eqsel,
|
||||||
|
JOIN = eqjoinsel,
|
||||||
|
merges,
|
||||||
|
hashes
|
||||||
|
);
|
||||||
|
$f$);
|
||||||
|
|
||||||
|
SELECT run_command_on_master_and_workers($f$
|
||||||
|
|
||||||
|
CREATE OPERATOR <= (
|
||||||
|
LEFTARG = user_composite_type,
|
||||||
|
RIGHTARG = user_composite_type,
|
||||||
|
PROCEDURE = le_user_composite_type_function
|
||||||
|
);
|
||||||
|
$f$);
|
||||||
|
|
||||||
|
SELECT run_command_on_master_and_workers($f$
|
||||||
|
|
||||||
|
CREATE OPERATOR < (
|
||||||
|
LEFTARG = user_composite_type,
|
||||||
|
RIGHTARG = user_composite_type,
|
||||||
|
PROCEDURE = lt_user_composite_type_function
|
||||||
|
);
|
||||||
|
$f$);
|
||||||
|
|
||||||
|
|
||||||
|
-- ... and create a custom operator family for hash indexes...
|
||||||
|
SELECT run_command_on_master_and_workers($f$
|
||||||
|
|
||||||
|
CREATE OPERATOR FAMILY cats_2_op_fam USING hash;
|
||||||
|
$f$);
|
||||||
|
|
||||||
|
|
||||||
|
-- ... create a test HASH function. Though it is a poor hash function,
|
||||||
|
-- it is acceptable for our tests
|
||||||
|
SELECT run_command_on_master_and_workers($f$
|
||||||
|
|
||||||
|
CREATE FUNCTION test_composite_type_hash(user_composite_type) RETURNS int
|
||||||
|
AS 'SELECT hashtext( ($1.tenant_id + $1.tenant_id)::text);'
|
||||||
|
LANGUAGE SQL
|
||||||
|
IMMUTABLE
|
||||||
|
RETURNS NULL ON NULL INPUT;
|
||||||
|
$f$);
|
||||||
|
|
||||||
|
|
||||||
|
-- We need to define two different operator classes for the composite types
|
||||||
|
-- One uses BTREE the other uses HASH
|
||||||
|
-- btree strategy numbers are fixed by the access method:
-- 1 is <, 2 is <=, 3 is =, 4 is >= and 5 is >
SELECT run_command_on_master_and_workers($f$

	CREATE OPERATOR CLASS cats_2_op_fam_clas3
	DEFAULT FOR TYPE user_composite_type USING BTREE AS
	OPERATOR 1 < (user_composite_type, user_composite_type),
	OPERATOR 2 <= (user_composite_type, user_composite_type),
	OPERATOR 3 = (user_composite_type, user_composite_type),
	OPERATOR 4 >= (user_composite_type, user_composite_type),
	OPERATOR 5 > (user_composite_type, user_composite_type),

	FUNCTION 1 cmp_user_composite_type_function(user_composite_type, user_composite_type);
$f$);
|
||||||
|
|
||||||
|
SELECT run_command_on_master_and_workers($f$
|
||||||
|
|
||||||
|
CREATE OPERATOR CLASS cats_2_op_fam_class
|
||||||
|
DEFAULT FOR TYPE user_composite_type USING HASH AS
|
||||||
|
OPERATOR 1 = (user_composite_type, user_composite_type),
|
||||||
|
FUNCTION 1 test_composite_type_hash(user_composite_type);
|
||||||
|
$f$);
|
||||||
|
|
||||||
|
CREATE TABLE events (
|
||||||
|
composite_id user_composite_type,
|
||||||
|
event_id bigint,
|
||||||
|
event_type character varying(255),
|
||||||
|
event_time bigint
|
||||||
|
);
|
||||||
|
SELECT master_create_distributed_table('events', 'composite_id', 'range');
|
||||||
|
|
||||||
|
SELECT master_create_empty_shard('events') AS new_shard_id
|
||||||
|
\gset
|
||||||
|
UPDATE pg_dist_shard SET shardminvalue = '(1,1)', shardmaxvalue = '(1,2000000000)'
|
||||||
|
WHERE shardid = :new_shard_id;
|
||||||
|
|
||||||
|
SELECT master_create_empty_shard('events') AS new_shard_id
|
||||||
|
\gset
|
||||||
|
UPDATE pg_dist_shard SET shardminvalue = '(1,2000000001)', shardmaxvalue = '(1,4300000000)'
|
||||||
|
WHERE shardid = :new_shard_id;
|
||||||
|
|
||||||
|
SELECT master_create_empty_shard('events') AS new_shard_id
|
||||||
|
\gset
|
||||||
|
UPDATE pg_dist_shard SET shardminvalue = '(2,1)', shardmaxvalue = '(2,2000000000)'
|
||||||
|
WHERE shardid = :new_shard_id;
|
||||||
|
|
||||||
|
SELECT master_create_empty_shard('events') AS new_shard_id
|
||||||
|
\gset
|
||||||
|
UPDATE pg_dist_shard SET shardminvalue = '(2,2000000001)', shardmaxvalue = '(2,4300000000)'
|
||||||
|
WHERE shardid = :new_shard_id;
|
||||||
|
|
||||||
|
\COPY events FROM STDIN WITH CSV
|
||||||
|
"(1,1001)",20001,click,1472807012
|
||||||
|
"(1,1001)",20002,submit,1472807015
|
||||||
|
"(1,1001)",20003,pay,1472807020
|
||||||
|
"(1,1002)",20010,click,1472807022
|
||||||
|
"(1,1002)",20011,click,1472807023
|
||||||
|
"(1,1002)",20012,submit,1472807025
|
||||||
|
"(1,1002)",20013,pay,1472807030
|
||||||
|
"(1,1003)",20014,click,1472807032
|
||||||
|
"(1,1003)",20015,click,1472807033
|
||||||
|
"(1,1003)",20016,click,1472807034
|
||||||
|
"(1,1003)",20017,submit,1472807035
|
||||||
|
\.
|
||||||
|
|
||||||
|
CREATE TABLE users (
|
||||||
|
composite_id user_composite_type,
|
||||||
|
lastseen bigint
|
||||||
|
);
|
||||||
|
SELECT master_create_distributed_table('users', 'composite_id', 'range');
|
||||||
|
|
||||||
|
SELECT master_create_empty_shard('users') AS new_shard_id
|
||||||
|
\gset
|
||||||
|
UPDATE pg_dist_shard SET shardminvalue = '(1,1)', shardmaxvalue = '(1,2000000000)'
|
||||||
|
WHERE shardid = :new_shard_id;
|
||||||
|
|
||||||
|
SELECT master_create_empty_shard('users') AS new_shard_id
|
||||||
|
\gset
|
||||||
|
UPDATE pg_dist_shard SET shardminvalue = '(1,2000000001)', shardmaxvalue = '(1,4300000000)'
|
||||||
|
WHERE shardid = :new_shard_id;
|
||||||
|
|
||||||
|
SELECT master_create_empty_shard('users') AS new_shard_id
|
||||||
|
\gset
|
||||||
|
UPDATE pg_dist_shard SET shardminvalue = '(2,1)', shardmaxvalue = '(2,2000000000)'
|
||||||
|
WHERE shardid = :new_shard_id;
|
||||||
|
|
||||||
|
SELECT master_create_empty_shard('users') AS new_shard_id
|
||||||
|
\gset
|
||||||
|
UPDATE pg_dist_shard SET shardminvalue = '(2,2000000001)', shardmaxvalue = '(2,4300000000)'
|
||||||
|
WHERE shardid = :new_shard_id;
|
||||||
|
|
||||||
|
\COPY users FROM STDIN WITH CSV
|
||||||
|
"(1,1001)",1472807115
|
||||||
|
"(1,1002)",1472807215
|
||||||
|
"(1,1003)",1472807315
|
||||||
|
\.
|
||||||
|
|
||||||
|
-- Create tables for subquery tests
|
||||||
|
CREATE TABLE lineitem_subquery (
|
||||||
|
l_orderkey bigint not null,
|
||||||
|
l_partkey integer not null,
|
||||||
|
l_suppkey integer not null,
|
||||||
|
l_linenumber integer not null,
|
||||||
|
l_quantity decimal(15, 2) not null,
|
||||||
|
l_extendedprice decimal(15, 2) not null,
|
||||||
|
l_discount decimal(15, 2) not null,
|
||||||
|
l_tax decimal(15, 2) not null,
|
||||||
|
l_returnflag char(1) not null,
|
||||||
|
l_linestatus char(1) not null,
|
||||||
|
l_shipdate date not null,
|
||||||
|
l_commitdate date not null,
|
||||||
|
l_receiptdate date not null,
|
||||||
|
l_shipinstruct char(25) not null,
|
||||||
|
l_shipmode char(10) not null,
|
||||||
|
l_comment varchar(44) not null,
|
||||||
|
PRIMARY KEY(l_orderkey, l_linenumber) );
|
||||||
|
SELECT master_create_distributed_table('lineitem_subquery', 'l_orderkey', 'range');
|
||||||
|
|
||||||
|
CREATE TABLE orders_subquery (
|
||||||
|
o_orderkey bigint not null,
|
||||||
|
o_custkey integer not null,
|
||||||
|
o_orderstatus char(1) not null,
|
||||||
|
o_totalprice decimal(15,2) not null,
|
||||||
|
o_orderdate date not null,
|
||||||
|
o_orderpriority char(15) not null,
|
||||||
|
o_clerk char(15) not null,
|
||||||
|
o_shippriority integer not null,
|
||||||
|
o_comment varchar(79) not null,
|
||||||
|
PRIMARY KEY(o_orderkey) );
|
||||||
|
SELECT master_create_distributed_table('orders_subquery', 'o_orderkey', 'range');
|
||||||
|
|
||||||
|
SET citus.enable_router_execution TO 'false';
|
||||||
|
|
||||||
|
-- Check that we don't crash if there are not any shards.
|
||||||
|
SELECT
|
||||||
|
avg(unit_price)
|
||||||
|
FROM
|
||||||
|
(SELECT
|
||||||
|
l_orderkey,
|
||||||
|
avg(o_totalprice) AS unit_price
|
||||||
|
FROM
|
||||||
|
lineitem_subquery,
|
||||||
|
orders_subquery
|
||||||
|
WHERE
|
||||||
|
l_orderkey = o_orderkey
|
||||||
|
GROUP BY
|
||||||
|
l_orderkey) AS unit_prices;
|
||||||
|
|
||||||
|
-- Load data into tables.
|
||||||
|
|
||||||
|
SELECT master_create_empty_shard('lineitem_subquery') AS new_shard_id
|
||||||
|
\gset
|
||||||
|
UPDATE pg_dist_shard SET shardminvalue = 1, shardmaxvalue = 5986
|
||||||
|
WHERE shardid = :new_shard_id;
|
||||||
|
|
||||||
|
SELECT master_create_empty_shard('lineitem_subquery') AS new_shard_id
|
||||||
|
\gset
|
||||||
|
UPDATE pg_dist_shard SET shardminvalue = 8997, shardmaxvalue = 14947
|
||||||
|
WHERE shardid = :new_shard_id;
|
||||||
|
|
||||||
|
SELECT master_create_empty_shard('orders_subquery') AS new_shard_id
|
||||||
|
\gset
|
||||||
|
UPDATE pg_dist_shard SET shardminvalue = 1, shardmaxvalue = 5986
|
||||||
|
WHERE shardid = :new_shard_id;
|
||||||
|
|
||||||
|
SELECT master_create_empty_shard('orders_subquery') AS new_shard_id
|
||||||
|
\gset
|
||||||
|
UPDATE pg_dist_shard SET shardminvalue = 8997, shardmaxvalue = 14946
|
||||||
|
WHERE shardid = :new_shard_id;
|
||||||
|
|
||||||
|
SET citus.shard_max_size TO "1MB";
|
||||||
|
|
||||||
|
\copy lineitem_subquery FROM '@abs_srcdir@/data/lineitem.1.data' with delimiter '|'
|
||||||
|
\copy lineitem_subquery FROM '@abs_srcdir@/data/lineitem.2.data' with delimiter '|'
|
||||||
|
|
||||||
|
\copy orders_subquery FROM '@abs_srcdir@/data/orders.1.data' with delimiter '|'
|
||||||
|
\copy orders_subquery FROM '@abs_srcdir@/data/orders.2.data' with delimiter '|'
|
||||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -62,3 +62,369 @@ CREATE INDEX is_index3 ON users_table(value_1);
|
||||||
CREATE INDEX is_index4 ON events_table(event_type);
|
CREATE INDEX is_index4 ON events_table(event_type);
|
||||||
CREATE INDEX is_index5 ON users_table(value_2);
|
CREATE INDEX is_index5 ON users_table(value_2);
|
||||||
CREATE INDEX is_index6 ON events_table(value_2);
|
CREATE INDEX is_index6 ON events_table(value_2);
|
||||||
|
-- create a helper function to create types/functions on each node
|
||||||
|
CREATE FUNCTION run_command_on_master_and_workers(p_sql text)
|
||||||
|
RETURNS void LANGUAGE plpgsql AS $$
|
||||||
|
BEGIN
|
||||||
|
EXECUTE p_sql;
|
||||||
|
PERFORM run_command_on_workers(p_sql);
|
||||||
|
END;$$;
|
||||||
|
-- Create composite type to use in subquery pushdown
|
||||||
|
SELECT run_command_on_master_and_workers($f$
|
||||||
|
|
||||||
|
CREATE TYPE user_composite_type AS
|
||||||
|
(
|
||||||
|
tenant_id BIGINT,
|
||||||
|
user_id BIGINT
|
||||||
|
);
|
||||||
|
$f$);
|
||||||
|
run_command_on_master_and_workers
|
||||||
|
-----------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT run_command_on_master_and_workers($f$
|
||||||
|
|
||||||
|
CREATE FUNCTION cmp_user_composite_type_function(user_composite_type, user_composite_type) RETURNS int
|
||||||
|
LANGUAGE 'internal'
|
||||||
|
AS 'btrecordcmp'
|
||||||
|
IMMUTABLE
|
||||||
|
RETURNS NULL ON NULL INPUT;
|
||||||
|
$f$);
|
||||||
|
run_command_on_master_and_workers
|
||||||
|
-----------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT run_command_on_master_and_workers($f$
|
||||||
|
|
||||||
|
CREATE FUNCTION gt_user_composite_type_function(user_composite_type, user_composite_type) RETURNS boolean
|
||||||
|
LANGUAGE 'internal'
|
||||||
|
AS 'record_gt'
|
||||||
|
IMMUTABLE
|
||||||
|
RETURNS NULL ON NULL INPUT;
|
||||||
|
$f$);
|
||||||
|
run_command_on_master_and_workers
|
||||||
|
-----------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT run_command_on_master_and_workers($f$
|
||||||
|
|
||||||
|
CREATE FUNCTION ge_user_composite_type_function(user_composite_type, user_composite_type) RETURNS boolean
|
||||||
|
LANGUAGE 'internal'
|
||||||
|
AS 'record_ge'
|
||||||
|
IMMUTABLE
|
||||||
|
RETURNS NULL ON NULL INPUT;
|
||||||
|
$f$);
|
||||||
|
run_command_on_master_and_workers
|
||||||
|
-----------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT run_command_on_master_and_workers($f$
|
||||||
|
|
||||||
|
CREATE FUNCTION equal_user_composite_type_function(user_composite_type, user_composite_type) RETURNS boolean
|
||||||
|
LANGUAGE 'internal'
|
||||||
|
AS 'record_eq'
|
||||||
|
IMMUTABLE;
|
||||||
|
$f$);
|
||||||
|
run_command_on_master_and_workers
|
||||||
|
-----------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT run_command_on_master_and_workers($f$
|
||||||
|
|
||||||
|
CREATE FUNCTION lt_user_composite_type_function(user_composite_type, user_composite_type) RETURNS boolean
|
||||||
|
LANGUAGE 'internal'
|
||||||
|
AS 'record_lt'
|
||||||
|
IMMUTABLE
|
||||||
|
RETURNS NULL ON NULL INPUT;
|
||||||
|
$f$);
|
||||||
|
run_command_on_master_and_workers
|
||||||
|
-----------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- "<=" has to be backed by record_le; binding it to record_lt would make
-- "a <= b" return false when a and b are equal
SELECT run_command_on_master_and_workers($f$

	CREATE FUNCTION le_user_composite_type_function(user_composite_type, user_composite_type) RETURNS boolean
	LANGUAGE 'internal'
	AS 'record_le'
	IMMUTABLE
	RETURNS NULL ON NULL INPUT;
$f$);
|
||||||
|
run_command_on_master_and_workers
|
||||||
|
-----------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT run_command_on_master_and_workers($f$
|
||||||
|
|
||||||
|
CREATE OPERATOR > (
|
||||||
|
LEFTARG = user_composite_type,
|
||||||
|
RIGHTARG = user_composite_type,
|
||||||
|
PROCEDURE = gt_user_composite_type_function
|
||||||
|
);
|
||||||
|
$f$);
|
||||||
|
run_command_on_master_and_workers
|
||||||
|
-----------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT run_command_on_master_and_workers($f$
|
||||||
|
|
||||||
|
CREATE OPERATOR >= (
|
||||||
|
LEFTARG = user_composite_type,
|
||||||
|
RIGHTARG = user_composite_type,
|
||||||
|
PROCEDURE = ge_user_composite_type_function
|
||||||
|
);
|
||||||
|
$f$);
|
||||||
|
run_command_on_master_and_workers
|
||||||
|
-----------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- ... use that function to create a custom equality operator...
|
||||||
|
SELECT run_command_on_master_and_workers($f$
|
||||||
|
|
||||||
|
-- ... use that function to create a custom equality operator...
|
||||||
|
CREATE OPERATOR = (
|
||||||
|
LEFTARG = user_composite_type,
|
||||||
|
RIGHTARG = user_composite_type,
|
||||||
|
PROCEDURE = equal_user_composite_type_function,
|
||||||
|
commutator = =,
|
||||||
|
RESTRICT = eqsel,
|
||||||
|
JOIN = eqjoinsel,
|
||||||
|
merges,
|
||||||
|
hashes
|
||||||
|
);
|
||||||
|
$f$);
|
||||||
|
run_command_on_master_and_workers
|
||||||
|
-----------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT run_command_on_master_and_workers($f$
|
||||||
|
|
||||||
|
CREATE OPERATOR <= (
|
||||||
|
LEFTARG = user_composite_type,
|
||||||
|
RIGHTARG = user_composite_type,
|
||||||
|
PROCEDURE = le_user_composite_type_function
|
||||||
|
);
|
||||||
|
$f$);
|
||||||
|
run_command_on_master_and_workers
|
||||||
|
-----------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT run_command_on_master_and_workers($f$
|
||||||
|
|
||||||
|
CREATE OPERATOR < (
|
||||||
|
LEFTARG = user_composite_type,
|
||||||
|
RIGHTARG = user_composite_type,
|
||||||
|
PROCEDURE = lt_user_composite_type_function
|
||||||
|
);
|
||||||
|
$f$);
|
||||||
|
run_command_on_master_and_workers
|
||||||
|
-----------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- ... and create a custom operator family for hash indexes...
|
||||||
|
SELECT run_command_on_master_and_workers($f$
|
||||||
|
|
||||||
|
CREATE OPERATOR FAMILY cats_2_op_fam USING hash;
|
||||||
|
$f$);
|
||||||
|
run_command_on_master_and_workers
|
||||||
|
-----------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- ... create a test HASH function. Though it is a poor hash function,
|
||||||
|
-- it is acceptable for our tests
|
||||||
|
SELECT run_command_on_master_and_workers($f$
|
||||||
|
|
||||||
|
CREATE FUNCTION test_composite_type_hash(user_composite_type) RETURNS int
|
||||||
|
AS 'SELECT hashtext( ($1.tenant_id + $1.tenant_id)::text);'
|
||||||
|
LANGUAGE SQL
|
||||||
|
IMMUTABLE
|
||||||
|
RETURNS NULL ON NULL INPUT;
|
||||||
|
$f$);
|
||||||
|
run_command_on_master_and_workers
|
||||||
|
-----------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- We need to define two different operator classes for the composite types
|
||||||
|
-- One uses BTREE the other uses HASH
|
||||||
|
-- btree strategy numbers are fixed by the access method:
-- 1 is <, 2 is <=, 3 is =, 4 is >= and 5 is >
SELECT run_command_on_master_and_workers($f$

	CREATE OPERATOR CLASS cats_2_op_fam_clas3
	DEFAULT FOR TYPE user_composite_type USING BTREE AS
	OPERATOR 1 < (user_composite_type, user_composite_type),
	OPERATOR 2 <= (user_composite_type, user_composite_type),
	OPERATOR 3 = (user_composite_type, user_composite_type),
	OPERATOR 4 >= (user_composite_type, user_composite_type),
	OPERATOR 5 > (user_composite_type, user_composite_type),

	FUNCTION 1 cmp_user_composite_type_function(user_composite_type, user_composite_type);
$f$);
|
||||||
|
run_command_on_master_and_workers
|
||||||
|
-----------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT run_command_on_master_and_workers($f$
|
||||||
|
|
||||||
|
CREATE OPERATOR CLASS cats_2_op_fam_class
|
||||||
|
DEFAULT FOR TYPE user_composite_type USING HASH AS
|
||||||
|
OPERATOR 1 = (user_composite_type, user_composite_type),
|
||||||
|
FUNCTION 1 test_composite_type_hash(user_composite_type);
|
||||||
|
$f$);
|
||||||
|
run_command_on_master_and_workers
|
||||||
|
-----------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
CREATE TABLE events (
|
||||||
|
composite_id user_composite_type,
|
||||||
|
event_id bigint,
|
||||||
|
event_type character varying(255),
|
||||||
|
event_time bigint
|
||||||
|
);
|
||||||
|
SELECT master_create_distributed_table('events', 'composite_id', 'range');
|
||||||
|
master_create_distributed_table
|
||||||
|
---------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT master_create_empty_shard('events') AS new_shard_id
|
||||||
|
\gset
|
||||||
|
UPDATE pg_dist_shard SET shardminvalue = '(1,1)', shardmaxvalue = '(1,2000000000)'
|
||||||
|
WHERE shardid = :new_shard_id;
|
||||||
|
SELECT master_create_empty_shard('events') AS new_shard_id
|
||||||
|
\gset
|
||||||
|
UPDATE pg_dist_shard SET shardminvalue = '(1,2000000001)', shardmaxvalue = '(1,4300000000)'
|
||||||
|
WHERE shardid = :new_shard_id;
|
||||||
|
SELECT master_create_empty_shard('events') AS new_shard_id
|
||||||
|
\gset
|
||||||
|
UPDATE pg_dist_shard SET shardminvalue = '(2,1)', shardmaxvalue = '(2,2000000000)'
|
||||||
|
WHERE shardid = :new_shard_id;
|
||||||
|
SELECT master_create_empty_shard('events') AS new_shard_id
|
||||||
|
\gset
|
||||||
|
UPDATE pg_dist_shard SET shardminvalue = '(2,2000000001)', shardmaxvalue = '(2,4300000000)'
|
||||||
|
WHERE shardid = :new_shard_id;
|
||||||
|
\COPY events FROM STDIN WITH CSV
|
||||||
|
CREATE TABLE users (
|
||||||
|
composite_id user_composite_type,
|
||||||
|
lastseen bigint
|
||||||
|
);
|
||||||
|
SELECT master_create_distributed_table('users', 'composite_id', 'range');
|
||||||
|
master_create_distributed_table
|
||||||
|
---------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT master_create_empty_shard('users') AS new_shard_id
|
||||||
|
\gset
|
||||||
|
UPDATE pg_dist_shard SET shardminvalue = '(1,1)', shardmaxvalue = '(1,2000000000)'
|
||||||
|
WHERE shardid = :new_shard_id;
|
||||||
|
SELECT master_create_empty_shard('users') AS new_shard_id
|
||||||
|
\gset
|
||||||
|
UPDATE pg_dist_shard SET shardminvalue = '(1,2000000001)', shardmaxvalue = '(1,4300000000)'
|
||||||
|
WHERE shardid = :new_shard_id;
|
||||||
|
SELECT master_create_empty_shard('users') AS new_shard_id
|
||||||
|
\gset
|
||||||
|
UPDATE pg_dist_shard SET shardminvalue = '(2,1)', shardmaxvalue = '(2,2000000000)'
|
||||||
|
WHERE shardid = :new_shard_id;
|
||||||
|
SELECT master_create_empty_shard('users') AS new_shard_id
|
||||||
|
\gset
|
||||||
|
UPDATE pg_dist_shard SET shardminvalue = '(2,2000000001)', shardmaxvalue = '(2,4300000000)'
|
||||||
|
WHERE shardid = :new_shard_id;
|
||||||
|
\COPY users FROM STDIN WITH CSV
|
||||||
|
-- Create tables for subquery tests
|
||||||
|
CREATE TABLE lineitem_subquery (
|
||||||
|
l_orderkey bigint not null,
|
||||||
|
l_partkey integer not null,
|
||||||
|
l_suppkey integer not null,
|
||||||
|
l_linenumber integer not null,
|
||||||
|
l_quantity decimal(15, 2) not null,
|
||||||
|
l_extendedprice decimal(15, 2) not null,
|
||||||
|
l_discount decimal(15, 2) not null,
|
||||||
|
l_tax decimal(15, 2) not null,
|
||||||
|
l_returnflag char(1) not null,
|
||||||
|
l_linestatus char(1) not null,
|
||||||
|
l_shipdate date not null,
|
||||||
|
l_commitdate date not null,
|
||||||
|
l_receiptdate date not null,
|
||||||
|
l_shipinstruct char(25) not null,
|
||||||
|
l_shipmode char(10) not null,
|
||||||
|
l_comment varchar(44) not null,
|
||||||
|
PRIMARY KEY(l_orderkey, l_linenumber) );
|
||||||
|
SELECT master_create_distributed_table('lineitem_subquery', 'l_orderkey', 'range');
|
||||||
|
master_create_distributed_table
|
||||||
|
---------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
CREATE TABLE orders_subquery (
|
||||||
|
o_orderkey bigint not null,
|
||||||
|
o_custkey integer not null,
|
||||||
|
o_orderstatus char(1) not null,
|
||||||
|
o_totalprice decimal(15,2) not null,
|
||||||
|
o_orderdate date not null,
|
||||||
|
o_orderpriority char(15) not null,
|
||||||
|
o_clerk char(15) not null,
|
||||||
|
o_shippriority integer not null,
|
||||||
|
o_comment varchar(79) not null,
|
||||||
|
PRIMARY KEY(o_orderkey) );
|
||||||
|
SELECT master_create_distributed_table('orders_subquery', 'o_orderkey', 'range');
|
||||||
|
master_create_distributed_table
|
||||||
|
---------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SET citus.enable_router_execution TO 'false';
|
||||||
|
-- Check that we don't crash if there are not any shards.
|
||||||
|
SELECT
|
||||||
|
avg(unit_price)
|
||||||
|
FROM
|
||||||
|
(SELECT
|
||||||
|
l_orderkey,
|
||||||
|
avg(o_totalprice) AS unit_price
|
||||||
|
FROM
|
||||||
|
lineitem_subquery,
|
||||||
|
orders_subquery
|
||||||
|
WHERE
|
||||||
|
l_orderkey = o_orderkey
|
||||||
|
GROUP BY
|
||||||
|
l_orderkey) AS unit_prices;
|
||||||
|
avg
|
||||||
|
-----
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- Load data into tables.
|
||||||
|
SELECT master_create_empty_shard('lineitem_subquery') AS new_shard_id
|
||||||
|
\gset
|
||||||
|
UPDATE pg_dist_shard SET shardminvalue = 1, shardmaxvalue = 5986
|
||||||
|
WHERE shardid = :new_shard_id;
|
||||||
|
SELECT master_create_empty_shard('lineitem_subquery') AS new_shard_id
|
||||||
|
\gset
|
||||||
|
UPDATE pg_dist_shard SET shardminvalue = 8997, shardmaxvalue = 14947
|
||||||
|
WHERE shardid = :new_shard_id;
|
||||||
|
SELECT master_create_empty_shard('orders_subquery') AS new_shard_id
|
||||||
|
\gset
|
||||||
|
UPDATE pg_dist_shard SET shardminvalue = 1, shardmaxvalue = 5986
|
||||||
|
WHERE shardid = :new_shard_id;
|
||||||
|
SELECT master_create_empty_shard('orders_subquery') AS new_shard_id
|
||||||
|
\gset
|
||||||
|
UPDATE pg_dist_shard SET shardminvalue = 8997, shardmaxvalue = 14946
|
||||||
|
WHERE shardid = :new_shard_id;
|
||||||
|
SET citus.shard_max_size TO "1MB";
|
||||||
|
\copy lineitem_subquery FROM '@abs_srcdir@/data/lineitem.1.data' with delimiter '|'
|
||||||
|
\copy lineitem_subquery FROM '@abs_srcdir@/data/lineitem.2.data' with delimiter '|'
|
||||||
|
\copy orders_subquery FROM '@abs_srcdir@/data/orders.1.data' with delimiter '|'
|
||||||
|
\copy orders_subquery FROM '@abs_srcdir@/data/orders.2.data' with delimiter '|'
|
||||||
|
|
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
|
@ -126,6 +126,224 @@ EXPLAIN (COSTS FALSE, VERBOSE TRUE)
|
||||||
GROUP BY l_quantity
|
GROUP BY l_quantity
|
||||||
HAVING l_quantity > (100 * random());
|
HAVING l_quantity > (100 * random());
|
||||||
|
|
||||||
|
|
||||||
|
-- Subquery pushdown tests with explain
|
||||||
|
EXPLAIN (COSTS OFF)
|
||||||
|
SELECT
|
||||||
|
avg(array_length(events, 1)) AS event_average
|
||||||
|
FROM
|
||||||
|
(SELECT
|
||||||
|
tenant_id,
|
||||||
|
user_id,
|
||||||
|
array_agg(event_type ORDER BY event_time) AS events
|
||||||
|
FROM
|
||||||
|
(SELECT
|
||||||
|
(users.composite_id).tenant_id,
|
||||||
|
(users.composite_id).user_id,
|
||||||
|
event_type,
|
||||||
|
events.event_time
|
||||||
|
FROM
|
||||||
|
users,
|
||||||
|
events
|
||||||
|
WHERE
|
||||||
|
(users.composite_id) = (events.composite_id) AND
|
||||||
|
users.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND
|
||||||
|
users.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND
|
||||||
|
event_type IN ('click', 'submit', 'pay')) AS subquery
|
||||||
|
GROUP BY
|
||||||
|
tenant_id,
|
||||||
|
user_id) AS subquery;
|
||||||
|
|
||||||
|
-- Union and left join subquery pushdown
|
||||||
|
EXPLAIN (COSTS OFF)
|
||||||
|
SELECT
|
||||||
|
avg(array_length(events, 1)) AS event_average,
|
||||||
|
hasdone
|
||||||
|
FROM
|
||||||
|
(SELECT
|
||||||
|
subquery_1.tenant_id,
|
||||||
|
subquery_1.user_id,
|
||||||
|
array_agg(event ORDER BY event_time) AS events,
|
||||||
|
COALESCE(hasdone, 'Has not done paying') AS hasdone
|
||||||
|
FROM
|
||||||
|
(
|
||||||
|
(SELECT
|
||||||
|
(users.composite_id).tenant_id,
|
||||||
|
(users.composite_id).user_id,
|
||||||
|
(users.composite_id) as composite_id,
|
||||||
|
'action=>1'AS event,
|
||||||
|
events.event_time
|
||||||
|
FROM
|
||||||
|
users,
|
||||||
|
events
|
||||||
|
WHERE
|
||||||
|
(users.composite_id) = (events.composite_id) AND
|
||||||
|
users.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND
|
||||||
|
users.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND
|
||||||
|
event_type = 'click')
|
||||||
|
UNION
|
||||||
|
(SELECT
|
||||||
|
(users.composite_id).tenant_id,
|
||||||
|
(users.composite_id).user_id,
|
||||||
|
(users.composite_id) as composite_id,
|
||||||
|
'action=>2'AS event,
|
||||||
|
events.event_time
|
||||||
|
FROM
|
||||||
|
users,
|
||||||
|
events
|
||||||
|
WHERE
|
||||||
|
(users.composite_id) = (events.composite_id) AND
|
||||||
|
users.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND
|
||||||
|
users.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND
|
||||||
|
event_type = 'submit')
|
||||||
|
) AS subquery_1
|
||||||
|
LEFT JOIN
|
||||||
|
(SELECT
|
||||||
|
DISTINCT ON ((composite_id).tenant_id, (composite_id).user_id) composite_id,
|
||||||
|
(composite_id).tenant_id,
|
||||||
|
(composite_id).user_id,
|
||||||
|
'Has done paying'::TEXT AS hasdone
|
||||||
|
FROM
|
||||||
|
events
|
||||||
|
WHERE
|
||||||
|
events.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND
|
||||||
|
events.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND
|
||||||
|
event_type = 'pay') AS subquery_2
|
||||||
|
ON
|
||||||
|
subquery_1.composite_id = subquery_2.composite_id
|
||||||
|
GROUP BY
|
||||||
|
subquery_1.tenant_id,
|
||||||
|
subquery_1.user_id,
|
||||||
|
hasdone) AS subquery_top
|
||||||
|
GROUP BY
|
||||||
|
hasdone;
|
||||||
|
|
||||||
|
-- Union, left join and having subquery pushdown
|
||||||
|
EXPLAIN (COSTS OFF)
|
||||||
|
SELECT
|
||||||
|
avg(array_length(events, 1)) AS event_average,
|
||||||
|
count_pay
|
||||||
|
FROM (
|
||||||
|
SELECT
|
||||||
|
subquery_1.tenant_id,
|
||||||
|
subquery_1.user_id,
|
||||||
|
array_agg(event ORDER BY event_time) AS events,
|
||||||
|
COALESCE(count_pay, 0) AS count_pay
|
||||||
|
FROM
|
||||||
|
(
|
||||||
|
(SELECT
|
||||||
|
(users.composite_id).tenant_id,
|
||||||
|
(users.composite_id).user_id,
|
||||||
|
(users.composite_id),
|
||||||
|
'action=>1'AS event,
|
||||||
|
events.event_time
|
||||||
|
FROM
|
||||||
|
users,
|
||||||
|
events
|
||||||
|
WHERE
|
||||||
|
(users.composite_id) = (events.composite_id) AND
|
||||||
|
users.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND
|
||||||
|
users.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND
|
||||||
|
event_type = 'click')
|
||||||
|
UNION
|
||||||
|
(SELECT
|
||||||
|
(users.composite_id).tenant_id,
|
||||||
|
(users.composite_id).user_id,
|
||||||
|
(users.composite_id),
|
||||||
|
'action=>2'AS event,
|
||||||
|
events.event_time
|
||||||
|
FROM
|
||||||
|
users,
|
||||||
|
events
|
||||||
|
WHERE
|
||||||
|
(users.composite_id) = (events.composite_id) AND
|
||||||
|
users.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND
|
||||||
|
users.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND
|
||||||
|
event_type = 'submit')
|
||||||
|
) AS subquery_1
|
||||||
|
LEFT JOIN
|
||||||
|
(SELECT
|
||||||
|
(composite_id).tenant_id,
|
||||||
|
(composite_id).user_id,
|
||||||
|
composite_id,
|
||||||
|
COUNT(*) AS count_pay
|
||||||
|
FROM
|
||||||
|
events
|
||||||
|
WHERE
|
||||||
|
events.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND
|
||||||
|
events.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND
|
||||||
|
event_type = 'pay'
|
||||||
|
GROUP BY
|
||||||
|
composite_id
|
||||||
|
HAVING
|
||||||
|
COUNT(*) > 2) AS subquery_2
|
||||||
|
ON
|
||||||
|
subquery_1.composite_id = subquery_2.composite_id
|
||||||
|
GROUP BY
|
||||||
|
subquery_1.tenant_id,
|
||||||
|
subquery_1.user_id,
|
||||||
|
count_pay) AS subquery_top
|
||||||
|
WHERE
|
||||||
|
array_ndims(events) > 0
|
||||||
|
GROUP BY
|
||||||
|
count_pay
|
||||||
|
ORDER BY
|
||||||
|
count_pay;
|
||||||
|
|
||||||
|
-- Lateral join subquery pushdown
|
||||||
|
-- set subquery_pushdown due to limit in the query
|
||||||
|
SET citus.subquery_pushdown to ON;
|
||||||
|
EXPLAIN (COSTS OFF)
|
||||||
|
SELECT
|
||||||
|
tenant_id,
|
||||||
|
user_id,
|
||||||
|
user_lastseen,
|
||||||
|
event_array
|
||||||
|
FROM
|
||||||
|
(SELECT
|
||||||
|
tenant_id,
|
||||||
|
user_id,
|
||||||
|
max(lastseen) as user_lastseen,
|
||||||
|
array_agg(event_type ORDER BY event_time) AS event_array
|
||||||
|
FROM
|
||||||
|
(SELECT
|
||||||
|
(composite_id).tenant_id,
|
||||||
|
(composite_id).user_id,
|
||||||
|
composite_id,
|
||||||
|
lastseen
|
||||||
|
FROM
|
||||||
|
users
|
||||||
|
WHERE
|
||||||
|
composite_id >= '(1, -9223372036854775808)'::user_composite_type AND
|
||||||
|
composite_id <= '(1, 9223372036854775807)'::user_composite_type
|
||||||
|
ORDER BY
|
||||||
|
lastseen DESC
|
||||||
|
LIMIT
|
||||||
|
10
|
||||||
|
) AS subquery_top
|
||||||
|
LEFT JOIN LATERAL
|
||||||
|
(SELECT
|
||||||
|
event_type,
|
||||||
|
event_time
|
||||||
|
FROM
|
||||||
|
events
|
||||||
|
WHERE
|
||||||
|
(composite_id) = subquery_top.composite_id
|
||||||
|
ORDER BY
|
||||||
|
event_time DESC
|
||||||
|
LIMIT
|
||||||
|
99) AS subquery_lateral
|
||||||
|
ON
|
||||||
|
true
|
||||||
|
GROUP BY
|
||||||
|
tenant_id,
|
||||||
|
user_id
|
||||||
|
) AS shard_union
|
||||||
|
ORDER BY
|
||||||
|
user_lastseen DESC
|
||||||
|
LIMIT
|
||||||
|
10;
|
||||||
|
|
||||||
-- Test all tasks output
|
-- Test all tasks output
|
||||||
SET citus.explain_all_tasks TO on;
|
SET citus.explain_all_tasks TO on;
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,538 @@
|
||||||
|
--
|
||||||
|
-- MULTI_SUBQUERY
|
||||||
|
--
|
||||||
|
-- no need to set shardid sequence given that we're not creating any shards
|
||||||
|
|
||||||
|
SET citus.enable_router_execution TO FALSE;
|
||||||
|
|
||||||
|
-- Check that we error out if shard min/max values are not exactly same.
|
||||||
|
SELECT
|
||||||
|
avg(unit_price)
|
||||||
|
FROM
|
||||||
|
(SELECT
|
||||||
|
l_orderkey,
|
||||||
|
avg(o_totalprice) AS unit_price
|
||||||
|
FROM
|
||||||
|
lineitem_subquery,
|
||||||
|
orders_subquery
|
||||||
|
WHERE
|
||||||
|
l_orderkey = o_orderkey
|
||||||
|
GROUP BY
|
||||||
|
l_orderkey) AS unit_prices;
|
||||||
|
|
||||||
|
-- Update metadata in order to make all shards equal
|
||||||
|
-- note that the table is created on multi_insert_select_create_table.sql
|
||||||
|
UPDATE
|
||||||
|
pg_dist_shard
|
||||||
|
SET
|
||||||
|
shardmaxvalue = '14947'
|
||||||
|
WHERE
|
||||||
|
shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'orders_subquery'::regclass ORDER BY shardid DESC LIMIT 1);
|
||||||
|
|
||||||
|
-- If group by is not on partition column then we error out from single table
|
||||||
|
-- repartition code path
|
||||||
|
SELECT
|
||||||
|
avg(order_count)
|
||||||
|
FROM
|
||||||
|
(SELECT
|
||||||
|
l_suppkey,
|
||||||
|
count(*) AS order_count
|
||||||
|
FROM
|
||||||
|
lineitem_subquery
|
||||||
|
GROUP BY
|
||||||
|
l_suppkey) AS order_counts;
|
||||||
|
|
||||||
|
-- Check that we error out if join is not on partition columns.
|
||||||
|
SELECT
|
||||||
|
avg(unit_price)
|
||||||
|
FROM
|
||||||
|
(SELECT
|
||||||
|
l_orderkey,
|
||||||
|
avg(o_totalprice / l_quantity) AS unit_price
|
||||||
|
FROM
|
||||||
|
lineitem_subquery,
|
||||||
|
orders_subquery
|
||||||
|
GROUP BY
|
||||||
|
l_orderkey) AS unit_prices;
|
||||||
|
|
||||||
|
SELECT
|
||||||
|
avg(unit_price)
|
||||||
|
FROM
|
||||||
|
(SELECT
|
||||||
|
l_orderkey,
|
||||||
|
avg(o_totalprice / l_quantity) AS unit_price
|
||||||
|
FROM
|
||||||
|
lineitem_subquery,
|
||||||
|
orders_subquery
|
||||||
|
WHERE
|
||||||
|
l_orderkey = o_custkey
|
||||||
|
GROUP BY
|
||||||
|
l_orderkey) AS unit_prices;
|
||||||
|
|
||||||
|
-- Check that we error out if there are non-relation subqueries
|
||||||
|
SELECT count(*) FROM
|
||||||
|
(
|
||||||
|
(SELECT l_orderkey FROM lineitem_subquery) UNION ALL
|
||||||
|
(SELECT 1::bigint)
|
||||||
|
) b;
|
||||||
|
|
||||||
|
|
||||||
|
-- Check that we error out if queries in union do not include partition columns.
|
||||||
|
|
||||||
|
SELECT count(*) FROM
|
||||||
|
(
|
||||||
|
(SELECT l_orderkey FROM lineitem_subquery) UNION
|
||||||
|
(SELECT l_partkey FROM lineitem_subquery)
|
||||||
|
) b;
|
||||||
|
|
||||||
|
-- Check that we run union queries if partition column is selected.
|
||||||
|
|
||||||
|
SELECT count(*) FROM
|
||||||
|
(
|
||||||
|
(SELECT l_orderkey FROM lineitem_subquery) UNION
|
||||||
|
(SELECT l_orderkey FROM lineitem_subquery)
|
||||||
|
) b;
|
||||||
|
-- Check that we error out if inner query has Limit but subquery_pushdown is not set
|
||||||
|
SELECT
|
||||||
|
avg(o_totalprice/l_quantity)
|
||||||
|
FROM
|
||||||
|
(SELECT
|
||||||
|
l_orderkey,
|
||||||
|
l_quantity
|
||||||
|
FROM
|
||||||
|
lineitem_subquery
|
||||||
|
ORDER BY
|
||||||
|
l_quantity
|
||||||
|
LIMIT 10
|
||||||
|
) lineitem_quantities
|
||||||
|
JOIN LATERAL
|
||||||
|
(SELECT
|
||||||
|
o_totalprice
|
||||||
|
FROM
|
||||||
|
orders_subquery
|
||||||
|
WHERE
|
||||||
|
lineitem_quantities.l_orderkey = o_orderkey) orders_price ON true;
|
||||||
|
|
||||||
|
-- Limit is only supported when subquery_pushdown is set
|
||||||
|
-- Check that we error out if inner query has limit but outer query has not.
|
||||||
|
SET citus.subquery_pushdown to ON;
|
||||||
|
SELECT
|
||||||
|
avg(o_totalprice/l_quantity)
|
||||||
|
FROM
|
||||||
|
(SELECT
|
||||||
|
l_orderkey,
|
||||||
|
l_quantity
|
||||||
|
FROM
|
||||||
|
lineitem_subquery
|
||||||
|
ORDER BY
|
||||||
|
l_quantity
|
||||||
|
LIMIT 10
|
||||||
|
) lineitem_quantities
|
||||||
|
JOIN LATERAL
|
||||||
|
(SELECT
|
||||||
|
o_totalprice
|
||||||
|
FROM
|
||||||
|
orders_subquery
|
||||||
|
WHERE
|
||||||
|
lineitem_quantities.l_orderkey = o_orderkey) orders_price ON true;
|
||||||
|
|
||||||
|
-- reset the flag for next query
|
||||||
|
SET citus.subquery_pushdown to OFF;
|
||||||
|
|
||||||
|
-- Check that we error out if the outermost query has a distinct clause.
|
||||||
|
|
||||||
|
SELECT
|
||||||
|
count(DISTINCT a)
|
||||||
|
FROM (
|
||||||
|
SELECT
|
||||||
|
count(*) a
|
||||||
|
FROM
|
||||||
|
lineitem_subquery
|
||||||
|
GROUP BY
|
||||||
|
l_orderkey
|
||||||
|
) z;
|
||||||
|
|
||||||
|
-- Check supported subquery types.
|
||||||
|
|
||||||
|
SELECT
|
||||||
|
o_custkey,
|
||||||
|
sum(order_count) as total_order_count
|
||||||
|
FROM
|
||||||
|
(SELECT
|
||||||
|
o_orderkey,
|
||||||
|
o_custkey,
|
||||||
|
count(*) AS order_count
|
||||||
|
FROM
|
||||||
|
orders_subquery
|
||||||
|
WHERE
|
||||||
|
o_orderkey > 0 AND
|
||||||
|
o_orderkey < 12000
|
||||||
|
GROUP BY
|
||||||
|
o_orderkey, o_custkey) AS order_counts
|
||||||
|
GROUP BY
|
||||||
|
o_custkey
|
||||||
|
ORDER BY
|
||||||
|
total_order_count DESC,
|
||||||
|
o_custkey ASC
|
||||||
|
LIMIT 10;
|
||||||
|
|
||||||
|
SELECT
|
||||||
|
avg(unit_price)
|
||||||
|
FROM
|
||||||
|
(SELECT
|
||||||
|
l_orderkey,
|
||||||
|
avg(o_totalprice / l_quantity) AS unit_price
|
||||||
|
FROM
|
||||||
|
lineitem_subquery,
|
||||||
|
orders_subquery
|
||||||
|
WHERE
|
||||||
|
l_orderkey = o_orderkey
|
||||||
|
GROUP BY
|
||||||
|
l_orderkey) AS unit_prices
|
||||||
|
WHERE
|
||||||
|
unit_price > 1000 AND
|
||||||
|
unit_price < 10000;
|
||||||
|
|
||||||
|
-- Check that if subquery is pulled, we don't error and run query properly.
|
||||||
|
|
||||||
|
SELECT count(*) FROM
|
||||||
|
(
|
||||||
|
SELECT l_orderkey FROM (
|
||||||
|
(SELECT l_orderkey FROM lineitem_subquery) UNION
|
||||||
|
(SELECT l_orderkey FROM lineitem_subquery)
|
||||||
|
) a
|
||||||
|
WHERE l_orderkey = 1
|
||||||
|
) b;
|
||||||
|
|
||||||
|
SELECT count(*) FROM
|
||||||
|
(
|
||||||
|
SELECT * FROM (
|
||||||
|
(SELECT * FROM lineitem_subquery) UNION
|
||||||
|
(SELECT * FROM lineitem_subquery)
|
||||||
|
) a
|
||||||
|
WHERE l_orderkey = 1
|
||||||
|
) b;
|
||||||
|
|
||||||
|
SELECT max(l_orderkey) FROM
|
||||||
|
(
|
||||||
|
SELECT l_orderkey FROM (
|
||||||
|
SELECT
|
||||||
|
l_orderkey
|
||||||
|
FROM
|
||||||
|
lineitem_subquery
|
||||||
|
WHERE
|
||||||
|
l_orderkey < 20000
|
||||||
|
GROUP BY
|
||||||
|
l_orderkey
|
||||||
|
) z
|
||||||
|
) y;
|
||||||
|
|
||||||
|
-- Add one more shard to one relation, then test if we error out because of different
|
||||||
|
-- shard counts for joining relations.
|
||||||
|
|
||||||
|
SELECT master_create_empty_shard('orders_subquery') AS new_shard_id
|
||||||
|
\gset
|
||||||
|
UPDATE pg_dist_shard SET shardminvalue = 15000, shardmaxvalue = 20000
|
||||||
|
WHERE shardid = :new_shard_id;
|
||||||
|
|
||||||
|
SELECT
|
||||||
|
avg(unit_price)
|
||||||
|
FROM
|
||||||
|
(SELECT
|
||||||
|
l_orderkey,
|
||||||
|
avg(o_totalprice / l_quantity) AS unit_price
|
||||||
|
FROM
|
||||||
|
lineitem_subquery,
|
||||||
|
orders_subquery
|
||||||
|
WHERE
|
||||||
|
l_orderkey = o_orderkey
|
||||||
|
GROUP BY
|
||||||
|
l_orderkey) AS unit_prices;
|
||||||
|
|
||||||
|
-- Check that we can prune shards in subqueries with VARCHAR partition columns
|
||||||
|
|
||||||
|
CREATE TABLE subquery_pruning_varchar_test_table
|
||||||
|
(
|
||||||
|
a varchar,
|
||||||
|
b int
|
||||||
|
);
|
||||||
|
|
||||||
|
SELECT master_create_distributed_table('subquery_pruning_varchar_test_table', 'a', 'hash');
|
||||||
|
SELECT master_create_worker_shards('subquery_pruning_varchar_test_table', 4, 1);
|
||||||
|
|
||||||
|
SET client_min_messages TO DEBUG2;
|
||||||
|
|
||||||
|
SELECT * FROM
|
||||||
|
(SELECT count(*) FROM subquery_pruning_varchar_test_table WHERE a = 'onder' GROUP BY a)
|
||||||
|
AS foo;
|
||||||
|
|
||||||
|
SELECT * FROM
|
||||||
|
(SELECT count(*) FROM subquery_pruning_varchar_test_table WHERE 'eren' = a GROUP BY a)
|
||||||
|
AS foo;
|
||||||
|
|
||||||
|
SET client_min_messages TO NOTICE;
|
||||||
|
|
||||||
|
-- test subquery join on VARCHAR partition column
|
||||||
|
SELECT * FROM
|
||||||
|
(SELECT
|
||||||
|
a_inner AS a
|
||||||
|
FROM
|
||||||
|
(SELECT
|
||||||
|
subquery_pruning_varchar_test_table.a AS a_inner
|
||||||
|
FROM
|
||||||
|
subquery_pruning_varchar_test_table
|
||||||
|
GROUP BY
|
||||||
|
subquery_pruning_varchar_test_table.a
|
||||||
|
HAVING
|
||||||
|
count(subquery_pruning_varchar_test_table.a) < 3)
|
||||||
|
AS f1,
|
||||||
|
|
||||||
|
(SELECT
|
||||||
|
subquery_pruning_varchar_test_table.a
|
||||||
|
FROM
|
||||||
|
subquery_pruning_varchar_test_table
|
||||||
|
GROUP BY
|
||||||
|
subquery_pruning_varchar_test_table.a
|
||||||
|
HAVING
|
||||||
|
sum(coalesce(subquery_pruning_varchar_test_table.b,0)) > 20.0)
|
||||||
|
AS f2
|
||||||
|
WHERE
|
||||||
|
f1.a_inner = f2.a
|
||||||
|
GROUP BY
|
||||||
|
a_inner)
|
||||||
|
AS foo;
|
||||||
|
|
||||||
|
DROP TABLE subquery_pruning_varchar_test_table;
|
||||||
|
|
||||||
|
-- Simple join subquery pushdown
|
||||||
|
SELECT
|
||||||
|
avg(array_length(events, 1)) AS event_average
|
||||||
|
FROM
|
||||||
|
(SELECT
|
||||||
|
tenant_id,
|
||||||
|
user_id,
|
||||||
|
array_agg(event_type ORDER BY event_time) AS events
|
||||||
|
FROM
|
||||||
|
(SELECT
|
||||||
|
(users.composite_id).tenant_id,
|
||||||
|
(users.composite_id).user_id,
|
||||||
|
event_type,
|
||||||
|
events.event_time
|
||||||
|
FROM
|
||||||
|
users,
|
||||||
|
events
|
||||||
|
WHERE
|
||||||
|
(users.composite_id) = (events.composite_id) AND
|
||||||
|
users.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND
|
||||||
|
users.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND
|
||||||
|
event_type IN ('click', 'submit', 'pay')) AS subquery
|
||||||
|
GROUP BY
|
||||||
|
tenant_id,
|
||||||
|
user_id) AS subquery;
|
||||||
|
|
||||||
|
-- Union and left join subquery pushdown
|
||||||
|
SELECT
|
||||||
|
avg(array_length(events, 1)) AS event_average,
|
||||||
|
hasdone
|
||||||
|
FROM
|
||||||
|
(SELECT
|
||||||
|
subquery_1.tenant_id,
|
||||||
|
subquery_1.user_id,
|
||||||
|
array_agg(event ORDER BY event_time) AS events,
|
||||||
|
COALESCE(hasdone, 'Has not done paying') AS hasdone
|
||||||
|
FROM
|
||||||
|
(
|
||||||
|
(SELECT
|
||||||
|
(users.composite_id).tenant_id,
|
||||||
|
(users.composite_id).user_id,
|
||||||
|
(users.composite_id) as composite_id,
|
||||||
|
'action=>1'AS event,
|
||||||
|
events.event_time
|
||||||
|
FROM
|
||||||
|
users,
|
||||||
|
events
|
||||||
|
WHERE
|
||||||
|
(users.composite_id) = (events.composite_id) AND
|
||||||
|
users.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND
|
||||||
|
users.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND
|
||||||
|
event_type = 'click')
|
||||||
|
UNION
|
||||||
|
(SELECT
|
||||||
|
(users.composite_id).tenant_id,
|
||||||
|
(users.composite_id).user_id,
|
||||||
|
(users.composite_id) as composite_id,
|
||||||
|
'action=>2'AS event,
|
||||||
|
events.event_time
|
||||||
|
FROM
|
||||||
|
users,
|
||||||
|
events
|
||||||
|
WHERE
|
||||||
|
(users.composite_id) = (events.composite_id) AND
|
||||||
|
users.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND
|
||||||
|
users.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND
|
||||||
|
event_type = 'submit')
|
||||||
|
) AS subquery_1
|
||||||
|
LEFT JOIN
|
||||||
|
(SELECT
|
||||||
|
DISTINCT ON ((composite_id).tenant_id, (composite_id).user_id) composite_id,
|
||||||
|
(composite_id).tenant_id,
|
||||||
|
(composite_id).user_id,
|
||||||
|
'Has done paying'::TEXT AS hasdone
|
||||||
|
FROM
|
||||||
|
events
|
||||||
|
WHERE
|
||||||
|
events.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND
|
||||||
|
events.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND
|
||||||
|
event_type = 'pay') AS subquery_2
|
||||||
|
ON
|
||||||
|
subquery_1.composite_id = subquery_2.composite_id
|
||||||
|
GROUP BY
|
||||||
|
subquery_1.tenant_id,
|
||||||
|
subquery_1.user_id,
|
||||||
|
hasdone) AS subquery_top
|
||||||
|
GROUP BY
|
||||||
|
hasdone;
|
||||||
|
|
||||||
|
-- Union, left join and having subquery pushdown
|
||||||
|
SELECT
|
||||||
|
avg(array_length(events, 1)) AS event_average,
|
||||||
|
count_pay
|
||||||
|
FROM (
|
||||||
|
SELECT
|
||||||
|
subquery_1.tenant_id,
|
||||||
|
subquery_1.user_id,
|
||||||
|
array_agg(event ORDER BY event_time) AS events,
|
||||||
|
COALESCE(count_pay, 0) AS count_pay
|
||||||
|
FROM
|
||||||
|
(
|
||||||
|
(SELECT
|
||||||
|
(users.composite_id).tenant_id,
|
||||||
|
(users.composite_id).user_id,
|
||||||
|
(users.composite_id),
|
||||||
|
'action=>1'AS event,
|
||||||
|
events.event_time
|
||||||
|
FROM
|
||||||
|
users,
|
||||||
|
events
|
||||||
|
WHERE
|
||||||
|
(users.composite_id) = (events.composite_id) AND
|
||||||
|
users.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND
|
||||||
|
users.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND
|
||||||
|
event_type = 'click')
|
||||||
|
UNION
|
||||||
|
(SELECT
|
||||||
|
(users.composite_id).tenant_id,
|
||||||
|
(users.composite_id).user_id,
|
||||||
|
(users.composite_id),
|
||||||
|
'action=>2'AS event,
|
||||||
|
events.event_time
|
||||||
|
FROM
|
||||||
|
users,
|
||||||
|
events
|
||||||
|
WHERE
|
||||||
|
(users.composite_id) = (events.composite_id) AND
|
||||||
|
users.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND
|
||||||
|
users.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND
|
||||||
|
event_type = 'submit')
|
||||||
|
) AS subquery_1
|
||||||
|
LEFT JOIN
|
||||||
|
(SELECT
|
||||||
|
(composite_id).tenant_id,
|
||||||
|
(composite_id).user_id,
|
||||||
|
composite_id,
|
||||||
|
COUNT(*) AS count_pay
|
||||||
|
FROM
|
||||||
|
events
|
||||||
|
WHERE
|
||||||
|
events.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND
|
||||||
|
events.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND
|
||||||
|
event_type = 'pay'
|
||||||
|
GROUP BY
|
||||||
|
composite_id
|
||||||
|
HAVING
|
||||||
|
COUNT(*) > 2) AS subquery_2
|
||||||
|
ON
|
||||||
|
subquery_1.composite_id = subquery_2.composite_id
|
||||||
|
GROUP BY
|
||||||
|
subquery_1.tenant_id,
|
||||||
|
subquery_1.user_id,
|
||||||
|
count_pay) AS subquery_top
|
||||||
|
WHERE
|
||||||
|
array_ndims(events) > 0
|
||||||
|
GROUP BY
|
||||||
|
count_pay
|
||||||
|
ORDER BY
|
||||||
|
count_pay;
|
||||||
|
|
||||||
|
-- Lateral join subquery pushdown
|
||||||
|
-- set subquery_pushdown since there is limit in the query
|
||||||
|
SET citus.subquery_pushdown to ON;
|
||||||
|
SELECT
|
||||||
|
tenant_id,
|
||||||
|
user_id,
|
||||||
|
user_lastseen,
|
||||||
|
event_array
|
||||||
|
FROM
|
||||||
|
(SELECT
|
||||||
|
tenant_id,
|
||||||
|
user_id,
|
||||||
|
max(lastseen) as user_lastseen,
|
||||||
|
array_agg(event_type ORDER BY event_time) AS event_array
|
||||||
|
FROM
|
||||||
|
(SELECT
|
||||||
|
(composite_id).tenant_id,
|
||||||
|
(composite_id).user_id,
|
||||||
|
composite_id,
|
||||||
|
lastseen
|
||||||
|
FROM
|
||||||
|
users
|
||||||
|
WHERE
|
||||||
|
composite_id >= '(1, -9223372036854775808)'::user_composite_type AND
|
||||||
|
composite_id <= '(1, 9223372036854775807)'::user_composite_type
|
||||||
|
ORDER BY
|
||||||
|
lastseen DESC
|
||||||
|
LIMIT
|
||||||
|
10
|
||||||
|
) AS subquery_top
|
||||||
|
LEFT JOIN LATERAL
|
||||||
|
(SELECT
|
||||||
|
event_type,
|
||||||
|
event_time
|
||||||
|
FROM
|
||||||
|
events
|
||||||
|
WHERE
|
||||||
|
(composite_id) = subquery_top.composite_id
|
||||||
|
ORDER BY
|
||||||
|
event_time DESC
|
||||||
|
LIMIT
|
||||||
|
99) AS subquery_lateral
|
||||||
|
ON
|
||||||
|
true
|
||||||
|
GROUP BY
|
||||||
|
tenant_id,
|
||||||
|
user_id
|
||||||
|
) AS shard_union
|
||||||
|
ORDER BY
|
||||||
|
user_lastseen DESC
|
||||||
|
LIMIT
|
||||||
|
10;
|
||||||
|
|
||||||
|
-- cleanup the tables and the type & functions
|
||||||
|
-- also set the min messages to WARNING to skip
|
||||||
|
-- CASCADE NOTICE messages
|
||||||
|
SET client_min_messages TO WARNING;
|
||||||
|
DROP TABLE users, events;
|
||||||
|
|
||||||
|
SELECT run_command_on_master_and_workers($f$
|
||||||
|
|
||||||
|
DROP TYPE user_composite_type CASCADE;
|
||||||
|
|
||||||
|
$f$);
|
||||||
|
|
||||||
|
-- created in multi_behavioral_analytics_create_table
|
||||||
|
DROP FUNCTION run_command_on_master_and_workers(p_sql text);
|
||||||
|
|
||||||
|
SET client_min_messages TO DEFAULT;
|
||||||
|
|
||||||
|
SET citus.subquery_pushdown to OFF;
|
||||||
|
SET citus.enable_router_execution TO 'true';
|
Loading…
Reference in New Issue