distributed_functions: test aggregation strategies

fix_120_custom_aggregates_distribute_multiarg
Philip Dubé 2019-10-18 19:50:44 +00:00
parent 364b33a22d
commit 04b5dc39bc
2 changed files with 103 additions and 3 deletions

View File

@ -91,12 +91,19 @@ begin
return state * 2;
end;
$$;
CREATE FUNCTION agg_combinefunc(state1 int, state2 int)
RETURNS int IMMUTABLE LANGUAGE plpgsql AS $$
begin
return state1 + state2;
end;
$$;
CREATE AGGREGATE sum2(int) (
sfunc = agg_sfunc,
stype = int,
sspace = 8,
finalfunc = agg_finalfunc,
finalfunc_extra,
combinefunc = agg_combinefunc,
initcond = '5',
msfunc = agg_sfunc,
mstype = int,
@ -111,13 +118,22 @@ CREATE AGGREGATE sum2(int) (
-- allow alter statements to be propagated and keep the functions in sync across machines
SET citus.enable_ddl_propagation TO on;
-- functions are distributed by int arguments, when run in isolation it is not guaranteed a table actually exists.
CREATE TABLE colocation_table(id int);
CREATE TABLE colocation_table(id int, val int);
SELECT create_distributed_table('colocation_table','id');
create_distributed_table
--------------------------
(1 row)
INSERT INTO colocation_table VALUES (1, 0), (2, 1), (3, 4), (6, 2), (3, 2), (2, 0), (8, 4);
CREATE TABLE cojoin_table(id int, val int);
SELECT create_distributed_table('cojoin_table','id');
create_distributed_table
--------------------------
(1 row)
INSERT INTO cojoin_table VALUES (1, 2), (2, 3), (5, 6), (6, 7), (3, 4), (2, 3), (8, 9);
-- make sure that none of the active and primary nodes hasmetadata
-- at the start of the test
select bool_or(hasmetadata) from pg_dist_node WHERE isactive AND noderole = 'primary';
@ -369,6 +385,50 @@ SELECT * FROM run_command_on_workers('SELECT function_tests2.add(2,3);') ORDER B
ALTER FUNCTION function_tests2.add(int,int) SET SCHEMA function_tests;
ALTER AGGREGATE sum2(int) SET SCHEMA function_tests2;
-- Test distributed execution
-- This fails before being marked for distributed execution
SELECT ct.val, function_tests2.sum2(ct.id + jt.val)
FROM colocation_table ct
JOIN cojoin_table jt ON ct.id = jt.id
GROUP BY ct.val ORDER BY ct.val;
ERROR: unsupported aggregate function sum2
SELECT mark_aggregate_for_distributed_execution('function_tests2.sum2(int)', 'combine');
mark_aggregate_for_distributed_execution
------------------------------------------
(1 row)
SELECT ct.val, function_tests2.sum2(ct.id + jt.val)
FROM colocation_table ct
JOIN cojoin_table jt ON ct.id = jt.id
GROUP BY ct.val ORDER BY ct.val;
val | sum2
-----+------
0 | 56
1 | 40
2 | 70
4 | 78
(4 rows)
-- Commute strategy isn't rejected, but gives incorrect results
SELECT mark_aggregate_for_distributed_execution('function_tests2.sum2(int)', 'commute');
mark_aggregate_for_distributed_execution
------------------------------------------
(1 row)
SELECT ct.val, function_tests2.sum2(ct.id + jt.val)
FROM colocation_table ct
JOIN cojoin_table jt ON ct.id = jt.id
GROUP BY ct.val ORDER BY ct.val;
val | sum2
-----+------
0 | 102
1 | 70
2 | 130
4 | 146
(4 rows)
-- when a function is distributed and we create or replace the function we need to propagate the statement to the worker to keep it in sync with the coordinator
CREATE OR REPLACE FUNCTION add(integer, integer) RETURNS integer
AS 'select $1 * $2;' -- I know, this is not an add, but the output will tell us if the update succeeded
@ -388,6 +448,10 @@ SELECT * FROM run_command_on_workers('SELECT function_tests.add(2,3);') ORDER BY
localhost | 57638 | t | 6
(2 rows)
-- First test that we reject NULL
SELECT create_distributed_function(NULL);
ERROR: the first parameter for create_distributed_function() should be a single a valid function or procedure name followed by a list of parameters in parantheses
HINT: skip the parameters with OUT argtype as they are not part of the signature in PostgreSQL
-- distributed functions should not be allowed to depend on an extension, also functions
-- that depend on an extension should not be allowed to be distributed.
ALTER FUNCTION add(int,int) DEPENDS ON EXTENSION citus;
@ -441,7 +505,7 @@ SELECT create_distributed_function('add_with_param_names(int, int)', '$3');
ERROR: cannot distribute the function "add_with_param_names" since the distribution argument is not valid
HINT: Either provide a valid function argument name or a valid "$paramIndex" to create_distributed_function()
SELECT create_distributed_function('add_with_param_names(int, int)', '$1a');
ERROR: invalid input syntax for integer: "1a"
ERROR: invalid input syntax for type integer: "1a"
-- non existing column name
SELECT create_distributed_function('add_with_param_names(int, int)', 'aaa');
ERROR: cannot distribute the function "add_with_param_names" since the distribution argument is not valid

View File

@ -97,12 +97,20 @@ begin
end;
$$;
CREATE FUNCTION agg_combinefunc(state1 int, state2 int)
RETURNS int IMMUTABLE LANGUAGE plpgsql AS $$
begin
return state1 + state2;
end;
$$;
CREATE AGGREGATE sum2(int) (
sfunc = agg_sfunc,
stype = int,
sspace = 8,
finalfunc = agg_finalfunc,
finalfunc_extra,
combinefunc = agg_combinefunc,
initcond = '5',
msfunc = agg_sfunc,
mstype = int,
@ -119,8 +127,13 @@ CREATE AGGREGATE sum2(int) (
SET citus.enable_ddl_propagation TO on;
-- functions are distributed by int arguments, when run in isolation it is not guaranteed a table actually exists.
CREATE TABLE colocation_table(id int);
CREATE TABLE colocation_table(id int, val int);
SELECT create_distributed_table('colocation_table','id');
INSERT INTO colocation_table VALUES (1, 0), (2, 1), (3, 4), (6, 2), (3, 2), (2, 0), (8, 4);
CREATE TABLE cojoin_table(id int, val int);
SELECT create_distributed_table('cojoin_table','id');
INSERT INTO cojoin_table VALUES (1, 2), (2, 3), (5, 6), (6, 7), (3, 4), (2, 3), (8, 9);
-- make sure that none of the active and primary nodes hasmetadata
-- at the start of the test
@ -217,6 +230,26 @@ ALTER FUNCTION function_tests2.add(int,int) SET SCHEMA function_tests;
ALTER AGGREGATE sum2(int) SET SCHEMA function_tests2;
-- Test distributed execution
-- This fails before being marked for distributed execution
SELECT ct.val, function_tests2.sum2(ct.id + jt.val)
FROM colocation_table ct
JOIN cojoin_table jt ON ct.id = jt.id
GROUP BY ct.val ORDER BY ct.val;
SELECT mark_aggregate_for_distributed_execution('function_tests2.sum2(int)', 'combine');
SELECT ct.val, function_tests2.sum2(ct.id + jt.val)
FROM colocation_table ct
JOIN cojoin_table jt ON ct.id = jt.id
GROUP BY ct.val ORDER BY ct.val;
-- Commute strategy isn't rejected, but gives incorrect results
SELECT mark_aggregate_for_distributed_execution('function_tests2.sum2(int)', 'commute');
SELECT ct.val, function_tests2.sum2(ct.id + jt.val)
FROM colocation_table ct
JOIN cojoin_table jt ON ct.id = jt.id
GROUP BY ct.val ORDER BY ct.val;
-- when a function is distributed and we create or replace the function we need to propagate the statement to the worker to keep it in sync with the coordinator
CREATE OR REPLACE FUNCTION add(integer, integer) RETURNS integer
AS 'select $1 * $2;' -- I know, this is not an add, but the output will tell us if the update succeeded
@ -226,6 +259,9 @@ AS 'select $1 * $2;' -- I know, this is not an add, but the output will tell us
SELECT public.verify_function_is_same_on_workers('function_tests.add(int,int)');
SELECT * FROM run_command_on_workers('SELECT function_tests.add(2,3);') ORDER BY 1,2;
-- First test that we reject NULL
SELECT create_distributed_function(NULL);
-- distributed functions should not be allowed to depend on an extension, also functions
-- that depend on an extension should not be allowed to be distributed.
ALTER FUNCTION add(int,int) DEPENDS ON EXTENSION citus;