Add tests for distributing functions with replication_model statement

pull/3129/head
Marco Slot 2019-10-23 04:41:04 +02:00
parent 067657af26
commit 03cae27782
10 changed files with 116 additions and 27 deletions

View File

@ -55,6 +55,9 @@ CREATE TYPE dup_result AS (f1 int, f2 text);
CREATE FUNCTION dup(int) RETURNS dup_result
AS $$ SELECT $1, CAST($1 AS text) || ' is text' $$
LANGUAGE SQL;
CREATE FUNCTION increment(int2) RETURNS int
AS $$ SELECT $1 + 1$$
LANGUAGE SQL;
CREATE FUNCTION add_with_param_names(val1 integer, val2 integer) RETURNS integer
AS 'select $1 + $2;'
LANGUAGE SQL
@ -73,9 +76,21 @@ CREATE FUNCTION add_mixed_param_names(integer, val1 integer) RETURNS integer
-- make sure to propagate ddl propagation after we have setup our functions, this will
-- allow alter statements to be propagated and keep the functions in sync across machines
SET citus.enable_ddl_propagation TO on;
-- functions are distributed by int arguments, when run in isolation it is not guaranteed a table actually exists.
CREATE TABLE colocation_table(id int);
SELECT create_distributed_table('colocation_table','id');
-- use an unusual type to force a new colocation group
CREATE TABLE statement_table(id int2);
SET citus.replication_model TO 'statement';
SET citus.shard_replication_factor TO 1;
SELECT create_distributed_table('statement_table','id');
create_distributed_table
--------------------------
(1 row)
-- create a table uses streaming-based replication (can be synced)
CREATE TABLE streaming_table(id int);
SET citus.replication_model TO 'streaming';
SET citus.shard_replication_factor TO 1;
SELECT create_distributed_table('streaming_table','id');
create_distributed_table
--------------------------
@ -120,7 +135,24 @@ select bool_or(hasmetadata) from pg_dist_node WHERE isactive AND noderole = 'pr
f
(1 row)
SELECT create_distributed_function('dup(int)', '$1');
-- try to co-locate with a table that uses statement-based replication
SELECT create_distributed_function('increment(int2)', '$1');
ERROR: cannot colocate function "increment" and table "statement_table"
DETAIL: Citus currently only supports colocating function with distributed tables that are created using streaming replication model.
HINT: When distributing tables make sure that citus.replication_model = 'streaming'
SELECT create_distributed_function('increment(int2)', '$1', colocate_with := 'statement_table');
ERROR: cannot colocate function "increment" and table "statement_table"
DETAIL: Citus currently only supports colocating function with distributed tables that are created using streaming replication model.
HINT: When distributing tables make sure that citus.replication_model = 'streaming'
BEGIN;
SET LOCAL citus.replication_model TO 'statement';
DROP TABLE statement_table;
SELECT create_distributed_function('increment(int2)', '$1');
ERROR: cannot create a function with a distribution argument when citus.replication_model is 'statement'
HINT: Set citus.replication_model to 'streaming' before creating distributed tables
END;
-- try to co-locate with a table that uses streaming replication
SELECT create_distributed_function('dup(int)', '$1', colocate_with := 'streaming_table');
create_distributed_function
-----------------------------
@ -133,7 +165,7 @@ SELECT * FROM run_command_on_workers('SELECT function_tests.dup(42);') ORDER BY
localhost | 57638 | t | (42,"42 is text")
(2 rows)
SELECT create_distributed_function('add(int,int)', '$1');
SELECT create_distributed_function('add(int,int)', '$1', colocate_with := 'streaming_table');
create_distributed_function
-----------------------------
@ -455,7 +487,7 @@ SELECT create_distributed_table('replicated_table_func_test', 'a');
SELECT create_distributed_function('add_with_param_names(int, int)', '$1', colocate_with:='replicated_table_func_test');
ERROR: cannot colocate function "add_with_param_names" and table "replicated_table_func_test"
DETAIL: Citus currently only supports colocating function with distributed tables that are created using streaming replication model.
HINT: When distributing tables make sure that "citus.replication_model" is set to "streaming"
HINT: When distributing tables make sure that citus.replication_model = 'streaming'
-- a function can be colocated with a different distribution argument type
-- as long as there is a coercion path
SET citus.shard_replication_factor TO 1;
@ -607,9 +639,9 @@ SELECT worker_drop_distributed_table(logicalrelid::text) FROM pg_dist_partition
(3 rows)
TRUNCATE pg_dist_node;
DROP SCHEMA function_tests CASCADE;
DROP SCHEMA function_tests2 CASCADE;
TRUNCATE pg_dist_node;
\c - - - :worker_2_port
SET client_min_messages TO error; -- suppress cascading objects dropping
UPDATE pg_dist_local_group SET groupid = 0;
@ -621,9 +653,9 @@ SELECT worker_drop_distributed_table(logicalrelid::text) FROM pg_dist_partition
(3 rows)
TRUNCATE pg_dist_node;
DROP SCHEMA function_tests CASCADE;
DROP SCHEMA function_tests2 CASCADE;
TRUNCATE pg_dist_node;
\c - - - :master_port
DROP USER functionuser;
SELECT run_command_on_workers($$DROP USER functionuser;$$);

