Enable function call pushdown from workers

pull/5660/head
Marco Slot 2022-01-27 18:39:52 +01:00
parent f712dfc558
commit 63c6896716
4 changed files with 196 additions and 26 deletions

View File

@ -303,17 +303,6 @@ TryToDelegateFunctionCall(DistributedPlanningContext *planContext)
return NULL;
}
if (localGroupId != COORDINATOR_GROUP_ID)
{
/*
* We are calling a distributed function on a worker node. We currently
* only delegate from the coordinator.
*
* TODO: remove this restriction.
*/
return NULL;
}
/*
* Cannot delegate functions for INSERT ... SELECT func(), since they require
* coordinated transactions.

View File

@ -74,6 +74,11 @@ LANGUAGE plpgsql AS $$
BEGIN
y := x + y * 2;
END;$$;
CREATE FUNCTION mx_call_func_bigint_force(x bigint, INOUT y bigint)
LANGUAGE plpgsql AS $$
BEGIN
PERFORM multi_mx_function_call_delegation.mx_call_func_bigint(x, y);
END;$$;
-- create another function which verifies:
-- 1. we work fine with multiple return columns
-- 2. we work fine in combination with custom types
@ -193,12 +198,6 @@ select colocate_proc_with_table('mx_call_func', 'mx_call_dist_table_1'::regclass
(1 row)
select colocate_proc_with_table('mx_call_func_bigint', 'mx_call_dist_table_bigint'::regclass, 1);
colocate_proc_with_table
---------------------------------------------------------------------
(1 row)
select colocate_proc_with_table('mx_call_func_custom_types', 'mx_call_dist_table_enum'::regclass, 1);
colocate_proc_with_table
---------------------------------------------------------------------
@ -211,6 +210,26 @@ select colocate_proc_with_table('squares', 'mx_call_dist_table_2'::regclass, 0);
(1 row)
select create_distributed_function('mx_call_func_bigint(bigint,bigint)', 'x',
colocate_with := 'mx_call_dist_table_bigint');
DEBUG: switching to sequential query execution mode
DETAIL: A distributed function is created. To make sure subsequent commands see the type correctly we need to make sure to use only one connection for all future commands
create_distributed_function
---------------------------------------------------------------------
(1 row)
-- set up a force_delegation function
select create_distributed_function('mx_call_func_bigint_force(bigint,bigint)', 'x',
colocate_with := 'mx_call_dist_table_2',
force_delegation := true);
DEBUG: switching to sequential query execution mode
DETAIL: A distributed function is created. To make sure subsequent commands see the type correctly we need to make sure to use only one connection for all future commands
create_distributed_function
---------------------------------------------------------------------
(1 row)
select mx_call_func(2, 0);
DEBUG: pushing down the function call
mx_call_func
@ -748,6 +767,16 @@ HINT: Connect to the coordinator and run it again.
-- show that functions can be delegated from worker nodes
SET client_min_messages TO DEBUG1;
SELECT mx_call_func(2, 0);
DEBUG: pushing down the function call
mx_call_func
---------------------------------------------------------------------
28
(1 row)
-- not delegated in a transaction block
BEGIN;
SELECT mx_call_func(2, 0);
DEBUG: not pushing down function calls in a multi-statement transaction
DEBUG: generating subplan XXX_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id)))
CONTEXT: PL/pgSQL assignment "y := y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)"
PL/pgSQL function mx_call_func(integer,integer) line XX at assignment
@ -759,9 +788,52 @@ PL/pgSQL function mx_call_func(integer,integer) line XX at assignment
28
(1 row)
END;
-- not delegated in a DO block
DO $$
BEGIN
PERFORM mx_call_func(2, 0);
END;
$$ LANGUAGE plpgsql;
DEBUG: not pushing down function calls in a multi-statement transaction
CONTEXT: SQL statement "SELECT mx_call_func(2, 0)"
PL/pgSQL function inline_code_block line XX at PERFORM
DEBUG: generating subplan XXX_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id)))
CONTEXT: PL/pgSQL assignment "y := y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)"
PL/pgSQL function mx_call_func(integer,integer) line XX at assignment
SQL statement "SELECT mx_call_func(2, 0)"
PL/pgSQL function inline_code_block line XX at PERFORM
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT ((2 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))))::integer
CONTEXT: PL/pgSQL assignment "y := y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)"
PL/pgSQL function mx_call_func(integer,integer) line XX at assignment
SQL statement "SELECT mx_call_func(2, 0)"
PL/pgSQL function inline_code_block line XX at PERFORM
-- forced calls are delegated in a transaction block
BEGIN;
SELECT mx_call_func_bigint_force(4, 2);
DEBUG: pushing down function call in a multi-statement transaction
DEBUG: pushing down the function call
mx_call_func_bigint_force
---------------------------------------------------------------------
2
(1 row)
END;
-- forced calls are delegated in a DO block
DO $$
BEGIN
PERFORM * FROM mx_call_func_bigint_force(4, 2);
END;
$$ LANGUAGE plpgsql;
DEBUG: pushing down function call in a multi-statement transaction
CONTEXT: SQL statement "SELECT * FROM mx_call_func_bigint_force(4, 2)"
PL/pgSQL function inline_code_block line XX at PERFORM
DEBUG: pushing down the function call
CONTEXT: SQL statement "SELECT * FROM mx_call_func_bigint_force(4, 2)"
PL/pgSQL function inline_code_block line XX at PERFORM
\c - - - :master_port
SET search_path TO multi_mx_function_call_delegation, public;
RESET client_min_messages;
\set VERBOSITY terse
DROP SCHEMA multi_mx_function_call_delegation CASCADE;
NOTICE: drop cascades to 15 other objects
NOTICE: drop cascades to 16 other objects

View File

@ -74,6 +74,11 @@ LANGUAGE plpgsql AS $$
BEGIN
y := x + y * 2;
END;$$;
CREATE FUNCTION mx_call_func_bigint_force(x bigint, INOUT y bigint)
LANGUAGE plpgsql AS $$
BEGIN
PERFORM multi_mx_function_call_delegation.mx_call_func_bigint(x, y);
END;$$;
-- create another function which verifies:
-- 1. we work fine with multiple return columns
-- 2. we work fine in combination with custom types
@ -193,12 +198,6 @@ select colocate_proc_with_table('mx_call_func', 'mx_call_dist_table_1'::regclass
(1 row)
select colocate_proc_with_table('mx_call_func_bigint', 'mx_call_dist_table_bigint'::regclass, 1);
colocate_proc_with_table
---------------------------------------------------------------------
(1 row)
select colocate_proc_with_table('mx_call_func_custom_types', 'mx_call_dist_table_enum'::regclass, 1);
colocate_proc_with_table
---------------------------------------------------------------------
@ -211,6 +210,26 @@ select colocate_proc_with_table('squares', 'mx_call_dist_table_2'::regclass, 0);
(1 row)
select create_distributed_function('mx_call_func_bigint(bigint,bigint)', 'x',
colocate_with := 'mx_call_dist_table_bigint');
DEBUG: switching to sequential query execution mode
DETAIL: A distributed function is created. To make sure subsequent commands see the type correctly we need to make sure to use only one connection for all future commands
create_distributed_function
---------------------------------------------------------------------
(1 row)
-- set up a force_delegation function
select create_distributed_function('mx_call_func_bigint_force(bigint,bigint)', 'x',
colocate_with := 'mx_call_dist_table_2',
force_delegation := true);
DEBUG: switching to sequential query execution mode
DETAIL: A distributed function is created. To make sure subsequent commands see the type correctly we need to make sure to use only one connection for all future commands
create_distributed_function
---------------------------------------------------------------------
(1 row)
select mx_call_func(2, 0);
DEBUG: pushing down the function call
mx_call_func
@ -748,6 +767,16 @@ HINT: Connect to the coordinator and run it again.
-- show that functions can be delegated from worker nodes
SET client_min_messages TO DEBUG1;
SELECT mx_call_func(2, 0);
DEBUG: pushing down the function call
mx_call_func
---------------------------------------------------------------------
28
(1 row)
-- not delegated in a transaction block
BEGIN;
SELECT mx_call_func(2, 0);
DEBUG: not pushing down function calls in a multi-statement transaction
DEBUG: generating subplan XXX_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id)))
CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)"
PL/pgSQL function mx_call_func(integer,integer) line XX at assignment
@ -759,9 +788,52 @@ PL/pgSQL function mx_call_func(integer,integer) line XX at assignment
28
(1 row)
END;
-- not delegated in a DO block
DO $$
BEGIN
PERFORM mx_call_func(2, 0);
END;
$$ LANGUAGE plpgsql;
DEBUG: not pushing down function calls in a multi-statement transaction
CONTEXT: SQL statement "SELECT mx_call_func(2, 0)"
PL/pgSQL function inline_code_block line XX at PERFORM
DEBUG: generating subplan XXX_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id)))
CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)"
PL/pgSQL function mx_call_func(integer,integer) line XX at assignment
SQL statement "SELECT mx_call_func(2, 0)"
PL/pgSQL function inline_code_block line XX at PERFORM
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT (2 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint)))
CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)"
PL/pgSQL function mx_call_func(integer,integer) line XX at assignment
SQL statement "SELECT mx_call_func(2, 0)"
PL/pgSQL function inline_code_block line XX at PERFORM
-- forced calls are delegated in a transaction block
BEGIN;
SELECT mx_call_func_bigint_force(4, 2);
DEBUG: pushing down function call in a multi-statement transaction
DEBUG: pushing down the function call
mx_call_func_bigint_force
---------------------------------------------------------------------
2
(1 row)
END;
-- forced calls are delegated in a DO block
DO $$
BEGIN
PERFORM * FROM mx_call_func_bigint_force(4, 2);
END;
$$ LANGUAGE plpgsql;
DEBUG: pushing down function call in a multi-statement transaction
CONTEXT: SQL statement "SELECT * FROM mx_call_func_bigint_force(4, 2)"
PL/pgSQL function inline_code_block line XX at PERFORM
DEBUG: pushing down the function call
CONTEXT: SQL statement "SELECT * FROM mx_call_func_bigint_force(4, 2)"
PL/pgSQL function inline_code_block line XX at PERFORM
\c - - - :master_port
SET search_path TO multi_mx_function_call_delegation, public;
RESET client_min_messages;
\set VERBOSITY terse
DROP SCHEMA multi_mx_function_call_delegation CASCADE;
NOTICE: drop cascades to 15 other objects
NOTICE: drop cascades to 16 other objects

View File

@ -57,6 +57,12 @@ BEGIN
y := x + y * 2;
END;$$;
CREATE FUNCTION mx_call_func_bigint_force(x bigint, INOUT y bigint)
LANGUAGE plpgsql AS $$
BEGIN
PERFORM multi_mx_function_call_delegation.mx_call_func_bigint(x, y);
END;$$;
-- create another function which verifies:
-- 1. we work fine with multiple return columns
-- 2. we work fine in combination with custom types
@ -104,10 +110,17 @@ select mx_call_func_custom_types('S', 'A');
-- Mark them as colocated with a table. Now we should route them to workers.
select colocate_proc_with_table('mx_call_func', 'mx_call_dist_table_1'::regclass, 1);
select colocate_proc_with_table('mx_call_func_bigint', 'mx_call_dist_table_bigint'::regclass, 1);
select colocate_proc_with_table('mx_call_func_custom_types', 'mx_call_dist_table_enum'::regclass, 1);
select colocate_proc_with_table('squares', 'mx_call_dist_table_2'::regclass, 0);
select create_distributed_function('mx_call_func_bigint(bigint,bigint)', 'x',
colocate_with := 'mx_call_dist_table_bigint');
-- set up a force_delegation function
select create_distributed_function('mx_call_func_bigint_force(bigint,bigint)', 'x',
colocate_with := 'mx_call_dist_table_2',
force_delegation := true);
select mx_call_func(2, 0);
select mx_call_func_bigint(4, 2);
select mx_call_func_custom_types('S', 'A');
@ -294,6 +307,30 @@ select create_distributed_function('mx_call_func(int,int)');
SET client_min_messages TO DEBUG1;
SELECT mx_call_func(2, 0);
-- not delegated in a transaction block
BEGIN;
SELECT mx_call_func(2, 0);
END;
-- not delegated in a DO block
DO $$
BEGIN
PERFORM mx_call_func(2, 0);
END;
$$ LANGUAGE plpgsql;
-- forced calls are delegated in a transaction block
BEGIN;
SELECT mx_call_func_bigint_force(4, 2);
END;
-- forced calls are delegated in a DO block
DO $$
BEGIN
PERFORM * FROM mx_call_func_bigint_force(4, 2);
END;
$$ LANGUAGE plpgsql;
\c - - - :master_port
SET search_path TO multi_mx_function_call_delegation, public;