mirror of https://github.com/citusdata/citus.git
Add regression tests for intermediate results
parent
4cdadfcab6
commit
716448ddef
|
@ -0,0 +1,209 @@
|
||||||
|
-- Test functions for copying intermediate results
|
||||||
|
CREATE SCHEMA intermediate_results;
|
||||||
|
SET search_path TO 'intermediate_results';
|
||||||
|
-- in the same transaction we can read a result
|
||||||
|
BEGIN;
|
||||||
|
SELECT create_intermediate_result('squares', 'SELECT s, s*s FROM generate_series(1,5) s');
|
||||||
|
create_intermediate_result
|
||||||
|
----------------------------
|
||||||
|
5
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT * FROM read_intermediate_result('squares', 'binary') AS res (x int, x2 int);
|
||||||
|
x | x2
|
||||||
|
---+----
|
||||||
|
1 | 1
|
||||||
|
2 | 4
|
||||||
|
3 | 9
|
||||||
|
4 | 16
|
||||||
|
5 | 25
|
||||||
|
(5 rows)
|
||||||
|
|
||||||
|
COMMIT;
|
||||||
|
-- in separate transactions, the result is no longer available
|
||||||
|
SELECT create_intermediate_result('squares', 'SELECT s, s*s FROM generate_series(1,5) s');
|
||||||
|
create_intermediate_result
|
||||||
|
----------------------------
|
||||||
|
5
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT * FROM read_intermediate_result('squares', 'binary') AS res (x int, x2 int);
|
||||||
|
ERROR: result "squares" does not exist
|
||||||
|
BEGIN;
|
||||||
|
CREATE TABLE interesting_squares (user_id text, interested_in text);
|
||||||
|
SELECT create_distributed_table('interesting_squares', 'user_id');
|
||||||
|
create_distributed_table
|
||||||
|
--------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
INSERT INTO interesting_squares VALUES ('jon', '2'), ('jon', '5'), ('jack', '3');
|
||||||
|
-- put an intermediate result on all workers
|
||||||
|
SELECT broadcast_intermediate_result('squares', 'SELECT s, s*s FROM generate_series(1,5) s');
|
||||||
|
broadcast_intermediate_result
|
||||||
|
-------------------------------
|
||||||
|
5
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- query the intermediate result in a router query
|
||||||
|
SELECT x, x2
|
||||||
|
FROM interesting_squares JOIN (SELECT * FROM read_intermediate_result('squares', 'binary') AS res (x int, x2 int)) squares ON (x::text = interested_in)
|
||||||
|
WHERE user_id = 'jon'
|
||||||
|
ORDER BY x;
|
||||||
|
x | x2
|
||||||
|
---+----
|
||||||
|
2 | 4
|
||||||
|
5 | 25
|
||||||
|
(2 rows)
|
||||||
|
|
||||||
|
END;
|
||||||
|
BEGIN;
|
||||||
|
-- put an intermediate result on all workers
|
||||||
|
SELECT broadcast_intermediate_result('squares', 'SELECT s, s*s FROM generate_series(1,5) s');
|
||||||
|
broadcast_intermediate_result
|
||||||
|
-------------------------------
|
||||||
|
5
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- query the intermediate result in a distributed query
|
||||||
|
SELECT x, x2
|
||||||
|
FROM interesting_squares
|
||||||
|
JOIN (SELECT * FROM read_intermediate_result('squares', 'binary') AS res (x int, x2 int)) squares ON (x::text = interested_in)
|
||||||
|
ORDER BY x;
|
||||||
|
x | x2
|
||||||
|
---+----
|
||||||
|
2 | 4
|
||||||
|
3 | 9
|
||||||
|
5 | 25
|
||||||
|
(3 rows)
|
||||||
|
|
||||||
|
END;
|
||||||
|
-- files should now be cleaned up
|
||||||
|
SELECT x, x2
|
||||||
|
FROM interesting_squares JOIN (SELECT * FROM read_intermediate_result('squares', 'binary') AS res (x text, x2 int)) squares ON (x = interested_in)
|
||||||
|
WHERE user_id = 'jon'
|
||||||
|
ORDER BY x;
|
||||||
|
WARNING: result "squares" does not exist
|
||||||
|
CONTEXT: while executing command on localhost:57638
|
||||||
|
WARNING: result "squares" does not exist
|
||||||
|
CONTEXT: while executing command on localhost:57637
|
||||||
|
ERROR: could not receive query results
|
||||||
|
-- try to read the file as text, will fail because of binary encoding
|
||||||
|
BEGIN;
|
||||||
|
SELECT create_intermediate_result('squares', 'SELECT s, s*s FROM generate_series(1,5) s');
|
||||||
|
create_intermediate_result
|
||||||
|
----------------------------
|
||||||
|
5
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT * FROM read_intermediate_result('squares', 'binary') AS res (x text, x2 int);
|
||||||
|
ERROR: invalid byte sequence for encoding "UTF8": 0x00
|
||||||
|
END;
|
||||||
|
-- try to read the file with wrong encoding
|
||||||
|
BEGIN;
|
||||||
|
SELECT create_intermediate_result('squares', 'SELECT s, s*s FROM generate_series(1,5) s');
|
||||||
|
create_intermediate_result
|
||||||
|
----------------------------
|
||||||
|
5
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT * FROM read_intermediate_result('squares', 'csv') AS res (x int, x2 int);
|
||||||
|
ERROR: invalid input syntax for integer: "PGCOPY"
|
||||||
|
END;
|
||||||
|
-- try a composite type
|
||||||
|
CREATE TYPE intermediate_results.square_type AS (x text, x2 int);
|
||||||
|
SELECT run_command_on_workers('CREATE TYPE intermediate_results.square_type AS (x text, x2 int)');
|
||||||
|
run_command_on_workers
|
||||||
|
-----------------------------------
|
||||||
|
(localhost,57637,t,"CREATE TYPE")
|
||||||
|
(localhost,57638,t,"CREATE TYPE")
|
||||||
|
(2 rows)
|
||||||
|
|
||||||
|
CREATE TABLE stored_squares (user_id text, square intermediate_results.square_type, metadata jsonb);
|
||||||
|
INSERT INTO stored_squares VALUES ('jon', '(2,4)'::intermediate_results.square_type, '{"value":2}');
|
||||||
|
INSERT INTO stored_squares VALUES ('jon', '(3,9)'::intermediate_results.square_type, '{"value":3}');
|
||||||
|
INSERT INTO stored_squares VALUES ('jon', '(4,16)'::intermediate_results.square_type, '{"value":4}');
|
||||||
|
INSERT INTO stored_squares VALUES ('jon', '(5,25)'::intermediate_results.square_type, '{"value":5}');
|
||||||
|
-- composite types change the format to text
|
||||||
|
BEGIN;
|
||||||
|
SELECT create_intermediate_result('stored_squares', 'SELECT square FROM stored_squares');
|
||||||
|
create_intermediate_result
|
||||||
|
----------------------------
|
||||||
|
4
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT * FROM read_intermediate_result('stored_squares', 'binary') AS res (s intermediate_results.square_type);
|
||||||
|
ERROR: COPY file signature not recognized
|
||||||
|
COMMIT;
|
||||||
|
BEGIN;
|
||||||
|
SELECT create_intermediate_result('stored_squares', 'SELECT square FROM stored_squares');
|
||||||
|
create_intermediate_result
|
||||||
|
----------------------------
|
||||||
|
4
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT * FROM read_intermediate_result('stored_squares', 'text') AS res (s intermediate_results.square_type);
|
||||||
|
s
|
||||||
|
--------
|
||||||
|
(2,4)
|
||||||
|
(3,9)
|
||||||
|
(4,16)
|
||||||
|
(5,25)
|
||||||
|
(4 rows)
|
||||||
|
|
||||||
|
COMMIT;
|
||||||
|
BEGIN;
|
||||||
|
-- put an intermediate result in text format on all workers
|
||||||
|
SELECT broadcast_intermediate_result('stored_squares', 'SELECT square, metadata FROM stored_squares');
|
||||||
|
broadcast_intermediate_result
|
||||||
|
-------------------------------
|
||||||
|
4
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- query the intermediate result in a router query using text format
|
||||||
|
SELECT * FROM interesting_squares JOIN (
|
||||||
|
SELECT * FROM
|
||||||
|
read_intermediate_result('stored_squares', 'text') AS res (s intermediate_results.square_type, m jsonb)
|
||||||
|
) squares
|
||||||
|
ON ((s).x = interested_in) WHERE user_id = 'jon' ORDER BY 1,2;
|
||||||
|
user_id | interested_in | s | m
|
||||||
|
---------+---------------+--------+--------------
|
||||||
|
jon | 2 | (2,4) | {"value": 2}
|
||||||
|
jon | 5 | (5,25) | {"value": 5}
|
||||||
|
(2 rows)
|
||||||
|
|
||||||
|
-- query the intermediate result in a real-time query using text format
|
||||||
|
SELECT * FROM interesting_squares JOIN (
|
||||||
|
SELECT * FROM
|
||||||
|
read_intermediate_result('stored_squares', 'text') AS res (s intermediate_results.square_type, m jsonb)
|
||||||
|
) squares
|
||||||
|
ON ((s).x = interested_in) ORDER BY 1,2;
|
||||||
|
user_id | interested_in | s | m
|
||||||
|
---------+---------------+--------+--------------
|
||||||
|
jack | 3 | (3,9) | {"value": 3}
|
||||||
|
jon | 2 | (2,4) | {"value": 2}
|
||||||
|
jon | 5 | (5,25) | {"value": 5}
|
||||||
|
(3 rows)
|
||||||
|
|
||||||
|
END;
|
||||||
|
-- pipe query output into a result file and create a table to check the result
|
||||||
|
COPY (SELECT s, s*s FROM generate_series(1,5) s)
|
||||||
|
TO PROGRAM
|
||||||
|
$$psql -h localhost -p 57636 -U postgres -d regression -c "BEGIN; COPY squares FROM STDIN WITH (format result); CREATE TABLE intermediate_results.squares AS SELECT * FROM read_intermediate_result('squares', 'binary') AS res(x int, x2 int); END;"$$
|
||||||
|
WITH (FORMAT binary);
|
||||||
|
SELECT * FROM squares ORDER BY x;
|
||||||
|
x | x2
|
||||||
|
---+----
|
||||||
|
1 | 1
|
||||||
|
2 | 4
|
||||||
|
3 | 9
|
||||||
|
4 | 16
|
||||||
|
5 | 25
|
||||||
|
(5 rows)
|
||||||
|
|
||||||
|
DROP SCHEMA intermediate_results CASCADE;
|
||||||
|
NOTICE: drop cascades to 4 other objects
|
||||||
|
DETAIL: drop cascades to table interesting_squares
|
||||||
|
drop cascades to type square_type
|
||||||
|
drop cascades to table stored_squares
|
||||||
|
drop cascades to table squares
|
|
@ -41,7 +41,7 @@ test: multi_partitioning_utils multi_partitioning
|
||||||
# ----------
|
# ----------
|
||||||
# Miscellaneous tests to check our query planning behavior
|
# Miscellaneous tests to check our query planning behavior
|
||||||
# ----------
|
# ----------
|
||||||
test: multi_deparse_shard_query multi_distributed_transaction_id multi_real_time_transaction
|
test: multi_deparse_shard_query multi_distributed_transaction_id multi_real_time_transaction intermediate_results
|
||||||
test: multi_explain
|
test: multi_explain
|
||||||
test: multi_basic_queries multi_complex_expressions multi_subquery multi_subquery_complex_queries multi_subquery_behavioral_analytics
|
test: multi_basic_queries multi_complex_expressions multi_subquery multi_subquery_complex_queries multi_subquery_behavioral_analytics
|
||||||
test: multi_subquery_complex_reference_clause multi_subquery_window_functions multi_view multi_sql_function multi_prepare_sql
|
test: multi_subquery_complex_reference_clause multi_subquery_window_functions multi_view multi_sql_function multi_prepare_sql
|
||||||
|
|
|
@ -0,0 +1,108 @@
|
||||||
|
-- Test functions for copying intermediate results
|
||||||
|
CREATE SCHEMA intermediate_results;
|
||||||
|
SET search_path TO 'intermediate_results';
|
||||||
|
|
||||||
|
-- in the same transaction we can read a result
|
||||||
|
BEGIN;
|
||||||
|
SELECT create_intermediate_result('squares', 'SELECT s, s*s FROM generate_series(1,5) s');
|
||||||
|
SELECT * FROM read_intermediate_result('squares', 'binary') AS res (x int, x2 int);
|
||||||
|
COMMIT;
|
||||||
|
|
||||||
|
-- in separate transactions, the result is no longer available
|
||||||
|
SELECT create_intermediate_result('squares', 'SELECT s, s*s FROM generate_series(1,5) s');
|
||||||
|
SELECT * FROM read_intermediate_result('squares', 'binary') AS res (x int, x2 int);
|
||||||
|
|
||||||
|
BEGIN;
|
||||||
|
CREATE TABLE interesting_squares (user_id text, interested_in text);
|
||||||
|
SELECT create_distributed_table('interesting_squares', 'user_id');
|
||||||
|
INSERT INTO interesting_squares VALUES ('jon', '2'), ('jon', '5'), ('jack', '3');
|
||||||
|
|
||||||
|
-- put an intermediate result on all workers
|
||||||
|
SELECT broadcast_intermediate_result('squares', 'SELECT s, s*s FROM generate_series(1,5) s');
|
||||||
|
|
||||||
|
-- query the intermediate result in a router query
|
||||||
|
SELECT x, x2
|
||||||
|
FROM interesting_squares JOIN (SELECT * FROM read_intermediate_result('squares', 'binary') AS res (x int, x2 int)) squares ON (x::text = interested_in)
|
||||||
|
WHERE user_id = 'jon'
|
||||||
|
ORDER BY x;
|
||||||
|
|
||||||
|
END;
|
||||||
|
|
||||||
|
BEGIN;
|
||||||
|
-- put an intermediate result on all workers
|
||||||
|
SELECT broadcast_intermediate_result('squares', 'SELECT s, s*s FROM generate_series(1,5) s');
|
||||||
|
-- query the intermediate result in a distributed query
|
||||||
|
SELECT x, x2
|
||||||
|
FROM interesting_squares
|
||||||
|
JOIN (SELECT * FROM read_intermediate_result('squares', 'binary') AS res (x int, x2 int)) squares ON (x::text = interested_in)
|
||||||
|
ORDER BY x;
|
||||||
|
END;
|
||||||
|
|
||||||
|
-- files should now be cleaned up
|
||||||
|
SELECT x, x2
|
||||||
|
FROM interesting_squares JOIN (SELECT * FROM read_intermediate_result('squares', 'binary') AS res (x text, x2 int)) squares ON (x = interested_in)
|
||||||
|
WHERE user_id = 'jon'
|
||||||
|
ORDER BY x;
|
||||||
|
|
||||||
|
-- try to read the file as text, will fail because of binary encoding
|
||||||
|
BEGIN;
|
||||||
|
SELECT create_intermediate_result('squares', 'SELECT s, s*s FROM generate_series(1,5) s');
|
||||||
|
SELECT * FROM read_intermediate_result('squares', 'binary') AS res (x text, x2 int);
|
||||||
|
END;
|
||||||
|
|
||||||
|
-- try to read the file with wrong encoding
|
||||||
|
BEGIN;
|
||||||
|
SELECT create_intermediate_result('squares', 'SELECT s, s*s FROM generate_series(1,5) s');
|
||||||
|
SELECT * FROM read_intermediate_result('squares', 'csv') AS res (x int, x2 int);
|
||||||
|
END;
|
||||||
|
|
||||||
|
-- try a composite type
|
||||||
|
CREATE TYPE intermediate_results.square_type AS (x text, x2 int);
|
||||||
|
SELECT run_command_on_workers('CREATE TYPE intermediate_results.square_type AS (x text, x2 int)');
|
||||||
|
|
||||||
|
CREATE TABLE stored_squares (user_id text, square intermediate_results.square_type, metadata jsonb);
|
||||||
|
INSERT INTO stored_squares VALUES ('jon', '(2,4)'::intermediate_results.square_type, '{"value":2}');
|
||||||
|
INSERT INTO stored_squares VALUES ('jon', '(3,9)'::intermediate_results.square_type, '{"value":3}');
|
||||||
|
INSERT INTO stored_squares VALUES ('jon', '(4,16)'::intermediate_results.square_type, '{"value":4}');
|
||||||
|
INSERT INTO stored_squares VALUES ('jon', '(5,25)'::intermediate_results.square_type, '{"value":5}');
|
||||||
|
|
||||||
|
-- composite types change the format to text
|
||||||
|
BEGIN;
|
||||||
|
SELECT create_intermediate_result('stored_squares', 'SELECT square FROM stored_squares');
|
||||||
|
SELECT * FROM read_intermediate_result('stored_squares', 'binary') AS res (s intermediate_results.square_type);
|
||||||
|
COMMIT;
|
||||||
|
|
||||||
|
BEGIN;
|
||||||
|
SELECT create_intermediate_result('stored_squares', 'SELECT square FROM stored_squares');
|
||||||
|
SELECT * FROM read_intermediate_result('stored_squares', 'text') AS res (s intermediate_results.square_type);
|
||||||
|
COMMIT;
|
||||||
|
|
||||||
|
BEGIN;
|
||||||
|
-- put an intermediate result in text format on all workers
|
||||||
|
SELECT broadcast_intermediate_result('stored_squares', 'SELECT square, metadata FROM stored_squares');
|
||||||
|
|
||||||
|
-- query the intermediate result in a router query using text format
|
||||||
|
SELECT * FROM interesting_squares JOIN (
|
||||||
|
SELECT * FROM
|
||||||
|
read_intermediate_result('stored_squares', 'text') AS res (s intermediate_results.square_type, m jsonb)
|
||||||
|
) squares
|
||||||
|
ON ((s).x = interested_in) WHERE user_id = 'jon' ORDER BY 1,2;
|
||||||
|
|
||||||
|
-- query the intermediate result in a real-time query using text format
|
||||||
|
SELECT * FROM interesting_squares JOIN (
|
||||||
|
SELECT * FROM
|
||||||
|
read_intermediate_result('stored_squares', 'text') AS res (s intermediate_results.square_type, m jsonb)
|
||||||
|
) squares
|
||||||
|
ON ((s).x = interested_in) ORDER BY 1,2;
|
||||||
|
|
||||||
|
END;
|
||||||
|
|
||||||
|
-- pipe query output into a result file and create a table to check the result
|
||||||
|
COPY (SELECT s, s*s FROM generate_series(1,5) s)
|
||||||
|
TO PROGRAM
|
||||||
|
$$psql -h localhost -p 57636 -U postgres -d regression -c "BEGIN; COPY squares FROM STDIN WITH (format result); CREATE TABLE intermediate_results.squares AS SELECT * FROM read_intermediate_result('squares', 'binary') AS res(x int, x2 int); END;"$$
|
||||||
|
WITH (FORMAT binary);
|
||||||
|
|
||||||
|
SELECT * FROM squares ORDER BY x;
|
||||||
|
|
||||||
|
DROP SCHEMA intermediate_results CASCADE;
|
Loading…
Reference in New Issue