View File

@ -22,13 +22,15 @@ END;
$proc$;
-- procedures are distributed by text arguments, when run in isolation it is not guaranteed a table actually exists.
CREATE TABLE colocation_table(id text);
SET citus.replication_model TO 'streaming';
SET citus.shard_replication_factor TO 1;
SELECT create_distributed_table('colocation_table','id');
create_distributed_table
--------------------------
(1 row)
SELECT create_distributed_function('raise_info(text)', '$1');
SELECT create_distributed_function('raise_info(text)', '$1', colocate_with := 'colocation_table');
create_distributed_function
-----------------------------

View File

@ -68,6 +68,7 @@ step s2-public-schema:
step s2-create-table:
CREATE TABLE t1 (a int, b int);
-- session needs to have replication factor set to 1, can't do in setup
SET citus.replication_model TO 'streaming';
SET citus.shard_replication_factor TO 1;
SELECT create_distributed_table('t1', 'a');
<waiting ...>
@ -193,6 +194,7 @@ step s2-public-schema:
step s2-create-table:
CREATE TABLE t1 (a int, b int);
-- session needs to have replication factor set to 1, can't do in setup
SET citus.replication_model TO 'streaming';
SET citus.shard_replication_factor TO 1;
SELECT create_distributed_table('t1', 'a');
<waiting ...>
@ -315,6 +317,7 @@ step s2-public-schema:
step s2-create-table:
CREATE TABLE t1 (a int, b int);
-- session needs to have replication factor set to 1, can't do in setup
SET citus.replication_model TO 'streaming';
SET citus.shard_replication_factor TO 1;
SELECT create_distributed_table('t1', 'a');
@ -447,6 +450,7 @@ step s2-create-schema:
step s2-create-table:
CREATE TABLE t1 (a int, b int);
-- session needs to have replication factor set to 1, can't do in setup
SET citus.replication_model TO 'streaming';
SET citus.shard_replication_factor TO 1;
SELECT create_distributed_table('t1', 'a');
<waiting ...>
@ -574,6 +578,7 @@ step s2-create-schema:
step s2-create-table:
CREATE TABLE t1 (a int, b int);
-- session needs to have replication factor set to 1, can't do in setup
SET citus.replication_model TO 'streaming';
SET citus.shard_replication_factor TO 1;
SELECT create_distributed_table('t1', 'a');
<waiting ...>
@ -698,6 +703,7 @@ step s2-create-schema:
step s2-create-table:
CREATE TABLE t1 (a int, b int);
-- session needs to have replication factor set to 1, can't do in setup
SET citus.replication_model TO 'streaming';
SET citus.shard_replication_factor TO 1;
SELECT create_distributed_table('t1', 'a');
@ -837,6 +843,7 @@ step s1-add-worker:
step s2-create-table:
CREATE TABLE t1 (a int, b int);
-- session needs to have replication factor set to 1, can't do in setup
SET citus.replication_model TO 'streaming';
SET citus.shard_replication_factor TO 1;
SELECT create_distributed_table('t1', 'a');
<waiting ...>
@ -986,6 +993,7 @@ step s3-use-schema:
step s2-create-table:
CREATE TABLE t1 (a int, b int);
-- session needs to have replication factor set to 1, can't do in setup
SET citus.replication_model TO 'streaming';
SET citus.shard_replication_factor TO 1;
SELECT create_distributed_table('t1', 'a');
@ -1132,6 +1140,7 @@ step s3-create-schema2:
step s2-create-table:
CREATE TABLE t1 (a int, b int);
-- session needs to have replication factor set to 1, can't do in setup
SET citus.replication_model TO 'streaming';
SET citus.shard_replication_factor TO 1;
SELECT create_distributed_table('t1', 'a');
<waiting ...>
@ -1506,6 +1515,7 @@ step s2-create-type:
step s2-create-table-with-type:
CREATE TABLE t1 (a int, b tt1);
-- session needs to have replication factor set to 1, can't do in setup
SET citus.replication_model TO 'streaming';
SET citus.shard_replication_factor TO 1;
SELECT create_distributed_table('t1', 'a');

