From 04b5dc39bc8146a5a225eaf08604e8f654e500c9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Fri, 18 Oct 2019 19:50:44 +0000 Subject: [PATCH] distributed_functions: test aggregation strategies --- .../expected/distributed_functions.out | 68 ++++++++++++++++++- .../regress/sql/distributed_functions.sql | 38 ++++++++++- 2 files changed, 103 insertions(+), 3 deletions(-) diff --git a/src/test/regress/expected/distributed_functions.out b/src/test/regress/expected/distributed_functions.out index a638779b8..803470f58 100644 --- a/src/test/regress/expected/distributed_functions.out +++ b/src/test/regress/expected/distributed_functions.out @@ -91,12 +91,19 @@ begin return state * 2; end; $$; +CREATE FUNCTION agg_combinefunc(state1 int, state2 int) +RETURNS int IMMUTABLE LANGUAGE plpgsql AS $$ +begin + return state1 + state2; +end; +$$; CREATE AGGREGATE sum2(int) ( sfunc = agg_sfunc, stype = int, sspace = 8, finalfunc = agg_finalfunc, finalfunc_extra, + combinefunc = agg_combinefunc, initcond = '5', msfunc = agg_sfunc, mstype = int, @@ -111,13 +118,22 @@ CREATE AGGREGATE sum2(int) ( -- allow alter statements to be propagated and keep the functions in sync across machines SET citus.enable_ddl_propagation TO on; -- functions are distributed by int arguments, when run in isolation it is not guaranteed a table actually exists. -CREATE TABLE colocation_table(id int); +CREATE TABLE colocation_table(id int, val int); SELECT create_distributed_table('colocation_table','id'); create_distributed_table -------------------------- (1 row) +INSERT INTO colocation_table VALUES (1, 0), (2, 1), (3, 4), (6, 2), (3, 2), (2, 0), (8, 4); +CREATE TABLE cojoin_table(id int, val int); +SELECT create_distributed_table('cojoin_table','id'); + create_distributed_table +-------------------------- + +(1 row) + +INSERT INTO cojoin_table VALUES (1, 2), (2, 3), (5, 6), (6, 7), (3, 4), (2, 3), (8, 9); -- make sure that none of the active and primary nodes hasmetadata -- at the start of the test select bool_or(hasmetadata) from pg_dist_node WHERE isactive AND noderole = 'primary'; @@ -369,6 +385,50 @@ SELECT * FROM run_command_on_workers('SELECT function_tests2.add(2,3);') ORDER B ALTER FUNCTION function_tests2.add(int,int) SET SCHEMA function_tests; ALTER AGGREGATE sum2(int) SET SCHEMA function_tests2; +-- Test distributed execution +-- This fails before being marked for distributed execution +SELECT ct.val, function_tests2.sum2(ct.id + jt.val) +FROM colocation_table ct +JOIN cojoin_table jt ON ct.id = jt.id +GROUP BY ct.val ORDER BY ct.val; +ERROR: unsupported aggregate function sum2 +SELECT mark_aggregate_for_distributed_execution('function_tests2.sum2(int)', 'combine'); + mark_aggregate_for_distributed_execution +------------------------------------------ + +(1 row) + +SELECT ct.val, function_tests2.sum2(ct.id + jt.val) +FROM colocation_table ct +JOIN cojoin_table jt ON ct.id = jt.id +GROUP BY ct.val ORDER BY ct.val; + val | sum2 +-----+------ + 0 | 56 + 1 | 40 + 2 | 70 + 4 | 78 +(4 rows) + +-- Commute strategy isn't rejected, but gives incorrect results +SELECT mark_aggregate_for_distributed_execution('function_tests2.sum2(int)', 'commute'); + mark_aggregate_for_distributed_execution +------------------------------------------ + +(1 row) + +SELECT ct.val, function_tests2.sum2(ct.id + jt.val) +FROM colocation_table ct +JOIN cojoin_table jt ON ct.id = jt.id +GROUP BY ct.val ORDER BY ct.val; + val | sum2 +-----+------ + 0 | 102 + 1 | 70 + 2 | 130 + 4 | 146 +(4 rows) + -- when a function is distributed and we create or replace the function we need to propagate the statement to the worker to keep it in sync with the coordinator CREATE OR REPLACE FUNCTION add(integer, integer) RETURNS integer AS 'select $1 * $2;' -- I know, this is not an add, but the output will tell us if the update succeeded @@ -388,6 +448,10 @@ SELECT * FROM run_command_on_workers('SELECT function_tests.add(2,3);') ORDER BY localhost | 57638 | t | 6 (2 rows) +-- First test that we reject NULL +SELECT create_distributed_function(NULL); +ERROR: the first parameter for create_distributed_function() should be a single a valid function or procedure name followed by a list of parameters in parantheses +HINT: skip the parameters with OUT argtype as they are not part of the signature in PostgreSQL -- distributed functions should not be allowed to depend on an extension, also functions -- that depend on an extension should not be allowed to be distributed. ALTER FUNCTION add(int,int) DEPENDS ON EXTENSION citus; @@ -441,7 +505,7 @@ SELECT create_distributed_function('add_with_param_names(int, int)', '$3'); ERROR: cannot distribute the function "add_with_param_names" since the distribution argument is not valid HINT: Either provide a valid function argument name or a valid "$paramIndex" to create_distributed_function() SELECT create_distributed_function('add_with_param_names(int, int)', '$1a'); -ERROR: invalid input syntax for integer: "1a" +ERROR: invalid input syntax for type integer: "1a" -- non existing column name SELECT create_distributed_function('add_with_param_names(int, int)', 'aaa'); ERROR: cannot distribute the function "add_with_param_names" since the distribution argument is not valid diff --git a/src/test/regress/sql/distributed_functions.sql b/src/test/regress/sql/distributed_functions.sql index ad5d7eff2..a7162baf3 100644 --- a/src/test/regress/sql/distributed_functions.sql +++ b/src/test/regress/sql/distributed_functions.sql @@ -97,12 +97,20 @@ begin end; $$; +CREATE FUNCTION agg_combinefunc(state1 int, state2 int) +RETURNS int IMMUTABLE LANGUAGE plpgsql AS $$ +begin + return state1 + state2; +end; +$$; + CREATE AGGREGATE sum2(int) ( sfunc = agg_sfunc, stype = int, sspace = 8, finalfunc = agg_finalfunc, finalfunc_extra, + combinefunc = agg_combinefunc, initcond = '5', msfunc = agg_sfunc, mstype = int, @@ -119,8 +127,13 @@ CREATE AGGREGATE sum2(int) ( SET citus.enable_ddl_propagation TO on; -- functions are distributed by int arguments, when run in isolation it is not guaranteed a table actually exists. -CREATE TABLE colocation_table(id int); +CREATE TABLE colocation_table(id int, val int); SELECT create_distributed_table('colocation_table','id'); +INSERT INTO colocation_table VALUES (1, 0), (2, 1), (3, 4), (6, 2), (3, 2), (2, 0), (8, 4); + +CREATE TABLE cojoin_table(id int, val int); +SELECT create_distributed_table('cojoin_table','id'); +INSERT INTO cojoin_table VALUES (1, 2), (2, 3), (5, 6), (6, 7), (3, 4), (2, 3), (8, 9); -- make sure that none of the active and primary nodes hasmetadata -- at the start of the test @@ -217,6 +230,26 @@ ALTER FUNCTION function_tests2.add(int,int) SET SCHEMA function_tests; ALTER AGGREGATE sum2(int) SET SCHEMA function_tests2; +-- Test distributed execution +-- This fails before being marked for distributed execution +SELECT ct.val, function_tests2.sum2(ct.id + jt.val) +FROM colocation_table ct +JOIN cojoin_table jt ON ct.id = jt.id +GROUP BY ct.val ORDER BY ct.val; + +SELECT mark_aggregate_for_distributed_execution('function_tests2.sum2(int)', 'combine'); +SELECT ct.val, function_tests2.sum2(ct.id + jt.val) +FROM colocation_table ct +JOIN cojoin_table jt ON ct.id = jt.id +GROUP BY ct.val ORDER BY ct.val; + +-- Commute strategy isn't rejected, but gives incorrect results +SELECT mark_aggregate_for_distributed_execution('function_tests2.sum2(int)', 'commute'); +SELECT ct.val, function_tests2.sum2(ct.id + jt.val) +FROM colocation_table ct +JOIN cojoin_table jt ON ct.id = jt.id +GROUP BY ct.val ORDER BY ct.val; + -- when a function is distributed and we create or replace the function we need to propagate the statement to the worker to keep it in sync with the coordinator CREATE OR REPLACE FUNCTION add(integer, integer) RETURNS integer AS 'select $1 * $2;' -- I know, this is not an add, but the output will tell us if the update succeeded @@ -226,6 +259,9 @@ AS 'select $1 * $2;' -- I know, this is not an add, but the output will tell us SELECT public.verify_function_is_same_on_workers('function_tests.add(int,int)'); SELECT * FROM run_command_on_workers('SELECT function_tests.add(2,3);') ORDER BY 1,2; +-- First test that we reject NULL +SELECT create_distributed_function(NULL); + -- distributed functions should not be allowed to depend on an extension, also functions -- that depend on an extension should not be allowed to be distributed. ALTER FUNCTION add(int,int) DEPENDS ON EXTENSION citus;