--
-- MULTI_SUBQUERY
--

-- Restart the shard id and job id sequences at a fixed value
-- (presumably so that ids generated later in this file are predictable).
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 270000;
ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 270000;

-- Create the tables used by the subquery tests. Both are range-distributed
-- on their order key column.
CREATE TABLE lineitem_subquery (
    l_orderkey      bigint         NOT NULL,
    l_partkey       integer        NOT NULL,
    l_suppkey       integer        NOT NULL,
    l_linenumber    integer        NOT NULL,
    l_quantity      decimal(15, 2) NOT NULL,
    l_extendedprice decimal(15, 2) NOT NULL,
    l_discount      decimal(15, 2) NOT NULL,
    l_tax           decimal(15, 2) NOT NULL,
    l_returnflag    char(1)        NOT NULL,
    l_linestatus    char(1)        NOT NULL,
    l_shipdate      date           NOT NULL,
    l_commitdate    date           NOT NULL,
    l_receiptdate   date           NOT NULL,
    l_shipinstruct  char(25)       NOT NULL,
    l_shipmode      char(10)       NOT NULL,
    l_comment       varchar(44)    NOT NULL,
    PRIMARY KEY(l_orderkey, l_linenumber)
);

SELECT master_create_distributed_table('lineitem_subquery', 'l_orderkey', 'range');

CREATE TABLE orders_subquery (
    o_orderkey      bigint         NOT NULL,
    o_custkey       integer        NOT NULL,
    o_orderstatus   char(1)        NOT NULL,
    o_totalprice    decimal(15,2)  NOT NULL,
    o_orderdate     date           NOT NULL,
    o_orderpriority char(15)       NOT NULL,
    o_clerk         char(15)       NOT NULL,
    o_shippriority  integer        NOT NULL,
    o_comment       varchar(79)    NOT NULL,
    PRIMARY KEY(o_orderkey)
);

SELECT master_create_distributed_table('orders_subquery', 'o_orderkey', 'range');

SET citus.task_executor_type TO 'task-tracker';

-- Check that we don't allow subquery pushdown in default settings.
SELECT
    avg(unit_price)
FROM
    (SELECT
        l_orderkey,
        avg(o_totalprice) AS unit_price
    FROM
        lineitem_subquery,
        orders_subquery
    WHERE
        l_orderkey = o_orderkey
    GROUP BY
        l_orderkey) AS unit_prices;

SET citus.subquery_pushdown TO TRUE;

-- Check that we don't crash if there are not any shards.
-- (Same query as above; no shards have been created for either table yet.)
SELECT
    avg(unit_price)
FROM
    (SELECT
        l_orderkey,
        avg(o_totalprice) AS unit_price
    FROM
        lineitem_subquery,
        orders_subquery
    WHERE
        l_orderkey = o_orderkey
    GROUP BY
        l_orderkey) AS unit_prices;

-- Stage data to tables.
-- Create two empty shards per table and set their min/max values by hand.
-- Each \gset stores the returned shard id in the psql variable new_shard_id,
-- which the UPDATE that follows uses to pick the shard's metadata row.
SELECT master_create_empty_shard('lineitem_subquery') AS new_shard_id
\gset
UPDATE pg_dist_shard
SET shardminvalue = 1,
    shardmaxvalue = 5986
WHERE shardid = :new_shard_id;

SELECT master_create_empty_shard('lineitem_subquery') AS new_shard_id
\gset
UPDATE pg_dist_shard
SET shardminvalue = 8997,
    shardmaxvalue = 14947
WHERE shardid = :new_shard_id;

SELECT master_create_empty_shard('orders_subquery') AS new_shard_id
\gset
UPDATE pg_dist_shard
SET shardminvalue = 1,
    shardmaxvalue = 5986
WHERE shardid = :new_shard_id;

-- Note the max value here (14946) differs from the matching lineitem_subquery
-- shard (14947); the mismatch is exercised by the first query below.
SELECT master_create_empty_shard('orders_subquery') AS new_shard_id
\gset
UPDATE pg_dist_shard
SET shardminvalue = 8997,
    shardmaxvalue = 14946
WHERE shardid = :new_shard_id;

SET citus.shard_max_size TO "1MB";