View File

@ -120,6 +120,8 @@ GRANT USAGE ON SCHEMA full_access_user_schema TO full_access;
GRANT ALL ON SCHEMA full_access_user_schema TO full_access;
GRANT USAGE ON SCHEMA full_access_user_schema TO usage_access;
\c - - - :master_port
SET citus.replication_model TO 'streaming';
SET citus.shard_replication_factor TO 1;
-- create prepare tests
PREPARE prepare_insert AS INSERT INTO test VALUES ($1);
PREPARE prepare_select AS SELECT count(*) FROM test;
@ -431,7 +433,7 @@ INSERT INTO full_access_user_schema.t1 VALUES (1),(2),(3);
-- not allowed to create a table
SELECT create_distributed_table('full_access_user_schema.t1', 'id');
ERROR: permission denied for schema full_access_user_schema
CONTEXT: while executing command on localhost:57638
CONTEXT: while executing command on localhost:57637
RESET ROLE;
SET ROLE usage_access;
CREATE TYPE usage_access_type AS ENUM ('a', 'b');
@ -479,12 +481,19 @@ SELECT wait_until_metadata_sync();
(1 row)
CREATE TABLE colocation_table(id text);
SELECT create_distributed_table('colocation_table','id');
create_distributed_table
--------------------------
(1 row)
-- now, make sure that the user can use the function
-- created in the transaction
BEGIN;
CREATE FUNCTION usage_access_func_second(key int, variadic v int[]) RETURNS text
LANGUAGE plpgsql AS 'begin return current_user; end;';
SELECT create_distributed_function('usage_access_func_second(int,int[])', '$1');
SELECT create_distributed_function('usage_access_func_second(int,int[])', '$1', colocate_with := 'colocation_table');
create_distributed_function
-----------------------------
@ -509,7 +518,7 @@ SELECT usesuper FROM pg_user where usename IN (SELECT current_user);
(1 row)
-- superuser creates the distributed function that is owned by a regular user
SELECT create_distributed_function('usage_access_func_third(int,int[])', '$1');
SELECT create_distributed_function('usage_access_func_third(int,int[])', '$1', colocate_with := 'colocation_table');
create_distributed_function
-----------------------------
@ -675,7 +684,7 @@ ERROR: could not receive file "base/pgsql_job_cache/job_0042/task_000001/p_0000
-- different user should not be able to fetch partition file
SET ROLE usage_access;
SELECT worker_fetch_partition_file(42, 1, 1, 1, 'localhost', :worker_1_port);
WARNING: could not open file "base/pgsql_job_cache/job_0042/task_000001/p_00001.37455": No such file or directory
WARNING: could not open file "base/pgsql_job_cache/job_0042/task_000001/p_00001.37457": No such file or directory
CONTEXT: while executing command on localhost:57637
ERROR: could not receive file "base/pgsql_job_cache/job_0042/task_000001/p_00001" from localhost:57637
-- only the user whom created the files should be able to fetch
@ -714,7 +723,7 @@ RESET ROLE;
-- test that the super user is unable to read the contents of the intermediate file,
-- although it does create the table
SELECT worker_merge_files_into_table(42, 1, ARRAY['a'], ARRAY['integer']);
WARNING: Task file "task_000001.36164" does not have expected suffix ".10"
WARNING: Task file "task_000001.36145" does not have expected suffix ".10"
worker_merge_files_into_table
-------------------------------
@ -756,7 +765,7 @@ SELECT worker_merge_files_and_run_query(42, 1,
'CREATE TABLE task_000001_merge(merge_column_0 int)',
'CREATE TABLE task_000001 (a) AS SELECT sum(merge_column_0) FROM task_000001_merge'
);
WARNING: Task file "task_000001.36164" does not have expected suffix ".10"
WARNING: Task file "task_000001.36145" does not have expected suffix ".10"
worker_merge_files_and_run_query
----------------------------------
@ -826,7 +835,8 @@ DROP TABLE
my_role_table_with_data,
singleshard,
test,
test_coloc;
test_coloc,
colocation_table;
DROP USER full_access;
DROP USER read_access;
DROP USER no_access;

