-- Regression transcript: behaviour of distributed tables under citus.use_secondary_nodes.
-- While the setting is 'always', create_distributed_table, INSERT and INSERT ... SELECT are
-- expected to fail with "writing to worker nodes is not currently allowed", whereas router
-- SELECTs, real-time SELECTs and subquery/CTE SELECTs (including intermediate results) succeed.
-- pg_dist_node is updated in place to mark every node 'secondary' for the read-only part of the
-- test and is reset to 'primary' before the two tables are dropped.
-- NOTE(review): this looks like psql expected output, where comments are echoed verbatim;
-- presumably any comment change here needs the matching change in the source .sql -- confirm.
SET citus.next_shard_id TO 1600000; \c "dbname=regression options='-c\ citus.use_secondary_nodes=always'" CREATE TABLE dest_table (a int, b int); CREATE TABLE source_table (a int, b int); -- attempts to change metadata should fail while reading from secondaries SELECT create_distributed_table('dest_table', 'a'); ERROR: writing to worker nodes is not currently allowed DETAIL: citus.use_secondary_nodes is set to 'always' \c "dbname=regression options='-c\ citus.use_secondary_nodes=never'" SELECT create_distributed_table('dest_table', 'a'); create_distributed_table --------------------------------------------------------------------- (1 row) SELECT create_distributed_table('source_table', 'a'); create_distributed_table --------------------------------------------------------------------- (1 row) INSERT INTO dest_table (a, b) VALUES (1, 1); INSERT INTO dest_table (a, b) VALUES (2, 1); INSERT INTO source_table (a, b) VALUES (1, 5); INSERT INTO source_table (a, b) VALUES (10, 10); -- simulate actually having secondary nodes SELECT nodeid, groupid, nodename, nodeport, noderack, isactive, noderole, nodecluster FROM pg_dist_node; nodeid | groupid | nodename | nodeport | noderack | isactive | noderole | nodecluster --------------------------------------------------------------------- 1 | 1 | localhost | 57637 | default | t | primary | default 2 | 2 | localhost | 57638 | default | t | primary | default (2 rows) UPDATE pg_dist_node SET noderole = 'secondary'; \c "dbname=regression options='-c\ citus.use_secondary_nodes=always'" -- inserts are disallowed INSERT INTO dest_table (a, b) VALUES (1, 2); ERROR: writing to worker nodes is not currently allowed DETAIL: citus.use_secondary_nodes is set to 'always' -- router selects are allowed SELECT a FROM dest_table WHERE a = 1 ORDER BY 1; a --------------------------------------------------------------------- 1 (1 row) -- real-time selects are also allowed SELECT a FROM dest_table ORDER BY 1; a 
--------------------------------------------------------------------- 1 2 (2 rows) -- subqueries are also allowed SET client_min_messages TO DEBUG1; SELECT foo.a FROM ( WITH cte AS ( SELECT DISTINCT dest_table.a FROM dest_table, source_table WHERE source_table.a = dest_table.a AND dest_table.b IN (1,2,3,4) ) SELECT * FROM cte ORDER BY 1 DESC LIMIT 5 ) as foo ORDER BY 1; DEBUG: CTE cte is going to be inlined via distributed planning DEBUG: push down of limit count: 5 DEBUG: generating subplan XXX_1 for subquery SELECT a FROM (SELECT DISTINCT dest_table.a FROM public.dest_table, public.source_table WHERE ((source_table.a OPERATOR(pg_catalog.=) dest_table.a) AND (dest_table.b OPERATOR(pg_catalog.=) ANY (ARRAY[1, 2, 3, 4])))) cte ORDER BY a DESC LIMIT 5 DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT a FROM (SELECT intermediate_result.a FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer)) foo ORDER BY a a --------------------------------------------------------------------- 1 (1 row) -- intermediate result pruning should still work SET citus.log_intermediate_results TO TRUE; SELECT * FROM ( WITH dest_table_cte AS ( SELECT a FROM dest_table ), joined_source_table_cte_1 AS ( SELECT b, a FROM source_table INNER JOIN dest_table_cte USING (a) ), joined_source_table_cte_2 AS ( SELECT b, a FROM joined_source_table_cte_1 INNER JOIN dest_table_cte USING (a) ) SELECT SUM(b) OVER (PARTITION BY coalesce(a, NULL)) FROM dest_table_cte INNER JOIN joined_source_table_cte_2 USING (a) ) inner_query; DEBUG: CTE joined_source_table_cte_1 is going to be inlined via distributed planning DEBUG: CTE joined_source_table_cte_2 is going to be inlined via distributed planning DEBUG: generating subplan XXX_1 for CTE dest_table_cte: SELECT a FROM public.dest_table DEBUG: generating subplan XXX_2 for subquery SELECT sum(joined_source_table_cte_2.b) OVER (PARTITION BY COALESCE(dest_table_cte.a, NULL::integer)) AS sum FROM ((SELECT 
intermediate_result.a FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer)) dest_table_cte JOIN (SELECT joined_source_table_cte_1.b, joined_source_table_cte_1.a FROM ((SELECT source_table.b, source_table.a FROM (public.source_table JOIN (SELECT intermediate_result.a FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer)) dest_table_cte_2 USING (a))) joined_source_table_cte_1 JOIN (SELECT intermediate_result.a FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer)) dest_table_cte_1 USING (a))) joined_source_table_cte_2 USING (a)) DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT sum FROM (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint)) inner_query DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx DEBUG: Subplan XXX_2 will be written to local file sum --------------------------------------------------------------------- 5 (1 row) -- confirm that the pruning works well when using round-robin as well SET citus.task_assignment_policy to 'round-robin'; SELECT * FROM ( WITH dest_table_cte AS ( SELECT a FROM dest_table ), joined_source_table_cte_1 AS ( SELECT b, a FROM source_table INNER JOIN dest_table_cte USING (a) ), joined_source_table_cte_2 AS ( SELECT b, a FROM joined_source_table_cte_1 INNER JOIN dest_table_cte USING (a) ) SELECT SUM(b) OVER (PARTITION BY coalesce(a, NULL)) FROM dest_table_cte INNER JOIN joined_source_table_cte_2 USING (a) ) inner_query; DEBUG: CTE joined_source_table_cte_1 is going to be inlined via distributed planning DEBUG: CTE joined_source_table_cte_2 is going to be inlined via distributed planning DEBUG: generating subplan XXX_1 for CTE dest_table_cte: SELECT a FROM public.dest_table DEBUG: generating subplan XXX_2 for subquery SELECT 
sum(joined_source_table_cte_2.b) OVER (PARTITION BY COALESCE(dest_table_cte.a, NULL::integer)) AS sum FROM ((SELECT intermediate_result.a FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer)) dest_table_cte JOIN (SELECT joined_source_table_cte_1.b, joined_source_table_cte_1.a FROM ((SELECT source_table.b, source_table.a FROM (public.source_table JOIN (SELECT intermediate_result.a FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer)) dest_table_cte_2 USING (a))) joined_source_table_cte_1 JOIN (SELECT intermediate_result.a FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer)) dest_table_cte_1 USING (a))) joined_source_table_cte_2 USING (a)) DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT sum FROM (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint)) inner_query DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx DEBUG: Subplan XXX_2 will be sent to localhost:xxxxx sum --------------------------------------------------------------------- 5 (1 row) SET citus.task_assignment_policy to DEFAULT; SET client_min_messages TO DEFAULT; SET citus.log_intermediate_results TO DEFAULT; -- insert into is definitely not allowed INSERT INTO dest_table (a, b) SELECT a, b FROM source_table; ERROR: writing to worker nodes is not currently allowed DETAIL: citus.use_secondary_nodes is set to 'always' \c "dbname=regression options='-c\ citus.use_secondary_nodes=never'" UPDATE pg_dist_node SET noderole = 'primary'; DROP TABLE source_table, dest_table;