-- Load the data files; each \copy must stay on a single line.
\copy lineitem_subquery FROM '@abs_srcdir@/data/lineitem.1.data' with delimiter '|'
\copy lineitem_subquery FROM '@abs_srcdir@/data/lineitem.2.data' with delimiter '|'

\copy orders_subquery FROM '@abs_srcdir@/data/orders.1.data' with delimiter '|'
\copy orders_subquery FROM '@abs_srcdir@/data/orders.2.data' with delimiter '|'

-- Check that we error out if shard min/max values are not exactly same.
SELECT
    avg(unit_price)
FROM
    (SELECT
        l_orderkey,
        avg(o_totalprice) AS unit_price
    FROM
        lineitem_subquery,
        orders_subquery
    WHERE
        l_orderkey = o_orderkey
    GROUP BY
        l_orderkey) AS unit_prices;

-- Update metadata in order to make all shards equal.
-- NOTE(review): 270003 is presumably the second orders_subquery shard, given
-- the shard id sequence restart at 270000 and the four shards created above
-- -- confirm if the shard creation order ever changes.
UPDATE pg_dist_shard
SET shardmaxvalue = '14947'
WHERE shardid = 270003;

-- If group by is not on partition column then we error out.
SELECT
    avg(order_count)
FROM
    (SELECT
        l_suppkey,
        count(*) AS order_count
    FROM
        lineitem_subquery
    GROUP BY
        l_suppkey) AS order_counts;

-- Check that we error out if join is not on partition columns.
-- First variant: no join condition at all between the two tables.
SELECT
    avg(unit_price)
FROM
    (SELECT
        l_orderkey,
        avg(o_totalprice / l_quantity) AS unit_price
    FROM
        lineitem_subquery,
        orders_subquery
    GROUP BY
        l_orderkey) AS unit_prices;

-- Second variant: the join uses o_custkey, which is not the distribution
-- column of orders_subquery.
SELECT
    avg(unit_price)
FROM
    (SELECT
        l_orderkey,
        avg(o_totalprice / l_quantity) AS unit_price
    FROM
        lineitem_subquery,
        orders_subquery
    WHERE
        l_orderkey = o_custkey
    GROUP BY
        l_orderkey) AS unit_prices;

-- Check that we error out if there is union all.
SELECT count(*)
FROM (
    (SELECT l_orderkey FROM lineitem_subquery)
    UNION ALL
    (SELECT 1::bigint)
) b;

-- Check that we error out if queries in union do not include partition columns.
SELECT count(*)
FROM (
    (SELECT l_orderkey FROM lineitem_subquery)
    UNION
    (SELECT l_partkey FROM lineitem_subquery)
) b;

-- Check that we run union queries if partition column is selected.
SELECT count(*)
FROM (
    (SELECT l_orderkey FROM lineitem_subquery)
    UNION
    (SELECT l_orderkey FROM lineitem_subquery)
) b;

-- Check that we error out if the outermost query has subquery join.
SELECT
    avg(o_totalprice/l_quantity)
FROM
    (SELECT
        l_orderkey,
        l_quantity
    FROM
        lineitem_subquery
    ORDER BY
        l_quantity
    LIMIT 10
    ) lineitem_quantities
    JOIN LATERAL
    (SELECT
        o_totalprice
    FROM
        orders_subquery
    WHERE
        lineitem_quantities.l_orderkey = o_orderkey
    ) orders_price ON true;

-- Check that we error out if the outermost query has a distinct clause.
SELECT count(DISTINCT a)
FROM (
    SELECT count(*) a
    FROM lineitem_subquery
) z;

-- Check supported subquery types.
-- Subquery grouped on the partition column (o_orderkey) plus o_custkey,
-- re-aggregated per customer by the outer query.
SELECT
    o_custkey,
    sum(order_count) AS total_order_count
FROM
    (SELECT
        o_orderkey,
        o_custkey,
        count(*) AS order_count
    FROM
        orders_subquery
    WHERE
        o_orderkey > 0 AND
        o_orderkey < 12000
    GROUP BY
        o_orderkey,
        o_custkey) AS order_counts
GROUP BY
    o_custkey
