diff --git a/src/backend/distributed/planner/insert_select_planner.c b/src/backend/distributed/planner/insert_select_planner.c index 655f7ebcb..2eecb6e9f 100644 --- a/src/backend/distributed/planner/insert_select_planner.c +++ b/src/backend/distributed/planner/insert_select_planner.c @@ -1665,6 +1665,13 @@ AddInsertSelectCasts(List *insertTargetList, List *selectTargetList, List *projectedEntries = NIL; List *nonProjectedEntries = NIL; + /* + * ReorderInsertSelectTargetLists() makes sure that the first few columns + * of the SELECT query match the insert targets. The SELECT target list + * might contain additional items for GROUP BY, etc. + */ + Assert(list_length(insertTargetList) <= list_length(selectTargetList)); + Relation distributedRelation = table_open(targetRelationId, RowExclusiveLock); TupleDesc destTupleDescriptor = RelationGetDescr(distributedRelation); diff --git a/src/test/regress/expected/insert_select_into_local_table.out b/src/test/regress/expected/insert_select_into_local_table.out index 4630e5cc8..5065cbb62 100644 --- a/src/test/regress/expected/insert_select_into_local_table.out +++ b/src/test/regress/expected/insert_select_into_local_table.out @@ -149,6 +149,67 @@ SELECT * FROM non_dist_unique ORDER BY 1; 5 | 8 (5 rows) +INSERT INTO non_dist_unique +SELECT a+1, b FROM dist_table +UNION ALL +SELECT a+100, b FROM dist_table +ON CONFLICT (a) DO NOTHING; +SELECT * FROM non_dist_unique ORDER BY 1; + a | b +--------------------------------------------------------------------- + 1 | 6 + 2 | 7 + 3 | 14 + 4 | 15 + 5 | 8 + 101 | 6 + 102 | 7 + 103 | 8 +(8 rows) + +INSERT INTO non_dist_unique +SELECT a+1, b FROM dist_table +UNION ALL +SELECT a+100, b FROM dist_table +ON CONFLICT (a) DO UPDATE SET b = EXCLUDED.b + 1; +SELECT * FROM non_dist_unique ORDER BY 1; + a | b +--------------------------------------------------------------------- + 1 | 6 + 2 | 7 + 3 | 8 + 4 | 9 + 5 | 8 + 101 | 7 + 102 | 8 + 103 | 9 +(8 rows) + +WITH cte1 AS (SELECT s FROM generate_series(1,10) s) +INSERT 
INTO non_dist_unique +WITH cte2 AS (SELECT s FROM generate_series(1,10) s) +SELECT a+1, b FROM dist_table WHERE b IN (SELECT s FROM cte1) +UNION ALL +SELECT s, s FROM cte1 +ON CONFLICT (a) DO NOTHING; +SELECT * FROM non_dist_unique ORDER BY 1; + a | b +--------------------------------------------------------------------- + 1 | 6 + 2 | 7 + 3 | 8 + 4 | 9 + 5 | 8 + 6 | 6 + 7 | 7 + 8 | 8 + 9 | 9 + 10 | 10 + 101 | 7 + 102 | 8 + 103 | 9 +(13 rows) + DROP TABLE non_dist_unique; -- test INSERT INTO a table with DEFAULT CREATE TABLE non_dist_default (a INT, c TEXT DEFAULT 'def'); @@ -168,6 +229,16 @@ SELECT * FROM non_dist_default ORDER BY 1, 2; 3 | def (3 rows) +SELECT alter_table_set_access_method('non_dist_default', 'columnar'); +NOTICE: creating a new table for insert_select_into_local_table.non_dist_default +NOTICE: moving the data of insert_select_into_local_table.non_dist_default +NOTICE: dropping the old insert_select_into_local_table.non_dist_default +NOTICE: renaming the new table to insert_select_into_local_table.non_dist_default + alter_table_set_access_method +--------------------------------------------------------------------- + +(1 row) + INSERT INTO non_dist_default SELECT a, c FROM dist_table WHERE a = 1; SELECT * FROM non_dist_default ORDER BY 1, 2; a | c @@ -355,104 +426,345 @@ SELECT * FROM non_dist_2 ORDER BY 1, 2; TRUNCATE non_dist_2; -- check issue https://github.com/citusdata/citus/issues/5858 -CREATE TABLE local_table( +CREATE TABLE local_dest_table( col_1 integer, col_2 integer, col_3 text, col_4 text, + drop_col text, col_5 int, col_6 text, - col_7 text, + col_7 text default 'col_7', col_8 text ); -CREATE TABLE dist_table_1( - dist_col integer, +ALTER TABLE local_dest_table DROP COLUMN drop_col; +CREATE TABLE dist_source_table_1( int_col integer, + drop_col text, text_col_1 text, + dist_col integer, text_col_2 text ); -SELECT create_distributed_table('dist_table_1', 'dist_col'); +SELECT create_distributed_table('dist_source_table_1', 'dist_col'); 
create_distributed_table --------------------------------------------------------------------- (1 row) -INSERT INTO dist_table_1 VALUES (1, 1, 'string', 'string'); -CREATE TABLE dist_table_2( +ALTER TABLE dist_source_table_1 DROP COLUMN drop_col; +INSERT INTO dist_source_table_1 VALUES (1, 'value', 1, 'value'); +INSERT INTO dist_source_table_1 VALUES (2, 'value2', 1, 'value'); +INSERT INTO dist_source_table_1 VALUES (3, 'value', 3, 'value3'); +CREATE TABLE dist_source_table_2( dist_col integer, int_col integer ); -SELECT create_distributed_table('dist_table_2', 'dist_col'); +SELECT create_distributed_table('dist_source_table_2', 'dist_col'); create_distributed_table --------------------------------------------------------------------- (1 row) -INSERT INTO dist_table_2 VALUES (1, 1); -INSERT INTO local_table -SELECT - t1.dist_col, - 1, - 'string', - 'string', - 1, - 'string', - t1.text_col_1, - t1.text_col_2 -FROM dist_table_1 t1 -WHERE t1.int_col IN (SELECT int_col FROM dist_table_2); -INSERT INTO local_table -SELECT - t1.dist_col, - 1, - 'string', - 'string', - 1, - 'string', - t1.text_col_1, - t1.text_col_2 -FROM dist_table_1 t1 -returning *; - col_1 | col_2 | col_3 | col_4 | col_5 | col_6 | col_7 | col_8 ---------------------------------------------------------------------- - 1 | 1 | string | string | 1 | string | string | string -(1 row) +INSERT INTO dist_source_table_2 VALUES (1, 1); +INSERT INTO dist_source_table_2 VALUES (2, 2); +INSERT INTO dist_source_table_2 VALUES (4, 4); +CREATE TABLE local_source_table_1 AS SELECT * FROM dist_source_table_1; +CREATE TABLE local_source_table_2 AS SELECT * FROM dist_source_table_2; +/* + * query_results_equal compares the effect of two queries on local_dest_table. + * We use this to ensure that INSERT INTO local_dest_table SELECT behaves + * the same when selecting from a regular table (postgres handles it) and + * a distributed table (Citus handles it). 
+ * + * The queries are generated by calling format() on query_table twice, + * once for each source_table argument. + */ +CREATE OR REPLACE FUNCTION query_results_equal(query_template text, source_table_1 text, source_table_2 text) +RETURNS bool +AS $$ +DECLARE + l1 local_dest_table[]; + l2 local_dest_table[]; +BEGIN + /* get the results using source_table_1 as source */ + TRUNCATE local_dest_table; + EXECUTE format(query_template, source_table_1); + SELECT array_agg(l) INTO l1 + FROM (SELECT * FROM local_dest_table ORDER BY 1, 2, 3, 4, 5, 6, 7, 8) l; -INSERT INTO local_table (col_3, col_4) SELECT - 'string', - 'string'::text -FROM dist_table_1 t1 -returning *; - col_1 | col_2 | col_3 | col_4 | col_5 | col_6 | col_7 | col_8 ---------------------------------------------------------------------- - | | string | string | | | | -(1 row) + /* get the results using source_table_2 as source */ + TRUNCATE local_dest_table; + EXECUTE format(query_template, source_table_2); + SELECT array_agg(l) INTO l2 + FROM (SELECT * FROM local_dest_table ORDER BY 1, 2, 3, 4, 5, 6, 7, 8) l; -EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF) INSERT INTO local_table + RAISE NOTICE 'l2=%', l1; + RAISE NOTICE 'l2=%', l2; + RETURN l1 = l2; +END; +$$ LANGUAGE plpgsql; +SELECT * FROM query_results_equal($$ + INSERT INTO local_dest_table SELECT t1.dist_col, 1, - 'string', - 'string', - 1, - 'string', + 'string1', + 'string2', + 2, + 'string3', t1.text_col_1, t1.text_col_2 - FROM dist_table_1 t1 - RETURNING *; - QUERY PLAN + FROM %1$s_1 t1 + WHERE t1.int_col IN (SELECT int_col FROM %1$s_2) +$$, 'local_source_table', 'dist_source_table'); +NOTICE: l2={"(1,1,string1,string2,2,string3,value,value)","(1,1,string1,string2,2,string3,value2,value)"} +CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE +NOTICE: l2={"(1,1,string1,string2,2,string3,value,value)","(1,1,string1,string2,2,string3,value2,value)"} +CONTEXT: PL/pgSQL function query_results_equal(text,text,text) 
line XX at RAISE + query_results_equal --------------------------------------------------------------------- - Insert on local_table (actual rows=1 loops=1) - -> Custom Scan (Citus Adaptive) (actual rows=1 loops=1) - Task Count: 4 - Tuple data received from nodes: 42 bytes - Tasks Shown: One of 4 - -> Task - Tuple data received from node: 42 bytes - Node: host=localhost port=xxxxx dbname=regression - -> Seq Scan on dist_table_1_11235805 t1 (actual rows=1 loops=1) -(9 rows) + t +(1 row) + +SELECT * FROM query_results_equal($$ + INSERT INTO local_dest_table + SELECT + t1.dist_col, + 1, + 'string1', + 'string2', + 2, + 'string3', + t1.text_col_1, + t1.text_col_2 + FROM %1$s t1 + returning * +$$, 'local_source_table_1', 'dist_source_table_1'); +NOTICE: l2={"(1,1,string1,string2,2,string3,value,value)","(1,1,string1,string2,2,string3,value2,value)","(3,1,string1,string2,2,string3,value,value3)"} +CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE +NOTICE: l2={"(1,1,string1,string2,2,string3,value,value)","(1,1,string1,string2,2,string3,value2,value)","(3,1,string1,string2,2,string3,value,value3)"} +CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE + query_results_equal +--------------------------------------------------------------------- + t +(1 row) + +SELECT * FROM query_results_equal($$ + INSERT INTO local_dest_table (col_3, col_4) SELECT + 'string1', + 'string2'::text + FROM %1$s t1 + returning *; +$$, 'local_source_table_1', 'dist_source_table_1'); +NOTICE: l2={"(,,string1,string2,,,col_7,)","(,,string1,string2,,,col_7,)","(,,string1,string2,,,col_7,)"} +CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE +NOTICE: l2={"(,,string1,string2,,,col_7,)","(,,string1,string2,,,col_7,)","(,,string1,string2,,,col_7,)"} +CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE + query_results_equal 
+--------------------------------------------------------------------- + t +(1 row) + +SELECT * FROM query_results_equal($$ + INSERT INTO local_dest_table (col_7, col_4) SELECT + 'string1', + 'string2'::text + FROM %1$s t1 + returning *; +$$, 'local_source_table_1', 'dist_source_table_1'); +NOTICE: l2={"(,,,string2,,,string1,)","(,,,string2,,,string1,)","(,,,string2,,,string1,)"} +CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE +NOTICE: l2={"(,,,string2,,,string1,)","(,,,string2,,,string1,)","(,,,string2,,,string1,)"} +CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE + query_results_equal +--------------------------------------------------------------------- + t +(1 row) + +SELECT * FROM query_results_equal($$ + INSERT INTO local_dest_table (col_4, col_3) SELECT + 'string1', + 'string2'::text + FROM %1$s t1 + WHERE dist_col = 1 + returning *; +$$, 'local_source_table_1', 'dist_source_table_1'); +NOTICE: l2={"(,,string2,string1,,,col_7,)","(,,string2,string1,,,col_7,)"} +CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE +NOTICE: l2={"(,,string2,string1,,,col_7,)","(,,string2,string1,,,col_7,)"} +CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE + query_results_equal +--------------------------------------------------------------------- + t +(1 row) + +SELECT * FROM query_results_equal($$ + INSERT INTO local_dest_table (col_4, col_1) + SELECT + 'string1', + dist_col + FROM %1$s + UNION ALL + SELECT + 'string', + int_col + FROM %1$s; +$$, 'local_source_table_1', 'dist_source_table_1'); +NOTICE: l2={"(1,,,string,,,col_7,)","(1,,,string1,,,col_7,)","(1,,,string1,,,col_7,)","(2,,,string,,,col_7,)","(3,,,string,,,col_7,)","(3,,,string1,,,col_7,)"} +CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE +NOTICE: 
l2={"(1,,,string,,,col_7,)","(1,,,string1,,,col_7,)","(1,,,string1,,,col_7,)","(2,,,string,,,col_7,)","(3,,,string,,,col_7,)","(3,,,string1,,,col_7,)"} +CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE + query_results_equal +--------------------------------------------------------------------- + t +(1 row) + +SELECT * FROM query_results_equal($$ + WITH cte1 AS (SELECT s FROM generate_series(1,10) s) + INSERT INTO local_dest_table (col_4, col_1) + SELECT + 'string1', + dist_col + FROM %1$s WHERE int_col IN (SELECT s FROM cte1) + UNION ALL + SELECT + 'string', + int_col + FROM %1$s WHERE int_col IN (SELECT s + 1 FROM cte1) +$$, 'local_source_table_1', 'dist_source_table_1'); +NOTICE: l2={"(1,,,string1,,,col_7,)","(1,,,string1,,,col_7,)","(2,,,string,,,col_7,)","(3,,,string,,,col_7,)","(3,,,string1,,,col_7,)"} +CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE +NOTICE: l2={"(1,,,string1,,,col_7,)","(1,,,string1,,,col_7,)","(2,,,string,,,col_7,)","(3,,,string,,,col_7,)","(3,,,string1,,,col_7,)"} +CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE + query_results_equal +--------------------------------------------------------------------- + t +(1 row) + +SELECT * FROM query_results_equal($$ + WITH cte1 AS (SELECT 'stringcte', s FROM generate_series(1,10) s) + INSERT INTO local_dest_table (col_4, col_1) + SELECT + 'string1', + dist_col + FROM %1$s WHERE int_col IN (SELECT s FROM cte1) + UNION ALL + SELECT + * + FROM cte1 +$$, 'local_source_table_1', 'dist_source_table_1'); +NOTICE: l2={"(1,,,string1,,,col_7,)","(1,,,string1,,,col_7,)","(1,,,stringcte,,,col_7,)","(2,,,stringcte,,,col_7,)","(3,,,string1,,,col_7,)","(3,,,stringcte,,,col_7,)","(4,,,stringcte,,,col_7,)","(5,,,stringcte,,,col_7,)","(6,,,stringcte,,,col_7,)","(7,,,stringcte,,,col_7,)","(8,,,stringcte,,,col_7,)","(9,,,stringcte,,,col_7,)","(10,,,stringcte,,,col_7,)"} +CONTEXT: PL/pgSQL function 
query_results_equal(text,text,text) line XX at RAISE +NOTICE: l2={"(1,,,string1,,,col_7,)","(1,,,string1,,,col_7,)","(1,,,stringcte,,,col_7,)","(2,,,stringcte,,,col_7,)","(3,,,string1,,,col_7,)","(3,,,stringcte,,,col_7,)","(4,,,stringcte,,,col_7,)","(5,,,stringcte,,,col_7,)","(6,,,stringcte,,,col_7,)","(7,,,stringcte,,,col_7,)","(8,,,stringcte,,,col_7,)","(9,,,stringcte,,,col_7,)","(10,,,stringcte,,,col_7,)"} +CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE + query_results_equal +--------------------------------------------------------------------- + t +(1 row) + +SELECT * FROM query_results_equal($$ + INSERT INTO local_dest_table (col_3) + SELECT t1.text_col_1 + FROM %1$s t1 + GROUP BY t1.text_col_1; +$$, 'local_source_table_1', 'dist_source_table_1'); +NOTICE: l2={"(,,value,,,,col_7,)","(,,value2,,,,col_7,)"} +CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE +NOTICE: l2={"(,,value,,,,col_7,)","(,,value2,,,,col_7,)"} +CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE + query_results_equal +--------------------------------------------------------------------- + t +(1 row) + +SELECT * FROM query_results_equal($$ + INSERT INTO local_dest_table (col_1, col_2, col_3, col_5, col_6, col_7, col_8) + SELECT + max(t1.dist_col), + 3, + 'string_3', + 4, + 'string_4', + t1.text_col_1, + 'string_1000' + FROM %1$s t1 + GROUP BY t1.text_col_2, t1.text_col_1; +$$, 'local_source_table_1', 'dist_source_table_1'); +NOTICE: l2={"(1,3,string_3,,4,string_4,value,string_1000)","(1,3,string_3,,4,string_4,value2,string_1000)","(3,3,string_3,,4,string_4,value,string_1000)"} +CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE +NOTICE: l2={"(1,3,string_3,,4,string_4,value,string_1000)","(1,3,string_3,,4,string_4,value2,string_1000)","(3,3,string_3,,4,string_4,value,string_1000)"} +CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE + 
query_results_equal +--------------------------------------------------------------------- + t +(1 row) + +SELECT * FROM query_results_equal($$ + INSERT INTO local_dest_table (col_7, col_8) + SELECT + t1.text_col_1, + 'string_1000' + FROM dist_source_table_1 t1 + GROUP BY t1.text_col_1; +$$, 'local_source_table_1', 'dist_source_table_1'); +NOTICE: l2={"(,,,,,,value,string_1000)","(,,,,,,value2,string_1000)"} +CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE +NOTICE: l2={"(,,,,,,value,string_1000)","(,,,,,,value2,string_1000)"} +CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE + query_results_equal +--------------------------------------------------------------------- + t +(1 row) + +SELECT * FROM query_results_equal($$ + INSERT INTO local_dest_table (col_6, col_7, col_8) + SELECT + 'string_4', + t1.text_col_1, + 'string_1000' + FROM %1$s t1 + GROUP BY t1.text_col_1; +$$, 'local_source_table_1', 'dist_source_table_1'); +NOTICE: l2={"(,,,,,string_4,value,string_1000)","(,,,,,string_4,value2,string_1000)"} +CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE +NOTICE: l2={"(,,,,,string_4,value,string_1000)","(,,,,,string_4,value2,string_1000)"} +CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE + query_results_equal +--------------------------------------------------------------------- + t +(1 row) + +SELECT * FROM query_results_equal($$ + INSERT INTO local_dest_table (col_5, col_3) + SELECT 12, 'string_11' FROM %1$s t1 + UNION + SELECT int_col, 'string' FROM %1$s; +$$, 'local_source_table_1', 'dist_source_table_1'); +NOTICE: l2={"(,,string,,1,,col_7,)","(,,string,,2,,col_7,)","(,,string,,3,,col_7,)","(,,string_11,,12,,col_7,)"} +CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE +NOTICE: l2={"(,,string,,1,,col_7,)","(,,string,,2,,col_7,)","(,,string,,3,,col_7,)","(,,string_11,,12,,col_7,)"} +CONTEXT: PL/pgSQL function 
query_results_equal(text,text,text) line XX at RAISE + query_results_equal +--------------------------------------------------------------------- + t +(1 row) + +-- use a sequence (cannot use query_results_equal, since sequence values would not match) +CREATE SEQUENCE seq; +TRUNCATE local_dest_table; +INSERT INTO local_dest_table (col_5, col_3) +SELECT 12, 'string_11' FROM dist_source_table_1 +UNION +SELECT nextval('seq'), 'string' FROM dist_source_table_1; +SELECT * FROM local_dest_table ORDER BY 1,2,3,4,5,6,7,8; + col_1 | col_2 | col_3 | col_4 | col_5 | col_6 | col_7 | col_8 +--------------------------------------------------------------------- + | | string | | 1 | | col_7 | + | | string | | 2 | | col_7 | + | | string | | 3 | | col_7 | + | | string_11 | | 12 | | col_7 | +(4 rows) \set VERBOSITY terse DROP SCHEMA insert_select_into_local_table CASCADE; -NOTICE: drop cascades to 8 other objects +NOTICE: drop cascades to 12 other objects diff --git a/src/test/regress/sql/insert_select_into_local_table.sql b/src/test/regress/sql/insert_select_into_local_table.sql index 24121e776..b0974dbc9 100644 --- a/src/test/regress/sql/insert_select_into_local_table.sql +++ b/src/test/regress/sql/insert_select_into_local_table.sql @@ -64,6 +64,30 @@ INSERT INTO non_dist_unique SELECT a+1, b FROM dist_table ON CONFLICT (a) DO NOT SELECT * FROM non_dist_unique ORDER BY 1; INSERT INTO non_dist_unique SELECT a+2, b FROM dist_table ON CONFLICT (a) DO UPDATE SET b = EXCLUDED.b + non_dist_unique.b; SELECT * FROM non_dist_unique ORDER BY 1; + +INSERT INTO non_dist_unique +SELECT a+1, b FROM dist_table +UNION ALL +SELECT a+100, b FROM dist_table +ON CONFLICT (a) DO NOTHING; +SELECT * FROM non_dist_unique ORDER BY 1; + +INSERT INTO non_dist_unique +SELECT a+1, b FROM dist_table +UNION ALL +SELECT a+100, b FROM dist_table +ON CONFLICT (a) DO UPDATE SET b = EXCLUDED.b + 1; +SELECT * FROM non_dist_unique ORDER BY 1; + +WITH cte1 AS (SELECT s FROM generate_series(1,10) s) +INSERT INTO 
non_dist_unique +WITH cte2 AS (SELECT s FROM generate_series(1,10) s) +SELECT a+1, b FROM dist_table WHERE b IN (SELECT s FROM cte1) +UNION ALL +SELECT s, s FROM cte1 +ON CONFLICT (a) DO NOTHING; +SELECT * FROM non_dist_unique ORDER BY 1; + DROP TABLE non_dist_unique; @@ -73,6 +97,7 @@ INSERT INTO non_dist_default SELECT a FROM dist_table WHERE a = 1; SELECT * FROM non_dist_default ORDER BY 1, 2; INSERT INTO non_dist_default SELECT a FROM dist_table WHERE a > 1; SELECT * FROM non_dist_default ORDER BY 1, 2; +SELECT alter_table_set_access_method('non_dist_default', 'columnar'); INSERT INTO non_dist_default SELECT a, c FROM dist_table WHERE a = 1; SELECT * FROM non_dist_default ORDER BY 1, 2; INSERT INTO non_dist_default SELECT a, c FROM dist_table WHERE a > 1; @@ -150,78 +175,232 @@ SELECT * FROM non_dist_2 ORDER BY 1, 2; TRUNCATE non_dist_2; -- check issue https://github.com/citusdata/citus/issues/5858 -CREATE TABLE local_table( +CREATE TABLE local_dest_table( col_1 integer, col_2 integer, col_3 text, col_4 text, + drop_col text, col_5 int, col_6 text, - col_7 text, + col_7 text default 'col_7', col_8 text ); -CREATE TABLE dist_table_1( - dist_col integer, +ALTER TABLE local_dest_table DROP COLUMN drop_col; + +CREATE TABLE dist_source_table_1( int_col integer, + drop_col text, text_col_1 text, + dist_col integer, text_col_2 text ); -SELECT create_distributed_table('dist_table_1', 'dist_col'); +SELECT create_distributed_table('dist_source_table_1', 'dist_col'); -INSERT INTO dist_table_1 VALUES (1, 1, 'string', 'string'); +ALTER TABLE dist_source_table_1 DROP COLUMN drop_col; -CREATE TABLE dist_table_2( +INSERT INTO dist_source_table_1 VALUES (1, 'value', 1, 'value'); +INSERT INTO dist_source_table_1 VALUES (2, 'value2', 1, 'value'); +INSERT INTO dist_source_table_1 VALUES (3, 'value', 3, 'value3'); + +CREATE TABLE dist_source_table_2( dist_col integer, int_col integer ); -SELECT create_distributed_table('dist_table_2', 'dist_col'); +SELECT 
create_distributed_table('dist_source_table_2', 'dist_col'); -INSERT INTO dist_table_2 VALUES (1, 1); +INSERT INTO dist_source_table_2 VALUES (1, 1); +INSERT INTO dist_source_table_2 VALUES (2, 2); +INSERT INTO dist_source_table_2 VALUES (4, 4); -INSERT INTO local_table -SELECT - t1.dist_col, - 1, - 'string', - 'string', - 1, - 'string', - t1.text_col_1, - t1.text_col_2 -FROM dist_table_1 t1 -WHERE t1.int_col IN (SELECT int_col FROM dist_table_2); +CREATE TABLE local_source_table_1 AS SELECT * FROM dist_source_table_1; +CREATE TABLE local_source_table_2 AS SELECT * FROM dist_source_table_2; -INSERT INTO local_table -SELECT - t1.dist_col, - 1, - 'string', - 'string', - 1, - 'string', - t1.text_col_1, - t1.text_col_2 -FROM dist_table_1 t1 -returning *; -INSERT INTO local_table (col_3, col_4) SELECT - 'string', - 'string'::text -FROM dist_table_1 t1 -returning *; +/* + * query_results_equal compares the effect of two queries on local_dest_table. + * We use this to ensure that INSERT INTO local_dest_table SELECT behaves + * the same when selecting from a regular table (postgres handles it) and + * a distributed table (Citus handles it). + * + * The queries are generated by calling format() on query_table twice, + * once for each source_table argument. 
+ */ +CREATE OR REPLACE FUNCTION query_results_equal(query_template text, source_table_1 text, source_table_2 text) +RETURNS bool +AS $$ +DECLARE + l1 local_dest_table[]; + l2 local_dest_table[]; +BEGIN + /* get the results using source_table_1 as source */ + TRUNCATE local_dest_table; + EXECUTE format(query_template, source_table_1); + SELECT array_agg(l) INTO l1 + FROM (SELECT * FROM local_dest_table ORDER BY 1, 2, 3, 4, 5, 6, 7, 8) l; -EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF) INSERT INTO local_table + /* get the results using source_table_2 as source */ + TRUNCATE local_dest_table; + EXECUTE format(query_template, source_table_2); + SELECT array_agg(l) INTO l2 + FROM (SELECT * FROM local_dest_table ORDER BY 1, 2, 3, 4, 5, 6, 7, 8) l; + + RAISE NOTICE 'l2=%', l1; + RAISE NOTICE 'l2=%', l2; + RETURN l1 = l2; +END; +$$ LANGUAGE plpgsql; + +SELECT * FROM query_results_equal($$ + INSERT INTO local_dest_table SELECT t1.dist_col, 1, - 'string', - 'string', - 1, - 'string', + 'string1', + 'string2', + 2, + 'string3', t1.text_col_1, t1.text_col_2 - FROM dist_table_1 t1 - RETURNING *; + FROM %1$s_1 t1 + WHERE t1.int_col IN (SELECT int_col FROM %1$s_2) +$$, 'local_source_table', 'dist_source_table'); + +SELECT * FROM query_results_equal($$ + INSERT INTO local_dest_table + SELECT + t1.dist_col, + 1, + 'string1', + 'string2', + 2, + 'string3', + t1.text_col_1, + t1.text_col_2 + FROM %1$s t1 + returning * +$$, 'local_source_table_1', 'dist_source_table_1'); + +SELECT * FROM query_results_equal($$ + INSERT INTO local_dest_table (col_3, col_4) SELECT + 'string1', + 'string2'::text + FROM %1$s t1 + returning *; +$$, 'local_source_table_1', 'dist_source_table_1'); + +SELECT * FROM query_results_equal($$ + INSERT INTO local_dest_table (col_7, col_4) SELECT + 'string1', + 'string2'::text + FROM %1$s t1 + returning *; +$$, 'local_source_table_1', 'dist_source_table_1'); + +SELECT * FROM query_results_equal($$ + INSERT INTO local_dest_table (col_4, col_3) SELECT + 
'string1', + 'string2'::text + FROM %1$s t1 + WHERE dist_col = 1 + returning *; +$$, 'local_source_table_1', 'dist_source_table_1'); + +SELECT * FROM query_results_equal($$ + INSERT INTO local_dest_table (col_4, col_1) + SELECT + 'string1', + dist_col + FROM %1$s + UNION ALL + SELECT + 'string', + int_col + FROM %1$s; +$$, 'local_source_table_1', 'dist_source_table_1'); + +SELECT * FROM query_results_equal($$ + WITH cte1 AS (SELECT s FROM generate_series(1,10) s) + INSERT INTO local_dest_table (col_4, col_1) + SELECT + 'string1', + dist_col + FROM %1$s WHERE int_col IN (SELECT s FROM cte1) + UNION ALL + SELECT + 'string', + int_col + FROM %1$s WHERE int_col IN (SELECT s + 1 FROM cte1) +$$, 'local_source_table_1', 'dist_source_table_1'); + +SELECT * FROM query_results_equal($$ + WITH cte1 AS (SELECT 'stringcte', s FROM generate_series(1,10) s) + INSERT INTO local_dest_table (col_4, col_1) + SELECT + 'string1', + dist_col + FROM %1$s WHERE int_col IN (SELECT s FROM cte1) + UNION ALL + SELECT + * + FROM cte1 +$$, 'local_source_table_1', 'dist_source_table_1'); + +SELECT * FROM query_results_equal($$ + INSERT INTO local_dest_table (col_3) + SELECT t1.text_col_1 + FROM %1$s t1 + GROUP BY t1.text_col_1; +$$, 'local_source_table_1', 'dist_source_table_1'); + +SELECT * FROM query_results_equal($$ + INSERT INTO local_dest_table (col_1, col_2, col_3, col_5, col_6, col_7, col_8) + SELECT + max(t1.dist_col), + 3, + 'string_3', + 4, + 'string_4', + t1.text_col_1, + 'string_1000' + FROM %1$s t1 + GROUP BY t1.text_col_2, t1.text_col_1; +$$, 'local_source_table_1', 'dist_source_table_1'); + +SELECT * FROM query_results_equal($$ + INSERT INTO local_dest_table (col_7, col_8) + SELECT + t1.text_col_1, + 'string_1000' + FROM dist_source_table_1 t1 + GROUP BY t1.text_col_1; +$$, 'local_source_table_1', 'dist_source_table_1'); + +SELECT * FROM query_results_equal($$ + INSERT INTO local_dest_table (col_6, col_7, col_8) + SELECT + 'string_4', + t1.text_col_1, + 'string_1000' + FROM %1$s 
t1 + GROUP BY t1.text_col_1; +$$, 'local_source_table_1', 'dist_source_table_1'); + +SELECT * FROM query_results_equal($$ + INSERT INTO local_dest_table (col_5, col_3) + SELECT 12, 'string_11' FROM %1$s t1 + UNION + SELECT int_col, 'string' FROM %1$s; +$$, 'local_source_table_1', 'dist_source_table_1'); + +-- use a sequence (cannot use query_results_equal, since sequence values would not match) +CREATE SEQUENCE seq; +TRUNCATE local_dest_table; +INSERT INTO local_dest_table (col_5, col_3) +SELECT 12, 'string_11' FROM dist_source_table_1 +UNION +SELECT nextval('seq'), 'string' FROM dist_source_table_1; +SELECT * FROM local_dest_table ORDER BY 1,2,3,4,5,6,7,8; \set VERBOSITY terse DROP SCHEMA insert_select_into_local_table CASCADE;