-- Test passing off function call to mx workers CREATE SCHEMA multi_mx_function_call_delegation; SET search_path TO multi_mx_function_call_delegation, public; SET citus.shard_replication_factor TO 2; -- This table requires specific settings, create before getting into things create table mx_call_dist_table_replica(id int, val int); select create_distributed_table('mx_call_dist_table_replica', 'id'); create_distributed_table --------------------------------------------------------------------- (1 row) insert into mx_call_dist_table_replica values (9,1),(8,2),(7,3),(6,4),(5,5); SET citus.shard_replication_factor TO 1; -- -- Create tables and functions we want to use in tests -- create table mx_call_dist_table_1(id int, val int); select create_distributed_table('mx_call_dist_table_1', 'id'); create_distributed_table --------------------------------------------------------------------- (1 row) insert into mx_call_dist_table_1 values (3,1),(4,5),(9,2),(6,5),(3,5); create table mx_call_dist_table_2(id int, val int); select create_distributed_table('mx_call_dist_table_2', 'id'); create_distributed_table --------------------------------------------------------------------- (1 row) insert into mx_call_dist_table_2 values (1,1),(1,2),(2,2),(3,3),(3,4); create table mx_call_dist_table_bigint(id bigint, val bigint); select create_distributed_table('mx_call_dist_table_bigint', 'id'); create_distributed_table --------------------------------------------------------------------- (1 row) insert into mx_call_dist_table_bigint values (1,1),(1,2),(2,2),(3,3),(3,4); create table mx_call_dist_table_ref(id int, val int); select create_reference_table('mx_call_dist_table_ref'); create_reference_table --------------------------------------------------------------------- (1 row) insert into mx_call_dist_table_ref values (2,7),(1,8),(2,8),(1,8),(2,8); create type mx_call_enum as enum ('A', 'S', 'D', 'F'); create table mx_call_dist_table_enum(id int, key mx_call_enum); select create_distributed_table('mx_call_dist_table_enum', 'key'); create_distributed_table --------------------------------------------------------------------- (1 row) insert into mx_call_dist_table_enum values (1,'S'),(2,'A'),(3,'D'),(4,'F'); CREATE FUNCTION squares(int) RETURNS SETOF RECORD AS $$ SELECT i, i * i FROM generate_series(1, $1) i $$ LANGUAGE SQL; CREATE FUNCTION mx_call_func(x int, INOUT y int) LANGUAGE plpgsql AS $$ BEGIN -- groupid is 0 in coordinator and non-zero in workers, so by using it here -- we make sure the function is being executed in the worker. y := x + (select case groupid when 0 then 1 else 0 end from pg_dist_local_group); -- we also make sure that we can run distributed queries in the functions -- that are routed to the workers. y := y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id); END;$$; CREATE FUNCTION mx_call_func_bigint(x bigint, INOUT y bigint) LANGUAGE plpgsql AS $$ BEGIN y := x + y * 2; END;$$; -- create another function which verifies: -- 1. we work fine with multiple return columns -- 2. we work fine in combination with custom types CREATE FUNCTION mx_call_func_custom_types(INOUT x mx_call_enum, INOUT y mx_call_enum) LANGUAGE plpgsql AS $$ BEGIN y := x; x := (select case groupid when 0 then 'F' else 'S' end from pg_dist_local_group); END;$$; -- Test that undistributed functions have no issue executing select multi_mx_function_call_delegation.mx_call_func(2, 0); mx_call_func --------------------------------------------------------------------- 29 (1 row) select multi_mx_function_call_delegation.mx_call_func_custom_types('S', 'A'); mx_call_func_custom_types --------------------------------------------------------------------- (F,S) (1 row) select squares(4); squares --------------------------------------------------------------------- (1,1) (2,4) (3,9) (4,16) (4 rows) -- Same for unqualified name select mx_call_func(2, 0); mx_call_func --------------------------------------------------------------------- 29 (1 row) -- Mark both functions as distributed ... select create_distributed_function('mx_call_func(int,int)'); create_distributed_function --------------------------------------------------------------------- (1 row) select create_distributed_function('mx_call_func_bigint(bigint,bigint)'); create_distributed_function --------------------------------------------------------------------- (1 row) select create_distributed_function('mx_call_func_custom_types(mx_call_enum,mx_call_enum)'); create_distributed_function --------------------------------------------------------------------- (1 row) select create_distributed_function('squares(int)'); create_distributed_function --------------------------------------------------------------------- (1 row) -- We still don't route them to the workers, because they aren't -- colocated with any distributed tables. SET client_min_messages TO DEBUG1; select mx_call_func(2, 0); DEBUG: function does not have co-located tables DEBUG: generating subplan XXX_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id))) CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT (3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))) CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment mx_call_func --------------------------------------------------------------------- 29 (1 row) select multi_mx_function_call_delegation.mx_call_func_bigint(4, 2); DEBUG: function does not have co-located tables mx_call_func_bigint --------------------------------------------------------------------- 8 (1 row) select mx_call_func_custom_types('S', 'A'); DEBUG: function does not have co-located tables mx_call_func_custom_types --------------------------------------------------------------------- (F,S) (1 row) -- Mark them as colocated with a table. Now we should route them to workers. select colocate_proc_with_table('mx_call_func', 'mx_call_dist_table_1'::regclass, 1); colocate_proc_with_table --------------------------------------------------------------------- (1 row) select colocate_proc_with_table('mx_call_func_bigint', 'mx_call_dist_table_bigint'::regclass, 1); colocate_proc_with_table --------------------------------------------------------------------- (1 row) select colocate_proc_with_table('mx_call_func_custom_types', 'mx_call_dist_table_enum'::regclass, 1); colocate_proc_with_table --------------------------------------------------------------------- (1 row) select colocate_proc_with_table('squares', 'mx_call_dist_table_2'::regclass, 0); colocate_proc_with_table --------------------------------------------------------------------- (1 row) select mx_call_func(2, 0); DEBUG: pushing down the function call mx_call_func --------------------------------------------------------------------- 28 (1 row) select mx_call_func_bigint(4, 2); DEBUG: pushing down the function call mx_call_func_bigint --------------------------------------------------------------------- 8 (1 row) select mx_call_func_custom_types('S', 'A'); DEBUG: pushing down the function call mx_call_func_custom_types --------------------------------------------------------------------- (S,S) (1 row) select squares(4); DEBUG: pushing down the function call ERROR: input of anonymous composite types is not implemented select multi_mx_function_call_delegation.mx_call_func(2, 0); DEBUG: pushing down the function call mx_call_func --------------------------------------------------------------------- 28 (1 row) select multi_mx_function_call_delegation.mx_call_func_custom_types('S', 'A'); DEBUG: pushing down the function call mx_call_func_custom_types --------------------------------------------------------------------- (S,S) (1 row) -- This is currently an undetected failure when using the binary protocol -- It should not be enabled by default until this is resolved. The tests above -- will fail too, when changing the default to TRUE; SET citus.enable_binary_protocol = TRUE; select mx_call_func_custom_types('S', 'A'); DEBUG: pushing down the function call ERROR: wrong data type: XXXX, expected XXXX select multi_mx_function_call_delegation.mx_call_func_custom_types('S', 'A'); DEBUG: pushing down the function call ERROR: wrong data type: XXXX, expected XXXX RESET citus.enable_binary_protocol; -- We don't allow distributing calls inside transactions begin; select mx_call_func(2, 0); DEBUG: not pushing down function calls in a multi-statement transaction DEBUG: generating subplan XXX_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id))) CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT (3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))) CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment mx_call_func --------------------------------------------------------------------- 29 (1 row) commit; -- Drop the table colocated with mx_call_func_custom_types. Now it shouldn't -- be routed to workers anymore. SET client_min_messages TO NOTICE; drop table mx_call_dist_table_enum; SET client_min_messages TO DEBUG1; select mx_call_func_custom_types('S', 'A'); DEBUG: function does not have co-located tables mx_call_func_custom_types --------------------------------------------------------------------- (F,S) (1 row) -- Make sure we do bounds checking on distributed argument index -- This also tests that we have cache invalidation for pg_dist_object updates select colocate_proc_with_table('mx_call_func', 'mx_call_dist_table_1'::regclass, -1); colocate_proc_with_table --------------------------------------------------------------------- (1 row) select mx_call_func(2, 0); DEBUG: cannot push down invalid distribution_argument_index DEBUG: generating subplan XXX_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id))) CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT (3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))) CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment mx_call_func --------------------------------------------------------------------- 29 (1 row) select colocate_proc_with_table('mx_call_func', 'mx_call_dist_table_1'::regclass, 2); colocate_proc_with_table --------------------------------------------------------------------- (1 row) select mx_call_func(2, 0); DEBUG: cannot push down invalid distribution_argument_index DEBUG: generating subplan XXX_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id))) CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT (3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))) CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment mx_call_func --------------------------------------------------------------------- 29 (1 row) -- We don't currently support colocating with reference tables select colocate_proc_with_table('mx_call_func', 'mx_call_dist_table_ref'::regclass, 1); colocate_proc_with_table --------------------------------------------------------------------- (1 row) select mx_call_func(2, 0); DEBUG: pushing down the function call mx_call_func --------------------------------------------------------------------- 28 (1 row) -- We don't currently support colocating with replicated tables select colocate_proc_with_table('mx_call_func', 'mx_call_dist_table_replica'::regclass, 1); colocate_proc_with_table --------------------------------------------------------------------- (1 row) select mx_call_func(2, 0); DEBUG: cannot push down function call for replicated distributed tables DEBUG: generating subplan XXX_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id))) CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT (3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))) CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment mx_call_func --------------------------------------------------------------------- 29 (1 row) SET client_min_messages TO NOTICE; drop table mx_call_dist_table_replica; SET client_min_messages TO DEBUG1; select colocate_proc_with_table('mx_call_func', 'mx_call_dist_table_1'::regclass, 1); colocate_proc_with_table --------------------------------------------------------------------- (1 row) -- Test table returning functions. CREATE FUNCTION mx_call_func_tbl(x int) RETURNS TABLE (p0 int, p1 int) LANGUAGE plpgsql AS $$ BEGIN INSERT INTO multi_mx_function_call_delegation.mx_call_dist_table_1 VALUES (x, -1), (x+1, 4); UPDATE multi_mx_function_call_delegation.mx_call_dist_table_1 SET val = val+1 WHERE id >= x; UPDATE multi_mx_function_call_delegation.mx_call_dist_table_1 SET val = val-1 WHERE id >= x; RETURN QUERY SELECT id, val FROM multi_mx_function_call_delegation.mx_call_dist_table_1 t WHERE id >= x ORDER BY 1, 2; END;$$; -- before distribution ... select mx_call_func_tbl(10); mx_call_func_tbl --------------------------------------------------------------------- (10,-1) (11,4) (2 rows) -- after distribution ... select create_distributed_function('mx_call_func_tbl(int)', '$1', 'mx_call_dist_table_1'); DEBUG: switching to sequential query execution mode DETAIL: A distributed function is created. To make sure subsequent commands see the type correctly we need to make sure to use only one connection for all future commands create_distributed_function --------------------------------------------------------------------- (1 row) select mx_call_func_tbl(20); DEBUG: pushing down the function call mx_call_func_tbl --------------------------------------------------------------------- (20,-1) (21,4) (2 rows) -- Test that we properly propagate errors raised from procedures. CREATE FUNCTION mx_call_func_raise(x int) RETURNS void LANGUAGE plpgsql AS $$ BEGIN RAISE WARNING 'warning'; RAISE EXCEPTION 'error'; END;$$; select create_distributed_function('mx_call_func_raise(int)', '$1', 'mx_call_dist_table_1'); DEBUG: switching to sequential query execution mode DETAIL: A distributed function is created. To make sure subsequent commands see the type correctly we need to make sure to use only one connection for all future commands create_distributed_function --------------------------------------------------------------------- (1 row) \set VERBOSITY terse select mx_call_func_raise(2); DEBUG: pushing down the function call WARNING: warning ERROR: error \set VERBOSITY default -- Don't push-down when doing INSERT INTO ... SELECT func(); SET client_min_messages TO ERROR; CREATE TABLE test (x int primary key); SELECT create_distributed_table('test','x'); create_distributed_table --------------------------------------------------------------------- (1 row) CREATE OR REPLACE FUNCTION delegated_function(a int) RETURNS int LANGUAGE plpgsql AS $function$ DECLARE BEGIN INSERT INTO multi_mx_function_call_delegation.test VALUES (a); INSERT INTO multi_mx_function_call_delegation.test VALUES (a + 1); RETURN a+2; END; $function$; SELECT create_distributed_function('delegated_function(int)', 'a'); create_distributed_function --------------------------------------------------------------------- (1 row) SET client_min_messages TO DEBUG1; INSERT INTO test SELECT delegated_function(1); DEBUG: distributed INSERT ... SELECT can only select from distributed tables DEBUG: not pushing down function calls in INSERT ... SELECT DEBUG: Collecting INSERT ... SELECT results on coordinator -- Don't push down in subqueries or CTEs. SELECT * FROM test WHERE not exists( SELECT delegated_function(4) ); DEBUG: not pushing down function calls in CTEs or Subqueries DEBUG: generating subplan XXX_1 for subquery SELECT multi_mx_function_call_delegation.delegated_function(4) AS delegated_function DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT x FROM multi_mx_function_call_delegation.test WHERE (NOT (EXISTS (SELECT intermediate_result.delegated_function FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(delegated_function integer)))) x --------------------------------------------------------------------- (0 rows) WITH r AS ( SELECT delegated_function(7) ) SELECT * FROM test WHERE (SELECT count(*)=0 FROM r); DEBUG: generating subplan XXX_1 for CTE r: SELECT multi_mx_function_call_delegation.delegated_function(7) AS delegated_function DEBUG: not pushing down function calls in CTEs or Subqueries DEBUG: generating subplan XXX_2 for subquery SELECT (count(*) OPERATOR(pg_catalog.=) 0) FROM (SELECT intermediate_result.delegated_function FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(delegated_function integer)) r DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT x FROM multi_mx_function_call_delegation.test WHERE (SELECT intermediate_result."?column?" FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result("?column?" boolean)) x --------------------------------------------------------------------- (0 rows) WITH r AS ( SELECT delegated_function(10) ), t AS ( SELECT count(*) c FROM r ) SELECT * FROM test, t WHERE t.c=0; DEBUG: CTE t is going to be inlined via distributed planning DEBUG: generating subplan XXX_1 for CTE r: SELECT multi_mx_function_call_delegation.delegated_function(10) AS delegated_function DEBUG: not pushing down function calls in CTEs or Subqueries DEBUG: generating subplan XXX_2 for subquery SELECT count(*) AS c FROM (SELECT intermediate_result.delegated_function FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(delegated_function integer)) r DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT test.x, t.c FROM multi_mx_function_call_delegation.test, (SELECT intermediate_result.c FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(c bigint)) t WHERE (t.c OPERATOR(pg_catalog.=) 0) x | c --------------------------------------------------------------------- (0 rows) WITH r AS ( SELECT count(*) FROM test ), s AS ( SELECT delegated_function(13) ), t AS ( SELECT count(*) c FROM s ) SELECT * FROM test, r, t WHERE t.c=0; DEBUG: CTE r is going to be inlined via distributed planning DEBUG: CTE t is going to be inlined via distributed planning DEBUG: generating subplan XXX_1 for CTE s: SELECT multi_mx_function_call_delegation.delegated_function(13) AS delegated_function DEBUG: not pushing down function calls in CTEs or Subqueries DEBUG: generating subplan XXX_2 for subquery SELECT count(*) AS count FROM multi_mx_function_call_delegation.test DEBUG: generating subplan XXX_3 for subquery SELECT count(*) AS c FROM (SELECT intermediate_result.delegated_function FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(delegated_function integer)) s DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT test.x, r.count, t.c FROM multi_mx_function_call_delegation.test, (SELECT intermediate_result.count FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(count bigint)) r, (SELECT intermediate_result.c FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(c bigint)) t WHERE (t.c OPERATOR(pg_catalog.=) 0) x | count | c --------------------------------------------------------------------- (0 rows) -- Test that we don't propagate to non-metadata worker nodes select stop_metadata_sync_to_node('localhost', :worker_1_port); stop_metadata_sync_to_node --------------------------------------------------------------------- (1 row) select stop_metadata_sync_to_node('localhost', :worker_2_port); stop_metadata_sync_to_node --------------------------------------------------------------------- (1 row) select mx_call_func(2, 0); DEBUG: the worker node does not have metadata DEBUG: generating subplan XXX_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id))) CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT (3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))) CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment mx_call_func --------------------------------------------------------------------- 29 (1 row) SET client_min_messages TO NOTICE; select start_metadata_sync_to_node('localhost', :worker_1_port); start_metadata_sync_to_node --------------------------------------------------------------------- (1 row) select start_metadata_sync_to_node('localhost', :worker_2_port); start_metadata_sync_to_node --------------------------------------------------------------------- (1 row) -- stop_metadata_sync_to_node()/start_metadata_sync_to_node() might make -- worker backend caches inconsistent. Reconnect to coordinator to use -- new worker connections, hence new backends. \c - - - :master_port SET search_path to multi_mx_function_call_delegation, public; SET client_min_messages TO DEBUG1; SET citus.shard_replication_factor = 1; -- -- Test non-const parameter values -- CREATE FUNCTION mx_call_add(int, int) RETURNS int AS 'select $1 + $2;' LANGUAGE SQL IMMUTABLE; SELECT create_distributed_function('mx_call_add(int,int)', '$1'); DEBUG: switching to sequential query execution mode DETAIL: A distributed function is created. To make sure subsequent commands see the type correctly we need to make sure to use only one connection for all future commands create_distributed_function --------------------------------------------------------------------- (1 row) -- subquery parameters cannot be pushed down select mx_call_func((select x + 1 from mx_call_add(3, 4) x), 2); DEBUG: arguments in a distributed function must not contain subqueries DEBUG: generating subplan XXX_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id))) CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT (9 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))) CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment mx_call_func --------------------------------------------------------------------- 35 (1 row) -- volatile parameter cannot be pushed down select mx_call_func(floor(random())::int, 2); DEBUG: arguments in a distributed function must be constant expressions DEBUG: generating subplan XXX_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id))) CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT (1 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))) CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment mx_call_func --------------------------------------------------------------------- 27 (1 row) -- test forms we don't distribute select * from mx_call_func(2, 0); DEBUG: generating subplan XXX_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id))) CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT (3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))) CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment y --------------------------------------------------------------------- 29 (1 row) select mx_call_func(2, 0) from mx_call_dist_table_1; mx_call_func --------------------------------------------------------------------- 28 28 28 28 28 28 28 28 28 (9 rows) select mx_call_func(2, 0) where mx_call_func(0, 2) = 0; DEBUG: generating subplan XXX_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id))) CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT (1 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))) CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment mx_call_func --------------------------------------------------------------------- (0 rows) select mx_call_func(2, 0), mx_call_func(0, 2); DEBUG: generating subplan XXX_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id))) CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT (3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))) CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment DEBUG: generating subplan XXX_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id))) CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT (1 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))) CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment mx_call_func | mx_call_func --------------------------------------------------------------------- 29 | 27 (1 row) DO $$ BEGIN perform mx_call_func_tbl(40); END; $$; DEBUG: not pushing down function calls in a multi-statement transaction CONTEXT: SQL statement "SELECT mx_call_func_tbl(40)" PL/pgSQL function inline_code_block line 1 at PERFORM SELECT * FROM mx_call_dist_table_1 WHERE id >= 40 ORDER BY id, val; id | val --------------------------------------------------------------------- 40 | -1 41 | 4 (2 rows) -- Prepared statements. Repeat six times to test for generic plans PREPARE call_plan (int, int) AS SELECT mx_call_func($1, $2); EXECUTE call_plan(2, 0); DEBUG: pushing down the function call mx_call_func --------------------------------------------------------------------- 28 (1 row) EXECUTE call_plan(2, 0); DEBUG: pushing down the function call mx_call_func --------------------------------------------------------------------- 28 (1 row) EXECUTE call_plan(2, 0); DEBUG: pushing down the function call mx_call_func --------------------------------------------------------------------- 28 (1 row) EXECUTE call_plan(2, 0); DEBUG: pushing down the function call mx_call_func --------------------------------------------------------------------- 28 (1 row) EXECUTE call_plan(2, 0); DEBUG: pushing down the function call mx_call_func --------------------------------------------------------------------- 28 (1 row) EXECUTE call_plan(2, 0); DEBUG: pushing down the function call mx_call_func --------------------------------------------------------------------- 28 (1 row) \c - - - :worker_1_port SET search_path TO multi_mx_function_call_delegation, public; -- create_distributed_function is disallowed from worker nodes select create_distributed_function('mx_call_func(int,int)'); ERROR: operation is not allowed on this node HINT: Connect to the coordinator and run it again. \c - - - :master_port SET search_path TO multi_mx_function_call_delegation, public; RESET client_min_messages; \set VERBOSITY terse DROP SCHEMA multi_mx_function_call_delegation CASCADE; NOTICE: drop cascades to 14 other objects