ORDER BY
    total_order_count DESC,
    o_custkey ASC
LIMIT 10;

-- Join on the partition columns inside the subquery, with a filter on the
-- subquery's aggregate applied by the outer query.
SELECT
    avg(unit_price)
FROM
    (SELECT
        l_orderkey,
        avg(o_totalprice / l_quantity) AS unit_price
    FROM
        lineitem_subquery,
        orders_subquery
    WHERE
        l_orderkey = o_orderkey
    GROUP BY
        l_orderkey) AS unit_prices
WHERE
    unit_price > 1000 AND
    unit_price < 10000;

-- Check that if subquery is pulled, we don't error and run query properly.
SELECT count(*)
FROM (
    SELECT l_orderkey
    FROM (
        (SELECT l_orderkey FROM lineitem_subquery)
        UNION
        (SELECT l_orderkey FROM lineitem_subquery)
    ) a
    WHERE l_orderkey = 1
) b;

SELECT count(*)
FROM (
    SELECT *
    FROM (
        (SELECT * FROM lineitem_subquery)
        UNION
        (SELECT * FROM lineitem_subquery)
    ) a
    WHERE l_orderkey = 1
) b;

SELECT max(l_orderkey)
FROM (
    SELECT l_orderkey
    FROM (
        SELECT l_orderkey
        FROM lineitem_subquery
        WHERE l_orderkey < 20000
        GROUP BY l_orderkey
    ) z
) y;

-- Add one more shard to one relation, then test if we error out because of different
-- shard counts for joining relations.
-- The extra shard goes to orders_subquery only; \gset captures its id in
-- new_shard_id for the metadata UPDATE.
SELECT master_create_empty_shard('orders_subquery') AS new_shard_id
\gset
UPDATE pg_dist_shard
SET shardminvalue = 15000,
    shardmaxvalue = 20000
WHERE shardid = :new_shard_id;

SELECT
    avg(unit_price)
FROM
    (SELECT
        l_orderkey,
        avg(o_totalprice / l_quantity) AS unit_price
    FROM
        lineitem_subquery,
        orders_subquery
    WHERE
        l_orderkey = o_orderkey
    GROUP BY
        l_orderkey) AS unit_prices;

-- Check that we can prune shards in subqueries with VARCHAR partition columns
CREATE TABLE subquery_pruning_varchar_test_table
(
    a varchar,
    b int
);

-- Hash-distribute on the varchar column: 4 shards, replication factor 1.
SELECT master_create_distributed_table('subquery_pruning_varchar_test_table', 'a', 'hash');
SELECT master_create_worker_shards('subquery_pruning_varchar_test_table', 4, 1);

SET citus.subquery_pushdown TO TRUE;

-- Raise client verbosity for the next two queries
-- (presumably so that shard pruning debug messages appear in the output).
SET client_min_messages TO DEBUG2;

-- Partition column on the left-hand side of the equality.
SELECT *
FROM (
    SELECT count(*)
    FROM subquery_pruning_varchar_test_table
    WHERE a = 'onder'
    GROUP BY a
) AS foo;

-- Partition column on the right-hand side of the equality.
SELECT *
FROM (
    SELECT count(*)
    FROM subquery_pruning_varchar_test_table
    WHERE 'eren' = a
    GROUP BY a
) AS foo;

SET client_min_messages TO NOTICE;

-- test subquery join on VARCHAR partition column
SELECT *
FROM (
    SELECT
        a_inner AS a
    FROM
        (SELECT
            subquery_pruning_varchar_test_table.a AS a_inner
        FROM
            subquery_pruning_varchar_test_table
        GROUP BY
            subquery_pruning_varchar_test_table.a
        HAVING
            count(subquery_pruning_varchar_test_table.a) < 3) AS f1,
        (SELECT
            subquery_pruning_varchar_test_table.a
        FROM
            subquery_pruning_varchar_test_table
        GROUP BY
            subquery_pruning_varchar_test_table.a
        HAVING
            sum(coalesce(subquery_pruning_varchar_test_table.b,0)) > 20.0) AS f2
    WHERE
        f1.a_inner = f2.a
    GROUP BY
        a_inner
) AS foo;

DROP TABLE subquery_pruning_varchar_test_table;