diff --git a/src/test/regress/citus_tests/config.py b/src/test/regress/citus_tests/config.py index c88efa814..25f19bafb 100644 --- a/src/test/regress/citus_tests/config.py +++ b/src/test/regress/citus_tests/config.py @@ -321,11 +321,14 @@ class CitusShardReplicationFactorClusterConfig(CitusDefaultClusterConfig): self.new_settings = {"citus.shard_replication_factor": 2} self.skip_tests = [ # citus does not support foreign keys in distributed tables - # when citus.shard_replication_factor > 2 + # when citus.shard_replication_factor >= 2 "arbitrary_configs_truncate_partition_create", "arbitrary_configs_truncate_partition", # citus does not support modifying a partition when - # citus.shard_replication_factor > 2 - "arbitrary_configs_truncate_cascade_create", "arbitrary_configs_truncate_cascade"] + # citus.shard_replication_factor >= 2 + "arbitrary_configs_truncate_cascade_create", "arbitrary_configs_truncate_cascade", + # citus does not support colocating functions with distributed tables when + # citus.shard_replication_factor >= 2 + "function_create", "functions"] class CitusSingleShardClusterConfig(CitusDefaultClusterConfig): diff --git a/src/test/regress/create_schedule b/src/test/regress/create_schedule index c0967a29d..d78129c9a 100644 --- a/src/test/regress/create_schedule +++ b/src/test/regress/create_schedule @@ -8,6 +8,7 @@ test: schemas_create test: views_create test: sequences_create test: index_create +test: function_create test: arbitrary_configs_truncate_create test: arbitrary_configs_truncate_cascade_create test: arbitrary_configs_truncate_partition_create diff --git a/src/test/regress/expected/function_create.out b/src/test/regress/expected/function_create.out new file mode 100644 index 000000000..692b60805 --- /dev/null +++ b/src/test/regress/expected/function_create.out @@ -0,0 +1,223 @@ +\set VERBOSITY terse +CREATE SCHEMA function_create; +SET search_path TO function_create; +-- helper function to verify the function of a coordinator is the same on all workers +CREATE OR REPLACE FUNCTION verify_function_is_same_on_workers(funcname text) + RETURNS bool + LANGUAGE plpgsql +AS $func$ +DECLARE + coordinatorSql text; + workerSql text; +BEGIN + SELECT pg_get_functiondef(funcname::regprocedure) INTO coordinatorSql; + FOR workerSql IN SELECT result FROM run_command_on_workers('SELECT pg_get_functiondef(' || quote_literal(funcname) || '::regprocedure)') LOOP + IF workerSql != coordinatorSql THEN + RAISE INFO 'functions are different, coordinator:% worker:%', coordinatorSql, workerSql; + RETURN false; + END IF; + END LOOP; + + RETURN true; +END; +$func$; +-- test delegating function calls +CREATE TABLE warnings ( + id int primary key, + message text +); +SELECT create_distributed_table('warnings', 'id'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO warnings VALUES (1, 'hello arbitrary config tests'); +CREATE FUNCTION warning(int, text) +RETURNS void +LANGUAGE plpgsql AS $$ +BEGIN + RAISE WARNING '%', $2; +END; +$$; +SELECT create_distributed_function('warning(int,text)','$1'); + create_distributed_function +--------------------------------------------------------------------- + +(1 row) + +-- verify that the function definition is consistent in the cluster +SELECT verify_function_is_same_on_workers('function_create.warning(int,text)'); + verify_function_is_same_on_workers +--------------------------------------------------------------------- + t +(1 row) + +-- test a function that performs operation on the single shard of a reference table +CREATE TABLE monotonic_series(used_values int); +SELECT create_reference_table('monotonic_series'); + create_reference_table +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO monotonic_series VALUES (1), (3), (5); +CREATE FUNCTION add_new_item_to_series() +RETURNS int +LANGUAGE SQL +AS $func$ +INSERT INTO monotonic_series SELECT max(used_values)+1 FROM monotonic_series RETURNING used_values; +$func$; +-- Create and distribute a simple function +CREATE FUNCTION eq(macaddr, macaddr) RETURNS bool + AS 'select $1 = $2;' + LANGUAGE SQL + IMMUTABLE + RETURNS NULL ON NULL INPUT; +-- testing alter statements for a distributed function +-- ROWS 5, untested because; +-- ERROR: ROWS is not applicable when function does not return a set +SELECT verify_function_is_same_on_workers('function_create.eq(macaddr,macaddr)'); + verify_function_is_same_on_workers +--------------------------------------------------------------------- + t +(1 row) + +ALTER FUNCTION eq(macaddr,macaddr) CALLED ON NULL INPUT IMMUTABLE SECURITY INVOKER PARALLEL UNSAFE COST 5; +SELECT verify_function_is_same_on_workers('function_create.eq(macaddr,macaddr)'); + verify_function_is_same_on_workers +--------------------------------------------------------------------- + t +(1 row) + +ALTER FUNCTION eq(macaddr,macaddr) RETURNS NULL ON NULL INPUT STABLE SECURITY DEFINER PARALLEL RESTRICTED; +SELECT verify_function_is_same_on_workers('function_create.eq(macaddr,macaddr)'); + verify_function_is_same_on_workers +--------------------------------------------------------------------- + t +(1 row) + +ALTER FUNCTION eq(macaddr,macaddr) STRICT VOLATILE PARALLEL SAFE; +SELECT verify_function_is_same_on_workers('function_create.eq(macaddr,macaddr)'); + verify_function_is_same_on_workers +--------------------------------------------------------------------- + t +(1 row) + +-- Test SET/RESET for alter function +ALTER ROUTINE eq(macaddr,macaddr) SET client_min_messages TO debug; +SELECT verify_function_is_same_on_workers('function_create.eq(macaddr,macaddr)'); + verify_function_is_same_on_workers +--------------------------------------------------------------------- + t +(1 row) + +ALTER FUNCTION eq(macaddr,macaddr) RESET client_min_messages; +SELECT verify_function_is_same_on_workers('function_create.eq(macaddr,macaddr)'); + verify_function_is_same_on_workers +--------------------------------------------------------------------- + t +(1 row) + +ALTER FUNCTION eq(macaddr,macaddr) SET search_path TO 'sch'';ma', public; +SELECT verify_function_is_same_on_workers('function_create.eq(macaddr,macaddr)'); + verify_function_is_same_on_workers +--------------------------------------------------------------------- + t +(1 row) + +ALTER FUNCTION eq(macaddr,macaddr) RESET search_path; +-- rename function and make sure the new name can be used on the workers +ALTER FUNCTION eq(macaddr,macaddr) RENAME TO eq2; +SELECT verify_function_is_same_on_workers('function_create.eq2(macaddr,macaddr)'); + verify_function_is_same_on_workers +--------------------------------------------------------------------- + t +(1 row) + +-- user-defined aggregates with & without strict +create function sum2_sfunc_strict(state int, x int) +returns int immutable strict language plpgsql as $$ +begin return state + x; +end; +$$; +create function sum2_finalfunc_strict(state int) +returns int immutable strict language plpgsql as $$ +begin return state * 2; +end; +$$; +create function sum2_sfunc(state int, x int) +returns int immutable language plpgsql as $$ +begin return state + x; +end; +$$; +create function sum2_finalfunc(state int) +returns int immutable language plpgsql as $$ +begin return state * 2; +end; +$$; +create aggregate sum2 (int) ( + sfunc = sum2_sfunc, + stype = int, + finalfunc = sum2_finalfunc, + combinefunc = sum2_sfunc, + initcond = '0' +); +create aggregate sum2_strict (int) ( + sfunc = sum2_sfunc_strict, + stype = int, + finalfunc = sum2_finalfunc_strict, + combinefunc = sum2_sfunc_strict +); +-- user-defined aggregates with multiple-parameters +create function psum_sfunc(s int, x int, y int) +returns int immutable language plpgsql as $$ +begin return coalesce(s,0) + coalesce(x*y+3,1); +end; +$$; +create function psum_sfunc_strict(s int, x int, y int) +returns int immutable strict language plpgsql as $$ +begin return coalesce(s,0) + coalesce(x*y+3,1); +end; +$$; +create function psum_combinefunc(s1 int, s2 int) +returns int immutable language plpgsql as $$ +begin return coalesce(s1,0) + coalesce(s2,0); +end; +$$; +create function psum_combinefunc_strict(s1 int, s2 int) +returns int immutable strict language plpgsql as $$ +begin return coalesce(s1,0) + coalesce(s2,0); +end; +$$; +create function psum_finalfunc(x int) +returns int immutable language plpgsql as $$ +begin return x * 2; +end; +$$; +create function psum_finalfunc_strict(x int) +returns int immutable strict language plpgsql as $$ +begin return x * 2; +end; +$$; +create aggregate psum(int, int)( + sfunc=psum_sfunc, + combinefunc=psum_combinefunc, + finalfunc=psum_finalfunc, + stype=int +); +create aggregate psum_strict(int, int)( + sfunc=psum_sfunc_strict, + combinefunc=psum_combinefunc_strict, + finalfunc=psum_finalfunc_strict, + stype=int, + initcond=0 +); +-- generate test data +create table aggdata (id int, key int, val int, valf float8); +select create_distributed_table('aggdata', 'id'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + diff --git a/src/test/regress/expected/functions.out b/src/test/regress/expected/functions.out new file mode 100644 index 000000000..2401f81fd --- /dev/null +++ b/src/test/regress/expected/functions.out @@ -0,0 +1,139 @@ +\set VERBOSITY terse +SET search_path TO function_create; +-- test user defined function with a distribution column argument +SELECT + warning (id, message) +FROM + warnings +WHERE + id = 1; +WARNING: hello arbitrary config tests + warning +--------------------------------------------------------------------- + +(1 row) + +SELECT warning (1, 'Push down to worker that holds the partition value of 1'); +WARNING: Push down to worker that holds the partition value of 1 + warning +--------------------------------------------------------------------- + +(1 row) + +SELECT warning (2, 'Push down to worker that holds the partition value of 2'); +WARNING: Push down to worker that holds the partition value of 2 + warning +--------------------------------------------------------------------- + +(1 row) + +SELECT warning (3, 'Push down to worker that holds the partition value of 3'); +WARNING: Push down to worker that holds the partition value of 3 + warning +--------------------------------------------------------------------- + +(1 row) + +SELECT warning (4, 'Push down to worker that holds the partition value of 4'); +WARNING: Push down to worker that holds the partition value of 4 + warning +--------------------------------------------------------------------- + +(1 row) + +SELECT warning (5, 'Push down to worker that holds the partition value of 5'); +WARNING: Push down to worker that holds the partition value of 5 + warning +--------------------------------------------------------------------- + +(1 row) + +SELECT warning (6, 'Push down to worker that holds the partition value of 6'); +WARNING: Push down to worker that holds the partition value of 6 + warning +--------------------------------------------------------------------- + +(1 row) + +SELECT warning (7, 'Push down to worker that holds the partition value of 7'); +WARNING: Push down to worker that holds the partition value of 7 + warning +--------------------------------------------------------------------- + +(1 row) + +-- insert some data to test user defined aggregates +INSERT INTO aggdata (id, key, val, valf) + VALUES (1, 1, 2, 11.2), + (2, 1, NULL, 2.1), + (3, 2, 2, 3.22), + (4, 2, 3, 4.23), + (5, 2, 5, 5.25), + (6, 3, 4, 63.4), + (7, 5, NULL, 75), + (8, 6, NULL, NULL), + (9, 6, NULL, 96), + (10, 7, 8, 1078), + (11, 9, 0, 1.19); +-- test user defined aggregates +SELECT + key, + sum2 (val), + sum2_strict (val), + stddev(valf)::numeric(10, 5), + psum (val, valf::int), + psum_strict (val, valf::int) +FROM + aggdata +GROUP BY + key +ORDER BY + key; + key | sum2 | sum2_strict | stddev | psum | psum_strict +--------------------------------------------------------------------- + 1 | | 4 | 6.43467 | 52 | 50 + 2 | 20 | 20 | 1.01500 | 104 | 104 + 3 | 8 | 8 | | 510 | 510 + 5 | | | | 2 | 0 + 6 | | | | 4 | 0 + 7 | 16 | 16 | | 17254 | 17254 + 9 | 0 | 0 | | 6 | 6 +(7 rows) + +-- test function that writes to a reference table +SELECT add_new_item_to_series(); + add_new_item_to_series +--------------------------------------------------------------------- + 6 +(1 row) + +SELECT add_new_item_to_series(); + add_new_item_to_series +--------------------------------------------------------------------- + 7 +(1 row) + +SELECT add_new_item_to_series(); + add_new_item_to_series +--------------------------------------------------------------------- + 8 +(1 row) + +SELECT add_new_item_to_series(); + add_new_item_to_series +--------------------------------------------------------------------- + 9 +(1 row) + +SELECT add_new_item_to_series(); + add_new_item_to_series +--------------------------------------------------------------------- + 10 +(1 row) + +SELECT add_new_item_to_series(); + add_new_item_to_series +--------------------------------------------------------------------- + 11 +(1 row) + diff --git a/src/test/regress/expected/postgres.out b/src/test/regress/expected/postgres.out index 2fdb1714e..3c146525c 100644 --- a/src/test/regress/expected/postgres.out +++ b/src/test/regress/expected/postgres.out @@ -24,3 +24,16 @@ AS $function$ BEGIN END; $function$; +CREATE OR REPLACE FUNCTION pg_catalog.create_distributed_function ( + function_name regprocedure, + distribution_arg_name text DEFAULT NULL, + colocate_with text DEFAULT 'default', + force_delegation bool DEFAULT NULL +) + RETURNS void + LANGUAGE plpgsql + CALLED ON NULL INPUT + AS $function$ + BEGIN + END; + $function$; diff --git a/src/test/regress/sql/function_create.sql b/src/test/regress/sql/function_create.sql new file mode 100644 index 000000000..2973769c9 --- /dev/null +++ b/src/test/regress/sql/function_create.sql @@ -0,0 +1,185 @@ +\set VERBOSITY terse +CREATE SCHEMA function_create; +SET search_path TO function_create; + +-- helper function to verify the function of a coordinator is the same on all workers +CREATE OR REPLACE FUNCTION verify_function_is_same_on_workers(funcname text) + RETURNS bool + LANGUAGE plpgsql +AS $func$ +DECLARE + coordinatorSql text; + workerSql text; +BEGIN + SELECT pg_get_functiondef(funcname::regprocedure) INTO coordinatorSql; + FOR workerSql IN SELECT result FROM run_command_on_workers('SELECT pg_get_functiondef(' || quote_literal(funcname) || '::regprocedure)') LOOP + IF workerSql != coordinatorSql THEN + RAISE INFO 'functions are different, coordinator:% worker:%', coordinatorSql, workerSql; + RETURN false; + END IF; + END LOOP; + + RETURN true; +END; +$func$; + +-- test delegating function calls +CREATE TABLE warnings ( + id int primary key, + message text +); + +SELECT create_distributed_table('warnings', 'id'); +INSERT INTO warnings VALUES (1, 'hello arbitrary config tests'); + +CREATE FUNCTION warning(int, text) +RETURNS void +LANGUAGE plpgsql AS $$ +BEGIN + RAISE WARNING '%', $2; +END; +$$; + +SELECT create_distributed_function('warning(int,text)','$1'); + +-- verify that the function definition is consistent in the cluster +SELECT verify_function_is_same_on_workers('function_create.warning(int,text)'); + +-- test a function that performs operation on the single shard of a reference table +CREATE TABLE monotonic_series(used_values int); +SELECT create_reference_table('monotonic_series'); +INSERT INTO monotonic_series VALUES (1), (3), (5); + +CREATE FUNCTION add_new_item_to_series() +RETURNS int +LANGUAGE SQL +AS $func$ +INSERT INTO monotonic_series SELECT max(used_values)+1 FROM monotonic_series RETURNING used_values; +$func$; + +-- Create and distribute a simple function +CREATE FUNCTION eq(macaddr, macaddr) RETURNS bool + AS 'select $1 = $2;' + LANGUAGE SQL + IMMUTABLE + RETURNS NULL ON NULL INPUT; + +-- testing alter statements for a distributed function +-- ROWS 5, untested because; +-- ERROR: ROWS is not applicable when function does not return a set +SELECT verify_function_is_same_on_workers('function_create.eq(macaddr,macaddr)'); +ALTER FUNCTION eq(macaddr,macaddr) CALLED ON NULL INPUT IMMUTABLE SECURITY INVOKER PARALLEL UNSAFE COST 5; +SELECT verify_function_is_same_on_workers('function_create.eq(macaddr,macaddr)'); +ALTER FUNCTION eq(macaddr,macaddr) RETURNS NULL ON NULL INPUT STABLE SECURITY DEFINER PARALLEL RESTRICTED; +SELECT verify_function_is_same_on_workers('function_create.eq(macaddr,macaddr)'); +ALTER FUNCTION eq(macaddr,macaddr) STRICT VOLATILE PARALLEL SAFE; +SELECT verify_function_is_same_on_workers('function_create.eq(macaddr,macaddr)'); + +-- Test SET/RESET for alter function +ALTER ROUTINE eq(macaddr,macaddr) SET client_min_messages TO debug; +SELECT verify_function_is_same_on_workers('function_create.eq(macaddr,macaddr)'); +ALTER FUNCTION eq(macaddr,macaddr) RESET client_min_messages; +SELECT verify_function_is_same_on_workers('function_create.eq(macaddr,macaddr)'); +ALTER FUNCTION eq(macaddr,macaddr) SET search_path TO 'sch'';ma', public; +SELECT verify_function_is_same_on_workers('function_create.eq(macaddr,macaddr)'); +ALTER FUNCTION eq(macaddr,macaddr) RESET search_path; + +-- rename function and make sure the new name can be used on the workers +ALTER FUNCTION eq(macaddr,macaddr) RENAME TO eq2; +SELECT verify_function_is_same_on_workers('function_create.eq2(macaddr,macaddr)'); + +-- user-defined aggregates with & without strict +create function sum2_sfunc_strict(state int, x int) +returns int immutable strict language plpgsql as $$ +begin return state + x; +end; +$$; + +create function sum2_finalfunc_strict(state int) +returns int immutable strict language plpgsql as $$ +begin return state * 2; +end; +$$; + +create function sum2_sfunc(state int, x int) +returns int immutable language plpgsql as $$ +begin return state + x; +end; +$$; + +create function sum2_finalfunc(state int) +returns int immutable language plpgsql as $$ +begin return state * 2; +end; +$$; + +create aggregate sum2 (int) ( + sfunc = sum2_sfunc, + stype = int, + finalfunc = sum2_finalfunc, + combinefunc = sum2_sfunc, + initcond = '0' +); + +create aggregate sum2_strict (int) ( + sfunc = sum2_sfunc_strict, + stype = int, + finalfunc = sum2_finalfunc_strict, + combinefunc = sum2_sfunc_strict +); + +-- user-defined aggregates with multiple-parameters +create function psum_sfunc(s int, x int, y int) +returns int immutable language plpgsql as $$ +begin return coalesce(s,0) + coalesce(x*y+3,1); +end; +$$; + +create function psum_sfunc_strict(s int, x int, y int) +returns int immutable strict language plpgsql as $$ +begin return coalesce(s,0) + coalesce(x*y+3,1); +end; +$$; + +create function psum_combinefunc(s1 int, s2 int) +returns int immutable language plpgsql as $$ +begin return coalesce(s1,0) + coalesce(s2,0); +end; +$$; + +create function psum_combinefunc_strict(s1 int, s2 int) +returns int immutable strict language plpgsql as $$ +begin return coalesce(s1,0) + coalesce(s2,0); +end; +$$; + +create function psum_finalfunc(x int) +returns int immutable language plpgsql as $$ +begin return x * 2; +end; +$$; + +create function psum_finalfunc_strict(x int) +returns int immutable strict language plpgsql as $$ +begin return x * 2; +end; +$$; + +create aggregate psum(int, int)( + sfunc=psum_sfunc, + combinefunc=psum_combinefunc, + finalfunc=psum_finalfunc, + stype=int +); + +create aggregate psum_strict(int, int)( + sfunc=psum_sfunc_strict, + combinefunc=psum_combinefunc_strict, + finalfunc=psum_finalfunc_strict, + stype=int, + initcond=0 +); + +-- generate test data +create table aggdata (id int, key int, val int, valf float8); +select create_distributed_table('aggdata', 'id'); diff --git a/src/test/regress/sql/functions.sql b/src/test/regress/sql/functions.sql new file mode 100644 index 000000000..ce8bf38e9 --- /dev/null +++ b/src/test/regress/sql/functions.sql @@ -0,0 +1,56 @@ +\set VERBOSITY terse +SET search_path TO function_create; + +-- test user defined function with a distribution column argument +SELECT + warning (id, message) +FROM + warnings +WHERE + id = 1; + +SELECT warning (1, 'Push down to worker that holds the partition value of 1'); +SELECT warning (2, 'Push down to worker that holds the partition value of 2'); +SELECT warning (3, 'Push down to worker that holds the partition value of 3'); +SELECT warning (4, 'Push down to worker that holds the partition value of 4'); +SELECT warning (5, 'Push down to worker that holds the partition value of 5'); +SELECT warning (6, 'Push down to worker that holds the partition value of 6'); +SELECT warning (7, 'Push down to worker that holds the partition value of 7'); + + +-- insert some data to test user defined aggregates +INSERT INTO aggdata (id, key, val, valf) + VALUES (1, 1, 2, 11.2), + (2, 1, NULL, 2.1), + (3, 2, 2, 3.22), + (4, 2, 3, 4.23), + (5, 2, 5, 5.25), + (6, 3, 4, 63.4), + (7, 5, NULL, 75), + (8, 6, NULL, NULL), + (9, 6, NULL, 96), + (10, 7, 8, 1078), + (11, 9, 0, 1.19); + +-- test user defined aggregates +SELECT + key, + sum2 (val), + sum2_strict (val), + stddev(valf)::numeric(10, 5), + psum (val, valf::int), + psum_strict (val, valf::int) +FROM + aggdata +GROUP BY + key +ORDER BY + key; + +-- test function that writes to a reference table +SELECT add_new_item_to_series(); +SELECT add_new_item_to_series(); +SELECT add_new_item_to_series(); +SELECT add_new_item_to_series(); +SELECT add_new_item_to_series(); +SELECT add_new_item_to_series(); diff --git a/src/test/regress/sql/postgres.sql b/src/test/regress/sql/postgres.sql index 77c6a3a7b..e0915d583 100644 --- a/src/test/regress/sql/postgres.sql +++ b/src/test/regress/sql/postgres.sql @@ -27,3 +27,17 @@ AS $function$ BEGIN END; $function$; + +CREATE OR REPLACE FUNCTION pg_catalog.create_distributed_function ( + function_name regprocedure, + distribution_arg_name text DEFAULT NULL, + colocate_with text DEFAULT 'default', + force_delegation bool DEFAULT NULL +) + RETURNS void + LANGUAGE plpgsql + CALLED ON NULL INPUT + AS $function$ + BEGIN + END; + $function$; diff --git a/src/test/regress/sql_schedule b/src/test/regress/sql_schedule index fc011abf6..80d5e2d38 100644 --- a/src/test/regress/sql_schedule +++ b/src/test/regress/sql_schedule @@ -8,6 +8,7 @@ test: local_dist_join test: connectivity_checks citus_run_command test: schemas test: sequences +test: functions test: arbitrary_configs_truncate test: arbitrary_configs_truncate_cascade test: arbitrary_configs_truncate_partition