diff --git a/src/test/regress/expected/multi_function_in_join.out b/src/test/regress/expected/multi_function_in_join.out index ac0ecd748..1f253aad9 100644 --- a/src/test/regress/expected/multi_function_in_join.out +++ b/src/test/regress/expected/multi_function_in_join.out @@ -12,6 +12,8 @@ CREATE SCHEMA functions_in_joins; SET search_path TO 'functions_in_joins'; SET citus.next_shard_id TO 2500000; +SET citus.replication_model to 'streaming'; +SET citus.shard_replication_factor to 1; CREATE TABLE table1 (id int, data int); SELECT create_distributed_table('table1','id'); create_distributed_table @@ -38,9 +40,15 @@ DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT table1.id, ta CREATE FUNCTION add(integer, integer) RETURNS integer AS 'SELECT $1 + $2;' LANGUAGE SQL; +SELECT create_distributed_function('add(integer,integer)'); +DEBUG: switching to sequential query execution mode +DETAIL: A distributed function is created. To make sure subsequent commands see the type correctly we need to make sure to use only one connection for all future commands + create_distributed_function +--------------------------------------------------------------------- + +(1 row) + SELECT * FROM table1 JOIN add(3,5) sum ON (id = sum) ORDER BY id ASC; -DEBUG: generating subplan XXX_1 for subquery SELECT sum FROM functions_in_joins.add(3, 5) sum(sum) -DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT table1.id, table1.data, sum.sum FROM (functions_in_joins.table1 JOIN (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum integer)) sum ON ((table1.id OPERATOR(pg_catalog.=) sum.sum))) ORDER BY table1.id id | data | sum --------------------------------------------------------------------- 8 | 64 | 8 diff --git a/src/test/regress/expected/multi_function_in_join_0.out b/src/test/regress/expected/multi_function_in_join_0.out new file mode 100644 index 000000000..8498c4a13 --- /dev/null +++ b/src/test/regress/expected/multi_function_in_join_0.out @@ -0,0 +1,250 @@ +-- +-- multi function in join queries aims to test the function calls that are +-- used in joins. +-- +-- These functions are supposed to be executed on the worker and to ensure +-- that we wrap those functions inside (SELECT * FROM fnc()) sub queries. +-- +-- We do not yet support those functions that: +-- - have lateral joins +-- - have WITH ORDINALITY clause +-- - are user-defined and immutable +CREATE SCHEMA functions_in_joins; +SET search_path TO 'functions_in_joins'; +SET citus.next_shard_id TO 2500000; +SET citus.replication_model to 'streaming'; +SET citus.shard_replication_factor to 1; +CREATE TABLE table1 (id int, data int); +SELECT create_distributed_table('table1','id'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO table1 +SELECT x, x*x +from generate_series(1, 100) as f (x); +-- Verbose messages for observing the subqueries that wrapped function calls +SET client_min_messages TO DEBUG1; +-- Check joins on a sequence +CREATE SEQUENCE numbers; +SELECT * FROM table1 JOIN nextval('numbers') n ON (id = n) ORDER BY id ASC; +DEBUG: generating subplan XXX_1 for subquery SELECT n FROM nextval('functions_in_joins.numbers'::regclass) n(n) +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT table1.id, table1.data, n.n FROM (functions_in_joins.table1 JOIN (SELECT intermediate_result.n FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(n bigint)) n ON ((table1.id OPERATOR(pg_catalog.=) n.n))) ORDER BY table1.id + id | data | n +--------------------------------------------------------------------- + 1 | 1 | 1 +(1 row) + +-- Check joins of a function that returns a single integer +CREATE FUNCTION add(integer, integer) RETURNS integer +AS 'SELECT $1 + $2;' +LANGUAGE SQL; +SELECT create_distributed_function('add(integer,integer)'); +DEBUG: switching to sequential query execution mode +DETAIL: A distributed function is created. To make sure subsequent commands see the type correctly we need to make sure to use only one connection for all future commands + create_distributed_function +--------------------------------------------------------------------- + +(1 row) + +SELECT * FROM table1 JOIN add(3,5) sum ON (id = sum) ORDER BY id ASC; +DEBUG: generating subplan XXX_1 for subquery SELECT sum FROM functions_in_joins.add(3, 5) sum(sum) +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT table1.id, table1.data, sum.sum FROM (functions_in_joins.table1 JOIN (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum integer)) sum ON ((table1.id OPERATOR(pg_catalog.=) sum.sum))) ORDER BY table1.id + id | data | sum +--------------------------------------------------------------------- + 8 | 64 | 8 +(1 row) + +-- Check join of plpgsql functions +-- a function returning a single integer +CREATE OR REPLACE FUNCTION increment(i integer) RETURNS integer AS $$ +BEGIN + RETURN i + 1; +END; +$$ LANGUAGE plpgsql; +SELECT * FROM table1 JOIN increment(2) val ON (id = val) ORDER BY id ASC; +DEBUG: generating subplan XXX_1 for subquery SELECT val FROM functions_in_joins.increment(2) val(val) +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT table1.id, table1.data, val.val FROM (functions_in_joins.table1 JOIN (SELECT intermediate_result.val FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(val integer)) val ON ((table1.id OPERATOR(pg_catalog.=) val.val))) ORDER BY table1.id + id | data | val +--------------------------------------------------------------------- + 3 | 9 | 3 +(1 row) + +-- a function that returns a set of integers +CREATE OR REPLACE FUNCTION next_k_integers(IN first_value INTEGER, + IN k INTEGER DEFAULT 3, + OUT result INTEGER) + RETURNS SETOF INTEGER AS $$ +BEGIN + RETURN QUERY SELECT x FROM generate_series(first_value, first_value+k-1) f(x); +END; +$$ LANGUAGE plpgsql; +SELECT * +FROM table1 JOIN next_k_integers(3,2) next_integers ON (id = next_integers.result) +ORDER BY id ASC; +DEBUG: generating subplan XXX_1 for subquery SELECT result FROM functions_in_joins.next_k_integers(3, 2) next_integers(result) +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT table1.id, table1.data, next_integers.result FROM (functions_in_joins.table1 JOIN (SELECT intermediate_result.result FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(result integer)) next_integers ON ((table1.id OPERATOR(pg_catalog.=) next_integers.result))) ORDER BY table1.id + id | data | result +--------------------------------------------------------------------- + 3 | 9 | 3 + 4 | 16 | 4 +(2 rows) + +-- a function returning set of records +CREATE FUNCTION get_set_of_records() RETURNS SETOF RECORD AS $cmd$ +SELECT x, x+1 FROM generate_series(0,4) f(x) +$cmd$ +LANGUAGE SQL; +SELECT * FROM table1 JOIN get_set_of_records() AS t2(x int, y int) ON (id = x) ORDER BY id ASC; +DEBUG: generating subplan XXX_1 for subquery SELECT x, y FROM functions_in_joins.get_set_of_records() t2(x integer, y integer) +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT table1.id, table1.data, t2.x, t2.y FROM (functions_in_joins.table1 JOIN (SELECT intermediate_result.x, intermediate_result.y FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(x integer, y integer)) t2 ON ((table1.id OPERATOR(pg_catalog.=) t2.x))) ORDER BY table1.id + id | data | x | y +--------------------------------------------------------------------- + 1 | 1 | 1 | 2 + 2 | 4 | 2 | 3 + 3 | 9 | 3 | 4 + 4 | 16 | 4 | 5 +(4 rows) + +-- a function returning table +CREATE FUNCTION dup(int) RETURNS TABLE(f1 int, f2 text) +AS $$ SELECT $1, CAST($1 AS text) || ' is text' $$ +LANGUAGE SQL; +SELECT f.* FROM table1 t JOIN dup(32) f ON (f1 = id); +DEBUG: generating subplan XXX_1 for subquery SELECT f1, f2 FROM functions_in_joins.dup(32) f(f1, f2) +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT f.f1, f.f2 FROM (functions_in_joins.table1 t JOIN (SELECT intermediate_result.f1, intermediate_result.f2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(f1 integer, f2 text)) f ON ((f.f1 OPERATOR(pg_catalog.=) t.id))) + f1 | f2 +--------------------------------------------------------------------- + 32 | 32 is text +(1 row) + +-- a stable function +CREATE OR REPLACE FUNCTION the_minimum_id() + RETURNS INTEGER STABLE AS 'SELECT min(id) FROM table1' LANGUAGE SQL; +SELECT * FROM table1 JOIN the_minimum_id() min_id ON (id = min_id); +DEBUG: generating subplan XXX_1 for subquery SELECT min_id FROM functions_in_joins.the_minimum_id() min_id(min_id) +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT table1.id, table1.data, min_id.min_id FROM (functions_in_joins.table1 JOIN (SELECT intermediate_result.min_id FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(min_id integer)) min_id ON ((table1.id OPERATOR(pg_catalog.=) min_id.min_id))) + id | data | min_id +--------------------------------------------------------------------- + 1 | 1 | 1 +(1 row) + +-- a built-in immutable function +SELECT * FROM table1 JOIN abs(100) as hundred ON (id = hundred) ORDER BY id ASC; + id | data | hundred +--------------------------------------------------------------------- + 100 | 10000 | 100 +(1 row) + +-- function joins inside a CTE +WITH next_row_to_process AS ( + SELECT * FROM table1 JOIN nextval('numbers') n ON (id = n) + ) +SELECT * +FROM table1, next_row_to_process +WHERE table1.data <= next_row_to_process.data +ORDER BY 1,2 ASC; +DEBUG: generating subplan XXX_1 for CTE next_row_to_process: SELECT table1.id, table1.data, n.n FROM (functions_in_joins.table1 JOIN nextval('functions_in_joins.numbers'::regclass) n(n) ON ((table1.id OPERATOR(pg_catalog.=) n.n))) +DEBUG: generating subplan XXX_1 for subquery SELECT n FROM nextval('functions_in_joins.numbers'::regclass) n(n) +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT table1.id, table1.data, n.n FROM (functions_in_joins.table1 JOIN (SELECT intermediate_result.n FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(n bigint)) n ON ((table1.id OPERATOR(pg_catalog.=) n.n))) +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT table1.id, table1.data, next_row_to_process.id, next_row_to_process.data, next_row_to_process.n FROM functions_in_joins.table1, (SELECT intermediate_result.id, intermediate_result.data, intermediate_result.n FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(id integer, data integer, n bigint)) next_row_to_process WHERE (table1.data OPERATOR(pg_catalog.<=) next_row_to_process.data) ORDER BY table1.id, table1.data + id | data | id | data | n +--------------------------------------------------------------------- + 1 | 1 | 2 | 4 | 2 + 2 | 4 | 2 | 4 | 2 +(2 rows) + +-- Multiple functions in an RTE +SELECT * FROM ROWS FROM (next_k_integers(5), next_k_integers(10)) AS f(a, b), + table1 WHERE id = a ORDER BY id ASC; +DEBUG: generating subplan XXX_1 for subquery SELECT a, b FROM ROWS FROM(functions_in_joins.next_k_integers(5), functions_in_joins.next_k_integers(10)) f(a, b) +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT f.a, f.b, table1.id, table1.data FROM (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) f(a, b), functions_in_joins.table1 WHERE (table1.id OPERATOR(pg_catalog.=) f.a) ORDER BY table1.id + a | b | id | data +--------------------------------------------------------------------- + 5 | 10 | 5 | 25 + 6 | 11 | 6 | 36 + 7 | 12 | 7 | 49 +(3 rows) + +-- Custom Type returning function used in a join +RESET client_min_messages; +CREATE TYPE min_and_max AS ( + minimum INT, + maximum INT +); +SET client_min_messages TO DEBUG1; +CREATE OR REPLACE FUNCTION max_and_min () RETURNS + min_and_max AS $$ +DECLARE + result min_and_max%rowtype; +begin + select into result min(data) as minimum, max(data) as maximum from table1; + return result; +end; +$$ language plpgsql; +SELECT * FROM table1 JOIN max_and_min() m ON (m.maximum = data OR m.minimum = data) ORDER BY 1,2,3,4; +DEBUG: generating subplan XXX_1 for subquery SELECT minimum, maximum FROM functions_in_joins.max_and_min() m(minimum, maximum) +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT table1.id, table1.data, m.minimum, m.maximum FROM (functions_in_joins.table1 JOIN (SELECT intermediate_result.minimum, intermediate_result.maximum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(minimum integer, maximum integer)) m ON (((m.maximum OPERATOR(pg_catalog.=) table1.data) OR (m.minimum OPERATOR(pg_catalog.=) table1.data)))) ORDER BY table1.id, table1.data, m.minimum, m.maximum + id | data | minimum | maximum +--------------------------------------------------------------------- + 1 | 1 | 1 | 10000 + 100 | 10000 | 1 | 10000 +(2 rows) + +-- The following tests will fail as we do not support all joins on +-- all kinds of functions +-- In other words, we cannot recursively plan the functions and hence +-- the query fails on the workers +SET client_min_messages TO ERROR; +\set VERBOSITY terse +-- function joins in CTE results can create lateral joins that are not supported +-- we execute the query within a function to consolidate the error messages +-- between different executors +CREATE FUNCTION raise_failed_execution_func_join(query text) RETURNS void AS $$ +BEGIN + EXECUTE query; + EXCEPTION WHEN OTHERS THEN + IF SQLERRM LIKE 'failed to execute task%' THEN + RAISE 'Task failed to execute'; + ELSIF SQLERRM LIKE '%does not exist%' THEN + RAISE 'Task failed to execute'; + END IF; +END; +$$LANGUAGE plpgsql; +SELECT raise_failed_execution_func_join($$ + WITH one_row AS ( + SELECT * FROM table1 WHERE id=52 + ) + SELECT table1.id, table1.data + FROM one_row, table1, next_k_integers(one_row.id, 5) next_five_ids + WHERE table1.id = next_five_ids; +$$); +ERROR: Task failed to execute +-- a user-defined immutable function +CREATE OR REPLACE FUNCTION the_answer_to_life() + RETURNS INTEGER IMMUTABLE AS 'SELECT 42' LANGUAGE SQL; +SELECT raise_failed_execution_func_join($$ + SELECT * FROM table1 JOIN the_answer_to_life() the_answer ON (id = the_answer); +$$); +ERROR: Task failed to execute +SELECT raise_failed_execution_func_join($$ + SELECT * + FROM table1 + JOIN next_k_integers(10,5) WITH ORDINALITY next_integers + ON (id = next_integers.result); +$$); +ERROR: Task failed to execute +-- WITH ORDINALITY clause +SELECT raise_failed_execution_func_join($$ + SELECT * + FROM table1 + JOIN next_k_integers(10,5) WITH ORDINALITY next_integers + ON (id = next_integers.result) + ORDER BY id ASC; +$$); +ERROR: Task failed to execute +RESET client_min_messages; +DROP SCHEMA functions_in_joins CASCADE; +NOTICE: drop cascades to 12 other objects +SET search_path TO DEFAULT; diff --git a/src/test/regress/sql/multi_function_in_join.sql b/src/test/regress/sql/multi_function_in_join.sql index a1f9c9e87..2fb7fdf18 100644 --- a/src/test/regress/sql/multi_function_in_join.sql +++ b/src/test/regress/sql/multi_function_in_join.sql @@ -13,6 +13,8 @@ CREATE SCHEMA functions_in_joins; SET search_path TO 'functions_in_joins'; SET citus.next_shard_id TO 2500000; +SET citus.replication_model to 'streaming'; +SET citus.shard_replication_factor to 1; CREATE TABLE table1 (id int, data int); SELECT create_distributed_table('table1','id'); @@ -32,6 +34,7 @@ SELECT * FROM table1 JOIN nextval('numbers') n ON (id = n) ORDER BY id ASC; CREATE FUNCTION add(integer, integer) RETURNS integer AS 'SELECT $1 + $2;' LANGUAGE SQL; +SELECT create_distributed_function('add(integer,integer)'); SELECT * FROM table1 JOIN add(3,5) sum ON (id = sum) ORDER BY id ASC; -- Check join of plpgsql functions