mirror of https://github.com/citusdata/citus.git
MX tests for INSERT/SELECT repartition
parent
af2349f21f
commit
665b33dca1
|
@ -0,0 +1,96 @@
|
||||||
|
-- Test behaviour of repartitioned INSERT ... SELECT in MX setup
|
||||||
|
CREATE SCHEMA multi_mx_insert_select_repartition;
|
||||||
|
SET search_path TO multi_mx_insert_select_repartition;
|
||||||
|
SET citus.next_shard_id TO 4213581;
|
||||||
|
SET citus.replication_model TO 'streaming';
|
||||||
|
SET citus.shard_replication_factor TO 1;
|
||||||
|
SET citus.shard_count TO 4;
|
||||||
|
CREATE TABLE source_table(a int, b int);
|
||||||
|
SELECT create_distributed_table('source_table', 'a');
|
||||||
|
create_distributed_table
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
INSERT INTO source_table SELECT floor(i/4), i*i FROM generate_series(1, 20) i;
|
||||||
|
SET citus.shard_count TO 3;
|
||||||
|
CREATE TABLE target_table(a int, b int);
|
||||||
|
SELECT create_distributed_table('target_table', 'a');
|
||||||
|
create_distributed_table
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
CREATE FUNCTION square(int) RETURNS INT
|
||||||
|
AS $$ SELECT $1 * $1 $$
|
||||||
|
LANGUAGE SQL;
|
||||||
|
select create_distributed_function('square(int)');
|
||||||
|
create_distributed_function
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
select public.colocate_proc_with_table('square', 'source_table'::regclass, 0);
|
||||||
|
colocate_proc_with_table
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- Test along with function delegation
|
||||||
|
-- function delegation only happens for "SELECT f()", and we don't use
|
||||||
|
-- repartitioned INSERT/SELECT when task count is 1, so the following
|
||||||
|
-- should go via coordinator
|
||||||
|
EXPLAIN (costs off) INSERT INTO target_table(a) SELECT square(4);
|
||||||
|
QUERY PLAN
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
Custom Scan (Citus INSERT ... SELECT)
|
||||||
|
INSERT/SELECT method: pull to coordinator
|
||||||
|
-> Result
|
||||||
|
(3 rows)
|
||||||
|
|
||||||
|
INSERT INTO target_table(a) SELECT square(4);
|
||||||
|
SELECT * FROM target_table;
|
||||||
|
a | b
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
16 |
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
TRUNCATE target_table;
|
||||||
|
--
|
||||||
|
-- Test repartitioned INSERT/SELECT from MX worker
|
||||||
|
--
|
||||||
|
\c - - - :worker_1_port
|
||||||
|
SET search_path TO multi_mx_insert_select_repartition;
|
||||||
|
EXPLAIN (costs off) INSERT INTO target_table SELECT a, max(b) FROM source_table GROUP BY a;
|
||||||
|
QUERY PLAN
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
Custom Scan (Citus INSERT ... SELECT)
|
||||||
|
INSERT/SELECT method: repartition
|
||||||
|
-> Custom Scan (Citus Adaptive)
|
||||||
|
Task Count: 4
|
||||||
|
Tasks Shown: One of 4
|
||||||
|
-> Task
|
||||||
|
Node: host=localhost port=xxxxx dbname=regression
|
||||||
|
-> HashAggregate
|
||||||
|
Group Key: a
|
||||||
|
-> Seq Scan on source_table_4213581 source_table
|
||||||
|
(10 rows)
|
||||||
|
|
||||||
|
INSERT INTO target_table SELECT a, max(b) FROM source_table GROUP BY a;
|
||||||
|
\c - - - :master_port
|
||||||
|
SET search_path TO multi_mx_insert_select_repartition;
|
||||||
|
SELECT * FROM target_table ORDER BY a;
|
||||||
|
a | b
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
0 | 9
|
||||||
|
1 | 49
|
||||||
|
2 | 121
|
||||||
|
3 | 225
|
||||||
|
4 | 361
|
||||||
|
5 | 400
|
||||||
|
(6 rows)
|
||||||
|
|
||||||
|
RESET client_min_messages;
|
||||||
|
\set VERBOSITY terse
|
||||||
|
DROP SCHEMA multi_mx_insert_select_repartition CASCADE;
|
||||||
|
NOTICE: drop cascades to 3 other objects
|
|
@ -43,6 +43,7 @@ test: multi_mx_transaction_recovery
|
||||||
test: multi_mx_modifying_xacts
|
test: multi_mx_modifying_xacts
|
||||||
test: multi_mx_explain
|
test: multi_mx_explain
|
||||||
test: multi_mx_reference_table
|
test: multi_mx_reference_table
|
||||||
|
test: multi_mx_insert_select_repartition
|
||||||
|
|
||||||
# test that no tests leaked intermediate results. This should always be last
|
# test that no tests leaked intermediate results. This should always be last
|
||||||
test: ensure_no_intermediate_data_leak
|
test: ensure_no_intermediate_data_leak
|
||||||
|
|
|
@ -0,0 +1,50 @@
|
||||||
|
-- Test behaviour of repartitioned INSERT ... SELECT in MX setup
|
||||||
|
|
||||||
|
CREATE SCHEMA multi_mx_insert_select_repartition;
|
||||||
|
SET search_path TO multi_mx_insert_select_repartition;
|
||||||
|
|
||||||
|
SET citus.next_shard_id TO 4213581;
|
||||||
|
SET citus.replication_model TO 'streaming';
|
||||||
|
SET citus.shard_replication_factor TO 1;
|
||||||
|
|
||||||
|
SET citus.shard_count TO 4;
|
||||||
|
CREATE TABLE source_table(a int, b int);
|
||||||
|
SELECT create_distributed_table('source_table', 'a');
|
||||||
|
INSERT INTO source_table SELECT floor(i/4), i*i FROM generate_series(1, 20) i;
|
||||||
|
|
||||||
|
SET citus.shard_count TO 3;
|
||||||
|
CREATE TABLE target_table(a int, b int);
|
||||||
|
SELECT create_distributed_table('target_table', 'a');
|
||||||
|
|
||||||
|
CREATE FUNCTION square(int) RETURNS INT
|
||||||
|
AS $$ SELECT $1 * $1 $$
|
||||||
|
LANGUAGE SQL;
|
||||||
|
|
||||||
|
select create_distributed_function('square(int)');
|
||||||
|
select public.colocate_proc_with_table('square', 'source_table'::regclass, 0);
|
||||||
|
|
||||||
|
-- Test along with function delegation
|
||||||
|
-- function delegation only happens for "SELECT f()", and we don't use
|
||||||
|
-- repartitioned INSERT/SELECT when task count is 1, so the following
|
||||||
|
-- should go via coordinator
|
||||||
|
EXPLAIN (costs off) INSERT INTO target_table(a) SELECT square(4);
|
||||||
|
INSERT INTO target_table(a) SELECT square(4);
|
||||||
|
SELECT * FROM target_table;
|
||||||
|
|
||||||
|
TRUNCATE target_table;
|
||||||
|
|
||||||
|
--
|
||||||
|
-- Test repartitioned INSERT/SELECT from MX worker
|
||||||
|
--
|
||||||
|
\c - - - :worker_1_port
|
||||||
|
SET search_path TO multi_mx_insert_select_repartition;
|
||||||
|
EXPLAIN (costs off) INSERT INTO target_table SELECT a, max(b) FROM source_table GROUP BY a;
|
||||||
|
INSERT INTO target_table SELECT a, max(b) FROM source_table GROUP BY a;
|
||||||
|
|
||||||
|
\c - - - :master_port
|
||||||
|
SET search_path TO multi_mx_insert_select_repartition;
|
||||||
|
SELECT * FROM target_table ORDER BY a;
|
||||||
|
|
||||||
|
RESET client_min_messages;
|
||||||
|
\set VERBOSITY terse
|
||||||
|
DROP SCHEMA multi_mx_insert_select_repartition CASCADE;
|
Loading…
Reference in New Issue