View File

@ -537,6 +537,7 @@ select start_metadata_sync_to_node('localhost', :worker_2_port);
\c - - - :master_port
SET search_path to multi_mx_function_call_delegation, public;
SET client_min_messages TO DEBUG1;
SET citus.replication_model = 'streaming';
--
-- Test non-const parameter values
--

View File

@ -91,6 +91,7 @@ step "s2-create-table"
{
CREATE TABLE t1 (a int, b int);
-- session needs to have replication factor set to 1, can't do in setup
SET citus.replication_model TO 'streaming';
SET citus.shard_replication_factor TO 1;
SELECT create_distributed_table('t1', 'a');
}
@ -104,6 +105,7 @@ step "s2-create-table-with-type"
{
CREATE TABLE t1 (a int, b tt1);
-- session needs to have replication factor set to 1, can't do in setup
SET citus.replication_model TO 'streaming';
SET citus.shard_replication_factor TO 1;
SELECT create_distributed_table('t1', 'a');
}

View File

@ -55,6 +55,10 @@ CREATE FUNCTION dup(int) RETURNS dup_result
AS $$ SELECT $1, CAST($1 AS text) || ' is text' $$
LANGUAGE SQL;
CREATE FUNCTION increment(int2) RETURNS int
AS $$ SELECT $1 + 1$$
LANGUAGE SQL;
CREATE FUNCTION add_with_param_names(val1 integer, val2 integer) RETURNS integer
AS 'select $1 + $2;'
LANGUAGE SQL
@ -77,9 +81,17 @@ CREATE FUNCTION add_mixed_param_names(integer, val1 integer) RETURNS integer
-- allow alter statements to be propagated and keep the functions in sync across machines
SET citus.enable_ddl_propagation TO on;
-- functions are distributed by int arguments, when run in isolation it is not guaranteed a table actually exists.
CREATE TABLE colocation_table(id int);
SELECT create_distributed_table('colocation_table','id');
-- use an unusual type to force a new colocation group
CREATE TABLE statement_table(id int2);
SET citus.replication_model TO 'statement';
SET citus.shard_replication_factor TO 1;
SELECT create_distributed_table('statement_table','id');
-- create a table uses streaming-based replication (can be synced)
CREATE TABLE streaming_table(id int);
SET citus.replication_model TO 'streaming';
SET citus.shard_replication_factor TO 1;
SELECT create_distributed_table('streaming_table','id');
-- make sure that none of the active and primary nodes hasmetadata
-- at the start of the test
@ -98,10 +110,20 @@ SELECT * FROM run_command_on_workers('SELECT function_tests.add_mixed_param_name
-- since the function doesn't have a parameter
select bool_or(hasmetadata) from pg_dist_node WHERE isactive AND noderole = 'primary';
SELECT create_distributed_function('dup(int)', '$1');
-- try to co-locate with a table that uses statement-based replication
SELECT create_distributed_function('increment(int2)', '$1');
SELECT create_distributed_function('increment(int2)', '$1', colocate_with := 'statement_table');
BEGIN;
SET LOCAL citus.replication_model TO 'statement';
DROP TABLE statement_table;
SELECT create_distributed_function('increment(int2)', '$1');
END;
-- try to co-locate with a table that uses streaming replication
SELECT create_distributed_function('dup(int)', '$1', colocate_with := 'streaming_table');
SELECT * FROM run_command_on_workers('SELECT function_tests.dup(42);') ORDER BY 1,2;
SELECT create_distributed_function('add(int,int)', '$1');
SELECT create_distributed_function('add(int,int)', '$1', colocate_with := 'streaming_table');
SELECT * FROM run_command_on_workers('SELECT function_tests.add(2,3);') ORDER BY 1,2;
SELECT public.verify_function_is_same_on_workers('function_tests.add(int,int)');
@ -318,17 +340,17 @@ DROP SCHEMA function_tests2 CASCADE;
SET client_min_messages TO error; -- suppress cascading objects dropping
UPDATE pg_dist_local_group SET groupid = 0;
SELECT worker_drop_distributed_table(logicalrelid::text) FROM pg_dist_partition WHERE logicalrelid::text ILIKE '%replicated_table_func_test%';
TRUNCATE pg_dist_node;
DROP SCHEMA function_tests CASCADE;
DROP SCHEMA function_tests2 CASCADE;
TRUNCATE pg_dist_node;
\c - - - :worker_2_port
SET client_min_messages TO error; -- suppress cascading objects dropping
UPDATE pg_dist_local_group SET groupid = 0;
SELECT worker_drop_distributed_table(logicalrelid::text) FROM pg_dist_partition WHERE logicalrelid::text ILIKE '%replicated_table_func_test%';
TRUNCATE pg_dist_node;
DROP SCHEMA function_tests CASCADE;
DROP SCHEMA function_tests2 CASCADE;
TRUNCATE pg_dist_node;
\c - - - :master_port

View File

@ -19,9 +19,11 @@ $proc$;
-- procedures are distributed by text arguments, when run in isolation it is not guaranteed a table actually exists.
CREATE TABLE colocation_table(id text);
SET citus.replication_model TO 'streaming';
SET citus.shard_replication_factor TO 1;
SELECT create_distributed_table('colocation_table','id');
SELECT create_distributed_function('raise_info(text)', '$1');
SELECT create_distributed_function('raise_info(text)', '$1', colocate_with := 'colocation_table');
SELECT * FROM run_command_on_workers($$CALL procedure_tests.raise_info('hello');$$) ORDER BY 1,2;
SELECT public.verify_function_is_same_on_workers('procedure_tests.raise_info(text)');

View File

@ -92,6 +92,9 @@ GRANT USAGE ON SCHEMA full_access_user_schema TO usage_access;
\c - - - :master_port
SET citus.replication_model TO 'streaming';
SET citus.shard_replication_factor TO 1;
-- create prepare tests
PREPARE prepare_insert AS INSERT INTO test VALUES ($1);
PREPARE prepare_select AS SELECT count(*) FROM test;
@ -296,12 +299,15 @@ SELECT run_command_on_workers($$SELECT proowner::regrole FROM pg_proc WHERE pron
SELECT wait_until_metadata_sync();
CREATE TABLE colocation_table(id text);
SELECT create_distributed_table('colocation_table','id');
-- now, make sure that the user can use the function
-- created in the transaction
BEGIN;
CREATE FUNCTION usage_access_func_second(key int, variadic v int[]) RETURNS text
LANGUAGE plpgsql AS 'begin return current_user; end;';
SELECT create_distributed_function('usage_access_func_second(int,int[])', '$1');
SELECT create_distributed_function('usage_access_func_second(int,int[])', '$1', colocate_with := 'colocation_table');
SELECT usage_access_func_second(1, 2,3,4,5) FROM full_access_user_schema.t1 LIMIT 1;
@ -317,7 +323,7 @@ CREATE FUNCTION usage_access_func_third(key int, variadic v int[]) RETURNS text
SELECT usesuper FROM pg_user where usename IN (SELECT current_user);
-- superuser creates the distributed function that is owned by a regular user
SELECT create_distributed_function('usage_access_func_third(int,int[])', '$1');
SELECT create_distributed_function('usage_access_func_third(int,int[])', '$1', colocate_with := 'colocation_table');
SELECT proowner::regrole FROM pg_proc WHERE proname = 'usage_access_func_third';
SELECT run_command_on_workers($$SELECT proowner::regrole FROM pg_proc WHERE proname = 'usage_access_func_third'$$);
@ -499,7 +505,8 @@ DROP TABLE
my_role_table_with_data,
singleshard,
test,
test_coloc;
test_coloc,
colocation_table;
DROP USER full_access;
DROP USER read_access;
DROP USER no_access;

View File

@ -225,6 +225,7 @@ select start_metadata_sync_to_node('localhost', :worker_2_port);
\c - - - :master_port
SET search_path to multi_mx_function_call_delegation, public;
SET client_min_messages TO DEBUG1;
SET citus.replication_model = 'streaming';
--
-- Test non-const parameter values