diff --git a/src/backend/distributed/planner/function_call_delegation.c b/src/backend/distributed/planner/function_call_delegation.c index 2ae9fdde3..3ab0c71c8 100644 --- a/src/backend/distributed/planner/function_call_delegation.c +++ b/src/backend/distributed/planner/function_call_delegation.c @@ -303,17 +303,6 @@ TryToDelegateFunctionCall(DistributedPlanningContext *planContext) return NULL; } - if (localGroupId != COORDINATOR_GROUP_ID) - { - /* - * We are calling a distributed function on a worker node. We currently - * only delegate from the coordinator. - * - * TODO: remove this restriction. - */ - return NULL; - } - /* * Cannot delegate functions for INSERT ... SELECT func(), since they require * coordinated transactions. diff --git a/src/test/regress/expected/multi_mx_function_call_delegation.out b/src/test/regress/expected/multi_mx_function_call_delegation.out index e77e0c3b5..d48f001bf 100644 --- a/src/test/regress/expected/multi_mx_function_call_delegation.out +++ b/src/test/regress/expected/multi_mx_function_call_delegation.out @@ -74,6 +74,11 @@ LANGUAGE plpgsql AS $$ BEGIN y := x + y * 2; END;$$; +CREATE FUNCTION mx_call_func_bigint_force(x bigint, INOUT y bigint) +LANGUAGE plpgsql AS $$ +BEGIN + PERFORM multi_mx_function_call_delegation.mx_call_func_bigint(x, y); +END;$$; -- create another function which verifies: -- 1. we work fine with multiple return columns -- 2. we work fine in combination with custom types @@ -193,12 +198,6 @@ select colocate_proc_with_table('mx_call_func', 'mx_call_dist_table_1'::regclass (1 row) -select colocate_proc_with_table('mx_call_func_bigint', 'mx_call_dist_table_bigint'::regclass, 1); - colocate_proc_with_table ---------------------------------------------------------------------- - -(1 row) - select colocate_proc_with_table('mx_call_func_custom_types', 'mx_call_dist_table_enum'::regclass, 1); colocate_proc_with_table --------------------------------------------------------------------- @@ -211,6 +210,26 @@ select colocate_proc_with_table('squares', 'mx_call_dist_table_2'::regclass, 0); (1 row) +select create_distributed_function('mx_call_func_bigint(bigint,bigint)', 'x', + colocate_with := 'mx_call_dist_table_bigint'); +DEBUG: switching to sequential query execution mode +DETAIL: A distributed function is created. To make sure subsequent commands see the type correctly we need to make sure to use only one connection for all future commands + create_distributed_function +--------------------------------------------------------------------- + +(1 row) + +-- set up a force_delegation function +select create_distributed_function('mx_call_func_bigint_force(bigint,bigint)', 'x', + colocate_with := 'mx_call_dist_table_2', + force_delegation := true); +DEBUG: switching to sequential query execution mode +DETAIL: A distributed function is created. To make sure subsequent commands see the type correctly we need to make sure to use only one connection for all future commands + create_distributed_function +--------------------------------------------------------------------- + +(1 row) + select mx_call_func(2, 0); DEBUG: pushing down the function call mx_call_func @@ -748,6 +767,16 @@ HINT: Connect to the coordinator and run it again. -- show that functions can be delegated from worker nodes SET client_min_messages TO DEBUG1; SELECT mx_call_func(2, 0); +DEBUG: pushing down the function call + mx_call_func +--------------------------------------------------------------------- + 28 +(1 row) + +-- not delegated in a transaction block +BEGIN; +SELECT mx_call_func(2, 0); +DEBUG: not pushing down function calls in a multi-statement transaction DEBUG: generating subplan XXX_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id))) CONTEXT: PL/pgSQL assignment "y := y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" PL/pgSQL function mx_call_func(integer,integer) line XX at assignment @@ -759,9 +788,52 @@ PL/pgSQL function mx_call_func(integer,integer) line XX at assignment 28 (1 row) +END; +-- not delegated in a DO block +DO $$ +BEGIN + PERFORM mx_call_func(2, 0); +END; +$$ LANGUAGE plpgsql; +DEBUG: not pushing down function calls in a multi-statement transaction +CONTEXT: SQL statement "SELECT mx_call_func(2, 0)" +PL/pgSQL function inline_code_block line XX at PERFORM +DEBUG: generating subplan XXX_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id))) +CONTEXT: PL/pgSQL assignment "y := y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" +PL/pgSQL function mx_call_func(integer,integer) line XX at assignment +SQL statement "SELECT mx_call_func(2, 0)" +PL/pgSQL function inline_code_block line XX at PERFORM +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT ((2 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))))::integer +CONTEXT: PL/pgSQL assignment "y := y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" +PL/pgSQL function mx_call_func(integer,integer) line XX at assignment +SQL statement "SELECT mx_call_func(2, 0)" +PL/pgSQL function inline_code_block line XX at PERFORM +-- forced calls are delegated in a transaction block +BEGIN; +SELECT mx_call_func_bigint_force(4, 2); +DEBUG: pushing down function call in a multi-statement transaction +DEBUG: pushing down the function call + mx_call_func_bigint_force +--------------------------------------------------------------------- + 2 +(1 row) + +END; +-- forced calls are delegated in a DO block +DO $$ +BEGIN + PERFORM * FROM mx_call_func_bigint_force(4, 2); +END; +$$ LANGUAGE plpgsql; +DEBUG: pushing down function call in a multi-statement transaction +CONTEXT: SQL statement "SELECT * FROM mx_call_func_bigint_force(4, 2)" +PL/pgSQL function inline_code_block line XX at PERFORM +DEBUG: pushing down the function call +CONTEXT: SQL statement "SELECT * FROM mx_call_func_bigint_force(4, 2)" +PL/pgSQL function inline_code_block line XX at PERFORM \c - - - :master_port SET search_path TO multi_mx_function_call_delegation, public; RESET client_min_messages; \set VERBOSITY terse DROP SCHEMA multi_mx_function_call_delegation CASCADE; -NOTICE: drop cascades to 15 other objects +NOTICE: drop cascades to 16 other objects diff --git a/src/test/regress/expected/multi_mx_function_call_delegation_0.out b/src/test/regress/expected/multi_mx_function_call_delegation_0.out index 657183bc2..06a7b320d 100644 --- a/src/test/regress/expected/multi_mx_function_call_delegation_0.out +++ b/src/test/regress/expected/multi_mx_function_call_delegation_0.out @@ -74,6 +74,11 @@ LANGUAGE plpgsql AS $$ BEGIN y := x + y * 2; END;$$; +CREATE FUNCTION mx_call_func_bigint_force(x bigint, INOUT y bigint) +LANGUAGE plpgsql AS $$ +BEGIN + PERFORM multi_mx_function_call_delegation.mx_call_func_bigint(x, y); +END;$$; -- create another function which verifies: -- 1. we work fine with multiple return columns -- 2. we work fine in combination with custom types @@ -193,12 +198,6 @@ select colocate_proc_with_table('mx_call_func', 'mx_call_dist_table_1'::regclass (1 row) -select colocate_proc_with_table('mx_call_func_bigint', 'mx_call_dist_table_bigint'::regclass, 1); - colocate_proc_with_table ---------------------------------------------------------------------- - -(1 row) - select colocate_proc_with_table('mx_call_func_custom_types', 'mx_call_dist_table_enum'::regclass, 1); colocate_proc_with_table --------------------------------------------------------------------- @@ -211,6 +210,26 @@ select colocate_proc_with_table('squares', 'mx_call_dist_table_2'::regclass, 0); (1 row) +select create_distributed_function('mx_call_func_bigint(bigint,bigint)', 'x', + colocate_with := 'mx_call_dist_table_bigint'); +DEBUG: switching to sequential query execution mode +DETAIL: A distributed function is created. To make sure subsequent commands see the type correctly we need to make sure to use only one connection for all future commands + create_distributed_function +--------------------------------------------------------------------- + +(1 row) + +-- set up a force_delegation function +select create_distributed_function('mx_call_func_bigint_force(bigint,bigint)', 'x', + colocate_with := 'mx_call_dist_table_2', + force_delegation := true); +DEBUG: switching to sequential query execution mode +DETAIL: A distributed function is created. To make sure subsequent commands see the type correctly we need to make sure to use only one connection for all future commands + create_distributed_function +--------------------------------------------------------------------- + +(1 row) + select mx_call_func(2, 0); DEBUG: pushing down the function call mx_call_func @@ -748,6 +767,16 @@ HINT: Connect to the coordinator and run it again. -- show that functions can be delegated from worker nodes SET client_min_messages TO DEBUG1; SELECT mx_call_func(2, 0); +DEBUG: pushing down the function call + mx_call_func +--------------------------------------------------------------------- + 28 +(1 row) + +-- not delegated in a transaction block +BEGIN; +SELECT mx_call_func(2, 0); +DEBUG: not pushing down function calls in a multi-statement transaction DEBUG: generating subplan XXX_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id))) CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" PL/pgSQL function mx_call_func(integer,integer) line XX at assignment @@ -759,9 +788,52 @@ PL/pgSQL function mx_call_func(integer,integer) line XX at assignment 28 (1 row) +END; +-- not delegated in a DO block +DO $$ +BEGIN + PERFORM mx_call_func(2, 0); +END; +$$ LANGUAGE plpgsql; +DEBUG: not pushing down function calls in a multi-statement transaction +CONTEXT: SQL statement "SELECT mx_call_func(2, 0)" +PL/pgSQL function inline_code_block line XX at PERFORM +DEBUG: generating subplan XXX_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id))) +CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" +PL/pgSQL function mx_call_func(integer,integer) line XX at assignment +SQL statement "SELECT mx_call_func(2, 0)" +PL/pgSQL function inline_code_block line XX at PERFORM +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT (2 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))) +CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" +PL/pgSQL function mx_call_func(integer,integer) line XX at assignment +SQL statement "SELECT mx_call_func(2, 0)" +PL/pgSQL function inline_code_block line XX at PERFORM +-- forced calls are delegated in a transaction block +BEGIN; +SELECT mx_call_func_bigint_force(4, 2); +DEBUG: pushing down function call in a multi-statement transaction +DEBUG: pushing down the function call + mx_call_func_bigint_force +--------------------------------------------------------------------- + 2 +(1 row) + +END; +-- forced calls are delegated in a DO block +DO $$ +BEGIN + PERFORM * FROM mx_call_func_bigint_force(4, 2); +END; +$$ LANGUAGE plpgsql; +DEBUG: pushing down function call in a multi-statement transaction +CONTEXT: SQL statement "SELECT * FROM mx_call_func_bigint_force(4, 2)" +PL/pgSQL function inline_code_block line XX at PERFORM +DEBUG: pushing down the function call +CONTEXT: SQL statement "SELECT * FROM mx_call_func_bigint_force(4, 2)" +PL/pgSQL function inline_code_block line XX at PERFORM \c - - - :master_port SET search_path TO multi_mx_function_call_delegation, public; RESET client_min_messages; \set VERBOSITY terse DROP SCHEMA multi_mx_function_call_delegation CASCADE; -NOTICE: drop cascades to 15 other objects +NOTICE: drop cascades to 16 other objects diff --git a/src/test/regress/sql/multi_mx_function_call_delegation.sql b/src/test/regress/sql/multi_mx_function_call_delegation.sql index 4dfe91322..206969456 100644 --- a/src/test/regress/sql/multi_mx_function_call_delegation.sql +++ b/src/test/regress/sql/multi_mx_function_call_delegation.sql @@ -57,6 +57,12 @@ BEGIN y := x + y * 2; END;$$; +CREATE FUNCTION mx_call_func_bigint_force(x bigint, INOUT y bigint) +LANGUAGE plpgsql AS $$ +BEGIN + PERFORM multi_mx_function_call_delegation.mx_call_func_bigint(x, y); +END;$$; + -- create another function which verifies: -- 1. we work fine with multiple return columns -- 2. we work fine in combination with custom types @@ -104,10 +110,17 @@ select mx_call_func_custom_types('S', 'A'); -- Mark them as colocated with a table. Now we should route them to workers. select colocate_proc_with_table('mx_call_func', 'mx_call_dist_table_1'::regclass, 1); -select colocate_proc_with_table('mx_call_func_bigint', 'mx_call_dist_table_bigint'::regclass, 1); select colocate_proc_with_table('mx_call_func_custom_types', 'mx_call_dist_table_enum'::regclass, 1); select colocate_proc_with_table('squares', 'mx_call_dist_table_2'::regclass, 0); +select create_distributed_function('mx_call_func_bigint(bigint,bigint)', 'x', + colocate_with := 'mx_call_dist_table_bigint'); + +-- set up a force_delegation function +select create_distributed_function('mx_call_func_bigint_force(bigint,bigint)', 'x', + colocate_with := 'mx_call_dist_table_2', + force_delegation := true); + select mx_call_func(2, 0); select mx_call_func_bigint(4, 2); select mx_call_func_custom_types('S', 'A'); @@ -294,6 +307,30 @@ select create_distributed_function('mx_call_func(int,int)'); SET client_min_messages TO DEBUG1; SELECT mx_call_func(2, 0); +-- not delegated in a transaction block +BEGIN; +SELECT mx_call_func(2, 0); +END; + +-- not delegated in a DO block +DO $$ +BEGIN + PERFORM mx_call_func(2, 0); +END; +$$ LANGUAGE plpgsql; + +-- forced calls are delegated in a transaction block +BEGIN; +SELECT mx_call_func_bigint_force(4, 2); +END; + +-- forced calls are delegated in a DO block +DO $$ +BEGIN + PERFORM * FROM mx_call_func_bigint_force(4, 2); +END; +$$ LANGUAGE plpgsql; + \c - - - :master_port SET search_path TO multi_mx_function_call_delegation, public;