-- tests behaviour of INSERT INTO ... SELECT with repartitioning CREATE SCHEMA insert_select_repartition; SET search_path TO 'insert_select_repartition'; SET citus.next_shard_id TO 4213581; SET citus.shard_replication_factor TO 1; SET citus.replication_model TO 'streaming'; -- 4 shards, hash distributed. -- Negate distribution column value. SET citus.shard_count TO 4; CREATE TABLE source_table(a int); SELECT create_distributed_table('source_table', 'a'); create_distributed_table --------------------------------------------------------------------- (1 row) INSERT INTO source_table SELECT * FROM generate_series(1, 10); CREATE TABLE target_table(a int); SELECT create_distributed_table('target_table', 'a'); create_distributed_table --------------------------------------------------------------------- (1 row) SET client_min_messages TO DEBUG2; INSERT INTO target_table SELECT -a FROM source_table; DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match DETAIL: Subquery contains an operator in the same position as the target table's partition column. HINT: Ensure the target table's partition column has a corresponding simple column reference to a distributed table's partition column in the subquery. DEBUG: Router planner cannot handle multi-shard select queries DEBUG: performing repartitioned INSERT ... SELECT DEBUG: partitioning SELECT query by column index 0 with name 'a' DEBUG: distributed statement: INSERT INTO insert_select_repartition.target_table_4213585 AS citus_table_alias (a) SELECT a FROM read_intermediate_results('{repartitioned_results_xxxxx_from_4213583_to_0,repartitioned_results_xxxxx_from_4213584_to_0}'::text[], 'binary'::citus_copy_format) intermediate_result(a integer) DEBUG: distributed statement: INSERT INTO insert_select_repartition.target_table_4213586 AS citus_table_alias (a) SELECT a FROM read_intermediate_results('{repartitioned_results_xxxxx_from_4213582_to_1}'::text[], 'binary'::citus_copy_format) intermediate_result(a integer) DEBUG: distributed statement: INSERT INTO insert_select_repartition.target_table_4213587 AS citus_table_alias (a) SELECT a FROM read_intermediate_results('{repartitioned_results_xxxxx_from_4213581_to_2,repartitioned_results_xxxxx_from_4213582_to_2,repartitioned_results_xxxxx_from_4213584_to_2}'::text[], 'binary'::citus_copy_format) intermediate_result(a integer) DEBUG: distributed statement: INSERT INTO insert_select_repartition.target_table_4213588 AS citus_table_alias (a) SELECT a FROM read_intermediate_results('{repartitioned_results_xxxxx_from_4213581_to_3}'::text[], 'binary'::citus_copy_format) intermediate_result(a integer) RESET client_min_messages; SELECT * FROM target_table WHERE a=-1 OR a=-3 OR a=-7 ORDER BY a; a --------------------------------------------------------------------- -7 -3 -1 (3 rows) DROP TABLE source_table, target_table; -- -- range partitioning, composite distribution column -- CREATE TYPE composite_key_type AS (f1 int, f2 text); -- source CREATE TABLE source_table(f1 int, key composite_key_type, value int, mapped_key composite_key_type); SELECT create_distributed_table('source_table', 'key', 'range'); NOTICE: using statement-based replication DETAIL: Streaming replication is supported only for hash-distributed tables. create_distributed_table --------------------------------------------------------------------- (1 row) CALL public.create_range_partitioned_shards('source_table', '{"(0,a)","(25,a)"}','{"(24,z)","(49,z)"}'); INSERT INTO source_table VALUES (0, (0, 'a'), 1, (0, 'a')); INSERT INTO source_table VALUES (1, (1, 'b'), 2, (26, 'b')); INSERT INTO source_table VALUES (2, (2, 'c'), 3, (3, 'c')); INSERT INTO source_table VALUES (3, (4, 'd'), 4, (27, 'd')); INSERT INTO source_table VALUES (4, (30, 'e'), 5, (30, 'e')); INSERT INTO source_table VALUES (5, (31, 'f'), 6, (31, 'f')); INSERT INTO source_table VALUES (6, (32, 'g'), 50, (8, 'g')); -- target CREATE TABLE target_table(f1 int DEFAULT 0, value int, key composite_key_type PRIMARY KEY); SELECT create_distributed_table('target_table', 'key', 'range'); NOTICE: using statement-based replication DETAIL: Streaming replication is supported only for hash-distributed tables. create_distributed_table --------------------------------------------------------------------- (1 row) CALL public.create_range_partitioned_shards('target_table', '{"(0,a)","(25,a)"}','{"(24,z)","(49,z)"}'); SET client_min_messages TO DEBUG2; INSERT INTO target_table SELECT f1, value, mapped_key FROM source_table; DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match DETAIL: The target table's partition column should correspond to a partition column in the subquery. DEBUG: Router planner cannot handle multi-shard select queries DEBUG: performing repartitioned INSERT ... SELECT DEBUG: partitioning SELECT query by column index 2 with name 'key' DEBUG: distributed statement: INSERT INTO insert_select_repartition.target_table_4213591 AS citus_table_alias (f1, value, key) SELECT f1, value, key FROM read_intermediate_results('{repartitioned_results_xxxxx_from_4213589_to_0,repartitioned_results_xxxxx_from_4213590_to_0}'::text[], 'text'::citus_copy_format) intermediate_result(f1 integer, value integer, key insert_select_repartition.composite_key_type) DEBUG: distributed statement: INSERT INTO insert_select_repartition.target_table_4213592 AS citus_table_alias (f1, value, key) SELECT f1, value, key FROM read_intermediate_results('{repartitioned_results_xxxxx_from_4213589_to_1,repartitioned_results_xxxxx_from_4213590_to_1}'::text[], 'text'::citus_copy_format) intermediate_result(f1 integer, value integer, key insert_select_repartition.composite_key_type) RESET client_min_messages; SELECT * FROM target_table ORDER BY key; f1 | value | key --------------------------------------------------------------------- 0 | 1 | (0,a) 2 | 3 | (3,c) 6 | 50 | (8,g) 1 | 2 | (26,b) 3 | 4 | (27,d) 4 | 5 | (30,e) 5 | 6 | (31,f) (7 rows) SELECT * FROM target_table WHERE key = (26, 'b')::composite_key_type; f1 | value | key --------------------------------------------------------------------- 1 | 2 | (26,b) (1 row) -- with explicit column names TRUNCATE target_table; SET client_min_messages TO DEBUG2; INSERT INTO target_table(value, key) SELECT value, mapped_key FROM source_table; DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match DETAIL: The target table's partition column should correspond to a partition column in the subquery. DEBUG: Router planner cannot handle multi-shard select queries DEBUG: performing repartitioned INSERT ... SELECT DEBUG: partitioning SELECT query by column index 2 with name 'key' DEBUG: distributed statement: INSERT INTO insert_select_repartition.target_table_4213591 AS citus_table_alias (f1, value, key) SELECT f1, value, key FROM read_intermediate_results('{repartitioned_results_xxxxx_from_4213589_to_0,repartitioned_results_xxxxx_from_4213590_to_0}'::text[], 'text'::citus_copy_format) intermediate_result(f1 integer, value integer, key insert_select_repartition.composite_key_type) DEBUG: distributed statement: INSERT INTO insert_select_repartition.target_table_4213592 AS citus_table_alias (f1, value, key) SELECT f1, value, key FROM read_intermediate_results('{repartitioned_results_xxxxx_from_4213589_to_1,repartitioned_results_xxxxx_from_4213590_to_1}'::text[], 'text'::citus_copy_format) intermediate_result(f1 integer, value integer, key insert_select_repartition.composite_key_type) RESET client_min_messages; SELECT * FROM target_table ORDER BY key; f1 | value | key --------------------------------------------------------------------- 0 | 1 | (0,a) 0 | 3 | (3,c) 0 | 50 | (8,g) 0 | 2 | (26,b) 0 | 4 | (27,d) 0 | 5 | (30,e) 0 | 6 | (31,f) (7 rows) -- missing value for a column TRUNCATE target_table; SET client_min_messages TO DEBUG2; INSERT INTO target_table(key) SELECT mapped_key AS key_renamed FROM source_table; DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match DETAIL: The target table's partition column should correspond to a partition column in the subquery. DEBUG: Router planner cannot handle multi-shard select queries DEBUG: performing repartitioned INSERT ... SELECT DEBUG: partitioning SELECT query by column index 1 with name 'key' DEBUG: distributed statement: INSERT INTO insert_select_repartition.target_table_4213591 AS citus_table_alias (f1, key) SELECT f1, key FROM read_intermediate_results('{repartitioned_results_xxxxx_from_4213589_to_0,repartitioned_results_xxxxx_from_4213590_to_0}'::text[], 'text'::citus_copy_format) intermediate_result(f1 integer, key insert_select_repartition.composite_key_type) DEBUG: distributed statement: INSERT INTO insert_select_repartition.target_table_4213592 AS citus_table_alias (f1, key) SELECT f1, key FROM read_intermediate_results('{repartitioned_results_xxxxx_from_4213589_to_1,repartitioned_results_xxxxx_from_4213590_to_1}'::text[], 'text'::citus_copy_format) intermediate_result(f1 integer, key insert_select_repartition.composite_key_type) RESET client_min_messages; SELECT * FROM target_table ORDER BY key; f1 | value | key --------------------------------------------------------------------- 0 | | (0,a) 0 | | (3,c) 0 | | (8,g) 0 | | (26,b) 0 | | (27,d) 0 | | (30,e) 0 | | (31,f) (7 rows) -- ON CONFLICT SET client_min_messages TO DEBUG2; INSERT INTO target_table(key) SELECT mapped_key AS key_renamed FROM source_table WHERE (mapped_key).f1 % 2 = 1 ON CONFLICT (key) DO UPDATE SET f1=1; DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match DETAIL: The target table's partition column should correspond to a partition column in the subquery. DEBUG: Router planner cannot handle multi-shard select queries DEBUG: performing repartitioned INSERT ... SELECT DEBUG: partitioning SELECT query by column index 1 with name 'key' DEBUG: distributed statement: INSERT INTO insert_select_repartition.target_table_4213591 AS citus_table_alias (f1, key) SELECT f1, key FROM read_intermediate_results('{repartitioned_results_xxxxx_from_4213589_to_0}'::text[], 'text'::citus_copy_format) intermediate_result(f1 integer, key insert_select_repartition.composite_key_type) ON CONFLICT(key) DO UPDATE SET f1 = 1 DEBUG: distributed statement: INSERT INTO insert_select_repartition.target_table_4213592 AS citus_table_alias (f1, key) SELECT f1, key FROM read_intermediate_results('{repartitioned_results_xxxxx_from_4213589_to_1,repartitioned_results_xxxxx_from_4213590_to_1}'::text[], 'text'::citus_copy_format) intermediate_result(f1 integer, key insert_select_repartition.composite_key_type) ON CONFLICT(key) DO UPDATE SET f1 = 1 RESET client_min_messages; SELECT * FROM target_table ORDER BY key; f1 | value | key --------------------------------------------------------------------- 0 | | (0,a) 1 | | (3,c) 0 | | (8,g) 0 | | (26,b) 1 | | (27,d) 0 | | (30,e) 1 | | (31,f) (7 rows) -- missing value for distribution column INSERT INTO target_table(value) SELECT value FROM source_table; ERROR: the partition column of table insert_select_repartition.target_table should have a value DROP TABLE source_table, target_table; -- different column types -- verifies that we add necessary casts, otherwise even shard routing won't -- work correctly and we will see 2 values for the same primary key. CREATE TABLE target_table(col_1 int primary key, col_2 int); SELECT create_distributed_table('target_table','col_1'); create_distributed_table --------------------------------------------------------------------- (1 row) INSERT INTO target_table VALUES (1,2), (2,3), (3,4), (4,5), (5,6); CREATE TABLE source_table(col_1 numeric, col_2 numeric, col_3 numeric); SELECT create_distributed_table('source_table','col_1'); create_distributed_table --------------------------------------------------------------------- (1 row) INSERT INTO source_table VALUES (1,1,1), (3,3,3), (5,5,5); SET client_min_messages TO DEBUG2; INSERT INTO target_table SELECT col_1, col_2 FROM source_table ON CONFLICT(col_1) DO UPDATE SET col_2 = EXCLUDED.col_2; DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match DETAIL: The data type of the target table's partition column should exactly match the data type of the corresponding simple column reference in the subquery. DEBUG: Router planner cannot handle multi-shard select queries DEBUG: performing repartitioned INSERT ... SELECT DEBUG: partitioning SELECT query by column index 0 with name 'col_1' DEBUG: distributed statement: INSERT INTO insert_select_repartition.target_table_4213593 AS citus_table_alias (col_1, col_2) SELECT col_1, col_2 FROM read_intermediate_results('{repartitioned_results_xxxxx_from_4213597_to_0,repartitioned_results_xxxxx_from_4213600_to_0}'::text[], 'binary'::citus_copy_format) intermediate_result(col_1 integer, col_2 integer) ON CONFLICT(col_1) DO UPDATE SET col_2 = excluded.col_2 DEBUG: distributed statement: INSERT INTO insert_select_repartition.target_table_4213594 AS citus_table_alias (col_1, col_2) SELECT col_1, col_2 FROM read_intermediate_results('{repartitioned_results_xxxxx_from_4213599_to_1}'::text[], 'binary'::citus_copy_format) intermediate_result(col_1 integer, col_2 integer) ON CONFLICT(col_1) DO UPDATE SET col_2 = excluded.col_2 RESET client_min_messages; SELECT * FROM target_table ORDER BY 1; col_1 | col_2 --------------------------------------------------------------------- 1 | 1 2 | 3 3 | 3 4 | 5 5 | 5 (5 rows) DROP TABLE source_table, target_table; -- -- array coercion -- SET citus.shard_count TO 3; CREATE TABLE source_table(a int, mapped_key int, c float[]); SELECT create_distributed_table('source_table', 'a'); create_distributed_table --------------------------------------------------------------------- (1 row) INSERT INTO source_table VALUES (1, -1, ARRAY[1.1, 2.2, 3.3]), (2, -2, ARRAY[4.5, 5.8]), (3, -3, ARRAY[]::float[]), (4, -4, ARRAY[3.3]); SET citus.shard_count TO 2; CREATE TABLE target_table(a int, b int[]); SELECT create_distributed_table('target_table', 'a'); create_distributed_table --------------------------------------------------------------------- (1 row) SET client_min_messages TO DEBUG1; INSERT INTO target_table SELECT mapped_key, c FROM source_table; DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match DETAIL: The target table's partition column should correspond to a partition column in the subquery. DEBUG: performing repartitioned INSERT ... SELECT RESET client_min_messages; SELECT * FROM target_table ORDER BY a; a | b --------------------------------------------------------------------- -4 | {3} -3 | {} -2 | {4,6} -1 | {1,2,3} (4 rows) -- -- worker queries can have more columns than necessary. ExpandWorkerTargetEntry() -- might add additional columns to the target list. -- TRUNCATE target_table; \set VERBOSITY TERSE -- first verify that the SELECT query below fetches 3 projected columns from workers SET citus.log_remote_commands TO true; SET client_min_messages TO DEBUG; CREATE TABLE results AS SELECT max(-a), array_agg(mapped_key) FROM source_table GROUP BY a; DEBUG: Router planner cannot handle multi-shard select queries DEBUG: building index "pg_toast_xxxxx_index" on table "pg_toast_xxxxx" serially NOTICE: issuing SELECT max((OPERATOR(pg_catalog.-) a)) AS max, array_agg(mapped_key) AS array_agg, a AS worker_column_3 FROM insert_select_repartition.source_table_4213601 source_table WHERE true GROUP BY a NOTICE: issuing SELECT max((OPERATOR(pg_catalog.-) a)) AS max, array_agg(mapped_key) AS array_agg, a AS worker_column_3 FROM insert_select_repartition.source_table_4213602 source_table WHERE true GROUP BY a NOTICE: issuing SELECT max((OPERATOR(pg_catalog.-) a)) AS max, array_agg(mapped_key) AS array_agg, a AS worker_column_3 FROM insert_select_repartition.source_table_4213603 source_table WHERE true GROUP BY a RESET citus.log_remote_commands; RESET client_min_messages; DROP TABLE results; -- now verify that we don't write the extra columns to the intermediate result files and -- insertion to the target works fine. SET client_min_messages TO DEBUG1; INSERT INTO target_table SELECT max(-a), array_agg(mapped_key) FROM source_table GROUP BY a; DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match DEBUG: performing repartitioned INSERT ... SELECT RESET client_min_messages; SELECT * FROM target_table ORDER BY a; a | b --------------------------------------------------------------------- -4 | {-4} -3 | {-3} -2 | {-2} -1 | {-1} (4 rows) -- -- repartitioned INSERT/SELECT followed/preceded by other DML in same transaction -- -- case 1. followed by DELETE TRUNCATE target_table; BEGIN; INSERT INTO target_table SELECT mapped_key, c FROM source_table; SELECT * FROM target_table ORDER BY a; a | b --------------------------------------------------------------------- -4 | {3} -3 | {} -2 | {4,6} -1 | {1,2,3} (4 rows) DELETE FROM target_table; END; SELECT * FROM target_table ORDER BY a; a | b --------------------------------------------------------------------- (0 rows) -- case 2. followed by UPDATE TRUNCATE target_table; BEGIN; INSERT INTO target_table SELECT mapped_key, c FROM source_table; SELECT * FROM target_table ORDER BY a; a | b --------------------------------------------------------------------- -4 | {3} -3 | {} -2 | {4,6} -1 | {1,2,3} (4 rows) UPDATE target_table SET b=array_append(b, a); END; SELECT * FROM target_table ORDER BY a; a | b --------------------------------------------------------------------- -4 | {3,-4} -3 | {-3} -2 | {4,6,-2} -1 | {1,2,3,-1} (4 rows) -- case 3. followed by multi-row INSERT TRUNCATE target_table; BEGIN; INSERT INTO target_table SELECT mapped_key, c FROM source_table; SELECT * FROM target_table ORDER BY a; a | b --------------------------------------------------------------------- -4 | {3} -3 | {} -2 | {4,6} -1 | {1,2,3} (4 rows) INSERT INTO target_table VALUES (-5, ARRAY[10,11]), (-6, ARRAY[11,12]), (-7, ARRAY[999]); END; SELECT * FROM target_table ORDER BY a; a | b --------------------------------------------------------------------- -7 | {999} -6 | {11,12} -5 | {10,11} -4 | {3} -3 | {} -2 | {4,6} -1 | {1,2,3} (7 rows) -- case 4. followed by distributed INSERT/SELECT TRUNCATE target_table; BEGIN; INSERT INTO target_table SELECT mapped_key, c FROM source_table; SELECT * FROM target_table ORDER BY a; a | b --------------------------------------------------------------------- -4 | {3} -3 | {} -2 | {4,6} -1 | {1,2,3} (4 rows) INSERT INTO target_table SELECT * FROM target_table; END; SELECT * FROM target_table ORDER BY a; a | b --------------------------------------------------------------------- -4 | {3} -4 | {3} -3 | {} -3 | {} -2 | {4,6} -2 | {4,6} -1 | {1,2,3} -1 | {1,2,3} (8 rows) -- case 5. preceded by DELETE TRUNCATE target_table; BEGIN; DELETE FROM target_table; INSERT INTO target_table SELECT mapped_key, c FROM source_table; END; SELECT * FROM target_table ORDER BY a; a | b --------------------------------------------------------------------- -4 | {3} -3 | {} -2 | {4,6} -1 | {1,2,3} (4 rows) -- case 6. preceded by UPDATE TRUNCATE target_table; BEGIN; UPDATE target_table SET b=array_append(b, a); INSERT INTO target_table SELECT mapped_key, c FROM source_table; END; SELECT * FROM target_table ORDER BY a; a | b --------------------------------------------------------------------- -4 | {3} -3 | {} -2 | {4,6} -1 | {1,2,3} (4 rows) -- case 7. preceded by multi-row INSERT TRUNCATE target_table; BEGIN; INSERT INTO target_table VALUES (-5, ARRAY[10,11]), (-6, ARRAY[11,12]), (-7, ARRAY[999]); INSERT INTO target_table SELECT mapped_key, c FROM source_table; END; SELECT * FROM target_table ORDER BY a; a | b --------------------------------------------------------------------- -7 | {999} -6 | {11,12} -5 | {10,11} -4 | {3} -3 | {} -2 | {4,6} -1 | {1,2,3} (7 rows) -- case 8. preceded by distributed INSERT/SELECT TRUNCATE target_table; INSERT INTO target_table SELECT mapped_key, c FROM source_table; BEGIN; INSERT INTO target_table SELECT * FROM target_table; INSERT INTO target_table SELECT mapped_key, c FROM source_table; END; SELECT * FROM target_table ORDER BY a; a | b --------------------------------------------------------------------- -4 | {3} -4 | {3} -4 | {3} -3 | {} -3 | {} -3 | {} -2 | {4,6} -2 | {4,6} -2 | {4,6} -1 | {1,2,3} -1 | {1,2,3} -1 | {1,2,3} (12 rows) -- -- repartitioned INSERT/SELECT with RETURNING -- TRUNCATE target_table; SET client_min_messages TO DEBUG1; WITH c AS ( INSERT INTO target_table SELECT mapped_key, c FROM source_table RETURNING *) SELECT * FROM c ORDER by a; DEBUG: generating subplan XXX_1 for CTE c: INSERT INTO insert_select_repartition.target_table (a, b) SELECT mapped_key, c FROM insert_select_repartition.source_table RETURNING target_table.a, target_table.b DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT a, b FROM (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer[])) c ORDER BY a DEBUG: performing repartitioned INSERT ... SELECT a | b --------------------------------------------------------------------- -4 | {3} -3 | {} -2 | {4,6} -1 | {1,2,3} (4 rows) RESET client_min_messages; -- -- in combination with CTEs -- TRUNCATE target_table; SET client_min_messages TO DEBUG1; WITH t AS ( SELECT mapped_key, a, c FROM source_table WHERE a > floor(random()) ) INSERT INTO target_table SELECT mapped_key, c FROM t NATURAL JOIN source_table; DEBUG: volatile functions are not allowed in distributed INSERT ... SELECT queries DEBUG: generating subplan XXX_1 for CTE t: SELECT mapped_key, a, c FROM insert_select_repartition.source_table WHERE ((a)::double precision OPERATOR(pg_catalog.>) floor(random())) DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT mapped_key AS a, (c)::integer[] AS b FROM (SELECT t.mapped_key, t.c FROM ((SELECT intermediate_result.mapped_key, intermediate_result.a, intermediate_result.c FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(mapped_key integer, a integer, c double precision[])) t JOIN insert_select_repartition.source_table USING (mapped_key, a, c))) citus_insert_select_subquery DEBUG: performing repartitioned INSERT ... SELECT RESET client_min_messages; SELECT * FROM target_table ORDER BY a; a | b --------------------------------------------------------------------- -4 | {3} -3 | {} -2 | {4,6} -1 | {1,2,3} (4 rows) DROP TABLE source_table, target_table; -- -- The case where select query has a GROUP BY ... -- SET citus.shard_count TO 4; CREATE TABLE source_table(a int, b int); SELECT create_distributed_table('source_table', 'a'); create_distributed_table --------------------------------------------------------------------- (1 row) SET citus.shard_count TO 3; CREATE TABLE target_table(a int, b int); SELECT create_distributed_table('target_table', 'a'); create_distributed_table --------------------------------------------------------------------- (1 row) INSERT INTO source_table SELECT floor(i/4), i*i FROM generate_series(1, 20) i; SET client_min_messages TO DEBUG1; INSERT INTO target_table SELECT a, max(b) FROM source_table GROUP BY a; DEBUG: INSERT target table and the source relation of the SELECT partition column value must be colocated in distributed INSERT ... SELECT DEBUG: performing repartitioned INSERT ... SELECT RESET client_min_messages; SELECT * FROM target_table ORDER BY a; a | b --------------------------------------------------------------------- 0 | 9 1 | 49 2 | 121 3 | 225 4 | 361 5 | 400 (6 rows) -- -- EXPLAIN output should specify repartitioned INSERT/SELECT -- EXPLAIN INSERT INTO target_table SELECT a, max(b) FROM source_table GROUP BY a; QUERY PLAN --------------------------------------------------------------------- Custom Scan (Citus INSERT ... SELECT) (cost=0.00..0.00 rows=0 width=0) INSERT/SELECT method: repartition -> Custom Scan (Citus Adaptive) (cost=0.00..0.00 rows=0 width=0) Task Count: 4 Tasks Shown: One of 4 -> Task Node: host=localhost port=xxxxx dbname=regression -> HashAggregate (cost=43.90..45.90 rows=200 width=8) Group Key: a -> Seq Scan on source_table_4213606 source_table (cost=0.00..32.60 rows=2260 width=8) (10 rows) -- -- Duplicate names in target list -- TRUNCATE target_table; SET client_min_messages TO DEBUG2; INSERT INTO target_table SELECT max(b), max(b) FROM source_table GROUP BY a; DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match DEBUG: Router planner cannot handle multi-shard select queries DEBUG: performing repartitioned INSERT ... SELECT DEBUG: partitioning SELECT query by column index 0 with name 'a' DEBUG: distributed statement: INSERT INTO insert_select_repartition.target_table_4213610 AS citus_table_alias (a, b) SELECT a, b FROM read_intermediate_results('{repartitioned_results_xxxxx_from_4213606_to_0,repartitioned_results_xxxxx_from_4213607_to_0}'::text[], 'binary'::citus_copy_format) intermediate_result(a integer, b integer) DEBUG: distributed statement: INSERT INTO insert_select_repartition.target_table_4213611 AS citus_table_alias (a, b) SELECT a, b FROM read_intermediate_results('{repartitioned_results_xxxxx_from_4213607_to_1,repartitioned_results_xxxxx_from_4213609_to_1}'::text[], 'binary'::citus_copy_format) intermediate_result(a integer, b integer) DEBUG: distributed statement: INSERT INTO insert_select_repartition.target_table_4213612 AS citus_table_alias (a, b) SELECT a, b FROM read_intermediate_results('{repartitioned_results_xxxxx_from_4213606_to_2,repartitioned_results_xxxxx_from_4213607_to_2}'::text[], 'binary'::citus_copy_format) intermediate_result(a integer, b integer) RESET client_min_messages; SELECT * FROM target_table ORDER BY a; a | b --------------------------------------------------------------------- 9 | 9 49 | 49 121 | 121 225 | 225 361 | 361 400 | 400 (6 rows) -- -- Prepared INSERT/SELECT -- TRUNCATE target_table; PREPARE insert_plan(int, int) AS INSERT INTO target_table SELECT a, max(b) FROM source_table WHERE a BETWEEN $1 AND $2 GROUP BY a; SET client_min_messages TO DEBUG1; EXECUTE insert_plan(0, 2); DEBUG: INSERT target table and the source relation of the SELECT partition column value must be colocated in distributed INSERT ... SELECT DEBUG: performing repartitioned INSERT ... SELECT EXECUTE insert_plan(0, 2); DEBUG: INSERT target table and the source relation of the SELECT partition column value must be colocated in distributed INSERT ... SELECT DEBUG: performing repartitioned INSERT ... SELECT EXECUTE insert_plan(0, 2); DEBUG: INSERT target table and the source relation of the SELECT partition column value must be colocated in distributed INSERT ... SELECT DEBUG: performing repartitioned INSERT ... SELECT EXECUTE insert_plan(0, 2); DEBUG: INSERT target table and the source relation of the SELECT partition column value must be colocated in distributed INSERT ... SELECT DEBUG: performing repartitioned INSERT ... SELECT EXECUTE insert_plan(0, 2); DEBUG: INSERT target table and the source relation of the SELECT partition column value must be colocated in distributed INSERT ... SELECT DEBUG: performing repartitioned INSERT ... SELECT EXECUTE insert_plan(0, 2); DEBUG: INSERT target table and the source relation of the SELECT partition column value must be colocated in distributed INSERT ... SELECT DEBUG: performing repartitioned INSERT ... SELECT EXECUTE insert_plan(2, 4); DEBUG: INSERT target table and the source relation of the SELECT partition column value must be colocated in distributed INSERT ... SELECT DEBUG: performing repartitioned INSERT ... SELECT EXECUTE insert_plan(2, 4); DEBUG: INSERT target table and the source relation of the SELECT partition column value must be colocated in distributed INSERT ... SELECT DEBUG: performing repartitioned INSERT ... SELECT EXECUTE insert_plan(2, 4); DEBUG: INSERT target table and the source relation of the SELECT partition column value must be colocated in distributed INSERT ... SELECT DEBUG: performing repartitioned INSERT ... SELECT EXECUTE insert_plan(2, 4); DEBUG: INSERT target table and the source relation of the SELECT partition column value must be colocated in distributed INSERT ... SELECT DEBUG: performing repartitioned INSERT ... SELECT EXECUTE insert_plan(2, 4); DEBUG: INSERT target table and the source relation of the SELECT partition column value must be colocated in distributed INSERT ... SELECT DEBUG: performing repartitioned INSERT ... SELECT EXECUTE insert_plan(2, 4); DEBUG: INSERT target table and the source relation of the SELECT partition column value must be colocated in distributed INSERT ... SELECT DEBUG: performing repartitioned INSERT ... SELECT RESET client_min_messages; SELECT a, count(*), count(distinct b) distinct_values FROM target_table GROUP BY a ORDER BY a; a | count | distinct_values --------------------------------------------------------------------- 0 | 6 | 1 1 | 6 | 1 2 | 12 | 1 3 | 6 | 1 4 | 6 | 1 (5 rows) -- -- INSERT/SELECT in CTE -- TRUNCATE target_table; SET client_min_messages TO DEBUG2; WITH r AS ( INSERT INTO target_table SELECT * FROM source_table RETURNING * ) INSERT INTO target_table SELECT source_table.a, max(source_table.b) FROM source_table NATURAL JOIN r GROUP BY source_table.a; DEBUG: INSERT target table and the source relation of the SELECT partition column value must be colocated in distributed INSERT ... SELECT DEBUG: data-modifying statements are not supported in the WITH clauses of distributed queries DEBUG: generating subplan XXX_1 for CTE r: INSERT INTO insert_select_repartition.target_table (a, b) SELECT a, b FROM insert_select_repartition.source_table RETURNING target_table.a, target_table.b DEBUG: INSERT target table and the source relation of the SELECT partition column value must be colocated in distributed INSERT ... SELECT DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT a, max AS b FROM (SELECT source_table.a, max(source_table.b) AS max FROM (insert_select_repartition.source_table JOIN (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) r USING (a, b)) GROUP BY source_table.a) citus_insert_select_subquery DEBUG: Router planner cannot handle multi-shard select queries DEBUG: performing repartitioned INSERT ... SELECT DEBUG: Router planner cannot handle multi-shard select queries DEBUG: performing repartitioned INSERT ... SELECT DEBUG: partitioning SELECT query by column index 0 with name 'a' DEBUG: distributed statement: INSERT INTO insert_select_repartition.target_table_4213610 AS citus_table_alias (a, b) SELECT a, b FROM read_intermediate_results('{repartitioned_results_xxxxx_from_4213606_to_0,repartitioned_results_xxxxx_from_4213607_to_0}'::text[], 'binary'::citus_copy_format) intermediate_result(a integer, b integer) RETURNING citus_table_alias.a, citus_table_alias.b DEBUG: distributed statement: INSERT INTO insert_select_repartition.target_table_4213611 AS citus_table_alias (a, b) SELECT a, b FROM read_intermediate_results('{repartitioned_results_xxxxx_from_4213607_to_1}'::text[], 'binary'::citus_copy_format) intermediate_result(a integer, b integer) RETURNING citus_table_alias.a, citus_table_alias.b DEBUG: distributed statement: INSERT INTO insert_select_repartition.target_table_4213612 AS citus_table_alias (a, b) SELECT a, b FROM read_intermediate_results('{repartitioned_results_xxxxx_from_4213609_to_2}'::text[], 'binary'::citus_copy_format) intermediate_result(a integer, b integer) RETURNING citus_table_alias.a, citus_table_alias.b DEBUG: partitioning SELECT query by column index 0 with name 'a' DEBUG: distributed statement: INSERT INTO insert_select_repartition.target_table_4213610 AS citus_table_alias (a, b) SELECT a, b FROM read_intermediate_results('{repartitioned_results_xxxxx_from_4213606_to_0,repartitioned_results_xxxxx_from_4213607_to_0}'::text[], 'binary'::citus_copy_format) intermediate_result(a integer, b integer) DEBUG: distributed statement: INSERT INTO insert_select_repartition.target_table_4213611 AS citus_table_alias (a, b) SELECT a, b FROM read_intermediate_results('{repartitioned_results_xxxxx_from_4213607_to_1}'::text[], 'binary'::citus_copy_format) intermediate_result(a integer, b integer) DEBUG: distributed statement: INSERT INTO insert_select_repartition.target_table_4213612 AS citus_table_alias (a, b) SELECT a, b FROM read_intermediate_results('{repartitioned_results_xxxxx_from_4213609_to_2}'::text[], 'binary'::citus_copy_format) intermediate_result(a integer, b integer) RESET client_min_messages; SELECT * FROM target_table ORDER BY a, b; a | b --------------------------------------------------------------------- 0 | 1 0 | 4 0 | 9 0 | 9 1 | 16 1 | 25 1 | 36 1 | 49 1 | 49 2 | 64 2 | 81 2 | 100 2 | 121 2 | 121 3 | 144 3 | 169 3 | 196 3 | 225 3 | 225 4 | 256 4 | 289 4 | 324 4 | 361 4 | 361 5 | 400 5 | 400 (26 rows) DROP TABLE source_table, target_table; -- -- Constraint failure and rollback -- SET citus.shard_count TO 4; CREATE TABLE source_table(a int, b int); SELECT create_distributed_table('source_table', 'a'); create_distributed_table --------------------------------------------------------------------- (1 row) INSERT INTO source_table SELECT i, i * i FROM generate_series(1, 10) i; UPDATE source_table SET b = NULL where b IN (9, 4); SET citus.shard_replication_factor TO 2; CREATE TABLE target_table(a int, b int not null); SELECT create_distributed_table('target_table', 'a', 'range'); NOTICE: using statement-based replication create_distributed_table --------------------------------------------------------------------- (1 row) CALL public.create_range_partitioned_shards('target_table', '{0,3,6,9}','{2,5,8,50}'); INSERT INTO target_table VALUES (11,9), (22,4); EXPLAIN (costs off) INSERT INTO target_table SELECT * FROM source_table; QUERY PLAN --------------------------------------------------------------------- Custom Scan (Citus INSERT ... SELECT) INSERT/SELECT method: repartition -> Custom Scan (Citus Adaptive) Task Count: 4 Tasks Shown: One of 4 -> Task Node: host=localhost port=xxxxx dbname=regression -> Seq Scan on source_table_4213613 source_table (8 rows) EXPLAIN (costs off) INSERT INTO target_table SELECT * FROM source_table WHERE b IS NOT NULL; QUERY PLAN --------------------------------------------------------------------- Custom Scan (Citus INSERT ... SELECT) INSERT/SELECT method: repartition -> Custom Scan (Citus Adaptive) Task Count: 4 Tasks Shown: One of 4 -> Task Node: host=localhost port=xxxxx dbname=regression -> Seq Scan on source_table_4213613 source_table Filter: (b IS NOT NULL) (9 rows) BEGIN; SAVEPOINT s1; INSERT INTO target_table SELECT * FROM source_table; ERROR: null value in column "b" violates not-null constraint ROLLBACK TO SAVEPOINT s1; INSERT INTO target_table SELECT * FROM source_table WHERE b IS NOT NULL; END; SELECT * FROM target_table ORDER BY b; a | b --------------------------------------------------------------------- 1 | 1 22 | 4 11 | 9 4 | 16 5 | 25 6 | 36 7 | 49 8 | 64 9 | 81 10 | 100 (10 rows) -- verify that values have been replicated to both replicas SELECT * FROM run_command_on_placements('target_table', 'select count(*) from %s') ORDER BY shardid, nodeport; nodename | nodeport | shardid | success | result --------------------------------------------------------------------- localhost | 57637 | 4213617 | t | 1 localhost | 57638 | 4213617 | t | 1 localhost | 57637 | 4213618 | t | 2 localhost | 57638 | 4213618 | t | 2 localhost | 57637 | 4213619 | t | 3 localhost | 57638 | 4213619 | t | 3 localhost | 57637 | 4213620 | t | 4 localhost | 57638 | 4213620 | t | 4 (8 rows) -- -- Multiple casts in the SELECT query -- TRUNCATE target_table; SET client_min_messages TO DEBUG2; INSERT INTO target_table SELECT 1.12, b::bigint FROM source_table WHERE b IS NOT NULL; DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match DEBUG: Router planner cannot handle multi-shard select queries DEBUG: performing repartitioned INSERT ... SELECT DEBUG: partitioning SELECT query by column index 0 with name 'a' DEBUG: distributed statement: INSERT INTO insert_select_repartition.target_table_4213617 AS citus_table_alias (a, b) SELECT a, b FROM read_intermediate_results('{repartitioned_results_xxxxx_from_4213613_to_0,repartitioned_results_xxxxx_from_4213614_to_0,repartitioned_results_xxxxx_from_4213615_to_0,repartitioned_results_xxxxx_from_4213616_to_0}'::text[], 'binary'::citus_copy_format) intermediate_result(a integer, b integer) RESET client_min_messages; SELECT * FROM target_table ORDER BY a, b; a | b --------------------------------------------------------------------- 1 | 1 1 | 16 1 | 25 1 | 36 1 | 49 1 | 64 1 | 81 1 | 100 (8 rows) -- -- ROLLBACK after out of range error -- TRUNCATE target_table; BEGIN; INSERT INTO target_table SELECT a * 10, b FROM source_table WHERE b IS NOT NULL; ERROR: could not find shard for partition column value END; SELECT max(result) FROM run_command_on_placements('target_table', 'select count(*) from %s'); max --------------------------------------------------------------------- 0 (1 row) DROP TABLE source_table, target_table; -- -- Range partitioned target's ranges doesn't cover the whole range -- SET citus.shard_replication_factor TO 2; SET citus.replication_model TO 'statement'; SET citus.shard_count TO 4; CREATE TABLE source_table(a int, b int); SELECT create_distributed_table('source_table', 'a'); create_distributed_table --------------------------------------------------------------------- (1 row) INSERT INTO source_table SELECT i, i * i FROM generate_series(1, 10) i; SET citus.shard_replication_factor TO 2; CREATE TABLE target_table(b int not null, a float); SELECT create_distributed_table('target_table', 'a', 'range'); create_distributed_table --------------------------------------------------------------------- (1 row) CALL public.create_range_partitioned_shards('target_table', '{0.0,3.5,6.5,9.5}','{2.9,5.9,8.9,50.0}'); INSERT INTO target_table SELECT b, a+0.6 FROM source_table; SELECT * FROM target_table ORDER BY a; b | a --------------------------------------------------------------------- 1 | 1.6 4 | 2.6 9 | 3.6 16 | 4.6 25 | 5.6 36 | 6.6 49 | 7.6 64 | 8.6 81 | 9.6 100 | 10.6 (10 rows) -- verify that values have been replicated to both replicas, and that each -- replica has received correct number of rows SELECT * FROM run_command_on_placements('target_table', 'select count(*) from %s') ORDER BY shardid, nodeport; nodename | nodeport | shardid | success | result --------------------------------------------------------------------- localhost | 57637 | 4213625 | t | 2 localhost | 57638 | 4213625 | t | 2 localhost | 57637 | 4213626 | t | 3 localhost | 57638 | 4213626 | t | 3 localhost | 57637 | 4213627 | t | 3 localhost | 57638 | 4213627 | t | 3 localhost | 57637 | 4213628 | t | 2 localhost | 57638 | 4213628 | t | 2 (8 rows) DROP TABLE source_table, target_table; -- -- Select column names should be unique -- SET citus.shard_replication_factor TO 1; SET citus.shard_count TO 4; CREATE TABLE source_table(a int, b int); SELECT create_distributed_table('source_table', 'a'); create_distributed_table --------------------------------------------------------------------- (1 row) SET citus.shard_count TO 3; CREATE TABLE target_table(a int, b int, c int, d int, e int, f int); SELECT create_distributed_table('target_table', 'a'); create_distributed_table --------------------------------------------------------------------- (1 row) INSERT INTO source_table SELECT i, i * i FROM generate_series(1, 10) i; SET client_min_messages TO DEBUG2; INSERT INTO target_table SELECT a AS aa, b AS aa, 1 AS aa, 2 AS aa FROM source_table; DEBUG: INSERT target table and the source relation of the SELECT partition column value must be colocated in distributed INSERT ... SELECT DEBUG: Router planner cannot handle multi-shard select queries DEBUG: performing repartitioned INSERT ... SELECT DEBUG: partitioning SELECT query by column index 0 with name 'a' DEBUG: distributed statement: INSERT INTO insert_select_repartition.target_table_4213633 AS citus_table_alias (a, b, c, d) SELECT a, b, c, d FROM read_intermediate_results('{repartitioned_results_xxxxx_from_4213629_to_0,repartitioned_results_xxxxx_from_4213630_to_0}'::text[], 'binary'::citus_copy_format) intermediate_result(a integer, b integer, c integer, d integer) DEBUG: distributed statement: INSERT INTO insert_select_repartition.target_table_4213634 AS citus_table_alias (a, b, c, d) SELECT a, b, c, d FROM read_intermediate_results('{repartitioned_results_xxxxx_from_4213630_to_1,repartitioned_results_xxxxx_from_4213631_to_1}'::text[], 'binary'::citus_copy_format) intermediate_result(a integer, b integer, c integer, d integer) DEBUG: distributed statement: INSERT INTO insert_select_repartition.target_table_4213635 AS citus_table_alias (a, b, c, d) SELECT a, b, c, d FROM read_intermediate_results('{repartitioned_results_xxxxx_from_4213632_to_2}'::text[], 'binary'::citus_copy_format) intermediate_result(a integer, b integer, c integer, d integer) RESET client_min_messages; SELECT count(*) FROM target_table; count --------------------------------------------------------------------- 10 (1 row) -- -- Disable repartitioned insert/select -- TRUNCATE target_table; SET citus.enable_repartitioned_insert_select TO OFF; EXPLAIN (costs off) INSERT INTO target_table SELECT a AS aa, b AS aa, 1 AS aa, 2 AS aa FROM source_table; QUERY PLAN --------------------------------------------------------------------- Custom Scan (Citus INSERT ... SELECT) INSERT/SELECT method: pull to coordinator -> Custom Scan (Citus Adaptive) Task Count: 4 Tasks Shown: One of 4 -> Task Node: host=localhost port=xxxxx dbname=regression -> Seq Scan on source_table_4213629 source_table (8 rows) SET client_min_messages TO DEBUG2; INSERT INTO target_table SELECT a AS aa, b AS aa, 1 AS aa, 2 AS aa FROM source_table; DEBUG: INSERT target table and the source relation of the SELECT partition column value must be colocated in distributed INSERT ... SELECT DEBUG: Router planner cannot handle multi-shard select queries DEBUG: Collecting INSERT ... SELECT results on coordinator RESET client_min_messages; SELECT count(*) FROM target_table; count --------------------------------------------------------------------- 10 (1 row) SET citus.enable_repartitioned_insert_select TO ON; EXPLAIN (costs off) INSERT INTO target_table SELECT a AS aa, b AS aa, 1 AS aa, 2 AS aa FROM source_table; QUERY PLAN --------------------------------------------------------------------- Custom Scan (Citus INSERT ... SELECT) INSERT/SELECT method: repartition -> Custom Scan (Citus Adaptive) Task Count: 4 Tasks Shown: One of 4 -> Task Node: host=localhost port=xxxxx dbname=regression -> Seq Scan on source_table_4213629 source_table (8 rows) DROP TABLE source_table, target_table; SET client_min_messages TO WARNING; DROP SCHEMA insert_select_repartition CASCADE;