CREATE SCHEMA insert_select_single_shard_table; SET search_path TO insert_select_single_shard_table; SET citus.next_shard_id TO 1820000; SET citus.shard_count TO 32; SET citus.shard_replication_factor TO 1; SET client_min_messages TO NOTICE; CREATE TABLE nullkey_c1_t1(a int, b int); CREATE TABLE nullkey_c1_t2(a int, b int); SELECT create_distributed_table('nullkey_c1_t1', null, colocate_with=>'none'); create_distributed_table --------------------------------------------------------------------- (1 row) SELECT create_distributed_table('nullkey_c1_t2', null, colocate_with=>'nullkey_c1_t1'); create_distributed_table --------------------------------------------------------------------- (1 row) CREATE TABLE nullkey_c2_t1(a int, b int); SELECT create_distributed_table('nullkey_c2_t1', null, colocate_with=>'none'); create_distributed_table --------------------------------------------------------------------- (1 row) CREATE TABLE reference_table(a int, b int); SELECT create_reference_table('reference_table'); create_reference_table --------------------------------------------------------------------- (1 row) CREATE TABLE distributed_table_c1_t1(a int, b int); SELECT create_distributed_table('distributed_table_c1_t1', 'a'); create_distributed_table --------------------------------------------------------------------- (1 row) CREATE TABLE distributed_table_c1_t2(a int, b int); SELECT create_distributed_table('distributed_table_c1_t2', 'a'); create_distributed_table --------------------------------------------------------------------- (1 row) CREATE TABLE distributed_table_c2_t1(a int, b int); SELECT create_distributed_table('distributed_table_c2_t1', 'a', colocate_with=>'none'); create_distributed_table --------------------------------------------------------------------- (1 row) CREATE TABLE citus_local_table(a int, b int); SELECT citus_add_local_table_to_metadata('citus_local_table'); citus_add_local_table_to_metadata --------------------------------------------------------------------- (1 row) CREATE TABLE postgres_local_table(a int, b int); CREATE FUNCTION reload_tables() RETURNS void AS $$ BEGIN SET LOCAL client_min_messages TO WARNING; TRUNCATE nullkey_c1_t1, nullkey_c1_t2, nullkey_c2_t1, reference_table, distributed_table_c1_t1, distributed_table_c1_t2, distributed_table_c2_t1, citus_local_table, postgres_local_table; INSERT INTO nullkey_c1_t1 SELECT i, i FROM generate_series(1, 8) i; INSERT INTO nullkey_c1_t2 SELECT i, i FROM generate_series(2, 7) i; INSERT INTO nullkey_c2_t1 SELECT i, i FROM generate_series(2, 7) i; INSERT INTO reference_table SELECT i, i FROM generate_series(0, 5) i; INSERT INTO distributed_table_c1_t1 SELECT i, i FROM generate_series(3, 8) i; INSERT INTO distributed_table_c1_t2 SELECT i, i FROM generate_series(2, 9) i; INSERT INTO distributed_table_c2_t1 SELECT i, i FROM generate_series(5, 10) i; INSERT INTO citus_local_table SELECT i, i FROM generate_series(0, 10) i; INSERT INTO postgres_local_table SELECT i, i FROM generate_series(5, 10) i; END; $$ LANGUAGE plpgsql; SELECT reload_tables(); reload_tables --------------------------------------------------------------------- (1 row) CREATE TABLE append_table (a int, b int); SELECT create_distributed_table('append_table', 'a', 'append'); create_distributed_table --------------------------------------------------------------------- (1 row) SELECT master_create_empty_shard('append_table') AS shardid1 \gset SELECT master_create_empty_shard('append_table') AS shardid2 \gset SELECT master_create_empty_shard('append_table') AS shardid3 \gset COPY append_table (a, b) FROM STDIN WITH (format 'csv', append_to_shard :shardid1); COPY append_table (a, b) FROM STDIN WITH (format 'csv', append_to_shard :shardid2); CREATE TABLE range_table(a int, b int); SELECT create_distributed_table('range_table', 'a', 'range'); create_distributed_table --------------------------------------------------------------------- (1 row) CALL public.create_range_partitioned_shards('range_table', '{"0","25"}','{"24","49"}'); INSERT INTO range_table VALUES (0, 1), (1, 2), (2, 3), (3, 4), (4, 5), (5, 6), (6, 50); CREATE MATERIALIZED VIEW matview AS SELECT b*2+a AS a, a*a AS b FROM nullkey_c1_t1; SET client_min_messages TO DEBUG2; -- Test inserting into a distributed table by selecting from a combination of -- different table types together with single-shard tables. -- use a single-shard table INSERT INTO distributed_table_c1_t1 SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1; DEBUG: Creating router plan DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match DETAIL: The target table's partition column should correspond to a partition column in the subquery. DEBUG: Distributed planning for a fast-path router query DEBUG: Creating router plan DEBUG: Collecting INSERT ... SELECT results on coordinator -- use a reference table INSERT INTO distributed_table_c1_t1 SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1 JOIN reference_table USING (a); DEBUG: Creating router plan DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match DETAIL: The target table's partition column should correspond to a partition column in the subquery. DEBUG: Creating router plan DEBUG: Collecting INSERT ... SELECT results on coordinator INSERT INTO distributed_table_c1_t1 SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1 RIGHT JOIN reference_table USING (b) WHERE reference_table.a >= 1 AND reference_table.a <= 5; DEBUG: Creating router plan DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match DETAIL: The target table's partition column should correspond to a partition column in the subquery. DEBUG: Creating router plan DEBUG: Collecting INSERT ... SELECT results on coordinator INSERT INTO distributed_table_c1_t1 SELECT nullkey_c1_t2.a, nullkey_c1_t2.b FROM nullkey_c1_t2 LEFT JOIN reference_table USING (b); DEBUG: Creating router plan DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match DETAIL: The target table's partition column should correspond to a partition column in the subquery. DEBUG: Creating router plan DEBUG: Collecting INSERT ... SELECT results on coordinator INSERT INTO distributed_table_c1_t1 SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1 INTERSECT SELECT * FROM reference_table; DEBUG: Creating router plan DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match DETAIL: The target table's partition column should correspond to a partition column in the subquery. DEBUG: Creating router plan DEBUG: Collecting INSERT ... SELECT results on coordinator -- use a colocated single-shard table INSERT INTO distributed_table_c1_t1 SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1 JOIN nullkey_c1_t2 USING (b); DEBUG: Creating router plan DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match DETAIL: The target table's partition column should correspond to a partition column in the subquery. DEBUG: Creating router plan DEBUG: Collecting INSERT ... SELECT results on coordinator INSERT INTO distributed_table_c1_t1 SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1 FULL JOIN nullkey_c1_t2 USING (a); DEBUG: Creating router plan DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match DETAIL: The target table's partition column should correspond to a partition column in the subquery. DEBUG: Creating router plan DEBUG: Collecting INSERT ... SELECT results on coordinator INSERT INTO distributed_table_c1_t1 SELECT COALESCE(nullkey_c1_t1.a, 1), nullkey_c1_t1.b FROM nullkey_c1_t1 FULL JOIN matview USING (a); DEBUG: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns DEBUG: Local tables cannot be used in distributed queries. DEBUG: Wrapping relation "matview" to a subquery DEBUG: generating subplan XXX_1 for subquery SELECT a FROM insert_select_single_shard_table.matview WHERE true DEBUG: recursively planning left side of the full join since the other side is a recurring rel DEBUG: recursively planning distributed relation "nullkey_c1_t1" since it is part of a distributed join node that is outer joined with a recurring rel DEBUG: Wrapping relation "nullkey_c1_t1" to a subquery DEBUG: Distributed planning for a fast-path router query DEBUG: Creating router plan DEBUG: generating subplan XXX_2 for subquery SELECT a, b FROM insert_select_single_shard_table.nullkey_c1_t1 WHERE true DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT COALESCE(nullkey_c1_t1.a, 1) AS a, nullkey_c1_t1.b FROM ((SELECT nullkey_c1_t1_1.a, nullkey_c1_t1_1.b FROM (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) nullkey_c1_t1_1) nullkey_c1_t1 FULL JOIN (SELECT matview_1.a, NULL::integer AS b FROM (SELECT intermediate_result.a FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer)) matview_1) matview USING (a)) DEBUG: Creating router plan DEBUG: Collecting INSERT ... SELECT results on coordinator INSERT INTO distributed_table_c1_t1 SELECT * FROM nullkey_c1_t1 UNION SELECT * FROM nullkey_c1_t2; DEBUG: Creating router plan DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match DETAIL: The target table's partition column should correspond to a partition column in the subquery. DEBUG: Creating router plan DEBUG: Collecting INSERT ... SELECT results on coordinator -- use a non-colocated single-shard table INSERT INTO distributed_table_c1_t1 SELECT nullkey_c1_t2.a, nullkey_c1_t2.b FROM nullkey_c1_t2 LEFT JOIN nullkey_c2_t1 USING (a); DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match DETAIL: The target table's partition column should correspond to a partition column in the subquery. DEBUG: router planner does not support queries that reference non-colocated distributed tables ERROR: cannot push down this subquery DETAIL: nullkey_c1_t2 and nullkey_c2_t1 are not colocated INSERT INTO distributed_table_c1_t1 SELECT * FROM nullkey_c1_t1 UNION SELECT * FROM nullkey_c2_t1; DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match DETAIL: The target table's partition column should correspond to a partition column in the subquery. DEBUG: router planner does not support queries that reference non-colocated distributed tables DEBUG: Distributed planning for a fast-path router query DEBUG: Creating router plan DEBUG: generating subplan XXX_1 for subquery SELECT a, b FROM insert_select_single_shard_table.nullkey_c1_t1 DEBUG: Distributed planning for a fast-path router query DEBUG: Creating router plan DEBUG: generating subplan XXX_2 for subquery SELECT a, b FROM insert_select_single_shard_table.nullkey_c2_t1 DEBUG: Creating router plan DEBUG: generating subplan XXX_3 for subquery SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer) UNION SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer) DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT a, b FROM (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) citus_insert_select_subquery DEBUG: Creating router plan DEBUG: Collecting INSERT ... SELECT results on coordinator SET client_min_messages TO DEBUG1; SET citus.enable_repartition_joins TO ON; -- use a distributed table that is colocated with the target table, with repartition joins enabled INSERT INTO distributed_table_c1_t1 SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1 JOIN distributed_table_c1_t2 USING (a); DEBUG: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns DEBUG: Collecting INSERT ... SELECT results on coordinator INSERT INTO distributed_table_c1_t1 SELECT distributed_table_c1_t2.a, distributed_table_c1_t2.b FROM nullkey_c1_t1 JOIN distributed_table_c1_t2 USING (a); DEBUG: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns DEBUG: Collecting INSERT ... SELECT results on coordinator INSERT INTO distributed_table_c1_t1 SELECT distributed_table_c1_t2.a, distributed_table_c1_t2.b FROM nullkey_c1_t1 JOIN distributed_table_c1_t2 USING (b); DEBUG: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns DEBUG: Collecting INSERT ... SELECT results on coordinator INSERT INTO distributed_table_c1_t1 SELECT distributed_table_c1_t2.a, distributed_table_c1_t2.b FROM nullkey_c1_t1 JOIN distributed_table_c1_t2 USING (a) WHERE distributed_table_c1_t2.a = 1; DEBUG: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns DEBUG: Collecting INSERT ... SELECT results on coordinator -- use a distributed table that is not colocated with the target table, with repartition joins enabled INSERT INTO distributed_table_c1_t1 SELECT nullkey_c1_t2.a, nullkey_c1_t2.b FROM nullkey_c1_t2 JOIN distributed_table_c2_t1 USING (a); DEBUG: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns DEBUG: Collecting INSERT ... SELECT results on coordinator RESET citus.enable_repartition_joins; SET client_min_messages TO DEBUG2; -- use a citus local table INSERT INTO distributed_table_c1_t1 SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1 JOIN citus_local_table USING (a); DEBUG: distributed INSERT ... SELECT cannot select from distributed tables and local tables at the same time DEBUG: Local tables cannot be used in distributed queries. DEBUG: Wrapping relation "citus_local_table" to a subquery DEBUG: Distributed planning for a fast-path router query DEBUG: Creating router plan DEBUG: generating subplan XXX_1 for subquery SELECT a FROM insert_select_single_shard_table.citus_local_table WHERE true DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM (insert_select_single_shard_table.nullkey_c1_t1 JOIN (SELECT citus_local_table_1.a, NULL::integer AS b FROM (SELECT intermediate_result.a FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer)) citus_local_table_1) citus_local_table USING (a)) DEBUG: Creating router plan DEBUG: Collecting INSERT ... SELECT results on coordinator -- use a postgres local table INSERT INTO distributed_table_c1_t1 SELECT nullkey_c1_t2.a, nullkey_c1_t2.b FROM nullkey_c1_t2 JOIN postgres_local_table USING (a); DEBUG: distributed INSERT ... SELECT cannot select from distributed tables and local tables at the same time DEBUG: Local tables cannot be used in distributed queries. DEBUG: Wrapping relation "postgres_local_table" to a subquery DEBUG: generating subplan XXX_1 for subquery SELECT a FROM insert_select_single_shard_table.postgres_local_table WHERE true DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT nullkey_c1_t2.a, nullkey_c1_t2.b FROM (insert_select_single_shard_table.nullkey_c1_t2 JOIN (SELECT postgres_local_table_1.a, NULL::integer AS b FROM (SELECT intermediate_result.a FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer)) postgres_local_table_1) postgres_local_table USING (a)) DEBUG: Creating router plan DEBUG: Collecting INSERT ... SELECT results on coordinator -- use append / range distributed tables INSERT INTO range_table SELECT * FROM nullkey_c1_t1; DEBUG: Creating router plan DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match DETAIL: The target table's partition column should correspond to a partition column in the subquery. DEBUG: Distributed planning for a fast-path router query DEBUG: Creating router plan DEBUG: Collecting INSERT ... SELECT results on coordinator INSERT INTO append_table SELECT * FROM nullkey_c1_t1; DEBUG: Creating router plan DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match DETAIL: The target table's partition column should correspond to a partition column in the subquery. DEBUG: INSERT ... SELECT into an append-distributed table is not supported ERROR: INSERT ... SELECT into an append-distributed table is not supported SELECT avg(a), avg(b) FROM distributed_table_c1_t1 ORDER BY 1, 2; DEBUG: Router planner cannot handle multi-shard select queries avg | avg --------------------------------------------------------------------- 4.3421052631578947 | 4.5277777777777778 (1 row) TRUNCATE distributed_table_c1_t1; INSERT INTO distributed_table_c1_t1 SELECT i, i FROM generate_series(3, 8) i; DEBUG: Creating router plan DEBUG: distributed INSERT ... SELECT can only select from distributed tables DEBUG: Collecting INSERT ... SELECT results on coordinator -- Test inserting into a reference table by selecting from a combination of -- different table types together with single-shard tables. -- use a single-shard table INSERT INTO reference_table SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1; DEBUG: Creating router plan DEBUG: only reference tables may be queried when targeting a reference table with distributed INSERT ... SELECT DEBUG: Distributed planning for a fast-path router query DEBUG: Creating router plan DEBUG: Collecting INSERT ... SELECT results on coordinator -- use a reference table INSERT INTO reference_table SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1 JOIN reference_table USING (a); DEBUG: Creating router plan DEBUG: only reference tables may be queried when targeting a reference table with distributed INSERT ... SELECT DEBUG: Creating router plan DEBUG: Collecting INSERT ... SELECT results on coordinator INSERT INTO reference_table SELECT nullkey_c1_t2.a, nullkey_c1_t2.b FROM nullkey_c1_t2 LEFT JOIN reference_table USING (b); DEBUG: Creating router plan DEBUG: only reference tables may be queried when targeting a reference table with distributed INSERT ... SELECT DEBUG: Creating router plan DEBUG: Collecting INSERT ... SELECT results on coordinator INSERT INTO reference_table SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1 UNION SELECT * FROM reference_table; DEBUG: Creating router plan DEBUG: only reference tables may be queried when targeting a reference table with distributed INSERT ... SELECT DEBUG: Creating router plan DEBUG: Collecting INSERT ... SELECT results on coordinator INSERT INTO reference_table SELECT nullkey_c1_t2.a, nullkey_c1_t2.b FROM nullkey_c1_t2 LEFT JOIN reference_table USING (b) WHERE b IN (SELECT b FROM matview); DEBUG: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns DEBUG: Local tables cannot be used in distributed queries. DEBUG: generating subplan XXX_1 for subquery SELECT b FROM insert_select_single_shard_table.matview DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT nullkey_c1_t2.a, nullkey_c1_t2.b FROM (insert_select_single_shard_table.nullkey_c1_t2 LEFT JOIN insert_select_single_shard_table.reference_table USING (b)) WHERE (nullkey_c1_t2.b OPERATOR(pg_catalog.=) ANY (SELECT intermediate_result.b FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(b integer))) DEBUG: Creating router plan DEBUG: Collecting INSERT ... SELECT results on coordinator -- use a colocated single-shard table INSERT INTO reference_table SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1 JOIN nullkey_c1_t2 USING (b); DEBUG: Creating router plan DEBUG: only reference tables may be queried when targeting a reference table with distributed INSERT ... SELECT DEBUG: Creating router plan DEBUG: Collecting INSERT ... SELECT results on coordinator INSERT INTO reference_table SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1 FULL JOIN nullkey_c1_t2 USING (a); DEBUG: Creating router plan DEBUG: only reference tables may be queried when targeting a reference table with distributed INSERT ... SELECT DEBUG: Creating router plan DEBUG: Collecting INSERT ... SELECT results on coordinator -- use a non-colocated single-shard table INSERT INTO reference_table SELECT nullkey_c1_t2.a, nullkey_c1_t2.b FROM nullkey_c1_t2 LEFT JOIN nullkey_c2_t1 USING (a); DEBUG: only reference tables may be queried when targeting a reference table with distributed INSERT ... SELECT DEBUG: router planner does not support queries that reference non-colocated distributed tables ERROR: cannot push down this subquery DETAIL: nullkey_c1_t2 and nullkey_c2_t1 are not colocated -- use a distributed table SET client_min_messages TO DEBUG1; SET citus.enable_repartition_joins TO ON; INSERT INTO reference_table SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1 JOIN distributed_table_c1_t2 USING (a); DEBUG: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns DEBUG: Collecting INSERT ... SELECT results on coordinator INSERT INTO reference_table SELECT distributed_table_c1_t2.a, distributed_table_c1_t2.b FROM nullkey_c1_t1 JOIN distributed_table_c1_t2 USING (a); DEBUG: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns DEBUG: Collecting INSERT ... SELECT results on coordinator INSERT INTO reference_table SELECT distributed_table_c1_t2.a, distributed_table_c1_t2.b FROM nullkey_c1_t1 JOIN distributed_table_c1_t2 USING (b); DEBUG: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns DEBUG: Collecting INSERT ... SELECT results on coordinator INSERT INTO reference_table SELECT distributed_table_c1_t2.a, distributed_table_c1_t2.b FROM nullkey_c1_t1 JOIN distributed_table_c1_t2 USING (a) WHERE distributed_table_c1_t2.a = 1; DEBUG: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns DEBUG: Collecting INSERT ... SELECT results on coordinator RESET citus.enable_repartition_joins; SET client_min_messages TO DEBUG2; -- use a citus local table INSERT INTO reference_table SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1 JOIN citus_local_table USING (a); DEBUG: distributed INSERT ... SELECT cannot select from distributed tables and local tables at the same time DEBUG: Local tables cannot be used in distributed queries. DEBUG: Wrapping relation "citus_local_table" to a subquery DEBUG: Distributed planning for a fast-path router query DEBUG: Creating router plan DEBUG: generating subplan XXX_1 for subquery SELECT a FROM insert_select_single_shard_table.citus_local_table WHERE true DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM (insert_select_single_shard_table.nullkey_c1_t1 JOIN (SELECT citus_local_table_1.a, NULL::integer AS b FROM (SELECT intermediate_result.a FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer)) citus_local_table_1) citus_local_table USING (a)) DEBUG: Creating router plan DEBUG: Collecting INSERT ... SELECT results on coordinator -- use a postgres local table INSERT INTO reference_table SELECT nullkey_c1_t2.a, nullkey_c1_t2.b FROM nullkey_c1_t2 JOIN postgres_local_table USING (a); DEBUG: distributed INSERT ... SELECT cannot select from distributed tables and local tables at the same time DEBUG: Local tables cannot be used in distributed queries. DEBUG: Wrapping relation "postgres_local_table" to a subquery DEBUG: generating subplan XXX_1 for subquery SELECT a FROM insert_select_single_shard_table.postgres_local_table WHERE true DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT nullkey_c1_t2.a, nullkey_c1_t2.b FROM (insert_select_single_shard_table.nullkey_c1_t2 JOIN (SELECT postgres_local_table_1.a, NULL::integer AS b FROM (SELECT intermediate_result.a FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer)) postgres_local_table_1) postgres_local_table USING (a)) DEBUG: Creating router plan DEBUG: Collecting INSERT ... SELECT results on coordinator SELECT avg(a), avg(b) FROM reference_table ORDER BY 1, 2; DEBUG: Distributed planning for a fast-path router query DEBUG: Creating router plan avg | avg --------------------------------------------------------------------- 4.3063063063063063 | 4.3063063063063063 (1 row) TRUNCATE reference_table; INSERT INTO reference_table SELECT i, i FROM generate_series(0, 5) i; DEBUG: Creating router plan DEBUG: distributed INSERT ... SELECT can only select from distributed tables DEBUG: Collecting INSERT ... SELECT results on coordinator -- Test inserting into a citus local table by selecting from a combination of -- different table types together with single-shard tables. -- use a single-shard table INSERT INTO citus_local_table SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1; DEBUG: Creating router plan DEBUG: distributed INSERT ... SELECT cannot insert into a local table that is added to metadata DEBUG: Distributed planning for a fast-path router query DEBUG: Creating router plan DEBUG: Collecting INSERT ... SELECT results on coordinator -- use a reference table INSERT INTO citus_local_table SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1 JOIN reference_table USING (a); DEBUG: Creating router plan DEBUG: distributed INSERT ... SELECT cannot insert into a local table that is added to metadata DEBUG: Creating router plan DEBUG: Collecting INSERT ... SELECT results on coordinator -- use a colocated single-shard table INSERT INTO citus_local_table SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1 JOIN nullkey_c1_t2 USING (b); DEBUG: Creating router plan DEBUG: distributed INSERT ... SELECT cannot insert into a local table that is added to metadata DEBUG: Creating router plan DEBUG: Collecting INSERT ... SELECT results on coordinator -- use a distributed table SET client_min_messages TO DEBUG1; SET citus.enable_repartition_joins TO ON; INSERT INTO citus_local_table SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1 JOIN distributed_table_c1_t2 USING (a); DEBUG: distributed INSERT ... SELECT cannot insert into a local table that is added to metadata DEBUG: Collecting INSERT ... SELECT results on coordinator RESET citus.enable_repartition_joins; SET client_min_messages TO DEBUG2; -- use a citus local table INSERT INTO citus_local_table SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1 JOIN citus_local_table USING (a); DEBUG: distributed INSERT ... SELECT cannot select from distributed tables and local tables at the same time DEBUG: Local tables cannot be used in distributed queries. DEBUG: Wrapping relation "citus_local_table" to a subquery DEBUG: Distributed planning for a fast-path router query DEBUG: Creating router plan DEBUG: generating subplan XXX_1 for subquery SELECT a FROM insert_select_single_shard_table.citus_local_table WHERE true DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM (insert_select_single_shard_table.nullkey_c1_t1 JOIN (SELECT citus_local_table_1.a, NULL::integer AS b FROM (SELECT intermediate_result.a FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer)) citus_local_table_1) citus_local_table USING (a)) DEBUG: Creating router plan DEBUG: Collecting INSERT ... SELECT results on coordinator -- use a postgres local table INSERT INTO citus_local_table SELECT nullkey_c1_t2.a, nullkey_c1_t2.b FROM nullkey_c1_t2 JOIN postgres_local_table USING (a); DEBUG: distributed INSERT ... SELECT cannot select from distributed tables and local tables at the same time DEBUG: Local tables cannot be used in distributed queries. DEBUG: Wrapping relation "postgres_local_table" to a subquery DEBUG: generating subplan XXX_1 for subquery SELECT a FROM insert_select_single_shard_table.postgres_local_table WHERE true DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT nullkey_c1_t2.a, nullkey_c1_t2.b FROM (insert_select_single_shard_table.nullkey_c1_t2 JOIN (SELECT postgres_local_table_1.a, NULL::integer AS b FROM (SELECT intermediate_result.a FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer)) postgres_local_table_1) postgres_local_table USING (a)) DEBUG: Creating router plan DEBUG: Collecting INSERT ... SELECT results on coordinator SELECT avg(a), avg(b) FROM citus_local_table ORDER BY 1, 2; DEBUG: Distributed planning for a fast-path router query DEBUG: Creating router plan avg | avg --------------------------------------------------------------------- 4.5270270270270270 | 4.5270270270270270 (1 row) TRUNCATE citus_local_table; INSERT INTO citus_local_table SELECT i, i FROM generate_series(0, 10) i; DEBUG: Creating router plan DEBUG: distributed INSERT ... SELECT can only select from distributed tables DEBUG: Collecting INSERT ... SELECT results on coordinator -- Test inserting into a single-shard table by selecting from a combination of -- different table types, together with or without single-shard tables. -- use a postgres local table INSERT INTO nullkey_c1_t1 SELECT postgres_local_table.a, postgres_local_table.b FROM postgres_local_table; DEBUG: Creating router plan DEBUG: distributed INSERT ... SELECT can only select from distributed tables DEBUG: Collecting INSERT ... SELECT results on coordinator INSERT INTO nullkey_c1_t1 SELECT postgres_local_table.a, postgres_local_table.b FROM postgres_local_table JOIN reference_table USING (a); DEBUG: Creating router plan DEBUG: distributed INSERT ... SELECT cannot select from a local table DEBUG: Creating router plan DEBUG: Collecting INSERT ... SELECT results on coordinator INSERT INTO nullkey_c1_t1 SELECT postgres_local_table.a, postgres_local_table.b FROM postgres_local_table LEFT JOIN nullkey_c1_t1 USING (a); DEBUG: distributed INSERT ... SELECT cannot select from distributed tables and local tables at the same time DEBUG: Local tables cannot be used in distributed queries. DEBUG: Wrapping relation "postgres_local_table" to a subquery DEBUG: generating subplan XXX_1 for subquery SELECT a, b FROM insert_select_single_shard_table.postgres_local_table WHERE true DEBUG: recursively planning right side of the left join since the outer side is a recurring rel DEBUG: recursively planning distributed relation "nullkey_c1_t1" since it is part of a distributed join node that is outer joined with a recurring rel DEBUG: Wrapping relation "nullkey_c1_t1" to a subquery DEBUG: Distributed planning for a fast-path router query DEBUG: Creating router plan DEBUG: generating subplan XXX_2 for subquery SELECT a FROM insert_select_single_shard_table.nullkey_c1_t1 WHERE true DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT postgres_local_table.a, postgres_local_table.b FROM ((SELECT postgres_local_table_1.a, postgres_local_table_1.b FROM (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) postgres_local_table_1) postgres_local_table LEFT JOIN (SELECT nullkey_c1_t1_1.a, NULL::integer AS b FROM (SELECT intermediate_result.a FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(a integer)) nullkey_c1_t1_1) nullkey_c1_t1 USING (a)) DEBUG: Creating router plan DEBUG: Collecting INSERT ... SELECT results on coordinator -- use a citus local table INSERT INTO nullkey_c1_t1 SELECT citus_local_table.a, citus_local_table.b FROM citus_local_table; DEBUG: Creating router plan DEBUG: distributed INSERT ... SELECT cannot select from a local relation when inserting into a distributed table DEBUG: Distributed planning for a fast-path router query DEBUG: Creating router plan DEBUG: Collecting INSERT ... SELECT results on coordinator INSERT INTO nullkey_c1_t1 SELECT citus_local_table.a, citus_local_table.b FROM citus_local_table JOIN reference_table USING (a) JOIN postgres_local_table USING (a) ORDER BY 1,2 OFFSET 7; DEBUG: Creating router plan DEBUG: distributed INSERT ... SELECT cannot select from a local table DEBUG: Creating router plan DEBUG: Collecting INSERT ... SELECT results on coordinator INSERT INTO nullkey_c1_t1 SELECT citus_local_table.a, citus_local_table.b FROM citus_local_table JOIN nullkey_c1_t1 USING (a); DEBUG: distributed INSERT ... SELECT cannot select from distributed tables and local tables at the same time DEBUG: Local tables cannot be used in distributed queries. DEBUG: Wrapping relation "citus_local_table" to a subquery DEBUG: Distributed planning for a fast-path router query DEBUG: Creating router plan DEBUG: generating subplan XXX_1 for subquery SELECT a, b FROM insert_select_single_shard_table.citus_local_table WHERE true DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT citus_local_table.a, citus_local_table.b FROM ((SELECT citus_local_table_1.a, citus_local_table_1.b FROM (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) citus_local_table_1) citus_local_table JOIN insert_select_single_shard_table.nullkey_c1_t1 USING (a)) DEBUG: Creating router plan DEBUG: Collecting INSERT ... SELECT results on coordinator -- use a distributed table INSERT INTO nullkey_c1_t1 SELECT distributed_table_c1_t2.a, distributed_table_c1_t2.b FROM distributed_table_c1_t2; DEBUG: INSERT target relation and all source relations of the SELECT must be colocated in distributed INSERT ... SELECT DEBUG: Router planner cannot handle multi-shard select queries DEBUG: Collecting INSERT ... SELECT results on coordinator INSERT INTO nullkey_c1_t1 SELECT distributed_table_c1_t2.a, distributed_table_c1_t2.b FROM distributed_table_c1_t2 JOIN reference_table USING (a); DEBUG: INSERT target relation and all source relations of the SELECT must be colocated in distributed INSERT ... SELECT DEBUG: Router planner cannot handle multi-shard select queries DEBUG: Collecting INSERT ... SELECT results on coordinator SET client_min_messages TO DEBUG1; SET citus.enable_repartition_joins TO ON; INSERT INTO nullkey_c1_t1 SELECT distributed_table_c1_t2.a, distributed_table_c1_t2.b FROM distributed_table_c1_t2 JOIN nullkey_c1_t1 USING (a); DEBUG: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns DEBUG: Collecting INSERT ... SELECT results on coordinator RESET citus.enable_repartition_joins; SET client_min_messages TO DEBUG2; -- use a non-colocated single-shard table INSERT INTO nullkey_c2_t1 SELECT q.* FROM (SELECT reference_table.* FROM reference_table LEFT JOIN nullkey_c1_t1 USING (a)) q JOIN nullkey_c1_t2 USING (a); DEBUG: Creating router plan DEBUG: INSERT target relation and all source relations of the SELECT must be colocated in distributed INSERT ... SELECT DEBUG: Creating router plan DEBUG: Collecting INSERT ... SELECT results on coordinator -- use a materialized view INSERT INTO nullkey_c1_t1 SELECT * FROM matview; DEBUG: Creating router plan DEBUG: distributed INSERT ... SELECT can only select from distributed tables DEBUG: Collecting INSERT ... SELECT results on coordinator INSERT INTO nullkey_c1_t1 SELECT reference_table.a, reference_table.b FROM reference_table JOIN matview ON (reference_table.a = matview.a); DEBUG: Creating router plan DEBUG: distributed INSERT ... SELECT cannot select from a local relation when inserting into a distributed table DEBUG: Creating router plan DEBUG: Collecting INSERT ... SELECT results on coordinator INSERT INTO nullkey_c1_t1 SELECT q.* FROM (SELECT reference_table.* FROM reference_table JOIN nullkey_c1_t1 USING (a)) q JOIN matview USING (a); DEBUG: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns DEBUG: Local tables cannot be used in distributed queries. DEBUG: Wrapping relation "matview" to a subquery DEBUG: generating subplan XXX_1 for subquery SELECT a FROM insert_select_single_shard_table.matview WHERE true DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT q.a, q.b FROM ((SELECT reference_table.a, reference_table.b FROM (insert_select_single_shard_table.reference_table JOIN insert_select_single_shard_table.nullkey_c1_t1 USING (a))) q JOIN (SELECT matview_1.a, NULL::integer AS b FROM (SELECT intermediate_result.a FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer)) matview_1) matview USING (a)) DEBUG: Creating router plan DEBUG: Collecting INSERT ... SELECT results on coordinator -- use append / range distributed tables INSERT INTO nullkey_c1_t1 SELECT * FROM range_table; DEBUG: Router planner cannot handle multi-shard select queries DEBUG: Router planner cannot handle multi-shard select queries DEBUG: Collecting INSERT ... SELECT results on coordinator INSERT INTO nullkey_c1_t1 SELECT * FROM append_table; DEBUG: INSERT target relation and all source relations of the SELECT must be colocated in distributed INSERT ... SELECT DEBUG: Router planner does not support append-partitioned tables. DEBUG: Collecting INSERT ... SELECT results on coordinator SELECT avg(a), avg(b) FROM nullkey_c1_t1 ORDER BY 1, 2; DEBUG: Distributed planning for a fast-path router query DEBUG: Creating router plan avg | avg --------------------------------------------------------------------- 5.6971153846153846 | 8.4903846153846154 (1 row) SELECT avg(a), avg(b) FROM nullkey_c2_t1 ORDER BY 1, 2; DEBUG: Distributed planning for a fast-path router query DEBUG: Creating router plan avg | avg --------------------------------------------------------------------- 3.9864864864864865 | 3.9864864864864865 (1 row) TRUNCATE nullkey_c1_t1, nullkey_c2_t1; INSERT INTO nullkey_c1_t1 SELECT i, i FROM generate_series(1, 8) i; DEBUG: Creating router plan DEBUG: distributed INSERT ... SELECT can only select from distributed tables DEBUG: Collecting INSERT ... SELECT results on coordinator INSERT INTO nullkey_c2_t1 SELECT i, i FROM generate_series(2, 7) i; DEBUG: Creating router plan DEBUG: distributed INSERT ... SELECT can only select from distributed tables DEBUG: Collecting INSERT ... SELECT results on coordinator -- Test inserting into a local table by selecting from a combination of -- different table types, together with or without single-shard tables. INSERT INTO postgres_local_table SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1 JOIN reference_table USING (a); DEBUG: Creating router plan INSERT INTO postgres_local_table SELECT * FROM nullkey_c1_t1 ORDER BY 1,2 OFFSET 3 LIMIT 2; DEBUG: Creating router plan WITH cte_1 AS ( DELETE FROM nullkey_c1_t1 WHERE a >= 1 and a <= 4 RETURNING * ) INSERT INTO postgres_local_table SELECT cte_1.* FROM cte_1 LEFT JOIN nullkey_c1_t2 USING (a) WHERE nullkey_c1_t2.a IS NULL; DEBUG: Creating router plan INSERT INTO postgres_local_table SELECT * FROM nullkey_c1_t1 EXCEPT SELECT * FROM postgres_local_table; DEBUG: Local tables cannot be used in distributed queries. DEBUG: generating subplan XXX_1 for subquery SELECT a, b FROM insert_select_single_shard_table.postgres_local_table DEBUG: Distributed planning for a fast-path router query DEBUG: Creating router plan DEBUG: generating subplan XXX_2 for subquery SELECT a, b FROM insert_select_single_shard_table.nullkey_c1_t1 DEBUG: Creating router plan DEBUG: generating subplan XXX_3 for subquery SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer) EXCEPT SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer) DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT a, b FROM (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) citus_insert_select_subquery DEBUG: Creating router plan SELECT avg(a), avg(b) FROM postgres_local_table ORDER BY 1, 2; avg | avg --------------------------------------------------------------------- 5.0000000000000000 | 5.0000000000000000 (1 row) TRUNCATE postgres_local_table; INSERT INTO postgres_local_table SELECT i, i FROM generate_series(5, 10) i; -- Try slightly more complex queries. SET client_min_messages TO DEBUG1; WITH cte_1 AS ( SELECT nullkey_c1_t1.a, reference_table.b FROM nullkey_c1_t1 JOIN reference_table USING (a) ), cte_2 AS ( SELECT reference_table.a, postgres_local_table.b FROM postgres_local_table LEFT JOIN reference_table USING (b) ) INSERT INTO distributed_table_c1_t1 SELECT cte_1.* FROM cte_1 JOIN cte_2 USING (a) JOIN distributed_table_c1_t2 USING (a) ORDER BY 1,2; DEBUG: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns DEBUG: CTE cte_1 is going to be inlined via distributed planning DEBUG: CTE cte_2 is going to be inlined via distributed planning DEBUG: Wrapping relation "postgres_local_table" to a subquery DEBUG: generating subplan XXX_1 for subquery SELECT b FROM insert_select_single_shard_table.postgres_local_table WHERE true DEBUG: generating subplan XXX_2 for subquery SELECT nullkey_c1_t1.a, reference_table.b FROM (insert_select_single_shard_table.nullkey_c1_t1 JOIN insert_select_single_shard_table.reference_table USING (a)) DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT a, b FROM (SELECT cte_1.a, cte_1.b FROM (((SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) cte_1 JOIN (SELECT reference_table.a, postgres_local_table.b FROM ((SELECT NULL::integer AS a, postgres_local_table_1.b FROM (SELECT intermediate_result.b FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(b integer)) postgres_local_table_1) postgres_local_table LEFT JOIN insert_select_single_shard_table.reference_table USING (b))) cte_2 USING (a)) JOIN insert_select_single_shard_table.distributed_table_c1_t2 USING (a)) ORDER BY cte_1.a, cte_1.b) citus_insert_select_subquery DEBUG: performing repartitioned INSERT ... SELECT SET client_min_messages TO DEBUG2; WITH cte_1 AS ( SELECT nullkey_c1_t1.a, reference_table.b FROM nullkey_c1_t1 JOIN reference_table USING (a) ), cte_2 AS ( SELECT * FROM nullkey_c1_t2 WHERE EXISTS ( SELECT 1 FROM reference_table WHERE reference_table.a = nullkey_c1_t2.a ) ORDER BY 1,2 OFFSET 1 LIMIT 4 ) INSERT INTO distributed_table_c1_t1 SELECT * FROM cte_1 UNION SELECT * FROM cte_2 EXCEPT SELECT * FROM reference_table; DEBUG: Creating router plan DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match DETAIL: The target table's partition column should correspond to a partition column in the subquery. DEBUG: CTE cte_1 is going to be inlined via distributed planning DEBUG: CTE cte_2 is going to be inlined via distributed planning DEBUG: Creating router plan DEBUG: Collecting INSERT ... SELECT results on coordinator INSERT INTO distributed_table_c1_t1 (a, b) SELECT t1.a, t2.b FROM nullkey_c1_t1 t1 JOIN ( SELECT b FROM nullkey_c1_t2 ORDER BY b DESC LIMIT 1 ) t2 ON t1.b < t2.b; DEBUG: Creating router plan DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match DETAIL: The target table's partition column should correspond to a partition column in the subquery. DEBUG: Creating router plan DEBUG: Collecting INSERT ... SELECT results on coordinator INSERT INTO distributed_table_c1_t1 (a, b) WITH cte AS ( SELECT a, b, (SELECT a FROM nullkey_c1_t2 WHERE b = t.b) AS d1, (SELECT a FROM reference_table WHERE b = t.b) AS d2 FROM nullkey_c1_t1 t ) SELECT d1, COALESCE(d2, a) FROM cte WHERE d1 IS NOT NULL AND d2 IS NOT NULL; DEBUG: CTE cte is going to be inlined via distributed planning DEBUG: Creating router plan DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match DETAIL: Subquery contains an expression that is not a simple column reference in the same position as the target table's partition column. HINT: Ensure the target table's partition column has a corresponding simple column reference to a distributed table's partition column in the subquery. DEBUG: Creating router plan DEBUG: Collecting INSERT ... SELECT results on coordinator INSERT INTO citus_local_table (a, b) SELECT t1.a, t2.b FROM nullkey_c1_t1 t1 CROSS JOIN ( SELECT b FROM nullkey_c2_t1 ORDER BY b LIMIT 1 ) t2; DEBUG: distributed INSERT ... SELECT cannot insert into a local table that is added to metadata DEBUG: router planner does not support queries that reference non-colocated distributed tables DEBUG: Distributed planning for a fast-path router query DEBUG: Creating router plan DEBUG: generating subplan XXX_1 for subquery SELECT b FROM insert_select_single_shard_table.nullkey_c2_t1 ORDER BY b LIMIT 1 DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT t1.a, t2.b FROM (insert_select_single_shard_table.nullkey_c1_t1 t1 CROSS JOIN (SELECT intermediate_result.b FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(b integer)) t2) DEBUG: Creating router plan DEBUG: Collecting INSERT ... SELECT results on coordinator INSERT INTO distributed_table_c1_t1 (a, b) SELECT t1.a, t2.b FROM reference_table t1 LEFT JOIN ( SELECT b, ROW_NUMBER() OVER (ORDER BY b DESC) AS rn FROM nullkey_c1_t1 ) t2 ON t1.b = t2.b WHERE t2.rn > 0; DEBUG: Creating router plan DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match DETAIL: The target table's partition column should correspond to a partition column in the subquery. DEBUG: Creating router plan DEBUG: Collecting INSERT ... SELECT results on coordinator INSERT INTO nullkey_c1_t1 (a, b) SELECT t1.a, t2.b FROM nullkey_c1_t1 t1 JOIN ( SELECT rn, b FROM ( SELECT b, ROW_NUMBER() OVER (ORDER BY b DESC) AS rn FROM distributed_table_c2_t1 ) q ) t2 ON t1.b = t2.b WHERE t2.rn > 2; DEBUG: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns DEBUG: Router planner cannot handle multi-shard select queries DEBUG: Router planner cannot handle multi-shard select queries DEBUG: generating subplan XXX_1 for subquery SELECT b, row_number() OVER (ORDER BY b DESC) AS rn FROM insert_select_single_shard_table.distributed_table_c2_t1 DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT t1.a, t2.b FROM (insert_select_single_shard_table.nullkey_c1_t1 t1 JOIN (SELECT q.rn, q.b FROM (SELECT intermediate_result.b, intermediate_result.rn FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(b integer, rn bigint)) q) t2 ON ((t1.b OPERATOR(pg_catalog.=) t2.b))) WHERE (t2.rn OPERATOR(pg_catalog.>) 2) DEBUG: Creating router plan DEBUG: Collecting INSERT ... SELECT results on coordinator INSERT INTO distributed_table_c1_t1 (a, b) SELECT t1.a, t2.b FROM nullkey_c1_t1 t1 JOIN ( SELECT sum_val, b FROM ( SELECT b, SUM(a) OVER (PARTITION BY b) AS sum_val FROM nullkey_c1_t1 ) q ) t2 ON t1.b = t2.b WHERE t2.sum_val > 2; DEBUG: Creating router plan DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match DETAIL: The target table's partition column should correspond to a partition column in the subquery. DEBUG: Creating router plan DEBUG: Collecting INSERT ... SELECT results on coordinator -- Temporaryly reduce the verbosity to avoid noise -- in the output of the next query. SET client_min_messages TO DEBUG1; INSERT INTO nullkey_c1_t1 SELECT DISTINCT ON (a) a, b FROM nullkey_c1_t2; -- keep low verbosity as PG15 and PG14 produces slightly different outputs INSERT INTO nullkey_c1_t1 SELECT b, SUM(a) OVER (ORDER BY b) AS sum_val FROM nullkey_c1_t1; SET client_min_messages TO DEBUG2; INSERT INTO nullkey_c2_t1 SELECT t2.a, t2.b FROM nullkey_c1_t1 AS t2 JOIN reference_table AS t3 ON (t2.a = t3.a) WHERE NOT EXISTS ( SELECT 1 FROM nullkey_c1_t2 AS t1 WHERE t1.b = t3.b ); DEBUG: Creating router plan DEBUG: INSERT target relation and all source relations of the SELECT must be colocated in distributed INSERT ... SELECT DEBUG: Creating router plan DEBUG: Collecting INSERT ... SELECT results on coordinator INSERT INTO distributed_table_c1_t1 SELECT t1.a, t1.b FROM nullkey_c1_t1 AS t1 WHERE t1.a NOT IN ( SELECT DISTINCT t2.a FROM distributed_table_c1_t2 AS t2 ); DEBUG: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns DEBUG: Router planner cannot handle multi-shard select queries DEBUG: Router planner cannot handle multi-shard select queries DEBUG: generating subplan XXX_1 for subquery SELECT DISTINCT a FROM insert_select_single_shard_table.distributed_table_c1_t2 t2 DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT a, b FROM insert_select_single_shard_table.nullkey_c1_t1 t1 WHERE (NOT (a OPERATOR(pg_catalog.=) ANY (SELECT intermediate_result.a FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer)))) DEBUG: Creating router plan DEBUG: Collecting INSERT ... SELECT results on coordinator INSERT INTO distributed_table_c1_t1 SELECT t1.a, t1.b FROM reference_table AS t1 JOIN ( SELECT t2.a FROM ( SELECT a FROM nullkey_c1_t1 UNION SELECT a FROM nullkey_c1_t2 ) AS t2 ) AS t3 ON t1.a = t3.a; DEBUG: Creating router plan DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match DETAIL: The target table's partition column should correspond to a partition column in the subquery. DEBUG: Creating router plan DEBUG: Collecting INSERT ... SELECT results on coordinator -- Temporaryly reduce the verbosity to avoid noise -- in the output of the next query. SET client_min_messages TO DEBUG1; INSERT INTO nullkey_c1_t1 SELECT t1.a, t1.b FROM reference_table AS t1 WHERE t1.a IN ( SELECT t2.a FROM ( SELECT t3.a FROM ( SELECT a FROM distributed_table_c1_t1 WHERE b > 4 ) AS t3 JOIN ( SELECT a FROM distributed_table_c1_t2 WHERE b < 7 ) AS t4 ON t3.a = t4.a ) AS t2 ); DEBUG: correlated subqueries are not supported when the FROM clause contains a reference table DEBUG: generating subplan XXX_1 for subquery SELECT a FROM (SELECT t3.a FROM ((SELECT distributed_table_c1_t1.a FROM insert_select_single_shard_table.distributed_table_c1_t1 WHERE (distributed_table_c1_t1.b OPERATOR(pg_catalog.>) 4)) t3 JOIN (SELECT distributed_table_c1_t2.a FROM insert_select_single_shard_table.distributed_table_c1_t2 WHERE (distributed_table_c1_t2.b OPERATOR(pg_catalog.<) 7)) t4 ON ((t3.a OPERATOR(pg_catalog.=) t4.a)))) t2 DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT a, b FROM insert_select_single_shard_table.reference_table t1 WHERE (a OPERATOR(pg_catalog.=) ANY (SELECT intermediate_result.a FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer))) DEBUG: Collecting INSERT ... SELECT results on coordinator SET client_min_messages TO DEBUG2; -- test upsert with plain INSERT query CREATE TABLE upsert_test_1 ( unique_col int UNIQUE, other_col int, third_col int ); DEBUG: CREATE TABLE / UNIQUE will create implicit index "upsert_test_1_unique_col_key" for table "upsert_test_1" SELECT create_distributed_table('upsert_test_1', null); create_distributed_table --------------------------------------------------------------------- (1 row) CREATE TABLE upsert_test_2(key int primary key, value text); DEBUG: CREATE TABLE / PRIMARY KEY will create implicit index "upsert_test_2_pkey" for table "upsert_test_2" SELECT create_distributed_table('upsert_test_2', null); create_distributed_table --------------------------------------------------------------------- (1 row) INSERT INTO upsert_test_2 AS upsert_test_2_alias (key, value) VALUES (1, '5') ON CONFLICT(key) DO UPDATE SET value = (upsert_test_2_alias.value::int * 2)::text; DEBUG: Creating router plan INSERT INTO upsert_test_2 (key, value) VALUES (1, '5') ON CONFLICT(key) DO UPDATE SET value = (upsert_test_2.value::int * 3)::text; DEBUG: Creating router plan INSERT INTO upsert_test_1 (unique_col, other_col) VALUES (1, 1) ON CONFLICT (unique_col) DO UPDATE SET other_col = (SELECT count(*) from upsert_test_1); DEBUG: subqueries are not supported within INSERT queries HINT: Try rewriting your queries with 'INSERT INTO ... SELECT' syntax. DEBUG: Distributed planning for a fast-path router query DEBUG: Creating router plan DEBUG: generating subplan XXX_1 for subquery SELECT count(*) AS count FROM insert_select_single_shard_table.upsert_test_1 DEBUG: Plan XXX query after replacing subqueries and CTEs: INSERT INTO insert_select_single_shard_table.upsert_test_1 (unique_col, other_col) VALUES (1, 1) ON CONFLICT(unique_col) DO UPDATE SET other_col = (SELECT intermediate_result.count FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(count bigint)) DEBUG: subqueries are not supported within INSERT queries HINT: Try rewriting your queries with 'INSERT INTO ... SELECT' syntax. ERROR: subqueries are not supported within INSERT queries HINT: Try rewriting your queries with 'INSERT INTO ... SELECT' syntax. INSERT INTO upsert_test_1 (unique_col, other_col) VALUES (1, 1) ON CONFLICT (unique_col) DO UPDATE SET other_col = random()::int; DEBUG: functions used in the DO UPDATE SET clause of INSERTs on distributed tables must be marked IMMUTABLE ERROR: functions used in the DO UPDATE SET clause of INSERTs on distributed tables must be marked IMMUTABLE INSERT INTO upsert_test_1 (unique_col, other_col) VALUES (1, 1) ON CONFLICT (unique_col) DO UPDATE SET other_col = 5 WHERE upsert_test_1.other_col = random()::int; DEBUG: functions used in the WHERE clause of the ON CONFLICT clause of INSERTs on distributed tables must be marked IMMUTABLE ERROR: functions used in the WHERE clause of the ON CONFLICT clause of INSERTs on distributed tables must be marked IMMUTABLE INSERT INTO upsert_test_1 VALUES (3, 5, 7); DEBUG: Creating router plan INSERT INTO upsert_test_1 (unique_col, other_col) VALUES (1, 1) ON CONFLICT (unique_col) WHERE unique_col = random()::int DO UPDATE SET other_col = 5; DEBUG: functions used in the WHERE clause of the ON CONFLICT clause of INSERTs on distributed tables must be marked IMMUTABLE ERROR: functions used in the WHERE clause of the ON CONFLICT clause of INSERTs on distributed tables must be marked IMMUTABLE CREATE TABLE upsert_test_3 (key_1 int, key_2 bigserial, value text DEFAULT 'default_value', PRIMARY KEY (key_1, key_2)); DEBUG: CREATE TABLE will create implicit sequence "upsert_test_3_key_2_seq" for serial column "upsert_test_3.key_2" DEBUG: CREATE TABLE / PRIMARY KEY will create implicit index "upsert_test_3_pkey" for table "upsert_test_3" SELECT create_distributed_table('upsert_test_3', null); create_distributed_table --------------------------------------------------------------------- (1 row) INSERT INTO upsert_test_3 VALUES (1, DEFAULT, '1') RETURNING *; DEBUG: Creating router plan key_1 | key_2 | value --------------------------------------------------------------------- 1 | 1 | 1 (1 row) INSERT INTO upsert_test_3 VALUES (5, DEFAULT, DEFAULT) RETURNING *; DEBUG: Creating router plan key_1 | key_2 | value --------------------------------------------------------------------- 5 | 2 | default_value (1 row) SET client_min_messages TO DEBUG1; INSERT INTO upsert_test_3 SELECT 7, other_col, 'harcoded_text_value' FROM upsert_test_1 RETURNING *; key_1 | key_2 | value --------------------------------------------------------------------- 7 | 5 | harcoded_text_value (1 row) SET client_min_messages TO DEBUG2; -- test upsert with INSERT .. SELECT queries SET client_min_messages TO DEBUG1; INSERT INTO upsert_test_1 (unique_col, other_col) SELECT unique_col, other_col FROM upsert_test_1 ON CONFLICT (unique_col) DO UPDATE SET other_col = upsert_test_1.other_col + 1; -- Fails due to https://github.com/citusdata/citus/issues/6826. INSERT INTO upsert_test_1 (unique_col, other_col) SELECT unique_col, other_col FROM upsert_test_1 ON CONFLICT (unique_col) DO UPDATE SET other_col = (SELECT count(*) from upsert_test_1); ERROR: cannot execute a distributed query from a query on a shard DETAIL: Executing a distributed query in a function call that may be pushed to a remote node can lead to incorrect results. HINT: Avoid nesting of distributed queries or use alter user current_user set citus.allow_nested_distributed_execution to on to allow it with possible incorrectness. CONTEXT: while executing command on localhost:xxxxx SET client_min_messages TO DEBUG2; INSERT INTO upsert_test_1 (unique_col, other_col) SELECT unique_col, other_col FROM upsert_test_1 ON CONFLICT (unique_col) DO UPDATE SET other_col = random()::int; ERROR: functions used in the DO UPDATE SET clause of INSERTs on distributed tables must be marked IMMUTABLE INSERT INTO upsert_test_1 (unique_col, other_col) SELECT unique_col, other_col FROM upsert_test_1 ON CONFLICT (unique_col) DO UPDATE SET other_col = 5 WHERE upsert_test_1.other_col = random()::int; ERROR: functions used in the WHERE clause of the ON CONFLICT clause of INSERTs on distributed tables must be marked IMMUTABLE SELECT reload_tables(); reload_tables --------------------------------------------------------------------- (1 row) ALTER TABLE nullkey_c1_t1 ADD PRIMARY KEY (a); DEBUG: ALTER TABLE / ADD PRIMARY KEY will create implicit index "nullkey_c1_t1_pkey" for table "nullkey_c1_t1" DEBUG: verifying table "nullkey_c1_t1" ALTER TABLE distributed_table_c1_t1 ADD PRIMARY KEY (a,b); DEBUG: ALTER TABLE / ADD PRIMARY KEY will create implicit index "distributed_table_c1_t1_pkey" for table "distributed_table_c1_t1" DEBUG: verifying table "distributed_table_c1_t1" INSERT INTO nullkey_c1_t1 AS t1 (a, b) SELECT t3.a, t3.b FROM nullkey_c1_t2 t2 JOIN reference_table t3 ON (t2.a = t3.a) ON CONFLICT (a) DO UPDATE SET a = t1.a + 10; DEBUG: Creating router plan DEBUG: distributed statement: INSERT INTO insert_select_single_shard_table.nullkey_c1_t1_1820000 AS t1 (a, b) SELECT t3.a, t3.b FROM (insert_select_single_shard_table.nullkey_c1_t2_1820001 t2 JOIN insert_select_single_shard_table.reference_table_1820003 t3 ON ((t2.a OPERATOR(pg_catalog.=) t3.a))) ON CONFLICT(a) DO UPDATE SET a = (t1.a OPERATOR(pg_catalog.+) 10) SET client_min_messages TO DEBUG1; INSERT INTO distributed_table_c1_t1 AS t1 (a, b) SELECT t3.a, t3.b FROM nullkey_c1_t2 t2 JOIN reference_table t3 ON (t2.a = t3.a) ON CONFLICT (a, b) DO UPDATE SET b = t1.b + 10; DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match DETAIL: The target table's partition column should correspond to a partition column in the subquery. DEBUG: Collecting INSERT ... SELECT results on coordinator INSERT INTO nullkey_c1_t1 AS t1 (a, b) SELECT t3.a, t3.b FROM distributed_table_c1_t1 t2 JOIN reference_table t3 ON (t2.a = t3.a) ON CONFLICT (a) DO UPDATE SET a = t1.a + 10; DEBUG: INSERT target relation and all source relations of the SELECT must be colocated in distributed INSERT ... SELECT DEBUG: Collecting INSERT ... SELECT results on coordinator -- This also fails due to https://github.com/citusdata/citus/issues/6826. INSERT INTO nullkey_c1_t1 AS t1 (a, b) SELECT t3.a, t3.b FROM distributed_table_c1_t1 t2 JOIN reference_table t3 ON (t2.a = t3.a) WHERE t2.a = 3 ON CONFLICT (a) DO UPDATE SET a = (SELECT max(b)+1 FROM distributed_table_c1_t1 WHERE a = 3); DEBUG: INSERT target relation and all source relations of the SELECT must be colocated in distributed INSERT ... SELECT DEBUG: Collecting INSERT ... SELECT results on coordinator ERROR: cannot execute a distributed query from a query on a shard DETAIL: Executing a distributed query in a function call that may be pushed to a remote node can lead to incorrect results. HINT: Avoid nesting of distributed queries or use alter user current_user set citus.allow_nested_distributed_execution to on to allow it with possible incorrectness. CONTEXT: while executing command on localhost:xxxxx SET client_min_messages TO DEBUG2; SELECT avg(a), avg(b) FROM distributed_table_c1_t1; DEBUG: Router planner cannot handle multi-shard select queries avg | avg --------------------------------------------------------------------- 5.0000000000000000 | 9.2857142857142857 (1 row) SELECT avg(a), avg(b) FROM nullkey_c1_t1; DEBUG: Distributed planning for a fast-path router query DEBUG: Creating router plan avg | avg --------------------------------------------------------------------- 7.5000000000000000 | 4.1666666666666667 (1 row) SELECT avg(a), avg(b) FROM nullkey_c1_t2; DEBUG: Distributed planning for a fast-path router query DEBUG: Creating router plan avg | avg --------------------------------------------------------------------- 4.5000000000000000 | 4.5000000000000000 (1 row) SELECT * FROM upsert_test_1 ORDER BY unique_col; DEBUG: Distributed planning for a fast-path router query DEBUG: Creating router plan unique_col | other_col | third_col --------------------------------------------------------------------- 3 | 6 | 7 (1 row) SELECT * FROM upsert_test_2 ORDER BY key; DEBUG: Distributed planning for a fast-path router query DEBUG: Creating router plan key | value --------------------------------------------------------------------- 1 | 15 (1 row) SELECT * FROM upsert_test_3 ORDER BY key_1, key_2; DEBUG: Distributed planning for a fast-path router query DEBUG: Creating router plan key_1 | key_2 | value --------------------------------------------------------------------- 1 | 1 | 1 5 | 2 | default_value 7 | 5 | harcoded_text_value (3 rows) SET client_min_messages TO WARNING; DROP SCHEMA insert_select_single_shard_table CASCADE;