--------------------------------------------------------------------- -- recursive_relation_planning_restirction_pushdown -- In this test file, we mosly test whether Citus -- can successfully pushdown filters to the subquery -- that is being recursively planned. This is done -- for all types of JOINs --------------------------------------------------------------------- -- all the queries in this file have the -- same tables/subqueries combination as below -- because this test aims to hold the query planning -- steady, but mostly ensure that filters are handled -- properly. Note that local is the relation that is -- recursively planned throughout the file CREATE SCHEMA push_down_filters; SET search_path TO push_down_filters; CREATE TABLE local_table (key int, value int, time timestamptz); CREATE TABLE distributed_table (key int, value int, metadata jsonb); SELECT create_distributed_table('distributed_table', 'key'); create_distributed_table --------------------------------------------------------------------- (1 row) CREATE TYPE new_type AS (n int, m text); CREATE TABLE local_table_type (key int, value new_type, value_2 jsonb); CREATE TABLE distributed_table_type (key int, value new_type, value_2 jsonb); SELECT create_distributed_table('distributed_table_type', 'key'); create_distributed_table --------------------------------------------------------------------- (1 row) -- Setting the debug level so that filters can be observed SET client_min_messages TO DEBUG1; -- for the purposes of these tests, we always want to recursively -- plan local tables. SET citus.local_table_join_policy TO "prefer-local"; -- there are no filters, hence cannot pushdown any filters SELECT count(*) FROM distributed_table u1 JOIN distributed_table u2 USING(key) JOIN local_table USING (key); DEBUG: Wrapping relation "local_table" to a subquery DEBUG: generating subplan XXX_1 for subquery SELECT key FROM push_down_filters.local_table WHERE true DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM ((push_down_filters.distributed_table u1 JOIN push_down_filters.distributed_table u2 USING (key)) JOIN (SELECT local_table_1.key, NULL::integer AS value, NULL::timestamp with time zone AS "time" FROM (SELECT intermediate_result.key FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer)) local_table_1) local_table USING (key)) count --------------------------------------------------------------------- 0 (1 row) -- composite types can be pushed down SELECT count(*) FROM distributed_table d1 JOIN local_table_type d2 using(key) WHERE d2.value = (83, 'citus8.3')::new_type; DEBUG: Wrapping relation "local_table_type" "d2" to a subquery DEBUG: generating subplan XXX_1 for subquery SELECT key, value FROM push_down_filters.local_table_type d2 WHERE (value OPERATOR(pg_catalog.=) '(83,citus8.3)'::push_down_filters.new_type) DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (push_down_filters.distributed_table d1 JOIN (SELECT d2_1.key, d2_1.value, NULL::jsonb AS value_2 FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'text'::citus_copy_format) intermediate_result(key integer, value push_down_filters.new_type)) d2_1) d2 USING (key)) WHERE (d2.value OPERATOR(pg_catalog.=) ROW(83, 'citus8.3'::text)::push_down_filters.new_type) count --------------------------------------------------------------------- 0 (1 row) -- composite types can be pushed down SELECT count(*) FROM distributed_table d1 JOIN local_table_type d2 using(key) WHERE d2.value = (83, 'citus8.3')::new_type AND d2.key = 10; DEBUG: Wrapping relation "local_table_type" "d2" to a subquery DEBUG: generating subplan XXX_1 for subquery SELECT key, value FROM push_down_filters.local_table_type d2 WHERE ((key OPERATOR(pg_catalog.=) 10) AND (value OPERATOR(pg_catalog.=) '(83,citus8.3)'::push_down_filters.new_type)) DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (push_down_filters.distributed_table d1 JOIN (SELECT d2_1.key, d2_1.value, NULL::jsonb AS value_2 FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'text'::citus_copy_format) intermediate_result(key integer, value push_down_filters.new_type)) d2_1) d2 USING (key)) WHERE ((d2.value OPERATOR(pg_catalog.=) ROW(83, 'citus8.3'::text)::push_down_filters.new_type) AND (d2.key OPERATOR(pg_catalog.=) 10)) count --------------------------------------------------------------------- 0 (1 row) -- join on a composite type works SELECT count(*) FROM distributed_table_type d1 JOIN local_table_type d2 USING(value); DEBUG: Wrapping relation "local_table_type" "d2" to a subquery DEBUG: generating subplan XXX_1 for subquery SELECT value FROM push_down_filters.local_table_type d2 WHERE true DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (push_down_filters.distributed_table_type d1 JOIN (SELECT NULL::integer AS key, d2_1.value, NULL::jsonb AS value_2 FROM (SELECT intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'text'::citus_copy_format) intermediate_result(value push_down_filters.new_type)) d2_1) d2 USING (value)) count --------------------------------------------------------------------- 0 (1 row) -- scalar array expressions can be pushed down SELECT count(*) FROM distributed_table u1 JOIN local_table u2 USING (key) WHERE u2.key > ANY(ARRAY[2, 1, 6]); DEBUG: Wrapping relation "local_table" "u2" to a subquery DEBUG: generating subplan XXX_1 for subquery SELECT key FROM push_down_filters.local_table u2 WHERE (key OPERATOR(pg_catalog.>) ANY ('{2,1,6}'::integer[])) DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (push_down_filters.distributed_table u1 JOIN (SELECT u2_1.key, NULL::integer AS value, NULL::timestamp with time zone AS "time" FROM (SELECT intermediate_result.key FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer)) u2_1) u2 USING (key)) WHERE (u2.key OPERATOR(pg_catalog.>) ANY (ARRAY[2, 1, 6])) count --------------------------------------------------------------------- 0 (1 row) -- array operators on the table can be pushed down SELECT count(*) FROM distributed_table u1 JOIN local_table u2 USING(key) WHERE ARRAY[u2.key, u2.value] @> (ARRAY[2, 3]); DEBUG: Wrapping relation "local_table" "u2" to a subquery DEBUG: generating subplan XXX_1 for subquery SELECT key, value FROM push_down_filters.local_table u2 WHERE (ARRAY[key, value] OPERATOR(pg_catalog.@>) '{2,3}'::integer[]) DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (push_down_filters.distributed_table u1 JOIN (SELECT u2_1.key, u2_1.value, NULL::timestamp with time zone AS "time" FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value integer)) u2_1) u2 USING (key)) WHERE (ARRAY[u2.key, u2.value] OPERATOR(pg_catalog.@>) ARRAY[2, 3]) count --------------------------------------------------------------------- 0 (1 row) -- array operators on different tables cannot be pushed down SELECT count(*) FROM distributed_table u1 JOIN local_table u2 USING(value) WHERE ARRAY[u2.value, u1.value] @> (ARRAY[2, 3]); DEBUG: Wrapping relation "local_table" "u2" to a subquery DEBUG: generating subplan XXX_1 for subquery SELECT value FROM push_down_filters.local_table u2 WHERE true DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (push_down_filters.distributed_table u1 JOIN (SELECT NULL::integer AS key, u2_1.value, NULL::timestamp with time zone AS "time" FROM (SELECT intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(value integer)) u2_1) u2 USING (value)) WHERE (ARRAY[u2.value, u1.value] OPERATOR(pg_catalog.@>) ARRAY[2, 3]) count --------------------------------------------------------------------- 0 (1 row) -- coerced expressions can be pushed down SELECT count(*) FROM distributed_table u1 JOIN local_table u2 USING(value) WHERE (u2.value/2.0 > 2)::int::bool::text::bool; DEBUG: Wrapping relation "local_table" "u2" to a subquery DEBUG: generating subplan XXX_1 for subquery SELECT value FROM push_down_filters.local_table u2 WHERE (((((((value)::numeric OPERATOR(pg_catalog./) 2.0) OPERATOR(pg_catalog.>) '2'::numeric))::integer)::boolean)::text)::boolean DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (push_down_filters.distributed_table u1 JOIN (SELECT NULL::integer AS key, u2_1.value, NULL::timestamp with time zone AS "time" FROM (SELECT intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(value integer)) u2_1) u2 USING (value)) WHERE (((((((u2.value)::numeric OPERATOR(pg_catalog./) 2.0) OPERATOR(pg_catalog.>) (2)::numeric))::integer)::boolean)::text)::boolean count --------------------------------------------------------------------- 0 (1 row) -- case expression on a single table can be pushed down SELECT count(*) FROM distributed_table u1 JOIN local_table u2 USING(value) WHERE (CASE WHEN u2.value > 3 THEN u2.value > 2 ELSE false END); DEBUG: Wrapping relation "local_table" "u2" to a subquery DEBUG: generating subplan XXX_1 for subquery SELECT value FROM push_down_filters.local_table u2 WHERE CASE WHEN (value OPERATOR(pg_catalog.>) 3) THEN (value OPERATOR(pg_catalog.>) 2) ELSE false END DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (push_down_filters.distributed_table u1 JOIN (SELECT NULL::integer AS key, u2_1.value, NULL::timestamp with time zone AS "time" FROM (SELECT intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(value integer)) u2_1) u2 USING (value)) WHERE CASE WHEN (u2.value OPERATOR(pg_catalog.>) 3) THEN (u2.value OPERATOR(pg_catalog.>) 2) ELSE false END count --------------------------------------------------------------------- 0 (1 row) -- case expression multiple tables cannot be pushed down SELECT count(*) FROM distributed_table u1 JOIN local_table u2 USING(value) WHERE (CASE WHEN u1.value > 4000 THEN u2.value / 100 > 1 ELSE false END); DEBUG: Wrapping relation "local_table" "u2" to a subquery DEBUG: generating subplan XXX_1 for subquery SELECT value FROM push_down_filters.local_table u2 WHERE true DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (push_down_filters.distributed_table u1 JOIN (SELECT NULL::integer AS key, u2_1.value, NULL::timestamp with time zone AS "time" FROM (SELECT intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(value integer)) u2_1) u2 USING (value)) WHERE CASE WHEN (u1.value OPERATOR(pg_catalog.>) 4000) THEN ((u2.value OPERATOR(pg_catalog./) 100) OPERATOR(pg_catalog.>) 1) ELSE false END count --------------------------------------------------------------------- 0 (1 row) -- coalesce expressions can be pushed down SELECT count(*) FROM distributed_table u1 JOIN local_table u2 USING(value) WHERE COALESCE((u2.key/5.0)::int::bool, false); DEBUG: Wrapping relation "local_table" "u2" to a subquery DEBUG: generating subplan XXX_1 for subquery SELECT key, value FROM push_down_filters.local_table u2 WHERE COALESCE(((((key)::numeric OPERATOR(pg_catalog./) 5.0))::integer)::boolean, false) DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (push_down_filters.distributed_table u1 JOIN (SELECT u2_1.key, u2_1.value, NULL::timestamp with time zone AS "time" FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value integer)) u2_1) u2 USING (value)) WHERE COALESCE(((((u2.key)::numeric OPERATOR(pg_catalog./) 5.0))::integer)::boolean, false) count --------------------------------------------------------------------- 0 (1 row) -- nullif expressions can be pushed down SELECT count(*) FROM distributed_table u1 JOIN local_table u2 USING(value) WHERE NULLIF((u2.value/5.0)::int::bool, false); DEBUG: Wrapping relation "local_table" "u2" to a subquery DEBUG: generating subplan XXX_1 for subquery SELECT value FROM push_down_filters.local_table u2 WHERE NULLIF(((((value)::numeric OPERATOR(pg_catalog./) 5.0))::integer)::boolean, false) DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (push_down_filters.distributed_table u1 JOIN (SELECT NULL::integer AS key, u2_1.value, NULL::timestamp with time zone AS "time" FROM (SELECT intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(value integer)) u2_1) u2 USING (value)) WHERE NULLIF(((((u2.value)::numeric OPERATOR(pg_catalog./) 5.0))::integer)::boolean, false) count --------------------------------------------------------------------- 0 (1 row) -- null test can be pushed down SELECT count(*) FROM distributed_table u1 JOIN local_table u2 USING(value) WHERE u2.value IS NOT NULL; DEBUG: Wrapping relation "local_table" "u2" to a subquery DEBUG: generating subplan XXX_1 for subquery SELECT value FROM push_down_filters.local_table u2 WHERE (value IS NOT NULL) DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (push_down_filters.distributed_table u1 JOIN (SELECT NULL::integer AS key, u2_1.value, NULL::timestamp with time zone AS "time" FROM (SELECT intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(value integer)) u2_1) u2 USING (value)) WHERE (u2.value IS NOT NULL) count --------------------------------------------------------------------- 0 (1 row) -- functions can be pushed down SELECT count(*) FROM distributed_table u1 JOIN local_table u2 USING(value) WHERE isfinite(u2.time); DEBUG: Wrapping relation "local_table" "u2" to a subquery DEBUG: generating subplan XXX_1 for subquery SELECT value, "time" FROM push_down_filters.local_table u2 WHERE isfinite("time") DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (push_down_filters.distributed_table u1 JOIN (SELECT NULL::integer AS key, u2_1.value, u2_1."time" FROM (SELECT intermediate_result.value, intermediate_result."time" FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(value integer, "time" timestamp with time zone)) u2_1) u2 USING (value)) WHERE isfinite(u2."time") count --------------------------------------------------------------------- 0 (1 row) -- functions with multiple tables cannot be pushed down SELECT count(*) FROM distributed_table u1 JOIN local_table u2 USING(value) WHERE int4smaller(u2.value, u1.value) = 55; DEBUG: Wrapping relation "local_table" "u2" to a subquery DEBUG: generating subplan XXX_1 for subquery SELECT value FROM push_down_filters.local_table u2 WHERE true DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (push_down_filters.distributed_table u1 JOIN (SELECT NULL::integer AS key, u2_1.value, NULL::timestamp with time zone AS "time" FROM (SELECT intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(value integer)) u2_1) u2 USING (value)) WHERE (int4smaller(u2.value, u1.value) OPERATOR(pg_catalog.=) 55) count --------------------------------------------------------------------- 0 (1 row) -- functions with multiple columns from the same tables can be pushed down SELECT count(*) FROM distributed_table u1 JOIN local_table u2 USING(value) WHERE int4smaller(u2.key, u2.value) = u2.key; DEBUG: Wrapping relation "local_table" "u2" to a subquery DEBUG: generating subplan XXX_1 for subquery SELECT key, value FROM push_down_filters.local_table u2 WHERE (int4smaller(key, value) OPERATOR(pg_catalog.=) key) DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (push_down_filters.distributed_table u1 JOIN (SELECT u2_1.key, u2_1.value, NULL::timestamp with time zone AS "time" FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value integer)) u2_1) u2 USING (value)) WHERE (int4smaller(u2.key, u2.value) OPERATOR(pg_catalog.=) u2.key) count --------------------------------------------------------------------- 0 (1 row) -- row expressions can be pushdown SELECT count(*) FROM distributed_table u1 JOIN local_table u2 USING(value) WHERE row(u2.value, 2, 3) > row(u2.value, 2, 3); DEBUG: Wrapping relation "local_table" "u2" to a subquery DEBUG: generating subplan XXX_1 for subquery SELECT value FROM push_down_filters.local_table u2 WHERE (ROW(value, 2, 3) OPERATOR(pg_catalog.>) ROW(value, 2, 3)) DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (push_down_filters.distributed_table u1 JOIN (SELECT NULL::integer AS key, u2_1.value, NULL::timestamp with time zone AS "time" FROM (SELECT intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(value integer)) u2_1) u2 USING (value)) WHERE (ROW(u2.value, 2, 3) OPERATOR(pg_catalog.>) ROW(u2.value, 2, 3)) count --------------------------------------------------------------------- 0 (1 row) -- multiple expression from the same table can be pushed down together SELECT count(*) FROM distributed_table u1 JOIN local_table u2 USING(value) WHERE (u2.key/1.0)::int::bool::text::bool AND CASE WHEN u2.key > 4000 THEN u2.value / 100 > 1 ELSE false END AND COALESCE((u2.key/50000)::bool, false) AND NULLIF((u2.value/50000)::int::bool, false) AND isfinite(u2.time) AND u2.value IS DISTINCT FROM 50040 AND row(u2.value, 2, 3) > row(2000, 2, 3); DEBUG: Wrapping relation "local_table" "u2" to a subquery DEBUG: generating subplan XXX_1 for subquery SELECT key, value, "time" FROM push_down_filters.local_table u2 WHERE (((((((key)::numeric OPERATOR(pg_catalog./) 1.0))::integer)::boolean)::text)::boolean AND CASE WHEN (key OPERATOR(pg_catalog.>) 4000) THEN ((value OPERATOR(pg_catalog./) 100) OPERATOR(pg_catalog.>) 1) ELSE false END AND COALESCE(((key OPERATOR(pg_catalog./) 50000))::boolean, false) AND NULLIF(((value OPERATOR(pg_catalog./) 50000))::boolean, false) AND isfinite("time") AND (value IS DISTINCT FROM 50040) AND (ROW(value, 2, 3) OPERATOR(pg_catalog.>) ROW(2000, 2, 3))) DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (push_down_filters.distributed_table u1 JOIN (SELECT u2_1.key, u2_1.value, u2_1."time" FROM (SELECT intermediate_result.key, intermediate_result.value, intermediate_result."time" FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value integer, "time" timestamp with time zone)) u2_1) u2 USING (value)) WHERE (((((((u2.key)::numeric OPERATOR(pg_catalog./) 1.0))::integer)::boolean)::text)::boolean AND CASE WHEN (u2.key OPERATOR(pg_catalog.>) 4000) THEN ((u2.value OPERATOR(pg_catalog./) 100) OPERATOR(pg_catalog.>) 1) ELSE false END AND COALESCE(((u2.key OPERATOR(pg_catalog./) 50000))::boolean, false) AND NULLIF(((u2.value OPERATOR(pg_catalog./) 50000))::boolean, false) AND isfinite(u2."time") AND (u2.value IS DISTINCT FROM 50040) AND (ROW(u2.value, 2, 3) OPERATOR(pg_catalog.>) ROW(2000, 2, 3))) count --------------------------------------------------------------------- 0 (1 row) -- subqueries filters are not pushdown SELECT count(*) FROM distributed_table u1 JOIN local_table u2 USING(value) WHERE u2.value > (SELECT avg(key) FROM distributed_table); DEBUG: generating subplan XXX_1 for subquery SELECT avg(key) AS avg FROM push_down_filters.distributed_table DEBUG: Wrapping relation "local_table" "u2" to a subquery DEBUG: generating subplan XXX_2 for subquery SELECT value FROM push_down_filters.local_table u2 WHERE true DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (push_down_filters.distributed_table u1 JOIN (SELECT NULL::integer AS key, u2_1.value, NULL::timestamp with time zone AS "time" FROM (SELECT intermediate_result.value FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(value integer)) u2_1) u2 USING (value)) WHERE ((u2.value)::numeric OPERATOR(pg_catalog.>) (SELECT intermediate_result.avg FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(avg numeric))) count --------------------------------------------------------------------- 0 (1 row) -- even subqueries with constant values are not pushdowned SELECT count(*) FROM distributed_table u1 JOIN local_table u2 USING(value) WHERE u2.value > (SELECT 5); DEBUG: Wrapping relation "local_table" "u2" to a subquery DEBUG: generating subplan XXX_1 for subquery SELECT value FROM push_down_filters.local_table u2 WHERE true DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (push_down_filters.distributed_table u1 JOIN (SELECT NULL::integer AS key, u2_1.value, NULL::timestamp with time zone AS "time" FROM (SELECT intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(value integer)) u2_1) u2 USING (value)) WHERE (u2.value OPERATOR(pg_catalog.>) (SELECT 5)) count --------------------------------------------------------------------- 0 (1 row) -- filters involving multiple tables aren't pushed down SELECT count(*) FROM distributed_table u1 JOIN local_table u2 USING(value) WHERE u2.value * u1.key > 25; DEBUG: Wrapping relation "local_table" "u2" to a subquery DEBUG: generating subplan XXX_1 for subquery SELECT value FROM push_down_filters.local_table u2 WHERE true DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (push_down_filters.distributed_table u1 JOIN (SELECT NULL::integer AS key, u2_1.value, NULL::timestamp with time zone AS "time" FROM (SELECT intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(value integer)) u2_1) u2 USING (value)) WHERE ((u2.value OPERATOR(pg_catalog.*) u1.key) OPERATOR(pg_catalog.>) 25) count --------------------------------------------------------------------- 0 (1 row) -- filter on other tables can only be pushdown -- as long as they are equality filters on the -- joining column SELECT count(*) FROM distributed_table u1 JOIN local_table u2 USING(value) WHERE u1.value = 3; DEBUG: Wrapping relation "local_table" "u2" to a subquery DEBUG: generating subplan XXX_1 for subquery SELECT value FROM push_down_filters.local_table u2 WHERE (value OPERATOR(pg_catalog.=) 3) DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (push_down_filters.distributed_table u1 JOIN (SELECT NULL::integer AS key, u2_1.value, NULL::timestamp with time zone AS "time" FROM (SELECT intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(value integer)) u2_1) u2 USING (value)) WHERE (u1.value OPERATOR(pg_catalog.=) 3) count --------------------------------------------------------------------- 0 (1 row) -- but not when the filter is gt, lt or any other thing SELECT count(*) FROM distributed_table u1 JOIN local_table u2 USING(value) WHERE u1.value > 3; DEBUG: Wrapping relation "local_table" "u2" to a subquery DEBUG: generating subplan XXX_1 for subquery SELECT value FROM push_down_filters.local_table u2 WHERE true DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (push_down_filters.distributed_table u1 JOIN (SELECT NULL::integer AS key, u2_1.value, NULL::timestamp with time zone AS "time" FROM (SELECT intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(value integer)) u2_1) u2 USING (value)) WHERE (u1.value OPERATOR(pg_catalog.>) 3) count --------------------------------------------------------------------- 0 (1 row) -- when the filter is on another column than the -- join column, that's obviously not pushed down SELECT count(*) FROM distributed_table u1 JOIN local_table u2 USING(value) WHERE u1.key = 3; DEBUG: Wrapping relation "local_table" "u2" to a subquery DEBUG: generating subplan XXX_1 for subquery SELECT value FROM push_down_filters.local_table u2 WHERE true DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (push_down_filters.distributed_table u1 JOIN (SELECT NULL::integer AS key, u2_1.value, NULL::timestamp with time zone AS "time" FROM (SELECT intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(value integer)) u2_1) u2 USING (value)) WHERE (u1.key OPERATOR(pg_catalog.=) 3) count --------------------------------------------------------------------- 0 (1 row) -- or filters on the same table is pushdown SELECT count(*) FROM distributed_table u1 JOIN local_table u2 USING(value) WHERE u2.value > 4 OR u2.value = 4; DEBUG: Wrapping relation "local_table" "u2" to a subquery DEBUG: generating subplan XXX_1 for subquery SELECT value FROM push_down_filters.local_table u2 WHERE ((value OPERATOR(pg_catalog.>) 4) OR (value OPERATOR(pg_catalog.=) 4)) DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (push_down_filters.distributed_table u1 JOIN (SELECT NULL::integer AS key, u2_1.value, NULL::timestamp with time zone AS "time" FROM (SELECT intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(value integer)) u2_1) u2 USING (value)) WHERE ((u2.value OPERATOR(pg_catalog.>) 4) OR (u2.value OPERATOR(pg_catalog.=) 4)) count --------------------------------------------------------------------- 0 (1 row) -- and filters on the same table is pushdown SELECT count(*) FROM distributed_table u1 JOIN local_table u2 USING(value) WHERE u2.value > 2 and u2.time IS NULL; DEBUG: Wrapping relation "local_table" "u2" to a subquery DEBUG: generating subplan XXX_1 for subquery SELECT value, "time" FROM push_down_filters.local_table u2 WHERE ((value OPERATOR(pg_catalog.>) 2) AND ("time" IS NULL)) DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (push_down_filters.distributed_table u1 JOIN (SELECT NULL::integer AS key, u2_1.value, u2_1."time" FROM (SELECT intermediate_result.value, intermediate_result."time" FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(value integer, "time" timestamp with time zone)) u2_1) u2 USING (value)) WHERE ((u2.value OPERATOR(pg_catalog.>) 2) AND (u2."time" IS NULL)) count --------------------------------------------------------------------- 0 (1 row) -- filters on different tables are pushdown -- only the ones that are not ANDed SELECT count(*) FROM distributed_table u1 JOIN local_table u2 USING(value) WHERE (u2.value > 2 OR u2.value IS NULL) AND (u2.key > 4 OR u1.key > 3); DEBUG: Wrapping relation "local_table" "u2" to a subquery DEBUG: generating subplan XXX_1 for subquery SELECT key, value FROM push_down_filters.local_table u2 WHERE ((value OPERATOR(pg_catalog.>) 2) OR (value IS NULL)) DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (push_down_filters.distributed_table u1 JOIN (SELECT u2_1.key, u2_1.value, NULL::timestamp with time zone AS "time" FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value integer)) u2_1) u2 USING (value)) WHERE (((u2.value OPERATOR(pg_catalog.>) 2) OR (u2.value IS NULL)) AND ((u2.key OPERATOR(pg_catalog.>) 4) OR (u1.key OPERATOR(pg_catalog.>) 3))) count --------------------------------------------------------------------- 0 (1 row) -- filters on different tables are pushdown -- only the ones that are not ANDed SELECT count(*) FROM distributed_table u1 JOIN local_table u2 USING(value) WHERE (u2.value > 2 OR u2.value IS NULL) OR (u2.key > 4 OR u1.key > 3); DEBUG: Wrapping relation "local_table" "u2" to a subquery DEBUG: generating subplan XXX_1 for subquery SELECT key, value FROM push_down_filters.local_table u2 WHERE true DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (push_down_filters.distributed_table u1 JOIN (SELECT u2_1.key, u2_1.value, NULL::timestamp with time zone AS "time" FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value integer)) u2_1) u2 USING (value)) WHERE ((u2.value OPERATOR(pg_catalog.>) 2) OR (u2.value IS NULL) OR ((u2.key OPERATOR(pg_catalog.>) 4) OR (u1.key OPERATOR(pg_catalog.>) 3))) count --------------------------------------------------------------------- 0 (1 row) -- filters on different tables are pushdown -- only the ones that are not ANDed SELECT count(*) FROM distributed_table u1 JOIN local_table u2 USING(value) WHERE (u2.value > 2 OR u2.value IS NULL) AND (u2.key > 4 OR u1.key > 3); DEBUG: Wrapping relation "local_table" "u2" to a subquery DEBUG: generating subplan XXX_1 for subquery SELECT key, value FROM push_down_filters.local_table u2 WHERE ((value OPERATOR(pg_catalog.>) 2) OR (value IS NULL)) DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (push_down_filters.distributed_table u1 JOIN (SELECT u2_1.key, u2_1.value, NULL::timestamp with time zone AS "time" FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value integer)) u2_1) u2 USING (value)) WHERE (((u2.value OPERATOR(pg_catalog.>) 2) OR (u2.value IS NULL)) AND ((u2.key OPERATOR(pg_catalog.>) 4) OR (u1.key OPERATOR(pg_catalog.>) 3))) count --------------------------------------------------------------------- 0 (1 row) -- but volatile functions are not pushed down SELECT count(*) FROM distributed_table u1 JOIN local_table u2 USING(value) WHERE (u2.value > 2 OR u1.value IS NULL) AND (u2.key = 10000 * random() OR u1.key > 3); DEBUG: Wrapping relation "local_table" "u2" to a subquery DEBUG: generating subplan XXX_1 for subquery SELECT key, value FROM push_down_filters.local_table u2 WHERE true DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (push_down_filters.distributed_table u1 JOIN (SELECT u2_1.key, u2_1.value, NULL::timestamp with time zone AS "time" FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value integer)) u2_1) u2 USING (value)) WHERE (((u2.value OPERATOR(pg_catalog.>) 2) OR (u1.value IS NULL)) AND (((u2.key)::double precision OPERATOR(pg_catalog.=) ((10000)::double precision OPERATOR(pg_catalog.*) random())) OR (u1.key OPERATOR(pg_catalog.>) 3))) count --------------------------------------------------------------------- 0 (1 row) -- constant results should be pushed down, but not supported yet SELECT count(*) FROM distributed_table u1 JOIN local_table u2 USING(value) WHERE (u2.value > 2 AND false); DEBUG: Wrapping relation "local_table" "u2" to a subquery DEBUG: generating subplan XXX_1 for subquery SELECT value FROM push_down_filters.local_table u2 WHERE false DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (push_down_filters.distributed_table u1 JOIN (SELECT NULL::integer AS key, u2_1.value, NULL::timestamp with time zone AS "time" FROM (SELECT intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(value integer)) u2_1) u2 USING (value)) WHERE ((u2.value OPERATOR(pg_catalog.>) 2) AND false) count --------------------------------------------------------------------- 0 (1 row) -- we can still pushdown WHERE false -- even if it is a LATERAL join SELECT count(*) FROM distributed_table u1 JOIN local_table u2 USING(value) JOIN LATERAL (SELECT value, random() FROM distributed_table WHERE u2.value = 15) AS u3 USING (value) WHERE (u2.value > 2 AND FALSE); DEBUG: Wrapping relation "local_table" "u2" to a subquery DEBUG: generating subplan XXX_1 for subquery SELECT value FROM push_down_filters.local_table u2 WHERE false DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM ((push_down_filters.distributed_table u1 JOIN (SELECT NULL::integer AS key, u2_1.value, NULL::timestamp with time zone AS "time" FROM (SELECT intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(value integer)) u2_1) u2 USING (value)) JOIN LATERAL (SELECT distributed_table.value, random() AS random FROM push_down_filters.distributed_table WHERE (u2.value OPERATOR(pg_catalog.=) 15)) u3 USING (value)) WHERE ((u2.value OPERATOR(pg_catalog.>) 2) AND false) ERROR: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns -- Test Nested Select Query with Union, with Reference Tables CREATE TABLE tbl1(a int); CREATE TABLE tbl2(b int); INSERT INTO tbl1 VALUES (1); INSERT INTO tbl2 VALUES (1); SELECT create_reference_table('tbl1'); NOTICE: Copying data from local table... DEBUG: Copied 1 rows NOTICE: copying the data has completed DETAIL: The local data in the table is no longer visible, but is still on disk. HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$push_down_filters.tbl1$$) create_reference_table --------------------------------------------------------------------- (1 row) SELECT MAX(x) FROM ( SELECT 1 as x FROM (SELECT 1 FROM tbl1, tbl2 WHERE b > 0) AS s1 WHERE true UNION ALL SELECT 1 as x FROM (SELECT 1 FROM tbl1, tbl2 WHERE b > 0) AS s1 WHERE false ) as res; DEBUG: Wrapping relation "tbl2" to a subquery DEBUG: generating subplan XXX_1 for subquery SELECT b FROM push_down_filters.tbl2 WHERE (b OPERATOR(pg_catalog.>) 0) DEBUG: Wrapping relation "tbl2" to a subquery DEBUG: generating subplan XXX_2 for subquery SELECT b FROM push_down_filters.tbl2 WHERE false DEBUG: generating subplan XXX_3 for subquery SELECT 1 AS x FROM (SELECT 1 FROM push_down_filters.tbl1, (SELECT tbl2_1.b FROM (SELECT intermediate_result.b FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(b integer)) tbl2_1) tbl2 WHERE (tbl2.b OPERATOR(pg_catalog.>) 0)) s1("?column?") WHERE true UNION ALL SELECT 1 AS x FROM (SELECT 1 FROM push_down_filters.tbl1, (SELECT tbl2_1.b FROM (SELECT intermediate_result.b FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(b integer)) tbl2_1) tbl2 WHERE (tbl2.b OPERATOR(pg_catalog.>) 0)) s1("?column?") WHERE false DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT max(x) AS max FROM (SELECT intermediate_result.x FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(x integer)) res max --------------------------------------------------------------------- 1 (1 row) DROP TABLE tbl1, tbl2; CREATE table tbl2(a int, b int, d int); CREATE table tbl1(a int, b int, c int); INSERT INTO tbl1 VALUES (1,1,1); INSERT INTO tbl2 VALUES (1,1,1); SELECT create_distributed_table('tbl1', 'a'); NOTICE: Copying data from local table... DEBUG: Copied 1 rows NOTICE: copying the data has completed DETAIL: The local data in the table is no longer visible, but is still on disk. HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$push_down_filters.tbl1$$) create_distributed_table --------------------------------------------------------------------- (1 row) SELECT MAX(x) FROM ( SELECT 1 as x FROM (SELECT 1 FROM tbl1, tbl2 WHERE tbl2.b > 0) AS s1 WHERE true UNION ALL SELECT 1 as x FROM (SELECT 1 FROM tbl1, tbl2 WHERE tbl2.b > 0) AS s1 WHERE false ) as res; DEBUG: Wrapping relation "tbl2" to a subquery DEBUG: generating subplan XXX_1 for subquery SELECT b FROM push_down_filters.tbl2 WHERE (b OPERATOR(pg_catalog.>) 0) DEBUG: Wrapping relation "tbl2" to a subquery DEBUG: generating subplan XXX_2 for subquery SELECT b FROM push_down_filters.tbl2 WHERE false DEBUG: generating subplan XXX_3 for subquery SELECT 1 AS x FROM (SELECT 1 FROM push_down_filters.tbl1, (SELECT NULL::integer AS a, tbl2_1.b, NULL::integer AS d FROM (SELECT intermediate_result.b FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(b integer)) tbl2_1) tbl2 WHERE (tbl2.b OPERATOR(pg_catalog.>) 0)) s1("?column?") WHERE true DEBUG: generating subplan XXX_4 for subquery SELECT 1 AS x FROM (SELECT 1 FROM push_down_filters.tbl1, (SELECT NULL::integer AS a, tbl2_1.b, NULL::integer AS d FROM (SELECT intermediate_result.b FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(b integer)) tbl2_1) tbl2 WHERE (tbl2.b OPERATOR(pg_catalog.>) 0)) s1("?column?") WHERE false DEBUG: generating subplan XXX_5 for subquery SELECT intermediate_result.x FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(x integer) UNION ALL SELECT intermediate_result.x FROM read_intermediate_result('XXX_4'::text, 'binary'::citus_copy_format) intermediate_result(x integer) DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT max(x) AS max FROM (SELECT intermediate_result.x FROM read_intermediate_result('XXX_5'::text, 'binary'::citus_copy_format) intermediate_result(x integer)) res max --------------------------------------------------------------------- 1 (1 row) SELECT undistribute_table('tbl1'); NOTICE: creating a new table for push_down_filters.tbl1 NOTICE: moving the data of push_down_filters.tbl1 NOTICE: dropping the old push_down_filters.tbl1 CONTEXT: SQL statement "DROP TABLE push_down_filters.tbl1 CASCADE" NOTICE: renaming the new table to push_down_filters.tbl1 undistribute_table --------------------------------------------------------------------- (1 row) SELECT create_reference_table('tbl1'); NOTICE: Copying data from local table... DEBUG: Copied 1 rows NOTICE: copying the data has completed DETAIL: The local data in the table is no longer visible, but is still on disk. HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$push_down_filters.tbl1$$) create_reference_table --------------------------------------------------------------------- (1 row) SELECT MAX(x) FROM ( SELECT 1 as x FROM (SELECT 1 FROM tbl1, tbl2 WHERE tbl2.b > 0) AS s1 WHERE true UNION ALL SELECT 1 as x FROM (SELECT 1 FROM tbl1, tbl2 WHERE tbl2.b > 0) AS s1 WHERE false ) as res; DEBUG: Wrapping relation "tbl2" to a subquery DEBUG: generating subplan XXX_1 for subquery SELECT b FROM push_down_filters.tbl2 WHERE (b OPERATOR(pg_catalog.>) 0) DEBUG: Wrapping relation "tbl2" to a subquery DEBUG: generating subplan XXX_2 for subquery SELECT b FROM push_down_filters.tbl2 WHERE false DEBUG: generating subplan XXX_3 for subquery SELECT 1 AS x FROM (SELECT 1 FROM push_down_filters.tbl1, (SELECT NULL::integer AS a, tbl2_1.b, NULL::integer AS d FROM (SELECT intermediate_result.b FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(b integer)) tbl2_1) tbl2 WHERE (tbl2.b OPERATOR(pg_catalog.>) 0)) s1("?column?") WHERE true UNION ALL SELECT 1 AS x FROM (SELECT 1 FROM push_down_filters.tbl1, (SELECT NULL::integer AS a, tbl2_1.b, NULL::integer AS d FROM (SELECT intermediate_result.b FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(b integer)) tbl2_1) tbl2 WHERE (tbl2.b OPERATOR(pg_catalog.>) 0)) s1("?column?") WHERE false DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT max(x) AS max FROM (SELECT intermediate_result.x FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(x integer)) res max --------------------------------------------------------------------- 1 (1 row) \set VERBOSITY terse RESET client_min_messages; DROP SCHEMA push_down_filters CASCADE; NOTICE: drop cascades to 7 other objects