-- =================================================================== -- test recursive planning functionality with subqueries and CTEs -- =================================================================== SET search_path TO subquery_and_ctes; -- prevent PG 11 - PG 12 outputs to diverge SET citus.enable_cte_inlining TO false; CREATE TABLE users_table_local AS SELECT * FROM users_table; CREATE TABLE dist_table (id int, value int); SELECT create_distributed_table('dist_table', 'id', colocate_with => 'users_table'); create_distributed_table --------------------------------------------------------------------- (1 row) INSERT INTO dist_table (id, value) VALUES(1, 2),(2, 3),(3,4); CREATE FUNCTION func() RETURNS TABLE (id int, value int) AS $$ SELECT 1, 2 $$ LANGUAGE SQL; SET client_min_messages TO DEBUG1; -- CTEs are recursively planned, and subquery foo is also recursively planned -- final plan becomes a router plan WITH cte AS ( WITH local_cte AS ( SELECT * FROM users_table_local ), dist_cte AS ( SELECT user_id FROM events_table ) SELECT dist_cte.user_id FROM local_cte JOIN dist_cte ON dist_cte.user_id=local_cte.user_id ) SELECT count(*) FROM cte, (SELECT DISTINCT users_table.user_id FROM users_table, events_table WHERE users_table.user_id = events_table.user_id AND event_type IN (1,2,3,4) ORDER BY 1 DESC LIMIT 5 ) as foo WHERE foo.user_id = cte.user_id; DEBUG: generating subplan XXX_1 for CTE cte: WITH local_cte AS (SELECT users_table_local.user_id, users_table_local."time", users_table_local.value_1, users_table_local.value_2, users_table_local.value_3, users_table_local.value_4 FROM subquery_and_ctes.users_table_local), dist_cte AS (SELECT events_table.user_id FROM subquery_and_ctes.events_table) SELECT dist_cte.user_id FROM (local_cte JOIN dist_cte ON ((dist_cte.user_id OPERATOR(pg_catalog.=) local_cte.user_id))) DEBUG: generating subplan XXX_1 for CTE local_cte: SELECT user_id, "time", value_1, value_2, value_3, value_4 FROM subquery_and_ctes.users_table_local DEBUG: generating subplan XXX_2 for CTE dist_cte: SELECT user_id FROM subquery_and_ctes.events_table DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT dist_cte.user_id FROM ((SELECT intermediate_result.user_id, intermediate_result."time", intermediate_result.value_1, intermediate_result.value_2, intermediate_result.value_3, intermediate_result.value_4 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer, "time" timestamp without time zone, value_1 integer, value_2 integer, value_3 double precision, value_4 bigint)) local_cte JOIN (SELECT intermediate_result.user_id FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer)) dist_cte ON ((dist_cte.user_id OPERATOR(pg_catalog.=) local_cte.user_id))) DEBUG: push down of limit count: 5 DEBUG: generating subplan XXX_2 for subquery SELECT DISTINCT users_table.user_id FROM subquery_and_ctes.users_table, subquery_and_ctes.events_table WHERE ((users_table.user_id OPERATOR(pg_catalog.=) events_table.user_id) AND (events_table.event_type OPERATOR(pg_catalog.=) ANY (ARRAY[1, 2, 3, 4]))) ORDER BY users_table.user_id DESC LIMIT 5 DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (SELECT intermediate_result.user_id FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer)) cte, (SELECT intermediate_result.user_id FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer)) foo WHERE (foo.user_id OPERATOR(pg_catalog.=) cte.user_id) count --------------------------------------------------------------------- 1644 (1 row) -- CTEs are colocated, route entire query WITH cte1 AS ( SELECT * FROM users_table WHERE user_id = 1 ), cte2 AS ( SELECT * FROM events_table WHERE user_id = 1 ) SELECT cte1.user_id, cte1.value_1, cte2.user_id, cte2.event_type FROM cte1, cte2 ORDER BY cte1.user_id, cte1.value_1, cte2.user_id, cte2.event_type LIMIT 5; user_id | value_1 | user_id | event_type --------------------------------------------------------------------- 1 | 1 | 1 | 0 1 | 1 | 1 | 0 1 | 1 | 1 | 1 1 | 1 | 1 | 1 1 | 1 | 1 | 2 (5 rows) -- CTEs aren't colocated, CTEs become intermediate results WITH cte1 AS ( SELECT * FROM users_table WHERE user_id = 1 ), cte2 AS ( SELECT * FROM events_table WHERE user_id = 6 ) SELECT cte1.user_id, cte1.value_1, cte2.user_id, cte2.user_id FROM cte1, cte2 ORDER BY cte1.user_id, cte1.value_1, cte2.user_id, cte2.event_type LIMIT 5; DEBUG: generating subplan XXX_1 for CTE cte1: SELECT user_id, "time", value_1, value_2, value_3, value_4 FROM subquery_and_ctes.users_table WHERE (user_id OPERATOR(pg_catalog.=) 1) DEBUG: generating subplan XXX_2 for CTE cte2: SELECT user_id, "time", event_type, value_2, value_3, value_4 FROM subquery_and_ctes.events_table WHERE (user_id OPERATOR(pg_catalog.=) 6) DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT cte1.user_id, cte1.value_1, cte2.user_id, cte2.user_id FROM (SELECT intermediate_result.user_id, intermediate_result."time", intermediate_result.value_1, intermediate_result.value_2, intermediate_result.value_3, intermediate_result.value_4 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer, "time" timestamp without time zone, value_1 integer, value_2 integer, value_3 double precision, value_4 bigint)) cte1, (SELECT intermediate_result.user_id, intermediate_result."time", intermediate_result.event_type, intermediate_result.value_2, intermediate_result.value_3, intermediate_result.value_4 FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer, "time" timestamp without time zone, event_type integer, value_2 integer, value_3 double precision, value_4 bigint)) cte2 ORDER BY cte1.user_id, cte1.value_1, cte2.user_id, cte2.event_type LIMIT 5 user_id | value_1 | user_id | user_id --------------------------------------------------------------------- 1 | 1 | 6 | 6 1 | 1 | 6 | 6 1 | 1 | 6 | 6 1 | 1 | 6 | 6 1 | 1 | 6 | 6 (5 rows) -- users_table & dist_table are colocated, route entire query WITH cte1 AS ( SELECT * FROM users_table WHERE user_id = 1 ) UPDATE dist_table dt SET value = cte1.value_1 FROM cte1 WHERE cte1.user_id = dt.id AND dt.id = 1; -- users_table & events_table & dist_table are colocated, route entire query WITH cte1 AS ( SELECT * FROM users_table WHERE user_id = 1 ), cte2 AS ( SELECT * FROM events_table WHERE user_id = 1 ) UPDATE dist_table dt SET value = cte1.value_1 + cte2.event_type FROM cte1, cte2 WHERE cte1.user_id = dt.id AND dt.id = 1; -- all relations are not colocated, CTEs become intermediate results WITH cte1 AS ( SELECT * FROM users_table WHERE user_id = 1 ), cte2 AS ( SELECT * FROM events_table WHERE user_id = 6 ) UPDATE dist_table dt SET value = cte1.value_1 + cte2.event_type FROM cte1, cte2 WHERE cte1.user_id = dt.id AND dt.id = 1; DEBUG: generating subplan XXX_1 for CTE cte1: SELECT user_id, "time", value_1, value_2, value_3, value_4 FROM subquery_and_ctes.users_table WHERE (user_id OPERATOR(pg_catalog.=) 1) DEBUG: generating subplan XXX_2 for CTE cte2: SELECT user_id, "time", event_type, value_2, value_3, value_4 FROM subquery_and_ctes.events_table WHERE (user_id OPERATOR(pg_catalog.=) 6) DEBUG: Plan XXX query after replacing subqueries and CTEs: UPDATE subquery_and_ctes.dist_table dt SET value = (cte1.value_1 OPERATOR(pg_catalog.+) cte2.event_type) FROM (SELECT intermediate_result.user_id, intermediate_result."time", intermediate_result.value_1, intermediate_result.value_2, intermediate_result.value_3, intermediate_result.value_4 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer, "time" timestamp without time zone, value_1 integer, value_2 integer, value_3 double precision, value_4 bigint)) cte1, (SELECT intermediate_result.user_id, intermediate_result."time", intermediate_result.event_type, intermediate_result.value_2, intermediate_result.value_3, intermediate_result.value_4 FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer, "time" timestamp without time zone, event_type integer, value_2 integer, value_3 double precision, value_4 bigint)) cte2 WHERE ((cte1.user_id OPERATOR(pg_catalog.=) dt.id) AND (dt.id OPERATOR(pg_catalog.=) 1)) -- volatile function calls should not be routed WITH cte1 AS (SELECT id, value FROM func()) UPDATE dist_table dt SET value = cte1.value FROM cte1 WHERE dt.id = 1; DEBUG: generating subplan XXX_1 for CTE cte1: SELECT id, value FROM subquery_and_ctes.func() func(id, value) DEBUG: Plan XXX query after replacing subqueries and CTEs: UPDATE subquery_and_ctes.dist_table dt SET value = cte1.value FROM (SELECT intermediate_result.id, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(id integer, value integer)) cte1 WHERE (dt.id OPERATOR(pg_catalog.=) 1) -- CTEs are recursively planned, and subquery foo is also recursively planned -- final plan becomes a real-time plan since we also have events_table in the -- range table entries WITH cte AS ( WITH local_cte AS ( SELECT * FROM users_table_local ), dist_cte AS ( SELECT user_id FROM events_table ) SELECT dist_cte.user_id FROM local_cte JOIN dist_cte ON dist_cte.user_id=local_cte.user_id ) SELECT count(*) FROM cte, (SELECT DISTINCT users_table.user_id FROM users_table, events_table WHERE users_table.user_id = events_table.user_id AND event_type IN (1,2,3,4) ORDER BY 1 DESC LIMIT 5 ) as foo, events_table WHERE foo.user_id = cte.user_id AND events_table.user_id = cte.user_id; DEBUG: generating subplan XXX_1 for CTE cte: WITH local_cte AS (SELECT users_table_local.user_id, users_table_local."time", users_table_local.value_1, users_table_local.value_2, users_table_local.value_3, users_table_local.value_4 FROM subquery_and_ctes.users_table_local), dist_cte AS (SELECT events_table.user_id FROM subquery_and_ctes.events_table) SELECT dist_cte.user_id FROM (local_cte JOIN dist_cte ON ((dist_cte.user_id OPERATOR(pg_catalog.=) local_cte.user_id))) DEBUG: generating subplan XXX_1 for CTE local_cte: SELECT user_id, "time", value_1, value_2, value_3, value_4 FROM subquery_and_ctes.users_table_local DEBUG: generating subplan XXX_2 for CTE dist_cte: SELECT user_id FROM subquery_and_ctes.events_table DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT dist_cte.user_id FROM ((SELECT intermediate_result.user_id, intermediate_result."time", intermediate_result.value_1, intermediate_result.value_2, intermediate_result.value_3, intermediate_result.value_4 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer, "time" timestamp without time zone, value_1 integer, value_2 integer, value_3 double precision, value_4 bigint)) local_cte JOIN (SELECT intermediate_result.user_id FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer)) dist_cte ON ((dist_cte.user_id OPERATOR(pg_catalog.=) local_cte.user_id))) DEBUG: push down of limit count: 5 DEBUG: generating subplan XXX_2 for subquery SELECT DISTINCT users_table.user_id FROM subquery_and_ctes.users_table, subquery_and_ctes.events_table WHERE ((users_table.user_id OPERATOR(pg_catalog.=) events_table.user_id) AND (events_table.event_type OPERATOR(pg_catalog.=) ANY (ARRAY[1, 2, 3, 4]))) ORDER BY users_table.user_id DESC LIMIT 5 DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (SELECT intermediate_result.user_id FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer)) cte, (SELECT intermediate_result.user_id FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer)) foo, subquery_and_ctes.events_table WHERE ((foo.user_id OPERATOR(pg_catalog.=) cte.user_id) AND (events_table.user_id OPERATOR(pg_catalog.=) cte.user_id)) count --------------------------------------------------------------------- 30608 (1 row) -- CTEs are replaced and subquery in WHERE is also replaced -- but the query is still real-time query since users_table is in the -- range table list WITH cte AS ( WITH local_cte AS ( SELECT * FROM users_table_local ), dist_cte AS ( SELECT user_id FROM events_table ) SELECT dist_cte.user_id FROM local_cte JOIN dist_cte ON dist_cte.user_id=local_cte.user_id ) SELECT DISTINCT cte.user_id FROM users_table, cte WHERE users_table.user_id = cte.user_id AND users_table.user_id IN (SELECT DISTINCT value_2 FROM users_table WHERE value_1 >= 1 AND value_1 <= 20 ORDER BY 1 LIMIT 5) ORDER BY 1 DESC; DEBUG: generating subplan XXX_1 for CTE cte: WITH local_cte AS (SELECT users_table_local.user_id, users_table_local."time", users_table_local.value_1, users_table_local.value_2, users_table_local.value_3, users_table_local.value_4 FROM subquery_and_ctes.users_table_local), dist_cte AS (SELECT events_table.user_id FROM subquery_and_ctes.events_table) SELECT dist_cte.user_id FROM (local_cte JOIN dist_cte ON ((dist_cte.user_id OPERATOR(pg_catalog.=) local_cte.user_id))) DEBUG: generating subplan XXX_1 for CTE local_cte: SELECT user_id, "time", value_1, value_2, value_3, value_4 FROM subquery_and_ctes.users_table_local DEBUG: generating subplan XXX_2 for CTE dist_cte: SELECT user_id FROM subquery_and_ctes.events_table DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT dist_cte.user_id FROM ((SELECT intermediate_result.user_id, intermediate_result."time", intermediate_result.value_1, intermediate_result.value_2, intermediate_result.value_3, intermediate_result.value_4 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer, "time" timestamp without time zone, value_1 integer, value_2 integer, value_3 double precision, value_4 bigint)) local_cte JOIN (SELECT intermediate_result.user_id FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer)) dist_cte ON ((dist_cte.user_id OPERATOR(pg_catalog.=) local_cte.user_id))) DEBUG: push down of limit count: 5 DEBUG: generating subplan XXX_2 for subquery SELECT DISTINCT value_2 FROM subquery_and_ctes.users_table WHERE ((value_1 OPERATOR(pg_catalog.>=) 1) AND (value_1 OPERATOR(pg_catalog.<=) 20)) ORDER BY value_2 LIMIT 5 DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT DISTINCT cte.user_id FROM subquery_and_ctes.users_table, (SELECT intermediate_result.user_id FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer)) cte WHERE ((users_table.user_id OPERATOR(pg_catalog.=) cte.user_id) AND (users_table.user_id OPERATOR(pg_catalog.=) ANY (SELECT intermediate_result.value_2 FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(value_2 integer)))) ORDER BY cte.user_id DESC user_id --------------------------------------------------------------------- 4 3 2 1 (4 rows) -- subquery in WHERE clause is planned recursively due to the recurring table -- in FROM clause WITH cte AS ( WITH local_cte AS ( SELECT * FROM users_table_local ), dist_cte AS ( SELECT user_id FROM events_table ) SELECT dist_cte.user_id FROM local_cte JOIN dist_cte ON dist_cte.user_id=local_cte.user_id ) SELECT DISTINCT cte.user_id FROM cte WHERE cte.user_id IN (SELECT DISTINCT user_id FROM users_table WHERE value_1 >= 1 AND value_1 <= 20) ORDER BY 1 DESC; DEBUG: generating subplan XXX_1 for CTE cte: WITH local_cte AS (SELECT users_table_local.user_id, users_table_local."time", users_table_local.value_1, users_table_local.value_2, users_table_local.value_3, users_table_local.value_4 FROM subquery_and_ctes.users_table_local), dist_cte AS (SELECT events_table.user_id FROM subquery_and_ctes.events_table) SELECT dist_cte.user_id FROM (local_cte JOIN dist_cte ON ((dist_cte.user_id OPERATOR(pg_catalog.=) local_cte.user_id))) DEBUG: generating subplan XXX_1 for CTE local_cte: SELECT user_id, "time", value_1, value_2, value_3, value_4 FROM subquery_and_ctes.users_table_local DEBUG: generating subplan XXX_2 for CTE dist_cte: SELECT user_id FROM subquery_and_ctes.events_table DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT dist_cte.user_id FROM ((SELECT intermediate_result.user_id, intermediate_result."time", intermediate_result.value_1, intermediate_result.value_2, intermediate_result.value_3, intermediate_result.value_4 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer, "time" timestamp without time zone, value_1 integer, value_2 integer, value_3 double precision, value_4 bigint)) local_cte JOIN (SELECT intermediate_result.user_id FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer)) dist_cte ON ((dist_cte.user_id OPERATOR(pg_catalog.=) local_cte.user_id))) DEBUG: generating subplan XXX_2 for subquery SELECT DISTINCT user_id FROM subquery_and_ctes.users_table WHERE ((value_1 OPERATOR(pg_catalog.>=) 1) AND (value_1 OPERATOR(pg_catalog.<=) 20)) DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT DISTINCT user_id FROM (SELECT intermediate_result.user_id FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer)) cte WHERE (user_id OPERATOR(pg_catalog.=) ANY (SELECT intermediate_result.user_id FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer))) ORDER BY user_id DESC user_id --------------------------------------------------------------------- 6 5 4 3 2 1 (6 rows) -- CTEs inside a subquery and the final query becomes a router -- query SELECT user_id FROM ( WITH cte AS ( SELECT DISTINCT users_table.user_id FROM users_table, events_table WHERE users_table.user_id = events_table.user_id AND event_type IN (1,2,3,4) ) SELECT * FROM cte ORDER BY 1 DESC ) as foo ORDER BY 1 DESC; DEBUG: generating subplan XXX_1 for CTE cte: SELECT DISTINCT users_table.user_id FROM subquery_and_ctes.users_table, subquery_and_ctes.events_table WHERE ((users_table.user_id OPERATOR(pg_catalog.=) events_table.user_id) AND (events_table.event_type OPERATOR(pg_catalog.=) ANY (ARRAY[1, 2, 3, 4]))) DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT user_id FROM (SELECT cte.user_id FROM (SELECT intermediate_result.user_id FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer)) cte ORDER BY cte.user_id DESC) foo ORDER BY user_id DESC user_id --------------------------------------------------------------------- 6 5 4 3 2 1 (6 rows) -- CTEs inside a subquery and the final query becomes a -- real-time query since the other subquery is safe to pushdown SELECT bar.user_id FROM ( WITH cte AS ( SELECT DISTINCT users_table.user_id FROM users_table, events_table WHERE users_table.user_id = events_table.user_id AND event_type IN (1,2,3,4) ) SELECT * FROM cte ORDER BY 1 DESC ) as foo, ( SELECT DISTINCT users_table.user_id FROM users_table, events_table WHERE users_table.user_id = events_table.user_id AND event_type IN (1,2,3,4) ) as bar WHERE foo.user_id = bar.user_id ORDER BY 1 DESC; DEBUG: generating subplan XXX_1 for CTE cte: SELECT DISTINCT users_table.user_id FROM subquery_and_ctes.users_table, subquery_and_ctes.events_table WHERE ((users_table.user_id OPERATOR(pg_catalog.=) events_table.user_id) AND (events_table.event_type OPERATOR(pg_catalog.=) ANY (ARRAY[1, 2, 3, 4]))) DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT bar.user_id FROM (SELECT cte.user_id FROM (SELECT intermediate_result.user_id FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer)) cte ORDER BY cte.user_id DESC) foo, (SELECT DISTINCT users_table.user_id FROM subquery_and_ctes.users_table, subquery_and_ctes.events_table WHERE ((users_table.user_id OPERATOR(pg_catalog.=) events_table.user_id) AND (events_table.event_type OPERATOR(pg_catalog.=) ANY (ARRAY[1, 2, 3, 4])))) bar WHERE (foo.user_id OPERATOR(pg_catalog.=) bar.user_id) ORDER BY bar.user_id DESC user_id --------------------------------------------------------------------- 6 5 4 3 2 1 (6 rows) -- CTEs inside a deeper subquery -- and also the subquery that contains the CTE is replaced SELECT DISTINCT bar.user_id FROM ( WITH cte AS ( SELECT DISTINCT users_table.user_id FROM users_table, events_table WHERE users_table.user_id = events_table.user_id AND event_type IN (1,2,3,4) ) SELECT * FROM cte ORDER BY 1 DESC ) as foo, ( SELECT users_table.user_id, some_events.event_type FROM users_table, ( WITH cte AS ( SELECT event_type, users_table.user_id FROM users_table, events_table WHERE users_table.user_id = events_table.user_id AND value_1 IN (1,2) ) SELECT * FROM cte ORDER BY 1 DESC ) as some_events WHERE users_table.user_id = some_events.user_id AND event_type IN (1,2,3,4) ORDER BY 2,1 LIMIT 2 ) as bar WHERE foo.user_id = bar.user_id ORDER BY 1 DESC LIMIT 5; DEBUG: generating subplan XXX_1 for CTE cte: SELECT DISTINCT users_table.user_id FROM subquery_and_ctes.users_table, subquery_and_ctes.events_table WHERE ((users_table.user_id OPERATOR(pg_catalog.=) events_table.user_id) AND (events_table.event_type OPERATOR(pg_catalog.=) ANY (ARRAY[1, 2, 3, 4]))) DEBUG: generating subplan XXX_2 for CTE cte: SELECT events_table.event_type, users_table.user_id FROM subquery_and_ctes.users_table, subquery_and_ctes.events_table WHERE ((users_table.user_id OPERATOR(pg_catalog.=) events_table.user_id) AND (users_table.value_1 OPERATOR(pg_catalog.=) ANY (ARRAY[1, 2]))) DEBUG: push down of limit count: 2 DEBUG: generating subplan XXX_3 for subquery SELECT users_table.user_id, some_events.event_type FROM subquery_and_ctes.users_table, (SELECT cte.event_type, cte.user_id FROM (SELECT intermediate_result.event_type, intermediate_result.user_id FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(event_type integer, user_id integer)) cte ORDER BY cte.event_type DESC) some_events WHERE ((users_table.user_id OPERATOR(pg_catalog.=) some_events.user_id) AND (some_events.event_type OPERATOR(pg_catalog.=) ANY (ARRAY[1, 2, 3, 4]))) ORDER BY some_events.event_type, users_table.user_id LIMIT 2 DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT DISTINCT bar.user_id FROM (SELECT cte.user_id FROM (SELECT intermediate_result.user_id FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer)) cte ORDER BY cte.user_id DESC) foo, (SELECT intermediate_result.user_id, intermediate_result.event_type FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer, event_type integer)) bar WHERE (foo.user_id OPERATOR(pg_catalog.=) bar.user_id) ORDER BY bar.user_id DESC LIMIT 5 user_id --------------------------------------------------------------------- 1 (1 row) -- CTEs on the different parts of the query is replaced -- and subquery foo is also replaced since it contains -- DISTINCT on a non-partition key SELECT * FROM ( WITH cte AS ( WITH local_cte AS ( SELECT * FROM users_table_local ), dist_cte AS ( SELECT user_id FROM events_table ) SELECT dist_cte.user_id FROM local_cte JOIN dist_cte ON dist_cte.user_id=local_cte.user_id ) SELECT DISTINCT cte.user_id FROM users_table, cte WHERE users_table.user_id = cte.user_id AND users_table.user_id IN (WITH cte_in_where AS (SELECT DISTINCT value_2 FROM users_table WHERE value_1 >= 1 AND value_1 <= 20 ORDER BY 1 LIMIT 5) SELECT * FROM cte_in_where) ORDER BY 1 DESC ) as foo, events_table WHERE foo.user_id = events_table.value_2 ORDER BY 3 DESC, 2 DESC, 1 DESC LIMIT 5; DEBUG: generating subplan XXX_1 for CTE cte: WITH local_cte AS (SELECT users_table_local.user_id, users_table_local."time", users_table_local.value_1, users_table_local.value_2, users_table_local.value_3, users_table_local.value_4 FROM subquery_and_ctes.users_table_local), dist_cte AS (SELECT events_table.user_id FROM subquery_and_ctes.events_table) SELECT dist_cte.user_id FROM (local_cte JOIN dist_cte ON ((dist_cte.user_id OPERATOR(pg_catalog.=) local_cte.user_id))) DEBUG: generating subplan XXX_1 for CTE local_cte: SELECT user_id, "time", value_1, value_2, value_3, value_4 FROM subquery_and_ctes.users_table_local DEBUG: generating subplan XXX_2 for CTE dist_cte: SELECT user_id FROM subquery_and_ctes.events_table DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT dist_cte.user_id FROM ((SELECT intermediate_result.user_id, intermediate_result."time", intermediate_result.value_1, intermediate_result.value_2, intermediate_result.value_3, intermediate_result.value_4 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer, "time" timestamp without time zone, value_1 integer, value_2 integer, value_3 double precision, value_4 bigint)) local_cte JOIN (SELECT intermediate_result.user_id FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer)) dist_cte ON ((dist_cte.user_id OPERATOR(pg_catalog.=) local_cte.user_id))) DEBUG: generating subplan XXX_2 for CTE cte_in_where: SELECT DISTINCT value_2 FROM subquery_and_ctes.users_table WHERE ((value_1 OPERATOR(pg_catalog.>=) 1) AND (value_1 OPERATOR(pg_catalog.<=) 20)) ORDER BY value_2 LIMIT 5 DEBUG: push down of limit count: 5 DEBUG: generating subplan XXX_3 for subquery SELECT DISTINCT cte.user_id FROM subquery_and_ctes.users_table, (SELECT intermediate_result.user_id FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer)) cte WHERE ((users_table.user_id OPERATOR(pg_catalog.=) cte.user_id) AND (users_table.user_id OPERATOR(pg_catalog.=) ANY (SELECT cte_in_where.value_2 FROM (SELECT intermediate_result.value_2 FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(value_2 integer)) cte_in_where))) ORDER BY cte.user_id DESC DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT foo.user_id, events_table.user_id, events_table."time", events_table.event_type, events_table.value_2, events_table.value_3, events_table.value_4 FROM (SELECT intermediate_result.user_id FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer)) foo, subquery_and_ctes.events_table WHERE (foo.user_id OPERATOR(pg_catalog.=) events_table.value_2) ORDER BY events_table."time" DESC, events_table.user_id DESC, foo.user_id DESC LIMIT 5 DEBUG: push down of limit count: 5 user_id | user_id | time | event_type | value_2 | value_3 | value_4 --------------------------------------------------------------------- 4 | 1 | Thu Nov 23 21:54:46.924477 2017 | 6 | 4 | 5 | 2 | 4 | Thu Nov 23 18:10:21.338399 2017 | 1 | 2 | 4 | 4 | 3 | Thu Nov 23 18:08:26.550729 2017 | 2 | 4 | 3 | 2 | 3 | Thu Nov 23 16:44:41.903713 2017 | 4 | 2 | 2 | 1 | 3 | Thu Nov 23 16:31:56.219594 2017 | 5 | 1 | 2 | (5 rows) -- now recursively plan subqueries inside the CTEs that contains LIMIT and OFFSET WITH cte AS ( WITH local_cte AS ( SELECT * FROM users_table_local ), dist_cte AS ( SELECT user_id FROM events_table, (SELECT DISTINCT value_2 FROM users_table OFFSET 0) as foo WHERE events_table.user_id = foo.value_2 AND events_table.user_id IN (SELECT DISTINCT value_1 FROM users_table ORDER BY 1 LIMIT 3) ) SELECT dist_cte.user_id FROM local_cte JOIN dist_cte ON dist_cte.user_id=local_cte.user_id ) SELECT count(*) FROM cte, (SELECT DISTINCT users_table.user_id FROM users_table, events_table WHERE users_table.user_id = events_table.user_id AND event_type IN (1,2,3,4) ORDER BY 1 DESC LIMIT 5 ) as foo WHERE foo.user_id = cte.user_id; DEBUG: generating subplan XXX_1 for CTE cte: WITH local_cte AS (SELECT users_table_local.user_id, users_table_local."time", users_table_local.value_1, users_table_local.value_2, users_table_local.value_3, users_table_local.value_4 FROM subquery_and_ctes.users_table_local), dist_cte AS (SELECT events_table.user_id FROM subquery_and_ctes.events_table, (SELECT DISTINCT users_table.value_2 FROM subquery_and_ctes.users_table OFFSET 0) foo WHERE ((events_table.user_id OPERATOR(pg_catalog.=) foo.value_2) AND (events_table.user_id OPERATOR(pg_catalog.=) ANY (SELECT DISTINCT users_table.value_1 FROM subquery_and_ctes.users_table ORDER BY users_table.value_1 LIMIT 3)))) SELECT dist_cte.user_id FROM (local_cte JOIN dist_cte ON ((dist_cte.user_id OPERATOR(pg_catalog.=) local_cte.user_id))) DEBUG: generating subplan XXX_1 for CTE local_cte: SELECT user_id, "time", value_1, value_2, value_3, value_4 FROM subquery_and_ctes.users_table_local DEBUG: generating subplan XXX_2 for CTE dist_cte: SELECT events_table.user_id FROM subquery_and_ctes.events_table, (SELECT DISTINCT users_table.value_2 FROM subquery_and_ctes.users_table OFFSET 0) foo WHERE ((events_table.user_id OPERATOR(pg_catalog.=) foo.value_2) AND (events_table.user_id OPERATOR(pg_catalog.=) ANY (SELECT DISTINCT users_table.value_1 FROM subquery_and_ctes.users_table ORDER BY users_table.value_1 LIMIT 3))) DEBUG: push down of limit count: 3 DEBUG: generating subplan XXX_1 for subquery SELECT DISTINCT value_1 FROM subquery_and_ctes.users_table ORDER BY value_1 LIMIT 3 DEBUG: generating subplan XXX_2 for subquery SELECT DISTINCT value_2 FROM subquery_and_ctes.users_table OFFSET 0 DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT events_table.user_id FROM subquery_and_ctes.events_table, (SELECT intermediate_result.value_2 FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(value_2 integer)) foo WHERE ((events_table.user_id OPERATOR(pg_catalog.=) foo.value_2) AND (events_table.user_id OPERATOR(pg_catalog.=) ANY (SELECT intermediate_result.value_1 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(value_1 integer)))) DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT dist_cte.user_id FROM ((SELECT intermediate_result.user_id, intermediate_result."time", intermediate_result.value_1, intermediate_result.value_2, intermediate_result.value_3, intermediate_result.value_4 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer, "time" timestamp without time zone, value_1 integer, value_2 integer, value_3 double precision, value_4 bigint)) local_cte JOIN (SELECT intermediate_result.user_id FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer)) dist_cte ON ((dist_cte.user_id OPERATOR(pg_catalog.=) local_cte.user_id))) DEBUG: push down of limit count: 5 DEBUG: generating subplan XXX_2 for subquery SELECT DISTINCT users_table.user_id FROM subquery_and_ctes.users_table, subquery_and_ctes.events_table WHERE ((users_table.user_id OPERATOR(pg_catalog.=) events_table.user_id) AND (events_table.event_type OPERATOR(pg_catalog.=) ANY (ARRAY[1, 2, 3, 4]))) ORDER BY users_table.user_id DESC LIMIT 5 DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (SELECT intermediate_result.user_id FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer)) cte, (SELECT intermediate_result.user_id FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer)) foo WHERE (foo.user_id OPERATOR(pg_catalog.=) cte.user_id) count --------------------------------------------------------------------- 432 (1 row) -- the same query, but this time the CTEs also live inside a subquery SELECT * FROM ( WITH cte AS ( WITH local_cte AS ( SELECT * FROM users_table_local ), dist_cte AS ( SELECT user_id FROM events_table, (SELECT DISTINCT value_2 FROM users_table OFFSET 0) as foo WHERE events_table.user_id = foo.value_2 AND events_table.user_id IN (SELECT DISTINCT value_1 FROM users_table ORDER BY 1 LIMIT 3) ) SELECT dist_cte.user_id FROM local_cte JOIN dist_cte ON dist_cte.user_id=local_cte.user_id ) SELECT count(*) as cnt FROM cte, (SELECT DISTINCT users_table.user_id FROM users_table, events_table WHERE users_table.user_id = events_table.user_id AND event_type IN (1,2,3,4) ORDER BY 1 DESC LIMIT 5 ) as foo WHERE foo.user_id = cte.user_id ) as foo, users_table WHERE foo.cnt > users_table.value_2 ORDER BY 3 DESC, 1 DESC, 2 DESC, 4 DESC LIMIT 5; DEBUG: generating subplan XXX_1 for CTE cte: WITH local_cte AS (SELECT users_table_local.user_id, users_table_local."time", users_table_local.value_1, users_table_local.value_2, users_table_local.value_3, users_table_local.value_4 FROM subquery_and_ctes.users_table_local), dist_cte AS (SELECT events_table.user_id FROM subquery_and_ctes.events_table, (SELECT DISTINCT users_table.value_2 FROM subquery_and_ctes.users_table OFFSET 0) foo WHERE ((events_table.user_id OPERATOR(pg_catalog.=) foo.value_2) AND (events_table.user_id OPERATOR(pg_catalog.=) ANY (SELECT DISTINCT users_table.value_1 FROM subquery_and_ctes.users_table ORDER BY users_table.value_1 LIMIT 3)))) SELECT dist_cte.user_id FROM (local_cte JOIN dist_cte ON ((dist_cte.user_id OPERATOR(pg_catalog.=) local_cte.user_id))) DEBUG: generating subplan XXX_1 for CTE local_cte: SELECT user_id, "time", value_1, value_2, value_3, value_4 FROM subquery_and_ctes.users_table_local DEBUG: generating subplan XXX_2 for CTE dist_cte: SELECT events_table.user_id FROM subquery_and_ctes.events_table, (SELECT DISTINCT users_table.value_2 FROM subquery_and_ctes.users_table OFFSET 0) foo WHERE ((events_table.user_id OPERATOR(pg_catalog.=) foo.value_2) AND (events_table.user_id OPERATOR(pg_catalog.=) ANY (SELECT DISTINCT users_table.value_1 FROM subquery_and_ctes.users_table ORDER BY users_table.value_1 LIMIT 3))) DEBUG: push down of limit count: 3 DEBUG: generating subplan XXX_1 for subquery SELECT DISTINCT value_1 FROM subquery_and_ctes.users_table ORDER BY value_1 LIMIT 3 DEBUG: generating subplan XXX_2 for subquery SELECT DISTINCT value_2 FROM subquery_and_ctes.users_table OFFSET 0 DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT events_table.user_id FROM subquery_and_ctes.events_table, (SELECT intermediate_result.value_2 FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(value_2 integer)) foo WHERE ((events_table.user_id OPERATOR(pg_catalog.=) foo.value_2) AND (events_table.user_id OPERATOR(pg_catalog.=) ANY (SELECT intermediate_result.value_1 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(value_1 integer)))) DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT dist_cte.user_id FROM ((SELECT intermediate_result.user_id, intermediate_result."time", intermediate_result.value_1, intermediate_result.value_2, intermediate_result.value_3, intermediate_result.value_4 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer, "time" timestamp without time zone, value_1 integer, value_2 integer, value_3 double precision, value_4 bigint)) local_cte JOIN (SELECT intermediate_result.user_id FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer)) dist_cte ON ((dist_cte.user_id OPERATOR(pg_catalog.=) local_cte.user_id))) DEBUG: push down of limit count: 5 DEBUG: generating subplan XXX_2 for subquery SELECT DISTINCT users_table.user_id FROM subquery_and_ctes.users_table, subquery_and_ctes.events_table WHERE ((users_table.user_id OPERATOR(pg_catalog.=) events_table.user_id) AND (events_table.event_type OPERATOR(pg_catalog.=) ANY (ARRAY[1, 2, 3, 4]))) ORDER BY users_table.user_id DESC LIMIT 5 DEBUG: generating subplan XXX_3 for subquery SELECT count(*) AS cnt FROM (SELECT intermediate_result.user_id FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer)) cte, (SELECT intermediate_result.user_id FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer)) foo WHERE (foo.user_id OPERATOR(pg_catalog.=) cte.user_id) DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT foo.cnt, users_table.user_id, users_table."time", users_table.value_1, users_table.value_2, users_table.value_3, users_table.value_4 FROM (SELECT intermediate_result.cnt FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(cnt bigint)) foo, subquery_and_ctes.users_table WHERE (foo.cnt OPERATOR(pg_catalog.>) users_table.value_2) ORDER BY users_table."time" DESC, foo.cnt DESC, users_table.user_id DESC, users_table.value_1 DESC LIMIT 5 DEBUG: push down of limit count: 5 cnt | user_id | time | value_1 | value_2 | value_3 | value_4 --------------------------------------------------------------------- 432 | 1 | Thu Nov 23 17:30:34.635085 2017 | 3 | 4 | 4 | 432 | 1 | Thu Nov 23 17:23:03.441394 2017 | 5 | 4 | 3 | 432 | 3 | Thu Nov 23 17:18:51.048758 2017 | 1 | 5 | 5 | 432 | 3 | Thu Nov 23 17:10:35.959913 2017 | 4 | 3 | 1 | 432 | 5 | Thu Nov 23 16:48:32.08896 2017 | 5 | 2 | 1 | (5 rows) -- recursive CTES are not supported inside subqueries as well SELECT bar.user_id FROM ( WITH RECURSIVE cte AS ( SELECT DISTINCT users_table.user_id FROM users_table, events_table WHERE users_table.user_id = events_table.user_id AND event_type IN (1,2,3,4) ) SELECT * FROM cte ORDER BY 1 DESC ) as foo, ( SELECT DISTINCT users_table.user_id FROM users_table, events_table WHERE users_table.user_id = events_table.user_id AND event_type IN (1,2,3,4) ) as bar WHERE foo.user_id = bar.user_id ORDER BY 1 DESC; ERROR: recursive CTEs are not supported in distributed queries -- We error-out when there's an error in execution of the query. By repeating it -- multiple times, we increase the chance of this test failing before PR #1903. SET client_min_messages TO ERROR; DO $$ DECLARE errors_received INTEGER; BEGIN errors_received := 0; FOR i IN 1..3 LOOP BEGIN WITH cte as ( SELECT user_id, value_2 from events_table ) SELECT * FROM users_table where value_2 < ( SELECT min(cte.value_2) FROM cte WHERE users_table.user_id=cte.user_id GROUP BY user_id, cte.value_2); EXCEPTION WHEN OTHERS THEN IF SQLERRM LIKE 'more than one row returned by a subquery%%' THEN errors_received := errors_received + 1; ELSIF SQLERRM LIKE 'failed to execute task%' THEN errors_received := errors_received + 1; END IF; END; END LOOP; RAISE '(%/3) failed to execute one of the tasks', errors_received; END; $$; ERROR: (3/3) failed to execute one of the tasks CONTEXT: PL/pgSQL function inline_code_block line 31 at RAISE SET client_min_messages TO DEFAULT; DROP SCHEMA subquery_and_ctes CASCADE; NOTICE: drop cascades to 5 other objects DETAIL: drop cascades to table users_table drop cascades to table events_table drop cascades to table users_table_local drop cascades to table dist_table drop cascades to function func() SET search_path TO public;