citus/src/test/regress/output/multi_subquery_0.source

1468 lines
41 KiB
Plaintext

--
-- MULTI_SUBQUERY
--
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 270000;
-- print major version to make version-specific tests clear
SHOW server_version \gset
SELECT substring(:'server_version', '\d+(?:\.\d+)?') AS major_version;
major_version
---------------
9.5
(1 row)
-- Create tables for subquery tests
CREATE TABLE lineitem_subquery (
l_orderkey bigint not null,
l_partkey integer not null,
l_suppkey integer not null,
l_linenumber integer not null,
l_quantity decimal(15, 2) not null,
l_extendedprice decimal(15, 2) not null,
l_discount decimal(15, 2) not null,
l_tax decimal(15, 2) not null,
l_returnflag char(1) not null,
l_linestatus char(1) not null,
l_shipdate date not null,
l_commitdate date not null,
l_receiptdate date not null,
l_shipinstruct char(25) not null,
l_shipmode char(10) not null,
l_comment varchar(44) not null,
PRIMARY KEY(l_orderkey, l_linenumber) );
SELECT master_create_distributed_table('lineitem_subquery', 'l_orderkey', 'range');
master_create_distributed_table
---------------------------------
(1 row)
CREATE TABLE orders_subquery (
o_orderkey bigint not null,
o_custkey integer not null,
o_orderstatus char(1) not null,
o_totalprice decimal(15,2) not null,
o_orderdate date not null,
o_orderpriority char(15) not null,
o_clerk char(15) not null,
o_shippriority integer not null,
o_comment varchar(79) not null,
PRIMARY KEY(o_orderkey) );
SELECT master_create_distributed_table('orders_subquery', 'o_orderkey', 'range');
master_create_distributed_table
---------------------------------
(1 row)
SET citus.enable_router_execution TO 'false';
-- Check that we allow subquery pushdown in default settings.
SELECT
avg(unit_price)
FROM
(SELECT
l_orderkey,
avg(o_totalprice) AS unit_price
FROM
lineitem_subquery,
orders_subquery
WHERE
l_orderkey = o_orderkey
GROUP BY
l_orderkey) AS unit_prices;
avg
-----
(1 row)
-- Check that we don't crash if there are not any shards.
SELECT
avg(unit_price)
FROM
(SELECT
l_orderkey,
avg(o_totalprice) AS unit_price
FROM
lineitem_subquery,
orders_subquery
WHERE
l_orderkey = o_orderkey
GROUP BY
l_orderkey) AS unit_prices;
avg
-----
(1 row)
-- Load data into tables.
SELECT master_create_empty_shard('lineitem_subquery') AS new_shard_id
\gset
UPDATE pg_dist_shard SET shardminvalue = 1, shardmaxvalue = 5986
WHERE shardid = :new_shard_id;
SELECT master_create_empty_shard('lineitem_subquery') AS new_shard_id
\gset
UPDATE pg_dist_shard SET shardminvalue = 8997, shardmaxvalue = 14947
WHERE shardid = :new_shard_id;
SELECT master_create_empty_shard('orders_subquery') AS new_shard_id
\gset
UPDATE pg_dist_shard SET shardminvalue = 1, shardmaxvalue = 5986
WHERE shardid = :new_shard_id;
SELECT master_create_empty_shard('orders_subquery') AS new_shard_id
\gset
UPDATE pg_dist_shard SET shardminvalue = 8997, shardmaxvalue = 14946
WHERE shardid = :new_shard_id;
SET citus.shard_max_size TO "1MB";
\copy lineitem_subquery FROM '@abs_srcdir@/data/lineitem.1.data' with delimiter '|'
\copy lineitem_subquery FROM '@abs_srcdir@/data/lineitem.2.data' with delimiter '|'
\copy orders_subquery FROM '@abs_srcdir@/data/orders.1.data' with delimiter '|'
\copy orders_subquery FROM '@abs_srcdir@/data/orders.2.data' with delimiter '|'
-- Check that we error out if shard min/max values are not exactly same.
SELECT
avg(unit_price)
FROM
(SELECT
l_orderkey,
avg(o_totalprice) AS unit_price
FROM
lineitem_subquery,
orders_subquery
WHERE
l_orderkey = o_orderkey
GROUP BY
l_orderkey) AS unit_prices;
ERROR: cannot push down this subquery
DETAIL: Shards of relations in subquery need to have 1-to-1 shard partitioning
-- Update metadata in order to make all shards equal.
UPDATE pg_dist_shard SET shardmaxvalue = '14947' WHERE shardid = 270003;
-- If group by is not on partition column then we error out from single table
-- repartition code path
SELECT
avg(order_count)
FROM
(SELECT
l_suppkey,
count(*) AS order_count
FROM
lineitem_subquery
GROUP BY
l_suppkey) AS order_counts;
ERROR: cannot use real time executor with repartition jobs
HINT: Set citus.task_executor_type to "task-tracker".
-- Check that we error out if join is not on partition columns.
SELECT
avg(unit_price)
FROM
(SELECT
l_orderkey,
avg(o_totalprice / l_quantity) AS unit_price
FROM
lineitem_subquery,
orders_subquery
GROUP BY
l_orderkey) AS unit_prices;
ERROR: cannot pushdown the subquery since all relations are not joined using distribution keys
DETAIL: Each relation should be joined with at least one another relation using distribution keys and equality operator.
SELECT
avg(unit_price)
FROM
(SELECT
l_orderkey,
avg(o_totalprice / l_quantity) AS unit_price
FROM
lineitem_subquery,
orders_subquery
WHERE
l_orderkey = o_custkey
GROUP BY
l_orderkey) AS unit_prices;
ERROR: cannot pushdown the subquery since all relations are not joined using distribution keys
DETAIL: Each relation should be joined with at least one another relation using distribution keys and equality operator.
-- Check that we error out if there is non relation subqueries
SELECT count(*) FROM
(
(SELECT l_orderkey FROM lineitem_subquery) UNION ALL
(SELECT 1::bigint)
) b;
ERROR: cannot push down this subquery
DETAIL: Subqueries without relations are unsupported
-- Check that we error out if queries in union do not include partition columns.
SELECT count(*) FROM
(
(SELECT l_orderkey FROM lineitem_subquery) UNION
(SELECT l_partkey FROM lineitem_subquery)
) b;
ERROR: cannot pushdown the subquery since all leaves of the UNION does not include partition key at the same position
DETAIL: Each leaf query of the UNION should return partition key at the same position on its target list.
-- Check that we run union queries if partition column is selected.
SELECT count(*) FROM
(
(SELECT l_orderkey FROM lineitem_subquery) UNION
(SELECT l_orderkey FROM lineitem_subquery)
) b;
count
-------
2985
(1 row)
-- Check that we error out if inner query has Limit but subquery_pushdown is not set
SELECT
avg(o_totalprice/l_quantity)
FROM
(SELECT
l_orderkey,
l_quantity
FROM
lineitem_subquery
ORDER BY
l_quantity
LIMIT 10
) lineitem_quantities
JOIN LATERAL
(SELECT
o_totalprice
FROM
orders_subquery
WHERE
lineitem_quantities.l_orderkey = o_orderkey) orders_price ON true;
ERROR: cannot push down this subquery
DETAIL: Limit in subquery is currently unsupported
-- Limit is only supported when subquery_pushdown is set
-- Check that we error out if inner query has limit but outer query has not.
SET citus.subquery_pushdown to ON;
SELECT
avg(o_totalprice/l_quantity)
FROM
(SELECT
l_orderkey,
l_quantity
FROM
lineitem_subquery
ORDER BY
l_quantity
LIMIT 10
) lineitem_quantities
JOIN LATERAL
(SELECT
o_totalprice
FROM
orders_subquery
WHERE
lineitem_quantities.l_orderkey = o_orderkey) orders_price ON true;
ERROR: cannot push down this subquery
DETAIL: Limit in subquery without limit in the outermost query is unsupported
-- reset the flag for next query
SET citus.subquery_pushdown to OFF;
-- Check that we error out if the outermost query is a distinct clause.
SELECT
count(DISTINCT a)
FROM (
SELECT
count(*) a
FROM
lineitem_subquery
GROUP BY
l_orderkey
) z;
ERROR: cannot push down this subquery
DETAIL: distinct in the outermost query is unsupported
-- Check supported subquery types.
SELECT
o_custkey,
sum(order_count) as total_order_count
FROM
(SELECT
o_orderkey,
o_custkey,
count(*) AS order_count
FROM
orders_subquery
WHERE
o_orderkey > 0 AND
o_orderkey < 12000
GROUP BY
o_orderkey, o_custkey) AS order_counts
GROUP BY
o_custkey
ORDER BY
total_order_count DESC,
o_custkey ASC
LIMIT 10;
o_custkey | total_order_count
-----------+-------------------
1462 | 9
619 | 8
643 | 8
1030 | 8
1486 | 8
79 | 7
304 | 7
319 | 7
343 | 7
448 | 7
(10 rows)
SELECT
avg(unit_price)
FROM
(SELECT
l_orderkey,
avg(o_totalprice / l_quantity) AS unit_price
FROM
lineitem_subquery,
orders_subquery
WHERE
l_orderkey = o_orderkey
GROUP BY
l_orderkey) AS unit_prices
WHERE
unit_price > 1000 AND
unit_price < 10000;
avg
-----------------------
4968.2889885208475549
(1 row)
-- Check that if subquery is pulled, we don't error and run query properly.
SELECT count(*) FROM
(
SELECT l_orderkey FROM (
(SELECT l_orderkey FROM lineitem_subquery) UNION
(SELECT l_orderkey FROM lineitem_subquery)
) a
WHERE l_orderkey = 1
) b;
count
-------
1
(1 row)
SELECT count(*) FROM
(
SELECT * FROM (
(SELECT * FROM lineitem_subquery) UNION
(SELECT * FROM lineitem_subquery)
) a
WHERE l_orderkey = 1
) b;
count
-------
6
(1 row)
SELECT max(l_orderkey) FROM
(
SELECT l_orderkey FROM (
SELECT
l_orderkey
FROM
lineitem_subquery
WHERE
l_orderkey < 20000
GROUP BY
l_orderkey
) z
) y;
max
-------
14947
(1 row)
-- Add one more shard to one relation, then test if we error out because of different
-- shard counts for joining relations.
SELECT master_create_empty_shard('orders_subquery') AS new_shard_id
\gset
UPDATE pg_dist_shard SET shardminvalue = 15000, shardmaxvalue = 20000
WHERE shardid = :new_shard_id;
SELECT
avg(unit_price)
FROM
(SELECT
l_orderkey,
avg(o_totalprice / l_quantity) AS unit_price
FROM
lineitem_subquery,
orders_subquery
WHERE
l_orderkey = o_orderkey
GROUP BY
l_orderkey) AS unit_prices;
ERROR: cannot push down this subquery
DETAIL: Shards of relations in subquery need to have 1-to-1 shard partitioning
-- Check that we can prune shards in subqueries with VARCHAR partition columns
CREATE TABLE subquery_pruning_varchar_test_table
(
a varchar,
b int
);
SELECT master_create_distributed_table('subquery_pruning_varchar_test_table', 'a', 'hash');
master_create_distributed_table
---------------------------------
(1 row)
SELECT master_create_worker_shards('subquery_pruning_varchar_test_table', 4, 1);
master_create_worker_shards
-----------------------------
(1 row)
SET client_min_messages TO DEBUG2;
SELECT * FROM
(SELECT count(*) FROM subquery_pruning_varchar_test_table WHERE a = 'onder' GROUP BY a)
AS foo;
DEBUG: Skipping the target shard interval 270005 because SELECT query is pruned away for the interval
DEBUG: Skipping the target shard interval 270006 because SELECT query is pruned away for the interval
DEBUG: Skipping the target shard interval 270008 because SELECT query is pruned away for the interval
count
-------
(0 rows)
SELECT * FROM
(SELECT count(*) FROM subquery_pruning_varchar_test_table WHERE 'eren' = a GROUP BY a)
AS foo;
DEBUG: Skipping the target shard interval 270005 because SELECT query is pruned away for the interval
DEBUG: Skipping the target shard interval 270007 because SELECT query is pruned away for the interval
DEBUG: Skipping the target shard interval 270008 because SELECT query is pruned away for the interval
count
-------
(0 rows)
SET client_min_messages TO NOTICE;
-- test subquery join on VARCHAR partition column
SELECT * FROM
(SELECT
a_inner AS a
FROM
(SELECT
subquery_pruning_varchar_test_table.a AS a_inner
FROM
subquery_pruning_varchar_test_table
GROUP BY
subquery_pruning_varchar_test_table.a
HAVING
count(subquery_pruning_varchar_test_table.a) < 3)
AS f1,
(SELECT
subquery_pruning_varchar_test_table.a
FROM
subquery_pruning_varchar_test_table
GROUP BY
subquery_pruning_varchar_test_table.a
HAVING
sum(coalesce(subquery_pruning_varchar_test_table.b,0)) > 20.0)
AS f2
WHERE
f1.a_inner = f2.a
GROUP BY
a_inner)
AS foo;
a
---
(0 rows)
DROP TABLE subquery_pruning_varchar_test_table;
-- Create composite type to use in subquery pushdown
CREATE TYPE user_composite_type AS
(
tenant_id BIGINT,
user_id BIGINT
);
CREATE FUNCTION user_composite_type_equal(user_composite_type,
user_composite_type)
returns boolean AS 'select $1.tenant_id = $2.tenant_id AND $1.user_id = $2.user_id;' language sql immutable
returns NULL ON NULL input;
CREATE OR REPLACE FUNCTION user_composite_type_lt(v1 user_composite_type, v2 user_composite_type) returns boolean AS
$$
BEGIN
if v1.tenant_id < v2.tenant_id THEN
return true;
end if;
if v1.tenant_id = v2.tenant_id AND v1.user_id < v2.user_id THEN
RETURN true;
end if;
RETURN false;
END;
$$ language 'plpgsql' immutable
returns NULL ON NULL input;
CREATE OR REPLACE FUNCTION user_composite_type_le(v1 user_composite_type, v2 user_composite_type) returns boolean AS
$$
BEGIN
if v1.tenant_id < v2.tenant_id THEN
return true;
end if;
if v1.tenant_id = v2.tenant_id AND v1.user_id <= v2.user_id THEN
RETURN true;
end if;
RETURN false;
END;
$$ language 'plpgsql' immutable
returns NULL ON NULL input;
CREATE OR REPLACE FUNCTION user_composite_type_gt(v1 user_composite_type, v2 user_composite_type) returns boolean AS
$$
BEGIN
if v1.tenant_id > v2.tenant_id THEN
return true;
end if;
if v1.tenant_id = v2.tenant_id AND v1.user_id > v2.user_id THEN
RETURN true;
end if;
RETURN false;
END;
$$ language 'plpgsql' immutable
returns NULL ON NULL input;
CREATE OR REPLACE FUNCTION user_composite_type_ge(v1 user_composite_type, v2 user_composite_type) returns boolean AS
$$
BEGIN
if v1.tenant_id > v2.tenant_id THEN
return true;
end if;
if v1.tenant_id = v2.tenant_id AND v1.user_id >= v2.user_id THEN
RETURN true;
end if;
RETURN false;
END;
$$ language 'plpgsql' immutable
returns NULL ON NULL input;
CREATE OR REPLACE FUNCTION btree_comparison(v1 user_composite_type, v2 user_composite_type) returns integer AS
$$
BEGIN
if v1.tenant_id = v2.tenant_id AND v1.user_id = v2.user_id THEN
return 0;
end if;
if v1.tenant_id > v2.tenant_id THEN
RETURN 1;
end if;
if v1.tenant_id < v2.tenant_id THEN
RETURN -1;
end if;
if v1.user_id > v2.user_id THEN
RETURN 1;
end if;
if v1.user_id < v2.user_id THEN
RETURN -1;
end if;
RETURN 0;
END;
$$ language 'plpgsql' immutable
returns NULL ON NULL input;
CREATE operator =
( leftarg = user_composite_type, rightarg = user_composite_type,
PROCEDURE = user_composite_type_equal,
commutator = =,
RESTRICT = eqsel,
JOIN = eqjoinsel,
merges,
hashes);
CREATE operator <
(
leftarg = user_composite_type, rightarg = user_composite_type,
PROCEDURE = user_composite_type_lt
);
CREATE operator >
(
leftarg = user_composite_type, rightarg = user_composite_type,
PROCEDURE = user_composite_type_gt
);
CREATE operator <=
(
leftarg = user_composite_type, rightarg = user_composite_type,
PROCEDURE = user_composite_type_le
);
CREATE operator >=
(
leftarg = user_composite_type, rightarg = user_composite_type,
PROCEDURE = user_composite_type_ge
);
CREATE FUNCTION user_composite_type_hash(user_composite_type)
returns int AS 'SELECT hashtext( ($1.tenant_id + $1.user_id)::text);' language sql immutable
returns NULL ON NULL input;
CREATE OPERATOR CLASS btree_user_composite_ops
DEFAULT FOR TYPE user_composite_type USING btree
AS
OPERATOR 1 < ,
OPERATOR 2 <= ,
OPERATOR 3 = ,
OPERATOR 4 >= ,
OPERATOR 5 > ,
FUNCTION 1 btree_comparison(user_composite_type, user_composite_type);
create operator class user_composite_type_hash_op_class DEFAULT FOR type user_composite_type using hash AS operator 1 = (user_composite_type, user_composite_type), FUNCTION 1 user_composite_type_hash(user_composite_type);
\c - - - :worker_1_port
CREATE TYPE user_composite_type AS
(
tenant_id BIGINT,
user_id BIGINT
);
CREATE FUNCTION user_composite_type_equal(user_composite_type,
user_composite_type)
returns boolean AS 'select $1.tenant_id = $2.tenant_id AND $1.user_id = $2.user_id;' language sql immutable
returns NULL ON NULL input;
CREATE OR REPLACE FUNCTION user_composite_type_lt(v1 user_composite_type, v2 user_composite_type) returns boolean AS
$$
BEGIN
if v1.tenant_id < v2.tenant_id THEN
return true;
end if;
if v1.tenant_id = v2.tenant_id AND v1.user_id < v2.user_id THEN
RETURN true;
end if;
RETURN false;
END;
$$ language 'plpgsql' immutable
returns NULL ON NULL input;
CREATE OR REPLACE FUNCTION user_composite_type_le(v1 user_composite_type, v2 user_composite_type) returns boolean AS
$$
BEGIN
if v1.tenant_id < v2.tenant_id THEN
return true;
end if;
if v1.tenant_id = v2.tenant_id AND v1.user_id <= v2.user_id THEN
RETURN true;
end if;
RETURN false;
END;
$$ language 'plpgsql' immutable
returns NULL ON NULL input;
CREATE OR REPLACE FUNCTION user_composite_type_gt(v1 user_composite_type, v2 user_composite_type) returns boolean AS
$$
BEGIN
if v1.tenant_id > v2.tenant_id THEN
return true;
end if;
if v1.tenant_id = v2.tenant_id AND v1.user_id > v2.user_id THEN
RETURN true;
end if;
RETURN false;
END;
$$ language 'plpgsql' immutable
returns NULL ON NULL input;
CREATE OR REPLACE FUNCTION user_composite_type_ge(v1 user_composite_type, v2 user_composite_type) returns boolean AS
$$
BEGIN
if v1.tenant_id > v2.tenant_id THEN
return true;
end if;
if v1.tenant_id = v2.tenant_id AND v1.user_id >= v2.user_id THEN
RETURN true;
end if;
RETURN false;
END;
$$ language 'plpgsql' immutable
returns NULL ON NULL input;
CREATE OR REPLACE FUNCTION btree_comparison(v1 user_composite_type, v2 user_composite_type) returns integer AS
$$
BEGIN
if v1.tenant_id = v2.tenant_id AND v1.user_id = v2.user_id THEN
return 0;
end if;
if v1.tenant_id > v2.tenant_id THEN
RETURN 1;
end if;
if v1.tenant_id < v2.tenant_id THEN
RETURN -1;
end if;
if v1.user_id > v2.user_id THEN
RETURN 1;
end if;
if v1.user_id < v2.user_id THEN
RETURN -1;
end if;
RETURN 0;
END;
$$ language 'plpgsql' immutable
returns NULL ON NULL input;
CREATE operator =
( leftarg = user_composite_type, rightarg = user_composite_type,
PROCEDURE = user_composite_type_equal,
commutator = =,
RESTRICT = eqsel,
JOIN = eqjoinsel,
merges,
hashes);
CREATE operator <
(
leftarg = user_composite_type, rightarg = user_composite_type,
PROCEDURE = user_composite_type_lt
);
CREATE operator >
(
leftarg = user_composite_type, rightarg = user_composite_type,
PROCEDURE = user_composite_type_gt
);
CREATE operator <=
(
leftarg = user_composite_type, rightarg = user_composite_type,
PROCEDURE = user_composite_type_le
);
CREATE operator >=
(
leftarg = user_composite_type, rightarg = user_composite_type,
PROCEDURE = user_composite_type_ge
);
CREATE FUNCTION user_composite_type_hash(user_composite_type)
returns int AS 'SELECT hashtext( ($1.tenant_id + $1.user_id)::text);' language sql immutable
returns NULL ON NULL input;
CREATE OPERATOR CLASS btree_user_composite_ops
DEFAULT FOR TYPE user_composite_type USING btree
AS
OPERATOR 1 < ,
OPERATOR 2 <= ,
OPERATOR 3 = ,
OPERATOR 4 >= ,
OPERATOR 5 > ,
FUNCTION 1 btree_comparison(user_composite_type, user_composite_type);
create operator class user_composite_type_hash_op_class DEFAULT FOR type user_composite_type using hash AS operator 1 = (user_composite_type, user_composite_type), FUNCTION 1 user_composite_type_hash(user_composite_type);
\c - - - :worker_2_port
CREATE TYPE user_composite_type AS
(
tenant_id BIGINT,
user_id BIGINT
);
CREATE FUNCTION user_composite_type_equal(user_composite_type,
user_composite_type)
returns boolean AS 'select $1.tenant_id = $2.tenant_id AND $1.user_id = $2.user_id;' language sql immutable
returns NULL ON NULL input;
CREATE OR REPLACE FUNCTION user_composite_type_lt(v1 user_composite_type, v2 user_composite_type) returns boolean AS
$$
BEGIN
if v1.tenant_id < v2.tenant_id THEN
return true;
end if;
if v1.tenant_id = v2.tenant_id AND v1.user_id < v2.user_id THEN
RETURN true;
end if;
RETURN false;
END;
$$ language 'plpgsql' immutable
returns NULL ON NULL input;
CREATE OR REPLACE FUNCTION user_composite_type_le(v1 user_composite_type, v2 user_composite_type) returns boolean AS
$$
BEGIN
if v1.tenant_id < v2.tenant_id THEN
return true;
end if;
if v1.tenant_id = v2.tenant_id AND v1.user_id <= v2.user_id THEN
RETURN true;
end if;
RETURN false;
END;
$$ language 'plpgsql' immutable
returns NULL ON NULL input;
CREATE OR REPLACE FUNCTION user_composite_type_gt(v1 user_composite_type, v2 user_composite_type) returns boolean AS
$$
BEGIN
if v1.tenant_id > v2.tenant_id THEN
return true;
end if;
if v1.tenant_id = v2.tenant_id AND v1.user_id > v2.user_id THEN
RETURN true;
end if;
RETURN false;
END;
$$ language 'plpgsql' immutable
returns NULL ON NULL input;
CREATE OR REPLACE FUNCTION user_composite_type_ge(v1 user_composite_type, v2 user_composite_type) returns boolean AS
$$
BEGIN
if v1.tenant_id > v2.tenant_id THEN
return true;
end if;
if v1.tenant_id = v2.tenant_id AND v1.user_id >= v2.user_id THEN
RETURN true;
end if;
RETURN false;
END;
$$ language 'plpgsql' immutable
returns NULL ON NULL input;
CREATE OR REPLACE FUNCTION btree_comparison(v1 user_composite_type, v2 user_composite_type) returns integer AS
$$
BEGIN
if v1.tenant_id = v2.tenant_id AND v1.user_id = v2.user_id THEN
return 0;
end if;
if v1.tenant_id > v2.tenant_id THEN
RETURN 1;
end if;
if v1.tenant_id < v2.tenant_id THEN
RETURN -1;
end if;
if v1.user_id > v2.user_id THEN
RETURN 1;
end if;
if v1.user_id < v2.user_id THEN
RETURN -1;
end if;
RETURN 0;
END;
$$ language 'plpgsql' immutable
returns NULL ON NULL input;
CREATE operator =
( leftarg = user_composite_type, rightarg = user_composite_type,
PROCEDURE = user_composite_type_equal,
commutator = =,
RESTRICT = eqsel,
JOIN = eqjoinsel,
merges,
hashes);
CREATE operator <
(
leftarg = user_composite_type, rightarg = user_composite_type,
PROCEDURE = user_composite_type_lt
);
CREATE operator >
(
leftarg = user_composite_type, rightarg = user_composite_type,
PROCEDURE = user_composite_type_gt
);
CREATE operator <=
(
leftarg = user_composite_type, rightarg = user_composite_type,
PROCEDURE = user_composite_type_le
);
CREATE operator >=
(
leftarg = user_composite_type, rightarg = user_composite_type,
PROCEDURE = user_composite_type_ge
);
CREATE FUNCTION user_composite_type_hash(user_composite_type)
returns int AS 'SELECT hashtext( ($1.tenant_id + $1.user_id)::text);' language sql immutable
returns NULL ON NULL input;
CREATE OPERATOR CLASS btree_user_composite_ops
DEFAULT FOR TYPE user_composite_type USING btree
AS
OPERATOR 1 < ,
OPERATOR 2 <= ,
OPERATOR 3 = ,
OPERATOR 4 >= ,
OPERATOR 5 > ,
FUNCTION 1 btree_comparison(user_composite_type, user_composite_type);
create operator class user_composite_type_hash_op_class DEFAULT FOR type user_composite_type using hash AS operator 1 = (user_composite_type, user_composite_type), FUNCTION 1 user_composite_type_hash(user_composite_type);
\c - - - :master_port
CREATE TABLE events (
composite_id user_composite_type,
event_id bigint,
event_type character varying(255),
event_time bigint
);
SELECT master_create_distributed_table('events', 'composite_id', 'range');
master_create_distributed_table
---------------------------------
(1 row)
SELECT master_create_empty_shard('events') AS new_shard_id
\gset
UPDATE pg_dist_shard SET shardminvalue = '(1,1)', shardmaxvalue = '(1,2000000000)'
WHERE shardid = :new_shard_id;
SELECT master_create_empty_shard('events') AS new_shard_id
\gset
UPDATE pg_dist_shard SET shardminvalue = '(1,2000000001)', shardmaxvalue = '(1,4300000000)'
WHERE shardid = :new_shard_id;
SELECT master_create_empty_shard('events') AS new_shard_id
\gset
UPDATE pg_dist_shard SET shardminvalue = '(2,1)', shardmaxvalue = '(2,2000000000)'
WHERE shardid = :new_shard_id;
SELECT master_create_empty_shard('events') AS new_shard_id
\gset
UPDATE pg_dist_shard SET shardminvalue = '(2,2000000001)', shardmaxvalue = '(2,4300000000)'
WHERE shardid = :new_shard_id;
\COPY events FROM STDIN WITH CSV
CREATE TABLE users (
composite_id user_composite_type,
lastseen bigint
);
SELECT master_create_distributed_table('users', 'composite_id', 'range');
master_create_distributed_table
---------------------------------
(1 row)
SELECT master_create_empty_shard('users') AS new_shard_id
\gset
UPDATE pg_dist_shard SET shardminvalue = '(1,1)', shardmaxvalue = '(1,2000000000)'
WHERE shardid = :new_shard_id;
SELECT master_create_empty_shard('users') AS new_shard_id
\gset
UPDATE pg_dist_shard SET shardminvalue = '(1,2000000001)', shardmaxvalue = '(1,4300000000)'
WHERE shardid = :new_shard_id;
SELECT master_create_empty_shard('users') AS new_shard_id
\gset
UPDATE pg_dist_shard SET shardminvalue = '(2,1)', shardmaxvalue = '(2,2000000000)'
WHERE shardid = :new_shard_id;
SELECT master_create_empty_shard('users') AS new_shard_id
\gset
UPDATE pg_dist_shard SET shardminvalue = '(2,2000000001)', shardmaxvalue = '(2,4300000000)'
WHERE shardid = :new_shard_id;
\COPY users FROM STDIN WITH CSV
-- Simple join subquery pushdown
SELECT
avg(array_length(events, 1)) AS event_average
FROM
(SELECT
tenant_id,
user_id,
array_agg(event_type ORDER BY event_time) AS events
FROM
(SELECT
(users.composite_id).tenant_id,
(users.composite_id).user_id,
event_type,
events.event_time
FROM
users,
events
WHERE
(users.composite_id) = (events.composite_id) AND
users.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND
users.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND
event_type IN ('click', 'submit', 'pay')) AS subquery
GROUP BY
tenant_id,
user_id) AS subquery;
event_average
--------------------
3.6666666666666667
(1 row)
-- Union and left join subquery pushdown
SELECT
avg(array_length(events, 1)) AS event_average,
hasdone
FROM
(SELECT
subquery_1.tenant_id,
subquery_1.user_id,
array_agg(event ORDER BY event_time) AS events,
COALESCE(hasdone, 'Has not done paying') AS hasdone
FROM
(
(SELECT
(users.composite_id).tenant_id,
(users.composite_id).user_id,
(users.composite_id) as composite_id,
'action=>1'AS event,
events.event_time
FROM
users,
events
WHERE
(users.composite_id) = (events.composite_id) AND
users.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND
users.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND
event_type = 'click')
UNION
(SELECT
(users.composite_id).tenant_id,
(users.composite_id).user_id,
(users.composite_id) as composite_id,
'action=>2'AS event,
events.event_time
FROM
users,
events
WHERE
(users.composite_id) = (events.composite_id) AND
users.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND
users.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND
event_type = 'submit')
) AS subquery_1
LEFT JOIN
(SELECT
DISTINCT ON ((composite_id).tenant_id, (composite_id).user_id) composite_id,
(composite_id).tenant_id,
(composite_id).user_id,
'Has done paying'::TEXT AS hasdone
FROM
events
WHERE
events.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND
events.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND
event_type = 'pay') AS subquery_2
ON
subquery_1.composite_id = subquery_2.composite_id
GROUP BY
subquery_1.tenant_id,
subquery_1.user_id,
hasdone) AS subquery_top
GROUP BY
hasdone
ORDER BY
event_average;
event_average | hasdone
--------------------+---------------------
2.5000000000000000 | Has done paying
4.0000000000000000 | Has not done paying
(2 rows)
-- Union, left join and having subquery pushdown
SELECT
avg(array_length(events, 1)) AS event_average,
count_pay
FROM (
SELECT
subquery_1.tenant_id,
subquery_1.user_id,
array_agg(event ORDER BY event_time) AS events,
COALESCE(count_pay, 0) AS count_pay
FROM
(
(SELECT
(users.composite_id).tenant_id,
(users.composite_id).user_id,
(users.composite_id),
'action=>1'AS event,
events.event_time
FROM
users,
events
WHERE
(users.composite_id) = (events.composite_id) AND
users.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND
users.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND
event_type = 'click')
UNION
(SELECT
(users.composite_id).tenant_id,
(users.composite_id).user_id,
(users.composite_id),
'action=>2'AS event,
events.event_time
FROM
users,
events
WHERE
(users.composite_id) = (events.composite_id) AND
users.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND
users.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND
event_type = 'submit')
) AS subquery_1
LEFT JOIN
(SELECT
(composite_id).tenant_id,
(composite_id).user_id,
composite_id,
COUNT(*) AS count_pay
FROM
events
WHERE
events.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND
events.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND
event_type = 'pay'
GROUP BY
composite_id
HAVING
COUNT(*) > 2) AS subquery_2
ON
subquery_1.composite_id = subquery_2.composite_id
GROUP BY
subquery_1.tenant_id,
subquery_1.user_id,
count_pay) AS subquery_top
WHERE
array_ndims(events) > 0
GROUP BY
count_pay
ORDER BY
count_pay;
event_average | count_pay
--------------------+-----------
3.0000000000000000 | 0
(1 row)
-- Lateral join subquery pushdown
-- set subquery_pushdown since there is limit in the query
SET citus.subquery_pushdown to ON;
SELECT
tenant_id,
user_id,
user_lastseen,
event_array
FROM
(SELECT
tenant_id,
user_id,
max(lastseen) as user_lastseen,
array_agg(event_type ORDER BY event_time) AS event_array
FROM
(SELECT
(composite_id).tenant_id,
(composite_id).user_id,
composite_id,
lastseen
FROM
users
WHERE
composite_id >= '(1, -9223372036854775808)'::user_composite_type AND
composite_id <= '(1, 9223372036854775807)'::user_composite_type
ORDER BY
lastseen DESC
LIMIT
10
) AS subquery_top
LEFT JOIN LATERAL
(SELECT
event_type,
event_time
FROM
events
WHERE
(composite_id) = subquery_top.composite_id
ORDER BY
event_time DESC
LIMIT
99) AS subquery_lateral
ON
true
GROUP BY
tenant_id,
user_id
) AS shard_union
ORDER BY
user_lastseen DESC
LIMIT
10;
tenant_id | user_id | user_lastseen | event_array
-----------+---------+---------------+----------------------------
1 | 1003 | 1472807315 | {click,click,click,submit}
1 | 1002 | 1472807215 | {click,click,submit,pay}
1 | 1001 | 1472807115 | {click,submit,pay}
(3 rows)
-- reset the flag for next query
SET citus.subquery_pushdown to OFF;
-- Same queries above with explain
-- Simple join subquery pushdown
EXPLAIN (COSTS OFF)
SELECT
avg(array_length(events, 1)) AS event_average
FROM
(SELECT
tenant_id,
user_id,
array_agg(event_type ORDER BY event_time) AS events
FROM
(SELECT
(users.composite_id).tenant_id,
(users.composite_id).user_id,
event_type,
events.event_time
FROM
users,
events
WHERE
(users.composite_id) = (events.composite_id) AND
users.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND
users.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND
event_type IN ('click', 'submit', 'pay')) AS subquery
GROUP BY
tenant_id,
user_id) AS subquery;
QUERY PLAN
------------------------------------------------------------------------------------------------------------
Aggregate
-> Custom Scan (Citus Real-Time)
Task Count: 4
Tasks Shown: One of 4
-> Task
Node: host=localhost port=57637 dbname=regression
-> Aggregate
-> GroupAggregate
Group Key: ((users.composite_id).tenant_id), ((users.composite_id).user_id)
-> Sort
Sort Key: ((users.composite_id).tenant_id), ((users.composite_id).user_id)
-> Result
One-Time Filter: false
(13 rows)
-- Union and left join subquery pushdown
EXPLAIN (COSTS OFF)
SELECT
avg(array_length(events, 1)) AS event_average,
hasdone
FROM
(SELECT
subquery_1.tenant_id,
subquery_1.user_id,
array_agg(event ORDER BY event_time) AS events,
COALESCE(hasdone, 'Has not done paying') AS hasdone
FROM
(
(SELECT
(users.composite_id).tenant_id,
(users.composite_id).user_id,
(users.composite_id) as composite_id,
'action=>1'AS event,
events.event_time
FROM
users,
events
WHERE
(users.composite_id) = (events.composite_id) AND
users.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND
users.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND
event_type = 'click')
UNION
(SELECT
(users.composite_id).tenant_id,
(users.composite_id).user_id,
(users.composite_id) as composite_id,
'action=>2'AS event,
events.event_time
FROM
users,
events
WHERE
(users.composite_id) = (events.composite_id) AND
users.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND
users.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND
event_type = 'submit')
) AS subquery_1
LEFT JOIN
(SELECT
DISTINCT ON ((composite_id).tenant_id, (composite_id).user_id) composite_id,
(composite_id).tenant_id,
(composite_id).user_id,
'Has done paying'::TEXT AS hasdone
FROM
events
WHERE
events.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND
events.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND
event_type = 'pay') AS subquery_2
ON
subquery_1.composite_id = subquery_2.composite_id
GROUP BY
subquery_1.tenant_id,
subquery_1.user_id,
hasdone) AS subquery_top
GROUP BY
hasdone;
QUERY PLAN
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
HashAggregate
Group Key: remote_scan.hasdone
-> Custom Scan (Citus Real-Time)
Task Count: 4
Tasks Shown: One of 4
-> Task
Node: host=localhost port=57637 dbname=regression
-> HashAggregate
Group Key: COALESCE(('Has done paying'::text), 'Has not done paying'::text)
-> GroupAggregate
Group Key: subquery_1.tenant_id, subquery_1.user_id, ('Has done paying'::text)
-> Sort
Sort Key: subquery_1.tenant_id, subquery_1.user_id, ('Has done paying'::text)
-> Hash Right Join
Hash Cond: (events.composite_id = subquery_1.composite_id)
-> Unique
-> Sort
Sort Key: ((events.composite_id).tenant_id), ((events.composite_id).user_id)
-> Seq Scan on events_270011 events
Filter: (((event_type)::text = 'pay'::text) AND (composite_id >= '(1,-9223372036854775808)'::user_composite_type) AND (composite_id <= '(1,9223372036854775807)'::user_composite_type))
-> Hash
-> Subquery Scan on subquery_1
-> Unique
-> Sort
Sort Key: ((composite_id).tenant_id), ((composite_id).user_id), composite_id, ('action=>1'::text), event_time
-> Append
-> Result
One-Time Filter: false
-> Result
One-Time Filter: false
(30 rows)
-- Union, left join and having subquery pushdown
EXPLAIN (COSTS OFF)
SELECT
avg(array_length(events, 1)) AS event_average,
count_pay
FROM (
SELECT
subquery_1.tenant_id,
subquery_1.user_id,
array_agg(event ORDER BY event_time) AS events,
COALESCE(count_pay, 0) AS count_pay
FROM
(
(SELECT
(users.composite_id).tenant_id,
(users.composite_id).user_id,
(users.composite_id),
'action=>1'AS event,
events.event_time
FROM
users,
events
WHERE
(users.composite_id) = (events.composite_id) AND
users.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND
users.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND
event_type = 'click')
UNION
(SELECT
(users.composite_id).tenant_id,
(users.composite_id).user_id,
(users.composite_id),
'action=>2'AS event,
events.event_time
FROM
users,
events
WHERE
(users.composite_id) = (events.composite_id) AND
users.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND
users.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND
event_type = 'submit')
) AS subquery_1
LEFT JOIN
(SELECT
(composite_id).tenant_id,
(composite_id).user_id,
composite_id,
COUNT(*) AS count_pay
FROM
events
WHERE
events.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND
events.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND
event_type = 'pay'
GROUP BY
composite_id
HAVING
COUNT(*) > 2) AS subquery_2
ON
subquery_1.composite_id = subquery_2.composite_id
GROUP BY
subquery_1.tenant_id,
subquery_1.user_id,
count_pay) AS subquery_top
WHERE
array_ndims(events) > 0
GROUP BY
count_pay
ORDER BY
count_pay;
ERROR: bogus varattno for OUTER_VAR var: 3
-- Lateral join subquery pushdown
-- set subquery_pushdown due to limit in the query
SET citus.subquery_pushdown to ON;
EXPLAIN (COSTS OFF)
SELECT
tenant_id,
user_id,
user_lastseen,
event_array
FROM
(SELECT
tenant_id,
user_id,
max(lastseen) as user_lastseen,
array_agg(event_type ORDER BY event_time) AS event_array
FROM
(SELECT
(composite_id).tenant_id,
(composite_id).user_id,
composite_id,
lastseen
FROM
users
WHERE
composite_id >= '(1, -9223372036854775808)'::user_composite_type AND
composite_id <= '(1, 9223372036854775807)'::user_composite_type
ORDER BY
lastseen DESC
LIMIT
10
) AS subquery_top
LEFT JOIN LATERAL
(SELECT
event_type,
event_time
FROM
events
WHERE
(composite_id) = subquery_top.composite_id
ORDER BY
event_time DESC
LIMIT
99) AS subquery_lateral
ON
true
GROUP BY
tenant_id,
user_id
) AS shard_union
ORDER BY
user_lastseen DESC
LIMIT
10;
QUERY PLAN
------------------------------------------------------------------------------------------------------------
Limit
-> Sort
Sort Key: remote_scan.user_lastseen DESC
-> Custom Scan (Citus Real-Time)
Task Count: 4
Tasks Shown: One of 4
-> Task
Node: host=localhost port=57637 dbname=regression
-> Limit
-> Sort
Sort Key: (max(lastseen)) DESC
-> GroupAggregate
Group Key: ((composite_id).tenant_id), ((composite_id).user_id)
-> Sort
Sort Key: ((composite_id).tenant_id), ((composite_id).user_id)
-> Nested Loop Left Join
-> Limit
-> Sort
Sort Key: lastseen DESC
-> Result
One-Time Filter: false
-> Limit
-> Sort
Sort Key: events.event_time DESC
-> Seq Scan on events_270011 events
Filter: (composite_id = composite_id)
(26 rows)
SET citus.subquery_pushdown to OFF;
SET citus.enable_router_execution TO 'true';