Add alternative output for multi_function_in_join

With pg13, constants functions from "FROM" clause are replaced. This
means that in citus side, we will see the constraints in restriction
info, instead of the function call. For example:

SELECT * FROM table1 JOIN add(3,5) sum ON (id = sum) ORDER BY id ASC;

Assuming that the function `add` returns constant, it will be evaluated
on postgres side. This means that this query will be routable because
there will be only one shard after pruning with the restrictions.
However before pg13, this would be multi shard query. And it would go
into recursive planning, the function would be evaluated on the
coordinator because it can be.

This means that with pg13, users will need to distribute the function
because when it is routable executable, it will currently also send the
function call to the worker in the query. So the function should exist
in the worker.

It could be better to replace the constant in the query tree as well so
that the query string sent to the worker has the constant value and
therefore it doesn't need the function. However I feel like users would
already have the function in workers if they have any multi shard query.

Commit on Postgres side:
7266d0997dd2a0632da38a594c78e25ff21df67e
pull/3900/head
Sait Talha Nisanci 2020-06-11 10:34:11 +03:00
parent a34a1126ec
commit 6ff4e42706
3 changed files with 263 additions and 2 deletions

View File

@ -12,6 +12,8 @@
CREATE SCHEMA functions_in_joins; CREATE SCHEMA functions_in_joins;
SET search_path TO 'functions_in_joins'; SET search_path TO 'functions_in_joins';
SET citus.next_shard_id TO 2500000; SET citus.next_shard_id TO 2500000;
SET citus.replication_model to 'streaming';
SET citus.shard_replication_factor to 1;
CREATE TABLE table1 (id int, data int); CREATE TABLE table1 (id int, data int);
SELECT create_distributed_table('table1','id'); SELECT create_distributed_table('table1','id');
create_distributed_table create_distributed_table
@ -38,9 +40,15 @@ DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT table1.id, ta
CREATE FUNCTION add(integer, integer) RETURNS integer CREATE FUNCTION add(integer, integer) RETURNS integer
AS 'SELECT $1 + $2;' AS 'SELECT $1 + $2;'
LANGUAGE SQL; LANGUAGE SQL;
SELECT create_distributed_function('add(integer,integer)');
DEBUG: switching to sequential query execution mode
DETAIL: A distributed function is created. To make sure subsequent commands see the type correctly we need to make sure to use only one connection for all future commands
create_distributed_function
---------------------------------------------------------------------
(1 row)
SELECT * FROM table1 JOIN add(3,5) sum ON (id = sum) ORDER BY id ASC; SELECT * FROM table1 JOIN add(3,5) sum ON (id = sum) ORDER BY id ASC;
DEBUG: generating subplan XXX_1 for subquery SELECT sum FROM functions_in_joins.add(3, 5) sum(sum)
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT table1.id, table1.data, sum.sum FROM (functions_in_joins.table1 JOIN (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum integer)) sum ON ((table1.id OPERATOR(pg_catalog.=) sum.sum))) ORDER BY table1.id
id | data | sum id | data | sum
--------------------------------------------------------------------- ---------------------------------------------------------------------
8 | 64 | 8 8 | 64 | 8

View File

