mirror of https://github.com/citusdata/citus.git
Merge pull request #3129 from citusdata/fix_distributed_function
Disallow distributed function creation when replication_model is 'statement'pull/3134/head^2
commit
ed7c55a7af
|
@ -342,6 +342,8 @@ GetFunctionColocationId(Oid functionOid, char *colocateWithTableName,
|
||||||
|
|
||||||
if (pg_strncasecmp(colocateWithTableName, "default", NAMEDATALEN) == 0)
|
if (pg_strncasecmp(colocateWithTableName, "default", NAMEDATALEN) == 0)
|
||||||
{
|
{
|
||||||
|
Oid colocatedTableId = InvalidOid;
|
||||||
|
|
||||||
/* check for default colocation group */
|
/* check for default colocation group */
|
||||||
colocationId = ColocationId(ShardCount, ShardReplicationFactor,
|
colocationId = ColocationId(ShardCount, ShardReplicationFactor,
|
||||||
distributionArgumentOid);
|
distributionArgumentOid);
|
||||||
|
@ -356,6 +358,22 @@ GetFunctionColocationId(Oid functionOid, char *colocateWithTableName,
|
||||||
errhint("Provide a distributed table via \"colocate_with\" "
|
errhint("Provide a distributed table via \"colocate_with\" "
|
||||||
"option to create_distributed_function()")));
|
"option to create_distributed_function()")));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
colocatedTableId = ColocatedTableId(colocationId);
|
||||||
|
if (colocatedTableId != InvalidOid)
|
||||||
|
{
|
||||||
|
EnsureFunctionCanBeColocatedWithTable(functionOid, distributionArgumentOid,
|
||||||
|
colocatedTableId);
|
||||||
|
}
|
||||||
|
else if (ReplicationModel == REPLICATION_MODEL_COORDINATOR)
|
||||||
|
{
|
||||||
|
/* streaming replication model is required for metadata syncing */
|
||||||
|
ereport(ERROR, (errmsg("cannot create a function with a distribution "
|
||||||
|
"argument when citus.replication_model is "
|
||||||
|
"'statement'"),
|
||||||
|
errhint("Set citus.replication_model to 'streaming' "
|
||||||
|
"before creating distributed tables")));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
|
@ -412,7 +430,7 @@ EnsureFunctionCanBeColocatedWithTable(Oid functionOid, Oid distributionColumnTyp
|
||||||
"with distributed tables that are created using "
|
"with distributed tables that are created using "
|
||||||
"streaming replication model."),
|
"streaming replication model."),
|
||||||
errhint("When distributing tables make sure that "
|
errhint("When distributing tables make sure that "
|
||||||
"\"citus.replication_model\" is set to \"streaming\"")));
|
"citus.replication_model = 'streaming'")));
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
|
|
@ -55,6 +55,9 @@ CREATE TYPE dup_result AS (f1 int, f2 text);
|
||||||
CREATE FUNCTION dup(int) RETURNS dup_result
|
CREATE FUNCTION dup(int) RETURNS dup_result
|
||||||
AS $$ SELECT $1, CAST($1 AS text) || ' is text' $$
|
AS $$ SELECT $1, CAST($1 AS text) || ' is text' $$
|
||||||
LANGUAGE SQL;
|
LANGUAGE SQL;
|
||||||
|
CREATE FUNCTION increment(int2) RETURNS int
|
||||||
|
AS $$ SELECT $1 + 1$$
|
||||||
|
LANGUAGE SQL;
|
||||||
CREATE FUNCTION add_with_param_names(val1 integer, val2 integer) RETURNS integer
|
CREATE FUNCTION add_with_param_names(val1 integer, val2 integer) RETURNS integer
|
||||||
AS 'select $1 + $2;'
|
AS 'select $1 + $2;'
|
||||||
LANGUAGE SQL
|
LANGUAGE SQL
|
||||||
|
@ -73,9 +76,21 @@ CREATE FUNCTION add_mixed_param_names(integer, val1 integer) RETURNS integer
|
||||||
-- make sure to propagate ddl propagation after we have setup our functions, this will
|
-- make sure to propagate ddl propagation after we have setup our functions, this will
|
||||||
-- allow alter statements to be propagated and keep the functions in sync across machines
|
-- allow alter statements to be propagated and keep the functions in sync across machines
|
||||||
SET citus.enable_ddl_propagation TO on;
|
SET citus.enable_ddl_propagation TO on;
|
||||||
-- functions are distributed by int arguments, when run in isolation it is not guaranteed a table actually exists.
|
-- use an unusual type to force a new colocation group
|
||||||
CREATE TABLE colocation_table(id int);
|
CREATE TABLE statement_table(id int2);
|
||||||
SELECT create_distributed_table('colocation_table','id');
|
SET citus.replication_model TO 'statement';
|
||||||
|
SET citus.shard_replication_factor TO 1;
|
||||||
|
SELECT create_distributed_table('statement_table','id');
|
||||||
|
create_distributed_table
|
||||||
|
--------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- create a table uses streaming-based replication (can be synced)
|
||||||
|
CREATE TABLE streaming_table(id int);
|
||||||
|
SET citus.replication_model TO 'streaming';
|
||||||
|
SET citus.shard_replication_factor TO 1;
|
||||||
|
SELECT create_distributed_table('streaming_table','id');
|
||||||
create_distributed_table
|
create_distributed_table
|
||||||
--------------------------
|
--------------------------
|
||||||
|
|
||||||
|
@ -120,7 +135,24 @@ select bool_or(hasmetadata) from pg_dist_node WHERE isactive AND noderole = 'pr
|
||||||
f
|
f
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
SELECT create_distributed_function('dup(int)', '$1');
|
-- try to co-locate with a table that uses statement-based replication
|
||||||
|
SELECT create_distributed_function('increment(int2)', '$1');
|
||||||
|
ERROR: cannot colocate function "increment" and table "statement_table"
|
||||||
|
DETAIL: Citus currently only supports colocating function with distributed tables that are created using streaming replication model.
|
||||||
|
HINT: When distributing tables make sure that citus.replication_model = 'streaming'
|
||||||
|
SELECT create_distributed_function('increment(int2)', '$1', colocate_with := 'statement_table');
|
||||||
|
ERROR: cannot colocate function "increment" and table "statement_table"
|
||||||
|
DETAIL: Citus currently only supports colocating function with distributed tables that are created using streaming replication model.
|
||||||
|
HINT: When distributing tables make sure that citus.replication_model = 'streaming'
|
||||||
|
BEGIN;
|
||||||
|
SET LOCAL citus.replication_model TO 'statement';
|
||||||
|
DROP TABLE statement_table;
|
||||||
|
SELECT create_distributed_function('increment(int2)', '$1');
|
||||||
|
ERROR: cannot create a function with a distribution argument when citus.replication_model is 'statement'
|
||||||
|
HINT: Set citus.replication_model to 'streaming' before creating distributed tables
|
||||||
|
END;
|
||||||
|
-- try to co-locate with a table that uses streaming replication
|
||||||
|
SELECT create_distributed_function('dup(int)', '$1', colocate_with := 'streaming_table');
|
||||||
create_distributed_function
|
create_distributed_function
|
||||||
-----------------------------
|
-----------------------------
|
||||||
|
|
||||||
|
@ -133,7 +165,7 @@ SELECT * FROM run_command_on_workers('SELECT function_tests.dup(42);') ORDER BY
|
||||||
localhost | 57638 | t | (42,"42 is text")
|
localhost | 57638 | t | (42,"42 is text")
|
||||||
(2 rows)
|
(2 rows)
|
||||||
|
|
||||||
SELECT create_distributed_function('add(int,int)', '$1');
|
SELECT create_distributed_function('add(int,int)', '$1', colocate_with := 'streaming_table');
|
||||||
create_distributed_function
|
create_distributed_function
|
||||||
-----------------------------
|
-----------------------------
|
||||||
|
|
||||||
|
@ -455,7 +487,7 @@ SELECT create_distributed_table('replicated_table_func_test', 'a');
|
||||||
SELECT create_distributed_function('add_with_param_names(int, int)', '$1', colocate_with:='replicated_table_func_test');
|
SELECT create_distributed_function('add_with_param_names(int, int)', '$1', colocate_with:='replicated_table_func_test');
|
||||||
ERROR: cannot colocate function "add_with_param_names" and table "replicated_table_func_test"
|
ERROR: cannot colocate function "add_with_param_names" and table "replicated_table_func_test"
|
||||||
DETAIL: Citus currently only supports colocating function with distributed tables that are created using streaming replication model.
|
DETAIL: Citus currently only supports colocating function with distributed tables that are created using streaming replication model.
|
||||||
HINT: When distributing tables make sure that "citus.replication_model" is set to "streaming"
|
HINT: When distributing tables make sure that citus.replication_model = 'streaming'
|
||||||
-- a function can be colocated with a different distribution argument type
|
-- a function can be colocated with a different distribution argument type
|
||||||
-- as long as there is a coercion path
|
-- as long as there is a coercion path
|
||||||
SET citus.shard_replication_factor TO 1;
|
SET citus.shard_replication_factor TO 1;
|
||||||
|
@ -607,9 +639,9 @@ SELECT worker_drop_distributed_table(logicalrelid::text) FROM pg_dist_partition
|
||||||
|
|
||||||
(3 rows)
|
(3 rows)
|
||||||
|
|
||||||
TRUNCATE pg_dist_node;
|
|
||||||
DROP SCHEMA function_tests CASCADE;
|
DROP SCHEMA function_tests CASCADE;
|
||||||
DROP SCHEMA function_tests2 CASCADE;
|
DROP SCHEMA function_tests2 CASCADE;
|
||||||
|
TRUNCATE pg_dist_node;
|
||||||
\c - - - :worker_2_port
|
\c - - - :worker_2_port
|
||||||
SET client_min_messages TO error; -- suppress cascading objects dropping
|
SET client_min_messages TO error; -- suppress cascading objects dropping
|
||||||
UPDATE pg_dist_local_group SET groupid = 0;
|
UPDATE pg_dist_local_group SET groupid = 0;
|
||||||
|
@ -621,9 +653,9 @@ SELECT worker_drop_distributed_table(logicalrelid::text) FROM pg_dist_partition
|
||||||
|
|
||||||
(3 rows)
|
(3 rows)
|
||||||
|
|
||||||
TRUNCATE pg_dist_node;
|
|
||||||
DROP SCHEMA function_tests CASCADE;
|
DROP SCHEMA function_tests CASCADE;
|
||||||
DROP SCHEMA function_tests2 CASCADE;
|
DROP SCHEMA function_tests2 CASCADE;
|
||||||
|
TRUNCATE pg_dist_node;
|
||||||
\c - - - :master_port
|
\c - - - :master_port
|
||||||
DROP USER functionuser;
|
DROP USER functionuser;
|
||||||
SELECT run_command_on_workers($$DROP USER functionuser;$$);
|
SELECT run_command_on_workers($$DROP USER functionuser;$$);
|
||||||
|
|
|
@ -22,13 +22,15 @@ END;
|
||||||
$proc$;
|
$proc$;
|
||||||
-- procedures are distributed by text arguments, when run in isolation it is not guaranteed a table actually exists.
|
-- procedures are distributed by text arguments, when run in isolation it is not guaranteed a table actually exists.
|
||||||
CREATE TABLE colocation_table(id text);
|
CREATE TABLE colocation_table(id text);
|
||||||
|
SET citus.replication_model TO 'streaming';
|
||||||
|
SET citus.shard_replication_factor TO 1;
|
||||||
SELECT create_distributed_table('colocation_table','id');
|
SELECT create_distributed_table('colocation_table','id');
|
||||||
create_distributed_table
|
create_distributed_table
|
||||||
--------------------------
|
--------------------------
|
||||||
|
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
SELECT create_distributed_function('raise_info(text)', '$1');
|
SELECT create_distributed_function('raise_info(text)', '$1', colocate_with := 'colocation_table');
|
||||||
create_distributed_function
|
create_distributed_function
|
||||||
-----------------------------
|
-----------------------------
|
||||||
|
|
||||||
|
|
|
@ -68,6 +68,7 @@ step s2-public-schema:
|
||||||
step s2-create-table:
|
step s2-create-table:
|
||||||
CREATE TABLE t1 (a int, b int);
|
CREATE TABLE t1 (a int, b int);
|
||||||
-- session needs to have replication factor set to 1, can't do in setup
|
-- session needs to have replication factor set to 1, can't do in setup
|
||||||
|
SET citus.replication_model TO 'streaming';
|
||||||
SET citus.shard_replication_factor TO 1;
|
SET citus.shard_replication_factor TO 1;
|
||||||
SELECT create_distributed_table('t1', 'a');
|
SELECT create_distributed_table('t1', 'a');
|
||||||
<waiting ...>
|
<waiting ...>
|
||||||
|
@ -193,6 +194,7 @@ step s2-public-schema:
|
||||||
step s2-create-table:
|
step s2-create-table:
|
||||||
CREATE TABLE t1 (a int, b int);
|
CREATE TABLE t1 (a int, b int);
|
||||||
-- session needs to have replication factor set to 1, can't do in setup
|
-- session needs to have replication factor set to 1, can't do in setup
|
||||||
|
SET citus.replication_model TO 'streaming';
|
||||||
SET citus.shard_replication_factor TO 1;
|
SET citus.shard_replication_factor TO 1;
|
||||||
SELECT create_distributed_table('t1', 'a');
|
SELECT create_distributed_table('t1', 'a');
|
||||||
<waiting ...>
|
<waiting ...>
|
||||||
|
@ -315,6 +317,7 @@ step s2-public-schema:
|
||||||
step s2-create-table:
|
step s2-create-table:
|
||||||
CREATE TABLE t1 (a int, b int);
|
CREATE TABLE t1 (a int, b int);
|
||||||
-- session needs to have replication factor set to 1, can't do in setup
|
-- session needs to have replication factor set to 1, can't do in setup
|
||||||
|
SET citus.replication_model TO 'streaming';
|
||||||
SET citus.shard_replication_factor TO 1;
|
SET citus.shard_replication_factor TO 1;
|
||||||
SELECT create_distributed_table('t1', 'a');
|
SELECT create_distributed_table('t1', 'a');
|
||||||
|
|
||||||
|
@ -447,6 +450,7 @@ step s2-create-schema:
|
||||||
step s2-create-table:
|
step s2-create-table:
|
||||||
CREATE TABLE t1 (a int, b int);
|
CREATE TABLE t1 (a int, b int);
|
||||||
-- session needs to have replication factor set to 1, can't do in setup
|
-- session needs to have replication factor set to 1, can't do in setup
|
||||||
|
SET citus.replication_model TO 'streaming';
|
||||||
SET citus.shard_replication_factor TO 1;
|
SET citus.shard_replication_factor TO 1;
|
||||||
SELECT create_distributed_table('t1', 'a');
|
SELECT create_distributed_table('t1', 'a');
|
||||||
<waiting ...>
|
<waiting ...>
|
||||||
|
@ -574,6 +578,7 @@ step s2-create-schema:
|
||||||
step s2-create-table:
|
step s2-create-table:
|
||||||
CREATE TABLE t1 (a int, b int);
|
CREATE TABLE t1 (a int, b int);
|
||||||
-- session needs to have replication factor set to 1, can't do in setup
|
-- session needs to have replication factor set to 1, can't do in setup
|
||||||
|
SET citus.replication_model TO 'streaming';
|
||||||
SET citus.shard_replication_factor TO 1;
|
SET citus.shard_replication_factor TO 1;
|
||||||
SELECT create_distributed_table('t1', 'a');
|
SELECT create_distributed_table('t1', 'a');
|
||||||
<waiting ...>
|
<waiting ...>
|
||||||
|
@ -698,6 +703,7 @@ step s2-create-schema:
|
||||||
step s2-create-table:
|
step s2-create-table:
|
||||||
CREATE TABLE t1 (a int, b int);
|
CREATE TABLE t1 (a int, b int);
|
||||||
-- session needs to have replication factor set to 1, can't do in setup
|
-- session needs to have replication factor set to 1, can't do in setup
|
||||||
|
SET citus.replication_model TO 'streaming';
|
||||||
SET citus.shard_replication_factor TO 1;
|
SET citus.shard_replication_factor TO 1;
|
||||||
SELECT create_distributed_table('t1', 'a');
|
SELECT create_distributed_table('t1', 'a');
|
||||||
|
|
||||||
|
@ -837,6 +843,7 @@ step s1-add-worker:
|
||||||
step s2-create-table:
|
step s2-create-table:
|
||||||
CREATE TABLE t1 (a int, b int);
|
CREATE TABLE t1 (a int, b int);
|
||||||
-- session needs to have replication factor set to 1, can't do in setup
|
-- session needs to have replication factor set to 1, can't do in setup
|
||||||
|
SET citus.replication_model TO 'streaming';
|
||||||
SET citus.shard_replication_factor TO 1;
|
SET citus.shard_replication_factor TO 1;
|
||||||
SELECT create_distributed_table('t1', 'a');
|
SELECT create_distributed_table('t1', 'a');
|
||||||
<waiting ...>
|
<waiting ...>
|
||||||
|
@ -986,6 +993,7 @@ step s3-use-schema:
|
||||||
step s2-create-table:
|
step s2-create-table:
|
||||||
CREATE TABLE t1 (a int, b int);
|
CREATE TABLE t1 (a int, b int);
|
||||||
-- session needs to have replication factor set to 1, can't do in setup
|
-- session needs to have replication factor set to 1, can't do in setup
|
||||||
|
SET citus.replication_model TO 'streaming';
|
||||||
SET citus.shard_replication_factor TO 1;
|
SET citus.shard_replication_factor TO 1;
|
||||||
SELECT create_distributed_table('t1', 'a');
|
SELECT create_distributed_table('t1', 'a');
|
||||||
|
|
||||||
|
@ -1132,6 +1140,7 @@ step s3-create-schema2:
|
||||||
step s2-create-table:
|
step s2-create-table:
|
||||||
CREATE TABLE t1 (a int, b int);
|
CREATE TABLE t1 (a int, b int);
|
||||||
-- session needs to have replication factor set to 1, can't do in setup
|
-- session needs to have replication factor set to 1, can't do in setup
|
||||||
|
SET citus.replication_model TO 'streaming';
|
||||||
SET citus.shard_replication_factor TO 1;
|
SET citus.shard_replication_factor TO 1;
|
||||||
SELECT create_distributed_table('t1', 'a');
|
SELECT create_distributed_table('t1', 'a');
|
||||||
<waiting ...>
|
<waiting ...>
|
||||||
|
@ -1506,6 +1515,7 @@ step s2-create-type:
|
||||||
step s2-create-table-with-type:
|
step s2-create-table-with-type:
|
||||||
CREATE TABLE t1 (a int, b tt1);
|
CREATE TABLE t1 (a int, b tt1);
|
||||||
-- session needs to have replication factor set to 1, can't do in setup
|
-- session needs to have replication factor set to 1, can't do in setup
|
||||||
|
SET citus.replication_model TO 'streaming';
|
||||||
SET citus.shard_replication_factor TO 1;
|
SET citus.shard_replication_factor TO 1;
|
||||||
SELECT create_distributed_table('t1', 'a');
|
SELECT create_distributed_table('t1', 'a');
|
||||||
|
|
||||||
|
|
|
@ -120,6 +120,8 @@ GRANT USAGE ON SCHEMA full_access_user_schema TO full_access;
|
||||||
GRANT ALL ON SCHEMA full_access_user_schema TO full_access;
|
GRANT ALL ON SCHEMA full_access_user_schema TO full_access;
|
||||||
GRANT USAGE ON SCHEMA full_access_user_schema TO usage_access;
|
GRANT USAGE ON SCHEMA full_access_user_schema TO usage_access;
|
||||||
\c - - - :master_port
|
\c - - - :master_port
|
||||||
|
SET citus.replication_model TO 'streaming';
|
||||||
|
SET citus.shard_replication_factor TO 1;
|
||||||
-- create prepare tests
|
-- create prepare tests
|
||||||
PREPARE prepare_insert AS INSERT INTO test VALUES ($1);
|
PREPARE prepare_insert AS INSERT INTO test VALUES ($1);
|
||||||
PREPARE prepare_select AS SELECT count(*) FROM test;
|
PREPARE prepare_select AS SELECT count(*) FROM test;
|
||||||
|
@ -431,7 +433,7 @@ INSERT INTO full_access_user_schema.t1 VALUES (1),(2),(3);
|
||||||
-- not allowed to create a table
|
-- not allowed to create a table
|
||||||
SELECT create_distributed_table('full_access_user_schema.t1', 'id');
|
SELECT create_distributed_table('full_access_user_schema.t1', 'id');
|
||||||
ERROR: permission denied for schema full_access_user_schema
|
ERROR: permission denied for schema full_access_user_schema
|
||||||
CONTEXT: while executing command on localhost:57638
|
CONTEXT: while executing command on localhost:57637
|
||||||
RESET ROLE;
|
RESET ROLE;
|
||||||
SET ROLE usage_access;
|
SET ROLE usage_access;
|
||||||
CREATE TYPE usage_access_type AS ENUM ('a', 'b');
|
CREATE TYPE usage_access_type AS ENUM ('a', 'b');
|
||||||
|
@ -479,12 +481,19 @@ SELECT wait_until_metadata_sync();
|
||||||
|
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
|
CREATE TABLE colocation_table(id text);
|
||||||
|
SELECT create_distributed_table('colocation_table','id');
|
||||||
|
create_distributed_table
|
||||||
|
--------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
-- now, make sure that the user can use the function
|
-- now, make sure that the user can use the function
|
||||||
-- created in the transaction
|
-- created in the transaction
|
||||||
BEGIN;
|
BEGIN;
|
||||||
CREATE FUNCTION usage_access_func_second(key int, variadic v int[]) RETURNS text
|
CREATE FUNCTION usage_access_func_second(key int, variadic v int[]) RETURNS text
|
||||||
LANGUAGE plpgsql AS 'begin return current_user; end;';
|
LANGUAGE plpgsql AS 'begin return current_user; end;';
|
||||||
SELECT create_distributed_function('usage_access_func_second(int,int[])', '$1');
|
SELECT create_distributed_function('usage_access_func_second(int,int[])', '$1', colocate_with := 'colocation_table');
|
||||||
create_distributed_function
|
create_distributed_function
|
||||||
-----------------------------
|
-----------------------------
|
||||||
|
|
||||||
|
@ -509,7 +518,7 @@ SELECT usesuper FROM pg_user where usename IN (SELECT current_user);
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
-- superuser creates the distributed function that is owned by a regular user
|
-- superuser creates the distributed function that is owned by a regular user
|
||||||
SELECT create_distributed_function('usage_access_func_third(int,int[])', '$1');
|
SELECT create_distributed_function('usage_access_func_third(int,int[])', '$1', colocate_with := 'colocation_table');
|
||||||
create_distributed_function
|
create_distributed_function
|
||||||
-----------------------------
|
-----------------------------
|
||||||
|
|
||||||
|
@ -675,7 +684,7 @@ ERROR: could not receive file "base/pgsql_job_cache/job_0042/task_000001/p_0000
|
||||||
-- different user should not be able to fetch partition file
|
-- different user should not be able to fetch partition file
|
||||||
SET ROLE usage_access;
|
SET ROLE usage_access;
|
||||||
SELECT worker_fetch_partition_file(42, 1, 1, 1, 'localhost', :worker_1_port);
|
SELECT worker_fetch_partition_file(42, 1, 1, 1, 'localhost', :worker_1_port);
|
||||||
WARNING: could not open file "base/pgsql_job_cache/job_0042/task_000001/p_00001.37455": No such file or directory
|
WARNING: could not open file "base/pgsql_job_cache/job_0042/task_000001/p_00001.37457": No such file or directory
|
||||||
CONTEXT: while executing command on localhost:57637
|
CONTEXT: while executing command on localhost:57637
|
||||||
ERROR: could not receive file "base/pgsql_job_cache/job_0042/task_000001/p_00001" from localhost:57637
|
ERROR: could not receive file "base/pgsql_job_cache/job_0042/task_000001/p_00001" from localhost:57637
|
||||||
-- only the user whom created the files should be able to fetch
|
-- only the user whom created the files should be able to fetch
|
||||||
|
@ -714,7 +723,7 @@ RESET ROLE;
|
||||||
-- test that the super user is unable to read the contents of the intermediate file,
|
-- test that the super user is unable to read the contents of the intermediate file,
|
||||||
-- although it does create the table
|
-- although it does create the table
|
||||||
SELECT worker_merge_files_into_table(42, 1, ARRAY['a'], ARRAY['integer']);
|
SELECT worker_merge_files_into_table(42, 1, ARRAY['a'], ARRAY['integer']);
|
||||||
WARNING: Task file "task_000001.36164" does not have expected suffix ".10"
|
WARNING: Task file "task_000001.36145" does not have expected suffix ".10"
|
||||||
worker_merge_files_into_table
|
worker_merge_files_into_table
|
||||||
-------------------------------
|
-------------------------------
|
||||||
|
|
||||||
|
@ -756,7 +765,7 @@ SELECT worker_merge_files_and_run_query(42, 1,
|
||||||
'CREATE TABLE task_000001_merge(merge_column_0 int)',
|
'CREATE TABLE task_000001_merge(merge_column_0 int)',
|
||||||
'CREATE TABLE task_000001 (a) AS SELECT sum(merge_column_0) FROM task_000001_merge'
|
'CREATE TABLE task_000001 (a) AS SELECT sum(merge_column_0) FROM task_000001_merge'
|
||||||
);
|
);
|
||||||
WARNING: Task file "task_000001.36164" does not have expected suffix ".10"
|
WARNING: Task file "task_000001.36145" does not have expected suffix ".10"
|
||||||
worker_merge_files_and_run_query
|
worker_merge_files_and_run_query
|
||||||
----------------------------------
|
----------------------------------
|
||||||
|
|
||||||
|
@ -826,7 +835,8 @@ DROP TABLE
|
||||||
my_role_table_with_data,
|
my_role_table_with_data,
|
||||||
singleshard,
|
singleshard,
|
||||||
test,
|
test,
|
||||||
test_coloc;
|
test_coloc,
|
||||||
|
colocation_table;
|
||||||
DROP USER full_access;
|
DROP USER full_access;
|
||||||
DROP USER read_access;
|
DROP USER read_access;
|
||||||
DROP USER no_access;
|
DROP USER no_access;
|
||||||
|
|
|
@ -537,6 +537,7 @@ select start_metadata_sync_to_node('localhost', :worker_2_port);
|
||||||
\c - - - :master_port
|
\c - - - :master_port
|
||||||
SET search_path to multi_mx_function_call_delegation, public;
|
SET search_path to multi_mx_function_call_delegation, public;
|
||||||
SET client_min_messages TO DEBUG1;
|
SET client_min_messages TO DEBUG1;
|
||||||
|
SET citus.replication_model = 'streaming';
|
||||||
--
|
--
|
||||||
-- Test non-const parameter values
|
-- Test non-const parameter values
|
||||||
--
|
--
|
||||||
|
|
|
@ -91,6 +91,7 @@ step "s2-create-table"
|
||||||
{
|
{
|
||||||
CREATE TABLE t1 (a int, b int);
|
CREATE TABLE t1 (a int, b int);
|
||||||
-- session needs to have replication factor set to 1, can't do in setup
|
-- session needs to have replication factor set to 1, can't do in setup
|
||||||
|
SET citus.replication_model TO 'streaming';
|
||||||
SET citus.shard_replication_factor TO 1;
|
SET citus.shard_replication_factor TO 1;
|
||||||
SELECT create_distributed_table('t1', 'a');
|
SELECT create_distributed_table('t1', 'a');
|
||||||
}
|
}
|
||||||
|
@ -104,6 +105,7 @@ step "s2-create-table-with-type"
|
||||||
{
|
{
|
||||||
CREATE TABLE t1 (a int, b tt1);
|
CREATE TABLE t1 (a int, b tt1);
|
||||||
-- session needs to have replication factor set to 1, can't do in setup
|
-- session needs to have replication factor set to 1, can't do in setup
|
||||||
|
SET citus.replication_model TO 'streaming';
|
||||||
SET citus.shard_replication_factor TO 1;
|
SET citus.shard_replication_factor TO 1;
|
||||||
SELECT create_distributed_table('t1', 'a');
|
SELECT create_distributed_table('t1', 'a');
|
||||||
}
|
}
|
||||||
|
|
|
@ -55,6 +55,10 @@ CREATE FUNCTION dup(int) RETURNS dup_result
|
||||||
AS $$ SELECT $1, CAST($1 AS text) || ' is text' $$
|
AS $$ SELECT $1, CAST($1 AS text) || ' is text' $$
|
||||||
LANGUAGE SQL;
|
LANGUAGE SQL;
|
||||||
|
|
||||||
|
CREATE FUNCTION increment(int2) RETURNS int
|
||||||
|
AS $$ SELECT $1 + 1$$
|
||||||
|
LANGUAGE SQL;
|
||||||
|
|
||||||
CREATE FUNCTION add_with_param_names(val1 integer, val2 integer) RETURNS integer
|
CREATE FUNCTION add_with_param_names(val1 integer, val2 integer) RETURNS integer
|
||||||
AS 'select $1 + $2;'
|
AS 'select $1 + $2;'
|
||||||
LANGUAGE SQL
|
LANGUAGE SQL
|
||||||
|
@ -77,9 +81,17 @@ CREATE FUNCTION add_mixed_param_names(integer, val1 integer) RETURNS integer
|
||||||
-- allow alter statements to be propagated and keep the functions in sync across machines
|
-- allow alter statements to be propagated and keep the functions in sync across machines
|
||||||
SET citus.enable_ddl_propagation TO on;
|
SET citus.enable_ddl_propagation TO on;
|
||||||
|
|
||||||
-- functions are distributed by int arguments, when run in isolation it is not guaranteed a table actually exists.
|
-- use an unusual type to force a new colocation group
|
||||||
CREATE TABLE colocation_table(id int);
|
CREATE TABLE statement_table(id int2);
|
||||||
SELECT create_distributed_table('colocation_table','id');
|
SET citus.replication_model TO 'statement';
|
||||||
|
SET citus.shard_replication_factor TO 1;
|
||||||
|
SELECT create_distributed_table('statement_table','id');
|
||||||
|
|
||||||
|
-- create a table uses streaming-based replication (can be synced)
|
||||||
|
CREATE TABLE streaming_table(id int);
|
||||||
|
SET citus.replication_model TO 'streaming';
|
||||||
|
SET citus.shard_replication_factor TO 1;
|
||||||
|
SELECT create_distributed_table('streaming_table','id');
|
||||||
|
|
||||||
-- make sure that none of the active and primary nodes hasmetadata
|
-- make sure that none of the active and primary nodes hasmetadata
|
||||||
-- at the start of the test
|
-- at the start of the test
|
||||||
|
@ -98,10 +110,20 @@ SELECT * FROM run_command_on_workers('SELECT function_tests.add_mixed_param_name
|
||||||
-- since the function doesn't have a parameter
|
-- since the function doesn't have a parameter
|
||||||
select bool_or(hasmetadata) from pg_dist_node WHERE isactive AND noderole = 'primary';
|
select bool_or(hasmetadata) from pg_dist_node WHERE isactive AND noderole = 'primary';
|
||||||
|
|
||||||
SELECT create_distributed_function('dup(int)', '$1');
|
-- try to co-locate with a table that uses statement-based replication
|
||||||
|
SELECT create_distributed_function('increment(int2)', '$1');
|
||||||
|
SELECT create_distributed_function('increment(int2)', '$1', colocate_with := 'statement_table');
|
||||||
|
BEGIN;
|
||||||
|
SET LOCAL citus.replication_model TO 'statement';
|
||||||
|
DROP TABLE statement_table;
|
||||||
|
SELECT create_distributed_function('increment(int2)', '$1');
|
||||||
|
END;
|
||||||
|
|
||||||
|
-- try to co-locate with a table that uses streaming replication
|
||||||
|
SELECT create_distributed_function('dup(int)', '$1', colocate_with := 'streaming_table');
|
||||||
SELECT * FROM run_command_on_workers('SELECT function_tests.dup(42);') ORDER BY 1,2;
|
SELECT * FROM run_command_on_workers('SELECT function_tests.dup(42);') ORDER BY 1,2;
|
||||||
|
|
||||||
SELECT create_distributed_function('add(int,int)', '$1');
|
SELECT create_distributed_function('add(int,int)', '$1', colocate_with := 'streaming_table');
|
||||||
SELECT * FROM run_command_on_workers('SELECT function_tests.add(2,3);') ORDER BY 1,2;
|
SELECT * FROM run_command_on_workers('SELECT function_tests.add(2,3);') ORDER BY 1,2;
|
||||||
SELECT public.verify_function_is_same_on_workers('function_tests.add(int,int)');
|
SELECT public.verify_function_is_same_on_workers('function_tests.add(int,int)');
|
||||||
|
|
||||||
|
@ -318,17 +340,17 @@ DROP SCHEMA function_tests2 CASCADE;
|
||||||
SET client_min_messages TO error; -- suppress cascading objects dropping
|
SET client_min_messages TO error; -- suppress cascading objects dropping
|
||||||
UPDATE pg_dist_local_group SET groupid = 0;
|
UPDATE pg_dist_local_group SET groupid = 0;
|
||||||
SELECT worker_drop_distributed_table(logicalrelid::text) FROM pg_dist_partition WHERE logicalrelid::text ILIKE '%replicated_table_func_test%';
|
SELECT worker_drop_distributed_table(logicalrelid::text) FROM pg_dist_partition WHERE logicalrelid::text ILIKE '%replicated_table_func_test%';
|
||||||
TRUNCATE pg_dist_node;
|
|
||||||
DROP SCHEMA function_tests CASCADE;
|
DROP SCHEMA function_tests CASCADE;
|
||||||
DROP SCHEMA function_tests2 CASCADE;
|
DROP SCHEMA function_tests2 CASCADE;
|
||||||
|
TRUNCATE pg_dist_node;
|
||||||
|
|
||||||
\c - - - :worker_2_port
|
\c - - - :worker_2_port
|
||||||
SET client_min_messages TO error; -- suppress cascading objects dropping
|
SET client_min_messages TO error; -- suppress cascading objects dropping
|
||||||
UPDATE pg_dist_local_group SET groupid = 0;
|
UPDATE pg_dist_local_group SET groupid = 0;
|
||||||
SELECT worker_drop_distributed_table(logicalrelid::text) FROM pg_dist_partition WHERE logicalrelid::text ILIKE '%replicated_table_func_test%';
|
SELECT worker_drop_distributed_table(logicalrelid::text) FROM pg_dist_partition WHERE logicalrelid::text ILIKE '%replicated_table_func_test%';
|
||||||
TRUNCATE pg_dist_node;
|
|
||||||
DROP SCHEMA function_tests CASCADE;
|
DROP SCHEMA function_tests CASCADE;
|
||||||
DROP SCHEMA function_tests2 CASCADE;
|
DROP SCHEMA function_tests2 CASCADE;
|
||||||
|
TRUNCATE pg_dist_node;
|
||||||
|
|
||||||
\c - - - :master_port
|
\c - - - :master_port
|
||||||
|
|
||||||
|
|
|
@ -19,9 +19,11 @@ $proc$;
|
||||||
|
|
||||||
-- procedures are distributed by text arguments, when run in isolation it is not guaranteed a table actually exists.
|
-- procedures are distributed by text arguments, when run in isolation it is not guaranteed a table actually exists.
|
||||||
CREATE TABLE colocation_table(id text);
|
CREATE TABLE colocation_table(id text);
|
||||||
|
SET citus.replication_model TO 'streaming';
|
||||||
|
SET citus.shard_replication_factor TO 1;
|
||||||
SELECT create_distributed_table('colocation_table','id');
|
SELECT create_distributed_table('colocation_table','id');
|
||||||
|
|
||||||
SELECT create_distributed_function('raise_info(text)', '$1');
|
SELECT create_distributed_function('raise_info(text)', '$1', colocate_with := 'colocation_table');
|
||||||
SELECT * FROM run_command_on_workers($$CALL procedure_tests.raise_info('hello');$$) ORDER BY 1,2;
|
SELECT * FROM run_command_on_workers($$CALL procedure_tests.raise_info('hello');$$) ORDER BY 1,2;
|
||||||
SELECT public.verify_function_is_same_on_workers('procedure_tests.raise_info(text)');
|
SELECT public.verify_function_is_same_on_workers('procedure_tests.raise_info(text)');
|
||||||
|
|
||||||
|
|
|
@ -92,6 +92,9 @@ GRANT USAGE ON SCHEMA full_access_user_schema TO usage_access;
|
||||||
|
|
||||||
\c - - - :master_port
|
\c - - - :master_port
|
||||||
|
|
||||||
|
SET citus.replication_model TO 'streaming';
|
||||||
|
SET citus.shard_replication_factor TO 1;
|
||||||
|
|
||||||
-- create prepare tests
|
-- create prepare tests
|
||||||
PREPARE prepare_insert AS INSERT INTO test VALUES ($1);
|
PREPARE prepare_insert AS INSERT INTO test VALUES ($1);
|
||||||
PREPARE prepare_select AS SELECT count(*) FROM test;
|
PREPARE prepare_select AS SELECT count(*) FROM test;
|
||||||
|
@ -296,12 +299,15 @@ SELECT run_command_on_workers($$SELECT proowner::regrole FROM pg_proc WHERE pron
|
||||||
|
|
||||||
SELECT wait_until_metadata_sync();
|
SELECT wait_until_metadata_sync();
|
||||||
|
|
||||||
|
CREATE TABLE colocation_table(id text);
|
||||||
|
SELECT create_distributed_table('colocation_table','id');
|
||||||
|
|
||||||
-- now, make sure that the user can use the function
|
-- now, make sure that the user can use the function
|
||||||
-- created in the transaction
|
-- created in the transaction
|
||||||
BEGIN;
|
BEGIN;
|
||||||
CREATE FUNCTION usage_access_func_second(key int, variadic v int[]) RETURNS text
|
CREATE FUNCTION usage_access_func_second(key int, variadic v int[]) RETURNS text
|
||||||
LANGUAGE plpgsql AS 'begin return current_user; end;';
|
LANGUAGE plpgsql AS 'begin return current_user; end;';
|
||||||
SELECT create_distributed_function('usage_access_func_second(int,int[])', '$1');
|
SELECT create_distributed_function('usage_access_func_second(int,int[])', '$1', colocate_with := 'colocation_table');
|
||||||
|
|
||||||
SELECT usage_access_func_second(1, 2,3,4,5) FROM full_access_user_schema.t1 LIMIT 1;
|
SELECT usage_access_func_second(1, 2,3,4,5) FROM full_access_user_schema.t1 LIMIT 1;
|
||||||
|
|
||||||
|
@ -317,7 +323,7 @@ CREATE FUNCTION usage_access_func_third(key int, variadic v int[]) RETURNS text
|
||||||
SELECT usesuper FROM pg_user where usename IN (SELECT current_user);
|
SELECT usesuper FROM pg_user where usename IN (SELECT current_user);
|
||||||
|
|
||||||
-- superuser creates the distributed function that is owned by a regular user
|
-- superuser creates the distributed function that is owned by a regular user
|
||||||
SELECT create_distributed_function('usage_access_func_third(int,int[])', '$1');
|
SELECT create_distributed_function('usage_access_func_third(int,int[])', '$1', colocate_with := 'colocation_table');
|
||||||
|
|
||||||
SELECT proowner::regrole FROM pg_proc WHERE proname = 'usage_access_func_third';
|
SELECT proowner::regrole FROM pg_proc WHERE proname = 'usage_access_func_third';
|
||||||
SELECT run_command_on_workers($$SELECT proowner::regrole FROM pg_proc WHERE proname = 'usage_access_func_third'$$);
|
SELECT run_command_on_workers($$SELECT proowner::regrole FROM pg_proc WHERE proname = 'usage_access_func_third'$$);
|
||||||
|
@ -499,7 +505,8 @@ DROP TABLE
|
||||||
my_role_table_with_data,
|
my_role_table_with_data,
|
||||||
singleshard,
|
singleshard,
|
||||||
test,
|
test,
|
||||||
test_coloc;
|
test_coloc,
|
||||||
|
colocation_table;
|
||||||
DROP USER full_access;
|
DROP USER full_access;
|
||||||
DROP USER read_access;
|
DROP USER read_access;
|
||||||
DROP USER no_access;
|
DROP USER no_access;
|
||||||
|
|
|
@ -225,6 +225,7 @@ select start_metadata_sync_to_node('localhost', :worker_2_port);
|
||||||
\c - - - :master_port
|
\c - - - :master_port
|
||||||
SET search_path to multi_mx_function_call_delegation, public;
|
SET search_path to multi_mx_function_call_delegation, public;
|
||||||
SET client_min_messages TO DEBUG1;
|
SET client_min_messages TO DEBUG1;
|
||||||
|
SET citus.replication_model = 'streaming';
|
||||||
|
|
||||||
--
|
--
|
||||||
-- Test non-const parameter values
|
-- Test non-const parameter values
|
||||||
|
|
Loading…
Reference in New Issue