mirror of https://github.com/citusdata/citus.git
Improve insert into local select tests
parent
f849487cb3
commit
61083f9187
|
@ -1665,6 +1665,13 @@ AddInsertSelectCasts(List *insertTargetList, List *selectTargetList,
|
||||||
List *projectedEntries = NIL;
|
List *projectedEntries = NIL;
|
||||||
List *nonProjectedEntries = NIL;
|
List *nonProjectedEntries = NIL;
|
||||||
|
|
||||||
|
/*
|
||||||
|
* ReorderInsertSelectTargetLists() makes sure that first few columns of
|
||||||
|
* the SELECT query match the insert targets. It might contain additional
|
||||||
|
* items for GROUP BY, etc.
|
||||||
|
*/
|
||||||
|
Assert(list_length(insertTargetList) <= list_length(selectTargetList));
|
||||||
|
|
||||||
Relation distributedRelation = table_open(targetRelationId, RowExclusiveLock);
|
Relation distributedRelation = table_open(targetRelationId, RowExclusiveLock);
|
||||||
TupleDesc destTupleDescriptor = RelationGetDescr(distributedRelation);
|
TupleDesc destTupleDescriptor = RelationGetDescr(distributedRelation);
|
||||||
|
|
||||||
|
|
|
@ -149,6 +149,67 @@ SELECT * FROM non_dist_unique ORDER BY 1;
|
||||||
5 | 8
|
5 | 8
|
||||||
(5 rows)
|
(5 rows)
|
||||||
|
|
||||||
|
INSERT INTO non_dist_unique
|
||||||
|
SELECT a+1, b FROM dist_table
|
||||||
|
UNION ALL
|
||||||
|
SELECT a+100, b FROM dist_table
|
||||||
|
ON CONFLICT (a) DO NOTHING;
|
||||||
|
SELECT * FROM non_dist_unique ORDER BY 1;
|
||||||
|
a | b
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
1 | 6
|
||||||
|
2 | 7
|
||||||
|
3 | 14
|
||||||
|
4 | 15
|
||||||
|
5 | 8
|
||||||
|
101 | 6
|
||||||
|
102 | 7
|
||||||
|
103 | 8
|
||||||
|
(8 rows)
|
||||||
|
|
||||||
|
INSERT INTO non_dist_unique
|
||||||
|
SELECT a+1, b FROM dist_table
|
||||||
|
UNION ALL
|
||||||
|
SELECT a+100, b FROM dist_table
|
||||||
|
ON CONFLICT (a) DO UPDATE SET b = EXCLUDED.b + 1;
|
||||||
|
SELECT * FROM non_dist_unique ORDER BY 1;
|
||||||
|
a | b
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
1 | 6
|
||||||
|
2 | 7
|
||||||
|
3 | 8
|
||||||
|
4 | 9
|
||||||
|
5 | 8
|
||||||
|
101 | 7
|
||||||
|
102 | 8
|
||||||
|
103 | 9
|
||||||
|
(8 rows)
|
||||||
|
|
||||||
|
WITH cte1 AS (SELECT s FROM generate_series(1,10) s)
|
||||||
|
INSERT INTO non_dist_unique
|
||||||
|
WITH cte2 AS (SELECT s FROM generate_series(1,10) s)
|
||||||
|
SELECT a+1, b FROM dist_table WHERE b IN (SELECT s FROM cte1)
|
||||||
|
UNION ALL
|
||||||
|
SELECT s, s FROM cte1
|
||||||
|
ON CONFLICT (a) DO NOTHING;
|
||||||
|
SELECT * FROM non_dist_unique ORDER BY 1;
|
||||||
|
a | b
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
1 | 6
|
||||||
|
2 | 7
|
||||||
|
3 | 8
|
||||||
|
4 | 9
|
||||||
|
5 | 8
|
||||||
|
6 | 6
|
||||||
|
7 | 7
|
||||||
|
8 | 8
|
||||||
|
9 | 9
|
||||||
|
10 | 10
|
||||||
|
101 | 7
|
||||||
|
102 | 8
|
||||||
|
103 | 9
|
||||||
|
(13 rows)
|
||||||
|
|
||||||
DROP TABLE non_dist_unique;
|
DROP TABLE non_dist_unique;
|
||||||
-- test INSERT INTO a table with DEFAULT
|
-- test INSERT INTO a table with DEFAULT
|
||||||
CREATE TABLE non_dist_default (a INT, c TEXT DEFAULT 'def');
|
CREATE TABLE non_dist_default (a INT, c TEXT DEFAULT 'def');
|
||||||
|
@ -168,6 +229,16 @@ SELECT * FROM non_dist_default ORDER BY 1, 2;
|
||||||
3 | def
|
3 | def
|
||||||
(3 rows)
|
(3 rows)
|
||||||
|
|
||||||
|
SELECT alter_table_set_access_method('non_dist_default', 'columnar');
|
||||||
|
NOTICE: creating a new table for insert_select_into_local_table.non_dist_default
|
||||||
|
NOTICE: moving the data of insert_select_into_local_table.non_dist_default
|
||||||
|
NOTICE: dropping the old insert_select_into_local_table.non_dist_default
|
||||||
|
NOTICE: renaming the new table to insert_select_into_local_table.non_dist_default
|
||||||
|
alter_table_set_access_method
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
INSERT INTO non_dist_default SELECT a, c FROM dist_table WHERE a = 1;
|
INSERT INTO non_dist_default SELECT a, c FROM dist_table WHERE a = 1;
|
||||||
SELECT * FROM non_dist_default ORDER BY 1, 2;
|
SELECT * FROM non_dist_default ORDER BY 1, 2;
|
||||||
a | c
|
a | c
|
||||||
|
@ -355,104 +426,345 @@ SELECT * FROM non_dist_2 ORDER BY 1, 2;
|
||||||
|
|
||||||
TRUNCATE non_dist_2;
|
TRUNCATE non_dist_2;
|
||||||
-- check issue https://github.com/citusdata/citus/issues/5858
|
-- check issue https://github.com/citusdata/citus/issues/5858
|
||||||
CREATE TABLE local_table(
|
CREATE TABLE local_dest_table(
|
||||||
col_1 integer,
|
col_1 integer,
|
||||||
col_2 integer,
|
col_2 integer,
|
||||||
col_3 text,
|
col_3 text,
|
||||||
col_4 text,
|
col_4 text,
|
||||||
|
drop_col text,
|
||||||
col_5 int,
|
col_5 int,
|
||||||
col_6 text,
|
col_6 text,
|
||||||
col_7 text,
|
col_7 text default 'col_7',
|
||||||
col_8 text
|
col_8 text
|
||||||
);
|
);
|
||||||
CREATE TABLE dist_table_1(
|
ALTER TABLE local_dest_table DROP COLUMN drop_col;
|
||||||
dist_col integer,
|
CREATE TABLE dist_source_table_1(
|
||||||
int_col integer,
|
int_col integer,
|
||||||
|
drop_col text,
|
||||||
text_col_1 text,
|
text_col_1 text,
|
||||||
|
dist_col integer,
|
||||||
text_col_2 text
|
text_col_2 text
|
||||||
);
|
);
|
||||||
SELECT create_distributed_table('dist_table_1', 'dist_col');
|
SELECT create_distributed_table('dist_source_table_1', 'dist_col');
|
||||||
create_distributed_table
|
create_distributed_table
|
||||||
---------------------------------------------------------------------
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
INSERT INTO dist_table_1 VALUES (1, 1, 'string', 'string');
|
ALTER TABLE dist_source_table_1 DROP COLUMN drop_col;
|
||||||
CREATE TABLE dist_table_2(
|
INSERT INTO dist_source_table_1 VALUES (1, 'value', 1, 'value');
|
||||||
|
INSERT INTO dist_source_table_1 VALUES (2, 'value2', 1, 'value');
|
||||||
|
INSERT INTO dist_source_table_1 VALUES (3, 'value', 3, 'value3');
|
||||||
|
CREATE TABLE dist_source_table_2(
|
||||||
dist_col integer,
|
dist_col integer,
|
||||||
int_col integer
|
int_col integer
|
||||||
);
|
);
|
||||||
SELECT create_distributed_table('dist_table_2', 'dist_col');
|
SELECT create_distributed_table('dist_source_table_2', 'dist_col');
|
||||||
create_distributed_table
|
create_distributed_table
|
||||||
---------------------------------------------------------------------
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
INSERT INTO dist_table_2 VALUES (1, 1);
|
INSERT INTO dist_source_table_2 VALUES (1, 1);
|
||||||
INSERT INTO local_table
|
INSERT INTO dist_source_table_2 VALUES (2, 2);
|
||||||
|
INSERT INTO dist_source_table_2 VALUES (4, 4);
|
||||||
|
CREATE TABLE local_source_table_1 AS SELECT * FROM dist_source_table_1;
|
||||||
|
CREATE TABLE local_source_table_2 AS SELECT * FROM dist_source_table_2;
|
||||||
|
/*
|
||||||
|
* query_results_equal compares the effect of two queries on local_dest_table.
|
||||||
|
* We use this to ensure that INSERT INTO local_dest_table SELECT behaves
|
||||||
|
* the same when selecting from a regular table (postgres handles it) and
|
||||||
|
* a distributed table (Citus handles it).
|
||||||
|
*
|
||||||
|
* The queries are generated by calling format() on query_table twice,
|
||||||
|
* once for each source_table argument.
|
||||||
|
*/
|
||||||
|
CREATE OR REPLACE FUNCTION query_results_equal(query_template text, source_table_1 text, source_table_2 text)
|
||||||
|
RETURNS bool
|
||||||
|
AS $$
|
||||||
|
DECLARE
|
||||||
|
l1 local_dest_table[];
|
||||||
|
l2 local_dest_table[];
|
||||||
|
BEGIN
|
||||||
|
/* get the results using source_table_1 as source */
|
||||||
|
TRUNCATE local_dest_table;
|
||||||
|
EXECUTE format(query_template, source_table_1);
|
||||||
|
SELECT array_agg(l) INTO l1
|
||||||
|
FROM (SELECT * FROM local_dest_table ORDER BY 1, 2, 3, 4, 5, 6, 7, 8) l;
|
||||||
|
|
||||||
|
/* get the results using source_table_2 as source */
|
||||||
|
TRUNCATE local_dest_table;
|
||||||
|
EXECUTE format(query_template, source_table_2);
|
||||||
|
SELECT array_agg(l) INTO l2
|
||||||
|
FROM (SELECT * FROM local_dest_table ORDER BY 1, 2, 3, 4, 5, 6, 7, 8) l;
|
||||||
|
|
||||||
|
RAISE NOTICE 'l2=%', l1;
|
||||||
|
RAISE NOTICE 'l2=%', l2;
|
||||||
|
RETURN l1 = l2;
|
||||||
|
END;
|
||||||
|
$$ LANGUAGE plpgsql;
|
||||||
|
SELECT * FROM query_results_equal($$
|
||||||
|
INSERT INTO local_dest_table
|
||||||
SELECT
|
SELECT
|
||||||
t1.dist_col,
|
t1.dist_col,
|
||||||
1,
|
1,
|
||||||
'string',
|
'string1',
|
||||||
'string',
|
'string2',
|
||||||
1,
|
2,
|
||||||
'string',
|
'string3',
|
||||||
t1.text_col_1,
|
t1.text_col_1,
|
||||||
t1.text_col_2
|
t1.text_col_2
|
||||||
FROM dist_table_1 t1
|
FROM %1$s_1 t1
|
||||||
WHERE t1.int_col IN (SELECT int_col FROM dist_table_2);
|
WHERE t1.int_col IN (SELECT int_col FROM %1$s_2)
|
||||||
INSERT INTO local_table
|
$$, 'local_source_table', 'dist_source_table');
|
||||||
SELECT
|
NOTICE: l2={"(1,1,string1,string2,2,string3,value,value)","(1,1,string1,string2,2,string3,value2,value)"}
|
||||||
t1.dist_col,
|
CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE
|
||||||
1,
|
NOTICE: l2={"(1,1,string1,string2,2,string3,value,value)","(1,1,string1,string2,2,string3,value2,value)"}
|
||||||
'string',
|
CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE
|
||||||
'string',
|
query_results_equal
|
||||||
1,
|
|
||||||
'string',
|
|
||||||
t1.text_col_1,
|
|
||||||
t1.text_col_2
|
|
||||||
FROM dist_table_1 t1
|
|
||||||
returning *;
|
|
||||||
col_1 | col_2 | col_3 | col_4 | col_5 | col_6 | col_7 | col_8
|
|
||||||
---------------------------------------------------------------------
|
---------------------------------------------------------------------
|
||||||
1 | 1 | string | string | 1 | string | string | string
|
t
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
INSERT INTO local_table (col_3, col_4) SELECT
|
SELECT * FROM query_results_equal($$
|
||||||
'string',
|
INSERT INTO local_dest_table
|
||||||
'string'::text
|
|
||||||
FROM dist_table_1 t1
|
|
||||||
returning *;
|
|
||||||
col_1 | col_2 | col_3 | col_4 | col_5 | col_6 | col_7 | col_8
|
|
||||||
---------------------------------------------------------------------
|
|
||||||
| | string | string | | | |
|
|
||||||
(1 row)
|
|
||||||
|
|
||||||
EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF) INSERT INTO local_table
|
|
||||||
SELECT
|
SELECT
|
||||||
t1.dist_col,
|
t1.dist_col,
|
||||||
1,
|
1,
|
||||||
'string',
|
'string1',
|
||||||
'string',
|
'string2',
|
||||||
1,
|
2,
|
||||||
'string',
|
'string3',
|
||||||
t1.text_col_1,
|
t1.text_col_1,
|
||||||
t1.text_col_2
|
t1.text_col_2
|
||||||
FROM dist_table_1 t1
|
FROM %1$s t1
|
||||||
RETURNING *;
|
returning *
|
||||||
QUERY PLAN
|
$$, 'local_source_table_1', 'dist_source_table_1');
|
||||||
|
NOTICE: l2={"(1,1,string1,string2,2,string3,value,value)","(1,1,string1,string2,2,string3,value2,value)","(3,1,string1,string2,2,string3,value,value3)"}
|
||||||
|
CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE
|
||||||
|
NOTICE: l2={"(1,1,string1,string2,2,string3,value,value)","(1,1,string1,string2,2,string3,value2,value)","(3,1,string1,string2,2,string3,value,value3)"}
|
||||||
|
CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE
|
||||||
|
query_results_equal
|
||||||
---------------------------------------------------------------------
|
---------------------------------------------------------------------
|
||||||
Insert on local_table (actual rows=1 loops=1)
|
t
|
||||||
-> Custom Scan (Citus Adaptive) (actual rows=1 loops=1)
|
(1 row)
|
||||||
Task Count: 4
|
|
||||||
Tuple data received from nodes: 42 bytes
|
SELECT * FROM query_results_equal($$
|
||||||
Tasks Shown: One of 4
|
INSERT INTO local_dest_table (col_3, col_4) SELECT
|
||||||
-> Task
|
'string1',
|
||||||
Tuple data received from node: 42 bytes
|
'string2'::text
|
||||||
Node: host=localhost port=xxxxx dbname=regression
|
FROM %1$s t1
|
||||||
-> Seq Scan on dist_table_1_11235805 t1 (actual rows=1 loops=1)
|
returning *;
|
||||||
(9 rows)
|
$$, 'local_source_table_1', 'dist_source_table_1');
|
||||||
|
NOTICE: l2={"(,,string1,string2,,,col_7,)","(,,string1,string2,,,col_7,)","(,,string1,string2,,,col_7,)"}
|
||||||
|
CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE
|
||||||
|
NOTICE: l2={"(,,string1,string2,,,col_7,)","(,,string1,string2,,,col_7,)","(,,string1,string2,,,col_7,)"}
|
||||||
|
CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE
|
||||||
|
query_results_equal
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
t
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT * FROM query_results_equal($$
|
||||||
|
INSERT INTO local_dest_table (col_7, col_4) SELECT
|
||||||
|
'string1',
|
||||||
|
'string2'::text
|
||||||
|
FROM %1$s t1
|
||||||
|
returning *;
|
||||||
|
$$, 'local_source_table_1', 'dist_source_table_1');
|
||||||
|
NOTICE: l2={"(,,,string2,,,string1,)","(,,,string2,,,string1,)","(,,,string2,,,string1,)"}
|
||||||
|
CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE
|
||||||
|
NOTICE: l2={"(,,,string2,,,string1,)","(,,,string2,,,string1,)","(,,,string2,,,string1,)"}
|
||||||
|
CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE
|
||||||
|
query_results_equal
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
t
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT * FROM query_results_equal($$
|
||||||
|
INSERT INTO local_dest_table (col_4, col_3) SELECT
|
||||||
|
'string1',
|
||||||
|
'string2'::text
|
||||||
|
FROM %1$s t1
|
||||||
|
WHERE dist_col = 1
|
||||||
|
returning *;
|
||||||
|
$$, 'local_source_table_1', 'dist_source_table_1');
|
||||||
|
NOTICE: l2={"(,,string2,string1,,,col_7,)","(,,string2,string1,,,col_7,)"}
|
||||||
|
CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE
|
||||||
|
NOTICE: l2={"(,,string2,string1,,,col_7,)","(,,string2,string1,,,col_7,)"}
|
||||||
|
CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE
|
||||||
|
query_results_equal
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
t
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT * FROM query_results_equal($$
|
||||||
|
INSERT INTO local_dest_table (col_4, col_1)
|
||||||
|
SELECT
|
||||||
|
'string1',
|
||||||
|
dist_col
|
||||||
|
FROM %1$s
|
||||||
|
UNION ALL
|
||||||
|
SELECT
|
||||||
|
'string',
|
||||||
|
int_col
|
||||||
|
FROM %1$s;
|
||||||
|
$$, 'local_source_table_1', 'dist_source_table_1');
|
||||||
|
NOTICE: l2={"(1,,,string,,,col_7,)","(1,,,string1,,,col_7,)","(1,,,string1,,,col_7,)","(2,,,string,,,col_7,)","(3,,,string,,,col_7,)","(3,,,string1,,,col_7,)"}
|
||||||
|
CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE
|
||||||
|
NOTICE: l2={"(1,,,string,,,col_7,)","(1,,,string1,,,col_7,)","(1,,,string1,,,col_7,)","(2,,,string,,,col_7,)","(3,,,string,,,col_7,)","(3,,,string1,,,col_7,)"}
|
||||||
|
CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE
|
||||||
|
query_results_equal
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
t
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT * FROM query_results_equal($$
|
||||||
|
WITH cte1 AS (SELECT s FROM generate_series(1,10) s)
|
||||||
|
INSERT INTO local_dest_table (col_4, col_1)
|
||||||
|
SELECT
|
||||||
|
'string1',
|
||||||
|
dist_col
|
||||||
|
FROM %1$s WHERE int_col IN (SELECT s FROM cte1)
|
||||||
|
UNION ALL
|
||||||
|
SELECT
|
||||||
|
'string',
|
||||||
|
int_col
|
||||||
|
FROM %1$s WHERE int_col IN (SELECT s + 1 FROM cte1)
|
||||||
|
$$, 'local_source_table_1', 'dist_source_table_1');
|
||||||
|
NOTICE: l2={"(1,,,string1,,,col_7,)","(1,,,string1,,,col_7,)","(2,,,string,,,col_7,)","(3,,,string,,,col_7,)","(3,,,string1,,,col_7,)"}
|
||||||
|
CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE
|
||||||
|
NOTICE: l2={"(1,,,string1,,,col_7,)","(1,,,string1,,,col_7,)","(2,,,string,,,col_7,)","(3,,,string,,,col_7,)","(3,,,string1,,,col_7,)"}
|
||||||
|
CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE
|
||||||
|
query_results_equal
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
t
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT * FROM query_results_equal($$
|
||||||
|
WITH cte1 AS (SELECT 'stringcte', s FROM generate_series(1,10) s)
|
||||||
|
INSERT INTO local_dest_table (col_4, col_1)
|
||||||
|
SELECT
|
||||||
|
'string1',
|
||||||
|
dist_col
|
||||||
|
FROM %1$s WHERE int_col IN (SELECT s FROM cte1)
|
||||||
|
UNION ALL
|
||||||
|
SELECT
|
||||||
|
*
|
||||||
|
FROM cte1
|
||||||
|
$$, 'local_source_table_1', 'dist_source_table_1');
|
||||||
|
NOTICE: l2={"(1,,,string1,,,col_7,)","(1,,,string1,,,col_7,)","(1,,,stringcte,,,col_7,)","(2,,,stringcte,,,col_7,)","(3,,,string1,,,col_7,)","(3,,,stringcte,,,col_7,)","(4,,,stringcte,,,col_7,)","(5,,,stringcte,,,col_7,)","(6,,,stringcte,,,col_7,)","(7,,,stringcte,,,col_7,)","(8,,,stringcte,,,col_7,)","(9,,,stringcte,,,col_7,)","(10,,,stringcte,,,col_7,)"}
|
||||||
|
CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE
|
||||||
|
NOTICE: l2={"(1,,,string1,,,col_7,)","(1,,,string1,,,col_7,)","(1,,,stringcte,,,col_7,)","(2,,,stringcte,,,col_7,)","(3,,,string1,,,col_7,)","(3,,,stringcte,,,col_7,)","(4,,,stringcte,,,col_7,)","(5,,,stringcte,,,col_7,)","(6,,,stringcte,,,col_7,)","(7,,,stringcte,,,col_7,)","(8,,,stringcte,,,col_7,)","(9,,,stringcte,,,col_7,)","(10,,,stringcte,,,col_7,)"}
|
||||||
|
CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE
|
||||||
|
query_results_equal
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
t
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT * FROM query_results_equal($$
|
||||||
|
INSERT INTO local_dest_table (col_3)
|
||||||
|
SELECT t1.text_col_1
|
||||||
|
FROM %1$s t1
|
||||||
|
GROUP BY t1.text_col_1;
|
||||||
|
$$, 'local_source_table_1', 'dist_source_table_1');
|
||||||
|
NOTICE: l2={"(,,value,,,,col_7,)","(,,value2,,,,col_7,)"}
|
||||||
|
CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE
|
||||||
|
NOTICE: l2={"(,,value,,,,col_7,)","(,,value2,,,,col_7,)"}
|
||||||
|
CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE
|
||||||
|
query_results_equal
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
t
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT * FROM query_results_equal($$
|
||||||
|
INSERT INTO local_dest_table (col_1, col_2, col_3, col_5, col_6, col_7, col_8)
|
||||||
|
SELECT
|
||||||
|
max(t1.dist_col),
|
||||||
|
3,
|
||||||
|
'string_3',
|
||||||
|
4,
|
||||||
|
'string_4',
|
||||||
|
t1.text_col_1,
|
||||||
|
'string_1000'
|
||||||
|
FROM %1$s t1
|
||||||
|
GROUP BY t1.text_col_2, t1.text_col_1;
|
||||||
|
$$, 'local_source_table_1', 'dist_source_table_1');
|
||||||
|
NOTICE: l2={"(1,3,string_3,,4,string_4,value,string_1000)","(1,3,string_3,,4,string_4,value2,string_1000)","(3,3,string_3,,4,string_4,value,string_1000)"}
|
||||||
|
CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE
|
||||||
|
NOTICE: l2={"(1,3,string_3,,4,string_4,value,string_1000)","(1,3,string_3,,4,string_4,value2,string_1000)","(3,3,string_3,,4,string_4,value,string_1000)"}
|
||||||
|
CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE
|
||||||
|
query_results_equal
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
t
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT * FROM query_results_equal($$
|
||||||
|
INSERT INTO local_dest_table (col_7, col_8)
|
||||||
|
SELECT
|
||||||
|
t1.text_col_1,
|
||||||
|
'string_1000'
|
||||||
|
FROM dist_source_table_1 t1
|
||||||
|
GROUP BY t1.text_col_1;
|
||||||
|
$$, 'local_source_table_1', 'dist_source_table_1');
|
||||||
|
NOTICE: l2={"(,,,,,,value,string_1000)","(,,,,,,value2,string_1000)"}
|
||||||
|
CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE
|
||||||
|
NOTICE: l2={"(,,,,,,value,string_1000)","(,,,,,,value2,string_1000)"}
|
||||||
|
CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE
|
||||||
|
query_results_equal
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
t
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT * FROM query_results_equal($$
|
||||||
|
INSERT INTO local_dest_table (col_6, col_7, col_8)
|
||||||
|
SELECT
|
||||||
|
'string_4',
|
||||||
|
t1.text_col_1,
|
||||||
|
'string_1000'
|
||||||
|
FROM %1$s t1
|
||||||
|
GROUP BY t1.text_col_1;
|
||||||
|
$$, 'local_source_table_1', 'dist_source_table_1');
|
||||||
|
NOTICE: l2={"(,,,,,string_4,value,string_1000)","(,,,,,string_4,value2,string_1000)"}
|
||||||
|
CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE
|
||||||
|
NOTICE: l2={"(,,,,,string_4,value,string_1000)","(,,,,,string_4,value2,string_1000)"}
|
||||||
|
CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE
|
||||||
|
query_results_equal
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
t
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT * FROM query_results_equal($$
|
||||||
|
INSERT INTO local_dest_table (col_5, col_3)
|
||||||
|
SELECT 12, 'string_11' FROM %1$s t1
|
||||||
|
UNION
|
||||||
|
SELECT int_col, 'string' FROM %1$s;
|
||||||
|
$$, 'local_source_table_1', 'dist_source_table_1');
|
||||||
|
NOTICE: l2={"(,,string,,1,,col_7,)","(,,string,,2,,col_7,)","(,,string,,3,,col_7,)","(,,string_11,,12,,col_7,)"}
|
||||||
|
CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE
|
||||||
|
NOTICE: l2={"(,,string,,1,,col_7,)","(,,string,,2,,col_7,)","(,,string,,3,,col_7,)","(,,string_11,,12,,col_7,)"}
|
||||||
|
CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE
|
||||||
|
query_results_equal
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
t
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- use a sequence (cannot use query_results_equal, since sequence values would not match)
|
||||||
|
CREATE SEQUENCE seq;
|
||||||
|
TRUNCATE local_dest_table;
|
||||||
|
INSERT INTO local_dest_table (col_5, col_3)
|
||||||
|
SELECT 12, 'string_11' FROM dist_source_table_1
|
||||||
|
UNION
|
||||||
|
SELECT nextval('seq'), 'string' FROM dist_source_table_1;
|
||||||
|
SELECT * FROM local_dest_table ORDER BY 1,2,3,4,5,6,7,8;
|
||||||
|
col_1 | col_2 | col_3 | col_4 | col_5 | col_6 | col_7 | col_8
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
| | string | | 1 | | col_7 |
|
||||||
|
| | string | | 2 | | col_7 |
|
||||||
|
| | string | | 3 | | col_7 |
|
||||||
|
| | string_11 | | 12 | | col_7 |
|
||||||
|
(4 rows)
|
||||||
|
|
||||||
\set VERBOSITY terse
|
\set VERBOSITY terse
|
||||||
DROP SCHEMA insert_select_into_local_table CASCADE;
|
DROP SCHEMA insert_select_into_local_table CASCADE;
|
||||||
NOTICE: drop cascades to 8 other objects
|
NOTICE: drop cascades to 12 other objects
|
||||||
|
|
|
@ -64,6 +64,30 @@ INSERT INTO non_dist_unique SELECT a+1, b FROM dist_table ON CONFLICT (a) DO NOT
|
||||||
SELECT * FROM non_dist_unique ORDER BY 1;
|
SELECT * FROM non_dist_unique ORDER BY 1;
|
||||||
INSERT INTO non_dist_unique SELECT a+2, b FROM dist_table ON CONFLICT (a) DO UPDATE SET b = EXCLUDED.b + non_dist_unique.b;
|
INSERT INTO non_dist_unique SELECT a+2, b FROM dist_table ON CONFLICT (a) DO UPDATE SET b = EXCLUDED.b + non_dist_unique.b;
|
||||||
SELECT * FROM non_dist_unique ORDER BY 1;
|
SELECT * FROM non_dist_unique ORDER BY 1;
|
||||||
|
|
||||||
|
INSERT INTO non_dist_unique
|
||||||
|
SELECT a+1, b FROM dist_table
|
||||||
|
UNION ALL
|
||||||
|
SELECT a+100, b FROM dist_table
|
||||||
|
ON CONFLICT (a) DO NOTHING;
|
||||||
|
SELECT * FROM non_dist_unique ORDER BY 1;
|
||||||
|
|
||||||
|
INSERT INTO non_dist_unique
|
||||||
|
SELECT a+1, b FROM dist_table
|
||||||
|
UNION ALL
|
||||||
|
SELECT a+100, b FROM dist_table
|
||||||
|
ON CONFLICT (a) DO UPDATE SET b = EXCLUDED.b + 1;
|
||||||
|
SELECT * FROM non_dist_unique ORDER BY 1;
|
||||||
|
|
||||||
|
WITH cte1 AS (SELECT s FROM generate_series(1,10) s)
|
||||||
|
INSERT INTO non_dist_unique
|
||||||
|
WITH cte2 AS (SELECT s FROM generate_series(1,10) s)
|
||||||
|
SELECT a+1, b FROM dist_table WHERE b IN (SELECT s FROM cte1)
|
||||||
|
UNION ALL
|
||||||
|
SELECT s, s FROM cte1
|
||||||
|
ON CONFLICT (a) DO NOTHING;
|
||||||
|
SELECT * FROM non_dist_unique ORDER BY 1;
|
||||||
|
|
||||||
DROP TABLE non_dist_unique;
|
DROP TABLE non_dist_unique;
|
||||||
|
|
||||||
|
|
||||||
|
@ -73,6 +97,7 @@ INSERT INTO non_dist_default SELECT a FROM dist_table WHERE a = 1;
|
||||||
SELECT * FROM non_dist_default ORDER BY 1, 2;
|
SELECT * FROM non_dist_default ORDER BY 1, 2;
|
||||||
INSERT INTO non_dist_default SELECT a FROM dist_table WHERE a > 1;
|
INSERT INTO non_dist_default SELECT a FROM dist_table WHERE a > 1;
|
||||||
SELECT * FROM non_dist_default ORDER BY 1, 2;
|
SELECT * FROM non_dist_default ORDER BY 1, 2;
|
||||||
|
SELECT alter_table_set_access_method('non_dist_default', 'columnar');
|
||||||
INSERT INTO non_dist_default SELECT a, c FROM dist_table WHERE a = 1;
|
INSERT INTO non_dist_default SELECT a, c FROM dist_table WHERE a = 1;
|
||||||
SELECT * FROM non_dist_default ORDER BY 1, 2;
|
SELECT * FROM non_dist_default ORDER BY 1, 2;
|
||||||
INSERT INTO non_dist_default SELECT a, c FROM dist_table WHERE a > 1;
|
INSERT INTO non_dist_default SELECT a, c FROM dist_table WHERE a > 1;
|
||||||
|
@ -150,78 +175,232 @@ SELECT * FROM non_dist_2 ORDER BY 1, 2;
|
||||||
TRUNCATE non_dist_2;
|
TRUNCATE non_dist_2;
|
||||||
|
|
||||||
-- check issue https://github.com/citusdata/citus/issues/5858
|
-- check issue https://github.com/citusdata/citus/issues/5858
|
||||||
CREATE TABLE local_table(
|
CREATE TABLE local_dest_table(
|
||||||
col_1 integer,
|
col_1 integer,
|
||||||
col_2 integer,
|
col_2 integer,
|
||||||
col_3 text,
|
col_3 text,
|
||||||
col_4 text,
|
col_4 text,
|
||||||
|
drop_col text,
|
||||||
col_5 int,
|
col_5 int,
|
||||||
col_6 text,
|
col_6 text,
|
||||||
col_7 text,
|
col_7 text default 'col_7',
|
||||||
col_8 text
|
col_8 text
|
||||||
);
|
);
|
||||||
|
|
||||||
CREATE TABLE dist_table_1(
|
ALTER TABLE local_dest_table DROP COLUMN drop_col;
|
||||||
dist_col integer,
|
|
||||||
|
CREATE TABLE dist_source_table_1(
|
||||||
int_col integer,
|
int_col integer,
|
||||||
|
drop_col text,
|
||||||
text_col_1 text,
|
text_col_1 text,
|
||||||
|
dist_col integer,
|
||||||
text_col_2 text
|
text_col_2 text
|
||||||
);
|
);
|
||||||
SELECT create_distributed_table('dist_table_1', 'dist_col');
|
SELECT create_distributed_table('dist_source_table_1', 'dist_col');
|
||||||
|
|
||||||
INSERT INTO dist_table_1 VALUES (1, 1, 'string', 'string');
|
ALTER TABLE dist_source_table_1 DROP COLUMN drop_col;
|
||||||
|
|
||||||
CREATE TABLE dist_table_2(
|
INSERT INTO dist_source_table_1 VALUES (1, 'value', 1, 'value');
|
||||||
|
INSERT INTO dist_source_table_1 VALUES (2, 'value2', 1, 'value');
|
||||||
|
INSERT INTO dist_source_table_1 VALUES (3, 'value', 3, 'value3');
|
||||||
|
|
||||||
|
CREATE TABLE dist_source_table_2(
|
||||||
dist_col integer,
|
dist_col integer,
|
||||||
int_col integer
|
int_col integer
|
||||||
);
|
);
|
||||||
SELECT create_distributed_table('dist_table_2', 'dist_col');
|
SELECT create_distributed_table('dist_source_table_2', 'dist_col');
|
||||||
|
|
||||||
INSERT INTO dist_table_2 VALUES (1, 1);
|
INSERT INTO dist_source_table_2 VALUES (1, 1);
|
||||||
|
INSERT INTO dist_source_table_2 VALUES (2, 2);
|
||||||
|
INSERT INTO dist_source_table_2 VALUES (4, 4);
|
||||||
|
|
||||||
INSERT INTO local_table
|
CREATE TABLE local_source_table_1 AS SELECT * FROM dist_source_table_1;
|
||||||
|
CREATE TABLE local_source_table_2 AS SELECT * FROM dist_source_table_2;
|
||||||
|
|
||||||
|
/*
|
||||||
|
* query_results_equal compares the effect of two queries on local_dest_table.
|
||||||
|
* We use this to ensure that INSERT INTO local_dest_table SELECT behaves
|
||||||
|
* the same when selecting from a regular table (postgres handles it) and
|
||||||
|
* a distributed table (Citus handles it).
|
||||||
|
*
|
||||||
|
* The queries are generated by calling format() on query_table twice,
|
||||||
|
* once for each source_table argument.
|
||||||
|
*/
|
||||||
|
CREATE OR REPLACE FUNCTION query_results_equal(query_template text, source_table_1 text, source_table_2 text)
|
||||||
|
RETURNS bool
|
||||||
|
AS $$
|
||||||
|
DECLARE
|
||||||
|
l1 local_dest_table[];
|
||||||
|
l2 local_dest_table[];
|
||||||
|
BEGIN
|
||||||
|
/* get the results using source_table_1 as source */
|
||||||
|
TRUNCATE local_dest_table;
|
||||||
|
EXECUTE format(query_template, source_table_1);
|
||||||
|
SELECT array_agg(l) INTO l1
|
||||||
|
FROM (SELECT * FROM local_dest_table ORDER BY 1, 2, 3, 4, 5, 6, 7, 8) l;
|
||||||
|
|
||||||
|
/* get the results using source_table_2 as source */
|
||||||
|
TRUNCATE local_dest_table;
|
||||||
|
EXECUTE format(query_template, source_table_2);
|
||||||
|
SELECT array_agg(l) INTO l2
|
||||||
|
FROM (SELECT * FROM local_dest_table ORDER BY 1, 2, 3, 4, 5, 6, 7, 8) l;
|
||||||
|
|
||||||
|
RAISE NOTICE 'l2=%', l1;
|
||||||
|
RAISE NOTICE 'l2=%', l2;
|
||||||
|
RETURN l1 = l2;
|
||||||
|
END;
|
||||||
|
$$ LANGUAGE plpgsql;
|
||||||
|
|
||||||
|
SELECT * FROM query_results_equal($$
|
||||||
|
INSERT INTO local_dest_table
|
||||||
SELECT
|
SELECT
|
||||||
t1.dist_col,
|
t1.dist_col,
|
||||||
1,
|
1,
|
||||||
'string',
|
'string1',
|
||||||
'string',
|
'string2',
|
||||||
1,
|
2,
|
||||||
'string',
|
'string3',
|
||||||
t1.text_col_1,
|
t1.text_col_1,
|
||||||
t1.text_col_2
|
t1.text_col_2
|
||||||
FROM dist_table_1 t1
|
FROM %1$s_1 t1
|
||||||
WHERE t1.int_col IN (SELECT int_col FROM dist_table_2);
|
WHERE t1.int_col IN (SELECT int_col FROM %1$s_2)
|
||||||
|
$$, 'local_source_table', 'dist_source_table');
|
||||||
|
|
||||||
INSERT INTO local_table
|
SELECT * FROM query_results_equal($$
|
||||||
|
INSERT INTO local_dest_table
|
||||||
SELECT
|
SELECT
|
||||||
t1.dist_col,
|
t1.dist_col,
|
||||||
1,
|
1,
|
||||||
'string',
|
'string1',
|
||||||
'string',
|
'string2',
|
||||||
1,
|
2,
|
||||||
'string',
|
'string3',
|
||||||
t1.text_col_1,
|
t1.text_col_1,
|
||||||
t1.text_col_2
|
t1.text_col_2
|
||||||
FROM dist_table_1 t1
|
FROM %1$s t1
|
||||||
returning *;
|
returning *
|
||||||
INSERT INTO local_table (col_3, col_4) SELECT
|
$$, 'local_source_table_1', 'dist_source_table_1');
|
||||||
'string',
|
|
||||||
'string'::text
|
SELECT * FROM query_results_equal($$
|
||||||
FROM dist_table_1 t1
|
INSERT INTO local_dest_table (col_3, col_4) SELECT
|
||||||
|
'string1',
|
||||||
|
'string2'::text
|
||||||
|
FROM %1$s t1
|
||||||
returning *;
|
returning *;
|
||||||
|
$$, 'local_source_table_1', 'dist_source_table_1');
|
||||||
|
|
||||||
EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF) INSERT INTO local_table
|
SELECT * FROM query_results_equal($$
|
||||||
|
INSERT INTO local_dest_table (col_7, col_4) SELECT
|
||||||
|
'string1',
|
||||||
|
'string2'::text
|
||||||
|
FROM %1$s t1
|
||||||
|
returning *;
|
||||||
|
$$, 'local_source_table_1', 'dist_source_table_1');
|
||||||
|
|
||||||
|
SELECT * FROM query_results_equal($$
|
||||||
|
INSERT INTO local_dest_table (col_4, col_3) SELECT
|
||||||
|
'string1',
|
||||||
|
'string2'::text
|
||||||
|
FROM %1$s t1
|
||||||
|
WHERE dist_col = 1
|
||||||
|
returning *;
|
||||||
|
$$, 'local_source_table_1', 'dist_source_table_1');
|
||||||
|
|
||||||
|
SELECT * FROM query_results_equal($$
|
||||||
|
INSERT INTO local_dest_table (col_4, col_1)
|
||||||
|
SELECT
|
||||||
|
'string1',
|
||||||
|
dist_col
|
||||||
|
FROM %1$s
|
||||||
|
UNION ALL
|
||||||
SELECT
|
SELECT
|
||||||
t1.dist_col,
|
|
||||||
1,
|
|
||||||
'string',
|
'string',
|
||||||
|
int_col
|
||||||
|
FROM %1$s;
|
||||||
|
$$, 'local_source_table_1', 'dist_source_table_1');
|
||||||
|
|
||||||
|
SELECT * FROM query_results_equal($$
|
||||||
|
WITH cte1 AS (SELECT s FROM generate_series(1,10) s)
|
||||||
|
INSERT INTO local_dest_table (col_4, col_1)
|
||||||
|
SELECT
|
||||||
|
'string1',
|
||||||
|
dist_col
|
||||||
|
FROM %1$s WHERE int_col IN (SELECT s FROM cte1)
|
||||||
|
UNION ALL
|
||||||
|
SELECT
|
||||||
'string',
|
'string',
|
||||||
1,
|
int_col
|
||||||
'string',
|
FROM %1$s WHERE int_col IN (SELECT s + 1 FROM cte1)
|
||||||
|
$$, 'local_source_table_1', 'dist_source_table_1');
|
||||||
|
|
||||||
|
SELECT * FROM query_results_equal($$
|
||||||
|
WITH cte1 AS (SELECT 'stringcte', s FROM generate_series(1,10) s)
|
||||||
|
INSERT INTO local_dest_table (col_4, col_1)
|
||||||
|
SELECT
|
||||||
|
'string1',
|
||||||
|
dist_col
|
||||||
|
FROM %1$s WHERE int_col IN (SELECT s FROM cte1)
|
||||||
|
UNION ALL
|
||||||
|
SELECT
|
||||||
|
*
|
||||||
|
FROM cte1
|
||||||
|
$$, 'local_source_table_1', 'dist_source_table_1');
|
||||||
|
|
||||||
|
SELECT * FROM query_results_equal($$
|
||||||
|
INSERT INTO local_dest_table (col_3)
|
||||||
|
SELECT t1.text_col_1
|
||||||
|
FROM %1$s t1
|
||||||
|
GROUP BY t1.text_col_1;
|
||||||
|
$$, 'local_source_table_1', 'dist_source_table_1');
|
||||||
|
|
||||||
|
SELECT * FROM query_results_equal($$
|
||||||
|
INSERT INTO local_dest_table (col_1, col_2, col_3, col_5, col_6, col_7, col_8)
|
||||||
|
SELECT
|
||||||
|
max(t1.dist_col),
|
||||||
|
3,
|
||||||
|
'string_3',
|
||||||
|
4,
|
||||||
|
'string_4',
|
||||||
t1.text_col_1,
|
t1.text_col_1,
|
||||||
t1.text_col_2
|
'string_1000'
|
||||||
FROM dist_table_1 t1
|
FROM %1$s t1
|
||||||
RETURNING *;
|
GROUP BY t1.text_col_2, t1.text_col_1;
|
||||||
|
$$, 'local_source_table_1', 'dist_source_table_1');
|
||||||
|
|
||||||
|
SELECT * FROM query_results_equal($$
|
||||||
|
INSERT INTO local_dest_table (col_7, col_8)
|
||||||
|
SELECT
|
||||||
|
t1.text_col_1,
|
||||||
|
'string_1000'
|
||||||
|
FROM dist_source_table_1 t1
|
||||||
|
GROUP BY t1.text_col_1;
|
||||||
|
$$, 'local_source_table_1', 'dist_source_table_1');
|
||||||
|
|
||||||
|
SELECT * FROM query_results_equal($$
|
||||||
|
INSERT INTO local_dest_table (col_6, col_7, col_8)
|
||||||
|
SELECT
|
||||||
|
'string_4',
|
||||||
|
t1.text_col_1,
|
||||||
|
'string_1000'
|
||||||
|
FROM %1$s t1
|
||||||
|
GROUP BY t1.text_col_1;
|
||||||
|
$$, 'local_source_table_1', 'dist_source_table_1');
|
||||||
|
|
||||||
|
SELECT * FROM query_results_equal($$
|
||||||
|
INSERT INTO local_dest_table (col_5, col_3)
|
||||||
|
SELECT 12, 'string_11' FROM %1$s t1
|
||||||
|
UNION
|
||||||
|
SELECT int_col, 'string' FROM %1$s;
|
||||||
|
$$, 'local_source_table_1', 'dist_source_table_1');
|
||||||
|
|
||||||
|
-- use a sequence (cannot use query_results_equal, since sequence values would not match)
|
||||||
|
CREATE SEQUENCE seq;
|
||||||
|
TRUNCATE local_dest_table;
|
||||||
|
INSERT INTO local_dest_table (col_5, col_3)
|
||||||
|
SELECT 12, 'string_11' FROM dist_source_table_1
|
||||||
|
UNION
|
||||||
|
SELECT nextval('seq'), 'string' FROM dist_source_table_1;
|
||||||
|
SELECT * FROM local_dest_table ORDER BY 1,2,3,4,5,6,7,8;
|
||||||
|
|
||||||
\set VERBOSITY terse
|
\set VERBOSITY terse
|
||||||
DROP SCHEMA insert_select_into_local_table CASCADE;
|
DROP SCHEMA insert_select_into_local_table CASCADE;
|
||||||
|
|
Loading…
Reference in New Issue