@ -0,0 +1,250 @@
--
-- multi function in join queries aims to test the function calls that are
-- used in joins.
--
-- These functions are supposed to be executed on the worker and to ensure
-- that we wrap those functions inside (SELECT * FROM fnc()) sub queries.
--
-- We do not yet support those functions that:
-- - have lateral joins
-- - have WITH ORDINALITY clause
-- - are user-defined and immutable
CREATE SCHEMA functions_in_joins;
SET search_path TO 'functions_in_joins';
SET citus.next_shard_id TO 2500000;
SET citus.replication_model to 'streaming';
SET citus.shard_replication_factor to 1;
CREATE TABLE table1 (id int, data int);
SELECT create_distributed_table('table1','id');
create_distributed_table
---------------------------------------------------------------------
(1 row)
INSERT INTO table1
SELECT x, x*x
from generate_series(1, 100) as f (x);
-- Verbose messages for observing the subqueries that wrapped function calls
SET client_min_messages TO DEBUG1;
-- Check joins on a sequence
CREATE SEQUENCE numbers;
SELECT * FROM table1 JOIN nextval('numbers') n ON (id = n) ORDER BY id ASC;
DEBUG: generating subplan XXX_1 for subquery SELECT n FROM nextval('functions_in_joins.numbers'::regclass) n(n)
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT table1.id, table1.data, n.n FROM (functions_in_joins.table1 JOIN (SELECT intermediate_result.n FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(n bigint)) n ON ((table1.id OPERATOR(pg_catalog.=) n.n))) ORDER BY table1.id
id | data | n
---------------------------------------------------------------------
1 | 1 | 1
(1 row)
-- Check joins of a function that returns a single integer
CREATE FUNCTION add(integer, integer) RETURNS integer
AS 'SELECT $1 + $2;'
LANGUAGE SQL;
SELECT create_distributed_function('add(integer,integer)');
DEBUG: switching to sequential query execution mode
DETAIL: A distributed function is created. To make sure subsequent commands see the type correctly we need to make sure to use only one connection for all future commands
create_distributed_function
---------------------------------------------------------------------
(1 row)
SELECT * FROM table1 JOIN add(3,5) sum ON (id = sum) ORDER BY id ASC;
DEBUG: generating subplan XXX_1 for subquery SELECT sum FROM functions_in_joins.add(3, 5) sum(sum)
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT table1.id, table1.data, sum.sum FROM (functions_in_joins.table1 JOIN (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum integer)) sum ON ((table1.id OPERATOR(pg_catalog.=) sum.sum))) ORDER BY table1.id
id | data | sum
---------------------------------------------------------------------
8 | 64 | 8
(1 row)
-- Check join of plpgsql functions
-- a function returning a single integer
CREATE OR REPLACE FUNCTION increment(i integer) RETURNS integer AS $$
BEGIN
RETURN i + 1;
END;
$$ LANGUAGE plpgsql;
SELECT * FROM table1 JOIN increment(2) val ON (id = val) ORDER BY id ASC;
DEBUG: generating subplan XXX_1 for subquery SELECT val FROM functions_in_joins.increment(2) val(val)
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT table1.id, table1.data, val.val FROM (functions_in_joins.table1 JOIN (SELECT intermediate_result.val FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(val integer)) val ON ((table1.id OPERATOR(pg_catalog.=) val.val))) ORDER BY table1.id
id | data | val
---------------------------------------------------------------------
3 | 9 | 3
(1 row)
-- a function that returns a set of integers
CREATE OR REPLACE FUNCTION next_k_integers(IN first_value INTEGER,
IN k INTEGER DEFAULT 3,
OUT result INTEGER)
RETURNS SETOF INTEGER AS $$
BEGIN
RETURN QUERY SELECT x FROM generate_series(first_value, first_value+k-1) f(x);
END;
$$ LANGUAGE plpgsql;
SELECT *
FROM table1 JOIN next_k_integers(3,2) next_integers ON (id = next_integers.result)
ORDER BY id ASC;
DEBUG: generating subplan XXX_1 for subquery SELECT result FROM functions_in_joins.next_k_integers(3, 2) next_integers(result)
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT table1.id, table1.data, next_integers.result FROM (functions_in_joins.table1 JOIN (SELECT intermediate_result.result FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(result integer)) next_integers ON ((table1.id OPERATOR(pg_catalog.=) next_integers.result))) ORDER BY table1.id
id | data | result
---------------------------------------------------------------------
3 | 9 | 3
4 | 16 | 4
(2 rows)
-- a function returning set of records
CREATE FUNCTION get_set_of_records() RETURNS SETOF RECORD AS $cmd$
SELECT x, x+1 FROM generate_series(0,4) f(x)
$cmd$
LANGUAGE SQL;
SELECT * FROM table1 JOIN get_set_of_records() AS t2(x int, y int) ON (id = x) ORDER BY id ASC;
DEBUG: generating subplan XXX_1 for subquery SELECT x, y FROM functions_in_joins.get_set_of_records() t2(x integer, y integer)
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT table1.id, table1.data, t2.x, t2.y FROM (functions_in_joins.table1 JOIN (SELECT intermediate_result.x, intermediate_result.y FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(x integer, y integer)) t2 ON ((table1.id OPERATOR(pg_catalog.=) t2.x))) ORDER BY table1.id
id | data | x | y
---------------------------------------------------------------------
1 | 1 | 1 | 2
2 | 4 | 2 | 3
3 | 9 | 3 | 4
4 | 16 | 4 | 5
(4 rows)
-- a function returning table
CREATE FUNCTION dup(int) RETURNS TABLE(f1 int, f2 text)
AS $$ SELECT $1, CAST($1 AS text) || ' is text' $$
LANGUAGE SQL;
SELECT f.* FROM table1 t JOIN dup(32) f ON (f1 = id);
DEBUG: generating subplan XXX_1 for subquery SELECT f1, f2 FROM functions_in_joins.dup(32) f(f1, f2)
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT f.f1, f.f2 FROM (functions_in_joins.table1 t JOIN (SELECT intermediate_result.f1, intermediate_result.f2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(f1 integer, f2 text)) f ON ((f.f1 OPERATOR(pg_catalog.=) t.id)))
f1 | f2
---------------------------------------------------------------------
32 | 32 is text
(1 row)
-- a stable function
CREATE OR REPLACE FUNCTION the_minimum_id()
RETURNS INTEGER STABLE AS 'SELECT min(id) FROM table1' LANGUAGE SQL;
SELECT * FROM table1 JOIN the_minimum_id() min_id ON (id = min_id);
DEBUG: generating subplan XXX_1 for subquery SELECT min_id FROM functions_in_joins.the_minimum_id() min_id(min_id)
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT table1.id, table1.data, min_id.min_id FROM (functions_in_joins.table1 JOIN (SELECT intermediate_result.min_id FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(min_id integer)) min_id ON ((table1.id OPERATOR(pg_catalog.=) min_id.min_id)))
id | data | min_id
---------------------------------------------------------------------
1 | 1 | 1
(1 row)
-- a built-in immutable function
SELECT * FROM table1 JOIN abs(100) as hundred ON (id = hundred) ORDER BY id ASC;
id | data | hundred
---------------------------------------------------------------------
100 | 10000 | 100
(1 row)
-- function joins inside a CTE
WITH next_row_to_process AS (
SELECT * FROM table1 JOIN nextval('numbers') n ON (id = n)
)
SELECT *
FROM table1, next_row_to_process
WHERE table1.data <= next_row_to_process.data
ORDER BY 1,2 ASC;
DEBUG: generating subplan XXX_1 for CTE next_row_to_process: SELECT table1.id, table1.data, n.n FROM (functions_in_joins.table1 JOIN nextval('functions_in_joins.numbers'::regclass) n(n) ON ((table1.id OPERATOR(pg_catalog.=) n.n)))
DEBUG: generating subplan XXX_1 for subquery SELECT n FROM nextval('functions_in_joins.numbers'::regclass) n(n)
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT table1.id, table1.data, n.n FROM (functions_in_joins.table1 JOIN (SELECT intermediate_result.n FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(n bigint)) n ON ((table1.id OPERATOR(pg_catalog.=) n.n)))
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT table1.id, table1.data, next_row_to_process.id, next_row_to_process.data, next_row_to_process.n FROM functions_in_joins.table1, (SELECT intermediate_result.id, intermediate_result.data, intermediate_result.n FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(id integer, data integer, n bigint)) next_row_to_process WHERE (table1.data OPERATOR(pg_catalog.<=) next_row_to_process.data) ORDER BY table1.id, table1.data
id | data | id | data | n
---------------------------------------------------------------------
1 | 1 | 2 | 4 | 2
2 | 4 | 2 | 4 | 2
(2 rows)
-- Multiple functions in an RTE
SELECT * FROM ROWS FROM (next_k_integers(5), next_k_integers(10)) AS f(a, b),
table1 WHERE id = a ORDER BY id ASC;
DEBUG: generating subplan XXX_1 for subquery SELECT a, b FROM ROWS FROM(functions_in_joins.next_k_integers(5), functions_in_joins.next_k_integers(10)) f(a, b)
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT f.a, f.b, table1.id, table1.data FROM (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) f(a, b), functions_in_joins.table1 WHERE (table1.id OPERATOR(pg_catalog.=) f.a) ORDER BY table1.id
a | b | id | data
---------------------------------------------------------------------
5 | 10 | 5 | 25
6 | 11 | 6 | 36
7 | 12 | 7 | 49
(3 rows)
-- Custom Type returning function used in a join
RESET client_min_messages;
CREATE TYPE min_and_max AS (
minimum INT,
maximum INT
);
SET client_min_messages TO DEBUG1;
CREATE OR REPLACE FUNCTION max_and_min () RETURNS
min_and_max AS $$
DECLARE
result min_and_max%rowtype;
begin
select into result min(data) as minimum, max(data) as maximum from table1;
return result;
end;
$$ language plpgsql;
SELECT * FROM table1 JOIN max_and_min() m ON (m.maximum = data OR m.minimum = data) ORDER BY 1,2,3,4;
DEBUG: generating subplan XXX_1 for subquery SELECT minimum, maximum FROM functions_in_joins.max_and_min() m(minimum, maximum)
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT table1.id, table1.data, m.minimum, m.maximum FROM (functions_in_joins.table1 JOIN (SELECT intermediate_result.minimum, intermediate_result.maximum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(minimum integer, maximum integer)) m ON (((m.maximum OPERATOR(pg_catalog.=) table1.data) OR (m.minimum OPERATOR(pg_catalog.=) table1.data)))) ORDER BY table1.id, table1.data, m.minimum, m.maximum
id | data | minimum | maximum
---------------------------------------------------------------------
1 | 1 | 1 | 10000
100 | 10000 | 1 | 10000
(2 rows)
-- The following tests will fail as we do not support all joins on
-- all kinds of functions
-- In other words, we cannot recursively plan the functions and hence
-- the query fails on the workers
SET client_min_messages TO ERROR;
\set VERBOSITY terse
-- function joins in CTE results can create lateral joins that are not supported
-- we execute the query within a function to consolidate the error messages
-- between different executors
CREATE FUNCTION raise_failed_execution_func_join(query text) RETURNS void AS $$
BEGIN
EXECUTE query;
EXCEPTION WHEN OTHERS THEN
IF SQLERRM LIKE 'failed to execute task%' THEN
RAISE 'Task failed to execute';
ELSIF SQLERRM LIKE '%does not exist%' THEN
RAISE 'Task failed to execute';
END IF;
END;
$$LANGUAGE plpgsql;
SELECT raise_failed_execution_func_join($$
WITH one_row AS (
SELECT * FROM table1 WHERE id=52
)
SELECT table1.id, table1.data
FROM one_row, table1, next_k_integers(one_row.id, 5) next_five_ids
WHERE table1.id = next_five_ids;
$$);
ERROR: Task failed to execute
-- a user-defined immutable function
CREATE OR REPLACE FUNCTION the_answer_to_life()
RETURNS INTEGER IMMUTABLE AS 'SELECT 42' LANGUAGE SQL;
SELECT raise_failed_execution_func_join($$
SELECT * FROM table1 JOIN the_answer_to_life() the_answer ON (id = the_answer);
$$);
ERROR: Task failed to execute
SELECT raise_failed_execution_func_join($$
SELECT *
FROM table1
JOIN next_k_integers(10,5) WITH ORDINALITY next_integers
ON (id = next_integers.result);
$$);
ERROR: Task failed to execute
-- WITH ORDINALITY clause
SELECT raise_failed_execution_func_join($$
SELECT *
FROM table1
JOIN next_k_integers(10,5) WITH ORDINALITY next_integers
ON (id = next_integers.result)
ORDER BY id ASC;
$$);
ERROR: Task failed to execute
RESET client_min_messages;
DROP SCHEMA functions_in_joins CASCADE;
NOTICE: drop cascades to 12 other objects
SET search_path TO DEFAULT;

