Add arbitrary config tests for function DDL statements (#5885)

pull/5901/head
Hanefi Onaldi 2022-04-12 16:03:10 +03:00 committed by GitHub
parent dd78c81378
commit 6254f30305
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 638 additions and 3 deletions

View File

@ -321,11 +321,14 @@ class CitusShardReplicationFactorClusterConfig(CitusDefaultClusterConfig):
self.new_settings = {"citus.shard_replication_factor": 2} self.new_settings = {"citus.shard_replication_factor": 2}
self.skip_tests = [ self.skip_tests = [
# citus does not support foreign keys in distributed tables # citus does not support foreign keys in distributed tables
# when citus.shard_replication_factor > 2 # when citus.shard_replication_factor >= 2
"arbitrary_configs_truncate_partition_create", "arbitrary_configs_truncate_partition", "arbitrary_configs_truncate_partition_create", "arbitrary_configs_truncate_partition",
# citus does not support modifying a partition when # citus does not support modifying a partition when
# citus.shard_replication_factor > 2 # citus.shard_replication_factor >= 2
"arbitrary_configs_truncate_cascade_create", "arbitrary_configs_truncate_cascade"] "arbitrary_configs_truncate_cascade_create", "arbitrary_configs_truncate_cascade",
# citus does not support colocating functions with distributed tables when
# citus.shard_replication_factor >= 2
"function_create", "functions"]
class CitusSingleShardClusterConfig(CitusDefaultClusterConfig): class CitusSingleShardClusterConfig(CitusDefaultClusterConfig):

View File

@ -8,6 +8,7 @@ test: schemas_create
test: views_create test: views_create
test: sequences_create test: sequences_create
test: index_create test: index_create
test: function_create
test: arbitrary_configs_truncate_create test: arbitrary_configs_truncate_create
test: arbitrary_configs_truncate_cascade_create test: arbitrary_configs_truncate_cascade_create
test: arbitrary_configs_truncate_partition_create test: arbitrary_configs_truncate_partition_create

View File

@ -0,0 +1,223 @@
\set VERBOSITY terse
CREATE SCHEMA function_create;
SET search_path TO function_create;
-- helper function to verify the function of a coordinator is the same on all workers
CREATE OR REPLACE FUNCTION verify_function_is_same_on_workers(funcname text)
RETURNS bool
LANGUAGE plpgsql
AS $func$
DECLARE
coordinatorSql text;
workerSql text;
BEGIN
SELECT pg_get_functiondef(funcname::regprocedure) INTO coordinatorSql;
FOR workerSql IN SELECT result FROM run_command_on_workers('SELECT pg_get_functiondef(' || quote_literal(funcname) || '::regprocedure)') LOOP
IF workerSql != coordinatorSql THEN
RAISE INFO 'functions are different, coordinator:% worker:%', coordinatorSql, workerSql;
RETURN false;
END IF;
END LOOP;
RETURN true;
END;
$func$;
-- test delegating function calls
CREATE TABLE warnings (
id int primary key,
message text
);
SELECT create_distributed_table('warnings', 'id');
create_distributed_table
---------------------------------------------------------------------
(1 row)
INSERT INTO warnings VALUES (1, 'hello arbitrary config tests');
CREATE FUNCTION warning(int, text)
RETURNS void
LANGUAGE plpgsql AS $$
BEGIN
RAISE WARNING '%', $2;
END;
$$;
SELECT create_distributed_function('warning(int,text)','$1');
create_distributed_function
---------------------------------------------------------------------
(1 row)
-- verify that the function definition is consistent in the cluster
SELECT verify_function_is_same_on_workers('function_create.warning(int,text)');
verify_function_is_same_on_workers
---------------------------------------------------------------------
t
(1 row)
-- test a function that performs operation on the single shard of a reference table
CREATE TABLE monotonic_series(used_values int);
SELECT create_reference_table('monotonic_series');
create_reference_table
---------------------------------------------------------------------
(1 row)
INSERT INTO monotonic_series VALUES (1), (3), (5);
CREATE FUNCTION add_new_item_to_series()
RETURNS int
LANGUAGE SQL
AS $func$
INSERT INTO monotonic_series SELECT max(used_values)+1 FROM monotonic_series RETURNING used_values;
$func$;
-- Create and distribute a simple function
CREATE FUNCTION eq(macaddr, macaddr) RETURNS bool
AS 'select $1 = $2;'
LANGUAGE SQL
IMMUTABLE
RETURNS NULL ON NULL INPUT;
-- testing alter statements for a distributed function
-- ROWS 5, untested because;
-- ERROR: ROWS is not applicable when function does not return a set
SELECT verify_function_is_same_on_workers('function_create.eq(macaddr,macaddr)');
verify_function_is_same_on_workers
---------------------------------------------------------------------
t
(1 row)
ALTER FUNCTION eq(macaddr,macaddr) CALLED ON NULL INPUT IMMUTABLE SECURITY INVOKER PARALLEL UNSAFE COST 5;
SELECT verify_function_is_same_on_workers('function_create.eq(macaddr,macaddr)');
verify_function_is_same_on_workers
---------------------------------------------------------------------
t
(1 row)
ALTER FUNCTION eq(macaddr,macaddr) RETURNS NULL ON NULL INPUT STABLE SECURITY DEFINER PARALLEL RESTRICTED;
SELECT verify_function_is_same_on_workers('function_create.eq(macaddr,macaddr)');
verify_function_is_same_on_workers
---------------------------------------------------------------------
t
(1 row)
ALTER FUNCTION eq(macaddr,macaddr) STRICT VOLATILE PARALLEL SAFE;
SELECT verify_function_is_same_on_workers('function_create.eq(macaddr,macaddr)');
verify_function_is_same_on_workers
---------------------------------------------------------------------
t
(1 row)
-- Test SET/RESET for alter function
ALTER ROUTINE eq(macaddr,macaddr) SET client_min_messages TO debug;
SELECT verify_function_is_same_on_workers('function_create.eq(macaddr,macaddr)');
verify_function_is_same_on_workers
---------------------------------------------------------------------
t
(1 row)
ALTER FUNCTION eq(macaddr,macaddr) RESET client_min_messages;
SELECT verify_function_is_same_on_workers('function_create.eq(macaddr,macaddr)');
verify_function_is_same_on_workers
---------------------------------------------------------------------
t
(1 row)
ALTER FUNCTION eq(macaddr,macaddr) SET search_path TO 'sch'';ma', public;
SELECT verify_function_is_same_on_workers('function_create.eq(macaddr,macaddr)');
verify_function_is_same_on_workers
---------------------------------------------------------------------
t
(1 row)
ALTER FUNCTION eq(macaddr,macaddr) RESET search_path;
-- rename function and make sure the new name can be used on the workers
ALTER FUNCTION eq(macaddr,macaddr) RENAME TO eq2;
SELECT verify_function_is_same_on_workers('function_create.eq2(macaddr,macaddr)');
verify_function_is_same_on_workers
---------------------------------------------------------------------
t
(1 row)
-- user-defined aggregates with & without strict
create function sum2_sfunc_strict(state int, x int)
returns int immutable strict language plpgsql as $$
begin return state + x;
end;
$$;
create function sum2_finalfunc_strict(state int)
returns int immutable strict language plpgsql as $$
begin return state * 2;
end;
$$;
create function sum2_sfunc(state int, x int)
returns int immutable language plpgsql as $$
begin return state + x;
end;
$$;
create function sum2_finalfunc(state int)
returns int immutable language plpgsql as $$
begin return state * 2;
end;
$$;
create aggregate sum2 (int) (
sfunc = sum2_sfunc,
stype = int,
finalfunc = sum2_finalfunc,
combinefunc = sum2_sfunc,
initcond = '0'
);
create aggregate sum2_strict (int) (
sfunc = sum2_sfunc_strict,
stype = int,
finalfunc = sum2_finalfunc_strict,
combinefunc = sum2_sfunc_strict
);
-- user-defined aggregates with multiple-parameters
create function psum_sfunc(s int, x int, y int)
returns int immutable language plpgsql as $$
begin return coalesce(s,0) + coalesce(x*y+3,1);
end;
$$;
create function psum_sfunc_strict(s int, x int, y int)
returns int immutable strict language plpgsql as $$
begin return coalesce(s,0) + coalesce(x*y+3,1);
end;
$$;
create function psum_combinefunc(s1 int, s2 int)
returns int immutable language plpgsql as $$
begin return coalesce(s1,0) + coalesce(s2,0);
end;
$$;
create function psum_combinefunc_strict(s1 int, s2 int)
returns int immutable strict language plpgsql as $$
begin return coalesce(s1,0) + coalesce(s2,0);
end;
$$;
create function psum_finalfunc(x int)
returns int immutable language plpgsql as $$
begin return x * 2;
end;
$$;
create function psum_finalfunc_strict(x int)
returns int immutable strict language plpgsql as $$
begin return x * 2;
end;
$$;
create aggregate psum(int, int)(
sfunc=psum_sfunc,
combinefunc=psum_combinefunc,
finalfunc=psum_finalfunc,
stype=int
);
create aggregate psum_strict(int, int)(
sfunc=psum_sfunc_strict,
combinefunc=psum_combinefunc_strict,
finalfunc=psum_finalfunc_strict,
stype=int,
initcond=0
);
-- generate test data
create table aggdata (id int, key int, val int, valf float8);
select create_distributed_table('aggdata', 'id');
create_distributed_table
---------------------------------------------------------------------
(1 row)

View File

@ -0,0 +1,139 @@
\set VERBOSITY terse
SET search_path TO function_create;
-- test user defined function with a distribution column argument
SELECT
warning (id, message)
FROM
warnings
WHERE
id = 1;
WARNING: hello arbitrary config tests
warning
---------------------------------------------------------------------
(1 row)
SELECT warning (1, 'Push down to worker that holds the partition value of 1');
WARNING: Push down to worker that holds the partition value of 1
warning
---------------------------------------------------------------------
(1 row)
SELECT warning (2, 'Push down to worker that holds the partition value of 2');
WARNING: Push down to worker that holds the partition value of 2
warning
---------------------------------------------------------------------
(1 row)
SELECT warning (3, 'Push down to worker that holds the partition value of 3');
WARNING: Push down to worker that holds the partition value of 3
warning
---------------------------------------------------------------------
(1 row)
SELECT warning (4, 'Push down to worker that holds the partition value of 4');
WARNING: Push down to worker that holds the partition value of 4
warning
---------------------------------------------------------------------
(1 row)
SELECT warning (5, 'Push down to worker that holds the partition value of 5');
WARNING: Push down to worker that holds the partition value of 5
warning
---------------------------------------------------------------------
(1 row)
SELECT warning (6, 'Push down to worker that holds the partition value of 6');
WARNING: Push down to worker that holds the partition value of 6
warning
---------------------------------------------------------------------
(1 row)
SELECT warning (7, 'Push down to worker that holds the partition value of 7');
WARNING: Push down to worker that holds the partition value of 7
warning
---------------------------------------------------------------------
(1 row)
-- insert some data to test user defined aggregates
INSERT INTO aggdata (id, key, val, valf)
VALUES (1, 1, 2, 11.2),
(2, 1, NULL, 2.1),
(3, 2, 2, 3.22),
(4, 2, 3, 4.23),
(5, 2, 5, 5.25),
(6, 3, 4, 63.4),
(7, 5, NULL, 75),
(8, 6, NULL, NULL),
(9, 6, NULL, 96),
(10, 7, 8, 1078),
(11, 9, 0, 1.19);
-- test user defined aggregates
SELECT
key,
sum2 (val),
sum2_strict (val),
stddev(valf)::numeric(10, 5),
psum (val, valf::int),
psum_strict (val, valf::int)
FROM
aggdata
GROUP BY
key
ORDER BY
key;
key | sum2 | sum2_strict | stddev | psum | psum_strict
---------------------------------------------------------------------
1 | | 4 | 6.43467 | 52 | 50
2 | 20 | 20 | 1.01500 | 104 | 104
3 | 8 | 8 | | 510 | 510
5 | | | | 2 | 0
6 | | | | 4 | 0
7 | 16 | 16 | | 17254 | 17254
9 | 0 | 0 | | 6 | 6
(7 rows)
-- test function that writes to a reference table
SELECT add_new_item_to_series();
add_new_item_to_series
---------------------------------------------------------------------
6
(1 row)
SELECT add_new_item_to_series();
add_new_item_to_series
---------------------------------------------------------------------
7
(1 row)
SELECT add_new_item_to_series();
add_new_item_to_series
---------------------------------------------------------------------
8
(1 row)
SELECT add_new_item_to_series();
add_new_item_to_series
---------------------------------------------------------------------
9
(1 row)
SELECT add_new_item_to_series();
add_new_item_to_series
---------------------------------------------------------------------
10
(1 row)
SELECT add_new_item_to_series();
add_new_item_to_series
---------------------------------------------------------------------
11
(1 row)

View File

@ -24,3 +24,16 @@ AS $function$
BEGIN BEGIN
END; END;
$function$; $function$;
CREATE OR REPLACE FUNCTION pg_catalog.create_distributed_function (
function_name regprocedure,
distribution_arg_name text DEFAULT NULL,
colocate_with text DEFAULT 'default',
force_delegation bool DEFAULT NULL
)
RETURNS void
LANGUAGE plpgsql
CALLED ON NULL INPUT
AS $function$
BEGIN
END;
$function$;

View File

@ -0,0 +1,185 @@
\set VERBOSITY terse
CREATE SCHEMA function_create;
SET search_path TO function_create;
-- helper function to verify the function of a coordinator is the same on all workers
CREATE OR REPLACE FUNCTION verify_function_is_same_on_workers(funcname text)
RETURNS bool
LANGUAGE plpgsql
AS $func$
DECLARE
coordinatorSql text;
workerSql text;
BEGIN
SELECT pg_get_functiondef(funcname::regprocedure) INTO coordinatorSql;
FOR workerSql IN SELECT result FROM run_command_on_workers('SELECT pg_get_functiondef(' || quote_literal(funcname) || '::regprocedure)') LOOP
IF workerSql != coordinatorSql THEN
RAISE INFO 'functions are different, coordinator:% worker:%', coordinatorSql, workerSql;
RETURN false;
END IF;
END LOOP;
RETURN true;
END;
$func$;
-- test delegating function calls
CREATE TABLE warnings (
id int primary key,
message text
);
SELECT create_distributed_table('warnings', 'id');
INSERT INTO warnings VALUES (1, 'hello arbitrary config tests');
CREATE FUNCTION warning(int, text)
RETURNS void
LANGUAGE plpgsql AS $$
BEGIN
RAISE WARNING '%', $2;
END;
$$;
SELECT create_distributed_function('warning(int,text)','$1');
-- verify that the function definition is consistent in the cluster
SELECT verify_function_is_same_on_workers('function_create.warning(int,text)');
-- test a function that performs operation on the single shard of a reference table
CREATE TABLE monotonic_series(used_values int);
SELECT create_reference_table('monotonic_series');
INSERT INTO monotonic_series VALUES (1), (3), (5);
CREATE FUNCTION add_new_item_to_series()
RETURNS int
LANGUAGE SQL
AS $func$
INSERT INTO monotonic_series SELECT max(used_values)+1 FROM monotonic_series RETURNING used_values;
$func$;
-- Create and distribute a simple function
CREATE FUNCTION eq(macaddr, macaddr) RETURNS bool
AS 'select $1 = $2;'
LANGUAGE SQL
IMMUTABLE
RETURNS NULL ON NULL INPUT;
-- testing alter statements for a distributed function
-- ROWS 5, untested because;
-- ERROR: ROWS is not applicable when function does not return a set
SELECT verify_function_is_same_on_workers('function_create.eq(macaddr,macaddr)');
ALTER FUNCTION eq(macaddr,macaddr) CALLED ON NULL INPUT IMMUTABLE SECURITY INVOKER PARALLEL UNSAFE COST 5;
SELECT verify_function_is_same_on_workers('function_create.eq(macaddr,macaddr)');
ALTER FUNCTION eq(macaddr,macaddr) RETURNS NULL ON NULL INPUT STABLE SECURITY DEFINER PARALLEL RESTRICTED;
SELECT verify_function_is_same_on_workers('function_create.eq(macaddr,macaddr)');
ALTER FUNCTION eq(macaddr,macaddr) STRICT VOLATILE PARALLEL SAFE;
SELECT verify_function_is_same_on_workers('function_create.eq(macaddr,macaddr)');
-- Test SET/RESET for alter function
ALTER ROUTINE eq(macaddr,macaddr) SET client_min_messages TO debug;
SELECT verify_function_is_same_on_workers('function_create.eq(macaddr,macaddr)');
ALTER FUNCTION eq(macaddr,macaddr) RESET client_min_messages;
SELECT verify_function_is_same_on_workers('function_create.eq(macaddr,macaddr)');
ALTER FUNCTION eq(macaddr,macaddr) SET search_path TO 'sch'';ma', public;
SELECT verify_function_is_same_on_workers('function_create.eq(macaddr,macaddr)');
ALTER FUNCTION eq(macaddr,macaddr) RESET search_path;
-- rename function and make sure the new name can be used on the workers
ALTER FUNCTION eq(macaddr,macaddr) RENAME TO eq2;
SELECT verify_function_is_same_on_workers('function_create.eq2(macaddr,macaddr)');
-- user-defined aggregates with & without strict
create function sum2_sfunc_strict(state int, x int)
returns int immutable strict language plpgsql as $$
begin return state + x;
end;
$$;
create function sum2_finalfunc_strict(state int)
returns int immutable strict language plpgsql as $$
begin return state * 2;
end;
$$;
create function sum2_sfunc(state int, x int)
returns int immutable language plpgsql as $$
begin return state + x;
end;
$$;
create function sum2_finalfunc(state int)
returns int immutable language plpgsql as $$
begin return state * 2;
end;
$$;
create aggregate sum2 (int) (
sfunc = sum2_sfunc,
stype = int,
finalfunc = sum2_finalfunc,
combinefunc = sum2_sfunc,
initcond = '0'
);
create aggregate sum2_strict (int) (
sfunc = sum2_sfunc_strict,
stype = int,
finalfunc = sum2_finalfunc_strict,
combinefunc = sum2_sfunc_strict
);
-- user-defined aggregates with multiple-parameters
create function psum_sfunc(s int, x int, y int)
returns int immutable language plpgsql as $$
begin return coalesce(s,0) + coalesce(x*y+3,1);
end;
$$;
create function psum_sfunc_strict(s int, x int, y int)
returns int immutable strict language plpgsql as $$
begin return coalesce(s,0) + coalesce(x*y+3,1);
end;
$$;
create function psum_combinefunc(s1 int, s2 int)
returns int immutable language plpgsql as $$
begin return coalesce(s1,0) + coalesce(s2,0);
end;
$$;
create function psum_combinefunc_strict(s1 int, s2 int)
returns int immutable strict language plpgsql as $$
begin return coalesce(s1,0) + coalesce(s2,0);
end;
$$;
create function psum_finalfunc(x int)
returns int immutable language plpgsql as $$
begin return x * 2;
end;
$$;
create function psum_finalfunc_strict(x int)
returns int immutable strict language plpgsql as $$
begin return x * 2;
end;
$$;
create aggregate psum(int, int)(
sfunc=psum_sfunc,
combinefunc=psum_combinefunc,
finalfunc=psum_finalfunc,
stype=int
);
create aggregate psum_strict(int, int)(
sfunc=psum_sfunc_strict,
combinefunc=psum_combinefunc_strict,
finalfunc=psum_finalfunc_strict,
stype=int,
initcond=0
);
-- generate test data
create table aggdata (id int, key int, val int, valf float8);
select create_distributed_table('aggdata', 'id');

View File

@ -0,0 +1,56 @@
\set VERBOSITY terse
SET search_path TO function_create;
-- test user defined function with a distribution column argument
SELECT
warning (id, message)
FROM
warnings
WHERE
id = 1;
SELECT warning (1, 'Push down to worker that holds the partition value of 1');
SELECT warning (2, 'Push down to worker that holds the partition value of 2');
SELECT warning (3, 'Push down to worker that holds the partition value of 3');
SELECT warning (4, 'Push down to worker that holds the partition value of 4');
SELECT warning (5, 'Push down to worker that holds the partition value of 5');
SELECT warning (6, 'Push down to worker that holds the partition value of 6');
SELECT warning (7, 'Push down to worker that holds the partition value of 7');
-- insert some data to test user defined aggregates
INSERT INTO aggdata (id, key, val, valf)
VALUES (1, 1, 2, 11.2),
(2, 1, NULL, 2.1),
(3, 2, 2, 3.22),
(4, 2, 3, 4.23),
(5, 2, 5, 5.25),
(6, 3, 4, 63.4),
(7, 5, NULL, 75),
(8, 6, NULL, NULL),
(9, 6, NULL, 96),
(10, 7, 8, 1078),
(11, 9, 0, 1.19);
-- test user defined aggregates
SELECT
key,
sum2 (val),
sum2_strict (val),
stddev(valf)::numeric(10, 5),
psum (val, valf::int),
psum_strict (val, valf::int)
FROM
aggdata
GROUP BY
key
ORDER BY
key;
-- test function that writes to a reference table
SELECT add_new_item_to_series();
SELECT add_new_item_to_series();
SELECT add_new_item_to_series();
SELECT add_new_item_to_series();
SELECT add_new_item_to_series();
SELECT add_new_item_to_series();

View File

@ -27,3 +27,17 @@ AS $function$
BEGIN BEGIN
END; END;
$function$; $function$;
CREATE OR REPLACE FUNCTION pg_catalog.create_distributed_function (
function_name regprocedure,
distribution_arg_name text DEFAULT NULL,
colocate_with text DEFAULT 'default',
force_delegation bool DEFAULT NULL
)
RETURNS void
LANGUAGE plpgsql
CALLED ON NULL INPUT
AS $function$
BEGIN
END;
$function$;

View File

@ -8,6 +8,7 @@ test: local_dist_join
test: connectivity_checks citus_run_command test: connectivity_checks citus_run_command
test: schemas test: schemas
test: sequences test: sequences
test: functions
test: arbitrary_configs_truncate test: arbitrary_configs_truncate
test: arbitrary_configs_truncate_cascade test: arbitrary_configs_truncate_cascade
test: arbitrary_configs_truncate_partition test: arbitrary_configs_truncate_partition