diff --git a/src/backend/distributed/commands/function.c b/src/backend/distributed/commands/function.c index a02fd2e5d..2ffe30ec2 100644 --- a/src/backend/distributed/commands/function.c +++ b/src/backend/distributed/commands/function.c @@ -342,6 +342,8 @@ GetFunctionColocationId(Oid functionOid, char *colocateWithTableName, if (pg_strncasecmp(colocateWithTableName, "default", NAMEDATALEN) == 0) { + Oid colocatedTableId = InvalidOid; + /* check for default colocation group */ colocationId = ColocationId(ShardCount, ShardReplicationFactor, distributionArgumentOid); @@ -356,6 +358,22 @@ GetFunctionColocationId(Oid functionOid, char *colocateWithTableName, errhint("Provide a distributed table via \"colocate_with\" " "option to create_distributed_function()"))); } + + colocatedTableId = ColocatedTableId(colocationId); + if (colocatedTableId != InvalidOid) + { + EnsureFunctionCanBeColocatedWithTable(functionOid, distributionArgumentOid, + colocatedTableId); + } + else if (ReplicationModel == REPLICATION_MODEL_COORDINATOR) + { + /* streaming replication model is required for metadata syncing */ + ereport(ERROR, (errmsg("cannot create a function with a distribution " + "argument when citus.replication_model is " + "'statement'"), + errhint("Set citus.replication_model to 'streaming' " + "before creating distributed tables"))); + } } else { @@ -412,7 +430,7 @@ EnsureFunctionCanBeColocatedWithTable(Oid functionOid, Oid distributionColumnTyp "with distributed tables that are created using " "streaming replication model."), errhint("When distributing tables make sure that " - "\"citus.replication_model\" is set to \"streaming\""))); + "citus.replication_model = 'streaming'"))); } /* diff --git a/src/test/regress/expected/distributed_functions.out b/src/test/regress/expected/distributed_functions.out index 7942bf846..cb42734d1 100644 --- a/src/test/regress/expected/distributed_functions.out +++ b/src/test/regress/expected/distributed_functions.out @@ -55,6 +55,9 @@ CREATE TYPE dup_result AS (f1 int, f2 text); CREATE FUNCTION dup(int) RETURNS dup_result AS $$ SELECT $1, CAST($1 AS text) || ' is text' $$ LANGUAGE SQL; +CREATE FUNCTION increment(int2) RETURNS int + AS $$ SELECT $1 + 1$$ + LANGUAGE SQL; CREATE FUNCTION add_with_param_names(val1 integer, val2 integer) RETURNS integer AS 'select $1 + $2;' LANGUAGE SQL @@ -73,9 +76,21 @@ CREATE FUNCTION add_mixed_param_names(integer, val1 integer) RETURNS integer -- make sure to propagate ddl propagation after we have setup our functions, this will -- allow alter statements to be propagated and keep the functions in sync across machines SET citus.enable_ddl_propagation TO on; --- functions are distributed by int arguments, when run in isolation it is not guaranteed a table actually exists. -CREATE TABLE colocation_table(id int); -SELECT create_distributed_table('colocation_table','id'); +-- use an unusual type to force a new colocation group +CREATE TABLE statement_table(id int2); +SET citus.replication_model TO 'statement'; +SET citus.shard_replication_factor TO 1; +SELECT create_distributed_table('statement_table','id'); + create_distributed_table +-------------------------- + +(1 row) + +-- create a table uses streaming-based replication (can be synced) +CREATE TABLE streaming_table(id int); +SET citus.replication_model TO 'streaming'; +SET citus.shard_replication_factor TO 1; +SELECT create_distributed_table('streaming_table','id'); create_distributed_table -------------------------- @@ -120,7 +135,24 @@ select bool_or(hasmetadata) from pg_dist_node WHERE isactive AND noderole = 'pr f (1 row) -SELECT create_distributed_function('dup(int)', '$1'); +-- try to co-locate with a table that uses statement-based replication +SELECT create_distributed_function('increment(int2)', '$1'); +ERROR: cannot colocate function "increment" and table "statement_table" +DETAIL: Citus currently only supports colocating function with distributed tables that are created using streaming replication model. +HINT: When distributing tables make sure that citus.replication_model = 'streaming' +SELECT create_distributed_function('increment(int2)', '$1', colocate_with := 'statement_table'); +ERROR: cannot colocate function "increment" and table "statement_table" +DETAIL: Citus currently only supports colocating function with distributed tables that are created using streaming replication model. +HINT: When distributing tables make sure that citus.replication_model = 'streaming' +BEGIN; +SET LOCAL citus.replication_model TO 'statement'; +DROP TABLE statement_table; +SELECT create_distributed_function('increment(int2)', '$1'); +ERROR: cannot create a function with a distribution argument when citus.replication_model is 'statement' +HINT: Set citus.replication_model to 'streaming' before creating distributed tables +END; +-- try to co-locate with a table that uses streaming replication +SELECT create_distributed_function('dup(int)', '$1', colocate_with := 'streaming_table'); create_distributed_function ----------------------------- @@ -133,7 +165,7 @@ SELECT * FROM run_command_on_workers('SELECT function_tests.dup(42);') ORDER BY localhost | 57638 | t | (42,"42 is text") (2 rows) -SELECT create_distributed_function('add(int,int)', '$1'); +SELECT create_distributed_function('add(int,int)', '$1', colocate_with := 'streaming_table'); create_distributed_function ----------------------------- @@ -455,7 +487,7 @@ SELECT create_distributed_table('replicated_table_func_test', 'a'); SELECT create_distributed_function('add_with_param_names(int, int)', '$1', colocate_with:='replicated_table_func_test'); ERROR: cannot colocate function "add_with_param_names" and table "replicated_table_func_test" DETAIL: Citus currently only supports colocating function with distributed tables that are created using streaming replication model. -HINT: When distributing tables make sure that "citus.replication_model" is set to "streaming" +HINT: When distributing tables make sure that citus.replication_model = 'streaming' -- a function can be colocated with a different distribution argument type -- as long as there is a coercion path SET citus.shard_replication_factor TO 1; @@ -607,9 +639,9 @@ SELECT worker_drop_distributed_table(logicalrelid::text) FROM pg_dist_partition (3 rows) -TRUNCATE pg_dist_node; DROP SCHEMA function_tests CASCADE; DROP SCHEMA function_tests2 CASCADE; +TRUNCATE pg_dist_node; \c - - - :worker_2_port SET client_min_messages TO error; -- suppress cascading objects dropping UPDATE pg_dist_local_group SET groupid = 0; @@ -621,9 +653,9 @@ SELECT worker_drop_distributed_table(logicalrelid::text) FROM pg_dist_partition (3 rows) -TRUNCATE pg_dist_node; DROP SCHEMA function_tests CASCADE; DROP SCHEMA function_tests2 CASCADE; +TRUNCATE pg_dist_node; \c - - - :master_port DROP USER functionuser; SELECT run_command_on_workers($$DROP USER functionuser;$$); diff --git a/src/test/regress/expected/distributed_procedure.out b/src/test/regress/expected/distributed_procedure.out index fb1fceb37..cf438ed5d 100644 --- a/src/test/regress/expected/distributed_procedure.out +++ b/src/test/regress/expected/distributed_procedure.out @@ -22,13 +22,15 @@ END; $proc$; -- procedures are distributed by text arguments, when run in isolation it is not guaranteed a table actually exists. CREATE TABLE colocation_table(id text); +SET citus.replication_model TO 'streaming'; +SET citus.shard_replication_factor TO 1; SELECT create_distributed_table('colocation_table','id'); create_distributed_table -------------------------- (1 row) -SELECT create_distributed_function('raise_info(text)', '$1'); +SELECT create_distributed_function('raise_info(text)', '$1', colocate_with := 'colocation_table'); create_distributed_function ----------------------------- diff --git a/src/test/regress/expected/isolation_ensure_dependency_activate_node.out b/src/test/regress/expected/isolation_ensure_dependency_activate_node.out index a1f4af81e..6aa249981 100644 --- a/src/test/regress/expected/isolation_ensure_dependency_activate_node.out +++ b/src/test/regress/expected/isolation_ensure_dependency_activate_node.out @@ -68,6 +68,7 @@ step s2-public-schema: step s2-create-table: CREATE TABLE t1 (a int, b int); -- session needs to have replication factor set to 1, can't do in setup + SET citus.replication_model TO 'streaming'; SET citus.shard_replication_factor TO 1; SELECT create_distributed_table('t1', 'a'); @@ -193,6 +194,7 @@ step s2-public-schema: step s2-create-table: CREATE TABLE t1 (a int, b int); -- session needs to have replication factor set to 1, can't do in setup + SET citus.replication_model TO 'streaming'; SET citus.shard_replication_factor TO 1; SELECT create_distributed_table('t1', 'a'); @@ -315,6 +317,7 @@ step s2-public-schema: step s2-create-table: CREATE TABLE t1 (a int, b int); -- session needs to have replication factor set to 1, can't do in setup + SET citus.replication_model TO 'streaming'; SET citus.shard_replication_factor TO 1; SELECT create_distributed_table('t1', 'a'); @@ -447,6 +450,7 @@ step s2-create-schema: step s2-create-table: CREATE TABLE t1 (a int, b int); -- session needs to have replication factor set to 1, can't do in setup + SET citus.replication_model TO 'streaming'; SET citus.shard_replication_factor TO 1; SELECT create_distributed_table('t1', 'a'); @@ -574,6 +578,7 @@ step s2-create-schema: step s2-create-table: CREATE TABLE t1 (a int, b int); -- session needs to have replication factor set to 1, can't do in setup + SET citus.replication_model TO 'streaming'; SET citus.shard_replication_factor TO 1; SELECT create_distributed_table('t1', 'a'); @@ -698,6 +703,7 @@ step s2-create-schema: step s2-create-table: CREATE TABLE t1 (a int, b int); -- session needs to have replication factor set to 1, can't do in setup + SET citus.replication_model TO 'streaming'; SET citus.shard_replication_factor TO 1; SELECT create_distributed_table('t1', 'a'); @@ -837,6 +843,7 @@ step s1-add-worker: step s2-create-table: CREATE TABLE t1 (a int, b int); -- session needs to have replication factor set to 1, can't do in setup + SET citus.replication_model TO 'streaming'; SET citus.shard_replication_factor TO 1; SELECT create_distributed_table('t1', 'a'); @@ -986,6 +993,7 @@ step s3-use-schema: step s2-create-table: CREATE TABLE t1 (a int, b int); -- session needs to have replication factor set to 1, can't do in setup + SET citus.replication_model TO 'streaming'; SET citus.shard_replication_factor TO 1; SELECT create_distributed_table('t1', 'a'); @@ -1132,6 +1140,7 @@ step s3-create-schema2: step s2-create-table: CREATE TABLE t1 (a int, b int); -- session needs to have replication factor set to 1, can't do in setup + SET citus.replication_model TO 'streaming'; SET citus.shard_replication_factor TO 1; SELECT create_distributed_table('t1', 'a'); @@ -1506,6 +1515,7 @@ step s2-create-type: step s2-create-table-with-type: CREATE TABLE t1 (a int, b tt1); -- session needs to have replication factor set to 1, can't do in setup + SET citus.replication_model TO 'streaming'; SET citus.shard_replication_factor TO 1; SELECT create_distributed_table('t1', 'a'); diff --git a/src/test/regress/expected/multi_multiuser.out b/src/test/regress/expected/multi_multiuser.out index 714822198..e16915c73 100644 --- a/src/test/regress/expected/multi_multiuser.out +++ b/src/test/regress/expected/multi_multiuser.out @@ -120,6 +120,8 @@ GRANT USAGE ON SCHEMA full_access_user_schema TO full_access; GRANT ALL ON SCHEMA full_access_user_schema TO full_access; GRANT USAGE ON SCHEMA full_access_user_schema TO usage_access; \c - - - :master_port +SET citus.replication_model TO 'streaming'; +SET citus.shard_replication_factor TO 1; -- create prepare tests PREPARE prepare_insert AS INSERT INTO test VALUES ($1); PREPARE prepare_select AS SELECT count(*) FROM test; @@ -431,7 +433,7 @@ INSERT INTO full_access_user_schema.t1 VALUES (1),(2),(3); -- not allowed to create a table SELECT create_distributed_table('full_access_user_schema.t1', 'id'); ERROR: permission denied for schema full_access_user_schema -CONTEXT: while executing command on localhost:57638 +CONTEXT: while executing command on localhost:57637 RESET ROLE; SET ROLE usage_access; CREATE TYPE usage_access_type AS ENUM ('a', 'b'); @@ -479,12 +481,19 @@ SELECT wait_until_metadata_sync(); (1 row) +CREATE TABLE colocation_table(id text); +SELECT create_distributed_table('colocation_table','id'); + create_distributed_table +-------------------------- + +(1 row) + -- now, make sure that the user can use the function -- created in the transaction BEGIN; CREATE FUNCTION usage_access_func_second(key int, variadic v int[]) RETURNS text LANGUAGE plpgsql AS 'begin return current_user; end;'; -SELECT create_distributed_function('usage_access_func_second(int,int[])', '$1'); +SELECT create_distributed_function('usage_access_func_second(int,int[])', '$1', colocate_with := 'colocation_table'); create_distributed_function ----------------------------- @@ -509,7 +518,7 @@ SELECT usesuper FROM pg_user where usename IN (SELECT current_user); (1 row) -- superuser creates the distributed function that is owned by a regular user -SELECT create_distributed_function('usage_access_func_third(int,int[])', '$1'); +SELECT create_distributed_function('usage_access_func_third(int,int[])', '$1', colocate_with := 'colocation_table'); create_distributed_function ----------------------------- @@ -675,7 +684,7 @@ ERROR: could not receive file "base/pgsql_job_cache/job_0042/task_000001/p_0000 -- different user should not be able to fetch partition file SET ROLE usage_access; SELECT worker_fetch_partition_file(42, 1, 1, 1, 'localhost', :worker_1_port); -WARNING: could not open file "base/pgsql_job_cache/job_0042/task_000001/p_00001.37455": No such file or directory +WARNING: could not open file "base/pgsql_job_cache/job_0042/task_000001/p_00001.37457": No such file or directory CONTEXT: while executing command on localhost:57637 ERROR: could not receive file "base/pgsql_job_cache/job_0042/task_000001/p_00001" from localhost:57637 -- only the user whom created the files should be able to fetch @@ -714,7 +723,7 @@ RESET ROLE; -- test that the super user is unable to read the contents of the intermediate file, -- although it does create the table SELECT worker_merge_files_into_table(42, 1, ARRAY['a'], ARRAY['integer']); -WARNING: Task file "task_000001.36164" does not have expected suffix ".10" +WARNING: Task file "task_000001.36145" does not have expected suffix ".10" worker_merge_files_into_table ------------------------------- @@ -756,7 +765,7 @@ SELECT worker_merge_files_and_run_query(42, 1, 'CREATE TABLE task_000001_merge(merge_column_0 int)', 'CREATE TABLE task_000001 (a) AS SELECT sum(merge_column_0) FROM task_000001_merge' ); -WARNING: Task file "task_000001.36164" does not have expected suffix ".10" +WARNING: Task file "task_000001.36145" does not have expected suffix ".10" worker_merge_files_and_run_query ---------------------------------- @@ -826,7 +835,8 @@ DROP TABLE my_role_table_with_data, singleshard, test, - test_coloc; + test_coloc, + colocation_table; DROP USER full_access; DROP USER read_access; DROP USER no_access; diff --git a/src/test/regress/expected/multi_mx_function_call_delegation.out b/src/test/regress/expected/multi_mx_function_call_delegation.out index 99d322d95..2f4f45b69 100644 --- a/src/test/regress/expected/multi_mx_function_call_delegation.out +++ b/src/test/regress/expected/multi_mx_function_call_delegation.out @@ -537,6 +537,7 @@ select start_metadata_sync_to_node('localhost', :worker_2_port); \c - - - :master_port SET search_path to multi_mx_function_call_delegation, public; SET client_min_messages TO DEBUG1; +SET citus.replication_model = 'streaming'; -- -- Test non-const parameter values -- diff --git a/src/test/regress/specs/isolation_ensure_dependency_activate_node.spec b/src/test/regress/specs/isolation_ensure_dependency_activate_node.spec index 9686d007d..7bb55ae0e 100644 --- a/src/test/regress/specs/isolation_ensure_dependency_activate_node.spec +++ b/src/test/regress/specs/isolation_ensure_dependency_activate_node.spec @@ -91,6 +91,7 @@ step "s2-create-table" { CREATE TABLE t1 (a int, b int); -- session needs to have replication factor set to 1, can't do in setup + SET citus.replication_model TO 'streaming'; SET citus.shard_replication_factor TO 1; SELECT create_distributed_table('t1', 'a'); } @@ -104,6 +105,7 @@ step "s2-create-table-with-type" { CREATE TABLE t1 (a int, b tt1); -- session needs to have replication factor set to 1, can't do in setup + SET citus.replication_model TO 'streaming'; SET citus.shard_replication_factor TO 1; SELECT create_distributed_table('t1', 'a'); } diff --git a/src/test/regress/sql/distributed_functions.sql b/src/test/regress/sql/distributed_functions.sql index 65b592b3b..2801aa75e 100644 --- a/src/test/regress/sql/distributed_functions.sql +++ b/src/test/regress/sql/distributed_functions.sql @@ -55,6 +55,10 @@ CREATE FUNCTION dup(int) RETURNS dup_result AS $$ SELECT $1, CAST($1 AS text) || ' is text' $$ LANGUAGE SQL; +CREATE FUNCTION increment(int2) RETURNS int + AS $$ SELECT $1 + 1$$ + LANGUAGE SQL; + CREATE FUNCTION add_with_param_names(val1 integer, val2 integer) RETURNS integer AS 'select $1 + $2;' LANGUAGE SQL @@ -77,9 +81,17 @@ CREATE FUNCTION add_mixed_param_names(integer, val1 integer) RETURNS integer -- allow alter statements to be propagated and keep the functions in sync across machines SET citus.enable_ddl_propagation TO on; --- functions are distributed by int arguments, when run in isolation it is not guaranteed a table actually exists. -CREATE TABLE colocation_table(id int); -SELECT create_distributed_table('colocation_table','id'); +-- use an unusual type to force a new colocation group +CREATE TABLE statement_table(id int2); +SET citus.replication_model TO 'statement'; +SET citus.shard_replication_factor TO 1; +SELECT create_distributed_table('statement_table','id'); + +-- create a table uses streaming-based replication (can be synced) +CREATE TABLE streaming_table(id int); +SET citus.replication_model TO 'streaming'; +SET citus.shard_replication_factor TO 1; +SELECT create_distributed_table('streaming_table','id'); -- make sure that none of the active and primary nodes hasmetadata -- at the start of the test @@ -98,10 +110,20 @@ SELECT * FROM run_command_on_workers('SELECT function_tests.add_mixed_param_name -- since the function doesn't have a parameter select bool_or(hasmetadata) from pg_dist_node WHERE isactive AND noderole = 'primary'; -SELECT create_distributed_function('dup(int)', '$1'); +-- try to co-locate with a table that uses statement-based replication +SELECT create_distributed_function('increment(int2)', '$1'); +SELECT create_distributed_function('increment(int2)', '$1', colocate_with := 'statement_table'); +BEGIN; +SET LOCAL citus.replication_model TO 'statement'; +DROP TABLE statement_table; +SELECT create_distributed_function('increment(int2)', '$1'); +END; + +-- try to co-locate with a table that uses streaming replication +SELECT create_distributed_function('dup(int)', '$1', colocate_with := 'streaming_table'); SELECT * FROM run_command_on_workers('SELECT function_tests.dup(42);') ORDER BY 1,2; -SELECT create_distributed_function('add(int,int)', '$1'); +SELECT create_distributed_function('add(int,int)', '$1', colocate_with := 'streaming_table'); SELECT * FROM run_command_on_workers('SELECT function_tests.add(2,3);') ORDER BY 1,2; SELECT public.verify_function_is_same_on_workers('function_tests.add(int,int)'); @@ -318,17 +340,17 @@ DROP SCHEMA function_tests2 CASCADE; SET client_min_messages TO error; -- suppress cascading objects dropping UPDATE pg_dist_local_group SET groupid = 0; SELECT worker_drop_distributed_table(logicalrelid::text) FROM pg_dist_partition WHERE logicalrelid::text ILIKE '%replicated_table_func_test%'; -TRUNCATE pg_dist_node; DROP SCHEMA function_tests CASCADE; DROP SCHEMA function_tests2 CASCADE; +TRUNCATE pg_dist_node; \c - - - :worker_2_port SET client_min_messages TO error; -- suppress cascading objects dropping UPDATE pg_dist_local_group SET groupid = 0; SELECT worker_drop_distributed_table(logicalrelid::text) FROM pg_dist_partition WHERE logicalrelid::text ILIKE '%replicated_table_func_test%'; -TRUNCATE pg_dist_node; DROP SCHEMA function_tests CASCADE; DROP SCHEMA function_tests2 CASCADE; +TRUNCATE pg_dist_node; \c - - - :master_port diff --git a/src/test/regress/sql/distributed_procedure.sql b/src/test/regress/sql/distributed_procedure.sql index 51d52b533..3ea35c7b9 100644 --- a/src/test/regress/sql/distributed_procedure.sql +++ b/src/test/regress/sql/distributed_procedure.sql @@ -19,9 +19,11 @@ $proc$; -- procedures are distributed by text arguments, when run in isolation it is not guaranteed a table actually exists. CREATE TABLE colocation_table(id text); +SET citus.replication_model TO 'streaming'; +SET citus.shard_replication_factor TO 1; SELECT create_distributed_table('colocation_table','id'); -SELECT create_distributed_function('raise_info(text)', '$1'); +SELECT create_distributed_function('raise_info(text)', '$1', colocate_with := 'colocation_table'); SELECT * FROM run_command_on_workers($$CALL procedure_tests.raise_info('hello');$$) ORDER BY 1,2; SELECT public.verify_function_is_same_on_workers('procedure_tests.raise_info(text)'); diff --git a/src/test/regress/sql/multi_multiuser.sql b/src/test/regress/sql/multi_multiuser.sql index 4338892f7..010b0bda0 100644 --- a/src/test/regress/sql/multi_multiuser.sql +++ b/src/test/regress/sql/multi_multiuser.sql @@ -92,6 +92,9 @@ GRANT USAGE ON SCHEMA full_access_user_schema TO usage_access; \c - - - :master_port +SET citus.replication_model TO 'streaming'; +SET citus.shard_replication_factor TO 1; + -- create prepare tests PREPARE prepare_insert AS INSERT INTO test VALUES ($1); PREPARE prepare_select AS SELECT count(*) FROM test; @@ -296,12 +299,15 @@ SELECT run_command_on_workers($$SELECT proowner::regrole FROM pg_proc WHERE pron SELECT wait_until_metadata_sync(); +CREATE TABLE colocation_table(id text); +SELECT create_distributed_table('colocation_table','id'); + -- now, make sure that the user can use the function -- created in the transaction BEGIN; CREATE FUNCTION usage_access_func_second(key int, variadic v int[]) RETURNS text LANGUAGE plpgsql AS 'begin return current_user; end;'; -SELECT create_distributed_function('usage_access_func_second(int,int[])', '$1'); +SELECT create_distributed_function('usage_access_func_second(int,int[])', '$1', colocate_with := 'colocation_table'); SELECT usage_access_func_second(1, 2,3,4,5) FROM full_access_user_schema.t1 LIMIT 1; @@ -317,7 +323,7 @@ CREATE FUNCTION usage_access_func_third(key int, variadic v int[]) RETURNS text SELECT usesuper FROM pg_user where usename IN (SELECT current_user); -- superuser creates the distributed function that is owned by a regular user -SELECT create_distributed_function('usage_access_func_third(int,int[])', '$1'); +SELECT create_distributed_function('usage_access_func_third(int,int[])', '$1', colocate_with := 'colocation_table'); SELECT proowner::regrole FROM pg_proc WHERE proname = 'usage_access_func_third'; SELECT run_command_on_workers($$SELECT proowner::regrole FROM pg_proc WHERE proname = 'usage_access_func_third'$$); @@ -499,7 +505,8 @@ DROP TABLE my_role_table_with_data, singleshard, test, - test_coloc; + test_coloc, + colocation_table; DROP USER full_access; DROP USER read_access; DROP USER no_access; diff --git a/src/test/regress/sql/multi_mx_function_call_delegation.sql b/src/test/regress/sql/multi_mx_function_call_delegation.sql index 9c0a6c6e4..6f58ac8a2 100644 --- a/src/test/regress/sql/multi_mx_function_call_delegation.sql +++ b/src/test/regress/sql/multi_mx_function_call_delegation.sql @@ -225,6 +225,7 @@ select start_metadata_sync_to_node('localhost', :worker_2_port); \c - - - :master_port SET search_path to multi_mx_function_call_delegation, public; SET client_min_messages TO DEBUG1; +SET citus.replication_model = 'streaming'; -- -- Test non-const parameter values