diff --git a/src/test/regress/bin/normalize.sed b/src/test/regress/bin/normalize.sed
index 5ede38e35..ae4b63ac7 100644
--- a/src/test/regress/bin/normalize.sed
+++ b/src/test/regress/bin/normalize.sed
@@ -240,3 +240,4 @@ s/ERROR: ROLLBACK is not allowed in an SQL function/ERROR: ROLLBACK is not all
/Parent Relationship/d
/Parent-Relationship/d
s/function array_cat_agg\(anycompatiblearray\)/function array_cat_agg\(anyarray\)/g
+s/TRIM\(BOTH FROM value\)/btrim\(value\)/g
diff --git a/src/test/regress/expected/multi_mx_call.out b/src/test/regress/expected/multi_mx_call.out
index 8511ed143..c493fdcd4 100644
--- a/src/test/regress/expected/multi_mx_call.out
+++ b/src/test/regress/expected/multi_mx_call.out
@@ -182,10 +182,10 @@ SET client_min_messages TO DEBUG1;
call multi_mx_call.mx_call_proc(2, 0);
DEBUG: stored procedure does not have co-located tables
DEBUG: generating subplan XXX_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_call.mx_call_dist_table_1 t1 JOIN multi_mx_call.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id)))
-CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)"
+CONTEXT: PL/pgSQL assignment "y := y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)"
PL/pgSQL function mx_call_proc(integer,integer) line 8 at assignment
-DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT (3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint)))
-CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)"
+DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT ((3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))))::integer
+CONTEXT: PL/pgSQL assignment "y := y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)"
PL/pgSQL function mx_call_proc(integer,integer) line 8 at assignment
y
---------------------------------------------------------------------
@@ -266,10 +266,10 @@ begin;
call multi_mx_call.mx_call_proc(2, 0);
DEBUG: cannot push down CALL in multi-statement transaction
DEBUG: generating subplan XXX_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_call.mx_call_dist_table_1 t1 JOIN multi_mx_call.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id)))
-CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)"
+CONTEXT: PL/pgSQL assignment "y := y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)"
PL/pgSQL function mx_call_proc(integer,integer) line 8 at assignment
-DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT (3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint)))
-CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)"
+DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT ((3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))))::integer
+CONTEXT: PL/pgSQL assignment "y := y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)"
PL/pgSQL function mx_call_proc(integer,integer) line 8 at assignment
y
---------------------------------------------------------------------
@@ -300,10 +300,10 @@ select colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_1'::regclass
call multi_mx_call.mx_call_proc(2, 0);
DEBUG: cannot push down invalid distribution_argument_index
DEBUG: generating subplan XXX_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_call.mx_call_dist_table_1 t1 JOIN multi_mx_call.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id)))
-CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)"
+CONTEXT: PL/pgSQL assignment "y := y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)"
PL/pgSQL function mx_call_proc(integer,integer) line 8 at assignment
-DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT (3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint)))
-CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)"
+DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT ((3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))))::integer
+CONTEXT: PL/pgSQL assignment "y := y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)"
PL/pgSQL function mx_call_proc(integer,integer) line 8 at assignment
y
---------------------------------------------------------------------
@@ -319,10 +319,10 @@ select colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_1'::regclass
call multi_mx_call.mx_call_proc(2, 0);
DEBUG: cannot push down invalid distribution_argument_index
DEBUG: generating subplan XXX_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_call.mx_call_dist_table_1 t1 JOIN multi_mx_call.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id)))
-CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)"
+CONTEXT: PL/pgSQL assignment "y := y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)"
PL/pgSQL function mx_call_proc(integer,integer) line 8 at assignment
-DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT (3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint)))
-CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)"
+DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT ((3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))))::integer
+CONTEXT: PL/pgSQL assignment "y := y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)"
PL/pgSQL function mx_call_proc(integer,integer) line 8 at assignment
y
---------------------------------------------------------------------
@@ -354,10 +354,10 @@ select colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_replica'::re
call multi_mx_call.mx_call_proc(2, 0);
DEBUG: cannot push down function call for replicated distributed tables
DEBUG: generating subplan XXX_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_call.mx_call_dist_table_1 t1 JOIN multi_mx_call.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id)))
-CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)"
+CONTEXT: PL/pgSQL assignment "y := y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)"
PL/pgSQL function mx_call_proc(integer,integer) line 8 at assignment
-DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT (3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint)))
-CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)"
+DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT ((3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))))::integer
+CONTEXT: PL/pgSQL assignment "y := y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)"
PL/pgSQL function mx_call_proc(integer,integer) line 8 at assignment
y
---------------------------------------------------------------------
@@ -449,10 +449,10 @@ SET client_min_messages TO DEBUG1;
call multi_mx_call.mx_call_proc(2, 0);
DEBUG: there is no worker node with metadata
DEBUG: generating subplan XXX_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_call.mx_call_dist_table_1 t1 JOIN multi_mx_call.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id)))
-CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)"
+CONTEXT: PL/pgSQL assignment "y := y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)"
PL/pgSQL function mx_call_proc(integer,integer) line 8 at assignment
-DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT (3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint)))
-CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)"
+DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT ((3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))))::integer
+CONTEXT: PL/pgSQL assignment "y := y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)"
PL/pgSQL function mx_call_proc(integer,integer) line 8 at assignment
y
---------------------------------------------------------------------
@@ -495,10 +495,10 @@ DETAIL: A distributed function is created. To make sure subsequent commands see
call multi_mx_call.mx_call_proc(2, mx_call_add(3, 4));
DEBUG: distribution argument value must be a constant
DEBUG: generating subplan XXX_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_call.mx_call_dist_table_1 t1 JOIN multi_mx_call.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id)))
-CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)"
+CONTEXT: PL/pgSQL assignment "y := y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)"
PL/pgSQL function mx_call_proc(integer,integer) line 8 at assignment
-DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT (3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint)))
-CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)"
+DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT ((3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))))::integer
+CONTEXT: PL/pgSQL assignment "y := y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)"
PL/pgSQL function mx_call_proc(integer,integer) line 8 at assignment
y
---------------------------------------------------------------------
@@ -517,10 +517,10 @@ DEBUG: pushing down the procedure
call multi_mx_call.mx_call_proc(floor(random())::int, 2);
DEBUG: arguments in a distributed stored procedure must be constant expressions
DEBUG: generating subplan XXX_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_call.mx_call_dist_table_1 t1 JOIN multi_mx_call.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id)))
-CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)"
+CONTEXT: PL/pgSQL assignment "y := y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)"
PL/pgSQL function mx_call_proc(integer,integer) line 8 at assignment
-DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT (1 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint)))
-CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)"
+DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT ((1 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))))::integer
+CONTEXT: PL/pgSQL assignment "y := y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)"
PL/pgSQL function mx_call_proc(integer,integer) line 8 at assignment
y
---------------------------------------------------------------------
diff --git a/src/test/regress/expected/multi_mx_call_0.out b/src/test/regress/expected/multi_mx_call_0.out
new file mode 100644
index 000000000..8511ed143
--- /dev/null
+++ b/src/test/regress/expected/multi_mx_call_0.out
@@ -0,0 +1,533 @@
+-- Test passing off CALL to mx workers
+create schema multi_mx_call;
+set search_path to multi_mx_call, public;
+-- Create worker-local tables to test procedure calls were routed
+set citus.shard_replication_factor to 2;
+-- This table requires specific settings, create before getting into things
+create table mx_call_dist_table_replica(id int, val int);
+select create_distributed_table('mx_call_dist_table_replica', 'id');
+ create_distributed_table
+---------------------------------------------------------------------
+
+(1 row)
+
+insert into mx_call_dist_table_replica values (9,1),(8,2),(7,3),(6,4),(5,5);
+set citus.shard_replication_factor to 1;
+--
+-- Create tables and procedures we want to use in tests
+--
+create table mx_call_dist_table_1(id int, val int);
+select create_distributed_table('mx_call_dist_table_1', 'id');
+ create_distributed_table
+---------------------------------------------------------------------
+
+(1 row)
+
+insert into mx_call_dist_table_1 values (3,1),(4,5),(9,2),(6,5),(3,5);
+create table mx_call_dist_table_2(id int, val int);
+select create_distributed_table('mx_call_dist_table_2', 'id');
+ create_distributed_table
+---------------------------------------------------------------------
+
+(1 row)
+
+insert into mx_call_dist_table_2 values (1,1),(1,2),(2,2),(3,3),(3,4);
+create table mx_call_dist_table_bigint(id bigint, val bigint);
+select create_distributed_table('mx_call_dist_table_bigint', 'id');
+ create_distributed_table
+---------------------------------------------------------------------
+
+(1 row)
+
+insert into mx_call_dist_table_bigint values (1,1),(1,2),(2,2),(3,3),(3,4);
+create table mx_call_dist_table_ref(id int, val int);
+select create_reference_table('mx_call_dist_table_ref');
+ create_reference_table
+---------------------------------------------------------------------
+
+(1 row)
+
+insert into mx_call_dist_table_ref values (2,7),(1,8),(2,8),(1,8),(2,8);
+create type mx_call_enum as enum ('A', 'S', 'D', 'F');
+create table mx_call_dist_table_enum(id int, key mx_call_enum);
+select create_distributed_table('mx_call_dist_table_enum', 'key');
+ create_distributed_table
+---------------------------------------------------------------------
+
+(1 row)
+
+insert into mx_call_dist_table_enum values (1,'S'),(2,'A'),(3,'D'),(4,'F');
+-- test that a distributed function can be colocated with a reference table
+CREATE TABLE ref(groupid int);
+SELECT create_reference_table('ref');
+ create_reference_table
+---------------------------------------------------------------------
+
+(1 row)
+
+CREATE OR REPLACE PROCEDURE my_group_id_proc()
+LANGUAGE plpgsql
+SET search_path FROM CURRENT
+AS $$
+DECLARE
+ gid int;
+BEGIN
+ SELECT groupid INTO gid
+ FROM pg_dist_local_group;
+
+ INSERT INTO ref(groupid) VALUES (gid);
+END;
+$$;
+SELECT create_distributed_function('my_group_id_proc()', colocate_with := 'ref');
+ create_distributed_function
+---------------------------------------------------------------------
+
+(1 row)
+
+CALL my_group_id_proc();
+CALL my_group_id_proc();
+SELECT DISTINCT(groupid) FROM ref ORDER BY 1;
+ groupid
+---------------------------------------------------------------------
+ 14
+(1 row)
+
+TRUNCATE TABLE ref;
+-- test round robin task assignment policy uses different workers on consecutive procedure calls.
+SET citus.task_assignment_policy TO 'round-robin';
+CALL my_group_id_proc();
+CALL my_group_id_proc();
+CALL my_group_id_proc();
+SELECT DISTINCT(groupid) FROM ref ORDER BY 1;
+ groupid
+---------------------------------------------------------------------
+ 14
+ 18
+(2 rows)
+
+TRUNCATE TABLE ref;
+RESET citus.task_assignment_policy;
+CREATE PROCEDURE mx_call_proc(x int, INOUT y int)
+LANGUAGE plpgsql AS $$
+BEGIN
+ -- groupid is 0 in coordinator and non-zero in workers, so by using it here
+ -- we make sure the procedure is being executed in the worker.
+ y := x + (select case groupid when 0 then 1 else 0 end from pg_dist_local_group);
+ -- we also make sure that we can run distributed queries in the procedures
+ -- that are routed to the workers.
+ y := y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id);
+END;$$;
+CREATE PROCEDURE mx_call_proc_bigint(x bigint, INOUT y bigint)
+LANGUAGE plpgsql AS $$
+BEGIN
+ y := x + y * 2;
+END;$$;
+-- create another procedure which verifies:
+-- 1. we work fine with multiple return columns
+-- 2. we work fine in combination with custom types
+CREATE PROCEDURE mx_call_proc_custom_types(INOUT x mx_call_enum, INOUT y mx_call_enum)
+LANGUAGE plpgsql AS $$
+BEGIN
+ y := x;
+ x := (select case groupid when 0 then 'F' else 'S' end from pg_dist_local_group);
+END;$$;
+-- Test that undistributed procedures have no issue executing
+call multi_mx_call.mx_call_proc(2, 0);
+ y
+---------------------------------------------------------------------
+ 29
+(1 row)
+
+call multi_mx_call.mx_call_proc_custom_types('S', 'A');
+ x | y
+---------------------------------------------------------------------
+ F | S
+(1 row)
+
+-- Same for unqualified names
+call mx_call_proc(2, 0);
+ y
+---------------------------------------------------------------------
+ 29
+(1 row)
+
+call mx_call_proc_custom_types('S', 'A');
+ x | y
+---------------------------------------------------------------------
+ F | S
+(1 row)
+
+-- Mark both procedures as distributed ...
+select create_distributed_function('mx_call_proc(int,int)');
+ create_distributed_function
+---------------------------------------------------------------------
+
+(1 row)
+
+select create_distributed_function('mx_call_proc_bigint(bigint,bigint)');
+ create_distributed_function
+---------------------------------------------------------------------
+
+(1 row)
+
+select create_distributed_function('mx_call_proc_custom_types(mx_call_enum,mx_call_enum)');
+ create_distributed_function
+---------------------------------------------------------------------
+
+(1 row)
+
+-- We still don't route them to the workers, because they aren't
+-- colocated with any distributed tables.
+SET client_min_messages TO DEBUG1;
+call multi_mx_call.mx_call_proc(2, 0);
+DEBUG: stored procedure does not have co-located tables
+DEBUG: generating subplan XXX_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_call.mx_call_dist_table_1 t1 JOIN multi_mx_call.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id)))
+CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)"
+PL/pgSQL function mx_call_proc(integer,integer) line 8 at assignment
+DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT (3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint)))
+CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)"
+PL/pgSQL function mx_call_proc(integer,integer) line 8 at assignment
+ y
+---------------------------------------------------------------------
+ 29
+(1 row)
+
+call mx_call_proc_bigint(4, 2);
+DEBUG: stored procedure does not have co-located tables
+ y
+---------------------------------------------------------------------
+ 8
+(1 row)
+
+call multi_mx_call.mx_call_proc_custom_types('S', 'A');
+DEBUG: stored procedure does not have co-located tables
+ x | y
+---------------------------------------------------------------------
+ F | S
+(1 row)
+
+-- Mark them as colocated with a table. Now we should route them to workers.
+select colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_1'::regclass, 1);
+ colocate_proc_with_table
+---------------------------------------------------------------------
+
+(1 row)
+
+select colocate_proc_with_table('mx_call_proc_bigint', 'mx_call_dist_table_bigint'::regclass, 1);
+ colocate_proc_with_table
+---------------------------------------------------------------------
+
+(1 row)
+
+select colocate_proc_with_table('mx_call_proc_custom_types', 'mx_call_dist_table_enum'::regclass, 1);
+ colocate_proc_with_table
+---------------------------------------------------------------------
+
+(1 row)
+
+call multi_mx_call.mx_call_proc(2, 0);
+DEBUG: pushing down the procedure
+ y
+---------------------------------------------------------------------
+ 28
+(1 row)
+
+call multi_mx_call.mx_call_proc_custom_types('S', 'A');
+DEBUG: pushing down the procedure
+ x | y
+---------------------------------------------------------------------
+ S | S
+(1 row)
+
+call mx_call_proc(2, 0);
+DEBUG: pushing down the procedure
+ y
+---------------------------------------------------------------------
+ 28
+(1 row)
+
+call mx_call_proc_custom_types('S', 'A');
+DEBUG: pushing down the procedure
+ x | y
+---------------------------------------------------------------------
+ S | S
+(1 row)
+
+-- Test implicit cast of int to bigint
+call mx_call_proc_bigint(4, 2);
+DEBUG: pushing down the procedure
+ y
+---------------------------------------------------------------------
+ 8
+(1 row)
+
+-- We don't allow distributing calls inside transactions
+begin;
+call multi_mx_call.mx_call_proc(2, 0);
+DEBUG: cannot push down CALL in multi-statement transaction
+DEBUG: generating subplan XXX_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_call.mx_call_dist_table_1 t1 JOIN multi_mx_call.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id)))
+CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)"
+PL/pgSQL function mx_call_proc(integer,integer) line 8 at assignment
+DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT (3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint)))
+CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)"
+PL/pgSQL function mx_call_proc(integer,integer) line 8 at assignment
+ y
+---------------------------------------------------------------------
+ 29
+(1 row)
+
+commit;
+-- Drop the table colocated with mx_call_proc_custom_types. Now it shouldn't
+-- be routed to workers anymore.
+SET client_min_messages TO NOTICE;
+drop table mx_call_dist_table_enum;
+SET client_min_messages TO DEBUG1;
+call multi_mx_call.mx_call_proc_custom_types('S', 'A');
+DEBUG: stored procedure does not have co-located tables
+ x | y
+---------------------------------------------------------------------
+ F | S
+(1 row)
+
+-- Make sure we do bounds checking on distributed argument index
+-- This also tests that we have cache invalidation for pg_dist_object updates
+select colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_1'::regclass, -1);
+ colocate_proc_with_table
+---------------------------------------------------------------------
+
+(1 row)
+
+call multi_mx_call.mx_call_proc(2, 0);
+DEBUG: cannot push down invalid distribution_argument_index
+DEBUG: generating subplan XXX_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_call.mx_call_dist_table_1 t1 JOIN multi_mx_call.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id)))
+CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)"
+PL/pgSQL function mx_call_proc(integer,integer) line 8 at assignment
+DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT (3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint)))
+CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)"
+PL/pgSQL function mx_call_proc(integer,integer) line 8 at assignment
+ y
+---------------------------------------------------------------------
+ 29
+(1 row)
+
+select colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_1'::regclass, 2);
+ colocate_proc_with_table
+---------------------------------------------------------------------
+
+(1 row)
+
+call multi_mx_call.mx_call_proc(2, 0);
+DEBUG: cannot push down invalid distribution_argument_index
+DEBUG: generating subplan XXX_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_call.mx_call_dist_table_1 t1 JOIN multi_mx_call.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id)))
+CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)"
+PL/pgSQL function mx_call_proc(integer,integer) line 8 at assignment
+DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT (3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint)))
+CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)"
+PL/pgSQL function mx_call_proc(integer,integer) line 8 at assignment
+ y
+---------------------------------------------------------------------
+ 29
+(1 row)
+
+-- We support colocating with reference tables
+select colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_ref'::regclass, NULL);
+ colocate_proc_with_table
+---------------------------------------------------------------------
+
+(1 row)
+
+call multi_mx_call.mx_call_proc(2, 0);
+DEBUG: will push down CALL for reference tables
+DEBUG: pushing down the procedure
+ y
+---------------------------------------------------------------------
+ 28
+(1 row)
+
+-- We don't currently support colocating with replicated tables
+select colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_replica'::regclass, 1);
+ colocate_proc_with_table
+---------------------------------------------------------------------
+
+(1 row)
+
+call multi_mx_call.mx_call_proc(2, 0);
+DEBUG: cannot push down function call for replicated distributed tables
+DEBUG: generating subplan XXX_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_call.mx_call_dist_table_1 t1 JOIN multi_mx_call.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id)))
+CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)"
+PL/pgSQL function mx_call_proc(integer,integer) line 8 at assignment
+DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT (3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint)))
+CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)"
+PL/pgSQL function mx_call_proc(integer,integer) line 8 at assignment
+ y
+---------------------------------------------------------------------
+ 29
+(1 row)
+
+SET client_min_messages TO NOTICE;
+drop table mx_call_dist_table_replica;
+SET client_min_messages TO DEBUG1;
+select colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_1'::regclass, 1);
+ colocate_proc_with_table
+---------------------------------------------------------------------
+
+(1 row)
+
+-- Test that we handle transactional constructs correctly inside a procedure
+-- that is routed to the workers.
+CREATE PROCEDURE mx_call_proc_tx(x int) LANGUAGE plpgsql AS $$
+BEGIN
+ INSERT INTO multi_mx_call.mx_call_dist_table_1 VALUES (x, -1), (x+1, 4);
+ COMMIT;
+ UPDATE multi_mx_call.mx_call_dist_table_1 SET val = val+1 WHERE id >= x;
+ ROLLBACK;
+ -- Now do the final update!
+ UPDATE multi_mx_call.mx_call_dist_table_1 SET val = val-1 WHERE id >= x;
+END;$$;
+-- before distribution ...
+CALL multi_mx_call.mx_call_proc_tx(10);
+-- after distribution ...
+select create_distributed_function('mx_call_proc_tx(int)', '$1', 'mx_call_dist_table_1');
+DEBUG: switching to sequential query execution mode
+DETAIL: A distributed function is created. To make sure subsequent commands see the type correctly we need to make sure to use only one connection for all future commands
+ create_distributed_function
+---------------------------------------------------------------------
+
+(1 row)
+
+CALL multi_mx_call.mx_call_proc_tx(20);
+DEBUG: pushing down the procedure
+SELECT id, val FROM mx_call_dist_table_1 ORDER BY id, val;
+ id | val
+---------------------------------------------------------------------
+ 3 | 1
+ 3 | 5
+ 4 | 5
+ 6 | 5
+ 9 | 2
+ 10 | -2
+ 11 | 3
+ 20 | -2
+ 21 | 3
+(9 rows)
+
+-- Test that we properly propagate errors raised from procedures.
+CREATE PROCEDURE mx_call_proc_raise(x int) LANGUAGE plpgsql AS $$
+BEGIN
+ RAISE WARNING 'warning';
+ RAISE EXCEPTION 'error';
+END;$$;
+select create_distributed_function('mx_call_proc_raise(int)', '$1', 'mx_call_dist_table_1');
+DEBUG: switching to sequential query execution mode
+DETAIL: A distributed function is created. To make sure subsequent commands see the type correctly we need to make sure to use only one connection for all future commands
+ create_distributed_function
+---------------------------------------------------------------------
+
+(1 row)
+
+\set VERBOSITY terse
+call multi_mx_call.mx_call_proc_raise(2);
+DEBUG: pushing down the procedure
+WARNING: warning
+ERROR: error
+\set VERBOSITY default
+-- Test that we don't propagate to non-metadata worker nodes
+SET client_min_messages TO WARNING;
+select stop_metadata_sync_to_node('localhost', :worker_1_port);
+ stop_metadata_sync_to_node
+---------------------------------------------------------------------
+
+(1 row)
+
+select stop_metadata_sync_to_node('localhost', :worker_2_port);
+ stop_metadata_sync_to_node
+---------------------------------------------------------------------
+
+(1 row)
+
+SET client_min_messages TO DEBUG1;
+call multi_mx_call.mx_call_proc(2, 0);
+DEBUG: there is no worker node with metadata
+DEBUG: generating subplan XXX_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_call.mx_call_dist_table_1 t1 JOIN multi_mx_call.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id)))
+CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)"
+PL/pgSQL function mx_call_proc(integer,integer) line 8 at assignment
+DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT (3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint)))
+CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)"
+PL/pgSQL function mx_call_proc(integer,integer) line 8 at assignment
+ y
+---------------------------------------------------------------------
+ 29
+(1 row)
+
+SET client_min_messages TO NOTICE;
+select start_metadata_sync_to_node('localhost', :worker_1_port);
+ start_metadata_sync_to_node
+---------------------------------------------------------------------
+
+(1 row)
+
+select start_metadata_sync_to_node('localhost', :worker_2_port);
+ start_metadata_sync_to_node
+---------------------------------------------------------------------
+
+(1 row)
+
+-- stop_metadata_sync_to_node()/start_metadata_sync_to_node() might make
+-- worker backend caches inconsistent. Reconnect to coordinator to use
+-- new worker connections, hence new backends.
+\c - - - :master_port
+SET search_path to multi_mx_call, public;
+SET client_min_messages TO DEBUG1;
+--
+-- Test non-const parameter values
+--
+CREATE FUNCTION mx_call_add(int, int) RETURNS int
+ AS 'select $1 + $2;' LANGUAGE SQL IMMUTABLE;
+SELECT create_distributed_function('mx_call_add(int,int)');
+DEBUG: switching to sequential query execution mode
+DETAIL: A distributed function is created. To make sure subsequent commands see the type correctly we need to make sure to use only one connection for all future commands
+ create_distributed_function
+---------------------------------------------------------------------
+
+(1 row)
+
+-- non-const distribution parameters cannot be pushed down
+call multi_mx_call.mx_call_proc(2, mx_call_add(3, 4));
+DEBUG: distribution argument value must be a constant
+DEBUG: generating subplan XXX_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_call.mx_call_dist_table_1 t1 JOIN multi_mx_call.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id)))
+CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)"
+PL/pgSQL function mx_call_proc(integer,integer) line 8 at assignment
+DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT (3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint)))
+CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)"
+PL/pgSQL function mx_call_proc(integer,integer) line 8 at assignment
+ y
+---------------------------------------------------------------------
+ 29
+(1 row)
+
+-- non-const parameter can be pushed down
+call multi_mx_call.mx_call_proc(multi_mx_call.mx_call_add(3, 4), 2);
+DEBUG: pushing down the procedure
+ y
+---------------------------------------------------------------------
+ 33
+(1 row)
+
+-- volatile parameter cannot be pushed down
+call multi_mx_call.mx_call_proc(floor(random())::int, 2);
+DEBUG: arguments in a distributed stored procedure must be constant expressions
+DEBUG: generating subplan XXX_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_call.mx_call_dist_table_1 t1 JOIN multi_mx_call.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id)))
+CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)"
+PL/pgSQL function mx_call_proc(integer,integer) line 8 at assignment
+DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT (1 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint)))
+CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)"
+PL/pgSQL function mx_call_proc(integer,integer) line 8 at assignment
+ y
+---------------------------------------------------------------------
+ 27
+(1 row)
+
+reset client_min_messages;
+\set VERBOSITY terse
+drop schema multi_mx_call cascade;
+NOTICE: drop cascades to 13 other objects
diff --git a/src/test/regress/expected/multi_mx_explain.out b/src/test/regress/expected/multi_mx_explain.out
index ab996275c..2c58dd003 100644
--- a/src/test/regress/expected/multi_mx_explain.out
+++ b/src/test/regress/expected/multi_mx_explain.out
@@ -88,13 +88,11 @@ EXPLAIN (COSTS FALSE, FORMAT JSON)
"Node Type": "Aggregate",
"Strategy": "Hashed",
"Partial Mode": "Simple",
- "Parent Relationship": "Outer",
"Parallel Aware": false,
"Group Key": ["remote_scan.l_quantity"],
"Plans": [
{
"Node Type": "Custom Scan",
- "Parent Relationship": "Outer",
"Custom Plan Provider": "Citus Adaptive",
"Parallel Aware": false,
"Distributed Query": {
@@ -116,7 +114,6 @@ EXPLAIN (COSTS FALSE, FORMAT JSON)
"Plans": [
{
"Node Type": "Seq Scan",
- "Parent Relationship": "Outer",
"Parallel Aware": false,
"Relation Name": "lineitem_mx_1220052",
"Alias": "lineitem_mx"
@@ -162,7 +159,6 @@ EXPLAIN (COSTS FALSE, FORMAT XML)
Aggregate
Hashed
Simple
- Outer
false
- remote_scan.l_quantity
@@ -170,7 +166,6 @@ EXPLAIN (COSTS FALSE, FORMAT XML)
Custom Scan
- Outer
Citus Adaptive
false
@@ -194,7 +189,6 @@ EXPLAIN (COSTS FALSE, FORMAT XML)
Seq Scan
- Outer
false
lineitem_mx_1220052
lineitem_mx
@@ -234,13 +228,11 @@ EXPLAIN (COSTS FALSE, FORMAT YAML)
- Node Type: "Aggregate"
Strategy: "Hashed"
Partial Mode: "Simple"
- Parent Relationship: "Outer"
Parallel Aware: false
Group Key:
- "remote_scan.l_quantity"
Plans:
- Node Type: "Custom Scan"
- Parent Relationship: "Outer"
Custom Plan Provider: "Citus Adaptive"
Parallel Aware: false
Distributed Query:
@@ -259,7 +251,6 @@ EXPLAIN (COSTS FALSE, FORMAT YAML)
- "l_quantity"
Plans:
- Node Type: "Seq Scan"
- Parent Relationship: "Outer"
Parallel Aware: false
Relation Name: "lineitem_mx_1220052"
Alias: "lineitem_mx"
@@ -537,7 +528,6 @@ EXPLAIN (COSTS FALSE, FORMAT JSON)
"Plans": [
{
"Node Type": "Custom Scan",
- "Parent Relationship": "Outer",
"Custom Plan Provider": "Citus Adaptive",
"Parallel Aware": false,
"Distributed Query": {
@@ -558,7 +548,6 @@ EXPLAIN (COSTS FALSE, FORMAT JSON)
"Plans": [
{
"Node Type": "Hash Join",
- "Parent Relationship": "Outer",
"Parallel Aware": false,
"Join Type": "Inner",
"Inner Unique": false,
@@ -566,7 +555,6 @@ EXPLAIN (COSTS FALSE, FORMAT JSON)
"Plans": [
{
"Node Type": "Hash Join",
- "Parent Relationship": "Outer",
"Parallel Aware": false,
"Join Type": "Inner",
"Inner Unique": false,
@@ -574,19 +562,16 @@ EXPLAIN (COSTS FALSE, FORMAT JSON)
"Plans": [
{
"Node Type": "Seq Scan",
- "Parent Relationship": "Outer",
"Parallel Aware": false,
"Relation Name": "supplier_mx_1220087",
"Alias": "supplier_mx"
},
{
"Node Type": "Hash",
- "Parent Relationship": "Inner",
"Parallel Aware": false,
"Plans": [
{
"Node Type": "Seq Scan",
- "Parent Relationship": "Outer",
"Parallel Aware": false,
"Relation Name": "lineitem_mx_1220052",
"Alias": "lineitem_mx"
@@ -597,12 +582,10 @@ EXPLAIN (COSTS FALSE, FORMAT JSON)
},
{
"Node Type": "Hash",
- "Parent Relationship": "Inner",
"Parallel Aware": false,
"Plans": [
{
"Node Type": "Hash Join",
- "Parent Relationship": "Outer",
"Parallel Aware": false,
"Join Type": "Inner",
"Inner Unique": false,
@@ -610,19 +593,16 @@ EXPLAIN (COSTS FALSE, FORMAT JSON)
"Plans": [
{
"Node Type": "Seq Scan",
- "Parent Relationship": "Outer",
"Parallel Aware": false,
"Relation Name": "customer_mx_1220084",
"Alias": "customer_mx"
},
{
"Node Type": "Hash",
- "Parent Relationship": "Inner",
"Parallel Aware": false,
"Plans": [
{
"Node Type": "Seq Scan",
- "Parent Relationship": "Outer",
"Parallel Aware": false,
"Relation Name": "orders_mx_1220068",
"Alias": "orders_mx"
@@ -673,7 +653,6 @@ EXPLAIN (COSTS FALSE, FORMAT XML)
Custom Scan
- Outer
Citus Adaptive
false
@@ -694,7 +673,6 @@ EXPLAIN (COSTS FALSE, FORMAT XML)
Hash Join
- Outer
false
Inner
false
@@ -702,7 +680,6 @@ EXPLAIN (COSTS FALSE, FORMAT XML)
Hash Join
- Outer
false
Inner
false
@@ -710,19 +687,16 @@ EXPLAIN (COSTS FALSE, FORMAT XML)
Seq Scan
- Outer
false
supplier_mx_1220087
supplier_mx
Hash
- Inner
false
Seq Scan
- Outer
false
lineitem_mx_1220052
lineitem_mx
@@ -733,12 +707,10 @@ EXPLAIN (COSTS FALSE, FORMAT XML)
Hash
- Inner
false
Hash Join
- Outer
false
Inner
false
@@ -746,19 +718,16 @@ EXPLAIN (COSTS FALSE, FORMAT XML)
Seq Scan
- Outer
false
customer_mx_1220084
customer_mx
Hash
- Inner
false
Seq Scan
- Outer
false
orders_mx_1220068
orders_mx
@@ -805,7 +774,6 @@ EXPLAIN (COSTS FALSE, FORMAT YAML)
Parallel Aware: false
Plans:
- Node Type: "Custom Scan"
- Parent Relationship: "Outer"
Custom Plan Provider: "Citus Adaptive"
Parallel Aware: false
Distributed Query:
@@ -822,55 +790,45 @@ EXPLAIN (COSTS FALSE, FORMAT YAML)
Parallel Aware: false
Plans:
- Node Type: "Hash Join"
- Parent Relationship: "Outer"
Parallel Aware: false
Join Type: "Inner"
Inner Unique: false
Hash Cond: "(lineitem_mx.l_orderkey = orders_mx.o_orderkey)"
Plans:
- Node Type: "Hash Join"
- Parent Relationship: "Outer"
Parallel Aware: false
Join Type: "Inner"
Inner Unique: false
Hash Cond: "(supplier_mx.s_suppkey = lineitem_mx.l_suppkey)"
Plans:
- Node Type: "Seq Scan"
- Parent Relationship: "Outer"
Parallel Aware: false
Relation Name: "supplier_mx_1220087"
Alias: "supplier_mx"
- Node Type: "Hash"
- Parent Relationship: "Inner"
Parallel Aware: false
Plans:
- Node Type: "Seq Scan"
- Parent Relationship: "Outer"
Parallel Aware: false
Relation Name: "lineitem_mx_1220052"
Alias: "lineitem_mx"
- Node Type: "Hash"
- Parent Relationship: "Inner"
Parallel Aware: false
Plans:
- Node Type: "Hash Join"
- Parent Relationship: "Outer"
Parallel Aware: false
Join Type: "Inner"
Inner Unique: false
Hash Cond: "(customer_mx.c_custkey = orders_mx.o_custkey)"
Plans:
- Node Type: "Seq Scan"
- Parent Relationship: "Outer"
Parallel Aware: false
Relation Name: "customer_mx_1220084"
Alias: "customer_mx"
- Node Type: "Hash"
- Parent Relationship: "Inner"
Parallel Aware: false
Plans:
- Node Type: "Seq Scan"
- Parent Relationship: "Outer"
Parallel Aware: false
Relation Name: "orders_mx_1220068"
Alias: "orders_mx"
diff --git a/src/test/regress/expected/multi_mx_function_call_delegation.out b/src/test/regress/expected/multi_mx_function_call_delegation.out
index 817cc92a7..dd5bfdbfc 100644
--- a/src/test/regress/expected/multi_mx_function_call_delegation.out
+++ b/src/test/regress/expected/multi_mx_function_call_delegation.out
@@ -143,10 +143,10 @@ SET client_min_messages TO DEBUG1;
select mx_call_func(2, 0);
DEBUG: function does not have co-located tables
DEBUG: generating subplan XXX_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id)))
-CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)"
+CONTEXT: PL/pgSQL assignment "y := y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)"
PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment
-DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT (3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint)))
-CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)"
+DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT ((3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))))::integer
+CONTEXT: PL/pgSQL assignment "y := y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)"
PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment
mx_call_func
---------------------------------------------------------------------
@@ -230,26 +230,33 @@ DEBUG: pushing down the function call
(S,S)
(1 row)
--- This is currently an undetected failure when using the binary protocol
--- It should not be enabled by default until this is resolved. The tests above
--- will fail too, when changing the default to TRUE;
+-- this is fixed with pg14 and this will fail prior to
+-- pg 14
SET citus.enable_binary_protocol = TRUE;
select mx_call_func_custom_types('S', 'A');
DEBUG: pushing down the function call
-ERROR: wrong data type: XXXX, expected XXXX
+ mx_call_func_custom_types
+---------------------------------------------------------------------
+ (S,S)
+(1 row)
+
select multi_mx_function_call_delegation.mx_call_func_custom_types('S', 'A');
DEBUG: pushing down the function call
-ERROR: wrong data type: XXXX, expected XXXX
+ mx_call_func_custom_types
+---------------------------------------------------------------------
+ (S,S)
+(1 row)
+
RESET citus.enable_binary_protocol;
-- We don't allow distributing calls inside transactions
begin;
select mx_call_func(2, 0);
DEBUG: not pushing down function calls in a multi-statement transaction
DEBUG: generating subplan XXX_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id)))
-CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)"
+CONTEXT: PL/pgSQL assignment "y := y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)"
PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment
-DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT (3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint)))
-CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)"
+DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT ((3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))))::integer
+CONTEXT: PL/pgSQL assignment "y := y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)"
PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment
mx_call_func
---------------------------------------------------------------------
@@ -280,10 +287,10 @@ select colocate_proc_with_table('mx_call_func', 'mx_call_dist_table_1'::regclass
select mx_call_func(2, 0);
DEBUG: cannot push down invalid distribution_argument_index
DEBUG: generating subplan XXX_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id)))
-CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)"
+CONTEXT: PL/pgSQL assignment "y := y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)"
PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment
-DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT (3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint)))
-CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)"
+DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT ((3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))))::integer
+CONTEXT: PL/pgSQL assignment "y := y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)"
PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment
mx_call_func
---------------------------------------------------------------------
@@ -299,10 +306,10 @@ select colocate_proc_with_table('mx_call_func', 'mx_call_dist_table_1'::regclass
select mx_call_func(2, 0);
DEBUG: cannot push down invalid distribution_argument_index
DEBUG: generating subplan XXX_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id)))
-CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)"
+CONTEXT: PL/pgSQL assignment "y := y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)"
PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment
-DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT (3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint)))
-CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)"
+DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT ((3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))))::integer
+CONTEXT: PL/pgSQL assignment "y := y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)"
PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment
mx_call_func
---------------------------------------------------------------------
@@ -333,10 +340,10 @@ select colocate_proc_with_table('mx_call_func', 'mx_call_dist_table_replica'::re
select mx_call_func(2, 0);
DEBUG: cannot push down function call for replicated distributed tables
DEBUG: generating subplan XXX_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id)))
-CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)"
+CONTEXT: PL/pgSQL assignment "y := y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)"
PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment
-DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT (3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint)))
-CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)"
+DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT ((3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))))::integer
+CONTEXT: PL/pgSQL assignment "y := y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)"
PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment
mx_call_func
---------------------------------------------------------------------
@@ -515,10 +522,10 @@ SET client_min_messages TO DEBUG1;
select mx_call_func(2, 0);
DEBUG: the worker node does not have metadata
DEBUG: generating subplan XXX_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id)))
-CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)"
+CONTEXT: PL/pgSQL assignment "y := y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)"
PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment
-DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT (3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint)))
-CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)"
+DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT ((3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))))::integer
+CONTEXT: PL/pgSQL assignment "y := y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)"
PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment
mx_call_func
---------------------------------------------------------------------
@@ -562,10 +569,10 @@ DETAIL: A distributed function is created. To make sure subsequent commands see
select mx_call_func((select x + 1 from mx_call_add(3, 4) x), 2);
DEBUG: arguments in a distributed function must not contain subqueries
DEBUG: generating subplan XXX_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id)))
-CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)"
+CONTEXT: PL/pgSQL assignment "y := y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)"
PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment
-DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT (9 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint)))
-CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)"
+DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT ((9 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))))::integer
+CONTEXT: PL/pgSQL assignment "y := y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)"
PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment
mx_call_func
---------------------------------------------------------------------
@@ -576,10 +583,10 @@ PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment
select mx_call_func(floor(random())::int, 2);
DEBUG: arguments in a distributed function must be constant expressions
DEBUG: generating subplan XXX_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id)))
-CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)"
+CONTEXT: PL/pgSQL assignment "y := y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)"
PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment
-DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT (1 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint)))
-CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)"
+DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT ((1 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))))::integer
+CONTEXT: PL/pgSQL assignment "y := y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)"
PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment
mx_call_func
---------------------------------------------------------------------
@@ -589,10 +596,10 @@ PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment
-- test forms we don't distribute
select * from mx_call_func(2, 0);
DEBUG: generating subplan XXX_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id)))
-CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)"
+CONTEXT: PL/pgSQL assignment "y := y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)"
PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment
-DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT (3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint)))
-CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)"
+DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT ((3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))))::integer
+CONTEXT: PL/pgSQL assignment "y := y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)"
PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment
y
---------------------------------------------------------------------
@@ -615,10 +622,10 @@ select mx_call_func(2, 0) from mx_call_dist_table_1;
select mx_call_func(2, 0) where mx_call_func(0, 2) = 0;
DEBUG: generating subplan XXX_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id)))
-CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)"
+CONTEXT: PL/pgSQL assignment "y := y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)"
PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment
-DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT (1 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint)))
-CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)"
+DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT ((1 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))))::integer
+CONTEXT: PL/pgSQL assignment "y := y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)"
PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment
mx_call_func
---------------------------------------------------------------------
@@ -626,16 +633,16 @@ PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment
select mx_call_func(2, 0), mx_call_func(0, 2);
DEBUG: generating subplan XXX_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id)))
-CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)"
+CONTEXT: PL/pgSQL assignment "y := y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)"
PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment
-DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT (3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint)))
-CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)"
+DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT ((3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))))::integer
+CONTEXT: PL/pgSQL assignment "y := y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)"
PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment
DEBUG: generating subplan XXX_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id)))
-CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)"
+CONTEXT: PL/pgSQL assignment "y := y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)"
PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment
-DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT (1 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint)))
-CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)"
+DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT ((1 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))))::integer
+CONTEXT: PL/pgSQL assignment "y := y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)"
PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment
mx_call_func | mx_call_func
---------------------------------------------------------------------
diff --git a/src/test/regress/expected/multi_mx_function_call_delegation_0.out b/src/test/regress/expected/multi_mx_function_call_delegation_0.out
new file mode 100644
index 000000000..e16ba2922
--- /dev/null
+++ b/src/test/regress/expected/multi_mx_function_call_delegation_0.out
@@ -0,0 +1,724 @@
+-- Test passing off function call to mx workers
+CREATE SCHEMA multi_mx_function_call_delegation;
+SET search_path TO multi_mx_function_call_delegation, public;
+SET citus.shard_replication_factor TO 2;
+-- This table requires specific settings, create before getting into things
+create table mx_call_dist_table_replica(id int, val int);
+select create_distributed_table('mx_call_dist_table_replica', 'id');
+ create_distributed_table
+---------------------------------------------------------------------
+
+(1 row)
+
+insert into mx_call_dist_table_replica values (9,1),(8,2),(7,3),(6,4),(5,5);
+SET citus.shard_replication_factor TO 1;
+--
+-- Create tables and functions we want to use in tests
+--
+create table mx_call_dist_table_1(id int, val int);
+select create_distributed_table('mx_call_dist_table_1', 'id');
+ create_distributed_table
+---------------------------------------------------------------------
+
+(1 row)
+
+insert into mx_call_dist_table_1 values (3,1),(4,5),(9,2),(6,5),(3,5);
+create table mx_call_dist_table_2(id int, val int);
+select create_distributed_table('mx_call_dist_table_2', 'id');
+ create_distributed_table
+---------------------------------------------------------------------
+
+(1 row)
+
+insert into mx_call_dist_table_2 values (1,1),(1,2),(2,2),(3,3),(3,4);
+create table mx_call_dist_table_bigint(id bigint, val bigint);
+select create_distributed_table('mx_call_dist_table_bigint', 'id');
+ create_distributed_table
+---------------------------------------------------------------------
+
+(1 row)
+
+insert into mx_call_dist_table_bigint values (1,1),(1,2),(2,2),(3,3),(3,4);
+create table mx_call_dist_table_ref(id int, val int);
+select create_reference_table('mx_call_dist_table_ref');
+ create_reference_table
+---------------------------------------------------------------------
+
+(1 row)
+
+insert into mx_call_dist_table_ref values (2,7),(1,8),(2,8),(1,8),(2,8);
+create type mx_call_enum as enum ('A', 'S', 'D', 'F');
+create table mx_call_dist_table_enum(id int, key mx_call_enum);
+select create_distributed_table('mx_call_dist_table_enum', 'key');
+ create_distributed_table
+---------------------------------------------------------------------
+
+(1 row)
+
+insert into mx_call_dist_table_enum values (1,'S'),(2,'A'),(3,'D'),(4,'F');
+CREATE FUNCTION squares(int) RETURNS SETOF RECORD
+ AS $$ SELECT i, i * i FROM generate_series(1, $1) i $$
+ LANGUAGE SQL;
+CREATE FUNCTION mx_call_func(x int, INOUT y int)
+LANGUAGE plpgsql AS $$
+BEGIN
+ -- groupid is 0 in coordinator and non-zero in workers, so by using it here
+ -- we make sure the function is being executed in the worker.
+ y := x + (select case groupid when 0 then 1 else 0 end from pg_dist_local_group);
+ -- we also make sure that we can run distributed queries in the functions
+ -- that are routed to the workers.
+ y := y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id);
+END;$$;
+CREATE FUNCTION mx_call_func_bigint(x bigint, INOUT y bigint)
+LANGUAGE plpgsql AS $$
+BEGIN
+ y := x + y * 2;
+END;$$;
+-- create another function which verifies:
+-- 1. we work fine with multiple return columns
+-- 2. we work fine in combination with custom types
+CREATE FUNCTION mx_call_func_custom_types(INOUT x mx_call_enum, INOUT y mx_call_enum)
+LANGUAGE plpgsql AS $$
+BEGIN
+ y := x;
+ x := (select case groupid when 0 then 'F' else 'S' end from pg_dist_local_group);
+END;$$;
+-- Test that undistributed functions have no issue executing
+select multi_mx_function_call_delegation.mx_call_func(2, 0);
+ mx_call_func
+---------------------------------------------------------------------
+ 29
+(1 row)
+
+select multi_mx_function_call_delegation.mx_call_func_custom_types('S', 'A');
+ mx_call_func_custom_types
+---------------------------------------------------------------------
+ (F,S)
+(1 row)
+
+select squares(4);
+ squares
+---------------------------------------------------------------------
+ (1,1)
+ (2,4)
+ (3,9)
+ (4,16)
+(4 rows)
+
+-- Same for unqualified name
+select mx_call_func(2, 0);
+ mx_call_func
+---------------------------------------------------------------------
+ 29
+(1 row)
+
+-- Mark both functions as distributed ...
+select create_distributed_function('mx_call_func(int,int)');
+ create_distributed_function
+---------------------------------------------------------------------
+
+(1 row)
+
+select create_distributed_function('mx_call_func_bigint(bigint,bigint)');
+ create_distributed_function
+---------------------------------------------------------------------
+
+(1 row)
+
+select create_distributed_function('mx_call_func_custom_types(mx_call_enum,mx_call_enum)');
+ create_distributed_function
+---------------------------------------------------------------------
+
+(1 row)
+
+select create_distributed_function('squares(int)');
+ create_distributed_function
+---------------------------------------------------------------------
+
+(1 row)
+
+-- We still don't route them to the workers, because they aren't
+-- colocated with any distributed tables.
+SET client_min_messages TO DEBUG1;
+select mx_call_func(2, 0);
+DEBUG: function does not have co-located tables
+DEBUG: generating subplan XXX_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id)))
+CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)"
+PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment
+DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT (3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint)))
+CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)"
+PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment
+ mx_call_func
+---------------------------------------------------------------------
+ 29
+(1 row)
+
+select multi_mx_function_call_delegation.mx_call_func_bigint(4, 2);
+DEBUG: function does not have co-located tables
+ mx_call_func_bigint
+---------------------------------------------------------------------
+ 8
+(1 row)
+
+select mx_call_func_custom_types('S', 'A');
+DEBUG: function does not have co-located tables
+ mx_call_func_custom_types
+---------------------------------------------------------------------
+ (F,S)
+(1 row)
+
+-- Mark them as colocated with a table. Now we should route them to workers.
+select colocate_proc_with_table('mx_call_func', 'mx_call_dist_table_1'::regclass, 1);
+ colocate_proc_with_table
+---------------------------------------------------------------------
+
+(1 row)
+
+select colocate_proc_with_table('mx_call_func_bigint', 'mx_call_dist_table_bigint'::regclass, 1);
+ colocate_proc_with_table
+---------------------------------------------------------------------
+
+(1 row)
+
+select colocate_proc_with_table('mx_call_func_custom_types', 'mx_call_dist_table_enum'::regclass, 1);
+ colocate_proc_with_table
+---------------------------------------------------------------------
+
+(1 row)
+
+select colocate_proc_with_table('squares', 'mx_call_dist_table_2'::regclass, 0);
+ colocate_proc_with_table
+---------------------------------------------------------------------
+
+(1 row)
+
+select mx_call_func(2, 0);
+DEBUG: pushing down the function call
+ mx_call_func
+---------------------------------------------------------------------
+ 28
+(1 row)
+
+select mx_call_func_bigint(4, 2);
+DEBUG: pushing down the function call
+ mx_call_func_bigint
+---------------------------------------------------------------------
+ 8
+(1 row)
+
+select mx_call_func_custom_types('S', 'A');
+DEBUG: pushing down the function call
+ mx_call_func_custom_types
+---------------------------------------------------------------------
+ (S,S)
+(1 row)
+
+select squares(4);
+DEBUG: pushing down the function call
+ERROR: input of anonymous composite types is not implemented
+select multi_mx_function_call_delegation.mx_call_func(2, 0);
+DEBUG: pushing down the function call
+ mx_call_func
+---------------------------------------------------------------------
+ 28
+(1 row)
+
+select multi_mx_function_call_delegation.mx_call_func_custom_types('S', 'A');
+DEBUG: pushing down the function call
+ mx_call_func_custom_types
+---------------------------------------------------------------------
+ (S,S)
+(1 row)
+
+-- this is fixed with pg14 and this will fail prior to
+-- pg 14
+SET citus.enable_binary_protocol = TRUE;
+select mx_call_func_custom_types('S', 'A');
+DEBUG: pushing down the function call
+ERROR: wrong data type: XXXX, expected XXXX
+select multi_mx_function_call_delegation.mx_call_func_custom_types('S', 'A');
+DEBUG: pushing down the function call
+ERROR: wrong data type: XXXX, expected XXXX
+RESET citus.enable_binary_protocol;
+-- We don't allow distributing calls inside transactions
+begin;
+select mx_call_func(2, 0);
+DEBUG: not pushing down function calls in a multi-statement transaction
+DEBUG: generating subplan XXX_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id)))
+CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)"
+PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment
+DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT (3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint)))
+CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)"
+PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment
+ mx_call_func
+---------------------------------------------------------------------
+ 29
+(1 row)
+
+commit;
+-- Drop the table colocated with mx_call_func_custom_types. Now it shouldn't
+-- be routed to workers anymore.
+SET client_min_messages TO NOTICE;
+drop table mx_call_dist_table_enum;
+SET client_min_messages TO DEBUG1;
+select mx_call_func_custom_types('S', 'A');
+DEBUG: function does not have co-located tables
+ mx_call_func_custom_types
+---------------------------------------------------------------------
+ (F,S)
+(1 row)
+
+-- Make sure we do bounds checking on distributed argument index
+-- This also tests that we have cache invalidation for pg_dist_object updates
+select colocate_proc_with_table('mx_call_func', 'mx_call_dist_table_1'::regclass, -1);
+ colocate_proc_with_table
+---------------------------------------------------------------------
+
+(1 row)
+
+select mx_call_func(2, 0);
+DEBUG: cannot push down invalid distribution_argument_index
+DEBUG: generating subplan XXX_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id)))
+CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)"
+PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment
+DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT (3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint)))
+CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)"
+PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment
+ mx_call_func
+---------------------------------------------------------------------
+ 29
+(1 row)
+
+select colocate_proc_with_table('mx_call_func', 'mx_call_dist_table_1'::regclass, 2);
+ colocate_proc_with_table
+---------------------------------------------------------------------
+
+(1 row)
+
+select mx_call_func(2, 0);
+DEBUG: cannot push down invalid distribution_argument_index
+DEBUG: generating subplan XXX_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id)))
+CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)"
+PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment
+DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT (3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint)))
+CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)"
+PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment
+ mx_call_func
+---------------------------------------------------------------------
+ 29
+(1 row)
+
+-- We don't currently support colocating with reference tables
+select colocate_proc_with_table('mx_call_func', 'mx_call_dist_table_ref'::regclass, 1);
+ colocate_proc_with_table
+---------------------------------------------------------------------
+
+(1 row)
+
+select mx_call_func(2, 0);
+DEBUG: pushing down the function call
+ mx_call_func
+---------------------------------------------------------------------
+ 28
+(1 row)
+
+-- We don't currently support colocating with replicated tables
+select colocate_proc_with_table('mx_call_func', 'mx_call_dist_table_replica'::regclass, 1);
+ colocate_proc_with_table
+---------------------------------------------------------------------
+
+(1 row)
+
+select mx_call_func(2, 0);
+DEBUG: cannot push down function call for replicated distributed tables
+DEBUG: generating subplan XXX_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id)))
+CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)"
+PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment
+DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT (3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint)))
+CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)"
+PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment
+ mx_call_func
+---------------------------------------------------------------------
+ 29
+(1 row)
+
+SET client_min_messages TO NOTICE;
+drop table mx_call_dist_table_replica;
+SET client_min_messages TO DEBUG1;
+select colocate_proc_with_table('mx_call_func', 'mx_call_dist_table_1'::regclass, 1);
+ colocate_proc_with_table
+---------------------------------------------------------------------
+
+(1 row)
+
+-- Test table returning functions.
+CREATE FUNCTION mx_call_func_tbl(x int)
+RETURNS TABLE (p0 int, p1 int)
+LANGUAGE plpgsql AS $$
+BEGIN
+ INSERT INTO multi_mx_function_call_delegation.mx_call_dist_table_1 VALUES (x, -1), (x+1, 4);
+ UPDATE multi_mx_function_call_delegation.mx_call_dist_table_1 SET val = val+1 WHERE id >= x;
+ UPDATE multi_mx_function_call_delegation.mx_call_dist_table_1 SET val = val-1 WHERE id >= x;
+ RETURN QUERY
+ SELECT id, val
+ FROM multi_mx_function_call_delegation.mx_call_dist_table_1 t
+ WHERE id >= x
+ ORDER BY 1, 2;
+END;$$;
+-- before distribution ...
+select mx_call_func_tbl(10);
+ mx_call_func_tbl
+---------------------------------------------------------------------
+ (10,-1)
+ (11,4)
+(2 rows)
+
+-- after distribution ...
+select create_distributed_function('mx_call_func_tbl(int)', '$1', 'mx_call_dist_table_1');
+DEBUG: switching to sequential query execution mode
+DETAIL: A distributed function is created. To make sure subsequent commands see the type correctly we need to make sure to use only one connection for all future commands
+ create_distributed_function
+---------------------------------------------------------------------
+
+(1 row)
+
+select mx_call_func_tbl(20);
+DEBUG: pushing down the function call
+ mx_call_func_tbl
+---------------------------------------------------------------------
+ (20,-1)
+ (21,4)
+(2 rows)
+
+-- Test that we properly propagate errors raised from procedures.
+CREATE FUNCTION mx_call_func_raise(x int)
+RETURNS void LANGUAGE plpgsql AS $$
+BEGIN
+ RAISE WARNING 'warning';
+ RAISE EXCEPTION 'error';
+END;$$;
+select create_distributed_function('mx_call_func_raise(int)', '$1', 'mx_call_dist_table_1');
+DEBUG: switching to sequential query execution mode
+DETAIL: A distributed function is created. To make sure subsequent commands see the type correctly we need to make sure to use only one connection for all future commands
+ create_distributed_function
+---------------------------------------------------------------------
+
+(1 row)
+
+\set VERBOSITY terse
+select mx_call_func_raise(2);
+DEBUG: pushing down the function call
+WARNING: warning
+ERROR: error
+\set VERBOSITY default
+-- Don't push-down when doing INSERT INTO ... SELECT func();
+SET client_min_messages TO ERROR;
+CREATE TABLE test (x int primary key);
+SELECT create_distributed_table('test','x');
+ create_distributed_table
+---------------------------------------------------------------------
+
+(1 row)
+
+CREATE OR REPLACE FUNCTION delegated_function(a int)
+RETURNS int
+LANGUAGE plpgsql
+AS $function$
+DECLARE
+BEGIN
+ INSERT INTO multi_mx_function_call_delegation.test VALUES (a);
+ INSERT INTO multi_mx_function_call_delegation.test VALUES (a + 1);
+ RETURN a+2;
+END;
+$function$;
+SELECT create_distributed_function('delegated_function(int)', 'a');
+ create_distributed_function
+---------------------------------------------------------------------
+
+(1 row)
+
+SET client_min_messages TO DEBUG1;
+INSERT INTO test SELECT delegated_function(1);
+DEBUG: distributed INSERT ... SELECT can only select from distributed tables
+DEBUG: not pushing down function calls in INSERT ... SELECT
+DEBUG: Collecting INSERT ... SELECT results on coordinator
+-- Don't push down in subqueries or CTEs.
+SELECT * FROM test WHERE not exists(
+ SELECT delegated_function(4)
+);
+DEBUG: not pushing down function calls in CTEs or Subqueries
+DEBUG: generating subplan XXX_1 for subquery SELECT multi_mx_function_call_delegation.delegated_function(4) AS delegated_function
+DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT x FROM multi_mx_function_call_delegation.test WHERE (NOT (EXISTS (SELECT intermediate_result.delegated_function FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(delegated_function integer))))
+ x
+---------------------------------------------------------------------
+(0 rows)
+
+WITH r AS (
+ SELECT delegated_function(7)
+) SELECT * FROM test WHERE (SELECT count(*)=0 FROM r);
+DEBUG: generating subplan XXX_1 for CTE r: SELECT multi_mx_function_call_delegation.delegated_function(7) AS delegated_function
+DEBUG: not pushing down function calls in CTEs or Subqueries
+DEBUG: generating subplan XXX_2 for subquery SELECT (count(*) OPERATOR(pg_catalog.=) 0) FROM (SELECT intermediate_result.delegated_function FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(delegated_function integer)) r
+DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT x FROM multi_mx_function_call_delegation.test WHERE (SELECT intermediate_result."?column?" FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result("?column?" boolean))
+ x
+---------------------------------------------------------------------
+(0 rows)
+
+WITH r AS (
+ SELECT delegated_function(10)
+), t AS (
+ SELECT count(*) c FROM r
+) SELECT * FROM test, t WHERE t.c=0;
+DEBUG: CTE t is going to be inlined via distributed planning
+DEBUG: generating subplan XXX_1 for CTE r: SELECT multi_mx_function_call_delegation.delegated_function(10) AS delegated_function
+DEBUG: not pushing down function calls in CTEs or Subqueries
+DEBUG: generating subplan XXX_2 for subquery SELECT count(*) AS c FROM (SELECT intermediate_result.delegated_function FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(delegated_function integer)) r
+DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT test.x, t.c FROM multi_mx_function_call_delegation.test, (SELECT intermediate_result.c FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(c bigint)) t WHERE (t.c OPERATOR(pg_catalog.=) 0)
+ x | c
+---------------------------------------------------------------------
+(0 rows)
+
+WITH r AS (
+ SELECT count(*) FROM test
+), s AS (
+ SELECT delegated_function(13)
+), t AS (
+ SELECT count(*) c FROM s
+) SELECT * FROM test, r, t WHERE t.c=0;
+DEBUG: CTE r is going to be inlined via distributed planning
+DEBUG: CTE t is going to be inlined via distributed planning
+DEBUG: generating subplan XXX_1 for CTE s: SELECT multi_mx_function_call_delegation.delegated_function(13) AS delegated_function
+DEBUG: not pushing down function calls in CTEs or Subqueries
+DEBUG: generating subplan XXX_2 for subquery SELECT count(*) AS count FROM multi_mx_function_call_delegation.test
+DEBUG: generating subplan XXX_3 for subquery SELECT count(*) AS c FROM (SELECT intermediate_result.delegated_function FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(delegated_function integer)) s
+DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT test.x, r.count, t.c FROM multi_mx_function_call_delegation.test, (SELECT intermediate_result.count FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(count bigint)) r, (SELECT intermediate_result.c FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(c bigint)) t WHERE (t.c OPERATOR(pg_catalog.=) 0)
+ x | count | c
+---------------------------------------------------------------------
+(0 rows)
+
+-- Test that we don't propagate to non-metadata worker nodes
+SET client_min_messages TO WARNING;
+select stop_metadata_sync_to_node('localhost', :worker_1_port);
+ stop_metadata_sync_to_node
+---------------------------------------------------------------------
+
+(1 row)
+
+select stop_metadata_sync_to_node('localhost', :worker_2_port);
+ stop_metadata_sync_to_node
+---------------------------------------------------------------------
+
+(1 row)
+
+SET client_min_messages TO DEBUG1;
+select mx_call_func(2, 0);
+DEBUG: the worker node does not have metadata
+DEBUG: generating subplan XXX_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id)))
+CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)"
+PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment
+DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT (3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint)))
+CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)"
+PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment
+ mx_call_func
+---------------------------------------------------------------------
+ 29
+(1 row)
+
+SET client_min_messages TO NOTICE;
+select start_metadata_sync_to_node('localhost', :worker_1_port);
+ start_metadata_sync_to_node
+---------------------------------------------------------------------
+
+(1 row)
+
+select start_metadata_sync_to_node('localhost', :worker_2_port);
+ start_metadata_sync_to_node
+---------------------------------------------------------------------
+
+(1 row)
+
+-- stop_metadata_sync_to_node()/start_metadata_sync_to_node() might make
+-- worker backend caches inconsistent. Reconnect to coordinator to use
+-- new worker connections, hence new backends.
+\c - - - :master_port
+SET search_path to multi_mx_function_call_delegation, public;
+SET client_min_messages TO DEBUG1;
+SET citus.shard_replication_factor = 1;
+--
+-- Test non-const parameter values
+--
+CREATE FUNCTION mx_call_add(int, int) RETURNS int
+ AS 'select $1 + $2;' LANGUAGE SQL IMMUTABLE;
+SELECT create_distributed_function('mx_call_add(int,int)', '$1');
+DEBUG: switching to sequential query execution mode
+DETAIL: A distributed function is created. To make sure subsequent commands see the type correctly we need to make sure to use only one connection for all future commands
+ create_distributed_function
+---------------------------------------------------------------------
+
+(1 row)
+
+-- subquery parameters cannot be pushed down
+select mx_call_func((select x + 1 from mx_call_add(3, 4) x), 2);
+DEBUG: arguments in a distributed function must not contain subqueries
+DEBUG: generating subplan XXX_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id)))
+CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)"
+PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment
+DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT (9 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint)))
+CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)"
+PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment
+ mx_call_func
+---------------------------------------------------------------------
+ 35
+(1 row)
+
+-- volatile parameter cannot be pushed down
+select mx_call_func(floor(random())::int, 2);
+DEBUG: arguments in a distributed function must be constant expressions
+DEBUG: generating subplan XXX_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id)))
+CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)"
+PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment
+DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT (1 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint)))
+CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)"
+PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment
+ mx_call_func
+---------------------------------------------------------------------
+ 27
+(1 row)
+
+-- test forms we don't distribute
+select * from mx_call_func(2, 0);
+DEBUG: generating subplan XXX_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id)))
+CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)"
+PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment
+DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT (3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint)))
+CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)"
+PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment
+ y
+---------------------------------------------------------------------
+ 29
+(1 row)
+
+select mx_call_func(2, 0) from mx_call_dist_table_1;
+ mx_call_func
+---------------------------------------------------------------------
+ 28
+ 28
+ 28
+ 28
+ 28
+ 28
+ 28
+ 28
+ 28
+(9 rows)
+
+select mx_call_func(2, 0) where mx_call_func(0, 2) = 0;
+DEBUG: generating subplan XXX_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id)))
+CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)"
+PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment
+DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT (1 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint)))
+CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)"
+PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment
+ mx_call_func
+---------------------------------------------------------------------
+(0 rows)
+
+select mx_call_func(2, 0), mx_call_func(0, 2);
+DEBUG: generating subplan XXX_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id)))
+CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)"
+PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment
+DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT (3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint)))
+CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)"
+PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment
+DEBUG: generating subplan XXX_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id)))
+CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)"
+PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment
+DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT (1 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint)))
+CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)"
+PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment
+ mx_call_func | mx_call_func
+---------------------------------------------------------------------
+ 29 | 27
+(1 row)
+
+DO $$ BEGIN perform mx_call_func_tbl(40); END; $$;
+DEBUG: not pushing down function calls in a multi-statement transaction
+CONTEXT: SQL statement "SELECT mx_call_func_tbl(40)"
+PL/pgSQL function inline_code_block line 1 at PERFORM
+SELECT * FROM mx_call_dist_table_1 WHERE id >= 40 ORDER BY id, val;
+ id | val
+---------------------------------------------------------------------
+ 40 | -1
+ 41 | 4
+(2 rows)
+
+-- Prepared statements. Repeat 8 times to test for generic plans
+PREPARE call_plan (int, int) AS SELECT mx_call_func($1, $2);
+EXECUTE call_plan(2, 0);
+DEBUG: pushing down the function call
+ mx_call_func
+---------------------------------------------------------------------
+ 28
+(1 row)
+
+EXECUTE call_plan(2, 0);
+DEBUG: pushing down the function call
+ mx_call_func
+---------------------------------------------------------------------
+ 28
+(1 row)
+
+EXECUTE call_plan(2, 0);
+DEBUG: pushing down the function call
+ mx_call_func
+---------------------------------------------------------------------
+ 28
+(1 row)
+
+EXECUTE call_plan(2, 0);
+DEBUG: pushing down the function call
+ mx_call_func
+---------------------------------------------------------------------
+ 28
+(1 row)
+
+EXECUTE call_plan(2, 0);
+DEBUG: pushing down the function call
+ mx_call_func
+---------------------------------------------------------------------
+ 28
+(1 row)
+
+EXECUTE call_plan(2, 0);
+DEBUG: pushing down the function call
+ mx_call_func
+---------------------------------------------------------------------
+ 28
+(1 row)
+
+EXECUTE call_plan(2, 0);
+DEBUG: pushing down the function call
+ mx_call_func
+---------------------------------------------------------------------
+ 28
+(1 row)
+
+EXECUTE call_plan(2, 0);
+DEBUG: pushing down the function call
+ mx_call_func
+---------------------------------------------------------------------
+ 28
+(1 row)
+
+\c - - - :worker_1_port
+SET search_path TO multi_mx_function_call_delegation, public;
+-- create_distributed_function is disallowed from worker nodes
+select create_distributed_function('mx_call_func(int,int)');
+ERROR: operation is not allowed on this node
+HINT: Connect to the coordinator and run it again.
+\c - - - :master_port
+SET search_path TO multi_mx_function_call_delegation, public;
+RESET client_min_messages;
+\set VERBOSITY terse
+DROP SCHEMA multi_mx_function_call_delegation CASCADE;
+NOTICE: drop cascades to 14 other objects
diff --git a/src/test/regress/sql/multi_mx_function_call_delegation.sql b/src/test/regress/sql/multi_mx_function_call_delegation.sql
index 4f7de1d92..513146a4e 100644
--- a/src/test/regress/sql/multi_mx_function_call_delegation.sql
+++ b/src/test/regress/sql/multi_mx_function_call_delegation.sql
@@ -103,9 +103,8 @@ select multi_mx_function_call_delegation.mx_call_func(2, 0);
select multi_mx_function_call_delegation.mx_call_func_custom_types('S', 'A');
--- This is currently an undetected failure when using the binary protocol
--- It should not be enabled by default until this is resolved. The tests above
--- will fail too, when changing the default to TRUE;
+-- this is fixed with pg14 and this will fail prior to
+-- pg 14
SET citus.enable_binary_protocol = TRUE;
select mx_call_func_custom_types('S', 'A');
select multi_mx_function_call_delegation.mx_call_func_custom_types('S', 'A');