-- -- MULTI_INSERT_SELECT_CONFLICT -- -- This test file has an alternative output because of the change in the -- display of SQL-standard function's arguments in INSERT/SELECT in PG15. -- The alternative output can be deleted when we drop support for PG14 -- SHOW server_version \gset SELECT substring(:'server_version', '\d+')::int >= 15 AS server_version_ge_15; server_version_ge_15 --------------------------------------------------------------------- f (1 row) CREATE SCHEMA on_conflict; SET search_path TO on_conflict, public; SET citus.next_shard_id TO 1900000; SET citus.shard_replication_factor TO 1; CREATE TABLE target_table(col_1 int primary key, col_2 int); SELECT create_distributed_table('target_table','col_1'); create_distributed_table --------------------------------------------------------------------- (1 row) INSERT INTO target_table VALUES(1,2),(2,3),(3,4),(4,5),(5,6); CREATE TABLE source_table_1(col_1 int primary key, col_2 int, col_3 int); SELECT create_distributed_table('source_table_1','col_1'); create_distributed_table --------------------------------------------------------------------- (1 row) INSERT INTO source_table_1 VALUES(1,1,1),(2,2,2),(3,3,3),(4,4,4),(5,5,5); CREATE TABLE source_table_2(col_1 int, col_2 int, col_3 int); SELECT create_distributed_table('source_table_2','col_1'); create_distributed_table --------------------------------------------------------------------- (1 row) INSERT INTO source_table_2 VALUES(6,6,6),(7,7,7),(8,8,8),(9,9,9),(10,10,10); SET client_min_messages to debug1; -- Generate series directly on the coordinator and on conflict do nothing INSERT INTO target_table (col_1, col_2) SELECT s, s FROM generate_series(1,10) s ON CONFLICT DO NOTHING; DEBUG: distributed INSERT ... SELECT can only select from distributed tables DEBUG: Collecting INSERT ... SELECT results on coordinator -- Generate series directly on the coordinator and on conflict update the target table INSERT INTO target_table (col_1, col_2) SELECT s, s FROM generate_series(1,10) s ON CONFLICT(col_1) DO UPDATE SET col_2 = EXCLUDED.col_2 + 1; DEBUG: distributed INSERT ... SELECT can only select from distributed tables DEBUG: Collecting INSERT ... SELECT results on coordinator -- Since partition columns do not match, pull the data to the coordinator -- and do not change conflicted values INSERT INTO target_table SELECT col_2, col_3 FROM source_table_1 ON CONFLICT DO NOTHING; DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match DETAIL: The target table's partition column should correspond to a partition column in the subquery. DEBUG: performing repartitioned INSERT ... SELECT -- Since partition columns do not match, pull the data to the coordinator -- and update the non-partition column. Query is wrapped by CTE to return -- ordered result. WITH inserted_table AS ( INSERT INTO target_table SELECT col_2, col_3 FROM source_table_1 ON CONFLICT(col_1) DO UPDATE SET col_2 = EXCLUDED.col_2 RETURNING * ) SELECT * FROM inserted_table ORDER BY 1; DEBUG: generating subplan XXX_1 for CTE inserted_table: INSERT INTO on_conflict.target_table (col_1, col_2) SELECT col_2, col_3 FROM on_conflict.source_table_1 ON CONFLICT(col_1) DO UPDATE SET col_2 = excluded.col_2 RETURNING target_table.col_1, target_table.col_2 DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match DETAIL: The target table's partition column should correspond to a partition column in the subquery. DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT col_1, col_2 FROM (SELECT intermediate_result.col_1, intermediate_result.col_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(col_1 integer, col_2 integer)) inserted_table ORDER BY col_1 DEBUG: performing repartitioned INSERT ... SELECT col_1 | col_2 --------------------------------------------------------------------- 1 | 1 2 | 2 3 | 3 4 | 4 5 | 5 (5 rows) -- Subquery should be recursively planned due to the limit and do nothing on conflict INSERT INTO target_table SELECT col_1, col_2 FROM ( SELECT col_1, col_2, col_3 FROM source_table_1 LIMIT 5 ) as foo ON CONFLICT DO NOTHING; DEBUG: cannot push down this subquery DETAIL: Limit clause is currently unsupported when a subquery references a column from another query DEBUG: push down of limit count: 5 DEBUG: generating subplan XXX_1 for subquery SELECT col_1, col_2, col_3 FROM on_conflict.source_table_1 LIMIT 5 DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT col_1, col_2 FROM (SELECT intermediate_result.col_1, intermediate_result.col_2, intermediate_result.col_3 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(col_1 integer, col_2 integer, col_3 integer)) foo DEBUG: Collecting INSERT ... SELECT results on coordinator -- Subquery should be recursively planned due to the limit and update on conflict -- Query is wrapped by CTE to return ordered result. WITH inserted_table AS ( INSERT INTO target_table SELECT col_1, col_2 FROM ( SELECT col_1, col_2, col_3 FROM source_table_1 LIMIT 5 ) as foo ON CONFLICT(col_1) DO UPDATE SET col_2 = EXCLUDED.col_2 RETURNING * ) SELECT * FROM inserted_table ORDER BY 1; DEBUG: generating subplan XXX_1 for CTE inserted_table: INSERT INTO on_conflict.target_table (col_1, col_2) SELECT col_1, col_2 FROM (SELECT source_table_1.col_1, source_table_1.col_2, source_table_1.col_3 FROM on_conflict.source_table_1 LIMIT 5) foo ON CONFLICT(col_1) DO UPDATE SET col_2 = excluded.col_2 RETURNING target_table.col_1, target_table.col_2 DEBUG: cannot push down this subquery DETAIL: Limit clause is currently unsupported when a subquery references a column from another query DEBUG: push down of limit count: 5 DEBUG: generating subplan XXX_1 for subquery SELECT col_1, col_2, col_3 FROM on_conflict.source_table_1 LIMIT 5 DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT col_1, col_2 FROM (SELECT intermediate_result.col_1, intermediate_result.col_2, intermediate_result.col_3 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(col_1 integer, col_2 integer, col_3 integer)) foo DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT col_1, col_2 FROM (SELECT intermediate_result.col_1, intermediate_result.col_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(col_1 integer, col_2 integer)) inserted_table ORDER BY col_1 DEBUG: Collecting INSERT ... SELECT results on coordinator col_1 | col_2 --------------------------------------------------------------------- 1 | 1 2 | 2 3 | 3 4 | 4 5 | 5 (5 rows) -- Test with multiple subqueries. Query is wrapped by CTE to return ordered result. WITH inserted_table AS ( INSERT INTO target_table SELECT col_1, col_2 FROM ( (SELECT col_1, col_2, col_3 FROM source_table_1 LIMIT 5) UNION (SELECT col_1, col_2, col_3 FROM source_table_2 LIMIT 5) ) as foo ON CONFLICT(col_1) DO UPDATE SET col_2 = 0 RETURNING * ) SELECT * FROM inserted_table ORDER BY 1; DEBUG: generating subplan XXX_1 for CTE inserted_table: INSERT INTO on_conflict.target_table (col_1, col_2) SELECT col_1, col_2 FROM ((SELECT source_table_1.col_1, source_table_1.col_2, source_table_1.col_3 FROM on_conflict.source_table_1 LIMIT 5) UNION (SELECT source_table_2.col_1, source_table_2.col_2, source_table_2.col_3 FROM on_conflict.source_table_2 LIMIT 5)) foo ON CONFLICT(col_1) DO UPDATE SET col_2 = 0 RETURNING target_table.col_1, target_table.col_2 DEBUG: cannot push down this subquery DETAIL: Limit clause is currently unsupported when a subquery references a column from another query DEBUG: push down of limit count: 5 DEBUG: generating subplan XXX_1 for subquery SELECT col_1, col_2, col_3 FROM on_conflict.source_table_1 LIMIT 5 DEBUG: push down of limit count: 5 DEBUG: generating subplan XXX_2 for subquery SELECT col_1, col_2, col_3 FROM on_conflict.source_table_2 LIMIT 5 DEBUG: generating subplan XXX_3 for subquery SELECT intermediate_result.col_1, intermediate_result.col_2, intermediate_result.col_3 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(col_1 integer, col_2 integer, col_3 integer) UNION SELECT intermediate_result.col_1, intermediate_result.col_2, intermediate_result.col_3 FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(col_1 integer, col_2 integer, col_3 integer) DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT col_1, col_2 FROM (SELECT intermediate_result.col_1, intermediate_result.col_2, intermediate_result.col_3 FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(col_1 integer, col_2 integer, col_3 integer)) foo DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT col_1, col_2 FROM (SELECT intermediate_result.col_1, intermediate_result.col_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(col_1 integer, col_2 integer)) inserted_table ORDER BY col_1 DEBUG: Collecting INSERT ... SELECT results on coordinator col_1 | col_2 --------------------------------------------------------------------- 1 | 0 2 | 0 3 | 0 4 | 0 5 | 0 6 | 0 7 | 0 8 | 0 9 | 0 10 | 0 (10 rows) -- Get the select part from cte and do nothing on conflict WITH cte AS MATERIALIZED ( SELECT col_1, col_2 FROM source_table_1 ) INSERT INTO target_table SELECT * FROM cte ON CONFLICT DO NOTHING; DEBUG: distributed INSERT ... SELECT can only select from distributed tables DEBUG: generating subplan XXX_1 for CTE cte: SELECT col_1, col_2 FROM on_conflict.source_table_1 DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT col_1, col_2 FROM (SELECT cte.col_1, cte.col_2 FROM (SELECT intermediate_result.col_1, intermediate_result.col_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(col_1 integer, col_2 integer)) cte) citus_insert_select_subquery DEBUG: Collecting INSERT ... SELECT results on coordinator -- Get the select part from cte and update on conflict WITH cte AS MATERIALIZED ( SELECT col_1, col_2 FROM source_table_1 ) INSERT INTO target_table SELECT * FROM cte ON CONFLICT(col_1) DO UPDATE SET col_2 = EXCLUDED.col_2 + 1; DEBUG: distributed INSERT ... SELECT can only select from distributed tables DEBUG: generating subplan XXX_1 for CTE cte: SELECT col_1, col_2 FROM on_conflict.source_table_1 DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT col_1, col_2 FROM (SELECT cte.col_1, cte.col_2 FROM (SELECT intermediate_result.col_1, intermediate_result.col_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(col_1 integer, col_2 integer)) cte) citus_insert_select_subquery DEBUG: Collecting INSERT ... SELECT results on coordinator SELECT * FROM target_table ORDER BY 1; col_1 | col_2 --------------------------------------------------------------------- 1 | 2 2 | 3 3 | 4 4 | 5 5 | 6 6 | 0 7 | 0 8 | 0 9 | 0 10 | 0 (10 rows) -- Test with multiple CTEs WITH cte AS( SELECT col_1, col_2 FROM source_table_1 ), cte_2 AS( SELECT col_1, col_2 FROM source_table_2 ) INSERT INTO target_table ((SELECT * FROM cte) UNION (SELECT * FROM cte_2)) ON CONFLICT(col_1) DO UPDATE SET col_2 = EXCLUDED.col_2 + 1; DEBUG: distributed INSERT ... SELECT can only select from distributed tables DEBUG: CTE cte is going to be inlined via distributed planning DEBUG: CTE cte_2 is going to be inlined via distributed planning DEBUG: performing repartitioned INSERT ... SELECT SELECT * FROM target_table ORDER BY 1; col_1 | col_2 --------------------------------------------------------------------- 1 | 2 2 | 3 3 | 4 4 | 5 5 | 6 6 | 7 7 | 8 8 | 9 9 | 10 10 | 11 (10 rows) WITH inserted_table AS MATERIALIZED ( WITH cte AS MATERIALIZED ( SELECT col_1, col_2, col_3 FROM source_table_1 ), cte_2 AS MATERIALIZED ( SELECT col_1, col_2 FROM cte ) INSERT INTO target_table SELECT * FROM cte_2 ON CONFLICT(col_1) DO UPDATE SET col_2 = EXCLUDED.col_2 + 1 RETURNING * ) SELECT * FROM inserted_table ORDER BY 1; DEBUG: generating subplan XXX_1 for CTE inserted_table: WITH cte AS MATERIALIZED (SELECT source_table_1.col_1, source_table_1.col_2, source_table_1.col_3 FROM on_conflict.source_table_1), cte_2 AS MATERIALIZED (SELECT cte.col_1, cte.col_2 FROM cte) INSERT INTO on_conflict.target_table (col_1, col_2) SELECT col_1, col_2 FROM cte_2 ON CONFLICT(col_1) DO UPDATE SET col_2 = (excluded.col_2 OPERATOR(pg_catalog.+) 1) RETURNING target_table.col_1, target_table.col_2 DEBUG: distributed INSERT ... SELECT can only select from distributed tables DEBUG: generating subplan XXX_1 for CTE cte: SELECT col_1, col_2, col_3 FROM on_conflict.source_table_1 DEBUG: generating subplan XXX_2 for CTE cte_2: SELECT col_1, col_2 FROM (SELECT intermediate_result.col_1, intermediate_result.col_2, intermediate_result.col_3 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(col_1 integer, col_2 integer, col_3 integer)) cte DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT col_1, col_2 FROM (SELECT cte_2.col_1, cte_2.col_2 FROM (SELECT intermediate_result.col_1, intermediate_result.col_2 FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(col_1 integer, col_2 integer)) cte_2) citus_insert_select_subquery DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT col_1, col_2 FROM (SELECT intermediate_result.col_1, intermediate_result.col_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(col_1 integer, col_2 integer)) inserted_table ORDER BY col_1 DEBUG: Collecting INSERT ... SELECT results on coordinator col_1 | col_2 --------------------------------------------------------------------- 1 | 2 2 | 3 3 | 4 4 | 5 5 | 6 (5 rows) WITH cte AS MATERIALIZED ( WITH basic AS MATERIALIZED ( SELECT col_1, col_2 FROM source_table_1 ) INSERT INTO target_table (SELECT * FROM basic) ON CONFLICT DO NOTHING RETURNING * ) UPDATE target_table SET col_2 = 4 WHERE col_1 IN (SELECT col_1 FROM cte); DEBUG: generating subplan XXX_1 for CTE cte: WITH basic AS MATERIALIZED (SELECT source_table_1.col_1, source_table_1.col_2 FROM on_conflict.source_table_1) INSERT INTO on_conflict.target_table (col_1, col_2) SELECT col_1, col_2 FROM basic ON CONFLICT DO NOTHING RETURNING target_table.col_1, target_table.col_2 DEBUG: distributed INSERT ... SELECT can only select from distributed tables DEBUG: generating subplan XXX_1 for CTE basic: SELECT col_1, col_2 FROM on_conflict.source_table_1 DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT col_1, col_2 FROM (SELECT basic.col_1, basic.col_2 FROM (SELECT intermediate_result.col_1, intermediate_result.col_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(col_1 integer, col_2 integer)) basic) citus_insert_select_subquery DEBUG: Plan XXX query after replacing subqueries and CTEs: UPDATE on_conflict.target_table SET col_2 = 4 WHERE (col_1 OPERATOR(pg_catalog.=) ANY (SELECT cte.col_1 FROM (SELECT intermediate_result.col_1, intermediate_result.col_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(col_1 integer, col_2 integer)) cte)) DEBUG: Collecting INSERT ... SELECT results on coordinator RESET client_min_messages; -- Following query is supported by using repartition join for the insert/select SELECT coordinator_plan($Q$ EXPLAIN (costs off) WITH cte AS ( SELECT col_1, col_2 FROM source_table_1 ) INSERT INTO target_table SELECT source_table_1.col_1, source_table_1.col_2 FROM cte, source_table_1 WHERE cte.col_1 = source_table_1.col_1 ON CONFLICT DO NOTHING; $Q$); coordinator_plan --------------------------------------------------------------------- Custom Scan (Citus INSERT ... SELECT) INSERT/SELECT method: repartition -> Custom Scan (Citus Adaptive) Task Count: 4 (4 rows) -- Tests with foreign key to reference table CREATE TABLE test_ref_table (key int PRIMARY KEY); SELECT create_reference_table('test_ref_table'); create_reference_table --------------------------------------------------------------------- (1 row) INSERT INTO test_ref_table VALUES (1),(2),(3),(4),(5),(6),(7),(8),(9),(10); ALTER TABLE target_table ADD CONSTRAINT fkey FOREIGN KEY (col_1) REFERENCES test_ref_table(key) ON DELETE CASCADE; BEGIN; TRUNCATE test_ref_table CASCADE; NOTICE: truncate cascades to table "target_table" INSERT INTO target_table SELECT col_2, col_1 FROM source_table_1 ON CONFLICT (col_1) DO UPDATE SET col_2 = 55 RETURNING *; ERROR: insert or update on table "target_table_xxxxxxx" violates foreign key constraint "fkey_xxxxxxx" DETAIL: Key (col_1)=(X) is not present in table "test_ref_table_xxxxxxx". CONTEXT: while executing command on localhost:xxxxx ROLLBACK; BEGIN; DELETE FROM test_ref_table WHERE key > 10; WITH r AS ( INSERT INTO target_table SELECT col_2, col_1 FROM source_table_1 ON CONFLICT (col_1) DO UPDATE SET col_2 = 1 RETURNING *) SELECT * FROM r ORDER BY col_1; col_1 | col_2 --------------------------------------------------------------------- 1 | 1 2 | 1 3 | 1 4 | 1 5 | 1 (5 rows) ROLLBACK; -- Following two queries are supported since we no not modify but only select from -- the target_table after modification on test_ref_table. BEGIN; TRUNCATE test_ref_table CASCADE; NOTICE: truncate cascades to table "target_table" INSERT INTO source_table_1 SELECT col_2, col_1 FROM target_table ON CONFLICT (col_1) DO UPDATE SET col_2 = 55 RETURNING *; col_1 | col_2 | col_3 --------------------------------------------------------------------- (0 rows) ROLLBACK; BEGIN; DELETE FROM test_ref_table; INSERT INTO source_table_1 SELECT col_2, col_1 FROM target_table ON CONFLICT (col_1) DO UPDATE SET col_2 = 55 RETURNING *; col_1 | col_2 | col_3 --------------------------------------------------------------------- (0 rows) ROLLBACK; -- INSERT .. SELECT with different column types CREATE TABLE source_table_3(col_1 numeric, col_2 numeric, col_3 numeric); SELECT create_distributed_table('source_table_3','col_1'); create_distributed_table --------------------------------------------------------------------- (1 row) INSERT INTO source_table_3 VALUES(1,11,1),(2,22,2),(3,33,3),(4,44,4),(5,55,5); CREATE TABLE source_table_4(id int, arr_val text[]); SELECT create_distributed_table('source_table_4','id'); create_distributed_table --------------------------------------------------------------------- (1 row) INSERT INTO source_table_4 VALUES(1, '{"abc","cde","efg"}'), (2, '{"xyz","tvu"}'); CREATE TABLE target_table_2(id int primary key, arr_val char(10)[]); SELECT create_distributed_table('target_table_2','id'); create_distributed_table --------------------------------------------------------------------- (1 row) INSERT INTO target_table_2 VALUES(1, '{"abc","def","gyx"}'); SET client_min_messages to debug1; INSERT INTO target_table SELECT col_1, col_2 FROM source_table_3 ON CONFLICT(col_1) DO UPDATE SET col_2 = EXCLUDED.col_2; DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match DETAIL: The data type of the target table's partition column should exactly match the data type of the corresponding simple column reference in the subquery. DEBUG: performing repartitioned INSERT ... SELECT SELECT * FROM target_table ORDER BY 1; col_1 | col_2 --------------------------------------------------------------------- 1 | 11 2 | 22 3 | 33 4 | 44 5 | 55 6 | 7 7 | 8 8 | 9 9 | 10 10 | 11 (10 rows) INSERT INTO target_table_2 SELECT * FROM source_table_4 ON CONFLICT DO NOTHING; SELECT * FROM target_table_2 ORDER BY 1; id | arr_val --------------------------------------------------------------------- 1 | {"abc ","def ","gyx "} 2 | {"xyz ","tvu "} (2 rows) RESET client_min_messages; -- Test with shard_replication_factor = 2 SET citus.shard_replication_factor to 2; DROP TABLE target_table, source_table_1, source_table_2; CREATE TABLE target_table(col_1 int primary key, col_2 int); SELECT create_distributed_table('target_table','col_1'); create_distributed_table --------------------------------------------------------------------- (1 row) INSERT INTO target_table VALUES(1,2),(2,3),(3,4),(4,5),(5,6); CREATE TABLE source_table_1(col_1 int, col_2 int, col_3 int); SELECT create_distributed_table('source_table_1','col_1'); create_distributed_table --------------------------------------------------------------------- (1 row) INSERT INTO source_table_1 VALUES(1,1,1),(2,2,2),(3,3,3),(4,4,4),(5,5,5); CREATE TABLE source_table_2(col_1 int, col_2 int, col_3 int); SELECT create_distributed_table('source_table_2','col_1'); create_distributed_table --------------------------------------------------------------------- (1 row) INSERT INTO source_table_2 VALUES(6,6,6),(7,7,7),(8,8,8),(9,9,9),(10,10,10); SET client_min_messages to debug1; -- Generate series directly on the coordinator and on conflict do nothing INSERT INTO target_table (col_1, col_2) SELECT s, s FROM generate_series(1,10) s ON CONFLICT DO NOTHING; DEBUG: distributed INSERT ... SELECT can only select from distributed tables DEBUG: Collecting INSERT ... SELECT results on coordinator -- Test with multiple subqueries INSERT INTO target_table SELECT col_1, col_2 FROM ( (SELECT col_1, col_2, col_3 FROM source_table_1 LIMIT 5) UNION (SELECT col_1, col_2, col_3 FROM source_table_2 LIMIT 5) ) as foo ON CONFLICT(col_1) DO UPDATE SET col_2 = 0; DEBUG: cannot push down this subquery DETAIL: Limit clause is currently unsupported when a subquery references a column from another query DEBUG: push down of limit count: 5 DEBUG: generating subplan XXX_1 for subquery SELECT col_1, col_2, col_3 FROM on_conflict.source_table_1 LIMIT 5 DEBUG: push down of limit count: 5 DEBUG: generating subplan XXX_2 for subquery SELECT col_1, col_2, col_3 FROM on_conflict.source_table_2 LIMIT 5 DEBUG: generating subplan XXX_3 for subquery SELECT intermediate_result.col_1, intermediate_result.col_2, intermediate_result.col_3 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(col_1 integer, col_2 integer, col_3 integer) UNION SELECT intermediate_result.col_1, intermediate_result.col_2, intermediate_result.col_3 FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(col_1 integer, col_2 integer, col_3 integer) DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT col_1, col_2 FROM (SELECT intermediate_result.col_1, intermediate_result.col_2, intermediate_result.col_3 FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(col_1 integer, col_2 integer, col_3 integer)) foo DEBUG: Collecting INSERT ... SELECT results on coordinator SELECT * FROM target_table ORDER BY 1; col_1 | col_2 --------------------------------------------------------------------- 1 | 0 2 | 0 3 | 0 4 | 0 5 | 0 6 | 0 7 | 0 8 | 0 9 | 0 10 | 0 (10 rows) WITH cte AS MATERIALIZED( SELECT col_1, col_2, col_3 FROM source_table_1 ), cte_2 AS MATERIALIZED( SELECT col_1, col_2 FROM cte ) INSERT INTO target_table SELECT * FROM cte_2 ON CONFLICT(col_1) DO UPDATE SET col_2 = EXCLUDED.col_2 + 1; DEBUG: distributed INSERT ... SELECT can only select from distributed tables DEBUG: generating subplan XXX_1 for CTE cte: SELECT col_1, col_2, col_3 FROM on_conflict.source_table_1 DEBUG: generating subplan XXX_2 for CTE cte_2: SELECT col_1, col_2 FROM (SELECT intermediate_result.col_1, intermediate_result.col_2, intermediate_result.col_3 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(col_1 integer, col_2 integer, col_3 integer)) cte DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT col_1, col_2 FROM (SELECT cte_2.col_1, cte_2.col_2 FROM (SELECT intermediate_result.col_1, intermediate_result.col_2 FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(col_1 integer, col_2 integer)) cte_2) citus_insert_select_subquery DEBUG: Collecting INSERT ... SELECT results on coordinator SELECT * FROM target_table ORDER BY 1; col_1 | col_2 --------------------------------------------------------------------- 1 | 2 2 | 3 3 | 4 4 | 5 5 | 6 6 | 0 7 | 0 8 | 0 9 | 0 10 | 0 (10 rows) -- make sure that even if COPY switchover happens -- the results are correct SET citus.copy_switchover_threshold TO 1; TRUNCATE target_table; -- load some data to make sure copy commands switch over connections INSERT INTO target_table SELECT i,0 FROM generate_series(0,500)i; DEBUG: distributed INSERT ... SELECT can only select from distributed tables DEBUG: Collecting INSERT ... SELECT results on coordinator -- make sure that SELECT only uses 1 connection 1 node -- yet still COPY commands use 1 connection per co-located -- intermediate result file SET citus.max_adaptive_executor_pool_size TO 1; INSERT INTO target_table SELECT * FROM target_table LIMIT 10000 ON CONFLICT(col_1) DO UPDATE SET col_2 = EXCLUDED.col_2 + 1; DEBUG: cannot push down this subquery DETAIL: Limit clause is currently unsupported when a subquery references a column from another query DEBUG: push down of limit count: 10000 DEBUG: Collecting INSERT ... SELECT results on coordinator SELECT DISTINCT col_2 FROM target_table; col_2 --------------------------------------------------------------------- 1 (1 row) WITH cte_1 AS (INSERT INTO target_table SELECT * FROM target_table LIMIT 10000 ON CONFLICT(col_1) DO UPDATE SET col_2 = EXCLUDED.col_2 + 1 RETURNING *) SELECT DISTINCT col_2 FROM cte_1; DEBUG: generating subplan XXX_1 for CTE cte_1: INSERT INTO on_conflict.target_table (col_1, col_2) SELECT col_1, col_2 FROM on_conflict.target_table LIMIT 10000 ON CONFLICT(col_1) DO UPDATE SET col_2 = (excluded.col_2 OPERATOR(pg_catalog.+) 1) RETURNING target_table.col_1, target_table.col_2 DEBUG: cannot push down this subquery DETAIL: Limit clause is currently unsupported when a subquery references a column from another query DEBUG: push down of limit count: 10000 DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT DISTINCT col_2 FROM (SELECT intermediate_result.col_1, intermediate_result.col_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(col_1 integer, col_2 integer)) cte_1 DEBUG: Collecting INSERT ... SELECT results on coordinator col_2 --------------------------------------------------------------------- 2 (1 row) RESET client_min_messages; DROP SCHEMA on_conflict CASCADE; NOTICE: drop cascades to 8 other objects DETAIL: drop cascades to table test_ref_table drop cascades to table test_ref_table_1900012 drop cascades to table source_table_3 drop cascades to table source_table_4 drop cascades to table target_table_2 drop cascades to table target_table drop cascades to table source_table_1 drop cascades to table source_table_2