View File

@ -13,6 +13,8 @@
CREATE SCHEMA functions_in_joins; CREATE SCHEMA functions_in_joins;
SET search_path TO 'functions_in_joins'; SET search_path TO 'functions_in_joins';
SET citus.next_shard_id TO 2500000; SET citus.next_shard_id TO 2500000;
SET citus.replication_model to 'streaming';
SET citus.shard_replication_factor to 1;
CREATE TABLE table1 (id int, data int); CREATE TABLE table1 (id int, data int);
SELECT create_distributed_table('table1','id'); SELECT create_distributed_table('table1','id');
@ -32,6 +34,7 @@ SELECT * FROM table1 JOIN nextval('numbers') n ON (id = n) ORDER BY id ASC;
CREATE FUNCTION add(integer, integer) RETURNS integer CREATE FUNCTION add(integer, integer) RETURNS integer
AS 'SELECT $1 + $2;' AS 'SELECT $1 + $2;'
LANGUAGE SQL; LANGUAGE SQL;
SELECT create_distributed_function('add(integer,integer)');
SELECT * FROM table1 JOIN add(3,5) sum ON (id = sum) ORDER BY id ASC; SELECT * FROM table1 JOIN add(3,5) sum ON (id = sum) ORDER BY id ASC;
-- Check join of plpgsql functions -- Check join of plpgsql functions